friendship ended with social-app. php is my new best friend
<?php

namespace React\Http\Io;

use Evenement\EventEmitter;
use Psr\Http\Message\ResponseInterface;
use Psr\Http\Message\ServerRequestInterface;
use React\EventLoop\LoopInterface;
use React\Http\Message\Response;
use React\Http\Message\ServerRequest;
use React\Promise;
use React\Promise\PromiseInterface;
use React\Socket\ConnectionInterface;
use React\Socket\ServerInterface;
use React\Stream\ReadableStreamInterface;
use React\Stream\WritableStreamInterface;

/**
 * The internal `StreamingServer` class is responsible for handling incoming connections and then
 * processing each incoming HTTP request.
 *
 * Unlike the [`HttpServer`](#httpserver) class, it does not buffer and parse the incoming
 * HTTP request body by default. This means that the request handler will be
 * invoked with a streaming request body. Once the request headers have been
 * received, it will invoke the request handler function. This request handler
 * function needs to be passed to the constructor and will be invoked with the
 * respective [request](#request) object and expects a [response](#response)
 * object in return:
 *
 * ```php
 * $server = new StreamingServer($loop, function (ServerRequestInterface $request) {
 *     return new Response(
 *         Response::STATUS_OK,
 *         array(
 *             'Content-Type' => 'text/plain'
 *         ),
 *         "Hello World!\n"
 *     );
 * });
 * ```
 *
 * Each incoming HTTP request message is always represented by the
 * [PSR-7 `ServerRequestInterface`](https://www.php-fig.org/psr/psr-7/#321-psrhttpmessageserverrequestinterface),
 * see also following [request](#request) chapter for more details.
 * Each outgoing HTTP response message is always represented by the
 * [PSR-7 `ResponseInterface`](https://www.php-fig.org/psr/psr-7/#33-psrhttpmessageresponseinterface),
 * see also following [response](#response) chapter for more details.
 *
 * In order to process any connections, the server needs to be attached to an
 * instance of `React\Socket\ServerInterface` through the [`listen()`](#listen) method
 * as described in the following chapter. In its most simple form, you can attach
 * this to a [`React\Socket\SocketServer`](https://github.com/reactphp/socket#socketserver)
 * in order to start a plaintext HTTP server like this:
 *
 * ```php
 * $server = new StreamingServer($loop, $handler);
 *
 * $socket = new React\Socket\SocketServer('0.0.0.0:8080', array(), $loop);
 * $server->listen($socket);
 * ```
 *
 * See also the [`listen()`](#listen) method and the [first example](examples) for more details.
 *
 * The `StreamingServer` class is considered advanced usage and unless you know
 * what you're doing, you're recommended to use the [`HttpServer`](#httpserver) class
 * instead. The `StreamingServer` class is specifically designed to help with
 * more advanced use cases where you want to have full control over consuming
 * the incoming HTTP request body and concurrency settings.
 *
 * In particular, this class does not buffer and parse the incoming HTTP request
 * in memory. It will invoke the request handler function once the HTTP request
 * headers have been received, i.e. before receiving the potentially much larger
 * HTTP request body. This means the [request](#request) passed to your request
 * handler function may not be fully compatible with PSR-7. See also
 * [streaming request](#streaming-request) below for more details.
 *
 * @see \React\Http\HttpServer
 * @see \React\Http\Message\Response
 * @see self::listen()
 * @internal
 */
final class StreamingServer extends EventEmitter
{
    /** @var callable request handler invoked once per parsed request */
    private $callback;

    /** @var RequestHeaderParser parses request headers from each connection */
    private $parser;

    /** @var Clock */
    private $clock;

    /**
     * Creates an HTTP server that invokes the given callback for each incoming HTTP request
     *
     * In order to process any connections, the server needs to be attached to an
     * instance of `React\Socket\ServerInterface` which emits underlying streaming
     * connections in order to then parse incoming data as HTTP.
     * See also [listen()](#listen) for more details.
     *
     * @param LoopInterface $loop
     * @param callable $requestHandler
     * @throws \InvalidArgumentException if the given request handler is not callable
     * @see self::listen()
     */
    public function __construct(LoopInterface $loop, $requestHandler)
    {
        if (!\is_callable($requestHandler)) {
            throw new \InvalidArgumentException('Invalid request handler given');
        }

        $this->callback = $requestHandler;
        $this->clock = new Clock($loop);
        $this->parser = new RequestHeaderParser($this->clock);

        // closures may not reference $this on legacy PHP 5.3, so pass an alias instead
        $that = $this;
        $this->parser->on('headers', function (ServerRequestInterface $request, ConnectionInterface $conn) use ($that) {
            $that->handleRequest($conn, $request);
        });

        $this->parser->on('error', function (\Exception $e, ConnectionInterface $conn) use ($that) {
            $that->emit('error', array($e));

            // parsing failed => assume dummy request and send appropriate error
            // (use the exception code as status code if set, 400 otherwise)
            $that->writeError(
                $conn,
                $e->getCode() !== 0 ? $e->getCode() : Response::STATUS_BAD_REQUEST,
                new ServerRequest('GET', '/')
            );
        });
    }

