// ApolloCfgSource.cs
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Globalization;
using System.Net.Http.Json;
using System.Security.Cryptography;
using System.Text;
using System.Text.Json;
using System.Web;
using Apq.Cfg.Sources;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Primitives;
  10. namespace Apq.Cfg.Apollo;
  11. /// <summary>
  12. /// Apollo 配置源
  13. /// </summary>
  14. internal sealed class ApolloCfgSource : IWritableCfgSource, IDisposable
  15. {
  16. private readonly ApolloCfgOptions _options;
  17. private readonly HttpClient _httpClient;
  18. private readonly HttpClient _longPollingClient;
  19. private readonly ConcurrentDictionary<string, string?> _data;
  20. private readonly ConcurrentDictionary<string, long> _notificationIds;
  21. private readonly CancellationTokenSource _disposeCts;
  22. private volatile bool _disposed;
  23. private ConfigurationReloadToken _reloadToken;
  24. private readonly object _reloadTokenLock = new();
  25. private Task? _watchTask;
  26. public ApolloCfgSource(ApolloCfgOptions options, int level, bool isPrimaryWriter, string? name = null)
  27. {
  28. _options = options;
  29. Level = level;
  30. IsPrimaryWriter = isPrimaryWriter;
  31. Name = name ?? $"Apollo:{options.AppId}";
  32. _data = new ConcurrentDictionary<string, string?>();
  33. _notificationIds = new ConcurrentDictionary<string, long>();
  34. _disposeCts = new CancellationTokenSource();
  35. _reloadToken = new ConfigurationReloadToken();
  36. _httpClient = new HttpClient
  37. {
  38. Timeout = options.ConnectTimeout
  39. };
  40. _longPollingClient = new HttpClient
  41. {
  42. Timeout = options.LongPollingTimeout + TimeSpan.FromSeconds(10)
  43. };
  44. // 初始化通知 ID
  45. foreach (var ns in options.Namespaces)
  46. {
  47. _notificationIds[ns] = -1;
  48. }
  49. // 初始加载
  50. LoadDataAsync().GetAwaiter().GetResult();
  51. // 启动热重载监听
  52. if (options.EnableHotReload)
  53. {
  54. _watchTask = WatchForChangesAsync(_disposeCts.Token);
  55. }
  56. }
  57. /// <inheritdoc />
  58. public int Level { get; }
  59. /// <inheritdoc />
  60. public string Name { get; set; }
  61. /// <inheritdoc />
  62. public string Type => nameof(ApolloCfgSource);
  63. /// <inheritdoc />
  64. public bool IsWriteable => false;
  65. /// <inheritdoc />
  66. public bool IsPrimaryWriter { get; }
  67. /// <inheritdoc />
  68. public int KeyCount => GetAllValues().Count();
  69. /// <inheritdoc />
  70. public int TopLevelKeyCount => GetAllValues()
  71. .Select(kv => kv.Key.Split(':')[0])
  72. .Distinct()
  73. .Count();
  74. /// <inheritdoc />
  75. public IEnumerable<KeyValuePair<string, string?>> GetAllValues()
  76. {
  77. return _data.ToArray();
  78. }
  79. /// <summary>
  80. /// 释放资源,取消所有异步操作并释放 HTTP 客户端
  81. /// </summary>
  82. public void Dispose()
  83. {
  84. if (_disposed) return;
  85. _disposed = true;
  86. _disposeCts.Cancel();
  87. try { _watchTask?.Wait(TimeSpan.FromSeconds(2)); }
  88. catch { }
  89. _disposeCts.Dispose();
  90. _httpClient.Dispose();
  91. _longPollingClient.Dispose();
  92. }
  93. /// <summary>
  94. /// 构建 Microsoft.Extensions.Configuration 的配置源
  95. /// </summary>
  96. /// <returns>Microsoft.Extensions.Configuration.IConfigurationSource 实例</returns>
  97. /// <exception cref="ObjectDisposedException">当对象已释放时抛出</exception>
  98. public IConfigurationSource BuildSource()
  99. {
  100. ThrowIfDisposed();
  101. return new ApolloConfigurationSource(this);
  102. }
  103. /// <summary>
  104. /// 应用配置更改(Apollo 不支持通过 API 写入配置)
  105. /// </summary>
  106. /// <param name="changes">要应用的配置更改</param>
  107. /// <param name="cancellationToken">取消令牌</param>
  108. /// <returns>表示异步操作的任务</returns>
  109. /// <exception cref="NotSupportedException">始终抛出,因为 Apollo 不支持通过 API 写入配置</exception>
  110. public Task ApplyChangesAsync(IReadOnlyDictionary<string, string?> changes, CancellationToken cancellationToken)
  111. {
  112. // Apollo 不支持通过 API 写入配置
  113. throw new NotSupportedException("Apollo 配置源不支持写入操作,请通过 Apollo 管理界面修改配置");
  114. }
  115. private async Task LoadDataAsync()
  116. {
  117. try
  118. {
  119. _data.Clear();
  120. foreach (var ns in _options.Namespaces)
  121. {
  122. var config = await GetConfigAsync(ns).ConfigureAwait(false);
  123. if (config != null)
  124. {
  125. foreach (var (key, value) in config)
  126. {
  127. // 如果有多个命名空间,使用命名空间作为前缀(application 除外)
  128. var configKey = _options.Namespaces.Length > 1 && ns != "application"
  129. ? $"{ns}:{key}"
  130. : key;
  131. _data[configKey] = value;
  132. }
  133. }
  134. }
  135. }
  136. catch
  137. {
  138. // 连接失败时保持空数据
  139. }
  140. }
  141. private async Task<Dictionary<string, string?>?> GetConfigAsync(string namespaceName)
  142. {
  143. var url = BuildConfigUrl(namespaceName);
  144. try
  145. {
  146. using var request = new HttpRequestMessage(HttpMethod.Get, url);
  147. AddAuthorizationHeader(request, url);
  148. var response = await _httpClient.SendAsync(request).ConfigureAwait(false);
  149. if (!response.IsSuccessStatusCode) return null;
  150. var json = await response.Content.ReadAsStringAsync().ConfigureAwait(false);
  151. using var doc = JsonDocument.Parse(json);
  152. var result = new Dictionary<string, string?>();
  153. // Apollo 返回格式: { "configurations": { "key": "value" }, ... }
  154. if (doc.RootElement.TryGetProperty("configurations", out var configurations))
  155. {
  156. foreach (var prop in configurations.EnumerateObject())
  157. {
  158. var value = prop.Value.ValueKind == JsonValueKind.String
  159. ? prop.Value.GetString()
  160. : prop.Value.GetRawText();
  161. // 将 . 分隔符转换为 : 分隔符
  162. var key = prop.Name.Replace('.', ':');
  163. result[key] = value;
  164. }
  165. }
  166. return result;
  167. }
  168. catch
  169. {
  170. return null;
  171. }
  172. }
  173. private string BuildConfigUrl(string namespaceName)
  174. {
  175. var metaServer = _options.MetaServer.TrimEnd('/');
  176. return $"{metaServer}/configs/{HttpUtility.UrlEncode(_options.AppId)}/{HttpUtility.UrlEncode(_options.Cluster)}/{HttpUtility.UrlEncode(namespaceName)}";
  177. }
  178. private async Task WatchForChangesAsync(CancellationToken cancellationToken)
  179. {
  180. while (!cancellationToken.IsCancellationRequested)
  181. {
  182. try
  183. {
  184. var hasChanges = await CheckForNotificationsAsync(cancellationToken).ConfigureAwait(false);
  185. if (hasChanges)
  186. {
  187. await LoadDataAsync().ConfigureAwait(false);
  188. OnReload();
  189. }
  190. }
  191. catch (OperationCanceledException)
  192. {
  193. break;
  194. }
  195. catch
  196. {
  197. // 连接失败,等待后重试
  198. try
  199. {
  200. await Task.Delay(TimeSpan.FromSeconds(5), cancellationToken).ConfigureAwait(false);
  201. }
  202. catch (OperationCanceledException)
  203. {
  204. break;
  205. }
  206. }
  207. }
  208. }
  209. private async Task<bool> CheckForNotificationsAsync(CancellationToken cancellationToken)
  210. {
  211. var notifications = _notificationIds.Select(kv => new
  212. {
  213. namespaceName = kv.Key,
  214. notificationId = kv.Value
  215. }).ToArray();
  216. var notificationsJson = JsonSerializer.Serialize(notifications);
  217. var metaServer = _options.MetaServer.TrimEnd('/');
  218. var url = $"{metaServer}/notifications/v2?" +
  219. $"appId={HttpUtility.UrlEncode(_options.AppId)}&" +
  220. $"cluster={HttpUtility.UrlEncode(_options.Cluster)}&" +
  221. $"notifications={HttpUtility.UrlEncode(notificationsJson)}";
  222. using var request = new HttpRequestMessage(HttpMethod.Get, url);
  223. AddAuthorizationHeader(request, url);
  224. var response = await _longPollingClient.SendAsync(request, cancellationToken).ConfigureAwait(false);
  225. if (response.StatusCode == System.Net.HttpStatusCode.NotModified)
  226. {
  227. return false;
  228. }
  229. if (!response.IsSuccessStatusCode)
  230. {
  231. return false;
  232. }
  233. var json = await response.Content.ReadAsStringAsync(cancellationToken).ConfigureAwait(false);
  234. using var doc = JsonDocument.Parse(json);
  235. var hasChanges = false;
  236. foreach (var item in doc.RootElement.EnumerateArray())
  237. {
  238. if (item.TryGetProperty("namespaceName", out var nsElement) &&
  239. item.TryGetProperty("notificationId", out var idElement))
  240. {
  241. var ns = nsElement.GetString();
  242. var id = idElement.GetInt64();
  243. if (ns != null)
  244. {
  245. _notificationIds[ns] = id;
  246. hasChanges = true;
  247. }
  248. }
  249. }
  250. return hasChanges;
  251. }
  252. private void AddAuthorizationHeader(HttpRequestMessage request, string url)
  253. {
  254. if (string.IsNullOrEmpty(_options.Secret)) return;
  255. var timestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds().ToString();
  256. var pathAndQuery = new Uri(url).PathAndQuery;
  257. var stringToSign = $"{timestamp}\n{pathAndQuery}";
  258. using var hmac = new HMACSHA1(Encoding.UTF8.GetBytes(_options.Secret));
  259. var signature = Convert.ToBase64String(hmac.ComputeHash(Encoding.UTF8.GetBytes(stringToSign)));
  260. request.Headers.Add("Authorization", $"Apollo {_options.AppId}:{signature}");
  261. request.Headers.Add("Timestamp", timestamp);
  262. }
  263. private void OnReload()
  264. {
  265. ConfigurationReloadToken previousToken;
  266. lock (_reloadTokenLock)
  267. {
  268. previousToken = _reloadToken;
  269. _reloadToken = new ConfigurationReloadToken();
  270. }
  271. previousToken.OnReload();
  272. }
  273. private void ThrowIfDisposed()
  274. {
  275. if (_disposed) throw new ObjectDisposedException(nameof(ApolloCfgSource));
  276. }
  277. internal IEnumerable<string> GetAllKeys() => _data.Keys;
  278. internal bool TryGetValue(string key, out string? value) => _data.TryGetValue(key, out value);
  279. internal IChangeToken GetReloadToken()
  280. {
  281. lock (_reloadTokenLock)
  282. {
  283. return _reloadToken;
  284. }
  285. }
  286. /// <summary>
  287. /// 内部配置源,用于集成到 Microsoft.Extensions.Configuration
  288. /// </summary>
  289. private sealed class ApolloConfigurationSource : IConfigurationSource
  290. {
  291. private readonly ApolloCfgSource _apolloSource;
  292. public ApolloConfigurationSource(ApolloCfgSource apolloSource)
  293. {
  294. _apolloSource = apolloSource;
  295. }
  296. public IConfigurationProvider Build(IConfigurationBuilder builder)
  297. {
  298. return new ApolloConfigurationProvider(_apolloSource);
  299. }
  300. }
  301. /// <summary>
  302. /// 内部配置提供程序
  303. /// </summary>
  304. private sealed class ApolloConfigurationProvider : ConfigurationProvider
  305. {
  306. private readonly ApolloCfgSource _apolloSource;
  307. public ApolloConfigurationProvider(ApolloCfgSource apolloSource)
  308. {
  309. _apolloSource = apolloSource;
  310. }
  311. public override void Load()
  312. {
  313. Data = new Dictionary<string, string?>(StringComparer.OrdinalIgnoreCase);
  314. foreach (var key in _apolloSource.GetAllKeys())
  315. {
  316. if (_apolloSource.TryGetValue(key, out var value))
  317. {
  318. Data[key] = value;
  319. }
  320. }
  321. }
  322. public override bool TryGet(string key, out string? value)
  323. {
  324. return _apolloSource.TryGetValue(key, out value);
  325. }
  326. public new IChangeToken GetReloadToken()
  327. {
  328. return _apolloSource.GetReloadToken();
  329. }
  330. }
  331. }