// ApolloCfgSource.cs
  1. using System.Collections.Concurrent;
  2. using System.Net.Http.Json;
  3. using System.Security.Cryptography;
  4. using System.Text;
  5. using System.Text.Json;
  6. using System.Web;
  7. using Apq.Cfg.Sources;
  8. using Microsoft.Extensions.Configuration;
  9. using Microsoft.Extensions.Primitives;
  10. namespace Apq.Cfg.Apollo;
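  /// <summary>
  /// Apollo configuration source: loads configuration from an Apollo server over HTTP and
  /// exposes it to Microsoft.Extensions.Configuration. The source is read-only.
  /// </summary>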
  14. internal sealed class ApolloCfgSource : IWritableCfgSource, IDisposable
  15. {
  // Settings supplied by the caller (server address, app id, namespaces, timeouts, ...).
  private readonly ApolloCfgOptions _options;

  // Client used for regular configuration fetches.
  private readonly HttpClient _httpClient;

  // Separate client for the long-polling notification requests; it is given a longer timeout.
  private readonly HttpClient _longPollingClient;

  // Flattened configuration data (key -> value).
  private readonly ConcurrentDictionary<string, string?> _data;

  // Last notification id recorded for each namespace.
  private readonly ConcurrentDictionary<string, long> _notificationIds;

  // Cancelled by Dispose to stop the background watcher.
  private readonly CancellationTokenSource _disposeCts;

  // Guards reads and swaps of _reloadToken.
  private readonly object _reloadTokenLock = new object();

  // Current change token; replaced each time a reload is signalled.
  private ConfigurationReloadToken _reloadToken;

  // Background long-polling loop; stays null when hot reload is disabled.
  private Task? _watchTask;

  // Set once Dispose has run.
  private volatile bool _disposed;
  /// <summary>
  /// Creates the source, performs the initial (blocking) configuration load and, when
  /// <c>options.EnableHotReload</c> is set, starts the background change watcher.
  /// </summary>
  /// <param name="options">Apollo connection settings.</param>
  /// <param name="level">Value exposed through <see cref="Level"/>.</param>
  /// <param name="isPrimaryWriter">Value exposed through <see cref="IsPrimaryWriter"/>.</param>
  /// <exception cref="ArgumentNullException"><paramref name="options"/> is null.</exception>
  public ApolloCfgSource(ApolloCfgOptions options, int level, bool isPrimaryWriter)
  {
      // Fail fast with a descriptive exception instead of a NullReferenceException further down.
      _options = options ?? throw new ArgumentNullException(nameof(options));
      Level = level;
      IsPrimaryWriter = isPrimaryWriter;

      // Configuration keys are case-insensitive in Microsoft.Extensions.Configuration, and the
      // provider's TryGet reads this dictionary directly, so lookups must ignore case as well.
      _data = new ConcurrentDictionary<string, string?>(StringComparer.OrdinalIgnoreCase);
      _notificationIds = new ConcurrentDictionary<string, long>();
      _disposeCts = new CancellationTokenSource();
      _reloadToken = new ConfigurationReloadToken();

      _httpClient = new HttpClient
      {
          Timeout = options.ConnectTimeout
      };

      // The long-polling client gets 10 extra seconds on top of the configured long-polling timeout.
      _longPollingClient = new HttpClient
      {
          Timeout = options.LongPollingTimeout + TimeSpan.FromSeconds(10)
      };

      // Every namespace starts with notification id -1.
      foreach (var ns in options.Namespaces)
      {
          _notificationIds[ns] = -1;
      }

      // Initial load. A constructor cannot await, so this blocks until the load has finished.
      LoadDataAsync().GetAwaiter().GetResult();

      // Start watching for changes only when hot reload was requested.
      if (options.EnableHotReload)
      {
          _watchTask = WatchForChangesAsync(_disposeCts.Token);
      }
  }
  /// <summary>Level value passed to the constructor.</summary>
  public int Level { get; }

  /// <summary>
  /// Always <c>false</c>: Apollo does not support writing configuration through its API.
  /// </summary>
  public bool IsWriteable
  {
      get { return false; }
  }

  /// <summary>Primary-writer flag passed to the constructor.</summary>
  public bool IsPrimaryWriter { get; }
  /// <summary>
  /// Stops the background watcher and releases the cancellation source and both HTTP clients.
  /// Calls after the first one return immediately.
  /// </summary>
  public void Dispose()
  {
      if (_disposed)
      {
          return;
      }

      _disposed = true;

      // Signal the watcher loop to stop, then wait up to two seconds for it to finish.
      _disposeCts.Cancel();
      try
      {
          Task? watcher = _watchTask;
          if (watcher != null)
          {
              watcher.Wait(TimeSpan.FromSeconds(2));
          }
      }
      catch
      {
          // A failed or cancelled watcher task is ignored during shutdown.
      }

      _disposeCts.Dispose();
      _httpClient.Dispose();
      _longPollingClient.Dispose();
  }
  /// <summary>
  /// Creates an <see cref="IConfigurationSource"/> wrapper around this instance.
  /// </summary>
  /// <returns>A new configuration source backed by this object.</returns>
  /// <exception cref="ObjectDisposedException">This instance has already been disposed.</exception>
  public IConfigurationSource BuildSource()
  {
      if (_disposed)
      {
          throw new ObjectDisposedException(nameof(ApolloCfgSource));
      }

      IConfigurationSource source = new ApolloConfigurationSource(this);
      return source;
  }
  /// <summary>
  /// Not supported: Apollo does not allow configuration to be written through its API.
  /// </summary>
  /// <param name="changes">Ignored.</param>
  /// <param name="cancellationToken">Ignored.</param>
  /// <exception cref="NotSupportedException">Always thrown, synchronously.</exception>
  public Task ApplyChangesAsync(IReadOnlyDictionary<string, string?> changes, CancellationToken cancellationToken)
      => throw new NotSupportedException("Apollo 配置源不支持写入操作,请通过 Apollo 管理界面修改配置");
  /// <summary>
  /// Fetches every configured namespace and merges the result into <c>_data</c>.
  /// </summary>
  /// <remarks>
  /// The new values are collected in a local dictionary first, so readers never observe an
  /// emptied store while a reload is in flight. Keys that are no longer present are removed only
  /// when every namespace was fetched successfully; if any fetch failed, existing values are kept
  /// so that a transient outage does not wipe the loaded configuration.
  /// This method never throws: failures leave the current data in place.
  /// </remarks>
  private async Task LoadDataAsync()
  {
      try
      {
          var namespaces = _options.Namespaces;
          var hasMultipleNamespaces = namespaces.Length > 1;
          var fresh = new Dictionary<string, string?>();
          var allLoaded = true;

          foreach (var ns in namespaces)
          {
              var config = await GetConfigAsync(ns).ConfigureAwait(false);
              if (config == null)
              {
                  // The fetch failed; remember that so stale-key removal is skipped below.
                  allLoaded = false;
                  continue;
              }

              // With several namespaces, keys are prefixed with the namespace name
              // (the "application" namespace is the exception and stays unprefixed).
              var usePrefix = hasMultipleNamespaces && ns != "application";
              foreach (var (key, value) in config)
              {
                  var configKey = usePrefix ? $"{ns}:{key}" : key;
                  fresh[configKey] = value;
              }
          }

          if (allLoaded)
          {
              // Drop keys that disappeared on the server. This runs before the upsert so the
              // outcome is the same whichever key comparer the store was created with.
              foreach (var existingKey in _data.Keys)
              {
                  if (!fresh.ContainsKey(existingKey))
                  {
                      _data.TryRemove(existingKey, out _);
                  }
              }
          }

          foreach (var (key, value) in fresh)
          {
              _data[key] = value;
          }
      }
      catch
      {
          // Best effort: on any failure keep whatever data is currently loaded.
      }
  }
  /// <summary>
  /// Downloads the configuration of a single namespace.
  /// </summary>
  /// <param name="namespaceName">Namespace to fetch.</param>
  /// <returns>
  /// The key/value pairs of the namespace with <c>.</c> in key names replaced by <c>:</c>,
  /// or <c>null</c> when the request fails, returns a non-success status, or the body cannot
  /// be parsed.
  /// </returns>
  private async Task<Dictionary<string, string?>?> GetConfigAsync(string namespaceName)
  {
      var url = BuildConfigUrl(namespaceName);
      try
      {
          using var request = new HttpRequestMessage(HttpMethod.Get, url);
          AddAuthorizationHeader(request, url);

          // Dispose the response so its connection and content buffers are released promptly.
          using var response = await _httpClient.SendAsync(request).ConfigureAwait(false);
          if (!response.IsSuccessStatusCode)
          {
              return null;
          }

          var json = await response.Content.ReadAsStringAsync().ConfigureAwait(false);
          using var doc = JsonDocument.Parse(json);

          var result = new Dictionary<string, string?>();

          // Expected response shape: { "configurations": { "key": "value" }, ... }
          if (doc.RootElement.TryGetProperty("configurations", out var configurations))
          {
              foreach (var prop in configurations.EnumerateObject())
              {
                  var value = prop.Value.ValueKind switch
                  {
                      JsonValueKind.String => prop.Value.GetString(),
                      // A JSON null is a missing value, not the four-character text "null".
                      JsonValueKind.Null => null,
                      _ => prop.Value.GetRawText(),
                  };

                  // Convert the '.' separator into the ':' separator used for configuration keys.
                  result[prop.Name.Replace('.', ':')] = value;
              }
          }

          return result;
      }
      catch
      {
          // Best effort: any transport or parsing failure is reported to the caller as null.
          return null;
      }
  }
  /// <summary>
  /// Builds the configuration URL for a namespace:
  /// <c>{MetaServer}/configs/{AppId}/{Cluster}/{namespace}</c>.
  /// </summary>
  /// <param name="namespaceName">Namespace to address.</param>
  /// <returns>The absolute URL with each path segment percent-encoded.</returns>
  private string BuildConfigUrl(string namespaceName)
  {
      var metaServer = _options.MetaServer.TrimEnd('/');

      // These values are path segments, so they need percent-encoding. Form encoding
      // (HttpUtility.UrlEncode) would turn a space into '+', which is a literal plus in a path.
      // Null is mapped to an empty segment, matching what the form encoder produced for null.
      var appId = Uri.EscapeDataString(_options.AppId ?? string.Empty);
      var cluster = Uri.EscapeDataString(_options.Cluster ?? string.Empty);
      var ns = Uri.EscapeDataString(namespaceName ?? string.Empty);

      return $"{metaServer}/configs/{appId}/{cluster}/{ns}";
  }
  /// <summary>
  /// Background loop: polls for change notifications and, when something changed, reloads the
  /// data and signals the reload token. Runs until <paramref name="cancellationToken"/> is
  /// cancelled.
  /// </summary>
  /// <param name="cancellationToken">Token that stops the loop.</param>
  private async Task WatchForChangesAsync(CancellationToken cancellationToken)
  {
      while (!cancellationToken.IsCancellationRequested)
      {
          try
          {
              var hasChanges = await CheckForNotificationsAsync(cancellationToken).ConfigureAwait(false);
              if (hasChanges)
              {
                  await LoadDataAsync().ConfigureAwait(false);
                  OnReload();
              }
          }
          catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
          {
              // Genuine shutdown request.
              break;
          }
          catch
          {
              // Any other failure is retried after a pause. That includes an HttpClient timeout,
              // which also surfaces as an OperationCanceledException (TaskCanceledException)
              // and must not end the watcher.
              try
              {
                  await Task.Delay(TimeSpan.FromSeconds(5), cancellationToken).ConfigureAwait(false);
              }
              catch (OperationCanceledException)
              {
                  break;
              }
          }
      }
  }
  /// <summary>
  /// Issues one long-polling request to <c>/notifications/v2</c> and records the notification
  /// ids returned by the server.
  /// </summary>
  /// <param name="cancellationToken">Token that aborts the request.</param>
  /// <returns>
  /// <c>true</c> when the response listed at least one namespace; <c>false</c> when the server
  /// answered 304 Not Modified or returned no usable entries.
  /// </returns>
  /// <exception cref="HttpRequestException">
  /// The server answered with a non-success status other than 304. Throwing here, not
  /// returning <c>false</c>, lets the caller's retry delay apply, so a failing server is not
  /// polled in a tight loop.
  /// </exception>
  private async Task<bool> CheckForNotificationsAsync(CancellationToken cancellationToken)
  {
      var notifications = _notificationIds.Select(kv => new
      {
          namespaceName = kv.Key,
          notificationId = kv.Value
      }).ToArray();
      var notificationsJson = JsonSerializer.Serialize(notifications);

      var metaServer = _options.MetaServer.TrimEnd('/');
      var url = $"{metaServer}/notifications/v2?" +
                $"appId={HttpUtility.UrlEncode(_options.AppId)}&" +
                $"cluster={HttpUtility.UrlEncode(_options.Cluster)}&" +
                $"notifications={HttpUtility.UrlEncode(notificationsJson)}";

      using var request = new HttpRequestMessage(HttpMethod.Get, url);
      AddAuthorizationHeader(request, url);

      // Dispose the response so its connection and content buffers are released promptly.
      using var response = await _longPollingClient.SendAsync(request, cancellationToken).ConfigureAwait(false);
      if (response.StatusCode == System.Net.HttpStatusCode.NotModified)
      {
          return false;
      }

      // Anything else that is not a success is an error, not "no changes".
      response.EnsureSuccessStatusCode();

      var json = await response.Content.ReadAsStringAsync(cancellationToken).ConfigureAwait(false);
      using var doc = JsonDocument.Parse(json);

      var hasChanges = false;
      foreach (var item in doc.RootElement.EnumerateArray())
      {
          if (!item.TryGetProperty("namespaceName", out var nsElement) ||
              !item.TryGetProperty("notificationId", out var idElement))
          {
              continue;
          }

          var ns = nsElement.GetString();
          var id = idElement.GetInt64();
          if (ns != null)
          {
              // Remember the id so the next poll only reports newer changes.
              _notificationIds[ns] = id;
              hasChanges = true;
          }
      }

      return hasChanges;
  }
  /// <summary>
  /// Signs the request when a secret is configured; does nothing otherwise.
  /// </summary>
  /// <remarks>
  /// The signature is the Base64 HMAC-SHA1, keyed with the secret, of
  /// "<c>timestamp</c>\n<c>path-and-query</c>". It is sent in the <c>Authorization</c> header
  /// together with a matching <c>Timestamp</c> header.
  /// </remarks>
  /// <param name="request">Request that receives the headers.</param>
  /// <param name="url">Absolute URL of the request; its path and query are signed.</param>
  private void AddAuthorizationHeader(HttpRequestMessage request, string url)
  {
      var secret = _options.Secret;
      if (string.IsNullOrEmpty(secret))
      {
          return;
      }

      // Unix time in milliseconds (UTC).
      string unixMillis = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds().ToString();
      string target = new Uri(url).PathAndQuery;
      string payload = $"{unixMillis}\n{target}";

      string signature;
      using (var hmac = new HMACSHA1(Encoding.UTF8.GetBytes(secret)))
      {
          byte[] digest = hmac.ComputeHash(Encoding.UTF8.GetBytes(payload));
          signature = Convert.ToBase64String(digest);
      }

      request.Headers.Add("Authorization", $"Apollo {_options.AppId}:{signature}");
      request.Headers.Add("Timestamp", unixMillis);
  }
  /// <summary>
  /// Installs a fresh reload token and then fires the previous one, so listeners that
  /// re-register from inside their callback receive the new token.
  /// </summary>
  private void OnReload()
  {
      var replacement = new ConfigurationReloadToken();
      ConfigurationReloadToken expired;

      lock (_reloadTokenLock)
      {
          expired = _reloadToken;
          _reloadToken = replacement;
      }

      // Callbacks run outside the lock.
      expired.OnReload();
  }
  /// <summary>Throws when this instance has already been disposed.</summary>
  /// <exception cref="ObjectDisposedException">The instance was disposed.</exception>
  private void ThrowIfDisposed()
  {
      if (!_disposed)
      {
          return;
      }

      throw new ObjectDisposedException(nameof(ApolloCfgSource));
  }
  /// <summary>Returns the keys currently held in the configuration store.</summary>
  internal IEnumerable<string> GetAllKeys()
  {
      return _data.Keys;
  }
  /// <summary>Looks up a single key in the configuration store.</summary>
  /// <param name="key">Key to look up.</param>
  /// <param name="value">Receives the stored value when the key exists.</param>
  /// <returns><c>true</c> when the key exists; otherwise <c>false</c>.</returns>
  internal bool TryGetValue(string key, out string? value)
  {
      return _data.TryGetValue(key, out value);
  }
  /// <summary>Returns the current reload token.</summary>
  internal IChangeToken GetReloadToken()
  {
      IChangeToken current;
      lock (_reloadTokenLock)
      {
          current = _reloadToken;
      }

      return current;
  }
  /// <summary>
  /// Internal configuration source used to plug the Apollo source into
  /// Microsoft.Extensions.Configuration.
  /// </summary>
  private sealed class ApolloConfigurationSource : IConfigurationSource
  {
      private readonly ApolloCfgSource _owner;

      public ApolloConfigurationSource(ApolloCfgSource apolloSource) => _owner = apolloSource;

      /// <summary>
      /// Creates a provider backed by the owning Apollo source.
      /// <paramref name="builder"/> is not used.
      /// </summary>
      public IConfigurationProvider Build(IConfigurationBuilder builder)
          => new ApolloConfigurationProvider(_owner);
  }
  /// <summary>
  /// Internal configuration provider that exposes the data of an <see cref="ApolloCfgSource"/>
  /// and forwards its reload notifications.
  /// </summary>
  /// <remarks>
  /// <c>ConfigurationProvider.GetReloadToken</c> is not virtual. Hiding it with <c>new</c> has
  /// no effect on calls made through <c>IConfigurationProvider</c>, which is how the
  /// configuration root obtains the token. The provider therefore subscribes to the source's
  /// token and raises the base class's own token via <c>OnReload()</c>.
  /// </remarks>
  private sealed class ApolloConfigurationProvider : ConfigurationProvider, IDisposable
  {
      private readonly ApolloCfgSource _apolloSource;
      private readonly IDisposable _reloadSubscription;

      public ApolloConfigurationProvider(ApolloCfgSource apolloSource)
      {
          _apolloSource = apolloSource;

          // Whenever the source signals a change, refresh the Data snapshot (the base class
          // uses it for child-key enumeration) and notify listeners through the base token.
          _reloadSubscription = ChangeToken.OnChange(
              () => _apolloSource.GetReloadToken(),
              () =>
              {
                  Load();
                  OnReload();
              });
      }

      /// <summary>Copies the source's current keys and values into <c>Data</c>.</summary>
      public override void Load()
      {
          // Build the snapshot locally and publish it in one assignment, so concurrent readers
          // never see a half-filled dictionary.
          var snapshot = new Dictionary<string, string?>(StringComparer.OrdinalIgnoreCase);
          foreach (var key in _apolloSource.GetAllKeys())
          {
              if (_apolloSource.TryGetValue(key, out var value))
              {
                  snapshot[key] = value;
              }
          }

          Data = snapshot;
      }

      /// <summary>Reads a value directly from the source, bypassing the <c>Data</c> snapshot.</summary>
      public override bool TryGet(string key, out string? value)
      {
          return _apolloSource.TryGetValue(key, out value);
      }

      /// <summary>Stops listening to the source's reload notifications.</summary>
      public void Dispose()
      {
          _reloadSubscription.Dispose();
      }
  }
  296. }