Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 11 additions & 3 deletions examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -112,21 +112,28 @@ path = "src/grpc/grpc-web/client.rs"
[[bin]]
name = "example-http-server"
path = "src/http/example-http-server.rs"

[[bin]]
name = "example-http-client"
path = "src/http/example-http-client.rs"

[[bin]]
name = "http-tls-server"
path = "src/http/http-tls-server.rs"
required-features = ["__tls"]

[[bin]]
name = "http-tls-client"
path = "src/http/http-tls-client.rs"
required-features = ["__tls"]

# shmipc
[[bin]]
name = "shmipc-thrift-server"
path = "src/thrift/shmipc/server.rs"
required-features = ["shmipc"]
[[bin]]
name = "shmipc-thrift-client"
path = "src/thrift/shmipc/client.rs"
required-features = ["shmipc"]

[dependencies]
anyhow.workspace = true
async-stream.workspace = true
Expand Down Expand Up @@ -166,6 +173,7 @@ volo-gen = { path = "./volo-gen" }

[features]
tls = ["rustls"]
shmipc = ["volo/shmipc", "volo-thrift/shmipc"]

__tls = []
rustls = ["__tls", "volo/rustls", "volo-grpc/rustls", "volo-http/rustls"]
Expand Down
56 changes: 56 additions & 0 deletions examples/src/thrift/shmipc/client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
use std::sync::LazyLock;

use volo_thrift::client::CallOpt;

static CLIENT: LazyLock<volo_gen::thrift_gen::hello::HelloServiceClient> = LazyLock::new(|| {
let uds_path = std::os::unix::net::SocketAddr::from_pathname("/tmp/hello_test.sock").unwrap();
volo_gen::thrift_gen::hello::HelloServiceClientBuilder::new("hello")
.address(volo::net::ShmipcAddr(uds_path))
.build()
});

#[volo::main]
async fn main() {
let subscriber = tracing_subscriber::FmtSubscriber::builder()
.with_max_level(tracing::Level::WARN)
.finish();
tracing::subscriber::set_global_default(subscriber).expect("setting default subscriber failed");

let config = volo::net::shmipc::config::Config {
share_memory_path_prefix: "/dev/shm/client.ipc.shm".to_string(),
mem_map_type: volo::net::shmipc::config::MemMapType::MemMapTypeMemFd,
..Default::default()
};
volo::net::shmipc::config::DEFAULT_SHMIPC_CONFIG.store(config.into());

let desc = volo_gen::thrift_gen::hello::HelloRequest::get_descriptor()
.unwrap()
.type_descriptor();
println!("{desc:?}");

loop {
let fm = pilota_thrift_fieldmask::FieldMaskBuilder::new(&desc, &["$.hello"])
.with_options(pilota_thrift_fieldmask::Options::new().with_black_list_mode(true))
.build()
.unwrap();
println!("{fm:?}");
let mut req = volo_gen::thrift_gen::hello::HelloRequest {
name: "volo".into(),
hello: Some("world".into()),
_field_mask: None,
};
req.set_field_mask(fm);

println!("req with field mask: {req:?}");
let resp = CLIENT
.clone()
.with_callopt(CallOpt::default())
.hello(req)
.await;
match resp {
Ok(info) => println!("{info:?}"),
Err(e) => eprintln!("{e:?}"),
}
// tokio::time::sleep(std::time::Duration::from_secs(1)).await;
}
}
31 changes: 31 additions & 0 deletions examples/src/thrift/shmipc/server.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
pub struct S;

impl volo_gen::thrift_gen::hello::HelloService for S {
async fn hello(
&self,
req: volo_gen::thrift_gen::hello::HelloRequest,
) -> Result<volo_gen::thrift_gen::hello::HelloResponse, volo_thrift::ServerError> {
println!("req: {req:?}");
let resp = volo_gen::thrift_gen::hello::HelloResponse {
message: format!("Hello, {}!", req.name).into(),
_field_mask: None,
};
Ok(resp)
}
}

