
feat(core): Wrap ExchangeIO in share calls in composeExchanges (QOL) (#3082)

+11
.changeset/cuddly-actors-look.md
···
+---
+'@urql/exchange-multipart-fetch': minor
+'@urql/exchange-graphcache': minor
+'@urql/exchange-persisted': minor
+'@urql/exchange-context': minor
+'@urql/exchange-execute': minor
+'@urql/exchange-retry': minor
+'@urql/exchange-auth': minor
+---
+
+Update exchanges to drop redundant `share` calls, since `@urql/core`’s `composeExchanges` utility now automatically does so for us.
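In practice this means an exchange that used to `share` the incoming stream before splitting it can simply drop that call. A minimal before/after sketch (the exchange names are hypothetical and only illustrate the changeset above):

```ts
import { pipe, filter, merge, share } from 'wonka';
import { Exchange } from '@urql/core';

// BEFORE: operations$ had to be shared manually before being used twice.
const splitExchangeBefore: Exchange =
  ({ forward }) =>
  operations$ => {
    const shared = pipe(operations$, share);
    const queries = pipe(shared, filter(op => op.kind === 'query'));
    const rest = pipe(shared, filter(op => op.kind !== 'query'));
    return forward(merge([queries, rest]));
  };

// AFTER: composeExchanges shares operations$ for us, so `share` can be dropped.
const splitExchangeAfter: Exchange =
  ({ forward }) =>
  operations$ => {
    const queries = pipe(operations$, filter(op => op.kind === 'query'));
    const rest = pipe(operations$, filter(op => op.kind !== 'query'));
    return forward(merge([queries, rest]));
  };
```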
+5
.changeset/five-lies-collect.md
···
+---
+'@urql/core': patch
+---
+
+Update `Exchange` contract and `composeExchanges` utility to remove the need to manually call `share` on either incoming `Source<Operation>` or `forward()`’s `Source<OperationResult>`. This is now taken care of internally in `composeExchanges` and should make it easier for you to create custom exchanges and for us to explain them.
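Concretely, `composeExchanges` now wraps the `forward` function it hands to each exchange, sharing both the `Source<Operation>` going in and the `Source<OperationResult>` coming back, as the `packages/core/src/exchanges/compose.ts` diff below shows. A rough sketch of that wrapper, assuming only the `wonka` `share` operator:

```ts
import { share } from 'wonka';
import type { ExchangeIO } from '@urql/core';

// Sketch of the wrapping applied around each forwarded ExchangeIO:
// the incoming Source<Operation> and the outgoing Source<OperationResult>
// are both shared, so neither side needs to call share() manually anymore.
const withSharedIO =
  (io: ExchangeIO): ExchangeIO =>
  operations$ =>
    share(io(share(operations$)));
```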
+22 -85
docs/advanced/authoring-exchanges.md
···
});
```
-### Only One Operations Stream
-
-When writing an Exchange we have to be careful not to _split_ the stream into multiple ones by
-subscribing multiple times. Streams are lazy and immutable by default. Every time you use them,
-a new chain of streaming operators is created; since Exchanges are technically side effects, we don't
-want to accidentally have multiple instances of them in parallel.
-
-The `ExchangeIO` function receives an `operations$` stream. It's important to be careful to either only
-use it once, or to _share_ its subscription.
-
-```js
-import { pipe, filter, merge, share } from 'wonka';
-
-// DON'T: split use operations$ twice
-({ forward }) => operations$ => {
-  // <-- The ExchangeIO function (inline)
-  const queries = pipe(
-    operations$,
-    filter(op => op.kind === 'query')
-  );
-  const others = pipe(
-    operations$,
-    filter(op => op.kind !== 'query')
-  );
-  return forward(merge([queries, others]));
-};
-
-// DO: share operations$ if you have to use it twice
-({ forward }) => operations$ => {
-  // <-- The ExchangeIO function (inline)
-  const shared = pipe(operations$, share);
-  const queries = pipe(
-    shared,
-    filter(op => op.kind === 'query')
-  );
-  const others = pipe(
-    shared,
-    filter(op => op.kind !== 'query')
-  );
-  return forward(merge([queries, others]));
-};
-
-// DO: use operations$ only once alternatively
-({ forward }) => (
-  operations$ // <-- The ExchangeIO function (inline)
-) =>
-  pipe(
-    operations$,
-    map(op => {
-      if (op.kind === 'query') {
-        /* ... */
-      } else {
-        /* ... */
-      }
-      return op;
-    }),
-    forward
-  );
-```
-
-So if you see the `operations$` stream twice in your exchange code, make sure to
-use Wonka's [`share`](https://wonka.kitten.sh/api/operators#share) operator, to share the underlying
-subscription between all your streams.
-
### How to Avoid Accidentally Dropping Operations
Typically the `operations$` stream will send you `query`, `mutation`,
···
not `filter` operations too aggressively.
```js
-import { pipe, filter, merge, share } from 'wonka';
+import { pipe, filter, merge } from 'wonka';

// DON'T: drop unknown operations
-({ forward }) => operations$ => {
-  // This doesn't handle operations that aren't queries
-  const queries = pipe(
-    operations$,
-    filter(op => op.kind === 'query')
-  );
-  return forward(queries);
-};
+({ forward }) =>
+  operations$ => {
+    // This doesn't handle operations that aren't queries
+    const queries = pipe(
+      operations$,
+      filter(op => op.kind === 'query')
+    );
+    return forward(queries);
+  };

// DO: forward operations that you don't handle
-({ forward }) => operations$ => {
-  const shared = pipe(operations$, share);
-  const queries = pipe(
-    shared,
-    filter(op => op.kind === 'query')
-  );
-  const rest = pipe(
-    shared,
-    filter(op => op.kind !== 'query')
-  );
-  return forward(merge([queries, rest]));
-};
+({ forward }) =>
+  operations$ => {
+    const queries = pipe(
+      operations$,
+      filter(op => op.kind === 'query')
+    );
+    const rest = pipe(
+      operations$,
+      filter(op => op.kind !== 'query')
+    );
+    return forward(merge([queries, rest]));
+  };
```
If operations are grouped and/or filtered by what the exchange is handling, then it's also important to
+3 -1
exchanges/auth/src/authExchange.test.ts
···
toPromise,
take,
makeSubject,
+share,
publish,
scan,
tap,
···
pipe(
op$,
tap(op => operations.push(op)),
-map(result)
+map(result),
+share
),
client: new Client({
url: '/api',
+3 -6
exchanges/auth/src/authExchange.ts
···
makeSubject,
toPromise,
merge,
-share,
} from 'wonka';
import {
···
return config ? config.addAuthToOperation(operation) : operation;
}
-const sharedOps$ = pipe(operations$, share);
-
const teardownOps$ = pipe(
-sharedOps$,
+operations$,
filter(operation => operation.kind === 'teardown')
);
const pendingOps$ = pipe(
-sharedOps$,
+operations$,
filter(operation => operation.kind !== 'teardown')
);
···
filter(Boolean)
) as Source<Operation>;
-const result$ = pipe(merge([opsWithAuth$, teardownOps$]), forward, share);
+const result$ = pipe(merge([opsWithAuth$, teardownOps$]), forward);
return pipe(
result$,
+1
exchanges/context/src/context.ts
···
Operation,
OperationContext,
} from '@urql/core';
+import { fromPromise, fromValue, mergeMap, pipe } from 'wonka';
/** Input parameters for the {@link contextExchange}. */
+5 -16
exchanges/execute/src/execute.ts
···
-import {
-  Source,
-  pipe,
-  share,
-  filter,
-  takeUntil,
-  mergeMap,
-  merge,
-  make,
-} from 'wonka';
+import { Source, pipe, filter, takeUntil, mergeMap, merge, make } from 'wonka';
import {
GraphQLSchema,
···
export const executeExchange =
(options: ExecuteExchangeArgs): Exchange =>
({ forward }) => {
-return ops$ => {
-const sharedOps$ = share(ops$);
-
+return operations$ => {
const executedOps$ = pipe(
-sharedOps$,
+operations$,
filter((operation: Operation) => {
return (
operation.kind === 'query' ||
···
mergeMap((operation: Operation) => {
const { key } = operation;
const teardown$ = pipe(
-sharedOps$,
+operations$,
filter(op => op.kind === 'teardown' && op.key === key)
);
···
);
const forwardedOps$ = pipe(
-sharedOps$,
+operations$,
filter(operation => operation.kind === 'teardown'),
forward
);
+83 -59
exchanges/graphcache/src/cacheExchange.test.ts
···
import {
Source,
pipe,
+share,
map,
merge,
mergeMap,
···
const { source: ops$, next } = makeSubject<Operation>();
const result = vi.fn();
-const forward: ExchangeIO = ops$ => pipe(ops$, map(response));
+const forward: ExchangeIO = ops$ => pipe(ops$, map(response), share);
pipe(
cacheExchange({})({ forward, client, dispatchDebug })(ops$),
···
const { source: ops$, next } = makeSubject<Operation>();
const result = vi.fn();
-const forward: ExchangeIO = ops$ => pipe(ops$, map(response));
+const forward: ExchangeIO = ops$ => pipe(ops$, map(response), share);
pipe(
cacheExchange({})({ forward, client, dispatchDebug })(ops$),
···
return undefined as any;
});
-const forward: ExchangeIO = ops$ => pipe(ops$, map(response));
+const forward: ExchangeIO = ops$ => pipe(ops$, map(response), share);
const result = vi.fn();
pipe(
···
});
const result = vi.fn();
-const forward: ExchangeIO = ops$ => pipe(ops$, delay(1), map(response));
+const forward: ExchangeIO = ops$ =>
+  pipe(ops$, delay(1), map(response), share);
const updates = {
Mutation: {
···
return undefined as any;
});
-const forward: ExchangeIO = ops$ => pipe(ops$, map(response));
+const forward: ExchangeIO = ops$ => pipe(ops$, map(response), share);
const result = vi.fn();
pipe(
···
});
const result = vi.fn();
-const forward: ExchangeIO = ops$ => pipe(ops$, delay(1), map(response));
+const forward: ExchangeIO = ops$ =>
+  pipe(ops$, delay(1), map(response), share);
const updates = {
Mutation: {
···
});
const result = vi.fn();
-const forward: ExchangeIO = ops$ => pipe(ops$, delay(1), map(response));
+const forward: ExchangeIO = ops$ =>
+  pipe(ops$, delay(1), map(response), share);
const updates = {
Mutation: {
···
});
const result = vi.fn();
-const forward: ExchangeIO = ops$ => pipe(ops$, map(response));
+const forward: ExchangeIO = ops$ => pipe(ops$, map(response), share);
pipe(
cacheExchange({})({ forward, client, dispatchDebug })(ops$),
···
});
const result = vi.fn();
-const forward: ExchangeIO = ops$ => pipe(ops$, delay(1), map(response));
+const forward: ExchangeIO = ops$ =>
+  pipe(ops$, delay(1), map(response), share);
const optimistic = {
concealAuthor: vi.fn(() => optimisticMutationData.concealAuthor) as any,
···
});
const result = vi.fn();
-const forward: ExchangeIO = ops$ => pipe(ops$, delay(3), map(response));
+const forward: ExchangeIO = ops$ =>
+  pipe(ops$, delay(3), map(response), share);
const optimistic = {
concealAuthor: vi.fn(() => optimisticMutationData.concealAuthor) as any,
···
ops$,
delay(1),
filter(x => x.kind !== 'mutation'),
-map(response)
+map(response),
+share
);
const optimistic = {
···
});
const result = vi.fn();
-const forward: ExchangeIO = ops$ => pipe(ops$, delay(1), map(response));
+const forward: ExchangeIO = ops$ =>
+  pipe(ops$, delay(1), map(response), share);
const optimistic = {
addAuthor: vi.fn(() => optimisticMutationData.addAuthor) as any,
···
return undefined as any;
});
-const forward: ExchangeIO = ops$ => pipe(ops$, map(response));
+const forward: ExchangeIO = ops$ => pipe(ops$, map(response), share);
const result = vi.fn();
const fakeResolver = vi.fn();
···
});
const result = vi.fn();
-const forward: ExchangeIO = ops$ => pipe(ops$, delay(1), map(response));
+const forward: ExchangeIO = ops$ =>
+  pipe(ops$, delay(1), map(response), share);
const fakeResolver = vi.fn();
···
});
const result = vi.fn();
-const forward: ExchangeIO = ops$ => pipe(ops$, delay(1), map(response));
+const forward: ExchangeIO = ops$ =>
+  pipe(ops$, delay(1), map(response), share);
const fakeResolver = vi.fn();
const called: any[] = [];
···
});
const result = vi.fn();
-const forward: ExchangeIO = ops$ => pipe(ops$, delay(1), map(response));
+const forward: ExchangeIO = ops$ =>
+  pipe(ops$, delay(1), map(response), share);
pipe(
cacheExchange({
···
});
const result = vi.fn();
-const forward: ExchangeIO = ops$ => pipe(ops$, delay(1), map(response));
+const forward: ExchangeIO = ops$ =>
+  pipe(ops$, delay(1), map(response), share);
pipe(
cacheExchange({
···
`;
const forward = (ops$: Source<Operation>): Source<OperationResult> =>
-  merge([
-    pipe(
-      ops$,
-      filter(() => false)
-    ) as any,
-    res$,
-  ]);
+  share(
+    merge([
+      pipe(
+        ops$,
+        filter(() => false)
+      ) as any,
+      res$,
+    ])
+  );
const optimistic = {
node: () => ({
···
`;
const forward = (ops$: Source<Operation>): Source<OperationResult> =>
-  merge([
-    pipe(
-      ops$,
-      filter(() => false)
-    ) as any,
-    res$,
-  ]);
+  share(
+    merge([
+      pipe(
+        ops$,
+        filter(() => false)
+      ) as any,
+      res$,
+    ])
+  );
pipe(
cacheExchange()({ forward, client, dispatchDebug })(ops$),
···
`;
const forward = (ops$: Source<Operation>): Source<OperationResult> =>
-  merge([
-    pipe(
-      ops$,
-      filter(() => false)
-    ) as any,
-    res$,
-  ]);
+  share(
+    merge([
+      pipe(
+        ops$,
+        filter(() => false)
+      ) as any,
+      res$,
+    ])
+  );
const optimistic = {
node: () => ({
···
`;
const forward = (ops$: Source<Operation>): Source<OperationResult> =>
-  merge([
-    pipe(
-      ops$,
-      filter(() => false)
-    ) as any,
-    res$,
-  ]);
+  share(
+    merge([
+      pipe(
+        ops$,
+        filter(() => false)
+      ) as any,
+      res$,
+    ])
+  );
pipe(
cacheExchange()({ forward, client, dispatchDebug })(ops$),
···
`;
const forward = (ops$: Source<Operation>): Source<OperationResult> =>
-  merge([
-    pipe(
-      ops$,
-      filter(() => false)
-    ) as any,
-    res$,
-  ]);
+  share(
+    merge([
+      pipe(
+        ops$,
+        filter(() => false)
+      ) as any,
+      res$,
+    ])
+  );
pipe(
cacheExchange()({ forward, client, dispatchDebug })(ops$),
···
`;
const forward = (ops$: Source<Operation>): Source<OperationResult> =>
-  merge([
-    pipe(
-      ops$,
-      filter(() => false)
-    ) as any,
-    res$,
-  ]);
+  share(
+    merge([
+      pipe(
+        ops$,
+        filter(() => false)
+      ) as any,
+      res$,
+    ])
+  );
pipe(
cacheExchange()({ forward, client, dispatchDebug })(ops$),
+4 -7
exchanges/graphcache/src/cacheExchange.ts
···
};
};
-return ops$ => {
-const sharedOps$ = pipe(ops$, share);
-
+return operations$ => {
// Filter by operations that are cacheable and attempt to query them from the cache
const cacheOps$ = pipe(
-sharedOps$,
+operations$,
filter(
op =>
op.kind === 'query' && op.context.requestPolicy !== 'network-only'
···
);
const nonCacheOps$ = pipe(
-sharedOps$,
+operations$,
filter(
op =>
op.kind !== 'query' || op.context.requestPolicy === 'network-only'
···
const result$ = pipe(
merge([nonCacheOps$, cacheMissOps$]),
map(prepareForwardedOperation),
-forward,
-share
+forward
);
// Results that can immediately be resolved
+7 -7
exchanges/graphcache/src/offlineExchange.test.ts
···
import { print } from 'graphql';
import { vi, expect, it, describe } from 'vitest';
-import { pipe, map, makeSubject, tap, publish } from 'wonka';
+import { pipe, share, map, makeSubject, tap, publish } from 'wonka';
import { queryResponse } from '../../../packages/core/src/test-utils';
import { offlineExchange } from './offlineExchange';
···
const { source: ops$ } = makeSubject<Operation>();
const result = vi.fn();
-const forward: ExchangeIO = ops$ => pipe(ops$, map(response));
+const forward: ExchangeIO = ops$ => pipe(ops$, map(response), share);
vi.useFakeTimers();
pipe(
···
const { source: ops$, next } = makeSubject<Operation>();
const result = vi.fn();
-const forward: ExchangeIO = ops$ => pipe(ops$, map(response));
+const forward: ExchangeIO = ops$ => pipe(ops$, map(response), share);
pipe(
offlineExchange({
···
next(mutationOp);
expect(result).toBeCalledTimes(1);
-expect(storage.writeMetadata).toBeCalledTimes(1);
+expect(storage.writeMetadata).toHaveBeenCalled();
expect(storage.writeMetadata).toHaveBeenCalledWith([
{
query: `mutation {
···
const { source: ops$, next } = makeSubject<Operation>();
const result = vi.fn();
-const forward: ExchangeIO = ops$ => pipe(ops$, map(response));
+const forward: ExchangeIO = ops$ => pipe(ops$, map(response), share);
pipe(
offlineExchange({
···
);
next(mutationOp);
-expect(storage.writeMetadata).toBeCalledTimes(1);
+expect(storage.writeMetadata).toHaveBeenCalled();
expect(storage.writeMetadata).toHaveBeenCalledWith([
{
query: `mutation {
···
await onOnlineCalled;
flush!();
-expect(reexecuteOperation).toHaveBeenCalledTimes(1);
+expect(reexecuteOperation).toHaveBeenCalled();
expect((reexecuteOperation.mock.calls[0][0] as any).key).toEqual(1);
expect(print((reexecuteOperation.mock.calls[0][0] as any).query)).toEqual(
print(gql`
+3 -5
exchanges/graphcache/src/offlineExchange.ts
···
-import { pipe, merge, makeSubject, share, filter } from 'wonka';
+import { pipe, merge, makeSubject, filter } from 'wonka';
import { print, SelectionNode } from 'graphql';
import {
···
forward,
});
-return ops$ => {
-const sharedOps$ = pipe(ops$, share);
-
-const opsAndRebound$ = merge([reboundOps$, sharedOps$]);
+return operations$ => {
+const opsAndRebound$ = merge([reboundOps$, operations$]);
return pipe(
cacheResults$(opsAndRebound$),
+5 -6
exchanges/multipart-fetch/src/multipartFetchExchange.ts
···
-import { filter, merge, mergeMap, pipe, share, takeUntil, onPush } from 'wonka';
+import { filter, merge, mergeMap, pipe, takeUntil, onPush } from 'wonka';
import { extractFiles } from 'extract-files';
import { Exchange } from '@urql/core';
···
*/
export const multipartFetchExchange: Exchange =
({ forward, dispatchDebug }) =>
-ops$ => {
-const sharedOps$ = share(ops$);
+operations$ => {
const fetchResults$ = pipe(
-sharedOps$,
+operations$,
filter(operation => {
return operation.kind === 'query' || operation.kind === 'mutation';
}),
mergeMap(operation => {
const teardown$ = pipe(
-sharedOps$,
+operations$,
filter(op => op.kind === 'teardown' && op.key === operation.key)
);
···
);
const forward$ = pipe(
-sharedOps$,
+operations$,
filter(operation => {
return operation.kind !== 'query' && operation.kind !== 'mutation';
}),
+2 -4
exchanges/persisted/src/persistedExchange.ts
···
merge,
mergeMap,
pipe,
-share,
} from 'wonka';
import {
···
return operations$ => {
const retries = makeSubject<Operation>();
-const sharedOps$ = share(operations$);
const forwardedOps$ = pipe(
-sharedOps$,
+operations$,
filter(operation => !operationFilter(operation))
);
const persistedOps$ = pipe(
-sharedOps$,
+operations$,
filter(operationFilter),
map(async operation => {
const persistedOperation = makeOperation(operation.kind, operation, {
+6 -18
exchanges/retry/src/retryExchange.ts
···
import {
-Source,
makeSubject,
-share,
pipe,
merge,
filter,
···
takeUntil,
} from 'wonka';
-import {
-  makeOperation,
-  Exchange,
-  Operation,
-  CombinedError,
-  OperationResult,
-} from '@urql/core';
+import { makeOperation, Exchange, Operation, CombinedError } from '@urql/core';
/** Input parameters for the {@link retryExchange}. */
export interface RetryExchangeOptions {
···
options.randomDelay != null ? !!options.randomDelay : true;
return ({ forward, dispatchDebug }) =>
-ops$ => {
-const sharedOps$ = pipe(ops$, share);
+operations$ => {
const { source: retry$, next: nextRetryOperation } =
makeSubject<Operation>();
···
// But if this event comes through regularly we also stop the retries, since it's
// basically the query retrying itself, no backoff should be added!
const teardown$ = pipe(
-sharedOps$,
+operations$,
filter(op => {
return (
(op.kind === 'query' || op.kind === 'teardown') &&
···
})
);
-const result$ = pipe(
-merge([sharedOps$, retryWithBackoff$]),
+return pipe(
+merge([operations$, retryWithBackoff$]),
forward,
-share,
filter(res => {
// Only retry if the error passes the conditional retryIf function (if passed)
// or if the error contains a networkError
···
return true;
})
-) as Source<OperationResult>;
-
-return result$;
+);
};
};
+4 -6
packages/core/src/exchanges/cache.ts
···
/* eslint-disable @typescript-eslint/no-use-before-define */
-import { filter, map, merge, pipe, share, tap } from 'wonka';
+import { filter, map, merge, pipe, tap } from 'wonka';
import { Client } from '../client';
import { Exchange, Operation, OperationResult } from '../types';
···
resultCache.has(operation.key));
return ops$ => {
-const sharedOps$ = share(ops$);
-
const cachedOps$ = pipe(
-sharedOps$,
+ops$,
filter(op => !shouldSkip(op) && isOperationCached(op)),
map(operation => {
const cachedResult = resultCache.get(operation.key);
···
const forwardedOps$ = pipe(
merge([
pipe(
-sharedOps$,
+ops$,
filter(op => !shouldSkip(op) && !isOperationCached(op)),
map(mapTypeNames)
),
pipe(
-sharedOps$,
+ops$,
filter(op => shouldSkip(op))
),
]),
+29 -15
packages/core/src/exchanges/compose.ts
···
+import { share } from 'wonka';
import type { ExchangeIO, Exchange, ExchangeInput } from '../types';
/** Composes an array of Exchanges into a single one.
···
*
* This simply merges all exchanges into one and is used by the {@link Client}
* to merge the `exchanges` option it receives.
+ *
+ * @throws
+ * In development, if {@link ExchangeInput.forward} is called repeatedly
+ * by an {@link Exchange} an error is thrown, since `forward()` must only
+ * be called once per `Exchange`.
*/
export const composeExchanges =
(exchanges: Exchange[]): Exchange =>
({ client, forward, dispatchDebug }: ExchangeInput): ExchangeIO =>
-  exchanges.reduceRight(
-    (forward, exchange) =>
-      exchange({
-        client,
-        forward,
-        dispatchDebug(event) {
-          dispatchDebug({
-            timestamp: Date.now(),
-            source: exchange.name,
-            ...event,
-          });
-        },
-      }),
-    forward
-  );
+  exchanges.reduceRight((forward, exchange) => {
+    let forwarded = false;
+    return exchange({
+      client,
+      forward(operations$) {
+        if (process.env.NODE_ENV !== 'production') {
+          if (forwarded)
+            throw new Error(
+              'forward() must only be called once in each Exchange.'
+            );
+          forwarded = true;
+        }
+        return share(forward(share(operations$)));
+      },
+      dispatchDebug(event) {
+        dispatchDebug({
+          timestamp: Date.now(),
+          source: exchange.name,
+          ...event,
+        });
+      },
+    });
+  }, forward);
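With the `forwarded` guard above, calling `forward()` more than once inside a single exchange now fails fast in development. A hypothetical exchange that would trigger the error, shown only for illustration:

```ts
import { pipe, filter, merge } from 'wonka';
import { Exchange } from '@urql/core';

// This exchange calls forward() once per filtered stream. With the guard in
// composeExchanges, the second forward() call throws during client setup in
// development builds.
const brokenExchange: Exchange =
  ({ forward }) =>
  operations$ =>
    merge([
      forward(pipe(operations$, filter(op => op.kind === 'query'))),
      forward(pipe(operations$, filter(op => op.kind !== 'query'))), // throws in dev
    ]);
```

Merging the filtered streams first and calling `forward()` exactly once, as the docs example above shows, avoids the error.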
+4 -5
packages/core/src/exchanges/fetch.ts
···
/* eslint-disable @typescript-eslint/no-use-before-define */
-import { filter, merge, mergeMap, pipe, share, takeUntil, onPush } from 'wonka';
+import { filter, merge, mergeMap, pipe, takeUntil, onPush } from 'wonka';
import { Exchange } from '../types';
import {
···
*/
export const fetchExchange: Exchange = ({ forward, dispatchDebug }) => {
return ops$ => {
-const sharedOps$ = share(ops$);
const fetchResults$ = pipe(
-sharedOps$,
+ops$,
filter(operation => {
return operation.kind === 'query' || operation.kind === 'mutation';
}),
···
makeFetchSource(operation, url, fetchOptions),
takeUntil(
pipe(
-sharedOps$,
+ops$,
filter(op => op.kind === 'teardown' && op.key === operation.key)
)
)
···
);
const forward$ = pipe(
-sharedOps$,
+ops$,
filter(operation => {
return operation.kind !== 'query' && operation.kind !== 'mutation';
}),
+3 -5
packages/core/src/exchanges/ssr.ts
···
import { GraphQLError } from 'graphql';
-import { pipe, share, filter, merge, map, tap } from 'wonka';
+import { pipe, filter, merge, map, tap } from 'wonka';
import { Exchange, OperationResult, Operation } from '../types';
import { addMetadata, CombinedError } from '../utils';
import { reexecuteOperation } from './cache';
···
? !!params.isClient
: !client.suspense;
-const sharedOps$ = share(ops$);
-
let forwardedOps$ = pipe(
-sharedOps$,
+ops$,
filter(
operation =>
!data[operation.key] ||
···
// NOTE: Since below we might delete the cached entry after accessing
// it once, cachedOps$ needs to be merged after forwardedOps$
let cachedOps$ = pipe(
-sharedOps$,
+ops$,
filter(
operation =>
!!data[operation.key] &&
+3 -5
packages/core/src/exchanges/subscription.ts
···
merge,
mergeMap,
pipe,
-share,
Subscription,
Source,
takeUntil,
···
});
return ops$ => {
-const sharedOps$ = share(ops$);
const subscriptionResults$ = pipe(
-sharedOps$,
+ops$,
filter(isSubscriptionOperationFn),
mergeMap(operation => {
const { key } = operation;
const teardown$ = pipe(
-sharedOps$,
+ops$,
filter(op => op.kind === 'teardown' && op.key === key)
);
···
);
const forward$ = pipe(
-sharedOps$,
+ops$,
filter(op => !isSubscriptionOperationFn(op)),
forward
);
+15
packages/core/src/types.ts
···
*
* @see {@link https://urql.dev/goto/docs/architecture/#the-client-and-exchanges} for more information on Exchanges.
* @see {@link https://urql.dev/goto/docs/advanced/authoring-exchanges} on how Exchanges are authored.
+ *
+ * @example
+ * ```ts
+ * import { pipe, onPush } from 'wonka';
+ * import { Exchange } from '@urql/core';
+ *
+ * const debugExchange: Exchange = ({ forward }) => {
+ *   return ops$ => pipe(
+ *     ops$,
+ *     onPush(operation => console.log(operation)),
+ *     forward,
+ *     onPush(result => console.log(result)),
+ *   );
+ * };
+ * ```
*/
export type Exchange = (input: ExchangeInput) => ExchangeIO;
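For context, `composeExchanges` is what the `Client` uses to merge its `exchanges` option, so an `Exchange` written against this type receives an already-shared operations stream automatically. A small usage sketch (the endpoint URL is a placeholder):

```ts
import { Client, cacheExchange, fetchExchange } from '@urql/core';

// The Client passes this array through composeExchanges internally, so each
// exchange's ExchangeIO gets shared input and output streams without extra work.
const client = new Client({
  url: 'https://example.com/graphql', // placeholder endpoint
  exchanges: [cacheExchange, fetchExchange],
});
```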