Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[branch-2.10][improve][broker] Do not retain the data in the system topic #22031

Merged
merged 10 commits into from
Feb 19, 2024
3 changes: 1 addition & 2 deletions distribution/server/src/assemble/LICENSE.bin.txt
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,6 @@ The Apache Software License, Version 2.0
- io.netty-netty-transport-4.1.100.Final.jar
- io.netty-netty-transport-classes-epoll-4.1.100.Final.jar
- io.netty-netty-transport-native-epoll-4.1.100.Final-linux-x86_64.jar
- io.netty-netty-transport-native-epoll-4.1.100.Final.jar
- io.netty-netty-transport-native-unix-common-4.1.100.Final.jar
- io.netty-netty-transport-native-unix-common-4.1.100.Final-linux-x86_64.jar
- io.netty-netty-tcnative-boringssl-static-2.0.61.Final.jar
Expand Down Expand Up @@ -498,7 +497,7 @@ The Apache Software License, Version 2.0
- org.apache.curator-curator-framework-5.1.0.jar
- org.apache.curator-curator-recipes-5.1.0.jar
* Apache Yetus
- org.apache.yetus-audience-annotations-0.13.0.jar
- org.apache.yetus-audience-annotations-0.12.0.jar
* Kubernetes Client
- io.kubernetes-client-java-18.0.0.jar
- io.kubernetes-client-java-api-18.0.0.jar
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

import java.io.File;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.concurrent.CountDownLatch;

