From f98ae45858a1feb07b5604e3433fdc08dc4ec7b5 Mon Sep 17 00:00:00 2001 From: "Hariprasad Selvaraj (hselvara)" Date: Thu, 13 Apr 2017 18:22:39 +0530 Subject: [PATCH 1/2] avoiding overwriting of tags with key named as 'tags' --- lib/logstash/codecs/fluent.rb | 24 ++++++++++++++---------- 1 file changed, 14 insertions(+), 10 deletions(-) diff --git a/lib/logstash/codecs/fluent.rb b/lib/logstash/codecs/fluent.rb index 92175f3..a628308 100644 --- a/lib/logstash/codecs/fluent.rb +++ b/lib/logstash/codecs/fluent.rb @@ -40,7 +40,8 @@ def decode(data, &block) end # def decode def encode(event) - tag = event.get("tags") || "log" + tag = event.get("tags") || "log" + epochtime = event.timestamp.to_i # use normalize to make sure returned Hash is pure Ruby for @@ -70,10 +71,11 @@ def decode_event(data, &block) entries_decoder.feed_each(entries) do |entry| epochtime = entry[0] map = entry[1] - event = LogStash::Event.new(map.merge( + arr= [] + event = LogStash::Event.new(map.merge!( LogStash::Event::TIMESTAMP => LogStash::Timestamp.at(epochtime), - "tags" => [ tag ] - )) + "tags" => tag + ){ |key, v1, v2| arr.insert(0,v2,v1) }) yield event end when Array @@ -81,20 +83,22 @@ def decode_event(data, &block) entries.each do |entry| epochtime = entry[0] map = entry[1] - event = LogStash::Event.new(map.merge( + arr= [] + event = LogStash::Event.new(map.merge!( LogStash::Event::TIMESTAMP => LogStash::Timestamp.at(epochtime), - "tags" => [ tag ] - )) + "tags" => tag + ){ |key, v1, v2| arr.insert(0,v2,v1) }) yield event end when Fixnum # Message epochtime = entries map = data[2] - event = LogStash::Event.new(map.merge( + arr = [] + event = LogStash::Event.new(map.merge!( LogStash::Event::TIMESTAMP => LogStash::Timestamp.at(epochtime), - "tags" => [ tag ] - )) + "tags" => tag + ){ |key, v1, v2| arr.insert(0,v2,v1) }) yield event else raise(LogStash::Error, "Unknown event type") From 55df207ce3f3a0ac5d3117a5cd8f6bc22d9916f3 Mon Sep 17 00:00:00 2001 From: hselvara Date: Fri, 30 Jun 2017 14:52:30 +0530 Subject: [PATCH 2/2] Indentation Fix --- .github/CONTRIBUTING.md | 130 +++++++++--------- .github/ISSUE_TEMPLATE.md | 18 +-- .github/PULL_REQUEST_TEMPLATE.md | 2 +- .gitignore | 6 +- .travis.yml | 20 +-- CHANGELOG.md | 32 ++--- CONTRIBUTORS | 30 ++--- Gemfile | 8 +- LICENSE | 26 ++-- NOTICE.TXT | 8 +- README.md | 194 +++++++++++++-------------- Rakefile | 14 +- lib/logstash/codecs/fluent.rb | 222 +++++++++++++++---------------- logstash-codec-fluent.gemspec | 68 +++++----- spec/codecs/fluent_spec.rb | 198 +++++++++++++-------------- spec/spec_helper.rb | 6 +- 16 files changed, 491 insertions(+), 491 deletions(-) diff --git a/.github/CONTRIBUTING.md b/.github/CONTRIBUTING.md index d325328..8b0d123 100644 --- a/.github/CONTRIBUTING.md +++ b/.github/CONTRIBUTING.md @@ -1,65 +1,65 @@ -# Contributing to Logstash - -All contributions are welcome: ideas, patches, documentation, bug reports, -complaints, etc! - -Programming is not a required skill, and there are many ways to help out! -It is more important to us that you are able to contribute. - -That said, some basic guidelines, which you are free to ignore :) - -## Want to learn? - -Want to lurk about and see what others are doing with Logstash? - -* The irc channel (#logstash on irc.freenode.org) is a good place for this -* The [forum](https://discuss.elastic.co/c/logstash) is also - great for learning from others. - -## Got Questions? - -Have a problem you want Logstash to solve for you? - -* You can ask a question in the [forum](https://discuss.elastic.co/c/logstash) -* Alternately, you are welcome to join the IRC channel #logstash on -irc.freenode.org and ask for help there! - -## Have an Idea or Feature Request? - -* File a ticket on [GitHub](https://github.com/elastic/logstash/issues). Please remember that GitHub is used only for issues and feature requests. If you have a general question, the [forum](https://discuss.elastic.co/c/logstash) or IRC would be the best place to ask. - -## Something Not Working? Found a Bug? - -If you think you found a bug, it probably is a bug. - -* If it is a general Logstash or a pipeline issue, file it in [Logstash GitHub](https://github.com/elasticsearch/logstash/issues) -* If it is specific to a plugin, please file it in the respective repository under [logstash-plugins](https://github.com/logstash-plugins) -* or ask the [forum](https://discuss.elastic.co/c/logstash). - -# Contributing Documentation and Code Changes - -If you have a bugfix or new feature that you would like to contribute to -logstash, and you think it will take more than a few minutes to produce the fix -(ie; write code), it is worth discussing the change with the Logstash users and developers first! You can reach us via [GitHub](https://github.com/elastic/logstash/issues), the [forum](https://discuss.elastic.co/c/logstash), or via IRC (#logstash on freenode irc) -Please note that Pull Requests without tests will not be merged. If you would like to contribute but do not have experience with writing tests, please ping us on IRC/forum or create a PR and ask our help. - -## Contributing to plugins - -Check our [documentation](https://www.elastic.co/guide/en/logstash/current/contributing-to-logstash.html) on how to contribute to plugins or write your own! It is super easy! - -## Contribution Steps - -1. Test your changes! [Run](https://github.com/elastic/logstash#testing) the test suite -2. Please make sure you have signed our [Contributor License - Agreement](https://www.elastic.co/contributor-agreement/). We are not - asking you to assign copyright to us, but to give us the right to distribute - your code without restriction. We ask this of all contributors in order to - assure our users of the origin and continuing existence of the code. You - only need to sign the CLA once. -3. Send a pull request! Push your changes to your fork of the repository and - [submit a pull - request](https://help.github.com/articles/using-pull-requests). In the pull - request, describe what your changes do and mention any bugs/issues related - to the pull request. - - +# Contributing to Logstash + +All contributions are welcome: ideas, patches, documentation, bug reports, +complaints, etc! + +Programming is not a required skill, and there are many ways to help out! +It is more important to us that you are able to contribute. + +That said, some basic guidelines, which you are free to ignore :) + +## Want to learn? + +Want to lurk about and see what others are doing with Logstash? + +* The irc channel (#logstash on irc.freenode.org) is a good place for this +* The [forum](https://discuss.elastic.co/c/logstash) is also + great for learning from others. + +## Got Questions? + +Have a problem you want Logstash to solve for you? + +* You can ask a question in the [forum](https://discuss.elastic.co/c/logstash) +* Alternately, you are welcome to join the IRC channel #logstash on +irc.freenode.org and ask for help there! + +## Have an Idea or Feature Request? + +* File a ticket on [GitHub](https://github.com/elastic/logstash/issues). Please remember that GitHub is used only for issues and feature requests. If you have a general question, the [forum](https://discuss.elastic.co/c/logstash) or IRC would be the best place to ask. + +## Something Not Working? Found a Bug? + +If you think you found a bug, it probably is a bug. + +* If it is a general Logstash or a pipeline issue, file it in [Logstash GitHub](https://github.com/elasticsearch/logstash/issues) +* If it is specific to a plugin, please file it in the respective repository under [logstash-plugins](https://github.com/logstash-plugins) +* or ask the [forum](https://discuss.elastic.co/c/logstash). + +# Contributing Documentation and Code Changes + +If you have a bugfix or new feature that you would like to contribute to +logstash, and you think it will take more than a few minutes to produce the fix +(ie; write code), it is worth discussing the change with the Logstash users and developers first! You can reach us via [GitHub](https://github.com/elastic/logstash/issues), the [forum](https://discuss.elastic.co/c/logstash), or via IRC (#logstash on freenode irc) +Please note that Pull Requests without tests will not be merged. If you would like to contribute but do not have experience with writing tests, please ping us on IRC/forum or create a PR and ask our help. + +## Contributing to plugins + +Check our [documentation](https://www.elastic.co/guide/en/logstash/current/contributing-to-logstash.html) on how to contribute to plugins or write your own! It is super easy! + +## Contribution Steps + +1. Test your changes! [Run](https://github.com/elastic/logstash#testing) the test suite +2. Please make sure you have signed our [Contributor License + Agreement](https://www.elastic.co/contributor-agreement/). We are not + asking you to assign copyright to us, but to give us the right to distribute + your code without restriction. We ask this of all contributors in order to + assure our users of the origin and continuing existence of the code. You + only need to sign the CLA once. +3. Send a pull request! Push your changes to your fork of the repository and + [submit a pull + request](https://help.github.com/articles/using-pull-requests). In the pull + request, describe what your changes do and mention any bugs/issues related + to the pull request. + + diff --git a/.github/ISSUE_TEMPLATE.md b/.github/ISSUE_TEMPLATE.md index c3cc91d..44dfeaa 100644 --- a/.github/ISSUE_TEMPLATE.md +++ b/.github/ISSUE_TEMPLATE.md @@ -1,9 +1,9 @@ -Please post all product and debugging questions on our [forum](https://discuss.elastic.co/c/logstash). Your questions will reach our wider community members there, and if we confirm that there is a bug, then we can open a new issue here. - -For all general issues, please provide the following details for fast resolution: - -- Version: -- Operating System: -- Config File (if you have sensitive info, please remove it): -- Sample Data: -- Steps to Reproduce: +Please post all product and debugging questions on our [forum](https://discuss.elastic.co/c/logstash). Your questions will reach our wider community members there, and if we confirm that there is a bug, then we can open a new issue here. + +For all general issues, please provide the following details for fast resolution: + +- Version: +- Operating System: +- Config File (if you have sensitive info, please remove it): +- Sample Data: +- Steps to Reproduce: diff --git a/.github/PULL_REQUEST_TEMPLATE.md b/.github/PULL_REQUEST_TEMPLATE.md index a153827..0a422e9 100644 --- a/.github/PULL_REQUEST_TEMPLATE.md +++ b/.github/PULL_REQUEST_TEMPLATE.md @@ -1 +1 @@ -Thanks for contributing to Logstash! If you haven't already signed our CLA, here's a handy link: https://www.elastic.co/contributor-agreement/ +Thanks for contributing to Logstash! If you haven't already signed our CLA, here's a handy link: https://www.elastic.co/contributor-agreement/ diff --git a/.gitignore b/.gitignore index 0c14ba2..c39ed46 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,3 @@ -*.gem -Gemfile.lock -.bundle +*.gem +Gemfile.lock +.bundle diff --git a/.travis.yml b/.travis.yml index 5c63af6..40cca81 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,10 +1,10 @@ ---- -sudo: false -language: ruby -cache: bundler -rvm: -- jruby-1.7.25 -script: -- bundle exec rspec spec -jdk: oraclejdk8 -before_install: [] +--- +sudo: false +language: ruby +cache: bundler +rvm: +- jruby-1.7.25 +script: +- bundle exec rspec spec +jdk: oraclejdk8 +before_install: [] diff --git a/CHANGELOG.md b/CHANGELOG.md index e9f2479..28c5765 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,16 +1,16 @@ -## 3.0.2 - - Relax constraint on logstash-core-plugin-api to >= 1.60 <= 2.99 - -## 3.0.1 - - Republish all the gems under jruby. -## 3.0.0 - - Update the plugin to the version 2.0 of the plugin api, this change is required for Logstash 5.0 compatibility. See https://github.com/elastic/logstash/issues/5141 -# 2.0.4 - - Depend on logstash-core-plugin-api instead of logstash-core, removing the need to mass update plugins on major releases of logstash -# 2.0.3 - - New dependency requirements for logstash-core for the 5.0 release -## 2.0.0 - - Plugins were updated to follow the new shutdown semantic, this mainly allows Logstash to instruct input plugins to terminate gracefully, - instead of using Thread.raise on the plugins' threads. Ref: https://github.com/elastic/logstash/pull/3895 - - Dependency on logstash-core update to 2.0 - +## 3.0.2 + - Relax constraint on logstash-core-plugin-api to >= 1.60 <= 2.99 + +## 3.0.1 + - Republish all the gems under jruby. +## 3.0.0 + - Update the plugin to the version 2.0 of the plugin api, this change is required for Logstash 5.0 compatibility. See https://github.com/elastic/logstash/issues/5141 +# 2.0.4 + - Depend on logstash-core-plugin-api instead of logstash-core, removing the need to mass update plugins on major releases of logstash +# 2.0.3 + - New dependency requirements for logstash-core for the 5.0 release +## 2.0.0 + - Plugins were updated to follow the new shutdown semantic, this mainly allows Logstash to instruct input plugins to terminate gracefully, + instead of using Thread.raise on the plugins' threads. Ref: https://github.com/elastic/logstash/pull/3895 + - Dependency on logstash-core update to 2.0 + diff --git a/CONTRIBUTORS b/CONTRIBUTORS index eba094a..f7e6399 100644 --- a/CONTRIBUTORS +++ b/CONTRIBUTORS @@ -1,15 +1,15 @@ -The following is a list of people who have contributed ideas, code, bug -reports, or in general have helped logstash along its way. - -Contributors: -* Colin Surprenant (colinsurprenant) -* Jordan Sissel (jordansissel) -* João Duarte (jsvd) -* Pier-Hugues Pellerin (ph) -* Richard Pijnenburg (electrical) -* Suyog Rao (suyograo) - -Note: If you've sent us patches, bug reports, or otherwise contributed to -Logstash, and you aren't on the list above and want to be, please let us know -and we'll make sure you're here. Contributions from folks like you are what make -open source awesome. +The following is a list of people who have contributed ideas, code, bug +reports, or in general have helped logstash along its way. + +Contributors: +* Colin Surprenant (colinsurprenant) +* Jordan Sissel (jordansissel) +* João Duarte (jsvd) +* Pier-Hugues Pellerin (ph) +* Richard Pijnenburg (electrical) +* Suyog Rao (suyograo) + +Note: If you've sent us patches, bug reports, or otherwise contributed to +Logstash, and you aren't on the list above and want to be, please let us know +and we'll make sure you're here. Contributions from folks like you are what make +open source awesome. diff --git a/Gemfile b/Gemfile index 2b03d18..a72643f 100644 --- a/Gemfile +++ b/Gemfile @@ -1,4 +1,4 @@ -source 'https://rubygems.org' - -# Specify your gem's dependencies in logstash-mass_effect.gemspec -gemspec +source 'https://rubygems.org' + +# Specify your gem's dependencies in logstash-mass_effect.gemspec +gemspec diff --git a/LICENSE b/LICENSE index 43976b7..47250f6 100644 --- a/LICENSE +++ b/LICENSE @@ -1,13 +1,13 @@ -Copyright (c) 2012–2016 Elasticsearch - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. +Copyright (c) 2012–2016 Elasticsearch + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. diff --git a/NOTICE.TXT b/NOTICE.TXT index 0b8a947..70efcf4 100644 --- a/NOTICE.TXT +++ b/NOTICE.TXT @@ -1,5 +1,5 @@ -Elasticsearch -Copyright 2012-2015 Elasticsearch - -This product includes software developed by The Apache Software +Elasticsearch +Copyright 2012-2015 Elasticsearch + +This product includes software developed by The Apache Software Foundation (http://www.apache.org/). \ No newline at end of file diff --git a/README.md b/README.md index a66c9b9..77182d3 100644 --- a/README.md +++ b/README.md @@ -1,98 +1,98 @@ -# Logstash Plugin - -[![Travis Build Status](https://travis-ci.org/logstash-plugins/logstash-codec-fluent.svg)](https://travis-ci.org/logstash-plugins/logstash-codec-fluent) - -This is a plugin for [Logstash](https://github.com/elastic/logstash). - -It is fully free and fully open source. The license is Apache 2.0, meaning you are pretty much free to use it however you want in whatever way. - -## Documentation - -Logstash provides infrastructure to automatically generate documentation for this plugin. We use the asciidoc format to write documentation so any comments in the source code will be first converted into asciidoc and then into html. All plugin documentation are placed under one [central location](http://www.elastic.co/guide/en/logstash/current/). - -- For formatting code or config example, you can use the asciidoc `[source,ruby]` directive -- For more asciidoc formatting tips, see the excellent reference here https://github.com/elastic/docs#asciidoc-guide - -## Need Help? - -Need help? Try #logstash on freenode IRC or the https://discuss.elastic.co/c/logstash discussion forum. - -## Developing - -### 1. Plugin Developement and Testing - -#### Code -- To get started, you'll need JRuby with the Bundler gem installed. - -- Create a new plugin or clone and existing from the GitHub [logstash-plugins](https://github.com/logstash-plugins) organization. We also provide [example plugins](https://github.com/logstash-plugins?query=example). - -- Install dependencies -```sh -bundle install -``` - -#### Test - -- Update your dependencies - -```sh -bundle install -``` - -- Run tests - -```sh -bundle exec rspec -``` - -### 2. Running your unpublished Plugin in Logstash - -#### 2.1 Run in a local Logstash clone - -- Edit Logstash `Gemfile` and add the local plugin path, for example: -```ruby -gem "logstash-filter-awesome", :path => "/your/local/logstash-filter-awesome" -``` -- Install plugin -```sh -# Logstash 2.3 and higher -bin/logstash-plugin install --no-verify - -# Prior to Logstash 2.3 -bin/plugin install --no-verify - -``` -- Run Logstash with your plugin -```sh -bin/logstash -e 'filter {awesome {}}' -``` -At this point any modifications to the plugin code will be applied to this local Logstash setup. After modifying the plugin, simply rerun Logstash. - -#### 2.2 Run in an installed Logstash - -You can use the same **2.1** method to run your plugin in an installed Logstash by editing its `Gemfile` and pointing the `:path` to your local plugin development directory or you can build the gem and install it using: - -- Build your plugin gem -```sh -gem build logstash-filter-awesome.gemspec -``` -- Install the plugin from the Logstash home -```sh -# Logstash 2.3 and higher -bin/logstash-plugin install --no-verify - -# Prior to Logstash 2.3 -bin/plugin install --no-verify - -``` -- Start Logstash and proceed to test the plugin - -## Contributing - -All contributions are welcome: ideas, patches, documentation, bug reports, complaints, and even something you drew up on a napkin. - -Programming is not a required skill. Whatever you've seen about open source and maintainers or community members saying "send patches or die" - you will not see that here. - -It is more important to the community that you are able to contribute. - +# Logstash Plugin + +[![Travis Build Status](https://travis-ci.org/logstash-plugins/logstash-codec-fluent.svg)](https://travis-ci.org/logstash-plugins/logstash-codec-fluent) + +This is a plugin for [Logstash](https://github.com/elastic/logstash). + +It is fully free and fully open source. The license is Apache 2.0, meaning you are pretty much free to use it however you want in whatever way. + +## Documentation + +Logstash provides infrastructure to automatically generate documentation for this plugin. We use the asciidoc format to write documentation so any comments in the source code will be first converted into asciidoc and then into html. All plugin documentation are placed under one [central location](http://www.elastic.co/guide/en/logstash/current/). + +- For formatting code or config example, you can use the asciidoc `[source,ruby]` directive +- For more asciidoc formatting tips, see the excellent reference here https://github.com/elastic/docs#asciidoc-guide + +## Need Help? + +Need help? Try #logstash on freenode IRC or the https://discuss.elastic.co/c/logstash discussion forum. + +## Developing + +### 1. Plugin Developement and Testing + +#### Code +- To get started, you'll need JRuby with the Bundler gem installed. + +- Create a new plugin or clone and existing from the GitHub [logstash-plugins](https://github.com/logstash-plugins) organization. We also provide [example plugins](https://github.com/logstash-plugins?query=example). + +- Install dependencies +```sh +bundle install +``` + +#### Test + +- Update your dependencies + +```sh +bundle install +``` + +- Run tests + +```sh +bundle exec rspec +``` + +### 2. Running your unpublished Plugin in Logstash + +#### 2.1 Run in a local Logstash clone + +- Edit Logstash `Gemfile` and add the local plugin path, for example: +```ruby +gem "logstash-filter-awesome", :path => "/your/local/logstash-filter-awesome" +``` +- Install plugin +```sh +# Logstash 2.3 and higher +bin/logstash-plugin install --no-verify + +# Prior to Logstash 2.3 +bin/plugin install --no-verify + +``` +- Run Logstash with your plugin +```sh +bin/logstash -e 'filter {awesome {}}' +``` +At this point any modifications to the plugin code will be applied to this local Logstash setup. After modifying the plugin, simply rerun Logstash. + +#### 2.2 Run in an installed Logstash + +You can use the same **2.1** method to run your plugin in an installed Logstash by editing its `Gemfile` and pointing the `:path` to your local plugin development directory or you can build the gem and install it using: + +- Build your plugin gem +```sh +gem build logstash-filter-awesome.gemspec +``` +- Install the plugin from the Logstash home +```sh +# Logstash 2.3 and higher +bin/logstash-plugin install --no-verify + +# Prior to Logstash 2.3 +bin/plugin install --no-verify + +``` +- Start Logstash and proceed to test the plugin + +## Contributing + +All contributions are welcome: ideas, patches, documentation, bug reports, complaints, and even something you drew up on a napkin. + +Programming is not a required skill. Whatever you've seen about open source and maintainers or community members saying "send patches or die" - you will not see that here. + +It is more important to the community that you are able to contribute. + For more information about contributing, see the [CONTRIBUTING](https://github.com/elastic/logstash/blob/master/CONTRIBUTING.md) file. \ No newline at end of file diff --git a/Rakefile b/Rakefile index 4f4b858..a6c441f 100644 --- a/Rakefile +++ b/Rakefile @@ -1,7 +1,7 @@ -@files=[] - -task :default do - system("rake -T") -end - -require "logstash/devutils/rake" +@files=[] + +task :default do + system("rake -T") +end + +require "logstash/devutils/rake" diff --git a/lib/logstash/codecs/fluent.rb b/lib/logstash/codecs/fluent.rb index a628308..69930dc 100644 --- a/lib/logstash/codecs/fluent.rb +++ b/lib/logstash/codecs/fluent.rb @@ -1,111 +1,111 @@ -# encoding: utf-8 -require "logstash/codecs/base" -require "logstash/util/charset" -require "logstash/timestamp" -require "logstash/util" - -# This codec handles fluentd's msgpack schema. -# -# For example, you can receive logs from `fluent-logger-ruby` with: -# [source,ruby] -# input { -# tcp { -# codec => fluent -# port => 4000 -# } -# } -# -# And from your ruby code in your own application: -# [source,ruby] -# logger = Fluent::Logger::FluentLogger.new(nil, :host => "example.log", :port => 4000) -# logger.post("some_tag", { "your" => "data", "here" => "yay!" }) -# -# Notes: -# -# * the fluent uses a second-precision time for events, so you will never see -# subsecond precision on events processed by this codec. -# -class LogStash::Codecs::Fluent < LogStash::Codecs::Base - config_name "fluent" - - def register - require "msgpack" - @decoder = MessagePack::Unpacker.new - end - - def decode(data, &block) - @decoder.feed_each(data) do |item| - decode_event(item, &block) - end - end # def decode - - def encode(event) - tag = event.get("tags") || "log" - - epochtime = event.timestamp.to_i - - # use normalize to make sure returned Hash is pure Ruby for - # MessagePack#pack which relies on pure Ruby object recognition - data = LogStash::Util.normalize(event.to_hash) - # timestamp is serialized as a iso8601 string - # merge to avoid modifying data which could have side effects if multiple outputs - @on_event.call(event, MessagePack.pack([tag, epochtime, data.merge(LogStash::Event::TIMESTAMP => event.timestamp.to_iso8601)])) - end # def encode - - private - - def decode_event(data, &block) - tag = data[0] - entries = data[1] - - case entries - when String - # PackedForward - option = data[2] - compressed = (option && option['compressed'] == 'gzip') - if compressed - raise(LogStash::Error, "PackedForward with compression is not supported") - end - - entries_decoder = MessagePack::Unpacker.new - entries_decoder.feed_each(entries) do |entry| - epochtime = entry[0] - map = entry[1] - arr= [] - event = LogStash::Event.new(map.merge!( - LogStash::Event::TIMESTAMP => LogStash::Timestamp.at(epochtime), - "tags" => tag - ){ |key, v1, v2| arr.insert(0,v2,v1) }) - yield event - end - when Array - # Forward - entries.each do |entry| - epochtime = entry[0] - map = entry[1] - arr= [] - event = LogStash::Event.new(map.merge!( - LogStash::Event::TIMESTAMP => LogStash::Timestamp.at(epochtime), - "tags" => tag - ){ |key, v1, v2| arr.insert(0,v2,v1) }) - yield event - end - when Fixnum - # Message - epochtime = entries - map = data[2] - arr = [] - event = LogStash::Event.new(map.merge!( - LogStash::Event::TIMESTAMP => LogStash::Timestamp.at(epochtime), - "tags" => tag - ){ |key, v1, v2| arr.insert(0,v2,v1) }) - yield event - else - raise(LogStash::Error, "Unknown event type") - end - rescue StandardError => e - @logger.error("Fluent parse error, original data now in message field", :error => e, :data => data) - yield LogStash::Event.new("message" => data, "tags" => [ "_fluentparsefailure" ]) - end - -end # class LogStash::Codecs::Fluent +# encoding: utf-8 +require "logstash/codecs/base" +require "logstash/util/charset" +require "logstash/timestamp" +require "logstash/util" + +# This codec handles fluentd's msgpack schema. +# +# For example, you can receive logs from `fluent-logger-ruby` with: +# [source,ruby] +# input { +# tcp { +# codec => fluent +# port => 4000 +# } +# } +# +# And from your ruby code in your own application: +# [source,ruby] +# logger = Fluent::Logger::FluentLogger.new(nil, :host => "example.log", :port => 4000) +# logger.post("some_tag", { "your" => "data", "here" => "yay!" }) +# +# Notes: +# +# * the fluent uses a second-precision time for events, so you will never see +# subsecond precision on events processed by this codec. +# +class LogStash::Codecs::Fluent < LogStash::Codecs::Base + config_name "fluent" + + def register + require "msgpack" + @decoder = MessagePack::Unpacker.new + end + + def decode(data, &block) + @decoder.feed_each(data) do |item| + decode_event(item, &block) + end + end # def decode + + def encode(event) + tag = event.get("tags") || "log" + + epochtime = event.timestamp.to_i + + # use normalize to make sure returned Hash is pure Ruby for + # MessagePack#pack which relies on pure Ruby object recognition + data = LogStash::Util.normalize(event.to_hash) + # timestamp is serialized as a iso8601 string + # merge to avoid modifying data which could have side effects if multiple outputs + @on_event.call(event, MessagePack.pack([tag, epochtime, data.merge(LogStash::Event::TIMESTAMP => event.timestamp.to_iso8601)])) + end # def encode + + private + + def decode_event(data, &block) + tag = data[0] + entries = data[1] + + case entries + when String + # PackedForward + option = data[2] + compressed = (option && option['compressed'] == 'gzip') + if compressed + raise(LogStash::Error, "PackedForward with compression is not supported") + end + + entries_decoder = MessagePack::Unpacker.new + entries_decoder.feed_each(entries) do |entry| + epochtime = entry[0] + map = entry[1] + arr = [] + event = LogStash::Event.new(map.merge!( + LogStash::Event::TIMESTAMP => LogStash::Timestamp.at(epochtime), + "tags" => tag + ){ |key, v1, v2| arr.insert(0,v2,v1) }) + yield event + end + when Array + # Forward + entries.each do |entry| + epochtime = entry[0] + map = entry[1] + arr = [] + event = LogStash::Event.new(map.merge!( + LogStash::Event::TIMESTAMP => LogStash::Timestamp.at(epochtime), + "tags" => tag + ){ |key, v1, v2| arr.insert(0,v2,v1) }) + yield event + end + when Fixnum + # Message + epochtime = entries + map = data[2] + arr = [] + event = LogStash::Event.new(map.merge!( + LogStash::Event::TIMESTAMP => LogStash::Timestamp.at(epochtime), + "tags" => tag + ){ |key, v1, v2| arr.insert(0,v2,v1) }) + yield event + else + raise(LogStash::Error, "Unknown event type") + end + rescue StandardError => e + @logger.error("Fluent parse error, original data now in message field", :error => e, :data => data) + yield LogStash::Event.new("message" => data, "tags" => [ "_fluentparsefailure" ]) + end + +end # class LogStash::Codecs::Fluent diff --git a/logstash-codec-fluent.gemspec b/logstash-codec-fluent.gemspec index 2d682df..295a383 100644 --- a/logstash-codec-fluent.gemspec +++ b/logstash-codec-fluent.gemspec @@ -1,34 +1,34 @@ -Gem::Specification.new do |s| - - s.name = 'logstash-codec-fluent' - s.version = '3.1.1' - s.licenses = ['Apache License (2.0)'] - s.summary = "This codec handles fluentd's msgpack schema." - s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program" - s.authors = ["Elastic"] - s.email = 'info@elastic.co' - s.homepage = "http://www.elastic.co/guide/en/logstash/current/index.html" - s.require_paths = ["lib"] - - # Files - s.files = Dir['lib/**/*','spec/**/*','vendor/**/*','*.gemspec','*.md','CONTRIBUTORS','Gemfile','LICENSE','NOTICE.TXT'] - - # Tests - s.test_files = s.files.grep(%r{^(test|spec|features)/}) - - # Special flag to let us know this is actually a logstash plugin - s.metadata = { "logstash_plugin" => "true", "logstash_group" => "codec" } - - # Gem dependencies - s.add_runtime_dependency "logstash-core-plugin-api", ">= 1.60", "<= 2.99" - - if RUBY_PLATFORM == 'java' - s.platform = RUBY_PLATFORM - s.add_runtime_dependency 'msgpack-jruby' - else - s.add_runtime_dependency 'msgpack' - end - - s.add_development_dependency 'logstash-devutils', ">= 1.0.0" -end - +Gem::Specification.new do |s| + + s.name = 'logstash-codec-fluent' + s.version = '3.1.1' + s.licenses = ['Apache License (2.0)'] + s.summary = "This codec handles fluentd's msgpack schema." + s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program" + s.authors = ["Elastic"] + s.email = 'info@elastic.co' + s.homepage = "http://www.elastic.co/guide/en/logstash/current/index.html" + s.require_paths = ["lib"] + + # Files + s.files = Dir['lib/**/*','spec/**/*','vendor/**/*','*.gemspec','*.md','CONTRIBUTORS','Gemfile','LICENSE','NOTICE.TXT'] + + # Tests + s.test_files = s.files.grep(%r{^(test|spec|features)/}) + + # Special flag to let us know this is actually a logstash plugin + s.metadata = { "logstash_plugin" => "true", "logstash_group" => "codec" } + + # Gem dependencies + s.add_runtime_dependency "logstash-core-plugin-api", ">= 1.60", "<= 2.99" + + if RUBY_PLATFORM == 'java' + s.platform = RUBY_PLATFORM + s.add_runtime_dependency 'msgpack-jruby' + else + s.add_runtime_dependency 'msgpack' + end + + s.add_development_dependency 'logstash-devutils', ">= 1.0.0" +end + diff --git a/spec/codecs/fluent_spec.rb b/spec/codecs/fluent_spec.rb index c85cd1f..e0fddfe 100644 --- a/spec/codecs/fluent_spec.rb +++ b/spec/codecs/fluent_spec.rb @@ -1,99 +1,99 @@ -# encoding: utf-8 -require_relative "../spec_helper" -require "logstash/plugin" -require "logstash/event" - -describe LogStash::Codecs::Fluent do - - let(:properties) { {:name => "foo" } } - let(:event) { LogStash::Event.new(properties) } - - it "should register without errors" do - plugin = LogStash::Plugin.lookup("codec", "fluent").new - expect { plugin.register }.to_not raise_error - end - - describe "event encoding" do - - it "should encode as message pack format" do - subject.on_event do |event, data| - fields = MessagePack.unpack(data) - expect(fields[0]).to eq("log") - expect(fields[2]["name"]).to eq("foo") - end - subject.encode(event) - end - - end - - describe "event decoding" do - - let(:tag) { "mytag" } - let(:epochtime) { event.timestamp.to_i } - let(:data) { LogStash::Util.normalize(event.to_hash) } - let(:message) do - MessagePack.pack([tag, epochtime, data.merge(LogStash::Event::TIMESTAMP => event.timestamp.to_iso8601)]) - end - - it "should decode without errors" do - subject.decode(message) do |event| - expect(event.get("name")).to eq("foo") - end - end - - end - - describe "event decoding (buckets of events)" do - - let(:tag) { "mytag" } - let(:epochtime) { event.timestamp.to_i } - let(:data) { LogStash::Util.normalize(event.to_hash) } - let(:message) do - MessagePack.pack([tag, - [ - [epochtime, data.merge(LogStash::Event::TIMESTAMP => event.timestamp.to_iso8601)], - [epochtime, data.merge(LogStash::Event::TIMESTAMP => event.timestamp.to_iso8601)], - [epochtime, data.merge(LogStash::Event::TIMESTAMP => event.timestamp.to_iso8601)] - ] - ]) - end - - it "should decode without errors" do - count = 0 - - subject.decode(message) do |event| - expect(event.get("name")).to eq("foo") - count += 1 - end - - expect(count).to eq(3) - end - - end - - describe "event decoding (broken package)" do - - let(:tag) { "mytag" } - let(:epochtime) { event.timestamp.to_s } - let(:data) { LogStash::Util.normalize(event.to_hash) } - let(:message) do - MessagePack.pack([tag, - epochtime, data.merge(LogStash::Event::TIMESTAMP => event.timestamp.to_iso8601) - ]) - end - - it "should decode with errors" do - subject.decode(message) do |event| - expect(event.get("name")).not_to eq("foo") - end - end - - it "should inject a failure event" do - subject.decode(message) do |event| - expect(event.get("tags")).to include("_fluentparsefailure") - end - end - - end - -end +# encoding: utf-8 +require_relative "../spec_helper" +require "logstash/plugin" +require "logstash/event" + +describe LogStash::Codecs::Fluent do + + let(:properties) { {:name => "foo" } } + let(:event) { LogStash::Event.new(properties) } + + it "should register without errors" do + plugin = LogStash::Plugin.lookup("codec", "fluent").new + expect { plugin.register }.to_not raise_error + end + + describe "event encoding" do + + it "should encode as message pack format" do + subject.on_event do |event, data| + fields = MessagePack.unpack(data) + expect(fields[0]).to eq("log") + expect(fields[2]["name"]).to eq("foo") + end + subject.encode(event) + end + + end + + describe "event decoding" do + + let(:tag) { "mytag" } + let(:epochtime) { event.timestamp.to_i } + let(:data) { LogStash::Util.normalize(event.to_hash) } + let(:message) do + MessagePack.pack([tag, epochtime, data.merge(LogStash::Event::TIMESTAMP => event.timestamp.to_iso8601)]) + end + + it "should decode without errors" do + subject.decode(message) do |event| + expect(event.get("name")).to eq("foo") + end + end + + end + + describe "event decoding (buckets of events)" do + + let(:tag) { "mytag" } + let(:epochtime) { event.timestamp.to_i } + let(:data) { LogStash::Util.normalize(event.to_hash) } + let(:message) do + MessagePack.pack([tag, + [ + [epochtime, data.merge(LogStash::Event::TIMESTAMP => event.timestamp.to_iso8601)], + [epochtime, data.merge(LogStash::Event::TIMESTAMP => event.timestamp.to_iso8601)], + [epochtime, data.merge(LogStash::Event::TIMESTAMP => event.timestamp.to_iso8601)] + ] + ]) + end + + it "should decode without errors" do + count = 0 + + subject.decode(message) do |event| + expect(event.get("name")).to eq("foo") + count += 1 + end + + expect(count).to eq(3) + end + + end + + describe "event decoding (broken package)" do + + let(:tag) { "mytag" } + let(:epochtime) { event.timestamp.to_s } + let(:data) { LogStash::Util.normalize(event.to_hash) } + let(:message) do + MessagePack.pack([tag, + epochtime, data.merge(LogStash::Event::TIMESTAMP => event.timestamp.to_iso8601) + ]) + end + + it "should decode with errors" do + subject.decode(message) do |event| + expect(event.get("name")).not_to eq("foo") + end + end + + it "should inject a failure event" do + subject.decode(message) do |event| + expect(event.get("tags")).to include("_fluentparsefailure") + end + end + + end + +end diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb index 5d45c8f..37847ea 100644 --- a/spec/spec_helper.rb +++ b/spec/spec_helper.rb @@ -1,3 +1,3 @@ -# encoding: utf-8 -require "logstash/devutils/rspec/spec_helper" -require "logstash/codecs/fluent" +# encoding: utf-8 +require "logstash/devutils/rspec/spec_helper" +require "logstash/codecs/fluent"