| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553 | #!/usr/bin/env python3## SPDX-FileCopyrightText: (c) 2020-2021 CokeMine & Its repository contributors# SPDX-FileCopyrightText: (c) 2021 A beam of light## SPDX-License-Identifier: GPL-3.0-or-later#"""euserv auto-renew scriptChangeLogv2021.09.30- Captcha automatic recognition using TrueCaptcha API- Email notification- Add login failure retry mechanism- reformat log infov2021.11.06- Receive renew PIN(6-digits) using mailparser parsed data download url  workflow: auto-forward your EUserv PIN email to your mailparser inbox   -> parsing PIN via mailparser -> get PIN from mailparser- Update kc2_security_password_get_token requestv2021.11.26- Adjust TrueCaptcha constraint parameters for high availability.  Plus, the CAPTCHA of EUserv is currently case-insensitive, so the above adjustment works."""import osimport reimport jsonimport timeimport base64from email.mime.application import MIMEApplicationfrom email.mime.multipart import MIMEMultipartfrom email.mime.text import MIMETextfrom smtplib import SMTP_SSL, SMTPDataErrorimport requestsfrom bs4 import BeautifulSoup# 多个账户请使用空格隔开USERNAME = os.environ["USERNAME"]  # 用户名或邮箱PASSWORD = os.environ["PASSWORD"]  # 密码# default value is TrueCaptcha demo credential,# you can use your own credential via set environment variables:# TRUECAPTCHA_USERID and TRUECAPTCHA_APIKEY# demo: https://apitruecaptcha.org/demo# demo2: https://apitruecaptcha.org/demo2# demo apikey also has a limit of 100 times per day# {# 'error': '101.0 above free usage limit 100 per day and no balance',# 'requestId': '7690c065-70e0-4757-839b-5fd8381e65c7'# }TRUECAPTCHA_USERID = os.environ.get("TRUECAPTCHA_USERID", "arun56")TRUECAPTCHA_APIKEY = os.environ.get("TRUECAPTCHA_APIKEY", "wMjXmBIcHcdYqO2RrsVN")# Extract key data from your emails, automatically. https://mailparser.io # 30 Emails/Month, 10 inboxes and unlimited downloads for free.# 多个mailparser下载链接id请使用空格隔开, 顺序与 EUserv 账号/邮箱一一对应MAILPARSER_DOWNLOAD_URL_ID = os.environ["MAILPARSER_DOWNLOAD_URL_ID"]# mailparser.io parsed data download base urlMAILPARSER_DOWNLOAD_BASE_URL = "https://files.mailparser.io/d/"# Telegram Bot Push https://core.telegram.org/bots/api#authorizing-your-botTG_BOT_TOKEN = ""  # 通过 @BotFather 申请获得,示例:1077xxx4424:AAFjv0FcqxxxxxxgEMGfi22B4yh15R5uwTG_USER_ID = ""  # 用户、群组或频道 ID,示例:129xxx206TG_API_HOST = "https://api.telegram.org"  # 自建 API 反代地址,供网络环境无法访问时使用,网络正常则保持默认# Email notificationRECEIVER_EMAIL = os.environ.get("RECEIVER_EMAIL", "")YD_EMAIL = os.environ.get("YD_EMAIL", "")YD_APP_PWD = os.environ.get("YD_APP_PWD", "")  # yandex mail 使用第三方 APP 授权码# Server 酱(ServerChan) https://sct.ftqq.com# 免费额度: 每天最多发送 5 条, 卡片仅显示标题, 每天 API 最大请求 1000 次, 每分钟最多发送 5 条.SERVER_CHAN_SENDKEY = os.environ.get("SERVER_CHAN_SENDKEY", "")  # Server 酱的 SENDKEY,无需推送可忽略# Magic internet accessPROXIES = {"http": "http://127.0.0.1:10808", "https": "http://127.0.0.1:10808"}# Maximum number of login retryLOGIN_MAX_RETRY_COUNT = 5# Waiting time of receiving PIN, units are seconds.WAITING_TIME_OF_PIN = 15# options: True or FalseCHECK_CAPTCHA_SOLVER_USAGE = Trueuser_agent = (    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) "    "Chrome/95.0.4638.69 Safari/537.36")desp = ""  # 空值def log(info: str):    print(info)    global desp    desp = desp + info + "\n\n"def login_retry(*args, **kwargs):    def wrapper(func):        def inner(username, password):            ret, ret_session = func(username, password)            max_retry = kwargs.get("max_retry")            # default retry 3 times            if not max_retry:                max_retry = 3            number = 0            if ret == "-1":                while number < max_retry:                    number += 1                    if number > 1:                        ordinal = lambda n: "{}{}".format(n,"tsnrhtdd"[(n/10%10!=1)*(n%10<4)*n%10::4])                        log(f"[EUserv] Login tried the {ordinal(number)} time.")                    sess_id, session = func(username, password)                    if sess_id != "-1":                        return sess_id, session                    else:                        if number == max_retry:                            return sess_id, session            else:                return ret, ret_session        return inner    return wrapperdef captcha_solver(captcha_image_url: str, session: requests.session) -> dict:    """    TrueCaptcha API doc: https://apitruecaptcha.org/api    Free to use 100 requests per day.    -- response::    {        "result": "", ==> Or "result": 0        "conf": 0.85,         "usage": 0,         "requestId": "ed0006e5-69f0-4617-b698-97dc054f9022",         "version": "dev2"    }    """    response = session.get(captcha_image_url)    encoded_string = base64.b64encode(response.content)    url = "https://api.apitruecaptcha.org/one/gettext"    # Since "case": "mixed", "mode": "human"     # can sometimes cause internal errors in the truecaptcha server.     # So a more relaxed constraint(lower/upper & default) is used here.    # Plus, the CAPTCHA of EUserv is currently case-insensitive, so the below adjustment works.    data = {        "userid": TRUECAPTCHA_USERID,        "apikey": TRUECAPTCHA_APIKEY,        # case sensitivity of text (upper | lower| mixed)        "case": "lower",        # use human or AI (human | default)        "mode": "default",        "data": str(encoded_string)[2:-1],    }    r = requests.post(url=url, json=data)    j = json.loads(r.text)    return jdef handle_captcha_solved_result(solved: dict) -> str:    """Since CAPTCHA sometimes appears as a very simple binary arithmetic expression.    But since recognition sometimes doesn't show the result of the calculation directly,    that's what this function is for.    """    if "result" in solved:        solved_result = solved["result"]        if isinstance(solved_result, str):            if "RESULT  IS" in solved_result:                log("[Captcha Solver] You are using the demo apikey.")                print("There is no guarantee that demo apikey will work in the future!")                # because using demo apikey                text = re.findall(r"RESULT  IS . (.*) .", solved_result)[0]            else:                # using your own apikey                log("[Captcha Solver] You are using your own apikey.")                text = solved_result            operators = ["X", "x", "+", "-"]            if any(x in text for x in operators):                for operator in operators:                    operator_pos = text.find(operator)                    if operator == "x" or operator == "X":                        operator = "*"                    if operator_pos != -1:                        left_part = text[:operator_pos]                        right_part = text[operator_pos + 1 :]                        if left_part.isdigit() and right_part.isdigit():                            return eval(                                "{left} {operator} {right}".format(                                    left=left_part, operator=operator, right=right_part                                )                            )                        else:                            # Because these symbols("X", "x", "+", "-") do not appear at the same time,                            # it just contains an arithmetic symbol.                            return text            else:                return text        else:            print(f"[Captcha Solver] Returned JSON: {solved}")            log("[Captcha Solver] Service Exception!")            raise ValueError("[Captcha Solver] Service Exception!")    else:        print(f"[Captcha Solver] Returned JSON: {solved}")        log("[Captcha Solver] Failed to find parsed results!")        raise KeyError("[Captcha Solver] Failed to find parsed results!")def get_captcha_solver_usage() -> dict:    url = "https://api.apitruecaptcha.org/one/getusage"    params = {        "username": TRUECAPTCHA_USERID,        "apikey": TRUECAPTCHA_APIKEY,    }    r = requests.get(url=url, params=params)    j = json.loads(r.text)    return jdef get_pin_from_mailparser(url_id: str) -> str:    """    response format:    [      {        "id": "83b95f50f6202fb03950afbe00975eab",        "received_at": "2021-11-06 02:30:07",  ==> up to mailparser account timezone setting, here is UTC 0000.        "processed_at": "2021-11-06 02:30:07",        "pin": "123456"      }    ]    """    response = requests.get(        f"{MAILPARSER_DOWNLOAD_BASE_URL}{url_id}",        # Mailparser parsed data download using Basic Authentication.        # auth=("<your mailparser username>", "<your mailparser password>")    )    pin = response.json()[0]["pin"]    return pin@login_retry(max_retry=LOGIN_MAX_RETRY_COUNT)def login(username: str, password: str) -> (str, requests.session):    headers = {"user-agent": user_agent, "origin": "https://www.euserv.com"}    url = "https://support.euserv.com/index.iphp"    captcha_image_url = "https://support.euserv.com/securimage_show.php"    session = requests.Session()    sess = session.get(url, headers=headers)    sess_id = re.findall("PHPSESSID=(\\w{10,100});", str(sess.headers))[0]    # visit png    logo_png_url = "https://support.euserv.com/pic/logo_small.png"    session.get(logo_png_url, headers=headers)    login_data = {        "email": username,        "password": password,        "form_selected_language": "en",        "Submit": "Login",        "subaction": "login",        "sess_id": sess_id,    }    f = session.post(url, headers=headers, data=login_data)    f.raise_for_status()    if (        f.text.find("Hello") == -1        and f.text.find("Confirm or change your customer data here") == -1    ):        if (            f.text.find(                "To finish the login process please solve the following captcha."            )            == -1        ):            return "-1", session        else:            log("[Captcha Solver] 进行验证码识别...")            solved_result = captcha_solver(captcha_image_url, session)            try:                captcha_code = handle_captcha_solved_result(solved_result)                log("[Captcha Solver] 识别的验证码是: {}".format(captcha_code))                if CHECK_CAPTCHA_SOLVER_USAGE:                    usage = get_captcha_solver_usage()                    log(                        "[Captcha Solver] current date {0} api usage count: {1}".format(                            usage[0]["date"], usage[0]["count"]                        )                    )                f2 = session.post(                    url,                    headers=headers,                    data={                        "subaction": "login",                        "sess_id": sess_id,                        "captcha_code": captcha_code,                    },                )                if (                    f2.text.find(                        "To finish the login process please solve the following captcha."                    )                    == -1                ):                    log("[Captcha Solver] 验证通过")                    return sess_id, session                else:                    log("[Captcha Solver] 验证失败")                    return "-1", session            except (KeyError, ValueError):                return "-1", session    else:        return sess_id, sessiondef get_servers(sess_id: str, session: requests.session) -> {}:    d = {}    url = "https://support.euserv.com/index.iphp?sess_id=" + sess_id    headers = {"user-agent": user_agent, "origin": "https://www.euserv.com"}    f = session.get(url=url, headers=headers)    f.raise_for_status()    soup = BeautifulSoup(f.text, "html.parser")    for tr in soup.select(        "#kc2_order_customer_orders_tab_content_1 .kc2_order_table.kc2_content_table tr"    ):        server_id = tr.select(".td-z1-sp1-kc")        if not len(server_id) == 1:            continue        flag = (            True            if tr.select(".td-z1-sp2-kc .kc2_order_action_container")[0]            .get_text()            .find("Contract extension possible from")            == -1            else False        )        d[server_id[0].get_text()] = flag    return ddef renew(    sess_id: str, session: requests.session, password: str, order_id: str, mailparser_dl_url_id: str) -> bool:    url = "https://support.euserv.com/index.iphp"    headers = {        "user-agent": user_agent,        "Host": "support.euserv.com",        "origin": "https://support.euserv.com",        "Referer": "https://support.euserv.com/index.iphp",    }    data = {        "Submit": "Extend contract",        "sess_id": sess_id,        "ord_no": order_id,        "subaction": "choose_order",        "choose_order_subaction": "show_contract_details",    }    session.post(url, headers=headers, data=data)    # pop up 'Security Check' window, it will trigger 'send PIN' automatically.    session.post(        url,        headers=headers,        data={            "sess_id": sess_id,            "subaction": "show_kc2_security_password_dialog",            "prefix": "kc2_customer_contract_details_extend_contract_",            "type": "1",        },    )    # # trigger 'Send new PIN to your Email-Address' (optional),    # new_pin = session.post(url, headers=headers, data={    #     "sess_id": sess_id,    #     "subaction": "kc2_security_password_send_pin",    #     "ident": f"kc2_customer_contract_details_extend_contract_{order_id}"    # })    # if not json.loads(new_pin.text)["rc"] == "100":    #     print("new PIN Not Sended")    #     return False    # sleep WAITING_TIME_OF_PIN seconds waiting for mailparser email parsed PIN    time.sleep(WAITING_TIME_OF_PIN)    pin = get_pin_from_mailparser(mailparser_dl_url_id)    log(f"[MailParser] PIN: {pin}")    # using PIN instead of password to get token    data = {        "auth": pin,        "sess_id": sess_id,        "subaction": "kc2_security_password_get_token",        "prefix": "kc2_customer_contract_details_extend_contract_",        "type": 1,        "ident": f"kc2_customer_contract_details_extend_contract_{order_id}",    }    f = session.post(url, headers=headers, data=data)    f.raise_for_status()    if not json.loads(f.text)["rs"] == "success":        return False    token = json.loads(f.text)["token"]["value"]    data = {        "sess_id": sess_id,        "ord_id": order_id,        "subaction": "kc2_customer_contract_details_extend_contract_term",        "token": token,    }    session.post(url, headers=headers, data=data)    time.sleep(5)    return Truedef check(sess_id: str, session: requests.session):    print("Checking.......")    d = get_servers(sess_id, session)    flag = True    for key, val in d.items():        if val:            flag = False            log("[EUserv] ServerID: %s Renew Failed!" % key)    if flag:        log("[EUserv] ALL Work Done! Enjoy~")# Telegram Bot Push https://core.telegram.org/bots/api#authorizing-your-botdef telegram():    data = (("chat_id", TG_USER_ID), ("text", "EUserv续费日志\n\n" + desp))    response = requests.post(        TG_API_HOST + "/bot" + TG_BOT_TOKEN + "/sendMessage", data=data    )    if response.status_code != 200:        print("Telegram Bot 推送失败")    else:        print("Telegram Bot 推送成功")# Yandex mail notificationdef send_mail_by_yandex(    to_email, from_email, subject, text, files, sender_email, sender_password):    msg = MIMEMultipart()    msg["Subject"] = subject    msg["From"] = from_email    msg["To"] = to_email    msg.attach(MIMEText(text, _charset="utf-8"))    if files is not None:        for file in files:            file_name, file_content = file            # print(file_name)            part = MIMEApplication(file_content)            part.add_header(                "Content-Disposition", "attachment", filename=("gb18030", "", file_name)            )            msg.attach(part)    s = SMTP_SSL("smtp.yandex.ru", 465)    s.login(sender_email, sender_password)    try:        s.sendmail(msg["From"], msg["To"], msg.as_string())    except SMTPDataError as e:        raise e    finally:        s.close()# eMail pushdef email():    msg = "EUserv 续费日志\n\n" + desp    try:        send_mail_by_yandex(            RECEIVER_EMAIL, YD_EMAIL, "EUserv 续费日志", msg, None, YD_EMAIL, YD_APP_PWD        )        print("eMail 推送成功")    except requests.exceptions.RequestException as e:        print(str(e))        print("eMail 推送失败")    except SMTPDataError as e1:        print(str(e1))        print("eMail 推送失败")# Server酱 https://sct.ftqq.comdef server_chan():    data = {        "title": "EUserv 续费日志",        "desp": desp    }    response = requests.post(f"https://sctapi.ftqq.com/{SERVER_CHAN_SENDKEY}.send", data=data)    if response.status_code != 200:        print('Server 酱推送失败')    else:        print('Server 酱推送成功')def main_handler(event, context):    if not USERNAME or not PASSWORD:        log("[EUserv] 你没有添加任何账户")        exit(1)    user_list = USERNAME.strip().split()    passwd_list = PASSWORD.strip().split()    mailparser_dl_url_id_list = MAILPARSER_DOWNLOAD_URL_ID.strip().split()    if len(user_list) != len(passwd_list):        log("[EUserv] The number of usernames and passwords do not match!")        exit(1)    if len(mailparser_dl_url_id_list) != len(user_list):        log("[Mailparser] The number of mailparser_dl_url_ids and usernames do not match!")        exit(1)    for i in range(len(user_list)):        print("*" * 30)        log("[EUserv] 正在续费第 %d 个账号" % (i + 1))        sessid, s = login(user_list[i], passwd_list[i])        if sessid == "-1":            log("[EUserv] 第 %d 个账号登陆失败,请检查登录信息" % (i + 1))            continue        SERVERS = get_servers(sessid, s)        log("[EUserv] 检测到第 {} 个账号有 {} 台 VPS,正在尝试续期".format(i + 1, len(SERVERS)))        for k, v in SERVERS.items():            if v:                if not renew(sessid, s, passwd_list[i], k, mailparser_dl_url_id_list[i]):                    log("[EUserv] ServerID: %s Renew Error!" % k)                else:                    log("[EUserv] ServerID: %s has been successfully renewed!" % k)            else:                log("[EUserv] ServerID: %s does not need to be renewed" % k)        time.sleep(15)        check(sessid, s)        time.sleep(5)    TG_BOT_TOKEN and TG_USER_ID and TG_API_HOST and telegram()    RECEIVER_EMAIL and YD_EMAIL and YD_APP_PWD and email()    SERVER_CHAN_SENDKEY and server_chan()    print("*" * 30)# only for debug# if __name__ == '__main__':#     main_handler(None, None)
 |