Problem description
I have an existing Function App with 2 Functions and a storage queue. F1 is triggered by a message in a Service Bus topic. For each message received, F1 calculates some sub-tasks (T1, T2, ...) which have to be executed with varying amounts of delay, e.g. T1 is to be fired after 3 minutes, T2 after 5 minutes, and so on. F1 posts messages to a storage queue with appropriate visibility timeouts (to simulate the delay), and F2 is triggered whenever a message becomes visible in the queue. All works fine.
I now want to migrate this app to use 'Durable Functions'. F1 now only starts the orchestrator. The orchestrator code is roughly as follows:
    public static async Task Orchestrator([OrchestrationTrigger] DurableOrchestrationContext context, TraceWriter log)
    {
        var results = await context.CallActivityAsync<List<TaskInfo>>("CalculateTasks", "someinput");
        List<Task> tasks = new List<Task>();
        foreach (var value in results)
        {
            var pnTask = context.CallActivityAsync("PerformSubTask", value);
            tasks.Add(pnTask);
        }
        // don't await, as we want to fire and forget. No fan-in!
        // await Task.WhenAll(tasks);
    }

    [FunctionName("PerformSubTask")]
    public async static Task Run([ActivityTrigger] TaskInfo info, TraceWriter log)
    {
        TimeSpan timeDifference = DateTime.UtcNow - info.Origin.ToUniversalTime();
        TimeSpan delay = TimeSpan.FromSeconds(info.DelayInSeconds);
        var actualDelay = timeDifference > delay ? TimeSpan.Zero : delay - timeDifference;
        // will this still keep the activity function running and incur costs??
        await Task.Delay(actualDelay);
        // perform subtask work after delay!
    }
I would only like to fan out (no fan-in to collect the results) and start the subtasks. The orchestrator starts all the tasks and avoids calling 'await Task.WhenAll'. The activity function calls 'Task.Delay' to wait for the specified amount of time and then does its work.
My questions
Does it make sense to use Durable Functions for this workflow?
Is this the right approach to orchestrate a 'fan-out' workflow?
I do not like the fact that the activity function runs for the specified amount of time (3 or 5 minutes) doing nothing. It will incur costs, won't it?
Also, if a delay of more than 10 minutes is required, there is no way for an activity function to succeed with this approach!
My earlier attempt to avoid this was to use 'CreateTimer' in the orchestrator and then add the activity as a continuation, but I see only timer entries in the 'History' table. The continuation does not fire! Am I violating the constraint for orchestrator code, 'Orchestrator code must never initiate any async operation'?
    foreach (var value in results)
    {
        // calculate time to start
        var timeToStart = ;
        var pnTask = context
            .CreateTimer(timeToStart, CancellationToken.None)
            .ContinueWith(t => context.CallActivityAsync("PerformSubTask", value));
        tasks.Add(pnTask);
    }
UPDATE: using the approach suggested by Chris
Activity that calculates subtasks and delays
    [FunctionName("CalculateTasks")]
    public static List<TaskInfo> CalculateTasks([ActivityTrigger] string input, TraceWriter log)
    {
        // in reality the time is obtained by calling an endpoint
        DateTime currentTime = DateTime.UtcNow;
        return new List<TaskInfo> {
            new TaskInfo { DelayInSeconds = 10, Origin = currentTime },
            new TaskInfo { DelayInSeconds = 20, Origin = currentTime },
            new TaskInfo { DelayInSeconds = 30, Origin = currentTime },
        };
    }

    public static async Task Orchestrator([OrchestrationTrigger] DurableOrchestrationContext context, TraceWriter log)
    {
        var results = await context.CallActivityAsync<List<TaskInfo>>("CalculateTasks", "someinput");
        var currentTime = context.CurrentUtcDateTime;
        List<Task> tasks = new List<Task>();
        foreach (var value in results)
        {
            TimeSpan timeDifference = currentTime - value.Origin;
            TimeSpan delay = TimeSpan.FromSeconds(value.DelayInSeconds);
            var actualDelay = timeDifference > delay ? TimeSpan.Zero : delay - timeDifference;
            var timeToStart = currentTime.Add(actualDelay);
            Task delayedActivityCall = context
                .CreateTimer(timeToStart, CancellationToken.None)
                .ContinueWith(t => context.CallActivityAsync("PerformSubtask", value));
            tasks.Add(delayedActivityCall);
        }
        await Task.WhenAll(tasks);
    }
Simply scheduling tasks from within the orchestrator seems to work. In my case I am calculating the tasks and the delays in another activity (CalculateTasks) before the loop. I want the delays to be calculated using the 'current time' when the activity was run, so I am using DateTime.UtcNow in the activity. This somehow does not play well when used in the orchestrator: the activities specified by 'ContinueWith' just don't run, and the orchestrator is always in the 'Running' state.
Can I not use the time recorded by an activity inside the orchestrator?
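For reference, the start-time arithmetic used in the orchestrator loop above can be pulled out into a small pure helper, which makes it easy to check in isolation. This is only a sketch: the names DelayMath and ComputeStartTime are hypothetical and not part of the Durable Functions API, and it assumes both timestamps are already in UTC.

```csharp
using System;

// Hypothetical helper (not part of Durable Functions): same arithmetic as the orchestrator loop.
public static class DelayMath
{
    // Returns the absolute UTC time at which a subtask should start.
    // nowUtc:         the orchestrator's replay-safe clock (context.CurrentUtcDateTime).
    // originUtc:      the timestamp recorded by the CalculateTasks activity.
    // delayInSeconds: the requested delay, measured from originUtc.
    public static DateTime ComputeStartTime(DateTime nowUtc, DateTime originUtc, int delayInSeconds)
    {
        TimeSpan elapsed = nowUtc - originUtc;
        TimeSpan delay = TimeSpan.FromSeconds(delayInSeconds);

        // If the delay has already passed, start immediately; otherwise wait for the remainder.
        TimeSpan remaining = elapsed > delay ? TimeSpan.Zero : delay - elapsed;
        return nowUtc + remaining;
    }
}
```

The result is simply the later of nowUtc and originUtc plus the delay, so passing an activity-recorded timestamp into the orchestrator only shifts the timer's fire time.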
UPDATE 2
So the workaround suggested by Chris works!
Since I do not want to collect the results of the activities, I avoid calling 'await Task.WhenAll(tasks)' after scheduling all activities. I do this in order to reduce contention on the control queue, i.e. to be able to start another orchestration if required. Nevertheless, the status of the orchestrator is still 'Running' until all the activities finish running. I guess it moves to 'Complete' only after the last activity posts a 'done' message to the control queue.
Am I right? Is there any way to free the orchestrator earlier, i.e. right after scheduling all the activities?
Recommended answer
The ContinueWith approach worked fine for me. I was able to simulate a version of your scenario using the following orchestrator code:
    [FunctionName("Orchestrator")]
    public static async Task Orchestrator(
        [OrchestrationTrigger] DurableOrchestrationContext context,
        TraceWriter log)
    {
        var tasks = new List<Task>(10);
        for (int i = 0; i < 10; i++)
        {
            int j = i;
            DateTime timeToStart = context.CurrentUtcDateTime.AddSeconds(10 * j);
            Task delayedActivityCall = context
                .CreateTimer(timeToStart, CancellationToken.None)
                .ContinueWith(t => context.CallActivityAsync("PerformSubtask", j));
            tasks.Add(delayedActivityCall);
        }
        await Task.WhenAll(tasks);
    }
And for what it's worth, here is the activity function code.
    [FunctionName("PerformSubtask")]
    public static void Activity([ActivityTrigger] int j, TraceWriter log)
    {
        log.Warning($"{DateTime.Now:o}: {j:00}");
    }
From the log output, I saw that all activity invocations ran 10 seconds apart from each other.
Another approach would be to fan out to multiple sub-orchestrations (like @jeffhollan suggested), each of which is simply a short sequence of a durable timer delay followed by your activity call.
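A minimal sketch of that sub-orchestration variant follows. It assumes the Durable Functions 1.x extension used elsewhere in this thread and a PerformSubtask activity that accepts a TaskInfo. The function names ParentOrchestrator and DelayedSubtask are hypothetical, and the TaskInfo shape is inferred from the question.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs;

// Shape inferred from the question; the real class may differ.
public class TaskInfo
{
    public DateTime Origin { get; set; }
    public int DelayInSeconds { get; set; }
}

public static class FanOutWithSubOrchestrations
{
    // Parent: fans out one sub-orchestration per subtask.
    [FunctionName("ParentOrchestrator")]
    public static async Task Parent(
        [OrchestrationTrigger] DurableOrchestrationContext context)
    {
        var infos = await context.CallActivityAsync<List<TaskInfo>>("CalculateTasks", "someinput");

        var children = new List<Task>();
        foreach (var info in infos)
        {
            children.Add(context.CallSubOrchestratorAsync("DelayedSubtask", info));
        }

        await Task.WhenAll(children);
    }

    // Child: a durable timer followed by the activity call, with no ContinueWith involved.
    [FunctionName("DelayedSubtask")]
    public static async Task DelayedSubtask(
        [OrchestrationTrigger] DurableOrchestrationContext context)
    {
        var info = context.GetInput<TaskInfo>();
        DateTime fireAt = info.Origin.ToUniversalTime().AddSeconds(info.DelayInSeconds);

        // Only wait if the requested start time is still in the future.
        if (fireAt > context.CurrentUtcDateTime)
        {
            await context.CreateTimer(fireAt, CancellationToken.None);
        }

        await context.CallActivityAsync("PerformSubtask", info);
    }
}
```

Because each child awaits only tasks created by its own context, the delay is handled by a durable timer rather than by an activity sitting in Task.Delay.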
UPDATE: I tried using your updated sample and was able to reproduce your problem! If you run locally in Visual Studio and configure the exception settings to always break on exceptions, then you should see the following:
System.InvalidOperationException: 'Multithreaded execution was detected. This can happen if the orchestrator function code awaits on a task that was not created by a DurableOrchestrationContext method. More details can be found in this article https://docs.microsoft.com/en-us/azure/azure-functions/durable-functions-checkpointing-and-replay#orchestrator-code-constraints.'
This means the thread which called context.CallActivityAsync("PerformSubtask", j) was not the same as the thread which called the orchestrator function. I don't know why my initial example didn't hit this, or why your version did. It has something to do with how the TPL decides which thread to use to run your ContinueWith delegate, which is something I need to look into further.
The good news is that there is a simple workaround, which is to specify TaskContinuationOptions.ExecuteSynchronously, like this:
    Task delayedActivityCall = context
        .CreateTimer(timeToStart, CancellationToken.None)
        .ContinueWith(
            t => context.CallActivityAsync("PerformSubtask", j),
            TaskContinuationOptions.ExecuteSynchronously);
Please try that and let me know if that fixes the issue you're observing.
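The effect of that option can be observed outside Durable Functions with plain TPL types. The following is a rough sketch, not a description of the extension's internals: the class and method names are hypothetical, and a TaskCompletionSource stands in for the durable timer task.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical demo class; it uses only standard TPL types.
public static class ContinuationThreadDemo
{
    // Returns true when the continuation ran on the same thread that completed the antecedent task.
    public static bool RanOnCompletingThread(TaskContinuationOptions options)
    {
        var antecedent = new TaskCompletionSource<bool>(); // stands in for the timer task
        var done = new ManualResetEventSlim(false);
        int continuationThreadId = -1;

        antecedent.Task.ContinueWith(
            t =>
            {
                continuationThreadId = Thread.CurrentThread.ManagedThreadId;
                done.Set();
            },
            options);

        int completingThreadId = Thread.CurrentThread.ManagedThreadId;
        antecedent.SetResult(true); // with ExecuteSynchronously the continuation runs inline here
        done.Wait();

        return continuationThreadId == completingThreadId;
    }
}
```

Calling RanOnCompletingThread(TaskContinuationOptions.ExecuteSynchronously) returns true, because the continuation is executed inline by the thread that completes the antecedent. With TaskContinuationOptions.None the continuation is handed to the task scheduler and will usually, though not always, run on a different thread, which is the situation the multithreading check in the exception above complains about.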
Ideally you wouldn't need this workaround when using Task.ContinueWith. I've opened an issue in GitHub to track this: https://github.com/Azure/azure-functions-durable-extension/issues/317
"Since I do not want to collect the results of the activities, I avoid calling 'await Task.WhenAll(tasks)' after scheduling all activities. I do this in order to reduce contention on the control queue, i.e. to be able to start another orchestration if required. Nevertheless, the status of the orchestrator is still 'Running' until all the activities finish running. I guess it moves to 'Complete' only after the last activity posts a 'done' message to the control queue."
This is expected. Orchestrator functions never actually complete until all outstanding durable tasks have completed. There isn't any way to work around this. Note that you can still start other orchestrator instances; there just might be some contention if they happen to land on the same partition (there are 4 partitions by default).