Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
62 commits
Select commit Hold shift + click to select a range
da330bd
small block read
iaojnh May 14, 2026
a5077f3
buffer write
iaojnh May 15, 2026
6ecb2b5
fix
iaojnh May 15, 2026
be1d0f4
fix
iaojnh May 15, 2026
9290c3e
upd
iaojnh May 18, 2026
7b0db62
upd
iaojnh May 19, 2026
01a46f6
upd
iaojnh May 19, 2026
c60900b
fix
iaojnh May 19, 2026
f0b9898
fix
iaojnh May 19, 2026
4997a1f
fix
iaojnh May 19, 2026
4bece9a
fix
iaojnh May 19, 2026
91e7b7f
fix
iaojnh May 19, 2026
f78fe39
fix
iaojnh May 19, 2026
21081e6
fix
iaojnh May 19, 2026
85f89dc
fix
iaojnh May 20, 2026
9e58670
Merge branch 'main' into fix/exclude-arrow
iaojnh May 20, 2026
46bf9aa
Merge branch 'alibaba:main' into main
iaojnh May 21, 2026
a05c38d
Merge branch 'main' into feat/buffer-write
iaojnh May 21, 2026
5f8a745
fix
iaojnh May 21, 2026
4940ef0
fix
iaojnh May 22, 2026
f545524
fix
iaojnh May 22, 2026
29989bc
Merge branch 'main' into feat/buffer-write
iaojnh May 22, 2026
23be06e
clang format
iaojnh May 22, 2026
73d5010
fix
iaojnh May 25, 2026
d6547aa
Merge branch 'alibaba:main' into main
iaojnh May 25, 2026
67742c0
fix
iaojnh May 25, 2026
fcce41d
fix
iaojnh May 25, 2026
4b8f2e6
fix
iaojnh May 25, 2026
70323d3
fix
iaojnh May 25, 2026
31266ef
clang format
iaojnh May 25, 2026
c9a4d94
Merge branch 'alibaba:main' into main
iaojnh May 26, 2026
61ce959
Merge branch 'main' into feat/buffer-write
iaojnh May 26, 2026
881a0b0
fix compile
iaojnh May 26, 2026
af4413b
fix
iaojnh May 26, 2026
6c0bc81
clang format
iaojnh May 26, 2026
351b546
fix
iaojnh May 26, 2026
235cffd
Merge branch 'main' into feat/buffer-write
iaojnh May 26, 2026
01d0884
Merge branch 'main' into feat/buffer-write
iaojnh May 27, 2026
1eec933
fix
iaojnh May 27, 2026
1b1c221
clang format
iaojnh May 27, 2026
6fa450f
fix
iaojnh May 28, 2026
59f80c1
clang format
iaojnh May 28, 2026
ca7cffc
Merge branch 'main' into feat/buffer-write
iaojnh May 28, 2026
1ddc960
fix
iaojnh May 28, 2026
bf11afe
add buffer storage write ut
iaojnh May 28, 2026
327bf43
fix ut
iaojnh May 28, 2026
f5f334c
fix for pr comment
iaojnh May 29, 2026
e376f38
Merge branch 'main' into feat/buffer-write
iaojnh May 29, 2026
f9063e5
add ut
iaojnh May 29, 2026
bdeaa63
clang format
iaojnh May 29, 2026
e796a31
fix ut
iaojnh May 29, 2026
971da98
fix ut
iaojnh May 29, 2026
5bff8b6
Merge branch 'alibaba:main' into main
iaojnh May 29, 2026
e7ecbc3
Merge branch 'main' into feat/buffer-write
iaojnh May 29, 2026
f17f45b
fix ut
iaojnh May 29, 2026
3d15476
Merge branch 'main' into feat/buffer-write
iaojnh May 29, 2026
12b1337
fix
iaojnh May 29, 2026
9b2edc5
clang format
iaojnh May 30, 2026
de292b0
fix
iaojnh May 30, 2026
3a12126
fix
iaojnh May 30, 2026
9a5cf34
clang format
iaojnh May 30, 2026
6e2c397
fix
iaojnh May 30, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
452 changes: 386 additions & 66 deletions src/ailego/buffer/vector_page_table.cc

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion src/core/algorithm/flat/flat_streamer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ FlatStreamer<BATCH_SIZE>::FlatStreamer() : entity_(stats_) {}

