Hello World - Python (Python)

WordCount Example

Goal: Determine the most popular words in a given text file using Apache Spark's Python API and SQL

Step 1: Load the text file from our Hosted Datasets. Shift-Enter runs the code below.

filePath = "dbfs:/databricks-datasets/SPARK_README.md" # path in Databricks File System
lines = sc.textFile(filePath) # read the file into the cluster
lines.take(10) # display first 10 lines in the file
Out[10]: [u'# Apache Spark', u'', u'Spark is a fast and general cluster computing system for Big Data. It provides', u'high-level APIs in Scala, Java, Python, and R, and an optimized engine that', u'supports general computation graphs for data analysis. It also supports a', u'rich set of higher-level tools including Spark SQL for SQL and DataFrames,', u'MLlib for machine learning, GraphX for graph processing,', u'and Spark Streaming for stream processing.', u'', u'<http://spark.apache.org/>']
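
Note: sc (the SparkContext) is predefined in Databricks notebooks. If you are following along in a plain PySpark installation, here is a minimal sketch for creating it yourself (the local path is hypothetical; point it at your own copy of the file):

from pyspark import SparkContext # SparkContext is not predefined outside Databricks
sc = SparkContext("local[*]", "WordCount") # run locally, using all available cores
lines = sc.textFile("README.md") # hypothetical local path to the same file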

Step 2: Inspect the number of partitions used to store the dataset

numPartitions = lines.getNumPartitions() # get the number of partitions
print "Number of partitions (workers) storing the dataset = %d" % numPartitions
Number of partitions (workers) storing the dataset = 2
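
The partition count bounds how much of the cluster can work on the dataset in parallel. If you want a different number, a quick sketch (the counts here are illustrative):

repartitioned = lines.repartition(4) # redistribute the data into 4 partitions (triggers a shuffle)
coalesced = lines.coalesce(1) # merge down to 1 partition, avoiding a full shuffle
print(repartitioned.getNumPartitions()) # 4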

Step 3: Split each line of the dataset into a list of words, using a space as the separator

words = lines.flatMap(lambda x: x.split(' ')) # split each line into a list of words
words.take(10) # display the first 10 words
Out[12]: [u'#', u'Apache', u'Spark', u'', u'Spark', u'is', u'a', u'fast', u'and', u'general']
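
The choice of flatMap over map matters here: map would produce one list per line, while flatMap flattens those lists into a single RDD of words. A tiny sketch on made-up data:

demo = sc.parallelize(['a b', 'c']) # a two-line toy dataset
demo.map(lambda x: x.split(' ')).collect() # [['a', 'b'], ['c']] -- one list per line
demo.flatMap(lambda x: x.split(' ')).collect() # ['a', 'b', 'c'] -- flattened into individual words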

Step 4: Filter the list of words to exclude common stop words

stopWords = ['','a','*','and','is','of','the'] # define the list of stop words
filteredWords = words.filter(lambda x: x.lower() not in stopWords) # filter the words
filteredWords.take(10) # display the first 10 filtered words
Out[13]: [u'#', u'Apache', u'Spark', u'Spark', u'fast', u'general', u'cluster', u'computing', u'system', u'for']
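
Tokens such as u'#' survive because they are not in the stop list. If you also want to drop punctuation-only tokens, one option is a regex filter (the pattern below is illustrative, not part of the original example):

import re # regular expressions for token cleanup
alphaNumWords = words.filter(lambda x: re.search('[A-Za-z0-9]', x) is not None) # keep tokens containing a letter or digit
alphaNumWords.filter(lambda x: x.lower() not in stopWords).take(10) # stop-word filter applied on top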

Step 5: Cache the filtered dataset in memory to speed up future actions.

filteredWords.cache() # cache filtered dataset into memory across the cluster worker nodes
Out[14]: PythonRDD[39] at RDD at PythonRDD.scala:48
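
Caching is lazy: nothing is stored until the next action materializes the RDD. A short sketch to verify the flag (unpersist shown for completeness; this walkthrough keeps the cache):

print(filteredWords.is_cached) # True -- the RDD is marked for caching
filteredWords.count() # the first action actually populates the cache
filteredWords.unpersist() # release the cached partitions later, when no longer needed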

Step 6: Transform the filtered words into a list of (word,1) tuples for WordCount

word1Tuples = filteredWords.map(lambda x: (x, 1)) # map the words into (word,1) tuples
word1Tuples.take(10) # display the (word,1) tuples
Out[15]: [(u'#', 1), (u'Apache', 1), (u'Spark', 1), (u'Spark', 1), (u'fast', 1), (u'general', 1), (u'cluster', 1), (u'computing', 1), (u'system', 1), (u'for', 1)]

Step 7: Aggregate the (word,1) tuples into (word,count) tuples

wordCountTuples = word1Tuples.reduceByKey(lambda x, y: x + y) # aggregate counts for each word
wordCountTuples.take(10) # display the first 10 (word,count) tuples
Out[16]: [(u'when', 1), (u'"local"', 1), (u'through', 1), (u'computation', 1), (u'using:', 1), (u'guidance', 2), (u'Scala,', 1), (u'environment', 1), (u'only', 1), (u'rich', 1)]
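
reduceByKey merges the values of each key pairwise with the supplied function, first within each partition and then across partitions, so only partial sums travel over the network. A worked sketch on toy data:

pairs = sc.parallelize([('spark', 1), ('run', 1), ('spark', 1)]) # three (word,1) tuples
pairs.reduceByKey(lambda x, y: x + y).collect() # [('spark', 2), ('run', 1)] -- order may vary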

Step 8: Display the top 10 (word,count) tuples by count

sortedWordCountTuples = wordCountTuples.top(10, key=lambda (x, y): y) # top 10 (word,count) tuples by count
for wordCountTuple in sortedWordCountTuples: # display the top 10 (word,count) tuples by count
  print str(wordCountTuple)
(u'to', 14)
(u'Spark', 13)
(u'for', 11)
(u'##', 8)
(u'run', 7)
(u'can', 6)
(u'on', 5)
(u'in', 5)
(u'you', 4)
(u'if', 4)
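
The tuple-unpacking lambda above is Python 2 only syntax. Under Python 3 the same call can be written by indexing into the tuple, and takeOrdered offers an equivalent route (a sketch, not part of the original notebook):

sortedWordCountTuples = wordCountTuples.top(10, key=lambda kv: kv[1]) # Python 3 compatible key function
wordCountTuples.takeOrdered(10, key=lambda kv: -kv[1]) # same 10 tuples via takeOrdered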

Step 9: Create a table from the (word,count) tuples

from pyspark.sql import Row # import the pyspark sql Row class
wordCountRows = wordCountTuples.map(lambda p: Row(word=p[0], count=int(p[1]))) # tuples -> Rows
wordCountRows.toDF().createOrReplaceTempView("word_count")
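
An equivalent route skips the Row class and names the columns directly during conversion (a sketch; toDF accepts a list of column names):

wordCountDF = wordCountTuples.toDF(['word', 'count']) # infer the schema, naming the two columns
wordCountDF.createOrReplaceTempView('word_count') # register the same temporary view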

Step 10: Use SQL to visualize the words with count >= 2

%sql
SELECT word, count
FROM word_count
WHERE count >= 2
ORDER BY count DESC -- show words with count >= 2, ordered by count descending
word count
to 14
Spark 13
for 11
## 8
run 7
can 6
on 5
in 5
if 4
you 4
also 4
with 3
Hadoop 3
an 3
example 3
build 3
Please 3
documentation 3
use 3
You 3
including 3
or 3
Shell 2
SparkPi 2
how 2
do 2
SQL 2
Python, 2
should 2
Interactive 2
using 2
set 2
Scala 2
command, 2
`examples` 2
one 2
particular 2
cluster 2
general 2
at 2
refer 2
guidance 2
tests 2
shell: 2
examples 2
For 2
detailed 2
building 2
locally 2
that 2
be 2
Hive 2
Python 2
To 2
class 2
programs 2
It 2
This 2
Hadoop, 2
return 2
[project 2
following 2
which 2
supports 2
./bin/run-example 2
1000: 2
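
The same result can be produced with the DataFrame API instead of %sql (a sketch assuming the spark session and display function that Databricks notebooks provide):

from pyspark.sql import functions as F # column expression helpers
wordCounts = spark.table('word_count') # read the temporary view back as a DataFrame
display(wordCounts.filter(F.col('count') >= 2).orderBy(F.col('count').desc())) # same rows as the SQL cell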