friendship ended with social-app. php is my new best friend
1<?php
2
3namespace React\EventLoop;
4
5use React\EventLoop\Tick\FutureTickQueue;
6use React\EventLoop\Timer\Timer;
7use React\EventLoop\Timer\Timers;
8
/**
 * A `stream_select()` based event loop.
 *
 * This implementation is built on the
 * [`stream_select()`](https://www.php.net/manual/en/function.stream-select.php)
 * function and is the only implementation that works out of the box with PHP.
 *
 * This event loop works out of the box on PHP 5.4 through PHP 8+ and HHVM.
 * No installation is required, so this library works on all platforms and
 * supported PHP versions.
 * Accordingly, the [`Loop` class](#loop) and the deprecated [`Factory`](#factory)
 * will use this event loop by default if you do not install any of the event
 * loop extensions listed below.
 *
 * Under the hood, it does a simple `select` system call.
 * This system call is limited to the maximum file descriptor number of
 * `FD_SETSIZE` (platform dependent, commonly 1024) and scales with `O(m)`
 * (`m` being the maximum file descriptor number passed).
 * As a consequence, you may run into issues when handling thousands of
 * streams concurrently, and you may want to look into using one of the
 * alternative event loop implementations listed below in this case.
 * If your use case is among the many common use cases that involve handling
 * only dozens or a few hundred streams at once, then this event loop
 * implementation performs really well.
 *
 * If you want to use signal handling (see also [`addSignal()`](#addsignal) below),
 * this event loop implementation requires `ext-pcntl`.
 * This extension is only available for Unix-like platforms and does not
 * support Windows.
 * It is commonly installed as part of many PHP distributions.
 * If this extension is missing (or you're running on Windows), signal
 * handling is not supported and throws a `BadMethodCallException` instead.
 *
 * This event loop is known to rely on wall-clock time to schedule future
 * timers when using any version before PHP 7.3, because a monotonic time
 * source is only available as of PHP 7.3 (`hrtime()`).
 * While this does not affect many common use cases, this is an important
 * distinction for programs that rely on a high time precision or on systems
 * that are subject to discontinuous time adjustments (time jumps).
 * For example, if you schedule a timer to trigger in 30s on PHP < 7.3 and
 * then adjust your system time forward by 20s, the timer may trigger in 10s.
 * See also [`addTimer()`](#addtimer) for more details.
 *
 * @link https://www.php.net/manual/en/function.stream-select.php
 */
final class StreamSelectLoop implements LoopInterface
{
    /** @internal */
    const MICROSECONDS_PER_SECOND = 1000000;

    private $futureTickQueue;
    private $timers;
    private $readStreams = array();
    private $readListeners = array();
    private $writeStreams = array();
    private $writeListeners = array();
    private $running;
    private $pcntl = false;
    private $pcntlPoll = false;
    private $signals;

    /**
     * Set up the tick queue, timer registry and signal registry, and detect
     * which flavour of `ext-pcntl` signal delivery is available.
     */
    public function __construct()
    {
        $this->futureTickQueue = new FutureTickQueue();
        $this->timers = new Timers();
        $this->signals = new SignalsHandler();

        $hasPcntl = \function_exists('pcntl_signal') && \function_exists('pcntl_signal_dispatch');
        $hasAsyncSignals = \function_exists('pcntl_async_signals');

        $this->pcntl = $hasPcntl;
        // without async signals we have to dispatch pending signals manually on each tick
        $this->pcntlPoll = $hasPcntl && !$hasAsyncSignals;

        // prefer async signals whenever they are available (PHP 7.1+)
        if ($hasPcntl && $hasAsyncSignals) {
            \pcntl_async_signals(true);
        }
    }

    /**
     * Register a listener to be called when the stream is reported as readable.
     *
     * Streams are keyed by their integer resource id. If a read listener is
     * already registered under that id, this call does nothing.
     *
     * @param resource $stream
     * @param callable $listener Invoked with the stream as its only argument.
     */
    public function addReadStream($stream, $listener)
    {
        $id = (int) $stream;

        if (isset($this->readStreams[$id])) {
            return;
        }

        $this->readStreams[$id] = $stream;
        $this->readListeners[$id] = $listener;
    }

    /**
     * Register a listener to be called when the stream is reported as writable.
     *
     * Streams are keyed by their integer resource id. If a write listener is
     * already registered under that id, this call does nothing.
     *
     * @param resource $stream
     * @param callable $listener Invoked with the stream as its only argument.
     */
    public function addWriteStream($stream, $listener)
    {
        $id = (int) $stream;

        if (isset($this->writeStreams[$id])) {
            return;
        }

        $this->writeStreams[$id] = $stream;
        $this->writeListeners[$id] = $listener;
    }

