| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363 | 
							- import json
 
- import socket
 
- import ssl
 
- import os
 
- import time
 
- import requests
 
- import sys
 
- import zipfile
 
- from io import BytesIO
 
- from random import randbytes
 
- from urllib.parse import urlparse
 
- from collections import defaultdict
 
- MINIMUM_PURGE_AGE = 9.75 * 24 * 60 * 60  # slightly less than 10 days
 
- TIMEOUT = 10
 
- SKIPPED_SERVICES = {'YouNow', 'SHOWROOM', 'Dacast'}
 
- SERVICES_FILE = 'plugins/rtmp-services/data/services.json'
 
- PACKAGE_FILE = 'plugins/rtmp-services/data/package.json'
 
- CACHE_FILE = 'other/timestamps.json'
 
- GITHUB_OUTPUT_FILE = os.environ.get('GITHUB_OUTPUT', None)
 
- DO_NOT_PING = {'jp9000'}
 
- PR_MESSAGE = '''This is an automatically created pull request to remove unresponsive servers and services.
 
- | Service | Action Taken | Author(s) |
 
- | ------- | ------------ | --------- |
 
- {table}
 
- If you are not responsible for an affected service and want to be excluded from future pings please let us know.
 
- Created by workflow run: https://github.com/{repository}/actions/runs/{run_id}'''
 
- # GQL is great isn't it
 
- GQL_QUERY = '''{
 
-   repositoryOwner(login: "obsproject") {
 
-     repository(name: "obs-studio") {
 
-       object(expression: "master") {
 
-         ... on Commit {
 
-           blame(path: "plugins/rtmp-services/data/services.json") {
 
-             ranges {
 
-               startingLine
 
-               endingLine
 
-               commit {
 
-                 author {
 
-                   user {
 
-                     login
 
-                   }
 
-                 }
 
-               }
 
-             }
 
-           }
 
-         }
 
-       }
 
-     }
 
-   }
 
- }'''
 
- context = ssl.create_default_context()
 
- def check_ftl_server(hostname) -> bool:
 
-     """Check if hostname resolves to a valid address - FTL handshake not implemented"""
 
-     try:
 
-         socket.getaddrinfo(hostname, 8084, proto=socket.IPPROTO_UDP)
 
-     except socket.gaierror as e:
 
-         print(f'⚠️ Could not resolve hostname for server: {hostname} (Exception: {e})')
 
-         return False
 
-     else:
 
-         return True
 
- def check_hls_server(uri) -> bool:
 
-     """Check if URL responds with status code < 500 and not 404, indicating that at least there's *something* there"""
 
-     try:
 
-         r = requests.post(uri, timeout=TIMEOUT)
 
-         if r.status_code >= 500 or r.status_code == 404:
 
-             raise Exception(f'Server responded with {r.status_code}')
 
-     except Exception as e:
 
-         print(f'⚠️ Could not connect to HLS server: {uri} (Exception: {e})')
 
-         return False
 
-     else:
 
-         return True
 
- def check_rtmp_server(uri) -> bool:
 
-     """Try connecting and sending a RTMP handshake (with SSL if necessary)"""
 
-     parsed = urlparse(uri)
 
-     hostname, port = parsed.netloc.partition(':')[::2]
 
-     if port:
 
-         port = int(port)
 
-     elif parsed.scheme == 'rtmps':
 
-         port = 443
 
-     else:
 
-         port = 1935
 
-     try:
 
-         recv = b''
 
-         with socket.create_connection((hostname, port), timeout=TIMEOUT) as sock:
 
-             # RTMP handshake is \x03 + 4 bytes time (can be 0) + 4 zero bytes + 1528 bytes random
 
-             handshake = b'\x03\x00\x00\x00\x00\x00\x00\x00\x00' + randbytes(1528)
 
-             if parsed.scheme == 'rtmps':
 
-                 with context.wrap_socket(sock, server_hostname=hostname) as ssock:
 
-                     ssock.sendall(handshake)
 
-                     while True:
 
-                         _tmp = ssock.recv(4096)
 
-                         recv += _tmp
 
-                         if len(recv) >= 1536 or not _tmp:
 
-                             break
 
-             else:
 
-                 sock.sendall(handshake)
 
-                 while True:
 
-                     _tmp = sock.recv(4096)
 
-                     recv += _tmp
 
-                     if len(recv) >= 1536 or not _tmp:
 
-                         break
 
-         if len(recv) < 1536 or recv[0] != 3:
 
-             raise ValueError('Invalid RTMP handshake received from server')
 
-     except Exception as e:
 
-         print(f'⚠️ Connection to server failed: {uri} (Exception: {e})')
 
-         return False
 
-     else:
 
-         return True
 
- def get_last_artifact():
 
-     s = requests.session()
 
-     s.headers['Authorization'] = f'Bearer {os.environ["GITHUB_TOKEN"]}'
 
