diff --git a/.github/workflows/ci-integration-backwards-compatibility.yaml b/.github/workflows/ci-integration-backwards-compatibility.yaml index 2a5eca9d9e1ad..525aed662509b 100644 --- a/.github/workflows/ci-integration-backwards-compatibility.yaml +++ b/.github/workflows/ci-integration-backwards-compatibility.yaml @@ -77,9 +77,19 @@ jobs: - name: clean disk if: ${{ steps.check_changes.outputs.docs_only != 'true' }} run: | - sudo rm -rf /usr/share/dotnet /usr/local/lib/android /opt/ghc - sudo apt clean + echo "::group::Available diskspace" + time df -BM / /mnt + echo "::endgroup::" + sudo swapoff -a + sudo rm -rf /swapfile /usr/share/dotnet /usr/local/lib/android /opt/ghc /opt/hostedtoolcache/CodeQL + echo "::group::Cleaning apt state" + time sudo bash -c "apt-get clean; apt-get autoclean; apt-get -y --purge autoremove" + time df -BM / /mnt + echo "::endgroup::" docker rmi $(docker images -q) -f + echo "::group::Available diskspace" + time df -BM / /mnt + echo "::endgroup::" df -h - name: run install by skip tests diff --git a/.github/workflows/ci-integration-pulsar-io-ora.yaml b/.github/workflows/ci-integration-pulsar-io-ora.yaml index 41fd95355c394..22d8ba320ee37 100644 --- a/.github/workflows/ci-integration-pulsar-io-ora.yaml +++ b/.github/workflows/ci-integration-pulsar-io-ora.yaml @@ -77,10 +77,19 @@ jobs: - name: clean disk if: ${{ steps.check_changes.outputs.docs_only != 'true' }} run: | + echo "::group::Available diskspace" + time df -BM / /mnt + echo "::endgroup::" sudo swapoff -a - sudo rm -rf /swapfile /usr/share/dotnet /usr/local/lib/android /opt/ghc - sudo apt clean + sudo rm -rf /swapfile /usr/share/dotnet /usr/local/lib/android /opt/ghc /opt/hostedtoolcache/CodeQL + echo "::group::Cleaning apt state" + time sudo bash -c "apt-get clean; apt-get autoclean; apt-get -y --purge autoremove" + time df -BM / /mnt + echo "::endgroup::" docker rmi $(docker images -q) -f + echo "::group::Available diskspace" + time df -BM / /mnt + echo "::endgroup::" df -h - name: run install by skip tests diff --git a/.github/workflows/ci-integration-pulsar-io.yaml b/.github/workflows/ci-integration-pulsar-io.yaml index e08cc261e7928..e7a590057c218 100644 --- a/.github/workflows/ci-integration-pulsar-io.yaml +++ b/.github/workflows/ci-integration-pulsar-io.yaml @@ -77,10 +77,19 @@ jobs: - name: clean disk if: ${{ steps.check_changes.outputs.docs_only != 'true' }} run: | + echo "::group::Available diskspace" + time df -BM / /mnt + echo "::endgroup::" sudo swapoff -a - sudo rm -rf /swapfile /usr/share/dotnet /usr/local/lib/android /opt/ghc - sudo apt clean + sudo rm -rf /swapfile /usr/share/dotnet /usr/local/lib/android /opt/ghc /opt/hostedtoolcache/CodeQL + echo "::group::Cleaning apt state" + time sudo bash -c "apt-get clean; apt-get autoclean; apt-get -y --purge autoremove" + time df -BM / /mnt + echo "::endgroup::" docker rmi $(docker images -q) -f + echo "::group::Available diskspace" + time df -BM / /mnt + echo "::endgroup::" df -h - name: run install by skip tests diff --git a/distribution/server/src/assemble/LICENSE.bin.txt b/distribution/server/src/assemble/LICENSE.bin.txt index 2fb5fc09ac2f9..6084942e8c039 100644 --- a/distribution/server/src/assemble/LICENSE.bin.txt +++ b/distribution/server/src/assemble/LICENSE.bin.txt @@ -367,7 +367,6 @@ The Apache Software License, Version 2.0 - io.netty-netty-transport-4.1.100.Final.jar - io.netty-netty-transport-classes-epoll-4.1.100.Final.jar - io.netty-netty-transport-native-epoll-4.1.100.Final-linux-x86_64.jar - - io.netty-netty-transport-native-epoll-4.1.100.Final.jar - io.netty-netty-transport-native-unix-common-4.1.100.Final.jar - io.netty-netty-transport-native-unix-common-4.1.100.Final-linux-x86_64.jar - io.netty-netty-tcnative-boringssl-static-2.0.61.Final.jar @@ -498,7 +497,7 @@ The Apache Software License, Version 2.0 - org.apache.curator-curator-framework-5.1.0.jar - org.apache.curator-curator-recipes-5.1.0.jar * Apache Yetus - - org.apache.yetus-audience-annotations-0.13.0.jar + - org.apache.yetus-audience-annotations-0.12.0.jar * Kubernetes Client - io.kubernetes-client-java-18.0.0.jar - io.kubernetes-client-java-api-18.0.0.jar diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/test/ZooKeeperUtil.java b/managed-ledger/src/test/java/org/apache/bookkeeper/test/ZooKeeperUtil.java index 662d20333baca..602497e799037 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/test/ZooKeeperUtil.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/test/ZooKeeperUtil.java @@ -25,6 +25,7 @@ import java.io.File; import java.io.IOException; +import java.net.InetAddress; import java.net.InetSocketAddress; import java.util.concurrent.CountDownLatch; @@ -54,6 +55,9 @@ public class ZooKeeperUtil { private String connectString; public ZooKeeperUtil() { + String loopbackIPAddr = InetAddress.getLoopbackAddress().getHostAddress(); + zkaddr = new InetSocketAddress(loopbackIPAddr, 0); + connectString = loopbackIPAddr + ":" + zooKeeperPort; } public ZooKeeper getZooKeeperClient() { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index b07a090b1d28d..48b859bd11b87 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -1646,10 +1646,18 @@ private CompletableFuture getManagedLedgerConfig(@Nonnull T } if (retentionPolicies == null) { - retentionPolicies = policies.map(p -> p.retention_policies).orElseGet( - () -> new RetentionPolicies(serviceConfig.getDefaultRetentionTimeInMinutes(), - serviceConfig.getDefaultRetentionSizeInMB()) - ); + if (EventsTopicNames.checkTopicIsEventsNames(topicName) + || EventsTopicNames.isTransactionInternalName(topicName)) { + if (log.isDebugEnabled()) { + log.debug("{} Disable data retention policy for system topic.", topicName); + } + retentionPolicies = new RetentionPolicies(0, 0); + } else { + retentionPolicies = policies.map(p -> p.retention_policies).orElseGet( + () -> new RetentionPolicies(serviceConfig.getDefaultRetentionTimeInMinutes(), + serviceConfig.getDefaultRetentionSizeInMB()) + ); + } } ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java index f4101dfdaa3a9..0e9262efadf8c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java @@ -23,7 +23,6 @@ import static org.testng.Assert.assertTrue; import java.lang.reflect.Field; import java.util.Optional; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import org.apache.pulsar.broker.BrokerTestUtil; import org.apache.pulsar.broker.service.persistent.PersistentReplicator; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java index 85c8eca8fa054..dbeb98aa59060 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java @@ -1682,6 +1682,7 @@ public void testReplicatorWithFailedAck() throws Exception { MessageIdImpl lastMessageId = (MessageIdImpl) topic.getLastMessageId().get(); Position lastPosition = PositionImpl.get(lastMessageId.getLedgerId(), lastMessageId.getEntryId()); + Awaitility.await().untilAsserted(() -> assertNotNull(topic.getReplicators().get("r2"))); ConcurrentOpenHashMap replicators = topic.getReplicators(); PersistentReplicator replicator = (PersistentReplicator) replicators.get("r2"); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java index 2ef8b8f68eee3..1eb04ca1ccf0c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java @@ -460,7 +460,9 @@ public void testDeleteUselessLogDataWhenSubCursorMoved() throws Exception { Message message2 = consumer.receive(5, TimeUnit.SECONDS); consumer.acknowledgeAsync(message2.getMessageId(), transaction2).get(); - Assert.assertEquals(pendingAckLogIndex.size(), 0); + Awaitility.await().untilAsserted(() -> { + Assert.assertEquals(pendingAckLogIndex.size(), 0); + }); maxIndexLag = (long) field4.get(pendingAckStore); Assert.assertEquals(maxIndexLag, 5); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionRetentionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionRetentionTest.java index 8696e76150676..bd645d4e979d6 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionRetentionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionRetentionTest.java @@ -37,6 +37,7 @@ import lombok.Cleanup; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.client.BookKeeper; +import org.apache.bookkeeper.mledger.ManagedLedgerConfig; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; @@ -44,9 +45,13 @@ import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.Reader; import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.common.events.EventsTopicNames; +import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.ClusterData; +import org.apache.pulsar.common.policies.data.RetentionPolicies; import org.apache.pulsar.common.policies.data.TenantInfoImpl; import org.awaitility.Awaitility; +import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; @@ -208,6 +213,49 @@ public void testCompactionRetentionOnTopicCreationWithTopicPolicies() throws Exc ); } + @Test + public void testRetentionPolicesForSystemTopic() throws Exception { + String namespace = "my-tenant/my-ns"; + String topicPrefix = "persistent://" + namespace + "/"; + admin.namespaces().setRetention(namespace, new RetentionPolicies(-1, -1)); + // Check event topics and transaction internal topics. + for (String eventTopic : EventsTopicNames.EVENTS_TOPIC_NAMES) { + checkSystemTopicRetentionPolicy(topicPrefix + eventTopic); + } + checkSystemTopicRetentionPolicy(topicPrefix + TopicName.TRANSACTION_COORDINATOR_ASSIGN); + checkSystemTopicRetentionPolicy(topicPrefix + TopicName.TRANSACTION_COORDINATOR_LOG); + checkSystemTopicRetentionPolicy(topicPrefix + TopicName.PENDING_ACK_STORE_SUFFIX); + + // Check common topics. + checkCommonTopicRetentionPolicy(topicPrefix + "my-topic" + System.nanoTime()); + // Specify retention policies for system topic. + pulsar.getConfiguration().setTopicLevelPoliciesEnabled(true); + pulsar.getConfiguration().setSystemTopicEnabled(true); + admin.topics().createNonPartitionedTopic(topicPrefix + EventsTopicNames.TRANSACTION_BUFFER_SNAPSHOT); + admin.topicPolicies().setRetention(topicPrefix + EventsTopicNames.TRANSACTION_BUFFER_SNAPSHOT, + new RetentionPolicies(10, 10)); + Awaitility.await().untilAsserted(() -> { + checkTopicRetentionPolicy(topicPrefix + EventsTopicNames.TRANSACTION_BUFFER_SNAPSHOT, + new RetentionPolicies(10, 10)); + }); + } + + private void checkSystemTopicRetentionPolicy(String topicName) throws Exception { + checkTopicRetentionPolicy(topicName, new RetentionPolicies(0, 0)); + + } + + private void checkCommonTopicRetentionPolicy(String topicName) throws Exception { + checkTopicRetentionPolicy(topicName, new RetentionPolicies(-1, -1)); + } + + private void checkTopicRetentionPolicy(String topicName, RetentionPolicies retentionPolicies) throws Exception { + ManagedLedgerConfig config = pulsar.getBrokerService() + .getManagedLedgerConfig(TopicName.get(topicName)).get(); + Assert.assertEquals(config.getRetentionSizeInMB(), retentionPolicies.getRetentionSizeInMB()); + Assert.assertEquals(config.getRetentionTimeMillis(),retentionPolicies.getRetentionTimeInMinutes() * 60000L); + } + private void testCompactionCursorRetention(String topic) throws Exception { Set keys = Sets.newHashSet("a", "b", "c"); Set keysToExpire = Sets.newHashSet("x1", "x2"); diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/events/EventsTopicNames.java b/pulsar-common/src/main/java/org/apache/pulsar/common/events/EventsTopicNames.java index 79e950842478d..f6f2c17fd8629 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/events/EventsTopicNames.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/events/EventsTopicNames.java @@ -53,6 +53,13 @@ public static boolean checkTopicIsTransactionCoordinatorAssign(TopicName topicNa .startsWith(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString()); } + public static boolean isTransactionInternalName(TopicName topicName) { + String topic = topicName.toString(); + return topic.startsWith(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString()) + || topic.startsWith(TopicName.TRANSACTION_COORDINATOR_LOG.toString()) + || topic.endsWith(TopicName.PENDING_ACK_STORE_SUFFIX); + } + public static boolean isTopicPoliciesSystemTopic(String topic) { if (topic == null) { return false; diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java index 97f99b19f19cc..04d96fd54a110 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java @@ -66,6 +66,7 @@ public TopicName load(String name) throws Exception { public static final TopicName TRANSACTION_COORDINATOR_LOG = TopicName.get(TopicDomain.persistent.value(), NamespaceName.SYSTEM_NAMESPACE, "__transaction_log_"); + public static final String PENDING_ACK_STORE_SUFFIX = "__transaction_pending_ack"; public static TopicName get(String domain, NamespaceName namespaceName, String topic) { String name = domain + "://" + namespaceName.toString() + '/' + topic; return TopicName.get(name); diff --git a/pulsar-sql/presto-distribution/LICENSE b/pulsar-sql/presto-distribution/LICENSE index 0c8c111795942..e003a28c27c24 100644 --- a/pulsar-sql/presto-distribution/LICENSE +++ b/pulsar-sql/presto-distribution/LICENSE @@ -473,7 +473,7 @@ The Apache Software License, Version 2.0 - zookeeper-3.9.1.jar - zookeeper-jute-3.9.1.jar * Apache Yetus Audience Annotations - - audience-annotations-0.13.0.jar + - audience-annotations-0.12.0.jar * Swagger - swagger-annotations-1.6.10.jar * Perfmark