Skip to content

Commit

Permalink
fix(aws): render variable inside SNS and SQS Publish tasks (#152)
Browse files Browse the repository at this point in the history
  • Loading branch information
loicmathieu committed May 20, 2023
1 parent ab552e5 commit c63c7ab
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 21 deletions.
10 changes: 5 additions & 5 deletions src/main/java/io/kestra/plugin/aws/sns/Publish.java
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public Publish.Output run(RunContext runContext) throws Exception {

try (BufferedReader inputStream = new BufferedReader(new InputStreamReader(runContext.uriToInputStream(from)))) {
flowable = Flowable.create(FileSerde.reader(inputStream, Message.class), BackpressureStrategy.BUFFER);
resultFlowable = this.buildFlowable(flowable, snsClient);
resultFlowable = this.buildFlowable(flowable, snsClient, runContext);

count = resultFlowable.reduce(Integer::sum).blockingGet();
}
Expand All @@ -79,12 +79,12 @@ public Publish.Output run(RunContext runContext) throws Exception {
.fromArray(((List<Message>) this.from).toArray())
.cast(Message.class);

resultFlowable = this.buildFlowable(flowable, snsClient);
resultFlowable = this.buildFlowable(flowable, snsClient, runContext);

count = resultFlowable.reduce(Integer::sum).blockingGet();
} else {
var msg = JacksonMapper.toMap(this.from, Message.class);
snsClient.publish(msg.to(PublishRequest.builder().topicArn(getTopicArn())));
snsClient.publish(msg.to(PublishRequest.builder().topicArn(getTopicArn()), runContext));

count = 1;
}
Expand All @@ -98,10 +98,10 @@ public Publish.Output run(RunContext runContext) throws Exception {
}
}

private Flowable<Integer> buildFlowable(Flowable<Message> flowable, SnsClient snsClient) {
private Flowable<Integer> buildFlowable(Flowable<Message> flowable, SnsClient snsClient, RunContext runContext) {
return flowable
.map(message -> {
snsClient.publish(message.to(PublishRequest.builder().topicArn(getTopicArn())));
snsClient.publish(message.to(PublishRequest.builder().topicArn(getTopicArn()), runContext));
return 1;
});
}
Expand Down
16 changes: 9 additions & 7 deletions src/main/java/io/kestra/plugin/aws/sns/model/Message.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package io.kestra.plugin.aws.sns.model;

import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.runners.RunContext;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Builder;
import lombok.Getter;
Expand Down Expand Up @@ -39,14 +41,14 @@ public class Message {
private String structure;


public PublishRequest to(PublishRequest.Builder builder) {
public PublishRequest to(PublishRequest.Builder builder, RunContext runContext) throws IllegalVariableEvaluationException {
return builder
.message(data)
.messageGroupId(groupId)
.messageDeduplicationId(deduplicationId)
.subject(subject)
.phoneNumber(phoneNumber)
.messageStructure(structure)
.message(runContext.render(data))
.messageGroupId(runContext.render(groupId))
.messageDeduplicationId(runContext.render(deduplicationId))
.subject(runContext.render(subject))
.phoneNumber(runContext.render(phoneNumber))
.messageStructure(runContext.render(structure))
.build();
}
}
10 changes: 5 additions & 5 deletions src/main/java/io/kestra/plugin/aws/sqs/Publish.java
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public Output run(RunContext runContext) throws Exception {

try (BufferedReader inputStream = new BufferedReader(new InputStreamReader(runContext.uriToInputStream(from)))) {
flowable = Flowable.create(FileSerde.reader(inputStream, Message.class), BackpressureStrategy.BUFFER);
resultFlowable = this.buildFlowable(flowable, sqsClient);
resultFlowable = this.buildFlowable(flowable, sqsClient, runContext);

count = resultFlowable.reduce(Integer::sum).blockingGet();
}
Expand All @@ -79,12 +79,12 @@ public Output run(RunContext runContext) throws Exception {
.fromArray(((List<Message>) this.from).toArray())
.cast(Message.class);

resultFlowable = this.buildFlowable(flowable, sqsClient);
resultFlowable = this.buildFlowable(flowable, sqsClient, runContext);

count = resultFlowable.reduce(Integer::sum).blockingGet();
} else {
var msg = JacksonMapper.toMap(this.from, Message.class);
sqsClient.sendMessage(msg.to(SendMessageRequest.builder().queueUrl(getQueueUrl())));
sqsClient.sendMessage(msg.to(SendMessageRequest.builder().queueUrl(getQueueUrl()), runContext));

count = 1;
}
Expand All @@ -98,10 +98,10 @@ public Output run(RunContext runContext) throws Exception {
}
}

private Flowable<Integer> buildFlowable(Flowable<Message> flowable, SqsClient sqsClient) {
private Flowable<Integer> buildFlowable(Flowable<Message> flowable, SqsClient sqsClient, RunContext runContext) {
return flowable
.map(message -> {
sqsClient.sendMessage(message.to(SendMessageRequest.builder().queueUrl(getQueueUrl())));
sqsClient.sendMessage(message.to(SendMessageRequest.builder().queueUrl(getQueueUrl()), runContext));
return 1;
});
}
Expand Down
10 changes: 6 additions & 4 deletions src/main/java/io/kestra/plugin/aws/sqs/model/Message.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package io.kestra.plugin.aws.sqs.model;

import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.runners.RunContext;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Builder;
import lombok.Getter;
Expand Down Expand Up @@ -30,11 +32,11 @@ public class Message {
@PluginProperty
private Integer delaySeconds;

public SendMessageRequest to(SendMessageRequest.Builder builder) {
public SendMessageRequest to(SendMessageRequest.Builder builder, RunContext runContext) throws IllegalVariableEvaluationException {
return builder
.messageBody(data)
.messageGroupId(groupId)
.messageDeduplicationId(deduplicationId)
.messageBody(runContext.render(data))
.messageGroupId(runContext.render(groupId))
.messageDeduplicationId(runContext.render(deduplicationId))
.delaySeconds(delaySeconds)
.build();
}
Expand Down

0 comments on commit c63c7ab

Please sign in to comment.