Mirror: 馃帺 A tiny but capable push & pull stream library for TypeScript and Flow
at v3.2.1 2.1 kB view raw
1open Wonka_types; 2open Wonka_helpers; 3 4type bufferStateT('a) = { 5 mutable buffer: Rebel.MutableQueue.t('a), 6 mutable sourceTalkback: (. talkbackT) => unit, 7 mutable notifierTalkback: (. talkbackT) => unit, 8 mutable ended: bool, 9}; 10 11let buffer = (notifier: sourceT('a)) => 12 curry((source: sourceT('b)) => 13 curry((sink: sinkT(array('b))) => { 14 let state: bufferStateT('b) = { 15 buffer: Rebel.MutableQueue.make(), 16 sourceTalkback: talkbackPlaceholder, 17 notifierTalkback: talkbackPlaceholder, 18 ended: false, 19 }; 20 21 source((. signal) => 22 switch (signal) { 23 | Start(tb) => 24 state.sourceTalkback = tb; 25 26 notifier((. signal) => 27 switch (signal) { 28 | Start(tb) => 29 state.notifierTalkback = tb; 30 state.notifierTalkback(. Pull); 31 | Push(_) when !state.ended => 32 sink(. Push(Rebel.MutableQueue.toArray(state.buffer))); 33 state.buffer = Rebel.MutableQueue.make(); 34 state.notifierTalkback(. Pull); 35 | Push(_) => () 36 | End when !state.ended => 37 state.ended = true; 38 state.sourceTalkback(. Close); 39 sink(. Push(Rebel.MutableQueue.toArray(state.buffer))); 40 sink(. End); 41 | End => () 42 } 43 ); 44 | Push(value) when !state.ended => 45 Rebel.MutableQueue.add(state.buffer, value); 46 state.sourceTalkback(. Pull); 47 | Push(_) => () 48 | End when !state.ended => 49 state.ended = true; 50 state.notifierTalkback(. Close); 51 sink(. Push(Rebel.MutableQueue.toArray(state.buffer))); 52 sink(. End); 53 | End => () 54 } 55 ); 56 57 sink(. 58 Start( 59 (. signal) => 60 if (!state.ended) { 61 switch (signal) { 62 | Close => 63 state.ended = true; 64 state.sourceTalkback(. Close); 65 state.notifierTalkback(. Close); 66 | Pull => state.sourceTalkback(. Pull) 67 }; 68 }, 69 ), 70 ); 71 }) 72 );