friendship ended with social-app. php is my new best friend
1<?php
2
3declare(strict_types=1);
4
5namespace Fetch\Concerns;
6
7use Fetch\Interfaces\ClientHandler;
8use InvalidArgumentException;
9use Matrix\Exceptions\TimeoutException;
10use React\Promise\PromiseInterface;
11use RuntimeException;
12use Throwable;
13
/**
 * Promise helpers for a request handler: toggling async mode, awaiting,
 * combining promises, and mapping items with bounded concurrency.
 *
 * NOTE(review): the helper functions async(), await(), all(), race(), any(),
 * timeout(), resolve() and reject() are called unqualified and are not
 * defined in this trait — confirm where they are loaded from. The consuming
 * class must also provide sendAsync(), which then()/catch()/finally() call.
 */
trait ManagesPromises
{
    /**
     * Whether the request should be asynchronous or not.
     */
    protected bool $isAsync = false;

    /**
     * Switch asynchronous mode on or off.
     *
     * @param  bool|null  $async  Desired mode; null is treated as true
     * @return $this
     */
    public function async(?bool $async = true): ClientHandler
    {
        $this->isAsync = $async ?? true;

        return $this;
    }

    /**
     * Report whether asynchronous mode is currently enabled.
     *
     * @return bool The current value of the async flag
     */
    public function isAsync(): bool
    {
        return $this->isAsync;
    }

    /**
     * Hand a callable to the async() helper and return the promise it produces.
     *
     * @param  callable  $callable  The callable to execute asynchronously
     * @return PromiseInterface Promise returned by async()
     */
    public function wrapAsync(callable $callable): PromiseInterface
    {
        return async($callable);
    }

    /**
     * Wait for a promise to settle and return its value.
     *
     * @param  PromiseInterface  $promise  The promise to wait for
     * @param  float|null  $timeout  Optional timeout in seconds; null waits without one
     * @return mixed The resolved value
     *
     * @throws RuntimeException If a timeout was given and it elapsed
     * @throws Throwable Whatever await() throws for the given promise
     */
    public function awaitPromise(PromiseInterface $promise, ?float $timeout = null): mixed
    {
        // Exceptions propagate unchanged; no catch-and-rethrow is needed.
        return $timeout === null
            ? await($promise)
            : $this->awaitWithTimeout($promise, $timeout);
    }

    /**
     * Combine promises with the all() helper.
     *
     * @param  array<PromiseInterface>  $promises  Array of promises
     * @return PromiseInterface Promise returned by all()
     *
     * @throws InvalidArgumentException If any element is not a PromiseInterface
     */
    public function all(array $promises): PromiseInterface
    {
        $this->validatePromises($promises);

        return all($promises);
    }

    /**
     * Combine promises with the race() helper.
     *
     * @param  array<PromiseInterface>  $promises  Array of promises
     * @return PromiseInterface Promise returned by race()
     *
     * @throws InvalidArgumentException If any element is not a PromiseInterface
     */
    public function race(array $promises): PromiseInterface
    {
        $this->validatePromises($promises);

        return race($promises);
    }

    /**
     * Combine promises with the any() helper.
     *
     * @param  array<PromiseInterface>  $promises  Array of promises
     * @return PromiseInterface Promise returned by any()
     *
     * @throws InvalidArgumentException If any element is not a PromiseInterface
     */
    public function any(array $promises): PromiseInterface
    {
        $this->validatePromises($promises);

        return any($promises);
    }

    /**
     * Run promise-returning callables one after another.
     *
     * @param  array<callable(): PromiseInterface>  $callables  Callables invoked in array order
     * @return PromiseInterface Promise that resolves with the collected results
     */
    public function sequence(array $callables): PromiseInterface
    {
        return $this->executeSequence($callables, []);
    }

    /**
     * Enable async mode, call sendAsync(), and attach handlers to its promise.
     *
     * The async flag stays enabled afterwards, and every call invokes
     * sendAsync() anew.
     *
     * @param  callable  $onFulfilled  Callback for success
     * @param  callable|null  $onRejected  Callback for rejection
     * @return PromiseInterface The promise returned by then()
     */
    public function then(callable $onFulfilled, ?callable $onRejected = null): PromiseInterface
    {
        $this->async();

        return $this->sendAsync()->then($onFulfilled, $onRejected);
    }

