Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding support for specify input kafka offset for each partitions #1242

Merged
merged 6 commits into from
Apr 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 73 additions & 10 deletions input_kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ package goreplay
import (
"encoding/json"
"log"
"strconv"
"strings"
"time"

"github.com/Shopify/sarama"
"github.com/Shopify/sarama/mocks"
Expand All @@ -12,14 +14,24 @@ import (
// KafkaInput is used for receiving Kafka messages and
// transforming them into HTTP payloads.
type KafkaInput struct {
config *InputKafkaConfig
consumers []sarama.PartitionConsumer
messages chan *sarama.ConsumerMessage
quit chan struct{}
config *InputKafkaConfig
consumers []sarama.PartitionConsumer
messages chan *sarama.ConsumerMessage
speedFactor float64
quit chan struct{}
kafkaTimer *kafkaTimer
}

func getOffsetOfPartitions(offsetCfg string) int64 {
offset, err := strconv.ParseInt(offsetCfg, 10, 64)
if err != nil || offset < -2 {
log.Fatalln("Failed to parse offset: "+offsetCfg, err)
}
return offset
}

// NewKafkaInput creates instance of kafka consumer client with TLS config
func NewKafkaInput(_ string, config *InputKafkaConfig, tlsConfig *KafkaTLSConfig) *KafkaInput {
func NewKafkaInput(offsetCfg string, config *InputKafkaConfig, tlsConfig *KafkaTLSConfig) *KafkaInput {
c := NewKafkaConfig(&config.SASLConfig, tlsConfig)

var con sarama.Consumer
Expand All @@ -41,14 +53,17 @@ func NewKafkaInput(_ string, config *InputKafkaConfig, tlsConfig *KafkaTLSConfig
}

i := &KafkaInput{
config: config,
consumers: make([]sarama.PartitionConsumer, len(partitions)),
messages: make(chan *sarama.ConsumerMessage, 256),
quit: make(chan struct{}),
config: config,
consumers: make([]sarama.PartitionConsumer, len(partitions)),
messages: make(chan *sarama.ConsumerMessage, 256),
speedFactor: 1,
quit: make(chan struct{}),
kafkaTimer: new(kafkaTimer),
}
i.config.Offset = offsetCfg

for index, partition := range partitions {
consumer, err := con.ConsumePartition(config.Topic, partition, sarama.OffsetNewest)
consumer, err := con.ConsumePartition(config.Topic, partition, getOffsetOfPartitions(offsetCfg))
if err != nil {
log.Fatalln("Failed to start Sarama(Kafka) partition consumer:", err)
}
Expand Down Expand Up @@ -86,12 +101,15 @@ func (i *KafkaInput) PluginRead() (*Message, error) {
case message = <-i.messages:
}

inputTs := ""

msg.Data = message.Value
if i.config.UseJSON {

var kafkaMessage KafkaMessage
json.Unmarshal(message.Value, &kafkaMessage)

inputTs = kafkaMessage.ReqTs
var err error
msg.Data, err = kafkaMessage.Dump()
if err != nil {
Expand All @@ -103,8 +121,11 @@ func (i *KafkaInput) PluginRead() (*Message, error) {
// does it have meta
if isOriginPayload(msg.Data) {
msg.Meta, msg.Data = payloadMetaWithBody(msg.Data)
inputTs = string(payloadMeta(msg.Meta)[2])
}

i.timeWait(inputTs)

return &msg, nil

}
Expand All @@ -118,3 +139,45 @@ func (i *KafkaInput) Close() error {
close(i.quit)
return nil
}

func (i *KafkaInput) timeWait(curInputTs string) {
if i.config.Offset == "-1" || curInputTs == "" {
return
}

// implement for Kafka input showdown or speedup emitting
timer := i.kafkaTimer
curTs := time.Now().UnixNano()

curInput, err := strconv.ParseInt(curInputTs, 10, 64)

if timer.latestInputTs == 0 || timer.latestOutputTs == 0 {
timer.latestInputTs = curInput
timer.latestOutputTs = curTs
return
}

if err != nil {
log.Fatalln("Fatal to parse timestamp err: ", err)
}

diffTs := curInput - timer.latestInputTs
pastTs := curTs - timer.latestOutputTs

diff := diffTs - pastTs
if i.speedFactor != 1 {
diff = int64(float64(diff) / i.speedFactor)
}

if diff > 0 {
time.Sleep(time.Duration(diff))
}

timer.latestInputTs = curInput
timer.latestOutputTs = curTs
}

type kafkaTimer struct {
latestInputTs int64
latestOutputTs int64
}
4 changes: 2 additions & 2 deletions input_kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ func TestInputKafkaRAW(t *testing.T) {
map[string][]int32{"test": {0}},
)

input := NewKafkaInput("", &InputKafkaConfig{
input := NewKafkaInput("-1", &InputKafkaConfig{
consumer: consumer,
Topic: "test",
UseJSON: false,
Expand All @@ -42,7 +42,7 @@ func TestInputKafkaJSON(t *testing.T) {
map[string][]int32{"test": {0}},
)

input := NewKafkaInput("", &InputKafkaConfig{
input := NewKafkaInput("-1", &InputKafkaConfig{
consumer: consumer,
Topic: "test",
UseJSON: true,
Expand Down
1 change: 1 addition & 0 deletions kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ type InputKafkaConfig struct {
Host string `json:"input-kafka-host"`
Topic string `json:"input-kafka-topic"`
UseJSON bool `json:"input-kafka-json-format"`
Offset string `json:"input-kafka-offset"`
SASLConfig SASLKafkaConfig
}

Expand Down
39 changes: 33 additions & 6 deletions limiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,22 @@ func parseLimitOptions(options string) (limit int, isPercent bool) {
return
}

func newLimiterExceptions(l *Limiter) {

if !l.isPercent {
return
}
speedFactor := float64(l.limit) / float64(100)

// FileInput、KafkaInput have its own rate limiting. Unlike other inputs we not just dropping requests, we can slow down or speed up request emittion.
switch input := l.plugin.(type) {
case *FileInput:
input.speedFactor = speedFactor
case *KafkaInput:
input.speedFactor = speedFactor
}
}

// NewLimiter constructor for Limiter, accepts plugin and options
// `options` allow to sprcify relatve or absolute limiting
func NewLimiter(plugin interface{}, options string) PluginReadWriter {
Expand All @@ -39,17 +55,28 @@ func NewLimiter(plugin interface{}, options string) PluginReadWriter {
l.plugin = plugin
l.currentTime = time.Now().UnixNano()

// FileInput have its own rate limiting. Unlike other inputs we not just dropping requests, we can slow down or speed up request emittion.
if fi, ok := l.plugin.(*FileInput); ok && l.isPercent {
fi.speedFactor = float64(l.limit) / float64(100)
}
newLimiterExceptions(l)

return l
}

func (l *Limiter) isLimitedExceptions() bool {
if !l.isPercent {
return false
}
// Fileinput、Kafkainput have its own limiting algorithm
switch l.plugin.(type) {
case *FileInput:
return true
case *KafkaInput:
return true
default:
return false
}
}

func (l *Limiter) isLimited() bool {
// File input have its own limiting algorithm
if _, ok := l.plugin.(*FileInput); ok && l.isPercent {
if l.isLimitedExceptions() {
return false
}

Expand Down
2 changes: 1 addition & 1 deletion plugins.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ func NewPlugins() *InOutPlugins {
}

if Settings.InputKafkaConfig.Host != "" && Settings.InputKafkaConfig.Topic != "" {
plugins.registerPlugin(NewKafkaInput, "", &Settings.InputKafkaConfig, &Settings.KafkaTLSConfig)
plugins.registerPlugin(NewKafkaInput, Settings.InputKafkaConfig.Offset, &Settings.InputKafkaConfig, &Settings.KafkaTLSConfig)
}

return plugins
Expand Down
1 change: 1 addition & 0 deletions settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,7 @@ func init() {
flag.StringVar(&Settings.InputKafkaConfig.SASLConfig.Mechanism, "input-kafka-mechanism", "", "mechanism\n\tgor --input-raw :8080 --output-kafka-mechanism 'SCRAM-SHA-512'")
flag.StringVar(&Settings.InputKafkaConfig.SASLConfig.Username, "input-kafka-username", "", "username\n\tgor --input-raw :8080 --output-kafka-username 'username'")
flag.StringVar(&Settings.InputKafkaConfig.SASLConfig.Password, "input-kafka-password", "", "password\n\tgor --input-raw :8080 --output-kafka-password 'password'")
flag.StringVar(&Settings.InputKafkaConfig.Offset, "input-kafka-offset", "-1", "Specify offset in Kafka partitions start to consume\n\t-1: Starts from newest, -2: Starts from oldest\nAnd supported for showdown or speedup for emitting!\n\tgor --input-kafka-offset \"-2|200%\"")

flag.StringVar(&Settings.KafkaTLSConfig.CACert, "kafka-tls-ca-cert", "", "CA certificate for Kafka TLS Config:\n\tgor --input-raw :3000 --output-kafka-host '192.168.0.1:9092' --output-kafka-topic 'topic' --kafka-tls-ca-cert cacert.cer.pem --kafka-tls-client-cert client.cer.pem --kafka-tls-client-key client.key.pem")
flag.StringVar(&Settings.KafkaTLSConfig.ClientCert, "kafka-tls-client-cert", "", "Client certificate for Kafka TLS Config (mandatory with to kafka-tls-ca-cert and kafka-tls-client-key)")
Expand Down
Loading