Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix decoding multiple frames in a single envelope in native protocol v5 #368

Merged
merged 5 commits into from
Jul 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 12 additions & 15 deletions lib/xandra/cluster/control_connection.ex
Original file line number Diff line number Diff line change
Expand Up @@ -305,24 +305,21 @@ defmodule Xandra.Cluster.ControlConnection do
end
end

rest_fun = & &1

function =
decode_fun =
case state.protocol_module do
Xandra.Protocol.V5 -> :decode_v5
Xandra.Protocol.V4 -> :decode_v4
Xandra.Protocol.V3 -> :decode_v4
Xandra.Protocol.V5 -> &Xandra.Frame.decode_v5/4
Xandra.Protocol.V4 -> &Xandra.Frame.decode_v4/4
Xandra.Protocol.V3 -> &Xandra.Frame.decode_v4/4
end

case apply(Xandra.Frame, function, [
fetch_bytes_fun,
state.buffer,
_compressor = nil,
rest_fun
]) do
{:ok, frame, rest} ->
change_event = state.protocol_module.decode_response(frame)
state = handle_change_event(state, change_event)
case decode_fun.(fetch_bytes_fun, state.buffer, _compressor = nil, _rest_fun = & &1) do
{:ok, frames, rest} ->
state =
Enum.reduce(frames, state, fn frame, acc ->
change_event = state.protocol_module.decode_response(frame)
handle_change_event(acc, change_event)
end)

consume_new_data(%__MODULE__{state | buffer: rest})

{:error, _reason} ->
Expand Down
16 changes: 10 additions & 6 deletions lib/xandra/connection.ex
Original file line number Diff line number Diff line change
Expand Up @@ -762,10 +762,14 @@ defmodule Xandra.Connection do
data.compressor,
_rest_fun = & &1
) do
{:ok, frame, rest} ->
%__MODULE__{data | buffer: rest}
|> handle_frame(frame)
|> handle_new_bytes()
{:ok, frames, rest} ->
data = Enum.reduce(frames, %__MODULE__{data | buffer: rest}, &handle_frame/2)

if rest != "" do
handle_new_bytes(data)
else
{:keep_state, data}
end

{:error, :insufficient_data} ->
{:keep_state, data}
Expand All @@ -776,8 +780,8 @@ defmodule Xandra.Connection do
end

defp handle_frame(
%__MODULE__{timed_out_ids: timed_out_ids} = data,
%Frame{stream_id: stream_id} = frame
%Frame{stream_id: stream_id} = frame,
%__MODULE__{timed_out_ids: timed_out_ids} = data
) do
case pop_in(data.in_flight_requests[stream_id]) do
# There is no in-flight req for this response frame, BUT there is a request
Expand Down
12 changes: 9 additions & 3 deletions lib/xandra/connection/utils.ex
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,15 @@ defmodule Xandra.Connection.Utils do
do: {:ok, binary, fetch_state}
end

case protocol_format do
:v4_or_less -> Frame.decode_v4(fetch_bytes_fun, :no_fetch_state, compressor)
:v5_or_more -> Frame.decode_v5(fetch_bytes_fun, :no_fetch_state, compressor)
decode_fun =
case protocol_format do
:v4_or_less -> &Frame.decode_v4/3
:v5_or_more -> &Frame.decode_v5/3
end

case decode_fun.(fetch_bytes_fun, :no_fetch_state, compressor) do
{:ok, [frame], rest} -> {:ok, frame, rest}
{:error, reason} -> {:error, reason}
end
end

Expand Down
23 changes: 16 additions & 7 deletions lib/xandra/frame.ex
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,7 @@ defmodule Xandra.Frame do
(fetch_state, pos_integer() -> {:ok, binary(), fetch_state} | {:error, reason}),
fetch_state,
module() | nil
) :: {:ok, t(), binary()} | {:error, reason}
) :: {:ok, [t(), ...], binary()} | {:error, reason}
when fetch_state: term(), reason: term()
def decode(
protocol_module,
Expand All @@ -313,7 +313,7 @@ defmodule Xandra.Frame do
fetch_state,
module() | nil,
(fetch_state -> binary())
) :: {:ok, t(), binary()} | {:error, reason}
) :: {:ok, [t(), ...], binary()} | {:error, reason}
when fetch_state: term(), reason: term()
def decode_v4(fetch_bytes_fun, fetch_state, compressor, rest_fun \\ fn _ -> "" end)
when is_function(fetch_bytes_fun, 2) and is_atom(compressor) do
Expand All @@ -322,11 +322,11 @@ defmodule Xandra.Frame do
with {:ok, header, fetch_state} <- fetch_bytes_fun.(fetch_state, length) do
case body_length(header) do
0 ->
{:ok, decode(header), rest_fun.(fetch_state)}
{:ok, [decode(header)], rest_fun.(fetch_state)}

