Skip to content

Commit

Permalink
[branch-2.10][fix][broker] Fix inconsensus namespace policies by getP…
Browse files Browse the repository at this point in the history
…oliciesIfCached
  • Loading branch information
mattisonchao committed Jul 25, 2023
1 parent 1eb5eb3 commit 7cc0f13
Show file tree
Hide file tree
Showing 8 changed files with 149 additions and 74 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,13 @@ public Optional<Policies> getPolicies(NamespaceName ns) throws MetadataStoreExce
return get(joinPath(BASE_POLICIES_PATH, ns.toString()));
}

/**
* Get the namespace policy from the metadata cache. This method will not trigger the load of metadata cache.
*
* @deprecated Since this method may introduce inconsistent namespace policies. we should use
* #{@link NamespaceResources#getPoliciesAsync}
*/
@Deprecated
public Optional<Policies> getPoliciesIfCached(NamespaceName ns) {
return getCache().getIfCached(joinPath(BASE_POLICIES_PATH, ns.toString()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.pulsar.broker.service;

import static com.google.common.base.Preconditions.checkArgument;
import static java.util.Objects.requireNonNull;
import static org.apache.bookkeeper.mledger.impl.ManagedLedgerMBeanImpl.ENTRY_LATENCY_BUCKETS_USEC;
import com.google.common.base.MoreObjects;
import com.google.common.collect.Lists;
Expand Down Expand Up @@ -57,6 +58,7 @@
import org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException;
import org.apache.pulsar.broker.stats.prometheus.metrics.Summary;
import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies;
Expand All @@ -75,6 +77,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nonnull;

public abstract class AbstractTopic implements Topic, TopicPolicyListener<TopicPolicies> {

protected static final long POLICY_UPDATE_FAILURE_RETRY_TIME_SECONDS = 60;
Expand Down Expand Up @@ -1065,6 +1069,12 @@ public PublishRateLimiter getBrokerPublishRateLimiter() {
return brokerService.getBrokerPublishRateLimiter();
}

/**
* @deprecated Avoid using the deprecated method
* #{@link org.apache.pulsar.broker.resources.NamespaceResources#getPoliciesIfCached(NamespaceName)} and we can use
* #{@link AbstractTopic#updateResourceGroupLimiter(Policies)} to instead of it.
*/
@Deprecated
public void updateResourceGroupLimiter(Optional<Policies> optPolicies) {
Policies policies;
try {
Expand All @@ -1078,17 +1088,20 @@ public void updateResourceGroupLimiter(Optional<Policies> optPolicies) {
log.warn("[{}] Error getting policies {} and publish throttling will be disabled", topic, e.getMessage());
policies = new Policies();
}
updateResourceGroupLimiter(policies);
}

public void updateResourceGroupLimiter(@Nonnull Policies namespacePolicies) {
requireNonNull(namespacePolicies);
// attach the resource-group level rate limiters, if set
String rgName = policies.resource_group_name;
String rgName = namespacePolicies.resource_group_name;
if (rgName != null) {
final ResourceGroup resourceGroup =
brokerService.getPulsar().getResourceGroupServiceManager().resourceGroupGet(rgName);
brokerService.getPulsar().getResourceGroupServiceManager().resourceGroupGet(rgName);
if (resourceGroup != null) {
this.resourceGroupRateLimitingEnabled = true;
this.resourceGroupPublishLimiter = resourceGroup.getResourceGroupPublishLimiter();
this.resourceGroupPublishLimiter.registerRateLimitFunction(this.getName(),
() -> this.enableCnxAutoRead());
this.resourceGroupPublishLimiter.registerRateLimitFunction(this.getName(), this::enableCnxAutoRead);
log.info("Using resource group {} rate limiter for topic {}", rgName, topic);
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static java.util.Objects.requireNonNull;
import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun;
import static org.apache.commons.collections.CollectionUtils.isEmpty;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
Expand Down Expand Up @@ -54,17 +55,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.LongAdder;
Expand Down Expand Up @@ -175,6 +166,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nonnull;

@Getter(AccessLevel.PUBLIC)
@Setter(AccessLevel.PROTECTED)
public class BrokerService implements Closeable {
Expand Down Expand Up @@ -3005,11 +2998,23 @@ private AutoTopicCreationOverride getAutoTopicCreationOverride(final TopicName t
return null;
}

/**
* @deprecated Avoid using the deprecated method
* #{@link org.apache.pulsar.broker.resources.NamespaceResources#getPoliciesIfCached(NamespaceName)} and blocking
* call. we can use #{@link BrokerService#isAllowAutoSubscriptionCreationAsync(TopicName)} to instead of it.
*/
@Deprecated
public boolean isAllowAutoSubscriptionCreation(final String topic) {
TopicName topicName = TopicName.get(topic);
return isAllowAutoSubscriptionCreation(topicName);
}

/**
* @deprecated Avoid using the deprecated method
* #{@link org.apache.pulsar.broker.resources.NamespaceResources#getPoliciesIfCached(NamespaceName)} and blocking
* call. we can use #{@link BrokerService#isAllowAutoSubscriptionCreationAsync(TopicName)} to instead of it.
*/
@Deprecated
public boolean isAllowAutoSubscriptionCreation(final TopicName topicName) {
AutoSubscriptionCreationOverride autoSubscriptionCreationOverride =
getAutoSubscriptionCreationOverride(topicName);
Expand All @@ -3020,6 +3025,12 @@ public boolean isAllowAutoSubscriptionCreation(final TopicName topicName) {
}
}

/**
* @deprecated Avoid using the deprecated method
* #{@link org.apache.pulsar.broker.resources.NamespaceResources#getPoliciesIfCached(NamespaceName)} and blocking
* call. we can use #{@link BrokerService#isAllowAutoSubscriptionCreationAsync(TopicName)} to instead of it.
*/
@Deprecated
private AutoSubscriptionCreationOverride getAutoSubscriptionCreationOverride(final TopicName topicName) {
Optional<Policies> policies =
pulsar.getPulsarResources().getNamespaceResources().getPoliciesIfCached(topicName.getNamespaceObject());
Expand All @@ -3031,6 +3042,19 @@ private AutoSubscriptionCreationOverride getAutoSubscriptionCreationOverride(fin
return null;
}

public @Nonnull CompletionStage<Boolean> isAllowAutoSubscriptionCreationAsync(@Nonnull TopicName tpName) {
requireNonNull(tpName);
// namespace level policies
return pulsar.getPulsarResources().getNamespaceResources().getPoliciesAsync(tpName.getNamespaceObject())
.thenApply(policies -> {
if (policies.isPresent() && policies.get().autoSubscriptionCreationOverride != null) {
return policies.get().autoSubscriptionCreationOverride.isAllowAutoSubscriptionCreation();
}
// broker level policies
return pulsar.getConfiguration().isAllowAutoSubscriptionCreation();
});
}

public boolean isSystemTopic(String topic) {
return isSystemTopic(TopicName.get(topic));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -1044,38 +1045,42 @@ protected void handleSubscribe(final CommandSubscribe subscribe) {
.failedFuture(new TopicNotFoundException(
"Topic " + topicName + " does not exist"));
}

Topic topic = optTopic.get();

boolean rejectSubscriptionIfDoesNotExist = isDurable
&& !service.isAllowAutoSubscriptionCreation(topicName.toString())
&& !topic.getSubscriptions().containsKey(subscriptionName)
&& topic.isPersistent();

if (rejectSubscriptionIfDoesNotExist) {
return FutureUtil
.failedFuture(
new SubscriptionNotFoundException(
"Subscription does not exist"));
}

SubscriptionOption option = SubscriptionOption.builder().cnx(ServerCnx.this)
.subscriptionName(subscriptionName)
.consumerId(consumerId).subType(subType).priorityLevel(priorityLevel)
.consumerName(consumerName).isDurable(isDurable)
.startMessageId(startMessageId).metadata(metadata).readCompacted(readCompacted)
.initialPosition(initialPosition)
.startMessageRollbackDurationSec(startMessageRollbackDurationSec)
.replicatedSubscriptionStateArg(isReplicated).keySharedMeta(keySharedMeta)
.subscriptionProperties(subscriptionProperties)
.consumerEpoch(consumerEpoch)
.build();
if (schema != null) {
return topic.addSchemaIfIdleOrCheckCompatible(schema)
.thenCompose(v -> topic.subscribe(option));
} else {
return topic.subscribe(option);
}
final Topic topic = optTopic.get();
return service.isAllowAutoSubscriptionCreationAsync(topicName)
.thenCompose(isAllowAutoSubscriptionCreation -> {
boolean rejectSubscriptionIfDoesNotExist = isDurable
&& !isAllowAutoSubscriptionCreation
&& !topic.getSubscriptions().containsKey(subscriptionName)
&& topic.isPersistent();

if (rejectSubscriptionIfDoesNotExist) {
return FutureUtil
.failedFuture(
new SubscriptionNotFoundException(
"Subscription does not exist"));
}

SubscriptionOption option = SubscriptionOption.builder().cnx(ServerCnx.this)
.subscriptionName(subscriptionName)
.consumerId(consumerId).subType(subType)
.priorityLevel(priorityLevel)
.consumerName(consumerName).isDurable(isDurable)
.startMessageId(startMessageId).metadata(metadata)
.readCompacted(readCompacted)
.initialPosition(initialPosition)
.startMessageRollbackDurationSec(startMessageRollbackDurationSec)
.replicatedSubscriptionStateArg(isReplicated)
.keySharedMeta(keySharedMeta)
.subscriptionProperties(subscriptionProperties)
.consumerEpoch(consumerEpoch)
.build();
if (schema != null) {
return topic.addSchemaIfIdleOrCheckCompatible(schema)
.thenCompose(v -> topic.subscribe(option));
} else {
return topic.subscribe(option);
}
});
})
.thenAccept(consumer -> {
if (consumerFuture.complete(consumer)) {
Expand Down Expand Up @@ -1286,33 +1291,39 @@ protected void handleProducer(final CommandProducer cmdProducer) {

schemaVersionFuture.thenAccept(schemaVersion -> {
topic.checkIfTransactionBufferRecoverCompletely(isTxnEnabled).thenAccept(future -> {
CompletableFuture<Subscription> createInitSubFuture;
CompletionStage<Subscription> createInitSubFuture;
if (!Strings.isNullOrEmpty(initialSubscriptionName)
&& topic.isPersistent()
&& !topic.getSubscriptions().containsKey(initialSubscriptionName)) {
if (!this.getBrokerService().isAllowAutoSubscriptionCreation(topicName)) {
String msg =
"Could not create the initial subscription due to the auto subscription "
+ "creation is not allowed.";
if (producerFuture.completeExceptionally(
new BrokerServiceException.NotAllowedException(msg))) {
log.warn("[{}] {} initialSubscriptionName: {}, topic: {}",
remoteAddress, msg, initialSubscriptionName, topicName);
commandSender.sendErrorResponse(requestId,
ServerError.NotAllowedError, msg);
}
producers.remove(producerId, producerFuture);
return;
}
createInitSubFuture =
topic.createSubscription(initialSubscriptionName, InitialPosition.Earliest,
false, null);
createInitSubFuture = service.isAllowAutoSubscriptionCreationAsync(topicName)
.thenCompose(isAllowAutoSubscriptionCreation -> {
if (!isAllowAutoSubscriptionCreation) {
return FutureUtil.failedFuture(
new BrokerServiceException.NotAllowedException(
"Could not create the initial subscription due to"
+ " the auto subscription creation is not allowed."));
}
return topic.createSubscription(initialSubscriptionName,
InitialPosition.Earliest, false, null);
});
} else {
createInitSubFuture = CompletableFuture.completedFuture(null);
}

createInitSubFuture.whenComplete((sub, ex) -> {
if (ex != null) {
final Throwable rc = FutureUtil.unwrapCompletionException(ex);
if (rc instanceof BrokerServiceException.NotAllowedException) {
log.warn("[{}] {} initialSubscriptionName: {}, topic: {}",
remoteAddress, rc.getMessage(), initialSubscriptionName, topicName);
if (producerFuture.completeExceptionally(rc)) {
commandSender.sendErrorResponse(requestId,
ServerError.NotAllowedError, rc.getMessage());
}
producers.remove(producerId, producerFuture);
return;
}

String msg =
"Failed to create the initial subscription: " + ex.getCause().getMessage();
log.warn("[{}] {} initialSubscriptionName: {}, topic: {}",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,18 +157,20 @@ public CompletableFuture<Void> initialize() {
return brokerService.pulsar().getPulsarResources().getNamespaceResources()
.getPoliciesAsync(TopicName.get(topic).getNamespaceObject())
.thenAccept(optPolicies -> {
final Policies policies;
if (!optPolicies.isPresent()) {
log.warn("[{}] Policies not present and isEncryptionRequired will be set to false", topic);
isEncryptionRequired = false;
policies = new Policies();
} else {
Policies policies = optPolicies.get();
policies = optPolicies.get();
updateTopicPolicyByNamespacePolicy(policies);
isEncryptionRequired = policies.encryption_required;
isAllowAutoUpdateSchema = policies.is_allow_auto_update_schema;
schemaValidationEnforced = policies.schema_validation_enforced;
}
updatePublishDispatcher();
updateResourceGroupLimiter(optPolicies);
updateResourceGroupLimiter(policies);
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,12 @@ public static CompletableFuture<Optional<Policies>> getPoliciesAsync(BrokerServi
return brokerService.pulsar().getPulsarResources().getNamespaceResources().getPoliciesAsync(namespace);
}

/**
* @deprecated Avoid using the deprecated method
* #{@link org.apache.pulsar.broker.resources.NamespaceResources#getPoliciesIfCached(NamespaceName)} and blocking
* call. we can use #{@link DispatchRateLimiter#getPoliciesAsync(BrokerService, String)} to instead of it.
*/
@Deprecated
public static Optional<Policies> getPolicies(BrokerService brokerService, String topicName) {
final NamespaceName namespace = TopicName.get(topicName).getNamespaceObject();
return brokerService.pulsar().getPulsarResources().getNamespaceResources().getPoliciesIfCached(namespace);
Expand Down
Loading

0 comments on commit 7cc0f13

Please sign in to comment.