#[volo::main]
async fn main() {
let subscriber = tracing_subscriber::FmtSubscriber::builder()
.with_max_level(tracing::Level::WARN)
.finish();
tracing::subscriber::set_global_default(subscriber).expect("setting default subscriber failed");

let addr = std::os::unix::net::SocketAddr::from_pathname("/tmp/hello_test.sock").unwrap();
let addr = volo::net::Address::from(volo::net::ShmipcAddr(addr));

volo_gen::thrift_gen::hello::HelloServiceServer::new(S)
.run(addr)
.await
.unwrap();
}
4 changes: 2 additions & 2 deletions volo-grpc/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "volo-grpc"
version = "0.12.1"
version = "0.12.2"
edition.workspace = true
homepage.workspace = true
repository.workspace = true
Expand All @@ -24,7 +24,7 @@ maintenance = { status = "actively-developed" }

[dependencies]
pilota.workspace = true
volo = { version = "0.12", path = "../volo" }
volo = { version = "0.12.2", path = "../volo" }
motore = { workspace = true, features = ["tower"] }
metainfo.workspace = true
async-broadcast.workspace = true
Expand Down
4 changes: 2 additions & 2 deletions volo-http/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "volo-http"
version = "0.5.2"
version = "0.5.3"
edition.workspace = true
homepage.workspace = true
repository.workspace = true
Expand All @@ -19,7 +19,7 @@ keywords = ["async", "rpc", "http"]
maintenance = { status = "actively-developed" }

[dependencies]
volo = { version = "0.12", path = "../volo" }
volo = { version = "0.12.2", path = "../volo" }

ahash.workspace = true
bytes.workspace = true
Expand Down
6 changes: 4 additions & 2 deletions volo-thrift/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "volo-thrift"
version = "0.12.0"
version = "0.12.1"
edition.workspace = true
homepage.workspace = true
repository.workspace = true
Expand All @@ -19,7 +19,7 @@ keywords = ["async", "rpc", "thrift"]
maintenance = { status = "actively-developed" }

[dependencies]
volo = { version = "0.12", path = "../volo" }
volo = { version = "0.12.2", path = "../volo" }
pilota.workspace = true
motore.workspace = true
metainfo.workspace = true
Expand Down Expand Up @@ -58,3 +58,5 @@ multiplex = []
unsafe-codec = []
# This will use unwrap_unchecked instead of unwrap in some places.
unsafe_unchecked = ["volo/unsafe_unchecked"]

shmipc = ["volo/shmipc"]
26 changes: 18 additions & 8 deletions volo-thrift/src/codec/default/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ use bytes::Bytes;
use linkedbytes::LinkedBytes;
use pilota::thrift::ThriftException;
use tokio::io::{AsyncBufReadExt, AsyncRead, AsyncWrite, AsyncWriteExt, Interest};
use tracing::{trace, warn};
use volo::{net::ext::AsyncExt, util::buf_reader::BufReader};

use self::{framed::MakeFramedCodec, thrift::MakeThriftCodec, ttheader::MakeTTHeaderCodec};
Expand Down Expand Up @@ -131,9 +130,10 @@ impl<E: ZeroCopyEncoder, W: AsyncWrite + AsyncExt + Unpin + Send + Sync + 'stati

// first, we need to get the size of the message
let (real_size, malloc_size) = self.encoder.size(cx, &msg)?;
trace!(
tracing::trace!(
"[VOLO] codec encode message real size: {}, malloc size: {}",
real_size, malloc_size
real_size,
malloc_size
);
cx.stats_mut().set_write_size(real_size);

Expand Down Expand Up @@ -176,7 +176,7 @@ impl<E: ZeroCopyEncoder, W: AsyncWrite + AsyncExt + Unpin + Send + Sync + 'stati
malloc_size
);
e.append_msg(&msg);
warn!("[VOLO] thrift codec encode message error: {}", e);
tracing::warn!("[VOLO] thrift codec encode message error: {}", e);
Err(e)
}
}
Expand All @@ -191,11 +191,16 @@ impl<E: ZeroCopyEncoder, W: AsyncWrite + AsyncExt + Unpin + Send + Sync + 'stati
{
Ok(ready) => ready.is_read_closed() || ready.is_write_closed(),
Err(e) => {
warn!("[VOLO] thrift codec write half ready error: {}", e);
tracing::debug!("[VOLO] thrift codec write half ready error: {}", e);
true
}
}
}

#[cfg(feature = "shmipc")]
fn shmipc_helper(&self) -> volo::net::shmipc::ShmipcHelper {
self.writer.shmipc_helper()
}
}

