| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307 | 'use strict'// eslint-disable-next-linevar Nativetry {  // Wrap this `require()` in a try-catch to avoid upstream bundlers from complaining that this might not be available since it is an optional import  Native = require('pg-native')} catch (e) {  throw e}var TypeOverrides = require('../type-overrides')var EventEmitter = require('events').EventEmittervar util = require('util')var ConnectionParameters = require('../connection-parameters')var NativeQuery = require('./query')var Client = (module.exports = function (config) {  EventEmitter.call(this)  config = config || {}  this._Promise = config.Promise || global.Promise  this._types = new TypeOverrides(config.types)  this.native = new Native({    types: this._types,  })  this._queryQueue = []  this._ending = false  this._connecting = false  this._connected = false  this._queryable = true  // keep these on the object for legacy reasons  // for the time being. TODO: deprecate all this jazz  var cp = (this.connectionParameters = new ConnectionParameters(config))  if (config.nativeConnectionString) cp.nativeConnectionString = config.nativeConnectionString  this.user = cp.user  // "hiding" the password so it doesn't show up in stack traces  // or if the client is console.logged  Object.defineProperty(this, 'password', {    configurable: true,    enumerable: false,    writable: true,    value: cp.password,  })  this.database = cp.database  this.host = cp.host  this.port = cp.port  // a hash to hold named queries  this.namedQueries = {}})Client.Query = NativeQueryutil.inherits(Client, EventEmitter)Client.prototype._errorAllQueries = function (err) {  const enqueueError = (query) => {    process.nextTick(() => {      query.native = this.native      query.handleError(err)    })  }  if (this._hasActiveQuery()) {    enqueueError(this._activeQuery)    this._activeQuery = null  }  this._queryQueue.forEach(enqueueError)  this._queryQueue.length = 0}// connect to the backend// pass an optional callback to be called once connected// or with an error if there was a connection errorClient.prototype._connect = function (cb) {  var self = this  if (this._connecting) {    process.nextTick(() => cb(new Error('Client has already been connected. You cannot reuse a client.')))    return  }  this._connecting = true  this.connectionParameters.getLibpqConnectionString(function (err, conString) {    if (self.connectionParameters.nativeConnectionString) conString = self.connectionParameters.nativeConnectionString    if (err) return cb(err)    self.native.connect(conString, function (err) {      if (err) {        self.native.end()        return cb(err)      }      // set internal states to connected      self._connected = true      // handle connection errors from the native layer      self.native.on('error', function (err) {        self._queryable = false        self._errorAllQueries(err)        self.emit('error', err)      })      self.native.on('notification', function (msg) {        self.emit('notification', {          channel: msg.relname,          payload: msg.extra,        })      })      // signal we are connected now      self.emit('connect')      self._pulseQueryQueue(true)      cb()    })  })}Client.prototype.connect = function (callback) {  if (callback) {    this._connect(callback)    return  }  return new this._Promise((resolve, reject) => {    this._connect((error) => {      if (error) {        reject(error)      } else {        resolve()      }    })  })}// send a query to the server// this method is highly overloaded to take// 1) string query, optional array of parameters, optional function callback// 2) object query with {//    string query//    optional array values,//    optional function callback instead of as a separate parameter//    optional string name to name & cache the query plan//    optional string rowMode = 'array' for an array of results//  }Client.prototype.query = function (config, values, callback) {  var query  var result  var readTimeout  var readTimeoutTimer  var queryCallback  if (config === null || config === undefined) {    throw new TypeError('Client was passed a null or undefined query')  } else if (typeof config.submit === 'function') {    readTimeout = config.query_timeout || this.connectionParameters.query_timeout    result = query = config    // accept query(new Query(...), (err, res) => { }) style    if (typeof values === 'function') {      config.callback = values    }  } else {    readTimeout = this.connectionParameters.query_timeout    query = new NativeQuery(config, values, callback)    if (!query.callback) {      let resolveOut, rejectOut      result = new this._Promise((resolve, reject) => {        resolveOut = resolve        rejectOut = reject      }).catch(err => {        Error.captureStackTrace(err);        throw err;      })      query.callback = (err, res) => (err ? rejectOut(err) : resolveOut(res))    }  }  if (readTimeout) {    queryCallback = query.callback    readTimeoutTimer = setTimeout(() => {      var error = new Error('Query read timeout')      process.nextTick(() => {        query.handleError(error, this.connection)      })      queryCallback(error)      // we already returned an error,      // just do nothing if query completes      query.callback = () => {}      // Remove from queue      var index = this._queryQueue.indexOf(query)      if (index > -1) {        this._queryQueue.splice(index, 1)      }      this._pulseQueryQueue()    }, readTimeout)    query.callback = (err, res) => {      clearTimeout(readTimeoutTimer)      queryCallback(err, res)    }  }  if (!this._queryable) {    query.native = this.native    process.nextTick(() => {      query.handleError(new Error('Client has encountered a connection error and is not queryable'))    })    return result  }  if (this._ending) {    query.native = this.native    process.nextTick(() => {      query.handleError(new Error('Client was closed and is not queryable'))    })    return result  }  this._queryQueue.push(query)  this._pulseQueryQueue()  return result}// disconnect from the backend serverClient.prototype.end = function (cb) {  var self = this  this._ending = true  if (!this._connected) {    this.once('connect', this.end.bind(this, cb))  }  var result  if (!cb) {    result = new this._Promise(function (resolve, reject) {      cb = (err) => (err ? reject(err) : resolve())    })  }  this.native.end(function () {    self._errorAllQueries(new Error('Connection terminated'))    process.nextTick(() => {      self.emit('end')      if (cb) cb()    })  })  return result}Client.prototype._hasActiveQuery = function () {  return this._activeQuery && this._activeQuery.state !== 'error' && this._activeQuery.state !== 'end'}Client.prototype._pulseQueryQueue = function (initialConnection) {  if (!this._connected) {    return  }  if (this._hasActiveQuery()) {    return  }  var query = this._queryQueue.shift()  if (!query) {    if (!initialConnection) {      this.emit('drain')    }    return  }  this._activeQuery = query  query.submit(this)  var self = this  query.once('_done', function () {    self._pulseQueryQueue()  })}// attempt to cancel an in-progress queryClient.prototype.cancel = function (query) {  if (this._activeQuery === query) {    this.native.cancel(function () {})  } else if (this._queryQueue.indexOf(query) !== -1) {    this._queryQueue.splice(this._queryQueue.indexOf(query), 1)  }}Client.prototype.ref = function () {}Client.prototype.unref = function () {}Client.prototype.setTypeParser = function (oid, format, parseFn) {  return this._types.setTypeParser(oid, format, parseFn)}Client.prototype.getTypeParser = function (oid, format) {  return this._types.getTypeParser(oid, format)}
 |