Mirror: 馃帺 A tiny but capable push & pull stream library for TypeScript and Flow
1open Wonka_types; 2open Wonka_helpers; 3 4type takeUntilStateT = { 5 mutable ended: bool, 6 mutable sourceTalkback: (. talkbackT) => unit, 7 mutable notifierTalkback: (. talkbackT) => unit, 8}; 9 10let takeUntil = notifier => 11 curry(source => 12 curry(sink => { 13 let state: takeUntilStateT = { 14 ended: false, 15 sourceTalkback: talkbackPlaceholder, 16 notifierTalkback: talkbackPlaceholder, 17 }; 18 19 source((. signal) => 20 switch (signal) { 21 | Start(tb) => 22 state.sourceTalkback = tb; 23 24 notifier((. signal) => 25 switch (signal) { 26 | Start(innerTb) => 27 state.notifierTalkback = innerTb; 28 innerTb(. Pull); 29 | Push(_) => 30 state.ended = true; 31 state.notifierTalkback(. Close); 32 state.sourceTalkback(. Close); 33 sink(. End); 34 | End => () 35 } 36 ); 37 | End when !state.ended => 38 state.notifierTalkback(. Close); 39 state.ended = true; 40 sink(. End); 41 | End => () 42 | Push(_) when !state.ended => sink(. signal) 43 | Push(_) => () 44 } 45 ); 46 47 sink(. 48 Start( 49 (. signal) => 50 if (!state.ended) { 51 switch (signal) { 52 | Close => 53 state.sourceTalkback(. Close); 54 state.notifierTalkback(. Close); 55 | Pull => state.sourceTalkback(. Pull) 56 }; 57 }, 58 ), 59 ); 60 }) 61 );