Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

update mongo sink adding ssl options #2385

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGES_NEXT_RELEASE
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
[cygnus-ngsi] [mongo-sink] Add mongo_ssl and mongo_ssl_invalid_host_allowed options for mongoDB connections
[cygnus-common] [mongo-backend] Use sslEnabled and sslInvalidHostNameAllowed options for mongoDB connections
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ public enum Resolution { SECOND, MINUTE, HOUR, DAY, MONTH }
private final String mongoPassword;
private final String mongoAuthSource;
private final String mongoReplicaSet;
private final Boolean sslEnabled;
private final Boolean sslInvalidHostNameAllowed;
private final DataModel dataModel;
private static final CygnusLogger LOGGER = new CygnusLogger(MongoBackendImpl.class);

Expand All @@ -73,13 +75,16 @@ public enum Resolution { SECOND, MINUTE, HOUR, DAY, MONTH }
* @param dataModel
*/
public MongoBackendImpl(String mongoHosts, String mongoUsername, String mongoPassword,
String mongoAuthSource, String mongoReplicaSet, DataModel dataModel) {
String mongoAuthSource, String mongoReplicaSet, DataModel dataModel,
Boolean sslEnabled, Boolean sslInvalidHostNameAllowed) {
client = null;
this.mongoHosts = mongoHosts;
this.mongoUsername = mongoUsername;
this.mongoPassword = mongoPassword;
this.mongoAuthSource = mongoAuthSource;
this.mongoReplicaSet = mongoReplicaSet;
this.sslEnabled = sslEnabled;
this.sslInvalidHostNameAllowed = sslInvalidHostNameAllowed;
this.dataModel = dataModel;
} // MongoBackendImpl

Expand Down Expand Up @@ -592,19 +597,30 @@ private MongoDatabase getDatabase(String dbName) {
}
MongoCredential credential = MongoCredential.createCredential(mongoUsername, authSource,
mongoPassword.toCharArray());

