friendship ended with social-app. php is my new best friend
at main 12 kB view raw
<?php

declare(strict_types=1);

namespace Fetch\Concerns;

use Fetch\Interfaces\ClientHandler;
use InvalidArgumentException;
use Matrix\Exceptions\TimeoutException;
use React\Promise\PromiseInterface;
use RuntimeException;
use Throwable;

/**
 * Promise-related helpers for a client handler: toggling async mode,
 * awaiting promises (optionally with a timeout), combining promises and
 * mapping items through a promise-returning callback with bounded concurrency.
 *
 * The unqualified async(), await(), all(), race(), any(), resolve(), reject()
 * and timeout() calls are not imported in this file; presumably they are
 * functions supplied by the Matrix library — TODO confirm.
 *
 * The using class must provide sendAsync(), which then(), catch() and
 * finally() call to obtain the promise for the request.
 */
trait ManagesPromises
{
    /**
     * Whether the request should be asynchronous or not.
     */
    protected bool $isAsync = false;

    /**
     * Set the request to be asynchronous or not.
     *
     * @param  bool|null  $async  Whether to execute the request asynchronously; null is treated as true
     * @return $this
     */
    public function async(?bool $async = true): ClientHandler
    {
        $this->isAsync = $async ?? true;

        return $this;
    }

    /**
     * Check if the request will be executed asynchronously.
     *
     * @return bool Whether the request is asynchronous
     */
    public function isAsync(): bool
    {
        return $this->isAsync;
    }

    /**
     * Wrap a callable to run asynchronously and return a promise.
     *
     * @param  callable  $callable  The callable to execute asynchronously
     * @return PromiseInterface The promise that will resolve with the result
     */
    public function wrapAsync(callable $callable): PromiseInterface
    {
        return async($callable);
    }

    /**
     * Wait for a promise to resolve and return its value.
     *
     * @param  PromiseInterface  $promise  The promise to wait for
     * @param  float|null  $timeout  Optional timeout in seconds; null waits without a timeout
     * @return mixed The resolved value
     *
     * @throws RuntimeException If a timeout was given and the promise did not settle in time
     * @throws Throwable If the promise is rejected
     */
    public function awaitPromise(PromiseInterface $promise, ?float $timeout = null): mixed
    {
        // Exceptions propagate unchanged; no catch-and-rethrow is needed here.
        if ($timeout !== null) {
            return $this->awaitWithTimeout($promise, $timeout);
        }

        return await($promise);
    }

    /**
     * Execute multiple promises concurrently and wait for all to complete.
     *
     * @param  array<PromiseInterface>  $promises  Array of promises
     * @return PromiseInterface Promise that resolves with array of results
     *
     * @throws InvalidArgumentException If any item is not a PromiseInterface
     */
    public function all(array $promises): PromiseInterface
    {
        $this->validatePromises($promises);

        return all($promises);
    }

    /**
     * Execute multiple promises concurrently and return the first to complete.
     *
     * @param  array<PromiseInterface>  $promises  Array of promises
     * @return PromiseInterface Promise that resolves with the first result
     *
     * @throws InvalidArgumentException If any item is not a PromiseInterface
     */
    public function race(array $promises): PromiseInterface
    {
        $this->validatePromises($promises);

        return race($promises);
    }

    /**
     * Execute multiple promises concurrently and return the first to succeed.
     *
     * @param  array<PromiseInterface>  $promises  Array of promises
     * @return PromiseInterface Promise that resolves with the first successful result
     *
     * @throws InvalidArgumentException If any item is not a PromiseInterface
     */
    public function any(array $promises): PromiseInterface
    {
        $this->validatePromises($promises);

        return any($promises);
    }

    /**
     * Execute multiple promises in sequence.
     *
     * Each callable is only invoked after the promise returned by the
     * previous one has fulfilled.
     *
     * @param  array<callable(): PromiseInterface>  $callables  Array of callables that return promises
     * @return PromiseInterface Promise that resolves with array of results
     */
    public function sequence(array $callables): PromiseInterface
    {
        return $this->executeSequence($callables, []);
    }

