diff --git a/Gemfile b/Gemfile index 32cc6fb..fffa312 100644 --- a/Gemfile +++ b/Gemfile @@ -2,7 +2,7 @@ source 'https://rubygems.org' gemspec -logstash_path = ENV["LOGSTASH_PATH"] || "../../logstash" +logstash_path = ENV["LOGSTASH_PATH"] || "." use_logstash_source = ENV["LOGSTASH_SOURCE"] && ENV["LOGSTASH_SOURCE"].to_s == "1" if Dir.exist?(logstash_path) && use_logstash_source diff --git a/lib/logstash/outputs/bson/big_decimal.rb b/lib/logstash/outputs/bson/big_decimal.rb index 3b5d4ef..fd06f9e 100644 --- a/lib/logstash/outputs/bson/big_decimal.rb +++ b/lib/logstash/outputs/bson/big_decimal.rb @@ -50,7 +50,7 @@ def from_bson(bson) private def from_bson_double(double) - new(double.unpack(PACK).first.to_s) + BigDecimal(double.unpack(PACK).first.to_s) end end diff --git a/lib/logstash/outputs/mongodb.rb b/lib/logstash/outputs/mongodb.rb index 0b88c68..adc3e88 100644 --- a/lib/logstash/outputs/mongodb.rb +++ b/lib/logstash/outputs/mongodb.rb @@ -34,6 +34,12 @@ class LogStash::Outputs::Mongodb < LogStash::Outputs::Base # "_id" field in the event. config :generateId, :validate => :boolean, :default => false + # The field that will be used for the _id field + # This can be for example the ID column of a SQL table when using JDBC. + config :idField, :validate => :string, :required => false + + # Upsert documents flag, set to true to use replace_one instead of insert_one. + config :upsert, :validate => :boolean, :default => false # Bulk insert flag, set to true to allow bulk insertion, else it will insert events one by one. config :bulk, :validate => :boolean, :default => false @@ -65,7 +71,14 @@ def register @@mutex.synchronize do @documents.each do |collection, values| if values.length > 0 - @db[collection].insert_many(values) + if @upsert + bulk_operations = values.map do |doc| + { replace_one: { filter: { _id: doc["_id"] }, replacement: doc, upsert: true } } + end + @db[collection].bulk_write(bulk_operations) + else + @db[collection].insert_many(values) + end @documents.delete(collection) end end @@ -94,6 +107,17 @@ def receive(event) document["_id"] = BSON::ObjectId.new end + if @idField + field_name = event.sprintf(@idField) + if event.include?(field_name) && !event.get(field_name).nil? + document["_id"] = event.get(field_name) + else + @logger.warn("Cannot set MongoDB document `_id` field because it does not exist in the event", :event => event) + document["_id"] = BSON::ObjectId.new + end + end + + if @bulk collection = event.sprintf(@collection) @@mutex.synchronize do @@ -103,12 +127,23 @@ def receive(event) @documents[collection].push(document) if(@documents[collection].length >= @bulk_size) - @db[collection].insert_many(@documents[collection]) + if @upsert && document.key?("_id") + bulk_operations = @documents[collection].map do |doc| + { replace_one: { filter: { _id: doc["_id"] }, replacement: doc, upsert: true } } + end + @db[collection].bulk_write(bulk_operations) + else + @db[collection].insert_many(@documents[collection]) + end @documents.delete(collection) end end else - @db[event.sprintf(@collection)].insert_one(document) + if @upsert && document.key?("_id") + @db[event.sprintf(@collection)].replace_one({ _id: document["_id"] }, document, { upsert: true }) + else + @db[event.sprintf(@collection)].insert_one(document) + end end rescue => e if e.message =~ /^E11000/ diff --git a/logstash-output-mongodb.gemspec b/logstash-output-mongodb.gemspec index b2cd699..0f1f3af 100644 --- a/logstash-output-mongodb.gemspec +++ b/logstash-output-mongodb.gemspec @@ -24,5 +24,4 @@ Gem::Specification.new do |s| s.add_runtime_dependency 'mongo', '~> 2.6' s.add_development_dependency 'logstash-devutils' -end - +end \ No newline at end of file diff --git a/spec/bson/big_decimal_spec.rb b/spec/bson/big_decimal_spec.rb index b515e9d..3abfbd4 100644 --- a/spec/bson/big_decimal_spec.rb +++ b/spec/bson/big_decimal_spec.rb @@ -23,7 +23,7 @@ describe "class methods" do it "builds a new BigDecimal from BSON" do decoded = described_class.from_bson(4321.1234.to_bson) - expect(decoded).to eql(BigDecimal.new(a_number)) + expect(decoded).to eql(BigDecimal(a_number)) end end end diff --git a/spec/integration/mongodb_spec.rb b/spec/integration/mongodb_spec.rb index 690e8fa..bdaf1a6 100644 --- a/spec/integration/mongodb_spec.rb +++ b/spec/integration/mongodb_spec.rb @@ -18,7 +18,7 @@ subject { LogStash::Outputs::Mongodb.new(config) } let(:properties) { { "message" => "This is a message!", - "uuid" => uuid, "number" => BigDecimal.new("4321.1234"), + "uuid" => uuid, "number" => BigDecimal("4321.1234"), "utf8" => "żółć", "int" => 42, "arry" => [42, "string", 4321.1234]} } let(:event) { LogStash::Event.new(properties) } diff --git a/spec/outputs/mongodb_spec.rb b/spec/outputs/mongodb_spec.rb index 8d3decb..3cecfd4 100644 --- a/spec/outputs/mongodb_spec.rb +++ b/spec/outputs/mongodb_spec.rb @@ -44,7 +44,7 @@ let(:properties) {{ "message" => "This is a message!", "uuid" => SecureRandom.uuid, - "number" => BigDecimal.new("4321.1234"), + "number" => BigDecimal("4321.1234"), "utf8" => "żółć" }}