    /**
     * Starts listening for HTTP requests on the given socket server instance
     *
     * Every `connection` event of the given socket is forwarded to the
     * request header parser.
     *
     * @param ServerInterface $socket
     * @return void
     * @see \React\Http\HttpServer::listen()
     */
    public function listen(ServerInterface $socket)
    {
        $socket->on('connection', array($this->parser, 'handle'));
    }

    /**
     * Invokes the request handler for a parsed request and sends its response
     *
     * Writes an interim `100 Continue` response first if the request is not
     * HTTP/1.0 and carries an `Expect: 100-continue` header. Anything thrown
     * by the request handler is turned into a rejected promise. A response
     * returned directly is sent right away. Otherwise the result is treated
     * as a promise: a resolution that is not a response, or any rejection,
     * emits an `error` event and sends a `500` error response.
     *
     * @internal
     * @param ConnectionInterface $conn
     * @param ServerRequestInterface $request
     * @return void
     */
    public function handleRequest(ConnectionInterface $conn, ServerRequestInterface $request)
    {
        if ($request->getProtocolVersion() !== '1.0' && '100-continue' === \strtolower($request->getHeaderLine('Expect'))) {
            $conn->write("HTTP/1.1 100 Continue\r\n\r\n");
        }

        // execute request handler callback
        $callback = $this->callback;
        try {
            $response = $callback($request);
        } catch (\Exception $error) {
            // request handler callback throws an Exception
            $response = Promise\reject($error);
        } catch (\Throwable $error) { // @codeCoverageIgnoreStart
            // request handler callback throws a PHP7+ Error
            $response = Promise\reject($error); // @codeCoverageIgnoreEnd
        }

        // cancel pending promise once connection closes
        // (the handler below is a no-op unless a close listener was registered)
        $connectionOnCloseResponseCancelerHandler = function () {};
        if ($response instanceof PromiseInterface && \method_exists($response, 'cancel')) {
            $connectionOnCloseResponseCanceler = function () use ($response) {
                $response->cancel();
            };
            // remove the close listener again once the promise settles
            $connectionOnCloseResponseCancelerHandler = function () use ($connectionOnCloseResponseCanceler, $conn) {
                if ($connectionOnCloseResponseCanceler !== null) {
                    $conn->removeListener('close', $connectionOnCloseResponseCanceler);
                }
            };
            $conn->on('close', $connectionOnCloseResponseCanceler);
        }

        // happy path: response returned, handle and return immediately
        if ($response instanceof ResponseInterface) {
            return $this->handleResponse($conn, $request, $response);
        }

        // did not return a promise? this is an error, convert into one for rejection below.
        if (!$response instanceof PromiseInterface) {
            $response = Promise\resolve($response);
        }

        $that = $this;
        $response->then(
            function ($response) use ($that, $conn, $request) {
                if (!$response instanceof ResponseInterface) {
                    $message = 'The response callback is expected to resolve with an object implementing Psr\Http\Message\ResponseInterface, but resolved with "%s" instead.';
                    $message = \sprintf($message, \is_object($response) ? \get_class($response) : \gettype($response));
                    $exception = new \RuntimeException($message);

                    $that->emit('error', array($exception));
                    return $that->writeError($conn, Response::STATUS_INTERNAL_SERVER_ERROR, $request);
                }
                $that->handleResponse($conn, $request, $response);
            },
            function ($error) use ($that, $conn, $request) {
                $message = 'The response callback is expected to resolve with an object implementing Psr\Http\Message\ResponseInterface, but rejected with "%s" instead.';
                $message = \sprintf($message, \is_object($error) ? \get_class($error) : \gettype($error));

                // only a real exception/error may be chained as previous exception
                $previous = null;

                if ($error instanceof \Throwable || $error instanceof \Exception) {
                    $previous = $error;
                }

                $exception = new \RuntimeException($message, 0, $previous);

                $that->emit('error', array($exception));
                return $that->writeError($conn, Response::STATUS_INTERNAL_SERVER_ERROR, $request);
            }
        )->then($connectionOnCloseResponseCancelerHandler, $connectionOnCloseResponseCancelerHandler);
    }

