-
Notifications
You must be signed in to change notification settings - Fork 11
/
index.js
303 lines (272 loc) · 10.1 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
"use strict";
let leoconfig = require("leo-config");
let ls = require("./lib/stream/leo-stream");
let logging = require("./lib/logging.js");
let LeoConfiguration = require("./lib/configuration.js");
let aws = require("./lib/leo-aws");
const fs = require("fs");
const ini = require('ini');
const { default: Configuration } = require("./lib/rstreams-configuration");
const { promisify } = require("util");
const execSync = require("child_process").execSync;
const ConfigProviderChain = require("./lib/rstreams-config-provider-chain").ConfigProviderChain;
const mockWrapper = require("./lib/mock-wrapper");
const leologger = require("leo-logger")("sdk");
/**
 * Creates an RStreams SDK instance.
 *
 * Flexible call signatures (arguments are shifted left when `id` is not a string):
 *  - SDK(id, data, awsResourceConfig)
 *  - SDK(data, awsResourceConfig)   — id falls back to data.id or "default_bot"
 *  - SDK(resourceConfig)            — when the single object only carries per-service
 *                                     AWS configs (dynamodbConfig/s3Config/firehoseConfig/kinesisConfig)
 *  - SDK(false)                     — default-import mode: resolve config via the provider
 *                                     chain and silently ignore resolution failures
 *
 * @param {string|Object|false|null} id - Bot id, or (when not a string) the config object itself.
 * @param {Object|Configuration|false} [data] - Bus configuration, a Configuration instance,
 *   or null/false to resolve via the ConfigProviderChain.
 * @param {Object} [awsResourceConfig] - Per-service AWS client configuration overrides.
 * @returns {Function|Object} The SDK surface object. In default-import mode (SDK(false))
 *   the returned value is itself callable and constructs fresh SDK instances.
 * @throws Re-throws configuration-resolution errors for explicit construction
 *   (i.e. anything other than the SDK(false) default import).
 */