/****
// This constructor is deprecated see Mongo Client API documentation
// @deprecated Prefer {@link #MongoClient(List, MongoCredential, MongoClientOptions)}
client = new MongoClient(servers, Arrays.asList(credential));
****/
if ((mongoReplicaSet!= null) && !mongoReplicaSet.isEmpty()) {
client = new MongoClient(servers, credential, new MongoClientOptions.Builder().
requiredReplicaSetName(mongoReplicaSet).build());
requiredReplicaSetName(mongoReplicaSet).
sslEnabled(sslEnabled).
sslInvalidHostNameAllowed(sslInvalidHostNameAllowed).
build());
} else {
client = new MongoClient(servers, credential, new MongoClientOptions.Builder().build());
client = new MongoClient(servers, credential, new MongoClientOptions.Builder().
sslEnabled(sslEnabled).
sslInvalidHostNameAllowed(sslInvalidHostNameAllowed).
build());
}
} else {
client = new MongoClient(servers);
MongoClientOptions options = MongoClientOptions.builder()
.sslEnabled(sslEnabled)
.sslInvalidHostNameAllowed(sslInvalidHostNameAllowed)
.build();
client = new MongoClient(servers, options);
} // if else
} // if

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public MongoBackendImplTest() {
public void testGetRange() {
System.out.println(getTestTraceHead("[MongoBackendImpl.getRange]")
+ "-------- Given a resolution, its related range is correctly returned");
MongoBackendImpl backend = new MongoBackendImpl(null, null, null, null, null, null);
MongoBackendImpl backend = new MongoBackendImpl(null, null, null, null, null, null, false, false);


try {
Expand Down Expand Up @@ -115,7 +115,7 @@ public void testGetRange() {
public void testGetOrigin() {
System.out.println(getTestTraceHead("[MongoBackendImpl.getOrigin]")
+ "-------- Given a calendar and a resolution, its related origin is correctly returned");
MongoBackendImpl backend = new MongoBackendImpl(null, null, null, null, null, null);
MongoBackendImpl backend = new MongoBackendImpl(null, null, null, null, null, null, false, false);
GregorianCalendar calendar = new GregorianCalendar(2017, 4, 5, 11, 46, 13);

try {
Expand Down Expand Up @@ -198,7 +198,7 @@ public void testGetOrigin() {
public void testGetOffset() {
System.out.println(getTestTraceHead("[MongoBackendImpl.getOffset]")
+ "-------- Given a calendar and a resolution, its related offset is correctly returned");
MongoBackendImpl backend = new MongoBackendImpl(null, null, null, null, null, null);
MongoBackendImpl backend = new MongoBackendImpl(null, null, null, null, null, null, false, false);
GregorianCalendar calendar = new GregorianCalendar(2017, 3, 5, 11, 46, 13); // month 3 is April

try {
Expand Down Expand Up @@ -266,7 +266,7 @@ public void testBuildQueryForInsertAggregated() {
String entityType = "someType";
String attrName = "someName";
GregorianCalendar calendar = new GregorianCalendar(2017, 3, 5, 11, 46, 13); // month 3 is April
MongoBackendImpl backend = new MongoBackendImpl(null, null, null, null, null, DataModel.DMBYSERVICEPATH);
MongoBackendImpl backend = new MongoBackendImpl(null, null, null, null, null, DataModel.DMBYSERVICEPATH, false, false);
String queryForInsertAggregated = "{\"_id\": {\"entityId\": \"someId\", \"entityType\": \"someType\", "
+ "\"attrName\": \"someName\", \"origin\": {\"$date\": 1491392760000}, "
+ "\"resolution\": \"second\", \"range\": \"minute\"}, \"points.offset\": 13}";
Expand Down Expand Up @@ -353,7 +353,7 @@ public void testBuildQueryForInsertAggregated() {
throw e;
} // try catch

backend = new MongoBackendImpl(null, null, null, null, null, DataModel.DMBYENTITY);
backend = new MongoBackendImpl(null, null, null, null, null, DataModel.DMBYENTITY, false, false);

queryForInsertAggregated = "{\"_id\": {\"attrName\": \"someName\", "
+ "\"origin\": {\"$date\": 1491392760000}, \"resolution\": \"second\", "
Expand Down Expand Up @@ -457,7 +457,7 @@ public void testBuildUpdateForUpdateNumerical() {
double sum2 = 200;
int numSamples = 2;
GregorianCalendar calendar = new GregorianCalendar(2017, 3, 5, 11, 46, 13); // month 3 is April
MongoBackendImpl backend = new MongoBackendImpl(null, null, null, null, null,null);
MongoBackendImpl backend = new MongoBackendImpl(null, null, null, null, null, null, false, false);
String updateForUpdate = "{\"$set\": {\"attrType\": \"someType\"}, "
+ "\"$inc\": {\"points.$.samples\": 2, \"points.$.sum\": 20.0, \"points.$.sum2\": 200.0}, "
+ "\"$min\": {\"points.$.min\": 0.0}, \"$max\": {\"points.$.max\": 10.0}}";
Expand Down Expand Up @@ -489,7 +489,7 @@ public void testBuildUpdateForUpdateString() {
String value = "someString";
int count = 2;
GregorianCalendar calendar = new GregorianCalendar(2017, 3, 5, 11, 46, 13); // month 3 is April
MongoBackendImpl backend = new MongoBackendImpl(null, null, null, null, null, null);
MongoBackendImpl backend = new MongoBackendImpl(null, null, null, null, null, null, false, false);
String updateForUpdate = "{\"$set\": {\"attrType\": \"someType\"}, "
+ "\"$inc\": {\"points.13.samples\": 2, \"points.13.occur.someString\": 2}}";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ public abstract class NGSIMongoBaseSink extends NGSISink {
protected String mongoPassword;
protected String mongoAuthSource;
protected String mongoReplicaSet;
protected Boolean sslEnabled;
protected Boolean sslInvalidHostNameAllowed;
protected String dbPrefix;
protected String collectionPrefix;
protected MongoBackendImpl backend;
Expand Down Expand Up @@ -169,12 +171,37 @@ public void configure(Context context) {
LOGGER.warn("[" + this.getName() + "] Invalid configuration (ignore_white_spaces="
+ ignoreWhiteSpacesStr + ") -- Must be 'true' or 'false'");
} // if else

String sslEnabledStr = context.getString("mongo_ssl", "false");
if (sslEnabledStr.equals("true") || sslEnabledStr.equals("false")) {
sslEnabled = Boolean.valueOf(sslEnabledStr);
LOGGER.debug("[" + this.getName() + "] Reading configuration (mongo_ssl="
+ sslEnabledStr + ")");
} else {
invalidConfiguration = true;
LOGGER.warn("[" + this.getName() + "] Invalid configuration (mongo_ssl="
+ sslEnabledStr + ") -- Must be 'true' or 'false'");
} // if else

String sslInvalidHostNameAllowedStr = context.getString("mongo_ssl_invalid_host_allowed", "false");
if (sslInvalidHostNameAllowedStr.equals("true") || sslInvalidHostNameAllowedStr.equals("false")) {
sslInvalidHostNameAllowed = Boolean.valueOf(sslInvalidHostNameAllowedStr);
LOGGER.debug("[" + this.getName() + "] Reading configuration (mongo_ssl_invalid_host_allowed="
+ sslInvalidHostNameAllowedStr + ")");
} else {
invalidConfiguration = true;
LOGGER.warn("[" + this.getName() + "] Invalid configuration (mongo_ssl_invalid_host_allowed="
+ sslInvalidHostNameAllowedStr + ") -- Must be 'true' or 'false'");
} // if else

} // configure

@Override
public void start() {
try {
backend = new MongoBackendImpl(mongoHosts, mongoUsername, mongoPassword, mongoAuthSource, mongoReplicaSet, dataModel);
backend = new MongoBackendImpl(mongoHosts, mongoUsername, mongoPassword,
mongoAuthSource, mongoReplicaSet, dataModel,
sslEnabled, sslInvalidHostNameAllowed);
LOGGER.debug("[" + this.getName() + "] MongoDB persistence backend created");
} catch (Exception e) {
LOGGER.error("Error while creating the MongoDB persistence backend. Details="
Expand Down
2 changes: 2 additions & 0 deletions doc/cygnus-ngsi/flume_extensions_catalogue/ngsi_mongo_sink.md
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,8 @@ When datamodel changes Cygnus tries to recreate index (delete current and create
| mongo\_auth_source | no | <i>empty</i> | Auth source database use to authenticate the user. Usually could be `admin`. |
| mongo\_replica_set | no | <i>empty</i> | Replica Set name. Note that this parameter is optional because Cygnus is able to connect to a MongoDB replica set without needing to specify its name.|
| db\_prefix | no | sth_ ||
| mongo\_ssl | no | false | Enable SSL in mongodb connection |
| mongo\_ssl\_invalid\_host\_allowed | no | false | Allow invalid host name in mongo SSL connections |
| collection\_prefix | no | sth_ | `system.` is not accepted. |
| batch\_size | no | 1 | Number of events accumulated before persistence. |
| batch\_timeout | no | 30 | Number of seconds the batch will be building before it is persisted as it is. |
Expand Down
Loading