From 13ee3d85a11ef6fc1ba1d031b29a94a84bcc8476 Mon Sep 17 00:00:00 2001 From: Xiangying Meng <55571188+liangyepianzhou@users.noreply.github.com> Date: Tue, 6 Feb 2024 15:28:43 +0800 Subject: [PATCH 1/8] [branch-2.10][improve][broker] Do not retain the data in the system topic (#22022) ### Motivation For some use cases, users need to store all messages even after they have been acknowledged by all subscriptions, so they set the retention policy of the namespace to infinite retention (setting both time and size limits to `-1`). However, the data in system topics does not need infinite retention. ### Modifications For system topics, do not retain messages that have already been acknowledged. --- .../pulsar/broker/service/BrokerService.java | 16 +++++-- .../compaction/CompactionRetentionTest.java | 48 +++++++++++++++++++ .../common/events/EventsTopicNames.java | 7 +++ .../pulsar/common/naming/TopicName.java | 1 + 4 files changed, 68 insertions(+), 4 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index b07a090b1d28d..48b859bd11b87 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -1646,10 +1646,18 @@ private CompletableFuture getManagedLedgerConfig(@Nonnull T } if (retentionPolicies == null) { - retentionPolicies = policies.map(p -> p.retention_policies).orElseGet( - () -> new RetentionPolicies(serviceConfig.getDefaultRetentionTimeInMinutes(), - serviceConfig.getDefaultRetentionSizeInMB()) - ); + if (EventsTopicNames.checkTopicIsEventsNames(topicName) + || EventsTopicNames.isTransactionInternalName(topicName)) { + if (log.isDebugEnabled()) { + log.debug("{} Disable data retention policy for system topic.", topicName); + } + retentionPolicies = new RetentionPolicies(0, 0); + } else { + 
retentionPolicies = policies.map(p -> p.retention_policies).orElseGet( + () -> new RetentionPolicies(serviceConfig.getDefaultRetentionTimeInMinutes(), + serviceConfig.getDefaultRetentionSizeInMB()) + ); + } } ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionRetentionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionRetentionTest.java index 8696e76150676..bd645d4e979d6 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionRetentionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionRetentionTest.java @@ -37,6 +37,7 @@ import lombok.Cleanup; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.client.BookKeeper; +import org.apache.bookkeeper.mledger.ManagedLedgerConfig; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; @@ -44,9 +45,13 @@ import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.Reader; import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.common.events.EventsTopicNames; +import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.ClusterData; +import org.apache.pulsar.common.policies.data.RetentionPolicies; import org.apache.pulsar.common.policies.data.TenantInfoImpl; import org.awaitility.Awaitility; +import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; @@ -208,6 +213,49 @@ public void testCompactionRetentionOnTopicCreationWithTopicPolicies() throws Exc ); } + @Test + public void testRetentionPolicesForSystemTopic() throws Exception { + String namespace = "my-tenant/my-ns"; + String topicPrefix = "persistent://" + namespace + "/"; + admin.namespaces().setRetention(namespace, new 
RetentionPolicies(-1, -1)); + // Check event topics and transaction internal topics. + for (String eventTopic : EventsTopicNames.EVENTS_TOPIC_NAMES) { + checkSystemTopicRetentionPolicy(topicPrefix + eventTopic); + } + checkSystemTopicRetentionPolicy(topicPrefix + TopicName.TRANSACTION_COORDINATOR_ASSIGN); + checkSystemTopicRetentionPolicy(topicPrefix + TopicName.TRANSACTION_COORDINATOR_LOG); + checkSystemTopicRetentionPolicy(topicPrefix + TopicName.PENDING_ACK_STORE_SUFFIX); + + // Check common topics. + checkCommonTopicRetentionPolicy(topicPrefix + "my-topic" + System.nanoTime()); + // Specify retention policies for system topic. + pulsar.getConfiguration().setTopicLevelPoliciesEnabled(true); + pulsar.getConfiguration().setSystemTopicEnabled(true); + admin.topics().createNonPartitionedTopic(topicPrefix + EventsTopicNames.TRANSACTION_BUFFER_SNAPSHOT); + admin.topicPolicies().setRetention(topicPrefix + EventsTopicNames.TRANSACTION_BUFFER_SNAPSHOT, + new RetentionPolicies(10, 10)); + Awaitility.await().untilAsserted(() -> { + checkTopicRetentionPolicy(topicPrefix + EventsTopicNames.TRANSACTION_BUFFER_SNAPSHOT, + new RetentionPolicies(10, 10)); + }); + } + + private void checkSystemTopicRetentionPolicy(String topicName) throws Exception { + checkTopicRetentionPolicy(topicName, new RetentionPolicies(0, 0)); + + } + + private void checkCommonTopicRetentionPolicy(String topicName) throws Exception { + checkTopicRetentionPolicy(topicName, new RetentionPolicies(-1, -1)); + } + + private void checkTopicRetentionPolicy(String topicName, RetentionPolicies retentionPolicies) throws Exception { + ManagedLedgerConfig config = pulsar.getBrokerService() + .getManagedLedgerConfig(TopicName.get(topicName)).get(); + Assert.assertEquals(config.getRetentionSizeInMB(), retentionPolicies.getRetentionSizeInMB()); + Assert.assertEquals(config.getRetentionTimeMillis(),retentionPolicies.getRetentionTimeInMinutes() * 60000L); + } + private void testCompactionCursorRetention(String topic) 
throws Exception { Set keys = Sets.newHashSet("a", "b", "c"); Set keysToExpire = Sets.newHashSet("x1", "x2"); diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/events/EventsTopicNames.java b/pulsar-common/src/main/java/org/apache/pulsar/common/events/EventsTopicNames.java index 79e950842478d..f6f2c17fd8629 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/events/EventsTopicNames.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/events/EventsTopicNames.java @@ -53,6 +53,13 @@ public static boolean checkTopicIsTransactionCoordinatorAssign(TopicName topicNa .startsWith(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString()); } + public static boolean isTransactionInternalName(TopicName topicName) { + String topic = topicName.toString(); + return topic.startsWith(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString()) + || topic.startsWith(TopicName.TRANSACTION_COORDINATOR_LOG.toString()) + || topic.endsWith(TopicName.PENDING_ACK_STORE_SUFFIX); + } + public static boolean isTopicPoliciesSystemTopic(String topic) { if (topic == null) { return false; diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java index 97f99b19f19cc..04d96fd54a110 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java @@ -66,6 +66,7 @@ public TopicName load(String name) throws Exception { public static final TopicName TRANSACTION_COORDINATOR_LOG = TopicName.get(TopicDomain.persistent.value(), NamespaceName.SYSTEM_NAMESPACE, "__transaction_log_"); + public static final String PENDING_ACK_STORE_SUFFIX = "__transaction_pending_ack"; public static TopicName get(String domain, NamespaceName namespaceName, String topic) { String name = domain + "://" + namespaceName.toString() + '/' + topic; return TopicName.get(name); From 
7498242e42120ed7528a800a15c7fc8cc5a1b20d Mon Sep 17 00:00:00 2001 From: xymeng <1984997880@qq.com> Date: Tue, 6 Feb 2024 17:34:33 +0800 Subject: [PATCH 2/8] build: unused import --- .../org/apache/pulsar/broker/service/OneWayReplicatorTest.java | 1 - 1 file changed, 1 deletion(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java index f4101dfdaa3a9..0e9262efadf8c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java @@ -23,7 +23,6 @@ import static org.testng.Assert.assertTrue; import java.lang.reflect.Field; import java.util.Optional; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import org.apache.pulsar.broker.BrokerTestUtil; import org.apache.pulsar.broker.service.persistent.PersistentReplicator; From 1559deadee8c6802b7ae17dbcffb4bd6287561c5 Mon Sep 17 00:00:00 2001 From: xymeng <1984997880@qq.com> Date: Thu, 15 Feb 2024 11:18:45 +0800 Subject: [PATCH 3/8] test: fix npe issue introduced by #20933 ZooKeeper 3.9.1 added a check that prints a log message: 
```java if (addr.getPort() == 0) { LOG.info("bound to port {}", this.ss.getLocalAddress()); } ``` --- .../test/java/org/apache/bookkeeper/test/ZooKeeperUtil.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/test/ZooKeeperUtil.java b/managed-ledger/src/test/java/org/apache/bookkeeper/test/ZooKeeperUtil.java index 662d20333baca..602497e799037 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/test/ZooKeeperUtil.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/test/ZooKeeperUtil.java @@ -25,6 +25,7 @@ import java.io.File; import java.io.IOException; +import java.net.InetAddress; import java.net.InetSocketAddress; import java.util.concurrent.CountDownLatch; @@ -54,6 +55,9 @@ public class ZooKeeperUtil { private String connectString; public ZooKeeperUtil() { + String loopbackIPAddr = InetAddress.getLoopbackAddress().getHostAddress(); + zkaddr = new InetSocketAddress(loopbackIPAddr, 0); + connectString = loopbackIPAddr + ":" + zooKeeperPort; } public ZooKeeper getZooKeeperClient() { From 2e6a34f883ef2dcff0f5c0e3bd13963a41c3c0be Mon Sep 17 00:00:00 2001 From: xymeng <1984997880@qq.com> Date: Thu, 15 Feb 2024 12:35:21 +0800 Subject: [PATCH 4/8] LICENSE --- distribution/server/src/assemble/LICENSE.bin.txt | 3 +-- pulsar-sql/presto-distribution/LICENSE | 2 +- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/distribution/server/src/assemble/LICENSE.bin.txt b/distribution/server/src/assemble/LICENSE.bin.txt index 2fb5fc09ac2f9..6084942e8c039 100644 --- a/distribution/server/src/assemble/LICENSE.bin.txt +++ b/distribution/server/src/assemble/LICENSE.bin.txt @@ -367,7 +367,6 @@ The Apache Software License, Version 2.0 - io.netty-netty-transport-4.1.100.Final.jar - io.netty-netty-transport-classes-epoll-4.1.100.Final.jar - io.netty-netty-transport-native-epoll-4.1.100.Final-linux-x86_64.jar - - io.netty-netty-transport-native-epoll-4.1.100.Final.jar - 
io.netty-netty-transport-native-unix-common-4.1.100.Final.jar - io.netty-netty-transport-native-unix-common-4.1.100.Final-linux-x86_64.jar - io.netty-netty-tcnative-boringssl-static-2.0.61.Final.jar @@ -498,7 +497,7 @@ The Apache Software License, Version 2.0 - org.apache.curator-curator-framework-5.1.0.jar - org.apache.curator-curator-recipes-5.1.0.jar * Apache Yetus - - org.apache.yetus-audience-annotations-0.13.0.jar + - org.apache.yetus-audience-annotations-0.12.0.jar * Kubernetes Client - io.kubernetes-client-java-18.0.0.jar - io.kubernetes-client-java-api-18.0.0.jar diff --git a/pulsar-sql/presto-distribution/LICENSE b/pulsar-sql/presto-distribution/LICENSE index 0c8c111795942..e003a28c27c24 100644 --- a/pulsar-sql/presto-distribution/LICENSE +++ b/pulsar-sql/presto-distribution/LICENSE @@ -473,7 +473,7 @@ The Apache Software License, Version 2.0 - zookeeper-3.9.1.jar - zookeeper-jute-3.9.1.jar * Apache Yetus Audience Annotations - - audience-annotations-0.13.0.jar + - audience-annotations-0.12.0.jar * Swagger - swagger-annotations-1.6.10.jar * Perfmark From f94669be1c7414234b233cd016bac5be59bacaed Mon Sep 17 00:00:00 2001 From: xymeng <1984997880@qq.com> Date: Fri, 16 Feb 2024 17:17:28 +0800 Subject: [PATCH 5/8] flaky test --- .../transaction/pendingack/PendingAckPersistentTest.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java index 2ef8b8f68eee3..1eb04ca1ccf0c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java @@ -460,7 +460,9 @@ public void testDeleteUselessLogDataWhenSubCursorMoved() throws Exception { Message message2 = consumer.receive(5, 
TimeUnit.SECONDS); consumer.acknowledgeAsync(message2.getMessageId(), transaction2).get(); - Assert.assertEquals(pendingAckLogIndex.size(), 0); + Awaitility.await().untilAsserted(() -> { + Assert.assertEquals(pendingAckLogIndex.size(), 0); + }); maxIndexLag = (long) field4.get(pendingAckStore); Assert.assertEquals(maxIndexLag, 5); } From 1438b32f8b4e0ea5924a7b57f6c7d21ea15af321 Mon Sep 17 00:00:00 2001 From: Xiangying Meng <55571188+liangyepianzhou@users.noreply.github.com> Date: Fri, 16 Feb 2024 20:48:06 +0800 Subject: [PATCH 6/8] flaky test: testReplicatorWithFailedAck --- .../java/org/apache/pulsar/broker/service/ReplicatorTest.java | 1 + 1 file changed, 1 insertion(+) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java index 85c8eca8fa054..dbeb98aa59060 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java @@ -1682,6 +1682,7 @@ public void testReplicatorWithFailedAck() throws Exception { MessageIdImpl lastMessageId = (MessageIdImpl) topic.getLastMessageId().get(); Position lastPosition = PositionImpl.get(lastMessageId.getLedgerId(), lastMessageId.getEntryId()); + Awaitility.await().untilAsserted(() -> assertNotNull(topic.getReplicators().get("r2"))); ConcurrentOpenHashMap replicators = topic.getReplicators(); PersistentReplicator replicator = (PersistentReplicator) replicators.get("r2"); From a15982e08247bacd3383dd2e539b20d8fc38d419 Mon Sep 17 00:00:00 2001 From: Xiangying Meng <55571188+liangyepianzhou@users.noreply.github.com> Date: Mon, 19 Feb 2024 14:52:56 +0800 Subject: [PATCH 7/8] test: improve clean disk job --- .github/workflows/ci-integration-pulsar-io.yaml | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/.github/workflows/ci-integration-pulsar-io.yaml 
b/.github/workflows/ci-integration-pulsar-io.yaml index e08cc261e7928..e7a590057c218 100644 --- a/.github/workflows/ci-integration-pulsar-io.yaml +++ b/.github/workflows/ci-integration-pulsar-io.yaml @@ -77,10 +77,19 @@ jobs: - name: clean disk if: ${{ steps.check_changes.outputs.docs_only != 'true' }} run: | + echo "::group::Available diskspace" + time df -BM / /mnt + echo "::endgroup::" sudo swapoff -a - sudo rm -rf /swapfile /usr/share/dotnet /usr/local/lib/android /opt/ghc - sudo apt clean + sudo rm -rf /swapfile /usr/share/dotnet /usr/local/lib/android /opt/ghc /opt/hostedtoolcache/CodeQL + echo "::group::Cleaning apt state" + time sudo bash -c "apt-get clean; apt-get autoclean; apt-get -y --purge autoremove" + time df -BM / /mnt + echo "::endgroup::" docker rmi $(docker images -q) -f + echo "::group::Available diskspace" + time df -BM / /mnt + echo "::endgroup::" df -h - name: run install by skip tests From 869dfd6fffcc3dfe4ea4e2cee71170c81bfc05b1 Mon Sep 17 00:00:00 2001 From: Xiangying Meng <55571188+liangyepianzhou@users.noreply.github.com> Date: Mon, 19 Feb 2024 15:53:41 +0800 Subject: [PATCH 8/8] test: improve clean disk job --- .../ci-integration-backwards-compatibility.yaml | 14 ++++++++++++-- .../workflows/ci-integration-pulsar-io-ora.yaml | 13 +++++++++++-- 2 files changed, 23 insertions(+), 4 deletions(-) diff --git a/.github/workflows/ci-integration-backwards-compatibility.yaml b/.github/workflows/ci-integration-backwards-compatibility.yaml index 2a5eca9d9e1ad..525aed662509b 100644 --- a/.github/workflows/ci-integration-backwards-compatibility.yaml +++ b/.github/workflows/ci-integration-backwards-compatibility.yaml @@ -77,9 +77,19 @@ jobs: - name: clean disk if: ${{ steps.check_changes.outputs.docs_only != 'true' }} run: | - sudo rm -rf /usr/share/dotnet /usr/local/lib/android /opt/ghc - sudo apt clean + echo "::group::Available diskspace" + time df -BM / /mnt + echo "::endgroup::" + sudo swapoff -a + sudo rm -rf /swapfile /usr/share/dotnet 
/usr/local/lib/android /opt/ghc /opt/hostedtoolcache/CodeQL + echo "::group::Cleaning apt state" + time sudo bash -c "apt-get clean; apt-get autoclean; apt-get -y --purge autoremove" + time df -BM / /mnt + echo "::endgroup::" docker rmi $(docker images -q) -f + echo "::group::Available diskspace" + time df -BM / /mnt + echo "::endgroup::" df -h - name: run install by skip tests diff --git a/.github/workflows/ci-integration-pulsar-io-ora.yaml b/.github/workflows/ci-integration-pulsar-io-ora.yaml index 41fd95355c394..22d8ba320ee37 100644 --- a/.github/workflows/ci-integration-pulsar-io-ora.yaml +++ b/.github/workflows/ci-integration-pulsar-io-ora.yaml @@ -77,10 +77,19 @@ jobs: - name: clean disk if: ${{ steps.check_changes.outputs.docs_only != 'true' }} run: | + echo "::group::Available diskspace" + time df -BM / /mnt + echo "::endgroup::" sudo swapoff -a - sudo rm -rf /swapfile /usr/share/dotnet /usr/local/lib/android /opt/ghc - sudo apt clean + sudo rm -rf /swapfile /usr/share/dotnet /usr/local/lib/android /opt/ghc /opt/hostedtoolcache/CodeQL + echo "::group::Cleaning apt state" + time sudo bash -c "apt-get clean; apt-get autoclean; apt-get -y --purge autoremove" + time df -BM / /mnt + echo "::endgroup::" docker rmi $(docker images -q) -f + echo "::group::Available diskspace" + time df -BM / /mnt + echo "::endgroup::" df -h - name: run install by skip tests