Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
143 changes: 135 additions & 8 deletions src/handlers/handlerUtils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,8 @@ export async function tryPost(
requestHeaders: Record<string, string>,
fn: endpointStrings,
currentIndex: number | string,
method: string = 'POST'
method: string = 'POST',
abortSignal?: AbortSignal
): Promise<Response> {
const requestContext = new RequestContext(
c,
Expand All @@ -298,6 +299,9 @@ export async function tryPost(
method,
currentIndex as number
);
if (abortSignal) {
requestContext.setAbortSignal(abortSignal);
}
const hooksService = new HooksService(requestContext);
const providerContext = new ProviderContext(requestContext.provider);
const logsService = new LogsService(c);
Expand Down Expand Up @@ -472,7 +476,8 @@ export async function tryTargetsRecursively(
fn: endpointStrings,
method: string,
jsonPath: string,
inheritedConfig: Record<string, any> = {}
inheritedConfig: Record<string, any> = {},
abortSignal?: AbortSignal
): Promise<Response> {
const currentTarget: any = { ...targetGroup };
let currentJsonPath = jsonPath;
Expand Down Expand Up @@ -662,7 +667,8 @@ export async function tryTargetsRecursively(
fn,
method,
`${currentJsonPath}.targets[${originalIndex}]`,
currentInheritedConfig
currentInheritedConfig,
abortSignal
);
const codes = currentTarget.strategy?.onStatusCodes;
const gatewayException =
Expand Down Expand Up @@ -705,14 +711,131 @@ export async function tryTargetsRecursively(
fn,
method,
currentJsonPath,
currentInheritedConfig
currentInheritedConfig,
abortSignal
);
break;
}
randomWeight -= provider.weight;
}
break;

case StrategyModes.SAMPLE: {
const targets = currentTarget.targets || [];
const onStatusCodes = currentTarget.strategy?.onStatusCodes;
const cancelOthers = currentTarget.strategy?.cancelOthers;

// v1 limitation: do not support sampling when request body is a ReadableStream
if (request instanceof ReadableStream) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🐛 Bug Fix

Issue: Type confusion bug - request instanceof ReadableStream will always be false. The request parameter is a Request object, not a ReadableStream. This check should verify if the request body is a stream.
Fix: Check the request body type instead of the request object type
Impact: Prevents incorrect rejection of streaming requests, ensuring proper validation

Suggested change
if (request instanceof ReadableStream) {
// v1 limitation: do not support sampling when request body is a ReadableStream
if (request.body instanceof ReadableStream) {

response = new Response(
JSON.stringify({
status: 'failure',
message:
'Strategy "sample" does not support streaming request bodies in v1',
}),
{ status: 400, headers: { 'content-type': 'application/json' } }
);
break;
}

// Fire all requests in parallel; pick first-success
let winnerResolved = false;
let resolveWinner: (value: Response) => void = () => {};
const winnerPromise = new Promise<Response>((resolve) => {
resolveWinner = resolve;
});

const controllers: AbortController[] = [];
const pendingPromises: Array<
Promise<{ resp: Response; idx: number; abort: AbortController }>
> = targets.map((t: Targets, index: number) => {
const originalIndex = (t.originalIndex as number | undefined) ?? index;
const controller = new AbortController();
controllers.push(controller);
return tryTargetsRecursively(
c,
t,
request,
requestHeaders,
fn,
method,
`${currentJsonPath}.targets[${originalIndex}]`,
currentInheritedConfig,
controller.signal
).then((resp) => ({ resp, idx: originalIndex, abort: controller }));
});

// Resolve on first-success
for (const p of pendingPromises) {
p.then(({ resp, abort }) => {
if (winnerResolved) return;
const gatewayException =
resp?.headers.get('x-portkey-gateway-exception') === 'true';
const isSuccess =
(Array.isArray(onStatusCodes) &&
!onStatusCodes.includes(resp?.status)) ||
(!onStatusCodes && resp?.ok) ||
gatewayException;
Comment on lines +774 to +778
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🐛 Bug Fix

Issue: Logic error in success condition - the boolean OR operator | should be logical OR ||. This causes incorrect type coercion and wrong success evaluation.
Fix: Use logical OR operator for proper boolean evaluation
Impact: Fixes success detection logic to properly identify successful responses

Suggested change
const isSuccess =
(Array.isArray(onStatusCodes) &&
!onStatusCodes.includes(resp?.status)) ||
(!onStatusCodes && resp?.ok) ||
gatewayException;
const isSuccess =
(Array.isArray(onStatusCodes) &&
!onStatusCodes.includes(resp?.status)) ||
(!onStatusCodes && resp?.ok) ||
gatewayException;

if (isSuccess && !winnerResolved) {
winnerResolved = true;
resolveWinner(resp);
if (cancelOthers) {
for (const ctl of controllers) {
try {
ctl.abort();
} catch {}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Code Refactor

Issue: Empty catch block silently ignores abort errors, making debugging difficult
Fix: Add minimal error logging for abort operations
Impact: Improves debugging capability while maintaining non-blocking behavior

Suggested change
} catch {}
} catch (e) {
// Ignore abort errors but log for debugging
console.debug('Abort controller error:', e);
}

}
}
Comment on lines +780 to +788
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Code Refactor

Issue: Duplicate abort controller cleanup logic - same code block appears twice (lines 782-788 and 812-818)
Fix: Extract abort cleanup into a reusable function
Impact: Reduces code duplication and improves maintainability

Suggested change
winnerResolved = true;
resolveWinner(resp);
if (cancelOthers) {
for (const ctl of controllers) {
try {
ctl.abort();
} catch {}
}
}
const cleanupControllers = () => {
if (cancelOthers) {
for (const ctl of controllers) {
try {
ctl.abort();
} catch (e) {
console.debug('Abort controller error:', e);
}
}
}
};
winnerResolved = true;
resolveWinner(resp);
cleanupControllers();

}
}).catch(() => {
// Ignore individual errors; overall fallback handled below
});
}

// If none succeed, return the last completed response
(async () => {
const results = await Promise.allSettled(pendingPromises);
if (winnerResolved) return;
const fulfilled = results.filter(
(
r
): r is PromiseFulfilledResult<{
resp: Response;
idx: number;
abort: AbortController;
}> => r.status === 'fulfilled'
);
if (fulfilled.length) {
const { resp } = fulfilled[fulfilled.length - 1].value;
winnerResolved = true;
resolveWinner(resp);
if (cancelOthers) {
for (const ctl of controllers) {
try {
ctl.abort();
} catch {}
}
}
} else {
// If all rejected (shouldn't generally happen because tryTargetsRecursively guards), pick a generic 500
winnerResolved = true;
resolveWinner(
new Response(
JSON.stringify({
status: 'failure',
message: 'All sample targets failed',
}),
{ status: 500, headers: { 'content-type': 'application/json' } }
)
);
}
})();

response = await winnerPromise;
// Note: cancelOthers is a no-op for now; underlying fetch cancellation will be wired in a later update
break;
}

