Implement basic functionalities for workflow control.
- Manage join/leave of clients
- All clients communicate via socket.io
- Probes emit events
- Managers register workflows (by using a workflow essence)
- Send kickstart request to Managers to launch workflows
- Route events to workflow runs
- Queue events so that none are lost between workflow tasks
- Fix issues found while working on test cases
- Configure coverage and unit tests to run and write their output to files
Change-Id: I678723edc20df9247d63a4bf6380785ab8b2b221
diff --git a/spec/eventrouter.spec.js b/spec/eventrouter.spec.js
new file mode 100644
index 0000000..6184f58
--- /dev/null
+++ b/spec/eventrouter.spec.js
@@ -0,0 +1,353 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+(function () {
+ 'use strict';
+
+ const path = require('path');
+ const chai = require('chai');
+ const expect = chai.expect;
+ const sinonChai = require('sinon-chai');
+ chai.use(sinonChai);
+ const io = require('socket.io-client');
+ const async = require('async');
+ const _ = require('lodash');
+ const server = require('../src/server.js');
+ const port = 4000;
+ const eventrouter = require('../src/controllers/eventrouter.js');
+ const essenceLoader = require('../src/workflows/loader.js');
+ const essenceFileName = path.join(__dirname, 'test_multi_workflow_essence.json');
+
+ var probeClient;
+ var workflowManagerClients = [];
+ var workflowRunClients = [];
+ var workflowIds = [];
+ var workflowRunInfos = [];
+
+ var receivedKickstartMessages = [[],[]];
+
+ describe('Workflow kickstart test', function() {
+
+ before(function() {
+ // Start our server
+ server.start(port);
+ });
+
+ after(function() {
+ server.stop();
+ });
+
+ beforeEach(function(done) {
+ async.series([
+ (callback) => {
+ // connect a probe to the server
+ // to send events for test
+ probeClient = io.connect(`http://localhost:${port}`, {
+ query: 'id=probe_id&type=probe' +
+ '&name=probe@xos.org'
+ });
+
+ probeClient.on('connect', () => {
+ callback(null, true);
+ });
+ return;
+ },
+ (callback) => {
+ // connect first workflow manager to the server
+ // this manager will kickstart a workflow
+ let workflowManagerClient = io.connect(`http://localhost:${port}`, {
+ query: 'id=workflow_manager_id1&type=workflow_manager' +
+ '&name=manager1@xos.org'
+ });
+
+ workflowManagerClient.on(eventrouter.serviceEvents.WORKFLOW_KICKSTART, (message) => {
+ // save it for check
+ receivedKickstartMessages[0].push(message);
+
+ // save workflow_id and workflow_run_id
+ workflowRunInfos.push({
+ workflowId: message.workflow_id,
+ workflowRunId: message.workflow_run_id
+ });
+
+ setTimeout(() => {
+ // call-back
+ workflowManagerClient.emit(eventrouter.serviceEvents.WORKFLOW_KICKSTART, {
+ workflow_id: message.workflow_id,
+ workflow_run_id: message.workflow_run_id
+ })
+ }, 2000);
+ });
+
+ workflowManagerClient.on('connect', () => {
+ callback(null, true);
+ });
+
+ workflowManagerClients.push(workflowManagerClient);
+ return;
+ },
+ (callback) => {
+ // connect second workflow manager to the server
+ // this manager will not kickstart a workflow
+ let workflowManagerClient = io.connect(`http://localhost:${port}`, {
+ query: 'id=workflow_manager_id2&type=workflow_manager' +
+ '&name=manager2@xos.org'
+ });
+
+ workflowManagerClient.on(eventrouter.serviceEvents.WORKFLOW_KICKSTART, (message) => {
+ receivedKickstartMessages[1].push(message);
+ });
+
+ workflowManagerClient.on('connect', () => {
+ callback(null, true);
+ });
+
+ workflowManagerClients.push(workflowManagerClient);
+ return;
+ },
+ (callback) => {
+ let essence = essenceLoader.loadEssence(essenceFileName, true);
+
+ // register the workflow
+ workflowManagerClients[0].emit(eventrouter.serviceEvents.WORKFLOW_REG_ESSENCE, essence);
+
+ _.forOwn(essence, (_value, workflowId) => {
+ // save
+ workflowIds.push(workflowId);
+ });
+
+ // handle return
+ workflowManagerClients[0].on(
+ eventrouter.serviceEvents.WORKFLOW_REG_ESSENCE,
+ (workflowRegResult) => {
+ callback(null, workflowRegResult);
+ }
+ );
+ return;
+ }
+ ],
+ function(err, results) {
+ // we do not actually check results;
+ if(results.includes(false)) {
+ done.fail(err);
+ }
+ else {
+ done();
+ }
+ });
+ return;
+ });
+
+ afterEach(function() {
+ // remove workflow runs
+ _.forOwn(workflowRunInfos, (workflowRunInfo) => {
+ workflowManagerClients[0].emit(server.serviceEvents.WORKFLOW_RUN_REMOVE, {
+ workflow_id: workflowRunInfo.workflowId,
+ workflow_run_id: workflowRunInfo.workflowRunId
+ });
+ });
+ workflowRunInfos.length = 0;
+
+ // remove workflows
+ _.forOwn(workflowIds, (workflowId) => {
+ workflowManagerClients[0].emit(server.serviceEvents.WORKFLOW_REMOVE, workflowId);
+ });
+ workflowIds.length = 0;
+
+ // remove message store
+ receivedKickstartMessages.forEach((receivedKickstartMessageStore) => {
+ receivedKickstartMessageStore.length = 0;
+ });
+
+ // disconnect clients
+ workflowManagerClients.forEach((workflowManagerClient) => {
+ if(workflowManagerClient.connected) {
+ workflowManagerClient.disconnect();
+ }
+ });
+ workflowManagerClients.length = 0;
+
+ workflowRunClients.forEach((workflowRunClient) => {
+ if(workflowRunClient.connected) {
+ workflowRunClient.disconnect();
+ }
+ });
+ workflowRunClients.length = 0;
+
+ if(probeClient.connected) {
+ probeClient.disconnect();
+ }
+ probeClient = null;
+ });
+
+ it('should have two workflows', function(done) {
+ workflowManagerClients[0].on(eventrouter.serviceEvents.WORKFLOW_LIST, (result) => {
+ let workflowsList = result.result;
+ expect(workflowsList.length).to.equal(2);
+ workflowsList.forEach((workflowIdInList) => {
+ expect(workflowIds).to.includes(workflowIdInList);
+ });
+ done();
+ });
+
+ workflowManagerClients[0].emit(eventrouter.serviceEvents.WORKFLOW_LIST, {});
+ });
+
+ it('all managers should receive kickstart messages', function(done) {
+ // kickstart the workflow
+ probeClient.emit('onu.events', {serialNumber: 'testSerialXXX', other: 'test_other_field'});
+ setTimeout(() => {
+ expect(receivedKickstartMessages.length, 'num of message stores').to.equal(2);
+ receivedKickstartMessages.forEach((receivedKickstartMessageStore) => {
+ expect(receivedKickstartMessageStore.length, 'num of messages in a store').to.equal(1);
+ });
+ done();
+ }, 500);
+ });
+
+ it('should have only one workflow run', function(done) {
+ this.timeout(5000);
+
+ // kickstart the workflow
+ probeClient.emit('onu.events', {serialNumber: 'testSerialXXX', other: 'test_other_field'});
+ setTimeout(() => {
+ // kickstart will take 2 seconds roughly
+ expect(workflowRunInfos.length, 'num of workflow runs').to.equal(1);
+ // the workflow must be 'should_be_called'
+ expect(workflowRunInfos[0].workflowId, 'workflow id').to.equal('should_be_called');
+ done();
+ }, 3000);
+ });
+
+ it('should be able to read an event that is used for workflow kickstart', function(done) {
+ this.timeout(5000);
+
+ // kickstart the workflow
+ probeClient.emit('onu.events', {serialNumber: 'testSerialXXX', other: 'test_other_field'});
+ setTimeout(() => {
+ // kickstart will take 2 seconds roughly
+ expect(workflowRunInfos.length, 'num of workflow runs').to.equal(1);
+ // the workflow must be 'should_be_called'
+ expect(workflowRunInfos[0].workflowId, 'workflow id').to.equal('should_be_called');
+
+ // connect a workflow run client to the server
+ let workflowRunClient = io.connect(`http://localhost:${port}`, {
+ query: 'id=workflow_run_id1&type=workflow_run' +
+ `&workflow_id=${workflowRunInfos[0].workflowId}` +
+ `&workflow_run_id=${workflowRunInfos[0].workflowRunId}` +
+ '&name=run1@xos.org'
+ });
+ workflowRunClients.push(workflowRunClient);
+
+ workflowRunClient.on('connect', () => {
+ workflowRunClient.emit(eventrouter.serviceEvents.WORKFLOW_RUN_FETCH_EVENT, {
+ workflow_id: workflowRunInfos[0].workflowId,
+ workflow_run_id: workflowRunInfos[0].workflowRunId,
+ task_id: 'onu_event_handler',
+ topic: 'onu.events'
+ });
+ });
+
+ workflowRunClient.on(eventrouter.serviceEvents.WORKFLOW_RUN_FETCH_EVENT, (result) => {
+ let event = result.result;
+ expect(event.topic).to.equal('onu.events');
+ expect(event.message.serialNumber).to.equal('testSerialXXX');
+ done();
+ });
+ }, 3000);
+ });
+
+ /*
+ it('should store user details for a new connection', () => {
+ const eventrouter = require('../src/controllers/eventrouter.js');
+
+ const probe = eventrouter.getClients()['probe_id'];
+ expect(probe.getParams().name).to.equal('probe@xos.org');
+
+ const manager = eventrouter.getClients()['workflow_manager_id'];
+ expect(manager.getParams().name).to.equal('manager@xos.org');
+
+ const run = eventrouter.getClients()['workflow_run_id'];
+ expect(run.getParams().name).to.equal('run@xos.org');
+ });
+
+ it('should not store the same user twice', (done) => {
+ // This test case makes cleaning up process taking long time because it leaves
+ // a client socket. It seems there's no way to release it from server-side.
+
+ // connect a client to the server
+ const client2 = io.connect(`http://localhost:${port}`, {
+ query: 'id=probe_id&type=probe' +
+ '&name=probe@xos.org&value=different_value'
+ });
+
+ // when is connected start testing
+ client2.on('connect', () => {
+ setTimeout(() => {
+ const eventrouter = require('../src/controllers/eventrouter.js');
+ expect(
+ Object.keys(eventrouter.getWorkflowRunClients()).length,
+ 'num of workflow run clients'
+ ).to.equal(1);
+ expect(
+ Object.keys(eventrouter.getWorkflowManagerClients()).length,
+ 'num of workflow manager clients'
+ ).to.equal(1);
+ expect(
+ Object.keys(eventrouter.getProbeClients()).length,
+ 'num of probe clients'
+ ).to.equal(1);
+ expect(
+ Object.keys(eventrouter.getClients()).length,
+ 'total num of clients'
+ ).to.equal(3);
+
+ done();
+ }, 100);
+ });
+ });
+
+ it('should remove a user on disconnect', (done) => {
+ workflowManagerClient.disconnect();
+ workflowRunClient.disconnect();
+ probeClient.disconnect();
+
+ // we need to wait for the event to be dispatched
+ setTimeout(() => {
+ const eventrouter = require('../src/controllers/eventrouter.js');
+ expect(
+ Object.keys(eventrouter.getWorkflowRunClients()).length,
+ 'num of workflow run clients'
+ ).to.equal(0);
+ expect(
+ Object.keys(eventrouter.getWorkflowManagerClients()).length,
+ 'num of workflow manager clients'
+ ).to.equal(0);
+ expect(
+ Object.keys(eventrouter.getProbeClients()).length,
+ 'num of probe clients'
+ ).to.equal(0);
+ expect(
+ Object.keys(eventrouter.getClients()).length,
+ 'total num of clients'
+ ).to.equal(0);
+ done();
+ }, 100);
+ });
+ */
+ });
+})();
\ No newline at end of file