<?php

namespace React\Http\Io;

use Evenement\EventEmitter;
use Psr\Http\Message\MessageInterface;
use Psr\Http\Message\RequestInterface;
use React\Http\Message\Response;
use React\Socket\ConnectionInterface;
use React\Stream\WritableStreamInterface;

/**
 * Writable stream that sends one HTTP request and emits the parsed response.
 *
 * The request head is written lazily: the first call to `write()` or `end()`
 * asks the connection manager for a connection and, once it is available,
 * writes the request line, the headers and any request body data that was
 * buffered in the meantime. Incoming data is buffered until the end of the
 * response headers is seen, at which point a `response` event is emitted with
 * the response message and its streaming body.
 *
 * @event response emitted with the response message and its body stream
 * @event drain
 * @event error
 * @event close
 * @internal
 */
class ClientRequestStream extends EventEmitter implements WritableStreamInterface
{
    /** No part of the request has been sent and no connection was requested yet. */
    const STATE_INIT = 0;

    /** `writeHead()` has started; the connection may still be pending. */
    const STATE_WRITING_HEAD = 1;

    /** Request line and headers have been written to the connection. */
    const STATE_HEAD_WRITTEN = 2;

    /** The stream has been closed (see `close()`). */
    const STATE_END = 3;

    /** @var ClientConnectionManager */
    private $connectionManager;

    /** @var RequestInterface */
    private $request;

    /** @var ?ConnectionInterface */
    private $connection;

    /** @var string incoming data buffered until the response headers are complete */
    private $buffer = '';

    /** @var int one of the `STATE_*` constants */
    private $state = self::STATE_INIT;

    /** @var bool whether `end()` has been called on a writable stream */
    private $ended = false;

    /** @var string request body data written before the request head was sent */
    private $pendingWrites = '';

    /**
     * @param ClientConnectionManager $connectionManager used to obtain (and possibly hand back) the connection
     * @param RequestInterface        $request           request whose head is sent by this stream
     */
    public function __construct(ClientConnectionManager $connectionManager, RequestInterface $request)
    {
        $this->connectionManager = $connectionManager;
        $this->request = $request;
    }

    /**
     * Checks whether request body data can still be written.
     *
     * @return bool `false` once the stream is closed or `end()` has been called
     */
    public function isWritable()
    {
        return self::STATE_END > $this->state && !$this->ended;
    }

    /**
     * Validates the request head, requests a connection and writes the head once connected.
     *
     * Closes the stream with an `InvalidArgumentException` without connecting
     * if the serialized request line or headers are not well-formed.
     *
     * @return void
     */
    private function writeHead()
    {
        $this->state = self::STATE_WRITING_HEAD;

        // serialize request line and headers while counting the header lines we expect to see
        $expected = 0;
        $headers = "{$this->request->getMethod()} {$this->request->getRequestTarget()} HTTP/{$this->request->getProtocolVersion()}\r\n";
        foreach ($this->request->getHeaders() as $name => $values) {
            if (\strpos($name, ':') !== false) {
                // a colon in a header name can never be valid: force the count check below to fail
                $expected = -1;
                break;
            }
            foreach ($values as $value) {
                $headers .= "$name: $value\r\n";
                ++$expected;
            }
        }

        // reject the request if the request line is malformed or if the number of
        // line breaks / matched header lines differs from what was serialized above
        // (e.g. because a value contains a line break)
        /** @var array $m legacy PHP 5.3 only */
        if (!\preg_match('#^\S+ \S+ HTTP/1\.[01]\r\n#m', $headers) || \substr_count($headers, "\n") !== ($expected + 1) || (\PHP_VERSION_ID >= 50400 ? \preg_match_all(AbstractMessage::REGEX_HEADERS, $headers) : \preg_match_all(AbstractMessage::REGEX_HEADERS, $headers, $m)) !== $expected) {
            $this->closeError(new \InvalidArgumentException('Unable to send request with invalid request headers'));
            return;
        }

        // private state is shared with the closure below through references
        $connectionRef = &$this->connection;
        $stateRef = &$this->state;
        $pendingWrites = &$this->pendingWrites;
        $that = $this;

        $promise = $this->connectionManager->connect($this->request->getUri());
        $promise->then(
            function (ConnectionInterface $connection) use ($headers, &$connectionRef, &$stateRef, &$pendingWrites, $that) {
                $connectionRef = $connection;
                assert($connectionRef instanceof ConnectionInterface);

                $connection->on('drain', array($that, 'handleDrain'));
                $connection->on('data', array($that, 'handleData'));
                $connection->on('end', array($that, 'handleEnd'));
                $connection->on('error', array($that, 'handleError'));
                $connection->on('close', array($that, 'close'));

                // send request head together with any request body data buffered so far
                $more = $connection->write($headers . "\r\n" . $pendingWrites);

                assert($stateRef === ClientRequestStream::STATE_WRITING_HEAD);
                $stateRef = ClientRequestStream::STATE_HEAD_WRITTEN;

                // clear pending writes if non-empty
                if ($pendingWrites !== '') {
                    $pendingWrites = '';

                    // earlier write() calls returned false, so tell the writer it may continue
                    if ($more) {
                        $that->emit('drain');
                    }
                }
            },
            array($this, 'closeError')
        );

        // closing this stream cancels the connection promise
        $this->on('close', function() use ($promise) {
            $promise->cancel();
        });
    }

