Skip to content

Commit d2fc1f6

Browse files
committed
[Enhancement] Add a cleaner for BrpcStubCache to clean up unused connections
Signed-off-by: duanyyyyyyy <yan.duan9759@gmail.com>
1 parent 3269ae6 commit d2fc1f6

File tree

2 files changed: 40 additions (+40) and 55 deletions (-55).

be/src/util/brpc_stub_cache.cpp

Lines changed: 34 additions & 51 deletions
Original file line number | Diff line number | Diff line change
@@ -134,7 +134,6 @@ HttpBrpcStubCache* HttpBrpcStubCache::getInstance() {
134134

135135
HttpBrpcStubCache::HttpBrpcStubCache() {
136136
_stub_map.init(500);
137-
_task_map.init(500);
138137
_pipeline_timer = ExecEnv::GetInstance()->pipeline_timer();
139138
}
140139

@@ -143,8 +142,8 @@ HttpBrpcStubCache::~HttpBrpcStubCache() {
143142

144143
{
145144
std::lock_guard<SpinLock> l(_lock);
146-
for (auto& stub : _task_map) {
147-
task_to_cleanup.push_back(stub.second);
145+
for (auto& stub : _stub_map) {
146+
task_to_cleanup.push_back(stub.second.second);
148147
}
149148
}
150149

@@ -153,7 +152,6 @@ HttpBrpcStubCache::~HttpBrpcStubCache() {
153152
}
154153

155154
_stub_map.clear();
156-
_task_map.clear();
157155
}
158156

159157
StatusOr<std::shared_ptr<PInternalService_RecoverableStub>> HttpBrpcStubCache::get_http_stub(
@@ -176,40 +174,34 @@ StatusOr<std::shared_ptr<PInternalService_RecoverableStub>> HttpBrpcStubCache::g
176174
// get if exist
177175
std::lock_guard<SpinLock> l(_lock);
178176

179-
// schedule clean up task
180-
auto task = _task_map.seek(endpoint);
181-
if (task == nullptr) {
177+
auto stub_pair_ptr = _stub_map.seek(endpoint);
178+
if (stub_pair_ptr == nullptr) {
179+
// create
182180
auto new_task = std::make_shared<HttpEndpointCleanupTask>(this, endpoint);
183-
_task_map.insert(endpoint, new_task);
184-
task = _task_map.seek(endpoint);
181+
auto stub = std::make_shared<PInternalService_RecoverableStub>(endpoint, "http");
182+
if (!stub->reset_channel().ok()) {
183+
return Status::RuntimeError("init brpc http channel error on " + taddr.hostname + ":" +
184+
std::to_string(taddr.port));
185+
}
186+
_stub_map.insert(endpoint, std::make_pair(stub, new_task));
187+
stub_pair_ptr = _stub_map.seek(endpoint);
185188
}
186-
_pipeline_timer->unschedule((*task).get());
189+
190+
// schedule clean up task
191+
_pipeline_timer->unschedule((*stub_pair_ptr).second.get());
187192
timespec tm = butil::seconds_from_now(config::brpc_stub_expire_s);
188-
auto status = _pipeline_timer->schedule((*task).get(), tm);
193+
auto status = _pipeline_timer->schedule((*stub_pair_ptr).second.get(), tm);
189194
if (!status.ok()) {
190195
LOG(WARNING) << "Failed to schedule http brpc cleanup task: " << endpoint;
191196
}
192-
193-
auto stub_ptr = _stub_map.seek(endpoint);
194-
if (stub_ptr != nullptr) {
195-
return *stub_ptr;
196-
}
197-
// create
198-
auto stub = std::make_shared<PInternalService_RecoverableStub>(endpoint, "http");
199-
if (!stub->reset_channel().ok()) {
200-
return Status::RuntimeError("init brpc http channel error on " + taddr.hostname + ":" +
201-
std::to_string(taddr.port));
202-
}
203-
_stub_map.insert(endpoint, stub);
204-
return stub;
197+
return (*stub_pair_ptr).first;
205198
}
206199

207200
void HttpBrpcStubCache::cleanup_expired(const butil::EndPoint& endpoint) {
208201
std::lock_guard<SpinLock> l(_lock);
209202

210203
LOG(INFO) << "cleanup http stubs from endpoint:" << endpoint;
211204
_stub_map.erase(endpoint);
212-
_task_map.erase(endpoint);
213205
}
214206

215207
LakeServiceBrpcStubCache* LakeServiceBrpcStubCache::getInstance() {
@@ -219,7 +211,6 @@ LakeServiceBrpcStubCache* LakeServiceBrpcStubCache::getInstance() {
219211

220212
LakeServiceBrpcStubCache::LakeServiceBrpcStubCache() {
221213
_stub_map.init(500);
222-
_task_map.init(500);
223214
_pipeline_timer = ExecEnv::GetInstance()->pipeline_timer();
224215
}
225216

@@ -228,8 +219,8 @@ LakeServiceBrpcStubCache::~LakeServiceBrpcStubCache() {
228219

229220
{
230221
std::lock_guard<SpinLock> l(_lock);
231-
for (auto& stub : _task_map) {
232-
task_to_cleanup.push_back(stub.second);
222+
for (auto& stub : _stub_map) {
223+
task_to_cleanup.push_back(stub.second.second);
233224
}
234225
}
235226

@@ -238,7 +229,6 @@ LakeServiceBrpcStubCache::~LakeServiceBrpcStubCache() {
238229
}
239230

240231
_stub_map.clear();
241-
_task_map.clear();
242232
}
243233

244234
DEFINE_FAIL_POINT(get_stub_return_nullptr);
@@ -258,41 +248,34 @@ StatusOr<std::shared_ptr<starrocks::LakeService_RecoverableStub>> LakeServiceBrp
258248
// get if exist
259249
std::lock_guard<SpinLock> l(_lock);
260250

261-
// schedule clean up task
262-
auto task = _task_map.seek(endpoint);
263-
if (task == nullptr) {
251+
auto stub_pair_ptr = _stub_map.seek(endpoint);
252+
FAIL_POINT_TRIGGER_EXECUTE(get_stub_return_nullptr, { stub_pair_ptr = nullptr; });
253+
if (stub_pair_ptr == nullptr) {
254+
// create
255+
auto stub = std::make_shared<starrocks::LakeService_RecoverableStub>(endpoint, "");
264256
auto new_task = std::make_shared<LakeEndpointCleanupTask>(this, endpoint);
265-
_task_map.insert(endpoint, new_task);
266-
task = _task_map.seek(endpoint);
257+
if (!stub->reset_channel().ok()) {
258+
return Status::RuntimeError("init brpc lake channel error on " + host + ":" + std::to_string(port));
259+
}
260+
_stub_map.insert(endpoint, std::make_pair(stub, new_task));
261+
stub_pair_ptr = _stub_map.seek(endpoint);
267262
}
268-
_pipeline_timer->unschedule((*task).get());
263+
264+
// schedule clean up task
265+
_pipeline_timer->unschedule((*stub_pair_ptr).second.get());
269266
timespec tm = butil::seconds_from_now(config::brpc_stub_expire_s);
270-
auto status = _pipeline_timer->schedule((*task).get(), tm);
267+
auto status = _pipeline_timer->schedule((*stub_pair_ptr).second.get(), tm);
271268
if (!status.ok()) {
272269
LOG(WARNING) << "Failed to schedule lake brpc cleanup task: " << endpoint;
273270
}
274-
275-
auto stub_ptr = _stub_map.seek(endpoint);
276-
FAIL_POINT_TRIGGER_EXECUTE(get_stub_return_nullptr, { stub_ptr = nullptr; });
277-
if (stub_ptr != nullptr) {
278-
return *stub_ptr;
279-
}
280-
// create
281-
auto stub = std::make_shared<starrocks::LakeService_RecoverableStub>(endpoint, "");
282-
283-
if (!stub->reset_channel().ok()) {
284-
return Status::RuntimeError("init brpc http channel error on " + host + ":" + std::to_string(port));
285-
}
286-
_stub_map.insert(endpoint, stub);
287-
return stub;
271+
return (*stub_pair_ptr).first;
288272
}
289273

290274
void LakeServiceBrpcStubCache::cleanup_expired(const butil::EndPoint& endpoint) {
291275
std::lock_guard<SpinLock> l(_lock);
292276

293277
LOG(INFO) << "cleanup lake service stubs from endpoint:" << endpoint;
294278
_stub_map.erase(endpoint);
295-
_task_map.erase(endpoint);
296279
}
297280

298281
void EndpointCleanupTask::Run() {

be/src/util/brpc_stub_cache.h

Lines changed: 6 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -94,8 +94,9 @@ class HttpBrpcStubCache {
9494
~HttpBrpcStubCache();
9595

9696
SpinLock _lock;
97-
butil::FlatMap<butil::EndPoint, std::shared_ptr<PInternalService_RecoverableStub>> _stub_map;
98-
butil::FlatMap<butil::EndPoint, std::shared_ptr<HttpEndpointCleanupTask>> _task_map;
97+
butil::FlatMap<butil::EndPoint, std::pair<std::shared_ptr<PInternalService_RecoverableStub>,
98+
std::shared_ptr<HttpEndpointCleanupTask>>>
99+
_stub_map;
99100
pipeline::PipelineTimer* _pipeline_timer;
100101
};
101102

@@ -112,8 +113,9 @@ class LakeServiceBrpcStubCache {
112113
~LakeServiceBrpcStubCache();
113114

114115
SpinLock _lock;
115-
butil::FlatMap<butil::EndPoint, std::shared_ptr<LakeService_RecoverableStub>> _stub_map;
116-
butil::FlatMap<butil::EndPoint, std::shared_ptr<LakeEndpointCleanupTask>> _task_map;
116+
butil::FlatMap<butil::EndPoint,
117+
std::pair<std::shared_ptr<LakeService_RecoverableStub>, std::shared_ptr<LakeEndpointCleanupTask>>>
118+
_stub_map;
117119
pipeline::PipelineTimer* _pipeline_timer;
118120
};
119121

0 commit comments

Comments
 (0)