{"cells":[{"cell_type":"markdown","execution_count":null,"metadata":{"tags":[]},"outputs":[],"source":["SAM001a - Query Storage Pool from SQL Server Master Pool (1 of 3) - Load sample data\n","====================================================================================\n","\n","Description\n","-----------\n","\n","In this 3 part tutorial, load data into the Storage Pool (HDFS) using\n","`azdata`, convert it into Parquet (using Spark) and the in the 3rd part,\n","query the data using the Master Pool (SQL Server)\n","\n","### Common functions\n","\n","Define helper functions used in this notebook."]},{"cell_type":"code","execution_count":null,"metadata":{"tags":["hide_input"]},"outputs":[],"source":["# Define `run` function for transient fault handling, suggestions on error, and scrolling updates on Windows\n","import sys\n","import os\n","import re\n","import platform\n","import shlex\n","import shutil\n","import datetime\n","\n","from subprocess import Popen, PIPE\n","from IPython.display import Markdown\n","\n","retry_hints = {} # Output in stderr known to be transient, therefore automatically retry\n","error_hints = {} # Output in stderr where a known SOP/TSG exists which will be HINTed for further help\n","install_hint = {} # The SOP to help install the executable if it cannot be found\n","\n","def run(cmd, return_output=False, no_output=False, retry_count=0, base64_decode=False, return_as_json=False, regex_mask=None):\n"," \"\"\"Run shell command, stream stdout, print stderr and optionally return output\n","\n"," NOTES:\n","\n"," 1. Commands that need this kind of ' quoting on Windows e.g.:\n","\n"," kubectl get nodes -o jsonpath={.items[?(@.metadata.annotations.pv-candidate=='data-pool')].metadata.name}\n","\n"," Need to actually pass in as '\"':\n","\n"," kubectl get nodes -o jsonpath={.items[?(@.metadata.annotations.pv-candidate=='\"'data-pool'\"')].metadata.name}\n","\n"," The ' quote approach, although correct when pasting into Windows cmd, will hang at the line:\n"," \n"," `iter(p.stdout.readline, b'')`\n","\n"," The shlex.split call does the right thing for each platform, just use the '\"' pattern for a '\n"," \"\"\"\n"," MAX_RETRIES = 5\n"," output = \"\"\n"," retry = False\n","\n"," # When running `azdata sql query` on Windows, replace any \\n in \"\"\" strings, with \" \", otherwise we see:\n"," #\n"," # ('HY090', '[HY090] [Microsoft][ODBC Driver Manager] Invalid string or buffer length (0) (SQLExecDirectW)')\n"," #\n"," if platform.system() == \"Windows\" and cmd.startswith(\"azdata sql query\"):\n"," cmd = cmd.replace(\"\\n\", \" \")\n","\n"," # shlex.split is required on bash and for Windows paths with spaces\n"," #\n"," cmd_actual = shlex.split(cmd)\n","\n"," # Store this (i.e. kubectl, python etc.) to support binary context aware error_hints and retries\n"," #\n"," user_provided_exe_name = cmd_actual[0].lower()\n","\n"," # When running python, use the python in the ADS sandbox ({sys.executable})\n"," #\n"," if cmd.startswith(\"python \"):\n"," cmd_actual[0] = cmd_actual[0].replace(\"python\", sys.executable)\n","\n"," # On Mac, when ADS is not launched from terminal, LC_ALL may not be set, which causes pip installs to fail\n"," # with:\n"," #\n"," # UnicodeDecodeError: 'ascii' codec can't decode byte 0xc5 in position 4969: ordinal not in range(128)\n"," #\n"," # Setting it to a default value of \"en_US.UTF-8\" enables pip install to complete\n"," #\n"," if platform.system() == \"Darwin\" and \"LC_ALL\" not in os.environ:\n"," os.environ[\"LC_ALL\"] = \"en_US.UTF-8\"\n","\n"," # When running `kubectl`, if AZDATA_OPENSHIFT is set, use `oc`\n"," #\n"," if cmd.startswith(\"kubectl \") and \"AZDATA_OPENSHIFT\" in os.environ:\n"," cmd_actual[0] = cmd_actual[0].replace(\"kubectl\", \"oc\")\n","\n"," # To aid supportability, determine which binary file will actually be executed on the machine\n"," #\n"," which_binary = None\n","\n"," # Special case for CURL on Windows. The version of CURL in Windows System32 does not work to\n"," # get JWT tokens, it returns \"(56) Failure when receiving data from the peer\". If another instance\n"," # of CURL exists on the machine use that one. (Unfortunately the curl.exe in System32 is almost\n"," # always the first curl.exe in the path, and it can't be uninstalled from System32, so here we\n"," # look for the 2nd installation of CURL in the path)\n"," if platform.system() == \"Windows\" and cmd.startswith(\"curl \"):\n"," path = os.getenv('PATH')\n"," for p in path.split(os.path.pathsep):\n"," p = os.path.join(p, \"curl.exe\")\n"," if os.path.exists(p) and os.access(p, os.X_OK):\n"," if p.lower().find(\"system32\") == -1:\n"," cmd_actual[0] = p\n"," which_binary = p\n"," break\n","\n"," # Find the path based location (shutil.which) of the executable that will be run (and display it to aid supportability), this\n"," # seems to be required for .msi installs of azdata.cmd/az.cmd. (otherwise Popen returns FileNotFound) \n"," #\n"," # NOTE: Bash needs cmd to be the list of the space separated values hence shlex.split.\n"," #\n"," if which_binary == None:\n"," which_binary = shutil.which(cmd_actual[0])\n","\n"," # Display an install HINT, so the user can click on a SOP to install the missing binary\n"," #\n"," if which_binary == None:\n"," print(f\"The path used to search for '{cmd_actual[0]}' was:\")\n"," print(sys.path)\n","\n"," if user_provided_exe_name in install_hint and install_hint[user_provided_exe_name] is not None:\n"," display(Markdown(f'HINT: Use [{install_hint[user_provided_exe_name][0]}]({install_hint[user_provided_exe_name][1]}) to resolve this issue.'))\n","\n"," raise FileNotFoundError(f\"Executable '{cmd_actual[0]}' not found in path (where/which)\")\n"," else: \n"," cmd_actual[0] = which_binary\n","\n"," start_time = datetime.datetime.now().replace(microsecond=0)\n","\n"," cmd_display = cmd\n"," if regex_mask is not None:\n"," regex = re.compile(regex_mask)\n"," cmd_display = re.sub(regex, '******', cmd)\n"," \n"," print(f\"START: {cmd_display} @ {start_time} ({datetime.datetime.utcnow().replace(microsecond=0)} UTC)\")\n"," print(f\" using: {which_binary} ({platform.system()} {platform.release()} on {platform.machine()})\")\n"," print(f\" cwd: {os.getcwd()}\")\n","\n"," # Command-line tools such as CURL and AZDATA HDFS commands output\n"," # scrolling progress bars, which causes Jupyter to hang forever, to\n"," # workaround this, use no_output=True\n"," #\n","\n"," # Work around a infinite hang when a notebook generates a non-zero return code, break out, and do not wait\n"," #\n"," wait = True \n","\n"," try:\n"," if no_output:\n"," p = Popen(cmd_actual)\n"," else:\n"," p = Popen(cmd_actual, stdout=PIPE, stderr=PIPE, bufsize=1)\n"," with p.stdout:\n"," for line in iter(p.stdout.readline, b''):\n"," line = line.decode()\n"," if return_output:\n"," output = output + line\n"," else:\n"," if cmd.startswith(\"azdata notebook run\"): # Hyperlink the .ipynb file\n"," regex = re.compile(' \"(.*)\"\\: \"(.*)\"') \n"," match = regex.match(line)\n"," if match:\n"," if match.group(1).find(\"HTML\") != -1:\n"," display(Markdown(f' - \"{match.group(1)}\": \"{match.group(2)}\"'))\n"," else:\n"," display(Markdown(f' - \"{match.group(1)}\": \"[{match.group(2)}]({match.group(2)})\"'))\n","\n"," wait = False\n"," break # otherwise infinite hang, have not worked out why yet.\n"," else:\n"," print(line, end='')\n","\n"," if wait:\n"," p.wait()\n"," except FileNotFoundError as e:\n"," if install_hint is not None:\n"," display(Markdown(f'HINT: Use {install_hint} to resolve this issue.'))\n","\n"," raise FileNotFoundError(f\"Executable '{cmd_actual[0]}' not found in path (where/which)\") from e\n","\n"," exit_code_workaround = 0 # WORKAROUND: azdata hangs on exception from notebook on p.wait()\n","\n"," if not no_output:\n"," for line in iter(p.stderr.readline, b''):\n"," try:\n"," line_decoded = line.decode()\n"," except UnicodeDecodeError:\n"," # NOTE: Sometimes we get characters back that cannot be decoded(), e.g.\n"," #\n"," # \\xa0\n"," #\n"," # For example see this in the response from `az group create`:\n"," #\n"," # ERROR: Get Token request returned http error: 400 and server \n"," # response: {\"error\":\"invalid_grant\",# \"error_description\":\"AADSTS700082: \n"," # The refresh token has expired due to inactivity.\\xa0The token was \n"," # issued on 2018-10-25T23:35:11.9832872Z\n"," #\n"," # which generates the exception:\n"," #\n"," # UnicodeDecodeError: 'utf-8' codec can't decode byte 0xa0 in position 179: invalid start byte\n"," #\n"," print(\"WARNING: Unable to decode stderr line, printing raw bytes:\")\n"," print(line)\n"," line_decoded = \"\"\n"," pass\n"," else:\n","\n"," # azdata emits a single empty line to stderr when doing an hdfs cp, don't\n"," # print this empty \"ERR:\" as it confuses.\n"," #\n"," if line_decoded == \"\":\n"," continue\n"," \n"," print(f\"STDERR: {line_decoded}\", end='')\n","\n"," if line_decoded.startswith(\"An exception has occurred\") or line_decoded.startswith(\"ERROR: An error occurred while executing the following cell\"):\n"," exit_code_workaround = 1\n","\n"," # inject HINTs to next TSG/SOP based on output in stderr\n"," #\n"," if user_provided_exe_name in error_hints:\n"," for error_hint in error_hints[user_provided_exe_name]:\n"," if line_decoded.find(error_hint[0]) != -1:\n"," display(Markdown(f'HINT: Use [{error_hint[1]}]({error_hint[2]}) to resolve this issue.'))\n","\n"," # Verify if a transient error, if so automatically retry (recursive)\n"," #\n"," if user_provided_exe_name in retry_hints:\n"," for retry_hint in retry_hints[user_provided_exe_name]:\n"," if line_decoded.find(retry_hint) != -1:\n"," if retry_count \u003c MAX_RETRIES:\n"," print(f\"RETRY: {retry_count} (due to: {retry_hint})\")\n"," retry_count = retry_count + 1\n"," output = run(cmd, return_output=return_output, retry_count=retry_count)\n","\n"," if return_output:\n"," if base64_decode:\n"," import base64\n"," return base64.b64decode(output).decode('utf-8')\n"," else:\n"," return output\n","\n"," elapsed = datetime.datetime.now().replace(microsecond=0) - start_time\n","\n"," # WORKAROUND: We avoid infinite hang above in the `azdata notebook run` failure case, by inferring success (from stdout output), so\n"," # don't wait here, if success known above\n"," #\n"," if wait: \n"," if p.returncode != 0:\n"," raise SystemExit(f'Shell command:\\n\\n\\t{cmd_display} ({elapsed}s elapsed)\\n\\nreturned non-zero exit code: {str(p.returncode)}.\\n')\n"," else:\n"," if exit_code_workaround !=0 :\n"," raise SystemExit(f'Shell command:\\n\\n\\t{cmd_display} ({elapsed}s elapsed)\\n\\nreturned non-zero exit code: {str(exit_code_workaround)}.\\n')\n","\n"," print(f'\\nSUCCESS: {elapsed}s elapsed.\\n')\n","\n"," if return_output:\n"," if base64_decode:\n"," import base64\n"," return base64.b64decode(output).decode('utf-8')\n"," else:\n"," return output\n","\n","\n","\n","# Hints for tool retry (on transient fault), known errors and install guide\n","#\n","retry_hints = {'kubectl': ['A connection attempt failed because the connected party did not properly respond after a period of time, or established connection failed because connected host has failed to respond', ], 'python': [ ], 'azdata': ['Endpoint sql-server-master does not exist', 'Endpoint livy does not exist', 'Failed to get state for cluster', 'Endpoint webhdfs does not exist', 'Adaptive Server is unavailable or does not exist', 'Error: Address already in use', 'Login timeout expired (0) (SQLDriverConnect)', 'SSPI Provider: No Kerberos credentials available', ], }\n","error_hints = {'kubectl': [['no such host', 'TSG010 - Get configuration contexts', '../monitor-k8s/tsg010-get-kubernetes-contexts.ipynb'], ['No connection could be made because the target machine actively refused it', 'TSG056 - Kubectl fails with No connection could be made because the target machine actively refused it', '../repair/tsg056-kubectl-no-connection-could-be-made.ipynb'], ], 'python': [['Library not loaded: /usr/local/opt/unixodbc', 'SOP012 - Install unixodbc for Mac', '../install/sop012-brew-install-odbc-for-sql-server.ipynb'], ['WARNING: You are using pip version', 'SOP040 - Upgrade pip in ADS Python sandbox', '../install/sop040-upgrade-pip.ipynb'], ], 'azdata': [['Please run \\'azdata login\\' to first authenticate', 'SOP028 - azdata login', '../common/sop028-azdata-login.ipynb'], ['The token is expired', 'SOP028 - azdata login', '../common/sop028-azdata-login.ipynb'], ['Reason: Unauthorized', 'SOP028 - azdata login', '../common/sop028-azdata-login.ipynb'], ['Max retries exceeded with url: /api/v1/bdc/endpoints', 'SOP028 - azdata login', '../common/sop028-azdata-login.ipynb'], ['Look at the controller logs for more details', 'TSG027 - Observe cluster deployment', '../diagnose/tsg027-observe-bdc-create.ipynb'], ['provided port is already allocated', 'TSG062 - Get tail of all previous container logs for pods in BDC namespace', '../log-files/tsg062-tail-bdc-previous-container-logs.ipynb'], ['Create cluster failed since the existing namespace', 'SOP061 - Delete a big data cluster', '../install/sop061-delete-bdc.ipynb'], ['Failed to complete kube config setup', 'TSG067 - Failed to complete kube config setup', '../repair/tsg067-failed-to-complete-kube-config-setup.ipynb'], ['Data source name not found and no default driver specified', 'SOP069 - Install ODBC for SQL Server', '../install/sop069-install-odbc-driver-for-sql-server.ipynb'], ['Can\\'t open lib \\'ODBC Driver 17 for SQL Server', 'SOP069 - Install ODBC for SQL Server', '../install/sop069-install-odbc-driver-for-sql-server.ipynb'], ['Control plane upgrade failed. Failed to upgrade controller.', 'TSG108 - View the controller upgrade config map', '../diagnose/tsg108-controller-failed-to-upgrade.ipynb'], ['NameError: name \\'azdata_login_secret_name\\' is not defined', 'SOP013 - Create secret for azdata login (inside cluster)', '../common/sop013-create-secret-for-azdata-login.ipynb'], ['ERROR: No credentials were supplied, or the credentials were unavailable or inaccessible.', 'TSG124 - \\'No credentials were supplied\\' error from azdata login', '../repair/tsg124-no-credentials-were-supplied.ipynb'], ['Please accept the license terms to use this product through', 'TSG126 - azdata fails with \\'accept the license terms to use this product\\'', '../repair/tsg126-accept-license-terms.ipynb'], ], }\n","install_hint = {'kubectl': [ 'SOP036 - Install kubectl command line interface', '../install/sop036-install-kubectl.ipynb' ], 'azdata': [ 'SOP063 - Install azdata CLI (using package manager)', '../install/sop063-packman-install-azdata.ipynb' ], }\n","\n","\n","print('Common functions defined successfully.')"]},{"cell_type":"markdown","execution_count":null,"metadata":{"tags":[]},"outputs":[],"source":["### Get the Kubernetes namespace for the big data cluster\n","\n","Get the namespace of the Big Data Cluster use the kubectl command line\n","interface .\n","\n","**NOTE:**\n","\n","If there is more than one Big Data Cluster in the target Kubernetes\n","cluster, then either:\n","\n","- set \\[0\\] to the correct value for the big data cluster.\n","- set the environment variable AZDATA\\_NAMESPACE, before starting\n"," Azure Data Studio."]},{"cell_type":"code","execution_count":null,"metadata":{"tags":["hide_input"]},"outputs":[],"source":["# Place Kubernetes namespace name for BDC into 'namespace' variable\n","\n","if \"AZDATA_NAMESPACE\" in os.environ:\n"," namespace = os.environ[\"AZDATA_NAMESPACE\"]\n","else:\n"," try:\n"," namespace = run(f'kubectl get namespace --selector=MSSQL_CLUSTER -o jsonpath={{.items[0].metadata.name}}', return_output=True)\n"," except:\n"," from IPython.display import Markdown\n"," print(f\"ERROR: Unable to find a Kubernetes namespace with label 'MSSQL_CLUSTER'. SQL Server Big Data Cluster Kubernetes namespaces contain the label 'MSSQL_CLUSTER'.\")\n"," display(Markdown(f'HINT: Use [TSG081 - Get namespaces (Kubernetes)](../monitor-k8s/tsg081-get-kubernetes-namespaces.ipynb) to resolve this issue.'))\n"," display(Markdown(f'HINT: Use [TSG010 - Get configuration contexts](../monitor-k8s/tsg010-get-kubernetes-contexts.ipynb) to resolve this issue.'))\n"," display(Markdown(f'HINT: Use [SOP011 - Set kubernetes configuration context](../common/sop011-set-kubernetes-context.ipynb) to resolve this issue.'))\n"," raise\n","\n","print(f'The SQL Server Big Data Cluster Kubernetes namespace is: {namespace}')"]},{"cell_type":"markdown","execution_count":null,"metadata":{"tags":[]},"outputs":[],"source":["### Create a temporary directory to stage files"]},{"cell_type":"code","execution_count":null,"metadata":{"tags":["hide_input"]},"outputs":[],"source":["# Create a temporary directory to hold configuration files\n","\n","import tempfile\n","\n","temp_dir = tempfile.mkdtemp()\n","\n","print(f\"Temporary directory created: {temp_dir}\")"]},{"cell_type":"markdown","execution_count":null,"metadata":{"tags":[]},"outputs":[],"source":["### Helper function to save configuration files to disk"]},{"cell_type":"code","execution_count":null,"metadata":{"tags":["hide_input"]},"outputs":[],"source":["# Define helper function 'save_file' to save configuration files to the temporary directory created above\n","import os\n","import io\n","\n","def save_file(filename, contents):\n"," with io.open(os.path.join(temp_dir, filename), \"w\", encoding='utf8', newline='\\n') as text_file:\n"," text_file.write(contents)\n","\n"," print(\"File saved: \" + os.path.join(temp_dir, filename))\n","\n","print(\"Function `save_file` defined successfully.\")"]},{"cell_type":"markdown","execution_count":null,"metadata":{"tags":[]},"outputs":[],"source":["### Get the controller username and password\n","\n","Get the controller username and password from the Kubernetes Secret\n","Store and place in the required AZDATA\\_USERNAME and AZDATA\\_PASSWORD\n","environment variables."]},{"cell_type":"code","execution_count":null,"metadata":{"tags":["hide_input"]},"outputs":[],"source":["# Place controller secret in AZDATA_USERNAME/AZDATA_PASSWORD environment variables\n","\n","import os, base64\n","\n","os.environ[\"AZDATA_USERNAME\"] = run(f'kubectl get secret/controller-login-secret -n {namespace} -o jsonpath={{.data.username}}', return_output=True, base64_decode=True)\n","os.environ[\"AZDATA_PASSWORD\"] = run(f'kubectl get secret/controller-login-secret -n {namespace} -o jsonpath={{.data.password}}', return_output=True, base64_decode=True)\n","\n","print(f\"Controller username '{os.environ['AZDATA_USERNAME']}' and password stored in environment variables\")"]},{"cell_type":"markdown","execution_count":null,"metadata":{"tags":[]},"outputs":[],"source":["### Steps\n","\n","Upload this data into HDFS."]},{"cell_type":"code","execution_count":null,"metadata":{"tags":[]},"outputs":[],"source":["import os\n","\n","items = [\n"," [1, \"Eldon Base for stackable storage shelf platinum\", \"Muhammed MacIntyre\", 3, -213.25, 38.94, 35, \"Nunavut\", \"Storage \u0026 Organization\", 0.8],\n"," [2, \"1.7 Cubic Foot Compact \"\"Cube\"\" Office Refrigerators\", \"Barry French\", 293, 457.81, 208.16, 68.02, \"Nunavut\", \"Appliances\", 0.58], \n"," [3, \"Cardinal Slant-D Ring Binder Heavy Gauge Vinyl\", \"Barry French\", 293,46.71, 8.69, 2.99, \"Nunavut\", \"Binders and Binder Accessories\", 0.39],\n"," [4, \"R380\", \"Clay Rozendal\", 483, 1198.97, 195.99, 3.99, \"Nunavut\", \"Telephones and Communication\", 0.58],\n"," [5, \"Holmes HEPA Air Purifier\", \"Carlos Soltero\", 515, 30.94, 21.78, 5.94, \"Nunavut\", \"Appliances\", 0.5],\n"," [6, \"G.E. Longer-Life Indoor Recessed Floodlight Bulbs\", \"Carlos Soltero\", 515, 4.43, 6.64, 4.95, \"Nunavut\", \"Office Furnishings\", 0.37],\n"," [7, \"Angle-D Binders with Locking Rings Label Holders\", \"Carl Jackson\", 613, -54.04, 7.3, 7.72, \"Nunavut\", \"Binders and Binder Accessories\", 0.38],\n"," [8, \"SAFCO Mobile Desk Side File Wire Frame\", \"Carl Jackson\", 613, 127.7, 42.76, 6.22, \"Nunavut\", \"Storage \u0026 Organization\", ],\n"," [9, \"SAFCO Commercial Wire Shelving Black\", \"Monica Federle\", 643, -695.26, 138.14, 35, \"Nunavut\", \"Storage \u0026 Organization\", ],\n"," [10, \"Xerox 198\", \"Dorothy Badders\", 678, -226.36, 4.98, 8.33, \"Nunavut\", \"Paper\", 0.38]\n","]\n","\n","src = os.path.join(temp_dir, \"items.csv\")\n","dest = \"/tmp/clickstream_data/datasampleCS.csv\"\n","\n","s = \"\"\n","for item in items:\n"," s = s + str(item)[1:-1] + \"\\n\"\n","\n","save_file(src, s)\n","\n","run(f'azdata bdc hdfs rm --path {dest}')\n","\n","src = src.replace(\"\\\\\", \"\\\\\\\\\")\n","\n","run(f'azdata bdc hdfs rm --path hdfs:{dest}')\n","run(f'azdata bdc hdfs cp --from-path {src} --to-path hdfs:{dest}')\n","\n","print (f\"CSV uploaded to HDFS: {dest}\")"]},{"cell_type":"markdown","execution_count":null,"metadata":{"tags":[]},"outputs":[],"source":["### Clean up temporary directory for staging configuration files"]},{"cell_type":"code","execution_count":null,"metadata":{"tags":["hide_input"]},"outputs":[],"source":["# Delete the temporary directory used to hold configuration files\n","\n","import shutil\n","\n","shutil.rmtree(temp_dir)\n","\n","print(f'Temporary directory deleted: {temp_dir}')"]},{"cell_type":"code","execution_count":null,"metadata":{"tags":[]},"outputs":[],"source":["print(\"Notebook execution is complete.\")"]}],"nbformat":4,"nbformat_minor":5,"metadata":{"kernelspec":{"name":"python3","display_name":"Python 3"},"pansop":{"related":"","test":{"strategy":"","types":null,"disable":{"reason":"","workitems":null,"types":null}},"target":{"current":"","final":""},"internal":{"parameters":null,"symlink":false},"timeout":"0"},"language_info":{"codemirror_mode":"{ Name: \"\", Version: \"\"}","file_extension":"","mimetype":"","name":"","nbconvert_exporter":"","pygments_lexer":"","version":""},"widgets":[]}}