Skip to content

Commit 0ddbdae

Browse files
committed
test
1 parent 7847bc4 commit 0ddbdae

File tree

1 file changed

+108
-0
lines changed

1 file changed

+108
-0
lines changed
Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
#include <ydb/core/kqp/ut/common/kqp_ut_common.h>
2+
#include <ydb/core/tx/data_events/events.h>
3+
#include <ydb/core/tx/datashard/datashard.h>
4+
#include <ydb/core/base/tablet_pipecache.h>
5+
6+
#include <ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/proto/accessor.h>
7+
8+
#include <util/generic/scope.h>
9+
10+
11+
namespace NKikimr {
12+
namespace NKqp {
13+
14+
using namespace NYdb;
15+
using namespace NYdb::NQuery;
16+
17+
Y_UNIT_TEST_SUITE(KqpReattach) {
18+
19+
Y_UNIT_TEST(ReattachDeliveryProblem) {
20+
TKikimrSettings settings;
21+
settings.SetUseRealThreads(false).SetEnableDataShardVolatileTransactions(false);
22+
settings.AppConfig.MutableTableServiceConfig()->SetEnableOltpSink(true);
23+
24+
TKikimrRunner kikimr(settings);
25+
auto db = kikimr.GetQueryClient();
26+
auto session = kikimr.RunCall([&] { return db.GetSession().GetValueSync().GetSession(); });
27+
28+
auto& runtime = *kikimr.GetTestServer().GetRuntime();
29+
Y_UNUSED(runtime);
30+
31+
{
32+
const TString query(Q1_(R"(
33+
UPSERT INTO `/Root/TwoShard` (Key, Value1) VALUES (1u, 'value'), (4000000001u, 'value');
34+
)"));
35+
36+
std::vector<std::unique_ptr<IEventHandle>> requests;
37+
38+
TActorId writeActor;
39+
ui64 tablet = 0;
40+
ui64 txId = 0;
41+
42+
auto grab = [&](TAutoPtr<IEventHandle> &ev) -> auto {
43+
if (tablet == 0 && ev->GetTypeRewrite() == NEvents::TDataEvents::TEvWriteResult::EventType) {
44+
auto* msg = ev->Get<NEvents::TDataEvents::TEvWriteResult>();
45+
46+
writeActor = ev->Recipient;
47+
tablet = msg->Record.GetOrigin();
48+
txId = msg->Record.GetTxId();
49+
50+
return TTestActorRuntime::EEventAction::PROCESS;
51+
} else if (ev->GetTypeRewrite() == TEvTxProxy::TEvProposeTransaction::EventType) {
52+
auto restartTx = std::make_unique<TEvDataShard::TEvProposeTransactionRestart>(
53+
tablet,
54+
txId);
55+
56+
runtime.Send(writeActor, ev->Sender, restartTx.release());
57+
58+
auto deliveryProblem = std::make_unique<TEvPipeCache::TEvDeliveryProblem>(
59+
tablet,
60+
false);
61+
62+
runtime.Send(writeActor, ev->Sender, deliveryProblem.release());
63+
64+
requests.emplace_back(ev.Release());
65+
66+
return TTestActorRuntime::EEventAction::DROP;
67+
} else if (ev->GetTypeRewrite() == TEvDataShard::TEvProposeTransactionAttach::EventType) {
68+
UNIT_ASSERT(ev->Cookie == 1);
69+
70+
auto deliveryProblem = std::make_unique<TEvPipeCache::TEvDeliveryProblem>(
71+
tablet,
72+
true);
73+
74+
runtime.Send(writeActor, ev->Sender, deliveryProblem.release());
75+
76+
requests.emplace_back(ev.Release());
77+
78+
return TTestActorRuntime::EEventAction::DROP;
79+
}
80+
81+
return TTestActorRuntime::EEventAction::PROCESS;
82+
};
83+
84+
85+
runtime.SetObserverFunc(grab);
86+
87+
auto future = kikimr.RunInThreadPool([&]{
88+
return session.ExecuteQuery(
89+
query,
90+
TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx()).ExtractValueSync();
91+
});
92+
93+
const size_t requestsExpected = 2;
94+
95+
TDispatchOptions opts;
96+
opts.FinalEvents.emplace_back([&](IEventHandle&) {
97+
return requests.size() >= requestsExpected;
98+
});
99+
runtime.DispatchEvents(opts);
100+
UNIT_ASSERT(requests.size() == requestsExpected);
101+
102+
auto result = runtime.WaitFuture(future);
103+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::UNDETERMINED, result.GetIssues().ToString());
104+
}
105+
}
106+
}
107+
}
108+
}

0 commit comments

Comments
 (0)