From 6dcacdf91e916de6b699ea5155259ab564113e8c Mon Sep 17 00:00:00 2001 From: luoluoyuyu Date: Wed, 3 Dec 2025 16:45:27 +0800 Subject: [PATCH 1/3] Add cleanup for active load listening directories on DataNode first startup - Add cleanupListeningDirectories() method in ActiveLoadAgent to clean up all listening directories - Call cleanup method when DataNode starts for the first time - Clean up pending, pipe, and failed directories - Silent execution with minimal logging # Conflicts: # iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java --- .../org/apache/iotdb/db/service/DataNode.java | 3 + .../load/active/ActiveLoadAgent.java | 110 ++++++++++++++++++ 2 files changed, 113 insertions(+) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java index 147b878369463..c557ac61b615e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java @@ -100,6 +100,7 @@ import org.apache.iotdb.db.storageengine.dataregion.memtable.TsFileProcessor; import org.apache.iotdb.db.storageengine.dataregion.wal.WALManager; import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALMode; +import org.apache.iotdb.db.storageengine.load.active.ActiveLoadAgent; import org.apache.iotdb.db.storageengine.rescon.disk.TierManager; import org.apache.iotdb.db.subscription.agent.SubscriptionAgent; import org.apache.iotdb.db.trigger.executor.TriggerExecutor; @@ -223,6 +224,8 @@ protected void start() { ConfigNodeInfo.getInstance().storeConfigNodeList(); // Register this DataNode to the cluster when first start sendRegisterRequestToConfigNode(false); + // Clean up active load listening directories on first startup + ActiveLoadAgent.cleanupListeningDirectories(); } else { // Send restart request of this DataNode sendRestartRequestToConfigNode(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadAgent.java index f060bd9a96f3c..8a49a4bc95cdc 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadAgent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadAgent.java @@ -19,8 +19,26 @@ package org.apache.iotdb.db.storageengine.load.active; +import org.apache.iotdb.db.conf.IoTDBDescriptor; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.nio.file.FileVisitResult; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.SimpleFileVisitor; +import java.nio.file.attribute.BasicFileAttributes; +import java.util.Arrays; +import java.util.HashSet; +import java.util.Set; + public class ActiveLoadAgent { + private static final Logger LOGGER = LoggerFactory.getLogger(ActiveLoadAgent.class); + private final ActiveLoadTsFileLoader activeLoadTsFileLoader; private final ActiveLoadDirScanner activeLoadDirScanner; private final ActiveLoadMetricsCollector activeLoadMetricsCollector; @@ -48,4 +66,96 @@ public synchronized void start() { activeLoadDirScanner.start(); activeLoadMetricsCollector.start(); } + + /** + * Clean up all listening directories for active load on DataNode first startup. This method will + * clean up all files and subdirectories in the listening directories, including: 1. Pending + * directories (configured by load_active_listening_dirs) 2. Pipe directory (for pipe data sync) + * 3. Failed directory (for failed files) + */ + public static void cleanupListeningDirectories() { + try { + final Set dirsToClean = new HashSet<>(); + + try { + // Add configured listening dirs + if (IoTDBDescriptor.getInstance().getConfig().getLoadActiveListeningEnable()) { + dirsToClean.addAll( + Arrays.asList( + IoTDBDescriptor.getInstance().getConfig().getLoadActiveListeningDirs())); + } + + // Add pipe dir + dirsToClean.add(IoTDBDescriptor.getInstance().getConfig().getLoadActiveListeningPipeDir()); + + // Add failed dir + dirsToClean.add(IoTDBDescriptor.getInstance().getConfig().getLoadActiveListeningFailDir()); + } catch (Exception e) { + return; + } + + int totalFilesDeleted = 0; + int totalSubDirsDeleted = 0; + + for (final String dirPath : dirsToClean) { + try { + final File dir = new File(dirPath); + + if (!dir.exists() || !dir.isDirectory()) { + continue; + } + + final long[] fileCount = {0}; + final long[] subdirCount = {0}; + + Files.walkFileTree( + dir.toPath(), + new SimpleFileVisitor() { + @Override + public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) { + try { + Files.delete(file); + fileCount[0]++; + } catch (Exception e) { + // Ignore + } + return FileVisitResult.CONTINUE; + } + + @Override + public FileVisitResult postVisitDirectory(Path dir, IOException exc) { + if (exc == null && !dir.toFile().getAbsolutePath().equals(dirPath)) { + try { + Files.delete(dir); + subdirCount[0]++; + } catch (Exception e) { + // Ignore + } + } + return FileVisitResult.CONTINUE; + } + + @Override + public FileVisitResult visitFileFailed(Path file, IOException exc) { + return FileVisitResult.CONTINUE; + } + }); + + totalFilesDeleted += fileCount[0]; + totalSubDirsDeleted += subdirCount[0]; + } catch (Exception e) { + // Ignore + } + } + + if (totalFilesDeleted > 0 || totalSubDirsDeleted > 0) { + LOGGER.info( + "Cleaned up active load listening directories, deleted {} files and {} subdirectories", + totalFilesDeleted, + totalSubDirsDeleted); + } + } catch (Throwable t) { + // Ignore all errors to prevent any unexpected behavior + } + } } From b6ad97cd6dc7d759e28076f705c61712722ea208 Mon Sep 17 00:00:00 2001 From: luoluoyuyu Date: Thu, 4 Dec 2025 18:00:06 +0800 Subject: [PATCH 2/3] update --- .../load/active/ActiveLoadAgent.java | 26 ++++++++++++++----- 1 file changed, 19 insertions(+), 7 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadAgent.java index 8a49a4bc95cdc..f502128de9d0e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadAgent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadAgent.java @@ -91,6 +91,7 @@ public static void cleanupListeningDirectories() { // Add failed dir dirsToClean.add(IoTDBDescriptor.getInstance().getConfig().getLoadActiveListeningFailDir()); } catch (Exception e) { + LOGGER.warn("Failed to get active load listening directories configuration", e); return; } @@ -105,6 +106,9 @@ public static void cleanupListeningDirectories() { continue; } + // Convert to absolute path for comparison + final String absoluteDirPath = dir.getAbsolutePath(); + final long[] fileCount = {0}; final long[] subdirCount = {0}; @@ -117,19 +121,26 @@ public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) { Files.delete(file); fileCount[0]++; } catch (Exception e) { - // Ignore + LOGGER.debug("Failed to delete file: {}", file.toAbsolutePath(), e); } return FileVisitResult.CONTINUE; } @Override - public FileVisitResult postVisitDirectory(Path dir, IOException exc) { - if (exc == null && !dir.toFile().getAbsolutePath().equals(dirPath)) { + public FileVisitResult postVisitDirectory(Path subDir, IOException exc) { + if (exc != null) { + LOGGER.debug( + "Error occurred while visiting directory: {}", + subDir.toAbsolutePath(), + exc); + return FileVisitResult.CONTINUE; + } + if (!subDir.toFile().getAbsolutePath().equals(absoluteDirPath)) { try { - Files.delete(dir); + Files.delete(subDir); subdirCount[0]++; } catch (Exception e) { - // Ignore + LOGGER.debug("Failed to delete directory: {}", subDir.toAbsolutePath(), e); } } return FileVisitResult.CONTINUE; @@ -137,6 +148,7 @@ public FileVisitResult postVisitDirectory(Path dir, IOException exc) { @Override public FileVisitResult visitFileFailed(Path file, IOException exc) { + LOGGER.debug("Failed to visit file: {}", file.toAbsolutePath(), exc); return FileVisitResult.CONTINUE; } }); @@ -144,7 +156,7 @@ public FileVisitResult visitFileFailed(Path file, IOException exc) { totalFilesDeleted += fileCount[0]; totalSubDirsDeleted += subdirCount[0]; } catch (Exception e) { - // Ignore + LOGGER.warn("Failed to cleanup directory: {}", dirPath, e); } } @@ -155,7 +167,7 @@ public FileVisitResult visitFileFailed(Path file, IOException exc) { totalSubDirsDeleted); } } catch (Throwable t) { - // Ignore all errors to prevent any unexpected behavior + LOGGER.warn("Unexpected error during cleanup of active load listening directories", t); } } } From 48d4da1d938fb96994a3e029393f6c26f9576153 Mon Sep 17 00:00:00 2001 From: luoluoyuyu Date: Mon, 8 Dec 2025 18:24:12 +0800 Subject: [PATCH 3/3] fix --- .../load/active/ActiveLoadAgent.java | 134 +++++++----------- 1 file changed, 51 insertions(+), 83 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadAgent.java index f502128de9d0e..6065c349c8c54 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadAgent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadAgent.java @@ -19,21 +19,16 @@ package org.apache.iotdb.db.storageengine.load.active; +import org.apache.iotdb.commons.utils.FileUtils; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.File; -import java.io.IOException; -import java.nio.file.FileVisitResult; -import java.nio.file.Files; -import java.nio.file.Path; -import java.nio.file.SimpleFileVisitor; -import java.nio.file.attribute.BasicFileAttributes; +import java.util.ArrayList; import java.util.Arrays; -import java.util.HashSet; -import java.util.Set; +import java.util.List; public class ActiveLoadAgent { @@ -72,101 +67,74 @@ public synchronized void start() { * clean up all files and subdirectories in the listening directories, including: 1. Pending * directories (configured by load_active_listening_dirs) 2. Pipe directory (for pipe data sync) * 3. Failed directory (for failed files) + * + *

