alidns.py 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134
  1. # coding=utf-8
  2. """
  3. AliDNS API
  4. 阿里DNS解析操作库
  5. @author: NewFuture
  6. """
  7. from ._base import TYPE_FORM, BaseProvider, hmac_sha256_authorization, sha256_hash
  8. from time import strftime, gmtime, time
  9. class AlidnsProvider(BaseProvider):
  10. API = "https://alidns.aliyuncs.com"
  11. content_type = TYPE_FORM # 阿里云DNS API使用表单格式
  12. api_version = "2015-01-09" # API版本,v3签名需要
  13. def _request(self, action, **params):
  14. # type: (str, **(str | int | bytes | bool | None)) -> dict
  15. """Aliyun v3 https://help.aliyun.com/zh/sdk/product-overview/v3-request-structure-and-signature"""
  16. params = {k: v for k, v in params.items() if v is not None}
  17. body_content = self._encode(params) if len(params) > 0 else ""
  18. content_hash = sha256_hash(body_content)
  19. # 构造请求头部
  20. headers = {
  21. "host": self.API.split("://", 1)[1].strip("/"),
  22. "content-type": self.content_type,
  23. "x-acs-action": action,
  24. "x-acs-content-sha256": content_hash,
  25. "x-acs-date": strftime("%Y-%m-%dT%H:%M:%SZ", gmtime()),
  26. "x-acs-signature-nonce": str(hash(time()))[2:],
  27. "x-acs-version": self.api_version,
  28. }
  29. # 使用通用签名函数
  30. authorization = hmac_sha256_authorization(
  31. secret_key=self.auth_token,
  32. method="POST",
  33. path="/",
  34. query="",
  35. headers=headers,
  36. body_hash=content_hash,
  37. signing_string_format="ACS3-HMAC-SHA256\n{HashedCanonicalRequest}",
  38. authorization_format=(
  39. "ACS3-HMAC-SHA256 Credential=" + self.auth_id + ",SignedHeaders={SignedHeaders},Signature={Signature}"
  40. ),
  41. )
  42. headers["Authorization"] = authorization
  43. # 对于v3签名的RPC API,参数在request body中
  44. return self._http("POST", "/", body=body_content, headers=headers)
  45. def _split_zone_and_sub(self, domain):
  46. # type: (str) -> tuple[str | None, str | None, str]
  47. """
  48. AliDNS 支持直接查询主域名和RR,无需循环查询。
  49. 返回没有DomainId,用DomainName代替
  50. https://help.aliyun.com/zh/dns/api-alidns-2015-01-09-getmaindomainname
  51. """
  52. res = self._request("GetMainDomainName", InputString=domain)
  53. sub, main = res.get("RR"), res.get("DomainName")
  54. return (main, sub, main or domain)
  55. def _query_zone_id(self, domain):
  56. """调用_split_zone_and_sub可直接获取,无需调用_query_zone_id"""
  57. raise NotImplementedError("_split_zone_and_sub is used to get zone_id")
  58. def _query_record(self, zone_id, subdomain, main_domain, record_type, line, extra):
  59. """https://help.aliyun.com/zh/dns/api-alidns-2015-01-09-describesubdomainrecords"""
  60. sub = self._join_domain(subdomain, main_domain)
  61. data = self._request(
  62. "DescribeSubDomainRecords",
  63. SubDomain=sub, # aliyun API要求SubDomain为完整域名
  64. DomainName=main_domain,
  65. Type=record_type,
  66. Line=line,
  67. PageSize=500,
  68. Lang=extra.get("Lang"), # 默认中文
  69. Status=extra.get("Status"), # 默认全部状态
  70. )
  71. records = data.get("DomainRecords", {}).get("Record", [])
  72. if not records:
  73. self.logger.warning(
  74. "No records found for [%s] with %s <%s> (line: %s)", zone_id, subdomain, record_type, line
  75. )
  76. elif not isinstance(records, list):
  77. self.logger.error("Invalid records format: %s", records)
  78. else:
  79. return next((r for r in records), None)
  80. return None
  81. def _create_record(self, zone_id, subdomain, main_domain, value, record_type, ttl, line, extra):
  82. """https://help.aliyun.com/zh/dns/api-alidns-2015-01-09-adddomainrecord"""
  83. data = self._request(
  84. "AddDomainRecord",
  85. DomainName=main_domain,
  86. RR=subdomain,
  87. Value=value,
  88. Type=record_type,
  89. TTL=ttl,
  90. Line=line,
  91. **extra
  92. )
  93. if data and data.get("RecordId"):
  94. self.logger.info("Record created: %s", data)
  95. return True
  96. self.logger.error("Failed to create record: %s", data)
  97. return False
  98. def _update_record(self, zone_id, old_record, value, record_type, ttl, line, extra):
  99. """https://help.aliyun.com/zh/dns/api-alidns-2015-01-09-updatedomainrecord"""
  100. # 阿里云DNS update新旧值不能一样,先判断是否发生变化
  101. if (
  102. old_record.get("Value") == value
  103. and old_record.get("Type") == record_type
  104. and (not ttl or old_record.get("TTL") == ttl)
  105. ):
  106. domain = self._join_domain(old_record.get("RR"), old_record.get("DomainName"))
  107. self.logger.warning("No changes detected, skipping update for record: %s", domain)
  108. return True
  109. data = self._request(
  110. "UpdateDomainRecord",
  111. RecordId=old_record.get("RecordId"),
  112. Value=value,
  113. RR=old_record.get("RR"),
  114. Type=record_type,
  115. TTL=ttl,
  116. Line=line or old_record.get("Line"),
  117. **extra
  118. )
  119. if data and data.get("RecordId"):
  120. self.logger.info("Record updated: %s", data)
  121. return True
  122. self.logger.error("Failed to update record: %s", data)
  123. return False