事件
工作流还可以设置为等待外部事件。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
|
/// <summary>
/// Workflow that waits for the external event "MyEvent" (key "0"), copies the event
/// data into <c>MyDataClass.Value1</c> and hands it to a <c>CustomMessage</c> step.
/// </summary>
public class EventSampleWorkflow : IWorkflow<MyDataClass>
{
    /// <summary>Identifier of this workflow definition (required by the workflow interface).</summary>
    public string Id => "EventSampleWorkflow";

    /// <summary>Version of this workflow definition (required by the workflow interface).</summary>
    public int Version => 1;

    /// <summary>Declares the steps of the workflow.</summary>
    /// <param name="builder">Builder on which the steps are chained.</param>
    public void Build(IWorkflowBuilder<MyDataClass> builder)
    {
        builder
            .StartWith(context => ExecutionResult.Next())
            // Wait for "MyEvent"; the second lambda supplies the key the event must carry ("0").
            .WaitFor("MyEvent", data => "0")
                // Store the data received with the event on the workflow data object.
                .Output(data => data.Value1, step => step.EventData)
            .Then<CustomMessage>()
                .Input(step => step.Message, data => "The data from the event is " + data.Value1);
    }
}
...
// Publish the external event "MyEvent" with key "0" and the data "hello" through the host.
// Presumably every workflow waiting on this event name and key receives "hello" as its
// event data - confirm against the host API.
host.PublishEvent("MyEvent", "0", "hello");
|
生效日期
还可以在等待事件时指定生效日期,以便响应过去已经发生的事件,或者仅响应生效日期之后发生的事件。
案例
1 2 3 4
|
/// <summary>
/// Data object used as the workflow data type of the event sample.
/// </summary>
public class MyDataClass
{
    /// <summary>Gets or sets the single string value held by the workflow data.</summary>
    public string Value1
    {
        get;
        set;
    }
}
|
1 2 3 4 5 6 7 8 9 10 11 12
|
/// <summary>
/// Step body that writes its <see cref="Message"/> to the console.
/// </summary>
public class CustomMessage : StepBody
{
    /// <summary>Gets or sets the text written to the console when the step runs.</summary>
    public string Message { get; set; }

    /// <summary>
    /// Writes <see cref="Message"/> to the console and returns the result of
    /// <c>ExecutionResult.Next()</c>.
    /// </summary>
    /// <param name="context">Execution context supplied to the step; not used here.</param>
    /// <returns>The value produced by <c>ExecutionResult.Next()</c>.</returns>
    public override ExecutionResult Run(IStepExecutionContext context)
    {
        Console.WriteLine(Message);

        ExecutionResult proceed = ExecutionResult.Next();
        return proceed;
    }
}
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
|
/// <summary>
/// Sample workflow: waits for "MyEvent" keyed by the workflow's own id, stores the event
/// data in <c>Value1</c>, prints it through <c>CustomMessage</c> and finally writes
/// "workflow complete" to the console.
/// </summary>
public class EventSampleWorkflow : IWorkflow<MyDataClass>
{
    /// <summary>Identifier of this workflow definition.</summary>
    public string Id
    {
        get { return "EventSampleWorkflow"; }
    }

    /// <summary>Version of this workflow definition.</summary>
    public int Version
    {
        get { return 1; }
    }

    /// <summary>Declares the steps of the workflow.</summary>
    /// <param name="builder">Builder on which the steps are chained.</param>
    public void Build(IWorkflowBuilder<MyDataClass> builder)
    {
        // Start, then wait for "MyEvent" whose key is this workflow's id and
        // copy the received event data into Value1.
        var eventStep = builder
            .StartWith(context => ExecutionResult.Next())
            .WaitFor("MyEvent", (data, context) => context.Workflow.Id)
            .Output(data => data.Value1, step => step.EventData);

        // Feed the stored value into the message step.
        var messageStep = eventStep
            .Then<CustomMessage>()
            .Input(step => step.Message, data => "The data from the event is " + data.Value1);

        messageStep.Then(context => Console.WriteLine("workflow complete"));
    }
}
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
|
// Register logging and the workflow services, then build the service provider.
IServiceCollection services = new ServiceCollection();
services.AddLogging();
services.AddWorkflow();
var serviceProvider = services.BuildServiceProvider();

