问题描述
我在 python 中使用 pika 框架编写了非常基本的生产者-消费者代码.问题是 - 消费者端在队列中的消息上运行太慢.我进行了一些测试,发现我可以通过多处理将工作流程加快 27 倍.问题是 - 我不知道向我的代码添加多处理功能的正确方法是什么.
I have very basic producer-consumer code written with pika framework in python. The problem is - consumer side runs too slow on messages in queue. I ran some tests and found out that i can speed up the workflow up to 27 times with multiprocessing. The problem is - I don't know what is the right way to add multiprocessing functionality to my code.
import pika
import json
from datetime import datetime
from functions import download_xmls
def callback(ch, method, properties, body):
print('Got something')
body = json.loads(body)
type = body[-1]['Type']
print('Object type in work currently ' + type)
cnums = [x['cadnum'] for x in body[:-1]]
print('Got {} cnums to work with'.format(len(cnums)))
date_start = datetime.now()
download_xmls(type,cnums)
date_end = datetime.now()
ch.basic_ack(delivery_tag=method.delivery_tag)
print('Download complete in {} seconds'.format((date_end-date_start).total_seconds()))
def consume(queue_name = 'bot-test'):
parameters = pika.URLParameters('server@address')
connection = pika.BlockingConnection(parameters)
channel = connection.channel()
channel.queue_declare(queue=queue_name, durable=True)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback, queue='bot-test')
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
如何从这里开始添加多处理功能?
How do I start with adding multiprocessing functionality from here?
推荐答案
Pika 有广泛的 示例代码,我建议您查看.请注意,此代码仅供 示例 使用.在处理线程的情况下,您将不得不使用更智能的方式来管理您的线程.
Pika has extensive example code that I recommend you check out. Note that this code is for example use only. In the case of doing work on threads, you will have to use a more intelligent way to manage your threads.
目标是不阻塞运行 Pika IO 循环的线程,并从您的工作线程正确回调到 IO 循环.这就是 add_callback_threadsafe
存在并在该代码中使用的原因.
The goal is to not block the thread that runs Pika's IO loop, and to call back into the IO loop correctly from your worker threads. That's why add_callback_threadsafe
exists and is used in that code.
注意: RabbitMQ 团队监控 rabbitmq-users
邮件列表,并且有时只回答 StackOverflow 上的问题.
NOTE: the RabbitMQ team monitors the rabbitmq-users
mailing list and only sometimes answers questions on StackOverflow.
这篇关于如何在 python 中使用 pika (RabbitMQ) 向消费者添加多处理的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持跟版网!