Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions docs/layouts/shortcodes/generated/core_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -1085,6 +1085,18 @@
<td>Long</td>
<td>After configuring this time, only the data files created after this time will be read. It is independent of snapshots, but it is imprecise filtering (depending on whether or not compaction occurs).</td>
</tr>
<tr>
<td><h5>scan.ignore-corrupt-files</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Ignore corrupt files while scanning.</td>
</tr>
<tr>
<td><h5>scan.ignore-lost-files</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Ignore lost files while scanning.</td>
</tr>
<tr>
<td><h5>scan.manifest.parallelism</h5></td>
<td style="word-wrap: break-word;">(none)</td>
Expand Down
20 changes: 20 additions & 0 deletions paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -969,6 +969,18 @@ public InlineElement getDescription() {
.withDescription(
"The delay duration of stream read when scan incremental snapshots.");

public static final ConfigOption<Boolean> SCAN_IGNORE_CORRUPT_FILE =
key("scan.ignore-corrupt-files")
.booleanType()
.defaultValue(false)
.withDescription("Ignore corrupt files while scanning.");

public static final ConfigOption<Boolean> SCAN_IGNORE_LOST_FILE =
key("scan.ignore-lost-files")
.booleanType()
.defaultValue(false)
.withDescription("Ignore lost files while scanning.");

@ExcludeFromDocumentation("Confused without log system")
public static final ConfigOption<LogConsistency> LOG_CONSISTENCY =
key("log.consistency")
Expand Down Expand Up @@ -2837,6 +2849,14 @@ public String scanVersion() {
return options.get(SCAN_VERSION);
}

public boolean scanIgnoreCorruptFile() {
return options.get(SCAN_IGNORE_CORRUPT_FILE);
}

public boolean scanIgnoreLostFile() {
return options.get(SCAN_IGNORE_LOST_FILE);
}

public Pair<String, String> incrementalBetween() {
String str = options.get(INCREMENTAL_BETWEEN);
String[] split = str.split(",");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,7 @@ public RawFileSplitRead newRead() {
rowType,
FileFormatDiscover.of(options),
pathFactory(),
options.fileIndexReadEnabled(),
options.rowTrackingEnabled());
options);
}

public DataEvolutionSplitRead newDataEvolutionRead() {
Expand All @@ -93,12 +92,7 @@ public DataEvolutionSplitRead newDataEvolutionRead() {
"Field merge read is only supported when data-evolution.enabled is true.");
}
return new DataEvolutionSplitRead(
fileIO,
schemaManager,
schema,
rowType,
FileFormatDiscover.of(options),
pathFactory());
fileIO, schemaManager, schema, rowType, options, pathFactory());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,8 +129,7 @@ public RawFileSplitRead newBatchRawFileRead() {
valueType,
FileFormatDiscover.of(options),
pathFactory(),
options.fileIndexReadEnabled(),
false);
options);
}

public KeyValueFileReaderFactory.Builder newReaderFactoryBuilder() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,10 @@ public ChainKeyValueFileReaderFactory(
RowType valueType,
FormatReaderMapping.Builder formatReaderMappingBuilder,
DataFilePathFactory pathFactory,
long asyncThreshold,
BinaryRow partition,
DeletionVector.Factory dvFactory,
ChainReadContext chainReadContext) {
ChainReadContext chainReadContext,
CoreOptions coreOptions) {
super(
fileIO,
schemaManager,
Expand All @@ -64,9 +64,9 @@ public ChainKeyValueFileReaderFactory(
valueType,
formatReaderMappingBuilder,
pathFactory,
asyncThreshold,
partition,
dvFactory);
dvFactory,
coreOptions);
this.chainReadContext = chainReadContext;
CoreOptions options = new CoreOptions(schema.options());
this.currentBranch = options.branch();
Expand Down Expand Up @@ -132,10 +132,10 @@ public ChainKeyValueFileReaderFactory build(
wrapped.readValueType,
builder,
wrapped.pathFactory.createChainReadDataFilePathFactory(chainReadContext),
wrapped.options.fileReaderAsyncThreshold().getBytes(),
partition,
dvFactory,
chainReadContext);
chainReadContext,
wrapped.options);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.paimon.data.PartitionInfo;
import org.apache.paimon.data.columnar.ColumnarRowIterator;
import org.apache.paimon.format.FormatReaderFactory;
import org.apache.paimon.fs.Path;
import org.apache.paimon.reader.FileRecordIterator;
import org.apache.paimon.reader.FileRecordReader;
import org.apache.paimon.table.SpecialFields;
Expand All @@ -35,6 +36,9 @@
import org.apache.paimon.utils.ProjectedRow;
import org.apache.paimon.utils.RoaringBitmap32;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;

