diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java index 147b87836946..c557ac61b615 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java @@ -100,6 +100,7 @@ import org.apache.iotdb.db.storageengine.dataregion.memtable.TsFileProcessor; import org.apache.iotdb.db.storageengine.dataregion.wal.WALManager; import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALMode; +import org.apache.iotdb.db.storageengine.load.active.ActiveLoadAgent; import org.apache.iotdb.db.storageengine.rescon.disk.TierManager; import org.apache.iotdb.db.subscription.agent.SubscriptionAgent; import org.apache.iotdb.db.trigger.executor.TriggerExecutor; @@ -223,6 +224,8 @@ protected void start() { ConfigNodeInfo.getInstance().storeConfigNodeList(); // Register this DataNode to the cluster when first start sendRegisterRequestToConfigNode(false); + // Clean up active load listening directories on first startup + ActiveLoadAgent.cleanupListeningDirectories(); } else { // Send restart request of this DataNode sendRestartRequestToConfigNode(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadAgent.java index f060bd9a96f3..6065c349c8c5 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadAgent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadAgent.java @@ -19,8 +19,21 @@ package org.apache.iotdb.db.storageengine.load.active; +import org.apache.iotdb.commons.utils.FileUtils; +import org.apache.iotdb.db.conf.IoTDBDescriptor; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + public class ActiveLoadAgent { + private static final Logger LOGGER = LoggerFactory.getLogger(ActiveLoadAgent.class); + private final ActiveLoadTsFileLoader activeLoadTsFileLoader; private final ActiveLoadDirScanner activeLoadDirScanner; private final ActiveLoadMetricsCollector activeLoadMetricsCollector; @@ -48,4 +61,81 @@ public synchronized void start() { activeLoadDirScanner.start(); activeLoadMetricsCollector.start(); } + + /** + * Clean up all listening directories for active load on DataNode first startup. This method will + * clean up all files and subdirectories in the listening directories, including: 1. Pending + * directories (configured by load_active_listening_dirs) 2. Pipe directory (for pipe data sync) + * 3. Failed directory (for failed files) + * + *
This method is called during DataNode startup and must not throw any exceptions to ensure
+ * startup can proceed normally. All exceptions are caught and logged internally.
+ */
+ public static void cleanupListeningDirectories() {
+ try {
+ final List