case StrategyModes.CONDITIONAL: {
let metadata: Record<string, string>;
try {
Expand Down Expand Up @@ -749,7 +872,8 @@ export async function tryTargetsRecursively(
fn,
method,
`${currentJsonPath}.targets[${originalIndex}]`,
currentInheritedConfig
currentInheritedConfig,
abortSignal
);
break;
}
Expand All @@ -764,7 +888,8 @@ export async function tryTargetsRecursively(
fn,
method,
`${currentJsonPath}.targets[${originalIndex}]`,
currentInheritedConfig
currentInheritedConfig,
abortSignal
);
break;

Expand All @@ -777,7 +902,8 @@ export async function tryTargetsRecursively(
requestHeaders,
fn,
currentJsonPath,
method
method,
abortSignal
);
if (isHandlingCircuitBreaker) {
await c.get('handleCircuitBreakerResponse')?.(
Expand Down Expand Up @@ -1165,7 +1291,8 @@ export async function recursiveAfterRequestHookHandler(
retry.onStatusCodes,
requestTimeout,
requestHandler,
retry.useRetryAfterHeader
retry.useRetryAfterHeader,
requestContext.abortSignal
));

// Check if sync hooks are available
Expand Down
22 changes: 18 additions & 4 deletions src/handlers/retryHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,18 @@ async function fetchWithTimeout(
url: string,
options: RequestInit,
timeout: number,
requestHandler?: () => Promise<Response>
requestHandler?: () => Promise<Response>,
externalAbortSignal?: AbortSignal
) {
const controller = new AbortController();
const timeoutId = setTimeout(() => controller.abort(), timeout);
if (externalAbortSignal) {
if (externalAbortSignal.aborted) {
controller.abort();
} else {
externalAbortSignal.addEventListener('abort', () => controller.abort());
}
}
Comment on lines +13 to +19
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🐛 Bug Fix

Issue: Race condition - external abort signal could be triggered between the aborted check and event listener registration
Fix: Register event listener first, then check abort status
Impact: Prevents missed abort signals and ensures reliable cancellation

Suggested change
if (externalAbortSignal) {
if (externalAbortSignal.aborted) {
controller.abort();
} else {
externalAbortSignal.addEventListener('abort', () => controller.abort());
}
}
if (externalAbortSignal) {
externalAbortSignal.addEventListener('abort', () => controller.abort());
if (externalAbortSignal.aborted) {
controller.abort();
}
}

const timeoutRequestOptions = {
...options,
signal: controller.signal,
Expand Down Expand Up @@ -69,7 +77,8 @@ export const retryRequest = async (
statusCodesToRetry: number[],
timeout: number | null,
requestHandler?: () => Promise<Response>,
followProviderRetry?: boolean
followProviderRetry?: boolean,
externalAbortSignal?: AbortSignal
): Promise<{
response: Response;
attempt: number | undefined;
Expand All @@ -93,12 +102,17 @@ export const retryRequest = async (
url,
options,
timeout,
requestHandler
requestHandler,
externalAbortSignal
);
} else if (requestHandler) {
response = await requestHandler();
} else {
response = await fetch(url, options);
const noTimeoutOptions = { ...options } as RequestInit;
if (externalAbortSignal) {
noTimeoutOptions.signal = externalAbortSignal;
}
response = await fetch(url, noTimeoutOptions);
}
if (statusCodesToRetry.includes(response.status)) {
const errorObj: any = new Error(await response.text());
Expand Down
9 changes: 9 additions & 0 deletions src/handlers/services/requestContext.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ export class RequestContext {
private _transformedRequestBody: any;
public readonly providerOption: Options;
private _requestURL: string = ''; // Is set at the beginning of tryPost()
private _externalAbortSignal: AbortSignal | undefined;

constructor(
public readonly honoContext: Context,
Expand All @@ -44,6 +45,14 @@ export class RequestContext {
this._requestURL = requestURL;
}

get abortSignal(): AbortSignal | undefined {
return this._externalAbortSignal;
}

setAbortSignal(signal: AbortSignal) {
this._externalAbortSignal = signal;
}

get overrideParams(): Params {
return this.providerOption?.overrideParams ?? {};
}
Expand Down
13 changes: 9 additions & 4 deletions src/middlewares/requestValidator/schema/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,20 @@ export const configSchema: any = z
.string()
.refine(
(value) =>
['single', 'loadbalance', 'fallback', 'conditional'].includes(
value
),
[
'single',
'loadbalance',
'fallback',
'conditional',
'sample',
].includes(value),
{
message:
"Invalid 'mode' value. Must be one of: single, loadbalance, fallback, conditional",
"Invalid 'mode' value. Must be one of: single, loadbalance, fallback, conditional, sample",
}
),
on_status_codes: z.array(z.number()).optional(),
cancel_others: z.boolean().optional(),
conditions: z
.array(
z.object({
Expand Down
2 changes: 2 additions & 0 deletions src/types/requestBody.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,13 @@ export enum StrategyModes {
FALLBACK = 'fallback',
SINGLE = 'single',
CONDITIONAL = 'conditional',
SAMPLE = 'sample',
}

interface Strategy {
mode: StrategyModes;
onStatusCodes?: Array<number>;
cancelOthers?: boolean;
conditions?: {
query: {
[key: string]: any;
Expand Down