Mirror: 🎩 A tiny but capable push & pull stream library for TypeScript and Flow
at v3.2.0 1.5 kB view raw
open Wonka_types;
open Wonka_helpers;

/* Mutable bookkeeping shared by the callbacks that `takeUntil` installs. */
type takeUntilStateT = {
  /* Set once the output has terminated; later signals are ignored. */
  mutable ended: bool,
  /* Talkback delivered by `source` in its Start signal. */
  mutable sourceTalkback: (. talkbackT) => unit,
  /* Talkback delivered by `notifier` in its Start signal. */
  mutable notifierTalkback: (. talkbackT) => unit,
};

/* takeUntil(notifier)(source)(sink)
 *
 * Forwards every Push from `source` to `sink` until `notifier` pushes its
 * first value. At that point both `source` and `notifier` are closed and
 * `sink` receives End.
 *
 * - `notifier` is subscribed when `source` sends Start, and is pulled once
 *   as soon as it sends its own Start.
 * - If `source` ends first, `notifier` is closed and End is forwarded.
 * - An End coming from `notifier` is ignored; the output keeps running.
 * - The talkback handed to `sink` forwards Pull to `source`; on Close it
 *   closes both `source` and `notifier`. It does nothing once ended.
 */
let takeUntil = notifier =>
  curry(source =>
    curry(sink => {
      let state: takeUntilStateT = {
        ended: false,
        sourceTalkback: talkbackPlaceholder,
        notifierTalkback: talkbackPlaceholder,
      };

      source((. signal) =>
        switch (signal) {
        | Start(tb) =>
          state.sourceTalkback = tb;

          notifier((. notifierSignal) =>
            switch (notifierSignal) {
            | Start(innerTb) =>
              state.notifierTalkback = innerTb;
              /* Ask the notifier for its first value straight away. */
              innerTb(. Pull);
            | Push(_) when !state.ended =>
              /* Flip the flag first so that any signal re-entering while
                 the talkbacks below run is ignored. */
              state.ended = true;
              state.sourceTalkback(. Close);
              /* Also stop the notifier: its job is done, and leaving it
                 open would keep it running after the output has ended. */
              state.notifierTalkback(. Close);
              sink(. End);
            /* Already ended: never send a second End / Close. */
            | Push(_) => ()
            | End => ()
            }
          );
        | End when !state.ended =>
          state.ended = true;
          state.notifierTalkback(. Close);
          sink(. End);
        | End => ()
        | Push(_) when !state.ended => sink(. signal)
        | Push(_) => ()
        }
      );

      sink(.
        Start(
          (. signal) =>
            if (!state.ended) {
              switch (signal) {
              | Close =>
                state.ended = true;
                state.sourceTalkback(. Close);
                state.notifierTalkback(. Close);
              | Pull => state.sourceTalkback(. Pull)
              };
            },
        ),
      );
    })
  );