Mirror: 🎩 A tiny but capable push & pull stream library for TypeScript and Flow
at v4.0.0-rc.2 3.0 kB view raw
open Wonka_types;
open Wonka_helpers;

/* Source that emits the elements of `arr` in index order.

   The array length is read once, when a sink subscribes. Emission itself is
   delegated to `makeTrampoline`, which is handed a thunk that yields
   `Some(element)` while elements remain and `None` once the index reaches
   the captured length. */
[@genType]
let fromArray = (arr: array('a)): sourceT('a) =>
  curry(sink => {
    let size = Rebel.Array.size(arr);
    let index = ref(0);

    makeTrampoline(sink, (.) =>
      if (index^ < size) {
        /* `index^ < size` was checked just above, hence the unchecked read. */
        let x = Rebel.Array.getUnsafe(arr, index^);
        index := index^ + 1;
        Some(x);
      } else {
        None;
      }
    );
  });

/* Source that emits the elements of `ls` from head to tail.

   A mutable cursor holds the not-yet-emitted remainder of the list; the
   thunk given to `makeTrampoline` pops the head on every call and yields
   `None` once the remainder is empty. */
[@genType]
let fromList = (ls: list('a)): sourceT('a) =>
  curry(sink => {
    let value = ref(ls);

    makeTrampoline(sink, (.) =>
      switch (value^) {
      | [x, ...rest] =>
        value := rest;
        Some(x);
      | [] => None
      }
    );
  });

/* Source that emits the single value `x` and then ends.

   The value is only delivered in response to the first `Pull`; later pulls
   are ignored. A `Close` received before the first pull suppresses both the
   `Push` and the `End`. */
[@genType]
let fromValue = (x: 'a): sourceT('a) =>
  curry(sink => {
    let ended = ref(false);

    sink(.
      Start(
        (. signal) =>
          switch (signal) {
          | Pull when ! ended^ =>
            /* Flag first so a re-entrant Pull during Push is a no-op. */
            ended := true;
            sink(. Push(x));
            sink(. End);
          | Pull => ()
          | Close => ended := true
          },
      ),
    );
  });

/* Per-subscription state of `make`. */
type makeStateT = {
  /* Teardown returned by the producer; a no-op until the producer ran. */
  mutable teardown: (. unit) => unit,
  /* Set once the sink closed the source or the producer completed. */
  mutable ended: bool,
};

/* Source driven by a user-supplied producer function `f`.

   For every subscribing sink, `f` receives an observer record:
   - `next(value)` pushes `value` to the sink unless the source has ended;
   - `complete()` marks the source as ended and sends `End` (at most once).

   `f` returns a teardown function, which is invoked when the sink sends
   `Close` before the source has ended. `Pull` signals are ignored. */
[@genType]
let make = (f: (. observerT('a)) => teardownT): sourceT('a) =>
  curry(sink => {
    let state: makeStateT = {teardown: (.) => (), ended: false};

    sink(.
      Start(
        (. signal) =>
          switch (signal) {
          | Close when !state.ended =>
            state.ended = true;
            state.teardown(.);
          | _ => ()
          },
      ),
    );

    /* A sink may answer `Start` with a synchronous `Close`. At that point
       only the placeholder teardown exists, so starting the producer now
       would hand back a teardown that nothing can ever call. Skip the
       producer entirely in that case instead of leaking it. */
    if (!state.ended) {
      state.teardown =
        f(. {
          next: value =>
            if (!state.ended) {
              sink(. Push(value));
            },
          complete: () =>
            if (!state.ended) {
              state.ended = true;
              sink(. End);
            },
        });
    };
  });

/* Shared state of a subject created by `makeSubject`. */
type subjectState('a) = {
  /* Sinks currently subscribed to the subject's source. */
  mutable sinks: Rebel.Array.t(sinkT('a)),
  /* Set by `complete`; afterwards `next` and `complete` do nothing. */
  mutable ended: bool,
};

/* Creates a subject: a record of a `source` plus the `next` and `complete`
   functions that feed it.

   - `source` registers the subscribing sink and sends it `Start`; when that
     sink sends `Close` it is removed again (compared by physical identity).
     `Pull` signals are ignored.
   - `next(value)` pushes `value` to every registered sink, unless the
     subject has completed.
   - `complete()` marks the subject as ended and sends `End` to every
     registered sink; repeated calls have no effect.

   NOTE(review): `source` does not look at `ended`, so a sink subscribing
   after `complete()` receives `Start` but never an `End` - confirm this is
   intended. */
[@genType]
let makeSubject = (): subjectT('a) => {
  let state: subjectState('a) = {
    sinks: Rebel.Array.makeEmpty(),
    ended: false,
  };

  let source = sink => {
    state.sinks = Rebel.Array.append(state.sinks, sink);
    sink(.
      Start(
        (. signal) =>
          switch (signal) {
          | Close =>
            state.sinks = Rebel.Array.filter(state.sinks, x => x !== sink)
          | _ => ()
          },
      ),
    );
  };

  let next = value =>
    if (!state.ended) {
      Rebel.Array.forEach(state.sinks, sink => sink(. Push(value)));
    };

  let complete = () =>
    if (!state.ended) {
      state.ended = true;
      Rebel.Array.forEach(state.sinks, sink => sink(. End));
    };

  {source, next, complete};
};

/* Source that emits no values: it sends `Start` and then immediately `End`.
   The `End` is skipped when the sink sent `Close` while handling `Start`. */
[@genType]
let empty = (sink: sinkT('a)): unit => {
  let ended = ref(false);
  sink(.
    Start(
      (. signal) => {
        switch (signal) {
        | Close => ended := true
        | _ => ()
        }
      },
    ),
  );
  if (! ended^) {
    sink(. End);
  };
};

/* Source that sends `Start` and nothing else: no `Push`, no `End`.
   The talkback is `talkbackPlaceholder` from the opened helper modules
   (presumably a no-op talkback - verify against its definition). */
[@genType]
let never = (sink: sinkT('a)): unit => {
  sink(. Start(talkbackPlaceholder));
};