Created
April 13, 2016 19:35
-
-
Save SamPenrose/bd588ba5ccb7419b371fd95d3bffd339 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{ | |
"cells": [ | |
{ | |
"cell_type": "code", | |
"execution_count": 1, | |
"metadata": { | |
"collapsed": true | |
}, | |
"outputs": [], | |
"source": [ | |
"BUCKET = 'net-mozaws-prod-us-west-2-pipeline-analysis'\n", | |
"path = 's3n://' + BUCKET + '/spenrose/' + 'longitudinal-thousandth-20151115-to-20160410/'\n", | |
"\n", | |
"import ujson as json\n", | |
"\n", | |
"def load(tup):\n", | |
"    \"\"\"Deserialize the JSON payload of a (key, json_text) pair.\"\"\"\n", | |
"    key, raw = tup\n", | |
"    return (key, json.loads(raw))\n", | |
"\n", | |
"def prep(tup):\n", | |
"    \"\"\"Inverse of load(): serialize the value back out as JSON text.\"\"\"\n", | |
"    key, value = tup\n", | |
"    return (key, json.dumps(list(value)))" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 2, | |
"metadata": { | |
"collapsed": false | |
}, | |
"outputs": [ | |
{ | |
"name": "stdout", | |
"output_type": "stream", | |
"text": [ | |
"256\n", | |
"14500\n", | |
"CPU times: user 24 ms, sys: 12 ms, total: 36 ms\n", | |
"Wall time: 4min 8s\n" | |
] | |
} | |
], | |
"source": [ | |
"%%time\n", | |
"# Open the S3 sequence files and compare cluster parallelism to the\n", | |
"# partition count the input produced.\n", | |
"rdd = sc.sequenceFile(path)\n", | |
"print sc.defaultParallelism\n", | |
"print rdd.getNumPartitions()" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 3, | |
"metadata": { | |
"collapsed": false | |
}, | |
"outputs": [ | |
{ | |
"name": "stdout", | |
"output_type": "stream", | |
"text": [ | |
"count: 2865930\n", | |
"CPU times: user 912 ms, sys: 440 ms, total: 1.35 s\n", | |
"Wall time: 10min 25s\n" | |
] | |
} | |
], | |
"source": [ | |
"%%time\n", | |
"# Deserialize and count every record (count() forces a full scan).\n", | |
"loaded = rdd.map(load)\n", | |
"profile_count = loaded.count()\n", | |
"print \"count:\", profile_count" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 4, | |
"metadata": { | |
"collapsed": false | |
}, | |
"outputs": [ | |
{ | |
"name": "stdout", | |
"output_type": "stream", | |
"text": [ | |
"CPU times: user 0 ns, sys: 0 ns, total: 0 ns\n", | |
"Wall time: 4.41 ms\n" | |
] | |
} | |
], | |
"source": [ | |
"# Aim for ~10 partitions per available core (Databricks advice).\n", | |
"partition_target = sc.defaultParallelism * 10\n", | |
"%time gathered = rdd.coalesce(partition_target)" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 5, | |
"metadata": { | |
"collapsed": false | |
}, | |
"outputs": [ | |
{ | |
"name": "stdout", | |
"output_type": "stream", | |
"text": [ | |
"2865930\n", | |
"CPU times: user 196 ms, sys: 96 ms, total: 292 ms\n", | |
"Wall time: 10min 12s\n" | |
] | |
} | |
], | |
"source": [ | |
"%%time\n", | |
"# Repeat the parse-and-count scan on the coalesced RDD for comparison.\n", | |
"loaded2 = gathered.map(load)\n", | |
"profile_count = loaded2.count()\n", | |
"print profile_count" | |
] | |
}, | |
{ | |
"cell_type": "markdown", | |
"metadata": {}, | |
"source": [ | |
"Let's test performance on a big search map." | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 24, | |
"metadata": { | |
"collapsed": false | |
}, | |
"outputs": [], | |
"source": [ | |
"def extract_one_field(d, path, missing=None):\n", | |
"    \"\"\"Follow `path` (a list of keys) into nested dict `d`.\n", | |
"\n", | |
"    Returns `missing` when a falsy value is hit before the path is\n", | |
"    exhausted; the final lookup's value is returned as-is (even if falsy).\n", | |
"    \"\"\"\n", | |
"    remaining = list(path)\n", | |
"    target = d\n", | |
"    while remaining:\n", | |
"        target = target.get(remaining.pop(0))\n", | |
"        if not target:\n", | |
"            if remaining:\n", | |
"                target = missing\n", | |
"            break\n", | |
"    return target\n", | |
"\n", | |
"field2path = {\n", | |
" \"clientId\": [\"clientId\"],\n", | |
" \"defaultSearchEngineDataName\": [\"environment\", \"settings\", \"defaultSearchEngineData\", \"name\"],\n", | |
" \"distributionId\": [\"environment\", \"partner\", \"distributionId\"],\n", | |
" \"subsessionStartDate\": [\"payload\", \"info\", \"subsessionStartDate\"],\n", | |
" \"geoCountry\": [\"meta\", \"geoCountry\"],\n", | |
" \"SEARCH_COUNTS\": [\"payload\", \"keyedHistograms\", \"SEARCH_COUNTS\"]\n", | |
"}\n", | |
"\n", | |
"def make_ping_extractor():\n", | |
"    \"\"\"\n", | |
"    Match the pattern we are planning for the dedicated search stream,\n", | |
"    in which the first pass does minimal processing.\n", | |
"\n", | |
"    Returns (extractor, counts): extractor maps one raw ping dict to a\n", | |
"    flat dict of the fields named in field2path; counts holds the ok/bad\n", | |
"    Spark accumulators the extractor updates.\n", | |
"    \"\"\"\n", | |
"    counts = {\n", | |
"        'ok': sc.accumulator(0),\n", | |
"        'bad': sc.accumulator(0)\n", | |
"    }\n", | |
"    def extractor(d):\n", | |
"        # Leftover per-record debug print removed: it flooded executor\n", | |
"        # stdout on multi-million record runs.\n", | |
"        result = {}\n", | |
"        if d.get('type') != 'main':\n", | |
"            return result\n", | |
"        try:\n", | |
"            for field, path in field2path.items():\n", | |
"                result[field] = extract_one_field(d, path)\n", | |
"        except Exception:\n", | |
"            counts['bad'].add(1)\n", | |
"            return {}\n", | |
"        counts['ok'].add(1)\n", | |
"        return result\n", | |
"    return extractor, counts\n", | |
"\n", | |
"def make_ping_list_extractor():\n", | |
"    \"\"\"Like make_ping_extractor(), but the extractor takes a list of pings\n", | |
"    and returns the per-ping extraction results as a list.\"\"\"\n", | |
"    extractor, counts = make_ping_extractor()\n", | |
"    def list_extractor(l):\n", | |
"        # Debug print/flush (and the inner `import sys` it needed) removed:\n", | |
"        # per-partition stdout noise at scale.\n", | |
"        return map(extractor, l)\n", | |
"    return list_extractor, counts\n", | |
"\n", | |
"def _loadPartnerIds():\n", | |
"    \"\"\"Map distributionId -> partnerId from the local CSV of partner ids.\n", | |
"\n", | |
"    Fields are stripped so the trailing newline on each CSV row does not\n", | |
"    end up inside the distributionId keys; un-stripped keys could never\n", | |
"    match the newline-free ids looked up by official_partner_for_search().\n", | |
"    \"\"\"\n", | |
"    with open('partner_distrib_ids.csv') as f:\n", | |
"        rows = [[field.strip() for field in line.split(',')] for line in f]\n", | |
"    results = {}\n", | |
"    for is_current, partnerId, distributionId in rows:\n", | |
"        # NOTE(review): is_current is a CSV string, so '0'/'false' are still\n", | |
"        # truthy here -- confirm the file encodes currency as empty/non-empty.\n", | |
"        if is_current:\n", | |
"            results[distributionId] = partnerId\n", | |
"    return results\n", | |
"distributionId_to_partnerId = _loadPartnerIds()\n", | |
"\n", | |
"def official_partner_for_search(d):\n", | |
"    '''\n", | |
"    Return the official source for the distribution from which this search\n", | |
"    took place.\n", | |
"\n", | |
"    We do not appear to have an established mechanism for mapping Geo to\n", | |
"    partner.\n", | |
"    '''\n", | |
"    partner = d.get('environment', {}).get('partner', {})\n", | |
"    distributionId = partner.get('distributionId', 'MISSING')\n", | |
"    treat_as_mozilla = (distributionId.startswith('mozilla')\n", | |
"                        or distributionId in ('MISSING', 'euballot', ''))\n", | |
"    if treat_as_mozilla:\n", | |
"        return 'mozilla'\n", | |
"    return distributionId_to_partnerId.get(distributionId)\n", | |
"\n", | |
"from collections import defaultdict\n", | |
"import re\n", | |
"# A usable search source identifier looks like:\n", | |
"# <ws><provider-name><ws>.<search-access-point>\n", | |
"# where:\n", | |
"# <ws> is optional whitespace\n", | |
"# <provider-name> is any string of characters\n", | |
"# . is a literal \".\"\n", | |
"# <search-access-point> is a non-empty lower-case ascii string\n", | |
"# such as \"urlbar\", identifying a widget.\n", | |
"RE_VALID_SEARCH_SOURCE = \"^\\\\s*(\\\\S.*?)\\\\s*\\\\.([a-z]+)$\"\n", | |
"DEFAULT_KEY = 'default_search_engine'\n", | |
"# XXX return these from a data loading function\n", | |
"START = '2015-11-15'\n", | |
"END = '2016-01-29'\n", | |
"\n", | |
"def most_common_provider(searches_for_one_day):\n", | |
"    '''\n", | |
"    {('yahoo', 'urlbar'): 3,\n", | |
"     ('yahoo', 'awesomebar'): 2,\n", | |
"     ('google', 'urlbar'): 4} -> 'yahoo'\n", | |
"    '''\n", | |
"    totals = defaultdict(int)\n", | |
"    for key, count in searches_for_one_day.items():\n", | |
"        try:\n", | |
"            provider, _point = key\n", | |
"        except ValueError:  # not a (provider, point) count key\n", | |
"            continue\n", | |
"        totals[provider] += count\n", | |
"    if not totals:\n", | |
"        totals = {None: 0}\n", | |
"    # Highest total wins; ties fall to the lexically larger provider,\n", | |
"    # matching sort-then-take-last.\n", | |
"    ranked = sorted((n, p) for (p, n) in totals.items())\n", | |
"    return ranked[-1][1]\n", | |
"\n", | |
"def extract_default(d, container, previous_default):\n", | |
"    \"\"\"Pick the default engine: the ping's own value, else the previously\n", | |
"    seen one, else the most common provider recorded so far that day.\"\"\"\n", | |
"    default = d['defaultSearchEngineDataName']\n", | |
"    if default is not None:\n", | |
"        return default\n", | |
"    if previous_default is not None:\n", | |
"        return previous_default\n", | |
"    return most_common_provider(container)\n", | |
"\n", | |
"def decompose_source(search_identifier):\n", | |
"    '''\n", | |
"    Split a search source identifier into (provider, access_point).\n", | |
"    Both are None when the identifier does not match\n", | |
"    RE_VALID_SEARCH_SOURCE.\n", | |
"    '''\n", | |
"    match = re.match(RE_VALID_SEARCH_SOURCE, search_identifier)\n", | |
"    if match:\n", | |
"        return match.group(1, 2)\n", | |
"    return None, None\n", | |
"\n", | |
"\n", | |
"def extract_searches(d, container):\n", | |
"    \"\"\"Accumulate SEARCH_COUNTS histogram sums into container, keyed by\n", | |
"    (provider, point). container must be a defaultdict(int) instance.\"\"\"\n", | |
"    for identifier, histogram in d['SEARCH_COUNTS'].items():\n", | |
"        provider, point = decompose_source(identifier)\n", | |
"        count = int(histogram['sum'])\n", | |
"        if not count:\n", | |
"            continue\n", | |
"        container[(provider, point)] += count\n", | |
"\n", | |
"def extract_search_info_by_date(l):\n", | |
"    '''\n", | |
"    Coalesce the output of list_walker() by date.\n", | |
"\n", | |
"    NOTE(review): relies on a module-level `counts` accumulator dict that\n", | |
"    no visible cell binds -- it must come from make_ping_extractor()'s\n", | |
"    return value; confirm it is defined before this runs.\n", | |
"    '''\n", | |
"    results = {}\n", | |
"    previous_default = None\n", | |
"    last_date = None\n", | |
"    # Process pings in subsession-date order so previous_default carries\n", | |
"    # forward chronologically.\n", | |
"    try:\n", | |
"        l.sort(key=lambda ping: ping.get('subsessionStartDate', 'MISSING')[:10])\n", | |
"    except Exception:\n", | |
"        counts['bad'].add(len(l))\n", | |
"        return results\n", | |
"    for ping in l:\n", | |
"        try:\n", | |
"            when = ping['subsessionStartDate'][:10]\n", | |
"            if when < START or when > END:\n", | |
"                continue\n", | |
"            last_date = when\n", | |
"            dayblob = results.setdefault(when, defaultdict(int))\n", | |
"            # Note: we also assign non-ints below.\n", | |
"            extract_searches(ping, dayblob)\n", | |
"            default = extract_default(ping, dayblob, previous_default)\n", | |
"            previous_default = default\n", | |
"            dayblob[DEFAULT_KEY] = default\n", | |
"\n", | |
"            dayblob['official_partner'] = official_partner_for_search(ping)\n", | |
"            dayblob['geoCountry'] = ping['geoCountry']\n", | |
"            counts['ok'].add(1)\n", | |
"        except Exception:\n", | |
"            counts['bad'].add(1)\n", | |
"\n", | |
"    # Freeze each day's defaultdict into a plain dict for serialization.\n", | |
"    for date in results:\n", | |
"        results[date] = dict(results[date])\n", | |
"    return results" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 10, | |
"metadata": { | |
"collapsed": true | |
}, | |
"outputs": [], | |
"source": [ | |
"import copy\n", | |
"def bydate2flat(date_dict):\n", | |
"    '''\n", | |
"    Convert the output of extract_search_info_by_date() to a non-empty\n", | |
"    list of rows whose items are 8-tuples matching COLUMNS:\n", | |
"\n", | |
"    (date, default_engine, geo_country, official_partner,\n", | |
"     provider, point, count, is_additional)\n", | |
"\n", | |
"    A day with no searches yields a single (..., None, None, 0, False)\n", | |
"    environment-only row; otherwise one row per (provider, point) pair,\n", | |
"    with is_additional False only on the first pair.\n", | |
"    '''\n", | |
"    rows = []\n", | |
"    for date, day in date_dict.items():\n", | |
"        day = copy.deepcopy(day)  # because we mutate it below\n", | |
"        base = [date,\n", | |
"                day.pop('default_search_engine'),\n", | |
"                day.pop('geoCountry'),\n", | |
"                day.pop('official_partner')]\n", | |
"        if not day:  # no searches, just environment information\n", | |
"            rows.append(tuple(base + [None, None, 0, False]))\n", | |
"            continue\n", | |
"        first = True  # first (provider, point) key is not \"additional\"\n", | |
"        for pair, count in day.items():\n", | |
"            provider, point = pair\n", | |
"            rows.append(tuple(base + [provider, point, count, not first]))\n", | |
"            first = False\n", | |
"    rows.sort()\n", | |
"    return rows" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 28, | |
"metadata": { | |
"collapsed": false | |
}, | |
"outputs": [ | |
{ | |
"name": "stdout", | |
"output_type": "stream", | |
"text": [ | |
"rows: 7724293\n", | |
"{'bad': Accumulator<id=9, value=416123>, 'ok': Accumulator<id=8, value=56084705>}\n", | |
"CPU times: user 280 ms, sys: 72 ms, total: 352 ms\n", | |
"Wall time: 10min 12s\n" | |
] | |
} | |
], | |
"source": [ | |
"%%time\n", | |
"# Bind the list extractor and its accumulators; without this the names\n", | |
"# `extractor` and `counts` below are undefined on a fresh kernel\n", | |
"# (hidden state from a since-deleted cell). The list variant is the one\n", | |
"# whose signature matches extractor(l) on a list of pings.\n", | |
"extractor, counts = make_ping_list_extractor()\n", | |
"rowish = loaded2.map(lambda (k, l): extractor(l)).map(extract_search_info_by_date)\n", | |
"rows = rowish.map(bydate2flat).flatMap(lambda l: l)\n", | |
"print \"rows:\", rows.count()\n", | |
"print counts" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 29, | |
"metadata": { | |
"collapsed": true | |
}, | |
"outputs": [], | |
"source": [ | |
"# Grab a small driver-side sample for local inspection.\n", | |
"ten = loaded2.take(10)" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 41, | |
"metadata": { | |
"collapsed": false | |
}, | |
"outputs": [ | |
{ | |
"name": "stdout", | |
"output_type": "stream", | |
"text": [ | |
"[17, 18, 19, 20]\n", | |
"[49, 50, 51, 52, 53, 54, 55]\n", | |
"[499, 500, 501, 502, 503, 504, 507, 509, 510, 511, 512, 513, 514, 515, 516, 517, 521, 522, 523, 525, 526, 529, 530, 531, 532, 533, 538, 539, 540, 544, 551, 555, 557, 558, 1, 7, 8, 9, 10, 11, 13, 14, 16, 17, 24, 28, 29, 31, 32, 33, 34, 35, 38, 39]\n", | |
"[60, 61, 62, 63, 65, 66, 68]\n", | |
"[646, 648, 651, 652, 656, 659, 660, 661, 663, 664, 665, 666, 670, 673, 674, 675, 676, 677, 678, 679, 680, 681, 684, 685, 687, 688, 689, 690, 692, 693, 694, 695, 696, 697, 698, 699, 700, 701, 702, 1, 2, 3, 5, 6, 7, 8, 9, 10, 16, 18, 19, 20, 21, 22, 24, 25]\n", | |
"[83, 85, 89, 90, 92, 93, 94, 95]\n", | |
"[344, 347, 348, 349, 350, 351, 353, 358, 359, 360, 361, 363, 364, 366, 368, 369, 370, 371, 373, 374, 375, 376, 379, 380, 383, 385]\n", | |
"[136, 138, 139, 140, 141, 143, 146, 147, 149, 153, 154]\n", | |
"[144, 145, 148, 149, 152, 154, 157, 161, 162, 164]\n", | |
"[269, 270, 271, 272, 273, 274, 275, 276, 277, 279, 280, 281, 282, 283, 284, 285, 286]\n" | |
] | |
} | |
], | |
"source": [ | |
"# Spot-check subsession counters for each sampled client's ping list.\n", | |
"for k, l in ten:\n", | |
"    print [extract_one_field(d, [\"payload\", \"info\", \"profileSubsessionCounter\"]) for d in l]" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": null, | |
"metadata": { | |
"collapsed": true | |
}, | |
"outputs": [], | |
"source": [] | |
} | |
], | |
"metadata": { | |
"kernelspec": { | |
"display_name": "Python 2", | |
"language": "python", | |
"name": "python2" | |
}, | |
"language_info": { | |
"codemirror_mode": { | |
"name": "ipython", | |
"version": 2 | |
}, | |
"file_extension": ".py", | |
"mimetype": "text/x-python", | |
"name": "python", | |
"nbconvert_exporter": "python", | |
"pygments_lexer": "ipython2", | |
"version": "2.7.11" | |
} | |
}, | |
"nbformat": 4, | |
"nbformat_minor": 0 | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment