[lake/lance] Refactor LanceArrowWriter #2345
Just to double-check, doesn’t this still retain the The original goal of the issue was to reuse the
wuchong
left a comment
Thanks @XuQianJin-Stars, I left some comments.
    <groupId>org.apache.fluss</groupId>
    <artifactId>fluss-common</artifactId>
    <version>${project.version}</version>
    <scope>provided</scope>
Why do we need to include fluss-common in the shaded jar? If it is only needed for testing, we can add it with test scope.
@luoyuxia could you also help check this?
Yes, we don't need to include fluss-common.
    private static List<org.apache.fluss.shaded.arrow.org.apache.arrow.memory.ArrowBuf>
            getFieldBuffers(
                    org.apache.fluss.shaded.arrow.org.apache.arrow.vector.FieldVector vector) {
        try {
            Method method = vector.getClass().getMethod("getFieldBuffers");
            return (List<org.apache.fluss.shaded.arrow.org.apache.arrow.memory.ArrowBuf>)
                    method.invoke(vector);
        } catch (Exception e) {
            throw new RuntimeException("Failed to get field buffers from shaded vector", e);
        }
    }

    private static int getValueCount(
            org.apache.fluss.shaded.arrow.org.apache.arrow.vector.FieldVector vector) {
        try {
            Method method = vector.getClass().getMethod("getValueCount");
            return (int) method.invoke(vector);
        } catch (Exception e) {
            throw new RuntimeException("Failed to get value count from shaded vector", e);
        }
    }

    private static ByteBuffer getByteBuffer(
            org.apache.fluss.shaded.arrow.org.apache.arrow.memory.ArrowBuf buf) {
        try {
            Method method = buf.getClass().getMethod("nioBuffer", long.class, int.class);
            return (ByteBuffer) method.invoke(buf, 0L, (int) buf.capacity());
        } catch (Exception e) {
            try {
                Field field = buf.getClass().getDeclaredField("memoryAddress");
                field.setAccessible(true);
                long address = (long) field.get(buf);
                return null;
            } catch (Exception ex) {
                throw new RuntimeException("Failed to get ByteBuffer from ArrowBuf", ex);
Why introduce this reflection? These classes seem to provide these methods already, so they can be invoked directly.
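To illustrate the point of this comment, here is a minimal sketch that contrasts a reflective call with a direct one. `DemoVector` is a hypothetical stand-in with a public `getValueCount()` accessor, not the shaded Arrow `FieldVector`; it only shows that when a method is public and visible at compile time, the direct call is shorter, type-checked, and needs no exception wrapping.

```java
import java.lang.reflect.Method;

/** Hypothetical stand-in for a vector type that exposes a public accessor. */
final class DemoVector {
    private final int valueCount;

    DemoVector(int valueCount) {
        this.valueCount = valueCount;
    }

    public int getValueCount() {
        return valueCount;
    }
}

final class DirectCallSketch {
    /** Reflective lookup: resolved at runtime, failures surface only as exceptions. */
    static int viaReflection(DemoVector vector) {
        try {
            Method method = vector.getClass().getMethod("getValueCount");
            return (int) method.invoke(vector);
        } catch (ReflectiveOperationException e) {
            throw new RuntimeException("Failed to get value count", e);
        }
    }

    /** Direct call: checked by the compiler, no lookup and no wrapping needed. */
    static int direct(DemoVector vector) {
        return vector.getValueCount();
    }

    public static void main(String[] args) {
        DemoVector vector = new DemoVector(3);
        // Both paths return the same value; only the direct one is compile-time safe.
        System.out.println(viaReflection(vector) == direct(vector));
    }
}
```

Reflection is usually reserved for cases where the target type is not available at compile time, which does not appear to be the situation here.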
    FieldVector fieldVector = shadedRoot.getVector(i);
    fieldWriters[i] = ArrowUtils.createArrowFieldWriter(fieldVector, rowType.getTypeAt(i));
Can we directly use ArrowWriter instead of ArrowFieldWriter? This code seems to miss the call to initFieldVector, which ArrowWriter already makes.
        shadedRoot.setRowCount(recordsCount);
    }

    public void reset() {
This is never called. Should we call it in LanceLakeWriter#complete?
    @Override
    public void write(LogRecord record) throws IOException {
        arrowWriter.write(record);
        buffer.add(record.getRow());
Why buffer the rows first instead of writing to Arrow directly? This doubles the memory overhead.
    List<FragmentMetadata> fragments =
            Fragment.create(datasetUri, nonShadedAllocator, nonShadedRoot, writeParams);

    allFragments.addAll(fragments);
There is no need for a shared field allFragments. Could it be a local variable that the flush() method returns? I also can't find where allFragments is reset.
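The suggestion can be sketched as follows. This is only an illustration under stated assumptions: `FlushSketch` is a hypothetical class, a `String` stands in for Lance's `FragmentMetadata`, and a simple row counter stands in for the Arrow batch. The point is that `flush()` builds its result in a local list and returns it, so there is no shared field that has to be reset between calls.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical writer; String stands in for fragment metadata. */
final class FlushSketch {
    // Stand-in for the rows accumulated in the current Arrow batch.
    private int rowsInBatch;

    void write() {
        rowsInBatch++;
    }

    /** Returns only the fragments created by this call; nothing is kept in a field. */
    List<String> flush() {
        List<String> fragments = new ArrayList<>();
        if (rowsInBatch > 0) {
            fragments.add("fragment(" + rowsInBatch + " rows)");
            rowsInBatch = 0; // the batch state is cleared as part of the flush itself
        }
        return fragments;
    }

    public static void main(String[] args) {
        FlushSketch writer = new FlushSketch();
        writer.write();
        writer.write();
        System.out.println(writer.flush()); // [fragment(2 rows)]
        System.out.println(writer.flush()); // []
    }
}
```

With this shape the caller decides how to collect the returned fragments, and a second `flush()` cannot accidentally re-report fragments from an earlier one.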
wuchong
left a comment
LGTM. I pushed a commit to revert the changes of LICENSE as we have reverted to shade fluss-common into fluss-lance.
Purpose
Linked issue: close #1569
Refactor fluss-lake-lance module to eliminate code duplication by reusing fluss-common's ArrowFieldWriter implementations.
Issue: The fluss-lake-lance module previously maintained a separate LanceArrowWriter with duplicate FieldWriter implementations, because fluss-common uses the shaded Arrow API while the Lance library requires the non-shaded Arrow API.
Solution: Implement ArrowDataConverter to bridge shaded and non-shaded Arrow via zero-copy off-heap memory sharing. This allows the Lance module to reuse fluss-common's ArrowFieldWriter implementations, eliminating the need for duplicate writer code.
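As an analogy for what zero-copy off-heap sharing means, the sketch below uses only `java.nio` and no Arrow classes. It is not the ArrowDataConverter implementation; `ZeroCopyViewSketch` and its method names are hypothetical. It shows two independent views over the same direct (off-heap) buffer, where a write through one view is visible through the other because no bytes are copied.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

final class ZeroCopyViewSketch {
    /** Writes ints into a freshly allocated off-heap buffer (the "producer" side). */
    static ByteBuffer produce(int... values) {
        ByteBuffer offHeap = ByteBuffer.allocateDirect(values.length * Integer.BYTES)
                .order(ByteOrder.LITTLE_ENDIAN);
        for (int value : values) {
            offHeap.putInt(value);
        }
        offHeap.flip();
        return offHeap;
    }

    /** Creates a second view over the same memory; duplicate() copies no bytes. */
    static ByteBuffer consumerView(ByteBuffer producer) {
        // duplicate() resets the byte order to big-endian, so set it again explicitly.
        return producer.duplicate().order(ByteOrder.LITTLE_ENDIAN);
    }

    public static void main(String[] args) {
        ByteBuffer producer = produce(1, 2, 3);
        ByteBuffer consumer = consumerView(producer);
        producer.putInt(0, 42); // absolute write through the producer view
        System.out.println(consumer.getInt(0)); // prints 42: both views share memory
    }
}
```

The trade-off of any such sharing is lifetime management: the consumer view is only valid while the underlying memory is, so the owner must not release it while the other side is still reading.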
Brief change log
1. Eliminated Code Duplication (~400 lines removed)
- LanceArrowWriter class and all its inner FieldWriter classes from fluss-lake-lance
- ArrowFieldWriter
2. Created ArrowDataConverter (Zero-Copy Bridge)
3. Created ShadedArrowBatchWriter (Reuse Adapter)
- ArrowUtils.createArrowFieldWriter()
- writeRow(), finish(), reset()
4. Refactored LanceLakeWriter (Unified Writer Path)
- InternalRow → ShadedArrowBatchWriter → ArrowDataConverter → Lance Fragment.create()
5. Architecture Benefits
Tests
Unit Tests
- LanceTieringTest.testTieringWriteTable (with/without partitions)
- LakeEnabledTableCreateITCase (table creation with various data types)
Verification
API and Format
API Changes: None - This is an internal refactoring
(LakeWriter, LanceLakeTieringFactory) remain unchanged
Internal Changes:
Documentation
Documentation Updates: Not required
Code Quality Improvements: