Skip to content

Commit 85037c6

Browse files
committed
IoTConsensus now hashes the consensus group ID to assign each group to a fixed directory.
1 parent 45ebc47 commit 85037c6

File tree

3 files changed

+108
-20
lines changed

3 files changed

+108
-20
lines changed

iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java

Lines changed: 72 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@
7676
import java.util.ArrayList;
7777
import java.util.Arrays;
7878
import java.util.Collections;
79+
import java.util.HashMap;
7980
import java.util.List;
8081
import java.util.Map;
8182
import java.util.Optional;
@@ -173,29 +174,76 @@ private void initAndRecover() throws IOException {
173174
if (!storageDir.mkdirs()) {
174175
throw new IOException(String.format("Unable to create consensus dir at %s", storageDir));
175176
}
176-
} else {
177+
}
178+
}
179+
180+
Map<ConsensusGroupId, String> consensusGroupDirs = new HashMap<>();
181+
182+
for (String baseFolder : folderManager.getFolders()) {
183+
File storageDir = new File(baseFolder);
184+
if (storageDir.exists()) {
177185
try (DirectoryStream<Path> stream = Files.newDirectoryStream(storageDir.toPath())) {
178186
for (Path path : stream) {
187+
if (!Files.isDirectory(path)) {
188+
continue;
189+
}
190+
179191
String[] items = path.getFileName().toString().split("_");
180-
ConsensusGroupId consensusGroupId =
181-
ConsensusGroupId.Factory.create(
182-
Integer.parseInt(items[0]), Integer.parseInt(items[1]));
183-
IoTConsensusServerImpl consensus =
184-
new IoTConsensusServerImpl(
185-
path.toString(),
186-
new Peer(consensusGroupId, thisNodeId, thisNode),
187-
new TreeSet<>(),
188-
registry.apply(consensusGroupId),
189-
backgroundTaskService,
190-
clientManager,
191-
syncClientManager,
192-
config);
193-
stateMachineMap.put(consensusGroupId, consensus);
192+
if (items.length != 2) {
193+
continue; // Skip directories that do not meet the naming convention.
194+
}
195+
196+
try {
197+
ConsensusGroupId consensusGroupId =
198+
ConsensusGroupId.Factory.create(
199+
Integer.parseInt(items[0]), Integer.parseInt(items[1]));
200+
String expectedFolder;
201+
try {
202+
expectedFolder = folderManager.getFolderByHashId(consensusGroupId.getId());
203+
} catch (DiskSpaceInsufficientException e) {
204+
logger.warn(
205+
"Cannot determine expected folder for group {}, skipping", consensusGroupId);
206+
continue;
207+
}
208+
209+
// Use this directory if it is in the correct baseFolder where the consensus group
210+
// should be located.
211+
if (baseFolder.equals(expectedFolder)) {
212+
consensusGroupDirs.put(consensusGroupId, path.toString());
213+
} else {
214+
logger.warn(
215+
"Found consensus group {} in wrong folder: {} (should be in {}), skipping",
216+
consensusGroupId,
217+
baseFolder,
218+
expectedFolder);
219+
}
220+
} catch (NumberFormatException e) {
221+
logger.warn("Invalid consensus group directory name: {}", path.getFileName());
222+
}
194223
}
224+
} catch (IOException e) {
225+
logger.error("Error scanning directory: {}", baseFolder, e);
195226
}
196227
}
197228
}
198229

