diff --git a/cygnus-common/src/main/java/com/telefonica/iot/cygnus/backends/sql/SQLBackendImpl.java b/cygnus-common/src/main/java/com/telefonica/iot/cygnus/backends/sql/SQLBackendImpl.java index 461b6e422..26697cd4a 100644 --- a/cygnus-common/src/main/java/com/telefonica/iot/cygnus/backends/sql/SQLBackendImpl.java +++ b/cygnus-common/src/main/java/com/telefonica/iot/cygnus/backends/sql/SQLBackendImpl.java @@ -49,6 +49,7 @@ public class SQLBackendImpl implements SQLBackend{ private SQLBackendImpl.SQLDriver driver; private final SQLCache cache; protected SQLInstance sqlInstance; + protected String sqlHost; private final boolean persistErrors; private final int maxLatestErrors; private static final String DEFAULT_ERROR_TABLE_SUFFIX = "_error_log"; @@ -116,6 +117,7 @@ public SQLBackendImpl(String sqlHost, String sqlPort, String sqlUsername, String public SQLBackendImpl(String sqlHost, String sqlPort, String sqlUsername, String sqlPassword, int maxPoolSize, int maxPoolIdle, int minPoolIdle, int minPoolIdleTimeMillis, SQLInstance sqlInstance, String sqlDriverName, String sqlOptions, boolean persistErrors, int maxLatestErrors) { driver = new SQLBackendImpl.SQLDriver(sqlHost, sqlPort, sqlUsername, sqlPassword, maxPoolSize, maxPoolIdle, minPoolIdle, minPoolIdleTimeMillis, sqlInstance, sqlDriverName, sqlOptions); cache = new SQLCache(); + this.sqlHost = sqlHost; this.sqlInstance = sqlInstance; this.persistErrors = persistErrors; this.maxLatestErrors = maxLatestErrors; @@ -174,7 +176,7 @@ public void createDestination(String destination) throws CygnusRuntimeError, Cyg Statement stmt = null; // get a connection to an empty destination - Connection con = driver.getConnection(""); + Connection con = driver.getConnection("", this.sqlHost); String query = ""; if (sqlInstance == SQLInstance.MYSQL) { @@ -229,7 +231,7 @@ public void createTable(String dataBase, String schema, String table, String typ Statement stmt = null; // get a connection to the given destination - Connection con = driver.getConnection(dataBase); + Connection con = driver.getConnection(dataBase, this.sqlHost); String query = ""; if (sqlInstance == SQLInstance.MYSQL) { query = "create table if not exists `" + tableName + "`" + typedFieldNames; @@ -274,7 +276,7 @@ public void insertContextData(String dataBase, String schema, String table, Stri String tableName = table; // get a connection to the given destination - Connection con = driver.getConnection(dataBase); + Connection con = driver.getConnection(dataBase, this.sqlHost); String query = ""; if (sqlInstance == SQLInstance.MYSQL) { query = "insert into `" + tableName + "` " + fieldNames + " values " + fieldValues; @@ -315,7 +317,7 @@ private CachedRowSet select(String dataBase, String schema, String tableName, St Statement stmt = null; // get a connection to the given destination - Connection con = driver.getConnection(dataBase); + Connection con = driver.getConnection(dataBase, this.sqlHost); String query = ""; if (sqlInstance == SQLInstance.MYSQL) { query = "select " + selection + " from `" + tableName + "` order by recvTime desc limit " + DEFAULT_LIMIT_SELECT_EXP_RECORDS; @@ -362,7 +364,7 @@ private void delete(String dataBase, String schema, String tableName, String fil Statement stmt = null; // get a connection to the given destination - Connection con = driver.getConnection(dataBase); + Connection con = driver.getConnection(dataBase, this.sqlHost); String query = ""; if (sqlInstance == SQLInstance.MYSQL) { query = "delete from `" + tableName + "` where " + filters; @@ -604,7 +606,7 @@ public void createErrorTable(String dataBase, String schema) Statement stmt = null; // get a connection to the given destination - Connection con = driver.getConnection(dataBase); + Connection con = driver.getConnection(dataBase, this.sqlHost); String query = ""; if (sqlInstance == SQLInstance.MYSQL) { @@ -672,7 +674,7 @@ public void upsertTransaction (LinkedHashMap> agg try { - connection = driver.getConnection(dataBase); + connection = driver.getConnection(dataBase, this.sqlHost); connection.setAutoCommit(false); ArrayList upsertQuerysList = SQLQueryUtils.sqlUpsertQuery(aggregation, @@ -758,7 +760,7 @@ public void insertTransaction (LinkedHashMap> agg try { - connection = driver.getConnection(dataBase); + connection = driver.getConnection(dataBase, this.sqlHost); connection.setAutoCommit(false); insertQuery = SQLQueryUtils.sqlInsertQuery(aggregation, @@ -841,7 +843,7 @@ public void purgeErrorTable(String dataBase, String schema) Statement stmt = null; // get a connection to the given destination - Connection con = driver.getConnection(dataBase); + Connection con = driver.getConnection(dataBase, this.sqlHost); String query = ""; if (sqlInstance == SQLInstance.MYSQL) { @@ -885,7 +887,7 @@ private void insertErrorLog(String dataBase, String schema, String errorQuery, E ", query)"; // get a connection to the given destination - Connection con = driver.getConnection(dataBase); + Connection con = driver.getConnection(dataBase, this.sqlHost); String query = ""; if (sqlInstance == SQLInstance.MYSQL) { @@ -988,21 +990,25 @@ public SQLDriver(String sqlHost, String sqlPort, String sqlUsername, String sqlP * @throws CygnusRuntimeError * @throws CygnusPersistenceError */ - public Connection getConnection(String destination) throws CygnusRuntimeError, CygnusPersistenceError { + public Connection getConnection(String destination, String sqlHost) throws CygnusRuntimeError, CygnusPersistenceError { try { // FIXME: the number of cached connections should be limited to // a certain number; with such a limit // number, if a new connection is needed, the oldest one is closed Connection connection = null; - if (datasources.containsKey(destination)) { - connection = datasources.get(destination).getConnection(); - LOGGER.debug(sqlInstance.toString().toUpperCase() + " Recovered destination connection from cache (" + destination + ")"); + //if (datasources.containsKey(destination)) { + if (datasources.containsKey(sqlHost)) { + //connection = datasources.get(destination).getConnection(); + connection = datasources.get(sqlHost).getConnection(); + //LOGGER.debug(sqlInstance.toString().toUpperCase() + " Recovered destination connection from cache (" + destination + ")"); + LOGGER.debug(sqlInstance.toString().toUpperCase() + " Recovered destination connection from cache (" + sqlHost + ")"); } if (connection == null || !connection.isValid(0)) { if (connection != null) { - LOGGER.debug(sqlInstance.toString().toUpperCase() + " Closing invalid sql connection for destination " + destination); + //LOGGER.debug(sqlInstance.toString().toUpperCase() + " Closing invalid sql connection for destination " + destination); + LOGGER.debug(sqlInstance.toString().toUpperCase() + " Closing invalid sql connection for destination " + sqlHost); try { connection.close(); } catch (SQLException e) { @@ -1010,8 +1016,10 @@ public Connection getConnection(String destination) throws CygnusRuntimeError, C } } // if - DataSource datasource = createConnectionPool(destination); - datasources.put(destination, datasource); + //DataSource datasource = createConnectionPool(destination); + DataSource datasource = createConnectionPool(sqlHost); + //datasources.put(destination, datasource); + datasources.put(sqlHost, datasource); connection = datasource.getConnection(); if (sqlInstance == SQLInstance.ORACLE) { // set proper NLS_TIMESTAMP formats for current session @@ -1025,12 +1033,16 @@ public Connection getConnection(String destination) throws CygnusRuntimeError, C } // if // Check Pool cache and log status - if (pools.containsKey(destination)){ - GenericObjectPool pool = pools.get(destination); - LOGGER.debug(sqlInstance.toString().toUpperCase() + " Pool status (" + destination + ") Max.: " + pool.getMaxActive() + "; Active: " + //if (pools.containsKey(destination)){ + if (pools.containsKey(sqlHost)){ + //GenericObjectPool pool = pools.get(destination); + GenericObjectPool pool = pools.get(sqlHost); + //LOGGER.debug(sqlInstance.toString().toUpperCase() + " Pool status (" + destination + ") Max.: " + pool.getMaxActive() + "; Active: " + LOGGER.debug(sqlInstance.toString().toUpperCase() + " Pool status (" + sqlHost + ") Max.: " + pool.getMaxActive() + "; Active: " + pool.getNumActive() + "; Idle: " + pool.getNumIdle()); }else{ - LOGGER.error(sqlInstance.toString().toUpperCase() + " Can't find dabase in pool cache (" + destination + ")"); + //LOGGER.error(sqlInstance.toString().toUpperCase() + " Can't find dabase in pool cache (" + destination + ")"); + LOGGER.error(sqlInstance.toString().toUpperCase() + " Can't find dabase in pool cache (" + sqlHost + ")"); } return connection; } catch (ClassNotFoundException e) { diff --git a/cygnus-common/src/test/java/com/telefonica/iot/cygnus/backends/sql/SQLBackendImplTest.java b/cygnus-common/src/test/java/com/telefonica/iot/cygnus/backends/sql/SQLBackendImplTest.java index 50c61eb7f..04ca21eff 100644 --- a/cygnus-common/src/test/java/com/telefonica/iot/cygnus/backends/sql/SQLBackendImplTest.java +++ b/cygnus-common/src/test/java/com/telefonica/iot/cygnus/backends/sql/SQLBackendImplTest.java @@ -89,10 +89,10 @@ public void setUp() throws Exception { backend = new SQLBackendImpl(host, port, user, password, maxPoolSize, maxPoolIdle, minPoolIdle, minPoolIdleTimeMillis, MYSQL_INSTANCE_NAME, MYSQL_DRIVER_NAME, null, persistErrors, maxLatestErrors); // set up the behaviour of the mocked classes - when(mockDriverDbCreate.getConnection(Mockito.anyString())).thenReturn(mockConnection); + when(mockDriverDbCreate.getConnection(Mockito.anyString(), Mockito.anyString())).thenReturn(mockConnection); when(mockDriverDbCreate.isConnectionCreated(Mockito.anyString())).thenReturn(true); when(mockDriverDbCreate.numConnectionsCreated()).thenReturn(1); - when(mockDriverTableCreate.getConnection(Mockito.anyString())).thenReturn(mockConnection); + when(mockDriverTableCreate.getConnection(Mockito.anyString(), Mockito.anyString())).thenReturn(mockConnection); when(mockDriverTableCreate.isConnectionCreated(Mockito.anyString())).thenReturn(true, true, true, true, true); when(mockConnection.createStatement()).thenReturn(mockStatement); when(mockStatement.executeUpdate(Mockito.anyString())).thenReturn(1);