Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: RDO-2-all-errors-to-file-sd-ks #252

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions lib/consumer.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ var __asyncValues = (this && this.__asyncValues) || function (o) {
Object.defineProperty(exports, "__esModule", { value: true });
exports.Consumer = void 0;
const events = require("events");
const errors_1 = require("./errors");
const memphis_1 = require("./memphis");
const message_1 = require("./message");
const utils_1 = require("./utils");
Expand Down Expand Up @@ -85,7 +86,7 @@ class Consumer {
var _a, e_1, _b, _c;
try {
if (batchSize > maxBatchSize || batchSize < 1) {
throw (0, utils_1.MemphisError)(new Error(`Batch size can not be greater than ${maxBatchSize} or less than 1`));
throw errors_1.MemphisErrors.IncorrectBatchSize(maxBatchSize);
}
let streamName = `${this.internalStationName}`;
let stationPartitions = this.connection.stationPartitions.get(this.internalStationName);
Expand All @@ -97,7 +98,7 @@ class Consumer {
}
else if (stationPartitions != null && stationPartitions.length > 0) {
if (consumerPartitionNumber > 0 && consumerPartitionKey != null) {
throw (0, utils_1.MemphisError)(new Error('Can not use both partition number and partition key'));
throw errors_1.MemphisErrors.GivenBothPartitionNumAndKey;
}
if (consumerPartitionKey != null) {
const partitionNumberKey = this.connection._getPartitionFromKey(consumerPartitionKey, this.internalStationName);
Expand Down
26 changes: 26 additions & 0 deletions lib/errors.d.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
export declare const MemphisErrors: {
IncorrectBatchSize: (maxBatchSize: number) => Error;
GivenBothPartitionNumAndKey: Error;
InvalidJSONSchema: Error;
InvalidAVROSchema: Error;
DeadConnection: Error;
NegativeStartConsumeFromSeq: Error;
InvalidLastMessages: Error;
GivenBothLastMessagesAndStartConsume: Error;
ProducingWithoutConnection: Error;
FetchingWithoutConnection: Error;
UnsupportedSchemaType: Error;
UnsupportedSchemaNameChars: Error;
InvalidSchemaNameStartOrEnd: Error;
EmptySchemaName: Error;
SchemaNameTooLong: Error;
InvalidHeaderKeyNameStart: Error;
DeserializationFailure: (ex: Error) => Error;
CannotDelayDLSMsg: Error;
UnsupportedHeaderFormat: Error;
FailedToProduce: Error;
ExpectingJSONFormat: (ex: Error) => Error;
UnsupportedMessageType: Error;
ExpectingAVROFormat: (ex: Error) => Error;
FailedSchemaValidation: (toPrint: any) => Error;
};
40 changes: 40 additions & 0 deletions lib/errors.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
exports.MemphisErrors = void 0;
const utils_1 = require("./utils");
exports.MemphisErrors = {
IncorrectBatchSize: (maxBatchSize) => {
return (0, utils_1.MemphisError)(new Error(`Batch size can not be greater than ${maxBatchSize} or less than 1`));
},
GivenBothPartitionNumAndKey: (0, utils_1.MemphisError)(new Error('Can not use both partition number and partition key')),
InvalidJSONSchema: (0, utils_1.MemphisError)(new Error('invalid json schema')),
InvalidAVROSchema: (0, utils_1.MemphisError)(new Error('invalid avro schema')),
DeadConnection: (0, utils_1.MemphisError)(new Error('Connection is dead')),
NegativeStartConsumeFromSeq: (0, utils_1.MemphisError)(new Error('startConsumeFromSequence has to be a positive number')),
InvalidLastMessages: (0, utils_1.MemphisError)(new Error('min value for LastMessages is -1')),
GivenBothLastMessagesAndStartConsume: (0, utils_1.MemphisError)(new Error("Consumer creation options can't contain both startConsumeFromSequence and lastMessages")),
ProducingWithoutConnection: (0, utils_1.MemphisError)(new Error('Cant produce a message without being connected!')),
FetchingWithoutConnection: (0, utils_1.MemphisError)(new Error('Cant fetch messages without being connected!')),
UnsupportedSchemaType: (0, utils_1.MemphisError)(new Error("Schema type not supported")),
UnsupportedSchemaNameChars: (0, utils_1.MemphisError)(new Error("Only alphanumeric and the '_', '-', '.' characters are allowed in the schema name")),
InvalidSchemaNameStartOrEnd: (0, utils_1.MemphisError)(new Error("schema name can not start or end with non alphanumeric character")),
EmptySchemaName: (0, utils_1.MemphisError)(new Error("schema name can not be empty")),
SchemaNameTooLong: (0, utils_1.MemphisError)(new Error("schema name should be under 128 characters")),
InvalidHeaderKeyNameStart: (0, utils_1.MemphisError)(new Error('Keys in headers should not start with $memphis')),
DeserializationFailure: (ex) => {
return (0, utils_1.MemphisError)(new Error(`Deserialization has been failed since the message format does not align with the currently attached schema: ${ex.message}`));
},
CannotDelayDLSMsg: (0, utils_1.MemphisError)(new Error('cannot delay DLS message')),
UnsupportedHeaderFormat: (0, utils_1.MemphisError)(new Error('headers has to be a Javascript object or an instance of MsgHeaders')),
FailedToProduce: (0, utils_1.MemphisError)(new Error('Produce operation has failed, please check whether Station/Producer still exist')),
ExpectingJSONFormat: (ex) => {
return (0, utils_1.MemphisError)(new Error('Expecting Json format: ' + ex));
},
UnsupportedMessageType: (0, utils_1.MemphisError)(new Error('Unsupported message type')),
ExpectingAVROFormat: (ex) => {
return (0, utils_1.MemphisError)(new Error('Expecting Avro format: ' + ex));
},
FailedSchemaValidation: (toPrint) => {
return (0, utils_1.MemphisError)(new Error(`Schema validation has failed: ${toPrint}`));
},
};
31 changes: 16 additions & 15 deletions lib/memphis.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ const message_header_1 = require("./message-header");
const producer_1 = require("./producer");
const station_1 = require("./station");
const utils_1 = require("./utils");
const errors_1 = require("./errors");
const avro = require('avro-js');
const murmurhash = require('murmurhash');
const appId = (0, uuid_1.v4)();
Expand Down Expand Up @@ -382,7 +383,7 @@ class Memphis {
return validate;
}
catch (ex) {
throw (0, utils_1.MemphisError)(new Error('invalid json schema'));
throw errors_1.MemphisErrors.InvalidJSONSchema;
}
}
}
Expand All @@ -398,7 +399,7 @@ class Memphis {
return validate;
}
catch (ex) {
throw (0, utils_1.MemphisError)(new Error('invalid avro schema'));
throw errors_1.MemphisErrors.InvalidAVROSchema;
}
}
_compileGraphQl(stationName) {
Expand Down Expand Up @@ -622,7 +623,7 @@ class Memphis {
var _a;
try {
if (!this.isConnectionActive)
throw (0, utils_1.MemphisError)(new Error('Connection is dead'));
throw errors_1.MemphisErrors.DeadConnection;
const realName = producerName.toLowerCase();
if (Array.isArray(stationName)) {
return new producer_1.Producer(this, producerName, stationName, realName, []);
Expand Down Expand Up @@ -691,7 +692,7 @@ class Memphis {
if (!this.isConnectionActive)
throw new Error('Connection is dead');
if (batchSize > maxBatchSize || batchSize < 1) {
throw (0, utils_1.MemphisError)(new Error(`Batch size can not be greater than ${maxBatchSize} or less than 1`));
throw errors_1.MemphisErrors.IncorrectBatchSize(maxBatchSize);
}
const realName = consumerName.toLowerCase();
if (genUniqueSuffix) {
Expand All @@ -702,13 +703,13 @@ class Memphis {
: consumerName;
consumerGroup = consumerGroup || consumerName;
if (startConsumeFromSequence <= 0) {
throw (0, utils_1.MemphisError)(new Error('startConsumeFromSequence has to be a positive number'));
throw errors_1.MemphisErrors.NegativeStartConsumeFromSeq;
}
if (lastMessages < -1) {
throw (0, utils_1.MemphisError)(new Error('min value for LastMessages is -1'));
throw errors_1.MemphisErrors.InvalidLastMessages;
}
if (startConsumeFromSequence > 1 && lastMessages > -1) {
throw (0, utils_1.MemphisError)(new Error("Consumer creation options can't contain both startConsumeFromSequence and lastMessages"));
throw errors_1.MemphisErrors.GivenBothLastMessagesAndStartConsume;
}
const createConsumerReq = {
name: consumerName,
Expand Down Expand Up @@ -764,7 +765,7 @@ class Memphis {
async produce({ stationName, producerName, genUniqueSuffix = false, message, ackWaitSec, asyncProduce, headers, msgId, producerPartitionKey = null, producerPartitionNumber = -1 }) {
let producer;
if (!this.isConnectionActive)
throw (0, utils_1.MemphisError)(new Error('Cant produce a message without being connected!'));
throw errors_1.MemphisErrors.ProducingWithoutConnection;
if (typeof stationName === 'string') {
const internalStationName = stationName.replace(/\./g, '#').toLowerCase();
const producerMapKey = `${internalStationName}_${producerName.toLowerCase()}`;
Expand Down Expand Up @@ -801,9 +802,9 @@ class Memphis {
async fetchMessages({ stationName, consumerName, consumerGroup = '', genUniqueSuffix = false, batchSize = 10, maxAckTimeMs = 30000, batchMaxTimeToWaitMs = 100, maxMsgDeliveries = 2, startConsumeFromSequence = 1, lastMessages = -1, consumerPartitionKey = null, consumerPartitionNumber = -1, }) {
let consumer;
if (!this.isConnectionActive)
throw (0, utils_1.MemphisError)(new Error('Cant fetch messages without being connected!'));
throw errors_1.MemphisErrors.FetchingWithoutConnection;
if (batchSize > maxBatchSize || batchSize < 1) {
throw (0, utils_1.MemphisError)(new Error(`Batch size can not be greater than ${maxBatchSize} or less than 1`));
throw errors_1.MemphisErrors.IncorrectBatchSize(maxBatchSize);
}
if (genUniqueSuffix) {
console.log("Deprecation warning: genUniqueSuffix will be stopped to be supported after November 1'st, 2023.");
Expand Down Expand Up @@ -905,18 +906,18 @@ class Memphis {
async createSchema({ schemaName, schemaType, schemaFilePath, timeoutRetry = 5 }) {
try {
if (schemaType !== "json" && schemaType !== "graphql" && schemaType !== "protobuf" && schemaType !== "avro")
throw (0, utils_1.MemphisError)(new Error("Schema type not supported"));
throw errors_1.MemphisErrors.UnsupportedSchemaType;
var nameConvention = RegExp('^[a-z0-9_.-]*$');
if (!nameConvention.test(schemaName))
throw (0, utils_1.MemphisError)(new Error("Only alphanumeric and the '_', '-', '.' characters are allowed in the schema name"));
throw errors_1.MemphisErrors.UnsupportedSchemaNameChars;
var firstChar = Array.from(schemaName)[0];
var lastChar = Array.from(schemaName)[-1];
if (firstChar === "." || firstChar === "_" || firstChar === "-" || lastChar === "." || lastChar === "_" || lastChar === "-")
throw (0, utils_1.MemphisError)(new Error("schema name can not start or end with non alphanumeric character"));
throw errors_1.MemphisErrors.InvalidSchemaNameStartOrEnd;
if (schemaName.length === 0)
throw (0, utils_1.MemphisError)(new Error("schema name can not be empty"));
throw errors_1.MemphisErrors.EmptySchemaName;
if (schemaName.length > 128)
throw (0, utils_1.MemphisError)(new Error("schema name should be under 128 characters"));
throw errors_1.MemphisErrors.SchemaNameTooLong;
var schemContent = fs.readFileSync(schemaFilePath, 'utf-8');
var createSchemaReq = {
name: schemaName,
Expand Down
4 changes: 2 additions & 2 deletions lib/message-header.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
Object.defineProperty(exports, "__esModule", { value: true });
exports.MsgHeaders = void 0;
const nats_1 = require("nats");
const utils_1 = require("./utils");
const errors_1 = require("./errors");
class MsgHeaders {
constructor() {
this.headers = (0, nats_1.headers)();
Expand All @@ -12,7 +12,7 @@ class MsgHeaders {
this.headers.append(key, value);
}
else {
throw (0, utils_1.MemphisError)(new Error('Keys in headers should not start with $memphis'));
throw errors_1.MemphisErrors.InvalidHeaderKeyNameStart;
}
}
}
Expand Down
6 changes: 3 additions & 3 deletions lib/message.js
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
exports.Message = void 0;
const utils_1 = require("./utils");
const station_1 = require("./station");
const errors_1 = require("./errors");
class Message {
constructor(message, connection, cgName, internalStationName, partition_number) {
this.message = message;
Expand Down Expand Up @@ -77,7 +77,7 @@ class Message {
this.station._validateMessage(message);
}
catch (ex) {
throw (0, utils_1.MemphisError)(new Error(`Deserialization has been failed since the message format does not align with the currently attached schema: ${ex.message}`));
throw errors_1.MemphisErrors.DeserializationFailure(ex);
}
switch (stationSchemaData['type']) {
case 'protobuf':
Expand Down Expand Up @@ -135,7 +135,7 @@ class Message {
if (this.message.nak)
this.message.nak(millis);
else
throw (0, utils_1.MemphisError)(new Error('cannot delay DLS message'));
throw errors_1.MemphisErrors.CannotDelayDLSMsg;
}
}
exports.Message = Message;
7 changes: 4 additions & 3 deletions lib/producer.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ exports.Producer = void 0;
const _1 = require(".");
const utils_1 = require("./utils");
const station_1 = require("./station");
const errors_1 = require("./errors");
const schemaVFailAlertType = 'schema_validation_fail_alert';
class Producer {
constructor(connection, producerName, stationName, realName, partitions) {
Expand Down Expand Up @@ -33,7 +34,7 @@ class Producer {
type = 'object';
}
else {
throw (0, utils_1.MemphisError)(new Error('headers has to be a Javascript object or an instance of MsgHeaders'));
throw errors_1.MemphisErrors.UnsupportedHeaderFormat;
}
switch (type) {
case 'object':
Expand Down Expand Up @@ -85,7 +86,7 @@ class Producer {
}
else if (stationPartitions != null && stationPartitions.length > 0) {
if (producerPartitionNumber > 0 && producerPartitionKey != null) {
throw (0, utils_1.MemphisError)(new Error('Can not use both partition number and partition key'));
throw errors_1.MemphisErrors.GivenBothPartitionNumAndKey;
}
if (producerPartitionKey != null) {
const partitionNumberKey = this.connection._getPartitionFromKey(producerPartitionKey, this.internalStation);
Expand Down Expand Up @@ -145,7 +146,7 @@ class Producer {
}
async _hanldeProduceError(ex, message, headers) {
if (ex.code === '503') {
throw (0, utils_1.MemphisError)(new Error('Produce operation has failed, please check whether Station/Producer still exist'));
throw errors_1.MemphisErrors.FailedToProduce;
}
if (ex.message.includes('BAD_PAYLOAD'))
ex = (0, utils_1.MemphisError)(new Error('Invalid message format, expecting Uint8Array'));
Expand Down
Loading