1open Wonka_types;
2open Wonka_helpers;
3
/* Builds a source from the elements of `arr`, visited in index order.

   A cursor into the array is kept per subscription (per `sink`). The
   callback handed to `makeTrampoline` yields `Some(element)` while the
   cursor is in bounds and `None` once it has moved past the last element.
   NOTE(review): how `makeTrampoline` turns those results into signals for
   the sink is defined in Wonka_helpers, not here - presumably `None` ends
   the stream; confirm there. */
[@genType]
let fromArray = (arr: array('a)): sourceT('a) =>
  curry(sink => {
    let total = Rebel.Array.size(arr);
    let cursor = ref(0);

    makeTrampoline(sink, (.) => {
      let position = cursor^;

      if (position >= total) {
        None;
      } else {
        /* The bounds check above is what makes the unchecked read safe. */
        let element = Rebel.Array.getUnsafe(arr, position);
        cursor := position + 1;
        Some(element);
      };
    });
  });
20
/* Builds a source from the elements of `ls`, visited head first.

   The not-yet-emitted tail of the list is kept per subscription (per
   `sink`). The callback handed to `makeTrampoline` yields `Some(head)` and
   advances to the tail, or yields `None` once the list is exhausted.
   NOTE(review): how `makeTrampoline` turns those results into signals for
   the sink is defined in Wonka_helpers, not here - confirm there. */
[@genType]
let fromList = (ls: list('a)): sourceT('a) =>
  curry(sink => {
    let remaining = ref(ls);

    makeTrampoline(sink, (.) =>
      switch (remaining^) {
      | [] => None
      | [head, ...tail] =>
        remaining := tail;
        Some(head);
      }
    );
  });
35
/* Builds a source that emits the single value `x` and then ends.

   The sink first receives `Start` with a talkback. On the first `Pull` the
   source sends `Push(x)` immediately followed by `End`; every later signal
   is ignored.

   A `Close` received before the first `Pull` also marks the source as
   ended, so a sink that has cancelled is never sent `Push`/`End`
   afterwards. */
[@genType]
let fromValue = (x: 'a): sourceT('a) =>
  curry(sink => {
    /* True once the value has been delivered or the sink has closed. */
    let ended = ref(false);

    sink(.
      Start(
        (. signal) =>
          switch (signal) {
          | Pull when ! ended^ =>
            /* Set the flag before pushing so a re-entrant Pull issued from
               inside the sink's Push handler cannot emit the value twice. */
            ended := true;
            sink(. Push(x));
            sink(. End);
          | Close => ended := true
          | _ => ()
          },
      ),
    );
  });
54
/* Builds a source driven by a user-supplied producer `f`.

   `f` is called once per subscription with an observer record:
     - `next(value)` forwards `Push(value)` to the sink;
     - `complete()`  forwards `End` to the sink.
   `f` returns a teardown function that is run when the sink sends `Close`.

   Guarantees enforced here:
     - after `complete()` or `Close`, `next` and `complete` are no-ops, so
       the sink never sees `Push` after `End`, a second `End`, or anything
       after it has closed;
     - the teardown runs at most once;
     - if the sink closes before `f` has returned its teardown (during
       `Start`, or synchronously from inside `f`), the teardown is run as
       soon as `f` returns instead of being lost.

   `Pull` is ignored: values are pushed whenever the producer calls `next`. */
[@genType]
let make = (f: (. observerT('a)) => teardownT): sourceT('a) =>
  curry(sink => {
    /* Placeholder until `f` has returned the real teardown. */
    let teardown = ref((.) => ());
    /* True once End was sent or Close was received: nothing more is emitted. */
    let ended = ref(false);
    /* True once the sink sent Close: guards against double teardown. */
    let closed = ref(false);

    sink(.
      Start(
        (. signal) =>
          switch (signal) {
          | Close when ! closed^ =>
            closed := true;
            ended := true;
            teardown^(.);
          | _ => ()
          },
      ),
    );

    let dispose =
      f(. {
        next: value =>
          if (! ended^) {
            sink(. Push(value));
          },
        complete: () =>
          if (! ended^) {
            ended := true;
            sink(. End);
          },
      });

    if (closed^) {
      /* Close arrived while only the placeholder was installed, so the real
         teardown has not run yet - release the producer's resources now. */
      dispose(.);
    } else {
      teardown := dispose;
    };
  });
76
/* Mutable state shared by the three members of a subject. */
type subjectState('a) = {
  /* Sinks that have subscribed and not yet sent Close. */
  mutable sinks: Rebel.Array.t(sinkT('a)),
  /* Set by `complete`; once true, `next` and `complete` do nothing. */
  mutable ended: bool,
};

/* Creates a subject: a source plus the `next`/`complete` functions that
   feed it.

     - `source`   registers the sink, then sends it `Start`. A `Close` from
                  that sink removes it from the registered sinks; other
                  signals are ignored.
     - `next`     sends `Push(value)` to every registered sink, unless the
                  subject has already completed.
     - `complete` marks the subject as ended and sends `End` to every
                  registered sink; calling it again does nothing.

   NOTE(review): a sink that subscribes after `complete` is still registered
   and receives `Start`, but never receives `End`. */
[@genType]
let makeSubject = (): subjectT('a) => {
  let state: subjectState('a) = {
    sinks: Rebel.Array.makeEmpty(),
    ended: false,
  };

  /* Delivers one signal to every sink registered at the time of the call. */
  let broadcast = signal =>
    Rebel.Array.forEach(state.sinks, subscriber => subscriber(. signal));

  let source = subscriber => {
    state.sinks = Rebel.Array.append(state.sinks, subscriber);

    let talkback =
      (. signal) =>
        switch (signal) {
        | Close =>
          /* Physical inequality: drop exactly this subscriber. */
          state.sinks =
            Rebel.Array.filter(state.sinks, other => other !== subscriber)
        | _ => ()
        };

    subscriber(. Start(talkback));
  };

  let next = value => state.ended ? () : broadcast(Push(value));

  let complete = () =>
    if (!state.ended) {
      /* Flip the flag first so re-entrant calls become no-ops. */
      state.ended = true;
      broadcast(End);
    };

  {source, next, complete};
};
114
/* A source that emits no values: it sends `Start` to the sink and then
   `End` straight away.
   NOTE(review): `talkbackPlaceholder` comes from Wonka_helpers and is
   presumably a talkback that ignores every signal - confirm there. */
[@genType]
let empty = (sink: sinkT('a)): unit => {
  let handshake = Start(talkbackPlaceholder);

  sink(. handshake);
  sink(. End);
};
120
/* A source that sends `Start` to the sink and nothing afterwards: it never
   pushes a value and never sends `End`.
   NOTE(review): `talkbackPlaceholder` comes from Wonka_helpers and is
   presumably a talkback that ignores every signal - confirm there. */
[@genType]
let never = (sink: sinkT('a)): unit =>
  sink(. Start(talkbackPlaceholder));