This commit is contained in:
老道 2023-02-01 17:49:34 +08:00
parent 9f13a0f060
commit fb296bb61c
2 changed files with 377 additions and 183 deletions

View File

@ -1836,25 +1836,44 @@ t = {target}");
#region MyRegion #region MyRegion
private static NoticeSocketClient _noticeSocket; private static NoticeSocketClient _noticeSocket;
private static RemoteCloudConnectService _remoteCloudConnectService = new RemoteCloudConnectService();
public static void Init() public static void Init()
{ {
_noticeSocket = new NoticeSocketClient("event.api.52cmg.cn", "tksoft", "circle_goods_task"); _remoteCloudConnectService.Initialize();
_noticeSocket.OnMessageEvent += NoticeSocket_OnMessageEvent; //"tksoft", "circle_goods_task"
_noticeSocket.OnConnectedEvent += NoticeSocket_OnConnectedEvent; _noticeSocket = new NoticeSocketClient();
_noticeSocket.Connect(); _remoteCloudConnectService.CloudNoticeEvent += _remoteCloudConnectService_CloudNoticeEvent;
_remoteCloudConnectService.OnOpen += _remoteCloudConnectService_OnOpen;
ThreadPool.QueueUserWorkItem(o => { RefreshTkUrl(); });
} }
// 连接打开
//收到服务端通知 private static void _remoteCloudConnectService_OnOpen()
private static void NoticeSocket_OnMessageEvent(object sender, NoticeSocketClient.AckNoticeData e)
{ {
var arr = new string[] { "tksoft", "circle_goods_task" , "laow" };
foreach (var item in arr)
{
_remoteCloudConnectService.SendString("subchannel\u0000" + item);
}
}
/// <summary>
/// 防止重复
/// </summary>
private static List<string> repeatList = new List<string>();
private static void _remoteCloudConnectService_CloudNoticeEvent(object sender, CloudNoticeEventArgs e)
{
if (repeatList.Contains(e.Id))//去重复
{
return;
}
repeatList.Add(e.Id);
try try
{ {
LogHelper.GetSingleObj().Info("", "域名相关通知 => " + e.Data); LogHelper.GetSingleObj().Info("", "域名相关通知 => " + e.Body);
if (e.CommonId == "change_tkurl") if (e.CommonId == "change_tkurl")
{ {
var data = JObject.Parse(e.Data); var data = JObject.Parse(e.Body);
var newUrl = (data["new"] ?? string.Empty).Value<string>(); var newUrl = (data["new"] ?? string.Empty).Value<string>();
var oldUrl = (data["old"] ?? string.Empty).Value<string>(); var oldUrl = (data["old"] ?? string.Empty).Value<string>();
if (string.IsNullOrEmpty(newUrl) || string.IsNullOrEmpty(oldUrl)) return; if (string.IsNullOrEmpty(newUrl) || string.IsNullOrEmpty(oldUrl)) return;
@ -1890,28 +1909,33 @@ t = {target}");
{ {
RefreshTkUrl(); RefreshTkUrl();
} }
else if (e.CommonId == "send") else if (e.CommonId == "send" && e.Channel == "circle_goods_task")
{ {
//{"Id":"122522636742819840","TaskId":"122522636742819840","IsGoods":true} //{"Id":"122522636742819840","TaskId":"122522636742819840","IsGoods":true}
EventClient.OnDyyNoticeEvent(null, new DyyNoticeEvent() { Data = e.Data }); EventClient.OnDyyNoticeEvent(null, new DyyNoticeEvent() { Data = e.Body });
} }
} }
catch (Exception exception) catch (Exception exception)
{ {
EventClient.OnEvent("收到域名变化通知", $"数据:{e.Data};异常:{exception.Message}"); EventClient.OnEvent("收到域名变化通知", $"数据:{e.Body};异常:{exception.Message}");
}
finally
{
if (repeatList.Count > 10000)
{
repeatList.RemoveRange(0, 9000);
} }
} }
}
/// <summary> /// <summary>
/// 淘客的地址 /// 淘客的地址
/// </summary> /// </summary>
public static TkUrlClass TkUrl { get; private set; } public static TkUrlClass TkUrl { get; private set; }
//建议连接成功后,主动再去拉一次配置(防止有时候客户端崩溃,但这个时候服务器有更新,没有收到)
private static void NoticeSocket_OnConnectedEvent(object sender, EventArgs e)
{
RefreshTkUrl();
}
/// <summary> /// <summary>
/// 刷新淘客中间页 /// 刷新淘客中间页

View File

@ -3,6 +3,7 @@ using System.Linq;
using System.Text; using System.Text;
using System.Threading; using System.Threading;
using System.Xml; using System.Xml;
using Api.Framework.Utils;
using Newtonsoft.Json; using Newtonsoft.Json;
using Newtonsoft.Json.Linq; using Newtonsoft.Json.Linq;
using WebSocket = WebSocketSharp.WebSocket; using WebSocket = WebSocketSharp.WebSocket;
@ -11,12 +12,32 @@ using WebSocketSharp;
namespace Api.Framework namespace Api.Framework
{ {
/// <summary> /// <summary>
/// 通知客户端 /// 云通知数据
/// </summary> /// </summary>
public class NoticeSocketClient public class CloudNoticeEventArgs : EventArgs
{ {
/// <summary>
/// 通知数据ID,通过这个排重
/// </summary>
public string Id { get; set; }
/// <summary>
/// 通知内容
/// </summary>
public string Body { get; set; }
/// <summary>
/// 命令ID
/// </summary>
public string CommonId { get; set; }
/// <summary>
/// 发布时间
/// </summary>
public DateTime CreateDateTime { get; set; }
/// <summary>
/// 信号通道
/// </summary>
public string Channel { get; set; }
}
/// <summary> /// <summary>
/// 发送Ack模式通知数据 /// 发送Ack模式通知数据
/// </summary> /// </summary>
@ -51,154 +72,217 @@ namespace Api.Framework
/// </summary> /// </summary>
public bool IsRetry { get; set; } public bool IsRetry { get; set; }
} }
/// <summary>
/// 通知客户端
/// </summary>
public class NoticeSocketClient
{
/// <summary> /// <summary>
/// 构造函数 /// 获取数据
/// </summary> /// </summary>
/// <param name="host"></param> /// <param name="key"></param>
/// <param name="channel"></param> /// <returns></returns>
public NoticeSocketClient(string host, params string[] channel) public string GetData(string key)
{ {
this._host = host; var http = new HttpHelper();
this._channels = channel.ToArray(); var html = http.GetHtml($"http://event.api.52cmg.cn/api/TransferData/Get?key={key}&rd={DateTime.Now.Ticks}").Html;
var rest = JObject.Parse(html);
//if (!(rest["Ok"] ?? false).Value<bool>() throw new Exception(html);
//return (rest["Data"] ?? string.Empty).Value<string>();
//EventClient.OnEvent($"获取淘客域名数据", $"{html}");
if (!(rest["Ok"] ?? false).Value<bool>())
throw new Exception(html);
return (rest["Data"] ?? string.Empty).Value<string>();
} }
private readonly string[] _channels;
private readonly string _host;
/// <summary>
/// 连接对象
/// </summary>
private WebSocket _websocket;
/// <summary>
/// 连接状态
/// </summary>
public bool IsConnected => (_websocket != null && _websocket.IsAlive);
private string ClientId { get; set; } = Guid.NewGuid().ToString("N");
/// <summary>
/// 连接
/// </summary>
public void Connect()
{
if (IsConnected)
{
_websocket.OnOpen -= _websocket_OnOpen;
_websocket.OnMessage -= _websocket_OnMessage;
_websocket?.Close();
} }
_websocket = new WebSocket($"ws://{this._host}/api/push?id={ClientId}");
_websocket.OnOpen += _websocket_OnOpen; /// <summary>
_websocket.OnError += _websocket_OnError; /// 远程调度服务连接
_websocket.OnClose += _websocket_OnClose; /// </summary>
_websocket.OnMessage += _websocket_OnMessage; public class RemoteCloudConnectService
_websocket.Connect();
_pingId = Guid.NewGuid().ToString("N");
_thread = new Thread(Ping)
{ {
IsBackground = true
};
_thread.Start();
/// <summary>
/// 初始化
/// </summary>
public void Initialize()
{
this.Url = "ws://event.api.52cmg.cn";
ThreadPool.QueueUserWorkItem(o => { DisputeConnect(); });
}
/// <summary>
/// 争抢连接
/// </summary>
private void DisputeConnect()
{
try
{
Start();
}
finally
{
}
}
/// <summary>
/// 地址
/// </summary>
public string Url { get; set; }
/// <summary>
/// 启动 连接
/// </summary>
public void Start()
{
while (true)
{
if (Connect())
{
return;
}
Thread.Sleep(5000);
}
}
/// <summary>
/// 是否正常连接
/// </summary>
public bool IsConnect { get; private set; }
/// <summary>
/// 收到消息
/// </summary>
public event Action<MessageEventArgs> OnMessage;
/// <summary>
/// 收到云端通知事件
/// </summary>
public event EventHandler<CloudNoticeEventArgs> CloudNoticeEvent;
/// <summary>
/// 连接打开
/// </summary>
public event Action OnOpen;
/// <summary>
/// websocket
/// </summary>
private WebSocket _ws;
/// <summary>
/// ping
/// </summary>
private string _pingThradId;
/// <summary>
/// ping失败次数
/// </summary>
public int PingFailureCount { get; set; }
/// <summary>
/// 重新连接次数
/// </summary>
public int ReconnectionCount { get; set; }
/// <summary>
/// 连接ID
/// </summary>
public string ConnectId { get; set; } = Guid.NewGuid().ToString("N");
/// <summary>
/// 连接服务器
/// </summary>
private bool Connect()
{
try
{
if (IsConnect)
{
return true;
}
if (string.IsNullOrWhiteSpace(Url))
{
return false;
}
_ws = new WebSocket(this.Url + "/api/push?id=" + this.ConnectId);
_ws.OnOpen += _ws_OnOpen;
_ws.OnClose += _ws_OnClose;
_ws.OnMessage += _ws_OnMessage;
_ws.OnError += _ws_OnError;
_ws.Connect();
Thread.Sleep(1000);
if (!IsConnect)
{
return false;
}
//连接成功
StartPing();
}
catch (Exception e)
{
// _log.Error(e);
}
return true;
}
/// <summary>
/// 错误信息
/// </summary>
/// <param name="sender"></param>
/// <param name="e"></param>
/// <exception cref="NotImplementedException"></exception>
private void _ws_OnError(object sender, ErrorEventArgs e)
{
//_log.Warn(e.Exception, "知易云服务连接断开" + e.Message);
this.IsConnect = false;
OnReconnection();
} }
/// <summary> /// <summary>
/// 连接断开 /// 连接断开
/// </summary> /// </summary>
/// <param name="sender"></param> /// <param name="sender"></param>
/// <param name="e"></param> /// <param name="e"></param>
private void _websocket_OnClose(object sender, CloseEventArgs e) private void _ws_OnClose(object sender, CloseEventArgs e)
{ {
//断开自动重连 // _log.Warn("知易云服务连接断开");
this.Connect(); this.IsConnect = false;
OnReconnection();
} }
/// <summary> /// <summary>
/// 出现连接错误 /// 连接打开
/// </summary> /// </summary>
/// <param name="sender"></param> /// <param name="sender"></param>
/// <param name="e"></param> /// <param name="e"></param>
private void _websocket_OnError(object sender, ErrorEventArgs e) private void _ws_OnOpen(object sender, EventArgs e)
{ {
//断开自动重连 // _log.Info("知易云服务连接成功");
this.Connect(); this.IsConnect = true;
OnOpen?.Invoke();
} }
/// <summary> /// <summary>
/// 收到消息事件 /// 启动Ping
/// </summary> /// </summary>
public event EventHandler<AckNoticeData> OnMessageEvent; public void StartPing()
/// <summary>
/// 连接成功事件
/// </summary>
public event EventHandler<EventArgs> OnConnectedEvent;
private void _websocket_OnMessage(object sender, WebSocketSharp.MessageEventArgs e)
{ {
try // _log.Info("启动心跳检测");
var tr = new Thread(() =>
{ {
var index = e.Data.IndexOf("\u0000", StringComparison.Ordinal); var id = _pingThradId = Guid.NewGuid().ToString();
while (id == _pingThradId)
if (index <= -1) {
Thread.Sleep(1000 * 10);
if (_ws == null)
{ {
return; return;
} }
var commonId = e.Data.Substring(0, index);
if (commonId != "notice") return;
var body = e.Data.Substring(index + 1);
var data = JsonConvert.DeserializeObject<AckNoticeData>(body);
if (data == null || string.IsNullOrEmpty(data.Id)) return;
this._websocket.Send("notice_ack\u0000" + data.Id);
OnMessageEvent?.Invoke(this, data);
}
catch (Exception ex)
{
Console.WriteLine($"即时通讯API接收异常{ex.Message}");
}
}
private Thread _thread;
private void _websocket_OnOpen(object sender, System.EventArgs e)
{
try try
{ {
foreach (var v in this._channels) if (_ws.ReadyState == WebSocketState.Open)
{ {
this._websocket.Send($"subchannel\u0000{v}");//订阅通道 _ws.Send("\u0001\u0002");
}
this.OnConnectedEvent?.Invoke(this, EventArgs.Empty);
}
catch (Exception ex)
{
Console.WriteLine($"即时通讯API连接异常:{ex.Message}");
}
}
private int PingFailureCount { get; set; }
private string _pingId = Guid.NewGuid().ToString();
private void Ping()
{
var id = Guid.NewGuid().ToString();
_pingId = id;
while (_pingId == id)
{
Thread.Sleep(1000 * 10);
try
{
//测试ping是否正常
try
{
if (_websocket == null)
{
continue;
}
if (_websocket.ReadyState == WebSocketState.Open)
{
_websocket.Send("\u0001\u0002");
this.PingFailureCount = 0; this.PingFailureCount = 0;
} }
else else
{ {
@ -214,39 +298,125 @@ namespace Api.Framework
{ {
if (this.PingFailureCount > 5) if (this.PingFailureCount > 5)
{ {
Console.WriteLine("心跳异常连接断开"); this.IsConnect = false;
_websocket?.Close(); //_log.Error("心跳异常连接断开");
_ws?.Close();
} }
} }
} }
catch (Exception e) });
{ tr.IsBackground = true;
Console.WriteLine($"即时通讯API Ping异常:{e.Message}"); tr.Start();
}
}
// ReSharper disable once FunctionNeverReturns
} }
/// <summary> /// <summary>
/// 获取数据 /// 是否允许重新连接
/// </summary> /// </summary>
/// <param name="key"></param> public bool AllowReconnected { get; set; } = true;
/// <returns></returns> private bool _isBeginRconnection;
public string GetData(string key) private readonly object _lockReconnection = new object();
private void OnReconnection()
{ {
var http = new HttpHelper(); if (_isBeginRconnection || !AllowReconnected) return;
var html = http.GetHtml($"http://{this._host}/api/TransferData/Get?key={key}&rd={DateTime.Now.Ticks}").Html; lock (_lockReconnection)
var rest = JObject.Parse(html); {
//if (!(rest["Ok"] ?? false).Value<bool>() throw new Exception(html); if (_isBeginRconnection || !AllowReconnected) return;
//return (rest["Data"] ?? string.Empty).Value<string>(); _onReconnection();
}
}
private void _onReconnection()
{
if (_isBeginRconnection || !AllowReconnected) return;
try
{
_isBeginRconnection = true;
//_log.Debug("重连中...");
while (true)
{
Thread.Sleep(1000 * 5);
if (AllowReconnected && !IsConnect)
{
this.ReconnectionCount++;
if (Connect())
{
// _log.Debug("重连成功");
break;
}
}
}
}
finally
{
_isBeginRconnection = false;
}
}
/// <summary>
/// 收到消息
/// </summary>
/// <param name="sender"></param>
/// <param name="e"></param>
/// <exception cref="NotImplementedException"></exception>
private void _ws_OnMessage(object sender, MessageEventArgs e)
{
if (e.IsText)
{
var index = e.Data.IndexOf("\u0000", StringComparison.Ordinal);
if (index <= -1)
{
return;
}
//EventClient.OnEvent($"获取淘客域名数据", $"{html}"); LogHelper.GetSingleObj().Info("接收到通知",$"{e.Data}");
if (!(rest["Ok"] ?? false).Value<bool>()) var commonId = e.Data.Substring(0, index);
throw new Exception(html); var body = e.Data.Substring(index + 1);
return (rest["Data"] ?? string.Empty).Value<string>(); if (commonId == "notice")//收到通知
{
var ackNoticeData = Newtonsoft.Json.JsonConvert.DeserializeObject<AckNoticeData>(body);
if (ackNoticeData == null)
{
return;
}
if (ackNoticeData.IsRemove)
{
return;
}
this.CloudNoticeEvent?.Invoke(this, new CloudNoticeEventArgs()
{
Id = ackNoticeData.Id,
Body = ackNoticeData.Data,
CommonId = ackNoticeData.CommonId,
CreateDateTime = ackNoticeData.CreateDateTime,
Channel = ackNoticeData.Channel
});
_ws.Send("notice_ack\u0000" + ackNoticeData.Id);//回复确认标示
return;
}
}
this.OnMessage?.Invoke(e);
}
/// <summary>
/// 发送字节
/// </summary>
/// <param name="bytes"></param>
public void SendBytes(byte[] bytes)
{
if (IsConnect)
{
_ws.Send(bytes);
}
}
/// <summary>
/// 发送字符串
/// </summary>
/// <param name="context"></param>
public void SendString(string context)
{
if (IsConnect)
{
_ws.Send(context);
}
} }
} }