Skip to content

Commit

Permalink
add max and min pool idle config for sql sinks
Browse files Browse the repository at this point in the history
  • Loading branch information
AlvaroVega committed Apr 11, 2024
1 parent ba12477 commit 2ca53e6
Show file tree
Hide file tree
Showing 6 changed files with 88 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -65,13 +65,15 @@ public class SQLBackendImpl implements SQLBackend{
* @param sqlUsername
* @param sqlPassword
* @param maxPoolSize
* @param maxPoolIdle
* @param minPoolIdle
* @param sqlInstance
* @param sqlDriverName
* @param persistErrors
* @param maxLatestErrors
*/
public SQLBackendImpl(String sqlHost, String sqlPort, String sqlUsername, String sqlPassword, int maxPoolSize, SQLInstance sqlInstance, String sqlDriverName, boolean persistErrors, int maxLatestErrors) {
this(sqlHost, sqlPort, sqlUsername, sqlPassword, maxPoolSize, sqlInstance, sqlDriverName, null, persistErrors, maxLatestErrors);
public SQLBackendImpl(String sqlHost, String sqlPort, String sqlUsername, String sqlPassword, int maxPoolSize, int maxPoolIdle, int minPoolIdle, SQLInstance sqlInstance, String sqlDriverName, boolean persistErrors, int maxLatestErrors) {
this(sqlHost, sqlPort, sqlUsername, sqlPassword, maxPoolSize, maxPoolIdle, minPoolIdle, sqlInstance, sqlDriverName, null, persistErrors, maxLatestErrors);
} // SQLBackendImpl

/**
Expand All @@ -82,12 +84,14 @@ public SQLBackendImpl(String sqlHost, String sqlPort, String sqlUsername, String
* @param sqlUsername
* @param sqlPassword
* @param maxPoolSize
* @param maxPoolIdle
* @param minPoolIdle
* @param sqlInstance
* @param sqlDriverName
* @param sqlOptions
*/
public SQLBackendImpl(String sqlHost, String sqlPort, String sqlUsername, String sqlPassword, int maxPoolSize, SQLInstance sqlInstance, String sqlDriverName, String sqlOptions) {
this(sqlHost, sqlPort, sqlUsername, sqlPassword, maxPoolSize, sqlInstance, sqlDriverName, sqlOptions, true, DEFAULT_MAX_LATEST_ERRORS);
public SQLBackendImpl(String sqlHost, String sqlPort, String sqlUsername, String sqlPassword, int maxPoolSize, int maxPoolIdle, int minPoolIdle, SQLInstance sqlInstance, String sqlDriverName, String sqlOptions) {
this(sqlHost, sqlPort, sqlUsername, sqlPassword, maxPoolSize, maxPoolIdle, minPoolIdle, sqlInstance, sqlDriverName, sqlOptions, true, DEFAULT_MAX_LATEST_ERRORS);
} // SQLBackendImpl

/**
Expand All @@ -98,14 +102,16 @@ public SQLBackendImpl(String sqlHost, String sqlPort, String sqlUsername, String
* @param sqlUsername
* @param sqlPassword
* @param maxPoolSize
* @param maxPoolIdle
* @param minPoolIdle
* @param sqlInstance
* @param sqlDriverName
* @param sqlOptions
* @param persistErrors
* @param maxLatestErrors
*/
public SQLBackendImpl(String sqlHost, String sqlPort, String sqlUsername, String sqlPassword, int maxPoolSize, SQLInstance sqlInstance, String sqlDriverName, String sqlOptions, boolean persistErrors, int maxLatestErrors) {
driver = new SQLBackendImpl.SQLDriver(sqlHost, sqlPort, sqlUsername, sqlPassword, maxPoolSize, sqlInstance, sqlDriverName, sqlOptions);
public SQLBackendImpl(String sqlHost, String sqlPort, String sqlUsername, String sqlPassword, int maxPoolSize, int maxPoolIdle, int minPoolIdle, SQLInstance sqlInstance, String sqlDriverName, String sqlOptions, boolean persistErrors, int maxLatestErrors) {
driver = new SQLBackendImpl.SQLDriver(sqlHost, sqlPort, sqlUsername, sqlPassword, maxPoolSize, maxPoolIdle, minPoolIdle, sqlInstance, sqlDriverName, sqlOptions);
cache = new SQLCache();
this.sqlInstance = sqlInstance;
this.persistErrors = persistErrors;
Expand Down Expand Up @@ -935,6 +941,8 @@ public class SQLDriver {
private final SQLInstance sqlInstance;
private final String sqlDriverName;
private final int maxPoolSize;
private final int maxPoolIdle;
private final int minPoolIdle;
private final String sqlOptions;

/**
Expand All @@ -949,14 +957,16 @@ public class SQLDriver {
* @param sqlDriverName
* @param sqlOptions
*/
public SQLDriver(String sqlHost, String sqlPort, String sqlUsername, String sqlPassword, int maxPoolSize, SQLInstance sqlInstance, String sqlDriverName, String sqlOptions) {
public SQLDriver(String sqlHost, String sqlPort, String sqlUsername, String sqlPassword, int maxPoolSize, int maxPoolIdle, int minPoolIdle, SQLInstance sqlInstance, String sqlDriverName, String sqlOptions) {
datasources = new HashMap<>();
pools = new HashMap<>();
this.sqlHost = sqlHost;
this.sqlPort = sqlPort;
this.sqlUsername = sqlUsername;
this.sqlPassword = sqlPassword;
this.maxPoolSize = maxPoolSize;
this.maxPoolIdle = maxPoolIdle;
this.minPoolIdle = minPoolIdle;
this.sqlInstance = sqlInstance;
this.sqlDriverName = sqlDriverName;
this.sqlOptions = sqlOptions;
Expand Down Expand Up @@ -1096,6 +1106,8 @@ private DataSource createConnectionPool(String destination) throws Exception {
// Creates an Instance of GenericObjectPool That Holds Our Pool of Connections Object!
gPool = new GenericObjectPool();
gPool.setMaxActive(this.maxPoolSize);
gPool.setMaxIdle(this.maxPoolIdle);
gPool.setMinIdle(this.minPoolIdle);
pools.put(destination, gPool);

// Creates a ConnectionFactory Object Which Will Be Used by the Pool to Create the Connection Object!
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ public class SQLBackendImplTest {

// constants
private final int maxPoolSize = 2;
private final int maxPoolIdle = 2;
private final int minPoolIdle = 0;
private final String host = "localhost";
private final String port = "3306";
private final String user = "root";
Expand Down Expand Up @@ -83,7 +85,7 @@ public class SQLBackendImplTest {
@Before
public void setUp() throws Exception {
// set up the instance of the tested class
backend = new SQLBackendImpl(host, port, user, password, maxPoolSize, MYSQL_INSTANCE_NAME, MYSQL_DRIVER_NAME, null, persistErrors, maxLatestErrors);
backend = new SQLBackendImpl(host, port, user, password, maxPoolSize, maxPoolIdle, minPoolIdle, MYSQL_INSTANCE_NAME, MYSQL_DRIVER_NAME, null, persistErrors, maxLatestErrors);

// set up the behaviour of the mocked classes
when(mockDriverDbCreate.getConnection(Mockito.anyString())).thenReturn(mockConnection);
Expand Down Expand Up @@ -169,7 +171,7 @@ public void testJDBCUrlMySQL() {
String sqlDriverName = "com.mysql.jdbc.Driver";
String destination = "dest";

SQLBackendImpl backend = new SQLBackendImpl(sqlHost, sqlPort, user, password, maxPoolSize, sqlInstance, sqlDriverName, null, persistErrors, maxLatestErrors);
SQLBackendImpl backend = new SQLBackendImpl(sqlHost, sqlPort, user, password, maxPoolSize, maxPoolIdle, minPoolIdle, sqlInstance, sqlDriverName, null, persistErrors, maxLatestErrors);
SQLBackendImpl.SQLDriver driver = backend.getDriver();

assertEquals(driver.generateJDBCUrl(destination), "jdbc:mysql://localhost:3306/dest");
Expand All @@ -185,7 +187,7 @@ public void testJDBCUrlPostgreSQL() {
String destination = "dest";
String defaultDataBase = "default";

SQLBackendImpl backend = new SQLBackendImpl(sqlHost, sqlPort, user, password, maxPoolSize, sqlInstance, sqlDriverName, defaultDataBase, persistErrors, maxLatestErrors);
SQLBackendImpl backend = new SQLBackendImpl(sqlHost, sqlPort, user, password, maxPoolSize, maxPoolIdle, minPoolIdle, sqlInstance, sqlDriverName, defaultDataBase, persistErrors, maxLatestErrors);
SQLBackendImpl.SQLDriver driver = backend.getDriver();

assertEquals(driver.generateJDBCUrl(destination), "jdbc:postgresql://localhost:5432/default");
Expand All @@ -201,7 +203,7 @@ public void testJDBCUrlOracleSQL() {
String destination = "dest";
String defaultDataBase = "default";

SQLBackendImpl backend = new SQLBackendImpl(sqlHost, sqlPort, user, password, maxPoolSize, sqlInstance, sqlDriverName, defaultDataBase, persistErrors, maxLatestErrors);
SQLBackendImpl backend = new SQLBackendImpl(sqlHost, sqlPort, user, password, maxPoolSize, maxPoolIdle, minPoolIdle, sqlInstance, sqlDriverName, defaultDataBase, persistErrors, maxLatestErrors);
SQLBackendImpl.SQLDriver driver = backend.getDriver();

assertEquals(driver.generateJDBCUrl(destination), "jdbc:oracle:oci://localhost:1521/default");
Expand All @@ -217,31 +219,31 @@ public void testJDBCUrlMySQLWithOptions() {
String destination = "dest";
String sqlOptions = "useSSL=true&requireSSL=false";

SQLBackendImpl backend = new SQLBackendImpl(sqlHost, sqlPort, user, password, maxPoolSize, sqlInstance, sqlDriverName, sqlOptions, persistErrors, maxLatestErrors);
SQLBackendImpl backend = new SQLBackendImpl(sqlHost, sqlPort, user, password, maxPoolSize, maxPoolIdle, minPoolIdle, sqlInstance, sqlDriverName, sqlOptions, persistErrors, maxLatestErrors);
SQLBackendImpl.SQLDriver driver = backend.getDriver();

assertEquals(driver.generateJDBCUrl(destination), "jdbc:mysql://localhost:3306/dest?useSSL=true&requireSSL=false");

System.out.println("Testing SQLBackendImpl.SQLDriver.generateJDBCUrl (sqlInstance:mysql, options:<white spaces>)");
sqlOptions = " \t";

backend = new SQLBackendImpl(sqlHost, sqlPort, user, password, maxPoolSize, sqlInstance, sqlDriverName, sqlOptions, persistErrors, maxLatestErrors);
backend = new SQLBackendImpl(sqlHost, sqlPort, user, password, maxPoolSize, maxPoolIdle, minPoolIdle, sqlInstance, sqlDriverName, sqlOptions, persistErrors, maxLatestErrors);
driver = backend.getDriver();

assertEquals(driver.generateJDBCUrl(destination), "jdbc:mysql://localhost:3306/dest");

System.out.println("Testing SQLBackendImpl.SQLDriver.generateJDBCUrl (sqlInstance:mysql, options:<empty>)");
sqlOptions = "";

backend = new SQLBackendImpl(sqlHost, sqlPort, user, password, maxPoolSize, sqlInstance, sqlDriverName, sqlOptions, persistErrors, maxLatestErrors);
backend = new SQLBackendImpl(sqlHost, sqlPort, user, password, maxPoolSize, maxPoolIdle, minPoolIdle, sqlInstance, sqlDriverName, sqlOptions, persistErrors, maxLatestErrors);
driver = backend.getDriver();

assertEquals(driver.generateJDBCUrl(destination), "jdbc:mysql://localhost:3306/dest");

System.out.println("Testing SQLBackendImpl.SQLDriver.generateJDBCUrl (sqlInstance:mysql, options:<null>)");
sqlOptions = null;

backend = new SQLBackendImpl(sqlHost, sqlPort, user, password, maxPoolSize, sqlInstance, sqlDriverName, sqlOptions, persistErrors, maxLatestErrors);
backend = new SQLBackendImpl(sqlHost, sqlPort, user, password, maxPoolSize, maxPoolIdle, minPoolIdle, sqlInstance, sqlDriverName, sqlOptions, persistErrors, maxLatestErrors);
driver = backend.getDriver();

assertEquals(driver.generateJDBCUrl(destination), "jdbc:mysql://localhost:3306/dest");
Expand All @@ -258,31 +260,31 @@ public void testJDBCUrlPostgreSQLWithOptions() {
String defaultDataBase = "default";
String sqlOptions = "sslmode=require";

SQLBackendImpl backend = new SQLBackendImpl(sqlHost, sqlPort, user, password, maxPoolSize, sqlInstance, sqlDriverName, sqlOptions, persistErrors, maxLatestErrors);
SQLBackendImpl backend = new SQLBackendImpl(sqlHost, sqlPort, user, password, maxPoolSize, maxPoolIdle, minPoolIdle, sqlInstance, sqlDriverName, sqlOptions, persistErrors, maxLatestErrors);
SQLBackendImpl.SQLDriver driver = backend.getDriver();

assertEquals(driver.generateJDBCUrl(destination), "jdbc:postgresql://localhost:5432/default?sslmode=require");

System.out.println("Testing SQLBackendImpl.SQLDriver.generateJDBCUrl (sqlInstance:postgresql, options:<white spaces)");
sqlOptions = " \t";

backend = new SQLBackendImpl(sqlHost, sqlPort, user, password, maxPoolSize, sqlInstance, sqlDriverName, sqlOptions, persistErrors, maxLatestErrors);
backend = new SQLBackendImpl(sqlHost, sqlPort, user, password, maxPoolSize, maxPoolIdle, minPoolIdle, sqlInstance, sqlDriverName, sqlOptions, persistErrors, maxLatestErrors);
driver = backend.getDriver();

assertEquals(driver.generateJDBCUrl(destination), "jdbc:postgresql://localhost:5432/default");

System.out.println("Testing SQLBackendImpl.SQLDriver.generateJDBCUrl (sqlInstance:postgresql, options:<empty>)");
sqlOptions = "";

backend = new SQLBackendImpl(sqlHost, sqlPort, user, password, maxPoolSize, sqlInstance, sqlDriverName, sqlOptions, persistErrors, maxLatestErrors);
backend = new SQLBackendImpl(sqlHost, sqlPort, user, password, maxPoolSize, maxPoolIdle, minPoolIdle, sqlInstance, sqlDriverName, sqlOptions, persistErrors, maxLatestErrors);
driver = backend.getDriver();

assertEquals(driver.generateJDBCUrl(destination), "jdbc:postgresql://localhost:5432/default");

System.out.println("Testing SQLBackendImpl.SQLDriver.generateJDBCUrl (sqlInstance:postgresql, options:<null>)");
sqlOptions = null;

backend = new SQLBackendImpl(sqlHost, sqlPort, user, password, maxPoolSize, sqlInstance, sqlDriverName, sqlOptions, persistErrors, maxLatestErrors);
backend = new SQLBackendImpl(sqlHost, sqlPort, user, password, maxPoolSize, maxPoolIdle, minPoolIdle, sqlInstance, sqlDriverName, sqlOptions, persistErrors, maxLatestErrors);
driver = backend.getDriver();

assertEquals(driver.generateJDBCUrl(destination), "jdbc:postgresql://localhost:5432/default");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ public class NGSIMySQLSink extends NGSISink {
private static final String DEFAULT_HOST = "localhost";
private static final String DEFAULT_USER_NAME = "root";
private static final int DEFAULT_MAX_POOL_SIZE = 3;
private static final int DEFAULT_MAX_POOL_IDLE = 3;
private static final int DEFAULT_MIN_POOL_IDLE = 0;
private static final String DEFAULT_ATTR_NATIVE_TYPES = "false";
private static final String MYSQL_DRIVER_NAME = "com.mysql.jdbc.Driver";
private static final SQLInstance MYSQL_INSTANCE_NAME = SQLInstance.MYSQL;
Expand All @@ -74,6 +76,8 @@ public class NGSIMySQLSink extends NGSISink {
private String mysqlUsername;
private String mysqlPassword;
private int maxPoolSize;
private int maxPoolIdle;
private int minPoolIdle;
private boolean rowAttrPersistence;
private SQLBackendImpl mySQLPersistenceBackend;
private boolean attrNativeTypes;
Expand Down Expand Up @@ -193,6 +197,12 @@ public void configure(Context context) {
maxPoolSize = context.getInteger("mysql_maxPoolSize", DEFAULT_MAX_POOL_SIZE);
LOGGER.debug("[" + this.getName() + "] Reading configuration (mysql_maxPoolSize=" + maxPoolSize + ")");

maxPoolIdle = context.getInteger("mysql_maxPoolIdle", DEFAULT_MAX_POOL_IDLE);
LOGGER.debug("[" + this.getName() + "] Reading configuration (mysql_maxPoolIdle=" + maxPoolIdle + ")");

minPoolIdle = context.getInteger("mysql_minPoolIdle", DEFAULT_MIN_POOL_IDLE);
LOGGER.debug("[" + this.getName() + "] Reading configuration (mysql_minPoolIdle=" + minPoolIdle + ")");

rowAttrPersistence = context.getString("attr_persistence", DEFAULT_ROW_ATTR_PERSISTENCE).equals("row");
String persistence = context.getString("attr_persistence", DEFAULT_ROW_ATTR_PERSISTENCE);

Expand Down Expand Up @@ -278,10 +288,10 @@ public void configure(Context context) {
@Override
public void start() {
try {
createPersistenceBackend(mysqlHost, mysqlPort, mysqlUsername, mysqlPassword, maxPoolSize, mysqlOptions, persistErrors, maxLatestErrors);
createPersistenceBackend(mysqlHost, mysqlPort, mysqlUsername, mysqlPassword, maxPoolSize, maxPoolIdle, minPoolIdle, mysqlOptions, persistErrors, maxLatestErrors);
LOGGER.debug("[" + this.getName() + "] MySQL persistence backend created");
} catch (Exception e) {
String configParams = " mysqlHost " + mysqlHost + " mysqlPort " + mysqlPort + " mysqlUsername " + mysqlUsername + " mysqlPassword " + mysqlPassword + " maxPoolSize " + maxPoolSize + " mysqlOptions " + mysqlOptions + " persistErrors " + persistErrors + " maxLatestErrors " + maxLatestErrors;
String configParams = " mysqlHost " + mysqlHost + " mysqlPort " + mysqlPort + " mysqlUsername " + mysqlUsername + " mysqlPassword " + mysqlPassword + " maxPoolSize " + maxPoolSize + " maxPoolIdle " + maxPoolIdle + " minPoolIdle " + minPoolIdle + " mysqlOptions " + mysqlOptions + " persistErrors " + persistErrors + " maxLatestErrors " + maxLatestErrors;
LOGGER.error("Error while creating the MySQL persistence backend. " +
"Config params= " + configParams +
"Details=" + e.getMessage() +
Expand All @@ -300,9 +310,9 @@ public void stop() {
/**
* Initialices a lazy singleton to share among instances on JVM
*/
private void createPersistenceBackend(String sqlHost, String sqlPort, String sqlUsername, String sqlPassword, int maxPoolSize, String sqlOptions, boolean persistErrors, int maxLatestErrors) {
private void createPersistenceBackend(String sqlHost, String sqlPort, String sqlUsername, String sqlPassword, int maxPoolSize, int maxPoolIdle, int minPoolIdle, String sqlOptions, boolean persistErrors, int maxLatestErrors) {
if (mySQLPersistenceBackend == null) {
mySQLPersistenceBackend = new SQLBackendImpl(sqlHost, sqlPort, sqlUsername, sqlPassword, maxPoolSize, MYSQL_INSTANCE_NAME, MYSQL_DRIVER_NAME, sqlOptions, persistErrors, maxLatestErrors);
mySQLPersistenceBackend = new SQLBackendImpl(sqlHost, sqlPort, sqlUsername, sqlPassword, maxPoolSize, maxPoolIdle, minPoolIdle, MYSQL_INSTANCE_NAME, MYSQL_DRIVER_NAME, sqlOptions, persistErrors, maxLatestErrors);
}
}

Expand Down
Loading

0 comments on commit 2ca53e6

Please sign in to comment.