friendship ended with social-app. php is my new best friend
<?php

namespace React\Http\Io;

use Evenement\EventEmitter;
use React\Stream\ReadableStreamInterface;
use React\Stream\Util;
use React\Stream\WritableStreamInterface;

/**
 * [Internal] Readable stream wrapper whose close() never closes the wrapped stream.
 *
 * Closing this wrapper only detaches it from the input stream and resumes the
 * input if this wrapper had paused it, so that any further incoming data is
 * simply discarded. This is used internally to prevent the underlying
 * connection from closing, so that a response can still be sent back over the
 * same stream.
 *
 * @internal
 */
class CloseProtectionStream extends EventEmitter implements ReadableStreamInterface
{
    private $input;
    private $closed = false;
    private $paused = false;

    /**
     * @param ReadableStreamInterface $input stream whose data will be discarded instead of closing it on a 'close' event.
     */
    public function __construct(ReadableStreamInterface $input)
    {
        $this->input = $input;

        // input event => method on this wrapper that handles it
        $listeners = array(
            'data'  => 'handleData',
            'end'   => 'handleEnd',
            'error' => 'handleError',
            'close' => 'close',
        );

        foreach ($listeners as $event => $method) {
            $input->on($event, array($this, $method));
        }
    }

    /**
     * Reports whether this wrapper is still open and the input is still readable.
     *
     * @return bool
     */
    public function isReadable()
    {
        if ($this->closed) {
            return false;
        }

        return (bool) $this->input->isReadable();
    }

    /**
     * Pauses the input stream and remembers that this wrapper did so.
     *
     * Does nothing once this wrapper has been closed.
     */
    public function pause()
    {
        if (!$this->closed) {
            $this->paused = true;
            $this->input->pause();
        }
    }

    /**
     * Resumes the input stream and clears the paused marker.
     *
     * Does nothing once this wrapper has been closed.
     */
    public function resume()
    {
        if (!$this->closed) {
            $this->paused = false;
            $this->input->resume();
        }
    }

    /**
     * Pipes this stream into the given destination via Util::pipe().
     *
     * @param WritableStreamInterface $dest
     * @param array                   $options passed through to Util::pipe()
     * @return WritableStreamInterface the given destination stream
     */
    public function pipe(WritableStreamInterface $dest, array $options = array())
    {
        Util::pipe($this, $dest, $options);

        return $dest;
    }

    /**
     * Closes this wrapper without closing the input stream.
     *
     * Detaches all handlers from the input, resumes the input if this wrapper
     * had paused it, emits 'close' and then drops all of its own listeners.
     * Calling this more than once has no further effect.
     */
    public function close()
    {
        if ($this->closed) {
            return;
        }

        $this->closed = true;

        // detach from the input so none of its events reach this wrapper anymore
        $listeners = array(
            'data'  => 'handleData',
            'error' => 'handleError',
            'end'   => 'handleEnd',
            'close' => 'close',
        );

        foreach ($listeners as $event => $method) {
            $this->input->removeListener($event, array($this, $method));
        }

        // a paused input must be resumed so everything still incoming gets discarded
        $wasPaused = $this->paused;
        $this->paused = false;

        if ($wasPaused) {
            $this->input->resume();
        }

        $this->emit('close');
        $this->removeAllListeners();
    }

    /**
     * Re-emits a chunk of data received from the input stream.
     *
     * @internal
     */
    public function handleData($data)
    {
        $this->emit('data', array($data));
    }

    /**
     * Re-emits the input's 'end' event and then closes this wrapper.
     *
     * @internal
     */
    public function handleEnd()
    {
        $this->emit('end');
        $this->close();
    }

    /**
     * Re-emits an error received from the input stream.
     *
     * @internal
     */
    public function handleError(\Exception $e)
    {
        $this->emit('error', array($e));
    }
}