Skip to content

Commit 35038ec

Browse files
committed
[Enhancement] Add a cleaner for BrpcStubCache to cleanup unused connections
Signed-off-by: ‘duanyyyyyyy’ <yan.duan9759@gmail.com>
1 parent af49488 commit 35038ec

File tree

6 files changed

+163
-0
lines changed

6 files changed

+163
-0
lines changed

be/src/common/config.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,10 @@ CONF_String(ssl_private_key_path, "");
5959
// These connections are created during the first few access and will be used thereafter
6060
CONF_Int32(brpc_max_connections_per_server, "1");
6161

62+
// BRPC stub cache cleanup configurations
63+
// Interval in seconds to run cleanup for expired BRPC stubs
64+
CONF_mInt32(brpc_stub_cleanup_interval_s, "600"); // 10 minutes
65+
6266
// Declare a selection strategy for those servers have many ips.
6367
// Note that there should at most one ip match this list.
6468
// this is a list in semicolon-delimited format, in CIDR notation, e.g. 10.10.10.0/24

be/src/util/brpc_stub_cache.cpp

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
#include "gen_cpp/internal_service.pb.h"
1919
#include "gen_cpp/lake_service.pb.h"
2020
#include "util/failpoint/fail_point.h"
21+
#include "util/misc.h"
2122
#include "util/starrocks_metrics.h"
2223

2324
namespace starrocks {
@@ -28,12 +29,28 @@ BrpcStubCache::BrpcStubCache() {
2829
std::lock_guard<SpinLock> l(_lock);
2930
return _stub_map.size();
3031
});
32+
_cleanup_thread = std::thread([this] {
33+
#ifdef GOOGLE_PROFILER
34+
ProfilerRegisterThread();
35+
#endif
36+
while (!_stopped.load()) {
37+
LOG(INFO) << "Start to clean up expired brpc stubs";
38+
check_and_cleanup_unhealthy_stubs();
39+
nap_sleep(config::brpc_stub_cleanup_interval_s, [this] { return _stopped.load(); });
40+
}
41+
});
42+
Thread::set_thread_name(_cleanup_thread, "brpc_cleanup_thread");
3143
}
3244

3345
BrpcStubCache::~BrpcStubCache() {
46+
_stopped.store(true);
47+
_cleanup_thread.join();
48+
3449
for (auto& stub : _stub_map) {
3550
delete stub.second;
3651
}
52+
53+
_stub_map.clear();
3754
}
3855

