Skip to content

Commit c9472ea

Browse files
committed
Used Mem changes and Db changes + removed micro framework
1 parent be3a726 commit c9472ea

12 files changed

+262
-256
lines changed

ydb/core/tx/schemeshard/schemeshard__operation_alter_streaming_query.cpp

Lines changed: 98 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,10 @@
11
#include "schemeshard__operation_common.h"
2-
#include "schemeshard__operation_common_streaming_query.h"
32
#include "schemeshard_impl.h"
43

4+
#define LOG_I(stream) LOG_INFO_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "[" << context.SS->TabletID() << "] " << stream)
5+
#define LOG_N(stream) LOG_NOTICE_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "[" << context.SS->TabletID() << "] " << stream)
6+
#define RETURN_RESULT_UNLESS(x) if (!(x)) return result;
7+
58
namespace NKikimr::NSchemeShard {
69

710
namespace NStreamingQuery {
@@ -24,13 +27,9 @@ class TPropose : public TSubOperationState {
2427

2528
const TPathId& pathId = txState->TargetPathId;
2629
const TPath& path = TPath::Init(pathId, context.SS);
27-
const TPathElement::TPtr pathPtr = context.SS->PathsById.at(pathId);
28-
2930
NIceDb::TNiceDb db(context.GetDB());
3031

3132
IncParentDirAlterVersionWithRepublish(OperationId, path, context);
32-
context.SS->ClearDescribePathCaches(pathPtr);
33-
context.OnComplete.PublishToSchemeBoard(OperationId, pathId);
3433

3534
context.SS->ChangeTxState(db, OperationId, TTxState::Done);
3635
return true;
@@ -57,6 +56,8 @@ class TPropose : public TSubOperationState {
5756
};
5857

5958
class TAlterStreamingQuery : public TSubOperation {
59+
static constexpr ui64 MAX_PROTOBUF_SIZE = 2_MB;
60+
6061
static TTxState::ETxState NextState() {
6162
return TTxState::Propose;
6263
}
@@ -83,9 +84,25 @@ class TAlterStreamingQuery : public TSubOperation {
8384
}
8485
}
8586

87+
static bool IsParentPathValid(const THolder<TProposeResponse>& result, const TPath& parentPath) {
88+
const auto checks = parentPath.Check();
89+
checks.NotUnderDomainUpgrade()
90+
.IsAtLocalSchemeShard()
91+
.IsResolved()
92+
.NotDeleted()
93+
.NotUnderDeleting()
94+
.IsCommonSensePath()
95+
.IsLikeDirectory();
96+
97+
if (!checks) {
98+
result->SetError(checks.GetStatus(), checks.GetError());
99+
}
100+
101+
return static_cast<bool>(checks);
102+
}
103+
86104
static bool IsDestinationPathValid(const THolder<TProposeResponse>& result, const TPath& dstPath) {
87105
const auto checks = dstPath.Check();
88-
89106
checks.IsAtLocalSchemeShard()
90107
.IsResolved()
91108
.NotUnderDeleting()
@@ -103,13 +120,78 @@ class TAlterStreamingQuery : public TSubOperation {
103120
return static_cast<bool>(checks);
104121
}
105122

106-
TPathElement::TPtr ReplaceStreamingQueryPathElement(const TPath& dstPath) const {
123+
bool IsApplyIfChecksPassed(const THolder<TProposeResponse>& result, const TOperationContext& context) const {
124+
if (TString errorStr; !context.SS->CheckApplyIf(Transaction, errorStr)) {
125+
result->SetError(NKikimrScheme::StatusPreconditionFailed, errorStr);
126+
return false;
127+
}
128+
129+
return true;
130+
}
131+
132+
TStreamingQueryInfo::TPtr GetAlteredQueryInfo(const TPath& dstPath, const TOperationContext& context) const {
133+
const auto& oldStreamingQueryInfo = context.SS->StreamingQueries.Value(dstPath->PathId, nullptr);
134+
Y_ABORT_UNLESS(oldStreamingQueryInfo);
135+
auto streamingQueryInfo = MakeIntrusive<TStreamingQueryInfo>(TStreamingQueryInfo{
136+
.AlterVersion = oldStreamingQueryInfo->AlterVersion + 1,
137+
.Properties = Transaction.GetCreateStreamingQuery().GetProperties(),
138+
});
139+
140+
auto& properties = *streamingQueryInfo->Properties.MutableProperties();
141+
for (const auto& [property, value] : oldStreamingQueryInfo->Properties.GetProperties()) {
142+
properties.emplace(property, value);
143+
}
144+
145+
return streamingQueryInfo;
146+
}
147+
148+
bool IsDescriptionValid(const THolder<TProposeResponse>& result, TStreamingQueryInfo::TPtr queryInfo) const {
149+
if (const ui64 propertiesSize = queryInfo->Properties.ByteSizeLong(); propertiesSize > MAX_PROTOBUF_SIZE) {
150+
result->SetError(NKikimrScheme::StatusSchemeError, TStringBuilder() << "Maximum size of properties must be less or equal equal to " << MAX_PROTOBUF_SIZE << " but got " << propertiesSize << " after alter");
151+
return false;
152+
}
153+
154+
return true;
155+
}
156+
157+
void PersistAlterStreamingQuery(const TPath& dstPath, const TOperationContext& context) const {
158+
const TPathId& pathId = dstPath.Base()->PathId;
159+
160+
context.MemChanges.GrabPath(context.SS, dstPath->ParentPathId);
161+
context.MemChanges.GrabStreamingQuery(context.SS, pathId);
162+
context.MemChanges.GrabNewTxState(context.SS, OperationId);
163+
164+
context.DbChanges.PersistPath(pathId);
165+
context.DbChanges.PersistStreamingQuery(pathId);
166+
context.DbChanges.PersistTxState(OperationId);
167+
}
168+
169+
void CreateTransaction(const TPath& dstPath, const TOperationContext& context) const {
170+
Y_ABORT_UNLESS(!context.SS->FindTx(OperationId));
171+
172+
TTxState& txState = context.SS->CreateTx(OperationId, TTxState::TxAlterStreamingQuery, dstPath.Base()->PathId);
173+
txState.Shards.clear();
174+
txState.State = TTxState::Propose;
175+
txState.MinStep = TStepId(1);
176+
context.OnComplete.ActivateTx(OperationId);
177+
178+
if (const auto parent = dstPath.Parent().Base(); parent->HasActiveChanges()) {
179+
const TTxId parentTxId = parent->PlannedToCreate() ? parent->CreateTxId : parent->LastTxId;
180+
context.OnComplete.Dependence(parentTxId, OperationId.GetTxId());
181+
}
182+
}
183+
184+
void AlterStreamingQueryPathElement(const TPath& dstPath, TStreamingQueryInfo::TPtr queryInfo, const TOperationContext& context) const {
107185
TPathElement::TPtr streamingQuery = dstPath.Base();
108186

109187
streamingQuery->PathState = TPathElement::EPathState::EPathStateAlter;
110188
streamingQuery->LastTxId = OperationId.GetTxId();
111189

112-
return streamingQuery;
190+
if (const auto& acl = Transaction.GetModifyACL().GetDiffACL()) {
191+
streamingQuery->ApplyACL(acl);
192+
}
193+
194+
context.SS->StreamingQueries[dstPath.Base()->PathId] = queryInfo;
113195
}
114196

115197
public:
@@ -128,36 +210,27 @@ class TAlterStreamingQuery : public TSubOperation {
128210
static_cast<ui64>(context.SS->SelfTabletId()));
129211

130212
const TPath& parentPath = TPath::Resolve(parentPathStr, context.SS);
131-
RETURN_RESULT_UNLESS(IsParentPathValid(result, parentPath, /* isCreate */ false));
213+
RETURN_RESULT_UNLESS(IsParentPathValid(result, parentPath));
132214

133215
const TPath& dstPath = parentPath.Child(name);
134216
RETURN_RESULT_UNLESS(IsDestinationPathValid(result, dstPath));
135-
RETURN_RESULT_UNLESS(IsApplyIfChecksPassed(result, Transaction, context));
136-
RETURN_RESULT_UNLESS(IsDescriptionValid(result, streamingQueryDescription));
137-
138-
const auto& oldStreamingQueryInfo = context.SS->StreamingQueries.Value(dstPath->PathId, nullptr);
139-
Y_ABORT_UNLESS(oldStreamingQueryInfo);
140-
const auto streamingQueryInfo = CreateModifyStreamingQuery(streamingQueryDescription, oldStreamingQueryInfo);
141-
Y_ABORT_UNLESS(streamingQueryInfo);
217+
RETURN_RESULT_UNLESS(IsApplyIfChecksPassed(result, context));
218+
const auto queryInfo = GetAlteredQueryInfo(dstPath, context);
219+
RETURN_RESULT_UNLESS(IsDescriptionValid(result, queryInfo));
142220

143221
result->SetPathId(dstPath.Base()->PathId.LocalPathId);
144-
const TPathElement::TPtr streamingQuery = ReplaceStreamingQueryPathElement(dstPath);
145-
CreateTransaction(OperationId, context, streamingQuery->PathId, TTxState::TxAlterStreamingQuery);
146-
RegisterParentPathDependencies(OperationId, context, parentPath);
147-
148-
NIceDb::TNiceDb db(context.GetDB());
149-
AdvanceTransactionStateToPropose(OperationId, context, db);
150-
PersistStreamingQuery(OperationId, context, db, streamingQuery, streamingQueryInfo, /* acl */ TString());
151222

152-
IncParentDirAlterVersionWithRepublishSafeWithUndo(OperationId, dstPath, context.SS, context.OnComplete);
223+
const auto guard = context.DbGuard();
224+
PersistAlterStreamingQuery(dstPath, context);
225+
CreateTransaction(dstPath, context);
226+
AlterStreamingQueryPathElement(dstPath, queryInfo, context);
153227

154228
SetState(NextState());
155229
return result;
156230
}
157231

158232
void AbortPropose(TOperationContext& context) override {
159233
LOG_N("TAlterStreamingQuery AbortPropose: opId# " << OperationId);
160-
Y_ABORT("no AbortPropose for TAlterStreamingQuery");
161234
}
162235

163236
void AbortUnsafe(TTxId forceDropTxId, TOperationContext& context) override {

ydb/core/tx/schemeshard/schemeshard__operation_common_streaming_query.cpp

Lines changed: 0 additions & 131 deletions
This file was deleted.

ydb/core/tx/schemeshard/schemeshard__operation_common_streaming_query.h

Lines changed: 0 additions & 32 deletions
This file was deleted.

0 commit comments

Comments
 (0)