friendship ended with social-app. php is my new best friend
<?php

namespace React\Http\Io;

use Psr\Http\Message\RequestInterface;
use Psr\Http\Message\ResponseInterface;
use Psr\Http\Message\UriInterface;
use React\EventLoop\LoopInterface;
use React\Http\Message\Response;
use React\Http\Message\ResponseException;
use React\Http\Message\Uri;
use React\Promise\Deferred;
use React\Promise\Promise;
use React\Promise\PromiseInterface;
use React\Stream\ReadableStreamInterface;

/**
 * Drives one outgoing request through the Sender and applies the configured
 * timeout, response buffering, redirect following and status code checks.
 *
 * Several methods are public only because they are invoked from closures
 * through `$that` (no `$this` inside closures); they are marked @internal.
 *
 * @internal
 */
class Transaction
{
    private $sender;
    private $loop;

    // context: http.timeout (ini_get('default_socket_timeout'): 60)
    // null means "use PHP's default_socket_timeout", a negative value disables the timer
    private $timeout;

    // context: http.follow_location (true)
    private $followRedirects = true;

    // context: http.max_redirects (10)
    private $maxRedirects = 10;

    // context: http.ignore_errors (false)
    private $obeySuccessCode = true;

    // when false, response bodies are buffered in memory before resolving
    private $streaming = false;

    private $maximumSize = 16777216; // 16 MiB = 2^24 bytes

    public function __construct(Sender $sender, LoopInterface $loop)
    {
        $this->sender = $sender;
        $this->loop = $loop;
    }

    /**
     * Returns a copy of this transaction with the given options applied.
     *
     * Each key is matched against a property name of this class; keys that
     * do not match any property are ignored. A `null` value restores the
     * default value of that property.
     *
     * @param array $options
     * @return self returns new instance, without modifying existing instance
     */
    public function withOptions(array $options)
    {
        $transaction = clone $this;

        // pristine instance used to look up default values, created on first use only
        $default = null;

        foreach ($options as $name => $value) {
            if (!property_exists($transaction, $name)) {
                continue;
            }

            // restore default value if null is given
            if ($value === null) {
                if ($default === null) {
                    $default = new self($this->sender, $this->loop);
                }
                $value = $default->$name;
            }

            $transaction->$name = $value;
        }

        return $transaction;
    }

    /**
     * Sends the given request and returns a promise for its final response.
     *
     * The returned promise is rejected if the request fails, if the response
     * is rejected by onResponse() or if the timeout fires. Cancelling the
     * returned promise cancels the currently pending operation.
     *
     * @param RequestInterface $request
     * @return PromiseInterface Promise<ResponseInterface, Exception>
     */
    public function send(RequestInterface $request)
    {
        $state = new ClientRequestState();
        $deferred = new Deferred(function () use ($state) {
            if ($state->pending !== null) {
                $state->pending->cancel();
                $state->pending = null;
            }
        });

        // use timeout from options or default to PHP's default_socket_timeout (60)
        $timeout = (float)($this->timeout !== null ? $this->timeout : ini_get("default_socket_timeout"));

        $loop = $this->loop;

        // shared cleanup once the request settles either way: stop a running timer
        // and mark the timeout as disabled (-1) so that no timer is started afterwards
        $finish = function () use ($state, $loop, &$timeout) {
            if ($state->timeout !== null) {
                $loop->cancelTimer($state->timeout);
                $state->timeout = null;
            }
            $timeout = -1;
        };

        $this->next($request, $deferred, $state)->then(
            function (ResponseInterface $response) use ($finish, $deferred) {
                $finish();
                $deferred->resolve($response);
            },
            function ($e) use ($finish, $deferred) {
                $finish();
                $deferred->reject($e);
            }
        );

        // negative timeout: either disabled via options or the request already settled above
        if ($timeout < 0) {
            return $deferred->promise();
        }

        $body = $request->getBody();
        if ($body instanceof ReadableStreamInterface && $body->isReadable()) {
            // streaming request body: only start the timer once the body stream closes
            $that = $this;
            $body->on('close', function () use ($that, $deferred, $state, &$timeout) {
                if ($timeout >= 0) {
                    $that->applyTimeout($deferred, $state, $timeout);
                }
            });
        } else {
            $this->applyTimeout($deferred, $state, $timeout);
        }

        return $deferred->promise();
    }

