friendship ended with social-app. php is my new best friend
1<?php
2
3namespace React\EventLoop;
4
5use BadMethodCallException;
6use Event;
7use EventBase;
8use React\EventLoop\Tick\FutureTickQueue;
9use React\EventLoop\Timer\Timer;
10use SplObjectStorage;
11
/**
 * An `ext-event` based event loop.
 *
 * This uses the [`event` PECL extension](https://pecl.php.net/package/event),
 * which provides an interface to the `libevent` library.
 * `libevent` itself supports a number of system-specific backends (epoll, kqueue).
 *
 * This loop is known to work with PHP 5.4 through PHP 8+.
 *
 * @link https://pecl.php.net/package/event
 */
final class ExtEventLoop implements LoopInterface
{
    /** @var EventBase event base every Event created by this loop is attached to */
    private $eventBase;

    /** @var FutureTickQueue listeners queued via futureTick(), ticked once per run() iteration */
    private $futureTickQueue;

    /** @var \Closure single callback shared by all timer events */
    private $timerCallback;

    /** @var SplObjectStorage maps each scheduled timer object to its Event */
    private $timerEvents;

    /** @var \Closure single callback shared by all read and write stream events */
    private $streamCallback;

    /** @var Event[] read events, keyed by `(int) $stream` */
    private $readEvents = array();

    /** @var Event[] write events, keyed by `(int) $stream` */
    private $writeEvents = array();

    /** @var callable[] read listeners, keyed by `(int) $stream` */
    private $readListeners = array();

    /** @var callable[] write listeners, keyed by `(int) $stream` */
    private $writeListeners = array();

    /** @var resource[] stream resources kept alive for read events (PHP 7+ only), keyed by `(int) $stream` */
    private $readRefs = array();

    /** @var resource[] stream resources kept alive for write events (PHP 7+ only), keyed by `(int) $stream` */
    private $writeRefs = array();

    /** @var bool whether run() should keep iterating; cleared by stop() */
    private $running = false;

    /** @var SignalsHandler registry of signal listeners */
    private $signals;

    /** @var Event[] signal events, keyed by signal number */
    private $signalEvents = array();

    /**
     * Set up the event base, the tick queue, the timer storage and the shared callbacks.
     *
     * @throws BadMethodCallException if the `EventBase` class is not loaded (ext-event missing)
     */
    public function __construct()
    {
        if (!\class_exists('EventBase', false)) {
            throw new BadMethodCallException('Cannot create ExtEventLoop, ext-event extension missing');
        }

        // support arbitrary file descriptors and not just sockets
        // Windows only has limited file descriptor support, so do not require this (will fail otherwise)
        // @link http://www.wangafu.net/~nickm/libevent-book/Ref2_eventbase.html#_setting_up_a_complicated_event_base
        $config = new \EventConfig();
        if (\DIRECTORY_SEPARATOR !== '\\') {
            $config->requireFeatures(\EventConfig::FEATURE_FDS);
        }

        $this->eventBase = new EventBase($config);
        $this->futureTickQueue = new FutureTickQueue();
        $this->timerEvents = new SplObjectStorage();
        $this->signals = new SignalsHandler();

        $this->createTimerCallback();
        $this->createStreamCallback();
    }

    /**
     * Drop every reference this loop holds to Event objects.
     */
    public function __destruct()
    {
        // explicitly clear all references to Event objects to prevent SEGFAULTs on Windows
        //
        // Iterate over a snapshot of the timers: detaching from an SplObjectStorage
        // while iterating over that same storage resets its internal position, so
        // the following next() would skip entries and leave Event references behind.
        foreach (\iterator_to_array($this->timerEvents, false) as $timer) {
            $this->timerEvents->detach($timer);
        }

        $this->readEvents = array();
        $this->writeEvents = array();
        $this->signalEvents = array();
    }

