Browse Source

fix: reconnection

e22m4u 2 years ago
parent
commit
eb187a5a74
5 changed files with 15 additions and 411 deletions
  1. 13 201
      src/mongodb-adapter.js
  2. 1 151
      src/mongodb-adapter.spec.js
  3. 1 1
      src/utils/index.js
  4. 0 21
      src/utils/wait-async.js
  5. 0 37
      src/utils/wait-async.spec.js

+ 13 - 201
src/mongodb-adapter.js

@@ -1,12 +1,10 @@
 /* eslint no-unused-vars: 0 */
 import {ObjectId} from 'mongodb';
 import {MongoClient} from 'mongodb';
-import {EventEmitter} from 'events';
-import {waitAsync} from './utils/index.js';
+import {isIsoDate} from './utils/index.js';
 import {isObjectId} from './utils/index.js';
 import {Adapter} from '@e22m4u/js-repository';
 import {DataType} from '@e22m4u/js-repository';
-import {isIsoDate} from './utils/is-iso-date.js';
 import {capitalize} from '@e22m4u/js-repository';
 import {createMongodbUrl} from './utils/index.js';
 import {ServiceContainer} from '@e22m4u/js-service';
@@ -70,50 +68,14 @@ const MONGODB_OPTION_NAMES = [
   'zlibCompressionLevel',
 ];
 
-/**
- * Mongo client events.
- * 5.8.1
- *
- * @type {string[]}
- */
-const MONGO_CLIENT_EVENTS = [
-  'connectionPoolCreated',
-  'connectionPoolReady',
-  'connectionPoolCleared',
-  'connectionPoolClosed',
-  'connectionCreated',
-  'connectionReady',
-  'connectionClosed',
-  'connectionCheckOutStarted',
-  'connectionCheckOutFailed',
-  'connectionCheckedOut',
-  'connectionCheckedIn',
-  'commandStarted',
-  'commandSucceeded',
-  'commandFailed',
-  'serverOpening',
-  'serverClosed',
-  'serverDescriptionChanged',
-  'topologyOpening',
-  'topologyClosed',
-  'topologyDescriptionChanged',
-  'error',
-  'timeout',
-  'close',
-  'serverHeartbeatStarted',
-  'serverHeartbeatSucceeded',
-  'serverHeartbeatFailed',
-];
-
 /**
  * Default settings.
  *
- * @type {{connectTimeoutMS: number}}
+ * @type {object}
  */
 const DEFAULT_SETTINGS = {
-  reconnectInterval: 2000, // adapter specific option
-  connectTimeoutMS: 2000,
-  serverSelectionTimeoutMS: 2000,
+  //  connectTimeoutMS: 2500,
+  //  serverSelectionTimeoutMS: 2500,
 };
 
 /**
@@ -123,74 +85,27 @@ export class MongodbAdapter extends Adapter {
   /**
    * Mongodb instance.
    *
+   * @type {MongoClient}
    * @private
    */
   _client;
 
   /**
-   * Collections.
-   *
-   * @type {Map<any, any>}
-   * @private
-   */
-  _collections = new Map();
-
-  /**
-   * Connected.
+   * Client.
    *
-   * @type {boolean}
-   * @private
+   * @returns {MongoClient}
    */
