diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java index 147b87836946..c557ac61b615 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java @@ -100,6 +100,7 @@ import org.apache.iotdb.db.storageengine.dataregion.memtable.TsFileProcessor; import org.apache.iotdb.db.storageengine.dataregion.wal.WALManager; import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALMode; +import org.apache.iotdb.db.storageengine.load.active.ActiveLoadAgent; import org.apache.iotdb.db.storageengine.rescon.disk.TierManager; import org.apache.iotdb.db.subscription.agent.SubscriptionAgent; import org.apache.iotdb.db.trigger.executor.TriggerExecutor; @@ -223,6 +224,8 @@ protected void start() { ConfigNodeInfo.getInstance().storeConfigNodeList(); // Register this DataNode to the cluster when first start sendRegisterRequestToConfigNode(false); + // Clean up active load listening directories on first startup + ActiveLoadAgent.cleanupListeningDirectories(); } else { // Send restart request of this DataNode sendRestartRequestToConfigNode(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadAgent.java index f060bd9a96f3..6065c349c8c5 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadAgent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadAgent.java @@ -19,8 +19,21 @@ package org.apache.iotdb.db.storageengine.load.active; +import org.apache.iotdb.commons.utils.FileUtils; +import org.apache.iotdb.db.conf.IoTDBDescriptor; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + public class ActiveLoadAgent { + private static final Logger LOGGER = LoggerFactory.getLogger(ActiveLoadAgent.class); + private final ActiveLoadTsFileLoader activeLoadTsFileLoader; private final ActiveLoadDirScanner activeLoadDirScanner; private final ActiveLoadMetricsCollector activeLoadMetricsCollector; @@ -48,4 +61,81 @@ public synchronized void start() { activeLoadDirScanner.start(); activeLoadMetricsCollector.start(); } + + /** + * Clean up all listening directories for active load on DataNode first startup. This method will + * clean up all files and subdirectories in the listening directories, including: 1. Pending + * directories (configured by load_active_listening_dirs) 2. Pipe directory (for pipe data sync) + * 3. Failed directory (for failed files) + * + *

This method is called during DataNode startup and must not throw any exceptions to ensure + * startup can proceed normally. All exceptions are caught and logged internally. + */ + public static void cleanupListeningDirectories() { + try { + final List dirsToClean = new ArrayList<>(); + + dirsToClean.addAll( + Arrays.asList(IoTDBDescriptor.getInstance().getConfig().getLoadActiveListeningDirs())); + + // Add pipe dir + dirsToClean.add(IoTDBDescriptor.getInstance().getConfig().getLoadActiveListeningPipeDir()); + + // Add failed dir + dirsToClean.add(IoTDBDescriptor.getInstance().getConfig().getLoadActiveListeningFailDir()); + + // Clean up each directory + for (final String dirPath : dirsToClean) { + try { + if (dirPath == null || dirPath.isEmpty()) { + continue; + } + + final File dir = new File(dirPath); + + // Check if directory exists and is a directory + // These methods may throw SecurityException if access is denied + try { + if (!dir.exists() || !dir.isDirectory()) { + continue; + } + } catch (Exception e) { + LOGGER.debug("Failed to check directory: {}", dirPath, e); + continue; + } + + // Only delete contents inside the directory, not the directory itself + // listFiles() may throw SecurityException if access is denied + File[] files = null; + try { + files = dir.listFiles(); + } catch (Exception e) { + LOGGER.warn("Failed to list files in directory: {}", dirPath, e); + continue; + } + + if (files != null) { + for (final File file : files) { + // FileUtils.deleteFileOrDirectory internally calls file.isDirectory() and + // file.listFiles() without try-catch, so exceptions may propagate here. + // We need to catch it to prevent one file failure from stopping the cleanup. + try { + FileUtils.deleteFileOrDirectory(file, true); + } catch (Exception e) { + LOGGER.debug("Failed to delete file or directory: {}", file.getAbsolutePath(), e); + } + } + } + } catch (Exception e) { + LOGGER.warn("Failed to cleanup directory: {}", dirPath, e); + } + } + + LOGGER.info("Cleaned up active load listening directories"); + } catch (Throwable t) { + // Catch all exceptions and errors (including OutOfMemoryError, etc.) + // to ensure startup process is not affected + LOGGER.warn("Unexpected error during cleanup of active load listening directories", t); + } + } }