订阅管理器


  订阅管理器 ISubscribeManager 接口提供了发布消息和订阅消息的方法。Fireasy 的订阅器同时充当生产者和消费者的角色。


1、获得实例

  要使用订阅管理器,首先得获取到它的实例。我们不推荐使用 new 构造一个实例,而是使用 SubscribeManagerFactory 工厂类来创建一个实例。该工厂类提供一个 CreateManager 方法,可以接收一个 configName 参数,这个参数对应订阅器配置(可参考 配置与扩展)中配置项的名称,如果未配置订阅配置项,则返回默认的订阅管理器单例。如下所示:

[TestMethod]
public void TestCreateManager()
{
    var manager = SubscribeManagerFactory.CreateManager();
    Assert.IsNotNull(manager); // DefaultSubscribeManager
}

  在使用 IOC 的环境中,也可以通过构造注入或属性注入得到 ISubscribeManager 的实例。如下所示:

public class HomeController : Controller
{
    private readonly ISubscribeManager _subscribeManager;

    public HomeController(ISubscribeManager subscribeManager)
    {
        _subscribeManager = subscribeManager;
    }
}

2、订阅主题

  订阅主题是将消息结构化,使用自己定义的一个类对象来传递消息,发布消息时先序列化为二进制流,接收消息时先反序列化为对象。如下所示:

public class ChatMessage
{
    public string Sender { get; set; }

    public string Receiver { get; set; }

    public string Message { get; set; }
}

  每一类消息是通过唯一的名称进行传递的,默认是订阅主题类的全称,如以上定义的类对应的主题名称是 demo.ChatMessage,当然你也可以使用 TopicAttribute 特性指定一个其他的名称。如下所示:

[Topic("mychatmessage")]
public class ChatMessage
{
    public string Sender { get; set; }

    public string Receiver { get; set; }

    public string Message { get; set; }
}

3、添加订阅委托

  订阅器通过添加一个委托,来接收并消费消息。使用 AddSubscriber 方法添加同步订阅,使用 AddAsyncSubscriber 方法添加异步订阅。如下所示:

[TestMethod]
public void TestAddSubscriber()
{
    var manager = SubscribeManagerFactory.CreateManager();

    manager.AddSubscriber<ChatMessage>(msg => 
    {
        Console.WriteLine(msg.Sender + msg.Receiver + msg.Message);
    });

    manager.AddAsyncSubscriber<ChatMessage>(async msg => 
    {
        Console.WriteLine(msg.Sender + msg.Receiver + msg.Message);
    });
}

  同一消息类型可以添加多个订阅委托。


4、订阅器和订阅处理器

  订阅器 ISubscriber<T> 接口允许你对主题消息进行订阅,而订阅处理器 ISubscribeHandler 接口则可以在一个类里处理多种主题的订阅。如下所示:

public class ChatMessageSubscriber : ISubscriber<ChatMessage>
{
    public void Accept(ChatMessage msg)
    {
        Console.WriteLine(msg.Sender + msg.Receiver + msg.Message);
    }
}

public class MySubscribeHandler : ISubscribeHandler
{
    public void Handle(ChatMessage msg)
    {
        Console.WriteLine(msg.Sender + msg.Receiver + msg.Message);
    }

    public void Handle(OtherMessage msg)
    {
    }
}

public class OtherMessage 
{
}

  这两种订阅方式一般用在 .Net Core 应用程序中。在 Startup.ConfigureServices 方法里,使用 AddSubscribers 方法添加程序集中的所有订阅器及订阅处理器。如下所示:

namespace demo
{
    public class Startup
    {
        public Startup(IConfiguration configuration)
        {
            Configuration = configuration;
        }

        public IConfiguration Configuration { get; }

        // This method gets called by the runtime. Use this method to add services to the container.
        public void ConfigureServices(IServiceCollection services)
        {
            services.AddSubscribers(this.GetType().Assembly);
        }
    }
}

  当省略 assembly 参数时,默认是在 IServiceCollection 集合里查找订阅器和订阅处理器,因此,需要将订阅器及订阅处理器添加到 IServiceCollection 里,你也可以使用 ITransientServiceIScopedService 接口进行标记。如下所示:

public class ChatMessageSubscriber : ISubscriber<ChatMessage>, IScopedService
{
    public void Accept(ChatMessage msg)
    {
        Console.WriteLine(msg.Sender + msg.Receiver + msg.Message);
    }
}

public class MySubscribeHandler : ISubscribeHandler, IScopedService
{
    public void Handle(ChatMessage msg)
    {
        Console.WriteLine(msg.Sender + msg.Receiver + msg.Message);
    }

