friendship ended with social-app. php is my new best friend
1<?php
2
3namespace React\Http\Io;
4
5use Evenement\EventEmitter;
6use Psr\Http\Message\StreamInterface;
7use React\Stream\ReadableStreamInterface;
8use React\Stream\Util;
9use React\Stream\WritableStreamInterface;
10
11/**
12 * @internal
13 */
14class ReadableBodyStream extends EventEmitter implements ReadableStreamInterface, StreamInterface
15{
16 private $input;
17 private $position = 0;
18 private $size;
19 private $closed = false;
20
21 public function __construct(ReadableStreamInterface $input, $size = null)
22 {
23 $this->input = $input;
24 $this->size = $size;
25
26 $that = $this;
27 $pos =& $this->position;
28 $input->on('data', function ($data) use ($that, &$pos, $size) {
29 $that->emit('data', array($data));
30
31 $pos += \strlen($data);
32 if ($size !== null && $pos >= $size) {
33 $that->handleEnd();
34 }
35 });
36 $input->on('error', function ($error) use ($that) {
37 $that->emit('error', array($error));
38 $that->close();
39 });
40 $input->on('end', array($that, 'handleEnd'));
41 $input->on('close', array($that, 'close'));
42 }
43
44 public function close()
45 {
46 if (!$this->closed) {
47 $this->closed = true;
48 $this->input->close();
49
50 $this->emit('close');
51 $this->removeAllListeners();
52 }
53 }
54
55 public function isReadable()
56 {
57 return $this->input->isReadable();
58 }
59
60 public function pause()
61 {
62 $this->input->pause();
63 }
64
65 public function resume()
66 {
67 $this->input->resume();
68 }
69
70 public function pipe(WritableStreamInterface $dest, array $options = array())
71 {
72 Util::pipe($this, $dest, $options);
73
74 return $dest;
75 }
76
77 public function eof()
78 {
79 return !$this->isReadable();
80 }
81
82 public function __toString()
83 {
84 return '';
85 }
86
87 public function detach()
88 {
89 throw new \BadMethodCallException();
90 }
91
92 public function getSize()
93 {
94 return $this->size;
95 }
96
97 public function tell()
98 {
99 throw new \BadMethodCallException();
100 }
101
102 public function isSeekable()
103 {
104 return false;
105 }
106
107 public function seek($offset, $whence = SEEK_SET)
108 {
109 throw new \BadMethodCallException();
110 }
111
112 public function rewind()
113 {
114 throw new \BadMethodCallException();
115 }
116
117 public function isWritable()
118 {
119 return false;
120 }
121
122 public function write($string)
123 {
124 throw new \BadMethodCallException();
125 }
126
127 public function read($length)
128 {
129 throw new \BadMethodCallException();
130 }
131
132 public function getContents()
133 {
134 throw new \BadMethodCallException();
135 }
136
137 public function getMetadata($key = null)
138 {
139 return ($key === null) ? array() : null;
140 }
141
142 /** @internal */
143 public function handleEnd()
144 {
145 if ($this->position !== $this->size && $this->size !== null) {
146 $this->emit('error', array(new \UnderflowException('Unexpected end of response body after ' . $this->position . '/' . $this->size . ' bytes')));
147 } else {
148 $this->emit('end');
149 }
150
151 $this->close();
152 }
153}