1open Wonka_types;
2open Wonka_helpers;
3
4[@genType]
5let fromArray = (arr: array('a)): sourceT('a) =>
6 curry(sink => {
7 let size = Rebel.Array.size(arr);
8 let index = ref(0);
9
10 makeTrampoline(sink, (.) =>
11 if (index^ < size) {
12 let x = Rebel.Array.getUnsafe(arr, index^);
13 index := index^ + 1;
14 Some(x);
15 } else {
16 None;
17 }
18 );
19 });
20
21[@genType]
22let fromList = (ls: list('a)): sourceT('a) =>
23 curry(sink => {
24 let value = ref(ls);
25
26 makeTrampoline(sink, (.) =>
27 switch (value^) {
28 | [x, ...rest] =>
29 value := rest;
30 Some(x);
31 | [] => None
32 }
33 );
34 });
35
36[@genType]
37let fromValue = (x: 'a): sourceT('a) =>
38 curry(sink => {
39 let ended = ref(false);
40
41 sink(.
42 Start(
43 (. signal) =>
44 switch (signal) {
45 | Pull when ! ended^ =>
46 ended := true;
47 sink(. Push(x));
48 sink(. End);
49 | Pull => ()
50 | Close => ended := true
51 },
52 ),
53 );
54 });
55
56type makeStateT = {
57 mutable teardown: (. unit) => unit,
58 mutable ended: bool,
59};
60
61[@genType]
62let make = (f: (. observerT('a)) => teardownT): sourceT('a) =>
63 curry(sink => {
64 let state: makeStateT = {teardown: (.) => (), ended: false};
65
66 state.teardown =
67 f(. {
68 next: value =>
69 if (!state.ended) {
70 sink(. Push(value));
71 },
72 complete: () =>
73 if (!state.ended) {
74 state.ended = true;
75 sink(. End);
76 },
77 });
78
79 sink(.
80 Start(
81 (. signal) =>
82 switch (signal) {
83 | Close when !state.ended =>
84 state.ended = true;
85 state.teardown(.);
86 | _ => ()
87 },
88 ),
89 );
90 });
91
92type subjectState('a) = {
93 mutable sinks: Rebel.Array.t(sinkT('a)),
94 mutable ended: bool,
95};
96
97[@genType]
98let makeSubject = (): subjectT('a) => {
99 let state: subjectState('a) = {
100 sinks: Rebel.Array.makeEmpty(),
101 ended: false,
102 };
103
104 let source = sink => {
105 state.sinks = Rebel.Array.append(state.sinks, sink);
106 sink(.
107 Start(
108 (. signal) =>
109 switch (signal) {
110 | Close =>
111 state.sinks = Rebel.Array.filter(state.sinks, x => x !== sink)
112 | _ => ()
113 },
114 ),
115 );
116 };
117
118 let next = value =>
119 if (!state.ended) {
120 Rebel.Array.forEach(state.sinks, sink => sink(. Push(value)));
121 };
122
123 let complete = () =>
124 if (!state.ended) {
125 state.ended = true;
126 Rebel.Array.forEach(state.sinks, sink => sink(. End));
127 };
128
129 {source, next, complete};
130};
131
132[@genType]
133let empty = (sink: sinkT('a)): unit => {
134 let ended = ref(false);
135 sink(.
136 Start(
137 (. signal) => {
138 switch (signal) {
139 | Close => ended := true
140 | Pull when ! ended^ => sink(. End)
141 | _ => ()
142 }
143 },
144 ),
145 );
146};
147
148[@genType]
149let never = (sink: sinkT('a)): unit => {
150 sink(. Start(talkbackPlaceholder));
151};