Mirror: 🎩 A tiny but capable push & pull stream library for TypeScript and Flow
at v4.0.6 3.0 kB view raw
open Wonka_types;
open Wonka_helpers;

/* Builds a source that walks over `arr`.
 *
 * For each sink, the array size is read once and a mutable index tracks the
 * next element. The thunk handed to `makeTrampoline` returns `Some(element)`
 * and advances the index while the index is below the size, and `None` once
 * the index has reached the size.
 *
 * NOTE(review): `curry` and `makeTrampoline` are not defined in this file
 * (presumably they come from Wonka_helpers). How the trampoline turns
 * Some/None into sink signals is not visible here - confirm there. */
[@genType]
let fromArray = (arr: array('a)): sourceT('a) =>
  curry(sink => {
    let size = Rebel.Array.size(arr);
    let index = ref(0);

    makeTrampoline(sink, (.) =>
      if (index^ < size) {
        /* The bounds check above is what makes the unchecked read safe. */
        let x = Rebel.Array.getUnsafe(arr, index^);
        index := index^ + 1;
        Some(x);
      } else {
        None;
      }
    );
  });

/* Builds a source that walks over the list `ls`.
 *
 * For each sink, a mutable reference holds the remaining list. The thunk
 * handed to `makeTrampoline` pops the head and returns it as `Some(head)`,
 * or returns `None` when the remaining list is empty.
 *
 * NOTE(review): as with `fromArray`, the mapping from Some/None to sink
 * signals lives in `makeTrampoline`, which is not visible here. */
[@genType]
let fromList = (ls: list('a)): sourceT('a) =>
  curry(sink => {
    let value = ref(ls);

    makeTrampoline(sink, (.) =>
      switch (value^) {
      | [x, ...rest] =>
        value := rest;
        Some(x);
      | [] => None
      }
    );
  });

/* Builds a source that emits the single value `x`.
 *
 * The sink first receives `Start` with a talkback. The first `Pull` seen
 * while not ended marks the source as ended, then sends `Push(x)` followed
 * by `End`. Any later `Pull` is ignored. `Close` marks the source as ended
 * so that a subsequent `Pull` emits nothing. */
[@genType]
let fromValue = (x: 'a): sourceT('a) =>
  curry(sink => {
    let ended = ref(false);

    sink(.
      Start(
        (. signal) =>
          switch (signal) {
          | Pull when ! ended^ =>
            /* Flag first, so a re-entrant Pull cannot emit twice. */
            ended := true;
            sink(. Push(x));
            sink(. End);
          | Pull => ()
          | Close => ended := true
          },
      ),
    );
  });

/* Per-sink state for `make`: the teardown returned by the producer and a
 * flag that is set once the stream has completed or been closed. */
type makeStateT = {
  mutable teardown: (. unit) => unit,
  mutable ended: bool,
};

/* Builds a source from a producer function `f`.
 *
 * `f` is called with an observer record:
 *   - `next(value)` forwards `Push(value)` to the sink while not ended;
 *   - `complete()` marks the state as ended and sends `End`, at most once.
 * The function returned by `f` is stored as the teardown. After that the
 * sink receives `Start` with a talkback: a `Close` arriving while not ended
 * marks the state as ended and runs the teardown; every other signal is
 * ignored.
 *
 * Note that `f` runs BEFORE `Start` is sent, so anything the observer emits
 * synchronously inside `f` reaches the sink ahead of `Start`. The teardown
 * is only invoked from the `Close` path, not from `complete`. */
[@genType]
let make = (f: (. observerT('a)) => teardownT): sourceT('a) =>
  curry(sink => {
    /* Placeholder no-op teardown until `f` returns the real one. */
    let state: makeStateT = {teardown: (.) => (), ended: false};

    state.teardown =
      f(. {
        next: value =>
          if (!state.ended) {
            sink(. Push(value));
          },
        complete: () =>
          if (!state.ended) {
            state.ended = true;
            sink(. End);
          },
      });

    sink(.
      Start(
        (. signal) =>
          switch (signal) {
          | Close when !state.ended =>
            state.ended = true;
            state.teardown(.);
          | _ => ()
          },
      ),
    );
  });

/* State shared by all subscribers of one subject: the currently registered
 * sinks and a flag that is set once `complete` has been called. */
type subjectState('a) = {
  mutable sinks: Rebel.Array.t(sinkT('a)),
  mutable ended: bool,
};

/* Creates a subject: a record with a `source` plus `next` and `complete`
 * functions that drive every sink registered on that source.
 *
 *   - `source(sink)` appends the sink to the shared list and sends it
 *     `Start`; a `Close` on that talkback removes the sink again (compared
 *     by physical identity), other signals are ignored.
 *   - `next(value)` sends `Push(value)` to every registered sink unless the
 *     subject has ended.
 *   - `complete()` marks the subject as ended and sends `End` to every
 *     registered sink; further calls do nothing.
 *
 * A sink that subscribes after `complete` still receives `Start`, but this
 * code sends it no `End`. */
[@genType]
let makeSubject = (): subjectT('a) => {
  let state: subjectState('a) = {
    sinks: Rebel.Array.makeEmpty(),
    ended: false,
  };

  let source = sink => {
    state.sinks = Rebel.Array.append(state.sinks, sink);
    sink(.
      Start(
        (. signal) =>
          switch (signal) {
          | Close =>
            state.sinks = Rebel.Array.filter(state.sinks, x => x !== sink)
          | _ => ()
          },
      ),
    );
  };

  let next = value =>
    if (!state.ended) {
      Rebel.Array.forEach(state.sinks, sink => sink(. Push(value)));
    };

  let complete = () =>
    if (!state.ended) {
      state.ended = true;
      Rebel.Array.forEach(state.sinks, sink => sink(. End));
    };

  {source, next, complete};
};

/* A source that emits no values and ends on the first `Pull`.
 *
 * The sink receives `Start` with a talkback. The first `Pull` seen while
 * not ended sends `End`; `Close` marks the source as ended so that a later
 * `Pull` emits nothing.
 *
 * The ended flag is set BEFORE `End` is sent. Previously the flag was left
 * untouched on this path, so every additional `Pull` (including one issued
 * re-entrantly while the sink was handling `End`) produced another `End`.
 * This now matches the guard used by `fromValue` in this file. */
[@genType]
let empty = (sink: sinkT('a)): unit => {
  let ended = ref(false);
  sink(.
    Start(
      (. signal) => {
        switch (signal) {
        | Close => ended := true
        | Pull when ! ended^ =>
          ended := true;
          sink(. End);
        | _ => ()
        }
      },
    ),
  );
};

/* A source that sends `Start(talkbackPlaceholder)` to the sink and then
 * nothing else from this function.
 *
 * NOTE(review): `talkbackPlaceholder` is defined outside this file;
 * presumably it ignores every signal - confirm at its definition. */
[@genType]
let never = (sink: sinkT('a)): unit => {
  sink(. Start(talkbackPlaceholder));
};