[v24.2.x] tx/group compaction fixes #24689

Merged Jan 6, 2025 (20 commits)

Commits
a98a5d3
tx/group: track begin offset of transactions
bharathv Dec 20, 2024
e71e1f7
tx/group: support describe_producers for group
bharathv Dec 24, 2024
5dab07b
tx/tests/dt: test for describe producers
bharathv Dec 20, 2024
266e86e
tx/groups: escape hatch for unsafe aborting of group transactions
bharathv Dec 20, 2024
e0bf89f
group_tx_tracker/stm: plumb feature_table into the stm
bharathv Dec 24, 2024
6692d1b
k/group_data_parser: reduce ignored batch logging to debug
bharathv Jan 2, 2025
5f37e76
tx/group/group_tx_tracker: tighten up computation of collectible_offset
bharathv Dec 24, 2024
3e4a421
group_tx_tracker/stm: track additional information about fence batches
bharathv Jan 2, 2025
d40ce01
group_tx_tracker/stm: heuristic to ignore stale tx_fence batches
bharathv Jan 2, 2025
0f7e395
group_tracker/stm: add a periodic GC loop to expire stale tx_fence txes
bharathv Jan 3, 2025
aa2333b
k/group: disallow group deletion while transactions in progress
bharathv Dec 23, 2024
690d61f
group_recovery_consumer/logging: tidy up logging
bharathv Jan 2, 2025
0292bab
group_recovery/tx: fix group recovery for non existent groups
bharathv Jan 2, 2025
fae25ef
group/tx: add a ducktape test for compactibility of consumer_offsets
bharathv Dec 20, 2024
5dd5339
tx/producer_state: add getters for internal state
bharathv Dec 22, 2024
b9ea8dd
tx/observability: add types and plumbing needed to get producer states
bharathv Dec 22, 2024
b45c73d
tx/admin: types for exposing producer info in REST api
bharathv Dec 22, 2024
0d3d482
tx/observability: REST endpoint to fetch all producers from a partition
bharathv Dec 22, 2024
bfa3603
raft/persisted_stm: add ability for stms to reject local snapshot
bharathv Dec 23, 2024
8a681c1
tx/group_tx_stm: invalidate existing snapshots for the stm
bharathv Dec 23, 2024
5 changes: 3 additions & 2 deletions src/v/archival/archival_metadata_stm.cc
@@ -1219,7 +1219,8 @@ ss::future<> archival_metadata_stm::apply_raft_snapshot(const iobuf&) {
get_last_offset());
}

ss::future<> archival_metadata_stm::apply_local_snapshot(
ss::future<raft::local_snapshot_applied>
archival_metadata_stm::apply_local_snapshot(
raft::stm_snapshot_header header, iobuf&& data) {
auto snap = serde::from_iobuf<snapshot>(std::move(data));

@@ -1287,7 +1288,7 @@ ss::future<> archival_metadata_stm::apply_local_snapshot(
} else {
_last_clean_at = header.offset;
}
co_return;
co_return raft::local_snapshot_applied::yes;
}

ss::future<raft::stm_snapshot>
2 changes: 1 addition & 1 deletion src/v/archival/archival_metadata_stm.h
@@ -305,7 +305,7 @@ class archival_metadata_stm final : public raft::persisted_stm<> {
ss::future<> do_apply(const model::record_batch& batch) override;
ss::future<> apply_raft_snapshot(const iobuf&) override;

ss::future<>
ss::future<raft::local_snapshot_applied>
apply_local_snapshot(raft::stm_snapshot_header, iobuf&&) override;
ss::future<raft::stm_snapshot>
take_local_snapshot(ssx::semaphore_units apply_units) override;
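The change that recurs across these stms is the new return type on `apply_local_snapshot`: instead of `ss::future<>`, implementations now report `raft::local_snapshot_applied`, letting `persisted_stm` discard a local snapshot it no longer trusts (commits bfa3603 and 8a681c1). A minimal sketch of the pattern, using a hypothetical stm (`my_stm`, `my_snapshot`, `supported_snapshot_version` are illustrative names) and the yes/no enum values used throughout this diff:

```cpp
// Hypothetical stm illustrating the new contract: returning
// local_snapshot_applied::no tells persisted_stm to ignore the local
// snapshot and rebuild state from the log / raft snapshot instead.
ss::future<raft::local_snapshot_applied>
my_stm::apply_local_snapshot(raft::stm_snapshot_header hdr, iobuf&& data) {
    if (hdr.version != supported_snapshot_version) {
        // Unknown or stale format: reject rather than apply partial state.
        co_return raft::local_snapshot_applied::no;
    }
    auto snap = serde::from_iobuf<my_snapshot>(std::move(data));
    _state = std::move(snap.state);
    co_return raft::local_snapshot_applied::yes;
}
```

This is the same shape `rm_stm::apply_local_snapshot` takes further down, where an unsupported snapshot version returns `::no` and the group tx stm invalidates its existing snapshots outright.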
3 changes: 2 additions & 1 deletion src/v/cluster/distributed_kv_stm.h
@@ -130,7 +130,7 @@ class distributed_kv_stm final : public raft::persisted_stm<> {
});
}

ss::future<>
ss::future<raft::local_snapshot_applied>
apply_local_snapshot(raft::stm_snapshot_header, iobuf&& bytes) override {
auto holder = _gate.hold();
iobuf_parser parser(std::move(bytes));
@@ -144,6 +144,7 @@ }
}
}
_kvs = std::move(snap.kv_data);
co_return raft::local_snapshot_applied::yes;
}

ss::future<raft::stm_snapshot>
4 changes: 2 additions & 2 deletions src/v/cluster/id_allocator_stm.cc
@@ -223,9 +223,9 @@ ss::future<> id_allocator_stm::write_snapshot() {
.finally([this] { _is_writing_snapshot = false; });
}

ss::future<>
ss::future<raft::local_snapshot_applied>
id_allocator_stm::apply_local_snapshot(raft::stm_snapshot_header, iobuf&&) {
return ss::make_exception_future<>(
return ss::make_exception_future<raft::local_snapshot_applied>(
std::logic_error("id_allocator_stm doesn't support snapshots"));
}

2 changes: 1 addition & 1 deletion src/v/cluster/id_allocator_stm.h
@@ -106,7 +106,7 @@ class id_allocator_stm final : public raft::persisted_stm<> {
advance_state(int64_t, model::timeout_clock::duration);

ss::future<> write_snapshot();
ss::future<>
ss::future<raft::local_snapshot_applied>
apply_local_snapshot(raft::stm_snapshot_header, iobuf&&) override;
ss::future<raft::stm_snapshot>
take_local_snapshot(ssx::semaphore_units apply_units) override;
4 changes: 2 additions & 2 deletions src/v/cluster/log_eviction_stm.cc
@@ -440,14 +440,14 @@ ss::future<> log_eviction_stm::apply_raft_snapshot(const iobuf&) {
co_return;
}

ss::future<> log_eviction_stm::apply_local_snapshot(
ss::future<raft::local_snapshot_applied> log_eviction_stm::apply_local_snapshot(
raft::stm_snapshot_header header, iobuf&& data) {
auto snapshot = serde::from_iobuf<snapshot_data>(std::move(data));
vlog(
_log.info, "Applying snapshot {} at offset: {}", snapshot, header.offset);

_delete_records_eviction_offset = snapshot.effective_start_offset;
return ss::now();
co_return raft::local_snapshot_applied::yes;
}

ss::future<raft::stm_snapshot>
2 changes: 1 addition & 1 deletion src/v/cluster/log_eviction_stm.h
@@ -109,7 +109,7 @@ class log_eviction_stm
ss::future<iobuf> take_snapshot(model::offset) final { co_return iobuf{}; }

protected:
ss::future<>
ss::future<raft::local_snapshot_applied>
apply_local_snapshot(raft::stm_snapshot_header, iobuf&&) override;

ss::future<raft::stm_snapshot>
30 changes: 24 additions & 6 deletions src/v/cluster/producer_state.h
@@ -70,6 +70,10 @@ class request {
}
}

seq_t first_sequence() const { return _first_sequence; }
seq_t last_sequence() const { return _last_sequence; }
model::term_id term() const { return _term; }

void set_value(request_result_t::value_type);
void set_error(request_result_t::error_type);
void mark_request_in_progress() { _state = request_state::in_progress; }
@@ -106,6 +110,14 @@ class request {
// Kafka clients only issue requests in batches of 5, the queue is fairly small
// at all times.
class requests {
private:
static constexpr int32_t requests_cached_max = 5;
// chunk size of the request containers to avoid wastage.
static constexpr size_t chunk_size = std::bit_ceil(
static_cast<unsigned long>(requests_cached_max));

using request_queue = ss::chunked_fifo<request_ptr, chunk_size>;

public:
result<request_ptr> try_emplace(
seq_t first, seq_t last, model::term_id current, bool reset_sequences);
@@ -118,17 +130,21 @@ class requests {
bool operator==(const requests&) const;
friend std::ostream& operator<<(std::ostream&, const requests&);

const request_queue& inflight_requests() const {
return _inflight_requests;
}

const request_queue& finished_requests() const {
return _finished_requests;
}

private:
static constexpr int32_t requests_cached_max = 5;
// chunk size of the request containers to avoid wastage.
static constexpr size_t chunk_size = std::bit_ceil(
static_cast<unsigned long>(requests_cached_max));
bool is_valid_sequence(seq_t incoming) const;
std::optional<request_ptr> last_request() const;
void gc_requests_from_older_terms(model::term_id current);
void reset(request_result_t::error_type);
ss::chunked_fifo<request_ptr, chunk_size> _inflight_requests;
ss::chunked_fifo<request_ptr, chunk_size> _finished_requests;
request_queue _inflight_requests;
request_queue _finished_requests;
friend producer_state;
};

@@ -271,6 +287,8 @@ class producer_state {
// progress transactions with older epoch.
void reset_with_new_epoch(model::producer_epoch new_epoch);

const requests& idempotent_request_state() const { return _requests; }

private:
prefix_logger& _logger;

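The `chunk_size` constant moved above rounds the expected queue depth up to a power of two, so the `chunked_fifo` backing `_inflight_requests` and `_finished_requests` allocates one fully used chunk for the typical five in-flight Kafka requests. A quick standalone check of that arithmetic (illustrative only, not part of the PR):

```cpp
#include <bit>
#include <cstdint>

int main() {
    constexpr int32_t requests_cached_max = 5;
    // std::bit_ceil returns the smallest power of two >= its argument,
    // so a chunk holds 8 entries and a ~5-deep queue fits in one chunk.
    constexpr auto chunk_size
      = std::bit_ceil(static_cast<unsigned long>(requests_cached_max));
    static_assert(chunk_size == 8);
    return 0;
}
```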
3 changes: 3 additions & 0 deletions src/v/cluster/rm_group_proxy.h
@@ -55,5 +55,8 @@ class rm_group_proxy {

virtual ss::future<abort_group_tx_reply>
abort_group_tx_locally(abort_group_tx_request) = 0;

virtual ss::future<get_producers_reply>
get_group_producers_locally(get_producers_request) = 0;
};
} // namespace cluster
56 changes: 56 additions & 0 deletions src/v/cluster/rm_partition_frontend.cc
@@ -559,4 +559,60 @@ ss::future<abort_tx_reply> rm_partition_frontend::do_abort_tx(
});
}

ss::future<get_producers_reply>
rm_partition_frontend::get_producers_locally(get_producers_request request) {
get_producers_reply reply;
auto partition = _partition_manager.local().get(request.ntp);
if (!partition || !partition->is_leader()) {
reply.error_code = tx::errc::not_coordinator;
co_return reply;
}
reply.error_code = tx::errc::none;
auto stm = partition->raft()->stm_manager()->get<rm_stm>();
if (!stm) {
// maybe an internal (non data) partition
co_return reply;
}
const auto& producers = stm->get_producers();
reply.producer_count = producers.size();
for (const auto& [pid, state] : producers) {
producer_state_info producer_info;
producer_info.pid = state->id();
// fill in the idempotent producer state.
const auto& requests = state->idempotent_request_state();
for (const auto& request : requests.inflight_requests()) {
idempotent_request_info request_info;
request_info.first_sequence = request->first_sequence();
request_info.last_sequence = request->last_sequence();
request_info.term = request->term();
producer_info.inflight_requests.push_back(std::move(request_info));
}

for (const auto& request : requests.finished_requests()) {
idempotent_request_info request_info;
request_info.first_sequence = request->first_sequence();
request_info.last_sequence = request->last_sequence();
request_info.term = request->term();
producer_info.finished_requests.push_back(std::move(request_info));
}
producer_info.last_update = state->last_update_timestamp();

// Fill in transactional producer state, if any.
const auto& tx_state = state->transaction_state();
if (state->has_transaction_in_progress() && tx_state) {
producer_info.tx_begin_offset = tx_state->first;
producer_info.tx_end_offset = tx_state->last;
producer_info.tx_seq = tx_state->sequence;
producer_info.tx_timeout = tx_state->timeout;
producer_info.coordinator_partition
= tx_state->coordinator_partition;
}
reply.producers.push_back(std::move(producer_info));
if (reply.producers.size() > request.max_producers_to_include) {
break;
}
}
co_return reply;
}

} // namespace cluster
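For reference, a hedged sketch of how a caller might drive the new producer listing. The request/reply fields (`ntp`, `max_producers_to_include`, `error_code`, `producers`, `inflight_requests`) are the ones used in the handler above; the surrounding Seastar/Redpanda wiring and the helper itself are assumed, not part of the PR:

```cpp
// Illustrative only: issue a producer listing against a local partition
// leader and tally the in-flight idempotent requests that came back.
ss::future<size_t>
count_inflight(cluster::rm_partition_frontend& frontend, model::ntp ntp) {
    cluster::get_producers_request request;
    request.ntp = std::move(ntp);
    request.max_producers_to_include = 100; // bound the reply size
    auto reply = co_await frontend.get_producers_locally(std::move(request));
    if (reply.error_code != cluster::tx::errc::none) {
        // Not the leader for this partition; the caller retries elsewhere.
        co_return 0;
    }
    size_t inflight = 0;
    for (const auto& producer : reply.producers) {
        inflight += producer.inflight_requests.size();
    }
    co_return inflight;
}
```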
2 changes: 2 additions & 0 deletions src/v/cluster/rm_partition_frontend.h
@@ -49,6 +49,8 @@ class rm_partition_frontend {
model::producer_identity,
model::tx_seq,
model::timeout_clock::duration);
ss::future<get_producers_reply>
get_producers_locally(get_producers_request);
ss::future<> stop() {
_as.request_abort();
return ss::make_ready_future<>();
5 changes: 3 additions & 2 deletions src/v/cluster/rm_stm.cc
@@ -1666,7 +1666,7 @@ model::offset rm_stm::to_log_offset(kafka::offset k_offset) const {
return model::offset(k_offset);
}

ss::future<>
ss::future<raft::local_snapshot_applied>
rm_stm::apply_local_snapshot(raft::stm_snapshot_header hdr, iobuf&& tx_ss_buf) {
auto units = co_await _state_lock.hold_write_lock();

@@ -1689,7 +1689,7 @@ rm_stm::apply_local_snapshot(raft::stm_snapshot_header hdr, iobuf&& tx_ss_buf) {
data = co_await serde::read_async<tx_snapshot_v6>(data_parser);
} else {
vlog(_ctx_log.error, "Ignored snapshot version {}", hdr.version);
co_return;
co_return raft::local_snapshot_applied::no;
}

_highest_producer_id = std::max(
@@ -1774,6 +1774,7 @@ rm_stm::apply_local_snapshot(raft::stm_snapshot_header hdr, iobuf&& tx_ss_buf) {
snapshot_opt.value());
}
}
co_return raft::local_snapshot_applied::yes;
}

uint8_t rm_stm::active_snapshot_version() {
2 changes: 1 addition & 1 deletion src/v/cluster/rm_stm.h
@@ -260,7 +260,7 @@ class rm_stm final : public raft::persisted_stm<> {
tx::producer_ptr,
std::optional<model::tx_seq>,
model::timeout_clock::duration);
ss::future<>
ss::future<raft::local_snapshot_applied>
apply_local_snapshot(raft::stm_snapshot_header, iobuf&&) override;
ss::future<raft::stm_snapshot>
take_local_snapshot(ssx::semaphore_units apply_units) override;
4 changes: 2 additions & 2 deletions src/v/cluster/tm_stm.cc
@@ -599,7 +599,7 @@ fragmented_vector<tx_metadata> tm_stm::get_transactions_list() const {
return ret;
}

ss::future<>
ss::future<raft::local_snapshot_applied>
tm_stm::apply_local_snapshot(raft::stm_snapshot_header hdr, iobuf&& tm_ss_buf) {
vassert(
hdr.version >= tm_snapshot_v0::version
@@ -624,7 +624,7 @@ tm_stm::apply_local_snapshot(raft::stm_snapshot_header hdr, iobuf&& tm_ss_buf) {
vlog(_ctx_log.trace, "Applied snapshot at offset: {}", hdr.offset);
}

return ss::now();
co_return raft::local_snapshot_applied::yes;
}

ss::future<raft::stm_snapshot>
2 changes: 1 addition & 1 deletion src/v/cluster/tm_stm.h
@@ -359,7 +359,7 @@ class tm_stm final : public raft::persisted_stm<> {

private:
std::optional<tx_metadata> find_tx(const kafka::transactional_id&);
ss::future<>
ss::future<raft::local_snapshot_applied>
apply_local_snapshot(raft::stm_snapshot_header, iobuf&&) override;
ss::future<raft::stm_snapshot>
take_local_snapshot(ssx::semaphore_units apply_units) override;
6 changes: 6 additions & 0 deletions src/v/cluster/tx_gateway.cc
@@ -119,4 +119,10 @@ ss::future<find_coordinator_reply> tx_gateway::find_coordinator(
co_return co_await _tx_gateway_frontend.local().find_coordinator(r.tid);
}

ss::future<get_producers_reply> tx_gateway::get_producers(
get_producers_request request, rpc::streaming_context&) {
co_return co_await _tx_gateway_frontend.local().get_producers(
std::move(request));
}

} // namespace cluster
3 changes: 3 additions & 0 deletions src/v/cluster/tx_gateway.h
@@ -71,6 +71,9 @@ class tx_gateway final : public tx_gateway_service {
ss::future<find_coordinator_reply> find_coordinator(
find_coordinator_request, rpc::streaming_context&) override;

ss::future<get_producers_reply>
get_producers(get_producers_request, rpc::streaming_context&) override;

private:
ss::sharded<cluster::tx_gateway_frontend>& _tx_gateway_frontend;
rm_group_proxy* _rm_group_proxy;
5 changes: 5 additions & 0 deletions src/v/cluster/tx_gateway.json
@@ -64,6 +64,11 @@
"name": "find_coordinator",
"input_type": "find_coordinator_request",
"output_type": "find_coordinator_reply"
},
{
"name": "get_producers",
"input_type": "get_producers_request",
"output_type": "get_producers_reply"
}
]
}