Day 3/100
Delta Lake - [ Intro and Quick Start ]
Key features -
- ACID transactions
- Schema enforcement on writes
- Unification of batch and streaming - A table in Delta Lake is a batch table as well as a streaming source and sink.
- Time travel
- Supports MERGE, UPDATE and DELETE operations (see the DML sketch right after this list)
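As a quick illustration of the DML support, a minimal sketch assuming a Delta table registered as default.people10m (the name used in the OPTIMIZE examples further down) with gender and birthDate columns; the column names and predicates are made up for illustration:
spark.sql("UPDATE default.people10m SET gender = 'F' WHERE gender = 'Female'") // column and values assumed
spark.sql("DELETE FROM default.people10m WHERE birthDate < '1955-01-01'") // predicate assumed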
Quickstart
Create a Delta Lake table
// Write the data to its target.
df.write.format("delta").save(save_path)
// Create the table over the data's location.
spark.sql("CREATE TABLE " + table_name + " USING DELTA LOCATION '" + save_path + "'")
// or convert existing Parquet data in place to a Delta Lake table
spark.sql("CONVERT TO DELTA parquet.`/tmp/delta/people-10m`")
// Delta Lake table, partitioned by the given columns
df.write.partitionBy(partition_by).format("delta").save(save_path)
// Support for streaming reads and writes
// Read a directory of JSON files as a stream, picking up one file per micro-batch
val df_stream = (spark.readStream.schema(read_schema)
  .option("maxFilesPerTrigger", 1).option("multiline", true)
  .json(json_read_path))
// Append the stream to the Delta table, tracking progress in the checkpoint location
df_stream.writeStream.format("delta").outputMode("append")
  .option("checkpointLocation", checkpoint_path).start(save_path)
Upserts
- With a merge (upsert), if a matching row already exists Delta Lake updates its columns, and if no match exists it inserts a new row; a sketch follows below.
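A minimal MERGE sketch, assuming a hypothetical updates DataFrame df_updates with the same schema as the target and an id column to join on (none of these names come from the quick start):
// df_updates, the "updates" view and the id join column are all assumed for illustration
df_updates.createOrReplaceTempView("updates")
spark.sql("""
  MERGE INTO default.people10m AS target
  USING updates
  ON target.id = updates.id
  WHEN MATCHED THEN UPDATE SET *
  WHEN NOT MATCHED THEN INSERT *
""")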
Time Travel
- To get the history of a table, use the DESCRIBE HISTORY command in SQL, or in Scala use:
import io.delta.tables._
val deltaTable = DeltaTable.forPath(spark, pathToTable)
val fullHistoryDF = deltaTable.history() // get the full history of the table
val lastOperationDF = deltaTable.history(1) //get only last update
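The SQL equivalent returns the same information (default.people10m is an assumed table name here):
spark.sql("DESCRIBE HISTORY default.people10m") // full history
spark.sql("DESCRIBE HISTORY default.people10m LIMIT 1") // only the last operation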
and the output should look something like this
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+
|version| timestamp|userId|userName|operation| operationParameters| job|notebook|clusterId|readVersion| isolationLevel|isBlindAppend| operationMetrics|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+
| 5|2019-07-29 14:07:47| ###| ###| DELETE|[predicate -> ["(...|null| ###| ###| 4|WriteSerializable| false|[numTotalRows -> ...|
| 4|2019-07-29 14:07:41| ###| ###| UPDATE|[predicate -> (id...|null| ###| ###| 3|WriteSerializable| false|[numTotalRows -> ...|
| 3|2019-07-29 14:07:29| ###| ###| DELETE|[predicate -> ["(...|null| ###| ###| 2|WriteSerializable| false|[numTotalRows -> ...|
| 2|2019-07-29 14:06:56| ###| ###| UPDATE|[predicate -> (id...|null| ###| ###| 1|WriteSerializable| false|[numTotalRows -> ...|
| 1|2019-07-29 14:04:31| ###| ###| DELETE|[predicate -> ["(...|null| ###| ###| 0|WriteSerializable| false|[numTotalRows -> ...|
| 0|2019-07-29 14:01:40| ###| ###| WRITE|[mode -> ErrorIfE...|null| ###| ###| null|WriteSerializable| true|[numFiles -> 2, n...|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+
- To query an earlier version of the table:
spark.sql("SELECT * FROM default.deltaTable VERSION AS OF 0")
//OR
spark.sql("SELECT * FROM default.deltaTable TIMESTAMP AS OF '2019-01-29 00:37:58'") //here you can also pass in the date only
// DataFrame/Dataset API
val df1 = spark.read.format("delta").option("timestampAsOf", "2019-01-01").load("/tmp/delta/people-10m")
// OR
val df2 = spark.read.format("delta").option("versionAsOf", 2).load("/tmp/delta/people-10m")
- You can also OPTIMIZE a table, i.e. compact many small files into fewer larger files
- and you can Z-ORDER by columns to colocate related data in the same files
spark.sql("OPTIMIZE delta.`/tmp/delta/people-10m`")
spark.sql("OPTIMIZE default.people10m")
// AND
spark.sql("OPTIMIZE delta.`/tmp/delta/people-10m` ZORDER BY (gender)")
spark.sql("OPTIMIZE default.people10m ZORDER BY (gender)")
Deleting snapshots
spark.sql("VACUUM default.people10m")
Reference Article - docs.databricks.com/delta/quick-start.html#