Code Coverage
 
Lines
Functions and Methods
Classes and Traits
Total
64.00% covered (warning)
64.00%
32 / 50
20.00% covered (danger)
20.00%
1 / 5
CRAP
0.00% covered (danger)
0.00%
0 / 1
AsyncAmpTransport
64.00% covered (warning)
64.00%
32 / 50
20.00% covered (danger)
20.00%
1 / 5
23.14
0.00% covered (danger)
0.00%
0 / 1
 __construct
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
1
 request
25.00% covered (danger)
25.00%
4 / 16
0.00% covered (danger)
0.00%
0 / 1
10.75
 openSession
92.86% covered (success)
92.86%
13 / 14
0.00% covered (danger)
0.00%
0 / 1
2.00
 acquireSocket
87.50% covered (warning)
87.50%
14 / 16
0.00% covered (danger)
0.00%
0 / 1
5.05
 serverWantsClose
0.00% covered (danger)
0.00%
0 / 3
0.00% covered (danger)
0.00%
0 / 1
6
1<?php
2
3/**
4 * SPDX-License-Identifier: EUPL-1.2
5 *
6 * This file is part of icap-flow.
7 *
8 * Licensed under the EUPL, Version 1.2 only (the "Licence");
9 * you may not use this work except in compliance with the Licence.
10 * You may obtain a copy of the Licence at:
11 *
12 *     https://joinup.ec.europa.eu/collection/eupl/eupl-text-eupl-12
13 *
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the Licence is distributed on an "AS IS" basis,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 */
18
19declare(strict_types=1);
20
21namespace Ndrstmr\Icap\Transport;
22
23use Amp\Cancellation;
24use Amp\CompositeCancellation;
25use Amp\Socket;
26use Amp\Socket\ConnectContext;
27use Amp\Socket\Socket as SocketInterface;
28use Amp\TimeoutCancellation;
29use Ndrstmr\Icap\Config;
30use Ndrstmr\Icap\Exception\IcapConnectionException;
31
32use function Amp\async;
33
34/**
35 * Asynchronous transport implementation using amphp/socket.
36 *
37 * Upgrades to TLS (icaps://) automatically when the supplied Config
38 * carries a {@see \Amp\Socket\ClientTlsContext}; otherwise connects
39 * plain tcp://. The response is bounded by Config::maxResponseSize to
40 * keep a hostile server from exhausting the client's memory.
41 *
42 * The optional user-supplied {@see Cancellation} is combined with the
43 * transport's internal {@see TimeoutCancellation} via a
44 * {@see CompositeCancellation}; whichever fires first aborts the
45 * read/write loop with `Amp\CancelledException`.
46 *
47 * Response framing is done by {@see ResponseFrameReader} so the read
48 * loop terminates as soon as the message is complete — no dependency
49 * on the server closing the socket.
50 *
51 * **Connection pooling (v2.1+).** When constructed with a
52 * {@see ConnectionPoolInterface}, the transport calls
53 * `acquire()` instead of opening a fresh TCP/TLS connection and
54 * `release()` instead of closing the socket. The socket is closed
55 * (rather than returned) when:
56 *   - the framing reader threw — the socket might be in an
57 *     inconsistent state and we play safe;
58 *   - the server's response carries `Connection: close` (RFC 7230
59 *     §6.1) — we honour the close intent.
60 * Without a pool the transport opens a fresh socket per request and
61 * closes it after — identical to the v2.0 behaviour.
62 *
63 * **Sessions (v2.1+).** Implements {@see SessionAwareTransport} so
64 * callers that need to send + receive multiple times on the same
65 * connection (strict RFC 3507 §4.5 preview-continue, ICAP request
66 * pipelining, …) can call {@see openSession()}.
67 */
68final class AsyncAmpTransport implements SessionAwareTransport
69{
70    public function __construct(
71        private ?ConnectionPoolInterface $pool = null,
72    ) {
73    }
74
75    /**
76     * @param iterable<string> $rawRequest
77     * @return \Amp\Future<string>
78     */
79    #[\Override]
80    public function request(Config $config, iterable $rawRequest, ?Cancellation $cancellation = null): \Amp\Future
81    {
82        /** @var \Amp\Future<string> $future */
83        $future = async(function () use ($config, $rawRequest, $cancellation): string {
84            $session = $this->openSession($config, $cancellation);
85            $closeForced = false;
86            try {
87                $session->write($rawRequest);
88                $response = $session->readResponse();
89                if ($this->serverWantsClose($response)) {
90                    $closeForced = true;
91                }
92                return $response;
93            } catch (\Throwable $e) {
94                $closeForced = true;
95                throw $e;
96            } finally {
97                if ($closeForced) {
98                    $session->close();
99                } else {
100                    $session->release();
101                }
102            }
103        });
104
105        return $future;
106    }
107
108    #[\Override]
109    public function openSession(Config $config, ?Cancellation $cancellation = null): TransportSession
110    {
111        // Use a one-shot timeout for socket acquisition only.
112        $acquireCancellation = new TimeoutCancellation($config->getStreamTimeout());
113        $effectiveAcquire = $cancellation === null
114            ? $acquireCancellation
115            : new CompositeCancellation($cancellation, $acquireCancellation);
116
117        $socket = $this->acquireSocket($config, $effectiveAcquire);
118
119        // The session creates a fresh TimeoutCancellation per IO call
120        // (write / readResponse), so each individual operation gets
121        // the full streamTimeout window — no session-lifetime timer.
122        return new AmpTransportSession(
123            config: $config,
124            socket: $socket,
125            streamTimeout: $config->getStreamTimeout(),
126            userCancellation: $cancellation,
127            maxResponseSize: $config->getMaxResponseSize(),
128            maxHeaderLineLength: $config->getMaxHeaderLineLength(),
129            pool: $this->pool,
130        );
131    }
132
133    private function acquireSocket(Config $config, Cancellation $cancellation): SocketInterface
134    {
135        if ($this->pool !== null) {
136            return $this->pool->acquire($config, $cancellation);
137        }
138
139        $tls = $config->getTlsContext();
140        $url = sprintf('tcp://%s:%d', $config->host, $config->port);
141        $context = (new ConnectContext())->withConnectTimeout($config->getSocketTimeout());
142        if ($tls !== null) {
143            $context = $context->withTlsContext($tls);
144        }
145
146        try {
147            if ($tls !== null) {
148                return Socket\connectTls($url, $context, $cancellation);
149            }
150            return Socket\connect($url, $context, $cancellation);
151        } catch (Socket\ConnectException $e) {
152            throw new IcapConnectionException(
153                sprintf('Async connection to %s:%d failed.', $config->host, $config->port),
154                0,
155                $e,
156            );
157        }
158    }
159
160    /**
161     * Cheap heuristic for `Connection: close` in the ICAP head. We
162     * don't fully reparse here — the response parser does that — but
163     * we want to keep the socket-disposal decision local to the
164     * transport.
165     */
166    private function serverWantsClose(string $response): bool
167    {
168        $headEnd = strpos($response, "\r\n\r\n");
169        $head = $headEnd === false ? $response : substr($response, 0, $headEnd);
170        return preg_match('/^Connection:\s*close\s*$/im', $head) === 1;
171    }
172}