    /**
     * Starts the timeout timer and stores it in `$state->timeout`.
     *
     * When the timer fires, the deferred is rejected with a RuntimeException
     * and the currently pending operation (if any) is cancelled.
     *
     * @internal
     * @param Deferred $deferred
     * @param ClientRequestState $state
     * @param number $timeout
     * @return void
     */
    public function applyTimeout(Deferred $deferred, ClientRequestState $state, $timeout)
    {
        $state->timeout = $this->loop->addTimer($timeout, function () use ($timeout, $deferred, $state) {
            $deferred->reject(new \RuntimeException(
                'Request timed out after ' . $timeout . ' seconds'
            ));
            if ($state->pending !== null) {
                $state->pending->cancel();
                $state->pending = null;
            }
        });
    }

    /**
     * Sends a single request (the initial one or a redirect) via the Sender.
     *
     * Increments `$state->numRequests`, buffers the response body unless
     * streaming is enabled, remembers the pending promise in `$state->pending`
     * so it can be cancelled, and passes the response on to onResponse().
     *
     * @param RequestInterface $request
     * @param Deferred $deferred
     * @param ClientRequestState $state
     * @return PromiseInterface
     */
    private function next(RequestInterface $request, Deferred $deferred, ClientRequestState $state)
    {
        $this->progress('request', array($request));

        $that = $this;
        ++$state->numRequests;

        $promise = $this->sender->send($request);

        if (!$this->streaming) {
            $promise = $promise->then(function ($response) use ($deferred, $state, $that) {
                return $that->bufferResponse($response, $deferred, $state);
            });
        }

        $state->pending = $promise;

        return $promise->then(
            function (ResponseInterface $response) use ($request, $that, $deferred, $state) {
                return $that->onResponse($response, $request, $deferred, $state);
            }
        );
    }

    /**
     * Buffers the response body in memory, up to the configured maximum size.
     *
     * Rejects with an OverflowException if the announced or received body
     * size exceeds the maximum, and with a RuntimeException if the body
     * stream emits an error or buffering is cancelled. While buffering, the
     * buffering promise is stored in `$state->pending`.
     *
     * @internal
     * @param ResponseInterface $response
     * @param Deferred $deferred
     * @param ClientRequestState $state
     * @return PromiseInterface Promise<ResponseInterface, Exception>
     */
    public function bufferResponse(ResponseInterface $response, Deferred $deferred, ClientRequestState $state)
    {
        $body = $response->getBody();
        $size = $body->getSize();

        // known body size already exceeds the limit: close body and reject right away
        if ($size !== null && $size > $this->maximumSize) {
            $body->close();
            return \React\Promise\reject(new \OverflowException(
                'Response body size of ' . $size . ' bytes exceeds maximum of ' . $this->maximumSize . ' bytes',
                \defined('SOCKET_EMSGSIZE') ? \SOCKET_EMSGSIZE : 90
            ));
        }

        // body is not streaming => already buffered
        if (!$body instanceof ReadableStreamInterface) {
            return \React\Promise\resolve($response);
        }

        /** @var ?\Closure $closer */
        $closer = null;
        $maximumSize = $this->maximumSize;

        return $state->pending = new Promise(function ($resolve, $reject) use ($body, $maximumSize, $response, &$closer) {
            // resolve with current buffer when stream closes successfully
            $buffer = '';
            $body->on('close', $closer = function () use (&$buffer, $response, $resolve) {
                $resolve($response->withBody(new BufferedBody($buffer)));
            });

            // buffer response body data in memory
            $body->on('data', function ($data) use (&$buffer, $maximumSize, $body, $closer, $reject) {
                $buffer .= $data;

                // close stream and reject promise if limit is exceeded
                // (offset $maximumSize only exists once the buffer is longer than the limit)
                if (isset($buffer[$maximumSize])) {
                    $buffer = '';
                    assert($closer instanceof \Closure);
                    $body->removeListener('close', $closer);
                    $body->close();

                    $reject(new \OverflowException(
                        'Response body size exceeds maximum of ' . $maximumSize . ' bytes',
                        \defined('SOCKET_EMSGSIZE') ? \SOCKET_EMSGSIZE : 90
                    ));
                }
            });

            // reject buffering if body emits error
            $body->on('error', function (\Exception $e) use ($reject) {
                $reject(new \RuntimeException(
                    'Error while buffering response body: ' . $e->getMessage(),
                    $e->getCode(),
                    $e
                ));
            });
        }, function () use ($body, &$closer) {
            // cancelled buffering: remove close handler to avoid resolving, then close and reject
            assert($closer instanceof \Closure);
            $body->removeListener('close', $closer);
            $body->close();

            throw new \RuntimeException('Cancelled buffering response body');
        });
    }

