Mirror: 🎩 A tiny but capable push & pull stream library for TypeScript and Flow
at v3.2.1 2.7 kB view raw
open Wonka_types;
open Wonka_helpers;

/* Property key under which an object exposes its Observable factory.
   At runtime this is `Symbol.observable` (created on the global `Symbol` if
   it does not exist yet) when `Symbol` is a function, and the string
   '@@observable' otherwise. It is typed as `string` here only so it can be
   used as an index key by the externals below. */
let observableSymbol: string = [%raw
  {|
  typeof Symbol === 'function'
    ? Symbol.observable || (Symbol.observable = Symbol('observable'))
    : '@@observable'
|}
];

/* Handle returned by `subscribe`; `unsubscribe` cancels the subscription. */
type subscriptionT = {. [@bs.meth] "unsubscribe": unit => unit};

/* JS object with the three Observer callbacks. */
type observerT('a) = {
  .
  [@bs.meth] "next": 'a => unit,
  [@bs.meth] "error": Js.Exn.t => unit,
  [@bs.meth] "complete": unit => unit,
};

/* JS object that can be subscribed to with an observer. */
type observableT('a) = {
  .
  [@bs.meth] "subscribe": observerT('a) => subscriptionT,
};

/* Uncurried zero-argument function that yields an observable. */
type observableFactoryT('a) = (. unit) => observableT('a);

/* Indexed read that models a possibly-missing factory as an option. */
[@bs.get_index]
external observable_get:
  (observableT('a), string) => option(observableFactoryT('a)) =
  "";

/* Indexed read of the factory without the option wrapper; only used after
   `observable_get` has shown that the property is present. */
[@bs.get_index]
external observable_unsafe_get:
  (observableT('a), string) => observableFactoryT('a) =
  "";

/* Indexed write that installs a factory on an observable. */
[@bs.set_index]
external observable_set:
  (observableT('a), string, unit => observableT('a)) => unit =
  "";

/* Converts an observable into a source.

   If `input` has a factory stored under `observableSymbol`, that factory is
   called and its result is subscribed to; otherwise `input` itself is used.
   Each `next` value is forwarded as `Push`, `complete` is forwarded as `End`,
   and the talkback handed to the sink unsubscribes on `Close`. */
let fromObservable = (input: observableT('a)): sourceT('a) => {
  let observable =
    switch (input->observable_get(observableSymbol)) {
    | Some(_) => (input->observable_unsafe_get(observableSymbol))(.)
    | None => input
    };

  curry(sink => {
    let observer: observerT('a) =
      [@bs]
      {
        as _;
        pub next = value => sink(. Push(value));
        pub complete = () => sink(. End);
        /* Errors from the observable are ignored: nothing is forwarded to
           the sink for them. */
        pub error = _ => ()
      };

    /* The subscription is created before `Start` is sent so that the
       talkback below can close over it. */
    let subscription = observable##subscribe(observer);

    sink(.
      Start(
        (. signal) =>
          switch (signal) {
          | Close => subscription##unsubscribe()
          | _ => ()
          },
      ),
    );
  });
};

/* Per-subscription state used by `toObservable`. */
type observableStateT = {
  /* Talkback received from the source via `Start`. */
  mutable talkback: (. talkbackT) => unit,
  /* Set once the source has ended or the observer has unsubscribed. */
  mutable ended: bool,
};

/* Converts a source into an observable.

   Every `subscribe` call starts the source with a fresh state. After `Start`
   and after each delivered value a `Pull` is sent to the source's talkback.
   `unsubscribe` sends `Close` once. Once the subscription has ended (either
   through `End` or through `unsubscribe`) neither `next` nor `complete` is
   called on the observer again. The returned object also stores a factory
   returning itself under `observableSymbol`. */
let toObservable = (source: sourceT('a)): observableT('a) => {
  let observable: observableT('a) =
    [@bs]
    {
      as _;
      pub subscribe = (observer: observerT('a)): subscriptionT => {
        let state: observableStateT = {
          talkback: talkbackPlaceholder,
          ended: false,
        };

        source((. signal) =>
          switch (signal) {
          | Start(x) =>
            state.talkback = x;
            x(. Pull);
          | Push(x) when !state.ended =>
            observer##next(x);
            state.talkback(. Pull);
          | Push(_) => ()
          /* Guarded like `Push`, so a late or repeated `End` cannot call
             `complete` after unsubscription or a second time. */
          | End when !state.ended =>
            state.ended = true;
            observer##complete();
          | End => ()
          }
        );

        [@bs]
        {
          as _;
          pub unsubscribe = () =>
            if (!state.ended) {
              state.ended = true;
              state.talkback(. Close);
            }
        };
      }
    };

  observable->observable_set(observableSymbol, () => observable);
  observable;
};