Skip to content

Commit db1bcf9

Browse files
authored
Make tracking peers' SM idx work (#614)
1 parent b88bbaa commit db1bcf9

File tree

5 files changed

+221
-47
lines changed

5 files changed

+221
-47
lines changed

include/libnuraft/raft_server.hxx

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1138,6 +1138,14 @@ protected:
11381138
bool need_to_handle_commit_elem);
11391139
void commit_conf(ulong idx_to_commit, ptr<log_entry>& le);
11401140

1141+
void scan_sm_commit_and_notify(uint64_t idx_upto);
1142+
1143+
bool check_sm_commit_notify_ready(uint64_t idx);
1144+
1145+
uint64_t update_sm_commit_notifier_target_idx(uint64_t to);
1146+
1147+
bool reset_sm_commit_notifier_target_idx(uint64_t expected);
1148+
11411149
ptr< cmd_result< ptr<buffer> > >
11421150
send_msg_to_leader(ptr<req_msg>& req,
11431151
const req_ext_params& ext_params = req_ext_params());
@@ -1725,6 +1733,12 @@ protected:
17251733
* a `AppendEntries` request including invalid one too.
17261734
*/
17271735
timer_helper last_rcvd_append_entries_req_;
1736+
1737+
/**
1738+
* Scan each peer's state machine commit and notify callback.
1739+
* Protected by `lock_`.
1740+
*/
1741+
std::atomic<uint64_t> sm_commit_notifier_target_idx_;
17281742
};
17291743

17301744
} // namespace nuraft;

