Day 3/100

Delta Lake - [ Intro and Quick Start ]

key features -

  • ACID transactions
  • Schema enforcement on writes (see the sketch after this list)
  • Unification of batch and streaming - A table in Delta Lake is a batch table as well as a streaming source and sink.
  • Time travel
  • Supports merge, update and delete operations
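
A minimal sketch of schema enforcement (the DataFrame name df_with_extra_col and the extra column are assumptions, not from the docs): appending a DataFrame whose schema does not match the existing Delta table fails, unless schema evolution is explicitly opted into with mergeSchema.

// Appending a DataFrame with an unexpected extra column fails schema enforcement...
df_with_extra_col.write.format("delta").mode("append").save(save_path)   // throws AnalysisException

// ...unless schema evolution is enabled explicitly for this write.
df_with_extra_col.write.format("delta").mode("append")
  .option("mergeSchema", "true").save(save_path)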

Quickstart

Create a Delta Lake table

// Write the data to its target.
df.write.format("delta").save(save_path)

//Create the table.
spark.sql("CREATE TABLE " + table_name + " USING DELTA LOCATION '" + save_path)

// Or convert existing Parquet data in place to a Delta Lake table.
spark.sql("CONVERT TO DELTA parquet.`/tmp/delta/people-10m`")

// Same write, but as a partitioned Delta Lake table.
df.write.partitionBy(partition_by).format("delta").save(save_path)

// Support for streaming reads and writes.
val df_stream = (spark.readStream.schema(read_schema)
  .option("maxFilesPerTrigger", 1).option("multiline", true)
  .json(json_read_path))

df_stream.writeStream.format("delta").outputMode("append")
  .option("checkpointLocation", checkpoint_path).start(save_path)

Upserts

  • An upsert (MERGE) updates the matching row's columns if it already exists in the target table, and inserts a new row if it does not - see the sketch below.
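
A minimal sketch of an upsert with the DeltaTable merge API (the updates DataFrame updates_df and the key column id are assumptions for illustration):

import io.delta.tables._

val target = DeltaTable.forPath(spark, save_path)

target.as("t")
  .merge(updates_df.as("u"), "t.id = u.id")    // match target and source on the key column
  .whenMatched().updateAll()                   // row exists -> update its columns
  .whenNotMatched().insertAll()                // row does not exist -> insert it
  .execute()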

Time Travel

  • To get the history of a table, use the DESCRIBE HISTORY command in SQL, or in Scala:
import io.delta.tables._

val deltaTable = DeltaTable.forPath(spark, pathToTable)

val fullHistoryDF = deltaTable.history()    // get the full history of the table

val lastOperationDF = deltaTable.history(1) //get only last update

and the output should look something like this

+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+
|version|          timestamp|userId|userName|operation| operationParameters| job|notebook|clusterId|readVersion|   isolationLevel|isBlindAppend|    operationMetrics|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+
|      5|2019-07-29 14:07:47|   ###|     ###|   DELETE|[predicate -> ["(...|null|     ###|      ###|          4|WriteSerializable|        false|[numTotalRows -> ...|
|      4|2019-07-29 14:07:41|   ###|     ###|   UPDATE|[predicate -> (id...|null|     ###|      ###|          3|WriteSerializable|        false|[numTotalRows -> ...|
|      3|2019-07-29 14:07:29|   ###|     ###|   DELETE|[predicate -> ["(...|null|     ###|      ###|          2|WriteSerializable|        false|[numTotalRows -> ...|
|      2|2019-07-29 14:06:56|   ###|     ###|   UPDATE|[predicate -> (id...|null|     ###|      ###|          1|WriteSerializable|        false|[numTotalRows -> ...|
|      1|2019-07-29 14:04:31|   ###|     ###|   DELETE|[predicate -> ["(...|null|     ###|      ###|          0|WriteSerializable|        false|[numTotalRows -> ...|
|      0|2019-07-29 14:01:40|   ###|     ###|    WRITE|[mode -> ErrorIfE...|null|     ###|      ###|       null|WriteSerializable|         true|[numFiles -> 2, n...|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+
  • To query an earlier version of the table:
spark.sql("SELECT * FROM default.deltaTable VERSION AS OF 0")

//OR

spark.sql("SELECT * FROM default.deltaTable TIMESTAMP AS OF '2019-01-29 00:37:58'")  //here you can also pass in the date only

// DataFrame/Dataset API

val df1 = spark.read.format("delta").option("timestampAsOf", "2019-01-01").load("/tmp/delta/people-10m")

// OR
val df2 = spark.read.format("delta").option("versionAsOf", 2).load("/tmp/delta/people-10m")
  • You can also optimise the table, i.e. compact smaller files into larger ones.
  • You can also Z-order by columns to co-locate related data in the same files.
spark.sql("OPTIMIZE delta.`/tmp/delta/people-10m`")
spark.sql("OPTIMIZE default.people10m")

// AND

spark.sql("OPTIMIZE delta.`/tmp/delta/people-10m` ZORDER BY (gender)")
spark.sql("OPTIMIZE default.people10m ZORDER BY (gender)")

Deleting snapshots

spark.sql("VACUUM default.people10m")

Reference Article - docs.databricks.com/delta/quick-start.html#