friendship ended with social-app. php is my new best friend
1<?php
2
3namespace React\Http\Io;
4
5use Evenement\EventEmitter;
6use Psr\Http\Message\MessageInterface;
7use Psr\Http\Message\RequestInterface;
8use React\Http\Message\Response;
9use React\Socket\ConnectionInterface;
10use React\Stream\WritableStreamInterface;
11
12/**
13 * @event response
14 * @event drain
15 * @event error
16 * @event close
17 * @internal
18 */
19class ClientRequestStream extends EventEmitter implements WritableStreamInterface
20{
21 const STATE_INIT = 0;
22 const STATE_WRITING_HEAD = 1;
23 const STATE_HEAD_WRITTEN = 2;
24 const STATE_END = 3;
25
26 /** @var ClientConnectionManager */
27 private $connectionManager;
28
29 /** @var RequestInterface */
30 private $request;
31
32 /** @var ?ConnectionInterface */
33 private $connection;
34
35 /** @var string */
36 private $buffer = '';
37
38 private $responseFactory;
39 private $state = self::STATE_INIT;
40 private $ended = false;
41
42 private $pendingWrites = '';
43
44 public function __construct(ClientConnectionManager $connectionManager, RequestInterface $request)
45 {
46 $this->connectionManager = $connectionManager;
47 $this->request = $request;
48 }
49
50 public function isWritable()
51 {
52 return self::STATE_END > $this->state && !$this->ended;
53 }
54
55 private function writeHead()
56 {
57 $this->state = self::STATE_WRITING_HEAD;
58
59 $expected = 0;
60 $headers = "{$this->request->getMethod()} {$this->request->getRequestTarget()} HTTP/{$this->request->getProtocolVersion()}\r\n";
61 foreach ($this->request->getHeaders() as $name => $values) {
62 if (\strpos($name, ':') !== false) {
63 $expected = -1;
64 break;
65 }
66 foreach ($values as $value) {
67 $headers .= "$name: $value\r\n";
68 ++$expected;
69 }
70 }
71
72 /** @var array $m legacy PHP 5.3 only */
73 if (!\preg_match('#^\S+ \S+ HTTP/1\.[01]\r\n#m', $headers) || \substr_count($headers, "\n") !== ($expected + 1) || (\PHP_VERSION_ID >= 50400 ? \preg_match_all(AbstractMessage::REGEX_HEADERS, $headers) : \preg_match_all(AbstractMessage::REGEX_HEADERS, $headers, $m)) !== $expected) {
74 $this->closeError(new \InvalidArgumentException('Unable to send request with invalid request headers'));
75 return;
76 }
77
78 $connectionRef = &$this->connection;
79 $stateRef = &$this->state;
80 $pendingWrites = &$this->pendingWrites;
81 $that = $this;
82
83 $promise = $this->connectionManager->connect($this->request->getUri());
84 $promise->then(
85 function (ConnectionInterface $connection) use ($headers, &$connectionRef, &$stateRef, &$pendingWrites, $that) {
86 $connectionRef = $connection;
87 assert($connectionRef instanceof ConnectionInterface);
88
89 $connection->on('drain', array($that, 'handleDrain'));
90 $connection->on('data', array($that, 'handleData'));
91 $connection->on('end', array($that, 'handleEnd'));
92 $connection->on('error', array($that, 'handleError'));
93 $connection->on('close', array($that, 'close'));
94
95 $more = $connection->write($headers . "\r\n" . $pendingWrites);
96
97 assert($stateRef === ClientRequestStream::STATE_WRITING_HEAD);
98 $stateRef = ClientRequestStream::STATE_HEAD_WRITTEN;
99
100 // clear pending writes if non-empty
101 if ($pendingWrites !== '') {
102 $pendingWrites = '';
103
104 if ($more) {
105 $that->emit('drain');
106 }
107 }
108 },
109 array($this, 'closeError')
110 );
111
112 $this->on('close', function() use ($promise) {
113 $promise->cancel();
114 });
115 }
116
117 public function write($data)
118 {
119 if (!$this->isWritable()) {
120 return false;
121 }
122
123 // write directly to connection stream if already available
124 if (self::STATE_HEAD_WRITTEN <= $this->state) {
125 return $this->connection->write($data);
126 }
127
128 // otherwise buffer and try to establish connection
129 $this->pendingWrites .= $data;
130 if (self::STATE_WRITING_HEAD > $this->state) {
131 $this->writeHead();
132 }
133
134 return false;
135 }
136
137 public function end($data = null)
138 {
139 if (!$this->isWritable()) {
140 return;
141 }
142
143 if (null !== $data) {
144 $this->write($data);
145 } else if (self::STATE_WRITING_HEAD > $this->state) {
146 $this->writeHead();
147 }
148
149 $this->ended = true;
150 }
151
152 /** @internal */
153 public function handleDrain()
154 {
155 $this->emit('drain');
156 }
157
158 /** @internal */
159 public function handleData($data)
160 {
161 $this->buffer .= $data;
162
163 // buffer until double CRLF (or double LF for compatibility with legacy servers)
164 $eom = \strpos($this->buffer, "\r\n\r\n");
165 $eomLegacy = \strpos($this->buffer, "\n\n");
166 if ($eom !== false || $eomLegacy !== false) {
167 try {
168 if ($eom !== false && ($eomLegacy === false || $eom < $eomLegacy)) {
169 $response = Response::parseMessage(\substr($this->buffer, 0, $eom + 2));
170 $bodyChunk = (string) \substr($this->buffer, $eom + 4);
171 } else {
172 $response = Response::parseMessage(\substr($this->buffer, 0, $eomLegacy + 1));
173 $bodyChunk = (string) \substr($this->buffer, $eomLegacy + 2);
174 }
175 } catch (\InvalidArgumentException $exception) {
176 $this->closeError($exception);
177 return;
178 }
179
180 // response headers successfully received => remove listeners for connection events
181 $connection = $this->connection;
182 assert($connection instanceof ConnectionInterface);
183 $connection->removeListener('drain', array($this, 'handleDrain'));
184 $connection->removeListener('data', array($this, 'handleData'));
185 $connection->removeListener('end', array($this, 'handleEnd'));
186 $connection->removeListener('error', array($this, 'handleError'));
187 $connection->removeListener('close', array($this, 'close'));
188 $this->connection = null;
189 $this->buffer = '';
190
191 // take control over connection handling and check if we can reuse the connection once response body closes
192 $that = $this;
193 $request = $this->request;
194 $connectionManager = $this->connectionManager;
195 $successfulEndReceived = false;
196 $input = $body = new CloseProtectionStream($connection);
197 $input->on('close', function () use ($connection, $that, $connectionManager, $request, $response, &$successfulEndReceived) {
198 // only reuse connection after successful response and both request and response allow keep alive
199 if ($successfulEndReceived && $connection->isReadable() && $that->hasMessageKeepAliveEnabled($response) && $that->hasMessageKeepAliveEnabled($request)) {
200 $connectionManager->keepAlive($request->getUri(), $connection);
201 } else {
202 $connection->close();
203 }
204
205 $that->close();
206 });
207
208 // determine length of response body
209 $length = null;
210 $code = $response->getStatusCode();
211 if ($this->request->getMethod() === 'HEAD' || ($code >= 100 && $code < 200) || $code == Response::STATUS_NO_CONTENT || $code == Response::STATUS_NOT_MODIFIED) {
212 $length = 0;
213 } elseif (\strtolower($response->getHeaderLine('Transfer-Encoding')) === 'chunked') {
214 $body = new ChunkedDecoder($body);
215 } elseif ($response->hasHeader('Content-Length')) {
216 $length = (int) $response->getHeaderLine('Content-Length');
217 }
218 $response = $response->withBody($body = new ReadableBodyStream($body, $length));
219 $body->on('end', function () use (&$successfulEndReceived) {
220 $successfulEndReceived = true;
221 });
222
223 // emit response with streaming response body (see `Sender`)
224 $this->emit('response', array($response, $body));
225
226 // re-emit HTTP response body to trigger body parsing if parts of it are buffered
227 if ($bodyChunk !== '') {
228 $input->handleData($bodyChunk);
229 } elseif ($length === 0) {
230 $input->handleEnd();
231 }
232 }
233 }
234
235 /** @internal */
236 public function handleEnd()
237 {
238 $this->closeError(new \RuntimeException(
239 "Connection ended before receiving response"
240 ));
241 }
242
243 /** @internal */
244 public function handleError(\Exception $error)
245 {
246 $this->closeError(new \RuntimeException(
247 "An error occurred in the underlying stream",
248 0,
249 $error
250 ));
251 }
252
253 /** @internal */
254 public function closeError(\Exception $error)
255 {
256 if (self::STATE_END <= $this->state) {
257 return;
258 }
259 $this->emit('error', array($error));
260 $this->close();
261 }
262
263 public function close()
264 {
265 if (self::STATE_END <= $this->state) {
266 return;
267 }
268
269 $this->state = self::STATE_END;
270 $this->pendingWrites = '';
271 $this->buffer = '';
272
273 if ($this->connection instanceof ConnectionInterface) {
274 $this->connection->close();
275 $this->connection = null;
276 }
277
278 $this->emit('close');
279 $this->removeAllListeners();
280 }
281
282 /**
283 * @internal
284 * @return bool
285 * @link https://www.rfc-editor.org/rfc/rfc9112#section-9.3
286 * @link https://www.rfc-editor.org/rfc/rfc7230#section-6.1
287 */
288 public function hasMessageKeepAliveEnabled(MessageInterface $message)
289 {
290 // @link https://www.rfc-editor.org/rfc/rfc9110#section-7.6.1
291 $connectionOptions = \array_map('trim', \explode(',', \strtolower($message->getHeaderLine('Connection'))));
292
293 if (\in_array('close', $connectionOptions, true)) {
294 return false;
295 }
296
297 if ($message->getProtocolVersion() === '1.1') {
298 return true;
299 }
300
301 if (\in_array('keep-alive', $connectionOptions, true)) {
302 return true;
303 }
304
305 return false;
306 }
307}