This method is called during DataNode startup and must not throw any exceptions to ensure + * startup can proceed normally. All exceptions are caught and logged internally. */ public static void cleanupListeningDirectories() { try { - final Set dirsToClean = new HashSet<>(); - - try { - // Add configured listening dirs - if (IoTDBDescriptor.getInstance().getConfig().getLoadActiveListeningEnable()) { - dirsToClean.addAll( - Arrays.asList( - IoTDBDescriptor.getInstance().getConfig().getLoadActiveListeningDirs())); - } + final List dirsToClean = new ArrayList<>(); - // Add pipe dir - dirsToClean.add(IoTDBDescriptor.getInstance().getConfig().getLoadActiveListeningPipeDir()); + dirsToClean.addAll( + Arrays.asList(IoTDBDescriptor.getInstance().getConfig().getLoadActiveListeningDirs())); - // Add failed dir - dirsToClean.add(IoTDBDescriptor.getInstance().getConfig().getLoadActiveListeningFailDir()); - } catch (Exception e) { - LOGGER.warn("Failed to get active load listening directories configuration", e); - return; - } + // Add pipe dir + dirsToClean.add(IoTDBDescriptor.getInstance().getConfig().getLoadActiveListeningPipeDir()); - int totalFilesDeleted = 0; - int totalSubDirsDeleted = 0; + // Add failed dir + dirsToClean.add(IoTDBDescriptor.getInstance().getConfig().getLoadActiveListeningFailDir()); + // Clean up each directory for (final String dirPath : dirsToClean) { try { + if (dirPath == null || dirPath.isEmpty()) { + continue; + } + final File dir = new File(dirPath); - if (!dir.exists() || !dir.isDirectory()) { + // Check if directory exists and is a directory + // These methods may throw SecurityException if access is denied + try { + if (!dir.exists() || !dir.isDirectory()) { + continue; + } + } catch (Exception e) { + LOGGER.debug("Failed to check directory: {}", dirPath, e); continue; } - // Convert to absolute path for comparison - final String absoluteDirPath = dir.getAbsolutePath(); - - final long[] fileCount = {0}; - final long[] subdirCount = {0}; - - Files.walkFileTree( - dir.toPath(), - new SimpleFileVisitor() { - @Override - public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) { - try { - Files.delete(file); - fileCount[0]++; - } catch (Exception e) { - LOGGER.debug("Failed to delete file: {}", file.toAbsolutePath(), e); - } - return FileVisitResult.CONTINUE; - } - - @Override - public FileVisitResult postVisitDirectory(Path subDir, IOException exc) { - if (exc != null) { - LOGGER.debug( - "Error occurred while visiting directory: {}", - subDir.toAbsolutePath(), - exc); - return FileVisitResult.CONTINUE; - } - if (!subDir.toFile().getAbsolutePath().equals(absoluteDirPath)) { - try { - Files.delete(subDir); - subdirCount[0]++; - } catch (Exception e) { - LOGGER.debug("Failed to delete directory: {}", subDir.toAbsolutePath(), e); - } - } - return FileVisitResult.CONTINUE; - } - - @Override - public FileVisitResult visitFileFailed(Path file, IOException exc) { - LOGGER.debug("Failed to visit file: {}", file.toAbsolutePath(), exc); - return FileVisitResult.CONTINUE; - } - }); - - totalFilesDeleted += fileCount[0]; - totalSubDirsDeleted += subdirCount[0]; + // Only delete contents inside the directory, not the directory itself + // listFiles() may throw SecurityException if access is denied + File[] files = null; + try { + files = dir.listFiles(); + } catch (Exception e) { + LOGGER.warn("Failed to list files in directory: {}", dirPath, e); + continue; + } + + if (files != null) { + for (final File file : files) { + // FileUtils.deleteFileOrDirectory internally calls file.isDirectory() and + // file.listFiles() without try-catch, so exceptions may propagate here. + // We need to catch it to prevent one file failure from stopping the cleanup. + try { + FileUtils.deleteFileOrDirectory(file, true); + } catch (Exception e) { + LOGGER.debug("Failed to delete file or directory: {}", file.getAbsolutePath(), e); + } + } + } } catch (Exception e) { LOGGER.warn("Failed to cleanup directory: {}", dirPath, e); } } - if (totalFilesDeleted > 0 || totalSubDirsDeleted > 0) { - LOGGER.info( - "Cleaned up active load listening directories, deleted {} files and {} subdirectories", - totalFilesDeleted, - totalSubDirsDeleted); - } + LOGGER.info("Cleaned up active load listening directories"); } catch (Throwable t) { + // Catch all exceptions and errors (including OutOfMemoryError, etc.) + // to ensure startup process is not affected LOGGER.warn("Unexpected error during cleanup of active load listening directories", t); } }