Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

update mongo sink adding ssl options #2386

Merged
merged 15 commits into from
Jun 19, 2024
2 changes: 2 additions & 0 deletions CHANGES_NEXT_RELEASE
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
[cygnus-ngsi] [mongo-sink] Add mongo_ssl, mongo_ssl_invalid_host_allowed, mongo_ssl_keystore_path_file and mongo_ssl_keystore_password options for mongoDB connections
[cygnus-common] [mongo-backend] Use sslEnabled, sslInvalidHostNameAllowed, sslKeystorePathFile and sslKeystorePassword options for mongoDB connections
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,13 @@
import java.util.List;
import java.util.TimeZone;
import java.util.concurrent.TimeUnit;
import javax.net.ssl.SSLContext;
import java.security.NoSuchAlgorithmException;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManagerFactory;
import java.security.KeyStore;
import java.io.FileInputStream;
import java.io.InputStream;
import org.bson.Document;

/**
Expand All @@ -60,6 +67,10 @@ public enum Resolution { SECOND, MINUTE, HOUR, DAY, MONTH }
private final String mongoPassword;
private final String mongoAuthSource;
private final String mongoReplicaSet;
private final Boolean sslEnabled;
private final Boolean sslInvalidHostNameAllowed;
private final String sslKeystorePathFile;
private final String sslKeystorePassword;
private final DataModel dataModel;
private static final CygnusLogger LOGGER = new CygnusLogger(MongoBackendImpl.class);

Expand All @@ -73,13 +84,19 @@ public enum Resolution { SECOND, MINUTE, HOUR, DAY, MONTH }
* @param dataModel
*/
public MongoBackendImpl(String mongoHosts, String mongoUsername, String mongoPassword,
String mongoAuthSource, String mongoReplicaSet, DataModel dataModel) {
String mongoAuthSource, String mongoReplicaSet, DataModel dataModel,
Boolean sslEnabled, Boolean sslInvalidHostNameAllowed,
String sslKeystorePathFile, String sslKeystorePassword) {
client = null;
this.mongoHosts = mongoHosts;
this.mongoUsername = mongoUsername;
this.mongoPassword = mongoPassword;
this.mongoAuthSource = mongoAuthSource;
this.mongoReplicaSet = mongoReplicaSet;
this.sslEnabled = sslEnabled;
this.sslInvalidHostNameAllowed = sslInvalidHostNameAllowed;
this.sslKeystorePathFile = sslKeystorePathFile;
this.sslKeystorePassword = sslKeystorePassword;
this.dataModel = dataModel;
} // MongoBackendImpl

