@@ -193,7 +193,7 @@ void TPartition::ProcessReserveRequests(const TActorContext& ctx) {
193
193
break ;
194
194
}
195
195
196
- if (WaitingForSubDomainQuota (ctx, currentSize)) {
196
+ if (WaitingForSubDomainQuota (currentSize)) {
197
197
PQ_LOG_D (" Reserve processing: SubDomainOutOfSpace. Partition: " << Partition);
198
198
break ;
199
199
}
@@ -486,7 +486,7 @@ void TPartition::OnHandleWriteResponse(const TActorContext& ctx)
486
486
487
487
void TPartition::Handle (TEvPQ::TEvHandleWriteResponse::TPtr&, const TActorContext& ctx)
488
488
{
489
- PQ_LOG_T ( " TPartition::Handle TEvHandleWriteResponse." );
489
+ PQ_LOG_D ( " Received TPartition::Handle TEvHandleWriteResponse." );
490
490
OnHandleWriteResponse (ctx);
491
491
}
492
492
@@ -657,7 +657,7 @@ void TPartition::ChangeScaleStatusIfNeeded(NKikimrPQ::EScaleStatus scaleStatus)
657
657
}
658
658
659
659
void TPartition::HandleOnWrite (TEvPQ::TEvWrite::TPtr& ev, const TActorContext& ctx) {
660
- PQ_LOG_T ( " TPartition::TEvWrite" );
660
+ PQ_LOG_D ( " Received TPartition::TEvWrite" );
661
661
662
662
if (!CanEnqueue ()) {
663
663
ReplyError (ctx, ev->Get ()->Cookie , InactivePartitionErrorCode,
@@ -771,7 +771,7 @@ void TPartition::HandleOnWrite(TEvPQ::TEvWrite::TPtr& ev, const TActorContext& c
771
771
++*offset;
772
772
}
773
773
}
774
- if (WaitingForPreviousBlobQuota () || WaitingForSubDomainQuota (ctx )) {
774
+ if (WaitingForPreviousBlobQuota () || WaitingForSubDomainQuota ()) {
775
775
SetDeadlinesForWrites (ctx);
776
776
}
777
777
WriteInflightSize += size;
@@ -1021,19 +1021,20 @@ void TPartition::ExecRequest(TSplitMessageGroupMsg& msg, ProcessParameters& para
1021
1021
1022
1022
TPartition::EProcessResult TPartition::PreProcessRequest (TWriteMsg& p) {
1023
1023
if (!CanWrite ()) {
1024
- WriteInflightSize -= p.Msg .Data .size ();
1024
+ WriteInflightSize -= p.Msg .Data .size ();
1025
1025
ScheduleReplyError (p.Cookie , false , InactivePartitionErrorCode,
1026
1026
TStringBuilder () << " Write to inactive partition " << Partition.OriginalPartitionId );
1027
1027
return EProcessResult::ContinueDrop;
1028
1028
}
1029
1029
1030
1030
if (DiskIsFull) {
1031
- WriteInflightSize -= p.Msg .Data .size ();
1031
+ WriteInflightSize -= p.Msg .Data .size ();
1032
1032
ScheduleReplyError (p.Cookie , false ,
1033
1033
NPersQueue::NErrorCode::WRITE_ERROR_DISK_IS_FULL,
1034
1034
" Disk is full" );
1035
1035
return EProcessResult::ContinueDrop;
1036
1036
}
1037
+
1037
1038
if (TxAffectedSourcesIds.contains (p.Msg .SourceId )) {
1038
1039
return EProcessResult::Blocked;
1039
1040
}
@@ -1639,7 +1640,7 @@ bool TPartition::RequestBlobQuota()
1639
1640
1640
1641
void TPartition::HandlePendingRequests (const TActorContext& ctx)
1641
1642
{
1642
- if (WaitingForPreviousBlobQuota () || WaitingForSubDomainQuota (ctx ) || NeedDeletePartition) {
1643
+ if (WaitingForPreviousBlobQuota () || WaitingForSubDomainQuota () || NeedDeletePartition) {
1643
1644
return ;
1644
1645
}
1645
1646
if (RequestBlobQuota ()) {
@@ -1810,7 +1811,7 @@ bool TPartition::WaitingForPreviousBlobQuota() const {
1810
1811
return TopicQuotaRequestCookie != 0 ;
1811
1812
}
1812
1813
1813
- bool TPartition::WaitingForSubDomainQuota (const TActorContext& /* ctx */ , const ui64 withSize) const {
1814
+ bool TPartition::WaitingForSubDomainQuota (const ui64 withSize) const {
1814
1815
if (!SubDomainOutOfSpace || !AppData ()->FeatureFlags .GetEnableTopicDiskSubDomainQuota ()) {
1815
1816
return false ;
1816
1817
}
0 commit comments