About DataFrames

In the last blog post I gave you an overview of our Data Science stack based on Python. This time let’s focus on one important component: DataFrames.

DataFrames are a great abstraction for working with structured and semi-structured data. They are basically a collection of rows, organized into named columns. Think of relational database tables: DataFrames are very similar and allow you to do similar operations on them:

  • slice data: select a subset of rows or columns based on conditions (filters)
  • sort data by one or more columns
  • aggregate data and compute summary statistics
  • join multiple DataFrames

What makes them much more powerful than SQL is that this nice, SQL-like API is exposed inside a full-fledged programming language, which means we can mix declarative, SQL-like operations with arbitrary code written in a general-purpose language.
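For instance, here is a toy sketch in pandas (the people DataFrame and its columns are made up purely for illustration) that combines a declarative, SQL-like slice with arbitrary Python logic in a couple of lines:

import pandas as pd

# a tiny, made-up DataFrame
people = pd.DataFrame({"age": [15, 34, 67]})

def label(row):
    # arbitrary Python logic, not expressible as a single SQL expression
    return "senior" if row.age > 60 else "junior"

adults = people[people.age >= 18]     # declarative, SQL-like slicing
labels = adults.apply(label, axis=1)  # general-purpose code on top of it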

DataFrames were popularized by R and then adopted by other languages and frameworks. For Python we have pandas, a great data analysis library, where DataFrame is one of the key abstractions.

Pandas limitations and Spark DataFrames

Pandas won’t work in every case. It is a single-machine tool, so we’re constrained by the limits of a single machine. Moreover, pandas doesn’t have any parallelism built in, which means it uses only one CPU core.
It’s likely that you’ll hit a wall even on medium datasets (tens of gigabytes), and if you want to work with even bigger datasets, pandas won’t help you.

Fortunately, a few months ago the Spark community released a new version of Spark with DataFrame support. Spark DataFrames have a very similar API, but are designed from the ground up to support big data.

There is a lot of cool engineering behind Spark DataFrames, such as code generation, manual memory management and the Catalyst optimizer. But let’s set that aside for now and focus on usability.

  • How hard is it to use Spark DataFrames if you know pandas?
  • Are there any quirks?
  • Is any important functionality missing?

We’ll compare pandas and Spark DataFrames on a few examples, and then fix some of the pain points we’ve noticed.

Let’s get started!

Note: This blog post is based on Spark 1.3.1 and pandas 0.16.1.

Comparison: The good part

Reading the data

Pandas:

>>> data = pd.read_csv("/data/adult-dataset-data.csv")
>>> data.head(1)
Out: 
   age  final_weight   workclass  education_num
0   39         77516   State-gov             13

Spark:

>>> data = sqlc.load("s3://data/adult-dataset-data.csv", "com.databricks.spark.csv")

>>> data.take(1)
Out: 
[Row(age=45.0, final_weight=160962.0, workclass=u' Self-emp-not-inc', education_num=10.0)]

Both pandas and Spark DataFrames can easily read multiple formats, including CSV, JSON, and some binary formats (some of them require additional libraries).
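To give one more example beyond CSV, both can read JSON. A small sketch (the paths are made up; sqlc is the SQLContext used above, and in Spark 1.3 jsonFile expects one JSON record per line):

>>> jsonPandas = pd.read_json("/data/records.json")

>>> jsonSpark = sqlc.jsonFile("s3://data/records.json")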

Note that a Spark DataFrame doesn’t have an index. It doesn’t enumerate rows (which is the default index in pandas). In pandas the index is just a special column, so if we really need one, we have to pick one of the columns of the Spark DataFrame and treat it as the ‘index’.
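For example, when pulling a (small enough) Spark DataFrame into pandas, we can promote one of its columns to the index. A minimal sketch, using the Spark DataFrame loaded above (‘age’ is an arbitrary choice):

>>> localData = data.toPandas().set_index('age')  # collects everything to the driver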

Slicing

Pandas:

>>> sliced = data[data.workclass.isin([' Local-gov', ' State-gov']) \
                 & (data.education_num > 1)][['age', 'workclass']]

>>> sliced.head(1)
Out:
   age   workclass
0   39   State-gov

Spark:


>>> slicedSpark = dataSpark[dataSpark.workclass.inSet([' Local-gov', ' State-gov']) 
                           & (dataSpark.education_num > 1)][['age', 'workclass']]

>>> slicedSpark.take(1)
Out:
[Row(age=48.0, workclass=u' State-gov')]

As you can see, they are very similar if you work on a single DataFrame.
There is one important difference, though. In pandas, boolean slicing expects just a boolean Series, which means you can apply a filter computed from another DataFrame, as long as the lengths match. In Spark you can only filter using columns of the DataFrame you want to filter.
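To make the difference concrete, here is a sketch; otherData is a hypothetical pandas DataFrame with the same number of rows as data:

# pandas: the boolean mask may come from a different DataFrame of the same length
>>> mask = otherData.education_num > 1
>>> data[mask]

# Spark: the filter must be built from the filtered DataFrame's own columns
>>> dataSpark[dataSpark.education_num > 1]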

Aggregations

Simple value counts:

Pandas:

>>> data.groupby('workclass').workclass.count()
# or shortcut
>>> data.workclass.value_counts()
Out:
workclass
 ?                    1836
 Federal-gov           960
 ...

Spark:

>>> dataSpark.groupBy('workclass').count().collect()
Out:
[Row(workclass=u' Self-emp-not-inc', count=2541),
 Row(workclass=u' Federal-gov', count=960),
 Row(workclass=u' ?', count=1836),
 ...]

Multiple aggregations:

Pandas:

>>> data.groupby('workclass').agg({'final_weight': 'sum', 'age': 'mean'})
Out:
                         age  final_weight
workclass                                 
 ?                 40.960240     346115997
 Federal-gov       42.590625     177812394
 Local-gov         41.751075     394822919
 ...

Spark:

>>> dataSpark.groupBy('workclass').agg({'final_weight': 'sum', 'age': 'mean'}).collect()

Out: 
[Row(workclass=u' Self-emp-not-inc', AVG(age)=44.96969696969697, SUM(final_weight)=446221558.0),
 Row(workclass=u' Federal-gov', AVG(age)=42.590625, SUM(final_weight)=177812394.0),
 ...]

Again, the syntax is very similar and you can easily compute the standard aggregations. Unfortunately, Spark DataFrames currently don’t support custom aggregation functions, so you can only use the handful of built-ins. It’s still possible to aggregate data in a custom way (using a Hive UDAF or dropping down to the raw RDD), but it’s less convenient and less performant.
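For the record, here is roughly what the RDD fallback can look like. This is only a sketch based on the dataSpark DataFrame from above; it computes a per-group statistic (the age range within each workclass) that the built-in aggregations don’t cover:

# drop down to the underlying RDD of Rows
>>> pairs = dataSpark.rdd.map(lambda row: (row.workclass, (row.age, row.age)))

# keep a running (min, max) per workclass...
>>> minMax = pairs.reduceByKey(lambda a, b: (min(a[0], b[0]), max(a[1], b[1])))

# ...and turn it into the custom statistic
>>> minMax.mapValues(lambda mm: mm[1] - mm[0]).collect()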

Mapping

Let’s define a custom function:

def f(workclass, final_weight):
    if "gov" in workclass.lower():
        return final_weight * 2.0
    else:
        return final_weight

Pandas:

>>> pandasF = lambda row: f(row.workclass, row.final_weight)

>>> data.apply(pandasF, axis=1)
Out: 
0        155032
1         83311
...

Spark:

>>> sparkF = pyspark.sql.functions.udf(f, pyspark.sql.types.DoubleType())

>>> dataSpark.select(
        sparkF(dataSpark.workclass, dataSpark.final_weight).alias('result')
    ).collect()
Out:
[Row(result=155032.0),
 Row(result=83311.0),
 ...]

As you can see, applying a custom function to one or more columns is very easy in both cases; we were even able to reuse the same function, just wrapped differently for pandas and Spark. A slight inconvenience of Spark UDFs is that they require us to specify the function’s return type upfront.

Comparison: The ugly part

So far so good: Spark and pandas are very similar and equally easy to use. Spark DataFrames, although much simpler to use than any other Big Data tool, are still a young part of the Spark ecosystem, and there are some rough edges. Let’s see a few examples.

Joins

Let’s define two very simple DataFrames for the join example.

>>> pandasA = pd.DataFrame([
        [1, "te", 1], 
        [2, "pandas", 4]], 
        columns=['colX', 'colY', 'colW'])

>>> pandasB = pd.DataFrame([
        [3.0, "st", 2],
        [4.0, "spark", 3]],
        columns=['colY', 'colZ', 'colX'])

>>> sparkA = sqlc.createDataFrame(pandasA)
>>> sparkB = sqlc.createDataFrame(pandasB)

Pandas has super-easy join syntax with its merge method:

Pandas:

>>> pandasA.merge(pandasB, on='colX', suffixes=('_A', '_B'), how='left')
Out: 
   colX  colY_A  colW  colY_B colZ
0     1      te     1     NaN  NaN
1     2  pandas     4       3   st

Pandas offers a few useful features:

  • you specify the join key instead of an equality condition
  • it automatically adds suffixes to common columns
  • it keeps only one copy of the join key

Now let’s see how it looks in Spark.

Spark:

>>> joined = sparkA.join(sparkB, sparkA.colX == sparkB.colX, 'left_outer')
>>> joined.toPandas()
Out: 
   colX    colY  colW  colY  colZ  colX
0     1      te     1   NaN  None   NaN
1     2  pandas     4     3    st     2

As you can see, it requires the full equality condition, keeps two copies of the join key and doesn’t add suffixes. That’s problematic, because now we can’t use the df.col notation to select the columns. It’s even more confusing if you use collect instead of toPandas, because it looks as if the second column with the same name overrides the first one (it doesn’t really, but it’s very confusing).

Spark:

>>> joined.collect()
Out:
[Row(colX=None, colY=None, colW=1, colY=None, colZ=None, colX=None),
 Row(colX=2, colY=3.0, colW=4, colY=3.0, colZ=u'st', colX=2)]
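One partial workaround (just a sketch, not a full replacement for pandas-style suffixes) is to disambiguate by referring back to the columns of the parent DataFrames and aliasing them by hand:

>>> joined.select(sparkA.colX, sparkA.colY, sparkA.colW,
                  sparkB.colY.alias('colY_B'), sparkB.colZ).toPandas()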

In order to get the same effect as in pandas, we need to do something like this:

Spark:

>>> sparkARenamed = sparkA \
                    .withColumnRenamed('colY', 'colY_A')
>>> sparkBRenamed = sparkB \
                    .withColumnRenamed('colX', 'colX_B') \
                    .withColumnRenamed('colY', 'colY_B')

>>> sparkARenamed.join(sparkBRenamed, sparkARenamed.colX == sparkBRenamed.colX_B, 'left_outer') \
             .select('colX', 'colY_A', 'colW', 'colY_B', 'colZ') \
             .toPandas()
Out:

   colX  colY_A  colW  colY_B  colZ
0     1      te     1     NaN  None
1     2  pandas     4       3    st

Ugh, so ugly!

Unions

Now let’s try to concat / union DataFrames:

Pandas:

>>> pd.concat([pandasA, pandasB])
Out:
   colW  colX    colY   colZ
0     1     1      te    NaN
1     4     2  pandas    NaN
0   NaN     2       3     st
1   NaN     3       4  spark

It looks reasonable. Pandas matched the columns from both DataFrames and filled in the missing values with NaNs.

Spark:

>>> sparkA.unionAll(sparkB).collect()
Out:
[Row(colX=1.0, colY=u'te', colW=1),
 Row(colX=2.0, colY=u'pandas', colW=4),
 Row(colX=3.0, colY=u'st', colW=2),
 Row(colX=4.0, colY=u'spark', colW=3)]

Well, clearly something is wrong here. Spark’s union is very limited: it only works if the DataFrames have exactly the same schema (including the order of columns), yet it doesn’t throw any error when they don’t, so it’s very easy to fall into this trap.

Fixing the ugly part

As you can see, there are some ugly parts, but can we do something about it?

Well, it looks like it’s just a matter of aligning the input data, so let’s write a wrapper function which simulates a pandas-like concat on Spark DataFrames.

Spark:

def addEmptyColumns(df, colNames):
    exprs = df.columns + ["null as " + colName for colName in colNames]
    return df.selectExpr(*exprs)


def concatTwoDfs(left, right):
    # append columns from right df to left df
    missingColumnsLeft = set(right.columns) - set(left.columns)
    left = addEmptyColumns(left, missingColumnsLeft)

    # append columns from left df to right df
    missingColumnsRight = set(left.columns) - set(right.columns)
    right = addEmptyColumns(right, missingColumnsRight)

    # let's set the same order of columns
    right = right[left.columns]

    # finally, union them
    return left.unionAll(right)


def concat(dfs):
    return reduce(concatTwoDfs, dfs)

Now we can concat those DataFrames in pandas style:

Spark:

>>> concat([sparkA, sparkB]).collect()
Out:
[Row(colX=1, colY=u'te', colW=1, colZ=None),
 Row(colX=2, colY=u'pandas', colW=4, colZ=None),
 Row(colX=2, colY=u'3.0', colW=None, colZ=u'st'),
 Row(colX=3, colY=u'4.0', colW=None, colZ=u'spark')]

You can write a pandas-like merge as an exercise.
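If you want a starting point, here is a minimal sketch. It assumes a single join key and that the suffixed column names are not already taken in either DataFrame:

def merge(left, right, on, suffixes=('_A', '_B'), how='left_outer'):
    # pandas-style suffixes for the overlapping non-key columns
    common = (set(left.columns) & set(right.columns)) - {on}
    for col in common:
        left = left.withColumnRenamed(col, col + suffixes[0])
        right = right.withColumnRenamed(col, col + suffixes[1])

    # rename the right-hand key so the duplicate can be dropped afterwards
    right = right.withColumnRenamed(on, on + '_tmp')
    joined = left.join(right, left[on] == right[on + '_tmp'], how)

    # keep a single copy of the join key, as pandas does
    return joined.select(*[c for c in joined.columns if c != on + '_tmp'])

Called as merge(sparkA, sparkB, on='colX').toPandas(), it should produce roughly the same result as the pandas merge above.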

Summary

I’ve only covered a small subset of pandas and Spark DataFrame functionality, but I hope it gives you a good impression of what to expect.

Spark DataFrames in their current state are already powerful and easy to use. However, there are some pain points. They still need to catch up with pandas in terms of usability; in some cases the API is pretty raw and makes unintuitive assumptions. Additionally, some important pieces of functionality, such as custom aggregations, are missing. Fortunately, these are already on the Spark roadmap, so future releases will likely improve the situation.

On the other hand, it’s very easy to extend basic functionality with custom wrappers. We started using Spark several months ago and we’ve been able to fix almost all of the pain points we’ve encountered.

If you use pandas and want to work on bigger datasets, go for PySpark and DataFrames!


Cover photo by Elizabeth Haslam licensed with Attribution-NonCommercial 2.0 Generic License

Comments

  • dataviking

    Thanks Mateusz! Very helpful for us Spark beginners. How does one slice a Spark DF horizontally by index (and not by column properties)? For eg. if I want the 20th to 30th rows of a dataframe in a new DF? I can think of a few ways – adding an index column and filtering, doing a .take() twice, converting to Pandas and slicing, etc., but is there an easy transformation to do this?

    • Tycho Grouwstra

      I’m not sure it’s exposed to the DataFrame API, but RDDs had this zipWithIndex() to add an index ‘column’, see http://stackoverflow.com/questions/26046410/how-can-i-obtain-an-element-position-in-sparks-rdd . With that though (see thread), the order obtained may not match the original ordering. I haven’t tried, but in case DataFrames could use that to add an index, I suppose one might then use df.filter() to obtain only the desired rows…

      • dataviking

        Thanks Tycho! Although zipWithIndex() is not present in the DF API, the longer solution of filtering and re-converting to a dataframe works. And the order can be regained by doing a sortByKey() (where key is the index).

    • Mateusz Buśkiewicz

      @dataviking:disqus Thank you!

      Currently there is no easy & scalable way to do this. There are several workarounds, probably the best one is the one mentioned by @tychogrouwstra:disqus . There are also window functions introduced in Spark 1.4: https://databricks.com/blog/2015/07/15/introducing-window-functions-in-spark-sql.html . In particular, there is a rowNumber function, but currently it doesn’t work well on large datasets (if you want to partition on the entire dataset).

      If you only need an arbitrary subset of rows, sampling works well.

  • Joel Bondurant

    “Moreover, pandas doesn’t have any parallelism built in, which means it uses only one CPU core.” – false, Pandas uses Numexpr to utilize multiple cores in some operations transparent to the user.

    • Mateusz Buśkiewicz

      @joelbondurant:disqus, good to know :) Can you provide more details? I think that numexpr can only speed up very simple numerical expressions. It’s something, but certainly not enough to say that pandas is multi-core.

  • leezu

    Nice blogpost, thanks!

    I implemented some join helper function to fix some of the ugly parts (duplicate columns, column renaming). You may find it here: https://gist.github.com/leezu/86b4d50e6d2fe80f9198
    Any additions, fixes or ideas for improvement are welcome :)

  • Reynold Xin

    Reynold from the Spark team here. Thanks for the great blog post.

    I just created a pull request to rename inSet to isin to match Pandas. https://github.com/apache/spark/pull/7977

    The part about join is a great idea. Would you be interested in submitting a pull request to Spark?

  • Dror Asaf

    Please note that dask is the name of the game in order to handle medium scale data

  • kouravakarthik jm

    .cat.codes is a pandas method, I need to use similar method which does the same operation in spark!

  • kouravakarthik jm

    how to convert string feature column to numerical values or codes such that they can fit in the machine learning model?