From de77dc782aa3653d42ba242a90996538fd7e94e4 Mon Sep 17 00:00:00 2001 From: Neal Beeken Date: Wed, 16 Apr 2025 16:16:10 -0400 Subject: [PATCH 1/6] fix(NODE-6630): read all messages in buffer when chunk arrives --- src/cmap/connection.ts | 47 +++++++++++++++++++++++-------- test/unit/cmap/connection.test.ts | 47 +++++++++++++++++++++++++++++++ 2 files changed, 83 insertions(+), 11 deletions(-) diff --git a/src/cmap/connection.ts b/src/cmap/connection.ts index b6d92f56e0c..a8b74e13ed4 100644 --- a/src/cmap/connection.ts +++ b/src/cmap/connection.ts @@ -27,6 +27,7 @@ import { MongoNetworkTimeoutError, MongoOperationTimeoutError, MongoParseError, + MongoRuntimeError, MongoServerError, MongoUnexpectedServerResponseError } from '../error'; @@ -791,22 +792,46 @@ export class SizedMessageTransform extends Transform { } this.bufferPool.append(chunk); - const sizeOfMessage = this.bufferPool.getInt32(); - if (sizeOfMessage == null) { - return callback(); - } + while (this.bufferPool.length) { + // While there are any bytes in the buffer - if (sizeOfMessage < 0) { - return callback(new MongoParseError(`Invalid message size: ${sizeOfMessage}, too small`)); - } + // Try to fetch a size from the top 4 bytes + const sizeOfMessage = this.bufferPool.getInt32(); + + if (sizeOfMessage == null) { + // Not even an int32 worth of data. Stop the loop, we need more chunks. + break; + } - if (sizeOfMessage > this.bufferPool.length) { - return callback(); + if (sizeOfMessage < 0) { + // The size in the message has a negative value, this is probably corruption, throw: + return callback(new MongoParseError(`Message size cannot be negative: ${sizeOfMessage}`)); + } + + if (sizeOfMessage > this.bufferPool.length) { + // We do not have enough bytes to make a sizeOfMessage chunk + break; + } + + // Add a message to the stream + const message = this.bufferPool.read(sizeOfMessage); + + if (!this.push(message)) { + // We only subscribe to data events so we should never get backpressure + // if we do, we do not have the handling for it. + return callback( + new MongoRuntimeError(`SizedMessageTransform does not support backpressure`) + ); + } } - const message = this.bufferPool.read(sizeOfMessage); - return callback(null, message); + callback(); + } + + override pipe(destination: T, options?: { end?: boolean }): T { + destination.on('drain', this.emit.bind('drain')); + return super.pipe(destination, options); } } diff --git a/test/unit/cmap/connection.test.ts b/test/unit/cmap/connection.test.ts index aa3e86e2dc6..6d60c217820 100644 --- a/test/unit/cmap/connection.test.ts +++ b/test/unit/cmap/connection.test.ts @@ -1,4 +1,5 @@ import { Socket } from 'node:net'; +import { Writable } from 'node:stream'; import { expect } from 'chai'; import * as sinon from 'sinon'; @@ -11,7 +12,9 @@ import { MongoClientAuthProviders, MongoDBCollectionNamespace, MongoNetworkTimeoutError, + MongoRuntimeError, ns, + promiseWithResolvers, SizedMessageTransform } from '../../mongodb'; import * as mock from '../../tools/mongodb-mock/index'; @@ -333,5 +336,49 @@ describe('new Connection()', function () { expect(stream.read(1)).to.deep.equal(Buffer.from([6, 0, 0, 0, 5, 6])); expect(stream.read(1)).to.equal(null); }); + + it('parses many wire messages when chunk arrives', function () { + const stream = new SizedMessageTransform({ connection: {} as any }); + + let dataCount = 0; + stream.on('data', () => { + dataCount += 1; + }); + + // 3 messages of size 8 + stream.write( + Buffer.from([ + ...[8, 0, 0, 0, 0, 0, 0, 0], + ...[8, 0, 0, 0, 0, 0, 0, 0], + ...[8, 0, 0, 0, 0, 0, 0, 0] + ]) + ); + + expect(dataCount).to.equal(3); + }); + + it('waits for a drain event when destination needs backpressure', async function () { + const stream = new SizedMessageTransform({ connection: {} as any }); + const destination = new Writable({ + highWaterMark: 1, + objectMode: true, + write: (chunk, encoding, callback) => { + void stream; + setTimeout(1).then(() => callback()); + } + }); + + // 1000 messages of size 8 + stream.write( + Buffer.from(Array.from({ length: 1000 }, () => [8, 0, 0, 0, 0, 0, 0, 0]).flat(1)) + ); + + const { promise, resolve, reject } = promiseWithResolvers(); + + stream.on('error', reject).pipe(destination).on('error', reject).on('finish', resolve); + + const error = await promise.catch(error => error); + expect(error).to.be.instanceOf(MongoRuntimeError); + }); }); }); From 26ebc5b97699c38cf7d073974949d768a5cd5014 Mon Sep 17 00:00:00 2001 From: Neal Beeken Date: Wed, 16 Apr 2025 16:41:48 -0400 Subject: [PATCH 2/6] test: add check for many messages and a partial --- test/unit/cmap/connection.test.ts | 31 ++++++++++++++++++++++++++++--- 1 file changed, 28 insertions(+), 3 deletions(-) diff --git a/test/unit/cmap/connection.test.ts b/test/unit/cmap/connection.test.ts index 6d60c217820..65c449ebe16 100644 --- a/test/unit/cmap/connection.test.ts +++ b/test/unit/cmap/connection.test.ts @@ -323,7 +323,7 @@ describe('new Connection()', function () { }); }); - describe('SizedMessageTransform', function () { + describe.only('SizedMessageTransform', function () { it('parses chunks of wire messages', function () { const stream = new SizedMessageTransform({ connection: {} as any }); // Message of length 4 + 4 = 8 @@ -337,7 +337,7 @@ describe('new Connection()', function () { expect(stream.read(1)).to.equal(null); }); - it('parses many wire messages when chunk arrives', function () { + it('parses many wire messages when a single chunk arrives', function () { const stream = new SizedMessageTransform({ connection: {} as any }); let dataCount = 0; @@ -357,7 +357,32 @@ describe('new Connection()', function () { expect(dataCount).to.equal(3); }); - it('waits for a drain event when destination needs backpressure', async function () { + it('parses many wire messages when a single chunk arrives and processes the remaining partial when it is complete', function () { + const stream = new SizedMessageTransform({ connection: {} as any }); + + let dataCount = 0; + stream.on('data', () => { + dataCount += 1; + }); + + // 3 messages of size 8 + stream.write( + Buffer.from([ + ...[8, 0, 0, 0, 0, 0, 0, 0], + ...[8, 0, 0, 0, 0, 0, 0, 0], + ...[8, 0, 0, 0, 0, 0, 0, 0], + ...[8, 0, 0, 0, 0, 0] // two shy of 8 + ]) + ); + + expect(dataCount).to.equal(3); + + stream.write(Buffer.from([0, 0])); // the rest of the last 8 + + expect(dataCount).to.equal(4); + }); + + it('throws an error when backpressure detected', async function () { const stream = new SizedMessageTransform({ connection: {} as any }); const destination = new Writable({ highWaterMark: 1, From 40b7e1814533892a37f6db09c2caf955a7f12e09 Mon Sep 17 00:00:00 2001 From: Neal Beeken Date: Wed, 16 Apr 2025 16:52:09 -0400 Subject: [PATCH 3/6] test: add size checks --- src/cmap/connection.ts | 5 ----- test/unit/cmap/connection.test.ts | 8 +++++--- 2 files changed, 5 insertions(+), 8 deletions(-) diff --git a/src/cmap/connection.ts b/src/cmap/connection.ts index a8b74e13ed4..bbe65a25103 100644 --- a/src/cmap/connection.ts +++ b/src/cmap/connection.ts @@ -828,11 +828,6 @@ export class SizedMessageTransform extends Transform { callback(); } - - override pipe(destination: T, options?: { end?: boolean }): T { - destination.on('drain', this.emit.bind('drain')); - return super.pipe(destination, options); - } } /** @internal */ diff --git a/test/unit/cmap/connection.test.ts b/test/unit/cmap/connection.test.ts index 65c449ebe16..79fe9ea863d 100644 --- a/test/unit/cmap/connection.test.ts +++ b/test/unit/cmap/connection.test.ts @@ -323,7 +323,7 @@ describe('new Connection()', function () { }); }); - describe.only('SizedMessageTransform', function () { + describe('SizedMessageTransform', function () { it('parses chunks of wire messages', function () { const stream = new SizedMessageTransform({ connection: {} as any }); // Message of length 4 + 4 = 8 @@ -341,7 +341,8 @@ describe('new Connection()', function () { const stream = new SizedMessageTransform({ connection: {} as any }); let dataCount = 0; - stream.on('data', () => { + stream.on('data', chunk => { + expect(chunk).to.have.lengthOf(8); dataCount += 1; }); @@ -361,7 +362,8 @@ describe('new Connection()', function () { const stream = new SizedMessageTransform({ connection: {} as any }); let dataCount = 0; - stream.on('data', () => { + stream.on('data', chunk => { + expect(chunk).to.have.lengthOf(8); dataCount += 1; }); From 1ebadc6512093750f312fe3cb17805b45a9b445e Mon Sep 17 00:00:00 2001 From: Neal Beeken Date: Thu, 17 Apr 2025 09:24:28 -0400 Subject: [PATCH 4/6] test: add integration test --- .../connection.test.ts | 51 ++++++++++++++++++- 1 file changed, 50 insertions(+), 1 deletion(-) diff --git a/test/integration/connection-monitoring-and-pooling/connection.test.ts b/test/integration/connection-monitoring-and-pooling/connection.test.ts index 4307ee32f21..da0cc4b6597 100644 --- a/test/integration/connection-monitoring-and-pooling/connection.test.ts +++ b/test/integration/connection-monitoring-and-pooling/connection.test.ts @@ -22,7 +22,7 @@ import { } from '../../mongodb'; import * as mock from '../../tools/mongodb-mock/index'; import { skipBrokenAuthTestBeforeEachHook } from '../../tools/runner/hooks/configuration'; -import { sleep } from '../../tools/utils'; +import { processTick, sleep } from '../../tools/utils'; import { assert as test, setupDatabase } from '../shared'; const commonConnectOptions = { @@ -249,6 +249,55 @@ describe('Connection', function () { client.connect(); }); + describe( + 'when a monitoring Connection receives many hellos in one chunk', + { requires: { topology: 'replicaset' } }, + function () { + let client: MongoClient; + let hbSuccess = 0; + + beforeEach(async function () { + client = this.configuration.newClient({}, { heartbeatFrequencyMS: 100 }); // just so we don't have to wait so long for a hello + hbSuccess = 0; + client.on('serverHeartbeatSucceeded', () => (hbSuccess += 1)); + }); + + afterEach(async function () { + hbSuccess = 0; + await client.close(); + }); + + // In the future we may want to skip processing concatenated heartbeats. + // This test exists to prevent regression of processing many messages inside one chunk. + it( + 'processes all of them and emits heartbeats', + { requires: { topology: 'replicaset' } }, + async function () { + expect(hbSuccess).to.equal(0); + + await client.db().command({ ping: 1 }); // start monitoring. + const monitor = [...client.topology.s.servers.values()][0].monitor; + + // @ts-expect-error: accessing private property + const messageStream = monitor.connection.messageStream; + // @ts-expect-error: accessing private property + const socket = monitor.connection.socket; + + const [hello] = (await once(messageStream, 'data')) as [Buffer]; + + const thousandHellos = Array.from({ length: 1000 }, () => [...hello]).flat(1); + + // pretend this came from the server + socket.emit('data', Buffer.from(thousandHellos)); + + // All of the hb will be emitted synchronously in the next tick as the entire chunk is processed. + await processTick(); + expect(hbSuccess).to.be.greaterThan(100); + } + ); + } + ); + context( 'when a large message is written to the socket', { requires: { topology: 'single', auth: 'disabled' } }, From 3a77f5ce64326c0efa5db7ea982c20769fd9df0c Mon Sep 17 00:00:00 2001 From: Neal Beeken Date: Thu, 17 Apr 2025 10:53:05 -0400 Subject: [PATCH 5/6] chore: need to test on streaming server versions --- .../connection-monitoring-and-pooling/connection.test.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/test/integration/connection-monitoring-and-pooling/connection.test.ts b/test/integration/connection-monitoring-and-pooling/connection.test.ts index da0cc4b6597..88eb5a6757d 100644 --- a/test/integration/connection-monitoring-and-pooling/connection.test.ts +++ b/test/integration/connection-monitoring-and-pooling/connection.test.ts @@ -251,7 +251,7 @@ describe('Connection', function () { describe( 'when a monitoring Connection receives many hellos in one chunk', - { requires: { topology: 'replicaset' } }, + { requires: { topology: 'replicaset', mongodb: '>=4.4' } }, // need to be on a streaming hello version function () { let client: MongoClient; let hbSuccess = 0; @@ -271,7 +271,7 @@ describe('Connection', function () { // This test exists to prevent regression of processing many messages inside one chunk. it( 'processes all of them and emits heartbeats', - { requires: { topology: 'replicaset' } }, + { requires: { topology: 'replicaset', mongodb: '>=4.4' } }, async function () { expect(hbSuccess).to.equal(0); From 539a08515972752bf8f6d21e2f5dc74f4f29e6da Mon Sep 17 00:00:00 2001 From: Neal Beeken Date: Thu, 17 Apr 2025 13:58:56 -0400 Subject: [PATCH 6/6] test: assert 1000 not 100 --- .../connection.test.ts | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/test/integration/connection-monitoring-and-pooling/connection.test.ts b/test/integration/connection-monitoring-and-pooling/connection.test.ts index 88eb5a6757d..0e5fd45323f 100644 --- a/test/integration/connection-monitoring-and-pooling/connection.test.ts +++ b/test/integration/connection-monitoring-and-pooling/connection.test.ts @@ -254,16 +254,13 @@ describe('Connection', function () { { requires: { topology: 'replicaset', mongodb: '>=4.4' } }, // need to be on a streaming hello version function () { let client: MongoClient; - let hbSuccess = 0; beforeEach(async function () { - client = this.configuration.newClient({}, { heartbeatFrequencyMS: 100 }); // just so we don't have to wait so long for a hello - hbSuccess = 0; - client.on('serverHeartbeatSucceeded', () => (hbSuccess += 1)); + // set heartbeatFrequencyMS just so we don't have to wait so long for a hello + client = this.configuration.newClient({}, { heartbeatFrequencyMS: 10 }); }); afterEach(async function () { - hbSuccess = 0; await client.close(); }); @@ -273,6 +270,8 @@ describe('Connection', function () { 'processes all of them and emits heartbeats', { requires: { topology: 'replicaset', mongodb: '>=4.4' } }, async function () { + let hbSuccess = 0; + client.on('serverHeartbeatSucceeded', () => (hbSuccess += 1)); expect(hbSuccess).to.equal(0); await client.db().command({ ping: 1 }); // start monitoring. @@ -292,7 +291,7 @@ describe('Connection', function () { // All of the hb will be emitted synchronously in the next tick as the entire chunk is processed. await processTick(); - expect(hbSuccess).to.be.greaterThan(100); + expect(hbSuccess).to.be.greaterThan(1000); } ); }