    /**
     * Writes request body data.
     *
     * Data written before the request head has been sent is buffered and
     * triggers sending the request head if that has not been started yet.
     *
     * @param string $data
     * @return bool `false` if the stream is not writable or the data was only
     *     buffered, otherwise the return value of the connection's `write()`
     */
    public function write($data)
    {
        if (!$this->isWritable()) {
            return false;
        }

        // write directly to connection stream if already available
        if (self::STATE_HEAD_WRITTEN <= $this->state) {
            return $this->connection->write($data);
        }

        // otherwise buffer and try to establish connection
        $this->pendingWrites .= $data;
        if (self::STATE_WRITING_HEAD > $this->state) {
            $this->writeHead();
        }

        return false;
    }

    /**
     * Ends the request body, optionally writing a final chunk of data.
     *
     * Makes sure the request head gets sent even if no body data was ever
     * written. Does nothing if the stream is no longer writable.
     *
     * @param ?string $data
     * @return void
     */
    public function end($data = null)
    {
        if (!$this->isWritable()) {
            return;
        }

        if (null !== $data) {
            $this->write($data);
        } elseif (self::STATE_WRITING_HEAD > $this->state) {
            $this->writeHead();
        }

        $this->ended = true;
    }

    /**
     * Forwards the connection's `drain` event.
     *
     * @internal
     * @return void
     */
    public function handleDrain()
    {
        $this->emit('drain');
    }

    /**
     * Buffers incoming data and emits the `response` event once the response headers are complete.
     *
     * After the headers have been parsed, this stream detaches from the
     * connection; remaining data is consumed through the response body stream.
     *
     * @internal
     * @param string $data
     * @return void
     */
    public function handleData($data)
    {
        $this->buffer .= $data;

        // buffer until double CRLF (or double LF for compatibility with legacy servers)
        $eom = \strpos($this->buffer, "\r\n\r\n");
        $eomLegacy = \strpos($this->buffer, "\n\n");
        if ($eom !== false || $eomLegacy !== false) {
            try {
                // use whichever terminator comes first; keep one line break on the
                // headers and pass everything after the terminator on as body data
                if ($eom !== false && ($eomLegacy === false || $eom < $eomLegacy)) {
                    $response = Response::parseMessage(\substr($this->buffer, 0, $eom + 2));
                    $bodyChunk = (string) \substr($this->buffer, $eom + 4);
                } else {
                    $response = Response::parseMessage(\substr($this->buffer, 0, $eomLegacy + 1));
                    $bodyChunk = (string) \substr($this->buffer, $eomLegacy + 2);
                }
            } catch (\InvalidArgumentException $exception) {
                $this->closeError($exception);
                return;
            }

            // response headers successfully received => remove listeners for connection events
            $connection = $this->connection;
            assert($connection instanceof ConnectionInterface);
            $connection->removeListener('drain', array($this, 'handleDrain'));
            $connection->removeListener('data', array($this, 'handleData'));
            $connection->removeListener('end', array($this, 'handleEnd'));
            $connection->removeListener('error', array($this, 'handleError'));
            $connection->removeListener('close', array($this, 'close'));
            $this->connection = null;
            $this->buffer = '';

            // take control over connection handling and check if we can reuse the connection once response body closes
            $that = $this;
            $request = $this->request;
            $connectionManager = $this->connectionManager;
            $successfulEndReceived = false;
            $input = $body = new CloseProtectionStream($connection);
            $input->on('close', function () use ($connection, $that, $connectionManager, $request, $response, &$successfulEndReceived) {
                // only reuse connection after successful response and both request and response allow keep alive
                if ($successfulEndReceived && $connection->isReadable() && $that->hasMessageKeepAliveEnabled($response) && $that->hasMessageKeepAliveEnabled($request)) {
                    $connectionManager->keepAlive($request->getUri(), $connection);
                } else {
                    $connection->close();
                }

                $that->close();
            });

            // determine length of response body
            $length = null;
            $code = $response->getStatusCode();
            if ($this->request->getMethod() === 'HEAD' || ($code >= 100 && $code < 200) || $code === Response::STATUS_NO_CONTENT || $code === Response::STATUS_NOT_MODIFIED) {
                // these responses never carry a body, whatever their headers say
                $length = 0;
            } elseif (\strtolower($response->getHeaderLine('Transfer-Encoding')) === 'chunked') {
                $body = new ChunkedDecoder($body);
            } elseif ($response->hasHeader('Content-Length')) {
                $length = (int) $response->getHeaderLine('Content-Length');
            }
            $response = $response->withBody($body = new ReadableBodyStream($body, $length));
            $body->on('end', function () use (&$successfulEndReceived) {
                $successfulEndReceived = true;
            });

            // emit response with streaming response body (see `Sender`)
            $this->emit('response', array($response, $body));

            // re-emit HTTP response body to trigger body parsing if parts of it are buffered
            if ($bodyChunk !== '') {
                $input->handleData($bodyChunk);
            } elseif ($length === 0) {
                $input->handleEnd();
            }
        }
    }

