Day 7/100
Delta Lake [Part 5]
Table Versioning
Delta Lake chooses the minimum required protocol version based on table characteristics such as the schema or table properties. You can also set the default protocol versions by setting the SQL configurations:
spark.databricks.delta.properties.defaults.minWriterVersion = 2 (default)
spark.databricks.delta.properties.defaults.minReaderVersion = 1 (default)
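To make "minimum required protocol version" concrete, here is a hypothetical Python model. It is not Delta Lake's code. It starts from the defaults above and raises each version to the highest one that any feature in use requires. The feature-to-version mapping is an assumption based on the legacy protocol versions (writer version 6 and below). The newer table-features protocol (reader 3 / writer 7) is left out.

```python
# Hypothetical model: derive the minimum protocol from the features a table uses.
# The mapping covers legacy protocol versions only (writer <= 6); it is an
# illustration, not Delta Lake's implementation.
LEGACY_FEATURE_VERSIONS = {
    # feature: (min reader version, min writer version)
    "appendOnly": (1, 2),
    "invariants": (1, 2),
    "checkConstraints": (1, 3),
    "changeDataFeed": (1, 4),
    "generatedColumns": (1, 4),
    "columnMapping": (2, 5),
    "identityColumns": (1, 6),
}


def min_protocol(features, default_reader=1, default_writer=2):
    """Return (minReaderVersion, minWriterVersion) for the given features.

    Starts from the session defaults (1 and 2) and raises each version to the
    highest one required by any feature.
    """
    reader, writer = default_reader, default_writer
    for feature in features:
        req_reader, req_writer = LEGACY_FEATURE_VERSIONS[feature]
        reader = max(reader, req_reader)
        writer = max(writer, req_writer)
    return reader, writer


print(min_protocol([]))                                   # (1, 2)
print(min_protocol(["checkConstraints"]))                 # (1, 3)
print(min_protocol(["columnMapping", "changeDataFeed"]))  # (2, 5)
```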
To upgrade the protocol version, use the following. Note that protocol upgrades are irreversible and may break existing readers or writers that don't support the new version, so only upgrade when you need a feature that requires it.
from delta.tables import DeltaTable

# assumes an active SparkSession named spark
delta_table = DeltaTable.forPath(spark, "path_to_table")  # or DeltaTable.forName(spark, "table_name")
delta_table.upgradeTableProtocol(1, 3)  # upgrades to readerVersion=1, writerVersion=3
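Why is an upgrade risky? A client can read a table only if it supports the table's minReaderVersion. It can write only if it also supports the minWriterVersion. The toy Python model below uses hypothetical names and is not the Delta API. It shows how raising the writer version to 3 locks out a client that only supports writer version 2. Its upgrade function refuses to lower a version, which mirrors the irreversibility described above.

```python
from typing import NamedTuple


class Protocol(NamedTuple):
    min_reader: int
    min_writer: int


def can_read(table: Protocol, client_reader: int) -> bool:
    # The client must support at least the table's reader version.
    return client_reader >= table.min_reader


def can_write(table: Protocol, client_reader: int, client_writer: int) -> bool:
    # Assumption of this model: writing also requires being able to read the table.
    return can_read(table, client_reader) and client_writer >= table.min_writer


def upgrade(current: Protocol, reader: int, writer: int) -> Protocol:
    # Versions only move forward; there is no downgrade path.
    if reader < current.min_reader or writer < current.min_writer:
        raise ValueError("protocol downgrade is not allowed")
    return Protocol(reader, writer)


table = Protocol(1, 2)
table = upgrade(table, 1, 3)   # like upgradeTableProtocol(1, 3)
print(can_read(table, 1))      # True: reader version unchanged
print(can_write(table, 1, 2))  # False: writers that only support version 2 are locked out
```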
Delta Column Mapping
This allows Delta table columns and the corresponding Parquet columns to use different names. It also enables Delta schema evolution operations such as RENAME COLUMN without rewriting the underlying Parquet files.
- Requires the table protocol to be at least reader version 2 and writer version 5.
ALTER TABLE <table_name> SET TBLPROPERTIES (
  'delta.minReaderVersion' = '2',
  'delta.minWriterVersion' = '5',
  'delta.columnMapping.mode' = 'name'
)
With column mapping enabled, column names can contain spaces and any of these characters: , ; { } ( ) \n \t =
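Without column mapping, Delta rejects column names that contain these characters, including spaces. You could check a name before writing a DataFrame with a small helper like the one below. The helper is hypothetical and not part of any Delta API.

```python
# Characters Delta does not allow in column names unless column mapping is enabled:
# space, comma, semicolon, braces, parentheses, newline, tab, equals sign.
INVALID_WITHOUT_MAPPING = set(" ,;{}()\n\t=")


def invalid_chars(column_name: str, column_mapping_enabled: bool) -> list:
    """Return the sorted offending characters in column_name (empty if the name is fine)."""
    if column_mapping_enabled:
        return []
    return sorted(set(column_name) & INVALID_WITHOUT_MAPPING)


print(invalid_chars("order id", column_mapping_enabled=False))  # [' ']
print(invalid_chars("order id", column_mapping_enabled=True))   # []
```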
Once column mapping is enabled for a table, you can rename a column with:
ALTER TABLE <table_name> RENAME COLUMN old_col_name TO new_col_name
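Conceptually, name mode maps each logical column name (what queries see) to a physical name (what the Parquet files store). A rename therefore only updates table metadata. The Python sketch below is a hypothetical model of that idea. The "col-<uuid>" physical names are meant to resemble what Delta generates, but the class and method names are illustrative only.

```python
import uuid


class ColumnMapping:
    """Toy model of name-mode column mapping: logical name -> physical name."""

    def __init__(self, columns):
        # Each column gets a stable physical name when it is created.
        self.logical_to_physical = {name: f"col-{uuid.uuid4()}" for name in columns}

    def rename(self, old: str, new: str) -> None:
        # Only the metadata entry changes; the physical name (and data files) stay put.
        if new in self.logical_to_physical:
            raise ValueError(f"column {new!r} already exists")
        self.logical_to_physical[new] = self.logical_to_physical.pop(old)

    def physical_name(self, logical: str) -> str:
        return self.logical_to_physical[logical]


mapping = ColumnMapping(["old_col_name", "amount"])
before = mapping.physical_name("old_col_name")
mapping.rename("old_col_name", "new_col_name")
print(mapping.physical_name("new_col_name") == before)  # True: same Parquet column
```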
Best practices: Delta Lake
Provide data location hints
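- If a column is commonly used in query predicates and has high cardinality (many distinct values), Z-order the table by that column: OPTIMIZE <table_name> ZORDER BY (<column>). Z-ordering colocates related data in the same files, so data skipping can avoid reading files that cannot match a filter.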
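Z-ordering takes its name from the Z-order (Morton) curve. Interleaving the bits of several column values produces one sort key that keeps rows with nearby values in all of those columns close together. The Python sketch below shows only that core idea on two small integer columns. Delta's real ZORDER BY implementation is more involved and is not reproduced here. In the sketch, rows are sorted by the interleaved key and cut into "files" of 4 rows. Each file then covers a narrow range of both x and y, which lets min/max statistics skip files for filters on either column.

```python
def interleave_bits(x: int, y: int, bits: int = 16) -> int:
    """Morton code: bit i of x goes to position 2i, bit i of y to position 2i+1."""
    z = 0
    for i in range(bits):
        z |= ((x >> i) & 1) << (2 * i)
        z |= ((y >> i) & 1) << (2 * i + 1)
    return z


def split_into_files(rows, rows_per_file):
    return [rows[i:i + rows_per_file] for i in range(0, len(rows), rows_per_file)]


rows = [(x, y) for x in range(4) for y in range(4)]

# Z-ordered layout: each file covers a 2x2 block of (x, y) values.
z_files = split_into_files(sorted(rows, key=lambda r: interleave_bits(*r)), 4)
# Linear layout sorted by x alone: each file holds one x value but every y value.
x_files = split_into_files(sorted(rows), 4)

for f in z_files:
    # y range per file, one line each: 0 1, 0 1, 2 3, 2 3
    print(min(r[1] for r in f), max(r[1] for r in f))
```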
Choose the right partition column
- Do not partition by a column with very high cardinality. For example, partitioning by a userId column with 1M distinct values is a bad strategy.
- Only partition by a column if you expect each partition to hold at least 1 GB of data.
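You can check the 1 GB rule with back-of-the-envelope arithmetic: divide the table size by the number of distinct values in the candidate column. The Python sketch below assumes data is spread evenly across partitions, although real data is often skewed. The table size and cardinalities are made-up examples.

```python
GB = 1024 ** 3


def avg_partition_size(total_bytes: float, distinct_values: int) -> float:
    """Average bytes per partition, assuming rows are spread evenly."""
    return total_bytes / distinct_values


def good_partition_column(total_bytes: float, distinct_values: int, min_bytes: float = GB) -> bool:
    return avg_partition_size(total_bytes, distinct_values) >= min_bytes


table_bytes = 2 * 1024 * GB  # hypothetical 2 TB table

# Two years of daily partitions: about 2.8 GB each, so partitioning by date is fine.
print(good_partition_column(table_bytes, 730))        # True
# One million user IDs: about 2 MB each, far below 1 GB.
print(good_partition_column(table_bytes, 1_000_000))  # False
```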
Compact files
- If you continuously write to a table, especially in small batches, it accumulates many small files, which slows down reads. Regularly rewriting them into a smaller number of larger files is called compaction.
- Use the OPTIMIZE command to compact a table: OPTIMIZE <table_name>
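Compaction is essentially a bin-packing problem. Small files are grouped into batches whose combined size approaches a target file size, and each batch is rewritten as one file. The greedy Python sketch below illustrates the idea only and is not OPTIMIZE's actual algorithm. The 1024 MB target and the file sizes are assumed values.

```python
def plan_compaction(file_sizes_mb, target_mb=1024):
    """Group small files (smaller than the target) into batches of at most target_mb.

    Files already at or above the target are left alone. Each returned batch
    would be rewritten as a single larger file.
    """
    small = sorted(size for size in file_sizes_mb if size < target_mb)
    batches, current, current_size = [], [], 0
    for size in small:
        # Start a new batch when adding this file would overshoot the target.
        if current and current_size + size > target_mb:
            batches.append(current)
            current, current_size = [], 0
        current.append(size)
        current_size += size
    if current:
        batches.append(current)
    return batches


# 25 small 100 MB files plus one file that is already large enough.
plan = plan_compaction([100] * 25 + [2000])
print([len(batch) for batch in plan])  # [10, 10, 5]
```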
Replace the content or schema of a table
- To change a table's schema, replace the whole table atomically instead of deleting and re-creating it. For example:
dataframe.write
.format("delta")
.mode("overwrite")
.option("overwriteSchema", "true")
.partitionBy(<your-partition-columns>)
.saveAsTable("<your-table>") // Managed table
dataframe.write
.format("delta")
.mode("overwrite")
.option("overwriteSchema", "true")
.option("path", "<your-table-path>")
.partitionBy(<your-partition-columns>)
.saveAsTable("<your-table>") // External table
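Overwriting is safer than dropping and re-creating the table because an overwrite is a single new commit in the transaction log. Concurrent readers see either the old version or the new one, never a half-written table. The old version also stays available through time travel. The Python class below is a toy model of that behavior. Its names are hypothetical and it is not the Delta API.

```python
class ToyDeltaLog:
    """Toy append-only log: every commit is a full, immutable table version."""

    def __init__(self):
        self._versions = []  # list of (schema, rows) tuples

    def commit(self, schema, rows):
        # A commit becomes visible all at once by appending a single entry.
        self._versions.append((tuple(schema), tuple(rows)))
        return len(self._versions) - 1

    def read(self, version=None):
        # Default to the latest version; older versions remain readable (time travel).
        if version is None:
            version = len(self._versions) - 1
        return self._versions[version]


log = ToyDeltaLog()
v0 = log.commit(["id", "name"], [(1, "a"), (2, "b")])
# Overwrite with a new schema: schema and data change together in one commit.
v1 = log.commit(["id", "name", "email"], [(1, "a", "a@example.com")])
print(log.read()[0])    # ('id', 'name', 'email')
print(log.read(v0)[0])  # ('id', 'name')
```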
Spark caching
df = spark.read.format("delta").load("/some/path")
df.cache()
Use caching only for an expensive aggregation or join result that will be reused multiple times. Otherwise avoid it, because:
- You lose any data skipping that could come from additional filters applied on top of the cached DataFrame.
- The cached data may not be updated if the table is accessed through a different identifier (for example, you call spark.table(x).cache() but then write to the table by its path).
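Data skipping relies on per-file statistics. Delta records min/max column values for each data file, so a query filter can skip any file whose range cannot match. The Python sketch below shows that pruning step with hypothetical file names and values. A filter applied to a cached DataFrame gets no such file-level pruning because it is evaluated over all of the cached data.

```python
from dataclasses import dataclass


@dataclass
class FileStats:
    path: str
    min_value: int
    max_value: int


def files_to_scan(files, low, high):
    """Keep only files whose [min_value, max_value] range overlaps low <= col <= high."""
    return [f.path for f in files if f.max_value >= low and f.min_value <= high]


files = [
    FileStats("part-0.parquet", 0, 99),
    FileStats("part-1.parquet", 100, 199),
    FileStats("part-2.parquet", 200, 299),
]

print(files_to_scan(files, 150, 160))  # ['part-1.parquet']: two of three files skipped
```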
Reference 1 - docs.databricks.com/delta/versioning.html
Reference 2 - docs.databricks.com/delta/delta-column-mapp..
Reference 3 - docs.databricks.com/delta/best-practices.html