Skip to content

Commit c6d64e7

Browse files
tadjik1durran
andauthored
fix(NODE-6858): treat MongoServerSelectionError as a resumable error for Change Streams (#4653)
Co-authored-by: Durran Jordan <[email protected]>
1 parent 14303bc commit c6d64e7

File tree

2 files changed

+180
-3
lines changed

2 files changed

+180
-3
lines changed

src/error.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1551,6 +1551,10 @@ export function isResumableError(error?: Error, wireVersion?: number): boolean {
15511551
return true;
15521552
}
15531553

1554+
if (error instanceof MongoServerSelectionError) {
1555+
return true;
1556+
}
1557+
15541558
if (wireVersion != null && wireVersion >= 9) {
15551559
// DRIVERS-1308: For 4.4 drivers running against 4.4 servers, drivers will add a special case to treat the CursorNotFound error code as resumable
15561560
if (error.code === MONGODB_ERROR_CODES.CursorNotFound) {

test/integration/change-streams/change_stream.test.ts

Lines changed: 176 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import { strict as assert } from 'assert';
2+
import { UUID } from 'bson';
23
import { expect } from 'chai';
34
import { on, once } from 'events';
45
import { gte, lt } from 'semver';
@@ -14,6 +15,7 @@ import {
1415
type CommandStartedEvent,
1516
type Db,
1617
isHello,
18+
LEGACY_HELLO_COMMAND,
1719
Long,
1820
MongoAPIError,
1921
MongoChangeStreamError,
@@ -45,6 +47,19 @@ const pipeline = [
4547
{ $addFields: { comment: 'The documentKey field has been projected out of this document.' } }
4648
];
4749

50+
async function forcePrimaryStepDown(client: MongoClient) {
51+
await client
52+
.db('admin')
53+
.command({ replSetFreeze: 0 }, { readPreference: ReadPreference.SECONDARY });
54+
await client
55+
.db('admin')
56+
.command({ replSetStepDown: 15, secondaryCatchUpPeriodSecs: 10, force: true });
57+
58+
// wait for secondary to become primary but also allow previous primary to become next primary
59+
// in subsequent test runs
60+
await sleep(15_000);
61+
}
62+
4863
describe('Change Streams', function () {
4964
let client: MongoClient;
5065
let collection: Collection;
@@ -2003,9 +2018,11 @@ describe('Change Streams', function () {
20032018

20042019
describe('ChangeStream resumability', function () {
20052020
let client: MongoClient;
2021+
let utilClient: MongoClient;
20062022
let collection: Collection;
20072023
let changeStream: ChangeStream;
20082024
let aggregateEvents: CommandStartedEvent[] = [];
2025+
let appName: string;
20092026

20102027
const changeStreamResumeOptions: ChangeStreamOptions = {
20112028
fullDocument: 'updateLookup',
@@ -2055,22 +2072,36 @@ describe('ChangeStream resumability', function () {
20552072
beforeEach(async function () {
20562073
const dbName = 'resumabilty_tests';
20572074
const collectionName = 'foo';
2058-
const utilClient = this.configuration.newClient();
2075+
2076+
utilClient = this.configuration.newClient();
2077+
20592078
// 3.6 servers do not support creating a change stream on a database that doesn't exist
20602079
await utilClient
20612080
.db(dbName)
20622081
.dropDatabase()
20632082
.catch(e => e);
20642083
await utilClient.db(dbName).createCollection(collectionName);
2065-
await utilClient.close();
20662084

2067-
client = this.configuration.newClient({ monitorCommands: true });
2085+
// we are going to switch primary in tests and cleanup of failpoints is difficult,
2086+
// so generating unique appname instead of cleaning for each test is an easier solution
2087+
appName = new UUID().toString();
2088+
2089+
client = this.configuration.newClient(
2090+
{},
2091+
{
2092+
monitorCommands: true,
2093+
serverSelectionTimeoutMS: 10_000,
2094+
heartbeatFrequencyMS: 5_000,
2095+
appName: appName
2096+
}
2097+
);
20682098
client.on('commandStarted', filterForCommands(['aggregate'], aggregateEvents));
20692099
collection = client.db(dbName).collection(collectionName);
20702100
});
20712101

20722102
afterEach(async function () {
20732103
await changeStream.close();
2104+
await utilClient.close();
20742105
await client.close();
20752106
aggregateEvents = [];
20762107
});
@@ -2228,6 +2259,38 @@ describe('ChangeStream resumability', function () {
22282259
expect(changeStream.closed).to.be.true;
22292260
});
22302261
});
2262+
2263+
context('when the error is not a server error', function () {
2264+
it(
2265+
'should resume on ServerSelectionError',
2266+
{ requires: { topology: ['replicaset'] } },
2267+
async function () {
2268+
changeStream = collection.watch([]);
2269+
await initIteratorMode(changeStream);
2270+
2271+
await collection.insertOne({ a: 1 });
2272+
2273+
await utilClient.db('admin').command({
2274+
configureFailPoint: 'failCommand',
2275+
mode: 'alwaysOn',
2276+
data: {
2277+
failCommands: ['ping', 'hello', LEGACY_HELLO_COMMAND],
2278+
closeConnection: true,
2279+
appName: appName
2280+
}
2281+
} as FailCommandFailPoint);
2282+
2283+
await forcePrimaryStepDown(utilClient);
2284+
2285+
const change = await changeStream.next();
2286+
expect(change).to.containSubset({ operationType: 'insert', fullDocument: { a: 1 } });
2287+
2288+
expect(aggregateEvents).to.have.lengthOf(2);
2289+
const [e1, e2] = aggregateEvents;
2290+
expect(e1.address).to.not.equal(e2.address);
2291+
}
2292+
);
2293+
});
22312294
});
22322295

22332296
context('#hasNext', function () {
@@ -2541,6 +2604,37 @@ describe('ChangeStream resumability', function () {
25412604
expect(changeStream.closed).to.be.true;
25422605
});
25432606
});
2607+
2608+
context('when the error is not a server error', function () {
2609+
it(
2610+
'should resume on ServerSelectionError',
2611+
{ requires: { topology: ['replicaset'] } },
2612+
async function () {
2613+
changeStream = collection.watch([]);
2614+
await initIteratorMode(changeStream);
2615+
2616+
await collection.insertOne({ a: 1 });
2617+
2618+
await utilClient.db('admin').command({
2619+
configureFailPoint: 'failCommand',
2620+
mode: 'alwaysOn',
2621+
data: {
2622+
failCommands: ['ping', 'hello', LEGACY_HELLO_COMMAND],
2623+
closeConnection: true,
2624+
appName: appName
2625+
}
2626+
} as FailCommandFailPoint);
2627+
await forcePrimaryStepDown(utilClient);
2628+
2629+
const change = await changeStream.tryNext();
2630+
expect(change).to.containSubset({ operationType: 'insert', fullDocument: { a: 1 } });
2631+
2632+
expect(aggregateEvents).to.have.lengthOf(2);
2633+
const [e1, e2] = aggregateEvents;
2634+
expect(e1.address).to.not.equal(e2.address);
2635+
}
2636+
);
2637+
});
25442638
});
25452639

25462640
context('#asyncIterator', function () {
@@ -2677,6 +2771,41 @@ describe('ChangeStream resumability', function () {
26772771
}
26782772
});
26792773
});
2774+
2775+
context('when the error is not a server error', function () {
2776+
it(
2777+
'should resume on ServerSelectionError',
2778+
{ requires: { topology: ['replicaset'] } },
2779+
async function () {
2780+
changeStream = collection.watch([]);
2781+
await initIteratorMode(changeStream);
2782+
const changeStreamIterator = changeStream[Symbol.asyncIterator]();
2783+
2784+
await collection.insertOne({ a: 1 });
2785+
2786+
await utilClient.db('admin').command({
2787+
configureFailPoint: 'failCommand',
2788+
mode: 'alwaysOn',
2789+
data: {
2790+
failCommands: ['ping', 'hello', LEGACY_HELLO_COMMAND],
2791+
closeConnection: true,
2792+
appName: appName
2793+
}
2794+
} as FailCommandFailPoint);
2795+
await forcePrimaryStepDown(utilClient);
2796+
2797+
const change = await changeStreamIterator.next();
2798+
expect(change.value).to.containSubset({
2799+
operationType: 'insert',
2800+
fullDocument: { a: 1 }
2801+
});
2802+
2803+
expect(aggregateEvents).to.have.lengthOf(2);
2804+
const [e1, e2] = aggregateEvents;
2805+
expect(e1.address).to.not.equal(e2.address);
2806+
}
2807+
);
2808+
});
26802809
});
26812810
});
26822811

@@ -2866,6 +2995,50 @@ describe('ChangeStream resumability', function () {
28662995
expect(changeStream.closed).to.be.true;
28672996
});
28682997
});
2998+
2999+
context('when the error is not a server error', function () {
3000+
it(
3001+
'should resume on ServerSelectionError',
3002+
{ requires: { topology: ['replicaset'] } },
3003+
async function () {
3004+
changeStream = collection.watch([]);
3005+
3006+
const changes = on(changeStream, 'change');
3007+
await once(changeStream.cursor, 'init');
3008+
3009+
await collection.insertOne({ a: 1 });
3010+
3011+
const change = await changes.next();
3012+
expect(change.value[0]).to.containSubset({
3013+
operationType: 'insert',
3014+
fullDocument: { a: 1 }
3015+
});
3016+
3017+
await utilClient.db('admin').command({
3018+
configureFailPoint: 'failCommand',
3019+
mode: 'alwaysOn',
3020+
data: {
3021+
failCommands: ['ping', 'hello', LEGACY_HELLO_COMMAND],
3022+
closeConnection: true,
3023+
appName: appName
3024+
}
3025+
} as FailCommandFailPoint);
3026+
await forcePrimaryStepDown(utilClient);
3027+
3028+
await collection.insertOne({ a: 2 });
3029+
3030+
const change2 = await changes.next();
3031+
expect(change2.value[0]).to.containSubset({
3032+
operationType: 'insert',
3033+
fullDocument: { a: 2 }
3034+
});
3035+
3036+
expect(aggregateEvents).to.have.lengthOf(2);
3037+
const [e1, e2] = aggregateEvents;
3038+
expect(e1.address).to.not.equal(e2.address);
3039+
}
3040+
);
3041+
});
28693042
});
28703043

28713044
it(

0 commit comments

Comments
 (0)