Skip to content

Commit

Permalink
Merge pull request #2367 from telefonicaid/task/add_min_max_pool_idle…
Browse files Browse the repository at this point in the history
…_sql_sinks

add max and min pool idle config for sql sinks
  • Loading branch information
fgalan authored Apr 17, 2024
2 parents a27dacd + 533a343 commit acd8506
Show file tree
Hide file tree
Showing 12 changed files with 183 additions and 41 deletions.
1 change: 1 addition & 0 deletions CHANGES_NEXT_RELEASE
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
- [cygnus-common] [SQL-sinks] Define default values and allow to set connection pool config for maxPoolSize(3), maxPoolIddle(2), minPoolIdle(0) and minPoolIdleTimeMillis(10000) (#2366)
- [cygnus-ngsi] Upgrade Debian version from 12.4 to 12.5 in Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -65,13 +65,16 @@ public class SQLBackendImpl implements SQLBackend{
* @param sqlUsername
* @param sqlPassword
* @param maxPoolSize
* @param maxPoolIdle
* @param minPoolIdle
* @param minPoolIdleTimeMillis
* @param sqlInstance
* @param sqlDriverName
* @param persistErrors
* @param maxLatestErrors
*/
public SQLBackendImpl(String sqlHost, String sqlPort, String sqlUsername, String sqlPassword, int maxPoolSize, SQLInstance sqlInstance, String sqlDriverName, boolean persistErrors, int maxLatestErrors) {
this(sqlHost, sqlPort, sqlUsername, sqlPassword, maxPoolSize, sqlInstance, sqlDriverName, null, persistErrors, maxLatestErrors);
public SQLBackendImpl(String sqlHost, String sqlPort, String sqlUsername, String sqlPassword, int maxPoolSize, int maxPoolIdle, int minPoolIdle, int minPoolIdleTimeMillis, SQLInstance sqlInstance, String sqlDriverName, boolean persistErrors, int maxLatestErrors) {
this(sqlHost, sqlPort, sqlUsername, sqlPassword, maxPoolSize, maxPoolIdle, minPoolIdle, minPoolIdleTimeMillis, sqlInstance, sqlDriverName, null, persistErrors, maxLatestErrors);
} // SQLBackendImpl

/**
Expand All @@ -82,12 +85,15 @@ public SQLBackendImpl(String sqlHost, String sqlPort, String sqlUsername, String
* @param sqlUsername
* @param sqlPassword
* @param maxPoolSize
* @param maxPoolIdle
* @param minPoolIdle
* @param minPoolIdleTimeMillis
* @param sqlInstance
* @param sqlDriverName
* @param sqlOptions
*/
public SQLBackendImpl(String sqlHost, String sqlPort, String sqlUsername, String sqlPassword, int maxPoolSize, SQLInstance sqlInstance, String sqlDriverName, String sqlOptions) {
this(sqlHost, sqlPort, sqlUsername, sqlPassword, maxPoolSize, sqlInstance, sqlDriverName, sqlOptions, true, DEFAULT_MAX_LATEST_ERRORS);
public SQLBackendImpl(String sqlHost, String sqlPort, String sqlUsername, String sqlPassword, int maxPoolSize, int maxPoolIdle, int minPoolIdle, int minPoolIdleTimeMillis, SQLInstance sqlInstance, String sqlDriverName, String sqlOptions) {
this(sqlHost, sqlPort, sqlUsername, sqlPassword, maxPoolSize, maxPoolIdle, minPoolIdle, minPoolIdleTimeMillis, sqlInstance, sqlDriverName, sqlOptions, true, DEFAULT_MAX_LATEST_ERRORS);
} // SQLBackendImpl

/**
Expand All @@ -98,14 +104,17 @@ public SQLBackendImpl(String sqlHost, String sqlPort, String sqlUsername, String
* @param sqlUsername
* @param sqlPassword
* @param maxPoolSize
* @param maxPoolIdle
* @param minPoolIdle
* @param minPoolIdleTimeMillis
* @param sqlInstance
* @param sqlDriverName
* @param sqlOptions
* @param persistErrors
* @param maxLatestErrors
*/
public SQLBackendImpl(String sqlHost, String sqlPort, String sqlUsername, String sqlPassword, int maxPoolSize, SQLInstance sqlInstance, String sqlDriverName, String sqlOptions, boolean persistErrors, int maxLatestErrors) {
driver = new SQLBackendImpl.SQLDriver(sqlHost, sqlPort, sqlUsername, sqlPassword, maxPoolSize, sqlInstance, sqlDriverName, sqlOptions);
public SQLBackendImpl(String sqlHost, String sqlPort, String sqlUsername, String sqlPassword, int maxPoolSize, int maxPoolIdle, int minPoolIdle, int minPoolIdleTimeMillis, SQLInstance sqlInstance, String sqlDriverName, String sqlOptions, boolean persistErrors, int maxLatestErrors) {
driver = new SQLBackendImpl.SQLDriver(sqlHost, sqlPort, sqlUsername, sqlPassword, maxPoolSize, maxPoolIdle, minPoolIdle, minPoolIdleTimeMillis, sqlInstance, sqlDriverName, sqlOptions);
cache = new SQLCache();
this.sqlInstance = sqlInstance;
this.persistErrors = persistErrors;
Expand Down Expand Up @@ -935,6 +944,9 @@ public class SQLDriver {
private final SQLInstance sqlInstance;
private final String sqlDriverName;
private final int maxPoolSize;
private final int maxPoolIdle;
private final int minPoolIdle;
private final int minPoolIdleTimeMillis;
private final String sqlOptions;

/**
Expand All @@ -945,18 +957,24 @@ public class SQLDriver {
* @param sqlUsername
* @param sqlPassword
* @param maxPoolSize
* @param maxPoolIdle
* @param minPoolIdle
* @param minPoolIdleTimeMillis
* @param sqlInstance
* @param sqlDriverName
* @param sqlOptions
*/
public SQLDriver(String sqlHost, String sqlPort, String sqlUsername, String sqlPassword, int maxPoolSize, SQLInstance sqlInstance, String sqlDriverName, String sqlOptions) {
public SQLDriver(String sqlHost, String sqlPort, String sqlUsername, String sqlPassword, int maxPoolSize, int maxPoolIdle, int minPoolIdle, int minPoolIdleTimeMillis, SQLInstance sqlInstance, String sqlDriverName, String sqlOptions) {
datasources = new HashMap<>();
pools = new HashMap<>();
this.sqlHost = sqlHost;
this.sqlPort = sqlPort;
this.sqlUsername = sqlUsername;
this.sqlPassword = sqlPassword;
this.maxPoolSize = maxPoolSize;
this.maxPoolIdle = maxPoolIdle;
this.minPoolIdle = minPoolIdle;
this.minPoolIdleTimeMillis = minPoolIdleTimeMillis;
this.sqlInstance = sqlInstance;
this.sqlDriverName = sqlDriverName;
this.sqlOptions = sqlOptions;
Expand Down Expand Up @@ -1095,7 +1113,17 @@ private DataSource createConnectionPool(String destination) throws Exception {

// Creates an Instance of GenericObjectPool That Holds Our Pool of Connections Object!
gPool = new GenericObjectPool();
// Tune from https://javadoc.io/static/commons-pool/commons-pool/1.6/org/apache/commons/pool/impl/GenericObjectPool.html
// Sets the cap on the number of objects that can be allocated by the pool (checked out to clients, or idle awaiting checkout) at a given time.
gPool.setMaxActive(this.maxPoolSize);
// Sets the cap on the number of "idle" instances in the pool.
gPool.setMaxIdle(this.maxPoolIdle);
// Sets the minimum number of objects allowed in the pool before the evictor thread (if active) spawns new objects.
gPool.setMinIdle(this.minPoolIdle);
// Sets the minimum amount of time an object may sit idle in the pool before it is eligible for eviction by the idle object evictor (if any)
gPool.setMinEvictableIdleTimeMillis(this.minPoolIdleTimeMillis);
// Sets the number of milliseconds to sleep between runs of the idle object evictor thread
gPool.setTimeBetweenEvictionRunsMillis(this.minPoolIdleTimeMillis*3);
pools.put(destination, gPool);

// Creates a ConnectionFactory Object Which Will Be Used by the Pool to Create the Connection Object!
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ public class SQLBackendImplTest {

// constants
private final int maxPoolSize = 2;
private final int maxPoolIdle = 1;
private final int minPoolIdle = 0;
private final int minPoolIdleTimeMillis = 10000;
private final String host = "localhost";
private final String port = "3306";
private final String user = "root";
Expand Down Expand Up @@ -83,7 +86,7 @@ public class SQLBackendImplTest {
@Before
public void setUp() throws Exception {
// set up the instance of the tested class
backend = new SQLBackendImpl(host, port, user, password, maxPoolSize, MYSQL_INSTANCE_NAME, MYSQL_DRIVER_NAME, null, persistErrors, maxLatestErrors);
backend = new SQLBackendImpl(host, port, user, password, maxPoolSize, maxPoolIdle, minPoolIdle, minPoolIdleTimeMillis, MYSQL_INSTANCE_NAME, MYSQL_DRIVER_NAME, null, persistErrors, maxLatestErrors);

// set up the behaviour of the mocked classes
when(mockDriverDbCreate.getConnection(Mockito.anyString())).thenReturn(mockConnection);
Expand Down Expand Up @@ -169,7 +172,7 @@ public void testJDBCUrlMySQL() {
String sqlDriverName = "com.mysql.jdbc.Driver";
String destination = "dest";

SQLBackendImpl backend = new SQLBackendImpl(sqlHost, sqlPort, user, password, maxPoolSize, sqlInstance, sqlDriverName, null, persistErrors, maxLatestErrors);
SQLBackendImpl backend = new SQLBackendImpl(sqlHost, sqlPort, user, password, maxPoolSize, maxPoolIdle, minPoolIdle, minPoolIdleTimeMillis, sqlInstance, sqlDriverName, null, persistErrors, maxLatestErrors);
SQLBackendImpl.SQLDriver driver = backend.getDriver();

assertEquals(driver.generateJDBCUrl(destination), "jdbc:mysql://localhost:3306/dest");
Expand All @@ -185,7 +188,7 @@ public void testJDBCUrlPostgreSQL() {
String destination = "dest";
String defaultDataBase = "default";

SQLBackendImpl backend = new SQLBackendImpl(sqlHost, sqlPort, user, password, maxPoolSize, sqlInstance, sqlDriverName, defaultDataBase, persistErrors, maxLatestErrors);
SQLBackendImpl backend = new SQLBackendImpl(sqlHost, sqlPort, user, password, maxPoolSize, maxPoolIdle, minPoolIdle, minPoolIdleTimeMillis, sqlInstance, sqlDriverName, defaultDataBase, persistErrors, maxLatestErrors);
SQLBackendImpl.SQLDriver driver = backend.getDriver();

assertEquals(driver.generateJDBCUrl(destination), "jdbc:postgresql://localhost:5432/default");
Expand All @@ -201,7 +204,7 @@ public void testJDBCUrlOracleSQL() {
String destination = "dest";
String defaultDataBase = "default";

SQLBackendImpl backend = new SQLBackendImpl(sqlHost, sqlPort, user, password, maxPoolSize, sqlInstance, sqlDriverName, defaultDataBase, persistErrors, maxLatestErrors);
SQLBackendImpl backend = new SQLBackendImpl(sqlHost, sqlPort, user, password, maxPoolSize, maxPoolIdle, minPoolIdle, minPoolIdleTimeMillis, sqlInstance, sqlDriverName, defaultDataBase, persistErrors, maxLatestErrors);
SQLBackendImpl.SQLDriver driver = backend.getDriver();

assertEquals(driver.generateJDBCUrl(destination), "jdbc:oracle:oci://localhost:1521/default");
Expand All @@ -217,31 +220,31 @@ public void testJDBCUrlMySQLWithOptions() {
String destination = "dest";
String sqlOptions = "useSSL=true&requireSSL=false";

SQLBackendImpl backend = new SQLBackendImpl(sqlHost, sqlPort, user, password, maxPoolSize, sqlInstance, sqlDriverName, sqlOptions, persistErrors, maxLatestErrors);
SQLBackendImpl backend = new SQLBackendImpl(sqlHost, sqlPort, user, password, maxPoolSize, maxPoolIdle, minPoolIdle, minPoolIdleTimeMillis, sqlInstance, sqlDriverName, sqlOptions, persistErrors, maxLatestErrors);
SQLBackendImpl.SQLDriver driver = backend.getDriver();

assertEquals(driver.generateJDBCUrl(destination), "jdbc:mysql://localhost:3306/dest?useSSL=true&requireSSL=false");

System.out.println("Testing SQLBackendImpl.SQLDriver.generateJDBCUrl (sqlInstance:mysql, options:<white spaces>)");
sqlOptions = " \t";

backend = new SQLBackendImpl(sqlHost, sqlPort, user, password, maxPoolSize, sqlInstance, sqlDriverName, sqlOptions, persistErrors, maxLatestErrors);
backend = new SQLBackendImpl(sqlHost, sqlPort, user, password, maxPoolSize, maxPoolIdle, minPoolIdle, minPoolIdleTimeMillis, sqlInstance, sqlDriverName, sqlOptions, persistErrors, maxLatestErrors);
driver = backend.getDriver();

assertEquals(driver.generateJDBCUrl(destination), "jdbc:mysql://localhost:3306/dest");

System.out.println("Testing SQLBackendImpl.SQLDriver.generateJDBCUrl (sqlInstance:mysql, options:<empty>)");
sqlOptions = "";

backend = new SQLBackendImpl(sqlHost, sqlPort, user, password, maxPoolSize, sqlInstance, sqlDriverName, sqlOptions, persistErrors, maxLatestErrors);
backend = new SQLBackendImpl(sqlHost, sqlPort, user, password, maxPoolSize, maxPoolIdle, minPoolIdle, minPoolIdleTimeMillis, sqlInstance, sqlDriverName, sqlOptions, persistErrors, maxLatestErrors);
driver = backend.getDriver();

assertEquals(driver.generateJDBCUrl(destination), "jdbc:mysql://localhost:3306/dest");

System.out.println("Testing SQLBackendImpl.SQLDriver.generateJDBCUrl (sqlInstance:mysql, options:<null>)");
sqlOptions = null;

backend = new SQLBackendImpl(sqlHost, sqlPort, user, password, maxPoolSize, sqlInstance, sqlDriverName, sqlOptions, persistErrors, maxLatestErrors);
backend = new SQLBackendImpl(sqlHost, sqlPort, user, password, maxPoolSize, maxPoolIdle, minPoolIdle, minPoolIdleTimeMillis, sqlInstance, sqlDriverName, sqlOptions, persistErrors, maxLatestErrors);
driver = backend.getDriver();

assertEquals(driver.generateJDBCUrl(destination), "jdbc:mysql://localhost:3306/dest");
Expand All @@ -258,31 +261,31 @@ public void testJDBCUrlPostgreSQLWithOptions() {
String defaultDataBase = "default";
String sqlOptions = "sslmode=require";

SQLBackendImpl backend = new SQLBackendImpl(sqlHost, sqlPort, user, password, maxPoolSize, sqlInstance, sqlDriverName, sqlOptions, persistErrors, maxLatestErrors);
SQLBackendImpl backend = new SQLBackendImpl(sqlHost, sqlPort, user, password, maxPoolSize, maxPoolIdle, minPoolIdle, minPoolIdleTimeMillis, sqlInstance, sqlDriverName, sqlOptions, persistErrors, maxLatestErrors);
SQLBackendImpl.SQLDriver driver = backend.getDriver();

assertEquals(driver.generateJDBCUrl(destination), "jdbc:postgresql://localhost:5432/default?sslmode=require");

System.out.println("Testing SQLBackendImpl.SQLDriver.generateJDBCUrl (sqlInstance:postgresql, options:<white spaces)");
sqlOptions = " \t";

backend = new SQLBackendImpl(sqlHost, sqlPort, user, password, maxPoolSize, sqlInstance, sqlDriverName, sqlOptions, persistErrors, maxLatestErrors);
backend = new SQLBackendImpl(sqlHost, sqlPort, user, password, maxPoolSize, maxPoolIdle, minPoolIdle, minPoolIdleTimeMillis, sqlInstance, sqlDriverName, sqlOptions, persistErrors, maxLatestErrors);
driver = backend.getDriver();

assertEquals(driver.generateJDBCUrl(destination), "jdbc:postgresql://localhost:5432/default");

System.out.println("Testing SQLBackendImpl.SQLDriver.generateJDBCUrl (sqlInstance:postgresql, options:<empty>)");
sqlOptions = "";

backend = new SQLBackendImpl(sqlHost, sqlPort, user, password, maxPoolSize, sqlInstance, sqlDriverName, sqlOptions, persistErrors, maxLatestErrors);
backend = new SQLBackendImpl(sqlHost, sqlPort, user, password, maxPoolSize, maxPoolIdle, minPoolIdle, minPoolIdleTimeMillis, sqlInstance, sqlDriverName, sqlOptions, persistErrors, maxLatestErrors);
driver = backend.getDriver();

assertEquals(driver.generateJDBCUrl(destination), "jdbc:postgresql://localhost:5432/default");

System.out.println("Testing SQLBackendImpl.SQLDriver.generateJDBCUrl (sqlInstance:postgresql, options:<null>)");
sqlOptions = null;

backend = new SQLBackendImpl(sqlHost, sqlPort, user, password, maxPoolSize, sqlInstance, sqlDriverName, sqlOptions, persistErrors, maxLatestErrors);
backend = new SQLBackendImpl(sqlHost, sqlPort, user, password, maxPoolSize, maxPoolIdle, minPoolIdle, minPoolIdleTimeMillis, sqlInstance, sqlDriverName, sqlOptions, persistErrors, maxLatestErrors);
driver = backend.getDriver();

assertEquals(driver.generateJDBCUrl(destination), "jdbc:postgresql://localhost:5432/default");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ public class NGSIPostgisSink extends NGSILDSink {
private static final String DEFAULT_DATABASE = "postgres";
private static final String DEFAULT_ENABLE_CACHE = "false";
private static final int DEFAULT_MAX_POOL_SIZE = 3;
private static final int DEFAULT_MAX_POOL_IDLE = 2;
private static final int DEFAULT_MIN_POOL_IDLE = 0;
private static final int DEFAULT_MIN_POOL_IDLE_TIME_MILLIS = 10000;
private static final String DEFAULT_POSTGIS_TYPE = "geometry";
private static final String DEFAULT_ATTR_NATIVE_TYPES = "false";
private static final String POSTGIS_DRIVER_NAME = "org.postgresql.Driver";
Expand All @@ -70,6 +73,9 @@ public class NGSIPostgisSink extends NGSILDSink {
private String postgisPassword;
private boolean rowAttrPersistence;
private int maxPoolSize;
private int maxPoolIdle;
private int minPoolIdle;
private int minPoolIdleTimeMillis;
private SQLBackendImpl postgisPersistenceBackend;
private boolean enableCache;
private boolean swapCoordinates;
Expand Down Expand Up @@ -206,6 +212,15 @@ public void configure(Context context) {
maxPoolSize = context.getInteger("postgis_maxPoolSize", DEFAULT_MAX_POOL_SIZE);
LOGGER.debug("[" + this.getName() + "] Reading configuration (postgis_maxPoolSize=" + maxPoolSize + ")");

maxPoolIdle = context.getInteger("postgis_maxPoolIdle", DEFAULT_MAX_POOL_IDLE);
LOGGER.debug("[" + this.getName() + "] Reading configuration (postgis_maxPoolIdle=" + maxPoolIdle + ")");

minPoolIdle = context.getInteger("postgis_minPoolIdle", DEFAULT_MIN_POOL_IDLE);
LOGGER.debug("[" + this.getName() + "] Reading configuration (postgis_minPoolIdle=" + minPoolIdle + ")");

minPoolIdleTimeMillis = context.getInteger("postgis_minPoolIdleTimeMillis", DEFAULT_MIN_POOL_IDLE_TIME_MILLIS);
LOGGER.debug("[" + this.getName() + "] Reading configuration (postgis_minPoolIdleTimeMillis=" + minPoolIdleTimeMillis + ")");

rowAttrPersistence = context.getString("attr_persistence", DEFAULT_ROW_ATTR_PERSISTENCE).equals("row");
String persistence = context.getString("attr_persistence", DEFAULT_ROW_ATTR_PERSISTENCE);

Expand Down Expand Up @@ -269,7 +284,7 @@ public void stop() {
@Override
public void start() {
try {
createPersistenceBackend(postgisHost, postgisPort, postgisUsername, postgisPassword, maxPoolSize, postgisOptions);
createPersistenceBackend(postgisHost, postgisPort, postgisUsername, postgisPassword, maxPoolSize, maxPoolIdle, minPoolIdle, minPoolIdleTimeMillis, postgisOptions);
} catch (Exception e) {
LOGGER.error("Error while creating the Postgis persistence backend. Details="
+ e.getMessage());
Expand All @@ -282,9 +297,9 @@ public void start() {
/**
* Initialices a lazy singleton to share among instances on JVM
*/
private void createPersistenceBackend(String sqlHost, String sqlPort, String sqlUsername, String sqlPassword, int maxPoolSize, String sqlOptions) {
private void createPersistenceBackend(String sqlHost, String sqlPort, String sqlUsername, String sqlPassword, int maxPoolSize, int maxPoolIdle, int minPoolIdle, int minPoolIdleTimeMillis, String sqlOptions) {
if (postgisPersistenceBackend == null) {
postgisPersistenceBackend = new SQLBackendImpl(sqlHost, sqlPort, sqlUsername, sqlPassword, maxPoolSize, POSTGIS_INSTANCE_NAME, POSTGIS_DRIVER_NAME, sqlOptions);
postgisPersistenceBackend = new SQLBackendImpl(sqlHost, sqlPort, sqlUsername, sqlPassword, maxPoolSize, maxPoolIdle, minPoolIdle, minPoolIdleTimeMillis, POSTGIS_INSTANCE_NAME, POSTGIS_DRIVER_NAME, sqlOptions);
}
}

Expand Down
Loading

0 comments on commit acd8506

Please sign in to comment.