{
"cells": [
{
"cell_type": "code",
"execution_count": 1,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"BUCKET = 'net-mozaws-prod-us-west-2-pipeline-analysis'\n",
"path = 's3n://' + BUCKET + '/spenrose/' + 'longitudinal-thousandth-20151115-to-20160410/'\n",
"\n",
"import ujson as json\n",
"\n",
"def load(tup):\n",
" js = json.loads(tup[1])\n",
" return (tup[0], js)\n",
"\n",
"def prep(tup):\n",
" js = json.dumps(list(tup[1]))\n",
" return (tup[0], js)"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"256\n",
"14500\n",
"CPU times: user 24 ms, sys: 12 ms, total: 36 ms\n",
"Wall time: 4min 8s\n"
]
}
],
"source": [
"%%time\n",
"rdd = sc.sequenceFile(path)\n",
"print sc.defaultParallelism\n",
"print rdd.getNumPartitions()"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"count: 2865930\n",
"CPU times: user 912 ms, sys: 440 ms, total: 1.35 s\n",
"Wall time: 10min 25s\n"
]
}
],
"source": [
"%%time\n",
"loaded = rdd.map(load)\n",
"profile_count = loaded.count()\n",
"print \"count:\", profile_count"
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"CPU times: user 0 ns, sys: 0 ns, total: 0 ns\n",
"Wall time: 4.41 ms\n"
]
}
],
"source": [
"partition_target = sc.defaultParallelism * 10 # Databricks advice\n",
"%time gathered = rdd.coalesce(partition_target)"
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"2865930\n",
"CPU times: user 196 ms, sys: 96 ms, total: 292 ms\n",
"Wall time: 10min 12s\n"
]
}
],
"source": [
"%%time\n",
"loaded2 = gathered.map(load)\n",
"profile_count = loaded2.count()\n",
"print profile_count"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Let's test performance on a big search map."
]
},