@@ -12,8 +12,9 @@ namespace {
12
12
13
13
class TPropose : public TSubOperationState {
14
14
public:
15
- explicit TPropose (TOperationId id)
15
+ TPropose (TOperationId id, bool replacePath )
16
16
: OperationId(std::move(id))
17
+ , ReplacePath(replacePath)
17
18
{}
18
19
19
20
bool HandleReply (TEvPrivate::TEvOperationPlan::TPtr& ev, TOperationContext& context) override {
@@ -33,7 +34,9 @@ class TPropose : public TSubOperationState {
33
34
path->StepCreated = step;
34
35
context.SS ->PersistCreateStep (db, pathId, step);
35
36
36
- context.SS ->TabletCounters ->Simple ()[COUNTER_STREAMING_QUERY_COUNT].Add (1 );
37
+ if (!ReplacePath) {
38
+ context.SS ->TabletCounters ->Simple ()[COUNTER_STREAMING_QUERY_COUNT].Add (1 );
39
+ }
37
40
38
41
IncParentDirAlterVersionWithRepublish (OperationId, path, context);
39
42
context.SS ->ClearDescribePathCaches (pathPtr);
@@ -61,6 +64,7 @@ class TPropose : public TSubOperationState {
61
64
62
65
private:
63
66
const TOperationId OperationId;
67
+ const bool ReplacePath = false ;
64
68
};
65
69
66
70
class TCreateStreamingQuery : public TSubOperation {
@@ -82,29 +86,53 @@ class TCreateStreamingQuery : public TSubOperation {
82
86
switch (state) {
83
87
case TTxState::Waiting:
84
88
case TTxState::Propose:
85
- return MakeHolder<TPropose>(OperationId);
89
+ return MakeHolder<TPropose>(OperationId, ReplacePath );
86
90
case TTxState::Done:
87
91
return MakeHolder<TDone>(OperationId);
88
92
default :
89
93
return nullptr ;
90
94
}
91
95
}
92
96
93
- static bool IsDestinationPathValid (const THolder<TProposeResponse>& result, const TOperationContext& context, const TPath& dstPath, const TString& acl, bool acceptExisted) {
97
+ ui64 GetAlterVersion (const TPath& dstPath, const TOperationContext& context) {
98
+ ui64 alterVersion = 1 ;
99
+
100
+ if (Transaction.GetReplaceIfExists ()) {
101
+ ReplacePath = static_cast <bool >(dstPath.Check ()
102
+ .IsResolved ()
103
+ .NotUnderDeleting ());
104
+
105
+ if (ReplacePath) {
106
+ const auto & oldStreamingQueryInfo = context.SS ->StreamingQueries .Value (dstPath->PathId , nullptr );
107
+ Y_ABORT_UNLESS (oldStreamingQueryInfo);
108
+ alterVersion = oldStreamingQueryInfo->AlterVersion + 1 ;
109
+ }
110
+ }
111
+
112
+ return alterVersion;
113
+ }
114
+
115
+ bool IsDestinationPathValid (const THolder<TProposeResponse>& result, const TOperationContext& context, const TPath& dstPath, const TString& acl, bool acceptExisted) const {
94
116
const auto checks = dstPath.Check ();
95
117
96
118
checks.IsAtLocalSchemeShard ();
97
119
98
120
if (dstPath.IsResolved ()) {
99
121
checks.IsResolved ()
100
- .NotUnderDeleting ()
101
- .FailOnExist (TPathElement::EPathType::EPathTypeStreamingQuery, acceptExisted);
122
+ .NotUnderDeleting ();
123
+
124
+ if (ReplacePath) {
125
+ checks.NotUnderOperation ()
126
+ .FailOnWrongType (TPathElement::EPathType::EPathTypeStreamingQuery);
127
+ } else {
128
+ checks.FailOnExist (TPathElement::EPathType::EPathTypeStreamingQuery, acceptExisted);
129
+ }
102
130
} else {
103
131
checks.NotEmpty ()
104
132
.NotResolved ();
105
133
}
106
134
107
- if (checks) {
135
+ if (!ReplacePath && checks) {
108
136
checks.IsValidLeafName (context.UserToken .Get ())
109
137
.DepthLimit ()
110
138
.PathsLimit ()
@@ -123,8 +151,11 @@ class TCreateStreamingQuery : public TSubOperation {
123
151
return static_cast <bool >(checks);
124
152
}
125
153
126
- static void AddPathInSchemeShard (const THolder<TProposeResponse>& result, TPath& dstPath, const TString& owner) {
127
- dstPath.MaterializeLeaf (owner);
154
+ void AddPathInSchemeShard (const THolder<TProposeResponse>& result, TPath& dstPath, const TString& owner) const {
155
+ if (!ReplacePath) {
156
+ dstPath.MaterializeLeaf (owner);
157
+ }
158
+
128
159
result->SetPathId (dstPath.Base ()->PathId .LocalPathId );
129
160
}
130
161
@@ -139,6 +170,16 @@ class TCreateStreamingQuery : public TSubOperation {
139
170
return streamingQuery;
140
171
}
141
172
173
+ void IncPathCounters (const TPath& parentPath, const TPath& dstPath, TOperationContext& context) const {
174
+ IncParentDirAlterVersionWithRepublishSafeWithUndo (OperationId, dstPath, context.SS , context.OnComplete );
175
+
176
+ if (!ReplacePath) {
177
+ dstPath.DomainInfo ()->IncPathsInside (context.SS );
178
+ }
179
+
180
+ IncAliveChildrenDirect (OperationId, parentPath, context);
181
+ }
182
+
142
183
public:
143
184
using TSubOperation::TSubOperation;
144
185
@@ -156,12 +197,14 @@ class TCreateStreamingQuery : public TSubOperation {
156
197
RETURN_RESULT_UNLESS (IsParentPathValid (result, parentPath, /* isCreate */ true ));
157
198
158
199
TPath dstPath = parentPath.Child (name);
200
+ const ui64 alterVersion = GetAlterVersion (dstPath, context);
201
+
159
202
const TString& acl = Transaction.GetModifyACL ().GetDiffACL ();
160
203
RETURN_RESULT_UNLESS (IsDestinationPathValid (result, context, dstPath, acl, !Transaction.GetFailOnExist ()));
161
204
RETURN_RESULT_UNLESS (IsApplyIfChecksPassed (result, Transaction, context));
162
205
RETURN_RESULT_UNLESS (IsDescriptionValid (result, streamingQueryDescription));
163
206
164
- const auto streamingQueryInfo = CreateNewStreamingQuery (streamingQueryDescription, 1 );
207
+ const auto streamingQueryInfo = CreateNewStreamingQuery (streamingQueryDescription, alterVersion );
165
208
Y_ABORT_UNLESS (streamingQueryInfo);
166
209
167
210
AddPathInSchemeShard (result, dstPath, owner);
@@ -172,11 +215,7 @@ class TCreateStreamingQuery : public TSubOperation {
172
215
NIceDb::TNiceDb db (context.GetDB ());
173
216
AdvanceTransactionStateToPropose (OperationId, context, db);
174
217
PersistStreamingQuery (OperationId, context, db, streamingQuery, streamingQueryInfo, acl);
175
-
176
- IncParentDirAlterVersionWithRepublishSafeWithUndo (OperationId, dstPath, context.SS , context.OnComplete );
177
-
178
- dstPath.DomainInfo ()->IncPathsInside (context.SS );
179
- IncAliveChildrenDirect (OperationId, parentPath, context); // for correct discard of ChildrenExist prop
218
+ IncPathCounters (parentPath, dstPath, context);
180
219
181
220
SetState (NextState ());
182
221
return result;
@@ -191,6 +230,9 @@ class TCreateStreamingQuery : public TSubOperation {
191
230
LOG_N (" TCreateStreamingQuery AbortUnsafe: opId# " << OperationId << " , txId# " << forceDropTxId);
192
231
context.OnComplete .DoneOperation (OperationId);
193
232
}
233
+
234
+ private:
235
+ bool ReplacePath = false ;
194
236
};
195
237
196
238
using TTag = TSchemeTxTraits<NKikimrSchemeOp::EOperationType::ESchemeOpCreateStreamingQuery>;
@@ -215,27 +257,7 @@ bool SetName<NStreamingQuery::TTag>(NStreamingQuery::TTag, TTxTransaction& tx, c
215
257
216
258
} // namespace NOperation
217
259
218
- ISubOperation::TPtr CreateNewStreamingQuery (TOperationId id, const TTxTransaction& tx, TOperationContext& context) {
219
- Y_ABORT_UNLESS (tx.GetOperationType () == NKikimrSchemeOp::ESchemeOpCreateStreamingQuery);
220
-
221
- LOG_I (" CreateNewStreamingQuery, opId# " << id << " , tx# " << tx.ShortDebugString ());
222
-
223
- const TPath parentPath = TPath::Resolve (tx.GetWorkingDir (), context.SS );
224
- if (const auto checks = NStreamingQuery::IsParentPathValid (parentPath, /* isCreate */ true ); !checks) {
225
- return CreateReject (id, checks.GetStatus (), TStringBuilder () << " Invalid CreateStreamingQuery request: " << checks.GetError ());
226
- }
227
-
228
- if (const auto & operation = tx.GetCreateStreamingQuery (); operation.GetReplaceIfExists ()) {
229
- const TPath dstPath = parentPath.Child (operation.GetName ());
230
- const auto isAlreadyExists = dstPath.Check ()
231
- .IsResolved ()
232
- .NotUnderDeleting ();
233
-
234
- if (isAlreadyExists) {
235
- return CreateAlterStreamingQuery (id, tx);
236
- }
237
- }
238
-
260
+ ISubOperation::TPtr CreateNewStreamingQuery (TOperationId id, const TTxTransaction& tx) {
239
261
return MakeSubOperation<NStreamingQuery::TCreateStreamingQuery>(id, tx);
240
262
}
241
263
0 commit comments