Code Coverage |
||||||||||
Lines |
Functions and Methods |
Classes and Traits |
||||||||
Total | |
72.73% |
16 / 22 |
|
0.00% |
0 / 1 |
CRAP | |
0.00% |
0 / 1 |
AsyncAmpTransport | |
72.73% |
16 / 22 |
|
0.00% |
0 / 1 |
4.32 | |
0.00% |
0 / 1 |
request | |
72.73% |
16 / 22 |
|
0.00% |
0 / 1 |
4.32 |
1 | <?php |
2 | |
3 | declare(strict_types=1); |
4 | |
5 | namespace Ndrstmr\Icap\Transport; |
6 | |
7 | use Amp\Socket; |
8 | use Amp\Socket\ConnectContext; |
9 | use Amp\TimeoutCancellation; |
10 | use Ndrstmr\Icap\Config; |
11 | use Ndrstmr\Icap\Exception\IcapConnectionException; |
12 | |
13 | use function Amp\async; |
14 | |
15 | /** |
16 | * Asynchronous transport implementation using amphp/socket. |
17 | */ |
18 | final class AsyncAmpTransport implements TransportInterface |
19 | { |
20 | /** |
21 | * @return \Amp\Future<string> |
22 | */ |
23 | public function request(Config $config, string $rawRequest): \Amp\Future |
24 | { |
25 | /** @var \Amp\Future<string> $future */ |
26 | $future = async(function () use ($config, $rawRequest): string { |
27 | $socket = null; |
28 | $connectionUrl = sprintf('tcp://%s:%d', $config->host, $config->port); |
29 | $connectContext = (new ConnectContext()) |
30 | ->withConnectTimeout($config->getSocketTimeout()); |
31 | $cancellation = new TimeoutCancellation($config->getStreamTimeout()); |
32 | |
33 | try { |
34 | $socket = Socket\connect($connectionUrl, $connectContext, $cancellation); |
35 | $socket->write($rawRequest); |
36 | |
37 | $response = ''; |
38 | while (null !== ($chunk = $socket->read($cancellation))) { |
39 | $response .= $chunk; |
40 | } |
41 | |
42 | return $response; |
43 | } catch (Socket\ConnectException $e) { |
44 | throw new IcapConnectionException( |
45 | sprintf('Async connection to %s:%d failed.', $config->host, $config->port), |
46 | 0, |
47 | $e |
48 | ); |
49 | } finally { |
50 | if ($socket) { |
51 | $socket->close(); |
52 | } |
53 | } |
54 | }); |
55 | |
56 | return $future; |
57 | } |
58 | } |