Day 6/100

Delta Lake [Part 4]

Change data feed

The change data feed represents row-level changes between versions of a Delta table. This includes the row data along with metadata indicating whether the specified row was inserted, deleted, or updated.

We can read the change data feed using SQL as well as the batch and streaming DataFrame APIs. Enable it using the following commands,

  • New Table -
    CREATE TABLE student (id INT, name STRING, age INT) TBLPROPERTIES (delta.enableChangeDataFeed = true)
    
  • Existing Table -
    ALTER TABLE myDeltaTable SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
    
  • All New tables -
    set spark.databricks.delta.properties.defaults.enableChangeDataFeed = true;
    

Note - Only changes made after you enable the change data feed are recorded; past changes to a table are not captured.

Read Changes

  • If you provide a version lower or a timestamp older than the earliest one with recorded change events (that is, earlier than the point at which the change data feed was enabled), an error is thrown indicating that the change data feed was not enabled.

  • A batch query would look something like this (a fuller end-to-end sketch follows after this list),

    spark.read.format("delta")
      .option("readChangeFeed", "true")
      .option("startingVersion", 0)
      .option("endingVersion", 10)
      .table("myDeltaTable")
    
  • A streaming query would look something like this,

    spark.readStream.format("delta")
      .option("readChangeFeed", "true")
      .option("startingTimestamp", "2021-04-21 05:35:43")
      .load("/pathToMyDeltaTable")
    
  • Enabling the change data feed adds no significant processing overhead.
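
Putting these pieces together, a minimal sketch that reads changes on the student table created earlier (assuming the change data feed was enabled before the writes; the column names come from that CREATE TABLE example):

// Read row-level changes between two versions and inspect the metadata columns
// (_change_type, _commit_version, _commit_timestamp) that the change data feed adds
val changesDF = spark.read.format("delta")
  .option("readChangeFeed", "true")
  .option("startingVersion", 0)
  .option("endingVersion", 10)
  .table("student")

changesDF
  .select("id", "name", "age", "_change_type", "_commit_version", "_commit_timestamp")
  .show(false)   // _change_type is insert, delete, update_preimage or update_postimage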

Table utility commands

Remove files no longer referenced by a Delta table,

import io.delta.tables._

val deltaTable = DeltaTable.forPath(spark, pathToTable)

deltaTable.vacuum()        // vacuum files not required by versions older than the default retention period

deltaTable.vacuum(100)     // vacuum files not required by versions more than 100 hours old (shorter than the 7-day default, so the retention duration safety check must be disabled first)
  • VACUUM commits to the Delta transaction log contain audit information. You can query the audit events using DESCRIBE HISTORY.

    • To capture audit information, enable spark.databricks.delta.vacuum.logging.enabled
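
A minimal sketch of the audit flow, reusing the deltaTable and pathToTable values from above (the logging config is a Databricks-style property, so treat its availability elsewhere as an assumption):

// Enable VACUUM audit logging before running the command
spark.conf.set("spark.databricks.delta.vacuum.logging.enabled", "true")

deltaTable.vacuum()

// VACUUM START / VACUUM END entries then show up in the table history
spark.sql(s"DESCRIBE HISTORY delta.`$pathToTable`")
  .filter("operation LIKE 'VACUUM%'")
  .show(false)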

Retrieve Delta table history

You can retrieve information on the operations, user, timestamp, and so on for each write to a Delta table by running the history command. The operations are returned in reverse chronological order. By default table history is retained for 30 days.

import io.delta.tables._

val deltaTable = DeltaTable.forPath(spark, pathToTable)

val fullHistoryDF = deltaTable.history()    // get the full history of the table

val lastOperationDF = deltaTable.history(1) // get the last operation

Sample History Schema

+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+
|version|          timestamp|userId|userName|operation| operationParameters| job|notebook|clusterId|readVersion|   isolationLevel|isBlindAppend|    operationMetrics|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+
|      5|2019-07-29 14:07:47|   ###|     ###|   DELETE|[predicate -> ["(...|null|     ###|      ###|          4|WriteSerializable|        false|[numTotalRows -> ...|
|      4|2019-07-29 14:07:41|   ###|     ###|   UPDATE|[predicate -> (id...|null|     ###|      ###|          3|WriteSerializable|        false|[numTotalRows -> ...|
|      3|2019-07-29 14:07:29|   ###|     ###|   DELETE|[predicate -> ["(...|null|     ###|      ###|          2|WriteSerializable|        false|[numTotalRows -> ...|
|      2|2019-07-29 14:06:56|   ###|     ###|   UPDATE|[predicate -> (id...|null|     ###|      ###|          1|WriteSerializable|        false|[numTotalRows -> ...|
|      1|2019-07-29 14:04:31|   ###|     ###|   DELETE|[predicate -> ["(...|null|     ###|      ###|          0|WriteSerializable|        false|[numTotalRows -> ...|
|      0|2019-07-29 14:01:40|   ###|     ###|    WRITE|[mode -> ErrorIfE...|null|     ###|      ###|       null|WriteSerializable|         true|[numFiles -> 2, n...|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+
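
Since history() returns a regular DataFrame, it can be filtered and projected like any other; a small sketch using the columns shown above:

// Show only the DELETE operations recorded in the table history
import org.apache.spark.sql.functions.col

fullHistoryDF
  .filter(col("operation") === "DELETE")
  .select("version", "timestamp", "operationMetrics")
  .show(false)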

Retrieve Delta table details

DESCRIBE DETAIL '/data/events/'

DESCRIBE DETAIL eventsTable

Sample details look something like this,

+------+--------------------+------------------+-----------+--------------------+--------------------+-------------------+----------------+--------+-----------+----------+----------------+----------------+
|format|                  id|              name|description|            location|           createdAt|       lastModified|partitionColumns|numFiles|sizeInBytes|properties|minReaderVersion|minWriterVersion|
+------+--------------------+------------------+-----------+--------------------+--------------------+-------------------+----------------+--------+-----------+----------+----------------+----------------+
| delta|d31f82d2-a69f-42e...|default.deltatable|       null|file:/Users/tuor/...|2020-06-05 12:20:...|2020-06-05 12:20:20|              []|      10|      12345|        []|               1|               2|
+------+--------------------+------------------+-----------+--------------------+--------------------+-------------------+----------------+--------+-----------+----------+----------------+----------------+
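
The same details can also be fetched from Scala as a one-row DataFrame; a minimal sketch (newer Delta releases also expose DeltaTable.detail(), but spark.sql works everywhere):

// Fetch the table details and keep a few columns of interest
val detailDF = spark.sql("DESCRIBE DETAIL eventsTable")
detailDF.select("format", "numFiles", "sizeInBytes", "partitionColumns").show(false)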

Generate a manifest file

Generating a symlink manifest lets external engines such as Presto or Athena read the Delta table.

val deltaTable = DeltaTable.forPath(<path-to-delta-table>)
deltaTable.generate("symlink_format_manifest")

Convert a Parquet table to a Delta table

This command lists all the files in the directory, creates a Delta Lake transaction log that tracks these files, and automatically infers the data schema. For a partitioned table, you must also supply the partition schema, as in the second example below.

import io.delta.tables._

// Convert unpartitioned Parquet table at path '<path-to-table>'
val deltaTable = DeltaTable.convertToDelta(spark, "parquet.`<path-to-table>`")

// Convert partitioned Parquet table at path '<path-to-table>' and partitioned by integer columns named 'part' and 'part2'
val partitionedDeltaTable = DeltaTable.convertToDelta(spark, "parquet.`<path-to-table>`", "part int, part2 int")
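
After the conversion the directory contains a _delta_log folder, so subsequent reads should go through the Delta format rather than plain Parquet; a quick check using the same path placeholder as above:

// Read the converted table back as Delta
val convertedDF = spark.read.format("delta").load("<path-to-table>")
convertedDF.printSchema()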

Restore a Delta table to an earlier state

A Delta table internally maintains historic versions of the table, enabling it to be restored to an earlier state.

import io.delta.tables._

val deltaTable = DeltaTable.forPath(spark, <path-to-table>)   // reference the table by path, or...
val deltaTable = DeltaTable.forName(spark, <table-name>)      // ...by its name in the metastore

deltaTable.restoreToVersion(0) // restore table to oldest version

deltaTable.restoreToTimestamp("2019-02-14") // restore to a specific timestamp

Clone a Delta table

  • A deep clone copies the source table data to the clone target in addition to the metadata of the existing table. Stream metadata is also cloned, so a stream that writes to the Delta table can be stopped on the source table and continued on the clone target from where it left off.

  • A shallow clone does not copy the data files to the clone target. The table metadata is equivalent to the source. These clones are cheaper to create.

 import io.delta.tables._

 val deltaTable = DeltaTable.forPath(spark, pathToTable)
 val deltaTable = DeltaTable.forName(spark, tableName)

 deltaTable.clone(target, isShallow, replace) // clone the source at latest version

 deltaTable.cloneAtVersion(version, target, isShallow, replace) // clone the source at a specific version

 deltaTable.cloneAtTimestamp(timestamp, target, isShallow, replace) // clone the source at a specific timestamp
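
For example, a shallow clone of the current table into an illustrative target path (positional arguments follow the signatures above; the path is only a placeholder):

// Create a shallow clone: metadata is copied, data files are referenced rather than duplicated
deltaTable.clone("/tmp/studentShallowClone", true, false)   // target, isShallow, replace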

Constraints

Constraints ensure that the quality and integrity of data added to a table are automatically verified. When a constraint is violated, Delta Lake throws an InvariantViolationException to signal that the new data cannot be added.

NOT NULL constraint

Before adding a NOT NULL constraint to a table, Databricks verifies that all existing rows satisfy the constraint.

> CREATE TABLE tableName ( id INT NOT NULL, name STRING, salary INT ) USING DELTA;

-- Add or drop NOT NULL on columns of an existing table
> ALTER TABLE tableName CHANGE COLUMN id SET NOT NULL;
> ALTER TABLE tableName CHANGE COLUMN name DROP NOT NULL;

CHECK constraint

> CREATE TABLE tableName ( id INT, name STRING, salary INT ) USING DELTA;
> ALTER TABLE tableName ADD CONSTRAINT salaryGreaterThan CHECK (salary > 19000);
> ALTER TABLE tableName DROP CONSTRAINT salaryGreaterThan;

> ALTER TABLE tableName ADD CONSTRAINT validIds CHECK (id > 1 and id < 99999999);
> DESCRIBE DETAIL tableName;
> SHOW TBLPROPERTIES tableName;
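
When a write violates one of these constraints, the whole transaction is rejected; a minimal sketch from Scala (the row values are made up, and the exact exception class varies between runtimes, so a generic catch is used):

// id = 0 violates the validIds CHECK constraint added above
try {
  spark.sql("INSERT INTO tableName VALUES (0, 'someName', 20000)")
} catch {
  case e: Exception => println(s"Write rejected: ${e.getMessage}")
}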

Reference 1 - docs.databricks.com/delta/delta-change-data..

Reference 2 - docs.databricks.com/delta/delta-utility.html

Reference 3 - docs.databricks.com/delta/delta-constraints..