friendship ended with social-app. php is my new best friend
<?php

namespace React\EventLoop;

use React\EventLoop\Tick\FutureTickQueue;
use React\EventLoop\Timer\Timer;
use SplObjectStorage;

/**
 * An `ext-uv` based event loop.
 *
 * This loop uses the [`uv` PECL extension](https://pecl.php.net/package/uv),
 * that provides an interface to `libuv` library.
 * `libuv` itself supports a number of system-specific backends (epoll, kqueue).
 *
 * This loop is known to work with PHP 7+.
 *
 * @see https://github.com/bwoebi/php-uv
 */
final class ExtUvLoop implements LoopInterface
{
    /** @var resource loop handle created by uv_loop_new() */
    private $uv;

    /** @var FutureTickQueue callbacks registered through futureTick() */
    private $futureTickQueue;

    /** @var SplObjectStorage maps each Timer object to its uv timer handle */
    private $timers;

    /** @var array poll handles (or false when creation failed), keyed by (int) $stream */
    private $streamEvents = array();

    /** @var callable[] read listeners, keyed by (int) $stream */
    private $readStreams = array();

    /** @var callable[] write listeners, keyed by (int) $stream */
    private $writeStreams = array();

    /** @var bool true while run() should keep iterating */
    private $running = false;

    /** @var SignalsHandler registry of signal listeners */
    private $signals;

    /** @var array uv signal handles, keyed by signal number */
    private $signalEvents = array();

    /** @var callable single poll callback shared by every stream handle */
    private $streamListener;

    /**
     * @throws \BadMethodCallException when the ext-uv functions are not available
     */
    public function __construct()
    {
        if (!\function_exists('uv_loop_new')) {
            throw new \BadMethodCallException('Cannot create ExtUvLoop, ext-uv extension missing');
        }

        $this->uv = \uv_loop_new();
        $this->futureTickQueue = new FutureTickQueue();
        $this->timers = new SplObjectStorage();
        $this->streamListener = $this->createStreamListener();
        $this->signals = new SignalsHandler();
    }

    /**
     * Returns the underlying ext-uv event loop. (Internal ReactPHP use only.)
     *
     * @internal
     *
     * @return resource
     */
    public function getUvLoop()
    {
        return $this->uv;
    }

    /**
     * {@inheritdoc}
     */
    public function addReadStream($stream, $listener)
    {
        // only the first listener registered for a stream is kept
        if (isset($this->readStreams[(int) $stream])) {
            return;
        }

        $this->readStreams[(int) $stream] = $listener;
        $this->addStream($stream);
    }

    /**
     * {@inheritdoc}
     */
    public function addWriteStream($stream, $listener)
    {
        // only the first listener registered for a stream is kept
        if (isset($this->writeStreams[(int) $stream])) {
            return;
        }

        $this->writeStreams[(int) $stream] = $listener;
        $this->addStream($stream);
    }

    /**
     * {@inheritdoc}
     */
    public function removeReadStream($stream)
    {
        if (!isset($this->streamEvents[(int) $stream])) {
            return;
        }

        unset($this->readStreams[(int) $stream]);
        $this->removeStream($stream);
    }

    /**
     * {@inheritdoc}
     */
    public function removeWriteStream($stream)
    {
        if (!isset($this->streamEvents[(int) $stream])) {
            return;
        }

        unset($this->writeStreams[(int) $stream]);
        $this->removeStream($stream);
    }

    /**
     * {@inheritdoc}
     */
    public function addTimer($interval, $callback)
    {
        $timer = new Timer($interval, $callback, false);

        $that = $this;
        $timers = $this->timers;
        $callback = function () use ($timer, $timers, $that) {
            \call_user_func($timer->getCallback(), $timer);

            // one-shot timer: drop it after it fired, unless the callback already cancelled it
            if ($timers->contains($timer)) {
                $that->cancelTimer($timer);
            }
        };

        $event = \uv_timer_init($this->uv);
        $this->timers->attach($timer, $event);
        \uv_timer_start(
            $event,
            $this->convertFloatSecondsToMilliseconds($interval),
            0, // repeat of 0: fire once
            $callback
        );

        return $timer;
    }

    /**
     * {@inheritdoc}
     */
    public function addPeriodicTimer($interval, $callback)
    {
        $timer = new Timer($interval, $callback, true);

        $callback = function () use ($timer) {
            \call_user_func($timer->getCallback(), $timer);
        };

        $interval = $this->convertFloatSecondsToMilliseconds($interval);
        $event = \uv_timer_init($this->uv);
        $this->timers->attach($timer, $event);
        \uv_timer_start(
            $event,
            $interval,
            // libuv only repeats for a non-zero repeat value, so sub-millisecond intervals are raised to 1ms
            (int) $interval === 0 ? 1 : $interval,
            $callback
        );

        return $timer;
    }

    /**
     * {@inheritdoc}
     */
    public function cancelTimer(TimerInterface $timer)
    {
        if (isset($this->timers[$timer])) {
            // errors from stopping the handle are deliberately suppressed (best effort)
            @\uv_timer_stop($this->timers[$timer]);
            $this->timers->detach($timer);
        }
    }

    /**
     * {@inheritdoc}
     */
    public function futureTick($listener)
    {
        $this->futureTickQueue->add($listener);
    }

    /**
     * {@inheritdoc}
     */
    public function addSignal($signal, $listener)
    {
        $this->signals->add($signal, $listener);

        // a single uv signal handle per signal number dispatches to all registered listeners
        if (!isset($this->signalEvents[$signal])) {
            $signals = $this->signals;
            $this->signalEvents[$signal] = \uv_signal_init($this->uv);
            \uv_signal_start($this->signalEvents[$signal], function () use ($signals, $signal) {
                $signals->call($signal);
            }, $signal);
        }
    }

