git地址
https://github.com/nsqio/nsq/releases
生产者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
# Producer example dependencies: standard library first, then third-party.
import time

import nsq
import tornado.ioloop
def pub_message():
    """Publish one fixed payload to the ``nsq_reader`` topic.

    Uses the module-level ``writer`` and passes ``finish_pub`` as the
    completion callback for the publish.
    """
    # NOTE(review): the payload is a constant. The ``time.strftime(...)`` text
    # inside it is literal characters and is never evaluated -- confirm
    # whether a real timestamp was intended here.
    topic = 'nsq_reader'
    payload = "31222112time.strftime('%H:%M:%S'1)11".encode()
    writer.pub(topic, payload, finish_pub)
def finish_pub(conn, data):
    """Report the outcome of a publish, then stop the current IOLoop.

    Args:
        conn: Connection argument supplied to the callback (unused here).
        data: Result argument supplied to the callback; presumably the
            broker's response or an error object -- verify against the
            pynsq ``Writer.pub`` documentation.
    """
    # Print the value actually received instead of the constant string
    # "data", so a failed publish can be told apart from a successful one.
    print(data)
    # Stop the loop as soon as the first publish has completed.
    tornado.ioloop.IOLoop.current().stop()
# The writer lives at module level because pub_message() looks it up as a
# global when it publishes.
writer = nsq.Writer(['127.0.0.1:4150'])

# Schedule pub_message with a callback_time of 1000 (tornado documents this
# value as milliseconds).
publish_timer = tornado.ioloop.PeriodicCallback(pub_message, 1000)
publish_timer.start()

# Hand control to pynsq's run loop.
nsq.run()
|
消费者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
# Consumer example dependency.
import nsq
def process_message(message):
    """Print the decoded body of *message*, then mark it finished.

    Args:
        message: Object handed to the handler by the reader; this function
            reads its ``body`` attribute and calls its ``finish()`` method.
    """
    text = message.body.decode()
    print("Received message:", text)
    # The handler responds to the message itself by calling finish().
    message.finish()
def consume_messages():
    """Create a reader for the ``nsq_reader`` topic and start the run loop.

    The reader is configured with ``process_message`` as its handler, a
    single nsqd TCP address, and the ``my_channel`` channel.
    """
    reader_options = {
        'message_handler': process_message,
        'nsqd_tcp_addresses': ['127.0.0.1:4150'],
        'topic': 'nsq_reader',
        'channel': 'my_channel',
    }
    consumer = nsq.Reader(**reader_options)

    nsq.run()
# Start consuming only when this file is executed as a script.
if __name__ == '__main__':
    consume_messages()
|