Skip to content

Commit

Permalink
feat: Ceph & other S3-compatible database-based storages are working
Browse files Browse the repository at this point in the history
  • Loading branch information
brian-mulier-p committed Jul 1, 2024
1 parent 51cd8d9 commit 7f99068
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 24 deletions.
6 changes: 5 additions & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,11 @@ dependencies {
annotationProcessor group: "io.kestra", name: "processor", version: kestraVersion

// libs
api 'io.minio:minio:8.5.10'
api ('io.minio:minio:8.5.10') {
exclude (group: 'com.fasterxml.jackson.core', module: 'jackson-core')
exclude (group: 'com.fasterxml.jackson.core', module: 'jackson-databind')
exclude (group: 'com.fasterxml.jackson.core', module: 'jackson-annotations')
}
}


Expand Down
35 changes: 18 additions & 17 deletions src/main/java/io/kestra/storage/minio/MinioStorage.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
Expand Down Expand Up @@ -66,9 +67,10 @@ public void init() {
@Override
public InputStream get(String tenantId, URI uri) throws IOException {
try {
String path = getPath(tenantId, uri);
return this.minioClient.getObject(GetObjectArgs.builder()
.bucket(this.bucket)
.object(getPath(tenantId, uri))
.object(path)
.build()
);
} catch (MinioException e) {
Expand Down Expand Up @@ -124,8 +126,7 @@ private Stream<String> keysForPrefix(String prefix, boolean recursive, boolean i
&& !name.equals("/")
&& (recursive || Path.of(name).getParent() == null)
&& (includeDirectories || !name.endsWith("/"));
})
.map(name -> name.startsWith("/") ? name : "/" + name);
});
} catch (MinioException e) {
throw reThrowMinioStorageException(prefix, e);
} catch (FileNotFoundException | IllegalArgumentException e) {
Expand Down Expand Up @@ -207,7 +208,7 @@ public URI put(String tenantId, URI uri, InputStream data) throws IOException {
private void mkdirs(String path) throws IOException {
path = path.replaceAll("^/*", "");
String[] directories = path.split("/");
StringBuilder aggregatedPath = new StringBuilder("/");
StringBuilder aggregatedPath = new StringBuilder();
// perform 1 put request per parent directory in the path
for (int i = 0; i <= directories.length - (path.endsWith("/") ? 1 : 2); i++) {
aggregatedPath.append(directories[i]).append("/");
Expand Down Expand Up @@ -383,33 +384,33 @@ public List<URI> deleteByPrefix(String tenantId, URI storagePrefix) throws IOExc
}

private String toPrefix(String path, boolean isDirectory) {
String withoutLeadingSlash = path.substring(1);
if(isDirectory && !withoutLeadingSlash.isBlank()) {
return withoutLeadingSlash.endsWith("/") ? withoutLeadingSlash : withoutLeadingSlash + "/";
boolean isRoot = path.isEmpty();
if(isDirectory && !isRoot) {
return path.endsWith("/") ? path : path + "/";
}

return withoutLeadingSlash;
return path;
}

@NotNull
private String getPath(String tenantId, URI uri) {
if (uri == null) {
uri = URI.create("/");
}

parentTraversalGuard(uri);
String path = uri.getPath();
if (!path.startsWith("/")) {
path = "/" + path;
String path = Optional.ofNullable(uri).map(URI::getPath).orElse("");
if (path.startsWith("/")) {
path = path.substring(1);
}

if (tenantId == null) {
return path;
}
return "/" + tenantId + path;
return tenantId + "/"+ path;
}

private void parentTraversalGuard(URI uri) {
if (uri == null) {
return;
}

if (uri.toString().contains("..")) {
throw new IllegalArgumentException("File should be accessed with their full path and not using relative '..' path.");
}
Expand All @@ -423,7 +424,7 @@ private IOException reThrowMinioStorageException(String uri, MinioException e) {
}

@VisibleForTesting
MinioClient miniClient() {
MinioClient minioClient() {
return minioClient;
}
}
8 changes: 4 additions & 4 deletions src/test/java/io/kestra/storage/minio/MinioStorageTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,20 +22,20 @@ class MinioStorageTest extends StorageTestSuite {
@BeforeEach
void init() throws Exception {
String bucket = ((MinioStorage) storage).getBucket();
if (!((MinioStorage) storage).miniClient().bucketExists(BucketExistsArgs.builder().bucket(bucket).build())) {
((MinioStorage) storage).miniClient().makeBucket(MakeBucketArgs.builder().bucket(bucket).build());
if (!((MinioStorage) storage).minioClient().bucketExists(BucketExistsArgs.builder().bucket(bucket).build())) {
((MinioStorage) storage).minioClient().makeBucket(MakeBucketArgs.builder().bucket(bucket).build());
}
}

@Test
void checkVhostOff() throws Exception {
ByteArrayOutputStream traceStream = new ByteArrayOutputStream();
((MinioStorage) storage).miniClient().traceOn(traceStream);
((MinioStorage) storage).minioClient().traceOn(traceStream);
try {
storage.list(null, URI.create("/"));
assertThat(traceStream.toString(), containsString("Host: " + ((MinioStorage) storage).getEndpoint()));
} finally {
((MinioStorage) storage).miniClient().traceOff();
((MinioStorage) storage).minioClient().traceOff();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ class MinioStorageVhostTest {
StorageInterface storage;
@BeforeEach
void init() throws Exception {
MinioClient minioClient = ((MinioStorage) storage).miniClient();
MinioClient minioClient = ((MinioStorage) storage).minioClient();
if (!minioClient.bucketExists(BucketExistsArgs.builder().bucket(((MinioStorage) storage).getBucket()).build())) {
minioClient.makeBucket(MakeBucketArgs.builder().bucket(((MinioStorage) storage).getBucket()).build());
}
Expand All @@ -33,7 +33,7 @@ void init() throws Exception {
@Test
void checkVhostOn() throws Exception {
ByteArrayOutputStream traceStream = new ByteArrayOutputStream();
MinioClient minioClient = ((MinioStorage) storage).miniClient();
MinioClient minioClient = ((MinioStorage) storage).minioClient();
minioClient.traceOn(traceStream);
try {
storage.list(null, URI.create("/"));
Expand Down

0 comments on commit 7f99068

Please sign in to comment.