Skip to content

Commit

Permalink
Add test cases for new queueing mechanism
Browse files Browse the repository at this point in the history
  • Loading branch information
bpred754 committed Sep 27, 2016
1 parent eed6cb7 commit 5cdffed
Show file tree
Hide file tree
Showing 33 changed files with 1,527 additions and 663 deletions.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "augeo",
"version": "0.3.4",
"version": "0.4.0",
"description": "Web application written with the MEAN stack that uses Natural Language Processing to classify a user's internet activity into different skills. In a nutshell, Augeo is the gamification of life.",
"author": "Brian Redd <[email protected]> (http://brianredd.com)",
"license": "GPL-3.0",
Expand Down
2 changes: 1 addition & 1 deletion src/api/twitter-api.js
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@
var mentionTask = new TwitterMentionTask(updatedUser, userData, null, logData);
QueueService.mentionEventQueue.addTask(mentionTask, logData);

QueueService.twitterConnectQueue.connectToTwitter(logData);
QueueService.twitterConnectQueue.connectToTwitter(logData, function(){});
}

// Set user's Twitter session data
Expand Down
42 changes: 24 additions & 18 deletions src/model/schema/augeo/user.js
Original file line number Diff line number Diff line change
Expand Up @@ -503,27 +503,33 @@

if(error) {
log.functionError(COLLECTION, 'updateSkillData', logData.parentProcess, logData.username,
'Failed to find user with ID: ' + id + '. Error: ' + error);
'Error when trying to find user with ID: ' + id + '. Error: ' + error);
} else {
log.functionCall(COLLECTION, 'updateSkillData', logData.parentProcess, logData.username, {'id':id});

doc.skill.experience += experience.mainSkillExperience;
doc.skill.level = AugeoUtility.calculateLevel(doc.skill.experience, logData);
doc.subSkills.forEach(function(subSkill){
subSkill.experience += experience.subSkillsExperience[subSkill.name];
subSkill.level = AugeoUtility.calculateLevel(subSkill.experience, logData);
});
doc.save(function(error) {
if(error) {
log.functionError(COLLECTION, 'updateSkillData', logData.parentProcess, logData.username,
'Failed to save experience. Error: ' + error);
} else {
log.functionCall(COLLECTION, 'updateSkillData', logData.parentProcess, logData.username,
{'mainSkillExperience':(experience)?experience.mainSkillExperience:'invalid',
'subSkillExperience':(experience && experience.subSkillsExperience)?'defined':'invalid'});
callback();
}
});
if(doc) {
doc.skill.experience += experience.mainSkillExperience;
doc.skill.level = AugeoUtility.calculateLevel(doc.skill.experience, logData);
doc.subSkills.forEach(function(subSkill){
subSkill.experience += experience.subSkillsExperience[subSkill.name];
subSkill.level = AugeoUtility.calculateLevel(subSkill.experience, logData);
});
doc.save(function(error) {
if(error) {
log.functionError(COLLECTION, 'updateSkillData', logData.parentProcess, logData.username,
'Failed to save experience. Error: ' + error);
} else {
log.functionCall(COLLECTION, 'updateSkillData', logData.parentProcess, logData.username,
{'mainSkillExperience':(experience)?experience.mainSkillExperience:'invalid',
'subSkillExperience':(experience && experience.subSkillsExperience)?'defined':'invalid'});
callback();
}
});
} else {
log.functionError(COLLECTION, 'updateSkillData', logData.parentProcess, logData.username,
'Failed to find user with ID: ' + id + '.');
callback();
}
}
});
};
Expand Down
1 change: 1 addition & 0 deletions src/queue-task/github/github-event-task.js
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@

task.path = result.path;

// Note the lastEventId if it's the last request
if(!task.path) {
if(task.commits.length > 0) {
task.lastEventId = task.commits[0].eventId;
Expand Down
2 changes: 0 additions & 2 deletions src/queue-task/twitter/event/twitter-tweet-task.js
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,6 @@
$this.base.constructor.call(this, user, twitterData, lastEventId, logData);

// public variables
this.lastEventId = lastEventId;
this.twitterMessenger = TwitterInterfaceService.createTwitterMessenger(twitterData.accessToken, twitterData.secretAccessToken, logData);
this.wait = $this.TWEET_TIMEOUT;
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@
execute: function(logData, callback) {
log.functionCall(TASK, 'execute', logData.parentProcess, logData.username);

TwitterService.removeTweet(this.data.status, logData, callback);
TwitterService.removeTweet(this.data, logData, callback);
}
});

