@@ -0,0 +1,381 @@
+import json
+import socket
+import ssl
+import os
+import time
+import requests
+import sys
+import zipfile
+
+from io import BytesIO
+from random import randbytes
+from urllib.parse import urlparse
+from collections import defaultdict
+
+MINIMUM_PURGE_AGE = 9.75 * 24 * 60 * 60 # slightly less than 10 days
+TIMEOUT = 10
+SKIPPED_SERVICES = {"YouNow", "SHOWROOM", "Dacast"}
+SERVICES_FILE = "plugins/rtmp-services/data/services.json"
+PACKAGE_FILE = "plugins/rtmp-services/data/package.json"
+CACHE_FILE = "other/timestamps.json"
+GITHUB_OUTPUT_FILE = os.environ.get("GITHUB_OUTPUT", None)
+
+DO_NOT_PING = {"jp9000"}
+PR_MESSAGE = """This is an automatically created pull request to remove unresponsive servers and services.
+
+| Service | Action Taken | Author(s) |
+| ------- | ------------ | --------- |
+{table}
+
+If you are not responsible for an affected service and want to be excluded from future pings, please let us know.
+
+Created by workflow run: https://github.com/{repository}/actions/runs/{run_id}"""
+
+# GQL is great isn't it
+GQL_QUERY = """{
+    repositoryOwner(login: "obsproject") {
+        repository(name: "obs-studio") {
+            object(expression: "master") {
+                ... on Commit {
+                    blame(path: "plugins/rtmp-services/data/services.json") {
+                        ranges {
+                            startingLine
+                            endingLine
+                            commit {
+                                author {
+                                    user {
+                                        login
+                                    }
+                                }
+                            }
+                        }
+                    }
+                }
+            }
+        }
+    }
+}"""
+
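+# shared TLS context, reused for rtmps connections in check_rtmp_server()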
+context = ssl.create_default_context()
+
+
+def check_ftl_server(hostname) -> bool:
+    """Check if hostname resolves to a valid address - FTL handshake not implemented"""
+    try:
+        socket.getaddrinfo(hostname, 8084, proto=socket.IPPROTO_UDP)
+    except socket.gaierror as e:
+        print(f"⚠️ Could not resolve hostname for server: {hostname} (Exception: {e})")
+        return False
+    else:
+        return True
+
+
+def check_hls_server(uri) -> bool:
+    """Check if URL responds with status code < 500 and not 404, indicating that at least there's *something* there"""
+    try:
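+        # any response below 500 that isn't a 404 counts as the server being alive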
+        r = requests.post(uri, timeout=TIMEOUT)
+        if r.status_code >= 500 or r.status_code == 404:
+            raise Exception(f"Server responded with {r.status_code}")
+    except Exception as e:
+        print(f"⚠️ Could not connect to HLS server: {uri} (Exception: {e})")
+        return False
+    else:
+        return True
+
+
+def check_rtmp_server(uri) -> bool:
+    """Try connecting and sending an RTMP handshake (with SSL if necessary)"""
+    parsed = urlparse(uri)
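+    # netloc is "host" or "host:port"; partition + slice gives (host, port or "")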
+    hostname, port = parsed.netloc.partition(":")[::2]
+
+    if port:
+        port = int(port)
+    elif parsed.scheme == "rtmps":
+        port = 443
+    else:
+        port = 1935
+
+    try:
+        recv = b""
+        with socket.create_connection((hostname, port), timeout=TIMEOUT) as sock:
+            # RTMP handshake is \x03 + 4 bytes time (can be 0) + 4 zero bytes + 1528 bytes random
+            handshake = b"\x03\x00\x00\x00\x00\x00\x00\x00\x00" + randbytes(1528)
+            if parsed.scheme == "rtmps":
+                with context.wrap_socket(sock, server_hostname=hostname) as ssock:
+                    ssock.sendall(handshake)
+                    while True:
+                        _tmp = ssock.recv(4096)
+                        recv += _tmp
+                        if len(recv) >= 1536 or not _tmp:
+                            break
+            else:
+                sock.sendall(handshake)
+                while True:
+                    _tmp = sock.recv(4096)
+                    recv += _tmp
+                    if len(recv) >= 1536 or not _tmp:
+                        break
+
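+        # a valid reply starts with the version byte \x03 followed by the server's S1 block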
+        if len(recv) < 1536 or recv[0] != 3:
+            raise ValueError("Invalid RTMP handshake received from server")
+    except Exception as e:
+        print(f"⚠️ Connection to server failed: {uri} (Exception: {e})")
+        return False
+    else:
+        return True
+
+
+def get_last_artifact():
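+    # GITHUB_TOKEN, WORKFLOW_RUN_ID and REPOSITORY are expected in the environment (set by the workflow)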
+    s = requests.session()
+    s.headers["Authorization"] = f'Bearer {os.environ["GITHUB_TOKEN"]}'
+
+    run_id = os.environ["WORKFLOW_RUN_ID"]
+    repo = os.environ["REPOSITORY"]
+
+    # fetch run first, get workflow id from there to get workflow runs
+    r = s.get(f"https://api.github.com/repos/{repo}/actions/runs/{run_id}")
+    r.raise_for_status()
+    workflow_id = r.json()["workflow_id"]
+
+    r = s.get(
+        f"https://api.github.com/repos/{repo}/actions/workflows/{workflow_id}/runs",
+        params=dict(
+            per_page=1,
+            status="completed",
+            branch="master",
+            conclusion="success",
+            event="schedule",
+        ),
+    )
+    r.raise_for_status()
+    runs = r.json()
+    if not runs["workflow_runs"]:
+        raise ValueError("No completed workflow runs found")
+
+    r = s.get(runs["workflow_runs"][0]["artifacts_url"])
+    r.raise_for_status()
+
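+    # look for the artifact named "timestamps"; the for/else raises if none was found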
+    for artifact in r.json()["artifacts"]:
+        if artifact["name"] == "timestamps":
+            artifact_url = artifact["archive_download_url"]
+            break
+    else:
+        raise ValueError("No previous artifact found.")
+
+    r = s.get(artifact_url)
+    r.raise_for_status()
+    zip_data = BytesIO()
+    zip_data.write(r.content)
+
+    with zipfile.ZipFile(zip_data) as zip_ref:
+        for info in zip_ref.infolist():
+            if info.filename == "timestamps.json":
+                return json.loads(zip_ref.read(info.filename))
+
+
+def find_people_to_blame(raw_services: str, servers: list[tuple[str, str]]) -> dict:
+    if not servers:
+        return dict()
+
+    # Fetch Blame data from github
+    s = requests.session()
+    s.headers["Authorization"] = f'Bearer {os.environ["GITHUB_TOKEN"]}'
+
+    r = s.post(
+        "https://api.github.com/graphql", json=dict(query=GQL_QUERY, variables=dict())
+    )
+    r.raise_for_status()
+    j = r.json()
+
+    # The file is only ~2600 lines so this isn't too crazy and makes the lookup very easy
+    line_author = dict()
+    for blame in j["data"]["repositoryOwner"]["repository"]["object"]["blame"][
+        "ranges"
+    ]:
+        for i in range(blame["startingLine"] - 1, blame["endingLine"]):
+            if user := blame["commit"]["author"]["user"]:
+                line_author[i] = user["login"]
+
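+    # map removed server URLs to the authors of the "url" lines that define them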
+    service_authors = defaultdict(set)
+    for i, line in enumerate(raw_services.splitlines()):
+        if '"url":' not in line:
+            continue
+        for server, service in servers:
+            if server in line and (author := line_author.get(i)):
+                if author not in DO_NOT_PING:
+                    service_authors[service].add(author)
+
+    return service_authors
+
+
+def set_output(name, value):
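+    # appends a "name=value" line to the GITHUB_OUTPUT file so later workflow steps can read it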
+    if not GITHUB_OUTPUT_FILE:
+        return
+
+    try:
+        with open(GITHUB_OUTPUT_FILE, "a", encoding="utf-8", newline="\n") as f:
+            f.write(f"{name}={value}\n")
+    except Exception as e:
+        print(f"Writing to GitHub output file failed: {e!r}")
+
+
+def main():
+    try:
+        with open(SERVICES_FILE, encoding="utf-8") as services_file:
+            raw_services = services_file.read()
+            services = json.loads(raw_services)
+        with open(PACKAGE_FILE, encoding="utf-8") as package_file:
+            package = json.load(package_file)
+    except OSError as e:
+        print(f"❌ Could not open services/package file: {e}")
+        return 1
+
+    # attempt to load last check result cache
+    try:
+        with open(CACHE_FILE, encoding="utf-8") as check_file:
+            fail_timestamps = json.load(check_file)
+    except OSError as e:
+        # cache might be evicted or not exist yet, so this is non-fatal
+        print(
+            f"⚠️ Could not read cache file, trying to get last artifact (Exception: {e})"
+        )
+
+        try:
+            fail_timestamps = get_last_artifact()
+        except Exception as e:
+            print(f"⚠️ Could not fetch cache file, starting fresh. (Exception: {e})")
+            fail_timestamps = dict()
+        else:
+            print("Fetched cache file from last run artifact.")
+    else:
+        print("Successfully loaded cache file:", CACHE_FILE)
+
+    start_time = int(time.time())
+    affected_services = dict()
+    removed_servers = list()
+
+    # create temporary new list
+    new_services = services.copy()
+    new_services["services"] = []
+
+    for service in services["services"]:
+        # skip services that do custom stuff that we can't easily check
+        if service["name"] in SKIPPED_SERVICES:
+            new_services["services"].append(service)
+            continue
+
+        service_type = service.get("recommended", {}).get("output", "rtmp_output")
+        if service_type not in {"rtmp_output", "ffmpeg_hls_muxer", "ftl_output"}:
+            print("Unknown service type:", service_type)
+            new_services["services"].append(service)
+            continue
+
+        # create a copy to mess with
+        new_service = service.copy()
+        new_service["servers"] = []
+
+        # run checks for all the servers, and store results in timestamp cache
+        for server in service["servers"]:
+            if service_type == "ftl_output":
+                is_ok = check_ftl_server(server["url"])
+            elif service_type == "ffmpeg_hls_muxer":
+                is_ok = check_hls_server(server["url"])
+            else: # rtmp
+                is_ok = check_rtmp_server(server["url"])
+
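+            # a failing server is only purged once it has been failing for MINIMUM_PURGE_AGE; the first failure just records a timestamp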
+            if not is_ok:
+                if ts := fail_timestamps.get(server["url"], None):
+                    if (delta := start_time - ts) >= MINIMUM_PURGE_AGE:
+                        print(
+                            f'🗑️ Purging server "{server["url"]}", it has been '
+                            f"unresponsive for {round(delta/60/60/24)} days."
+                        )
+                        removed_servers.append((server["url"], service["name"]))
+                        # continuing here means not adding it to the new list, thus dropping it
+                        continue
+                else:
+                    fail_timestamps[server["url"]] = start_time
+            elif is_ok and server["url"] in fail_timestamps:
+                # remove timestamp of failed check if server is back
+                delta = start_time - fail_timestamps[server["url"]]
+                print(
+                    f'💡 Server "{server["url"]}" is back after {round(delta/60/60/24)} days!'
+                )
+                del fail_timestamps[server["url"]]
+
+            new_service["servers"].append(server)
+
+        if (diff := len(service["servers"]) - len(new_service["servers"])) > 0:
+            print(f'ℹ️ Removed {diff} server(s) from {service["name"]}')
+            affected_services[service["name"]] = f"{diff} servers removed"
+
+        # remove services with no valid servers
+        if not new_service["servers"]:
+            print(f'💀 Service "{service["name"]}" has no valid servers left, removing!')
+            affected_services[service["name"]] = "Service removed"
+            continue
+
+        new_services["services"].append(new_service)
+
+    # write cache file
+    try:
+        os.makedirs("other", exist_ok=True)
+        with open(CACHE_FILE, "w", encoding="utf-8") as cache_file:
+            json.dump(fail_timestamps, cache_file)
+    except OSError as e:
+        print(f"❌ Could not write cache file: {e}")
+        return 1
+    else:
+        print("Successfully wrote cache file:", CACHE_FILE)
+
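+    # a PR is only needed if at least one server was actually removed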
+    if removed_servers:
+        # increment package version and save that as well
+        package["version"] += 1
+        package["files"][0]["version"] += 1
+
+        try:
+            with open(SERVICES_FILE, "w", encoding="utf-8") as services_file:
+                json.dump(new_services, services_file, indent=4, ensure_ascii=False)
+                services_file.write("\n")
+
+            with open(PACKAGE_FILE, "w", encoding="utf-8") as package_file:
+                json.dump(package, package_file, indent=4)
+                package_file.write("\n")
+        except OSError as e:
+            print(f"❌ Could not write services/package file: {e}")
+            return 1
+        else:
+            print(
+                f"Successfully wrote services/package files:\n- {SERVICES_FILE}\n- {PACKAGE_FILE}"
+            )
+
+        # try to find authors to ping; this is optional and is allowed to fail
+        try:
+            service_authors = find_people_to_blame(raw_services, removed_servers)
+        except Exception as e:
+            print(f"⚠️ Could not fetch blame for some reason: {e}")
+            service_authors = dict()
+
+        # set GitHub outputs
+        set_output("make_pr", "true")
+        msg = PR_MESSAGE.format(
+            repository=os.environ["REPOSITORY"],
+            run_id=os.environ["WORKFLOW_RUN_ID"],
+            table="\n".join(
+                "| {name} | {action} | {authors} |".format(
+                    name=name.replace("|", "\\|"),
+                    action=action,
+                    authors=", ".join(
+                        f"@{author}" for author in sorted(service_authors.get(name, []))
+                    ),
+                )
+                for name, action in sorted(affected_services.items())
+            ),
+        )
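+        # json.dumps() keeps the multi-line message on a single line for the GITHUB_OUTPUT format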
+        set_output("pr_message", json.dumps(msg))
+    else:
+        set_output("make_pr", "false")
+
+
+if __name__ == "__main__":
+    sys.exit(main())