Day 4/100
Delta Lake [Part 2]
Ingest data to Delta Lake
- Delta Lake provides a way to ingest existing data using the COPY INTO SQL command
- COPY INTO is idempotent; it can be run multiple times and skips files that have already been ingested
spark.sql("COPY INTO " + table_name +
" FROM '" + source_data + "'" +
" FILEFORMAT = " + source_format
)
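- For reference, with hypothetical values filled in (a table named my_table and a directory of raw CSV files), the call above issues a statement along these lines:
spark.sql("""
  COPY INTO my_table
  FROM '/tmp/raw/csv'
  FILEFORMAT = CSV
""")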
Auto Loader
- Auto Loader incrementally and efficiently processes new data files as they arrive
- Auto Loader can discover files more efficiently than the COPY INTO SQL command
// Incrementally read new files as they arrive using Auto Loader (cloudFiles source)
val df = spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "csv")
  .option("header", "true")
  .schema("city string, year int, population long")
  .load(upload_path)
// Continuously write the stream into a Delta table, tracking progress in the checkpoint location
df.writeStream.format("delta")
  .option("checkpointLocation", checkpoint_path)
  .start(write_path)
Table batch reads and writes
- Delta Lake supports creating two types of tables—tables defined in the metastore and tables defined by path.
- SQL DDL
CREATE TABLE IF NOT EXISTS tableName (
id INT,
name STRING,
salary INT
) USING DELTA
- DF/DS API
// Create table in the metastore using DataFrame's schema and write data to it
df.write.format("delta").saveAsTable("tableName")
// Create table with path using DataFrame's schema and write data to it
df.write.format("delta").mode("overwrite").save("/path/to/data")
- DeltaTableBuilder API
import io.delta.tables._
DeltaTable.createOrReplace(spark)
  .tableName("tableName")
  .addColumn("id", "INT")
  .addColumn("name", "STRING")
  .addColumn("salary", "INT")
  .execute()
Adding Partitions
- You can use .partitionBy("columnName") to partition tables when using the DataFrame API
- You can use .partitionedBy("columnName") when using the DeltaTableBuilder API (both are sketched below)
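- A minimal sketch of both approaches (Python API; the partition column "year" is made up for illustration):
df.write.format("delta") \
    .partitionBy("year") \
    .saveAsTable("tableName")
from delta.tables import DeltaTable
DeltaTable.createOrReplace(spark) \
    .tableName("tableName") \
    .addColumn("city", "STRING") \
    .addColumn("year", "INT") \
    .partitionedBy("year") \
    .execute()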
Data Location
- Tables created with a LOCATION clause are unmanaged by the metastore (see the sketch below)
- Otherwise the table is a managed table
- An unmanaged table's files are not deleted when you DROP the table
- When a table is created at a location that already contains Delta data, the metastore automatically inherits the schema, partitioning, and table properties of the existing data
- If you specify any configuration (schema, partitioning, or table properties), Delta Lake verifies that the specification exactly matches the configuration of the existing data and throws an error otherwise
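- A minimal sketch of registering existing Delta data at a path as an unmanaged table (the path /tmp/delta/events is hypothetical):
spark.sql("""
  CREATE TABLE IF NOT EXISTS events
  USING DELTA
  LOCATION '/tmp/delta/events'
""")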
Time Travel
SELECT * FROM table_name TIMESTAMP AS OF timestamp_expression
SELECT * FROM table_name VERSION AS OF version
- timestamp_expression can be any one of:
- '2018-10-18T22:15:12.013Z', that is, a string that can be cast to a timestamp
- cast('2018-10-18 13:36:32 CEST' as timestamp)
- '2018-10-18', that is, a date string
- Query the number of new customers added over the last week.
SELECT count(distinct userId) - (
  SELECT count(distinct userId)
  FROM my_table TIMESTAMP AS OF date_sub(current_date(), 7))
FROM my_table
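- Time travel is also available from the DataFrame reader via versionAsOf / timestampAsOf; a minimal sketch (path, version, and date are made up):
df_v0 = spark.read.format("delta") \
    .option("versionAsOf", 0) \
    .load("/tmp/delta/events")
df_last_week = spark.read.format("delta") \
    .option("timestampAsOf", "2018-10-18") \
    .load("/tmp/delta/events")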
Write to Delta Lake
- Append - atomically add new data to an existing Delta table
df.write.format("delta").mode("append").save("/path/to/data")
df.write.format("delta").mode("append").saveAsTable("tableName")
import io.delta.implicits._
df.write.mode("append").delta("/path/to/data")
- Overwrite - atomically replace all the data in a table
df.write.format("delta").mode("overwrite").save("/tmp/delta/people10m")
df.write.format("delta").mode("overwrite").saveAsTable("default.people10m")
import io.delta.implicits._
df.write.mode("overwrite").delta("/tmp/delta/people10m")
- Conditional overwrite - use replaceWhere to atomically replace only the data that matches a predicate
df.write
.format("delta")
.mode("overwrite")
.option("replaceWhere", "start_date >= '2017-01-01' AND end_date <= '2017-01-31'")
.save("/tmp/delta/events")
Schema Validations
- Delta Lake automatically validates that the schema of the DataFrame being written is compatible with the schema of the table
- All DataFrame columns must exist in the target table
- DataFrame column data types must match the column data types of the target Delta table
- DataFrame column names cannot differ from the table's columns only by case; Parquet is case sensitive when storing column information, so this restriction applies even when Spark runs in case-insensitive mode
Table schema Updates
The following schema updates are allowed:
- Adding new columns
- Reordering existing columns
- Renaming existing columns
- Change a column type
from pyspark.sql.functions import col
spark.read.table(...) \
  .withColumn("birthDate", col("birthDate").cast("date")) \
  .write \
  .format("delta") \
  .mode("overwrite") \
  .option("overwriteSchema", "true") \
  .saveAsTable(...)
- Change a column name
spark.read.table(...) \
  .withColumnRenamed("dateOfBirth", "birthDate") \
  .write \
  .format("delta") \
  .mode("overwrite") \
  .option("overwriteSchema", "true") \
  .saveAsTable(...)
Add columns
- Columns that are present in the DataFrame but missing from the table are automatically added when either of the following is set (see the sketch below):
  - the write or writeStream operation has .option("mergeSchema", "true"), or
  - spark.databricks.delta.schema.autoMerge.enabled is true
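- A minimal sketch (appending a DataFrame that carries extra columns; the new columns are merged into the table schema):
df.write.format("delta") \
    .mode("append") \
    .option("mergeSchema", "true") \
    .saveAsTable("tableName")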
Replace table schema
- By default, overwriting a table's data does not replace its schema; set overwriteSchema to true to do so
df.write.format("delta") \
  .mode("overwrite") \
  .option("overwriteSchema", "true") \
  .save("/path/to/data")
Views on Delta Lake Tables
- Delta Lake supports the creation of views on top of Delta tables
- Challenge - if you alter a Delta table's schema, you must recreate any derivative views to account for additions to the schema
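- A sketch of a view over the table created earlier (the view name and predicate are made up):
spark.sql("CREATE OR REPLACE VIEW high_earners AS SELECT * FROM tableName WHERE salary > 100000")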
Table Properties
- You can store your own metadata as a table property using TBLPROPERTIES in CREATE and ALTER statements (see the sketch at the end of this section)
- You can't define new TBLPROPERTIES in a CREATE statement if a Delta table already exists at that location
- Block deletes and updates in a Delta table by setting delta.appendOnly=true
- Time travel properties: delta.logRetentionDuration=<interval-string> and delta.deletedFileRetentionDuration=<interval-string>
- Pro tip - for tables that serve thousands of requests per second or more, it is recommended to dedicate an S3 bucket to the table and to enable randomized file prefixes with
delta.randomizeFilePrefixes=true
- Default properties for new Delta tables can also be set through the Spark session configuration, e.g.
spark.conf.set("spark.databricks.delta.properties.defaults.appendOnly", "true")
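- A minimal sketch of setting and inspecting a property on an existing table (using the tableName table from above):
spark.sql("ALTER TABLE tableName SET TBLPROPERTIES (delta.appendOnly = 'true')")
spark.sql("SHOW TBLPROPERTIES tableName").show(truncate=False)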
Table Metadata
- DESCRIBE DETAIL
- Provides schema, partitioning, table size, and so on.
- DESCRIBE HISTORY
- Provides operation, user, and operation metrics for each write to a table and so on.
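- A quick sketch of running both from a notebook (using the tableName table from above):
spark.sql("DESCRIBE DETAIL tableName").show(truncate=False)
spark.sql("DESCRIBE HISTORY tableName").show(truncate=False)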
Reference - docs.databricks.com/delta/delta-batch.html