Expand Down
19 changes: 10 additions & 9 deletions src/queue/base-queue.js
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@
} else {
var taskPosition = this.queue.getUserTaskPosition(userId);
if(taskPosition != -1) {
waitTime = this.maxTaskExecutionTime * (taskPosition + 1);
waitTime = this.maxTaskExecutionTime * (taskPosition + 2);
}
}
return waitTime;
Expand All @@ -68,7 +68,7 @@
log.functionCall(this.QUEUE, 'getWaitTime', logData.parentProcess, logData.username);

var queueNumber = this.queue.tasks.length;
if(this.currentTask) {
if(this.currentTask && this.currentTask.user) {
queueNumber++;
}

Expand Down Expand Up @@ -108,16 +108,17 @@

self.isBusy = true;
task.execute(logData, function(executeData) {
self.finishTask(executeData, logData);
self.finishTask(executeData, logData, function() {

if(self.queue.tasks.length == 0) {
self.isBusy = false;
}
if(self.queue.tasks.length == 0) {
self.isBusy = false;
}

// Reset current task
self.currentTask = {};
// Reset current task
self.currentTask = {};

callback();
callback();
});
});
});
});
Expand Down
18 changes: 14 additions & 4 deletions src/queue/github-event-queue.js
Original file line number Diff line number Diff line change
Expand Up @@ -47,15 +47,15 @@

AbstractObject.extend(BaseQueue, $this, {

addAllUsers: function(logData){
addAllUsers: function(logData, callback){
log.functionCall(this.QUEUE, 'addAllUsers', logData.parentProcess, logData.username);

var self = this;
GithubService.loopThroughUsersQueueData(logData, function(queueData) {
var user = queueData.user;
var task = new GithubQueueTask(user.augeoUser, user, queueData.eventId, logData);
self.addTask(task, logData)
});
}, callback);
},

addTask: function(task, logData) {
Expand All @@ -76,12 +76,12 @@
queue.insert(index, task, function () {});
}
} else {
log.functionCall(QUEUE, 'addTask', logData.parentProcess, logData.username, {'task.screenName': (task) ? task.screenName : 'invalid'}, 'User already on queue');
log.functionCall(this.QUEUE, 'addTask', logData.parentProcess, logData.username, {'task.screenName': (task) ? task.screenName : 'invalid'}, 'User already on queue');
}
});
},

