启动 celery worker 并为广播队列启用它

start celery worker and enable it for broadcast queue(启动 celery worker 并为广播队列启用它)
本文介绍了启动 celery worker 并为广播队列启用它的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着跟版网的小编来一起学习吧!

问题描述

我正在尝试启动 celery worker,所以它只听单个队列.这不是问题,我可以这样做:

I'm trying to start celery worker so it only listens to single queue. This is not a problem, I can do this that way:

python -m celery worker -A my_module -Q my_queue -c 1

但现在我也希望这个 my_queue 队列成为广播队列,所以我在 celeryconfig 中这样做:

But now I also want this my_queue queue to be a broadcast queue, so I do this in my celeryconfig:

from kombu.common import Broadcast
CELERY_QUEUES = (Broadcast('my_queue'),)

但是一旦我这样做了,我就不能再启动我的工人了,我从 rabbitmq 收到错误:

But as soon as I do this I cannot start my worker anymore, I get error from rabbitmq:

amqp.exceptions.PreconditionFailed: Exchange.declare: (406) PRECONDITION_FAILED - inequivalent arg 'type' for exchange 'my_queue' in vhost 'myvhost': received 'fanout' but current is 'direct'

如果我在没有 -Q 的情况下启动 worker(但如上所述将 Broadcast 留在 celeryconfig.py 中)并且我可以列出 rabbitmq 队列看到广播队列的创建和命名如下:

If I start worker without -Q (but leaving Broadcast in celeryconfig.py as described above) and I list rabbitmq queues I can see broadcast queue is created and named like this:

bcast.43fecba7-786a-461c-a322-620039b29b8b

同样,如果我在 worker 中定义这个队列(使用上面提到的 -Q )或在 celeryconfig.py 中作为简单的 Queue这个:

And similarly if I define this queue within worker (using -Q as mentioned above) or as simple Queue in celeryconfig.py like this:

from kombu import Queue
CELERY_QUEUES = (Queue('my_queue'),)

我可以像这样在 rabbitmq 中看到这个队列:

I can see this queue in rabbitmq like this:

my_queue

在定义队列时,我在 Broadcast 调用中添加了什么并不重要 - 这似乎是内部 celery 名称,而不是传递给 rabbitmq.

It apperas it does not matter what I put into Broadcast call when defining the queue - this seems to be internal celery name, not passed to rabbitmq.

所以我猜当工人开始时,然后 my_queue 被创建,一旦完成它就不能 Broadcast.

So I'm guessing when worker is starting then my_queue is created and once that's done it cannot be made Broadcast.

我可以让一个工作人员监听任何队列(不仅是 my_queue),我将从删除 -Q 参数开始.但是如果我能有一个进程只监听那个特定的队列,那就太好了,因为我在那里投入的任务很快,而且我想尽可能地降低延迟.

I can have a worker that listens to any queue (not only to my_queue) which I would start by removing the -Q argument. But it would be nice to be able to have a single process that only listens to that particular queue since my tasks I throw in there are fast and I'd like to bring latency down as much as possible.

--- 编辑 1 ---花了一些时间解决这个问题,上面提到的 bcast 队列似乎并不一致.重置 rabbitmq 并在没有 -Q 选项的情况下运行 celery 后,bcast 队列没有出现...

--- Edit 1 --- Spent some time with this problem and it seems bcast queue mentioned above does not appear consistently. After reseting rabbitmq and running celery without -Q option bcast queue did not appear...

推荐答案

当使用代理发送消息时,客户端和工作人员必须就相同的配置值达成一致.如果您必须更改配置,则需要清除现有消息并重新启动所有内容以使它们同步.

When using a broker for sending messages, client and workers must agree on same configuration values. If you have to change config, you need to purge existing messages and restart everything so that they are in sync.

启动广播队列时,您可以设置交换类型并配置队列.

When starting a broadcast queue you can set exchange type and configure the queue.

from kombu.common import Broadcast
from kombu import Exchange


exchange = Exchange('custom_exchange', type='fanout')

CELERY_QUEUES = (
    Broadcast(name='bcast', exchange=exchange),
)

现在你可以用

celery worker -l info -A tasks -Q bcast 

这篇关于启动 celery worker 并为广播队列启用它的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持跟版网!

本站部分内容来源互联网,如果有图片或者内容侵犯了您的权益,请联系我们,我们会在确认后第一时间进行删除!

相关文档推荐

Seasonal Decomposition of Time Series by Loess with Python(Loess 用 Python 对时间序列进行季节性分解)
Resample a time series with the index of another time series(使用另一个时间序列的索引重新采样一个时间序列)
How can I simply calculate the rolling/moving variance of a time series in python?(如何在 python 中简单地计算时间序列的滚动/移动方差?)
How to use Dynamic Time warping with kNN in python(如何在python中使用动态时间扭曲和kNN)
Keras LSTM: a time-series multi-step multi-features forecasting - poor results(Keras LSTM:时间序列多步多特征预测 - 结果不佳)
Python pandas time series interpolation and regularization(Python pandas 时间序列插值和正则化)