diff --git a/docs/reference/streamedQuery.md b/docs/reference/streamedQuery.md
index c02cc907ba..d6ea7963af 100644
--- a/docs/reference/streamedQuery.md
+++ b/docs/reference/streamedQuery.md
@@ -33,9 +33,13 @@ const query = queryOptions({
   - When set to `'reset'`, the query will erase all data and go back into `pending` state.
   - When set to `'append'`, data will be appended to existing data.
   - When set to `'replace'`, all data will be written to the cache once the stream ends.
-- `maxChunks?: number`
+- `reducer?: (accumulator: TData, chunk: TQueryFnData) => TData`
   - Optional
-  - The maximum number of chunks to keep in the cache.
-  - Defaults to `undefined`, meaning all chunks will be kept.
-  - If `undefined` or `0`, the number of chunks is unlimited.
-  - If the number of chunks exceeds this number, the oldest chunk will be removed.
+  - Reduces streamed chunks (`TQueryFnData`) into the final data shape (`TData`).
+  - Default: appends each chunk to the end of the accumulator when `TData` is an array.
+  - If `TData` is not an array, you must provide a custom `reducer`.
+- `initialValue?: TData`
+  - Optional
+  - Defines the initial data to be used while the first chunk is being fetched.
+  - It is mandatory when a custom `reducer` is provided.
+  - Defaults to an empty array otherwise.
diff --git a/packages/query-core/src/__tests__/streamedQuery.test.tsx b/packages/query-core/src/__tests__/streamedQuery.test.tsx
index 6eccf4536c..d542935fea 100644
--- a/packages/query-core/src/__tests__/streamedQuery.test.tsx
+++ b/packages/query-core/src/__tests__/streamedQuery.test.tsx
@@ -391,13 +391,18 @@ describe('streamedQuery', () => {
     })
   })
 
-  test('should support maxChunks', async () => {
+  test('should support custom reducer', async () => {
     const key = queryKey()
+
     const observer = new QueryObserver(queryClient, {
       queryKey: key,
       queryFn: streamedQuery({
-        queryFn: () => createAsyncNumberGenerator(3),
-        maxChunks: 2,
+        queryFn: () => createAsyncNumberGenerator(2),
+        reducer: (acc, chunk) => ({
+          ...acc,
+          [chunk]: true,
+        }),
+        initialValue: {} as Record<number, boolean>,
       }),
     })
 
@@ -409,41 +414,34 @@
     expect(observer.getCurrentResult()).toMatchObject({
       status: 'pending',
       fetchStatus: 'fetching',
       data: undefined,
     })
 
-    await vi.advanceTimersByTimeAsync(50)
-
-    expect(observer.getCurrentResult()).toMatchObject({
-      status: 'success',
-      fetchStatus: 'fetching',
-      data: [0],
-    })
-
-    await vi.advanceTimersByTimeAsync(50)
-
-    expect(observer.getCurrentResult()).toMatchObject({
-      status: 'success',
-      fetchStatus: 'fetching',
-      data: [0, 1],
-    })
-
-    await vi.advanceTimersByTimeAsync(50)
+    await vi.advanceTimersByTimeAsync(100)
 
     expect(observer.getCurrentResult()).toMatchObject({
       status: 'success',
       fetchStatus: 'idle',
-      data: [1, 2],
+      data: {
+        0: true,
+        1: true,
+      },
     })
 
     unsubscribe()
   })
 
-  test('maxChunks with append refetch', async () => {
+  test('should support custom reducer with initialValue', async () => {
     const key = queryKey()
     const observer = new QueryObserver(queryClient, {
       queryKey: key,
       queryFn: streamedQuery({
-        queryFn: () => createAsyncNumberGenerator(3),
-        maxChunks: 2,
-        refetchMode: 'append',
+        queryFn: () => createAsyncNumberGenerator(2),
+        reducer: (acc, chunk) => ({
+          ...acc,
+          [chunk]: true,
+        }),
+        initialValue: {
+          10: true,
+          11: true,
+        } as Record<number, boolean>,
      }),
     })
 
@@ -455,62 +453,17 @@
     expect(observer.getCurrentResult()).toMatchObject({
       status: 'pending',
       fetchStatus: 'fetching',
       data: undefined,
     })
 
-    await vi.advanceTimersByTimeAsync(50)
-
-    expect(observer.getCurrentResult()).toMatchObject({
-      status: 'success',
-      fetchStatus: 'fetching',
-      data: [0],
-    })
-
-    await vi.advanceTimersByTimeAsync(50)
-
-    expect(observer.getCurrentResult()).toMatchObject({
-      status: 'success',
-      fetchStatus: 'fetching',
-      data: [0, 1],
-    })
-
-    await vi.advanceTimersByTimeAsync(50)
-
-    expect(observer.getCurrentResult()).toMatchObject({
-      status: 'success',
-      fetchStatus: 'idle',
-      data: [1, 2],
-    })
-
-    void observer.refetch()
-
-    await vi.advanceTimersByTimeAsync(10)
-
-    expect(observer.getCurrentResult()).toMatchObject({
-      status: 'success',
-      fetchStatus: 'fetching',
-      data: [1, 2],
-    })
-
-    await vi.advanceTimersByTimeAsync(40)
-
-    expect(observer.getCurrentResult()).toMatchObject({
-      status: 'success',
-      fetchStatus: 'fetching',
-      data: [2, 0],
-    })
-
-    await vi.advanceTimersByTimeAsync(50)
-
-    expect(observer.getCurrentResult()).toMatchObject({
-      status: 'success',
-      fetchStatus: 'fetching',
-      data: [0, 1],
-    })
-
-    await vi.advanceTimersByTimeAsync(50)
+    await vi.advanceTimersByTimeAsync(100)
 
     expect(observer.getCurrentResult()).toMatchObject({
       status: 'success',
       fetchStatus: 'idle',
-      data: [1, 2],
+      data: {
+        10: true,
+        11: true,
+        0: true,
+        1: true,
+      },
     })
 
     unsubscribe()
diff --git a/packages/query-core/src/streamedQuery.ts b/packages/query-core/src/streamedQuery.ts
index 03c36bf23a..d50063c0be 100644
--- a/packages/query-core/src/streamedQuery.ts
+++ b/packages/query-core/src/streamedQuery.ts
@@ -1,6 +1,34 @@
 import { addToEnd } from './utils'
 import type { QueryFunction, QueryFunctionContext, QueryKey } from './types'
 
+type BaseStreamedQueryParams<TQueryFnData, TQueryKey extends QueryKey> = {
+  queryFn: (
+    context: QueryFunctionContext<TQueryKey>,
+  ) => AsyncIterable<TQueryFnData> | Promise<AsyncIterable<TQueryFnData>>
+  refetchMode?: 'append' | 'reset' | 'replace'
+}
+
+type SimpleStreamedQueryParams<
+  TQueryFnData,
+  TQueryKey extends QueryKey,
+> = BaseStreamedQueryParams<TQueryFnData, TQueryKey> & {
+  reducer?: never
+  initialValue?: never
+}
+
+type ReducibleStreamedQueryParams<
+  TQueryFnData,
+  TData,
+  TQueryKey extends QueryKey,
+> = BaseStreamedQueryParams<TQueryFnData, TQueryKey> & {
+  reducer: (acc: TData, chunk: TQueryFnData) => TData
+  initialValue: TData
+}
+
+type StreamedQueryParams<TQueryFnData, TData, TQueryKey extends QueryKey> =
+  | SimpleStreamedQueryParams<TQueryFnData, TQueryKey>
+  | ReducibleStreamedQueryParams<TQueryFnData, TData, TQueryKey>
+
 /**
  * This is a helper function to create a query function that streams data from an AsyncIterable.
  * Data will be an Array of all the chunks received.
@@ -11,31 +39,29 @@ import type { QueryFunction, QueryFunctionContext, QueryKey } from './types'
  * Defaults to `'reset'`, erases all data and puts the query back into `pending` state.
  * Set to `'append'` to append new data to the existing data.
  * Set to `'replace'` to write all data to the cache once the stream ends.
- * @param maxChunks - The maximum number of chunks to keep in the cache.
- * Defaults to `undefined`, meaning all chunks will be kept.
- * If `undefined` or `0`, the number of chunks is unlimited.
- * If the number of chunks exceeds this number, the oldest chunk will be removed.
+ * @param reducer - A function to reduce the streamed chunks into the final data.
+ * Defaults to a function that appends chunks to the end of the array.
+ * @param initialValue - Initial value to be used while the first chunk is being fetched.
  */
 export function streamedQuery<
   TQueryFnData = unknown,
+  TData = Array<TQueryFnData>,
   TQueryKey extends QueryKey = QueryKey,
 >({
   queryFn,
   refetchMode = 'reset',
-  maxChunks,
-}: {
-  queryFn: (
-    context: QueryFunctionContext<TQueryKey>,
-  ) => AsyncIterable<TQueryFnData> | Promise<AsyncIterable<TQueryFnData>>
-  refetchMode?: 'append' | 'reset' | 'replace'
-  maxChunks?: number
-}): QueryFunction<Array<TQueryFnData>, TQueryKey> {
+  reducer = (items, chunk) =>
+    addToEnd(items as Array<TQueryFnData>, chunk) as TData,
+  initialValue = [] as TData,
+}: StreamedQueryParams<TQueryFnData, TData, TQueryKey>): QueryFunction<
+  TData,
+  TQueryKey
+> {
   return async (context) => {
     const query = context.client
       .getQueryCache()
       .find({ queryKey: context.queryKey, exact: true })
     const isRefetch = !!query && query.state.data !== undefined
-
     if (isRefetch && refetchMode === 'reset') {
       query.setState({
         status: 'pending',
@@ -45,7 +71,8 @@ export function streamedQuery<
       })
     }
 
-    let result: Array<TQueryFnData> = []
+    let result = initialValue
+
     const stream = await queryFn(context)
 
     for await (const chunk of stream) {
@@ -55,19 +82,16 @@
 
       // don't append to the cache directly when replace-refetching
       if (!isRefetch || refetchMode !== 'replace') {
-        context.client.setQueryData<Array<TQueryFnData>>(
-          context.queryKey,
-          (prev = []) => {
-            return addToEnd(prev, chunk, maxChunks)
-          },
+        context.client.setQueryData<TData>(context.queryKey, (prev) =>
+          reducer(prev === undefined ? initialValue : prev, chunk),
         )
       }
-      result = addToEnd(result, chunk, maxChunks)
+      result = reducer(result, chunk)
     }
 
     // finalize result: replace-refetching needs to write to the cache
    if (isRefetch && refetchMode === 'replace' && !context.signal.aborted) {
-      context.client.setQueryData<Array<TQueryFnData>>(context.queryKey, result)
+      context.client.setQueryData<TData>(context.queryKey, result)
     }
 
     return context.client.getQueryData(context.queryKey)!
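
---

Usage sketch for reviewers (not part of the patch): it exercises the two call shapes that the `StreamedQueryParams` union permits, the default array accumulation and a custom `reducer` paired with its mandatory `initialValue`. The `wordStream` generator and the `WordCounts` shape are hypothetical stand-ins, and the import assumes the `@tanstack/query-core` export; adapters may re-export the helper under a different name (e.g. `experimental_streamedQuery`).

```ts
import { QueryClient, streamedQuery } from '@tanstack/query-core'

type WordCounts = Record<string, number>

// Hypothetical chunk source; any AsyncIterable works.
async function* wordStream(): AsyncIterable<string> {
  yield 'alpha'
  yield 'beta'
  yield 'alpha'
}

const client = new QueryClient()

// 1) Default shape: TData is inferred as Array<string>,
//    each chunk is appended to the end of the cached array.
const arrayFn = streamedQuery({
  queryFn: () => wordStream(),
})

// 2) Custom shape: the reducer folds chunks into a count map,
//    so initialValue must be supplied alongside it.
const countsFn = streamedQuery({
  queryFn: () => wordStream(),
  reducer: (acc: WordCounts, chunk: string) => ({
    ...acc,
    [chunk]: (acc[chunk] ?? 0) + 1,
  }),
  initialValue: {} as WordCounts,
})

// Both results are plain QueryFunctions and slot in wherever one is expected:
void client.fetchQuery({ queryKey: ['words'], queryFn: arrayFn })
void client.fetchQuery({ queryKey: ['word-counts'], queryFn: countsFn })
```

Passing `reducer` without `initialValue` (or vice versa) is rejected at the type level: `SimpleStreamedQueryParams` marks both properties as `never`, while `ReducibleStreamedQueryParams` requires both, so the union admits no mixed call.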