ApolloCfgSource.cs 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374
  1. using System.Collections.Concurrent;
  2. using System.Net.Http.Json;
  3. using System.Security.Cryptography;
  4. using System.Text;
  5. using System.Text.Json;
  6. using System.Web;
  7. using Apq.Cfg.Sources;
  8. using Microsoft.Extensions.Configuration;
  9. using Microsoft.Extensions.Primitives;
  10. namespace Apq.Cfg.Apollo;
  11. /// <summary>
  12. /// Apollo 配置源
  13. /// </summary>
  14. internal sealed class ApolloCfgSource : IWritableCfgSource, IDisposable
  15. {
  16. private readonly ApolloCfgOptions _options;
  17. private readonly HttpClient _httpClient;
  18. private readonly HttpClient _longPollingClient;
  19. private readonly ConcurrentDictionary<string, string?> _data;
  20. private readonly ConcurrentDictionary<string, long> _notificationIds;
  21. private readonly CancellationTokenSource _disposeCts;
  22. private volatile bool _disposed;
  23. private ConfigurationReloadToken _reloadToken;
  24. private readonly object _reloadTokenLock = new();
  25. private Task? _watchTask;
  26. public ApolloCfgSource(ApolloCfgOptions options, int level, bool isPrimaryWriter)
  27. {
  28. _options = options;
  29. Level = level;
  30. IsPrimaryWriter = isPrimaryWriter;
  31. _data = new ConcurrentDictionary<string, string?>();
  32. _notificationIds = new ConcurrentDictionary<string, long>();
  33. _disposeCts = new CancellationTokenSource();
  34. _reloadToken = new ConfigurationReloadToken();
  35. _httpClient = new HttpClient
  36. {
  37. Timeout = options.ConnectTimeout
  38. };
  39. _longPollingClient = new HttpClient
  40. {
  41. Timeout = options.LongPollingTimeout + TimeSpan.FromSeconds(10)
  42. };
  43. // 初始化通知 ID
  44. foreach (var ns in options.Namespaces)
  45. {
  46. _notificationIds[ns] = -1;
  47. }
  48. // 初始加载
  49. LoadDataAsync().GetAwaiter().GetResult();
  50. // 启动热重载监听
  51. if (options.EnableHotReload)
  52. {
  53. _watchTask = WatchForChangesAsync(_disposeCts.Token);
  54. }
  55. }
  56. /// <summary>
  57. /// 获取配置层级,数值越大优先级越高
  58. /// </summary>
  59. public int Level { get; }
  60. /// <summary>
  61. /// 获取是否可写,Apollo 不支持通过 API 写入配置,因此始终为 false
  62. /// </summary>
  63. public bool IsWriteable => false;
  64. /// <summary>
  65. /// 获取是否为主要写入源,Apollo 不支持写入,此值用于标识
  66. /// </summary>
  67. public bool IsPrimaryWriter { get; }
  68. /// <summary>
  69. /// 释放资源,取消所有异步操作并释放 HTTP 客户端
  70. /// </summary>
  71. public void Dispose()
  72. {
  73. if (_disposed) return;
  74. _disposed = true;
  75. _disposeCts.Cancel();
  76. try { _watchTask?.Wait(TimeSpan.FromSeconds(2)); }
  77. catch { }
  78. _disposeCts.Dispose();
  79. _httpClient.Dispose();
  80. _longPollingClient.Dispose();
  81. }
  82. /// <summary>
  83. /// 构建 Microsoft.Extensions.Configuration 的配置源
  84. /// </summary>
  85. /// <returns>Microsoft.Extensions.Configuration.IConfigurationSource 实例</returns>
  86. /// <exception cref="ObjectDisposedException">当对象已释放时抛出</exception>
  87. public IConfigurationSource BuildSource()
  88. {
  89. ThrowIfDisposed();
  90. return new ApolloConfigurationSource(this);
  91. }
  92. /// <summary>
  93. /// 应用配置更改(Apollo 不支持通过 API 写入配置)
  94. /// </summary>
  95. /// <param name="changes">要应用的配置更改</param>
  96. /// <param name="cancellationToken">取消令牌</param>
  97. /// <returns>表示异步操作的任务</returns>
  98. /// <exception cref="NotSupportedException">始终抛出,因为 Apollo 不支持通过 API 写入配置</exception>
  99. public Task ApplyChangesAsync(IReadOnlyDictionary<string, string?> changes, CancellationToken cancellationToken)
  100. {
  101. // Apollo 不支持通过 API 写入配置
  102. throw new NotSupportedException("Apollo 配置源不支持写入操作,请通过 Apollo 管理界面修改配置");
  103. }
  104. private async Task LoadDataAsync()
  105. {
  106. try
  107. {
  108. _data.Clear();
  109. foreach (var ns in _options.Namespaces)
  110. {
  111. var config = await GetConfigAsync(ns).ConfigureAwait(false);
  112. if (config != null)
  113. {
  114. foreach (var (key, value) in config)
  115. {
  116. // 如果有多个命名空间,使用命名空间作为前缀(application 除外)
  117. var configKey = _options.Namespaces.Length > 1 && ns != "application"
  118. ? $"{ns}:{key}"
  119. : key;
  120. _data[configKey] = value;
  121. }
  122. }
  123. }
  124. }
  125. catch
  126. {
  127. // 连接失败时保持空数据
  128. }
  129. }
  130. private async Task<Dictionary<string, string?>?> GetConfigAsync(string namespaceName)
  131. {
  132. var url = BuildConfigUrl(namespaceName);
  133. try
  134. {
  135. using var request = new HttpRequestMessage(HttpMethod.Get, url);
  136. AddAuthorizationHeader(request, url);
  137. var response = await _httpClient.SendAsync(request).ConfigureAwait(false);
  138. if (!response.IsSuccessStatusCode) return null;
  139. var json = await response.Content.ReadAsStringAsync().ConfigureAwait(false);
  140. using var doc = JsonDocument.Parse(json);
  141. var result = new Dictionary<string, string?>();
  142. // Apollo 返回格式: { "configurations": { "key": "value" }, ... }
  143. if (doc.RootElement.TryGetProperty("configurations", out var configurations))
  144. {
  145. foreach (var prop in configurations.EnumerateObject())
  146. {
  147. var value = prop.Value.ValueKind == JsonValueKind.String
  148. ? prop.Value.GetString()
  149. : prop.Value.GetRawText();
  150. // 将 . 分隔符转换为 : 分隔符
  151. var key = prop.Name.Replace('.', ':');
  152. result[key] = value;
  153. }
  154. }
  155. return result;
  156. }
  157. catch
  158. {
  159. return null;
  160. }
  161. }
  162. private string BuildConfigUrl(string namespaceName)
  163. {
  164. var metaServer = _options.MetaServer.TrimEnd('/');
  165. return $"{metaServer}/configs/{HttpUtility.UrlEncode(_options.AppId)}/{HttpUtility.UrlEncode(_options.Cluster)}/{HttpUtility.UrlEncode(namespaceName)}";
  166. }
  167. private async Task WatchForChangesAsync(CancellationToken cancellationToken)
  168. {
  169. while (!cancellationToken.IsCancellationRequested)
  170. {
  171. try
  172. {
  173. var hasChanges = await CheckForNotificationsAsync(cancellationToken).ConfigureAwait(false);
  174. if (hasChanges)
  175. {
  176. await LoadDataAsync().ConfigureAwait(false);
  177. OnReload();
  178. }
  179. }
  180. catch (OperationCanceledException)
  181. {
  182. break;
  183. }
  184. catch
  185. {
  186. // 连接失败,等待后重试
  187. try
  188. {
  189. await Task.Delay(TimeSpan.FromSeconds(5), cancellationToken).ConfigureAwait(false);
  190. }
  191. catch (OperationCanceledException)
  192. {
  193. break;
  194. }
  195. }
  196. }
  197. }
  198. private async Task<bool> CheckForNotificationsAsync(CancellationToken cancellationToken)
  199. {
  200. var notifications = _notificationIds.Select(kv => new
  201. {
  202. namespaceName = kv.Key,
  203. notificationId = kv.Value
  204. }).ToArray();
  205. var notificationsJson = JsonSerializer.Serialize(notifications);
  206. var metaServer = _options.MetaServer.TrimEnd('/');
  207. var url = $"{metaServer}/notifications/v2?" +
  208. $"appId={HttpUtility.UrlEncode(_options.AppId)}&" +
  209. $"cluster={HttpUtility.UrlEncode(_options.Cluster)}&" +
  210. $"notifications={HttpUtility.UrlEncode(notificationsJson)}";
  211. using var request = new HttpRequestMessage(HttpMethod.Get, url);
  212. AddAuthorizationHeader(request, url);
  213. var response = await _longPollingClient.SendAsync(request, cancellationToken).ConfigureAwait(false);
  214. if (response.StatusCode == System.Net.HttpStatusCode.NotModified)
  215. {
  216. return false;
  217. }
  218. if (!response.IsSuccessStatusCode)
  219. {
  220. return false;
  221. }
  222. var json = await response.Content.ReadAsStringAsync(cancellationToken).ConfigureAwait(false);
  223. using var doc = JsonDocument.Parse(json);
  224. var hasChanges = false;
  225. foreach (var item in doc.RootElement.EnumerateArray())
  226. {
  227. if (item.TryGetProperty("namespaceName", out var nsElement) &&
  228. item.TryGetProperty("notificationId", out var idElement))
  229. {
  230. var ns = nsElement.GetString();
  231. var id = idElement.GetInt64();
  232. if (ns != null)
  233. {
  234. _notificationIds[ns] = id;
  235. hasChanges = true;
  236. }
  237. }
  238. }
  239. return hasChanges;
  240. }
  241. private void AddAuthorizationHeader(HttpRequestMessage request, string url)
  242. {
  243. if (string.IsNullOrEmpty(_options.Secret)) return;
  244. var timestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds().ToString();
  245. var pathAndQuery = new Uri(url).PathAndQuery;
  246. var stringToSign = $"{timestamp}\n{pathAndQuery}";
  247. using var hmac = new HMACSHA1(Encoding.UTF8.GetBytes(_options.Secret));
  248. var signature = Convert.ToBase64String(hmac.ComputeHash(Encoding.UTF8.GetBytes(stringToSign)));
  249. request.Headers.Add("Authorization", $"Apollo {_options.AppId}:{signature}");
  250. request.Headers.Add("Timestamp", timestamp);
  251. }
  252. private void OnReload()
  253. {
  254. ConfigurationReloadToken previousToken;
  255. lock (_reloadTokenLock)
  256. {
  257. previousToken = _reloadToken;
  258. _reloadToken = new ConfigurationReloadToken();
  259. }
  260. previousToken.OnReload();
  261. }
  262. private void ThrowIfDisposed()
  263. {
  264. if (_disposed) throw new ObjectDisposedException(nameof(ApolloCfgSource));
  265. }
  266. internal IEnumerable<string> GetAllKeys() => _data.Keys;
  267. internal bool TryGetValue(string key, out string? value) => _data.TryGetValue(key, out value);
  268. internal IChangeToken GetReloadToken()
  269. {
  270. lock (_reloadTokenLock)
  271. {
  272. return _reloadToken;
  273. }
  274. }
  275. /// <summary>
  276. /// 内部配置源,用于集成到 Microsoft.Extensions.Configuration
  277. /// </summary>
  278. private sealed class ApolloConfigurationSource : IConfigurationSource
  279. {
  280. private readonly ApolloCfgSource _apolloSource;
  281. public ApolloConfigurationSource(ApolloCfgSource apolloSource)
  282. {
  283. _apolloSource = apolloSource;
  284. }
  285. public IConfigurationProvider Build(IConfigurationBuilder builder)
  286. {
  287. return new ApolloConfigurationProvider(_apolloSource);
  288. }
  289. }
  290. /// <summary>
  291. /// 内部配置提供程序
  292. /// </summary>
  293. private sealed class ApolloConfigurationProvider : ConfigurationProvider
  294. {
  295. private readonly ApolloCfgSource _apolloSource;
  296. public ApolloConfigurationProvider(ApolloCfgSource apolloSource)
  297. {
  298. _apolloSource = apolloSource;
  299. }
  300. public override void Load()
  301. {
  302. Data = new Dictionary<string, string?>(StringComparer.OrdinalIgnoreCase);
  303. foreach (var key in _apolloSource.GetAllKeys())
  304. {
  305. if (_apolloSource.TryGetValue(key, out var value))
  306. {
  307. Data[key] = value;
  308. }
  309. }
  310. }
  311. public override bool TryGet(string key, out string? value)
  312. {
  313. return _apolloSource.TryGetValue(key, out value);
  314. }
  315. public new IChangeToken GetReloadToken()
  316. {
  317. return _apolloSource.GetReloadToken();
  318. }
  319. }
  320. }