diff --git a/src/change_stream.ts b/src/change_stream.ts index ed847519e8..1076b2f66b 100644 --- a/src/change_stream.ts +++ b/src/change_stream.ts @@ -809,7 +809,11 @@ export class ChangeStream< while (true) { try { const change = await this.cursor.tryNext(); - return change ?? null; + if (!change) { + return null; + } + const processedChange = this._processChange(change); + return processedChange; } catch (error) { try { await this._processErrorIteratorMode(error, this.cursor.id != null); diff --git a/test/integration/change-streams/change_stream.test.ts b/test/integration/change-streams/change_stream.test.ts index b941109048..c6156ab954 100644 --- a/test/integration/change-streams/change_stream.test.ts +++ b/test/integration/change-streams/change_stream.test.ts @@ -370,31 +370,76 @@ describe('Change Streams', function () { } ); - it('should cache the change stream resume token using iterator form', { - metadata: { requires: { topology: 'replicaset' } }, + describe('cache the change stream resume token', () => { + describe('using iterator form', () => { + context('#next', () => { + it('caches the resume token on change', { + metadata: { requires: { topology: 'replicaset' } }, - async test() { - await initIteratorMode(changeStream); - collection.insertOne({ a: 1 }); + async test() { + await initIteratorMode(changeStream); + await collection.insertOne({ a: 1 }); - const hasNext = await changeStream.hasNext(); - expect(hasNext).to.be.true; + const change = await changeStream.next(); + expect(change).to.have.property('_id').that.deep.equals(changeStream.resumeToken); + } + }); - const change = await changeStream.next(); - expect(change).to.have.property('_id').that.deep.equals(changeStream.resumeToken); - } - }); + it('caches the resume token correctly when preceded by #hasNext', { + metadata: { requires: { topology: 'replicaset' } }, + async test() { + await initIteratorMode(changeStream); + await collection.insertOne({ a: 1 }); - it('should cache the change stream resume token using event listener form', { - metadata: { requires: { topology: 'replicaset' } }, - async test() { - const willBeChange = once(changeStream, 'change'); - await once(changeStream.cursor, 'init'); - collection.insertOne({ a: 1 }); + await changeStream.hasNext(); - const [change] = await willBeChange; - expect(change).to.have.property('_id').that.deep.equals(changeStream.resumeToken); - } + const change = await changeStream.next(); + expect(change).to.have.property('_id').that.deep.equals(changeStream.resumeToken); + } + }); + }); + + it('#tryNext', { + metadata: { requires: { topology: 'replicaset' } }, + + async test() { + await initIteratorMode(changeStream); + await collection.insertOne({ a: 1 }); + + const change = await changeStream.tryNext(); + expect(change).to.have.property('_id').that.deep.equals(changeStream.resumeToken); + } + }); + + context('#hasNext', () => { + it('does not cache the resume token', { + metadata: { requires: { topology: 'replicaset' } }, + async test() { + await initIteratorMode(changeStream); + const resumeToken = changeStream.resumeToken; + + await collection.insertOne({ a: 1 }); + + const hasNext = await changeStream.hasNext(); + expect(hasNext).to.be.true; + + expect(changeStream.resumeToken).to.equal(resumeToken); + } + }); + }); + }); + + it('should cache using event listener form', { + metadata: { requires: { topology: 'replicaset' } }, + async test() { + const willBeChange = once(changeStream, 'change'); + await once(changeStream.cursor, 'init'); + await collection.insertOne({ a: 1 }); + + const [change] = await willBeChange; + expect(change).to.have.property('_id').that.deep.equals(changeStream.resumeToken); + } + }); }); it('should error if resume token projected out of change stream document using iterator', { @@ -1816,6 +1861,144 @@ describe('Change Streams', function () { }); }); }); + + describe("NODE-4763 - doesn't produce duplicates after resume", function () { + let client: MongoClient; + let collection: Collection; + let changeStream: ChangeStream; + let aggregateEvents: CommandStartedEvent[] = []; + const resumableError = { code: 6, message: 'host unreachable' }; + + beforeEach(async function () { + const dbName = 'node-4763'; + const collectionName = 'test-collection'; + + client = this.configuration.newClient({ monitorCommands: true }); + client.on('commandStarted', filterForCommands(['aggregate'], aggregateEvents)); + collection = client.db(dbName).collection(collectionName); + + changeStream = collection.watch([]); + }); + + afterEach(async function () { + await client.db('admin').command({ + configureFailPoint: is4_2Server(this.configuration.version) + ? 'failCommand' + : 'failGetMoreAfterCursorCheckout', + mode: 'off' + } as FailCommandFailPoint); + + await changeStream.close(); + await client.close(); + aggregateEvents = []; + }); + + describe('when using iterator form', function () { + it('#next', { requires: { topology: 'replicaset' } }, async function test() { + await initIteratorMode(changeStream); + + await collection.insertOne({ a: 1 }); + const change = await changeStream.next(); + expect(change).to.containSubset({ + operationType: 'insert', + fullDocument: { a: 1 } + }); + + await client.db('admin').command({ + configureFailPoint: is4_2Server(this.configuration.version) + ? 'failCommand' + : 'failGetMoreAfterCursorCheckout', + mode: { times: 1 }, + data: { + failCommands: ['getMore'], + errorCode: resumableError.code, + errmsg: resumableError.message + } + } as FailCommandFailPoint); + + await collection.insertOne({ a: 2 }); + const change2 = await changeStream.next(); + expect(change2).to.containSubset({ + operationType: 'insert', + fullDocument: { a: 2 } + }); + + expect(aggregateEvents.length).to.equal(2); + }); + + it('#tryNext', { requires: { topology: 'replicaset' } }, async function test() { + await initIteratorMode(changeStream); + + await collection.insertOne({ a: 1 }); + const change = await changeStream.tryNext(); + expect(change).to.containSubset({ + operationType: 'insert', + fullDocument: { a: 1 } + }); + + await client.db('admin').command({ + configureFailPoint: is4_2Server(this.configuration.version) + ? 'failCommand' + : 'failGetMoreAfterCursorCheckout', + mode: { times: 1 }, + data: { + failCommands: ['getMore'], + errorCode: resumableError.code, + errmsg: resumableError.message + } + } as FailCommandFailPoint); + + await collection.insertOne({ a: 2 }); + const change2 = await changeStream.tryNext(); + expect(change2).to.containSubset({ + operationType: 'insert', + fullDocument: { a: 2 } + }); + + expect(aggregateEvents.length).to.equal(2); + }); + }); + + it('in an event listener form', { requires: { topology: 'replicaset' } }, async function () { + const willBeChange = on(changeStream, 'change'); + await once(changeStream.cursor, 'init'); + + await collection.insertOne({ a: 1 }); + const change = await willBeChange.next(); + expect(change.value[0]).to.containSubset({ + operationType: 'insert', + fullDocument: { a: 1 } + }); + + await client.db('admin').command({ + configureFailPoint: is4_2Server(this.configuration.version) + ? 'failCommand' + : 'failGetMoreAfterCursorCheckout', + mode: { times: 1 }, + data: { + failCommands: ['getMore'], + errorCode: resumableError.code, + errmsg: resumableError.message + } + } as FailCommandFailPoint); + + // There's an inherent race condition here because we need to make sure that the `aggregates` that succeed when + // resuming a change stream don't return the change event. + // So we defer the insert until a period of time after the change stream has received the first change. + // 2000ms is long enough for the change stream to attempt to resume and fail once before exhausting the failpoint + // and succeeding. + await sleep(2000); + await collection.insertOne({ a: 2 }); + + const change2 = await willBeChange.next(); + expect(change2.value[0]).to.containSubset({ + operationType: 'insert', + fullDocument: { a: 2 } + }); + + expect(aggregateEvents.length).to.equal(2); + }); + }); }); describe('ChangeStream resumability', function () {