
{
"cell_type": "code",
"execution_count": 24,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"def extract_one_field(d, path, missing=None):\n",
" path = path[:]\n",
" target = d\n",
" while path:\n",
" key = path.pop(0)\n",
" target = target.get(key)\n",
" if not target:\n",
" if path:\n",
" target = missing\n",
" break\n",
" return target\n",
"\n",
"field2path = {\n",
" \"clientId\": [\"clientId\"],\n",
" \"defaultSearchEngineDataName\": [\"environment\", \"settings\", \"defaultSearchEngineData\", \"name\"],\n",
" \"distributionId\": [\"environment\", \"partner\", \"distributionId\"],\n",
" \"subsessionStartDate\": [\"payload\", \"info\", \"subsessionStartDate\"],\n",
" \"geoCountry\": [\"meta\", \"geoCountry\"],\n",
" \"SEARCH_COUNTS\": [\"payload\", \"keyedHistograms\", \"SEARCH_COUNTS\"]\n",
"}\n",
"\n",
"def make_ping_extractor():\n",
" \"\"\"\n",
" Match the pattern we are planning for the dedicated search stream,\n",
" in which the first pass does minimal processing.\n",
" \"\"\"\n",
" counts = {\n",
" 'ok': sc.accumulator(0),\n",
" 'bad': sc.accumulator(0)\n",
" }\n",
" def extractor(d):\n",
" print \"***\", type(d)\n",
" result = {}\n",
" if d.get('type') != 'main':\n",
" return result\n",
" try:\n",
" for field, path in field2path.items():\n",
" result[field] = extract_one_field(d, path)\n",
" except Exception:\n",
" counts['bad'].add(1)\n",
" return {}\n",
" counts['ok'].add(1)\n",
" return result\n",
" return extractor, counts\n",
"\n",
"def make_ping_list_extractor():\n",
" extractor, counts = make_ping_extractor()\n",
" def list_extractor(l):\n",
" import sys\n",
" print '-->', type(l)\n",
" sys.stdout.flush()\n",
" return map(extractor, l)\n",
" return list_extractor, counts\n",
"\n",
"def _loadPartnerIds():\n",
" with open('partner_distrib_ids.csv') as f:\n",
" rows = [s.split(',') for s in list(f)]\n",
" results = {}\n",
" for is_current, partnerId, distributionId in rows:\n",
" # NOTE(review): is_current is a raw CSV field (a string), so any\n",
" # non-empty value -- including '0' or 'False' -- is truthy. Confirm\n",
" # the source file encodes non-current rows as an empty field.\n",
" if is_current:\n",
" results[distributionId] = partnerId\n",
" return results\n",
"distributionId_to_partnerId = _loadPartnerIds()\n",
"\n",
"def official_partner_for_search(d):\n",
" '''\n",
" Return the official source for the distribution from which this search\n",
" took place.\n",
"\n",
" We do not appear to have an established mechanism for mapping Geo to\n",
" partner.\n",
" '''\n",
" distributionId = d.get('environment', {}).get('partner', {}).get(\n",
" 'distributionId', 'MISSING')\n",
" if distributionId.startswith('mozilla') or \\\n",
" distributionId in ('MISSING', 'euballot', ''):\n",
" return 'mozilla'\n",
" return distributionId_to_partnerId.get(distributionId)\n",
"\n",
"from collections import defaultdict\n",
"import re\n",
"# A usable search source identifier looks like:\n",
"# <ws><provider-name><ws>.<search-access-point>\n",
"# where:\n",
"# <ws> is optional whitespace\n",
"# <provider-name> is any string of characters\n",
"# . is a literal \".\"\n",
"# <search-access-point> is a non-empty lower-case ascii string\n",
"# such as \"urlbar\", identifying a widget.\n",
"RE_VALID_SEARCH_SOURCE = \"^\\\\s*(\\\\S.*?)\\\\s*\\\\.([a-z]+)$\"\n",
"DEFAULT_KEY = 'default_search_engine'\n",
"# XXX return these from a data loading function\n",
"START = '2015-11-15'\n",
"END = '2016-01-29'\n",
"\n",
"def most_common_provider(searches_for_one_day):\n",
" '''\n",
" {('yahoo', 'urlbar'): 3,\n",
" ('yahoo', 'awesomebar'): 2,\n",
" ('google', 'urlbar'): 4} -> 'yahoo'\n",
" '''\n",
" totals = defaultdict(int)\n",
" for pair in searches_for_one_day.items():\n",
" tup, count = pair\n",
" try:\n",
" provider, _ = tup\n",
" except ValueError: # not a count key\n",
" continue\n",
" totals[provider] += count\n",
" totals = totals if totals else {None: 0}\n",
" by_total = [(total, provider) for (provider, total) in totals.items()]\n",
" by_total.sort()\n",
" return by_total[-1][1]\n",
"\n",
"def extract_default(d, container, previous_default):\n",
" default = d['defaultSearchEngineDataName']\n",
" if default is None:\n",
" if previous_default is None:\n",
" default = most_common_provider(container)\n",
" else:\n",
" default = previous_default\n",
" return default\n",
"\n",
"def decompose_source(search_identifier):\n",
" '''\n",
" Split a search identifier of the form '<provider>.<point>' into a\n",
" (provider, point) pair; return (None, None) when the identifier\n",
" does not match RE_VALID_SEARCH_SOURCE.\n",
" '''\n",
" provider, point = None, None\n",
" valid = re.match(RE_VALID_SEARCH_SOURCE, search_identifier)\n",
" if valid:\n",
" provider, point = valid.group(1, 2)\n",
" return provider, point\n",
"\n",
"\n",
"def extract_searches(d, container):\n",
" # container must be a defaultdict(int) instance\n",
" for identifier in d['SEARCH_COUNTS']:\n",
" provider, point = decompose_source(identifier)\n",
" count = int(d['SEARCH_COUNTS'][identifier]['sum'])\n",
" if not count:\n",
" continue\n",
" container[(provider, point)] += count\n",
"\n",
"def extract_search_info_by_date(l):\n",
" '''\n",
" Coalesce the output of list_walker() by date.\n",
" '''\n",
" results = {}\n",
" previous_default = None\n",
" last_date = None\n",
" try:\n",
" l.sort(key=lambda d: d.get('subsessionStartDate', 'MISSING')[:10])\n",
" except Exception:\n",
" counts['bad'].add(len(l))\n",
" return results\n",
" for d in l:\n",
" try:\n",
" when = d['subsessionStartDate'][:10]\n",
" if when < START or when > END:\n",
" continue\n",
" last_date = when\n",
" dayblob = results.get(when, defaultdict(int))\n",
" # Note: we also assign non-ints below.\n",
" results[when] = dayblob\n",
" \n",
" extract_searches(d, dayblob)\n",
" default = extract_default(d, dayblob, previous_default)\n",
" previous_default = default\n",
" dayblob[DEFAULT_KEY] = default\n",
"\n",
" dayblob['official_partner'] = official_partner_for_search(d)\n",
" dayblob['geoCountry'] = d['geoCountry']\n",
" counts['ok'].add(1)\n",
" except Exception:\n",
" counts['bad'].add(1)\n",
"\n",
" for date in results:\n",
" results[date] = dict(results[date])\n",
" return results"
]
},