| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226 | "use strict";Object.defineProperty(exports, "__esModule", { value: true });exports.readPreferenceServerSelector = exports.secondaryWritableServerSelector = exports.sameServerSelector = exports.writableServerSelector = exports.MIN_SECONDARY_WRITE_WIRE_VERSION = void 0;const error_1 = require("../error");const read_preference_1 = require("../read_preference");const common_1 = require("./common");// max staleness constantsconst IDLE_WRITE_PERIOD = 10000;const SMALLEST_MAX_STALENESS_SECONDS = 90;//  Minimum version to try writes on secondaries.exports.MIN_SECONDARY_WRITE_WIRE_VERSION = 13;/** * Returns a server selector that selects for writable servers */function writableServerSelector() {    return (topologyDescription, servers) => latencyWindowReducer(topologyDescription, servers.filter((s) => s.isWritable));}exports.writableServerSelector = writableServerSelector;/** * The purpose of this selector is to select the same server, only * if it is in a state that it can have commands sent to it. */function sameServerSelector(description) {    return (topologyDescription, servers) => {        if (!description)            return [];        // Filter the servers to match the provided description only if        // the type is not unknown.        return servers.filter(sd => {            return sd.address === description.address && sd.type !== common_1.ServerType.Unknown;        });    };}exports.sameServerSelector = sameServerSelector;/** * Returns a server selector that uses a read preference to select a * server potentially for a write on a secondary. */function secondaryWritableServerSelector(wireVersion, readPreference) {    // If server version < 5.0, read preference always primary.    // If server version >= 5.0...    // - If read preference is supplied, use that.    // - If no read preference is supplied, use primary.    if (!readPreference ||        !wireVersion ||        (wireVersion && wireVersion < exports.MIN_SECONDARY_WRITE_WIRE_VERSION)) {        return readPreferenceServerSelector(read_preference_1.ReadPreference.primary);    }    return readPreferenceServerSelector(readPreference);}exports.secondaryWritableServerSelector = secondaryWritableServerSelector;/** * Reduces the passed in array of servers by the rules of the "Max Staleness" specification * found here: https://github.com/mongodb/specifications/blob/master/source/max-staleness/max-staleness.rst * * @param readPreference - The read preference providing max staleness guidance * @param topologyDescription - The topology description * @param servers - The list of server descriptions to be reduced * @returns The list of servers that satisfy the requirements of max staleness */function maxStalenessReducer(readPreference, topologyDescription, servers) {    if (readPreference.maxStalenessSeconds == null || readPreference.maxStalenessSeconds < 0) {        return servers;    }    const maxStaleness = readPreference.maxStalenessSeconds;    const maxStalenessVariance = (topologyDescription.heartbeatFrequencyMS + IDLE_WRITE_PERIOD) / 1000;    if (maxStaleness < maxStalenessVariance) {        throw new error_1.MongoInvalidArgumentError(`Option "maxStalenessSeconds" must be at least ${maxStalenessVariance} seconds`);    }    if (maxStaleness < SMALLEST_MAX_STALENESS_SECONDS) {        throw new error_1.MongoInvalidArgumentError(`Option "maxStalenessSeconds" must be at least ${SMALLEST_MAX_STALENESS_SECONDS} seconds`);    }    if (topologyDescription.type === common_1.TopologyType.ReplicaSetWithPrimary) {        const primary = Array.from(topologyDescription.servers.values()).filter(primaryFilter)[0];        return servers.reduce((result, server) => {            const stalenessMS = server.lastUpdateTime -                server.lastWriteDate -                (primary.lastUpdateTime - primary.lastWriteDate) +                topologyDescription.heartbeatFrequencyMS;            const staleness = stalenessMS / 1000;            const maxStalenessSeconds = readPreference.maxStalenessSeconds ?? 0;            if (staleness <= maxStalenessSeconds) {                result.push(server);            }            return result;        }, []);    }    if (topologyDescription.type === common_1.TopologyType.ReplicaSetNoPrimary) {        if (servers.length === 0) {            return servers;        }        const sMax = servers.reduce((max, s) => s.lastWriteDate > max.lastWriteDate ? s : max);        return servers.reduce((result, server) => {            const stalenessMS = sMax.lastWriteDate - server.lastWriteDate + topologyDescription.heartbeatFrequencyMS;            const staleness = stalenessMS / 1000;            const maxStalenessSeconds = readPreference.maxStalenessSeconds ?? 0;            if (staleness <= maxStalenessSeconds) {                result.push(server);            }            return result;        }, []);    }    return servers;}/** * Determines whether a server's tags match a given set of tags * * @param tagSet - The requested tag set to match * @param serverTags - The server's tags */function tagSetMatch(tagSet, serverTags) {    const keys = Object.keys(tagSet);    const serverTagKeys = Object.keys(serverTags);    for (let i = 0; i < keys.length; ++i) {        const key = keys[i];        if (serverTagKeys.indexOf(key) === -1 || serverTags[key] !== tagSet[key]) {            return false;        }    }    return true;}/** * Reduces a set of server descriptions based on tags requested by the read preference * * @param readPreference - The read preference providing the requested tags * @param servers - The list of server descriptions to reduce * @returns The list of servers matching the requested tags */function tagSetReducer(readPreference, servers) {    if (readPreference.tags == null ||        (Array.isArray(readPreference.tags) && readPreference.tags.length === 0)) {        return servers;    }    for (let i = 0; i < readPreference.tags.length; ++i) {        const tagSet = readPreference.tags[i];        const serversMatchingTagset = servers.reduce((matched, server) => {            if (tagSetMatch(tagSet, server.tags))                matched.push(server);            return matched;        }, []);        if (serversMatchingTagset.length) {            return serversMatchingTagset;        }    }    return [];}/** * Reduces a list of servers to ensure they fall within an acceptable latency window. This is * further specified in the "Server Selection" specification, found here: * https://github.com/mongodb/specifications/blob/master/source/server-selection/server-selection.rst * * @param topologyDescription - The topology description * @param servers - The list of servers to reduce * @returns The servers which fall within an acceptable latency window */function latencyWindowReducer(topologyDescription, servers) {    const low = servers.reduce((min, server) => min === -1 ? server.roundTripTime : Math.min(server.roundTripTime, min), -1);    const high = low + topologyDescription.localThresholdMS;    return servers.reduce((result, server) => {        if (server.roundTripTime <= high && server.roundTripTime >= low)            result.push(server);        return result;    }, []);}// filtersfunction primaryFilter(server) {    return server.type === common_1.ServerType.RSPrimary;}function secondaryFilter(server) {    return server.type === common_1.ServerType.RSSecondary;}function nearestFilter(server) {    return server.type === common_1.ServerType.RSSecondary || server.type === common_1.ServerType.RSPrimary;}function knownFilter(server) {    return server.type !== common_1.ServerType.Unknown;}function loadBalancerFilter(server) {    return server.type === common_1.ServerType.LoadBalancer;}/** * Returns a function which selects servers based on a provided read preference * * @param readPreference - The read preference to select with */function readPreferenceServerSelector(readPreference) {    if (!readPreference.isValid()) {        throw new error_1.MongoInvalidArgumentError('Invalid read preference specified');    }    return (topologyDescription, servers) => {        const commonWireVersion = topologyDescription.commonWireVersion;        if (commonWireVersion &&            readPreference.minWireVersion &&            readPreference.minWireVersion > commonWireVersion) {            throw new error_1.MongoCompatibilityError(`Minimum wire version '${readPreference.minWireVersion}' required, but found '${commonWireVersion}'`);        }        if (topologyDescription.type === common_1.TopologyType.LoadBalanced) {            return servers.filter(loadBalancerFilter);        }        if (topologyDescription.type === common_1.TopologyType.Unknown) {            return [];        }        if (topologyDescription.type === common_1.TopologyType.Single ||            topologyDescription.type === common_1.TopologyType.Sharded) {            return latencyWindowReducer(topologyDescription, servers.filter(knownFilter));        }        const mode = readPreference.mode;        if (mode === read_preference_1.ReadPreference.PRIMARY) {            return servers.filter(primaryFilter);        }        if (mode === read_preference_1.ReadPreference.PRIMARY_PREFERRED) {            const result = servers.filter(primaryFilter);            if (result.length) {                return result;            }        }        const filter = mode === read_preference_1.ReadPreference.NEAREST ? nearestFilter : secondaryFilter;        const selectedServers = latencyWindowReducer(topologyDescription, tagSetReducer(readPreference, maxStalenessReducer(readPreference, topologyDescription, servers.filter(filter))));        if (mode === read_preference_1.ReadPreference.SECONDARY_PREFERRED && selectedServers.length === 0) {            return servers.filter(primaryFilter);        }        return selectedServers;    };}exports.readPreferenceServerSelector = readPreferenceServerSelector;//# sourceMappingURL=server_selection.js.map
 |