SamPenrose/f5243ebca8197ea474af, created December 21, 2015 18:38
# Two approaches to converting JSON-Schema to Parquet Schema

Writing a function that generates a Parquet schema (an instance of `pyspark.sql.types.StructType`) from a JSON-Schema document is straightforward. Getting the records into an RDD that `sqlContext` can read against that schema requires an extra step. Below are two approaches. First, the conversion code:

```python
from pyspark.sql import types as T  # a namespace alias; "import *" would mask names such as StringType

JS_Type2ParquetClass = {
    "boolean": T.BooleanType,
    "float": T.DoubleType,    # not a standard JSON-Schema type, but the ESS schema uses it
    "number": T.LongType,     # XXX ? standard JSON-Schema "number" also allows fractions
    "string": T.StringType,
}


class ConversionError(ValueError):
    '''
    When is a type error not a TypeError?
    '''


def jsonschema2parquet(js_schema):
    '''
    Convert a JSON-Schema blob into the equivalent Parquet data structure.

    We test only the parts of the blob that we need to return something
    plausible.
    '''
    js_top = js_schema.get('type')
    if js_top == 'object':
        fields = _object2parquet(js_schema)
    elif js_top == 'array':
        # XXX _array2parquet() is not written yet; it would need to handle
        # items (missing/empty), minItems, maxItems, and additionalItems.
        raise ConversionError("Top-level 'array' schemas are not supported yet")
    elif js_top is None:
        raise ConversionError("JSON-Schema is missing 'type' property")
    else:
        raise ConversionError("Can't handle JSON-Schema type '%s'" % js_top)
    # Sort by name so the column order is deterministic.
    fields.sort(key=lambda field: field.name)
    return T.StructType(fields)


def _object2parquet(js_schema):
    required = js_schema.get('required', [])  # XXX read but not yet enforced
    properties = js_schema.get('properties', {})
    parquet_list = []
    for property_name, js_item in properties.items():
        js_type = js_item.get('type')
        try:
            klass = JS_Type2ParquetClass[js_type]
        except KeyError:  # unknown or missing 'type'
            raise ConversionError("Can't convert JSON item: %s" % js_item)
        # Every field is nullable for now; `not property_name in required`
        # would make the required properties non-nullable.
        nullable = True
        field = T.StructField(property_name, klass(), nullable)
        parquet_list.append(field)
    return parquet_list
```

```python
ESS = {
    "$schema": "http://json-schema.org/draft-04/schema#",
    "title": "Executive Summary",
    "description": "The executive summary derived stream",
    "type": "object",
    "properties": {
        "Hostname": {"type": "string"},
        "Timestamp": {"type": "number"},
        "Type": {"type": "string"},
        "activityTimestamp": {"type": "float"},
        "app": {"type": "string"},
        "bing": {"type": "number"},
        "buildId": {"type": "string"},
        "channel": {
            "type": "string",
            "enum": ["nightly", "aurora", "beta", "release"]
        },
        "clientId": {"type": "string"},
        "country": {"type": "string"},
        "default": {"type": "boolean"},
        "docType": {"type": "string"},
        "documentId": {"type": "string"},
        "google": {"type": "number"},
        "hours": {"type": "float"},
        "os": {"type": "string"},
        "osVersion": {"type": "string"},
        "other": {"type": "number"},
        "profileCreationTimestamp": {"type": "float"},
        "reason": {"type": "string"},
        "submissionDate": {"type": "string"},
        "vendor": {"type": "string"},
        "version": {"type": "string"},
        "yahoo": {"type": "number"}
    },
    "required": ["clientId"]
}

from collections import OrderedDict

# Copy ESS with its top-level keys in sorted order. (The field order of the
# resulting StructType comes from the sort inside jsonschema2parquet().)
keys = sorted(ESS.keys())
OESS = OrderedDict()
for k in keys:
    OESS[k] = ESS[k]
```

```python
from moztelemetry import get_records
```

```text
Unable to parse whitelist (/home/hadoop/anaconda/lib/python2.7/site-packages/moztelemetry/bucket-whitelist.json). Assuming all histograms are acceptable.
```

```python
# `sc` is the notebook's SparkContext; fraction=0.01 requests a 1% sample.
summaries = get_records(sc, 'telemetry-executive-summary', submissionDate="20151129", fraction=0.01)
hundred = summaries.take(100)
# Keep only each record's 'meta' dict.
hundred = [d['meta'] for d in hundred]
```

### The first option is to ingest stringified JSON.

Spark parses each string and matches JSON keys to schema fields by name, so key order does not matter here.

```python
import json

parquet_schema = jsonschema2parquet(ESS)
# Serialize each dict back to a JSON string and let Spark parse it against the schema.
dataframe_via_json = sqlContext.jsonRDD(sc.parallelize(hundred).map(json.dumps), parquet_schema)
```

### The second option is to ingest lists sorted into a known ordering.

`createDataFrame()` matches list elements to schema fields by position, so each list must follow the schema's field order.

```python
ordered_parquet_schema = jsonschema2parquet(OESS)

# Take the column order from the schema itself rather than from hundred[0],
# so rows stay aligned even if a record has missing or extra keys.
keys = [field.name for field in ordered_parquet_schema.fields]
aslist = []
for d in hundred:
    aslist.append([d.get(k) for k in keys])  # missing keys become None

dataframe_via_sorting = sqlContext.createDataFrame(sc.parallelize(aslist), ordered_parquet_schema)
```

Kernel: Python 2 (2.7.9)
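The conversion above marks every field nullable and maps JSON-Schema "number" to `LongType`. A minimal pure-Python sketch of the same walk, assuming a flat object schema, shows what honoring `required` and separating the standard "integer" and "number" types could look like. It returns (name, type-name, nullable) tuples instead of `StructField`s so it runs without pyspark; `schema_fields`, `JS_TYPE_TO_SPARK_NAME`, and the `enforce_required` flag are hypothetical names, not part of the notebook. Mapping "number" to a double here is a swapped-in choice, made because JSON-Schema's "number" admits fractional values.

```python
# Hypothetical sketch: mirrors the (name, type, nullable) triple of a
# StructField without needing pyspark.
JS_TYPE_TO_SPARK_NAME = {
    "boolean": "BooleanType",
    "integer": "LongType",
    "number": "DoubleType",  # standard "number" admits fractional values
    "float": "DoubleType",   # non-standard, but used by the ESS schema
    "string": "StringType",
}


class ConversionError(ValueError):
    pass


def schema_fields(js_schema, enforce_required=True):
    """Return sorted (name, spark_type_name, nullable) tuples for a flat
    JSON-Schema object."""
    if js_schema.get("type") != "object":
        raise ConversionError("expected a top-level 'object' schema")
    required = set(js_schema.get("required", []))
    fields = []
    for name, item in js_schema.get("properties", {}).items():
        try:
            type_name = JS_TYPE_TO_SPARK_NAME[item.get("type")]
        except KeyError:  # unknown or missing 'type'
            raise ConversionError("Can't convert JSON item: %s" % item)
        # Only properties listed in "required" become non-nullable.
        nullable = not (enforce_required and name in required)
        fields.append((name, type_name, nullable))
    fields.sort()  # names are unique, so this sorts by name
    return fields


demo = {
    "type": "object",
    "properties": {
        "clientId": {"type": "string"},
        "hours": {"type": "float"},
        "default": {"type": "boolean"},
    },
    "required": ["clientId"],
}
print(schema_fields(demo))
# [('clientId', 'StringType', False), ('default', 'BooleanType', True), ('hours', 'DoubleType', True)]
```

Passing `enforce_required=False` reproduces the notebook's current behavior of making every field nullable.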
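The second option depends on every list having its values in the schema's field order. A minimal pure-Python sketch of that step, using a hypothetical helper `dicts_to_rows()` and made-up records, shows why the order should come from the schema rather than from one sample record. In the notebook, `field_names` would come from `[f.name for f in ordered_parquet_schema.fields]`.

```python
def dicts_to_rows(records, field_names):
    """Turn dicts into positional lists in the order given by field_names.

    Keys absent from a record become None; keys not in field_names are
    dropped, so every row lines up with the schema's columns.
    """
    return [[record.get(name) for name in field_names] for record in records]


# Field names in schema order (sorted, as jsonschema2parquet() sorts them).
field_names = ["app", "clientId", "hours"]
records = [
    {"clientId": "a", "app": "Firefox", "hours": 1.5},
    {"clientId": "b", "hours": 0.25, "unexpected": 42},  # no "app", extra key
]
print(dicts_to_rows(records, field_names))
# [['Firefox', 'a', 1.5], [None, 'b', 0.25]]

# Deriving the order from a single record instead misaligns the columns
# whenever that record has missing or extra keys:
print(dicts_to_rows(records, sorted(records[1].keys())))
# [['a', 1.5, None], ['b', 0.25, 42]]
```

In the second result the first column holds `clientId` values where the schema expects `app`, which `createDataFrame()` would either reject or silently mislabel.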