template <size_t BATCH_SIZE>
FlatStreamer<BATCH_SIZE>::~FlatStreamer() {
if (state_ == STATE_INITED) {
if (state_ == STATE_INITED || state_ == STATE_OPENED) {
this->cleanup();
}
}
Expand Down
22 changes: 16 additions & 6 deletions src/core/algorithm/flat/flat_streamer_entity.cc
Original file line number Diff line number Diff line change
Expand Up @@ -165,13 +165,20 @@ int FlatStreamerEntity::add(uint64_t key, const void *vec, size_t size) {

IndexStorage::MemoryBlock head_block;
this->get_head_block(head_block);
const BlockLocation *bl =
reinterpret_cast<const BlockLocation *>(head_block.data());
if (ailego_unlikely(bl == nullptr)) {
LOG_ERROR("Failed to get block loc");
return IndexError_ReadData;
BlockLocation block;
{
const BlockLocation *bl =
reinterpret_cast<const BlockLocation *>(head_block.data());
if (ailego_unlikely(bl == nullptr)) {
LOG_ERROR("Failed to get block loc");
return IndexError_ReadData;
}
block = *bl;
}
BlockLocation block = *bl;
// Release the head block reference early so that the buffer pool ref_count
// and memory budget held by it do not block subsequent acquire/evict in this
// function (alloc_block / add_to_block may compete for the same memory).
head_block.reset(nullptr);

if (!this->is_valid_block(block)) {
int ret = this->alloc_block(block, &block);
Expand Down Expand Up @@ -922,6 +929,9 @@ int FlatStreamerEntity::add_vector_with_id(const uint32_t id, const void *query,
this->get_head_block(head_block);
BlockLocation block =
*reinterpret_cast<const BlockLocation *>(head_block.data());
// Release buffer-pool pin before any alloc_block() call that may trigger
// append_segment() and rebuild the pool (same reason as in add()).
head_block.reset(nullptr);
if (!this->is_valid_block(block)) {
int ret = this->alloc_block(block, &block);
if (ailego_unlikely(ret != 0)) {
Expand Down
39 changes: 17 additions & 22 deletions src/core/algorithm/hnsw/hnsw_index_hash.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,9 @@ class HnswIndexHashMap {
items_(reinterpret_cast<const Item *>(data)) {}
//! Return an empty loc or the key item loc

Slot(Chunk::Pointer &&chunk, IndexStorage::MemoryBlock &&mem_block)
: chunk_(std::move(chunk)), items_block_(std::move(mem_block)) {
items_ = reinterpret_cast<const Item *>(items_block_.data());
Slot(Chunk::Pointer &&chunk, std::vector<char> &&local_data)
: chunk_(std::move(chunk)), local_data_(std::move(local_data)) {
items_ = reinterpret_cast<const Item *>(local_data_.data());
}
const_iterator find(key_type key, uint32_t max_items, uint32_t mask) const {
auto it = &items_[key & mask];
Expand Down Expand Up @@ -73,8 +73,8 @@ class HnswIndexHashMap {

private:
Chunk::Pointer chunk_{};
const Item *items_{nullptr}; // point to chunk data
IndexStorage::MemoryBlock items_block_{};
const Item *items_{nullptr}; // point to local_data_
std::vector<char> local_data_{};
};

public:
Expand Down Expand Up @@ -114,9 +114,9 @@ class HnswIndexHashMap {
}

int cleanup(void) {
broker_.reset();
slots_.clear();
slots_.shrink_to_fit();
broker_.reset();
mask_bits_ = 0U;
slot_items_ = 0U;
slot_loc_mask_ = 0U;
Expand All @@ -141,7 +141,6 @@ class HnswIndexHashMap {
auto idx = key >> mask_bits_;
if (idx >= slots_.size()) {
if (ailego_unlikely(idx >= slots_.capacity())) {
LOG_ERROR("no space to insert");
return false;
}
for (auto i = slots_.size(); i <= idx; ++i) {
Expand All @@ -152,7 +151,6 @@ class HnswIndexHashMap {
}
auto it = slots_[idx].find(key, slot_items_, slot_loc_mask_);
if (ailego_unlikely(it == nullptr)) {
LOG_ERROR("no space to insert");
return false;
}

Expand All @@ -179,14 +177,10 @@ class HnswIndexHashMap {
LOG_ERROR("Chunk resize failed, size=%zu", size);
return false;
}
//! Read the whole data to memory
IndexStorage::MemoryBlock data_block;
if (ailego_unlikely(chunk->read(0U, data_block, size) != size)) {
LOG_ERROR("Chunk read failed, size=%zu", size);
return false;
}

slots_.emplace_back(std::move(chunk), std::move(data_block));
//! Use a local zero-initialized buffer; new chunks contain all zeros,
//! so no buffer-pool read is needed and no ref_count is pinned.
std::vector<char> local_buf(size, 0);
slots_.emplace_back(std::move(chunk), std::move(local_buf));
return true;
}

Expand All @@ -208,13 +202,14 @@ class HnswIndexHashMap {
i, chunk->data_size(), size);
return IndexError_InvalidFormat;
}
//! Read the whole data to memory
IndexStorage::MemoryBlock data_block;
if (ailego_unlikely(chunk->read(0U, data_block, size) != size)) {
LOG_ERROR("Chunk read failed, size=%zu", size);
return false;
//! Copy chunk data into a local buffer via fetch() so that no
//! buffer-pool block is pinned for the lifetime of the Slot.
std::vector<char> local_buf(size);
if (ailego_unlikely(chunk->fetch(0U, local_buf.data(), size) != size)) {
LOG_ERROR("Chunk fetch failed, size=%zu", size);
return IndexError_InvalidFormat;
}
slots_.emplace_back(std::move(chunk), std::move(data_block));
slots_.emplace_back(std::move(chunk), std::move(local_buf));
}
return 0;
}
Expand Down
2 changes: 1 addition & 1 deletion src/core/algorithm/hnsw/hnsw_streamer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ namespace core {
HnswStreamer::HnswStreamer() = default;

HnswStreamer::~HnswStreamer() {
if (state_ == STATE_INITED) {
if (state_ == STATE_INITED || state_ == STATE_OPENED) {
this->cleanup();
}
}
Expand Down
20 changes: 11 additions & 9 deletions src/core/algorithm/hnsw/hnsw_streamer_entity.cc
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ int HnswStreamerEntity::init(size_t max_doc_cnt) {
std::lock_guard<std::mutex> lock(mutex_);
broker_ = std::make_shared<ChunkBroker>(stats_);
upper_neighbor_index_ = std::make_shared<NIHashMap>();
upper_neighbor_rw_mutex_ = std::make_shared<std::shared_mutex>();
keys_map_lock_ = std::make_shared<ailego::SharedMutex>();
keys_map_ = std::make_shared<HashMap<key_t, node_id_t>>();
if (!keys_map_ || !upper_neighbor_index_ || !broker_ || !keys_map_lock_) {
Expand Down Expand Up @@ -767,9 +768,10 @@ const HnswEntity::Pointer HnswStreamerEntity::clone() const {
HnswStreamerEntity *entity = new (std::nothrow) HnswStreamerEntity(
stats_, header(), chunk_size_, node_index_mask_bits_,
upper_neighbor_mask_bits_, filter_same_key_, get_vector_enabled_,
upper_neighbor_index_, keys_map_lock_, keys_map_, use_key_info_map_,
std::move(node_chunks), std::move(upper_neighbor_chunks), broker_,
node_chunk_bases_, upper_neighbor_chunk_bases_);
upper_neighbor_index_, upper_neighbor_rw_mutex_, keys_map_lock_,
keys_map_, use_key_info_map_, std::move(node_chunks),
std::move(upper_neighbor_chunks), broker_, node_chunk_bases_,
upper_neighbor_chunk_bases_);
if (ailego_unlikely(!entity)) {
LOG_ERROR("HnswStreamerEntity new failed");
}
Expand Down Expand Up @@ -800,9 +802,9 @@ const HnswEntity::Pointer HnswMmapStreamerEntity::clone() const {
auto *entity = new (std::nothrow) HnswMmapStreamerEntity(
stats_, header(), chunk_size_, node_index_mask_bits_,
upper_neighbor_mask_bits_, filter_same_key_, get_vector_enabled_,
upper_neighbor_index_, keys_map_lock_, keys_map_, use_key_info_map_,
std::move(node_chunks), std::move(upper_neighbor_chunks), broker_,
nullptr, nullptr);
upper_neighbor_index_, upper_neighbor_rw_mutex_, keys_map_lock_,
keys_map_, use_key_info_map_, std::move(node_chunks),
std::move(upper_neighbor_chunks), broker_, nullptr, nullptr);
if (ailego_unlikely(!entity)) {
LOG_ERROR("HnswMmapStreamerEntity new failed");
}
Expand Down Expand Up @@ -833,9 +835,9 @@ const HnswEntity::Pointer HnswContiguousStreamerEntity::clone() const {
auto *entity = new (std::nothrow) HnswContiguousStreamerEntity(
stats_, header(), chunk_size_, node_index_mask_bits_,
upper_neighbor_mask_bits_, filter_same_key_, get_vector_enabled_,
upper_neighbor_index_, keys_map_lock_, keys_map_, use_key_info_map_,
std::move(node_chunks), std::move(upper_neighbor_chunks), broker_,
nullptr, nullptr);
upper_neighbor_index_, upper_neighbor_rw_mutex_, keys_map_lock_,
keys_map_, use_key_info_map_, std::move(node_chunks),
std::move(upper_neighbor_chunks), broker_, nullptr, nullptr);
if (ailego_unlikely(!entity)) {
LOG_ERROR("HnswContiguousStreamerEntity new failed");
return HnswEntity::Pointer();
Expand Down
71 changes: 54 additions & 17 deletions src/core/algorithm/hnsw/hnsw_streamer_entity.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include <iostream>
#include <memory>
#include <mutex>
#include <shared_mutex>
#if defined(__linux__) || defined(__APPLE__)
#include <sys/mman.h>
#endif
Expand Down Expand Up @@ -246,19 +247,19 @@ class HnswStreamerEntity : public HnswEntity {
using NIHashMapPointer = std::shared_ptr<NIHashMap>;

//! Clone construct, used by clone method in subclasses
HnswStreamerEntity(IndexStreamer::Stats &stats, const HNSWHeader &hd,
size_t chunk_size, uint32_t node_index_mask_bits,
uint32_t upper_neighbor_mask_bits, bool filter_same_key,
bool get_vector_enabled,
const NIHashMapPointer &upper_neighbor_index,
std::shared_ptr<ailego::SharedMutex> &keys_map_lock,
const HashMapPointer<key_t, node_id_t> &keys_map,
bool use_key_info_map,
std::vector<Chunk::Pointer> &&node_chunks,
std::vector<Chunk::Pointer> &&upper_neighbor_chunks,
const ChunkBroker::Pointer &broker,
std::shared_ptr<std::vector<const uint8_t *>> node_bases,
std::shared_ptr<std::vector<const uint8_t *>> upper_bases)
HnswStreamerEntity(
IndexStreamer::Stats &stats, const HNSWHeader &hd, size_t chunk_size,
uint32_t node_index_mask_bits, uint32_t upper_neighbor_mask_bits,
bool filter_same_key, bool get_vector_enabled,
const NIHashMapPointer &upper_neighbor_index,
const std::shared_ptr<std::shared_mutex> &upper_neighbor_rw_mutex,
std::shared_ptr<ailego::SharedMutex> &keys_map_lock,
const HashMapPointer<key_t, node_id_t> &keys_map, bool use_key_info_map,
std::vector<Chunk::Pointer> &&node_chunks,
std::vector<Chunk::Pointer> &&upper_neighbor_chunks,
const ChunkBroker::Pointer &broker,
std::shared_ptr<std::vector<const uint8_t *>> node_bases,
std::shared_ptr<std::vector<const uint8_t *>> upper_bases)
: stats_(stats),
chunk_size_(chunk_size),
node_index_mask_bits_(node_index_mask_bits),
Expand All @@ -269,6 +270,7 @@ class HnswStreamerEntity : public HnswEntity {
filter_same_key_(filter_same_key),
get_vector_enabled_(get_vector_enabled),
use_key_info_map_(use_key_info_map),
upper_neighbor_rw_mutex_(upper_neighbor_rw_mutex),
upper_neighbor_index_(upper_neighbor_index),
keys_map_lock_(keys_map_lock),
keys_map_(keys_map),
Expand Down Expand Up @@ -323,6 +325,10 @@ class HnswStreamerEntity : public HnswEntity {

inline std::pair<uint32_t, uint32_t> get_upper_neighbor_chunk_loc(
level_t level, node_id_t id) const {
// Shared lock: concurrent readers are fine, but must synchronize with
// add_upper_neighbor's exclusive lock to avoid data-race on
// slots_.size() inside HnswIndexHashMap.
std::shared_lock<std::shared_mutex> lk(*upper_neighbor_rw_mutex_);
auto it = upper_neighbor_index_->find(id);
ailego_assert_abort(it != upper_neighbor_index_->end(),
"Get upper neighbor header failed");
Expand Down Expand Up @@ -370,6 +376,10 @@ class HnswStreamerEntity : public HnswEntity {
if (level == 0) {
return 0;
}
// Exclusive lock: protects upper_neighbor_chunks_.emplace_back() and
// upper_neighbor_index_->insert() from racing with concurrent find()
// calls in get_upper_neighbor_chunk_loc().
std::unique_lock<std::shared_mutex> lk(*upper_neighbor_rw_mutex_);
Chunk::Pointer chunk;
uint64_t chunk_offset = UINT64_MAX;
size_t neighbors_size = get_total_upper_neighbors_size(level);
Expand Down Expand Up @@ -408,17 +418,40 @@ class HnswStreamerEntity : public HnswEntity {
meta.level = level;
meta.index = (chunk_index << upper_neighbor_mask_bits_) |
(chunk_offset / upper_neighbor_size_);
size_t zero_start = chunk_offset;
chunk_offset += upper_neighbor_size_ * level;
if (ailego_unlikely(!upper_neighbor_index_->insert(id, meta.data))) {
LOG_ERROR("HashMap insert value failed");
return IndexError_Runtime;
}

// IMPORTANT: order matters here.
// 1) resize so the chunk's data_size covers the new region.
// 2) zero-fill the new region: storage backends like BufferStorage do
// NOT zero on resize -- only metadata is updated, and the underlying
// page may contain stale content from a previously-evicted page.
// Without this step, NeighborsHeader::neighbor_cnt is garbage and
// select_entry_point()/search_neighbors() iterate over garbage
// node_ids, eventually triggering find()'s assertion in
// get_upper_neighbor_chunk_loc().
// 3) ONLY THEN publish the entry to upper_neighbor_index_, so that any
// concurrent reader that finds this id already sees a properly
// zeroed upper-neighbor slot.
if (ailego_unlikely(chunk->resize(chunk_offset) != chunk_offset)) {
LOG_ERROR("Chunk resize to %zu failed", (size_t)chunk_offset);
return IndexError_Runtime;
}

// Use std::vector instead of a VLA: VLAs are not standard C++ (GCC and
// Clang accept them only as an extension; MSVC rejects them outright).
std::vector<char> zeros(neighbors_size, 0);
if (ailego_unlikely(chunk->write(zero_start, zeros.data(),
neighbors_size) != neighbors_size)) {
LOG_ERROR("Chunk write zeros failed");
return IndexError_Runtime;
}

if (ailego_unlikely(!upper_neighbor_index_->insert(id, meta.data))) {
LOG_ERROR("HashMap insert value failed");
return IndexError_Runtime;
}

return 0;
}

Expand Down Expand Up @@ -529,6 +562,10 @@ class HnswStreamerEntity : public HnswEntity {
protected:
IndexStreamer::Stats &stats_;
std::mutex mutex_{};
//! Guards upper_neighbor_index_ and upper_neighbor_chunks_ against
//! concurrent reads (find) and writes (insert/emplace_back).
//! Shared via shared_ptr so all clones synchronize on the SAME mutex.
mutable std::shared_ptr<std::shared_mutex> upper_neighbor_rw_mutex_{};
size_t max_index_size_{0UL};
uint32_t chunk_size_{kDefaultChunkSize};
uint32_t upper_neighbor_chunk_size_{kDefaultChunkSize};
Expand Down
43 changes: 24 additions & 19 deletions src/core/algorithm/hnsw_rabitq/hnsw_rabitq_index_hash.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,9 @@ class HnswIndexHashMap {
items_(reinterpret_cast<const Item *>(data)) {}
//! Return an empty loc or the key item loc

Slot(Chunk::Pointer &&chunk, IndexStorage::MemoryBlock &&mem_block)
: chunk_(std::move(chunk)), items_block_(std::move(mem_block)) {
items_ = reinterpret_cast<const Item *>(items_block_.data());
Slot(Chunk::Pointer &&chunk, std::vector<char> &&local_data)
: chunk_(std::move(chunk)), local_data_(std::move(local_data)) {
items_ = reinterpret_cast<const Item *>(local_data_.data());
}
const_iterator find(key_type key, uint32_t max_items, uint32_t mask) const {
auto it = &items_[key & mask];
Expand Down Expand Up @@ -73,8 +73,8 @@ class HnswIndexHashMap {

private:
Chunk::Pointer chunk_{};
const Item *items_{nullptr}; // point to chunk data
IndexStorage::MemoryBlock items_block_{};
const Item *items_{nullptr}; // point to local_data_
std::vector<char> local_data_{};
};

public:
Expand Down Expand Up @@ -179,14 +179,18 @@ class HnswIndexHashMap {
LOG_ERROR("Chunk resize failed, size=%zu", size);
return false;
}
//! Read the whole data to memory
IndexStorage::MemoryBlock data_block;
if (ailego_unlikely(chunk->read(0U, data_block, size) != size)) {
LOG_ERROR("Chunk read failed, size=%zu", size);
return false;
}

slots_.emplace_back(std::move(chunk), std::move(data_block));
//! Use a local zero-initialized buffer; new chunks contain all zeros,
//! so no buffer-pool read is needed and no ref_count is pinned.
//! NOTE: Previously this used `chunk->read(0U, data_block, size)` which
//! returns a view into the underlying BufferPool page. That made the
//! Slot's `items_` pointer alias buffer-pool memory shared across
//! threads, which under clang -O3 release exposed a data race on
//! Slot::find()'s probing read of `it->second` (concurrent
//! const_cast writes from insert() were not reliably visible). Using a
//! private zero-initialized vector matches the HNSW (non-RABITQ)
//! implementation and avoids this race.
std::vector<char> local_buf(size, 0);
slots_.emplace_back(std::move(chunk), std::move(local_buf));
return true;
}

Expand All @@ -208,13 +212,14 @@ class HnswIndexHashMap {
i, chunk->data_size(), size);
return IndexError_InvalidFormat;
}
//! Read the whole data to memory
IndexStorage::MemoryBlock data_block;
if (ailego_unlikely(chunk->read(0U, data_block, size) != size)) {
LOG_ERROR("Chunk read failed, size=%zu", size);
return false;
//! Copy chunk data into a local buffer via fetch() so that no
//! buffer-pool block is pinned for the lifetime of the Slot.
std::vector<char> local_buf(size);
if (ailego_unlikely(chunk->fetch(0U, local_buf.data(), size) != size)) {
LOG_ERROR("Chunk fetch failed, size=%zu", size);
return IndexError_InvalidFormat;
}
slots_.emplace_back(std::move(chunk), std::move(data_block));
slots_.emplace_back(std::move(chunk), std::move(local_buf));
}
return 0;
}
Expand Down
Loading
Loading