Skip to content

Performance: ClientProxy.emit() defer() pattern prevents connection reuse in custom client-proxy #15587

@oscarwest

Description

@oscarwest

Is there an existing issue that is already proposing this?

  • I have searched the existing issues

NestJS version

latest

Is your performance suggestion related to a problem? Please describe it

🚀 Fix ServiceBus Connection Performance Issue

Problem

ServiceBus client was creating new connections for every message due to NestJS's RxJS defer() pattern in ClientProxy.emit(). Each subscription (whether firstValueFrom(), .subscribe(), etc.) triggered a fresh dispatchEvent() execution, causing redundant ServiceBusClient creation.

Root Cause

https://github.com/nestjs/nest/blob/master/packages/microservices/client/client-proxy.ts#L111

// NestJS ClientProxy.emit() implementation:
const source = defer(() => this.dispatchEvent({ pattern, data }));
//             ^^^^^ Executes fresh for every subscription

Understanding the Issue (Please Correct If Wrong)

Forgive me if I'm misunderstanding how this works, but from what I can observe, the RxJS defer() operator in NestJS's ClientProxy.emit() creates a fresh execution context for every subscription. This means that even though we have a single ServiceBusClientProxy instance, each call to emit a message results in a completely new dispatchEvent() execution with no memory of previous connections.

The issue isn't with how we're calling the client, but rather that the reactive pattern itself prevents connection state from persisting between individual message emissions, even when using the same client proxy instance.

Solution Implementation

Our custom transport extends the base NestJS ClientProxy class:

export class ServiceBusClientProxy extends ClientProxy {
  protected readonly logger = new Logger(`${ServiceBusClientProxy.name}@${version}`);
  protected serviceBusClient?: ServiceBusClient; // ← The connection we need to reuse

  constructor(protected readonly options: ClientOptions) {
    super(); // ← Inherits emit() method that calls dispatchEvent()
    // ... initialization
  }

  // connect() was being called every ServiceBusClientProxy.client.emit() call 
  public async connect(): Promise<void> {
    if (this.serviceBusClient) {
      this.logger.debug('Service Bus client already exists, reusing connection');
      return; // ← Prevents redundant client creation
    }
    
    this.logger.debug('Connecting to Service Bus...');
    this.serviceBusClient = await this.createClient();
    this.logger.debug('Service Bus client created successfully');
  }
}

It seems that the connect() method was being called every time emit was being called. Further context:

@Controller()
export class AppController {
  constructor(
    @Inject('SB_CLIENT') private readonly client: ServiceBusClientProxy,
  ) {}

  @Post('publish')
  async publishMessages(@Body() payload: any) {
      const outgoingMessage = OutgoingCloudEvent.create<TestEvent>({
        data: {
          message: payload.message || `Test message ${i + 1}`,
        },
        options: {
          eventType: payload.eventType || 'TestEvent',
          correlationId:
            payload.correlationId || `correlation-${Date.now()}-${i}`,
        },
      });

      await firstValueFrom(
        this.client.emit(
          { topic: process.env.SB_TOPIC || 'sb-loadtest-send' },
          outgoingMessage,
        ),
      );

      publishedMessages.push({
        messageId: outgoingMessage.options.contextId,
        correlationId: outgoingMessage.options.correlationId,
      });
    }

    return {
      success: true,
      publishedCount: count,
      messages: publishedMessages,
    };
  }

Describe the performance enhancement you are proposing and how we can try it out

Make the observable connection-aware or cache the client somehow?

Benchmarks result or another proof (eg: POC)

Metadata

Metadata

Assignees

No one assigned

    Type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions