operation.ts 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141
  1. import { promisify } from 'util';
  2. import { type BSONSerializeOptions, type Document, resolveBSONOptions } from '../bson';
  3. import { ReadPreference, type ReadPreferenceLike } from '../read_preference';
  4. import type { Server } from '../sdam/server';
  5. import type { ClientSession } from '../sessions';
  6. import type { Callback, MongoDBNamespace } from '../utils';
  7. export const Aspect = {
  8. READ_OPERATION: Symbol('READ_OPERATION'),
  9. WRITE_OPERATION: Symbol('WRITE_OPERATION'),
  10. RETRYABLE: Symbol('RETRYABLE'),
  11. EXPLAINABLE: Symbol('EXPLAINABLE'),
  12. SKIP_COLLATION: Symbol('SKIP_COLLATION'),
  13. CURSOR_CREATING: Symbol('CURSOR_CREATING'),
  14. MUST_SELECT_SAME_SERVER: Symbol('MUST_SELECT_SAME_SERVER')
  15. } as const;
  16. /** @public */
  17. export type Hint = string | Document;
  18. export interface OperationConstructor extends Function {
  19. aspects?: Set<symbol>;
  20. }
  21. /** @public */
  22. export interface OperationOptions extends BSONSerializeOptions {
  23. /** Specify ClientSession for this command */
  24. session?: ClientSession;
  25. willRetryWrite?: boolean;
  26. /** The preferred read preference (ReadPreference.primary, ReadPreference.primary_preferred, ReadPreference.secondary, ReadPreference.secondary_preferred, ReadPreference.nearest). */
  27. readPreference?: ReadPreferenceLike;
  28. /** @internal Hints to `executeOperation` that this operation should not unpin on an ended transaction */
  29. bypassPinningCheck?: boolean;
  30. omitReadPreference?: boolean;
  31. }
  32. /** @internal */
  33. const kSession = Symbol('session');
  34. /**
  35. * This class acts as a parent class for any operation and is responsible for setting this.options,
  36. * as well as setting and getting a session.
  37. * Additionally, this class implements `hasAspect`, which determines whether an operation has
  38. * a specific aspect.
  39. * @internal
  40. */
  41. export abstract class AbstractOperation<TResult = any> {
  42. ns!: MongoDBNamespace;
  43. cmd!: Document;
  44. readPreference: ReadPreference;
  45. server!: Server;
  46. bypassPinningCheck: boolean;
  47. trySecondaryWrite: boolean;
  48. // BSON serialization options
  49. bsonOptions?: BSONSerializeOptions;
  50. options: OperationOptions;
  51. [kSession]: ClientSession | undefined;
  52. constructor(options: OperationOptions = {}) {
  53. this.readPreference = this.hasAspect(Aspect.WRITE_OPERATION)
  54. ? ReadPreference.primary
  55. : ReadPreference.fromOptions(options) ?? ReadPreference.primary;
  56. // Pull the BSON serialize options from the already-resolved options
  57. this.bsonOptions = resolveBSONOptions(options);
  58. this[kSession] = options.session != null ? options.session : undefined;
  59. this.options = options;
  60. this.bypassPinningCheck = !!options.bypassPinningCheck;
  61. this.trySecondaryWrite = false;
  62. }
  63. abstract execute(server: Server, session: ClientSession | undefined): Promise<TResult>;
  64. hasAspect(aspect: symbol): boolean {
  65. const ctor = this.constructor as OperationConstructor;
  66. if (ctor.aspects == null) {
  67. return false;
  68. }
  69. return ctor.aspects.has(aspect);
  70. }
  71. get session(): ClientSession | undefined {
  72. return this[kSession];
  73. }
  74. clearSession() {
  75. this[kSession] = undefined;
  76. }
  77. get canRetryRead(): boolean {
  78. return true;
  79. }
  80. get canRetryWrite(): boolean {
  81. return true;
  82. }
  83. }
  84. /** @internal */
  85. export abstract class AbstractCallbackOperation<TResult = any> extends AbstractOperation {
  86. constructor(options: OperationOptions = {}) {
  87. super(options);
  88. }
  89. execute(server: Server, session: ClientSession | undefined): Promise<TResult> {
  90. return promisify((callback: (e: Error, r: TResult) => void) => {
  91. this.executeCallback(server, session, callback as any);
  92. })();
  93. }
  94. protected abstract executeCallback(
  95. server: Server,
  96. session: ClientSession | undefined,
  97. callback: Callback<TResult>
  98. ): void;
  99. }
  100. export function defineAspects(
  101. operation: OperationConstructor,
  102. aspects: symbol | symbol[] | Set<symbol>
  103. ): Set<symbol> {
  104. if (!Array.isArray(aspects) && !(aspects instanceof Set)) {
  105. aspects = [aspects];
  106. }
  107. aspects = new Set(aspects);
  108. Object.defineProperty(operation, 'aspects', {
  109. value: aspects,
  110. writable: false
  111. });
  112. return aspects;
  113. }