| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127 | /* * Copyright (c) 2015-present, Vitaly Tomilov * * See the LICENSE file at the top-level directory of this distribution * for licensing information. * * Removal or modification of this copyright notice is prohibited. */const {Events} = require('./events');const npm = {    utils: require('./utils'),    text: require('./text')};////////////////////////////////////////////// Streams query data into any destination,// with the help of pg-query-stream library.function $stream(ctx, qs, initCB, config) {    const $p = config.promise;    // istanbul ignore next:    // we do not provide code coverage for the Native Bindings specifics    if (ctx.options.pgNative) {        return $p.reject(new Error(npm.text.nativeStreaming));    }    // Stream class was renamed again, see the following issue:    // https://github.com/brianc/node-postgres/issues/2412    if (!qs || !qs.constructor || qs.constructor.name !== 'QueryStream') {        // invalid or missing stream object;        return $p.reject(new TypeError(npm.text.invalidStream));    }    if (qs._reading || qs._closed) {        // stream object is in the wrong state;        return $p.reject(new Error(npm.text.invalidStreamState));    }    if (typeof initCB !== 'function') {        // parameter `initCB` must be passed as the initialization callback;        return $p.reject(new TypeError(npm.text.invalidStreamCB));    }    let error = Events.query(ctx.options, getContext());    if (error) {        error = getError(error);        Events.error(ctx.options, error, getContext());        return $p.reject(error);    }    const stream = ctx.db.client.query(qs);    stream.on('data', onData);    stream.on('error', onError);    stream.on('end', onEnd);    try {        initCB.call(this, stream); // the stream must be initialized during the call;    } catch (e) {        release();        error = getError(e);        Events.error(ctx.options, error, getContext());        return $p.reject(error);    }    const start = Date.now();    let resolve, reject, nRows = 0;    function onData(data) {        nRows++;        error = Events.receive(ctx.options, [data], undefined, getContext());        if (error) {            onError(error);        }    }    function onError(e) {        release();        stream.destroy();        e = getError(e);        Events.error(ctx.options, e, getContext());        reject(e);    }    function onEnd() {        release();        resolve({            processed: nRows, // total number of rows processed;            duration: Date.now() - start // duration, in milliseconds;        });    }    function release() {        stream.removeListener('data', onData);        stream.removeListener('error', onError);        stream.removeListener('end', onEnd);    }    function getError(e) {        return e instanceof npm.utils.InternalError ? e.error : e;    }    function getContext() {        let client;        if (ctx.db) {            client = ctx.db.client;        } else {            error = new Error(npm.text.looseQuery);        }        return {            client,            dc: ctx.dc,            query: qs.cursor.text,            params: qs.cursor.values,            ctx: ctx.ctx        };    }    return $p((res, rej) => {        resolve = res;        reject = rej;    });}module.exports = $stream;
 |