Code Coverage |
||||||||||
Lines |
Functions and Methods |
Classes and Traits |
||||||||
| Total | |
64.00% |
32 / 50 |
|
20.00% |
1 / 5 |
CRAP | |
0.00% |
0 / 1 |
| AsyncAmpTransport | |
64.00% |
32 / 50 |
|
20.00% |
1 / 5 |
23.14 | |
0.00% |
0 / 1 |
| __construct | |
100.00% |
1 / 1 |
|
100.00% |
1 / 1 |
1 | |||
| request | |
25.00% |
4 / 16 |
|
0.00% |
0 / 1 |
10.75 | |||
| openSession | |
92.86% |
13 / 14 |
|
0.00% |
0 / 1 |
2.00 | |||
| acquireSocket | |
87.50% |
14 / 16 |
|
0.00% |
0 / 1 |
5.05 | |||
| serverWantsClose | |
0.00% |
0 / 3 |
|
0.00% |
0 / 1 |
6 | |||
| 1 | <?php |
| 2 | |
| 3 | /** |
| 4 | * SPDX-License-Identifier: EUPL-1.2 |
| 5 | * |
| 6 | * This file is part of icap-flow. |
| 7 | * |
| 8 | * Licensed under the EUPL, Version 1.2 only (the "Licence"); |
| 9 | * you may not use this work except in compliance with the Licence. |
| 10 | * You may obtain a copy of the Licence at: |
| 11 | * |
| 12 | * https://joinup.ec.europa.eu/collection/eupl/eupl-text-eupl-12 |
| 13 | * |
| 14 | * Unless required by applicable law or agreed to in writing, software |
| 15 | * distributed under the Licence is distributed on an "AS IS" basis, |
| 16 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 17 | */ |
| 18 | |
| 19 | declare(strict_types=1); |
| 20 | |
| 21 | namespace Ndrstmr\Icap\Transport; |
| 22 | |
| 23 | use Amp\Cancellation; |
| 24 | use Amp\CompositeCancellation; |
| 25 | use Amp\Socket; |
| 26 | use Amp\Socket\ConnectContext; |
| 27 | use Amp\Socket\Socket as SocketInterface; |
| 28 | use Amp\TimeoutCancellation; |
| 29 | use Ndrstmr\Icap\Config; |
| 30 | use Ndrstmr\Icap\Exception\IcapConnectionException; |
| 31 | |
| 32 | use function Amp\async; |
| 33 | |
| 34 | /** |
| 35 | * Asynchronous transport implementation using amphp/socket. |
| 36 | * |
| 37 | * Upgrades to TLS (icaps://) automatically when the supplied Config |
| 38 | * carries a {@see \Amp\Socket\ClientTlsContext}; otherwise connects |
| 39 | * plain tcp://. The response is bounded by Config::maxResponseSize to |
| 40 | * keep a hostile server from exhausting the client's memory. |
| 41 | * |
| 42 | * The optional user-supplied {@see Cancellation} is combined with the |
| 43 | * transport's internal {@see TimeoutCancellation} via a |
| 44 | * {@see CompositeCancellation}; whichever fires first aborts the |
| 45 | * read/write loop with `Amp\CancelledException`. |
| 46 | * |
| 47 | * Response framing is done by {@see ResponseFrameReader} so the read |
| 48 | * loop terminates as soon as the message is complete — no dependency |
| 49 | * on the server closing the socket. |
| 50 | * |
| 51 | * **Connection pooling (v2.1+).** When constructed with a |
| 52 | * {@see ConnectionPoolInterface}, the transport calls |
| 53 | * `acquire()` instead of opening a fresh TCP/TLS connection and |
| 54 | * `release()` instead of closing the socket. The socket is closed |
| 55 | * (rather than returned) when: |
| 56 | * - the framing reader threw — the socket might be in an |
| 57 | * inconsistent state and we play safe; |
| 58 | * - the server's response carries `Connection: close` (RFC 7230 |
| 59 | * §6.1) — we honour the close intent. |
| 60 | * Without a pool the transport opens a fresh socket per request and |
| 61 | * closes it after — identical to the v2.0 behaviour. |
| 62 | * |
| 63 | * **Sessions (v2.1+).** Implements {@see SessionAwareTransport} so |
| 64 | * callers that need to send + receive multiple times on the same |
| 65 | * connection (strict RFC 3507 §4.5 preview-continue, ICAP request |
| 66 | * pipelining, …) can call {@see openSession()}. |
| 67 | */ |
| 68 | final class AsyncAmpTransport implements SessionAwareTransport |
| 69 | { |
| 70 | public function __construct( |
| 71 | private ?ConnectionPoolInterface $pool = null, |
| 72 | ) { |
| 73 | } |
| 74 | |
| 75 | /** |
| 76 | * @param iterable<string> $rawRequest |
| 77 | * @return \Amp\Future<string> |
| 78 | */ |
| 79 | #[\Override] |
| 80 | public function request(Config $config, iterable $rawRequest, ?Cancellation $cancellation = null): \Amp\Future |
| 81 | { |
| 82 | /** @var \Amp\Future<string> $future */ |
| 83 | $future = async(function () use ($config, $rawRequest, $cancellation): string { |
| 84 | $session = $this->openSession($config, $cancellation); |
| 85 | $closeForced = false; |
| 86 | try { |
| 87 | $session->write($rawRequest); |
| 88 | $response = $session->readResponse(); |
| 89 | if ($this->serverWantsClose($response)) { |
| 90 | $closeForced = true; |
| 91 | } |
| 92 | return $response; |
| 93 | } catch (\Throwable $e) { |
| 94 | $closeForced = true; |
| 95 | throw $e; |
| 96 | } finally { |
| 97 | if ($closeForced) { |
| 98 | $session->close(); |
| 99 | } else { |
| 100 | $session->release(); |
| 101 | } |
| 102 | } |
| 103 | }); |
| 104 | |
| 105 | return $future; |
| 106 | } |
| 107 | |
| 108 | #[\Override] |
| 109 | public function openSession(Config $config, ?Cancellation $cancellation = null): TransportSession |
| 110 | { |
| 111 | // Use a one-shot timeout for socket acquisition only. |
| 112 | $acquireCancellation = new TimeoutCancellation($config->getStreamTimeout()); |
| 113 | $effectiveAcquire = $cancellation === null |
| 114 | ? $acquireCancellation |
| 115 | : new CompositeCancellation($cancellation, $acquireCancellation); |
| 116 | |
| 117 | $socket = $this->acquireSocket($config, $effectiveAcquire); |
| 118 | |
| 119 | // The session creates a fresh TimeoutCancellation per IO call |
| 120 | // (write / readResponse), so each individual operation gets |
| 121 | // the full streamTimeout window — no session-lifetime timer. |
| 122 | return new AmpTransportSession( |
| 123 | config: $config, |
| 124 | socket: $socket, |
| 125 | streamTimeout: $config->getStreamTimeout(), |
| 126 | userCancellation: $cancellation, |
| 127 | maxResponseSize: $config->getMaxResponseSize(), |
| 128 | maxHeaderLineLength: $config->getMaxHeaderLineLength(), |
| 129 | pool: $this->pool, |
| 130 | ); |
| 131 | } |
| 132 | |
| 133 | private function acquireSocket(Config $config, Cancellation $cancellation): SocketInterface |
| 134 | { |
| 135 | if ($this->pool !== null) { |
| 136 | return $this->pool->acquire($config, $cancellation); |
| 137 | } |
| 138 | |
| 139 | $tls = $config->getTlsContext(); |
| 140 | $url = sprintf('tcp://%s:%d', $config->host, $config->port); |
| 141 | $context = (new ConnectContext())->withConnectTimeout($config->getSocketTimeout()); |
| 142 | if ($tls !== null) { |
| 143 | $context = $context->withTlsContext($tls); |
| 144 | } |
| 145 | |
| 146 | try { |
| 147 | if ($tls !== null) { |
| 148 | return Socket\connectTls($url, $context, $cancellation); |
| 149 | } |
| 150 | return Socket\connect($url, $context, $cancellation); |
| 151 | } catch (Socket\ConnectException $e) { |
| 152 | throw new IcapConnectionException( |
| 153 | sprintf('Async connection to %s:%d failed.', $config->host, $config->port), |
| 154 | 0, |
| 155 | $e, |
| 156 | ); |
| 157 | } |
| 158 | } |
| 159 | |
| 160 | /** |
| 161 | * Cheap heuristic for `Connection: close` in the ICAP head. We |
| 162 | * don't fully reparse here — the response parser does that — but |
| 163 | * we want to keep the socket-disposal decision local to the |
| 164 | * transport. |
| 165 | */ |
| 166 | private function serverWantsClose(string $response): bool |
| 167 | { |
| 168 | $headEnd = strpos($response, "\r\n\r\n"); |
| 169 | $head = $headEnd === false ? $response : substr($response, 0, $headEnd); |
| 170 | return preg_match('/^Connection:\s*close\s*$/im', $head) === 1; |
| 171 | } |
| 172 | } |