Skip to content

Commit 005afd1

Browse files
committed
don't flush all requests at end of PA
1 parent eae8430 commit 005afd1

12 files changed

+61
-26
lines changed

src/c++/perf_analyzer/client_backend/mock_client_backend.h

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -489,6 +489,14 @@ class NaggyMockClientBackend : public ClientBackend {
489489
});
490490
}
491491

492+
~NaggyMockClientBackend()
493+
{
494+
// Make sure no requests carry over to the next test
495+
while (stats_->num_active_infer_calls) {
496+
std::this_thread::sleep_for(std::chrono::milliseconds(1));
497+
}
498+
}
499+
492500
MOCK_METHOD(
493501
Error, ModelConfig,
494502
(rapidjson::Document*, const std::string&, const std::string&),

src/c++/perf_analyzer/concurrency_worker.cc

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@ ConcurrencyWorker::HandleExecuteOff()
111111
// Wait if no request should be sent and it is not exiting
112112
thread_config_->is_paused_ = true;
113113
std::unique_lock<std::mutex> lock(wake_mutex_);
114-
wake_signal_.wait(lock, [this]() { return early_exit || execute_; });
114+
wake_signal_.wait(lock, [this]() { return exiting_ || execute_; });
115115

116116
// TODO REFACTOR TMA-1043 - memory manager should be handling this instead
117117
// of here
@@ -131,10 +131,10 @@ ConcurrencyWorker::HandleNoConcurrency()
131131
// Wait if no request should be sent and it is not exiting
132132
std::unique_lock<std::mutex> lock(wake_mutex_);
133133
wake_signal_.wait(lock, [this]() {
134-
return early_exit || (thread_config_->concurrency_ > 0);
134+
return exiting_ || (thread_config_->concurrency_ > 0);
135135
});
136136
// Stop executing if concurrency is 0 and early exit is requested
137-
if (early_exit && thread_config_->concurrency_ == 0) {
137+
if (exiting_ && thread_config_->concurrency_ == 0) {
138138
return true;
139139
}
140140
}
@@ -181,7 +181,7 @@ ConcurrencyWorker::WaitForResponses()
181181
std::unique_lock<std::mutex> lk(cb_mtx_);
182182
thread_stat_->idle_timer.Start();
183183
cb_cv_.wait(lk, [this] {
184-
if (notified_) {
184+
if (notified_ || exiting_) {
185185
notified_ = false;
186186
return true;
187187
}

src/c++/perf_analyzer/infer_context.cc

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ InferContext::SendSequenceInferRequest(uint32_t seq_stat_index, bool delayed)
6363
// This also helps in reporting the realistic latencies.
6464
std::lock_guard<std::mutex> guard(
6565
sequence_manager_->GetMutex(seq_stat_index));
66-
if (!early_exit && execute_) {
66+
if (!exiting_ && execute_) {
6767
sequence_manager_->SetInferSequenceOptions(
6868
seq_stat_index, infer_data_.options_);
6969

@@ -298,6 +298,10 @@ InferContext::AsyncCallbackFuncImpl(cb::InferResult* result)
298298
// Add the request record to thread request records vector with
299299
// proper locking
300300
std::lock_guard<std::mutex> lock(thread_stat_->mu_);
301+
if (exiting_) {
302+
return;
303+
}
304+
301305
thread_stat_->cb_status_ = result_ptr->RequestStatus();
302306
if (thread_stat_->cb_status_.IsOk()) {
303307
std::string request_id;

src/c++/perf_analyzer/infer_context.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,9 @@ class InferContext {
104104
// Initialize the context. Must be done before any inferences are sent
105105
void Init();
106106

107+
// Signal to the context to stop working and exit
108+
void Exit() { exiting_ = true; }
109+
107110
// Send a single inference request to the server
108111
void SendInferRequest(bool delayed = false);
109112

@@ -192,6 +195,7 @@ class InferContext {
192195

193196
const uint32_t id_{0};
194197
const size_t thread_id_{0};
198+
bool exiting_{false};
195199

196200
size_t GetNumActiveThreads() { return num_active_threads_; }
197201

src/c++/perf_analyzer/iworker.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ namespace triton { namespace perfanalyzer {
3333
class IWorker {
3434
public:
3535
virtual void Infer() = 0;
36+
virtual void Exit() = 0;
3637
};
3738

3839
}} // namespace triton::perfanalyzer

src/c++/perf_analyzer/load_manager.cc

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -248,9 +248,15 @@ LoadManager::InitManagerInputs(
248248
void
249249
LoadManager::StopWorkerThreads()
250250
{
251-
early_exit = true;
252-
// wake up all threads
253-
wake_signal_.notify_all();
251+
// FIXME do I need to acquire the lock first?
252+
for (auto& worker : workers_) {
253+
worker->Exit();
254+
}
255+
256+
{
257+
std::unique_lock<std::mutex> lock(wake_mutex_);
258+
wake_signal_.notify_all();
259+
}
254260

255261
size_t cnt = 0;
256262
for (auto& thread : threads_) {

src/c++/perf_analyzer/load_worker.cc

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,21 @@
3434

3535
namespace triton { namespace perfanalyzer {
3636

37+
void
38+
LoadWorker::Exit()
39+
{
40+
for (auto ctx : ctxs_) {
41+
ctx->Exit();
42+
}
43+
44+
exiting_ = true;
45+
46+
{
47+
std::lock_guard<std::mutex> lk(cb_mtx_);
48+
cb_cv_.notify_all();
49+
}
50+
}
51+
3752
bool
3853
LoadWorker::ShouldExit()
3954
{
@@ -44,16 +59,14 @@ LoadWorker::ShouldExit()
4459
thread_config_->num_requests_ != 0 &&
4560
thread_stat_->num_sent_requests_ >= thread_config_->num_requests_;
4661

47-
return early_exit || bad_status || done_with_request_count;
62+
return exiting_ || bad_status || done_with_request_count;
4863
}
4964

5065
bool
5166
LoadWorker::HandleExitConditions()
5267
{
5368
if (ShouldExit()) {
5469
CompleteOngoingSequences();
55-
thread_stat_->idle_timer.Start();
56-
WaitForOngoingRequests();
5770
return true;
5871
}
5972
return false;

src/c++/perf_analyzer/load_worker.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,8 @@ class LoadWorker : public IWorker {
6969

7070
virtual ~LoadWorker() = default;
7171

72+
virtual void Exit() override;
73+
7274
protected:
7375
// Return the total number of async requests that have started and not
7476
// finished
@@ -117,6 +119,8 @@ class LoadWorker : public IWorker {
117119

118120
void AsyncCallbackFinalize(uint32_t ctx_id);
119121

122+
bool exiting_ = false;
123+
120124
uint32_t id_;
121125

122126
std::vector<std::shared_ptr<InferContext>> ctxs_;

src/c++/perf_analyzer/request_rate_worker.cc

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,9 @@ RequestRateWorker::Infer()
4646
HandleExecuteOff();
4747

4848
bool is_delayed = SleepIfNecessary();
49+
if (HandleExitConditions()) {
50+
return;
51+
}
4952
uint32_t ctx_id = GetCtxId();
5053
SendInferRequest(ctx_id, is_delayed);
5154
RestoreFreeCtxId(ctx_id);
@@ -119,7 +122,7 @@ RequestRateWorker::HandleExecuteOff()
119122
// Wait if no request should be sent and it is not exiting
120123
thread_config_->is_paused_ = true;
121124
std::unique_lock<std::mutex> lock(wake_mutex_);
122-
wake_signal_.wait(lock, [this]() { return early_exit || execute_; });
125+
wake_signal_.wait(lock, [this]() { return exiting_ || execute_; });
123126
}
124127

125128
thread_config_->is_paused_ = false;
@@ -155,7 +158,7 @@ RequestRateWorker::WaitForFreeCtx()
155158
std::unique_lock<std::mutex> lk(cb_mtx_);
156159
thread_stat_->idle_timer.Start();
157160
cb_cv_.wait(lk, [this] {
158-
if (notified_) {
161+
if (notified_ || exiting_) {
159162
notified_ = false;
160163
return true;
161164
}

src/c++/perf_analyzer/test_concurrency_manager.cc

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -474,7 +474,7 @@ TEST_CASE("concurrency_free_ctx_ids")
474474

475475
std::this_thread::sleep_for(std::chrono::milliseconds(15));
476476

477-
early_exit = true;
477+
worker->Exit();
478478
infer_future.get();
479479

480480
// The first sequence should only be called two times, once at the very start,
@@ -590,7 +590,7 @@ TEST_CASE("Concurrency - shared memory infer input calls")
590590

591591
std::this_thread::sleep_for(std::chrono::milliseconds(18));
592592

593-
early_exit = true;
593+
worker->Exit();
594594
infer_future.get();
595595

596596
const auto& actual_append_raw_calls{tcm.stats_->num_append_raw_calls};

0 commit comments

Comments
 (0)