From 4f8139e50c35ee5ddaee0fb4c1524172811fe367 Mon Sep 17 00:00:00 2001 From: Sergey Zelenov Date: Wed, 27 Aug 2025 15:15:08 +0200 Subject: [PATCH 01/10] test(NODE-4763): add tests for resumeToken caching mechanism --- .../change-streams/change_stream.test.ts | 61 ++++++++++++------- 1 file changed, 40 insertions(+), 21 deletions(-) diff --git a/test/integration/change-streams/change_stream.test.ts b/test/integration/change-streams/change_stream.test.ts index b941109048..9f26f7f71c 100644 --- a/test/integration/change-streams/change_stream.test.ts +++ b/test/integration/change-streams/change_stream.test.ts @@ -45,7 +45,7 @@ const pipeline = [ { $addFields: { comment: 'The documentKey field has been projected out of this document.' } } ]; -describe('Change Streams', function () { +describe.only('Change Streams', function () { let client: MongoClient; let collection: Collection; let changeStream: ChangeStream; @@ -370,31 +370,50 @@ describe('Change Streams', function () { } ); - it('should cache the change stream resume token using iterator form', { - metadata: { requires: { topology: 'replicaset' } }, + describe('cache the change stream resume token', () => { + describe('using iterator form', () => { + it('#next', { + metadata: { requires: { topology: 'replicaset' } }, - async test() { - await initIteratorMode(changeStream); - collection.insertOne({ a: 1 }); + async test() { + await initIteratorMode(changeStream); + collection.insertOne({ a: 1 }); - const hasNext = await changeStream.hasNext(); - expect(hasNext).to.be.true; + const hasNext = await changeStream.hasNext(); + expect(hasNext).to.be.true; - const change = await changeStream.next(); - expect(change).to.have.property('_id').that.deep.equals(changeStream.resumeToken); - } - }); + const change = await changeStream.next(); + expect(change).to.have.property('_id').that.deep.equals(changeStream.resumeToken); + } + }); - it('should cache the change stream resume token using event listener form', { - metadata: { requires: { topology: 'replicaset' } }, - async test() { - const willBeChange = once(changeStream, 'change'); - await once(changeStream.cursor, 'init'); - collection.insertOne({ a: 1 }); + it('#tryNext', { + metadata: { requires: { topology: 'replicaset' } }, - const [change] = await willBeChange; - expect(change).to.have.property('_id').that.deep.equals(changeStream.resumeToken); - } + async test() { + await initIteratorMode(changeStream); + collection.insertOne({ a: 1 }); + + const hasNext = await changeStream.hasNext(); + expect(hasNext).to.be.true; + + const change = await changeStream.tryNext(); + expect(change).to.have.property('_id').that.deep.equals(changeStream.resumeToken); + } + }); + }); + + it('should cache using event listener form', { + metadata: { requires: { topology: 'replicaset' } }, + async test() { + const willBeChange = once(changeStream, 'change'); + await once(changeStream.cursor, 'init'); + collection.insertOne({ a: 1 }); + + const [change] = await willBeChange; + expect(change).to.have.property('_id').that.deep.equals(changeStream.resumeToken); + } + }); }); it('should error if resume token projected out of change stream document using iterator', { From 00b8a2861e6c75145e15bf24efb01b321501e6c6 Mon Sep 17 00:00:00 2001 From: Sergey Zelenov Date: Fri, 29 Aug 2025 10:39:34 +0200 Subject: [PATCH 02/10] fix(NODE-4763): cache resume token in ChangeStream#tryNext() method --- src/change_stream.ts | 6 +- .../change-streams/change_stream.test.ts | 86 ++++++++++++++++--- test/tools/utils.ts | 7 +- 3 files changed, 87 insertions(+), 12 deletions(-) diff --git a/src/change_stream.ts b/src/change_stream.ts index ed847519e8..1076b2f66b 100644 --- a/src/change_stream.ts +++ b/src/change_stream.ts @@ -809,7 +809,11 @@ export class ChangeStream< while (true) { try { const change = await this.cursor.tryNext(); - return change ?? null; + if (!change) { + return null; + } + const processedChange = this._processChange(change); + return processedChange; } catch (error) { try { await this._processErrorIteratorMode(error, this.cursor.id != null); diff --git a/test/integration/change-streams/change_stream.test.ts b/test/integration/change-streams/change_stream.test.ts index 9f26f7f71c..890c073faf 100644 --- a/test/integration/change-streams/change_stream.test.ts +++ b/test/integration/change-streams/change_stream.test.ts @@ -45,7 +45,7 @@ const pipeline = [ { $addFields: { comment: 'The documentKey field has been projected out of this document.' } } ]; -describe.only('Change Streams', function () { +describe('Change Streams', function () { let client: MongoClient; let collection: Collection; let changeStream: ChangeStream; @@ -377,10 +377,7 @@ describe.only('Change Streams', function () { async test() { await initIteratorMode(changeStream); - collection.insertOne({ a: 1 }); - - const hasNext = await changeStream.hasNext(); - expect(hasNext).to.be.true; + await collection.insertOne({ a: 1 }); const change = await changeStream.next(); expect(change).to.have.property('_id').that.deep.equals(changeStream.resumeToken); @@ -392,10 +389,7 @@ describe.only('Change Streams', function () { async test() { await initIteratorMode(changeStream); - collection.insertOne({ a: 1 }); - - const hasNext = await changeStream.hasNext(); - expect(hasNext).to.be.true; + await collection.insertOne({ a: 1 }); const change = await changeStream.tryNext(); expect(change).to.have.property('_id').that.deep.equals(changeStream.resumeToken); @@ -408,7 +402,7 @@ describe.only('Change Streams', function () { async test() { const willBeChange = once(changeStream, 'change'); await once(changeStream.cursor, 'init'); - collection.insertOne({ a: 1 }); + await collection.insertOne({ a: 1 }); const [change] = await willBeChange; expect(change).to.have.property('_id').that.deep.equals(changeStream.resumeToken); @@ -1835,6 +1829,78 @@ describe.only('Change Streams', function () { }); }); }); + + describe("NODE-4763 - doesn't produce duplicates after resume", function () { + const resumableError = { code: 6, message: 'host unreachable' }; + + beforeEach(async function () { + await client.db('admin').command({ + configureFailPoint: is4_2Server(this.configuration.version) + ? 'failCommand' + : 'failGetMoreAfterCursorCheckout', + mode: { skip: 1 }, + data: { + failCommands: ['getMore'], + errorCode: resumableError.code, + errmsg: resumableError.message + } + } as FailPoint); + }); + + afterEach(async function () { + await client.db('admin').command({ + configureFailPoint: is4_2Server(this.configuration.version) + ? 'failCommand' + : 'failGetMoreAfterCursorCheckout', + mode: 'off' + } as FailPoint); + }); + + describe('when using iterator form', function () { + it('#next', { requires: { topology: 'replicaset' } }, async function test() { + await initIteratorMode(changeStream); + + await collection.insertOne({ a: 1 }); + const change = await changeStream.next(); + expect(change).to.have.property('operationType', 'insert'); + expect(change).to.have.nested.property('fullDocument.a', 1); + + await collection.insertOne({ a: 2 }); + const change2 = await changeStream.next(); + expect(change2).to.have.property('operationType', 'insert'); + expect(change2).to.have.nested.property('fullDocument.a', 2); + }); + + it('#tryNext', { requires: { topology: 'replicaset' } }, async function test() { + await initIteratorMode(changeStream); + + await collection.insertOne({ a: 1 }); + const change = await changeStream.tryNext(); + expect(change).to.have.property('operationType', 'insert'); + expect(change).to.have.nested.property('fullDocument.a', 1); + + await collection.insertOne({ a: 2 }); + const change2 = await changeStream.tryNext(); + expect(change2).to.have.property('operationType', 'insert'); + expect(change2).to.have.nested.property('fullDocument.a', 2); + }); + }); + + it('in an event listener form', { requires: { topology: 'replicaset' } }, async function () { + const willBeChange = on(changeStream, 'change'); + await once(changeStream.cursor, 'init'); + + await collection.insertOne({ a: 1 }); + const change = await willBeChange.next(); + expect(change.value[0]).to.have.property('operationType', 'insert'); + expect(change.value[0]).to.have.nested.property('fullDocument.a', 1); + + await collection.insertOne({ a: 2 }); + const change2 = await willBeChange.next(); + expect(change2.value[0]).to.have.property('operationType', 'insert'); + expect(change2.value[0]).to.have.nested.property('fullDocument.a', 2); + }); + }); }); describe('ChangeStream resumability', function () { diff --git a/test/tools/utils.ts b/test/tools/utils.ts index 79d918a689..bc33efacde 100644 --- a/test/tools/utils.ts +++ b/test/tools/utils.ts @@ -207,7 +207,12 @@ export function extractAuthFromConnectionString(connectionString: string | any[] export interface FailPoint { configureFailPoint: 'failCommand' | 'failGetMoreAfterCursorCheckout' | 'maxTimeNeverTimeOut'; - mode: { activationProbability: number } | { times: number } | 'alwaysOn' | 'off'; + mode: + | { activationProbability: number } + | { times: number } + | { skip: number } + | 'alwaysOn' + | 'off'; } export interface FailCommandFailPoint extends FailPoint { From 35979ffe095e9f3bd214e657515726f610e84934 Mon Sep 17 00:00:00 2001 From: Sergey Zelenov Date: Mon, 1 Sep 2025 12:08:07 +0200 Subject: [PATCH 03/10] test(NODE-4763): incorporated suggested changes in assertions --- .../change-streams/change_stream.test.ts | 36 ++++++++++++------- 1 file changed, 24 insertions(+), 12 deletions(-) diff --git a/test/integration/change-streams/change_stream.test.ts b/test/integration/change-streams/change_stream.test.ts index 890c073faf..1e9a253022 100644 --- a/test/integration/change-streams/change_stream.test.ts +++ b/test/integration/change-streams/change_stream.test.ts @@ -1862,13 +1862,17 @@ describe('Change Streams', function () { await collection.insertOne({ a: 1 }); const change = await changeStream.next(); - expect(change).to.have.property('operationType', 'insert'); - expect(change).to.have.nested.property('fullDocument.a', 1); + expect(change).to.containSubset({ + operationType: 'insert', + fullDocument: { a: 1 } + }); await collection.insertOne({ a: 2 }); const change2 = await changeStream.next(); - expect(change2).to.have.property('operationType', 'insert'); - expect(change2).to.have.nested.property('fullDocument.a', 2); + expect(change2).to.containSubset({ + operationType: 'insert', + fullDocument: { a: 2 } + }); }); it('#tryNext', { requires: { topology: 'replicaset' } }, async function test() { @@ -1876,13 +1880,17 @@ describe('Change Streams', function () { await collection.insertOne({ a: 1 }); const change = await changeStream.tryNext(); - expect(change).to.have.property('operationType', 'insert'); - expect(change).to.have.nested.property('fullDocument.a', 1); + expect(change).to.containSubset({ + operationType: 'insert', + fullDocument: { a: 1 } + }); await collection.insertOne({ a: 2 }); const change2 = await changeStream.tryNext(); - expect(change2).to.have.property('operationType', 'insert'); - expect(change2).to.have.nested.property('fullDocument.a', 2); + expect(change2).to.containSubset({ + operationType: 'insert', + fullDocument: { a: 2 } + }); }); }); @@ -1892,13 +1900,17 @@ describe('Change Streams', function () { await collection.insertOne({ a: 1 }); const change = await willBeChange.next(); - expect(change.value[0]).to.have.property('operationType', 'insert'); - expect(change.value[0]).to.have.nested.property('fullDocument.a', 1); + expect(change.value[0]).to.containSubset({ + operationType: 'insert', + fullDocument: { a: 1 } + }); await collection.insertOne({ a: 2 }); const change2 = await willBeChange.next(); - expect(change2.value[0]).to.have.property('operationType', 'insert'); - expect(change2.value[0]).to.have.nested.property('fullDocument.a', 2); + expect(change2.value[0]).to.containSubset({ + operationType: 'insert', + fullDocument: { a: 2 } + }); }); }); }); From cb9a88626f7379ba7bd834f98ccd61beb440d31b Mon Sep 17 00:00:00 2001 From: Sergey Zelenov Date: Tue, 2 Sep 2025 18:10:09 +0200 Subject: [PATCH 04/10] test(NODE-4763): make sure the resume happened after the failure --- .../change-streams/change_stream.test.ts | 27 +++++++++++++++++-- 1 file changed, 25 insertions(+), 2 deletions(-) diff --git a/test/integration/change-streams/change_stream.test.ts b/test/integration/change-streams/change_stream.test.ts index 1e9a253022..e97005b290 100644 --- a/test/integration/change-streams/change_stream.test.ts +++ b/test/integration/change-streams/change_stream.test.ts @@ -1831,9 +1831,22 @@ describe('Change Streams', function () { }); describe("NODE-4763 - doesn't produce duplicates after resume", function () { + let client: MongoClient; + let collection: Collection; + let changeStream: ChangeStream; + let aggregateEvents: CommandStartedEvent[] = []; const resumableError = { code: 6, message: 'host unreachable' }; beforeEach(async function () { + const dbName = 'node-4763'; + const collectionName = 'test-collection'; + + client = this.configuration.newClient({ monitorCommands: true }); + client.on('commandStarted', filterForCommands(['aggregate'], aggregateEvents)); + collection = client.db(dbName).collection(collectionName); + + changeStream = collection.watch([]); + await client.db('admin').command({ configureFailPoint: is4_2Server(this.configuration.version) ? 'failCommand' @@ -1844,7 +1857,7 @@ describe('Change Streams', function () { errorCode: resumableError.code, errmsg: resumableError.message } - } as FailPoint); + } as FailCommandFailPoint); }); afterEach(async function () { @@ -1853,7 +1866,11 @@ describe('Change Streams', function () { ? 'failCommand' : 'failGetMoreAfterCursorCheckout', mode: 'off' - } as FailPoint); + } as FailCommandFailPoint); + + await changeStream.close(); + await client.close(); + aggregateEvents = []; }); describe('when using iterator form', function () { @@ -1873,6 +1890,8 @@ describe('Change Streams', function () { operationType: 'insert', fullDocument: { a: 2 } }); + + expect(aggregateEvents.length).to.equal(2); }); it('#tryNext', { requires: { topology: 'replicaset' } }, async function test() { @@ -1891,6 +1910,8 @@ describe('Change Streams', function () { operationType: 'insert', fullDocument: { a: 2 } }); + + expect(aggregateEvents.length).to.equal(2); }); }); @@ -1911,6 +1932,8 @@ describe('Change Streams', function () { operationType: 'insert', fullDocument: { a: 2 } }); + + expect(aggregateEvents.length).to.equal(2); }); }); }); From 69e9f6d8e39f79c57dfd6715ad454fae3a7eda4b Mon Sep 17 00:00:00 2001 From: Sergey Zelenov Date: Wed, 3 Sep 2025 13:40:22 +0200 Subject: [PATCH 05/10] test(NODE-4763): add test for hasNext->next usage scenario --- .../change-streams/change_stream.test.ts | 21 +++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/test/integration/change-streams/change_stream.test.ts b/test/integration/change-streams/change_stream.test.ts index e97005b290..b8e7c20bd6 100644 --- a/test/integration/change-streams/change_stream.test.ts +++ b/test/integration/change-streams/change_stream.test.ts @@ -395,6 +395,25 @@ describe('Change Streams', function () { expect(change).to.have.property('_id').that.deep.equals(changeStream.resumeToken); } }); + + it('should cache the resume token on an actual change, not on a probe', { + metadata: { requires: { topology: 'replicaset' } }, + async test() { + await initIteratorMode(changeStream); + const resumeToken = changeStream.resumeToken; + + await collection.insertOne({ a: 1 }); + + const hasNext = await changeStream.hasNext(); + expect(hasNext).to.be.true; + + expect(changeStream.resumeToken).to.equal(resumeToken); + + const change = await changeStream.next(); + expect(change).to.have.property('_id').that.deep.equals(changeStream.resumeToken); + expect(resumeToken).to.not.equal(changeStream.resumeToken); + } + }); }); it('should cache using event listener form', { @@ -1847,6 +1866,8 @@ describe('Change Streams', function () { changeStream = collection.watch([]); + // Configure a fail point with skip: 1 to simulate a server failure on the second `getMore`, + // triggering the resume process. await client.db('admin').command({ configureFailPoint: is4_2Server(this.configuration.version) ? 'failCommand' From 34ac79b4324ddbeb734f4718ce9d80dd8cb48670 Mon Sep 17 00:00:00 2001 From: Sergey Zelenov Date: Wed, 3 Sep 2025 16:21:32 +0200 Subject: [PATCH 06/10] test(NODE-4763): use times:1 for failpoint to explicitly set failure --- .../change-streams/change_stream.test.ts | 57 ++++++++++++++----- 1 file changed, 43 insertions(+), 14 deletions(-) diff --git a/test/integration/change-streams/change_stream.test.ts b/test/integration/change-streams/change_stream.test.ts index b8e7c20bd6..d33c69c8e3 100644 --- a/test/integration/change-streams/change_stream.test.ts +++ b/test/integration/change-streams/change_stream.test.ts @@ -1865,20 +1865,6 @@ describe('Change Streams', function () { collection = client.db(dbName).collection(collectionName); changeStream = collection.watch([]); - - // Configure a fail point with skip: 1 to simulate a server failure on the second `getMore`, - // triggering the resume process. - await client.db('admin').command({ - configureFailPoint: is4_2Server(this.configuration.version) - ? 'failCommand' - : 'failGetMoreAfterCursorCheckout', - mode: { skip: 1 }, - data: { - failCommands: ['getMore'], - errorCode: resumableError.code, - errmsg: resumableError.message - } - } as FailCommandFailPoint); }); afterEach(async function () { @@ -1905,6 +1891,18 @@ describe('Change Streams', function () { fullDocument: { a: 1 } }); + await client.db('admin').command({ + configureFailPoint: is4_2Server(this.configuration.version) + ? 'failCommand' + : 'failGetMoreAfterCursorCheckout', + mode: { times: 1 }, + data: { + failCommands: ['getMore'], + errorCode: resumableError.code, + errmsg: resumableError.message + } + } as FailCommandFailPoint); + await collection.insertOne({ a: 2 }); const change2 = await changeStream.next(); expect(change2).to.containSubset({ @@ -1925,6 +1923,18 @@ describe('Change Streams', function () { fullDocument: { a: 1 } }); + await client.db('admin').command({ + configureFailPoint: is4_2Server(this.configuration.version) + ? 'failCommand' + : 'failGetMoreAfterCursorCheckout', + mode: { times: 1 }, + data: { + failCommands: ['getMore'], + errorCode: resumableError.code, + errmsg: resumableError.message + } + } as FailCommandFailPoint); + await collection.insertOne({ a: 2 }); const change2 = await changeStream.tryNext(); expect(change2).to.containSubset({ @@ -1947,7 +1957,26 @@ describe('Change Streams', function () { fullDocument: { a: 1 } }); + await client.db('admin').command({ + configureFailPoint: is4_2Server(this.configuration.version) + ? 'failCommand' + : 'failGetMoreAfterCursorCheckout', + mode: { times: 1 }, + data: { + failCommands: ['getMore'], + errorCode: resumableError.code, + errmsg: resumableError.message + } + } as FailCommandFailPoint); + + // There's an inherent race condition here because we need to make sure that the `aggregates` that succeed when + // resuming a change stream don't return the change event. + // So we defer the insert until a period of time after the change stream has received the first change. + // 2000ms is long enough for the change stream to attempt to resume and fail once before exhausting the failpoint + // and succeeding. + await sleep(2000); await collection.insertOne({ a: 2 }); + const change2 = await willBeChange.next(); expect(change2.value[0]).to.containSubset({ operationType: 'insert', From cd6681f2a70a1021adc7f8fe630ebafa53ed6963 Mon Sep 17 00:00:00 2001 From: Sergey Zelenov Date: Wed, 3 Sep 2025 16:56:03 +0200 Subject: [PATCH 07/10] test(NODE-4763): remove `skip` mode from `FailPoint` --- test/tools/utils.ts | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/test/tools/utils.ts b/test/tools/utils.ts index bc33efacde..79d918a689 100644 --- a/test/tools/utils.ts +++ b/test/tools/utils.ts @@ -207,12 +207,7 @@ export function extractAuthFromConnectionString(connectionString: string | any[] export interface FailPoint { configureFailPoint: 'failCommand' | 'failGetMoreAfterCursorCheckout' | 'maxTimeNeverTimeOut'; - mode: - | { activationProbability: number } - | { times: number } - | { skip: number } - | 'alwaysOn' - | 'off'; + mode: { activationProbability: number } | { times: number } | 'alwaysOn' | 'off'; } export interface FailCommandFailPoint extends FailPoint { From 089d1bbeff546f317dcd7e01820de99771297808 Mon Sep 17 00:00:00 2001 From: Sergey Zelenov Date: Thu, 4 Sep 2025 10:47:26 +0200 Subject: [PATCH 08/10] test(NODE-4763): changed wording for title of the test Co-authored-by: Bailey Pearson --- test/integration/change-streams/change_stream.test.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/integration/change-streams/change_stream.test.ts b/test/integration/change-streams/change_stream.test.ts index d33c69c8e3..fe7c0603c0 100644 --- a/test/integration/change-streams/change_stream.test.ts +++ b/test/integration/change-streams/change_stream.test.ts @@ -396,7 +396,7 @@ describe('Change Streams', function () { } }); - it('should cache the resume token on an actual change, not on a probe', { + it('hasNext() does not cache the resume token', { metadata: { requires: { topology: 'replicaset' } }, async test() { await initIteratorMode(changeStream); From f9e48e37f67f17d36dcdc49b54ae85961736eeb2 Mon Sep 17 00:00:00 2001 From: Sergey Zelenov Date: Fri, 5 Sep 2025 14:00:57 +0200 Subject: [PATCH 09/10] test(NODE-4763): clarify hasNext test description and comments; no behavior change --- .../change-streams/change_stream.test.ts | 31 +++++++++++-------- 1 file changed, 18 insertions(+), 13 deletions(-) diff --git a/test/integration/change-streams/change_stream.test.ts b/test/integration/change-streams/change_stream.test.ts index fe7c0603c0..21cc879f04 100644 --- a/test/integration/change-streams/change_stream.test.ts +++ b/test/integration/change-streams/change_stream.test.ts @@ -396,23 +396,28 @@ describe('Change Streams', function () { } }); - it('hasNext() does not cache the resume token', { - metadata: { requires: { topology: 'replicaset' } }, - async test() { - await initIteratorMode(changeStream); - const resumeToken = changeStream.resumeToken; + describe('#hasNext', () => { + it('does not cache the resume token', { + metadata: { requires: { topology: 'replicaset' } }, + async test() { + await initIteratorMode(changeStream); + const resumeToken = changeStream.resumeToken; - await collection.insertOne({ a: 1 }); + await collection.insertOne({ a: 1 }); - const hasNext = await changeStream.hasNext(); - expect(hasNext).to.be.true; + const hasNext = await changeStream.hasNext(); + expect(hasNext).to.be.true; - expect(changeStream.resumeToken).to.equal(resumeToken); + // Calling .hasNext() does not allow the ChangeStream to update the token, + // even when changes are present. + expect(changeStream.resumeToken).to.equal(resumeToken); - const change = await changeStream.next(); - expect(change).to.have.property('_id').that.deep.equals(changeStream.resumeToken); - expect(resumeToken).to.not.equal(changeStream.resumeToken); - } + // Consuming the change causes the ChangeStream to cache the resume token. + const change = await changeStream.next(); + expect(change).to.have.property('_id').that.deep.equals(changeStream.resumeToken); + expect(resumeToken).to.not.equal(changeStream.resumeToken); + } + }); }); }); From bfe2fcdfc7be75e44573c0257b1e148d20e99250 Mon Sep 17 00:00:00 2001 From: Sergey Zelenov Date: Mon, 8 Sep 2025 09:13:00 +0200 Subject: [PATCH 10/10] test(NODE-4763): reorganize cache token test suite --- .../change-streams/change_stream.test.ts | 40 +++++++++++-------- 1 file changed, 24 insertions(+), 16 deletions(-) diff --git a/test/integration/change-streams/change_stream.test.ts b/test/integration/change-streams/change_stream.test.ts index 21cc879f04..c6156ab954 100644 --- a/test/integration/change-streams/change_stream.test.ts +++ b/test/integration/change-streams/change_stream.test.ts @@ -372,16 +372,31 @@ describe('Change Streams', function () { describe('cache the change stream resume token', () => { describe('using iterator form', () => { - it('#next', { - metadata: { requires: { topology: 'replicaset' } }, + context('#next', () => { + it('caches the resume token on change', { + metadata: { requires: { topology: 'replicaset' } }, - async test() { - await initIteratorMode(changeStream); - await collection.insertOne({ a: 1 }); + async test() { + await initIteratorMode(changeStream); + await collection.insertOne({ a: 1 }); - const change = await changeStream.next(); - expect(change).to.have.property('_id').that.deep.equals(changeStream.resumeToken); - } + const change = await changeStream.next(); + expect(change).to.have.property('_id').that.deep.equals(changeStream.resumeToken); + } + }); + + it('caches the resume token correctly when preceded by #hasNext', { + metadata: { requires: { topology: 'replicaset' } }, + async test() { + await initIteratorMode(changeStream); + await collection.insertOne({ a: 1 }); + + await changeStream.hasNext(); + + const change = await changeStream.next(); + expect(change).to.have.property('_id').that.deep.equals(changeStream.resumeToken); + } + }); }); it('#tryNext', { @@ -396,7 +411,7 @@ describe('Change Streams', function () { } }); - describe('#hasNext', () => { + context('#hasNext', () => { it('does not cache the resume token', { metadata: { requires: { topology: 'replicaset' } }, async test() { @@ -408,14 +423,7 @@ describe('Change Streams', function () { const hasNext = await changeStream.hasNext(); expect(hasNext).to.be.true; - // Calling .hasNext() does not allow the ChangeStream to update the token, - // even when changes are present. expect(changeStream.resumeToken).to.equal(resumeToken); - - // Consuming the change causes the ChangeStream to cache the resume token. - const change = await changeStream.next(); - expect(change).to.have.property('_id').that.deep.equals(changeStream.resumeToken); - expect(resumeToken).to.not.equal(changeStream.resumeToken); } }); });