3956
std::shared_ptr<PInternalService_RecoverableStub> BrpcStubCache::get_stub(const butil::EndPoint& endpoint) {
@@ -71,10 +88,38 @@ std::shared_ptr<PInternalService_RecoverableStub> BrpcStubCache::get_stub(const
7188
return get_stub(endpoint);
7289
}
7390

91+
void BrpcStubCache::check_and_cleanup_unhealthy_stubs() {
92+
std::vector<butil::EndPoint> snapshot;
93+
{
94+
std::lock_guard<SpinLock> l(_lock);
95+
96+
for (auto& entry : _stub_map) {
97+
snapshot.push_back(entry.first);
98+
}
99+
}
100+
101+
for (const auto& endpoint : snapshot) {
102+
auto stub_pool = get_stub_pool_readonly(endpoint);
103+
if (stub_pool && stub_pool->is_unhealthy()) {
104+
{
105+
std::lock_guard<SpinLock> l(_lock);
106+
107+
LOG(INFO) << "cleanup stubs from endpoint:" << endpoint;
108+
_stub_map.erase(endpoint);
109+
delete stub_pool;
110+
}
111+
}
112+
}
113+
}
114+
74115
BrpcStubCache::StubPool::StubPool() : _idx(-1) {
75116
_stubs.reserve(config::brpc_max_connections_per_server);
76117
}
77118

119+
BrpcStubCache::StubPool::~StubPool() {
120+
_stubs.clear();
121+
}
122+
78123
std::shared_ptr<PInternalService_RecoverableStub> BrpcStubCache::StubPool::get_or_create(
79124
const butil::EndPoint& endpoint) {
80125
if (UNLIKELY(_stubs.size() < config::brpc_max_connections_per_server)) {
@@ -91,6 +136,27 @@ std::shared_ptr<PInternalService_RecoverableStub> BrpcStubCache::StubPool::get_o
91136
return _stubs[_idx];
92137
}
93138

139+
bool BrpcStubCache::StubPool::is_unhealthy() {
140+
if (_stubs.size() == 0) {
141+
// When the vector is empty do not need to clean
142+
return false;
143+
}
144+
145+
auto& first_stub = _stubs[0];
146+
if (!first_stub) {
147+
return true;
148+
}
149+
150+
auto status = first_stub->check_health();
151+
return !status.ok();
152+
}
153+
154+
BrpcStubCache::StubPool* BrpcStubCache::get_stub_pool_readonly(const butil::EndPoint& endpoint) {
155+
std::lock_guard<SpinLock> l(_lock);
156+
auto stub_ptr = _stub_map.seek(endpoint);
157+
return stub_ptr ? *stub_ptr : nullptr;
158+
}
159+
94160
HttpBrpcStubCache* HttpBrpcStubCache::getInstance() {
95161
static HttpBrpcStubCache cache;
96162
return &cache;

be/src/util/brpc_stub_cache.h

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434

3535
#pragma once
3636

37+
#include <atomic>
3738
#include <memory>
3839
#include <mutex>
3940
#include <vector>
@@ -45,6 +46,7 @@
4546
#include "util/lake_service_recoverable_stub.h"
4647
#include "util/network_util.h"
4748
#include "util/spinlock.h"
49+
#include "util/thread.h"
4850

4951
namespace starrocks {
5052

@@ -56,18 +58,26 @@ class BrpcStubCache {
5658
std::shared_ptr<PInternalService_RecoverableStub> get_stub(const butil::EndPoint& endpoint);
5759
std::shared_ptr<PInternalService_RecoverableStub> get_stub(const TNetworkAddress& taddr);
5860
std::shared_ptr<PInternalService_RecoverableStub> get_stub(const std::string& host, int port);
61+
void check_and_cleanup_unhealthy_stubs();
5962

6063
private:
6164
struct StubPool {
6265
StubPool();
66+
~StubPool();
67+
6368
std::shared_ptr<PInternalService_RecoverableStub> get_or_create(const butil::EndPoint& endpoint);
69+
bool is_unhealthy();
6470

6571
std::vector<std::shared_ptr<PInternalService_RecoverableStub>> _stubs;
6672
int64_t _idx;
6773
};
6874

75+
StubPool* get_stub_pool_readonly(const butil::EndPoint& endpoint);
76+
6977
SpinLock _lock;
7078
butil::FlatMap<butil::EndPoint, StubPool*> _stub_map;
79+
std::atomic<bool> _stopped{false};
80+
std::thread _cleanup_thread;
7181
};
7282

7383
class HttpBrpcStubCache {

be/src/util/internal_service_recoverable_stub.cpp

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,67 @@ Status PInternalService_RecoverableStub::reset_channel(int64_t next_connection_g
6060
return Status::OK();
6161
}
6262

63+
Status PInternalService_RecoverableStub::check_health() {
64+
#ifdef BE_TEST
65+
if (_endpoint.port == 125) {
66+
LOG(INFO) << "BE_TEST mode: Port 125 simulated as unhealthy for " << _endpoint;
67+
return Status::InternalError("Simulated failure for port 111");
68+
}
69+
LOG(INFO) << "BE_TEST mode: Endpoint " << _endpoint << " simulated as healthy";
70+
return Status::OK();
71+
#endif
72+
73+
// extract host
74+
std::string host = butil::endpoint2str(_endpoint).c_str();
75+
size_t colon_pos = host.find(':');
76+
if (colon_pos != std::string::npos) {
77+
host = host.substr(0, colon_pos);
78+
}
79+
80+
// generate endpoint
81+
butil::EndPoint http_endpoint;
82+
if (butil::str2endpoint((host + ":" + std::to_string(config::be_http_port)).c_str(), &http_endpoint) != 0) {
83+
LOG(WARNING) << "Invalid HTTP endpoint: " << host << ":" << config::be_http_port;
84+
return Status::InternalError("Invalid HTTP endpoint");
85+
}
86+
87+
// generate channel options
88+
brpc::ChannelOptions options;
89+
options.connect_timeout_ms = config::rpc_connect_timeout_ms;
90+
options.timeout_ms = config::rpc_connect_timeout_ms;
91+
options.max_retry = 3;
92+
options.protocol = "http";
93+
94+
// init channel
95+
brpc::Channel channel;
96+
if (channel.Init(http_endpoint, &options) != 0) {
97+
LOG(WARNING) << "Failed to init HTTP channel for " << http_endpoint;
98+
return Status::InternalError("Failed to init HTTP channel");
99+
}
100+
101+
// generate controller
102+
brpc::Controller cntl;
103+
cntl.http_request().set_method(brpc::HTTP_METHOD_GET);
104+
cntl.http_request().uri() = "/api/health";
105+
106+
// trigger health API
107+
channel.CallMethod(nullptr, &cntl, nullptr, nullptr, nullptr);
108+
109+
// get and check response
110+
if (cntl.Failed()) {
111+
LOG(WARNING) << "Health check failed for endpoint " << _endpoint << ", error: " << cntl.ErrorText();
112+
return Status::InternalError("Health check request failed");
113+
}
114+
int status_code = cntl.http_response().status_code();
115+
if (status_code == 200) {
116+
LOG(INFO) << "Health check passed for endpoint " << _endpoint << " (HTTP " << status_code << ")";
117+
return Status::OK();
118+
} else {
119+
LOG(WARNING) << "Health check failed for endpoint " << _endpoint << ", HTTP status: " << status_code;
120+
return Status::InternalError("Health check failed");
121+
}
122+
}
123+
63124
void PInternalService_RecoverableStub::tablet_writer_open(::google::protobuf::RpcController* controller,
64125
const ::starrocks::PTabletWriterOpenRequest* request,
65126
::starrocks::PTabletWriterOpenResult* response,

be/src/util/internal_service_recoverable_stub.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ class PInternalService_RecoverableStub : public PInternalService,
2828
~PInternalService_RecoverableStub();
2929

3030
Status reset_channel(int64_t next_connection_group = 0);
31+
Status check_health();
3132

3233
std::shared_ptr<starrocks::PInternalService_Stub> stub() const {
3334
std::shared_lock l(_mutex);

be/test/util/brpc_stub_cache_test.cpp

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,4 +108,25 @@ TEST_F(BrpcStubCacheTest, test_http_stub) {
108108
ASSERT_EQ(nullptr, *stub4);
109109
}
110110

111+
TEST_F(BrpcStubCacheTest, cleanup) {
112+
BrpcStubCache cache;
113+
TNetworkAddress address;
114+
address.hostname = "127.0.0.1";
115+
// healthy port
116+
address.port = 123;
117+
auto stub1 = cache.get_stub(address);
118+
ASSERT_NE(nullptr, stub1);
119+
cache.check_and_cleanup_unhealthy_stubs();
120+
auto stub2 = cache.get_stub(address);
121+
ASSERT_EQ(stub2, stub1);
122+
123+
// unhealthy port
124+
address.port = 125;
125+
auto stub3 = cache.get_stub(address);
126+
ASSERT_NE(nullptr, stub1);
127+
cache.check_and_cleanup_unhealthy_stubs();
128+
auto stub4 = cache.get_stub(address);
129+
ASSERT_NE(stub4, stub3);
130+
}
131+
111132
} // namespace starrocks

0 commit comments

Comments
 (0)