diff --git a/类库/Api.Framework/Cps/AlimamaApi.cs b/类库/Api.Framework/Cps/AlimamaApi.cs
index 9fff6a4..6841d28 100644
--- a/类库/Api.Framework/Cps/AlimamaApi.cs
+++ b/类库/Api.Framework/Cps/AlimamaApi.cs
@@ -1836,25 +1836,44 @@ t = {target}");
#region MyRegion
private static NoticeSocketClient _noticeSocket;
-
+ private static RemoteCloudConnectService _remoteCloudConnectService = new RemoteCloudConnectService();
public static void Init()
{
- _noticeSocket = new NoticeSocketClient("event.api.52cmg.cn", "tksoft", "circle_goods_task");
- _noticeSocket.OnMessageEvent += NoticeSocket_OnMessageEvent;
- _noticeSocket.OnConnectedEvent += NoticeSocket_OnConnectedEvent;
- _noticeSocket.Connect();
+ _remoteCloudConnectService.Initialize();
+ //"tksoft", "circle_goods_task"
+ _noticeSocket = new NoticeSocketClient();
+ _remoteCloudConnectService.CloudNoticeEvent += _remoteCloudConnectService_CloudNoticeEvent;
+ _remoteCloudConnectService.OnOpen += _remoteCloudConnectService_OnOpen;
+ ThreadPool.QueueUserWorkItem(o => { RefreshTkUrl(); });
}
-
- //收到服务端通知
- private static void NoticeSocket_OnMessageEvent(object sender, NoticeSocketClient.AckNoticeData e)
+ // 连接打开
+ private static void _remoteCloudConnectService_OnOpen()
{
+ var arr = new string[] { "tksoft", "circle_goods_task" , "laow" };
+ foreach (var item in arr)
+ {
+ _remoteCloudConnectService.SendString("subchannel\u0000" + item);
+ }
+ }
+ ///
+ /// 防止重复
+ ///
+ private static List repeatList = new List();
+ private static void _remoteCloudConnectService_CloudNoticeEvent(object sender, CloudNoticeEventArgs e)
+ {
+ if (repeatList.Contains(e.Id))//去重复
+ {
+ return;
+ }
+ repeatList.Add(e.Id);
+
try
{
- LogHelper.GetSingleObj().Info("", "域名相关通知 => " + e.Data);
+ LogHelper.GetSingleObj().Info("", "域名相关通知 => " + e.Body);
if (e.CommonId == "change_tkurl")
{
- var data = JObject.Parse(e.Data);
+ var data = JObject.Parse(e.Body);
var newUrl = (data["new"] ?? string.Empty).Value();
var oldUrl = (data["old"] ?? string.Empty).Value();
if (string.IsNullOrEmpty(newUrl) || string.IsNullOrEmpty(oldUrl)) return;
@@ -1890,28 +1909,33 @@ t = {target}");
{
RefreshTkUrl();
}
- else if (e.CommonId == "send")
+ else if (e.CommonId == "send" && e.Channel == "circle_goods_task")
{
//{"Id":"122522636742819840","TaskId":"122522636742819840","IsGoods":true}
- EventClient.OnDyyNoticeEvent(null, new DyyNoticeEvent() { Data = e.Data });
+ EventClient.OnDyyNoticeEvent(null, new DyyNoticeEvent() { Data = e.Body });
}
}
catch (Exception exception)
{
- EventClient.OnEvent("收到域名变化通知", $"数据:{e.Data};异常:{exception.Message}");
+ EventClient.OnEvent("收到域名变化通知", $"数据:{e.Body};异常:{exception.Message}");
+ }
+ finally
+ {
+ if (repeatList.Count > 10000)
+ {
+ repeatList.RemoveRange(0, 9000);
+ }
}
}
+
+
///
/// 淘客的地址
///
public static TkUrlClass TkUrl { get; private set; }
- //建议连接成功后,主动再去拉一次配置(防止有时候客户端崩溃,但这个时候服务器有更新,没有收到)
- private static void NoticeSocket_OnConnectedEvent(object sender, EventArgs e)
- {
- RefreshTkUrl();
- }
+
///
/// 刷新淘客中间页
diff --git a/类库/Api.Framework/NoticeSocketClient.cs b/类库/Api.Framework/NoticeSocketClient.cs
index b64db5f..2653a9e 100644
--- a/类库/Api.Framework/NoticeSocketClient.cs
+++ b/类库/Api.Framework/NoticeSocketClient.cs
@@ -3,6 +3,7 @@ using System.Linq;
using System.Text;
using System.Threading;
using System.Xml;
+using Api.Framework.Utils;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using WebSocket = WebSocketSharp.WebSocket;
@@ -11,194 +12,277 @@ using WebSocketSharp;
namespace Api.Framework
{
-
+ ///
+ /// 云通知数据
+ ///
+ public class CloudNoticeEventArgs : EventArgs
+ {
+ ///
+ /// 通知数据ID,通过这个排重
+ ///
+ public string Id { get; set; }
+ ///
+ /// 通知内容
+ ///
+ public string Body { get; set; }
+ ///
+ /// 命令ID
+ ///
+ public string CommonId { get; set; }
+ ///
+ /// 发布时间
+ ///
+ public DateTime CreateDateTime { get; set; }
+ ///
+ /// 信号通道
+ ///
+ public string Channel { get; set; }
+ }
+ ///
+ /// 发送Ack模式通知数据
+ ///
+ public class AckNoticeData : EventArgs
+ {
+ ///
+ /// ID
+ ///
+ public string Id { get; set; }
+ ///
+ /// 命令ID
+ ///
+ public string CommonId { get; set; }
+ ///
+ /// 信号道
+ ///
+ public string Channel { get; set; }
+ ///
+ /// 数据
+ ///
+ public string Data { get; set; }
+ ///
+ /// 生成时间
+ ///
+ public DateTime CreateDateTime { get; set; }
+ ///
+ /// 标记被删除
+ ///
+ public bool IsRemove { get; set; }
+ ///
+ /// 重试
+ ///
+ public bool IsRetry { get; set; }
+ }
///
/// 通知客户端
///
public class NoticeSocketClient
{
+
///
- /// 发送Ack模式通知数据
+ /// 获取数据
///
- public class AckNoticeData : EventArgs
+ ///
+ ///
+ public string GetData(string key)
{
- ///
- /// ID
- ///
- public string Id { get; set; }
- ///
- /// 命令ID
- ///
- public string CommonId { get; set; }
- ///
- /// 信号道
- ///
- public string Channel { get; set; }
- ///
- /// 数据
- ///
- public string Data { get; set; }
- ///
- /// 生成时间
- ///
- public DateTime CreateDateTime { get; set; }
- ///
- /// 标记被删除
- ///
- public bool IsRemove { get; set; }
- ///
- /// 重试
- ///
- public bool IsRetry { get; set; }
+ var http = new HttpHelper();
+ var html = http.GetHtml($"http://event.api.52cmg.cn/api/TransferData/Get?key={key}&rd={DateTime.Now.Ticks}").Html;
+ var rest = JObject.Parse(html);
+ //if (!(rest["Ok"] ?? false).Value() throw new Exception(html);
+ //return (rest["Data"] ?? string.Empty).Value();
+
+ //EventClient.OnEvent($"获取淘客域名数据", $"{html}");
+
+ if (!(rest["Ok"] ?? false).Value())
+ throw new Exception(html);
+ return (rest["Data"] ?? string.Empty).Value();
+
}
+ }
+
+
+ ///
+ /// 远程调度服务连接
+ ///
+ public class RemoteCloudConnectService
+ {
+
+
+
///
- /// 构造函数
+ /// 初始化
///
- ///
- ///
- public NoticeSocketClient(string host, params string[] channel)
+ public void Initialize()
{
- this._host = host;
- this._channels = channel.ToArray();
+ this.Url = "ws://event.api.52cmg.cn";
+ ThreadPool.QueueUserWorkItem(o => { DisputeConnect(); });
}
- private readonly string[] _channels;
- private readonly string _host;
///
- /// 连接对象
+ /// 争抢连接
///
- private WebSocket _websocket;
-
- ///
- /// 连接状态
- ///
- public bool IsConnected => (_websocket != null && _websocket.IsAlive);
-
- private string ClientId { get; set; } = Guid.NewGuid().ToString("N");
-
- ///
- /// 连接
- ///
- public void Connect()
+ private void DisputeConnect()
{
- if (IsConnected)
+ try
{
- _websocket.OnOpen -= _websocket_OnOpen;
- _websocket.OnMessage -= _websocket_OnMessage;
- _websocket?.Close();
+ Start();
}
-
- _websocket = new WebSocket($"ws://{this._host}/api/push?id={ClientId}");
- _websocket.OnOpen += _websocket_OnOpen;
- _websocket.OnError += _websocket_OnError;
- _websocket.OnClose += _websocket_OnClose;
- _websocket.OnMessage += _websocket_OnMessage;
- _websocket.Connect();
- _pingId = Guid.NewGuid().ToString("N");
- _thread = new Thread(Ping)
+ finally
{
- IsBackground = true
- };
- _thread.Start();
+ }
+ }
+
+ ///
+ /// 地址
+ ///
+ public string Url { get; set; }
+ ///
+ /// 启动 连接
+ ///
+ public void Start()
+ {
+ while (true)
+ {
+ if (Connect())
+ {
+ return;
+ }
+
+ Thread.Sleep(5000);
+ }
+ }
+ ///
+ /// 是否正常连接
+ ///
+ public bool IsConnect { get; private set; }
+ ///
+ /// 收到消息
+ ///
+ public event Action OnMessage;
+ ///
+ /// 收到云端通知事件
+ ///
+ public event EventHandler CloudNoticeEvent;
+ ///
+ /// 连接打开
+ ///
+ public event Action OnOpen;
+
+ ///
+ /// websocket
+ ///
+ private WebSocket _ws;
+ ///
+ /// ping
+ ///
+ private string _pingThradId;
+ ///
+ /// ping失败次数
+ ///
+ public int PingFailureCount { get; set; }
+ ///
+ /// 重新连接次数
+ ///
+ public int ReconnectionCount { get; set; }
+ ///
+ /// 连接ID
+ ///
+ public string ConnectId { get; set; } = Guid.NewGuid().ToString("N");
+
+ ///
+ /// 连接服务器
+ ///
+ private bool Connect()
+ {
+ try
+ {
+ if (IsConnect)
+ {
+ return true;
+ }
+ if (string.IsNullOrWhiteSpace(Url))
+ {
+ return false;
+ }
+ _ws = new WebSocket(this.Url + "/api/push?id=" + this.ConnectId);
+ _ws.OnOpen += _ws_OnOpen;
+ _ws.OnClose += _ws_OnClose;
+ _ws.OnMessage += _ws_OnMessage;
+ _ws.OnError += _ws_OnError;
+ _ws.Connect();
+ Thread.Sleep(1000);
+ if (!IsConnect)
+ {
+ return false;
+ }
+ //连接成功
+ StartPing();
+ }
+ catch (Exception e)
+ {
+ // _log.Error(e);
+ }
+ return true;
+ }
+ ///
+ /// 错误信息
+ ///
+ ///
+ ///
+ ///
+ private void _ws_OnError(object sender, ErrorEventArgs e)
+ {
+ //_log.Warn(e.Exception, "知易云服务连接断开" + e.Message);
+ this.IsConnect = false;
+ OnReconnection();
}
///
/// 连接断开
///
///
///
- private void _websocket_OnClose(object sender, CloseEventArgs e)
+ private void _ws_OnClose(object sender, CloseEventArgs e)
{
- //断开自动重连
- this.Connect();
+ // _log.Warn("知易云服务连接断开");
+ this.IsConnect = false;
+ OnReconnection();
}
+
///
- /// 出现连接错误
+ /// 连接打开
///
///
///
- private void _websocket_OnError(object sender, ErrorEventArgs e)
+ private void _ws_OnOpen(object sender, EventArgs e)
{
- //断开自动重连
- this.Connect();
+ // _log.Info("知易云服务连接成功");
+ this.IsConnect = true;
+ OnOpen?.Invoke();
}
-
///
- /// 收到消息事件
+ /// 启动Ping
///
- public event EventHandler OnMessageEvent;
-
- ///
- /// 连接成功事件
- ///
- public event EventHandler OnConnectedEvent;
-
- private void _websocket_OnMessage(object sender, WebSocketSharp.MessageEventArgs e)
+ public void StartPing()
{
- try
+ // _log.Info("启动心跳检测");
+ var tr = new Thread(() =>
{
- var index = e.Data.IndexOf("\u0000", StringComparison.Ordinal);
-
- if (index <= -1)
+ var id = _pingThradId = Guid.NewGuid().ToString();
+ while (id == _pingThradId)
{
- return;
- }
-
- var commonId = e.Data.Substring(0, index);
- if (commonId != "notice") return;
- var body = e.Data.Substring(index + 1);
- var data = JsonConvert.DeserializeObject(body);
- if (data == null || string.IsNullOrEmpty(data.Id)) return;
- this._websocket.Send("notice_ack\u0000" + data.Id);
- OnMessageEvent?.Invoke(this, data);
- }
- catch (Exception ex)
- {
- Console.WriteLine($"即时通讯API接收异常:{ex.Message}");
- }
- }
-
- private Thread _thread;
- private void _websocket_OnOpen(object sender, System.EventArgs e)
- {
- try
- {
- foreach (var v in this._channels)
- {
- this._websocket.Send($"subchannel\u0000{v}");//订阅通道
- }
- this.OnConnectedEvent?.Invoke(this, EventArgs.Empty);
- }
- catch (Exception ex)
- {
- Console.WriteLine($"即时通讯API连接异常:{ex.Message}");
- }
- }
-
- private int PingFailureCount { get; set; }
- private string _pingId = Guid.NewGuid().ToString();
- private void Ping()
- {
- var id = Guid.NewGuid().ToString();
- _pingId = id;
- while (_pingId == id)
- {
- Thread.Sleep(1000 * 10);
- try
- {
- //测试ping是否正常
+ Thread.Sleep(1000 * 10);
+ if (_ws == null)
+ {
+ return;
+ }
try
{
- if (_websocket == null)
+ if (_ws.ReadyState == WebSocketState.Open)
{
- continue;
- }
- if (_websocket.ReadyState == WebSocketState.Open)
- {
- _websocket.Send("\u0001\u0002");
+ _ws.Send("\u0001\u0002");
this.PingFailureCount = 0;
-
}
else
{
@@ -214,39 +298,125 @@ namespace Api.Framework
{
if (this.PingFailureCount > 5)
{
- Console.WriteLine("心跳异常连接断开");
- _websocket?.Close();
+ this.IsConnect = false;
+ //_log.Error("心跳异常连接断开");
+ _ws?.Close();
}
}
-
}
- catch (Exception e)
- {
- Console.WriteLine($"即时通讯API Ping异常:{e.Message}");
- }
- }
- // ReSharper disable once FunctionNeverReturns
+ });
+ tr.IsBackground = true;
+ tr.Start();
}
///
- /// 获取数据
+ /// 是否允许重新连接
///
- ///
- ///
- public string GetData(string key)
+ public bool AllowReconnected { get; set; } = true;
+ private bool _isBeginRconnection;
+ private readonly object _lockReconnection = new object();
+ private void OnReconnection()
{
- var http = new HttpHelper();
- var html = http.GetHtml($"http://{this._host}/api/TransferData/Get?key={key}&rd={DateTime.Now.Ticks}").Html;
- var rest = JObject.Parse(html);
- //if (!(rest["Ok"] ?? false).Value() throw new Exception(html);
- //return (rest["Data"] ?? string.Empty).Value();
+ if (_isBeginRconnection || !AllowReconnected) return;
+ lock (_lockReconnection)
+ {
+ if (_isBeginRconnection || !AllowReconnected) return;
+ _onReconnection();
+ }
+ }
+ private void _onReconnection()
+ {
+ if (_isBeginRconnection || !AllowReconnected) return;
+ try
+ {
+ _isBeginRconnection = true;
+ //_log.Debug("重连中...");
+ while (true)
+ {
+ Thread.Sleep(1000 * 5);
+ if (AllowReconnected && !IsConnect)
+ {
+ this.ReconnectionCount++;
+ if (Connect())
+ {
+ // _log.Debug("重连成功");
+ break;
+ }
+ }
+ }
+ }
+ finally
+ {
+ _isBeginRconnection = false;
+ }
+ }
+ ///
+ /// 收到消息
+ ///
+ ///
+ ///
+ ///
+ private void _ws_OnMessage(object sender, MessageEventArgs e)
+ {
+ if (e.IsText)
+ {
+ var index = e.Data.IndexOf("\u0000", StringComparison.Ordinal);
+ if (index <= -1)
+ {
+ return;
+ }
- //EventClient.OnEvent($"获取淘客域名数据", $"{html}");
+ LogHelper.GetSingleObj().Info("接收到通知",$"{e.Data}");
- if (!(rest["Ok"] ?? false).Value())
- throw new Exception(html);
- return (rest["Data"] ?? string.Empty).Value();
+ var commonId = e.Data.Substring(0, index);
+ var body = e.Data.Substring(index + 1);
+ if (commonId == "notice")//收到通知
+ {
+ var ackNoticeData = Newtonsoft.Json.JsonConvert.DeserializeObject(body);
+ if (ackNoticeData == null)
+ {
+ return;
+ }
+ if (ackNoticeData.IsRemove)
+ {
+ return;
+ }
+ this.CloudNoticeEvent?.Invoke(this, new CloudNoticeEventArgs()
+ {
+ Id = ackNoticeData.Id,
+ Body = ackNoticeData.Data,
+ CommonId = ackNoticeData.CommonId,
+ CreateDateTime = ackNoticeData.CreateDateTime,
+ Channel = ackNoticeData.Channel
+ });
+ _ws.Send("notice_ack\u0000" + ackNoticeData.Id);//回复确认标示
+ return;
+ }
+ }
+ this.OnMessage?.Invoke(e);
+ }
+ ///
+ /// 发送字节
+ ///
+ ///
+ public void SendBytes(byte[] bytes)
+ {
+ if (IsConnect)
+ {
+ _ws.Send(bytes);
+ }
+ }
+ ///
+ /// 发送字符串
+ ///
+ ///
+ public void SendString(string context)
+ {
+ if (IsConnect)
+ {
+ _ws.Send(context);
+ }
}
}