src/global_mgr.cxx

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -313,6 +313,10 @@ void nuraft_global_mgr::commit_worker_loop(ptr<worker_handle> handle) {
313313

314314
p_tr("execute commit for %p", target.get());
315315

316+
if (target->sm_commit_notifier_target_idx_ > 0) {
317+
target->scan_sm_commit_and_notify(target->sm_commit_notifier_target_idx_);
318+
}
319+
316320
if (target->sm_commit_paused_) {
317321
p_tr("commit of this server has been paused");
318322
// Since there can be other Raft server waiting for being served,

src/handle_append_entries.cxx

Lines changed: 33 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ See the License for the specific language governing permissions and
1818
limitations under the License.
1919
**************************************************************************/
2020

21+
#include "global_mgr.hxx"
2122
#include "pp_util.hxx"
2223
#include "raft_params.hxx"
2324
#include "raft_server.hxx"
@@ -1215,19 +1216,6 @@ void raft_server::handle_append_entries_resp(resp_msg& resp) {
12151216
uint64_t prev_sm_committed_idx = p->get_sm_committed_idx();
12161217
uint64_t new_sm_committed_idx = 0;
12171218

1218-
if (resp.get_ctx() &&
1219-
ctx_->get_params()->track_peers_sm_commit_idx_) {
1220-
// If the response contains appendix, it should be
1221-
// `resp_appendix` type.
1222-
ptr<resp_appendix> appendix = resp_appendix::deserialize(*resp.get_ctx());
1223-
if (appendix->extra_order_ == resp_appendix::NOTIFYING_SM_COMMITTED_INDEX) {
1224-
new_sm_committed_idx = appendix->sm_committed_idx_;
1225-
p_tr("sm committed index of peer %d: %" PRIu64 " -> %" PRIu64,
1226-
p->get_id(), prev_sm_committed_idx, new_sm_committed_idx);
1227-
p->set_sm_committed_idx(new_sm_committed_idx);
1228-
}
1229-
}
1230-
12311219
{
12321220
std::lock_guard<std::mutex> l(p->get_lock());
12331221
p->set_next_log_idx(resp.get_next_idx());
@@ -1238,6 +1226,37 @@ void raft_server::handle_append_entries_resp(resp_msg& resp) {
12381226
p->set_matched_idx(new_matched_idx);
12391227
p->set_last_accepted_log_idx(new_matched_idx);
12401228
}
1229+
1230+
if (resp.get_ctx() &&
1231+
ctx_->get_params()->track_peers_sm_commit_idx_) {
1232+
// If the response contains appendix, it should be
1233+
// `resp_appendix` type.
1234+
ptr<resp_appendix> appendix = resp_appendix::deserialize(*resp.get_ctx());
1235+
if (appendix->extra_order_ == resp_appendix::NOTIFYING_SM_COMMITTED_INDEX) {
1236+
{
1237+
std::lock_guard<std::mutex> l(p->get_lock());
1238+
new_sm_committed_idx = appendix->sm_committed_idx_;
1239+
p_tr("sm committed index of peer %d: %" PRIu64 " -> %" PRIu64,
1240+
p->get_id(), prev_sm_committed_idx, new_sm_committed_idx);
1241+
p->set_sm_committed_idx(new_sm_committed_idx);
1242+
}
1243+
if (check_sm_commit_notify_ready(new_sm_committed_idx)) {
1244+
uint64_t target_idx =
1245+
update_sm_commit_notifier_target_idx(new_sm_committed_idx);
1246+
p_tr("sm commit notify ready: %" PRIu64 ", target idx: %" PRIu64,
1247+
new_sm_committed_idx, target_idx);
1248+
global_mgr* mgr = get_global_mgr();
1249+
if (mgr) {
1250+
// Global thread pool exists, request it.
1251+
mgr->request_commit( this->shared_from_this() );
1252+
} else {
1253+
std::unique_lock<std::mutex> lock(commit_cv_lock_);
1254+
commit_cv_.notify_one();
1255+
}
1256+
}
1257+
}
1258+
}
1259+
12411260
cb_func::Param param(id_, leader_, p->get_id());
12421261
param.ctx = &new_matched_idx;
12431262
CbReturnCode rc = ctx_->cb_func_.call
@@ -1282,6 +1301,7 @@ void raft_server::handle_append_entries_resp(resp_msg& resp) {
12821301
: resp.get_next_idx();
12831302
need_to_catchup = p->clear_pending_commit() ||
12841303
next_idx_to_send < log_store_->next_slot();
1304+
p_tr("need to catchup peer %d: %d", p->get_id(), need_to_catchup);
12851305

12861306
} else {
12871307
std::lock_guard<std::mutex> guard(p->get_lock());

src/handle_commit.cxx

Lines changed: 168 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,10 @@ limitations under the License.
4040
namespace nuraft {
4141

4242
void raft_server::commit(ulong target_idx) {
43-
if (target_idx > quick_commit_index_) {
43+
bool track_peers_sm_commit_idx = ctx_->get_params()->track_peers_sm_commit_idx_;
44+
if (target_idx > quick_commit_index_ ||
45+
(track_peers_sm_commit_idx &&
46+
target_idx >= quick_commit_index_)) {
4447
quick_commit_index_ = target_idx;
4548
lagging_sm_target_index_ = target_idx;
4649
p_db( "trigger commit upto %" PRIu64 "", quick_commit_index_.load() );
@@ -51,6 +54,12 @@ void raft_server::commit(ulong target_idx) {
5154
if (role_ == srv_role::leader) {
5255
for (peer_itor it = peers_.begin(); it != peers_.end(); ++it) {
5356
ptr<peer> pp = it->second;
57+
if (track_peers_sm_commit_idx &&
58+
pp->get_sm_committed_idx() >= target_idx) {
59+
// This peer's state machine is already committed
60+
// upto the target index. No need to send AE.
61+
continue;
62+
}
5463
if (!request_append_entries(pp)) {
5564
pp->set_pending_commit();
5665
}
@@ -109,16 +118,20 @@ void raft_server::commit_in_bg() {
109118
// WARNING:
110119
// If `sm_commit_paused_` is set, we shouldn't enter
111120
// `commit_in_bg_exec()`, as it will cause an infinite loop.
112-
while ( quick_commit_index_ <= sm_commit_index_ ||
113-
sm_commit_index_ >= log_store_->next_slot() - 1 ||
114-
sm_commit_paused_ ) {
121+
while ( ( quick_commit_index_ <= sm_commit_index_ ||
122+
sm_commit_index_ >= log_store_->next_slot() - 1 ||
123+
sm_commit_paused_ ) &&
124+
sm_commit_notifier_target_idx_ == 0 ) {
115125
std::unique_lock<std::mutex> lock(commit_cv_lock_);
116126

117127
auto wait_check = [this]() {
118128
if (stopping_) {
119129
// WARNING: `stopping_` flag should have the highest priority.
120130
return true;
121131
}
132+
if (sm_commit_notifier_target_idx_ > 0) {
133+
return true;
134+
}
122135
if (sm_commit_paused_) {
123136
return false;
124137
}
@@ -148,6 +161,12 @@ void raft_server::commit_in_bg() {
148161

149162
commit_in_bg_exec();
150163

164+
if (sm_commit_notifier_target_idx_ > 0) {
165+
uint64_t target_idx = sm_commit_notifier_target_idx_;
166+
scan_sm_commit_and_notify(target_idx);
167+
reset_sm_commit_notifier_target_idx(target_idx);
168+
}
169+
151170
} catch (std::exception& err) {
152171
// LCOV_EXCL_START
153172
commit_bg_stopped_ = true;
@@ -296,6 +315,13 @@ bool raft_server::commit_in_bg_exec(size_t timeout_ms) {
296315
watcher->set_result(ret_bool, exp);
297316
}
298317
}
318+
319+
if (check_sm_commit_notify_ready(index_to_commit)) {
320+
uint64_t target_idx = update_sm_commit_notifier_target_idx(index_to_commit);
321+
p_tr("sm commit notify ready: %" PRIu64 ", target: %" PRIu64,
322+
index_to_commit, target_idx);
323+
}
324+
299325
}
300326

301327
p_db( "DONE: commit upto %" PRIu64 ", current idx %" PRIu64,
@@ -341,6 +367,8 @@ void raft_server::commit_app_log(ulong idx_to_commit,
341367
( state_machine::ext_op_params( sm_idx, buf ) );
342368
if (ret_value) ret_value->pos(0);
343369

370+
auto params = ctx_->get_params();
371+
344372
std::list< ptr<commit_ret_elem> > async_elems;
345373
if (need_to_handle_commit_elem) {
346374
std::unique_lock<std::mutex> cre_lock(commit_ret_elems_lock_);
@@ -365,26 +393,33 @@ void raft_server::commit_app_log(ulong idx_to_commit,
365393
elem->result_code_ = cmd_result_code::OK;
366394
elem->ret_value_ = ret_value;
367395
need_to_check_commit_ret = false;
368-
p_dv("notify cb %" PRIu64 " %p", sm_idx, &elem->awaiter_);
369396

370-
switch (ctx_->get_params()->return_method_) {
371-
case raft_params::blocking:
372-
default:
373-
// Blocking mode:
374-
if (elem->callback_invoked_) {
375-
// If elem callback invoked, remove it
397+
if (params->track_peers_sm_commit_idx_) {
398+
// Respond upon all peers' SM commit.
399+
// It will be handled later in `commit_in_bg_exec`.
400+
401+
} else {
402+
// Respond upon the leader's SM commit.
403+
p_dv("notify cb %" PRIu64 " %p", sm_idx, &elem->awaiter_);
404+
switch (params->return_method_) {
405+
case raft_params::blocking:
406+
default:
407+
// Blocking mode:
408+
if (elem->callback_invoked_) {
409+
// If elem callback invoked (== TIMEOUT), remove it
410+
commit_ret_elems_.erase(entry);
411+
} else {
412+
// or notify client that request done
413+
elem->awaiter_.invoke();
414+
}
415+
break;
416+
417+
case raft_params::async_handler:
418+
// Async handler: put into list.
419+
async_elems.push_back(elem);
376420
commit_ret_elems_.erase(entry);
377-
} else {
378-
// or notify client that request done
379-
elem->awaiter_.invoke();
421+
break;
380422
}
381-
break;
382-
383-
case raft_params::async_handler:
384-
// Async handler: put into list.
385-
async_elems.push_back(elem);
386-
commit_ret_elems_.erase(entry);
387-
break;
388423
}
389424
}
390425
}
@@ -399,19 +434,23 @@ void raft_server::commit_app_log(ulong idx_to_commit,
399434
p_tr("commit thread is invoked earlier than user thread, "
400435
"log %" PRIu64 ", elem %p", sm_idx, elem.get());
401436

402-
switch (ctx_->get_params()->return_method_) {
403-
case raft_params::blocking:
404-
default:
405-
elem->awaiter_.invoke(); // Callback will not sleep.
406-
break;
407-
case raft_params::async_handler:
408-
// Async handler:
409-
// Set the result, but should not put it into the
410-
// `async_elems` list, as the user thread (supposed to be
411-
// executed right after this) will invoke the callback immediately.
412-
elem->async_result_ =
413-
cs_new< cmd_result< ptr<buffer> > >( elem->ret_value_ );
414-
break;
437+
if (params->track_peers_sm_commit_idx_) {
438+
// Ditto.
439+
} else {
440+
switch (params->return_method_) {
441+
case raft_params::blocking:
442+
default:
443+
elem->awaiter_.invoke(); // Callback will not sleep.
444+
break;
445+
case raft_params::async_handler:
446+
// Async handler:
447+
// Set the result, but should not put it into the
448+
// `async_elems` list, as the user thread (supposed to be
449+
// executed right after this) will invoke the callback immediately.
450+
elem->async_result_ =
451+
cs_new< cmd_result< ptr<buffer> > >( elem->ret_value_ );
452+
break;
453+
}
415454
}
416455
commit_ret_elems_.insert( std::make_pair(sm_idx, elem) );
417456
}
@@ -470,6 +509,101 @@ void raft_server::commit_conf(ulong idx_to_commit,
470509
// }
471510
}
472511

512+
void raft_server::scan_sm_commit_and_notify(uint64_t idx_upto) {
513+
p_tr("sm commit notifier scan start, upto %" PRIu64, idx_upto);
514+
515+
auto params = ctx_->get_params();
516+
std::list< ptr<commit_ret_elem> > async_elems;
517+
518+
std::unique_lock<std::mutex> cre_lock(commit_ret_elems_lock_);
519+
520+
// NOTE: If we reach here, we assume the leader already finished its
521+
// commit (i.e., `commit_app_log`), hence the corresponding
522+
// `commit_ret_elem` must exist.
523+
auto entry = commit_ret_elems_.begin();
524+
while (entry != commit_ret_elems_.end()) {
525+
if (entry->first > idx_upto) {
526+
break;
527+
}
528+
ptr<commit_ret_elem> elem = entry->second;
529+
530+
p_tr("notify cb %" PRIu64 " %p", entry->first, &elem->awaiter_);
531+
switch (params->return_method_) {
532+
case raft_params::blocking:
533+
default:
534+
// Blocking mode:
535+
if (elem->callback_invoked_) {
536+
// If elem callback invoked (== TIMEOUT), remove it
537+
entry = commit_ret_elems_.erase(entry);
538+
} else {
539+
// or notify client that request done
540+
elem->awaiter_.invoke();
541+
entry++;
542+
}
543+
break;
544+
545+
case raft_params::async_handler:
546+
// Async handler: put into list.
547+
async_elems.push_back(elem);
548+
entry = commit_ret_elems_.erase(entry);
549+
break;
550+
}
551+
}
552+
553+
for (auto& entry: async_elems) {
554+
ptr<commit_ret_elem>& elem = entry;
555+
if (elem->async_result_) {
556+
ptr<std::exception> err = nullptr;
557+
elem->async_result_->set_result( elem->ret_value_, err, cmd_result_code::OK );
558+
elem->ret_value_.reset();
559+
elem->async_result_.reset();
560+
}
561+
}
562+
563+
p_tr("sm commit notifier scan done");
564+
}
565+
566+
bool raft_server::check_sm_commit_notify_ready(uint64_t idx) {
567+
recur_lock(lock_);
568+
for (auto& pp: peers_) {
569+
if (pp.second->get_matched_idx() < idx) {
570+
// At the moment this function is invoked,
571+
// if there is a peer whose last index is smaller than `idx`,
572+
// that means it is excluded from the quorum.
573+
continue;
574+
}
575+
if (pp.second->get_sm_committed_idx() < idx) {
576+
// At least one peer's lag is enough to return false.
577+
return false;
578+
}
579+
}
580+
// Finally, the leader itself state machine should meet the condition too.
581+
if (sm_commit_index_ < idx) {
582+
return false;
583+
}
584+
return true;
585+
}
586+
587+
uint64_t raft_server::update_sm_commit_notifier_target_idx(uint64_t to) {
588+
recur_lock(lock_);
589+
if (sm_commit_notifier_target_idx_ >= to) {
590+
// Already set to a larger value.
591+
return sm_commit_notifier_target_idx_;
592+
}
593+
sm_commit_notifier_target_idx_ = to;
594+
return sm_commit_notifier_target_idx_;
595+
}
596+
597+
bool raft_server::reset_sm_commit_notifier_target_idx(uint64_t expected) {
598+
recur_lock(lock_);
599+
if (sm_commit_notifier_target_idx_ == expected) {
600+
sm_commit_notifier_target_idx_ = 0;
601+
p_tr("reset sm commit notifier target idx from %" PRIu64, expected);
602+
return true;
603+
}
604+
return false;
605+
}
606+
473607
bool raft_server::apply_config_log_entry(ptr<log_entry>& le,
474608
ptr<state_mgr>& s_mgr,
475609
std::string& err_msg)

0 commit comments

Comments
 (0)