| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223 | 'use strict'var net = require('net')var EventEmitter = require('events').EventEmitterconst { parse, serialize } = require('pg-protocol')const { getStream, getSecureStream } = require('./stream')const flushBuffer = serialize.flush()const syncBuffer = serialize.sync()const endBuffer = serialize.end()// TODO(bmc) support binary mode at some pointclass Connection extends EventEmitter {  constructor(config) {    super()    config = config || {}    this.stream = config.stream || getStream(config.ssl)    if (typeof this.stream === 'function') {      this.stream = this.stream(config)    }    this._keepAlive = config.keepAlive    this._keepAliveInitialDelayMillis = config.keepAliveInitialDelayMillis    this.lastBuffer = false    this.parsedStatements = {}    this.ssl = config.ssl || false    this._ending = false    this._emitMessage = false    var self = this    this.on('newListener', function (eventName) {      if (eventName === 'message') {        self._emitMessage = true      }    })  }  connect(port, host) {    var self = this    this._connecting = true    this.stream.setNoDelay(true)    this.stream.connect(port, host)    this.stream.once('connect', function () {      if (self._keepAlive) {        self.stream.setKeepAlive(true, self._keepAliveInitialDelayMillis)      }      self.emit('connect')    })    const reportStreamError = function (error) {      // errors about disconnections should be ignored during disconnect      if (self._ending && (error.code === 'ECONNRESET' || error.code === 'EPIPE')) {        return      }      self.emit('error', error)    }    this.stream.on('error', reportStreamError)    this.stream.on('close', function () {      self.emit('end')    })    if (!this.ssl) {      return this.attachListeners(this.stream)    }    this.stream.once('data', function (buffer) {      var responseCode = buffer.toString('utf8')      switch (responseCode) {        case 'S': // Server supports SSL connections, continue with a secure connection          break        case 'N': // Server does not support SSL connections          self.stream.end()          return self.emit('error', new Error('The server does not support SSL connections'))        default:          // Any other response byte, including 'E' (ErrorResponse) indicating a server error          self.stream.end()          return self.emit('error', new Error('There was an error establishing an SSL connection'))      }      const options = {        socket: self.stream,      }      if (self.ssl !== true) {        Object.assign(options, self.ssl)        if ('key' in self.ssl) {          options.key = self.ssl.key        }      }      var net = require('net')      if (net.isIP && net.isIP(host) === 0) {        options.servername = host      }      try {        self.stream = getSecureStream(options)      } catch (err) {        return self.emit('error', err)      }      self.attachListeners(self.stream)      self.stream.on('error', reportStreamError)      self.emit('sslconnect')    })  }  attachListeners(stream) {    parse(stream, (msg) => {      var eventName = msg.name === 'error' ? 'errorMessage' : msg.name      if (this._emitMessage) {        this.emit('message', msg)      }      this.emit(eventName, msg)    })  }  requestSsl() {    this.stream.write(serialize.requestSsl())  }  startup(config) {    this.stream.write(serialize.startup(config))  }  cancel(processID, secretKey) {    this._send(serialize.cancel(processID, secretKey))  }  password(password) {    this._send(serialize.password(password))  }  sendSASLInitialResponseMessage(mechanism, initialResponse) {    this._send(serialize.sendSASLInitialResponseMessage(mechanism, initialResponse))  }  sendSCRAMClientFinalMessage(additionalData) {    this._send(serialize.sendSCRAMClientFinalMessage(additionalData))  }  _send(buffer) {    if (!this.stream.writable) {      return false    }    return this.stream.write(buffer)  }  query(text) {    this._send(serialize.query(text))  }  // send parse message  parse(query) {    this._send(serialize.parse(query))  }  // send bind message  bind(config) {    this._send(serialize.bind(config))  }  // send execute message  execute(config) {    this._send(serialize.execute(config))  }  flush() {    if (this.stream.writable) {      this.stream.write(flushBuffer)    }  }  sync() {    this._ending = true    this._send(syncBuffer)  }  ref() {    this.stream.ref()  }  unref() {    this.stream.unref()  }  end() {    // 0x58 = 'X'    this._ending = true    if (!this._connecting || !this.stream.writable) {      this.stream.end()      return    }    return this.stream.write(endBuffer, () => {      this.stream.end()    })  }  close(msg) {    this._send(serialize.close(msg))  }  describe(msg) {    this._send(serialize.describe(msg))  }  sendCopyFromChunk(chunk) {    this._send(serialize.copyData(chunk))  }  endCopyFrom() {    this._send(serialize.copyDone())  }  sendCopyFail(msg) {    this._send(serialize.copyFail(msg))  }}module.exports = Connection
 |