    /**
     * Sends a plaintext error response with the given status code
     *
     * The response body reads `Error <code>`, followed by `: <reason>` if
     * the response reports a non-empty reason phrase for this status code.
     * The response always carries a `Connection: close` header.
     *
     * @internal
     * @param ConnectionInterface $conn
     * @param int $code HTTP status code to send
     * @param ServerRequestInterface $request request this error responds to
     * @return void
     */
    public function writeError(ConnectionInterface $conn, $code, ServerRequestInterface $request)
    {
        $response = new Response(
            $code,
            array(
                'Content-Type' => 'text/plain',
                'Connection' => 'close' // we do not want to keep the connection open after an error
            ),
            'Error ' . $code
        );

        // append reason phrase to response body if known
        $reason = $response->getReasonPhrase();
        if ($reason !== '') {
            $body = $response->getBody();
            $body->seek(0, \SEEK_END);
            $body->write(': ' . $reason);
        }

        $this->handleResponse($conn, $request, $response);
    }

    /**
     * Writes the given response to the connection
     *
     * Assigns the protocol version, `Server`, `Date`, `Content-Length`,
     * `Transfer-Encoding` and `Connection` headers automatically, validates
     * the resulting header block and then writes headers and body. If the
     * connection persists, it is handed back to the request header parser
     * afterwards, otherwise it is ended. The response body is closed without
     * sending anything if the connection is no longer writable. Invalid
     * response headers emit an `error` event, close the response body and
     * send a `500` error response instead.
     *
     * @internal
     * @param ConnectionInterface $connection
     * @param ServerRequestInterface $request
     * @param ResponseInterface $response
     * @return void
     */
    public function handleResponse(ConnectionInterface $connection, ServerRequestInterface $request, ResponseInterface $response)
    {
        // return early and close response body if connection is already closed
        $body = $response->getBody();
        if (!$connection->isWritable()) {
            $body->close();
            return;
        }

        $code = $response->getStatusCode();
        $method = $request->getMethod();

        // assign HTTP protocol version from request automatically
        $version = $request->getProtocolVersion();
        $response = $response->withProtocolVersion($version);

        // assign default "Server" header automatically
        if (!$response->hasHeader('Server')) {
            $response = $response->withHeader('Server', 'ReactPHP/1');
        } elseif ($response->getHeaderLine('Server') === '') {
            // explicit empty header value => do not send this header at all
            $response = $response->withoutHeader('Server');
        }

        // assign default "Date" header from current time automatically
        if (!$response->hasHeader('Date')) {
            // IMF-fixdate  = day-name "," SP date1 SP time-of-day SP GMT
            $response = $response->withHeader('Date', \gmdate('D, d M Y H:i:s', (int) $this->clock->now()) . ' GMT');
        } elseif ($response->getHeaderLine('Date') === '') {
            // explicit empty header value => do not send this header at all
            $response = $response->withoutHeader('Date');
        }

        // assign "Content-Length" header automatically
        $chunked = false;
        if (($method === 'CONNECT' && $code >= 200 && $code < 300) || ($code >= 100 && $code < 200) || $code === Response::STATUS_NO_CONTENT) {
            // 2xx response to CONNECT and 1xx and 204 MUST NOT include Content-Length or Transfer-Encoding header
            $response = $response->withoutHeader('Content-Length');
        } elseif ($method === 'HEAD' && $response->hasHeader('Content-Length')) {
            // HEAD Request: preserve explicit Content-Length
        } elseif ($code === Response::STATUS_NOT_MODIFIED && ($response->hasHeader('Content-Length') || $body->getSize() === 0)) {
            // 304 Not Modified: preserve explicit Content-Length and preserve missing header if body is empty
        } elseif ($body->getSize() !== null) {
            // assign Content-Length header when using a "normal" buffered body string
            $response = $response->withHeader('Content-Length', (string)$body->getSize());
        } elseif (!$response->hasHeader('Content-Length') && $version === '1.1') {
            // assign chunked transfer-encoding if no 'content-length' is given for HTTP/1.1 responses
            $chunked = true;
        }

        // assign "Transfer-Encoding" header automatically
        if ($chunked) {
            $response = $response->withHeader('Transfer-Encoding', 'chunked');
        } else {
            // remove any Transfer-Encoding headers unless automatically enabled above
            $response = $response->withoutHeader('Transfer-Encoding');
        }

        // assign "Connection" header automatically
        $persist = false;
        if ($code === Response::STATUS_SWITCHING_PROTOCOLS) {
            // 101 (Switching Protocols) response uses Connection: upgrade header
            // This implies that this stream now uses another protocol and we
            // may not persist this connection for additional requests.
            $response = $response->withHeader('Connection', 'upgrade');
        } elseif (\strtolower($request->getHeaderLine('Connection')) === 'close' || \strtolower($response->getHeaderLine('Connection')) === 'close') {
            // obey explicit "Connection: close" request header or response header if present
            $response = $response->withHeader('Connection', 'close');
        } elseif ($version === '1.1') {
            // HTTP/1.1 assumes persistent connection support by default, so we don't need to inform client
            $persist = true;
        } elseif (\strtolower($request->getHeaderLine('Connection')) === 'keep-alive') {
            // obey explicit "Connection: keep-alive" request header and inform client
            $persist = true;
            $response = $response->withHeader('Connection', 'keep-alive');
        } else {
            // remove any Connection headers unless automatically enabled above
            $response = $response->withoutHeader('Connection');
        }

        // 101 (Switching Protocols) response (for Upgrade request) forwards upgraded data through duplex stream
        // 2xx (Successful) response to CONNECT forwards tunneled application data through duplex stream
        if (($code === Response::STATUS_SWITCHING_PROTOCOLS || ($method === 'CONNECT' && $code >= 200 && $code < 300)) && $body instanceof HttpBodyStream && $body->input instanceof WritableStreamInterface) {
            if ($request->getBody()->isReadable()) {
                // request is still streaming => wait for request close before forwarding following data from connection
                $request->getBody()->on('close', function () use ($connection, $body) {
                    if ($body->input->isWritable()) {
                        $connection->pipe($body->input);
                        $connection->resume();
                    }
                });
            } elseif ($body->input->isWritable()) {
                // request already closed => forward following data from connection
                $connection->pipe($body->input);
                $connection->resume();
            }
        }

        // build HTTP response header by appending status line and header fields
        // ($expected counts the header lines written, or -1 for an invalid header name)
        $expected = 0;
        $headers = "HTTP/" . $version . " " . $code . " " . $response->getReasonPhrase() . "\r\n";
        foreach ($response->getHeaders() as $name => $values) {
            if (\strpos($name, ':') !== false) {
                $expected = -1;
                break;
            }
            foreach ($values as $value) {
                $headers .= $name . ": " . $value . "\r\n";
                ++$expected;
            }
        }

        /** @var array $m legacy PHP 5.3 only */
        if ($code < 100 || $code > 999 || \substr_count($headers, "\n") !== ($expected + 1) || (\PHP_VERSION_ID >= 50400 ? \preg_match_all(AbstractMessage::REGEX_HEADERS, $headers) : \preg_match_all(AbstractMessage::REGEX_HEADERS, $headers, $m)) !== $expected) {
            // this response will never be sent => close its body so a
            // streaming body does not stay open after being discarded
            $body->close();

            $this->emit('error', array(new \InvalidArgumentException('Unable to send response with invalid response headers')));
            $this->writeError($connection, Response::STATUS_INTERNAL_SERVER_ERROR, $request);
            return;
        }

        // response to HEAD and 1xx, 204 and 304 responses MUST NOT include a body
        // exclude status 101 (Switching Protocols) here for Upgrade request handling above
        if ($method === 'HEAD' || ($code >= 100 && $code < 200 && $code !== Response::STATUS_SWITCHING_PROTOCOLS) || $code === Response::STATUS_NO_CONTENT || $code === Response::STATUS_NOT_MODIFIED) {
            $body->close();
            $body = '';
        }

        // this is a non-streaming response body or the body stream already closed?
        if (!$body instanceof ReadableStreamInterface || !$body->isReadable()) {
            // add final chunk if a streaming body is already closed and uses `Transfer-Encoding: chunked`
            if ($body instanceof ReadableStreamInterface && $chunked) {
                $body = "0\r\n\r\n";
            }

            // write response headers and body
            $connection->write($headers . "\r\n" . $body);

            // either wait for next request over persistent connection or end connection
            if ($persist) {
                $this->parser->handle($connection);
            } else {
                $connection->end();
            }
            return;
        }

        $connection->write($headers . "\r\n");

        if ($chunked) {
            $body = new ChunkedEncoder($body);
        }

        // Close response stream once connection closes.
        // Note that this TCP/IP close detection may take some time,
        // in particular this may only fire on a later read/write attempt.
        $connection->on('close', array($body, 'close'));

        // write streaming body and then wait for next request over persistent connection
        if ($persist) {
            $body->pipe($connection, array('end' => false));
            $parser = $this->parser;
            $body->on('end', function () use ($connection, $parser, $body) {
                // body finished => stop tracking connection close for this body
                $connection->removeListener('close', array($body, 'close'));
                $parser->handle($connection);
            });
        } else {
            $body->pipe($connection);
        }
    }
}