friendship ended with social-app. php is my new best friend
1<?php 2 3namespace React\Http\Io; 4 5use Evenement\EventEmitter; 6use React\Stream\ReadableStreamInterface; 7use React\Stream\Util; 8use React\Stream\WritableStreamInterface; 9 10/** 11 * [Internal] Pauses a given stream and buffers all events while paused 12 * 13 * This class is used to buffer all events that happen on a given stream while 14 * it is paused. This allows you to pause a stream and no longer watch for any 15 * of its events. Once the stream is resumed, all buffered events will be 16 * emitted. Explicitly closing the resulting stream clears all buffers. 17 * 18 * Note that this is an internal class only and nothing you should usually care 19 * about. 20 * 21 * @see ReadableStreamInterface 22 * @internal 23 */ 24class PauseBufferStream extends EventEmitter implements ReadableStreamInterface 25{ 26 private $input; 27 private $closed = false; 28 private $paused = false; 29 private $dataPaused = ''; 30 private $endPaused = false; 31 private $closePaused = false; 32 private $errorPaused; 33 private $implicit = false; 34 35 public function __construct(ReadableStreamInterface $input) 36 { 37 $this->input = $input; 38 39 $this->input->on('data', array($this, 'handleData')); 40 $this->input->on('end', array($this, 'handleEnd')); 41 $this->input->on('error', array($this, 'handleError')); 42 $this->input->on('close', array($this, 'handleClose')); 43 } 44 45 /** 46 * pause and remember this was not explicitly from user control 47 * 48 * @internal 49 */ 50 public function pauseImplicit() 51 { 52 $this->pause(); 53 $this->implicit = true; 54 } 55 56 /** 57 * resume only if this was previously paused implicitly and not explicitly from user control 58 * 59 * @internal 60 */ 61 public function resumeImplicit() 62 { 63 if ($this->implicit) { 64 $this->resume(); 65 } 66 } 67 68 public function isReadable() 69 { 70 return !$this->closed; 71 } 72 73 public function pause() 74 { 75 if ($this->closed) { 76 return; 77 } 78 79 $this->input->pause(); 80 $this->paused = true; 81 $this->implicit = false; 82 } 83 84 public function resume() 85 { 86 if ($this->closed) { 87 return; 88 } 89 90 $this->paused = false; 91 $this->implicit = false; 92 93 if ($this->dataPaused !== '') { 94 $this->emit('data', array($this->dataPaused)); 95 $this->dataPaused = ''; 96 } 97 98 if ($this->errorPaused) { 99 $this->emit('error', array($this->errorPaused)); 100 return $this->close(); 101 } 102 103 if ($this->endPaused) { 104 $this->endPaused = false; 105 $this->emit('end'); 106 return $this->close(); 107 } 108 109 if ($this->closePaused) { 110 $this->closePaused = false; 111 return $this->close(); 112 } 113 114 $this->input->resume(); 115 } 116 117 public function pipe(WritableStreamInterface $dest, array $options = array()) 118 { 119 Util::pipe($this, $dest, $options); 120 121 return $dest; 122 } 123 124 public function close() 125 { 126 if ($this->closed) { 127 return; 128 } 129 130 $this->closed = true; 131 $this->dataPaused = ''; 132 $this->endPaused = $this->closePaused = false; 133 $this->errorPaused = null; 134 135 $this->input->close(); 136 137 $this->emit('close'); 138 $this->removeAllListeners(); 139 } 140 141 /** @internal */ 142 public function handleData($data) 143 { 144 if ($this->paused) { 145 $this->dataPaused .= $data; 146 return; 147 } 148 149 $this->emit('data', array($data)); 150 } 151 152 /** @internal */ 153 public function handleError(\Exception $e) 154 { 155 if ($this->paused) { 156 $this->errorPaused = $e; 157 return; 158 } 159 160 $this->emit('error', array($e)); 161 $this->close(); 162 } 163 164 /** @internal */ 165 public function handleEnd() 166 { 167 if ($this->paused) { 168 $this->endPaused = true; 169 return; 170 } 171 172 if (!$this->closed) { 173 $this->emit('end'); 174 $this->close(); 175 } 176 } 177 178 /** @internal */ 179 public function handleClose() 180 { 181 if ($this->paused) { 182 $this->closePaused = true; 183 return; 184 } 185 186 $this->close(); 187 } 188}