Mirror: 馃帺 A tiny but capable push & pull stream library for TypeScript and Flow
at v5.0.0-rc.1 3.5 kB view raw
1open Wonka_types; 2open Wonka_helpers; 3 4let observableSymbol: string = [%raw 5 {| 6 typeof Symbol === 'function' 7 ? Symbol.observable || (Symbol.observable = Symbol('observable')) 8 : '@@observable' 9|} 10]; 11 12[@genType.import "../shims/Js.shim"] 13type observableSubscriptionT = {. [@bs.meth] "unsubscribe": unit => unit}; 14 15[@bs.set_index] 16external subscription_set: (observableSubscriptionT, string, bool) => unit; 17 18[@genType.import "../shims/Js.shim"] 19type observableObserverT('a) = { 20 . 21 [@bs.meth] "next": 'a => unit, 22 [@bs.meth] "error": Js.Exn.t => unit, 23 [@bs.meth] "complete": unit => unit, 24}; 25 26[@genType.import "../shims/Js.shim"] 27type observableT('a) = { 28 . 29 [@bs.meth] "subscribe": observableObserverT('a) => observableSubscriptionT, 30}; 31 32type observableFactoryT('a) = (. unit) => observableT('a); 33 34[@bs.get_index] 35external observable_get: 36 (observableT('a), string) => option(observableFactoryT('a)); 37[@bs.get_index] 38external observable_unsafe_get: 39 (observableT('a), string) => observableFactoryT('a); 40[@bs.set_index] 41external observable_set: 42 (observableT('a), string, unit => observableT('a)) => unit; 43 44[@genType] 45let fromObservable = (input: observableT('a)): sourceT('a) => { 46 let observable = 47 switch (input->observable_get(observableSymbol)) { 48 | Some(_) => (input->observable_unsafe_get(observableSymbol))(.) 49 | None => input 50 }; 51 52 curry(sink => { 53 let observer: observableObserverT('a) = 54 [@bs] 55 { 56 as _; 57 pub next = value => sink(. Push(value)); 58 pub complete = () => sink(. End); 59 pub error = _ => () 60 }; 61 62 let subscription = observable##subscribe(observer); 63 64 sink(. 65 Start( 66 (. signal) => 67 switch (signal) { 68 | Close => subscription##unsubscribe() 69 | _ => () 70 }, 71 ), 72 ); 73 }); 74}; 75 76type observableStateT = { 77 mutable talkback: (. talkbackT) => unit, 78 mutable ended: bool, 79}; 80 81[@genType] 82let toObservable = (source: sourceT('a)): observableT('a) => { 83 let observable: observableT('a) = 84 [@bs] 85 { 86 as _; 87 pub subscribe = 88 (_observer: observableObserverT('a)): observableSubscriptionT => { 89 let next: (. 'a) => unit = [%raw 90 {| 91 (typeof _observer === 'object' ? _observer.next.bind(_observer) : _observer) || function () {} 92 |} 93 ]; 94 95 let complete: (. unit) => unit = [%raw 96 {| 97 (typeof _observer === 'object' ? _observer.complete.bind(_observer) : arguments[2]) || function () {} 98 |} 99 ]; 100 101 let state: observableStateT = { 102 talkback: talkbackPlaceholder, 103 ended: false, 104 }; 105 106 source((. signal) => 107 switch (signal) { 108 | Start(x) => 109 state.talkback = x; 110 x(. Pull); 111 | Push(x) when !state.ended => 112 next(. x); 113 state.talkback(. Pull); 114 | Push(_) => () 115 | End => 116 state.ended = true; 117 complete(.); 118 } 119 ); 120 121 let subscription = 122 [@bs] 123 { 124 as self; 125 pub unsubscribe = () => 126 if (!state.ended) { 127 self->subscription_set("closed", false); 128 state.ended = true; 129 state.talkback(. Close); 130 } 131 }; 132 133 subscription->subscription_set("closed", false); 134 subscription; 135 } 136 }; 137 138 observable->observable_set(observableSymbol, () => observable); 139 observable; 140};