| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420 | "use strict";Object.defineProperty(exports, "__esModule", { value: true });exports.MonitorInterval = exports.RTTPinger = exports.Monitor = void 0;const timers_1 = require("timers");const bson_1 = require("../bson");const connect_1 = require("../cmap/connect");const connection_1 = require("../cmap/connection");const constants_1 = require("../constants");const error_1 = require("../error");const mongo_types_1 = require("../mongo_types");const utils_1 = require("../utils");const common_1 = require("./common");const events_1 = require("./events");const server_1 = require("./server");/** @internal */const kServer = Symbol('server');/** @internal */const kMonitorId = Symbol('monitorId');/** @internal */const kConnection = Symbol('connection');/** @internal */const kCancellationToken = Symbol('cancellationToken');/** @internal */const kRTTPinger = Symbol('rttPinger');/** @internal */const kRoundTripTime = Symbol('roundTripTime');const STATE_IDLE = 'idle';const STATE_MONITORING = 'monitoring';const stateTransition = (0, utils_1.makeStateMachine)({    [common_1.STATE_CLOSING]: [common_1.STATE_CLOSING, STATE_IDLE, common_1.STATE_CLOSED],    [common_1.STATE_CLOSED]: [common_1.STATE_CLOSED, STATE_MONITORING],    [STATE_IDLE]: [STATE_IDLE, STATE_MONITORING, common_1.STATE_CLOSING],    [STATE_MONITORING]: [STATE_MONITORING, STATE_IDLE, common_1.STATE_CLOSING]});const INVALID_REQUEST_CHECK_STATES = new Set([common_1.STATE_CLOSING, common_1.STATE_CLOSED, STATE_MONITORING]);function isInCloseState(monitor) {    return monitor.s.state === common_1.STATE_CLOSED || monitor.s.state === common_1.STATE_CLOSING;}/** @internal */class Monitor extends mongo_types_1.TypedEventEmitter {    get connection() {        return this[kConnection];    }    constructor(server, options) {        super();        this[kServer] = server;        this[kConnection] = undefined;        this[kCancellationToken] = new mongo_types_1.CancellationToken();        this[kCancellationToken].setMaxListeners(Infinity);        this[kMonitorId] = undefined;        this.s = {            state: common_1.STATE_CLOSED        };        this.address = server.description.address;        this.options = Object.freeze({            connectTimeoutMS: options.connectTimeoutMS ?? 10000,            heartbeatFrequencyMS: options.heartbeatFrequencyMS ?? 10000,            minHeartbeatFrequencyMS: options.minHeartbeatFrequencyMS ?? 500        });        const cancellationToken = this[kCancellationToken];        // TODO: refactor this to pull it directly from the pool, requires new ConnectionPool integration        const connectOptions = Object.assign({            id: '<monitor>',            generation: server.pool.generation,            connectionType: connection_1.Connection,            cancellationToken,            hostAddress: server.description.hostAddress        }, options,         // force BSON serialization options        {            raw: false,            useBigInt64: false,            promoteLongs: true,            promoteValues: true,            promoteBuffers: true        });        // ensure no authentication is used for monitoring        delete connectOptions.credentials;        if (connectOptions.autoEncrypter) {            delete connectOptions.autoEncrypter;        }        this.connectOptions = Object.freeze(connectOptions);    }    connect() {        if (this.s.state !== common_1.STATE_CLOSED) {            return;        }        // start        const heartbeatFrequencyMS = this.options.heartbeatFrequencyMS;        const minHeartbeatFrequencyMS = this.options.minHeartbeatFrequencyMS;        this[kMonitorId] = new MonitorInterval(monitorServer(this), {            heartbeatFrequencyMS: heartbeatFrequencyMS,            minHeartbeatFrequencyMS: minHeartbeatFrequencyMS,            immediate: true        });    }    requestCheck() {        if (INVALID_REQUEST_CHECK_STATES.has(this.s.state)) {            return;        }        this[kMonitorId]?.wake();    }    reset() {        const topologyVersion = this[kServer].description.topologyVersion;        if (isInCloseState(this) || topologyVersion == null) {            return;        }        stateTransition(this, common_1.STATE_CLOSING);        resetMonitorState(this);        // restart monitor        stateTransition(this, STATE_IDLE);        // restart monitoring        const heartbeatFrequencyMS = this.options.heartbeatFrequencyMS;        const minHeartbeatFrequencyMS = this.options.minHeartbeatFrequencyMS;        this[kMonitorId] = new MonitorInterval(monitorServer(this), {            heartbeatFrequencyMS: heartbeatFrequencyMS,            minHeartbeatFrequencyMS: minHeartbeatFrequencyMS        });    }    close() {        if (isInCloseState(this)) {            return;        }        stateTransition(this, common_1.STATE_CLOSING);        resetMonitorState(this);        // close monitor        this.emit('close');        stateTransition(this, common_1.STATE_CLOSED);    }}exports.Monitor = Monitor;function resetMonitorState(monitor) {    monitor[kMonitorId]?.stop();    monitor[kMonitorId] = undefined;    monitor[kRTTPinger]?.close();    monitor[kRTTPinger] = undefined;    monitor[kCancellationToken].emit('cancel');    monitor[kConnection]?.destroy({ force: true });    monitor[kConnection] = undefined;}function checkServer(monitor, callback) {    let start = (0, utils_1.now)();    const topologyVersion = monitor[kServer].description.topologyVersion;    const isAwaitable = topologyVersion != null;    monitor.emit(server_1.Server.SERVER_HEARTBEAT_STARTED, new events_1.ServerHeartbeatStartedEvent(monitor.address, isAwaitable));    function failureHandler(err) {        monitor[kConnection]?.destroy({ force: true });        monitor[kConnection] = undefined;        monitor.emit(server_1.Server.SERVER_HEARTBEAT_FAILED, new events_1.ServerHeartbeatFailedEvent(monitor.address, (0, utils_1.calculateDurationInMs)(start), err, isAwaitable));        const error = !(err instanceof error_1.MongoError)            ? new error_1.MongoError(error_1.MongoError.buildErrorMessage(err), { cause: err })            : err;        error.addErrorLabel(error_1.MongoErrorLabel.ResetPool);        if (error instanceof error_1.MongoNetworkTimeoutError) {            error.addErrorLabel(error_1.MongoErrorLabel.InterruptInUseConnections);        }        monitor.emit('resetServer', error);        callback(err);    }    const connection = monitor[kConnection];    if (connection && !connection.closed) {        const { serverApi, helloOk } = connection;        const connectTimeoutMS = monitor.options.connectTimeoutMS;        const maxAwaitTimeMS = monitor.options.heartbeatFrequencyMS;        const cmd = {            [serverApi?.version || helloOk ? 'hello' : constants_1.LEGACY_HELLO_COMMAND]: 1,            ...(isAwaitable && topologyVersion                ? { maxAwaitTimeMS, topologyVersion: makeTopologyVersion(topologyVersion) }                : {})        };        const options = isAwaitable            ? {                socketTimeoutMS: connectTimeoutMS ? connectTimeoutMS + maxAwaitTimeMS : 0,                exhaustAllowed: true            }            : { socketTimeoutMS: connectTimeoutMS };        if (isAwaitable && monitor[kRTTPinger] == null) {            monitor[kRTTPinger] = new RTTPinger(monitor[kCancellationToken], Object.assign({ heartbeatFrequencyMS: monitor.options.heartbeatFrequencyMS }, monitor.connectOptions));        }        connection.command((0, utils_1.ns)('admin.$cmd'), cmd, options, (err, hello) => {            if (err) {                return failureHandler(err);            }            if (!('isWritablePrimary' in hello)) {                // Provide hello-style response document.                hello.isWritablePrimary = hello[constants_1.LEGACY_HELLO_COMMAND];            }            const rttPinger = monitor[kRTTPinger];            const duration = isAwaitable && rttPinger ? rttPinger.roundTripTime : (0, utils_1.calculateDurationInMs)(start);            const awaited = isAwaitable && hello.topologyVersion != null;            monitor.emit(server_1.Server.SERVER_HEARTBEAT_SUCCEEDED, new events_1.ServerHeartbeatSucceededEvent(monitor.address, duration, hello, awaited));            // if we are using the streaming protocol then we immediately issue another `started`            // event, otherwise the "check" is complete and return to the main monitor loop            if (awaited) {                monitor.emit(server_1.Server.SERVER_HEARTBEAT_STARTED, new events_1.ServerHeartbeatStartedEvent(monitor.address, true));                start = (0, utils_1.now)();            }            else {                monitor[kRTTPinger]?.close();                monitor[kRTTPinger] = undefined;                callback(undefined, hello);            }        });        return;    }    // connecting does an implicit `hello`    (0, connect_1.connect)(monitor.connectOptions, (err, conn) => {        if (err) {            monitor[kConnection] = undefined;            failureHandler(err);            return;        }        if (conn) {            // Tell the connection that we are using the streaming protocol so that the            // connection's message stream will only read the last hello on the buffer.            conn.isMonitoringConnection = true;            if (isInCloseState(monitor)) {                conn.destroy({ force: true });                return;            }            monitor[kConnection] = conn;            monitor.emit(server_1.Server.SERVER_HEARTBEAT_SUCCEEDED, new events_1.ServerHeartbeatSucceededEvent(monitor.address, (0, utils_1.calculateDurationInMs)(start), conn.hello, false));            callback(undefined, conn.hello);        }    });}function monitorServer(monitor) {    return (callback) => {        if (monitor.s.state === STATE_MONITORING) {            process.nextTick(callback);            return;        }        stateTransition(monitor, STATE_MONITORING);        function done() {            if (!isInCloseState(monitor)) {                stateTransition(monitor, STATE_IDLE);            }            callback();        }        checkServer(monitor, (err, hello) => {            if (err) {                // otherwise an error occurred on initial discovery, also bail                if (monitor[kServer].description.type === common_1.ServerType.Unknown) {                    return done();                }            }            // if the check indicates streaming is supported, immediately reschedule monitoring            if (hello && hello.topologyVersion) {                (0, timers_1.setTimeout)(() => {                    if (!isInCloseState(monitor)) {                        monitor[kMonitorId]?.wake();                    }                }, 0);            }            done();        });    };}function makeTopologyVersion(tv) {    return {        processId: tv.processId,        // tests mock counter as just number, but in a real situation counter should always be a Long        // TODO(NODE-2674): Preserve int64 sent from MongoDB        counter: bson_1.Long.isLong(tv.counter) ? tv.counter : bson_1.Long.fromNumber(tv.counter)    };}/** @internal */class RTTPinger {    constructor(cancellationToken, options) {        this[kConnection] = undefined;        this[kCancellationToken] = cancellationToken;        this[kRoundTripTime] = 0;        this.closed = false;        const heartbeatFrequencyMS = options.heartbeatFrequencyMS;        this[kMonitorId] = (0, timers_1.setTimeout)(() => measureRoundTripTime(this, options), heartbeatFrequencyMS);    }    get roundTripTime() {        return this[kRoundTripTime];    }    close() {        this.closed = true;        (0, timers_1.clearTimeout)(this[kMonitorId]);        this[kConnection]?.destroy({ force: true });        this[kConnection] = undefined;    }}exports.RTTPinger = RTTPinger;function measureRoundTripTime(rttPinger, options) {    const start = (0, utils_1.now)();    options.cancellationToken = rttPinger[kCancellationToken];    const heartbeatFrequencyMS = options.heartbeatFrequencyMS;    if (rttPinger.closed) {        return;    }    function measureAndReschedule(conn) {        if (rttPinger.closed) {            conn?.destroy({ force: true });            return;        }        if (rttPinger[kConnection] == null) {            rttPinger[kConnection] = conn;        }        rttPinger[kRoundTripTime] = (0, utils_1.calculateDurationInMs)(start);        rttPinger[kMonitorId] = (0, timers_1.setTimeout)(() => measureRoundTripTime(rttPinger, options), heartbeatFrequencyMS);    }    const connection = rttPinger[kConnection];    if (connection == null) {        (0, connect_1.connect)(options, (err, conn) => {            if (err) {                rttPinger[kConnection] = undefined;                rttPinger[kRoundTripTime] = 0;                return;            }            measureAndReschedule(conn);        });        return;    }    connection.command((0, utils_1.ns)('admin.$cmd'), { [constants_1.LEGACY_HELLO_COMMAND]: 1 }, undefined, err => {        if (err) {            rttPinger[kConnection] = undefined;            rttPinger[kRoundTripTime] = 0;            return;        }        measureAndReschedule();    });}/** * @internal */class MonitorInterval {    constructor(fn, options = {}) {        this.isExpeditedCallToFnScheduled = false;        this.stopped = false;        this.isExecutionInProgress = false;        this.hasExecutedOnce = false;        this._executeAndReschedule = () => {            if (this.stopped)                return;            if (this.timerId) {                (0, timers_1.clearTimeout)(this.timerId);            }            this.isExpeditedCallToFnScheduled = false;            this.isExecutionInProgress = true;            this.fn(() => {                this.lastExecutionEnded = (0, utils_1.now)();                this.isExecutionInProgress = false;                this._reschedule(this.heartbeatFrequencyMS);            });        };        this.fn = fn;        this.lastExecutionEnded = -Infinity;        this.heartbeatFrequencyMS = options.heartbeatFrequencyMS ?? 1000;        this.minHeartbeatFrequencyMS = options.minHeartbeatFrequencyMS ?? 500;        if (options.immediate) {            this._executeAndReschedule();        }        else {            this._reschedule(undefined);        }    }    wake() {        const currentTime = (0, utils_1.now)();        const timeSinceLastCall = currentTime - this.lastExecutionEnded;        // TODO(NODE-4674): Add error handling and logging to the monitor        if (timeSinceLastCall < 0) {            return this._executeAndReschedule();        }        if (this.isExecutionInProgress) {            return;        }        // debounce multiple calls to wake within the `minInterval`        if (this.isExpeditedCallToFnScheduled) {            return;        }        // reschedule a call as soon as possible, ensuring the call never happens        // faster than the `minInterval`        if (timeSinceLastCall < this.minHeartbeatFrequencyMS) {            this.isExpeditedCallToFnScheduled = true;            this._reschedule(this.minHeartbeatFrequencyMS - timeSinceLastCall);            return;        }        this._executeAndReschedule();    }    stop() {        this.stopped = true;        if (this.timerId) {            (0, timers_1.clearTimeout)(this.timerId);            this.timerId = undefined;        }        this.lastExecutionEnded = -Infinity;        this.isExpeditedCallToFnScheduled = false;    }    toString() {        return JSON.stringify(this);    }    toJSON() {        const currentTime = (0, utils_1.now)();        const timeSinceLastCall = currentTime - this.lastExecutionEnded;        return {            timerId: this.timerId != null ? 'set' : 'cleared',            lastCallTime: this.lastExecutionEnded,            isExpeditedCheckScheduled: this.isExpeditedCallToFnScheduled,            stopped: this.stopped,            heartbeatFrequencyMS: this.heartbeatFrequencyMS,            minHeartbeatFrequencyMS: this.minHeartbeatFrequencyMS,            currentTime,            timeSinceLastCall        };    }    _reschedule(ms) {        if (this.stopped)            return;        if (this.timerId) {            (0, timers_1.clearTimeout)(this.timerId);        }        this.timerId = (0, timers_1.setTimeout)(this._executeAndReschedule, ms || this.heartbeatFrequencyMS);    }}exports.MonitorInterval = MonitorInterval;//# sourceMappingURL=monitor.js.map
 |