Skip to content

Commit

Permalink
GH-800: Drain the Pipeline (#450)
Browse files Browse the repository at this point in the history
* GH-800: Write the most basic TODOs for this card

* GH-800: Add todos for renaming purposes

* GH-800: Add a message scheduler for purging the stream key

* GH-800: Improve the test handle_client_response_payload_purges_stream_keys_for_terminal_response

* GH-800: Make delay configurable via constant

* GH-800: introduce StreamSenders

* GH-800: introduce channel for shutdown signal

* GH-800: add test connection_shutdown_test.rs

* GH-800: test drive the shutdown signal in StreamReader

* GH-800: refactor test stream_reader_shuts_down_when_it_receives_the_shutdown_signal

* GH-800: some refactor inside the file stream_reader

* GH-800: some more refactoring

* GH-800: trying_to_write_a_test_for_stream_senders was a success

* GH-800: did some refactoring for trying_to_write_a_test_for_stream_senders

* GH-800: stream_handler_pool_sends_shutdown_signal_when_last_data_is_true passes for the first time

* GH-800: Minor refactor for the test stream_handler_pool_sends_shutdown_signal_when_last_data_is_true

* GH-800: more refactoring of the test stream_handler_pool_sends_shutdown_signal_when_last_data_is_true

* GH-800: stream_handler_pool_sends_shutdown_signal_when_last_data_is_true is working fine

* GH-800: some more todos removed

* GH-800: clean_up_dead_streams_logs_when_the_shutdown_channel_is_down is passing

* GH-800: tests are passing in StreamHandlerPool

* GH-800: Wrote a test for the case when the shutdown signal channel is gone

* GH-800: make it easier to understand who experiences the write error

* GH-800: trying to write test while_housekeeping_the_stream_senders_are_received_by_stream_handler_pool

* GH-800: add test add_new_streams_works

* GH-800: wip: add the log

* GH-800: test proxy_client_stream_reader_dies_when_client_stream_is_killed_integration is passing

* GH-800: change the channel from crossbeam to tokio

* GH-800: Add fn send_shutdown_signal_to_stream_reader

* GH-800: wip: fixing 2 tests

* GH-800: stream_handler_pool_sends_shutdown_signal_when_last_data_is_true is passing

* GH-800: all tests in stream_handler_pool.rs are passing

* GH-800: add test for the logs; all tests passing

* GH-800: remove lookup_ip() mock fns from a test

* GH-800: remove warnings

* GH-800: use fn send_shutdown_signal_to_stream_reader in clean_up_dead_streams()

* Revert "GH-800: use fn send_shutdown_signal_to_stream_reader in clean_up_dead_streams()"

This reverts commit ae08930.

* Revert "GH-800: remove warnings"

This reverts commit d81c2f0.

* Revert "GH-800: remove lookup_ip() mock fns from a test"

This reverts commit 1880006.

* Revert "GH-800: add test for the logs; all tests passing"

This reverts commit 4f8c141.

* Revert "GH-800: all tests in stream_handler_pool.rs are passing"

This reverts commit 31db7c4.

* Revert "GH-800: stream_handler_pool_sends_shutdown_signal_when_last_data_is_true is passing"

This reverts commit 54040ae.

* Revert "GH-800: wip: fixing 2 tests"

This reverts commit f94c8d3.

* Revert "GH-800: Add fn send_shutdown_signal_to_stream_reader"

This reverts commit 3b387a7.

* Revert "GH-800: change the channel from crossbeam to tokio"

This reverts commit 2ecfebc.

* GH-800: reverted the channel to crossbeam; remove warnings

* GH-800: solve some TODOs

* GH-800: add refactored fn send_shutdown_signal_to_stream_reader; crtitical integration test passing

* GH-800: write_failure_for_nonexistent_stream_generates_termination_message is improved

* GH-800: everything is tested

* GH-800: fix proxy_server_receives_terminal_response_from_hopper

* GH-800: trigger actions

* GH-800: assert on a different error on non-mac os

* GH-80: add changes of self-review

* GH-800: some other missed changes

* GH-800: add review 1 changes

* GH-800: add review 2 changes

* GH-800: add review 3 changes

* GH-800: add review 4 changes

* GH-800: add better logging

* GH-800: delay purge when the report_to_counterpart is false

* GH-800: final changes

* GH-800: review changes

* GH-800: fix the final comment discussion changes

* GH-800: fix multiple_stream_zero_hop_test
  • Loading branch information
utkarshg6 authored Nov 29, 2024
1 parent 0caf945 commit 75f6657
Show file tree
Hide file tree
Showing 10 changed files with 963 additions and 211 deletions.
2 changes: 1 addition & 1 deletion multinode_integration_tests/tests/data_routing_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,7 @@ fn multiple_stream_zero_hop_test() {
let mut another_client = zero_hop_node.make_client(8080, STANDARD_CLIENT_TIMEOUT_MILLIS);

one_client.send_chunk(b"GET / HTTP/1.1\r\nHost: www.example.com\r\n\r\n");
another_client.send_chunk(b"GET /online/ HTTP/1.1\r\nHost: whatever.neverssl.com\r\n\r\n");
another_client.send_chunk(b"GET /online/ HTTP/1.1\r\nAccept: text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7\r\nAccept-Language: cs-CZ,cs;q=0.9,en;q=0.8,sk;q=0.7\r\nCache-Control: max-age=0\r\nConnection: keep-alive\r\nHost: whatever.neverssl.com\r\nUpgrade-Insecure-Requests: 1\r\nUser-Agent: Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/130.0.0.0 Safari/537.36\r\n\r\n");

let one_response = one_client.wait_for_chunk();
let another_response = another_client.wait_for_chunk();
Expand Down
19 changes: 15 additions & 4 deletions node/src/proxy_client/stream_establisher.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
// Copyright (c) 2019, MASQ (https://masq.ai) and/or its affiliates. All rights reserved.

use crate::proxy_client::stream_handler_pool::StreamSenders;
use crate::proxy_client::stream_reader::StreamReader;
use crate::proxy_client::stream_writer::StreamWriter;
use crate::sub_lib::channel_wrappers::FuturesChannelFactory;
Expand All @@ -14,15 +15,15 @@ use crate::sub_lib::stream_connector::StreamConnectorReal;
use crate::sub_lib::stream_key::StreamKey;
use crate::sub_lib::tokio_wrappers::ReadHalfWrapper;
use actix::Recipient;
use crossbeam_channel::Sender;
use crossbeam_channel::{unbounded, Receiver, Sender};
use masq_lib::logger::Logger;
use std::io;
use std::net::IpAddr;
use std::net::SocketAddr;

pub struct StreamEstablisher {
pub cryptde: &'static dyn CryptDE,
pub stream_adder_tx: Sender<(StreamKey, Box<dyn SenderWrapper<SequencedPacket>>)>,
pub stream_adder_tx: Sender<(StreamKey, StreamSenders)>,
pub stream_killer_tx: Sender<(StreamKey, u64)>,
pub stream_connector: Box<dyn StreamConnector>,
pub proxy_client_sub: Recipient<InboundServerData>,
Expand Down Expand Up @@ -57,11 +58,13 @@ impl StreamEstablisher {
payload.target_port,
&self.logger,
)?;
let (shutdown_signal_tx, shutdown_signal_rx) = unbounded();

self.spawn_stream_reader(
&payload.clone(),
connection_info.reader,
connection_info.peer_addr,
shutdown_signal_rx,
);

let (tx_to_write, rx_to_write) = self.channel_factory.make(connection_info.peer_addr);
Expand All @@ -73,8 +76,13 @@ impl StreamEstablisher {
);
tokio::spawn(stream_writer);

let stream_senders = StreamSenders {
writer_data: tx_to_write.clone(),
reader_shutdown_tx: shutdown_signal_tx,
};

self.stream_adder_tx
.send((payload.stream_key, tx_to_write.clone()))
.send((payload.stream_key, stream_senders))
.expect("StreamHandlerPool died");
Ok(tx_to_write)
}
Expand All @@ -84,12 +92,14 @@ impl StreamEstablisher {
payload: &ClientRequestPayload_0v1,
read_stream: Box<dyn ReadHalfWrapper>,
peer_addr: SocketAddr,
shutdown_signal: Receiver<()>,
) {
let stream_reader = StreamReader::new(
payload.stream_key,
self.proxy_client_sub.clone(),
read_stream,
self.stream_killer_tx.clone(),
shutdown_signal,
peer_addr,
);
debug!(self.logger, "Spawning StreamReader for {}", peer_addr);
Expand All @@ -103,7 +113,7 @@ pub trait StreamEstablisherFactory: Send {

pub struct StreamEstablisherFactoryReal {
pub cryptde: &'static dyn CryptDE,
pub stream_adder_tx: Sender<(StreamKey, Box<dyn SenderWrapper<SequencedPacket>>)>,
pub stream_adder_tx: Sender<(StreamKey, StreamSenders)>,
pub stream_killer_tx: Sender<(StreamKey, u64)>,
pub proxy_client_subs: ProxyClientSubs,
pub logger: Logger,
Expand Down Expand Up @@ -191,6 +201,7 @@ mod tests {
},
read_stream,
SocketAddr::from_str("1.2.3.4:5678").unwrap(),
unbounded().1,
);

proxy_client_awaiter.await_message_count(1);
Expand Down
Loading

0 comments on commit 75f6657

Please sign in to comment.