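"""Service checker for the OBS Studio rtmp-services plugin.

Probes every server listed in services.json from a set of external checker
hosts, records failures, purges servers that have been unresponsive for about
ten days, and prepares the data needed for an automated cleanup pull request.
"""
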
import asyncio
import json
import os
import time
import sys
import zipfile

import aiohttp
import requests

from io import BytesIO
from typing import List, Dict
from collections import defaultdict

MINIMUM_PURGE_AGE = 9.75 * 24 * 60 * 60  # slightly less than 10 days
TIMEOUT = 10
SKIPPED_SERVICES = {"SHOWROOM", "Dacast"}

SERVICES_FILE = "plugins/rtmp-services/data/services.json"
PACKAGE_FILE = "plugins/rtmp-services/data/package.json"
CACHE_FILE = "other/timestamps.json"
GITHUB_OUTPUT_FILE = os.environ.get("GITHUB_OUTPUT", None)

DO_NOT_PING = {"jp9000"}

PR_MESSAGE = """This is an automatically created pull request to remove unresponsive servers and services.

| Service | Action Taken | Author(s) |
| ------- | ------------ | --------- |
{table}

If you are not responsible for an affected service and want to be excluded from future pings please let us know.

Created by workflow run: https://github.com/{repository}/actions/runs/{run_id}"""

# GQL is great isn't it
GQL_QUERY = """{
  repositoryOwner(login: "obsproject") {
    repository(name: "obs-studio") {
      object(expression: "master") {
        ... on Commit {
          blame(path: "plugins/rtmp-services/data/services.json") {
            ranges {
              startingLine
              endingLine
              commit {
                author {
                  user {
                    login
                  }
                }
              }
            }
          }
        }
      }
    }
  }
}"""


def get_last_artifact():
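    """Fetch the "timestamps" artifact from the most recent successful
    scheduled run of this workflow and return the parsed timestamps.json."""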
    s = requests.session()
    s.headers["Authorization"] = f'Bearer {os.environ["GITHUB_TOKEN"]}'

    run_id = os.environ["WORKFLOW_RUN_ID"]
    repo = os.environ["REPOSITORY"]

    # fetch run first, get workflow id from there to get workflow runs
    r = s.get(f"https://api.github.com/repos/{repo}/actions/runs/{run_id}")
    r.raise_for_status()
    workflow_id = r.json()["workflow_id"]

    r = s.get(
        f"https://api.github.com/repos/{repo}/actions/workflows/{workflow_id}/runs",
        params=dict(
            per_page=1,
            status="completed",
            branch="master",
            conclusion="success",
            event="schedule",
        ),
    )
    r.raise_for_status()
    runs = r.json()
    if not runs["workflow_runs"]:
        raise ValueError("No completed workflow runs found")

    r = s.get(runs["workflow_runs"][0]["artifacts_url"])
    r.raise_for_status()

    for artifact in r.json()["artifacts"]:
        if artifact["name"] == "timestamps":
            artifact_url = artifact["archive_download_url"]
            break
    else:
        raise ValueError("No previous artifact found.")

    r = s.get(artifact_url)
    r.raise_for_status()

    zip_data = BytesIO()
    zip_data.write(r.content)

    with zipfile.ZipFile(zip_data) as zip_ref:
        for info in zip_ref.infolist():
            if info.filename == "timestamps.json":
                return json.loads(zip_ref.read(info.filename))


def find_people_to_blame(raw_services: str, servers: list[tuple[str, str]]) -> dict:
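    """Map removed servers to the GitHub users who last touched the matching
    "url" lines in services.json, using the GraphQL blame data."""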
    if not servers:
        return dict()

    # Fetch Blame data from github
    s = requests.session()
    s.headers["Authorization"] = f'Bearer {os.environ["GITHUB_TOKEN"]}'
    r = s.post(
        "https://api.github.com/graphql", json=dict(query=GQL_QUERY, variables=dict())
    )
    r.raise_for_status()
    j = r.json()

    # The file is only ~2600 lines so this isn't too crazy and makes the lookup very easy
    line_author = dict()
    for blame in j["data"]["repositoryOwner"]["repository"]["object"]["blame"][
        "ranges"
    ]:
        for i in range(blame["startingLine"] - 1, blame["endingLine"]):
            if user := blame["commit"]["author"]["user"]:
                line_author[i] = user["login"]

    service_authors = defaultdict(set)

    for i, line in enumerate(raw_services.splitlines()):
        if '"url":' not in line:
            continue
        for server, service in servers:
            if server in line and (author := line_author.get(i)):
                if author not in DO_NOT_PING:
                    service_authors[service].add(author)

    return service_authors


def set_output(name, value):
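    """Append a name=value pair to the GitHub Actions output file, if one is set."""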
    if not GITHUB_OUTPUT_FILE:
        return

    try:
        with open(GITHUB_OUTPUT_FILE, "a", encoding="utf-8", newline="\n") as f:
            f.write(f"{name}={value}\n")
    except Exception as e:
        print(f"Writing to github output files failed: {e!r}")


async def check_servers_task(
    session: aiohttp.ClientSession, host: str, protocol: str, servers: List[str]
) -> List[Dict]:
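    """Ask a single checker host to probe the given servers and return its results."""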
    query = [dict(url=h, protocol=protocol) for h in servers]
    async with session.get(
        f"http://{host}:8999/test_remote_servers", json=query
    ) as resp:
        return await resp.json()


async def process_services(session: aiohttp.ClientSession, check_servers: List[str]):
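    """Probe all service servers, drop those unresponsive for longer than
    MINIMUM_PURGE_AGE, then update the data files and workflow outputs."""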
    try:
        with open(SERVICES_FILE, encoding="utf-8") as services_file:
            raw_services = services_file.read()
            services = json.loads(raw_services)
        with open(PACKAGE_FILE, encoding="utf-8") as package_file:
            package = json.load(package_file)
    except OSError as e:
        print(f"❌ Could not open services/package file: {e}")
        return 1

    # attempt to load last check result cache
    try:
        with open(CACHE_FILE, encoding="utf-8") as check_file:
            fail_timestamps = json.load(check_file)
    except OSError as e:
        # cache might be evicted or not exist yet, so this is non-fatal
        print(
            f"⚠️ Could not read cache file, trying to get last artifact (Exception: {e})"
        )
        try:
            fail_timestamps = get_last_artifact()
        except Exception as e:
            print(f"⚠️ Could not fetch cache file, starting fresh. (Exception: {e})")
            fail_timestamps = dict()
        else:
            print("Fetched cache file from last run artifact.")
    else:
        print("Successfully loaded cache file:", CACHE_FILE)

    start_time = int(time.time())
    affected_services = dict()
    removed_servers = list()

    # create temporary new list
    new_services = services.copy()
    new_services["services"] = []
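
    # check every service from all checker hosts and rebuild the service list
    # without servers that have been dead for too long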
    for service in services["services"]:
        # skip services that do custom stuff that we can't easily check
        if service["name"] in SKIPPED_SERVICES:
            new_services["services"].append(service)
            continue

        service_type = service.get("recommended", {}).get("output", "rtmp_output")
        if service_type not in {"rtmp_output", "ffmpeg_hls_muxer"}:
            print("Unknown service type:", service_type)
            new_services["services"].append(service)
            continue

        protocol = "rtmp" if service_type == "rtmp_output" else "hls"

        # create a copy to mess with
        new_service = service.copy()
        new_service["servers"] = []

        # run checks for all the servers, and store results in timestamp cache
        try:
            servers = [s["url"] for s in service["servers"]]
            tasks = []
            for host in check_servers:
                tasks.append(
                    asyncio.create_task(
                        check_servers_task(session, host, protocol, servers)
                    )
                )
            results = await asyncio.gather(*tasks)
        except Exception as e:
            print(
                f"❌ Querying server status for \"{service['name']}\" failed with: {e}"
            )
            return 1

        # go over results; zip(*results) transposes the per-host lists so each
        # iteration sees every checker's result for one server
        for server, result in zip(service["servers"], zip(*results)):
            failure_count = sum(not res["status"] for res in result)
            probe_count = len(result)
            # only treat server as failed if all check servers reported a failure
            is_ok = failure_count < probe_count

            if not is_ok:
                failures = {res["comment"] for res in result if not res["status"]}
                print(
                    f"⚠️ Connecting to server failed: {server['url']} (Reason(s): {failures})"
                )

                if ts := fail_timestamps.get(server["url"], None):
                    if (delta := start_time - ts) >= MINIMUM_PURGE_AGE:
                        print(
                            f'🗑️ Purging server "{server["url"]}", it has been '
                            f"unresponsive for {round(delta/60/60/24)} days."
                        )
                        removed_servers.append((server["url"], service["name"]))
                        # continuing here means not adding it to the new list, thus dropping it
                        continue
                else:
                    fail_timestamps[server["url"]] = start_time
            elif is_ok and server["url"] in fail_timestamps:
                # remove timestamp of failed check if server is back
                delta = start_time - fail_timestamps[server["url"]]
                print(
                    f'💡 Server "{server["url"]}" is back after {round(delta/60/60/24)} days!'
                )
                del fail_timestamps[server["url"]]

            new_service["servers"].append(server)

        if (diff := len(service["servers"]) - len(new_service["servers"])) > 0:
            print(f'ℹ️ Removed {diff} server(s) from {service["name"]}')
            affected_services[service["name"]] = f"{diff} servers removed"

        # remove services with no valid servers
        if not new_service["servers"]:
            print(f'💀 Service "{service["name"]}" has no valid servers left, removing!')
            affected_services[service["name"]] = "Service removed"
        else:
            new_services["services"].append(new_service)

        # wait a bit between services
        await asyncio.sleep(2.0)

    # write cache file
    try:
        os.makedirs("other", exist_ok=True)
        with open(CACHE_FILE, "w", encoding="utf-8") as cache_file:
            json.dump(fail_timestamps, cache_file)
    except OSError as e:
        print(f"❌ Could not write cache file: {e}")
        return 1
    else:
        print("Successfully wrote cache file:", CACHE_FILE)

    if removed_servers:
        # increment package version and save that as well
        package["version"] += 1
        package["files"][0]["version"] += 1

        try:
            with open(SERVICES_FILE, "w", encoding="utf-8") as services_file:
                json.dump(new_services, services_file, indent=4, ensure_ascii=False)
                services_file.write("\n")
            with open(PACKAGE_FILE, "w", encoding="utf-8") as package_file:
                json.dump(package, package_file, indent=4)
                package_file.write("\n")
        except OSError as e:
            print(f"❌ Could not write services/package file: {e}")
            return 1
        else:
            print(
                f"Successfully wrote services/package files:\n- {SERVICES_FILE}\n- {PACKAGE_FILE}"
            )

        # try to find authors to ping, this is optional and is allowed to fail
        try:
            service_authors = find_people_to_blame(raw_services, removed_servers)
        except Exception as e:
            print(f"⚠️ Could not fetch blame for some reason: {e}")
            service_authors = dict()

        # set GitHub outputs
        set_output("make_pr", "true")
        msg = PR_MESSAGE.format(
            repository=os.environ["REPOSITORY"],
            run_id=os.environ["WORKFLOW_RUN_ID"],
            table="\n".join(
                "| {name} | {action} | {authors} |".format(
                    name=name.replace("|", "\\|"),
                    action=action,
                    authors=", ".join(
                        f"@{author}" for author in sorted(service_authors.get(name, []))
                    ),
                )
                for name, action in sorted(affected_services.items())
            ),
        )
        set_output("pr_message", json.dumps(msg))
    else:
        set_output("make_pr", "false")


async def main():
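    """Read configuration from the environment and run the service checks."""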
    # check for environment variables
    try:
        api_key = os.environ["API_KEY"]
        servers = os.environ["API_SERVERS"].split(",")
        if not servers:
            raise ValueError("No checker servers!")
        # Mask everything except the region code
        for server in servers:
            prefix = server[: server.index(".") + 1]
            print(f"::add-mask::{prefix}")
            suffix = server[server.index(".", len(prefix)) :]
            print(f"::add-mask::{suffix}")
    except Exception as e:
        print(f"❌ Failed getting required environment variables: {e}")
        return 1

    # create aiohttp session
    async with aiohttp.ClientSession() as session:
        session.headers["Authorization"] = api_key
        session.headers["User-Agent"] = "OBS Repo Service Checker/1.0"
        return await process_services(session, servers)


if __name__ == "__main__":
    sys.exit(asyncio.run(main()))