diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html
index a62c58442709..a6bc1d28adc5 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -390,7 +390,7 @@
data-file.external-paths.strategy |
none |
Enum |
- The strategy of selecting an external path when writing data.
Possible values:- "none": Do not choose any external storage, data will still be written to the default warehouse path.
- "specific-fs": Select a specific file system as the external path. Currently supported are S3 and OSS.
- "round-robin": When writing a new file, a path is chosen from data-file.external-paths in turn.
|
+ The strategy of selecting an external path when writing data.
Possible values:- "none": Do not choose any external storage, data will still be written to the default warehouse path.
- "specific-fs": Select a specific file system as the external path. Currently supported are S3 and OSS.
- "round-robin": When writing a new file, a path is chosen from data-file.external-paths in turn.
- "entropy-inject": When writing a new file, a path is chosen based on the hash value of the file's content.
|
data-file.path-directory |
diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
index 49585b7f9aa5..7331373eb71c 100644
--- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
@@ -3879,7 +3879,11 @@ public enum ExternalPathStrategy implements DescribedEnum {
ROUND_ROBIN(
"round-robin",
- "When writing a new file, a path is chosen from data-file.external-paths in turn.");
+ "When writing a new file, a path is chosen from data-file.external-paths in turn."),
+
+ ENTROPY_INJECT(
+ "entropy-inject",
+ "When writing a new file, a path is chosen based on the hash value of the file's content.");
private final String value;
diff --git a/paimon-common/src/main/java/org/apache/paimon/fs/EntropyInjectExternalPathProvider.java b/paimon-common/src/main/java/org/apache/paimon/fs/EntropyInjectExternalPathProvider.java
new file mode 100644
index 000000000000..5c1548fa6dcc
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/fs/EntropyInjectExternalPathProvider.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.fs;
+
+import org.apache.paimon.shade.guava30.com.google.common.hash.HashCode;
+import org.apache.paimon.shade.guava30.com.google.common.hash.HashFunction;
+import org.apache.paimon.shade.guava30.com.google.common.hash.Hashing;
+
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+
+/** Provider for entropy inject external data paths. */
+public class EntropyInjectExternalPathProvider extends ExternalPathProvider {
+
+ private static final HashFunction HASH_FUNC = Hashing.murmur3_32();
+ private static final int HASH_BINARY_STRING_BITS = 20;
+ // Entropy generated will be divided into dirs with this lengths
+ private static final int ENTROPY_DIR_LENGTH = 4;
+ // Will create DEPTH many dirs from the entropy
+ private static final int ENTROPY_DIR_DEPTH = 3;
+
+ public EntropyInjectExternalPathProvider(
+ List externalTablePaths, Path relativeBucketPath) {
+ super(externalTablePaths, relativeBucketPath);
+ }
+
+ @Override
+ public Path getNextExternalDataPath(String fileName) {
+ String hashDirs = computeHash(fileName);
+ Path filePathWithHashDirs = new Path(relativeBucketPath, hashDirs + "/" + fileName);
+ position++;
+ if (position == externalTablePaths.size()) {
+ position = 0;
+ }
+ return new Path(externalTablePaths.get(position), filePathWithHashDirs);
+ }
+
+ public String computeHash(String fileName) {
+ HashCode hashCode = HASH_FUNC.hashString(fileName, StandardCharsets.UTF_8);
+ String hashAsBinaryString = Integer.toBinaryString(hashCode.asInt() | Integer.MIN_VALUE);
+ String hash =
+ hashAsBinaryString.substring(hashAsBinaryString.length() - HASH_BINARY_STRING_BITS);
+ return dirsFromHash(hash);
+ }
+
+ /**
+ * Divides hash into directories for optimized orphan removal operation using ENTROPY_DIR_DEPTH.
+ *
+ * @param hash 10011001100110011001
+ * @return 1001/1001/1001/10011001 with depth 3 and length 4
+ */
+ private String dirsFromHash(String hash) {
+ StringBuilder hashWithDirs = new StringBuilder();
+
+ for (int i = 0; i < ENTROPY_DIR_DEPTH * ENTROPY_DIR_LENGTH; i += ENTROPY_DIR_LENGTH) {
+ if (i > 0) {
+ hashWithDirs.append("/");
+ }
+ hashWithDirs.append(hash, i, Math.min(i + ENTROPY_DIR_LENGTH, hash.length()));
+ }
+
+ if (hash.length() > ENTROPY_DIR_DEPTH * ENTROPY_DIR_LENGTH) {
+ hashWithDirs
+ .append("/")
+ .append(hash, ENTROPY_DIR_DEPTH * ENTROPY_DIR_LENGTH, hash.length());
+ }
+
+ return hashWithDirs.toString();
+ }
+}
diff --git a/paimon-common/src/main/java/org/apache/paimon/fs/ExternalPathProvider.java b/paimon-common/src/main/java/org/apache/paimon/fs/ExternalPathProvider.java
index 720773d64f60..9d95b1d7e1c2 100644
--- a/paimon-common/src/main/java/org/apache/paimon/fs/ExternalPathProvider.java
+++ b/paimon-common/src/main/java/org/apache/paimon/fs/ExternalPathProvider.java
@@ -25,10 +25,10 @@
/** Provider for external data paths. */
public class ExternalPathProvider implements Serializable {
- private final List externalTablePaths;
- private final Path relativeBucketPath;
+ protected final List externalTablePaths;
+ protected final Path relativeBucketPath;
- private int position;
+ protected int position;
public ExternalPathProvider(List externalTablePaths, Path relativeBucketPath) {
this.externalTablePaths = externalTablePaths;
diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
index 3c25c39c7013..4eea07a7e93a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
@@ -135,6 +135,7 @@ protected FileStorePathFactory pathFactory(CoreOptions options, String format) {
options.fileCompression(),
options.dataFilePathDirectory(),
createExternalPaths(),
+ options.externalPathStrategy(),
options.indexFileInDataFileDir());
}
@@ -168,7 +169,6 @@ private List createExternalPaths() {
paths.add(path);
}
}
-
checkArgument(!paths.isEmpty(), "External paths should not be empty");
return paths;
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableFileWriter.java b/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableFileWriter.java
index 31615669b072..fc8951036d24 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableFileWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableFileWriter.java
@@ -65,7 +65,8 @@ public FormatTableFileWriter(
options.formatTableFileCompression(),
options.dataFilePathDirectory(),
null,
- false);
+ CoreOptions.ExternalPathStrategy.NONE,
+ options.indexFileInDataFileDir());
}
public void withWriteType(RowType writeType) {
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/FileStorePathFactory.java b/paimon-core/src/main/java/org/apache/paimon/utils/FileStorePathFactory.java
index d490e2e3f2f7..d95fecae4256 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/FileStorePathFactory.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/FileStorePathFactory.java
@@ -21,6 +21,7 @@
import org.apache.paimon.CoreOptions;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.fs.EntropyInjectExternalPathProvider;
import org.apache.paimon.fs.ExternalPathProvider;
import org.apache.paimon.fs.Path;
import org.apache.paimon.index.IndexInDataFileDirPathFactory;
@@ -79,6 +80,7 @@ public class FileStorePathFactory {
private final AtomicInteger indexFileCount;
private final AtomicInteger statsFileCount;
private final List externalPaths;
+ private final CoreOptions.ExternalPathStrategy strategy;
public FileStorePathFactory(
Path root,
@@ -92,6 +94,7 @@ public FileStorePathFactory(
String fileCompression,
@Nullable String dataFilePathDirectory,
List externalPaths,
+ CoreOptions.ExternalPathStrategy strategy,
boolean indexFileInDataFileDir) {
this.root = root;
this.dataFilePathDirectory = dataFilePathDirectory;
@@ -112,6 +115,7 @@ public FileStorePathFactory(
this.indexFileCount = new AtomicInteger(0);
this.statsFileCount = new AtomicInteger(0);
this.externalPaths = externalPaths;
+ this.strategy = strategy;
}
public Path root() {
@@ -243,7 +247,10 @@ private ExternalPathProvider createExternalPathProvider(BinaryRow partition, int
if (externalPaths == null || externalPaths.isEmpty()) {
return null;
}
-
+ if (strategy == CoreOptions.ExternalPathStrategy.ENTROPY_INJECT) {
+ return new EntropyInjectExternalPathProvider(
+ externalPaths, relativeBucketPath(partition, bucket));
+ }
return new ExternalPathProvider(externalPaths, relativeBucketPath(partition, bucket));
}
diff --git a/paimon-core/src/test/java/org/apache/paimon/io/DataFileIndexWriterTest.java b/paimon-core/src/test/java/org/apache/paimon/io/DataFileIndexWriterTest.java
index 9c2106bf62b8..9fbe7b9510cc 100644
--- a/paimon-core/src/test/java/org/apache/paimon/io/DataFileIndexWriterTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/io/DataFileIndexWriterTest.java
@@ -172,6 +172,7 @@ protected void foreachIndexReader(
CoreOptions.FILE_COMPRESSION.defaultValue(),
null,
null,
+ CoreOptions.ExternalPathStrategy.NONE,
false);
Table table = fileSystemCatalog.getTable(Identifier.create(tableName, tableName));
diff --git a/paimon-core/src/test/java/org/apache/paimon/io/DataFilePathFactoryTest.java b/paimon-core/src/test/java/org/apache/paimon/io/DataFilePathFactoryTest.java
index 0c4833686fed..266e6b4dc900 100644
--- a/paimon-core/src/test/java/org/apache/paimon/io/DataFilePathFactoryTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/io/DataFilePathFactoryTest.java
@@ -19,11 +19,15 @@
package org.apache.paimon.io;
import org.apache.paimon.CoreOptions;
+import org.apache.paimon.fs.EntropyInjectExternalPathProvider;
import org.apache.paimon.fs.Path;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
+import java.util.Collections;
+import java.util.List;
+
import static org.assertj.core.api.Assertions.assertThat;
/** Tests for {@link DataFilePathFactory}. */
@@ -91,4 +95,68 @@ public void testWithPartition() {
new Path(tempDir.toString() + "/dt=20211224/bucket-123/my-data-file-name")
.toString());
}
+
+ @Test
+ public void testEntropyInjectWithNoPartition() {
+ EntropyInjectExternalPathProvider externalPathProvider =
+ createExternalPathProvider(new Path(tempDir.toString()), "bucket-123");
+ DataFilePathFactory pathFactory =
+ new DataFilePathFactory(
+ new Path(tempDir + "/bucket-123"),
+ CoreOptions.FILE_FORMAT.defaultValue(),
+ CoreOptions.DATA_FILE_PREFIX.defaultValue(),
+ CoreOptions.CHANGELOG_FILE_PREFIX.defaultValue(),
+ CoreOptions.FILE_SUFFIX_INCLUDE_COMPRESSION.defaultValue(),
+ CoreOptions.FILE_COMPRESSION.defaultValue(),
+ externalPathProvider);
+ String uuid = pathFactory.uuid();
+
+ for (int i = 0; i < 20; i++) {
+ String filename =
+ "data-" + uuid + "-" + i + "." + CoreOptions.FILE_FORMAT.defaultValue();
+ assertThat(pathFactory.newPath())
+ .isEqualTo(
+ new Path(
+ tempDir.toString()
+ + "/bucket-123/"
+ + externalPathProvider.computeHash(filename)
+ + "/"
+ + filename));
+ }
+ }
+
+ @Test
+ public void testEntropyInjectWithPartition() {
+ EntropyInjectExternalPathProvider externalPathProvider =
+ createExternalPathProvider(new Path(tempDir.toString()), "dt=20211224/bucket-123");
+ DataFilePathFactory pathFactory =
+ new DataFilePathFactory(
+ new Path(tempDir + "/dt=20211224/bucket-123"),
+ CoreOptions.FILE_FORMAT.defaultValue(),
+ CoreOptions.DATA_FILE_PREFIX.defaultValue(),
+ CoreOptions.CHANGELOG_FILE_PREFIX.defaultValue(),
+ CoreOptions.FILE_SUFFIX_INCLUDE_COMPRESSION.defaultValue(),
+ CoreOptions.FILE_COMPRESSION.defaultValue(),
+ externalPathProvider);
+ String uuid = pathFactory.uuid();
+
+ for (int i = 0; i < 20; i++) {
+ String filename =
+ "data-" + uuid + "-" + i + "." + CoreOptions.FILE_FORMAT.defaultValue();
+ assertThat(pathFactory.newPath())
+ .isEqualTo(
+ new Path(
+ tempDir.toString()
+ + "/dt=20211224/bucket-123/"
+ + externalPathProvider.computeHash(filename)
+ + "/"
+ + filename));
+ }
+ }
+
+ public EntropyInjectExternalPathProvider createExternalPathProvider(
+ Path path, String relativeBucketPath) {
+ List externalPaths = Collections.singletonList(path);
+ return new EntropyInjectExternalPathProvider(externalPaths, new Path(relativeBucketPath));
+ }
}
diff --git a/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java b/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java
index 97d863892907..375ca7805fd1 100644
--- a/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java
@@ -239,6 +239,7 @@ protected KeyValueFileWriterFactory createWriterFactory(String pathStr, String f
CoreOptions.FILE_COMPRESSION.defaultValue(),
null,
null,
+ CoreOptions.ExternalPathStrategy.NONE,
false);
int suggestedFileSize = ThreadLocalRandom.current().nextInt(8192) + 1024;
FileIO fileIO = FileIOFinder.find(path);
@@ -261,6 +262,7 @@ public FileStorePathFactory apply(String format) {
CoreOptions.FILE_COMPRESSION.defaultValue(),
null,
null,
+ CoreOptions.ExternalPathStrategy.NONE,
false);
}
};
diff --git a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTestBase.java b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTestBase.java
index 592c27917d3a..ead73499ffc7 100644
--- a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTestBase.java
+++ b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTestBase.java
@@ -155,6 +155,7 @@ protected ManifestFile createManifestFile(String pathStr) {
CoreOptions.FILE_COMPRESSION.defaultValue(),
null,
null,
+ CoreOptions.ExternalPathStrategy.NONE,
false),
Long.MAX_VALUE,
null)
diff --git a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileTest.java b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileTest.java
index 2e45d904a5bb..fc19f6262d91 100644
--- a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileTest.java
@@ -143,6 +143,7 @@ private ManifestFile createManifestFile(String pathStr) {
CoreOptions.FILE_COMPRESSION.defaultValue(),
null,
null,
+ CoreOptions.ExternalPathStrategy.NONE,
false);
int suggestedFileSize = ThreadLocalRandom.current().nextInt(8192) + 1024;
FileIO fileIO = FileIOFinder.find(path);
diff --git a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestListTest.java b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestListTest.java
index e3c63f13eb40..d719a6d5cdd4 100644
--- a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestListTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestListTest.java
@@ -175,6 +175,7 @@ private FileStorePathFactory createPathFactory(String pathStr) {
CoreOptions.FILE_COMPRESSION.defaultValue(),
null,
null,
+ CoreOptions.ExternalPathStrategy.NONE,
false);
}
diff --git a/paimon-core/src/test/java/org/apache/paimon/utils/FileStorePathFactoryTest.java b/paimon-core/src/test/java/org/apache/paimon/utils/FileStorePathFactoryTest.java
index 7e7b36d7c59a..eb7e9f88b6f9 100644
--- a/paimon-core/src/test/java/org/apache/paimon/utils/FileStorePathFactoryTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/utils/FileStorePathFactoryTest.java
@@ -94,6 +94,7 @@ public void testCreateDataFilePathFactoryWithPartition() {
CoreOptions.FILE_COMPRESSION.defaultValue(),
null,
null,
+ CoreOptions.ExternalPathStrategy.NONE,
false);
assertPartition("20211224", 16, pathFactory, "/dt=20211224/hr=16");
@@ -138,6 +139,7 @@ public static FileStorePathFactory createNonPartFactory(Path root) {
CoreOptions.FILE_COMPRESSION.defaultValue(),
null,
null,
+ CoreOptions.ExternalPathStrategy.NONE,
false);
}
}
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java
index a35bd3025bf7..fc7c7b7a10d2 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java
@@ -237,6 +237,17 @@ public void testBatchReadWriteWithPartitionedRecordsWithPkWithExternalPathSpecif
checkExternalPathTestResult(options, externalPath1.toString());
}
+ @Test
+ public void
+ testBatchReadWriteWithPartitionedRecordsWithPkWithExternalPathEntropyInjectStrategy()
+ throws Exception {
+ Map options = new HashMap<>();
+ options.put(
+ CoreOptions.DATA_FILE_EXTERNAL_PATHS.key(), "file://" + externalPath1.toString());
+ options.put(CoreOptions.DATA_FILE_EXTERNAL_PATHS_STRATEGY.key(), "entropy-inject");
+ checkExternalPathTestResult(options, externalPath1.toString());
+ }
+
public void checkExternalPathTestResult(Map options, String externalPath)
throws Exception {
List initialRecords =
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java
index cb7a854286a7..d2fbf63eea3e 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java
@@ -112,6 +112,7 @@ public TestChangelogDataReadWrite(String root) {
CoreOptions.FILE_COMPRESSION.defaultValue(),
null,
null,
+ CoreOptions.ExternalPathStrategy.NONE,
false);
this.snapshotManager = newSnapshotManager(LocalFileIO.create(), new Path(root));
this.commitUser = UUID.randomUUID().toString();
diff --git a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkFileIndexITCase.java b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkFileIndexITCase.java
index b4ccd81bf7be..364a33934ae3 100644
--- a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkFileIndexITCase.java
+++ b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkFileIndexITCase.java
@@ -210,6 +210,7 @@ protected void foreachIndexReader(String tableName, Consumer co
CoreOptions.FILE_COMPRESSION.defaultValue(),
null,
null,
+ CoreOptions.ExternalPathStrategy.NONE,
false);
Table table = fileSystemCatalog.getTable(Identifier.create("db", tableName));