    /**
     * Register a listener that is called with the stream whenever a READ event fires for it.
     *
     * Does nothing if a read listener is already registered for this stream.
     *
     * @param resource $stream
     * @param callable $listener
     */
    public function addReadStream($stream, $listener)
    {
        $key = (int) $stream;
        if (isset($this->readListeners[$key])) {
            return;
        }

        $event = new Event($this->eventBase, $stream, Event::PERSIST | Event::READ, $this->streamCallback);
        $event->add();
        $this->readEvents[$key] = $event;
        $this->readListeners[$key] = $listener;

        // ext-event does not increase refcount on stream resources for PHP 7+
        // manually keep track of stream resource to prevent premature garbage collection
        if (\PHP_VERSION_ID >= 70000) {
            $this->readRefs[$key] = $stream;
        }
    }

    /**
     * Register a listener that is called with the stream whenever a WRITE event fires for it.
     *
     * Does nothing if a write listener is already registered for this stream.
     *
     * @param resource $stream
     * @param callable $listener
     */
    public function addWriteStream($stream, $listener)
    {
        $key = (int) $stream;
        if (isset($this->writeListeners[$key])) {
            return;
        }

        $event = new Event($this->eventBase, $stream, Event::PERSIST | Event::WRITE, $this->streamCallback);
        $event->add();
        $this->writeEvents[$key] = $event;
        $this->writeListeners[$key] = $listener;

        // ext-event does not increase refcount on stream resources for PHP 7+
        // manually keep track of stream resource to prevent premature garbage collection
        if (\PHP_VERSION_ID >= 70000) {
            $this->writeRefs[$key] = $stream;
        }
    }

    /**
     * Free the read event of the given stream and forget its listener and kept reference.
     *
     * Does nothing if no read event is registered for this stream.
     *
     * @param resource $stream
     */
    public function removeReadStream($stream)
    {
        $key = (int) $stream;

        if (isset($this->readEvents[$key])) {
            $this->readEvents[$key]->free();
            unset(
                $this->readEvents[$key],
                $this->readListeners[$key],
                $this->readRefs[$key]
            );
        }
    }

    /**
     * Free the write event of the given stream and forget its listener and kept reference.
     *
     * Does nothing if no write event is registered for this stream.
     *
     * @param resource $stream
     */
    public function removeWriteStream($stream)
    {
        $key = (int) $stream;

        if (isset($this->writeEvents[$key])) {
            $this->writeEvents[$key]->free();
            unset(
                $this->writeEvents[$key],
                $this->writeListeners[$key],
                $this->writeRefs[$key]
            );
        }
    }

    /**
     * Create and schedule a non-periodic timer.
     *
     * @param int|float $interval passed on to the Timer and used as the event timeout
     * @param callable  $callback invoked with the timer object when the timer fires
     * @return Timer the scheduled timer, usable with cancelTimer()
     */
    public function addTimer($interval, $callback)
    {
        $timer = new Timer($interval, $callback, false);

        $this->scheduleTimer($timer);

        return $timer;
    }

    /**
     * Create and schedule a periodic timer.
     *
     * @param int|float $interval passed on to the Timer and used as the event timeout
     * @param callable  $callback invoked with the timer object each time the timer fires
     * @return Timer the scheduled timer, usable with cancelTimer()
     */
    public function addPeriodicTimer($interval, $callback)
    {
        $timer = new Timer($interval, $callback, true);

        $this->scheduleTimer($timer);

        return $timer;
    }

    /**
     * Free the event of the given timer and remove the timer from this loop.
     *
     * Does nothing if the timer is not (or no longer) scheduled on this loop.
     *
     * @param TimerInterface $timer
     */
    public function cancelTimer(TimerInterface $timer)
    {
        if ($this->timerEvents->contains($timer)) {
            $this->timerEvents[$timer]->free();
            $this->timerEvents->detach($timer);
        }
    }

    /**
     * Queue a listener on the future tick queue, which run() ticks at the start of each iteration.
     *
     * @param callable $listener
     */
    public function futureTick($listener)
    {
        $this->futureTickQueue->add($listener);
    }

