Skip to content

Commit

Permalink
move rest API to vertx worker pool and docker environment for e2e tes…
Browse files Browse the repository at this point in the history
…t execution. (#10)

* Moving REST APIs to Worker Verticle off Standard Verticle (event loop thread) as they are likely to be blocking.
* docker compose the test and  environment and adding e2e tests for create topic.
* changing the health-check response string and added health check in docker compose for service.
  • Loading branch information
kmrdhruv committed Jun 19, 2023
1 parent 45fe579 commit 03b887e
Show file tree
Hide file tree
Showing 17 changed files with 312 additions and 33 deletions.
5 changes: 0 additions & 5 deletions buildSrc/settings.gradle

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import org.gradle.api.tasks.testing.logging.TestExceptionFormat

plugins {
id 'idea'
id 'java'
Expand All @@ -18,6 +20,28 @@ ext {

otl_version = "1.25.0"
micrometer_version = "1.10.6"
ws_rs_version = "2.1.1"
jersey_version = "2.25.1"
}

sourceSets {
testE2E {
java.srcDir "$projectDir/src/testE2E/java"
resources.srcDir "$projectDir/src/testE2E/resources"
compileClasspath += main.output + test.output
runtimeClasspath += main.output + test.output
}
}

tasks.register('testE2E', Test) {
testClassesDirs = sourceSets.testE2E.output.classesDirs
classpath = sourceSets.testE2E.runtimeClasspath
outputs.upToDateWhen { false } //this will force testE2E to execute always on target invocation.
}

configurations {
testE2EImplementation.extendsFrom testImplementation
testE2ERuntime.extendsFrom testRuntime
}

dependencies {
Expand Down Expand Up @@ -56,6 +80,10 @@ dependencies {

testImplementation('org.junit.jupiter:junit-jupiter:5.9.1')
testImplementation("io.vertx:vertx-junit5:$vertx_version")

testE2EImplementation("javax.ws.rs:javax.ws.rs-api:$ws_rs_version")
testE2EImplementation("org.glassfish.jersey.core:jersey-client:$jersey_version")
testE2EImplementation("org.glassfish.jersey.media:jersey-media-json-jackson:$jersey_version")
}

// code generation
Expand Down Expand Up @@ -95,6 +123,11 @@ tasks.withType(Test).configureEach {
useJUnitPlatform()
testLogging {
events "PASSED", "SKIPPED", "FAILED", "STANDARD_OUT", "STANDARD_ERROR"
exceptionFormat TestExceptionFormat.FULL
showCauses true
showExceptions true
showStackTraces true
showStandardStreams false
}
afterSuite { desc, result ->
if (!desc.parent) { // will match the outermost suite
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ public class JsonMapper {
mapper.setSerializationInclusion(JsonInclude.Include.NON_EMPTY);
}

public static ObjectMapper getMapper() {
return mapper;
}

public static <T> String jsonSerialize(T entity) {
try {
return mapper.writeValueAsString(entity);
Expand Down
6 changes: 6 additions & 0 deletions server/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ plugins {
id 'com.flipkart.varadhi.java-application-conventions'
}


dependencies {

implementation(project(':common'))
Expand Down Expand Up @@ -44,8 +45,13 @@ dependencies {
implementation('org.apache.curator:curator-framework:5.5.0')

testImplementation("io.vertx:vertx-junit5")

testE2EImplementation("javax.ws.rs:javax.ws.rs-api")
testE2EImplementation("org.glassfish.jersey.core:jersey-client")
testE2EImplementation("org.glassfish.jersey.media:jersey-media-json-jackson")
}

application {
mainClass = 'com.flipkart.varadhi.Server'
}

3 changes: 1 addition & 2 deletions server/src/main/java/com/flipkart/varadhi/RestVerticle.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,12 @@ public RestVerticle(ServerConfiguration configuration, CoreServices coreServices
@Override
public void start(Promise<Void> startPromise) {
log.info("HttpServer Starting.");

Router router = Router.router(vertx);

FailureHandler failureHandler = new FailureHandler();
for (RouteDefinition def : coreServices.getRouteDefinitions()) {
Route route = router.route().method(def.method()).path(def.path());
def.behaviours().stream().forEach(d -> d.Configure(route, def, coreServices));
def.behaviours().forEach(d -> d.Configure(route, def, coreServices));
route.handler(def.handler());
route.failureHandler(failureHandler);
}
Expand Down
28 changes: 22 additions & 6 deletions server/src/main/java/com/flipkart/varadhi/Server.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.flipkart.varadhi;

import com.flipkart.varadhi.exceptions.InvalidConfigException;
import com.flipkart.varadhi.exceptions.VaradhiException;
import io.micrometer.core.instrument.Clock;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.jmx.JmxConfig;
Expand All @@ -19,6 +20,7 @@
import io.vertx.config.ConfigRetriever;
import io.vertx.config.ConfigRetrieverOptions;
import io.vertx.config.ConfigStoreOptions;
import io.vertx.core.DeploymentOptions;
import io.vertx.core.Vertx;
import io.vertx.core.VertxOptions;
import io.vertx.core.json.JsonObject;
Expand Down Expand Up @@ -49,12 +51,8 @@ public static void main(String[] args) {

CoreServices services = new CoreServices(observabilityStack, vertx, configuration);

vertx.deployVerticle(
() -> new RestVerticle(configuration, services), configuration.getDeploymentOptions()
).onFailure(t -> {
log.error("Could not start HttpServer verticle", t);
System.exit(-1);
});
deployRestAPI(vertx, services, configuration);

} catch (Exception e) {
log.error("Failed to initialise the server.", e);
System.out.println("Failed to initialise the server:" + e);
Expand All @@ -63,6 +61,24 @@ public static void main(String[] args) {
// TODO: check need for shutdown hook
}

private static void deployRestAPI(Vertx vertx, CoreServices services, ServerConfiguration configuration) {
DeploymentOptions deploymentOptions = configuration.getRestVerticleDeploymentOptions();
if (!deploymentOptions.isWorker()) {
// Rest API should avoid complete execution on Vertx event loop thread because they are likely to be
// blocking. Rest API need to be either offloaded from event loop via Async or need to be executed on
// Worker Verticle or should use executeBlocking() facility.
// Current code assumes Rest API will be executing on Worker Verticle and hence validate.
log.error("Rest Verticle is expected to be deployed as Worker Verticle.");
throw new InvalidConfigException("Rest API is expected to be deployed via Worker Verticle.");
}

vertx.deployVerticle(() -> new RestVerticle(configuration, services), deploymentOptions)
.onFailure(t -> {
log.error("Could not start HttpServer verticle", t);
throw new VaradhiException("Failed to Deploy Rest API.", t);})
.onSuccess(name -> log.debug("Successfully deployed the Verticle id({}).", name));
}

public static CoreServices.ObservabilityStack setupObservabilityStack(ServerConfiguration configuration) {
Resource resource = Resource.getDefault()
.merge(Resource.create(Attributes.of(ResourceAttributes.SERVICE_NAME, "com.flipkart.varadhi")));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ public class ServerConfiguration {

private VertxOptions vertxOptions;

private DeploymentOptions deploymentOptions;
private DeploymentOptions restVerticleDeploymentOptions;

private boolean authenticationEnabled;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package com.flipkart.varadhi.utils;

import io.netty.handler.codec.http.HttpHeaderValues;
import io.vertx.core.http.HttpHeaders;
import io.vertx.core.http.HttpServerRequest;
import io.vertx.ext.web.RoutingContext;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -9,6 +11,8 @@
public class ResponseExtension {
public static <T> void endRequestWithResponse(RoutingContext ctx, T response) {
String responseBody = JsonMapper.jsonSerialize(response);
ctx.response().putHeader(HttpHeaders.CONTENT_TYPE, HttpHeaderValues.APPLICATION_JSON);
ctx.response().putHeader(HttpHeaders.CONTENT_ENCODING, "utf-8");
ctx.response().end(responseBody, (r) -> {
HttpServerRequest request = ctx.request();
if (r.succeeded()) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package com.flipkart.varadhi.web;

import io.netty.handler.codec.http.HttpHeaderValues;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.vertx.core.Handler;
import io.vertx.core.http.HttpHeaders;
import io.vertx.core.http.HttpServerResponse;
import io.vertx.core.json.Json;
import io.vertx.ext.web.RoutingContext;
Expand All @@ -22,20 +24,20 @@ public void handle(RoutingContext ctx) {
String errorMsg =
overWriteErrorMsg(response) ? getErrorFromFailure(ctx.failure()) : response.getStatusMessage();

log.error(
"{}: {}: Failed. Status:{}, Error:{}", ctx.request().method(), ctx.request().path(), statusCode,
log.error("{}: {}: Failed. Status:{}, Error:{}", ctx.request().method(), ctx.request().path(), statusCode,
errorMsg
);

response.putHeader(HttpHeaders.CONTENT_TYPE, HttpHeaderValues.APPLICATION_JSON);
response.putHeader(HttpHeaders.CONTENT_ENCODING, "utf-8");
response.setStatusCode(statusCode);
response.end(Json.encodeToBuffer(new ErrorResponse(errorMsg)));
}
}

private boolean overWriteErrorMsg(HttpServerResponse response) {
return null == response.getStatusMessage() ||
response.getStatusMessage().isBlank() ||
response.getStatusMessage().equalsIgnoreCase(HttpResponseStatus.OK.reasonPhrase());
return null == response.getStatusMessage()
|| response.getStatusMessage().isBlank()
|| response.getStatusMessage().equalsIgnoreCase(HttpResponseStatus.OK.reasonPhrase());
}

private String getErrorFromFailure(Throwable t) {
Expand All @@ -52,7 +54,7 @@ private String getErrorFromFailure(Throwable t) {
sb.append(t.getCause().getMessage());
}
} else {
sb.append("Internal error");
sb.append("Internal error.");
}
return sb.toString();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
package com.flipkart.varadhi.web.v1;

import com.flipkart.varadhi.utils.RequestBodyExtension;
import com.flipkart.varadhi.utils.ResponseExtension;
import com.flipkart.varadhi.web.RouteDefinition;
import io.vertx.core.Handler;
import io.vertx.core.http.HttpMethod;
import io.vertx.core.http.HttpServerResponse;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.web.RoutingContext;
import lombok.experimental.ExtensionMethod;

import java.util.List;
import java.util.Optional;
Expand All @@ -14,23 +17,23 @@
import static java.net.HttpURLConnection.HTTP_OK;
import static java.net.HttpURLConnection.HTTP_UNAVAILABLE;

@ExtensionMethod({ResponseExtension.class})
public class HealthCheckHandler implements Handler<RoutingContext>, RouteDefinition.Provider {

// TODO: add appropriate checks

private volatile int responseCode = HTTP_OK;
private volatile String responseMsg = new JsonObject().encode();
private volatile String responseMsg = "iam_ok";

@Override
public void handle(RoutingContext ctx) {
HttpServerResponse response =
ctx.response().setStatusCode(responseCode).setStatusMessage(responseMsg);
response.end();
ctx.endRequestWithResponse(responseMsg);

}

public void bringOOR() {
responseCode = HTTP_UNAVAILABLE;
responseMsg = JsonObject.of("reason", "oor").encode();
responseMsg = "not_ok: under orr";
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,11 +81,10 @@ public void create(RoutingContext ctx) {
throw new DuplicateResourceException(String.format("Specified Topic(%s) already exists.", topicKey));
}
resourcePersistence.create(topicResource);

//TODO::This should move to async/future pattern. It is getting executed on event loop..
VaradhiTopic vt = varadhiTopicFactory.get(topicResource);

varadhiTopicService.create(vt);

//TODO::Return updated object.
ctx.endRequestWithResponse(topicResource);
}

Expand Down
6 changes: 4 additions & 2 deletions server/src/main/resources/configuration.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@ vertxOptions:
workerPoolSize: 1
internalBlockingPoolSize: 1

deploymentOptions:
instances: 1
restVerticleDeploymentOptions:
instances: 2
worker: true
workerPoolSize: 2

authenticationEnabled: false

Expand Down
2 changes: 1 addition & 1 deletion server/src/main/resources/pulsarConfig.yml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
pulsarClientOptions:
pulsarUrl: "http://localhost:8081"
pulsarUrl: "http://127.0.0.1:8081"
connectTimeout: 2000
readTimeout: 2000
requestTimeout: 2000
Loading

0 comments on commit 03b887e

Please sign in to comment.