123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548 |
- #!/usr/bin/env python3
- #
- # SPDX-FileCopyrightText: (c) 2020-2021 CokeMine & Its repository contributors
- # SPDX-FileCopyrightText: (c) 2021 A beam of light
- #
- # SPDX-License-Identifier: GPL-3.0-or-later
- #
- """
- euserv auto-renew script
- ChangeLog
- v2021.09.30
- - Captcha automatic recognition using TrueCaptcha API
- - Email notification
- - Add login failure retry mechanism
- - reformat log info
- v2021.11.06
- - Receive renew PIN(6-digits) using mailparser parsed data download url
- workflow: auto-forward your EUserv PIN email to your mailparser inbox
- -> parsing PIN via mailparser -> get PIN from mailparser
- - Update kc2_security_password_get_token request
- v2021.11.26
- - Adjust TrueCaptcha constraint parameters for high availability.
- Plus, the CAPTCHA of EUserv is currently case-insensitive, so the above adjustment works.
- """
- import os
- import re
- import json
- import time
- import base64
- from email.mime.application import MIMEApplication
- from email.mime.multipart import MIMEMultipart
- from email.mime.text import MIMEText
- from smtplib import SMTP_SSL, SMTPDataError
- import requests
- from bs4 import BeautifulSoup
- # 多个账户请使用空格隔开
- USERNAME = os.environ["USERNAME"] # 用户名或邮箱
- PASSWORD = os.environ["PASSWORD"] # 密码
- # default value is TrueCaptcha demo credential,
- # you can use your own credential via set environment variables:
- # TRUECAPTCHA_USERID and TRUECAPTCHA_APIKEY
- # demo: https://apitruecaptcha.org/demo
- # demo2: https://apitruecaptcha.org/demo2
- # demo apikey also has a limit of 100 times per day
- # {
- # 'error': '101.0 above free usage limit 100 per day and no balance',
- # 'requestId': '7690c065-70e0-4757-839b-5fd8381e65c7'
- # }
- TRUECAPTCHA_USERID = os.environ.get("TRUECAPTCHA_USERID", "arun56")
- TRUECAPTCHA_APIKEY = os.environ.get("TRUECAPTCHA_APIKEY", "wMjXmBIcHcdYqO2RrsVN")
- # Extract key data from your emails, automatically. https://mailparser.io
- # 30 Emails/Month, 10 inboxes and unlimited downloads for free.
- # 多个mailparser下载链接id请使用空格隔开, 顺序与 EUserv 账号/邮箱一一对应
- MAILPARSER_DOWNLOAD_URL_ID = os.environ["MAILPARSER_DOWNLOAD_URL_ID"]
- # mailparser.io parsed data download base url
- MAILPARSER_DOWNLOAD_BASE_URL = "https://files.mailparser.io/d/"
- # Telegram Bot Push https://core.telegram.org/bots/api#authorizing-your-bot
- TG_BOT_TOKEN = "" # 通过 @BotFather 申请获得,示例:1077xxx4424:AAFjv0FcqxxxxxxgEMGfi22B4yh15R5uw
- TG_USER_ID = "" # 用户、群组或频道 ID,示例:129xxx206
- TG_API_HOST = "https://api.telegram.org" # 自建 API 反代地址,供网络环境无法访问时使用,网络正常则保持默认
- # Email notification
- RECEIVER_EMAIL = os.environ.get("RECEIVER_EMAIL", "")
- YD_EMAIL = os.environ.get("YD_EMAIL", "")
- YD_APP_PWD = os.environ.get("YD_APP_PWD", "") # yandex mail 使用第三方 APP 授权码
- # Server 酱(ServerChan) https://sct.ftqq.com
- # 免费额度: 每天最多发送 5 条, 卡片仅显示标题, 每天 API 最大请求 1000 次, 每分钟最多发送 5 条.
- SERVER_CHAN_SENDKEY = os.environ.get("SERVER_CHAN_SENDKEY", "") # Server 酱的 SENDKEY,无需推送可忽略
- # Magic internet access
- PROXIES = {"http": "http://127.0.0.1:10808", "https": "http://127.0.0.1:10808"}
- # Maximum number of login retry
- LOGIN_MAX_RETRY_COUNT = 5
- # Waiting time of receiving PIN, units are seconds.
- WAITING_TIME_OF_PIN = 15
- # options: True or False
- CHECK_CAPTCHA_SOLVER_USAGE = True
- user_agent = (
- "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) "
- "Chrome/95.0.4638.69 Safari/537.36"
- )
- desp = "" # 空值
- def log(info: str):
- print(info)
- global desp
- desp = desp + info + "\n\n"
- def login_retry(*args, **kwargs):
- def wrapper(func):
- def inner(username, password):
- ret, ret_session = func(username, password)
- max_retry = kwargs.get("max_retry")
- # default retry 3 times
- if not max_retry:
- max_retry = 3
- number = 0
- if ret == "-1":
- while number < max_retry:
- number += 1
- if number > 1:
- ordinal = lambda n: "{}{}".format(n,"tsnrhtdd"[(n/10%10!=1)*(n%10<4)*n%10::4])
- log(f"[EUserv] Login tried the {ordinal(number)} time.")
- sess_id, session = func(username, password)
- if sess_id != "-1":
- return sess_id, session
- else:
- if number == max_retry:
- return sess_id, session
- else:
- return ret, ret_session
- return inner
- return wrapper
- def captcha_solver(captcha_image_url: str, session: requests.session) -> dict:
- """
- TrueCaptcha API doc: https://apitruecaptcha.org/api
- Free to use 100 requests per day.
- -- response::
- {
- "result": "", ==> Or "result": 0
- "conf": 0.85,
- "usage": 0,
- "requestId": "ed0006e5-69f0-4617-b698-97dc054f9022",
- "version": "dev2"
- }
- """
- response = session.get(captcha_image_url)
- encoded_string = base64.b64encode(response.content)
- url = "https://api.apitruecaptcha.org/one/gettext"
- # Since "case": "mixed", "mode": "human"
- # can sometimes cause internal errors in the truecaptcha server.
- # So a more relaxed constraint(lower/upper & default) is used here.
- # Plus, the CAPTCHA of EUserv is currently case-insensitive, so the below adjustment works.
- data = {
- "userid": TRUECAPTCHA_USERID,
- "apikey": TRUECAPTCHA_APIKEY,
- # case sensitivity of text (upper | lower| mixed)
- "case": "lower",
- # use human or AI (human | default)
- "mode": "default",
- "data": str(encoded_string)[2:-1],
- }
- r = requests.post(url=url, json=data)
- j = json.loads(r.text)
- return j
- def handle_captcha_solved_result(solved: dict) -> str:
- """Since CAPTCHA sometimes appears as a very simple binary arithmetic expression.
- But since recognition sometimes doesn't show the result of the calculation directly,
- that's what this function is for.
- """
- if "result" in solved:
- solved_result = solved["result"]
- if isinstance(solved_result, str):
- if "RESULT IS" in solved_result:
- log("[Captcha Solver] You are using the demo apikey.")
- print("There is no guarantee that demo apikey will work in the future!")
- # because using demo apikey
- text = re.findall(r"RESULT IS . (.*) .", solved_result)[0]
- else:
- # using your own apikey
- log("[Captcha Solver] You are using your own apikey.")
- text = solved_result
- operators = ["X", "x", "+", "-"]
- if any(x in text for x in operators):
- for operator in operators:
- operator_pos = text.find(operator)
- if operator == "x" or operator == "X":
- operator = "*"
- if operator_pos != -1:
- left_part = text[:operator_pos]
- right_part = text[operator_pos + 1 :]
- if left_part.isdigit() and right_part.isdigit():
- return eval(
- "{left} {operator} {right}".format(
- left=left_part, operator=operator, right=right_part
- )
- )
- else:
- # Because these symbols("X", "x", "+", "-") do not appear at the same time,
- # it just contains an arithmetic symbol.
- return text
- else:
- return text
- else:
- print(f"[Captcha Solver] Returned JSON: {solved}")
- log("[Captcha Solver] Service Exception!")
- raise ValueError("[Captcha Solver] Service Exception!")
- else:
- print(f"[Captcha Solver] Returned JSON: {solved}")
- log("[Captcha Solver] Failed to find parsed results!")
- raise KeyError("[Captcha Solver] Failed to find parsed results!")
- def get_captcha_solver_usage() -> dict:
- url = "https://api.apitruecaptcha.org/one/getusage"
- params = {
- "username": TRUECAPTCHA_USERID,
- "apikey": TRUECAPTCHA_APIKEY,
- }
- r = requests.get(url=url, params=params)
- j = json.loads(r.text)
- return j
- def get_pin_from_mailparser(url_id: str) -> str:
- """
- response format:
- [
- {
- "id": "83b95f50f6202fb03950afbe00975eab",
- "received_at": "2021-11-06 02:30:07", ==> up to mailparser account timezone setting, here is UTC 0000.
- "processed_at": "2021-11-06 02:30:07",
- "pin": "123456"
- }
- ]
- """
- response = requests.get(
- f"{MAILPARSER_DOWNLOAD_BASE_URL}{url_id}",
- # Mailparser parsed data download using Basic Authentication.
- # auth=("<your mailparser username>", "<your mailparser password>")
- )
- pin = response.json()[0]["pin"]
- return pin
- @login_retry(max_retry=LOGIN_MAX_RETRY_COUNT)
- def login(username: str, password: str) -> (str, requests.session):
- headers = {"user-agent": user_agent, "origin": "https://www.euserv.com"}
- url = "https://support.euserv.com/index.iphp"
- captcha_image_url = "https://support.euserv.com/securimage_show.php"
- session = requests.Session()
- sess = session.get(url, headers=headers)
- sess_id = re.findall("PHPSESSID=(\\w{10,100});", str(sess.headers))[0]
- # visit png
- logo_png_url = "https://support.euserv.com/pic/logo_small.png"
- session.get(logo_png_url, headers=headers)
- login_data = {
- "email": username,
- "password": password,
- "form_selected_language": "en",
- "Submit": "Login",
- "subaction": "login",
- "sess_id": sess_id,
- }
- f = session.post(url, headers=headers, data=login_data)
- f.raise_for_status()
- if (
- f.text.find("Hello") == -1
- and f.text.find("Confirm or change your customer data here") == -1
- ):
- if (
- f.text.find(
- "To finish the login process please solve the following captcha."
- )
- == -1
- ):
- return "-1", session
- else:
- log("[Captcha Solver] 进行验证码识别...")
- solved_result = captcha_solver(captcha_image_url, session)
- try:
- captcha_code = handle_captcha_solved_result(solved_result)
- log("[Captcha Solver] 识别的验证码是: {}".format(captcha_code))
- if CHECK_CAPTCHA_SOLVER_USAGE:
- usage = get_captcha_solver_usage()
- log(
- "[Captcha Solver] current date {0} api usage count: {1}".format(
- usage[0]["date"], usage[0]["count"]
- )
- )
- f2 = session.post(
- url,
- headers=headers,
- data={
- "subaction": "login",
- "sess_id": sess_id,
- "captcha_code": captcha_code,
- },
- )
- if (
- f2.text.find(
- "To finish the login process please solve the following captcha."
- )
- == -1
- ):
- log("[Captcha Solver] 验证通过")
- return sess_id, session
- else:
- log("[Captcha Solver] 验证失败")
- return "-1", session
- except (KeyError, ValueError):
- return "-1", session
- else:
- return sess_id, session
- def get_servers(sess_id: str, session: requests.session) -> {}:
- d = {}
- url = "https://support.euserv.com/index.iphp?sess_id=" + sess_id
- headers = {"user-agent": user_agent, "origin": "https://www.euserv.com"}
- f = session.get(url=url, headers=headers)
- f.raise_for_status()
- soup = BeautifulSoup(f.text, "html.parser")
- for tr in soup.select(
- "#kc2_order_customer_orders_tab_content_1 .kc2_order_table.kc2_content_table tr"
- ):
- server_id = tr.select(".td-z1-sp1-kc")
- if not len(server_id) == 1:
- continue
- flag = (
- True
- if tr.select(".td-z1-sp2-kc .kc2_order_action_container")[0]
- .get_text()
- .find("Contract extension possible from")
- == -1
- else False
- )
- d[server_id[0].get_text()] = flag
- return d
- def renew(
- sess_id: str, session: requests.session, password: str, order_id: str, mailparser_dl_url_id: str
- ) -> bool:
- url = "https://support.euserv.com/index.iphp"
- headers = {
- "user-agent": user_agent,
- "Host": "support.euserv.com",
- "origin": "https://support.euserv.com",
- "Referer": "https://support.euserv.com/index.iphp",
- }
- data = {
- "Submit": "Extend contract",
- "sess_id": sess_id,
- "ord_no": order_id,
- "subaction": "choose_order",
- "choose_order_subaction": "show_contract_details",
- }
- session.post(url, headers=headers, data=data)
- # pop up 'Security Check' window, it will trigger 'send PIN' automatically.
- session.post(
- url,
- headers=headers,
- data={
- "sess_id": sess_id,
- "subaction": "show_kc2_security_password_dialog",
- "prefix": "kc2_customer_contract_details_extend_contract_",
- "type": "1",
- },
- )
- # # trigger 'Send new PIN to your Email-Address' (optional),
- # new_pin = session.post(url, headers=headers, data={
- # "sess_id": sess_id,
- # "subaction": "kc2_security_password_send_pin",
- # "ident": f"kc2_customer_contract_details_extend_contract_{order_id}"
- # })
- # if not json.loads(new_pin.text)["rc"] == "100":
- # print("new PIN Not Sended")
- # return False
- # sleep WAITING_TIME_OF_PIN seconds waiting for mailparser email parsed PIN
- time.sleep(WAITING_TIME_OF_PIN)
- pin = get_pin_from_mailparser(mailparser_dl_url_id)
- log(f"[MailParser] PIN: {pin}")
- # using PIN instead of password to get token
- data = {
- "auth": pin,
- "sess_id": sess_id,
- "subaction": "kc2_security_password_get_token",
- "prefix": "kc2_customer_contract_details_extend_contract_",
- "type": 1,
- "ident": f"kc2_customer_contract_details_extend_contract_{order_id}",
- }
- f = session.post(url, headers=headers, data=data)
- f.raise_for_status()
- if not json.loads(f.text)["rs"] == "success":
- return False
- token = json.loads(f.text)["token"]["value"]
- data = {
- "sess_id": sess_id,
- "ord_id": order_id,
- "subaction": "kc2_customer_contract_details_extend_contract_term",
- "token": token,
- }
- session.post(url, headers=headers, data=data)
- time.sleep(5)
- return True
- def check(sess_id: str, session: requests.session):
- print("Checking.......")
- d = get_servers(sess_id, session)
- flag = True
- for key, val in d.items():
- if val:
- flag = False
- log("[EUserv] ServerID: %s Renew Failed!" % key)
- if flag:
- log("[EUserv] ALL Work Done! Enjoy~")
- # Telegram Bot Push https://core.telegram.org/bots/api#authorizing-your-bot
- def telegram():
- data = (("chat_id", TG_USER_ID), ("text", "EUserv续费日志\n\n" + desp))
- response = requests.post(
- TG_API_HOST + "/bot" + TG_BOT_TOKEN + "/sendMessage", data=data
- )
- if response.status_code != 200:
- print("Telegram Bot 推送失败")
- else:
- print("Telegram Bot 推送成功")
- # Yandex mail notification
- def send_mail_by_yandex(
- to_email, from_email, subject, text, files, sender_email, sender_password
- ):
- msg = MIMEMultipart()
- msg["Subject"] = subject
- msg["From"] = from_email
- msg["To"] = to_email
- msg.attach(MIMEText(text, _charset="utf-8"))
- if files is not None:
- for file in files:
- file_name, file_content = file
- # print(file_name)
- part = MIMEApplication(file_content)
- part.add_header(
- "Content-Disposition", "attachment", filename=("gb18030", "", file_name)
- )
- msg.attach(part)
- s = SMTP_SSL("smtp.yandex.ru", 465)
- s.login(sender_email, sender_password)
- try:
- s.sendmail(msg["From"], msg["To"], msg.as_string())
- except SMTPDataError as e:
- raise e
- finally:
- s.close()
- # eMail push
- def email():
- msg = "EUserv 续费日志\n\n" + desp
- try:
- send_mail_by_yandex(
- RECEIVER_EMAIL, YD_EMAIL, "EUserv 续费日志", msg, None, YD_EMAIL, YD_APP_PWD
- )
- print("eMail 推送成功")
- except requests.exceptions.RequestException as e:
- print(str(e))
- print("eMail 推送失败")
- except SMTPDataError as e1:
- print(str(e1))
- print("eMail 推送失败")
- # Server酱 https://sct.ftqq.com
- def server_chan():
- data = {
- "title": "EUserv 续费日志",
- "desp": desp
- }
- response = requests.post(f"https://sctapi.ftqq.com/{SERVER_CHAN_SENDKEY}.send", data=data)
- if response.status_code != 200:
- print('Server 酱推送失败')
- else:
- print('Server 酱推送成功')
- if __name__ == "__main__":
- if not USERNAME or not PASSWORD:
- log("[EUserv] 你没有添加任何账户")
- exit(1)
- user_list = USERNAME.strip().split()
- passwd_list = PASSWORD.strip().split()
- mailparser_dl_url_id_list = MAILPARSER_DOWNLOAD_URL_ID.strip().split()
- if len(user_list) != len(passwd_list):
- log("[EUserv] The number of usernames and passwords do not match!")
- exit(1)
- if len(mailparser_dl_url_id_list) != len(user_list):
- log("[Mailparser] The number of mailparser_dl_url_ids and usernames do not match!")
- exit(1)
- for i in range(len(user_list)):
- print("*" * 30)
- log("[EUserv] 正在续费第 %d 个账号" % (i + 1))
- sessid, s = login(user_list[i], passwd_list[i])
- if sessid == "-1":
- log("[EUserv] 第 %d 个账号登陆失败,请检查登录信息" % (i + 1))
- continue
- SERVERS = get_servers(sessid, s)
- log("[EUserv] 检测到第 {} 个账号有 {} 台 VPS,正在尝试续期".format(i + 1, len(SERVERS)))
- for k, v in SERVERS.items():
- if v:
- if not renew(sessid, s, passwd_list[i], k, mailparser_dl_url_id_list[i]):
- log("[EUserv] ServerID: %s Renew Error!" % k)
- else:
- log("[EUserv] ServerID: %s has been successfully renewed!" % k)
- else:
- log("[EUserv] ServerID: %s does not need to be renewed" % k)
- time.sleep(15)
- check(sessid, s)
- time.sleep(5)
- TG_BOT_TOKEN and TG_USER_ID and TG_API_HOST and telegram()
- RECEIVER_EMAIL and YD_EMAIL and YD_APP_PWD and email()
- SERVER_CHAN_SENDKEY and server_chan()
- print("*" * 30)
|