Day 5/100
Delta Lake [Part 3] -
Delta table Streaming
Delta Lake is deeply integrated with Spark Structured Streaming through readStream and writeStream, with the following features:
- Coalesces small files
- Maintains exactly-once processing
- Discovers new files
As Source
If we load a Delta table as a source, the stream processes all of the existing data in the table as well as any new data that arrives.
import io.delta.implicits._
spark.readStream.delta("/path/to/data")
// OR
spark.readStream.format("delta").load("/path/to/data")
// OR
spark.readStream.format("delta").table("tableName")
Limit Input Rate
- maxFilesPerTrigger: the number of new files to be considered in every micro-batch [default = 1000].
- maxBytesPerTrigger: how much data gets processed in each micro-batch. This is a soft limit; the batch size can be slightly more or less than the configured value.
If both are provided, the stream processes data until either the maxFilesPerTrigger or the maxBytesPerTrigger limit is reached.
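A minimal sketch combining both options (the 500-file and 1 GB values are arbitrary assumptions):
// cap each micro-batch at 500 new files or roughly 1 GB, whichever limit is hit first
val limited = spark.readStream
  .format("delta")
  .option("maxFilesPerTrigger", "500")
  .option("maxBytesPerTrigger", "1g")
  .load("/path/to/data")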
Specify Initial Position
- startingVersion: start streaming from a specific version of the Delta table.
- startingTimestamp: the timestamp to start streaming from.
- You cannot set both options at the same time.
- The schema of the streaming source is always the latest schema of the Delta table, irrespective of the starting version or timestamp.
spark.readStream.format("delta").option("startingVersion", "5").load("/tmp/delta/user_events")
//OR
spark.readStream.format("delta").option("startingTimestamp", "2018-10-18").load("/tmp/delta/user_events")
As Sink
Metrics: numBytesOutstanding and numFilesOutstanding give the size of data and the number of files remaining to be processed.
- Append mode - adds new records to the table.
- Complete mode - replaces the entire table with every batch.
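A minimal write-side sketch, assuming a hypothetical checkpoint path /tmp/delta/_checkpoints/events and sink path /path/to/sink:
val events = spark.readStream.format("delta").load("/path/to/data")
events.writeStream
  .format("delta")
  .outputMode("append") // use "complete" to replace the table with every batch
  .option("checkpointLocation", "/tmp/delta/_checkpoints/events")
  .start("/path/to/sink")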
Table deletes, updates, and merges
Delete -
- delete removes the data from the latest version of the Delta table, but does not remove it from physical storage until the old versions are explicitly vacuumed
DELETE FROM tableName WHERE birthDate < '1955-01-01'
DELETE FROM delta.`/path/to/data` WHERE birthDate < '1955-01-01'
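The same delete can also be issued through the Scala API; a small sketch reusing the /path/to/data table from above:
import io.delta.tables._
import org.apache.spark.sql.functions._
val deltaTable = DeltaTable.forPath(spark, "/path/to/data")
deltaTable.delete("birthDate < '1955-01-01'")      // predicate as a SQL string
deltaTable.delete(col("birthDate") < "1955-01-01") // or as a Column expression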
Update -
import io.delta.tables._
val deltaTable = DeltaTable.forPath(spark, "/path/to/data")
// update using SQL-formatted string predicates and expressions
deltaTable.updateExpr("gender = 'F'", Map("gender" -> "'Female'"))
import org.apache.spark.sql.functions._
import spark.implicits._
// update using Column expressions
deltaTable.update(col("gender") === "M", Map("gender" -> lit("Male")))
Upsert using merge
import io.delta.tables._
import org.apache.spark.sql.functions._
val deltaTablePeople = DeltaTable.forPath(spark, "/path/to/data")
val deltaTablePeopleUpdates = DeltaTable.forPath(spark, "/tmp/delta/people-10m-updates")
val dfUpdates = deltaTablePeopleUpdates.toDF()
deltaTablePeople.as("people")
.merge(
dfUpdates.as("updates"),
"people.id = updates.id")
.whenMatched
.updateExpr( Map( "id" -> "updates.id", "name" -> "updates.name", "salary" -> "updates.salary"))
.whenNotMatched
.insertExpr(
Map( "id" -> "updates.id", "name" -> "updates.name", "salary" -> "updates.salary"
))
.execute()
Auto Schema Evolution
By default, updateAll and insertAll assign all the columns in the target Delta table with values from columns of the same name in the source dataset.
To evolve the target schema automatically, set spark.databricks.delta.schema.autoMerge.enabled to true.
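A sketch of a merge with automatic schema evolution, reusing deltaTablePeople and dfUpdates from the merge example above (setting the config at the session level is an assumption):
spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")
deltaTablePeople.as("people")
  .merge(dfUpdates.as("updates"), "people.id = updates.id")
  .whenMatched.updateAll()     // copies all source columns; new source columns are added to the target schema
  .whenNotMatched.insertAll()
  .execute()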
Performance Tuning -
- Reduce the search space for matches by adding known column values, such as partition predicates, to the merge condition (see the sketch below)
- Compact small files into larger ones
- Use Low Shuffle Merge mode
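For the first point, a sketch that assumes the people table is partitioned by a hypothetical country column and the updates only touch one partition:
deltaTablePeople.as("people")
  .merge(
    dfUpdates.as("updates"),
    "people.country = 'IN' AND people.id = updates.id") // partition predicate prunes files before matching
  .whenMatched.updateAll()
  .whenNotMatched.insertAll()
  .execute()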
Reference - docs.databricks.com/delta/delta-update.html..