From 74da57a197e8014cd4065738fae09b2c2e55f24c Mon Sep 17 00:00:00 2001 From: blueskea Date: Sat, 25 Nov 2023 00:32:30 +0800 Subject: [PATCH 1/5] feat: Supports multiple servers in single client --- src/client.rs | 44 ++++++++++++++++++++++++++------------------ 1 file changed, 26 insertions(+), 18 deletions(-) diff --git a/src/client.rs b/src/client.rs index 95a5a749..a77744ac 100644 --- a/src/client.rs +++ b/src/client.rs @@ -104,15 +104,18 @@ impl Client { mut shutdown_rx: broadcast::Receiver, mut update_rx: mpsc::Receiver, ) -> Result<()> { - for (name, config) in &self.config.services { - // Create a control channel for each service defined - let handle = ControlChannelHandle::new( - (*config).clone(), - self.config.remote_addr.clone(), - self.transport.clone(), - self.config.heartbeat_timeout, - ); - self.service_handles.insert(name.clone(), handle); + for remote_addr in self.config.remote_addr.split(",") { + for (name, config) in &self.config.services { + // Create a control channel for each service defined + let handle = ControlChannelHandle::new( + (*config).clone(), + remote_addr.to_string(), + self.transport.clone(), + self.config.heartbeat_timeout, + ); + let full_name = remote_addr.to_string() + "," + name; + self.service_handles.insert(full_name, handle); + } } // Wait for the shutdown signal @@ -147,17 +150,22 @@ impl Client { match e { ConfigChange::ClientChange(client_change) => match client_change { ClientServiceChange::Add(cfg) => { - let name = cfg.name.clone(); - let handle = ControlChannelHandle::new( - cfg, - self.config.remote_addr.clone(), - self.transport.clone(), - self.config.heartbeat_timeout, - ); - let _ = self.service_handles.insert(name, handle); + for remote_addr in self.config.remote_addr.split(",") { + let handle = ControlChannelHandle::new( + cfg.clone(), + remote_addr.to_string(), + self.transport.clone(), + self.config.heartbeat_timeout, + ); + let full_name = remote_addr.to_string() + "," + &cfg.name; + let _ = self.service_handles.insert(full_name, handle); + } } ClientServiceChange::Delete(s) => { - let _ = self.service_handles.remove(&s); + for remote_addr in self.config.remote_addr.split(",") { + let full_name = remote_addr.to_string() + "," + &s; + let _ = self.service_handles.remove(&full_name); + } } }, ignored => warn!("Ignored {:?} since running as a client", ignored), From bcbd3e68b3c79996d6d8dd999a0de06092bd5956 Mon Sep 17 00:00:00 2001 From: blueskea Date: Sat, 25 Nov 2023 00:42:30 +0800 Subject: [PATCH 2/5] docs: multiple servers --- README-zh.md | 1 + README.md | 1 + 2 files changed, 2 insertions(+) diff --git a/README-zh.md b/README-zh.md index cb85db02..ead662ae 100644 --- a/README-zh.md +++ b/README-zh.md @@ -104,6 +104,7 @@ local_addr = "127.0.0.1:22" # 需要被转发的服务的地址 ```toml [client] remote_addr = "example.com:2333" # Necessary. The address of the server +# remote_addr = "foo.com:2333,bar.com:3332" # Multiple server addresses, this is a preview feature and address changes cannot be hot loaded now default_token = "default_token_if_not_specify" # Optional. The default token of services, if they don't define their own ones heartbeat_timeout = 40 # Optional. Set to 0 to disable the application-layer heartbeat test. The value must be greater than `server.heartbeat_interval`. Default: 40 seconds retry_interval = 1 # Optional. The interval between retry to connect to the server. Default: 1 second diff --git a/README.md b/README.md index 18e46aff..695d1df9 100644 --- a/README.md +++ b/README.md @@ -106,6 +106,7 @@ Here is the full configuration specification: ```toml [client] remote_addr = "example.com:2333" # Necessary. The address of the server +# remote_addr = "foo.com:2333,bar.com:3332" # Multiple server addresses, this is a preview feature and address changes cannot be hot loaded now default_token = "default_token_if_not_specify" # Optional. The default token of services, if they don't define their own ones heartbeat_timeout = 40 # Optional. Set to 0 to disable the application-layer heartbeat test. The value must be greater than `server.heartbeat_interval`. Default: 40 seconds retry_interval = 1 # Optional. The interval between retry to connect to the server. Default: 1 second From 85c70bf99da81256cd92c0655198632adf8b0bf1 Mon Sep 17 00:00:00 2001 From: blueskea Date: Sat, 25 Nov 2023 00:54:02 +0800 Subject: [PATCH 3/5] feat: Supports multiple servers in single client --- src/client.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/client.rs b/src/client.rs index a77744ac..8717dfc4 100644 --- a/src/client.rs +++ b/src/client.rs @@ -104,7 +104,7 @@ impl Client { mut shutdown_rx: broadcast::Receiver, mut update_rx: mpsc::Receiver, ) -> Result<()> { - for remote_addr in self.config.remote_addr.split(",") { + for remote_addr in self.config.remote_addr.split(',') { for (name, config) in &self.config.services { // Create a control channel for each service defined let handle = ControlChannelHandle::new( @@ -150,7 +150,7 @@ impl Client { match e { ConfigChange::ClientChange(client_change) => match client_change { ClientServiceChange::Add(cfg) => { - for remote_addr in self.config.remote_addr.split(",") { + for remote_addr in self.config.remote_addr.split(',') { let handle = ControlChannelHandle::new( cfg.clone(), remote_addr.to_string(), @@ -162,7 +162,7 @@ impl Client { } } ClientServiceChange::Delete(s) => { - for remote_addr in self.config.remote_addr.split(",") { + for remote_addr in self.config.remote_addr.split(',') { let full_name = remote_addr.to_string() + "," + &s; let _ = self.service_handles.remove(&full_name); } From 7ebfa0601246d1a715d385d18102d5ee55047a9c Mon Sep 17 00:00:00 2001 From: blueskea Date: Fri, 2 Feb 2024 00:03:06 +0800 Subject: [PATCH 4/5] feat: Add service side proxies config parameters --- examples/minimal/server.toml | 13 +++++++++++ src/config.rs | 8 +++++++ src/server.rs | 43 ++++++++++++++++++++++++------------ 3 files changed, 50 insertions(+), 14 deletions(-) diff --git a/examples/minimal/server.toml b/examples/minimal/server.toml index 36396c95..35ff2f3b 100644 --- a/examples/minimal/server.toml +++ b/examples/minimal/server.toml @@ -4,3 +4,16 @@ default_token = "123" [server.services.foo1] bind_addr = "0.0.0.0:5202" + +[proxies.pve.ssh] +bind_addr = "0.0.0.0:5001" +local_addr = "127.0.0.1:22" + +[proxies.pve.rdp] +bind_addr = "0.0.0.0:5002" +local_addr = "127.0.0.1:3389" + +[proxies.router.web] +bind_addr = "0.0.0.0:5003" +local_addr = "127.0.0.1:80" + diff --git a/src/config.rs b/src/config.rs index ca85fc20..d97eef24 100644 --- a/src/config.rs +++ b/src/config.rs @@ -226,10 +226,18 @@ pub struct ServerConfig { pub heartbeat_interval: u64, } +#[derive(Debug, Serialize, Deserialize, Default, PartialEq, Eq, Clone)] +#[serde(deny_unknown_fields)] +pub struct ProxyConfig { + pub bind_addr: String, + pub local_addr: String, +} + #[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)] #[serde(deny_unknown_fields)] pub struct Config { pub server: Option, + pub proxies: Option>>, pub client: Option, } diff --git a/src/server.rs b/src/server.rs index a36e3c21..31e81ee9 100644 --- a/src/server.rs +++ b/src/server.rs @@ -1,4 +1,4 @@ -use crate::config::{Config, ServerConfig, ServerServiceConfig, ServiceType, TransportType}; +use crate::config::{Config, ProxyConfig, ServerConfig, ServerServiceConfig, ServiceType, TransportType}; use crate::config_watcher::{ConfigChange, ServerServiceChange}; use crate::constants::{listen_backoff, UDP_BUFFER_SIZE}; use crate::helper::retry_notify_with_deadline; @@ -44,22 +44,24 @@ pub async fn run_server( shutdown_rx: broadcast::Receiver, update_rx: mpsc::Receiver, ) -> Result<()> { - let config = match config.server { + let server_config = match config.server { Some(config) => config, None => { return Err(anyhow!("Try to run as a server, but the configuration is missing. Please add the `[server]` block")) } }; - match config.transport.transport_type { + let proxy_config = config.proxies.unwrap_or_else(|| HashMap::new()); + + match server_config.transport.transport_type { TransportType::Tcp => { - let mut server = Server::::from(config).await?; + let mut server = Server::::from(server_config, proxy_config).await?; server.run(shutdown_rx, update_rx).await?; } TransportType::Tls => { #[cfg(feature = "tls")] { - let mut server = Server::::from(config).await?; + let mut server = Server::::from(server_config, proxy_config).await?; server.run(shutdown_rx, update_rx).await?; } #[cfg(not(feature = "tls"))] @@ -68,7 +70,7 @@ pub async fn run_server( TransportType::Noise => { #[cfg(feature = "noise")] { - let mut server = Server::::from(config).await?; + let mut server = Server::::from(server_config, proxy_config).await?; server.run(shutdown_rx, update_rx).await?; } #[cfg(not(feature = "noise"))] @@ -77,7 +79,7 @@ pub async fn run_server( TransportType::Websocket => { #[cfg(feature = "websocket")] { - let mut server = Server::::from(config).await?; + let mut server = Server::::from(server_config, proxy_config).await?; server.run(shutdown_rx, update_rx).await?; } #[cfg(not(feature = "websocket"))] @@ -96,7 +98,7 @@ type ControlChannelMap = MultiMap { // `[server]` config config: Arc, - + proxies: Arc>>, // `[server.services]` config, indexed by ServiceDigest services: Arc>>, // Collection of contorl channels @@ -118,13 +120,15 @@ fn generate_service_hashmap( impl Server { // Create a server from `[server]` - pub async fn from(config: ServerConfig) -> Result> { - let config = Arc::new(config); - let services = Arc::new(RwLock::new(generate_service_hashmap(&config))); + pub async fn from(server_config: ServerConfig, proxies: HashMap>) -> Result> { + let server_config = Arc::new(server_config); + let proxies = Arc::new(proxies); + let services = Arc::new(RwLock::new(generate_service_hashmap(&server_config))); let control_channels = Arc::new(RwLock::new(ControlChannelMap::new())); - let transport = Arc::new(T::new(&config.transport)?); + let transport = Arc::new(T::new(&server_config.transport)?); Ok(Server { - config, + config: server_config, + proxies, services, control_channels, transport, @@ -152,6 +156,14 @@ impl Server { ..Default::default() }; + // Log initial services + let proxies = self.proxies.clone(); + for (client_name, services) in proxies.iter() { + for (service_name, service_config) in services.iter() { + info!("client_name={}, service_name={}: {:?}", client_name, service_name, service_config); + } + } + // Wait for connections and shutdown signals loop { tokio::select! { @@ -185,10 +197,11 @@ impl Server { match conn.with_context(|| "Failed to do transport handshake") { Ok(conn) => { let services = self.services.clone(); + let proxies = self.proxies.clone(); let control_channels = self.control_channels.clone(); let server_config = self.config.clone(); tokio::spawn(async move { - if let Err(err) = handle_connection(conn, services, control_channels, server_config).await { + if let Err(err) = handle_connection(conn, services, proxies, control_channels, server_config).await { error!("{:#}", err); } }.instrument(info_span!("connection", %addr))); @@ -250,6 +263,7 @@ impl Server { async fn handle_connection( mut conn: T::Stream, services: Arc>>, + proxies: Arc>>, control_channels: Arc>>, server_config: Arc, ) -> Result<()> { @@ -257,6 +271,7 @@ async fn handle_connection( let hello = read_hello(&mut conn).await?; match hello { ControlChannelHello(_, service_digest) => { + debug!("server proxies: {:?}", proxies); do_control_channel_handshake( conn, services, From d894a4e87e3c8be2fb5c30a7e82a8f3309a6042a Mon Sep 17 00:00:00 2001 From: blueskea Date: Fri, 2 Feb 2024 01:37:10 +0800 Subject: [PATCH 5/5] feat: Add multi server parameters --- examples/minimal/client.toml | 11 ++++++++++ src/client.rs | 41 ++++++++++++++++++++++-------------- src/config.rs | 15 +++++++++++++ 3 files changed, 51 insertions(+), 16 deletions(-) diff --git a/examples/minimal/client.toml b/examples/minimal/client.toml index bb5f6b8c..86e27c0d 100644 --- a/examples/minimal/client.toml +++ b/examples/minimal/client.toml @@ -1,6 +1,17 @@ [client] +name = "pve" remote_addr = "localhost:2333" default_token = "123" [client.services.foo1] local_addr = "127.0.0.1:80" + +[servers.hk] +remote_addr = "localhost:2333" +default_token = "123" + +[servers.cn] +remote_addr = "localhost:2444" +default_token = "123" + + diff --git a/src/client.rs b/src/client.rs index 8717dfc4..c9d50eb6 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1,4 +1,4 @@ -use crate::config::{ClientConfig, ClientServiceConfig, Config, ServiceType, TransportType}; +use crate::config::{ClientConfig, ClientServerConfig, ClientServiceConfig, Config, ServiceType, TransportType}; use crate::config_watcher::{ClientServiceChange, ConfigChange}; use crate::helper::udp_connect; use crate::protocol::Hello::{self, *}; @@ -35,21 +35,23 @@ pub async fn run_client( shutdown_rx: broadcast::Receiver, update_rx: mpsc::Receiver, ) -> Result<()> { - let config = config.client.ok_or_else(|| { + let client_config = config.client.ok_or_else(|| { anyhow!( "Try to run as a client, but the configuration is missing. Please add the `[client]` block" ) })?; - match config.transport.transport_type { + let servers_config = config.servers.unwrap_or_else(|| HashMap::new()); + + match client_config.transport.transport_type { TransportType::Tcp => { - let mut client = Client::::from(config).await?; + let mut client = Client::::from(client_config, servers_config).await?; client.run(shutdown_rx, update_rx).await } TransportType::Tls => { #[cfg(feature = "tls")] { - let mut client = Client::::from(config).await?; + let mut client = Client::::from(client_config, servers_config).await?; client.run(shutdown_rx, update_rx).await } #[cfg(not(feature = "tls"))] @@ -58,7 +60,7 @@ pub async fn run_client( TransportType::Noise => { #[cfg(feature = "noise")] { - let mut client = Client::::from(config).await?; + let mut client = Client::::from(client_config, servers_config).await?; client.run(shutdown_rx, update_rx).await } #[cfg(not(feature = "noise"))] @@ -67,7 +69,7 @@ pub async fn run_client( TransportType::Websocket => { #[cfg(feature = "websocket")] { - let mut client = Client::::from(config).await?; + let mut client = Client::::from(client_config, servers_config).await?; client.run(shutdown_rx, update_rx).await } #[cfg(not(feature = "websocket"))] @@ -82,17 +84,19 @@ type Nonce = protocol::Digest; // Holds the state of a client struct Client { config: ClientConfig, + servers: HashMap, service_handles: HashMap, transport: Arc, } impl Client { // Create a Client from `[client]` config block - async fn from(config: ClientConfig) -> Result> { + async fn from(config: ClientConfig, servers: HashMap) -> Result> { let transport = Arc::new(T::new(&config.transport).with_context(|| "Failed to create the transport")?); Ok(Client { config, + servers, service_handles: HashMap::new(), transport, }) @@ -104,16 +108,18 @@ impl Client { mut shutdown_rx: broadcast::Receiver, mut update_rx: mpsc::Receiver, ) -> Result<()> { - for remote_addr in self.config.remote_addr.split(',') { - for (name, config) in &self.config.services { + for (server_name, server_config) in self.servers.iter() { + info!("server_name={}, server_config={:?}", server_name, server_config); + let remote_addr = &server_config.remote_addr; + for (service_name, service_config) in &self.config.services { // Create a control channel for each service defined let handle = ControlChannelHandle::new( - (*config).clone(), + (*service_config).clone(), remote_addr.to_string(), self.transport.clone(), self.config.heartbeat_timeout, ); - let full_name = remote_addr.to_string() + "," + name; + let full_name = server_name.to_string() + "," + service_name; self.service_handles.insert(full_name, handle); } } @@ -150,20 +156,23 @@ impl Client { match e { ConfigChange::ClientChange(client_change) => match client_change { ClientServiceChange::Add(cfg) => { - for remote_addr in self.config.remote_addr.split(',') { + for (server_name, server_config) in self.servers.iter() { + info!("server_name={}, server_config={:?}", server_name, server_config); + let remote_addr = &server_config.remote_addr; let handle = ControlChannelHandle::new( cfg.clone(), remote_addr.to_string(), self.transport.clone(), self.config.heartbeat_timeout, ); - let full_name = remote_addr.to_string() + "," + &cfg.name; + let full_name = server_name.to_string() + "," + &cfg.name; let _ = self.service_handles.insert(full_name, handle); } } ClientServiceChange::Delete(s) => { - for remote_addr in self.config.remote_addr.split(',') { - let full_name = remote_addr.to_string() + "," + &s; + for (server_name, server_config) in self.servers.iter() { + info!("server_name={}, server_config={:?}", server_name, server_config); + let full_name = server_name.to_string() + "," + &s; let _ = self.service_handles.remove(&full_name); } } diff --git a/src/config.rs b/src/config.rs index d97eef24..1bec80a4 100644 --- a/src/config.rs +++ b/src/config.rs @@ -199,6 +199,7 @@ fn default_client_retry_interval() -> u64 { #[derive(Debug, Serialize, Deserialize, Default, PartialEq, Eq, Clone)] #[serde(deny_unknown_fields)] pub struct ClientConfig { + pub name: Option, pub remote_addr: String, pub default_token: Option, pub services: HashMap, @@ -210,6 +211,19 @@ pub struct ClientConfig { pub retry_interval: u64, } +#[derive(Debug, Serialize, Deserialize, Default, PartialEq, Eq, Clone)] +#[serde(deny_unknown_fields)] +pub struct ClientServerConfig { + pub remote_addr: String, + pub default_token: Option, + #[serde(default)] + pub transport: TransportConfig, + #[serde(default = "default_heartbeat_timeout")] + pub heartbeat_timeout: u64, + #[serde(default = "default_client_retry_interval")] + pub retry_interval: u64, +} + fn default_heartbeat_interval() -> u64 { DEFAULT_HEARTBEAT_INTERVAL_SECS } @@ -239,6 +253,7 @@ pub struct Config { pub server: Option, pub proxies: Option>>, pub client: Option, + pub servers: Option>, } impl Config {