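"""Check the rtmp-services server list for unresponsive servers.

Probes every server in services.json and tracks failures in a timestamp
cache carried between runs; servers that keep failing for roughly ten days
are purged. Expects GITHUB_TOKEN, REPOSITORY, and WORKFLOW_RUN_ID to be
set in the environment when run in CI.
"""
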
import json
import socket
import ssl
import os
import time
import requests
import sys
import zipfile

from io import BytesIO
from random import randbytes
from urllib.parse import urlparse
from collections import defaultdict

MINIMUM_PURGE_AGE = 9.75 * 24 * 60 * 60  # slightly less than 10 days
TIMEOUT = 10
SKIPPED_SERVICES = {'YouNow', 'SHOWROOM', 'Dacast'}

SERVICES_FILE = 'plugins/rtmp-services/data/services.json'
PACKAGE_FILE = 'plugins/rtmp-services/data/package.json'
CACHE_FILE = 'other/timestamps.json'
GITHUB_OUTPUT_FILE = os.environ.get('GITHUB_OUTPUT', None)
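
# Users who have asked not to be pinged by this workflow's automated PRs.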
DO_NOT_PING = {'jp9000'}

PR_MESSAGE = '''This is an automatically created pull request to remove unresponsive servers and services.

| Service | Action Taken | Author(s) |
| ------- | ------------ | --------- |
{table}

If you are not responsible for an affected service and want to be excluded from future pings please let us know.

Created by workflow run: https://github.com/{repository}/actions/runs/{run_id}'''

# GQL is great isn't it
GQL_QUERY = '''{
  repositoryOwner(login: "obsproject") {
    repository(name: "obs-studio") {
      object(expression: "master") {
        ... on Commit {
          blame(path: "plugins/rtmp-services/data/services.json") {
            ranges {
              startingLine
              endingLine
              commit {
                author {
                  user {
                    login
                  }
                }
              }
            }
          }
        }
      }
    }
  }
}'''

context = ssl.create_default_context()


def check_ftl_server(hostname) -> bool:
    """Check if hostname resolves to a valid address - FTL handshake not implemented"""
    try:
        socket.getaddrinfo(hostname, 8084, proto=socket.IPPROTO_UDP)
    except socket.gaierror as e:
        print(f'⚠️ Could not resolve hostname for server: {hostname} (Exception: {e})')
        return False
    else:
        return True


def check_hls_server(uri) -> bool:
    """Check if URL responds with status code < 500 and not 404, indicating that at least there's *something* there"""
    try:
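        # A POST is used rather than a GET, presumably because ingest
        # endpoints often do not serve content; any response short of a 404
        # or 5xx (even a 405) still proves something is listening.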
        r = requests.post(uri, timeout=TIMEOUT)
        if r.status_code >= 500 or r.status_code == 404:
            raise Exception(f'Server responded with {r.status_code}')
    except Exception as e:
        print(f'⚠️ Could not connect to HLS server: {uri} (Exception: {e})')
        return False
    else:
        return True


def check_rtmp_server(uri) -> bool:
    """Try connecting and sending an RTMP handshake (with SSL if necessary)"""
    parsed = urlparse(uri)
    hostname, port = parsed.netloc.partition(':')[::2]
    if port:
        port = int(port)
    elif parsed.scheme == 'rtmps':
        port = 443
    else:
        port = 1935

    try:
        recv = b''
        with socket.create_connection((hostname, port), timeout=TIMEOUT) as sock:
            # RTMP handshake is \x03 + 4 bytes time (can be 0) + 4 zero bytes + 1528 bytes random
            handshake = b'\x03\x00\x00\x00\x00\x00\x00\x00\x00' + randbytes(1528)

            if parsed.scheme == 'rtmps':
                with context.wrap_socket(sock, server_hostname=hostname) as ssock:
                    ssock.sendall(handshake)
                    while True:
                        _tmp = ssock.recv(4096)
                        recv += _tmp
                        if len(recv) >= 1536 or not _tmp:
                            break
            else:
                sock.sendall(handshake)
                while True:
                    _tmp = sock.recv(4096)
                    recv += _tmp
                    if len(recv) >= 1536 or not _tmp:
                        break

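        # A responsive RTMP server sends back S0 (the version byte 0x03)
        # followed by its own 1536-byte S1, so we expect at least 1536 bytes
        # starting with 0x03; anything else is treated as a failed handshake.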
        if len(recv) < 1536 or recv[0] != 3:
            raise ValueError('Invalid RTMP handshake received from server')
    except Exception as e:
        print(f'⚠️ Connection to server failed: {uri} (Exception: {e})')
        return False
    else:
        return True


def get_last_artifact():
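    """Fetch the timestamp cache from the artifact of the most recent
    successful scheduled run of this workflow, via the GitHub REST API."""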
    s = requests.Session()
    s.headers['Authorization'] = f'Bearer {os.environ["GITHUB_TOKEN"]}'

    run_id = os.environ['WORKFLOW_RUN_ID']
    repo = os.environ['REPOSITORY']

    # fetch run first, get workflow id from there to get workflow runs
    r = s.get(f'https://api.github.com/repos/{repo}/actions/runs/{run_id}')
    r.raise_for_status()
    workflow_id = r.json()['workflow_id']

    r = s.get(
        f'https://api.github.com/repos/{repo}/actions/workflows/{workflow_id}/runs',
        params=dict(per_page=1, status='completed', branch='master', conclusion='success', event='schedule'),
    )
    r.raise_for_status()
    runs = r.json()
    if not runs['workflow_runs']:
        raise ValueError('No completed workflow runs found')

    r = s.get(runs['workflow_runs'][0]['artifacts_url'])
    r.raise_for_status()

    for artifact in r.json()['artifacts']:
        if artifact['name'] == 'timestamps':
            artifact_url = artifact['archive_download_url']
            break
    else:
        raise ValueError('No previous artifact found.')

    r = s.get(artifact_url)
    r.raise_for_status()

    zip_data = BytesIO(r.content)
    with zipfile.ZipFile(zip_data) as zip_ref:
        for info in zip_ref.infolist():
            if info.filename == 'timestamps.json':
                return json.loads(zip_ref.read(info.filename))

    # falling through would hand the caller None instead of a dict,
    # so fail loudly and let the caller start with a fresh cache instead
    raise ValueError('Artifact did not contain timestamps.json')


def find_people_to_blame(raw_services: str, servers: list[tuple[str, str]]) -> dict:
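    """Map each affected service to the GitHub users whose commits last
    touched the removed servers' lines in services.json (via git blame)."""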
    if not servers:
        return dict()

    # fetch blame data from GitHub's GraphQL API
    s = requests.Session()
    s.headers['Authorization'] = f'Bearer {os.environ["GITHUB_TOKEN"]}'
    r = s.post('https://api.github.com/graphql', json=dict(query=GQL_QUERY, variables=dict()))
    r.raise_for_status()
    j = r.json()

    # The file is only ~2600 lines so this isn't too crazy and makes the lookup very easy
    line_author = dict()
    for blame in j['data']['repositoryOwner']['repository']['object']['blame']['ranges']:
        for i in range(blame['startingLine'] - 1, blame['endingLine']):
            if user := blame['commit']['author']['user']:
                line_author[i] = user['login']

    service_authors = defaultdict(set)
    for i, line in enumerate(raw_services.splitlines()):
        if '"url":' not in line:
            continue
        for server, service in servers:
            if server in line and (author := line_author.get(i)):
                if author not in DO_NOT_PING:
                    service_authors[service].add(author)

    return service_authors


def set_output(name, value):
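    """Append a name=value pair to the GITHUB_OUTPUT file so later workflow
    steps can use it; does nothing when running outside of GitHub Actions."""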
    if not GITHUB_OUTPUT_FILE:
        return

    try:
        with open(GITHUB_OUTPUT_FILE, 'a', encoding='utf-8', newline='\n') as f:
            f.write(f'{name}={value}\n')
    except Exception as e:
        print(f'Writing to GitHub output file failed: {e!r}')


def main():
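    """Check every listed server, purge those that have failed for longer
    than MINIMUM_PURGE_AGE, and emit outputs for the PR-creating workflow."""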
    try:
        with open(SERVICES_FILE, encoding='utf-8') as services_file:
            raw_services = services_file.read()
            services = json.loads(raw_services)

        with open(PACKAGE_FILE, encoding='utf-8') as package_file:
            package = json.load(package_file)
    except OSError as e:
        print(f'❌ Could not open services/package file: {e}')
        return 1

    # attempt to load the result cache from the last check
    try:
        with open(CACHE_FILE, encoding='utf-8') as check_file:
            fail_timestamps = json.load(check_file)
    except OSError as e:
        # cache might be evicted or not exist yet, so this is non-fatal
        print(f'⚠️ Could not read cache file, trying to get last artifact (Exception: {e})')
        try:
            fail_timestamps = get_last_artifact()
        except Exception as e:
            print(f'⚠️ Could not fetch cache file, starting fresh. (Exception: {e})')
            fail_timestamps = dict()
        else:
            print('Fetched cache file from last run artifact.')
    else:
        print('Successfully loaded cache file:', CACHE_FILE)

    start_time = int(time.time())
    affected_services = dict()
    removed_servers = list()

    # build a new list of services, leaving the original untouched
    new_services = services.copy()
    new_services['services'] = []

    for service in services['services']:
        # skip services that do custom stuff that we can't easily check
        if service['name'] in SKIPPED_SERVICES:
            new_services['services'].append(service)
            continue

        service_type = service.get('recommended', {}).get('output', 'rtmp_output')
        if service_type not in {'rtmp_output', 'ffmpeg_hls_muxer', 'ftl_output'}:
            print('Unknown service type:', service_type)
            new_services['services'].append(service)
            continue

        # create a copy to mess with
        new_service = service.copy()
        new_service['servers'] = []

        # run checks for all the servers, and store results in the timestamp cache
        for server in service['servers']:
            if service_type == 'ftl_output':
                is_ok = check_ftl_server(server['url'])
            elif service_type == 'ffmpeg_hls_muxer':
                is_ok = check_hls_server(server['url'])
            else:  # rtmp
                is_ok = check_rtmp_server(server['url'])

            if not is_ok:
                if ts := fail_timestamps.get(server['url'], None):
                    if (delta := start_time - ts) >= MINIMUM_PURGE_AGE:
                        print(
                            f'🗑️ Purging server "{server["url"]}", it has been '
                            f'unresponsive for {round(delta/60/60/24)} days.'
                        )
                        removed_servers.append((server['url'], service['name']))
                        # continuing here means not adding it to the new list, thus dropping it
                        continue
                else:
                    fail_timestamps[server['url']] = start_time
            elif is_ok and server['url'] in fail_timestamps:
                # remove the failure timestamp if the server is back
                delta = start_time - fail_timestamps[server['url']]
                print(f'💡 Server "{server["url"]}" is back after {round(delta/60/60/24)} days!')
                del fail_timestamps[server['url']]

            new_service['servers'].append(server)

        if (diff := len(service['servers']) - len(new_service['servers'])) > 0:
            print(f'ℹ️ Removed {diff} server(s) from {service["name"]}')
            affected_services[service['name']] = f'{diff} server(s) removed'

        # remove services with no valid servers left
        if not new_service['servers']:
            print(f'💀 Service "{service["name"]}" has no valid servers left, removing!')
            affected_services[service['name']] = 'Service removed'
            continue

        new_services['services'].append(new_service)

    # write the cache file
    try:
        os.makedirs('other', exist_ok=True)
        with open(CACHE_FILE, 'w', encoding='utf-8') as cache_file:
            json.dump(fail_timestamps, cache_file)
    except OSError as e:
        print(f'❌ Could not write cache file: {e}')
        return 1
    else:
        print('Successfully wrote cache file:', CACHE_FILE)

    if removed_servers:
        # increment the package version and save that as well
        package['version'] += 1
        package['files'][0]['version'] += 1

        try:
            with open(SERVICES_FILE, 'w', encoding='utf-8') as services_file:
                json.dump(new_services, services_file, indent=4, ensure_ascii=False)
                services_file.write('\n')

            with open(PACKAGE_FILE, 'w', encoding='utf-8') as package_file:
                json.dump(package, package_file, indent=4)
                package_file.write('\n')
        except OSError as e:
            print(f'❌ Could not write services/package file: {e}')
            return 1
        else:
            print(f'Successfully wrote services/package files:\n- {SERVICES_FILE}\n- {PACKAGE_FILE}')

        # try to find authors to ping; this is optional and allowed to fail
        try:
            service_authors = find_people_to_blame(raw_services, removed_servers)
        except Exception as e:
            print(f'⚠️ Could not fetch blame for some reason: {e}')
            service_authors = dict()

        # set GitHub outputs
        set_output('make_pr', 'true')
        msg = PR_MESSAGE.format(
            repository=os.environ['REPOSITORY'],
            run_id=os.environ['WORKFLOW_RUN_ID'],
            table='\n'.join(
                '| {name} | {action} | {authors} |'.format(
                    name=name.replace('|', '\\|'),
                    action=action,
                    authors=', '.join(f'@{author}' for author in sorted(service_authors.get(name, []))),
                )
                for name, action in sorted(affected_services.items())
            ),
        )
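        # JSON-encode the message so the multiline body survives the
        # single-line name=value format that set_output() writes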
        set_output('pr_message', json.dumps(msg))
    else:
        set_output('make_pr', 'false')


if __name__ == '__main__':
    sys.exit(main())