    /**
     * Forget the read stream and its listener (no-op if it was never added).
     *
     * @param resource $stream
     */
    public function removeReadStream($stream)
    {
        $id = (int) $stream;

        unset($this->readStreams[$id]);
        unset($this->readListeners[$id]);
    }

    /**
     * Forget the write stream and its listener (no-op if it was never added).
     *
     * @param resource $stream
     */
    public function removeWriteStream($stream)
    {
        $id = (int) $stream;

        unset($this->writeStreams[$id]);
        unset($this->writeListeners[$id]);
    }

    /**
     * Schedule a one-off (non-periodic) timer.
     *
     * @param int|float $interval
     * @param callable  $callback
     *
     * @return Timer The newly registered timer, usable with cancelTimer().
     */
    public function addTimer($interval, $callback)
    {
        $oneShot = new Timer($interval, $callback, false);
        $this->timers->add($oneShot);

        return $oneShot;
    }

    /**
     * Schedule a periodic timer.
     *
     * @param int|float $interval
     * @param callable  $callback
     *
     * @return Timer The newly registered timer, usable with cancelTimer().
     */
    public function addPeriodicTimer($interval, $callback)
    {
        $periodic = new Timer($interval, $callback, true);
        $this->timers->add($periodic);

        return $periodic;
    }

    /**
     * Cancel a timer by handing it to the timer registry.
     *
     * @param TimerInterface $timer
     */
    public function cancelTimer(TimerInterface $timer)
    {
        $this->timers->cancel($timer);
    }

    /**
     * Queue a callback on the future-tick queue.
     *
     * @param callable $listener
     */
    public function futureTick($listener)
    {
        $this->futureTickQueue->add($listener);
    }

    /**
     * Register a listener for the given signal.
     *
     * The process-level handler is only installed when the first listener for
     * a signal is added; further listeners are just recorded in the registry.
     *
     * @param int      $signal
     * @param callable $listener
     *
     * @throws \BadMethodCallException if the required pcntl functions are unavailable
     */
    public function addSignal($signal, $listener)
    {
        if (false === $this->pcntl) {
            throw new \BadMethodCallException('Event loop feature "signals" isn\'t supported by the "StreamSelectLoop"');
        }

        $hadNoListeners = 0 === $this->signals->count($signal);
        $this->signals->add($signal, $listener);

        if ($hadNoListeners) {
            \pcntl_signal($signal, array($this->signals, 'call'));
        }
    }

    /**
     * Remove a listener for the given signal.
     *
     * Once the last listener for a signal is gone, the default signal
     * disposition (`SIG_DFL`) is restored.
     *
     * @param int      $signal
     * @param callable $listener
     */
    public function removeSignal($signal, $listener)
    {
        if ($this->signals->count($signal)) {
            $this->signals->remove($signal, $listener);

            if (0 === $this->signals->count($signal)) {
                \pcntl_signal($signal, \SIG_DFL);
            }
        }
    }

    /**
     * Run the loop until stop() is called or there is nothing left to wait for.
     *
     * Each iteration runs the future-tick queue and the timers, then decides
     * how long the subsequent stream wait may block.
     */
    public function run()
    {
        $this->running = true;

        while ($this->running) {
            $this->futureTickQueue->tick();
            $this->timers->tick();

            if (!($this->running && $this->futureTickQueue->isEmpty())) {
                // stopped, or future-tick callbacks are pending: only poll, never block
                $timeout = 0;
            } elseif ($nextDue = $this->timers->getFirst()) {
                // a timer is pending: block no longer than until it is due
                $remaining = $nextDue - $this->timers->getTime();

                if ($remaining < 0) {
                    $timeout = 0;
                } else {
                    // Convert float seconds to int microseconds and clamp to the
                    // maximum integer size; on 32bit systems this may cause the
                    // loop to tick once every ~35min.
                    $micros = $remaining * self::MICROSECONDS_PER_SECOND;
                    if ($micros > \PHP_INT_MAX) {
                        $timeout = \PHP_INT_MAX;
                    } else {
                        $timeout = (int) $micros;
                    }
                }
            } elseif (!$this->readStreams && !$this->writeStreams && $this->signals->isEmpty()) {
                // no streams, no signals, no timers, no ticks: nothing left to do
                break;
            } else {
                // only stream or signal activity can happen next, so wait forever
                $timeout = null;
            }

            $this->waitForStreamActivity($timeout);
        }
    }

    /**
     * Ask the loop to exit; run() checks this flag on its next evaluation.
     */
    public function stop()
    {
        $this->running = false;
    }

