Skip to content
2 changes: 1 addition & 1 deletion lib/child_process.js
Original file line number Diff line number Diff line change
Expand Up @@ -785,7 +785,7 @@ function spawn(file, args, options) {
if (signal.aborted) {
process.nextTick(onAbortListener);
} else {
addAbortListener ??= require('events').addAbortListener;
addAbortListener ??= require('internal/events/abort_listener').addAbortListener;
const disposable = addAbortListener(signal, onAbortListener);
child.once('exit', disposable[SymbolDispose]);
}
Expand Down
3 changes: 2 additions & 1 deletion lib/dgram.js
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ const { Buffer } = require('buffer');
const { deprecate, guessHandleType, promisify } = require('internal/util');
const { isArrayBufferView } = require('internal/util/types');
const EventEmitter = require('events');
const { addAbortListener } = require('internal/events/abort_listener');
const {
defaultTriggerAsyncIdScope,
symbols: { async_id_symbol, owner_symbol },
Expand Down Expand Up @@ -146,7 +147,7 @@ function Socket(type, listener) {
if (signal.aborted) {
onAborted();
} else {
const disposable = EventEmitter.addAbortListener(signal, onAborted);
const disposable = addAbortListener(signal, onAborted);
this.once('close', disposable[SymbolDispose]);
}
}
Expand Down
30 changes: 1 addition & 29 deletions lib/events.js
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ const {
validateObject,
validateString,
} = require('internal/validators');
const { addAbortListener } = require('internal/events/abort_listener');

const kCapture = Symbol('kCapture');
const kErrorMonitor = Symbol('events.errorMonitor');
Expand Down Expand Up @@ -1222,32 +1223,3 @@ function listenersController() {
},
};
}

let queueMicrotask;

function addAbortListener(signal, listener) {
if (signal === undefined) {
throw new ERR_INVALID_ARG_TYPE('signal', 'AbortSignal', signal);
}
validateAbortSignal(signal, 'signal');
validateFunction(listener, 'listener');

let removeEventListener;
if (signal.aborted) {
queueMicrotask ??= require('internal/process/task_queues').queueMicrotask;
queueMicrotask(() => listener());
} else {
kResistStopPropagation ??= require('internal/event_target').kResistStopPropagation;
// TODO(atlowChemi) add { subscription: true } and return directly
signal.addEventListener('abort', listener, { __proto__: null, once: true, [kResistStopPropagation]: true });
removeEventListener = () => {
signal.removeEventListener('abort', listener);
};
}
return {
__proto__: null,
[SymbolDispose]() {
removeEventListener?.();
},
};
}
54 changes: 54 additions & 0 deletions lib/internal/events/abort_listener.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
'use strict';

const {
SymbolDispose,
} = primordials;
const {
validateAbortSignal,
validateFunction,
} = require('internal/validators');
const {
codes: {
ERR_INVALID_ARG_TYPE,
},
} = require('internal/errors');

let queueMicrotask;
let kResistStopPropagation;

/**
* @param {AbortSignal} signal
* @param {EventListener} listener
* @returns {Disposable}
*/
function addAbortListener(signal, listener) {
if (signal === undefined) {
throw new ERR_INVALID_ARG_TYPE('signal', 'AbortSignal', signal);
}
validateAbortSignal(signal, 'signal');
validateFunction(listener, 'listener');

let removeEventListener;
if (signal.aborted) {
queueMicrotask ??= require('internal/process/task_queues').queueMicrotask;
queueMicrotask(() => listener());
} else {
kResistStopPropagation ??= require('internal/event_target').kResistStopPropagation;
// TODO(atlowChemi) add { subscription: true } and return directly
signal.addEventListener('abort', listener, { __proto__: null, once: true, [kResistStopPropagation]: true });
removeEventListener = () => {
signal.removeEventListener('abort', listener);
};
}
return {
__proto__: null,
[SymbolDispose]() {
removeEventListener?.();
},
};
}

module.exports = {
__proto__: null,
addAbortListener,
};
3 changes: 2 additions & 1 deletion lib/internal/http2/core.js
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ assertCrypto();