-  _connected = false;
-
-  /**
-   * Connected.
-   *
-   * @return {boolean}
-   */
-  get connected() {
-    return this._connected;
+  get client() {
+    return this._client;
   }
 
   /**
-   * Connecting.
-   *
-   * @type {boolean}
-   * @private
-   */
-  _connecting = false;
-
-  /**
-   * Connecting.
-   *
-   * @return {boolean}
-   */
-  get connecting() {
-    return this._connecting;
-  }
-
-  /**
-   * Event emitter.
+   * Collections.
    *
+   * @type {Map<any, any>}
    * @private
    */
-  _emitter;
-
-  /**
-   * Event emitter.
-   *
-   * @returns {EventEmitter}
-   */
-  get emitter() {
-    if (this._emitter) return this._emitter;
-    this._emitter = new EventEmitter();
-    const emit = this._emitter.emit;
-    this._emitter.emit = function (name, ...args) {
-      emit.call(this, '*', name, ...args);
-      return emit.call(this, name, ...args);
-    };
-    return this._emitter;
-  }
+  _collections = new Map();
 
   /**
    * Constructor.
@@ -205,90 +120,9 @@ export class MongodbAdapter extends Adapter {
     settings.port = settings.port || 27017;
     settings.database = settings.database || settings.db || 'database';
     super(container, settings);
-  }
-
-  /**
-   * Connect.
-   *
-   * @return {Promise<*|undefined>}
-   * @private
-   */
-  async connect() {
-    if (this._connecting) {
-      await waitAsync(500);
-      return this.connect();
-    }
-
-    if (this._connected) return;
-    this._connecting = true;
-
     const options = selectObjectKeys(this.settings, MONGODB_OPTION_NAMES);
     const url = createMongodbUrl(this.settings);
-
-    // console.log(`Connecting to ${url}`);
-    if (this._client) {
-      this._client.removeAllListeners();
-      this._client.close(true);
-    }
     this._client = new MongoClient(url, options);
-    for (const event of MONGO_CLIENT_EVENTS) {
-      const listener = (...args) => this.emitter.emit(event, ...args);
-      this._client.on(event, listener);
-    }
-
-    const {reconnectInterval} = this.settings;
-    const connectFn = async () => {
-      if (this._connecting === false) return;
-      this.emitter.emit('connecting');
-      try {
-        await this._client.connect();
-      } catch (e) {
-        this.emitter.emit('error', e);
-        console.error(e);
-        // console.log('MongoDB connection failed!');
-        // console.log(`Reconnecting after ${reconnectInterval} ms.`);
-        await waitAsync(reconnectInterval);
-        return connectFn();
-      }
-      // console.log('MongoDB is connected.');
-      this._connected = true;
-      this._connecting = false;
-      reconnectOnClose();
-      this.emitter.emit('connected');
-    };
-
-    const reconnectOnClose = () =>
-      this._client.once('serverClosed', event => {
-        this.emitter.emit('disconnected', event);
-        if (this._connected) {
-          this._connected = false;
-          this._connecting = true;
-          // console.log('MongoDB lost connection!');
-          // console.log(event);
-          // console.log(`Reconnecting after ${reconnectInterval} ms.`);
-          setTimeout(() => connectFn(), reconnectInterval);
-        } else {
-          // console.log('MongoDB connection closed.');
-        }
-      });
-
-    return connectFn();
-  }
-
-  /**
-   * Disconnect.
-   *
-   * @return {Promise<undefined>}
-   */
-  async disconnect() {
-    this._connected = false;
-    this._connecting = false;
-    if (this._client) {
-      const client = this._client;
-      this._client = undefined;
-      await client.close();
-      client.removeAllListeners();
-    }
   }
 
   /**
@@ -326,19 +160,6 @@ export class MongodbAdapter extends Adapter {
     return value;
   }
 
-  /**
-   * Coerce iso date.
-   *
-   * @param value
-   * @return {*|Date}
-   * @private
-   */
-  _coerceIsoDate(value) {
-    if (value === null) return value;
-    if (isIsoDate(value)) return new Date(value);
-    return value;
-  }
-
   /**
    * To database.
    *
@@ -419,7 +240,7 @@ export class MongodbAdapter extends Adapter {
     if (collection) return collection;
     const tableName =
       this.getService(ModelDefinitionUtils).getTableNameByModelName(modelName);
-    collection = this._client.db(this.settings.database).collection(tableName);
+    collection = this.client.db(this.settings.database).collection(tableName);
     this._collections.set(modelName, collection);
     return collection;
   }
@@ -716,7 +537,6 @@ export class MongodbAdapter extends Adapter {
    * @return {Promise<object>}
    */
   async create(modelName, modelData, filter = undefined) {
-    await this.connect();
     const idPropName = this._getIdPropName(modelName);
     const idValue = modelData[idPropName];
     if (idValue == null) {
@@ -752,7 +572,6 @@ export class MongodbAdapter extends Adapter {
    * @return {Promise<object>}
    */
   async replaceById(modelName, id, modelData, filter = undefined) {
-    await this.connect();
     id = this._coerceId(id);
     const idPropName = this._getIdPropName(modelName);
     modelData[idPropName] = id;
@@ -779,7 +598,6 @@ export class MongodbAdapter extends Adapter {
    * @return {Promise<object>}
    */
   async patchById(modelName, id, modelData, filter = undefined) {
-    await this.connect();
     id = this._coerceId(id);
     const idPropName = this._getIdPropName(modelName);
     delete modelData[idPropName];
@@ -804,7 +622,6 @@ export class MongodbAdapter extends Adapter {
    * @return {Promise<object[]>}
    */
   async find(modelName, filter = undefined) {
-    await this.connect();
     filter = filter || {};
     const query = this._buildQuery(modelName, filter.where);
     const sort = this._buildSort(modelName, filter.order);
@@ -826,7 +643,6 @@ export class MongodbAdapter extends Adapter {
    * @return {Promise<object>}
    */
   async findById(modelName, id, filter = undefined) {
-    await this.connect();
     id = this._coerceId(id);
     const table = this._getCollection(modelName);
     const projection = this._buildProjection(
@@ -847,7 +663,6 @@ export class MongodbAdapter extends Adapter {
    * @return {Promise<number>}
    */
   async delete(modelName, where = undefined) {
-    await this.connect();
     const table = this._getCollection(modelName);
     const query = this._buildQuery(modelName, where);
     const {deletedCount} = await table.deleteMany(query);
@@ -862,7 +677,6 @@ export class MongodbAdapter extends Adapter {
    * @return {Promise<boolean>}
    */
   async deleteById(modelName, id) {
-    await this.connect();
     id = this._coerceId(id);
     const table = this._getCollection(modelName);
     const {deletedCount} = await table.deleteOne({_id: id});
@@ -877,7 +691,6 @@ export class MongodbAdapter extends Adapter {
    * @return {Promise<boolean>}
    */
   async exists(modelName, id) {
-    await this.connect();
     id = this._coerceId(id);
     const table = this._getCollection(modelName);
     const result = await table.findOne({_id: id}, {});
@@ -892,7 +705,6 @@ export class MongodbAdapter extends Adapter {
    * @return {Promise<number>}
    */
   async count(modelName, where = undefined) {
-    await this.connect();
     const query = this._buildQuery(modelName, where);
     const table = this._getCollection(modelName);
     return await table.count(query);

+ 1 - 151
src/mongodb-adapter.spec.js

@@ -1,17 +1,13 @@
-import net from 'net';
-import chai from 'chai';
 import {expect} from 'chai';
 import {ObjectId} from 'mongodb';
 import {MongoClient} from 'mongodb';
 import {format} from '@e22m4u/js-format';
-import {Service} from '@e22m4u/js-service';
 import {Schema} from '@e22m4u/js-repository';
 import {DataType} from '@e22m4u/js-repository';
 import {createMongodbUrl} from './utils/index.js';
 import {MongodbAdapter} from './mongodb-adapter.js';
 import {AdapterRegistry} from '@e22m4u/js-repository';
 import {DEFAULT_PRIMARY_KEY_PROPERTY_NAME as DEF_PK} from '@e22m4u/js-repository';
-const sandbox = chai.spy.sandbox();
 
 const CONFIG = {
   host: process.env.MONGODB_HOST || 'localhost',
@@ -35,162 +31,16 @@ describe('MongodbAdapter', function () {
   this.timeout(15000);
 
   afterEach(async function () {
-    sandbox.restore();
     await MDB_CLIENT.db(CONFIG.database).dropDatabase();
   });
 
   after(async function () {
     for await (const adapter of ADAPTERS_STACK) {
-      await adapter.disconnect();
+      await adapter.client.close(true);
     }
     await MDB_CLIENT.close(true);
   });
 
-  it('updates "connected" and "connecting" properties', async function () {
-    const S = new Service();
-    const events = [];
-    const adapter = new MongodbAdapter(S.container, CONFIG);
-    adapter.emitter.addListener('*', name => events.push(name));
-    expect(adapter.connected).to.be.false;
-    expect(adapter.connecting).to.be.false;
-    expect(events).to.be.empty;
-    const promise = adapter.connect();
-    expect(adapter.connected).to.be.false;
-    expect(adapter.connecting).to.be.true;
-    expect(events).to.include('serverOpening');
-    expect(events).to.not.include('connectionPoolReady');
-    expect(events).to.not.include('serverClosed');
-    await promise;
-    expect(adapter.connected).to.be.true;
-    expect(adapter.connecting).to.be.false;
-    expect(events).to.include('connectionPoolReady');
-    expect(events).to.not.include('serverClosed');
-    await adapter.disconnect();
-    expect(adapter.connected).to.be.false;
-    expect(adapter.connecting).to.be.false;
-    expect(events).to.include('serverClosed');
-  });
-
-  it('emits "connecting", "connected" and "disconnected" events', async function () {
-    const S = new Service();
-    const events = [];
-    const adapter = new MongodbAdapter(S.container, CONFIG);
-    adapter.emitter.addListener('*', name => events.push(name));
-    expect(adapter.connected).to.be.false;
-    expect(adapter.connecting).to.be.false;
-    expect(events).to.be.empty;
-    const promise = adapter.connect();
-    expect(adapter.connected).to.be.false;
-    expect(adapter.connecting).to.be.true;
-    expect(events).to.include('connecting');
-    expect(events).to.not.include('connected');
-    expect(events).to.not.include('disconnected');
-    await promise;
-    expect(adapter.connected).to.be.true;
-    expect(adapter.connecting).to.be.false;
-    expect(events).to.include('connected');
-    expect(events).to.not.include('disconnected');
-    await adapter.disconnect();
-    expect(adapter.connected).to.be.false;
-    expect(adapter.connecting).to.be.false;
-    expect(events).to.include('disconnected');
-  });
-
-  it('reconnects on server selection error', function (done) {
-    const S = new Service();
-    const server = net.createServer();
-    let startupCounter = 0;
-    server.listen(0, 'localhost', 2, () => {
-      startupCounter++;
-      expect(startupCounter).to.be.eq(1);
-      const {address, port} = server.address();
-      const attemptsLimit = 3;
-      const serverSelectionTimeoutMS = 50;
-      const adapter = new MongodbAdapter(S.container, {
-        port,
-        host: address,
-        reconnectInterval: 0,
-        serverSelectionTimeoutMS,
-      });
-      let attempts = 0;
-      const startTime = new Date();
-      adapter.emitter.addListener('connecting', () => {
-        ++attempts;
-        if (attempts !== attemptsLimit) return;
-        const duration = new Date() - startTime;
-        const accuracy = 10;
-        server.close();
-        adapter.disconnect();
-        expect(adapter.connect).to.have.been.called.once;
-        const attemptMs = duration / (attemptsLimit - 1);
-        expect(attemptMs).to.be.gte(serverSelectionTimeoutMS - accuracy);
-        expect(attemptMs).to.be.lte(serverSelectionTimeoutMS + accuracy);
-        done();
-      });
-      adapter.emitter.addListener('error', error => {
-        expect(error.message).to.be.eq(
-          'Server selection timed out after 50 ms',
-        );
-      });
-      sandbox.on(adapter, 'connect');
-      adapter.connect();
-    });
-  });
-
-  it('reconnects on implicit disconnect', function (done) {
-    const S = new Service();
-    const reconnectsLimit = 2;
-    const reconnectInterval = 50;
-    const adapter = new MongodbAdapter(S.container, {
-      ...CONFIG,
-      reconnectInterval,
-    });
-    let startTime;
-    let connects = 0;
-    let reconnects = 0;
-    adapter.emitter.on('connected', () => {
-      ++connects;
-      if (connects === 1) {
-        adapter._client.close();
-        return;
-      }
-      ++reconnects;
-      if (startTime == null) startTime = new Date();
-      if (reconnects < reconnectsLimit) {
-        adapter._client.close();
-        return;
-      }
-      const duration = new Date() - startTime;
-      const accuracy = 10;
-      adapter.disconnect();
-      expect(adapter.connect).to.have.been.called.once;
-      const attemptMs = duration / (reconnectsLimit - 1);
-      expect(attemptMs).to.be.gt(reconnectInterval - accuracy);
-      expect(attemptMs).to.be.lt(reconnectInterval + accuracy);
-      expect(connects).to.be.eq(reconnectsLimit + 1);
-      done();
-    });
-    sandbox.on(adapter, 'connect');
-    adapter.connect();
-  });
-
-  it('does not reconnect on explicit disconnect', function (done) {
-    const S = new Service();
-    const reconnectInterval = 0;
-    const adapter = new MongodbAdapter(S.container, {
-      ...CONFIG,
-      reconnectInterval,
-    });
-    adapter.emitter.once('connected', () => {
-      adapter.emitter.once('connecting', () => {
-        throw new Error('Unexpected reconnection');
-      });
-      adapter.emitter.once('disconnected', () => setTimeout(() => done(), 50));
-      adapter.disconnect();
-    });
-    adapter.connect();
-  });
-
   describe('create', function () {
     it('generates a new identifier when a value of a primary key is not provided', async function () {
       const schema = createSchema();

+ 1 - 1
src/utils/index.js

@@ -1,4 +1,4 @@
-export * from './wait-async.js';
+export * from './is-iso-date.js';
 export * from './is-object-id.js';
 export * from './create-mongodb-url.js';
 export * from './transform-values-deep.js';

+ 0 - 21
src/utils/wait-async.js

@@ -1,21 +0,0 @@
-import {InvalidArgumentError} from '@e22m4u/js-repository';
-
-/**
- * Wait.
- *
- * @example
- * ```ts
- * await waitAsync(1000); // 1sec
- * ```
- *
- * @param {number} ms Milliseconds
- * @returns {Promise<undefined>}
- */
-export function waitAsync(ms) {
-  if (typeof ms !== 'number')
-    throw new InvalidArgumentError(
-      'The first argument of "waitAsync" must be a Number, but %v given.',
-      ms,
-    );
-  return new Promise(r => setTimeout(() => r(), ms));
-}

+ 0 - 37
src/utils/wait-async.spec.js

@@ -1,37 +0,0 @@
-import {expect} from 'chai';
-import {format} from '@e22m4u/js-format';
-import {waitAsync} from './wait-async.js';
-
-describe('wait', function () {
-  it('requires the first argument as a number', function () {
-    const throwable = v => () => waitAsync(v);
-    const error = v =>
-      format(
-        'The first argument of "waitAsync" must be a Number, but %s given.',
-        v,
-      );
-    expect(throwable('string')).to.throw(error('"string"'));
-    expect(throwable('')).to.throw(error('""'));
-    expect(throwable(true)).to.throw(error('true'));
-    expect(throwable(false)).to.throw(error('false'));
-    expect(throwable([])).to.throw(error('Array'));
-    expect(throwable({})).to.throw(error('Object'));
-    expect(throwable(undefined)).to.throw(error('undefined'));
-    expect(throwable(null)).to.throw(error('null'));
-    throwable(10)();
-    throwable(0)();
-  });
-
-  it('returns a promise that resolves after given milliseconds', async function () {
-    const startTime = new Date();
-    const delayMs = 15;
-    const accuracyMs = 5;
-    const promise = waitAsync(delayMs);
-    expect(promise).to.be.instanceof(Promise);
-    await promise;
-    const endTime = new Date();
-    const duration = endTime - startTime;
-    expect(duration).to.be.gte(delayMs - accuracyMs);
-    expect(duration).to.be.lte(delayMs + accuracyMs);
-  });
-});