Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Andsel/verify oom behavior of parser in isolation #477

Draft
wants to merge 16 commits into
base: main
Choose a base branch
from

Conversation

andsel
Copy link
Contributor

@andsel andsel commented Aug 25, 2023

Use log4j2.properties like

status = error

appender.console.type = Console
appender.console.name = plain_console
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = [%d{ISO8601}][%-5p][%-25c] %m%n


logger.netty.name = io.netty
logger.netty.level = debug
logger.beats_parser.name = org.logstash.beats.BeatsParser
logger.beats_parser.level = trace
logger.beats_server.name = org.logstash.beats.Server
logger.beats_server.level = warn

Use a ruby script to load, in plain TCP like:

# encoding: utf-8
# JRUBY_OPTS="-J-Xmx4g -J-Xms4g" ruby beats_writer_ssl.rb
require "socket"
require "thread"
require "zlib"
require "json"
require "openssl"

Thread.abort_on_exception = true
HOST="127.0.0.1"
PORT=3333
CLIENT_CERT="/Users/andrea/workspace/certificates/client_from_root.crt"
CLIENT_KEY="/Users/andrea/workspace/certificates/client_from_root.key.pkcs8"

module Lumberjack
  SEQUENCE_MAX = (2**32-1).freeze

  class Client
    def initialize
      @sequence = 0
      #@socket = connect
      @socket = TCPSocket.new(HOST, 3334)
    end

    private
    def connect
      socket = TCPSocket.new(HOST, PORT)
      ctx = OpenSSL::SSL::SSLContext.new
      ctx.cert = OpenSSL::X509::Certificate.new(File.read(CLIENT_CERT))
      ctx.key = OpenSSL::PKey::RSA.new(File.read(CLIENT_KEY))
      ctx.ssl_version = :TLSv1_2

      # Wrap the socket with SSL/TLS
      ssl_socket = OpenSSL::SSL::SSLSocket.new(socket, ctx)
      ssl_socket.sync_close = true
      ssl_socket.connect
      ssl_socket
    end

    public
    def write(elements, chunk_size=5000, sleep_seconds=0.01)
      elements = [elements] if elements.is_a?(Hash)
      send_window_size(elements.size)

      payload = elements.map { |element| JsonEncoder.to_frame(element, inc) }.join
      send_payload(payload, chunk_size, sleep_seconds)
    end

    private
    def inc
      @sequence = 0 if @sequence + 1 > Lumberjack::SEQUENCE_MAX
      @sequence = @sequence + 1
    end

    private
    def send_window_size(size)
      @socket.syswrite(["2", "W", size].pack("AAN"))
    end

    private
    def send_payload(payload, chunk_size, sleep_seconds)
      payload_size = payload.size
      written = 0
      while written < payload_size
        written += @socket.syswrite(payload[written..written+chunk_size])
        puts "written #{written}.."
        sleep sleep_seconds
      end
    end

    public
    def close
      @socket.close
    end
  end

  module JsonEncoder
    def self.to_frame(hash, sequence)
      json = hash.to_json
      json_length = json.bytesize
      pack = "AANNA#{json_length}"
      frame = ["2", "J", sequence, json_length, json]
      frame.pack(pack)
    end
  end

end

client_count = 1500
message = 'a'*8*16*1024
#require 'pry'
#binding.pry

puts "Connecting #{client_count} clients"
clients = client_count.times.map { Lumberjack::Client.new }
puts "Writing approximately #{(client_count*message.size)/1024.0/1024.0}Mib across #{client_count} clients"
threads = client_count.times.map do |i|
  Thread.new(i) do |i|
    client = clients[i]
    # keep message size above 16k, requiring two TLS records
    data = [ { "message" => message } ]
    50.times do
      client.write(data)
      sleep 1*rand
    end
    client.close
  end
end
threads.each(&:join)
puts "Done"
sleep 10

Run with:

# to create a single uber-jar
> ./gradlew jar 

# run the app
> java -Dio.netty.allocator.numHeapArenas=0 -XX:NativeMemoryTracking=summary -Dio.netty.allocator.numDirectArenas=1 -XX:MaxDirectMemorySize=128m -XX:-MaxFDLimit -Dlog4j.configurationFile=/path_to/log4j2.properties -jar build/libs/logstash-input-beats-6.6.0.jar

andsel and others added 15 commits July 5, 2023 15:48
…omes not writable, to excert backpressure to the sender system
On new channel registration (that correspond to a new client connection),
verifies the direct memory stastus to understand if almost the totality max direct memory
is reached and also if the majoproity of that space is used by pinned byte buffers.
If the codition is verified that means direct memory avvailable is terminating, so no new
connection would help in the situation, and the incoming new connections are closed.
…ble status due to that offload every message to the outbound list
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

1 participant