9.3DDD之集成事件
和领域事件不同,集成事件主要用于在微服务之间进行事件传递,即可能在多个服务器之间进行通信。本文讲解RabbitMQ
中间件来完成集成事件的处理。
- RabbitMQ的基本概念:
- 信道(channel),信道是消息的生产者,消费者和服务器之间进行通信的虚拟连接。
- 队列,队列是用来进行消息收发的地方,生产者将消息放到队列中,消费者从队列中获取消息。
- 交换机,交换机用于把消息路由到队列中。
- RabbitMQ的routing模式:
- 生产者把消息发布到交换机中,消息会携带routingKey属性,交互机根据routingKey的值吧消息发送到一个或者多个队列,然后消费者从队列中获取消息。这种模式的优点是交换机和队列都位于RabbitMQ服务器的内部,即使消费者不在线,相关消息也会保存在队列中,等消费者上线后就可以获取到消息了。
使用步骤
Nuget安装RabbitMQ.Client
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| using RabbitMQ.Client; using System.Text;
var factory = new ConnectionFactory(); factory.HostName = "127.0.0.1"; factory.DispatchConsumersAsync = true; string exchangeName = "exchange1"; string eventName = "myEvent"; using var conn = factory.CreateConnection(); while (true) { string msg = DateTime.Now.TimeOfDay.ToString(); using (var channel = conn.CreateModel()) { var properties = channel.CreateBasicProperties; properties.DeliveryMode = 2; channel.ExchangeDeclare(exchange: exchangeName, type: "direct"); byte[] body = Encoding.UTF8.GetBytes(msg); channel.BasicPublish(exchange: exchangeName, routingKey: eventName, mandatory: true, basicProperties: properties, body: body); } Console.WriteLine("发布了消息:" + msg); Thread.Sleep(1000); }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53
| using RabbitMQ.Client; using RabbitMQ.Client.Events; using System.Text;
var factory = new ConnectionFactory(); factory.HostName = "127.0.0.1"; factory.DispatchConsumersAsync = true; string exchangeName = "exchange1"; string eventName = "myEvent"; using var conn = factory.CreateConnection(); using var channel = conn.CreateModel(); string queueName = "queue1";
channel.ExchangeDeclare(exchange: exchangeName, type: "direct");
channel.QueueDeclare(queue: queueName, durable: true, exclusive: false, autoDelete: false, arguments: null);
channel.QueueBind(queue: queueName, exchange: exchangeName, routingKey: eventName);
var consumer = new AsyncEventingBasicConsumer(channel);
consumer.Received += Consumer_Received;
channel.BasicConsume(queue: queueName,autoAck: false, consumer: consumer); Console.ReadLine();
async Task Consumer_Received(object sender, BasicDeliverEventArgs args) { try { var bytes = args.Body.ToArray(); string msg = Encoding.UTF8.GetString(bytes); Console.WriteLine(DateTime.Now + "收到了消息" + msg); channel.BasicAck(args.DeliveryTag, multiple: false); await Task.Delay(800); } catch (Exception ex) { channel.BasicReject(args.DeliveryTag, true); Console.WriteLine("处理收到的消息出错" + ex); } }
|
简化框架
- Nuget安装
Zack.EventBus
- 在配置系统下创建EventBus节点
1 2 3 4
| "EventBus": { "HostName": "127.0.01", "ExchangeName": "EventBusDemo1" }
|
- 在program.cs中进行配置
1 2 3 4 5 6 7 8
| var eventBusSec = builder.Configuration.GetSection("EventBus"); builder.Services.Configure<IntegrationEventRabbitMQOptions>(eventBusSec);
builder.Services.AddEventBus("EventBusDemo1_Q1", Assembly.GetExecutingAssembly()); var app = builder.Build(); app.UseEventBus();
|
1 2 3 4 5
| var eventBusSec = builder.Configuration.GetSection("EventBus"); builder.Services.Configure<IntegrationEventRabbitMQOptions>(eventBusSec); builder.Services.AddEventBus("EventBusDemo1_Q2", Assembly.GetExecutingAssembly()); var app = builder.Build(); app.UseEventBus();
|
- 在需要发布事件的类中注入IEventBus服务,调用Publish方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| [Route("api/[controller]")] [ApiController] public class DemoController : ControllerBase { private IEventBus eventBus;
public DemoController(IEventBus eventBus) { this.eventBus = eventBus; }
[HttpPost] public string Publish() { eventBus.Publish("UserAdded", new { UserName = "yzk", Age = 18 }); return "ok"; } }
|
- 编写事件处理者
- 实现
IIntegrationEventHandler
接口
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| [EventName("UserAdded")] public class UserAddesEventHandler : IIntegrationEventHandler { private readonly ILogger<UserAddesEventHandler> logger; public UserAddesEventHandler(ILogger<UserAddesEventHandler> logger) { this.logger = logger; } public Task Handle(string eventName, string eventData) { logger.LogInformation("新建了用户:" + eventData); return Task.CompletedTask; } }
|
- 事件数据是以JSON格式传入,可以使用
JsonIntegrationEventHandler<T>
接口来解析成.net对象
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| public record UserData(string UserName, int Age);
[EventName("UserAdded")] public class UserAddesEventHandler3 : JsonIntegrationEventHandler<UserData> { private readonly ILogger<UserAddesEventHandler3> logger; public UserAddesEventHandler3(ILogger<UserAddesEventHandler3> logger) { this.logger = logger; } public override Task HandleJson(string eventName, UserData eventData) { logger.LogInformation($"Json:{eventData.UserName}"); return Task.CompletedTask; } }
|
- 进行微服务开发时,为了降低耦合,一般不会新建一个UserData类供多个微服务使用。则可以使用
DynamicIntegrationEventHandler
接口来将JSON解析为dynamic类型
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| [EventName("UserAdded")] public class UserAddesEventHandler2 : DynamicIntegrationEventHandler { private readonly ILogger<UserAddesEventHandler2> logger; public UserAddesEventHandler2(ILogger<UserAddesEventHandler2> logger) { this.logger = logger; } public override Task HandleDynamic(string eventName, dynamic eventData) { logger.LogInformation($"Dynamic:{eventData.UserName}"); return Task.CompletedTask; } }
|