.netcore-xms跨平台基础框架

2019-11-14 来源: 斗半米哥 发布在  https://www.cnblogs.com/migomiddle/p/11854814.html

结合DI,实现发布者与订阅者的解耦,属于本次事务的对象主体不应定义为订阅者,因为订阅者不应与发布者产生任何关联

一、发布者订阅者模式

发布者发出一个事件主题,一个或多个订阅者接收这个事件,中间通过事件总线通讯(消息队列),并且发布者与订阅者这两者间是无状态的,根据产品实际场景需要,可以自己实现单机单点的发布订阅,也可选择使用目前流行的分布式消息中间件:

RabbitMQ、ActiveMQ、RocketMQ、kafka等

二、观察者与订阅者的区别

观察者与业务主体是耦合的,并且是即时通知的;订阅者与业务主体完全解耦,只通过中间的信息通道通知,互相不知道对方的存在,可以是同步也可以是异步

三、具体实现

本文主要讲解单点模式,有需要随时可以扩展为分布式方案

发布者接口:

 namespace Xms.Event.Abstractions
 {
     /// <summary>
     /// 事件发布接口
     /// </summary>
     public interface IEventPublisher
     {
         /// <summary>
         /// 发布事件
         /// </summary>
         /// <typeparam name="TEvent">事件类型</typeparam>
         /// <param name="e"></param>
         void Publish<TEvent>(TEvent e);
     }
 }

发布者实现:

 using System;
 using System.Linq;
 using Xms.Event.Abstractions;
 using Xms.Infrastructure.Inject;
 using Xms.Logging.AppLog;

 namespace Xms.Event
 {
     /// <summary>
     /// 事件发布者
     /// </summary>
     public class EventPublisher : IEventPublisher
     {
         private readonly ILogService _logService;
         private readonly IServiceResolver _serviceResolver;

         public EventPublisher(ILogService logService
             , IServiceResolver serviceResolver)
         {
             _logService = logService;
             _serviceResolver = serviceResolver;
         }

         #region Methods

         /// <summary>
         /// 发布事件
         /// </summary>
         /// <typeparam name="TEvent">事件类</typeparam>
         /// <param name="e">事件对象</param>
         public virtual void Publish<TEvent>(TEvent e)
         {
             //获取所有事件接收者
             var consumers = _serviceResolver.GetAll<IConsumer<TEvent>>().ToList();
             foreach (var consumer in consumers)
             {
                 try
                 {
                     //处理事件
                     consumer.HandleEvent(e);
                 }
                 catch (Exception exception)
                 {
                     _logService.Error(exception);
                 }
             }
         }

         #endregion Methods
     }
 }

订阅(消费)者接口:

 namespace Xms.Event.Abstractions
 {
     /// <summary>
     /// 事件接收接口
     /// </summary>
     /// <typeparam name="T"></typeparam>
     public interface IConsumer<T>
     {
         /// <summary>
         /// 处理事件
         /// </summary>
         /// <param name="eventMessage">事件</param>
         void HandleEvent(T eventMessage);
     }
 }

事件(消息):

这里只给出一个作为示例,实际上一般会有记录的:“创建”、“修改”、“删除”,流程相关的:“发起审批”、“审批通过”、“审批完成”等等

 namespace Xms.Flow.Core.Events
 {
     /// <summary>
     /// 工作流启动后事件
     /// </summary>
     public class WorkFlowStartedEvent
     {
         public WorkFlowStartUpContext Context { get; set; }
         public WorkFlowExecutionResult Result { get; set; }
     }
 }

服务注册:

详细实现回看.netcore之DI批量注入(支持泛型)

 using Microsoft.Extensions.Configuration;
 using Microsoft.Extensions.DependencyInjection;
 using Xms.Core;
 using Xms.Infrastructure.Inject;

 namespace Xms.Event
 {
     /// <summary>
     /// 事件模块服务注册
     /// </summary>
     public class ServiceRegistrar : IServiceRegistrar
     {
         ;

         public void Add(IServiceCollection services, IConfiguration configuration)
         {
             //event publisher
             services.AddScoped<Event.Abstractions.IEventPublisher, Event.EventPublisher>();
             //event consumers
             services.RegisterScope(typeof(Event.Abstractions.IConsumer<>));
         }
     }
 }

四、应用场景

比如在工作流启动审批后发送通知

 using System.Collections.Generic;
 using Xms.Context;
 using Xms.Event.Abstractions;
 using Xms.Flow.Core.Events;
 using Xms.Infrastructure.Utility;
 using Xms.Localization.Abstractions;
 using Xms.Notify.Abstractions;
 using Xms.Notify.Internal;

 namespace Xms.EventConsumers.Notify
 {
     /// <summary>
     /// 工作流启动审批后发送通知
     /// </summary>
     public class WorkflowStartedNotify : IConsumer<WorkFlowStartedEvent>
     {
         private readonly IAppContext _appContext;
         private readonly ILocalizedTextProvider _loc;
         private readonly IEnumerable<INotify> _notifies;

         public WorkflowStartedNotify(IAppContext appContext
             , IEnumerable<INotify> notifies)
         {
             _appContext = appContext;
             _loc = _appContext.GetFeature<ILocalizedTextProvider>();
             _notifies = notifies;
         }
         public void HandleEvent(WorkFlowStartedEvent eventMessage)
         {
             //当前节点处理人
             foreach (var handlerId in eventMessage.Result.NextHandlerId)
             {
                 //通知方式:微信、短信、邮件、系统消息等
                 var msg = _loc["workflow_newtasknotify"].FormatWith(eventMessage.Context.EntityMetaData.LocalizedName);
                 //发送消息
                 foreach (var notifier in _notifies)
                 {
                     notifier.Send(new InternalNotifyBody()
                     {
                         TypeCode =
                         ,
                         Subject = msg
                         ,
                         Content = "到你审批了,快到碗里来"
                         ,
                         ToUserId = handlerId
                         ,
                         LinkTo = "/entity/create?entityid=" + eventMessage.Context.EntityMetaData.EntityId + "&recordid=" + eventMessage.Context.ObjectId
                     });
                 }
             }
         }
     }
 }

相关文章