diff --git a/node-services/spark-realtime/config.json b/node-services/spark-realtime/config.json index b085095af..2ae3573cf 100644 --- a/node-services/spark-realtime/config.json +++ b/node-services/spark-realtime/config.json @@ -50,12 +50,30 @@ { "port": 8094, "schema": "mta-staging", - "nats": "nats://10.240.103.217:4222", + "nats": "nats://spark:VRK5wReendhh1cmMZJoZm0F0y22E3@10.128.161.19:4222", "ignore": [ "sessions", "learn_playlist_cache" ], "broadcast": true } - ] -} \ No newline at end of file + ], + + "middleware": { + + "load_order": [ + "session", + "stats" + ], + + "session": { + "sessionHeaderName": "x-nginx-session", + "requiredKeys": [ + "userId", + "firstName", + "lastName", + "email" + ] + } + } +} diff --git a/node-services/spark-realtime/middleware/index.js b/node-services/spark-realtime/middleware/index.js new file mode 100644 index 000000000..eafeec073 --- /dev/null +++ b/node-services/spark-realtime/middleware/index.js @@ -0,0 +1,9 @@ +// Auto-loader +require('fs').readdirSync(__dirname + '/').forEach(function(file) { + var ext = file.match(/\.js(on)?$/); + + if (ext && file !== 'index.js') { + var name = file.replace(ext[0], ''); + exports[name] = require('./' + file); + } +}); diff --git a/node-services/spark-realtime/middleware/session.js b/node-services/spark-realtime/middleware/session.js new file mode 100644 index 000000000..44a3256a8 --- /dev/null +++ b/node-services/spark-realtime/middleware/session.js @@ -0,0 +1,72 @@ +'use strict'; + +function ioSession(options) { + options = Object.assign({ + sessionHeaderName: 'session', + requiredKeys: null, + validationFn: null, + requireSession: true, + defaultSession: null + }, options || {}); + + if (options.validationFn !== null && typeof options.validationFn !== 'function') { + throw new Error('validationFn must be a function'); + } + + if (options.requiredKeys !== null && !Array.isArray(options.requiredKeys)) { + throw new Error('requiredKeys must be an array or null'); + } + + if (!options.requireSession && options.requiredKeys !== null) { + throw new Error('You cannot provide requiredKeys if requireSession is false'); + } + + return function handleSession(socket, next) { + var sessionHeaderName = options.sessionHeaderName, + requiredKeys = options.requiredKeys, + session = socket.request.headers[sessionHeaderName]; + + if (options.requireSession) { + if (session === undefined) { + return next(new Error(`Session header (${sessionHeaderName}) is missing from the request.`)); + } + + try { + session = JSON.parse(session); + } catch (e) { + return next(e); + } + + // Verify that all required keys are present + if (requiredKeys) { + let missingKeys = requiredKeys.filter(key => session[key] === undefined); + + if (missingKeys.length > 0) { + return next(new Error(`Session is missing required key(s): ${missingKeys.join(', ')}`)); + } + } + } + + // Perform custom validation + if (options.validationFn) { + let error = options.validationFn(session, socket, next); + + if (error) { + return next(error); + } + } + + socket.session = session || options.defaultSession; + + socket.isStudent = socket.session.accountLevel === 'Student'; + socket.isTeacher = !socket.isStudent; + + console.log(session); + + socket.join('user:' + session.userId); + + return next(); + }; +} + +module.exports = ioSession; \ No newline at end of file diff --git a/node-services/spark-realtime/middleware/stats.js b/node-services/spark-realtime/middleware/stats.js new file mode 100644 index 000000000..f6f25ba77 --- /dev/null +++ b/node-services/spark-realtime/middleware/stats.js @@ -0,0 +1,186 @@ +'use strict'; + +function ioStats(options) { + options = options || {}; + + function SimpleMetric(name) { + Object.defineProperty(this, 'name', { + enumerable: false, + configurable: false, + writable: false, + value: name + }); + + this.current = 0; + this.min = 0; + this.max = 0; + this.change_timestamp = null; + this.since_timestamp = new Date(); + this.min_timestamp = null; + this.max_timestamp = null; + } + + SimpleMetric.prototype.increment = function increment() { + this.set(++this.current); + }; + + SimpleMetric.prototype.decrement = function decrement() { + this.set(--this.current); + }; + + SimpleMetric.prototype.set = function set(val) { + var now = new Date(); + + if (this.current !== val) { + this.change_timestamp = now; + } + + this.current = val; + + if (this.current > this.max) { + this.max = this.current; + this.max_timestamp = now; + } else if (this.current < this.min || !this.min_timestamp) { + this.min = this.current; + this.min_timestamp = now; + } + }; + + function AdvancedMetric(name, subtypes, total) { + var self = this; + + Object.defineProperty(this, 'name', { + enumerable: false, + configurable: false, + writable: false, + value: name + }); + + if (total !== false) { + this.total = new SimpleMetric('total'); + } + + this.metrics = subtypes.map(function(subtype) { + var metric = new SimpleMetric(subtype); + self[subtype] = metric; + + if (total !== false) { + metric.set = function set(val) { + self.total.set(subtypes.reduce(function(prev, current) { + return prev + self[current].current; + }, 0)); + + this.__proto__.set.call(this, val); + }; + } + + return metric; + }); + } + + AdvancedMetric.prototype.set = function set(values) { + for (var metric in values) { + this[metric].set(values[metric]); + } + }; + + AdvancedMetric.prototype.toJSON = function() { + var obj = {}; + + this.metrics.forEach(metric => obj[metric.name] = metric); + + if (obj.total) { + obj.total = this.total; + } + + return obj; + }; + + var aggregates = { + connections: new SimpleMetric('connections'), + subscriptions: new SimpleMetric('subscriptions'), + incoming: new SimpleMetric('incoming'), + outgoing: new AdvancedMetric('outgoing', ['broadcast', 'identified', 'unidentified']), + nats: new AdvancedMetric('nats', ['processed', 'dropped', 'ignored']), + users: new AdvancedMetric('users', ['online', 'offline']), + memory_usage: new AdvancedMetric('memory_usage', ['heapUsed', 'heapTotal', 'rss'], false) + }, + users = {}, + sections = {}; + + function disconnectHandler () { + aggregates.connections.set(this.server.sockets.sockets.length); + + this.stats.connections.decrement(); + this.stats.last_seen = new Date(); + + if (this.stats.connections.current === 0) { + aggregates.users.online.decrement(); + aggregates.users.offline.increment(); + } + } + + if (options.httpServer) { + options.httpServer.on('request', function statsRequestHandler(req, res) { + switch (req.url) { + case '/stats': + case '/healthcheck': + aggregates.connections.set(global.io.sockets.sockets.length); + aggregates.memory_usage.set(process.memoryUsage()); + res.setHeader('Content-Type', 'application/json'); + let body = JSON.stringify(aggregates, null, '\t'); + res.end(body); + break; + default: + res.statusCode = 404; + return res.end('File not found'); + } + }); + } + + global.stats = { + aggregates: aggregates, + users: users, + sections: sections + }; + + return function statsCollector(socket, next) { + try { + var session = socket.session; + + if (!session || !session.userId) { + return next(); + } + + users[session.userId] || (users[session.userId] = { + connections: new SimpleMetric('connections'), + subscriptions: new SimpleMetric('subscriptions') + }); + + socket.stats = users[session.userId]; + + aggregates.connections.set(socket.server.sockets.sockets.length); + + if (socket.stats.connections.current === 0) { + aggregates.users.online.increment(); + + if (socket.stats.last_seen) { + aggregates.users.offline.decrement(); + } + } + + socket.stats.last_seen = new Date(); + socket.stats.connections.increment(); + + socket.on('disconnect', disconnectHandler); + socket.on('error', disconnectHandler); + + return next(); + } catch (e) { + console.error(e); + throw e; + } + }; +} + +module.exports = ioStats; \ No newline at end of file diff --git a/node-services/spark-realtime/worker.js b/node-services/spark-realtime/worker.js index f4cdc6642..112f61347 100644 --- a/node-services/spark-realtime/worker.js +++ b/node-services/spark-realtime/worker.js @@ -1,164 +1,168 @@ +'use strict'; + var config = JSON.parse(process.env.SPARK_REALTIME), + middleware = require('./middleware/index'), + http = require('http'), nats = require('nats').connect(config.nats), - server = require('http').createServer(requestHandler), - io = require('socket.io')(server), - stats = { - connections: { - peak: 0, - current: 0, - last_connection_at: null - }, - - subscriptions: { - peak: 0, - current: 0, - last_subscription_at: null - }, - - incoming: { - total: 0, - last_message_at: null - }, - - outgoing: { - broadcast: 0, - total: 0, - identified: 0, - unidentified: 0, - last_message_at: null - }, - - nats: { - total_messages: 0, - dropped_mesages: 0, - ignored_messages: 0, - last_message_at: null - }, - - users: { - online: 0, - offline: 0 - } - }, + server = http.createServer(), + pg = require('pg').native, + connString, + sparkpoints = {}, sections = {}, - userConnectionCount = {}, - userLastSeen = {}; + sectionPeople = {}, + personSections = {}, + io; -function getOnlineUsers() { - var online = []; +// TODO: Add real logging framework +console._log = console.log; - for (var user in userConnectionCount) { - if (userConnectionCount[user] > 0) { - online.push(user); - } +console.log = function() { + var args = Array.prototype.slice.call(arguments); + args.unshift(`[${config.schema}]`); + console._log.apply(this, args); +}; + +function initDatabase() { + var cfg = config.postgresql; + + if (!cfg) { + throw new Error(`${config.schema} is missing a valid postgresql section from its configuration`); } - return online; -} + let missingKeys = ['database', 'username', 'host', 'password'].filter(key => cfg[key] === undefined); + + if (missingKeys.length > 0) { + throw new Error( + `${config.schema} is missing the following key(s) from its postgresql config: ${missingKeys.join(',')}` + ); + } -function getOfflineUsers() { - var offline = []; + connString = `postgres://${cfg.username}:${cfg.password}@${cfg.host}/${cfg.database}?application_name=spark-realtime`; - for (var user in userConnectionCount) { - if (userConnectionCount[user] <= 0) { - offline.push(user); + // Test connection + pg.connect(connString, function(err, client, done) { + if (err) { + console.error(err); + throw new Error(`Unable to connect to ${cfg.database} PostgreSQL database on ${cfg.host}`); } - } - return offline; + console.log(`Connected to ${cfg.database} PostgreSQL database on ${cfg.host}`); + + let sql = ` + WITH section_people AS ( + SELECT "CourseSectionID" AS section, + json_agg("PersonID") AS people + FROM course_section_participants + GROUP BY "CourseSectionID" + ), person_sections AS ( + SELECT p."ID" AS person, + array_agg("CourseSectionID") AS sections + FROM people p + JOIN course_section_participants csp ON csp."PersonID" = p."ID" + GROUP BY p."ID" + ) + + SELECT json_build_object( + 'sections', (SELECT json_object_agg("ID", "Code") FROM course_sections), + 'sparkpoints', (SELECT json_object_agg(id, code) FROM sparkpoints), + 'section_people', (SELECT json_object_agg(section, people) FROM section_people), + 'person_sections', (SELECT json_object_agg(person, sections) FROM person_sections), + 'counts', json_build_object( + 'sections', (SELECT count(1) FROM course_sections), + 'sparkpoints', (SELECT count(1) FROM sparkpoints), + 'section_people', (SELECT count(1) FROM section_people), + 'person_sections', (SELECT count(1) FROM person_sections) + ) + ) AS lookup; + `; + + client.query(sql, [], function(err, results) { + if (err) { + console.log('Error populating lookup tables'); + throw err; + } + + let lookup = results.rows[0].lookup; + + sparkpoints = lookup.sparkpoints; + sections = lookup.sections; + sectionPeople = lookup.section_people; + personSections = lookup.person_sections; + + for (let entity in lookup.counts) { + console.log(`${lookup.counts[entity].toLocaleString()} entries in ${entity} lookup table`); + } + }); + + initMiddleware(); + }); } -function requestHandler(req, res) { - var healthcheck; - - switch (req.url) { - case '/healthcheck': - stats.connections.current = io.sockets.sockets.length; - stats.memory_usage = process.memoryUsage(); - stats.outgoing.total = (stats.outgoing.identified + stats.outgoing.broadcast); - stats.users.online = getOnlineUsers().length; - stats.users.offline = getOfflineUsers().length; - res.end(JSON.stringify(stats, null, '\t')); - break; - - case '/sections': - res.end(JSON.stringify(sections, null, '\t')); - break; - - case '/users/connection_count': - res.end(JSON.stringify(userConnectionCount, null, '\t')); - break; - - case '/users/last_seen': - res.end(JSON.stringify(userLastSeen, null, '\t')); - break; - - case '/users/online': - res.end(JSON.stringify(getOnlineUsers(), null, '\t')); - break; - - case '/users/offline': - res.end(JSON.stringify(getOfflineUsers(), null, '\t')); - break; - - case '/users': - var users = {}, - connectionCount; - - for (var user in userConnectionCount) { - connectionCount = userConnectionCount[user]; - - users[user] = { - status: connectionCount <= 0 ? 'offline' : 'online', - last_seen: userLastSeen[user], - connections: connectionCount - }; +function query(sql, values, cb) { + pg.connect(connString, function(err, client, done) { + + if (err) { + if (client) { + done(client); } - res.end(JSON.stringify(users, null, '\t')); + return cb && cb(err, null); + } - break; - default: - res.httpStatus = 404; - res.end('File not found'); - } + client.query(sql, values, function(err, result) { + + if (err) { + if (client) { + done(client); + } + + return cb && cb(err, null); + } + + cb && cb(null, result.rows); + + done(client); + }); + }); } -io.use(function (socket, next) { - var session = socket.request.headers['x-nginx-session'], - connectionCount = io.sockets.sockets.length; +function initMiddleware() { + config.middleware = config.middleware || {}; - stats.connections.last_connection_at = new Date(); + config.middleware.stats = config.middleware.stats || {}; + config.middleware.stats.httpServer = server; - if (connectionCount > stats.connections.peak) { - stats.connections.peak = connectionCount; + // We must initialize the middleware prior to initializing socket.io in order to attach any http request handlers + for (var name in middleware) { + middleware[name] = middleware[name](config.middleware[name] || {}); } - if (session) { - session = JSON.parse(session); + io = global.io = require('socket.io')(server); - socket.session = session; - socket.section = null; - socket.student = session.accountLevel === 'Student'; + (config.middleware.load_order || Object.keys(middleware)).forEach(function(name) { + io.use(middleware[name]); + }); - userConnectionCount[socket.session.username] || (userConnectionCount[socket.session.username] = 0); - userConnectionCount[socket.session.username]++; - userLastSeen[socket.session.username] = new Date(); + initNats(); +} + +function initServer() { + io.use(function (socket, next) { + var stats = global.stats, + sections = stats.sections; socket.on('subscribe', function subscribe(data) { console.log(socket.session.username + ' subscribed to ' + data.section); - stats.subscriptions.last_subscription_at = new Date(); - stats.incoming.last_message_at = new Date(); - stats.incoming.total++; + stats.aggregates.subscriptions.increment(); + stats.aggregates.incoming.increment(); - if (++stats.subscriptions.current > stats.subscriptions.peak) { - stats.subscriptions.peak = stats.subscriptions.current; - } + socket.stats.subscriptions.increment(); socket.join('section:' + data.section); socket.section = data.section; - sections[data.section] = sections[data.section] || {teachers: [], students: []}; + + sections[data.section] || (sections[data.section] = { teachers: [], students: [] }); if (socket.student) { if (sections[data.section].students.indexOf(socket.session.userId) === -1) { @@ -170,17 +174,19 @@ io.use(function (socket, next) { }); socket.on('unsubscribe', function unsubscribe(data) { - var idx; - - stats.subscriptions.current--; - stats.incoming.total++; + var sections = global.stats.sections, + idx; console.log(socket.session.username + ' unsubscribed from ' + data.section); + + stats.aggregates.subscriptions.decrement(); + stats.aggregates.incoming.increment(); + socket.leave('section:' + data.section); socket.section = null; - sections[data.section] = sections[data.section] || {teachers: [], students: []}; + sections[data.section] || (sections[data.section] = { teachers: [], students: [] }); - if (socket.student) { + if (socket.isStudent) { idx = sections[data.section].students.indexOf(socket.session.userId); sections[data.section].students = sections[data.section].students.slice(idx, 1); } else { @@ -189,62 +195,114 @@ io.use(function (socket, next) { } }); - function disconnectHandler (socket) { - userConnectionCount[session.username]--; - userLastSeen[session.username] = new Date(); + return next(); + }); + + server.listen(config.port); + + console.log(`Listening on ${config.port}`); +} + +initDatabase(); + +const USER_ID_COLUMNS = [ + 'PersonID', + 'user_id', + 'student_id', + 'teacher_id', + 'person_id', + 'recommender_id', + 'author_id' +]; + +function extractUserIds(data, userIds) { + USER_ID_COLUMNS.forEach(function(key) { + let userId = data[key]; + + if (userId && typeof userId === 'number') { + userIds.add(userId) } + }); +} - socket.on('disconnect', disconnectHandler); - socket.on('error', disconnectHandler); +function initNats() { + nats.subscribe(config.schema + '.>', function (msg) { + var stats = global.stats, + identified = false, + sent = false, + userIds, + data; + + if (!msg) { + stats.aggregates.nats.dropped.increment(); + return; + } - return next(); - } + msg = JSON.parse(msg); - next(new Error('Authentication error')); -}); + if (!msg.item || config.ignore.indexOf(msg.table) !== -1) { + stats.aggregates.nats.ignored.increment(); + return; + } -server.listen(config.port); + // Time to decorate! Let's start with a festive spark point + if (msg.item.section_id && msg.item.section_code === undefined) { + msg.item.section_code = sections[msg.item.section_id]; + } -nats.subscribe(config.schema + '.>', function (msg) { - var data, userId; + if (msg.item.sparkpoint_id && msg.item.sparkpoint_code === undefined) { + msg.item.sparkpoint_code = sparkpoints[msg.item.sparkpoint_id]; + } - stats.nats.total_messages++; - stats.nats.last_message_at = new Date(); + if (config.broadcast) { + stats.aggregates.outgoing.broadcast.increment(); + return io.emit('db', msg); + } - if (!msg) { - stats.nats.dropped_mesages++; - return; - } + userIds = new Set(); - msg = JSON.parse(msg); + extractUserIds(msg.item, userIds); - if (!msg.item || config.ignore.indexOf(msg.table) !== -1) { - stats.nats.ignored_messages++; - return; - } + if (msg.table === 'people') { + userIds.add(msg.item.ID); + } - if (config.broadcast) { - stats.outgoing.last_message_at = new Date(); - stats.outgoing.broadcast++; - return io.emit('db', msg); - } + // IMPORTANT: Right now, only rows with a section_id column are being broadcast to course section participants this + // may or may not be "correct" behavior. If we need to expand this, it's likely that we should add an abstraction + // layer where emitting a student event will relay that to all other course participants (including the teacher) - data = msg.item; - userId = data.PersonID || data.user_id || data.student_id || data.teacher_id || data.person_id; + if (msg.item.section_id) { + if (config.section_broadcast) { + sectionPeople[msg.item.section_id].forEach(function(userId) { + userIds.add(userId); + }); + } else { + identified = true; + sent = true; + io.to('section:' + sections[msg.item.section_id]).emit('db', msg); + } + } - if (msg.table === 'people') { - userId = msg.ID; - } + if (!sent) { + userIds.forEach(function(userId) { + identified = true; + io.to('user:' + userId).emit('db', msg); + }); + } - if (userId) { - stats.outgoing.identified++; - stats.outgoing.last_message_at = new Date(); - io.to('user:' + userId).emit('db', msg); - } else { - stats.outgoing.unidentified++; - console.log('Unable to associate database event with user: ' + msg); - } -}); + if (identified) { + stats.aggregates.outgoing.identified.increment(); + console.log(data); + console.log('Recipients: ', Array.from(userIds).join(', ')); + } else { + stats.aggregates.outgoing.unidentified.increment(); + console.log('Unable to associate database event with user:'); + console.log(msg); + } + }); + + initServer(); +} process.on('uncaughtException', function (err) { var markdown, request, options; @@ -264,11 +322,11 @@ process.on('uncaughtException', function (err) { request(options, function (error, response, body) { if (!error && response.statusCode == 200) { - console.log('Notification successfully send to Slack.'); + console.log('Notification successfully sent to Slack.'); } process.exit(1); }); } else { process.exit(1); } -}); \ No newline at end of file +});