    /**
     * {@inheritdoc}
     */
    public function removeSignal($signal, $listener)
    {
        $this->signals->remove($signal, $listener);

        // stop watching the signal once its last listener is gone
        if (isset($this->signalEvents[$signal]) && $this->signals->count($signal) === 0) {
            \uv_signal_stop($this->signalEvents[$signal]);
            unset($this->signalEvents[$signal]);
        }
    }

    /**
     * {@inheritdoc}
     */
    public function run()
    {
        $this->running = true;

        while ($this->running) {
            $this->futureTickQueue->tick();

            $hasPendingCallbacks = !$this->futureTickQueue->isEmpty();
            $wasJustStopped = !$this->running;
            $nothingLeftToDo = !$this->readStreams
                && !$this->writeStreams
                && !$this->timers->count()
                && $this->signals->isEmpty();

            // Use UV::RUN_ONCE when there are only I/O events active in the loop and block until one of those triggers,
            // otherwise use UV::RUN_NOWAIT.
            // @link http://docs.libuv.org/en/v1.x/loop.html#c.uv_run
            $flags = \UV::RUN_ONCE;
            if ($wasJustStopped || $hasPendingCallbacks) {
                $flags = \UV::RUN_NOWAIT;
            } elseif ($nothingLeftToDo) {
                break;
            }

            \uv_run($this->uv, $flags);
        }
    }

    /**
     * {@inheritdoc}
     */
    public function stop()
    {
        $this->running = false;
    }

    /**
     * Ensures a poll handle exists for the stream and (re)starts polling it.
     *
     * @param resource $stream
     * @return void
     */
    private function addStream($stream)
    {
        if (!isset($this->streamEvents[(int) $stream])) {
            $this->streamEvents[(int) $stream] = \uv_poll_init_socket($this->uv, $stream);
        }

        // a false handle means no poll handle could be created, so there is nothing to poll
        if ($this->streamEvents[(int) $stream] !== false) {
            $this->pollStream($stream);
        }
    }

    /**
     * Releases the poll handle once neither a read nor a write listener is left,
     * otherwise re-polls with the remaining event flags.
     *
     * @param resource $stream
     * @return void
     */
    private function removeStream($stream)
    {
        $key = (int) $stream;
        if (!isset($this->streamEvents[$key])) {
            return;
        }

        $handle = $this->streamEvents[$key];
        $stillWatched = isset($this->readStreams[$key]) || isset($this->writeStreams[$key]);

        if ($handle === false) {
            // addStream() stored false instead of a handle: there is nothing to stop or close,
            // just forget the placeholder once no listener refers to the stream anymore
            if (!$stillWatched) {
                unset($this->streamEvents[$key]);
            }

            return;
        }

        if (!$stillWatched) {
            \uv_poll_stop($handle);
            \uv_close($handle);
            unset($this->streamEvents[$key]);

            return;
        }

        $this->pollStream($stream);
    }

    /**
     * (Re)starts polling the stream for the events that currently have a listener.
     *
     * @param resource $stream
     * @return void
     */
    private function pollStream($stream)
    {
        $key = (int) $stream;

        // skip unknown streams and streams for which no poll handle could be created
        if (!isset($this->streamEvents[$key]) || $this->streamEvents[$key] === false) {
            return;
        }

        $flags = 0;
        if (isset($this->readStreams[$key])) {
            $flags |= \UV::READABLE;
        }

        if (isset($this->writeStreams[$key])) {
            $flags |= \UV::WRITABLE;
        }

        \uv_poll_start($this->streamEvents[$key], $flags, $this->streamListener);
    }

    /**
     * Create a stream listener
     *
     * The returned closure is the poll callback for every stream handle; it
     * forwards readable/writable events to the listeners registered for the stream.
     *
     * @return callable Returns a callback
     */
    private function createStreamListener()
    {
        $callback = function ($event, $status, $events, $stream) {
            // libuv automatically stops polling on error, re-enable polling to match other loop implementations
            if ($status !== 0) {
                $this->pollStream($stream);

                // libuv may report no events on error, but this should still invoke stream listeners to report closed connections
                // re-enable both readable and writable, correct listeners will be checked below anyway
                if ($events === 0) {
                    $events = \UV::READABLE | \UV::WRITABLE;
                }
            }

            if (isset($this->readStreams[(int) $stream]) && ($events & \UV::READABLE)) {
                \call_user_func($this->readStreams[(int) $stream], $stream);
            }

            // re-check the registry: the read listener above may have removed the write listener
            if (isset($this->writeStreams[(int) $stream]) && ($events & \UV::WRITABLE)) {
                \call_user_func($this->writeStreams[(int) $stream], $stream);
            }
        };

        return $callback;
    }

    /**
     * Converts an interval in (fractional) seconds to whole milliseconds.
     *
     * Negative intervals are clamped to 0.
     *
     * @param float $interval
     * @return int
     * @throws \InvalidArgumentException when the interval is too large to be expressed in milliseconds
     */
    private function convertFloatSecondsToMilliseconds($interval)
    {
        if ($interval < 0) {
            return 0;
        }

        $maxValue = (int) (\PHP_INT_MAX / 1000);
        $intInterval = (int) $interval;

        // reject values whose integer cast came out non-positive although they exceed one second
        // (presumably a float too large for an int), and values that would overflow once multiplied by 1000
        if (($intInterval <= 0 && $interval > 1) || $intInterval >= $maxValue) {
            throw new \InvalidArgumentException(
                "Interval overflow, value must be lower than '{$maxValue}', but '{$interval}' passed."
            );
        }

        return (int) \floor($interval * 1000);
    }
}