Commit

[Spark][3.3] Make Identity Column High Water Mark updates consistent (#3990)

#### Which Delta project/connector is this regarding?

- [x] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)

## Description

Currently:
- When we do a MERGE, we always call `setTrackHighWaterMarks` on the
transaction. This only has an effect if the MERGE contains an INSERT
clause.
- If we `setTrackHighWaterMarks`, we collect the max/min of the column
using `DeltaIdentityColumnStatsTracker`. This stats tracker is only
invoked on files that are written or rewritten. The collected min/max
values are compared with the existing high watermark. If no high
watermark exists yet, we take the largest max (or, for a negative step,
the lowest min) as the new high watermark without checking it against
the starting value of the identity column.
- If an identity column has not generated a value yet, the high
watermark is None and is not stored in the table. This is the case for a
GENERATED ALWAYS AS IDENTITY table that is empty, and for a GENERATED BY
DEFAULT AS IDENTITY table that contains only user-inserted values in the
identity column.
- If you run a MERGE upsert that only ends up updating rows in a
GENERATED BY DEFAULT table that doesn't have a high watermark yet, we
write a new high watermark equal to the highest value in the rewritten
files, which may be lower than the starting value specified for the
identity column (see the repro sketch below).
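
For concreteness, here is a minimal repro sketch of the pre-patch behavior (table name, column names, and values are illustrative, not taken from the patch):

```scala
spark.sql("""
  CREATE TABLE t (
    id BIGINT GENERATED BY DEFAULT AS IDENTITY (START WITH 100 INCREMENT BY 10),
    v STRING
  ) USING delta""")
// User-supplied identity value; the table has no high watermark yet.
spark.sql("INSERT INTO t (id, v) VALUES (1, 'a')")
// The unmatched INSERT clause never fires, but its presence enables high
// watermark tracking; the matched UPDATE rewrites the file holding id = 1.
spark.sql("""
  MERGE INTO t
  USING (SELECT 1 AS id, 'b' AS v) s
  ON t.id = s.id
  WHEN MATCHED THEN UPDATE SET t.v = s.v
  WHEN NOT MATCHED THEN INSERT (id, v) VALUES (s.id, s.v)""")
// Before this patch, the stats tracker saw max(id) = 1 in the rewritten file
// and stored 1 as the high watermark, below the declared START WITH 100.
```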

Proposal:
- This PR routes all high water mark updates through the same validation
function by default. The function will not update the high watermark if
doing so would violate the start or the existing high watermark; the
only exception is a table whose existing high water mark is already
corrupted. A sketch of the resulting invariant follows this list.
- This does NOT prevent the scenario where we automatically set the high
watermark for a GENERATED BY DEFAULT column based on user-inserted
values, as long as those values respect the start.
- Previously, we did not round the high water mark on the
`updateSchema` path. This was erroneous because the min/max values there
can be user-inserted. We fix that in this PR.
- Previously, SYNC IDENTITY did not validate the case where the computed
max falls below the existing high water mark. We now check this
invariant and block such updates by default. A SQLConf has been
introduced to allow lowering the high water mark if the user explicitly
wants to.
- We add logging to catch bad high water marks.
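
To summarize, here is a small illustrative helper (not part of the patch) stating the property an accepted high water mark update now satisfies, setting aside the repair path for already-corrupted watermarks and the new SYNC IDENTITY escape hatch:

```scala
def respectsHighWaterMarkInvariant(
    start: Long, step: Long, oldHwm: Option[Long], newHwm: Long): Boolean = {
  val aligned = (newHwm - start) % step == 0  // lies on the start + n * step sequence
  val notBeforeStart = if (step > 0) newHwm >= start else newHwm <= start
  val neverLowered = oldHwm.forall(o => if (step > 0) newHwm >= o else newHwm <= o)
  aligned && notBeforeStart && neverLowered
}
```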

## How was this patch tested?
New tests that were failing prior to this change.

## Does this PR introduce _any_ user-facing changes?
No
c27kwan authored Dec 19, 2024
1 parent fb3fd94 commit 9b15a8f
Showing 9 changed files with 770 additions and 44 deletions.
spark/src/main/scala/org/apache/spark/sql/delta/IdentityColumn.scala (183 additions & 35 deletions)

@@ -16,8 +16,11 @@

 package org.apache.spark.sql.delta
 
+import scala.collection.mutable
+
 import org.apache.spark.sql.delta.metering.DeltaLogging
 import org.apache.spark.sql.delta.sources.DeltaSourceUtils._
+import org.apache.spark.sql.delta.sources.DeltaSQLConf
 import org.apache.spark.sql.delta.stats.{DeltaFileStatistics, DeltaJobStatisticsTracker}
 import org.apache.spark.sql.delta.util.JsonUtils
 import org.apache.hadoop.conf.Configuration
@@ -31,6 +34,17 @@ import org.apache.spark.sql.execution.datasources.WriteTaskStats
 import org.apache.spark.sql.functions.{array, max, min, to_json}
 import org.apache.spark.sql.types.{MetadataBuilder, StructField, StructType}
 
+/**
+ * This object holds String constants used in the field `debugInfo` when logging
+ * [[IdentityColumn.opTypeHighWaterMarkUpdate]].
+ * Each string represents an unexpected or notable event encountered while calculating the
+ * high water mark.
+ */
+object IdentityColumnHighWaterMarkUpdateInfo {
+  val EXISTING_WATER_MARK_BEFORE_START = "existing_water_mark_before_start"
+  val CANDIDATE_HIGH_WATER_MARK_ROUNDED = "candidate_high_watermark_rounded"
+  val CANDIDATE_HIGH_WATER_MARK_BEFORE_START = "candidate_high_water_mark_before_start"
+}
+
 /**
  * Provide utility methods related to IDENTITY column support for Delta.
  */
@@ -46,6 +60,8 @@ object IdentityColumn extends DeltaLogging {
   val opTypeWrite = "delta.identityColumn.write"
   // When IDENTITY column update causes transaction to abort.
   val opTypeAbort = "delta.identityColumn.abort"
+  // When we update the high water mark of an IDENTITY column.
+  val opTypeHighWaterMarkUpdate = "delta.identityColumn.highWaterMarkUpdate"
 
   // Return true if `field` is an identity column that allows explicit insert. Caller must ensure
   // `isIdentityColumn(field)` is true.
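
For orientation, the debug constants above end up in event payloads shaped like the following (an editorial example with illustrative values, mirroring the `recordDeltaEvent` calls later in this diff):

val exampleData = Map(
  "columnName" -> "id",
  "debugInfo" -> IdentityColumnHighWaterMarkUpdateInfo.CANDIDATE_HIGH_WATER_MARK_ROUNDED,
  "oldHighWaterMark" -> Some(40L),
  "candidateHighWaterMark" -> 47L,
  "updatedFrom" -> "updateSchema"
)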
@@ -134,32 +150,158 @@ object IdentityColumn extends DeltaLogging {
     ))
   }
 
+  /** Round `value` to the next value that follows the start and step configuration. */
+  protected[delta] def roundToNext(start: Long, step: Long, value: Long): Long = {
+    val valueOffset = Math.subtractExact(value, start)
+    if (valueOffset % step == 0) {
+      value
+    } else {
+      // An identity value follows the formula start + step * n, so n = (value - start) / step,
+      // where n is a non-negative integer if the value respects the start.
+      // Since the value doesn't follow this formula, we need to ceil n:
+      //   corrected value = start + step * ceil(n).
+      // However, we can't cast to Double for division because it's only accurate up to 53 bits.
+      // Instead, we do a floored division and add 1:
+      //   start + step * ((value - start) / step + 1)
+      val quotient = valueOffset / step
+      // `valueOffset` will have the same sign as `step` if `value` respects the start.
+      val stepMultiple = if (Math.signum(valueOffset) == Math.signum(step)) {
+        Math.addExact(quotient, 1L)
+      } else {
+        // Don't add one. Otherwise, we end up rounding 2 values up, which may skip the start.
+        quotient
+      }
+      Math.addExact(
+        start,
+        Math.multiplyExact(step, stepMultiple)
+      )
+    }
+  }
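// --- Editorial aside (not part of the patch): worked examples of the rounding rule above,
// --- assuming the semantics of `roundToNext` as written.
// With start = 0, step = 3 (valid values 0, 3, 6, 9, ...):
//   roundToNext(0, 3, 7)  == 9    // rounds up to the next valid value
//   roundToNext(0, 3, 6)  == 6    // already aligned, unchanged
// With start = 10, step = 3, a value before the start is not bumped past it:
//   roundToNext(10, 3, 9) == 10   // the caller separately rejects 9 as before-start
// With start = 0, step = -5 (valid values 0, -5, -10, ...):
//   roundToNext(0, -5, -12) == -15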

+  /**
+   * Update the high water mark of the IDENTITY column based on `candidateHighWaterMark`.
+   *
+   * We validate against the identity column definition (start, step) and may store a high
+   * watermark that's different from `candidateHighWaterMark` if the candidate is not valid.
+   * This method may also leave the high watermark untouched if the candidate doesn't respect
+   * the start, is below the current watermark, or is a no-op.
+   *
+   * @param field The IDENTITY column to update.
+   * @param candidateHighWaterMark The candidate high water mark to update to.
+   * @param allowLoweringHighWaterMarkForSyncIdentity Whether to allow lowering the high water
+   *          mark. Lowering the high water mark is NOT SAFE in general, but may be a valid
+   *          operation in SYNC IDENTITY (e.g. to repair a high water mark after a bad sync).
+   * @return A new `StructField` with the high water mark updated and a Seq[String] that
+   *         contains debug information for logging.
+   */
+  protected[delta] def updateToValidHighWaterMark(
+      field: StructField,
+      candidateHighWaterMark: Long,
+      allowLoweringHighWaterMarkForSyncIdentity: Boolean
+  ): (StructField, Seq[String]) = {
+    require(ColumnWithDefaultExprUtils.isIdentityColumn(field))
+
+    val info = getIdentityInfo(field)
+    val positiveStep = info.step > 0
+    val orderInStepDirection = if (positiveStep) Ordering.Long else Ordering.Long.reverse
+
+    val loggingBuffer = new mutable.ArrayBuffer[String]
+
+    // We check `candidateHighWaterMark` and not `newHighWaterMark` because
+    // `newHighWaterMark` may not be part of the column. E.g. a GENERATED BY DEFAULT column
+    // has candidateHighWaterMark = 9, start = 10, step = 3, and previous highWaterMark = None.
+    // We don't want to bump the high water mark to 10 because the next value generated would
+    // be 13, and we'd miss the specified start entirely.
+    val isBeforeStart = orderInStepDirection.lt(candidateHighWaterMark, info.start)
+    if (isBeforeStart) {
+      loggingBuffer.append(
+        IdentityColumnHighWaterMarkUpdateInfo.CANDIDATE_HIGH_WATER_MARK_BEFORE_START)
+    }
+
+    // We must round in the GENERATED BY DEFAULT case because the candidate may be a
+    // user-inserted value that doesn't follow the identity column definition. We don't skip
+    // this step in the GENERATED ALWAYS case either: it's effectively a no-op there, since
+    // generated values should always respect the identity column definition, and if the high
+    // watermark was somehow wrong, this is our chance to fix it.
+    val roundedCandidateHighWaterMark = roundToNext(info.start, info.step, candidateHighWaterMark)
+    if (roundedCandidateHighWaterMark != candidateHighWaterMark) {
+      loggingBuffer.append(IdentityColumnHighWaterMarkUpdateInfo.CANDIDATE_HIGH_WATER_MARK_ROUNDED)
+    }
+
+    // If allowLoweringHighWaterMarkForSyncIdentity is true, we can ignore the existing high
+    // water mark.
+    val newHighWaterMark = info.highWaterMark match {
+      case Some(oldWaterMark) if !allowLoweringHighWaterMarkForSyncIdentity =>
+        orderInStepDirection.max(oldWaterMark, roundedCandidateHighWaterMark)
+      case _ => roundedCandidateHighWaterMark
+    }
+
+    val tableHasBadHighWaterMark = info.highWaterMark.exists(oldWaterMark =>
+      orderInStepDirection.lt(oldWaterMark, info.start))
+    if (tableHasBadHighWaterMark) {
+      loggingBuffer.append(
+        IdentityColumnHighWaterMarkUpdateInfo.EXISTING_WATER_MARK_BEFORE_START)
+    }
+
+    val isChanged = !info.highWaterMark.contains(newHighWaterMark)
+
+    // If a table already has a bad high water mark, we shouldn't prevent it from updating the
+    // high water mark. Always try to update to newHighWaterMark, which is guaranteed to be a
+    // better choice than the existing one since we take a max().
+    // Note that this means if a table has a bad water mark, we can set the high water mark to
+    // the start due to the rounding logic.
+    // Don't update if the candidate is before the start or the high watermark is unchanged.
+    if (tableHasBadHighWaterMark || (!isBeforeStart && isChanged)) {
+      val newMetadata = new MetadataBuilder().withMetadata(field.metadata)
+        .putLong(IDENTITY_INFO_HIGHWATERMARK, newHighWaterMark)
+        .build()
+      (field.copy(metadata = newMetadata), loggingBuffer.toIndexedSeq)
+    } else {
+      // If we don't update the high watermark, we don't need to log the update.
+      (field, Nil)
+    }
+  }
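// --- Editorial aside (not part of the patch): how the checks above compose for a column
// --- declared GENERATED BY DEFAULT AS IDENTITY (START WITH 10 INCREMENT BY 3), i.e. valid
// --- generated values 10, 13, 16, 19, 22, ...
//   * No existing watermark, user-inserted candidate 9: before the start, so the field is
//     returned unchanged and generation still begins at 10.
//   * Existing watermark 13, candidate 20: rounded up to 22; max(13, 22) = 22 is stored and
//     the rounding is logged.
//   * Existing watermark 22, candidate 16: max(22, 16) = 22, so no change and nothing logged.
//   * Existing watermark 22, candidate 16 with the lowering flag set (SYNC IDENTITY only):
//     the watermark is lowered to 16.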

   /**
    * Return a new schema with IDENTITY high water marks updated in the schema.
    * The new high watermarks are decided based on the `updatedIdentityHighWaterMarks` and old high
    * watermark values present in the passed `schema`.
    */
   def updateSchema(
+      deltaLog: DeltaLog,
       schema: StructType,
-      updatedIdentityHighWaterMarks: Seq[(String, Long)]) : StructType = {
+      updatedIdentityHighWaterMarks: Seq[(String, Long)]
+  ): StructType = {
     val updatedIdentityHighWaterMarksGrouped =
       updatedIdentityHighWaterMarks.groupBy(_._1).mapValues(v => v.map(_._2))
     StructType(schema.map { f =>
       updatedIdentityHighWaterMarksGrouped.get(DeltaColumnMapping.getPhysicalName(f)) match {
         case Some(newWatermarks) if ColumnWithDefaultExprUtils.isIdentityColumn(f) =>
           val oldIdentityInfo = getIdentityInfo(f)
           val positiveStep = oldIdentityInfo.step > 0
-          val newHighWaterMark = if (positiveStep) {
-            oldIdentityInfo.highWaterMark.map(Math.max(_, newWatermarks.max))
-              .getOrElse(newWatermarks.max)
+          val candidateHighWaterMark = if (positiveStep) {
+            newWatermarks.max
           } else {
-            oldIdentityInfo.highWaterMark.map(Math.min(_, newWatermarks.min))
-              .getOrElse(newWatermarks.min)
+            newWatermarks.min
           }
+          val (newField, loggingSeq) = updateToValidHighWaterMark(
+            f, candidateHighWaterMark, allowLoweringHighWaterMarkForSyncIdentity = false)
+          if (loggingSeq.nonEmpty) {
+            recordDeltaEvent(
+              deltaLog = deltaLog,
+              opType = opTypeHighWaterMarkUpdate,
+              data = Map(
+                "columnName" -> f.name,
+                "debugInfo" -> loggingSeq.mkString(", "),
+                "oldHighWaterMark" -> oldIdentityInfo.highWaterMark,
+                "candidateHighWaterMark" -> candidateHighWaterMark,
+                "updatedFrom" -> "updateSchema"
+              )
+            )
+          }
-          val builder = new MetadataBuilder()
-            .withMetadata(f.metadata)
-            .putLong(IDENTITY_INFO_HIGHWATERMARK, newHighWaterMark)
-          f.copy(metadata = builder.build())
+          newField
         case _ =>
           f
       }
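// --- Editorial aside (not part of the patch): the grouping step above, exercised on its own
// --- with illustrative values.
val updates = Seq("physicalName" -> 5L, "physicalName" -> 9L)
val grouped = updates.groupBy(_._1).mapValues(_.map(_._2))
// grouped == Map("physicalName" -> Seq(5L, 9L)); with a positive step, the candidate passed
// to updateToValidHighWaterMark is Seq(5L, 9L).max == 9L.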
@@ -252,19 +394,12 @@
 
   // Calculate the sync'ed IDENTITY high water mark based on actual data and returns a
   // potentially updated `StructField`.
-  def syncIdentity(field: StructField, df: DataFrame): StructField = {
-    // Round `value` to the next value that follows start and step configuration.
-    def roundToNext(start: Long, step: Long, value: Long): Long = {
-      if (Math.subtractExact(value, start) % step == 0) {
-        value
-      } else {
-        // start + step * ((value - start) / step + 1)
-        Math.addExact(
-          Math.multiplyExact(Math.addExact(Math.subtractExact(value, start) / step, 1), step),
-          start)
-      }
-    }
-
+  def syncIdentity(
+      deltaLog: DeltaLog,
+      field: StructField,
+      df: DataFrame,
+      allowLoweringHighWaterMarkForSyncIdentity: Boolean
+  ): StructField = {
     assert(ColumnWithDefaultExprUtils.isIdentityColumn(field))
     // Run a query to get the actual high water mark (max or min value of the IDENTITY column) from
     // the actual data.
@@ -274,17 +409,23 @@
     val resultRow = df.select(expr).collect().head
 
     if (!resultRow.isNullAt(0)) {
-      val result = resultRow.getLong(0)
-      val isBeforeStart = if (positiveStep) result < info.start else result > info.start
-      val newHighWaterMark = roundToNext(info.start, info.step, result)
-      if (isBeforeStart || info.highWaterMark.contains(newHighWaterMark)) {
-        field
-      } else {
-        val newMetadata = new MetadataBuilder().withMetadata(field.metadata)
-          .putLong(IDENTITY_INFO_HIGHWATERMARK, newHighWaterMark)
-          .build()
-        field.copy(metadata = newMetadata)
+      val candidateHighWaterMark = resultRow.getLong(0)
+      val (newField, loggingSeq) = updateToValidHighWaterMark(
+        field, candidateHighWaterMark, allowLoweringHighWaterMarkForSyncIdentity)
+      if (loggingSeq.nonEmpty) {
+        recordDeltaEvent(
+          deltaLog = deltaLog,
+          opType = opTypeHighWaterMarkUpdate,
+          data = Map(
+            "columnName" -> field.name,
+            "debugInfo" -> loggingSeq.mkString(", "),
+            "oldHighWaterMark" -> info.highWaterMark,
+            "candidateHighWaterMark" -> candidateHighWaterMark,
+            "updatedFrom" -> "syncIdentity"
+          )
+        )
       }
+      newField
     } else {
       field
     }
@@ -295,12 +436,19 @@
    * been merged with the corresponding high water marks of `schemaWithHighWaterMarksToMerge`.
    */
   def copySchemaWithMergedHighWaterMarks(
-      schemaToCopy: StructType, schemaWithHighWaterMarksToMerge: StructType): StructType = {
+      deltaLog: DeltaLog,
+      schemaToCopy: StructType,
+      schemaWithHighWaterMarksToMerge: StructType
+  ): StructType = {
     val newHighWatermarks = getIdentityColumns(schemaWithHighWaterMarksToMerge).flatMap { f =>
       val info = getIdentityInfo(f)
       info.highWaterMark.map(waterMark => DeltaColumnMapping.getPhysicalName(f) -> waterMark)
     }
-    updateSchema(schemaToCopy, newHighWatermarks)
+    updateSchema(
+      deltaLog,
+      schemaToCopy,
+      newHighWatermarks
+    )
   }


@@ -455,7 +455,10 @@ trait OptimisticTransactionImpl extends TransactionalWrite
   def precommitUpdateSchemaWithIdentityHighWaterMarks(): Unit = {
     if (updatedIdentityHighWaterMarks.nonEmpty) {
       val newSchema = IdentityColumn.updateSchema(
-        metadata.schema, updatedIdentityHighWaterMarks.toSeq)
+        deltaLog,
+        metadata.schema,
+        updatedIdentityHighWaterMarks.toSeq
+      )
       val updatedMetadata = metadata.copy(schemaString = newSchema.json)
       updateMetadataAfterWrite(updatedMetadata)
     }
@@ -308,8 +308,10 @@ abstract class CloneTableBase(
       .toMap
     val clonedSchema =
       IdentityColumn.copySchemaWithMergedHighWaterMarks(
+        deltaLog = targetSnapshot.deltaLog,
         schemaToCopy = clonedMetadata.schema,
-        schemaWithHighWaterMarksToMerge = targetSnapshot.metadata.schema)
+        schemaWithHighWaterMarksToMerge = targetSnapshot.metadata.schema
+      )
     clonedMetadata.copy(configuration = filteredConfiguration, schemaString = clonedSchema.json)
   }

@@ -199,8 +199,10 @@ case class RestoreTableCommand(sourceTable: DeltaTableV2)
         // We need to merge the schema of the latest snapshot with the schema of the snapshot
         // we're restoring to ensure that the high water mark is correct.
         val mergedSchema = IdentityColumn.copySchemaWithMergedHighWaterMarks(
+          deltaLog = deltaLog,
           schemaToCopy = snapshotToRestore.metadata.schema,
-          schemaWithHighWaterMarksToMerge = latestSnapshot.metadata.schema)
+          schemaWithHighWaterMarksToMerge = latestSnapshot.metadata.schema
+        )
 
         txn.updateMetadata(snapshotToRestore.metadata.copy(schemaString = mergedSchema.json))
 
@@ -771,7 +771,14 @@ case class AlterTableChangeColumnDeltaCommand(
       if (syncIdentity) {
         assert(oldColumn == newColumn)
         val df = txn.snapshot.deltaLog.createDataFrame(txn.snapshot, txn.filterFiles())
-        val field = IdentityColumn.syncIdentity(newColumn, df)
+        val allowLoweringHighWaterMarkForSyncIdentity = sparkSession.conf
+          .get(DeltaSQLConf.DELTA_IDENTITY_ALLOW_SYNC_IDENTITY_TO_LOWER_HIGH_WATER_MARK)
+        val field = IdentityColumn.syncIdentity(
+          deltaLog,
+          newColumn,
+          df,
+          allowLoweringHighWaterMarkForSyncIdentity
+        )
         txn.setSyncIdentity()
         txn.readWholeTable()
         field
@@ -2261,6 +2261,18 @@ trait DeltaSQLConfBase {
       .booleanConf
       .createWithDefault(true)
 
+  val DELTA_IDENTITY_ALLOW_SYNC_IDENTITY_TO_LOWER_HIGH_WATER_MARK =
+    buildConf("identityColumn.allowSyncIdentityToLowerHighWaterMark.enabled")
+      .internal()
+      .doc(
+        """
+          | If true, the SYNC IDENTITY command can reduce the high water mark in a Delta
+          | IDENTITY column. If false, the high water mark will only be updated if it respects
+          | the column's specified start, step, and existing high water mark value.
+          |""".stripMargin)
+      .booleanConf
+      .createWithDefault(false)
+
   ///////////
   // TESTING
   ///////////
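
For reference, a hedged usage sketch of the new conf (assuming Delta's usual `spark.databricks.delta.` prefix for SQL conf keys; the table and column names are illustrative):

// Deliberately allow ALTER TABLE ... SYNC IDENTITY to lower a bad high water mark.
spark.conf.set(
  "spark.databricks.delta.identityColumn.allowSyncIdentityToLowerHighWaterMark.enabled",
  "true")
spark.sql("ALTER TABLE t ALTER COLUMN id SYNC IDENTITY")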