friendship ended with social-app. php is my new best friend
1<?php
2
3namespace React\Http\Middleware;
4
5use Psr\Http\Message\ResponseInterface;
6use Psr\Http\Message\ServerRequestInterface;
7use React\Http\Io\HttpBodyStream;
8use React\Http\Io\PauseBufferStream;
9use React\Promise;
10use React\Promise\PromiseInterface;
11use React\Promise\Deferred;
12use React\Stream\ReadableStreamInterface;
13
/**
 * Limits how many next handlers can be executed concurrently.
 *
 * If this middleware is invoked, it will check if the number of pending
 * handlers is below the allowed limit and then simply invoke the next handler
 * and it will return whatever the next handler returns (or throws).
 *
 * If the number of pending handlers exceeds the allowed limit, the request will
 * be queued (and its streaming body will be paused) and it will return a pending
 * promise.
 * Once a pending handler returns (or throws), it will pick the oldest request
 * from this queue and invoke the next handler (and its streaming body will be
 * resumed).
 *
 * The following example shows how this middleware can be used to ensure no more
 * than 10 handlers will be invoked at once:
 *
 * ```php
 * $http = new React\Http\HttpServer(
 *     new React\Http\Middleware\StreamingRequestMiddleware(),
 *     new React\Http\Middleware\LimitConcurrentRequestsMiddleware(10),
 *     $handler
 * );
 * ```
 *
 * Similarly, this middleware is often used in combination with the
 * [`RequestBodyBufferMiddleware`](#requestbodybuffermiddleware) (see below)
 * to limit the total number of requests that can be buffered at once:
 *
 * ```php
 * $http = new React\Http\HttpServer(
 *     new React\Http\Middleware\StreamingRequestMiddleware(),
 *     new React\Http\Middleware\LimitConcurrentRequestsMiddleware(100), // 100 concurrent buffering handlers
 *     new React\Http\Middleware\RequestBodyBufferMiddleware(2 * 1024 * 1024), // 2 MiB per request
 *     new React\Http\Middleware\RequestBodyParserMiddleware(),
 *     $handler
 * );
 * ```
 *
 * More sophisticated examples include limiting the total number of requests
 * that can be buffered at once and then ensure the actual request handler only
 * processes one request after another without any concurrency:
 *
 * ```php
 * $http = new React\Http\HttpServer(
 *     new React\Http\Middleware\StreamingRequestMiddleware(),
 *     new React\Http\Middleware\LimitConcurrentRequestsMiddleware(100), // 100 concurrent buffering handlers
 *     new React\Http\Middleware\RequestBodyBufferMiddleware(2 * 1024 * 1024), // 2 MiB per request
 *     new React\Http\Middleware\RequestBodyParserMiddleware(),
 *     new React\Http\Middleware\LimitConcurrentRequestsMiddleware(1), // only execute 1 handler (no concurrency)
 *     $handler
 * );
 * ```
 *
 * @see RequestBodyBufferMiddleware
 */
final class LimitConcurrentRequestsMiddleware
{
    /** @var int maximum number of next handlers that may be pending at once */
    private $limit;

    /** @var int number of next handlers that have been invoked and not yet accounted for by processQueue() */
    private $pending = 0;

    /**
     * Deferreds of requests waiting for a free slot, oldest first.
     *
     * Keys are the queue positions handed out in __invoke(); each canceller
     * captures its own key, so keys must never be renumbered.
     *
     * @var array
     */
    private $queue = array();

    /**
     * @param int $limit Maximum amount of concurrent requests handled.
     *
     * For example when $limit is set to 10, 10 requests will flow to $next
     * while more incoming requests have to wait until one is done.
     */
    public function __construct($limit)
    {
        $this->limit = $limit;
    }

