diff --git a/src/main/java/net/openhft/chronicle/queue/impl/single/StoreTailer.java b/src/main/java/net/openhft/chronicle/queue/impl/single/StoreTailer.java index c0d1f3fb42..68805bbf4a 100644 --- a/src/main/java/net/openhft/chronicle/queue/impl/single/StoreTailer.java +++ b/src/main/java/net/openhft/chronicle/queue/impl/single/StoreTailer.java @@ -919,7 +919,7 @@ private ExcerptTailer callOriginalToEnd() { queue.refreshDirectoryListing(); // due to a race condition, where the queue rolls as we are processing toEnd() // we may get a NotReachedException ( see https://github.com/OpenHFT/Chronicle-Queue/issues/702 ) - // hence are are just going to retry. + // hence we are just going to retry. try { return originalToEnd(); } catch (Exception ex) { diff --git a/src/main/java/net/openhft/chronicle/queue/reader/Reader.java b/src/main/java/net/openhft/chronicle/queue/reader/Reader.java index fa5380fb8b..9aeff62ec7 100644 --- a/src/main/java/net/openhft/chronicle/queue/reader/Reader.java +++ b/src/main/java/net/openhft/chronicle/queue/reader/Reader.java @@ -24,55 +24,175 @@ import java.nio.file.Path; import java.util.function.Consumer; +/** + * The Reader interface provides methods for reading messages from a Chronicle Queue. + * It allows for customization of the reading process through various configuration methods, and + * creates a Reader with the {@link #create()} method + */ public interface Reader { + /** + * Executes the Reader. + */ void execute(); + /** + * Stops the Reader. + */ void stop(); + /** + * Sets the message sink for this Reader. If not set, messages are output to stdout. + * + * @param messageSink A Consumer function that will handle the messages read by this Reader. + * @return this + */ Reader withMessageSink(@NotNull Consumer messageSink); + /** + * Sets the base path for this Reader. + * + * @param path The base path. + * @return this + */ Reader withBasePath(@NotNull Path path); + /** + * Adds an inclusion regex for this Reader. These are anded together. + * + * @param regex The inclusion regex. + * @return The Reader instance with the inclusion regex set. + */ Reader withInclusionRegex(@NotNull String regex); + /** + * Adds exclusion regex for this Reader. These are anded together. + * + * @param regex The exclusion regex. + * @return this + */ Reader withExclusionRegex(@NotNull String regex); + /** + * Sets the custom plugin for this Reader. Allows more flexibility than {@link #withMessageSink(Consumer)} + * + * @param customPlugin The custom plugin. + * @return this + */ Reader withCustomPlugin(@NotNull ChronicleReaderPlugin customPlugin); + /** + * Sets the start index for this Reader. + * + * @param index The start index. + * @return this + */ Reader withStartIndex(final long index); + /** + * Sets the content-based limiter for this Reader. + * + * @param contentBasedLimiter The content-based limiter. + * @return this + */ ChronicleReader withContentBasedLimiter(ContentBasedLimiter contentBasedLimiter); + /** + * Sets the argument for this Reader. Used in conjunction with {@link #withBinarySearch(String)} + * + * @param arg The argument. + * @return this + */ Reader withArg(@NotNull String arg); + /** + * Sets the limiter argument for this Reader. Used with {@link #withContentBasedLimiter(ContentBasedLimiter)} + * + * @param limiterArg The limiter argument. + * @return this + */ Reader withLimiterArg(@NotNull String limiterArg); + /** + * Sets the Reader to tail mode. + * + * @return this + */ Reader tail(); + /** + * Sets the maximum number of history records for this Reader. + * + * @param maxHistoryRecords The maximum number of history records. + * @return this + */ Reader historyRecords(final long maxHistoryRecords); /** - * specify method reader interface to use - * @param methodReaderInterface interface class name. If empty, a dummy reader is created + * Sets the method reader interface for this Reader. + * + * @param methodReaderInterface The method reader interface class name. If empty, a dummy reader is created. * @return this */ Reader asMethodReader(@NotNull String methodReaderInterface); + /** + * Sets the wire type for this Reader. + * + * @param wireType The wire type. + * @return this + */ Reader withWireType(@NotNull WireType wireType); + /** + * Suppresses the display index for this Reader. + * + * @return this + */ Reader suppressDisplayIndex(); + /** + * Sets the binary search for this Reader. + * + * @param binarySearch The binary search. + * @return this + */ Reader withBinarySearch(@NotNull String binarySearch); + /** + * Sets whether to show message history for this Reader. + * + * @param showMessageHistory Whether to show message history. + * @return this + */ Reader showMessageHistory(boolean showMessageHistory); + /** + * Retrieves the argument for this Reader. + * + * @return The argument. + */ String arg(); + /** + * Retrieves the limiter argument for this Reader. + * + * @return The limiter argument. + */ String limiterArg(); + /** + * Retrieves the method reader interface for this Reader. + * + * @return The method reader interface. + */ Class methodReaderInterface(); + /** + * Creates a new Reader instance. + * + * @return A new Reader instance. + */ static Reader create() { return new ChronicleReader(); } diff --git a/src/test/java/net/openhft/chronicle/queue/impl/single/StoreTailerTest.java b/src/test/java/net/openhft/chronicle/queue/impl/single/StoreTailerTest.java index a7734ba062..9aee06f95c 100644 --- a/src/test/java/net/openhft/chronicle/queue/impl/single/StoreTailerTest.java +++ b/src/test/java/net/openhft/chronicle/queue/impl/single/StoreTailerTest.java @@ -586,4 +586,77 @@ public void run() throws InterruptedException { } } } + + @Test(expected = IllegalStateException.class) + public void cantMoveToStartDuringDocumentReading() { + File dir = getTmpDir(); + try (SingleChronicleQueue queue = ChronicleQueue.singleBuilder(dir) + .testBlockSize().build(); + ExcerptTailer tailer = queue.createTailer(); + ExcerptAppender appender = queue.createAppender()) { + appender.writeText("Hello World"); + try (DocumentContext dc = tailer.readingDocument(true)) { + assertTrue(dc.isPresent()); + assertTrue(dc.isMetaData()); + assertEquals("header", dc.wire().readEvent(String.class)); + assertTrue(tailer.toString().contains("StoreTailer{")); + tailer.toStart();//forbidden + } + } + } + + @Test(expected = IllegalStateException.class) + public void cantMoveToEndDuringDocumentReading() { + File dir = getTmpDir(); + try (SingleChronicleQueue queue = ChronicleQueue.singleBuilder(dir) + .testBlockSize().build(); + ExcerptTailer tailer = queue.createTailer(); + ExcerptAppender appender = queue.createAppender()) { + appender.writeText("Hello World"); + try (DocumentContext dc = tailer.readingDocument(true)) { + assertTrue(dc.isPresent()); + assertTrue(dc.isMetaData()); + assertEquals("header", dc.wire().readEvent(String.class)); + assertTrue(tailer.toString().contains("StoreTailer{")); + tailer.toEnd();//forbidden + } + } + } + + @Test + public void testStriding() { + File dir = getTmpDir(); + try (SingleChronicleQueue queue = ChronicleQueue.singleBuilder(dir) + .testBlockSize().build(); + ExcerptTailer tailer = queue.createTailer()) { + tailer.striding(true); + assertTrue(tailer.striding()); + } + } + + @Test + public void testMoveToIndex() { + File dir = getTmpDir(); + try (SingleChronicleQueue queue = ChronicleQueue.singleBuilder(dir) + .testBlockSize().build(); + ExcerptTailer tailer = queue.createTailer(); + ExcerptAppender appender = queue.createAppender()) { + appender.writeText("Hello World"); + try (DocumentContext dc = tailer.readingDocument()) { + assertTrue(dc.isPresent()); + tailer.moveToIndex(tailer.index() + 1); + } + } + } + + @Test + public void testExcerptsInCycle() { + File dir = getTmpDir(); + try (SingleChronicleQueue queue = ChronicleQueue.singleBuilder(dir) + .testBlockSize().build(); + ExcerptTailer tailer = queue.createTailer()) { + tailer.excerptsInCycle(tailer.cycle()); + } + } + } diff --git a/src/test/java/net/openhft/chronicle/queue/internal/reader/ChronicleMethodReaderTest.java b/src/test/java/net/openhft/chronicle/queue/internal/reader/ChronicleMethodReaderTest.java index caef3d5edd..52caf63f25 100644 --- a/src/test/java/net/openhft/chronicle/queue/internal/reader/ChronicleMethodReaderTest.java +++ b/src/test/java/net/openhft/chronicle/queue/internal/reader/ChronicleMethodReaderTest.java @@ -187,10 +187,12 @@ public void shouldFilterByInclusionRegex() { .forEach(msg -> assertThat(msg, containsString("goodbye"))); } - @Ignore("https://github.com/OpenHFT/Chronicle-Queue/issues/1150") @Test public void shouldFilterByMultipleInclusionRegex() { - basicReader().withInclusionRegex(".*bye$").withInclusionRegex(".*o.*").execute(); + basicReader() + .withInclusionRegex(".*bye.*") + .withInclusionRegex(".*o.*") + .execute(); assertEquals(24, capturedOutput.size()); capturedOutput.stream().filter(msg -> !msg.startsWith("0x")).