Skip to content

Commit

Permalink
Merge pull request #1 from JarvusInnovations/develop
Browse files Browse the repository at this point in the history
Version v0.9.0 Release
  • Loading branch information
jmealo committed Feb 13, 2016
2 parents f376a19 + f6b0188 commit 5a1573e
Show file tree
Hide file tree
Showing 5 changed files with 525 additions and 182 deletions.
24 changes: 21 additions & 3 deletions node-services/spark-realtime/config.json
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,30 @@
{
"port": 8094,
"schema": "mta-staging",
"nats": "nats://10.240.103.217:4222",
"nats": "nats://spark:VRK5wReendhh1cmMZJoZm0F0y22E3@10.128.161.19:4222",
"ignore": [
"sessions",
"learn_playlist_cache"
],
"broadcast": true
}
]
}
],

"middleware": {

"load_order": [
"session",
"stats"
],

"session": {
"sessionHeaderName": "x-nginx-session",
"requiredKeys": [
"userId",
"firstName",
"lastName",
"email"
]
}
}
}
9 changes: 9 additions & 0 deletions node-services/spark-realtime/middleware/index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
// Auto-loader
require('fs').readdirSync(__dirname + '/').forEach(function(file) {
var ext = file.match(/\.js(on)?$/);

if (ext && file !== 'index.js') {
var name = file.replace(ext[0], '');
exports[name] = require('./' + file);
}
});
72 changes: 72 additions & 0 deletions node-services/spark-realtime/middleware/session.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
'use strict';

function ioSession(options) {
options = Object.assign({
sessionHeaderName: 'session',
requiredKeys: null,
validationFn: null,
requireSession: true,
defaultSession: null
}, options || {});

if (options.validationFn !== null && typeof options.validationFn !== 'function') {
throw new Error('validationFn must be a function');
}

if (options.requiredKeys !== null && !Array.isArray(options.requiredKeys)) {
throw new Error('requiredKeys must be an array or null');
}

if (!options.requireSession && options.requiredKeys !== null) {
throw new Error('You cannot provide requiredKeys if requireSession is false');
}

return function handleSession(socket, next) {
var sessionHeaderName = options.sessionHeaderName,
requiredKeys = options.requiredKeys,
session = socket.request.headers[sessionHeaderName];

if (options.requireSession) {
if (session === undefined) {
return next(new Error(`Session header (${sessionHeaderName}) is missing from the request.`));
}

try {
session = JSON.parse(session);
} catch (e) {
return next(e);
}

// Verify that all required keys are present
if (requiredKeys) {
let missingKeys = requiredKeys.filter(key => session[key] === undefined);

if (missingKeys.length > 0) {
return next(new Error(`Session is missing required key(s): ${missingKeys.join(', ')}`));
}
}
}

// Perform custom validation
if (options.validationFn) {
let error = options.validationFn(session, socket, next);

if (error) {
return next(error);
}
}

socket.session = session || options.defaultSession;

socket.isStudent = socket.session.accountLevel === 'Student';
socket.isTeacher = !socket.isStudent;

console.log(session);

socket.join('user:' + session.userId);

return next();
};
}

module.exports = ioSession;
186 changes: 186 additions & 0 deletions node-services/spark-realtime/middleware/stats.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
'use strict';

function ioStats(options) {
options = options || {};

function SimpleMetric(name) {
Object.defineProperty(this, 'name', {
enumerable: false,
configurable: false,
writable: false,
value: name
});

this.current = 0;
this.min = 0;
this.max = 0;
this.change_timestamp = null;
this.since_timestamp = new Date();
this.min_timestamp = null;
this.max_timestamp = null;
}

SimpleMetric.prototype.increment = function increment() {
this.set(++this.current);
};

SimpleMetric.prototype.decrement = function decrement() {
this.set(--this.current);
};

SimpleMetric.prototype.set = function set(val) {
var now = new Date();

if (this.current !== val) {
this.change_timestamp = now;
}

this.current = val;

if (this.current > this.max) {
this.max = this.current;
this.max_timestamp = now;
} else if (this.current < this.min || !this.min_timestamp) {
this.min = this.current;
this.min_timestamp = now;
}
};

function AdvancedMetric(name, subtypes, total) {
var self = this;

Object.defineProperty(this, 'name', {
enumerable: false,
configurable: false,
writable: false,
value: name
});

if (total !== false) {
this.total = new SimpleMetric('total');
}

this.metrics = subtypes.map(function(subtype) {
var metric = new SimpleMetric(subtype);
self[subtype] = metric;

if (total !== false) {
metric.set = function set(val) {
self.total.set(subtypes.reduce(function(prev, current) {
return prev + self[current].current;
}, 0));

this.__proto__.set.call(this, val);
};
}

return metric;
});
}

AdvancedMetric.prototype.set = function set(values) {
for (var metric in values) {
this[metric].set(values[metric]);
}
};

AdvancedMetric.prototype.toJSON = function() {
var obj = {};

this.metrics.forEach(metric => obj[metric.name] = metric);

if (obj.total) {
obj.total = this.total;
}

return obj;
};

var aggregates = {
connections: new SimpleMetric('connections'),
subscriptions: new SimpleMetric('subscriptions'),
incoming: new SimpleMetric('incoming'),
outgoing: new AdvancedMetric('outgoing', ['broadcast', 'identified', 'unidentified']),
nats: new AdvancedMetric('nats', ['processed', 'dropped', 'ignored']),
users: new AdvancedMetric('users', ['online', 'offline']),
memory_usage: new AdvancedMetric('memory_usage', ['heapUsed', 'heapTotal', 'rss'], false)
},
users = {},
sections = {};

function disconnectHandler () {
aggregates.connections.set(this.server.sockets.sockets.length);

this.stats.connections.decrement();
this.stats.last_seen = new Date();

if (this.stats.connections.current === 0) {
aggregates.users.online.decrement();
aggregates.users.offline.increment();
}
}

if (options.httpServer) {
options.httpServer.on('request', function statsRequestHandler(req, res) {
switch (req.url) {
case '/stats':
case '/healthcheck':
aggregates.connections.set(global.io.sockets.sockets.length);
aggregates.memory_usage.set(process.memoryUsage());
res.setHeader('Content-Type', 'application/json');
let body = JSON.stringify(aggregates, null, '\t');
res.end(body);
break;
default:
res.statusCode = 404;
return res.end('File not found');
}
});
}

global.stats = {
aggregates: aggregates,
users: users,
sections: sections
};

return function statsCollector(socket, next) {
try {
var session = socket.session;

if (!session || !session.userId) {
return next();
}

users[session.userId] || (users[session.userId] = {
connections: new SimpleMetric('connections'),
subscriptions: new SimpleMetric('subscriptions')
});

socket.stats = users[session.userId];

aggregates.connections.set(socket.server.sockets.sockets.length);

if (socket.stats.connections.current === 0) {
aggregates.users.online.increment();

if (socket.stats.last_seen) {
aggregates.users.offline.decrement();
}
}

socket.stats.last_seen = new Date();
socket.stats.connections.increment();

socket.on('disconnect', disconnectHandler);
socket.on('error', disconnectHandler);

return next();
} catch (e) {
console.error(e);
throw e;
}
};
}

module.exports = ioStats;
Loading

0 comments on commit 5a1573e

Please sign in to comment.