friendship ended with social-app. php is my new best friend
1<?php
2
3namespace React\EventLoop;
4
5use Ev;
6use EvIo;
7use EvLoop;
8use React\EventLoop\Tick\FutureTickQueue;
9use React\EventLoop\Timer\Timer;
10use SplObjectStorage;
11
12/**
13 * An `ext-ev` based event loop.
14 *
15 * This loop uses the [`ev` PECL extension](https://pecl.php.net/package/ev),
16 * that provides an interface to `libev` library.
17 * `libev` itself supports a number of system-specific backends (epoll, kqueue).
18 *
19 * This loop is known to work with PHP 5.4 through PHP 8+.
20 *
21 * @see http://php.net/manual/en/book.ev.php
22 * @see https://bitbucket.org/osmanov/pecl-ev/overview
23 */
24class ExtEvLoop implements LoopInterface
25{
26 /**
27 * @var EvLoop
28 */
29 private $loop;
30
31 /**
32 * @var FutureTickQueue
33 */
34 private $futureTickQueue;
35
36 /**
37 * @var SplObjectStorage
38 */
39 private $timers;
40
41 /**
42 * @var EvIo[]
43 */
44 private $readStreams = array();
45
46 /**
47 * @var EvIo[]
48 */
49 private $writeStreams = array();
50
51 /**
52 * @var bool
53 */
54 private $running;
55
56 /**
57 * @var SignalsHandler
58 */
59 private $signals;
60
61 /**
62 * @var \EvSignal[]
63 */
64 private $signalEvents = array();
65
66 public function __construct()
67 {
68 $this->loop = new EvLoop();
69 $this->futureTickQueue = new FutureTickQueue();
70 $this->timers = new SplObjectStorage();
71 $this->signals = new SignalsHandler();
72 }
73
74 public function addReadStream($stream, $listener)
75 {
76 $key = (int)$stream;
77
78 if (isset($this->readStreams[$key])) {
79 return;
80 }
81
82 $callback = $this->getStreamListenerClosure($stream, $listener);
83 $event = $this->loop->io($stream, Ev::READ, $callback);
84 $this->readStreams[$key] = $event;
85 }
86
87 /**
88 * @param resource $stream
89 * @param callable $listener
90 *
91 * @return \Closure
92 */
93 private function getStreamListenerClosure($stream, $listener)
94 {
95 return function () use ($stream, $listener) {
96 \call_user_func($listener, $stream);
97 };
98 }
99
100 public function addWriteStream($stream, $listener)
101 {
102 $key = (int)$stream;
103
104 if (isset($this->writeStreams[$key])) {
105 return;
106 }
107
108 $callback = $this->getStreamListenerClosure($stream, $listener);
109 $event = $this->loop->io($stream, Ev::WRITE, $callback);
110 $this->writeStreams[$key] = $event;
111 }
112
113 public function removeReadStream($stream)
114 {
115 $key = (int)$stream;
116
117 if (!isset($this->readStreams[$key])) {
118 return;
119 }
120
121 $this->readStreams[$key]->stop();
122 unset($this->readStreams[$key]);
123 }
124
125 public function removeWriteStream($stream)
126 {
127 $key = (int)$stream;
128
129 if (!isset($this->writeStreams[$key])) {
130 return;
131 }
132
133 $this->writeStreams[$key]->stop();
134 unset($this->writeStreams[$key]);
135 }
136
137 public function addTimer($interval, $callback)
138 {
139 $timer = new Timer($interval, $callback, false);
140
141 $that = $this;
142 $timers = $this->timers;
143 $callback = function () use ($timer, $timers, $that) {
144 \call_user_func($timer->getCallback(), $timer);
145
146 if ($timers->contains($timer)) {
147 $that->cancelTimer($timer);
148 }
149 };
150
151 $event = $this->loop->timer($timer->getInterval(), 0.0, $callback);
152 $this->timers->attach($timer, $event);
153
154 return $timer;
155 }
156
157 public function addPeriodicTimer($interval, $callback)
158 {
159 $timer = new Timer($interval, $callback, true);
160
161 $callback = function () use ($timer) {
162 \call_user_func($timer->getCallback(), $timer);
163 };
164
165 $event = $this->loop->timer($timer->getInterval(), $timer->getInterval(), $callback);
166 $this->timers->attach($timer, $event);
167
168 return $timer;
169 }
170
171 public function cancelTimer(TimerInterface $timer)
172 {
173 if (!isset($this->timers[$timer])) {
174 return;
175 }
176
177 $event = $this->timers[$timer];
178 $event->stop();
179 $this->timers->detach($timer);
180 }
181
182 public function futureTick($listener)
183 {
184 $this->futureTickQueue->add($listener);
185 }
186
187 public function run()
188 {
189 $this->running = true;
190
191 while ($this->running) {
192 $this->futureTickQueue->tick();
193
194 $hasPendingCallbacks = !$this->futureTickQueue->isEmpty();
195 $wasJustStopped = !$this->running;
196 $nothingLeftToDo = !$this->readStreams
197 && !$this->writeStreams
198 && !$this->timers->count()
199 && $this->signals->isEmpty();
200
201 $flags = Ev::RUN_ONCE;
202 if ($wasJustStopped || $hasPendingCallbacks) {
203 $flags |= Ev::RUN_NOWAIT;
204 } elseif ($nothingLeftToDo) {
205 break;
206 }
207
208 $this->loop->run($flags);
209 }
210 }
211
212 public function stop()
213 {
214 $this->running = false;
215 }
216
217 public function __destruct()
218 {
219 /** @var TimerInterface $timer */
220 foreach ($this->timers as $timer) {
221 $this->cancelTimer($timer);
222 }
223
224 foreach ($this->readStreams as $key => $stream) {
225 $this->removeReadStream($key);
226 }
227
228 foreach ($this->writeStreams as $key => $stream) {
229 $this->removeWriteStream($key);
230 }
231 }
232
233 public function addSignal($signal, $listener)
234 {
235 $this->signals->add($signal, $listener);
236
237 if (!isset($this->signalEvents[$signal])) {
238 $this->signalEvents[$signal] = $this->loop->signal($signal, function() use ($signal) {
239 $this->signals->call($signal);
240 });
241 }
242 }
243
244 public function removeSignal($signal, $listener)
245 {
246 $this->signals->remove($signal, $listener);
247
248 if (isset($this->signalEvents[$signal])) {
249 $this->signalEvents[$signal]->stop();
250 unset($this->signalEvents[$signal]);
251 }
252 }
253}