7.Workflow Core之持久化和在Asp.net core中的使用

7.Workflow Core之持久化和在Asp.net core中的使用

流程持久化

  • sqlite
  1. nuget安装WorkflowCore.Persistence.Sqlite
  2. services.AddWorkflow(x => x.UseSqlite(@"Data Source=database.db;", true));
  • sqlServer
  1. nuget安装WorkflowCore.Persistence.SqlServer
  2. services.AddWorkflow(x => x.UseSqlServer(@"Server=.;Database=WorkflowCore;Trusted_Connection=True;", true, true));

在Asp.net core中使用

  • workflow
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class MyDataClass
{
public string Value1 { get; set; }
}
public class TestWorkflow : IWorkflow<MyDataClass>
{
public string Id => "TestWorkflow";

public int Version => 1;

public void Build(IWorkflowBuilder<MyDataClass> builder)
{
builder
.StartWith(context => ExecutionResult.Next())
.WaitFor("MyEvent", (data, context) => context.Workflow.Id, data => DateTime.Now)
.Output(data => data.Value1, step => step.EventData)
.Then(context => Console.WriteLine("workflow complete"));
}
}
  • program
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
var builder = WebApplication.CreateBuilder(args);
builder.Services.AddControllers();
builder.Services.AddEndpointsApiExplorer();
builder.Services.AddSwaggerGen();
//设置持久化和Elastic搜索
builder.Services.AddWorkflow(config => {
config.UseSqlite("Data Source=MySqLite.db",true);
config.UseElasticsearch(new ConnectionSettings(new Uri("http://elastic:9200")), "workflows");
});

var app = builder.Build();
IWorkflowHost? host= app.Services.GetService<IWorkflowHost>();
host!.RegisterWorkflow<TestWorkflow, MyDataClass>();
host.Start();

if (app.Environment.IsDevelopment())
{
app.UseSwagger();
app.UseSwaggerUI();
}

app.UseAuthorization();
app.MapControllers();
app.Run();
  • controllers
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
[Route("api/[controller]")]
[Controller]
public class WorkflowsController : Controller
{
private readonly IWorkflowController _workflowService; //workflow服务和host相似
private readonly IWorkflowRegistry _registry;//获取注册过的workflow
private readonly IPersistenceProvider _workflowStore;//持久化相关,从数据库中获取
private readonly ISearchIndex _searchService;//查询相关

public WorkflowsController(IWorkflowController workflowService, ISearchIndex searchService, IWorkflowRegistry registry, IPersistenceProvider workflowStore)
{
_workflowService = workflowService;
_workflowStore = workflowStore;
_registry = registry;
_searchService = searchService;
}

[HttpGet]
public async Task<IActionResult> Get(string terms, WorkflowStatus? status, string type, DateTime? createdFrom, DateTime? createdTo, int skip, int take = 10)
{
var filters = new List<SearchFilter>();

if (status.HasValue)
filters.Add(StatusFilter.Equals(status.Value));

if (createdFrom.HasValue)
filters.Add(DateRangeFilter.After(x => x.CreateTime, createdFrom.Value));

if (createdTo.HasValue)
filters.Add(DateRangeFilter.Before(x => x.CreateTime, createdTo.Value));

if (!string.IsNullOrEmpty(type))
filters.Add(ScalarFilter.Equals(x => x.WorkflowDefinitionId, type));

var result = await _searchService.Search(terms, skip, take, filters.ToArray());

return Json(result);
}

[HttpGet("{id}")]
public async Task<IActionResult> Get(string id)
{
var result = await _workflowStore.GetWorkflowInstance(id);
return Json(result);
}

[HttpPost("{id}")]
public async Task<IActionResult> Post1(string id, int? version)
{
string workflowId = null;
var def = _registry.GetDefinition(id, version);
if (def == null)
return BadRequest(String.Format("Workflow defintion {0} for version {1} not found", id, version));


workflowId = await _workflowService.StartWorkflow(id, version);


return Ok(workflowId);
}

[HttpPut("{id}/suspend")]
public Task<bool> Suspend(string id)
{
return _workflowService.SuspendWorkflow(id);
}

[HttpPut("{id}/resume")]
public Task<bool> Resume(string id)
{
return _workflowService.ResumeWorkflow(id);
}

[HttpDelete("{id}")]
public Task<bool> Terminate(string id)
{
return _workflowService.TerminateWorkflow(id);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
[Route("api/[controller]")]
[Controller]
public class EventsController : Controller
{
private readonly IWorkflowController _workflowService;

public EventsController(IWorkflowController workflowService)
{
_workflowService = workflowService;
}

[HttpPost("{eventName}/{eventKey}")]
public async Task<IActionResult> Post(string eventName, string eventKey, [FromBody] MyDataClass eventData)
{
await _workflowService.PublishEvent(eventName, eventKey, eventData.Value1);
return Ok();
}

}

7.Workflow Core之持久化和在Asp.net core中的使用

https://bubuweiying.site/7.WokflowCore之持久化及在ASP.NETCORE中的使用/

作者

步步为营

发布于

2024-03-23

更新于

2025-03-15

许可协议