Skip to content

Commit

Permalink
use host instead of database for connection pool
Browse files Browse the repository at this point in the history
  • Loading branch information
AlvaroVega committed Apr 18, 2024
1 parent dc2b705 commit f59d561
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ public class SQLBackendImpl implements SQLBackend{
private SQLBackendImpl.SQLDriver driver;
private final SQLCache cache;
protected SQLInstance sqlInstance;
protected String sqlHost;
private final boolean persistErrors;
private final int maxLatestErrors;
private static final String DEFAULT_ERROR_TABLE_SUFFIX = "_error_log";
Expand Down Expand Up @@ -116,6 +117,7 @@ public SQLBackendImpl(String sqlHost, String sqlPort, String sqlUsername, String
public SQLBackendImpl(String sqlHost, String sqlPort, String sqlUsername, String sqlPassword, int maxPoolSize, int maxPoolIdle, int minPoolIdle, int minPoolIdleTimeMillis, SQLInstance sqlInstance, String sqlDriverName, String sqlOptions, boolean persistErrors, int maxLatestErrors) {
driver = new SQLBackendImpl.SQLDriver(sqlHost, sqlPort, sqlUsername, sqlPassword, maxPoolSize, maxPoolIdle, minPoolIdle, minPoolIdleTimeMillis, sqlInstance, sqlDriverName, sqlOptions);
cache = new SQLCache();
this.sqlHost = sqlHost;
this.sqlInstance = sqlInstance;
this.persistErrors = persistErrors;
this.maxLatestErrors = maxLatestErrors;
Expand Down Expand Up @@ -174,7 +176,7 @@ public void createDestination(String destination) throws CygnusRuntimeError, Cyg
Statement stmt = null;

// get a connection to an empty destination
Connection con = driver.getConnection("");
Connection con = driver.getConnection("", this.sqlHost);

String query = "";
if (sqlInstance == SQLInstance.MYSQL) {
Expand Down Expand Up @@ -229,7 +231,7 @@ public void createTable(String dataBase, String schema, String table, String typ
Statement stmt = null;

// get a connection to the given destination
Connection con = driver.getConnection(dataBase);
Connection con = driver.getConnection(dataBase, this.sqlHost);
String query = "";
if (sqlInstance == SQLInstance.MYSQL) {
query = "create table if not exists `" + tableName + "`" + typedFieldNames;
Expand Down Expand Up @@ -274,7 +276,7 @@ public void insertContextData(String dataBase, String schema, String table, Stri
String tableName = table;

// get a connection to the given destination
Connection con = driver.getConnection(dataBase);
Connection con = driver.getConnection(dataBase, this.sqlHost);
String query = "";
if (sqlInstance == SQLInstance.MYSQL) {
query = "insert into `" + tableName + "` " + fieldNames + " values " + fieldValues;
Expand Down Expand Up @@ -315,7 +317,7 @@ private CachedRowSet select(String dataBase, String schema, String tableName, St
Statement stmt = null;

// get a connection to the given destination
Connection con = driver.getConnection(dataBase);
Connection con = driver.getConnection(dataBase, this.sqlHost);
String query = "";
if (sqlInstance == SQLInstance.MYSQL) {
query = "select " + selection + " from `" + tableName + "` order by recvTime desc limit " + DEFAULT_LIMIT_SELECT_EXP_RECORDS;
Expand Down Expand Up @@ -362,7 +364,7 @@ private void delete(String dataBase, String schema, String tableName, String fil
Statement stmt = null;

// get a connection to the given destination
Connection con = driver.getConnection(dataBase);
Connection con = driver.getConnection(dataBase, this.sqlHost);
String query = "";
if (sqlInstance == SQLInstance.MYSQL) {
query = "delete from `" + tableName + "` where " + filters;
Expand Down Expand Up @@ -604,7 +606,7 @@ public void createErrorTable(String dataBase, String schema)

Statement stmt = null;
// get a connection to the given destination
Connection con = driver.getConnection(dataBase);
Connection con = driver.getConnection(dataBase, this.sqlHost);

String query = "";
if (sqlInstance == SQLInstance.MYSQL) {
Expand Down Expand Up @@ -672,7 +674,7 @@ public void upsertTransaction (LinkedHashMap<String, ArrayList<JsonElement>> agg

try {

connection = driver.getConnection(dataBase);
connection = driver.getConnection(dataBase, this.sqlHost);
connection.setAutoCommit(false);

ArrayList<StringBuffer> upsertQuerysList = SQLQueryUtils.sqlUpsertQuery(aggregation,
Expand Down Expand Up @@ -758,7 +760,7 @@ public void insertTransaction (LinkedHashMap<String, ArrayList<JsonElement>> agg

try {

connection = driver.getConnection(dataBase);
connection = driver.getConnection(dataBase, this.sqlHost);
connection.setAutoCommit(false);

insertQuery = SQLQueryUtils.sqlInsertQuery(aggregation,
Expand Down Expand Up @@ -841,7 +843,7 @@ public void purgeErrorTable(String dataBase, String schema)

Statement stmt = null;
// get a connection to the given destination
Connection con = driver.getConnection(dataBase);
Connection con = driver.getConnection(dataBase, this.sqlHost);

String query = "";
if (sqlInstance == SQLInstance.MYSQL) {
Expand Down Expand Up @@ -885,7 +887,7 @@ private void insertErrorLog(String dataBase, String schema, String errorQuery, E
", query)";

// get a connection to the given destination
Connection con = driver.getConnection(dataBase);
Connection con = driver.getConnection(dataBase, this.sqlHost);

String query = "";
if (sqlInstance == SQLInstance.MYSQL) {
Expand Down Expand Up @@ -988,30 +990,36 @@ public SQLDriver(String sqlHost, String sqlPort, String sqlUsername, String sqlP
* @throws CygnusRuntimeError
* @throws CygnusPersistenceError
*/
public Connection getConnection(String destination) throws CygnusRuntimeError, CygnusPersistenceError {
public Connection getConnection(String destination, String sqlHost) throws CygnusRuntimeError, CygnusPersistenceError {
try {
// FIXME: the number of cached connections should be limited to
// a certain number; with such a limit
// number, if a new connection is needed, the oldest one is closed
Connection connection = null;

if (datasources.containsKey(destination)) {
connection = datasources.get(destination).getConnection();
LOGGER.debug(sqlInstance.toString().toUpperCase() + " Recovered destination connection from cache (" + destination + ")");
//if (datasources.containsKey(destination)) {
if (datasources.containsKey(sqlHost)) {
//connection = datasources.get(destination).getConnection();
connection = datasources.get(sqlHost).getConnection();
//LOGGER.debug(sqlInstance.toString().toUpperCase() + " Recovered destination connection from cache (" + destination + ")");
LOGGER.debug(sqlInstance.toString().toUpperCase() + " Recovered destination connection from cache (" + sqlHost + ")");
}

if (connection == null || !connection.isValid(0)) {
if (connection != null) {
LOGGER.debug(sqlInstance.toString().toUpperCase() + " Closing invalid sql connection for destination " + destination);
//LOGGER.debug(sqlInstance.toString().toUpperCase() + " Closing invalid sql connection for destination " + destination);
LOGGER.debug(sqlInstance.toString().toUpperCase() + " Closing invalid sql connection for destination " + sqlHost);
try {
connection.close();
} catch (SQLException e) {
LOGGER.warn(sqlInstance.toString().toUpperCase() + " error closing invalid connection: " + e.getMessage());
}
} // if

DataSource datasource = createConnectionPool(destination);
datasources.put(destination, datasource);
//DataSource datasource = createConnectionPool(destination);
DataSource datasource = createConnectionPool(sqlHost);
//datasources.put(destination, datasource);
datasources.put(sqlHost, datasource);
connection = datasource.getConnection();
if (sqlInstance == SQLInstance.ORACLE) {
// set proper NLS_TIMESTAMP formats for current session
Expand All @@ -1025,12 +1033,16 @@ public Connection getConnection(String destination) throws CygnusRuntimeError, C
} // if

// Check Pool cache and log status
if (pools.containsKey(destination)){
GenericObjectPool pool = pools.get(destination);
LOGGER.debug(sqlInstance.toString().toUpperCase() + " Pool status (" + destination + ") Max.: " + pool.getMaxActive() + "; Active: "
//if (pools.containsKey(destination)){
if (pools.containsKey(sqlHost)){
//GenericObjectPool pool = pools.get(destination);
GenericObjectPool pool = pools.get(sqlHost);
//LOGGER.debug(sqlInstance.toString().toUpperCase() + " Pool status (" + destination + ") Max.: " + pool.getMaxActive() + "; Active: "
LOGGER.debug(sqlInstance.toString().toUpperCase() + " Pool status (" + sqlHost + ") Max.: " + pool.getMaxActive() + "; Active: "
+ pool.getNumActive() + "; Idle: " + pool.getNumIdle());
}else{
LOGGER.error(sqlInstance.toString().toUpperCase() + " Can't find dabase in pool cache (" + destination + ")");
//LOGGER.error(sqlInstance.toString().toUpperCase() + " Can't find dabase in pool cache (" + destination + ")");
LOGGER.error(sqlInstance.toString().toUpperCase() + " Can't find dabase in pool cache (" + sqlHost + ")");
}
return connection;
} catch (ClassNotFoundException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,10 +89,10 @@ public void setUp() throws Exception {
backend = new SQLBackendImpl(host, port, user, password, maxPoolSize, maxPoolIdle, minPoolIdle, minPoolIdleTimeMillis, MYSQL_INSTANCE_NAME, MYSQL_DRIVER_NAME, null, persistErrors, maxLatestErrors);

// set up the behaviour of the mocked classes
when(mockDriverDbCreate.getConnection(Mockito.anyString())).thenReturn(mockConnection);
when(mockDriverDbCreate.getConnection(Mockito.anyString(), Mockito.anyString())).thenReturn(mockConnection);
when(mockDriverDbCreate.isConnectionCreated(Mockito.anyString())).thenReturn(true);
when(mockDriverDbCreate.numConnectionsCreated()).thenReturn(1);
when(mockDriverTableCreate.getConnection(Mockito.anyString())).thenReturn(mockConnection);
when(mockDriverTableCreate.getConnection(Mockito.anyString(), Mockito.anyString())).thenReturn(mockConnection);
when(mockDriverTableCreate.isConnectionCreated(Mockito.anyString())).thenReturn(true, true, true, true, true);
when(mockConnection.createStatement()).thenReturn(mockStatement);
when(mockStatement.executeUpdate(Mockito.anyString())).thenReturn(1);
Expand Down

0 comments on commit f59d561

Please sign in to comment.