    /**
     * Register a listener for the given signal.
     *
     * A signal Event is created only when none exists yet for this signal;
     * it dispatches through the signals handler's `call` method.
     *
     * @param int      $signal
     * @param callable $listener
     */
    public function addSignal($signal, $listener)
    {
        $this->signals->add($signal, $listener);

        if (!isset($this->signalEvents[$signal])) {
            $this->signalEvents[$signal] = Event::signal($this->eventBase, $signal, array($this->signals, 'call'));
            $this->signalEvents[$signal]->add();
        }
    }

    /**
     * Unregister a listener for the given signal.
     *
     * The signal Event is freed once the signals handler reports no remaining
     * listeners for this signal.
     *
     * @param int      $signal
     * @param callable $listener
     */
    public function removeSignal($signal, $listener)
    {
        $this->signals->remove($signal, $listener);

        if (isset($this->signalEvents[$signal]) && $this->signals->count($signal) === 0) {
            $this->signalEvents[$signal]->free();
            unset($this->signalEvents[$signal]);
        }
    }

    /**
     * Run the loop until stop() is called or nothing is left to wait for.
     */
    public function run()
    {
        $this->running = true;

        while ($this->running) {
            $this->futureTickQueue->tick();

            $flags = EventBase::LOOP_ONCE;
            if (!$this->running || !$this->futureTickQueue->isEmpty()) {
                // stopped during the tick, or more ticks are pending: poll without blocking
                $flags |= EventBase::LOOP_NONBLOCK;
            } elseif (!$this->readEvents && !$this->writeEvents && !$this->timerEvents->count() && $this->signals->isEmpty()) {
                // no streams, timers or signals registered: nothing could ever wake the loop
                break;
            }

            $this->eventBase->loop($flags);
        }
    }

    /**
     * Ask run() to return; takes effect the next time run() checks the running flag.
     */
    public function stop()
    {
        $this->running = false;
    }

    /**
     * Schedule a timer for execution.
     *
     * @param TimerInterface $timer
     */
    private function scheduleTimer(TimerInterface $timer)
    {
        $flags = Event::TIMEOUT;

        // periodic timers stay registered after firing
        if ($timer->isPeriodic()) {
            $flags |= Event::PERSIST;
        }

        // -1: pure timeout event, not bound to any file descriptor; the timer is passed as callback argument
        $event = new Event($this->eventBase, -1, $flags, $this->timerCallback, $timer);
        $this->timerEvents[$timer] = $event;

        $event->add($timer->getInterval());
    }

    /**
     * Create a callback used as the target of timer events.
     *
     * A reference is kept to the callback for the lifetime of the loop
     * to prevent "Cannot destroy active lambda function" fatal error from
     * the event extension.
     */
    private function createTimerCallback()
    {
        $timers = $this->timerEvents;
        $this->timerCallback = function ($_, $__, $timer) use ($timers) {
            \call_user_func($timer->getCallback(), $timer);

            // one-off timers are cleaned up after firing, unless the callback already cancelled them
            if (!$timer->isPeriodic() && $timers->contains($timer)) {
                $this->cancelTimer($timer);
            }
        };
    }

    /**
     * Create a callback used as the target of stream events.
     *
     * A reference is kept to the callback for the lifetime of the loop
     * to prevent "Cannot destroy active lambda function" fatal error from
     * the event extension.
     */
    private function createStreamCallback()
    {
        // bound by reference so the closure sees listeners added or removed later
        $read =& $this->readListeners;
        $write =& $this->writeListeners;
        $this->streamCallback = function ($stream, $flags) use (&$read, &$write) {
            $key = (int) $stream;

            if (Event::READ === (Event::READ & $flags) && isset($read[$key])) {
                \call_user_func($read[$key], $stream);
            }

            if (Event::WRITE === (Event::WRITE & $flags) && isset($write[$key])) {
                \call_user_func($write[$key], $stream);
            }
        };
    }
}