body_length ->
with {:ok, body, bytes_state} <- fetch_bytes_fun.(fetch_state, body_length),
do: {:ok, decode(header, body, compressor), rest_fun.(bytes_state)}
do: {:ok, [decode(header, body, compressor)], rest_fun.(bytes_state)}
end
end
end
Expand All @@ -336,17 +336,26 @@ defmodule Xandra.Frame do
fetch_state,
module() | nil,
(fetch_state -> binary())
) :: {:ok, t(), rest :: binary()} | {:error, reason}
) :: {:ok, [t(), ...], rest :: binary()} | {:error, reason}
when fetch_state: term(), reason: term()
def decode_v5(fetch_bytes_fun, fetch_state, compressor, rest_fun \\ fn _ -> "" end)
when is_function(fetch_bytes_fun, 2) and is_atom(compressor) do
with {:ok, envelope, rest} <-
decode_v5_wrapper(fetch_bytes_fun, fetch_state, compressor, rest_fun) do
{frame, _ignored_rest} = decode_from_binary(envelope, compressor)
{:ok, frame, rest}
frames = decode_all_v5_frames_in_envelope(envelope, compressor, _acc = [])
{:ok, frames, rest}
end
end

defp decode_all_v5_frames_in_envelope("", _compressor, acc) do
Enum.reverse(acc)
end

defp decode_all_v5_frames_in_envelope(envelope, compressor, acc) do
{frame, rest} = decode_from_binary(envelope, compressor)
decode_all_v5_frames_in_envelope(rest, compressor, [frame | acc])
end

# Made public for testing.
@doc false
def decode_v5_wrapper(fetch_bytes_fun, fetch_state, compressor, rest_fun \\ fn _ -> "" end) do
Expand Down
26 changes: 25 additions & 1 deletion test/xandra/frame_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ defmodule Xandra.FrameTest do
|> Frame.encode(protocol_module)
|> IO.iodata_to_binary()

assert {:ok, redecoded_frame, _rest = ""} =
assert {:ok, [redecoded_frame], _rest = ""} =
Frame.decode_v5(&fetch_bytes_from_binary/2, encoded, _compressor = nil)

assert redecoded_frame == frame
Expand Down Expand Up @@ -240,6 +240,30 @@ defmodule Xandra.FrameTest do
end
end

describe "decode/5" do
# Regression for: https://issues.apache.org/jira/browse/CASSANDRA-19753
@tag :regression
test "can decode multiple envelopes in a single frame" do
payload =
<<144, 0, 2, 138, 218, 155, 133, 0, 0, 2, 8, 0, 0, 0, 63, 0, 0, 0, 2, 0, 0, 0, 1, 0, 0, 0,
1, 0, 6, 115, 121, 115, 116, 101, 109, 0, 5, 108, 111, 99, 97, 108, 0, 12, 99, 108, 117,
115, 116, 101, 114, 95, 110, 97, 109, 101, 0, 13, 0, 0, 0, 1, 0, 0, 0, 12, 116, 101,
115, 116, 95, 99, 108, 117, 115, 116, 101, 114, 133, 0, 0, 1, 8, 0, 0, 0, 63, 0, 0, 0,
2, 0, 0, 0, 1, 0, 0, 0, 1, 0, 6, 115, 121, 115, 116, 101, 109, 0, 5, 108, 111, 99, 97,
108, 0, 12, 99, 108, 117, 115, 116, 101, 114, 95, 110, 97, 109, 101, 0, 13, 0, 0, 0, 1,
0, 0, 0, 12, 116, 101, 115, 116, 95, 99, 108, 117, 115, 116, 101, 114, 40, 65, 100, 21>>

assert {:ok, [%Frame{stream_id: 2}, %Frame{stream_id: 1}], _rest = ""} =
Frame.decode(
Xandra.Protocol.V5,
&fetch_bytes_from_binary/2,
payload,
_compressor = nil,
_rest_fun = & &1
)
end
end

defp kind_generator do
member_of([
:startup,
Expand Down
13 changes: 13 additions & 0 deletions test/xandra_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,19 @@ defmodule XandraTest do
end
end

# Regression for timeouts on native protocol v5:
# https://github.com/whatyouhide/xandra/issues/356
@tag :regression
test "concurrent requests on a single connection", %{conn: conn} do
1..5
|> Task.async_stream(fn _i ->
Xandra.execute(conn, "SELECT cluster_name FROM system.local", [], timeout: 5000)
end)
|> Enum.each(fn {:ok, result} ->
assert {:ok, %Xandra.Page{}} = result
end)
end

def configure_fun(options, original_start_options, pid, ref) do
send(pid, {ref, options})
Keyword.replace!(options, :nodes, original_start_options[:nodes])
Expand Down
Loading