Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] use host instead of database for connection pool #2373

Draft
wants to merge 3 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ public class SQLBackendImpl implements SQLBackend{
private SQLBackendImpl.SQLDriver driver;
private final SQLCache cache;
protected SQLInstance sqlInstance;
protected String sqlHost;
private final boolean persistErrors;
private final int maxLatestErrors;
private static final String DEFAULT_ERROR_TABLE_SUFFIX = "_error_log";
Expand Down Expand Up @@ -116,6 +117,7 @@ public SQLBackendImpl(String sqlHost, String sqlPort, String sqlUsername, String
public SQLBackendImpl(String sqlHost, String sqlPort, String sqlUsername, String sqlPassword, int maxPoolSize, int maxPoolIdle, int minPoolIdle, int minPoolIdleTimeMillis, SQLInstance sqlInstance, String sqlDriverName, String sqlOptions, boolean persistErrors, int maxLatestErrors) {
driver = new SQLBackendImpl.SQLDriver(sqlHost, sqlPort, sqlUsername, sqlPassword, maxPoolSize, maxPoolIdle, minPoolIdle, minPoolIdleTimeMillis, sqlInstance, sqlDriverName, sqlOptions);
cache = new SQLCache();
this.sqlHost = sqlHost;
this.sqlInstance = sqlInstance;
this.persistErrors = persistErrors;
this.maxLatestErrors = maxLatestErrors;
Expand Down Expand Up @@ -174,7 +176,7 @@ public void createDestination(String destination) throws CygnusRuntimeError, Cyg
Statement stmt = null;

// get a connection to an empty destination
Connection con = driver.getConnection("");
Connection con = driver.getConnection("", this.sqlHost);

String query = "";
if (sqlInstance == SQLInstance.MYSQL) {
Expand Down Expand Up @@ -229,7 +231,7 @@ public void createTable(String dataBase, String schema, String table, String typ
Statement stmt = null;

// get a connection to the given destination
Connection con = driver.getConnection(dataBase);
Connection con = driver.getConnection(dataBase, this.sqlHost);
String query = "";
if (sqlInstance == SQLInstance.MYSQL) {
query = "create table if not exists `" + tableName + "`" + typedFieldNames;
Expand Down Expand Up @@ -274,7 +276,7 @@ public void insertContextData(String dataBase, String schema, String table, Stri
String tableName = table;

// get a connection to the given destination
Connection con = driver.getConnection(dataBase);
Connection con = driver.getConnection(dataBase, this.sqlHost);
String query = "";
if (sqlInstance == SQLInstance.MYSQL) {
query = "insert into `" + tableName + "` " + fieldNames + " values " + fieldValues;
Expand Down Expand Up @@ -315,7 +317,7 @@ private CachedRowSet select(String dataBase, String schema, String tableName, St
Statement stmt = null;

// get a connection to the given destination
Connection con = driver.getConnection(dataBase);
Connection con = driver.getConnection(dataBase, this.sqlHost);
String query = "";
if (sqlInstance == SQLInstance.MYSQL) {
query = "select " + selection + " from `" + tableName + "` order by recvTime desc limit " + DEFAULT_LIMIT_SELECT_EXP_RECORDS;
Expand Down Expand Up @@ -362,7 +364,7 @@ private void delete(String dataBase, String schema, String tableName, String fil
Statement stmt = null;

// get a connection to the given destination
Connection con = driver.getConnection(dataBase);
Connection con = driver.getConnection(dataBase, this.sqlHost);
String query = "";
if (sqlInstance == SQLInstance.MYSQL) {
query = "delete from `" + tableName + "` where " + filters;
Expand Down Expand Up @@ -604,7 +606,7 @@ public void createErrorTable(String dataBase, String schema)

Statement stmt = null;
// get a connection to the given destination
Connection con = driver.getConnection(dataBase);
Connection con = driver.getConnection(dataBase, this.sqlHost);

String query = "";
if (sqlInstance == SQLInstance.MYSQL) {
Expand Down Expand Up @@ -672,7 +674,7 @@ public void upsertTransaction (LinkedHashMap<String, ArrayList<JsonElement>> agg

try {

connection = driver.getConnection(dataBase);
connection = driver.getConnection(dataBase, this.sqlHost);
connection.setAutoCommit(false);

ArrayList<StringBuffer> upsertQuerysList = SQLQueryUtils.sqlUpsertQuery(aggregation,
Expand Down Expand Up @@ -758,7 +760,7 @@ public void insertTransaction (LinkedHashMap<String, ArrayList<JsonElement>> agg

try {

connection = driver.getConnection(dataBase);
connection = driver.getConnection(dataBase, this.sqlHost);
connection.setAutoCommit(false);

insertQuery = SQLQueryUtils.sqlInsertQuery(aggregation,
Expand Down Expand Up @@ -841,7 +843,7 @@ public void purgeErrorTable(String dataBase, String schema)

Statement stmt = null;
// get a connection to the given destination
Connection con = driver.getConnection(dataBase);
Connection con = driver.getConnection(dataBase, this.sqlHost);

String query = "";
if (sqlInstance == SQLInstance.MYSQL) {
Expand Down Expand Up @@ -885,7 +887,7 @@ private void insertErrorLog(String dataBase, String schema, String errorQuery, E
", query)";

// get a connection to the given destination
Connection con = driver.getConnection(dataBase);
Connection con = driver.getConnection(dataBase, this.sqlHost);

String query = "";
if (sqlInstance == SQLInstance.MYSQL) {
Expand Down Expand Up @@ -988,30 +990,30 @@ public SQLDriver(String sqlHost, String sqlPort, String sqlUsername, String sqlP
* @throws CygnusRuntimeError
* @throws CygnusPersistenceError
*/
public Connection getConnection(String destination) throws CygnusRuntimeError, CygnusPersistenceError {
public Connection getConnection(String destination, String sqlHost) throws CygnusRuntimeError, CygnusPersistenceError {
try {
// FIXME: the number of cached connections should be limited to
// a certain number; with such a limit
// number, if a new connection is needed, the oldest one is closed
Connection connection = null;

if (datasources.containsKey(destination)) {
connection = datasources.get(destination).getConnection();
LOGGER.debug(sqlInstance.toString().toUpperCase() + " Recovered destination connection from cache (" + destination + ")");
if (datasources.containsKey(sqlHost)) {
connection = datasources.get(sqlHost).getConnection();
LOGGER.debug(sqlInstance.toString().toUpperCase() + " Recovered destination connection from cache (" + sqlHost + ")");
}

if (connection == null || !connection.isValid(0)) {
if (connection != null) {
LOGGER.debug(sqlInstance.toString().toUpperCase() + " Closing invalid sql connection for destination " + destination);
LOGGER.debug(sqlInstance.toString().toUpperCase() + " Closing invalid sql connection for destination " + sqlHost);
try {
connection.close();
} catch (SQLException e) {
LOGGER.warn(sqlInstance.toString().toUpperCase() + " error closing invalid connection: " + e.getMessage());
}
} // if

DataSource datasource = createConnectionPool(destination);
datasources.put(destination, datasource);
DataSource datasource = createConnectionPool(destination, sqlHost);
datasources.put(sqlHost, datasource);
connection = datasource.getConnection();
if (sqlInstance == SQLInstance.ORACLE) {
// set proper NLS_TIMESTAMP formats for current session
Expand All @@ -1025,12 +1027,12 @@ public Connection getConnection(String destination) throws CygnusRuntimeError, C
} // if

// Check Pool cache and log status
if (pools.containsKey(destination)){
GenericObjectPool pool = pools.get(destination);
LOGGER.debug(sqlInstance.toString().toUpperCase() + " Pool status (" + destination + ") Max.: " + pool.getMaxActive() + "; Active: "
if (pools.containsKey(sqlHost)){
GenericObjectPool pool = pools.get(sqlHost);
LOGGER.debug(sqlInstance.toString().toUpperCase() + " Pool status (" + sqlHost + ") Max.: " + pool.getMaxActive() + "; Active: "
+ pool.getNumActive() + "; Idle: " + pool.getNumIdle());
}else{
LOGGER.error(sqlInstance.toString().toUpperCase() + " Can't find dabase in pool cache (" + destination + ")");
LOGGER.error(sqlInstance.toString().toUpperCase() + " Can't find dabase in pool cache (" + sqlHost + ")");
}
return connection;
} catch (ClassNotFoundException e) {
Expand Down Expand Up @@ -1102,11 +1104,11 @@ protected int numConnectionsCreated() {
* @throws Exception
*/
@SuppressWarnings("unused")
private DataSource createConnectionPool(String destination) throws Exception {
private DataSource createConnectionPool(String destination, String sqlHost) throws Exception {
GenericObjectPool gPool = null;
if (pools.containsKey(destination)){
LOGGER.debug(sqlInstance.toString().toUpperCase() + " Pool recovered from Cache (" + destination + ")");
gPool = pools.get(destination);
if (pools.containsKey(sqlHost)){
LOGGER.debug(sqlInstance.toString().toUpperCase() + " Pool recovered from Cache (" + sqlHost + ")");
gPool = pools.get(sqlHost);
}else{
String jdbcUrl = generateJDBCUrl(destination);
Class.forName(sqlDriverName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,10 +89,10 @@ public void setUp() throws Exception {
backend = new SQLBackendImpl(host, port, user, password, maxPoolSize, maxPoolIdle, minPoolIdle, minPoolIdleTimeMillis, MYSQL_INSTANCE_NAME, MYSQL_DRIVER_NAME, null, persistErrors, maxLatestErrors);

// set up the behaviour of the mocked classes
when(mockDriverDbCreate.getConnection(Mockito.anyString())).thenReturn(mockConnection);
when(mockDriverDbCreate.getConnection(Mockito.anyString(), Mockito.anyString())).thenReturn(mockConnection);
when(mockDriverDbCreate.isConnectionCreated(Mockito.anyString())).thenReturn(true);
when(mockDriverDbCreate.numConnectionsCreated()).thenReturn(1);
when(mockDriverTableCreate.getConnection(Mockito.anyString())).thenReturn(mockConnection);
when(mockDriverTableCreate.getConnection(Mockito.anyString(), Mockito.anyString())).thenReturn(mockConnection);
when(mockDriverTableCreate.isConnectionCreated(Mockito.anyString())).thenReturn(true, true, true, true, true);
when(mockConnection.createStatement()).thenReturn(mockStatement);
when(mockStatement.executeUpdate(Mockito.anyString())).thenReturn(1);
Expand Down
Loading