问题描述
我正在测试 Spring-AMQP
与 Spring-Integration
支持,我有以下配置和测试:
I am testing Spring-AMQP
with Spring-Integration
support, I've following configuration and test:
在此配置中,很明显,一旦消息到达 sumptionChannel
,它们就会被确认并因此从队列中删除.我通过在 receive
之后放置一个高 sleep
并检查 queue-size
来验证这一点.没有进一步的控制.
In this configuration it was evident that as soon as message arrives at consumingChannel
they are Acked and hence removed from queue. I validated this by placing a high sleep
after receive
and check queue-size
. There are no further control on it.
现在,如果我设置 acknowledge-mode=MANUAL
,似乎没有办法通过 spring 集成手动执行 ack
.
Now if I set acknowledge-mode=MANUAL
, there are no ways seems to do manual ack
via spring integration.
我需要处理消息,并在处理后执行 manual-ack
,以便 ack
消息保持在 durableQ
处.
My need is to process message and after processing do a manual-ack
so till ack
message remains persisted at durableQ
.
有没有办法用 spring-amqp-integration
处理 MANUAL
ack?我想避免将 ChannelAwareMessageListener
传递给 inbound-channel-adapter
,因为我想控制消费者的 receive
.
Is there any way to handle MANUAL
ack with spring-amqp-integration
? I want to avoid passing ChannelAwareMessageListener
to inbound-channel-adapter
since I want to have control of consumer's receive
.
更新:
当使用自己的 listener-container
和 inbound-channel-adapter
时,这似乎是不可能的:
It even doesn't seems to be possible when using own listener-container
with inbound-channel-adapter
:
上述配置抛出错误,因为 messageListener
属性是不允许的,请参阅标记的内联注释.所以使用 listner-container
的目的被打败了(通过 ChannelAwareMessageListener
暴露 channel
).
Above configuration throws error as messageListener
property is not allowed, see inline comment on tag. So purpose of using listner-container
got defeated (for exposing channel
via ChannelAwareMessageListener
).
对我来说 spring-integration
不能用于 manual-acknowledgement
(我知道,这很难说!),任何人都可以帮我验证这个或者是我缺少任何特定的方法/配置吗?
To me spring-integration
cannot be used for manual-acknowledgement
(I know, this is a hard saying!), Can anyone help me in validating this or Is there any specific approach/configuration required for this which I am missing?
推荐答案
问题是因为您使用的是使用 QueueChannel
的异步切换.通常最好控制容器中的并发性(concurrent-consumers="2"
),并且不要在流程中进行任何异步切换(使用 DirectChannel
s).这样,AUTO ack 就可以正常工作了.而不是从 PollableChannel
接收 new MessageHandler()
到 SubscribableChannel
.
The problem is because you are using async handoff using a QueueChannel
. It is generally better to control the concurrency in the container (concurrent-consumers="2"
) and don't do any async handoffs in your flow (use DirectChannel
s). That way, AUTO ack will work just fine. Instead of receiving from the PollableChannel
subscribe a new MessageHandler()
to a SubscribableChannel
.
更新:
您通常不需要在 SI 应用程序中处理消息,但您使用 DirectChannel 进行的测试相当于...
You normally don't need to deal with Messages in an SI application, but the equivalent of your test with a DirectChannel would be...
这篇关于Spring AMQP 集成 - 消费者手册 致谢的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持跟版网!