    public void Handle(OtherMessage msg)
    {
    }
}

  由于使用了 Fireasy 的 IOC 因此需要在配置文件里将程序集添加到 contaier,参见 容器的配置。Startup.ConfigureServices 方法也需要调整如下:

namespace demo
{
    public class Startup
    {
        public Startup(IConfiguration configuration)
        {
            Configuration = configuration;
        }

        public IConfiguration Configuration { get; }

        // This method gets called by the runtime. Use this method to add services to the container.
        public void ConfigureServices(IServiceCollection services)
        {
            services.AddFireasy().AddIoc().AddSubscribers();
        }
    }
}

5、发布消息

  使用 Publish 方法发布一个消息。如下所示:

[TestMethod]
public void TestPublish()
{
    var manager = SubscribeManagerFactory.CreateManager();

    manager.Publish(new ChatMessage 
    {
        Sender = "XiaoMei", 
        Receiver = "XiaoMing", 
        Message = "Hello!" 
    });
}

[TestMethod]
public async Task TestPublishAsync()
{
    var manager = SubscribeManagerFactory.CreateManager();

    await manager.PublishAsync(new ChatMessage 
    {
        Sender = "XiaoMei", 
        Receiver = "XiaoMing", 
        Message = "Hello!" 
    });
}

  当主题名称与主题类不对应时,可以手动指定主题名称。如下所示:

[TestMethod]
public void TestPublish()
{
    var manager = SubscribeManagerFactory.CreateManager();

    manager.Publish("x-chat-message", new ChatMessage 
    {
        Sender = "XiaoMei", 
        Receiver = "XiaoMing", 
        Message = "Hello!" 
    });
}

6、消息持久化

  某些时候由于网络原因导致消息发布失败,随之丢失。为了避免这种情况,Fireasy 提供了一个 ISubjectPersistance 接口,当消息发布失败时,调用 SaveSubject 方法将消息进行持久化,同时使用定时器 PersistentTimer 来触发重试发布。以下是它的两种实现:

  • LocalFilePersistance 类,消息存储在当前应用的 _subpersis 目录下。默认的。

  • IsolatedFilePersistance 类,消息存储在当前用户的应用数据目录下,见:

C:\Users\当前用户\AppData\Local\IsolatedStorage

  ISubjectPersistance 接口可以在 IOC 里进行配置切换使用。


💡 多说一句

  过期或重试次数达到上限的消息,会存储到以上目录的子目录 disabled 下。


7、异常通知

  Fireasy 提供了一个 ISubscribeNotification 接口,以供你订阅异常通知之用。当发布消息或接收消息时发生异常,会通过 OnPublishError 或 OnConsumeError 方法进行通知。

  接收到通知时,可以设置 CanRetry = false 阻止定时器 PersistentTimer 进行重试。

  ISubscribeNotification 接口可以在 IOC 里进行配置切换使用。


8、关于两个实现

  DefaultSubscribeManager 类是默认的订阅管理器,它在内存中使用一个队列来存放消息,使用定时器来消费队列中的消息,因此它是不安全的,我们更多的是使用第三方的组件来替代它。

  SynchronizedSubscribeManager 类是同步的订阅管理器,它不存储消息,而是在发布消息后,立即调用相关的订阅委托来执行消费,因此称它为同步订阅。实体事件订阅管理器 就是基于这种模式。


💡 注意事项

  如果在其他的应用程序中使用了第三方组件与 Fireasy 进行消息交换时,应注意消息体的结构,Body 才是你所定义的 Subject。如下所示:

{
    "Key": "{Guid}", //唯一标识
    "Name": "{TopicName}", //主题名称
    "Body": "{Base64}", //Subject的内容(Json)
    "PublishRetries": 1, //发送时重试次数
    "AcceptRetries": 1, //接收时重试次数
    "ExpiresAt": "2021-01-22 12:00:00" //有效期
}

  如上面 TestPublish 示例中,传递的消息内容为:

{
    "Key": "9e8926f0-50a2-4050-8a80-b7c500a9e61e",
    "Name": "x-chat-message",
    "Body": "eyAiU2VuZGVyIjogIlhpYW9NZWkiLCAiUmVjZWl2ZXIiOiAiWGlhb01pbmciLCAiTWVzc2FnZSI6ICJIZWxsbyEiIH0=",
    "PublishRetries": 1,
    "AcceptRetries": 1,
    "ExpiresAt": "2021-01-22 12:00:00"
}