Skip to content

Commit

Permalink
Merge branch 'develop'
Browse files Browse the repository at this point in the history
* develop:
  specify next release
  fix minimum version of innmind/immutable
  allow to pool servers together when accepting incoming connections
  add declarative way to accept incoming connection
  • Loading branch information
Baptouuuu committed Mar 9, 2024
2 parents cfe5115 + c555aea commit bd9c04c
Show file tree
Hide file tree
Showing 8 changed files with 563 additions and 17 deletions.
12 changes: 12 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,17 @@
# Changelog

## 2.7.0 - 2024-03-09

### Added

- `Innmind\IO\Sockets::servers()`
- `Innmind\IO\Sockets\Servers`
- `Innmind\IO\Sockets\Server\Pool`

### Changed

- Requires `innmind/immutable:~5.2`

## 2.6.0 - 2024-03-09

### Added
Expand Down
2 changes: 1 addition & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
},
"require": {
"php": "~8.2",
"innmind/immutable": "~4.15|~5.0",
"innmind/immutable": "~5.2",
"innmind/stream": "~4.0",
"innmind/socket": "~6.1"
},
Expand Down
8 changes: 8 additions & 0 deletions src/Sockets.php
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,12 @@ public function clients(): Sockets\Clients
{
return Sockets\Clients::of($this->watch);
}

/**
* @psalm-mutation-free
*/
public function servers(): Sockets\Servers
{
return Sockets\Servers::of($this->watch);
}
}
43 changes: 27 additions & 16 deletions src/Sockets/Client.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,10 @@
Lines,
};
use Innmind\TimeContinuum\ElapsedPeriod;
use Innmind\Socket\Client as Socket;
use Innmind\Socket\{
Client as Socket,
Server\Connection,
};
use Innmind\Stream\{
Writable,
Stream\Size,
Expand All @@ -25,12 +28,12 @@
};

