Skip to content

Commit 30c82c3

Browse files
committed
[Enhancement] Add a cleaner for BrpcStubCache to cleanup unused connections
Signed-off-by: duanyyyyyyy <yan.duan9759@gmail.com>
1 parent 251f476 commit 30c82c3

File tree

4 files changed

+171
-5
lines changed

4 files changed

+171
-5
lines changed

be/src/util/brpc_stub_cache.cpp

Lines changed: 97 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ std::shared_ptr<PInternalService_RecoverableStub> BrpcStubCache::get_stub(const
6565
timespec tm = butil::seconds_from_now(config::brpc_stub_expire_s);
6666
auto status = _pipeline_timer->schedule((*stub_pool)->_cleanup_task, tm);
6767
if (!status.ok()) {
68-
LOG(WARNING) << "Failed to schedule endpoint: " << endpoint;
68+
LOG(WARNING) << "Failed to schedule brpc cleanup task: " << endpoint;
6969
}
7070

7171
return (*stub_pool)->get_or_create(endpoint);
@@ -99,10 +99,7 @@ void BrpcStubCache::cleanup_expired(const butil::EndPoint& endpoint) {
9999
std::lock_guard<SpinLock> l(_lock);
100100

101101
LOG(INFO) << "cleanup stubs from endpoint:" << endpoint;
102-
auto pool = _stub_map.seek(endpoint);
103-
if (pool != nullptr) {
104-
_stub_map.erase(endpoint);
105-
}
102+
_stub_map.erase(endpoint);
106103
}
107104

108105
BrpcStubCache::StubPool::StubPool() : _idx(-1) {
@@ -137,6 +134,26 @@ HttpBrpcStubCache* HttpBrpcStubCache::getInstance() {
137134

138135
HttpBrpcStubCache::HttpBrpcStubCache() {
139136
_stub_map.init(500);
137+
_task_map.init(500);
138+
_pipeline_timer = ExecEnv::GetInstance()->pipeline_timer();
139+
}
140+
141+
HttpBrpcStubCache::~HttpBrpcStubCache() {
142+
std::vector<std::shared_ptr<HttpEndpointCleanupTask>> task_to_cleanup;
143+
144+
{
145+
std::lock_guard<SpinLock> l(_lock);
146+
for (auto& stub : _task_map) {
147+
task_to_cleanup.push_back(stub.second);
148+
}
149+
}
150+
151+
for (auto& task : task_to_cleanup) {
152+
task->unschedule(_pipeline_timer);
153+
}
154+
155+
_stub_map.clear();
156+
_task_map.clear();
140157
}
141158

142159
StatusOr<std::shared_ptr<PInternalService_RecoverableStub>> HttpBrpcStubCache::get_http_stub(
@@ -158,6 +175,21 @@ StatusOr<std::shared_ptr<PInternalService_RecoverableStub>> HttpBrpcStubCache::g
158175
}
159176
// get is exist
160177
std::lock_guard<SpinLock> l(_lock);
178+
179+
// schedule clean up task
180+
auto task = _task_map.seek(endpoint);
181+
if (task == nullptr) {
182+
auto new_task = std::make_shared<HttpEndpointCleanupTask>(this, endpoint);
183+
_task_map.insert(endpoint, new_task);
184+
task = _task_map.seek(endpoint);
185+
}
186+
_pipeline_timer->unschedule((*task).get());
187+
timespec tm = butil::seconds_from_now(config::brpc_stub_expire_s);
188+
auto status = _pipeline_timer->schedule((*task).get(), tm);
189+
if (!status.ok()) {
190+
LOG(WARNING) << "Failed to schedule http brpc cleanup task: " << endpoint;
191+
}
192+
161193
auto stub_ptr = _stub_map.seek(endpoint);
162194
if (stub_ptr != nullptr) {
163195
return *stub_ptr;
@@ -172,13 +204,41 @@ StatusOr<std::shared_ptr<PInternalService_RecoverableStub>> HttpBrpcStubCache::g
172204
return stub;
173205
}
174206

207+
void HttpBrpcStubCache::cleanup_expired(const butil::EndPoint& endpoint) {
208+
std::lock_guard<SpinLock> l(_lock);
209+
210+
LOG(INFO) << "cleanup http stubs from endpoint:" << endpoint;
211+
_stub_map.erase(endpoint);
212+
_task_map.erase(endpoint);
213+
}
214+
175215
LakeServiceBrpcStubCache* LakeServiceBrpcStubCache::getInstance() {
176216
static LakeServiceBrpcStubCache cache;
177217
return &cache;
178218
}
179219

180220
LakeServiceBrpcStubCache::LakeServiceBrpcStubCache() {
181221
_stub_map.init(500);
222+
_task_map.init(500);
223+
_pipeline_timer = ExecEnv::GetInstance()->pipeline_timer();
224+
}
225+
226+
LakeServiceBrpcStubCache::~LakeServiceBrpcStubCache() {
227+
std::vector<std::shared_ptr<LakeEndpointCleanupTask>> task_to_cleanup;
228+
229+
{
230+
std::lock_guard<SpinLock> l(_lock);
231+
for (auto& stub : _task_map) {
232+
task_to_cleanup.push_back(stub.second);
233+
}
234+
}
235+
236+
for (auto& task : task_to_cleanup) {
237+
task->unschedule(_pipeline_timer);
238+
}
239+
240+
_stub_map.clear();
241+
_task_map.clear();
182242
}
183243

184244
DEFINE_FAIL_POINT(get_stub_return_nullptr);
@@ -197,22 +257,54 @@ StatusOr<std::shared_ptr<starrocks::LakeService_RecoverableStub>> LakeServiceBrp
197257
}
198258
// get if exist
199259
std::lock_guard<SpinLock> l(_lock);
260+
261+
// schedule clean up task
262+
auto task = _task_map.seek(endpoint);
263+
if (task == nullptr) {
264+
auto new_task = std::make_shared<LakeEndpointCleanupTask>(this, endpoint);
265+
_task_map.insert(endpoint, new_task);
266+
task = _task_map.seek(endpoint);
267+
}
268+
_pipeline_timer->unschedule((*task).get());
269+
timespec tm = butil::seconds_from_now(config::brpc_stub_expire_s);
270+
auto status = _pipeline_timer->schedule((*task).get(), tm);
271+
if (!status.ok()) {
272+
LOG(WARNING) << "Failed to schedule lake brpc cleanup task: " << endpoint;
273+
}
274+
200275
auto stub_ptr = _stub_map.seek(endpoint);
201276
FAIL_POINT_TRIGGER_EXECUTE(get_stub_return_nullptr, { stub_ptr = nullptr; });
202277
if (stub_ptr != nullptr) {
203278
return *stub_ptr;
204279
}
205280
// create
206281
auto stub = std::make_shared<starrocks::LakeService_RecoverableStub>(endpoint, "");
282+
207283
if (!stub->reset_channel().ok()) {
208284
return Status::RuntimeError("init brpc http channel error on " + host + ":" + std::to_string(port));
209285
}
210286
_stub_map.insert(endpoint, stub);
211287
return stub;
212288
}
213289

290+
void LakeServiceBrpcStubCache::cleanup_expired(const butil::EndPoint& endpoint) {
291+
std::lock_guard<SpinLock> l(_lock);
292+
293+
LOG(INFO) << "cleanup lake service stubs from endpoint:" << endpoint;
294+
_stub_map.erase(endpoint);
295+
_task_map.erase(endpoint);
296+
}
297+
214298
void EndpointCleanupTask::Run() {
215299
_cache->cleanup_expired(_endpoint);
216300
}
217301

302+
void HttpEndpointCleanupTask::Run() {
303+
_cache->cleanup_expired(_endpoint);
304+
}
305+
306+
void LakeEndpointCleanupTask::Run() {
307+
_cache->cleanup_expired(_endpoint);
308+
}
309+
218310
} // namespace starrocks

