diff --git a/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp b/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp index bc94370aa25a..edd758215388 100644 --- a/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp +++ b/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp @@ -270,9 +270,12 @@ void TKqpScanComputeActor::DoBootstrap() { auto taskRunner = MakeDqTaskRunner(GetAllocatorPtr(), execCtx, settings, logger); TBase::SetTaskRunner(taskRunner); - auto wakeup = [this] { ContinueExecute(); }; + auto selfId = this->SelfId(); + auto wakeupCallback = [actorSystem, selfId]() { + actorSystem->Send(selfId, new TEvDqCompute::TEvResumeExecution{EResumeSource::CAWakeupCallback}); + }; auto errorCallback = [this](const TString& error){ SendError(error); }; - TBase::PrepareTaskRunner(TKqpTaskRunnerExecutionContext(std::get(TxId), RuntimeSettings.UseSpilling, MemoryLimits.ArrayBufferMinFillPercentage, std::move(wakeup), std::move(errorCallback))); + TBase::PrepareTaskRunner(TKqpTaskRunnerExecutionContext(std::get(TxId), RuntimeSettings.UseSpilling, MemoryLimits.ArrayBufferMinFillPercentage, std::move(wakeupCallback), std::move(errorCallback))); ComputeCtx.AddTableScan(0, Meta, GetStatsMode()); ScanData = &ComputeCtx.GetTableScan(0); diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor.cpp b/ydb/library/yql/dq/actors/compute/dq_compute_actor.cpp index 2f6335e11d78..f6dc873319f0 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor.cpp +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor.cpp @@ -58,7 +58,10 @@ class TDqComputeActor : public TDqSyncComputeActorBase { auto taskRunner = TaskRunnerFactory(GetAllocatorPtr(), Task, RuntimeSettings.StatsMode, logger); SetTaskRunner(taskRunner); - auto wakeupCallback = [this]{ ContinueExecute(EResumeSource::CABootstrapWakeup); }; + auto selfId = this->SelfId(); + auto wakeupCallback = [actorSystem, selfId]() { + actorSystem->Send(selfId, new TEvDqCompute::TEvResumeExecution{EResumeSource::CAWakeupCallback}); + }; auto errorCallback = [this](const TString& error){ SendError(error); }; TDqTaskRunnerExecutionContext execCtx(TxId, std::move(wakeupCallback), std::move(errorCallback)); PrepareTaskRunner(execCtx); diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h index 079e990758a7..ce6e0ca59d51 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h @@ -55,6 +55,7 @@ enum class EResumeSource : ui32 { CADataSent, CAPendingOutput, CATaskRunnerCreated, + CAWakeupCallback, Last, };