Skip to content

Commit 2513d9b

Browse files
authored
Support mixing tracking peers SM mode (#616)
* Support mixing tracking peers SM mode * Fix
1 parent db1bcf9 commit 2513d9b

File tree

3 files changed

+31
-8
lines changed

3 files changed

+31
-8
lines changed

src/handle_append_entries.cxx

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1227,6 +1227,7 @@ void raft_server::handle_append_entries_resp(resp_msg& resp) {
12271227
p->set_last_accepted_log_idx(new_matched_idx);
12281228
}
12291229

1230+
bool sm_committed_idx_updated = false;
12301231
if (resp.get_ctx() &&
12311232
ctx_->get_params()->track_peers_sm_commit_idx_) {
12321233
// If the response contains appendix, it should be
@@ -1239,6 +1240,7 @@ void raft_server::handle_append_entries_resp(resp_msg& resp) {
12391240
p_tr("sm committed index of peer %d: %" PRIu64 " -> %" PRIu64,
12401241
p->get_id(), prev_sm_committed_idx, new_sm_committed_idx);
12411242
p->set_sm_committed_idx(new_sm_committed_idx);
1243+
sm_committed_idx_updated = true;
12421244
}
12431245
if (check_sm_commit_notify_ready(new_sm_committed_idx)) {
12441246
uint64_t target_idx =
@@ -1256,6 +1258,9 @@ void raft_server::handle_append_entries_resp(resp_msg& resp) {
12561258
}
12571259
}
12581260
}
1261+
if (!sm_committed_idx_updated) {
1262+
p->set_sm_committed_idx(0);
1263+
}
12591264

12601265
cb_func::Param param(id_, leader_, p->get_id());
12611266
param.ctx = &new_matched_idx;

src/handle_commit.cxx

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -41,12 +41,13 @@ namespace nuraft {
4141

4242
void raft_server::commit(ulong target_idx) {
4343
bool track_peers_sm_commit_idx = ctx_->get_params()->track_peers_sm_commit_idx_;
44+
bool same_target_idx = (target_idx == quick_commit_index_);
4445
if (target_idx > quick_commit_index_ ||
45-
(track_peers_sm_commit_idx &&
46-
target_idx >= quick_commit_index_)) {
46+
(track_peers_sm_commit_idx && same_target_idx)) {
47+
p_db( "trigger commit upto %" PRIu64 ", current quick commit index %" PRIu64,
48+
target_idx, quick_commit_index_.load() );
4749
quick_commit_index_ = target_idx;
4850
lagging_sm_target_index_ = target_idx;
49-
p_db( "trigger commit upto %" PRIu64 "", quick_commit_index_.load() );
5051

5152
// if this is a leader notify peers to commit as well
5253
// for peers that are free, send the request, otherwise,
@@ -60,6 +61,11 @@ void raft_server::commit(ulong target_idx) {
6061
// upto the target index. No need to send AE.
6162
continue;
6263
}
64+
if (track_peers_sm_commit_idx &&
65+
same_target_idx &&
66+
pp->get_sm_committed_idx() == 0) {
67+
continue;
68+
}
6369
if (!request_append_entries(pp)) {
6470
pp->set_pending_commit();
6571
}
@@ -316,7 +322,8 @@ bool raft_server::commit_in_bg_exec(size_t timeout_ms) {
316322
}
317323
}
318324

319-
if (check_sm_commit_notify_ready(index_to_commit)) {
325+
if (ctx_->get_params()->track_peers_sm_commit_idx_ &&
326+
check_sm_commit_notify_ready(index_to_commit)) {
320327
uint64_t target_idx = update_sm_commit_notifier_target_idx(index_to_commit);
321328
p_tr("sm commit notify ready: %" PRIu64 ", target: %" PRIu64,
322329
index_to_commit, target_idx);
@@ -510,9 +517,13 @@ void raft_server::commit_conf(ulong idx_to_commit,
510517
}
511518

512519
void raft_server::scan_sm_commit_and_notify(uint64_t idx_upto) {
520+
auto params = ctx_->get_params();
521+
if (params->track_peers_sm_commit_idx_ == false) {
522+
return;
523+
}
524+
513525
p_tr("sm commit notifier scan start, upto %" PRIu64, idx_upto);
514526

515-
auto params = ctx_->get_params();
516527
std::list< ptr<commit_ret_elem> > async_elems;
517528

518529
std::unique_lock<std::mutex> cre_lock(commit_ret_elems_lock_);
@@ -572,8 +583,12 @@ bool raft_server::check_sm_commit_notify_ready(uint64_t idx) {
572583
// that means it is excluded from the quorum.
573584
continue;
574585
}
575-
if (pp.second->get_sm_committed_idx() < idx) {
586+
if (pp.second->get_sm_committed_idx() &&
587+
pp.second->get_sm_committed_idx() < idx) {
576588
// At least one peer's lag is enough to return false.
589+
//
590+
// WARNING: We should exclude 0, in case that there are member
591+
// without tracking peer sm mode.
577592
return false;
578593
}
579594
}

src/raft_server.cxx

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -417,7 +417,8 @@ void raft_server::apply_and_log_current_params() {
417417
"snapshot IO: %s, "
418418
"parallel log appending: %s, "
419419
"streaming mode max log gap %d, max bytes %" PRIu64 ", "
420-
"full consensus mode: %s",
420+
"full consensus mode: %s, "
421+
"tracking peer sm committed index: %s",
421422
params->election_timeout_lower_bound_,
422423
params->election_timeout_upper_bound_,
423424
params->heart_beat_interval_,
@@ -442,7 +443,9 @@ void raft_server::apply_and_log_current_params() {
442443
params->parallel_log_appending_ ? "ON" : "OFF",
443444
params->max_log_gap_in_stream_,
444445
params->max_bytes_in_flight_in_stream_,
445-
params->use_full_consensus_among_healthy_members_ ? "ON" : "OFF" );
446+
params->use_full_consensus_among_healthy_members_ ? "ON" : "OFF",
447+
params->track_peers_sm_commit_idx_ ? "ON" : "OFF"
448+
);
446449

447450
status_check_timer_.set_duration_ms(params->heart_beat_interval_);
448451
status_check_timer_.reset();

0 commit comments

Comments
 (0)