alidns.py 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152
  1. # coding=utf-8
  2. """
  3. AliDNS API
  4. 阿里DNS解析操作库
  5. @author: NewFuture
  6. """
  7. from time import gmtime, strftime, time
  8. from ._base import TYPE_FORM, BaseProvider, encode_params, join_domain
  9. from ._signature import hmac_sha256_authorization, sha256_hash
  10. class AliBaseProvider(BaseProvider):
  11. """阿里云基础Provider,提供通用的_request方法"""
  12. endpoint = "https://alidns.aliyuncs.com"
  13. content_type = TYPE_FORM # 阿里云DNS API使用表单格式
  14. api_version = "2015-01-09" # API版本,v3签名需要
  15. def _request(self, action, method="POST", **params):
  16. # type: (str, str, **(Any)) -> dict
  17. """Aliyun v3 https://help.aliyun.com/zh/sdk/product-overview/v3-request-structure-and-signature"""
  18. params = {k: v for k, v in params.items() if v is not None}
  19. if method in ("GET", "DELETE"):
  20. # For GET and DELETE requests, parameters go in query string
  21. query_string = encode_params(params) if len(params) > 0 else ""
  22. body_content = ""
  23. else:
  24. # For POST requests, parameters go in body
  25. body_content = self._encode_body(params)
  26. query_string = ""
  27. path = "/"
  28. content_hash = sha256_hash(body_content)
  29. # 构造请求头部
  30. headers = {
  31. "host": self.endpoint.split("://", 1)[1].strip("/"),
  32. "content-type": self.content_type,
  33. "x-acs-action": action,
  34. "x-acs-content-sha256": content_hash,
  35. "x-acs-date": strftime("%Y-%m-%dT%H:%M:%SZ", gmtime()),
  36. "x-acs-signature-nonce": str(hash(time()))[2:],
  37. "x-acs-version": self.api_version,
  38. }
  39. # 使用通用签名函数
  40. authorization = hmac_sha256_authorization(
  41. secret_key=self.token,
  42. method=method,
  43. path=path,
  44. query=query_string,
  45. headers=headers,
  46. body_hash=content_hash,
  47. signing_string_format="ACS3-HMAC-SHA256\n{HashedCanonicalRequest}",
  48. authorization_format=(
  49. "ACS3-HMAC-SHA256 Credential=" + self.id + ",SignedHeaders={SignedHeaders},Signature={Signature}"
  50. ),
  51. )
  52. headers["Authorization"] = authorization
  53. # 对于v3签名的RPC API,参数在request body中
  54. path = path if not query_string else path + "?" + format(query_string)
  55. return self._http(method, path, body=body_content, headers=headers)
  56. class AlidnsProvider(AliBaseProvider):
  57. """阿里云DNS Provider"""
  58. def _split_zone_and_sub(self, domain):
  59. # type: (str) -> tuple[str | None, str | None, str]
  60. """
  61. AliDNS 支持直接查询主域名和RR,无需循环查询。
  62. 返回没有DomainId,用DomainName代替
  63. https://help.aliyun.com/zh/dns/api-alidns-2015-01-09-getmaindomainname
  64. """
  65. res = self._request("GetMainDomainName", InputString=domain)
  66. sub, main = res.get("RR"), res.get("DomainName")
  67. return (main, sub, main or domain)
  68. def _query_zone_id(self, domain):
  69. """调用_split_zone_and_sub可直接获取,无需调用_query_zone_id"""
  70. raise NotImplementedError("_split_zone_and_sub is used to get zone_id")
  71. def _query_record(self, zone_id, subdomain, main_domain, record_type, line, extra):
  72. """https://help.aliyun.com/zh/dns/api-alidns-2015-01-09-describesubdomainrecords"""
  73. sub = join_domain(subdomain, main_domain)
  74. data = self._request(
  75. "DescribeSubDomainRecords",
  76. SubDomain=sub, # aliyun API要求SubDomain为完整域名
  77. DomainName=main_domain,
  78. Type=record_type,
  79. Line=line,
  80. PageSize=500,
  81. Lang=extra.get("Lang"), # 默认中文
  82. Status=extra.get("Status"), # 默认全部状态
  83. )
  84. records = data.get("DomainRecords", {}).get("Record", [])
  85. if not records:
  86. self.logger.warning(
  87. "No records found for [%s] with %s <%s> (line: %s)", zone_id, subdomain, record_type, line
  88. )
  89. elif not isinstance(records, list):
  90. self.logger.error("Invalid records format: %s", records)
  91. else:
  92. return next((r for r in records), None)
  93. return None
  94. def _create_record(self, zone_id, subdomain, main_domain, value, record_type, ttl, line, extra):
  95. """https://help.aliyun.com/zh/dns/api-alidns-2015-01-09-adddomainrecord"""
  96. data = self._request(
  97. "AddDomainRecord",
  98. DomainName=main_domain,
  99. RR=subdomain,
  100. Value=value,
  101. Type=record_type,
  102. TTL=ttl,
  103. Line=line,
  104. **extra
  105. ) # fmt: skip
  106. if data and data.get("RecordId"):
  107. self.logger.info("Record created: %s", data)
  108. return True
  109. self.logger.error("Failed to create record: %s", data)
  110. return False
  111. def _update_record(self, zone_id, old_record, value, record_type, ttl, line, extra):
  112. """https://help.aliyun.com/zh/dns/api-alidns-2015-01-09-updatedomainrecord"""
  113. # 阿里云DNS update新旧值不能一样,先判断是否发生变化
  114. if (
  115. old_record.get("Value") == value
  116. and old_record.get("Type") == record_type
  117. and (not ttl or old_record.get("TTL") == ttl)
  118. ):
  119. domain = join_domain(old_record.get("RR"), old_record.get("DomainName"))
  120. self.logger.warning("No changes detected, skipping update for record: %s", domain)
  121. return True
  122. data = self._request(
  123. "UpdateDomainRecord",
  124. RecordId=old_record.get("RecordId"),
  125. Value=value,
  126. RR=old_record.get("RR"),
  127. Type=record_type,
  128. TTL=ttl,
  129. Line=line or old_record.get("Line"),
  130. **extra
  131. ) # fmt: skip
  132. if data and data.get("RecordId"):
  133. self.logger.info("Record updated: %s", data)
  134. return True
  135. self.logger.error("Failed to update record: %s", data)
  136. return False