From a98a5d3fb32d1022c5e40bb6971a3c2031222838 Mon Sep 17 00:00:00 2001 From: Bharath Vissapragada Date: Thu, 19 Dec 2024 22:57:45 -0800 Subject: [PATCH 01/20] tx/group: track begin offset of transactions (cherry picked from commit 89478480b40f34d4c694f673b4ab68df29a4d89e) --- src/v/kafka/server/group.cc | 9 ++++++--- src/v/kafka/server/group.h | 6 +++++- src/v/kafka/server/group_manager.cc | 2 +- src/v/kafka/server/group_recovery_consumer.cc | 6 ++++-- src/v/kafka/server/group_stm.cc | 4 +++- src/v/kafka/server/group_stm.h | 4 +++- 6 files changed, 22 insertions(+), 9 deletions(-) diff --git a/src/v/kafka/server/group.cc b/src/v/kafka/server/group.cc index 9060897a5e367..a4c56810fbcf5 100644 --- a/src/v/kafka/server/group.cc +++ b/src/v/kafka/server/group.cc @@ -175,11 +175,13 @@ bool group::valid_previous_state(group_state s) const { group::ongoing_transaction::ongoing_transaction( model::tx_seq tx_seq, model::partition_id coordinator_partition, - model::timeout_clock::duration tx_timeout) + model::timeout_clock::duration tx_timeout, + model::offset begin_offset) : tx_seq(tx_seq) , coordinator_partition(coordinator_partition) , timeout(tx_timeout) - , last_update(model::timeout_clock::now()) {} + , last_update(model::timeout_clock::now()) + , begin_offset(begin_offset) {} group::tx_producer::tx_producer(model::producer_epoch epoch) : epoch(epoch) {} @@ -1878,7 +1880,8 @@ group::begin_tx(cluster::begin_group_tx_request r) { r.pid.get_id(), r.pid.get_epoch()); producer_it->second.epoch = r.pid.get_epoch(); producer_it->second.transaction = std::make_unique( - ongoing_transaction(r.tx_seq, r.tm_partition, r.timeout)); + ongoing_transaction( + r.tx_seq, r.tm_partition, r.timeout, result.value().last_offset)); try_arm(producer_it->second.transaction->deadline()); diff --git a/src/v/kafka/server/group.h b/src/v/kafka/server/group.h index 432f85bcef0da..2da96be790bc4 100644 --- a/src/v/kafka/server/group.h +++ b/src/v/kafka/server/group.h @@ -166,7 +166,10 @@ class group final : public ss::enable_lw_shared_from_this { */ struct ongoing_transaction { ongoing_transaction( - model::tx_seq, model::partition_id, model::timeout_clock::duration); + model::tx_seq, + model::partition_id, + model::timeout_clock::duration, + model::offset); model::tx_seq tx_seq; model::partition_id coordinator_partition; @@ -175,6 +178,7 @@ class group final : public ss::enable_lw_shared_from_this { model::timeout_clock::time_point last_update; bool is_expiration_requested{false}; + model::offset begin_offset{-1}; model::timeout_clock::time_point deadline() const { return last_update + timeout; diff --git a/src/v/kafka/server/group_manager.cc b/src/v/kafka/server/group_manager.cc index 8c3469dac357b..63ed6c9ecf15f 100644 --- a/src/v/kafka/server/group_manager.cc +++ b/src/v/kafka/server/group_manager.cc @@ -978,7 +978,7 @@ ss::future<> group_manager::do_recover_group( if (session.tx) { auto& tx = *session.tx; group::ongoing_transaction group_tx( - tx.tx_seq, tx.tm_partition, tx.timeout); + tx.tx_seq, tx.tm_partition, tx.timeout, tx.begin_offset); for (auto& [tp, o_md] : tx.offsets) { group_tx.offsets[tp] = group::pending_tx_offset{ .offset_metadata = group_tx::partition_offset{ diff --git a/src/v/kafka/server/group_recovery_consumer.cc b/src/v/kafka/server/group_recovery_consumer.cc index 8f1473748f1a3..b40bb1d4ffcb6 100644 --- a/src/v/kafka/server/group_recovery_consumer.cc +++ b/src/v/kafka/server/group_recovery_consumer.cc @@ -71,7 +71,8 @@ ss::future<> group_recovery_consumer::handle_fence_v1( pid.get_epoch(), 
data.tx_seq, data.transaction_timeout_ms, - model::partition_id(0)); + model::partition_id(0), + header.base_offset); co_return; } @@ -92,7 +93,8 @@ ss::future<> group_recovery_consumer::handle_fence( pid.get_epoch(), data.tx_seq, data.transaction_timeout_ms, - data.tm_partition); + data.tm_partition, + header.base_offset); co_return; } diff --git a/src/v/kafka/server/group_stm.cc b/src/v/kafka/server/group_stm.cc index 983974a56bf71..75ad1fce7a93c 100644 --- a/src/v/kafka/server/group_stm.cc +++ b/src/v/kafka/server/group_stm.cc @@ -104,7 +104,8 @@ void group_stm::try_set_fence( model::producer_epoch epoch, model::tx_seq txseq, model::timeout_clock::duration transaction_timeout_ms, - model::partition_id tm_partition) { + model::partition_id tm_partition, + model::offset fence_offset) { auto [it, _] = _producers.try_emplace(id, epoch); if (it->second.epoch <= epoch) { it->second.epoch = epoch; @@ -112,6 +113,7 @@ void group_stm::try_set_fence( .tx_seq = txseq, .tm_partition = tm_partition, .timeout = transaction_timeout_ms, + .begin_offset = fence_offset, .offsets = {}, }); } diff --git a/src/v/kafka/server/group_stm.h b/src/v/kafka/server/group_stm.h index d9a99c136c952..b53bc0bbe8cb9 100644 --- a/src/v/kafka/server/group_stm.h +++ b/src/v/kafka/server/group_stm.h @@ -35,6 +35,7 @@ class group_stm { model::tx_seq tx_seq; model::partition_id tm_partition; model::timeout_clock::duration timeout; + model::offset begin_offset{-1}; chunked_hash_map offsets; }; @@ -57,7 +58,8 @@ class group_stm { model::producer_epoch epoch, model::tx_seq txseq, model::timeout_clock::duration transaction_timeout_ms, - model::partition_id tm_partition); + model::partition_id tm_partition, + model::offset fence_offset); bool has_data() const { return !_is_removed && (_is_loaded || _offsets.size() > 0); From e71e1f749c3a3bdacf753ed6b64296fb83473ffd Mon Sep 17 00:00:00 2001 From: Bharath Vissapragada Date: Mon, 23 Dec 2024 16:01:10 -0800 Subject: [PATCH 02/20] tx/group: support describe_producers for group (cherry picked from commit cdd274a2a53100d64b16d57f7d713b003127fbf6) --- src/v/kafka/server/group.h | 5 +- src/v/kafka/server/group_manager.cc | 59 +++++++++++++++++++ src/v/kafka/server/group_manager.h | 4 ++ .../server/handlers/describe_producers.cc | 15 ++++- 4 files changed, 79 insertions(+), 4 deletions(-) diff --git a/src/v/kafka/server/group.h b/src/v/kafka/server/group.h index 2da96be790bc4..7837f6b629a12 100644 --- a/src/v/kafka/server/group.h +++ b/src/v/kafka/server/group.h @@ -202,6 +202,8 @@ class group final : public ss::enable_lw_shared_from_this { std::unique_ptr transaction; }; + using producers_map = chunked_hash_map; + struct offset_metadata { model::offset log_offset; model::offset offset; @@ -659,6 +661,8 @@ class group final : public ss::enable_lw_shared_from_this { } } + const producers_map& producers() const { return _producers; } + // helper for the kafka api: describe groups described_group describe() const; @@ -715,7 +719,6 @@ class group final : public ss::enable_lw_shared_from_this { private: using member_map = absl::node_hash_map; using protocol_support = absl::node_hash_map; - using producers_map = chunked_hash_map; friend std::ostream& operator<<(std::ostream&, const group&); diff --git a/src/v/kafka/server/group_manager.cc b/src/v/kafka/server/group_manager.cc index 63ed6c9ecf15f..72848c754b6ba 100644 --- a/src/v/kafka/server/group_manager.cc +++ b/src/v/kafka/server/group_manager.cc @@ -29,6 +29,7 @@ #include "kafka/server/group.h" #include "kafka/server/group_metadata.h" 
#include "kafka/server/group_recovery_consumer.h" +#include "kafka/server/group_tx_tracker_stm.h" #include "kafka/server/logger.h" #include "model/fundamental.h" #include "model/namespace.h" @@ -1620,6 +1621,64 @@ group_manager::describe_group(const model::ntp& ntp, const kafka::group_id& g) { return group->describe(); } +group_manager::partition_producers +group_manager::describe_partition_producers(const model::ntp& ntp) { + vlog(klog.debug, "describe producers: {}", ntp); + partition_producers response; + response.partition_index = ntp.tp.partition; + auto it = _partitions.find(ntp); + if (it == _partitions.end() || !it->second->partition->is_leader()) { + response.error_code = error_code::not_leader_for_partition; + return response; + } + response.error_code = kafka::error_code::none; + // snapshot the list of groups attached to this partition + chunked_vector> groups; + std::copy_if( + _groups.begin(), + _groups.end(), + std::back_inserter(groups), + [&ntp](const auto& g_pair) { + const auto& [group_id, group] = g_pair; + return group->partition()->ntp() == ntp; + }); + for (auto& [gid, group] : groups) { + if (group->in_state(group_state::dead)) { + continue; + } + auto partition = group->partition(); + if (!partition) { + // unlikely, conservative check + continue; + } + for (const auto& [id, state] : group->producers()) { + auto& tx = state.transaction; + int64_t start_offset = -1; + if (tx && tx->begin_offset >= model::offset{0}) { + start_offset = partition->get_offset_translator_state() + ->from_log_offset(tx->begin_offset); + } + int64_t last_timetamp = -1; + if (tx) { + auto time_since_last_update = model::timeout_clock::now() + - tx->last_update; + auto last_update_ts + = (model::timestamp_clock::now() - time_since_last_update); + last_timetamp = last_update_ts.time_since_epoch() / 1ms; + } + response.active_producers.push_back({ + .producer_id = id, + .producer_epoch = state.epoch, + .last_sequence = tx ? 
tx->tx_seq : -1, + .last_timestamp = last_timetamp, + .coordinator_epoch = -1, + .current_txn_start_offset = start_offset, + }); + } + } + return response; +} + ss::future> group_manager::delete_groups( std::vector> groups) { std::vector results; diff --git a/src/v/kafka/server/group_manager.h b/src/v/kafka/server/group_manager.h index ebfdb3be1d503..64b45779af3ad 100644 --- a/src/v/kafka/server/group_manager.h +++ b/src/v/kafka/server/group_manager.h @@ -26,6 +26,7 @@ #include "kafka/protocol/offset_delete.h" #include "kafka/protocol/offset_fetch.h" #include "kafka/protocol/schemata/delete_groups_response.h" +#include "kafka/protocol/schemata/describe_producers_response.h" #include "kafka/protocol/schemata/list_groups_response.h" #include "kafka/protocol/sync_group.h" #include "kafka/protocol/txn_offset_commit.h" @@ -184,6 +185,9 @@ class group_manager { described_group describe_group(const model::ntp&, const kafka::group_id&); + using partition_producers = partition_response; + partition_response describe_partition_producers(const model::ntp&); + ss::future> delete_groups(std::vector>); diff --git a/src/v/kafka/server/handlers/describe_producers.cc b/src/v/kafka/server/handlers/describe_producers.cc index c03f64bfd0f78..9a087551bdae6 100644 --- a/src/v/kafka/server/handlers/describe_producers.cc +++ b/src/v/kafka/server/handlers/describe_producers.cc @@ -17,6 +17,8 @@ #include "kafka/protocol/errors.h" #include "kafka/protocol/produce.h" #include "kafka/protocol/schemata/describe_producers_response.h" +#include "kafka/server/group_manager.h" +#include "kafka/server/group_router.h" #include "kafka/server/handlers/details/security.h" #include "kafka/server/request_context.h" #include "kafka/server/response.h" @@ -41,8 +43,8 @@ make_error_response(model::partition_id id, kafka::error_code ec) { return partition_response{.partition_index = id, .error_code = ec}; } -partition_response -do_get_producers_for_partition(cluster::partition_manager& pm, model::ktp ntp) { +partition_response do_get_producers_for_data_partition( + cluster::partition_manager& pm, model::ktp ntp) { auto partition = pm.get(ntp); if (!partition || !partition->is_leader()) { return make_error_response( @@ -97,9 +99,16 @@ get_producers_for_partition(request_context& ctx, model::ktp ntp) { ntp.get_partition(), kafka::error_code::not_leader_for_partition); } + if (ntp.get_topic() == model::kafka_consumer_offsets_topic) { + co_return co_await ctx.groups().get_group_manager().invoke_on( + *shard, [ktp = std::move(ntp)](kafka::group_manager& gm) mutable { + return gm.describe_partition_producers(ktp.to_ntp()); + }); + } + co_return co_await ctx.partition_manager().invoke_on( *shard, [ntp = std::move(ntp)](cluster::partition_manager& pm) mutable { - return do_get_producers_for_partition(pm, std::move(ntp)); + return do_get_producers_for_data_partition(pm, std::move(ntp)); }); } From 5dab07b2cf0dd8044892610cbf4a08453a176f41 Mon Sep 17 00:00:00 2001 From: Bharath Vissapragada Date: Fri, 20 Dec 2024 01:15:49 -0800 Subject: [PATCH 03/20] tx/tests/dt: test for describe producers (cherry picked from commit 24c8e89f4f8e857840a326c7ba7a0e46679977bb) --- tests/rptest/clients/kafka_cli_tools.py | 2 +- tests/rptest/tests/describe_producers_test.py | 63 +++++++++++++++---- 2 files changed, 53 insertions(+), 12 deletions(-) diff --git a/tests/rptest/clients/kafka_cli_tools.py b/tests/rptest/clients/kafka_cli_tools.py index e76fa02f95803..e4810c52e19c7 100644 --- a/tests/rptest/clients/kafka_cli_tools.py +++ 
b/tests/rptest/clients/kafka_cli_tools.py @@ -478,7 +478,7 @@ def describe_producers(self, topic: str, partition: int): split_str = res.split("\n") info_str = split_str[0] info_key = info_str.strip().split("\t") - assert info_key == expected_columns, f"{info_key}" + assert info_key == expected_columns, f"{info_key} vs {expected_columns}" assert split_str[-1] == "" diff --git a/tests/rptest/tests/describe_producers_test.py b/tests/rptest/tests/describe_producers_test.py index ff5113c451860..9c75005ddfad1 100644 --- a/tests/rptest/tests/describe_producers_test.py +++ b/tests/rptest/tests/describe_producers_test.py @@ -8,6 +8,7 @@ # by the Apache License, Version 2.0 import random +import string import time from rptest.services.cluster import cluster from rptest.clients.types import TopicSpec @@ -18,6 +19,8 @@ from rptest.clients.rpk import RpkTool from rptest.clients.rpk import parse_rpk_table from rptest.util import wait_until_result +from ducktape.mark import matrix +from confluent_kafka import TopicPartition class DescribeProducersTest(RedpandaTest): @@ -31,23 +34,49 @@ def __init__(self, test_context): self.kafka_cli = KafkaCliTools(self.redpanda, "3.0.0") - def _describe_all_producers(self): + def _describe_all_producers(self, include_group_partitions: bool): all_producers = [] for topic in self.topics: for partition in range(topic.partition_count): producers = self.kafka_cli.describe_producers( topic.name, partition) all_producers += producers + if include_group_partitions: + for partition in range(0, 16): + all_producers += self.kafka_cli.describe_producers( + "__consumer_offsets", partition) return all_producers def _check_timestamp(self, producer_desc, range_start, range_end): + ts = int(producer_desc['LastTimestamp']) # convert to python representation of epoch - ts = int(producer_desc['LastTimestamp']) / 1000.0 - assert ts >= range_start and ts <= range_end, \ + ts_epoch = ts / 1000.0 + assert ts == -1 or (ts_epoch >= range_start and ts_epoch <= range_end), \ f"Producer timestamp must correspond to system clock. 
Returned timestamp: {ts}, range: [{range_start}, {range_end}]" + def _random_group_name(self): + return ''.join( + random.choice(string.ascii_uppercase) for _ in range(16)) + @cluster(num_nodes=3) - def test_describe_producer_with_tx(self): + @matrix(include_group_tx=[True, False]) + def test_describe_producer_with_tx(self, include_group_tx): + + consumers = [] + if include_group_tx: + for _ in range(5): + c = ck.Consumer({ + 'bootstrap.servers': self.redpanda.brokers(), + 'group.id': self._random_group_name(), + 'auto.offset.reset': 'earliest', + 'enable.auto.commit': False, + 'max.poll.interval.ms': 10000, + 'session.timeout.ms': 8000 + }) + c.subscribe([topic.name for topic in self.topics]) + c.consume(1, 1) + consumers.append(c) + before = time.time() producer_count = 20 producers = [] @@ -59,7 +88,7 @@ def test_describe_producer_with_tx(self): producer.init_transactions() producers.append(producer) - all_producers_desc = self._describe_all_producers() + all_producers_desc = self._describe_all_producers(include_group_tx) assert len(all_producers_desc ) == 0, "Before producing data producers should be empty" @@ -69,11 +98,23 @@ def test_describe_producer_with_tx(self): producer.produce(self.topics[idx % len(self.topics)].name, f'key-{idx}', f'value-{idx}', idx % self.partition_count) + + if include_group_tx: + assert consumers + c = random.choice(consumers) + dummy = [ + TopicPartition(topic.name, 0, 100) for topic in self.topics + ] + producer.send_offsets_to_transaction( + dummy, c.consumer_group_metadata()) + producer.flush() + expected_producer_count = 2 * producer_count if include_group_tx else producer_count + def _all_producers(): - all = self._describe_all_producers() - if len(all) == producer_count: + all = self._describe_all_producers(include_group_tx) + if len(all) == expected_producer_count: return (True, all) return (False, None) @@ -82,7 +123,7 @@ def _all_producers(): after = time.time() assert len( all_producers_desc - ) == producer_count, f"Unexpected size of producers list, expected: {producer_count}, current: {len(all_producers_desc)}" + ) == expected_producer_count, f"Unexpected size of producers list, expected: {expected_producer_count}, current: {len(all_producers_desc)}" for p in all_producers_desc: self.logger.info(f"producer state with transaction ongoing: {p}") self._check_timestamp(p, before, after) @@ -95,11 +136,11 @@ def _all_producers(): producer.commit_transaction() producer.flush() - all_producers_desc = self._describe_all_producers() + all_producers_desc = self._describe_all_producers(include_group_tx) after = time.time() assert len( all_producers_desc - ) == producer_count, f"Unexpected size of producers list, expected: {producer_count}, current: {len(all_producers_desc)}" + ) == expected_producer_count, f"Unexpected size of producers list, expected: {expected_producer_count}, current: {len(all_producers_desc)}" for p in all_producers_desc: self.logger.info(f"producer state with transaction committed: {p}") self._check_timestamp(p, before, after) @@ -118,7 +159,7 @@ def test_describe_idempotent_producers(self): "test-msg", partition=i % self.partition_count) - all_producers = self._describe_all_producers() + all_producers = self._describe_all_producers(False) after = time.time() assert len( From 266e86e596763b6971239b457b8157339316ebcb Mon Sep 17 00:00:00 2001 From: Bharath Vissapragada Date: Fri, 20 Dec 2024 03:40:46 -0800 Subject: [PATCH 04/20] tx/groups: escape hatch for unsafe aborting of group transactions This is unsafe because it does not do any 
required checks to see if a particular transaction is in progress and is a candidate for abort. For example if a transaction is committed by the coordinator and pending commit on the group, using this escape hatch to abort the transaction can cause correctness issues. To be used with caution as an escape hatch for aborting transactions that the group has lost track of are ok to be aborted. This situation usually is indicative of a bug in the transaction implementation. (cherry picked from commit 8c5ecca7b592d4ddc9b1f2c61a8ebc89d153604f) --- src/v/cluster/tx_gateway_frontend.cc | 19 ++++ src/v/cluster/tx_gateway_frontend.h | 14 +++ src/v/redpanda/admin/api-doc/transaction.json | 40 +++++++++ src/v/redpanda/admin/server.h | 2 + src/v/redpanda/admin/transaction.cc | 82 ++++++++++++++++++ tests/rptest/services/admin.py | 12 +++ .../rptest/transactions/transactions_test.py | 86 +++++++++++++++++++ 7 files changed, 255 insertions(+) diff --git a/src/v/cluster/tx_gateway_frontend.cc b/src/v/cluster/tx_gateway_frontend.cc index 7771413e86f04..f56bf38a8749f 100644 --- a/src/v/cluster/tx_gateway_frontend.cc +++ b/src/v/cluster/tx_gateway_frontend.cc @@ -2961,4 +2961,23 @@ ss::future tx_gateway_frontend::do_delete_partition_from_tx( co_return tx::errc::none; } +ss::future tx_gateway_frontend::unsafe_abort_group_transaction( + kafka::group_id group, + model::producer_identity pid, + model::tx_seq tx_seq, + model::timeout_clock::duration timeout) { + auto holder = _gate.hold(); + vlog( + txlog.warn, + "Issuing an unsafe abort of group transaction, group: {}, pid: {}, seq: " + "{}, timeout: {}", + group, + pid, + tx_seq, + timeout); + auto result = co_await _rm_group_proxy->abort_group_tx( + std::move(group), pid, tx_seq, timeout); + co_return result.ec; +} + } // namespace cluster diff --git a/src/v/cluster/tx_gateway_frontend.h b/src/v/cluster/tx_gateway_frontend.h index ff1ee0931cbf3..0962bd1fd11ca 100644 --- a/src/v/cluster/tx_gateway_frontend.h +++ b/src/v/cluster/tx_gateway_frontend.h @@ -72,6 +72,20 @@ class tx_gateway_frontend final ss::future find_coordinator(kafka::transactional_id); + // This is unsafe because it does not do any required checks to see + // if a particular transaction is in progress and is a candidate for abort. + // For example if a transaction is committed by the coordinator and pending + // commit on the group, using this escape hatch to abort the transaction + // can cause correctness issues. To be used with caution as an escape hatch + // for aborting transactions that the group has lost track of are ok to + // be aborted. This situation usually is indicative of a bug in the + // implementation. 
+ ss::future unsafe_abort_group_transaction( + kafka::group_id, + model::producer_identity, + model::tx_seq, + model::timeout_clock::duration); + ss::future<> stop(); private: diff --git a/src/v/redpanda/admin/api-doc/transaction.json b/src/v/redpanda/admin/api-doc/transaction.json index b1ae401f3a8ee..10850e31fb24d 100644 --- a/src/v/redpanda/admin/api-doc/transaction.json +++ b/src/v/redpanda/admin/api-doc/transaction.json @@ -92,6 +92,46 @@ ] } ] + }, + { + "path": "/v1/transaction/unsafe_abort_group_transaction/{group_id}", + "operations": [ + { + "method": "POST", + "summary": "Unsafely abort a transaction from a group, only for debugging", + "type": "void", + "nickname": "unsafe_abort_group_transaction", + "produces": [ + "application/json" + ], + "parameters": [ + { + "name": "group_id", + "in": "path", + "required": true, + "type": "string" + }, + { + "name": "producer_id", + "in": "query", + "required": true, + "type": "integer" + }, + { + "name": "producer_epoch", + "in": "query", + "required": true, + "type": "integer" + }, + { + "name": "sequence", + "in": "query", + "required": true, + "type": "integer" + } + ] + } + ] } ], "models": { diff --git a/src/v/redpanda/admin/server.h b/src/v/redpanda/admin/server.h index 1be803acb242d..24ed095c8b90f 100644 --- a/src/v/redpanda/admin/server.h +++ b/src/v/redpanda/admin/server.h @@ -550,6 +550,8 @@ class admin_server { delete_partition_handler(std::unique_ptr); ss::future find_tx_coordinator_handler(std::unique_ptr); + ss::future + unsafe_abort_group_transaction(std::unique_ptr); /// Cluster routes ss::future diff --git a/src/v/redpanda/admin/transaction.cc b/src/v/redpanda/admin/transaction.cc index fd0079fa3e9c3..af282290729ef 100644 --- a/src/v/redpanda/admin/transaction.cc +++ b/src/v/redpanda/admin/transaction.cc @@ -11,6 +11,8 @@ #include "cluster/partition_manager.h" #include "cluster/tx_gateway_frontend.h" #include "container/lw_shared_container.h" +#include "kafka/server/coordinator_ntp_mapper.h" +#include "kafka/server/server.h" #include "redpanda/admin/api-doc/transaction.json.hh" #include "redpanda/admin/server.h" #include "redpanda/admin/util.h" @@ -36,6 +38,12 @@ void admin_server::register_transaction_routes() { [this](std::unique_ptr req) { return find_tx_coordinator_handler(std::move(req)); }); + + register_route( + ss::httpd::transaction_json::unsafe_abort_group_transaction, + [this](std::unique_ptr req) { + return unsafe_abort_group_transaction(std::move(req)); + }); } ss::future @@ -213,3 +221,77 @@ admin_server::delete_partition_handler(std::unique_ptr req) { co_await throw_on_error(*req, res, ntp); co_return ss::json::json_return_type(ss::json::json_void()); } + +ss::future +admin_server::unsafe_abort_group_transaction( + std::unique_ptr request) { + if (!_tx_gateway_frontend.local_is_initialized()) { + throw ss::httpd::bad_request_exception("Transaction are disabled"); + } + + auto group_id = request->get_path_param("group_id"); + auto pid_str = request->get_query_param("producer_id"); + auto epoch_str = request->get_query_param("producer_epoch"); + auto sequence_str = request->get_query_param("sequence"); + + if (group_id.empty()) { + throw ss::httpd::not_found_exception("group_id cannot be empty"); + } + + if (pid_str.empty() || epoch_str.empty() || sequence_str.empty()) { + throw ss::httpd::bad_param_exception(fmt::format( + "invalid producer_id({})/epoch({})/sequence({}), should be integers " + ">= 0", + pid_str, + epoch_str, + sequence_str)); + } + + std::optional pid; + try { + auto parsed_pid = 
boost::lexical_cast( + pid_str); + pid = model::producer_id{parsed_pid}; + } catch (const boost::bad_lexical_cast& e) { + throw ss::httpd::bad_param_exception( + fmt::format("invalid producer_id: {}, should be >= 0", pid_str)); + } + + std::optional epoch; + try { + auto parsed_epoch = boost::lexical_cast( + epoch_str); + epoch = model::producer_epoch{parsed_epoch}; + } catch (const boost::bad_lexical_cast& e) { + throw ss::httpd::bad_param_exception( + fmt::format("invalid producer_epoch {}, should be >= 0", epoch_str)); + } + + std::optional seq; + try { + auto parsed_seq = boost::lexical_cast( + sequence_str); + seq = model::tx_seq{parsed_seq}; + } catch (const boost::bad_lexical_cast& e) { + throw ss::httpd::bad_param_exception(fmt::format( + "invalid transaction sequence {}, should be >= 0", sequence_str)); + } + + auto& mapper = _kafka_server.local().coordinator_mapper(); + auto kafka_gid = kafka::group_id{group_id}; + auto group_ntp = mapper.ntp_for(kafka::group_id{group_id}); + if (!group_ntp) { + throw ss::httpd::server_error_exception( + "consumer_offsets topic not found"); + } + + auto result + = co_await _tx_gateway_frontend.local().unsafe_abort_group_transaction( + std::move(kafka_gid), + model::producer_identity{pid.value(), epoch.value()}, + seq.value(), + 5s); + + co_await throw_on_error(*request, result, group_ntp.value()); + co_return ss::json::json_return_type(ss::json::json_void()); +} diff --git a/tests/rptest/services/admin.py b/tests/rptest/services/admin.py index d41fa1f510efb..f42b21dd5b23d 100644 --- a/tests/rptest/services/admin.py +++ b/tests/rptest/services/admin.py @@ -1709,3 +1709,15 @@ def delete_data_migration(self, path = f"migrations/{migration_id}" return self._request("DELETE", path, node=node) + + def unsafe_abort_group_transaction(self, group_id: str, *, pid: int, + epoch: int, sequence: int): + params = { + "producer_id": pid, + "producer_epoch": epoch, + "sequence": sequence, + } + params = "&".join([f"{k}={v}" for k, v in params.items()]) + return self._request( + 'POST', + f"transaction/unsafe_abort_group_transaction/{group_id}?{params}") diff --git a/tests/rptest/transactions/transactions_test.py b/tests/rptest/transactions/transactions_test.py index da8bb7b383480..65445db014694 100644 --- a/tests/rptest/transactions/transactions_test.py +++ b/tests/rptest/transactions/transactions_test.py @@ -918,6 +918,92 @@ def _produce_one(producer, idx): assert num_consumed == expected_records + @cluster(num_nodes=3) + def unsafe_abort_group_transaction_test(self): + def random_group_name(): + return ''.join( + random.choice(string.ascii_uppercase) for _ in range(16)) + + def wait_for_active_producers(count: int): + def describe_active_producers(): + active_producers = [] + for partition in range(0, 16): + desc = self.kafka_cli.describe_producers( + "__consumer_offsets", partition) + for producer in desc: + tx_start_offset = producer[ + 'CurrentTransactionStartOffset'] + if 'None' in tx_start_offset: + continue + if int(tx_start_offset) >= 0: + active_producers.append(producer) + return active_producers + + wait_until( + lambda: len(describe_active_producers()) == count, + timeout_sec=30, + backoff_sec=1, + err_msg=f"Timed out waiting for producer count to reach {count}" + ) + + group_name = random_group_name() + input_records = 10 + self.generate_data(self.input_t, input_records) + + # setup consumer offsets + rpk = RpkTool(self.redpanda) + rpk.consume(topic=self.input_t.name, n=1, group="test-group") + + wait_for_active_producers(0) + + # Setup a consumer 
to consume from ^^ topic and + # produce to a target topic. + producer_conf = { + 'bootstrap.servers': self.redpanda.brokers(), + 'transactional.id': 'test-repro', + # Large-ish timeout + 'transaction.timeout.ms': 300000, + } + producer = ck.Producer(producer_conf) + consumer_conf = { + 'bootstrap.servers': self.redpanda.brokers(), + 'group.id': group_name, + 'auto.offset.reset': 'earliest', + 'enable.auto.commit': False, + } + consumer = ck.Consumer(consumer_conf) + consumer.subscribe([self.input_t]) + + # Consume - identity transform - produce + producer.init_transactions() + _ = self.consume(consumer) + # Start a transaction and flush some offsets + producer.begin_transaction() + producer.send_offsets_to_transaction( + consumer.position(consumer.assignment()), + consumer.consumer_group_metadata()) + producer.flush() + + wait_until(lambda: len(self.admin.get_all_transactions()) == 1, + timeout_sec=30, + backoff_sec=1, + err_msg="Timed out waiting for transaction to appear") + + wait_for_active_producers(1) + + self.admin.unsafe_abort_group_transaction(group_id=group_name, + pid=1, + epoch=0, + sequence=0) + wait_for_active_producers(0) + producer.commit_transaction() + + wait_until( + lambda: self.no_running_transactions(), + timeout_sec=30, + backoff_sec=1, + err_msg="Timed out waiting for running transactions to wind down.") + class TransactionsStreamsTest(RedpandaTest, TransactionsMixin): topics = (TopicSpec(partition_count=1, replication_factor=3), From e0bf89fad5c4546c4c241cfbb4233b4b6a20465c Mon Sep 17 00:00:00 2001 From: Bharath Vissapragada Date: Mon, 23 Dec 2024 16:36:40 -0800 Subject: [PATCH 05/20] group_tx_tracker/stm: plumb feature_table into the stm (cherry picked from commit 3736f00b6a4879166b49ccfba64509ab5571989c) --- src/v/kafka/server/group_tx_tracker_stm.cc | 15 ++++++++++++--- src/v/kafka/server/group_tx_tracker_stm.h | 11 +++++++++-- .../server/tests/consumer_group_recovery_test.cc | 2 +- src/v/redpanda/application.cc | 3 ++- 4 files changed, 24 insertions(+), 7 deletions(-) diff --git a/src/v/kafka/server/group_tx_tracker_stm.cc b/src/v/kafka/server/group_tx_tracker_stm.cc index d8954fdaf19e6..cba381b9db6de 100644 --- a/src/v/kafka/server/group_tx_tracker_stm.cc +++ b/src/v/kafka/server/group_tx_tracker_stm.cc @@ -14,9 +14,12 @@ namespace kafka { group_tx_tracker_stm::group_tx_tracker_stm( - ss::logger& logger, raft::consensus* raft) + ss::logger& logger, + raft::consensus* raft, + ss::sharded& feature_table) : raft::persisted_stm<>("group_tx_tracker_stm.snapshot", logger, raft) - , group_data_parser() {} + , group_data_parser() + , _feature_table(feature_table) {} void group_tx_tracker_stm::maybe_add_tx_begin_offset( kafka::group_id group, model::producer_identity pid, model::offset offset) { @@ -166,9 +169,15 @@ bool group_tx_tracker_stm_factory::is_applicable_for( return ntp.ns == model::kafka_consumer_offsets_nt.ns && ntp.tp.topic == model::kafka_consumer_offsets_nt.tp; } + +group_tx_tracker_stm_factory::group_tx_tracker_stm_factory( + ss::sharded& feature_table) + : _feature_table(feature_table) {} + void group_tx_tracker_stm_factory::create( raft::state_machine_manager_builder& builder, raft::consensus* raft) { - auto stm = builder.create_stm(klog, raft); + auto stm = builder.create_stm( + klog, raft, _feature_table); raft->log()->stm_manager()->add_stm(stm); } diff --git a/src/v/kafka/server/group_tx_tracker_stm.h b/src/v/kafka/server/group_tx_tracker_stm.h index efb54034cfe0e..130bfc0044077 100644 --- a/src/v/kafka/server/group_tx_tracker_stm.h +++ 
b/src/v/kafka/server/group_tx_tracker_stm.h @@ -24,7 +24,8 @@ class group_tx_tracker_stm final public: static constexpr std::string_view name = "group_tx_tracker_stm"; - group_tx_tracker_stm(ss::logger&, raft::consensus*); + group_tx_tracker_stm( + ss::logger&, raft::consensus*, ss::sharded&); storage::stm_type type() override { return storage::stm_type::consumer_offsets_transactional; @@ -107,13 +108,19 @@ class group_tx_tracker_stm final void maybe_end_tx(kafka::group_id, model::producer_identity); all_txs_t _all_txs; + + ss::sharded& _feature_table; }; class group_tx_tracker_stm_factory : public cluster::state_machine_factory { public: - group_tx_tracker_stm_factory() = default; + explicit group_tx_tracker_stm_factory( + ss::sharded&); bool is_applicable_for(const storage::ntp_config&) const final; void create(raft::state_machine_manager_builder&, raft::consensus*) final; + +private: + ss::sharded& _feature_table; }; } // namespace kafka diff --git a/src/v/kafka/server/tests/consumer_group_recovery_test.cc b/src/v/kafka/server/tests/consumer_group_recovery_test.cc index a934381ec884e..29fc4e102a69e 100644 --- a/src/v/kafka/server/tests/consumer_group_recovery_test.cc +++ b/src/v/kafka/server/tests/consumer_group_recovery_test.cc @@ -209,7 +209,7 @@ struct cg_recovery_test_fixture : seastar_test { model::record_batch make_tx_fence_batch( const model::producer_identity& pid, group_tx::fence_metadata cmd) { return make_tx_batch( - model::record_batch_type::tx_fence, + model::record_batch_type::group_fence_tx, group::fence_control_record_version, pid, std::move(cmd)); diff --git a/src/v/redpanda/application.cc b/src/v/redpanda/application.cc index 10e9464440758..ea4b2b81bf310 100644 --- a/src/v/redpanda/application.cc +++ b/src/v/redpanda/application.cc @@ -2748,7 +2748,8 @@ void application::start_runtime_services( cloud_storage_api, feature_table, controller->get_topics_state()); - pm.register_factory(); + pm.register_factory( + feature_table); }) .get(); partition_manager.invoke_on_all(&cluster::partition_manager::start).get(); From 6692d1b5491893597e556e41d7ebea6cd83c3142 Mon Sep 17 00:00:00 2001 From: Bharath Vissapragada Date: Thu, 2 Jan 2025 09:40:05 -0800 Subject: [PATCH 06/20] k/group_data_parser: reduce ignored batch logging to debug (cherry picked from commit 835f3fce5dbb78e91ef081c8ac2a470fc1312f3f) --- src/v/kafka/server/group_data_parser.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/v/kafka/server/group_data_parser.h b/src/v/kafka/server/group_data_parser.h index 753cf42cca657..cf596a3bbf304 100644 --- a/src/v/kafka/server/group_data_parser.h +++ b/src/v/kafka/server/group_data_parser.h @@ -111,7 +111,7 @@ class group_data_parser { std::move(b)); return handle_version_fence(fence); } - vlog(klog.warn, "ignoring batch with type: {}", b.header().type); + vlog(klog.debug, "ignoring batch with type: {}", b.header().type); return ss::make_ready_future<>(); } From 5f37e76808d14f38688f219c5c33c73d9a15eabe Mon Sep 17 00:00:00 2001 From: Bharath Vissapragada Date: Mon, 23 Dec 2024 16:37:02 -0800 Subject: [PATCH 07/20] tx/group/group_tx_tracker: tighten up computation of collectible_offsset Consider group_metadata to determine if a group transaction should be considered open. Eg: if a group if tombstoned, any transaction corresponding to the group is ignored. 
This invariant is also held in the actual group stm to ensure groups are not tombstoned before any pending transactions are cleaned up (cherry picked from commit 9eee6326c5324009713bf582de4a1e04fe4e1a0d) --- src/v/kafka/server/group_stm.cc | 4 +- src/v/kafka/server/group_tx_tracker_stm.cc | 59 +++++++++++------ src/v/kafka/server/group_tx_tracker_stm.h | 3 + .../server/tests/group_tx_compaction_test.cc | 65 +++++++++++++++++-- 4 files changed, 103 insertions(+), 28 deletions(-) diff --git a/src/v/kafka/server/group_stm.cc b/src/v/kafka/server/group_stm.cc index 75ad1fce7a93c..257c9ec1ed556 100644 --- a/src/v/kafka/server/group_stm.cc +++ b/src/v/kafka/server/group_stm.cc @@ -31,7 +31,7 @@ void group_stm::update_tx_offset( it == _producers.end() || it->second.tx == nullptr || offset_md.pid.epoch != it->second.epoch) { vlog( - cluster::txlog.warn, + cluster::txlog.debug, "producer {} not found, skipping offsets update", offset_md.pid); return; @@ -58,7 +58,7 @@ void group_stm::commit(model::producer_identity pid) { || pid.epoch != it->second.epoch) { // missing prepare may happen when the consumer log gets truncated vlog( - cluster::txlog.warn, + cluster::txlog.debug, "unable to find ongoing transaction for producer: {}, skipping " "commit", pid); diff --git a/src/v/kafka/server/group_tx_tracker_stm.cc b/src/v/kafka/server/group_tx_tracker_stm.cc index cba381b9db6de..7825c928b79de 100644 --- a/src/v/kafka/server/group_tx_tracker_stm.cc +++ b/src/v/kafka/server/group_tx_tracker_stm.cc @@ -19,15 +19,17 @@ group_tx_tracker_stm::group_tx_tracker_stm( ss::sharded& feature_table) : raft::persisted_stm<>("group_tx_tracker_stm.snapshot", logger, raft) , group_data_parser() - , _feature_table(feature_table) {} + , _feature_table(feature_table) + , _serializer(make_consumer_offsets_serializer()) {} void group_tx_tracker_stm::maybe_add_tx_begin_offset( kafka::group_id group, model::producer_identity pid, model::offset offset) { - auto [it, inserted] = _all_txs.try_emplace(group, pid, offset); - if (!inserted) { - // group already exists - it->second.maybe_add_tx_begin(pid, offset); + auto it = _all_txs.find(group); + if (it == _all_txs.end()) { + vlog(klog.debug, "[{}] group not found, ignoring fence", group); + return; } + it->second.maybe_add_tx_begin(pid, offset); } void group_tx_tracker_stm::maybe_end_tx( @@ -43,9 +45,6 @@ void group_tx_tracker_stm::maybe_end_tx( } group_data.begin_offsets.erase(p_it->second); group_data.producer_to_begin.erase(p_it); - if (group_data.producer_to_begin.empty()) { - _all_txs.erase(group); - } } ss::future<> group_tx_tracker_stm::do_apply(const model::record_batch& b) { @@ -97,20 +96,42 @@ ss::future group_tx_tracker_stm::take_snapshot(model::offset) { return ss::make_ready_future(iobuf()); } -ss::future<> group_tx_tracker_stm::handle_raft_data(model::record_batch) { - // No transactional data, ignore - return ss::now(); +ss::future<> group_tx_tracker_stm::handle_raft_data(model::record_batch batch) { + co_await model::for_each_record(batch, [this](model::record& r) { + auto record_type = _serializer.get_metadata_type(r.key().copy()); + switch (record_type) { + case offset_commit: + case noop: + return; + case group_metadata: + handle_group_metadata( + _serializer.decode_group_metadata(std::move(r))); + return; + } + __builtin_unreachable(); + }); +} + +void group_tx_tracker_stm::handle_group_metadata(group_metadata_kv md) { + if (md.value) { + vlog(klog.trace, "[group: {}] update", md.key.group_id); + // A group may checkpoint periodically as the member's state 
changes, + // here we retain the group state if the group already exists. + _all_txs.try_emplace(md.key.group_id, per_group_state{}); + } else { + vlog(klog.trace, "[group: {}] tombstone", md.key.group_id); + // A tombstone indicates all the group state can be purged and + // any transactions can be ignored. Although care must be taken + // to ensure there are no open transactions before tombstoning + // a group in the main state machine. + _all_txs.erase(md.key.group_id); + } } ss::future<> group_tx_tracker_stm::handle_tx_offsets( - model::record_batch_header header, kafka::group_tx::offsets_metadata data) { - // in case the fence got truncated, try to start the transaction from - // this point on. This is not possible today but may help if delete - // retention is implemented for consumer topics. - maybe_add_tx_begin_offset( - std::move(data.group_id), - model::producer_identity{header.producer_id, header.producer_epoch}, - header.base_offset); + model::record_batch_header, kafka::group_tx::offsets_metadata) { + // Transaction boundaries are determined by fence/commit or abort + // batches return ss::now(); } diff --git a/src/v/kafka/server/group_tx_tracker_stm.h b/src/v/kafka/server/group_tx_tracker_stm.h index 130bfc0044077..1d237acfae0a3 100644 --- a/src/v/kafka/server/group_tx_tracker_stm.h +++ b/src/v/kafka/server/group_tx_tracker_stm.h @@ -102,6 +102,8 @@ class group_tx_tracker_stm final auto serde_fields() { return std::tie(transactions); } }; + void handle_group_metadata(group_metadata_kv); + void maybe_add_tx_begin_offset( kafka::group_id, model::producer_identity, model::offset); @@ -110,6 +112,7 @@ class group_tx_tracker_stm final all_txs_t _all_txs; ss::sharded& _feature_table; + group_metadata_serializer _serializer; }; class group_tx_tracker_stm_factory : public cluster::state_machine_factory { diff --git a/src/v/kafka/server/tests/group_tx_compaction_test.cc b/src/v/kafka/server/tests/group_tx_compaction_test.cc index 571102f3d5aed..703231243c992 100644 --- a/src/v/kafka/server/tests/group_tx_compaction_test.cc +++ b/src/v/kafka/server/tests/group_tx_compaction_test.cc @@ -22,16 +22,17 @@ static model::ntp offsets_ntp{ model::kafka_consumer_offsets_nt.tp, model::partition_id{0}}; -struct disabled_compaction_fixture { - disabled_compaction_fixture() { +struct custom_configs_fixture { + custom_configs_fixture() { // compaction is run manually in this test. 
cfg.get("log_disable_housekeeping_for_tests").set_value(true); + cfg.get("group_initial_rebalance_delay").set_value(0ms); } scoped_config cfg; }; struct group_manager_fixture - : public disabled_compaction_fixture + : public custom_configs_fixture , public redpanda_thread_fixture , public seastar_test { ss::future<> SetUpAsync() override { @@ -51,6 +52,18 @@ struct group_manager_fixture co_await wait_for_version_fence(); } + auto join_group(kafka::join_group_request request) { + return app._group_manager.local().join_group(std::move(request)).result; + } + + auto sync_group(kafka::sync_group_request request) { + return app._group_manager.local().sync_group(std::move(request)).result; + } + + auto describe_group(kafka::group_id& group) { + return app._group_manager.local().describe_group(offsets_ntp, group); + } + auto begin_tx(cluster::begin_group_tx_request request) { return app._group_manager.local().begin_tx(std::move(request)); } @@ -124,6 +137,40 @@ struct executable_op { virtual ss::future<> execute(group_manager_fixture*) = 0; }; +struct join_group_op : public executable_op { + explicit join_group_op(kafka::group_id group) + : group(std::move(group)) {} + + ss::future<> execute(group_manager_fixture* fixture) override { + vlog(logger.trace, "Executing join_group_op: {}", group); + + kafka::join_group_request request; + request.data = kafka::join_group_request_data{ + .group_id = group, + .session_timeout_ms = 300s, + .member_id = kafka::unknown_member_id, + .protocol_type = kafka::protocol_type{"test"}, + .protocols = chunked_vector{ + {kafka::protocol_name("test"), bytes()}}}; + request.ntp = offsets_ntp; + auto result = co_await fixture->join_group(std::move(request)); + ASSERT_EQ_CORO(result.data.error_code, kafka::error_code::none); + + kafka::sync_group_request sync_request; + sync_request.ntp = offsets_ntp; + sync_request.data = kafka::sync_group_request_data{ + .group_id = group, + .generation_id = result.data.generation_id, + .member_id = result.data.member_id, + }; + + auto sync_result = co_await fixture->sync_group( + std::move(sync_request)); + ASSERT_EQ_CORO(sync_result.data.error_code, kafka::error_code::none); + } + kafka::group_id group; +}; + struct begin_tx_op : public executable_op { explicit begin_tx_op(cluster::begin_group_tx_request begin_req) : req(std::move(begin_req)) {} @@ -220,6 +267,7 @@ random_ops generate_workload(workload_parameters params) { for (int i = 0; i < params.num_groups; i++) { std::queue group_ops; kafka::group_id id = next_group_id(); + group_ops.emplace(ss::make_shared(id)); auto pid = model::random_producer_identity(); for (int j = 0; j < params.num_tx_per_group; j++) { auto seq = model::tx_seq{j}; @@ -367,7 +415,7 @@ TEST_P_CORO(group_basic_workload_fixture, test_group_tx_stm_tracking) { auto log = consumer_offsets_log(); // Generate a commit transaction. auto ops = generate_workload(GetParam()); - ASSERT_EQ_CORO(ops.size(), 3); + ASSERT_EQ_CORO(ops.size(), 4); auto wait_until_stm_apply = [&] { return tests::cooperative_spin_wait_with_timeout(5s, [log, stm] { @@ -380,19 +428,22 @@ TEST_P_CORO(group_basic_workload_fixture, test_group_tx_stm_tracking) { [&]() { return wait_until_stm_apply(); }); }; + // prep the group + co_await execute_op(0); + co_await wait_until_stm_apply(); // no transactions in flight ASSERT_EQ_CORO( stm->max_collectible_offset(), log->offsets().committed_offset); auto before = stm->max_collectible_offset(); // begin transaction. 
- co_await execute_op(0); + co_await execute_op(1); ASSERT_EQ_CORO(stm->max_collectible_offset(), before); // tx offset commit - co_await execute_op(1); + co_await execute_op(2); ASSERT_EQ_CORO(stm->max_collectible_offset(), before); // end transaction - co_await execute_op(2); + co_await execute_op(3); // ensure max collectible offset moved. ASSERT_GT_CORO(stm->max_collectible_offset(), before); ASSERT_EQ_CORO( From 3e4a4219cc7344941e5b37ff84ed79422cf5b50e Mon Sep 17 00:00:00 2001 From: Bharath Vissapragada Date: Thu, 2 Jan 2025 11:16:36 -0800 Subject: [PATCH 08/20] group_tx_tracker/stm: track additional information about fence batches (cherry picked from commit f7191ad7084c68f495e36fd5131953c41d3de6f9) --- src/v/kafka/server/group_tx_tracker_stm.cc | 83 +++++++++++++++++----- src/v/kafka/server/group_tx_tracker_stm.h | 48 +++++++++---- 2 files changed, 102 insertions(+), 29 deletions(-) diff --git a/src/v/kafka/server/group_tx_tracker_stm.cc b/src/v/kafka/server/group_tx_tracker_stm.cc index 7825c928b79de..ad69cbca9aabc 100644 --- a/src/v/kafka/server/group_tx_tracker_stm.cc +++ b/src/v/kafka/server/group_tx_tracker_stm.cc @@ -23,28 +23,53 @@ group_tx_tracker_stm::group_tx_tracker_stm( , _serializer(make_consumer_offsets_serializer()) {} void group_tx_tracker_stm::maybe_add_tx_begin_offset( - kafka::group_id group, model::producer_identity pid, model::offset offset) { + model::record_batch_type fence_type, + kafka::group_id group, + model::producer_identity pid, + model::offset offset, + model::timestamp ts, + model::timeout_clock::duration tx_timeout) { auto it = _all_txs.find(group); if (it == _all_txs.end()) { - vlog(klog.debug, "[{}] group not found, ignoring fence", group); + vlog( + klog.debug, + "[{}] group not found, ignoring fence of type: {} at offset: {}", + group, + fence_type, + offset); return; } - it->second.maybe_add_tx_begin(pid, offset); + it->second.maybe_add_tx_begin(fence_type, pid, offset, ts, tx_timeout); } void group_tx_tracker_stm::maybe_end_tx( - kafka::group_id group, model::producer_identity pid) { + kafka::group_id group, model::producer_identity pid, model::offset offset) { auto it = _all_txs.find(group); if (it == _all_txs.end()) { + vlog( + klog.debug, + "[{}] group not found, ignoring end transaction for pid: {} at " + "offset: {}", + group, + pid, + offset); return; } auto& group_data = it->second; - auto p_it = group_data.producer_to_begin.find(pid); - if (p_it == group_data.producer_to_begin.end()) { + auto p_it = group_data.producer_states.find(pid); + if (p_it == group_data.producer_states.end()) { + vlog( + klog.debug, + "[{}] ignoring end transaction, no in progress transaction for pid: " + "{} at offset: {}", + group, + pid, + offset); return; } - group_data.begin_offsets.erase(p_it->second); - group_data.producer_to_begin.erase(p_it); + group_data.begin_offsets.erase(p_it->second.begin_offset); + group_data.producer_states.erase(p_it); + group_data.producer_to_begin_deprecated.erase(pid); } ss::future<> group_tx_tracker_stm::do_apply(const model::record_batch& b) { @@ -137,28 +162,41 @@ ss::future<> group_tx_tracker_stm::handle_tx_offsets( ss::future<> group_tx_tracker_stm::handle_fence_v0( model::record_batch_header header, kafka::group_tx::fence_metadata_v0 fence) { + // fence_v0 has no timeout, use a max permissible timeout. 
+ // fence_v0 has been deprecated for long, this is not a problem in practice + auto timeout = std::chrono::duration_cast( + config::shard_local_cfg().transaction_max_timeout_ms()); maybe_add_tx_begin_offset( + header.type, std::move(fence.group_id), model::producer_identity{header.producer_id, header.producer_epoch}, - header.base_offset); + header.base_offset, + header.max_timestamp, + timeout); return ss::now(); } ss::future<> group_tx_tracker_stm::handle_fence_v1( model::record_batch_header header, kafka::group_tx::fence_metadata_v1 fence) { maybe_add_tx_begin_offset( + header.type, std::move(fence.group_id), model::producer_identity{header.producer_id, header.producer_epoch}, - header.base_offset); + header.base_offset, + header.max_timestamp, + fence.transaction_timeout_ms); return ss::now(); } ss::future<> group_tx_tracker_stm::handle_fence( model::record_batch_header header, kafka::group_tx::fence_metadata fence) { maybe_add_tx_begin_offset( + header.type, std::move(fence.group_id), model::producer_identity{header.producer_id, header.producer_epoch}, - header.base_offset); + header.base_offset, + header.max_timestamp, + fence.transaction_timeout_ms); return ss::now(); } @@ -166,7 +204,8 @@ ss::future<> group_tx_tracker_stm::handle_abort( model::record_batch_header header, kafka::group_tx::abort_metadata data) { maybe_end_tx( std::move(data.group_id), - model::producer_identity{header.producer_id, header.producer_epoch}); + model::producer_identity{header.producer_id, header.producer_epoch}, + header.base_offset); return ss::now(); } @@ -174,7 +213,7 @@ ss::future<> group_tx_tracker_stm::handle_commit( model::record_batch_header header, kafka::group_tx::commit_metadata data) { auto pid = model::producer_identity{ header.producer_id, header.producer_epoch}; - maybe_end_tx(std::move(data.group_id), pid); + maybe_end_tx(std::move(data.group_id), pid, header.base_offset); return ss::now(); } @@ -203,11 +242,21 @@ void group_tx_tracker_stm_factory::create( } void group_tx_tracker_stm::per_group_state::maybe_add_tx_begin( - model::producer_identity pid, model::offset offset) { - auto it = producer_to_begin.find(pid); - if (it == producer_to_begin.end()) { + model::record_batch_type fence_type, + model::producer_identity pid, + model::offset offset, + model::timestamp begin_ts, + model::timeout_clock::duration tx_timeout) { + auto it = producer_states.find(pid); + if (it == producer_states.end()) { begin_offsets.emplace(offset); - producer_to_begin[pid] = offset; + producer_states[pid] = { + .fence_type = fence_type, + .begin_offset = offset, + .batch_ts = begin_ts, + .timeout = tx_timeout}; + producer_to_begin_deprecated[pid] = offset; } } + } // namespace kafka diff --git a/src/v/kafka/server/group_tx_tracker_stm.h b/src/v/kafka/server/group_tx_tracker_stm.h index 1d237acfae0a3..8c0feaf5566b2 100644 --- a/src/v/kafka/server/group_tx_tracker_stm.h +++ b/src/v/kafka/server/group_tx_tracker_stm.h @@ -71,27 +71,46 @@ class group_tx_tracker_stm final ss::future<> handle_version_fence(features::feature_table::version_fence); private: - struct per_group_state + struct producer_tx_state : serde::envelope< - per_group_state, + producer_tx_state, serde::version<0>, serde::compat_version<0>> { - per_group_state() = default; + model::record_batch_type fence_type; + model::offset begin_offset; + model::timestamp batch_ts; + model::timeout_clock::duration timeout; - per_group_state(model::producer_identity pid, model::offset offset) { - maybe_add_tx_begin(pid, offset); + auto serde_fields() { + return 
std::tie(fence_type, begin_offset, batch_ts, timeout); } + }; + struct per_group_state + : serde::envelope< + per_group_state, + serde::version<1>, + serde::compat_version<0>> { + per_group_state() = default; - void - maybe_add_tx_begin(model::producer_identity pid, model::offset offset); + void maybe_add_tx_begin( + model::record_batch_type fence_type, + model::producer_identity pid, + model::offset offset, + model::timestamp begin_ts, + model::timeout_clock::duration tx_timeout); absl::btree_set begin_offsets; + // deprecated absl::btree_map - producer_to_begin; + producer_to_begin_deprecated; + + absl::btree_map + producer_states; auto serde_fields() { - return std::tie(begin_offsets, producer_to_begin); + return std::tie( + begin_offsets, producer_to_begin_deprecated, producer_states); } }; using all_txs_t = absl::btree_map; @@ -105,9 +124,14 @@ class group_tx_tracker_stm final void handle_group_metadata(group_metadata_kv); void maybe_add_tx_begin_offset( - kafka::group_id, model::producer_identity, model::offset); - - void maybe_end_tx(kafka::group_id, model::producer_identity); + model::record_batch_type fence_type, + kafka::group_id, + model::producer_identity, + model::offset, + model::timestamp begin_ts, + model::timeout_clock::duration tx_timeout); + + void maybe_end_tx(kafka::group_id, model::producer_identity, model::offset); all_txs_t _all_txs; From d40ce01fe36cd90fa1420cdac8508b1d070b386f Mon Sep 17 00:00:00 2001 From: Bharath Vissapragada Date: Thu, 2 Jan 2025 12:55:14 -0800 Subject: [PATCH 09/20] group_tx_tracker/stm: heuristic to ignore stale tx_fence batches (cherry picked from commit 6ca81b456f8aba1b97a39b58bcc3a47fa0252613) --- src/v/kafka/server/group_tx_tracker_stm.cc | 57 ++++++++++++++++++++-- src/v/kafka/server/group_tx_tracker_stm.h | 3 ++ src/v/model/timestamp.h | 4 ++ 3 files changed, 60 insertions(+), 4 deletions(-) diff --git a/src/v/kafka/server/group_tx_tracker_stm.cc b/src/v/kafka/server/group_tx_tracker_stm.cc index ad69cbca9aabc..708481567f477 100644 --- a/src/v/kafka/server/group_tx_tracker_stm.cc +++ b/src/v/kafka/server/group_tx_tracker_stm.cc @@ -13,6 +13,8 @@ namespace kafka { +static constexpr std::chrono::milliseconds max_permissible_tx_timeout{15min}; + group_tx_tracker_stm::group_tx_tracker_stm( ss::logger& logger, raft::consensus* raft, @@ -39,7 +41,8 @@ void group_tx_tracker_stm::maybe_add_tx_begin_offset( offset); return; } - it->second.maybe_add_tx_begin(fence_type, pid, offset, ts, tx_timeout); + it->second.maybe_add_tx_begin( + group, fence_type, pid, offset, ts, tx_timeout); } void group_tx_tracker_stm::maybe_end_tx( @@ -165,7 +168,7 @@ ss::future<> group_tx_tracker_stm::handle_fence_v0( // fence_v0 has no timeout, use a max permissible timeout. 
// fence_v0 has been deprecated for long, this is not a problem in practice auto timeout = std::chrono::duration_cast( - config::shard_local_cfg().transaction_max_timeout_ms()); + max_permissible_tx_timeout); maybe_add_tx_begin_offset( header.type, std::move(fence.group_id), @@ -242,6 +245,7 @@ void group_tx_tracker_stm_factory::create( } void group_tx_tracker_stm::per_group_state::maybe_add_tx_begin( + const kafka::group_id& group, model::record_batch_type fence_type, model::producer_identity pid, model::offset offset, @@ -249,14 +253,59 @@ void group_tx_tracker_stm::per_group_state::maybe_add_tx_begin( model::timeout_clock::duration tx_timeout) { auto it = producer_states.find(pid); if (it == producer_states.end()) { - begin_offsets.emplace(offset); - producer_states[pid] = { + auto p_state = producer_tx_state{ .fence_type = fence_type, .begin_offset = offset, .batch_ts = begin_ts, .timeout = tx_timeout}; + if (p_state.expired_deprecated_fence_tx()) { + vlog( + klog.debug, + "[{}] Ignoring stale tx_fence batch at offset: {}, considering " + "it expired", + group, + offset); + return; + } + vlog( + klog.debug, + "[{}] Adding begin tx : {}, pid: {} at offset: {}, ts: {} with " + "timeout: {}", + group, + fence_type, + pid, + offset, + begin_ts, + tx_timeout); + begin_offsets.emplace(offset); + producer_states[pid] = p_state; producer_to_begin_deprecated[pid] = offset; } } +bool group_tx_tracker_stm::producer_tx_state::expired_deprecated_fence_tx() + const { + // A bug in 24.2.0 resulted in a situation where tx_fence + // batches were retained _after_ compaction while their corresponding + // data/commit/abort batches were compacted away. This applied to + // only group transactions that used tx_fence to begin the + // transaction. + // After this buggy compaction, these uncleaned tx_fence batches are + // accounted as open transactions when computing + // max_collectible_offset thus blocking further compaction after + // upgrade to 24.2.x. + if (fence_type != model::record_batch_type::tx_fence) { + return false; + } + // note: this is a heuristic to ignore any transactions that have long been + // expired and we do not want them to block max collectible offset. 
+ // clamp the timeout, incase timeout is unset + auto max_timeout + = std::chrono::duration_cast( + 2 * max_permissible_tx_timeout); + auto clamped_timeout = std::min(max_timeout, 2 * timeout); + return model::timestamp_clock::now() + > model::to_time_point(batch_ts) + clamped_timeout; +} + } // namespace kafka diff --git a/src/v/kafka/server/group_tx_tracker_stm.h b/src/v/kafka/server/group_tx_tracker_stm.h index 8c0feaf5566b2..c3dfa45aa5702 100644 --- a/src/v/kafka/server/group_tx_tracker_stm.h +++ b/src/v/kafka/server/group_tx_tracker_stm.h @@ -81,6 +81,8 @@ class group_tx_tracker_stm final model::timestamp batch_ts; model::timeout_clock::duration timeout; + bool expired_deprecated_fence_tx() const; + auto serde_fields() { return std::tie(fence_type, begin_offset, batch_ts, timeout); } @@ -93,6 +95,7 @@ class group_tx_tracker_stm final per_group_state() = default; void maybe_add_tx_begin( + const kafka::group_id&, model::record_batch_type fence_type, model::producer_identity pid, model::offset offset, diff --git a/src/v/model/timestamp.h b/src/v/model/timestamp.h index ec96dbc00e8a3..7504514a4a2bc 100644 --- a/src/v/model/timestamp.h +++ b/src/v/model/timestamp.h @@ -96,6 +96,10 @@ inline timestamp_clock::duration duration_since_epoch(timestamp ts) { std::chrono::milliseconds{ts.value()}); } +inline timestamp_clock::time_point to_time_point(timestamp ts) { + return timestamp_clock::time_point(std::chrono::milliseconds(ts.value())); +} + inline timestamp to_timestamp(timestamp_clock::time_point ts) { return timestamp(std::chrono::duration_cast( ts.time_since_epoch()) From 0f7e395655ab129c8baba88ad976dc14a7f13303 Mon Sep 17 00:00:00 2001 From: Bharath Vissapragada Date: Thu, 2 Jan 2025 17:33:42 -0800 Subject: [PATCH 10/20] group_tracker/stm: add a periodic GC loop to expire stale tx_fence txes (cherry picked from commit 6fc62bb05bde85a49eefcac48607582df27d98d1) --- src/v/kafka/server/group_tx_tracker_stm.cc | 46 +++++++++++++++++++++- src/v/kafka/server/group_tx_tracker_stm.h | 8 ++++ 2 files changed, 53 insertions(+), 1 deletion(-) diff --git a/src/v/kafka/server/group_tx_tracker_stm.cc b/src/v/kafka/server/group_tx_tracker_stm.cc index 708481567f477..31147f1e07dcf 100644 --- a/src/v/kafka/server/group_tx_tracker_stm.cc +++ b/src/v/kafka/server/group_tx_tracker_stm.cc @@ -22,7 +22,33 @@ group_tx_tracker_stm::group_tx_tracker_stm( : raft::persisted_stm<>("group_tx_tracker_stm.snapshot", logger, raft) , group_data_parser() , _feature_table(feature_table) - , _serializer(make_consumer_offsets_serializer()) {} + , _serializer(make_consumer_offsets_serializer()) { + _stale_tx_fence_gc_timer.set_callback([this] { + ssx::spawn_with_gate( + _gate, [this] { return gc_expired_tx_fence_transactions(); }); + }); + _stale_tx_fence_gc_timer.arm_periodic(tx_fence_gc_frequency); +} + +ss::future<> group_tx_tracker_stm::gc_expired_tx_fence_transactions() { + auto holder = _gate.hold(); + auto it = _all_txs.begin(); + while (it != _all_txs.end()) { + it->second.gc_expired_tx_fence_transactions(); + ++it; + if (ss::need_preempt() && it != _all_txs.end()) { + auto key_checkpoint = it->first; + co_await ss::yield(); + it = _all_txs.lower_bound(key_checkpoint); + } + } +} + +ss::future<> group_tx_tracker_stm::stop() { + vlog(klog.trace, "stopping..."); + _stale_tx_fence_gc_timer.cancel(); + return raft::persisted_stm<>::stop(); +} void group_tx_tracker_stm::maybe_add_tx_begin_offset( model::record_batch_type fence_type, @@ -283,6 +309,24 @@ void group_tx_tracker_stm::per_group_state::maybe_add_tx_begin( } } 
+void group_tx_tracker_stm::per_group_state::gc_expired_tx_fence_transactions() { + auto it = producer_states.begin(); + while (it != producer_states.end()) { + if (it->second.expired_deprecated_fence_tx()) { + vlog( + klog.warn, + "Expiring stale tx_fence based begin tx at offset: {} for " + "producer: {}", + it->second.begin_offset, + it->first); + begin_offsets.erase(it->second.begin_offset); + it = producer_states.erase(it); + continue; + } + ++it; + } +} + bool group_tx_tracker_stm::producer_tx_state::expired_deprecated_fence_tx() const { // A bug in 24.2.0 resulted in a situation where tx_fence diff --git a/src/v/kafka/server/group_tx_tracker_stm.h b/src/v/kafka/server/group_tx_tracker_stm.h index c3dfa45aa5702..893d160bfba66 100644 --- a/src/v/kafka/server/group_tx_tracker_stm.h +++ b/src/v/kafka/server/group_tx_tracker_stm.h @@ -44,6 +44,8 @@ class group_tx_tracker_stm final ss::future<> do_apply(const model::record_batch&) override; + ss::future<> stop() final; + model::offset max_collectible_offset() override; ss::future<> @@ -111,6 +113,8 @@ class group_tx_tracker_stm final absl::btree_map producer_states; + void gc_expired_tx_fence_transactions(); + auto serde_fields() { return std::tie( begin_offsets, producer_to_begin_deprecated, producer_states); @@ -126,6 +130,8 @@ class group_tx_tracker_stm final void handle_group_metadata(group_metadata_kv); + ss::future<> gc_expired_tx_fence_transactions(); + void maybe_add_tx_begin_offset( model::record_batch_type fence_type, kafka::group_id, @@ -140,6 +146,8 @@ class group_tx_tracker_stm final ss::sharded& _feature_table; group_metadata_serializer _serializer; + static constexpr ss::lowres_clock::duration tx_fence_gc_frequency{1h}; + ss::timer _stale_tx_fence_gc_timer; }; class group_tx_tracker_stm_factory : public cluster::state_machine_factory { From aa2333b7f73b66de684b17c1563ae7a26a35b4a8 Mon Sep 17 00:00:00 2001 From: Bharath Vissapragada Date: Sun, 22 Dec 2024 21:23:16 -0800 Subject: [PATCH 11/20] k/group: disallow group deletion while transactions in progress This will result in hanging transactions and subsequent blocking of compaction. (cherry picked from commit 2b79687b9a9d54dcb06500c94b3a2625470c4c88) --- src/v/kafka/server/group.cc | 26 +++++++++---- src/v/kafka/server/group.h | 2 + tests/rptest/clients/rpk.py | 2 +- .../rptest/transactions/transactions_test.py | 39 +++++++++++++++++++ 4 files changed, 61 insertions(+), 8 deletions(-) diff --git a/src/v/kafka/server/group.cc b/src/v/kafka/server/group.cc index a4c56810fbcf5..0cf91e1c2cd1c 100644 --- a/src/v/kafka/server/group.cc +++ b/src/v/kafka/server/group.cc @@ -2530,13 +2530,21 @@ ss::future group::remove() { co_return error_code::group_id_not_found; case group_state::empty: - set_state(group_state::dead); break; default: co_return error_code::non_empty_group; } + // check if there are any transactions in progress + // tombstoning a group with open transactions will result + // in hanging transactions in the log. 
+ if (has_transactions_in_progress()) { + co_return error_code::non_empty_group; + } + + set_state(group_state::dead); + // build offset tombstones storage::record_batch_builder builder( model::record_batch_type::raft_data, model::offset(0)); @@ -3504,14 +3512,18 @@ group::get_expired_offsets(std::chrono::seconds retention_period) { } } +bool group::has_transactions_in_progress() const { + return std::any_of( + _producers.begin(), + _producers.end(), + [](const producers_map::value_type& p) { + return p.second.transaction != nullptr; + }); +} + bool group::has_offsets() const { return !_offsets.empty() || !_pending_offset_commits.empty() - || std::any_of( - _producers.begin(), - _producers.end(), - [](const producers_map::value_type& p) { - return p.second.transaction != nullptr; - }); + || has_transactions_in_progress(); } std::vector diff --git a/src/v/kafka/server/group.h b/src/v/kafka/server/group.h index 7837f6b629a12..efcad6c6df674 100644 --- a/src/v/kafka/server/group.h +++ b/src/v/kafka/server/group.h @@ -843,6 +843,8 @@ class group final : public ss::enable_lw_shared_from_this { bool has_offsets() const; + bool has_transactions_in_progress() const; + bool has_pending_transaction(const model::topic_partition& tp) { if (std::any_of( _pending_offset_commits.begin(), diff --git a/tests/rptest/clients/rpk.py b/tests/rptest/clients/rpk.py index 0ac7f9c9e8b0e..3af079dce443b 100644 --- a/tests/rptest/clients/rpk.py +++ b/tests/rptest/clients/rpk.py @@ -945,7 +945,7 @@ def group_seek_to_file(self, group, file): def group_delete(self, group): cmd = ["delete", group] - self._run_group(cmd) + return self._run_group(cmd) def group_list(self, states: list[str] = []) -> list[RpkListGroup]: cmd = ['list'] diff --git a/tests/rptest/transactions/transactions_test.py b/tests/rptest/transactions/transactions_test.py index 65445db014694..5187d14d48a51 100644 --- a/tests/rptest/transactions/transactions_test.py +++ b/tests/rptest/transactions/transactions_test.py @@ -267,6 +267,45 @@ def simple_test(self): topic=self.input_t.name) self.logger.info(f"Read {len(records)} from node {node.name}") + @cluster(num_nodes=3) + def group_deletion_with_ongoing_transaction_test(self): + self.redpanda.set_cluster_config( + {"group_new_member_join_timeout": 5000}) + self.generate_data(self.input_t, self.max_records) + + group_name = "test_group" + + producer = ck.Producer({ + 'bootstrap.servers': self.redpanda.brokers(), + 'transactional.id': 'group_deletion_test_id', + }) + + group_name = "test" + consumer = ck.Consumer({ + 'bootstrap.servers': self.redpanda.brokers(), + 'group.id': group_name, + 'auto.offset.reset': 'earliest', + 'enable.auto.commit': False, + }) + + consumer.subscribe([self.input_t]) + _ = self.consume(consumer) + producer.init_transactions() + producer.begin_transaction() + producer.send_offsets_to_transaction( + consumer.position(consumer.assignment()), + consumer.consumer_group_metadata()) + producer.flush() + # leave the consumer group + consumer.close() + # Attempt to delete the group, should fail + rpk = RpkTool(self.redpanda) + out = rpk.group_delete(group=group_name) + assert "NON_EMPTY_GROUP" in out, f"Group deletion should fail with inprogress transaction: {out}" + producer.commit_transaction() + out = rpk.group_delete(group=group_name) + assert "OK" in out, f"Group deletion expected to succeed after committing transaction: {out}" + @cluster(num_nodes=3) def rejoin_member_test(self): self.redpanda.set_cluster_config( From 690d61fdc7bd615c44430eb2638293207b771ee8 Mon Sep 17 00:00:00 2001 
From: Bharath Vissapragada Date: Thu, 2 Jan 2025 14:48:11 -0800 Subject: [PATCH 12/20] group_recovery_consumer/logging: tidy up logging (cherry picked from commit ac2204129ad07cebf6310cd6a378541e663e8728) --- src/v/kafka/server/group_recovery_consumer.cc | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/v/kafka/server/group_recovery_consumer.cc b/src/v/kafka/server/group_recovery_consumer.cc index b40bb1d4ffcb6..b0b2a1ded5d79 100644 --- a/src/v/kafka/server/group_recovery_consumer.cc +++ b/src/v/kafka/server/group_recovery_consumer.cc @@ -167,17 +167,18 @@ void group_recovery_consumer::handle_record(model::record r) { } void group_recovery_consumer::handle_group_metadata(group_metadata_kv md) { - vlog(klog.trace, "[group: {}] recovered group metadata", md.key.group_id); - if (md.value) { // until we switch over to a compacted topic or use raft snapshots, // always take the latest entry in the log. + vlog( + klog.trace, "[group: {}] recovered group metadata", md.key.group_id); auto [group_it, _] = _state.groups.try_emplace( md.key.group_id, group_stm()); group_it->second.overwrite_metadata(std::move(*md.value)); } else { // tombstone + vlog(klog.trace, "[group: {}] recovered tombstone", md.key.group_id); _state.groups.erase(md.key.group_id); } } From 0292bab391cfbb7b8c612defa748071c8e22495e Mon Sep 17 00:00:00 2001 From: Bharath Vissapragada Date: Thu, 2 Jan 2025 15:01:38 -0800 Subject: [PATCH 13/20] group_recovery/tx: fix group recovery for non existent groups If a group got tombstoned all the producers to that group should be ignored. The current logic is incorrectly recovering producers and loading them up to expire later. (cherry picked from commit 7c8d6335a38f47acef8cb944f5a7b69b8c7cfc96) --- src/v/kafka/server/group_recovery_consumer.cc | 42 ++++++++++++++++--- 1 file changed, 36 insertions(+), 6 deletions(-) diff --git a/src/v/kafka/server/group_recovery_consumer.cc b/src/v/kafka/server/group_recovery_consumer.cc index b0b2a1ded5d79..a2079f3f6b16f 100644 --- a/src/v/kafka/server/group_recovery_consumer.cc +++ b/src/v/kafka/server/group_recovery_consumer.cc @@ -80,6 +80,18 @@ ss::future<> group_recovery_consumer::handle_fence( model::record_batch_header header, kafka::group_tx::fence_metadata data) { auto pid = model::producer_identity{ header.producer_id, header.producer_epoch}; + auto group_it = _state.groups.find(data.group_id); + if (group_it == _state.groups.end()) { + vlog( + klog.trace, + "[group: {}] group does not exist, ignoring tx fence version: {} for " + "producer: {} - {}", + data.group_id, + group::fence_control_record_version, + pid, + data); + co_return; + } vlog( klog.trace, "[group: {}] recovered tx fence version: {} for producer: {} - {}", @@ -87,7 +99,6 @@ ss::future<> group_recovery_consumer::handle_fence( group::fence_control_record_version, pid, data); - auto [group_it, _] = _state.groups.try_emplace(data.group_id); group_it->second.try_set_fence( pid.get_id(), pid.get_epoch(), @@ -100,24 +111,43 @@ ss::future<> group_recovery_consumer::handle_fence( ss::future<> group_recovery_consumer::handle_abort( model::record_batch_header header, kafka::group_tx::abort_metadata data) { + auto pid = model::producer_identity{ + header.producer_id, header.producer_epoch}; + auto group_it = _state.groups.find(data.group_id); + if (group_it == _state.groups.end()) { + vlog( + klog.trace, + "[group: {}] group does not exist, ignoring abort for " + "producer: {} - sequence {}", + data.group_id, + pid, + data.tx_seq); + co_return; + } vlog( klog.trace, 
"[group: {}] recovered abort tx_seq: {}", data.group_id, data.tx_seq); - auto pid = model::producer_identity{ - header.producer_id, header.producer_epoch}; - auto [group_it, _] = _state.groups.try_emplace(data.group_id); group_it->second.abort(pid, data.tx_seq); co_return; } ss::future<> group_recovery_consumer::handle_commit( model::record_batch_header header, kafka::group_tx::commit_metadata data) { - vlog(klog.trace, "[group: {}] recovered commit tx", data.group_id); auto pid = model::producer_identity{ header.producer_id, header.producer_epoch}; - auto [group_it, _] = _state.groups.try_emplace(data.group_id); + auto group_it = _state.groups.find(data.group_id); + if (group_it == _state.groups.end()) { + vlog( + klog.trace, + "[group: {}] group does not exist, ignoring commit for " + "producer: {}", + data.group_id, + pid); + co_return; + } + vlog(klog.trace, "[group: {}] recovered commit tx", data.group_id); group_it->second.commit(pid); co_return; } From fae25ef4f4d6f03996f6de92e437214cd177f46a Mon Sep 17 00:00:00 2001 From: Bharath Vissapragada Date: Fri, 20 Dec 2024 14:12:51 -0800 Subject: [PATCH 14/20] group/tx: add a ducktape test for compactibility of consumer_offsets (cherry picked from commit c7f953efd9f44d631ffaaccd67540660671362ab) --- .../transactions/consumer_offsets_test.py | 117 ++++++++++ .../verifiers/consumer_offsets_verifier.py | 214 ++++++++++++++++++ 2 files changed, 331 insertions(+) create mode 100644 tests/rptest/transactions/consumer_offsets_test.py create mode 100644 tests/rptest/transactions/verifiers/consumer_offsets_verifier.py diff --git a/tests/rptest/transactions/consumer_offsets_test.py b/tests/rptest/transactions/consumer_offsets_test.py new file mode 100644 index 0000000000000..c1ce82b584d43 --- /dev/null +++ b/tests/rptest/transactions/consumer_offsets_test.py @@ -0,0 +1,117 @@ +# Copyright 2024 Redpanda Data, Inc. 
+# +# Use of this software is governed by the Business Source License +# included in the file licenses/BSL.md +# +# As of the Change Date specified in that file, in accordance with +# the Business Source License, use of this software will be governed +# by the Apache License, Version 2.0 + +from rptest.transactions.verifiers.consumer_offsets_verifier import ConsumerOffsetsVerifier +from rptest.services.cluster import cluster +from rptest.services.redpanda_installer import RedpandaInstaller +from rptest.tests.redpanda_test import RedpandaTest +from ducktape.utils.util import wait_until +from ducktape.mark import matrix + + +class VerifyConsumerOffsets(RedpandaTest): + def __init__(self, test_context): + super(VerifyConsumerOffsets, + self).__init__(test_context=test_context, + num_brokers=3, + extra_rp_conf={ + "group_topic_partitions": 1, + "log_segment_size": 1024 * 1024, + "log_segment_ms": 60000, + "log_compaction_interval_ms": 10, + "group_new_member_join_timeout": 3000, + "group_initial_rebalance_delay": 0 + }) + + @cluster(num_nodes=3) + def test_consumer_group_offsets(self): + verifier = ConsumerOffsetsVerifier(self.redpanda, self._client) + verifier.verify() + + +class VerifyConsumerOffsetsThruUpgrades(RedpandaTest): + def __init__(self, test_context): + super(VerifyConsumerOffsetsThruUpgrades, + self).__init__(test_context=test_context, + num_brokers=3, + extra_rp_conf={ + "group_topic_partitions": 1, + "log_segment_size": 1024 * 1024, + "log_segment_ms": 60000, + "log_compaction_interval_ms": 10, + "group_new_member_join_timeout": 3000, + "group_initial_rebalance_delay": 0 + }) + + def rp_install_version(self, + num_previous: int, + version=RedpandaInstaller.HEAD): + if num_previous == 0: + return version + previous = self.redpanda._installer.highest_from_prior_feature_version( + version) + return self.rp_install_version(num_previous=num_previous - 1, + version=previous) + + def setUp(self): + pass + + def ensure_compactible(self): + def consumer_offsets_is_compactible(): + try: + state = self.redpanda._admin.get_partition_state( + namespace="kafka", topic="__consumer_offsets", partition=0) + collectible = [] + for replica in state["replicas"]: + for stm in replica["raft_state"]["stms"]: + if stm["name"] == "group_tx_tracker_stm.snapshot": + collectible.append(stm["last_applied_offset"] == + stm["max_collectible_offset"]) + return len(collectible) == 3 and all(collectible) + except Exception as e: + self.redpanda.logger.debug( + f"failed to get parition state: {e}") + + wait_until( + consumer_offsets_is_compactible, + timeout_sec=30, + backoff_sec=1, + err_msg= + f"Timed out waiting for consumer offsets partition to be compactible" + ) + + @cluster(num_nodes=3) + @matrix(versions_to_upgrade=[1, 2, 3]) + def test_consumer_group_offsets(self, versions_to_upgrade): + """This test ensures consumer offset state remains correct during the following upgrade cycles""" + + # At the time of writing, this test checks the following version upgrades + # Each upgrade also rolls logs ensures it is compacted + # 24.3.x (initial_version) -> 25.1.x + # 24.2.x (initial version) -> 24.3.x -> 24.4.x + # 24.1.x (initial version) -> 24.2.x -> 24.3.x -> 24.4.x + # + # After the upgrade + compaction the following invariants are checked + # - The state of group offets is correct as snapshotted prior to all upgrades + # - The log is fully compactible. 
+ initial_version = self.rp_install_version( + num_previous=versions_to_upgrade) + self.redpanda._installer.install(self.redpanda.nodes, initial_version) + super(VerifyConsumerOffsetsThruUpgrades, self).setUp() + + verifier = ConsumerOffsetsVerifier(self.redpanda, self._client) + verifier.verify() + + versions = self.load_version_range(initial_version) + for v in self.upgrade_through_versions(versions_in=versions, + already_running=True): + self.logger.info(f"Updated to {v}") + verifier.verify() + + self.ensure_compactible() diff --git a/tests/rptest/transactions/verifiers/consumer_offsets_verifier.py b/tests/rptest/transactions/verifiers/consumer_offsets_verifier.py new file mode 100644 index 0000000000000..99ca06d21fbdf --- /dev/null +++ b/tests/rptest/transactions/verifiers/consumer_offsets_verifier.py @@ -0,0 +1,214 @@ +# Copyright 2024 Redpanda Data, Inc. +# +# Use of this software is governed by the Business Source License +# included in the file licenses/BSL.md +# +# As of the Change Date specified in that file, in accordance with +# the Business Source License, use of this software will be governed +# by the Apache License, Version 2.0 +import threading +import confluent_kafka as ck +from confluent_kafka import TopicPartition +from rptest.clients.types import TopicSpec +from rptest.services.cluster import cluster +from concurrent import futures +from kafka import KafkaAdminClient +import random +from time import sleep +from concurrent.futures import ThreadPoolExecutor +from ducktape.utils.util import wait_until +from rptest.clients.rpk import RpkTool + + +class ConsumerOffsetsVerifier(): + """ + Populates consumer offsets topic with various transactional offset commits + over multiple groups and verifies the final offset positions. + Assumes that there is only one consumer offset partition for simplicity since + the intention of this test is correctness. + + The verifier does not use a real topic and consumer to generate offset commits, + instead dummy offsets are generated randomly to mimic consumption. A real consumer + only adds noise to the test and is not needed to validate correctness here. 
+ """ + def __init__(self, + redpanda, + client, + produce_topic: str = "topic_produce", + source_topic: str = "topic_consume", + num_producers: int = 10, + num_src_partitions: int = 5, + max_commits: int = 5000): + + self._redpanda = redpanda + self._topic = produce_topic + self._source_topic = source_topic + self._logger = self._redpanda.logger + self._lock = threading.Lock() + self._tasks = [] + self._num_producers = num_producers + self._num_src_partitions = num_src_partitions + + produce_topic_spec = TopicSpec(name=produce_topic, + replication_factor=3, + partition_count=1) + + consume_topic_spec = TopicSpec(name=source_topic, + replication_factor=3, + partition_count=5) + client.create_topic(produce_topic_spec) + client.create_topic(consume_topic_spec) + + self.rpk = RpkTool(self._redpanda) + + # Each producers uses a group and each group has offset positions + # for every source partition + self._committed_offsets: dict[str, list[TopicPartition]] = dict() + self._stop_ev = threading.Event() + for producer in range(num_producers): + self._committed_offsets[f"group-{producer}"] = [ + TopicPartition(self._source_topic, p, -1) + for p in range(num_src_partitions) + ] + + self._total_commits_so_far = 0 + self._max_commits = max_commits + self._commits_done = threading.Event() + threading.Thread(target=self._start_producers, daemon=True).start() + + def _start_producers(self): + with ThreadPoolExecutor(max_workers=self._num_producers) as executor: + for producer in range(self._num_producers): + self._tasks.append( + executor.submit(lambda: self._start_one_producer( + group_id=f"group-{producer}", tx_id=f"txid-{producer}") + )) + + def _stop_all(self, timeout_sec: int = 30): + if self._stop_ev.isSet(): + return + self._stop_ev.set() + futures.wait(self._tasks, + timeout=timeout_sec, + return_when=futures.ALL_COMPLETED) + + def _current_committed_offsets(self, group_id: str, partitions: list[int]): + with self._lock: + return [ + tp for tp in self._committed_offsets[group_id] + if tp.partition in partitions + ] + + def _update_committed_offsets(self, group_id: str, + positions: list[TopicPartition]): + with self._lock: + for position in positions: + self._committed_offsets[group_id][ + position.partition] = position + self._total_commits_so_far += 1 + if self._total_commits_so_far >= self._max_commits: + self._commits_done.set() + + def _group_is_ready(self, group: str): + gr = self.rpk.group_describe(group=group, summary=True) + return gr.members == 1 and gr.state == "Stable" + + def _start_one_producer(self, group_id: str, tx_id: str): + + consumer = ck.Consumer({ + 'bootstrap.servers': self._redpanda.brokers(), + 'group.id': group_id, + 'auto.offset.reset': 'earliest', + 'enable.auto.commit': False, + }) + + consumer.subscribe([self._source_topic]) + + wait_until( + lambda: self._group_is_ready(group=group_id), + timeout_sec=30, + backoff_sec=1, + err_msg=f"Timed out waiting for group {group_id} to be stable") + + producer = ck.Producer({ + 'bootstrap.servers': self._redpanda.brokers(), + 'transactional.id': tx_id, + 'transaction.timeout.ms': 10000 + }) + producer.init_transactions() + + def generate_dummy_positions(): + # pick a random list of partitions to update + partitions = random.sample(range(0, self._num_src_partitions), 3) + current_offsets = self._current_committed_offsets( + group_id=group_id, partitions=partitions) + # update positions + for tp in current_offsets: + tp.offset = tp.offset + random.randint(1, 5) + return current_offsets + + i = 0 + while not 
self._stop_ev.isSet(): + new_positions = generate_dummy_positions() + self._logger.debug( + f"[{tx_id}] attempting to update positions to {new_positions}") + producer.begin_transaction() + producer.produce(self._topic, f"{tx_id}_{i}", f"{tx_id}_id") + producer.send_offsets_to_transaction( + new_positions, consumer.consumer_group_metadata()) + producer.flush() + + commit = random.choice([True, False]) + if commit: + producer.commit_transaction() + self._update_committed_offsets(group_id, new_positions) + self._logger.debug( + f"[{tx_id}] attempting to update positions to {new_positions} succeeded total commits: {self._total_commits_so_far}" + ) + else: + producer.abort_transaction() + self._logger.debug( + f"[{tx_id}] attempting to update positions to {new_positions} aborted" + ) + sleep(0.05) + + def verify(self, timeout_sec: int = 90): + self._logger.debug("waiting for commits done") + self._commits_done.wait(timeout_sec) + self._stop_all(timeout_sec) + + self._logger.debug("Verifying offsets for all groups") + + admin = KafkaAdminClient( + **{'bootstrap_servers': self._redpanda.brokers()}) + + def list_offsets(group_id: str): + offsets = admin.list_consumer_group_offsets(group_id) + result = [] + for tp, md in offsets.items(): + result.append(TopicPartition(tp.topic, tp.partition, + md.offset)) + return sorted(result, key=lambda tp: tp.partition) + + def offsets_are_consistent(): + try: + group_results = [] + for group in [ + f"group-{p}" for p in range(self._num_producers) + ]: + offsets = list_offsets(group) + expected = self._committed_offsets[group] + self._logger.debug( + f"group: {group}, offsets: {offsets}, expected: {expected}" + ) + group_results.append(offsets == expected) + return all(group_results) + except Exception as e: + self._logger.debug(f"exception listing offsets: {e}") + return False + + wait_until( + offsets_are_consistent, + timeout_sec=30, + backoff_sec=1, + err_msg=f"Timed out waiting group offsets to be consistent.") From 5dd5339d741d98f1cd6e236572c787f58739ab77 Mon Sep 17 00:00:00 2001 From: Bharath Vissapragada Date: Sun, 22 Dec 2024 10:41:33 -0800 Subject: [PATCH 15/20] tx/producer_state: add getters for internal state (cherry picked from commit 9958ca60a0c1bc3a304a3a106c5839ffa7ce4c61) --- src/v/cluster/producer_state.h | 30 ++++++++++++++++++++++++------ 1 file changed, 24 insertions(+), 6 deletions(-) diff --git a/src/v/cluster/producer_state.h b/src/v/cluster/producer_state.h index 14938b02eaa16..dbbda649722c7 100644 --- a/src/v/cluster/producer_state.h +++ b/src/v/cluster/producer_state.h @@ -70,6 +70,10 @@ class request { } } + seq_t first_sequence() const { return _first_sequence; } + seq_t last_sequence() const { return _last_sequence; } + model::term_id term() const { return _term; } + void set_value(request_result_t::value_type); void set_error(request_result_t::error_type); void mark_request_in_progress() { _state = request_state::in_progress; } @@ -106,6 +110,14 @@ class request { // Kafka clients only issue requests in batches of 5, the queue is fairly small // at all times. class requests { +private: + static constexpr int32_t requests_cached_max = 5; + // chunk size of the request containers to avoid wastage. 
+ static constexpr size_t chunk_size = std::bit_ceil( + static_cast(requests_cached_max)); + + using request_queue = ss::chunked_fifo; + public: result try_emplace( seq_t first, seq_t last, model::term_id current, bool reset_sequences); @@ -118,17 +130,21 @@ class requests { bool operator==(const requests&) const; friend std::ostream& operator<<(std::ostream&, const requests&); + const request_queue& inflight_requests() const { + return _inflight_requests; + } + + const request_queue& finished_requests() const { + return _finished_requests; + } + private: - static constexpr int32_t requests_cached_max = 5; - // chunk size of the request containers to avoid wastage. - static constexpr size_t chunk_size = std::bit_ceil( - static_cast(requests_cached_max)); bool is_valid_sequence(seq_t incoming) const; std::optional last_request() const; void gc_requests_from_older_terms(model::term_id current); void reset(request_result_t::error_type); - ss::chunked_fifo _inflight_requests; - ss::chunked_fifo _finished_requests; + request_queue _inflight_requests; + request_queue _finished_requests; friend producer_state; }; @@ -271,6 +287,8 @@ class producer_state { // progress transactions with older epoch. void reset_with_new_epoch(model::producer_epoch new_epoch); + const requests& idempotent_request_state() const { return _requests; } + private: prefix_logger& _logger; From b9ea8ddd07cebd96702530b085a64ee6475e6b72 Mon Sep 17 00:00:00 2001 From: Bharath Vissapragada Date: Sat, 21 Dec 2024 22:32:12 -0800 Subject: [PATCH 16/20] tx/observability: add types and plumbing needed to get producer states .. for a given partition, to be hooked up with REST API in the next commit. (cherry picked from commit 6efd325f853ca7388e0041139fa465a9e44dc605) --- src/v/cluster/rm_group_proxy.h | 3 + src/v/cluster/rm_partition_frontend.cc | 56 +++++++++ src/v/cluster/rm_partition_frontend.h | 2 + src/v/cluster/tx_gateway.cc | 6 + src/v/cluster/tx_gateway.h | 3 + src/v/cluster/tx_gateway.json | 5 + src/v/cluster/tx_gateway_frontend.cc | 63 +++++++++++ src/v/cluster/tx_gateway_frontend.h | 5 + src/v/cluster/tx_protocol_types.cc | 50 +++++++++ src/v/cluster/tx_protocol_types.h | 131 ++++++++++++++++++++++ src/v/kafka/server/group_manager.cc | 92 +++++++++++++++ src/v/kafka/server/group_manager.h | 3 + src/v/kafka/server/group_tx_tracker_stm.h | 98 ++++++++-------- src/v/kafka/server/rm_group_frontend.cc | 9 ++ src/v/kafka/server/rm_group_frontend.h | 7 ++ 15 files changed, 485 insertions(+), 48 deletions(-) diff --git a/src/v/cluster/rm_group_proxy.h b/src/v/cluster/rm_group_proxy.h index 87e2374b7d80e..18966060f4e9f 100644 --- a/src/v/cluster/rm_group_proxy.h +++ b/src/v/cluster/rm_group_proxy.h @@ -55,5 +55,8 @@ class rm_group_proxy { virtual ss::future abort_group_tx_locally(abort_group_tx_request) = 0; + + virtual ss::future + get_group_producers_locally(get_producers_request) = 0; }; } // namespace cluster diff --git a/src/v/cluster/rm_partition_frontend.cc b/src/v/cluster/rm_partition_frontend.cc index 2767bd094d7f6..8f624f4e95aeb 100644 --- a/src/v/cluster/rm_partition_frontend.cc +++ b/src/v/cluster/rm_partition_frontend.cc @@ -559,4 +559,60 @@ ss::future rm_partition_frontend::do_abort_tx( }); } +ss::future +rm_partition_frontend::get_producers_locally(get_producers_request request) { + get_producers_reply reply; + auto partition = _partition_manager.local().get(request.ntp); + if (!partition || !partition->is_leader()) { + reply.error_code = tx::errc::not_coordinator; + co_return reply; + } + reply.error_code = 
tx::errc::none; + auto stm = partition->raft()->stm_manager()->get(); + if (!stm) { + // maybe an internal (non data) partition + co_return reply; + } + const auto& producers = stm->get_producers(); + reply.producer_count = producers.size(); + for (const auto& [pid, state] : producers) { + producer_state_info producer_info; + producer_info.pid = state->id(); + // fill in the idempotent producer state. + const auto& requests = state->idempotent_request_state(); + for (const auto& request : requests.inflight_requests()) { + idempotent_request_info request_info; + request_info.first_sequence = request->first_sequence(); + request_info.last_sequence = request->last_sequence(); + request_info.term = request->term(); + producer_info.inflight_requests.push_back(std::move(request_info)); + } + + for (const auto& request : requests.finished_requests()) { + idempotent_request_info request_info; + request_info.first_sequence = request->first_sequence(); + request_info.last_sequence = request->last_sequence(); + request_info.term = request->term(); + producer_info.finished_requests.push_back(std::move(request_info)); + } + producer_info.last_update = state->last_update_timestamp(); + + // Fill in transactional producer state, if any. + const auto& tx_state = state->transaction_state(); + if (state->has_transaction_in_progress() && tx_state) { + producer_info.tx_begin_offset = tx_state->first; + producer_info.tx_end_offset = tx_state->last; + producer_info.tx_seq = tx_state->sequence; + producer_info.tx_timeout = tx_state->timeout; + producer_info.coordinator_partition + = tx_state->coordinator_partition; + } + reply.producers.push_back(std::move(producer_info)); + if (reply.producers.size() > request.max_producers_to_include) { + break; + } + } + co_return reply; +} + } // namespace cluster diff --git a/src/v/cluster/rm_partition_frontend.h b/src/v/cluster/rm_partition_frontend.h index 428a456ac71c3..0d2ae0898e8bc 100644 --- a/src/v/cluster/rm_partition_frontend.h +++ b/src/v/cluster/rm_partition_frontend.h @@ -49,6 +49,8 @@ class rm_partition_frontend { model::producer_identity, model::tx_seq, model::timeout_clock::duration); + ss::future + get_producers_locally(get_producers_request); ss::future<> stop() { _as.request_abort(); return ss::make_ready_future<>(); diff --git a/src/v/cluster/tx_gateway.cc b/src/v/cluster/tx_gateway.cc index da7adc13febf4..01b8a3b1e2c23 100644 --- a/src/v/cluster/tx_gateway.cc +++ b/src/v/cluster/tx_gateway.cc @@ -119,4 +119,10 @@ ss::future tx_gateway::find_coordinator( co_return co_await _tx_gateway_frontend.local().find_coordinator(r.tid); } +ss::future tx_gateway::get_producers( + get_producers_request request, rpc::streaming_context&) { + co_return co_await _tx_gateway_frontend.local().get_producers( + std::move(request)); +} + } // namespace cluster diff --git a/src/v/cluster/tx_gateway.h b/src/v/cluster/tx_gateway.h index 517fbe53cadc6..c8b65d38ffdad 100644 --- a/src/v/cluster/tx_gateway.h +++ b/src/v/cluster/tx_gateway.h @@ -71,6 +71,9 @@ class tx_gateway final : public tx_gateway_service { ss::future find_coordinator( find_coordinator_request, rpc::streaming_context&) override; + ss::future + get_producers(get_producers_request, rpc::streaming_context&) override; + private: ss::sharded& _tx_gateway_frontend; rm_group_proxy* _rm_group_proxy; diff --git a/src/v/cluster/tx_gateway.json b/src/v/cluster/tx_gateway.json index 0f6c8e5ef49c5..7cfbceb2f515d 100644 --- a/src/v/cluster/tx_gateway.json +++ b/src/v/cluster/tx_gateway.json @@ -64,6 +64,11 @@ "name": 
"find_coordinator", "input_type": "find_coordinator_request", "output_type": "find_coordinator_reply" + }, + { + "name": "get_producers", + "input_type": "get_producers_request", + "output_type": "get_producers_reply" } ] } \ No newline at end of file diff --git a/src/v/cluster/tx_gateway_frontend.cc b/src/v/cluster/tx_gateway_frontend.cc index f56bf38a8749f..446ed536efb3a 100644 --- a/src/v/cluster/tx_gateway_frontend.cc +++ b/src/v/cluster/tx_gateway_frontend.cc @@ -2980,4 +2980,67 @@ ss::future tx_gateway_frontend::unsafe_abort_group_transaction( co_return result.ec; } +ss::future +tx_gateway_frontend::get_producers(get_producers_request request) { + auto holder = _gate.hold(); + const auto& ntp = request.ntp; + if (!_metadata_cache.local().contains(ntp)) { + co_return get_producers_reply{ + .error_code = tx::errc::partition_not_exists}; + } + + auto leader_opt = _leaders.local().get_leader(ntp); + if (!leader_opt) { + co_return get_producers_reply{.error_code = tx::errc::leader_not_found}; + } + auto leader = leader_opt.value(); + if (leader == _self) { + auto shard = _shard_table.local().shard_for(ntp); + if (!shard.has_value()) { + co_return get_producers_reply{ + .error_code = tx::errc::shard_not_found}; + } + co_return co_await container().invoke_on( + shard.value(), + _ssg, + [request = std::move(request)](tx_gateway_frontend& local) mutable { + return local.get_producers_locally(std::move(request)); + }); + } + auto timeout = request.timeout; + auto result = co_await _connection_cache.local() + .with_node_client( + _self, + ss::this_shard_id(), + leader, + model::timeout_clock::now() + timeout, + [request = std::move(request), + timeout](tx_gateway_client_protocol cp) mutable { + return cp.get_producers( + std::move(request), + rpc::client_opts( + model::timeout_clock::now() + timeout)); + }); + if (result.has_error()) { + co_return get_producers_reply{.error_code = tx::errc::not_coordinator}; + } + co_return std::move(result.value().data); +} + +ss::future +tx_gateway_frontend::get_producers_locally(get_producers_request request) { + auto& ntp = request.ntp; + bool is_consumer_offsets_ntp = ntp.ns() + == model::kafka_consumer_offsets_nt.ns() + && ntp.tp.topic + == model::kafka_consumer_offsets_nt.tp; + + if (is_consumer_offsets_ntp) { + co_return co_await _rm_group_proxy->get_group_producers_locally( + std::move(request)); + } + co_return co_await _rm_partition_frontend.local().get_producers_locally( + std::move(request)); +} + } // namespace cluster diff --git a/src/v/cluster/tx_gateway_frontend.h b/src/v/cluster/tx_gateway_frontend.h index 0962bd1fd11ca..2ee779b3abbb8 100644 --- a/src/v/cluster/tx_gateway_frontend.h +++ b/src/v/cluster/tx_gateway_frontend.h @@ -86,6 +86,8 @@ class tx_gateway_frontend final model::tx_seq, model::timeout_clock::duration); + ss::future get_producers(get_producers_request); + ss::future<> stop(); private: @@ -293,6 +295,9 @@ class tx_gateway_frontend final model::timeout_clock::duration, bool ignore_update_ts); + ss::future + get_producers_locally(get_producers_request); + friend tx_gateway; }; } // namespace cluster diff --git a/src/v/cluster/tx_protocol_types.cc b/src/v/cluster/tx_protocol_types.cc index d6558453823fb..7821ad92721c3 100644 --- a/src/v/cluster/tx_protocol_types.cc +++ b/src/v/cluster/tx_protocol_types.cc @@ -236,4 +236,54 @@ std::ostream& operator<<(std::ostream& o, const try_abort_reply& r) { r.ec); return o; } + +std::ostream& operator<<(std::ostream& o, const idempotent_request_info& info) { + fmt::print( + o, + "{{ first: {}, 
last: {}, term: {} }}", + info.first_sequence, + info.last_sequence, + info.term); + return o; +} + +std::ostream& operator<<(std::ostream& o, const producer_state_info& info) { + fmt::print( + o, + "{{ pid: {}, inflight_requests: {}, finished: {}, begin offset: {}, end " + "offset: {}, sequence: {}, timeout: " + "{}, coordinator: {}, last_update: {}, group: {} }}", + info.pid, + info.inflight_requests, + info.finished_requests, + info.tx_begin_offset, + info.tx_end_offset, + info.tx_seq, + info.tx_timeout, + info.coordinator_partition, + info.last_update, + info.group_id); + return o; +} + +std::ostream& operator<<(std::ostream& o, const get_producers_reply& r) { + fmt::print( + o, + "{{ ec: {}, producers: {} , count: {} }}", + r.error_code, + r.producers, + r.producer_count); + return o; +} + +std::ostream& operator<<(std::ostream& o, const get_producers_request& r) { + fmt::print( + o, + "{{ ntp: {} timeout: {}, max_producers_to_include: {} }}", + r.ntp, + r.timeout, + r.max_producers_to_include); + return o; +} + } // namespace cluster diff --git a/src/v/cluster/tx_protocol_types.h b/src/v/cluster/tx_protocol_types.h index 11a8d06d3510e..1b662de2e872d 100644 --- a/src/v/cluster/tx_protocol_types.h +++ b/src/v/cluster/tx_protocol_types.h @@ -12,6 +12,7 @@ #include "cluster/errc.h" #include "cluster/tx_errc.h" +#include "container/fragmented_vector.h" #include "kafka/protocol/types.h" #include "model/fundamental.h" #include "model/record.h" @@ -938,4 +939,134 @@ struct find_coordinator_request auto serde_fields() { return std::tie(tid); } }; + +struct idempotent_request_info + : serde::envelope< + idempotent_request_info, + serde::version<0>, + serde::compat_version<0>> { + using rpc_adl_exempt = std::true_type; + + int32_t first_sequence; + int32_t last_sequence; + model::term_id term; + + friend bool + operator==(const idempotent_request_info&, const idempotent_request_info&) + = default; + + friend std::ostream& + operator<<(std::ostream& o, const idempotent_request_info&); + + auto serde_fields() { + return std::tie(first_sequence, last_sequence, term); + } +}; + +struct producer_state_info + : serde::envelope< + producer_state_info, + serde::version<0>, + serde::compat_version<0>> { + using rpc_adl_exempt = std::true_type; + + model::producer_identity pid; + // idempotent request information only filled for data partitions + // that support idempotency. + chunked_vector inflight_requests; + chunked_vector finished_requests; + std::optional last_update; + // following fields only set for transactional producers + // if there is a transaction in progress. 
+ std::optional tx_begin_offset; + std::optional tx_end_offset; + std::optional tx_seq; + std::optional tx_timeout; + std::optional coordinator_partition; + // only set for group transactions + std::optional group_id; + + friend bool + operator==(const producer_state_info&, const producer_state_info&) + = default; + + friend std::ostream& + operator<<(std::ostream& o, const producer_state_info& r); + + auto serde_fields() { + return std::tie( + pid, + inflight_requests, + finished_requests, + last_update, + tx_begin_offset, + tx_end_offset, + tx_seq, + tx_timeout, + coordinator_partition, + group_id); + } +}; + +struct get_producers_reply + : serde::envelope< + get_producers_reply, + serde::version<0>, + serde::compat_version<0>> { + using rpc_adl_exempt = std::true_type; + + tx::errc error_code{}; + chunked_vector producers; + // Not all producers are sent as a guard rail in extreme cases + // with too many producers, a separate count is sent that + // denotes the actual number while the vector is a subset. + size_t producer_count{0}; + + friend bool + operator==(const get_producers_reply&, const get_producers_reply&) + = default; + + friend std::ostream& + operator<<(std::ostream& o, const get_producers_reply&); + + auto serde_fields() { + return std::tie(error_code, producers, producer_count); + } +}; + +struct get_producers_request + : serde::envelope< + get_producers_request, + serde::version<0>, + serde::compat_version<0>> { + using rpc_adl_exempt = std::true_type; + + get_producers_request() noexcept = default; + + explicit get_producers_request( + model::ntp ntp, + model::timeout_clock::duration timeout, + size_t max_producers_to_include) + : ntp(std::move(ntp)) + , timeout{timeout} + , max_producers_to_include(max_producers_to_include) {} + + model::ntp ntp; + model::timeout_clock::duration timeout; + // Enforce a limit on max producers to respond to guard against + // edge cases. 
+ size_t max_producers_to_include{0}; + + friend bool + operator==(const get_producers_request&, const get_producers_request&) + = default; + + friend std::ostream& + operator<<(std::ostream& o, const get_producers_request&); + + auto serde_fields() { + return std::tie(ntp, timeout, max_producers_to_include); + } +}; + } // namespace cluster diff --git a/src/v/kafka/server/group_manager.cc b/src/v/kafka/server/group_manager.cc index 72848c754b6ba..04b1055a324a1 100644 --- a/src/v/kafka/server/group_manager.cc +++ b/src/v/kafka/server/group_manager.cc @@ -1798,4 +1798,96 @@ error_code group_manager::validate_group_status( return error_code::not_coordinator; } +ss::future +group_manager::get_group_producers_locally( + cluster::get_producers_request request) { + const auto& ntp = request.ntp; + cluster::get_producers_reply reply; + auto it = _partitions.find(ntp); + if (it == _partitions.end() || !it->second->partition->is_leader()) { + reply.error_code = cluster::tx::errc::not_coordinator; + co_return reply; + } + auto attached_partition = *it; + reply.error_code = cluster::tx::errc::none; + // snapshot the list of groups attached to this partition + chunked_hash_map groups; + std::copy_if( + _groups.begin(), + _groups.end(), + std::inserter(groups, groups.end()), + [&ntp](auto g_pair) { + const auto& [group_id, group] = g_pair; + return group->partition()->ntp() == ntp; + }); + reply.producer_count = std::accumulate( + groups.begin(), + groups.end(), + size_t(0), + [](size_t acc, const auto& entry) { + return acc + entry.second->producers().size(); + }); + for (auto& [gid, group] : groups) { + if (reply.producers.size() >= request.max_producers_to_include) { + break; + } + if (group->in_state(group_state::dead)) { + continue; + } + auto partition = group->partition(); + if (!partition) { + // unlikely, conservative check + continue; + } + for (const auto& [id, state] : group->producers()) { + if (reply.producers.size() >= request.max_producers_to_include) { + break; + } + cluster::producer_state_info producer_info; + producer_info.pid = {id, state.epoch}; + producer_info.group_id = group->id()(); + auto& tx = state.transaction; + if (tx) { + producer_info.tx_begin_offset = tx->begin_offset; + producer_info.tx_seq = tx->tx_seq; + producer_info.tx_timeout = tx->timeout; + auto time_since_last_update = model::timeout_clock::now() + - tx->last_update; + auto last_update_ts = model::timestamp_clock::now() + - time_since_last_update; + producer_info.last_update = model::timestamp{ + last_update_ts.time_since_epoch() / 1ms}; + producer_info.coordinator_partition = tx->coordinator_partition; + } + reply.producers.push_back(std::move(producer_info)); + } + } + + // check if there any any additional (stale) groups being tracked by + // the stm, the list should be empty in most cases unless there is + // a divergence in state. + auto partition = attached_partition.second->partition; + auto stm + = partition->raft()->stm_manager()->get(); + if (!stm) { + co_return reply; + } + const auto& stm_txes = stm->inflight_transactions(); + for (const auto& [gid, state] : stm_txes) { + if (groups.contains(gid)) { + continue; + } + // we don't enforce size limits here because this list is expected to be + // small. stale group found, report it to the dbug output. 
+ for (const auto& [pid, state] : state.producer_states) { + reply.producers.push_back({ + .pid = pid, + .tx_begin_offset = state.begin_offset, + .group_id = gid() + "-stale", + }); + } + } + co_return reply; +} + } // namespace kafka diff --git a/src/v/kafka/server/group_manager.h b/src/v/kafka/server/group_manager.h index 64b45779af3ad..6392af9831e23 100644 --- a/src/v/kafka/server/group_manager.h +++ b/src/v/kafka/server/group_manager.h @@ -204,6 +204,9 @@ class group_manager { size_t attached_partitions_count() const { return _partitions.size(); } + ss::future + get_group_producers_locally(cluster::get_producers_request); + public: error_code validate_group_status( const model::ntp& ntp, const group_id& group, api_key api); diff --git a/src/v/kafka/server/group_tx_tracker_stm.h b/src/v/kafka/server/group_tx_tracker_stm.h index 893d160bfba66..a454c7ef13d9f 100644 --- a/src/v/kafka/server/group_tx_tracker_stm.h +++ b/src/v/kafka/server/group_tx_tracker_stm.h @@ -23,6 +23,54 @@ class group_tx_tracker_stm final , public group_data_parser { public: static constexpr std::string_view name = "group_tx_tracker_stm"; + struct producer_tx_state + : serde::envelope< + producer_tx_state, + serde::version<0>, + serde::compat_version<0>> { + model::record_batch_type fence_type; + model::offset begin_offset; + model::timestamp batch_ts; + model::timeout_clock::duration timeout; + + bool expired_deprecated_fence_tx() const; + + auto serde_fields() { + return std::tie(fence_type, begin_offset, batch_ts, timeout); + } + }; + struct per_group_state + : serde::envelope< + per_group_state, + serde::version<1>, + serde::compat_version<0>> { + per_group_state() = default; + + void maybe_add_tx_begin( + const kafka::group_id&, + model::record_batch_type fence_type, + model::producer_identity pid, + model::offset offset, + model::timestamp begin_ts, + model::timeout_clock::duration tx_timeout); + + absl::btree_set begin_offsets; + + // deprecated + absl::btree_map + producer_to_begin_deprecated; + + absl::btree_map + producer_states; + + void gc_expired_tx_fence_transactions(); + + auto serde_fields() { + return std::tie( + begin_offsets, producer_to_begin_deprecated, producer_states); + } + }; + using all_txs_t = absl::btree_map; group_tx_tracker_stm( ss::logger&, raft::consensus*, ss::sharded&); @@ -72,55 +120,9 @@ class group_tx_tracker_stm final model::record_batch_header, kafka::group_tx::commit_metadata); ss::future<> handle_version_fence(features::feature_table::version_fence); -private: - struct producer_tx_state - : serde::envelope< - producer_tx_state, - serde::version<0>, - serde::compat_version<0>> { - model::record_batch_type fence_type; - model::offset begin_offset; - model::timestamp batch_ts; - model::timeout_clock::duration timeout; - - bool expired_deprecated_fence_tx() const; - - auto serde_fields() { - return std::tie(fence_type, begin_offset, batch_ts, timeout); - } - }; - struct per_group_state - : serde::envelope< - per_group_state, - serde::version<1>, - serde::compat_version<0>> { - per_group_state() = default; - - void maybe_add_tx_begin( - const kafka::group_id&, - model::record_batch_type fence_type, - model::producer_identity pid, - model::offset offset, - model::timestamp begin_ts, - model::timeout_clock::duration tx_timeout); - - absl::btree_set begin_offsets; - - // deprecated - absl::btree_map - producer_to_begin_deprecated; + const all_txs_t& inflight_transactions() const { return _all_txs; } - absl::btree_map - producer_states; - - void gc_expired_tx_fence_transactions(); - - 
auto serde_fields() { - return std::tie( - begin_offsets, producer_to_begin_deprecated, producer_states); - } - }; - using all_txs_t = absl::btree_map; +private: struct snapshot : serde::envelope, serde::compat_version<0>> { all_txs_t transactions; diff --git a/src/v/kafka/server/rm_group_frontend.cc b/src/v/kafka/server/rm_group_frontend.cc index 5668628406c64..34d8134428cfc 100644 --- a/src/v/kafka/server/rm_group_frontend.cc +++ b/src/v/kafka/server/rm_group_frontend.cc @@ -497,4 +497,13 @@ rm_group_frontend::abort_group_tx_locally(cluster::abort_group_tx_request req) { co_return reply; } +ss::future +rm_group_frontend::get_group_producers_locally( + cluster::get_producers_request request) { + return _group_router.local() + .get_group_manager() + .local() + .get_group_producers_locally(std::move(request)); +} + } // namespace kafka diff --git a/src/v/kafka/server/rm_group_frontend.h b/src/v/kafka/server/rm_group_frontend.h index ffa0656ff0cd3..891c4a70558d5 100644 --- a/src/v/kafka/server/rm_group_frontend.h +++ b/src/v/kafka/server/rm_group_frontend.h @@ -59,6 +59,8 @@ class rm_group_frontend { model::timeout_clock::duration); ss::future abort_group_tx_locally(cluster::abort_group_tx_request); + ss::future + get_group_producers_locally(cluster::get_producers_request); private: ss::sharded& _metadata_cache; @@ -138,6 +140,11 @@ class rm_group_proxy_impl final : public cluster::rm_group_proxy { return _target.local().abort_group_tx_locally(std::move(req)); } + ss::future get_group_producers_locally( + cluster::get_producers_request request) override { + return _target.local().get_group_producers_locally(std::move(request)); + } + private: ss::sharded& _target; }; From b45c73d682dd2e2dc484b9a97d932e8dd3d03d23 Mon Sep 17 00:00:00 2001 From: Bharath Vissapragada Date: Sun, 22 Dec 2024 12:11:55 -0800 Subject: [PATCH 17/20] tx/admin: types for exposing producer info in REST api (cherry picked from commit 23c8e295a76880b333ed8e63a3dc75076480720d) --- src/v/redpanda/admin/api-doc/debug.json | 135 ++++++++++++++++++++++++ 1 file changed, 135 insertions(+) diff --git a/src/v/redpanda/admin/api-doc/debug.json b/src/v/redpanda/admin/api-doc/debug.json index d548f16647148..4421437848c6f 100644 --- a/src/v/redpanda/admin/api-doc/debug.json +++ b/src/v/redpanda/admin/api-doc/debug.json @@ -380,6 +380,46 @@ } ] }, + { + "path": "/v1/debug/producers/{namespace}/{topic}/{partition}", + "operations": [ + { + "method": "GET", + "summary": "Get low level debug information about producers producing to the input partition, queried from the leader of the partition.", + "type": "partition_producers", + "nickname": "get_partition_producers", + "produces": [ + "application/json" + ], + "parameters": [ + { + "name": "namespace", + "in": "path", + "required": true, + "type": "string" + }, + { + "name": "topic", + "in": "path", + "required": true, + "type": "string" + }, + { + "name": "partition", + "in": "path", + "required": true, + "type": "integer" + }, + { + "name": "limit", + "in": "query", + "required": false, + "type": "long" + } + ] + } + ] + }, { "path": "/v1/debug/set_storage_failure_injection_enabled", "operations": [ @@ -1171,6 +1211,101 @@ } } }, + "idempotent_producer_request_state": { + "id": "idempotent_producer_request_state", + "description": "Debug state of an idempotent producer request to a partition.", + "properties": { + "first_sequence": { + "type": "int", + "description": "Kafka first sequence of the request" + }, + "last_sequence": { + "type": "int", + "description": "Kafka last sequence 
of the request" + }, + "term": { + "type": "long", + "description": "Raft term of the idempotent request" + } + } + }, + "partition_producer_state": { + "id": "partition_producer_state", + "description": "Debug state of a single producer producing to a partition ", + "properties": { + "id" : { + "type": "long", + "description": "ID of the producer, assigned by Redpanda." + }, + "epoch": { + "type": "int", + "description": "Epoch of the producer, assigned by Redpanda." + }, + "inflight_idempotent_requests": { + "type": "array", + "items": { + "type": "idempotent_producer_request_state" + }, + "description": "List of requests currently in flight." + }, + "finished_idempotent_requests": { + "type": "array", + "items": { + "type": "idempotent_producer_request_state" + }, + "description": "List of finished requests." + }, + "last_update_timestamp": { + "type": "long", + "description": "Last update timestamp for this producer." + }, + "transaction_begin_offset": { + "type": "long", + "description": "First offset of the transaction." + }, + "transaction_last_offset": { + "type": "long", + "description": "Last offset of the transaction." + }, + "transaction_sequence": { + "type": "int", + "description": "Sequence number of the transaction." + }, + "transaction_timeout_ms": { + "type": "long", + "description": "Transaction timeout in ms." + }, + "transaction_coordinator_partition": { + "type": "int", + "description": "Transaction coordinator partition coordinating the transaction." + }, + "transaction_group_id": { + "type": "string", + "description": "Group id of transaction, only relevant for group partitions." + } + } + }, + "partition_producers": { + "id": "partition_producers", + "description": "Debug information about producers producing to a partition.", + "properties": { + "ntp": { + "type": "string", + "description": "Partition that is queried" + }, + "total_producer_count": { + "type": "long", + "description": "Total number of producers as tracked by the partition." + }, + "producers": { + "type": "array", + "items": { + "type": "partition_producer_state" + }, + "description": "Producers to this partition, a limit is enforced to avoid fetching too many producers. total_producer_count denotes actual number of producers tracked by the partition." + } + } + }, "local_storage_usage": { "id": "local_storage_usage", "description": "Local storage disk usage", From 0d3d482834794194caa7787908d22b69dab6facf Mon Sep 17 00:00:00 2001 From: Bharath Vissapragada Date: Sun, 22 Dec 2024 12:50:41 -0800 Subject: [PATCH 18/20] tx/observability: REST endpoint to fetch all producers from a partition /v1/debug/producers/{namespace}/{topic}/{partition} .. includes low level debug information about producers for idempotency/transactional state. 
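For example, a GET request to
/v1/debug/producers/kafka/__consumer_offsets/0?limit=100 (the optional
"limit" query parameter caps how many producers are returned) answers with a
partition_producers document: the queried ntp, the total number of producers
tracked by the partition, and a per-producer breakdown of idempotent request
state plus, when a transaction is open, its begin offset, sequence, timeout
and coordinator partition. The ducktape test added in this series drives the
same endpoint through Admin.get_producers_state().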
(cherry picked from commit 70e36ebe577e8bd84aa23869ecbf55c49858ed14) --- src/v/redpanda/admin/debug.cc | 93 +++++++++++++++++++ src/v/redpanda/admin/server.h | 2 + tests/rptest/services/admin.py | 4 + .../rptest/transactions/producers_api_test.py | 70 ++++++++++++++ 4 files changed, 169 insertions(+) create mode 100644 tests/rptest/transactions/producers_api_test.py diff --git a/src/v/redpanda/admin/debug.cc b/src/v/redpanda/admin/debug.cc index 5a33c9d87acea..53ddf2c895f08 100644 --- a/src/v/redpanda/admin/debug.cc +++ b/src/v/redpanda/admin/debug.cc @@ -417,6 +417,13 @@ void admin_server::register_debug_routes() { return get_partition_state_handler(std::move(req)); }); + register_route( + seastar::httpd::debug_json::get_partition_producers, + [this](std::unique_ptr req) + -> ss::future { + return get_producers_state_handler(std::move(req)); + }); + register_route( ss::httpd::debug_json::cpu_profile, [this](std::unique_ptr req) @@ -809,6 +816,92 @@ admin_server::get_partition_state_handler( co_return ss::json::json_return_type(std::move(response)); } +ss::future +admin_server::get_producers_state_handler( + std::unique_ptr req) { + if (!_tx_gateway_frontend.local_is_initialized()) { + throw ss::httpd::server_error_exception( + "Server is not yet ready to process requests, retry later."); + } + const model::ntp ntp = parse_ntp_from_request(req->param); + auto timeout = std::chrono::duration_cast( + 10s); + size_t limit = 1000; + if (auto e = req->get_query_param("limit"); !e.empty()) { + try { + limit = boost::lexical_cast(e); + } catch (const boost::bad_lexical_cast&) { + // ignore + } + } + auto result = co_await _tx_gateway_frontend.local().get_producers( + cluster::get_producers_request{ntp, timeout, limit}); + if (result.error_code != cluster::tx::errc::none) { + throw ss::httpd::server_error_exception(fmt::format( + "Error {} getting producers for ntp: {}", result.error_code, ntp)); + } + vlog( + adminlog.debug, + "producers for {}, size: {}", + ntp, + result.producers.size()); + ss::httpd::debug_json::partition_producers producers; + producers.ntp = fmt::format("{}", ntp); + producers.total_producer_count = result.producer_count; + for (auto& producer : result.producers) { + ss::httpd::debug_json::partition_producer_state producer_state; + producer_state.id = producer.pid.id(); + producer_state.epoch = producer.pid.epoch(); + for (const auto& req : producer.inflight_requests) { + ss::httpd::debug_json::idempotent_producer_request_state inflight; + inflight.first_sequence = req.first_sequence; + inflight.last_sequence = req.last_sequence; + inflight.term = req.term(); + producer_state.inflight_idempotent_requests.push( + std::move(inflight)); + } + for (const auto& req : producer.finished_requests) { + ss::httpd::debug_json::idempotent_producer_request_state finished; + finished.first_sequence = req.first_sequence; + finished.last_sequence = req.last_sequence; + finished.term = req.term(); + producer_state.finished_idempotent_requests.push( + std::move(finished)); + } + if (producer.last_update) { + producer_state.last_update_timestamp + = producer.last_update.value()(); + } + if (producer.tx_begin_offset) { + producer_state.transaction_begin_offset + = producer.tx_begin_offset.value(); + } + if (producer.tx_end_offset) { + producer_state.transaction_last_offset + = producer.tx_end_offset.value(); + } + if (producer.tx_seq) { + producer_state.transaction_sequence = producer.tx_seq.value(); + } + if (producer.tx_timeout) { + producer_state.transaction_timeout_ms + = 
std::chrono::duration_cast( + producer.tx_timeout.value()) + .count(); + } + if (producer.coordinator_partition) { + producer_state.transaction_coordinator_partition + = producer.coordinator_partition.value()(); + } + if (producer.group_id) { + producer_state.transaction_group_id = producer.group_id.value(); + } + producers.producers.push(std::move(producer_state)); + co_await ss::coroutine::maybe_yield(); + } + co_return ss::json::json_return_type(std::move(producers)); +} + ss::future admin_server::get_node_uuid_handler() { ss::httpd::debug_json::broker_uuid uuid; uuid.node_uuid = ssx::sformat( diff --git a/src/v/redpanda/admin/server.h b/src/v/redpanda/admin/server.h index 24ed095c8b90f..0cede052840d9 100644 --- a/src/v/redpanda/admin/server.h +++ b/src/v/redpanda/admin/server.h @@ -614,6 +614,8 @@ class admin_server { ss::future get_partition_state_handler(std::unique_ptr); + ss::future + get_producers_state_handler(std::unique_ptr); ss::future get_local_storage_usage_handler(std::unique_ptr); ss::future diff --git a/tests/rptest/services/admin.py b/tests/rptest/services/admin.py index f42b21dd5b23d..1f15b48682c8b 100644 --- a/tests/rptest/services/admin.py +++ b/tests/rptest/services/admin.py @@ -1478,6 +1478,10 @@ def get_partition_state(self, namespace, topic, partition, node=None): path = f"debug/partition/{namespace}/{topic}/{partition}" return self._request("GET", path, node=node).json() + def get_producers_state(self, namespace, topic, partition, node=None): + path = f"debug/producers/{namespace}/{topic}/{partition}" + return self._request("GET", path, node=node).json() + def get_local_storage_usage(self, node=None): """ Get the local storage usage report. diff --git a/tests/rptest/transactions/producers_api_test.py b/tests/rptest/transactions/producers_api_test.py new file mode 100644 index 0000000000000..240741993a0e8 --- /dev/null +++ b/tests/rptest/transactions/producers_api_test.py @@ -0,0 +1,70 @@ +# Copyright 2024 Redpanda Data, Inc. 
+#
+# Use of this software is governed by the Business Source License
+# included in the file licenses/BSL.md
+#
+# As of the Change Date specified in that file, in accordance with
+# the Business Source License, use of this software will be governed
+# by the Apache License, Version 2.0
+import threading
+from time import sleep
+from rptest.transactions.verifiers.consumer_offsets_verifier import ConsumerOffsetsVerifier
+from rptest.services.admin import Admin
+from rptest.tests.redpanda_test import RedpandaTest
+from rptest.services.cluster import cluster
+
+
+class ProducersAdminAPITest(RedpandaTest):
+    def __init__(self, test_context):
+        super(ProducersAdminAPITest,
+              self).__init__(test_context=test_context,
+                             num_brokers=3,
+                             extra_rp_conf={
+                                 "group_topic_partitions": 1,
+                                 "group_new_member_join_timeout": 3000,
+                                 "enable_leader_balancer": False
+                             })
+        self._stop_scraping = threading.Event()
+
+    def get_producer_state(self, topic: str):
+        admin = self.redpanda._admin
+        return admin.get_producers_state(namespace="kafka",
+                                         topic=topic,
+                                         partition=0)
+
+    @cluster(num_nodes=3)
+    def test_producers_state_api_during_load(self):
+        verifier = ConsumerOffsetsVerifier(self.redpanda, self._client)
+        self.redpanda._admin.await_stable_leader(topic="__consumer_offsets")
+        self.redpanda._admin.await_stable_leader(topic=verifier._topic)
+
+        # Run a scraper in the background while the verifier is running to
+        # ensure it doesn't interact with the workloads incorrectly
+        def scraper():
+            while not self._stop_scraping.isSet():
+                self.get_producer_state(verifier._topic)
+                self.get_producer_state("__consumer_offsets")
+                sleep(1)
+
+        bg_scraper = threading.Thread(target=scraper, daemon=True)
+        bg_scraper.start()
+        verifier.verify()
+        self._stop_scraping.set()
+        bg_scraper.join()
+
+        # Basic sanity checks
+        co_producers = self.get_producer_state("__consumer_offsets")
+        assert len(
+            co_producers["producers"]
+        ) == 10, "Not all producer states found in consumer_offsets partition"
+        expected_groups = set([f"group-{i}" for i in range(10)])
+        state_groups = set([
+            producer["transaction_group_id"]
+            for producer in co_producers["producers"]
+        ])
+        assert expected_groups == state_groups, f"Not all groups reported. expected: {expected_groups}, reported: {state_groups}"
+
+        topic_producers = self.get_producer_state(verifier._topic)
+        assert len(
+            topic_producers["producers"]
+        ) == 10, "Not all producer states found in data topic partition"

From bfa360356e4ad9d82e5b573765954f385dc67482 Mon Sep 17 00:00:00 2001
From: Bharath Vissapragada
Date: Mon, 23 Dec 2024 09:32:13 -0800
Subject: [PATCH 19/20] raft/persisted_stm: add ability for stms to reject
 local snapshot

.. in this case the state machine falls back to applying state from the log.
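
For illustration only (not part of this change), here is a minimal,
self-contained sketch of the contract this patch introduces. The names
sketch_stm, snapshot and log_entry are hypothetical stand-ins, and plain
return values stand in for the ss::future<local_snapshot_applied> used by
persisted_stm; the point is only that apply_local_snapshot() may now answer
"no", in which case the caller skips seeding state from the local snapshot
and rebuilds it by replaying the log:

    // Hypothetical model of "reject local snapshot, replay from the log".
    #include <cstdint>
    #include <iostream>
    #include <optional>
    #include <string>
    #include <vector>

    enum class local_snapshot_applied { no, yes };

    struct snapshot { int8_t version; int64_t offset; std::string data; };
    struct log_entry { int64_t offset; std::string data; };

    class sketch_stm {
    public:
        static constexpr int8_t supported_version = 1;

        // The stm may reject a snapshot it does not understand (e.g. an old
        // or deliberately invalidated version); the caller then falls back
        // to the log.
        local_snapshot_applied apply_local_snapshot(const snapshot& s) {
            if (s.version != supported_version) {
                return local_snapshot_applied::no;
            }
            state = s.data;
            return local_snapshot_applied::yes;
        }

        void apply(const log_entry& e) { state += e.data; }

        // Mirrors the start() logic: only advance past the snapshot offset
        // when the snapshot was actually applied, otherwise replay all of it.
        void start(const std::optional<snapshot>& snap,
                   const std::vector<log_entry>& log) {
            int64_t next = 0;
            if (snap
                && apply_local_snapshot(*snap)
                     == local_snapshot_applied::yes) {
                next = snap->offset + 1;
            }
            for (const auto& e : log) {
                if (e.offset >= next) {
                    apply(e);
                }
            }
        }

        std::string state;
    };

    int main() {
        std::vector<log_entry> log{{0, "a"}, {1, "b"}, {2, "c"}};

        sketch_stm rejecting;
        rejecting.start(snapshot{0, 1, "ab"}, log); // stale version: replayed
        std::cout << rejecting.state << "\n";       // prints "abc"

        sketch_stm accepting;
        accepting.start(snapshot{1, 1, "ab"}, log); // applied, then offset 2
        std::cout << accepting.state << "\n";       // prints "abc"
    }

Either path converges on the same state; rejecting the snapshot only trades
startup time for a full replay, which is what the final patch in this series
relies on to discard group_tx_tracker_stm snapshots carrying a stale
max_collectible_offset.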
(cherry picked from commit c833f5028652c4cb3144c73dd00cf38b7a77be36) --- src/v/archival/archival_metadata_stm.cc | 5 +- src/v/archival/archival_metadata_stm.h | 2 +- src/v/cluster/distributed_kv_stm.h | 3 +- src/v/cluster/id_allocator_stm.cc | 4 +- src/v/cluster/id_allocator_stm.h | 2 +- src/v/cluster/log_eviction_stm.cc | 4 +- src/v/cluster/log_eviction_stm.h | 2 +- src/v/cluster/rm_stm.cc | 5 +- src/v/cluster/rm_stm.h | 2 +- src/v/cluster/tm_stm.cc | 4 +- src/v/cluster/tm_stm.h | 2 +- src/v/kafka/server/group_tx_tracker_stm.cc | 4 +- src/v/kafka/server/group_tx_tracker_stm.h | 2 +- src/v/raft/persisted_stm.cc | 23 +++++---- src/v/raft/persisted_stm.h | 10 +++- src/v/raft/tests/persisted_stm_test.cc | 54 ++++++++++++++++++---- 16 files changed, 90 insertions(+), 38 deletions(-) diff --git a/src/v/archival/archival_metadata_stm.cc b/src/v/archival/archival_metadata_stm.cc index 3abc1b9dff645..ca7ffda5fdd8b 100644 --- a/src/v/archival/archival_metadata_stm.cc +++ b/src/v/archival/archival_metadata_stm.cc @@ -1219,7 +1219,8 @@ ss::future<> archival_metadata_stm::apply_raft_snapshot(const iobuf&) { get_last_offset()); } -ss::future<> archival_metadata_stm::apply_local_snapshot( +ss::future +archival_metadata_stm::apply_local_snapshot( raft::stm_snapshot_header header, iobuf&& data) { auto snap = serde::from_iobuf(std::move(data)); @@ -1287,7 +1288,7 @@ ss::future<> archival_metadata_stm::apply_local_snapshot( } else { _last_clean_at = header.offset; } - co_return; + co_return raft::local_snapshot_applied::yes; } ss::future diff --git a/src/v/archival/archival_metadata_stm.h b/src/v/archival/archival_metadata_stm.h index eec969a4b3232..e2e2e76f49bf9 100644 --- a/src/v/archival/archival_metadata_stm.h +++ b/src/v/archival/archival_metadata_stm.h @@ -305,7 +305,7 @@ class archival_metadata_stm final : public raft::persisted_stm<> { ss::future<> do_apply(const model::record_batch& batch) override; ss::future<> apply_raft_snapshot(const iobuf&) override; - ss::future<> + ss::future apply_local_snapshot(raft::stm_snapshot_header, iobuf&&) override; ss::future take_local_snapshot(ssx::semaphore_units apply_units) override; diff --git a/src/v/cluster/distributed_kv_stm.h b/src/v/cluster/distributed_kv_stm.h index 8a0febc041ef1..2099f2a21e4be 100644 --- a/src/v/cluster/distributed_kv_stm.h +++ b/src/v/cluster/distributed_kv_stm.h @@ -130,7 +130,7 @@ class distributed_kv_stm final : public raft::persisted_stm<> { }); } - ss::future<> + ss::future apply_local_snapshot(raft::stm_snapshot_header, iobuf&& bytes) override { auto holder = _gate.hold(); iobuf_parser parser(std::move(bytes)); @@ -144,6 +144,7 @@ class distributed_kv_stm final : public raft::persisted_stm<> { } } _kvs = std::move(snap.kv_data); + co_return raft::local_snapshot_applied::yes; } ss::future diff --git a/src/v/cluster/id_allocator_stm.cc b/src/v/cluster/id_allocator_stm.cc index d57f8e85c3185..47bd9e6f4cb46 100644 --- a/src/v/cluster/id_allocator_stm.cc +++ b/src/v/cluster/id_allocator_stm.cc @@ -223,9 +223,9 @@ ss::future<> id_allocator_stm::write_snapshot() { .finally([this] { _is_writing_snapshot = false; }); } -ss::future<> +ss::future id_allocator_stm::apply_local_snapshot(raft::stm_snapshot_header, iobuf&&) { - return ss::make_exception_future<>( + return ss::make_exception_future( std::logic_error("id_allocator_stm doesn't support snapshots")); } diff --git a/src/v/cluster/id_allocator_stm.h b/src/v/cluster/id_allocator_stm.h index 61059e48902ad..e7463ec6aba90 100644 --- a/src/v/cluster/id_allocator_stm.h +++ 
b/src/v/cluster/id_allocator_stm.h @@ -106,7 +106,7 @@ class id_allocator_stm final : public raft::persisted_stm<> { advance_state(int64_t, model::timeout_clock::duration); ss::future<> write_snapshot(); - ss::future<> + ss::future apply_local_snapshot(raft::stm_snapshot_header, iobuf&&) override; ss::future take_local_snapshot(ssx::semaphore_units apply_units) override; diff --git a/src/v/cluster/log_eviction_stm.cc b/src/v/cluster/log_eviction_stm.cc index fe979a6747a5b..b6f9d79d00584 100644 --- a/src/v/cluster/log_eviction_stm.cc +++ b/src/v/cluster/log_eviction_stm.cc @@ -440,14 +440,14 @@ ss::future<> log_eviction_stm::apply_raft_snapshot(const iobuf&) { co_return; } -ss::future<> log_eviction_stm::apply_local_snapshot( +ss::future log_eviction_stm::apply_local_snapshot( raft::stm_snapshot_header header, iobuf&& data) { auto snapshot = serde::from_iobuf(std::move(data)); vlog( _log.info, "Applying snapshot {} at offset: {}", snapshot, header.offset); _delete_records_eviction_offset = snapshot.effective_start_offset; - return ss::now(); + co_return raft::local_snapshot_applied::yes; } ss::future diff --git a/src/v/cluster/log_eviction_stm.h b/src/v/cluster/log_eviction_stm.h index ae4515fb7e805..ca694b0748bc2 100644 --- a/src/v/cluster/log_eviction_stm.h +++ b/src/v/cluster/log_eviction_stm.h @@ -109,7 +109,7 @@ class log_eviction_stm ss::future take_snapshot(model::offset) final { co_return iobuf{}; } protected: - ss::future<> + ss::future apply_local_snapshot(raft::stm_snapshot_header, iobuf&&) override; ss::future diff --git a/src/v/cluster/rm_stm.cc b/src/v/cluster/rm_stm.cc index 8b4069fc5f411..0d9d8f42344db 100644 --- a/src/v/cluster/rm_stm.cc +++ b/src/v/cluster/rm_stm.cc @@ -1666,7 +1666,7 @@ model::offset rm_stm::to_log_offset(kafka::offset k_offset) const { return model::offset(k_offset); } -ss::future<> +ss::future rm_stm::apply_local_snapshot(raft::stm_snapshot_header hdr, iobuf&& tx_ss_buf) { auto units = co_await _state_lock.hold_write_lock(); @@ -1689,7 +1689,7 @@ rm_stm::apply_local_snapshot(raft::stm_snapshot_header hdr, iobuf&& tx_ss_buf) { data = co_await serde::read_async(data_parser); } else { vlog(_ctx_log.error, "Ignored snapshot version {}", hdr.version); - co_return; + co_return raft::local_snapshot_applied::no; } _highest_producer_id = std::max( @@ -1774,6 +1774,7 @@ rm_stm::apply_local_snapshot(raft::stm_snapshot_header hdr, iobuf&& tx_ss_buf) { snapshot_opt.value()); } } + co_return raft::local_snapshot_applied::yes; } uint8_t rm_stm::active_snapshot_version() { diff --git a/src/v/cluster/rm_stm.h b/src/v/cluster/rm_stm.h index 055cbbba40763..3417caa796d65 100644 --- a/src/v/cluster/rm_stm.h +++ b/src/v/cluster/rm_stm.h @@ -260,7 +260,7 @@ class rm_stm final : public raft::persisted_stm<> { tx::producer_ptr, std::optional, model::timeout_clock::duration); - ss::future<> + ss::future apply_local_snapshot(raft::stm_snapshot_header, iobuf&&) override; ss::future take_local_snapshot(ssx::semaphore_units apply_units) override; diff --git a/src/v/cluster/tm_stm.cc b/src/v/cluster/tm_stm.cc index cad14a68e2fdc..284943c0315d4 100644 --- a/src/v/cluster/tm_stm.cc +++ b/src/v/cluster/tm_stm.cc @@ -599,7 +599,7 @@ fragmented_vector tm_stm::get_transactions_list() const { return ret; } -ss::future<> +ss::future tm_stm::apply_local_snapshot(raft::stm_snapshot_header hdr, iobuf&& tm_ss_buf) { vassert( hdr.version >= tm_snapshot_v0::version @@ -624,7 +624,7 @@ tm_stm::apply_local_snapshot(raft::stm_snapshot_header hdr, iobuf&& tm_ss_buf) { vlog(_ctx_log.trace, "Applied 
snapshot at offset: {}", hdr.offset); } - return ss::now(); + co_return raft::local_snapshot_applied::yes; } ss::future diff --git a/src/v/cluster/tm_stm.h b/src/v/cluster/tm_stm.h index c9812a768a08f..fc956ee39577a 100644 --- a/src/v/cluster/tm_stm.h +++ b/src/v/cluster/tm_stm.h @@ -359,7 +359,7 @@ class tm_stm final : public raft::persisted_stm<> { private: std::optional find_tx(const kafka::transactional_id&); - ss::future<> + ss::future apply_local_snapshot(raft::stm_snapshot_header, iobuf&&) override; ss::future take_local_snapshot(ssx::semaphore_units apply_units) override; diff --git a/src/v/kafka/server/group_tx_tracker_stm.cc b/src/v/kafka/server/group_tx_tracker_stm.cc index 31147f1e07dcf..715f6ce17f5f9 100644 --- a/src/v/kafka/server/group_tx_tracker_stm.cc +++ b/src/v/kafka/server/group_tx_tracker_stm.cc @@ -117,12 +117,14 @@ model::offset group_tx_tracker_stm::max_collectible_offset() { return result; } -ss::future<> group_tx_tracker_stm::apply_local_snapshot( +ss::future +group_tx_tracker_stm::apply_local_snapshot( raft::stm_snapshot_header, iobuf&& snap_buf) { auto holder = _gate.hold(); iobuf_parser parser(std::move(snap_buf)); auto snap = co_await serde::read_async(parser); _all_txs = std::move(snap.transactions); + co_return raft::local_snapshot_applied::yes; } ss::future diff --git a/src/v/kafka/server/group_tx_tracker_stm.h b/src/v/kafka/server/group_tx_tracker_stm.h index a454c7ef13d9f..5243ec36bb515 100644 --- a/src/v/kafka/server/group_tx_tracker_stm.h +++ b/src/v/kafka/server/group_tx_tracker_stm.h @@ -96,7 +96,7 @@ class group_tx_tracker_stm final model::offset max_collectible_offset() override; - ss::future<> + ss::future apply_local_snapshot(raft::stm_snapshot_header, iobuf&& bytes) override; ss::future diff --git a/src/v/raft/persisted_stm.cc b/src/v/raft/persisted_stm.cc index 2e6d599029206..f71a2ad3f77fd 100644 --- a/src/v/raft/persisted_stm.cc +++ b/src/v/raft/persisted_stm.cc @@ -611,17 +611,24 @@ ss::future<> persisted_stm::start() { if (maybe_snapshot) { stm_snapshot& snapshot = *maybe_snapshot; - auto next_offset = model::next_offset(snapshot.header.offset); if (next_offset >= _raft->start_offset()) { - vlog( - _log.debug, - "start with applied snapshot, set_next {}", - next_offset); - co_await apply_local_snapshot( + auto snapshot_applied = co_await apply_local_snapshot( snapshot.header, std::move(snapshot.data)); - set_next(next_offset); - _last_snapshot_offset = snapshot.header.offset; + if (snapshot_applied == local_snapshot_applied::yes) { + vlog( + _log.debug, + "start with applied snapshot, set_next {}", + next_offset); + set_next(next_offset); + _last_snapshot_offset = snapshot.header.offset; + } else { + vlog( + _log.warn, + "local snapshot rejected by {}, will be recovered from the " + "log", + name()); + } } else { // This can happen on an out-of-date replica that re-joins the group // after other replicas have already evicted logs to some offset diff --git a/src/v/raft/persisted_stm.h b/src/v/raft/persisted_stm.h index 241982e9a9eca..87ab1fc431d84 100644 --- a/src/v/raft/persisted_stm.h +++ b/src/v/raft/persisted_stm.h @@ -30,6 +30,9 @@ namespace raft { static constexpr const int8_t stm_snapshot_version_v0 = 0; static constexpr const int8_t stm_snapshot_version = 1; +using local_snapshot_applied + = ss::bool_class; + struct stm_snapshot_header { int8_t version{0}; int32_t snapshot_size{0}; @@ -228,9 +231,12 @@ class persisted_stm virtual ss::future<> do_apply(const model::record_batch& b) = 0; /** - * Called when local snapshot is applied to 
the state machine + * Called when local snapshot is applied to the state machine. The state + * machine may choose to reject the snapshot if it wishes to in which case + * it will be hydrated from the log. */ - virtual ss::future<> apply_local_snapshot(stm_snapshot_header, iobuf&&) = 0; + virtual ss::future + apply_local_snapshot(stm_snapshot_header, iobuf&&) = 0; /** * Called when a local snapshot is taken. Apply fiber is stalled until diff --git a/src/v/raft/tests/persisted_stm_test.cc b/src/v/raft/tests/persisted_stm_test.cc index a2249ebc6291e..f921763da83f8 100644 --- a/src/v/raft/tests/persisted_stm_test.cc +++ b/src/v/raft/tests/persisted_stm_test.cc @@ -124,9 +124,11 @@ struct kv_state class persisted_kv : public persisted_stm<> { public: static constexpr std::string_view name = "persited_kv_stm"; - explicit persisted_kv(raft_node_instance& rn) + explicit persisted_kv( + raft_node_instance& rn, bool reject_local_snapshots = false) : persisted_stm<>("simple-kv", logger, rn.raft().get()) - , raft_node(rn) {} + , raft_node(rn) + , reject_local_snapshots(reject_local_snapshots) {} ss::future<> start() override { return persisted_stm<>::start(); } ss::future<> stop() override { return persisted_stm<>::stop(); } @@ -157,10 +159,13 @@ class persisted_kv : public persisted_stm<> { /** * Called when local snapshot is applied to the state machine */ - ss::future<> - apply_local_snapshot(stm_snapshot_header header, iobuf&& buffer) final { + ss::future + apply_local_snapshot(stm_snapshot_header, iobuf&& buffer) final { + if (reject_local_snapshots) { + co_return raft::local_snapshot_applied::no; + } state = serde::from_iobuf(std::move(buffer)); - co_return; + co_return raft::local_snapshot_applied::yes; }; /** @@ -274,6 +279,7 @@ class persisted_kv : public persisted_stm<> { kv_state state; kv_operation last_operation; raft_node_instance& raft_node; + bool reject_local_snapshots = false; }; class other_persisted_kv : public persisted_kv { @@ -392,7 +398,8 @@ struct persisted_stm_test_fixture : state_machine_fixture { return ops; } - ss::future<> restart_cluster() { + ss::future<> + restart_cluster(bool reject_local_snapshots_after_restart = false) { absl::flat_hash_map data_directories; for (auto& [id, node] : nodes()) { data_directories[id] @@ -407,7 +414,8 @@ struct persisted_stm_test_fixture : state_machine_fixture { for (auto& [_, node] : nodes()) { co_await node->initialise(all_vnodes()); raft::state_machine_manager_builder builder; - auto stm = builder.create_stm(*node); + auto stm = builder.create_stm( + *node, reject_local_snapshots_after_restart); co_await node->start(std::move(builder)); node_stms.emplace(node->get_vnode(), std::move(stm)); } @@ -475,10 +483,10 @@ class slow_persisted_stm : public persisted_stm<> { co_return iobuf{}; } - ss::future<> - apply_local_snapshot(stm_snapshot_header header, iobuf&& buffer) override { + ss::future + apply_local_snapshot(stm_snapshot_header, iobuf&& buffer) override { _last_stm_applied = serde::from_iobuf(std::move(buffer)); - co_return; + co_return raft::local_snapshot_applied::yes; } ss::future<> validate_applied_offsets(model::offset before) { @@ -598,6 +606,32 @@ TEST_F_CORO(persisted_stm_test_fixture, test_local_snapshot) { } } +TEST_F_CORO(persisted_stm_test_fixture, test_skipping_local_snapshot_on_start) { + co_await initialize_state_machines(); + kv_state expected; + auto ops = random_operations(2000); + for (auto batch : ops) { + co_await apply_operations(expected, std::move(batch)); + } + co_await wait_for_apply(); + for (const 
auto& [_, stm] : node_stms) {
+        ASSERT_EQ_CORO(stm->state, expected);
+    }
+    // take local snapshot on every node
+    co_await take_local_snapshot_on_every_node();
+
+    auto committed = node(model::node_id(0)).raft()->committed_offset();
+    // Reject snapshot loading
+    co_await restart_cluster(true);
+    co_await wait_for_committed_offset(committed, 30s);
+    co_await wait_for_apply();
+
+    // Ensure everything loaded from the log tallies
+    for (const auto& [_, stm] : node_stms) {
+        ASSERT_EQ_CORO(stm->state, expected);
+    }
+}
+
 TEST_F_CORO(persisted_stm_test_fixture, test_raft_and_local_snapshot) {
     co_await initialize_state_machines();
     kv_state expected;

From 8a681c1f7d54d6bc7e7f628cb23d6acc9c387754 Mon Sep 17 00:00:00 2001
From: Bharath Vissapragada
Date: Mon, 23 Dec 2024 10:34:04 -0800
Subject: [PATCH 20/20] tx/group_tx_stm: invalidate existing snapshots for the
 stm

Bumps the supported snapshot version so the existing snapshots are
invalidated, as they may contain a stale max_collectible_offset. This
forces the stm to reconstruct the state from the log and recompute the
correct max_collectible_offset.

(cherry picked from commit 0051463f23316562812a88e77efb1598bfe87463)
---
 src/v/kafka/server/group_tx_tracker_stm.cc | 10 +++++++---
 src/v/kafka/server/group_tx_tracker_stm.h  |  1 +
 2 files changed, 8 insertions(+), 3 deletions(-)

diff --git a/src/v/kafka/server/group_tx_tracker_stm.cc b/src/v/kafka/server/group_tx_tracker_stm.cc
index 715f6ce17f5f9..0eaff6008e68b 100644
--- a/src/v/kafka/server/group_tx_tracker_stm.cc
+++ b/src/v/kafka/server/group_tx_tracker_stm.cc
@@ -119,8 +119,12 @@ model::offset group_tx_tracker_stm::max_collectible_offset() {
 
 ss::future
 group_tx_tracker_stm::apply_local_snapshot(
-  raft::stm_snapshot_header, iobuf&& snap_buf) {
+  raft::stm_snapshot_header header, iobuf&& snap_buf) {
     auto holder = _gate.hold();
+    if (header.version != supported_local_snapshot_version) {
+        // fall back to applying from the log
+        co_return raft::local_snapshot_applied::no;
+    }
     iobuf_parser parser(std::move(snap_buf));
     auto snap = co_await serde::read_async(parser);
     _all_txs = std::move(snap.transactions);
@@ -137,8 +141,8 @@ group_tx_tracker_stm::take_local_snapshot(ssx::semaphore_units apply_units) {
     iobuf snap_buf;
     apply_units.return_all();
     co_await serde::write_async(snap_buf, snap);
-    // snapshot versioning handled via serde.
-    co_return raft::stm_snapshot::create(0, offset, std::move(snap_buf));
+    co_return raft::stm_snapshot::create(
+      supported_local_snapshot_version, offset, std::move(snap_buf));
 }
 
 ss::future<> group_tx_tracker_stm::apply_raft_snapshot(const iobuf&) {
diff --git a/src/v/kafka/server/group_tx_tracker_stm.h b/src/v/kafka/server/group_tx_tracker_stm.h
index 5243ec36bb515..3fd596f2d3f19 100644
--- a/src/v/kafka/server/group_tx_tracker_stm.h
+++ b/src/v/kafka/server/group_tx_tracker_stm.h
@@ -123,6 +123,7 @@ class group_tx_tracker_stm final
     const all_txs_t& inflight_transactions() const { return _all_txs; }
 
 private:
+    static constexpr int8_t supported_local_snapshot_version = 1;
     struct snapshot
       : serde::envelope, serde::compat_version<0>> {
         all_txs_t transactions;