import java.io.IOException;
Expand All @@ -43,9 +47,11 @@

/** Reads {@link InternalRow} from data files. */
public class DataFileRecordReader implements FileRecordReader<InternalRow> {

private static final Logger LOG = LoggerFactory.getLogger(DataFileRecordReader.class);
private final Path filePath;
private final RowType tableRowType;
private final FileRecordReader<InternalRow> reader;
private final boolean ignoreCorruptFiles;
@Nullable private final int[] indexMapping;
@Nullable private final PartitionInfo partitionInfo;
@Nullable private final CastFieldGetter[] castMapping;
Expand All @@ -59,6 +65,8 @@ public DataFileRecordReader(
RowType tableRowType,
FormatReaderFactory readerFactory,
FormatReaderFactory.Context context,
boolean ignoreCorruptFiles,
boolean ignoreLostFiles,
@Nullable int[] indexMapping,
@Nullable CastFieldGetter[] castMapping,
@Nullable PartitionInfo partitionInfo,
Expand All @@ -69,30 +77,37 @@ public DataFileRecordReader(
throws IOException {
this(
tableRowType,
createReader(readerFactory, context),
createReader(readerFactory, context, ignoreCorruptFiles, ignoreLostFiles),
ignoreCorruptFiles,
ignoreLostFiles,
indexMapping,
castMapping,
partitionInfo,
rowTrackingEnabled,
firstRowId,
maxSequenceNumber,
systemFields,
context.selection());
context.selection(),
context.filePath());
}

public DataFileRecordReader(
RowType tableRowType,
FileRecordReader<InternalRow> reader,
boolean ignoreCorruptFiles,
boolean ignoreLostFiles,
@Nullable int[] indexMapping,
@Nullable CastFieldGetter[] castMapping,
@Nullable PartitionInfo partitionInfo,
boolean rowTrackingEnabled,
@Nullable Long firstRowId,
long maxSequenceNumber,
Map<String, Integer> systemFields,
@Nullable RoaringBitmap32 selection) {
@Nullable RoaringBitmap32 selection,
Path filePath) {
this.tableRowType = tableRowType;
this.reader = reader;
this.ignoreCorruptFiles = ignoreCorruptFiles;
this.indexMapping = indexMapping;
this.partitionInfo = partitionInfo;
this.castMapping = castMapping;
Expand All @@ -101,22 +116,62 @@ public DataFileRecordReader(
this.maxSequenceNumber = maxSequenceNumber;
this.systemFields = systemFields;
this.selection = selection;
this.filePath = filePath;
}

private static FileRecordReader<InternalRow> createReader(
FormatReaderFactory readerFactory, FormatReaderFactory.Context context)
FormatReaderFactory readerFactory,
FormatReaderFactory.Context context,
boolean ignoreCorruptFiles,
boolean ignoreLostFiles)
throws IOException {
try {
return readerFactory.createReader(context);
} catch (Exception e) {
FileUtils.checkExists(context.fileIO(), context.filePath());
throw e;
boolean exists = context.fileIO().exists(context.filePath());
if (!exists) {
if (ignoreLostFiles) {
LOG.warn(
"Failed to create FileRecordReader for file: {}, file lost",
context.filePath());
return null;
} else {
throw FileUtils.newFileNotFoundException(context.filePath());
}
} else {
if (ignoreCorruptException(e, ignoreCorruptFiles)) {
LOG.warn(
"Failed to create FileRecordReader for file: {}, ignore exception",
context.filePath(),
e);
return null;
} else {
throw new IOException(
"Failed to create FileRecordReader for file: " + context.filePath(), e);
}
}
}
}

@Nullable
@Override
public FileRecordIterator<InternalRow> readBatch() throws IOException {
if (reader == null) {
LOG.warn("Reader is not initialized, maybe file: {} is corrupt.", filePath);
return null;
}
try {
return readBatchInternal();
} catch (Exception e) {
if (ignoreCorruptException(e, ignoreCorruptFiles)) {
LOG.warn("Failed to read batch from file: {}, ignore exception", filePath, e);
return null;
}
throw new IOException("Failed to read batch from file: " + filePath, e);
}
}

private FileRecordIterator<InternalRow> readBatchInternal() throws IOException {
FileRecordIterator<InternalRow> iterator = reader.readBatch();
if (iterator == null) {
return null;
Expand Down Expand Up @@ -186,6 +241,15 @@ public FileRecordIterator<InternalRow> readBatch() throws IOException {

@Override
public void close() throws IOException {
reader.close();
if (reader != null) {
reader.close();
}
}

private static boolean ignoreCorruptException(Throwable e, boolean ignoreCorruptFiles) {
return ignoreCorruptFiles
&& (e instanceof IOException
|| e instanceof RuntimeException
|| e instanceof InternalError);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,8 @@ public class KeyValueFileReaderFactory implements FileReaderFactory<KeyValue> {
private final FormatReaderMapping.Builder formatReaderMappingBuilder;
private final DataFilePathFactory pathFactory;
private final long asyncThreshold;

private final boolean ignoreCorruptFiles;
private final boolean ignoreLostFiles;
private final Map<FormatKey, FormatReaderMapping> formatReaderMappings;
private final BinaryRow partition;
private final DeletionVector.Factory dvFactory;
Expand All @@ -80,17 +81,19 @@ protected KeyValueFileReaderFactory(
RowType valueType,
FormatReaderMapping.Builder formatReaderMappingBuilder,
DataFilePathFactory pathFactory,
long asyncThreshold,
BinaryRow partition,
DeletionVector.Factory dvFactory) {
DeletionVector.Factory dvFactory,
CoreOptions coreOptions) {
this.fileIO = fileIO;
this.schemaManager = schemaManager;
this.schema = schema;
this.keyType = keyType;
this.valueType = valueType;
this.formatReaderMappingBuilder = formatReaderMappingBuilder;
this.pathFactory = pathFactory;
this.asyncThreshold = asyncThreshold;
this.asyncThreshold = coreOptions.fileReaderAsyncThreshold().getBytes();
this.ignoreCorruptFiles = coreOptions.scanIgnoreCorruptFile();
this.ignoreLostFiles = coreOptions.scanIgnoreLostFile();
this.partition = partition;
this.formatReaderMappings = new HashMap<>();
this.dvFactory = dvFactory;
Expand Down Expand Up @@ -149,6 +152,8 @@ private FileRecordReader<KeyValue> createRecordReader(
? new FormatReaderContext(fileIO, filePath, fileSize)
: new OrcFormatReaderContext(
fileIO, filePath, fileSize, orcPoolSize),
ignoreCorruptFiles,
ignoreLostFiles,
formatReaderMapping.getIndexMapping(),
formatReaderMapping.getCastMapping(),
PartitionUtils.create(
Expand Down Expand Up @@ -282,9 +287,9 @@ public KeyValueFileReaderFactory build(
readValueType,
builder,
pathFactory.createDataFilePathFactory(partition, bucket),
options.fileReaderAsyncThreshold().getBytes(),
partition,
dvFactory);
dvFactory,
options);
}

protected FormatReaderMapping.Builder formatReaderMappingBuilder(
Expand Down Expand Up @@ -314,5 +319,9 @@ protected FormatReaderMapping.Builder formatReaderMappingBuilder(
public FileIO fileIO() {
return fileIO;
}

public CoreOptions options() {
return options;
}
}
}
Loading
Loading