    /**
     * Invokes the next handler right away if below the limit, otherwise queues the request.
     *
     * @param ServerRequestInterface $request
     * @param callable               $next    next request handler, called as $next($request)
     * @return ResponseInterface|PromiseInterface The response itself if the
     *     next handler returned one synchronously while below the limit,
     *     otherwise a promise. A queued request's promise is rejected with a
     *     RuntimeException if it is cancelled before its handler is invoked.
     * @throws \Exception|\Throwable anything thrown by the next handler when
     *     it is invoked synchronously (below the limit)
     */
    public function __invoke(ServerRequestInterface $request, $next)
    {
        // happy path: simply invoke next request handler if we're below limit
        if ($this->pending < $this->limit) {
            ++$this->pending;

            try {
                $response = $next($request);
            } catch (\Exception $e) {
                // the handler is done (it threw), so free its slot before rethrowing
                $this->processQueue();
                throw $e;
            } catch (\Throwable $e) { // @codeCoverageIgnoreStart
                // handle Errors just like Exceptions (PHP 7+ only)
                $this->processQueue();
                throw $e; // @codeCoverageIgnoreEnd
            }

            // happy path: if next request handler returned immediately,
            // we can simply try to invoke the next queued request
            if ($response instanceof ResponseInterface) {
                $this->processQueue();
                return $response;
            }

            // if the next handler returns a pending promise, we have to
            // await its resolution before invoking next queued request
            return $this->await(Promise\resolve($response));
        }

        // if we reach this point, then this request will need to be queued
        // check if the body is streaming, in which case we need to buffer everything
        $body = $request->getBody();
        if ($body instanceof ReadableStreamInterface) {
            // pause actual body to stop emitting data until the handler is called
            $size = $body->getSize();
            $body = new PauseBufferStream($body);
            $body->pauseImplicit();

            // replace with buffering body to ensure any readable events will be buffered
            $request = $request->withBody(new HttpBodyStream(
                $body,
                $size
            ));
        }

        // get next queue position: append a placeholder and read back the key
        // it was stored under (bound by reference so the canceller closure
        // below operates on the very same array as $this->queue)
        $queue =& $this->queue;
        $queue[] = null;
        \end($queue);
        $id = \key($queue);

        $deferred = new Deferred(function ($_, $reject) use (&$queue, $id) {
            // queued promise cancelled before its next handler is invoked
            // remove from queue and reject explicitly
            unset($queue[$id]);
            $reject(new \RuntimeException('Cancelled queued next handler'));
        });

        // queue request and process queue if pending does not exceed limit
        $queue[$id] = $deferred;

        // the continuation below reaches the private counter through this
        // reference and the public (internal) methods through $that
        $pending = &$this->pending;
        $that = $this;
        return $deferred->promise()->then(function () use ($request, $next, $body, &$pending, $that) {
            // invoke next request handler
            ++$pending;

            try {
                $response = $next($request);
            } catch (\Exception $e) {
                $that->processQueue();
                throw $e;
            } catch (\Throwable $e) { // @codeCoverageIgnoreStart
                // handle Errors just like Exceptions (PHP 7+ only)
                $that->processQueue();
                throw $e; // @codeCoverageIgnoreEnd
            }

            // the handler has been invoked at this point:
            // resume readable stream and replay buffered events
            if ($body instanceof PauseBufferStream) {
                $body->resumeImplicit();
            }

            // if the next handler returns a pending promise, we have to
            // await its resolution before invoking next queued request
            return $that->await(Promise\resolve($response));
        });
    }

    /**
     * Calls processQueue() once the given promise settles and passes its outcome through.
     *
     * A fulfillment value is returned unchanged; a rejection reason is
     * re-rejected unchanged.
     *
     * @internal
     * @param PromiseInterface $promise
     * @return PromiseInterface
     */
    public function await(PromiseInterface $promise)
    {
        $that = $this;

        return $promise->then(function ($response) use ($that) {
            $that->processQueue();

            return $response;
        }, function ($error) use ($that) {
            $that->processQueue();

            return Promise\reject($error);
        });
    }

    /**
     * Marks one pending handler as done and, if a slot is free, releases the oldest queued request.
     *
     * Every call decrements the pending counter, so this is meant to be called
     * once per completed next handler.
     *
     * @internal
     */
    public function processQueue()
    {
        // skip if we're still above concurrency limit or there's no queued request waiting
        if (--$this->pending >= $this->limit || !$this->queue) {
            return;
        }

        // take the oldest entry by removing its key; the remaining keys are
        // left untouched because queued cancellers refer to them
        $first = \reset($this->queue);
        unset($this->queue[\key($this->queue)]);

        $first->resolve(null);
    }
}