Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions ydb/core/protos/counters_schemeshard.proto
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,11 @@ enum ESimpleCounters {
COUNTER_IN_FLIGHT_OPS_TxCreateSecret = 206 [(CounterOpts) = {Name: "InFlightOps/CreateSecret"}];
COUNTER_IN_FLIGHT_OPS_TxAlterSecret = 207 [(CounterOpts) = {Name: "InFlightOps/AlterSecret"}];
COUNTER_IN_FLIGHT_OPS_TxDropSecret = 208 [(CounterOpts) = {Name: "InFlightOps/DropSecret"}];

COUNTER_STREAMING_QUERY_COUNT = 209 [(CounterOpts) = {Name: "StreamingQueryCount"}];
COUNTER_IN_FLIGHT_OPS_TxCreateStreamingQuery = 210 [(CounterOpts) = {Name: "InFlightOps/CreateStreamingQuery"}];
COUNTER_IN_FLIGHT_OPS_TxDropStreamingQuery = 211 [(CounterOpts) = {Name: "InFlightOps/DropStreamingQuery"}];
COUNTER_IN_FLIGHT_OPS_TxAlterStreamingQuery = 212 [(CounterOpts) = {Name: "InFlightOps/AlterStreamingQuery"}];
}

enum ECumulativeCounters {
Expand Down Expand Up @@ -431,6 +436,10 @@ enum ECumulativeCounters {
COUNTER_FINISHED_OPS_TxCreateSecret = 127 [(CounterOpts) = {Name: "FinishedOps/CreateSecret"}];
COUNTER_FINISHED_OPS_TxAlterSecret = 128 [(CounterOpts) = {Name: "FinishedOps/AlterSecret"}];
COUNTER_FINISHED_OPS_TxDropSecret = 129 [(CounterOpts) = {Name: "FinishedOps/DropSecret"}];

COUNTER_FINISHED_OPS_TxCreateStreamingQuery = 130 [(CounterOpts) = {Name: "FinishedOps/CreateStreamingQuery"}];
COUNTER_FINISHED_OPS_TxDropStreamingQuery = 131 [(CounterOpts) = {Name: "FinishedOps/DropStreamingQuery"}];
COUNTER_FINISHED_OPS_TxAlterStreamingQuery = 132 [(CounterOpts) = {Name: "FinishedOps/AlterStreamingQuery"}];
}

enum EPercentileCounters {
Expand Down
15 changes: 15 additions & 0 deletions ydb/core/protos/flat_scheme_op.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1793,6 +1793,7 @@ message TModifyScheme {
optional EOperationType OperationType = 2;
optional bool Internal = 36 [default = false]; // internal operations are not generated directly by the user
optional bool FailOnExist = 50 [default = false]; // as a replacement for TEvModifySchemeTransaction.FailOnExist
optional bool ReplaceIfExists = 91 [default = false];
optional bool AllowAccessToPrivatePaths = 53 [default = false];

optional TMkDir MkDir = 3;
Expand Down Expand Up @@ -1907,6 +1908,8 @@ message TModifyScheme {
optional TSecretSchemaOp CreateSecret = 88;
optional TSecretSchemaOp AlterSecret = 89;

optional TStreamingQueryDescription CreateStreamingQuery = 90;

// Some entries are grouped by semantics, so are out of order
}

Expand Down Expand Up @@ -1977,6 +1980,7 @@ enum EPathType {
EPathTypeTransfer = 23;
EPathTypeSysView = 24;
EPathTypeSecret = 25;
EPathTypeStreamingQuery = 26;
}

enum EPathSubType {
Expand Down Expand Up @@ -2038,6 +2042,7 @@ message TPathVersion {
optional uint64 BackupCollectionVersion = 31;
optional uint64 SysViewVersion = 32;
optional uint64 SecretVersion = 33;
optional uint64 StreamingQueryVersion = 34;
}

// Describes single path
Expand Down Expand Up @@ -2131,6 +2136,7 @@ message TPathDescription {
optional TBackupCollectionDescription BackupCollectionDescription = 31;
optional TSysViewDescription SysViewDescription = 32;
optional TSecretDescription SecretDescription = 33;
optional TStreamingQueryDescription StreamingQueryDescription = 34;
}

// For persisting AlterTable Tx description in Schemeshard internal DB
Expand Down Expand Up @@ -2376,3 +2382,12 @@ message TSecretSchemaOp {
optional string Value = 2; // original, unencrypted value
optional bool InheritPermissions = 3;
}

message TStreamingQueryProperties {
map<string, string> Properties = 1;
}

message TStreamingQueryDescription {
optional string Name = 1;
optional TStreamingQueryProperties Properties = 2;
}
5 changes: 5 additions & 0 deletions ydb/core/protos/schemeshard/operations.proto
Original file line number Diff line number Diff line change
Expand Up @@ -203,5 +203,10 @@ enum EOperationType {
ESchemeOpAlterSecret = 128;
ESchemeOpDropSecret = 129;

// Streaming query
ESchemeOpCreateStreamingQuery = 130;
ESchemeOpDropStreamingQuery = 131;
ESchemeOpAlterStreamingQuery = 132;

// Some entries are grouped by semantics, so are out of order
}
13 changes: 13 additions & 0 deletions ydb/core/tx/scheme_board/cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -764,6 +764,7 @@ class TSchemeCache: public TMonitorableActor<TSchemeCache> {
BackupCollectionInfo.Drop();
SysViewInfo.Drop();
SecretInfo.Drop();
StreamingQueryInfo.Drop();
}

void FillTableInfo(const NKikimrSchemeOp::TPathDescription& pathDesc) {
Expand Down Expand Up @@ -1302,6 +1303,7 @@ class TSchemeCache: public TMonitorableActor<TSchemeCache> {
DESCRIPTION_PART(BackupCollectionInfo);
DESCRIPTION_PART(SysViewInfo);
DESCRIPTION_PART(SecretInfo);
DESCRIPTION_PART(StreamingQueryInfo);

#undef DESCRIPTION_PART

Expand Down Expand Up @@ -1654,6 +1656,10 @@ class TSchemeCache: public TMonitorableActor<TSchemeCache> {
Kind = TNavigate::KindSecret;
FillInfo(Kind, SecretInfo, std::move(*pathDesc.MutableSecretDescription()));
break;
case NKikimrSchemeOp::EPathTypeStreamingQuery:
Kind = TNavigate::KindStreamingQuery;
FillInfo(Kind, StreamingQueryInfo, std::move(*pathDesc.MutableStreamingQueryDescription()));
break;
case NKikimrSchemeOp::EPathTypeInvalid:
Y_DEBUG_ABORT("Invalid path type");
break;
Expand Down Expand Up @@ -1739,6 +1745,9 @@ class TSchemeCache: public TMonitorableActor<TSchemeCache> {
case NKikimrSchemeOp::EPathTypeSecret:
ListNodeEntry->Children.emplace_back(name, pathId, TNavigate::KindSecret);
break;
case NKikimrSchemeOp::EPathTypeStreamingQuery:
ListNodeEntry->Children.emplace_back(name, pathId, TNavigate::KindStreamingQuery);
break;
case NKikimrSchemeOp::EPathTypeTableIndex:
case NKikimrSchemeOp::EPathTypeInvalid:
Y_DEBUG_ABORT("Invalid path type");
Expand Down Expand Up @@ -1972,6 +1981,7 @@ class TSchemeCache: public TMonitorableActor<TSchemeCache> {
entry.SysViewInfo = SysViewInfo;
entry.SecretInfo = SecretInfo;
entry.TableKind = TableKind;
entry.StreamingQueryInfo = StreamingQueryInfo;
}

bool CheckColumns(TResolveContext* context, TResolve::TEntry& entry,
Expand Down Expand Up @@ -2281,6 +2291,9 @@ class TSchemeCache: public TMonitorableActor<TSchemeCache> {
// Secret specific
TIntrusivePtr<TNavigate::TSecretInfo> SecretInfo;

// StreamingQuery specific
TIntrusivePtr<TNavigate::TStreamingQueryInfo> StreamingQueryInfo;

}; // TCacheItem

struct TMerger {
Expand Down
7 changes: 7 additions & 0 deletions ydb/core/tx/scheme_cache/scheme_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,7 @@ struct TSchemeCacheNavigate {
KindTransfer = 24,
KindSysView = 25,
KindSecret = 26,
KindStreamingQuery = 27,
};

struct TListNodeEntry : public TAtomicRefCount<TListNodeEntry> {
Expand Down Expand Up @@ -335,6 +336,11 @@ struct TSchemeCacheNavigate {
NKikimrSchemeOp::TSecretDescription Description;
};

struct TStreamingQueryInfo : public TAtomicRefCount<TStreamingQueryInfo> {
EKind Kind = KindUnknown;
NKikimrSchemeOp::TStreamingQueryDescription Description;
};

struct TEntry {
enum class ERequestType : ui8 {
ByPath,
Expand Down Expand Up @@ -392,6 +398,7 @@ struct TSchemeCacheNavigate {
TIntrusiveConstPtr<TBackupCollectionInfo> BackupCollectionInfo;
TIntrusiveConstPtr<TSysViewInfo> SysViewInfo;
TIntrusiveConstPtr<TSecretInfo> SecretInfo;
TIntrusiveConstPtr<TStreamingQueryInfo> StreamingQueryInfo;

TString ToString() const;
TString ToString(const NScheme::TTypeRegistry& typeRegistry) const;
Expand Down
25 changes: 24 additions & 1 deletion ydb/core/tx/schemeshard/schemeshard__init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2008,7 +2008,7 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> {
}
}

// Resorce Pool
// Resource Pool
{
auto rowset = db.Table<Schema::ResourcePool>().Range().Select();
if (!rowset.IsReady()) {
Expand Down Expand Up @@ -2054,6 +2054,29 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> {
}
}

// Read streaming queries
{
auto rowset = db.Table<Schema::StreamingQueryState>().Range().Select();
if (!rowset.IsReady()) {
return false;
}

while (!rowset.EndOfSet()) {
const TOwnerId ownerPathId = rowset.GetValue<Schema::StreamingQueryState::OwnerPathId>();
const TLocalPathId localPathId = rowset.GetValue<Schema::StreamingQueryState::LocalPathId>();
const TPathId pathId(ownerPathId, localPathId);

auto& streamingQuery = Self->StreamingQueries[pathId] = new TStreamingQueryInfo();
streamingQuery->AlterVersion = rowset.GetValue<Schema::StreamingQueryState::AlterVersion>();
Y_PROTOBUF_SUPPRESS_NODISCARD streamingQuery->Properties.ParseFromString(rowset.GetValue<Schema::StreamingQueryState::Properties>());
Self->IncrementPathDbRefCount(pathId);

if (!rowset.Next()) {
return false;
}
}
}

// Read table columns
{
TColumnRows columnRows;
Expand Down
3 changes: 3 additions & 0 deletions ydb/core/tx/schemeshard/schemeshard__op_traits.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ EOperationClass GetOperationClass(NKikimrSchemeOp::EOperationType op) {
case NKikimrSchemeOp::EOperationType::ESchemeOpCreateTransfer:
case NKikimrSchemeOp::EOperationType::ESchemeOpCreateSysView:
case NKikimrSchemeOp::EOperationType::ESchemeOpCreateSecret:
case NKikimrSchemeOp::EOperationType::ESchemeOpCreateStreamingQuery:
return EOperationClass::Create;

// Simple operations that drop paths
Expand Down Expand Up @@ -81,6 +82,7 @@ EOperationClass GetOperationClass(NKikimrSchemeOp::EOperationType op) {
case NKikimrSchemeOp::EOperationType::ESchemeOpDropTransferCascade:
case NKikimrSchemeOp::EOperationType::ESchemeOpDropSysView:
case NKikimrSchemeOp::EOperationType::ESchemeOpDropSecret:
case NKikimrSchemeOp::EOperationType::ESchemeOpDropStreamingQuery:
return EOperationClass::Drop;

// Simple operations that alter paths
Expand Down Expand Up @@ -109,6 +111,7 @@ EOperationClass GetOperationClass(NKikimrSchemeOp::EOperationType op) {
case NKikimrSchemeOp::EOperationType::ESchemeOpAlterBackupCollection:
case NKikimrSchemeOp::EOperationType::ESchemeOpAlterTransfer:
case NKikimrSchemeOp::EOperationType::ESchemeOpAlterSecret:
case NKikimrSchemeOp::EOperationType::ESchemeOpAlterStreamingQuery:
return EOperationClass::Alter;

// Compound or special operations
Expand Down
7 changes: 7 additions & 0 deletions ydb/core/tx/schemeshard/schemeshard__op_traits.h
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,13 @@ struct TSchemeTxTraits<NKikimrSchemeOp::EOperationType::ESchemeOpCreateSecret>
constexpr inline static bool CreateDirsFromName = true;
};

template <>
struct TSchemeTxTraits<NKikimrSchemeOp::EOperationType::ESchemeOpCreateStreamingQuery>
: public TSchemeTxTraitsFallback
{
constexpr inline static bool CreateDirsFromName = true;
};

namespace NOperation {

template <class TTraits>
Expand Down
16 changes: 16 additions & 0 deletions ydb/core/tx/schemeshard/schemeshard__operation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1292,6 +1292,14 @@ ISubOperation::TPtr TOperation::RestorePart(TTxState::ETxType txType, TTxState::
case TTxState::ETxType::TxDropSecret:
return CreateDropSecret(NextPartId(), txState);

// StreamingQuery
case TTxState::ETxType::TxCreateStreamingQuery:
return CreateNewStreamingQuery(NextPartId(), txState);
case TTxState::ETxType::TxDropStreamingQuery:
return CreateDropStreamingQuery(NextPartId(), txState);
case TTxState::ETxType::TxAlterStreamingQuery:
return CreateAlterStreamingQuery(NextPartId(), txState);

case TTxState::ETxType::TxInvalid:
Y_UNREACHABLE();
}
Expand Down Expand Up @@ -1625,6 +1633,14 @@ TVector<ISubOperation::TPtr> TDefaultOperationFactory::MakeOperationParts(
return {CreateAlterSecret(op.NextPartId(), tx)};
case NKikimrSchemeOp::EOperationType::ESchemeOpDropSecret:
return {CreateDropSecret(op.NextPartId(), tx)};

// StreamingQuery
case NKikimrSchemeOp::EOperationType::ESchemeOpCreateStreamingQuery:
return {CreateNewStreamingQuery(op.NextPartId(), tx, context)};
case NKikimrSchemeOp::EOperationType::ESchemeOpDropStreamingQuery:
return {CreateDropStreamingQuery(op.NextPartId(), tx)};
case NKikimrSchemeOp::EOperationType::ESchemeOpAlterStreamingQuery:
return {CreateAlterStreamingQuery(op.NextPartId(), tx)};
}

Y_UNREACHABLE();
Expand Down
Loading
Loading