be/src/util/brpc_stub_cache.h

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,8 @@ namespace starrocks {
5252

5353
class ExecEnv;
5454
class EndpointCleanupTask;
55+
class HttpEndpointCleanupTask;
56+
class LakeEndpointCleanupTask;
5557

5658
class BrpcStubCache {
5759
public:
@@ -83,28 +85,36 @@ class HttpBrpcStubCache {
8385
public:
8486
static HttpBrpcStubCache* getInstance();
8587
StatusOr<std::shared_ptr<PInternalService_RecoverableStub>> get_http_stub(const TNetworkAddress& taddr);
88+
void cleanup_expired(const butil::EndPoint& endpoint);
8689

8790
private:
8891
HttpBrpcStubCache();
8992
HttpBrpcStubCache(const HttpBrpcStubCache&) = delete;
9093
HttpBrpcStubCache& operator=(const HttpBrpcStubCache&) = delete;
94+
~HttpBrpcStubCache();
9195

9296
SpinLock _lock;
9397
butil::FlatMap<butil::EndPoint, std::shared_ptr<PInternalService_RecoverableStub>> _stub_map;
98+
butil::FlatMap<butil::EndPoint, std::shared_ptr<HttpEndpointCleanupTask>> _task_map;
99+
pipeline::PipelineTimer* _pipeline_timer;
94100
};
95101

96102
class LakeServiceBrpcStubCache {
97103
public:
98104
static LakeServiceBrpcStubCache* getInstance();
99105
StatusOr<std::shared_ptr<starrocks::LakeService_RecoverableStub>> get_stub(const std::string& host, int port);
106+
void cleanup_expired(const butil::EndPoint& endpoint);
100107

101108
private:
102109
LakeServiceBrpcStubCache();
103110
LakeServiceBrpcStubCache(const LakeServiceBrpcStubCache&) = delete;
104111
LakeServiceBrpcStubCache& operator=(const LakeServiceBrpcStubCache&) = delete;
112+
~LakeServiceBrpcStubCache();
105113

106114
SpinLock _lock;
107115
butil::FlatMap<butil::EndPoint, std::shared_ptr<LakeService_RecoverableStub>> _stub_map;
116+
butil::FlatMap<butil::EndPoint, std::shared_ptr<LakeEndpointCleanupTask>> _task_map;
117+
pipeline::PipelineTimer* _pipeline_timer;
108118
};
109119

110120
class EndpointCleanupTask : public starrocks::pipeline::PipelineTimerTask {
@@ -117,4 +127,26 @@ class EndpointCleanupTask : public starrocks::pipeline::PipelineTimerTask {
117127
butil::EndPoint _endpoint;
118128
};
119129

130+
class HttpEndpointCleanupTask : public starrocks::pipeline::PipelineTimerTask {
131+
public:
132+
HttpEndpointCleanupTask(HttpBrpcStubCache* cache, const butil::EndPoint& endpoint)
133+
: _cache(cache), _endpoint(endpoint){};
134+
void Run() override;
135+
136+
private:
137+
HttpBrpcStubCache* _cache;
138+
butil::EndPoint _endpoint;
139+
};
140+
141+
class LakeEndpointCleanupTask : public starrocks::pipeline::PipelineTimerTask {
142+
public:
143+
LakeEndpointCleanupTask(LakeServiceBrpcStubCache* cache, const butil::EndPoint& endpoint)
144+
: _cache(cache), _endpoint(endpoint){};
145+
void Run() override;
146+
147+
private:
148+
LakeServiceBrpcStubCache* _cache;
149+
butil::EndPoint _endpoint;
150+
};
151+
120152
} // namespace starrocks

be/test/util/brpc_stub_cache_test.cpp

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -138,4 +138,37 @@ TEST_F(BrpcStubCacheTest, test_cleanup) {
138138
ASSERT_NE(stub3, stub1);
139139
}
140140

141+
TEST_F(BrpcStubCacheTest, test_lake_cleanup) {
142+
config::brpc_stub_expire_s = 1;
143+
LakeServiceBrpcStubCache cache;
144+
std::string hostname = "127.0.0.1";
145+
int32_t port = 123;
146+
auto stub1 = cache.get_stub(hostname, port);
147+
ASSERT_TRUE(stub1.ok());
148+
ASSERT_NE(nullptr, *stub1);
149+
auto stub2 = cache.get_stub(hostname, port);
150+
ASSERT_TRUE(stub1.ok());
151+
ASSERT_EQ(*stub2, *stub1);
152+
153+
sleep(2);
154+
auto stub3 = cache.get_stub(hostname, port);
155+
ASSERT_NE(*stub3, *stub1);
156+
}
157+
158+
TEST_F(BrpcStubCacheTest, test_http_cleanup) {
159+
config::brpc_stub_expire_s = 1;
160+
HttpBrpcStubCache cache;
161+
TNetworkAddress address;
162+
address.hostname = "127.0.0.1";
163+
address.port = 123;
164+
auto stub1 = cache.get_http_stub(address);
165+
ASSERT_NE(nullptr, *stub1);
166+
auto stub2 = cache.get_http_stub(address);
167+
ASSERT_EQ(*stub2, *stub1);
168+
169+
sleep(2);
170+
auto stub3 = cache.get_http_stub(address);
171+
ASSERT_NE(*stub3, *stub1);
172+
}
173+
141174
} // namespace starrocks

docs/en/administration/management/BE_configuration.md

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -256,6 +256,15 @@ curl http://<BE_IP>:<BE_HTTP_PORT>/varz
256256
- Description: The maximum body size of a bRPC.
257257
- Introduced in: -
258258

259+
##### brpc_stub_expire_s
260+
261+
- Default: 3600
262+
- Type: Int
263+
- Unit: Seconds
264+
- Is mutable: Yes
265+
- Description: The expire time of BRPC stub cache, default 60 minutes.
266+
- Introduced in: -
267+
259268
<!--
260269
##### brpc_socket_max_unwritten_bytes
261270

0 commit comments

Comments
 (0)