Skip to content

Commit 853ecc4

Browse files
committed
stream: add fast-path for readable streams
1 parent c74dbd2 commit 853ecc4

File tree

3 files changed

+64
-2
lines changed

3 files changed

+64
-2
lines changed
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
'use strict';
2+
3+
const common = require('../common');
4+
const Readable = require('stream').Readable;
5+
6+
const BASE = 'hello world\n\n';
7+
8+
const bench = common.createBenchmark(main, {
9+
encoding: ['utf-8', 'latin1'],
10+
len: [256, 512, 1024 * 16],
11+
op: ['unshift', 'push'],
12+
n: [1e3]
13+
});
14+
15+
function main({ n, encoding, len, op }) {
16+
const b = BASE.repeat(len);
17+
const s = new Readable({
18+
objectMode: false,
19+
});
20+
21+
bench.start();
22+
switch (op) {
23+
case 'unshift': {
24+
for (let i = 0; i < n; i++)
25+
s.unshift(b, encoding);
26+
break;
27+
}
28+
case 'push': {
29+
for (let i = 0; i < n; i++)
30+
s.push(b, encoding);
31+
break;
32+
}
33+
}
34+
bench.end(n);
35+
}

lib/internal/streams/readable.js

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ const {
3232
Promise,
3333
SafeSet,
3434
SymbolAsyncIterator,
35-
Symbol
35+
Symbol,
3636
} = primordials;
3737

3838
module.exports = Readable;
@@ -73,6 +73,7 @@ const kPaused = Symbol('kPaused');
7373

7474
const { StringDecoder } = require('string_decoder');
7575
const from = require('internal/streams/from');
76+
const { encodeWithFastPath } = require('internal/streams/utils');
7677

7778
ObjectSetPrototypeOf(Readable.prototype, Stream.prototype);
7879
ObjectSetPrototypeOf(Readable, Stream);
@@ -251,9 +252,10 @@ function readableAddChunk(stream, chunk, encoding, addToFront) {
251252
if (addToFront && state.encoding) {
252253
// When unshifting, if state.encoding is set, we have to save
253254
// the string in the BufferList with the state encoding.
255+
254256
chunk = Buffer.from(chunk, encoding).toString(state.encoding);
255257
} else {
256-
chunk = Buffer.from(chunk, encoding);
258+
chunk = encodeWithFastPath(chunk, encoding);
257259
encoding = '';
258260
}
259261
}

lib/internal/streams/utils.js

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,17 @@ const {
44
Symbol,
55
SymbolAsyncIterator,
66
SymbolIterator,
7+
TypedArrayPrototypeGetBuffer,
8+
TypedArrayPrototypeGetByteOffset,
9+
TypedArrayPrototypeGetByteLength,
710
} = primordials;
811

12+
const { normalizeEncoding } = require('internal/util');
13+
const { FastBuffer } = require('internal/buffer');
14+
const { TextEncoder } = require('internal/encoding');
15+
16+
const encoder = new TextEncoder();
17+
918
const kDestroyed = Symbol('kDestroyed');
1019
const kIsErrored = Symbol('kIsErrored');
1120
const kIsReadable = Symbol('kIsReadable');
@@ -261,7 +270,23 @@ function isErrored(stream) {
261270
));
262271
}
263272

273+
function encodeWithFastPath(input, encoding) {
274+
const enc = normalizeEncoding(encoding);
275+
276+
if (enc === 'utf8') {
277+
const buf = encoder.encode(input);
278+
return new FastBuffer(
279+
TypedArrayPrototypeGetBuffer(buf),
280+
TypedArrayPrototypeGetByteOffset(buf),
281+
TypedArrayPrototypeGetByteLength(buf),
282+
);
283+
}
284+
285+
return Buffer.from(input, enc);
286+
}
287+
264288
module.exports = {
289+
encodeWithFastPath,
265290
kDestroyed,
266291
isDisturbed,
267292
kIsDisturbed,

0 commit comments

Comments
 (0)