@@ -57,6 +57,8 @@
import org.apache.paimon.utils.ChangelogManager;
import org.apache.paimon.utils.DVMetaCache;
import org.apache.paimon.utils.FileSystemBranchManager;
import org.apache.paimon.utils.InternalRowPartitionComputer;
import org.apache.paimon.utils.PartitionStatisticsReporter;
import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.SegmentsCache;
import org.apache.paimon.utils.SimpleFileReader;
@@ -449,6 +451,13 @@ public ExpireSnapshots newExpireChangelog() {
@Override
public TableCommitImpl newCommit(String commitUser) {
CoreOptions options = coreOptions();
InternalRowPartitionComputer partitionComputer =
new InternalRowPartitionComputer(
options.partitionDefaultName(),
schema().logicalPartitionType(),
partitionKeys().toArray(new String[0]),
options.legacyPartitionName());

return new TableCommitImpl(
store().newCommit(commitUser, this),
newExpireRunnable(),
@@ -459,7 +468,25 @@ public TableCommitImpl newCommit(String commitUser) {
options.snapshotExpireExecutionMode(),
name(),
options.forceCreatingSnapshot(),
options.fileOperationThreadNum());
options.fileOperationThreadNum(),
partitionStatisticsReporter(),
partitionComputer);
}

@Nullable
private PartitionStatisticsReporter partitionStatisticsReporter() {
CoreOptions options = coreOptions();
if (options.toConfiguration()
.get(CoreOptions.PARTITION_IDLE_TIME_TO_REPORT_STATISTIC)
.toMillis()
<= 0
|| partitionKeys().isEmpty()
|| !options.partitionedTableInMetastore()
|| catalogEnvironment().partitionHandler() == null) {
return null;
}

return new PartitionStatisticsReporter(this, catalogEnvironment().partitionHandler());
}
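
For context, the reporter is created only when every guard above passes: a positive idle-time option, non-empty partition keys, `metastore.partitioned-table` enabled, and a non-null partition handler in the catalog environment. Below is a minimal illustrative sketch (not part of this change) of the two option-based guards, assuming the table properties used in the Spark test further down:

```java
import org.apache.paimon.CoreOptions;

import java.util.HashMap;
import java.util.Map;

public class ReporterGateSketch {
    public static void main(String[] args) {
        // Same table properties as the Spark test below.
        Map<String, String> props = new HashMap<>();
        props.put("partition.idle-time-to-report-statistic", "1s");
        props.put("metastore.partitioned-table", "true");
        CoreOptions options = new CoreOptions(props);

        // Guard 1: the idle-time option must be positive.
        boolean idleTimeEnabled =
                options.toConfiguration()
                                .get(CoreOptions.PARTITION_IDLE_TIME_TO_REPORT_STATISTIC)
                                .toMillis()
                        > 0;
        // Guard 2: the table must be registered as partitioned in the metastore.
        boolean inMetastore = options.partitionedTableInMetastore();

        // The remaining guards (non-empty partition keys, a non-null
        // partitionHandler) depend on the table instance itself.
        System.out.println(idleTimeEnabled && inMetastore); // true
    }
}
```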

@Override
@@ -20,6 +20,7 @@

import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.consumer.ConsumerManager;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.fs.Path;
import org.apache.paimon.index.IndexPathFactory;
import org.apache.paimon.io.DataFileMeta;
@@ -37,7 +38,11 @@
import org.apache.paimon.utils.DataFilePathFactories;
import org.apache.paimon.utils.ExecutorThreadFactory;
import org.apache.paimon.utils.FileOperationThreadPool;
import org.apache.paimon.utils.IOUtils;
import org.apache.paimon.utils.IndexFilePathFactories;
import org.apache.paimon.utils.InternalRowPartitionComputer;
import org.apache.paimon.utils.PartitionPathUtils;
import org.apache.paimon.utils.PartitionStatisticsReporter;

import org.apache.paimon.shade.guava30.com.google.common.collect.Lists;
import org.apache.paimon.shade.guava30.com.google.common.util.concurrent.MoreExecutors;
@@ -54,6 +59,7 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
@@ -87,6 +93,8 @@ public class TableCommitImpl implements InnerTableCommit {
private final String tableName;
private final boolean forceCreatingSnapshot;
private final ThreadPoolExecutor fileCheckExecutor;
@Nullable private final PartitionStatisticsReporter reporter;
private final InternalRowPartitionComputer partitionComputer;

@Nullable private Map<String, String> overwritePartition = null;
private boolean batchCommitted = false;
@@ -102,7 +110,9 @@ public TableCommitImpl(
ExpireExecutionMode expireExecutionMode,
String tableName,
boolean forceCreatingSnapshot,
int threadNum) {
int threadNum,
PartitionStatisticsReporter partitionStatisticsReporter,
InternalRowPartitionComputer partitionComputer) {
if (partitionExpire != null) {
commit.withPartitionExpire(partitionExpire);
}
@@ -111,6 +121,8 @@
this.expireSnapshots = expireSnapshots;
this.partitionExpire = partitionExpire;
this.tagAutoManager = tagAutoManager;
this.partitionComputer = partitionComputer;
this.reporter = partitionStatisticsReporter;

this.consumerExpireTime = consumerExpireTime;
this.consumerManager = consumerManager;
@@ -256,6 +268,7 @@ public void commitMultiple(List<ManifestCommittable> committables, boolean check
maintainExecutor,
newSnapshots > 0 || expireForEmptyCommit);
}
reportPartitionStats(committables);
}

public int filterAndCommitMultiple(List<ManifestCommittable> committables) {
@@ -398,6 +411,39 @@ public void expireSnapshots() {
}
}

private void reportPartitionStats(List<ManifestCommittable> committables) {
// pre-checks

if (reporter == null) {
return;
}
if (!batchCommitted) {
return;
}

long currentTime = System.currentTimeMillis();
LOG.info("Start to report partition statistics");
try {
// collect distinct partitions from all committables
List<BinaryRow> parts =
committables.stream()
.map(ManifestCommittable::fileCommittables)
.flatMap(List::stream)
.map(CommitMessage::partition)
.distinct()
.collect(Collectors.toList());
for (BinaryRow part : parts) {
LinkedHashMap<String, String> spec = partitionComputer.generatePartValues(part);
String partitionPath = PartitionPathUtils.generatePartitionPath(spec);
reporter.report(partitionPath, currentTime);
}
} catch (Throwable e) {
LOG.warn("Failed to report partition statistics after commit", e);
} finally {
IOUtils.closeQuietly(reporter);
}
}
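
The spec-to-path step above is ordinary Hive-style path building. A small illustrative sketch (not part of this change) of the same calls, for a hypothetical single partition key `pt`:

```java
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.BinaryRowWriter;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.InternalRowPartitionComputer;
import org.apache.paimon.utils.PartitionPathUtils;

import java.util.LinkedHashMap;

public class PartitionPathSketch {
    public static void main(String[] args) {
        // A one-column partition row holding pt = "2025".
        BinaryRow part = new BinaryRow(1);
        BinaryRowWriter writer = new BinaryRowWriter(part);
        writer.writeString(0, BinaryString.fromString("2025"));
        writer.complete();

        RowType partitionType = RowType.builder().field("pt", DataTypes.STRING()).build();
        InternalRowPartitionComputer computer =
                new InternalRowPartitionComputer(
                        "__DEFAULT_PARTITION__", // default-partition placeholder
                        partitionType,
                        new String[] {"pt"},
                        false); // legacyPartitionName

        LinkedHashMap<String, String> spec = computer.generatePartValues(part);
        // Hive-style partition path, e.g. "pt=2025/", which the reporter
        // then passes to report(partitionPath, currentTime).
        System.out.println(PartitionPathUtils.generatePartitionPath(spec));
    }
}
```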

@Override
public void close() throws Exception {
commit.close();
@@ -41,50 +41,10 @@ trait WriteHelper extends Logging {
return
}

reportToHms(messages)
batchCreateTag()
markDoneIfNeeded(messages)
}

private def reportToHms(messages: Seq[CommitMessage]): Unit = {
val config = coreOptions.toConfiguration
if (
config.get(CoreOptions.PARTITION_IDLE_TIME_TO_REPORT_STATISTIC).toMillis <= 0 ||
table.partitionKeys.isEmpty ||
!coreOptions.partitionedTableInMetastore ||
table.catalogEnvironment.partitionHandler() == null
) {
return
}

val partitionComputer = new InternalRowPartitionComputer(
coreOptions.partitionDefaultName,
table.schema.logicalPartitionType,
table.partitionKeys.toArray(new Array[String](0)),
coreOptions.legacyPartitionName()
)
val hmsReporter = new PartitionStatisticsReporter(
table,
table.catalogEnvironment.partitionHandler()
)

val partitions = messages.map(_.partition()).distinct
val currentTime = System.currentTimeMillis()
try {
partitions.foreach {
partition =>
val partitionPath = PartitionPathUtils.generatePartitionPath(
partitionComputer.generatePartValues(partition))
hmsReporter.report(partitionPath, currentTime)
}
} catch {
case e: Throwable =>
logWarning("Failed to report to hms", e)
} finally {
hmsReporter.close()
}
}

private def batchCreateTag(): Unit = {
if (coreOptions.tagCreationMode() == TagCreationMode.BATCH) {
val tagCreation = new TagBatchCreation(table)
@@ -18,12 +18,17 @@

package org.apache.paimon.spark.sql

import org.apache.paimon.catalog.CachingCatalog
import org.apache.paimon.client.ClientPool
import org.apache.paimon.fs.Path
import org.apache.paimon.spark.PaimonHiveTestBase
import org.apache.paimon.hive.HiveCatalog
import org.apache.paimon.spark.{PaimonHiveTestBase, SparkCatalog}
import org.apache.paimon.table.FileStoreTable

import org.apache.hadoop.hive.metastore.IMetaStoreClient
import org.apache.spark.SparkConf
import org.apache.spark.sql.{AnalysisException, Row}
import org.apache.thrift.TException
import org.junit.jupiter.api.Assertions

abstract class DDLWithHiveCatalogTestBase extends PaimonHiveTestBase {
@@ -773,6 +778,69 @@ abstract class DDLWithHiveCatalogTestBase extends PaimonHiveTestBase {
}
}

test("test report partition statistics: write/compaction") {
Seq(paimonHiveCatalogName).foreach {
catalogName =>
spark.sql(s"USE $catalogName")
withTable("paimon_tbl") {
spark.sql(s"""
|CREATE TABLE paimon_tbl (id STRING, name STRING, pt STRING)
|USING PAIMON
|TBLPROPERTIES (
|'partition.idle-time-to-report-statistic' = '1s',
|'metastore.partitioned-table' = 'true',
|'compaction.min.file-num' = '1'
|) PARTITIONED BY (pt)
|""".stripMargin)
spark.sql("insert into paimon_tbl values('1', 'jack', '2025')")

val tbl = "paimon_tbl"
val hiveCatalog = spark.sessionState.catalogManager.currentCatalog
.asInstanceOf[SparkCatalog]
.paimonCatalog()
.asInstanceOf[CachingCatalog]
.wrapped()
val clientField = classOf[HiveCatalog].getDeclaredField("clients")
clientField.setAccessible(true)

val clients =
clientField.get(hiveCatalog).asInstanceOf[ClientPool[IMetaStoreClient, TException]]

try {
val part = clients.run(
(client: IMetaStoreClient) => client.getPartition("default", tbl, "pt=2025"))
Assertions.assertEquals(1, part.getParameters.get("numFiles").toInt)
} catch {
case e: Exception => Assertions.fail(e.getMessage)
}

spark.sql("insert into paimon_tbl values('2', 'jack', '2025')")

try {
val part = clients.run(
(client: IMetaStoreClient) => client.getPartition("default", tbl, "pt=2025"))
Assertions.assertEquals(2, part.getParameters.get("numFiles").toInt)
} catch {
case e: Exception => Assertions.fail(e.getMessage)
}

spark.sql(s"""
CALL sys.compact(table => 'default.$tbl', partitions => "pt='2025'")""")

try {
val part = clients.run(
(client: IMetaStoreClient) => client.getPartition("default", tbl, "pt=2025"))
// The latest Paimon snapshot exposes only the single compacted file, but Hive
// computes partition statistics through its HDFS client, which counts all 3
// files left in the partition directory; the refresh is still triggered by
// Paimon's report action.
Assertions.assertEquals(3, part.getParameters.get("numFiles").toInt)
} catch {
case e: Exception => Assertions.fail(e.getMessage)
}
}
}

}
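
The assertions above read the statistics straight from the metastore. A minimal illustrative helper (not part of this change) for the same lookup, assuming an already-configured `IMetaStoreClient`; the method name `numFiles` is hypothetical:

```java
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.thrift.TException;

public class NumFilesSketch {
    // Reads the numFiles statistic that Paimon's reporter refreshed for a partition.
    static int numFiles(IMetaStoreClient client, String db, String table, String partName)
            throws TException {
        Partition part = client.getPartition(db, table, partName);
        return Integer.parseInt(part.getParameters().get("numFiles"));
    }
}
```

With the table above, `numFiles(client, "default", "paimon_tbl", "pt=2025")` would return 3 after the compaction call.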

def getDatabaseProp(dbName: String, propertyName: String): String = {
spark
.sql(s"DESC DATABASE EXTENDED $dbName")