Mirror: 🎩 A tiny but capable push & pull stream library for TypeScript and Flow
at v4.0.11 4.3 kB view raw
open Wonka_types;
open Wonka_helpers;

/* Mutable bookkeeping shared by the pull-driven sources below
   (`fromArray`, `fromList`). */
type trampolineT('a) = {
  /* Set once End was emitted or the sink sent Close; stops the loop. */
  mutable ended: bool,
  /* True while the emitting `while` loop is running. */
  mutable looping: bool,
  /* True when a Pull is waiting to be answered. */
  mutable pulled: bool,
  /* Iteration cursor: an index for `fromArray`, the remaining list for
     `fromList`. */
  mutable current: 'a,
};

/* Creates a source that pushes the elements of `arr` in index order and
   then sends End.

   One element is pushed per Pull. If the sink pulls again while a Push is
   still being delivered (i.e. `looping` is true), the request is only
   recorded in `pulled` and served by the loop that is already running,
   instead of starting a nested loop. */
[@genType]
let fromArray = (arr: array('a)): sourceT('a) =>
  curry(sink => {
    let size = Rebel.Array.size(arr);
    let state = {ended: false, looping: false, pulled: false, current: 0};

    sink(.
      Start(
        (. signal) =>
          switch (signal, state.looping) {
          | (Pull, false) =>
            state.pulled = true;
            state.looping = true;

            while (state.pulled && !state.ended) {
              if (state.current < size) {
                /* Bounds were checked against `size` just above. */
                let x = Rebel.Array.getUnsafe(arr, state.current);
                state.current = state.current + 1;
                /* Clear the request before pushing so that a re-entrant
                   Pull during the Push can set it again. */
                state.pulled = false;
                sink(. Push(x));
              } else {
                state.ended = true;
                sink(. End);
              };
            };

            state.looping = false;
          | (Pull, true) => state.pulled = true
          | (Close, _) => state.ended = true
          },
      ),
    );
  });

/* Creates a source that pushes the elements of `ls` from head to tail and
   then sends End.

   Uses the same pull/loop bookkeeping as `fromArray`; `current` holds the
   part of the list that has not been pushed yet. */
[@genType]
let fromList = (ls: list('a)): sourceT('a) =>
  curry(sink => {
    let state = {ended: false, looping: false, pulled: false, current: ls};

    sink(.
      Start(
        (. signal) =>
          switch (signal, state.looping) {
          | (Pull, false) =>
            state.pulled = true;
            state.looping = true;

            while (state.pulled && !state.ended) {
              switch (state.current) {
              | [x, ...rest] =>
                state.current = rest;
                /* Clear the request before pushing so that a re-entrant
                   Pull during the Push can set it again. */
                state.pulled = false;
                sink(. Push(x));
              | [] =>
                state.ended = true;
                sink(. End);
              };
            };

            state.looping = false;
          | (Pull, true) => state.pulled = true
          | (Close, _) => state.ended = true
          },
      ),
    );
  });

/* Creates a source that, on the first Pull, pushes `x` and then sends End.
   Later Pulls, and Pulls after a Close, are ignored. */
[@genType]
let fromValue = (x: 'a): sourceT('a) =>
  curry(sink => {
    let ended = ref(false);

    sink(.
      Start(
        (. signal) =>
          switch (signal) {
          | Pull when ! ended^ =>
            /* Latch before emitting so a re-entrant Pull is a no-op. */
            ended := true;
            sink(. Push(x));
            sink(. End);
          | Pull => ()
          | Close => ended := true
          },
      ),
    );
  });

/* State kept by `make` for one subscription. */
type makeStateT = {
  /* Teardown returned by the producer function; a no-op until assigned. */
  mutable teardown: (. unit) => unit,
  /* Set once the observer completed or the sink sent Close. */
  mutable ended: bool,
};

/* Creates a source from a producer function `f`.

   `f` is called once per subscription, before Start is sent to the sink,
   with an observer:
   - `next(value)` pushes `value` to the sink unless the source has ended;
   - `complete()` sends End once and marks the source as ended.

   The teardown returned by `f` is invoked when the sink sends Close while
   the source has not ended yet. It is not invoked by `complete`. Pull
   signals are ignored. */
[@genType]
let make = (f: (. observerT('a)) => teardownT): sourceT('a) =>
  curry(sink => {
    let state: makeStateT = {teardown: (.) => (), ended: false};

    state.teardown =
      f(. {
        next: value =>
          if (!state.ended) {
            sink(. Push(value));
          },
        complete: () =>
          if (!state.ended) {
            state.ended = true;
            sink(. End);
          },
      });

    sink(.
      Start(
        (. signal) =>
          switch (signal) {
          | Close when !state.ended =>
            state.ended = true;
            state.teardown(.);
          | _ => ()
          },
      ),
    );
  });

/* State kept by a subject created with `makeSubject`. */
type subjectState('a) = {
  /* Sinks currently subscribed to the subject's source. */
  mutable sinks: Rebel.Array.t(sinkT('a)),
  /* Set once `complete` was called. */
  mutable ended: bool,
};

/* Creates a subject: a record with a `source` plus `next` and `complete`
   functions that feed every sink currently subscribed to that source.

   - `source` registers the sink and sends it Start; a Close from that sink
     removes it from the list again (compared by physical equality).
   - `next(value)` pushes `value` to all registered sinks unless ended.
   - `complete()` sends End to all registered sinks once.

   NOTE(review): `source` does not consult `state.ended`, so a sink that
   subscribes after `complete` receives Start but no End from this code. */
[@genType]
let makeSubject = (): subjectT('a) => {
  let state: subjectState('a) = {
    sinks: Rebel.Array.makeEmpty(),
    ended: false,
  };

  let source = sink => {
    state.sinks = Rebel.Array.append(state.sinks, sink);
    sink(.
      Start(
        (. signal) =>
          switch (signal) {
          | Close =>
            state.sinks = Rebel.Array.filter(state.sinks, x => x !== sink)
          | _ => ()
          },
      ),
    );
  };

  let next = value =>
    if (!state.ended) {
      Rebel.Array.forEach(state.sinks, sink => sink(. Push(value)));
    };

  let complete = () =>
    if (!state.ended) {
      state.ended = true;
      Rebel.Array.forEach(state.sinks, sink => sink(. End));
    };

  {source, next, complete};
};

/* A source that sends End on the first Pull and never pushes a value.

   `ended` is latched before End is emitted, so further Pulls (including a
   re-entrant Pull issued while End is being delivered) do not send a second
   End. A Close before the first Pull suppresses End entirely. */
[@genType]
let empty = (sink: sinkT('a)): unit => {
  let ended = ref(false);
  sink(.
    Start(
      (. signal) => {
        switch (signal) {
        | Close => ended := true
        | Pull when ! ended^ =>
          ended := true;
          sink(. End);
        | _ => ()
        }
      },
    ),
  );
};

/* A source that only sends Start and then neither pushes nor ends.
   The talkback is `talkbackPlaceholder` from the opened helper modules —
   presumably a no-op; confirm against its definition. */
[@genType]
let never = (sink: sinkT('a)): unit => {
  sink(. Start(talkbackPlaceholder));
};