finishTask: function(task, logData) {
finishTask: function(task, logData, callback) {

// Update poll time for next request
pollTime = task.poll;
Expand All @@ -93,10 +93,12 @@
var finalize = function() {
task.reset(logData);
queue.push(task);
callback();
};

if(task.path) {
queue.unshift(task);
callback();
} else {
if(task.commits.length > 0) {
GithubService.addCommits(task.screenName, task.commits, logData, function () {
Expand All @@ -110,10 +112,18 @@
}
},

getPollTime: function() {
return pollTime;
},

prepareTask: function(task) {
if(task.lastEventId) {
this.taskWaitTime = pollTime;
}
},

reset: function() {
this.queue.tasks.length = 0;
}

});
Expand Down
22 changes: 15 additions & 7 deletions src/queue/twitter-connect-queue.js
Original file line number Diff line number Diff line change
Expand Up @@ -50,17 +50,21 @@

AbstractObject.extend(BaseQueue, $this, {

addUsersToEventQueues: function(logData) {
this.mentionQueue.addAllUsers(logData);
this.tweetQueue.addAllUsers(logData);
addUsersToEventQueues: function(logData, callback) {
var self = this;
this.mentionQueue.addAllUsers(logData, function() {
self.tweetQueue.addAllUsers(logData, function() {
callback();
});
});
},

connectToTwitter: function(logData) {
connectToTwitter: function(logData, callback) {
var self = this;

TwitterService.getUsers(logData, function (users) {
if (users.length > 0) {
if (process.env.ENV != 'local') { // Only connect to Twitter's stream API if not in local environment
if (process.env.ENV != 'local' || process.env.TEST == 'true') { // Only connect to Twitter's stream API if not in local environment
var connectTask = new TwitterConnectTask(users, logData,
function (tweetData) { // Callback function for adding activity task to stream queue
var addActivityTask = new TwitterAddActivityTask(tweetData, logData);
Expand All @@ -71,13 +75,17 @@
self.streamQueue.addTask(removeActivityTask, logData);
},
function () { // Callback function for after the connection with Twitter has been made
self.addUsersToEventQueues(logData);
self.addUsersToEventQueues(logData, function(){});
}
);
self.queue.push(connectTask);
callback();
} else {
self.addUsersToEventQueues(logData);
self.addUsersToEventQueues(logData, function(){});
callback();
}
} else {
callback();
}
});
}
Expand Down
11 changes: 7 additions & 4 deletions src/queue/twitter-event-queue.js
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@

AbstractObject.extend(BaseQueue, $this, {

addAllUsers: function(logData){
addAllUsers: function(logData, callback){
log.functionCall(this.QUEUE, 'addAllUsers', logData.parentProcess, logData.username);

var self = this;
Expand All @@ -66,7 +66,7 @@
}

self.addTask(task, logData)
});
}, callback);
},

addTask: function(task, logData) {
Expand All @@ -82,16 +82,19 @@
});
},

finishTask: function(task, logData) {
finishTask: function(task, logData, callback) {
var queue = this.queue;

if(!task.areAllTweetsRetrieved) {
queue.unshift(task);
callback();
} else {
if(task.tweets.length > 0) {
TwitterService.addTweets(task.user._id, task.screenName, task.tweets, this.isMention, logData, function() {
UserService.updateAllRanks(logData, function(){});
UserService.updateAllRanks(logData, callback);
});
} else {
callback();
}
}
}
Expand Down
5 changes: 2 additions & 3 deletions src/queue/twitter-stream-queue.js
Original file line number Diff line number Diff line change
Expand Up @@ -50,15 +50,14 @@
this.queue.push(task, function(){});
},

finishTask: function(classification, logData) {
finishTask: function(classification, logData, callback) {
var self = this;
UserService.updateRanks(logData, function() {
UserService.updateSubSkillRanks(classification, logData, function() {
log.functionCall(self.QUEUE, 'finishTask', logData.parentProcess, logData.username, {}, 'Finished updating ranks');
callback();
});
});

return classification;
},

prepareTask: function() {
Expand Down
4 changes: 3 additions & 1 deletion src/service/github-service.js
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@
});
};

exports.loopThroughUsersQueueData = function(logData, callback) {
exports.loopThroughUsersQueueData = function(logData, callback, finalCallback) {
log.functionCall(SERVICE, 'loopThroughUsersQueueData', logData.parentProcess, logData.username);

GithubUser.getAllUsers(logData, function(users) {
Expand All @@ -110,6 +110,8 @@
i++;
if (i < users.length) {
myClojure(i);
} else {
finalCallback();
}
});
})(0); // Pass i as 0 and myArray to myClojure
Expand Down
4 changes: 2 additions & 2 deletions src/service/queue-service.js
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,12 @@

// Github Queues
exports.githubEventQueue = new GithubEventQueue(logData);
exports.githubEventQueue.addAllUsers(logData);
exports.githubEventQueue.addAllUsers(logData, function(){});

// Twitter Queues
exports.mentionEventQueue = new TwitterEventQueue(logData, true);
exports.tweetEventQueue = new TwitterEventQueue(logData);
exports.twitterStreamQueue = new TwitterStreamQueue(logData);
exports.twitterConnectQueue = new TwitterConnectQueue(exports.tweetEventQueue, exports.mentionEventQueue, exports.twitterStreamQueue, logData);
exports.twitterConnectQueue.connectToTwitter(logData);
exports.twitterConnectQueue.connectToTwitter(logData, function(){});
};
4 changes: 3 additions & 1 deletion src/service/twitter-service.js
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@
});
};

exports.loopThroughUsersQueueData = function(logData, callback) {
exports.loopThroughUsersQueueData = function(logData, callback, finalCallback) {
log.functionCall(SERVICE, 'loopThroughUsersQueueData', logData.parentProcess, logData.username);

TwitterUser.getAllUsers(logData, function(users) {
Expand All @@ -219,6 +219,8 @@
i++;
if (i < users.length) {
myClojure(i);
} else {
finalCallback();
}
});
});
Expand Down
Loading

0 comments on commit 5cdffed

Please sign in to comment.