Skip to content

Commit 34ff5ff

Browse files
authored
YQ-4482 added streaming queries into scheme shard (#22603)
1 parent bb0a602 commit 34ff5ff

File tree

54 files changed

+2156
-4
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

54 files changed

+2156
-4
lines changed

ydb/core/protos/counters_schemeshard.proto

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -265,6 +265,11 @@ enum ESimpleCounters {
265265
COUNTER_IN_FLIGHT_OPS_TxCreateSecret = 206 [(CounterOpts) = {Name: "InFlightOps/CreateSecret"}];
266266
COUNTER_IN_FLIGHT_OPS_TxAlterSecret = 207 [(CounterOpts) = {Name: "InFlightOps/AlterSecret"}];
267267
COUNTER_IN_FLIGHT_OPS_TxDropSecret = 208 [(CounterOpts) = {Name: "InFlightOps/DropSecret"}];
268+
269+
COUNTER_STREAMING_QUERY_COUNT = 209 [(CounterOpts) = {Name: "StreamingQueryCount"}];
270+
COUNTER_IN_FLIGHT_OPS_TxCreateStreamingQuery = 210 [(CounterOpts) = {Name: "InFlightOps/CreateStreamingQuery"}];
271+
COUNTER_IN_FLIGHT_OPS_TxDropStreamingQuery = 211 [(CounterOpts) = {Name: "InFlightOps/DropStreamingQuery"}];
272+
COUNTER_IN_FLIGHT_OPS_TxAlterStreamingQuery = 212 [(CounterOpts) = {Name: "InFlightOps/AlterStreamingQuery"}];
268273
}
269274

270275
enum ECumulativeCounters {
@@ -431,6 +436,10 @@ enum ECumulativeCounters {
431436
COUNTER_FINISHED_OPS_TxCreateSecret = 127 [(CounterOpts) = {Name: "FinishedOps/CreateSecret"}];
432437
COUNTER_FINISHED_OPS_TxAlterSecret = 128 [(CounterOpts) = {Name: "FinishedOps/AlterSecret"}];
433438
COUNTER_FINISHED_OPS_TxDropSecret = 129 [(CounterOpts) = {Name: "FinishedOps/DropSecret"}];
439+
440+
COUNTER_FINISHED_OPS_TxCreateStreamingQuery = 130 [(CounterOpts) = {Name: "FinishedOps/CreateStreamingQuery"}];
441+
COUNTER_FINISHED_OPS_TxDropStreamingQuery = 131 [(CounterOpts) = {Name: "FinishedOps/DropStreamingQuery"}];
442+
COUNTER_FINISHED_OPS_TxAlterStreamingQuery = 132 [(CounterOpts) = {Name: "FinishedOps/AlterStreamingQuery"}];
434443
}
435444

436445
enum EPercentileCounters {

ydb/core/protos/flat_scheme_op.proto

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1793,6 +1793,7 @@ message TModifyScheme {
17931793
optional EOperationType OperationType = 2;
17941794
optional bool Internal = 36 [default = false]; // internal operations are not generated directly by the user
17951795
optional bool FailOnExist = 50 [default = false]; // as a replacement for TEvModifySchemeTransaction.FailOnExist
1796+
optional bool ReplaceIfExists = 91 [default = false];
17961797
optional bool AllowAccessToPrivatePaths = 53 [default = false];
17971798

17981799
optional TMkDir MkDir = 3;
@@ -1907,6 +1908,8 @@ message TModifyScheme {
19071908
optional TSecretSchemaOp CreateSecret = 88;
19081909
optional TSecretSchemaOp AlterSecret = 89;
19091910

1911+
optional TStreamingQueryDescription CreateStreamingQuery = 90;
1912+
19101913
// Some entries are grouped by semantics, so are out of order
19111914
}
19121915

@@ -1977,6 +1980,7 @@ enum EPathType {
19771980
EPathTypeTransfer = 23;
19781981
EPathTypeSysView = 24;
19791982
EPathTypeSecret = 25;
1983+
EPathTypeStreamingQuery = 26;
19801984
}
19811985

19821986
enum EPathSubType {
@@ -2038,6 +2042,7 @@ message TPathVersion {
20382042
optional uint64 BackupCollectionVersion = 31;
20392043
optional uint64 SysViewVersion = 32;
20402044
optional uint64 SecretVersion = 33;
2045+
optional uint64 StreamingQueryVersion = 34;
20412046
}
20422047

20432048
// Describes single path
@@ -2131,6 +2136,7 @@ message TPathDescription {
21312136
optional TBackupCollectionDescription BackupCollectionDescription = 31;
21322137
optional TSysViewDescription SysViewDescription = 32;
21332138
optional TSecretDescription SecretDescription = 33;
2139+
optional TStreamingQueryDescription StreamingQueryDescription = 34;
21342140
}
21352141

21362142
// For persisting AlterTable Tx description in Schemeshard internal DB
@@ -2376,3 +2382,12 @@ message TSecretSchemaOp {
23762382
optional string Value = 2; // original, unencrypted value
23772383
optional bool InheritPermissions = 3;
23782384
}
2385+
2386+
message TStreamingQueryProperties {
2387+
map<string, string> Properties = 1;
2388+
}
2389+
2390+
message TStreamingQueryDescription {
2391+
optional string Name = 1;
2392+
optional TStreamingQueryProperties Properties = 2;
2393+
}

ydb/core/protos/schemeshard/operations.proto

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -203,5 +203,10 @@ enum EOperationType {
203203
ESchemeOpAlterSecret = 128;
204204
ESchemeOpDropSecret = 129;
205205

206+
// Streaming query
207+
ESchemeOpCreateStreamingQuery = 130;
208+
ESchemeOpDropStreamingQuery = 131;
209+
ESchemeOpAlterStreamingQuery = 132;
210+
206211
// Some entries are grouped by semantics, so are out of order
207212
}

ydb/core/tx/scheme_board/cache.cpp

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -764,6 +764,7 @@ class TSchemeCache: public TMonitorableActor<TSchemeCache> {
764764
BackupCollectionInfo.Drop();
765765
SysViewInfo.Drop();
766766
SecretInfo.Drop();
767+
StreamingQueryInfo.Drop();
767768
}
768769

769770
void FillTableInfo(const NKikimrSchemeOp::TPathDescription& pathDesc) {
@@ -1302,6 +1303,7 @@ class TSchemeCache: public TMonitorableActor<TSchemeCache> {
13021303
DESCRIPTION_PART(BackupCollectionInfo);
13031304
DESCRIPTION_PART(SysViewInfo);
13041305
DESCRIPTION_PART(SecretInfo);
1306+
DESCRIPTION_PART(StreamingQueryInfo);
13051307

13061308
#undef DESCRIPTION_PART
13071309

@@ -1654,6 +1656,10 @@ class TSchemeCache: public TMonitorableActor<TSchemeCache> {
16541656
Kind = TNavigate::KindSecret;
16551657
FillInfo(Kind, SecretInfo, std::move(*pathDesc.MutableSecretDescription()));
16561658
break;
1659+
case NKikimrSchemeOp::EPathTypeStreamingQuery:
1660+
Kind = TNavigate::KindStreamingQuery;
1661+
FillInfo(Kind, StreamingQueryInfo, std::move(*pathDesc.MutableStreamingQueryDescription()));
1662+
break;
16571663
case NKikimrSchemeOp::EPathTypeInvalid:
16581664
Y_DEBUG_ABORT("Invalid path type");
16591665
break;
@@ -1739,6 +1745,9 @@ class TSchemeCache: public TMonitorableActor<TSchemeCache> {
17391745
case NKikimrSchemeOp::EPathTypeSecret:
17401746
ListNodeEntry->Children.emplace_back(name, pathId, TNavigate::KindSecret);
17411747
break;
1748+
case NKikimrSchemeOp::EPathTypeStreamingQuery:
1749+
ListNodeEntry->Children.emplace_back(name, pathId, TNavigate::KindStreamingQuery);
1750+
break;
17421751
case NKikimrSchemeOp::EPathTypeTableIndex:
17431752
case NKikimrSchemeOp::EPathTypeInvalid:
17441753
Y_DEBUG_ABORT("Invalid path type");
@@ -1972,6 +1981,7 @@ class TSchemeCache: public TMonitorableActor<TSchemeCache> {
19721981
entry.SysViewInfo = SysViewInfo;
19731982
entry.SecretInfo = SecretInfo;
19741983
entry.TableKind = TableKind;
1984+
entry.StreamingQueryInfo = StreamingQueryInfo;
19751985
}
19761986

19771987
bool CheckColumns(TResolveContext* context, TResolve::TEntry& entry,
@@ -2281,6 +2291,9 @@ class TSchemeCache: public TMonitorableActor<TSchemeCache> {
22812291
// Secret specific
22822292
TIntrusivePtr<TNavigate::TSecretInfo> SecretInfo;
22832293

2294+
// StreamingQuery specific
2295+
TIntrusivePtr<TNavigate::TStreamingQueryInfo> StreamingQueryInfo;
2296+
22842297
}; // TCacheItem
22852298

22862299
struct TMerger {

ydb/core/tx/scheme_cache/scheme_cache.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -204,6 +204,7 @@ struct TSchemeCacheNavigate {
204204
KindTransfer = 24,
205205
KindSysView = 25,
206206
KindSecret = 26,
207+
KindStreamingQuery = 27,
207208
};
208209

209210
struct TListNodeEntry : public TAtomicRefCount<TListNodeEntry> {
@@ -335,6 +336,11 @@ struct TSchemeCacheNavigate {
335336
NKikimrSchemeOp::TSecretDescription Description;
336337
};
337338

339+
struct TStreamingQueryInfo : public TAtomicRefCount<TStreamingQueryInfo> {
340+
EKind Kind = KindUnknown;
341+
NKikimrSchemeOp::TStreamingQueryDescription Description;
342+
};
343+
338344
struct TEntry {
339345
enum class ERequestType : ui8 {
340346
ByPath,
@@ -392,6 +398,7 @@ struct TSchemeCacheNavigate {
392398
TIntrusiveConstPtr<TBackupCollectionInfo> BackupCollectionInfo;
393399
TIntrusiveConstPtr<TSysViewInfo> SysViewInfo;
394400
TIntrusiveConstPtr<TSecretInfo> SecretInfo;
401+
TIntrusiveConstPtr<TStreamingQueryInfo> StreamingQueryInfo;
395402

396403
TString ToString() const;
397404
TString ToString(const NScheme::TTypeRegistry& typeRegistry) const;

ydb/core/tx/schemeshard/schemeshard__init.cpp

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2008,7 +2008,7 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> {
20082008
}
20092009
}
20102010

2011-
// Resorce Pool
2011+
// Resource Pool
20122012
{
20132013
auto rowset = db.Table<Schema::ResourcePool>().Range().Select();
20142014
if (!rowset.IsReady()) {
@@ -2054,6 +2054,29 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> {
20542054
}
20552055
}
20562056

2057+
// Read streaming queries
2058+
{
2059+
auto rowset = db.Table<Schema::StreamingQueryState>().Range().Select();
2060+
if (!rowset.IsReady()) {
2061+
return false;
2062+
}
2063+
2064+
while (!rowset.EndOfSet()) {
2065+
const TOwnerId ownerPathId = rowset.GetValue<Schema::StreamingQueryState::OwnerPathId>();
2066+
const TLocalPathId localPathId = rowset.GetValue<Schema::StreamingQueryState::LocalPathId>();
2067+
const TPathId pathId(ownerPathId, localPathId);
2068+
2069+
auto& streamingQuery = Self->StreamingQueries[pathId] = new TStreamingQueryInfo();
2070+
streamingQuery->AlterVersion = rowset.GetValue<Schema::StreamingQueryState::AlterVersion>();
2071+
Y_PROTOBUF_SUPPRESS_NODISCARD streamingQuery->Properties.ParseFromString(rowset.GetValue<Schema::StreamingQueryState::Properties>());
2072+
Self->IncrementPathDbRefCount(pathId);
2073+
2074+
if (!rowset.Next()) {
2075+
return false;
2076+
}
2077+
}
2078+
}
2079+
20572080
// Read table columns
20582081
{
20592082
TColumnRows columnRows;

ydb/core/tx/schemeshard/schemeshard__op_traits.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ EOperationClass GetOperationClass(NKikimrSchemeOp::EOperationType op) {
4747
case NKikimrSchemeOp::EOperationType::ESchemeOpCreateTransfer:
4848
case NKikimrSchemeOp::EOperationType::ESchemeOpCreateSysView:
4949
case NKikimrSchemeOp::EOperationType::ESchemeOpCreateSecret:
50+
case NKikimrSchemeOp::EOperationType::ESchemeOpCreateStreamingQuery:
5051
return EOperationClass::Create;
5152

5253
// Simple operations that drop paths
@@ -81,6 +82,7 @@ EOperationClass GetOperationClass(NKikimrSchemeOp::EOperationType op) {
8182
case NKikimrSchemeOp::EOperationType::ESchemeOpDropTransferCascade:
8283
case NKikimrSchemeOp::EOperationType::ESchemeOpDropSysView:
8384
case NKikimrSchemeOp::EOperationType::ESchemeOpDropSecret:
85+
case NKikimrSchemeOp::EOperationType::ESchemeOpDropStreamingQuery:
8486
return EOperationClass::Drop;
8587

8688
// Simple operations that alter paths
@@ -109,6 +111,7 @@ EOperationClass GetOperationClass(NKikimrSchemeOp::EOperationType op) {
109111
case NKikimrSchemeOp::EOperationType::ESchemeOpAlterBackupCollection:
110112
case NKikimrSchemeOp::EOperationType::ESchemeOpAlterTransfer:
111113
case NKikimrSchemeOp::EOperationType::ESchemeOpAlterSecret:
114+
case NKikimrSchemeOp::EOperationType::ESchemeOpAlterStreamingQuery:
112115
return EOperationClass::Alter;
113116

114117
// Compound or special operations

ydb/core/tx/schemeshard/schemeshard__op_traits.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -194,6 +194,13 @@ struct TSchemeTxTraits<NKikimrSchemeOp::EOperationType::ESchemeOpCreateSecret>
194194
constexpr inline static bool CreateDirsFromName = true;
195195
};
196196

197+
template <>
198+
struct TSchemeTxTraits<NKikimrSchemeOp::EOperationType::ESchemeOpCreateStreamingQuery>
199+
: public TSchemeTxTraitsFallback
200+
{
201+
constexpr inline static bool CreateDirsFromName = true;
202+
};
203+
197204
namespace NOperation {
198205

199206
template <class TTraits>

ydb/core/tx/schemeshard/schemeshard__operation.cpp

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1296,6 +1296,14 @@ ISubOperation::TPtr TOperation::RestorePart(TTxState::ETxType txType, TTxState::
12961296
case TTxState::ETxType::TxDropSecret:
12971297
return CreateDropSecret(NextPartId(), txState);
12981298

1299+
// StreamingQuery
1300+
case TTxState::ETxType::TxCreateStreamingQuery:
1301+
return CreateNewStreamingQuery(NextPartId(), txState);
1302+
case TTxState::ETxType::TxDropStreamingQuery:
1303+
return CreateDropStreamingQuery(NextPartId(), txState);
1304+
case TTxState::ETxType::TxAlterStreamingQuery:
1305+
return CreateAlterStreamingQuery(NextPartId(), txState);
1306+
12991307
case TTxState::ETxType::TxInvalid:
13001308
Y_UNREACHABLE();
13011309
}
@@ -1629,6 +1637,14 @@ TVector<ISubOperation::TPtr> TDefaultOperationFactory::MakeOperationParts(
16291637
return {CreateAlterSecret(op.NextPartId(), tx)};
16301638
case NKikimrSchemeOp::EOperationType::ESchemeOpDropSecret:
16311639
return {CreateDropSecret(op.NextPartId(), tx)};
1640+
1641+
// StreamingQuery
1642+
case NKikimrSchemeOp::EOperationType::ESchemeOpCreateStreamingQuery:
1643+
return {CreateNewStreamingQuery(op.NextPartId(), tx, context)};
1644+
case NKikimrSchemeOp::EOperationType::ESchemeOpDropStreamingQuery:
1645+
return {CreateDropStreamingQuery(op.NextPartId(), tx)};
1646+
case NKikimrSchemeOp::EOperationType::ESchemeOpAlterStreamingQuery:
1647+
return {CreateAlterStreamingQuery(op.NextPartId(), tx)};
16321648
}
16331649

16341650
Y_UNREACHABLE();

0 commit comments

Comments
 (0)