Skip to content
2 changes: 1 addition & 1 deletion docs/layouts/shortcodes/generated/core_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -390,7 +390,7 @@
<td><h5>data-file.external-paths.strategy</h5></td>
<td style="word-wrap: break-word;">none</td>
<td><p>Enum</p></td>
<td>The strategy of selecting an external path when writing data.<br /><br />Possible values:<ul><li>"none": Do not choose any external storage, data will still be written to the default warehouse path.</li><li>"specific-fs": Select a specific file system as the external path. Currently supported are S3 and OSS.</li><li>"round-robin": When writing a new file, a path is chosen from data-file.external-paths in turn.</li></ul></td>
<td>The strategy of selecting an external path when writing data.<br /><br />Possible values:<ul><li>"none": Do not choose any external storage, data will still be written to the default warehouse path.</li><li>"specific-fs": Select a specific file system as the external path. Currently supported are S3 and OSS.</li><li>"round-robin": When writing a new file, a path is chosen from data-file.external-paths in turn.</li><li>"entropy-inject": When writing a new file, a path is chosen based on the hash value of the file's content.</li></ul></td>
</tr>
<tr>
<td><h5>data-file.path-directory</h5></td>
Expand Down
6 changes: 5 additions & 1 deletion paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -3879,7 +3879,11 @@ public enum ExternalPathStrategy implements DescribedEnum {

ROUND_ROBIN(
"round-robin",
"When writing a new file, a path is chosen from data-file.external-paths in turn.");
"When writing a new file, a path is chosen from data-file.external-paths in turn."),

ENTROPY_INJECT(
"entropy-inject",
"When writing a new file, a path is chosen based on the hash value of the file's content.");

private final String value;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.fs;

import org.apache.paimon.shade.guava30.com.google.common.hash.HashCode;
import org.apache.paimon.shade.guava30.com.google.common.hash.HashFunction;
import org.apache.paimon.shade.guava30.com.google.common.hash.Hashing;

import java.nio.charset.StandardCharsets;
import java.util.List;

/** Provider for entropy inject external data paths. */
public class EntropyInjectExternalPathProvider extends ExternalPathProvider {

private static final HashFunction HASH_FUNC = Hashing.murmur3_32();
private static final int HASH_BINARY_STRING_BITS = 20;
// Entropy generated will be divided into dirs with this lengths
private static final int ENTROPY_DIR_LENGTH = 4;
// Will create DEPTH many dirs from the entropy
private static final int ENTROPY_DIR_DEPTH = 3;

public EntropyInjectExternalPathProvider(
List<Path> externalTablePaths, Path relativeBucketPath) {
super(externalTablePaths, relativeBucketPath);
}

@Override
public Path getNextExternalDataPath(String fileName) {
String hashDirs = computeHash(fileName);
Path filePathWithHashDirs = new Path(relativeBucketPath, hashDirs + "/" + fileName);
position++;
if (position == externalTablePaths.size()) {
position = 0;
}
return new Path(externalTablePaths.get(position), filePathWithHashDirs);
}

public String computeHash(String fileName) {
HashCode hashCode = HASH_FUNC.hashString(fileName, StandardCharsets.UTF_8);
String hashAsBinaryString = Integer.toBinaryString(hashCode.asInt() | Integer.MIN_VALUE);
String hash =
hashAsBinaryString.substring(hashAsBinaryString.length() - HASH_BINARY_STRING_BITS);
return dirsFromHash(hash);
}

/**
* Divides hash into directories for optimized orphan removal operation using ENTROPY_DIR_DEPTH.
*
* @param hash 10011001100110011001
* @return 1001/1001/1001/10011001 with depth 3 and length 4
*/
private String dirsFromHash(String hash) {
StringBuilder hashWithDirs = new StringBuilder();

for (int i = 0; i < ENTROPY_DIR_DEPTH * ENTROPY_DIR_LENGTH; i += ENTROPY_DIR_LENGTH) {
if (i > 0) {
hashWithDirs.append("/");
}
hashWithDirs.append(hash, i, Math.min(i + ENTROPY_DIR_LENGTH, hash.length()));
}

if (hash.length() > ENTROPY_DIR_DEPTH * ENTROPY_DIR_LENGTH) {
hashWithDirs
.append("/")
.append(hash, ENTROPY_DIR_DEPTH * ENTROPY_DIR_LENGTH, hash.length());
}

return hashWithDirs.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@
/** Provider for external data paths. */
public class ExternalPathProvider implements Serializable {

private final List<Path> externalTablePaths;
private final Path relativeBucketPath;
protected final List<Path> externalTablePaths;
protected final Path relativeBucketPath;

private int position;
protected int position;

public ExternalPathProvider(List<Path> externalTablePaths, Path relativeBucketPath) {
this.externalTablePaths = externalTablePaths;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ protected FileStorePathFactory pathFactory(CoreOptions options, String format) {
options.fileCompression(),
options.dataFilePathDirectory(),
createExternalPaths(),
options.externalPathStrategy(),
options.indexFileInDataFileDir());
}

Expand Down Expand Up @@ -168,7 +169,6 @@ private List<Path> createExternalPaths() {
paths.add(path);
}
}

checkArgument(!paths.isEmpty(), "External paths should not be empty");
return paths;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,8 @@ public FormatTableFileWriter(
options.formatTableFileCompression(),
options.dataFilePathDirectory(),
null,
false);
CoreOptions.ExternalPathStrategy.NONE,
options.indexFileInDataFileDir());
}

