Day 4/100

Delta Lake [Part 2]

Ingest data to Delta Lake

  • Delta Lake provides a way to ingest existing data using the COPY INTO SQL command
  • COPY INTO is an idempotent command: it can be run multiple times and will skip files that have already been ingested.
spark.sql("COPY INTO " + table_name +
  " FROM '" + source_data + "'" +
  " FILEFORMAT = " + source_format
)

Auto Loader

  • Auto Loader incrementally and efficiently processes new data files as they arrive
  • Auto Loader can discover files more efficiently than the COPY INTO SQL command
// Incrementally discover and read new CSV files from upload_path as a stream
val df = spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "csv")
  .option("header", "true")
  .schema("city string, year int, population long")
  .load(upload_path)

// Append the stream to a Delta table at write_path; checkpoint_path tracks which files were processed
df.writeStream.format("delta")
  .option("checkpointLocation", checkpoint_path)
  .start(write_path)

Table batch reads and writes

  • Delta Lake supports creating two types of tables—tables defined in the metastore and tables defined by path.
  1. SQL DDL
CREATE TABLE IF NOT EXISTS tableName (
  id INT,
  name STRING,
  salary INT
) USING DELTA
  2. DF/DS API
// Create table in the metastore using DataFrame's schema and write data to it
df.write.format("delta").saveAsTable("tableName")

// Create table with path using DataFrame's schema and write data to it
df.write.format("delta").mode("overwrite").save("/path/to/data")
  3. DeltaTableBuilder API
import io.delta.tables.DeltaTable

DeltaTable.createOrReplace(spark)
  .tableName("tableName")
  .addColumn("id", "INT")
  .addColumn("name", "STRING")
  .addColumn("salary", "INT")
  .execute()

Adding Partitions

  • You can use .partitionBy("columnName") to partition a table when using the DataFrame API
  • You can use .partitionedBy("columnName") when using the DeltaTableBuilder API (see the sketch below)
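A minimal sketch of both alternatives, assuming a DataFrame df and a hypothetical country column and table name:

import io.delta.tables.DeltaTable

// DataFrame API: partition the table's files by the "country" column
df.write.format("delta")
  .partitionBy("country")
  .saveAsTable("partitionedTable")

// DeltaTableBuilder API: declare the partition column while defining the table
DeltaTable.createOrReplace(spark)
  .tableName("partitionedTable")
  .addColumn("id", "INT")
  .addColumn("country", "STRING")
  .partitionedBy("country")
  .execute()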

Data Location

  • Tables created with an explicit LOCATION are unmanaged by the metastore (external tables)
  • Tables created without LOCATION are managed tables
  • An unmanaged table's files are not deleted when you DROP the table.
  • A table created over an existing location automatically inherits the schema, partitioning, and table properties of the existing data
  • If you do specify any configuration (schema, partitioning, or table properties), Delta Lake verifies that it exactly matches the existing data, otherwise it throws an error.
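A minimal sketch of creating an unmanaged table over existing Delta data, assuming a hypothetical path /tmp/delta/events:

// Schema and partitioning are inherited from the data already at the location
spark.sql("""
  CREATE TABLE IF NOT EXISTS events
  USING DELTA
  LOCATION '/tmp/delta/events'
""")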

Time Travel

SELECT * FROM table_name TIMESTAMP AS OF timestamp_expression
SELECT * FROM table_name VERSION AS OF version
  • timestamp_expression can be any one of:
    • '2018-10-18T22:15:12.013Z', that is, a string that can be cast to a timestamp
    • cast('2018-10-18 13:36:32 CEST' as timestamp)
    • '2018-10-18', that is, a date string
  • Query the number of new customers added over the last week.
SELECT count(distinct userId) - (
  SELECT count(distinct userId)
  FROM my_table TIMESTAMP AS OF date_sub(current_date(), 7))
FROM my_table
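Time travel is also available from the DataFrame reader via the versionAsOf and timestampAsOf options; a minimal sketch, assuming a Delta table at a hypothetical path:

// Read the table as it was at version 0
val dfV0 = spark.read.format("delta")
  .option("versionAsOf", "0")
  .load("/tmp/delta/events")

// Read the table as it was on a given date
val dfOld = spark.read.format("delta")
  .option("timestampAsOf", "2018-10-18")
  .load("/tmp/delta/events")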

Write to Delta Lake

  • Append - atomically add new data to an existing Delta table
df.write.format("delta").mode("append").save("/path/to/data")
df.write.format("delta").mode("append").saveAsTable("tableName")

import io.delta.implicits._
df.write.mode("append").delta("/path/to/data")
  • Overwrite - atomically replace all the data in a table
df.write.format("delta").mode("overwrite").save("/tmp/delta/people10m")
df.write.format("delta").mode("overwrite").saveAsTable("default.people10m")

import io.delta.implicits._
df.write.mode("overwrite").delta("/tmp/delta/people10m")
  • Conditional overwrite - atomically replace only the data that matches a predicate, using replaceWhere
df.write
  .format("delta")
  .mode("overwrite")
  .option("replaceWhere", "start_date >= '2017-01-01' AND end_date <= '2017-01-31'")
  .save("/tmp/delta/events")

Schema Validations

  • Delta Lake automatically validates that the schema of the DataFrame being written is compatible with the schema of the table
  • All DataFrame columns must exist in the target table
  • DataFrame column data types must match the column data types in the target table
  • DataFrame column names cannot differ only by case; Parquet is case sensitive when storing column information, so Delta Lake enforces this even if Spark is configured to be case insensitive (see the sketch below)
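A minimal sketch of the validation failing, assuming a hypothetical Delta table people with columns (id INT, name STRING):

import spark.implicits._

// The extra "salary" column does not exist in the target table, so without
// mergeSchema this append is rejected by schema validation (an AnalysisException)
val extra = Seq((1, "Alice", 100)).toDF("id", "name", "salary")
extra.write.format("delta").mode("append").saveAsTable("people")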

Table Schema Updates

  • The following updates are allowed:

    • Adding new columns
    • Reordering existing columns
    • Renaming existing columns
  • Change a column type

from pyspark.sql.functions import col

spark.read.table(...) \
  .withColumn("birthDate", col("birthDate").cast("date")) \
  .write \
  .format("delta") \
  .mode("overwrite") \
  .option("overwriteSchema", "true") \
  .saveAsTable(...)
  • Change a column name
spark.read.table(...) \
  .withColumnRenamed("dateOfBirth", "birthDate") \
  .write \
  .format("delta") \
  .mode("overwrite") \
  .option("overwriteSchema", "true") \
  .saveAsTable(...)
  • Add columns

    • new columns are added automatically when write or writeStream has .option("mergeSchema", "true")
    • or when spark.databricks.delta.schema.autoMerge.enabled is set to true (see the sketch after this list)
  • Replace table schema

df.write.format("delta")
  .mode("overwrite")
  .option("overwriteSchema", "true")
  .saveAsTable("tableName")
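For the "Add columns" case above, a minimal sketch using mergeSchema, assuming df contains columns that are missing from the target table:

// Columns present in df but missing from the table are added to the table schema
df.write.format("delta")
  .mode("append")
  .option("mergeSchema", "true")
  .saveAsTable("tableName")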

Views on Delta Lake Tables

  • Delta Lake supports the creation of views on top of Delta tables
  • Challenge - if you alter a Delta table schema, you must recreate derivative views to account for any additions to the schema
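A minimal sketch, reusing the hypothetical tableName (id, name, salary) from above:

spark.sql("""
  CREATE OR REPLACE VIEW high_earners AS
  SELECT id, name FROM tableName WHERE salary > 100000
""")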

Table Properties

  • You can store your own metadata as a table property using TBLPROPERTIES in CREATE and ALTER statements (see the sketch below)
  • You can't define new TBLPROPERTIES in a CREATE statement if a Delta table already exists at that location
  • To block deletes and updates in a Delta table, set delta.appendOnly=true
  • Time travel retention is controlled by delta.logRetentionDuration=<interval-string> and delta.deletedFileRetentionDuration=<interval-string>
  • Pro tip - for tables that require thousands of requests per second or more, it is recommended to dedicate an S3 bucket to the table and enable randomized file prefixes with delta.randomizeFilePrefixes=true
  • Default Delta table properties for new tables can be set via Spark configuration, e.g. spark.conf.set("spark.databricks.delta.properties.defaults.appendOnly", "true")
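A minimal sketch of setting a custom property and delta.appendOnly on the hypothetical tableName:

spark.sql("""
  ALTER TABLE tableName SET TBLPROPERTIES (
    'this.is.my.key' = 'value',
    'delta.appendOnly' = 'true'
  )
""")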

Table Metadata

  • DESCRIBE DETAIL
    • Provides schema, partitioning, table size, and so on.
  • DESCRIBE HISTORY
    • Provides operation, user, and operation metrics for each write to a table and so on.
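A minimal sketch of both commands, run against the hypothetical tableName:

// Table-level details: format, location, schema, partition columns, size, ...
spark.sql("DESCRIBE DETAIL tableName").show(truncate = false)

// Audit log: one row per write with version, timestamp, user, operation, and metrics
spark.sql("DESCRIBE HISTORY tableName").show(truncate = false)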

Reference - docs.databricks.com/delta/delta-batch.html