friendship ended with social-app. php is my new best friend
at main 7.5 kB view raw
1<?php 2 3namespace React\Http\Middleware; 4 5use Psr\Http\Message\ResponseInterface; 6use Psr\Http\Message\ServerRequestInterface; 7use React\Http\Io\HttpBodyStream; 8use React\Http\Io\PauseBufferStream; 9use React\Promise; 10use React\Promise\PromiseInterface; 11use React\Promise\Deferred; 12use React\Stream\ReadableStreamInterface; 13 14/** 15 * Limits how many next handlers can be executed concurrently. 16 * 17 * If this middleware is invoked, it will check if the number of pending 18 * handlers is below the allowed limit and then simply invoke the next handler 19 * and it will return whatever the next handler returns (or throws). 20 * 21 * If the number of pending handlers exceeds the allowed limit, the request will 22 * be queued (and its streaming body will be paused) and it will return a pending 23 * promise. 24 * Once a pending handler returns (or throws), it will pick the oldest request 25 * from this queue and invokes the next handler (and its streaming body will be 26 * resumed). 27 * 28 * The following example shows how this middleware can be used to ensure no more 29 * than 10 handlers will be invoked at once: 30 * 31 * ```php 32 * $http = new React\Http\HttpServer( 33 * new React\Http\Middleware\StreamingRequestMiddleware(), 34 * new React\Http\Middleware\LimitConcurrentRequestsMiddleware(10), 35 * $handler 36 * ); 37 * ``` 38 * 39 * Similarly, this middleware is often used in combination with the 40 * [`RequestBodyBufferMiddleware`](#requestbodybuffermiddleware) (see below) 41 * to limit the total number of requests that can be buffered at once: 42 * 43 * ```php 44 * $http = new React\Http\HttpServer( 45 * new React\Http\Middleware\StreamingRequestMiddleware(), 46 * new React\Http\Middleware\LimitConcurrentRequestsMiddleware(100), // 100 concurrent buffering handlers 47 * new React\Http\Middleware\RequestBodyBufferMiddleware(2 * 1024 * 1024), // 2 MiB per request 48 * new React\Http\Middleware\RequestBodyParserMiddleware(), 49 * $handler 50 * ); 51 * ``` 52 * 53 * More sophisticated examples include limiting the total number of requests 54 * that can be buffered at once and then ensure the actual request handler only 55 * processes one request after another without any concurrency: 56 * 57 * ```php 58 * $http = new React\Http\HttpServer( 59 * new React\Http\Middleware\StreamingRequestMiddleware(), 60 * new React\Http\Middleware\LimitConcurrentRequestsMiddleware(100), // 100 concurrent buffering handlers 61 * new React\Http\Middleware\RequestBodyBufferMiddleware(2 * 1024 * 1024), // 2 MiB per request 62 * new React\Http\Middleware\RequestBodyParserMiddleware(), 63 * new React\Http\Middleware\LimitConcurrentRequestsMiddleware(1), // only execute 1 handler (no concurrency) 64 * $handler 65 * ); 66 * ``` 67 * 68 * @see RequestBodyBufferMiddleware 69 */ 70final class LimitConcurrentRequestsMiddleware 71{ 72 private $limit; 73 private $pending = 0; 74 private $queue = array(); 75 76 /** 77 * @param int $limit Maximum amount of concurrent requests handled. 78 * 79 * For example when $limit is set to 10, 10 requests will flow to $next 80 * while more incoming requests have to wait until one is done. 81 */ 82 public function __construct($limit) 83 { 84 $this->limit = $limit; 85 } 86 87 public function __invoke(ServerRequestInterface $request, $next) 88 { 89 // happy path: simply invoke next request handler if we're below limit 90 if ($this->pending < $this->limit) { 91 ++$this->pending; 92 93 try { 94 $response = $next($request); 95 } catch (\Exception $e) { 96 $this->processQueue(); 97 throw $e; 98 } catch (\Throwable $e) { // @codeCoverageIgnoreStart 99 // handle Errors just like Exceptions (PHP 7+ only) 100 $this->processQueue(); 101 throw $e; // @codeCoverageIgnoreEnd 102 } 103 104 // happy path: if next request handler returned immediately, 105 // we can simply try to invoke the next queued request 106 if ($response instanceof ResponseInterface) { 107 $this->processQueue(); 108 return $response; 109 } 110 111 // if the next handler returns a pending promise, we have to 112 // await its resolution before invoking next queued request 113 return $this->await(Promise\resolve($response)); 114 } 115 116 // if we reach this point, then this request will need to be queued 117 // check if the body is streaming, in which case we need to buffer everything 118 $body = $request->getBody(); 119 if ($body instanceof ReadableStreamInterface) { 120 // pause actual body to stop emitting data until the handler is called 121 $size = $body->getSize(); 122 $body = new PauseBufferStream($body); 123 $body->pauseImplicit(); 124 125 // replace with buffering body to ensure any readable events will be buffered 126 $request = $request->withBody(new HttpBodyStream( 127 $body, 128 $size 129 )); 130 } 131 132 // get next queue position 133 $queue =& $this->queue; 134 $queue[] = null; 135 \end($queue); 136 $id = \key($queue); 137 138 $deferred = new Deferred(function ($_, $reject) use (&$queue, $id) { 139 // queued promise cancelled before its next handler is invoked 140 // remove from queue and reject explicitly 141 unset($queue[$id]); 142 $reject(new \RuntimeException('Cancelled queued next handler')); 143 }); 144 145 // queue request and process queue if pending does not exceed limit 146 $queue[$id] = $deferred; 147 148 $pending = &$this->pending; 149 $that = $this; 150 return $deferred->promise()->then(function () use ($request, $next, $body, &$pending, $that) { 151 // invoke next request handler 152 ++$pending; 153 154 try { 155 $response = $next($request); 156 } catch (\Exception $e) { 157 $that->processQueue(); 158 throw $e; 159 } catch (\Throwable $e) { // @codeCoverageIgnoreStart 160 // handle Errors just like Exceptions (PHP 7+ only) 161 $that->processQueue(); 162 throw $e; // @codeCoverageIgnoreEnd 163 } 164 165 // resume readable stream and replay buffered events 166 if ($body instanceof PauseBufferStream) { 167 $body->resumeImplicit(); 168 } 169 170 // if the next handler returns a pending promise, we have to 171 // await its resolution before invoking next queued request 172 return $that->await(Promise\resolve($response)); 173 }); 174 } 175 176 /** 177 * @internal 178 * @param PromiseInterface $promise 179 * @return PromiseInterface 180 */ 181 public function await(PromiseInterface $promise) 182 { 183 $that = $this; 184 185 return $promise->then(function ($response) use ($that) { 186 $that->processQueue(); 187 188 return $response; 189 }, function ($error) use ($that) { 190 $that->processQueue(); 191 192 return Promise\reject($error); 193 }); 194 } 195 196 /** 197 * @internal 198 */ 199 public function processQueue() 200 { 201 // skip if we're still above concurrency limit or there's no queued request waiting 202 if (--$this->pending >= $this->limit || !$this->queue) { 203 return; 204 } 205 206 $first = \reset($this->queue); 207 unset($this->queue[key($this->queue)]); 208 209 $first->resolve(null); 210 } 211}