结合DI,实现发布者与订阅者的解耦,属于本次事务的对象主体不应定义为订阅者,因为订阅者不应与发布者产生任何关联
一、发布者订阅者模式
发布者发出一个事件主题,一个或多个订阅者接收这个事件,中间通过事件总线通讯(消息队列),并且发布者与订阅者这两者间是无状态的,根据产品实际场景需要,可以自己实现单机单点的发布订阅,也可选择使用目前流行的分布式消息中间件:
RabbitMQ、ActiveMQ、RocketMQ、kafka等
二、观察者与订阅者的区别
观察者与业务主体是耦合的,并且是即时通知的;订阅者与业务主体完全解耦,只通过中间的信息通道通知,互相不知道对方的存在,可以是同步也可以是异步
三、具体实现
本文主要讲解单点模式,有需要随时可以扩展为分布式方案
发布者接口:
namespace Xms.Event.Abstractions { /// <summary> /// 事件发布接口 /// </summary> public interface IEventPublisher { /// <summary> /// 发布事件 /// </summary> /// <typeparam name="TEvent">事件类型</typeparam> /// <param name="e"></param> void Publish<TEvent>(TEvent e); } }
发布者实现:
using System; using System.Linq; using Xms.Event.Abstractions; using Xms.Infrastructure.Inject; using Xms.Logging.AppLog; namespace Xms.Event { /// <summary> /// 事件发布者 /// </summary> public class EventPublisher : IEventPublisher { private readonly ILogService _logService; private readonly IServiceResolver _serviceResolver; public EventPublisher(ILogService logService , IServiceResolver serviceResolver) { _logService = logService; _serviceResolver = serviceResolver; } #region Methods /// <summary> /// 发布事件 /// </summary> /// <typeparam name="TEvent">事件类</typeparam> /// <param name="e">事件对象</param> public virtual void Publish<TEvent>(TEvent e) { //获取所有事件接收者 var consumers = _serviceResolver.GetAll<IConsumer<TEvent>>().ToList(); foreach (var consumer in consumers) { try { //处理事件 consumer.HandleEvent(e); } catch (Exception exception) { _logService.Error(exception); } } } #endregion Methods } }
订阅(消费)者接口:
namespace Xms.Event.Abstractions { /// <summary> /// 事件接收接口 /// </summary> /// <typeparam name="T"></typeparam> public interface IConsumer<T> { /// <summary> /// 处理事件 /// </summary> /// <param name="eventMessage">事件</param> void HandleEvent(T eventMessage); } }
事件(消息):
这里只给出一个作为示例,实际上一般会有记录的:“创建”、“修改”、“删除”,流程相关的:“发起审批”、“审批通过”、“审批完成”等等
namespace Xms.Flow.Core.Events { /// <summary> /// 工作流启动后事件 /// </summary> public class WorkFlowStartedEvent { public WorkFlowStartUpContext Context { get; set; } public WorkFlowExecutionResult Result { get; set; } } }
服务注册:
详细实现回看.netcore之DI批量注入(支持泛型)
using Microsoft.Extensions.Configuration; using Microsoft.Extensions.DependencyInjection; using Xms.Core; using Xms.Infrastructure.Inject; namespace Xms.Event { /// <summary> /// 事件模块服务注册 /// </summary> public class ServiceRegistrar : IServiceRegistrar { ; public void Add(IServiceCollection services, IConfiguration configuration) { //event publisher services.AddScoped<Event.Abstractions.IEventPublisher, Event.EventPublisher>(); //event consumers services.RegisterScope(typeof(Event.Abstractions.IConsumer<>)); } } }
四、应用场景
比如在工作流启动审批后发送通知
using System.Collections.Generic; using Xms.Context; using Xms.Event.Abstractions; using Xms.Flow.Core.Events; using Xms.Infrastructure.Utility; using Xms.Localization.Abstractions; using Xms.Notify.Abstractions; using Xms.Notify.Internal; namespace Xms.EventConsumers.Notify { /// <summary> /// 工作流启动审批后发送通知 /// </summary> public class WorkflowStartedNotify : IConsumer<WorkFlowStartedEvent> { private readonly IAppContext _appContext; private readonly ILocalizedTextProvider _loc; private readonly IEnumerable<INotify> _notifies; public WorkflowStartedNotify(IAppContext appContext , IEnumerable<INotify> notifies) { _appContext = appContext; _loc = _appContext.GetFeature<ILocalizedTextProvider>(); _notifies = notifies; } public void HandleEvent(WorkFlowStartedEvent eventMessage) { //当前节点处理人 foreach (var handlerId in eventMessage.Result.NextHandlerId) { //通知方式:微信、短信、邮件、系统消息等 var msg = _loc["workflow_newtasknotify"].FormatWith(eventMessage.Context.EntityMetaData.LocalizedName); //发送消息 foreach (var notifier in _notifies) { notifier.Send(new InternalNotifyBody() { TypeCode = , Subject = msg , Content = "到你审批了,快到碗里来" , ToUserId = handlerId , LinkTo = "/entity/create?entityid=" + eventMessage.Context.EntityMetaData.EntityId + "&recordid=" + eventMessage.Context.ObjectId }); } } } } }