diff --git a/docs/index.asciidoc b/docs/index.asciidoc index e62176c..95ab1ea 100644 --- a/docs/index.asciidoc +++ b/docs/index.asciidoc @@ -281,7 +281,9 @@ Comma-delimited list of `:` pairs that define the sort order * Value type is <> * Default value is `false` -SSL +Force SSL/TLS secured communication to Elasticsearch cluster. +Leaving this unspecified will use whatever scheme is specified in the URLs listed in <>, where mixed schemes are supported. +If SSL is set to `true`, the plugin will refuse to start if any of the hosts specifies an `http://` scheme. [id="plugins-{type}s-{plugin}-tag_on_failure"] ===== `tag_on_failure` diff --git a/lib/logstash/filters/elasticsearch.rb b/lib/logstash/filters/elasticsearch.rb index 3032153..784df2b 100644 --- a/lib/logstash/filters/elasticsearch.rb +++ b/lib/logstash/filters/elasticsearch.rb @@ -71,6 +71,8 @@ def register @query_dsl = file.read end + @normalised_hosts = normalise_hosts(@hosts, @ssl) + test_connection! end # def register @@ -140,8 +142,7 @@ def filter(event) private def client_options { - :ssl => @ssl, - :hosts => @hosts, + :hosts => @normalised_hosts, :ca_file => @ca_file, :logger => @logger } @@ -191,4 +192,48 @@ def extract_total_from_hits(hits) def test_connection! get_client.client.ping end + + private + + ## + # Map the provided array-of-strings to an array of `URI::Generic` + # instances, which the Elasticsearch client can use to establish + # connections. + # + # @param hosts [Array]: one or more hosts, each is one of: + # - a bare hostname or ip, optionally + # followed by a colon and port number + # - a qualified URL with http/https schema + # @param force_ssl [Boolean]: true to force SSL; will cause failure if one + # or more hosts explicitly supplies non-SSL + # scheme (e.g., `http`). + # + # @return [Array] + def normalise_hosts(hosts, force_ssl) + hosts.map do |input| + if force_ssl && input.start_with?('http://') + logger.error("Plugin configured to force SSL with `ssl => true`, " + + "but a host explicitly declared non-https URL `#{input}`") + + raise LogStash::ConfigurationError, "Aborting due to conflicting configuration" + end + + if input.start_with?('http://','https://') + URI::Generic.new(*URI.split(input)) + else + host, port = input.split(':') + URI::Generic.new( + force_ssl ? 'https' : 'http', + nil, # userinfo, + host, + port, + nil, # registry + nil, # path + nil, # opaque + nil, # query + nil # fragment + ) + end + end + end end #class LogStash::Filters::Elasticsearch diff --git a/lib/logstash/filters/elasticsearch/client.rb b/lib/logstash/filters/elasticsearch/client.rb index 986e797..e81fa66 100644 --- a/lib/logstash/filters/elasticsearch/client.rb +++ b/lib/logstash/filters/elasticsearch/client.rb @@ -11,9 +11,8 @@ class ElasticsearchClient attr_reader :client def initialize(user, password, options={}) - ssl = options.fetch(:ssl, false) - hosts = options[:hosts] - @logger = options[:logger] + hosts = options.fetch(:hosts) + @logger = options.fetch(:logger) transport_options = {} if user && password @@ -21,7 +20,6 @@ def initialize(user, password, options={}) transport_options[:headers] = { Authorization: "Basic #{token}" } end - hosts.map! {|h| { host: h, scheme: 'https' } } if ssl # set ca_file even if ssl isn't on, since the host can be an https url ssl_options = { ssl: true, ca_file: options[:ca_file] } if options[:ca_file] ssl_options ||= {} diff --git a/spec/filters/elasticsearch_spec.rb b/spec/filters/elasticsearch_spec.rb index ac19e3e..2dd7bdc 100644 --- a/spec/filters/elasticsearch_spec.rb +++ b/spec/filters/elasticsearch_spec.rb @@ -4,17 +4,194 @@ require "logstash/filters/elasticsearch" require "logstash/json" +RSpec::Matchers.define(:hash_with_member) do |member, matcher=nil| + match do |actual| + expect(actual).to be_a Hash + expect(actual).to include member + + value = actual[member] + + block_arg && block_arg.call(value) + + matcher.nil? || matcher.matches?(value) + end + + description do + desc = "hash with member `#{member.inspect}`" + caveats = [] + caveats << 'the provided block' unless block_arg.nil? + caveats << matcher.description unless matcher.nil? + + desc += ' satisfying ' unless caveats.empty? + desc += caveats.join(' and ') + desc + end +end + describe LogStash::Filters::Elasticsearch do context "registration" do - let(:plugin) { LogStash::Plugin.lookup("filter", "elasticsearch").new({}) } - before do - allow(plugin).to receive(:test_connection!) + let(:plugin_class) { LogStash::Plugin.lookup("filter", "elasticsearch") } + let(:plugin) { plugin_class.new(config) } + let(:config) { Hash.new } + + context 'with defaults' do + before do + allow(plugin).to receive(:test_connection!) + end + + it "should not raise an exception" do + expect {plugin.register}.to_not raise_error + end end - it "should not raise an exception" do - expect {plugin.register}.to_not raise_error + context 'hosts' do + let(:config) do + super().merge( + 'hosts' => hosts + ) + end + let(:hosts) do + fail NotImplementedError, 'spec or spec group must define `hosts`.' + end + + let(:client_stub) { double(:client).as_null_object } + let(:logger_stub) { double(:logger).as_null_object } + + before(:each) do + allow(plugin).to receive(:logger).and_return(logger_stub) + end + + context 'with schema://hostname' do + let(:hosts) { ['http://foo.local', 'http://bar.local'] } + + it 'creates client with URIs that do not include a port' do + expect(::Elasticsearch::Client).to receive(:new) do |options| + expect(options).to include :hosts + expect(options[:hosts]).to be_an Array + expect(options[:hosts]).to include(having_attributes(host: 'foo.local', scheme: 'http', port: nil)) + expect(options[:hosts]).to include(having_attributes(host: 'bar.local', scheme: 'http', port: nil)) + end.and_return(client_stub) + + plugin.register + end + end + + context 'with `ssl => true`' do + let(:config) { super().merge('ssl' => 'true') } + context 'and one or more explicitly-http hosts' do + let(:hosts) { ['https://foo.local', 'http://bar.local'] } + + it 'raises an exception' do + expect { plugin.register }.to raise_error(LogStash::ConfigurationError) + end + + it 'emits a helpful log message' do + plugin.register rescue nil + expect(plugin.logger).to have_received(:error).with(match(/force SSL/)) + end + end + + context 'and all explicitly-https hosts' do + let(:hosts) { ['https://foo.local', 'https://bar.local'] } + + it 'sets the schemas on all to https' do + expect(::Elasticsearch::Client).to receive(:new) do |options| + expect(options).to include :hosts + expect(options[:hosts]).to be_an Array + options[:hosts].each do |host| + expect(host).to be_an URI + expect(host.scheme).to eq 'https' + end + end.and_return(client_stub) + + plugin.register + end + end + + context 'and one or more schemaless hosts' do + let(:hosts) { ['https://foo.local', 'bar.local'] } + + it 'sets the schemas on all to https' do + expect(::Elasticsearch::Client).to receive(:new) do |options| + expect(options).to include :hosts + expect(options[:hosts]).to be_an Array + options[:hosts].each do |host| + expect(host).to be_an URI + expect(host.scheme).to eq 'https' + end + end.and_return(client_stub) + + plugin.register + end + end + end + + { + 'with `ssl => false' => {'ssl' => 'false'}, + 'without `ssl` directive' => {} + }.each do |context_string, config_override| + context(context_string) do + let(:config) { super().merge(config_override) } + + context 'with a mix of http and https hosts' do + let(:hosts) { ['https://foo.local', 'http://bar.local'] } + it 'does not modify the protocol' do + expect(::Elasticsearch::Client).to receive(:new) do |options| + expect(options).to include :hosts + expect(options[:hosts]).to be_an Array + expect(options[:hosts]).to include(having_attributes(host: 'foo.local', scheme: 'https')) + expect(options[:hosts]).to include(having_attributes(host: 'bar.local', scheme: 'http')) + end.and_return(client_stub) + + plugin.register + end + end + + context 'with https-only hosts' do + let(:hosts) { ['https://foo.local', 'https://bar.local'] } + it 'does not modify the protocol' do + expect(::Elasticsearch::Client).to receive(:new) do |options| + expect(options).to include :hosts + expect(options[:hosts]).to be_an Array + expect(options[:hosts]).to include(having_attributes(host: 'foo.local', scheme: 'https')) + expect(options[:hosts]).to include(having_attributes(host: 'bar.local', scheme: 'https')) + end.and_return(client_stub) + + plugin.register + end + end + + context 'with http-only hosts' do + let(:hosts) { ['http://foo.local', 'http://bar.local'] } + it 'does not modify the protocol' do + expect(::Elasticsearch::Client).to receive(:new) do |options| + expect(options).to include :hosts + expect(options[:hosts]).to be_an Array + expect(options[:hosts]).to include(having_attributes(host: 'foo.local', scheme: 'http')) + expect(options[:hosts]).to include(having_attributes(host: 'bar.local', scheme: 'http')) + end.and_return(client_stub) + + plugin.register + end + end + + context 'with one or more schemaless hosts' do + let(:hosts) { ['foo.local', 'bar.local'] } + it 'defaults to the http protocol' do + expect(::Elasticsearch::Client).to receive(:new) do |options| + expect(options).to include :hosts + expect(options[:hosts]).to be_an Array + expect(options[:hosts]).to include(having_attributes(host: 'foo.local', scheme: 'http')) + expect(options[:hosts]).to include(having_attributes(host: 'bar.local', scheme: 'http')) + end.and_return(client_stub) + + plugin.register + end + end + end + end end end