Skip to content

Commit

Permalink
log consumer opens, closes, and exceptions
Browse files Browse the repository at this point in the history
  • Loading branch information
yaauie committed Jan 6, 2018
1 parent a54c2f4 commit a64ab5d
Showing 1 changed file with 17 additions and 4 deletions.
21 changes: 17 additions & 4 deletions lib/logstash/inputs/kafka.rb
Original file line number Diff line number Diff line change
Expand Up @@ -221,8 +221,12 @@ def register

public
def run(logstash_queue)
@runner_consumers = consumer_threads.times.map { |i| create_consumer("#{client_id}-#{i}") }
@runner_threads = @runner_consumers.map { |consumer| thread_runner(logstash_queue, consumer) }
@runner_consumers = consumer_threads.times.map do |index|
create_consumer("#{client_id}-#{index}")
end
@runner_threads = @runner_consumers.map.with_index do |consumer, index|
thread_runner(logstash_queue, consumer, index)
end
@runner_threads.each { |t| t.join }
end # def run

Expand All @@ -237,9 +241,11 @@ def kafka_consumers
end

private
def thread_runner(logstash_queue, consumer)
def thread_runner(logstash_queue, consumer, consumer_index)
consumer_identifier = "#{client_id}-#{consumer_index}"
Thread.new do
begin
logger.info("opening consumer #{consumer_identifier}")
unless @topics_pattern.nil?
nooplistener = org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener.new
pattern = java.util.regex.Pattern.compile(@topics_pattern)
Expand Down Expand Up @@ -270,8 +276,15 @@ def thread_runner(logstash_queue, consumer)
end
end
rescue org.apache.kafka.common.errors.WakeupException => e
raise e if !stop?
unless stop?
logger.error("wakeup exception in consumer #{consumer_identifier}: #{e}")
raise e
end
rescue => e
logger.error("uncaught exception in consumer #{consumer_identifier}: #{e}")
raise e
ensure
logger.info("closing consumer #{consumer_identifier}")
consumer.close
end
end
Expand Down

0 comments on commit a64ab5d

Please sign in to comment.