ntminer 5 years ago
parent
commit
094cf96339

+ 2 - 8
src/NTMinerServer/Core/Messages.cs

@@ -148,17 +148,11 @@ namespace NTMiner.Core {
     }
 
     [MessageType(description: "收到了SpeedData Mq消息后")]
-    public class SpeedDataMqMessage : EventBase {
-        public SpeedDataMqMessage(string appId, SpeedData speedData, string minerIp, DateTime timestamp) {
-            this.AppId = appId;
-            this.Timestamp = timestamp;
-            this.SpeedData = speedData;
+    public class SpeedDataMqMessage : MinerClientMqMessage {
+        public SpeedDataMqMessage(string appId, Guid clientId, string minerIp, DateTime timestamp) : base(appId, clientId, timestamp) {
             this.MinerIp = minerIp;
         }
 
-        public string AppId { get; private set; }
-        public DateTime Timestamp { get; private set; }
-        public SpeedData SpeedData { get; private set; }
         public string MinerIp { get; private set; }
     }
 

+ 0 - 11
src/NTMinerServer/Core/Mq/OperationMqBodyUtil.cs

@@ -1,6 +1,5 @@
 using NTMiner.Core.MinerClient;
 using NTMiner.Core.MinerServer;
-using NTMiner.Report;
 using NTMiner.VirtualMemory;
 using System;
 using System.Collections.Generic;
@@ -142,16 +141,6 @@ namespace NTMiner.Core.Mq {
         }
         #endregion
 
-        #region Speed
-        public static byte[] GetSpeedMqSendBody(SpeedData data) {
-            return Encoding.UTF8.GetBytes(VirtualRoot.JsonSerializer.Serialize(data));
-        }
-        public static SpeedData GetSpeedMqReceiveBody(byte[] body) {
-            string json = Encoding.UTF8.GetString(body);
-            return VirtualRoot.JsonSerializer.Deserialize<SpeedData>(json);
-        }
-        #endregion
-
         #region SwitchRadeonGpu
         public static byte[] GetSwitchRadeonGpuMqSendBody(bool on) {
             return Encoding.UTF8.GetBytes(VirtualRoot.JsonSerializer.Serialize(on.ToString()));

+ 0 - 0
src/WebApiServer/Core/Redis/ISpeedDataRedis.cs → src/NTMinerServer/Core/Redis/ISpeedDataRedis.cs


+ 0 - 0
src/WebApiServer/Core/Redis/Impl/SpeedDataRedis.cs → src/NTMinerServer/Core/Redis/Impl/SpeedDataRedis.cs


+ 2 - 0
src/NTMinerServer/NTMinerServer.csproj

@@ -52,6 +52,8 @@
     <Compile Include="Core\Mq\MqMessagePaths\AbstractMqMessagePath`3.cs" />
     <Compile Include="Core\Mq\MqMessagePaths\AbstractMqMessagePath`2.cs" />
     <Compile Include="Core\Mq\MqMessagePaths\ReadOnlyUserMqMessagePath.cs" />
+    <Compile Include="Core\Redis\Impl\SpeedDataRedis.cs" />
+    <Compile Include="Core\Redis\ISpeedDataRedis.cs" />
     <Compile Include="ServerAppType.cs" />
     <Compile Include="ServerRoot.cs" />
     <Compile Include="MinerSignExtensions.cs" />

+ 2 - 2
src/WebApiServer/Controllers/ReportController.cs

@@ -11,7 +11,7 @@ namespace NTMiner.Controllers {
                 if (speedData == null) {
                     return ResponseBase.InvalidInput<ReportResponse>();
                 }
-                WebApiRoot.ClientDataSet.ReportSpeed(speedData, MinerIp);
+                WebApiRoot.ClientDataSet.ReportSpeed(speedData, MinerIp, isFromWsServerNode: false);
                 if (Version.TryParse(speedData.Version, out Version version)) {
                     string jsonVersionKey = HomePath.GetServerJsonVersion(version);
                     var response = ReportResponse.Ok(WebApiRoot.GetServerStateResponse(jsonVersionKey));
@@ -34,7 +34,7 @@ namespace NTMiner.Controllers {
         [HttpPost]
         public void ReportState([FromBody]ReportState request) {
             try {
-                WebApiRoot.ClientDataSet.ReportState(request, MinerIp);
+                WebApiRoot.ClientDataSet.ReportState(request, MinerIp, isFromWsServerNode: false);
             }
             catch (Exception e) {
                 Logger.ErrorDebugLine(e);

+ 2 - 2
src/WebApiServer/Core/IClientDataSet.cs

@@ -6,7 +6,7 @@ namespace NTMiner.Core {
         /// 该集合的成员是异步从redis中加载数据初始化的,所以有了这个IsReadied属性。
         /// </summary>
         bool IsReadied { get; }
-        void ReportSpeed(SpeedData speedData, string clientIp);
-        void ReportState(ReportState state, string clientIp);
+        void ReportSpeed(SpeedData speedData, string clientIp, bool isFromWsServerNode);
+        void ReportState(ReportState state, string clientIp, bool isFromWsServerNode);
     }
 }

+ 6 - 6
src/WebApiServer/Core/Impl/ClientDataSet.cs

@@ -41,14 +41,16 @@ namespace NTMiner.Core.Impl {
                 if (message.AppId == ServerRoot.HostConfig.ThisServerAddress) {
                     return;
                 }
-                if (message.SpeedData == null) {
+                if (message.ClientId == Guid.Empty) {
                     return;
                 }
                 if (IsOldMqMessage(message.Timestamp)) {
                     Write.UserOk(_safeIgnoreMessage);
                     return;
                 }
-                ReportSpeed(message.SpeedData, message.MinerIp);
+                speedDataRedis.GetByClientIdAsync(message.ClientId).ContinueWith(t => {
+                    ReportSpeed(t.Result, message.MinerIp, isFromWsServerNode: true);
+                });
             }, this.GetType());
             VirtualRoot.AddEventPath<MinerClientWsOpenedMqMessage>("收到MinerClientWsOpenedMq消息后更新NetActiveOn和IsOnline", LogEnum.None, action: message => {
                 if (IsOldMqMessage(message.Timestamp)) {
@@ -105,7 +107,7 @@ namespace NTMiner.Core.Impl {
             return false;
         }
 
-        public void ReportSpeed(SpeedData speedData, string minerIp) {
+        public void ReportSpeed(SpeedData speedData, string minerIp, bool isFromWsServerNode) {
             if (!IsReadied) {
                 return;
             }
@@ -115,7 +117,6 @@ namespace NTMiner.Core.Impl {
             if (string.IsNullOrEmpty(minerIp)) {
                 return;
             }
-            bool isFromWsServerNode = minerIp.Contains(':');
             if (!isFromWsServerNode) {
                 _speedDataRedis.SetAsync(speedData);
             }
@@ -132,7 +133,7 @@ namespace NTMiner.Core.Impl {
             }
         }
 
-        public void ReportState(ReportState state, string minerIp) {
+        public void ReportState(ReportState state, string minerIp, bool isFromWsServerNode) {
             if (!IsReadied) {
                 return;
             }
@@ -153,7 +154,6 @@ namespace NTMiner.Core.Impl {
                     DoUpdateSave(MinerData.Create(clientData));
                 }
             }
-            bool isFromWsServerNode = minerIp.Contains(':');
             if (!isFromWsServerNode) {
                 var speedData = clientData.ToSpeedData();
                 _speedDataRedis.SetAsync(speedData);

+ 2 - 2
src/WebApiServer/Core/Mq/MqMessagePaths/MinerClientMqMessagePath.cs

@@ -23,11 +23,11 @@ namespace NTMiner.Core.Mq.MqMessagePaths {
             switch (ea.RoutingKey) {
                 // 上报的算力放在这里消费,因为只有WebApiServer消费该类型的消息,WsServer不消费该类型的消息
                 case MqKeyword.SpeedRoutingKey: {
-                        SpeedData speedData = OperationMqBodyUtil.GetSpeedMqReceiveBody(ea.Body);
+                        Guid clientId = MinerClientMqBodyUtil.GetClientIdMqReciveBody(ea.Body);
                         DateTime timestamp = Timestamp.FromTimestamp(ea.BasicProperties.Timestamp.UnixTime);
                         string appId = ea.BasicProperties.AppId;
                         string minerIp = ea.BasicProperties.ReadHeaderString(MqKeyword.MinerIpHeaderName);
-                        VirtualRoot.RaiseEvent(new SpeedDataMqMessage(appId, speedData, minerIp, timestamp));
+                        VirtualRoot.RaiseEvent(new SpeedDataMqMessage(appId, clientId, minerIp, timestamp));
                     }
                     break;
                 case MqKeyword.MinerClientWsOpenedRoutingKey: {

+ 0 - 2
src/WebApiServer/WebApiServer.csproj

@@ -151,9 +151,7 @@
     <Compile Include="Core\Redis\IMinerRedis.cs" />
     <Compile Include="Core\Redis\Impl\CaptchaRedis.cs" />
     <Compile Include="Core\Redis\Impl\MinerRedis.cs" />
-    <Compile Include="Core\Redis\Impl\SpeedDataRedis.cs" />
     <Compile Include="Core\Redis\Impl\UserRedis.cs" />
-    <Compile Include="Core\Redis\ISpeedDataRedis.cs" />
     <Compile Include="Core\Redis\IUserRedis.cs" />
     <Compile Include="UserDataExtensions.cs" />
     <Compile Include="WebApiRoot.cs" />

+ 1 - 0
src/WsServer/Core/Mq/Senders/IMinerClientMqSender.cs

@@ -3,6 +3,7 @@ using System;
 
 namespace NTMiner.Core.Mq.Senders {
     public interface IMinerClientMqSender : IMqSender {
+        void SendSpeed(string loginName, Guid clientId, string minerIp);
         void SendMinerClientWsOpened(string loginName, Guid clientId);
         void SendMinerClientWsClosed(string loginName, Guid clientId);
         void SendMinerClientWsBreathed(string loginName, Guid clientId);

+ 0 - 2
src/WsServer/Core/Mq/Senders/IOperationMqSender.cs

@@ -1,6 +1,5 @@
 using NTMiner.Core.MinerClient;
 using NTMiner.Core.MinerServer;
-using NTMiner.Report;
 using NTMiner.VirtualMemory;
 using System;
 using System.Collections.Generic;
@@ -26,7 +25,6 @@ namespace NTMiner.Core.Mq.Senders {
         void SendOperationReceived(string loginName, Guid clientId);
 
         void SendGetSpeed(string loginName, List<Guid> clientIds);
-        void SendSpeed(string loginName, SpeedData speedData, string minerIp);
 
         void SendEnableRemoteDesktop(string loginName, Guid clientId);
         void SendBlockWAU(string loginName, Guid clientId);

+ 13 - 0
src/WsServer/Core/Mq/Senders/Impl/MinerClientMqSender.cs

@@ -10,6 +10,19 @@ namespace NTMiner.Core.Mq.Senders.Impl {
             _mqChannel = mqChannel;
         }
 
+        public void SendSpeed(string loginName, Guid clientId, string minerIp) {
+            if (string.IsNullOrEmpty(loginName) || clientId == Guid.Empty || string.IsNullOrEmpty(minerIp)) {
+                return;
+            }
+            var basicProperties = CreateBasicProperties(loginName);
+            basicProperties.Headers[MqKeyword.MinerIpHeaderName] = minerIp;
+            _mqChannel.BasicPublish(
+                exchange: MqKeyword.NTMinerExchange,
+                routingKey: MqKeyword.SpeedRoutingKey,
+                basicProperties: basicProperties,
+                body: MinerClientMqBodyUtil.GetClientIdMqSendBody(clientId));
+        }
+
         public void SendMinerClientWsOpened(string loginName, Guid clientId) {
             if (string.IsNullOrEmpty(loginName) || clientId == Guid.Empty) {
                 return;

+ 0 - 14
src/WsServer/Core/Mq/Senders/Impl/OperationMqSender.cs

@@ -1,6 +1,5 @@
 using NTMiner.Core.MinerClient;
 using NTMiner.Core.MinerServer;
-using NTMiner.Report;
 using NTMiner.VirtualMemory;
 using RabbitMQ.Client;
 using System;
@@ -146,19 +145,6 @@ namespace NTMiner.Core.Mq.Senders.Impl {
                 body: OperationMqBodyUtil.GetGetSpeedMqSendBody(clientIds));
         }
 
-        public void SendSpeed(string loginName, SpeedData speedData, string minerIp) {
-            if (string.IsNullOrEmpty(loginName) || speedData == null || string.IsNullOrEmpty(minerIp)) {
-                return;
-            }
-            var basicProperties = CreateBasicProperties(loginName);
-            basicProperties.Headers[MqKeyword.MinerIpHeaderName] = minerIp;
-            _mqChannel.BasicPublish(
-                exchange: MqKeyword.NTMinerExchange,
-                routingKey: MqKeyword.SpeedRoutingKey,
-                basicProperties: basicProperties,
-                body: OperationMqBodyUtil.GetSpeedMqSendBody(speedData));
-        }
-
         public void SendEnableRemoteDesktop(string loginName, Guid clientId) {
             if (string.IsNullOrEmpty(loginName) || clientId == Guid.Empty) {
                 return;

+ 4 - 2
src/WsServer/MinerClientWsMessageHandler.cs

@@ -8,7 +8,7 @@ using System.Collections.Generic;
 
 namespace NTMiner {
     public static class MinerClientWsMessageHandler {
-        private static readonly Dictionary<string, Action<MinerClientBehavior, string, Guid, WsMessage>> 
+        private static readonly Dictionary<string, Action<MinerClientBehavior, string, Guid, WsMessage>>
             _handlers = new Dictionary<string, Action<MinerClientBehavior, string, Guid, WsMessage>>(StringComparer.OrdinalIgnoreCase) {
             {WsMessage.ConsoleOutLines,
                 (wsBehavior, loginName, clientId, message) => {
@@ -53,7 +53,9 @@ namespace NTMiner {
             {WsMessage.Speed,
                 (wsBehavior, loginName, clientId, message) => {
                     if (message.TryGetData(out SpeedData speedData)) {
-                        WsRoot.OperationMqSender.SendSpeed(loginName, speedData, wsBehavior.Context.UserEndPoint.ToString());
+                        WsRoot.SpeedDataRedis.SetAsync(speedData).ContinueWith(t => {
+                            WsRoot.MinerClientMqSender.SendSpeed(loginName, speedData.ClientId, wsBehavior.Context.UserEndPoint.ToString());
+                        });
                     }
                 }
             },

+ 3 - 0
src/WsServer/WsRoot.cs

@@ -3,6 +3,7 @@ using NTMiner.Core.Impl;
 using NTMiner.Core.Mq.MqMessagePaths;
 using NTMiner.Core.Mq.Senders;
 using NTMiner.Core.Mq.Senders.Impl;
+using NTMiner.Core.Redis;
 using NTMiner.Core.Redis.Impl;
 using NTMiner.User;
 using System;
@@ -44,6 +45,7 @@ namespace NTMiner {
         public static IReadOnlyUserSet ReadOnlyUserSet { get; private set; }
         public static IMinerSignSet MinerSignSet { get; private set; }
         public static IWsServerNodeAddressSet WsServerNodeAddressSet { get; private set; }
+        public static ISpeedDataRedis SpeedDataRedis { get; private set; }
         public static IMinerClientMqSender MinerClientMqSender { get; private set; }
         public static IOperationMqSender OperationMqSender { get; private set; }
         public static IUserMqSender UserMqSender { get; private set; }
@@ -76,6 +78,7 @@ namespace NTMiner {
                 return;
             }
             MinerClientMqSender = new MinerClientMqSender(_serverContext.Channel);
+            SpeedDataRedis = new SpeedDataRedis(_serverContext.RedisConn);
             OperationMqSender = new OperationMqSender(_serverContext.Channel);
             UserMqSender = new UserMqSender(_serverContext.Channel);
             var minerRedis = new ReadOnlyMinerRedis(_serverContext.RedisConn);