pub struct DefaultDecoder<D, R> {
Expand All @@ -213,7 +218,7 @@ impl<D: ZeroCopyDecoder, R: AsyncRead + AsyncExt + Unpin + Send + Sync + 'static
) -> Result<Option<ThriftMessage<Msg>>, ThriftException> {
// just to check if we have reached EOF
if self.reader.fill_buf().await?.is_empty() {
trace!(
tracing::trace!(
"[VOLO] thrift codec decode message EOF, rpcinfo: {:?}",
cx.rpc_info()
);
Expand All @@ -224,7 +229,7 @@ impl<D: ZeroCopyDecoder, R: AsyncRead + AsyncExt + Unpin + Send + Sync + 'static
cx.stats_mut().record_decode_start_at();
cx.stats_mut().record_read_start_at();

trace!(
tracing::trace!(
"[VOLO] codec decode message received: {:?}",
self.reader.buffer()
);
Expand All @@ -234,10 +239,15 @@ impl<D: ZeroCopyDecoder, R: AsyncRead + AsyncExt + Unpin + Send + Sync + 'static

let end = std::time::Instant::now();
cx.stats_mut().record_decode_end_at();
trace!("[VOLO] thrift codec decode message cost: {:?}", end - start);
tracing::trace!("[VOLO] thrift codec decode message cost: {:?}", end - start);

res
}

#[cfg(feature = "shmipc")]
fn shmipc_helper(&self) -> volo::net::shmipc::ShmipcHelper {
self.reader.shmipc_helper()
}
}

/// `MkZC` is a shorthand for [`MakeZeroCopyCodec`].
Expand Down
10 changes: 10 additions & 0 deletions volo-thrift/src/codec/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,11 @@ pub trait Decoder: Send + Sync + 'static {
fn is_closed(&self) -> impl Future<Output = bool> + Send {
async { false }
}

#[cfg(feature = "shmipc")]
fn shmipc_helper(&self) -> ::volo::net::shmipc::ShmipcHelper {
::volo::net::shmipc::ShmipcHelper::none()
}
}

/// [`Encoder`] writes a [`ThriftMessage`] to an [`AsyncWrite`] and flushes the data.
Expand All @@ -38,6 +43,11 @@ pub trait Encoder: Send + Sync + 'static {
fn is_closed(&self) -> impl Future<Output = bool> + Send {
async { false }
}

#[cfg(feature = "shmipc")]
fn shmipc_helper(&self) -> ::volo::net::shmipc::ShmipcHelper {
::volo::net::shmipc::ShmipcHelper::none()
}
}

/// [`MakeCodec`] receives an [`AsyncRead`] and an [`AsyncWrite`] and returns a
Expand Down
9 changes: 9 additions & 0 deletions volo-thrift/src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,12 @@ impl<S, L, Req, MkC, SP> Server<S, L, Req, MkC, SP> {

#[cfg(feature = "multiplex")]
if self.multiplex {
#[cfg(feature = "shmipc")]
if peer_addr.as_ref().is_some_and(Address::is_shmipc) {
tracing::error!("multiplex is not supported when using shmipc");
let _ = rh.shmipc_helper().close().await;
continue;
}
tokio::spawn(handle_conn_multiplex(
rh,
wh,
Expand Down Expand Up @@ -417,6 +423,9 @@ async fn handle_conn<R, W, Req, Svc, Resp, MkC, SP>(

let (encoder, decoder) = make_codec.make_codec(rh, wh);

#[cfg(feature = "shmipc")]
let _guard = crate::codec::Decoder::shmipc_helper(&decoder).close_guard();

tracing::trace!(
"[VOLO] handle conn by ping-pong, peer_addr: {:?}",
peer_addr
Expand Down
7 changes: 7 additions & 0 deletions volo-thrift/src/transport/multiplex/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,13 @@ where
type Error = io::Error;

async fn call(&self, target: Address) -> Result<Self::Response, Self::Error> {
#[cfg(feature = "shmipc")]
if target.is_shmipc() {
return Err(io::Error::new(
io::ErrorKind::Unsupported,
"shmipc does not support multiplex",
));
}
let make_transport = self.make_transport.clone();
let (rh, wh) = make_transport.make_transport(target.clone()).await?;
Ok(ThriftTransport::new(
Expand Down
Loading
Loading