Skip to content

Commit c9782bc

Browse files
committed
[Enhancement] Add a cleaner for BrpcStubCache to cleanup unused connections
Signed-off-by: duanyyyyyyy <yan.duan9759@gmail.com>
1 parent b9dbcea commit c9782bc

File tree

7 files changed

+109
-14
lines changed

7 files changed

+109
-14
lines changed

be/src/common/config.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,9 @@ CONF_String(ssl_private_key_path, "");
5858
// The max number of single connections maintained by the brpc client and each server.
5959
// These connections are created during the first few accesses and are reused thereafter.
6060
CONF_Int32(brpc_max_connections_per_server, "1");
61+
// BRPC stub cache expire configurations
62+
// Expiration time of a BRPC stub cache entry, in seconds. Default: 3600 (60 minutes).
63+
CONF_mInt32(brpc_stub_expire_s, "3600"); // 60 minutes
6164

6265
// Declare a selection strategy for servers that have multiple IPs.
6366
// Note that at most one IP should match this list.

be/src/runtime/exec_env.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -497,7 +497,7 @@ Status ExecEnv::init(const std::vector<StorePath>& store_paths, bool as_cn) {
497497
#endif
498498
_load_channel_mgr = new LoadChannelMgr();
499499
_load_stream_mgr = new LoadStreamMgr();
500-
_brpc_stub_cache = new BrpcStubCache();
500+
_brpc_stub_cache = new BrpcStubCache(this);
501501
_stream_load_executor = new StreamLoadExecutor(this);
502502
_stream_context_mgr = new StreamContextMgr();
503503
_transaction_mgr = new TransactionMgr(this);

be/src/util/brpc_stub_cache.cpp

Lines changed: 49 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@
1414

1515
#include "util/brpc_stub_cache.h"
1616

17+
#include <runtime/exec_env.h>
18+
1719
#include "common/config.h"
1820
#include "gen_cpp/internal_service.pb.h"
1921
#include "gen_cpp/lake_service.pb.h"
@@ -22,28 +24,50 @@
2224

2325
namespace starrocks {
2426

25-
BrpcStubCache::BrpcStubCache() {
27+
BrpcStubCache::BrpcStubCache(ExecEnv* exec_env) {
2628
_stub_map.init(239);
29+
_pipeline_timer = exec_env->pipeline_timer();
2730
REGISTER_GAUGE_STARROCKS_METRIC(brpc_endpoint_stub_count, [this]() {
2831
std::lock_guard<SpinLock> l(_lock);
2932
return _stub_map.size();
3033
});
3134
}
3235

3336
BrpcStubCache::~BrpcStubCache() {
34-
for (auto& stub : _stub_map) {
35-
delete stub.second;
37+
std::vector<std::shared_ptr<StubPool>> pools_to_cleanup;
38+
{
39+
std::lock_guard<SpinLock> l(_lock);
40+
41+
for (auto& stub : _stub_map) {
42+
pools_to_cleanup.push_back(stub.second);
43+
}
44+
}
45+
46+
for (auto& pool : pools_to_cleanup) {
47+
pool->_cleanup_task->unschedule(_pipeline_timer);
3648
}
49+
50+
_stub_map.clear();
3751
}
3852

3953
std::shared_ptr<PInternalService_RecoverableStub> BrpcStubCache::get_stub(const butil::EndPoint& endpoint) {
4054
std::lock_guard<SpinLock> l(_lock);
55+
4156
auto stub_pool = _stub_map.seek(endpoint);
4257
if (stub_pool == nullptr) {
43-
StubPool* pool = new StubPool();
44-
_stub_map.insert(endpoint, pool);
45-
return pool->get_or_create(endpoint);
58+
auto new_pool = std::make_shared<StubPool>();
59+
new_pool->_cleanup_task = new EndpointCleanupTask(this, endpoint);
60+
_stub_map.insert(endpoint, new_pool);
61+
stub_pool = _stub_map.seek(endpoint);
62+
}
63+
64+
_pipeline_timer->unschedule((*stub_pool)->_cleanup_task);
65+
timespec tm = butil::seconds_from_now(config::brpc_stub_expire_s);
66+
auto status = _pipeline_timer->schedule((*stub_pool)->_cleanup_task, tm);
67+
if (!status.ok()) {
68+
LOG(WARNING) << "Failed to schedule endpoint: " << endpoint;
4669
}
70+
4771
return (*stub_pool)->get_or_create(endpoint);
4872
}
4973

@@ -71,10 +95,25 @@ std::shared_ptr<PInternalService_RecoverableStub> BrpcStubCache::get_stub(const
7195
return get_stub(endpoint);
7296
}
7397

98+
// Removes the stub pool cached for `endpoint`, if there is one.
//
// Called by EndpointCleanupTask::Run() when the expiry timer of an endpoint fires.
// Only the map lookup and erase happen while `_lock` is held: `_lock` is a spin
// lock, so logging and tearing down the pool (its stubs and its cleanup task) are
// done outside the critical section to keep concurrent get_stub() callers from
// spinning on slow work.
//
// NOTE(review): destroying the pool deletes its `_cleanup_task`, which is the very
// task whose Run() invoked this function. Confirm that the pipeline timer does not
// touch the task object after Run() returns.
void BrpcStubCache::cleanup_expired(const butil::EndPoint& endpoint) {
    LOG(INFO) << "cleanup stubs from endpoint:" << endpoint;

    // Keeps the pool alive past the erase so its destructor runs after unlock.
    std::shared_ptr<StubPool> expired;
    {
        std::lock_guard<SpinLock> l(_lock);
        auto* pool = _stub_map.seek(endpoint);
        if (pool == nullptr) {
            return;
        }
        expired = *pool;
        _stub_map.erase(endpoint);
    }
    // `expired` goes out of scope here, releasing the pool without holding `_lock`.
}
107+
74108
BrpcStubCache::StubPool::StubPool() : _idx(-1) {
75109
_stubs.reserve(config::brpc_max_connections_per_server);
76110
}
77111

112+
// Releases everything the pool holds: first the cached stubs, then the cleanup
// task that BrpcStubCache::get_stub() allocated for this pool with `new`.
//
// NOTE(review): this destructor can run from inside the task's own Run() (via
// BrpcStubCache::cleanup_expired erasing the pool), in which case the task being
// deleted is still executing. Confirm that is safe for the pipeline timer.
BrpcStubCache::StubPool::~StubPool() {
    // Drop the stubs before the task, same order as before.
    _stubs.clear();
    SAFE_DELETE(_cleanup_task);
}
116+
78117
std::shared_ptr<PInternalService_RecoverableStub> BrpcStubCache::StubPool::get_or_create(
79118
const butil::EndPoint& endpoint) {
80119
if (UNLIKELY(_stubs.size() < config::brpc_max_connections_per_server)) {
@@ -172,4 +211,8 @@ StatusOr<std::shared_ptr<starrocks::LakeService_RecoverableStub>> LakeServiceBrp
172211
return stub;
173212
}
174213

214+
// Timer callback: asks the owning cache to drop the stub pool of the endpoint
// this task was created for.
void EndpointCleanupTask::Run() {
    _cache->cleanup_expired(_endpoint);
}
217+
175218
} // namespace starrocks

be/src/util/brpc_stub_cache.h

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@
3434

3535
#pragma once
3636

37+
#include <exec/pipeline/schedule/pipeline_timer.h>
38+
3739
#include <memory>
3840
#include <mutex>
3941
#include <vector>
@@ -48,26 +50,33 @@
4850

4951
namespace starrocks {
5052

53+
class ExecEnv;
54+
class EndpointCleanupTask;
55+
5156
class BrpcStubCache {
5257
public:
53-
BrpcStubCache();
58+
BrpcStubCache(ExecEnv* exec_env);
5459
~BrpcStubCache();
5560

5661
std::shared_ptr<PInternalService_RecoverableStub> get_stub(const butil::EndPoint& endpoint);
5762
std::shared_ptr<PInternalService_RecoverableStub> get_stub(const TNetworkAddress& taddr);
5863
std::shared_ptr<PInternalService_RecoverableStub> get_stub(const std::string& host, int port);
64+
void cleanup_expired(const butil::EndPoint& endpoint);
5965

6066
private:
6167
struct StubPool {
6268
StubPool();
69+
~StubPool();
6370
std::shared_ptr<PInternalService_RecoverableStub> get_or_create(const butil::EndPoint& endpoint);
6471

6572
std::vector<std::shared_ptr<PInternalService_RecoverableStub>> _stubs;
6673
int64_t _idx;
74+
EndpointCleanupTask* _cleanup_task = nullptr;
6775
};
6876

6977
SpinLock _lock;
70-
butil::FlatMap<butil::EndPoint, StubPool*> _stub_map;
78+
butil::FlatMap<butil::EndPoint, std::shared_ptr<StubPool>> _stub_map;
79+
pipeline::PipelineTimer* _pipeline_timer;
7180
};
7281

7382
class HttpBrpcStubCache {
@@ -98,4 +107,14 @@ class LakeServiceBrpcStubCache {
98107
butil::FlatMap<butil::EndPoint, std::shared_ptr<LakeService_RecoverableStub>> _stub_map;
99108
};
100109

110+
class EndpointCleanupTask : public starrocks::pipeline::PipelineTimerTask {
111+
public:
112+
EndpointCleanupTask(BrpcStubCache* cache, const butil::EndPoint& endpoint) : _cache(cache), _endpoint(endpoint){};
113+
void Run() override;
114+
115+
private:
116+
BrpcStubCache* _cache;
117+
butil::EndPoint _endpoint;
118+
};
119+
101120
} // namespace starrocks

be/test/http/stream_load_test.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ class StreamLoadActionTest : public testing::Test {
9191
config::streaming_load_max_mb = 1;
9292

9393
_env._load_stream_mgr = new LoadStreamMgr();
94-
_env._brpc_stub_cache = new BrpcStubCache();
94+
_env._brpc_stub_cache = new BrpcStubCache(&_env);
9595
_env._stream_load_executor = new StreamLoadExecutor(&_env);
9696

9797
_evhttp_req = evhttp_request_new(nullptr, nullptr);

be/test/http/transaction_stream_load_test.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ class TransactionStreamLoadActionTest : public testing::Test {
6868
config::streaming_load_max_mb = 1;
6969

7070
_env._load_stream_mgr = new LoadStreamMgr();
71-
_env._brpc_stub_cache = new BrpcStubCache();
71+
_env._brpc_stub_cache = new BrpcStubCache(&_env);
7272
_env._stream_load_executor = new StreamLoadExecutor(&_env);
7373
_env._stream_context_mgr = new StreamContextMgr();
7474
_env._transaction_mgr = new TransactionMgr(&_env);

be/test/util/brpc_stub_cache_test.cpp

Lines changed: 33 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,9 @@
1818
#include "util/brpc_stub_cache.h"
1919

2020
#include <gtest/gtest.h>
21+
#include <testutil/assert.h>
2122

23+
#include "runtime/exec_env.h"
2224
#include "util/failpoint/fail_point.h"
2325

2426
namespace starrocks {
@@ -27,10 +29,22 @@ class BrpcStubCacheTest : public testing::Test {
2729
public:
2830
BrpcStubCacheTest() = default;
2931
~BrpcStubCacheTest() override = default;
32+
void SetUp() override {
33+
_env._pipeline_timer = new pipeline::PipelineTimer();
34+
ASSERT_OK(_env._pipeline_timer->start());
35+
}
36+
void TearDown() override {
37+
delete _env._pipeline_timer;
38+
_env._pipeline_timer = nullptr;
39+
config::brpc_stub_expire_s = 3600;
40+
}
41+
42+
private:
43+
ExecEnv _env;
3044
};
3145

3246
TEST_F(BrpcStubCacheTest, normal) {
33-
BrpcStubCache cache;
47+
BrpcStubCache cache(&_env);
3448
TNetworkAddress address;
3549
address.hostname = "127.0.0.1";
3650
address.port = 123;
@@ -46,7 +60,7 @@ TEST_F(BrpcStubCacheTest, normal) {
4660
}
4761

4862
TEST_F(BrpcStubCacheTest, invalid) {
49-
BrpcStubCache cache;
63+
BrpcStubCache cache(&_env);
5064
TNetworkAddress address;
5165
address.hostname = "invalid.cm.invalid";
5266
address.port = 123;
@@ -55,7 +69,7 @@ TEST_F(BrpcStubCacheTest, invalid) {
5569
}
5670

5771
TEST_F(BrpcStubCacheTest, reset) {
58-
BrpcStubCache cache;
72+
BrpcStubCache cache(&_env);
5973
TNetworkAddress address;
6074
address.hostname = "127.0.0.1";
6175
address.port = 123;
@@ -108,4 +122,20 @@ TEST_F(BrpcStubCacheTest, test_http_stub) {
108122
ASSERT_EQ(nullptr, *stub4);
109123
}
110124

125+
// Checks that a cached stub is replaced once the configured expiry has elapsed
// without the endpoint being accessed.
TEST_F(BrpcStubCacheTest, test_cleanup) {
    // Use a one-second expiry so the cleanup task fires during the test.
    // TearDown() restores the value to 3600.
    config::brpc_stub_expire_s = 1;
    BrpcStubCache cache(&_env);

    TNetworkAddress address;
    address.hostname = "127.0.0.1";
    address.port = 123;

    // Two lookups inside the expiry window must return the same cached stub.
    auto first = cache.get_stub(address);
    ASSERT_NE(nullptr, first);
    auto second = cache.get_stub(address);
    ASSERT_EQ(second, first);

    // Wait past the expiry; the next lookup must yield a different stub.
    sleep(2);
    auto after_expiry = cache.get_stub(address);
    ASSERT_NE(after_expiry, first);
}
140+
111141
} // namespace starrocks

0 commit comments

Comments
 (0)