From d2c48073f2592de2370916dd0c60509e48168e33 Mon Sep 17 00:00:00 2001 From: gobbimar Date: Mon, 4 Aug 2025 12:04:16 +0200 Subject: [PATCH 01/10] feat(query-core): add custom reducer support to streamedQuery Replace maxChunks parameter with flexible reducer function that delegates data aggregation to consumer code. This provides full control over how streamed chunks are combined into the final data structure. Add support for custom placeholderData that works seamlessly with the reducer function, allowing initialization of complex data types beyond simple arrays. https://github.com/TanStack/query/discussions/9065 BREAKING CHANGE: The maxChunks parameter has been removed from streamedQuery. Use a custom reducer function to control data aggregation behavior instead. --- docs/reference/streamedQuery.md | 12 +- .../src/__tests__/streamedQuery.test.tsx | 116 ++++++------------ packages/query-core/src/streamedQuery.ts | 47 +++---- 3 files changed, 67 insertions(+), 108 deletions(-) diff --git a/docs/reference/streamedQuery.md b/docs/reference/streamedQuery.md index c02cc907ba..25e4fe80c2 100644 --- a/docs/reference/streamedQuery.md +++ b/docs/reference/streamedQuery.md @@ -33,9 +33,11 @@ const query = queryOptions({ - When set to `'reset'`, the query will erase all data and go back into `pending` state. - When set to `'append'`, data will be appended to existing data. - When set to `'replace'`, all data will be written to the cache once the stream ends. -- `maxChunks?: number` +- `reducer?: (accumulator: TData, chunk: TQueryFnData) => TData` - Optional - - The maximum number of chunks to keep in the cache. - - Defaults to `undefined`, meaning all chunks will be kept. - - If `undefined` or `0`, the number of chunks is unlimited. - - If the number of chunks exceeds this number, the oldest chunk will be removed. + - A function to reduce the streamed chunks into the final data. + - Defaults to a function that appends chunks to the end of the array. +- `placeholderData?: TData = TQueryFnData` + - Optional + - Defines the initial data to be used while the first chunk is being fetched. + - Defaults to an empty array. diff --git a/packages/query-core/src/__tests__/streamedQuery.test.tsx b/packages/query-core/src/__tests__/streamedQuery.test.tsx index c2cebb46e2..33370bbf90 100644 --- a/packages/query-core/src/__tests__/streamedQuery.test.tsx +++ b/packages/query-core/src/__tests__/streamedQuery.test.tsx @@ -35,7 +35,7 @@ describe('streamedQuery', () => { queryKey: key, queryFn: streamedQuery({ queryFn: () => createAsyncNumberGenerator(3), - }), + }) }) const unsubscribe = observer.subscribe(vi.fn()) @@ -350,14 +350,18 @@ describe('streamedQuery', () => { unsubscribe() }) - test('should support maxChunks', async () => { + test('should support custom reducer', async () => { const key = queryKey() + const observer = new QueryObserver(queryClient, { queryKey: key, - queryFn: streamedQuery({ - queryFn: () => createAsyncNumberGenerator(3), - maxChunks: 2, - }), + queryFn: streamedQuery>({ + queryFn: () => createAsyncNumberGenerator(2), + reducer: (acc, chunk) => ({ + ...acc, + [chunk]: true + }), + }) }) const unsubscribe = observer.subscribe(vi.fn()) @@ -368,42 +372,36 @@ describe('streamedQuery', () => { data: undefined, }) - await vi.advanceTimersByTimeAsync(50) - - expect(observer.getCurrentResult()).toMatchObject({ - status: 'success', - fetchStatus: 'fetching', - data: [0], - }) - - await vi.advanceTimersByTimeAsync(50) - - expect(observer.getCurrentResult()).toMatchObject({ - status: 'success', - fetchStatus: 'fetching', - data: [0, 1], - }) - - await vi.advanceTimersByTimeAsync(50) + await vi.advanceTimersByTimeAsync(100) expect(observer.getCurrentResult()).toMatchObject({ status: 'success', fetchStatus: 'idle', - data: [1, 2], + data: { + 0: true, + 1: true + }, }) unsubscribe() }) - test('maxChunks with append refetch', async () => { + test('should support custom reducer with placeholderData', async () => { const key = queryKey() const observer = new QueryObserver(queryClient, { queryKey: key, - queryFn: streamedQuery({ - queryFn: () => createAsyncNumberGenerator(3), - maxChunks: 2, - refetchMode: 'append', + queryFn: streamedQuery>({ + queryFn: () => createAsyncNumberGenerator(2), + reducer: (acc, chunk) => ({ + ...acc, + [chunk]: true + }), + placeholderData: { + 10: true, + 11: true, + } }), + }) const unsubscribe = observer.subscribe(vi.fn()) @@ -414,64 +412,20 @@ describe('streamedQuery', () => { data: undefined, }) - await vi.advanceTimersByTimeAsync(50) - - expect(observer.getCurrentResult()).toMatchObject({ - status: 'success', - fetchStatus: 'fetching', - data: [0], - }) - - await vi.advanceTimersByTimeAsync(50) - - expect(observer.getCurrentResult()).toMatchObject({ - status: 'success', - fetchStatus: 'fetching', - data: [0, 1], - }) - - await vi.advanceTimersByTimeAsync(50) - - expect(observer.getCurrentResult()).toMatchObject({ - status: 'success', - fetchStatus: 'idle', - data: [1, 2], - }) - - void observer.refetch() - - await vi.advanceTimersByTimeAsync(10) - - expect(observer.getCurrentResult()).toMatchObject({ - status: 'success', - fetchStatus: 'fetching', - data: [1, 2], - }) - - await vi.advanceTimersByTimeAsync(40) - - expect(observer.getCurrentResult()).toMatchObject({ - status: 'success', - fetchStatus: 'fetching', - data: [2, 0], - }) - - await vi.advanceTimersByTimeAsync(50) - - expect(observer.getCurrentResult()).toMatchObject({ - status: 'success', - fetchStatus: 'fetching', - data: [0, 1], - }) - - await vi.advanceTimersByTimeAsync(50) + await vi.advanceTimersByTimeAsync(100) expect(observer.getCurrentResult()).toMatchObject({ status: 'success', fetchStatus: 'idle', - data: [1, 2], + data: { + 10: true, + 11: true, + 0: true, + 1: true, + }, }) unsubscribe() }) + }) diff --git a/packages/query-core/src/streamedQuery.ts b/packages/query-core/src/streamedQuery.ts index 03c36bf23a..d0d3b22a2c 100644 --- a/packages/query-core/src/streamedQuery.ts +++ b/packages/query-core/src/streamedQuery.ts @@ -1,6 +1,15 @@ import { addToEnd } from './utils' import type { QueryFunction, QueryFunctionContext, QueryKey } from './types' +type StreamedQueryParams = { + queryFn: ( + context: QueryFunctionContext, + ) => AsyncIterable | Promise> + refetchMode?: 'append' | 'reset' | 'replace' + reducer?: (acc: TData, chunk: TQueryFnData) => TData + placeholderData?: TData +} + /** * This is a helper function to create a query function that streams data from an AsyncIterable. * Data will be an Array of all the chunks received. @@ -11,31 +20,26 @@ import type { QueryFunction, QueryFunctionContext, QueryKey } from './types' * Defaults to `'reset'`, erases all data and puts the query back into `pending` state. * Set to `'append'` to append new data to the existing data. * Set to `'replace'` to write all data to the cache once the stream ends. - * @param maxChunks - The maximum number of chunks to keep in the cache. - * Defaults to `undefined`, meaning all chunks will be kept. - * If `undefined` or `0`, the number of chunks is unlimited. - * If the number of chunks exceeds this number, the oldest chunk will be removed. + * @param reducer - A function to reduce the streamed chunks into the final data. + * Defaults to a function that appends chunks to the end of the array. + * @param placeholderData - Initial data to be used while the first chunk is being fetched. + * Defaults to an empty array. */ export function streamedQuery< TQueryFnData = unknown, + TData = Array, TQueryKey extends QueryKey = QueryKey, >({ queryFn, refetchMode = 'reset', - maxChunks, -}: { - queryFn: ( - context: QueryFunctionContext, - ) => AsyncIterable | Promise> - refetchMode?: 'append' | 'reset' | 'replace' - maxChunks?: number -}): QueryFunction, TQueryKey> { + reducer = (items, chunk) => addToEnd((items ?? []) as Array, chunk) as TData, + placeholderData = [] as TData, +}: StreamedQueryParams): QueryFunction { return async (context) => { const query = context.client .getQueryCache() .find({ queryKey: context.queryKey, exact: true }) const isRefetch = !!query && query.state.data !== undefined - if (isRefetch && refetchMode === 'reset') { query.setState({ status: 'pending', @@ -45,29 +49,28 @@ export function streamedQuery< }) } - let result: Array = [] + let result = placeholderData; + const stream = await queryFn(context) for await (const chunk of stream) { if (context.signal.aborted) { break } - + // don't append to the cache directly when replace-refetching - if (!isRefetch || refetchMode !== 'replace') { - context.client.setQueryData>( + if (!isRefetch || refetchMode !== 'replace') { + context.client.setQueryData( context.queryKey, - (prev = []) => { - return addToEnd(prev, chunk, maxChunks) - }, + (prev) => reducer(prev ?? placeholderData, chunk) ) } - result = addToEnd(result, chunk, maxChunks) + result = reducer(result, chunk) } // finalize result: replace-refetching needs to write to the cache if (isRefetch && refetchMode === 'replace' && !context.signal.aborted) { - context.client.setQueryData>(context.queryKey, result) + context.client.setQueryData(context.queryKey, result) } return context.client.getQueryData(context.queryKey)! From 890e3735cf9d00a915e1bfc75a18d37249c4a83e Mon Sep 17 00:00:00 2001 From: "autofix-ci[bot]" <114827586+autofix-ci[bot]@users.noreply.github.com> Date: Mon, 11 Aug 2025 12:31:58 +0000 Subject: [PATCH 02/10] ci: apply automated fixes --- .../src/__tests__/streamedQuery.test.tsx | 24 +++++++++---------- packages/query-core/src/streamedQuery.ts | 19 ++++++++------- 2 files changed, 22 insertions(+), 21 deletions(-) diff --git a/packages/query-core/src/__tests__/streamedQuery.test.tsx b/packages/query-core/src/__tests__/streamedQuery.test.tsx index 33370bbf90..321329d004 100644 --- a/packages/query-core/src/__tests__/streamedQuery.test.tsx +++ b/packages/query-core/src/__tests__/streamedQuery.test.tsx @@ -35,7 +35,7 @@ describe('streamedQuery', () => { queryKey: key, queryFn: streamedQuery({ queryFn: () => createAsyncNumberGenerator(3), - }) + }), }) const unsubscribe = observer.subscribe(vi.fn()) @@ -358,10 +358,10 @@ describe('streamedQuery', () => { queryFn: streamedQuery>({ queryFn: () => createAsyncNumberGenerator(2), reducer: (acc, chunk) => ({ - ...acc, - [chunk]: true - }), - }) + ...acc, + [chunk]: true, + }), + }), }) const unsubscribe = observer.subscribe(vi.fn()) @@ -378,8 +378,8 @@ describe('streamedQuery', () => { status: 'success', fetchStatus: 'idle', data: { - 0: true, - 1: true + 0: true, + 1: true, }, }) @@ -394,14 +394,13 @@ describe('streamedQuery', () => { queryFn: () => createAsyncNumberGenerator(2), reducer: (acc, chunk) => ({ ...acc, - [chunk]: true + [chunk]: true, }), placeholderData: { - 10: true, - 11: true, - } + 10: true, + 11: true, + }, }), - }) const unsubscribe = observer.subscribe(vi.fn()) @@ -427,5 +426,4 @@ describe('streamedQuery', () => { unsubscribe() }) - }) diff --git a/packages/query-core/src/streamedQuery.ts b/packages/query-core/src/streamedQuery.ts index d0d3b22a2c..30cf868ea6 100644 --- a/packages/query-core/src/streamedQuery.ts +++ b/packages/query-core/src/streamedQuery.ts @@ -32,9 +32,13 @@ export function streamedQuery< >({ queryFn, refetchMode = 'reset', - reducer = (items, chunk) => addToEnd((items ?? []) as Array, chunk) as TData, + reducer = (items, chunk) => + addToEnd((items ?? []) as Array, chunk) as TData, placeholderData = [] as TData, -}: StreamedQueryParams): QueryFunction { +}: StreamedQueryParams): QueryFunction< + TData, + TQueryKey +> { return async (context) => { const query = context.client .getQueryCache() @@ -49,7 +53,7 @@ export function streamedQuery< }) } - let result = placeholderData; + let result = placeholderData const stream = await queryFn(context) @@ -57,12 +61,11 @@ export function streamedQuery< if (context.signal.aborted) { break } - + // don't append to the cache directly when replace-refetching - if (!isRefetch || refetchMode !== 'replace') { - context.client.setQueryData( - context.queryKey, - (prev) => reducer(prev ?? placeholderData, chunk) + if (!isRefetch || refetchMode !== 'replace') { + context.client.setQueryData(context.queryKey, (prev) => + reducer(prev ?? placeholderData, chunk), ) } result = reducer(result, chunk) From 0a4e06501a56285afe0e934fc21cb00057d30346 Mon Sep 17 00:00:00 2001 From: gobbimar Date: Wed, 27 Aug 2025 23:52:28 +0200 Subject: [PATCH 03/10] feat(query-core): require initialValue when using custom reducer in streamedQuery Add type safety by making initialValue mandatory when providing a custom reducer function. This prevents runtime errors and ensures proper data initialization for custom data structures beyond simple arrays. Use conditional types to enforce the relationship between reducer and initialValue parameters, maintaining backward compatibility for simple array-based streaming while requiring explicit initialization for custom reducers. BREAKING CHANGE: When using a custom reducer function with streamedQuery, the initialValue parameter is now required and must be provided. --- .../src/__tests__/streamedQuery.test.tsx | 5 +- packages/query-core/src/streamedQuery.ts | 48 +++++++++++++------ 2 files changed, 36 insertions(+), 17 deletions(-) diff --git a/packages/query-core/src/__tests__/streamedQuery.test.tsx b/packages/query-core/src/__tests__/streamedQuery.test.tsx index 321329d004..c72a4a0689 100644 --- a/packages/query-core/src/__tests__/streamedQuery.test.tsx +++ b/packages/query-core/src/__tests__/streamedQuery.test.tsx @@ -361,6 +361,7 @@ describe('streamedQuery', () => { ...acc, [chunk]: true, }), + initialValue: {} }), }) @@ -386,7 +387,7 @@ describe('streamedQuery', () => { unsubscribe() }) - test('should support custom reducer with placeholderData', async () => { + test('should support custom reducer with initialValue', async () => { const key = queryKey() const observer = new QueryObserver(queryClient, { queryKey: key, @@ -396,7 +397,7 @@ describe('streamedQuery', () => { ...acc, [chunk]: true, }), - placeholderData: { + initialValue: { 10: true, 11: true, }, diff --git a/packages/query-core/src/streamedQuery.ts b/packages/query-core/src/streamedQuery.ts index 30cf868ea6..48ccf70be1 100644 --- a/packages/query-core/src/streamedQuery.ts +++ b/packages/query-core/src/streamedQuery.ts @@ -1,15 +1,28 @@ import { addToEnd } from './utils' import type { QueryFunction, QueryFunctionContext, QueryKey } from './types' -type StreamedQueryParams = { - queryFn: ( +type BaseStreamedQueryParams = { + queryFn: ( context: QueryFunctionContext, ) => AsyncIterable | Promise> refetchMode?: 'append' | 'reset' | 'replace' - reducer?: (acc: TData, chunk: TQueryFnData) => TData - placeholderData?: TData } +type SimpleStreamedQueryParams = BaseStreamedQueryParams & { + reducer?: never; + initialValue?: never; +} + +type ReducibleStreamedQueryParams = BaseStreamedQueryParams & { + reducer: (acc: TData, chunk: TQueryFnData) => TData + initialValue: TData +} + +type StreamedQueryParams = + | SimpleStreamedQueryParams + | ReducibleStreamedQueryParams + + /** * This is a helper function to create a query function that streams data from an AsyncIterable. * Data will be an Array of all the chunks received. @@ -22,23 +35,28 @@ type StreamedQueryParams = { * Set to `'replace'` to write all data to the cache once the stream ends. * @param reducer - A function to reduce the streamed chunks into the final data. * Defaults to a function that appends chunks to the end of the array. - * @param placeholderData - Initial data to be used while the first chunk is being fetched. - * Defaults to an empty array. + * @param initialValue - Initial value to be used while the first chunk is being fetched. */ export function streamedQuery< TQueryFnData = unknown, TData = Array, TQueryKey extends QueryKey = QueryKey, ->({ - queryFn, - refetchMode = 'reset', - reducer = (items, chunk) => - addToEnd((items ?? []) as Array, chunk) as TData, - placeholderData = [] as TData, -}: StreamedQueryParams): QueryFunction< +>(params: StreamedQueryParams): QueryFunction< TData, TQueryKey > { + let reducer; + let initialValue; + const {refetchMode='reset', queryFn} = params; + + if('reducer' in params && typeof params.reducer === 'function'){ + reducer=params.reducer; + initialValue=params.initialValue; + }else{ + initialValue=[] as TData; + reducer=(items: TData, chunk: TQueryFnData) => addToEnd(items as Array, chunk) as TData; + } + return async (context) => { const query = context.client .getQueryCache() @@ -53,7 +71,7 @@ export function streamedQuery< }) } - let result = placeholderData + let result = initialValue const stream = await queryFn(context) @@ -65,7 +83,7 @@ export function streamedQuery< // don't append to the cache directly when replace-refetching if (!isRefetch || refetchMode !== 'replace') { context.client.setQueryData(context.queryKey, (prev) => - reducer(prev ?? placeholderData, chunk), + reducer(prev ?? initialValue, chunk), ) } result = reducer(result, chunk) From d46d48870cc34f96b77743d699819d94e4d9481d Mon Sep 17 00:00:00 2001 From: gobbimar Date: Thu, 28 Aug 2025 00:00:01 +0200 Subject: [PATCH 04/10] feat(query-core): require initialValue when using custom reducer in streamedQuery Add type safety by making initialValue mandatory when providing a custom reducer function. This prevents runtime errors and ensures proper data initialization for custom data structures beyond simple arrays. Use conditional types to enforce the relationship between reducer and initialValue parameters, maintaining backward compatibility for simple array-based streaming while requiring explicit initialization for custom reducers. BREAKING CHANGE: When using a custom reducer function with streamedQuery, the initialValue parameter is now required and must be provided. --- .vscode/query.code-workspace | 42 ++++++++++++++++++++++++++++++++++++ 1 file changed, 42 insertions(+) create mode 100644 .vscode/query.code-workspace diff --git a/.vscode/query.code-workspace b/.vscode/query.code-workspace new file mode 100644 index 0000000000..64b922a522 --- /dev/null +++ b/.vscode/query.code-workspace @@ -0,0 +1,42 @@ +{ + "folders": [ + { + "path": "../" + } + ], + "settings": { + "terminal.integrated.profiles.osx": { "zsh": { "path": "/bin/zsh", "args": ["-l", "-i"] } }, + "css.validate": false, + "editor.codeActionsOnSave": { + "source.fixAll": "explicit" + }, + "eslint.format.enable": true, + "eslint.enable": true, + "files.autoSave": "afterDelay", + "files.eol": "\n", + "javascript.validate.enable": false, + "json.validate.enable": true, + "less.validate": false, + "scss.validate": false, + "scss.completion.completePropertyWithSemicolon": true, + "scss.completion.triggerPropertyValueCompletion": true, + "stylelint.validate": [ + "css", + "scss", + "less" + ], + + "window.zoomLevel": 2, + "typescript.format.enable": false, + "typescript.preferences.includePackageJsonAutoImports": "on", + "typescript.validate.enable": false, + "javascript.updateImportsOnFileMove.enabled": "prompt", + "javascript.suggest.autoImports": true, + "typescript.suggest.autoImports": true, + "typescript.updateImportsOnFileMove.enabled": "always", + "typescript.disableAutomaticTypeAcquisition": true, + "typescript.tsdk": "node_modules/typescript/lib", + "vitest.maximumConfigs":15 + + } +} \ No newline at end of file From 42b9908c19eaa5977d23dfaa8382ea424601a0e4 Mon Sep 17 00:00:00 2001 From: gobbimar Date: Thu, 28 Aug 2025 00:13:31 +0200 Subject: [PATCH 05/10] feat(query-core): require initialValue when using custom reducer in streamedQuery Add type safety by making initialValue mandatory when providing a custom reducer function. This prevents runtime errors and ensures proper data initialization for custom data structures beyond simple arrays. Use conditional types to enforce the relationship between reducer and initialValue parameters, maintaining backward compatibility for simple array-based streaming while requiring explicit initialization for custom reducers. BREAKING CHANGE: When using a custom reducer function with streamedQuery, the initialValue parameter is now required and must be provided. --- packages/query-core/src/streamedQuery.ts | 18 ++++++------------ 1 file changed, 6 insertions(+), 12 deletions(-) diff --git a/packages/query-core/src/streamedQuery.ts b/packages/query-core/src/streamedQuery.ts index 48ccf70be1..e3f6602fe5 100644 --- a/packages/query-core/src/streamedQuery.ts +++ b/packages/query-core/src/streamedQuery.ts @@ -41,22 +41,16 @@ export function streamedQuery< TQueryFnData = unknown, TData = Array, TQueryKey extends QueryKey = QueryKey, ->(params: StreamedQueryParams): QueryFunction< +>({ + queryFn, + refetchMode = 'reset', + reducer = (items, chunk) => addToEnd(items as Array, chunk) as TData, + initialValue = [] as TData, +}: StreamedQueryParams): QueryFunction< TData, TQueryKey > { - let reducer; - let initialValue; - const {refetchMode='reset', queryFn} = params; - if('reducer' in params && typeof params.reducer === 'function'){ - reducer=params.reducer; - initialValue=params.initialValue; - }else{ - initialValue=[] as TData; - reducer=(items: TData, chunk: TQueryFnData) => addToEnd(items as Array, chunk) as TData; - } - return async (context) => { const query = context.client .getQueryCache() From 14b6241377500a19ed8c1bec3a44a51236c2d552 Mon Sep 17 00:00:00 2001 From: gobbimar Date: Thu, 28 Aug 2025 00:20:38 +0200 Subject: [PATCH 06/10] removed personal vscode workspace file --- .vscode/query.code-workspace | 42 ------------------------------------ 1 file changed, 42 deletions(-) delete mode 100644 .vscode/query.code-workspace diff --git a/.vscode/query.code-workspace b/.vscode/query.code-workspace deleted file mode 100644 index 64b922a522..0000000000 --- a/.vscode/query.code-workspace +++ /dev/null @@ -1,42 +0,0 @@ -{ - "folders": [ - { - "path": "../" - } - ], - "settings": { - "terminal.integrated.profiles.osx": { "zsh": { "path": "/bin/zsh", "args": ["-l", "-i"] } }, - "css.validate": false, - "editor.codeActionsOnSave": { - "source.fixAll": "explicit" - }, - "eslint.format.enable": true, - "eslint.enable": true, - "files.autoSave": "afterDelay", - "files.eol": "\n", - "javascript.validate.enable": false, - "json.validate.enable": true, - "less.validate": false, - "scss.validate": false, - "scss.completion.completePropertyWithSemicolon": true, - "scss.completion.triggerPropertyValueCompletion": true, - "stylelint.validate": [ - "css", - "scss", - "less" - ], - - "window.zoomLevel": 2, - "typescript.format.enable": false, - "typescript.preferences.includePackageJsonAutoImports": "on", - "typescript.validate.enable": false, - "javascript.updateImportsOnFileMove.enabled": "prompt", - "javascript.suggest.autoImports": true, - "typescript.suggest.autoImports": true, - "typescript.updateImportsOnFileMove.enabled": "always", - "typescript.disableAutomaticTypeAcquisition": true, - "typescript.tsdk": "node_modules/typescript/lib", - "vitest.maximumConfigs":15 - - } -} \ No newline at end of file From 5f68a6299cdad798d2455f9dfee8605b945a40d7 Mon Sep 17 00:00:00 2001 From: gobbimar Date: Thu, 28 Aug 2025 00:24:14 +0200 Subject: [PATCH 07/10] updated documentation --- docs/reference/streamedQuery.md | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/docs/reference/streamedQuery.md b/docs/reference/streamedQuery.md index 25e4fe80c2..c7aac47486 100644 --- a/docs/reference/streamedQuery.md +++ b/docs/reference/streamedQuery.md @@ -37,7 +37,8 @@ const query = queryOptions({ - Optional - A function to reduce the streamed chunks into the final data. - Defaults to a function that appends chunks to the end of the array. -- `placeholderData?: TData = TQueryFnData` +- `initialValue?: TData = TQueryFnData` - Optional - - Defines the initial data to be used while the first chunk is being fetched. + - Defines the initial data to be used while the first chunk is being fetched. + - It is mandatory when custom `reducer` is provided. - Defaults to an empty array. From da93cc774e11a4ce41c6df0c63b33f0eb7ec7d73 Mon Sep 17 00:00:00 2001 From: gobbimar Date: Wed, 3 Sep 2025 11:59:58 +0200 Subject: [PATCH 08/10] fix(docs): clarify reducer function description in streamedQuery documentation --- docs/reference/streamedQuery.md | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/docs/reference/streamedQuery.md b/docs/reference/streamedQuery.md index c7aac47486..3faa94e5e3 100644 --- a/docs/reference/streamedQuery.md +++ b/docs/reference/streamedQuery.md @@ -35,8 +35,9 @@ const query = queryOptions({ - When set to `'replace'`, all data will be written to the cache once the stream ends. - `reducer?: (accumulator: TData, chunk: TQueryFnData) => TData` - Optional - - A function to reduce the streamed chunks into the final data. - - Defaults to a function that appends chunks to the end of the array. + - Reduces streamed chunks (`TQueryFnData`) into the final data shape (`TData`). + - Default: appends each chunk to the end of the accumulator when `TData` is an array. + - If `TData` is not an array, you must provide a custom `reducer`. - `initialValue?: TData = TQueryFnData` - Optional - Defines the initial data to be used while the first chunk is being fetched. From e3a54ea80207de4bde24ac0b203699ecb9282a9e Mon Sep 17 00:00:00 2001 From: "autofix-ci[bot]" <114827586+autofix-ci[bot]@users.noreply.github.com> Date: Wed, 3 Sep 2025 10:55:03 +0000 Subject: [PATCH 09/10] ci: apply automated fixes --- docs/reference/streamedQuery.md | 2 +- .../src/__tests__/streamedQuery.test.tsx | 2 +- packages/query-core/src/streamedQuery.ts | 24 ++++++++++++------- 3 files changed, 17 insertions(+), 11 deletions(-) diff --git a/docs/reference/streamedQuery.md b/docs/reference/streamedQuery.md index 3faa94e5e3..d6ea7963af 100644 --- a/docs/reference/streamedQuery.md +++ b/docs/reference/streamedQuery.md @@ -40,6 +40,6 @@ const query = queryOptions({ - If `TData` is not an array, you must provide a custom `reducer`. - `initialValue?: TData = TQueryFnData` - Optional - - Defines the initial data to be used while the first chunk is being fetched. + - Defines the initial data to be used while the first chunk is being fetched. - It is mandatory when custom `reducer` is provided. - Defaults to an empty array. diff --git a/packages/query-core/src/__tests__/streamedQuery.test.tsx b/packages/query-core/src/__tests__/streamedQuery.test.tsx index de4ea77c08..385a74145b 100644 --- a/packages/query-core/src/__tests__/streamedQuery.test.tsx +++ b/packages/query-core/src/__tests__/streamedQuery.test.tsx @@ -402,7 +402,7 @@ describe('streamedQuery', () => { ...acc, [chunk]: true, }), - initialValue: {} + initialValue: {}, }), }) diff --git a/packages/query-core/src/streamedQuery.ts b/packages/query-core/src/streamedQuery.ts index e3f6602fe5..b8dafaf9c5 100644 --- a/packages/query-core/src/streamedQuery.ts +++ b/packages/query-core/src/streamedQuery.ts @@ -1,19 +1,26 @@ import { addToEnd } from './utils' import type { QueryFunction, QueryFunctionContext, QueryKey } from './types' -type BaseStreamedQueryParams = { - queryFn: ( +type BaseStreamedQueryParams = { + queryFn: ( context: QueryFunctionContext, ) => AsyncIterable | Promise> refetchMode?: 'append' | 'reset' | 'replace' } -type SimpleStreamedQueryParams = BaseStreamedQueryParams & { - reducer?: never; - initialValue?: never; +type SimpleStreamedQueryParams< + TQueryFnData, + TQueryKey extends QueryKey, +> = BaseStreamedQueryParams & { + reducer?: never + initialValue?: never } -type ReducibleStreamedQueryParams = BaseStreamedQueryParams & { +type ReducibleStreamedQueryParams< + TQueryFnData, + TData, + TQueryKey extends QueryKey, +> = BaseStreamedQueryParams & { reducer: (acc: TData, chunk: TQueryFnData) => TData initialValue: TData } @@ -22,7 +29,6 @@ type StreamedQueryParams = | SimpleStreamedQueryParams | ReducibleStreamedQueryParams - /** * This is a helper function to create a query function that streams data from an AsyncIterable. * Data will be an Array of all the chunks received. @@ -44,13 +50,13 @@ export function streamedQuery< >({ queryFn, refetchMode = 'reset', - reducer = (items, chunk) => addToEnd(items as Array, chunk) as TData, + reducer = (items, chunk) => + addToEnd(items as Array, chunk) as TData, initialValue = [] as TData, }: StreamedQueryParams): QueryFunction< TData, TQueryKey > { - return async (context) => { const query = context.client .getQueryCache() From ade50e9ee62b8a80229934337df0c9d20029fdc6 Mon Sep 17 00:00:00 2001 From: gobbimar Date: Wed, 3 Sep 2025 14:05:00 +0200 Subject: [PATCH 10/10] fix(tests): Code Review :: update streamedQuery tests to use correct initialValue type --- packages/query-core/src/__tests__/streamedQuery.test.tsx | 8 ++++---- packages/query-core/src/streamedQuery.ts | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/packages/query-core/src/__tests__/streamedQuery.test.tsx b/packages/query-core/src/__tests__/streamedQuery.test.tsx index de4ea77c08..d542935fea 100644 --- a/packages/query-core/src/__tests__/streamedQuery.test.tsx +++ b/packages/query-core/src/__tests__/streamedQuery.test.tsx @@ -396,13 +396,13 @@ describe('streamedQuery', () => { const observer = new QueryObserver(queryClient, { queryKey: key, - queryFn: streamedQuery>({ + queryFn: streamedQuery({ queryFn: () => createAsyncNumberGenerator(2), reducer: (acc, chunk) => ({ ...acc, [chunk]: true, }), - initialValue: {} + initialValue: {} as Record, }), }) @@ -432,7 +432,7 @@ describe('streamedQuery', () => { const key = queryKey() const observer = new QueryObserver(queryClient, { queryKey: key, - queryFn: streamedQuery>({ + queryFn: streamedQuery({ queryFn: () => createAsyncNumberGenerator(2), reducer: (acc, chunk) => ({ ...acc, @@ -441,7 +441,7 @@ describe('streamedQuery', () => { initialValue: { 10: true, 11: true, - }, + } as Record, }), }) diff --git a/packages/query-core/src/streamedQuery.ts b/packages/query-core/src/streamedQuery.ts index e3f6602fe5..37ec9231db 100644 --- a/packages/query-core/src/streamedQuery.ts +++ b/packages/query-core/src/streamedQuery.ts @@ -77,7 +77,7 @@ export function streamedQuery< // don't append to the cache directly when replace-refetching if (!isRefetch || refetchMode !== 'replace') { context.client.setQueryData(context.queryKey, (prev) => - reducer(prev ?? initialValue, chunk), + reducer(prev === undefined ? initialValue : prev, chunk), ) } result = reducer(result, chunk)