Skip to content

Commit 458499b

Browse files
authored
Merge branch 'main' into typo
2 parents f400bfc + a71be7a commit 458499b

File tree

7 files changed

+112
-221
lines changed

7 files changed

+112
-221
lines changed

massa-async-pool/src/pool.rs

Lines changed: 35 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,7 @@ use massa_db_exports::{
1010
use massa_models::{
1111
address::Address,
1212
async_msg::{
13-
AsyncMessage, AsyncMessageDeserializer, AsyncMessageInfo, AsyncMessageSerializer,
14-
AsyncMessageUpdate,
13+
AsyncMessage, AsyncMessageDeserializer, AsyncMessageSerializer, AsyncMessageUpdate,
1514
},
1615
async_msg_id::{AsyncMessageId, AsyncMessageIdSerializer},
1716
types::Applicable,
@@ -202,7 +201,8 @@ pub struct AsyncPool {
202201
/// Asynchronous pool configuration
203202
pub config: AsyncPoolConfig,
204203
pub db: ShareableMassaDBController,
205-
pub message_info_cache: BTreeMap<AsyncMessageId, AsyncMessageInfo>,
204+
/// Cache of final async messages.
205+
pub message_cache: BTreeMap<AsyncMessageId, AsyncMessage>,
206206
message_id_serializer: AsyncMessageIdSerializer,
207207
message_serializer: AsyncMessageSerializer,
208208
message_id_deserializer: AsyncMessageIdDeserializer,
@@ -215,7 +215,7 @@ impl AsyncPool {
215215
AsyncPool {
216216
config: config.clone(),
217217
db,
218-
message_info_cache: Default::default(),
218+
message_cache: Default::default(),
219219
message_id_serializer: AsyncMessageIdSerializer::new(),
220220
message_serializer: AsyncMessageSerializer::new(true),
221221
message_id_deserializer: AsyncMessageIdDeserializer::new(config.thread_count),
@@ -229,9 +229,9 @@ impl AsyncPool {
229229
}
230230
}
231231

232-
/// Recomputes the local message_info_cache after bootstrap or loading the state from disk
233-
pub fn recompute_message_info_cache(&mut self) {
234-
self.message_info_cache.clear();
232+
/// Recomputes the local message_cache after bootstrap or loading the state from disk
233+
pub fn recompute_message_cache(&mut self) {
234+
self.message_cache.clear();
235235

236236
let db = self.db.read();
237237

@@ -258,8 +258,8 @@ impl AsyncPool {
258258
.deserialize::<DeserializeError>(&serialized_message_id[ASYNC_POOL_PREFIX.len()..])
259259
.expect(MESSAGE_ID_DESER_ERROR);
260260

261-
if let Some(message) = self.fetch_message(&message_id) {
262-
self.message_info_cache.insert(message_id, message.into());
261+
if let Some(message) = self.load_message(&message_id) {
262+
self.message_cache.insert(message_id, message);
263263
}
264264

265265
// The -1 is to remove the IDENT byte at the end of the key
@@ -277,7 +277,7 @@ impl AsyncPool {
277277
self.db
278278
.write()
279279
.delete_prefix(ASYNC_POOL_PREFIX, STATE_CF, None);
280-
self.recompute_message_info_cache();
280+
self.recompute_message_cache();
281281
}
282282

283283
/// Applies pre-compiled `AsyncPoolChanges` to the pool without checking for overflows.
@@ -290,33 +290,28 @@ impl AsyncPool {
290290
match change {
291291
(id, SetUpdateOrDelete::Set(message)) => {
292292
self.put_entry(id, message.clone(), batch);
293-
self.message_info_cache
294-
.insert(*id, AsyncMessageInfo::from(message.clone()));
293+
self.message_cache.insert(*id, message.clone());
295294
}
296295

297296
(id, SetUpdateOrDelete::Update(message_update)) => {
298297
self.update_entry(id, message_update.clone(), batch);
299298

300-
self.message_info_cache
301-
.entry(*id)
302-
.and_modify(|message_info| {
303-
message_info.apply(message_update.clone());
304-
});
299+
self.message_cache.entry(*id).and_modify(|message_info| {
300+
message_info.apply(message_update.clone());
301+
});
305302
}
306303

307304
(id, SetUpdateOrDelete::Delete) => {
308305
self.delete_entry(id, batch);
309-
self.message_info_cache.remove(id);
306+
self.message_cache.remove(id);
310307
}
311308
}
312309
}
313310
}
314311

315-
/// Query a message from the database.
316-
///
317-
/// This should only be called when we know we want to execute the message.
318-
/// Otherwise, we should use the `message_info_cache`.
319-
pub fn fetch_message(&self, message_id: &AsyncMessageId) -> Option<AsyncMessage> {
312+
/// Query a message from the database and deserialize it.
313+
/// This is heavy. Use the cached version whenever possible.
314+
fn load_message(&self, message_id: &AsyncMessageId) -> Option<AsyncMessage> {
320315
let db = self.db.read();
321316

322317
let mut serialized_message_id = Vec::new();
@@ -344,19 +339,15 @@ impl AsyncPool {
344339
}
345340
}
346341

347-
/// Query a vec of messages from the database.
348-
///
349-
/// This should only be called when we know we want to execute the messages.
350-
/// Otherwise, we should use the `message_info_cache`.
342+
/// Query a vec of messages from cache.
351343
pub fn fetch_messages(
352344
&self,
353345
message_ids: &[AsyncMessageId],
354346
) -> Vec<(AsyncMessageId, Option<AsyncMessage>)> {
355347
let mut fetched_messages = Vec::with_capacity(message_ids.len());
356348

357349
for message_id in message_ids {
358-
let message = self.fetch_message(message_id);
359-
fetched_messages.push((*message_id, message));
350+
fetched_messages.push((*message_id, self.message_cache.get(message_id).cloned()));
360351
}
361352

362353
fetched_messages
@@ -1162,7 +1153,7 @@ mod tests {
11621153
let db: ShareableMassaDBController = Arc::new(RwLock::new(
11631154
Box::new(MassaDB::new(db_config)) as Box<(dyn MassaDBController + 'static)>,
11641155
));
1165-
let pool = AsyncPool::new(config, db);
1156+
let mut pool = AsyncPool::new(config, db);
11661157

11671158
let mut serialized = Vec::new();
11681159
let serializer = AsyncPoolSerializer::new();
@@ -1190,7 +1181,7 @@ mod tests {
11901181
pool.db
11911182
.write()
11921183
.write_batch(batch, versioning_batch, Some(slot_1));
1193-
1184+
pool.recompute_message_cache();
11941185
let message_ids = vec![message_id, message2_id];
11951186
let to_ser_ = pool.fetch_messages(&message_ids);
11961187
let to_ser = to_ser_
@@ -1235,7 +1226,7 @@ mod tests {
12351226
let db: ShareableMassaDBController = Arc::new(RwLock::new(
12361227
Box::new(MassaDB::new(db_config)) as Box<(dyn MassaDBController + 'static)>,
12371228
));
1238-
let pool = AsyncPool::new(config, db);
1229+
let mut pool = AsyncPool::new(config, db);
12391230

12401231
let mut serialized = Vec::new();
12411232
let serializer = AsyncPoolSerializer::new();
@@ -1261,6 +1252,7 @@ mod tests {
12611252
pool.db
12621253
.write()
12631254
.write_batch(batch, versioning_batch, Some(slot_1));
1255+
pool.recompute_message_cache();
12641256

12651257
let message_ids = vec![message_id, message2_id];
12661258
let to_ser_ = pool.fetch_messages(&message_ids);
@@ -1303,7 +1295,7 @@ mod tests {
13031295
let db: ShareableMassaDBController = Arc::new(RwLock::new(
13041296
Box::new(MassaDB::new(db_config)) as Box<(dyn MassaDBController + 'static)>,
13051297
));
1306-
let pool = AsyncPool::new(config, db);
1298+
let mut pool = AsyncPool::new(config, db);
13071299

13081300
let message = create_message();
13091301
let message_id = message.compute_id();
@@ -1320,6 +1312,7 @@ mod tests {
13201312
pool.db
13211313
.write()
13221314
.write_batch(batch, versioning_batch, Some(slot_1));
1315+
pool.recompute_message_cache();
13231316

13241317
let content = dump_column(pool.db.clone(), "state");
13251318
assert_eq!(content.len(), 26); // 2 entries added, split in 13 prefix
@@ -1337,6 +1330,7 @@ mod tests {
13371330
pool.db
13381331
.write()
13391332
.write_batch(batch2, versioning_batch2, Some(slot_2));
1333+
pool.recompute_message_cache();
13401334

13411335
let content = dump_column(pool.db.clone(), "state");
13421336
assert_eq!(content.len(), 13);
@@ -1363,7 +1357,7 @@ mod tests {
13631357
));
13641358
let mut pool = AsyncPool::new(config, db);
13651359

1366-
assert!(pool.message_info_cache.is_empty());
1360+
assert!(pool.message_cache.is_empty());
13671361

13681362
let message = create_message();
13691363
let message_id = message.compute_id();
@@ -1386,10 +1380,10 @@ mod tests {
13861380

13871381
let mut batch = DBBatch::new();
13881382
pool.apply_changes_to_batch(&changes, &mut batch);
1389-
assert_eq!(pool.message_info_cache.len() as u64, EXPECT_CACHE_COUNT + 1);
1383+
assert_eq!(pool.message_cache.len() as u64, EXPECT_CACHE_COUNT + 1);
13901384

13911385
pool.reset();
1392-
assert!(pool.message_info_cache.is_empty());
1386+
assert!(pool.message_cache.is_empty());
13931387
}
13941388

13951389
#[test]
@@ -1415,7 +1409,7 @@ mod tests {
14151409
as Box<(dyn MassaDBController + 'static)>));
14161410
let mut pool = AsyncPool::new(config.clone(), db);
14171411

1418-
assert!(pool.message_info_cache.is_empty());
1412+
assert!(pool.message_cache.is_empty());
14191413

14201414
let message = create_message();
14211415
let message_id = message.compute_id();
@@ -1438,9 +1432,9 @@ mod tests {
14381432

14391433
let mut batch = DBBatch::new();
14401434
pool.apply_changes_to_batch(&changes, &mut batch);
1441-
assert_eq!(pool.message_info_cache.len() as u64, EXPECT_CACHE_COUNT + 1);
1435+
assert_eq!(pool.message_cache.len() as u64, EXPECT_CACHE_COUNT + 1);
14421436

1443-
let message_info_cache1 = pool.message_info_cache.clone();
1437+
let message_cache1 = pool.message_cache.clone();
14441438

14451439
let versioning_batch = DBBatch::new();
14461440
let slot_1 = Slot::new(1, 0);
@@ -1455,8 +1449,8 @@ mod tests {
14551449
));
14561450
let mut pool2 = AsyncPool::new(config, db2);
14571451

1458-
pool2.recompute_message_info_cache();
1452+
pool2.recompute_message_cache();
14591453

1460-
assert_eq!(pool2.message_info_cache, message_info_cache1);
1454+
assert_eq!(pool2.message_cache, message_cache1);
14611455
}
14621456
}

massa-execution-worker/src/context.rs

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ use massa_final_state::{FinalStateController, StateChanges};
3232
use massa_hash::Hash;
3333
use massa_ledger_exports::LedgerChanges;
3434
use massa_models::address::ExecutionAddressCycleInfo;
35-
use massa_models::async_msg::{AsyncMessage, AsyncMessageInfo};
35+
use massa_models::async_msg::AsyncMessage;
3636
use massa_models::async_msg_id::AsyncMessageId;
3737
use massa_models::block_id::BlockIdSerializer;
3838
use massa_models::bytecode::Bytecode;
@@ -76,7 +76,7 @@ pub struct ExecutionContextSnapshot {
7676
pub deferred_calls_changes: DeferredCallRegistryChanges,
7777

7878
/// the associated message infos for the speculative async pool
79-
pub message_infos: BTreeMap<AsyncMessageId, AsyncMessageInfo>,
79+
pub message_infos: BTreeMap<AsyncMessageId, AsyncMessage>,
8080

8181
/// speculative list of operations executed
8282
pub executed_ops: ExecutedOpsChanges,
@@ -1050,11 +1050,9 @@ impl ExecutionContext {
10501050
let deferred_credits_transfers = self.execute_deferred_credits(&slot);
10511051

10521052
// settle emitted async messages and reimburse the senders of deleted messages
1053-
let deleted_messages = self.speculative_async_pool.settle_slot(
1054-
&slot,
1055-
&self.speculative_ledger.added_changes,
1056-
true,
1057-
);
1053+
let deleted_messages = self
1054+
.speculative_async_pool
1055+
.settle_slot(&slot, &self.speculative_ledger.added_changes);
10581056

10591057
let mut cancel_async_message_transfers = vec![];
10601058
for (_msg_id, msg) in deleted_messages {

massa-execution-worker/src/execution.rs

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -327,11 +327,7 @@ impl ExecutionState {
327327
.inc_sc_messages_final_by(exec_out_2.state_changes.async_pool_changes.0.len());
328328

329329
self.massa_metrics.set_async_message_pool_size(
330-
self.final_state
331-
.read()
332-
.get_async_pool()
333-
.message_info_cache
334-
.len(),
330+
self.final_state.read().get_async_pool().message_cache.len(),
335331
);
336332

337333
self.massa_metrics.inc_executed_final_slot();

0 commit comments

Comments
 (0)