1open Wonka_types;
2open Wonka_helpers;
3
4let takeLast = max =>
5 curry(source =>
6 curry(sink => {
7 open Rebel;
8 let queue = MutableQueue.make();
9
10 captureTalkback(source, (. signal, talkback) =>
11 switch (signal) {
12 | Start(_) => talkback(. Pull)
13 | Push(x) =>
14 let size = MutableQueue.size(queue);
15 if (size >= max && max > 0) {
16 ignore(MutableQueue.pop(queue));
17 };
18
19 MutableQueue.add(queue, x);
20 talkback(. Pull);
21 | End => makeTrampoline(sink, (.) => MutableQueue.pop(queue))
22 }
23 );
24 })
25 );