Skip to content

Commit ebf06bb

Browse files
[BugFix] Fix async delta writer crash due to null pointer (backport #62626) (#62651)
Signed-off-by: xiangguangyxg <xiangguangyxg@gmail.com> Co-authored-by: xiangguangyxg <110401425+xiangguangyxg@users.noreply.github.com>
1 parent 4a333ab commit ebf06bb

File tree

3 files changed

+59
-8
lines changed

3 files changed

+59
-8
lines changed

be/src/runtime/lake_tablets_channel.cpp

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -452,8 +452,8 @@ void LakeTabletsChannel::add_chunk(Chunk* chunk, const PTabletWriterAddChunkRequ
452452
}
453453
total_row_num += size;
454454
int64_t tablet_id = tablet_ids[row_indexes[from]];
455-
auto& dw = _delta_writers[tablet_id];
456-
if (dw == nullptr) {
455+
auto delta_writer_iter = _delta_writers.find(tablet_id);
456+
if (delta_writer_iter == _delta_writers.end()) {
457457
LOG(WARNING) << "LakeTabletsChannel txn_id: " << _txn_id << " load_id: " << print_id(request.id())
458458
<< " not found tablet_id: " << tablet_id;
459459
response->mutable_status()->set_status_code(TStatusCode::INTERNAL_ERROR);
@@ -464,6 +464,7 @@ void LakeTabletsChannel::add_chunk(Chunk* chunk, const PTabletWriterAddChunkRequ
464464
}
465465

466466
// back pressure OlapTableSink since there are too many memtables need to flush
467+
auto& dw = delta_writer_iter->second;
467468
while (dw->queueing_memtable_num() >= config::max_queueing_memtable_per_tablet) {
468469
if (watch.elapsed_time() / 1000000 > request.timeout_ms()) {
469470
LOG(INFO) << "LakeTabletsChannel txn_id: " << _txn_id << " load_id: " << print_id(request.id())
@@ -549,7 +550,18 @@ void LakeTabletsChannel::add_chunk(Chunk* chunk, const PTabletWriterAddChunkRequ
549550

550551
std::set<long> immutable_tablet_ids;
551552
for (auto tablet_id : request.tablet_ids()) {
552-
auto& writer = _delta_writers[tablet_id];
553+
auto writer_iter = _delta_writers.find(tablet_id);
554+
if (writer_iter == _delta_writers.end()) {
555+
LOG(WARNING) << "LakeTabletsChannel txn_id: " << _txn_id << " load_id: " << print_id(request.id())
556+
<< " not found tablet_id: " << tablet_id;
557+
response->mutable_status()->set_status_code(TStatusCode::INTERNAL_ERROR);
558+
response->mutable_status()->add_error_msgs(
559+
fmt::format("Failed to add_chunk since tablet_id {} not exists, txn_id: {}, load_id: {}", tablet_id,
560+
_txn_id, print_id(request.id())));
561+
return;
562+
}
563+
564+
auto& writer = writer_iter->second;
553565
if (writer->is_immutable() && immutable_tablet_ids.count(tablet_id) == 0) {
554566
response->add_immutable_tablet_ids(tablet_id);
555567
response->add_immutable_partition_ids(writer->partition_id());

be/test/exprs/agg/agg_compressed_key_test.cpp

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ TEST(AggCompressedKey, could_bound) {
4444
used_bytes.resize(1);
4545

4646
auto type1 = TypeDescriptor(TYPE_INT);
47-
groupby.emplace_back(type1, false);
47+
groupby.emplace_back(ColumnType{type1, false});
4848
std::vector<std::optional<std::pair<VectorizedLiteral*, VectorizedLiteral*>>> ranges;
4949
auto* min = pool.add(new VectorizedLiteral(ColumnHelper::create_const_column<TYPE_INT>(0, 1), type1));
5050
auto* max = pool.add(new VectorizedLiteral(ColumnHelper::create_const_column<TYPE_INT>(100, 1), type1));
@@ -70,8 +70,8 @@ TEST(AggCompressedKey, could_bound) {
7070
used_bytes.resize(2);
7171

7272
auto type1 = TypeDescriptor(TYPE_INT);
73-
groupby.emplace_back(type1, false);
74-
groupby.emplace_back(type1, true);
73+
groupby.emplace_back(ColumnType{type1, false});
74+
groupby.emplace_back(ColumnType{type1, true});
7575
std::vector<std::optional<std::pair<VectorizedLiteral*, VectorizedLiteral*>>> ranges;
7676
auto* min = pool.add(new VectorizedLiteral(ColumnHelper::create_const_column<TYPE_INT>(0, 1), type1));
7777
auto* max = pool.add(new VectorizedLiteral(ColumnHelper::create_const_column<TYPE_INT>(100, 1), type1));
@@ -98,8 +98,8 @@ TEST(AggCompressedKey, could_bound) {
9898
used_bytes.resize(2);
9999

100100
auto type1 = TypeDescriptor::create_decimalv3_type(TYPE_DECIMAL128, 8, 4);
101-
groupby.emplace_back(type1, false);
102-
groupby.emplace_back(type1, true);
101+
groupby.emplace_back(ColumnType{type1, false});
102+
groupby.emplace_back(ColumnType{type1, true});
103103
std::vector<std::optional<std::pair<VectorizedLiteral*, VectorizedLiteral*>>> ranges;
104104
auto* min = pool.add(
105105
new VectorizedLiteral(ColumnHelper::create_const_decimal_column<TYPE_DECIMAL128>(0, 8, 4, 1), type1));

be/test/runtime/lake_tablets_channel_test.cpp

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -690,6 +690,45 @@ TEST_F(LakeTabletsChannelTest, test_write_failed) {
690690
ASSERT_FALSE(fs::path_exist(_tablet_manager->txn_log_location(10088, kTxnId)));
691691
}
692692

693+
TEST_F(LakeTabletsChannelTest, test_tablet_not_existed) {
694+
auto open_request = _open_request;
695+
open_request.set_num_senders(1);
696+
697+
ASSERT_OK(_tablets_channel->open(open_request, &_open_response, _schema_param, false));
698+
699+
constexpr int kChunkSize = 128;
700+
constexpr int kChunkSizePerTablet = kChunkSize / 4;
701+
auto chunk = generate_data(kChunkSize);
702+
PTabletWriterAddChunkRequest add_chunk_request;
703+
PTabletWriterAddBatchResult add_chunk_response;
704+
add_chunk_request.set_index_id(kIndexId);
705+
add_chunk_request.set_sender_id(0);
706+
add_chunk_request.set_eos(false);
707+
add_chunk_request.set_packet_seq(0);
708+
709+
for (int i = 0; i < kChunkSize; i++) {
710+
int64_t tablet_id = 10086 + (i / kChunkSizePerTablet);
711+
add_chunk_request.add_tablet_ids(tablet_id);
712+
add_chunk_request.add_partition_ids(tablet_id < 10088 ? 10 : 11);
713+
}
714+
715+
ASSIGN_OR_ABORT(auto chunk_pb, serde::ProtobufChunkSerde::serialize(chunk));
716+
add_chunk_request.mutable_chunk()->Swap(&chunk_pb);
717+
718+
add_chunk_request.add_tablet_ids(10000); // Not existed tablet id
719+
720+
bool close_channel;
721+
_tablets_channel->add_chunk(&chunk, add_chunk_request, &add_chunk_response, &close_channel);
722+
ASSERT_NE(TStatusCode::INTERNAL_ERROR, add_chunk_response.status().status_code());
723+
ASSERT_FALSE(close_channel);
724+
725+
_tablets_channel->abort();
726+
727+
ASSERT_FALSE(fs::path_exist(_tablet_manager->txn_log_location(10086, kTxnId)));
728+
ASSERT_FALSE(fs::path_exist(_tablet_manager->txn_log_location(10087, kTxnId)));
729+
ASSERT_FALSE(fs::path_exist(_tablet_manager->txn_log_location(10088, kTxnId)));
730+
}
731+
693732
TEST_F(LakeTabletsChannelTest, test_empty_tablet) {
694733
auto open_request = _open_request;
695734
open_request.set_num_senders(1);

0 commit comments

Comments
 (0)