Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@ CONF_String(ssl_private_key_path, "");
// The max number of single connections maintained by the brpc client and each server.
// These connections are created during the first few access and will be used thereafter
CONF_Int32(brpc_max_connections_per_server, "1");
// BRPC stub cache expire configurations
// The expire time of BRPC stub cache, default 60 minutes.
CONF_mInt32(brpc_stub_expire_s, "3600"); // 60 minutes

// Declare a selection strategy for those servers have many ips.
// Note that there should at most one ip match this list.
Expand Down
2 changes: 1 addition & 1 deletion be/src/runtime/exec_env.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -498,7 +498,7 @@ Status ExecEnv::init(const std::vector<StorePath>& store_paths, bool as_cn) {
#endif
_load_channel_mgr = new LoadChannelMgr();
_load_stream_mgr = new LoadStreamMgr();
_brpc_stub_cache = new BrpcStubCache();
_brpc_stub_cache = new BrpcStubCache(this);
_stream_load_executor = new StreamLoadExecutor(this);
_stream_context_mgr = new StreamContextMgr();
_transaction_mgr = new TransactionMgr(this);
Expand Down
178 changes: 150 additions & 28 deletions be/src/util/brpc_stub_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,33 +17,57 @@
#include "common/config.h"
#include "gen_cpp/internal_service.pb.h"
#include "gen_cpp/lake_service.pb.h"
#include "runtime/exec_env.h"
#include "util/failpoint/fail_point.h"
#include "util/starrocks_metrics.h"

namespace starrocks {

BrpcStubCache::BrpcStubCache() {
BrpcStubCache::BrpcStubCache(ExecEnv* exec_env) : _pipeline_timer(exec_env->pipeline_timer()) {
_stub_map.init(239);
_pipeline_timer = exec_env->pipeline_timer();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

_pipeline_timer is initialized in the init list, this line is redundant.

REGISTER_GAUGE_STARROCKS_METRIC(brpc_endpoint_stub_count, [this]() {
std::lock_guard<SpinLock> l(_lock);
return _stub_map.size();
});
}

BrpcStubCache::~BrpcStubCache() {
for (auto& stub : _stub_map) {
delete stub.second;
std::vector<std::shared_ptr<StubPool>> pools_to_cleanup;
{
std::lock_guard<SpinLock> l(_lock);

for (auto& stub : _stub_map) {
pools_to_cleanup.push_back(stub.second);
}
}

for (auto& pool : pools_to_cleanup) {
pool->_cleanup_task->unschedule(_pipeline_timer);
}

_stub_map.clear();
}

std::shared_ptr<PInternalService_RecoverableStub> BrpcStubCache::get_stub(const butil::EndPoint& endpoint) {
std::lock_guard<SpinLock> l(_lock);

auto stub_pool = _stub_map.seek(endpoint);
if (stub_pool == nullptr) {
StubPool* pool = new StubPool();
_stub_map.insert(endpoint, pool);
return pool->get_or_create(endpoint);
auto new_pool = std::make_shared<StubPool>();
new_pool->_cleanup_task = new EndpointCleanupTask(this, endpoint);
_stub_map.insert(endpoint, new_pool);
stub_pool = _stub_map.seek(endpoint);
}

if (_pipeline_timer->unschedule((*stub_pool)->_cleanup_task) != 1) {
timespec tm = butil::seconds_from_now(config::brpc_stub_expire_s);
auto status = _pipeline_timer->schedule((*stub_pool)->_cleanup_task, tm);
if (!status.ok()) {
LOG(WARNING) << "Failed to schedule brpc cleanup task: " << endpoint;
}
}

return (*stub_pool)->get_or_create(endpoint);
}

Expand Down Expand Up @@ -71,10 +95,22 @@ std::shared_ptr<PInternalService_RecoverableStub> BrpcStubCache::get_stub(const
return get_stub(endpoint);
}

void BrpcStubCache::cleanup_expired(const butil::EndPoint& endpoint) {
std::lock_guard<SpinLock> l(_lock);

LOG(INFO) << "cleanup stubs from endpoint:" << endpoint;
_stub_map.erase(endpoint);
}

BrpcStubCache::StubPool::StubPool() : _idx(-1) {
_stubs.reserve(config::brpc_max_connections_per_server);
}

BrpcStubCache::StubPool::~StubPool() {
_stubs.clear();
SAFE_DELETE(_cleanup_task);
}

std::shared_ptr<PInternalService_RecoverableStub> BrpcStubCache::StubPool::get_or_create(
const butil::EndPoint& endpoint) {
if (UNLIKELY(_stubs.size() < config::brpc_max_connections_per_server)) {
Expand All @@ -98,6 +134,24 @@ HttpBrpcStubCache* HttpBrpcStubCache::getInstance() {

HttpBrpcStubCache::HttpBrpcStubCache() {
_stub_map.init(500);
_pipeline_timer = ExecEnv::GetInstance()->pipeline_timer();
}

HttpBrpcStubCache::~HttpBrpcStubCache() {
std::vector<std::shared_ptr<HttpEndpointCleanupTask>> task_to_cleanup;

{
std::lock_guard<SpinLock> l(_lock);
for (auto& stub : _stub_map) {
task_to_cleanup.push_back(stub.second.second);
}
}

for (auto& task : task_to_cleanup) {
task->unschedule(_pipeline_timer);
}

_stub_map.clear();
}

StatusOr<std::shared_ptr<PInternalService_RecoverableStub>> HttpBrpcStubCache::get_http_stub(
Expand All @@ -119,18 +173,37 @@ StatusOr<std::shared_ptr<PInternalService_RecoverableStub>> HttpBrpcStubCache::g
}
// get is exist
std::lock_guard<SpinLock> l(_lock);
auto stub_ptr = _stub_map.seek(endpoint);
if (stub_ptr != nullptr) {
return *stub_ptr;

auto stub_pair_ptr = _stub_map.seek(endpoint);
if (stub_pair_ptr == nullptr) {
// create
auto new_task = std::make_shared<HttpEndpointCleanupTask>(this, endpoint);
auto stub = std::make_shared<PInternalService_RecoverableStub>(endpoint, "http");
if (!stub->reset_channel().ok()) {
return Status::RuntimeError("init brpc http channel error on " + taddr.hostname + ":" +
std::to_string(taddr.port));
}
_stub_map.insert(endpoint, std::make_pair(stub, new_task));
stub_pair_ptr = _stub_map.seek(endpoint);
}
// create
auto stub = std::make_shared<PInternalService_RecoverableStub>(endpoint, "http");
if (!stub->reset_channel().ok()) {
return Status::RuntimeError("init brpc http channel error on " + taddr.hostname + ":" +
std::to_string(taddr.port));

// schedule clean up task
if (_pipeline_timer->unschedule((*stub_pair_ptr).second.get()) != 1) {
timespec tm = butil::seconds_from_now(config::brpc_stub_expire_s);
auto status = _pipeline_timer->schedule((*stub_pair_ptr).second.get(), tm);
if (!status.ok()) {
LOG(WARNING) << "Failed to schedule http brpc cleanup task: " << endpoint;
}
}
_stub_map.insert(endpoint, stub);
return stub;

return (*stub_pair_ptr).first;
}

void HttpBrpcStubCache::cleanup_expired(const butil::EndPoint& endpoint) {
std::lock_guard<SpinLock> l(_lock);

LOG(INFO) << "cleanup http stubs from endpoint:" << endpoint;
_stub_map.erase(endpoint);
}

LakeServiceBrpcStubCache* LakeServiceBrpcStubCache::getInstance() {
Expand All @@ -140,6 +213,24 @@ LakeServiceBrpcStubCache* LakeServiceBrpcStubCache::getInstance() {

LakeServiceBrpcStubCache::LakeServiceBrpcStubCache() {
_stub_map.init(500);
_pipeline_timer = ExecEnv::GetInstance()->pipeline_timer();
}

LakeServiceBrpcStubCache::~LakeServiceBrpcStubCache() {
std::vector<std::shared_ptr<LakeEndpointCleanupTask>> task_to_cleanup;

{
std::lock_guard<SpinLock> l(_lock);
for (auto& stub : _stub_map) {
task_to_cleanup.push_back(stub.second.second);
}
}

for (auto& task : task_to_cleanup) {
task->unschedule(_pipeline_timer);
}

_stub_map.clear();
}

DEFINE_FAIL_POINT(get_stub_return_nullptr);
Expand All @@ -158,18 +249,49 @@ StatusOr<std::shared_ptr<starrocks::LakeService_RecoverableStub>> LakeServiceBrp
}
// get if exist
std::lock_guard<SpinLock> l(_lock);
auto stub_ptr = _stub_map.seek(endpoint);
FAIL_POINT_TRIGGER_EXECUTE(get_stub_return_nullptr, { stub_ptr = nullptr; });
if (stub_ptr != nullptr) {
return *stub_ptr;
}
// create
auto stub = std::make_shared<starrocks::LakeService_RecoverableStub>(endpoint, "");
if (!stub->reset_channel().ok()) {
return Status::RuntimeError("init brpc http channel error on " + host + ":" + std::to_string(port));
}
_stub_map.insert(endpoint, stub);
return stub;

auto stub_pair_ptr = _stub_map.seek(endpoint);
FAIL_POINT_TRIGGER_EXECUTE(get_stub_return_nullptr, { stub_pair_ptr = nullptr; });
if (stub_pair_ptr == nullptr) {
// create
auto stub = std::make_shared<starrocks::LakeService_RecoverableStub>(endpoint, "");
auto new_task = std::make_shared<LakeEndpointCleanupTask>(this, endpoint);
if (!stub->reset_channel().ok()) {
return Status::RuntimeError("init brpc lake channel error on " + host + ":" + std::to_string(port));
}
_stub_map.insert(endpoint, std::make_pair(stub, new_task));
stub_pair_ptr = _stub_map.seek(endpoint);
}

// schedule clean up task
if (_pipeline_timer->unschedule((*stub_pair_ptr).second.get()) != 1) {
timespec tm = butil::seconds_from_now(config::brpc_stub_expire_s);
auto status = _pipeline_timer->schedule((*stub_pair_ptr).second.get(), tm);
if (!status.ok()) {
LOG(WARNING) << "Failed to schedule lake brpc cleanup task: " << endpoint;
}
}

return (*stub_pair_ptr).first;
}

void LakeServiceBrpcStubCache::cleanup_expired(const butil::EndPoint& endpoint) {
std::lock_guard<SpinLock> l(_lock);

LOG(INFO) << "cleanup lake service stubs from endpoint:" << endpoint;
_stub_map.erase(endpoint);
}

void EndpointCleanupTask::Run() {
_cache->cleanup_expired(_endpoint);
}

void HttpEndpointCleanupTask::Run() {
_cache->cleanup_expired(_endpoint);
}

void LakeEndpointCleanupTask::Run() {
_cache->cleanup_expired(_endpoint);
}

} // namespace starrocks
60 changes: 56 additions & 4 deletions be/src/util/brpc_stub_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
#include <vector>

#include "common/statusor.h"
#include "exec/pipeline/schedule/pipeline_timer.h"
#include "gen_cpp/Types_types.h" // TNetworkAddress
#include "service/brpc.h"
#include "util/internal_service_recoverable_stub.h"
Expand All @@ -48,54 +49,105 @@

namespace starrocks {

class ExecEnv;
class EndpointCleanupTask;
class HttpEndpointCleanupTask;
class LakeEndpointCleanupTask;

class BrpcStubCache {
public:
BrpcStubCache();
BrpcStubCache(ExecEnv* exec_env);
~BrpcStubCache();

std::shared_ptr<PInternalService_RecoverableStub> get_stub(const butil::EndPoint& endpoint);
std::shared_ptr<PInternalService_RecoverableStub> get_stub(const TNetworkAddress& taddr);
std::shared_ptr<PInternalService_RecoverableStub> get_stub(const std::string& host, int port);
void cleanup_expired(const butil::EndPoint& endpoint);

private:
struct StubPool {
StubPool();
~StubPool();
std::shared_ptr<PInternalService_RecoverableStub> get_or_create(const butil::EndPoint& endpoint);

std::vector<std::shared_ptr<PInternalService_RecoverableStub>> _stubs;
int64_t _idx;
EndpointCleanupTask* _cleanup_task = nullptr;
};

SpinLock _lock;
butil::FlatMap<butil::EndPoint, StubPool*> _stub_map;
butil::FlatMap<butil::EndPoint, std::shared_ptr<StubPool>> _stub_map;
pipeline::PipelineTimer* _pipeline_timer;
};

class HttpBrpcStubCache {
public:
static HttpBrpcStubCache* getInstance();
StatusOr<std::shared_ptr<PInternalService_RecoverableStub>> get_http_stub(const TNetworkAddress& taddr);
void cleanup_expired(const butil::EndPoint& endpoint);

private:
HttpBrpcStubCache();
HttpBrpcStubCache(const HttpBrpcStubCache&) = delete;
HttpBrpcStubCache& operator=(const HttpBrpcStubCache&) = delete;
~HttpBrpcStubCache();

SpinLock _lock;
butil::FlatMap<butil::EndPoint, std::shared_ptr<PInternalService_RecoverableStub>> _stub_map;
butil::FlatMap<butil::EndPoint, std::pair<std::shared_ptr<PInternalService_RecoverableStub>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Every endpoint is registered with a TimerTask, not sure if this is feasible for a cluster with large scale of nodes, e.g. hundreds of backends/compute nodes, will this be carefully evaluated?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think can.
From the documentation we can see that hundreds of task can run without waiting for lock.
https://github.com/apache/brpc/blob/master/docs/cn/timer_keeping.md

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@stdpain Any suggestions master?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can use it with confidence. They will be stored in the heap after being bucketed by the timer. So there's no need to worry about memory or other overhead.

std::shared_ptr<HttpEndpointCleanupTask>>>
_stub_map;
pipeline::PipelineTimer* _pipeline_timer;
};

class LakeServiceBrpcStubCache {
public:
static LakeServiceBrpcStubCache* getInstance();
StatusOr<std::shared_ptr<starrocks::LakeService_RecoverableStub>> get_stub(const std::string& host, int port);
void cleanup_expired(const butil::EndPoint& endpoint);

private:
LakeServiceBrpcStubCache();
LakeServiceBrpcStubCache(const LakeServiceBrpcStubCache&) = delete;
LakeServiceBrpcStubCache& operator=(const LakeServiceBrpcStubCache&) = delete;
~LakeServiceBrpcStubCache();

SpinLock _lock;
butil::FlatMap<butil::EndPoint, std::shared_ptr<LakeService_RecoverableStub>> _stub_map;
butil::FlatMap<butil::EndPoint,
std::pair<std::shared_ptr<LakeService_RecoverableStub>, std::shared_ptr<LakeEndpointCleanupTask>>>
_stub_map;
pipeline::PipelineTimer* _pipeline_timer;
};

class EndpointCleanupTask : public starrocks::pipeline::PipelineTimerTask {
public:
EndpointCleanupTask(BrpcStubCache* cache, const butil::EndPoint& endpoint) : _cache(cache), _endpoint(endpoint){};
void Run() override;

private:
BrpcStubCache* _cache;
butil::EndPoint _endpoint;
};

class HttpEndpointCleanupTask : public starrocks::pipeline::PipelineTimerTask {
public:
HttpEndpointCleanupTask(HttpBrpcStubCache* cache, const butil::EndPoint& endpoint)
: _cache(cache), _endpoint(endpoint){};
void Run() override;

private:
HttpBrpcStubCache* _cache;
butil::EndPoint _endpoint;
};

class LakeEndpointCleanupTask : public starrocks::pipeline::PipelineTimerTask {
public:
LakeEndpointCleanupTask(LakeServiceBrpcStubCache* cache, const butil::EndPoint& endpoint)
: _cache(cache), _endpoint(endpoint){};
void Run() override;

private:
LakeServiceBrpcStubCache* _cache;
butil::EndPoint _endpoint;
};

} // namespace starrocks
2 changes: 1 addition & 1 deletion be/test/http/stream_load_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ class StreamLoadActionTest : public testing::Test {
config::streaming_load_max_mb = 1;

_env._load_stream_mgr = new LoadStreamMgr();
_env._brpc_stub_cache = new BrpcStubCache();
_env._brpc_stub_cache = new BrpcStubCache(&_env);
_env._stream_load_executor = new StreamLoadExecutor(&_env);

_evhttp_req = evhttp_request_new(nullptr, nullptr);
Expand Down
Loading
Loading