Quick Sum Up
When loading a dataframe from a file with Spark, make sure to reload the dataframe with spark.read
each time the file has been modified. Indeed, this command will not automatically be evaluated completely each time you call an action on it. If you don’t, you may lose data with no warning. This means that the schema is not the only information about your data that is not loaded lazily.
Context
We are setting up Spark pipelines ingesting live data from sensors in micro-batches with the Spark streaming API. Each micro-batch needs to be enriched with slowly evolving data, made available in a csv file which is regularly overwritten by an external process. The csv columns will remain the same but entries can be modified and new lines can be added.
Note: having to ingest this data with no control on its modifications was not optimal, but this was a constraint we had to deal with.
Problem
A few days after the implementation, we started to observe data loss in our pipeline. Investigating our logs, we isolated the issue in the part of the code performing the join with external data coming from this very file.
Our first step to debug the code was to use parameter once
in .trigger(once=True)
, which is provided by the structured streaming API. Very handy to debug streaming processes! However, when processing only one batch, all the data was present in the output. Thus, the issue was not directly caused by the data contained in the file.
Digging a bit deeper, we saw that it was linked to the way we were loading the file. In our code implementation, we made an assumption that turned out to be wrong… Before looking at it in details, let’s quickly refresh our minds about an important concept: lazy loading.
All transformations in Spark are lazy, in that they do not compute their results right away. Instead, they just remember the transformations applied to some base dataset (e.g. a file). The transformations are only computed when an action requires a result to be returned to the driver program. This design enables Spark to run more efficiently.
Source: Spark documentation
For example, df = spark.read.csv(”data.csv”)
will be lazily executed so the data will not be loaded in the dataframe right away. However, applying the action df.count()
will effectively need to compute the transformations in order to return a result to the driver.
Reproducing the issue
As stated earlier, while implementing our solution, we implicitly made an assumption that was not true. We thought that when calling an action on our Spark dataframe, all the transformations defining the dataframe would be evaluated again. Thus, we read the csv file once (transformation), and regularly triggered actions to actually perform our data manipulations. Here is the previous schematic with a more detailed view of the Spark job.
What we discovered is that the data read from the csv file was incomplete. However, there was no exception or warning: the dataframe was silently truncated!
Here is a minimal code snippet to reproduce what happened with pyspark
:
# data.csv
# --------
# anne,paris
# bob,barcelone
# charles,re
df = spark.read.csv("data.csv")
df.show() # you see 3 entries as expected
+-------+---------+
| _c0| _c1|
+-------+---------+
| anne| paris|
| bob|barcelona|
|charles| re|
+-------+---------+
# Now, Bob is called Bobby and moved out so data.csv is edited as follows
#
# data.csv# --------
# anne,paris
# bob,a-city-that-has-a-longer-name
# charles,re
df.show() # the last line has been lost with no warning
+-----+--------------------+
| _c0| _c1|
+-----+--------------------+
| anne| paris|
|bobby|a-city-that-has-a...|
+-----+--------------------+
This is due to the fact that the second action did not trigger a full reload of the source file but kind of tried to force the new data into the previous dataframe. If you recreate it, everything will, of course, work as expected.
Note: this is why our attempt to debug our streaming process by running only one batch did not reproduce the issue. Indeed, the file loading transformation was always performed before the action!
Takeaway
- Ensure that your data processing is behaving as expected thanks to data quality indicators and monitor them. Otherwise, you will never be safe. This is true whereas you use Spark or not.
- The reading of a source file with Spark is linked in some way to the initial file data. This means that other information than the schema is not loaded lazily (this was the cause of our bug!), but we did not grasp what yet (maybe an information about the data size?). Anyway, if the source file is modified, you need to recreate your dataframe with
spark.read
. - Spark might sometimes behave in ways that you do not expect and, be silent about it! Thus, be really careful about edge cases when implementing your data processing.
- Take advantage of Spark parameter
readStream.trigger(once=True)
to debug your streaming processes 😉
Bonus
While looking for resources about our bug, I found this article. It describes various Spark behaviors when dealing with unexpected data, which often happens in real-life systems. In particular, schema handling is a very important topic to be cautious about.
If you are looking for ways to run pandas code on spark or to create datalake on spark, you will also find information on our blog so do not hesitate to check it out!