    /**
     * Enable async mode, call sendAsync(), and attach a rejection handler.
     *
     * NOTE(review): otherwise() is not part of every PromiseInterface version
     * (ReactPHP v3 deprecates it in favour of catch()) — confirm against the
     * installed promise library before changing it.
     *
     * @param  callable  $onRejected  Callback for rejection
     * @return PromiseInterface The promise returned by otherwise()
     */
    public function catch(callable $onRejected): PromiseInterface
    {
        $this->async();

        return $this->sendAsync()->otherwise($onRejected);
    }

    /**
     * Enable async mode, call sendAsync(), and attach a settle handler.
     *
     * NOTE(review): always() is not part of every PromiseInterface version
     * (ReactPHP v3 deprecates it in favour of finally()) — confirm against the
     * installed promise library before changing it.
     *
     * @param  callable  $onFinally  Callback for completion
     * @return PromiseInterface The promise returned by always()
     */
    public function finally(callable $onFinally): PromiseInterface
    {
        $this->async();

        return $this->sendAsync()->always($onFinally);
    }

    /**
     * Create a promise via the resolve() helper.
     *
     * @param  mixed  $value  The value to resolve with
     * @return PromiseInterface Promise returned by resolve()
     */
    public function resolve(mixed $value): PromiseInterface
    {
        return resolve($value);
    }

    /**
     * Create a rejected promise via the reject() helper.
     *
     * String reasons are wrapped in a RuntimeException; any other value is
     * passed to reject() unchanged.
     *
     * @param  mixed  $reason  The reason for rejection
     * @return PromiseInterface Promise returned by reject()
     */
    public function reject(mixed $reason): PromiseInterface
    {
        if (is_string($reason)) {
            $reason = new RuntimeException($reason);
        }

        return reject($reason);
    }

    /**
     * Map items through a promise-returning callback with bounded concurrency.
     *
     * The callback is always invoked as $callback($item, $key), whichever
     * execution path is taken, and result keys mirror the input keys.
     *
     * @param  array<mixed>  $items  Items to process
     * @param  callable  $callback  Receives ($item, $key) and must return a promise
     * @param  int  $concurrency  Maximum number of promises in flight at once
     * @return PromiseInterface Promise that resolves with the array of results
     *
     * @throws InvalidArgumentException If $items is non-empty and $concurrency is below 1,
     *                                  or (unbatched path) a callback result is not a promise
     */
    public function map(array $items, callable $callback, int $concurrency = 5): PromiseInterface
    {
        // Empty input short-circuits before the concurrency check (kept for
        // backward compatibility).
        if ($items === []) {
            return resolve([]);
        }

        if ($concurrency <= 0) {
            throw new InvalidArgumentException('Concurrency must be greater than 0');
        }

        // Everything fits inside the concurrency limit: start all callbacks at once.
        if ($concurrency >= count($items)) {
            $promises = [];

            // A foreach (rather than array_map) lets us pass the key as the
            // second argument, matching mapBatched(), while keeping the keys.
            foreach ($items as $key => $item) {
                $promises[$key] = $callback($item, $key);
            }

            return $this->all($promises);
        }

        return $this->mapBatched($items, $callback, $concurrency);
    }

    /**
     * Await a promise through the timeout() helper.
     *
     * @param  PromiseInterface  $promise  The promise to wait for
     * @param  float  $timeout  Timeout in seconds
     * @return mixed The resolved value
     *
     * @throws RuntimeException If a TimeoutException is raised while waiting
     * @throws Throwable Any other exception raised while waiting
     */
    protected function awaitWithTimeout(PromiseInterface $promise, float $timeout): mixed
    {
        try {
            return await(timeout($promise, $timeout));
        } catch (TimeoutException $e) {
            // Re-wrap so callers see a RuntimeException; the original is kept as "previous".
            throw new RuntimeException("Promise timed out after {$timeout} seconds", 0, $e);
        }
    }

    /**
     * Create a promise whose task sleeps for the timeout and then throws.
     *
     * NOTE(review): usleep() suspends execution for the whole duration; whether
     * that stalls other work depends on how async() schedules the closure — confirm.
     *
     * @param  float  $timeout  Timeout in seconds
     * @return PromiseInterface Promise returned by async() for the sleeping task
     */
    protected function createTimeoutPromise(float $timeout): PromiseInterface
    {
        return async(function () use ($timeout) {
            $timeoutMicro = (int) ($timeout * 1000000);
            usleep($timeoutMicro);
            throw new RuntimeException("Promise timed out after {$timeout} seconds");
        });
    }

