ZhiYi/ZhiYi.Core.Application/RedisServices/SubscribeAndPublishService.cs

108 lines
3.8 KiB
C#

namespace ZhiYi.Core.Application.RedisServices
{
/// <summary>
/// Redis订阅/发布
/// </summary>
public class SubscribeAndPublishService
{
private readonly IConnectionMultiplexer _redisConnectionMultiplexer;
private readonly ConcurrentDictionary<string, (ISubscriber subscriber, RedisChannel redisChannel)> _subscriptions = new();
private readonly ILogger<SubscribeAndPublishService> _logger;
public SubscribeAndPublishService(IConnectionMultiplexer connectionMultiplexer, ILogger<SubscribeAndPublishService> logger)
{
_redisConnectionMultiplexer = connectionMultiplexer;
_logger = logger;
}
/// <summary>
/// 订阅
/// </summary>
/// <param name="clientId"></param>
public void Subscribe(string clientId, IWebSocket socket)
{
try
{
var subscriber = _redisConnectionMultiplexer.GetSubscriber();
var channel = "notifications:" + clientId;//频道可拓展为动态
_logger.LogInformation("开始订阅 'notifications' 频道, clientID为 {clientId}", clientId);
var handler = new Action<RedisChannel, RedisValue>(async (ch, value) =>
{
var notification = JsonConvert.DeserializeObject<Notification>(Encoding.UTF8.GetString(value));
if (notification != null && notification.ClientId == clientId)
{
_logger.LogInformation("订阅到 'notifications' 频道, clientID为 {clientId}", clientId);
//await _hubContext.Clients.Client(clientId).SendAsync("ReceiveNotification", notification.Message);
await socket.SendAsync(notification.Message);
}
});
subscriber.Subscribe(channel, handler);
_subscriptions[clientId] = (subscriber, channel);
}
catch (Exception ex)
{
_logger.LogError("订阅出错, {message}", ex.Message);
}
}
/// <summary>
/// 发布
/// </summary>
/// <param name="message"></param>
/// <param name="clientId"></param>
public void Publish(string message, string clientId)
{
try
{
var notification = new Notification { Message = message, ClientId = clientId };
var serializedMessage = JsonConvert.SerializeObject(notification);
var bytes = Encoding.UTF8.GetBytes(serializedMessage);
var publisher = _redisConnectionMultiplexer.GetSubscriber();
publisher.Publish(channel: "notifications:" + clientId, message: bytes);
}
catch (Exception ex)
{
_logger.LogError("发布出错, {message}", ex.Message);
}
}
/// <summary>
/// 取消订阅
/// </summary>
/// <param name="clientId"></param>
public void Unsubscribe(string clientId)
{
if (_subscriptions.TryGetValue(clientId, out var value))
{
value.subscriber.Unsubscribe(value.redisChannel + ":" + clientId);
_ = _subscriptions.TryRemove(clientId, out var sub);
}
}
}
/// <summary>
/// 消息管道内容
/// </summary>
public class Notification
{
/// <summary>
/// 消息
/// </summary>
public string Message { get; set; }
/// <summary>
/// 客户端ID
/// </summary>
public string ClientId { get; set; }
/// <summary>
/// 连接信息
/// </summary>
public IWebSocket webSocket { get; set; }
}
}