// Resolve the workflow host, register the workflow definition and start the host.
var host = serviceProvider.GetService<IWorkflowHost>();
host.RegisterWorkflow<EventSampleWorkflow, MyDataClass>();
host.Start();

// Start version 1 of "EventSampleWorkflow" and block until its id is available.
var initialData = new MyDataClass();
var startTask = host.StartWorkflow("EventSampleWorkflow", 1, initialData);
var workflowId = startTask.Result;

// Read a value from the console and publish it as "MyEvent", using the workflow id as the key.
Console.WriteLine("Enter value to publish");
string eventData = Console.ReadLine();
host.PublishEvent("MyEvent", workflowId, eventData);

// Keep the process alive until Enter is pressed, then stop the host.
Console.ReadLine();
host.Stop();
|
活动
活动是由外部程序(工作者)处理的一项工作,工作流可以等待该工作完成后再继续执行。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
|
/// <summary>
/// Workflow that runs <c>HelloWorld</c>, waits for the external activity "activity-1"
/// (passing <c>MyData.Request</c> as its parameters), stores the activity result in
/// <c>MyData.ApprovedBy</c> and prints it through <c>CustomMessage</c>.
/// </summary>
public class ActivityWorkflow : IWorkflow<MyData>
{
    /// <summary>Identifier of this workflow definition (required by the workflow interface).</summary>
    public string Id => "activity-sample";

    /// <summary>Version of this workflow definition (required by the workflow interface).</summary>
    public int Version => 1;

    /// <summary>Declares the steps of the workflow.</summary>
    /// <param name="builder">Builder on which the steps are chained.</param>
    public void Build(IWorkflowBuilder<MyData> builder)
    {
        builder
            .StartWith<HelloWorld>()
            // Wait for the activity "activity-1"; the lambda supplies the activity parameters.
            .Activity("activity-1", (data) => data.Request)
                // Store the result submitted for the activity on the workflow data.
                .Output(data => data.ApprovedBy, step => step.Result)
            .Then<CustomMessage>()
                .Input(step => step.Message, data => data.ApprovedBy);
    }
}
...
// Ask the host for a pending "activity-1" item as worker "worker1", passing a one-minute timeout,
// and block until the call completes.
var pendingTask = host.GetPendingActivity("activity-1", "worker1", TimeSpan.FromMinutes(1));
var activity = pendingTask.Result;

// Only act when an activity was actually handed out.
if (activity != null)
{
    Console.WriteLine(activity.Parameters);

    // Report success for this activity token together with the response data.
    host.SubmitActivitySuccess(activity.Token, "Some response data");
}
|
案例
1 2 3 4 5
|
/// <summary>
/// Workflow data for the activity sample. Declared public so it can appear in the
/// signature of a public workflow class such as <c>ActivityWorkflow</c>.
/// </summary>
public class MyData
{
    /// <summary>Gets or sets the request text passed to the activity as its parameters.</summary>
    public string Request { get; set; }

    /// <summary>Gets or sets the value stored from the activity result.</summary>
    public string ApprovedBy { get; set; }
}
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
|
/// <summary>Step body that writes "Hello world" to the console.</summary>
public class HelloWorld : StepBody
{
    /// <summary>Writes the greeting and returns the result of <c>ExecutionResult.Next()</c>.</summary>
    /// <param name="context">Execution context supplied to the step; not used here.</param>
    public override ExecutionResult Run(IStepExecutionContext context)
    {
        const string greeting = "Hello world";
        Console.WriteLine(greeting);
        return ExecutionResult.Next();
    }
}

/// <summary>Step body that writes "Goodbye world" to the console.</summary>
public class GoodbyeWorld : StepBody
{
    /// <summary>Writes the farewell and returns the result of <c>ExecutionResult.Next()</c>.</summary>
    /// <param name="context">Execution context supplied to the step; not used here.</param>
    public override ExecutionResult Run(IStepExecutionContext context)
    {
        const string farewell = "Goodbye world";
        Console.WriteLine(farewell);
        return ExecutionResult.Next();
    }
}