    /**
     * Recursive worker behind sequence(): run the first callable, then the rest.
     *
     * @param  array<callable(): PromiseInterface>  $callables  Callables still to run
     * @param  array<mixed>  $results  Results collected so far
     * @return PromiseInterface Promise that resolves with the array of results
     */
    protected function executeSequence(array $callables, array $results): PromiseInterface
    {
        // Nothing left to run: hand back what has been collected.
        if ($callables === []) {
            return resolve($results);
        }

        $current = array_shift($callables);

        // Only once the current promise fulfils do we recurse into the remainder.
        return $current()->then(
            function ($value) use ($callables, $results) {
                $results[] = $value;

                return $this->executeSequence($callables, $results);
            }
        );
    }

    /**
     * Ensure every element of the array is a PromiseInterface.
     *
     * @param  array<mixed>  $promises  Array to validate
     *
     * @throws InvalidArgumentException On the first element that is not a PromiseInterface
     */
    protected function validatePromises(array $promises): void
    {
        foreach ($promises as $index => $promise) {
            if ($promise instanceof PromiseInterface) {
                continue;
            }

            throw new InvalidArgumentException(
                sprintf(
                    'Item at index %d is not a promise. Expected %s, got %s',
                    $index,
                    PromiseInterface::class,
                    get_debug_type($promise)
                )
            );
        }
    }

    /**
     * Process items with a sliding window of at most $concurrency promises.
     *
     * Up to $concurrency worker chains are started. Each one claims the next
     * unprocessed item, invokes $callback($item, $key), and claims another
     * item once that promise fulfils. The returned promise settles only after
     * every worker chain has finished, and results follow the input key order.
     * After the first failure no further items are started.
     *
     * @param  array<mixed>  $items  Items to process
     * @param  callable  $callback  Receives ($item, $key) and must return a promise
     * @param  int  $concurrency  Maximum number of promises in flight at once
     * @return PromiseInterface Promise that resolves with the array of results
     *
     * @throws InvalidArgumentException If $concurrency is below 1
     * @throws RuntimeException If a callback invoked synchronously here does not return
     *                          a promise (inside a handler this becomes a rejection)
     */
    protected function mapBatched(array $items, callable $callback, int $concurrency): PromiseInterface
    {
        if ($concurrency <= 0) {
            throw new InvalidArgumentException('Concurrency must be greater than 0');
        }

        $keys = array_keys($items);
        $total = count($keys);
        $cursor = 0;
        $results = [];
        $aborted = false;

        $worker = static function () use (
            &$worker,
            &$cursor,
            &$results,
            &$aborted,
            $keys,
            $total,
            $items,
            $callback
        ): PromiseInterface {
            // Queue drained, or another worker failed: this chain is done.
            if ($aborted || $cursor >= $total) {
                return resolve(null);
            }

            // Claim the item BEFORE invoking the callback: if its promise is
            // already settled, the handler below may run synchronously and
            // re-enter this closure, which must then see the advanced cursor.
            $key = $keys[$cursor++];

            try {
                $promise = $callback($items[$key], $key);

                if (! $promise instanceof PromiseInterface) {
                    throw new RuntimeException('Callback must return a Promise');
                }
            } catch (Throwable $e) {
                // Stop the other workers from starting new items.
                $aborted = true;

                throw $e;
            }

            return $promise->then(
                static function ($result) use ($key, &$results, &$worker) {
                    $results[$key] = $result;

                    // Returning the next step keeps this chain pending until
                    // the worker has no more items to claim.
                    return $worker();
                },
                static function ($reason) use (&$aborted) {
                    $aborted = true;

                    return reject($reason); // Propagate the rejection
                }
            );
        };

        $workers = [];
        $workerCount = min($concurrency, $total);

        for ($n = 0; $n < $workerCount; $n++) {
            $workers[] = $worker();
        }

        // $results is filled in completion order; rebuild it in input order.
        return all($workers)->then(
            static function () use (&$results, $keys): array {
                $ordered = [];

                foreach ($keys as $key) {
                    $ordered[$key] = $results[$key];
                }

                return $ordered;
            }
        );
    }
}
385}