-     run_id = os.environ['WORKFLOW_RUN_ID']
 
-     repo = os.environ['REPOSITORY']
 
-     # fetch run first, get workflow id from there to get workflow runs
 
-     r = s.get(f'https://api.github.com/repos/{repo}/actions/runs/{run_id}')
 
-     r.raise_for_status()
 
-     workflow_id = r.json()['workflow_id']
 
-     r = s.get(
 
-         f'https://api.github.com/repos/{repo}/actions/workflows/{workflow_id}/runs',
 
-         params=dict(per_page=1, status='completed', branch='master', conclusion='success', event='schedule'),
 
-     )
 
-     r.raise_for_status()
 
-     runs = r.json()
 
-     if not runs['workflow_runs']:
 
-         raise ValueError('No completed workflow runs found')
 
-     r = s.get(runs['workflow_runs'][0]['artifacts_url'])
 
-     r.raise_for_status()
 
-     for artifact in r.json()['artifacts']:
 
-         if artifact['name'] == 'timestamps':
 
-             artifact_url = artifact['archive_download_url']
 
-             break
 
-     else:
 
-         raise ValueError('No previous artifact found.')
 
-     r = s.get(artifact_url)
 
-     r.raise_for_status()
 
-     zip_data = BytesIO()
 
-     zip_data.write(r.content)
 
-     with zipfile.ZipFile(zip_data) as zip_ref:
 
-         for info in zip_ref.infolist():
 
-             if info.filename == 'timestamps.json':
 
-                 return json.loads(zip_ref.read(info.filename))
 
- def find_people_to_blame(raw_services: str, servers: list[tuple[str, str]]) -> dict:
 
-     if not servers:
 
-         return dict()
 
-     # Fetch Blame data from github
 
-     s = requests.session()
 
-     s.headers['Authorization'] = f'Bearer {os.environ["GITHUB_TOKEN"]}'
 
-     r = s.post('https://api.github.com/graphql', json=dict(query=GQL_QUERY, variables=dict()))
 
-     r.raise_for_status()
 
-     j = r.json()
 
-     # The file is only ~2600 lines so this isn't too crazy and makes the lookup very easy
 
-     line_author = dict()
 
-     for blame in j['data']['repositoryOwner']['repository']['object']['blame']['ranges']:
 
-         for i in range(blame['startingLine'] - 1, blame['endingLine']):
 
-             if user := blame['commit']['author']['user']:
 
-                 line_author[i] = user['login']
 
-     service_authors = defaultdict(set)
 
-     for i, line in enumerate(raw_services.splitlines()):
 
-         if '"url":' not in line:
 
-             continue
 
-         for server, service in servers:
 
-             if server in line and (author := line_author.get(i)):
 
-                 if author not in DO_NOT_PING:
 
-                     service_authors[service].add(author)
 
-     return service_authors
 
- def set_output(name, value):
 
-     if not GITHUB_OUTPUT_FILE:
 
-         return
 
-     try:
 
-         with open(GITHUB_OUTPUT_FILE, 'a', encoding='utf-8', newline='\n') as f:
 
-             f.write(f'{name}={value}\n')
 
-     except Exception as e:
 
-         print(f'Writing to github output files failed: {e!r}')
 
- def main():
 
-     try:
 
-         with open(SERVICES_FILE, encoding='utf-8') as services_file:
 
-             raw_services = services_file.read()
 
-             services = json.loads(raw_services)
 
-         with open(PACKAGE_FILE, encoding='utf-8') as package_file:
 
-             package = json.load(package_file)
 
-     except OSError as e:
 
-         print(f'❌ Could not open services/package file: {e}')
 
-         return 1
 
-     # attempt to load last check result cache
 
-     try:
 
-         with open(CACHE_FILE, encoding='utf-8') as check_file:
 
-             fail_timestamps = json.load(check_file)
 
-     except OSError as e:
 
-         # cache might be evicted or not exist yet, so this is non-fatal
 
-         print(f'⚠️ Could not read cache file, trying to get last artifact (Exception: {e})')
 
-         try:
 
-             fail_timestamps = get_last_artifact()
 
-         except Exception as e:
 
-             print(f'⚠️ Could not fetch cache file, starting fresh. (Exception: {e})')
 
-             fail_timestamps = dict()
 
-         else:
 
-             print('Fetched cache file from last run artifact.')
 
-     else:
 
-         print('Successfully loaded cache file:', CACHE_FILE)
 
-     start_time = int(time.time())
 
-     affected_services = dict()
 
-     removed_servers = list()
 
-     # create temporary new list
 
-     new_services = services.copy()
 
-     new_services['services'] = []
 
-     for service in services['services']:
 
-         # skip services that do custom stuff that we can't easily check
 
-         if service['name'] in SKIPPED_SERVICES:
 
