# coding=utf-8
"""
AliDNS API
Alibaba Cloud DNS (AliDNS) record management library
@author: NewFuture
"""
from hashlib import sha256
from hmac import new as hmac
from time import strftime, gmtime, time
from uuid import uuid4

from ._base import TYPE_FORM, BaseProvider


  11. class AlidnsProvider(BaseProvider):
  12. API = "https://alidns.aliyuncs.com"
  13. content_type = TYPE_FORM # 阿里云DNS API使用表单格式
  14. api_version = "2015-01-09" # API版本,v3签名需要
  15. def _signature_v3(self, method, path, headers, query="", body_hash=""):
  16. # type: (str, str, dict, str, str) -> str
  17. """阿里云API v3签名算法 https://help.aliyun.com/zh/sdk/product-overview/v3-request-structure-and-signature"""
  18. # 构造规范化头部
  19. headers_to_sign = {k.lower(): str(v).strip() for k, v in headers.items()}
  20. # 按字母顺序排序并构造
  21. signed_headers_list = sorted(headers_to_sign.keys())
  22. canonical_headers = "".join("{}:{}\n".format(key, headers_to_sign[key]) for key in signed_headers_list)
  23. signed_headers = ";".join(signed_headers_list)
  24. # 构造规范化请求
  25. canonical_request = "\n".join([method, path, query, canonical_headers, signed_headers, body_hash])
  26. # 5. 构造待签名字符串
  27. algorithm = "ACS3-HMAC-SHA256"
  28. hashed_canonical_request = sha256(canonical_request.encode("utf-8")).hexdigest()
  29. string_to_sign = "\n".join([algorithm, hashed_canonical_request])
  30. self.logger.debug("String to sign: %s", string_to_sign)
  31. # 6. 计算签名
  32. signature = hmac(self.auth_token.encode("utf-8"), string_to_sign.encode("utf-8"), sha256).hexdigest()
  33. # 7. 构造Authorization头
  34. authorization = "{} Credential={},SignedHeaders={},Signature={}".format(
  35. algorithm, self.auth_id, signed_headers, signature
  36. )
  37. return authorization
  38. def _request(self, action, **params):
  39. # type: (str, **(str | int | bytes | bool | None)) -> dict
  40. params = {k: v for k, v in params.items() if v is not None}
  41. # 从API URL中提取host
  42. host = self.API.replace("https://", "").replace("http://", "").strip("/")
  43. body_content = self._encode(params) if len(params) > 0 else ""
  44. content_hash = sha256(body_content.encode("utf-8")).hexdigest()
  45. # 构造请求头部
  46. headers = {
  47. "host": host,
  48. "content-type": self.content_type,
  49. "x-acs-action": action,
  50. "x-acs-content-sha256": content_hash,
  51. "x-acs-date": strftime("%Y-%m-%dT%H:%M:%SZ", gmtime()),
  52. "x-acs-signature-nonce": str(hash(time()))[2:],
  53. "x-acs-version": self.api_version,
  54. }
  55. # 生成Authorization头
  56. authorization = self._signature_v3("POST", "/", headers, body_hash=content_hash)
  57. headers["Authorization"] = authorization
  58. # 对于v3签名的RPC API,参数在request body中
  59. return self._http("POST", "/", body=body_content, headers=headers)
  60. def _split_zone_and_sub(self, domain):
  61. # type: (str) -> tuple[str | None, str | None, str]
  62. """
  63. AliDNS 支持直接查询主域名和RR,无需循环查询。
  64. 返回没有DomainId,用DomainName代替
  65. https://help.aliyun.com/zh/dns/api-alidns-2015-01-09-getmaindomainname
  66. """
  67. res = self._request("GetMainDomainName", InputString=domain)
  68. sub, main = res.get("RR"), res.get("DomainName")
  69. return (main, sub, main or domain)
  70. def _query_zone_id(self, domain):
  71. """调用_split_zone_and_sub可直接获取,无需调用_query_zone_id"""
  72. raise NotImplementedError("_split_zone_and_sub is used to get zone_id")
  73. def _query_record(self, zone_id, subdomain, main_domain, record_type, line, extra):
  74. """https://help.aliyun.com/zh/dns/api-alidns-2015-01-09-describesubdomainrecords"""
  75. sub = self._join_domain(subdomain, main_domain)
  76. data = self._request(
  77. "DescribeSubDomainRecords",
  78. SubDomain=sub, # aliyun API要求SubDomain为完整域名
  79. DomainName=main_domain,
  80. Type=record_type,
  81. Line=line,
  82. PageSize=500,
  83. Lang=extra.get("Lang"), # 默认中文
  84. Status=extra.get("Status"), # 默认全部状态
  85. )
  86. records = data.get("DomainRecords", {}).get("Record", [])
  87. if not records:
  88. self.logger.warning(
  89. "No records found for [%s] with %s <%s> (line: %s)", zone_id, subdomain, record_type, line
  90. )
  91. elif not isinstance(records, list):
  92. self.logger.error("Invalid records format: %s", records)
  93. else:
  94. return next((r for r in records), None)
  95. return None
  96. def _create_record(self, zone_id, subdomain, main_domain, value, record_type, ttl, line, extra):
  97. """https://help.aliyun.com/zh/dns/api-alidns-2015-01-09-adddomainrecord"""
  98. data = self._request(
  99. "AddDomainRecord",
  100. DomainName=main_domain,
  101. RR=subdomain,
  102. Value=value,
  103. Type=record_type,
  104. TTL=ttl,
  105. Line=line,
  106. **extra
  107. )
  108. if data and data.get("RecordId"):
  109. self.logger.info("Record created: %s", data)
  110. return True
  111. self.logger.error("Failed to create record: %s", data)
  112. return False
  113. def _update_record(self, zone_id, old_record, value, record_type, ttl, line, extra):
  114. """https://help.aliyun.com/zh/dns/api-alidns-2015-01-09-updatedomainrecord"""
  115. # 阿里云DNS update新旧值不能一样,先判断是否发生变化
  116. if (
  117. old_record.get("Value") == value
  118. and old_record.get("Type") == record_type
  119. and (not ttl or old_record.get("TTL") == ttl)
  120. ):
  121. domain = self._join_domain(old_record.get("RR"), old_record.get("DomainName"))
  122. self.logger.warning("No changes detected, skipping update for record: %s", domain)
  123. return True
  124. data = self._request(
  125. "UpdateDomainRecord",
  126. RecordId=old_record.get("RecordId"),
  127. Value=value,
  128. RR=old_record.get("RR"),
  129. Type=record_type,
  130. TTL=ttl,
  131. Line=line or old_record.get("Line"),
  132. **extra
  133. )
  134. if data and data.get("RecordId"):
  135. self.logger.info("Record updated: %s", data)
  136. return True
  137. self.logger.error("Failed to update record: %s", data)
  138. return False