230+
for (Map.Entry<ConsensusGroupId, String> entry : consensusGroupDirs.entrySet()) {
231+
ConsensusGroupId consensusGroupId = entry.getKey();
232+
String path = entry.getValue();
233+
234+
IoTConsensusServerImpl consensus =
235+
new IoTConsensusServerImpl(
236+
path,
237+
new Peer(consensusGroupId, thisNodeId, thisNode),
238+
new TreeSet<>(),
239+
registry.apply(consensusGroupId),
240+
backgroundTaskService,
241+
clientManager,
242+
syncClientManager,
243+
config);
244+
stateMachineMap.put(consensusGroupId, consensus);
245+
}
246+
199247
if (correctPeerListBeforeStart != null) {
200248
BiConsumer<ConsensusGroupId, List<Peer>> resetPeerListWithoutThrow =
201249
(consensusGroupId, peers) -> {
@@ -284,7 +332,7 @@ public void createLocalPeer(ConsensusGroupId groupId, List<Peer> peers)
284332

285333
String path = null;
286334
try {
287-
path = buildPeerDir(folderManager.getNextFolder(), groupId);
335+
path = buildPeerDir(folderManager.getFolderByHashId(groupId.getId()), groupId);
288336
} catch (DiskSpaceInsufficientException e) {
289337
logger.warn(
290338
"Failed to create consensus directory for group {} due to disk space insufficiency: {}",
@@ -293,6 +341,7 @@ public void createLocalPeer(ConsensusGroupId groupId, List<Peer> peers)
293341
return null;
294342
}
295343
File file = new File(path);
344+
// debug print the path of consensus dir
296345
System.out.println(file.getAbsolutePath());
297346
if (!file.mkdirs()) {
298347
logger.warn("Unable to create consensus dir for group {} at {}", groupId, path);
@@ -336,8 +385,13 @@ public void deleteLocalPeer(ConsensusGroupId groupId) throws ConsensusException
336385
if (!exist.get()) {
337386
throw new ConsensusGroupNotExistException(groupId);
338387
}
339-
for (String folder : folderManager.getFolders()) {
340-
FileUtils.deleteFileOrDirectory(new File(buildPeerDir(folder, groupId)));
388+
try {
389+
FileUtils.deleteFileOrDirectory(
390+
new File(buildPeerDir(folderManager.getFolderByHashId(groupId.getId()), groupId)));
391+
} catch (DiskSpaceInsufficientException e) {
392+
logger.warn(
393+
"Failed to delete consensus directory for group {} due to : {}", groupId, e.getMessage());
394+
throw new ConsensusException(e);
341395
}
342396
KillPoint.setKillPoint(IoTConsensusDeleteLocalPeerKillPoints.AFTER_DELETE);
343397
}

iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/StabilityTest.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -262,8 +262,10 @@ public void multiRegionSnapshotTest() throws ConsensusException, DiskSpaceInsuff
262262
}
263263

264264
for (int i = 0; i < folders.length; i++) {
265-
File dataDir = new File(IoTConsensus.buildPeerDir(folders[i], dataRegionIds[i]));
266-
System.out.println(dataDir.getAbsolutePath());
265+
File dataDir =
266+
new File(
267+
IoTConsensus.buildPeerDir(
268+
folders[dataRegionIds[i].getId() % folders.length], dataRegionIds[i]));
267269
File[] versionFiles1 =
268270
dataDir.listFiles(
269271
(dir, name) -> name.startsWith(IoTConsensusServerImpl.SNAPSHOT_DIR_NAME));

iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/disk/FolderManager.java

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import java.util.HashMap;
3636
import java.util.List;
3737
import java.util.Map;
38+
import java.util.stream.Collectors;
3839

3940
public class FolderManager {
4041
private static final Logger logger = LoggerFactory.getLogger(FolderManager.class);
@@ -104,6 +105,37 @@ public String getNextFolder() throws DiskSpaceInsufficientException {
104105
}
105106
}
106107

108+
/**
109+
* Get folder by hash ID (typically consensus group ID), distributing as evenly as possible among
110+
* healthy folders using hash-based distribution
111+
*
112+
* @param hashId the hash ID (typically consensus group ID) used to determine folder selection
113+
* @return the selected folder path
114+
* @throws DiskSpaceInsufficientException if no healthy folders are available
115+
*/
116+
public String getFolderByHashId(int hashId) throws DiskSpaceInsufficientException {
117+
if (folders.isEmpty()) {
118+
throw new DiskSpaceInsufficientException(folders);
119+
}
120+
121+
List<String> healthyFolders =
122+
folders.stream()
123+
.filter(
124+
folder ->
125+
foldersStates.getOrDefault(folder, FolderState.ABNORMAL) == FolderState.HEALTHY)
126+
.collect(Collectors.toList());
127+
128+
if (healthyFolders.isEmpty()) {
129+
logger.error("No healthy folders available, change system mode to read-only.");
130+
CommonDescriptor.getInstance().getConfig().setNodeStatus(NodeStatus.ReadOnly);
131+
CommonDescriptor.getInstance().getConfig().setStatusReason(NodeStatus.DISK_FULL);
132+
throw new DiskSpaceInsufficientException(folders);
133+
}
134+
135+
int index = Math.abs(hashId) % healthyFolders.size();
136+
return healthyFolders.get(index);
137+
}
138+
107139
boolean hasHealthyFolder() {
108140
return folders.stream()
109141
.anyMatch(

0 commit comments

Comments
 (0)