Bemærk
Adgang til denne side kræver godkendelse. Du kan prøve at logge på eller ændre mapper.
Adgang til denne side kræver godkendelse. Du kan prøve at ændre mapper.
For Apache Iceberg and Delta Lake tables, each operation that modifies a table creates a new table version. Use history information to audit operations, roll back a table, or query a table at a specific point in time using time travel.
Note
Databricks doesn't recommend using table history as a long-term backup solution for data archival. Use only the past 7 days for time travel operations unless you have set both data and log retention configurations to a larger value.
Retrieve table history
Run the DESCRIBE HISTORY command to retrieve information including the operations, user, and timestamp for each write to a table. The operations are returned in reverse chronological order.
Table history retention is determined by the table setting logRetentionDuration, which is 30 days by default.
Note
Time travel and table history are controlled by different retention thresholds. See Time travel.
DESCRIBE HISTORY table_name -- get the full history of the table
DESCRIBE HISTORY table_name LIMIT 1 -- get the last operation only
For Spark SQL syntax details, see DESCRIBE HISTORY.
For Scala, Java, and Python syntax details, see the Delta Lake API documentation.
Catalog Explorer shows table history visually on the History tab.
History schema
The output of the history operation has the following columns.
| Column | Type | Description |
|---|---|---|
| version | long |
The table version generated by the operation. |
| timestamp | timestamp |
When this version was committed. |
| userId | string |
The ID of the user that ran the operation. |
| userName | string |
The name of the user that ran the operation. |
| operation | string |
The name of the operation. |
| operationParameters | map |
The parameters of the operation (for example, predicates.) |
| job | struct |
The details of the Lakeflow job that ran the operation. Populates only for commits written from a Lakeflow job. Otherwise, null. |
| notebook | struct |
The details of the Databricks notebook from which the operation was run. Populates only for commits written from a Databricks notebook. Otherwise, null. |
| clusterId | string |
The ID of the cluster on which the operation ran. |
| readVersion | long |
The version of the table that was read to perform the write operation. |
| isolationLevel | string |
The isolation level used for this operation. |
| isBlindAppend | boolean |
Whether this operation appended data. |
| operationMetrics | map |
The metrics of the operation (for example, number of rows and files modified.) |
| userMetadata | string |
The user-defined commit metadata if it was specified. |
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+
|version| timestamp|userId|userName|operation| operationParameters| job|notebook|clusterId|readVersion| isolationLevel|isBlindAppend| operationMetrics|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+
| 5|2019-07-29 14:07:47| ###| ###| DELETE|[predicate -> ["(...|null| ###| ###| 4|WriteSerializable| false|[numTotalRows -> ...|
| 4|2019-07-29 14:07:41| ###| ###| UPDATE|[predicate -> (id...|null| ###| ###| 3|WriteSerializable| false|[numTotalRows -> ...|
| 3|2019-07-29 14:07:29| ###| ###| DELETE|[predicate -> ["(...|null| ###| ###| 2|WriteSerializable| false|[numTotalRows -> ...|
| 2|2019-07-29 14:06:56| ###| ###| UPDATE|[predicate -> (id...|null| ###| ###| 1|WriteSerializable| false|[numTotalRows -> ...|
| 1|2019-07-29 14:04:31| ###| ###| DELETE|[predicate -> ["(...|null| ###| ###| 0|WriteSerializable| false|[numTotalRows -> ...|
| 0|2019-07-29 14:01:40| ###| ###| WRITE|[mode -> ErrorIfE...|null| ###| ###| null|WriteSerializable| true|[numFiles -> 2, n...|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+
Note
- If you write into a table using the following methods, some columns aren't available:
- Columns added in the future will always be added after the last column.
Understanding partitionBy in operation parameters
The partitionBy field in table history is only meaningful for CREATE and OVERWRITE operations that define or change a table's partition schema.
For append operations to existing tables (APPEND, INSERT, UPDATE, DELETE, MERGE), this field might show an empty array [] or partition columns depending on the write method used (.save() vs .saveAsTable()).
This inconsistency is expected behavior and doesn't affect how data is written to partitions. You shouldn't use it to validate append operations.
Example
Consider a table partitioned by the date column. When you create the table, partitionBy is populated:
df.write.format("delta") \
.partitionBy("date") \
.saveAsTable("sales_data")
The CREATE operation in history shows:
operationParameters: {
"mode": "ErrorIfExists",
"partitionBy": "[\"date\"]"
}
When you append data to this table, partitionBy shows an empty array:
new_df.write.format("delta") \
.mode("append") \
.saveAsTable("sales_data")
The APPEND operation shows:
operationParameters: {
"mode": "Append",
"partitionBy": "[]"
}
The empty partitionBy value is expected. The data is still written to the correct partitions based on the table's existing partition schema. Note that .save() to a path might show partition columns in this field, but this difference is an implementation detail and doesn't affect write behavior.
Operation metrics
The history operation returns a collection of operations metrics in the operationMetrics column map.
The following tables list the map key definitions by operation.
WRITE, CREATE TABLE AS SELECT, REPLACE TABLE AS SELECT, COPY INTO
The following metrics are available for these operations:
| Metric name | Description |
|---|---|
numFiles |
The number of files written. |
numOutputBytes |
The size in bytes of the written contents. |
numOutputRows |
The number of rows written. |
STREAMING UPDATE
The following metrics are available for this operation:
| Metric name | Description |
|---|---|
numAddedFiles |
The number of files added. |
numRemovedFiles |
The number of files removed. |
numOutputRows |
The number of rows written. |
numOutputBytes |
The size of write in bytes. |
DELETE
The following metrics are available for this operation:
| Metric name | Description |
|---|---|
numAddedFiles |
The number of files added. Not provided when partitions of the table are deleted. |
numRemovedFiles |
The number of files removed. |
numDeletedRows |
The number of rows removed. Not provided when partitions of the table are deleted. |
numCopiedRows |
The number of rows copied in the process of deleting files. |
executionTimeMs |
The time taken to execute the entire operation. |
scanTimeMs |
The time taken to scan the files for matches. |
rewriteTimeMs |
The time taken to rewrite the matched files. |
TRUNCATE
The following metrics are available for this operation:
| Metric name | Description |
|---|---|
numRemovedFiles |
The number of files removed. |
executionTimeMs |
The time taken to execute the entire operation. |
MERGE
The following metrics are available for this operation:
| Metric name | Description |
|---|---|
numSourceRows |
The number of rows in the source DataFrame. |
numTargetRowsInserted |
The number of rows inserted into the target table. |
numTargetRowsUpdated |
The number of rows updated in the target table. |
numTargetRowsDeleted |
The number of rows deleted in the target table. |
numTargetRowsCopied |
The number of target rows copied. |
numOutputRows |
The total number of rows written out. |
numTargetFilesAdded |
The number of files added to the sink (target). |
numTargetFilesRemoved |
The number of files removed from the sink (target). |
executionTimeMs |
The time taken to execute the entire operation. |
scanTimeMs |
The time taken to scan the files for matches. |
rewriteTimeMs |
The time taken to rewrite the matched files. |
UPDATE
The following metrics are available for this operation:
| Metric name | Description |
|---|---|
numAddedFiles |
The number of files added. |
numRemovedFiles |
The number of files removed. |
numUpdatedRows |
The number of rows updated. |
numCopiedRows |
The number of rows just copied over in the process of updating files. |
executionTimeMs |
The time taken to execute the entire operation. |
scanTimeMs |
The time taken to scan the files for matches. |
rewriteTimeMs |
The time taken to rewrite the matched files. |
FSCK
The following metrics are available for this operation:
| Metric name | Description |
|---|---|
numRemovedFiles |
The number of files removed. |
CONVERT
The following metrics are available for this operation:
| Metric name | Description |
|---|---|
numConvertedFiles |
The number of Parquet files that have been converted. |
OPTIMIZE
The following metrics are available for this operation:
| Metric name | Description |
|---|---|
numAddedFiles |
The number of files added. |
numRemovedFiles |
The number of files optimized. |
numAddedBytes |
The number of bytes added after the table was optimized. |
numRemovedBytes |
The number of bytes removed. |
minFileSize |
The size of the smallest file after the table was optimized. |
p25FileSize |
The size of the 25th percentile file after the table was optimized. |
p50FileSize |
The median file size after the table was optimized. |
p75FileSize |
The size of the 75th percentile file after the table was optimized. |
maxFileSize |
The size of the largest file after the table was optimized. |
CLONE
The following metrics are available for this operation:
| Metric name | Description |
|---|---|
sourceTableSize |
The size in bytes of the source table at the version that's cloned. |
sourceNumOfFiles |
The number of files in the source table at the version that's cloned. |
numRemovedFiles |
The number of files removed from the target table if a previous table was replaced. |
removedFilesSize |
The total size in bytes of the files removed from the target table if a previous table was replaced. |
numCopiedFiles |
The number of files that were copied over to the new location. 0 for shallow clones. |
copiedFilesSize |
The total size in bytes of the files that were copied over to the new location. 0 for shallow clones. |
RESTORE
The following metrics are available for this operation:
| Metric name | Description |
|---|---|
tableSizeAfterRestore |
The table size in bytes after restore. |
numOfFilesAfterRestore |
The number of files in the table after restore. |
numRemovedFiles |
The number of files removed by the restore operation. |
numRestoredFiles |
The number of files that were added as a result of the restore. |
removedFilesSize |
The size in bytes of files removed by the restore. |
restoredFilesSize |
The size in bytes of files added by the restore. |
VACUUM
The following metrics are available for this operation:
| Metric name | Description |
|---|---|
numDeletedFiles |
The number of deleted files. |
numVacuumedDirectories |
The number of vacuumed directories. |
numFilesToDelete |
The number of files to delete. |
Time travel
Time travel supports querying previous table versions based on timestamp or table version (as recorded in the transaction log). You can use time travel for applications such as the following:
- Re-creating analyses, reports, or outputs, such as the output of a machine learning model. This might be useful for debugging or auditing, especially in regulated industries.
- Writing complex temporal queries.
- Fixing mistakes in your data.
- Providing snapshot isolation for a set of queries for fast changing tables.
Note
In Databricks Runtime 18.0 and above, time travel queries are blocked if they request a version older than the deletedFileRetentionDuration table property (default 7 days). For Unity Catalog managed tables, this applies to Databricks Runtime 12.2 and above.
Time travel syntax
You query a table with time travel by adding a clause after the table name specification.
timestamp_expressioncan be any one of:'2018-10-18T22:15:12.013Z', that is, a string that can be cast to a timestampcast('2018-10-18 13:36:32 CEST' as timestamp)'2018-10-18', that is, a date stringcurrent_timestamp() - interval 12 hoursdate_sub(current_date(), 1)- Any other expression that is or can be cast to a timestamp
versionis a long value that can be obtained from the output ofDESCRIBE HISTORY table_spec.
Neither timestamp_expression nor version can be subqueries.
Only date or timestamp strings are accepted. For example, "2019-01-01" and "2019-01-01T00:00:00.000Z". See the following code for example syntax:
SQL
SELECT * FROM people10m TIMESTAMP AS OF '2018-10-18T22:15:12.013Z';
SELECT * FROM people10m VERSION AS OF 123;
Python
df1 = spark.read.option("timestampAsOf", "2019-01-01").table("people10m")
df2 = spark.read.option("versionAsOf", 123).table("people10m")
You can also use the @ syntax to specify the timestamp or version as part of the table name. The timestamp must be in yyyyMMddHHmmssSSS format. You can specify a version with @v. See the following code for example syntax:
SQL
-- Timestamp version
SELECT * FROM people10m@20190101000000000
-- Version number
SELECT * FROM people10m@v123
Python
# Timestamp version
spark.read.table("people10m@20190101000000000")
# Version number
spark.read.table("people10m@v123")
Configure data retention for time travel queries
To query a previous table version, you must retain both the log and the data files for that version:
- Data files are deleted when
VACUUMruns against a table. - Log files are removed automatically after checkpointing table versions.
To increase the data retention threshold for tables, you must configure the following table properties, replacing <format> with either delta or iceberg:
<format>.logRetentionDuration = "interval <interval>": controls how long the history for a table is kept. The default isinterval 30 days.- In Databricks Runtime 18.0 and above,
logRetentionDurationmust be greater than or equal todeletedFileRetentionDuration. For Unity Catalog managed tables, this applies to Databricks Runtime 12.2 and above.
- In Databricks Runtime 18.0 and above,
<format>.deletedFileRetentionDuration = "interval <interval>": determines the thresholdVACUUMuses to remove data files no longer referenced in the current table version. The default isinterval 7 days.
For example, to access 30 days of historical data, set delta.deletedFileRetentionDuration = "interval 30 days", which matches the default setting for delta.logRetentionDuration.
Important
Increasing data retention threshold can cause your storage costs to go up, as more data files are maintained.
You can specify table properties during table creation or set them with an ALTER TABLE statement. See Table properties reference.
Time travel examples
To fix accidental deletes to a table for the user 111:
INSERT INTO my_table
SELECT * FROM my_table TIMESTAMP AS OF date_sub(current_date(), 1)
WHERE userId = 111
To fix accidental incorrect updates to a table:
MERGE INTO my_table target
USING my_table TIMESTAMP AS OF date_sub(current_date(), 1) source
ON source.userId = target.userId
WHEN MATCHED THEN UPDATE SET *
To query the number of new customers added over the last week:
SELECT
(
SELECT count(distinct userId)
FROM my_table
)
-
(
SELECT count(distinct userId)
FROM my_table TIMESTAMP AS OF date_sub(current_date(), 7)
) AS new_customers
Transaction log checkpoints
The transaction log records table versions as JSON files within the transaction log directory alongside table data.
To optimize checkpoint querying, table versions are aggregated to Parquet checkpoint files, which improves performance by preventing the need to read all JSON versions of table history. Users don't need to interact with checkpoints directly.
Azure Databricks optimizes checkpointing frequency for data size and workload. The checkpoint frequency is subject to change without notice.
Restore a table to an earlier state
Use the RESTORE command to restore a table to a previous version or timestamp, including for these scenarios:
- You can restore an already restored table.
- You can restore a cloned table.
Consider the following requirements:
- To restore a table, you must have
MODIFYpermission for the table. - After data files are deleted, manually or by
VACUUM, you can't restore a table to an older version that references those files. Restoring to this version partially is still possible ifspark.sql.files.ignoreMissingFilesis set totrue. - To restore by timestamp, use the formats
yyyy-MM-dd HH:mm:ssoryyyy-MM-dd.
RESTORE TABLE target_table TO VERSION AS OF <version>;
RESTORE TABLE target_table TO TIMESTAMP AS OF <timestamp>;
For syntax details, see RESTORE.
Streaming behavior
Restore is a data-changing operation and might result in duplicate data for downstream workloads. Log entries added by the RESTORE command contain dataChange set to true.
For downstream workloads, such as a Structured streaming job that processes the updates to a table, the data change log entries added by the restore operation are considered new data updates, and processing them may result in duplicate data.
For example:
| Table version | Operation | Log updates | Records in data change log updates |
|---|---|---|---|
| 0 | INSERT |
AddFile(/path/to/file-1, dataChange = true) |
(name = Viktor, age = 29), (name = George, age = 55) |
| 1 | INSERT |
AddFile(/path/to/file-2, dataChange = true) |
(name = George, age = 39) |
| 2 | OPTIMIZE |
AddFile(/path/to/file-3, dataChange = false), RemoveFile(/path/to/file-1), RemoveFile(/path/to/file-2) |
No records. OPTIMIZE compaction does not change the data in the table. |
| 3 | RESTORE(version=1) |
RemoveFile(/path/to/file-3), AddFile(/path/to/file-1, dataChange = true), AddFile(/path/to/file-2, dataChange = true) |
(name = Viktor, age = 29), (name = George, age = 55), (name = George, age = 39) |
In the preceding example, the RESTORE command results in updates that were previously seen when reading the table version 0 and 1. If a streaming query reads this table again, then these files are considered as newly added data and are processed again.
Restore metrics
After completing, RESTORE reports the following metrics as a single row DataFrame:
table_size_after_restore: The size of the table after restoring.num_of_files_after_restore: The number of files in the table after restoring.num_removed_files: Number of files removed (logically deleted) from the table.num_restored_files: Number of files restored due to rolling back.removed_files_size: Total size in bytes of the files that are removed from the table.restored_files_size: Total size in bytes of the files that are restored.
Find the last commit version
To get the version number of the last commit written by the current SparkSession across all threads and all tables, query the SQL configuration spark.databricks.<format>.lastCommitVersionInSession. Replace <format> with either delta or iceberg, depending on your table's format.
For example:
SQL
SET spark.databricks.delta.lastCommitVersionInSession
Python
spark.conf.get("spark.databricks.delta.lastCommitVersionInSession")
Scala
spark.conf.get("spark.databricks.delta.lastCommitVersionInSession")
If no commits have been made by the SparkSession, querying the key returns an empty value.
Note
If you share the same SparkSession across multiple threads, it's similar to sharing a variable
across multiple threads. You might encounter race conditions for concurrent updates to the configuration value.