friendship ended with social-app. php is my new best friend
1<?php
2
3namespace React\Http\Io;
4
5use Psr\Http\Message\RequestInterface;
6use Psr\Http\Message\ResponseInterface;
7use Psr\Http\Message\UriInterface;
8use React\EventLoop\LoopInterface;
9use React\Http\Message\Response;
10use React\Http\Message\ResponseException;
11use React\Http\Message\Uri;
12use React\Promise\Deferred;
13use React\Promise\Promise;
14use React\Promise\PromiseInterface;
15use React\Stream\ReadableStreamInterface;
16
17/**
18 * @internal
19 */
20class Transaction
21{
22 private $sender;
23 private $loop;
24
25 // context: http.timeout (ini_get('default_socket_timeout'): 60)
26 private $timeout;
27
28 // context: http.follow_location (true)
29 private $followRedirects = true;
30
31 // context: http.max_redirects (10)
32 private $maxRedirects = 10;
33
34 // context: http.ignore_errors (false)
35 private $obeySuccessCode = true;
36
37 private $streaming = false;
38
39 private $maximumSize = 16777216; // 16 MiB = 2^24 bytes
40
41 public function __construct(Sender $sender, LoopInterface $loop)
42 {
43 $this->sender = $sender;
44 $this->loop = $loop;
45 }
46
47 /**
48 * @param array $options
49 * @return self returns new instance, without modifying existing instance
50 */
51 public function withOptions(array $options)
52 {
53 $transaction = clone $this;
54 foreach ($options as $name => $value) {
55 if (property_exists($transaction, $name)) {
56 // restore default value if null is given
57 if ($value === null) {
58 $default = new self($this->sender, $this->loop);
59 $value = $default->$name;
60 }
61
62 $transaction->$name = $value;
63 }
64 }
65
66 return $transaction;
67 }
68
69 public function send(RequestInterface $request)
70 {
71 $state = new ClientRequestState();
72 $deferred = new Deferred(function () use ($state) {
73 if ($state->pending !== null) {
74 $state->pending->cancel();
75 $state->pending = null;
76 }
77 });
78
79 // use timeout from options or default to PHP's default_socket_timeout (60)
80 $timeout = (float)($this->timeout !== null ? $this->timeout : ini_get("default_socket_timeout"));
81
82 $loop = $this->loop;
83 $this->next($request, $deferred, $state)->then(
84 function (ResponseInterface $response) use ($state, $deferred, $loop, &$timeout) {
85 if ($state->timeout !== null) {
86 $loop->cancelTimer($state->timeout);
87 $state->timeout = null;
88 }
89 $timeout = -1;
90 $deferred->resolve($response);
91 },
92 function ($e) use ($state, $deferred, $loop, &$timeout) {
93 if ($state->timeout !== null) {
94 $loop->cancelTimer($state->timeout);
95 $state->timeout = null;
96 }
97 $timeout = -1;
98 $deferred->reject($e);
99 }
100 );
101
102 if ($timeout < 0) {
103 return $deferred->promise();
104 }
105
106 $body = $request->getBody();
107 if ($body instanceof ReadableStreamInterface && $body->isReadable()) {
108 $that = $this;
109 $body->on('close', function () use ($that, $deferred, $state, &$timeout) {
110 if ($timeout >= 0) {
111 $that->applyTimeout($deferred, $state, $timeout);
112 }
113 });
114 } else {
115 $this->applyTimeout($deferred, $state, $timeout);
116 }
117
118 return $deferred->promise();
119 }
120
121 /**
122 * @internal
123 * @param number $timeout
124 * @return void
125 */
126 public function applyTimeout(Deferred $deferred, ClientRequestState $state, $timeout)
127 {
128 $state->timeout = $this->loop->addTimer($timeout, function () use ($timeout, $deferred, $state) {
129 $deferred->reject(new \RuntimeException(
130 'Request timed out after ' . $timeout . ' seconds'
131 ));
132 if ($state->pending !== null) {
133 $state->pending->cancel();
134 $state->pending = null;
135 }
136 });
137 }
138
139 private function next(RequestInterface $request, Deferred $deferred, ClientRequestState $state)
140 {
141 $this->progress('request', array($request));
142
143 $that = $this;
144 ++$state->numRequests;
145
146 $promise = $this->sender->send($request);
147
148 if (!$this->streaming) {
149 $promise = $promise->then(function ($response) use ($deferred, $state, $that) {
150 return $that->bufferResponse($response, $deferred, $state);
151 });
152 }
153
154 $state->pending = $promise;
155
156 return $promise->then(
157 function (ResponseInterface $response) use ($request, $that, $deferred, $state) {
158 return $that->onResponse($response, $request, $deferred, $state);
159 }
160 );
161 }
162
163 /**
164 * @internal
165 * @return PromiseInterface Promise<ResponseInterface, Exception>
166 */
167 public function bufferResponse(ResponseInterface $response, Deferred $deferred, ClientRequestState $state)
168 {
169 $body = $response->getBody();
170 $size = $body->getSize();
171
172 if ($size !== null && $size > $this->maximumSize) {
173 $body->close();
174 return \React\Promise\reject(new \OverflowException(
175 'Response body size of ' . $size . ' bytes exceeds maximum of ' . $this->maximumSize . ' bytes',
176 \defined('SOCKET_EMSGSIZE') ? \SOCKET_EMSGSIZE : 90
177 ));
178 }
179
180 // body is not streaming => already buffered
181 if (!$body instanceof ReadableStreamInterface) {
182 return \React\Promise\resolve($response);
183 }
184
185 /** @var ?\Closure $closer */
186 $closer = null;
187 $maximumSize = $this->maximumSize;
188
189 return $state->pending = new Promise(function ($resolve, $reject) use ($body, $maximumSize, $response, &$closer) {
190 // resolve with current buffer when stream closes successfully
191 $buffer = '';
192 $body->on('close', $closer = function () use (&$buffer, $response, $maximumSize, $resolve, $reject) {
193 $resolve($response->withBody(new BufferedBody($buffer)));
194 });
195
196 // buffer response body data in memory
197 $body->on('data', function ($data) use (&$buffer, $maximumSize, $body, $closer, $reject) {
198 $buffer .= $data;
199
200 // close stream and reject promise if limit is exceeded
201 if (isset($buffer[$maximumSize])) {
202 $buffer = '';
203 assert($closer instanceof \Closure);
204 $body->removeListener('close', $closer);
205 $body->close();
206
207 $reject(new \OverflowException(
208 'Response body size exceeds maximum of ' . $maximumSize . ' bytes',
209 \defined('SOCKET_EMSGSIZE') ? \SOCKET_EMSGSIZE : 90
210 ));
211 }
212 });
213
214 // reject buffering if body emits error
215 $body->on('error', function (\Exception $e) use ($reject) {
216 $reject(new \RuntimeException(
217 'Error while buffering response body: ' . $e->getMessage(),
218 $e->getCode(),
219 $e
220 ));
221 });
222 }, function () use ($body, &$closer) {
223 // cancelled buffering: remove close handler to avoid resolving, then close and reject
224 assert($closer instanceof \Closure);
225 $body->removeListener('close', $closer);
226 $body->close();
227
228 throw new \RuntimeException('Cancelled buffering response body');
229 });
230 }
231
232 /**
233 * @internal
234 * @throws ResponseException
235 * @return ResponseInterface|PromiseInterface
236 */
237 public function onResponse(ResponseInterface $response, RequestInterface $request, Deferred $deferred, ClientRequestState $state)
238 {
239 $this->progress('response', array($response, $request));
240
241 // follow 3xx (Redirection) response status codes if Location header is present and not explicitly disabled
242 // @link https://tools.ietf.org/html/rfc7231#section-6.4
243 if ($this->followRedirects && ($response->getStatusCode() >= 300 && $response->getStatusCode() < 400) && $response->hasHeader('Location')) {
244 return $this->onResponseRedirect($response, $request, $deferred, $state);
245 }
246
247 // only status codes 200-399 are considered to be valid, reject otherwise
248 if ($this->obeySuccessCode && ($response->getStatusCode() < 200 || $response->getStatusCode() >= 400)) {
249 throw new ResponseException($response);
250 }
251
252 // resolve our initial promise
253 return $response;
254 }
255
256 /**
257 * @param ResponseInterface $response
258 * @param RequestInterface $request
259 * @param Deferred $deferred
260 * @param ClientRequestState $state
261 * @return PromiseInterface
262 * @throws \RuntimeException
263 */
264 private function onResponseRedirect(ResponseInterface $response, RequestInterface $request, Deferred $deferred, ClientRequestState $state)
265 {
266 // resolve location relative to last request URI
267 $location = Uri::resolve($request->getUri(), new Uri($response->getHeaderLine('Location')));
268
269 $request = $this->makeRedirectRequest($request, $location, $response->getStatusCode());
270 $this->progress('redirect', array($request));
271
272 if ($state->numRequests >= $this->maxRedirects) {
273 throw new \RuntimeException('Maximum number of redirects (' . $this->maxRedirects . ') exceeded');
274 }
275
276 return $this->next($request, $deferred, $state);
277 }
278
279 /**
280 * @param RequestInterface $request
281 * @param UriInterface $location
282 * @param int $statusCode
283 * @return RequestInterface
284 * @throws \RuntimeException
285 */
286 private function makeRedirectRequest(RequestInterface $request, UriInterface $location, $statusCode)
287 {
288 // Remove authorization if changing hostnames (but not if just changing ports or protocols).
289 $originalHost = $request->getUri()->getHost();
290 if ($location->getHost() !== $originalHost) {
291 $request = $request->withoutHeader('Authorization');
292 }
293
294 $request = $request->withoutHeader('Host')->withUri($location);
295
296 if ($statusCode === Response::STATUS_TEMPORARY_REDIRECT || $statusCode === Response::STATUS_PERMANENT_REDIRECT) {
297 if ($request->getBody() instanceof ReadableStreamInterface) {
298 throw new \RuntimeException('Unable to redirect request with streaming body');
299 }
300 } else {
301 $request = $request
302 ->withMethod($request->getMethod() === 'HEAD' ? 'HEAD' : 'GET')
303 ->withoutHeader('Content-Type')
304 ->withoutHeader('Content-Length')
305 ->withBody(new BufferedBody(''));
306 }
307
308 return $request;
309 }
310
311 private function progress($name, array $args = array())
312 {
313 return;
314
315 echo $name;
316
317 foreach ($args as $arg) {
318 echo ' ';
319 if ($arg instanceof ResponseInterface) {
320 echo 'HTTP/' . $arg->getProtocolVersion() . ' ' . $arg->getStatusCode() . ' ' . $arg->getReasonPhrase();
321 } elseif ($arg instanceof RequestInterface) {
322 echo $arg->getMethod() . ' ' . $arg->getRequestTarget() . ' HTTP/' . $arg->getProtocolVersion();
323 } else {
324 echo $arg;
325 }
326 }
327
328 echo PHP_EOL;
329 }
330}