friendship ended with social-app. php is my new best friend
1<?php
2
3namespace React\Http\Io;
4
5use Evenement\EventEmitter;
6use React\Stream\ReadableStreamInterface;
7use React\Stream\Util;
8use React\Stream\WritableStreamInterface;
9
10/**
11 * [Internal] Protects a given stream from actually closing and only discards its incoming data instead.
12 *
13 * This is used internally to prevent the underlying connection from closing, so
14 * that we can still send back a response over the same stream.
15 *
16 * @internal
17 * */
class CloseProtectionStream extends EventEmitter implements ReadableStreamInterface
{
    /**
     * Events observed on the wrapped input stream, mapped to the method of
     * this class that is registered as their listener.
     *
     * @var array
     */
    private static $inputHandlers = array(
        'data' => 'handleData',
        'end' => 'handleEnd',
        'error' => 'handleError',
        'close' => 'close',
    );

    /** @var ReadableStreamInterface the wrapped input stream */
    private $input;

    /** @var bool set once close() has run; it is never reset */
    private $closed = false;

    /** @var bool true while the input has been paused through this wrapper */
    private $paused = false;

    /**
     * Wraps the given stream and starts listening to its events.
     *
     * @param ReadableStreamInterface $input stream that will be discarded instead of closing it on a 'close' event.
     */
    public function __construct(ReadableStreamInterface $input)
    {
        $this->input = $input;

        foreach (self::$inputHandlers as $event => $method) {
            $input->on($event, array($this, $method));
        }
    }

    /**
     * Reports whether this stream can still be read from.
     *
     * @return bool false once this wrapper is closed, otherwise whatever the input reports
     */
    public function isReadable()
    {
        if ($this->closed) {
            return false;
        }

        return (bool) $this->input->isReadable();
    }

    /**
     * Pauses the input stream; does nothing once this wrapper is closed.
     *
     * @return void
     */
    public function pause()
    {
        if (!$this->closed) {
            $this->paused = true;
            $this->input->pause();
        }
    }

    /**
     * Resumes the input stream; does nothing once this wrapper is closed.
     *
     * @return void
     */
    public function resume()
    {
        if (!$this->closed) {
            $this->paused = false;
            $this->input->resume();
        }
    }

    /**
     * Pipes this stream into the given destination via Util::pipe().
     *
     * @param WritableStreamInterface $dest    destination stream
     * @param array                   $options options handed to Util::pipe() unchanged
     * @return WritableStreamInterface the given destination stream
     */
    public function pipe(WritableStreamInterface $dest, array $options = array())
    {
        Util::pipe($this, $dest, $options);

        return $dest;
    }

    /**
     * Closes this wrapper without closing the wrapped input stream.
     *
     * Detaches all listeners from the input, emits 'close' on this stream
     * and then drops this stream's own listeners. Repeated calls are no-ops.
     *
     * @return void
     */
    public function close()
    {
        if ($this->closed) {
            return;
        }
        $this->closed = true;

        // detach from the input so none of its events are forwarded anymore
        foreach (self::$inputHandlers as $event => $method) {
            $this->input->removeListener($event, array($this, $method));
        }

        // an input paused through this wrapper is resumed again, so that
        // everything still arriving on the incoming connection is discarded
        if ($this->paused) {
            $this->paused = false;
            $this->input->resume();
        }

        $this->emit('close');
        $this->removeAllListeners();
    }

    /**
     * Forwards a 'data' event of the input stream to our own listeners.
     *
     * @internal
     * @param mixed $data payload received from the input stream
     * @return void
     */
    public function handleData($data)
    {
        $arguments = array($data);
        $this->emit('data', $arguments);
    }

    /**
     * Forwards the 'end' event of the input stream, then closes this wrapper.
     *
     * @internal
     * @return void
     */
    public function handleEnd()
    {
        $this->emit('end');
        $this->close();
    }

    /**
     * Forwards an 'error' event of the input stream to our own listeners.
     *
     * @internal
     * @param \Exception $e error reported by the input stream
     * @return void
     */
    public function handleError(\Exception $e)
    {
        $arguments = array($e);
        $this->emit('error', $arguments);
    }
}
111}