friendship ended with social-app. php is my new best friend
1<?php 2 3namespace React\EventLoop; 4 5use Ev; 6use EvIo; 7use EvLoop; 8use React\EventLoop\Tick\FutureTickQueue; 9use React\EventLoop\Timer\Timer; 10use SplObjectStorage; 11 12/** 13 * An `ext-ev` based event loop. 14 * 15 * This loop uses the [`ev` PECL extension](https://pecl.php.net/package/ev), 16 * that provides an interface to `libev` library. 17 * `libev` itself supports a number of system-specific backends (epoll, kqueue). 18 * 19 * This loop is known to work with PHP 5.4 through PHP 8+. 20 * 21 * @see http://php.net/manual/en/book.ev.php 22 * @see https://bitbucket.org/osmanov/pecl-ev/overview 23 */ 24class ExtEvLoop implements LoopInterface 25{ 26 /** 27 * @var EvLoop 28 */ 29 private $loop; 30 31 /** 32 * @var FutureTickQueue 33 */ 34 private $futureTickQueue; 35 36 /** 37 * @var SplObjectStorage 38 */ 39 private $timers; 40 41 /** 42 * @var EvIo[] 43 */ 44 private $readStreams = array(); 45 46 /** 47 * @var EvIo[] 48 */ 49 private $writeStreams = array(); 50 51 /** 52 * @var bool 53 */ 54 private $running; 55 56 /** 57 * @var SignalsHandler 58 */ 59 private $signals; 60 61 /** 62 * @var \EvSignal[] 63 */ 64 private $signalEvents = array(); 65 66 public function __construct() 67 { 68 $this->loop = new EvLoop(); 69 $this->futureTickQueue = new FutureTickQueue(); 70 $this->timers = new SplObjectStorage(); 71 $this->signals = new SignalsHandler(); 72 } 73 74 public function addReadStream($stream, $listener) 75 { 76 $key = (int)$stream; 77 78 if (isset($this->readStreams[$key])) { 79 return; 80 } 81 82 $callback = $this->getStreamListenerClosure($stream, $listener); 83 $event = $this->loop->io($stream, Ev::READ, $callback); 84 $this->readStreams[$key] = $event; 85 } 86 87 /** 88 * @param resource $stream 89 * @param callable $listener 90 * 91 * @return \Closure 92 */ 93 private function getStreamListenerClosure($stream, $listener) 94 { 95 return function () use ($stream, $listener) { 96 \call_user_func($listener, $stream); 97 }; 98 } 99 100 public function addWriteStream($stream, $listener) 101 { 102 $key = (int)$stream; 103 104 if (isset($this->writeStreams[$key])) { 105 return; 106 } 107 108 $callback = $this->getStreamListenerClosure($stream, $listener); 109 $event = $this->loop->io($stream, Ev::WRITE, $callback); 110 $this->writeStreams[$key] = $event; 111 } 112 113 public function removeReadStream($stream) 114 { 115 $key = (int)$stream; 116 117 if (!isset($this->readStreams[$key])) { 118 return; 119 } 120 121 $this->readStreams[$key]->stop(); 122 unset($this->readStreams[$key]); 123 } 124 125 public function removeWriteStream($stream) 126 { 127 $key = (int)$stream; 128 129 if (!isset($this->writeStreams[$key])) { 130 return; 131 } 132 133 $this->writeStreams[$key]->stop(); 134 unset($this->writeStreams[$key]); 135 } 136 137 public function addTimer($interval, $callback) 138 { 139 $timer = new Timer($interval, $callback, false); 140 141 $that = $this; 142 $timers = $this->timers; 143 $callback = function () use ($timer, $timers, $that) { 144 \call_user_func($timer->getCallback(), $timer); 145 146 if ($timers->contains($timer)) { 147 $that->cancelTimer($timer); 148 } 149 }; 150 151 $event = $this->loop->timer($timer->getInterval(), 0.0, $callback); 152 $this->timers->attach($timer, $event); 153 154 return $timer; 155 } 156 157 public function addPeriodicTimer($interval, $callback) 158 { 159 $timer = new Timer($interval, $callback, true); 160 161 $callback = function () use ($timer) { 162 \call_user_func($timer->getCallback(), $timer); 163 }; 164 165 $event = $this->loop->timer($timer->getInterval(), $timer->getInterval(), $callback); 166 $this->timers->attach($timer, $event); 167 168 return $timer; 169 } 170 171 public function cancelTimer(TimerInterface $timer) 172 { 173 if (!isset($this->timers[$timer])) { 174 return; 175 } 176 177 $event = $this->timers[$timer]; 178 $event->stop(); 179 $this->timers->detach($timer); 180 } 181 182 public function futureTick($listener) 183 { 184 $this->futureTickQueue->add($listener); 185 } 186 187 public function run() 188 { 189 $this->running = true; 190 191 while ($this->running) { 192 $this->futureTickQueue->tick(); 193 194 $hasPendingCallbacks = !$this->futureTickQueue->isEmpty(); 195 $wasJustStopped = !$this->running; 196 $nothingLeftToDo = !$this->readStreams 197 && !$this->writeStreams 198 && !$this->timers->count() 199 && $this->signals->isEmpty(); 200 201 $flags = Ev::RUN_ONCE; 202 if ($wasJustStopped || $hasPendingCallbacks) { 203 $flags |= Ev::RUN_NOWAIT; 204 } elseif ($nothingLeftToDo) { 205 break; 206 } 207 208 $this->loop->run($flags); 209 } 210 } 211 212 public function stop() 213 { 214 $this->running = false; 215 } 216 217 public function __destruct() 218 { 219 /** @var TimerInterface $timer */ 220 foreach ($this->timers as $timer) { 221 $this->cancelTimer($timer); 222 } 223 224 foreach ($this->readStreams as $key => $stream) { 225 $this->removeReadStream($key); 226 } 227 228 foreach ($this->writeStreams as $key => $stream) { 229 $this->removeWriteStream($key); 230 } 231 } 232 233 public function addSignal($signal, $listener) 234 { 235 $this->signals->add($signal, $listener); 236 237 if (!isset($this->signalEvents[$signal])) { 238 $this->signalEvents[$signal] = $this->loop->signal($signal, function() use ($signal) { 239 $this->signals->call($signal); 240 }); 241 } 242 } 243 244 public function removeSignal($signal, $listener) 245 { 246 $this->signals->remove($signal, $listener); 247 248 if (isset($this->signalEvents[$signal])) { 249 $this->signalEvents[$signal]->stop(); 250 unset($this->signalEvents[$signal]); 251 } 252 } 253}