tencentcloud.py 8.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240
  1. # coding=utf-8
  2. """
  3. Tencent Cloud DNSPod API
  4. 腾讯云 DNSPod API
  5. @author: NewFuture
  6. """
  7. from ._base import BaseProvider, TYPE_JSON
  8. from hashlib import sha256
  9. from hmac import new as hmac
  10. from time import time, strftime, gmtime
  11. from json import dumps as jsonencode
  12. class TencentCloudProvider(BaseProvider):
  13. """
  14. 腾讯云 DNSPod API 提供商
  15. Tencent Cloud DNSPod API Provider
  16. API Version: 2021-03-23
  17. Documentation: https://cloud.tencent.com/document/api/1427
  18. """
  19. API = "https://dnspod.tencentcloudapi.com"
  20. content_type = TYPE_JSON
  21. # 腾讯云 DNSPod API 配置
  22. service = "dnspod"
  23. version_date = "2021-03-23"
  24. def _sign_tc3(self, method, uri, query, headers, payload, timestamp):
  25. """
  26. 腾讯云 API 3.0 签名算法 (TC3-HMAC-SHA256)
  27. API 文档: https://cloud.tencent.com/document/api/1427/56189
  28. Args:
  29. method (str): HTTP 方法
  30. uri (str): URI 路径
  31. query (str): 查询字符串
  32. headers (dict): 请求头
  33. payload (str): 请求体
  34. timestamp (int): 时间戳
  35. Returns:
  36. str: Authorization 头部值
  37. """
  38. algorithm = "TC3-HMAC-SHA256"
  39. # Step 1: 构建规范请求串
  40. http_request_method = method.upper()
  41. canonical_uri = uri
  42. canonical_querystring = query or ""
  43. # 构建规范头部
  44. signed_headers_list = []
  45. canonical_headers = ""
  46. for key in sorted(headers.keys()):
  47. if key in ["content-type", "host"]:
  48. signed_headers_list.append(key)
  49. canonical_headers += "{}:{}\n".format(key, headers[key])
  50. signed_headers = ";".join(signed_headers_list)
  51. hashed_request_payload = sha256(payload.encode("utf-8")).hexdigest()
  52. canonical_request = "\n".join(
  53. [
  54. http_request_method,
  55. canonical_uri,
  56. canonical_querystring,
  57. canonical_headers,
  58. signed_headers,
  59. hashed_request_payload,
  60. ]
  61. )
  62. # Step 2: 构建待签名字符串
  63. date = strftime("%Y-%m-%d", gmtime()) # 日期
  64. credential_scope = "{}/{}/tc3_request".format(date, self.service)
  65. hashed_canonical_request = sha256(canonical_request.encode("utf-8")).hexdigest()
  66. string_to_sign = "\n".join([algorithm, str(timestamp), credential_scope, hashed_canonical_request])
  67. # Step 3: 计算签名
  68. def _sign(key, msg):
  69. return hmac(key, msg.encode("utf-8"), sha256).digest()
  70. secret_date = _sign(("TC3" + self.auth_token).encode("utf-8"), date)
  71. secret_service = _sign(secret_date, self.service)
  72. secret_signing = _sign(secret_service, "tc3_request")
  73. signature = hmac(secret_signing, string_to_sign.encode("utf-8"), sha256).hexdigest()
  74. # Step 4: 构建 Authorization 头部
  75. authorization = "{} Credential={}/{}, SignedHeaders={}, Signature={}".format(
  76. algorithm, self.auth_id, credential_scope, signed_headers, signature
  77. )
  78. return authorization
  79. def _request(self, action, **params):
  80. # type: (str, **(str | int | bytes | bool | None)) -> dict | None
  81. """
  82. 发送腾讯云 API 请求
  83. API 文档: https://cloud.tencent.com/document/api/1427/56187
  84. Args:
  85. action (str): API 操作名称
  86. params (dict): 请求参数
  87. Returns:
  88. dict: API 响应结果
  89. """
  90. params = {k: v for k, v in params.items() if v is not None}
  91. timestamp = int(time())
  92. # 构建请求头,小写
  93. headers = {
  94. "content-type": self.content_type,
  95. "host": self.API.split("://", 1)[1].strip("/"),
  96. "X-TC-Action": action,
  97. "X-TC-Version": self.version_date,
  98. "X-TC-Timestamp": str(timestamp),
  99. }
  100. # 构建请求体
  101. payload = jsonencode(params)
  102. # 生成签名
  103. authorization = self._sign_tc3("POST", "/", "", headers, payload, timestamp)
  104. headers["authorization"] = authorization
  105. # 发送请求
  106. response = self._http("POST", "/", body=payload, headers=headers)
  107. if response and "Response" in response:
  108. if "Error" in response["Response"]:
  109. error = response["Response"]["Error"]
  110. self.logger.error(
  111. "TencentCloud API error: %s - %s",
  112. error.get("Code", "Unknown"),
  113. error.get("Message", "Unknown error"),
  114. )
  115. return None
  116. return response["Response"]
  117. self.logger.warning("Unexpected response format: %s", response)
  118. return None
  119. def _query_zone_id(self, domain):
  120. # type: (str) -> str | None
  121. """查询域名的 zone_id (domain id) https://cloud.tencent.com/document/api/1427/56173"""
  122. # 使用 DescribeDomain API 查询指定域名的信息
  123. response = self._request("DescribeDomain", Domain=domain)
  124. if not response or "DomainInfo" not in response:
  125. self.logger.debug("Domain info not found or query failed for: %s", domain)
  126. return None
  127. domain_id = response.get("DomainInfo", {}).get("DomainId")
  128. if domain_id is not None:
  129. self.logger.debug("Found domain %s with ID: %s", domain, domain_id)
  130. return str(domain_id)
  131. self.logger.debug("Domain ID not found in response for: %s", domain)
  132. return None
  133. def _query_record(self, zone_id, subdomain, main_domain, record_type, line, extra):
  134. # type: (str, str, str, str, str | None, dict) -> dict | None
  135. """查询 DNS 记录列表 https://cloud.tencent.com/document/api/1427/56166"""
  136. response = self._request(
  137. "DescribeRecordList",
  138. DomainId=int(zone_id),
  139. Subdomain=subdomain,
  140. Domain=main_domain,
  141. RecordType=record_type,
  142. RecordLine=line,
  143. **extra
  144. )
  145. if not response or "RecordList" not in response:
  146. self.logger.debug("No records found or query failed")
  147. return None
  148. records = response["RecordList"]
  149. if not records:
  150. self.logger.debug("No records found for subdomain: %s", subdomain)
  151. return None
  152. # 查找匹配的记录
  153. target_name = subdomain if subdomain and subdomain != "@" else "@"
  154. for record in records:
  155. if record.get("Name") == target_name and record.get("Type") == record_type.upper():
  156. self.logger.debug("Found existing record: %s", record)
  157. return record
  158. self.logger.debug("No matching record found")
  159. return None
  160. def _create_record(self, zone_id, subdomain, main_domain, value, record_type, ttl, line, extra):
  161. """创建 DNS 记录 https://cloud.tencent.com/document/api/1427/56180"""
  162. extra["Remark"] = extra.get("Remark", self.remark)
  163. response = self._request(
  164. "CreateRecord",
  165. Domain=main_domain,
  166. DomainId=int(zone_id),
  167. SubDomain=subdomain,
  168. RecordType=record_type,
  169. Value=value,
  170. RecordLine=line or "默认",
  171. TTL=int(ttl) if ttl else None,
  172. **extra
  173. )
  174. if response and "RecordId" in response:
  175. self.logger.info("Record created successfully with ID: %s", response["RecordId"])
  176. return True
  177. self.logger.error("Failed to create record:\n%s", response)
  178. return False
  179. def _update_record(self, zone_id, old_record, value, record_type, ttl, line, extra):
  180. """更新 DNS 记录: https://cloud.tencent.com/document/api/1427/56157"""
  181. extra["Remark"] = extra.get("Remark", self.remark)
  182. response = self._request(
  183. "ModifyRecord",
  184. Domain=old_record.get("Domain", ""),
  185. DomainId=old_record.get("DomainId", int(zone_id)),
  186. SubDomain=old_record.get("Name"),
  187. RecordId=old_record.get("RecordId"),
  188. RecordType=record_type,
  189. RecordLine=old_record.get("Line", line or "默认"),
  190. Value=value,
  191. TTL=int(ttl) if ttl else None,
  192. **extra
  193. )
  194. if response and "RecordId" in response:
  195. self.logger.info("Record updated successfully")
  196. return True
  197. self.logger.error("Failed to update record")
  198. return False