diff --git a/CHANGES_NEXT_RELEASE b/CHANGES_NEXT_RELEASE
index e69de29bb..78dc1831a 100644
--- a/CHANGES_NEXT_RELEASE
+++ b/CHANGES_NEXT_RELEASE
@@ -0,0 +1,2 @@
+[cygnus-ngsi] [mongo-sink] Add mongo_ssl, mongo_ssl_invalid_host_allowed, mongo_ssl_keystore_path_file and mongo_ssl_keystore_password options for mongoDB connections
+[cygnus-common] [mongo-backend] Use sslEnabled, sslInvalidHostNameAllowed, sslKeystorePathFile and sslKeystorePassword options for mongoDB connections
diff --git a/cygnus-common/src/main/java/com/telefonica/iot/cygnus/backends/mongo/MongoBackendImpl.java b/cygnus-common/src/main/java/com/telefonica/iot/cygnus/backends/mongo/MongoBackendImpl.java
index 9b263e01e..9d1b29b8e 100644
--- a/cygnus-common/src/main/java/com/telefonica/iot/cygnus/backends/mongo/MongoBackendImpl.java
+++ b/cygnus-common/src/main/java/com/telefonica/iot/cygnus/backends/mongo/MongoBackendImpl.java
@@ -40,6 +40,13 @@
import java.util.List;
import java.util.TimeZone;
import java.util.concurrent.TimeUnit;
+import javax.net.ssl.SSLContext;
+import java.security.NoSuchAlgorithmException;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.TrustManagerFactory;
+import java.security.KeyStore;
+import java.io.FileInputStream;
+import java.io.InputStream;
import org.bson.Document;
/**
@@ -60,6 +67,10 @@ public enum Resolution { SECOND, MINUTE, HOUR, DAY, MONTH }
private final String mongoPassword;
private final String mongoAuthSource;
private final String mongoReplicaSet;
+ private final Boolean sslEnabled;
+ private final Boolean sslInvalidHostNameAllowed;
+ private final String sslKeystorePathFile;
+ private final String sslKeystorePassword;
private final DataModel dataModel;
private static final CygnusLogger LOGGER = new CygnusLogger(MongoBackendImpl.class);
@@ -73,13 +84,19 @@ public enum Resolution { SECOND, MINUTE, HOUR, DAY, MONTH }
* @param dataModel
*/
public MongoBackendImpl(String mongoHosts, String mongoUsername, String mongoPassword,
- String mongoAuthSource, String mongoReplicaSet, DataModel dataModel) {
+ String mongoAuthSource, String mongoReplicaSet, DataModel dataModel,
+ Boolean sslEnabled, Boolean sslInvalidHostNameAllowed,
+ String sslKeystorePathFile, String sslKeystorePassword) {
client = null;
this.mongoHosts = mongoHosts;
this.mongoUsername = mongoUsername;
this.mongoPassword = mongoPassword;
this.mongoAuthSource = mongoAuthSource;
this.mongoReplicaSet = mongoReplicaSet;
+ this.sslEnabled = sslEnabled;
+ this.sslInvalidHostNameAllowed = sslInvalidHostNameAllowed;
+ this.sslKeystorePathFile = sslKeystorePathFile;
+ this.sslKeystorePassword = sslKeystorePassword;
this.dataModel = dataModel;
} // MongoBackendImpl
@@ -583,6 +600,30 @@ private MongoDatabase getDatabase(String dbName) {
// create a Mongo client
if (client == null) {
+
+ SSLContext sslContext = null;
+ if (sslEnabled && (sslKeystorePathFile != null) && !sslKeystorePathFile.isEmpty()) {
+ try {
+ // Init TrustManager to init SSL Context
+ KeyStore keyStore = KeyStore.getInstance(KeyStore.getDefaultType());
+ try (InputStream keyStoreStream = new FileInputStream(sslKeystorePathFile)) {
+ keyStore.load(keyStoreStream, sslKeystorePassword.toCharArray());
+ }
+ TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
+ trustManagerFactory.init(keyStore);
+ sslContext = SSLContext.getInstance("TLS");
+ sslContext.init(null, trustManagerFactory.getTrustManagers(), new java.security.SecureRandom());
+ } catch (Exception e) {
+ LOGGER.warn("Error when init SSL Context: " + e.getMessage());
+ }
+ }
+
+ MongoClientOptions options = MongoClientOptions.builder()
+ .sslEnabled(sslEnabled)
+ .sslInvalidHostNameAllowed(sslInvalidHostNameAllowed)
+ .sslContext(sslContext)
+ .build();
+
if (mongoUsername.length() != 0) {
String authSource;
if ((mongoAuthSource != null) && !mongoAuthSource.isEmpty()) {
@@ -591,20 +632,24 @@ private MongoDatabase getDatabase(String dbName) {
authSource = dbName;
}
MongoCredential credential = MongoCredential.createCredential(mongoUsername, authSource,
- mongoPassword.toCharArray());
+ mongoPassword.toCharArray());
+
/****
// This constructor is deprecated see Mongo Client API documentation
// @deprecated Prefer {@link #MongoClient(List, MongoCredential, MongoClientOptions)}
client = new MongoClient(servers, Arrays.asList(credential));
****/
if ((mongoReplicaSet!= null) && !mongoReplicaSet.isEmpty()) {
- client = new MongoClient(servers, credential, new MongoClientOptions.Builder().
- requiredReplicaSetName(mongoReplicaSet).build());
- } else {
- client = new MongoClient(servers, credential, new MongoClientOptions.Builder().build());
+ options = MongoClientOptions.builder()
+ .requiredReplicaSetName(mongoReplicaSet)
+ .sslEnabled(sslEnabled)
+ .sslInvalidHostNameAllowed(sslInvalidHostNameAllowed)
+ .sslContext(sslContext)
+ .build();
}
+ client = new MongoClient(servers, credential, options);
} else {
- client = new MongoClient(servers);
+ client = new MongoClient(servers, options);
} // if else
} // if
diff --git a/cygnus-common/src/test/java/com/telefonica/iot/cygnus/backends/mongo/MongoBackendImplTest.java b/cygnus-common/src/test/java/com/telefonica/iot/cygnus/backends/mongo/MongoBackendImplTest.java
index 594c986cc..0625507b9 100644
--- a/cygnus-common/src/test/java/com/telefonica/iot/cygnus/backends/mongo/MongoBackendImplTest.java
+++ b/cygnus-common/src/test/java/com/telefonica/iot/cygnus/backends/mongo/MongoBackendImplTest.java
@@ -53,7 +53,7 @@ public MongoBackendImplTest() {
public void testGetRange() {
System.out.println(getTestTraceHead("[MongoBackendImpl.getRange]")
+ "-------- Given a resolution, its related range is correctly returned");
- MongoBackendImpl backend = new MongoBackendImpl(null, null, null, null, null, null);
+ MongoBackendImpl backend = new MongoBackendImpl(null, null, null, null, null, null, false, false, null, null);
try {
@@ -115,7 +115,7 @@ public void testGetRange() {
public void testGetOrigin() {
System.out.println(getTestTraceHead("[MongoBackendImpl.getOrigin]")
+ "-------- Given a calendar and a resolution, its related origin is correctly returned");
- MongoBackendImpl backend = new MongoBackendImpl(null, null, null, null, null, null);
+ MongoBackendImpl backend = new MongoBackendImpl(null, null, null, null, null, null, false, false, null, null);
GregorianCalendar calendar = new GregorianCalendar(2017, 4, 5, 11, 46, 13);
try {
@@ -198,7 +198,7 @@ public void testGetOrigin() {
public void testGetOffset() {
System.out.println(getTestTraceHead("[MongoBackendImpl.getOffset]")
+ "-------- Given a calendar and a resolution, its related offset is correctly returned");
- MongoBackendImpl backend = new MongoBackendImpl(null, null, null, null, null, null);
+ MongoBackendImpl backend = new MongoBackendImpl(null, null, null, null, null, null, false, false, null, null);
GregorianCalendar calendar = new GregorianCalendar(2017, 3, 5, 11, 46, 13); // month 3 is April
try {
@@ -266,7 +266,7 @@ public void testBuildQueryForInsertAggregated() {
String entityType = "someType";
String attrName = "someName";
GregorianCalendar calendar = new GregorianCalendar(2017, 3, 5, 11, 46, 13); // month 3 is April
- MongoBackendImpl backend = new MongoBackendImpl(null, null, null, null, null, DataModel.DMBYSERVICEPATH);
+ MongoBackendImpl backend = new MongoBackendImpl(null, null, null, null, null, DataModel.DMBYSERVICEPATH, false, false, null, null);
String queryForInsertAggregated = "{\"_id\": {\"entityId\": \"someId\", \"entityType\": \"someType\", "
+ "\"attrName\": \"someName\", \"origin\": {\"$date\": 1491392760000}, "
+ "\"resolution\": \"second\", \"range\": \"minute\"}, \"points.offset\": 13}";
@@ -353,7 +353,7 @@ public void testBuildQueryForInsertAggregated() {
throw e;
} // try catch
- backend = new MongoBackendImpl(null, null, null, null, null, DataModel.DMBYENTITY);
+ backend = new MongoBackendImpl(null, null, null, null, null, DataModel.DMBYENTITY, false, false, null, null);
queryForInsertAggregated = "{\"_id\": {\"attrName\": \"someName\", "
+ "\"origin\": {\"$date\": 1491392760000}, \"resolution\": \"second\", "
@@ -457,7 +457,7 @@ public void testBuildUpdateForUpdateNumerical() {
double sum2 = 200;
int numSamples = 2;
GregorianCalendar calendar = new GregorianCalendar(2017, 3, 5, 11, 46, 13); // month 3 is April
- MongoBackendImpl backend = new MongoBackendImpl(null, null, null, null, null,null);
+ MongoBackendImpl backend = new MongoBackendImpl(null, null, null, null, null, null, false, false, null, null);
String updateForUpdate = "{\"$set\": {\"attrType\": \"someType\"}, "
+ "\"$inc\": {\"points.$.samples\": 2, \"points.$.sum\": 20.0, \"points.$.sum2\": 200.0}, "
+ "\"$min\": {\"points.$.min\": 0.0}, \"$max\": {\"points.$.max\": 10.0}}";
@@ -489,7 +489,7 @@ public void testBuildUpdateForUpdateString() {
String value = "someString";
int count = 2;
GregorianCalendar calendar = new GregorianCalendar(2017, 3, 5, 11, 46, 13); // month 3 is April
- MongoBackendImpl backend = new MongoBackendImpl(null, null, null, null, null, null);
+ MongoBackendImpl backend = new MongoBackendImpl(null, null, null, null, null, null, false, false, null, null);
String updateForUpdate = "{\"$set\": {\"attrType\": \"someType\"}, "
+ "\"$inc\": {\"points.13.samples\": 2, \"points.13.occur.someString\": 2}}";
diff --git a/cygnus-ngsi/src/main/java/com/telefonica/iot/cygnus/sinks/NGSIMongoBaseSink.java b/cygnus-ngsi/src/main/java/com/telefonica/iot/cygnus/sinks/NGSIMongoBaseSink.java
index deb1995ab..aef37af7d 100644
--- a/cygnus-ngsi/src/main/java/com/telefonica/iot/cygnus/sinks/NGSIMongoBaseSink.java
+++ b/cygnus-ngsi/src/main/java/com/telefonica/iot/cygnus/sinks/NGSIMongoBaseSink.java
@@ -39,6 +39,10 @@ public abstract class NGSIMongoBaseSink extends NGSISink {
protected String mongoPassword;
protected String mongoAuthSource;
protected String mongoReplicaSet;
+ protected Boolean sslEnabled;
+ protected Boolean sslInvalidHostNameAllowed;
+ protected String sslKeystorePathFile;
+ protected String sslKeystorePassword;
protected String dbPrefix;
protected String collectionPrefix;
protected MongoBackendImpl backend;
@@ -169,12 +173,44 @@ public void configure(Context context) {
LOGGER.warn("[" + this.getName() + "] Invalid configuration (ignore_white_spaces="
+ ignoreWhiteSpacesStr + ") -- Must be 'true' or 'false'");
} // if else
+
+ String sslEnabledStr = context.getString("mongo_ssl", "false");
+ if (sslEnabledStr.equals("true") || sslEnabledStr.equals("false")) {
+ sslEnabled = Boolean.valueOf(sslEnabledStr);
+ LOGGER.debug("[" + this.getName() + "] Reading configuration (mongo_ssl="
+ + sslEnabledStr + ")");
+ } else {
+ invalidConfiguration = true;
+ LOGGER.warn("[" + this.getName() + "] Invalid configuration (mongo_ssl="
+ + sslEnabledStr + ") -- Must be 'true' or 'false'");
+ } // if else
+
+ String sslInvalidHostNameAllowedStr = context.getString("mongo_ssl_invalid_host_allowed", "false");
+ if (sslInvalidHostNameAllowedStr.equals("true") || sslInvalidHostNameAllowedStr.equals("false")) {
+ sslInvalidHostNameAllowed = Boolean.valueOf(sslInvalidHostNameAllowedStr);
+ LOGGER.debug("[" + this.getName() + "] Reading configuration (mongo_ssl_invalid_host_allowed="
+ + sslInvalidHostNameAllowedStr + ")");
+ } else {
+ invalidConfiguration = true;
+ LOGGER.warn("[" + this.getName() + "] Invalid configuration (mongo_ssl_invalid_host_allowed="
+ + sslInvalidHostNameAllowedStr + ") -- Must be 'true' or 'false'");
+ } // if else
+
+ sslKeystorePathFile = context.getString("mongo_ssl_keystore_path_file", "");
+ LOGGER.debug("[" + this.getName() + "] Reading configuration (mongo_ssl_keystore_path_file=" + sslKeystorePathFile + ")");
+
+ sslKeystorePassword = context.getString("mongo_ssl_keystore_password", "");
+ LOGGER.debug("[" + this.getName() + "] Reading configuration (mongo_ssl_keystore_password=" + sslKeystorePassword + ")");
+
} // configure
@Override
public void start() {
try {
- backend = new MongoBackendImpl(mongoHosts, mongoUsername, mongoPassword, mongoAuthSource, mongoReplicaSet, dataModel);
+ backend = new MongoBackendImpl(mongoHosts, mongoUsername, mongoPassword,
+ mongoAuthSource, mongoReplicaSet, dataModel,
+ sslEnabled, sslInvalidHostNameAllowed,
+ sslKeystorePathFile, sslKeystorePassword);
LOGGER.debug("[" + this.getName() + "] MongoDB persistence backend created");
} catch (Exception e) {
LOGGER.error("Error while creating the MongoDB persistence backend. Details="
diff --git a/doc/cygnus-ngsi/flume_extensions_catalogue/ngsi_mongo_sink.md b/doc/cygnus-ngsi/flume_extensions_catalogue/ngsi_mongo_sink.md
index 6f99b5948..d1e7fda1f 100644
--- a/doc/cygnus-ngsi/flume_extensions_catalogue/ngsi_mongo_sink.md
+++ b/doc/cygnus-ngsi/flume_extensions_catalogue/ngsi_mongo_sink.md
@@ -320,6 +320,10 @@ When datamodel changes Cygnus tries to recreate index (delete current and create
| mongo\_auth_source | no | empty | Auth source database use to authenticate the user. Usually could be `admin`. |
| mongo\_replica_set | no | empty | Replica Set name. Note that this parameter is optional because Cygnus is able to connect to a MongoDB replica set without needing to specify its name.|
| db\_prefix | no | sth_ ||
+| mongo\_ssl | no | false | Enable SSL in mongodb connection |
+| mongo\_ssl\_invalid\_host\_allowed | no | false | Allow invalid host name in mongo SSL connections |
+| mongo\_ssl\_keystore\_path\_file | no | empty | Java SSL KeyStore path file (JKS file). A JKS file could be create from a certificate file using keytool: ```keytool -importkeystore -srckeystore certificate.p12 -srcstoretype pkcs12 -destkeystore mongo_ssl_keystore.jks``` |
+| mongo\_ssl\_keystore\_password | no | empty | Java SSL KeyStore password for keystore file (JKS file). |
| collection\_prefix | no | sth_ | `system.` is not accepted. |
| batch\_size | no | 1 | Number of events accumulated before persistence. |
| batch\_timeout | no | 30 | Number of seconds the batch will be building before it is persisted as it is. |