diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html
index fc7a0c38394a..2bce74bc68ce 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -1085,6 +1085,18 @@
<td>Long</td>
<td>After configuring this time, only the data files created after this time will be read. It is independent of snapshots, but it is imprecise filtering (depending on whether or not compaction occurs).</td>
</tr>
+ <tr>
+ <td><h5>scan.ignore-corrupt-files</h5></td>
+ <td style="word-wrap: break-word;">false</td>
+ <td>Boolean</td>
+ <td>Ignore corrupt files while scanning.</td>
+ </tr>
+ <tr>
+ <td><h5>scan.ignore-lost-files</h5></td>
+ <td style="word-wrap: break-word;">false</td>
+ <td>Boolean</td>
+ <td>Ignore lost files while scanning.</td>
+ </tr>
<tr>
<td><h5>scan.manifest.parallelism</h5></td>
<td style="word-wrap: break-word;">(none)</td>
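
For orientation (not part of this patch), here is a minimal sketch of how the two new table options could be switched on through Paimon's Java API; the loaded `table` instance is assumed, and `Table#copy(Map)` is an existing Paimon method:

```java
import org.apache.paimon.table.Table;

import java.util.HashMap;
import java.util.Map;

class ScanToleranceExample {
    // Hedged sketch: enable both tolerance flags for subsequent reads via
    // dynamic options; `table` is assumed to be loaded from a catalog.
    static Table withTolerantScan(Table table) {
        Map<String, String> dynamicOptions = new HashMap<>();
        dynamicOptions.put("scan.ignore-corrupt-files", "true"); // skip unreadable files
        dynamicOptions.put("scan.ignore-lost-files", "true"); // skip deleted files
        return table.copy(dynamicOptions);
    }
}
```
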
diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
index 20aa5182d6e4..aac48284b72f 100644
--- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
@@ -969,6 +969,18 @@ public InlineElement getDescription() {
.withDescription(
"The delay duration of stream read when scan incremental snapshots.");
+ public static final ConfigOption<Boolean> SCAN_IGNORE_CORRUPT_FILE =
+ key("scan.ignore-corrupt-files")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription("Ignore corrupt files while scanning.");
+
+ public static final ConfigOption<Boolean> SCAN_IGNORE_LOST_FILE =
+ key("scan.ignore-lost-files")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription("Ignore lost files while scanning.");
+
@ExcludeFromDocumentation("Confused without log system")
public static final ConfigOption<LogConsistency> LOG_CONSISTENCY =
key("log.consistency")
@@ -2837,6 +2849,14 @@ public String scanVersion() {
return options.get(SCAN_VERSION);
}
+ public boolean scanIgnoreCorruptFile() {
+ return options.get(SCAN_IGNORE_CORRUPT_FILE);
+ }
+
+ public boolean scanIgnoreLostFile() {
+ return options.get(SCAN_IGNORE_LOST_FILE);
+ }
+
public Pair<String, String> incrementalBetween() {
String str = options.get(INCREMENTAL_BETWEEN);
String[] split = str.split(",");
diff --git a/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java b/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
index f3f0859d8c56..599a5d8c77be 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
@@ -83,8 +83,7 @@ public RawFileSplitRead newRead() {
rowType,
FileFormatDiscover.of(options),
pathFactory(),
- options.fileIndexReadEnabled(),
- options.rowTrackingEnabled());
+ options);
}
public DataEvolutionSplitRead newDataEvolutionRead() {
@@ -93,12 +92,7 @@ public DataEvolutionSplitRead newDataEvolutionRead() {
"Field merge read is only supported when data-evolution.enabled is true.");
}
return new DataEvolutionSplitRead(
- fileIO,
- schemaManager,
- schema,
- rowType,
- FileFormatDiscover.of(options),
- pathFactory());
+ fileIO, schemaManager, schema, rowType, options, pathFactory());
}
@Override
diff --git a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
index 077068df73ff..710e2dc3a587 100644
--- a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
@@ -129,8 +129,7 @@ public RawFileSplitRead newBatchRawFileRead() {
valueType,
FileFormatDiscover.of(options),
pathFactory(),
- options.fileIndexReadEnabled(),
- false);
+ options);
}
public KeyValueFileReaderFactory.Builder newReaderFactoryBuilder() {
diff --git a/paimon-core/src/main/java/org/apache/paimon/io/ChainKeyValueFileReaderFactory.java b/paimon-core/src/main/java/org/apache/paimon/io/ChainKeyValueFileReaderFactory.java
index 2c4d419989b8..c86d0b272348 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/ChainKeyValueFileReaderFactory.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/ChainKeyValueFileReaderFactory.java
@@ -52,10 +52,10 @@ public ChainKeyValueFileReaderFactory(
RowType valueType,
FormatReaderMapping.Builder formatReaderMappingBuilder,
DataFilePathFactory pathFactory,
- long asyncThreshold,
BinaryRow partition,
DeletionVector.Factory dvFactory,
- ChainReadContext chainReadContext) {
+ ChainReadContext chainReadContext,
+ CoreOptions coreOptions) {
super(
fileIO,
schemaManager,
@@ -64,9 +64,9 @@ public ChainKeyValueFileReaderFactory(
valueType,
formatReaderMappingBuilder,
pathFactory,
- asyncThreshold,
partition,
- dvFactory);
+ dvFactory,
+ coreOptions);
this.chainReadContext = chainReadContext;
CoreOptions options = new CoreOptions(schema.options());
this.currentBranch = options.branch();
@@ -132,10 +132,10 @@ public ChainKeyValueFileReaderFactory build(
wrapped.readValueType,
builder,
wrapped.pathFactory.createChainReadDataFilePathFactory(chainReadContext),
- wrapped.options.fileReaderAsyncThreshold().getBytes(),
partition,
dvFactory,
- chainReadContext);
+ chainReadContext,
+ wrapped.options);
}
}
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/io/DataFileRecordReader.java b/paimon-core/src/main/java/org/apache/paimon/io/DataFileRecordReader.java
index 2584aef00f6c..70650fbf4b8c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/DataFileRecordReader.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/DataFileRecordReader.java
@@ -27,6 +27,7 @@
import org.apache.paimon.data.PartitionInfo;
import org.apache.paimon.data.columnar.ColumnarRowIterator;
import org.apache.paimon.format.FormatReaderFactory;
+import org.apache.paimon.fs.Path;
import org.apache.paimon.reader.FileRecordIterator;
import org.apache.paimon.reader.FileRecordReader;
import org.apache.paimon.table.SpecialFields;
@@ -35,6 +36,9 @@
import org.apache.paimon.utils.ProjectedRow;
import org.apache.paimon.utils.RoaringBitmap32;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import javax.annotation.Nullable;
import java.io.IOException;
@@ -43,9 +47,11 @@
/** Reads {@link InternalRow} from data files. */
public class DataFileRecordReader implements FileRecordReader<InternalRow> {
-
+ private static final Logger LOG = LoggerFactory.getLogger(DataFileRecordReader.class);
+ private final Path filePath;
private final RowType tableRowType;
private final FileRecordReader<InternalRow> reader;
+ private final boolean ignoreCorruptFiles;
@Nullable private final int[] indexMapping;
@Nullable private final PartitionInfo partitionInfo;
@Nullable private final CastFieldGetter[] castMapping;
@@ -59,6 +65,8 @@ public DataFileRecordReader(
RowType tableRowType,
FormatReaderFactory readerFactory,
FormatReaderFactory.Context context,
+ boolean ignoreCorruptFiles,
+ boolean ignoreLostFiles,
@Nullable int[] indexMapping,
@Nullable CastFieldGetter[] castMapping,
@Nullable PartitionInfo partitionInfo,
@@ -69,7 +77,9 @@ public DataFileRecordReader(
throws IOException {
this(
tableRowType,
- createReader(readerFactory, context),
+ createReader(readerFactory, context, ignoreCorruptFiles, ignoreLostFiles),
+ ignoreCorruptFiles,
+ ignoreLostFiles,
indexMapping,
castMapping,
partitionInfo,
@@ -77,12 +87,15 @@ public DataFileRecordReader(
firstRowId,
maxSequenceNumber,
systemFields,
- context.selection());
+ context.selection(),
+ context.filePath());
}
public DataFileRecordReader(
RowType tableRowType,
FileRecordReader<InternalRow> reader,
+ boolean ignoreCorruptFiles,
+ boolean ignoreLostFiles,
@Nullable int[] indexMapping,
@Nullable CastFieldGetter[] castMapping,
@Nullable PartitionInfo partitionInfo,
@@ -90,9 +103,11 @@ public DataFileRecordReader(
@Nullable Long firstRowId,
long maxSequenceNumber,
Map<String, Integer> systemFields,
- @Nullable RoaringBitmap32 selection) {
+ @Nullable RoaringBitmap32 selection,
+ Path filePath) {
this.tableRowType = tableRowType;
this.reader = reader;
+ this.ignoreCorruptFiles = ignoreCorruptFiles;
this.indexMapping = indexMapping;
this.partitionInfo = partitionInfo;
this.castMapping = castMapping;
@@ -101,22 +116,62 @@ public DataFileRecordReader(
this.maxSequenceNumber = maxSequenceNumber;
this.systemFields = systemFields;
this.selection = selection;
+ this.filePath = filePath;
}
private static FileRecordReader<InternalRow> createReader(
- FormatReaderFactory readerFactory, FormatReaderFactory.Context context)
+ FormatReaderFactory readerFactory,
+ FormatReaderFactory.Context context,
+ boolean ignoreCorruptFiles,
+ boolean ignoreLostFiles)
throws IOException {
try {
return readerFactory.createReader(context);
} catch (Exception e) {
- FileUtils.checkExists(context.fileIO(), context.filePath());
- throw e;
+ boolean exists = context.fileIO().exists(context.filePath());
+ if (!exists) {
+ if (ignoreLostFiles) {
+ LOG.warn(
+ "Failed to create FileRecordReader for file: {}, file lost",
+ context.filePath());
+ return null;
+ } else {
+ throw FileUtils.newFileNotFoundException(context.filePath());
+ }
+ } else {
+ if (ignoreCorruptException(e, ignoreCorruptFiles)) {
+ LOG.warn(
+ "Failed to create FileRecordReader for file: {}, ignore exception",
+ context.filePath(),
+ e);
+ return null;
+ } else {
+ throw new IOException(
+ "Failed to create FileRecordReader for file: " + context.filePath(), e);
+ }
+ }
}
}
@Nullable
@Override
public FileRecordIterator<InternalRow> readBatch() throws IOException {
+ if (reader == null) {
+ LOG.warn("Reader is not initialized, maybe file: {} is corrupt.", filePath);
+ return null;
+ }
+ try {
+ return readBatchInternal();
+ } catch (Exception e) {
+ if (ignoreCorruptException(e, ignoreCorruptFiles)) {
+ LOG.warn("Failed to read batch from file: {}, ignore exception", filePath, e);
+ return null;
+ }
+ throw new IOException("Failed to read batch from file: " + filePath, e);
+ }
+ }
+
+ private FileRecordIterator<InternalRow> readBatchInternal() throws IOException {
FileRecordIterator<InternalRow> iterator = reader.readBatch();
if (iterator == null) {
return null;
@@ -186,6 +241,15 @@ public FileRecordIterator readBatch() throws IOException {
@Override
public void close() throws IOException {
- reader.close();
+ if (reader != null) {
+ reader.close();
+ }
+ }
+
+ private static boolean ignoreCorruptException(Throwable e, boolean ignoreCorruptFiles) {
+ return ignoreCorruptFiles
+ && (e instanceof IOException
+ || e instanceof RuntimeException
+ || e instanceof InternalError);
}
}
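
To make the new failure handling easier to scan, a self-contained distillation of the decision table that `createReader` and `readBatch` now implement follows; the class and method names here are illustrative, not Paimon APIs:

```java
import java.io.FileNotFoundException;
import java.io.IOException;

class IgnorePolicy {
    // Mirrors ignoreCorruptException(...) above: only these throwable kinds
    // count as "corrupt", and only when the flag is enabled.
    static boolean isIgnorableCorruption(Throwable e, boolean ignoreCorruptFiles) {
        return ignoreCorruptFiles
                && (e instanceof IOException
                        || e instanceof RuntimeException
                        || e instanceof InternalError);
    }

    // Decision table for a failed open or read: a missing file takes the
    // "lost" branch, a present-but-unreadable file takes the "corrupt"
    // branch. Returning true means "log a warning and skip this file".
    static boolean shouldSkip(
            boolean fileExists,
            Throwable failure,
            boolean ignoreCorruptFiles,
            boolean ignoreLostFiles)
            throws IOException {
        if (!fileExists) {
            if (ignoreLostFiles) {
                return true;
            }
            throw new FileNotFoundException("file lost");
        }
        if (isIgnorableCorruption(failure, ignoreCorruptFiles)) {
            return true;
        }
        throw new IOException("corrupt file", failure);
    }
}
```
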
diff --git a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java
index 91056c588fcb..61fbefc5204c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java
@@ -67,7 +67,8 @@ public class KeyValueFileReaderFactory implements FileReaderFactory {
private final FormatReaderMapping.Builder formatReaderMappingBuilder;
private final DataFilePathFactory pathFactory;
private final long asyncThreshold;
-
+ private final boolean ignoreCorruptFiles;
+ private final boolean ignoreLostFiles;
private final Map<FormatKey, FormatReaderMapping> formatReaderMappings;
private final BinaryRow partition;
private final DeletionVector.Factory dvFactory;
@@ -80,9 +81,9 @@ protected KeyValueFileReaderFactory(
RowType valueType,
FormatReaderMapping.Builder formatReaderMappingBuilder,
DataFilePathFactory pathFactory,
- long asyncThreshold,
BinaryRow partition,
- DeletionVector.Factory dvFactory) {
+ DeletionVector.Factory dvFactory,
+ CoreOptions coreOptions) {
this.fileIO = fileIO;
this.schemaManager = schemaManager;
this.schema = schema;
@@ -90,7 +91,9 @@ protected KeyValueFileReaderFactory(
this.valueType = valueType;
this.formatReaderMappingBuilder = formatReaderMappingBuilder;
this.pathFactory = pathFactory;
- this.asyncThreshold = asyncThreshold;
+ this.asyncThreshold = coreOptions.fileReaderAsyncThreshold().getBytes();
+ this.ignoreCorruptFiles = coreOptions.scanIgnoreCorruptFile();
+ this.ignoreLostFiles = coreOptions.scanIgnoreLostFile();
this.partition = partition;
this.formatReaderMappings = new HashMap<>();
this.dvFactory = dvFactory;
@@ -149,6 +152,8 @@ private FileRecordReader createRecordReader(
? new FormatReaderContext(fileIO, filePath, fileSize)
: new OrcFormatReaderContext(
fileIO, filePath, fileSize, orcPoolSize),
+ ignoreCorruptFiles,
+ ignoreLostFiles,
formatReaderMapping.getIndexMapping(),
formatReaderMapping.getCastMapping(),
PartitionUtils.create(
@@ -282,9 +287,9 @@ public KeyValueFileReaderFactory build(
readValueType,
builder,
pathFactory.createDataFilePathFactory(partition, bucket),
- options.fileReaderAsyncThreshold().getBytes(),
partition,
- dvFactory);
+ dvFactory,
+ options);
}
protected FormatReaderMapping.Builder formatReaderMappingBuilder(
@@ -314,5 +319,9 @@ protected FormatReaderMapping.Builder formatReaderMappingBuilder(
public FileIO fileIO() {
return fileIO;
}
+
+ public CoreOptions options() {
+ return options;
+ }
}
}
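
The constructor change above replaces a growing list of primitive parameters with a single `CoreOptions` handle from which all derived settings are resolved once. A hedged sketch of the same consolidation pattern (the class name is illustrative):

```java
import org.apache.paimon.CoreOptions;

class ReaderSettings {
    final long asyncThreshold;
    final boolean ignoreCorruptFiles;
    final boolean ignoreLostFiles;

    // One options handle in, all derived settings resolved at construction,
    // mirroring the patched KeyValueFileReaderFactory constructor.
    ReaderSettings(CoreOptions options) {
        this.asyncThreshold = options.fileReaderAsyncThreshold().getBytes();
        this.ignoreCorruptFiles = options.scanIgnoreCorruptFile();
        this.ignoreLostFiles = options.scanIgnoreLostFile();
    }
}
```
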
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionSplitRead.java b/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionSplitRead.java
index fa11671dc621..6486c3039827 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionSplitRead.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionSplitRead.java
@@ -18,6 +18,7 @@
package org.apache.paimon.operation;
+import org.apache.paimon.CoreOptions;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.append.ForceSingleBatchReader;
import org.apache.paimon.data.BinaryRow;
@@ -89,6 +90,7 @@ public class DataEvolutionSplitRead implements SplitRead {
private final FileStorePathFactory pathFactory;
private final Map<FormatKey, FormatReaderMapping> formatReaderMappings;
private final Function<Long, TableSchema> schemaFetcher;
+ private final CoreOptions coreOptions;
@Nullable private VariantAccessInfo[] variantAccess;
protected RowType readRowType;
@@ -98,14 +100,15 @@ public DataEvolutionSplitRead(
SchemaManager schemaManager,
TableSchema schema,
RowType rowType,
- FileFormatDiscover formatDiscover,
+ CoreOptions coreOptions,
FileStorePathFactory pathFactory) {
this.fileIO = fileIO;
final Map<Long, TableSchema> cache = new HashMap<>();
this.schemaFetcher =
schemaId -> cache.computeIfAbsent(schemaId, key -> schemaManager.schema(schemaId));
this.schema = schema;
- this.formatDiscover = formatDiscover;
+ this.formatDiscover = FileFormatDiscover.of(coreOptions);
+ this.coreOptions = coreOptions;
this.pathFactory = pathFactory;
this.formatReaderMappings = new HashMap<>();
this.readRowType = rowType;
@@ -377,6 +380,8 @@ private RecordReader createFileReader(
schema.logicalRowType(),
formatReaderMapping.getReaderFactory(),
formatReaderContext,
+ coreOptions.scanIgnoreCorruptFile(),
+ coreOptions.scanIgnoreLostFile(),
formatReaderMapping.getIndexMapping(),
formatReaderMapping.getCastMapping(),
PartitionUtils.create(
@@ -404,6 +409,8 @@ private FileRecordReader createFileReader(
schema.logicalRowType(),
formatReaderMapping.getReaderFactory(),
formatReaderContext,
+ coreOptions.scanIgnoreCorruptFile(),
+ coreOptions.scanIgnoreLostFile(),
formatReaderMapping.getIndexMapping(),
formatReaderMapping.getCastMapping(),
PartitionUtils.create(formatReaderMapping.getPartitionPair(), partition),
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java b/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java
index 05ee3ba499f9..6fcf76c88a25 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java
@@ -18,6 +18,7 @@
package org.apache.paimon.operation;
+import org.apache.paimon.CoreOptions;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.variant.VariantAccessInfo;
@@ -81,6 +82,8 @@ public class RawFileSplitRead implements SplitRead {
private final Map<FormatKey, FormatReaderMapping> formatReaderMappings;
private final boolean fileIndexReadEnabled;
private final boolean rowTrackingEnabled;
+ private final boolean ignoreCorruptFiles;
+ private final boolean ignoreLostFiles;
private RowType readRowType;
@Nullable private List<Predicate> filters;
@@ -95,16 +98,17 @@ public RawFileSplitRead(
RowType rowType,
FileFormatDiscover formatDiscover,
FileStorePathFactory pathFactory,
- boolean fileIndexReadEnabled,
- boolean rowTrackingEnabled) {
+ CoreOptions coreOptions) {
this.fileIO = fileIO;
this.schemaManager = schemaManager;
this.schema = schema;
this.formatDiscover = formatDiscover;
this.pathFactory = pathFactory;
this.formatReaderMappings = new HashMap<>();
- this.fileIndexReadEnabled = fileIndexReadEnabled;
- this.rowTrackingEnabled = rowTrackingEnabled;
+ this.fileIndexReadEnabled = coreOptions.fileIndexReadEnabled();
+ this.ignoreCorruptFiles = coreOptions.scanIgnoreCorruptFile();
+ this.ignoreLostFiles = coreOptions.scanIgnoreLostFile();
+ this.rowTrackingEnabled = coreOptions.rowTrackingEnabled();
this.readRowType = rowType;
}
@@ -271,6 +275,8 @@ private FileRecordReader createFileReader(
schema.logicalRowType(),
formatReaderMapping.getReaderFactory(),
formatReaderContext,
+ ignoreCorruptFiles,
+ ignoreLostFiles,
formatReaderMapping.getIndexMapping(),
formatReaderMapping.getCastMapping(),
PartitionUtils.create(formatReaderMapping.getPartitionPair(), partition),
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/format/FormatReadBuilder.java b/paimon-core/src/main/java/org/apache/paimon/table/format/FormatReadBuilder.java
index d0779c23aeac..fbb1edf5ffc1 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/format/FormatReadBuilder.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/format/FormatReadBuilder.java
@@ -195,6 +195,8 @@ protected RecordReader createReader(FormatDataSplit dataSplit) thro
return new DataFileRecordReader(
readType(),
reader,
+ options.scanIgnoreCorruptFile(),
+ options.scanIgnoreLostFile(),
null,
null,
PartitionUtils.create(partitionMapping, dataSplit.partition()),
@@ -202,7 +204,8 @@ protected RecordReader createReader(FormatDataSplit dataSplit) thro
null,
0,
Collections.emptyMap(),
- null);
+ null,
+ formatReaderContext.filePath());
} catch (Exception e) {
FileUtils.checkExists(formatReaderContext.fileIO(), formatReaderContext.filePath());
throw e;
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/FileUtils.java b/paimon-core/src/main/java/org/apache/paimon/utils/FileUtils.java
index d714487f4762..8c419537611f 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/FileUtils.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/FileUtils.java
@@ -109,17 +109,21 @@ public static Stream listVersionedDirectories(
public static void checkExists(FileIO fileIO, Path file) throws IOException {
if (!fileIO.exists(file)) {
- throw new FileNotFoundException(
- String.format(
- "File '%s' not found, Possible causes: "
- + "1.snapshot expires too fast, you can configure 'snapshot.time-retained'"
- + " option with a larger value. "
- + "2.consumption is too slow, you can improve the performance of consumption"
- + " (For example, increasing parallelism).",
- file));
+ throw newFileNotFoundException(file);
}
}
+ public static FileNotFoundException newFileNotFoundException(Path file) {
+ return new FileNotFoundException(
+ String.format(
+ "File '%s' not found, Possible causes: "
+ + "1.snapshot expires too fast, you can configure 'snapshot.time-retained'"
+ + " option with a larger value. "
+ + "2.consumption is too slow, you can improve the performance of consumption"
+ + " (For example, increasing parallelism).",
+ file));
+ }
+
public static RecordReader<InternalRow> createFormatReader(
FileIO fileIO, FormatReaderFactory format, Path file, @Nullable Long fileSize)
throws IOException {
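
Extracting `newFileNotFoundException` keeps `checkExists` behavior identical while letting callers that have already probed existence (as the patched `DataFileRecordReader.createReader` does) build the same descriptive exception directly. A hedged usage sketch, with an illustrative helper name:

```java
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.utils.FileUtils;

import java.io.IOException;

class LostFileCheck {
    // Same pattern as the patched createReader: probe once, then either
    // skip the file or surface the standard "file not found" diagnosis.
    static boolean skipIfLost(FileIO fileIO, Path file, boolean ignoreLostFiles)
            throws IOException {
        if (fileIO.exists(file)) {
            return false; // file present, proceed with normal reading
        }
        if (ignoreLostFiles) {
            return true; // caller logs a warning and skips the file
        }
        throw FileUtils.newFileNotFoundException(file);
    }
}
```
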
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java
index cb7a854286a7..3cb98ae5effc 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java
@@ -139,6 +139,7 @@ public KeyValueTableRead createReadWithKey() {
pathFactory,
EXTRACTOR,
options));
+
RawFileSplitRead rawFileRead =
new RawFileSplitRead(
LocalFileIO.create(),
@@ -147,8 +148,7 @@ public KeyValueTableRead createReadWithKey() {
VALUE_TYPE,
FileFormatDiscover.of(options),
pathFactory,
- options.fileIndexReadEnabled(),
- false);
+ options);
return new KeyValueTableRead(() -> read, () -> rawFileRead, null);
}
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonQueryTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonQueryTest.scala
index 5d0864cfe599..6ee2ceb32603 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonQueryTest.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonQueryTest.scala
@@ -18,6 +18,7 @@
package org.apache.paimon.spark.sql
+import org.apache.paimon.fs.Path
import org.apache.paimon.spark.PaimonSparkTestBase
import org.apache.paimon.table.source.DataSplit
@@ -428,6 +429,73 @@ class PaimonQueryTest extends PaimonSparkTestBase {
}
}
+ fileFormats.foreach {
+ fileFormat =>
+ test(s"Query ignore-corrupt-files: file.format=$fileFormat") {
+ withTable("T") {
+ spark.sql(s"""
+ |CREATE TABLE T (id INT, name STRING, pt STRING)
+ |PARTITIONED BY (pt)
+ |TBLPROPERTIES ('file.format'='$fileFormat', 'bucket'='4', 'bucket-key'='id')
+ |""".stripMargin)
+ spark.sql("INSERT INTO T VALUES (1, 'x1', '2024'), (3, 'x3', '2024')")
+
+ spark.sql("INSERT INTO T VALUES (2, 'x2', '2024'), (4, 'x4', '2024')")
+
+ val allFiles = getAllFiles("T", Seq("pt"), null)
+ Assertions.assertEquals(4, allFiles.length)
+ val corruptFile = allFiles.head
+ val io = loadTable("T").fileIO()
+ io.overwriteFileUtf8(new Path(corruptFile), "corrupt file")
+ val content = io.readFileUtf8(new Path(corruptFile))
+ Assertions.assertEquals("corrupt file", content)
+
+ withSQLConf("spark.paimon.scan.ignore-corrupt-files" -> "true") {
+ val res = spark.sql("SELECT * FROM T")
+ Assertions.assertEquals(3, res.collect().length)
+ }
+ }
+ }
+ }
+
+ fileFormats.foreach {
+ fileFormat =>
+ test(s"Query ignore-lost-files: file.format=$fileFormat") {
+ withTable("T") {
+ spark.sql(s"""
+ |CREATE TABLE T (id INT, name STRING, pt STRING)
+ |PARTITIONED BY (pt)
+ |TBLPROPERTIES ('file.format'='$fileFormat', 'bucket'='4', 'bucket-key'='id')
+ |""".stripMargin)
+ spark.sql("INSERT INTO T VALUES (1, 'x1', '2024'), (3, 'x3', '2024')")
+
+ spark.sql("INSERT INTO T VALUES (2, 'x2', '2024'), (4, 'x4', '2024')")
+
+ val allFiles = getAllFiles("T", Seq("pt"), null)
+ Assertions.assertEquals(4, allFiles.length)
+ val lostFile = allFiles.head
+ val io = loadTable("T").fileIO()
+ io.deleteQuietly(new Path(lostFile))
+
+ withSQLConf("spark.paimon.scan.ignore-corrupt-files" -> "true") {
+ var failed: Boolean = false
+ try {
+ spark.sql("SELECT * FROM T").collect()
+ } catch {
+ case e: Exception => failed = true
+ }
+ Assertions.assertTrue(failed)
+ }
+
+ withSQLConf("spark.paimon.scan.ignore-lost-files" -> "true") {
+ val res = spark.sql("SELECT * FROM T")
+ Assertions.assertEquals(3, res.collect().length)
+ }
+
+ }
+ }
+ }
+
private def getAllFiles(
tableName: String,
partitions: Seq[String],