
{
"cell_type": "code",
"execution_count": 10,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"import copy\n",
"def bydate2flat(date_dict):\n",
" '''\n",
" Convert the output of extract_search_info_by_date() to a non-empty\n",
" list of rows whose items are 8-tuples matching COLUMNS.\n",
" \n",
" for (date, d) in date_dict.items():\n",
" If d has 3 keys, append a row of the form:\n",
" ('<date>', '<default>', '<geo_country>', '<official_partner>', None, None, 0, False)\n",
" If d has a 4th key ('<provider>', '<point>') whose value is the positive integer <count>,\n",
" append a row of the form:\n",
" ('<date>', '<default>', '<geo_country>', '<official_partner>', '<provider>', '<point>', count, False)\n",
" For each additional key (which will have the same form as the previous) append:\n",
" ('<date>', '<default>', '<geo_country>', '<official_partner>', '<provider>', '<point>', count, True)\n",
" '''\n",
" results = []\n",
" for date, d in date_dict.items():\n",
" d = copy.deepcopy(d) # because we mutate it\n",
" row_template = [date, d['default_search_engine'], d['geoCountry'], d['official_partner']]\n",
" del d['default_search_engine']\n",
" del d['geoCountry']\n",
" del d['official_partner']\n",
" row = row_template[:]\n",
" if not d: # No searches, just environment information\n",
" row.extend([None, None, 0, False])\n",
" results.append(tuple(row))\n",
" continue\n",
" is_additional = False # First (provider, point) key\n",
" for pair in d:\n",
" provider, point = pair\n",
" count = d[pair]\n",
" row = row_template[:]\n",
" row.extend([provider, point, count, is_additional])\n",
" results.append(tuple(row))\n",
" is_additional = True # Any further search pairs\n",
" results.sort()\n",
" return results"
]
},
{
"cell_type": "code",
"execution_count": 28,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"rows: 7724293\n",
"{'bad': Accumulator<id=9, value=416123>, 'ok': Accumulator<id=8, value=56084705>}\n",
"CPU times: user 280 ms, sys: 72 ms, total: 352 ms\n",
"Wall time: 10min 12s\n"
]
}
],
"source": [
"%%time\n",
"rowish = loaded2.map(lambda (k, l): extractor(l)).map(extract_search_info_by_date)\n",
"rows = rowish.map(bydate2flat).flatMap(lambda l: l)\n",
"print \"rows:\", rows.count()\n",
"print counts"
]
},
{
"cell_type": "code",
"execution_count": 29,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"ten = loaded2.take(10)"
]
},
{
"cell_type": "code",
"execution_count": 41,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"[17, 18, 19, 20]\n",
"[49, 50, 51, 52, 53, 54, 55]\n",
"[499, 500, 501, 502, 503, 504, 507, 509, 510, 511, 512, 513, 514, 515, 516, 517, 521, 522, 523, 525, 526, 529, 530, 531, 532, 533, 538, 539, 540, 544, 551, 555, 557, 558, 1, 7, 8, 9, 10, 11, 13, 14, 16, 17, 24, 28, 29, 31, 32, 33, 34, 35, 38, 39]\n",
"[60, 61, 62, 63, 65, 66, 68]\n",
"[646, 648, 651, 652, 656, 659, 660, 661, 663, 664, 665, 666, 670, 673, 674, 675, 676, 677, 678, 679, 680, 681, 684, 685, 687, 688, 689, 690, 692, 693, 694, 695, 696, 697, 698, 699, 700, 701, 702, 1, 2, 3, 5, 6, 7, 8, 9, 10, 16, 18, 19, 20, 21, 22, 24, 25]\n",
"[83, 85, 89, 90, 92, 93, 94, 95]\n",
"[344, 347, 348, 349, 350, 351, 353, 358, 359, 360, 361, 363, 364, 366, 368, 369, 370, 371, 373, 374, 375, 376, 379, 380, 383, 385]\n",
"[136, 138, 139, 140, 141, 143, 146, 147, 149, 153, 154]\n",
"[144, 145, 148, 149, 152, 154, 157, 161, 162, 164]\n",
"[269, 270, 271, 272, 273, 274, 275, 276, 277, 279, 280, 281, 282, 283, 284, 285, 286]\n"
]
}
],
"source": [
"for tup in ten:\n",
" k, l = tup\n",
" print [extract_one_field(d, [\"payload\", \"info\", \"profileSubsessionCounter\"]) for d in l]"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 2",
"language": "python",
"name": "python2"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 2
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython2",
"version": "2.7.11"
}
},
"nbformat": 4,
"nbformat_minor": 0
}