Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 34 additions & 33 deletions crates/core/src/host/host_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ use crate::db::persistence::PersistenceProvider;
use crate::db::relational_db::{self, spawn_view_cleanup_loop, DiskSizeFn, RelationalDB, Txdata};
use crate::db::{self, spawn_tx_metrics_recorder};
use crate::energy::{EnergyMonitor, EnergyQuanta, NullEnergyMonitor};
use crate::host::module_host::ModuleRuntime as _;
use crate::host::v8::V8Runtime;
use crate::host::ProcedureCallError;
use crate::messages::control_db::{Database, HostType};
Expand All @@ -18,7 +17,7 @@ use crate::subscription::module_subscription_actor::ModuleSubscriptions;
use crate::subscription::module_subscription_manager::{spawn_send_worker, SubscriptionManager, TransactionOffset};
use crate::subscription::row_list_builder_pool::BsatnRowListBuilderPool;
use crate::util::asyncify;
use crate::util::jobs::{JobCores, SingleCoreExecutor};
use crate::util::jobs::{AllocatedJobCore, JobCores};
use crate::worker_metrics::WORKER_METRICS;
use anyhow::{anyhow, bail, Context};
use async_trait::async_trait;
Expand Down Expand Up @@ -702,39 +701,41 @@ async fn make_module_host(
program: Program,
energy_monitor: Arc<dyn EnergyMonitor>,
unregister: impl Fn() + Send + Sync + 'static,
executor: SingleCoreExecutor,
core: AllocatedJobCore,
) -> anyhow::Result<(Program, ModuleHost)> {
// `make_actor` is blocking, as it needs to compile the wasm to native code,
// which may be computationally expensive - sometimes up to 1s for a large module.
// TODO: change back to using `spawn_rayon` here - asyncify runs on tokio blocking
// threads, but those aren't for computation. Also, wasmtime uses rayon
// to run compilation in parallel, so it'll need to run stuff in rayon anyway.
asyncify(move || {
let database_identity = replica_ctx.database_identity;
let database_identity = replica_ctx.database_identity;

let mcc = ModuleCreationContext {
replica_ctx,
scheduler,
program: &program,
energy_monitor,
};
let mcc = ModuleCreationContext {
replica_ctx,
scheduler,
program_hash: program.hash,
energy_monitor,
};

let start = Instant::now();
let module_host = match host_type {
HostType::Wasm => {
let (actor, init_inst) = runtimes.wasmtime.make_actor(mcc)?;
match host_type {
HostType::Wasm => {
asyncify(move || {
let start = Instant::now();
let module = runtimes.wasmtime.make_actor(mcc, &program.bytes, core)?;
trace!("wasmtime::make_actor blocked for {:?}", start.elapsed());
ModuleHost::new(actor, init_inst, unregister, executor, database_identity)
}
HostType::Js => {
let (actor, init_inst) = runtimes.v8.make_actor(mcc)?;
trace!("v8::make_actor blocked for {:?}", start.elapsed());
ModuleHost::new(actor, init_inst, unregister, executor, database_identity)
}
};
Ok((program, module_host))
})
.await
let module_host = ModuleHost::new(module, unregister, database_identity);
Ok((program, module_host))
})
.await
}
HostType::Js => {
let start = Instant::now();
let module = runtimes.v8.make_actor(mcc, &program.bytes, core).await?;
trace!("v8::make_actor blocked for {:?}", start.elapsed());
let module_host = ModuleHost::new(module, unregister, database_identity);
Ok((program, module_host))
}
}
}

