问题描述
我正在尝试调用一个任务并为该任务创建一个队列,如果它不存在,则立即将调用的任务插入该队列.我有以下代码:
I'm trying to call a task and create a queue for that task if it doesn't exist then immediately insert to that queue the called task. I have the following code:
@task
def greet(name):
return "Hello %s!" % name
def run():
result = greet.delay(args=['marc'], queue='greet.1',
routing_key='greet.1')
print result.ready()
然后我有一个自定义路由器:
then I have a custom router:
class MyRouter(object):
def route_for_task(self, task, args=None, kwargs=None):
if task == 'tasks.greet':
return {'queue': kwargs['queue'],
'exchange': 'greet',
'exchange_type': 'direct',
'routing_key': kwargs['routing_key']}
return None
这将创建一个名为 greet.1
的交换和一个名为 greet.1
的队列,但该队列为空.交换器应该只称为 greet
,它知道如何将路由键(如 greet.1
)路由到名为 greet.1
的队列.
this creates an exchange called greet.1
and a queue called greet.1
but the queue is empty. The exchange should be just called greet
which knows how to route a routing key like greet.1
to the queue called greet.1
.
有什么想法吗?
推荐答案
当您执行以下操作时:
task.apply_async(queue='foo', routing_key='foobar')
然后 Celery 将从 CELERY_QUEUES 中的 'foo' 队列中获取默认值,或者如果它不存在,则使用 (queue=foo, exchange=foo, routing_key=foo) 自动创建它
Then Celery will take default values from the 'foo' queue in CELERY_QUEUES, or if it does not exist then automatically create it using (queue=foo, exchange=foo, routing_key=foo)
因此,如果 CELERY_QUEUES 中不存在foo",您最终会得到:
So if 'foo' does not exist in CELERY_QUEUES you will end up with:
queues['foo'] = Queue('foo', exchange=Exchange('foo'), routing_key='foo')
然后生产者将声明该队列,但是由于您覆盖了 routing_key,实际使用 routing_key = 'foobar'
The producer will then declare that queue, but since you override the routing_key,
actually send the message using routing_key = 'foobar'
这可能看起来很奇怪,但这种行为实际上对主题交换很有用,发布到不同主题的地方.
This may seem strange but the behavior is actually useful for topic exchanges, where you publish to different topics.
虽然很难做你想做的事,你可以自己创建队列并声明它,但这不适用于自动消息发布重试.如果 apply_async 的 queue 参数可以支持会更好一个自定义的 kombu.Queue
将被声明并用作目的地.也许你可以在 http://github.com/celery/celery/issues
It's harder to do what you want though, you can create the queue yourself
and declare it, but that won't work well with automatic message publish retries.
It would be better if the queue argument to apply_async could support
a custom kombu.Queue
instead that will be both declared and used as the destination.
Maybe you could open an issue for that at http://github.com/celery/celery/issues
这篇关于Celery 动态队列创建和路由的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持跟版网!