friendship ended with social-app. php is my new best friend
1<?php
2
3namespace React\Http\Io;
4
5use Evenement\EventEmitter;
6use Psr\Http\Message\ResponseInterface;
7use Psr\Http\Message\ServerRequestInterface;
8use React\EventLoop\LoopInterface;
9use React\Http\Message\Response;
10use React\Http\Message\ServerRequest;
11use React\Promise;
12use React\Promise\PromiseInterface;
13use React\Socket\ConnectionInterface;
14use React\Socket\ServerInterface;
15use React\Stream\ReadableStreamInterface;
16use React\Stream\WritableStreamInterface;
17
18/**
19 * The internal `StreamingServer` class is responsible for handling incoming connections and then
20 * processing each incoming HTTP request.
21 *
22 * Unlike the [`HttpServer`](#httpserver) class, it does not buffer and parse the incoming
23 * HTTP request body by default. This means that the request handler will be
24 * invoked with a streaming request body. Once the request headers have been
25 * received, it will invoke the request handler function. This request handler
26 * function needs to be passed to the constructor and will be invoked with the
27 * respective [request](#request) object and expects a [response](#response)
28 * object in return:
29 *
30 * ```php
31 * $server = new StreamingServer($loop, function (ServerRequestInterface $request) {
32 * return new Response(
33 * Response::STATUS_OK,
34 * array(
35 * 'Content-Type' => 'text/plain'
36 * ),
37 * "Hello World!\n"
38 * );
39 * });
40 * ```
41 *
42 * Each incoming HTTP request message is always represented by the
43 * [PSR-7 `ServerRequestInterface`](https://www.php-fig.org/psr/psr-7/#321-psrhttpmessageserverrequestinterface),
44 * see also following [request](#request) chapter for more details.
45 * Each outgoing HTTP response message is always represented by the
46 * [PSR-7 `ResponseInterface`](https://www.php-fig.org/psr/psr-7/#33-psrhttpmessageresponseinterface),
47 * see also following [response](#response) chapter for more details.
48 *
49 * In order to process any connections, the server needs to be attached to an
50 * instance of `React\Socket\ServerInterface` through the [`listen()`](#listen) method
51 * as described in the following chapter. In its most simple form, you can attach
52 * this to a [`React\Socket\SocketServer`](https://github.com/reactphp/socket#socketserver)
53 * in order to start a plaintext HTTP server like this:
54 *
55 * ```php
56 * $server = new StreamingServer($loop, $handler);
57 *
58 * $socket = new React\Socket\SocketServer('0.0.0.0:8080', array(), $loop);
59 * $server->listen($socket);
60 * ```
61 *
62 * See also the [`listen()`](#listen) method and the [first example](examples) for more details.
63 *
64 * The `StreamingServer` class is considered advanced usage and unless you know
65 * what you're doing, you're recommended to use the [`HttpServer`](#httpserver) class
66 * instead. The `StreamingServer` class is specifically designed to help with
67 * more advanced use cases where you want to have full control over consuming
68 * the incoming HTTP request body and concurrency settings.
69 *
70 * In particular, this class does not buffer and parse the incoming HTTP request
71 * in memory. It will invoke the request handler function once the HTTP request
72 * headers have been received, i.e. before receiving the potentially much larger
73 * HTTP request body. This means the [request](#request) passed to your request
74 * handler function may not be fully compatible with PSR-7. See also
75 * [streaming request](#streaming-request) below for more details.
76 *
77 * @see \React\Http\HttpServer
78 * @see \React\Http\Message\Response
79 * @see self::listen()
80 * @internal
81 */
final class StreamingServer extends EventEmitter
{
    /** @var callable request handler given to the constructor, invoked once per parsed request */
    private $callback;

    /** @var RequestHeaderParser emits "headers" and "error" events for attached connections */
    private $parser;

    /** @var Clock */
    private $clock;

    /**
     * Creates an HTTP server that invokes the given callback for each incoming HTTP request
     *
     * The constructor only wires up the request header parser. No connection
     * is processed until a socket server is attached through `listen()`.
     *
     * @param LoopInterface $loop           used to create the internal clock
     * @param callable      $requestHandler receives the request object for every parsed request
     * @throws \InvalidArgumentException if the given request handler is not callable
     * @see self::listen()
     */
    public function __construct(LoopInterface $loop, $requestHandler)
    {
        if (!\is_callable($requestHandler)) {
            throw new \InvalidArgumentException('Invalid request handler given');
        }

        $clock = new Clock($loop);
        $parser = new RequestHeaderParser($clock);

        $this->callback = $requestHandler;
        $this->clock = $clock;
        $this->parser = $parser;

        // the closures below reach this instance through an explicit variable
        $server = $this;

        $parser->on('headers', function (ServerRequestInterface $request, ConnectionInterface $conn) use ($server) {
            $server->handleRequest($conn, $request);
        });

        $parser->on('error', function (\Exception $e, ConnectionInterface $conn) use ($server) {
            $server->emit('error', array($e));

            // a non-zero exception code is used as the HTTP status, anything else is a bad request
            $status = $e->getCode();
            if ($status === 0) {
                $status = Response::STATUS_BAD_REQUEST;
            }

            // parsing failed, so there is no real request: answer on behalf of a dummy one
            $server->writeError($conn, $status, new ServerRequest('GET', '/'));
        });
    }

