-
-
Notifications
You must be signed in to change notification settings - Fork 8k
Description
Is there an existing issue that is already proposing this?
- I have searched the existing issues
NestJS version
latest
Is your performance suggestion related to a problem? Please describe it
🚀 Fix ServiceBus Connection Performance Issue
Problem
ServiceBus client was creating new connections for every message due to NestJS's RxJS defer()
pattern in ClientProxy.emit()
. Each subscription (whether firstValueFrom()
, .subscribe()
, etc.) triggered a fresh dispatchEvent()
execution, causing redundant ServiceBusClient creation.
Root Cause
https://github.com/nestjs/nest/blob/master/packages/microservices/client/client-proxy.ts#L111
// NestJS ClientProxy.emit() implementation:
const source = defer(() => this.dispatchEvent({ pattern, data }));
// ^^^^^ Executes fresh for every subscription
Understanding the Issue (Please Correct If Wrong)
Forgive me if I'm misunderstanding how this works, but from what I can observe, the RxJS defer()
operator in NestJS's ClientProxy.emit()
creates a fresh execution context for every subscription. This means that even though we have a single ServiceBusClientProxy
instance, each call to emit a message results in a completely new dispatchEvent()
execution with no memory of previous connections.
The issue isn't with how we're calling the client, but rather that the reactive pattern itself prevents connection state from persisting between individual message emissions, even when using the same client proxy instance.
Solution Implementation
Our custom transport extends the base NestJS ClientProxy
class:
export class ServiceBusClientProxy extends ClientProxy {
protected readonly logger = new Logger(`${ServiceBusClientProxy.name}@${version}`);
protected serviceBusClient?: ServiceBusClient; // ← The connection we need to reuse
constructor(protected readonly options: ClientOptions) {
super(); // ← Inherits emit() method that calls dispatchEvent()
// ... initialization
}
// connect() was being called every ServiceBusClientProxy.client.emit() call
public async connect(): Promise<void> {
if (this.serviceBusClient) {
this.logger.debug('Service Bus client already exists, reusing connection');
return; // ← Prevents redundant client creation
}
this.logger.debug('Connecting to Service Bus...');
this.serviceBusClient = await this.createClient();
this.logger.debug('Service Bus client created successfully');
}
}
It seems that the connect() method was being called every time emit was being called. Further context:
@Controller()
export class AppController {
constructor(
@Inject('SB_CLIENT') private readonly client: ServiceBusClientProxy,
) {}
@Post('publish')
async publishMessages(@Body() payload: any) {
const outgoingMessage = OutgoingCloudEvent.create<TestEvent>({
data: {
message: payload.message || `Test message ${i + 1}`,
},
options: {
eventType: payload.eventType || 'TestEvent',
correlationId:
payload.correlationId || `correlation-${Date.now()}-${i}`,
},
});
await firstValueFrom(
this.client.emit(
{ topic: process.env.SB_TOPIC || 'sb-loadtest-send' },
outgoingMessage,
),
);
publishedMessages.push({
messageId: outgoingMessage.options.contextId,
correlationId: outgoingMessage.options.correlationId,
});
}
return {
success: true,
publishedCount: count,
messages: publishedMessages,
};
}
Describe the performance enhancement you are proposing and how we can try it out
Make the observable connection-aware or cache the client somehow?