订阅管理器
订阅管理器 ISubscribeManager
接口提供了发布消息和订阅消息的方法。Fireasy 的订阅器同时充当生产者和消费者的角色。
1、获得实例
要使用订阅管理器,首先得获取到它的实例。我们不推荐使用 new 构造一个实例,而是使用 SubscribeManagerFactory
工厂类来创建一个实例。该工厂类提供一个 CreateManager 方法,可以接收一个 configName 参数,这个参数对应订阅器配置(可参考 配置与扩展)中配置项的名称,如果未配置订阅配置项,则返回默认的订阅管理器单例。如下所示:
[TestMethod]
public void TestCreateManager()
{
var manager = SubscribeManagerFactory.CreateManager();
Assert.IsNotNull(manager); // DefaultSubscribeManager
}
在使用 IOC
的环境中,也可以通过构造注入或属性注入得到 ISubscribeManager
的实例。如下所示:
public class HomeController : Controller
{
private readonly ISubscribeManager _subscribeManager;
public HomeController(ISubscribeManager subscribeManager)
{
_subscribeManager = subscribeManager;
}
}
2、订阅主题
订阅主题是将消息结构化,使用自己定义的一个类对象来传递消息,发布消息时先序列化为二进制流,接收消息时先反序列化为对象。如下所示:
public class ChatMessage
{
public string Sender { get; set; }
public string Receiver { get; set; }
public string Message { get; set; }
}
每一类消息是通过唯一的名称进行传递的,默认是订阅主题类的全称,如以上定义的类对应的主题名称是 demo.ChatMessage,当然你也可以使用 TopicAttribute
特性指定一个其他的名称。如下所示:
[Topic("mychatmessage")]
public class ChatMessage
{
public string Sender { get; set; }
public string Receiver { get; set; }
public string Message { get; set; }
}
3、添加订阅委托
订阅器通过添加一个委托,来接收并消费消息。使用 AddSubscriber 方法添加同步订阅,使用 AddAsyncSubscriber 方法添加异步订阅。如下所示:
[TestMethod]
public void TestAddSubscriber()
{
var manager = SubscribeManagerFactory.CreateManager();
manager.AddSubscriber<ChatMessage>(msg =>
{
Console.WriteLine(msg.Sender + msg.Receiver + msg.Message);
});
manager.AddAsyncSubscriber<ChatMessage>(async msg =>
{
Console.WriteLine(msg.Sender + msg.Receiver + msg.Message);
});
}
同一消息类型可以添加多个订阅委托。
4、订阅器和订阅处理器
订阅器 ISubscriber<T>
接口允许你对主题消息进行订阅,而订阅处理器 ISubscribeHandler
接口则可以在一个类里处理多种主题的订阅。如下所示:
public class ChatMessageSubscriber : ISubscriber<ChatMessage>
{
public void Accept(ChatMessage msg)
{
Console.WriteLine(msg.Sender + msg.Receiver + msg.Message);
}
}
public class MySubscribeHandler : ISubscribeHandler
{
public void Handle(ChatMessage msg)
{
Console.WriteLine(msg.Sender + msg.Receiver + msg.Message);
}
public void Handle(OtherMessage msg)
{
}
}
public class OtherMessage
{
}
这两种订阅方式一般用在 .Net Core 应用程序中。在 Startup.ConfigureServices 方法里,使用 AddSubscribers 方法添加程序集中的所有订阅器及订阅处理器。如下所示:
namespace demo
{
public class Startup
{
public Startup(IConfiguration configuration)
{
Configuration = configuration;
}
public IConfiguration Configuration { get; }
// This method gets called by the runtime. Use this method to add services to the container.
public void ConfigureServices(IServiceCollection services)
{
services.AddSubscribers(this.GetType().Assembly);
}
}
}
当省略 assembly 参数时,默认是在 IServiceCollection
集合里查找订阅器和订阅处理器,因此,需要将订阅器及订阅处理器添加到 IServiceCollection
里,你也可以使用 ITransientService
或 IScopedService
接口进行标记。如下所示:
public class ChatMessageSubscriber : ISubscriber<ChatMessage>, IScopedService
{
public void Accept(ChatMessage msg)
{
Console.WriteLine(msg.Sender + msg.Receiver + msg.Message);
}
}
public class MySubscribeHandler : ISubscribeHandler, IScopedService
{
public void Handle(ChatMessage msg)
{
Console.WriteLine(msg.Sender + msg.Receiver + msg.Message);
}
public void Handle(OtherMessage msg)
{
}
}
由于使用了 Fireasy 的 IOC
因此需要在配置文件里将程序集添加到 contaier,参见 容器的配置。Startup.ConfigureServices 方法也需要调整如下:
namespace demo
{
public class Startup
{
public Startup(IConfiguration configuration)
{
Configuration = configuration;
}
public IConfiguration Configuration { get; }
// This method gets called by the runtime. Use this method to add services to the container.
public void ConfigureServices(IServiceCollection services)
{
services.AddFireasy().AddIoc().AddSubscribers();
}
}
}
5、发布消息
使用 Publish 方法发布一个消息。如下所示:
[TestMethod]
public void TestPublish()
{
var manager = SubscribeManagerFactory.CreateManager();
manager.Publish(new ChatMessage
{
Sender = "XiaoMei",
Receiver = "XiaoMing",
Message = "Hello!"
});
}
[TestMethod]
public async Task TestPublishAsync()
{
var manager = SubscribeManagerFactory.CreateManager();
await manager.PublishAsync(new ChatMessage
{
Sender = "XiaoMei",
Receiver = "XiaoMing",
Message = "Hello!"
});
}
当主题名称与主题类不对应时,可以手动指定主题名称。如下所示:
[TestMethod]
public void TestPublish()
{
var manager = SubscribeManagerFactory.CreateManager();
manager.Publish("x-chat-message", new ChatMessage
{
Sender = "XiaoMei",
Receiver = "XiaoMing",
Message = "Hello!"
});
}
6、消息持久化
某些时候由于网络原因导致消息发布失败,随之丢失。为了避免这种情况,Fireasy 提供了一个 ISubjectPersistance
接口,当消息发布失败时,调用 SaveSubject 方法将消息进行持久化,同时使用定时器 PersistentTimer
来触发重试发布。以下是它的两种实现:
LocalFilePersistance 类,消息存储在当前应用的 _subpersis 目录下。默认的。
IsolatedFilePersistance 类,消息存储在当前用户的应用数据目录下,见:
C:\Users\当前用户\AppData\Local\IsolatedStorage
ISubjectPersistance
接口可以在 IOC
里进行配置切换使用。
💡 多说一句
过期或重试次数达到上限的消息,会存储到以上目录的子目录 disabled 下。
7、异常通知
Fireasy 提供了一个 ISubscribeNotification
接口,以供你订阅异常通知之用。当发布消息或接收消息时发生异常,会通过 OnPublishError 或 OnConsumeError 方法进行通知。
接收到通知时,可以设置 CanRetry = false 阻止定时器 PersistentTimer
进行重试。
ISubscribeNotification
接口可以在 IOC
里进行配置切换使用。
8、关于两个实现
DefaultSubscribeManager
类是默认的订阅管理器,它在内存中使用一个队列来存放消息,使用定时器来消费队列中的消息,因此它是不安全的,我们更多的是使用第三方的组件来替代它。
SynchronizedSubscribeManager
类是同步的订阅管理器,它不存储消息,而是在发布消息后,立即调用相关的订阅委托来执行消费,因此称它为同步订阅。实体事件订阅管理器 就是基于这种模式。
💡 注意事项
如果在其他的应用程序中使用了第三方组件与 Fireasy 进行消息交换时,应注意消息体的结构,Body 才是你所定义的 Subject。如下所示:
{
"Key": "{Guid}", //唯一标识
"Name": "{TopicName}", //主题名称
"Body": "{Base64}", //Subject的内容(Json)
"PublishRetries": 1, //发送时重试次数
"AcceptRetries": 1, //接收时重试次数
"ExpiresAt": "2021-01-22 12:00:00" //有效期
}
如上面 TestPublish 示例中,传递的消息内容为:
{
"Key": "9e8926f0-50a2-4050-8a80-b7c500a9e61e",
"Name": "x-chat-message",
"Body": "eyAiU2VuZGVyIjogIlhpYW9NZWkiLCAiUmVjZWl2ZXIiOiAiWGlhb01pbmciLCAiTWVzc2FnZSI6ICJIZWxsbyEiIH0=",
"PublishRetries": 1,
"AcceptRetries": 1,
"ExpiresAt": "2021-01-22 12:00:00"
}