/// <summary>Step body that writes its <see cref="Message"/> to the console.</summary>
public class CustomMessage : StepBody
{
    /// <summary>Gets or sets the text written to the console when the step runs.</summary>
    public string Message { get; set; }

    /// <summary>Writes <see cref="Message"/> and returns the result of <c>ExecutionResult.Next()</c>.</summary>
    /// <param name="context">Execution context supplied to the step; not used here.</param>
    public override ExecutionResult Run(IStepExecutionContext context)
    {
        Console.WriteLine(Message);

        ExecutionResult proceed = ExecutionResult.Next();
        return proceed;
    }
}
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
|
/// <summary>
/// Sample workflow: says hello, waits for the "get-approval" activity (passing the request
/// as its parameters), stores the activity result in <c>ApprovedBy</c>, prints
/// "Approved by ..." and finally says goodbye.
/// </summary>
class ActivityWorkflow : IWorkflow<MyData>
{
    /// <summary>Identifier of this workflow definition.</summary>
    public string Id
    {
        get { return "activity-sample"; }
    }

    /// <summary>Version of this workflow definition.</summary>
    public int Version
    {
        get { return 1; }
    }

    /// <summary>Declares the steps of the workflow.</summary>
    /// <param name="builder">Builder on which the steps are chained.</param>
    public void Build(IWorkflowBuilder<MyData> builder)
    {
        // Greeting step followed by the activity whose result lands in ApprovedBy.
        var approvalStep = builder
            .StartWith<HelloWorld>()
            .Activity("get-approval", (data) => data.Request)
            .Output(data => data.ApprovedBy, step => step.Result);

        // Print who approved, then finish with the goodbye step.
        approvalStep
            .Then<CustomMessage>()
            .Input(step => step.Message, data => "Approved by " + data.ApprovedBy)
            .Then<GoodbyeWorld>();
    }
}
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
|
// Register console/debug logging and the workflow services, then build the service provider.
IServiceCollection services = new ServiceCollection();
services.AddLogging(cfg =>
{
    cfg.AddConsole();
    cfg.AddDebug();
});
services.AddWorkflow();
var serviceProvider = services.BuildServiceProvider();

// Resolve the workflow host, register the workflow definition and start the host.
var host = serviceProvider.GetService<IWorkflowHost>();
host.RegisterWorkflow<ActivityWorkflow, MyData>();
host.Start();

Console.WriteLine("Starting workflow...");

// Start the workflow and block until its instance id is available.
var workflowId = host.StartWorkflow("activity-sample", new MyData { Request = "Spend $1,000,000" }).Result;

// The second argument of GetPendingActivity names the worker requesting work, not the
// workflow instance, so a worker id is passed here instead of workflowId.
const string workerId = "worker1";
var approval = host.GetPendingActivity("get-approval", workerId, TimeSpan.FromMinutes(1)).Result;

if (approval != null)
{
    Console.WriteLine("Approval required for " + approval.Parameters);

    // Report success for this activity token; "John Smith" is the submitted result.
    host.SubmitActivitySuccess(approval.Token, "John Smith");
}

// Keep the process alive until Enter is pressed, then stop the host.
Console.ReadLine();
host.Stop();
|
错误处理
每个步骤都可以配置自己的错误处理方式:可以稍后重试、暂停工作流或者终止工作流。
1 2 3 4 5 6 7
|
/// <summary>
/// Declares a workflow whose first step, <c>HelloWorld</c>, is configured with the
/// <c>WorkflowErrorHandling.Retry</c> behaviour and a ten-minute interval, followed by
/// <c>GoodbyeWorld</c>.
/// </summary>
/// <param name="builder">Builder on which the steps are chained.</param>
public void Build(IWorkflowBuilder<object> builder)
{
    // Interval handed to OnError together with the Retry behaviour.
    TimeSpan retryInterval = TimeSpan.FromMinutes(10);

    var firstStep = builder.StartWith<HelloWorld>();

    firstStep
        .OnError(WorkflowErrorHandling.Retry, retryInterval)
        .Then<GoodbyeWorld>();
}
|
WorkflowHost 服务还可以定义全局的错误处理,用于在全局级别拦截工作流步骤抛出的异常。
例如,通过 host.OnStepError += 事件处理方法; 订阅该事件,并在事件处理方法中处理异常。