friendship ended with social-app. php is my new best friend
1<?php
2
3namespace React\Http\Io;
4
5use Evenement\EventEmitter;
6use React\Stream\ReadableStreamInterface;
7use React\Stream\Util;
8use React\Stream\WritableStreamInterface;
9
10/**
11 * [Internal] Pauses a given stream and buffers all events while paused
12 *
13 * This class is used to buffer all events that happen on a given stream while
14 * it is paused. This allows you to pause a stream and no longer watch for any
15 * of its events. Once the stream is resumed, all buffered events will be
16 * emitted. Explicitly closing the resulting stream clears all buffers.
17 *
18 * Note that this is an internal class only and nothing you should usually care
19 * about.
20 *
21 * @see ReadableStreamInterface
22 * @internal
23 */
24class PauseBufferStream extends EventEmitter implements ReadableStreamInterface
25{
26 private $input;
27 private $closed = false;
28 private $paused = false;
29 private $dataPaused = '';
30 private $endPaused = false;
31 private $closePaused = false;
32 private $errorPaused;
33 private $implicit = false;
34
35 public function __construct(ReadableStreamInterface $input)
36 {
37 $this->input = $input;
38
39 $this->input->on('data', array($this, 'handleData'));
40 $this->input->on('end', array($this, 'handleEnd'));
41 $this->input->on('error', array($this, 'handleError'));
42 $this->input->on('close', array($this, 'handleClose'));
43 }
44
45 /**
46 * pause and remember this was not explicitly from user control
47 *
48 * @internal
49 */
50 public function pauseImplicit()
51 {
52 $this->pause();
53 $this->implicit = true;
54 }
55
56 /**
57 * resume only if this was previously paused implicitly and not explicitly from user control
58 *
59 * @internal
60 */
61 public function resumeImplicit()
62 {
63 if ($this->implicit) {
64 $this->resume();
65 }
66 }
67
68 public function isReadable()
69 {
70 return !$this->closed;
71 }
72
73 public function pause()
74 {
75 if ($this->closed) {
76 return;
77 }
78
79 $this->input->pause();
80 $this->paused = true;
81 $this->implicit = false;
82 }
83
84 public function resume()
85 {
86 if ($this->closed) {
87 return;
88 }
89
90 $this->paused = false;
91 $this->implicit = false;
92
93 if ($this->dataPaused !== '') {
94 $this->emit('data', array($this->dataPaused));
95 $this->dataPaused = '';
96 }
97
98 if ($this->errorPaused) {
99 $this->emit('error', array($this->errorPaused));
100 return $this->close();
101 }
102
103 if ($this->endPaused) {
104 $this->endPaused = false;
105 $this->emit('end');
106 return $this->close();
107 }
108
109 if ($this->closePaused) {
110 $this->closePaused = false;
111 return $this->close();
112 }
113
114 $this->input->resume();
115 }
116
117 public function pipe(WritableStreamInterface $dest, array $options = array())
118 {
119 Util::pipe($this, $dest, $options);
120
121 return $dest;
122 }
123
124 public function close()
125 {
126 if ($this->closed) {
127 return;
128 }
129
130 $this->closed = true;
131 $this->dataPaused = '';
132 $this->endPaused = $this->closePaused = false;
133 $this->errorPaused = null;
134
135 $this->input->close();
136
137 $this->emit('close');
138 $this->removeAllListeners();
139 }
140
141 /** @internal */
142 public function handleData($data)
143 {
144 if ($this->paused) {
145 $this->dataPaused .= $data;
146 return;
147 }
148
149 $this->emit('data', array($data));
150 }
151
152 /** @internal */
153 public function handleError(\Exception $e)
154 {
155 if ($this->paused) {
156 $this->errorPaused = $e;
157 return;
158 }
159
160 $this->emit('error', array($e));
161 $this->close();
162 }
163
164 /** @internal */
165 public function handleEnd()
166 {
167 if ($this->paused) {
168 $this->endPaused = true;
169 return;
170 }
171
172 if (!$this->closed) {
173 $this->emit('end');
174 $this->close();
175 }
176 }
177
178 /** @internal */
179 public function handleClose()
180 {
181 if ($this->paused) {
182 $this->closePaused = true;
183 return;
184 }
185
186 $this->close();
187 }
188}