async fn load_program(storage: &ProgramStorage, hash: Hash) -> anyhow::Result<Program> {
Expand Down Expand Up @@ -762,7 +763,7 @@ async fn launch_module(
energy_monitor: Arc<dyn EnergyMonitor>,
module_logs: Option<ModuleLogsDir>,
runtimes: Arc<HostRuntimes>,
executor: SingleCoreExecutor,
core: AllocatedJobCore,
bsatn_rlb_pool: BsatnRowListBuilderPool,
) -> anyhow::Result<(Program, LaunchedModule)> {
let db_identity = database.database_identity;
Expand All @@ -780,7 +781,7 @@ async fn launch_module(
program,
energy_monitor.clone(),
on_panic,
executor,
core,
)
.await?;

Expand Down Expand Up @@ -1018,7 +1019,7 @@ impl Host {
page_pool: PagePool,
database: Database,
program: Program,
executor: SingleCoreExecutor,
core: AllocatedJobCore,
bsatn_rlb_pool: BsatnRowListBuilderPool,
) -> anyhow::Result<Arc<ModuleInfo>> {
let (db, _connected_clients) = RelationalDB::open(
Expand All @@ -1042,7 +1043,7 @@ impl Host {
Arc::new(NullEnergyMonitor),
None,
runtimes.clone(),
executor,
core,
bsatn_rlb_pool,
)
.await?;
Expand Down Expand Up @@ -1076,7 +1077,7 @@ impl Host {
policy: MigrationPolicy,
energy_monitor: Arc<dyn EnergyMonitor>,
on_panic: impl Fn() + Send + Sync + 'static,
executor: SingleCoreExecutor,
core: AllocatedJobCore,
) -> anyhow::Result<UpdateDatabaseResult> {
let replica_ctx = &self.replica_ctx;
let (scheduler, scheduler_starter) = Scheduler::open(self.replica_ctx.relational_db.clone());
Expand All @@ -1089,7 +1090,7 @@ impl Host {
program,
energy_monitor,
on_panic,
executor,
core,
)
.await?;

Expand Down Expand Up @@ -1253,7 +1254,7 @@ pub(crate) async fn extract_schema_with_pools(
initial_program: program.hash,
};

let core = SingleCoreExecutor::in_current_tokio_runtime();
let core = AllocatedJobCore::default();
let module_info =
Host::try_init_in_memory_to_check(runtimes, page_pool, database, program, core, bsatn_rlb_pool).await?;
// this should always succeed, but sometimes it doesn't
Expand Down
7 changes: 5 additions & 2 deletions crates/core/src/host/instance_env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use crate::subscription::module_subscription_manager::{from_tx_offset, Transacti
use crate::util::prometheus_handle::IntGaugeExt;
use chrono::{DateTime, Utc};
use core::mem;
use futures::TryFutureExt;
use parking_lot::{Mutex, MutexGuard};
use smallvec::SmallVec;
use spacetimedb_client_api_messages::energy::EnergyQuanta;
Expand Down Expand Up @@ -865,7 +866,8 @@ impl InstanceEnv {
// TODO(perf): Stash a long-lived `Client` in the env somewhere, rather than building a new one for each call.
let execute_fut = reqwest::Client::new().execute(reqwest);

let response_fut = async {
// Run the future that does IO work on a tokio worker thread, where it's more efficent.
let response_fut = tokio::spawn(async {
// `reqwest::Error` may contain sensitive info, namely the full URL with query params.
// We'll strip those with `strip_query_params_from_eqwest_error`
// after `await`ing `response_fut` below.
Expand All @@ -880,7 +882,8 @@ impl InstanceEnv {
let body = http_body_util::BodyExt::collect(body).await?.to_bytes();

Ok((response, body))
};
})
.unwrap_or_else(|e| std::panic::resume_unwind(e.into_panic()));

let database_identity = *self.database_identity();

Expand Down
4 changes: 2 additions & 2 deletions crates/core/src/host/module_common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
use crate::{
energy::EnergyMonitor,
host::{module_host::ModuleInfo, wasm_common::module_host_actor::DescribeError, Scheduler},
module_host_context::ModuleCreationContextLimited,
module_host_context::ModuleCreationContext,
replica_context::ReplicaContext,
};
use spacetimedb_lib::{Identity, RawModuleDef};
Expand All @@ -13,7 +13,7 @@ use std::sync::Arc;

/// Builds a [`ModuleCommon`] from a [`RawModuleDef`].
pub fn build_common_module_from_raw(
mcc: ModuleCreationContextLimited,
mcc: ModuleCreationContext,
raw_def: RawModuleDef,
) -> Result<ModuleCommon, ValidationErrors> {
// Perform a bunch of validation on the raw definition.
Expand Down
Loading
Loading