check-services.py

import json
import socket
import ssl
import os
import time
import requests
import sys
import zipfile

from io import BytesIO
from random import randbytes
from urllib.parse import urlparse
from collections import defaultdict

MINIMUM_PURGE_AGE = 9.75 * 24 * 60 * 60  # slightly less than 10 days
TIMEOUT = 10
SKIPPED_SERVICES = {'YouNow', 'SHOWROOM', 'Dacast'}
SERVICES_FILE = 'plugins/rtmp-services/data/services.json'
PACKAGE_FILE = 'plugins/rtmp-services/data/package.json'
CACHE_FILE = 'other/timestamps.json'
GITHUB_OUTPUT_FILE = os.environ.get('GITHUB_OUTPUT', None)
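
# Authors who asked not to be pinged in the generated PR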
DO_NOT_PING = {'jp9000'}

PR_MESSAGE = '''This is an automatically created pull request to remove unresponsive servers and services.

| Service | Action Taken | Author(s) |
| ------- | ------------ | --------- |
{table}

If you are not responsible for an affected service and want to be excluded from future pings please let us know.

Created by workflow run: https://github.com/{repository}/actions/runs/{run_id}'''

# GQL is great isn't it
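# Per-line blame data for services.json, used to find the authors of removed servers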
GQL_QUERY = '''{
  repositoryOwner(login: "obsproject") {
    repository(name: "obs-studio") {
      object(expression: "master") {
        ... on Commit {
          blame(path: "plugins/rtmp-services/data/services.json") {
            ranges {
              startingLine
              endingLine
              commit {
                author {
                  user {
                    login
                  }
                }
              }
            }
          }
        }
      }
    }
  }
}'''
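
# TLS context shared by all rtmps:// server checks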
context = ssl.create_default_context()


def check_ftl_server(hostname) -> bool:
    """Check if hostname resolves to a valid address - FTL handshake not implemented"""
    try:
        socket.getaddrinfo(hostname, 8084, proto=socket.IPPROTO_UDP)
    except socket.gaierror as e:
        print(f'⚠️ Could not resolve hostname for server: {hostname} (Exception: {e})')
        return False
    else:
        return True


def check_hls_server(uri) -> bool:
    """Check if URL responds with status code < 500 and not 404, indicating that at least there's *something* there"""
    try:
        r = requests.post(uri, timeout=TIMEOUT)
        if r.status_code >= 500 or r.status_code == 404:
            raise Exception(f'Server responded with {r.status_code}')
    except Exception as e:
        print(f'⚠️ Could not connect to HLS server: {uri} (Exception: {e})')
        return False
    else:
        return True


def check_rtmp_server(uri) -> bool:
    """Try connecting and sending a RTMP handshake (with SSL if necessary)"""
    parsed = urlparse(uri)
    hostname, port = parsed.netloc.partition(':')[::2]
    if port:
        port = int(port)
    elif parsed.scheme == 'rtmps':
        port = 443
    else:
        port = 1935

    try:
        recv = b''
        with socket.create_connection((hostname, port), timeout=TIMEOUT) as sock:
            # RTMP handshake is \x03 + 4 bytes time (can be 0) + 4 zero bytes + 1528 bytes random
            handshake = b'\x03\x00\x00\x00\x00\x00\x00\x00\x00' + randbytes(1528)

            if parsed.scheme == 'rtmps':
                with context.wrap_socket(sock, server_hostname=hostname) as ssock:
                    ssock.sendall(handshake)
                    while True:
                        _tmp = ssock.recv(4096)
                        recv += _tmp
                        if len(recv) >= 1536 or not _tmp:
                            break
            else:
                sock.sendall(handshake)
                while True:
                    _tmp = sock.recv(4096)
                    recv += _tmp
                    if len(recv) >= 1536 or not _tmp:
                        break

        if len(recv) < 1536 or recv[0] != 3:
            raise ValueError('Invalid RTMP handshake received from server')
    except Exception as e:
        print(f'⚠️ Connection to server failed: {uri} (Exception: {e})')
        return False
    else:
        return True


def get_last_artifact():
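    """Fetch the timestamp cache from the 'timestamps' artifact of the previous successful scheduled run."""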
    s = requests.session()
    s.headers['Authorization'] = f'Bearer {os.environ["GITHUB_TOKEN"]}'

    run_id = os.environ['WORKFLOW_RUN_ID']
    repo = os.environ['REPOSITORY']

    # fetch run first, get workflow id from there to get workflow runs
    r = s.get(f'https://api.github.com/repos/{repo}/actions/runs/{run_id}')
    r.raise_for_status()
    workflow_id = r.json()['workflow_id']

    r = s.get(
        f'https://api.github.com/repos/{repo}/actions/workflows/{workflow_id}/runs',
        params=dict(per_page=1, status='completed', branch='master', conclusion='success', event='schedule'),
    )
    r.raise_for_status()
    runs = r.json()
    if not runs['workflow_runs']:
        raise ValueError('No completed workflow runs found')

    r = s.get(runs['workflow_runs'][0]['artifacts_url'])
    r.raise_for_status()

    for artifact in r.json()['artifacts']:
        if artifact['name'] == 'timestamps':
            artifact_url = artifact['archive_download_url']
            break
    else:
        raise ValueError('No previous artifact found.')

    r = s.get(artifact_url)
    r.raise_for_status()

    zip_data = BytesIO()
    zip_data.write(r.content)

    with zipfile.ZipFile(zip_data) as zip_ref:
        for info in zip_ref.infolist():
            if info.filename == 'timestamps.json':
                return json.loads(zip_ref.read(info.filename))


def find_people_to_blame(raw_services: str, servers: list[tuple[str, str]]) -> dict:
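    """Map each affected service to the GitHub users who last touched its removed servers in services.json."""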
    if not servers:
        return dict()

    # Fetch Blame data from github
    s = requests.session()
    s.headers['Authorization'] = f'Bearer {os.environ["GITHUB_TOKEN"]}'
    r = s.post('https://api.github.com/graphql', json=dict(query=GQL_QUERY, variables=dict()))
    r.raise_for_status()
    j = r.json()

    # The file is only ~2600 lines so this isn't too crazy and makes the lookup very easy
    line_author = dict()
    for blame in j['data']['repositoryOwner']['repository']['object']['blame']['ranges']:
        for i in range(blame['startingLine'] - 1, blame['endingLine']):
            if user := blame['commit']['author']['user']:
                line_author[i] = user['login']

    service_authors = defaultdict(set)
    for i, line in enumerate(raw_services.splitlines()):
        if '"url":' not in line:
            continue
        for server, service in servers:
            if server in line and (author := line_author.get(i)):
                if author not in DO_NOT_PING:
                    service_authors[service].add(author)

    return service_authors


def set_output(name, value):
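    """Append a name=value pair to the GitHub Actions output file (no-op when running outside of Actions)."""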
    if not GITHUB_OUTPUT_FILE:
        return

    try:
        with open(GITHUB_OUTPUT_FILE, 'a', encoding='utf-8', newline='\n') as f:
            f.write(f'{name}={value}\n')
    except Exception as e:
        print(f'Writing to github output files failed: {e!r}')


def main():
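    """Check every listed server, purge those unresponsive for ~10 days, and stage outputs for the cleanup PR."""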
    try:
        with open(SERVICES_FILE, encoding='utf-8') as services_file:
            raw_services = services_file.read()
            services = json.loads(raw_services)
        with open(PACKAGE_FILE, encoding='utf-8') as package_file:
            package = json.load(package_file)
    except OSError as e:
        print(f'❌ Could not open services/package file: {e}')
        return 1

    # attempt to load last check result cache
    try:
        with open(CACHE_FILE, encoding='utf-8') as check_file:
            fail_timestamps = json.load(check_file)
    except OSError as e:
        # cache might be evicted or not exist yet, so this is non-fatal
        print(f'⚠️ Could not read cache file, trying to get last artifact (Exception: {e})')
        try:
            fail_timestamps = get_last_artifact()
        except Exception as e:
            print(f'⚠️ Could not fetch cache file, starting fresh. (Exception: {e})')
            fail_timestamps = dict()
        else:
            print('Fetched cache file from last run artifact.')
    else:
        print('Successfully loaded cache file:', CACHE_FILE)

    start_time = int(time.time())
    affected_services = dict()
    removed_servers = list()

    # create temporary new list
    new_services = services.copy()
    new_services['services'] = []

    for service in services['services']:
        # skip services that do custom stuff that we can't easily check
        if service['name'] in SKIPPED_SERVICES:
            new_services['services'].append(service)
            continue

        service_type = service.get('recommended', {}).get('output', 'rtmp_output')
        if service_type not in {'rtmp_output', 'ffmpeg_hls_muxer', 'ftl_output'}:
            print('Unknown service type:', service_type)
            new_services['services'].append(service)
            continue

        # create a copy to mess with
        new_service = service.copy()
        new_service['servers'] = []

        # run checks for all the servers, and store results in timestamp cache
        for server in service['servers']:
            if service_type == 'ftl_output':
                is_ok = check_ftl_server(server['url'])
            elif service_type == 'ffmpeg_hls_muxer':
                is_ok = check_hls_server(server['url'])
            else:  # rtmp
                is_ok = check_rtmp_server(server['url'])

            if not is_ok:
                if ts := fail_timestamps.get(server['url'], None):
                    if (delta := start_time - ts) >= MINIMUM_PURGE_AGE:
                        print(
                            f'🗑️ Purging server "{server["url"]}", it has been '
                            f'unresponsive for {round(delta/60/60/24)} days.'
                        )
                        removed_servers.append((server['url'], service['name']))
                        # continuing here means not adding it to the new list, thus dropping it
                        continue
                else:
                    fail_timestamps[server['url']] = start_time
            elif is_ok and server['url'] in fail_timestamps:
                # remove timestamp of failed check if server is back
                delta = start_time - fail_timestamps[server['url']]
                print(f'💡 Server "{server["url"]}" is back after {round(delta/60/60/24)} days!')
                del fail_timestamps[server['url']]

            new_service['servers'].append(server)

        if (diff := len(service['servers']) - len(new_service['servers'])) > 0:
            print(f'ℹ️ Removed {diff} server(s) from {service["name"]}')
            affected_services[service['name']] = f'{diff} servers removed'

        # remove services with no valid servers
        if not new_service['servers']:
            print(f'💀 Service "{service["name"]}" has no valid servers left, removing!')
            affected_services[service['name']] = 'Service removed'
            continue

        new_services['services'].append(new_service)

    # write cache file
    try:
        os.makedirs('other', exist_ok=True)
        with open(CACHE_FILE, 'w', encoding='utf-8') as cache_file:
            json.dump(fail_timestamps, cache_file)
    except OSError as e:
        print(f'❌ Could not write cache file: {e}')
        return 1
    else:
        print('Successfully wrote cache file:', CACHE_FILE)

    if removed_servers:
        # increment package version and save that as well
        package['version'] += 1
        package['files'][0]['version'] += 1

        try:
            with open(SERVICES_FILE, 'w', encoding='utf-8') as services_file:
                json.dump(new_services, services_file, indent=4, ensure_ascii=False)
                services_file.write('\n')
            with open(PACKAGE_FILE, 'w', encoding='utf-8') as package_file:
                json.dump(package, package_file, indent=4)
                package_file.write('\n')
        except OSError as e:
            print(f'❌ Could not write services/package file: {e}')
            return 1
        else:
            print(f'Successfully wrote services/package files:\n- {SERVICES_FILE}\n- {PACKAGE_FILE}')

        # try to find authors to ping, this is optional and is allowed to fail
        try:
            service_authors = find_people_to_blame(raw_services, removed_servers)
        except Exception as e:
            print(f'⚠️ Could not fetch blame for some reason: {e}')
            service_authors = dict()

        # set GitHub outputs
        set_output('make_pr', 'true')
        msg = PR_MESSAGE.format(
            repository=os.environ['REPOSITORY'],
            run_id=os.environ['WORKFLOW_RUN_ID'],
            table='\n'.join(
                '| {name} | {action} | {authors} |'.format(
                    name=name.replace('|', '\\|'),
                    action=action,
                    authors=', '.join(f'@{author}' for author in sorted(service_authors.get(name, []))),
                )
                for name, action in sorted(affected_services.items())
            ),
        )
        set_output('pr_message', json.dumps(msg))
    else:
        set_output('make_pr', 'false')


if __name__ == '__main__':
    sys.exit(main())