As a Data Scientist at Acquia, I get to build machine learning models to solve problems or speed up tasks that are time-consuming for humans. This means I spend a lot of time getting data into a format that machine learning models can use, or simply into a useful form for exploratory analysis.
One of the tools I use for handling large amounts of data and getting it into the required format is Apache Spark.
Spark is an open source analytics engine for large-scale data processing that allows data to be processed in parallel across a cluster. I use it in combination with AWS Elastic MapReduce (EMR) instances, which provide far more computing resources than my laptop.
I work with events related to web browsing, such as what people click on, the timestamps of those clicks, and the web browsers they are using. I might be able to process about a day's worth of events for a website on my laptop using Python and Pandas DataFrames, but since that requires the data to fit in memory, I would run out of memory if I tried to work with data from a longer time period.
With Spark I can work with DataFrames that have hundreds of thousands of rows, or even more. Since joining Acquia I have gone from never having used Spark to being able to effectively manipulate months' worth of data in minutes.
Some Lessons Learned
Figure out if you need to use RDDs, DataFrames, or Datasets.
Learn the basics of what they are and when to use them. If you are new to Spark and have structured data in a columnar format, then you probably want to use DataFrames or Datasets. I was using PySpark for structured data, and since Datasets don't exist in PySpark, I worked with DataFrames.
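As a minimal sketch of the DataFrame route (the column names and values below are made up for illustration):

```python
# A minimal sketch: structured, columnar data is a natural fit for DataFrames.
# The column names and values are made up for illustration.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("example").getOrCreate()

events = [
    ("2019-05-01 10:00:00", "click", "Chrome"),
    ("2019-05-01 10:00:05", "pageview", "Firefox"),
]
df = spark.createDataFrame(events, ["timestamp", "event_type", "browser"])
df.printSchema()
df.show()
```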
Getting Spark code to scale isn’t always easy.
Learning the Spark basics is fairly straightforward, especially if you are already used to working with data in table format, such as with SQL or Pandas. But code that runs fine on a small sample can run into memory issues or have longer-than-expected run times when scaled up.
Spark makes working with big data fast and easy. When your code runs as it is supposed to, things are great. When it doesn’t, I suggest the following steps:
- Use `count()` for debugging. For errors or code that runs too slowly, `count()` is useful for breaking up a large set of transformations and identifying where the problem is. Since transformations are evaluated lazily, errors only surface when an action is called. To figure out which transformation is causing the problem, try a `count()` after each transformation (see the sketch after this list).
- Know your actions vs. transformations. Calling multiple actions without caching can cause serious slowdowns, because all the transformations that precede each action call will be unnecessarily recomputed. If you need to debug, use `explain()` and go line by line to make sure you aren't accidentally calling an action when you think it's all transformations, or forgetting to cache before multiple actions are called or before data will be reused later. That said, don't cache when the computations for the transformation are quicker than caching, which involves storing and retrieving the data.
- When going through line by line, keep in mind that transformations are optimized to execute as efficiently as possible when an action is called, so the total time of running transformations individually will be longer than running them all at once.
- Learn how to read the execution plans. This will save you a lot of headaches when trying to debug.
- Get familiar with the Spark UI. The Spark UI allows you to monitor job executions, job times, and cached data. It's a good tool for identifying problems and slowdowns. When Spark is run locally the UI can be found at localhost:4040. When it's not run locally you'll have to Google instructions on how to connect. The UI should help you answer questions such as "Is there enough driver memory?", "Is there enough executor memory?" and "Is there enough parallelization, or is there too much?"
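Here is a rough sketch of that count-and-cache debugging pattern; the DataFrame df and the specific transformations are hypothetical:

```python
# A rough sketch of the debugging pattern above; df and its columns are hypothetical.
from pyspark.sql import functions as F

step1 = df.filter(F.col("event_type") == "click")
step1.count()    # force evaluation: if this fails or is slow, the problem is in step1

step2 = step1.withColumn("date", F.to_date("timestamp"))
step2.count()    # narrow the problem down transformation by transformation

step2.cache()    # cache before calling multiple actions on the same data
step2.count()    # materializes the cache
step2.show(5)    # reuses the cached data instead of recomputing step1 and step2

step2.explain()  # inspect the execution plan
```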
The above steps should help identify the commands that are causing problems. From there you'll have to figure out why, and either optimize the code or tweak Spark properties such as `spark.executor.instances` or `spark.driver.memory`.
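As a sketch of how such properties can be set when creating a session (the values here are arbitrary, not recommendations):

```python
# A sketch of tweaking Spark properties; the values are arbitrary.
# Note: spark.driver.memory usually has to be set before the driver JVM starts
# (e.g. via spark-submit or spark-defaults.conf), so builder settings like these
# only take effect if no SparkSession is already running.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("tuned-job")
    .config("spark.executor.instances", "10")
    .config("spark.executor.memory", "4g")
    .getOrCreate()
)
```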
Use up-to-date resources.
Read a book on learning Spark that was recently published, or use tutorials that were posted as recently as possible.
Looking at old resources might show you ways of doing things that aren't the most efficient, or give you advice and best practices that are no longer applicable. I also found that some Stack Overflow questions for Spark had accepted answers that were no longer the best way to solve the problem. My takeaway: search the official documentation and favor the most recent resources.
Also, when you are debugging issues, make sure you are looking specifically for information on either RDDs or DataFrames and Datasets.
For example, when trying to debug memory issues I found that many people listed `groupByKey()` as a common problem and suggested using `reduceByKey()` instead when possible. Researching this further, I found that the suggestion was not relevant to me, because I was using DataFrames, and the DataFrame `groupBy()` is optimized to avoid the problems that can be encountered with the RDD function `groupByKey()`.
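As a small illustration of the DataFrame-level aggregation I ended up relying on (the column name is hypothetical):

```python
# DataFrame aggregation with groupBy(); the column name is hypothetical.
from pyspark.sql import functions as F

events_per_browser = (
    df.groupBy("browser")
      .agg(F.count("*").alias("num_events"))
)
events_per_browser.show()
```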
Make sure to use built-in functions.
Before writing a simple User Defined Function (UDF), make sure what you are doing isn't already covered by a built-in function. If you write a UDF that duplicates a built-in function, then at best you have wasted the time spent writing it, and at worst you have slowed down your script, because built-in functions are optimized for speed.
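For instance, uppercasing a string column can be done with the built-in `upper()` instead of a UDF (a sketch with a hypothetical column name):

```python
# Prefer built-ins from pyspark.sql.functions over UDFs; the column name is hypothetical.
from pyspark.sql import functions as F
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# Slower: a UDF that reimplements existing functionality
to_upper_udf = udf(lambda s: s.upper() if s is not None else None, StringType())
df_slow = df.withColumn("browser_upper", to_upper_udf("browser"))

# Faster: the equivalent built-in function
df_fast = df.withColumn("browser_upper", F.upper("browser"))
```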
More Specific Tips
The following are tips that would have been useful for me to know from the beginning:
Avoid calling `withColumnRenamed()` multiple times. If you need to rename multiple columns, I found it faster and more efficient to use `alias()` with `select()`.
For example, if a DataFrame df has columns A, b, c, D, E and I want to rename b -> B and c -> C, then:
```python
# in Python
from pyspark.sql.functions import col

df = df.select("A", col("b").alias("B"), col("c").alias("C"), "D", "E")
```
If you have a lot of column names to change, use the following to change them all at once:
```python
# in Python
old_columns = df.schema.names
new_columns = ...  # change all or some column names from old_columns here
cols_alias = [col(n).alias(m) for n, m in zip(old_columns, new_columns)]
df = df.select(*cols_alias)
```
Similarly, avoid calling `drop()` multiple times (such as in a loop). Either call `drop()` once with every column you want to drop, or call `select()` with every column you want to keep.
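For instance, a single call can handle several columns at once (the column names are hypothetical):

```python
# Drop several columns in one call; the column names are hypothetical.
df_trimmed = df.drop("debug_flag", "internal_id", "raw_payload")

# ...or equivalently, select only the columns you want to keep.
df_trimmed = df.select("timestamp", "event_type", "browser")
```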
If using Zeppelin with AWS EMR it might be useful to increase ZEPPELIN_MEM.
I solved a memory error by increasing the memory available to Zeppelin, which I did by adding a configuration file when starting up an EMR cluster.
I used the following configuration:
{ "Classification": "zeppelin-env", "Configurations":[{ "Classification":"export", "Properties": { "ZEPPELIN_MEM":"-Xms[NEW MEMORY HERE]g -Xmx[NEW MEMORY HERE]g" } }], "Properties": {} }
Also, if using Zeppelin, make sure to take advantage of its built-in visualization capabilities.
Check out the Zeppelin visualization documentation. The most useful command will probably be the one that nicely prints out a DataFrame.
To print a DataFrame df:
z.show(df)
I hope my tips will help you get to a point where your Spark code is running without error, and in an acceptable amount of time. Best of luck in your journey towards learning Spark.