Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 42 additions & 59 deletions modules/rntuple.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -922,27 +922,37 @@ async function readHeaderFooter(tuple) {
});
}

function readEntry(rntuple, fieldName, entryIndex) {
function readEntry(rntuple, fieldName, clusterIndex, entryIndex) {
const builder = rntuple.builder,
field = builder.fieldDescriptors.find(f => f.fieldName === fieldName),
fieldData = rntuple._clusterData[fieldName];
columns = rntuple.fieldToColumns[fieldName];

if (!field)
throw new Error(`No descriptor for field ${fieldName}`);
if (!fieldData)
throw new Error(`No data for field ${fieldName}`);
if (!columns)
throw new Error(`No columns field ${fieldName}`);

// Detect and decode string fields
if (Array.isArray(fieldData) && fieldData.length === 2) {
const [offsets, payload] = fieldData,
start = entryIndex === 0 ? 0 : Number(offsets[entryIndex - 1]),
end = Number(offsets[entryIndex]),
decoded = payload.slice(start, end).join(''); // Convert to string
return decoded;

const pages = builder.pageLocations[clusterIndex]?.[columns[0].index]?.pages;
if (!pages)
throw new Error(`No pages found ${fieldName}`);

let pageid = 0;
while ((pageid < pages.length - 1) && (entryIndex >= Number(pages[pageid].numElements))) {
entryIndex -= Number(pages[pageid].numElements);
pageid++;
}

// Fallback: primitive type (e.g. int, float)
return fieldData[0][entryIndex];
if (field.typeName === 'std::string') {
// string extracted from two columns
const offsets = rntuple._clusterData[columns[0].index][pageid],
payload = rntuple._clusterData[columns[1].index][pageid],
start = entryIndex === 0 ? 0 : Number(offsets[entryIndex - 1]),
end = Number(offsets[entryIndex]);
return payload.slice(start, end).join(''); // Convert to string
}
const values = rntuple._clusterData[columns[0].index];
return values[pageid][entryIndex];
}

/** @summary Return field name for specified branch index
Expand All @@ -953,11 +963,11 @@ function getSelectorFieldName(selector, i) {
}

// Read and process the next data cluster from the RNTuple
function readNextCluster(rntuple, selector) {
async function readNextCluster(rntuple, selector) {
const builder = rntuple.builder;

// Add validation
if (!builder.clusterSummaries || builder.clusterSummaries.length === 0)
if (!builder.clusterSummaries)
throw new Error('No cluster summaries available - possibly incomplete file reading');

const clusterIndex = selector.currentCluster,
Expand All @@ -967,6 +977,11 @@ function readNextCluster(rntuple, selector) {
// Collect only selected field names from selector
selectedFields = [];

if (!clusterSummary) {
selector.Terminate(clusterIndex > 0);
return false;
}

for (let i = 0; i < selector.numBranches(); ++i)
selectedFields.push(getSelectorFieldName(selector, i));

Expand All @@ -993,7 +1008,7 @@ function readNextCluster(rntuple, selector) {
// Early exit if no pages to read (i.e., no selected fields matched)
if (pages.length === 0) {
selector.Terminate(false);
return Promise.resolve();
return false;
}

// Build flat array of [offset, size, offset, size, ...] to read pages
Expand Down Expand Up @@ -1040,68 +1055,35 @@ function readNextCluster(rntuple, selector) {
});

return Promise.all(unzipPromises).then(unzipBlobs => {
rntuple._clusterData = {}; // store deserialized data per field
rntuple._clusterData = {}; // store deserialized data per column index

for (let i = 0; i < unzipBlobs.length; ++i) {
const blob = unzipBlobs[i];
// Ensure blob is a DataView
if (!(blob instanceof DataView))
throw new Error(`Invalid blob type for page ${i}: ${Object.prototype.toString.call(blob)}`);
const {
page,
colDesc
} = pages[i],
field = builder.fieldDescriptors[colDesc.fieldId],
values = builder.deserializePage(blob, colDesc, page);
const colDesc = pages[i].colDesc,
values = builder.deserializePage(blob, colDesc, pages[i].page);

// Support multiple representations (e.g., string fields with offsets + payload)
if (!rntuple._clusterData[field.fieldName])
rntuple._clusterData[field.fieldName] = [];

// splitting string fields into offset and payload components
if (field.typeName === 'std::string') {
if (
colDesc.coltype === ENTupleColumnType.kIndex64 ||
colDesc.coltype === ENTupleColumnType.kIndex32 ||
colDesc.coltype === ENTupleColumnType.kSplitIndex64 ||
colDesc.coltype === ENTupleColumnType.kSplitIndex32
) // Index64/Index32
rntuple._clusterData[field.fieldName][0] = values; // Offsets
else if (colDesc.coltype === ENTupleColumnType.kChar)
rntuple._clusterData[field.fieldName][1] = values; // Payload
else
throw new Error(`Unsupported column type for string field: ${colDesc.coltype}`);
} else
rntuple._clusterData[field.fieldName][0] = values;
}
if (!rntuple._clusterData[colDesc.index])
rntuple._clusterData[colDesc.index] = [];

// Ensure string fields have ending offset for proper reconstruction of the last entry
for (const fieldName of selectedFields) {
const field = builder.fieldDescriptors.find(f => f.fieldName === fieldName),
colData = rntuple._clusterData[fieldName];
if (field.typeName === 'std::string') {
if (!Array.isArray(colData) || colData.length !== 2)
throw new Error(`String field '${fieldName}' must have 2 columns`);
if (colData[0].length !== builder.clusterSummaries[clusterIndex].numEntries)
throw new Error(`Malformed string field '${fieldName}': missing final offset`);
}
rntuple._clusterData[colDesc.index].push(values);
}

const numEntries = clusterSummary.numEntries;
for (let i = 0; i < numEntries; ++i) {
for (let b = 0; b < selector.numBranches(); ++b) {
const fieldName = getSelectorFieldName(selector, b),
tgtName = selector.nameOfBranch(b),
values = rntuple._clusterData[fieldName];
tgtName = selector.nameOfBranch(b);

if (!values)
throw new Error(`Missing values for selected field: ${fieldName}`);
selector.tgtobj[tgtName] = readEntry(rntuple, fieldName, i);
selector.tgtobj[tgtName] = readEntry(rntuple, fieldName, clusterIndex, i);
}
selector.Process();
selector.Process(selector.currentEntry++);
}

selector.Terminate(true);
return readNextCluster(rntuple, selector);
});
});
}
Expand All @@ -1112,6 +1094,7 @@ function rntupleProcess(rntuple, selector, args) {
return readHeaderFooter(rntuple).then(() => {
selector.Begin();
selector.currentCluster = 0;
selector.currentEntry = 0;
return readNextCluster(rntuple, selector, args);
}).then(() => selector);
}
Expand Down