2 SQL DataFrames - Python

Spark Logo + SF Open Data Logo

Mount the data:

ACCESS_KEY_ID = "AKIAJBRYNXGHORDHZB4A"
SECRET_ACCESS_KEY = "a0BzE1bSegfydr3%2FGE3LSPM6uIV5A4hOUfpH8aFF"

mounts_list = [
  {'bucket': 'databricks-corp-training/sf_open_data/', 'mount_folder': '/mnt/sf_open_data'}
]

for mount_point in mounts_list:
  bucket = mount_point['bucket']
  mount_folder = mount_point['mount_folder']
  try:
    dbutils.fs.ls(mount_folder)       # Raises if mount_folder does not exist yet
    dbutils.fs.unmount(mount_folder)  # Unmount any stale mount first
  except Exception:
    pass                              # mount_folder was not mounted; nothing to clean up
  finally:                            # Always (re)mount the bucket
    dbutils.fs.mount("s3a://" + ACCESS_KEY_ID + ":" + SECRET_ACCESS_KEY + "@" + bucket, mount_folder)

Exploring the City of San Francisco public data with Apache Spark 2.0

On 4th of July SF residents enjoyed a fireworks show:

Fireworks

How did the 4th of July holiday affect demand for firefighters in SF districts a year ago?

The SF OpenData project was launched in 2009 and contains hundreds of datasets from the city and county of San Francisco. Open government data has the potential to increase the quality of life for residents, create more efficient government services, better public decisions, and even new local businesses and services.

In our analysis of SF Fire Department calls, we will be seeking answers to the following questions:

  1. How many different types of calls were made to the Fire Department?
  2. How many incidents of each call type were there?
  3. How many service calls were logged in the past 7 days?
  4. Which neighborhood in SF generated the most calls last year?
  5. What was the primary non-medical reason most people called the fire department from the Tenderloin last year?

Introduction to Spark

Spark is a unified processing engine that can analyze big data using SQL, machine learning, graph processing or real time stream analysis:

Spark Engines

We will mostly focus on Spark SQL and DataFrames this evening.

Spark can read from many different databases and file systems and run in various environments:

Spark Goal

Although Spark supports four languages (Scala, Java, Python, R), tonight we will use Python. Broadly speaking, there are two APIs for interacting with Spark:

  • DataFrames/SQL/Datasets: general, higher level API for users of Spark
  • RDD: a lower-level API for Spark internals and advanced programming

A Spark cluster is made of one Driver and many Executor JVMs (Java Virtual Machines):

Spark Physical Cluster, slots

The Driver sends Tasks to the empty slots on the Executors when work has to be done:

Spark Physical Cluster, tasks

In Databricks Community Edition, everyone gets a local mode cluster, where the Driver and Executor code run in the same JVM. Local mode clusters are typically used for prototyping and learning Spark:

Notebook + Micro Cluster

Databricks

Introduction to Fire Department Calls for Service

The July 6th, 2016 copy of the "Fire Department Calls for Service" data set has been uploaded to S3. You can see the data with the %fs ls command:

%fs ls /mnt/sf_open_data/fire_dept_calls_for_service/
dbfs:/mnt/sf_open_data/fire_dept_calls_for_service/Fire_Department_Calls_for_Service.csv  Fire_Department_Calls_for_Service.csv  1634673683

Note, you can also access the 1.6 GB of data directly from sfgov.org via this link: https://data.sfgov.org/Public-Safety/Fire-Department-Calls-for-Service/nuek-vuh3

The entry point into all functionality in Spark 2.x is the new SparkSession class, which we explored in our previous workshop: http://dbricks.co/ss_wkshp1

spark
Out[3]: <pyspark.sql.session.SparkSession at 0x7fb731df43d0>

Using the SparkSession, create a DataFrame from the CSV file by inferring the schema:

fireServiceCallsDF = spark.read.csv('/mnt/sf_open_data/fire_dept_calls_for_service/Fire_Department_Calls_for_Service.csv', header=True, inferSchema=True)

Notice that the above cell takes ~15 seconds to run because it is inferring the schema by sampling the file and reading through it.

Inferring the schema works for ad hoc analysis against smaller datasets. But when working on multi-TB+ data, it's better to provide an explicit pre-defined schema manually, so there's no inferring cost:

from pyspark.sql.types import StructType, StructField, IntegerType, StringType, BooleanType
# Note that we are removing all space characters from the col names to prevent errors when writing to Parquet later

fireSchema = StructType([StructField('CallNumber', IntegerType(), True),
                     StructField('UnitID', StringType(), True),
                     StructField('IncidentNumber', IntegerType(), True),
                     StructField('CallType', StringType(), True),                  
                     StructField('CallDate', StringType(), True),       
                     StructField('WatchDate', StringType(), True),       
                     StructField('ReceivedDtTm', StringType(), True),       
                     StructField('EntryDtTm', StringType(), True),       
                     StructField('DispatchDtTm', StringType(), True),       
                     StructField('ResponseDtTm', StringType(), True),       
                     StructField('OnSceneDtTm', StringType(), True),       
                     StructField('TransportDtTm', StringType(), True),                  
                     StructField('HospitalDtTm', StringType(), True),       
                     StructField('CallFinalDisposition', StringType(), True),       
                     StructField('AvailableDtTm', StringType(), True),       
                     StructField('Address', StringType(), True),       
                     StructField('City', StringType(), True),       
                     StructField('ZipcodeofIncident', IntegerType(), True),       
                     StructField('Battalion', StringType(), True),                 
                     StructField('StationArea', StringType(), True),       
                     StructField('Box', StringType(), True),       
                     StructField('OriginalPriority', StringType(), True),       
                     StructField('Priority', StringType(), True),       
                     StructField('FinalPriority', IntegerType(), True),       
                     StructField('ALSUnit', BooleanType(), True),       
                     StructField('CallTypeGroup', StringType(), True),
                     StructField('NumberofAlarms', IntegerType(), True),
                     StructField('UnitType', StringType(), True),
                     StructField('Unitsequenceincalldispatch', IntegerType(), True),
                     StructField('FirePreventionDistrict', StringType(), True),
                     StructField('SupervisorDistrict', StringType(), True),
                     StructField('NeighborhoodDistrict', StringType(), True),
                     StructField('Location', StringType(), True),
                     StructField('RowID', StringType(), True)])
# Notice that no job is run this time, since no sampling pass over the file is needed
# Python is dynamically typed, so typed Datasets don't exist; we always get back a DataFrame
fireServiceCallsDF = spark.read.csv('/mnt/sf_open_data/fire_dept_calls_for_service/Fire_Department_Calls_for_Service.csv', header=True, schema=fireSchema)

Look at the first 5 records in the DataFrame:

display(fireServiceCallsDF.limit(5))
(Output: the first 5 rows of the DataFrame, one row per unit dispatched, with columns including CallNumber, UnitID, IncidentNumber, CallType, CallDate, the various dispatch timestamps, Address, City, Zipcode, Battalion, Priority, ALSUnit, UnitType, NeighborhoodDistrict, and Location. Call types in this sample include Alarms, Structure Fire, Medical Incident, and Outside Fire.)

Let's examine the schema: