check-services.py

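"""Check that the streaming servers listed in the rtmp-services data files are
still reachable, and prune servers and services that keep failing.

Overview:
- Each checker host in API_SERVERS probes every server of every service.
- First-failure times are cached in other/timestamps.json, falling back to the
  previous run's "timestamps" artifact.
- Servers unresponsive for roughly ten days are removed; services left without
  servers are dropped and the package version is bumped.
- "make_pr" and "pr_message" GitHub outputs are set so a later workflow step can
  open a pull request that pings the responsible authors.

Environment variables used: GITHUB_TOKEN, WORKFLOW_RUN_ID, REPOSITORY, API_KEY
and API_SERVERS (comma-separated checker hosts). GITHUB_OUTPUT is optional;
outputs are skipped if it is unset.
"""
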
import asyncio
import json
import os
import time
import sys
import zipfile

import aiohttp
import requests

from io import BytesIO
from typing import List, Dict
from collections import defaultdict

MINIMUM_PURGE_AGE = 9.75 * 24 * 60 * 60  # slightly less than 10 days
TIMEOUT = 10
SKIPPED_SERVICES = {"SHOWROOM", "Dacast"}

SERVICES_FILE = "plugins/rtmp-services/data/services.json"
PACKAGE_FILE = "plugins/rtmp-services/data/package.json"
CACHE_FILE = "other/timestamps.json"

GITHUB_OUTPUT_FILE = os.environ.get("GITHUB_OUTPUT", None)

DO_NOT_PING = {"jp9000"}

PR_MESSAGE = """This is an automatically created pull request to remove unresponsive servers and services.

| Service | Action Taken | Author(s) |
| ------- | ------------ | --------- |
{table}

If you are not responsible for an affected service and want to be excluded from future pings please let us know.

Created by workflow run: https://github.com/{repository}/actions/runs/{run_id}"""

# GQL is great isn't it
GQL_QUERY = """{
  repositoryOwner(login: "obsproject") {
    repository(name: "obs-studio") {
      object(expression: "master") {
        ... on Commit {
          blame(path: "plugins/rtmp-services/data/services.json") {
            ranges {
              startingLine
              endingLine
              commit {
                author {
                  user {
                    login
                  }
                }
              }
            }
          }
        }
      }
    }
  }
}"""
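
# Illustrative shape of the timestamp cache stored at CACHE_FILE (and looked up
# in the "timestamps" workflow artifact as a fallback): a flat JSON object that
# maps a server URL to the Unix timestamp of its first failed check. The URL
# below is hypothetical.
#
#   {"rtmp://live.example.invalid/app": 1700000000}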


def get_last_artifact():
    s = requests.session()
    s.headers["Authorization"] = f'Bearer {os.environ["GITHUB_TOKEN"]}'

    run_id = os.environ["WORKFLOW_RUN_ID"]
    repo = os.environ["REPOSITORY"]

    # fetch run first, get workflow id from there to get workflow runs
    r = s.get(f"https://api.github.com/repos/{repo}/actions/runs/{run_id}")
    r.raise_for_status()
    workflow_id = r.json()["workflow_id"]

    r = s.get(
        f"https://api.github.com/repos/{repo}/actions/workflows/{workflow_id}/runs",
        params=dict(
            per_page=1,
            status="completed",
            branch="master",
            conclusion="success",
            event="schedule",
        ),
    )
    r.raise_for_status()
    runs = r.json()
    if not runs["workflow_runs"]:
        raise ValueError("No completed workflow runs found")

    r = s.get(runs["workflow_runs"][0]["artifacts_url"])
    r.raise_for_status()

    for artifact in r.json()["artifacts"]:
        if artifact["name"] == "timestamps":
            artifact_url = artifact["archive_download_url"]
            break
    else:
        raise ValueError("No previous artifact found.")

    r = s.get(artifact_url)
    r.raise_for_status()

    zip_data = BytesIO()
    zip_data.write(r.content)

    with zipfile.ZipFile(zip_data) as zip_ref:
        for info in zip_ref.infolist():
            if info.filename == "timestamps.json":
                return json.loads(zip_ref.read(info.filename))


def find_people_to_blame(raw_services: str, servers: list[tuple[str, str]]) -> dict:
    if not servers:
        return dict()

    # Fetch Blame data from github
    s = requests.session()
    s.headers["Authorization"] = f'Bearer {os.environ["GITHUB_TOKEN"]}'

    r = s.post(
        "https://api.github.com/graphql", json=dict(query=GQL_QUERY, variables=dict())
    )
    r.raise_for_status()
    j = r.json()

    # The file is only ~2600 lines so this isn't too crazy and makes the lookup very easy
    line_author = dict()

    for blame in j["data"]["repositoryOwner"]["repository"]["object"]["blame"][
        "ranges"
    ]:
        for i in range(blame["startingLine"] - 1, blame["endingLine"]):
            if user := blame["commit"]["author"]["user"]:
                line_author[i] = user["login"]

    service_authors = defaultdict(set)

    for i, line in enumerate(raw_services.splitlines()):
        if '"url":' not in line:
            continue
        for server, service in servers:
            if server in line and (author := line_author.get(i)):
                if author not in DO_NOT_PING:
                    service_authors[service].add(author)

    return service_authors
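
# find_people_to_blame() returns a mapping of service name -> set of GitHub logins
# that last touched the matching "url" lines in services.json. Illustrative value
# (hypothetical service and user names):
#
#   {"Example Service": {"someuser", "anotheruser"}}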


def set_output(name, value):
    if not GITHUB_OUTPUT_FILE:
        return

    try:
        with open(GITHUB_OUTPUT_FILE, "a", encoding="utf-8", newline="\n") as f:
            f.write(f"{name}={value}\n")
    except Exception as e:
        print(f"Writing to github output files failed: {e!r}")


async def check_servers_task(
    session: aiohttp.ClientSession, host: str, protocol: str, servers: List[str]
) -> List[Dict]:
    query = [dict(url=h, protocol=protocol) for h in servers]

    async with session.get(
        f"http://{host}:8999/test_remote_servers", json=query
    ) as resp:
        return await resp.json()
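
# Each checker host is expected to reply with one JSON object per queried server,
# in the same order, carrying a truthy "status" on success and a "comment"
# describing the failure otherwise. Illustrative response for two servers
# (the comment text is hypothetical):
#
#   [{"status": true, "comment": ""},
#    {"status": false, "comment": "connection timed out"}]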


async def process_services(session: aiohttp.ClientSession, check_servers: List[str]):
    try:
        with open(SERVICES_FILE, encoding="utf-8") as services_file:
            raw_services = services_file.read()
            services = json.loads(raw_services)
        with open(PACKAGE_FILE, encoding="utf-8") as package_file:
            package = json.load(package_file)
    except OSError as e:
        print(f"❌ Could not open services/package file: {e}")
        return 1

    # attempt to load last check result cache
    try:
        with open(CACHE_FILE, encoding="utf-8") as check_file:
            fail_timestamps = json.load(check_file)
    except OSError as e:
        # cache might be evicted or not exist yet, so this is non-fatal
        print(
            f"⚠️ Could not read cache file, trying to get last artifact (Exception: {e})"
        )

        try:
            fail_timestamps = get_last_artifact()
        except Exception as e:
            print(f"⚠️ Could not fetch cache file, starting fresh. (Exception: {e})")
            fail_timestamps = dict()
        else:
            print("Fetched cache file from last run artifact.")
    else:
        print("Successfully loaded cache file:", CACHE_FILE)

    start_time = int(time.time())
    affected_services = dict()
    removed_servers = list()

    # create temporary new list
    new_services = services.copy()
    new_services["services"] = []

    for service in services["services"]:
        # skip services that do custom stuff that we can't easily check
        if service["name"] in SKIPPED_SERVICES:
            new_services["services"].append(service)
            continue

        service_type = service.get("recommended", {}).get("output", "rtmp_output")
        if service_type not in {"rtmp_output", "ffmpeg_hls_muxer"}:
            print("Unknown service type:", service_type)
            new_services["services"].append(service)
            continue

        protocol = "rtmp" if service_type == "rtmp_output" else "hls"

        # create a copy to mess with
        new_service = service.copy()
        new_service["servers"] = []

        # run checks for all the servers, and store results in timestamp cache
        try:
            servers = [s["url"] for s in service["servers"]]
            tasks = []
            for host in check_servers:
                tasks.append(
                    asyncio.create_task(
                        check_servers_task(session, host, protocol, servers)
                    )
                )

            results = await asyncio.gather(*tasks)
        except Exception as e:
            print(
                f"❌ Querying server status for \"{service['name']}\" failed with: {e}"
            )
            return 1

        # go over results
        for server, result in zip(service["servers"], zip(*results)):
            failure_count = sum(not res["status"] for res in result)
            probe_count = len(result)
            # only treat server as failed if all check servers reported a failure
            is_ok = failure_count < probe_count

            if not is_ok:
                failures = {res["comment"] for res in result if not res["status"]}
                print(
                    f"⚠️ Connecting to server failed: {server['url']} (Reason(s): {failures})"
                )

                if ts := fail_timestamps.get(server["url"], None):
                    if (delta := start_time - ts) >= MINIMUM_PURGE_AGE:
                        print(
                            f'🗑️ Purging server "{server["url"]}", it has been '
                            f"unresponsive for {round(delta/60/60/24)} days."
                        )
                        removed_servers.append((server["url"], service["name"]))
                        # continuing here means not adding it to the new list, thus dropping it
                        continue
                else:
                    fail_timestamps[server["url"]] = start_time
            elif is_ok and server["url"] in fail_timestamps:
                # remove timestamp of failed check if server is back
                delta = start_time - fail_timestamps[server["url"]]
                print(
                    f'💡 Server "{server["url"]}" is back after {round(delta/60/60/24)} days!'
                )
                del fail_timestamps[server["url"]]

            new_service["servers"].append(server)

        if (diff := len(service["servers"]) - len(new_service["servers"])) > 0:
            print(f'ℹ️ Removed {diff} server(s) from {service["name"]}')
            affected_services[service["name"]] = f"{diff} servers removed"

        # remove services with no valid servers
        if not new_service["servers"]:
            print(f'💀 Service "{service["name"]}" has no valid servers left, removing!')
            affected_services[service["name"]] = "Service removed"
        else:
            new_services["services"].append(new_service)

        # wait a bit between services
        await asyncio.sleep(2.0)

    # write cache file
    try:
        os.makedirs("other", exist_ok=True)
        with open(CACHE_FILE, "w", encoding="utf-8") as cache_file:
            json.dump(fail_timestamps, cache_file)
    except OSError as e:
        print(f"❌ Could not write cache file: {e}")
        return 1
    else:
        print("Successfully wrote cache file:", CACHE_FILE)

    if removed_servers:
        # increment package version and save that as well
        package["version"] += 1
        package["files"][0]["version"] += 1

        try:
            with open(SERVICES_FILE, "w", encoding="utf-8") as services_file:
                json.dump(new_services, services_file, indent=4, ensure_ascii=False)
                services_file.write("\n")
            with open(PACKAGE_FILE, "w", encoding="utf-8") as package_file:
                json.dump(package, package_file, indent=4)
                package_file.write("\n")
        except OSError as e:
            print(f"❌ Could not write services/package file: {e}")
            return 1
        else:
            print(
                f"Successfully wrote services/package files:\n- {SERVICES_FILE}\n- {PACKAGE_FILE}"
            )

        # try to find authors to ping, this is optional and is allowed to fail
        try:
            service_authors = find_people_to_blame(raw_services, removed_servers)
        except Exception as e:
            print(f"⚠ Could not fetch blame for some reason: {e}")
            service_authors = dict()

        # set GitHub outputs
        set_output("make_pr", "true")
        msg = PR_MESSAGE.format(
            repository=os.environ["REPOSITORY"],
            run_id=os.environ["WORKFLOW_RUN_ID"],
            table="\n".join(
                "| {name} | {action} | {authors} |".format(
                    name=name.replace("|", "\\|"),
                    action=action,
                    authors=", ".join(
                        f"@{author}" for author in sorted(service_authors.get(name, []))
                    ),
                )
                for name, action in sorted(affected_services.items())
            ),
        )
        set_output("pr_message", json.dumps(msg))
    else:
        set_output("make_pr", "false")


async def main():
    # check for environment variables
    try:
        api_key = os.environ["API_KEY"]
        servers = os.environ["API_SERVERS"].split(",")
        if not servers:
            raise ValueError("No checker servers!")

        # Mask everything except the region code
        for server in servers:
            prefix = server[: server.index(".") + 1]
            print(f"::add-mask::{prefix}")
            suffix = server[server.index(".", len(prefix)) :]
            print(f"::add-mask::{suffix}")
    except Exception as e:
        print(f"❌ Failed getting required environment variables: {e}")
        return 1

    # create aiohttp session
    async with aiohttp.ClientSession() as session:
        session.headers["Authorization"] = api_key
        session.headers["User-Agent"] = "OBS Repo Service Checker/1.0"

        return await process_services(session, servers)


if __name__ == "__main__":
    sys.exit(asyncio.run(main()))