diff --git a/CHANGELOG.md b/CHANGELOG.md index b530ec5..c221e04 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,17 @@ # Changelog +## 2.7.0 - 2024-03-09 + +### Added + +- `Innmind\IO\Sockets::servers()` +- `Innmind\IO\Sockets\Servers` +- `Innmind\IO\Sockets\Server\Pool` + +### Changed + +- Requires `innmind/immutable:~5.2` + ## 2.6.0 - 2024-03-09 ### Added diff --git a/composer.json b/composer.json index 3199b45..995d160 100644 --- a/composer.json +++ b/composer.json @@ -16,7 +16,7 @@ }, "require": { "php": "~8.2", - "innmind/immutable": "~4.15|~5.0", + "innmind/immutable": "~5.2", "innmind/stream": "~4.0", "innmind/socket": "~6.1" }, diff --git a/src/Sockets.php b/src/Sockets.php index fdd1a6e..ce76953 100644 --- a/src/Sockets.php +++ b/src/Sockets.php @@ -39,4 +39,12 @@ public function clients(): Sockets\Clients { return Sockets\Clients::of($this->watch); } + + /** + * @psalm-mutation-free + */ + public function servers(): Sockets\Servers + { + return Sockets\Servers::of($this->watch); + } } diff --git a/src/Sockets/Client.php b/src/Sockets/Client.php index 797a291..dab3b64 100644 --- a/src/Sockets/Client.php +++ b/src/Sockets/Client.php @@ -10,7 +10,10 @@ Lines, }; use Innmind\TimeContinuum\ElapsedPeriod; -use Innmind\Socket\Client as Socket; +use Innmind\Socket\{ + Client as Socket, + Server\Connection, +}; use Innmind\Stream\{ Writable, Stream\Size, @@ -25,12 +28,12 @@ }; /** - * @template-covariant T of Socket + * @template-covariant T of Socket|Connection */ final class Client { /** @var T */ - private Socket $socket; + private Socket|Connection $socket; /** @var callable(?ElapsedPeriod): Watch */ private $watch; /** @var callable(T): Maybe */ @@ -57,7 +60,7 @@ final class Client */ private function __construct( callable $watch, - Socket $socket, + Socket|Connection $socket, callable $readyToRead, callable $readyToWrite, Maybe $encoding, @@ -76,7 +79,7 @@ private function __construct( /** * @psalm-mutation-free * @internal - * @template A of Socket + * @template A of Socket|Connection * * @param callable(?ElapsedPeriod): Watch $watch * @param A $socket @@ -85,7 +88,7 @@ private function __construct( */ public static function of( callable $watch, - Socket $socket, + Socket|Connection $socket, ): self { /** @var Maybe */ $encoding = Maybe::nothing(); @@ -96,8 +99,8 @@ public static function of( return new self( $watch, $socket, - static fn(Socket $socket) => Maybe::just($socket), - static fn(Socket $socket) => Maybe::just($socket), + static fn(Socket|Connection $socket) => Maybe::just($socket), + static fn(Socket|Connection $socket) => Maybe::just($socket), $encoding, $heartbeat, static fn() => false, @@ -107,7 +110,7 @@ public static function of( /** * @return T */ - public function unwrap(): Socket + public function unwrap(): Socket|Connection { return $this->socket; } @@ -143,14 +146,18 @@ public function watch(): self return new self( $this->watch, $this->socket, - fn(Socket $socket) => ($this->watch)(null) + fn(Socket|Connection $socket) => ($this->watch)(null) ->forRead($socket)() ->map(static fn($ready) => $ready->toRead()) ->flatMap(static fn($toRead) => $toRead->find( static fn($ready) => $ready === $socket, )) - ->keep(Instance::of(Socket::class)), - fn(Socket $socket) => ($this->watch)(null) + ->keep( + Instance::of(Socket::class)->or( + Instance::of(Connection::class), + ), + ), + fn(Socket|Connection $socket) => ($this->watch)(null) ->forWrite($socket)() ->map(static fn($ready) => $ready->toWrite()) ->flatMap(static fn($toWrite) => $toWrite->find( @@ -173,14 +180,18 @@ public function timeoutAfter(ElapsedPeriod $timeout): self return new self( $this->watch, $this->socket, - fn(Socket $socket) => ($this->watch)($timeout) + fn(Socket|Connection $socket) => ($this->watch)($timeout) ->forRead($socket)() ->map(static fn($ready) => $ready->toRead()) ->flatMap(static fn($toRead) => $toRead->find( static fn($ready) => $ready === $socket, )) - ->keep(Instance::of(Socket::class)), - fn(Socket $socket) => ($this->watch)($timeout) + ->keep( + Instance::of(Socket::class)->or( + Instance::of(Connection::class), + ), + ), + fn(Socket|Connection $socket) => ($this->watch)($timeout) ->forWrite($socket)() ->map(static fn($ready) => $ready->toWrite()) ->flatMap(static fn($toWrite) => $toWrite->find( @@ -333,7 +344,7 @@ public function size(): Maybe private function readyToRead(): callable { return $this->heartbeat->match( - fn($provide) => function(Socket $socket) use ($provide) { + fn($provide) => function(Socket|Connection $socket) use ($provide) { do { $ready = ($this->readyToRead)($socket); $socketReadable = $ready->match( diff --git a/src/Sockets/Server.php b/src/Sockets/Server.php new file mode 100644 index 0000000..dcea3f0 --- /dev/null +++ b/src/Sockets/Server.php @@ -0,0 +1,142 @@ + */ + private $wait; + + /** + * @psalm-mutation-free + * + * @param callable(?ElapsedPeriod): Watch $watch + * @param T $socket + * @param callable(T): Maybe $wait + */ + private function __construct( + callable $watch, + Socket $socket, + callable $wait, + ) { + $this->watch = $watch; + $this->socket = $socket; + $this->wait = $wait; + } + + /** + * @psalm-mutation-free + * @internal + * @template A of Socket + * + * @param callable(?ElapsedPeriod): Watch $watch + * @param A $socket + * + * @return self + */ + public static function of( + callable $watch, + Socket $socket, + ): self { + /** @var self */ + return new self( + $watch, + $socket, + static fn(Socket $socket) => Maybe::just($socket), + ); + } + + /** + * @param self $socket + * + * @return Server\Pool + */ + public function with(self $socket): Server\Pool + { + return Server\Pool::of($this->watch, $this->socket, $socket->unwrap()); + } + + /** + * @return T + */ + public function unwrap(): Socket + { + return $this->socket; + } + + /** + * Wait forever for the socket to be ready to read before tryin to use it + * + * @psalm-mutation-free + * + * @return self + */ + public function watch(): self + { + /** @var self */ + return new self( + $this->watch, + $this->socket, + fn(Socket $socket) => ($this->watch)(null) + ->forRead($socket)() + ->map(static fn($ready) => $ready->toRead()) + ->flatMap(static fn($toRead) => $toRead->find( + static fn($ready) => $ready === $socket, + )) + ->keep(Instance::of(Socket::class)), + ); + } + + /** + * @psalm-mutation-free + * + * @return self + */ + public function timeoutAfter(ElapsedPeriod $timeout): self + { + /** @var self */ + return new self( + $this->watch, + $this->socket, + fn(Socket $socket) => ($this->watch)($timeout) + ->forRead($socket)() + ->map(static fn($ready) => $ready->toRead()) + ->flatMap(static fn($toRead) => $toRead->find( + static fn($ready) => $ready === $socket, + )) + ->keep(Instance::of(Socket::class)), + ); + } + + /** + * @return Maybe> + */ + public function accept(): Maybe + { + return ($this->wait)($this->socket) + ->flatMap(static fn($socket) => $socket->accept()) + ->map(fn($client) => Client::of( + $this->watch, + $client, + )); + } +} diff --git a/src/Sockets/Server/Pool.php b/src/Sockets/Server/Pool.php new file mode 100644 index 0000000..b276f43 --- /dev/null +++ b/src/Sockets/Server/Pool.php @@ -0,0 +1,174 @@ + */ + private array $sockets; + /** @var callable(?ElapsedPeriod): Watch */ + private $watch; + /** @var callable(T): Sequence */ + private $wait; + + /** + * @psalm-mutation-free + * + * @param callable(?ElapsedPeriod): Watch $watch + * @param non-empty-list $sockets + * @param callable(T): Sequence $wait + */ + private function __construct( + callable $watch, + array $sockets, + callable $wait, + ) { + $this->watch = $watch; + $this->sockets = $sockets; + $this->wait = $wait; + } + + /** + * @psalm-mutation-free + * @internal + * @template A of Socket + * + * @param callable(?ElapsedPeriod): Watch $watch + * @param A $first + * @param A $second + * + * @return self + */ + public static function of( + callable $watch, + Socket $first, + Socket $second, + ): self { + /** @var self */ + return new self( + $watch, + [$first, $second], + static fn(Socket $socket) => Sequence::of($socket), + ); + } + + /** + * @param Server $server + * + * @return self + */ + public function with(Server $server): self + { + return new self( + $this->watch, + [...$this->sockets, $server->unwrap()], + $this->wait, + ); + } + + /** + * @return Sequence + */ + public function unwrap(): Sequence + { + return Sequence::of(...$this->sockets); + } + + /** + * Wait forever for the socket to be ready to read before tryin to use it + * + * @psalm-mutation-free + * + * @return self + */ + public function watch(): self + { + /** @var self */ + return new self( + $this->watch, + $this->sockets, + fn(Socket $socket, Socket ...$sockets) => ($this->watch)(null) + ->forRead($socket, ...$sockets)() + ->map( + static fn($ready) => $ready + ->toRead() + ->filter(static fn($ready) => \in_array( + $ready, + [$socket, ...$sockets], + true, + )) + ->keep(Instance::of(Socket::class)), + ) + ->toSequence() + ->flatMap( + static fn($toRead) => Sequence::of(...$toRead->toList()), + ), + ); + } + + /** + * @psalm-mutation-free + * + * @return self + */ + public function timeoutAfter(ElapsedPeriod $timeout): self + { + /** @var self */ + return new self( + $this->watch, + $this->sockets, + fn(Socket $socket, Socket ...$sockets) => ($this->watch)($timeout) + ->forRead($socket, ...$sockets)() + ->map( + static fn($ready) => $ready + ->toRead() + ->filter(static fn($ready) => \in_array( + $ready, + [$socket, ...$sockets], + true, + )) + ->keep(Instance::of(Socket::class)), + ) + ->toSequence() + ->flatMap( + static fn($toRead) => Sequence::of(...$toRead->toList()), + ), + ); + } + + /** + * @return Sequence> + */ + public function accept(): Sequence + { + return ($this->wait)(...$this->sockets) + ->flatMap( + static fn($socket) => $socket + ->accept() + ->toSequence(), + ) + ->map(fn($client) => Client::of( + $this->watch, + $client, + )); + } +} diff --git a/src/Sockets/Servers.php b/src/Sockets/Servers.php new file mode 100644 index 0000000..641320f --- /dev/null +++ b/src/Sockets/Servers.php @@ -0,0 +1,43 @@ +watch = $watch; + } + + /** + * @internal + * @psalm-pure + * + * @param callable(?ElapsedPeriod): Watch $watch + */ + public static function of(callable $watch): self + { + return new self($watch); + } + + /** + * @psalm-mutation-free + */ + public function wrap(Socket $socket): Server + { + return Server::of($this->watch, $socket); + } +} diff --git a/tests/FunctionalTest.php b/tests/FunctionalTest.php index e643de5..e0471a4 100644 --- a/tests/FunctionalTest.php +++ b/tests/FunctionalTest.php @@ -724,4 +724,160 @@ public function testSocketAbort() $client->close(); $server->close(); } + + public function testServerAcceptConnection() + { + @\unlink('/tmp/foo.sock'); + $address = Address\Unix::of('/tmp/foo'); + $server = Server\Unix::recoverable($address)->match( + static fn($server) => $server, + static fn() => null, + ); + + $this->assertNotNull($server); + + $client = Client\Unix::of($address)->match( + static fn($socket) => $socket, + static fn() => null, + ); + + $this->assertNotNull($client); + + $_ = IO::of(Select::timeoutAfter(...)) + ->sockets() + ->clients() + ->wrap($client) + ->send(Sequence::of(Str::of('foo'))) + ->match( + static fn() => null, + static fn() => null, + ); + + $result = IO::of(Select::timeoutAfter(...)) + ->sockets() + ->servers() + ->wrap($server) + ->timeoutAfter(ElapsedPeriod::of(1_000)) + ->accept() + ->flatMap( + static fn($client) => $client + ->frames(Frame\Chunk::of(3)) + ->one(), + ) + ->match( + static fn($data) => $data->toString(), + static fn() => null, + ); + + $this->assertSame('foo', $result); + $client->close(); + $server->close(); + } + + public function testServerPool() + { + @\unlink('/tmp/foo.sock'); + @\unlink('/tmp/bar.sock'); + @\unlink('/tmp/baz.sock'); + $addressFoo = Address\Unix::of('/tmp/foo'); + $addressBar = Address\Unix::of('/tmp/bar'); + $addressBaz = Address\Unix::of('/tmp/baz'); + $serverFoo = Server\Unix::recoverable($addressFoo)->match( + static fn($server) => $server, + static fn() => null, + ); + + $this->assertNotNull($serverFoo); + + $serverBar = Server\Unix::recoverable($addressBar)->match( + static fn($server) => $server, + static fn() => null, + ); + + $this->assertNotNull($serverBar); + + $serverBaz = Server\Unix::recoverable($addressBaz)->match( + static fn($server) => $server, + static fn() => null, + ); + + $this->assertNotNull($serverBaz); + + $clientFoo = Client\Unix::of($addressFoo)->match( + static fn($socket) => $socket, + static fn() => null, + ); + + $this->assertNotNull($clientFoo); + + $clientBar = Client\Unix::of($addressBar)->match( + static fn($socket) => $socket, + static fn() => null, + ); + + $this->assertNotNull($clientBar); + + $clientBaz = Client\Unix::of($addressBaz)->match( + static fn($socket) => $socket, + static fn() => null, + ); + + $this->assertNotNull($clientBaz); + + $_ = IO::of(Select::timeoutAfter(...)) + ->sockets() + ->clients() + ->wrap($clientFoo) + ->send(Sequence::of(Str::of('foo'))) + ->match( + static fn() => null, + static fn() => null, + ); + $_ = IO::of(Select::timeoutAfter(...)) + ->sockets() + ->clients() + ->wrap($clientBar) + ->send(Sequence::of(Str::of('bar'))) + ->match( + static fn() => null, + static fn() => null, + ); + $_ = IO::of(Select::timeoutAfter(...)) + ->sockets() + ->clients() + ->wrap($clientBaz) + ->send(Sequence::of(Str::of('baz'))) + ->match( + static fn() => null, + static fn() => null, + ); + + $servers = IO::of(Select::timeoutAfter(...)) + ->sockets() + ->servers(); + $result = $servers + ->wrap($serverFoo) + ->with($servers->wrap($serverBar)) + ->with($servers->wrap($serverBaz)) + ->timeoutAfter(ElapsedPeriod::of(1_000)) + ->accept() + ->flatMap( + static fn($client) => $client + ->frames(Frame\Chunk::of(3)) + ->one() + ->toSequence(), + ) + ->map(static fn($data) => $data->toString()); + + $this->assertCount(3, $result); + $this->assertTrue($result->contains('foo')); + $this->assertTrue($result->contains('bar')); + $this->assertTrue($result->contains('baz')); + $clientFoo->close(); + $clientBar->close(); + $clientBaz->close(); + $serverFoo->close(); + $serverBar->close(); + $serverBaz->close(); + } }