From 2578b05daae6c56e38244b4658c25b3e0c0fc932 Mon Sep 17 00:00:00 2001 From: maowenjie <3252896864@qq.com> Date: Thu, 25 Dec 2025 22:02:22 +0800 Subject: [PATCH 1/6] bind_client_ip --- src/brpc/channel.cpp | 23 +++++++++++++++++++++- src/brpc/channel.h | 4 ++++ src/brpc/details/naming_service_thread.cpp | 3 ++- src/brpc/details/naming_service_thread.h | 1 + src/brpc/socket.cpp | 11 +++++++++-- src/brpc/socket.h | 1 + src/brpc/socket_map.cpp | 9 ++++++--- src/brpc/socket_map.h | 18 +++++++++++------ 8 files changed, 57 insertions(+), 13 deletions(-) diff --git a/src/brpc/channel.cpp b/src/brpc/channel.cpp index 0252e97d74..b57a6f95b0 100644 --- a/src/brpc/channel.cpp +++ b/src/brpc/channel.cpp @@ -77,6 +77,7 @@ ChannelSSLOptions* ChannelOptions::mutable_ssl_options() { static ChannelSignature ComputeChannelSignature(const ChannelOptions& opt) { if (opt.auth == NULL && !opt.has_ssl_options() && + opt.client_host.empty() && opt.connection_group.empty() && opt.hc_option.health_check_path.empty()) { // Returning zeroized result by default is more intuitive for users. @@ -94,6 +95,10 @@ static ChannelSignature ComputeChannelSignature(const ChannelOptions& opt) { buf.append("|conng="); buf.append(opt.connection_group); } + if (!opt.client_host.empty()) { + buf.append("|clih="); + buf.append(opt.client_host); + } if (opt.auth) { buf.append("|auth="); buf.append((char*)&opt.auth, sizeof(opt.auth)); @@ -362,6 +367,13 @@ int Channel::InitSingle(const butil::EndPoint& server_addr_and_port, LOG(ERROR) << "Invalid port=" << port; return -1; } + butil::EndPoint client_end_point; + if (!_options.client_host.empty() && + butil::str2ip(_options.client_host.c_str(), &client_end_point.ip) != 0 && + butil::hostname2ip(_options.client_host.c_str(), &client_end_point.ip) != 0) { + LOG(ERROR) << "Invalid client host=`" << _options.client_host << '\''; + return -1; + } _server_address = server_addr_and_port; const ChannelSignature sig = ComputeChannelSignature(_options); std::shared_ptr ssl_ctx; @@ -369,7 +381,8 @@ int Channel::InitSingle(const butil::EndPoint& server_addr_and_port, return -1; } if (SocketMapInsert(SocketMapKey(server_addr_and_port, sig), - &_server_id, ssl_ctx, _options.use_rdma, _options.hc_option) != 0) { + &_server_id, ssl_ctx, _options.use_rdma, + _options.hc_option, client_end_point) != 0) { LOG(ERROR) << "Fail to insert into SocketMap"; return -1; } @@ -397,6 +410,13 @@ int Channel::Init(const char* ns_url, _options.mutable_ssl_options()->sni_name = _service_name; } } + butil::EndPoint client_end_point; + if (!_options.client_host.empty() && + butil::str2ip(_options.client_host.c_str(), &client_end_point.ip) != 0 && + butil::hostname2ip(_options.client_host.c_str(), &client_end_point.ip) != 0) { + LOG(ERROR) << "Invalid client host=`" << _options.client_host << '\''; + return -1; + } std::unique_ptr lb(new (std::nothrow) LoadBalancerWithNaming); if (NULL == lb) { @@ -409,6 +429,7 @@ int Channel::Init(const char* ns_url, ns_opt.use_rdma = _options.use_rdma; ns_opt.channel_signature = ComputeChannelSignature(_options); ns_opt.hc_option = _options.hc_option; + ns_opt.client_end_point = client_end_point; if (CreateSocketSSLContext(_options, &ns_opt.ssl_ctx) != 0) { return -1; } diff --git a/src/brpc/channel.h b/src/brpc/channel.h index c970209b3a..e956c9f80d 100644 --- a/src/brpc/channel.h +++ b/src/brpc/channel.h @@ -148,6 +148,10 @@ struct ChannelOptions { // Its priority is higher than FLAGS_health_check_path and FLAGS_health_check_timeout_ms. // When it is not set, FLAGS_health_check_path and FLAGS_health_check_timeout_ms will take effect. HealthCheckOption hc_option; + + // IP address or host name of the client + // Default: "" + std::string client_host; private: // SSLOptions is large and not often used, allocate it on heap to // prevent ChannelOptions from being bloated in most cases. diff --git a/src/brpc/details/naming_service_thread.cpp b/src/brpc/details/naming_service_thread.cpp index 341ca35b09..127cd55678 100644 --- a/src/brpc/details/naming_service_thread.cpp +++ b/src/brpc/details/naming_service_thread.cpp @@ -126,7 +126,8 @@ void NamingServiceThread::Actions::ResetServers( // to pick those Sockets with the right settings during OnAddedServers const SocketMapKey key(_added[i], _owner->_options.channel_signature); CHECK_EQ(0, SocketMapInsert(key, &tagged_id.id, _owner->_options.ssl_ctx, - _owner->_options.use_rdma, _owner->_options.hc_option)); + _owner->_options.use_rdma, _owner->_options.hc_option, + _owner->_options.client_end_point)); _added_sockets.push_back(tagged_id); } diff --git a/src/brpc/details/naming_service_thread.h b/src/brpc/details/naming_service_thread.h index 1745e5f267..d8c319a277 100644 --- a/src/brpc/details/naming_service_thread.h +++ b/src/brpc/details/naming_service_thread.h @@ -53,6 +53,7 @@ struct GetNamingServiceThreadOptions { HealthCheckOption hc_option; ChannelSignature channel_signature; std::shared_ptr ssl_ctx; + butil::EndPoint client_end_point; }; // A dedicated thread to map a name to ServerIds diff --git a/src/brpc/socket.cpp b/src/brpc/socket.cpp index 9490650b78..341a5e9c24 100644 --- a/src/brpc/socket.cpp +++ b/src/brpc/socket.cpp @@ -728,7 +728,7 @@ int Socket::OnCreated(const SocketOptions& options) { _keytable_pool = options.keytable_pool; _tos = 0; _remote_side = options.remote_side; - _local_side = butil::EndPoint(); + _local_side = options.local_side; _on_edge_triggered_events = options.on_edge_triggered_events; _user = options.user; _conn = options.conn; @@ -1283,8 +1283,10 @@ int Socket::Connect(const timespec* abstime, _ssl_state = SSL_OFF; } struct sockaddr_storage serv_addr; + struct sockaddr_storage cli_addr; socklen_t addr_size = 0; - if (butil::endpoint2sockaddr(remote_side(), &serv_addr, &addr_size) != 0) { + if (butil::endpoint2sockaddr(remote_side(), &serv_addr, &addr_size) != 0 || + butil::endpoint2sockaddr(local_side(), &cli_addr, &addr_size) != 0) { PLOG(ERROR) << "Fail to get sockaddr"; return -1; } @@ -1297,6 +1299,10 @@ int Socket::Connect(const timespec* abstime, // We need to do async connect (to manage the timeout by ourselves). CHECK_EQ(0, butil::make_non_blocking(sockfd)); + if (::bind(sockfd, (struct sockaddr*)& cli_addr, addr_size) != 0) { + LOG(FATAL) << "Fail to bind socket, errno=" << strerror(errno); + return -1; + } const int rc = ::connect( sockfd, (struct sockaddr*)&serv_addr, addr_size); if (rc != 0 && errno != EINPROGRESS) { @@ -2811,6 +2817,7 @@ int Socket::GetPooledSocket(SocketUniquePtr* pooled_socket) { if (socket_pool == NULL) { SocketOptions opt; opt.remote_side = remote_side(); + opt.local_side = local_side(); opt.user = user(); opt.on_edge_triggered_events = _on_edge_triggered_events; opt.initial_ssl_ctx = _ssl_ctx; diff --git a/src/brpc/socket.h b/src/brpc/socket.h index 03ad43f867..9b120d695f 100644 --- a/src/brpc/socket.h +++ b/src/brpc/socket.h @@ -250,6 +250,7 @@ struct SocketOptions { // user->BeforeRecycle() before recycling. int fd{-1}; butil::EndPoint remote_side; + butil::EndPoint local_side; // If `connect_on_create' is true and `fd' is less than 0, // a client connection will be established to remote_side() // regarding deadline `connect_abstime' when Socket is being created. diff --git a/src/brpc/socket_map.cpp b/src/brpc/socket_map.cpp index 14bea71db5..569b00cf2b 100644 --- a/src/brpc/socket_map.cpp +++ b/src/brpc/socket_map.cpp @@ -92,8 +92,9 @@ SocketMap* get_or_new_client_side_socket_map() { int SocketMapInsert(const SocketMapKey& key, SocketId* id, const std::shared_ptr& ssl_ctx, bool use_rdma, - const HealthCheckOption& hc_option) { - return get_or_new_client_side_socket_map()->Insert(key, id, ssl_ctx, use_rdma, hc_option); + const HealthCheckOption& hc_option, + butil::EndPoint& client_end_point) { + return get_or_new_client_side_socket_map()->Insert(key, id, ssl_ctx, use_rdma, hc_option, client_end_point); } int SocketMapFind(const SocketMapKey& key, SocketId* id) { @@ -229,7 +230,8 @@ void SocketMap::ShowSocketMapInBvarIfNeed() { int SocketMap::Insert(const SocketMapKey& key, SocketId* id, const std::shared_ptr& ssl_ctx, bool use_rdma, - const HealthCheckOption& hc_option) { + const HealthCheckOption& hc_option, + butil::EndPoint& client_end_point) { ShowSocketMapInBvarIfNeed(); std::unique_lock mu(_mutex); @@ -251,6 +253,7 @@ int SocketMap::Insert(const SocketMapKey& key, SocketId* id, SocketId tmp_id; SocketOptions opt; opt.remote_side = key.peer.addr; + opt.local_side = client_end_point; opt.initial_ssl_ctx = ssl_ctx; opt.use_rdma = use_rdma; opt.hc_option = hc_option; diff --git a/src/brpc/socket_map.h b/src/brpc/socket_map.h index b0d542e78e..9e5560651e 100644 --- a/src/brpc/socket_map.h +++ b/src/brpc/socket_map.h @@ -82,18 +82,21 @@ struct SocketMapKeyHasher { int SocketMapInsert(const SocketMapKey& key, SocketId* id, const std::shared_ptr& ssl_ctx, bool use_rdma, - const HealthCheckOption& hc_option); + const HealthCheckOption& hc_option, + butil::EndPoint& client_end_point); inline int SocketMapInsert(const SocketMapKey& key, SocketId* id, const std::shared_ptr& ssl_ctx) { HealthCheckOption hc_option; - return SocketMapInsert(key, id, ssl_ctx, false, hc_option); + butil::EndPoint endpoint; + return SocketMapInsert(key, id, ssl_ctx, false, hc_option, endpoint); } inline int SocketMapInsert(const SocketMapKey& key, SocketId* id) { std::shared_ptr empty_ptr; HealthCheckOption hc_option; - return SocketMapInsert(key, id, empty_ptr, false, hc_option); + butil::EndPoint endpoint; + return SocketMapInsert(key, id, empty_ptr, false, hc_option, endpoint); } // Find the SocketId associated with `key'. @@ -155,17 +158,20 @@ class SocketMap { int Insert(const SocketMapKey& key, SocketId* id, const std::shared_ptr& ssl_ctx, bool use_rdma, - const HealthCheckOption& hc_option); + const HealthCheckOption& hc_option, + butil::EndPoint& client_end_point); int Insert(const SocketMapKey& key, SocketId* id, const std::shared_ptr& ssl_ctx) { HealthCheckOption hc_option; - return Insert(key, id, ssl_ctx, false, hc_option); + butil::EndPoint endpoint; + return Insert(key, id, ssl_ctx, false, hc_option, endpoint); } int Insert(const SocketMapKey& key, SocketId* id) { std::shared_ptr empty_ptr; HealthCheckOption hc_option; - return Insert(key, id, empty_ptr, false, hc_option); + butil::EndPoint endpoint; + return Insert(key, id, empty_ptr, false, hc_option, endpoint); } void Remove(const SocketMapKey& key, SocketId expected_id); From 4aac92dbb883531f78bf18ceac36becf28ab1086 Mon Sep 17 00:00:00 2001 From: maowenjie <3252896864@qq.com> Date: Fri, 26 Dec 2025 18:12:40 +0800 Subject: [PATCH 2/6] fix UT & review --- src/brpc/channel.cpp | 16 ++++++++-------- src/brpc/channel.h | 3 ++- src/brpc/details/naming_service_thread.cpp | 2 +- src/brpc/details/naming_service_thread.h | 2 +- src/brpc/socket.cpp | 21 +++++++++++++-------- src/brpc/socket_map.cpp | 8 ++++---- src/brpc/socket_map.h | 4 ++-- 7 files changed, 31 insertions(+), 25 deletions(-) diff --git a/src/brpc/channel.cpp b/src/brpc/channel.cpp index b57a6f95b0..287ac08e0c 100644 --- a/src/brpc/channel.cpp +++ b/src/brpc/channel.cpp @@ -367,10 +367,10 @@ int Channel::InitSingle(const butil::EndPoint& server_addr_and_port, LOG(ERROR) << "Invalid port=" << port; return -1; } - butil::EndPoint client_end_point; + butil::EndPoint client_endpoint; if (!_options.client_host.empty() && - butil::str2ip(_options.client_host.c_str(), &client_end_point.ip) != 0 && - butil::hostname2ip(_options.client_host.c_str(), &client_end_point.ip) != 0) { + butil::str2ip(_options.client_host.c_str(), &client_endpoint.ip) != 0 && + butil::hostname2ip(_options.client_host.c_str(), &client_endpoint.ip) != 0) { LOG(ERROR) << "Invalid client host=`" << _options.client_host << '\''; return -1; } @@ -382,7 +382,7 @@ int Channel::InitSingle(const butil::EndPoint& server_addr_and_port, } if (SocketMapInsert(SocketMapKey(server_addr_and_port, sig), &_server_id, ssl_ctx, _options.use_rdma, - _options.hc_option, client_end_point) != 0) { + _options.hc_option, client_endpoint) != 0) { LOG(ERROR) << "Fail to insert into SocketMap"; return -1; } @@ -410,10 +410,10 @@ int Channel::Init(const char* ns_url, _options.mutable_ssl_options()->sni_name = _service_name; } } - butil::EndPoint client_end_point; + butil::EndPoint client_endpoint; if (!_options.client_host.empty() && - butil::str2ip(_options.client_host.c_str(), &client_end_point.ip) != 0 && - butil::hostname2ip(_options.client_host.c_str(), &client_end_point.ip) != 0) { + butil::str2ip(_options.client_host.c_str(), &client_endpoint.ip) != 0 && + butil::hostname2ip(_options.client_host.c_str(), &client_endpoint.ip) != 0) { LOG(ERROR) << "Invalid client host=`" << _options.client_host << '\''; return -1; } @@ -429,7 +429,7 @@ int Channel::Init(const char* ns_url, ns_opt.use_rdma = _options.use_rdma; ns_opt.channel_signature = ComputeChannelSignature(_options); ns_opt.hc_option = _options.hc_option; - ns_opt.client_end_point = client_end_point; + ns_opt.client_endpoint = client_endpoint; if (CreateSocketSSLContext(_options, &ns_opt.ssl_ctx) != 0) { return -1; } diff --git a/src/brpc/channel.h b/src/brpc/channel.h index e956c9f80d..a3179dc051 100644 --- a/src/brpc/channel.h +++ b/src/brpc/channel.h @@ -149,7 +149,8 @@ struct ChannelOptions { // When it is not set, FLAGS_health_check_path and FLAGS_health_check_timeout_ms will take effect. HealthCheckOption hc_option; - // IP address or host name of the client + // IP address or host name of the client. + // if the client_host is "", the client IP address is determined by the OS. // Default: "" std::string client_host; private: diff --git a/src/brpc/details/naming_service_thread.cpp b/src/brpc/details/naming_service_thread.cpp index 127cd55678..6bc4e76bdf 100644 --- a/src/brpc/details/naming_service_thread.cpp +++ b/src/brpc/details/naming_service_thread.cpp @@ -127,7 +127,7 @@ void NamingServiceThread::Actions::ResetServers( const SocketMapKey key(_added[i], _owner->_options.channel_signature); CHECK_EQ(0, SocketMapInsert(key, &tagged_id.id, _owner->_options.ssl_ctx, _owner->_options.use_rdma, _owner->_options.hc_option, - _owner->_options.client_end_point)); + _owner->_options.client_endpoint)); _added_sockets.push_back(tagged_id); } diff --git a/src/brpc/details/naming_service_thread.h b/src/brpc/details/naming_service_thread.h index d8c319a277..52a4e9449d 100644 --- a/src/brpc/details/naming_service_thread.h +++ b/src/brpc/details/naming_service_thread.h @@ -53,7 +53,7 @@ struct GetNamingServiceThreadOptions { HealthCheckOption hc_option; ChannelSignature channel_signature; std::shared_ptr ssl_ctx; - butil::EndPoint client_end_point; + butil::EndPoint client_endpoint; }; // A dedicated thread to map a name to ServerIds diff --git a/src/brpc/socket.cpp b/src/brpc/socket.cpp index 341a5e9c24..88a2878dca 100644 --- a/src/brpc/socket.cpp +++ b/src/brpc/socket.cpp @@ -1283,10 +1283,8 @@ int Socket::Connect(const timespec* abstime, _ssl_state = SSL_OFF; } struct sockaddr_storage serv_addr; - struct sockaddr_storage cli_addr; socklen_t addr_size = 0; - if (butil::endpoint2sockaddr(remote_side(), &serv_addr, &addr_size) != 0 || - butil::endpoint2sockaddr(local_side(), &cli_addr, &addr_size) != 0) { + if (butil::endpoint2sockaddr(remote_side(), &serv_addr, &addr_size) != 0) { PLOG(ERROR) << "Fail to get sockaddr"; return -1; } @@ -1298,10 +1296,16 @@ int Socket::Connect(const timespec* abstime, CHECK_EQ(0, butil::make_close_on_exec(sockfd)); // We need to do async connect (to manage the timeout by ourselves). CHECK_EQ(0, butil::make_non_blocking(sockfd)); - - if (::bind(sockfd, (struct sockaddr*)& cli_addr, addr_size) != 0) { - LOG(FATAL) << "Fail to bind socket, errno=" << strerror(errno); - return -1; + if (local_side().ip != butil::IP_ANY) { + struct sockaddr_storage cli_addr; + if (butil::endpoint2sockaddr(local_side(), &cli_addr, &addr_size) != 0) { + PLOG(ERROR) << "Fail to get client sockaddr"; + return -1; + } + if (::bind(sockfd, (struct sockaddr*)&cli_addr, addr_size) != 0) { + PLOG(ERROR) << "Fail to bind client socket, errno=" << strerror(errno); + return -1; + } } const int rc = ::connect( sockfd, (struct sockaddr*)&serv_addr, addr_size); @@ -2817,7 +2821,7 @@ int Socket::GetPooledSocket(SocketUniquePtr* pooled_socket) { if (socket_pool == NULL) { SocketOptions opt; opt.remote_side = remote_side(); - opt.local_side = local_side(); + opt.local_side = butil::EndPoint(local_side().ip, 0); opt.user = user(); opt.on_edge_triggered_events = _on_edge_triggered_events; opt.initial_ssl_ctx = _ssl_ctx; @@ -2919,6 +2923,7 @@ int Socket::GetShortSocket(SocketUniquePtr* short_socket) { SocketId id; SocketOptions opt; opt.remote_side = remote_side(); + opt.local_side = butil::EndPoint(local_side().ip, 0); opt.user = user(); opt.on_edge_triggered_events = _on_edge_triggered_events; opt.initial_ssl_ctx = _ssl_ctx; diff --git a/src/brpc/socket_map.cpp b/src/brpc/socket_map.cpp index 569b00cf2b..99959d6246 100644 --- a/src/brpc/socket_map.cpp +++ b/src/brpc/socket_map.cpp @@ -93,8 +93,8 @@ int SocketMapInsert(const SocketMapKey& key, SocketId* id, const std::shared_ptr& ssl_ctx, bool use_rdma, const HealthCheckOption& hc_option, - butil::EndPoint& client_end_point) { - return get_or_new_client_side_socket_map()->Insert(key, id, ssl_ctx, use_rdma, hc_option, client_end_point); + const butil::EndPoint& client_endpoint) { + return get_or_new_client_side_socket_map()->Insert(key, id, ssl_ctx, use_rdma, hc_option, client_endpoint); } int SocketMapFind(const SocketMapKey& key, SocketId* id) { @@ -231,7 +231,7 @@ int SocketMap::Insert(const SocketMapKey& key, SocketId* id, const std::shared_ptr& ssl_ctx, bool use_rdma, const HealthCheckOption& hc_option, - butil::EndPoint& client_end_point) { + const butil::EndPoint& client_endpoint) { ShowSocketMapInBvarIfNeed(); std::unique_lock mu(_mutex); @@ -253,7 +253,7 @@ int SocketMap::Insert(const SocketMapKey& key, SocketId* id, SocketId tmp_id; SocketOptions opt; opt.remote_side = key.peer.addr; - opt.local_side = client_end_point; + opt.local_side = client_endpoint; opt.initial_ssl_ctx = ssl_ctx; opt.use_rdma = use_rdma; opt.hc_option = hc_option; diff --git a/src/brpc/socket_map.h b/src/brpc/socket_map.h index 9e5560651e..5f77fd9755 100644 --- a/src/brpc/socket_map.h +++ b/src/brpc/socket_map.h @@ -83,7 +83,7 @@ int SocketMapInsert(const SocketMapKey& key, SocketId* id, const std::shared_ptr& ssl_ctx, bool use_rdma, const HealthCheckOption& hc_option, - butil::EndPoint& client_end_point); + const butil::EndPoint& client_endpoint); inline int SocketMapInsert(const SocketMapKey& key, SocketId* id, const std::shared_ptr& ssl_ctx) { @@ -159,7 +159,7 @@ class SocketMap { const std::shared_ptr& ssl_ctx, bool use_rdma, const HealthCheckOption& hc_option, - butil::EndPoint& client_end_point); + const butil::EndPoint& client_endpoint); int Insert(const SocketMapKey& key, SocketId* id, const std::shared_ptr& ssl_ctx) { From 1bff8cb62ef39778f8e12c261598765107a8863e Mon Sep 17 00:00:00 2001 From: wenjiecn <3252896864@qq.com> Date: Mon, 29 Dec 2025 20:48:13 +0800 Subject: [PATCH 3/6] add client_host UT --- test/brpc_server_unittest.cpp | 44 +++++++++++++++++++++++++++++++++++ 1 file changed, 44 insertions(+) diff --git a/test/brpc_server_unittest.cpp b/test/brpc_server_unittest.cpp index 4a774fab2a..ede5760045 100644 --- a/test/brpc_server_unittest.cpp +++ b/test/brpc_server_unittest.cpp @@ -2070,4 +2070,48 @@ TEST_F(ServerTest, auth) { ASSERT_EQ(0, server.Join()); } +void TestClientHost(const butil::EndPoint& ep, + brpc::Controller& cntl, + int error_code, bool failed, + brpc::ChannelOptions& copt) { + brpc::Channel chan; + copt.max_retry = 0; + ASSERT_EQ(0, chan.Init(ep, &copt)); + + test::EchoRequest req; + test::EchoResponse res; + req.set_message(EXP_REQUEST); + test::EchoService_Stub stub(&chan); + stub.Echo(&cntl, &req, &res, NULL); + ASSERT_EQ(cntl.Failed(), failed) << cntl.ErrorText(); + ASSERT_EQ(cntl.ErrorCode(), error_code); +} + +TEST_F(ServerTest, client_host) { + butil::EndPoint ep; + ASSERT_EQ(0, str2endpoint("127.0.0.1:8613", &ep)); + brpc::Server server; + EchoServiceImpl service; + ASSERT_EQ(0, server.AddService(&service, brpc::SERVER_DOESNT_OWN_SERVICE)); + brpc::ServerOptions opt; + ASSERT_EQ(0, server.Start(ep, &opt)); + + brpc::Controller cntl; + brpc::ChannelOptions copt; + copt.client_host = "localhost"; + std::vector connection_types = { + brpc::CONNECTION_TYPE_SINGLE, + brpc::CONNECTION_TYPE_POOLED, + brpc::CONNECTION_TYPE_SHORT + }; + for (auto connect_type : connection_types) { + copt.connection_type = connect_type; + TestClientHost(ep, cntl, 0, false, copt); + cntl.Reset(); + } + + ASSERT_EQ(0, server.Stop(0)); + ASSERT_EQ(0, server.Join()); +} + } //namespace From 8650f00b5ae508229e90245f03b3a4d804520e64 Mon Sep 17 00:00:00 2001 From: wenjiecn <3252896864@qq.com> Date: Mon, 5 Jan 2026 12:01:16 +0800 Subject: [PATCH 4/6] updated to support SO_BINDTODEVICE. --- src/brpc/channel.cpp | 26 +++++----------------- src/brpc/channel.h | 6 ++--- src/brpc/details/naming_service_thread.cpp | 2 +- src/brpc/details/naming_service_thread.h | 2 +- src/brpc/socket.cpp | 18 ++++++--------- src/brpc/socket.h | 5 ++++- src/brpc/socket_map.cpp | 8 +++---- src/brpc/socket_map.h | 16 +++++-------- test/brpc_server_unittest.cpp | 4 ++-- 9 files changed, 34 insertions(+), 53 deletions(-) diff --git a/src/brpc/channel.cpp b/src/brpc/channel.cpp index 287ac08e0c..5ebed6b44f 100644 --- a/src/brpc/channel.cpp +++ b/src/brpc/channel.cpp @@ -77,7 +77,7 @@ ChannelSSLOptions* ChannelOptions::mutable_ssl_options() { static ChannelSignature ComputeChannelSignature(const ChannelOptions& opt) { if (opt.auth == NULL && !opt.has_ssl_options() && - opt.client_host.empty() && + opt.device_name.empty() && opt.connection_group.empty() && opt.hc_option.health_check_path.empty()) { // Returning zeroized result by default is more intuitive for users. @@ -95,9 +95,9 @@ static ChannelSignature ComputeChannelSignature(const ChannelOptions& opt) { buf.append("|conng="); buf.append(opt.connection_group); } - if (!opt.client_host.empty()) { - buf.append("|clih="); - buf.append(opt.client_host); + if (!opt.device_name.empty()) { + buf.append("|devn="); + buf.append(opt.device_name); } if (opt.auth) { buf.append("|auth="); @@ -367,13 +367,6 @@ int Channel::InitSingle(const butil::EndPoint& server_addr_and_port, LOG(ERROR) << "Invalid port=" << port; return -1; } - butil::EndPoint client_endpoint; - if (!_options.client_host.empty() && - butil::str2ip(_options.client_host.c_str(), &client_endpoint.ip) != 0 && - butil::hostname2ip(_options.client_host.c_str(), &client_endpoint.ip) != 0) { - LOG(ERROR) << "Invalid client host=`" << _options.client_host << '\''; - return -1; - } _server_address = server_addr_and_port; const ChannelSignature sig = ComputeChannelSignature(_options); std::shared_ptr ssl_ctx; @@ -382,7 +375,7 @@ int Channel::InitSingle(const butil::EndPoint& server_addr_and_port, } if (SocketMapInsert(SocketMapKey(server_addr_and_port, sig), &_server_id, ssl_ctx, _options.use_rdma, - _options.hc_option, client_endpoint) != 0) { + _options.hc_option, _options.device_name) != 0) { LOG(ERROR) << "Fail to insert into SocketMap"; return -1; } @@ -410,13 +403,6 @@ int Channel::Init(const char* ns_url, _options.mutable_ssl_options()->sni_name = _service_name; } } - butil::EndPoint client_endpoint; - if (!_options.client_host.empty() && - butil::str2ip(_options.client_host.c_str(), &client_endpoint.ip) != 0 && - butil::hostname2ip(_options.client_host.c_str(), &client_endpoint.ip) != 0) { - LOG(ERROR) << "Invalid client host=`" << _options.client_host << '\''; - return -1; - } std::unique_ptr lb(new (std::nothrow) LoadBalancerWithNaming); if (NULL == lb) { @@ -429,7 +415,7 @@ int Channel::Init(const char* ns_url, ns_opt.use_rdma = _options.use_rdma; ns_opt.channel_signature = ComputeChannelSignature(_options); ns_opt.hc_option = _options.hc_option; - ns_opt.client_endpoint = client_endpoint; + ns_opt.device_name = _options.device_name; if (CreateSocketSSLContext(_options, &ns_opt.ssl_ctx) != 0) { return -1; } diff --git a/src/brpc/channel.h b/src/brpc/channel.h index a3179dc051..23f5a74d48 100644 --- a/src/brpc/channel.h +++ b/src/brpc/channel.h @@ -149,10 +149,10 @@ struct ChannelOptions { // When it is not set, FLAGS_health_check_path and FLAGS_health_check_timeout_ms will take effect. HealthCheckOption hc_option; - // IP address or host name of the client. - // if the client_host is "", the client IP address is determined by the OS. + // The device name of the client's network adapter. + // if the device_name is "", the flow control is determined by the OS. // Default: "" - std::string client_host; + std::string device_name; private: // SSLOptions is large and not often used, allocate it on heap to // prevent ChannelOptions from being bloated in most cases. diff --git a/src/brpc/details/naming_service_thread.cpp b/src/brpc/details/naming_service_thread.cpp index 6bc4e76bdf..537736ee90 100644 --- a/src/brpc/details/naming_service_thread.cpp +++ b/src/brpc/details/naming_service_thread.cpp @@ -127,7 +127,7 @@ void NamingServiceThread::Actions::ResetServers( const SocketMapKey key(_added[i], _owner->_options.channel_signature); CHECK_EQ(0, SocketMapInsert(key, &tagged_id.id, _owner->_options.ssl_ctx, _owner->_options.use_rdma, _owner->_options.hc_option, - _owner->_options.client_endpoint)); + _owner->_options.device_name)); _added_sockets.push_back(tagged_id); } diff --git a/src/brpc/details/naming_service_thread.h b/src/brpc/details/naming_service_thread.h index 52a4e9449d..71ad0d4ad7 100644 --- a/src/brpc/details/naming_service_thread.h +++ b/src/brpc/details/naming_service_thread.h @@ -53,7 +53,7 @@ struct GetNamingServiceThreadOptions { HealthCheckOption hc_option; ChannelSignature channel_signature; std::shared_ptr ssl_ctx; - butil::EndPoint client_endpoint; + std::string device_name; }; // A dedicated thread to map a name to ServerIds diff --git a/src/brpc/socket.cpp b/src/brpc/socket.cpp index 88a2878dca..ff23828bb3 100644 --- a/src/brpc/socket.cpp +++ b/src/brpc/socket.cpp @@ -728,7 +728,8 @@ int Socket::OnCreated(const SocketOptions& options) { _keytable_pool = options.keytable_pool; _tos = 0; _remote_side = options.remote_side; - _local_side = options.local_side; + _local_side = butil::EndPoint(); + _device_name = options.device_name; _on_edge_triggered_events = options.on_edge_triggered_events; _user = options.user; _conn = options.conn; @@ -1296,14 +1297,11 @@ int Socket::Connect(const timespec* abstime, CHECK_EQ(0, butil::make_close_on_exec(sockfd)); // We need to do async connect (to manage the timeout by ourselves). CHECK_EQ(0, butil::make_non_blocking(sockfd)); - if (local_side().ip != butil::IP_ANY) { - struct sockaddr_storage cli_addr; - if (butil::endpoint2sockaddr(local_side(), &cli_addr, &addr_size) != 0) { - PLOG(ERROR) << "Fail to get client sockaddr"; - return -1; - } - if (::bind(sockfd, (struct sockaddr*)&cli_addr, addr_size) != 0) { - PLOG(ERROR) << "Fail to bind client socket, errno=" << strerror(errno); + if (!_device_name.empty()) { + if (setsockopt(sockfd, SOL_SOCKET, SO_BINDTODEVICE, + _device_name.c_str(), _device_name.size()) < 0) { + PLOG(ERROR) << "Fail to set SO_BINDTODEVICE of fd=" << sockfd + << " to device_name=" << _device_name; return -1; } } @@ -2821,7 +2819,6 @@ int Socket::GetPooledSocket(SocketUniquePtr* pooled_socket) { if (socket_pool == NULL) { SocketOptions opt; opt.remote_side = remote_side(); - opt.local_side = butil::EndPoint(local_side().ip, 0); opt.user = user(); opt.on_edge_triggered_events = _on_edge_triggered_events; opt.initial_ssl_ctx = _ssl_ctx; @@ -2923,7 +2920,6 @@ int Socket::GetShortSocket(SocketUniquePtr* short_socket) { SocketId id; SocketOptions opt; opt.remote_side = remote_side(); - opt.local_side = butil::EndPoint(local_side().ip, 0); opt.user = user(); opt.on_edge_triggered_events = _on_edge_triggered_events; opt.initial_ssl_ctx = _ssl_ctx; diff --git a/src/brpc/socket.h b/src/brpc/socket.h index 9b120d695f..2d675d3756 100644 --- a/src/brpc/socket.h +++ b/src/brpc/socket.h @@ -250,7 +250,7 @@ struct SocketOptions { // user->BeforeRecycle() before recycling. int fd{-1}; butil::EndPoint remote_side; - butil::EndPoint local_side; + std::string device_name; // If `connect_on_create' is true and `fd' is less than 0, // a client connection will be established to remote_side() // regarding deadline `connect_abstime' when Socket is being created. @@ -831,6 +831,9 @@ friend void DereferenceSocket(Socket*); // Address of self. Initialized in ResetFileDescriptor(). butil::EndPoint _local_side; + // The device name of the client's network adapter. + std::string _device_name; + // Called when edge-triggered events happened on `_fd'. Read comments // of EventDispatcher::AddConsumer (event_dispatcher.h) // carefully before implementing the callback. diff --git a/src/brpc/socket_map.cpp b/src/brpc/socket_map.cpp index 99959d6246..c45dad6e89 100644 --- a/src/brpc/socket_map.cpp +++ b/src/brpc/socket_map.cpp @@ -93,8 +93,8 @@ int SocketMapInsert(const SocketMapKey& key, SocketId* id, const std::shared_ptr& ssl_ctx, bool use_rdma, const HealthCheckOption& hc_option, - const butil::EndPoint& client_endpoint) { - return get_or_new_client_side_socket_map()->Insert(key, id, ssl_ctx, use_rdma, hc_option, client_endpoint); + const std::string& device_name) { + return get_or_new_client_side_socket_map()->Insert(key, id, ssl_ctx, use_rdma, hc_option, device_name); } int SocketMapFind(const SocketMapKey& key, SocketId* id) { @@ -231,7 +231,7 @@ int SocketMap::Insert(const SocketMapKey& key, SocketId* id, const std::shared_ptr& ssl_ctx, bool use_rdma, const HealthCheckOption& hc_option, - const butil::EndPoint& client_endpoint) { + const std::string& device_name) { ShowSocketMapInBvarIfNeed(); std::unique_lock mu(_mutex); @@ -253,7 +253,7 @@ int SocketMap::Insert(const SocketMapKey& key, SocketId* id, SocketId tmp_id; SocketOptions opt; opt.remote_side = key.peer.addr; - opt.local_side = client_endpoint; + opt.device_name = device_name; opt.initial_ssl_ctx = ssl_ctx; opt.use_rdma = use_rdma; opt.hc_option = hc_option; diff --git a/src/brpc/socket_map.h b/src/brpc/socket_map.h index 5f77fd9755..5a71c90eee 100644 --- a/src/brpc/socket_map.h +++ b/src/brpc/socket_map.h @@ -83,20 +83,18 @@ int SocketMapInsert(const SocketMapKey& key, SocketId* id, const std::shared_ptr& ssl_ctx, bool use_rdma, const HealthCheckOption& hc_option, - const butil::EndPoint& client_endpoint); + const std::string& device_name); inline int SocketMapInsert(const SocketMapKey& key, SocketId* id, const std::shared_ptr& ssl_ctx) { HealthCheckOption hc_option; - butil::EndPoint endpoint; - return SocketMapInsert(key, id, ssl_ctx, false, hc_option, endpoint); + return SocketMapInsert(key, id, ssl_ctx, false, hc_option, ""); } inline int SocketMapInsert(const SocketMapKey& key, SocketId* id) { std::shared_ptr empty_ptr; HealthCheckOption hc_option; - butil::EndPoint endpoint; - return SocketMapInsert(key, id, empty_ptr, false, hc_option, endpoint); + return SocketMapInsert(key, id, empty_ptr, false, hc_option, ""); } // Find the SocketId associated with `key'. @@ -159,19 +157,17 @@ class SocketMap { const std::shared_ptr& ssl_ctx, bool use_rdma, const HealthCheckOption& hc_option, - const butil::EndPoint& client_endpoint); + const std::string& device_name); int Insert(const SocketMapKey& key, SocketId* id, const std::shared_ptr& ssl_ctx) { HealthCheckOption hc_option; - butil::EndPoint endpoint; - return Insert(key, id, ssl_ctx, false, hc_option, endpoint); + return Insert(key, id, ssl_ctx, false, hc_option, ""); } int Insert(const SocketMapKey& key, SocketId* id) { std::shared_ptr empty_ptr; HealthCheckOption hc_option; - butil::EndPoint endpoint; - return Insert(key, id, empty_ptr, false, hc_option, endpoint); + return Insert(key, id, empty_ptr, false, hc_option, ""); } void Remove(const SocketMapKey& key, SocketId expected_id); diff --git a/test/brpc_server_unittest.cpp b/test/brpc_server_unittest.cpp index ede5760045..5eb73c1325 100644 --- a/test/brpc_server_unittest.cpp +++ b/test/brpc_server_unittest.cpp @@ -2087,7 +2087,7 @@ void TestClientHost(const butil::EndPoint& ep, ASSERT_EQ(cntl.ErrorCode(), error_code); } -TEST_F(ServerTest, client_host) { +TEST_F(ServerTest, network_device_name) { butil::EndPoint ep; ASSERT_EQ(0, str2endpoint("127.0.0.1:8613", &ep)); brpc::Server server; @@ -2098,7 +2098,7 @@ TEST_F(ServerTest, client_host) { brpc::Controller cntl; brpc::ChannelOptions copt; - copt.client_host = "localhost"; + copt.device_name = "lo"; std::vector connection_types = { brpc::CONNECTION_TYPE_SINGLE, brpc::CONNECTION_TYPE_POOLED, From df180006c2241bdf3397d5aadcf20fac66584949 Mon Sep 17 00:00:00 2001 From: wenjiecn <3252896864@qq.com> Date: Tue, 13 Jan 2026 22:49:28 +0800 Subject: [PATCH 5/6] updated to support SO_BINDTODEVICE and bind client_host. --- src/brpc/channel.cpp | 23 +++++++++++++++++++++- src/brpc/channel.h | 5 +++++ src/brpc/details/naming_service_thread.cpp | 1 + src/brpc/details/naming_service_thread.h | 1 + src/brpc/socket.cpp | 15 +++++++++++++- src/brpc/socket.h | 1 + src/brpc/socket_map.cpp | 5 ++++- src/brpc/socket_map.h | 14 +++++++++---- test/brpc_server_unittest.cpp | 3 ++- 9 files changed, 60 insertions(+), 8 deletions(-) diff --git a/src/brpc/channel.cpp b/src/brpc/channel.cpp index 5ebed6b44f..7e6381c7d7 100644 --- a/src/brpc/channel.cpp +++ b/src/brpc/channel.cpp @@ -77,6 +77,7 @@ ChannelSSLOptions* ChannelOptions::mutable_ssl_options() { static ChannelSignature ComputeChannelSignature(const ChannelOptions& opt) { if (opt.auth == NULL && !opt.has_ssl_options() && + opt.client_host.empty() && opt.device_name.empty() && opt.connection_group.empty() && opt.hc_option.health_check_path.empty()) { @@ -95,6 +96,10 @@ static ChannelSignature ComputeChannelSignature(const ChannelOptions& opt) { buf.append("|conng="); buf.append(opt.connection_group); } + if (!opt.client_host.empty()) { + buf.append("|clih="); + buf.append(opt.client_host); + } if (!opt.device_name.empty()) { buf.append("|devn="); buf.append(opt.device_name); @@ -367,6 +372,13 @@ int Channel::InitSingle(const butil::EndPoint& server_addr_and_port, LOG(ERROR) << "Invalid port=" << port; return -1; } + butil::EndPoint client_endpoint; + if (!_options.client_host.empty() && + butil::str2ip(_options.client_host.c_str(), &client_endpoint.ip) != 0 && + butil::hostname2ip(_options.client_host.c_str(), &client_endpoint.ip) != 0) { + LOG(ERROR) << "Invalid client host=`" << _options.client_host << '\''; + return -1; + } _server_address = server_addr_and_port; const ChannelSignature sig = ComputeChannelSignature(_options); std::shared_ptr ssl_ctx; @@ -375,7 +387,8 @@ int Channel::InitSingle(const butil::EndPoint& server_addr_and_port, } if (SocketMapInsert(SocketMapKey(server_addr_and_port, sig), &_server_id, ssl_ctx, _options.use_rdma, - _options.hc_option, _options.device_name) != 0) { + _options.hc_option, client_endpoint, + _options.device_name) != 0) { LOG(ERROR) << "Fail to insert into SocketMap"; return -1; } @@ -403,6 +416,13 @@ int Channel::Init(const char* ns_url, _options.mutable_ssl_options()->sni_name = _service_name; } } + butil::EndPoint client_endpoint; + if (!_options.client_host.empty() && + butil::str2ip(_options.client_host.c_str(), &client_endpoint.ip) != 0 && + butil::hostname2ip(_options.client_host.c_str(), &client_endpoint.ip) != 0) { + LOG(ERROR) << "Invalid client host=`" << _options.client_host << '\''; + return -1; + } std::unique_ptr lb(new (std::nothrow) LoadBalancerWithNaming); if (NULL == lb) { @@ -415,6 +435,7 @@ int Channel::Init(const char* ns_url, ns_opt.use_rdma = _options.use_rdma; ns_opt.channel_signature = ComputeChannelSignature(_options); ns_opt.hc_option = _options.hc_option; + ns_opt.client_endpoint = client_endpoint; ns_opt.device_name = _options.device_name; if (CreateSocketSSLContext(_options, &ns_opt.ssl_ctx) != 0) { return -1; diff --git a/src/brpc/channel.h b/src/brpc/channel.h index 23f5a74d48..0f349ac6fe 100644 --- a/src/brpc/channel.h +++ b/src/brpc/channel.h @@ -149,6 +149,11 @@ struct ChannelOptions { // When it is not set, FLAGS_health_check_path and FLAGS_health_check_timeout_ms will take effect. HealthCheckOption hc_option; + // IP address or host name of the client. + // if the client_host is "", the client IP address is determined by the OS. + // Default: "" + std::string client_host; + // The device name of the client's network adapter. // if the device_name is "", the flow control is determined by the OS. // Default: "" diff --git a/src/brpc/details/naming_service_thread.cpp b/src/brpc/details/naming_service_thread.cpp index 537736ee90..a75db265b0 100644 --- a/src/brpc/details/naming_service_thread.cpp +++ b/src/brpc/details/naming_service_thread.cpp @@ -127,6 +127,7 @@ void NamingServiceThread::Actions::ResetServers( const SocketMapKey key(_added[i], _owner->_options.channel_signature); CHECK_EQ(0, SocketMapInsert(key, &tagged_id.id, _owner->_options.ssl_ctx, _owner->_options.use_rdma, _owner->_options.hc_option, + _owner->_options.client_endpoint, _owner->_options.device_name)); _added_sockets.push_back(tagged_id); } diff --git a/src/brpc/details/naming_service_thread.h b/src/brpc/details/naming_service_thread.h index 71ad0d4ad7..34a29a8622 100644 --- a/src/brpc/details/naming_service_thread.h +++ b/src/brpc/details/naming_service_thread.h @@ -53,6 +53,7 @@ struct GetNamingServiceThreadOptions { HealthCheckOption hc_option; ChannelSignature channel_signature; std::shared_ptr ssl_ctx; + butil::EndPoint client_endpoint; std::string device_name; }; diff --git a/src/brpc/socket.cpp b/src/brpc/socket.cpp index ff23828bb3..e431aceff9 100644 --- a/src/brpc/socket.cpp +++ b/src/brpc/socket.cpp @@ -728,7 +728,7 @@ int Socket::OnCreated(const SocketOptions& options) { _keytable_pool = options.keytable_pool; _tos = 0; _remote_side = options.remote_side; - _local_side = butil::EndPoint(); + _local_side = options.local_side; _device_name = options.device_name; _on_edge_triggered_events = options.on_edge_triggered_events; _user = options.user; @@ -1305,6 +1305,17 @@ int Socket::Connect(const timespec* abstime, return -1; } } + if (local_side().ip != butil::IP_ANY) { + struct sockaddr_storage cli_addr; + if (butil::endpoint2sockaddr(local_side(), &cli_addr, &addr_size) != 0) { + PLOG(ERROR) << "Fail to get client sockaddr"; + return -1; + } + if (::bind(sockfd, (struct sockaddr*)&cli_addr, addr_size) != 0) { + PLOG(ERROR) << "Fail to bind client socket, errno=" << strerror(errno); + return -1; + } + } const int rc = ::connect( sockfd, (struct sockaddr*)&serv_addr, addr_size); if (rc != 0 && errno != EINPROGRESS) { @@ -2819,6 +2830,7 @@ int Socket::GetPooledSocket(SocketUniquePtr* pooled_socket) { if (socket_pool == NULL) { SocketOptions opt; opt.remote_side = remote_side(); + opt.local_side = butil::EndPoint(local_side().ip, 0); opt.user = user(); opt.on_edge_triggered_events = _on_edge_triggered_events; opt.initial_ssl_ctx = _ssl_ctx; @@ -2920,6 +2932,7 @@ int Socket::GetShortSocket(SocketUniquePtr* short_socket) { SocketId id; SocketOptions opt; opt.remote_side = remote_side(); + opt.local_side = butil::EndPoint(local_side().ip, 0); opt.user = user(); opt.on_edge_triggered_events = _on_edge_triggered_events; opt.initial_ssl_ctx = _ssl_ctx; diff --git a/src/brpc/socket.h b/src/brpc/socket.h index 2d675d3756..a3e2323056 100644 --- a/src/brpc/socket.h +++ b/src/brpc/socket.h @@ -250,6 +250,7 @@ struct SocketOptions { // user->BeforeRecycle() before recycling. int fd{-1}; butil::EndPoint remote_side; + butil::EndPoint local_side; std::string device_name; // If `connect_on_create' is true and `fd' is less than 0, // a client connection will be established to remote_side() diff --git a/src/brpc/socket_map.cpp b/src/brpc/socket_map.cpp index c45dad6e89..16e6986394 100644 --- a/src/brpc/socket_map.cpp +++ b/src/brpc/socket_map.cpp @@ -93,8 +93,9 @@ int SocketMapInsert(const SocketMapKey& key, SocketId* id, const std::shared_ptr& ssl_ctx, bool use_rdma, const HealthCheckOption& hc_option, + const butil::EndPoint& client_endpoint, const std::string& device_name) { - return get_or_new_client_side_socket_map()->Insert(key, id, ssl_ctx, use_rdma, hc_option, device_name); + return get_or_new_client_side_socket_map()->Insert(key, id, ssl_ctx, use_rdma, hc_option, client_endpoint, device_name); } int SocketMapFind(const SocketMapKey& key, SocketId* id) { @@ -231,6 +232,7 @@ int SocketMap::Insert(const SocketMapKey& key, SocketId* id, const std::shared_ptr& ssl_ctx, bool use_rdma, const HealthCheckOption& hc_option, + const butil::EndPoint& client_endpoint, const std::string& device_name) { ShowSocketMapInBvarIfNeed(); @@ -253,6 +255,7 @@ int SocketMap::Insert(const SocketMapKey& key, SocketId* id, SocketId tmp_id; SocketOptions opt; opt.remote_side = key.peer.addr; + opt.local_side = client_endpoint; opt.device_name = device_name; opt.initial_ssl_ctx = ssl_ctx; opt.use_rdma = use_rdma; diff --git a/src/brpc/socket_map.h b/src/brpc/socket_map.h index 5a71c90eee..c939bf1bc6 100644 --- a/src/brpc/socket_map.h +++ b/src/brpc/socket_map.h @@ -83,18 +83,21 @@ int SocketMapInsert(const SocketMapKey& key, SocketId* id, const std::shared_ptr& ssl_ctx, bool use_rdma, const HealthCheckOption& hc_option, + const butil::EndPoint& client_endpoint, const std::string& device_name); inline int SocketMapInsert(const SocketMapKey& key, SocketId* id, const std::shared_ptr& ssl_ctx) { HealthCheckOption hc_option; - return SocketMapInsert(key, id, ssl_ctx, false, hc_option, ""); + butil::EndPoint endpoint; + return SocketMapInsert(key, id, ssl_ctx, false, hc_option, endpoint, ""); } inline int SocketMapInsert(const SocketMapKey& key, SocketId* id) { std::shared_ptr empty_ptr; HealthCheckOption hc_option; - return SocketMapInsert(key, id, empty_ptr, false, hc_option, ""); + butil::EndPoint endpoint; + return SocketMapInsert(key, id, empty_ptr, false, hc_option, endpoint, ""); } // Find the SocketId associated with `key'. @@ -157,17 +160,20 @@ class SocketMap { const std::shared_ptr& ssl_ctx, bool use_rdma, const HealthCheckOption& hc_option, + const butil::EndPoint& client_endpoint, const std::string& device_name); int Insert(const SocketMapKey& key, SocketId* id, const std::shared_ptr& ssl_ctx) { HealthCheckOption hc_option; - return Insert(key, id, ssl_ctx, false, hc_option, ""); + butil::EndPoint endpoint; + return Insert(key, id, ssl_ctx, false, hc_option, endpoint, ""); } int Insert(const SocketMapKey& key, SocketId* id) { std::shared_ptr empty_ptr; HealthCheckOption hc_option; - return Insert(key, id, empty_ptr, false, hc_option, ""); + butil::EndPoint endpoint; + return Insert(key, id, empty_ptr, false, hc_option, endpoint, ""); } void Remove(const SocketMapKey& key, SocketId expected_id); diff --git a/test/brpc_server_unittest.cpp b/test/brpc_server_unittest.cpp index 5eb73c1325..8508a7986c 100644 --- a/test/brpc_server_unittest.cpp +++ b/test/brpc_server_unittest.cpp @@ -2087,7 +2087,7 @@ void TestClientHost(const butil::EndPoint& ep, ASSERT_EQ(cntl.ErrorCode(), error_code); } -TEST_F(ServerTest, network_device_name) { +TEST_F(ServerTest, bind_client_host_and_network_device) { butil::EndPoint ep; ASSERT_EQ(0, str2endpoint("127.0.0.1:8613", &ep)); brpc::Server server; @@ -2098,6 +2098,7 @@ TEST_F(ServerTest, network_device_name) { brpc::Controller cntl; brpc::ChannelOptions copt; + copt.client_host = "localhost"; copt.device_name = "lo"; std::vector connection_types = { brpc::CONNECTION_TYPE_SINGLE, From 46b8adf9d17e0d1ad545528b4520cb6a48e00789 Mon Sep 17 00:00:00 2001 From: wenjiecn <3252896864@qq.com> Date: Fri, 16 Jan 2026 20:51:51 +0800 Subject: [PATCH 6/6] review --- src/brpc/channel.cpp | 21 +++++++----- src/brpc/details/naming_service_thread.cpp | 6 ++-- src/brpc/details/naming_service_thread.h | 11 +++---- src/brpc/socket_map.cpp | 22 +++---------- src/brpc/socket_map.h | 38 ++++++++++++++-------- 5 files changed, 47 insertions(+), 51 deletions(-) diff --git a/src/brpc/channel.cpp b/src/brpc/channel.cpp index 7e6381c7d7..0bb29fce17 100644 --- a/src/brpc/channel.cpp +++ b/src/brpc/channel.cpp @@ -385,10 +385,14 @@ int Channel::InitSingle(const butil::EndPoint& server_addr_and_port, if (CreateSocketSSLContext(_options, &ssl_ctx) != 0) { return -1; } + SocketOptions opt; + opt.local_side = client_endpoint; + opt.initial_ssl_ctx = ssl_ctx; + opt.use_rdma = _options.use_rdma; + opt.hc_option = _options.hc_option; + opt.device_name = _options.device_name; if (SocketMapInsert(SocketMapKey(server_addr_and_port, sig), - &_server_id, ssl_ctx, _options.use_rdma, - _options.hc_option, client_endpoint, - _options.device_name) != 0) { + &_server_id, opt) != 0) { LOG(ERROR) << "Fail to insert into SocketMap"; return -1; } @@ -432,12 +436,13 @@ int Channel::Init(const char* ns_url, GetNamingServiceThreadOptions ns_opt; ns_opt.succeed_without_server = _options.succeed_without_server; ns_opt.log_succeed_without_server = _options.log_succeed_without_server; - ns_opt.use_rdma = _options.use_rdma; + ns_opt.socket_option.use_rdma = _options.use_rdma; ns_opt.channel_signature = ComputeChannelSignature(_options); - ns_opt.hc_option = _options.hc_option; - ns_opt.client_endpoint = client_endpoint; - ns_opt.device_name = _options.device_name; - if (CreateSocketSSLContext(_options, &ns_opt.ssl_ctx) != 0) { + ns_opt.socket_option.hc_option = _options.hc_option; + ns_opt.socket_option.local_side = client_endpoint; + ns_opt.socket_option.device_name = _options.device_name; + if (CreateSocketSSLContext(_options, + &ns_opt.socket_option.initial_ssl_ctx) != 0) { return -1; } if (lb->Init(ns_url, lb_name, _options.ns_filter, &ns_opt) != 0) { diff --git a/src/brpc/details/naming_service_thread.cpp b/src/brpc/details/naming_service_thread.cpp index a75db265b0..f882b2255d 100644 --- a/src/brpc/details/naming_service_thread.cpp +++ b/src/brpc/details/naming_service_thread.cpp @@ -125,10 +125,8 @@ void NamingServiceThread::Actions::ResetServers( // Socket. SocketMapKey may be passed through AddWatcher. Make sure // to pick those Sockets with the right settings during OnAddedServers const SocketMapKey key(_added[i], _owner->_options.channel_signature); - CHECK_EQ(0, SocketMapInsert(key, &tagged_id.id, _owner->_options.ssl_ctx, - _owner->_options.use_rdma, _owner->_options.hc_option, - _owner->_options.client_endpoint, - _owner->_options.device_name)); + CHECK_EQ(0, SocketMapInsert(key, &tagged_id.id, + _owner->_options.socket_option)); _added_sockets.push_back(tagged_id); } diff --git a/src/brpc/details/naming_service_thread.h b/src/brpc/details/naming_service_thread.h index 34a29a8622..9acb8f2931 100644 --- a/src/brpc/details/naming_service_thread.h +++ b/src/brpc/details/naming_service_thread.h @@ -44,17 +44,14 @@ class NamingServiceWatcher { struct GetNamingServiceThreadOptions { GetNamingServiceThreadOptions() : succeed_without_server(false) - , log_succeed_without_server(true) - , use_rdma(false) {} + , log_succeed_without_server(true) { + socket_option.use_rdma = false; +} bool succeed_without_server; bool log_succeed_without_server; - bool use_rdma; - HealthCheckOption hc_option; ChannelSignature channel_signature; - std::shared_ptr ssl_ctx; - butil::EndPoint client_endpoint; - std::string device_name; + SocketOptions socket_option; }; // A dedicated thread to map a name to ServerIds diff --git a/src/brpc/socket_map.cpp b/src/brpc/socket_map.cpp index 16e6986394..3984f6b866 100644 --- a/src/brpc/socket_map.cpp +++ b/src/brpc/socket_map.cpp @@ -90,13 +90,9 @@ SocketMap* get_or_new_client_side_socket_map() { } int SocketMapInsert(const SocketMapKey& key, SocketId* id, - const std::shared_ptr& ssl_ctx, - bool use_rdma, - const HealthCheckOption& hc_option, - const butil::EndPoint& client_endpoint, - const std::string& device_name) { - return get_or_new_client_side_socket_map()->Insert(key, id, ssl_ctx, use_rdma, hc_option, client_endpoint, device_name); -} + SocketOptions& opt) { + return get_or_new_client_side_socket_map()->Insert(key, id, opt); +} int SocketMapFind(const SocketMapKey& key, SocketId* id) { SocketMap* m = get_client_side_socket_map(); @@ -229,11 +225,7 @@ void SocketMap::ShowSocketMapInBvarIfNeed() { } int SocketMap::Insert(const SocketMapKey& key, SocketId* id, - const std::shared_ptr& ssl_ctx, - bool use_rdma, - const HealthCheckOption& hc_option, - const butil::EndPoint& client_endpoint, - const std::string& device_name) { + SocketOptions& opt) { ShowSocketMapInBvarIfNeed(); std::unique_lock mu(_mutex); @@ -253,13 +245,7 @@ int SocketMap::Insert(const SocketMapKey& key, SocketId* id, sc = NULL; } SocketId tmp_id; - SocketOptions opt; opt.remote_side = key.peer.addr; - opt.local_side = client_endpoint; - opt.device_name = device_name; - opt.initial_ssl_ctx = ssl_ctx; - opt.use_rdma = use_rdma; - opt.hc_option = hc_option; if (_options.socket_creator->CreateSocket(opt, &tmp_id) != 0) { PLOG(FATAL) << "Fail to create socket to " << key.peer; return -1; diff --git a/src/brpc/socket_map.h b/src/brpc/socket_map.h index c939bf1bc6..7cf0880498 100644 --- a/src/brpc/socket_map.h +++ b/src/brpc/socket_map.h @@ -80,24 +80,30 @@ struct SocketMapKeyHasher { // successfully, SocketMapRemove() MUST be called when the Socket is not needed. // Return 0 on success, -1 otherwise. int SocketMapInsert(const SocketMapKey& key, SocketId* id, + SocketOptions& opt); + +inline int SocketMapInsert(const SocketMapKey& key, SocketId* id, const std::shared_ptr& ssl_ctx, bool use_rdma, - const HealthCheckOption& hc_option, - const butil::EndPoint& client_endpoint, - const std::string& device_name); + const HealthCheckOption& hc_option) { + SocketOptions opt; + opt.remote_side = key.peer.addr; + opt.initial_ssl_ctx = ssl_ctx; + opt.use_rdma = use_rdma; + opt.hc_option = hc_option; + return SocketMapInsert(key, id, opt); +} inline int SocketMapInsert(const SocketMapKey& key, SocketId* id, const std::shared_ptr& ssl_ctx) { HealthCheckOption hc_option; - butil::EndPoint endpoint; - return SocketMapInsert(key, id, ssl_ctx, false, hc_option, endpoint, ""); + return SocketMapInsert(key, id, ssl_ctx, false, hc_option); } inline int SocketMapInsert(const SocketMapKey& key, SocketId* id) { std::shared_ptr empty_ptr; HealthCheckOption hc_option; - butil::EndPoint endpoint; - return SocketMapInsert(key, id, empty_ptr, false, hc_option, endpoint, ""); + return SocketMapInsert(key, id, empty_ptr, false, hc_option); } // Find the SocketId associated with `key'. @@ -159,22 +165,26 @@ class SocketMap { int Insert(const SocketMapKey& key, SocketId* id, const std::shared_ptr& ssl_ctx, bool use_rdma, - const HealthCheckOption& hc_option, - const butil::EndPoint& client_endpoint, - const std::string& device_name); + const HealthCheckOption& hc_option) { + SocketOptions opt; + opt.remote_side = key.peer.addr; + opt.initial_ssl_ctx = ssl_ctx; + opt.use_rdma = use_rdma; + opt.hc_option = hc_option; + return Insert(key, id, opt); +} int Insert(const SocketMapKey& key, SocketId* id, const std::shared_ptr& ssl_ctx) { HealthCheckOption hc_option; - butil::EndPoint endpoint; - return Insert(key, id, ssl_ctx, false, hc_option, endpoint, ""); + return Insert(key, id, ssl_ctx, false, hc_option); } int Insert(const SocketMapKey& key, SocketId* id) { std::shared_ptr empty_ptr; HealthCheckOption hc_option; - butil::EndPoint endpoint; - return Insert(key, id, empty_ptr, false, hc_option, endpoint, ""); + return Insert(key, id, empty_ptr, false, hc_option); } + int Insert(const SocketMapKey& key, SocketId* id, SocketOptions& opt); void Remove(const SocketMapKey& key, SocketId expected_id); int Find(const SocketMapKey& key, SocketId* id);