public void withWriteType(RowType writeType) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.paimon.CoreOptions;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.fs.EntropyInjectExternalPathProvider;
import org.apache.paimon.fs.ExternalPathProvider;
import org.apache.paimon.fs.Path;
import org.apache.paimon.index.IndexInDataFileDirPathFactory;
Expand Down Expand Up @@ -79,6 +80,7 @@ public class FileStorePathFactory {
private final AtomicInteger indexFileCount;
private final AtomicInteger statsFileCount;
private final List<Path> externalPaths;
private final CoreOptions.ExternalPathStrategy strategy;

public FileStorePathFactory(
Path root,
Expand All @@ -92,6 +94,7 @@ public FileStorePathFactory(
String fileCompression,
@Nullable String dataFilePathDirectory,
List<Path> externalPaths,
CoreOptions.ExternalPathStrategy strategy,
boolean indexFileInDataFileDir) {
this.root = root;
this.dataFilePathDirectory = dataFilePathDirectory;
Expand All @@ -112,6 +115,7 @@ public FileStorePathFactory(
this.indexFileCount = new AtomicInteger(0);
this.statsFileCount = new AtomicInteger(0);
this.externalPaths = externalPaths;
this.strategy = strategy;
}

public Path root() {
Expand Down Expand Up @@ -243,7 +247,10 @@ private ExternalPathProvider createExternalPathProvider(BinaryRow partition, int
if (externalPaths == null || externalPaths.isEmpty()) {
return null;
}

if (strategy == CoreOptions.ExternalPathStrategy.ENTROPY_INJECT) {
return new EntropyInjectExternalPathProvider(
externalPaths, relativeBucketPath(partition, bucket));
}
return new ExternalPathProvider(externalPaths, relativeBucketPath(partition, bucket));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ protected void foreachIndexReader(
CoreOptions.FILE_COMPRESSION.defaultValue(),
null,
null,
CoreOptions.ExternalPathStrategy.NONE,
false);

Table table = fileSystemCatalog.getTable(Identifier.create(tableName, tableName));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,15 @@
package org.apache.paimon.io;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.fs.EntropyInjectExternalPathProvider;
import org.apache.paimon.fs.Path;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

import java.util.Collections;
import java.util.List;

import static org.assertj.core.api.Assertions.assertThat;

/** Tests for {@link DataFilePathFactory}. */
Expand Down Expand Up @@ -91,4 +95,68 @@ public void testWithPartition() {
new Path(tempDir.toString() + "/dt=20211224/bucket-123/my-data-file-name")
.toString());
}

@Test
public void testEntropyInjectWithNoPartition() {
EntropyInjectExternalPathProvider externalPathProvider =
createExternalPathProvider(new Path(tempDir.toString()), "bucket-123");
DataFilePathFactory pathFactory =
new DataFilePathFactory(
new Path(tempDir + "/bucket-123"),
CoreOptions.FILE_FORMAT.defaultValue(),
CoreOptions.DATA_FILE_PREFIX.defaultValue(),
CoreOptions.CHANGELOG_FILE_PREFIX.defaultValue(),
CoreOptions.FILE_SUFFIX_INCLUDE_COMPRESSION.defaultValue(),
CoreOptions.FILE_COMPRESSION.defaultValue(),
externalPathProvider);
String uuid = pathFactory.uuid();

for (int i = 0; i < 20; i++) {
String filename =
"data-" + uuid + "-" + i + "." + CoreOptions.FILE_FORMAT.defaultValue();
assertThat(pathFactory.newPath())
.isEqualTo(
new Path(
tempDir.toString()
+ "/bucket-123/"
+ externalPathProvider.computeHash(filename)
+ "/"
+ filename));
}
}

@Test
public void testEntropyInjectWithPartition() {
EntropyInjectExternalPathProvider externalPathProvider =
createExternalPathProvider(new Path(tempDir.toString()), "dt=20211224/bucket-123");
DataFilePathFactory pathFactory =
new DataFilePathFactory(
new Path(tempDir + "/dt=20211224/bucket-123"),
CoreOptions.FILE_FORMAT.defaultValue(),
CoreOptions.DATA_FILE_PREFIX.defaultValue(),
CoreOptions.CHANGELOG_FILE_PREFIX.defaultValue(),
CoreOptions.FILE_SUFFIX_INCLUDE_COMPRESSION.defaultValue(),
CoreOptions.FILE_COMPRESSION.defaultValue(),
externalPathProvider);
String uuid = pathFactory.uuid();

for (int i = 0; i < 20; i++) {
String filename =
"data-" + uuid + "-" + i + "." + CoreOptions.FILE_FORMAT.defaultValue();
assertThat(pathFactory.newPath())
.isEqualTo(
new Path(
tempDir.toString()
+ "/dt=20211224/bucket-123/"
+ externalPathProvider.computeHash(filename)
+ "/"
+ filename));
}
}

public EntropyInjectExternalPathProvider createExternalPathProvider(
Path path, String relativeBucketPath) {
List<Path> externalPaths = Collections.singletonList(path);
return new EntropyInjectExternalPathProvider(externalPaths, new Path(relativeBucketPath));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,7 @@ protected KeyValueFileWriterFactory createWriterFactory(String pathStr, String f
CoreOptions.FILE_COMPRESSION.defaultValue(),
null,
null,
CoreOptions.ExternalPathStrategy.NONE,
false);
int suggestedFileSize = ThreadLocalRandom.current().nextInt(8192) + 1024;
FileIO fileIO = FileIOFinder.find(path);
Expand All @@ -261,6 +262,7 @@ public FileStorePathFactory apply(String format) {
CoreOptions.FILE_COMPRESSION.defaultValue(),
null,
null,
CoreOptions.ExternalPathStrategy.NONE,
false);
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ protected ManifestFile createManifestFile(String pathStr) {
CoreOptions.FILE_COMPRESSION.defaultValue(),
null,
null,
CoreOptions.ExternalPathStrategy.NONE,
false),
Long.MAX_VALUE,
null)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ private ManifestFile createManifestFile(String pathStr) {
CoreOptions.FILE_COMPRESSION.defaultValue(),
null,
null,
CoreOptions.ExternalPathStrategy.NONE,
false);
int suggestedFileSize = ThreadLocalRandom.current().nextInt(8192) + 1024;
FileIO fileIO = FileIOFinder.find(path);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ private FileStorePathFactory createPathFactory(String pathStr) {
CoreOptions.FILE_COMPRESSION.defaultValue(),
null,
null,
CoreOptions.ExternalPathStrategy.NONE,
false);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ public void testCreateDataFilePathFactoryWithPartition() {
CoreOptions.FILE_COMPRESSION.defaultValue(),
null,
null,
CoreOptions.ExternalPathStrategy.NONE,
false);

assertPartition("20211224", 16, pathFactory, "/dt=20211224/hr=16");
Expand Down Expand Up @@ -138,6 +139,7 @@ public static FileStorePathFactory createNonPartFactory(Path root) {
CoreOptions.FILE_COMPRESSION.defaultValue(),
null,
null,
CoreOptions.ExternalPathStrategy.NONE,
false);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,17 @@ public void testBatchReadWriteWithPartitionedRecordsWithPkWithExternalPathSpecif
checkExternalPathTestResult(options, externalPath1.toString());
}

@Test
public void
testBatchReadWriteWithPartitionedRecordsWithPkWithExternalPathEntropyInjectStrategy()
throws Exception {
Map<String, String> options = new HashMap<>();
options.put(
CoreOptions.DATA_FILE_EXTERNAL_PATHS.key(), "file://" + externalPath1.toString());
options.put(CoreOptions.DATA_FILE_EXTERNAL_PATHS_STRATEGY.key(), "entropy-inject");
checkExternalPathTestResult(options, externalPath1.toString());
}

public void checkExternalPathTestResult(Map<String, String> options, String externalPath)
throws Exception {
List<Row> initialRecords =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ public TestChangelogDataReadWrite(String root) {
CoreOptions.FILE_COMPRESSION.defaultValue(),
null,
null,
CoreOptions.ExternalPathStrategy.NONE,
false);
this.snapshotManager = newSnapshotManager(LocalFileIO.create(), new Path(root));
this.commitUser = UUID.randomUUID().toString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,7 @@ protected void foreachIndexReader(String tableName, Consumer<FileIndexReader> co
CoreOptions.FILE_COMPRESSION.defaultValue(),
null,
null,
CoreOptions.ExternalPathStrategy.NONE,
false);

Table table = fileSystemCatalog.getTable(Identifier.create("db", tableName));
Expand Down
Loading