
CI: Add services check job

derrod, 3 years ago
commit ccff8040fa
2 changed files with 305 additions and 0 deletions
  1. .github/workflows/services-json.yml (+57, -0)
  2. CI/check-services.py (+248, -0)

.github/workflows/services-json.yml (+57, -0)

@@ -9,6 +9,9 @@ on:
     paths:
       - "plugins/rtmp-services/data/services.json"
       - "plugins/rtmp-services/data/package.json"
+  schedule:
+  - cron: 0 0 * * *
+  workflow_dispatch:
 
 jobs:
   schema:
@@ -38,3 +41,57 @@ jobs:
           repo-token: "${{ secrets.GITHUB_TOKEN }}"
           title: "Service JSON Errors"
           input: "./validation_errors.json"
+
+  service_check:
+    name: Service Check
+    runs-on: ubuntu-20.04
+    needs: schema
+    if: ${{ github.repository_owner == 'obsproject' && (github.event_name == 'schedule' || github.event_name == 'workflow_dispatch') }}
+
+    steps:
+      - name: Checkout
+        uses: actions/checkout@v3
+        with:
+          fetch-depth: 0
+
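+      # the actions cache persists other/timestamps.json between scheduled runs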
+      - name: Restore cache
+        uses: actions/cache@v3
+        with:
+          path: ${{ github.workspace }}/other
+          key: service-check
+
+      - name: Install & Configure Python
+        run: |
+          sudo apt-get install -y python3.9  # -y avoids an interactive prompt on a non-tty runner
+          python3.9 -m pip install requests
+
+      - name: Check Services
+        id: check
+        run: |
+          python3.9 -u CI/check-services.py
+          # if there are changes, run the PR step
+          if ! git diff --quiet; then
+            echo "::set-output name=make_pr::true"
+          else
+            echo "::set-output name=make_pr::false"
+          fi
+        env:
+          GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
+          WORKFLOW_RUN_ID: ${{ github.run_id }}
+          REPOSITORY: ${{ github.repository }}
+
+      - uses: actions/upload-artifact@v3
+        with:
+          name: timestamps
+          path: ${{ github.workspace }}/other/*
+
+      - name: Create Pull Request
+        uses: peter-evans/create-pull-request@f094b77505fb89581e68a1163fbd2fffece39da1
+        if: steps.check.outputs.make_pr == 'true'
+        with:
+          author: "Service Checker <[email protected]>"
+          commit-message: "rtmp-services: Remove defunct servers/services"
+          title: "rtmp-services: Remove defunct servers/services"
+          branch: "automated/clean-services"
+          body: "Automatic PR to remove dead servers\nCreated by workflow run: https://github.com/${{ github.repository }}/actions/runs/${{ github.run_id }}"
+          delete-branch: true

CI/check-services.py (+248, -0)

@@ -0,0 +1,248 @@
+import json
+import socket
+import ssl
+import os
+import time
+import requests
+import sys
+import zipfile
+
+from io import BytesIO
+from random import randbytes
+from urllib.parse import urlparse
+
+MINIMUM_PURGE_AGE = 9.75 * 24 * 60 * 60  # slightly less than 10 days
+TIMEOUT = 10
+SKIPPED_SERVICES = {'YouNow', 'SHOWROOM', 'Dacast'}
+SERVICES_FILE = 'plugins/rtmp-services/data/services.json'
+PACKAGE_FILE = 'plugins/rtmp-services/data/package.json'
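+# cached failure timestamps, persisted between runs via the actions cache / run artifact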
+CACHE_FILE = 'other/timestamps.json'
+
+context = ssl.create_default_context()
+
+
+def check_ftl_server(hostname) -> bool:
+    """Check if hostname resolves to a valid address - FTL handshake not implemented"""
+    try:
+        socket.getaddrinfo(hostname, 8084, proto=socket.IPPROTO_UDP)
+    except socket.gaierror as e:
+        print(f'⚠️ Could not resolve hostname for server: {hostname} (Exception: {e})')
+        return False
+    else:
+        return True
+
+
+def check_hls_server(uri) -> bool:
+    """Check if URL responds with status code < 500 and not 404, indicating that at least there's *something* there"""
+    try:
+        r = requests.post(uri, timeout=TIMEOUT)
+        if r.status_code >= 500 or r.status_code == 404:
+            raise Exception(f'Server responded with {r.status_code}')
+    except Exception as e:
+        print(f'⚠️ Could not connect to HLS server: {uri} (Exception: {e})')
+        return False
+    else:
+        return True
+
+
+def check_rtmp_server(uri) -> bool:
+    """Try connecting and sending a RTMP handshake (with SSL if necessary)"""
+    parsed = urlparse(uri)
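+    # netloc is either "host" or "host:port"; fall back to the RTMP default port 1935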
+    hostname, port = parsed.netloc.partition(':')[::2]
+    port = int(port) if port else 1935
+
+    try:
+        recv = b''
+        with socket.create_connection((hostname, port), timeout=TIMEOUT) as sock:
+            # RTMP handshake is \x03 + 4 bytes time (can be 0) + 4 zero bytes + 1528 bytes random
+            handshake = b'\x03\x00\x00\x00\x00\x00\x00\x00\x00' + randbytes(1528)
+            if parsed.scheme == 'rtmps':
+                with context.wrap_socket(sock, server_hostname=hostname) as ssock:
+                    ssock.sendall(handshake)
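+                    # read until the handshake reply (>= 1536 bytes) arrives or the server stops sending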
+                    while True:
+                        _tmp = ssock.recv(4096)
+                        recv += _tmp
+                        if len(recv) >= 1536 or not _tmp:
+                            break
+            else:
+                sock.sendall(handshake)
+                while True:
+                    _tmp = sock.recv(4096)
+                    recv += _tmp
+                    if len(recv) >= 1536 or not _tmp:
+                        break
+
+        if len(recv) < 1536 or recv[0] != 3:
+            raise ValueError('Invalid RTMP handshake received from server')
+    except Exception as e:
+        print(f'⚠️ Connection to server failed: {uri} (Exception: {e})')
+        return False
+    else:
+        return True
+
+
+def get_last_artifact():
+    s = requests.session()
+    s.headers['Authorization'] = f'Bearer {os.environ["GITHUB_TOKEN"]}'
+
+    run_id = os.environ['WORKFLOW_RUN_ID']
+    repo = os.environ['REPOSITORY']
+
+    # fetch run first, get workflow id from there to get workflow runs
+    r = s.get(f'https://api.github.com/repos/{repo}/actions/runs/{run_id}')
+    r.raise_for_status()
+    workflow_id = r.json()['workflow_id']
+
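+    # find the most recent successful scheduled run of this workflow on master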
+    r = s.get(
+        f'https://api.github.com/repos/{repo}/actions/workflows/{workflow_id}/runs',
+        params=dict(per_page=1, status='completed', branch='master', conclusion='success', event='schedule'),
+    )
+    r.raise_for_status()
+    runs = r.json()
+    if not runs['workflow_runs']:
+        raise ValueError('No completed workflow runs found')
+
+    r = s.get(runs['workflow_runs'][0]['artifacts_url'])
+    r.raise_for_status()
+
+    for artifact in r.json()['artifacts']:
+        if artifact['name'] == 'timestamps':
+            artifact_url = artifact['archive_download_url']
+            break
+    else:
+        raise ValueError('No previous artifact found.')
+
+    r = s.get(artifact_url)
+    r.raise_for_status()
+    zip_data = BytesIO(r.content)
+
+    with zipfile.ZipFile(zip_data) as zip_ref:
+        for info in zip_ref.infolist():
+            if info.filename == 'timestamps.json':
+                return json.loads(zip_ref.read(info.filename))
+
+    # make a missing file an explicit error instead of silently returning None
+    raise ValueError('Artifact did not contain timestamps.json')
+
+
+def main():
+    try:
+        with open(SERVICES_FILE, encoding='utf-8') as services_file:
+            services = json.load(services_file)
+        with open(PACKAGE_FILE, encoding='utf-8') as package_file:
+            package = json.load(package_file)
+    except OSError as e:
+        print(f'❌ Could not open services/package file: {e}')
+        return 1
+
+    # attempt to load last check result cache
+    try:
+        with open(CACHE_FILE, encoding='utf-8') as check_file:
+            fail_timestamps = json.load(check_file)
+    except OSError as e:
+        # cache might be evicted or not exist yet, so this is non-fatal
+        print(f'⚠️ Could not read cache file, trying to get last artifact (Exception: {e})')
+
+        try:
+            fail_timestamps = get_last_artifact()
+        except Exception as e:
+            print(f'⚠️ Could not fetch cache file, starting fresh. (Exception: {e})')
+            fail_timestamps = dict()
+        else:
+            print('Fetched cache file from last run artifact.')
+    else:
+        print('Successfully loaded cache file:', CACHE_FILE)
+
+    start_time = int(time.time())
+    removed_something = False
+
+    # create temporary new list
+    new_services = services.copy()
+    new_services['services'] = []
+
+    for service in services['services']:
+        # skip services that do custom stuff that we can't easily check
+        if service['name'] in SKIPPED_SERVICES:
+            new_services['services'].append(service)
+            continue
+
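+        # pick the connectivity check based on the service's recommended output type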
+        service_type = service.get('recommended', {}).get('output', 'rtmp_output')
+        if service_type not in {'rtmp_output', 'ffmpeg_hls_muxer', 'ftl_output'}:
+            print('Unknown service type:', service_type)
+            new_services['services'].append(service)
+            continue
+
+        # create a copy to mess with
+        new_service = service.copy()
+        new_service['servers'] = []
+
+        # run checks for all the servers, and store results in timestamp cache
+        for server in service['servers']:
+            if service_type == 'ftl_output':
+                is_ok = check_ftl_server(server['url'])
+            elif service_type == 'ffmpeg_hls_muxer':
+                is_ok = check_hls_server(server['url'])
+            else:  # rtmp
+                is_ok = check_rtmp_server(server['url'])
+
+            if not is_ok:
+                if ts := fail_timestamps.get(server['url'], None):
+                    if (delta := start_time - ts) >= MINIMUM_PURGE_AGE:
+                        print(
+                            f'🗑️ Purging server "{server["url"]}", it has been '
+                            f'unresponsive for {round(delta/60/60/24)} days.'
+                        )
+                        # continuing here means not adding it to the new list, thus dropping it
+                        continue
+                else:
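+                    # first observed failure; record when it started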
+                    fail_timestamps[server['url']] = start_time
+            elif server['url'] in fail_timestamps:
+                # remove timestamp of failed check if server is back
+                delta = start_time - fail_timestamps[server['url']]
+                print(f'💡 Server "{server["url"]}" is back after {round(delta/60/60/24)} days!')
+                del fail_timestamps[server['url']]
+
+            new_service['servers'].append(server)
+
+        if (diff := len(service['servers']) - len(new_service['servers'])) > 0:
+            print(f'ℹ️ Removed {diff} server(s) from {service["name"]}')
+            removed_something = True
+
+        # remove services with no valid servers
+        if not new_service['servers']:
+            print(f'💀 Service "{service["name"]}" has no valid servers left, removing!')
+            continue
+
+        new_services['services'].append(new_service)
+
+    # write cache file
+    try:
+        os.makedirs('other', exist_ok=True)
+        with open(CACHE_FILE, 'w', encoding='utf-8') as cache_file:
+            json.dump(fail_timestamps, cache_file)
+    except OSError as e:
+        print(f'❌ Could not write cache file: {e}')
+        return 1
+    else:
+        print('Successfully wrote cache file:', CACHE_FILE)
+
+    if removed_something:
+        # increment package version and save that as well
+        package['version'] += 1
+        package['files'][0]['version'] += 1
+
+        try:
+            with open(SERVICES_FILE, 'w', encoding='utf-8') as services_file:
+                json.dump(new_services, services_file, indent=4, ensure_ascii=False)
+                services_file.write('\n')
+
+            with open(PACKAGE_FILE, 'w', encoding='utf-8') as package_file:
+                json.dump(package, package_file, indent=4)
+                package_file.write('\n')
+        except OSError as e:
+            print(f'❌ Could not write services/package file: {e}')
+            return 1
+        else:
+            print(f'Successfully wrote services/package files:\n- {SERVICES_FILE}\n- {PACKAGE_FILE}')
+
+
+if __name__ == '__main__':
+    sys.exit(main())