    /**
     * Starts listening for HTTP requests on the given socket server instance
     *
     * Every "connection" event of the socket is forwarded to the request
     * header parser.
     *
     * @param ServerInterface $socket
     * @see \React\Http\HttpServer::listen()
     */
    public function listen(ServerInterface $socket)
    {
        $onConnection = array($this->parser, 'handle');
        $socket->on('connection', $onConnection);
    }

    /**
     * Invokes the request handler for a parsed request and sends whatever it produces
     *
     * The handler may return a response, a promise or anything else. Thrown
     * exceptions and unexpected results are reported through an "error" event
     * and answered with a 500 response.
     *
     * @internal
     * @param ConnectionInterface    $conn
     * @param ServerRequestInterface $request
     */
    public function handleRequest(ConnectionInterface $conn, ServerRequestInterface $request)
    {
        // send the interim response before the handler runs (never for HTTP/1.0 requests)
        if ($request->getProtocolVersion() !== '1.0') {
            if (\strtolower($request->getHeaderLine('Expect')) === '100-continue') {
                $conn->write("HTTP/1.1 100 Continue\r\n\r\n");
            }
        }

        // run the request handler, turning anything it throws into a rejected promise
        $handler = $this->callback;
        try {
            $result = $handler($request);
        } catch (\Exception $thrown) {
            // request handler callback throws an Exception
            $result = Promise\reject($thrown);
        } catch (\Throwable $thrown) { // @codeCoverageIgnoreStart
            // request handler callback throws a PHP7+ Error
            $result = Promise\reject($thrown); // @codeCoverageIgnoreEnd
        }

        // a cancellable promise gets cancelled when the connection closes first;
        // $detachCanceler removes that listener again once the promise has settled
        $detachCanceler = function () {};
        if ($result instanceof PromiseInterface && \method_exists($result, 'cancel')) {
            $cancelOnClose = function () use ($result) {
                $result->cancel();
            };
            $conn->on('close', $cancelOnClose);

            $detachCanceler = function () use ($conn, $cancelOnClose) {
                $conn->removeListener('close', $cancelOnClose);
            };
        }

        // happy path: the handler returned a response right away
        if ($result instanceof ResponseInterface) {
            return $this->handleResponse($conn, $request, $result);
        }

        // anything that is not a promise is wrapped so it gets rejected as an invalid value below
        $promise = $result instanceof PromiseInterface ? $result : Promise\resolve($result);

        $server = $this;
        $promise->then(
            function ($value) use ($server, $conn, $request) {
                if ($value instanceof ResponseInterface) {
                    $server->handleResponse($conn, $request, $value);
                    return;
                }

                $type = \is_object($value) ? \get_class($value) : \gettype($value);
                $exception = new \RuntimeException(\sprintf(
                    'The response callback is expected to resolve with an object implementing Psr\Http\Message\ResponseInterface, but resolved with "%s" instead.',
                    $type
                ));

                $server->emit('error', array($exception));
                return $server->writeError($conn, Response::STATUS_INTERNAL_SERVER_ERROR, $request);
            },
            function ($reason) use ($server, $conn, $request) {
                $type = \is_object($reason) ? \get_class($reason) : \gettype($reason);
                $text = \sprintf(
                    'The response callback is expected to resolve with an object implementing Psr\Http\Message\ResponseInterface, but rejected with "%s" instead.',
                    $type
                );

                // only real exceptions/errors can be chained as the previous exception
                $previous = ($reason instanceof \Throwable || $reason instanceof \Exception) ? $reason : null;
                $exception = new \RuntimeException($text, 0, $previous);

                $server->emit('error', array($exception));
                return $server->writeError($conn, Response::STATUS_INTERNAL_SERVER_ERROR, $request);
            }
        )->then($detachCanceler, $detachCanceler);
    }

