{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "TSG055 - Time Curl to Sparkhead\n", "===============================\n", "\n", "Description\n", "-----------\n", "\n", "If `azdata bdc status show` fails with:\n", "\n", "> StatefulSet sparkhead is not healthy: {{Pod sparkhead-0 is not\n", "> healthy: {Container hadoop-livy-sparkhistory is unhealthy: {Found\n", "> error properties: {Property: sparkhistory.readiness, Details: \u2018Timed\n", "> out getting health status after 5000 milliseconds.\u2019}}}}}: unhealthy\n", "> Pod sparkhead-0 is not healthy: {Container hadoop-livy-sparkhistory is\n", "> unhealthy: {Found error properties: {Property: sparkhistory.readiness,\n", "> Details: \u2018Timed out getting health status after 5000\n", "> milliseconds.\u2019}}}: unhealthy spark: unhealthy\" StatefulSet sparkhead\n", "> is not healthy: {{Pod sparkhead-0 is not healthy: {Container\n", "> hadoop-livy-sparkhistory is unhealthy: {Found error properties:\n", "> {Property: sparkhistory.readiness, Details: \u2018Timed out getting health\n", "> status after 5000 milliseconds.\u2019}}}}}: unhealthy Pod sparkhead-0 is\n", "> not healthy: {Container hadoop-livy-sparkhistory is unhealthy: {Found\n", "> error properties: {Property: sparkhistory.readiness, Details: \u2018Timed\n", "> out getting health status after 5000 milliseconds.\u2019}}}: unhealthy\n", "\n", "then a useful diagnostic step is to measure the `curl` response time\n", "from the `controller` pod to the `sparkhead` pod.\n", "\n", "Steps\n", "-----\n", "\n", "### Common functions\n", "\n", "Define helper functions used in this notebook." 
] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [ "hide_input" ] }, "outputs": [], "source": [ "# Define `run` function for transient fault handling, suggestions on error, and scrolling updates on Windows\n", "import sys\n", "import os\n", "import re\n", "import json\n", "import platform\n", "import shlex\n", "import shutil\n", "import datetime\n", "\n", "from subprocess import Popen, PIPE\n", "from IPython.display import Markdown\n", "\n", "retry_hints = {} # Output in stderr known to be transient, therefore automatically retry\n", "error_hints = {} # Output in stderr where a known SOP/TSG exists which will be HINTed for further help\n", "install_hint = {} # The SOP to help install the executable if it cannot be found\n", "\n", "first_run = True\n", "rules = None\n", "debug_logging = False\n", "\n", "def run(cmd, return_output=False, no_output=False, retry_count=0):\n", "    \"\"\"Run shell command, stream stdout, print stderr and optionally return output\n", "\n", "    NOTES:\n", "\n", "    1. 
Commands that need this kind of ' quoting on Windows e.g.:\n", "\n", "        kubectl get nodes -o jsonpath={.items[?(@.metadata.annotations.pv-candidate=='data-pool')].metadata.name}\n", "\n", "    Need to actually pass in as '\"':\n", "\n", "        kubectl get nodes -o jsonpath={.items[?(@.metadata.annotations.pv-candidate=='\"'data-pool'\"')].metadata.name}\n", "\n", "    The ' quote approach, although correct when pasting into Windows cmd, will hang at the line:\n", "    \n", "        `iter(p.stdout.readline, b'')`\n", "\n", "    The shlex.split call does the right thing for each platform, just use the '\"' pattern for a '\n", "    \"\"\"\n", "    MAX_RETRIES = 5\n", "    output = \"\"\n", "    retry = False\n", "\n", "    global first_run\n", "    global rules\n", "\n", "    if first_run:\n", "        first_run = False\n", "        rules = load_rules()\n", "\n", "    # When running `azdata sql query` on Windows, replace any \\n in \"\"\" strings, with \" \", otherwise we see:\n", "    #\n", "    #    ('HY090', '[HY090] [Microsoft][ODBC Driver Manager] Invalid string or buffer length (0) (SQLExecDirectW)')\n", "    #\n", "    if platform.system() == \"Windows\" and cmd.startswith(\"azdata sql query\"):\n", "        cmd = cmd.replace(\"\\n\", \" \")\n", "\n", "    # shlex.split is required on bash and for Windows paths with spaces\n", "    #\n", "    cmd_actual = shlex.split(cmd)\n", "\n", "    # Store this (i.e. kubectl, python etc.) 
to support binary context aware error_hints and retries\n", "    #\n", "    user_provided_exe_name = cmd_actual[0].lower()\n", "\n", "    # When running python, use the python in the ADS sandbox ({sys.executable})\n", "    #\n", "    if cmd.startswith(\"python \"):\n", "        cmd_actual[0] = cmd_actual[0].replace(\"python\", sys.executable)\n", "\n", "        # On Mac, when ADS is not launched from terminal, LC_ALL may not be set, which causes pip installs to fail\n", "        # with:\n", "        #\n", "        #    UnicodeDecodeError: 'ascii' codec can't decode byte 0xc5 in position 4969: ordinal not in range(128)\n", "        #\n", "        # Setting it to a default value of \"en_US.UTF-8\" enables pip install to complete\n", "        #\n", "        if platform.system() == \"Darwin\" and \"LC_ALL\" not in os.environ:\n", "            os.environ[\"LC_ALL\"] = \"en_US.UTF-8\"\n", "\n", "    # When running `kubectl`, if AZDATA_OPENSHIFT is set, use `oc`\n", "    #\n", "    if cmd.startswith(\"kubectl \") and \"AZDATA_OPENSHIFT\" in os.environ:\n", "        cmd_actual[0] = cmd_actual[0].replace(\"kubectl\", \"oc\")\n", "\n", "    # To aid supportability, determine which binary file will actually be executed on the machine\n", "    #\n", "    which_binary = None\n", "\n", "    # Special case for CURL on Windows. The version of CURL in Windows System32 does not work to\n", "    # get JWT tokens, it returns \"(56) Failure when receiving data from the peer\". If another instance\n", "    # of CURL exists on the machine use that one. 
(Unfortunately the curl.exe in System32 is almost\n", "    # always the first curl.exe in the path, and it can't be uninstalled from System32, so here we\n", "    # look for the 2nd installation of CURL in the path)\n", "    if platform.system() == \"Windows\" and cmd.startswith(\"curl \"):\n", "        path = os.getenv('PATH')\n", "        for p in path.split(os.path.pathsep):\n", "            p = os.path.join(p, \"curl.exe\")\n", "            if os.path.exists(p) and os.access(p, os.X_OK):\n", "                if p.lower().find(\"system32\") == -1:\n", "                    cmd_actual[0] = p\n", "                    which_binary = p\n", "                    break\n", "\n", "    # Find the path based location (shutil.which) of the executable that will be run (and display it to aid supportability), this\n", "    # seems to be required for .msi installs of azdata.cmd/az.cmd. (otherwise Popen returns FileNotFound) \n", "    #\n", "    # NOTE: Bash needs cmd to be the list of the space separated values hence shlex.split.\n", "    #\n", "    if which_binary is None:\n", "        which_binary = shutil.which(cmd_actual[0])\n", "\n", "    if which_binary is None:\n", "        if user_provided_exe_name in install_hint and install_hint[user_provided_exe_name] is not None:\n", "            display(Markdown(f'HINT: Use [{install_hint[user_provided_exe_name][0]}]({install_hint[user_provided_exe_name][1]}) to resolve this issue.'))\n", "\n", "        raise FileNotFoundError(f\"Executable '{cmd_actual[0]}' not found in path (where/which)\")\n", "    else: \n", "        cmd_actual[0] = which_binary\n", "\n", "    start_time = datetime.datetime.now().replace(microsecond=0)\n", "\n", "    print(f\"START: {cmd} @ {start_time} ({datetime.datetime.utcnow().replace(microsecond=0)} UTC)\")\n", "    print(f\"       using: {which_binary} ({platform.system()} {platform.release()} on {platform.machine()})\")\n", "    print(f\"       cwd: {os.getcwd()}\")\n", "\n", "    # Command-line tools such as CURL and AZDATA HDFS commands output\n", "    # scrolling progress bars, which causes Jupyter to hang forever, to\n", "    # workaround this, use no_output=True\n", "    #\n", "\n", "    # Work around an infinite 
hang when a notebook generates a non-zero return code, break out, and do not wait\n", "    #\n", "    wait = True \n", "\n", "    try:\n", "        if no_output:\n", "            p = Popen(cmd_actual)\n", "        else:\n", "            p = Popen(cmd_actual, stdout=PIPE, stderr=PIPE, bufsize=1)\n", "            with p.stdout:\n", "                for line in iter(p.stdout.readline, b''):\n", "                    line = line.decode()\n", "                    if return_output:\n", "                        output = output + line\n", "                    else:\n", "                        if cmd.startswith(\"azdata notebook run\"): # Hyperlink the .ipynb file\n", "                            regex = re.compile(' \"(.*)\"\\: \"(.*)\"') \n", "                            match = regex.match(line)\n", "                            if match:\n", "                                if match.group(1).find(\"HTML\") != -1:\n", "                                    display(Markdown(f' - \"{match.group(1)}\": \"{match.group(2)}\"'))\n", "                                else:\n", "                                    display(Markdown(f' - \"{match.group(1)}\": \"[{match.group(2)}]({match.group(2)})\"'))\n", "\n", "                                    wait = False\n", "                                    break # otherwise infinite hang, have not worked out why yet.\n", "                        else:\n", "                            print(line, end='')\n", "                            if rules is not None:\n", "                                apply_expert_rules(line)\n", "\n", "        if wait:\n", "            p.wait()\n", "    except FileNotFoundError as e:\n", "        if install_hint is not None:\n", "            display(Markdown(f'HINT: Use {install_hint} to resolve this issue.'))\n", "\n", "        raise FileNotFoundError(f\"Executable '{cmd_actual[0]}' not found in path (where/which)\") from e\n", "\n", "    exit_code_workaround = 0 # WORKAROUND: azdata hangs on exception from notebook on p.wait()\n", "\n", "    if not no_output:\n", "        for line in iter(p.stderr.readline, b''):\n", "            try:\n", "                line_decoded = line.decode()\n", "            except UnicodeDecodeError:\n", "                # NOTE: Sometimes we get characters back that cannot be decoded(), e.g.\n", "                #\n", "                #   \\xa0\n", "                #\n", "                # For example see this in the response from `az group create`:\n", "                #\n", "                #   ERROR: Get Token request returned http error: 400 and server \n", "                #   response: {\"error\":\"invalid_grant\",# \"error_description\":\"AADSTS700082: \n", "                #   The refresh token has expired due to inactivity.\\xa0The token was \n", "                #   issued on 
2018-10-25T23:35:11.9832872Z\n", "                #\n", "                # which generates the exception:\n", "                #\n", "                #   UnicodeDecodeError: 'utf-8' codec can't decode byte 0xa0 in position 179: invalid start byte\n", "                #\n", "                print(\"WARNING: Unable to decode stderr line, printing raw bytes:\")\n", "                print(line)\n", "                line_decoded = \"\"\n", "                pass\n", "            else:\n", "\n", "                # azdata emits a single empty line to stderr when doing an hdfs cp, don't\n", "                # print this empty \"ERR:\" as it confuses.\n", "                #\n", "                if line_decoded == \"\":\n", "                    continue\n", "                \n", "                print(f\"STDERR: {line_decoded}\", end='')\n", "\n", "                if line_decoded.startswith(\"An exception has occurred\") or line_decoded.startswith(\"ERROR: An error occurred while executing the following cell\"):\n", "                    exit_code_workaround = 1\n", "\n", "                # inject HINTs to next TSG/SOP based on output in stderr\n", "                #\n", "                if user_provided_exe_name in error_hints:\n", "                    for error_hint in error_hints[user_provided_exe_name]:\n", "                        if line_decoded.find(error_hint[0]) != -1:\n", "                            display(Markdown(f'HINT: Use [{error_hint[1]}]({error_hint[2]}) to resolve this issue.'))\n", "\n", "                # apply expert rules (to run follow-on notebooks), based on output\n", "                #\n", "                if rules is not None:\n", "                    apply_expert_rules(line_decoded)\n", "\n", "                # Verify if a transient error, if so automatically retry (recursive)\n", "                #\n", "                if user_provided_exe_name in retry_hints:\n", "                    for retry_hint in retry_hints[user_provided_exe_name]:\n", "                        if line_decoded.find(retry_hint) != -1:\n", "                            if retry_count < MAX_RETRIES:\n", "                                print(f\"RETRY: {retry_count} (due to: {retry_hint})\")\n", "                                retry_count = retry_count + 1\n", "                                output = run(cmd, return_output=return_output, retry_count=retry_count)\n", "\n", "                                if return_output:\n", "                                    return output\n", "                                else:\n", "                                    return\n", "\n", "    elapsed = datetime.datetime.now().replace(microsecond=0) - start_time\n", "\n", "    # WORKAROUND: We avoid infinite hang above in the `azdata notebook run` failure case, by inferring success (from 
stdout output), so\n", "    # don't wait here, if success known above\n", "    #\n", "    if wait: \n", "        if p.returncode != 0:\n", "            raise SystemExit(f'Shell command:\\n\\n\\t{cmd} ({elapsed}s elapsed)\\n\\nreturned non-zero exit code: {str(p.returncode)}.\\n')\n", "    else:\n", "        if exit_code_workaround != 0:\n", "            raise SystemExit(f'Shell command:\\n\\n\\t{cmd} ({elapsed}s elapsed)\\n\\nreturned non-zero exit code: {str(exit_code_workaround)}.\\n')\n", "\n", "    print(f'\\nSUCCESS: {elapsed}s elapsed.\\n')\n", "\n", "    if return_output:\n", "        return output\n", "\n", "def load_json(filename):\n", "    \"\"\"Load a json file from disk and return the contents\"\"\"\n", "\n", "    with open(filename, encoding=\"utf8\") as json_file:\n", "        return json.load(json_file)\n", "\n", "def load_rules():\n", "    \"\"\"Load any 'expert rules' from the metadata of this notebook (.ipynb) that should be applied to the stderr of the running executable\"\"\"\n", "\n", "    try:\n", "\n", "        # Load this notebook as json to get access to the expert rules in the notebook metadata.\n", "        #\n", "        j = load_json(\"tsg055-time-curl-to-sparkhead.ipynb\")\n", "\n", "    except:\n", "        pass # If the user has renamed the book, we can't load ourself. NOTE: Is there a way in Jupyter, to know your own filename?\n", "\n", "    else:\n", "        if \"metadata\" in j and \\\n", "            \"azdata\" in j[\"metadata\"] and \\\n", "            \"expert\" in j[\"metadata\"][\"azdata\"] and \\\n", "            \"rules\" in j[\"metadata\"][\"azdata\"][\"expert\"]:\n", "\n", "            rules = j[\"metadata\"][\"azdata\"][\"expert\"][\"rules\"]\n", "\n", "            rules.sort() # Sort rules, so they run in priority order (the [0] element). 
Lowest value first.\n", "\n", "            # print (f\"EXPERT: There are {len(rules)} rules to evaluate.\")\n", "\n", "            return rules\n", "\n", "def apply_expert_rules(line):\n", "    \"\"\"Determine if the stderr line passed in, matches the regular expressions for any of the 'expert rules', if so\n", "    inject a 'HINT' to the follow-on SOP/TSG to run\"\"\"\n", "\n", "    global rules\n", "\n", "    for rule in rules:\n", "\n", "        # rules that have 9 elements are the injected (output) rules (the ones we want). Rules\n", "        # with only 8 elements are the source (input) rules, which are not expanded (i.e. TSG029,\n", "        # not ../repair/tsg029-nb-name.ipynb)\n", "        if len(rule) == 9:\n", "            notebook = rule[1]\n", "            cell_type = rule[2]\n", "            output_type = rule[3] # i.e. stream or error\n", "            output_type_name = rule[4] # i.e. ename or name \n", "            output_type_value = rule[5] # i.e. SystemExit or stdout\n", "            details_name = rule[6] # i.e. evalue or text \n", "            expression = rule[7].replace(\"\\\\*\", \"*\") # Something escaped *, and put a \\ in front of it!\n", "\n", "            if debug_logging:\n", "                print(f\"EXPERT: If rule '{expression}' satisfied, run '{notebook}'.\")\n", "\n", "            if re.match(expression, line, re.DOTALL):\n", "\n", "                if debug_logging:\n", "                    print(\"EXPERT: MATCH: name = value: '{0}' = '{1}' matched expression '{2}', therefore HINT '{3}'\".format(output_type_name, output_type_value, expression, notebook))\n", "\n", "                match_found = True\n", "\n", "                display(Markdown(f'HINT: Use [{notebook}]({notebook}) to resolve this issue.'))\n", "\n", "\n", "\n", "print('Common functions defined successfully.')\n", "\n", "# Hints for binary (transient fault) retry, (known) error and install guide\n", "#\n", "retry_hints = {'kubectl': ['A connection attempt failed because the connected party did not properly respond after a period of time, or established connection failed because connected host has failed to respond']}\n", "error_hints = {'kubectl': [['no such host', 'TSG010 - Get configuration 
contexts', '../monitor-k8s/tsg010-get-kubernetes-contexts.ipynb'], ['No connection could be made because the target machine actively refused it', 'TSG056 - Kubectl fails with No connection could be made because the target machine actively refused it', '../repair/tsg056-kubectl-no-connection-could-be-made.ipynb']]}\n", "install_hint = {'kubectl': ['SOP036 - Install kubectl command line interface', '../install/sop036-install-kubectl.ipynb']}" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Instantiate Kubernetes client" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [ "hide_input" ] }, "outputs": [], "source": [ "# Instantiate the Python Kubernetes client into 'api' variable\n", "\n", "import os\n", "\n", "try:\n", "    from kubernetes import client, config\n", "    from kubernetes.stream import stream\n", "\n", "    if \"KUBERNETES_SERVICE_PORT\" in os.environ and \"KUBERNETES_SERVICE_HOST\" in os.environ:\n", "        config.load_incluster_config()\n", "    else:\n", "        try:\n", "            config.load_kube_config()\n", "        except:\n", "            display(Markdown(f'HINT: Use [TSG118 - Configure Kubernetes config](../repair/tsg118-configure-kube-config.ipynb) to resolve this issue.'))\n", "            raise\n", "    api = client.CoreV1Api()\n", "\n", "    print('Kubernetes client instantiated')\n", "except ImportError:\n", "    from IPython.display import Markdown\n", "    display(Markdown(f'HINT: Use [SOP059 - Install Kubernetes Python module](../install/sop059-install-kubernetes-module.ipynb) to resolve this issue.'))\n", "    raise" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Get the namespace for the big data cluster\n", "\n", "Get the namespace of the Big Data Cluster from the Kubernetes API.\n", "\n", "**NOTE:**\n", "\n", "If there is more than one Big Data Cluster in the target Kubernetes\n", "cluster, then either:\n", "\n", "- set \\[0\\] to the correct value for the big data cluster.\n", "- set the environment variable AZDATA\\_NAMESPACE, before 
starting\n", "  Azure Data Studio." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [ "hide_input" ] }, "outputs": [], "source": [ "# Place Kubernetes namespace name for BDC into 'namespace' variable\n", "\n", "if \"AZDATA_NAMESPACE\" in os.environ:\n", "    namespace = os.environ[\"AZDATA_NAMESPACE\"]\n", "else:\n", "    try:\n", "        namespace = api.list_namespace(label_selector='MSSQL_CLUSTER').items[0].metadata.name\n", "    except IndexError:\n", "        from IPython.display import Markdown\n", "        display(Markdown(f'HINT: Use [TSG081 - Get namespaces (Kubernetes)](../monitor-k8s/tsg081-get-kubernetes-namespaces.ipynb) to resolve this issue.'))\n", "        display(Markdown(f'HINT: Use [TSG010 - Get configuration contexts](../monitor-k8s/tsg010-get-kubernetes-contexts.ipynb) to resolve this issue.'))\n", "        display(Markdown(f'HINT: Use [SOP011 - Set kubernetes configuration context](../common/sop011-set-kubernetes-context.ipynb) to resolve this issue.'))\n", "        raise\n", "\n", "print('The kubernetes namespace for your big data cluster is: ' + namespace)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Get name of the \u2018Running\u2019 `controller` `pod`" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [ "hide_input" ] }, "outputs": [], "source": [ "# Place the name of the 'Running' controller pod in variable `controller`\n", "\n", "controller = run(f'kubectl get pod --selector=app=controller -n {namespace} -o jsonpath={{.items[0].metadata.name}} --field-selector=status.phase=Running', return_output=True)\n", "\n", "print(f\"Controller pod name: {controller}\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Time `curl` in `controller` `pod` to `sparkhead`" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "run(f'kubectl exec {controller} -n {namespace} -c controller -- bash -c \"time curl --cacert 
/run/secrets/certificates/rootca/cluster-ca-certificate.crt https://sparkhead-svc:18480\"')" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "print('Notebook execution complete.')" ] } ], "nbformat": 4, "nbformat_minor": 5, "metadata": { "kernelspec": { "name": "python3", "display_name": "Python 3" }, "azdata": { "side_effects": false, "expert": { "rules": [ [ "TSG078", "code", "stream", "name", "stdout", "text", ".*StatefulSet sparkhead is not healthy.*Timed out getting health status" ] ] } } } }