From fb296bb61cad16de20ae4a361c739f28e93dc484 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=85=88=E6=A3=AE=E5=90=B4?= <8402134@qq.com> Date: Wed, 1 Feb 2023 17:49:34 +0800 Subject: [PATCH] 1 --- 类库/Api.Framework/Cps/AlimamaApi.cs | 60 ++- 类库/Api.Framework/NoticeSocketClient.cs | 500 +++++++++++++++-------- 2 files changed, 377 insertions(+), 183 deletions(-) diff --git a/类库/Api.Framework/Cps/AlimamaApi.cs b/类库/Api.Framework/Cps/AlimamaApi.cs index 9fff6a4..6841d28 100644 --- a/类库/Api.Framework/Cps/AlimamaApi.cs +++ b/类库/Api.Framework/Cps/AlimamaApi.cs @@ -1836,25 +1836,44 @@ t = {target}"); #region MyRegion private static NoticeSocketClient _noticeSocket; - + private static RemoteCloudConnectService _remoteCloudConnectService = new RemoteCloudConnectService(); public static void Init() { - _noticeSocket = new NoticeSocketClient("event.api.52cmg.cn", "tksoft", "circle_goods_task"); - _noticeSocket.OnMessageEvent += NoticeSocket_OnMessageEvent; - _noticeSocket.OnConnectedEvent += NoticeSocket_OnConnectedEvent; - _noticeSocket.Connect(); + _remoteCloudConnectService.Initialize(); + //"tksoft", "circle_goods_task" + _noticeSocket = new NoticeSocketClient(); + _remoteCloudConnectService.CloudNoticeEvent += _remoteCloudConnectService_CloudNoticeEvent; + _remoteCloudConnectService.OnOpen += _remoteCloudConnectService_OnOpen; + ThreadPool.QueueUserWorkItem(o => { RefreshTkUrl(); }); } - - //收到服务端通知 - private static void NoticeSocket_OnMessageEvent(object sender, NoticeSocketClient.AckNoticeData e) + // 连接打开 + private static void _remoteCloudConnectService_OnOpen() { + var arr = new string[] { "tksoft", "circle_goods_task" , "laow" }; + foreach (var item in arr) + { + _remoteCloudConnectService.SendString("subchannel\u0000" + item); + } + } + /// + /// 防止重复 + /// + private static List repeatList = new List(); + private static void _remoteCloudConnectService_CloudNoticeEvent(object sender, CloudNoticeEventArgs e) + { + if (repeatList.Contains(e.Id))//去重复 + { + return; + } + repeatList.Add(e.Id); + try { - LogHelper.GetSingleObj().Info("", "域名相关通知 => " + e.Data); + LogHelper.GetSingleObj().Info("", "域名相关通知 => " + e.Body); if (e.CommonId == "change_tkurl") { - var data = JObject.Parse(e.Data); + var data = JObject.Parse(e.Body); var newUrl = (data["new"] ?? string.Empty).Value(); var oldUrl = (data["old"] ?? string.Empty).Value(); if (string.IsNullOrEmpty(newUrl) || string.IsNullOrEmpty(oldUrl)) return; @@ -1890,28 +1909,33 @@ t = {target}"); { RefreshTkUrl(); } - else if (e.CommonId == "send") + else if (e.CommonId == "send" && e.Channel == "circle_goods_task") { //{"Id":"122522636742819840","TaskId":"122522636742819840","IsGoods":true} - EventClient.OnDyyNoticeEvent(null, new DyyNoticeEvent() { Data = e.Data }); + EventClient.OnDyyNoticeEvent(null, new DyyNoticeEvent() { Data = e.Body }); } } catch (Exception exception) { - EventClient.OnEvent("收到域名变化通知", $"数据:{e.Data};异常:{exception.Message}"); + EventClient.OnEvent("收到域名变化通知", $"数据:{e.Body};异常:{exception.Message}"); + } + finally + { + if (repeatList.Count > 10000) + { + repeatList.RemoveRange(0, 9000); + } } } + + /// /// 淘客的地址 /// public static TkUrlClass TkUrl { get; private set; } - //建议连接成功后,主动再去拉一次配置(防止有时候客户端崩溃,但这个时候服务器有更新,没有收到) - private static void NoticeSocket_OnConnectedEvent(object sender, EventArgs e) - { - RefreshTkUrl(); - } + /// /// 刷新淘客中间页 diff --git a/类库/Api.Framework/NoticeSocketClient.cs b/类库/Api.Framework/NoticeSocketClient.cs index b64db5f..2653a9e 100644 --- a/类库/Api.Framework/NoticeSocketClient.cs +++ b/类库/Api.Framework/NoticeSocketClient.cs @@ -3,6 +3,7 @@ using System.Linq; using System.Text; using System.Threading; using System.Xml; +using Api.Framework.Utils; using Newtonsoft.Json; using Newtonsoft.Json.Linq; using WebSocket = WebSocketSharp.WebSocket; @@ -11,194 +12,277 @@ using WebSocketSharp; namespace Api.Framework { - + /// + /// 云通知数据 + /// + public class CloudNoticeEventArgs : EventArgs + { + /// + /// 通知数据ID,通过这个排重 + /// + public string Id { get; set; } + /// + /// 通知内容 + /// + public string Body { get; set; } + /// + /// 命令ID + /// + public string CommonId { get; set; } + /// + /// 发布时间 + /// + public DateTime CreateDateTime { get; set; } + /// + /// 信号通道 + /// + public string Channel { get; set; } + } + /// + /// 发送Ack模式通知数据 + /// + public class AckNoticeData : EventArgs + { + /// + /// ID + /// + public string Id { get; set; } + /// + /// 命令ID + /// + public string CommonId { get; set; } + /// + /// 信号道 + /// + public string Channel { get; set; } + /// + /// 数据 + /// + public string Data { get; set; } + /// + /// 生成时间 + /// + public DateTime CreateDateTime { get; set; } + /// + /// 标记被删除 + /// + public bool IsRemove { get; set; } + /// + /// 重试 + /// + public bool IsRetry { get; set; } + } /// /// 通知客户端 /// public class NoticeSocketClient { + /// - /// 发送Ack模式通知数据 + /// 获取数据 /// - public class AckNoticeData : EventArgs + /// + /// + public string GetData(string key) { - /// - /// ID - /// - public string Id { get; set; } - /// - /// 命令ID - /// - public string CommonId { get; set; } - /// - /// 信号道 - /// - public string Channel { get; set; } - /// - /// 数据 - /// - public string Data { get; set; } - /// - /// 生成时间 - /// - public DateTime CreateDateTime { get; set; } - /// - /// 标记被删除 - /// - public bool IsRemove { get; set; } - /// - /// 重试 - /// - public bool IsRetry { get; set; } + var http = new HttpHelper(); + var html = http.GetHtml($"http://event.api.52cmg.cn/api/TransferData/Get?key={key}&rd={DateTime.Now.Ticks}").Html; + var rest = JObject.Parse(html); + //if (!(rest["Ok"] ?? false).Value() throw new Exception(html); + //return (rest["Data"] ?? string.Empty).Value(); + + //EventClient.OnEvent($"获取淘客域名数据", $"{html}"); + + if (!(rest["Ok"] ?? false).Value()) + throw new Exception(html); + return (rest["Data"] ?? string.Empty).Value(); + } + } + + + /// + /// 远程调度服务连接 + /// + public class RemoteCloudConnectService + { + + + /// - /// 构造函数 + /// 初始化 /// - /// - /// - public NoticeSocketClient(string host, params string[] channel) + public void Initialize() { - this._host = host; - this._channels = channel.ToArray(); + this.Url = "ws://event.api.52cmg.cn"; + ThreadPool.QueueUserWorkItem(o => { DisputeConnect(); }); } - private readonly string[] _channels; - private readonly string _host; /// - /// 连接对象 + /// 争抢连接 /// - private WebSocket _websocket; - - /// - /// 连接状态 - /// - public bool IsConnected => (_websocket != null && _websocket.IsAlive); - - private string ClientId { get; set; } = Guid.NewGuid().ToString("N"); - - /// - /// 连接 - /// - public void Connect() + private void DisputeConnect() { - if (IsConnected) + try { - _websocket.OnOpen -= _websocket_OnOpen; - _websocket.OnMessage -= _websocket_OnMessage; - _websocket?.Close(); + Start(); } - - _websocket = new WebSocket($"ws://{this._host}/api/push?id={ClientId}"); - _websocket.OnOpen += _websocket_OnOpen; - _websocket.OnError += _websocket_OnError; - _websocket.OnClose += _websocket_OnClose; - _websocket.OnMessage += _websocket_OnMessage; - _websocket.Connect(); - _pingId = Guid.NewGuid().ToString("N"); - _thread = new Thread(Ping) + finally { - IsBackground = true - }; - _thread.Start(); + } + } + + /// + /// 地址 + /// + public string Url { get; set; } + /// + /// 启动 连接 + /// + public void Start() + { + while (true) + { + if (Connect()) + { + return; + } + + Thread.Sleep(5000); + } + } + /// + /// 是否正常连接 + /// + public bool IsConnect { get; private set; } + /// + /// 收到消息 + /// + public event Action OnMessage; + /// + /// 收到云端通知事件 + /// + public event EventHandler CloudNoticeEvent; + /// + /// 连接打开 + /// + public event Action OnOpen; + + /// + /// websocket + /// + private WebSocket _ws; + /// + /// ping + /// + private string _pingThradId; + /// + /// ping失败次数 + /// + public int PingFailureCount { get; set; } + /// + /// 重新连接次数 + /// + public int ReconnectionCount { get; set; } + /// + /// 连接ID + /// + public string ConnectId { get; set; } = Guid.NewGuid().ToString("N"); + + /// + /// 连接服务器 + /// + private bool Connect() + { + try + { + if (IsConnect) + { + return true; + } + if (string.IsNullOrWhiteSpace(Url)) + { + return false; + } + _ws = new WebSocket(this.Url + "/api/push?id=" + this.ConnectId); + _ws.OnOpen += _ws_OnOpen; + _ws.OnClose += _ws_OnClose; + _ws.OnMessage += _ws_OnMessage; + _ws.OnError += _ws_OnError; + _ws.Connect(); + Thread.Sleep(1000); + if (!IsConnect) + { + return false; + } + //连接成功 + StartPing(); + } + catch (Exception e) + { + // _log.Error(e); + } + return true; + } + /// + /// 错误信息 + /// + /// + /// + /// + private void _ws_OnError(object sender, ErrorEventArgs e) + { + //_log.Warn(e.Exception, "知易云服务连接断开" + e.Message); + this.IsConnect = false; + OnReconnection(); } /// /// 连接断开 /// /// /// - private void _websocket_OnClose(object sender, CloseEventArgs e) + private void _ws_OnClose(object sender, CloseEventArgs e) { - //断开自动重连 - this.Connect(); + // _log.Warn("知易云服务连接断开"); + this.IsConnect = false; + OnReconnection(); } + /// - /// 出现连接错误 + /// 连接打开 /// /// /// - private void _websocket_OnError(object sender, ErrorEventArgs e) + private void _ws_OnOpen(object sender, EventArgs e) { - //断开自动重连 - this.Connect(); + // _log.Info("知易云服务连接成功"); + this.IsConnect = true; + OnOpen?.Invoke(); } - /// - /// 收到消息事件 + /// 启动Ping /// - public event EventHandler OnMessageEvent; - - /// - /// 连接成功事件 - /// - public event EventHandler OnConnectedEvent; - - private void _websocket_OnMessage(object sender, WebSocketSharp.MessageEventArgs e) + public void StartPing() { - try + // _log.Info("启动心跳检测"); + var tr = new Thread(() => { - var index = e.Data.IndexOf("\u0000", StringComparison.Ordinal); - - if (index <= -1) + var id = _pingThradId = Guid.NewGuid().ToString(); + while (id == _pingThradId) { - return; - } - - var commonId = e.Data.Substring(0, index); - if (commonId != "notice") return; - var body = e.Data.Substring(index + 1); - var data = JsonConvert.DeserializeObject(body); - if (data == null || string.IsNullOrEmpty(data.Id)) return; - this._websocket.Send("notice_ack\u0000" + data.Id); - OnMessageEvent?.Invoke(this, data); - } - catch (Exception ex) - { - Console.WriteLine($"即时通讯API接收异常:{ex.Message}"); - } - } - - private Thread _thread; - private void _websocket_OnOpen(object sender, System.EventArgs e) - { - try - { - foreach (var v in this._channels) - { - this._websocket.Send($"subchannel\u0000{v}");//订阅通道 - } - this.OnConnectedEvent?.Invoke(this, EventArgs.Empty); - } - catch (Exception ex) - { - Console.WriteLine($"即时通讯API连接异常:{ex.Message}"); - } - } - - private int PingFailureCount { get; set; } - private string _pingId = Guid.NewGuid().ToString(); - private void Ping() - { - var id = Guid.NewGuid().ToString(); - _pingId = id; - while (_pingId == id) - { - Thread.Sleep(1000 * 10); - try - { - //测试ping是否正常 + Thread.Sleep(1000 * 10); + if (_ws == null) + { + return; + } try { - if (_websocket == null) + if (_ws.ReadyState == WebSocketState.Open) { - continue; - } - if (_websocket.ReadyState == WebSocketState.Open) - { - _websocket.Send("\u0001\u0002"); + _ws.Send("\u0001\u0002"); this.PingFailureCount = 0; - } else { @@ -214,39 +298,125 @@ namespace Api.Framework { if (this.PingFailureCount > 5) { - Console.WriteLine("心跳异常连接断开"); - _websocket?.Close(); + this.IsConnect = false; + //_log.Error("心跳异常连接断开"); + _ws?.Close(); } } - } - catch (Exception e) - { - Console.WriteLine($"即时通讯API Ping异常:{e.Message}"); - } - } - // ReSharper disable once FunctionNeverReturns + }); + tr.IsBackground = true; + tr.Start(); } /// - /// 获取数据 + /// 是否允许重新连接 /// - /// - /// - public string GetData(string key) + public bool AllowReconnected { get; set; } = true; + private bool _isBeginRconnection; + private readonly object _lockReconnection = new object(); + private void OnReconnection() { - var http = new HttpHelper(); - var html = http.GetHtml($"http://{this._host}/api/TransferData/Get?key={key}&rd={DateTime.Now.Ticks}").Html; - var rest = JObject.Parse(html); - //if (!(rest["Ok"] ?? false).Value() throw new Exception(html); - //return (rest["Data"] ?? string.Empty).Value(); + if (_isBeginRconnection || !AllowReconnected) return; + lock (_lockReconnection) + { + if (_isBeginRconnection || !AllowReconnected) return; + _onReconnection(); + } + } + private void _onReconnection() + { + if (_isBeginRconnection || !AllowReconnected) return; + try + { + _isBeginRconnection = true; + //_log.Debug("重连中..."); + while (true) + { + Thread.Sleep(1000 * 5); + if (AllowReconnected && !IsConnect) + { + this.ReconnectionCount++; + if (Connect()) + { + // _log.Debug("重连成功"); + break; + } + } + } + } + finally + { + _isBeginRconnection = false; + } + } + /// + /// 收到消息 + /// + /// + /// + /// + private void _ws_OnMessage(object sender, MessageEventArgs e) + { + if (e.IsText) + { + var index = e.Data.IndexOf("\u0000", StringComparison.Ordinal); + if (index <= -1) + { + return; + } - //EventClient.OnEvent($"获取淘客域名数据", $"{html}"); + LogHelper.GetSingleObj().Info("接收到通知",$"{e.Data}"); - if (!(rest["Ok"] ?? false).Value()) - throw new Exception(html); - return (rest["Data"] ?? string.Empty).Value(); + var commonId = e.Data.Substring(0, index); + var body = e.Data.Substring(index + 1); + if (commonId == "notice")//收到通知 + { + var ackNoticeData = Newtonsoft.Json.JsonConvert.DeserializeObject(body); + if (ackNoticeData == null) + { + return; + } + if (ackNoticeData.IsRemove) + { + return; + } + this.CloudNoticeEvent?.Invoke(this, new CloudNoticeEventArgs() + { + Id = ackNoticeData.Id, + Body = ackNoticeData.Data, + CommonId = ackNoticeData.CommonId, + CreateDateTime = ackNoticeData.CreateDateTime, + Channel = ackNoticeData.Channel + }); + _ws.Send("notice_ack\u0000" + ackNoticeData.Id);//回复确认标示 + return; + } + } + this.OnMessage?.Invoke(e); + } + /// + /// 发送字节 + /// + /// + public void SendBytes(byte[] bytes) + { + if (IsConnect) + { + _ws.Send(bytes); + } + } + /// + /// 发送字符串 + /// + /// + public void SendString(string context) + { + if (IsConnect) + { + _ws.Send(context); + } } }