From bb61817af2ecc842bd6b07af50eda98a8eae5aa6 Mon Sep 17 00:00:00 2001
From: "wenchao.wu"
Date: Thu, 30 Apr 2026 18:59:50 +0800
Subject: [PATCH] [spark] Preserve table options for RTAS saveAsTable
 overwrite.

---
 .../PaimonCreateTableAsSelectStrategy.scala   |  92 +++++++----
 .../PaimonCreateTableAsSelectStrategy.scala   |  91 +++++++----
 .../PaimonCreateTableAsSelectStrategy.scala   |  93 +++++++----
 .../PaimonCreateTableAsSelectStrategy.scala   |  91 +++++++----
 .../spark/execution/PaimonStrategy.scala      |   5 +-
 .../PaimonCreateTableAsSelectStrategy.scala   | 150 ++++++++++++++----
 .../spark/sql/DataFrameWriteTestBase.scala    |  31 +++-
 7 files changed, 412 insertions(+), 141 deletions(-)

diff --git a/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/spark/sql/execution/shim/PaimonCreateTableAsSelectStrategy.scala b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/spark/sql/execution/shim/PaimonCreateTableAsSelectStrategy.scala
index cc6258e6eb69..d9a4e9841865 100644
--- a/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/spark/sql/execution/shim/PaimonCreateTableAsSelectStrategy.scala
+++ b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/spark/sql/execution/shim/PaimonCreateTableAsSelectStrategy.scala
@@ -23,10 +23,10 @@ import org.apache.paimon.iceberg.IcebergOptions
 import org.apache.paimon.spark.catalog.FormatTableCatalog
 
 import org.apache.spark.sql.{SparkSession, Strategy}
-import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelect, LogicalPlan}
-import org.apache.spark.sql.connector.catalog.{CatalogV2Util, StagingTableCatalog}
+import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelect, LogicalPlan, ReplaceTableAsSelect}
+import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Identifier, StagingTableCatalog, Table, TableCatalog}
 import org.apache.spark.sql.execution.SparkPlan
-import org.apache.spark.sql.execution.datasources.v2.CreateTableAsSelectExec
+import org.apache.spark.sql.execution.datasources.v2.{CreateTableAsSelectExec, DataSourceV2Relation, ReplaceTableAsSelectExec}
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
 import scala.collection.JavaConverters._
@@ -39,30 +39,8 @@ case class PaimonCreateTableAsSelectStrategy(spark: SparkSession) extends Strate
       case _: StagingTableCatalog =>
         throw new RuntimeException("Paimon can't extend StagingTableCatalog for now.")
       case _ =>
-        val coreOptionKeys = CoreOptions.getOptions.asScala.map(_.key()).toSeq
-
-        // Include Iceberg compatibility options in table properties (fix for DataFrame writer options)
-        val icebergOptionKeys = IcebergOptions.getOptions.asScala.map(_.key()).toSeq
-
-        val allTableOptionKeys = coreOptionKeys ++ icebergOptionKeys
-
-        val (tableOptions, writeOptions) = options.partition {
-          case (key, _) => allTableOptionKeys.contains(key)
-        }
-        val newProps = CatalogV2Util.withDefaultOwnership(props) ++ tableOptions
-
-        val isPartitionedFormatTable = {
-          catalog match {
-            case catalog: FormatTableCatalog =>
-              catalog.isFormatTable(newProps.get("provider").orNull) && parts.nonEmpty
-            case _ => false
-          }
-        }
-
-        if (isPartitionedFormatTable) {
-          throw new UnsupportedOperationException(
-            "Using CTAS with partitioned format table is not supported yet.")
-        }
+        val (newProps, writeOptions) = propertiesWithOptions(props, options)
+        failIfPartitionedFormatTable(catalog, newProps, parts)
 
         CreateTableAsSelectExec(
           catalog,
@@ -75,6 +53,66 @@ case class PaimonCreateTableAsSelectStrategy(spark: SparkSession) extends Strate
           ifNotExists
         ) :: Nil
     }
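+    // ReplaceTableAsSelect is what DataFrameWriter.saveAsTable plans for SaveMode.Overwrite
+    // on a v2 catalog, so route it through the same option splitting as CTAS to keep core
+    // options on the replaced table.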
+    case ReplaceTableAsSelect(catalog, ident, parts, query, props, options, orCreate) =>
+      catalog match {
+        case _: StagingTableCatalog =>
+          throw new RuntimeException("Paimon can't extend StagingTableCatalog for now.")
+        case _ =>
+          val (newProps, writeOptions) = propertiesWithOptions(props, options)
+
+          ReplaceTableAsSelectExec(
+            catalog,
+            ident,
+            parts,
+            query,
+            planLater(query),
+            newProps,
+            new CaseInsensitiveStringMap(writeOptions.asJava),
+            orCreate,
+            invalidateCache
+          ) :: Nil
+      }
     case _ => Nil
   }
+
+  private lazy val tableOptionKeys: Seq[String] = {
+    val coreOptionKeys = CoreOptions.getOptions.asScala.map(_.key()).toSeq
+
+    // Include Iceberg compatibility options in table properties (fix for DataFrame writer options)
+    val icebergOptionKeys = IcebergOptions.getOptions.asScala.map(_.key()).toSeq
+
+    coreOptionKeys ++ icebergOptionKeys
+  }
+
+  private def propertiesWithOptions(
+      props: Map[String, String],
+      options: Map[String, String]): (Map[String, String], Map[String, String]) = {
+    val (tableOptions, writeOptions) = options.partition {
+      case (key, _) => tableOptionKeys.contains(key)
+    }
+    (CatalogV2Util.withDefaultOwnership(props) ++ tableOptions, writeOptions)
+  }
+
+  private def failIfPartitionedFormatTable(
+      catalog: TableCatalog,
+      properties: Map[String, String],
+      parts: Seq[_]): Unit = {
+    val isPartitionedFormatTable = {
+      catalog match {
+        case catalog: FormatTableCatalog =>
+          catalog.isFormatTable(properties.get("provider").orNull) && parts.nonEmpty
+        case _ => false
+      }
+    }
+
+    if (isPartitionedFormatTable) {
+      throw new UnsupportedOperationException(
+        "Using CTAS with partitioned format table is not supported yet.")
+    }
+  }
+
+  private def invalidateCache(catalog: TableCatalog, table: Table, ident: Identifier): Unit = {
+    val v2Relation = DataSourceV2Relation.create(table, Some(catalog), Some(ident))
+    spark.sharedState.cacheManager.uncacheQuery(spark, v2Relation, true, false)
+  }
 }
diff --git a/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/spark/sql/execution/shim/PaimonCreateTableAsSelectStrategy.scala b/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/spark/sql/execution/shim/PaimonCreateTableAsSelectStrategy.scala
index a09996f1534a..3c4b192850dd 100644
--- a/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/spark/sql/execution/shim/PaimonCreateTableAsSelectStrategy.scala
+++ b/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/spark/sql/execution/shim/PaimonCreateTableAsSelectStrategy.scala
@@ -21,14 +21,13 @@ import org.apache.paimon.CoreOptions
 import org.apache.paimon.iceberg.IcebergOptions
 import org.apache.paimon.spark.SparkCatalog
-import org.apache.paimon.spark.catalog.FormatTableCatalog
 
 import org.apache.spark.sql.{SparkSession, Strategy}
 import org.apache.spark.sql.catalyst.analysis.ResolvedDBObjectName
-import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelect, LogicalPlan, TableSpec}
-import org.apache.spark.sql.connector.catalog.StagingTableCatalog
+import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelect, LogicalPlan, ReplaceTableAsSelect, TableSpec}
+import org.apache.spark.sql.connector.catalog.{Identifier, StagingTableCatalog, Table, TableCatalog}
 import org.apache.spark.sql.execution.{PaimonStrategyHelper, SparkPlan}
-import org.apache.spark.sql.execution.datasources.v2.CreateTableAsSelectExec
+import org.apache.spark.sql.execution.datasources.v2.{CreateTableAsSelectExec, DataSourceV2Relation, ReplaceTableAsSelectExec}
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
 import scala.collection.JavaConverters._
@@ -51,30 +50,8 @@ case class PaimonCreateTableAsSelectStrategy(spark: SparkSession)
       case _: StagingTableCatalog =>
         throw new RuntimeException("Paimon can't extend StagingTableCatalog for now.")
       case _ =>
-        val coreOptionKeys = CoreOptions.getOptions.asScala.map(_.key()).toSeq
-
-        // Include Iceberg compatibility options in table properties (fix for DataFrame writer options)
-        val icebergOptionKeys = IcebergOptions.getOptions.asScala.map(_.key()).toSeq
-
-        val allTableOptionKeys = coreOptionKeys ++ icebergOptionKeys
-
-        val (tableOptions, writeOptions) = options.partition {
-          case (key, _) => allTableOptionKeys.contains(key)
-        }
-        val newTableSpec = tableSpec.copy(properties = tableSpec.properties ++ tableOptions)
-
-        val isPartitionedFormatTable = {
-          catalog match {
-            case catalog: FormatTableCatalog =>
-              catalog.isFormatTable(newTableSpec.provider.orNull) && parts.nonEmpty
-            case _ => false
-          }
-        }
-
-        if (isPartitionedFormatTable) {
-          throw new UnsupportedOperationException(
-            "Using CTAS with partitioned format table is not supported yet.")
-        }
+        val (newTableSpec, writeOptions) = tableSpecWithOptions(tableSpec, options)
+        failIfPartitionedFormatTable(catalog, newTableSpec, parts)
 
         CreateTableAsSelectExec(
           catalog.asTableCatalog,
@@ -87,6 +64,64 @@ case class PaimonCreateTableAsSelectStrategy(spark: SparkSession)
           ifNotExists
         ) :: Nil
     }
+    case ReplaceTableAsSelect(
+          ResolvedDBObjectName(catalog: SparkCatalog, ident),
+          parts,
+          query,
+          tableSpec: TableSpec,
+          options,
+          orCreate) =>
+      catalog match {
+        case _: StagingTableCatalog =>
+          throw new RuntimeException("Paimon can't extend StagingTableCatalog for now.")
+        case _ =>
+          val (newTableSpec, writeOptions) = tableSpecWithOptions(tableSpec, options)
+
+          ReplaceTableAsSelectExec(
+            catalog.asTableCatalog,
+            ident.asIdentifier,
+            parts,
+            query,
+            planLater(query),
+            qualifyLocInTableSpec(newTableSpec),
+            new CaseInsensitiveStringMap(writeOptions.asJava),
+            orCreate,
+            invalidateCache
+          ) :: Nil
+      }
     case _ => Nil
   }
+
+  private lazy val tableOptionKeys: Seq[String] = {
+    val coreOptionKeys = CoreOptions.getOptions.asScala.map(_.key()).toSeq
+
+    // Include Iceberg compatibility options in table properties (fix for DataFrame writer options)
+    val icebergOptionKeys = IcebergOptions.getOptions.asScala.map(_.key()).toSeq
+
+    coreOptionKeys ++ icebergOptionKeys
+  }
+
+  private def tableSpecWithOptions(
+      tableSpec: TableSpec,
+      options: Map[String, String]): (TableSpec, Map[String, String]) = {
+    val (tableOptions, writeOptions) = options.partition {
+      case (key, _) => tableOptionKeys.contains(key)
+    }
+    (tableSpec.copy(properties = tableSpec.properties ++ tableOptions), writeOptions)
+  }
+
+  private def failIfPartitionedFormatTable(
+      catalog: SparkCatalog,
+      tableSpec: TableSpec,
+      parts: Seq[_]): Unit = {
+    if (catalog.isFormatTable(tableSpec.provider.orNull) && parts.nonEmpty) {
+      throw new UnsupportedOperationException(
+        "Using CTAS with partitioned format table is not supported yet.")
+    }
+  }
+
+  private def invalidateCache(catalog: TableCatalog, table: Table, ident: Identifier): Unit = {
+    val v2Relation = DataSourceV2Relation.create(table, Some(catalog), Some(ident))
+    spark.sharedState.cacheManager.uncacheQuery(spark, v2Relation, true, false)
+  }
 }
diff --git a/paimon-spark/paimon-spark-3.4/src/main/scala/org/apache/spark/sql/execution/shim/PaimonCreateTableAsSelectStrategy.scala b/paimon-spark/paimon-spark-3.4/src/main/scala/org/apache/spark/sql/execution/shim/PaimonCreateTableAsSelectStrategy.scala
index 4a82f35188dd..319eb88cff26 100644
--- a/paimon-spark/paimon-spark-3.4/src/main/scala/org/apache/spark/sql/execution/shim/PaimonCreateTableAsSelectStrategy.scala
+++ b/paimon-spark/paimon-spark-3.4/src/main/scala/org/apache/spark/sql/execution/shim/PaimonCreateTableAsSelectStrategy.scala
@@ -21,14 +21,13 @@ import org.apache.paimon.CoreOptions
 import org.apache.paimon.iceberg.IcebergOptions
 import org.apache.paimon.spark.SparkCatalog
-import org.apache.paimon.spark.catalog.FormatTableCatalog
 
 import org.apache.spark.sql.{SparkSession, Strategy}
 import org.apache.spark.sql.catalyst.analysis.ResolvedIdentifier
-import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelect, LogicalPlan, TableSpec}
-import org.apache.spark.sql.connector.catalog.StagingTableCatalog
+import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelect, LogicalPlan, ReplaceTableAsSelect, TableSpec}
+import org.apache.spark.sql.connector.catalog.{Identifier, StagingTableCatalog, Table, TableCatalog}
 import org.apache.spark.sql.execution.{PaimonStrategyHelper, SparkPlan}
-import org.apache.spark.sql.execution.datasources.v2.CreateTableAsSelectExec
+import org.apache.spark.sql.execution.datasources.v2.{CreateTableAsSelectExec, DataSourceV2Relation, ReplaceTableAsSelectExec}
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
 import scala.collection.JavaConverters._
@@ -53,30 +52,8 @@ case class PaimonCreateTableAsSelectStrategy(spark: SparkSession)
       case _: StagingTableCatalog =>
         throw new RuntimeException("Paimon can't extend StagingTableCatalog for now.")
       case _ =>
-        val coreOptionKeys = CoreOptions.getOptions.asScala.map(_.key()).toSeq
-
-        // Include Iceberg compatibility options in table properties (fix for DataFrame writer options)
-        val icebergOptionKeys = IcebergOptions.getOptions.asScala.map(_.key()).toSeq
-
-        val allTableOptionKeys = coreOptionKeys ++ icebergOptionKeys
-
-        val (tableOptions, writeOptions) = options.partition {
-          case (key, _) => allTableOptionKeys.contains(key)
-        }
-        val newTableSpec = tableSpec.copy(properties = tableSpec.properties ++ tableOptions)
-
-        val isPartitionedFormatTable = {
-          catalog match {
-            case catalog: FormatTableCatalog =>
-              catalog.isFormatTable(newTableSpec.provider.orNull) && parts.nonEmpty
-            case _ => false
-          }
-        }
-
-        if (isPartitionedFormatTable) {
-          throw new UnsupportedOperationException(
-            "Using CTAS with partitioned format table is not supported yet.")
-        }
+        val (newTableSpec, writeOptions) = tableSpecWithOptions(tableSpec, options)
+        failIfPartitionedFormatTable(catalog, newTableSpec, parts)
 
         CreateTableAsSelectExec(
           catalog.asTableCatalog,
@@ -89,6 +66,66 @@ case class PaimonCreateTableAsSelectStrategy(spark: SparkSession)
           ifNotExists
         ) :: Nil
     }
+    case ReplaceTableAsSelect(
+          ResolvedIdentifier(catalog: SparkCatalog, ident),
+          parts,
+          query,
+          tableSpec: TableSpec,
+          options,
+          orCreate,
+          analyzedQuery) =>
+      assert(analyzedQuery.isDefined)
+      catalog match {
+        case _: StagingTableCatalog =>
+          throw new RuntimeException("Paimon can't extend StagingTableCatalog for now.")
+        case _ =>
+          val (newTableSpec, writeOptions) = tableSpecWithOptions(tableSpec, options)
+
+          ReplaceTableAsSelectExec(
+            catalog.asTableCatalog,
+            ident,
+            parts,
+            analyzedQuery.get,
+            planLater(query),
+            qualifyLocInTableSpec(newTableSpec),
+            new CaseInsensitiveStringMap(writeOptions.asJava),
+            orCreate,
+            invalidateCache
+          ) :: Nil
+      }
     case _ => Nil
   }
+
+  private lazy val tableOptionKeys: Seq[String] = {
+    val coreOptionKeys = CoreOptions.getOptions.asScala.map(_.key()).toSeq
+
+    // Include Iceberg compatibility options in table properties (fix for DataFrame writer options)
+    val icebergOptionKeys = IcebergOptions.getOptions.asScala.map(_.key()).toSeq
+
+    coreOptionKeys ++ icebergOptionKeys
+  }
+
+  private def tableSpecWithOptions(
+      tableSpec: TableSpec,
+      options: Map[String, String]): (TableSpec, Map[String, String]) = {
+    val (tableOptions, writeOptions) = options.partition {
+      case (key, _) => tableOptionKeys.contains(key)
+    }
+    (tableSpec.copy(properties = tableSpec.properties ++ tableOptions), writeOptions)
+  }
+
+  private def failIfPartitionedFormatTable(
+      catalog: SparkCatalog,
+      tableSpec: TableSpec,
+      parts: Seq[_]): Unit = {
+    if (catalog.isFormatTable(tableSpec.provider.orNull) && parts.nonEmpty) {
+      throw new UnsupportedOperationException(
+        "Using CTAS with partitioned format table is not supported yet.")
+    }
+  }
+
+  private def invalidateCache(catalog: TableCatalog, table: Table, ident: Identifier): Unit = {
+    val v2Relation = DataSourceV2Relation.create(table, Some(catalog), Some(ident))
+    spark.sharedState.cacheManager.uncacheQuery(spark, v2Relation, true, false)
+  }
 }
diff --git a/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/spark/sql/execution/shim/PaimonCreateTableAsSelectStrategy.scala b/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/spark/sql/execution/shim/PaimonCreateTableAsSelectStrategy.scala
index 61e25b7c16a9..f324bc0bda01 100644
--- a/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/spark/sql/execution/shim/PaimonCreateTableAsSelectStrategy.scala
+++ b/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/spark/sql/execution/shim/PaimonCreateTableAsSelectStrategy.scala
@@ -21,14 +21,13 @@ import org.apache.paimon.CoreOptions
 import org.apache.paimon.iceberg.IcebergOptions
 import org.apache.paimon.spark.SparkCatalog
-import org.apache.paimon.spark.catalog.FormatTableCatalog
 
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.analysis.ResolvedIdentifier
-import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelect, LogicalPlan, TableSpec}
-import org.apache.spark.sql.connector.catalog.StagingTableCatalog
+import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelect, LogicalPlan, ReplaceTableAsSelect, TableSpec}
+import org.apache.spark.sql.connector.catalog.{Identifier, StagingTableCatalog, Table, TableCatalog}
 import org.apache.spark.sql.execution.{PaimonStrategyHelper, SparkPlan, SparkStrategy}
-import org.apache.spark.sql.execution.datasources.v2.CreateTableAsSelectExec
+import org.apache.spark.sql.execution.datasources.v2.{CreateTableAsSelectExec, DataSourceV2Relation, ReplaceTableAsSelectExec}
 
 import scala.collection.JavaConverters._
@@ -51,30 +50,8 @@ case class PaimonCreateTableAsSelectStrategy(spark: SparkSession)
       case _: StagingTableCatalog =>
         throw new RuntimeException("Paimon can't extend StagingTableCatalog for now.")
       case _ =>
-        val coreOptionKeys = CoreOptions.getOptions.asScala.map(_.key()).toSeq
-
-        // Include Iceberg compatibility options in table properties (fix for DataFrame writer options)
-        val icebergOptionKeys = IcebergOptions.getOptions.asScala.map(_.key()).toSeq
-
-        val allTableOptionKeys = coreOptionKeys ++ icebergOptionKeys
-
-        val (tableOptions, writeOptions) = options.partition {
-          case (key, _) => allTableOptionKeys.contains(key)
-        }
-        val newTableSpec = tableSpec.copy(properties = tableSpec.properties ++ tableOptions)
-
-        val isPartitionedFormatTable = {
-          catalog match {
-            case catalog: FormatTableCatalog =>
-              catalog.isFormatTable(newTableSpec.provider.orNull) && parts.nonEmpty
-            case _ => false
-          }
-        }
-
-        if (isPartitionedFormatTable) {
-          throw new UnsupportedOperationException(
-            "Using CTAS with partitioned format table is not supported yet.")
-        }
+        val (newTableSpec, writeOptions) = tableSpecWithOptions(tableSpec, options)
+        failIfPartitionedFormatTable(catalog, newTableSpec, parts)
 
         CreateTableAsSelectExec(
           catalog.asTableCatalog,
@@ -85,6 +62,64 @@ case class PaimonCreateTableAsSelectStrategy(spark: SparkSession)
           writeOptions,
           ifNotExists) :: Nil
     }
+    case ReplaceTableAsSelect(
+          ResolvedIdentifier(catalog: SparkCatalog, ident),
+          parts,
+          query,
+          tableSpec: TableSpec,
+          options,
+          orCreate,
+          true) =>
+      catalog match {
+        case _: StagingTableCatalog =>
+          throw new RuntimeException("Paimon can't extend StagingTableCatalog for now.")
+        case _ =>
+          val (newTableSpec, writeOptions) = tableSpecWithOptions(tableSpec, options)
+
+          ReplaceTableAsSelectExec(
+            catalog.asTableCatalog,
+            ident,
+            parts,
+            query,
+            qualifyLocInTableSpec(newTableSpec),
+            writeOptions,
+            orCreate,
+            invalidateCache) :: Nil
+      }
     case _ => Nil
   }
+
+  private lazy val tableOptionKeys: Seq[String] = {
+    val coreOptionKeys = CoreOptions.getOptions.asScala.map(_.key()).toSeq
+
+    // Include Iceberg compatibility options in table properties (fix for DataFrame writer options)
+    val icebergOptionKeys = IcebergOptions.getOptions.asScala.map(_.key()).toSeq
+
+    coreOptionKeys ++ icebergOptionKeys
+  }
+
+  private def tableSpecWithOptions(
+      tableSpec: TableSpec,
+      options: Map[String, String]): (TableSpec, Map[String, String]) = {
+    val (tableOptions, writeOptions) = options.partition {
+      case (key, _) => tableOptionKeys.contains(key)
+    }
+    (tableSpec.copy(properties = tableSpec.properties ++ tableOptions), writeOptions)
+  }
+
+  private def failIfPartitionedFormatTable(
+      catalog: SparkCatalog,
+      tableSpec: TableSpec,
+      parts: Seq[_]): Unit = {
+    if (catalog.isFormatTable(tableSpec.provider.orNull) && parts.nonEmpty) {
+      throw new UnsupportedOperationException(
+        "Using CTAS with partitioned format table is not supported yet.")
+    }
+  }
+
+  private def invalidateCache(catalog: TableCatalog, table: Table, ident: Identifier): Unit = {
+    val v2Relation = DataSourceV2Relation.create(table, Some(catalog), Some(ident))
+    // Spark 4.0 splits SparkSession into an API interface and the classic implementation;
+    // SharedState (and its CacheManager) lives on the classic session.
+    val classicSpark = spark.asInstanceOf[org.apache.spark.sql.classic.SparkSession]
+    classicSpark.sharedState.cacheManager.uncacheQuery(classicSpark, v2Relation, true, false)
+  }
 }
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PaimonStrategy.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PaimonStrategy.scala
index 3be8b5a74e79..caba039782e0 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PaimonStrategy.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PaimonStrategy.scala
@@ -29,7 +29,7 @@ import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.{ResolvedNamespace, ResolvedTable}
 import org.apache.spark.sql.catalyst.expressions.{Expression, GenericInternalRow, PredicateHelper}
-import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelect, DescribeRelation, LogicalPlan, ShowCreateTable}
+import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelect, DescribeRelation, LogicalPlan, ReplaceTableAsSelect, ShowCreateTable}
 import org.apache.spark.sql.connector.catalog.{Identifier, PaimonLookupCatalog, TableCatalog}
 import org.apache.spark.sql.execution.{PaimonDescribeTableExec, SparkPlan, SparkStrategy}
 import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Implicits, DataSourceV2Relation}
@@ -51,6 +51,9 @@ case class PaimonStrategy(spark: SparkSession)
     case ctas: CreateTableAsSelect =>
       PaimonCreateTableAsSelectStrategy(spark)(ctas)
 
+    case rtas: ReplaceTableAsSelect =>
+      PaimonCreateTableAsSelectStrategy(spark)(rtas)
+
     case c @ PaimonCallCommand(procedure, args) =>
       val input = buildInternalRow(args)
       PaimonCallExec(c.output, procedure, input) :: Nil
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/shim/PaimonCreateTableAsSelectStrategy.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/shim/PaimonCreateTableAsSelectStrategy.scala
index 61e25b7c16a9..c10dd0616005 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/shim/PaimonCreateTableAsSelectStrategy.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/shim/PaimonCreateTableAsSelectStrategy.scala
@@ -21,14 +21,13 @@ import org.apache.paimon.CoreOptions
 import org.apache.paimon.iceberg.IcebergOptions
 import org.apache.paimon.spark.SparkCatalog
-import org.apache.paimon.spark.catalog.FormatTableCatalog
 
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.analysis.ResolvedIdentifier
-import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelect, LogicalPlan, TableSpec}
-import org.apache.spark.sql.connector.catalog.StagingTableCatalog
+import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelect, LogicalPlan, ReplaceTableAsSelect, TableSpec}
+import org.apache.spark.sql.connector.catalog.{Identifier, StagingTableCatalog, Table, TableCatalog}
 import org.apache.spark.sql.execution.{PaimonStrategyHelper, SparkPlan, SparkStrategy}
-import org.apache.spark.sql.execution.datasources.v2.CreateTableAsSelectExec
+import org.apache.spark.sql.execution.datasources.v2.{CreateTableAsSelectExec, DataSourceV2Relation, ReplaceTableAsSelectExec}
 
 import scala.collection.JavaConverters._
@@ -51,30 +50,8 @@ case class PaimonCreateTableAsSelectStrategy(spark: SparkSession)
       case _: StagingTableCatalog =>
         throw new RuntimeException("Paimon can't extend StagingTableCatalog for now.")
       case _ =>
-        val coreOptionKeys = CoreOptions.getOptions.asScala.map(_.key()).toSeq
-
-        // Include Iceberg compatibility options in table properties (fix for DataFrame writer options)
-        val icebergOptionKeys = IcebergOptions.getOptions.asScala.map(_.key()).toSeq
-
-        val allTableOptionKeys = coreOptionKeys ++ icebergOptionKeys
-
-        val (tableOptions, writeOptions) = options.partition {
-          case (key, _) => allTableOptionKeys.contains(key)
-        }
-        val newTableSpec = tableSpec.copy(properties = tableSpec.properties ++ tableOptions)
-
-        val isPartitionedFormatTable = {
-          catalog match {
-            case catalog: FormatTableCatalog =>
-              catalog.isFormatTable(newTableSpec.provider.orNull) && parts.nonEmpty
-            case _ => false
-          }
-        }
-
-        if (isPartitionedFormatTable) {
-          throw new UnsupportedOperationException(
-            "Using CTAS with partitioned format table is not supported yet.")
-        }
+        val (newTableSpec, writeOptions) = tableSpecWithOptions(tableSpec, options)
+        failIfPartitionedFormatTable(catalog, newTableSpec, parts)
 
         CreateTableAsSelectExec(
           catalog.asTableCatalog,
@@ -85,6 +62,123 @@ case class PaimonCreateTableAsSelectStrategy(spark: SparkSession)
           writeOptions,
           ifNotExists) :: Nil
     }
+    case ReplaceTableAsSelect(
+          ResolvedIdentifier(catalog: SparkCatalog, ident),
+          parts,
+          query,
+          tableSpec: TableSpec,
+          options,
+          orCreate,
+          true) =>
+      catalog match {
+        case _: StagingTableCatalog =>
+          throw new RuntimeException("Paimon can't extend StagingTableCatalog for now.")
+        case _ =>
+          val (newTableSpec, writeOptions) = tableSpecWithOptions(tableSpec, options)
+
+          replaceTableAsSelectExec(
+            catalog.asTableCatalog,
+            ident,
+            parts,
+            query,
+            qualifyLocInTableSpec(newTableSpec),
+            writeOptions,
+            orCreate) :: Nil
+      }
     case _ => Nil
   }
+
+  private lazy val tableOptionKeys: Seq[String] = {
+    val coreOptionKeys = CoreOptions.getOptions.asScala.map(_.key()).toSeq
+
+    // Include Iceberg compatibility options in table properties (fix for DataFrame writer options)
+    val icebergOptionKeys = IcebergOptions.getOptions.asScala.map(_.key()).toSeq
+
+    coreOptionKeys ++ icebergOptionKeys
+  }
+
+  private def tableSpecWithOptions(
+      tableSpec: TableSpec,
+      options: Map[String, String]): (TableSpec, Map[String, String]) = {
+    val (tableOptions, writeOptions) = options.partition {
+      case (key, _) => tableOptionKeys.contains(key)
+    }
+    (tableSpec.copy(properties = tableSpec.properties ++ tableOptions), writeOptions)
+  }
+
+  private def failIfPartitionedFormatTable(
+      catalog: SparkCatalog,
+      tableSpec: TableSpec,
+      parts: Seq[_]): Unit = {
+    if (catalog.isFormatTable(tableSpec.provider.orNull) && parts.nonEmpty) {
+      throw new UnsupportedOperationException(
+        "Using CTAS with partitioned format table is not supported yet.")
+    }
+  }
+
+  private def replaceTableAsSelectExec(
+      catalog: TableCatalog,
+      ident: Identifier,
+      parts: Seq[_],
+      query: LogicalPlan,
+      tableSpec: TableSpec,
+      writeOptions: Map[String, String],
+      orCreate: Boolean): SparkPlan = {
+    // ReplaceTableAsSelectExec's constructor shape differs across the Spark versions this
+    // common module is compiled against, so locate the 8-arg constructor reflectively.
+    val constructor =
+      classOf[ReplaceTableAsSelectExec].getConstructors.find(_.getParameterTypes.length == 8).get
+    // Spark 3.5 uses Function3(catalog, table, ident); Spark 4.1 uses Function2(catalog, ident).
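+    // Wrap invalidateCache in whichever callback arity the constructor expects. A Scala
+    // lambda compiles to a FunctionN instance, so the AnyRef cast passes cleanly through
+    // the reflective newInstance call below.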
+    val invalidate =
+      if (classOf[Function3[_, _, _, _]].isAssignableFrom(constructor.getParameterTypes.last)) {
+        (
+          (catalog: TableCatalog, table: Table, ident: Identifier) =>
+            invalidateCache(catalog, table, ident)).asInstanceOf[AnyRef]
+      } else {
+        ((catalog: TableCatalog, ident: Identifier) => invalidateCache(catalog, ident))
+          .asInstanceOf[AnyRef]
+      }
+
+    constructor
+      .newInstance(
+        catalog,
+        ident,
+        parts,
+        query,
+        tableSpec,
+        writeOptions,
+        java.lang.Boolean.valueOf(orCreate),
+        invalidate)
+      .asInstanceOf[SparkPlan]
+  }
+
+  private def invalidateCache(catalog: TableCatalog, ident: Identifier): Unit = {
+    invalidateCache(catalog, catalog.loadTable(ident), ident)
+  }
+
+  private def invalidateCache(catalog: TableCatalog, table: Table, ident: Identifier): Unit = {
+    val v2Relation = DataSourceV2Relation.create(table, Some(catalog), Some(ident))
+    // CacheManager#uncacheQuery has shifted signatures across Spark releases, so resolve the
+    // 4-arg overload (session, plan, cascade, blocking) reflectively by shape.
+    val sparkObject = spark.asInstanceOf[AnyRef]
+    val sharedState = sparkObject.getClass.getMethod("sharedState").invoke(sparkObject)
+    val cacheManager = sharedState.getClass.getMethod("cacheManager").invoke(sharedState)
+    val uncacheQuery =
+      cacheManager.getClass.getMethods
+        .find {
+          method =>
+            val parameterTypes = method.getParameterTypes
+            method.getName == "uncacheQuery" &&
+            parameterTypes.length == 4 &&
+            parameterTypes(0).isAssignableFrom(sparkObject.getClass) &&
+            parameterTypes(1).isAssignableFrom(v2Relation.getClass)
+        }
+        .getOrElse(
+          throw new NoSuchMethodException(
+            "CacheManager.uncacheQuery(SparkSession, LogicalPlan, ...)")
+        )
+
+    uncacheQuery.invoke(
+      cacheManager,
+      sparkObject,
+      v2Relation,
+      java.lang.Boolean.TRUE,
+      java.lang.Boolean.FALSE)
+  }
 }
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DataFrameWriteTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DataFrameWriteTestBase.scala
index b25e41a3fb42..e950ce93bb58 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DataFrameWriteTestBase.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DataFrameWriteTestBase.scala
@@ -150,13 +150,17 @@ abstract class DataFrameWriteTestBase extends PaimonSparkTestBase {
             sql("SHOW PARTITIONS t"),
             Seq(Row("pt=p1"))
           )
+          val replacedTable = loadTable("t")
+          Assertions.assertEquals(2, replacedTable.primaryKeys().size())
+          Assertions.assertEquals("a", replacedTable.primaryKeys().get(0))
+          Assertions.assertEquals("pt", replacedTable.primaryKeys().get(1))
         }
       }
     }
   }
 
   test("Paimon: DataFrameWrite.saveAsTable") {
-    withTable("test_ctas") {
+    withTable("test_ctas", "test_rtas") {
       Seq((1L, "x1"), (2L, "x2"))
         .toDF("a", "b")
         .write
@@ -180,6 +184,31 @@
       // non-core options should not be here.
       Assertions.assertFalse(paimonTable.options().containsKey("write.merge-schema"))
       Assertions.assertFalse(paimonTable.options().containsKey("write.merge-schema.explicit-cast"))
+
+      Seq((3L, "x3"), (4L, "x4"))
+        .toDF("a", "b")
+        .write
+        .format("paimon")
+        .mode("overwrite")
+        .option("primary-key", "a")
+        .option("bucket", "-1")
+        .option("target-file-size", "256MB")
+        .option("write.merge-schema", "true")
+        .option("write.merge-schema.explicit-cast", "true")
+        .saveAsTable("test_rtas")
+
+      val replacedTable = loadTable("test_rtas")
+      Assertions.assertEquals(1, replacedTable.primaryKeys().size())
+      Assertions.assertEquals("a", replacedTable.primaryKeys().get(0))
+
+      // core options passed as writer options should be preserved as table options
+      Assertions.assertEquals("-1", replacedTable.options().get("bucket"))
+      Assertions.assertEquals("256MB", replacedTable.options().get("target-file-size"))
+
+      // non-core options should not be here.
+      Assertions.assertFalse(replacedTable.options().containsKey("write.merge-schema"))
+      Assertions.assertFalse(
+        replacedTable.options().containsKey("write.merge-schema.explicit-cast"))
     }
   }