How to add multiprocessing to a consumer with pika (RabbitMQ) in Python

Question

I have very basic producer-consumer code written with pika in Python. The problem is that the consumer processes the messages in the queue too slowly. I ran some tests and found that multiprocessing can speed up the workflow by up to 27 times, but I don't know the right way to add multiprocessing to my code.

import pika
import json
from datetime import datetime
from functions import download_xmls


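def callback(ch, method, properties, body):
    print('Got something')
    # The last element of the payload carries the object type;
    # the elements before it carry the cadnum values.
    body = json.loads(body)
    obj_type = body[-1]['Type']  # renamed from `type` to avoid shadowing the builtin
    print('Object type in work currently ' + obj_type)
    cnums = [x['cadnum'] for x in body[:-1]]
    print('Got {} cnums to work with'.format(len(cnums)))

    # Blocking download: this is the slow step that holds up the consumer.
    date_start = datetime.now()
    download_xmls(obj_type, cnums)
    date_end = datetime.now()
    ch.basic_ack(delivery_tag=method.delivery_tag)
    print('Download complete in {} seconds'.format((date_end - date_start).total_seconds()))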


def consume(queue_name = 'bot-test'):
    parameters = pika.URLParameters('server@address')
    connection = pika.BlockingConnection(parameters)
    channel = connection.channel()
    channel.queue_declare(queue=queue_name, durable=True)
    channel.basic_qos(prefetch_count=1)
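    # Use queue_name rather than a hard-coded queue. This is the pika >= 1.0
    # keyword form; pika 0.x took the callback as the first positional argument.
    channel.basic_consume(queue=queue_name, on_message_callback=callback)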
    print(' [*] Waiting for messages. To exit press CTRL+C')
    channel.start_consuming()

How do I go about adding multiprocessing from here?

Recommended answer

Pika has extensive example code that I recommend you check out. Note that this code is for example use only. If you do the work on threads, you will need a more robust way to manage them.

The goal is to avoid blocking the thread that runs Pika's IO loop, and to call back into the IO loop correctly from your worker threads. That is why add_callback_threadsafe exists and is used in that code.
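
As a rough illustration of that pattern, here is a minimal sketch that hands each message to a pool and sends the ack back through add_callback_threadsafe. It is not taken from the Pika examples. It assumes pika >= 1.0, and it swaps plain threads for a ProcessPoolExecutor because the question asks about multiprocessing. The names handle_body, on_done, on_message and consume, and the workers parameter, are hypothetical; handle_body stands in for the slow work such as download_xmls.

```python
import functools
from concurrent.futures import ProcessPoolExecutor


def handle_body(body):
    """Hypothetical stand-in for the slow work (e.g. parsing and downloading)."""
    return len(body)


def ack_message(channel, delivery_tag):
    # Scheduled via add_callback_threadsafe, so this runs on the thread
    # that owns the connection, where using the channel is safe.
    if channel.is_open:
        channel.basic_ack(delivery_tag=delivery_tag)


def on_done(connection, channel, delivery_tag, future):
    # Called in the parent process when the work finishes, usually on one of
    # the executor's helper threads. Do not touch the channel directly here;
    # hand the ack over to the IO loop instead.
    if future.exception() is None:
        connection.add_callback_threadsafe(
            functools.partial(ack_message, channel, delivery_tag))
    # On failure the message is left unacked in this sketch.


def on_message(channel, method, properties, body, connection, executor):
    # Return quickly so the IO loop is never blocked by the slow work.
    future = executor.submit(handle_body, body)
    future.add_done_callback(
        functools.partial(on_done, connection, channel, method.delivery_tag))
    return future


def consume(queue_name, url, workers=4):
    import pika  # imported here so the helpers above can be exercised without pika

    connection = pika.BlockingConnection(pika.URLParameters(url))
    channel = connection.channel()
    channel.queue_declare(queue=queue_name, durable=True)
    # Allow one unacked message per worker; with prefetch_count=1 the broker
    # would deliver only one message at a time and the pool would sit idle.
    channel.basic_qos(prefetch_count=workers)
    with ProcessPoolExecutor(max_workers=workers) as executor:
        channel.basic_consume(
            queue=queue_name,
            on_message_callback=functools.partial(
                on_message, connection=connection, executor=executor))
        try:
            channel.start_consuming()
        except KeyboardInterrupt:
            channel.stop_consuming()
    connection.close()
```

To try it, call consume with your own queue name and broker URL from under an `if __name__ == '__main__':` guard, which process pools need on platforms that spawn workers. Shutdown handling is deliberately simplified: messages still in flight when the consumer stops are not acked, so RabbitMQ will redeliver them once the connection closes.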

NOTE: the RabbitMQ team monitors the rabbitmq-users mailing list and only sometimes answers questions on StackOverflow.
