Skip to content

Commit

Permalink
feat: support username password in offline (#3744)
Browse files Browse the repository at this point in the history
  • Loading branch information
tobegit3hub committed Feb 26, 2024
1 parent a8d7a1d commit 8021967
Show file tree
Hide file tree
Showing 8 changed files with 28 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,12 @@ class OpenmldbBatchConfig extends Serializable {
@ConfigOption(name = "openmldb.zk.root.path", doc = "The root path of ZooKeeper for NameServer")
var openmldbZkRootPath = ""

@ConfigOption(name = "openmldb.user", doc = "The user of OpenMLDB")
var openmldbUser = "root"

@ConfigOption(name = "openmldb.password", doc = "The password of OpenMLDB")
var openmldbPassword = "root"

@ConfigOption(name = "openmldb.default.db", doc = "The default database for OpenMLDB SQL")
var defaultDb = "default_db"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,11 +68,13 @@ class OpenmldbSession {
}

if (this.config.openmldbZkCluster.nonEmpty && this.config.openmldbZkRootPath.nonEmpty) {
logger.info(s"Try to connect OpenMLDB with zk ${this.config.openmldbZkCluster} and root path " +
s"${this.config.openmldbZkRootPath}")
logger.info(s"Try to connect OpenMLDB with zk cluster: ${this.config.openmldbZkCluster}, root path: " +
s"${this.config.openmldbZkRootPath}, user: ${this.config.openmldbUser}, password: " +
s"${this.config.openmldbPassword}")
try {
openmldbCatalogService = new OpenmldbCatalogService(this.config.openmldbZkCluster,
this.config.openmldbZkRootPath, config.openmldbJsdkLibraryPath)
this.config.openmldbZkRootPath, this.config.openmldbUser, this.config.openmldbPassword,
config.openmldbJsdkLibraryPath)
registerOpenmldbOfflineTable(openmldbCatalogService)
} catch {
case e: Exception => logger.warn("Fail to connect OpenMLDB cluster and register tables, " + e.getMessage)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,20 +24,24 @@ import com._4paradigm.openmldb.sdk.SdkOption
import scala.collection.JavaConverters.asScalaBufferConverter
import scala.collection.mutable

class OpenmldbCatalogService(val zkCluster: String, val zkPath: String, val openmldbJsdkPath: String) {
class OpenmldbCatalogService(val zkCluster: String, val zkPath: String, val username: String, val password: String,
val openmldbJsdkPath: String) {

val option = new SdkOption
option.setZkCluster(zkCluster)
option.setZkPath(zkPath)
option.setUser(username)
option.setPassword(password)

val sqlExecutor = new SqlClusterExecutor(option, openmldbJsdkPath)
val zkClient = new ZKClient(ZKConfig.builder()
.cluster(zkCluster)
.namespace(zkPath)
.build())
zkClient.connect();

def this(zkCluster: String, zkPath: String) = {
this(zkCluster, zkPath, "")
def this(zkCluster: String, zkPath: String, username: String, password: String) = {
this(zkCluster, zkPath, username, password, "")
}

def getDatabases: Array[String] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ object ConfigReflections {
setters += name -> { (config: OpenmldbBatchConfig, value: Any) =>
try {
val im = mirror.reflect(config)
logger.info("Native Spark Configuration: " + name + " -> " + value)
logger.info("OpenMLDB Batch Configuration: " + name + " -> " + value)

val typedValue = parseValue(value, term.info.toString)
if (typedValue != null) {
im.reflectField(term).set(typedValue)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ class TestOpenmldbCatalogService extends FunSuite {
val zkCluster = "127.0.0.1:2181"
val zkPath = "/openmldb"

val openmldbCatalogService = new OpenmldbCatalogService(zkCluster, zkPath)
val openmldbCatalogService = new OpenmldbCatalogService(zkCluster, zkPath, "root", "root")

val databases = openmldbCatalogService.getDatabases
for (i <- databases.indices) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ zookeeper.connection_timeout=5000
zookeeper.max_retries=10
zookeeper.base_sleep_time=1000
zookeeper.max_connect_waitTime=30000
user=root
password=root

# Spark Config
spark.home=
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ object JobInfoManager {
option.setZkCluster(TaskManagerConfig.getZkCluster)
option.setZkPath(TaskManagerConfig.getZkRootPath)
option.setUser(TaskManagerConfig.getUser)
option.setPassword(TaskManagerConfig.getPassword)

if (!TaskManagerConfig.getPassword.isEmpty) {
option.setPassword(TaskManagerConfig.getPassword)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ object SparkJobManager {
*/
def createSparkLauncher(mainClass: String): SparkLauncher = {


val launcher = new SparkLauncher()
.setAppResource(TaskManagerConfig.getBatchjobJarPath)
.setMainClass(mainClass)
Expand Down Expand Up @@ -128,6 +127,9 @@ object SparkJobManager {
if (TaskManagerConfig.getZkCluster.nonEmpty && TaskManagerConfig.getZkRootPath.nonEmpty) {
launcher.setConf("spark.openmldb.zk.cluster", TaskManagerConfig.getZkCluster)
launcher.setConf("spark.openmldb.zk.root.path", TaskManagerConfig.getZkRootPath)

launcher.setConf("spark.openmldb.user", TaskManagerConfig.getUser)
launcher.setConf("spark.openmldb.password", TaskManagerConfig.getPassword)
}

// Set ad-hoc Spark configuration
Expand Down

0 comments on commit 8021967

Please sign in to comment.