diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/IdentityColumn.scala b/spark/src/main/scala/org/apache/spark/sql/delta/IdentityColumn.scala index 4187289ca5..8e400cacdc 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/IdentityColumn.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/IdentityColumn.scala @@ -16,8 +16,11 @@ package org.apache.spark.sql.delta +import scala.collection.mutable + import org.apache.spark.sql.delta.metering.DeltaLogging import org.apache.spark.sql.delta.sources.DeltaSourceUtils._ +import org.apache.spark.sql.delta.sources.DeltaSQLConf import org.apache.spark.sql.delta.stats.{DeltaFileStatistics, DeltaJobStatisticsTracker} import org.apache.spark.sql.delta.util.JsonUtils import org.apache.hadoop.conf.Configuration @@ -31,6 +34,17 @@ import org.apache.spark.sql.execution.datasources.WriteTaskStats import org.apache.spark.sql.functions.{array, max, min, to_json} import org.apache.spark.sql.types.{MetadataBuilder, StructField, StructType} +/** + * This object holds String constants used the field `debugInfo` for + * logging [[IdentityColumn.opTypeHighWaterMarkUpdate]]. + * Each string represents an unexpected or notable event while calculating the high water mark. + */ +object IdentityColumnHighWaterMarkUpdateInfo { + val EXISTING_WATER_MARK_BEFORE_START = "existing_water_mark_before_start" + val CANDIDATE_HIGH_WATER_MARK_ROUNDED = "candidate_high_watermark_rounded" + val CANDIDATE_HIGH_WATER_MARK_BEFORE_START = "candidate_high_water_mark_before_start" +} + /** * Provide utility methods related to IDENTITY column support for Delta. */ @@ -46,6 +60,8 @@ object IdentityColumn extends DeltaLogging { val opTypeWrite = "delta.identityColumn.write" // When IDENTITY column update causes transaction to abort. val opTypeAbort = "delta.identityColumn.abort" + // When we update the high watermark of an IDENTITY column. 
+ val opTypeHighWaterMarkUpdate = "delta.identityColumn.highWaterMarkUpdate" // Return true if `field` is an identity column that allows explicit insert. Caller must ensure // `isIdentityColumn(field)` is true. @@ -134,14 +150,130 @@ object IdentityColumn extends DeltaLogging { )) } + /** Round `value` to the next value that follows start and step configuration. */ + protected[delta] def roundToNext(start: Long, step: Long, value: Long): Long = { + val valueOffset = Math.subtractExact(value, start) + if (valueOffset % step == 0) { + value + } else { + // An identity value follows the formula start + step * n. So n = (value - start) / step. + // Where n is a non-negative integer if the value respects the start. + // Since the value doesn't follow this formula, we need to ceil n. + // corrected value = start + step * ceil(n). + // However, we can't cast to Double for division because it's only accurate up to 54 bits. + // Instead, we will do a floored division and add 1. + // start + step * ((value - start) / step + 1) + val quotient = valueOffset / step + // `valueOffset` will have the same sign as `step` if `value` respects the start. + val stepMultiple = if (Math.signum(valueOffset) == Math.signum(step)) { + Math.addExact(quotient, 1L) + } else { + // Don't add one. Otherwise, we end up rounding 2 values up, which may skip the start. + quotient + } + Math.addExact( + start, + Math.multiplyExact(step, stepMultiple) + ) + } + } + + /** + * Update the high water mark of the IDENTITY column based on `candidateHighWaterMark`. + * + * We validate against the identity column definition (start, step) and may insert a high + * watermark that's different from `candidateHighWaterMark` if it's not valid. This method + * may also not update the high watermark if the candidate doesn't respect the start, is + * below the current watermark or is a NOOP. + * + * @param field The IDENTITY column to update. 
+ * @param candidateHighWaterMark The candidate high water mark to update to. + * @param allowLoweringHighWaterMarkForSyncIdentity Whether to allow lowering the high water mark. + * Lowering the high water mark is NOT SAFE in + * general, but may be a valid operation in SYNC + * IDENTITY (e.g. repair a high water mark after + * a bad sync). + * @return A new `StructField` with the high water mark updated to `candidateHighWaterMark` and + * a Seq[String] that contains debug information for logging. + */ + protected[delta] def updateToValidHighWaterMark( + field: StructField, + candidateHighWaterMark: Long, + allowLoweringHighWaterMarkForSyncIdentity: Boolean + ): (StructField, Seq[String]) = { + require(ColumnWithDefaultExprUtils.isIdentityColumn(field)) + + val info = getIdentityInfo(field) + val positiveStep = info.step > 0 + val orderInStepDirection = if (positiveStep) Ordering.Long else Ordering.Long.reverse + + val loggingBuffer = new mutable.ArrayBuffer[String] + + // We check `candidateHighWaterMark` and not `newHighWaterMark` because + // newHighWaterMark may not be part of the column. E.g. a generated by default column + // has candidateHighWaterMark = 9, start = 10, step = 3, and previous highWaterMark = None. + // We don't want to bump the high water mark to 10 because the next value generated will + // be 13, and we'll miss the specified start entirely. + val isBeforeStart = orderInStepDirection.lt(candidateHighWaterMark, info.start) + if (isBeforeStart) { + loggingBuffer.append( + IdentityColumnHighWaterMarkUpdateInfo.CANDIDATE_HIGH_WATER_MARK_BEFORE_START) + } + + // We must round on the generated by default case because the candidate may be a user inserted + // value and may not follow the identity column definition. We're not skipping this check + // for the generated always case. It's effectively a NOOP since generated always values should + // theoretically always respect the identity column definition. 
If the high watermark was + // wrong (for some reason), this is our chance to fix it. + val roundedCandidateHighWaterMark = roundToNext(info.start, info.step, candidateHighWaterMark) + if (roundedCandidateHighWaterMark != candidateHighWaterMark) { + loggingBuffer.append(IdentityColumnHighWaterMarkUpdateInfo.CANDIDATE_HIGH_WATER_MARK_ROUNDED) + } + + // If allowLoweringHighWaterMarkForSyncIdentity is true, we can ignore the existing high water + // mark. + val newHighWaterMark = info.highWaterMark match { + case Some(oldWaterMark) if !allowLoweringHighWaterMarkForSyncIdentity => + orderInStepDirection.max(oldWaterMark, roundedCandidateHighWaterMark) + case _ => roundedCandidateHighWaterMark + } + + val tableHasBadHighWaterMark = info.highWaterMark.exists(oldWaterMark => + orderInStepDirection.lt(oldWaterMark, info.start)) + if (tableHasBadHighWaterMark) { + loggingBuffer.append( + IdentityColumnHighWaterMarkUpdateInfo.EXISTING_WATER_MARK_BEFORE_START) + } + + val isChanged = !info.highWaterMark.contains(newHighWaterMark) + + // If a table already has a bad high water mark, we shouldn't prevent them from updating the + // high water mark. Always try to update to newHighWaterMark, which is guaranteed to be a better + // choice than the existing one since we do a max(). + // Note that means if a table has bad water mark, we can set the high water to the start due to + // the rounding logic. + // Don't update if it's before start or the high watermark is the same. + if (tableHasBadHighWaterMark || (!isBeforeStart && isChanged)) { + val newMetadata = new MetadataBuilder().withMetadata(field.metadata) + .putLong(IDENTITY_INFO_HIGHWATERMARK, newHighWaterMark) + .build() + (field.copy(metadata = newMetadata), loggingBuffer.toIndexedSeq) + } else { + // If we don't update the high watermark, we don't need to log the update. + (field, Nil) + } + } + /** * Return a new schema with IDENTITY high water marks updated in the schema. 
* The new high watermarks are decided based on the `updatedIdentityHighWaterMarks` and old high * watermark values present in the passed `schema`. */ def updateSchema( + deltaLog: DeltaLog, schema: StructType, - updatedIdentityHighWaterMarks: Seq[(String, Long)]) : StructType = { + updatedIdentityHighWaterMarks: Seq[(String, Long)] + ): StructType = { val updatedIdentityHighWaterMarksGrouped = updatedIdentityHighWaterMarks.groupBy(_._1).mapValues(v => v.map(_._2)) StructType(schema.map { f => @@ -149,17 +281,27 @@ object IdentityColumn extends DeltaLogging { case Some(newWatermarks) if ColumnWithDefaultExprUtils.isIdentityColumn(f) => val oldIdentityInfo = getIdentityInfo(f) val positiveStep = oldIdentityInfo.step > 0 - val newHighWaterMark = if (positiveStep) { - oldIdentityInfo.highWaterMark.map(Math.max(_, newWatermarks.max)) - .getOrElse(newWatermarks.max) + val candidateHighWaterMark = if (positiveStep) { + newWatermarks.max } else { - oldIdentityInfo.highWaterMark.map(Math.min(_, newWatermarks.min)) - .getOrElse(newWatermarks.min) + newWatermarks.min + } + val (newField, loggingSeq) = updateToValidHighWaterMark( + f, candidateHighWaterMark, allowLoweringHighWaterMarkForSyncIdentity = false) + if (loggingSeq.nonEmpty) { + recordDeltaEvent( + deltaLog = deltaLog, + opType = opTypeHighWaterMarkUpdate, + data = Map( + "columnName" -> f.name, + "debugInfo" -> loggingSeq.mkString(", "), + "oldHighWaterMark" -> oldIdentityInfo.highWaterMark, + "candidateHighWaterMark" -> candidateHighWaterMark, + "updatedFrom" -> "updateSchema" + ) + ) } - val builder = new MetadataBuilder() - .withMetadata(f.metadata) - .putLong(IDENTITY_INFO_HIGHWATERMARK, newHighWaterMark) - f.copy(metadata = builder.build()) + newField case _ => f } @@ -252,19 +394,12 @@ object IdentityColumn extends DeltaLogging { // Calculate the sync'ed IDENTITY high water mark based on actual data and returns a // potentially updated `StructField`. 
- def syncIdentity(field: StructField, df: DataFrame): StructField = { - // Round `value` to the next value that follows start and step configuration. - def roundToNext(start: Long, step: Long, value: Long): Long = { - if (Math.subtractExact(value, start) % step == 0) { - value - } else { - // start + step * ((value - start) / step + 1) - Math.addExact( - Math.multiplyExact(Math.addExact(Math.subtractExact(value, start) / step, 1), step), - start) - } - } - + def syncIdentity( + deltaLog: DeltaLog, + field: StructField, + df: DataFrame, + allowLoweringHighWaterMarkForSyncIdentity: Boolean + ): StructField = { assert(ColumnWithDefaultExprUtils.isIdentityColumn(field)) // Run a query to get the actual high water mark (max or min value of the IDENTITY column) from // the actual data. @@ -274,17 +409,23 @@ object IdentityColumn extends DeltaLogging { val resultRow = df.select(expr).collect().head if (!resultRow.isNullAt(0)) { - val result = resultRow.getLong(0) - val isBeforeStart = if (positiveStep) result < info.start else result > info.start - val newHighWaterMark = roundToNext(info.start, info.step, result) - if (isBeforeStart || info.highWaterMark.contains(newHighWaterMark)) { - field - } else { - val newMetadata = new MetadataBuilder().withMetadata(field.metadata) - .putLong(IDENTITY_INFO_HIGHWATERMARK, newHighWaterMark) - .build() - field.copy(metadata = newMetadata) + val candidateHighWaterMark = resultRow.getLong(0) + val (newField, loggingSeq) = updateToValidHighWaterMark( + field, candidateHighWaterMark, allowLoweringHighWaterMarkForSyncIdentity) + if (loggingSeq.nonEmpty) { + recordDeltaEvent( + deltaLog = deltaLog, + opType = opTypeHighWaterMarkUpdate, + data = Map( + "columnName" -> field.name, + "debugInfo" -> loggingSeq.mkString(", "), + "oldHighWaterMark" -> info.highWaterMark, + "candidateHighWaterMark" -> candidateHighWaterMark, + "updatedFrom" -> "syncIdentity" + ) + ) } + newField } else { field } @@ -295,12 +436,19 @@ object IdentityColumn extends 
DeltaLogging { * been merged with the corresponding high water marks of `schemaWithHighWaterMarksToMerge`. */ def copySchemaWithMergedHighWaterMarks( - schemaToCopy: StructType, schemaWithHighWaterMarksToMerge: StructType): StructType = { + deltaLog: DeltaLog, + schemaToCopy: StructType, + schemaWithHighWaterMarksToMerge: StructType + ): StructType = { val newHighWatermarks = getIdentityColumns(schemaWithHighWaterMarksToMerge).flatMap { f => val info = getIdentityInfo(f) info.highWaterMark.map(waterMark => DeltaColumnMapping.getPhysicalName(f) -> waterMark) } - updateSchema(schemaToCopy, newHighWatermarks) + updateSchema( + deltaLog, + schemaToCopy, + newHighWatermarks + ) } diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala b/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala index dfb23a8e0f..9018ae85b4 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala @@ -455,7 +455,10 @@ trait OptimisticTransactionImpl extends TransactionalWrite def precommitUpdateSchemaWithIdentityHighWaterMarks(): Unit = { if (updatedIdentityHighWaterMarks.nonEmpty) { val newSchema = IdentityColumn.updateSchema( - metadata.schema, updatedIdentityHighWaterMarks.toSeq) + deltaLog, + metadata.schema, + updatedIdentityHighWaterMarks.toSeq + ) val updatedMetadata = metadata.copy(schemaString = newSchema.json) updateMetadataAfterWrite(updatedMetadata) } diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/commands/CloneTableBase.scala b/spark/src/main/scala/org/apache/spark/sql/delta/commands/CloneTableBase.scala index 7d78c7d0ac..c2db8803b3 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/commands/CloneTableBase.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/commands/CloneTableBase.scala @@ -308,8 +308,10 @@ abstract class CloneTableBase( .toMap val clonedSchema = 
IdentityColumn.copySchemaWithMergedHighWaterMarks( + deltaLog = targetSnapshot.deltaLog, schemaToCopy = clonedMetadata.schema, - schemaWithHighWaterMarksToMerge = targetSnapshot.metadata.schema) + schemaWithHighWaterMarksToMerge = targetSnapshot.metadata.schema + ) clonedMetadata.copy(configuration = filteredConfiguration, schemaString = clonedSchema.json) } diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/commands/RestoreTableCommand.scala b/spark/src/main/scala/org/apache/spark/sql/delta/commands/RestoreTableCommand.scala index ec086f4799..d59d921ebc 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/commands/RestoreTableCommand.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/commands/RestoreTableCommand.scala @@ -199,8 +199,10 @@ case class RestoreTableCommand(sourceTable: DeltaTableV2) // We need to merge the schema of the latest snapshot with the schema of the snapshot // we're restoring to ensure that the high water mark is correct. val mergedSchema = IdentityColumn.copySchemaWithMergedHighWaterMarks( + deltaLog = deltaLog, schemaToCopy = snapshotToRestore.metadata.schema, - schemaWithHighWaterMarksToMerge = latestSnapshot.metadata.schema) + schemaWithHighWaterMarksToMerge = latestSnapshot.metadata.schema + ) txn.updateMetadata(snapshotToRestore.metadata.copy(schemaString = mergedSchema.json)) diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/commands/alterDeltaTableCommands.scala b/spark/src/main/scala/org/apache/spark/sql/delta/commands/alterDeltaTableCommands.scala index a2f92ef8c5..46d4ffbc16 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/commands/alterDeltaTableCommands.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/commands/alterDeltaTableCommands.scala @@ -771,7 +771,14 @@ case class AlterTableChangeColumnDeltaCommand( if (syncIdentity) { assert(oldColumn == newColumn) val df = txn.snapshot.deltaLog.createDataFrame(txn.snapshot, txn.filterFiles()) - val field = 
IdentityColumn.syncIdentity(newColumn, df) + val allowLoweringHighWaterMarkForSyncIdentity = sparkSession.conf + .get(DeltaSQLConf.DELTA_IDENTITY_ALLOW_SYNC_IDENTITY_TO_LOWER_HIGH_WATER_MARK) + val field = IdentityColumn.syncIdentity( + deltaLog, + newColumn, + df, + allowLoweringHighWaterMarkForSyncIdentity + ) txn.setSyncIdentity() txn.readWholeTable() field diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala b/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala index b771521960..286ce80cef 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala @@ -2261,6 +2261,18 @@ trait DeltaSQLConfBase { .booleanConf .createWithDefault(true) + val DELTA_IDENTITY_ALLOW_SYNC_IDENTITY_TO_LOWER_HIGH_WATER_MARK = + buildConf("identityColumn.allowSyncIdentityToLowerHighWaterMark.enabled") + .internal() + .doc( + """ + | If true, the SYNC IDENTITY command can reduce the high water mark in a Delta IDENTITY + | column. If false, the high water mark will only be updated if it + | respects the column's specified start, step, and existing high watermark value. 
+ |""".stripMargin) + .booleanConf + .createWithDefault(false) + /////////// // TESTING /////////// diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/IdentityColumnIngestionSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/IdentityColumnIngestionSuite.scala index f0958c7338..104bb67fd2 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/IdentityColumnIngestionSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/IdentityColumnIngestionSuite.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.delta import java.io.PrintWriter import org.apache.spark.sql.delta.GeneratedAsIdentityType.{GeneratedAlways, GeneratedByDefault} +import org.apache.spark.sql.delta.sources.{DeltaSourceUtils, DeltaSQLConf} import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.TableIdentifier @@ -330,6 +331,164 @@ trait IdentityColumnIngestionSuiteBase extends IdentityColumnTestUtils { } } } + + /** + * Creates a source and destination table with the same schema such that if it is a positive step, + * the source table has identity column values < the target table's start value. If it's + * a negative step, the source table has identity column values > the target table's start value. + * @param isSrcDataSubsetOfTgt Whether the source data is a subset of the target data. If false, + * some data is inserted into the target table below the start of + * the identity column value. + * @param positiveStep Whether the identity column values are generated in a positive step. + * @param expectValidHighWaterMark Whether the high water mark is expected to be set to a valid + * value in the target table after running `insertDataFn`. If so, + * we check that it respects the start value of the column. + * @param insertDataFn Function that inserts data from the source table to the target table. 
+ */ + private def withSrcAndDestTables( + isSrcDataSubsetOfTgt: Boolean, + positiveStep: Boolean, + expectValidHighWaterMark: Boolean)( + insertDataFn: (String, String) => Unit): Unit = { + import testImplicits._ + val srcTblName = s"${getRandomTableName}_src" + val tgtTblName = s"${getRandomTableName}_tgt" + withTable(srcTblName, tgtTblName) { + val targetTableStartWith = if (positiveStep) 100000 else -100000 + val targetTableIncrementBy = if (positiveStep) 53 else -53 + // Create a generated always source table with (id, value) + // starting with 0 and incrementing by targetTableIncrementBy. + generateTableWithIdentityColumn(srcTblName, step = targetTableIncrementBy) + + val srcDeltaLog = DeltaLog.forTable(spark, TableIdentifier(srcTblName)) + assert(getHighWaterMark(srcDeltaLog.update(), colName = "id").isDefined) + // While id col values generation is nondeterministic, the high water mark + // should really not exceed this value. + if (positiveStep) { + assert(highWaterMark(srcDeltaLog.update(), colName = "id") < targetTableStartWith) + } else { + assert(highWaterMark(srcDeltaLog.update(), colName = "id") > targetTableStartWith) + } + + // Create a generated by default target table with (id, value) + createTableWithIdColAndIntValueCol( + tgtTblName, + GeneratedByDefault, + startsWith = Some(targetTableStartWith), + incrementBy = Some(targetTableIncrementBy), + tblProperties = Map.empty) + + val tgtDeltaLog = DeltaLog.forTable(spark, TableIdentifier(tgtTblName)) + assert(getHighWaterMark(tgtDeltaLog.update(), colName = "id").isEmpty, + "High watermark should not be set if the table is empty.") + + if (isSrcDataSubsetOfTgt) { + sql(s"INSERT INTO $tgtTblName(id, value) SELECT * FROM $srcTblName") + } else { + // Manually insert some data into the target table below the startWith. 
+ if (positiveStep) { + sql(s"INSERT INTO $tgtTblName(id, value) VALUES (1, 100), (2, 101)") + } else { + sql(s"INSERT INTO $tgtTblName(id, value) VALUES (-1, 100), (-2, 101)") + } + } + + assert(getHighWaterMark(tgtDeltaLog.update(), colName = "id").isEmpty, + "High watermark should not be set for user inserted data.") + if (positiveStep) { + assert(sql(s"SELECT max(id) FROM $tgtTblName").as[Long].head < targetTableStartWith) + } else { + assert(sql(s"SELECT min(id) FROM $tgtTblName").as[Long].head > targetTableStartWith) + } + + insertDataFn(srcTblName, tgtTblName) + + if (expectValidHighWaterMark) { + if (positiveStep) { + assert(highWaterMark(tgtDeltaLog.update(), colName = "id") >= targetTableStartWith) + } else { + assert(highWaterMark(tgtDeltaLog.update(), colName = "id") <= targetTableStartWith) + } + } + } + } + + for { + cdfEnabled <- DeltaTestUtils.BOOLEAN_DOMAIN + isSrcDataSubsetOfTgt <- DeltaTestUtils.BOOLEAN_DOMAIN + positiveStep <- DeltaTestUtils.BOOLEAN_DOMAIN + statementWithOnlyUpdates <- DeltaTestUtils.BOOLEAN_DOMAIN + } test( + s"MERGE UPSERT with source on identity column, cdfEnabled=$cdfEnabled, " + + s"isSrcDataSubsetOfTgt=$isSrcDataSubsetOfTgt, " + + s"positiveStep=$positiveStep, statementWithOnlyUpdates=$statementWithOnlyUpdates") { + val expectValidHighWaterMark = !statementWithOnlyUpdates && !isSrcDataSubsetOfTgt + withSrcAndDestTables( + isSrcDataSubsetOfTgt, + positiveStep, + expectValidHighWaterMark) { (srcTblName, tgtTblName) => + if (cdfEnabled) { + val cdfPropKey = DeltaConfigs.CHANGE_DATA_FEED.defaultTablePropertyKey + sql(s"ALTER TABLE $tgtTblName SET TBLPROPERTIES('$cdfPropKey' = 'true')") + } + // Merge into the target table from the source table. + // The target table will generate values starting from targetTableStartWith. + // The high water mark from the source should not interfere. 
+ if (statementWithOnlyUpdates) { + sql( + s""" + |MERGE INTO $tgtTblName tgt + |USING $srcTblName src ON tgt.id = src.id + |WHEN MATCHED THEN UPDATE SET tgt.value = src.value + |""".stripMargin) + } else { + sql( + s""" + |MERGE INTO $tgtTblName tgt + |USING $srcTblName src ON tgt.id = src.id + |WHEN MATCHED THEN UPDATE SET tgt.value = src.value + |WHEN NOT MATCHED THEN INSERT (value) VALUES (src.value) + |""".stripMargin) + } + + if (!expectValidHighWaterMark) { + val tgtDeltaLog = DeltaLog.forTable(spark, TableIdentifier(tgtTblName)) + assert(getHighWaterMark(tgtDeltaLog.update(), colName = "id").isEmpty) + } + } + } + + for (positiveStep <- DeltaTestUtils.BOOLEAN_DOMAIN) + test(s"MERGE UPSERT into a table with a bad watermark, positiveStep=$positiveStep") { + // Suppose that a table has a bad watermark (for whatever reason), the system should still + // have a sensible behavior and be robust to these bad watermark. + withSrcAndDestTables( + isSrcDataSubsetOfTgt = false, + positiveStep, + expectValidHighWaterMark = false) { (srcTblName, tgtTblName) => + val tgtDeltaLog = DeltaLog.forTable(spark, TableIdentifier(tgtTblName)) + forceBadWaterMark(tgtDeltaLog) + val badWaterMark = highWaterMark(tgtDeltaLog.update(), colName = "id") + sql( + s""" + |MERGE INTO $tgtTblName tgt + |USING $srcTblName src ON tgt.id = src.id + |WHEN MATCHED THEN UPDATE SET tgt.value = src.value + |WHEN NOT MATCHED THEN INSERT (value) VALUES (src.value) + |""".stripMargin) + + // Even though the high water mark is invalid, we don't want to prevent updates to the high + // water mark as this would lead to us generating the same values over and over. + val newHighWaterMark = highWaterMark(tgtDeltaLog.update(), colName = "id") + assert(newHighWaterMark !== badWaterMark, + "New data was inserted. 
The high water mark should have updated") + if (positiveStep) { + assert(newHighWaterMark > badWaterMark) + } else { + assert(newHighWaterMark < badWaterMark) + } + } + } } class IdentityColumnIngestionScalaSuite diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/IdentityColumnSyncSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/IdentityColumnSyncSuite.scala index 4e7e64890f..2c43530374 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/IdentityColumnSyncSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/IdentityColumnSyncSuite.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql.delta import org.apache.spark.sql.delta.GeneratedAsIdentityType.GeneratedByDefault -import org.apache.spark.sql.delta.sources.DeltaSourceUtils +import org.apache.spark.sql.delta.sources.{DeltaSourceUtils, DeltaSQLConf} import org.apache.spark.sql.{AnalysisException, Row} import org.apache.spark.sql.catalyst.TableIdentifier @@ -185,7 +185,7 @@ trait IdentityColumnSyncSuiteBase } test("alter table sync identity - deleting high watermark rows followed by sync identity" + - " brings down the highWatermark") { + " brings down the highWatermark only with a flag") { for (generatedAsIdentityType <- GeneratedAsIdentityType.values) { val tblName = getRandomTableName withTable(tblName) { @@ -198,8 +198,22 @@ trait IdentityColumnSyncSuiteBase checkAnswer(sql(s"SELECT max(id) FROM $tblName"), Row(41)) sql(s"DELETE FROM $tblName WHERE value IN (0, 3, 4)") assert(highWaterMark(deltaLog.snapshot, "id") === 41L) - sql(s"ALTER TABLE $tblName ALTER COLUMN id SYNC IDENTITY") - assert(highWaterMark(deltaLog.snapshot, "id") === 21L) + // Unless this flag is enabled, the high watermark is not updated if it is lower + // than the previous high watermark. 
+ withSQLConf( + DeltaSQLConf.DELTA_IDENTITY_ALLOW_SYNC_IDENTITY_TO_LOWER_HIGH_WATER_MARK.key -> "false" + ) { + sql(s"ALTER TABLE $tblName ALTER COLUMN id SYNC IDENTITY") + assert(highWaterMark(deltaLog.update(), "id") === 41L) + } + // With the flag enabled, the high watermark is updated even if it is lower, + // than the previous high watermark, as long as it is higher than the defined start. + withSQLConf( + DeltaSQLConf.DELTA_IDENTITY_ALLOW_SYNC_IDENTITY_TO_LOWER_HIGH_WATER_MARK.key -> "true" + ) { + sql(s"ALTER TABLE $tblName ALTER COLUMN id SYNC IDENTITY") + assert(highWaterMark(deltaLog.update(), "id") === 21L) + } sql(s"INSERT INTO $tblName(value) VALUES (8)") checkAnswer(sql(s"SELECT max(id) FROM $tblName"), Row(31)) checkAnswer(sql(s"SELECT COUNT(DISTINCT id) == COUNT(*) FROM $tblName"), Row(true)) @@ -252,6 +266,348 @@ trait IdentityColumnSyncSuiteBase "ALTER TABLE ALTER COLUMN SYNC IDENTITY cannot be called on non IDENTITY columns.")) } } + + for (positiveStep <- DeltaTestUtils.BOOLEAN_DOMAIN) + test(s"SYNC IDENTITY on table with bad water mark. positiveStep = $positiveStep") { + val tblName = getRandomTableName + withTable(tblName) { + val incrementBy = if (positiveStep) 48 else -48 + createTableWithIdColAndIntValueCol( + tblName, + GeneratedByDefault, + startsWith = Some(100), + incrementBy = Some(incrementBy) + ) + val deltaLog = DeltaLog.forTable(spark, TableIdentifier(tblName)) + + // Insert data that don't respect the start. + if (positiveStep) { + sql(s"INSERT INTO $tblName(id, value) VALUES (4, 4)") + } else { + sql(s"INSERT INTO $tblName(id, value) VALUES (196, 196)") + } + forceBadWaterMark(deltaLog) + val badWaterMark = highWaterMark(deltaLog.snapshot, "id") + + // Even though the candidate high water mark and the existing high water mark is invalid, + // we don't want to prevent updates to the high water mark as this would lead to us + // generating the same values over and over. 
+ sql(s"ALTER TABLE $tblName ALTER COLUMN id SYNC IDENTITY") + val newHighWaterMark = highWaterMark(deltaLog.update(), colName = "id") + assert(newHighWaterMark !== badWaterMark, + "Sync identity should update the high water mark based on the data.") + if (positiveStep) { + assert(newHighWaterMark > badWaterMark) + } else { + assert(newHighWaterMark < badWaterMark) + } + } + } + + for { + allowExplicitInsert <- DeltaTestUtils.BOOLEAN_DOMAIN + allowLoweringHighWatermarkForSyncIdentity <- DeltaTestUtils.BOOLEAN_DOMAIN + } test(s"IdentityColumn.updateToValidHighWaterMark - allowExplicitInsert = $allowExplicitInsert," + + s" allowLoweringHighWatermarkForSyncIdentity = $allowLoweringHighWatermarkForSyncIdentity") { + /** + * Unit test for the updateToValidHighWaterMark function by creating a StructField with the + * specified start, step, and existing high water mark. After calling the function, we verify + * the StructField's metadata has the expect high water mark. + */ + def testUpdateToValidHighWaterMark( + start: Long, + step: Long, + allowExplicitInsert: Boolean, + allowLoweringHighWatermarkForSyncIdentity: Boolean, + existingHighWaterMark: Option[Long], + candidateHighWaterMark: Long, + expectedHighWaterMark: Option[Long]): Unit = { + /** Creates a MetadataBuilder for Struct Metadata. 
*/ + def getMetadataBuilder(highWaterMarkOpt: Option[Long]): MetadataBuilder = { + var metadataBuilder = new MetadataBuilder() + .putLong(DeltaSourceUtils.IDENTITY_INFO_START, start) + .putLong(DeltaSourceUtils.IDENTITY_INFO_STEP, step) + .putBoolean(DeltaSourceUtils.IDENTITY_INFO_ALLOW_EXPLICIT_INSERT, allowExplicitInsert) + + highWaterMarkOpt match { + case Some(oldHighWaterMark) => + metadataBuilder = metadataBuilder.putLong( + DeltaSourceUtils.IDENTITY_INFO_HIGHWATERMARK, oldHighWaterMark) + case None => () + } + metadataBuilder + } + + val initialStructField = StructField( + name = "id", + LongType, + nullable = false, + metadata = getMetadataBuilder(existingHighWaterMark).build()) + val (updatedStructField, _) = + IdentityColumn.updateToValidHighWaterMark( + initialStructField, candidateHighWaterMark, allowLoweringHighWatermarkForSyncIdentity) + val expectedMetadata = getMetadataBuilder(expectedHighWaterMark).build() + assert(updatedStructField.metadata === expectedMetadata) + } + + // existingHighWaterMark = None, positive step + testUpdateToValidHighWaterMark( + start = 1L, + step = 3L, + allowExplicitInsert = allowExplicitInsert, + allowLoweringHighWatermarkForSyncIdentity = allowLoweringHighWatermarkForSyncIdentity, + existingHighWaterMark = None, + candidateHighWaterMark = 2L, + expectedHighWaterMark = Some(4L) // rounded up + ) + testUpdateToValidHighWaterMark( + start = 1L, + step = 3L, + allowExplicitInsert = allowExplicitInsert, + allowLoweringHighWatermarkForSyncIdentity = allowLoweringHighWatermarkForSyncIdentity, + existingHighWaterMark = None, + candidateHighWaterMark = 0L, + expectedHighWaterMark = None // below start + ) + testUpdateToValidHighWaterMark( + start = 1L, + step = 3L, + allowExplicitInsert = allowExplicitInsert, + allowLoweringHighWatermarkForSyncIdentity = allowLoweringHighWatermarkForSyncIdentity, + existingHighWaterMark = None, + candidateHighWaterMark = 1L, + expectedHighWaterMark = Some(1L) // equal to start + ) + 
testUpdateToValidHighWaterMark( + start = 1L, + step = 3L, + allowExplicitInsert = allowExplicitInsert, + allowLoweringHighWatermarkForSyncIdentity = allowLoweringHighWatermarkForSyncIdentity, + existingHighWaterMark = None, + candidateHighWaterMark = 7L, + expectedHighWaterMark = Some(7L) // respects start and step + ) + + // existingHighWaterMark = None, negative step + testUpdateToValidHighWaterMark( + start = 1L, + step = -3L, + allowExplicitInsert = allowExplicitInsert, + allowLoweringHighWatermarkForSyncIdentity = allowLoweringHighWatermarkForSyncIdentity, + existingHighWaterMark = None, + candidateHighWaterMark = -1L, + expectedHighWaterMark = Some(-2L) // rounded up + ) + testUpdateToValidHighWaterMark( + start = 1L, + step = -3L, + allowExplicitInsert = allowExplicitInsert, + allowLoweringHighWatermarkForSyncIdentity = allowLoweringHighWatermarkForSyncIdentity, + existingHighWaterMark = None, + candidateHighWaterMark = 2L, + expectedHighWaterMark = None // above start + ) + testUpdateToValidHighWaterMark( + start = 1L, + step = -3L, + allowExplicitInsert = allowExplicitInsert, + allowLoweringHighWatermarkForSyncIdentity = allowLoweringHighWatermarkForSyncIdentity, + existingHighWaterMark = None, + candidateHighWaterMark = 1L, + expectedHighWaterMark = Some(1L) // equal to start + ) + testUpdateToValidHighWaterMark( + start = 1L, + step = -3L, + allowExplicitInsert = allowExplicitInsert, + allowLoweringHighWatermarkForSyncIdentity = allowLoweringHighWatermarkForSyncIdentity, + existingHighWaterMark = None, + candidateHighWaterMark = -5L, + expectedHighWaterMark = Some(-5L) // respects start and step + ) + + // existingHighWaterMark = Some, positive step + testUpdateToValidHighWaterMark( + start = 1L, + step = 3L, + allowExplicitInsert = allowExplicitInsert, + allowLoweringHighWatermarkForSyncIdentity = allowLoweringHighWatermarkForSyncIdentity, + existingHighWaterMark = Some(4L), + candidateHighWaterMark = 5L, + expectedHighWaterMark = Some(7L) // rounded 
up + ) + testUpdateToValidHighWaterMark( + start = 1L, + step = 3L, + allowExplicitInsert = allowExplicitInsert, + allowLoweringHighWatermarkForSyncIdentity = allowLoweringHighWatermarkForSyncIdentity, + existingHighWaterMark = Some(4L), + candidateHighWaterMark = 0L, + expectedHighWaterMark = Some(4L) // below start, preserve existing high watermark + ) + testUpdateToValidHighWaterMark( + start = 1L, + step = 3L, + allowExplicitInsert = allowExplicitInsert, + allowLoweringHighWatermarkForSyncIdentity = allowLoweringHighWatermarkForSyncIdentity, + existingHighWaterMark = Some(-5L), + candidateHighWaterMark = -2L, + expectedHighWaterMark = Some(-2L) // below start, bad existing water mark, update to candidate + ) + testUpdateToValidHighWaterMark( + start = 1L, + step = 3L, + allowExplicitInsert = allowExplicitInsert, + allowLoweringHighWatermarkForSyncIdentity = allowLoweringHighWatermarkForSyncIdentity, + existingHighWaterMark = Some(-5L), + candidateHighWaterMark = 0L, + expectedHighWaterMark = Some(1L) // below start, bad existing water mark, update rounded up + ) + + testUpdateToValidHighWaterMark( + start = 1L, + step = 3L, + allowExplicitInsert = allowExplicitInsert, + allowLoweringHighWatermarkForSyncIdentity = allowLoweringHighWatermarkForSyncIdentity, + existingHighWaterMark = Some(-5L), + candidateHighWaterMark = -9L, + expectedHighWaterMark = if (allowLoweringHighWatermarkForSyncIdentity) { + // below start, bad existing water mark, allow lowering, rounded down + Some(-8L) + } else { + // below start, bad existing water mark, keep existing + Some(-5L) + } + ) + testUpdateToValidHighWaterMark( + start = 1L, + step = 3L, + allowExplicitInsert = allowExplicitInsert, + allowLoweringHighWatermarkForSyncIdentity = allowLoweringHighWatermarkForSyncIdentity, + existingHighWaterMark = Some(4L), + candidateHighWaterMark = 1L, + expectedHighWaterMark = if (allowLoweringHighWatermarkForSyncIdentity) { + Some(1L) // allow lowering + } else { + Some(4L) // below 
existing high watermark + } + ) + testUpdateToValidHighWaterMark( + start = 1L, + step = 3L, + allowExplicitInsert = allowExplicitInsert, + allowLoweringHighWatermarkForSyncIdentity = allowLoweringHighWatermarkForSyncIdentity, + existingHighWaterMark = Some(4L), + candidateHighWaterMark = 7L, + expectedHighWaterMark = Some(7L) // respects start and step + ) + + // existingHighWaterMark = Some, negative step + testUpdateToValidHighWaterMark( + start = 1L, + step = -3L, + allowExplicitInsert = allowExplicitInsert, + allowLoweringHighWatermarkForSyncIdentity = allowLoweringHighWatermarkForSyncIdentity, + existingHighWaterMark = Some(-2L), + candidateHighWaterMark = -3L, + expectedHighWaterMark = Some(-5L) // rounded up + ) + testUpdateToValidHighWaterMark( + start = 1L, + step = -3L, + allowExplicitInsert = allowExplicitInsert, + allowLoweringHighWatermarkForSyncIdentity = allowLoweringHighWatermarkForSyncIdentity, + existingHighWaterMark = Some(-2L), + candidateHighWaterMark = 2L, + expectedHighWaterMark = Some(-2L) // above start, preserve existing high water mark + ) + testUpdateToValidHighWaterMark( + start = 1L, + step = -3L, + allowExplicitInsert = allowExplicitInsert, + allowLoweringHighWatermarkForSyncIdentity = allowLoweringHighWatermarkForSyncIdentity, + existingHighWaterMark = Some(7L), + candidateHighWaterMark = 4L, + expectedHighWaterMark = Some(4L) // above start, bad existing water mark, update to candidate + ) + testUpdateToValidHighWaterMark( + start = 1L, + step = -3L, + allowExplicitInsert = allowExplicitInsert, + allowLoweringHighWatermarkForSyncIdentity = allowLoweringHighWatermarkForSyncIdentity, + existingHighWaterMark = Some(7L), + candidateHighWaterMark = 6L, + expectedHighWaterMark = Some(4L) // above start, bad existing water mark, update rounded down + ) + testUpdateToValidHighWaterMark( + start = 1L, + step = -3L, + allowExplicitInsert = allowExplicitInsert, + allowLoweringHighWatermarkForSyncIdentity = 
allowLoweringHighWatermarkForSyncIdentity, + existingHighWaterMark = Some(7L), + candidateHighWaterMark = 11L, + expectedHighWaterMark = if (allowLoweringHighWatermarkForSyncIdentity) { + // above start, bad existing water mark, allow lowering, rounded down + Some(10L) + } else { + // above start, bad existing water mark, keep existing + Some(7L) + } + ) + testUpdateToValidHighWaterMark( + start = 1L, + step = -3L, + allowExplicitInsert = allowExplicitInsert, + allowLoweringHighWatermarkForSyncIdentity = allowLoweringHighWatermarkForSyncIdentity, + existingHighWaterMark = Some(-2L), + candidateHighWaterMark = 1L, + expectedHighWaterMark = if (allowLoweringHighWatermarkForSyncIdentity) { + Some(1L) // allow lowering + } else { + Some(-2L) // higher than high watermark + } + ) + testUpdateToValidHighWaterMark( + start = 1L, + step = -3L, + allowExplicitInsert = allowExplicitInsert, + allowLoweringHighWatermarkForSyncIdentity = allowLoweringHighWatermarkForSyncIdentity, + existingHighWaterMark = Some(-2L), + candidateHighWaterMark = -5L, + expectedHighWaterMark = Some(-5L) // respects start and step + ) + } + + test("IdentityColumn.roundToNext") { + val posStart = 7L + val negStart = -7L + val posLargeStart = Long.MaxValue - 10000 + val negLargeStart = Long.MinValue + 10000 + for (start <- Seq(posStart, negStart, posLargeStart, negLargeStart)) { + assert(IdentityColumn.roundToNext(start = start, step = 3L, value = start) === start) + assert(IdentityColumn.roundToNext( + start = start, step = 3L, value = start + 5L) === start + 6L) + assert(IdentityColumn.roundToNext( + start = start, step = 3L, value = start + 6L) === start + 6L) + assert(IdentityColumn.roundToNext( + start = start, step = 3L, value = start - 5L) === start - 3L) // bad watermark + assert(IdentityColumn.roundToNext( + start = start, step = 3L, value = start - 7L) === start - 6L) // bad watermark + assert(IdentityColumn.roundToNext( + start = start, step = 3L, value = start - 6L) === start - 6L) // bad 
 watermark + assert(IdentityColumn.roundToNext(start = start, step = -3L, value = start) === start) + assert(IdentityColumn.roundToNext( + start = start, step = -3L, value = start - 5L) === start - 6L) + assert(IdentityColumn.roundToNext( + start = start, step = -3L, value = start - 6L) === start - 6L) + assert(IdentityColumn.roundToNext( + start = start, step = -3L, value = start + 5L) === start + 3L) // bad watermark + assert(IdentityColumn.roundToNext( + start = start, step = -3L, value = start + 7L) === start + 6L) // bad watermark + assert(IdentityColumn.roundToNext( + start = start, step = -3L, value = start + 6L) === start + 6L) // bad watermark + } + } } class IdentityColumnSyncScalaSuite diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/IdentityColumnTestUtils.scala b/spark/src/test/scala/org/apache/spark/sql/delta/IdentityColumnTestUtils.scala index 82fdd61624..b1a319e424 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/IdentityColumnTestUtils.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/IdentityColumnTestUtils.scala @@ -99,12 +99,26 @@ trait IdentityColumnTestUtils checkAnswer(sql(s"SELECT * FROM $tableName ORDER BY value ASC"), expectedAnswer) } + + /** + * Retrieves the high watermark information for the given `colName` in the metadata of + * the given `snapshot`, if it's present. Returns None if the high watermark has not been set yet. 
+ */ + protected def getHighWaterMark(snapshot: Snapshot, colName: String): Option[Long] = { + val metadata = snapshot.schema(colName).metadata + if (metadata.contains(DeltaSourceUtils.IDENTITY_INFO_HIGHWATERMARK)) { + Some(metadata.getLong(DeltaSourceUtils.IDENTITY_INFO_HIGHWATERMARK)) + } else { + None + } + } + /** * Retrieves the high watermark information for the given `colName` in the metadata of * given `snapshot` */ protected def highWaterMark(snapshot: Snapshot, colName: String): Long = { - snapshot.schema(colName).metadata.getLong(DeltaSourceUtils.IDENTITY_INFO_HIGHWATERMARK) + getHighWaterMark(snapshot, colName).get } /** @@ -174,5 +188,28 @@ trait IdentityColumnTestUtils assert(sortedRows.last.id <= expectedUpperBound) assert(sortedRows.map(_.id).distinct.size === expectedDistinctCount) } + + /** Force a bad high water mark (1000 steps before the start) on all identity columns in the table with a manual commit. */ + def forceBadWaterMark(deltaLog: DeltaLog): Unit = { + deltaLog.withNewTransaction { txn => + // Manually corrupt the high water mark. + val tblSchema = txn.snapshot.schema + val badTblSchema = StructType(tblSchema.map { + case tblIdCol if ColumnWithDefaultExprUtils.isIdentityColumn(tblIdCol) => + val identityInfo = IdentityColumn.getIdentityInfo(tblIdCol) + // This bad water mark still needs to be aligned with the step and start, + // otherwise we fail the requirement check in GenerateIdentityValues. + val badWaterMark = identityInfo.start - identityInfo.step * 1000 + val tblColMetadata = tblIdCol.metadata + val badMetadata = new MetadataBuilder().withMetadata(tblColMetadata) + .putLong(DeltaSourceUtils.IDENTITY_INFO_HIGHWATERMARK, badWaterMark).build() + tblIdCol.copy(metadata = badMetadata) + case f => f + }) + val updatedMetadata = txn.snapshot.metadata.copy(schemaString = badTblSchema.json) + txn.updateMetadata(updatedMetadata, ignoreDefaultProperties = false) + txn.commit(Nil, DeltaOperations.ManualUpdate) + } + } }