/**
* @template-covariant T of Socket
* @template-covariant T of Socket|Connection
*/
final class Client
{
/** @var T */
private Socket $socket;
private Socket|Connection $socket;
/** @var callable(?ElapsedPeriod): Watch */
private $watch;
/** @var callable(T): Maybe<T> */
Expand All @@ -57,7 +60,7 @@ final class Client
*/
private function __construct(
callable $watch,
Socket $socket,
Socket|Connection $socket,
callable $readyToRead,
callable $readyToWrite,
Maybe $encoding,
Expand All @@ -76,7 +79,7 @@ private function __construct(
/**
* @psalm-mutation-free
* @internal
* @template A of Socket
* @template A of Socket|Connection
*
* @param callable(?ElapsedPeriod): Watch $watch
* @param A $socket
Expand All @@ -85,7 +88,7 @@ private function __construct(
*/
public static function of(
callable $watch,
Socket $socket,
Socket|Connection $socket,
): self {
/** @var Maybe<Str\Encoding> */
$encoding = Maybe::nothing();
Expand All @@ -96,8 +99,8 @@ public static function of(
return new self(
$watch,
$socket,
static fn(Socket $socket) => Maybe::just($socket),
static fn(Socket $socket) => Maybe::just($socket),
static fn(Socket|Connection $socket) => Maybe::just($socket),
static fn(Socket|Connection $socket) => Maybe::just($socket),
$encoding,
$heartbeat,
static fn() => false,
Expand All @@ -107,7 +110,7 @@ public static function of(
/**
* @return T
*/
public function unwrap(): Socket
public function unwrap(): Socket|Connection
{
return $this->socket;
}
Expand Down Expand Up @@ -143,14 +146,18 @@ public function watch(): self
return new self(
$this->watch,
$this->socket,
fn(Socket $socket) => ($this->watch)(null)
fn(Socket|Connection $socket) => ($this->watch)(null)
->forRead($socket)()
->map(static fn($ready) => $ready->toRead())
->flatMap(static fn($toRead) => $toRead->find(
static fn($ready) => $ready === $socket,
))
->keep(Instance::of(Socket::class)),
fn(Socket $socket) => ($this->watch)(null)
->keep(
Instance::of(Socket::class)->or(
Instance::of(Connection::class),
),
),
fn(Socket|Connection $socket) => ($this->watch)(null)
->forWrite($socket)()
->map(static fn($ready) => $ready->toWrite())
->flatMap(static fn($toWrite) => $toWrite->find(
Expand All @@ -173,14 +180,18 @@ public function timeoutAfter(ElapsedPeriod $timeout): self
return new self(
$this->watch,
$this->socket,
fn(Socket $socket) => ($this->watch)($timeout)
fn(Socket|Connection $socket) => ($this->watch)($timeout)
->forRead($socket)()
->map(static fn($ready) => $ready->toRead())
->flatMap(static fn($toRead) => $toRead->find(
static fn($ready) => $ready === $socket,
))
->keep(Instance::of(Socket::class)),
fn(Socket $socket) => ($this->watch)($timeout)
->keep(
Instance::of(Socket::class)->or(
Instance::of(Connection::class),
),
),
fn(Socket|Connection $socket) => ($this->watch)($timeout)
->forWrite($socket)()
->map(static fn($ready) => $ready->toWrite())
->flatMap(static fn($toWrite) => $toWrite->find(
Expand Down Expand Up @@ -333,7 +344,7 @@ public function size(): Maybe
private function readyToRead(): callable
{
return $this->heartbeat->match(
fn($provide) => function(Socket $socket) use ($provide) {
fn($provide) => function(Socket|Connection $socket) use ($provide) {
do {
$ready = ($this->readyToRead)($socket);
$socketReadable = $ready->match(
Expand Down
142 changes: 142 additions & 0 deletions src/Sockets/Server.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
<?php
declare(strict_types = 1);

namespace Innmind\IO\Sockets;

use Innmind\TimeContinuum\ElapsedPeriod;
use Innmind\Socket\{
Server as Socket,
Server\Connection,
};
use Innmind\Stream\Watch;
use Innmind\Immutable\{
Maybe,
Predicate\Instance,
};

/**
* @template-covariant T of Socket
*/
final class Server
{
/** @var T */
private Socket $socket;
/** @var callable(?ElapsedPeriod): Watch */
private $watch;
/** @var callable(T): Maybe<T> */
private $wait;

/**
* @psalm-mutation-free
*
* @param callable(?ElapsedPeriod): Watch $watch
* @param T $socket
* @param callable(T): Maybe<T> $wait
*/
private function __construct(
callable $watch,
Socket $socket,
callable $wait,
) {
$this->watch = $watch;
$this->socket = $socket;
$this->wait = $wait;
}

/**
* @psalm-mutation-free
* @internal
* @template A of Socket
*
* @param callable(?ElapsedPeriod): Watch $watch
* @param A $socket
*
* @return self<A>
*/
public static function of(
callable $watch,
Socket $socket,
): self {
/** @var self<A> */
return new self(
$watch,
$socket,
static fn(Socket $socket) => Maybe::just($socket),
);
}

/**
* @param self<T> $socket
*
* @return Server\Pool<T>
*/
public function with(self $socket): Server\Pool
{
return Server\Pool::of($this->watch, $this->socket, $socket->unwrap());
}

/**
* @return T
*/
public function unwrap(): Socket
{
return $this->socket;
}

/**
* Wait forever for the socket to be ready to read before tryin to use it
*
* @psalm-mutation-free
*
* @return self<T>
*/
public function watch(): self
{
/** @var self<T> */
return new self(
$this->watch,
$this->socket,
fn(Socket $socket) => ($this->watch)(null)
->forRead($socket)()
->map(static fn($ready) => $ready->toRead())
->flatMap(static fn($toRead) => $toRead->find(
static fn($ready) => $ready === $socket,
))
->keep(Instance::of(Socket::class)),
);
}

/**
* @psalm-mutation-free
*
* @return self<T>
*/
public function timeoutAfter(ElapsedPeriod $timeout): self
{
/** @var self<T> */
return new self(
$this->watch,
$this->socket,
fn(Socket $socket) => ($this->watch)($timeout)
->forRead($socket)()
->map(static fn($ready) => $ready->toRead())
->flatMap(static fn($toRead) => $toRead->find(
static fn($ready) => $ready === $socket,
))
->keep(Instance::of(Socket::class)),
);
}

/**
* @return Maybe<Client<Connection>>
*/
public function accept(): Maybe
{
return ($this->wait)($this->socket)
->flatMap(static fn($socket) => $socket->accept())
->map(fn($client) => Client::of(
$this->watch,
$client,
));
}
}
Loading

0 comments on commit bd9c04c

Please sign in to comment.