    /**
     * Decides what to do with a received response: follow a redirect,
     * reject because of its status code, or pass it through.
     *
     * @internal
     * @param ResponseInterface $response
     * @param RequestInterface $request
     * @param Deferred $deferred
     * @param ClientRequestState $state
     * @throws ResponseException
     * @return ResponseInterface|PromiseInterface
     */
    public function onResponse(ResponseInterface $response, RequestInterface $request, Deferred $deferred, ClientRequestState $state)
    {
        $this->progress('response', array($response, $request));

        // follow 3xx (Redirection) response status codes if Location header is present and not explicitly disabled
        // @link https://tools.ietf.org/html/rfc7231#section-6.4
        if ($this->followRedirects && ($response->getStatusCode() >= 300 && $response->getStatusCode() < 400) && $response->hasHeader('Location')) {
            return $this->onResponseRedirect($response, $request, $deferred, $state);
        }

        // only status codes 200-399 are considered to be valid, reject otherwise
        if ($this->obeySuccessCode && ($response->getStatusCode() < 200 || $response->getStatusCode() >= 400)) {
            throw new ResponseException($response);
        }

        // resolve our initial promise
        return $response;
    }

    /**
     * Builds the follow-up request for a redirect response and sends it,
     * unless the redirect limit has been reached.
     *
     * @param ResponseInterface $response
     * @param RequestInterface $request
     * @param Deferred $deferred
     * @param ClientRequestState $state
     * @return PromiseInterface
     * @throws \RuntimeException
     */
    private function onResponseRedirect(ResponseInterface $response, RequestInterface $request, Deferred $deferred, ClientRequestState $state)
    {
        // resolve location relative to last request URI
        $location = Uri::resolve($request->getUri(), new Uri($response->getHeaderLine('Location')));

        $request = $this->makeRedirectRequest($request, $location, $response->getStatusCode());
        $this->progress('redirect', array($request));

        // numRequests counts every request sent so far, including the initial one
        if ($state->numRequests >= $this->maxRedirects) {
            throw new \RuntimeException('Maximum number of redirects (' . $this->maxRedirects . ') exceeded');
        }

        return $this->next($request, $deferred, $state);
    }

    /**
     * Derives the request to send to the redirect target.
     *
     * For 307/308 the method and body are kept (a streaming body cannot be
     * replayed and raises an exception). For every other status code the
     * request becomes a body-less GET, or stays HEAD if it was HEAD.
     *
     * @param RequestInterface $request
     * @param UriInterface $location
     * @param int $statusCode
     * @return RequestInterface
     * @throws \RuntimeException
     */
    private function makeRedirectRequest(RequestInterface $request, UriInterface $location, $statusCode)
    {
        // Remove authorization if changing hostnames (but not if just changing ports or protocols).
        $originalHost = $request->getUri()->getHost();
        if ($location->getHost() !== $originalHost) {
            $request = $request->withoutHeader('Authorization');
        }

        $request = $request->withoutHeader('Host')->withUri($location);

        if ($statusCode === Response::STATUS_TEMPORARY_REDIRECT || $statusCode === Response::STATUS_PERMANENT_REDIRECT) {
            if ($request->getBody() instanceof ReadableStreamInterface) {
                throw new \RuntimeException('Unable to redirect request with streaming body');
            }
        } else {
            $request = $request
                ->withMethod($request->getMethod() === 'HEAD' ? 'HEAD' : 'GET')
                ->withoutHeader('Content-Type')
                ->withoutHeader('Content-Length')
                ->withBody(new BufferedBody(''));
        }

        return $request;
    }

    /**
     * Progress hook called for each 'request', 'response' and 'redirect' step.
     *
     * Currently a no-op: it returns without producing any output.
     *
     * @param string $name
     * @param array $args
     * @return void
     */
    private function progress($name, array $args = array())
    {
        return;
    }
}