@SamPenrose
Created December 21, 2015 18:38
{"nbformat_minor": 0, "cells": [{"source": "# Two approaches to converting JSON-Schema to Parquet Schema", "cell_type": "markdown", "metadata": {}}, {"source": "Writing a function that will generate a Parquet schema (an instance of\npyspark.sql.types.StructType) from a JSON-Schema document is straightforward.\nGenerating an RDD for sqlContext to read into a Parquet schema requires an\nextra step. Below are two approaches. First, simple conversion code:", "cell_type": "markdown", "metadata": {}}, {"execution_count": 1, "cell_type": "code", "source": "from pyspark.sql import types as T # * masks e.g. types.StringType\nJS_Type2ParquetClass = {\n    \"boolean\": T.BooleanType,\n    \"float\": T.DoubleType,\n    \"number\": T.LongType, # XXX ?\n    \"string\": T.StringType,\n}\n\n\nclass ConversionError(ValueError):\n    '''\n    When is a type error not a TypeError?\n    '''\n\n\ndef jsonschema2parquet(js_schema):\n    '''\n    Convert a JSONSchema blob into the equivalent Parquet data structure.\n\n    We test only the parts of the blob that we need to return something\n    plausible.\n    '''\n    js_top = js_schema.get('type')\n    if js_top == 'object':\n        fields = _object2parquet(js_schema)\n    elif js_top == 'array':\n        # XXX minItems, maxItems, additionalItems\n        # XXX test for missing/empty items and raise ConversionError\n        fields = _array2parquet(js_schema['items'])\n    elif js_top is None:\n        raise ConversionError(\"JSON-Schema is missing 'type' property\")\n    else:\n        raise ConversionError(\"Can't handle JSON-Schema type '%s'\" % js_top)\n    fields.sort(key=lambda field: field.name)\n    parquet_schema = T.StructType(fields)\n    return parquet_schema\n\n\ndef _object2parquet(js_schema):\n    required = js_schema.get('required', [])[:] # XXX test\n    properties = js_schema.get('properties', {})\n    parquet_list = []\n    for property_name, js_item in properties.items():\n        js_type = js_item['type']\n        try:\n            klass = JS_Type2ParquetClass[js_type]\n        except KeyError:\n            raise ConversionError(\"Can't convert JSON item: %s\" % js_item)\n        nullable = True # not property_name in required\n        field = T.StructField(property_name, klass(), nullable)\n        parquet_list.append(field)\n    return parquet_list", "outputs": [], "metadata": {"collapsed": true, "trusted": true}}, {"execution_count": 2, "cell_type": "code", "source": "ESS = {\n    \"$schema\": \"http://json-schema.org/draft-04/schema#\",\n    \"title\": \"Executive Summary\",\n    \"description\": \"The executive summary derived stream\",\n    \"type\": \"object\",\n    \"properties\": {\n        \"Hostname\": {\n            \"type\": \"string\"\n        },\n        \"Timestamp\": {\n            \"type\": \"number\"\n        },\n        \"Type\": {\n            \"type\": \"string\"\n        },\n        \"activityTimestamp\": {\n            \"type\": \"float\"\n        },\n        \"app\": {\n            \"type\": \"string\"\n        },\n        \"bing\": {\n            \"type\": \"number\"\n        },\n        \"buildId\": {\n            \"type\": \"string\"\n        },\n        \"channel\": {\n            \"type\": \"string\",\n            \"enum\": [\"nightly\", \"aurora\", \"beta\", \"release\"]\n        },\n        \"clientId\": {\n            \"type\": \"string\"\n        },\n        \"country\": {\n            \"type\": \"string\"\n        },\n        \"default\": {\n            \"type\": \"boolean\"\n        },\n        \"docType\": {\n            \"type\": \"string\"\n        },\n        \"documentId\": {\n            \"type\": \"string\"\n        },\n        \"google\": {\n            \"type\": \"number\"\n        },\n        \"hours\": {\n            \"type\": \"float\"\n        },\n        \"os\": {\n            \"type\": \"string\"\n        },\n        \"osVersion\": {\n            \"type\": \"string\"\n        },\n        \"other\": {\n            \"type\": \"number\"\n        },\n        \"profileCreationTimestamp\": {\n            \"type\": \"float\"\n        },\n        \"reason\": {\n            \"type\": \"string\"\n        },\n        \"submissionDate\": {\n            \"type\": \"string\"\n        },\n        \"vendor\": {\n            \"type\": \"string\"\n        },\n        \"version\": {\n            \"type\": \"string\"\n        },\n        \"yahoo\": {\n            \"type\": \"number\"\n        }\n    },\n    \"required\": [\"clientId\"]\n}\nfrom collections import OrderedDict\nkeys = sorted(ESS.keys())\nOESS = OrderedDict()\nfor k in keys:\n    OESS[k] = ESS[k]", "outputs": [], "metadata": {"collapsed": true, "trusted": true}}, {"execution_count": 3, "cell_type": "code", "source": "from moztelemetry import get_records", "outputs": 
[{"output_type": "stream", "name": "stdout", "text": "Unable to parse whitelist (/home/hadoop/anaconda/lib/python2.7/site-packages/moztelemetry/bucket-whitelist.json). Assuming all histograms are acceptable.\n"}], "metadata": {"collapsed": false, "trusted": true}}, {"execution_count": 4, "cell_type": "code", "source": "summaries = get_records(sc, 'telemetry-executive-summary', submissionDate=\"20151129\", fraction=0.01)\nhundred = summaries.take(100)\nhundred = [d['meta'] for d in hundred]", "outputs": [], "metadata": {"collapsed": true, "trusted": true}}, {"source": "### The first option is to ingest stringified JSON.", "cell_type": "markdown", "metadata": {}}, {"execution_count": 6, "cell_type": "code", "source": "import json\n\nparquet_schema = jsonschema2parquet(ESS)\ndataframe_via_json = sqlContext.jsonRDD(sc.parallelize(hundred).map(lambda d: json.dumps(d)), parquet_schema)", "outputs": [], "metadata": {"collapsed": false, "trusted": true}}, {"source": "### The second option is to ingest lists sorted into a known ordering", "cell_type": "markdown", "metadata": {}}, {"execution_count": 7, "cell_type": "code", "source": "ordered_parquet_schema = jsonschema2parquet(OESS)\n\naslist = []\n# Take the column names from the schema, not from the first record, so that\n# each row lines up with the name-sorted fields of ordered_parquet_schema.\nkeys = sorted(OESS['properties'].keys())\nfor d in hundred:\n    l = []\n    for k in keys:\n        l.append(d.get(k, None))\n    aslist.append(l)\n\ndataframe_via_sorting = sqlContext.createDataFrame(sc.parallelize(aslist), ordered_parquet_schema)", "outputs": [], "metadata": {"collapsed": true, "trusted": true}}], "nbformat": 4, "metadata": {"kernelspec": {"display_name": "Python 2", "name": "python2", "language": "python"}, "language_info": {"mimetype": "text/x-python", "nbconvert_exporter": "python", "version": "2.7.9", "name": "python", "file_extension": ".py", "pygments_lexer": "ipython2", "codemirror_mode": {"version": 2, "name": "ipython"}}}}
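
The second approach in the notebook above only works if every row list uses the same column order as the generated schema, whose fields `jsonschema2parquet` sorts by name. The following is a minimal sketch of that alignment step in plain Python, with no Spark dependency. The helper `rows_in_schema_order` and the sample data are hypothetical, introduced here for illustration only; they are not part of the notebook.

```python
def rows_in_schema_order(records, properties):
    """Turn dicts into lists whose columns follow the sorted property names.

    `properties` is the "properties" mapping of a JSON-Schema object.
    Keys missing from a record become None; keys that are not in the
    schema are dropped.
    """
    # Same ordering rule the schema conversion applies to its fields.
    field_names = sorted(properties)
    return [[record.get(name) for name in field_names] for record in records]


# Hypothetical sample data, for illustration only.
sample_properties = {"os": {"type": "string"}, "clientId": {"type": "string"}}
sample_records = [
    {"os": "Linux", "clientId": "abc", "extra": 1},
    {"clientId": "def"},
]
rows = rows_in_schema_order(sample_records, sample_properties)
print(rows)
```

Deriving the column order from the schema rather than from the first record means a record with missing or extra keys cannot shift values into the wrong column.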