friendship ended with social-app. php is my new best friend
1<?php
2
3namespace React\EventLoop;
4
5use React\EventLoop\Tick\FutureTickQueue;
6use React\EventLoop\Timer\Timer;
7use SplObjectStorage;
8
/**
 * An `ext-uv` based event loop.
 *
 * This loop uses the [`uv` PECL extension](https://pecl.php.net/package/uv),
 * which provides an interface to the `libuv` library.
 * `libuv` itself supports a number of system-specific backends (epoll, kqueue).
 *
 * This loop is known to work with PHP 7+.
 *
 * @see https://github.com/bwoebi/php-uv
 */
final class ExtUvLoop implements LoopInterface
{
    /** @var mixed loop handle returned by `uv_loop_new()` */
    private $uv;

    /** @var FutureTickQueue callbacks queued via futureTick(), ticked at the start of every run() iteration */
    private $futureTickQueue;

    /** @var SplObjectStorage maps each registered timer object to its ext-uv timer handle */
    private $timers;

    /** @var array poll handles keyed by `(int) $stream`; an entry is `false` when `uv_poll_init_socket()` returned `false` */
    private $streamEvents = array();

    /** @var callable[] read listeners keyed by `(int) $stream` */
    private $readStreams = array();

    /** @var callable[] write listeners keyed by `(int) $stream` */
    private $writeStreams = array();

    /** @var bool|null set to `true` by run() and back to `false` by stop() */
    private $running;

    /** @var SignalsHandler registry of signal listeners */
    private $signals;

    /** @var array ext-uv signal handles keyed by signal number */
    private $signalEvents = array();

    /** @var callable single poll callback shared by every `uv_poll_start()` call */
    private $streamListener;

    /**
     * Creates a new loop backed by a fresh ext-uv loop handle.
     *
     * @throws \BadMethodCallException when the ext-uv functions are not available
     */
    public function __construct()
    {
        if (!\function_exists('uv_loop_new')) {
            throw new \BadMethodCallException('Cannot create ExtUvLoop, ext-uv extension missing');
        }

        $this->uv = \uv_loop_new();
        $this->futureTickQueue = new FutureTickQueue();
        $this->timers = new SplObjectStorage();
        $this->streamListener = $this->createStreamListener();
        $this->signals = new SignalsHandler();
    }

    /**
     * Returns the underlying ext-uv event loop. (Internal ReactPHP use only.)
     *
     * @internal
     *
     * @return resource
     */
    public function getUvLoop()
    {
        return $this->uv;
    }

    /**
     * {@inheritdoc}
     */
    public function addReadStream($stream, $listener)
    {
        $key = (int) $stream;

        // only the first read listener per stream is kept
        if (isset($this->readStreams[$key])) {
            return;
        }

        $this->readStreams[$key] = $listener;
        $this->addStream($stream);
    }

    /**
     * {@inheritdoc}
     */
    public function addWriteStream($stream, $listener)
    {
        $key = (int) $stream;

        // only the first write listener per stream is kept
        if (isset($this->writeStreams[$key])) {
            return;
        }

        $this->writeStreams[$key] = $listener;
        $this->addStream($stream);
    }

    /**
     * {@inheritdoc}
     */
    public function removeReadStream($stream)
    {
        $key = (int) $stream;

        if (!isset($this->streamEvents[$key])) {
            return;
        }

        unset($this->readStreams[$key]);
        $this->removeStream($stream);
    }

    /**
     * {@inheritdoc}
     */
    public function removeWriteStream($stream)
    {
        $key = (int) $stream;

        if (!isset($this->streamEvents[$key])) {
            return;
        }

        unset($this->writeStreams[$key]);
        $this->removeStream($stream);
    }

