Skip to content

Commit 3b4e99e

Browse files
committed
[Enhancement] Add a cleaner for BrpcStubCache to cleanup unused connections
1 parent af49488 commit 3b4e99e

File tree

6 files changed

+158
-0
lines changed

6 files changed

+158
-0
lines changed

be/src/common/config.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,10 @@ CONF_String(ssl_private_key_path, "");
5959
// These connections are created during the first few accesses and will be used thereafter
6060
CONF_Int32(brpc_max_connections_per_server, "1");
6161

62+
// BRPC stub cache cleanup configurations
63+
// Interval in seconds to run cleanup for expired BRPC stubs
64+
CONF_mInt32(brpc_stub_cleanup_interval_s, "600"); // 10 minutes
65+
6266
// Declare a selection strategy for servers that have many IPs.
6367
// Note that at most one IP should match this list.
6468
// this is a list in semicolon-delimited format, in CIDR notation, e.g. 10.10.10.0/24

be/src/util/brpc_stub_cache.cpp

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
#include "common/config.h"
1818
#include "gen_cpp/internal_service.pb.h"
1919
#include "gen_cpp/lake_service.pb.h"
20+
#include "util/misc.h"
2021
#include "util/failpoint/fail_point.h"
2122
#include "util/starrocks_metrics.h"
2223

@@ -28,6 +29,17 @@ BrpcStubCache::BrpcStubCache() {
2829
std::lock_guard<SpinLock> l(_lock);
2930
return _stub_map.size();
3031
});
32+
_cleanup_thread = std::thread([this] {
33+
#ifdef GOOGLE_PROFILER
34+
ProfilerRegisterThread();
35+
#endif
36+
while (!_stopped.load()) {
37+
LOG(INFO) << "Start to clean up expired brpc stubs";
38+
check_and_cleanup_unhealthy_stubs();
39+
nap_sleep(config::brpc_stub_cleanup_interval_s, [this] { return _stopped.load(); });
40+
}
41+
});
42+
Thread::set_thread_name(_cleanup_thread, "brpc_cleanup_thread");
3143
}
3244

3345
BrpcStubCache::~BrpcStubCache() {
@@ -71,6 +83,30 @@ std::shared_ptr<PInternalService_RecoverableStub> BrpcStubCache::get_stub(const
7183
return get_stub(endpoint);
7284
}
7385

86+
void BrpcStubCache::check_and_cleanup_unhealthy_stubs() {
87+
std::vector<butil::EndPoint> snapshot;
88+
{
89+
std::lock_guard<SpinLock> l(_lock);
90+
91+
for (auto& entry : _stub_map) {
92+
snapshot.push_back(entry.first);
93+
}
94+
}
95+
96+
for (auto endpoint : snapshot) {
97+
auto stub_pool = get_stub_pool_readonly(endpoint);
98+
if (stub_pool && stub_pool->is_unhealthy()) {
99+
{
100+
std::lock_guard<SpinLock> l(_lock);
101+
102+
LOG(INFO) << "cleanup stubs from endpoint:" << endpoint;
103+
_stub_map.erase(endpoint);
104+
delete stub_pool;
105+
}
106+
}
107+
}
108+
}
109+
74110
BrpcStubCache::StubPool::StubPool() : _idx(-1) {
75111
_stubs.reserve(config::brpc_max_connections_per_server);
76112
}
@@ -91,6 +127,28 @@ std::shared_ptr<PInternalService_RecoverableStub> BrpcStubCache::StubPool::get_o
91127
return _stubs[_idx];
92128
}
93129

