Skip to content

Commit 69c8141

Browse files
committed
test(NODE-6858): add tests for all iterator methods
1 parent aa4e65f commit 69c8141

File tree

2 files changed

+114
-35
lines changed

2 files changed

+114
-35
lines changed

src/error.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1551,6 +1551,10 @@ export function isResumableError(error?: Error, wireVersion?: number): boolean {
15511551
return true;
15521552
}
15531553

1554+
// if (error instanceof MongoServerSelectionError) {
1555+
// return true;
1556+
// }
1557+
15541558
if (wireVersion != null && wireVersion >= 9) {
15551559
// DRIVERS-1308: For 4.4 drivers running against 4.4 servers, drivers will add a special case to treat the CursorNotFound error code as resumable
15561560
if (error.code === MONGODB_ERROR_CODES.CursorNotFound) {

test/integration/change-streams/change_stream.test.ts

Lines changed: 110 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import * as mock from '../../tools/mongodb-mock/index';
2727
import { TestBuilder, UnifiedTestSuiteBuilder } from '../../tools/unified_suite_builder';
2828
import { type FailCommandFailPoint, sleep } from '../../tools/utils';
2929
import { delay, filterForCommands } from '../shared';
30+
import { UUID } from 'bson';
3031

3132
const initIteratorMode = async (cs: ChangeStream) => {
3233
const initEvent = once(cs.cursor, 'init');
@@ -1824,6 +1825,7 @@ describe.only('ChangeStream resumability', function () {
18241825
let collection: Collection;
18251826
let changeStream: ChangeStream;
18261827
let aggregateEvents: CommandStartedEvent[] = [];
1828+
let appName: string;
18271829

18281830
const changeStreamResumeOptions: ChangeStreamOptions = {
18291831
fullDocument: 'updateLookup',
@@ -1882,7 +1884,15 @@ describe.only('ChangeStream resumability', function () {
18821884
await utilClient.db(dbName).createCollection(collectionName);
18831885
await utilClient.close();
18841886

1885-
client = this.configuration.newClient({ monitorCommands: true });
1887+
// we are going to switch primary in tests and cleanup of failpoints is difficult,
1888+
// so generating unique appname instead of cleaning for each test is an easier solution
1889+
appName = new UUID().toString();
1890+
1891+
client = this.configuration.newClient({
1892+
monitorCommands: true,
1893+
serverSelectionTimeoutMS: 5_000,
1894+
appName: appName
1895+
});
18861896
client.on('commandStarted', filterForCommands(['aggregate'], aggregateEvents));
18871897
collection = client.db(dbName).collection(collectionName);
18881898
});
@@ -2047,61 +2057,42 @@ describe.only('ChangeStream resumability', function () {
20472057
});
20482058
});
20492059

2050-
context.only('when the error is not a server error', function () {
2051-
let client1: MongoClient;
2052-
let client2: MongoClient;
2053-
2054-
beforeEach(async function () {
2055-
client1 = this.configuration.newClient(
2056-
{},
2057-
{ serverSelectionTimeoutMS: 1000, appName: 'client-errors' }
2058-
);
2059-
client2 = this.configuration.newClient();
2060-
2061-
collection = client1.db('client-errors').collection('test');
2062-
});
2063-
2064-
afterEach(async function () {
2065-
await client2.db('admin').command({
2066-
configureFailPoint: 'failCommand',
2067-
mode: 'off',
2068-
data: { appName: 'client-errors' }
2069-
} as FailCommandFailPoint);
2070-
2071-
await client1?.close();
2072-
await client2?.close();
2073-
});
2074-
2060+
context('when the error is not a server error', function () {
2061+
// This test requires a replica set to call replSetFreeze command
20752062
it(
20762063
'should resume on ServerSelectionError',
2077-
{ requires: { topology: '!single' } },
2064+
{ requires: { topology: ['replicaset'] } },
20782065
async function () {
20792066
changeStream = collection.watch([]);
20802067
await initIteratorMode(changeStream);
20812068

20822069
await collection.insertOne({ a: 1 });
20832070

2084-
await client2.db('admin').command({
2071+
// mimic the node termination by closing the connection and failing on heartbeat
2072+
await client.db('admin').command({
20852073
configureFailPoint: 'failCommand',
20862074
mode: 'alwaysOn',
20872075
data: {
20882076
failCommands: ['ping', 'hello', LEGACY_HELLO_COMMAND],
20892077
closeConnection: true,
20902078
handshakeCommands: true,
20912079
failInternalCommands: true,
2092-
appName: 'client-errors'
2080+
appName: appName
20932081
}
20942082
} as FailCommandFailPoint);
2095-
await client2
2096-
.db('admin')
2097-
.command({ replSetFreeze: 0 }, { readPreference: ReadPreference.secondary });
2098-
await client2
2083+
// force new election in the cluster
2084+
await client
20992085
.db('admin')
2100-
.command({ replSetStepDown: 15, secondaryCatchUpPeriodSecs: 10, force: true });
2101-
// await sleep(15_000);
2086+
.command({ replSetFreeze: 0 }, { readPreference: ReadPreference.SECONDARY });
2087+
await client.db('admin').command({ replSetStepDown: 30, force: true });
2088+
await sleep(1500);
21022089

21032090
const change = await changeStream.next();
21042091
expect(change).to.containSubset({ operationType: 'insert', fullDocument: { a: 1 } });
2092+
2093+
expect(aggregateEvents).to.have.lengthOf(2);
2094+
const [e1, e2] = aggregateEvents;
2095+
expect(e1.address).to.not.equal(e2.address);
21052096
}
21062097
);
21072098
});
@@ -2418,6 +2409,46 @@ describe.only('ChangeStream resumability', function () {
24182409
expect(changeStream.closed).to.be.true;
24192410
});
24202411
});
2412+
2413+
context('when the error is not a server error', function () {
2414+
// This test requires a replica set to call replSetFreeze command
2415+
it(
2416+
'should resume on ServerSelectionError',
2417+
{ requires: { topology: ['replicaset'] } },
2418+
async function () {
2419+
changeStream = collection.watch([]);
2420+
await initIteratorMode(changeStream);
2421+
2422+
await collection.insertOne({ a: 1 });
2423+
2424+
// mimic the node termination by closing the connection and failing on heartbeat
2425+
await client.db('admin').command({
2426+
configureFailPoint: 'failCommand',
2427+
mode: 'alwaysOn',
2428+
data: {
2429+
failCommands: ['ping', 'hello', LEGACY_HELLO_COMMAND],
2430+
closeConnection: true,
2431+
handshakeCommands: true,
2432+
failInternalCommands: true,
2433+
appName: appName
2434+
}
2435+
} as FailCommandFailPoint);
2436+
// force new election in the cluster
2437+
await client
2438+
.db('admin')
2439+
.command({ replSetFreeze: 0 }, { readPreference: ReadPreference.SECONDARY });
2440+
await client.db('admin').command({ replSetStepDown: 30, force: true });
2441+
await sleep(1500);
2442+
2443+
const change = await changeStream.tryNext();
2444+
expect(change).to.containSubset({ operationType: 'insert', fullDocument: { a: 1 } });
2445+
2446+
expect(aggregateEvents).to.have.lengthOf(2);
2447+
const [e1, e2] = aggregateEvents;
2448+
expect(e1.address).to.not.equal(e2.address);
2449+
}
2450+
);
2451+
});
24212452
});
24222453

24232454
context('#asyncIterator', function () {
@@ -2554,6 +2585,50 @@ describe.only('ChangeStream resumability', function () {
25542585
}
25552586
});
25562587
});
2588+
2589+
context('when the error is not a server error', function () {
2590+
// This test requires a replica set to call replSetFreeze command
2591+
it(
2592+
'should resume on ServerSelectionError',
2593+
{ requires: { topology: ['replicaset'] } },
2594+
async function () {
2595+
changeStream = collection.watch([]);
2596+
await initIteratorMode(changeStream);
2597+
const changeStreamIterator = changeStream[Symbol.asyncIterator]();
2598+
2599+
await collection.insertOne({ a: 1 });
2600+
2601+
// mimic the node termination by closing the connection and failing on heartbeat
2602+
await client.db('admin').command({
2603+
configureFailPoint: 'failCommand',
2604+
mode: 'alwaysOn',
2605+
data: {
2606+
failCommands: ['ping', 'hello', LEGACY_HELLO_COMMAND],
2607+
closeConnection: true,
2608+
handshakeCommands: true,
2609+
failInternalCommands: true,
2610+
appName: appName
2611+
}
2612+
} as FailCommandFailPoint);
2613+
// force new election in the cluster
2614+
await client
2615+
.db('admin')
2616+
.command({ replSetFreeze: 0 }, { readPreference: ReadPreference.SECONDARY });
2617+
await client.db('admin').command({ replSetStepDown: 30, force: true });
2618+
await sleep(1500);
2619+
2620+
const change = await changeStreamIterator.next();
2621+
expect(change.value).to.containSubset({
2622+
operationType: 'insert',
2623+
fullDocument: { a: 1 }
2624+
});
2625+
2626+
expect(aggregateEvents).to.have.lengthOf(2);
2627+
const [e1, e2] = aggregateEvents;
2628+
expect(e1.address).to.not.equal(e2.address);
2629+
}
2630+
);
2631+
});
25572632
});
25582633
});
25592634

0 commit comments

Comments
 (0)