Tuesday, 25 July 2017

Build a real-time data application using NodeJs, Socket.io

In the past, real-time data updates were achieved by using a timer to refresh the page or by sending an Ajax request to the server to fetch new data. This often results in unnecessary requests to the server even when no data has changed. This technique is called polling.
Alternatively, with real-time data updates, clients are notified to refresh when data changes on the server, or the data is pushed to the clients from the server. This avoids unnecessary hits to the server.

In this section, we are going to build a very simple app which will have real-time data update capability.

Requirements
Nodejs
AngularJs

Download and install NodeJs. If it is already installed, go to the next step.
After NodeJs is installed, open a command prompt, create an app directory called 'realtime-app', navigate to that directory and run the following command.

 $ npm init  

The npm init command will create an empty project with a package.json file.

We need to install the Express and Socket.io libraries (socket.io-client is installed in a later step).

 $ npm install socket.io express --save  

The --save option records the packages in package.json; the packages themselves are downloaded to the node_modules folder.
Now, install jQuery.

 $ npm install jquery --save  

Now we are ready to code. First, write the server-side portion.
Create a new file called app.js in realtime-app folder.

 // Express app on a plain HTTP server, with Socket.io attached to that server.
 var express = require('express');
 var app = express();
 var server = require('http').createServer(app);
 var io = require('socket.io')(server);
 // Expose the installed packages (e.g. /jquery/dist/jquery.js) to the browser.
 app.use(express.static(__dirname + '/node_modules'));
 // Handler for the root URL: responds with the client page.
 function sendIndexPage(req, res) {
   res.sendFile(__dirname + '/index.html');
 }
 app.get('/', sendIndexPage);
 server.listen(4200);

So the server is ready to listen on port 4200.
Now we add publish and subscribe events on server. Add below code in app.js just before server.listen(4200);

 // Runs once for every Socket.io client that connects to the server.
 io.on('connection', function(client) {
   console.log('Client connected...');
   // Relay each 'messages' payload as a 'broad' event. io.emit sends it to
   // every connected client; client.emit would only reply to the sender, so
   // the other clients would never receive the update.
   client.on('messages', function(data) {
       io.emit('broad', data);
   });
 });

io.on listens for incoming connections.
client.on listens for events emitted by the client. Here, the server subscribes to the 'messages' event from the client.
The emit call sends an event to clients. Here, when the server receives the 'messages' event from a client, it emits the 'broad' event to the clients that are subscribed to it.
So the server code looks like this:

 // Express app on a plain HTTP server, with Socket.io attached to that server.
 var express = require('express');
 var app = express();
 var server = require('http').createServer(app);
 var io = require('socket.io')(server);
 // Expose the installed packages (e.g. /jquery/dist/jquery.js) to the browser.
 app.use(express.static(__dirname + '/node_modules'));
 // The root URL responds with the client page.
 app.get('/', function(req, res) {
   res.sendFile(__dirname + '/index.html');
 });
 // Runs once for every Socket.io client that connects to the server.
 io.on('connection', function(client) {
   console.log('Client connected...');
   // Relay each 'messages' payload as a 'broad' event. io.emit sends it to
   // every connected client; client.emit would only reply to the sender, so
   // the other clients would never receive the update.
   client.on('messages', function(data) {
       io.emit('broad', data);
   });
 });
 server.listen(4200);

Now the server portion is ready.
Let's move on to the client portions.
The first part is index.html, which is the actual client that displays the data. The other is a web API which publishes the update to the client when the data changes in the data source.
Create the index.html file.

 <!doctype html>
 <html lang="en" ng-app="realtimeapp">
   <head>
   </head>
   <body ng-controller="appCtrl as appc">
     <h1>{{appc.msg}}</h1>
     <div>{{appc.data}}</div>
     <!-- jQuery is served from node_modules by the express.static middleware in app.js -->
     <script src="/jquery/dist/jquery.js"></script>
     <script src="https://ajax.googleapis.com/ajax/libs/angularjs/1.5.6/angular.min.js"></script>
     <!-- socket.io client library -->
     <script src="/socket.io/socket.io.js"></script>
 <script>
 var app1 = angular.module('realtimeapp', []);

 // Controller behind the view: holds the greeting and the latest pushed data.
 app1.controller('appCtrl', function AppController(socket) {
      // Declared with var so it does not leak as an implicit global.
      var self = this;
      self.msg = "Welcome Hello world";
      // Store the payload of every 'broad' event. The socket factory below runs
      // this callback inside $rootScope.$apply, so the view picks up the change.
      socket.on('broad', function(data) {
           self.data = data;
      });
 });

 // Thin wrapper around the socket.io client that runs every callback inside
 // $rootScope.$apply, so Angular notices changes made in socket callbacks.
 app1.factory('socket', function ($rootScope) {
  // No URL: connect back to the host that served this page, so the page works
  // from any machine instead of depending on one hard-coded host name.
  var socket = io.connect();
  return {
   // Subscribe to eventName; callback receives the event's arguments.
   on: function (eventName, callback) {
    socket.on(eventName, function () {
     var args = arguments;
     $rootScope.$apply(function () {
      callback.apply(socket, args);
     });
    });
   },
   // Emit eventName with data; the optional callback is invoked with the
   // acknowledgement arguments.
   emit: function (eventName, data, callback) {
    socket.emit(eventName, data, function () {
     var args = arguments;
     $rootScope.$apply(function () {
      if (callback) {
       callback.apply(socket, args);
      }
     });
    });
   }
  };
 });
 </script>
   </body>
 </html>

Include the AngularJs, jQuery and socket.io client libraries in the script part. socket.io.js is the client library for socket.io.

In the JavaScript part, a factory is created to publish (emit) events to the server and to subscribe to (receive) events from it.

socket.on('broad', function(data){..}) listens for the 'broad' event from the server. When the server publishes the 'broad' event, the client receives the data to display / process.

When data needs to be updated on the client, emitting the 'messages' event to the server causes it to emit the 'broad' event, which the client page index.html receives.

Let's assume we have a watcher which keeps watching for data changes in the data source / database. When it finds a change, either the data is pushed to the client, or the client is notified that new data is available. We can go either way, but I prefer the second one.

So the watcher needs a way to ask the server to push a notification to the client that a data refresh is required. The second part does exactly that.

Before starting code, install socket.io-client

 $ npm install socket.io-client --save  

We need to modify the app.js as below

 var ioClient = require('socket.io-client');
 // Socket.io client connection back to this same server, used to emit events.
 var socket = ioClient.connect('http://localhost:4200');
 var router = express.Router();
 // Handler for GET /api: emits a 'messages' event and answers with JSON.
 function notifyNewData(req, res) {
      //logic for finding data changes
      socket.emit('messages', 'New data available for update');
      res.json({ message: 'hooray! welcome to our api!' });
 }
 router.get('/', notifyNewData);
 app.use('/api', router);

Here the idea is to create a REST API that the watcher calls when it finds that new data is available.
A socket.io-client connection (socket) is created to emit the 'messages' event to the server. Once connected, it is ready to listen for and emit events.

So final app.js will be like below
 // Express app on a plain HTTP server, with Socket.io attached to that server.
 var express = require('express');
 var app = express();
 var server = require('http').createServer(app);
 var io = require('socket.io')(server);
 var ioClient = require('socket.io-client');
 // Expose the installed packages (e.g. /jquery/dist/jquery.js) to the browser.
 app.use(express.static(__dirname + '/node_modules'));
 // The root URL responds with the client page.
 app.get('/', function(req, res) {
   res.sendFile(__dirname + '/index.html');
 });
 // Runs once for every Socket.io client that connects to the server.
 io.on('connection', function(client) {
   console.log('Client connected...');
   // Relay each 'messages' payload as a 'broad' event. io.emit sends it to
   // every connected client. client.emit would only reply to the sender (here
   // the server's own socket.io-client connection below), so the browser
   // would never receive the update.
   client.on('messages', function(data) {
       io.emit('broad', data);
   });
 });
 var router = express.Router();
 // Socket.io client connection back to this same server, used to emit events.
 var socket = ioClient.connect('http://localhost:4200');
 // GET /api: emits a 'messages' event and answers with JSON.
 router.get('/', function(req, res) {
 //logic for finding data changes
      socket.emit('messages', 'New data available for update');
      res.json({ message: 'hooray! welcome to our api!' });
 });
 app.use('/api', router);
 server.listen(4200);

That's it; the app is ready to run.
Step 1
 $ node app.js  

The above command starts the server, which is then available to clients on localhost, port 4200.
Step 2.
Open a browser and load the page at 'http://localhost:4200/' (app.js serves index.html at the root path).
It displays the default message 'Welcome Hello world'.

Now open another browser window and call the API to notify index.html that new data is available:
'http://localhost:4200/api'.

Now, the index.html will be updated with 'New data available for update'.

Calling 'http://localhost:4200/api' emits the 'messages' event:
 socket.emit('messages', 'New data available for update');  

and server
 client.on('messages', function(data) {
        // io.emit reaches every connected client; client.emit would only
        // reply to the socket that sent 'messages'.
        io.emit('broad', data);
    });
listens for the 'messages' event and, in turn, emits the 'broad' event.

Index.html is subscribed to 'broad' event,
 socket.on('broad', function(data) {  
           self.data = data;  
 });  
The above handler listens for the 'broad' event and updates the controller variable, which is then reflected in the view.

So when index.html receives the notification of new data from the data watcher, the client can request a data refresh.

Thursday, 29 June 2017

MongoDB - Using Aggregate and Text Search to compare and map records from two different dataset collections

Recently I had a scenario where I needed to compare two different data sets to find matching records based on a text field. The challenge is that exact matching cannot be used.

For Example
DataSet1 contains a record with field as
'Name : ABC Company'

DataSet2 may contain records such as
Name : 'ABC Company Ltd.'

So I decided to find the matches using a ranking system.
Take every record in CUSTOMER1, compare it with the CUSTOMER2 records and score each match. If the score is greater than 0.75, attach the CUSTOMER2 record, with its score, to the CUSTOMER1 record. Then pick the record with the highest score.

Step 1.
Import both data sets into MongoDB as two different collections, CUSTOMER1 and CUSTOMER2.
CUSTOMER1 data Sample
{
    "CUSTOMER1NAME": "ABC COMPANY",
    "CUSTOMER1ID": "CRM10003"
}


CUSTOMER2 data sample
{
  "CUSTOMER2NAME": "ABC COMPANY LTD.",
  "CUSTOMER2ID": "ORAXYZ100"
},
{
  "CUSTOMER2NAME": "XYZ Logistics",
  "CUSTOMER2ID": "ORAXYZ10333"
},
{
  "CUSTOMER2NAME": "ABC COMPANY (INDIA) LIMITED.",
  "CUSTOMER2ID": "ORAXYZ10022"
},
{
  "CUSTOMER2NAME": "EXCEL WORKS",
  "CUSTOMER2ID": "ORAXYZ10032"
}
 

Step 2.
Since we are going to use the aggregate function with the $text operator, it is necessary to create a text index on the CUSTOMER2 collection.

// Text index named "TextIndex" on the CUSTOMER2NAME field of CUSTOMER2.
db.CUSTOMER2.createIndex({ "CUSTOMER2NAME": "text" }, { name: "TextIndex" })
 
Step 3.
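The script below compares every record in CUSTOMER1 with the CUSTOMER2 records using the aggregate function and scores each match. CUSTOMER2 records scoring greater than 0.75 are stored on the CUSTOMER1 record as the match property, and the result is inserted into the CUSTOMERMAPPING collection.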

  
 // For every CUSTOMER1 record, text-search CUSTOMER2 by name, attach the
 // candidates scoring above 0.75 as a "match" array, and insert the record
 // into CUSTOMERMAPPING.
 db.getCollection('CUSTOMER1').find({}).noCursorTimeout().forEach(function(item) {
        var customername = item.CUSTOMER1NAME;
        var matches = [];
        // Skip the search when the record has no usable name; $search expects
        // a string, and one bad record should not abort the whole run.
        if (typeof customername === 'string' && customername.length > 0) {
            // toArray() drains the whole cursor. The internal _batch property
            // only holds the batch fetched so far, so larger result sets
            // could be cut short.
            matches = db.getCollection('CUSTOMER2').aggregate([
                { $match: { $text: { $search: customername, $diacriticSensitive: true } } },
                { $project: { CUSTOMER2NAME: 1, CUSTOMER2ID: 1, _id: 0, score: { $meta: "textScore" } } },
                { $match: { score: { $gt: 0.75 } } }
            ]).toArray();
        }
        if (matches.length > 0) {
            item.match = matches;
        }
        // Records without any match are still written, just without "match".
        db.CUSTOMERMAPPING.insert(item);
 })
Output of the above script will be like
  {
    "_id" : ObjectId("59540979a2109c38f5ca7112"),
    "CUSTOMER1NAME" : "ABC COMPANY",
    "CUSTOMER1ID" : "CRM10003",
    "match" : [ 
        {
            "CUSTOMER2NAME" : "ABC COMPANY (INDIA) LIMITED.",
            "CUSTOMER2ID" : "ORAXYZ10022",
            "score" : 1.25
        }, 
        {
            "CUSTOMER2NAME" : "ABC COMPANY LTD.",
            "CUSTOMER2ID" : "ORAXYZ100",
            "score" : 1.33333333333333
        }
    ]
}
 
The result holds the matching CUSTOMER2 records, each with its score property.

Step 4.
The next step is to take the highest-scoring record from the matches.
 
// Stage 1: one document per candidate in the "match" array.
var unwindMatches = { "$unwind": "$match" };

// Stage 2: regroup per CUSTOMER1ID, recording the best score and collecting
// every candidate into "matchgrp".
var groupByCustomer = {
    "$group": {
        "_id": "$CUSTOMER1ID",
        "CUSTOMER1NAME": { "$first": "$CUSTOMER1NAME" },
        "maxScore": { "$max": "$match.score" },
        "matchgrp": {
            "$push": {
                "account_name": "$match.CUSTOMER2NAME",
                "acount_number": "$match.CUSTOMER2ID",
                "score": "$match.score"
            }
        }
    }
};

// Stage 3: keep only the candidates whose score equals maxScore. $map turns
// every other candidate into false, and $setDifference removes those entries.
var keepTopMatch = {
    "$project": {
        "_id": "$_id",
        "CUSTOMER1NAME": "$CUSTOMER1NAME",
        "topmatch": {
            "$setDifference": [
                {
                    "$map": {
                        "input": "$matchgrp",
                        "as": "matched",
                        "in": {
                            "$cond": [
                                { "$eq": ["$maxScore", "$$matched.score"] },
                                "$$matched",
                                false
                            ]
                        }
                    }
                },
                [false]
            ]
        }
    }
};

// Stage 4: write the pipeline output to the HIGHSCOREMATCH collection.
var writeResult = { "$out": "HIGHSCOREMATCH" };

db.getCollection('CUSTOMERMAPPING').aggregate(
    [unwindMatches, groupByCustomer, keepTopMatch, writeResult],
    { allowDiskUse: true }
)
 
$out writes the result to a new collection, 'HIGHSCOREMATCH':
  {
    "_id" : "CRM10003",
    "CUSTOMER1NAME" : "ABC COMPANY",
    "topmatch" : [ 
        {
            "account_name" : "ABC COMPANY LTD.",
            "acount_number" : "ORAXYZ100",
            "score" : 1.33333333333333
        }
    ]
}
 
The best-matching record from CUSTOMER2 is mapped to each CUSTOMER1 record.
Even though the match will not be 100% accurate, it reduces the time spent on manual mapping.
The final output was exported to Excel, and a business user reviewed it and fixed the wrong matches. This saved us a lot of time compared with manually searching and mapping millions of records.

Thanks