| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241 | 
							- 'use strict'
 
- const { EventEmitter } = require('events')
 
- const Result = require('./result')
 
- const utils = require('./utils')
 
- class Query extends EventEmitter {
 
-   constructor(config, values, callback) {
 
-     super()
 
-     config = utils.normalizeQueryConfig(config, values, callback)
 
-     this.text = config.text
 
-     this.values = config.values
 
-     this.rows = config.rows
 
-     this.types = config.types
 
-     this.name = config.name
 
-     this.binary = config.binary
 
-     // use unique portal name each time
 
-     this.portal = config.portal || ''
 
-     this.callback = config.callback
 
-     this._rowMode = config.rowMode
 
-     if (process.domain && config.callback) {
 
-       this.callback = process.domain.bind(config.callback)
 
-     }
 
-     this._result = new Result(this._rowMode, this.types)
 
-     // potential for multiple results
 
-     this._results = this._result
 
-     this.isPreparedStatement = false
 
-     this._canceledDueToError = false
 
-     this._promise = null
 
-   }
 
-   requiresPreparation() {
 
-     // named queries must always be prepared
 
-     if (this.name) {
 
-       return true
 
-     }
 
-     // always prepare if there are max number of rows expected per
 
-     // portal execution
 
-     if (this.rows) {
 
-       return true
 
-     }
 
-     // don't prepare empty text queries
 
-     if (!this.text) {
 
-       return false
 
-     }
 
-     // prepare if there are values
 
-     if (!this.values) {
 
-       return false
 
-     }
 
-     return this.values.length > 0
 
-   }
 
-   _checkForMultirow() {
 
-     // if we already have a result with a command property
 
-     // then we've already executed one query in a multi-statement simple query
 
-     // turn our results into an array of results
 
-     if (this._result.command) {
 
-       if (!Array.isArray(this._results)) {
 
-         this._results = [this._result]
 
-       }
 
-       this._result = new Result(this._rowMode, this.types)
 
-       this._results.push(this._result)
 
-     }
 
-   }
 
-   // associates row metadata from the supplied
 
-   // message with this query object
 
-   // metadata used when parsing row results
 
-   handleRowDescription(msg) {
 
-     this._checkForMultirow()
 
-     this._result.addFields(msg.fields)
 
-     this._accumulateRows = this.callback || !this.listeners('row').length
 
-   }
 
-   handleDataRow(msg) {
 
-     let row
 
-     if (this._canceledDueToError) {
 
-       return
 
-     }
 
-     try {
 
-       row = this._result.parseRow(msg.fields)
 
-     } catch (err) {
 
-       this._canceledDueToError = err
 
-       return
 
-     }
 
-     this.emit('row', row, this._result)
 
-     if (this._accumulateRows) {
 
-       this._result.addRow(row)
 
-     }
 
-   }
 
-   handleCommandComplete(msg, connection) {
 
-     this._checkForMultirow()
 
-     this._result.addCommandComplete(msg)
 
-     // need to sync after each command complete of a prepared statement
 
-     // if we were using a row count which results in multiple calls to _getRows
 
-     if (this.rows) {
 
-       connection.sync()
 
-     }
 
-   }
 
-   // if a named prepared statement is created with empty query text
 
-   // the backend will send an emptyQuery message but *not* a command complete message
 
-   // since we pipeline sync immediately after execute we don't need to do anything here
 
-   // unless we have rows specified, in which case we did not pipeline the intial sync call
 
-   handleEmptyQuery(connection) {
 
-     if (this.rows) {
 
-       connection.sync()
 
-     }
 
-   }
 
-   handleError(err, connection) {
 
-     // need to sync after error during a prepared statement
 
-     if (this._canceledDueToError) {
 
-       err = this._canceledDueToError
 
-       this._canceledDueToError = false
 
-     }
 
-     // if callback supplied do not emit error event as uncaught error
 
-     // events will bubble up to node process
 
-     if (this.callback) {
 
-       return this.callback(err)
 
-     }
 
-     this.emit('error', err)
 
-   }
 
-   handleReadyForQuery(con) {
 
-     if (this._canceledDueToError) {
 
-       return this.handleError(this._canceledDueToError, con)
 
-     }
 
-     if (this.callback) {
 
-       try {
 
-         this.callback(null, this._results)
 
-       }
 
-       catch(err) {
 
-         process.nextTick(() => {
 
-           throw err
 
-         })
 
-       }
 
-     }
 
-     this.emit('end', this._results)
 
-   }
 
-   submit(connection) {
 
-     if (typeof this.text !== 'string' && typeof this.name !== 'string') {
 
-       return new Error('A query must have either text or a name. Supplying neither is unsupported.')
 
-     }
 
-     const previous = connection.parsedStatements[this.name]
 
-     if (this.text && previous && this.text !== previous) {
 
-       return new Error(`Prepared statements must be unique - '${this.name}' was used for a different statement`)
 
-     }
 
-     if (this.values && !Array.isArray(this.values)) {
 
-       return new Error('Query values must be an array')
 
-     }
 
-     if (this.requiresPreparation()) {
 
-       this.prepare(connection)
 
-     } else {
 
-       connection.query(this.text)
 
-     }
 
-     return null
 
-   }
 
-   hasBeenParsed(connection) {
 
-     return this.name && connection.parsedStatements[this.name]
 
-   }
 
-   handlePortalSuspended(connection) {
 
-     this._getRows(connection, this.rows)
 
-   }
 
-   _getRows(connection, rows) {
 
-     connection.execute({
 
-       portal: this.portal,
 
-       rows: rows,
 
-     })
 
-     // if we're not reading pages of rows send the sync command
 
-     // to indicate the pipeline is finished
 
-     if (!rows) {
 
-       connection.sync()
 
-     } else {
 
-       // otherwise flush the call out to read more rows
 
-       connection.flush()
 
-     }
 
-   }
 
-   // http://developer.postgresql.org/pgdocs/postgres/protocol-flow.html#PROTOCOL-FLOW-EXT-QUERY
 
-   prepare(connection) {
 
-     // prepared statements need sync to be called after each command
 
-     // complete or when an error is encountered
 
-     this.isPreparedStatement = true
 
-     // TODO refactor this poor encapsulation
 
-     if (!this.hasBeenParsed(connection)) {
 
-       connection.parse({
 
-         text: this.text,
 
-         name: this.name,
 
-         types: this.types,
 
-       })
 
-     }
 
-     // because we're mapping user supplied values to
 
-     // postgres wire protocol compatible values it could
 
-     // throw an exception, so try/catch this section
 
-     try {
 
-       connection.bind({
 
-         portal: this.portal,
 
-         statement: this.name,
 
-         values: this.values,
 
-         binary: this.binary,
 
-         valueMapper: utils.prepareValue,
 
-       })
 
-     } catch (err) {
 
-       this.handleError(err, connection)
 
-       return
 
-     }
 
-     connection.describe({
 
-       type: 'P',
 
-       name: this.portal || '',
 
-     })
 
-     this._getRows(connection, this.rows)
 
-   }
 
-   handleCopyInResponse(connection) {
 
-     connection.sendCopyFail('No source stream defined')
 
-   }
 
-   // eslint-disable-next-line no-unused-vars
 
-   handleCopyData(msg, connection) {
 
-     // noop
 
-   }
 
- }
 
- module.exports = Query
 
 
  |