    /**
     * Send the request asynchronously and attach fulfilment/rejection callbacks.
     *
     * Switches the handler into async mode as a side effect.
     *
     * @param  callable  $onFulfilled  Callback for success
     * @param  callable|null  $onRejected  Callback for rejection
     * @return PromiseInterface The promise
     */
    public function then(callable $onFulfilled, ?callable $onRejected = null): PromiseInterface
    {
        // Make sure we're in async mode
        $this->async();

        // Create a promise from the next request
        $promise = $this->sendAsync();

        // Add callbacks
        return $promise->then($onFulfilled, $onRejected);
    }

    /**
     * Send the request asynchronously and attach a rejection callback.
     *
     * Switches the handler into async mode as a side effect.
     *
     * @param  callable  $onRejected  Callback for rejection
     * @return PromiseInterface The promise
     */
    public function catch(callable $onRejected): PromiseInterface
    {
        // Make sure we're in async mode
        $this->async();

        // Create a promise from the next request
        $promise = $this->sendAsync();

        // NOTE(review): otherwise() is deprecated in react/promise 3.x in favour
        // of catch(); kept as-is — confirm the targeted version before switching.
        return $promise->otherwise($onRejected);
    }

    /**
     * Send the request asynchronously and attach a callback that runs when the promise settles.
     *
     * Switches the handler into async mode as a side effect.
     *
     * @param  callable  $onFinally  Callback for completion
     * @return PromiseInterface The promise
     */
    public function finally(callable $onFinally): PromiseInterface
    {
        // Make sure we're in async mode
        $this->async();

        // Create a promise from the next request
        $promise = $this->sendAsync();

        // NOTE(review): always() is deprecated in react/promise 3.x in favour
        // of finally(); kept as-is — confirm the targeted version before switching.
        return $promise->always($onFinally);
    }

    /**
     * Create a resolved promise with the given value.
     *
     * @param  mixed  $value  The value to resolve with
     * @return PromiseInterface The resolved promise
     */
    public function resolve(mixed $value): PromiseInterface
    {
        return resolve($value);
    }

    /**
     * Create a rejected promise with the given reason.
     *
     * A string reason is wrapped in a RuntimeException; any other value is
     * passed through unchanged.
     *
     * @param  mixed  $reason  The reason for rejection
     * @return PromiseInterface The rejected promise
     */
    public function reject(mixed $reason): PromiseInterface
    {
        return reject(is_string($reason) ? new RuntimeException($reason) : $reason);
    }

    /**
     * Map an array of items through an async callback.
     *
     * The callback is always invoked as $callback($item, $key), regardless of
     * whether the items are processed all at once or with bounded concurrency.
     *
     * @param  array<mixed>  $items  Items to process
     * @param  callable  $callback  Callback receiving (item, key) that returns a promise
     * @param  int  $concurrency  Maximum number of concurrent promises
     * @return PromiseInterface Promise that resolves with array of results
     *
     * @throws InvalidArgumentException If $items is non-empty and $concurrency is not positive
     */
    public function map(array $items, callable $callback, int $concurrency = 5): PromiseInterface
    {
        // An empty input short-circuits before the concurrency check, so
        // map([], $cb, 0) resolves instead of throwing.
        if ($items === []) {
            return resolve([]);
        }

        if ($concurrency <= 0) {
            throw new InvalidArgumentException('Concurrency must be greater than 0');
        }

        // The limit cannot be hit: start everything at once.
        if ($concurrency >= count($items)) {
            $promises = [];

            foreach ($items as $key => $item) {
                $promises[$key] = $callback($item, $key);
            }

            return $this->all($promises);
        }

        // Otherwise keep at most $concurrency callbacks in flight.
        return $this->mapBatched($items, $callback, $concurrency);
    }

    /**
     * Wait for a promise with a timeout.
     *
     * @param  PromiseInterface  $promise  The promise to wait for
     * @param  float  $timeout  Timeout in seconds
     * @return mixed The resolved value
     *
     * @throws RuntimeException If the promise times out
     * @throws Throwable If the promise is rejected
     */
    protected function awaitWithTimeout(PromiseInterface $promise, float $timeout): mixed
    {
        try {
            return await(timeout($promise, $timeout));
        } catch (TimeoutException $e) {
            // Re-throw as RuntimeException, keeping the original as the previous exception.
            throw new RuntimeException("Promise timed out after {$timeout} seconds", 0, $e);
        }
    }