    /**
     * {@inheritdoc}
     *
     * @throws \InvalidArgumentException when the interval cannot be represented in milliseconds
     */
    public function addTimer($interval, $callback)
    {
        $timer = new Timer($interval, $callback, false);

        // Convert first: this may throw, and nothing must be registered in that case
        // (otherwise a never-started timer would be left behind in $this->timers).
        $timeout = $this->convertFloatSecondsToMilliseconds($interval);

        $fire = function () use ($timer) {
            \call_user_func($timer->getCallback(), $timer);

            // one-off timer: forget it after it fired; cancelTimer() does nothing
            // when the callback has already cancelled the timer itself
            $this->cancelTimer($timer);
        };

        $event = \uv_timer_init($this->uv);
        $this->timers[$timer] = $event;
        \uv_timer_start($event, $timeout, 0, $fire);

        return $timer;
    }

    /**
     * {@inheritdoc}
     *
     * @throws \InvalidArgumentException when the interval cannot be represented in milliseconds
     */
    public function addPeriodicTimer($interval, $callback)
    {
        $timer = new Timer($interval, $callback, true);

        $fire = function () use ($timer) {
            \call_user_func($timer->getCallback(), $timer);
        };

        $timeout = $this->convertFloatSecondsToMilliseconds($interval);

        // libuv only repeats a timer whose repeat value is non-zero,
        // so sub-millisecond intervals repeat every 1 ms instead of 0
        $repeat = $timeout === 0 ? 1 : $timeout;

        $event = \uv_timer_init($this->uv);
        $this->timers[$timer] = $event;
        \uv_timer_start($event, $timeout, $repeat, $fire);

        return $timer;
    }

    /**
     * {@inheritdoc}
     */
    public function cancelTimer(TimerInterface $timer)
    {
        if (isset($this->timers[$timer])) {
            // NOTE(review): errors are suppressed here, presumably because the handle
            // of a one-off timer that already fired is no longer active - confirm
            @\uv_timer_stop($this->timers[$timer]);
            unset($this->timers[$timer]);
        }
    }

    /**
     * {@inheritdoc}
     */
    public function futureTick($listener)
    {
        $this->futureTickQueue->add($listener);
    }

    /**
     * {@inheritdoc}
     */
    public function addSignal($signal, $listener)
    {
        $this->signals->add($signal, $listener);

        // one ext-uv signal handle per signal number, created on first use;
        // it dispatches to every listener registered for that signal
        if (!isset($this->signalEvents[$signal])) {
            $signals = $this->signals;
            $this->signalEvents[$signal] = \uv_signal_init($this->uv);
            \uv_signal_start($this->signalEvents[$signal], function () use ($signals, $signal) {
                $signals->call($signal);
            }, $signal);
        }
    }

    /**
     * {@inheritdoc}
     */
    public function removeSignal($signal, $listener)
    {
        $this->signals->remove($signal, $listener);

        // stop watching the signal once its last listener is gone
        if (isset($this->signalEvents[$signal]) && $this->signals->count($signal) === 0) {
            \uv_signal_stop($this->signalEvents[$signal]);
            unset($this->signalEvents[$signal]);
        }
    }

    /**
     * {@inheritdoc}
     */
    public function run()
    {
        $this->running = true;

        while ($this->running) {
            $this->futureTickQueue->tick();

            $hasPendingCallbacks = !$this->futureTickQueue->isEmpty();
            $wasJustStopped = !$this->running;
            $nothingLeftToDo = !$this->readStreams
                && !$this->writeStreams
                && !$this->timers->count()
                && $this->signals->isEmpty();

            // Use UV::RUN_ONCE when there are only I/O events active in the loop and block until one of those triggers,
            // otherwise use UV::RUN_NOWAIT.
            // @link http://docs.libuv.org/en/v1.x/loop.html#c.uv_run
            $flags = \UV::RUN_ONCE;
            if ($wasJustStopped || $hasPendingCallbacks) {
                $flags = \UV::RUN_NOWAIT;
            } elseif ($nothingLeftToDo) {
                break;
            }

            \uv_run($this->uv, $flags);
        }
    }

    /**
     * {@inheritdoc}
     */
    public function stop()
    {
        $this->running = false;
    }

