Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dynamic Parameters #63

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 46 additions & 5 deletions lib/logstash/inputs/http_poller.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
require "socket" # for Socket.gethostname
require "manticore"
require "rufus/scheduler"
require "yaml" # persistence

class LogStash::Inputs::HTTP_Poller < LogStash::Inputs::Base
include LogStash::PluginMixins::HttpClient
Expand Down Expand Up @@ -57,10 +58,15 @@ def setup_requests!
@requests = Hash[@urls.map {|name, url| [name, normalize_request(url)] }]
end

private
def filter_dynamic_params(allowed_keys, params)
params.slice(*allowed_keys)
end

private
def normalize_request(url_or_spec)
if url_or_spec.is_a?(String)
res = [:get, url_or_spec]
res = [:get, url_or_spec, {}]
elsif url_or_spec.is_a?(Hash)
# The client will expect keys / values
spec = Hash[url_or_spec.clone.map {|k,v| [k.to_sym, v] }] # symbolize keys
Expand All @@ -77,17 +83,30 @@ def normalize_request(url_or_spec)
auth = spec[:auth]
user = spec.delete(:user) || (auth && auth["user"])
password = spec.delete(:password) || (auth && auth["password"])

if user.nil? ^ password.nil?
raise LogStash::ConfigurationError, "'user' and 'password' must both be specified for input HTTP poller!"
end

if user && password
spec[:auth] = {
user: user,
user: user,
pass: password,
eager: true
}
}
end

if spec.delete(:use_dynamic_params)
last_dynamic_params_location = spec[:last_dynamic_params]
dynamic_params_map = spec[:dynamic_params_map]

if last_dynamic_params_location.is_a?(String) && File.exist?(last_dynamic_params_location)
dynamic_params = YAML.load(File.read(last_dynamic_params_location))
allowed_keys = dynamic_params_map.is_a?(Hash) ? dynamic_params_map.keys : []
spec[:dynamic_params] = filter_dynamic_params(allowed_keys, dynamic_params)
else
spec[:dynamic_params] = {}
end
end
res = [method, url, spec]
else
Expand Down Expand Up @@ -133,13 +152,23 @@ def setup_schedule(queue)

@scheduler = Rufus::Scheduler.new(:max_work_threads => 1)
#as of v3.0.9, :first_in => :now doesn't work. Use the following workaround instead
opts = schedule_type == "every" ? { :first_in => 0.01 } : {}
opts = schedule_type == "every" ? { :first_in => 0.01 } : {}
@scheduler.send(schedule_type, schedule_value, opts) { run_once(queue) }
@scheduler.join
end

private
def assign_dynamic_params(request)
params = request[2][:dynamic_params]
request[2][:query] = {} if !request[2][:query]
params.keys.each do |key|
request[2][:query][key] = params[key]
end
end

def run_once(queue)
@requests.each do |name, request|
assign_dynamic_params(request) if request[2][:dynamic_params]
request_async(queue, name, request)
end

Expand Down Expand Up @@ -175,11 +204,23 @@ def handle_success(queue, name, request, response, execution_time)
end
end

private
def update_dynamic_params(request, event)
request[2][:dynamic_params_map].keys.each do |key|
value = request[2][:dynamic_params_map][key]
event_value = event.get(value)
request[2][:dynamic_params][key] = event_value if event_value
end
File.write(request[2][:last_dynamic_params], YAML.dump(request[2][:dynamic_params]))
end

private
def handle_decoded_event(queue, name, request, response, event, execution_time)
apply_metadata(event, name, request, response, execution_time)
decorate(event)
queue << event

update_dynamic_params(request, event) if request[2][:dynamic_params]
rescue StandardError, java.lang.Exception => e
@logger.error? && @logger.error("Error eventifying response!",
:exception => e,
Expand Down