| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155 |
# coding=utf-8
"""
AliDNS API
Alibaba Cloud DNS (AliDNS) record management library.
@author: NewFuture
"""
from hashlib import sha256
from hmac import new as hmac
from time import gmtime, strftime, time
from uuid import uuid4

from ._base import TYPE_FORM, BaseProvider
class AlidnsProvider(BaseProvider):
    """DNS provider for the Alibaba Cloud DNS (AliDNS) RPC API.

    Every call is a form-encoded POST to ``API`` signed with the
    ACS3-HMAC-SHA256 (v3) scheme, using ``self.auth_id`` as the credential
    and ``self.auth_token`` as the HMAC key.
    """

    # Endpoint all requests are sent to; the signed "host" header is derived from it.
    API = "https://alidns.aliyuncs.com"
    # The AliDNS API takes its parameters as a form-encoded body.
    content_type = TYPE_FORM
    # API version, sent as "x-acs-version" and covered by the v3 signature.
    api_version = "2015-01-09"

    def _signature_v3(self, method, path, headers, query="", body_hash=""):
        # type: (str, str, dict, str, str) -> str
        """Build the ``Authorization`` header value for an ACS3-HMAC-SHA256 request.

        https://help.aliyun.com/zh/sdk/product-overview/v3-request-structure-and-signature

        Args:
            method: HTTP method, placed verbatim in the canonical request.
            path: Canonical URI of the request.
            headers: Headers to sign; every entry is signed, with the name
                lower-cased and the value stringified and stripped.
            query: Canonical query string (empty when parameters go in the body).
            body_hash: Hex SHA-256 digest of the request body.

        Returns:
            The complete ``Authorization`` header value.
        """
        # 1. Canonical headers: lower-cased names, trimmed values, sorted by name.
        normalized = {name.lower(): str(value).strip() for name, value in headers.items()}
        signed_names = sorted(normalized)
        # Each header line ends with "\n"; together with the join below this
        # leaves an empty line between the headers and the signed-header list.
        canonical_headers = "".join("{}:{}\n".format(name, normalized[name]) for name in signed_names)
        signed_headers = ";".join(signed_names)

        # 2. Canonical request.
        canonical_request = "\n".join([method, path, query, canonical_headers, signed_headers, body_hash])

        # 3. String to sign: algorithm name plus the hash of the canonical request.
        algorithm = "ACS3-HMAC-SHA256"
        hashed_canonical_request = sha256(canonical_request.encode("utf-8")).hexdigest()
        string_to_sign = "\n".join([algorithm, hashed_canonical_request])
        self.logger.debug("String to sign: %s", string_to_sign)

        # 4. Signature: HMAC-SHA256 keyed with the access secret.
        signature = hmac(self.auth_token.encode("utf-8"), string_to_sign.encode("utf-8"), sha256).hexdigest()

        # 5. Authorization header.
        return "{} Credential={},SignedHeaders={},Signature={}".format(
            algorithm, self.auth_id, signed_headers, signature
        )

    def _request(self, action, **params):
        # type: (str, **(str | int | bytes | bool | None)) -> dict
        """Send one signed POST request for ``action`` and return the ``_http`` result.

        Args:
            action: API action name, sent in the ``x-acs-action`` header.
            **params: Action parameters; entries whose value is ``None`` are dropped.

        Returns:
            Whatever ``self._http`` returns for the request.
        """
        # Unset parameters must be neither sent nor included in the body hash.
        params = {k: v for k, v in params.items() if v is not None}
        # The signed "host" header is the endpoint without scheme or slashes.
        host = self.API.replace("https://", "").replace("http://", "").strip("/")
        body_content = self._encode(params) if params else ""
        content_hash = sha256(body_content.encode("utf-8")).hexdigest()

        headers = {
            "host": host,
            "content-type": self.content_type,
            "x-acs-action": action,
            "x-acs-content-sha256": content_hash,
            "x-acs-date": strftime("%Y-%m-%dT%H:%M:%SZ", gmtime()),
            # A random UUID is unique per request; a clock-derived value can
            # repeat when two requests fall into the same clock tick.
            "x-acs-signature-nonce": uuid4().hex,
            "x-acs-version": self.api_version,
        }
        # Sign first, then attach: the Authorization header itself is not signed.
        headers["Authorization"] = self._signature_v3("POST", "/", headers, body_hash=content_hash)
        # With v3 signing of this RPC API the parameters travel in the request body.
        return self._http("POST", "/", body=body_content, headers=headers)

    def _split_zone_and_sub(self, domain):
        # type: (str) -> tuple[str | None, str | None, str]
        """Resolve ``domain`` into its main domain and host record in one API call.

        AliDNS can split a full name directly, so no iterative lookup is
        needed. The response carries no DomainId, so the main domain name is
        used as the zone identifier.
        https://help.aliyun.com/zh/dns/api-alidns-2015-01-09-getmaindomainname

        Returns:
            ``(zone_id, sub, main_domain)`` where ``zone_id`` is the
            ``DomainName`` from the response, ``sub`` is its ``RR``, and
            ``main_domain`` falls back to ``domain`` when the response has no
            ``DomainName``.
        """
        # Tolerate an empty response, as _create_record/_update_record do.
        res = self._request("GetMainDomainName", InputString=domain) or {}
        sub, main = res.get("RR"), res.get("DomainName")
        return (main, sub, main or domain)

    def _query_zone_id(self, domain):
        """Not used: ``_split_zone_and_sub`` already yields the zone id.

        Raises:
            NotImplementedError: always.
        """
        raise NotImplementedError("_split_zone_and_sub is used to get zone_id")

    def _query_record(self, zone_id, subdomain, main_domain, record_type, line, extra):
        """Look up the record for ``subdomain`` and return the first match, or ``None``.

        https://help.aliyun.com/zh/dns/api-alidns-2015-01-09-describesubdomainrecords

        ``zone_id`` is used only in the log message. ``Lang`` and ``Status``
        are read from ``extra`` as optional query parameters.
        """
        full_name = self._join_domain(subdomain, main_domain)
        data = self._request(
            "DescribeSubDomainRecords",
            SubDomain=full_name,  # the API expects the fully qualified name here
            DomainName=main_domain,
            Type=record_type,
            Line=line,
            PageSize=500,
            Lang=extra.get("Lang"),  # dropped when unset; original note: defaults to Chinese
            Status=extra.get("Status"),  # dropped when unset; original note: defaults to all statuses
        )
        # Tolerate an empty response or a null "DomainRecords" entry.
        records = ((data or {}).get("DomainRecords") or {}).get("Record", [])
        if not records:
            self.logger.warning(
                "No records found for [%s] with %s <%s> (line: %s)", zone_id, subdomain, record_type, line
            )
            return None
        if not isinstance(records, list):
            self.logger.error("Invalid records format: %s", records)
            return None
        return records[0]

    def _create_record(self, zone_id, subdomain, main_domain, value, record_type, ttl, line, extra):
        """Create a record via AddDomainRecord; return ``True`` when a RecordId comes back.

        https://help.aliyun.com/zh/dns/api-alidns-2015-01-09-adddomainrecord

        ``extra`` is forwarded as additional request parameters.
        """
        data = self._request(
            "AddDomainRecord",
            DomainName=main_domain,
            RR=subdomain,
            Value=value,
            Type=record_type,
            TTL=ttl,
            Line=line,
            **extra
        )
        if data and data.get("RecordId"):
            self.logger.info("Record created: %s", data)
            return True
        self.logger.error("Failed to create record: %s", data)
        return False

    def _update_record(self, zone_id, old_record, value, record_type, ttl, line, extra):
        """Update ``old_record`` via UpdateDomainRecord; return ``True`` on success.

        https://help.aliyun.com/zh/dns/api-alidns-2015-01-09-updatedomainrecord

        The request is skipped (and ``True`` returned) when value, type and
        TTL already match. ``line`` falls back to the record's current line
        when not given; ``extra`` is forwarded as additional parameters.
        """
        # AliDNS does not allow an update whose new values equal the old ones,
        # so detect "no change" up front.
        old_ttl = old_record.get("TTL")
        # Also compare as strings so a TTL given as "600" matches the integer
        # 600 stored on the existing record.
        ttl_unchanged = not ttl or old_ttl == ttl or str(old_ttl) == str(ttl)
        if old_record.get("Value") == value and old_record.get("Type") == record_type and ttl_unchanged:
            domain = self._join_domain(old_record.get("RR"), old_record.get("DomainName"))
            self.logger.warning("No changes detected, skipping update for record: %s", domain)
            return True

        data = self._request(
            "UpdateDomainRecord",
            RecordId=old_record.get("RecordId"),
            Value=value,
            RR=old_record.get("RR"),
            Type=record_type,
            TTL=ttl,
            Line=line or old_record.get("Line"),
            **extra
        )
        if data and data.get("RecordId"):
            self.logger.info("Record updated: %s", data)
            return True
        self.logger.error("Failed to update record: %s", data)
        return False
|