const assert = require('assert');
const EventEmitter = require('events');
const { addAbortListener } = require('internal/events/abort_listener');
const fs = require('fs');
const http = require('http');
const { readUInt16BE, readUInt32BE } = require('internal/buffer');
Expand Down Expand Up @@ -1832,7 +1833,7 @@ class ClientHttp2Session extends Http2Session {
if (signal.aborted) {
aborter();
} else {
const disposable = EventEmitter.addAbortListener(signal, aborter);
const disposable = addAbortListener(signal, aborter);
stream.once('close', disposable[SymbolDispose]);
}
}
Expand Down
3 changes: 2 additions & 1 deletion lib/internal/readline/interface.js
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ const {
stripVTControlCharacters,
} = require('internal/util/inspect');
const EventEmitter = require('events');
const { addAbortListener } = require('internal/events/abort_listener');
const {
charLengthAt,
charLengthLeft,
Expand Down Expand Up @@ -326,7 +327,7 @@ function InterfaceConstructor(input, output, completer, terminal) {
if (signal.aborted) {
process.nextTick(onAborted);
} else {
const disposable = EventEmitter.addAbortListener(signal, onAborted);
const disposable = addAbortListener(signal, onAborted);
self.once('close', disposable[SymbolDispose]);
}
}
Expand Down
2 changes: 1 addition & 1 deletion lib/internal/streams/add-abort-signal.js
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ module.exports.addAbortSignalNoValidate = function(signal, stream) {
if (signal.aborted) {
onAbort();
} else {
addAbortListener ??= require('events').addAbortListener;
addAbortListener ??= require('internal/events/abort_listener').addAbortListener;
const disposable = addAbortListener(signal, onAbort);
eos(stream, disposable[SymbolDispose]);
}
Expand Down
4 changes: 2 additions & 2 deletions lib/internal/streams/end-of-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ function eos(stream, options, callback) {
if (options.signal.aborted) {
process.nextTick(abort);
} else {
addAbortListener ??= require('events').addAbortListener;
addAbortListener ??= require('internal/events/abort_listener').addAbortListener;
const disposable = addAbortListener(options.signal, abort);
const originalCallback = callback;
callback = once((...args) => {
Expand All @@ -278,7 +278,7 @@ function eosWeb(stream, options, callback) {
if (options.signal.aborted) {
process.nextTick(abort);
} else {
addAbortListener ??= require('events').addAbortListener;
addAbortListener ??= require('internal/events/abort_listener').addAbortListener;
const disposable = addAbortListener(options.signal, abort);
const originalCallback = callback;
callback = once((...args) => {
Expand Down
2 changes: 1 addition & 1 deletion lib/internal/streams/pipeline.js
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ function pipelineImpl(streams, callback, opts) {
finishImpl(new AbortError());
}

addAbortListener ??= require('events').addAbortListener;
addAbortListener ??= require('internal/events/abort_listener').addAbortListener;
let disposable;
if (outerSignal) {
disposable = addAbortListener(outerSignal, abort);
Expand Down
2 changes: 1 addition & 1 deletion lib/internal/test_runner/test.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ const {
Symbol,
} = primordials;
const { getCallerLocation } = internalBinding('util');
const { addAbortListener } = require('events');
const { addAbortListener } = require('internal/events/abort_listener');
const { AsyncResource } = require('async_hooks');
const { AbortController } = require('internal/abort_controller');
const {
Expand Down
3 changes: 2 additions & 1 deletion lib/internal/watch_mode/files_watcher.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ const { kEmptyObject } = require('internal/util');
const { TIMEOUT_MAX } = require('internal/timers');

const EventEmitter = require('events');
const { addAbortListener } = require('internal/events/abort_listener');
const { watch } = require('fs');
const { fileURLToPath } = require('internal/url');
const { resolve, dirname } = require('path');
Expand Down Expand Up @@ -41,7 +42,7 @@ class FilesWatcher extends EventEmitter {
this.#signal = signal;

if (signal) {
EventEmitter.addAbortListener(signal, () => this.clear());
addAbortListener(signal, () => this.clear());
}
}

Expand Down
2 changes: 1 addition & 1 deletion lib/internal/webstreams/readablestream.js
Original file line number Diff line number Diff line change
Expand Up @@ -1523,7 +1523,7 @@ function readableStreamPipeTo(
abortAlgorithm();
return promise.promise;
}
addAbortListener ??= require('events').addAbortListener;
addAbortListener ??= require('internal/events/abort_listener').addAbortListener;
disposable = addAbortListener(signal, abortAlgorithm);
}

Expand Down
5 changes: 3 additions & 2 deletions lib/net.js
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ const {
} = primordials;

const EventEmitter = require('events');
const { addAbortListener } = require('internal/events/abort_listener');
const stream = require('stream');
let debug = require('internal/util/debuglog').debuglog('net', (fn) => {
debug = fn;
Expand Down Expand Up @@ -1631,7 +1632,7 @@ function addClientAbortSignalOption(self, options) {
process.nextTick(onAbort);
} else {
process.nextTick(() => {
disposable = EventEmitter.addAbortListener(signal, onAbort);
disposable = addAbortListener(signal, onAbort);
});
}
}
Expand Down Expand Up @@ -1723,7 +1724,7 @@ function addServerAbortSignalOption(self, options) {
if (signal.aborted) {
process.nextTick(onAborted);
} else {
const disposable = EventEmitter.addAbortListener(signal, onAborted);
const disposable = addAbortListener(signal, onAborted);
self.once('close', disposable[SymbolDispose]);
}
}
Expand Down
4 changes: 2 additions & 2 deletions lib/readline.js
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ Interface.prototype.question = function question(query, options, cb) {
const onAbort = () => {
this[kQuestionCancel]();
};
addAbortListener ??= require('events').addAbortListener;
addAbortListener ??= require('internal/events/abort_listener').addAbortListener;
const disposable = addAbortListener(options.signal, onAbort);
const originalCb = cb;
cb = typeof cb === 'function' ? (answer) => {
Expand Down Expand Up @@ -175,7 +175,7 @@ Interface.prototype.question[promisify.custom] = function question(query, option
const onAbort = () => {
reject(new AbortError(undefined, { cause: options.signal.reason }));
};
addAbortListener ??= require('events').addAbortListener;
addAbortListener ??= require('internal/events/abort_listener').addAbortListener;
const disposable = addAbortListener(options.signal, onAbort);
cb = (answer) => {
disposable[SymbolDispose]();
Expand Down
2 changes: 1 addition & 1 deletion lib/readline/promises.js
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ class Interface extends _Interface {
this[kQuestionCancel]();
reject(new AbortError(undefined, { cause: options.signal.reason }));
};
addAbortListener ??= require('events').addAbortListener;
addAbortListener ??= require('internal/events/abort_listener').addAbortListener;
const disposable = addAbortListener(options.signal, onAbort);

cb = (answer) => {
Expand Down
1 change: 1 addition & 0 deletions test/parallel/test-bootstrap-modules.js
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ expected.beforePreExec = new Set([
'Internal Binding module_wrap',
'NativeModule internal/modules/cjs/loader',
'Internal Binding wasm_web_api',
'NativeModule internal/events/abort_listener',
]);

expected.atRunTime = new Set([
Expand Down