    /**
     * Ensures a poll handle exists for the stream and (re)starts polling it.
     *
     * @param resource $stream
     * @return void
     */
    private function addStream($stream)
    {
        $key = (int) $stream;

        if (!isset($this->streamEvents[$key])) {
            $this->streamEvents[$key] = \uv_poll_init_socket($this->uv, $stream);
        }

        if ($this->streamEvents[$key] !== false) {
            $this->pollStream($stream);
        }
    }

    /**
     * Releases the poll handle when no listener is left for the stream,
     * otherwise restarts polling for the remaining listener.
     *
     * @param resource $stream
     * @return void
     */
    private function removeStream($stream)
    {
        $key = (int) $stream;

        if (!isset($this->streamEvents[$key])) {
            return;
        }

        if (!isset($this->readStreams[$key]) && !isset($this->writeStreams[$key])) {
            // addStream() may have stored `false` instead of a handle;
            // there is nothing to stop or close in that case
            if ($this->streamEvents[$key] !== false) {
                \uv_poll_stop($this->streamEvents[$key]);
                \uv_close($this->streamEvents[$key]);
            }

            unset($this->streamEvents[$key]);
            return;
        }

        $this->pollStream($stream);
    }

    /**
     * Starts polling the stream for exactly the events that currently have a listener.
     *
     * @param resource $stream
     * @return void
     */
    private function pollStream($stream)
    {
        $key = (int) $stream;

        // no entry, or `false` stored by addStream(): nothing that could be polled
        if (!isset($this->streamEvents[$key]) || $this->streamEvents[$key] === false) {
            return;
        }

        $flags = 0;
        if (isset($this->readStreams[$key])) {
            $flags |= \UV::READABLE;
        }

        if (isset($this->writeStreams[$key])) {
            $flags |= \UV::WRITABLE;
        }

        \uv_poll_start($this->streamEvents[$key], $flags, $this->streamListener);
    }

    /**
     * Create a stream listener
     *
     * The returned callback is passed to `uv_poll_start()` and forwards
     * readable/writable events to the listeners registered for the stream.
     *
     * @return callable Returns a callback
     */
    private function createStreamListener()
    {
        return function ($event, $status, $events, $stream) {
            // libuv automatically stops polling on error, re-enable polling to match other loop implementations
            if ($status !== 0) {
                $this->pollStream($stream);

                // libuv may report no events on error, but this should still invoke stream listeners to report closed connections
                // re-enable both readable and writable, correct listeners will be checked below anyway
                if ($events === 0) {
                    $events = \UV::READABLE | \UV::WRITABLE;
                }
            }

            // listeners are looked up again before each call, so a read listener
            // that removes the write listener prevents the write call below
            if (isset($this->readStreams[(int) $stream]) && ($events & \UV::READABLE)) {
                \call_user_func($this->readStreams[(int) $stream], $stream);
            }

            if (isset($this->writeStreams[(int) $stream]) && ($events & \UV::WRITABLE)) {
                \call_user_func($this->writeStreams[(int) $stream], $stream);
            }
        };
    }

    /**
     * Converts an interval in (fractional) seconds to whole milliseconds.
     *
     * Negative intervals yield 0; fractions of a millisecond are rounded down.
     *
     * @param float $interval
     * @return int
     * @throws \InvalidArgumentException when the interval is too large to be expressed in milliseconds
     */
    private function convertFloatSecondsToMilliseconds($interval)
    {
        if ($interval < 0) {
            return 0;
        }

        $maxValue = (int) (\PHP_INT_MAX / 1000);
        $intInterval = (int) $interval;

        // the first condition catches values whose integer cast did not come out positive although $interval > 1
        if (($intInterval <= 0 && $interval > 1) || $intInterval >= $maxValue) {
            throw new \InvalidArgumentException(
                "Interval overflow, value must be lower than '{$maxValue}', but '{$interval}' passed."
            );
        }

        return (int) \floor($interval * 1000);
    }
}