Browse Source

chore: adds reconnection tests

e22m4u 2 years ago
parent
commit
61c0cb64bf
2 changed files with 230 additions and 21 deletions
  1. 96 20
      src/mongodb-adapter.js
  2. 134 1
      src/mongodb-adapter.spec.js

+ 96 - 20
src/mongodb-adapter.js

@@ -1,6 +1,7 @@
 /* eslint no-unused-vars: 0 */
 import {ObjectId} from 'mongodb';
 import {MongoClient} from 'mongodb';
+import {EventEmitter} from 'events';
 import {waitAsync} from './utils/index.js';
 import {isObjectId} from './utils/index.js';
 import {Adapter} from '@e22m4u/js-repository';
@@ -69,6 +70,41 @@ const MONGODB_OPTION_NAMES = [
   'zlibCompressionLevel',
 ];
 
+/**
+ * Mongo client events.
+ * 5.8.1
+ *
+ * @type {string[]}
+ */
+const MONGO_CLIENT_EVENTS = [
+  'connectionPoolCreated',
+  'connectionPoolReady',
+  'connectionPoolCleared',
+  'connectionPoolClosed',
+  'connectionCreated',
+  'connectionReady',
+  'connectionClosed',
+  'connectionCheckOutStarted',
+  'connectionCheckOutFailed',
+  'connectionCheckedOut',
+  'connectionCheckedIn',
+  'commandStarted',
+  'commandSucceeded',
+  'commandFailed',
+  'serverOpening',
+  'serverClosed',
+  'serverDescriptionChanged',
+  'topologyOpening',
+  'topologyClosed',
+  'topologyDescriptionChanged',
+  'error',
+  'timeout',
+  'close',
+  'serverHeartbeatStarted',
+  'serverHeartbeatSucceeded',
+  'serverHeartbeatFailed',
+];
+
 /**
  * Default settings.
  *
@@ -133,6 +169,29 @@ export class MongodbAdapter extends Adapter {
     return this._connecting;
   }
 
+  /**
+   * Event emitter.
+   *
+   * @private
+   */
+  _emitter;
+
+  /**
+   * Event emitter.
+   *
+   * @returns {EventEmitter}
+   */
+  get emitter() {
+    if (this._emitter) return this._emitter;
+    this._emitter = new EventEmitter();
+    const emit = this._emitter.emit;
+    this._emitter.emit = function (name, ...args) {
+      emit.call(this, '*', name, ...args);
+      return emit.call(this, name, ...args);
+    };
+    return this._emitter;
+  }
+
   /**
    * Constructor.
    *
@@ -167,52 +226,69 @@ export class MongodbAdapter extends Adapter {
     const url = createMongodbUrl(this.settings);
 
     // console.log(`Connecting to ${url}`);
+    if (this._client) {
+      this._client.removeAllListeners();
+      this._client.close(true);
+    }
     this._client = new MongoClient(url, options);
+    for (const event of MONGO_CLIENT_EVENTS) {
+      const listener = (...args) => this.emitter.emit(event, ...args);
+      this._client.on(event, listener);
+    }
 
     const {reconnectInterval} = this.settings;
     const connectFn = async () => {
+      if (this._connecting === false) return;
+      this.emitter.emit('connecting');
       try {
         await this._client.connect();
       } catch (e) {
+        this.emitter.emit('error', e);
         console.error(e);
         // console.log('MongoDB connection failed!');
         // console.log(`Reconnecting after ${reconnectInterval} ms.`);
-        await new Promise(r => setTimeout(() => r(), reconnectInterval));
+        await waitAsync(reconnectInterval);
         return connectFn();
       }
       // console.log('MongoDB is connected.');
       this._connected = true;
       this._connecting = false;
+      reconnectOnClose();
+      this.emitter.emit('connected');
     };
 
-    await connectFn();
+    const reconnectOnClose = () =>
+      this._client.once('serverClosed', event => {
+        this.emitter.emit('disconnected', event);
+        if (this._connected) {
+          this._connected = false;
+          this._connecting = true;
+          // console.log('MongoDB lost connection!');
+          // console.log(event);
+          // console.log(`Reconnecting after ${reconnectInterval} ms.`);
+          setTimeout(() => connectFn(), reconnectInterval);
+        } else {
+          // console.log('MongoDB connection closed.');
+        }
+      });
 
-    this._client.once('serverClosed', event => {
-      if (this._connected) {
-        this._connected = false;
-        // console.log('MongoDB lost connection!');
-        console.log(event);
-        // console.log(`Reconnecting after ${reconnectInterval} ms.`);
-        setTimeout(() => connectFn(), reconnectInterval);
-      } else {
-        // console.log('MongoDB connection closed.');
-      }
-    });
+    return connectFn();
   }
 
   /**
    * Disconnect.
    *
-   * @return {Promise<*|undefined>}
+   * @return {Promise<undefined>}
    */
   async disconnect() {
-    if (this._connecting) {
-      await waitAsync(500);
-      return this.disconnect();
-    }
-    if (!this._connected) return;
     this._connected = false;
-    if (this._client) await this._client.close();
+    this._connecting = false;
+    if (this._client) {
+      const client = this._client;
+      this._client = undefined;
+      await client.close();
+      client.removeAllListeners();
+    }
   }
 
   /**

+ 134 - 1
src/mongodb-adapter.spec.js

@@ -1,3 +1,5 @@
+import net from 'net';
+import chai from 'chai';
 import {expect} from 'chai';
 import {ObjectId} from 'mongodb';
 import {MongoClient} from 'mongodb';
@@ -9,6 +11,7 @@ import {createMongodbUrl} from './utils/index.js';
 import {MongodbAdapter} from './mongodb-adapter.js';
 import {AdapterRegistry} from '@e22m4u/js-repository';
 import {DEFAULT_PRIMARY_KEY_PROPERTY_NAME as DEF_PK} from '@e22m4u/js-repository';
+const sandbox = chai.spy.sandbox();
 
 const CONFIG = {
   host: process.env.MONGODB_HOST || 'localhost',
@@ -32,6 +35,7 @@ describe('MongodbAdapter', function () {
   this.timeout(15000);
 
   afterEach(async function () {
+    sandbox.restore();
     await MDB_CLIENT.db(CONFIG.database).dropDatabase();
   });
 
@@ -42,20 +46,149 @@ describe('MongodbAdapter', function () {
     await MDB_CLIENT.close(true);
   });
 
-  it('sets the "connected" and "connecting" statuses', async function () {
+  it('updates "connected" and "connecting" properties', async function () {
     const S = new Service();
+    const events = [];
     const adapter = new MongodbAdapter(S.container, CONFIG);
+    adapter.emitter.addListener('*', name => events.push(name));
     expect(adapter.connected).to.be.false;
     expect(adapter.connecting).to.be.false;
+    expect(events).to.be.empty;
     const promise = adapter.connect();
     expect(adapter.connected).to.be.false;
     expect(adapter.connecting).to.be.true;
+    expect(events).to.include('serverOpening');
+    expect(events).to.not.include('connectionPoolReady');
+    expect(events).to.not.include('serverClosed');
     await promise;
     expect(adapter.connected).to.be.true;
     expect(adapter.connecting).to.be.false;
+    expect(events).to.include('connectionPoolReady');
+    expect(events).to.not.include('serverClosed');
     await adapter.disconnect();
     expect(adapter.connected).to.be.false;
     expect(adapter.connecting).to.be.false;
+    expect(events).to.include('serverClosed');
+  });
+
+  it('emits "connecting", "connected" and "disconnected" events', async function () {
+    const S = new Service();
+    const events = [];
+    const adapter = new MongodbAdapter(S.container, CONFIG);
+    adapter.emitter.addListener('*', name => events.push(name));
+    expect(adapter.connected).to.be.false;
+    expect(adapter.connecting).to.be.false;
+    expect(events).to.be.empty;
+    const promise = adapter.connect();
+    expect(adapter.connected).to.be.false;
+    expect(adapter.connecting).to.be.true;
+    expect(events).to.include('connecting');
+    expect(events).to.not.include('connected');
+    expect(events).to.not.include('disconnected');
+    await promise;
+    expect(adapter.connected).to.be.true;
+    expect(adapter.connecting).to.be.false;
+    expect(events).to.include('connected');
+    expect(events).to.not.include('disconnected');
+    await adapter.disconnect();
+    expect(adapter.connected).to.be.false;
+    expect(adapter.connecting).to.be.false;
+    expect(events).to.include('disconnected');
+  });
+
+  it('reconnects on server selection error', function (done) {
+    const S = new Service();
+    const server = net.createServer();
+    let startupCounter = 0;
+    server.listen(0, 'localhost', 2, () => {
+      startupCounter++;
+      expect(startupCounter).to.be.eq(1);
+      const {address, port} = server.address();
+      const attemptsLimit = 3;
+      const serverSelectionTimeoutMS = 50;
+      const adapter = new MongodbAdapter(S.container, {
+        port,
+        host: address,
+        reconnectInterval: 0,
+        serverSelectionTimeoutMS,
+      });
+      let attempts = 0;
+      const startTime = new Date();
+      adapter.emitter.addListener('connecting', () => {
+        ++attempts;
+        if (attempts !== attemptsLimit) return;
+        const duration = new Date() - startTime;
+        const accuracy = 10;
+        server.close();
+        adapter.disconnect();
+        expect(adapter.connect).to.have.been.called.once;
+        const attemptMs = duration / (attemptsLimit - 1);
+        expect(attemptMs).to.be.gte(serverSelectionTimeoutMS - accuracy);
+        expect(attemptMs).to.be.lte(serverSelectionTimeoutMS + accuracy);
+        done();
+      });
+      adapter.emitter.addListener('error', error => {
+        expect(error.message).to.be.eq(
+          'Server selection timed out after 50 ms',
+        );
+      });
+      sandbox.on(adapter, 'connect');
+      adapter.connect();
+    });
+  });
+
+  it('reconnects on implicit disconnect', function (done) {
+    const S = new Service();
+    const reconnectsLimit = 2;
+    const reconnectInterval = 50;
+    const adapter = new MongodbAdapter(S.container, {
+      ...CONFIG,
+      reconnectInterval,
+    });
+    let startTime;
+    let connects = 0;
+    let reconnects = 0;
+    adapter.emitter.on('connected', () => {
+      ++connects;
+      if (connects === 1) {
+        adapter._client.close();
+        return;
+      }
+      ++reconnects;
+      if (startTime == null) startTime = new Date();
+      if (reconnects < reconnectsLimit) {
+        adapter._client.close();
+        return;
+      }
+      const duration = new Date() - startTime;
+      const accuracy = 10;
+      adapter.disconnect();
+      expect(adapter.connect).to.have.been.called.once;
+      const attemptMs = duration / (reconnectsLimit - 1);
+      expect(attemptMs).to.be.gt(reconnectInterval - accuracy);
+      expect(attemptMs).to.be.lt(reconnectInterval + accuracy);
+      expect(connects).to.be.eq(reconnectsLimit + 1);
+      done();
+    });
+    sandbox.on(adapter, 'connect');
+    adapter.connect();
+  });
+
+  it('does not reconnect on explicit disconnect', function (done) {
+    const S = new Service();
+    const reconnectInterval = 0;
+    const adapter = new MongodbAdapter(S.container, {
+      ...CONFIG,
+      reconnectInterval,
+    });
+    adapter.emitter.once('connected', () => {
+      adapter.emitter.once('connecting', () => {
+        throw new Error('Unexpected reconnection');
+      });
+      adapter.emitter.once('disconnected', () => setTimeout(() => done(), 50));
+      adapter.disconnect();
+    });
+    adapter.connect();
   });
 
   describe('create', function () {