Skip to content

Commit 669aa7d

Browse files
GH-1037: Release ArrowBufs in VectorLoader on decompression failure
1 parent 0f7665f commit 669aa7d

2 files changed

Lines changed: 83 additions & 18 deletions

File tree

vector/src/main/java/org/apache/arrow/vector/VectorLoader.java

Lines changed: 20 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -121,31 +121,33 @@ private void loadBuffers(
121121
int bufferLayoutCount =
122122
(int) (variadicBufferLayoutCount + TypeLayout.getTypeBufferCount(field.getType()));
123123
List<ArrowBuf> ownBuffers = new ArrayList<>(bufferLayoutCount);
124-
for (int j = 0; j < bufferLayoutCount; j++) {
125-
if (!buffers.hasNext()) {
126-
throw new IllegalArgumentException(
127-
"no more buffers for field " + field + ". Expected " + bufferLayoutCount);
124+
try {
125+
for (int j = 0; j < bufferLayoutCount; j++) {
126+
if (!buffers.hasNext()) {
127+
throw new IllegalArgumentException(
128+
"no more buffers for field " + field + ". Expected " + bufferLayoutCount);
129+
}
130+
ArrowBuf nextBuf = buffers.next();
131+
// for vectors without nulls, the buffer is empty, so there is no need to decompress it.
132+
ArrowBuf bufferToAdd =
133+
nextBuf.writerIndex() > 0 ? codec.decompress(vector.getAllocator(), nextBuf) : nextBuf;
134+
ownBuffers.add(bufferToAdd);
135+
if (decompressionNeeded) {
136+
nextBuf.getReferenceManager().retain();
137+
}
128138
}
129-
ArrowBuf nextBuf = buffers.next();
130-
// for vectors without nulls, the buffer is empty, so there is no need to decompress it.
131-
ArrowBuf bufferToAdd =
132-
nextBuf.writerIndex() > 0 ? codec.decompress(vector.getAllocator(), nextBuf) : nextBuf;
133-
ownBuffers.add(bufferToAdd);
134-
if (decompressionNeeded) {
135-
// decompression performed
136-
nextBuf.getReferenceManager().retain();
139+
try {
140+
vector.loadFieldBuffers(fieldNode, ownBuffers);
141+
} catch (RuntimeException e) {
142+
throw new IllegalArgumentException(
143+
"Could not load buffers for field " + field + ". error message: " + e.getMessage(), e);
137144
}
138-
}
139-
try {
140-
vector.loadFieldBuffers(fieldNode, ownBuffers);
145+
} finally {
141146
if (decompressionNeeded) {
142147
for (ArrowBuf buf : ownBuffers) {
143148
buf.close();
144149
}
145150
}
146-
} catch (RuntimeException e) {
147-
throw new IllegalArgumentException(
148-
"Could not load buffers for field " + field + ". error message: " + e.getMessage(), e);
149151
}
150152
List<Field> children = field.getChildren();
151153
if (children.size() > 0) {

vector/src/test/java/org/apache/arrow/vector/TestVectorUnloadLoad.java

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,9 @@
3737
import org.apache.arrow.vector.complex.writer.BaseWriter.StructWriter;
3838
import org.apache.arrow.vector.complex.writer.BigIntWriter;
3939
import org.apache.arrow.vector.complex.writer.IntWriter;
40+
import org.apache.arrow.vector.compression.CompressionCodec;
41+
import org.apache.arrow.vector.compression.CompressionUtil;
42+
import org.apache.arrow.vector.ipc.message.ArrowBodyCompression;
4043
import org.apache.arrow.vector.ipc.message.ArrowFieldNode;
4144
import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
4245
import org.apache.arrow.vector.types.pojo.ArrowType;
@@ -348,4 +351,64 @@ public static VectorUnloader newVectorUnloader(FieldVector root) {
348351
VectorSchemaRoot vsr = new VectorSchemaRoot(schema.getFields(), fields, valueCount);
349352
return new VectorUnloader(vsr);
350353
}
354+
355+
@Test
356+
public void testLoadReleasesBuffersOnDecompressionFailure() {
357+
Schema schema = new Schema(asList(Field.nullable("int", new ArrowType.Int(32, true))));
358+
CompressionCodec.Factory failingFactory =
359+
new CompressionCodec.Factory() {
360+
@Override
361+
public CompressionCodec createCodec(CompressionUtil.CodecType codecType) {
362+
return new CompressionCodec() {
363+
@Override
364+
public ArrowBuf compress(BufferAllocator allocator, ArrowBuf uncompressedBuffer) {
365+
throw new UnsupportedOperationException();
366+
}
367+
368+
@Override
369+
public ArrowBuf decompress(BufferAllocator allocator, ArrowBuf compressedBuffer) {
370+
throw new RuntimeException("simulated decompression failure");
371+
}
372+
373+
@Override
374+
public CompressionUtil.CodecType getCodecType() {
375+
return codecType;
376+
}
377+
};
378+
}
379+
380+
@Override
381+
public CompressionCodec createCodec(
382+
CompressionUtil.CodecType codecType, int compressionLevel) {
383+
return createCodec(codecType);
384+
}
385+
};
386+
387+
try (BufferAllocator testAllocator =
388+
allocator.newChildAllocator("test", 0, Integer.MAX_VALUE)) {
389+
try (VectorSchemaRoot root = VectorSchemaRoot.create(schema, testAllocator)) {
390+
VectorLoader loader = new VectorLoader(root, failingFactory);
391+
ArrowBodyCompression compression =
392+
new ArrowBodyCompression(
393+
CompressionUtil.CodecType.LZ4_FRAME.getType(),
394+
org.apache.arrow.flatbuf.BodyCompressionMethod.BUFFER);
395+
List<ArrowFieldNode> nodes = asList(new ArrowFieldNode(1, 0));
396+
ArrowBuf validityBuf = testAllocator.buffer(8);
397+
validityBuf.writerIndex(8);
398+
ArrowBuf dataBuf = testAllocator.buffer(4);
399+
dataBuf.writerIndex(4);
400+
try (ArrowRecordBatch batch =
401+
new ArrowRecordBatch(1, nodes, asList(validityBuf, dataBuf), compression)) {
402+
RuntimeException ex =
403+
org.junit.jupiter.api.Assertions.assertThrows(
404+
RuntimeException.class, () -> loader.load(batch));
405+
assertTrue(ex.getMessage().contains("simulated decompression failure"));
406+
} finally {
407+
validityBuf.close();
408+
dataBuf.close();
409+
}
410+
}
411+
assertEquals(0, testAllocator.getAllocatedMemory());
412+
}
413+
}
351414
}

0 commit comments

Comments
 (0)