Mirror: 🎩 A tiny but capable push & pull stream library for TypeScript and Flow
at v4.0.0-rc.0 2.4 kB view raw
open Wonka_types;
open Wonka_helpers;

/* Builds a source over the elements of `arr`, read in ascending index order.
   The generator handed to `makeTrampoline` returns `Some(x)` for the next
   element and `None` once every index has been consumed; how the helper
   schedules those calls is defined outside this file. */
[@genType]
let fromArray = (arr: array('a)): sourceT('a) =>
  curry(sink => {
    /* The length is read once, at subscription time. */
    let size = Rebel.Array.size(arr);
    let index = ref(0);

    makeTrampoline(sink, (.) =>
      if (index^ < size) {
        /* The bounds check above keeps the unsafe read in range. */
        let x = Rebel.Array.getUnsafe(arr, index^);
        index := index^ + 1;
        Some(x);
      } else {
        None;
      }
    );
  });

/* Builds a source over the elements of `ls`, head first.
   The generator handed to `makeTrampoline` pops the head of the remaining
   list on each call and returns `None` when the list is empty. */
[@genType]
let fromList = (ls: list('a)): sourceT('a) =>
  curry(sink => {
    /* Remaining, not yet emitted, tail of the list. */
    let value = ref(ls);

    makeTrampoline(sink, (.) =>
      switch (value^) {
      | [x, ...rest] =>
        value := rest;
        Some(x);
      | [] => None
      }
    );
  });

/* Builds a source that answers the first `Pull` with `Push(x)` followed by
   `End`, and ignores every later signal.
   A `Close` received before that first `Pull` also marks the source as
   ended, so a sink that has cancelled is never sent `Push`/`End` afterwards. */
[@genType]
let fromValue = (x: 'a): sourceT('a) =>
  curry(sink => {
    let ended = ref(false);

    sink(.
      Start(
        (. signal) =>
          switch (signal) {
          | Pull when ! ended^ =>
            /* Flip the flag first so a re-entrant Pull cannot emit twice. */
            ended := true;
            sink(. Push(x));
            sink(. End);
          | Close => ended := true
          | Pull => ()
          },
      ),
    );
  });

/* Builds a source driven by the user callback `f`.
   `f` receives an observer (`next` / `complete`) and returns a teardown
   function. `Pull` signals are ignored: values are only emitted when `f`
   calls `next`.
   A single `ended` flag guarantees that:
   - the teardown runs at most once, on the first `Close`;
   - `next` is a no-op once the sink closed or `complete` was called;
   - `End` is sent to the sink at most once. */
[@genType]
let make = (f: (. observerT('a)) => teardownT): sourceT('a) =>
  curry(sink => {
    /* Placeholder until `f` has returned the real teardown below. */
    let teardown = ref((.) => ());
    let ended = ref(false);

    sink(.
      Start(
        (. signal) =>
          switch (signal) {
          | Close when ! ended^ =>
            ended := true;
            teardown^(.);
          | Close
          | Pull => ()
          },
      ),
    );

    teardown :=
      f(. {
        next: value =>
          if (! ended^) {
            sink(. Push(value));
          },
        complete: () =>
          if (! ended^) {
            ended := true;
            sink(. End);
          },
      });
  });

/* Mutable bookkeeping shared by the three functions of a subject:
   the currently subscribed sinks and whether `complete` has been called. */
type subjectState('a) = {
  mutable sinks: Rebel.Array.t(sinkT('a)),
  mutable ended: bool,
};

/* Creates a subject: a record of `source`, `next` and `complete`.
   - `source` registers a sink and sends it `Start`; the sink is removed
     again when it answers with `Close`.
   - `next` forwards `Push(value)` to every registered sink, unless the
     subject has already completed.
   - `complete` marks the subject as ended and sends `End` to every
     registered sink; further `next` / `complete` calls do nothing. */
[@genType]
let makeSubject = (): subjectT('a) => {
  let state: subjectState('a) = {
    sinks: Rebel.Array.makeEmpty(),
    ended: false,
  };

  let source = sink => {
    state.sinks = Rebel.Array.append(state.sinks, sink);
    sink(.
      Start(
        (. signal) =>
          switch (signal) {
          | Close =>
            /* Sinks are compared by physical identity. */
            state.sinks = Rebel.Array.filter(state.sinks, x => x !== sink)
          | Pull => ()
          },
      ),
    );
  };

  let next = value =>
    if (!state.ended) {
      Rebel.Array.forEach(state.sinks, sink => sink(. Push(value)));
    };

  let complete = () =>
    if (!state.ended) {
      state.ended = true;
      Rebel.Array.forEach(state.sinks, sink => sink(. End));
    };

  {source, next, complete};
};

/* Source that sends `Start` immediately followed by `End`, without ever
   pushing a value. The talkback passed along is `talkbackPlaceholder`. */
[@genType]
let empty = (sink: sinkT('a)): unit => {
  sink(. Start(talkbackPlaceholder));
  sink(. End);
};

/* Source that sends `Start` and then nothing else: it neither pushes a
   value nor ends. The talkback passed along is `talkbackPlaceholder`. */
[@genType]
let never = (sink: sinkT('a)): unit => {
  sink(. Start(talkbackPlaceholder));
};