-             new_services['services'].append(service)
 
-             continue
 
-         service_type = service.get('recommended', {}).get('output', 'rtmp_output')
 
-         if service_type not in {'rtmp_output', 'ffmpeg_hls_muxer', 'ftl_output'}:
 
-             print('Unknown service type:', service_type)
 
-             new_services['services'].append(service)
 
-             continue
 
-         # create a copy to mess with
 
-         new_service = service.copy()
 
-         new_service['servers'] = []
 
-         # run checks for all the servers, and store results in timestamp cache
 
-         for server in service['servers']:
 
-             if service_type == 'ftl_output':
 
-                 is_ok = check_ftl_server(server['url'])
 
-             elif service_type == 'ffmpeg_hls_muxer':
 
-                 is_ok = check_hls_server(server['url'])
 
-             else:  # rtmp
 
-                 is_ok = check_rtmp_server(server['url'])
 
-             if not is_ok:
 
-                 if ts := fail_timestamps.get(server['url'], None):
 
-                     if (delta := start_time - ts) >= MINIMUM_PURGE_AGE:
 
-                         print(
 
-                             f'🗑️ Purging server "{server["url"]}", it has been '
 
-                             f'unresponsive for {round(delta/60/60/24)} days.'
 
-                         )
 
-                         removed_servers.append((server['url'], service['name']))
 
-                         # continuing here means not adding it to the new list, thus dropping it
 
-                         continue
 
-                 else:
 
-                     fail_timestamps[server['url']] = start_time
 
-             elif is_ok and server['url'] in fail_timestamps:
 
-                 # remove timestamp of failed check if server is back
 
-                 delta = start_time - fail_timestamps[server['url']]
 
-                 print(f'💡 Server "{server["url"]}" is back after {round(delta/60/60/24)} days!')
 
-                 del fail_timestamps[server['url']]
 
-             new_service['servers'].append(server)
 
-         if (diff := len(service['servers']) - len(new_service['servers'])) > 0:
 
-             print(f'ℹ️ Removed {diff} server(s) from {service["name"]}')
 
-             affected_services[service['name']] = f'{diff} servers removed'
 
-         # remove services with no valid servers
 
-         if not new_service['servers']:
 
-             print(f'💀 Service "{service["name"]}" has no valid servers left, removing!')
 
-             affected_services[service['name']] = f'Service removed'
 
-             continue
 
-         new_services['services'].append(new_service)
 
-     # write cache file
 
-     try:
 
-         os.makedirs('other', exist_ok=True)
 
-         with open(CACHE_FILE, 'w', encoding='utf-8') as cache_file:
 
-             json.dump(fail_timestamps, cache_file)
 
-     except OSError as e:
 
-         print(f'❌ Could not write cache file: {e}')
 
-         return 1
 
-     else:
 
-         print('Successfully wrote cache file:', CACHE_FILE)
 
-     if removed_servers:
 
-         # increment package version and save that as well
 
-         package['version'] += 1
 
-         package['files'][0]['version'] += 1
 
-         try:
 
-             with open(SERVICES_FILE, 'w', encoding='utf-8') as services_file:
 
-                 json.dump(new_services, services_file, indent=4, ensure_ascii=False)
 
-                 services_file.write('\n')
 
-             with open(PACKAGE_FILE, 'w', encoding='utf-8') as package_file:
 
-                 json.dump(package, package_file, indent=4)
 
-                 package_file.write('\n')
 
-         except OSError as e:
 
-             print(f'❌ Could not write services/package file: {e}')
 
-             return 1
 
-         else:
 
-             print(f'Successfully wrote services/package files:\n- {SERVICES_FILE}\n- {PACKAGE_FILE}')
 
-         # try to find authors to ping, this is optional and is allowed to fail
 
-         try:
 
-             service_authors = find_people_to_blame(raw_services, removed_servers)
 
-         except Exception as e:
 
-             print(f'⚠ Could not fetch blame for some reason: {e}')
 
-             service_authors = dict()
 
-         # set GitHub outputs
 
-         set_output('make_pr', 'true')
 
-         msg = PR_MESSAGE.format(
 
-             repository=os.environ['REPOSITORY'],
 
-             run_id=os.environ['WORKFLOW_RUN_ID'],
 
-             table='\n'.join(
 
-                 '| {name} | {action} | {authors} |'.format(
 
-                     name=name.replace('|', '\\|'),
 
-                     action=action,
 
-                     authors=', '.join(f'@{author}' for author in sorted(service_authors.get(name, []))),
 
-                 )
 
-                 for name, action in sorted(affected_services.items())
 
-             ),
 
-         )
 
-         set_output('pr_message', json.dumps(msg))
 
-     else:
 
-         set_output('make_pr', 'false')
 
- if __name__ == '__main__':
 
-     sys.exit(main())
 
 
  |