    /**
     * Create a promise that will reject after a timeout.
     *
     * The wrapped closure sleeps with usleep() for the whole timeout before
     * throwing. NOTE(review): whether this blocks the caller depends on how
     * async() runs the closure — confirm.
     *
     * @param  float  $timeout  Timeout in seconds
     * @return PromiseInterface The timeout promise
     */
    protected function createTimeoutPromise(float $timeout): PromiseInterface
    {
        return async(function () use ($timeout) {
            $timeoutMicro = (int) ($timeout * 1000000);
            usleep($timeoutMicro);
            throw new RuntimeException("Promise timed out after {$timeout} seconds");
        });
    }

    /**
     * Execute promises in sequence recursively.
     *
     * @param  array<callable(): PromiseInterface>  $callables  Array of callables
     * @param  array<mixed>  $results  Results collected so far
     * @return PromiseInterface Promise that resolves with array of results
     *
     * @throws RuntimeException If a callable does not return a PromiseInterface
     */
    protected function executeSequence(array $callables, array $results): PromiseInterface
    {
        // Nothing left to run: resolve with everything collected so far.
        if ($callables === []) {
            return resolve($results);
        }

        $callable = array_shift($callables);
        $promise = $callable();

        // Fail with a clear message instead of a fatal "call to then() on ..." error.
        if (! $promise instanceof PromiseInterface) {
            throw new RuntimeException('Callable must return a Promise');
        }

        // Only start the next callable once this one has fulfilled.
        return $promise->then(
            function ($result) use ($callables, $results) {
                $results[] = $result;

                return $this->executeSequence($callables, $results);
            }
        );
    }

    /**
     * Validate that all items in the array are promises.
     *
     * @param  array<mixed>  $promises  Array to validate
     *
     * @throws InvalidArgumentException If any item is not a PromiseInterface
     */
    protected function validatePromises(array $promises): void
    {
        foreach ($promises as $index => $promise) {
            if (! $promise instanceof PromiseInterface) {
                // %s (not %d) so that string keys are reported as-is rather than as 0.
                throw new InvalidArgumentException(
                    sprintf(
                        'Item at index %s is not a promise. Expected %s, got %s',
                        $index,
                        PromiseInterface::class,
                        get_debug_type($promise)
                    )
                );
            }
        }
    }

    /**
     * Process items with controlled concurrency.
     *
     * Starts up to $concurrency "lanes". Each lane claims the next unprocessed
     * item, invokes the callback, and claims another item once its promise
     * fulfils. The returned promise resolves only after every lane has run out
     * of items, with results keyed like $items and in the same order. If any
     * callback promise rejects, the returned promise rejects with that reason.
     *
     * @param  array<mixed>  $items  Items to process
     * @param  callable  $callback  Callback receiving (item, key) that returns a promise
     * @param  int  $concurrency  Maximum number of concurrent promises
     * @return PromiseInterface Promise that resolves with array of results
     *
     * @throws RuntimeException If the callback does not return a PromiseInterface
     *                          for one of the initially started items
     */
    protected function mapBatched(array $items, callable $callback, int $concurrency): PromiseInterface
    {
        $keys = array_keys($items);
        $total = count($keys);
        $cursor = 0;
        $results = [];

        $runLane = function () use (&$runLane, &$cursor, &$results, $keys, $total, $items, $callback): PromiseInterface {
            if ($cursor >= $total) {
                return resolve(null);
            }

            // Claim the item BEFORE invoking the callback: if the returned promise
            // is already settled, the handler below may run synchronously and
            // re-enter this closure, which must then see the advanced cursor.
            $key = $keys[$cursor++];
            $promise = $callback($items[$key], $key);

            if (! $promise instanceof PromiseInterface) {
                throw new RuntimeException('Callback must return a Promise');
            }

            return $promise->then(
                function ($result) use ($key, &$results, &$runLane) {
                    $results[$key] = $result;

                    // Returning the next promise chains it into this lane, so the
                    // lane settles only when all the items it claimed are done.
                    return $runLane();
                }
            );
        };

        $lanes = [];
        $laneCount = min($concurrency, $total);

        for ($lane = 0; $lane < $laneCount; $lane++) {
            $lanes[] = $runLane();
        }

        return all($lanes)->then(
            static function () use (&$results, $keys): array {
                // Results were stored in completion order; rebuild in input order.
                $ordered = [];

                foreach ($keys as $key) {
                    $ordered[$key] = $results[$key];
                }

                return $ordered;
            }
        );
    }
}