
Kafka input threads die in some cases and logstash does not notice it #287

Open
yfoelling opened this issue Jul 13, 2018 · 2 comments
yfoelling commented Jul 13, 2018

  • Version: logstash-input-kafka 8.0.4 and 8.1.1, tested on Logstash 6.3.0 and 6.2.x
  • Operating System: Ubuntu Linux
  • Config File (if you have sensitive info, please remove it):
input {
  kafka {
    auto_offset_reset  => "earliest"
    bootstrap_servers  => "KAFKA ENDPOINTS"
    consumer_threads   => 4
    enable_auto_commit => "false"
    group_id           => "MYGROUP"
    client_id          => "MYCONSUMER"
    topics_pattern     => "MYPATTERN"
    codec              => "json"
  }
}
[....]
  • Sample Data:
    Any data
  • Steps to Reproduce:
    Trigger any Kafka client error.

In this case: delete a topic that is matched by the topics pattern.
You will get the following error:

[ERROR] 2018-07-12 12:43:00.388 [Ruby-0-Thread-24: /usr/share/logstash/vendor/bundle/jruby/2.3.0/gems/logstash-input-kafka-8.1.1/lib/logstash/inputs/kafka.rb:242] ConsumerCoordinator - [Consumer clientId=CLIENTID, groupId=GROUPID] Offset commit failed on partition PARTITION at offset 0: This server does not host this topic-partition.
Exception in thread "Ruby-0-Thread-24: /usr/share/logstash/vendor/bundle/jruby/2.3.0/gems/logstash-input-kafka-8.1.1/lib/logstash/inputs/kafka.rb:242" org.apache.kafka.common.KafkaException: Topic or Partition PARTITION does not exist
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:778)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:726)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java:822)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java:802)
at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(org/apache/kafka/clients/consumer/internals/RequestFuture.java:204)
at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(org/apache/kafka/clients/consumer/internals/RequestFuture.java:167)
at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(org/apache/kafka/clients/consumer/internals/RequestFuture.java:127)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java:563)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java:390)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java:293)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java:233)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java:209)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:597)
at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(org/apache/kafka/clients/consumer/KafkaConsumer.java:1218)
at java.lang.reflect.Method.invoke(java/lang/reflect/Method.java:498)
at org.jruby.javasupport.JavaMethod.invokeDirectWithExceptionHandling(org/jruby/javasupport/JavaMethod.java:438)
at org.jruby.javasupport.JavaMethod.invokeDirect(org/jruby/javasupport/JavaMethod.java:302)
at RUBY.block in thread_runner(/usr/share/logstash/vendor/bundle/jruby/2.3.0/gems/logstash-input-kafka-8.1.1/lib/logstash/inputs/kafka.rb:271)
at org.jruby.RubyProc.call(org/jruby/RubyProc.java:289)
at org.jruby.RubyProc.call(org/jruby/RubyProc.java:246)
at java.lang.Thread.run(java/lang/Thread.java:748)

In this case the error occurs because enable_auto_commit => "false" makes the plugin commit offsets manually, that commit fails, and there is no error handling for this case.

This is only one of a few cases I have run into; there is a similar problem when Elasticsearch is down for some time.
In all cases the Kafka input threads "die" but Logstash keeps running, so there is no way for me to notice that Logstash is not working properly any more.
Logstash does not seem to notice that the plugin has stopped working, so it just waits for further input. I have seen other plugins being restarted by Logstash on errors, but that is not the case for this plugin.
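The failure mode is easy to reproduce in plain Ruby, no Kafka needed; this is an illustrative sketch of the problem, not plugin code:

```ruby
# Plain-Ruby illustration of the failure mode: an unrescued exception kills
# the worker thread, but the process (the "pipeline") keeps running and
# simply waits for input that will never come.
Thread.report_on_exception = false # keep the demo quiet; not plugin behavior

worker = Thread.new do
  # stands in for consumer.poll / commitSync raising a KafkaException
  raise "Topic or Partition does not exist"
end

begin
  worker.join # the exception only surfaces on join; the plugin never sees it
rescue RuntimeError
  # swallowed here, which is effectively what happens in the plugin
end

puts worker.alive?      # => false: the input thread is dead
puts Thread.main.alive? # => true: the process keeps running regardless
```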

I have some possible solutions, but sadly I am not good enough with Ruby to implement them myself:

  1. Build a real listener for topics_pattern changes (instead of the current no-op listener):
     https://github.com/logstash-plugins/logstash-input-kafka/blob/master/lib/logstash/inputs/kafka.rb#L244

          nooplistener = org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener.new
          pattern = java.util.regex.Pattern.compile(@topics_pattern)
          consumer.subscribe(pattern, nooplistener)

     and, in addition, retry on any error and restart the plugin after some tries.

  2. Just retry on any error and restart the plugin after some tries.
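The "retry and restart after some tries" idea could look roughly like this. A plain-Ruby sketch; the names supervise, max_retries, and base_delay are mine, not the plugin's:

```ruby
# Hypothetical supervisor for a consumer-thread body: rescue any error,
# back off, retry a bounded number of times, then re-raise so Logstash
# actually sees the failure instead of the thread dying silently.
def supervise(max_retries: 3, base_delay: 0.1)
  attempts = 0
  begin
    yield
  rescue
    attempts += 1
    raise if attempts > max_retries # give up loudly after N tries
    sleep(base_delay * attempts)    # linear backoff before re-subscribing
    retry
  end
end
```

In the plugin this would wrap the consumer loop inside thread_runner, so a KafkaException leads to a re-subscribe instead of a dead thread.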

A little workaround for people out there with similar issues:
Set enable_auto_commit to "true" so the Kafka client commits in the background; then you no longer have issues with changed topics. Problems with a restarted Elasticsearch (logstash-output) will still occur.
You could also run jstack regularly, search its output for the Kafka input threads, and restart Logstash if they are no longer running.
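The jstack check can be scripted. In the stack trace above the input threads show up with lib/logstash/inputs/kafka.rb in their names, so that substring is what the sketch below greps for; verify it against your own jstack output first, and the check/restart wiring is an assumption:

```ruby
# Sketch of the jstack workaround: count live threads whose name mentions
# the kafka input (they appear as "Ruby-0-Thread-N: .../inputs/kafka.rb"
# in the trace above) and signal a restart when none are left.
def kafka_input_threads(jstack_output)
  jstack_output.lines.count { |line| line.include?('inputs/kafka.rb') }
end

def check(pid)
  out = `jstack #{pid}` # requires a JDK on the host
  if kafka_input_threads(out).zero?
    warn 'kafka input threads are gone, restart logstash' # e.g. via systemd
  end
end
```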

ysn2233 commented Sep 26, 2018

Hi, we got the same issues. Have you solved it yet?

colinsurprenant (Contributor) commented

Yes, the kafka input (sub)thread management needs reworking: their lifecycle is not supervised, and the exception handling is also incorrect (related to elastic/logstash#11603).