    /**
     * Reports an error because the connection ended while this stream was still listening for the response headers.
     *
     * @internal
     * @return void
     */
    public function handleEnd()
    {
        $this->closeError(new \RuntimeException(
            "Connection ended before receiving response"
        ));
    }

    /**
     * Wraps an error from the connection in a `RuntimeException` and closes the stream with it.
     *
     * @internal
     * @param \Exception $error kept as the previous exception
     * @return void
     */
    public function handleError(\Exception $error)
    {
        $this->closeError(new \RuntimeException(
            "An error occurred in the underlying stream",
            0,
            $error
        ));
    }

    /**
     * Emits an `error` event followed by closing the stream, unless already closed.
     *
     * @internal
     * @param \Exception $error
     * @return void
     */
    public function closeError(\Exception $error)
    {
        if (self::STATE_END <= $this->state) {
            return;
        }
        $this->emit('error', array($error));
        $this->close();
    }

    /**
     * Closes the stream: drops buffers, closes a still-attached connection, emits `close` and removes all listeners.
     *
     * Calling this on an already closed stream is a no-op.
     *
     * @return void
     */
    public function close()
    {
        if (self::STATE_END <= $this->state) {
            return;
        }

        // set state first so that re-entrant calls triggered below return early
        $this->state = self::STATE_END;
        $this->pendingWrites = '';
        $this->buffer = '';

        if ($this->connection instanceof ConnectionInterface) {
            $this->connection->close();
            $this->connection = null;
        }

        $this->emit('close');
        $this->removeAllListeners();
    }

    /**
     * Checks whether the given message allows the connection to be kept alive.
     *
     * A `Connection: close` option always disables keep-alive. Otherwise
     * keep-alive is enabled for HTTP/1.1 messages and for messages with an
     * explicit `Connection: keep-alive` option.
     *
     * @internal
     * @return bool
     * @link https://www.rfc-editor.org/rfc/rfc9112#section-9.3
     * @link https://www.rfc-editor.org/rfc/rfc7230#section-6.1
     */
    public function hasMessageKeepAliveEnabled(MessageInterface $message)
    {
        // @link https://www.rfc-editor.org/rfc/rfc9110#section-7.6.1
        $connectionOptions = \array_map('trim', \explode(',', \strtolower($message->getHeaderLine('Connection'))));

        if (\in_array('close', $connectionOptions, true)) {
            return false;
        }

        if ($message->getProtocolVersion() === '1.1') {
            return true;
        }

        if (\in_array('keep-alive', $connectionOptions, true)) {
            return true;
        }

        return false;
    }
}