    /**
     * Sends a plain text error response with the given status code and closes the connection afterwards
     *
     * The body reads "Error <code>", followed by ": <reason phrase>" when the
     * response knows a reason phrase for the status code.
     *
     * @internal
     * @param ConnectionInterface    $conn
     * @param int                    $code    HTTP status code of the error response
     * @param ServerRequestInterface $request request this error response answers
     */
    public function writeError(ConnectionInterface $conn, $code, ServerRequestInterface $request)
    {
        $headers = array(
            'Content-Type' => 'text/plain',
            'Connection' => 'close' // an error always terminates the connection
        );
        $response = new Response($code, $headers, 'Error ' . $code);

        // the reason phrase is only known once the response exists, so append it to the body afterwards
        $phrase = $response->getReasonPhrase();
        if ($phrase !== '') {
            $stream = $response->getBody();
            $stream->seek(0, \SEEK_END);
            $stream->write(': ' . $phrase);
        }

        $this->handleResponse($conn, $request, $response);
    }


    /**
     * Completes the headers of the given response and writes it to the connection
     *
     * Assigns protocol version, "Server", "Date", "Content-Length",
     * "Transfer-Encoding" and "Connection" automatically, validates the
     * resulting header block and then writes either a buffered or a streaming
     * body. Afterwards the connection is either handed back to the parser for
     * the next request or ended.
     *
     * @internal
     * @param ConnectionInterface    $connection
     * @param ServerRequestInterface $request
     * @param ResponseInterface      $response
     */
    public function handleResponse(ConnectionInterface $connection, ServerRequestInterface $request, ResponseInterface $response)
    {
        // connection already gone: nothing can be sent, only release the response body
        $body = $response->getBody();
        if (!$connection->isWritable()) {
            $body->close();
            return;
        }

        $status = $response->getStatusCode();
        $method = $request->getMethod();

        // the response always uses the HTTP protocol version of the request
        $version = $request->getProtocolVersion();
        $response = $response->withProtocolVersion($version);

        $isInformational = $status >= 100 && $status < 200;
        $isTunnel = $method === 'CONNECT' && $status >= 200 && $status < 300;

        // "Server": add the default when missing, drop the header when explicitly set to an empty value
        if ($response->hasHeader('Server')) {
            if ($response->getHeaderLine('Server') === '') {
                $response = $response->withoutHeader('Server');
            }
        } else {
            $response = $response->withHeader('Server', 'ReactPHP/1');
        }

        // "Date": same rules as "Server", the default is the current time
        if ($response->hasHeader('Date')) {
            if ($response->getHeaderLine('Date') === '') {
                $response = $response->withoutHeader('Date');
            }
        } else {
            // IMF-fixdate = day-name "," SP date1 SP time-of-day SP GMT
            $now = (int) $this->clock->now();
            $response = $response->withHeader('Date', \gmdate('D, d M Y H:i:s', $now) . ' GMT');
        }

        // "Content-Length": derive from the body size where possible, otherwise fall back to chunked encoding
        $chunked = false;
        if ($isTunnel || $isInformational || $status === Response::STATUS_NO_CONTENT) {
            // 2xx response to CONNECT and 1xx and 204 MUST NOT include Content-Length or Transfer-Encoding header
            $response = $response->withoutHeader('Content-Length');
        } elseif ($method === 'HEAD' && $response->hasHeader('Content-Length')) {
            // HEAD request: keep the explicit Content-Length untouched
        } elseif ($status === Response::STATUS_NOT_MODIFIED && ($response->hasHeader('Content-Length') || $body->getSize() === 0)) {
            // 304 Not Modified: keep an explicit Content-Length, and keep it absent if the body is empty
        } elseif ($body->getSize() !== null) {
            // body of known size: announce exactly that size
            $response = $response->withHeader('Content-Length', (string) $body->getSize());
        } elseif (!$response->hasHeader('Content-Length') && $version === '1.1') {
            // unknown size and no explicit length: HTTP/1.1 responses use chunked transfer-encoding
            $chunked = true;
        }

        // "Transfer-Encoding" is only ever present when chunked encoding was selected above
        $response = $chunked
            ? $response->withHeader('Transfer-Encoding', 'chunked')
            : $response->withoutHeader('Transfer-Encoding');

        // "Connection": decide whether this connection is reused for another request
        $persist = false;
        if ($status === Response::STATUS_SWITCHING_PROTOCOLS) {
            // 101 (Switching Protocols) response uses Connection: upgrade header
            // The stream speaks another protocol from now on, so it is never
            // reused for additional HTTP requests.
            $response = $response->withHeader('Connection', 'upgrade');
        } else {
            $requested = \strtolower($request->getHeaderLine('Connection'));

            if ($requested === 'close' || \strtolower($response->getHeaderLine('Connection')) === 'close') {
                // an explicit "Connection: close" on either the request or the response wins
                $response = $response->withHeader('Connection', 'close');
            } elseif ($version === '1.1') {
                // HTTP/1.1 connections are persistent by default, no need to inform the client
                $persist = true;
            } elseif ($requested === 'keep-alive') {
                // explicit "Connection: keep-alive" request header: persist and inform the client
                $persist = true;
                $response = $response->withHeader('Connection', 'keep-alive');
            } else {
                // no persistence: make sure no Connection header set by the handler is sent
                $response = $response->withoutHeader('Connection');
            }
        }

        // 101 (Switching Protocols) response (for Upgrade request) forwards upgraded data through duplex stream
        // 2xx (Successful) response to CONNECT forwards tunneled application data through duplex stream
        if (($status === Response::STATUS_SWITCHING_PROTOCOLS || $isTunnel) && $body instanceof HttpBodyStream && $body->input instanceof WritableStreamInterface) {
            $forward = function () use ($connection, $body) {
                if ($body->input->isWritable()) {
                    $connection->pipe($body->input);
                    $connection->resume();
                }
            };

            if ($request->getBody()->isReadable()) {
                // request is still streaming => start forwarding only once the request body closes
                $request->getBody()->on('close', $forward);
            } else {
                // request already closed => forward following data from the connection right away
                $forward();
            }
        }

        // serialize status line and header fields while counting how many field lines were written
        $fieldCount = 0;
        $head = 'HTTP/' . $version . ' ' . $status . ' ' . $response->getReasonPhrase() . "\r\n";
        foreach ($response->getHeaders() as $name => $values) {
            if (\strpos($name, ':') !== false) {
                // a colon inside a field name can never be valid, force the check below to fail
                $fieldCount = -1;
                break;
            }

            foreach ($values as $value) {
                $head .= $name . ': ' . $value . "\r\n";
                ++$fieldCount;
            }
        }

        // reject the response if the status code is out of range, if the number of line
        // breaks does not match (status line plus one per field) or if not every field
        // line matches the header pattern
        $invalid = $status < 100 || $status > 999 || \substr_count($head, "\n") !== ($fieldCount + 1);
        if (!$invalid) {
            /** @var array $m legacy PHP 5.3 only */
            if (\PHP_VERSION_ID >= 50400) {
                $matched = \preg_match_all(AbstractMessage::REGEX_HEADERS, $head);
            } else {
                $matched = \preg_match_all(AbstractMessage::REGEX_HEADERS, $head, $m);
            }
            $invalid = $matched !== $fieldCount;
        }

        if ($invalid) {
            $this->emit('error', array(new \InvalidArgumentException('Unable to send response with invalid response headers')));
            $this->writeError($connection, Response::STATUS_INTERNAL_SERVER_ERROR, $request);
            return;
        }

        // response to HEAD and 1xx, 204 and 304 responses MUST NOT include a body
        // status 101 (Switching Protocols) is excluded because of the Upgrade handling above
        if ($method === 'HEAD' || ($isInformational && $status !== Response::STATUS_SWITCHING_PROTOCOLS) || $status === Response::STATUS_NO_CONTENT || $status === Response::STATUS_NOT_MODIFIED) {
            $body->close();
            $body = '';
        }

        $isStreaming = $body instanceof ReadableStreamInterface && $body->isReadable();
        if (!$isStreaming) {
            // a streaming body that is already closed still needs its final chunk when using `Transfer-Encoding: chunked`
            if ($body instanceof ReadableStreamInterface && $chunked) {
                $body = "0\r\n\r\n";
            }

            // headers and complete body go out in a single write
            $connection->write($head . "\r\n" . $body);

            // either wait for the next request over the persistent connection or end the connection
            if ($persist) {
                $this->parser->handle($connection);
            } else {
                $connection->end();
            }
            return;
        }

        // streaming body: send the headers now, the body follows as it becomes available
        $connection->write($head . "\r\n");

        if ($chunked) {
            $body = new ChunkedEncoder($body);
        }

        // Close response stream once connection closes.
        // Note that this TCP/IP close detection may take some time,
        // in particular this may only fire on a later read/write attempt.
        $closeBody = array($body, 'close');
        $connection->on('close', $closeBody);

        if (!$persist) {
            // the connection ends together with the body
            $body->pipe($connection);
            return;
        }

        // keep the connection open after the body ended and wait for the next request
        $body->pipe($connection, array('end' => false));
        $parser = $this->parser;
        $body->on('end', function () use ($connection, $parser, $closeBody) {
            $connection->removeListener('close', $closeBody);
            $parser->handle($connection);
        });
    }
}
405}