Celery 动态队列创建和路由

Celery dynamic queue creation and routing(Celery 动态队列创建和路由)
本文介绍了Celery 动态队列创建和路由的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着跟版网的小编来一起学习吧!

问题描述

我正在尝试调用一个任务并为该任务创建一个队列,如果它不存在,则立即将调用的任务插入该队列.我有以下代码:

I'm trying to call a task and create a queue for that task if it doesn't exist then immediately insert to that queue the called task. I have the following code:

@task
def greet(name):
    return "Hello %s!" % name


def run():
    result = greet.delay(args=['marc'], queue='greet.1',
        routing_key='greet.1')
    print result.ready()

然后我有一个自定义路由器:

then I have a custom router:

class MyRouter(object):

    def route_for_task(self, task, args=None, kwargs=None):
        if task == 'tasks.greet':
            return {'queue': kwargs['queue'],
                    'exchange': 'greet',
                    'exchange_type': 'direct',
                    'routing_key': kwargs['routing_key']}
        return None

这将创建一个名为 greet.1 的交换和一个名为 greet.1 的队列,但该队列为空.交换器应该只称为 greet,它知道如何将路由键(如 greet.1)路由到名为 greet.1 的队列.

this creates an exchange called greet.1 and a queue called greet.1 but the queue is empty. The exchange should be just called greet which knows how to route a routing key like greet.1 to the queue called greet.1.

有什么想法吗?

推荐答案

当您执行以下操作时:

task.apply_async(queue='foo', routing_key='foobar')

然后 Celery 将从 CELERY_QUEUES 中的 'foo' 队列中获取默认值,或者如果它不存在,则使用 (queue=foo, exchange=foo, routing_key=foo) 自动创建它

Then Celery will take default values from the 'foo' queue in CELERY_QUEUES, or if it does not exist then automatically create it using (queue=foo, exchange=foo, routing_key=foo)

因此,如果 CELERY_QUEUES 中不存在foo",您最终会得到:

So if 'foo' does not exist in CELERY_QUEUES you will end up with:

queues['foo'] = Queue('foo', exchange=Exchange('foo'), routing_key='foo')

然后生产者将声明该队列,但是由于您覆盖了 routing_key,实际使用 routing_key = 'foobar'

The producer will then declare that queue, but since you override the routing_key, actually send the message using routing_key = 'foobar'

这可能看起来很奇怪,但这种行为实际上对主题交换很有用,发布到不同主题的地方.

This may seem strange but the behavior is actually useful for topic exchanges, where you publish to different topics.

虽然很难做你想做的事,你可以自己创建队列并声明它,但这不适用于自动消息发布重试.如果 apply_async 的 queue 参数可以支持会更好一个自定义的 kombu.Queue 将被声明并用作目的地.也许你可以在 http://github.com/celery/celery/issues

It's harder to do what you want though, you can create the queue yourself and declare it, but that won't work well with automatic message publish retries. It would be better if the queue argument to apply_async could support a custom kombu.Queue instead that will be both declared and used as the destination. Maybe you could open an issue for that at http://github.com/celery/celery/issues

这篇关于Celery 动态队列创建和路由的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持跟版网!

本站部分内容来源互联网,如果有图片或者内容侵犯了您的权益,请联系我们,我们会在确认后第一时间进行删除!

相关文档推荐

Seasonal Decomposition of Time Series by Loess with Python(Loess 用 Python 对时间序列进行季节性分解)
Resample a time series with the index of another time series(使用另一个时间序列的索引重新采样一个时间序列)
How can I simply calculate the rolling/moving variance of a time series in python?(如何在 python 中简单地计算时间序列的滚动/移动方差?)
How to use Dynamic Time warping with kNN in python(如何在python中使用动态时间扭曲和kNN)
Keras LSTM: a time-series multi-step multi-features forecasting - poor results(Keras LSTM:时间序列多步多特征预测 - 结果不佳)
Python pandas time series interpolation and regularization(Python pandas 时间序列插值和正则化)