    /**
     * Wait/check for stream activity, or until the next timer is due, and
     * invoke the listeners of every stream that was reported as ready.
     *
     * @param integer|null $timeout Activity timeout in microseconds, or null to wait forever.
     */
    private function waitForStreamActivity($timeout)
    {
        // work on copies: streamSelect() narrows these down to the ready streams
        $readable = $this->readStreams;
        $writable = $this->writeStreams;

        $ready = $this->streamSelect($readable, $writable, $timeout);

        if ($this->pcntlPoll) {
            \pcntl_signal_dispatch();
        }

        if ($ready === false) {
            // the system call has been interrupted, so its outcome cannot be relied upon
            return;
        }

        // Listeners are looked up on every iteration because an earlier
        // callback may have removed a stream in the meantime.
        foreach ($readable as $stream) {
            $id = (int) $stream;

            if (!isset($this->readListeners[$id])) {
                continue;
            }

            \call_user_func($this->readListeners[$id], $stream);
        }

        foreach ($writable as $stream) {
            $id = (int) $stream;

            if (!isset($this->writeListeners[$id])) {
                continue;
            }

            \call_user_func($this->writeListeners[$id], $stream);
        }
    }

    /**
     * Emulate a stream_select() implementation that does not break when passed
     * empty stream arrays.
     *
     * @param array    $read    An array of read streams to select upon.
     * @param array    $write   An array of write streams to select upon.
     * @param int|null $timeout Activity timeout in microseconds, or null to wait forever.
     *
     * @return int|false The total number of streams that are ready for read/write.
     *     Can return false if stream_select() is interrupted by a signal.
     */
    private function streamSelect(array &$read, array &$write, $timeout)
    {
        if (!$read && !$write) {
            // nothing to select upon, so simply sleep for the requested duration
            if ($timeout > 0) {
                \usleep($timeout);
            } elseif ($timeout === null) {
                // wait forever (we only get here when we're only awaiting signals);
                // this may be interrupted and return earlier when a signal is received
                \sleep(PHP_INT_MAX);
            }

            return 0;
        }

        // We do not usually use or expose the `exceptfds` parameter of the underlying `select`.
        // However, unlike most other platforms, Windows does not report failed connection
        // attempts in `writefds`: it uses `writefds` only for successful connection attempts
        // and `exceptfds` for failed ones.
        // As a workaround, every socket that looks like a pending connection attempt is added
        // to `exceptfds` automatically on Windows and merged back into `$write` afterwards.
        // This keeps the public API consistent with other loop implementations on all platforms.
        // Lacking better APIs, every write-only socket that has not yet read any data is assumed
        // to be in a pending connection attempt state.
        // @link https://docs.microsoft.com/de-de/windows/win32/api/winsock2/nf-winsock2-select
        $except = null;
        if (\DIRECTORY_SEPARATOR === '\\') {
            $except = array();
            foreach ($write as $id => $socket) {
                if (isset($read[$id]) || @\ftell($socket) !== 0) {
                    continue;
                }

                $except[$id] = $socket;
            }
        }

        /** @var ?callable $previous */
        $previous = \set_error_handler(function ($errno, $errstr) use (&$previous) {
            // PHP defines `EINTR` through `ext-sockets` or `ext-pcntl`,
            // otherwise fall back to the common default (Linux & Mac)
            if (\defined('SOCKET_EINTR')) {
                $eintr = \SOCKET_EINTR;
            } elseif (\defined('PCNTL_EINTR')) {
                $eintr = \PCNTL_EINTR;
            } else {
                $eintr = 4;
            }

            // suppress the warning raised when `stream_select()` is interrupted by a signal
            if ($errno === \E_WARNING && \strpos($errstr, '[' . $eintr .']: ') !== false) {
                return;
            }

            // forward any other error to the previously registered handler, or print the warning
            if ($previous !== null) {
                return \call_user_func_array($previous, \func_get_args());
            }

            return false;
        });

        // The temporary error handler has to be removed on every exit path.
        // Both catch clauses are kept because `\Throwable` only exists as of PHP 7.
        try {
            $ready = \stream_select($read, $write, $except, $timeout === null ? null : 0, $timeout);
            \restore_error_handler();
        } catch (\Throwable $error) { // @codeCoverageIgnoreStart
            \restore_error_handler();
            throw $error;
        } catch (\Exception $error) {
            \restore_error_handler();
            throw $error;
        } // @codeCoverageIgnoreEnd

        // report failed connection attempts (Windows only) as writable streams
        if ($except) {
            $write = \array_merge($write, $except);
        }

        return $ready;
    }
}
330}