Expand Down Expand Up @@ -54,6 +55,9 @@ public class ZooKeeperUtil {
private String connectString;

public ZooKeeperUtil() {
String loopbackIPAddr = InetAddress.getLoopbackAddress().getHostAddress();
zkaddr = new InetSocketAddress(loopbackIPAddr, 0);
connectString = loopbackIPAddr + ":" + zooKeeperPort;
}

public ZooKeeper getZooKeeperClient() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1646,10 +1646,18 @@ private CompletableFuture<ManagedLedgerConfig> getManagedLedgerConfig(@Nonnull T
}

if (retentionPolicies == null) {
retentionPolicies = policies.map(p -> p.retention_policies).orElseGet(
() -> new RetentionPolicies(serviceConfig.getDefaultRetentionTimeInMinutes(),
serviceConfig.getDefaultRetentionSizeInMB())
);
if (EventsTopicNames.checkTopicIsEventsNames(topicName)
|| EventsTopicNames.isTransactionInternalName(topicName)) {
if (log.isDebugEnabled()) {
log.debug("{} Disable data retention policy for system topic.", topicName);
}
retentionPolicies = new RetentionPolicies(0, 0);
} else {
retentionPolicies = policies.map(p -> p.retention_policies).orElseGet(
() -> new RetentionPolicies(serviceConfig.getDefaultRetentionTimeInMinutes(),
serviceConfig.getDefaultRetentionSizeInMB())
);
}
}

ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import static org.testng.Assert.assertTrue;
import java.lang.reflect.Field;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.service.persistent.PersistentReplicator;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1682,6 +1682,7 @@ public void testReplicatorWithFailedAck() throws Exception {

MessageIdImpl lastMessageId = (MessageIdImpl) topic.getLastMessageId().get();
Position lastPosition = PositionImpl.get(lastMessageId.getLedgerId(), lastMessageId.getEntryId());
Awaitility.await().untilAsserted(() -> assertNotNull(topic.getReplicators().get("r2")));
ConcurrentOpenHashMap<String, Replicator> replicators = topic.getReplicators();
PersistentReplicator replicator = (PersistentReplicator) replicators.get("r2");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -460,7 +460,9 @@ public void testDeleteUselessLogDataWhenSubCursorMoved() throws Exception {
Message<byte[]> message2 = consumer.receive(5, TimeUnit.SECONDS);
consumer.acknowledgeAsync(message2.getMessageId(), transaction2).get();

Assert.assertEquals(pendingAckLogIndex.size(), 0);
Awaitility.await().untilAsserted(() -> {
Assert.assertEquals(pendingAckLogIndex.size(), 0);
});
maxIndexLag = (long) field4.get(pendingAckStore);
Assert.assertEquals(maxIndexLag, 5);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,16 +37,21 @@
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.events.EventsTopicNames;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
Expand Down Expand Up @@ -208,6 +213,49 @@ public void testCompactionRetentionOnTopicCreationWithTopicPolicies() throws Exc
);
}

@Test
public void testRetentionPolicesForSystemTopic() throws Exception {
String namespace = "my-tenant/my-ns";
String topicPrefix = "persistent://" + namespace + "/";
admin.namespaces().setRetention(namespace, new RetentionPolicies(-1, -1));
// Check event topics and transaction internal topics.
for (String eventTopic : EventsTopicNames.EVENTS_TOPIC_NAMES) {
checkSystemTopicRetentionPolicy(topicPrefix + eventTopic);
}
checkSystemTopicRetentionPolicy(topicPrefix + TopicName.TRANSACTION_COORDINATOR_ASSIGN);
checkSystemTopicRetentionPolicy(topicPrefix + TopicName.TRANSACTION_COORDINATOR_LOG);
checkSystemTopicRetentionPolicy(topicPrefix + TopicName.PENDING_ACK_STORE_SUFFIX);

// Check common topics.
checkCommonTopicRetentionPolicy(topicPrefix + "my-topic" + System.nanoTime());
// Specify retention policies for system topic.
pulsar.getConfiguration().setTopicLevelPoliciesEnabled(true);
pulsar.getConfiguration().setSystemTopicEnabled(true);
admin.topics().createNonPartitionedTopic(topicPrefix + EventsTopicNames.TRANSACTION_BUFFER_SNAPSHOT);
admin.topicPolicies().setRetention(topicPrefix + EventsTopicNames.TRANSACTION_BUFFER_SNAPSHOT,
new RetentionPolicies(10, 10));
Awaitility.await().untilAsserted(() -> {
checkTopicRetentionPolicy(topicPrefix + EventsTopicNames.TRANSACTION_BUFFER_SNAPSHOT,
new RetentionPolicies(10, 10));
});
}

private void checkSystemTopicRetentionPolicy(String topicName) throws Exception {
checkTopicRetentionPolicy(topicName, new RetentionPolicies(0, 0));

}

private void checkCommonTopicRetentionPolicy(String topicName) throws Exception {
checkTopicRetentionPolicy(topicName, new RetentionPolicies(-1, -1));
}

private void checkTopicRetentionPolicy(String topicName, RetentionPolicies retentionPolicies) throws Exception {
ManagedLedgerConfig config = pulsar.getBrokerService()
.getManagedLedgerConfig(TopicName.get(topicName)).get();
Assert.assertEquals(config.getRetentionSizeInMB(), retentionPolicies.getRetentionSizeInMB());
Assert.assertEquals(config.getRetentionTimeMillis(),retentionPolicies.getRetentionTimeInMinutes() * 60000L);
}

private void testCompactionCursorRetention(String topic) throws Exception {
Set<String> keys = Sets.newHashSet("a", "b", "c");
Set<String> keysToExpire = Sets.newHashSet("x1", "x2");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,13 @@ public static boolean checkTopicIsTransactionCoordinatorAssign(TopicName topicNa
.startsWith(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString());
}

public static boolean isTransactionInternalName(TopicName topicName) {
String topic = topicName.toString();
return topic.startsWith(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString())
|| topic.startsWith(TopicName.TRANSACTION_COORDINATOR_LOG.toString())
|| topic.endsWith(TopicName.PENDING_ACK_STORE_SUFFIX);
}

public static boolean isTopicPoliciesSystemTopic(String topic) {
if (topic == null) {
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ public TopicName load(String name) throws Exception {
public static final TopicName TRANSACTION_COORDINATOR_LOG = TopicName.get(TopicDomain.persistent.value(),
NamespaceName.SYSTEM_NAMESPACE, "__transaction_log_");

public static final String PENDING_ACK_STORE_SUFFIX = "__transaction_pending_ack";
public static TopicName get(String domain, NamespaceName namespaceName, String topic) {
String name = domain + "://" + namespaceName.toString() + '/' + topic;
return TopicName.get(name);
Expand Down
2 changes: 1 addition & 1 deletion pulsar-sql/presto-distribution/LICENSE
Original file line number Diff line number Diff line change
Expand Up @@ -473,7 +473,7 @@ The Apache Software License, Version 2.0
- zookeeper-3.9.1.jar
- zookeeper-jute-3.9.1.jar
* Apache Yetus Audience Annotations
- audience-annotations-0.13.0.jar
- audience-annotations-0.12.0.jar
* Swagger
- swagger-annotations-1.6.10.jar
* Perfmark
Expand Down
Loading