From 99eccefaddf1554655ac145172d481420658bb82 Mon Sep 17 00:00:00 2001 From: Alvaro Vega Date: Fri, 12 Apr 2024 11:05:59 +0200 Subject: [PATCH] add maxPoolWait conf option --- .../cygnus/backends/sql/SQLBackendImpl.java | 23 +++++++++++----- .../backends/sql/SQLBackendImplTest.java | 27 ++++++++++--------- .../iot/cygnus/sinks/NGSIPostgisSink.java | 11 +++++--- .../iot/cygnus/sinks/NGSIPostgreSQLSink.java | 13 ++++++--- .../iot/cygnus/sinks/NGSIMySQLSink.java | 11 +++++--- .../iot/cygnus/sinks/NGSIOracleSQLSink.java | 13 ++++++--- .../iot/cygnus/sinks/NGSIPostgisSink.java | 13 ++++++--- .../iot/cygnus/sinks/NGSIPostgreSQLSink.java | 14 +++++++--- 8 files changed, 83 insertions(+), 42 deletions(-) diff --git a/cygnus-common/src/main/java/com/telefonica/iot/cygnus/backends/sql/SQLBackendImpl.java b/cygnus-common/src/main/java/com/telefonica/iot/cygnus/backends/sql/SQLBackendImpl.java index c909b3cbb..401ef653e 100644 --- a/cygnus-common/src/main/java/com/telefonica/iot/cygnus/backends/sql/SQLBackendImpl.java +++ b/cygnus-common/src/main/java/com/telefonica/iot/cygnus/backends/sql/SQLBackendImpl.java @@ -67,13 +67,14 @@ public class SQLBackendImpl implements SQLBackend{ * @param maxPoolSize * @param maxPoolIdle * @param minPoolIdle + * @param maxPoolWait * @param sqlInstance * @param sqlDriverName * @param persistErrors * @param maxLatestErrors */ - public SQLBackendImpl(String sqlHost, String sqlPort, String sqlUsername, String sqlPassword, int maxPoolSize, int maxPoolIdle, int minPoolIdle, SQLInstance sqlInstance, String sqlDriverName, boolean persistErrors, int maxLatestErrors) { - this(sqlHost, sqlPort, sqlUsername, sqlPassword, maxPoolSize, maxPoolIdle, minPoolIdle, sqlInstance, sqlDriverName, null, persistErrors, maxLatestErrors); + public SQLBackendImpl(String sqlHost, String sqlPort, String sqlUsername, String sqlPassword, int maxPoolSize, int maxPoolIdle, int minPoolIdle, int maxPoolWait, SQLInstance sqlInstance, String sqlDriverName, boolean persistErrors, int maxLatestErrors) { + this(sqlHost, sqlPort, sqlUsername, sqlPassword, maxPoolSize, maxPoolIdle, minPoolIdle, maxPoolWait, sqlInstance, sqlDriverName, null, persistErrors, maxLatestErrors); } // SQLBackendImpl /** @@ -86,12 +87,13 @@ public SQLBackendImpl(String sqlHost, String sqlPort, String sqlUsername, String * @param maxPoolSize * @param maxPoolIdle * @param minPoolIdle + * @param maxPoolWait * @param sqlInstance * @param sqlDriverName * @param sqlOptions */ - public SQLBackendImpl(String sqlHost, String sqlPort, String sqlUsername, String sqlPassword, int maxPoolSize, int maxPoolIdle, int minPoolIdle, SQLInstance sqlInstance, String sqlDriverName, String sqlOptions) { - this(sqlHost, sqlPort, sqlUsername, sqlPassword, maxPoolSize, maxPoolIdle, minPoolIdle, sqlInstance, sqlDriverName, sqlOptions, true, DEFAULT_MAX_LATEST_ERRORS); + public SQLBackendImpl(String sqlHost, String sqlPort, String sqlUsername, String sqlPassword, int maxPoolSize, int maxPoolIdle, int minPoolIdle, int maxPoolWait, SQLInstance sqlInstance, String sqlDriverName, String sqlOptions) { + this(sqlHost, sqlPort, sqlUsername, sqlPassword, maxPoolSize, maxPoolIdle, minPoolIdle, maxPoolWait, sqlInstance, sqlDriverName, sqlOptions, true, DEFAULT_MAX_LATEST_ERRORS); } // SQLBackendImpl /** @@ -104,14 +106,15 @@ public SQLBackendImpl(String sqlHost, String sqlPort, String sqlUsername, String * @param maxPoolSize * @param maxPoolIdle * @param minPoolIdle + * @param maxPoolWait * @param sqlInstance * @param sqlDriverName * @param sqlOptions * @param persistErrors * @param maxLatestErrors */ - public SQLBackendImpl(String sqlHost, String sqlPort, String sqlUsername, String sqlPassword, int maxPoolSize, int maxPoolIdle, int minPoolIdle, SQLInstance sqlInstance, String sqlDriverName, String sqlOptions, boolean persistErrors, int maxLatestErrors) { - driver = new SQLBackendImpl.SQLDriver(sqlHost, sqlPort, sqlUsername, sqlPassword, maxPoolSize, maxPoolIdle, minPoolIdle, sqlInstance, sqlDriverName, sqlOptions); + public SQLBackendImpl(String sqlHost, String sqlPort, String sqlUsername, String sqlPassword, int maxPoolSize, int maxPoolIdle, int minPoolIdle, int maxPoolWait, SQLInstance sqlInstance, String sqlDriverName, String sqlOptions, boolean persistErrors, int maxLatestErrors) { + driver = new SQLBackendImpl.SQLDriver(sqlHost, sqlPort, sqlUsername, sqlPassword, maxPoolSize, maxPoolIdle, minPoolIdle, maxPoolWait, sqlInstance, sqlDriverName, sqlOptions); cache = new SQLCache(); this.sqlInstance = sqlInstance; this.persistErrors = persistErrors; @@ -943,6 +946,7 @@ public class SQLDriver { private final int maxPoolSize; private final int maxPoolIdle; private final int minPoolIdle; + private final int maxPoolWait; private final String sqlOptions; /** @@ -953,11 +957,14 @@ public class SQLDriver { * @param sqlUsername * @param sqlPassword * @param maxPoolSize + * @param maxPoolIdle + * @param minPoolIdle + * @param maxPoolWait * @param sqlInstance * @param sqlDriverName * @param sqlOptions */ - public SQLDriver(String sqlHost, String sqlPort, String sqlUsername, String sqlPassword, int maxPoolSize, int maxPoolIdle, int minPoolIdle, SQLInstance sqlInstance, String sqlDriverName, String sqlOptions) { + public SQLDriver(String sqlHost, String sqlPort, String sqlUsername, String sqlPassword, int maxPoolSize, int maxPoolIdle, int minPoolIdle, int maxPoolWait, SQLInstance sqlInstance, String sqlDriverName, String sqlOptions) { datasources = new HashMap<>(); pools = new HashMap<>(); this.sqlHost = sqlHost; @@ -967,6 +974,7 @@ public SQLDriver(String sqlHost, String sqlPort, String sqlUsername, String sqlP this.maxPoolSize = maxPoolSize; this.maxPoolIdle = maxPoolIdle; this.minPoolIdle = minPoolIdle; + this.maxPoolWait = maxPoolWait; this.sqlInstance = sqlInstance; this.sqlDriverName = sqlDriverName; this.sqlOptions = sqlOptions; @@ -1108,6 +1116,7 @@ private DataSource createConnectionPool(String destination) throws Exception { gPool.setMaxActive(this.maxPoolSize); gPool.setMaxIdle(this.maxPoolIdle); gPool.setMinIdle(this.minPoolIdle); + gPool.setMaxWait(this.maxPoolWait); pools.put(destination, gPool); // Creates a ConnectionFactory Object Which Will Be Used by the Pool to Create the Connection Object! diff --git a/cygnus-common/src/test/java/com/telefonica/iot/cygnus/backends/sql/SQLBackendImplTest.java b/cygnus-common/src/test/java/com/telefonica/iot/cygnus/backends/sql/SQLBackendImplTest.java index b71a2585d..3272573e1 100644 --- a/cygnus-common/src/test/java/com/telefonica/iot/cygnus/backends/sql/SQLBackendImplTest.java +++ b/cygnus-common/src/test/java/com/telefonica/iot/cygnus/backends/sql/SQLBackendImplTest.java @@ -53,8 +53,9 @@ public class SQLBackendImplTest { // constants private final int maxPoolSize = 2; - private final int maxPoolIdle = 2; + private final int maxPoolIdle = 1; private final int minPoolIdle = 0; + private final int maxPoolWait = 10000; private final String host = "localhost"; private final String port = "3306"; private final String user = "root"; @@ -85,7 +86,7 @@ public class SQLBackendImplTest { @Before public void setUp() throws Exception { // set up the instance of the tested class - backend = new SQLBackendImpl(host, port, user, password, maxPoolSize, maxPoolIdle, minPoolIdle, MYSQL_INSTANCE_NAME, MYSQL_DRIVER_NAME, null, persistErrors, maxLatestErrors); + backend = new SQLBackendImpl(host, port, user, password, maxPoolSize, maxPoolIdle, minPoolIdle, maxPoolWait, MYSQL_INSTANCE_NAME, MYSQL_DRIVER_NAME, null, persistErrors, maxLatestErrors); // set up the behaviour of the mocked classes when(mockDriverDbCreate.getConnection(Mockito.anyString())).thenReturn(mockConnection); @@ -171,7 +172,7 @@ public void testJDBCUrlMySQL() { String sqlDriverName = "com.mysql.jdbc.Driver"; String destination = "dest"; - SQLBackendImpl backend = new SQLBackendImpl(sqlHost, sqlPort, user, password, maxPoolSize, maxPoolIdle, minPoolIdle, sqlInstance, sqlDriverName, null, persistErrors, maxLatestErrors); + SQLBackendImpl backend = new SQLBackendImpl(sqlHost, sqlPort, user, password, maxPoolSize, maxPoolIdle, minPoolIdle, maxPoolWait, sqlInstance, sqlDriverName, null, persistErrors, maxLatestErrors); SQLBackendImpl.SQLDriver driver = backend.getDriver(); assertEquals(driver.generateJDBCUrl(destination), "jdbc:mysql://localhost:3306/dest"); @@ -187,7 +188,7 @@ public void testJDBCUrlPostgreSQL() { String destination = "dest"; String defaultDataBase = "default"; - SQLBackendImpl backend = new SQLBackendImpl(sqlHost, sqlPort, user, password, maxPoolSize, maxPoolIdle, minPoolIdle, sqlInstance, sqlDriverName, defaultDataBase, persistErrors, maxLatestErrors); + SQLBackendImpl backend = new SQLBackendImpl(sqlHost, sqlPort, user, password, maxPoolSize, maxPoolIdle, minPoolIdle, maxPoolWait, sqlInstance, sqlDriverName, defaultDataBase, persistErrors, maxLatestErrors); SQLBackendImpl.SQLDriver driver = backend.getDriver(); assertEquals(driver.generateJDBCUrl(destination), "jdbc:postgresql://localhost:5432/default"); @@ -203,7 +204,7 @@ public void testJDBCUrlOracleSQL() { String destination = "dest"; String defaultDataBase = "default"; - SQLBackendImpl backend = new SQLBackendImpl(sqlHost, sqlPort, user, password, maxPoolSize, maxPoolIdle, minPoolIdle, sqlInstance, sqlDriverName, defaultDataBase, persistErrors, maxLatestErrors); + SQLBackendImpl backend = new SQLBackendImpl(sqlHost, sqlPort, user, password, maxPoolSize, maxPoolIdle, minPoolIdle, maxPoolWait, sqlInstance, sqlDriverName, defaultDataBase, persistErrors, maxLatestErrors); SQLBackendImpl.SQLDriver driver = backend.getDriver(); assertEquals(driver.generateJDBCUrl(destination), "jdbc:oracle:oci://localhost:1521/default"); @@ -219,7 +220,7 @@ public void testJDBCUrlMySQLWithOptions() { String destination = "dest"; String sqlOptions = "useSSL=true&requireSSL=false"; - SQLBackendImpl backend = new SQLBackendImpl(sqlHost, sqlPort, user, password, maxPoolSize, maxPoolIdle, minPoolIdle, sqlInstance, sqlDriverName, sqlOptions, persistErrors, maxLatestErrors); + SQLBackendImpl backend = new SQLBackendImpl(sqlHost, sqlPort, user, password, maxPoolSize, maxPoolIdle, minPoolIdle, maxPoolWait, sqlInstance, sqlDriverName, sqlOptions, persistErrors, maxLatestErrors); SQLBackendImpl.SQLDriver driver = backend.getDriver(); assertEquals(driver.generateJDBCUrl(destination), "jdbc:mysql://localhost:3306/dest?useSSL=true&requireSSL=false"); @@ -227,7 +228,7 @@ public void testJDBCUrlMySQLWithOptions() { System.out.println("Testing SQLBackendImpl.SQLDriver.generateJDBCUrl (sqlInstance:mysql, options:)"); sqlOptions = " \t"; - backend = new SQLBackendImpl(sqlHost, sqlPort, user, password, maxPoolSize, maxPoolIdle, minPoolIdle, sqlInstance, sqlDriverName, sqlOptions, persistErrors, maxLatestErrors); + backend = new SQLBackendImpl(sqlHost, sqlPort, user, password, maxPoolSize, maxPoolIdle, minPoolIdle, maxPoolWait, sqlInstance, sqlDriverName, sqlOptions, persistErrors, maxLatestErrors); driver = backend.getDriver(); assertEquals(driver.generateJDBCUrl(destination), "jdbc:mysql://localhost:3306/dest"); @@ -235,7 +236,7 @@ public void testJDBCUrlMySQLWithOptions() { System.out.println("Testing SQLBackendImpl.SQLDriver.generateJDBCUrl (sqlInstance:mysql, options:)"); sqlOptions = ""; - backend = new SQLBackendImpl(sqlHost, sqlPort, user, password, maxPoolSize, maxPoolIdle, minPoolIdle, sqlInstance, sqlDriverName, sqlOptions, persistErrors, maxLatestErrors); + backend = new SQLBackendImpl(sqlHost, sqlPort, user, password, maxPoolSize, maxPoolIdle, minPoolIdle, maxPoolWait, sqlInstance, sqlDriverName, sqlOptions, persistErrors, maxLatestErrors); driver = backend.getDriver(); assertEquals(driver.generateJDBCUrl(destination), "jdbc:mysql://localhost:3306/dest"); @@ -243,7 +244,7 @@ public void testJDBCUrlMySQLWithOptions() { System.out.println("Testing SQLBackendImpl.SQLDriver.generateJDBCUrl (sqlInstance:mysql, options:)"); sqlOptions = null; - backend = new SQLBackendImpl(sqlHost, sqlPort, user, password, maxPoolSize, maxPoolIdle, minPoolIdle, sqlInstance, sqlDriverName, sqlOptions, persistErrors, maxLatestErrors); + backend = new SQLBackendImpl(sqlHost, sqlPort, user, password, maxPoolSize, maxPoolIdle, minPoolIdle, maxPoolWait, sqlInstance, sqlDriverName, sqlOptions, persistErrors, maxLatestErrors); driver = backend.getDriver(); assertEquals(driver.generateJDBCUrl(destination), "jdbc:mysql://localhost:3306/dest"); @@ -260,7 +261,7 @@ public void testJDBCUrlPostgreSQLWithOptions() { String defaultDataBase = "default"; String sqlOptions = "sslmode=require"; - SQLBackendImpl backend = new SQLBackendImpl(sqlHost, sqlPort, user, password, maxPoolSize, maxPoolIdle, minPoolIdle, sqlInstance, sqlDriverName, sqlOptions, persistErrors, maxLatestErrors); + SQLBackendImpl backend = new SQLBackendImpl(sqlHost, sqlPort, user, password, maxPoolSize, maxPoolIdle, minPoolIdle, maxPoolWait, sqlInstance, sqlDriverName, sqlOptions, persistErrors, maxLatestErrors); SQLBackendImpl.SQLDriver driver = backend.getDriver(); assertEquals(driver.generateJDBCUrl(destination), "jdbc:postgresql://localhost:5432/default?sslmode=require"); @@ -268,7 +269,7 @@ public void testJDBCUrlPostgreSQLWithOptions() { System.out.println("Testing SQLBackendImpl.SQLDriver.generateJDBCUrl (sqlInstance:postgresql, options:)"); sqlOptions = ""; - backend = new SQLBackendImpl(sqlHost, sqlPort, user, password, maxPoolSize, maxPoolIdle, minPoolIdle, sqlInstance, sqlDriverName, sqlOptions, persistErrors, maxLatestErrors); + backend = new SQLBackendImpl(sqlHost, sqlPort, user, password, maxPoolSize, maxPoolIdle, minPoolIdle, maxPoolWait, sqlInstance, sqlDriverName, sqlOptions, persistErrors, maxLatestErrors); driver = backend.getDriver(); assertEquals(driver.generateJDBCUrl(destination), "jdbc:postgresql://localhost:5432/default"); @@ -284,7 +285,7 @@ public void testJDBCUrlPostgreSQLWithOptions() { System.out.println("Testing SQLBackendImpl.SQLDriver.generateJDBCUrl (sqlInstance:postgresql, options:)"); sqlOptions = null; - backend = new SQLBackendImpl(sqlHost, sqlPort, user, password, maxPoolSize, maxPoolIdle, minPoolIdle, sqlInstance, sqlDriverName, sqlOptions, persistErrors, maxLatestErrors); + backend = new SQLBackendImpl(sqlHost, sqlPort, user, password, maxPoolSize, maxPoolIdle, minPoolIdle, maxPoolWait, sqlInstance, sqlDriverName, sqlOptions, persistErrors, maxLatestErrors); driver = backend.getDriver(); assertEquals(driver.generateJDBCUrl(destination), "jdbc:postgresql://localhost:5432/default"); diff --git a/cygnus-ngsi-ld/src/main/java/com/telefonica/iot/cygnus/sinks/NGSIPostgisSink.java b/cygnus-ngsi-ld/src/main/java/com/telefonica/iot/cygnus/sinks/NGSIPostgisSink.java index 90a559d5a..d5fa24066 100644 --- a/cygnus-ngsi-ld/src/main/java/com/telefonica/iot/cygnus/sinks/NGSIPostgisSink.java +++ b/cygnus-ngsi-ld/src/main/java/com/telefonica/iot/cygnus/sinks/NGSIPostgisSink.java @@ -59,6 +59,7 @@ public class NGSIPostgisSink extends NGSILDSink { private static final int DEFAULT_MAX_POOL_SIZE = 3; private static final int DEFAULT_MAX_POOL_IDLE = 2; private static final int DEFAULT_MIN_POOL_IDLE = 0; + private static final int DEFAULT_MAX_POOL_WAIT = 10000; private static final String DEFAULT_POSTGIS_TYPE = "geometry"; private static final String DEFAULT_ATTR_NATIVE_TYPES = "false"; private static final String POSTGIS_DRIVER_NAME = "org.postgresql.Driver"; @@ -74,6 +75,7 @@ public class NGSIPostgisSink extends NGSILDSink { private int maxPoolSize; private int maxPoolIdle; private int minPoolIdle; + private int maxPoolWait; private SQLBackendImpl postgisPersistenceBackend; private boolean enableCache; private boolean swapCoordinates; @@ -216,6 +218,9 @@ public void configure(Context context) { minPoolIdle = context.getInteger("postgis_minPoolIdle", DEFAULT_MIN_POOL_IDLE); LOGGER.debug("[" + this.getName() + "] Reading configuration (postgis_minPoolIdle=" + minPoolIdle + ")"); + maxPoolWait = context.getInteger("postgis_maxPoolWait", DEFAULT_MAX_POOL_WAIT); + LOGGER.debug("[" + this.getName() + "] Reading configuration (postgis_maxPoolWait=" + maxPoolWait + ")"); + rowAttrPersistence = context.getString("attr_persistence", DEFAULT_ROW_ATTR_PERSISTENCE).equals("row"); String persistence = context.getString("attr_persistence", DEFAULT_ROW_ATTR_PERSISTENCE); @@ -279,7 +284,7 @@ public void stop() { @Override public void start() { try { - createPersistenceBackend(postgisHost, postgisPort, postgisUsername, postgisPassword, maxPoolSize, maxPoolIdle, minPoolIdle, postgisOptions); + createPersistenceBackend(postgisHost, postgisPort, postgisUsername, postgisPassword, maxPoolSize, maxPoolIdle, minPoolIdle, maxPoolWait, postgisOptions); } catch (Exception e) { LOGGER.error("Error while creating the Postgis persistence backend. Details=" + e.getMessage()); @@ -292,9 +297,9 @@ public void start() { /** * Initialices a lazy singleton to share among instances on JVM */ - private void createPersistenceBackend(String sqlHost, String sqlPort, String sqlUsername, String sqlPassword, int maxPoolSize, int maxPoolIdle, int minPoolIdle, String sqlOptions) { + private void createPersistenceBackend(String sqlHost, String sqlPort, String sqlUsername, String sqlPassword, int maxPoolSize, int maxPoolIdle, int minPoolIdle, int maxPoolWait, String sqlOptions) { if (postgisPersistenceBackend == null) { - postgisPersistenceBackend = new SQLBackendImpl(sqlHost, sqlPort, sqlUsername, sqlPassword, maxPoolSize, maxPoolIdle, minPoolIdle, POSTGIS_INSTANCE_NAME, POSTGIS_DRIVER_NAME, sqlOptions); + postgisPersistenceBackend = new SQLBackendImpl(sqlHost, sqlPort, sqlUsername, sqlPassword, maxPoolSize, maxPoolIdle, minPoolIdle, maxPoolWait, POSTGIS_INSTANCE_NAME, POSTGIS_DRIVER_NAME, sqlOptions); } } diff --git a/cygnus-ngsi-ld/src/main/java/com/telefonica/iot/cygnus/sinks/NGSIPostgreSQLSink.java b/cygnus-ngsi-ld/src/main/java/com/telefonica/iot/cygnus/sinks/NGSIPostgreSQLSink.java index 2464edec2..b5be024d5 100644 --- a/cygnus-ngsi-ld/src/main/java/com/telefonica/iot/cygnus/sinks/NGSIPostgreSQLSink.java +++ b/cygnus-ngsi-ld/src/main/java/com/telefonica/iot/cygnus/sinks/NGSIPostgreSQLSink.java @@ -50,6 +50,7 @@ public class NGSIPostgreSQLSink extends NGSILDSink { private static final int DEFAULT_MAX_POOL_SIZE = 3; private static final int DEFAULT_MAX_POOL_IDLE = 2; private static final int DEFAULT_MIN_POOL_IDLE = 0; + private static final int DEFAULT_MAX_POOL_WAIT = 10000; private static final String POSTGRESQL_DRIVER_NAME = "org.postgresql.Driver"; private static final SQLInstance POSTGRESQL_INSTANCE_NAME = SQLInstance.POSTGRESQL; @@ -61,7 +62,8 @@ public class NGSIPostgreSQLSink extends NGSILDSink { private String postgresqlPassword; private int maxPoolSize; private int maxPoolIdle; - private int minPoolIdle; + private int minPoolIdle; + private int maxPoolWait; private boolean rowAttrPersistence; private SQLBackendImpl postgreSQLPersistenceBackend; private boolean enableCache; @@ -194,6 +196,9 @@ public void configure(Context context) { minPoolIdle = context.getInteger("postgresql_minPoolIdle", DEFAULT_MIN_POOL_IDLE); LOGGER.debug("[" + this.getName() + "] Reading configuration (postgresql_minPoolIdle=" + minPoolIdle + ")"); + maxPoolWait = context.getInteger("postgresql_maxPoolWait", DEFAULT_MAX_POOL_WAIT); + LOGGER.debug("[" + this.getName() + "] Reading configuration (postgresql_maxPoolWait=" + maxPoolWait + ")"); + rowAttrPersistence = context.getString("attr_persistence", DEFAULT_ROW_ATTR_PERSISTENCE).equals("row"); String persistence = context.getString("attr_persistence", DEFAULT_ROW_ATTR_PERSISTENCE); @@ -223,7 +228,7 @@ public void configure(Context context) { @Override public void start() { try { - createPersistenceBackend(postgresqlHost, postgresqlPort, postgresqlUsername, postgresqlPassword, maxPoolSize, maxPoolIdle, minPoolIdle, postgresqlOptions); + createPersistenceBackend(postgresqlHost, postgresqlPort, postgresqlUsername, postgresqlPassword, maxPoolSize, maxPoolIdle, minPoolIdle, maxPoolWait, postgresqlOptions); } catch (Exception e) { LOGGER.error("Error while creating the PostgreSQL persistence backend. Details=" + e.getMessage()); @@ -236,9 +241,9 @@ public void start() { /** * Initialices a lazy singleton to share among instances on JVM */ - private void createPersistenceBackend(String sqlHost, String sqlPort, String sqlUsername, String sqlPassword, int maxPoolSize, int maxPoolIdle, int minPoolIdle, String sqlOptions) { + private void createPersistenceBackend(String sqlHost, String sqlPort, String sqlUsername, String sqlPassword, int maxPoolSize, int maxPoolIdle, int minPoolIdle, int maxPoolWait, String sqlOptions) { if (postgreSQLPersistenceBackend == null) { - postgreSQLPersistenceBackend = new SQLBackendImpl(sqlHost, sqlPort, sqlUsername, sqlPassword, maxPoolSize, maxPoolIdle, minPoolIdle, POSTGRESQL_INSTANCE_NAME, POSTGRESQL_DRIVER_NAME, sqlOptions); + postgreSQLPersistenceBackend = new SQLBackendImpl(sqlHost, sqlPort, sqlUsername, sqlPassword, maxPoolSize, maxPoolIdle, minPoolIdle, maxPoolWait, POSTGRESQL_INSTANCE_NAME, POSTGRESQL_DRIVER_NAME, sqlOptions); } } diff --git a/cygnus-ngsi/src/main/java/com/telefonica/iot/cygnus/sinks/NGSIMySQLSink.java b/cygnus-ngsi/src/main/java/com/telefonica/iot/cygnus/sinks/NGSIMySQLSink.java index be64dae89..308aa69db 100644 --- a/cygnus-ngsi/src/main/java/com/telefonica/iot/cygnus/sinks/NGSIMySQLSink.java +++ b/cygnus-ngsi/src/main/java/com/telefonica/iot/cygnus/sinks/NGSIMySQLSink.java @@ -60,6 +60,7 @@ public class NGSIMySQLSink extends NGSISink { private static final int DEFAULT_MAX_POOL_SIZE = 3; private static final int DEFAULT_MAX_POOL_IDLE = 2; private static final int DEFAULT_MIN_POOL_IDLE = 0; + private static final int DEFAULT_MAX_POOL_WAIT = 10000; private static final String DEFAULT_ATTR_NATIVE_TYPES = "false"; private static final String MYSQL_DRIVER_NAME = "com.mysql.jdbc.Driver"; private static final SQLInstance MYSQL_INSTANCE_NAME = SQLInstance.MYSQL; @@ -78,6 +79,7 @@ public class NGSIMySQLSink extends NGSISink { private int maxPoolSize; private int maxPoolIdle; private int minPoolIdle; + private int maxPoolWait; private boolean rowAttrPersistence; private SQLBackendImpl mySQLPersistenceBackend; private boolean attrNativeTypes; @@ -203,6 +205,9 @@ public void configure(Context context) { minPoolIdle = context.getInteger("mysql_minPoolIdle", DEFAULT_MIN_POOL_IDLE); LOGGER.debug("[" + this.getName() + "] Reading configuration (mysql_minPoolIdle=" + minPoolIdle + ")"); + maxPoolWait = context.getInteger("mysql_maxPoolWait", DEFAULT_MAX_POOL_WAIT); + LOGGER.debug("[" + this.getName() + "] Reading configuration (mysql_maxPoolWait=" + maxPoolWait + ")"); + rowAttrPersistence = context.getString("attr_persistence", DEFAULT_ROW_ATTR_PERSISTENCE).equals("row"); String persistence = context.getString("attr_persistence", DEFAULT_ROW_ATTR_PERSISTENCE); @@ -291,7 +296,7 @@ public void start() { createPersistenceBackend(mysqlHost, mysqlPort, mysqlUsername, mysqlPassword, maxPoolSize, maxPoolIdle, minPoolIdle, mysqlOptions, persistErrors, maxLatestErrors); LOGGER.debug("[" + this.getName() + "] MySQL persistence backend created"); } catch (Exception e) { - String configParams = " mysqlHost " + mysqlHost + " mysqlPort " + mysqlPort + " mysqlUsername " + mysqlUsername + " mysqlPassword " + mysqlPassword + " maxPoolSize " + maxPoolSize + " maxPoolIdle " + maxPoolIdle + " minPoolIdle " + minPoolIdle + " mysqlOptions " + mysqlOptions + " persistErrors " + persistErrors + " maxLatestErrors " + maxLatestErrors; + String configParams = " mysqlHost " + mysqlHost + " mysqlPort " + mysqlPort + " mysqlUsername " + mysqlUsername + " mysqlPassword " + mysqlPassword + " maxPoolSize " + maxPoolSize + " maxPoolIdle " + maxPoolIdle + " minPoolIdle " + minPoolIdle + " maxPoolWait " + maxPoolWait + " mysqlOptions " + mysqlOptions + " persistErrors " + persistErrors + " maxLatestErrors " + maxLatestErrors; LOGGER.error("Error while creating the MySQL persistence backend. " + "Config params= " + configParams + "Details=" + e.getMessage() + @@ -310,9 +315,9 @@ public void stop() { /** * Initialices a lazy singleton to share among instances on JVM */ - private void createPersistenceBackend(String sqlHost, String sqlPort, String sqlUsername, String sqlPassword, int maxPoolSize, int maxPoolIdle, int minPoolIdle, String sqlOptions, boolean persistErrors, int maxLatestErrors) { +private void createPersistenceBackend(String sqlHost, String sqlPort, String sqlUsername, String sqlPassword, int maxPoolSize, int maxPoolIdle, int minPoolIdle, int maxPoolWait, String sqlOptions, boolean persistErrors, int maxLatestErrors) { if (mySQLPersistenceBackend == null) { - mySQLPersistenceBackend = new SQLBackendImpl(sqlHost, sqlPort, sqlUsername, sqlPassword, maxPoolSize, maxPoolIdle, minPoolIdle, MYSQL_INSTANCE_NAME, MYSQL_DRIVER_NAME, sqlOptions, persistErrors, maxLatestErrors); + mySQLPersistenceBackend = new SQLBackendImpl(sqlHost, sqlPort, sqlUsername, sqlPassword, maxPoolSize, maxPoolIdle, minPoolIdle, maxPoolWait, MYSQL_INSTANCE_NAME, MYSQL_DRIVER_NAME, sqlOptions, persistErrors, maxLatestErrors); } } diff --git a/cygnus-ngsi/src/main/java/com/telefonica/iot/cygnus/sinks/NGSIOracleSQLSink.java b/cygnus-ngsi/src/main/java/com/telefonica/iot/cygnus/sinks/NGSIOracleSQLSink.java index 79635a3df..2bd6e5e17 100644 --- a/cygnus-ngsi/src/main/java/com/telefonica/iot/cygnus/sinks/NGSIOracleSQLSink.java +++ b/cygnus-ngsi/src/main/java/com/telefonica/iot/cygnus/sinks/NGSIOracleSQLSink.java @@ -60,6 +60,7 @@ public class NGSIOracleSQLSink extends NGSISink { private static final int DEFAULT_MAX_POOL_SIZE = 3; private static final int DEFAULT_MAX_POOL_IDLE = 2; private static final int DEFAULT_MIN_POOL_IDLE = 0; + private static final int DEFAULT_MAX_POOL_WAIT = 10000; private static final String DEFAULT_ATTR_NATIVE_TYPES = "false"; //private static final String ORACLE_DRIVER_NAME = "oracle.jdbc.OracleDriver"; private static final String ORACLE_DRIVER_NAME = "oracle.jdbc.driver.OracleDriver"; @@ -86,6 +87,7 @@ public class NGSIOracleSQLSink extends NGSISink { private int maxPoolSize; private int maxPoolIdle; private int minPoolIdle; + private int maxPoolWait; private boolean rowAttrPersistence; private SQLBackendImpl oracleSQLPersistenceBackend; private boolean attrNativeTypes; @@ -226,6 +228,9 @@ public void configure(Context context) { minPoolIdle = context.getInteger("oracle_minPoolIdle", DEFAULT_MIN_POOL_IDLE); LOGGER.debug("[" + this.getName() + "] Reading configuration (oracle_minPoolIdle=" + minPoolIdle + ")"); + maxPoolWait = context.getInteger("oracle_maxPoolWait", DEFAULT_MAX_POOL_WAIT); + LOGGER.debug("[" + this.getName() + "] Reading configuration (oracle_maxPoolWait=" + maxPoolWait + ")"); + rowAttrPersistence = context.getString("attr_persistence", DEFAULT_ROW_ATTR_PERSISTENCE).equals("row"); String persistence = context.getString("attr_persistence", DEFAULT_ROW_ATTR_PERSISTENCE); @@ -337,10 +342,10 @@ public void configure(Context context) { @Override public void start() { try { - createPersistenceBackend(oracleHost, oraclePort, oracleUsername, oraclePassword, maxPoolSize, maxPoolIdle, minPoolIdle, oracleOptions, persistErrors, maxLatestErrors); + createPersistenceBackend(oracleHost, oraclePort, oracleUsername, oraclePassword, maxPoolSize, maxPoolIdle, minPoolIdle, maxPoolWait, oracleOptions, persistErrors, maxLatestErrors); LOGGER.debug("[" + this.getName() + "] OracleSQL persistence backend created"); } catch (Exception e) { - String configParams = " oracleHost " + oracleHost + " oraclePort " + oraclePort + " oracleUsername " + oracleUsername + " oraclePassword " + oraclePassword + " maxPoolSize " + maxPoolSize + " maxPoolIdle " + maxPoolIdle + " minPoolIdle " + minPoolIdle + " oracleOptions " + oracleOptions + " persistErrors " + persistErrors + " maxLatestErrors " + maxLatestErrors; + String configParams = " oracleHost " + oracleHost + " oraclePort " + oraclePort + " oracleUsername " + oracleUsername + " oraclePassword " + oraclePassword + " maxPoolSize " + maxPoolSize + " maxPoolIdle " + maxPoolIdle + " minPoolIdle " + minPoolIdle + " maxPoolWait " + maxPoolWait + " oracleOptions " + oracleOptions + " persistErrors " + persistErrors + " maxLatestErrors " + maxLatestErrors; LOGGER.error("Error while creating the OracleSQL persistence backend. " + "Config params= " + configParams + "Details=" + e.getMessage() + @@ -359,9 +364,9 @@ public void stop() { /** * Initialices a lazy singleton to share among instances on JVM */ - private void createPersistenceBackend(String sqlHost, String sqlPort, String sqlUsername, String sqlPassword, int maxPoolSize, int maxPoolIdle, int minPoolIdle, String sqlOptions, boolean persistErrors, int maxLatestErrors) { + private void createPersistenceBackend(String sqlHost, String sqlPort, String sqlUsername, String sqlPassword, int maxPoolSize, int maxPoolIdle, int minPoolIdle, int maxPoolWait, String sqlOptions, boolean persistErrors, int maxLatestErrors) { if (oracleSQLPersistenceBackend == null) { - oracleSQLPersistenceBackend = new SQLBackendImpl(sqlHost, sqlPort, sqlUsername, sqlPassword, maxPoolSize, maxPoolIdle, minPoolIdle, ORACLE_INSTANCE_NAME, ORACLE_DRIVER_NAME, sqlOptions, persistErrors, maxLatestErrors); + oracleSQLPersistenceBackend = new SQLBackendImpl(sqlHost, sqlPort, sqlUsername, sqlPassword, maxPoolSize, maxPoolIdle, minPoolIdle, maxPoolWait, ORACLE_INSTANCE_NAME, ORACLE_DRIVER_NAME, sqlOptions, persistErrors, maxLatestErrors); oracleSQLPersistenceBackend.setNlsTimestampFormat(nlsTimestampFormat); oracleSQLPersistenceBackend.setNlsTimestampTzFormat(nlsTimestampTzFormat); } diff --git a/cygnus-ngsi/src/main/java/com/telefonica/iot/cygnus/sinks/NGSIPostgisSink.java b/cygnus-ngsi/src/main/java/com/telefonica/iot/cygnus/sinks/NGSIPostgisSink.java index 6abad53a5..b8aa1f18a 100644 --- a/cygnus-ngsi/src/main/java/com/telefonica/iot/cygnus/sinks/NGSIPostgisSink.java +++ b/cygnus-ngsi/src/main/java/com/telefonica/iot/cygnus/sinks/NGSIPostgisSink.java @@ -56,6 +56,7 @@ public class NGSIPostgisSink extends NGSISink { private static final int DEFAULT_MAX_POOL_SIZE = 3; private static final int DEFAULT_MAX_POOL_IDLE = 2; private static final int DEFAULT_MIN_POOL_IDLE = 0; + private static final int DEFAULT_MAX_POOL_WAIT = 10000; private static final String DEFAULT_POSTGIS_TYPE = "geometry"; private static final String DEFAULT_ATTR_NATIVE_TYPES = "false"; private static final String POSTGIS_DRIVER_NAME = "org.postgresql.Driver"; @@ -79,6 +80,7 @@ public class NGSIPostgisSink extends NGSISink { private int maxPoolSize; private int maxPoolIdle; private int minPoolIdle; + private int maxPoolWait; private SQLBackendImpl postgisPersistenceBackend; private boolean enableCache; private boolean swapCoordinates; @@ -228,6 +230,9 @@ public void configure(Context context) { minPoolIdle = context.getInteger("postgis_minPoolIdle", DEFAULT_MIN_POOL_IDLE); LOGGER.debug("[" + this.getName() + "] Reading configuration (postgis_minPoolIdle=" + minPoolIdle + ")"); + maxPoolWait = context.getInteger("postgis_maxPoolWait", DEFAULT_MAX_POOL_WAIT); + LOGGER.debug("[" + this.getName() + "] Reading configuration (postgis_maxPoolWait=" + maxPoolWait + ")"); + rowAttrPersistence = context.getString("attr_persistence", DEFAULT_ROW_ATTR_PERSISTENCE).equals("row"); String persistence = context.getString("attr_persistence", DEFAULT_ROW_ATTR_PERSISTENCE); @@ -333,11 +338,11 @@ public void stop() { @Override public void start() { try { - createPersistenceBackend(postgisHost, postgisPort, postgisUsername, postgisPassword, maxPoolSize, maxPoolIdle, minPoolIdle, postgisOptions, persistErrors, maxLatestErrors); + createPersistenceBackend(postgisHost, postgisPort, postgisUsername, postgisPassword, maxPoolSize, maxPoolIdle, minPoolIdle, maxPoolWait, postgisOptions, persistErrors, maxLatestErrors); LOGGER.debug("[" + this.getName() + "] POSTGIS persistence backend created"); } catch (Exception e) { String configParams = " postgisHost " + postgisHost + " postgisPort " + postgisPort + " postgisUsername " + - postgisUsername + " postgisPassword " + postgisPassword + " maxPoolSize " + maxPoolSize + " maxPoolIdle " + maxPoolIdle + " minPoolIdle " + minPoolIdle + " postgisOptions " + + postgisUsername + " postgisPassword " + postgisPassword + " maxPoolSize " + maxPoolSize + " maxPoolIdle " + maxPoolIdle + " minPoolIdle " + minPoolIdle + " maxPoolWait " + maxPoolIWaitdle + " postgisOptions " + postgisOptions + " persistErrors " + persistErrors + " maxLatestErrors " + maxLatestErrors; LOGGER.error("Error while creating the Postgis persistence backend. " + "Config params= " + configParams + @@ -352,9 +357,9 @@ public void start() { /** * Initialices a lazy singleton to share among instances on JVM */ - private void createPersistenceBackend(String sqlHost, String sqlPort, String sqlUsername, String sqlPassword, int maxPoolSize, int maxPoolIdle, int minPoolIdle, String sqlOptions, boolean persistErrors, int maxLatestErrors) { + private void createPersistenceBackend(String sqlHost, String sqlPort, String sqlUsername, String sqlPassword, int maxPoolSize, int maxPoolIdle, int minPoolIdle, int maxPoolWait, String sqlOptions, boolean persistErrors, int maxLatestErrors) { if (postgisPersistenceBackend == null) { - postgisPersistenceBackend = new SQLBackendImpl(sqlHost, sqlPort, sqlUsername, sqlPassword, maxPoolSize, maxPoolIdle, minPoolIdle, POSTGIS_INSTANCE_NAME, POSTGIS_DRIVER_NAME, sqlOptions, persistErrors, maxLatestErrors); + postgisPersistenceBackend = new SQLBackendImpl(sqlHost, sqlPort, sqlUsername, sqlPassword, maxPoolSize, maxPoolIdle, minPoolIdle, maxPoolWait, POSTGIS_INSTANCE_NAME, POSTGIS_DRIVER_NAME, sqlOptions, persistErrors, maxLatestErrors); } } diff --git a/cygnus-ngsi/src/main/java/com/telefonica/iot/cygnus/sinks/NGSIPostgreSQLSink.java b/cygnus-ngsi/src/main/java/com/telefonica/iot/cygnus/sinks/NGSIPostgreSQLSink.java index 7a2be4cb4..d9d31fe4a 100644 --- a/cygnus-ngsi/src/main/java/com/telefonica/iot/cygnus/sinks/NGSIPostgreSQLSink.java +++ b/cygnus-ngsi/src/main/java/com/telefonica/iot/cygnus/sinks/NGSIPostgreSQLSink.java @@ -58,6 +58,7 @@ public class NGSIPostgreSQLSink extends NGSISink { private static final int DEFAULT_MAX_POOL_SIZE = 3; private static final int DEFAULT_MAX_POOL_IDLE = 2; private static final int DEFAULT_MIN_POOL_IDLE = 0; + private static final int DEFAULT_MAX_POOL_WAIT = 10000; private static final String DEFAULT_ATTR_NATIVE_TYPES = "false"; private static final String POSTGRESQL_DRIVER_NAME = "org.postgresql.Driver"; private static final SQLInstance POSTGRESQL_INSTANCE_NAME = SQLInstance.POSTGRESQL; @@ -79,6 +80,7 @@ public class NGSIPostgreSQLSink extends NGSISink { private int maxPoolSize; private int maxPoolIdle; private int minPoolIdle; + private int maxPoolWait; private boolean rowAttrPersistence; private SQLBackendImpl postgreSQLPersistenceBackend; private boolean enableCache; @@ -227,6 +229,10 @@ public void configure(Context context) { minPoolIdle = context.getInteger("postgresql_minPoolIdle", DEFAULT_MIN_POOL_IDLE); LOGGER.debug("[" + this.getName() + "] Reading configuration (postgresql_minPoolIdle=" + minPoolIdle + ")"); + + maxPoolWait = context.getInteger("postgresql_maxPoolWait", DEFAULT_MAX_POOL_WAIT); + LOGGER.debug("[" + this.getName() + "] Reading configuration (postgresql_maxPoolWait=" + maxPoolWait + ")"); + rowAttrPersistence = context.getString("attr_persistence", DEFAULT_ROW_ATTR_PERSISTENCE).equals("row"); String persistence = context.getString("attr_persistence", DEFAULT_ROW_ATTR_PERSISTENCE); @@ -324,12 +330,12 @@ public void configure(Context context) { @Override public void start() { try { - createPersistenceBackend(postgresqlHost, postgresqlPort, postgresqlUsername, postgresqlPassword, maxPoolSize, maxPoolIdle, minPoolIdle, postgresqlOptions, persistErrors, maxLatestErrors); + createPersistenceBackend(postgresqlHost, postgresqlPort, postgresqlUsername, postgresqlPassword, maxPoolSize, maxPoolIdle, minPoolIdle, maxPoolWait, postgresqlOptions, persistErrors, maxLatestErrors); LOGGER.debug("[" + this.getName() + "] Postgresql persistence backend created"); } catch (Exception e) { String configParams = " postgresqlHost " + postgresqlHost + " postgresqlPort " + postgresqlPort + " postgresqlUsername " + postgresqlUsername + " postgresqlPassword " + postgresqlPassword + - " maxPoolSize " + maxPoolSize + " maxPoolIdle " + maxPoolIdle + " minPoolIdle " + minPoolIdle + " postgresqlOptions " + + " maxPoolSize " + maxPoolSize + " maxPoolIdle " + maxPoolIdle + " minPoolIdle " + minPoolIdle + " maxPoolWait " + maxPoolWait + " postgresqlOptions " + postgresqlOptions + " persistErrors " + persistErrors + " maxLatestErrors " + maxLatestErrors; LOGGER.error("Error while creating the Postgresql persistence backend. " + "Config params= " + configParams + @@ -344,9 +350,9 @@ public void start() { /** * Initialices a lazy singleton to share among instances on JVM */ - private void createPersistenceBackend(String sqlHost, String sqlPort, String sqlUsername, String sqlPassword, int maxPoolSize, int maxPoolIdle, int minPoolIdle, String sqlOptions, boolean persistErrors, int maxLatestErrors) { + private void createPersistenceBackend(String sqlHost, String sqlPort, String sqlUsername, String sqlPassword, int maxPoolSize, int maxPoolIdle, int minPoolIdle, int maxPoolWait, String sqlOptions, boolean persistErrors, int maxLatestErrors) { if (postgreSQLPersistenceBackend == null) { - postgreSQLPersistenceBackend = new SQLBackendImpl(sqlHost, sqlPort, sqlUsername, sqlPassword, maxPoolSize, maxPoolIdle, minPoolIdle, POSTGRESQL_INSTANCE_NAME, POSTGRESQL_DRIVER_NAME, sqlOptions, persistErrors, maxLatestErrors); + postgreSQLPersistenceBackend = new SQLBackendImpl(sqlHost, sqlPort, sqlUsername, sqlPassword, maxPoolSize, maxPoolIdle, minPoolIdle, maxPoolWait, POSTGRESQL_INSTANCE_NAME, POSTGRESQL_DRIVER_NAME, sqlOptions, persistErrors, maxLatestErrors); } }