Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add sniffing_attributes option #904

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions lib/logstash/outputs/elasticsearch.rb
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,10 @@ class LogStash::Outputs::ElasticSearch < LogStash::Outputs::Base
# do not use full URL here, only paths, e.g. "/sniff/_nodes/http"
config :sniffing_path, :validate => :string

# Node attribute(s) to filter for. Those that possess and match the following hash of attributes
# will be selected.
config :sniffing_attributes, :validate => :hash, :default => {}

# Set the address of a forward HTTP proxy.
# This used to accept hashes as arguments but now only accepts
# arguments of the URI type to prevent leaking credentials.
Expand Down
1 change: 1 addition & 0 deletions lib/logstash/outputs/elasticsearch/http_client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,7 @@ def build_pool(options)
:sniffing => sniffing,
:sniffer_delay => options[:sniffer_delay],
:sniffing_path => options[:sniffing_path],
:sniffing_attributes => options[:sniffing_attributes],
:healthcheck_path => options[:healthcheck_path],
:resurrect_delay => options[:resurrect_delay],
:url_normalizer => self.method(:host_to_url),
Expand Down
27 changes: 17 additions & 10 deletions lib/logstash/outputs/elasticsearch/http_client/pool.rb
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ def message
end
end

attr_reader :logger, :adapter, :sniffing, :sniffer_delay, :resurrect_delay, :healthcheck_path, :sniffing_path, :bulk_path
attr_reader :logger, :adapter, :sniffing, :sniffer_delay, :resurrect_delay, :healthcheck_path, :sniffing_path, :bulk_path, :sniffing_attributes

ROOT_URI_PATH = '/'.freeze
LICENSE_PATH = '/_license'.freeze
Expand All @@ -40,6 +40,7 @@ def message
:scheme => 'http',
:resurrect_delay => 5,
:sniffing => false,
:sniffing_attributes => {},
:sniffer_delay => 10,
}.freeze

Expand All @@ -48,7 +49,7 @@ def initialize(logger, adapter, initial_urls=[], options={})
@adapter = adapter
@metric = options[:metric]
@initial_urls = initial_urls

raise ArgumentError, "No URL Normalizer specified!" unless options[:url_normalizer]
@url_normalizer = options[:url_normalizer]
DEFAULT_OPTIONS.merge(options).tap do |merged|
Expand All @@ -58,6 +59,7 @@ def initialize(logger, adapter, initial_urls=[], options={})
@resurrect_delay = merged[:resurrect_delay]
@sniffing = merged[:sniffing]
@sniffer_delay = merged[:sniffer_delay]
@sniffing_attributes = merged[:sniffing_attributes]
end

# Used for all concurrent operations in this class
Expand All @@ -71,7 +73,7 @@ def initialize(logger, adapter, initial_urls=[], options={})
def oss?
LogStash::Outputs::ElasticSearch.oss?
end

def start
update_initial_urls
start_resurrectionist
Expand Down Expand Up @@ -189,15 +191,20 @@ def check_sniff
end
end
end

def major_version(version_string)
version_string.split('.').first.to_i
end

def sniff_5x_and_above(nodes)
nodes.map do |id,info|
# Skip master-only nodes
next if info["roles"] && info["roles"] == ["master"]
# if !@sniffing_attributes.nil? or !@sniffing_attributes.to_h.empty?
if !@sniffing_attributes.to_h.empty?
attributes = info["attributes"].clone.delete_if { |key, value| !@sniffing_attributes.key? key }
next if attributes != @sniffing_attributes
end
address_str_to_uri(info["http"]["publish_address"]) if info["http"]
end.compact
end
Expand All @@ -215,7 +222,7 @@ def sniff_2x_1x(nodes)
nodes.map do |id,info|
# TODO Make sure this works with shield. Does that listed
# stuff as 'https_address?'

addr_str = info['http_address'].to_s
next unless addr_str # Skip hosts with HTTP disabled

Expand Down Expand Up @@ -344,7 +351,7 @@ def normalize_url(uri)

def update_urls(new_urls)
return if new_urls.nil?

# Normalize URLs
new_urls = new_urls.map(&method(:normalize_url))

Expand Down Expand Up @@ -374,14 +381,14 @@ def update_urls(new_urls)
logger.info("Elasticsearch pool URLs updated", :changes => state_changes)
end
end

# Run an inline healthcheck anytime URLs are updated
# This guarantees that during startup / post-startup
# sniffing we don't have idle periods waiting for the
# periodic sniffer to allow new hosts to come online
healthcheck!
healthcheck!
end

def size
@state_mutex.synchronize { @url_info.size }
end
Expand Down
1 change: 1 addition & 0 deletions lib/logstash/outputs/elasticsearch/http_client_builder.rb
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ def self.build(logger, hosts, params)
if params["sniffing"]
common_options[:sniffing] = true
common_options[:sniffer_delay] = params["sniffing_delay"]
common_options[:sniffing_attributes] = params["sniffing_attributes"]
end

common_options[:timeout] = params["timeout"] if params["timeout"]
Expand Down