From 875a1bafec93f81cb603dda097c87b9399613abc Mon Sep 17 00:00:00 2001 From: wenmo <32723967+wenmo@users.noreply.github.com> Date: Sat, 29 Jun 2024 20:57:31 +0800 Subject: [PATCH] [FLINK-35730][cdc-cli] PipelineDefinitionParser add parse string method --- .../cli/parser/PipelineDefinitionParser.java | 6 +++ .../parser/YamlPipelineDefinitionParser.java | 21 +++++--- .../YamlPipelineDefinitionParserTest.java | 51 +++++++++++++++++++ 3 files changed, 72 insertions(+), 6 deletions(-) diff --git a/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/parser/PipelineDefinitionParser.java b/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/parser/PipelineDefinitionParser.java index c7eef56c14..bfcde27bdd 100644 --- a/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/parser/PipelineDefinitionParser.java +++ b/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/parser/PipelineDefinitionParser.java @@ -30,4 +30,10 @@ public interface PipelineDefinitionParser { * the {@link PipelineDef}. */ PipelineDef parse(Path pipelineDefPath, Configuration globalPipelineConfig) throws Exception; + + /** + * Parse the specified pipeline definition string, merge global configurations, then generate + * the {@link PipelineDef}. + */ + PipelineDef parse(String pipelineDefText, Configuration globalPipelineConfig) throws Exception; } diff --git a/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParser.java b/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParser.java index 23b2c63ff7..dbdad5a984 100644 --- a/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParser.java +++ b/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParser.java @@ -75,13 +75,22 @@ public class YamlPipelineDefinitionParser implements PipelineDefinitionParser { @Override public PipelineDef parse(Path pipelineDefPath, Configuration globalPipelineConfig) throws Exception { - JsonNode root = mapper.readTree(pipelineDefPath.toFile()); + return parse(mapper.readTree(pipelineDefPath.toFile()), globalPipelineConfig); + } + + @Override + public PipelineDef parse(String pipelineDefText, Configuration globalPipelineConfig) + throws Exception { + return parse(mapper.readTree(pipelineDefText), globalPipelineConfig); + } + private PipelineDef parse(JsonNode pipelineDefJsonNode, Configuration globalPipelineConfig) + throws Exception { // Source is required SourceDef sourceDef = toSourceDef( checkNotNull( - root.get(SOURCE_KEY), + pipelineDefJsonNode.get(SOURCE_KEY), "Missing required field \"%s\" in pipeline definition", SOURCE_KEY)); @@ -89,13 +98,13 @@ public PipelineDef parse(Path pipelineDefPath, Configuration globalPipelineConfi SinkDef sinkDef = toSinkDef( checkNotNull( - root.get(SINK_KEY), + pipelineDefJsonNode.get(SINK_KEY), "Missing required field \"%s\" in pipeline definition", SINK_KEY)); // Transforms are optional List transformDefs = new ArrayList<>(); - Optional.ofNullable(root.get(TRANSFORM_KEY)) + Optional.ofNullable(pipelineDefJsonNode.get(TRANSFORM_KEY)) .ifPresent( node -> node.forEach( @@ -103,11 +112,11 @@ public PipelineDef parse(Path pipelineDefPath, Configuration globalPipelineConfi // Routes are optional List routeDefs = new ArrayList<>(); - Optional.ofNullable(root.get(ROUTE_KEY)) + Optional.ofNullable(pipelineDefJsonNode.get(ROUTE_KEY)) .ifPresent(node -> node.forEach(route -> routeDefs.add(toRouteDef(route)))); // Pipeline configs are optional - Configuration userPipelineConfig = toPipelineConfig(root.get(PIPELINE_KEY)); + Configuration userPipelineConfig = toPipelineConfig(pipelineDefJsonNode.get(PIPELINE_KEY)); // Merge user config into global config Configuration pipelineConfig = new Configuration(); diff --git a/flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParserTest.java b/flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParserTest.java index 2d05bcbde3..fc4f01880f 100644 --- a/flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParserTest.java +++ b/flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParserTest.java @@ -166,6 +166,57 @@ void testInvalidTimeZone() throws Exception { + "Or use 'UTC' without time zone and daylight saving time."); } + @Test + void testParsingFullDefinitionFromString() throws Exception { + String pipelineDefText = + "source:\n" + + " type: mysql\n" + + " name: source-database\n" + + " host: localhost\n" + + " port: 3306\n" + + " username: admin\n" + + " password: pass\n" + + " tables: adb.*, bdb.user_table_[0-9]+, [app|web]_order_.*\n" + + " chunk-column: app_order_.*:id,web_order:product_id\n" + + " capture-new-tables: true\n" + + "\n" + + "sink:\n" + + " type: kafka\n" + + " name: sink-queue\n" + + " bootstrap-servers: localhost:9092\n" + + " auto-create-table: true\n" + + "\n" + + "route:\n" + + " - source-table: mydb.default.app_order_.*\n" + + " sink-table: odsdb.default.app_order\n" + + " description: sync all sharding tables to one\n" + + " - source-table: mydb.default.web_order\n" + + " sink-table: odsdb.default.ods_web_order\n" + + " description: sync table to with given prefix ods_\n" + + "\n" + + "transform:\n" + + " - source-table: mydb.app_order_.*\n" + + " projection: id, order_id, TO_UPPER(product_name)\n" + + " filter: id > 10 AND order_id > 100\n" + + " primary-keys: id\n" + + " partition-keys: product_name\n" + + " table-options: comment=app order\n" + + " description: project fields from source table\n" + + " - source-table: mydb.web_order_.*\n" + + " projection: CONCAT(id, order_id) as uniq_id, *\n" + + " filter: uniq_id > 10\n" + + " description: add new uniq_id for each row\n" + + "\n" + + "pipeline:\n" + + " name: source-database-sync-pipe\n" + + " parallelism: 4\n" + + " schema.change.behavior: evolve\n" + + " schema-operator.rpc-timeout: 1 h"; + YamlPipelineDefinitionParser parser = new YamlPipelineDefinitionParser(); + PipelineDef pipelineDef = parser.parse(pipelineDefText, new Configuration()); + assertThat(pipelineDef).isEqualTo(fullDef); + } + private final PipelineDef fullDef = new PipelineDef( new SourceDef(