Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

File shrinkage #1081

Open
wants to merge 2 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import net.openhft.chronicle.queue.ChronicleQueue;
import net.openhft.chronicle.queue.RollCycle;
import net.openhft.chronicle.queue.TailerDirection;
import net.openhft.chronicle.queue.impl.single.FileShrinkage;
import net.openhft.chronicle.queue.impl.single.SingleChronicleQueueStore;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
Expand Down Expand Up @@ -110,4 +111,8 @@ public interface RollingChronicleQueue extends ChronicleQueue {
* @return the checkpointInterval used by delta wire
*/
int deltaCheckpointInterval();

default FileShrinkage fileShrinkage() {
return FileShrinkage.SHRINK_ASYNCHRONOUSLY;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package net.openhft.chronicle.queue.impl.single;

public enum FileShrinkage {
NONE, SHRINK_SYNCHRONOUSLY, SHRINK_ASYNCHRONOUSLY;
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ public enum MetaDataField implements WireKey {
lastIndexReplicated,
sourceId,
dataFormat,
metadata;
metadata,
fileShrinkage;

@Nullable
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,10 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import static net.openhft.chronicle.queue.impl.single.FileShrinkage.*;

public enum QueueFileShrinkManager {
; // none
;
public static final String THREAD_NAME = "queue~file~shrink~daemon";
// don't use this with a Pretoucher enabled!
public static final boolean RUN_SYNCHRONOUSLY = Jvm.getBoolean("chronicle.queue.synchronousFileShrinking");
Expand All @@ -38,10 +40,23 @@ public enum QueueFileShrinkManager {
private static final ScheduledExecutorService EXECUTOR = Threads.acquireScheduledExecutorService(THREAD_NAME, true);
private static final long DELAY_S = 10;

public static void scheduleShrinking(@NotNull final File queueFile, final long writePos) {
public static FileShrinkage defaultFileShrink() {

if (DISABLE_QUEUE_FILE_SHRINKING)
return;
return NONE;

if (RUN_SYNCHRONOUSLY)
return SHRINK_SYNCHRONOUSLY;

return SHRINK_ASYNCHRONOUSLY;
}

public static void scheduleShrinking(@NotNull final File queueFile, final long writePos, @NotNull FileShrinkage fileShrink) {

if (fileShrink == NONE)
return;

if (fileShrink == SHRINK_SYNCHRONOUSLY)
task(queueFile, writePos);
else {
// The shrink is deferred a bit to allow any potentially lagging tailers/pre-touchers
Expand All @@ -63,6 +78,10 @@ private static void task(@NotNull final File queueFile, final long writePos) {
}
try (RandomAccessFile raf = new RandomAccessFile(queueFile, "rw")) {
raf.setLength(writePos);
System.out.println("setLength=" + writePos + ", raf=" + queueFile.getAbsolutePath());

System.out.println("reaad=" + new RandomAccessFile(queueFile, "r").length());


} catch (IOException ex) {
// on microsoft windows, keep retrying until the file is unmapped
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,10 @@ public class SingleChronicleQueue extends AbstractCloseable implements RollingCh
private final boolean useSparseFile;
private final long sparseCapacity;
final AppenderListener appenderListener;

@NotNull
private final FileShrinkage fileShrink;

protected int sourceId;
@NotNull
private Condition createAppenderCondition = NoOpCondition.INSTANCE;
Expand All @@ -138,6 +142,7 @@ public class SingleChronicleQueue extends AbstractCloseable implements RollingCh

protected SingleChronicleQueue(@NotNull final SingleChronicleQueueBuilder builder) {
try {
fileShrink = builder.fileShrinkage();
rollCycle = builder.rollCycle();
cycleCalculator = cycleCalculator(builder.rollTimeZone());
epoch = builder.epoch();
Expand Down Expand Up @@ -436,6 +441,11 @@ public int deltaCheckpointInterval() {
return deltaCheckpointInterval;
}

@Override
public FileShrinkage fileShrinkage() {
return fileShrink;
}

/**
* @return if we uses a ring buffer to buffer the appends, the Excerpts are written to the Chronicle Queue using a background thread
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@

import static java.util.Objects.requireNonNull;
import static net.openhft.chronicle.core.pool.ClassAliasPool.CLASS_ALIASES;
import static net.openhft.chronicle.queue.impl.single.QueueFileShrinkManager.defaultFileShrink;
import static net.openhft.chronicle.queue.impl.single.SingleChronicleQueue.QUEUE_METADATA_FILE;
import static net.openhft.chronicle.wire.WireType.DEFAULT_ZERO_BINARY;
import static net.openhft.chronicle.wire.WireType.DELTA_BINARY;
Expand Down Expand Up @@ -163,6 +164,25 @@ public static void addAliases() {
// static initialiser.
}


private FileShrinkage fileShrinkage = defaultFileShrink();

/**
* @return if set, shrinks the .cq4 file after roll
*/
public FileShrinkage fileShrinkage() {
return fileShrinkage;
}

/**
* @return sets shrinks the .cq4 file after roll, or use net.openhft.chronicle.queue.impl.single.FileShrink#NONE
* if not required, default is net.openhft.chronicle.queue.impl.single.FileShrink#SHRINK_ASYNCHRONOUSLY
*/
public SingleChronicleQueueBuilder fileShrinkage(FileShrinkage fileShrink) {
this.fileShrinkage = fileShrink;
return this;
}

/**
* @return an empty builder
*/
Expand Down Expand Up @@ -262,6 +282,7 @@ static SingleChronicleQueueStore createStore(@NotNull RollingChronicleQueue queu
queue.indexCount(),
queue.indexSpacing());

wireStore.fileShrinkage(queue.fileShrinkage());
wire.writeEventName(MetaDataKeys.header).typedMarshallable(wireStore);
return wireStore;
}
Expand Down Expand Up @@ -1065,7 +1086,7 @@ public Supplier<BiConsumer<BytesStore, Bytes<?>>> decodingSupplier() {
}

public SingleChronicleQueueBuilder codingSuppliers(@Nullable
Supplier<BiConsumer<BytesStore, Bytes<?>>> encodingSupplier,
Supplier<BiConsumer<BytesStore, Bytes<?>>> encodingSupplier,
@Nullable Supplier<BiConsumer<BytesStore, Bytes<?>>> decodingSupplier) {
if ((encodingSupplier == null) != (decodingSupplier == null))
throw new UnsupportedOperationException("Both encodingSupplier and decodingSupplier must be set or neither");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@
import java.util.Objects;
import java.util.concurrent.TimeUnit;

import static net.openhft.chronicle.queue.impl.single.QueueFileShrinkManager.*;

public class SingleChronicleQueueStore extends AbstractCloseable implements WireStore {
static {
ClassAliasPool.CLASS_ALIASES.addAlias(SCQIndexing.class);
Expand All @@ -57,6 +59,9 @@ public class SingleChronicleQueueStore extends AbstractCloseable implements Wire
@NotNull
private final transient Sequence sequence;

@NotNull
private FileShrinkage fileShrinkage = defaultFileShrink();

private int cycle;

/**
Expand All @@ -77,6 +82,14 @@ private SingleChronicleQueueStore(@NotNull WireIn wire) {
this.indexing.writePosition = writePosition;
this.sequence = new RollCycleEncodeSequence(writePosition, rollIndexCount(), rollIndexSpacing());
this.indexing.sequence = sequence;

try {
ValueIn read = wire.read(MetaDataField.fileShrinkage);
if (read.isPresent())
this.fileShrinkage = read.asEnum(FileShrinkage.class);
} catch (Exception ignore) {
}

if (wire.bytes().readRemaining() > 0) {
final int version = wire.read(MetaDataField.dataFormat).int32();
this.dataVersion = version > 1 ? 0 : version;
Expand Down Expand Up @@ -341,6 +354,7 @@ public void writeMarshallable(@NotNull WireOut wire) {
intForBinding(wireOut, writePosition)
.write(MetaDataField.indexing).typedMarshallable(this.indexing)
.write(MetaDataField.dataFormat).int32(dataVersion);
wire.write(MetaDataField.fileShrinkage).asEnum(fileShrinkage);
}

@Override
Expand Down Expand Up @@ -408,10 +422,19 @@ public boolean writeEOF(@NotNull Wire wire, long timeoutMS) {
}
}

public FileShrinkage fileShrinkage() {
return fileShrinkage;
}

public SingleChronicleQueueStore fileShrinkage(@NotNull FileShrinkage fileShrinkage) {
this.fileShrinkage = fileShrinkage;
return this;
}

boolean writeEOFAndShrink(@NotNull Wire wire, long timeoutMS) {
if (wire.writeEndOfWire(timeoutMS, TimeUnit.MILLISECONDS, writePosition())) {
// only if we just written EOF
QueueFileShrinkManager.scheduleShrinking(mappedFile.file(), wire.bytes().writePosition());
scheduleShrinking(mappedFile.file(), wire.bytes().writePosition(), fileShrinkage);
return true;
}
return false;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package net.openhft.chronicle.queue.impl.single;

import net.openhft.chronicle.core.time.SetTimeProvider;
import net.openhft.chronicle.queue.ChronicleQueueTestBase;
import net.openhft.chronicle.queue.ExcerptAppender;
import net.openhft.chronicle.queue.RollCycles;
import org.junit.Assert;
import org.junit.Test;

import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;

public class FileShrinkageTest extends ChronicleQueueTestBase {

@Test
public void testShrinkSynchronously() throws IOException, InterruptedException {

final File dataDir = getTmpDir();
final SetTimeProvider timeProvider = new SetTimeProvider();

File file;
try (final SingleChronicleQueue queue = SingleChronicleQueueBuilder.binary(dataDir)
.rollCycle(RollCycles.TEST_SECONDLY)
.fileShrinkage(FileShrinkage.SHRINK_SYNCHRONOUSLY)
.timeProvider(timeProvider).build()) {
final ExcerptAppender excerptAppender = queue.acquireAppender();
excerptAppender.writeText("hello");
file = excerptAppender.currentFile();
}

timeProvider.advanceMillis(2_000);
Thread.sleep(2_000);

try (final SingleChronicleQueue queue = SingleChronicleQueueBuilder.binary(dataDir)
.rollCycle(RollCycles.TEST_SECONDLY)
.timeProvider(timeProvider)
.fileShrinkage(FileShrinkage.SHRINK_SYNCHRONOUSLY)
.build()) {


// we should not have to do this, but even if we do it still does not work.
// queue.acquireAppender();

try (final RandomAccessFile raf = new RandomAccessFile(file, "r")) {
final long len = raf.length();
System.out.println("len=" + len + ", file=" + file.getAbsolutePath());
Assert.assertTrue("len=" + len, len > 520000 && len < 530000);
}
}
}


}