git地址

https://github.com/nsqio/nsq/releases

生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import nsq
import tornado.ioloop
import time

def pub_message():
writer.pub('nsq_reader', "31222112time.strftime('%H:%M:%S'1)11".encode(), finish_pub)

def finish_pub(conn, data):
print("data")
tornado.ioloop.IOLoop.current().stop()

writer = nsq.Writer(['127.0.0.1:4150'])
tornado.ioloop.PeriodicCallback(pub_message, 1000).start()
nsq.run()

消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import nsq


def process_message(message):
# 处理接收到的消息
print("Received message:", message.body.decode())

# 标记消息为已处理
message.finish()


def consume_messages():
# 创建消费者
reader = nsq.Reader(
message_handler=process_message,
nsqd_tcp_addresses=['127.0.0.1:4150'],
topic='nsq_reader',
channel='my_channel'
)

# 运行消费者
nsq.run()


if __name__ == '__main__':
consume_messages()