Mirror: 馃帺 A tiny but capable push & pull stream library for TypeScript and Flow
at v4.0.3 2.9 kB view raw
1open Wonka_types; 2open Wonka_helpers; 3 4let observableSymbol: string = [%raw 5 {| 6 typeof Symbol === 'function' 7 ? Symbol.observable || (Symbol.observable = Symbol('observable')) 8 : '@@observable' 9|} 10]; 11 12[@genType.import "../shims/Js.shim"] 13type observableSubscriptionT = {. [@bs.meth] "unsubscribe": unit => unit}; 14 15[@genType.import "../shims/Js.shim"] 16type observableObserverT('a) = { 17 . 18 [@bs.meth] "next": 'a => unit, 19 [@bs.meth] "error": Js.Exn.t => unit, 20 [@bs.meth] "complete": unit => unit, 21}; 22 23[@genType.import "../shims/Js.shim"] 24type observableT('a) = { 25 . 26 [@bs.meth] "subscribe": observableObserverT('a) => observableSubscriptionT, 27}; 28 29type observableFactoryT('a) = (. unit) => observableT('a); 30 31[@bs.get_index] 32external observable_get: 33 (observableT('a), string) => option(observableFactoryT('a)) = 34 ""; 35[@bs.get_index] 36external observable_unsafe_get: 37 (observableT('a), string) => observableFactoryT('a) = 38 ""; 39[@bs.set_index] 40external observable_set: 41 (observableT('a), string, unit => observableT('a)) => unit = 42 ""; 43 44[@genType] 45let fromObservable = (input: observableT('a)): sourceT('a) => { 46 let observable = 47 switch (input->observable_get(observableSymbol)) { 48 | Some(_) => (input->observable_unsafe_get(observableSymbol))(.) 49 | None => input 50 }; 51 52 curry(sink => { 53 let observer: observableObserverT('a) = 54 [@bs] 55 { 56 as _; 57 pub next = value => sink(. Push(value)); 58 pub complete = () => sink(. End); 59 pub error = _ => () 60 }; 61 62 let subscription = observable##subscribe(observer); 63 64 sink(. 65 Start( 66 (. signal) => 67 switch (signal) { 68 | Close => subscription##unsubscribe() 69 | _ => () 70 }, 71 ), 72 ); 73 }); 74}; 75 76type observableStateT = { 77 mutable talkback: (. talkbackT) => unit, 78 mutable ended: bool, 79}; 80 81[@genType] 82let toObservable = (source: sourceT('a)): observableT('a) => { 83 let observable: observableT('a) = 84 [@bs] 85 { 86 as _; 87 pub subscribe = 88 (observer: observableObserverT('a)): observableSubscriptionT => { 89 let state: observableStateT = { 90 talkback: talkbackPlaceholder, 91 ended: false, 92 }; 93 94 source((. signal) => 95 switch (signal) { 96 | Start(x) => 97 state.talkback = x; 98 x(. Pull); 99 | Push(x) when !state.ended => 100 observer##next(x); 101 state.talkback(. Pull); 102 | Push(_) => () 103 | End => 104 state.ended = true; 105 observer##complete(); 106 } 107 ); 108 109 [@bs] 110 { 111 as _; 112 pub unsubscribe = () => 113 if (!state.ended) { 114 state.ended = true; 115 state.talkback(. Close); 116 } 117 }; 118 } 119 }; 120 121 observable->observable_set(observableSymbol, () => observable); 122 observable; 123};