Mirror: 🎩 A tiny but capable push & pull stream library for TypeScript and Flow

Implement toArray sink (#35)

* Add initial toArray sink implementation

* Add tests for toArray source

* Modify toArray to match subscribe behaviour

* Close non-synchronous sources in toArray

* Add additional toArray test for async sources

* Add docs for toArray sink

* Refactor makeTrampoline and use it for fromArray and fromList

+39
__tests__/wonka_test.re
···
})
)
);
});
describe("chains (integration)", () =>
···
})
)
);
+
+
describe("toArray", () => {
+
open Expect;
+
+
it("converts iterable sources to arrays", () => {
+
let input = [|1, 2, 3|];
+
let output = Wonka.fromArray(input) |> Wonka.toArray;
+
expect(output) |> toEqual(input);
+
});
+
+
it("converts mapped iterable sources to arrays", () => {
+
let input = [|1, 2, 3|];
+
let output =
+
Wonka.fromArray(input) |> Wonka.map((. x) => x) |> Wonka.toArray;
+
expect(output) |> toEqual(input);
+
});
+
+
it("converts concatenated iterable sources to arrays", () => {
+
let inputA = [|1, 2, 3|];
+
let inputB = [|1, 2, 3|];
+
let input = Array.append(inputA, inputB);
+
+
let output =
+
Wonka.concat([|Wonka.fromArray(inputA), Wonka.fromArray(inputB)|])
+
|> Wonka.toArray;
+
+
expect(output) |> toEqual(input);
+
});
+
+
it("ignores and closes asynchronous push streams", () => {
+
let input = [|1, 2, 3|];
+
let source = Wonka.concat([|Wonka.fromArray(input), Wonka.never|]);
+
let (signals, wrappedSource) =
+
source |> Wonka_thelpers.testSourceOperator;
+
+
ignore(Wonka.toArray(wrappedSource));
+
expect(signals) |> toEqual([|Pull, Close|]);
+
});
+
});
});
describe("chains (integration)", () =>
+22
__tests__/wonka_thelpers.re
···
);
};
type observableClassT;
[@bs.module] external observableClass: observableClassT = "zen-observable";
···
);
};
+
let testSourceOperator = source => {
+
let res = [||];
+
let innerSource = sink => {
+
source((. signal) =>
+
switch (signal) {
+
| Start(outerTalkback) =>
+
sink(.
+
Start(
+
(. talkback) => {
+
Js.Array.push(talkback, res);
+
outerTalkback(. talkback);
+
},
+
),
+
)
+
| _ => sink(. signal)
+
}
+
);
+
};
+
+
(res, innerSource);
+
};
+
type observableClassT;
[@bs.module] external observableClass: observableClassT = "zen-observable";
+29
docs/api/sinks.md
···
); // Prints 123 to the console.
```
## toPromise
`toPromise` returns a promise, which resolves on the last value of a source.
···
); // Prints 123 to the console.
```
+
## toArray
+
+
`toArray` returns an array, which contains all values from a pull source.
+
This sink is primarily intended for synchronous pull streams. Passing it
+
an asynchronous push streams may result in an empty array being returned.
+
+
If you're passing an asynchronous push stream `toArray` will cancel it
+
before it returns an array.
+
+
> _Note:_ If you're using this sink, make sure that your input source streams
+
> the values you're collecting partly or fully synchronously.
+
+
```reason
+
Wonka.fromArray([|1, 2, 3|])
+
|> Wonka.map((. x) => x * 2)
+
|> Wonka.toArray
+
/* Returns [|2, 4, 6|] */
+
```
+
+
```typescript
+
import { pipe, fromArray, map, toArray } from 'wonka';
+
+
pipe(
+
fromArray([1, 2, 3]),
+
map(x => x * 2),
+
toArray
+
); // Returns [2, 4, 6]
+
```
+
## toPromise
`toPromise` returns a promise, which resolves on the last value of a source.
+3
src/sinks/wonka_sink_toArray.d.ts
···
···
+
import { Source } from '../wonka_types';
+
+
export const toArray: <A>(source: Source<A>) => A[];
+36
src/sinks/wonka_sink_toArray.re
···
···
+
open Wonka_types;
+
open Wonka_helpers;
+
+
type toArrayStateT('a) = {
+
values: Rebel.MutableQueue.t('a),
+
mutable talkback: (. talkbackT) => unit,
+
mutable value: option('a),
+
mutable ended: bool,
+
};
+
+
let toArray = (source: sourceT('a)): array('a) => {
+
let state: toArrayStateT('a) = {
+
values: Rebel.MutableQueue.make(),
+
talkback: talkbackPlaceholder,
+
value: None,
+
ended: false,
+
};
+
+
source((. signal) =>
+
switch (signal) {
+
| Start(x) =>
+
state.talkback = x;
+
x(. Pull);
+
| Push(value) =>
+
Rebel.MutableQueue.add(state.values, value);
+
state.talkback(. Pull);
+
| End => state.ended = true
+
}
+
);
+
+
if (!state.ended) {
+
state.talkback(. Close);
+
};
+
+
Rebel.MutableQueue.toArray(state.values);
+
};
+3
src/sinks/wonka_sink_toArray.rei
···
···
+
open Wonka_types;
+
+
let toArray: sourceT('a) => array('a);
+10 -34
src/sources/wonka_source_fromArray.re
···
open Wonka_types;
-
-
type fromArrayState('a) = {
-
mutable index: int,
-
mutable ended: bool,
-
mutable looping: bool,
-
mutable pull: bool,
-
};
let fromArray = arr =>
curry(sink => {
let size = Rebel.Array.size(arr);
-
let state = {index: 0, ended: false, looping: false, pull: false};
-
-
sink(.
-
Start(
-
(. signal) =>
-
switch (signal, state.looping) {
-
| (Pull, false) =>
-
state.pull = true;
-
state.looping = true;
-
-
while (state.pull && !state.ended) {
-
let index = state.index;
-
if (index < size) {
-
let x = Rebel.Array.getUnsafe(arr, index);
-
state.index = index + 1;
-
state.pull = false;
-
sink(. Push(x));
-
} else {
-
state.ended = true;
-
sink(. End);
-
};
-
};
-
state.looping = false;
-
| (Pull, true) => state.pull = true
-
| (Close, _) => state.ended = true
-
},
-
),
);
});
···
open Wonka_types;
+
open Wonka_helpers;
let fromArray = arr =>
curry(sink => {
let size = Rebel.Array.size(arr);
+
let index = ref(0);
+
makeTrampoline(sink, (.) =>
+
if (index^ < size) {
+
let x = Rebel.Array.getUnsafe(arr, index^);
+
index := index^ + 1;
+
Some(x);
+
} else {
+
None;
+
}
);
});
+9 -33
src/sources/wonka_source_fromList.re
···
open Wonka_types;
-
-
type fromListState('a) = {
-
mutable value: 'a,
-
mutable ended: bool,
-
mutable looping: bool,
-
mutable pull: bool,
-
};
let fromList = ls =>
curry(sink => {
-
let state = {value: ls, ended: false, looping: false, pull: false};
-
-
sink(.
-
Start(
-
(. signal) =>
-
switch (signal, state.looping) {
-
| (Pull, false) =>
-
state.pull = true;
-
state.looping = true;
-
-
while (state.pull && !state.ended) {
-
switch (state.value) {
-
| [x, ...rest] =>
-
state.value = rest;
-
state.pull = false;
-
sink(. Push(x));
-
| [] =>
-
state.ended = true;
-
sink(. End);
-
};
-
};
-
state.looping = false;
-
| (Pull, true) => state.pull = true
-
| (Close, _) => state.ended = true
-
},
-
),
);
});
···
open Wonka_types;
+
open Wonka_helpers;
let fromList = ls =>
curry(sink => {
+
let value = ref(ls);
+
makeTrampoline(sink, (.) =>
+
switch (value^) {
+
| [x, ...rest] =>
+
value := rest;
+
Some(x);
+
| [] => None
+
}
);
});
+1
src/wonka.d.ts
···
/* sinks */
export * from './sinks/wonka_sink_publish';
export * from './sinks/wonka_sink_subscribe';
export * from './web/wonkaJs';
···
/* sinks */
export * from './sinks/wonka_sink_publish';
export * from './sinks/wonka_sink_subscribe';
+
export * from './sinks/wonka_sink_toArray';
export * from './web/wonkaJs';
+21 -32
src/wonka_helpers.re
···
};
type trampolineT = {
-
mutable exhausted: bool,
-
mutable inLoop: bool,
-
mutable gotSignal: bool,
};
let makeTrampoline = (sink: sinkT('a), f: (. unit) => option('a)) => {
-
let state: trampolineT = {
-
exhausted: false,
-
inLoop: false,
-
gotSignal: false,
-
};
-
-
let loop = () => {
-
let rec explode = () =>
-
switch (f(.)) {
-
| Some(x) =>
-
state.gotSignal = false;
-
sink(. Push(x));
-
if (state.gotSignal) {
-
explode();
-
};
-
| None =>
-
state.exhausted = true;
-
sink(. End);
-
};
-
-
state.inLoop = true;
-
explode();
-
state.inLoop = false;
-
};
sink(.
Start(
(. signal) =>
-
switch (signal, state.exhausted) {
| (Pull, false) =>
-
state.gotSignal = true;
-
if (!state.inLoop) {
-
loop();
};
-
| _ => ()
},
),
);
···
};
type trampolineT = {
+
mutable ended: bool,
+
mutable looping: bool,
+
mutable pull: bool,
};
let makeTrampoline = (sink: sinkT('a), f: (. unit) => option('a)) => {
+
let state: trampolineT = {ended: false, looping: false, pull: false};
sink(.
Start(
(. signal) =>
+
switch (signal, state.looping) {
| (Pull, false) =>
+
state.pull = true;
+
state.looping = true;
+
+
while (state.pull && !state.ended) {
+
switch (f(.)) {
+
| Some(x) =>
+
state.pull = false;
+
sink(. Push(x));
+
| None =>
+
state.ended = true;
+
sink(. End);
+
};
};
+
+
state.looping = false;
+
| (Pull, true) => state.pull = true
+
| (Close, _) => state.ended = true
},
),
);
+1
src/wonka_sinks.re
···
include Wonka_sink_publish;
include Wonka_sink_subscribe;
···
include Wonka_sink_publish;
include Wonka_sink_subscribe;
+
include Wonka_sink_toArray;