| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632 | "use strict";Object.defineProperty(exports, "__esModule", { value: true });exports.ConnectionPool = exports.PoolState = void 0;const timers_1 = require("timers");const constants_1 = require("../constants");const error_1 = require("../error");const mongo_types_1 = require("../mongo_types");const utils_1 = require("../utils");const connect_1 = require("./connect");const connection_1 = require("./connection");const connection_pool_events_1 = require("./connection_pool_events");const errors_1 = require("./errors");const metrics_1 = require("./metrics");/** @internal */const kServer = Symbol('server');/** @internal */const kConnections = Symbol('connections');/** @internal */const kPending = Symbol('pending');/** @internal */const kCheckedOut = Symbol('checkedOut');/** @internal */const kMinPoolSizeTimer = Symbol('minPoolSizeTimer');/** @internal */const kGeneration = Symbol('generation');/** @internal */const kServiceGenerations = Symbol('serviceGenerations');/** @internal */const kConnectionCounter = Symbol('connectionCounter');/** @internal */const kCancellationToken = Symbol('cancellationToken');/** @internal */const kWaitQueue = Symbol('waitQueue');/** @internal */const kCancelled = Symbol('cancelled');/** @internal */const kMetrics = Symbol('metrics');/** @internal */const kProcessingWaitQueue = Symbol('processingWaitQueue');/** @internal */const kPoolState = Symbol('poolState');/** @internal */exports.PoolState = Object.freeze({    paused: 'paused',    ready: 'ready',    closed: 'closed'});/** * A pool of connections which dynamically resizes, and emit events related to pool activity * @internal */class ConnectionPool extends mongo_types_1.TypedEventEmitter {    constructor(server, options) {        super();        this.options = Object.freeze({            ...options,            connectionType: connection_1.Connection,            maxPoolSize: options.maxPoolSize ?? 100,            minPoolSize: options.minPoolSize ?? 0,            maxConnecting: options.maxConnecting ?? 2,            maxIdleTimeMS: options.maxIdleTimeMS ?? 0,            waitQueueTimeoutMS: options.waitQueueTimeoutMS ?? 0,            minPoolSizeCheckFrequencyMS: options.minPoolSizeCheckFrequencyMS ?? 100,            autoEncrypter: options.autoEncrypter,            metadata: options.metadata        });        if (this.options.minPoolSize > this.options.maxPoolSize) {            throw new error_1.MongoInvalidArgumentError('Connection pool minimum size must not be greater than maximum pool size');        }        this[kPoolState] = exports.PoolState.paused;        this[kServer] = server;        this[kConnections] = new utils_1.List();        this[kPending] = 0;        this[kCheckedOut] = new Set();        this[kMinPoolSizeTimer] = undefined;        this[kGeneration] = 0;        this[kServiceGenerations] = new Map();        this[kConnectionCounter] = (0, utils_1.makeCounter)(1);        this[kCancellationToken] = new mongo_types_1.CancellationToken();        this[kCancellationToken].setMaxListeners(Infinity);        this[kWaitQueue] = new utils_1.List();        this[kMetrics] = new metrics_1.ConnectionPoolMetrics();        this[kProcessingWaitQueue] = false;        this.mongoLogger = this[kServer].topology.client.mongoLogger;        this.component = 'connection';        process.nextTick(() => {            this.emitAndLog(ConnectionPool.CONNECTION_POOL_CREATED, new connection_pool_events_1.ConnectionPoolCreatedEvent(this));        });    }    /** The address of the endpoint the pool is connected to */    get address() {        return this.options.hostAddress.toString();    }    /**     * Check if the pool has been closed     *     * TODO(NODE-3263): We can remove this property once shell no longer needs it     */    get closed() {        return this[kPoolState] === exports.PoolState.closed;    }    /** An integer representing the SDAM generation of the pool */    get generation() {        return this[kGeneration];    }    /** An integer expressing how many total connections (available + pending + in use) the pool currently has */    get totalConnectionCount() {        return (this.availableConnectionCount + this.pendingConnectionCount + this.currentCheckedOutCount);    }    /** An integer expressing how many connections are currently available in the pool. */    get availableConnectionCount() {        return this[kConnections].length;    }    get pendingConnectionCount() {        return this[kPending];    }    get currentCheckedOutCount() {        return this[kCheckedOut].size;    }    get waitQueueSize() {        return this[kWaitQueue].length;    }    get loadBalanced() {        return this.options.loadBalanced;    }    get serviceGenerations() {        return this[kServiceGenerations];    }    get serverError() {        return this[kServer].description.error;    }    /**     * This is exposed ONLY for use in mongosh, to enable     * killing all connections if a user quits the shell with     * operations in progress.     *     * This property may be removed as a part of NODE-3263.     */    get checkedOutConnections() {        return this[kCheckedOut];    }    /**     * Get the metrics information for the pool when a wait queue timeout occurs.     */    waitQueueErrorMetrics() {        return this[kMetrics].info(this.options.maxPoolSize);    }    /**     * Set the pool state to "ready"     */    ready() {        if (this[kPoolState] !== exports.PoolState.paused) {            return;        }        this[kPoolState] = exports.PoolState.ready;        this.emitAndLog(ConnectionPool.CONNECTION_POOL_READY, new connection_pool_events_1.ConnectionPoolReadyEvent(this));        (0, timers_1.clearTimeout)(this[kMinPoolSizeTimer]);        this.ensureMinPoolSize();    }    /**     * Check a connection out of this pool. The connection will continue to be tracked, but no reference to it     * will be held by the pool. This means that if a connection is checked out it MUST be checked back in or     * explicitly destroyed by the new owner.     */    checkOut(callback) {        this.emitAndLog(ConnectionPool.CONNECTION_CHECK_OUT_STARTED, new connection_pool_events_1.ConnectionCheckOutStartedEvent(this));        const waitQueueTimeoutMS = this.options.waitQueueTimeoutMS;        const waitQueueMember = {            callback,            timeoutController: new utils_1.TimeoutController(waitQueueTimeoutMS)        };        waitQueueMember.timeoutController.signal.addEventListener('abort', () => {            waitQueueMember[kCancelled] = true;            waitQueueMember.timeoutController.clear();            this.emitAndLog(ConnectionPool.CONNECTION_CHECK_OUT_FAILED, new connection_pool_events_1.ConnectionCheckOutFailedEvent(this, 'timeout'));            waitQueueMember.callback(new errors_1.WaitQueueTimeoutError(this.loadBalanced                ? this.waitQueueErrorMetrics()                : 'Timed out while checking out a connection from connection pool', this.address));        });        this[kWaitQueue].push(waitQueueMember);        process.nextTick(() => this.processWaitQueue());    }    /**     * Check a connection into the pool.     *     * @param connection - The connection to check in     */    checkIn(connection) {        if (!this[kCheckedOut].has(connection)) {            return;        }        const poolClosed = this.closed;        const stale = this.connectionIsStale(connection);        const willDestroy = !!(poolClosed || stale || connection.closed);        if (!willDestroy) {            connection.markAvailable();            this[kConnections].unshift(connection);        }        this[kCheckedOut].delete(connection);        this.emitAndLog(ConnectionPool.CONNECTION_CHECKED_IN, new connection_pool_events_1.ConnectionCheckedInEvent(this, connection));        if (willDestroy) {            const reason = connection.closed ? 'error' : poolClosed ? 'poolClosed' : 'stale';            this.destroyConnection(connection, reason);        }        process.nextTick(() => this.processWaitQueue());    }    /**     * Clear the pool     *     * Pool reset is handled by incrementing the pool's generation count. Any existing connection of a     * previous generation will eventually be pruned during subsequent checkouts.     */    clear(options = {}) {        if (this.closed) {            return;        }        // handle load balanced case        if (this.loadBalanced) {            const { serviceId } = options;            if (!serviceId) {                throw new error_1.MongoRuntimeError('ConnectionPool.clear() called in load balanced mode with no serviceId.');            }            const sid = serviceId.toHexString();            const generation = this.serviceGenerations.get(sid);            // Only need to worry if the generation exists, since it should            // always be there but typescript needs the check.            if (generation == null) {                throw new error_1.MongoRuntimeError('Service generations are required in load balancer mode.');            }            else {                // Increment the generation for the service id.                this.serviceGenerations.set(sid, generation + 1);            }            this.emitAndLog(ConnectionPool.CONNECTION_POOL_CLEARED, new connection_pool_events_1.ConnectionPoolClearedEvent(this, { serviceId }));            return;        }        // handle non load-balanced case        const interruptInUseConnections = options.interruptInUseConnections ?? false;        const oldGeneration = this[kGeneration];        this[kGeneration] += 1;        const alreadyPaused = this[kPoolState] === exports.PoolState.paused;        this[kPoolState] = exports.PoolState.paused;        this.clearMinPoolSizeTimer();        if (!alreadyPaused) {            this.emitAndLog(ConnectionPool.CONNECTION_POOL_CLEARED, new connection_pool_events_1.ConnectionPoolClearedEvent(this, {                interruptInUseConnections            }));        }        if (interruptInUseConnections) {            process.nextTick(() => this.interruptInUseConnections(oldGeneration));        }        this.processWaitQueue();    }    /**     * Closes all stale in-use connections in the pool with a resumable PoolClearedOnNetworkError.     *     * Only connections where `connection.generation <= minGeneration` are killed.     */    interruptInUseConnections(minGeneration) {        for (const connection of this[kCheckedOut]) {            if (connection.generation <= minGeneration) {                this.checkIn(connection);                connection.onError(new errors_1.PoolClearedOnNetworkError(this));            }        }    }    close(_options, _cb) {        let options = _options;        const callback = (_cb ?? _options);        if (typeof options === 'function') {            options = {};        }        options = Object.assign({ force: false }, options);        if (this.closed) {            return callback();        }        // immediately cancel any in-flight connections        this[kCancellationToken].emit('cancel');        // end the connection counter        if (typeof this[kConnectionCounter].return === 'function') {            this[kConnectionCounter].return(undefined);        }        this[kPoolState] = exports.PoolState.closed;        this.clearMinPoolSizeTimer();        this.processWaitQueue();        (0, utils_1.eachAsync)(this[kConnections].toArray(), (conn, cb) => {            this.emitAndLog(ConnectionPool.CONNECTION_CLOSED, new connection_pool_events_1.ConnectionClosedEvent(this, conn, 'poolClosed'));            conn.destroy({ force: !!options.force }, cb);        }, err => {            this[kConnections].clear();            this.emitAndLog(ConnectionPool.CONNECTION_POOL_CLOSED, new connection_pool_events_1.ConnectionPoolClosedEvent(this));            callback(err);        });    }    /**     * Runs a lambda with an implicitly checked out connection, checking that connection back in when the lambda     * has completed by calling back.     *     * NOTE: please note the required signature of `fn`     *     * @remarks When in load balancer mode, connections can be pinned to cursors or transactions.     *   In these cases we pass the connection in to this method to ensure it is used and a new     *   connection is not checked out.     *     * @param conn - A pinned connection for use in load balancing mode.     * @param fn - A function which operates on a managed connection     * @param callback - The original callback     */    withConnection(conn, fn, callback) {        if (conn) {            // use the provided connection, and do _not_ check it in after execution            fn(undefined, conn, (fnErr, result) => {                if (fnErr) {                    return this.withReauthentication(fnErr, conn, fn, callback);                }                callback(undefined, result);            });            return;        }        this.checkOut((err, conn) => {            // don't callback with `err` here, we might want to act upon it inside `fn`            fn(err, conn, (fnErr, result) => {                if (fnErr) {                    if (conn) {                        this.withReauthentication(fnErr, conn, fn, callback);                    }                    else {                        callback(fnErr);                    }                }                else {                    callback(undefined, result);                }                if (conn) {                    this.checkIn(conn);                }            });        });    }    withReauthentication(fnErr, conn, fn, callback) {        if (fnErr instanceof error_1.MongoError && fnErr.code === error_1.MONGODB_ERROR_CODES.Reauthenticate) {            this.reauthenticate(conn, fn, (error, res) => {                if (error) {                    return callback(error);                }                callback(undefined, res);            });        }        else {            callback(fnErr);        }    }    /**     * Reauthenticate on the same connection and then retry the operation.     */    reauthenticate(connection, fn, callback) {        const authContext = connection.authContext;        if (!authContext) {            return callback(new error_1.MongoRuntimeError('No auth context found on connection.'));        }        const credentials = authContext.credentials;        if (!credentials) {            return callback(new error_1.MongoMissingCredentialsError('Connection is missing credentials when asked to reauthenticate'));        }        const resolvedCredentials = credentials.resolveAuthMechanism(connection.hello || undefined);        const provider = connect_1.AUTH_PROVIDERS.get(resolvedCredentials.mechanism);        if (!provider) {            return callback(new error_1.MongoMissingCredentialsError(`Reauthenticate failed due to no auth provider for ${credentials.mechanism}`));        }        provider.reauth(authContext).then(() => {            fn(undefined, connection, (fnErr, fnResult) => {                if (fnErr) {                    return callback(fnErr);                }                callback(undefined, fnResult);            });        }, error => callback(error));    }    /** Clear the min pool size timer */    clearMinPoolSizeTimer() {        const minPoolSizeTimer = this[kMinPoolSizeTimer];        if (minPoolSizeTimer) {            (0, timers_1.clearTimeout)(minPoolSizeTimer);        }    }    destroyConnection(connection, reason) {        this.emitAndLog(ConnectionPool.CONNECTION_CLOSED, new connection_pool_events_1.ConnectionClosedEvent(this, connection, reason));        // destroy the connection        process.nextTick(() => connection.destroy({ force: false }));    }    connectionIsStale(connection) {        const serviceId = connection.serviceId;        if (this.loadBalanced && serviceId) {            const sid = serviceId.toHexString();            const generation = this.serviceGenerations.get(sid);            return connection.generation !== generation;        }        return connection.generation !== this[kGeneration];    }    connectionIsIdle(connection) {        return !!(this.options.maxIdleTimeMS && connection.idleTime > this.options.maxIdleTimeMS);    }    /**     * Destroys a connection if the connection is perished.     *     * @returns `true` if the connection was destroyed, `false` otherwise.     */    destroyConnectionIfPerished(connection) {        const isStale = this.connectionIsStale(connection);        const isIdle = this.connectionIsIdle(connection);        if (!isStale && !isIdle && !connection.closed) {            return false;        }        const reason = connection.closed ? 'error' : isStale ? 'stale' : 'idle';        this.destroyConnection(connection, reason);        return true;    }    createConnection(callback) {        const connectOptions = {            ...this.options,            id: this[kConnectionCounter].next().value,            generation: this[kGeneration],            cancellationToken: this[kCancellationToken]        };        this[kPending]++;        // This is our version of a "virtual" no-I/O connection as the spec requires        this.emitAndLog(ConnectionPool.CONNECTION_CREATED, new connection_pool_events_1.ConnectionCreatedEvent(this, { id: connectOptions.id }));        (0, connect_1.connect)(connectOptions, (err, connection) => {            if (err || !connection) {                this[kPending]--;                this.emitAndLog(ConnectionPool.CONNECTION_CLOSED, new connection_pool_events_1.ConnectionClosedEvent(this, { id: connectOptions.id, serviceId: undefined }, 'error',                 // TODO(NODE-5192): Remove this cast                err));                if (err instanceof error_1.MongoNetworkError || err instanceof error_1.MongoServerError) {                    err.connectionGeneration = connectOptions.generation;                }                callback(err ?? new error_1.MongoRuntimeError('Connection creation failed without error'));                return;            }            // The pool might have closed since we started trying to create a connection            if (this[kPoolState] !== exports.PoolState.ready) {                this[kPending]--;                connection.destroy({ force: true });                callback(this.closed ? new errors_1.PoolClosedError(this) : new errors_1.PoolClearedError(this));                return;            }            // forward all events from the connection to the pool            for (const event of [...constants_1.APM_EVENTS, connection_1.Connection.CLUSTER_TIME_RECEIVED]) {                connection.on(event, (e) => this.emit(event, e));            }            if (this.loadBalanced) {                connection.on(connection_1.Connection.PINNED, pinType => this[kMetrics].markPinned(pinType));                connection.on(connection_1.Connection.UNPINNED, pinType => this[kMetrics].markUnpinned(pinType));                const serviceId = connection.serviceId;                if (serviceId) {                    let generation;                    const sid = serviceId.toHexString();                    if ((generation = this.serviceGenerations.get(sid))) {                        connection.generation = generation;                    }                    else {                        this.serviceGenerations.set(sid, 0);                        connection.generation = 0;                    }                }            }            connection.markAvailable();            this.emitAndLog(ConnectionPool.CONNECTION_READY, new connection_pool_events_1.ConnectionReadyEvent(this, connection));            this[kPending]--;            callback(undefined, connection);            return;        });    }    ensureMinPoolSize() {        const minPoolSize = this.options.minPoolSize;        if (this[kPoolState] !== exports.PoolState.ready || minPoolSize === 0) {            return;        }        this[kConnections].prune(connection => this.destroyConnectionIfPerished(connection));        if (this.totalConnectionCount < minPoolSize &&            this.pendingConnectionCount < this.options.maxConnecting) {            // NOTE: ensureMinPoolSize should not try to get all the pending            // connection permits because that potentially delays the availability of            // the connection to a checkout request            this.createConnection((err, connection) => {                if (err) {                    this[kServer].handleError(err);                }                if (!err && connection) {                    this[kConnections].push(connection);                    process.nextTick(() => this.processWaitQueue());                }                if (this[kPoolState] === exports.PoolState.ready) {                    (0, timers_1.clearTimeout)(this[kMinPoolSizeTimer]);                    this[kMinPoolSizeTimer] = (0, timers_1.setTimeout)(() => this.ensureMinPoolSize(), this.options.minPoolSizeCheckFrequencyMS);                }            });        }        else {            (0, timers_1.clearTimeout)(this[kMinPoolSizeTimer]);            this[kMinPoolSizeTimer] = (0, timers_1.setTimeout)(() => this.ensureMinPoolSize(), this.options.minPoolSizeCheckFrequencyMS);        }    }    processWaitQueue() {        if (this[kProcessingWaitQueue]) {            return;        }        this[kProcessingWaitQueue] = true;        while (this.waitQueueSize) {            const waitQueueMember = this[kWaitQueue].first();            if (!waitQueueMember) {                this[kWaitQueue].shift();                continue;            }            if (waitQueueMember[kCancelled]) {                this[kWaitQueue].shift();                continue;            }            if (this[kPoolState] !== exports.PoolState.ready) {                const reason = this.closed ? 'poolClosed' : 'connectionError';                const error = this.closed ? new errors_1.PoolClosedError(this) : new errors_1.PoolClearedError(this);                this.emitAndLog(ConnectionPool.CONNECTION_CHECK_OUT_FAILED, new connection_pool_events_1.ConnectionCheckOutFailedEvent(this, reason, error));                waitQueueMember.timeoutController.clear();                this[kWaitQueue].shift();                waitQueueMember.callback(error);                continue;            }            if (!this.availableConnectionCount) {                break;            }            const connection = this[kConnections].shift();            if (!connection) {                break;            }            if (!this.destroyConnectionIfPerished(connection)) {                this[kCheckedOut].add(connection);                this.emitAndLog(ConnectionPool.CONNECTION_CHECKED_OUT, new connection_pool_events_1.ConnectionCheckedOutEvent(this, connection));                waitQueueMember.timeoutController.clear();                this[kWaitQueue].shift();                waitQueueMember.callback(undefined, connection);            }        }        const { maxPoolSize, maxConnecting } = this.options;        while (this.waitQueueSize > 0 &&            this.pendingConnectionCount < maxConnecting &&            (maxPoolSize === 0 || this.totalConnectionCount < maxPoolSize)) {            const waitQueueMember = this[kWaitQueue].shift();            if (!waitQueueMember || waitQueueMember[kCancelled]) {                continue;            }            this.createConnection((err, connection) => {                if (waitQueueMember[kCancelled]) {                    if (!err && connection) {                        this[kConnections].push(connection);                    }                }                else {                    if (err) {                        this.emitAndLog(ConnectionPool.CONNECTION_CHECK_OUT_FAILED,                         // TODO(NODE-5192): Remove this cast                        new connection_pool_events_1.ConnectionCheckOutFailedEvent(this, 'connectionError', err));                    }                    else if (connection) {                        this[kCheckedOut].add(connection);                        this.emitAndLog(ConnectionPool.CONNECTION_CHECKED_OUT, new connection_pool_events_1.ConnectionCheckedOutEvent(this, connection));                    }                    waitQueueMember.timeoutController.clear();                    waitQueueMember.callback(err, connection);                }                process.nextTick(() => this.processWaitQueue());            });        }        this[kProcessingWaitQueue] = false;    }}/** * Emitted when the connection pool is created. * @event */ConnectionPool.CONNECTION_POOL_CREATED = constants_1.CONNECTION_POOL_CREATED;/** * Emitted once when the connection pool is closed * @event */ConnectionPool.CONNECTION_POOL_CLOSED = constants_1.CONNECTION_POOL_CLOSED;/** * Emitted each time the connection pool is cleared and it's generation incremented * @event */ConnectionPool.CONNECTION_POOL_CLEARED = constants_1.CONNECTION_POOL_CLEARED;/** * Emitted each time the connection pool is marked ready * @event */ConnectionPool.CONNECTION_POOL_READY = constants_1.CONNECTION_POOL_READY;/** * Emitted when a connection is created. * @event */ConnectionPool.CONNECTION_CREATED = constants_1.CONNECTION_CREATED;/** * Emitted when a connection becomes established, and is ready to use * @event */ConnectionPool.CONNECTION_READY = constants_1.CONNECTION_READY;/** * Emitted when a connection is closed * @event */ConnectionPool.CONNECTION_CLOSED = constants_1.CONNECTION_CLOSED;/** * Emitted when an attempt to check out a connection begins * @event */ConnectionPool.CONNECTION_CHECK_OUT_STARTED = constants_1.CONNECTION_CHECK_OUT_STARTED;/** * Emitted when an attempt to check out a connection fails * @event */ConnectionPool.CONNECTION_CHECK_OUT_FAILED = constants_1.CONNECTION_CHECK_OUT_FAILED;/** * Emitted each time a connection is successfully checked out of the connection pool * @event */ConnectionPool.CONNECTION_CHECKED_OUT = constants_1.CONNECTION_CHECKED_OUT;/** * Emitted each time a connection is successfully checked into the connection pool * @event */ConnectionPool.CONNECTION_CHECKED_IN = constants_1.CONNECTION_CHECKED_IN;exports.ConnectionPool = ConnectionPool;//# sourceMappingURL=connection_pool.js.map
 |