diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
index e1ec6ff6c4f5..ec1e6ccd77e5 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
@@ -57,6 +57,8 @@
 import org.apache.paimon.utils.ChangelogManager;
 import org.apache.paimon.utils.DVMetaCache;
 import org.apache.paimon.utils.FileSystemBranchManager;
+import org.apache.paimon.utils.InternalRowPartitionComputer;
+import org.apache.paimon.utils.PartitionStatisticsReporter;
 import org.apache.paimon.utils.Preconditions;
 import org.apache.paimon.utils.SegmentsCache;
 import org.apache.paimon.utils.SimpleFileReader;
@@ -449,6 +451,13 @@ public ExpireSnapshots newExpireChangelog() {
     @Override
     public TableCommitImpl newCommit(String commitUser) {
         CoreOptions options = coreOptions();
+        InternalRowPartitionComputer partitionComputer =
+                new InternalRowPartitionComputer(
+                        options.partitionDefaultName(),
+                        schema().logicalPartitionType(),
+                        partitionKeys().toArray(new String[0]),
+                        options.legacyPartitionName());
+
         return new TableCommitImpl(
                 store().newCommit(commitUser, this),
                 newExpireRunnable(),
@@ -459,7 +468,25 @@ public TableCommitImpl newCommit(String commitUser) {
                 options.snapshotExpireExecutionMode(),
                 name(),
                 options.forceCreatingSnapshot(),
-                options.fileOperationThreadNum());
+                options.fileOperationThreadNum(),
+                partitionStatisticsReporter(),
+                partitionComputer);
+    }
+
+    @Nullable
+    private PartitionStatisticsReporter partitionStatisticsReporter() {
+        CoreOptions options = coreOptions();
+        if (options.toConfiguration()
+                                .get(CoreOptions.PARTITION_IDLE_TIME_TO_REPORT_STATISTIC)
+                                .toMillis()
+                        <= 0
+                || partitionKeys().isEmpty()
+                || !options.partitionedTableInMetastore()
+                || catalogEnvironment().partitionHandler() == null) {
+            return null;
+        }
+
+        return new PartitionStatisticsReporter(this, catalogEnvironment().partitionHandler());
     }
 
     @Override
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
index 3e778c6bcc5a..3ee3d755bd10 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
@@ -20,6 +20,7 @@

 import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.consumer.ConsumerManager;
+import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.index.IndexPathFactory;
 import org.apache.paimon.io.DataFileMeta;
@@ -37,7 +38,11 @@
 import org.apache.paimon.utils.DataFilePathFactories;
 import org.apache.paimon.utils.ExecutorThreadFactory;
 import org.apache.paimon.utils.FileOperationThreadPool;
+import org.apache.paimon.utils.IOUtils;
 import org.apache.paimon.utils.IndexFilePathFactories;
+import org.apache.paimon.utils.InternalRowPartitionComputer;
+import org.apache.paimon.utils.PartitionPathUtils;
+import org.apache.paimon.utils.PartitionStatisticsReporter;

 import org.apache.paimon.shade.guava30.com.google.common.collect.Lists;
 import org.apache.paimon.shade.guava30.com.google.common.util.concurrent.MoreExecutors;
@@ -54,6 +59,7 @@
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutorService;
@@ -87,6 +93,8 @@ public class TableCommitImpl implements InnerTableCommit {
     private final String tableName;
     private final boolean forceCreatingSnapshot;
     private final ThreadPoolExecutor fileCheckExecutor;
+    @Nullable private final PartitionStatisticsReporter reporter;
+    private final InternalRowPartitionComputer partitionComputer;

     @Nullable private Map<String, String> overwritePartition = null;
     private boolean batchCommitted = false;
@@ -102,7 +110,9 @@ public TableCommitImpl(
             ExpireExecutionMode expireExecutionMode,
             String tableName,
             boolean forceCreatingSnapshot,
-            int threadNum) {
+            int threadNum,
+            PartitionStatisticsReporter partitionStatisticsReporter,
+            InternalRowPartitionComputer partitionComputer) {
         if (partitionExpire != null) {
             commit.withPartitionExpire(partitionExpire);
         }
@@ -111,6 +121,8 @@ public TableCommitImpl(
         this.expireSnapshots = expireSnapshots;
         this.partitionExpire = partitionExpire;
         this.tagAutoManager = tagAutoManager;
+        this.partitionComputer = partitionComputer;
+        this.reporter = partitionStatisticsReporter;

         this.consumerExpireTime = consumerExpireTime;
         this.consumerManager = consumerManager;
@@ -256,6 +268,7 @@ public void commitMultiple(List<ManifestCommittable> committables, boolean checkAppendFiles) {
                     maintainExecutor,
                     newSnapshots > 0 || expireForEmptyCommit);
         }
+        reportPartitionStats(committables);
     }

     public int filterAndCommitMultiple(List<ManifestCommittable> committables) {
@@ -398,6 +411,39 @@ public void expireSnapshots() {
         }
     }

+    private void reportPartitionStats(List<ManifestCommittable> committables) {
+        // pre-checks: nothing to report if reporting is disabled or this is not a batch commit
+
+        if (reporter == null) {
+            return;
+        }
+        if (!batchCommitted) {
+            return;
+        }
+
+        long currentTime = System.currentTimeMillis();
+        LOG.info("Start to report partition statistics");
+        try {
+            // collect distinct partitions from all committables
+            List<BinaryRow> parts =
+                    committables.stream()
+                            .map(ManifestCommittable::fileCommittables)
+                            .flatMap(List::stream)
+                            .map(CommitMessage::partition)
+                            .distinct()
+                            .collect(Collectors.toList());
+            for (BinaryRow part : parts) {
+                LinkedHashMap<String, String> spec = partitionComputer.generatePartValues(part);
+                String partitionPath = PartitionPathUtils.generatePartitionPath(spec);
+                reporter.report(partitionPath, currentTime);
+            }
+        } catch (Throwable e) {
+            LOG.warn("Failed to report partition statistics after commit", e);
+        } finally {
+            IOUtils.closeQuietly(reporter);
+        }
+    }
+
     @Override
     public void close() throws Exception {
         commit.close();
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/WriteHelper.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/WriteHelper.scala
index bc07a310c085..3559d0b1111d 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/WriteHelper.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/WriteHelper.scala
@@ -41,50 +41,10 @@ trait WriteHelper extends Logging {
       return
     }

-    reportToHms(messages)
     batchCreateTag()
     markDoneIfNeeded(messages)
   }

-  private def reportToHms(messages: Seq[CommitMessage]): Unit = {
-    val config = coreOptions.toConfiguration
-    if (
-      config.get(CoreOptions.PARTITION_IDLE_TIME_TO_REPORT_STATISTIC).toMillis <= 0 ||
-      table.partitionKeys.isEmpty ||
-      !coreOptions.partitionedTableInMetastore ||
-      table.catalogEnvironment.partitionHandler() == null
-    ) {
-      return
-    }
-
-    val partitionComputer = new InternalRowPartitionComputer(
-      coreOptions.partitionDefaultName,
-      table.schema.logicalPartitionType,
-      table.partitionKeys.toArray(new Array[String](0)),
-      coreOptions.legacyPartitionName()
-    )
-    val hmsReporter = new PartitionStatisticsReporter(
-      table,
-      table.catalogEnvironment.partitionHandler()
-    )
-
-    val partitions = messages.map(_.partition()).distinct
-    val currentTime = System.currentTimeMillis()
-    try {
-      partitions.foreach {
-        partition =>
-          val partitionPath = PartitionPathUtils.generatePartitionPath(
-            partitionComputer.generatePartValues(partition))
-          hmsReporter.report(partitionPath, currentTime)
-      }
-    } catch {
-      case e: Throwable =>
-        logWarning("Failed to report to hms", e)
-    } finally {
-      hmsReporter.close()
-    }
-  }
-
   private def batchCreateTag(): Unit = {
     if (coreOptions.tagCreationMode() == TagCreationMode.BATCH) {
       val tagCreation = new TagBatchCreation(table)
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DDLWithHiveCatalogTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DDLWithHiveCatalogTestBase.scala
index d22855fd639b..0267fd79dee5 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DDLWithHiveCatalogTestBase.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DDLWithHiveCatalogTestBase.scala
@@ -18,12 +18,17 @@
 package org.apache.paimon.spark.sql

+import org.apache.paimon.catalog.CachingCatalog
+import org.apache.paimon.client.ClientPool
 import org.apache.paimon.fs.Path
-import org.apache.paimon.spark.PaimonHiveTestBase
+import org.apache.paimon.hive.HiveCatalog
+import org.apache.paimon.spark.{PaimonHiveTestBase, SparkCatalog}
 import org.apache.paimon.table.FileStoreTable

+import org.apache.hadoop.hive.metastore.IMetaStoreClient
 import org.apache.spark.SparkConf
 import org.apache.spark.sql.{AnalysisException, Row}
+import org.apache.thrift.TException

 import org.junit.jupiter.api.Assertions

 abstract class DDLWithHiveCatalogTestBase extends PaimonHiveTestBase {
@@ -773,6 +778,69 @@ abstract class DDLWithHiveCatalogTestBase extends PaimonHiveTestBase {
     }
   }

+  test("test report partition statistics: write/compaction") {
+    Seq(paimonHiveCatalogName).foreach {
+      catalogName =>
+        spark.sql(s"USE $catalogName")
+        withTable("paimon_tbl") {
+          spark.sql(s"""
+                       |CREATE TABLE paimon_tbl (id STRING, name STRING, pt STRING)
+                       |USING PAIMON
+                       |TBLPROPERTIES (
+                       |'partition.idle-time-to-report-statistic' = '1s',
+                       |'metastore.partitioned-table' = 'true',
+                       |'compaction.min.file-num' = '1'
+                       |) PARTITIONED BY (pt)
+                       |""".stripMargin)
+          spark.sql("insert into paimon_tbl values('1', 'jack', '2025')")
+
+          val tbl = "paimon_tbl"
+          val hiveCatalog = spark.sessionState.catalogManager.currentCatalog
+            .asInstanceOf[SparkCatalog]
+            .paimonCatalog()
+            .asInstanceOf[CachingCatalog]
+            .wrapped()
+          val clientField = classOf[HiveCatalog].getDeclaredField("clients")
+          clientField.setAccessible(true)
+
+          val clients =
+            clientField.get(hiveCatalog).asInstanceOf[ClientPool[IMetaStoreClient, TException]]
+
+          try {
+            val part = clients.run(
+              (client: IMetaStoreClient) => client.getPartition("default", tbl, "pt=2025"))
+            Assertions.assertEquals(1, part.getParameters.get("numFiles").toInt)
+          } catch {
+            case e: Exception => Assertions.fail(e.getMessage)
+          }
+
+          spark.sql("insert into paimon_tbl values('2', 'jack', '2025')")
+
+          try {
+            val part = clients.run(
+              (client: IMetaStoreClient) => client.getPartition("default", tbl, "pt=2025"))
+            Assertions.assertEquals(2, part.getParameters.get("numFiles").toInt)
+          } catch {
+            case e: Exception => Assertions.fail(e.getMessage)
+          }
+
+          spark.sql(s""" CALL
+            sys.compact(table => 'default.$tbl', partitions => "pt='2025'")""")
+
+          try {
+            val part = clients.run(
+              (client: IMetaStoreClient) => client.getPartition("default", tbl, "pt=2025"))
+            // After compaction the latest Paimon snapshot has only 1 file, but Hive derives
+            // numFiles with its HDFS client, so it is 3 here; the update is still triggered
+            // by Paimon's report action.
+            Assertions.assertEquals(3, part.getParameters.get("numFiles").toInt)
+          } catch {
+            case e: Exception => Assertions.fail(e.getMessage)
+          }
+        }
+    }
+
+  }
+
   def getDatabaseProp(dbName: String, propertyName: String): String = {
     spark
       .sql(s"DESC DATABASE EXTENDED $dbName")