Expand Down Expand Up @@ -583,6 +600,30 @@ private MongoDatabase getDatabase(String dbName) {
// create a Mongo client

if (client == null) {

SSLContext sslContext = null;
if (sslEnabled && (sslKeystorePathFile != null) && !sslKeystorePathFile.isEmpty()) {
try {
// Init TrustManager to init SSL Context
KeyStore keyStore = KeyStore.getInstance(KeyStore.getDefaultType());
try (InputStream keyStoreStream = new FileInputStream(sslKeystorePathFile)) {
keyStore.load(keyStoreStream, sslKeystorePassword.toCharArray());
}
TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
trustManagerFactory.init(keyStore);
sslContext = SSLContext.getInstance("TLS");
sslContext.init(null, trustManagerFactory.getTrustManagers(), new java.security.SecureRandom());
} catch (Exception e) {
LOGGER.warn("Error when init SSL Context: " + e.getMessage());
}
}

MongoClientOptions options = MongoClientOptions.builder()
.sslEnabled(sslEnabled)
.sslInvalidHostNameAllowed(sslInvalidHostNameAllowed)
.sslContext(sslContext)
.build();

if (mongoUsername.length() != 0) {
String authSource;
if ((mongoAuthSource != null) && !mongoAuthSource.isEmpty()) {
Expand All @@ -591,20 +632,24 @@ private MongoDatabase getDatabase(String dbName) {
authSource = dbName;
}
MongoCredential credential = MongoCredential.createCredential(mongoUsername, authSource,
mongoPassword.toCharArray());
mongoPassword.toCharArray());

/****
// This constructor is deprecated see Mongo Client API documentation
// @deprecated Prefer {@link #MongoClient(List, MongoCredential, MongoClientOptions)}
client = new MongoClient(servers, Arrays.asList(credential));
****/
if ((mongoReplicaSet!= null) && !mongoReplicaSet.isEmpty()) {
client = new MongoClient(servers, credential, new MongoClientOptions.Builder().
requiredReplicaSetName(mongoReplicaSet).build());
} else {
client = new MongoClient(servers, credential, new MongoClientOptions.Builder().build());
options = MongoClientOptions.builder()
.requiredReplicaSetName(mongoReplicaSet)
.sslEnabled(sslEnabled)
.sslInvalidHostNameAllowed(sslInvalidHostNameAllowed)
.sslContext(sslContext)
.build();
}
client = new MongoClient(servers, credential, options);
} else {
client = new MongoClient(servers);
client = new MongoClient(servers, options);
} // if else
} // if

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public MongoBackendImplTest() {
public void testGetRange() {
System.out.println(getTestTraceHead("[MongoBackendImpl.getRange]")
+ "-------- Given a resolution, its related range is correctly returned");
MongoBackendImpl backend = new MongoBackendImpl(null, null, null, null, null, null);
MongoBackendImpl backend = new MongoBackendImpl(null, null, null, null, null, null, false, false, null, null);


try {
Expand Down Expand Up @@ -115,7 +115,7 @@ public void testGetRange() {
public void testGetOrigin() {
System.out.println(getTestTraceHead("[MongoBackendImpl.getOrigin]")
+ "-------- Given a calendar and a resolution, its related origin is correctly returned");
MongoBackendImpl backend = new MongoBackendImpl(null, null, null, null, null, null);
MongoBackendImpl backend = new MongoBackendImpl(null, null, null, null, null, null, false, false, null, null);
GregorianCalendar calendar = new GregorianCalendar(2017, 4, 5, 11, 46, 13);

try {
Expand Down Expand Up @@ -198,7 +198,7 @@ public void testGetOrigin() {
public void testGetOffset() {
System.out.println(getTestTraceHead("[MongoBackendImpl.getOffset]")
+ "-------- Given a calendar and a resolution, its related offset is correctly returned");
MongoBackendImpl backend = new MongoBackendImpl(null, null, null, null, null, null);
MongoBackendImpl backend = new MongoBackendImpl(null, null, null, null, null, null, false, false, null, null);
GregorianCalendar calendar = new GregorianCalendar(2017, 3, 5, 11, 46, 13); // month 3 is April

try {
Expand Down Expand Up @@ -266,7 +266,7 @@ public void testBuildQueryForInsertAggregated() {
String entityType = "someType";
String attrName = "someName";
GregorianCalendar calendar = new GregorianCalendar(2017, 3, 5, 11, 46, 13); // month 3 is April
MongoBackendImpl backend = new MongoBackendImpl(null, null, null, null, null, DataModel.DMBYSERVICEPATH);
MongoBackendImpl backend = new MongoBackendImpl(null, null, null, null, null, DataModel.DMBYSERVICEPATH, false, false, null, null);
String queryForInsertAggregated = "{\"_id\": {\"entityId\": \"someId\", \"entityType\": \"someType\", "
+ "\"attrName\": \"someName\", \"origin\": {\"$date\": 1491392760000}, "
+ "\"resolution\": \"second\", \"range\": \"minute\"}, \"points.offset\": 13}";
Expand Down Expand Up @@ -353,7 +353,7 @@ public void testBuildQueryForInsertAggregated() {
throw e;
} // try catch

backend = new MongoBackendImpl(null, null, null, null, null, DataModel.DMBYENTITY);
backend = new MongoBackendImpl(null, null, null, null, null, DataModel.DMBYENTITY, false, false, null, null);

queryForInsertAggregated = "{\"_id\": {\"attrName\": \"someName\", "
+ "\"origin\": {\"$date\": 1491392760000}, \"resolution\": \"second\", "
Expand Down Expand Up @@ -457,7 +457,7 @@ public void testBuildUpdateForUpdateNumerical() {
double sum2 = 200;
int numSamples = 2;
GregorianCalendar calendar = new GregorianCalendar(2017, 3, 5, 11, 46, 13); // month 3 is April
MongoBackendImpl backend = new MongoBackendImpl(null, null, null, null, null,null);
MongoBackendImpl backend = new MongoBackendImpl(null, null, null, null, null, null, false, false, null, null);
String updateForUpdate = "{\"$set\": {\"attrType\": \"someType\"}, "
+ "\"$inc\": {\"points.$.samples\": 2, \"points.$.sum\": 20.0, \"points.$.sum2\": 200.0}, "
+ "\"$min\": {\"points.$.min\": 0.0}, \"$max\": {\"points.$.max\": 10.0}}";
Expand Down Expand Up @@ -489,7 +489,7 @@ public void testBuildUpdateForUpdateString() {
String value = "someString";
int count = 2;
GregorianCalendar calendar = new GregorianCalendar(2017, 3, 5, 11, 46, 13); // month 3 is April
MongoBackendImpl backend = new MongoBackendImpl(null, null, null, null, null, null);
MongoBackendImpl backend = new MongoBackendImpl(null, null, null, null, null, null, false, false, null, null);
String updateForUpdate = "{\"$set\": {\"attrType\": \"someType\"}, "
+ "\"$inc\": {\"points.13.samples\": 2, \"points.13.occur.someString\": 2}}";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ public abstract class NGSIMongoBaseSink extends NGSISink {
protected String mongoPassword;
protected String mongoAuthSource;
protected String mongoReplicaSet;
protected Boolean sslEnabled;
protected Boolean sslInvalidHostNameAllowed;
protected String sslKeystorePathFile;
protected String sslKeystorePassword;
protected String dbPrefix;
protected String collectionPrefix;
protected MongoBackendImpl backend;
Expand Down Expand Up @@ -169,12 +173,44 @@ public void configure(Context context) {
LOGGER.warn("[" + this.getName() + "] Invalid configuration (ignore_white_spaces="
+ ignoreWhiteSpacesStr + ") -- Must be 'true' or 'false'");
} // if else

String sslEnabledStr = context.getString("mongo_ssl", "false");
if (sslEnabledStr.equals("true") || sslEnabledStr.equals("false")) {
sslEnabled = Boolean.valueOf(sslEnabledStr);
LOGGER.debug("[" + this.getName() + "] Reading configuration (mongo_ssl="
+ sslEnabledStr + ")");
} else {
invalidConfiguration = true;
LOGGER.warn("[" + this.getName() + "] Invalid configuration (mongo_ssl="
+ sslEnabledStr + ") -- Must be 'true' or 'false'");
} // if else

String sslInvalidHostNameAllowedStr = context.getString("mongo_ssl_invalid_host_allowed", "false");
if (sslInvalidHostNameAllowedStr.equals("true") || sslInvalidHostNameAllowedStr.equals("false")) {
sslInvalidHostNameAllowed = Boolean.valueOf(sslInvalidHostNameAllowedStr);
LOGGER.debug("[" + this.getName() + "] Reading configuration (mongo_ssl_invalid_host_allowed="
+ sslInvalidHostNameAllowedStr + ")");
} else {
invalidConfiguration = true;
LOGGER.warn("[" + this.getName() + "] Invalid configuration (mongo_ssl_invalid_host_allowed="
+ sslInvalidHostNameAllowedStr + ") -- Must be 'true' or 'false'");
} // if else

sslKeystorePathFile = context.getString("mongo_ssl_keystore_path_file", "");
LOGGER.debug("[" + this.getName() + "] Reading configuration (mongo_ssl_keystore_path_file=" + sslKeystorePathFile + ")");

sslKeystorePassword = context.getString("mongo_ssl_keystore_password", "");
LOGGER.debug("[" + this.getName() + "] Reading configuration (mongo_ssl_keystore_password=" + sslKeystorePassword + ")");

} // configure

@Override
public void start() {
try {
backend = new MongoBackendImpl(mongoHosts, mongoUsername, mongoPassword, mongoAuthSource, mongoReplicaSet, dataModel);
backend = new MongoBackendImpl(mongoHosts, mongoUsername, mongoPassword,
mongoAuthSource, mongoReplicaSet, dataModel,
sslEnabled, sslInvalidHostNameAllowed,
sslKeystorePathFile, sslKeystorePassword);
LOGGER.debug("[" + this.getName() + "] MongoDB persistence backend created");
} catch (Exception e) {
LOGGER.error("Error while creating the MongoDB persistence backend. Details="
Expand Down
4 changes: 4 additions & 0 deletions doc/cygnus-ngsi/flume_extensions_catalogue/ngsi_mongo_sink.md
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,10 @@ When datamodel changes Cygnus tries to recreate index (delete current and create
| mongo\_auth_source | no | <i>empty</i> | Auth source database use to authenticate the user. Usually could be `admin`. |
| mongo\_replica_set | no | <i>empty</i> | Replica Set name. Note that this parameter is optional because Cygnus is able to connect to a MongoDB replica set without needing to specify its name.|
| db\_prefix | no | sth_ ||
| mongo\_ssl | no | false | Enable SSL in mongodb connection |
| mongo\_ssl\_invalid\_host\_allowed | no | false | Allow invalid host name in mongo SSL connections |
| mongo\_ssl\_keystore\_path\_file | no | <i>empty</i> | Java SSL KeyStore path file (JKS file). A JKS file could be create from a certificate file using keytool: ```keytool -importkeystore -srckeystore certificate.p12 -srcstoretype pkcs12 -destkeystore mongo_ssl_keystore.jks``` |
| mongo\_ssl\_keystore\_password | no | <i>empty</i> | Java SSL KeyStore password for keystore file (JKS file). |
| collection\_prefix | no | sth_ | `system.` is not accepted. |
| batch\_size | no | 1 | Number of events accumulated before persistence. |
| batch\_timeout | no | 30 | Number of seconds the batch will be building before it is persisted as it is. |
Expand Down
Loading