diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html index a62c58442709..a6bc1d28adc5 100644 --- a/docs/layouts/shortcodes/generated/core_configuration.html +++ b/docs/layouts/shortcodes/generated/core_configuration.html @@ -390,7 +390,7 @@
data-file.external-paths.strategy
none

Enum

- The strategy of selecting an external path when writing data.

Possible values: + The strategy of selecting an external path when writing data.

Possible values:
data-file.path-directory
diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java index 49585b7f9aa5..7331373eb71c 100644 --- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java +++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java @@ -3879,7 +3879,11 @@ public enum ExternalPathStrategy implements DescribedEnum { ROUND_ROBIN( "round-robin", - "When writing a new file, a path is chosen from data-file.external-paths in turn."); + "When writing a new file, a path is chosen from data-file.external-paths in turn."), + + ENTROPY_INJECT( + "entropy-inject", + "When writing a new file, a path is chosen based on the hash value of the file's content."); private final String value; diff --git a/paimon-common/src/main/java/org/apache/paimon/fs/EntropyInjectExternalPathProvider.java b/paimon-common/src/main/java/org/apache/paimon/fs/EntropyInjectExternalPathProvider.java new file mode 100644 index 000000000000..5c1548fa6dcc --- /dev/null +++ b/paimon-common/src/main/java/org/apache/paimon/fs/EntropyInjectExternalPathProvider.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.fs; + +import org.apache.paimon.shade.guava30.com.google.common.hash.HashCode; +import org.apache.paimon.shade.guava30.com.google.common.hash.HashFunction; +import org.apache.paimon.shade.guava30.com.google.common.hash.Hashing; + +import java.nio.charset.StandardCharsets; +import java.util.List; + +/** Provider for entropy-injected external data paths. */ +public class EntropyInjectExternalPathProvider extends ExternalPathProvider { + + private static final HashFunction HASH_FUNC = Hashing.murmur3_32(); + private static final int HASH_BINARY_STRING_BITS = 20; + // Number of hash characters used for each entropy directory name + private static final int ENTROPY_DIR_LENGTH = 4; + // Number of fixed-length directory levels created from the entropy + private static final int ENTROPY_DIR_DEPTH = 3; + + public EntropyInjectExternalPathProvider( + List externalTablePaths, Path relativeBucketPath) { + super(externalTablePaths, relativeBucketPath); + } + + @Override + public Path getNextExternalDataPath(String fileName) { + String hashDirs = computeHash(fileName); + Path filePathWithHashDirs = new Path(relativeBucketPath, hashDirs + "/" + fileName); + position++; + if (position == externalTablePaths.size()) { + position = 0; + } + return new Path(externalTablePaths.get(position), filePathWithHashDirs); + } + + public String computeHash(String fileName) { + HashCode hashCode = HASH_FUNC.hashString(fileName, StandardCharsets.UTF_8); + String hashAsBinaryString = Integer.toBinaryString(hashCode.asInt() | Integer.MIN_VALUE); + String hash = + hashAsBinaryString.substring(hashAsBinaryString.length() - HASH_BINARY_STRING_BITS); + return dirsFromHash(hash); + } + + /** + * Divides hash into directories for optimized orphan removal operation using ENTROPY_DIR_DEPTH.
+ * + * @param hash 10011001100110011001 + * @return 1001/1001/1001/10011001 with depth 3 and length 4 + */ + private String dirsFromHash(String hash) { + StringBuilder hashWithDirs = new StringBuilder(); + + for (int i = 0; i < ENTROPY_DIR_DEPTH * ENTROPY_DIR_LENGTH; i += ENTROPY_DIR_LENGTH) { + if (i > 0) { + hashWithDirs.append("/"); + } + hashWithDirs.append(hash, i, Math.min(i + ENTROPY_DIR_LENGTH, hash.length())); + } + + if (hash.length() > ENTROPY_DIR_DEPTH * ENTROPY_DIR_LENGTH) { + hashWithDirs + .append("/") + .append(hash, ENTROPY_DIR_DEPTH * ENTROPY_DIR_LENGTH, hash.length()); + } + + return hashWithDirs.toString(); + } +} diff --git a/paimon-common/src/main/java/org/apache/paimon/fs/ExternalPathProvider.java b/paimon-common/src/main/java/org/apache/paimon/fs/ExternalPathProvider.java index 720773d64f60..9d95b1d7e1c2 100644 --- a/paimon-common/src/main/java/org/apache/paimon/fs/ExternalPathProvider.java +++ b/paimon-common/src/main/java/org/apache/paimon/fs/ExternalPathProvider.java @@ -25,10 +25,10 @@ /** Provider for external data paths. 
*/ public class ExternalPathProvider implements Serializable { - private final List externalTablePaths; - private final Path relativeBucketPath; + protected final List externalTablePaths; + protected final Path relativeBucketPath; - private int position; + protected int position; public ExternalPathProvider(List externalTablePaths, Path relativeBucketPath) { this.externalTablePaths = externalTablePaths; diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java index 3c25c39c7013..4eea07a7e93a 100644 --- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java +++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java @@ -135,6 +135,7 @@ protected FileStorePathFactory pathFactory(CoreOptions options, String format) { options.fileCompression(), options.dataFilePathDirectory(), createExternalPaths(), + options.externalPathStrategy(), options.indexFileInDataFileDir()); } @@ -168,7 +169,6 @@ private List createExternalPaths() { paths.add(path); } } - checkArgument(!paths.isEmpty(), "External paths should not be empty"); return paths; } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableFileWriter.java b/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableFileWriter.java index 31615669b072..fc8951036d24 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableFileWriter.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableFileWriter.java @@ -65,7 +65,8 @@ public FormatTableFileWriter( options.formatTableFileCompression(), options.dataFilePathDirectory(), null, - false); + CoreOptions.ExternalPathStrategy.NONE, + options.indexFileInDataFileDir()); } public void withWriteType(RowType writeType) { diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/FileStorePathFactory.java b/paimon-core/src/main/java/org/apache/paimon/utils/FileStorePathFactory.java 
index d490e2e3f2f7..d95fecae4256 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/FileStorePathFactory.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/FileStorePathFactory.java @@ -21,6 +21,7 @@ import org.apache.paimon.CoreOptions; import org.apache.paimon.annotation.VisibleForTesting; import org.apache.paimon.data.BinaryRow; +import org.apache.paimon.fs.EntropyInjectExternalPathProvider; import org.apache.paimon.fs.ExternalPathProvider; import org.apache.paimon.fs.Path; import org.apache.paimon.index.IndexInDataFileDirPathFactory; @@ -79,6 +80,7 @@ public class FileStorePathFactory { private final AtomicInteger indexFileCount; private final AtomicInteger statsFileCount; private final List externalPaths; + private final CoreOptions.ExternalPathStrategy strategy; public FileStorePathFactory( Path root, @@ -92,6 +94,7 @@ public FileStorePathFactory( String fileCompression, @Nullable String dataFilePathDirectory, List externalPaths, + CoreOptions.ExternalPathStrategy strategy, boolean indexFileInDataFileDir) { this.root = root; this.dataFilePathDirectory = dataFilePathDirectory; @@ -112,6 +115,7 @@ public FileStorePathFactory( this.indexFileCount = new AtomicInteger(0); this.statsFileCount = new AtomicInteger(0); this.externalPaths = externalPaths; + this.strategy = strategy; } public Path root() { @@ -243,7 +247,10 @@ private ExternalPathProvider createExternalPathProvider(BinaryRow partition, int if (externalPaths == null || externalPaths.isEmpty()) { return null; } - + if (strategy == CoreOptions.ExternalPathStrategy.ENTROPY_INJECT) { + return new EntropyInjectExternalPathProvider( + externalPaths, relativeBucketPath(partition, bucket)); + } return new ExternalPathProvider(externalPaths, relativeBucketPath(partition, bucket)); } diff --git a/paimon-core/src/test/java/org/apache/paimon/io/DataFileIndexWriterTest.java b/paimon-core/src/test/java/org/apache/paimon/io/DataFileIndexWriterTest.java index 9c2106bf62b8..9fbe7b9510cc 100644 --- 
a/paimon-core/src/test/java/org/apache/paimon/io/DataFileIndexWriterTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/io/DataFileIndexWriterTest.java @@ -172,6 +172,7 @@ protected void foreachIndexReader( CoreOptions.FILE_COMPRESSION.defaultValue(), null, null, + CoreOptions.ExternalPathStrategy.NONE, false); Table table = fileSystemCatalog.getTable(Identifier.create(tableName, tableName)); diff --git a/paimon-core/src/test/java/org/apache/paimon/io/DataFilePathFactoryTest.java b/paimon-core/src/test/java/org/apache/paimon/io/DataFilePathFactoryTest.java index 0c4833686fed..266e6b4dc900 100644 --- a/paimon-core/src/test/java/org/apache/paimon/io/DataFilePathFactoryTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/io/DataFilePathFactoryTest.java @@ -19,11 +19,15 @@ package org.apache.paimon.io; import org.apache.paimon.CoreOptions; +import org.apache.paimon.fs.EntropyInjectExternalPathProvider; import org.apache.paimon.fs.Path; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; +import java.util.Collections; +import java.util.List; + import static org.assertj.core.api.Assertions.assertThat; /** Tests for {@link DataFilePathFactory}. 
*/ @@ -91,4 +95,68 @@ public void testWithPartition() { new Path(tempDir.toString() + "/dt=20211224/bucket-123/my-data-file-name") .toString()); } + + @Test + public void testEntropyInjectWithNoPartition() { + EntropyInjectExternalPathProvider externalPathProvider = + createExternalPathProvider(new Path(tempDir.toString()), "bucket-123"); + DataFilePathFactory pathFactory = + new DataFilePathFactory( + new Path(tempDir + "/bucket-123"), + CoreOptions.FILE_FORMAT.defaultValue(), + CoreOptions.DATA_FILE_PREFIX.defaultValue(), + CoreOptions.CHANGELOG_FILE_PREFIX.defaultValue(), + CoreOptions.FILE_SUFFIX_INCLUDE_COMPRESSION.defaultValue(), + CoreOptions.FILE_COMPRESSION.defaultValue(), + externalPathProvider); + String uuid = pathFactory.uuid(); + + for (int i = 0; i < 20; i++) { + String filename = + "data-" + uuid + "-" + i + "." + CoreOptions.FILE_FORMAT.defaultValue(); + assertThat(pathFactory.newPath()) + .isEqualTo( + new Path( + tempDir.toString() + + "/bucket-123/" + + externalPathProvider.computeHash(filename) + + "/" + + filename)); + } + } + + @Test + public void testEntropyInjectWithPartition() { + EntropyInjectExternalPathProvider externalPathProvider = + createExternalPathProvider(new Path(tempDir.toString()), "dt=20211224/bucket-123"); + DataFilePathFactory pathFactory = + new DataFilePathFactory( + new Path(tempDir + "/dt=20211224/bucket-123"), + CoreOptions.FILE_FORMAT.defaultValue(), + CoreOptions.DATA_FILE_PREFIX.defaultValue(), + CoreOptions.CHANGELOG_FILE_PREFIX.defaultValue(), + CoreOptions.FILE_SUFFIX_INCLUDE_COMPRESSION.defaultValue(), + CoreOptions.FILE_COMPRESSION.defaultValue(), + externalPathProvider); + String uuid = pathFactory.uuid(); + + for (int i = 0; i < 20; i++) { + String filename = + "data-" + uuid + "-" + i + "." 
+ CoreOptions.FILE_FORMAT.defaultValue(); + assertThat(pathFactory.newPath()) + .isEqualTo( + new Path( + tempDir.toString() + + "/dt=20211224/bucket-123/" + + externalPathProvider.computeHash(filename) + + "/" + + filename)); + } + } + + public EntropyInjectExternalPathProvider createExternalPathProvider( + Path path, String relativeBucketPath) { + List externalPaths = Collections.singletonList(path); + return new EntropyInjectExternalPathProvider(externalPaths, new Path(relativeBucketPath)); + } } diff --git a/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java b/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java index 97d863892907..375ca7805fd1 100644 --- a/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java @@ -239,6 +239,7 @@ protected KeyValueFileWriterFactory createWriterFactory(String pathStr, String f CoreOptions.FILE_COMPRESSION.defaultValue(), null, null, + CoreOptions.ExternalPathStrategy.NONE, false); int suggestedFileSize = ThreadLocalRandom.current().nextInt(8192) + 1024; FileIO fileIO = FileIOFinder.find(path); @@ -261,6 +262,7 @@ public FileStorePathFactory apply(String format) { CoreOptions.FILE_COMPRESSION.defaultValue(), null, null, + CoreOptions.ExternalPathStrategy.NONE, false); } }; diff --git a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTestBase.java b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTestBase.java index 592c27917d3a..ead73499ffc7 100644 --- a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTestBase.java +++ b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTestBase.java @@ -155,6 +155,7 @@ protected ManifestFile createManifestFile(String pathStr) { CoreOptions.FILE_COMPRESSION.defaultValue(), null, null, + CoreOptions.ExternalPathStrategy.NONE, false), Long.MAX_VALUE, null) diff 
--git a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileTest.java b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileTest.java index 2e45d904a5bb..fc19f6262d91 100644 --- a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileTest.java @@ -143,6 +143,7 @@ private ManifestFile createManifestFile(String pathStr) { CoreOptions.FILE_COMPRESSION.defaultValue(), null, null, + CoreOptions.ExternalPathStrategy.NONE, false); int suggestedFileSize = ThreadLocalRandom.current().nextInt(8192) + 1024; FileIO fileIO = FileIOFinder.find(path); diff --git a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestListTest.java b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestListTest.java index e3c63f13eb40..d719a6d5cdd4 100644 --- a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestListTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestListTest.java @@ -175,6 +175,7 @@ private FileStorePathFactory createPathFactory(String pathStr) { CoreOptions.FILE_COMPRESSION.defaultValue(), null, null, + CoreOptions.ExternalPathStrategy.NONE, false); } diff --git a/paimon-core/src/test/java/org/apache/paimon/utils/FileStorePathFactoryTest.java b/paimon-core/src/test/java/org/apache/paimon/utils/FileStorePathFactoryTest.java index 7e7b36d7c59a..eb7e9f88b6f9 100644 --- a/paimon-core/src/test/java/org/apache/paimon/utils/FileStorePathFactoryTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/utils/FileStorePathFactoryTest.java @@ -94,6 +94,7 @@ public void testCreateDataFilePathFactoryWithPartition() { CoreOptions.FILE_COMPRESSION.defaultValue(), null, null, + CoreOptions.ExternalPathStrategy.NONE, false); assertPartition("20211224", 16, pathFactory, "/dt=20211224/hr=16"); @@ -138,6 +139,7 @@ public static FileStorePathFactory createNonPartFactory(Path root) { CoreOptions.FILE_COMPRESSION.defaultValue(), null, 
null, + CoreOptions.ExternalPathStrategy.NONE, false); } } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java index a35bd3025bf7..fc7c7b7a10d2 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java @@ -237,6 +237,17 @@ public void testBatchReadWriteWithPartitionedRecordsWithPkWithExternalPathSpecif checkExternalPathTestResult(options, externalPath1.toString()); } + @Test + public void + testBatchReadWriteWithPartitionedRecordsWithPkWithExternalPathEntropyInjectStrategy() + throws Exception { + Map options = new HashMap<>(); + options.put( + CoreOptions.DATA_FILE_EXTERNAL_PATHS.key(), "file://" + externalPath1.toString()); + options.put(CoreOptions.DATA_FILE_EXTERNAL_PATHS_STRATEGY.key(), "entropy-inject"); + checkExternalPathTestResult(options, externalPath1.toString()); + } + public void checkExternalPathTestResult(Map options, String externalPath) throws Exception { List initialRecords = diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java index cb7a854286a7..d2fbf63eea3e 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java @@ -112,6 +112,7 @@ public TestChangelogDataReadWrite(String root) { CoreOptions.FILE_COMPRESSION.defaultValue(), null, null, + CoreOptions.ExternalPathStrategy.NONE, false); this.snapshotManager = newSnapshotManager(LocalFileIO.create(), new Path(root)); this.commitUser = 
UUID.randomUUID().toString(); diff --git a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkFileIndexITCase.java b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkFileIndexITCase.java index b4ccd81bf7be..364a33934ae3 100644 --- a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkFileIndexITCase.java +++ b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkFileIndexITCase.java @@ -210,6 +210,7 @@ protected void foreachIndexReader(String tableName, Consumer co CoreOptions.FILE_COMPRESSION.defaultValue(), null, null, + CoreOptions.ExternalPathStrategy.NONE, false); Table table = fileSystemCatalog.getTable(Identifier.create("db", tableName));