tencentcloud.py 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195
  1. # coding=utf-8
  2. """
  3. Tencent Cloud DNSPod API
  4. 腾讯云 DNSPod API
  5. @author: NewFuture
  6. """
  7. from ._base import BaseProvider, TYPE_JSON, hmac_sha256_authorization, sha256_hash, hmac_sha256
  8. from time import time, strftime, gmtime
  9. from json import dumps as jsonencode
  10. class TencentCloudProvider(BaseProvider):
  11. """
  12. 腾讯云 DNSPod API 提供商
  13. Tencent Cloud DNSPod API Provider
  14. API Version: 2021-03-23
  15. Documentation: https://cloud.tencent.com/document/api/1427
  16. """
  17. API = "https://dnspod.tencentcloudapi.com"
  18. content_type = TYPE_JSON
  19. # 腾讯云 DNSPod API 配置
  20. service = "dnspod"
  21. version_date = "2021-03-23"
  22. def _request(self, action, **params):
  23. # type: (str, **(str | int | bytes | bool | None)) -> dict | None
  24. """
  25. 发送腾讯云 API 请求
  26. API 文档: https://cloud.tencent.com/document/api/1427/56187
  27. Args:
  28. action (str): API 操作名称
  29. params (dict): 请求参数
  30. Returns:
  31. dict: API 响应结果
  32. """
  33. # 构建请求体
  34. params = {k: v for k, v in params.items() if v is not None}
  35. body = jsonencode(params)
  36. # 构建请求头,小写 腾讯云只签名特定头部
  37. headers = {
  38. "content-type": self.content_type,
  39. "host": self.API.split("://", 1)[1].strip("/"),
  40. }
  41. # 腾讯云特殊的密钥派生过程
  42. date = strftime("%Y-%m-%d", gmtime())
  43. credential_scope = "{}/{}/tc3_request".format(date, self.service)
  44. # 派生签名密钥
  45. secret_date = hmac_sha256("TC3" + self.auth_token, date).digest()
  46. secret_service = hmac_sha256(secret_date, self.service).digest()
  47. signing_key = hmac_sha256(secret_service, "tc3_request").digest()
  48. # 预处理模板字符串
  49. auth_format = "TC3-HMAC-SHA256 Credential=%s/%s, SignedHeaders={SignedHeaders}, Signature={Signature}" % (
  50. self.auth_id,
  51. credential_scope,
  52. )
  53. timestamp = str(int(time()))
  54. sign_template = "\n".join(["TC3-HMAC-SHA256", timestamp, credential_scope, "{HashedCanonicalRequest}"])
  55. authorization = hmac_sha256_authorization(
  56. secret_key=signing_key,
  57. method="POST",
  58. path="/",
  59. query="",
  60. headers=headers,
  61. body_hash=sha256_hash(body),
  62. signing_string_format=sign_template,
  63. authorization_format=auth_format,
  64. )
  65. # X-TC 更新签名之后方可添加
  66. headers.update(
  67. {
  68. "X-TC-Action": action,
  69. "X-TC-Version": self.version_date,
  70. "X-TC-Timestamp": timestamp,
  71. "authorization": authorization,
  72. }
  73. )
  74. response = self._http("POST", "/", body=body, headers=headers)
  75. if response and "Response" in response:
  76. if "Error" in response["Response"]:
  77. error = response["Response"]["Error"]
  78. self.logger.error(
  79. "TencentCloud API error: %s - %s",
  80. error.get("Code", "Unknown"),
  81. error.get("Message", "Unknown error"),
  82. )
  83. return None
  84. return response["Response"]
  85. self.logger.warning("Unexpected response format: %s", response)
  86. return None
  87. def _query_zone_id(self, domain):
  88. # type: (str) -> str | None
  89. """查询域名的 zone_id (domain id) https://cloud.tencent.com/document/api/1427/56173"""
  90. # 使用 DescribeDomain API 查询指定域名的信息
  91. response = self._request("DescribeDomain", Domain=domain)
  92. if not response or "DomainInfo" not in response:
  93. self.logger.debug("Domain info not found or query failed for: %s", domain)
  94. return None
  95. domain_id = response.get("DomainInfo", {}).get("DomainId")
  96. if domain_id is not None:
  97. self.logger.debug("Found domain %s with ID: %s", domain, domain_id)
  98. return str(domain_id)
  99. self.logger.debug("Domain ID not found in response for: %s", domain)
  100. return None
  101. def _query_record(self, zone_id, subdomain, main_domain, record_type, line, extra):
  102. # type: (str, str, str, str, str | None, dict) -> dict | None
  103. """查询 DNS 记录列表 https://cloud.tencent.com/document/api/1427/56166"""
  104. response = self._request(
  105. "DescribeRecordList",
  106. DomainId=int(zone_id),
  107. Subdomain=subdomain,
  108. Domain=main_domain,
  109. RecordType=record_type,
  110. RecordLine=line,
  111. **extra
  112. )
  113. if not response or "RecordList" not in response:
  114. self.logger.debug("No records found or query failed")
  115. return None
  116. records = response["RecordList"]
  117. if not records:
  118. self.logger.debug("No records found for subdomain: %s", subdomain)
  119. return None
  120. # 查找匹配的记录
  121. target_name = subdomain if subdomain and subdomain != "@" else "@"
  122. for record in records:
  123. if record.get("Name") == target_name and record.get("Type") == record_type.upper():
  124. self.logger.debug("Found existing record: %s", record)
  125. return record
  126. self.logger.debug("No matching record found")
  127. return None
  128. def _create_record(self, zone_id, subdomain, main_domain, value, record_type, ttl, line, extra):
  129. """创建 DNS 记录 https://cloud.tencent.com/document/api/1427/56180"""
  130. extra["Remark"] = extra.get("Remark", self.remark)
  131. response = self._request(
  132. "CreateRecord",
  133. Domain=main_domain,
  134. DomainId=int(zone_id),
  135. SubDomain=subdomain,
  136. RecordType=record_type,
  137. Value=value,
  138. RecordLine=line or "默认",
  139. TTL=int(ttl) if ttl else None,
  140. **extra
  141. )
  142. if response and "RecordId" in response:
  143. self.logger.info("Record created successfully with ID: %s", response["RecordId"])
  144. return True
  145. self.logger.error("Failed to create record:\n%s", response)
  146. return False
  147. def _update_record(self, zone_id, old_record, value, record_type, ttl, line, extra):
  148. """更新 DNS 记录: https://cloud.tencent.com/document/api/1427/56157"""
  149. extra["Remark"] = extra.get("Remark", self.remark)
  150. response = self._request(
  151. "ModifyRecord",
  152. Domain=old_record.get("Domain", ""),
  153. DomainId=old_record.get("DomainId", int(zone_id)),
  154. SubDomain=old_record.get("Name"),
  155. RecordId=old_record.get("RecordId"),
  156. RecordType=record_type,
  157. RecordLine=old_record.get("Line", line or "默认"),
  158. Value=value,
  159. TTL=int(ttl) if ttl else None,
  160. **extra
  161. )
  162. if response and "RecordId" in response:
  163. self.logger.info("Record updated successfully")
  164. return True
  165. self.logger.error("Failed to update record")
  166. return False