Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add max and min pool idle config for sql sinks #2367

Merged
merged 17 commits into from
Apr 17, 2024
Merged
1 change: 1 addition & 0 deletions CHANGES_NEXT_RELEASE
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
- [cygnus-common] [SQL-sinks] Define default values and allow to set connection pool config for maxPoolSize(3), maxPoolIddle(2), minPoolIdle(0) and minPoolIdleTimeMillis(10000) (#2366)
- [cygnus-ngsi] Upgrade Debian version from 12.4 to 12.5 in Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -65,13 +65,16 @@ public class SQLBackendImpl implements SQLBackend{
* @param sqlUsername
* @param sqlPassword
* @param maxPoolSize
* @param maxPoolIdle
* @param minPoolIdle
* @param minPoolIdleTimeMillis
* @param sqlInstance
* @param sqlDriverName
* @param persistErrors
* @param maxLatestErrors
*/
public SQLBackendImpl(String sqlHost, String sqlPort, String sqlUsername, String sqlPassword, int maxPoolSize, SQLInstance sqlInstance, String sqlDriverName, boolean persistErrors, int maxLatestErrors) {
this(sqlHost, sqlPort, sqlUsername, sqlPassword, maxPoolSize, sqlInstance, sqlDriverName, null, persistErrors, maxLatestErrors);
public SQLBackendImpl(String sqlHost, String sqlPort, String sqlUsername, String sqlPassword, int maxPoolSize, int maxPoolIdle, int minPoolIdle, int minPoolIdleTimeMillis, SQLInstance sqlInstance, String sqlDriverName, boolean persistErrors, int maxLatestErrors) {
this(sqlHost, sqlPort, sqlUsername, sqlPassword, maxPoolSize, maxPoolIdle, minPoolIdle, minPoolIdleTimeMillis, sqlInstance, sqlDriverName, null, persistErrors, maxLatestErrors);
} // SQLBackendImpl

/**
Expand All @@ -82,12 +85,15 @@ public SQLBackendImpl(String sqlHost, String sqlPort, String sqlUsername, String
* @param sqlUsername
* @param sqlPassword
* @param maxPoolSize
* @param maxPoolIdle
* @param minPoolIdle
* @param minPoolIdleTimeMillis
* @param sqlInstance
* @param sqlDriverName
* @param sqlOptions
*/
public SQLBackendImpl(String sqlHost, String sqlPort, String sqlUsername, String sqlPassword, int maxPoolSize, SQLInstance sqlInstance, String sqlDriverName, String sqlOptions) {
this(sqlHost, sqlPort, sqlUsername, sqlPassword, maxPoolSize, sqlInstance, sqlDriverName, sqlOptions, true, DEFAULT_MAX_LATEST_ERRORS);
public SQLBackendImpl(String sqlHost, String sqlPort, String sqlUsername, String sqlPassword, int maxPoolSize, int maxPoolIdle, int minPoolIdle, int minPoolIdleTimeMillis, SQLInstance sqlInstance, String sqlDriverName, String sqlOptions) {
this(sqlHost, sqlPort, sqlUsername, sqlPassword, maxPoolSize, maxPoolIdle, minPoolIdle, minPoolIdleTimeMillis, sqlInstance, sqlDriverName, sqlOptions, true, DEFAULT_MAX_LATEST_ERRORS);
} // SQLBackendImpl

/**
Expand All @@ -98,14 +104,17 @@ public SQLBackendImpl(String sqlHost, String sqlPort, String sqlUsername, String
* @param sqlUsername
* @param sqlPassword
* @param maxPoolSize
* @param maxPoolIdle
* @param minPoolIdle
* @param minPoolIdleTimeMillis
* @param sqlInstance
* @param sqlDriverName
* @param sqlOptions
* @param persistErrors
* @param maxLatestErrors
*/
public SQLBackendImpl(String sqlHost, String sqlPort, String sqlUsername, String sqlPassword, int maxPoolSize, SQLInstance sqlInstance, String sqlDriverName, String sqlOptions, boolean persistErrors, int maxLatestErrors) {
driver = new SQLBackendImpl.SQLDriver(sqlHost, sqlPort, sqlUsername, sqlPassword, maxPoolSize, sqlInstance, sqlDriverName, sqlOptions);
public SQLBackendImpl(String sqlHost, String sqlPort, String sqlUsername, String sqlPassword, int maxPoolSize, int maxPoolIdle, int minPoolIdle, int minPoolIdleTimeMillis, SQLInstance sqlInstance, String sqlDriverName, String sqlOptions, boolean persistErrors, int maxLatestErrors) {
driver = new SQLBackendImpl.SQLDriver(sqlHost, sqlPort, sqlUsername, sqlPassword, maxPoolSize, maxPoolIdle, minPoolIdle, minPoolIdleTimeMillis, sqlInstance, sqlDriverName, sqlOptions);
cache = new SQLCache();
this.sqlInstance = sqlInstance;
this.persistErrors = persistErrors;
Expand Down Expand Up @@ -935,6 +944,9 @@ public class SQLDriver {
private final SQLInstance sqlInstance;
private final String sqlDriverName;
private final int maxPoolSize;
private final int maxPoolIdle;
private final int minPoolIdle;
private final int minPoolIdleTimeMillis;
private final String sqlOptions;

/**
Expand All @@ -945,18 +957,24 @@ public class SQLDriver {
* @param sqlUsername
* @param sqlPassword
* @param maxPoolSize
* @param maxPoolIdle
* @param minPoolIdle
* @param minPoolIdleTimeMillis
* @param sqlInstance
* @param sqlDriverName
* @param sqlOptions
*/
public SQLDriver(String sqlHost, String sqlPort, String sqlUsername, String sqlPassword, int maxPoolSize, SQLInstance sqlInstance, String sqlDriverName, String sqlOptions) {
public SQLDriver(String sqlHost, String sqlPort, String sqlUsername, String sqlPassword, int maxPoolSize, int maxPoolIdle, int minPoolIdle, int minPoolIdleTimeMillis, SQLInstance sqlInstance, String sqlDriverName, String sqlOptions) {
datasources = new HashMap<>();
pools = new HashMap<>();
this.sqlHost = sqlHost;
this.sqlPort = sqlPort;
this.sqlUsername = sqlUsername;
this.sqlPassword = sqlPassword;
this.maxPoolSize = maxPoolSize;
this.maxPoolIdle = maxPoolIdle;
this.minPoolIdle = minPoolIdle;
this.minPoolIdleTimeMillis = minPoolIdleTimeMillis;
this.sqlInstance = sqlInstance;
this.sqlDriverName = sqlDriverName;
this.sqlOptions = sqlOptions;
Expand Down Expand Up @@ -1095,7 +1113,17 @@ private DataSource createConnectionPool(String destination) throws Exception {

// Creates an Instance of GenericObjectPool That Holds Our Pool of Connections Object!
gPool = new GenericObjectPool();
// Tune from https://javadoc.io/static/commons-pool/commons-pool/1.6/org/apache/commons/pool/impl/GenericObjectPool.html
// Sets the cap on the number of objects that can be allocated by the pool (checked out to clients, or idle awaiting checkout) at a given time.
gPool.setMaxActive(this.maxPoolSize);
// Sets the cap on the number of "idle" instances in the pool.
gPool.setMaxIdle(this.maxPoolIdle);
// Sets the minimum number of objects allowed in the pool before the evictor thread (if active) spawns new objects.
gPool.setMinIdle(this.minPoolIdle);
// Sets the minimum amount of time an object may sit idle in the pool before it is eligible for eviction by the idle object evictor (if any)
gPool.setMinEvictableIdleTimeMillis(this.minPoolIdleTimeMillis);
// Sets the number of milliseconds to sleep between runs of the idle object evictor thread
gPool.setTimeBetweenEvictionRunsMillis(this.minPoolIdleTimeMillis*3);
pools.put(destination, gPool);

// Creates a ConnectionFactory Object Which Will Be Used by the Pool to Create the Connection Object!
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ public class SQLBackendImplTest {

// constants
private final int maxPoolSize = 2;
private final int maxPoolIdle = 1;
private final int minPoolIdle = 0;
private final int minPoolIdleTimeMillis = 10000;
private final String host = "localhost";
private final String port = "3306";
private final String user = "root";
Expand Down Expand Up @@ -83,7 +86,7 @@ public class SQLBackendImplTest {
@Before
public void setUp() throws Exception {
// set up the instance of the tested class
backend = new SQLBackendImpl(host, port, user, password, maxPoolSize, MYSQL_INSTANCE_NAME, MYSQL_DRIVER_NAME, null, persistErrors, maxLatestErrors);
backend = new SQLBackendImpl(host, port, user, password, maxPoolSize, maxPoolIdle, minPoolIdle, minPoolIdleTimeMillis, MYSQL_INSTANCE_NAME, MYSQL_DRIVER_NAME, null, persistErrors, maxLatestErrors);

// set up the behaviour of the mocked classes
when(mockDriverDbCreate.getConnection(Mockito.anyString())).thenReturn(mockConnection);
Expand Down Expand Up @@ -169,7 +172,7 @@ public void testJDBCUrlMySQL() {
String sqlDriverName = "com.mysql.jdbc.Driver";
String destination = "dest";

SQLBackendImpl backend = new SQLBackendImpl(sqlHost, sqlPort, user, password, maxPoolSize, sqlInstance, sqlDriverName, null, persistErrors, maxLatestErrors);
SQLBackendImpl backend = new SQLBackendImpl(sqlHost, sqlPort, user, password, maxPoolSize, maxPoolIdle, minPoolIdle, minPoolIdleTimeMillis, sqlInstance, sqlDriverName, null, persistErrors, maxLatestErrors);
SQLBackendImpl.SQLDriver driver = backend.getDriver();

assertEquals(driver.generateJDBCUrl(destination), "jdbc:mysql://localhost:3306/dest");
Expand All @@ -185,7 +188,7 @@ public void testJDBCUrlPostgreSQL() {
String destination = "dest";
String defaultDataBase = "default";

SQLBackendImpl backend = new SQLBackendImpl(sqlHost, sqlPort, user, password, maxPoolSize, sqlInstance, sqlDriverName, defaultDataBase, persistErrors, maxLatestErrors);
SQLBackendImpl backend = new SQLBackendImpl(sqlHost, sqlPort, user, password, maxPoolSize, maxPoolIdle, minPoolIdle, minPoolIdleTimeMillis, sqlInstance, sqlDriverName, defaultDataBase, persistErrors, maxLatestErrors);
SQLBackendImpl.SQLDriver driver = backend.getDriver();

assertEquals(driver.generateJDBCUrl(destination), "jdbc:postgresql://localhost:5432/default");
Expand All @@ -201,7 +204,7 @@ public void testJDBCUrlOracleSQL() {
String destination = "dest";
String defaultDataBase = "default";

SQLBackendImpl backend = new SQLBackendImpl(sqlHost, sqlPort, user, password, maxPoolSize, sqlInstance, sqlDriverName, defaultDataBase, persistErrors, maxLatestErrors);
SQLBackendImpl backend = new SQLBackendImpl(sqlHost, sqlPort, user, password, maxPoolSize, maxPoolIdle, minPoolIdle, minPoolIdleTimeMillis, sqlInstance, sqlDriverName, defaultDataBase, persistErrors, maxLatestErrors);
SQLBackendImpl.SQLDriver driver = backend.getDriver();

assertEquals(driver.generateJDBCUrl(destination), "jdbc:oracle:oci://localhost:1521/default");
Expand All @@ -217,31 +220,31 @@ public void testJDBCUrlMySQLWithOptions() {
String destination = "dest";
String sqlOptions = "useSSL=true&requireSSL=false";

SQLBackendImpl backend = new SQLBackendImpl(sqlHost, sqlPort, user, password, maxPoolSize, sqlInstance, sqlDriverName, sqlOptions, persistErrors, maxLatestErrors);
SQLBackendImpl backend = new SQLBackendImpl(sqlHost, sqlPort, user, password, maxPoolSize, maxPoolIdle, minPoolIdle, minPoolIdleTimeMillis, sqlInstance, sqlDriverName, sqlOptions, persistErrors, maxLatestErrors);
SQLBackendImpl.SQLDriver driver = backend.getDriver();

assertEquals(driver.generateJDBCUrl(destination), "jdbc:mysql://localhost:3306/dest?useSSL=true&requireSSL=false");

System.out.println("Testing SQLBackendImpl.SQLDriver.generateJDBCUrl (sqlInstance:mysql, options:<white spaces>)");
sqlOptions = " \t";

backend = new SQLBackendImpl(sqlHost, sqlPort, user, password, maxPoolSize, sqlInstance, sqlDriverName, sqlOptions, persistErrors, maxLatestErrors);
backend = new SQLBackendImpl(sqlHost, sqlPort, user, password, maxPoolSize, maxPoolIdle, minPoolIdle, minPoolIdleTimeMillis, sqlInstance, sqlDriverName, sqlOptions, persistErrors, maxLatestErrors);
driver = backend.getDriver();

assertEquals(driver.generateJDBCUrl(destination), "jdbc:mysql://localhost:3306/dest");

System.out.println("Testing SQLBackendImpl.SQLDriver.generateJDBCUrl (sqlInstance:mysql, options:<empty>)");
sqlOptions = "";

backend = new SQLBackendImpl(sqlHost, sqlPort, user, password, maxPoolSize, sqlInstance, sqlDriverName, sqlOptions, persistErrors, maxLatestErrors);
backend = new SQLBackendImpl(sqlHost, sqlPort, user, password, maxPoolSize, maxPoolIdle, minPoolIdle, minPoolIdleTimeMillis, sqlInstance, sqlDriverName, sqlOptions, persistErrors, maxLatestErrors);
driver = backend.getDriver();

assertEquals(driver.generateJDBCUrl(destination), "jdbc:mysql://localhost:3306/dest");

System.out.println("Testing SQLBackendImpl.SQLDriver.generateJDBCUrl (sqlInstance:mysql, options:<null>)");
sqlOptions = null;

backend = new SQLBackendImpl(sqlHost, sqlPort, user, password, maxPoolSize, sqlInstance, sqlDriverName, sqlOptions, persistErrors, maxLatestErrors);
backend = new SQLBackendImpl(sqlHost, sqlPort, user, password, maxPoolSize, maxPoolIdle, minPoolIdle, minPoolIdleTimeMillis, sqlInstance, sqlDriverName, sqlOptions, persistErrors, maxLatestErrors);
driver = backend.getDriver();

assertEquals(driver.generateJDBCUrl(destination), "jdbc:mysql://localhost:3306/dest");
Expand All @@ -258,31 +261,31 @@ public void testJDBCUrlPostgreSQLWithOptions() {
String defaultDataBase = "default";
String sqlOptions = "sslmode=require";

SQLBackendImpl backend = new SQLBackendImpl(sqlHost, sqlPort, user, password, maxPoolSize, sqlInstance, sqlDriverName, sqlOptions, persistErrors, maxLatestErrors);
SQLBackendImpl backend = new SQLBackendImpl(sqlHost, sqlPort, user, password, maxPoolSize, maxPoolIdle, minPoolIdle, minPoolIdleTimeMillis, sqlInstance, sqlDriverName, sqlOptions, persistErrors, maxLatestErrors);
SQLBackendImpl.SQLDriver driver = backend.getDriver();

assertEquals(driver.generateJDBCUrl(destination), "jdbc:postgresql://localhost:5432/default?sslmode=require");

System.out.println("Testing SQLBackendImpl.SQLDriver.generateJDBCUrl (sqlInstance:postgresql, options:<white spaces)");
sqlOptions = " \t";

backend = new SQLBackendImpl(sqlHost, sqlPort, user, password, maxPoolSize, sqlInstance, sqlDriverName, sqlOptions, persistErrors, maxLatestErrors);
backend = new SQLBackendImpl(sqlHost, sqlPort, user, password, maxPoolSize, maxPoolIdle, minPoolIdle, minPoolIdleTimeMillis, sqlInstance, sqlDriverName, sqlOptions, persistErrors, maxLatestErrors);
driver = backend.getDriver();

assertEquals(driver.generateJDBCUrl(destination), "jdbc:postgresql://localhost:5432/default");

System.out.println("Testing SQLBackendImpl.SQLDriver.generateJDBCUrl (sqlInstance:postgresql, options:<empty>)");
sqlOptions = "";

backend = new SQLBackendImpl(sqlHost, sqlPort, user, password, maxPoolSize, sqlInstance, sqlDriverName, sqlOptions, persistErrors, maxLatestErrors);
backend = new SQLBackendImpl(sqlHost, sqlPort, user, password, maxPoolSize, maxPoolIdle, minPoolIdle, minPoolIdleTimeMillis, sqlInstance, sqlDriverName, sqlOptions, persistErrors, maxLatestErrors);
driver = backend.getDriver();

assertEquals(driver.generateJDBCUrl(destination), "jdbc:postgresql://localhost:5432/default");

System.out.println("Testing SQLBackendImpl.SQLDriver.generateJDBCUrl (sqlInstance:postgresql, options:<null>)");
sqlOptions = null;

backend = new SQLBackendImpl(sqlHost, sqlPort, user, password, maxPoolSize, sqlInstance, sqlDriverName, sqlOptions, persistErrors, maxLatestErrors);
backend = new SQLBackendImpl(sqlHost, sqlPort, user, password, maxPoolSize, maxPoolIdle, minPoolIdle, minPoolIdleTimeMillis, sqlInstance, sqlDriverName, sqlOptions, persistErrors, maxLatestErrors);
driver = backend.getDriver();

assertEquals(driver.generateJDBCUrl(destination), "jdbc:postgresql://localhost:5432/default");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ public class NGSIPostgisSink extends NGSILDSink {
private static final String DEFAULT_DATABASE = "postgres";
private static final String DEFAULT_ENABLE_CACHE = "false";
private static final int DEFAULT_MAX_POOL_SIZE = 3;
private static final int DEFAULT_MAX_POOL_IDLE = 2;
private static final int DEFAULT_MIN_POOL_IDLE = 0;
private static final int DEFAULT_MIN_POOL_IDLE_TIME_MILLIS = 10000;
private static final String DEFAULT_POSTGIS_TYPE = "geometry";
private static final String DEFAULT_ATTR_NATIVE_TYPES = "false";
private static final String POSTGIS_DRIVER_NAME = "org.postgresql.Driver";
Expand All @@ -70,6 +73,9 @@ public class NGSIPostgisSink extends NGSILDSink {
private String postgisPassword;
private boolean rowAttrPersistence;
private int maxPoolSize;
private int maxPoolIdle;
private int minPoolIdle;
private int minPoolIdleTimeMillis;
private SQLBackendImpl postgisPersistenceBackend;
private boolean enableCache;
private boolean swapCoordinates;
Expand Down Expand Up @@ -206,6 +212,15 @@ public void configure(Context context) {
maxPoolSize = context.getInteger("postgis_maxPoolSize", DEFAULT_MAX_POOL_SIZE);
LOGGER.debug("[" + this.getName() + "] Reading configuration (postgis_maxPoolSize=" + maxPoolSize + ")");

maxPoolIdle = context.getInteger("postgis_maxPoolIdle", DEFAULT_MAX_POOL_IDLE);
LOGGER.debug("[" + this.getName() + "] Reading configuration (postgis_maxPoolIdle=" + maxPoolIdle + ")");

minPoolIdle = context.getInteger("postgis_minPoolIdle", DEFAULT_MIN_POOL_IDLE);
LOGGER.debug("[" + this.getName() + "] Reading configuration (postgis_minPoolIdle=" + minPoolIdle + ")");

minPoolIdleTimeMillis = context.getInteger("postgis_minPoolIdleTimeMillis", DEFAULT_MIN_POOL_IDLE_TIME_MILLIS);
LOGGER.debug("[" + this.getName() + "] Reading configuration (postgis_minPoolIdleTimeMillis=" + minPoolIdleTimeMillis + ")");

rowAttrPersistence = context.getString("attr_persistence", DEFAULT_ROW_ATTR_PERSISTENCE).equals("row");
String persistence = context.getString("attr_persistence", DEFAULT_ROW_ATTR_PERSISTENCE);

Expand Down Expand Up @@ -269,7 +284,7 @@ public void stop() {
@Override
public void start() {
try {
createPersistenceBackend(postgisHost, postgisPort, postgisUsername, postgisPassword, maxPoolSize, postgisOptions);
createPersistenceBackend(postgisHost, postgisPort, postgisUsername, postgisPassword, maxPoolSize, maxPoolIdle, minPoolIdle, minPoolIdleTimeMillis, postgisOptions);
} catch (Exception e) {
LOGGER.error("Error while creating the Postgis persistence backend. Details="
+ e.getMessage());
Expand All @@ -282,9 +297,9 @@ public void start() {
/**
* Initialices a lazy singleton to share among instances on JVM
*/
private void createPersistenceBackend(String sqlHost, String sqlPort, String sqlUsername, String sqlPassword, int maxPoolSize, String sqlOptions) {
private void createPersistenceBackend(String sqlHost, String sqlPort, String sqlUsername, String sqlPassword, int maxPoolSize, int maxPoolIdle, int minPoolIdle, int minPoolIdleTimeMillis, String sqlOptions) {
if (postgisPersistenceBackend == null) {
postgisPersistenceBackend = new SQLBackendImpl(sqlHost, sqlPort, sqlUsername, sqlPassword, maxPoolSize, POSTGIS_INSTANCE_NAME, POSTGIS_DRIVER_NAME, sqlOptions);
postgisPersistenceBackend = new SQLBackendImpl(sqlHost, sqlPort, sqlUsername, sqlPassword, maxPoolSize, maxPoolIdle, minPoolIdle, minPoolIdleTimeMillis, POSTGIS_INSTANCE_NAME, POSTGIS_DRIVER_NAME, sqlOptions);
}
}

Expand Down
Loading
Loading