| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241 | 'use strict'const { EventEmitter } = require('events')const Result = require('./result')const utils = require('./utils')class Query extends EventEmitter {  constructor(config, values, callback) {    super()    config = utils.normalizeQueryConfig(config, values, callback)    this.text = config.text    this.values = config.values    this.rows = config.rows    this.types = config.types    this.name = config.name    this.binary = config.binary    // use unique portal name each time    this.portal = config.portal || ''    this.callback = config.callback    this._rowMode = config.rowMode    if (process.domain && config.callback) {      this.callback = process.domain.bind(config.callback)    }    this._result = new Result(this._rowMode, this.types)    // potential for multiple results    this._results = this._result    this.isPreparedStatement = false    this._canceledDueToError = false    this._promise = null  }  requiresPreparation() {    // named queries must always be prepared    if (this.name) {      return true    }    // always prepare if there are max number of rows expected per    // portal execution    if (this.rows) {      return true    }    // don't prepare empty text queries    if (!this.text) {      return false    }    // prepare if there are values    if (!this.values) {      return false    }    return this.values.length > 0  }  _checkForMultirow() {    // if we already have a result with a command property    // then we've already executed one query in a multi-statement simple query    // turn our results into an array of results    if (this._result.command) {      if (!Array.isArray(this._results)) {        this._results = [this._result]      }      this._result = new Result(this._rowMode, this.types)      this._results.push(this._result)    }  }  // associates row metadata from the supplied  // message with this query object  // metadata used when parsing row results  handleRowDescription(msg) {    this._checkForMultirow()    this._result.addFields(msg.fields)    this._accumulateRows = this.callback || !this.listeners('row').length  }  handleDataRow(msg) {    let row    if (this._canceledDueToError) {      return    }    try {      row = this._result.parseRow(msg.fields)    } catch (err) {      this._canceledDueToError = err      return    }    this.emit('row', row, this._result)    if (this._accumulateRows) {      this._result.addRow(row)    }  }  handleCommandComplete(msg, connection) {    this._checkForMultirow()    this._result.addCommandComplete(msg)    // need to sync after each command complete of a prepared statement    // if we were using a row count which results in multiple calls to _getRows    if (this.rows) {      connection.sync()    }  }  // if a named prepared statement is created with empty query text  // the backend will send an emptyQuery message but *not* a command complete message  // since we pipeline sync immediately after execute we don't need to do anything here  // unless we have rows specified, in which case we did not pipeline the intial sync call  handleEmptyQuery(connection) {    if (this.rows) {      connection.sync()    }  }  handleError(err, connection) {    // need to sync after error during a prepared statement    if (this._canceledDueToError) {      err = this._canceledDueToError      this._canceledDueToError = false    }    // if callback supplied do not emit error event as uncaught error    // events will bubble up to node process    if (this.callback) {      return this.callback(err)    }    this.emit('error', err)  }  handleReadyForQuery(con) {    if (this._canceledDueToError) {      return this.handleError(this._canceledDueToError, con)    }    if (this.callback) {      try {        this.callback(null, this._results)      }      catch(err) {        process.nextTick(() => {          throw err        })      }    }    this.emit('end', this._results)  }  submit(connection) {    if (typeof this.text !== 'string' && typeof this.name !== 'string') {      return new Error('A query must have either text or a name. Supplying neither is unsupported.')    }    const previous = connection.parsedStatements[this.name]    if (this.text && previous && this.text !== previous) {      return new Error(`Prepared statements must be unique - '${this.name}' was used for a different statement`)    }    if (this.values && !Array.isArray(this.values)) {      return new Error('Query values must be an array')    }    if (this.requiresPreparation()) {      this.prepare(connection)    } else {      connection.query(this.text)    }    return null  }  hasBeenParsed(connection) {    return this.name && connection.parsedStatements[this.name]  }  handlePortalSuspended(connection) {    this._getRows(connection, this.rows)  }  _getRows(connection, rows) {    connection.execute({      portal: this.portal,      rows: rows,    })    // if we're not reading pages of rows send the sync command    // to indicate the pipeline is finished    if (!rows) {      connection.sync()    } else {      // otherwise flush the call out to read more rows      connection.flush()    }  }  // http://developer.postgresql.org/pgdocs/postgres/protocol-flow.html#PROTOCOL-FLOW-EXT-QUERY  prepare(connection) {    // prepared statements need sync to be called after each command    // complete or when an error is encountered    this.isPreparedStatement = true    // TODO refactor this poor encapsulation    if (!this.hasBeenParsed(connection)) {      connection.parse({        text: this.text,        name: this.name,        types: this.types,      })    }    // because we're mapping user supplied values to    // postgres wire protocol compatible values it could    // throw an exception, so try/catch this section    try {      connection.bind({        portal: this.portal,        statement: this.name,        values: this.values,        binary: this.binary,        valueMapper: utils.prepareValue,      })    } catch (err) {      this.handleError(err, connection)      return    }    connection.describe({      type: 'P',      name: this.portal || '',    })    this._getRows(connection, this.rows)  }  handleCopyInResponse(connection) {    connection.sendCopyFail('No source stream defined')  }  // eslint-disable-next-line no-unused-vars  handleCopyData(msg, connection) {    // noop  }}module.exports = Query
 |