diff --git a/java/openmldb-batch/src/main/scala/com/_4paradigm/openmldb/batch/OpenmldbBatchConfig.scala b/java/openmldb-batch/src/main/scala/com/_4paradigm/openmldb/batch/OpenmldbBatchConfig.scala index 51389aa7fe9..febe53c580e 100755 --- a/java/openmldb-batch/src/main/scala/com/_4paradigm/openmldb/batch/OpenmldbBatchConfig.scala +++ b/java/openmldb-batch/src/main/scala/com/_4paradigm/openmldb/batch/OpenmldbBatchConfig.scala @@ -155,6 +155,12 @@ class OpenmldbBatchConfig extends Serializable { @ConfigOption(name = "openmldb.zk.root.path", doc = "The root path of ZooKeeper for NameServer") var openmldbZkRootPath = "" + @ConfigOption(name = "openmldb.user", doc = "The user of OpenMLDB") + var openmldbUser = "root" + + @ConfigOption(name = "openmldb.password", doc = "The password of OpenMLDB") + var openmldbPassword = "root" + @ConfigOption(name = "openmldb.default.db", doc = "The default database for OpenMLDB SQL") var defaultDb = "default_db" diff --git a/java/openmldb-batch/src/main/scala/com/_4paradigm/openmldb/batch/api/OpenmldbSession.scala b/java/openmldb-batch/src/main/scala/com/_4paradigm/openmldb/batch/api/OpenmldbSession.scala index d16496a6111..d6919a00ed5 100755 --- a/java/openmldb-batch/src/main/scala/com/_4paradigm/openmldb/batch/api/OpenmldbSession.scala +++ b/java/openmldb-batch/src/main/scala/com/_4paradigm/openmldb/batch/api/OpenmldbSession.scala @@ -68,11 +68,13 @@ class OpenmldbSession { } if (this.config.openmldbZkCluster.nonEmpty && this.config.openmldbZkRootPath.nonEmpty) { - logger.info(s"Try to connect OpenMLDB with zk ${this.config.openmldbZkCluster} and root path " + - s"${this.config.openmldbZkRootPath}") + logger.info(s"Try to connect OpenMLDB with zk cluster: ${this.config.openmldbZkCluster}, root path: " + + s"${this.config.openmldbZkRootPath}, user: ${this.config.openmldbUser}, password: " + + s"${this.config.openmldbPassword}") try { openmldbCatalogService = new OpenmldbCatalogService(this.config.openmldbZkCluster, - this.config.openmldbZkRootPath, config.openmldbJsdkLibraryPath) + this.config.openmldbZkRootPath, this.config.openmldbUser, this.config.openmldbPassword, + config.openmldbJsdkLibraryPath) registerOpenmldbOfflineTable(openmldbCatalogService) } catch { case e: Exception => logger.warn("Fail to connect OpenMLDB cluster and register tables, " + e.getMessage) diff --git a/java/openmldb-batch/src/main/scala/com/_4paradigm/openmldb/batch/catalog/OpenmldbCatalogService.scala b/java/openmldb-batch/src/main/scala/com/_4paradigm/openmldb/batch/catalog/OpenmldbCatalogService.scala index d6e4dfa2990..76f20cca538 100644 --- a/java/openmldb-batch/src/main/scala/com/_4paradigm/openmldb/batch/catalog/OpenmldbCatalogService.scala +++ b/java/openmldb-batch/src/main/scala/com/_4paradigm/openmldb/batch/catalog/OpenmldbCatalogService.scala @@ -24,11 +24,15 @@ import com._4paradigm.openmldb.sdk.SdkOption import scala.collection.JavaConverters.asScalaBufferConverter import scala.collection.mutable -class OpenmldbCatalogService(val zkCluster: String, val zkPath: String, val openmldbJsdkPath: String) { +class OpenmldbCatalogService(val zkCluster: String, val zkPath: String, val username: String, val password: String, + val openmldbJsdkPath: String) { val option = new SdkOption option.setZkCluster(zkCluster) option.setZkPath(zkPath) + option.setUser(username) + option.setPassword(password) + val sqlExecutor = new SqlClusterExecutor(option, openmldbJsdkPath) val zkClient = new ZKClient(ZKConfig.builder() .cluster(zkCluster) @@ -36,8 +40,8 @@ class OpenmldbCatalogService(val zkCluster: String, val zkPath: String, val open .build()) zkClient.connect(); - def this(zkCluster: String, zkPath: String) = { - this(zkCluster, zkPath, "") + def this(zkCluster: String, zkPath: String, username: String, password: String) = { + this(zkCluster, zkPath, username, password, "") } def getDatabases: Array[String] = { diff --git a/java/openmldb-batch/src/main/scala/com/_4paradigm/openmldb/batch/utils/ConfigReflections.scala b/java/openmldb-batch/src/main/scala/com/_4paradigm/openmldb/batch/utils/ConfigReflections.scala index 9ffad2a4e3a..062580eeb32 100644 --- a/java/openmldb-batch/src/main/scala/com/_4paradigm/openmldb/batch/utils/ConfigReflections.scala +++ b/java/openmldb-batch/src/main/scala/com/_4paradigm/openmldb/batch/utils/ConfigReflections.scala @@ -48,7 +48,8 @@ object ConfigReflections { setters += name -> { (config: OpenmldbBatchConfig, value: Any) => try { val im = mirror.reflect(config) - logger.info("Native Spark Configuration: " + name + " -> " + value) + logger.info("OpenMLDB Batch Configuration: " + name + " -> " + value) + val typedValue = parseValue(value, term.info.toString) if (typedValue != null) { im.reflectField(term).set(typedValue) diff --git a/java/openmldb-batch/src/test/scala/com/_4paradigm/openmldb/batch/catalog/TestOpenmldbCatalogService.scala b/java/openmldb-batch/src/test/scala/com/_4paradigm/openmldb/batch/catalog/TestOpenmldbCatalogService.scala index d92587b6614..b2d33a4b5d1 100644 --- a/java/openmldb-batch/src/test/scala/com/_4paradigm/openmldb/batch/catalog/TestOpenmldbCatalogService.scala +++ b/java/openmldb-batch/src/test/scala/com/_4paradigm/openmldb/batch/catalog/TestOpenmldbCatalogService.scala @@ -26,7 +26,7 @@ class TestOpenmldbCatalogService extends FunSuite { val zkCluster = "127.0.0.1:2181" val zkPath = "/openmldb" - val openmldbCatalogService = new OpenmldbCatalogService(zkCluster, zkPath) + val openmldbCatalogService = new OpenmldbCatalogService(zkCluster, zkPath, "root", "root") val databases = openmldbCatalogService.getDatabases for (i <- databases.indices) { diff --git a/java/openmldb-taskmanager/src/main/resources/taskmanager.properties b/java/openmldb-taskmanager/src/main/resources/taskmanager.properties index e3c41a4b5f8..2b8201c56eb 100644 --- a/java/openmldb-taskmanager/src/main/resources/taskmanager.properties +++ b/java/openmldb-taskmanager/src/main/resources/taskmanager.properties @@ -18,6 +18,8 @@ zookeeper.connection_timeout=5000 zookeeper.max_retries=10 zookeeper.base_sleep_time=1000 zookeeper.max_connect_waitTime=30000 +user=root +password=root # Spark Config spark.home= diff --git a/java/openmldb-taskmanager/src/main/scala/com/_4paradigm/openmldb/taskmanager/JobInfoManager.scala b/java/openmldb-taskmanager/src/main/scala/com/_4paradigm/openmldb/taskmanager/JobInfoManager.scala index 73394749313..47f1afb4d7b 100644 --- a/java/openmldb-taskmanager/src/main/scala/com/_4paradigm/openmldb/taskmanager/JobInfoManager.scala +++ b/java/openmldb-taskmanager/src/main/scala/com/_4paradigm/openmldb/taskmanager/JobInfoManager.scala @@ -46,6 +46,8 @@ object JobInfoManager { option.setZkCluster(TaskManagerConfig.getZkCluster) option.setZkPath(TaskManagerConfig.getZkRootPath) option.setUser(TaskManagerConfig.getUser) + option.setPassword(TaskManagerConfig.getPassword) + if (!TaskManagerConfig.getPassword.isEmpty) { option.setPassword(TaskManagerConfig.getPassword) } diff --git a/java/openmldb-taskmanager/src/main/scala/com/_4paradigm/openmldb/taskmanager/spark/SparkJobManager.scala b/java/openmldb-taskmanager/src/main/scala/com/_4paradigm/openmldb/taskmanager/spark/SparkJobManager.scala index 44fc619e536..e06bb5bb076 100644 --- a/java/openmldb-taskmanager/src/main/scala/com/_4paradigm/openmldb/taskmanager/spark/SparkJobManager.scala +++ b/java/openmldb-taskmanager/src/main/scala/com/_4paradigm/openmldb/taskmanager/spark/SparkJobManager.scala @@ -37,7 +37,6 @@ object SparkJobManager { */ def createSparkLauncher(mainClass: String): SparkLauncher = { - val launcher = new SparkLauncher() .setAppResource(TaskManagerConfig.getBatchjobJarPath) .setMainClass(mainClass) @@ -128,6 +127,9 @@ object SparkJobManager { if (TaskManagerConfig.getZkCluster.nonEmpty && TaskManagerConfig.getZkRootPath.nonEmpty) { launcher.setConf("spark.openmldb.zk.cluster", TaskManagerConfig.getZkCluster) launcher.setConf("spark.openmldb.zk.root.path", TaskManagerConfig.getZkRootPath) + + launcher.setConf("spark.openmldb.user", TaskManagerConfig.getUser) + launcher.setConf("spark.openmldb.password", TaskManagerConfig.getPassword) } // Set ad-hoc Spark configuration