function SDK(id, data, awsResourceConfig) {
	// Argument shifting: first argument was the config object, not a string id.
	if (typeof id !== "string" && id != null) {
		awsResourceConfig = data;
		data = id;
		id = data.id || "default_bot";
	}
	// If the lone object only contains per-service AWS configs, treat it as
	// awsResourceConfig and leave the bus config to be resolved from the chain.
	if (awsResourceConfig == null && data &&
		(data.dynamodbConfig || data.s3Config || data.firehoseConfig || data.kinesisConfig)) {
		awsResourceConfig = data;
		data = null;
	}
	// Remember the caller-supplied value: `false` marks the default module import,
	// which both suppresses resolution errors and makes the return value callable.
	let dataOrig = data;
	if (data == null || data === false || data instanceof Configuration) {
		let chain = data || new ConfigProviderChain();
		try {
			data = chain.resolveSync();
		} catch (err) {
			data = dataOrig;
			if (data !== false) {
				// This was a request using new sdk(), not the default import, so throw the error
				throw err;
			}
			// Ignore errors because this is just trying to find the defaults
		}
	}
	// if (data.assumeRole) {
	// const cred = await new AWS.STS({}).assumeRole({
	// RoleArn: data.assumeRole,
	// RoleSessionName: process.env.AWS_LAMBDA_FUNCTION_NAME || uuid.v4()
	// }).promise();
	// busConfig.credentials = sts.credentialsFrom(cred);
	// }
	let configuration = new LeoConfiguration(data);
	configuration.awsResourceConfig = awsResourceConfig || {};
	// leo-config's leoaws settings take precedence over the resolved configuration.
	let awsConfig = leoconfig.leoaws || configuration.aws;
	if (awsConfig.profile) {
		let profile = awsConfig.profile;
		// HOME on unix, HOMEPATH on Windows.
		let configFile = `${process.env.HOME || process.env.HOMEPATH}/.aws/config`;
		if (fs.existsSync(configFile)) {
			let config = ini.parse(fs.readFileSync(configFile, 'utf-8'));
			let p = config[`profile ${profile}`];
			if (p && p.mfa_serial) {
				// MFA profile: reuse the AWS CLI's cached STS session if it is still valid.
				// NOTE(review): assumes p.role_arn is set whenever mfa_serial is — TODO confirm.
				let cacheFile = `${process.env.HOME || process.env.HOMEPATH}/.aws/cli/cache/${profile}--${p.role_arn.replace(/:/g, '_').replace(/[^A-Za-z0-9\-_]/g, '-')}.json`;
				let data = {};
				try {
					data = JSON.parse(fs.readFileSync(cacheFile));
				} catch (e) {
					// Ignore error, refresh credentials below
					data = {};
				} finally {
					console.log("Using cached AWS credentials", profile);
					// Missing or expired cache: shell out to the AWS CLI to refresh it,
					// then re-read the cache file it writes.
					if (!data.Credentials || new Date() >= new Date(data.Credentials.Expiration)) {
						execSync('aws sts get-caller-identity --duration-seconds 28800 --profile ' + profile);
						data = JSON.parse(fs.readFileSync(cacheFile));
					}
				}
				configuration.credentials = new aws.STS().credentialsFrom(data, data);
			} else {
				// Non-MFA profile: standard shared-ini credentials.
				console.log("Switching AWS Profile", profile);
				configuration.credentials = new aws.SharedIniFileCredentials(awsConfig);
			}
		} else {
			// No ~/.aws/config file; fall back to shared-ini credential lookup.
			console.log("Switching AWS Profile", awsConfig.profile);
			configuration.credentials = new aws.SharedIniFileCredentials(awsConfig);
		}
	}
	// Optional logging hook, enabled via data.logging; its lifecycle ends in destroy().
	let logger = null;
	if (data && data.logging) {
		logger = logging(id, configuration);
	}
	let leoStream = ls(configuration);
	// Test hook: replace the real stream transport with mock implementations.
	if (process.env.RSTREAMS_MOCK_DATA) {
		mockWrapper.default(leoStream);
	}
	// Only make this a function if it is the default loader
	// Otherwise use an {} as the base
	return Object.assign(dataOrig === false ? function(id, data) {
		return new SDK(id, data);
	} : {}, {
		RStreamsSdk: SDK,
		configuration: configuration,
		// Flush and close the logger, if one was created.
		destroy: (callback) => {
			if (logger) {
				logger.end(callback);
			}
		},
		/**
		 * Stream for writing events to a queue
		 * @param {string} id - The id of the bot
		 * @param {string} outQueue - The queue into which events will be written
		 * @param {Object} config - An object that contains config values that control the flow of events to outQueue
		 * @return {stream} Stream
		 */
		load: leoStream.load,
		/**
		 * Process events from a queue.
		 * @param {Object} opts
		 * @param {string} opts.id - The id of the bot
		 * @param {string} opts.inQueue - The queue from which events will be read
		 * @param {Object} opts.config - An object that contains config values that control the flow of events from inQueue
		 * @param {function} opts.batch - A function to batch data from inQueue (optional)
		 * @param {function} opts.each - A function to transform data from inQueue or from batch function, and offload from the platform
		 * @param {function} callback - A function called when all events have been processed. (payload, metadata, done) => { }
		 * @return {stream} Stream
		 */
		offload: leoStream.offload,
		/**
		 * Process events from a queue (promise form of offload).
		 * @param {Object} opts
		 * @param {string} opts.id - The id of the bot
		 * @param {string} opts.inQueue - The queue from which events will be read
		 * @param {Object} opts.config - An object that contains config values that control the flow of events from inQueue
		 * @param {function} opts.batch - A function to batch data from inQueue (optional)
		 * @param {function} opts.each - A function to transform data from inQueue or from batch function, and offload from the platform
		 * @return {Promise<void>}
		 */
		offloadEvents: promisify(leoStream.offload).bind(leoStream),
		/**
		 * Enrich events from one queue to another.
		 * @param {Object} opts
		 * @param {string} opts.id - The id of the bot
		 * @param {string} opts.inQueue - The queue from which events will be read
		 * @param {string} opts.outQueue - The queue into which events will be written
		 * @param {Object} opts.config - An object that contains config values that control the flow of events from inQueue and to outQueue
		 * @param {function} opts.transform - A function to transform data from inQueue to outQueue
		 * @param {function} callback - A function called when all events have been processed. (payload, metadata, done) => { }
		 * @return {stream} Stream
		 */
		enrich: leoStream.enrich,
		/**
		 * Enrich events from one queue to another (promise form of enrich).
		 * @param {Object} opts
		 * @param {string} opts.id - The id of the bot
		 * @param {string} opts.inQueue - The queue from which events will be read
		 * @param {string} opts.outQueue - The queue into which events will be written
		 * @param {Object} opts.config - An object that contains config values that control the flow of events from inQueue and to outQueue
		 * @param {function} opts.transform - A function to transform data from inQueue to outQueue
		 * @return {Promise<void>}
		 */
		enrichEvents: promisify(leoStream.enrich).bind(leoStream),
		// Low-level read/write streams on the bus.
		read: leoStream.fromLeo,
		write: leoStream.toLeo,
		/**
		 * Write a single payload to a queue and flush immediately (records: 1).
		 * @param {string} bot_id - The id of the bot
		 * @param {string} queue - The destination queue
		 * @param {Object} payload - The event payload to write
		 * @param {function} callback - Called when the write stream has ended
		 */
		put: function(bot_id, queue, payload, callback) {
			let stream = this.load(bot_id, queue, {
				kinesis: {
					records: 1
				}
			});
			stream.write(payload);
			stream.end(callback);
		},
		// Promise form of put.
		putEvent: function(bot_id, queue, payload) {
			return promisify(this.put).call(this, bot_id, queue, payload);
		},
		throughAsync: leoStream.throughAsync,
		checkpoint: leoStream.toCheckpoint,
		streams: leoStream,
		bot: leoStream.cron,
		// Pre-configured AWS service clients sharing this SDK's region/credentials.
		aws: {
			dynamodb: leoStream.dynamodb,
			s3: leoStream.s3,
			kinesis: leoStream.kinesis,
			firehose: leoStream.firehose,
			cloudformation: new aws.CloudFormation({
				region: configuration.aws.region,
				credentials: configuration.credentials
			})
		},
		/**
		 * Create a readable object stream fed by repeatedly calling an async producer.
		 * @param {function} fn - async (state) => records[]; called until it returns
		 *   null/empty, the record limit is reached, or the timeout fires.
		 * @param {Object} [opts] - { records: max records to emit (default Infinity),
		 *   milliseconds: max run time before the stream is ended (default none) }
		 * @param {Object} [state] - Mutable state object passed to every fn() call.
		 * @return {stream} A pass-through object stream emitting fn's records.
		 */
		createSource: function(fn, opts = {}, state = {}) {
			let log = leologger.sub("CreateSource");
			// Set default option values
			opts = Object.assign({
				records: Number.POSITIVE_INFINITY,
				milliseconds: undefined
			}, opts);
			// Counter/Timers
			let startTime = Date.now();
			let lastStart = startTime;
			let totalRecords = 0;
			// Stream pass through - This is the returned object
			let pass = this.streams.passThrough({ objectMode: true });
			// Setup a timeout if requested
			let timeout;
			if (opts.milliseconds != null && opts.milliseconds > 0) {
				timeout = setTimeout(() => {
					if (!pass.isEnding) {
						log.debug('Requested timeout ms hit. Ending');
						pass.end();
					}
				}, opts.milliseconds);
			}
			// Override stream end to cleanup timers
			// and protect against duplicate calls
			pass.isEnding = false;
			pass.orig_end = pass.end;
			pass.end = function() {
				log.debug('Pass.end Called');
				if (!pass.isEnding) {
					pass.isEnding = true;
					timeout && clearTimeout(timeout);
					pass.orig_end();
				}
			};
			// Convenience method for async writing with backpressure
			pass.throttleWrite = function(data) {
				return new Promise((resolve) => {
					if (!pass.write(data)) {
						// Buffer full: wait for the stream to drain before resuming.
						pass.once('drain', () => {
							resolve();
						});
					} else {
						resolve();
					}
				});
			};
			// Generator to poll for more data
			async function* poller() {
				// Get the initial set of data to stream
				let records = await fn(state);
				// Loop yielding and fetching records until
				// 1) There are no more records
				// 2) Time runs out
				// 3) We have yielded the requested number of records
				outerLoop:
				while ((records != null && records.length > 0) && opts.records > totalRecords && !pass.isEnding) {
					for (const hit of records) {
						totalRecords++;
						// send the results back to the caller and wait to be resumed
						// that's why this is a generator function (function*)
						yield hit;
						// Break out of the current batch because we hit
						// an end condition
						if (opts.records <= totalRecords || pass.isEnding) {
							break outerLoop;
						}
					}
					log.debug(`Batch Records: ${records.length}, Percent: ${totalRecords}/${opts.records}, Total Duration: ${Date.now() - startTime}, Batch Duration ${Date.now() - lastStart}`);
					lastStart = Date.now();
					// Get the next set of records
					records = await fn(state);
				}
			}
			// Async function to query and write data to the stream
			let run = (async function() {
				for await (const data of poller()) {
					await pass.throttleWrite(data);
				}
			});
			// Start running the async function with hooks to pass along errors
			// and end the pass through
			run()
				.then(() => pass.end())
				.catch(err => pass.emit('error', err));
			return pass;
		}
	});
}
// Default export: a callable SDK instance built in default-import mode
// (config-resolution errors are swallowed; call it to build explicit instances).
module.exports = new SDK(false);