130+
bool BrpcStubCache::StubPool::is_unhealthy() {
131+
if (_stubs.size() == 0) {
132+
// When the vector is empty do not need to clean
133+
return false;
134+
}
135+
136+
auto& first_stub = _stubs[0];
137+
if (!first_stub) {
138+
return true;
139+
}
140+
141+
auto status = first_stub->check_health();
142+
return !status.ok();
143+
}
144+
145+
// Looks up the pool cached for `endpoint` while holding `_lock`.
//
// Returns the stored StubPool pointer, or nullptr when the endpoint has no entry.
// The map is never modified by this lookup.
BrpcStubCache::StubPool* BrpcStubCache::get_stub_pool_readonly(const butil::EndPoint& endpoint) {
    std::lock_guard<SpinLock> guard(_lock);
    auto slot = _stub_map.seek(endpoint);
    if (slot == nullptr) {
        return nullptr;
    }
    return *slot;
}
151+
94152
HttpBrpcStubCache* HttpBrpcStubCache::getInstance() {
95153
static HttpBrpcStubCache cache;
96154
return &cache;

be/src/util/brpc_stub_cache.h

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
#include <memory>
3838
#include <mutex>
3939
#include <vector>
40+
#include <atomic>
4041

4142
#include "common/statusor.h"
4243
#include "gen_cpp/Types_types.h" // TNetworkAddress
@@ -45,6 +46,7 @@
4546
#include "util/lake_service_recoverable_stub.h"
4647
#include "util/network_util.h"
4748
#include "util/spinlock.h"
49+
#include "util/thread.h"
4850

4951
namespace starrocks {
5052

@@ -56,18 +58,26 @@ class BrpcStubCache {
5658
std::shared_ptr<PInternalService_RecoverableStub> get_stub(const butil::EndPoint& endpoint);
5759
std::shared_ptr<PInternalService_RecoverableStub> get_stub(const TNetworkAddress& taddr);
5860
std::shared_ptr<PInternalService_RecoverableStub> get_stub(const std::string& host, int port);
61+
void check_and_cleanup_unhealthy_stubs();
5962

6063
private:
6164
// Holds the stubs cached for a single endpoint; `_stub_map` stores one StubPool* per endpoint.
struct StubPool {
6265
// Reserves room for config::brpc_max_connections_per_server stubs and starts `_idx` at -1.
StubPool();
66+
// NOTE(review): the destructor body is not visible here — confirm what it releases.
~StubPool();
67+
6368
// Returns the stub at `_stubs[_idx]`.
// NOTE(review): how stubs are created and how `_idx` advances is not visible here — confirm.
std::shared_ptr<PInternalService_RecoverableStub> get_or_create(const butil::EndPoint& endpoint);
69+
// Returns false when `_stubs` is empty; otherwise true if the first stub is null
// or its check_health() does not return OK.
bool is_unhealthy();
6470

6571
// Stubs held by this pool.
std::vector<std::shared_ptr<PInternalService_RecoverableStub>> _stubs;
6672
// Index into `_stubs` of the stub handed out by get_or_create(); initialised to -1.
int64_t _idx;
6773
};
6874

75+
StubPool* get_stub_pool_readonly(const butil::EndPoint& endpoint);
76+
6977
SpinLock _lock;
7078
butil::FlatMap<butil::EndPoint, StubPool*> _stub_map;
79+
std::atomic<bool> _stopped{false};
80+
std::thread _cleanup_thread;
7181
};
7282

7383
class HttpBrpcStubCache {

be/src/util/internal_service_recoverable_stub.cpp

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,70 @@ Status PInternalService_RecoverableStub::reset_channel(int64_t next_connection_g
6060
return Status::OK();
6161
}
6262

63+
// Checks whether the backend behind `_endpoint` is alive by issuing an HTTP GET to
// "/api/health" on the same host at config::be_http_port.
//
// Returns Status::OK() only when the request succeeds with HTTP status 200;
// otherwise returns Status::InternalError describing which step failed.
//
// Under BE_TEST no network traffic is generated: port 125 is reported as
// unhealthy and every other endpoint as healthy.
Status PInternalService_RecoverableStub::check_health() {
#ifdef BE_TEST
    if (_endpoint.port == 125) {
        LOG(INFO) << "BE_TEST mode: Port 125 simulated as unhealthy for " << _endpoint;
        return Status::InternalError("Simulated failure for port 125");
    }
    LOG(INFO) << "BE_TEST mode: Endpoint " << _endpoint << " simulated as healthy";
    return Status::OK();
#endif

    // Extract the host part of "host:port". Split on the LAST colon so that a
    // bracketed IPv6 literal such as "[::1]:8060" keeps its full address.
    std::string host = butil::endpoint2str(_endpoint).c_str();
    const size_t colon_pos = host.rfind(':');
    if (colon_pos != std::string::npos) {
        host = host.substr(0, colon_pos);
    }

    // Build the HTTP endpoint: same host, but the backend's HTTP port.
    butil::EndPoint http_endpoint;
    if (butil::str2endpoint((host + ":" + std::to_string(config::be_http_port)).c_str(), &http_endpoint) != 0) {
        LOG(WARNING) << "Invalid HTTP endpoint: " << host << ":" << config::be_http_port;
        return Status::InternalError("Invalid HTTP endpoint");
    }

    // The connect timeout is reused as the overall request timeout.
    brpc::ChannelOptions options;
    options.connect_timeout_ms = config::rpc_connect_timeout_ms;
    options.timeout_ms = config::rpc_connect_timeout_ms;
    options.max_retry = 1;
    options.protocol = "http";

    // A dedicated short-lived channel is used for the probe.
    brpc::Channel channel;
    if (channel.Init(http_endpoint, &options) != 0) {
        LOG(WARNING) << "Failed to init HTTP channel for " << http_endpoint;
        return Status::InternalError("Failed to init HTTP channel");
    }

    brpc::Controller cntl;
    cntl.http_request().set_method(brpc::HTTP_METHOD_GET);
    cntl.http_request().uri() = "/api/health";

    // A null `done` closure makes this call synchronous.
    channel.CallMethod(nullptr, &cntl, nullptr, nullptr, nullptr);

    if (cntl.Failed()) {
        LOG(WARNING) << "Health check failed for endpoint " << _endpoint << ", error: " << cntl.ErrorText();
        return Status::InternalError("Health check request failed");
    }

    const int status_code = cntl.http_response().status_code();
    if (status_code != 200) {
        LOG(WARNING) << "Health check failed for endpoint " << _endpoint << ", HTTP status: " << status_code;
        return Status::InternalError("Health check failed");
    }

    LOG(INFO) << "Health check passed for endpoint " << _endpoint << " (HTTP " << status_code << ")";
    return Status::OK();
}
126+
63127
void PInternalService_RecoverableStub::tablet_writer_open(::google::protobuf::RpcController* controller,
64128
const ::starrocks::PTabletWriterOpenRequest* request,
65129
::starrocks::PTabletWriterOpenResult* response,

be/src/util/internal_service_recoverable_stub.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ class PInternalService_RecoverableStub : public PInternalService,
2828
~PInternalService_RecoverableStub();
2929

3030
Status reset_channel(int64_t next_connection_group = 0);
31+
Status check_health();
3132

3233
std::shared_ptr<starrocks::PInternalService_Stub> stub() const {
3334
std::shared_lock l(_mutex);

be/test/util/brpc_stub_cache_test.cpp

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,4 +108,25 @@ TEST_F(BrpcStubCacheTest, test_http_stub) {
108108
ASSERT_EQ(nullptr, *stub4);
109109
}
110110

111+
// Verifies that check_and_cleanup_unhealthy_stubs() keeps stubs for healthy
// endpoints and evicts stubs for unhealthy ones. In BE_TEST builds check_health()
// simulates port 125 as unhealthy and every other port as healthy.
TEST_F(BrpcStubCacheTest, cleanup) {
    BrpcStubCache cache;
    TNetworkAddress address;
    address.hostname = "127.0.0.1";

    // Healthy port: the cached stub must survive a cleanup pass.
    address.port = 123;
    auto stub1 = cache.get_stub(address);
    ASSERT_NE(nullptr, stub1);
    cache.check_and_cleanup_unhealthy_stubs();
    auto stub2 = cache.get_stub(address);
    ASSERT_EQ(stub2, stub1);

    // Unhealthy port: the cached stub must be evicted, so the next lookup
    // creates a new one.
    address.port = 125;
    auto stub3 = cache.get_stub(address);
    ASSERT_NE(nullptr, stub3);
    cache.check_and_cleanup_unhealthy_stubs();
    auto stub4 = cache.get_stub(address);
    ASSERT_NE(stub4, stub3);
}
131+
111132
} // namespace starrocks

0 commit comments

Comments
 (0)