Day 5/100

Delta Lake [Part 3] -

Delta table Streaming

Delta Lake is deeply integrated with Spark Structured Streaming through readStream and writeStream, along with the following features:

  • Coalescing small files
  • Maintaining exactly-once processing
  • Efficiently discovering new files

As Source

When a Delta table is loaded as a streaming source, the stream processes all of the existing data in the table as well as any new data that arrives.

import io.delta.implicits._

spark.readStream.delta("/path/to/data")
// OR 
spark.readStream.format("delta").load("/path/to/data")
// OR
spark.readStream.format("delta").table("tableName")

Limit Input rate

  • maxFilesPerTrigger : maximum number of new files to be considered in every micro-batch [default = 1000]

  • maxBytesPerTrigger : how much data gets processed in each micro-batch. This is a soft limit; the batch size can be a little above or below this value.

  • If both are provided, the stream processes data until either the maxFilesPerTrigger or maxBytesPerTrigger limit is reached (see the sketch below).
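
A minimal sketch of setting these options on a Delta streaming read; the path and the option values are placeholders:

spark.readStream
  .format("delta")
  .option("maxFilesPerTrigger", "500")   // cap on new files per micro-batch
  .option("maxBytesPerTrigger", "1g")    // soft cap on data size per micro-batch
  .load("/path/to/data")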

Specify Initial position

  • startingVersion - start from a specific Delta table version
  • startingTimestamp - timestamp to start from
  • You cannot set both options at the same time
  • The schema of the streaming source is always the latest schema of the Delta table, irrespective of the starting version or timestamp

spark.readStream.format("delta").option("startingVersion", "5").load("/tmp/delta/user_events")
//OR 
spark.readStream.format("delta").option("startingTimestamp", "2018-10-18").load("/tmp/delta/user_events")

As Sink

Metrics - numBytesOutstanding and numFilesOutstanding give us the data size and the number of files remaining to be processed, respectively.
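
For example, assuming query is the StreamingQuery handle returned by writeStream.start(), recent Delta/Spark versions report these in the streaming progress:

println(query.lastProgress.json)   // progress JSON, includes numBytesOutstanding / numFilesOutstanding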

  • Append Mode - adds new records to the table (the default; sketched below)
  • Complete Mode - replaces the entire table with every batch
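
A minimal append-mode sketch that streams from one Delta path into another; the paths and checkpoint location are placeholders:

spark.readStream
  .format("delta")
  .load("/path/to/source")
  .writeStream
  .format("delta")
  .outputMode("append")                                  // add new records only
  .option("checkpointLocation", "/path/to/checkpoint")   // needed for exactly-once recovery
  .start("/path/to/sink")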

Table deletes, updates, and merges

Delete -

  • delete removes the data from the latest version of the Delta table but does not remove it from the physical storage until the old versions are explicitly vacuumed
DELETE FROM tableName WHERE birthDate < '1955-01-01'

DELETE FROM delta.`/path/to/data` WHERE birthDate < '1955-01-01'
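
The same delete can be expressed through the Scala DeltaTable API; a small sketch using the placeholder path from above:

import io.delta.tables._

val deltaTable = DeltaTable.forPath(spark, "/path/to/data")
deltaTable.delete("birthDate < '1955-01-01'")   // delete rows matching a SQL predicate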

Update -

import io.delta.tables._

val deltaTable = DeltaTable.forPath(spark, "/path/to/data")

// Update using SQL-formatted string predicates and expressions
deltaTable.updateExpr("gender = 'F'", Map("gender" -> "'Female'"))

import org.apache.spark.sql.functions._
import spark.implicits._

// Update using Spark Column expressions
deltaTable.update(col("gender") === "M", Map("gender" -> lit("Male")))

Upsert using merge

import io.delta.tables._
import org.apache.spark.sql.functions._

val deltaTablePeople = DeltaTable.forPath(spark, "/path/to/data")
val deltaTablePeopleUpdates = DeltaTable.forPath(spark, "/tmp/delta/people-10m-updates")
val dfUpdates = deltaTablePeopleUpdates.toDF()

deltaTablePeople.as("people")
  .merge(
    dfUpdates.as("updates"),
    "people.id = updates.id")
  .whenMatched
  .updateExpr( Map( "id" -> "updates.id", "name" -> "updates.name",  "salary" -> "updates.salary"))
  .whenNotMatched
  .insertExpr(
    Map( "id" -> "updates.id", "name" -> "updates.name",  "salary" -> "updates.salary"
    ))
  .execute()

Auto Schema Evolution

By default, updateAll and insertAll assign all the columns in the target Delta table from columns of the same name in the source dataset. To let merge automatically evolve the target schema when the source has additional columns, set spark.databricks.delta.schema.autoMerge.enabled to true (see the sketch below).
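
A sketch of an auto-evolving merge, reusing deltaTablePeople and dfUpdates from the merge example above; it assumes the config is set on the current session:

spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")

deltaTablePeople.as("people")
  .merge(dfUpdates.as("updates"), "people.id = updates.id")
  .whenMatched.updateAll()       // copy every source column with a matching name
  .whenNotMatched.insertAll()    // insert all source columns, adding new ones to the target schema
  .execute()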

Performance Tuning -

  • Reduce the search space for matches by adding known column values, e.g. partition predicates, to the merge condition (example below)
  • Compact small files
  • Use Low Shuffle Merge mode
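
For example, if the target table were partitioned by a hypothetical country column, adding that predicate to the merge condition limits the files scanned for matches:

deltaTablePeople.as("people")
  .merge(
    dfUpdates.as("updates"),
    "people.country = 'IN' AND people.id = updates.id")   // hypothetical partition filter + join key
  .whenMatched.updateAll()
  .whenNotMatched.insertAll()
  .execute()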

Reference - docs.databricks.com/delta/delta-update.html