消息队列,顾名思义就是一个用来传递任务的队列。消息队列在开发中十分常见,经常用在页面后台处理需要很长时间的操作时,例如发送邮件、短信以及进行复杂数据运算操作等,这些操作通常会阻塞页面相当长的时间,为了避免用户等待太久,一般会先给用户页面进行相应,然后在后台使用独立的线程或者进程来处理这些复杂的操作。
消息队列分为两个部分,生产者和消费者。生产者负责把任务放进队列,消费者则负责从队列中取出任务执行。最常见的一个场景是:当我们在某个站点注册账号时,一般都会给我们的邮箱发送邮件验证,由于发送邮件比较耗时,并且邮件的实时性要求也不是很高,所以这里就可以使用消息队列来完成。先把发送邮件放到队列中,然后开启另外的一个线程专门读取任务,读取邮件并发送出去。
二、使用redis实现一个简单的任务队列
可以使用redis中的列表来实现一个任务队列,开启两个程序,一个作为生产者使用LPUSH
写队列,一个作为消费者使用RPOP
读队列,由于消费者并不知道什么时候会有数据过来,所以消费者需要一直循环读取数据。两者的消息使用json
进行封装协议传输。
生产者:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 |
# -*- coding: utf8 -*- """ 生产者模型 """ import json import redis # 消息类型 MSG_TYPE_READ_BOOK = 0 MSG_TYPE_PLAY_GAME = 1 MSG_TYPE_SING_SONG = 2 def make_message(m_id, m_type): """ 产生一个消息 :param m_id: 消息的id :param m_type: 消息类型 :return: json字符串 """ mess_dict = {"id": m_id, "type": m_type} return json.dumps(mess_dict) def creator(): """ 生产消息并放入消息队列 """ conn = redis.StrictRedis() for i in range(1, 10): js_data = make_message(i, i % 3) print "push message: %s" % js_data conn.lpush("msgQueue", js_data) if __name__ == "__main__": creator() |
消费者:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 |
# -*- coding: utf8 -*- """ 消费者模型 """ import json import redis # 消息类型 MSG_TYPE_READ_BOOK = 0 MSG_TYPE_PLAY_GAME = 1 MSG_TYPE_SING_SONG = 2 def parse_message(js_data): """ 把消息队列中的消息解析成字典 :param js_data: json字符串 :return: 字典 """ return json.loads(js_data) def handle_message(): """ 从消息队列读取消息并执行 """ conn = redis.StrictRedis() print "start handle message!" while True: msg = conn.rpop("msgQueue") if msg is None: continue msg_dict = parse_message(msg) m_id = msg_dict["id"] m_type = msg_dict["type"] if m_type == MSG_TYPE_PLAY_GAME: print "消息%d:我要打游戏" % m_id elif m_type == MSG_TYPE_READ_BOOK: print "消息%d:我要读书" % m_id else: print "消息%d:我要唱歌" % m_id if __name__ == "__main__": handle_message() |
先运行消费者的代码,会输出一下信息:
1 |
start handle message! |
由于此时队列没有消息,所以不会有其他的消息被打印,此时运行生产者,生产者会把消息插入到消息队列:
1 2 3 4 5 6 7 8 9 |
push message: {"type": 1, "id": 1} push message: {"type": 2, "id": 2} push message: {"type": 0, "id": 3} push message: {"type": 1, "id": 4} push message: {"type": 2, "id": 5} push message: {"type": 0, "id": 6} push message: {"type": 1, "id": 7} push message: {"type": 2, "id": 8} push message: {"type": 0, "id": 9} |
消费者则会读取消息队列中的消息:
1 2 3 4 5 6 7 8 9 10 |
start handle message! 消息1:我要打游戏 消息2:我要唱歌 消息3:我要读书 消息4:我要打游戏 消息5:我要唱歌 消息6:我要读书 消息7:我要打游戏 消息8:我要唱歌 消息9:我要读书 |
二、改进
上面的代码中,消费者在没有读到数据情况下会一直循环读取,对电脑来说十分占资源,此时可以利用redis的阻塞读取命令BRPOP
来进行改进,修改消费者代码:
1 2 3 4 5 6 |
def handle_message(): ... while True: msg = conn.brpop("msgQueue")[1] msg_dict = parse_message(msg) ... |
同样也能和上面一样完成同样的功能,只是和上面不同的是,这里读取消息不会一直循环去读取,而是一直阻塞,等到有消息过来才读取。
三、优先级队列
某些时候会有一些需求要把不同的需求根据不同优先级来执行,例如给用户发送邮件的途中突然发现用户账户异常,需要发送短信优先提醒,这时就需要用到优先级队列。
优先级队列依旧使用BRPOP
命令完成,BRPOP
命令后面可以跟多个参数:
1 |
BRPOP queue1 queue2 |
redis会先读取queue1
中的数据,只有queue1
中的数据读完之后才会读queue2
中的数据。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 |
# -*- coding: utf8 -*- import redis conn = redis.StrictRedis() def creator(): """ 生产者,插入两个队列 """ msg1 = ["msg1: 1", "msg1: 2", "msg1: 3"] msg2 = ["msg2: 1", "msg2: 2", "msg2: 3"] for i in msg1: conn.lpush("msg1", i) for i in msg2: conn.lpush("msg2", i) def customer(): """ 循环读取消息队列 :return: """ while True: # msg2 优先级高 msg = conn.brpop(["msg2", "msg1"]) print msg[1] if __name__ == "__main__": creator() customer() |
输出:
1 2 3 4 5 6 |
msg2: 1 msg2: 2 msg2: 3 msg1: 1 msg1: 2 msg1: 3 |
评论