Day 34/100

Spark Structured Streaming Dynamic Dataset Join

I came across a unique problem: I have a list of valid names in a remote configuration that changes often, and a Spark Structured Streaming job that is always on, i.e. a long-running job. I want to join the valid names from the remote config with the streaming data so that every micro-batch filters/includes rows according to the latest changes.

So the simplified problem statement looks something like this:

val validNamesDF = spark.read.format("someRemoteSource").load("path/to/remote/config")

The other DataFrame is a stream, something like this:

val inputStream = spark.readStream.format("someStreamSource").load()

Both of them need to be joined, so that invalid rows are filtered out before writing:

inputStream.join(validNamesDF, inputStream("key") === validNamesDF("key"))
  .writeStream
  .format("someSink")
  .start()
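
For reference, here is a minimal, runnable sketch of this kind of stream-static join using Spark's built-in rate source and console sink; the key column and the hard-coded valid keys are just stand-ins for the real config:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("stream-static-join-sketch").getOrCreate()
import spark.implicits._

// stand-in for the remote config: a small static DataFrame of valid keys
val validNamesDF = Seq(0L, 1L, 2L).toDF("key")

// stand-in for the stream source: the built-in rate source, keyed by value % 5
val inputStream = spark.readStream
  .format("rate")
  .option("rowsPerSecond", 5)
  .load()
  .withColumn("key", $"value" % 5)

// inner stream-static join: only rows whose key exists in validNamesDF survive
inputStream.join(validNamesDF, Seq("key"))
  .writeStream
  .format("console")
  .outputMode("append")
  .start()
  .awaitTermination()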

Approach 1

  • Suppose the remote config lives in some S3/HDFS location and we read it as JSON/CSV, like below:
val validNamesDF = spark.read.json("s3://path/to/custom/config")
  • Now we read the stream and join validNamesDF with inputStream before writing, to filter out the invalid names:
val inputStream = spark.readStream.format("someStreamSource").load()
inputStream.join(validNamesDF, inputStream("key") === validNamesDF("key"))
  .writeStream
  .format("someSink")
  .start()
  • The job works as expected at start: it filters the names as per validNamesDF and writes the valid output.
  • The real problem starts when we change the remote config file and add/remove a few names.
  • Even after the remote config file has changed, the writeStream does not pick up the latest changes from remote.
  • The issue seems to be that Spark does not recompute the broadcast/smaller config DataFrame for every micro-batch; it uses some kind of cache to hold validNamesDF in memory and hence does not re-read the source each time batch processing starts.
  • I also tried using a temporary view for the join, with a background thread to keep refreshing that view, but that did not work out either because of the same caching behaviour (a rough sketch of that attempt follows this list).

  • Facepalm!
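
For completeness, the temp-view workaround looked roughly like this (the path and refresh interval are illustrative). The background thread keeps replacing the view, but the streaming query resolves the view once when it starts, so the running query never sees the newer versions:

import java.util.concurrent.{Executors, TimeUnit}

// background thread that re-reads the config and replaces the temp view
val refresher = Executors.newSingleThreadScheduledExecutor()
refresher.scheduleAtFixedRate(new Runnable {
  override def run(): Unit =
    spark.read.json("s3://path/to/custom/config")
      .createOrReplaceTempView("valid_names")
}, 0, 5, TimeUnit.MINUTES)

// the join below still uses whatever "valid_names" resolved to at start time
val validNamesDF = spark.table("valid_names")
inputStream.join(validNamesDF, inputStream("key") === validNamesDF("key"))
  .writeStream
  .format("someSink")
  .start()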

Approach 2 [Solution]

  • To solve this, I created a Delta Lake table on top of the custom config.
  • The best part about a Delta Lake table is that if it is updated by any other thread/process/job, other jobs reading the table also pick up the latest changes (see the updater sketch at the end of this post).
  • That makes Delta Lake a great candidate to solve this caching problem.
  • Delta Lake's file index (the TahoeFileIndex class) does not cache the data/index; the job calls the DeltaLog#update method for every join/access operation, which refreshes to the current version of the dataset.
// read the valid names config from deltalake
val validNamesDF = spark.read.format("delta").load("/path/to/valid/names")

// read the stream and join with validNames
val inputStream = spark.readStream.format("someStreamSource").load()
inputStream.join(validNamesDF, inputStream("key") === validNamesDF("key"))
  .writeStream
  .format("someSink")
  .start()
  • This worked like a charm. Try it the next time you need to read a dynamic config from a Spark Structured Streaming job.
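
For example, a separate updater job can simply overwrite the config table, and the long-running streaming job will see the new version on its next micro-batch. The names and paths below are illustrative, and the sketch assumes the Delta Lake dependency and SQL extensions are configured:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("valid-names-updater")
  // assumes the delta-spark package is on the classpath
  .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
  .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
  .getOrCreate()
import spark.implicits._

// new version of the valid-names config (illustrative values)
val newValidNames = Seq("alice", "bob", "carol").toDF("key")

// overwrite the Delta table; the streaming job's next DeltaLog update
// will pick up this new version
newValidNames.write
  .format("delta")
  .mode("overwrite")
  .save("/path/to/valid/names")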