Telemetry/Custom analysis with spark

This page is currently in progress.
== Introduction ==
When you access http://localhost:8888, two example Jupyter notebooks are available to peruse. To create a new Jupyter notebook, select "New" -> "Python 2".
Starting out, we recommend looking through the [https://github.com/mozilla/emr-bootstrap-spark/blob/master/examples/Telemetry%20Hello%20World.ipynb Telemetry Hello World] notebook. It gives a nice overview of Jupyter and of analyzing telemetry data using pyspark, plotly, and the RDD API.
=== Using Jupyter ===
The notebook is set up to work with Spark. See the "Using Spark" section for more information.
== Using Spark ==
Spark is a general-purpose cluster computing system: it allows users to run general execution graphs. APIs are available in Python, Scala, and Java. The Jupyter notebook uses the Python API. In a nutshell, Spark provides a way to run functional code (e.g. map, reduce, etc.) on large, distributed data.

=== SparkContext (sc) ===
Access to the Spark API is provided through SparkContext. In the Jupyter notebook, this is the `sc` object. For example, to create a distributed RDD of the monotonically increasing numbers 0-999:

 numbers = range(1000)
 # no need to initialize sc in the Jupyter notebook
 numsRdd = sc.parallelize(numbers)
 numsRdd.take(10)  # no guaranteed order

=== Spark RDD ===
The Resilient Distributed Dataset (RDD) is Spark's basic data structure. Operations performed on RDDs are distributed across the cluster; only certain actions (such as collect() or take(N)) pull an RDD's contents back to the local machine. RDDs are convenient because they impose no schema: whatever they contain is simply distributed around the cluster. RDDs can also be cached in memory, which can greatly improve the performance of algorithms that need to access the same data over and over again.

RDD operations are all part of a directed acyclic graph. This gives increased redundancy, since Spark can always recreate an RDD from the base data by rerunning the graph, and it also provides lazy evaluation: no computation is performed while an RDD is only being transformed (e.g. with map), but when an action is taken (e.g. reduce, take) the entire computation graph is evaluated. Continuing from the previous example, the following gives some of the peaks of a sine wave:

 import numpy as np
 # no computation is performed on the following line!
 sines = numsRdd.map(lambda x: np.float(x) / 10).map(lambda x: (x, np.sin(x)))
 # now the entire computation graph is evaluated
 sines.takeOrdered(5, lambda x: -x[1])

For getting started with Spark RDDs, we recommend reading the [https://spark.apache.org/docs/latest/programming-guide.html Spark Programming Guide].

=== Spark SQL and Spark DataFrames/Datasets ===
Spark also supports traditional SQL, along with special data structures that require schemas. The Spark SQL API can be accessed with the `sqlContext` object. For example:

 longitudinal = sqlContext.sql('SELECT * FROM longitudinal')

creates a DataFrame that contains all the longitudinal data. A Spark DataFrame is essentially a distributed table, akin to Pandas or R DataFrames. Under the covers, a DataFrame is an RDD of Row objects, so the entire RDD API is available for DataFrames, in addition to a DataFrame-specific API. For example, a SQL-like way to get the count of a specific OS:

 longitudinal.select("os").where("os = 'Darwin'").count()

To transform the DataFrame object to an RDD, simply do:

 longitudinal_rdd = longitudinal.rdd

In general, however, DataFrames are performance-optimized, so it's worth the effort to learn the DataFrame API. For more overview, see the [https://spark.apache.org/docs/latest/sql-programming-guide.html SQL Programming Guide]. See also the [https://github.com/mozilla/emr-bootstrap-spark/blob/master/examples/Longitudinal%20Dataset%20Tutorial.ipynb Longitudinal Tutorial], one of the example notebooks available when you start a cluster.
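As a quick illustration of the DataFrame API, the following minimal sketch counts rows per OS in the `longitudinal` DataFrame created above. It assumes the `os` column shown in the earlier example and the default "count" column name produced by groupBy().count():

 # group the longitudinal DataFrame by OS and count the rows in each group
 os_counts = longitudinal.groupBy("os").count()
 # show the ten most common values ("count" is the column produced by count())
 os_counts.orderBy("count", ascending=False).show(10)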
=== Available Data Sources for SparkSQL ===
For information about available queryable data sources (e.g. the Longitudinal dataset), see the [https://wiki.mozilla.org/Telemetry/Available_Telemetry_Datasets_and_their_Applications Telemetry Datasets Documentation]. These datasets are optimized for fast access and will far outperform analysis on the raw telemetry ping data.

== The MozTelemetry Library ==
We provide a library that gives easy access to the raw telemetry ping data. For example usage, see the [https://github.com/mozilla/emr-bootstrap-spark/blob/master/examples/Telemetry%20Hello%20World.ipynb Telemetry Hello World.ipynb] example notebook. Detailed documentation for the library can be found at the [http://python-moztelemetry.readthedocs.io Python MozTelemetry Documentation].

=== Using the Raw Ping Data ===
First, import the moztelemetry library:

 from moztelemetry import get_pings

If you need any other functions (such as `get_pings_properties`, used below), import them in the same statement, separated by commas. The ping data is an RDD of JSON elements. For example, the following:

 pings = get_pings(sc, app="Firefox", channel="nightly", build_id=("20160901000000", "20160901999999"), fraction=0.01)

returns an RDD containing 1/100th of Firefox Nightly JSON pings for all builds from September 1, 2016. Because the pings are JSON, they are easy to access. For example, to get the count of each OS type:

 os_names = pings.map(lambda x: (x['environment']['system']['os']['name'], 1))
 os_counts = os_names.reduceByKey(lambda x, y: x + y)
 os_counts.collect()

Alternatively, moztelemetry provides the `get_pings_properties` function, which will gather the data for you:

 subset = get_pings_properties(pings, ["environment/system/os/name"])
 subset.map(lambda x: (x["environment/system/os/name"], 1)).reduceByKey(lambda x, y: x + y).collect()
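Some pings may not contain every requested field. A minimal sketch, continuing from the `subset` RDD above and assuming missing fields come back as None, that filters those out and caches the result before running repeated aggregations:

 # drop pings that did not report an OS name (assumes missing fields are None)
 with_os = subset.filter(lambda p: p["environment/system/os/name"] is not None)
 # cache the filtered RDD so repeated aggregations reuse the in-memory copy
 with_os.cache()
 with_os.map(lambda p: (p["environment/system/os/name"], 1)).reduceByKey(lambda x, y: x + y).collect()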
== FAQ ==
Please add more FAQs as questions are answered by you or for you.
=== How can I load parquet datasets in a Jupyter notebook? ===
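One approach (a minimal sketch, not official documentation) is to point `sqlContext` at the Parquet files directly; the S3 path below is a placeholder for wherever the dataset actually lives:

 # placeholder path -- substitute the actual location of the Parquet dataset
 dataset = sqlContext.read.parquet("s3://example-bucket/path/to/dataset")
 dataset.printSchema()
 # optionally register it as a temporary table so it can be queried with Spark SQL
 dataset.registerTempTable("my_dataset")
 sqlContext.sql("SELECT count(*) FROM my_dataset").show()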
=== Why is my notebook hanging? ===
There are a couple of common causes for this:
 
1. Currently, our Spark notebooks can only run a single Python kernel at a time. If you open multiple notebooks on the same cluster and try to run both, the second notebook will hang. Be sure to close notebooks using "Close and Halt" under the "File" dropdown.
 
2. The connection from PySpark to the Spark driver might be lost. Unfortunately the best way to recover from this for the moment seems to be spinning up a new cluster.