python下使用redis构造一个简单的队列
本文展示了如何使用redis构建一个简单的多生产者,多消费者队列并且提供类似python标准库queue一样的接口。你可以使用这个队列方便的从多个进程或者耗时的计算到多个消费者进程之间共享数据。
我们使用redis列表来保存数据。redis列表按照字符串插入的顺序保存数据。
下面的redis命令会被用到:
- rpush 在列表的末尾插入一个元素
- blpop 从列表开头获取一个元素,如果列表是空则阻塞
- lpop 从列表开头获取一个元素,如果列表是空则返回空
- llen 返回列表的长度
实现过程使用了redis-py库和服务器进行交互:
RedisQueue.py
import redis
class RedisQueue(object):
"""Simple Queue with Redis Backend"""
def __init__(self, name, namespace='queue', **redis_kwargs):
"""The default connection parameters are: host='localhost', port=6379, db=0"""
self.__db= redis.Redis(**redis_kwargs)
self.key = '%s:%s' %(namespace, name)
def qsize(self):
"""Return the approximate size of the queue."""
return self.__db.llen(self.key)
def empty(self):
"""Return True if the queue is empty, False otherwise."""
return self.qsize() == 0
def put(self, item):
"""Put item into the queue."""
self.__db.rpush(self.key, item)
def get(self, block=True, timeout=None):
"""Remove and return an item from the queue.
If optional args block is true and timeout is None (the default), block
if necessary until an item is available."""
if block:
item = self.__db.blpop(self.key, timeout=timeout)
else:
item = self.__db.lpop(self.key)
if item:
item = item[1]
return item
def get_nowait(self):
"""Equivalent to get(False)."""
return self.get(False)
使用:
>>> from RedisQueue import RedisQueue
>>> q = RedisQueue('test')
>>> q.put('hello world')
现在我们使用redis-cli客户端查看数据库,期望的结果如下:
redis 127.0.0.1:6379> keys *
1) "queue:test"
redis 127.0.0.1:6379> type queue:test
list
redis 127.0.0.1:6379> llen queue:test
(integer) 1
redis 127.0.0.1:6379> lrange queue:test 0 1
1) "hello world"
我们可以使用一个不同的脚本来获取数据:
>>> from RedisQueue import RedisQueue
>>> q = RedisQueue('test')
>>> q.get()
'hello world'
随后的q.get()调用会一直阻塞直到某人重新向队列发送一个新的数据。
接下来的工作将是到队列的 编码/解码(例如python-json),这样你就可以不受限制的发送任何字符串。
现在已经存在漂亮而又简单的hotqueue库,它具有像上面例子中的接口别且提供编码/解码功能。
其他值得提到的使用redis做后端的有:
- flask-redis flask里使用redis做后端的一个基本的消息队列。
- celery 一个基于分布式消息传递的异步任务队列/工作队列。比其他类库更高级点,可以配合不同的后端工作。
- rq 简单的python类库作用是队列化任务并且在后端使用消费者进程处理它们。
- resque 一个使用redis做后端的ruby库,主要为了创建后台工作,把他们放到多个队列,稍后处理他们。github在使用,并且有一个漂亮的web监控页面。
- pyres python下resque的克隆版。
原文中的一个小bug,如下:
生产者:
In [1]: from RedisQueue import RedisQueue
In [2]: q = RedisQueue('test')
In [3]: q.put('1')
In [4]: q.put('2')
消费者:
In [10]: from RedisQueue import RedisQueue
In [11]: q = RedisQueue('test')
In [12]: q.get()
Out[12]: '1'
In [13
]: q.get_nowait()
IndexError Traceback (most recent call last)
<ipython-input-13-4f5e3b567b9e> in <module>()
----> 1 q.get_nowait()
/data9/RedisQueue.py in get_nowait(self)
36 def get_nowait(self):
37 """Equivalent to get(False)."""
---> 38 return self.get(False)
39
/data9/RedisQueue.py in get(self, block, timeout)
31
32 if item:
---> 33 item = item[1]
34 return item
35
IndexError: string index out of range
使用redis-py库连接redis数据库,获取下数据,很容易就看到问题出在哪里。
In [55]: r = redis.Redis()
In [56]: r.blpop('queue:test')
Out[56]: ('queue:test', '1')
In [57]: r.lpop('queue:test')
Out[57]: '2'
当使用非阻塞方式获取数据时,redis客户端返回的是一个string;当使用阻塞方式获取数据时,redis返回的数据是一个tuple。因此,修改下get函数中对item的判断条件就可以了:
if isinstance(item,tuple):
item = item[1]