From e9f511809b55ea9867709ebf2b4487c4c93eaff1 Mon Sep 17 00:00:00 2001 From: Alberto Gomez Date: Mon, 4 Apr 2022 09:31:02 +0200 Subject: [PATCH 1/4] Prototype to limit the number of threads to destinations Works for puts, gets and transactions and them also within functions. This solution allows to limit the number of threads handling client requests in a server waiting for a response from a given server. For example, if we configure the system to allow at max 1/2 of the available threads to handle client requests busy towards a given server, supposing we have a 4 servers system with MAX_THREADS=8, if at a given point in time server1 has 4 threads busy waiting to get a response from server2 and another request comes to server1 that would require to communicate with server2, the request will be rejected. This feature could be used in situations when a server is not responding or is responding slowly in order to limit the number of server threads (in the rest of servers) to handle client requests that are busy waiting for an answer from the problematic server. This way, even if a server is not responding or is being very slow, there will always be some threads available to handle client requests in the healthy servers. As a result, a non-responding server would not eventually provoke a 100% traffic loss. In order to implement this feature, a new observer type has been created (SendReceiveMessageObserver). Calls to this observer have been added before a put, get or transaction needs to go to another server (before any change has been made locally) and after the response has been received. With a particular implementation of this observer (see GenericDunitTest.MySendReceiveMessageObserver) that keeps track of the number of threads waiting for an answer to each servers, operations may be rejected if the limit has been surpassed. In GenericDUnitTest that observer is set in the servers and the functionality can be tested manually by looking at the logs. Some refactorings and renamings --- .../cache/execute/GenericDUnitTest.java | 525 ++++++++++++++++++ .../internal/cache/PartitionedRegion.java | 412 +++++++------- .../cache/PartitionedRegionDataStore.java | 4 + .../cache/ProxyClientRequestObserver.java | 26 + .../ProxyClientRequestObserverAdapter.java | 27 + .../ProxyClientRequestObserverHolder.java | 76 +++ .../geode/internal/cache/TXCommitMessage.java | 49 +- .../apache/geode/internal/cache/TXState.java | 17 +- .../geode/internal/cache/TXStateTest.java | 2 +- 9 files changed, 937 insertions(+), 201 deletions(-) create mode 100644 geode-core/src/distributedTest/java/org/apache/geode/internal/cache/execute/GenericDUnitTest.java create mode 100644 geode-core/src/main/java/org/apache/geode/internal/cache/ProxyClientRequestObserver.java create mode 100644 geode-core/src/main/java/org/apache/geode/internal/cache/ProxyClientRequestObserverAdapter.java create mode 100644 geode-core/src/main/java/org/apache/geode/internal/cache/ProxyClientRequestObserverHolder.java diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/execute/GenericDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/execute/GenericDUnitTest.java new file mode 100644 index 000000000000..25e75f6967eb --- /dev/null +++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/execute/GenericDUnitTest.java @@ -0,0 +1,525 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.internal.cache.execute; + +import static org.apache.geode.cache.RegionShortcut.PARTITION; +import static org.apache.geode.cache.client.ClientRegionShortcut.CACHING_PROXY; +import static org.apache.geode.distributed.ConfigurationProperties.SERIALIZABLE_OBJECT_FILTER; + +import java.io.Serializable; +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.IntStream; + +import junitparams.Parameters; +import junitparams.naming.TestCaseName; +import org.apache.logging.log4j.Logger; +import org.junit.After; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; + +import org.apache.geode.cache.CacheTransactionManager; +import org.apache.geode.cache.CommitConflictException; +import org.apache.geode.cache.PartitionAttributesFactory; +import org.apache.geode.cache.Region; +import org.apache.geode.cache.client.ClientCache; +import org.apache.geode.cache.execute.Execution; +import org.apache.geode.cache.execute.Function; +import org.apache.geode.cache.execute.FunctionContext; +import org.apache.geode.cache.execute.FunctionService; +import org.apache.geode.cache.execute.RegionFunctionContext; +import org.apache.geode.cache.execute.ResultCollector; +import org.apache.geode.cache.query.Query; +import org.apache.geode.cache.query.QueryService; +import org.apache.geode.cache.query.SelectResults; +import org.apache.geode.distributed.internal.ClusterDistributionManager; +import org.apache.geode.distributed.internal.DistributionMessage; +import org.apache.geode.distributed.internal.DistributionMessageObserver; +import org.apache.geode.distributed.internal.membership.InternalDistributedMember; +import org.apache.geode.internal.cache.ProxyClientRequestObserver; +import org.apache.geode.internal.cache.ProxyClientRequestObserverHolder; +import org.apache.geode.logging.internal.log4j.api.LogService; +import org.apache.geode.test.awaitility.GeodeAwaitility; +import org.apache.geode.test.dunit.AsyncInvocation; +import org.apache.geode.test.dunit.IgnoredException; +import org.apache.geode.test.dunit.rules.CacheRule; +import org.apache.geode.test.dunit.rules.ClientCacheRule; +import org.apache.geode.test.dunit.rules.ClientVM; +import org.apache.geode.test.dunit.rules.ClusterStartupRule; +import org.apache.geode.test.dunit.rules.DistributedRule; +import org.apache.geode.test.dunit.rules.MemberVM; +import org.apache.geode.test.junit.categories.WanTest; +import org.apache.geode.test.junit.rules.serializable.SerializableTestName; +import org.apache.geode.test.junit.runners.GeodeParamsRunner; + + +@Category({WanTest.class}) +@RunWith(GeodeParamsRunner.class) +public class GenericDUnitTest implements Serializable { + + private static final int MAX_THREADS = 8; + + private static final String regionName = "GenericDUnitTest"; + + @ClassRule + public static final ClusterStartupRule clusterStartupRule = new ClusterStartupRule(5); + + @Rule + public DistributedRule distributedRule = new DistributedRule(6); + MemberVM locator; + MemberVM server1; + MemberVM server2; + MemberVM server3; + MemberVM server4; + ClientVM client; + + @Rule + public CacheRule cacheRule = new CacheRule(); + + @Rule + public ClientCacheRule clientCacheRule = new ClientCacheRule(); + + @Rule + public SerializableTestName testName = new SerializableTestName(); + + @Before + public void setUp() throws Exception { + locator = clusterStartupRule.startLocatorVM(0); + server1 = startServer(1, MAX_THREADS); + } + + @After + public void tearDown() throws Exception { + IgnoredException.removeAllExpectedExceptions(); + } + + enum Operation { + GET, PUT, TXPUT, GETFUNCTION, TXGETFUNCTION, PUTFUNCTION, TXPUTFUNCTION + } + + @Test + @Parameters({"GET", "PUT", "TXPUT", "GETFUNCTION", "TXGETFUNCTION", "PUTFUNCTION", + "TXPUTFUNCTION"}) + @TestCaseName("{method}(operation:{params})") + public void testSuccessfulExecution(Operation operation) throws Exception { + IgnoredException.addIgnoredException(java.lang.IllegalStateException.class); + + server1 = startServer(1, MAX_THREADS); + server2 = startServer(2, MAX_THREADS); + server3 = startServer(3, MAX_THREADS); + server4 = startServer(4, MAX_THREADS); + + List serversInA = + Arrays.asList(server1, server2, server3, server4); + + // Set client connect-timeout to a very high value so that if there are no ServerConnection + // threads available the test will time-out before the client times-out. + int connectTimeout = (int) GeodeAwaitility.getTimeout().toMillis() * 2; + + client = startClient(5, 0, connectTimeout); + + Function testFunction = new TestFunction(); + + for (MemberVM memberVM : serversInA) { + createServerRegionAndRegisterFunction(memberVM, 1, testFunction); + } + + client.invoke(() -> createClientRegion()); + + client.invoke(() -> executeQuery(regionName)); + + Object key = "key"; + Object value = "value"; + + int invocationsNo = 50; + AsyncInvocation[] invocations = new AsyncInvocation[invocationsNo]; + + IntStream.range(0, invocationsNo).forEach(i -> invocations[i] = client.invokeAsync(() -> { + String finalKey = key + "" + i; + String finalValue = finalKey; + try { + switch (operation) { + case GET: { + doGet(finalKey, regionName); + break; + } + case PUT: { + doPut(finalKey, finalValue, regionName); + break; + } + case TXPUT: { + doTxPut(finalKey, finalValue, regionName); + break; + } + case GETFUNCTION: { + executeGetFunction(testFunction, regionName, finalKey, i, false); + break; + } + case TXGETFUNCTION: { + executeGetFunction(testFunction, regionName, finalKey, i, true); + break; + } + case PUTFUNCTION: { + executePutFunction(testFunction, regionName, finalKey, finalValue, i, false); + break; + } + case TXPUTFUNCTION: { + executePutFunction(testFunction, regionName, finalKey, finalValue, i, true); + break; + } + } + } catch (Exception e) { + System.out.println( + "toberal exception calling " + operation + " operation with key: " + finalKey + ", " + + e); + } + })); + + // Sleep a bit to make sure that entries are replicated + Thread.sleep(5000); + + // Run several times to make sure that the results are the same in all servers. + IntStream.range(0, 10).forEach(x -> { + client.invoke(() -> executeQuery(regionName)); + }); + } + + void doPut(Object key, Object value, String regionName) { + ClientCache cache = ClusterStartupRule.getClientCache(); + Region region = cache.getRegion(regionName); + System.out.println("toberal before put"); + region.put(key, value); + System.out.println("toberal after put"); + } + + void doTxPut(Object key, Object value, String regionName) { + ClientCache cache = ClusterStartupRule.getClientCache(); + Region region = cache.getRegion(regionName); + CacheTransactionManager txManager = + cache.getCacheTransactionManager(); + txManager.begin(); + try { + System.out.println("toberal before put"); + region.put(key, value); + System.out.println("toberal after put"); + System.out.println("toberal before committing: " + key); + txManager.commit(); + System.out.println("toberal after committing: " + key); + } catch (CommitConflictException conflict) { + System.out.println("toberal CommitConflictException"); + // ... do necessary work for a transaction that failed on commit + } catch (Exception e) { + System.out.println("toberal Exception putting: " + e); + // ... do necessary work for a transaction that failed on commit + } finally { + if (txManager.exists()) { + txManager.rollback(); + } + } + } + + Object doGet(Object key, String regionName) { + ClientCache cache = ClusterStartupRule.getClientCache(); + Region region = cache.getRegion(regionName); + System.out.println("toberal before get"); + Object value = region.get(key); + System.out.println("toberal after get: " + value); + return value; + } + + void executeQuery(String regionName) { + ClientCache cache = ClusterStartupRule.getClientCache(); + String queryString = "select * from /" + regionName; + + // Get QueryService from Cache. + QueryService queryService = cache.getQueryService(); + + // Create the Query Object. + Query query = queryService.newQuery(queryString); + + // Execute Query locally. Returns results set. + Object[] params = new Object[0]; + SelectResults results = null; + try { + results = (SelectResults) query.execute(params); + } catch (Exception e) { + e.printStackTrace(); + } + System.out.println("toberal query results: (" + results.size() + ") - " + results); + } + + private Object executePutFunction(Function function, String regionName, Object key, Object value, + int invocation, boolean useTransaction) { + FunctionService.registerFunction(function); + final Region region = + ClusterStartupRule.getClientCache().getRegion(regionName); + + Execution execution = FunctionService.onRegion(region); + + ResultCollector resultCollector; + Object[] args = {useTransaction, invocation, key, value}; + Set filter = new HashSet(); + filter.add(key); + // without filter, results are weird. Check, toberal + resultCollector = + execution.setArguments(args).withFilter(filter).execute(function.getId()); + Object result = resultCollector.getResult(); + return result; + } + + private Object executeGetFunction(Function function, String regionName, Object key, + int invocation, boolean useTransaction) { + FunctionService.registerFunction(function); + final Region region = + ClusterStartupRule.getClientCache().getRegion(regionName); + + Execution execution = FunctionService.onRegion(region); + + ResultCollector resultCollector; + Object[] args = {useTransaction, invocation, key}; + Set filter = new HashSet(); + filter.add(key); + System.out.println("toberal before executeGetFunction. key: " + key); + resultCollector = + execution.setArguments(args).withFilter(filter).execute(function.getId()); + Object result = resultCollector.getResult(); + System.out.println("toberal after executeGetFunction. key: " + key); + return result; + } + + private void createServerRegion(int redundantCopies) { + final PartitionAttributesFactory paf = new PartitionAttributesFactory<>(); + paf.setRedundantCopies(redundantCopies); + ClusterStartupRule.getCache().createRegionFactory(PARTITION) + .setPartitionAttributes(paf.create()) + .create(regionName); + } + + private void createServerRegionAndRegisterFunction(MemberVM server, int redundantCopies, + final Function function) { + server.invoke(() -> { + createServerRegion(redundantCopies); + FunctionService.registerFunction(function); + // DistributionMessageObserver.setInstance(new MyMessageObserver()); + int maxThreadsPerDestination = MAX_THREADS / 4; + ProxyClientRequestObserverHolder + .setInstance(new ThreadLimitingProxyClientRequestObserver(maxThreadsPerDestination)); + }); + } + + private ClientVM startClient(final int vmIndex, final int retryAttempts, final int connectTimeout) + throws Exception { + return clusterStartupRule.startClientVM( + vmIndex, + cacheRule -> cacheRule + .withLocatorConnection(locator.getPort()) + .withCacheSetup(cf -> cf.setPoolRetryAttempts(retryAttempts) + .setPoolPRSingleHopEnabled(false) + .setPoolSocketConnectTimeout(connectTimeout))); + } + + private MemberVM startServer(final int vmIndex, int maxThreads) { + return clusterStartupRule.startServerVM( + vmIndex, + cacheRule -> cacheRule + .withProperty(SERIALIZABLE_OBJECT_FILTER, + "org.apache.geode.internal.cache.execute.GenericDUnitTest*") + // .withMaxThreads(maxThreads) + .withConnectionToLocator(locator.getPort())); + } + + private void createClientRegion() { + ClusterStartupRule.getClientCache().createClientRegionFactory(CACHING_PROXY).create(regionName); + } + + static class TestFunction implements Function, Serializable { + public TestFunction() { + super(); + } + + @Override + public void execute(FunctionContext context) { + final Object[] args = context.getArguments(); + if (args.length < 2) { + throw new IllegalStateException( + "Arguments length does not match required length."); + } + Object value = null; + if (args.length == 4) { + value = args[3]; + } + boolean useTransaction = (boolean) args[0]; + Integer invocation = (Integer) args[1]; + Object key = args[2]; + + RegionFunctionContext regionFunctionContext = (RegionFunctionContext) context; + Region region = regionFunctionContext.getDataSet(); + Object result = null; + try { + CacheTransactionManager txManager = + context.getCache().getCacheTransactionManager(); + if (useTransaction) { + txManager.begin(); + } + try { + logger.info("toberal inv: {} before getting/putting: {}", invocation, key); + if (value != null) { + result = region.put(key, value); + } else { + result = region.get(key); + // Thread.sleep(100); + } + logger.info("toberal inv: {} after getting/putting: {}", invocation, key); + if (useTransaction) { + logger.info("toberal inv: {} before committing: {}", invocation, key); + txManager.commit(); + logger.info("toberal inv: {} after committing: {}", invocation, key); + } + } catch (CommitConflictException conflict) { + logger.info("toberal inv: {} CommitConflictException: {}, invocation, key: {}", + invocation, conflict, key); + // ... do necessary work for a transaction that failed on commit + } catch (Exception e) { + logger.info("toberal inv: {} Exception: {}, invocation, key: {}", invocation, e, key, e); + // ... do necessary work for a transaction that failed on commit + } finally { + if (txManager.exists()) { + logger.info("toberal inv: {} rolling-back: {}, invocation, key: {}", invocation, key); + txManager.rollback(); + } else { + logger.info("toberal inv: {} not rolling-back: {}, invocation, key: {}", invocation, + key); + } + } + } catch (Exception e) { + context.getResultSender().lastResult(e); + logger.info("toberal inv: {} after returning last result with exception: {}", invocation, + e); + } + context.getResultSender().lastResult(result); + logger.info("toberal inv: {} after returning last result: {}", invocation, result); + } + + public static final String ID = TestFunction.class.getName(); + + private static final Logger logger = LogService.getLogger(); + + @Override + public String getId() { + return ID; + } + + @Override + public boolean hasResult() { + return true; + } + + @Override + public boolean isHA() { + return false; + } + + @Override + public boolean optimizeForWrite() { + return true; + } + } + + + public static class MyMessageObserver extends DistributionMessageObserver { + private static final Logger logger = LogService.getLogger(); + + public void beforeProcessMessage(ClusterDistributionManager dm, DistributionMessage message) { + logger.info("toberal beforeProcessMessage dm: {}, message: {}, sender: {}", dm, message, + message.getSender()); + } + + /** + * Called after the process method of the DistributionMessage is called + * + * @param dm the distribution manager that received the message + * @param message The message itself + */ + public void afterProcessMessage(ClusterDistributionManager dm, DistributionMessage message) { + logger.info("toberal afterProcessMessage dm: {}, message: {}", dm, message); + } + + /** + * Called just before a message is distributed. + * + * @param dm the distribution manager that's sending the message + * @param message the message itself + */ + public void beforeSendMessage(ClusterDistributionManager dm, DistributionMessage message) { + logger.info("toberal beforeSendMessage dm: {}, message: {}", dm, message, new Exception()); + } + } + + public static class ThreadLimitingProxyClientRequestObserver + implements ProxyClientRequestObserver { + private final int maxThreadsToDestination; + + public ThreadLimitingProxyClientRequestObserver(int maxThreadsToDestination) { + this.maxThreadsToDestination = maxThreadsToDestination; + } + + private static final Logger logger = LogService.getLogger(); + private final Map threadsToDestination = + new ConcurrentHashMap(); + + @Override + public void beforeSendRequest(Set members) { + logger.info( + "toberal beforeSendRequest members: {}, threadsToDestination: size:{}, contents:{}", + members, threadsToDestination.size(), threadsToDestination); + if (Thread.currentThread().getName().startsWith("ServerConnection on") + || Thread.currentThread().getName().startsWith("Function Execution Processor")) { + for (InternalDistributedMember member : members) { + if (threadsToDestination.getOrDefault(member, 0) >= maxThreadsToDestination) { + logger.info("toberal Max number of threads reached for " + member, new Exception("kk")); + throw new IllegalStateException("Max number of threads reached"); + } + } + for (InternalDistributedMember member : members) { + threadsToDestination.merge(member, 1, Integer::sum); + } + } + } + + @Override + public void afterReceiveResponse(Set members) { + logger.info( + "toberal afterReceiveResponse removing members: {}, threadsToDestination: size:{}, contents:{}", + members, threadsToDestination.size(), threadsToDestination); + for (InternalDistributedMember member : members) { + if (Thread.currentThread().getName().startsWith("ServerConnection on") + || Thread.currentThread().getName().startsWith("Function Execution Processor")) { + threadsToDestination.merge(member, -1, Integer::sum); + logger.info( + "toberal afterReceiveResponse removing member: {}, threadsToDestination: size:{}, contents:{}", + member, threadsToDestination.size(), threadsToDestination); + } + } + } + } +} diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java index e14a159fc26c..193c53a04f9e 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java @@ -2979,201 +2979,215 @@ private boolean putInBucket(final InternalDistributedMember targetNode, final In logger.debug("putInBucket: {} ({}) to {} to bucketId={} retry={} ms", event.getKey(), event.getKey().hashCode(), targetNode, bucketStringForLogs(bucketId), retryTimeout); } - // retry the put remotely until it finds the right node managing the bucket - RetryTimeKeeper retryTime = null; - boolean result = false; - InternalDistributedMember currentTarget = targetNode; - long timeOut = 0; - int count = 0; - for (;;) { - switch (count) { - case 0: - // Note we don't check for DM cancellation in common case. - // First time. Assume success, keep going. - break; - case 1: - cache.getCancelCriterion().checkCancelInProgress(null); - // Second time (first failure). Calculate timeout and keep going. - timeOut = System.currentTimeMillis() + retryTimeout; - break; - default: - cache.getCancelCriterion().checkCancelInProgress(null); - // test for timeout - long timeLeft = timeOut - System.currentTimeMillis(); - if (timeLeft < 0) { - PRHARedundancyProvider.timedOut(this, null, null, "update an entry", retryTimeout); - // NOTREACHED + ProxyClientRequestObserver observer = ProxyClientRequestObserverHolder.getInstance(); + Set bucketOwners = null; + try { + if (observer != null) { + bucketOwners = getRegionAdvisor().getBucketOwners(bucketId); + observer.beforeSendRequest(bucketOwners); + } + + // retry the put remotely until it finds the right node managing the bucket + + RetryTimeKeeper retryTime = null; + boolean result = false; + InternalDistributedMember currentTarget = targetNode; + long timeOut = 0; + int count = 0; + for (;;) { + switch (count) { + case 0: + // Note we don't check for DM cancellation in common case. + // First time. Assume success, keep going. + break; + case 1: + cache.getCancelCriterion().checkCancelInProgress(null); + // Second time (first failure). Calculate timeout and keep going. + timeOut = System.currentTimeMillis() + retryTimeout; + break; + default: + cache.getCancelCriterion().checkCancelInProgress(null); + // test for timeout + long timeLeft = timeOut - System.currentTimeMillis(); + if (timeLeft < 0) { + PRHARedundancyProvider.timedOut(this, null, null, "update an entry", retryTimeout); + // NOTREACHED + } + + // Didn't time out. Sleep a bit and then continue + boolean interrupted = Thread.interrupted(); + try { + Thread.sleep(PartitionedRegionHelper.DEFAULT_WAIT_PER_RETRY_ITERATION); + } catch (InterruptedException ignore) { + interrupted = true; + } finally { + if (interrupted) { + Thread.currentThread().interrupt(); + } + } + break; + } // switch + count++; + + if (currentTarget == null) { // pick target + checkReadiness(); + if (retryTime == null) { + retryTime = new RetryTimeKeeper(retryTimeout); } + currentTarget = waitForNodeOrCreateBucket(retryTime, event, bucketId); - // Didn't time out. Sleep a bit and then continue - boolean interrupted = Thread.interrupted(); - try { - Thread.sleep(PartitionedRegionHelper.DEFAULT_WAIT_PER_RETRY_ITERATION); - } catch (InterruptedException ignore) { - interrupted = true; - } finally { - if (interrupted) { - Thread.currentThread().interrupt(); + // It's possible this is a GemFire thread e.g. ServerConnection + // which got to this point because of a distributed system shutdown or + // region closure which uses interrupt to break any sleep() or wait() calls + // e.g. waitForPrimary or waitForBucketRecovery in which case throw exception + checkShutdown(); + continue; + } // pick target + + try { + final boolean isLocal = (localMaxMemory > 0) && currentTarget.equals(getMyId()); + if (logger.isDebugEnabled()) { + logger.debug("putInBucket: currentTarget = {}; ifNew = {}; ifOld = {}; isLocal = {}", + currentTarget, ifNew, ifOld, isLocal); + } + checkIfAboveThreshold(event); + if (isLocal) { + event.setInvokePRCallbacks(true); + long start = prStats.startPutLocal(); + try { + final BucketRegion br = + dataStore.getInitializedBucketForId(event.getKey(), bucketId); + // Local updates should insert a serialized (aka CacheDeserializable) object + // given that most manipulation of values is remote (requiring serialization to send). + // But... function execution always implies local manipulation of + // values so keeping locally updated values in Object form should be more efficient. + if (!FunctionExecutionPooledExecutor.isFunctionExecutionThread()) { + // TODO: this condition may not help since BucketRegion.virtualPut calls + // forceSerialized + br.forceSerialized(event); + } + if (ifNew) { + result = dataStore.createLocally(br, event, ifNew, ifOld, requireOldValue, + lastModified); + } else { + result = dataStore.putLocally(br, event, ifNew, ifOld, expectedOldValue, + requireOldValue, lastModified); + } + } finally { + prStats.endPutLocal(start); + } + } // local + else { // remote + // no need to perform early serialization (and create an un-necessary byte array) + // sending the message performs that work. + long start = prStats.startPutRemote(); + try { + if (ifNew) { + result = createRemotely(currentTarget, bucketId, event, requireOldValue); + } else { + result = putRemotely(currentTarget, event, ifNew, ifOld, expectedOldValue, + requireOldValue); + if (!requireOldValue) { + // make sure old value is set to NOT_AVAILABLE token + event.oldValueNotAvailable(); + } + } + } finally { + prStats.endPutRemote(start); } + } // remote + + if (!result && !ifOld && !ifNew) { + Assert.assertTrue(!isLocal); + ForceReattemptException fre = new ForceReattemptException( + "false result when !ifNew and !ifOld is unacceptable - retrying"); + fre.setHash(event.getKey().hashCode()); + throw fre; } - break; - } // switch - count++; - if (currentTarget == null) { // pick target - checkReadiness(); - if (retryTime == null) { - retryTime = new RetryTimeKeeper(retryTimeout); + return result; + } catch (ConcurrentCacheModificationException e) { + if (logger.isDebugEnabled()) { + logger.debug("putInBucket: caught concurrent cache modification exception", e); + } + event.isConcurrencyConflict(true); + + if (logger.isTraceEnabled()) { + logger.trace( + "ConcurrentCacheModificationException received for putInBucket for bucketId: {}{}{} for event: {} No reattampt is done, returning from here", + getPRId(), BUCKET_ID_SEPARATOR, bucketId, event); + } + return result; + } catch (ForceReattemptException prce) { + prce.checkKey(event.getKey()); + if (logger.isDebugEnabled()) { + logger.debug( + "putInBucket: Got ForceReattemptException for {} on VM {} for node {}{}{} for bucket = {}", + this, getMyId(), currentTarget, getPRId(), BUCKET_ID_SEPARATOR, bucketId, prce); + logger.debug("putInBucket: count={}", count); + } + checkReadiness(); + InternalDistributedMember lastTarget = currentTarget; + if (retryTime == null) { + retryTime = new RetryTimeKeeper(retryTimeout); + } + currentTarget = getNodeForBucketWrite(bucketId, retryTime); + if (lastTarget.equals(currentTarget)) { + if (retryTime.overMaximum()) { + PRHARedundancyProvider.timedOut(this, null, null, "update an entry", retryTimeout); + // NOTREACHED + } + retryTime.waitToRetryNode(); + } + event.setPossibleDuplicate(true); + } catch (PrimaryBucketException notPrimary) { + if (logger.isDebugEnabled()) { + logger.debug("Bucket {} on Node {} not primary", notPrimary.getLocalizedMessage(), + currentTarget); + } + getRegionAdvisor().notPrimary(bucketId, currentTarget); + if (retryTime == null) { + retryTime = new RetryTimeKeeper(retryTimeout); + } + currentTarget = getNodeForBucketWrite(bucketId, retryTime); } - currentTarget = waitForNodeOrCreateBucket(retryTime, event, bucketId); // It's possible this is a GemFire thread e.g. ServerConnection // which got to this point because of a distributed system shutdown or - // region closure which uses interrupt to break any sleep() or wait() calls - // e.g. waitForPrimary or waitForBucketRecovery in which case throw exception + // region closure which uses interrupt to break any sleep() or wait() + // calls + // e.g. waitForPrimary or waitForBucketRecovery in which case throw + // exception checkShutdown(); - continue; - } // pick target - try { - final boolean isLocal = (localMaxMemory > 0) && currentTarget.equals(getMyId()); - if (logger.isDebugEnabled()) { - logger.debug("putInBucket: currentTarget = {}; ifNew = {}; ifOld = {}; isLocal = {}", - currentTarget, ifNew, ifOld, isLocal); - } - checkIfAboveThreshold(event); - if (isLocal) { - event.setInvokePRCallbacks(true); - long start = prStats.startPutLocal(); - try { - final BucketRegion br = - dataStore.getInitializedBucketForId(event.getKey(), bucketId); - // Local updates should insert a serialized (aka CacheDeserializable) object - // given that most manipulation of values is remote (requiring serialization to send). - // But... function execution always implies local manipulation of - // values so keeping locally updated values in Object form should be more efficient. - if (!FunctionExecutionPooledExecutor.isFunctionExecutionThread()) { - // TODO: this condition may not help since BucketRegion.virtualPut calls - // forceSerialized - br.forceSerialized(event); - } - if (ifNew) { - result = dataStore.createLocally(br, event, ifNew, ifOld, requireOldValue, - lastModified); - } else { - result = dataStore.putLocally(br, event, ifNew, ifOld, expectedOldValue, - requireOldValue, lastModified); - } - } finally { - prStats.endPutLocal(start); - } - } // local - else { // remote - // no need to perform early serialization (and create an un-necessary byte array) - // sending the message performs that work. - long start = prStats.startPutRemote(); - try { - if (ifNew) { - result = createRemotely(currentTarget, bucketId, event, requireOldValue); - } else { - result = putRemotely(currentTarget, event, ifNew, ifOld, expectedOldValue, - requireOldValue); - if (!requireOldValue) { - // make sure old value is set to NOT_AVAILABLE token - event.oldValueNotAvailable(); - } - } - } finally { - prStats.endPutRemote(start); + // If we get here, the attempt failed... + if (count == 1) { + if (ifNew) { + prStats.incCreateOpsRetried(); + } else { + prStats.incPutOpsRetried(); } - } // remote - - if (!result && !ifOld && !ifNew) { - Assert.assertTrue(!isLocal); - ForceReattemptException fre = new ForceReattemptException( - "false result when !ifNew and !ifOld is unacceptable - retrying"); - fre.setHash(event.getKey().hashCode()); - throw fre; } - - return result; - } catch (ConcurrentCacheModificationException e) { - if (logger.isDebugEnabled()) { - logger.debug("putInBucket: caught concurrent cache modification exception", e); + if (event.getOperation().isCreate()) { + prStats.incCreateRetries(); + } else { + prStats.incPutRetries(); } - event.isConcurrencyConflict(true); - if (logger.isTraceEnabled()) { - logger.trace( - "ConcurrentCacheModificationException received for putInBucket for bucketId: {}{}{} for event: {} No reattampt is done, returning from here", - getPRId(), BUCKET_ID_SEPARATOR, bucketId, event); - } - return result; - } catch (ForceReattemptException prce) { - prce.checkKey(event.getKey()); if (logger.isDebugEnabled()) { logger.debug( - "putInBucket: Got ForceReattemptException for {} on VM {} for node {}{}{} for bucket = {}", - this, getMyId(), currentTarget, getPRId(), BUCKET_ID_SEPARATOR, bucketId, prce); - logger.debug("putInBucket: count={}", count); - } - checkReadiness(); - InternalDistributedMember lastTarget = currentTarget; - if (retryTime == null) { - retryTime = new RetryTimeKeeper(retryTimeout); - } - currentTarget = getNodeForBucketWrite(bucketId, retryTime); - if (lastTarget.equals(currentTarget)) { - if (retryTime.overMaximum()) { - PRHARedundancyProvider.timedOut(this, null, null, "update an entry", retryTimeout); - // NOTREACHED - } - retryTime.waitToRetryNode(); - } - event.setPossibleDuplicate(true); - } catch (PrimaryBucketException notPrimary) { - if (logger.isDebugEnabled()) { - logger.debug("Bucket {} on Node {} not primary", notPrimary.getLocalizedMessage(), + "putInBucket for bucketId = {} failed (attempt # {} ({} ms left), retrying with node {}", + bucketStringForLogs(bucketId), count, (timeOut - System.currentTimeMillis()), currentTarget); } - getRegionAdvisor().notPrimary(bucketId, currentTarget); - if (retryTime == null) { - retryTime = new RetryTimeKeeper(retryTimeout); - } - currentTarget = getNodeForBucketWrite(bucketId, retryTime); - } - - // It's possible this is a GemFire thread e.g. ServerConnection - // which got to this point because of a distributed system shutdown or - // region closure which uses interrupt to break any sleep() or wait() - // calls - // e.g. waitForPrimary or waitForBucketRecovery in which case throw - // exception - checkShutdown(); - - // If we get here, the attempt failed... - if (count == 1) { - if (ifNew) { - prStats.incCreateOpsRetried(); - } else { - prStats.incPutOpsRetried(); - } - } - if (event.getOperation().isCreate()) { - prStats.incCreateRetries(); - } else { - prStats.incPutRetries(); - } + } // for - if (logger.isDebugEnabled()) { - logger.debug( - "putInBucket for bucketId = {} failed (attempt # {} ({} ms left), retrying with node {}", - bucketStringForLogs(bucketId), count, (timeOut - System.currentTimeMillis()), - currentTarget); + } finally { + if (observer != null) { + observer.afterReceiveResponse(bucketOwners); } - } // for - + } // NOTREACHED } @@ -4173,8 +4187,20 @@ private Object getFromBucket(final InternalDistributedMember targetNode, int buc } } - obj = getRemotely(retryNode, bucketId, key, aCallbackArgument, preferCD, requestingClient, - clientEvent, returnTombstones); + // toberal + ProxyClientRequestObserver observer = ProxyClientRequestObserverHolder.getInstance();; + try { + if (observer != null) { + observer.beforeSendRequest(Collections.singleton(retryNode)); + } + obj = + getRemotely(retryNode, bucketId, key, aCallbackArgument, preferCD, requestingClient, + clientEvent, returnTombstones); + } finally { + if (observer != null) { + observer.afterReceiveResponse(Collections.singleton(retryNode)); + } + } // TODO: there should be better way than this one String name = Thread.currentThread().getName(); @@ -4886,23 +4912,35 @@ public Object getRemotely(InternalDistributedMember targetNode, int bucketId, fi private ResultCollector executeFunctionOnRemoteNode(InternalDistributedMember targetNode, final Function function, final Object object, final Set routingKeys, ResultCollector rc, int[] bucketArray, ServerToClientFunctionResultSender sender, AbstractExecution execution) { - PartitionedRegionFunctionResultSender resultSender = - new PartitionedRegionFunctionResultSender(null, this, 0, rc, sender, false, true, - execution.isForwardExceptions(), function, bucketArray); - PartitionedRegionFunctionResultWaiter resultReceiver = - new PartitionedRegionFunctionResultWaiter(getSystem(), getPRId(), rc, function, - resultSender); + ProxyClientRequestObserver observer = ProxyClientRequestObserverHolder.getInstance(); + if (observer != null) { + observer.beforeSendRequest(Collections.singleton(targetNode)); + } - FunctionRemoteContext context = new FunctionRemoteContext(function, object, routingKeys, - bucketArray, execution.isReExecute(), execution.isFnSerializationReqd(), getPrincipal()); + try { + PartitionedRegionFunctionResultSender resultSender = + new PartitionedRegionFunctionResultSender(null, this, 0, rc, sender, false, true, + execution.isForwardExceptions(), function, bucketArray); - HashMap recipMap = - new HashMap<>(); + PartitionedRegionFunctionResultWaiter resultReceiver = + new PartitionedRegionFunctionResultWaiter(getSystem(), getPRId(), rc, function, + resultSender); - recipMap.put(targetNode, context); + FunctionRemoteContext context = new FunctionRemoteContext(function, object, routingKeys, + bucketArray, execution.isReExecute(), execution.isFnSerializationReqd(), getPrincipal()); - return resultReceiver.getPartitionedDataFrom(recipMap, this, execution); + HashMap recipMap = + new HashMap<>(); + + recipMap.put(targetNode, context); + + return resultReceiver.getPartitionedDataFrom(recipMap, this, execution); + } finally { + if (observer != null) { + observer.afterReceiveResponse(Collections.singleton(targetNode)); + } + } } /** diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionDataStore.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionDataStore.java index 7bfb4f17f199..76e11a06fc6c 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionDataStore.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionDataStore.java @@ -2952,6 +2952,10 @@ public void executeOnDataStore(final Set localKeys, final Function function, fin FunctionStats stats = FunctionStatsManager.getFunctionStats(function.getId(), dm.getSystem()); long start = stats.startFunctionExecution(function.hasResult()); try { + logger.info("toberal Executing Function: {} on Remote Node with context: {}, bucketArray: {}", + function.getId(), + prContext, bucketArray); + if (logger.isDebugEnabled()) { logger.debug("Executing Function:{} on Remote Node with context:{}", function.getId(), prContext); diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/ProxyClientRequestObserver.java b/geode-core/src/main/java/org/apache/geode/internal/cache/ProxyClientRequestObserver.java new file mode 100644 index 000000000000..58d6dcc934ec --- /dev/null +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/ProxyClientRequestObserver.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.internal.cache; + +import java.util.Set; + +import org.apache.geode.distributed.internal.membership.InternalDistributedMember; + +public interface ProxyClientRequestObserver { + void beforeSendRequest(Set members); + + void afterReceiveResponse(Set members); + +} diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/ProxyClientRequestObserverAdapter.java b/geode-core/src/main/java/org/apache/geode/internal/cache/ProxyClientRequestObserverAdapter.java new file mode 100644 index 000000000000..173d3d524b69 --- /dev/null +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/ProxyClientRequestObserverAdapter.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.internal.cache; + +import java.util.Set; + +import org.apache.geode.distributed.internal.membership.InternalDistributedMember; + +public class ProxyClientRequestObserverAdapter implements ProxyClientRequestObserver { + @Override + public void beforeSendRequest(Set members) {} + + @Override + public void afterReceiveResponse(Set members) {} +} diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/ProxyClientRequestObserverHolder.java b/geode-core/src/main/java/org/apache/geode/internal/cache/ProxyClientRequestObserverHolder.java new file mode 100644 index 000000000000..6e6eb548015c --- /dev/null +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/ProxyClientRequestObserverHolder.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.internal.cache; + +import java.util.concurrent.atomic.AtomicReference; + +import org.jetbrains.annotations.NotNull; + +import org.apache.geode.annotations.Immutable; +import org.apache.geode.annotations.internal.MakeNotStatic; + +/** + * This class is intended to hold a single 'observer' which will receive callbacks from the + * Distributed/Local Region when events like clear take place. There can be only one such observer + * at a time. If no observer is needed, this member variable should point to an object with + * 'do-nothing' methods, such as PutGetMessageObserverAdapter. + * + * Code which wishes to observe events during Region clear should do so using the following + * technique: + * + * class MyPutGetMessageObserver extends PutGetMessageObserverAdapter { // ... override methods of + * interest ... } + * + * PutGetMessageObserver old = PutGetMessageObserverHolder.setInstance(new + * MyPutGetMessageObserver()); + * org.apache.geode.internal.cache.LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER=true; + * + * try { Call region methods here } finally { // reset to the original PutGetMessageObserver. + * PutGetMessageObserverHolder.setInstance(old); } + * + * The Region code will call methods on this static member using the following technique: + * + * PutGetMessageObserver observer = PutGetMessageObserverHolder.getInstance(); try { + * observer.startMethod(arguments); doSomething(); } finally { observer.stopMethod(arguments); } + * + */ +public class ProxyClientRequestObserverHolder { + + /** + * The default 'do-nothing' observer * + */ + @Immutable + private static final ProxyClientRequestObserver NO_OBSERVER = + new ProxyClientRequestObserverAdapter(); + /** + * The current observer which will be notified of all query events. + */ + @MakeNotStatic + private static final AtomicReference _instance = + new AtomicReference<>(NO_OBSERVER); + + /** + * Set the given observer to be notified of query events. Returns the current observer. + */ + public static ProxyClientRequestObserver setInstance( + @NotNull ProxyClientRequestObserver observer) { + return _instance.getAndSet(observer); + } + + /** Return the current observer instance */ + public static ProxyClientRequestObserver getInstance() { + return _instance.get(); + } +} diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/TXCommitMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/TXCommitMessage.java index 2d1b1a864623..e7f091cb3696 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/TXCommitMessage.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/TXCommitMessage.java @@ -327,6 +327,7 @@ void send(TXLockId lockId) { } return; } + Assert.assertTrue(txState != null, "Send must have transaction state."); this.lockId = (TXLockIdImpl) lockId; updateLockMembers(); @@ -335,18 +336,7 @@ void send(TXLockId lockId) { IdentityHashMap> distMap = new IdentityHashMap<>(); HashSet ackReceivers = null; - for (final Map.Entry entry : msgMap.entrySet()) { - final RegionCommitList rcl = entry.getValue(); - if (rcl.getNeedsAck()) { - if (ackReceivers == null) { - ackReceivers = new HashSet<>(); - } - ackReceivers.add(entry.getKey()); - } - final Set receivers = - distMap.computeIfAbsent(rcl, k -> new HashSet<>()); - receivers.add(entry.getKey()); - } + ackReceivers = getAckReceivers(distMap, ackReceivers); CommitReplyProcessor processor = null; { @@ -422,6 +412,41 @@ void send(TXLockId lockId) { } } + Set getTxCommitRecipients() { + IdentityHashMap> distMap = + new IdentityHashMap<>(); + + getAckReceivers(distMap, null); + Set recipients = new HashSet<>(); + for (final Map.Entry> me : distMap + .entrySet()) { + recipients.addAll(me.getValue()); + } + recipients.retainAll(dm.getDistributionManagerIds()); + return recipients; + } + + private HashSet getAckReceivers( + IdentityHashMap> distMap, + HashSet ackReceivers) { + if (msgMap == null) { + return null; + } + for (final Map.Entry entry : msgMap.entrySet()) { + final RegionCommitList rcl = entry.getValue(); + if (rcl.getNeedsAck()) { + if (ackReceivers == null) { + ackReceivers = new HashSet<>(); + } + ackReceivers.add(entry.getKey()); + } + final Set receivers = + distMap.computeIfAbsent(rcl, k -> new HashSet<>()); + receivers.add(entry.getKey()); + } + return ackReceivers; + } + @Override public boolean containsRegionContentChange() { return true; diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java b/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java index 58828b4b6550..71c025795410 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java @@ -512,7 +512,20 @@ public void commit() throws CommitConflictException { lockTXRegions(regions); + ProxyClientRequestObserver observer = ProxyClientRequestObserverHolder.getInstance(); + msg = buildMessage(); + Set txCommitRecipients = msg.getTxCommitRecipients(); try { + if (observer != null) { + try { + observer.beforeSendRequest(txCommitRecipients); + } catch (Exception e) { + SystemFailure.checkFailure(); + logger.error("toberal throwing exception {} before committing for tx: {}", e, this); + throw e; + } + } + // apply changes to the cache applyChanges(entries); // For internal testing @@ -523,7 +536,6 @@ public void commit() throws CommitConflictException { attachFilterProfileInformation(entries); // build and send the message - msg = buildMessage(); commitMessage = msg; if (internalBeforeSend != null) { internalBeforeSend.run(); @@ -542,6 +554,9 @@ public void commit() throws CommitConflictException { */ commitMessage = buildCompleteMessage(); } finally { + if (observer != null) { + observer.afterReceiveResponse(txCommitRecipients); + } unlockTXRegions(regions); } } finally { diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/TXStateTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/TXStateTest.java index 8ea970d3ed8a..186b29a1e3e6 100644 --- a/geode-core/src/test/java/org/apache/geode/internal/cache/TXStateTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/TXStateTest.java @@ -110,9 +110,9 @@ public void attachFilterProfileAfterApplyingChanges() { txState.commit(); InOrder inOrder = inOrder(txState, txCommitMessage); + inOrder.verify(txState).buildMessage(); inOrder.verify(txState).applyChanges(any()); inOrder.verify(txState).attachFilterProfileInformation(any()); - inOrder.verify(txState).buildMessage(); inOrder.verify(txCommitMessage).send(any()); inOrder.verify(txState).firePendingCallbacks(); } From bfdc7c9302f155c12b51c3debb2ea8fffbcb7d81 Mon Sep 17 00:00:00 2001 From: Alberto Gomez Date: Wed, 28 Sep 2022 14:40:55 +0200 Subject: [PATCH 2/4] Added counter and made observer public --- .../internal/cache/execute/GenericDUnitTest.java | 15 +++++++++------ .../cache/ProxyClientRequestObserver.java | 2 +- .../cache/ProxyClientRequestObserverAdapter.java | 2 +- .../cache/ProxyClientRequestObserverHolder.java | 2 +- .../apache/geode/cache/server/CacheServer.java | 5 +++++ .../geode/internal/cache/CacheServerImpl.java | 12 ++++++++++++ .../geode/internal/cache/PartitionedRegion.java | 5 +++-- .../cache/PartitionedRegionDataStore.java | 3 --- .../org/apache/geode/internal/cache/TXState.java | 2 ++ .../geode/management/CacheServerMXBean.java | 1 + .../internal/beans/CacheServerBridge.java | 3 +++ .../internal/beans/CacheServerMBean.java | 5 +++++ 12 files changed, 43 insertions(+), 14 deletions(-) rename geode-core/src/main/java/org/apache/geode/{internal => }/cache/ProxyClientRequestObserver.java (96%) rename geode-core/src/main/java/org/apache/geode/{internal => }/cache/ProxyClientRequestObserverAdapter.java (96%) rename geode-core/src/main/java/org/apache/geode/{internal => }/cache/ProxyClientRequestObserverHolder.java (98%) diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/execute/GenericDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/execute/GenericDUnitTest.java index 25e75f6967eb..cf43029f2562 100644 --- a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/execute/GenericDUnitTest.java +++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/execute/GenericDUnitTest.java @@ -38,9 +38,12 @@ import org.junit.experimental.categories.Category; import org.junit.runner.RunWith; +import org.apache.geode.cache.CacheFactory; import org.apache.geode.cache.CacheTransactionManager; import org.apache.geode.cache.CommitConflictException; import org.apache.geode.cache.PartitionAttributesFactory; +import org.apache.geode.cache.ProxyClientRequestObserver; +import org.apache.geode.cache.ProxyClientRequestObserverHolder; import org.apache.geode.cache.Region; import org.apache.geode.cache.client.ClientCache; import org.apache.geode.cache.execute.Execution; @@ -56,8 +59,6 @@ import org.apache.geode.distributed.internal.DistributionMessage; import org.apache.geode.distributed.internal.DistributionMessageObserver; import org.apache.geode.distributed.internal.membership.InternalDistributedMember; -import org.apache.geode.internal.cache.ProxyClientRequestObserver; -import org.apache.geode.internal.cache.ProxyClientRequestObserverHolder; import org.apache.geode.logging.internal.log4j.api.LogService; import org.apache.geode.test.awaitility.GeodeAwaitility; import org.apache.geode.test.dunit.AsyncInvocation; @@ -77,7 +78,7 @@ @RunWith(GeodeParamsRunner.class) public class GenericDUnitTest implements Serializable { - private static final int MAX_THREADS = 8; + private static final int MAX_THREADS = 4; private static final String regionName = "GenericDUnitTest"; @@ -105,7 +106,6 @@ public class GenericDUnitTest implements Serializable { @Before public void setUp() throws Exception { locator = clusterStartupRule.startLocatorVM(0); - server1 = startServer(1, MAX_THREADS); } @After @@ -280,8 +280,10 @@ private Object executePutFunction(Function function, String regionName, Object k Object[] args = {useTransaction, invocation, key, value}; Set filter = new HashSet(); filter.add(key); - // without filter, results are weird. Check, toberal + // without filter, results are weird. Check, toberal. + // I have retested without filter and have not seen anything strange. resultCollector = + // execution.setArguments(args).execute(function.getId()); execution.setArguments(args).withFilter(filter).execute(function.getId()); Object result = resultCollector.getResult(); return result; @@ -344,7 +346,7 @@ private MemberVM startServer(final int vmIndex, int maxThreads) { cacheRule -> cacheRule .withProperty(SERIALIZABLE_OBJECT_FILTER, "org.apache.geode.internal.cache.execute.GenericDUnitTest*") - // .withMaxThreads(maxThreads) + .withMaxThreads(maxThreads) .withConnectionToLocator(locator.getPort())); } @@ -496,6 +498,7 @@ public void beforeSendRequest(Set members) { || Thread.currentThread().getName().startsWith("Function Execution Processor")) { for (InternalDistributedMember member : members) { if (threadsToDestination.getOrDefault(member, 0) >= maxThreadsToDestination) { + CacheFactory.getAnyInstance().getCacheServers().get(0).incRejectedProxyRequests(); logger.info("toberal Max number of threads reached for " + member, new Exception("kk")); throw new IllegalStateException("Max number of threads reached"); } diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/ProxyClientRequestObserver.java b/geode-core/src/main/java/org/apache/geode/cache/ProxyClientRequestObserver.java similarity index 96% rename from geode-core/src/main/java/org/apache/geode/internal/cache/ProxyClientRequestObserver.java rename to geode-core/src/main/java/org/apache/geode/cache/ProxyClientRequestObserver.java index 58d6dcc934ec..995b5ec0cc9a 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/ProxyClientRequestObserver.java +++ b/geode-core/src/main/java/org/apache/geode/cache/ProxyClientRequestObserver.java @@ -12,7 +12,7 @@ * or implied. See the License for the specific language governing permissions and limitations under * the License. */ -package org.apache.geode.internal.cache; +package org.apache.geode.cache; import java.util.Set; diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/ProxyClientRequestObserverAdapter.java b/geode-core/src/main/java/org/apache/geode/cache/ProxyClientRequestObserverAdapter.java similarity index 96% rename from geode-core/src/main/java/org/apache/geode/internal/cache/ProxyClientRequestObserverAdapter.java rename to geode-core/src/main/java/org/apache/geode/cache/ProxyClientRequestObserverAdapter.java index 173d3d524b69..9dc1eddffbf7 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/ProxyClientRequestObserverAdapter.java +++ b/geode-core/src/main/java/org/apache/geode/cache/ProxyClientRequestObserverAdapter.java @@ -12,7 +12,7 @@ * or implied. See the License for the specific language governing permissions and limitations under * the License. */ -package org.apache.geode.internal.cache; +package org.apache.geode.cache; import java.util.Set; diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/ProxyClientRequestObserverHolder.java b/geode-core/src/main/java/org/apache/geode/cache/ProxyClientRequestObserverHolder.java similarity index 98% rename from geode-core/src/main/java/org/apache/geode/internal/cache/ProxyClientRequestObserverHolder.java rename to geode-core/src/main/java/org/apache/geode/cache/ProxyClientRequestObserverHolder.java index 6e6eb548015c..5be2424f9e4b 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/ProxyClientRequestObserverHolder.java +++ b/geode-core/src/main/java/org/apache/geode/cache/ProxyClientRequestObserverHolder.java @@ -12,7 +12,7 @@ * or implied. See the License for the specific language governing permissions and limitations under * the License. */ -package org.apache.geode.internal.cache; +package org.apache.geode.cache; import java.util.concurrent.atomic.AtomicReference; diff --git a/geode-core/src/main/java/org/apache/geode/cache/server/CacheServer.java b/geode-core/src/main/java/org/apache/geode/cache/server/CacheServer.java index 945e018ded7b..cb8b0e0d5802 100755 --- a/geode-core/src/main/java/org/apache/geode/cache/server/CacheServer.java +++ b/geode-core/src/main/java/org/apache/geode/cache/server/CacheServer.java @@ -519,4 +519,9 @@ public interface CacheServer { */ Set getInterestRegistrationListeners(); + default void incRejectedProxyRequests() {} + + default long getRejectedProxyRequests() { + return 0; + } } diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/CacheServerImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/CacheServerImpl.java index a921f543d706..e88f9a937302 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/CacheServerImpl.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/CacheServerImpl.java @@ -24,6 +24,7 @@ import java.util.HashSet; import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import java.util.function.Function; import java.util.function.Supplier; @@ -135,6 +136,7 @@ public class CacheServerImpl extends AbstractCacheServer implements Distribution private final ClientHealthMonitorProvider clientHealthMonitorProvider; private final Function cacheServerAdvisorProvider; + private final AtomicLong rejectedProxyRequests = new AtomicLong(); public static final boolean ENABLE_NOTIFY_BY_SUBSCRIPTION_FALSE = Boolean.getBoolean( GeodeGlossary.GEMFIRE_PREFIX + "cache-server.enable-notify-by-subscription-false"); @@ -839,4 +841,14 @@ public CacheClientNotifierProvider getCacheClientNotifierProvider() { public ClientHealthMonitorProvider getClientHealthMonitorProvider() { return clientHealthMonitorProvider; } + + @Override + public long getRejectedProxyRequests() { + return rejectedProxyRequests.get(); + } + + @Override + public void incRejectedProxyRequests() { + rejectedProxyRequests.incrementAndGet(); + } } diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java index 193c53a04f9e..1b9b0398e5a0 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java @@ -83,6 +83,8 @@ import org.apache.geode.cache.PartitionResolver; import org.apache.geode.cache.PartitionedRegionDistributionException; import org.apache.geode.cache.PartitionedRegionStorageException; +import org.apache.geode.cache.ProxyClientRequestObserver; +import org.apache.geode.cache.ProxyClientRequestObserverHolder; import org.apache.geode.cache.Region; import org.apache.geode.cache.RegionAttributes; import org.apache.geode.cache.RegionDestroyedException; @@ -4187,8 +4189,7 @@ private Object getFromBucket(final InternalDistributedMember targetNode, int buc } } - // toberal - ProxyClientRequestObserver observer = ProxyClientRequestObserverHolder.getInstance();; + ProxyClientRequestObserver observer = ProxyClientRequestObserverHolder.getInstance(); try { if (observer != null) { observer.beforeSendRequest(Collections.singleton(retryNode)); diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionDataStore.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionDataStore.java index 76e11a06fc6c..dc091ab63e34 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionDataStore.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionDataStore.java @@ -2952,9 +2952,6 @@ public void executeOnDataStore(final Set localKeys, final Function function, fin FunctionStats stats = FunctionStatsManager.getFunctionStats(function.getId(), dm.getSystem()); long start = stats.startFunctionExecution(function.hasResult()); try { - logger.info("toberal Executing Function: {} on Remote Node with context: {}, bucketArray: {}", - function.getId(), - prContext, bucketArray); if (logger.isDebugEnabled()) { logger.debug("Executing Function:{} on Remote Node with context:{}", function.getId(), diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java b/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java index 71c025795410..d9ab3f9728ac 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java @@ -42,6 +42,8 @@ import org.apache.geode.cache.EntryNotFoundException; import org.apache.geode.cache.FailedSynchronizationException; import org.apache.geode.cache.Operation; +import org.apache.geode.cache.ProxyClientRequestObserver; +import org.apache.geode.cache.ProxyClientRequestObserverHolder; import org.apache.geode.cache.Region; import org.apache.geode.cache.Region.Entry; import org.apache.geode.cache.RegionDestroyedException; diff --git a/geode-core/src/main/java/org/apache/geode/management/CacheServerMXBean.java b/geode-core/src/main/java/org/apache/geode/management/CacheServerMXBean.java index d3b567650090..7191a6c416ca 100644 --- a/geode-core/src/main/java/org/apache/geode/management/CacheServerMXBean.java +++ b/geode-core/src/main/java/org/apache/geode/management/CacheServerMXBean.java @@ -498,4 +498,5 @@ public interface CacheServerMXBean { */ ClientQueueDetail showClientQueueDetails(String clientId) throws Exception; + long getRejectedProxyRequests(); } diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/beans/CacheServerBridge.java b/geode-core/src/main/java/org/apache/geode/management/internal/beans/CacheServerBridge.java index 281114ddb708..a5aaac808088 100644 --- a/geode-core/src/main/java/org/apache/geode/management/internal/beans/CacheServerBridge.java +++ b/geode-core/src/main/java/org/apache/geode/management/internal/beans/CacheServerBridge.java @@ -740,6 +740,9 @@ public ClientQueueDetail getClientQueueDetail(String clientId) throws Exception return null; } + public long getRejectedProxyRequests() { + return cacheServer.getRejectedProxyRequests(); + } private static String getClientIdFromCacheClientProxy(CacheClientProxy p) { if (p == null) { diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/beans/CacheServerMBean.java b/geode-core/src/main/java/org/apache/geode/management/internal/beans/CacheServerMBean.java index e372ecfe2bc7..1abc6c73fd2b 100644 --- a/geode-core/src/main/java/org/apache/geode/management/internal/beans/CacheServerMBean.java +++ b/geode-core/src/main/java/org/apache/geode/management/internal/beans/CacheServerMBean.java @@ -311,4 +311,9 @@ public ClientQueueDetail[] showClientQueueDetails() throws Exception { public ClientQueueDetail showClientQueueDetails(String clientId) throws Exception { return bridge.getClientQueueDetail(clientId); } + + @Override + public long getRejectedProxyRequests() { + return bridge.getRejectedProxyRequests(); + } } From 6322defd70e92f5d859499763c3a46408e9c669a Mon Sep 17 00:00:00 2001 From: Alberto Gomez Date: Wed, 28 Sep 2022 18:32:31 +0200 Subject: [PATCH 3/4] Add command to set thread limit and move class out of test --- .../cache/execute/GenericDUnitTest.java | 61 +--------- ...bserver.java => ProxyRequestObserver.java} | 2 +- ....java => ProxyRequestObserverAdapter.java} | 2 +- ...r.java => ProxyRequestObserverHolder.java} | 23 ++-- .../ThreadLimitingProxyRequestObserver.java | 69 +++++++++++ .../internal/cache/PartitionedRegion.java | 10 +- .../apache/geode/internal/cache/TXState.java | 6 +- .../SetThreadLimitingProxyRequestCommand.java | 110 ++++++++++++++++++ ...SetThreadLimitingProxyRequestFunction.java | 91 +++++++++++++++ ...g.springframework.shell.core.CommandMarker | 1 + 10 files changed, 297 insertions(+), 78 deletions(-) rename geode-core/src/main/java/org/apache/geode/cache/{ProxyClientRequestObserver.java => ProxyRequestObserver.java} (95%) rename geode-core/src/main/java/org/apache/geode/cache/{ProxyClientRequestObserverAdapter.java => ProxyRequestObserverAdapter.java} (92%) rename geode-core/src/main/java/org/apache/geode/cache/{ProxyClientRequestObserverHolder.java => ProxyRequestObserverHolder.java} (78%) create mode 100644 geode-core/src/main/java/org/apache/geode/cache/ThreadLimitingProxyRequestObserver.java create mode 100644 geode-gfsh/src/main/java/org/apache/geode/management/internal/cli/commands/SetThreadLimitingProxyRequestCommand.java create mode 100644 geode-gfsh/src/main/java/org/apache/geode/management/internal/cli/functions/SetThreadLimitingProxyRequestFunction.java diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/execute/GenericDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/execute/GenericDUnitTest.java index cf43029f2562..1d15380d4f80 100644 --- a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/execute/GenericDUnitTest.java +++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/execute/GenericDUnitTest.java @@ -22,9 +22,7 @@ import java.util.Arrays; import java.util.HashSet; import java.util.List; -import java.util.Map; import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; import java.util.stream.IntStream; import junitparams.Parameters; @@ -38,13 +36,12 @@ import org.junit.experimental.categories.Category; import org.junit.runner.RunWith; -import org.apache.geode.cache.CacheFactory; import org.apache.geode.cache.CacheTransactionManager; import org.apache.geode.cache.CommitConflictException; import org.apache.geode.cache.PartitionAttributesFactory; -import org.apache.geode.cache.ProxyClientRequestObserver; -import org.apache.geode.cache.ProxyClientRequestObserverHolder; +import org.apache.geode.cache.ProxyRequestObserverHolder; import org.apache.geode.cache.Region; +import org.apache.geode.cache.ThreadLimitingProxyRequestObserver; import org.apache.geode.cache.client.ClientCache; import org.apache.geode.cache.execute.Execution; import org.apache.geode.cache.execute.Function; @@ -58,7 +55,6 @@ import org.apache.geode.distributed.internal.ClusterDistributionManager; import org.apache.geode.distributed.internal.DistributionMessage; import org.apache.geode.distributed.internal.DistributionMessageObserver; -import org.apache.geode.distributed.internal.membership.InternalDistributedMember; import org.apache.geode.logging.internal.log4j.api.LogService; import org.apache.geode.test.awaitility.GeodeAwaitility; import org.apache.geode.test.dunit.AsyncInvocation; @@ -324,8 +320,8 @@ private void createServerRegionAndRegisterFunction(MemberVM server, int redundan FunctionService.registerFunction(function); // DistributionMessageObserver.setInstance(new MyMessageObserver()); int maxThreadsPerDestination = MAX_THREADS / 4; - ProxyClientRequestObserverHolder - .setInstance(new ThreadLimitingProxyClientRequestObserver(maxThreadsPerDestination)); + ProxyRequestObserverHolder + .setInstance(new ThreadLimitingProxyRequestObserver(maxThreadsPerDestination)); }); } @@ -476,53 +472,4 @@ public void beforeSendMessage(ClusterDistributionManager dm, DistributionMessage logger.info("toberal beforeSendMessage dm: {}, message: {}", dm, message, new Exception()); } } - - public static class ThreadLimitingProxyClientRequestObserver - implements ProxyClientRequestObserver { - private final int maxThreadsToDestination; - - public ThreadLimitingProxyClientRequestObserver(int maxThreadsToDestination) { - this.maxThreadsToDestination = maxThreadsToDestination; - } - - private static final Logger logger = LogService.getLogger(); - private final Map threadsToDestination = - new ConcurrentHashMap(); - - @Override - public void beforeSendRequest(Set members) { - logger.info( - "toberal beforeSendRequest members: {}, threadsToDestination: size:{}, contents:{}", - members, threadsToDestination.size(), threadsToDestination); - if (Thread.currentThread().getName().startsWith("ServerConnection on") - || Thread.currentThread().getName().startsWith("Function Execution Processor")) { - for (InternalDistributedMember member : members) { - if (threadsToDestination.getOrDefault(member, 0) >= maxThreadsToDestination) { - CacheFactory.getAnyInstance().getCacheServers().get(0).incRejectedProxyRequests(); - logger.info("toberal Max number of threads reached for " + member, new Exception("kk")); - throw new IllegalStateException("Max number of threads reached"); - } - } - for (InternalDistributedMember member : members) { - threadsToDestination.merge(member, 1, Integer::sum); - } - } - } - - @Override - public void afterReceiveResponse(Set members) { - logger.info( - "toberal afterReceiveResponse removing members: {}, threadsToDestination: size:{}, contents:{}", - members, threadsToDestination.size(), threadsToDestination); - for (InternalDistributedMember member : members) { - if (Thread.currentThread().getName().startsWith("ServerConnection on") - || Thread.currentThread().getName().startsWith("Function Execution Processor")) { - threadsToDestination.merge(member, -1, Integer::sum); - logger.info( - "toberal afterReceiveResponse removing member: {}, threadsToDestination: size:{}, contents:{}", - member, threadsToDestination.size(), threadsToDestination); - } - } - } - } } diff --git a/geode-core/src/main/java/org/apache/geode/cache/ProxyClientRequestObserver.java b/geode-core/src/main/java/org/apache/geode/cache/ProxyRequestObserver.java similarity index 95% rename from geode-core/src/main/java/org/apache/geode/cache/ProxyClientRequestObserver.java rename to geode-core/src/main/java/org/apache/geode/cache/ProxyRequestObserver.java index 995b5ec0cc9a..72e1827c43f5 100644 --- a/geode-core/src/main/java/org/apache/geode/cache/ProxyClientRequestObserver.java +++ b/geode-core/src/main/java/org/apache/geode/cache/ProxyRequestObserver.java @@ -18,7 +18,7 @@ import org.apache.geode.distributed.internal.membership.InternalDistributedMember; -public interface ProxyClientRequestObserver { +public interface ProxyRequestObserver { void beforeSendRequest(Set members); void afterReceiveResponse(Set members); diff --git a/geode-core/src/main/java/org/apache/geode/cache/ProxyClientRequestObserverAdapter.java b/geode-core/src/main/java/org/apache/geode/cache/ProxyRequestObserverAdapter.java similarity index 92% rename from geode-core/src/main/java/org/apache/geode/cache/ProxyClientRequestObserverAdapter.java rename to geode-core/src/main/java/org/apache/geode/cache/ProxyRequestObserverAdapter.java index 9dc1eddffbf7..b6274abba3eb 100644 --- a/geode-core/src/main/java/org/apache/geode/cache/ProxyClientRequestObserverAdapter.java +++ b/geode-core/src/main/java/org/apache/geode/cache/ProxyRequestObserverAdapter.java @@ -18,7 +18,7 @@ import org.apache.geode.distributed.internal.membership.InternalDistributedMember; -public class ProxyClientRequestObserverAdapter implements ProxyClientRequestObserver { +public class ProxyRequestObserverAdapter implements ProxyRequestObserver { @Override public void beforeSendRequest(Set members) {} diff --git a/geode-core/src/main/java/org/apache/geode/cache/ProxyClientRequestObserverHolder.java b/geode-core/src/main/java/org/apache/geode/cache/ProxyRequestObserverHolder.java similarity index 78% rename from geode-core/src/main/java/org/apache/geode/cache/ProxyClientRequestObserverHolder.java rename to geode-core/src/main/java/org/apache/geode/cache/ProxyRequestObserverHolder.java index 5be2424f9e4b..53b764f749c3 100644 --- a/geode-core/src/main/java/org/apache/geode/cache/ProxyClientRequestObserverHolder.java +++ b/geode-core/src/main/java/org/apache/geode/cache/ProxyRequestObserverHolder.java @@ -16,10 +16,12 @@ import java.util.concurrent.atomic.AtomicReference; +import org.apache.logging.log4j.Logger; import org.jetbrains.annotations.NotNull; import org.apache.geode.annotations.Immutable; import org.apache.geode.annotations.internal.MakeNotStatic; +import org.apache.geode.logging.internal.log4j.api.LogService; /** * This class is intended to hold a single 'observer' which will receive callbacks from the @@ -46,31 +48,30 @@ * observer.startMethod(arguments); doSomething(); } finally { observer.stopMethod(arguments); } * */ -public class ProxyClientRequestObserverHolder { +public class ProxyRequestObserverHolder { + private static final Logger logger = LogService.getLogger(); - /** - * The default 'do-nothing' observer * - */ @Immutable - private static final ProxyClientRequestObserver NO_OBSERVER = - new ProxyClientRequestObserverAdapter(); + private static final ProxyRequestObserver NO_OBSERVER = + new ProxyRequestObserverAdapter(); /** * The current observer which will be notified of all query events. */ @MakeNotStatic - private static final AtomicReference _instance = + private static final AtomicReference _instance = new AtomicReference<>(NO_OBSERVER); /** - * Set the given observer to be notified of query events. Returns the current observer. + * Set the given observer to be notified of proxy requests. Returns the current observer. */ - public static ProxyClientRequestObserver setInstance( - @NotNull ProxyClientRequestObserver observer) { + public static ProxyRequestObserver setInstance( + @NotNull ProxyRequestObserver observer) { + logger.info("Setting ProxyRequestObserver with: {}", observer); return _instance.getAndSet(observer); } /** Return the current observer instance */ - public static ProxyClientRequestObserver getInstance() { + public static ProxyRequestObserver getInstance() { return _instance.get(); } } diff --git a/geode-core/src/main/java/org/apache/geode/cache/ThreadLimitingProxyRequestObserver.java b/geode-core/src/main/java/org/apache/geode/cache/ThreadLimitingProxyRequestObserver.java new file mode 100644 index 000000000000..bb88056daae3 --- /dev/null +++ b/geode-core/src/main/java/org/apache/geode/cache/ThreadLimitingProxyRequestObserver.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.cache; + +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.logging.log4j.Logger; + +import org.apache.geode.distributed.internal.membership.InternalDistributedMember; +import org.apache.geode.logging.internal.log4j.api.LogService; + +public class ThreadLimitingProxyRequestObserver + implements ProxyRequestObserver { + private final int maxThreadsToDestination; + + public ThreadLimitingProxyRequestObserver(int maxThreadsToDestination) { + this.maxThreadsToDestination = maxThreadsToDestination; + } + + private static final Logger logger = LogService.getLogger(); + private final Map threadsToDestination = + new ConcurrentHashMap(); + + @Override + public void beforeSendRequest(Set members) { + if (Thread.currentThread().getName().startsWith("ServerConnection on") + || Thread.currentThread().getName().startsWith("Function Execution Processor")) { + for (InternalDistributedMember member : members) { + if (threadsToDestination.getOrDefault(member, 0) >= maxThreadsToDestination) { + CacheFactory.getAnyInstance().getCacheServers().get(0).incRejectedProxyRequests(); + logger.info("toberal Max number of threads reached for " + member, new Exception("kk")); + throw new IllegalStateException("Max number of threads reached"); + } + } + for (InternalDistributedMember member : members) { + threadsToDestination.merge(member, 1, Integer::sum); + } + } + } + + @Override + public void afterReceiveResponse(Set members) { + for (InternalDistributedMember member : members) { + if (Thread.currentThread().getName().startsWith("ServerConnection on") + || Thread.currentThread().getName().startsWith("Function Execution Processor")) { + threadsToDestination.merge(member, -1, Integer::sum); + } + } + } + + @Override + public String toString() { + return this.getClass().getName() + ", maxThreadsToDestination: " + maxThreadsToDestination; + } +} diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java index 1b9b0398e5a0..fdce3137b3e5 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java @@ -83,8 +83,8 @@ import org.apache.geode.cache.PartitionResolver; import org.apache.geode.cache.PartitionedRegionDistributionException; import org.apache.geode.cache.PartitionedRegionStorageException; -import org.apache.geode.cache.ProxyClientRequestObserver; -import org.apache.geode.cache.ProxyClientRequestObserverHolder; +import org.apache.geode.cache.ProxyRequestObserver; +import org.apache.geode.cache.ProxyRequestObserverHolder; import org.apache.geode.cache.Region; import org.apache.geode.cache.RegionAttributes; import org.apache.geode.cache.RegionDestroyedException; @@ -2982,7 +2982,7 @@ private boolean putInBucket(final InternalDistributedMember targetNode, final In event.getKey().hashCode(), targetNode, bucketStringForLogs(bucketId), retryTimeout); } - ProxyClientRequestObserver observer = ProxyClientRequestObserverHolder.getInstance(); + ProxyRequestObserver observer = ProxyRequestObserverHolder.getInstance(); Set bucketOwners = null; try { if (observer != null) { @@ -4189,7 +4189,7 @@ private Object getFromBucket(final InternalDistributedMember targetNode, int buc } } - ProxyClientRequestObserver observer = ProxyClientRequestObserverHolder.getInstance(); + ProxyRequestObserver observer = ProxyRequestObserverHolder.getInstance(); try { if (observer != null) { observer.beforeSendRequest(Collections.singleton(retryNode)); @@ -4914,7 +4914,7 @@ private ResultCollector executeFunctionOnRemoteNode(InternalDistributedMember ta final Function function, final Object object, final Set routingKeys, ResultCollector rc, int[] bucketArray, ServerToClientFunctionResultSender sender, AbstractExecution execution) { - ProxyClientRequestObserver observer = ProxyClientRequestObserverHolder.getInstance(); + ProxyRequestObserver observer = ProxyRequestObserverHolder.getInstance(); if (observer != null) { observer.beforeSendRequest(Collections.singleton(targetNode)); } diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java b/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java index d9ab3f9728ac..4e79c72073f0 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java @@ -42,8 +42,8 @@ import org.apache.geode.cache.EntryNotFoundException; import org.apache.geode.cache.FailedSynchronizationException; import org.apache.geode.cache.Operation; -import org.apache.geode.cache.ProxyClientRequestObserver; -import org.apache.geode.cache.ProxyClientRequestObserverHolder; +import org.apache.geode.cache.ProxyRequestObserver; +import org.apache.geode.cache.ProxyRequestObserverHolder; import org.apache.geode.cache.Region; import org.apache.geode.cache.Region.Entry; import org.apache.geode.cache.RegionDestroyedException; @@ -514,7 +514,7 @@ public void commit() throws CommitConflictException { lockTXRegions(regions); - ProxyClientRequestObserver observer = ProxyClientRequestObserverHolder.getInstance(); + ProxyRequestObserver observer = ProxyRequestObserverHolder.getInstance(); msg = buildMessage(); Set txCommitRecipients = msg.getTxCommitRecipients(); try { diff --git a/geode-gfsh/src/main/java/org/apache/geode/management/internal/cli/commands/SetThreadLimitingProxyRequestCommand.java b/geode-gfsh/src/main/java/org/apache/geode/management/internal/cli/commands/SetThreadLimitingProxyRequestCommand.java new file mode 100644 index 000000000000..a1900efa4fc6 --- /dev/null +++ b/geode-gfsh/src/main/java/org/apache/geode/management/internal/cli/commands/SetThreadLimitingProxyRequestCommand.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.geode.management.internal.cli.commands; + +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.logging.log4j.Logger; +import org.springframework.shell.core.annotation.CliAvailabilityIndicator; +import org.springframework.shell.core.annotation.CliCommand; +import org.springframework.shell.core.annotation.CliOption; + +import org.apache.geode.cache.execute.Execution; +import org.apache.geode.cache.execute.FunctionService; +import org.apache.geode.distributed.DistributedMember; +import org.apache.geode.logging.internal.log4j.api.LogService; +import org.apache.geode.management.cli.ConverterHint; +import org.apache.geode.management.cli.GfshCommand; +import org.apache.geode.management.internal.cli.functions.SetThreadLimitingProxyRequestFunction; +import org.apache.geode.management.internal.cli.result.model.ResultModel; +import org.apache.geode.management.internal.cli.result.model.TabularResultModel; +import org.apache.geode.management.internal.i18n.CliStrings; +import org.apache.geode.management.internal.security.ResourceOperation; +import org.apache.geode.security.ResourcePermission; + +public class SetThreadLimitingProxyRequestCommand extends GfshCommand { + + private static final Logger logger = LogService.getLogger(); + + @CliAvailabilityIndicator({"set-thread-limit-proxy"}) + public boolean commandAvailable() { + return isOnlineCommandAvailable(); + } + + @CliCommand(value = "set-thread-limit-proxy", + help = "Set a maximum number of threads to be used to proxy operations to another server") + @ResourceOperation(resource = ResourcePermission.Resource.CLUSTER, + operation = ResourcePermission.Operation.MANAGE) + + public ResultModel setThreadLimitingProxyRequest( + @CliOption(key = "max-threads-to-destination", + optionContext = ConverterHint.LOG_LEVEL, mandatory = true, unspecifiedDefaultValue = "0", + help = "The maximum number of threads to a specific destination") int maxThreads) { + + + Set dsMembers = getAllNormalMembers(); + + SetThreadLimitingProxyRequestFunction logFunction = new SetThreadLimitingProxyRequestFunction(); + FunctionService.registerFunction(logFunction); + Object[] functionArgs = new Object[1]; + functionArgs[0] = maxThreads; + + ResultModel result = new ResultModel(); + + @SuppressWarnings("unchecked") + Execution execution = FunctionService.onMembers(dsMembers).setArguments(functionArgs); + if (execution == null) { + return ResultModel.createError(CliStrings.CHANGE_LOGLEVEL__MSG__CANNOT_EXECUTE); + } + List resultList = + (List) executeFunction(logFunction, functionArgs, dsMembers).getResult(); + + TabularResultModel tableInfo = result.addTable("result"); + tableInfo.setColumnHeader("Member", + "thread limited"); + for (Object object : resultList) { + try { + if (object instanceof Throwable) { + logger.warn("Exception in set thread limit for proxy requests " + + ((Throwable) object).getMessage(), + ((Throwable) object)); + continue; + } + + if (object != null) { + @SuppressWarnings("unchecked") + Map resultMap = (Map) object; + Map.Entry entry = resultMap.entrySet().iterator().next(); + + if (entry.getValue().contains("Exception")) { + tableInfo.addRow(entry.getKey(), "false"); + } else { + tableInfo.addRow(entry.getKey(), "true"); + } + + } + } catch (Exception ex) { + logger.warn("command exception " + ex); + } + } + + logger.info("command result=" + result); + return result; + + } +} diff --git a/geode-gfsh/src/main/java/org/apache/geode/management/internal/cli/functions/SetThreadLimitingProxyRequestFunction.java b/geode-gfsh/src/main/java/org/apache/geode/management/internal/cli/functions/SetThreadLimitingProxyRequestFunction.java new file mode 100644 index 000000000000..1428e18d6c1b --- /dev/null +++ b/geode-gfsh/src/main/java/org/apache/geode/management/internal/cli/functions/SetThreadLimitingProxyRequestFunction.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.management.internal.cli.functions; + + +import java.util.HashMap; +import java.util.Map; + +import org.apache.logging.log4j.Logger; + +import org.apache.geode.cache.ProxyRequestObserver; +import org.apache.geode.cache.ProxyRequestObserverHolder; +import org.apache.geode.cache.ThreadLimitingProxyRequestObserver; +import org.apache.geode.cache.execute.FunctionContext; +import org.apache.geode.internal.cache.InternalCache; +import org.apache.geode.internal.cache.execute.InternalFunction; +import org.apache.geode.internal.logging.log4j.LogMarker; +import org.apache.geode.logging.internal.log4j.api.LogService; + +/** + * Class for change log level function + * + * @since 8.0 + */ +public class SetThreadLimitingProxyRequestFunction implements InternalFunction { + private static final Logger logger = LogService.getLogger(); + private static final long serialVersionUID = 1L; + + private static final String ID = + "org.apache.geode.management.internal.cli.functions.SetThreadLimitingProxyRequestFunction"; + + @Override + public String getId() { + return ID; + } + + @Override + public void execute(FunctionContext context) { + InternalCache cache = (InternalCache) context.getCache(); + Map result = new HashMap<>(); + try { + Object[] args = context.getArguments(); + Integer maxThreads = (Integer) args[0]; + + ProxyRequestObserver observer = null; + + if (maxThreads > 0) { + observer = new ThreadLimitingProxyRequestObserver(maxThreads); + } + + ProxyRequestObserverHolder.setInstance(observer); + + result.put(cache.getDistributedSystem().getDistributedMember().getId(), + "Set thread liminting proxy request to: " + maxThreads); + context.getResultSender().lastResult(result); + } catch (Exception ex) { + logger.info(LogMarker.CONFIG_MARKER, "GFSH exception {}", ex.getMessage(), + ex); + result.put(cache.getDistributedSystem().getDistributedMember().getId(), + "Exception " + ex.getMessage()); + context.getResultSender().lastResult(result); + } + } + + @Override + public boolean hasResult() { + return true; + } + + @Override + public boolean optimizeForWrite() { + return false; + } + + @Override + public boolean isHA() { + return false; + } +} diff --git a/geode-gfsh/src/main/resources/META-INF/services/org.springframework.shell.core.CommandMarker b/geode-gfsh/src/main/resources/META-INF/services/org.springframework.shell.core.CommandMarker index 00ae9b2e0341..0fe8f2ab187d 100644 --- a/geode-gfsh/src/main/resources/META-INF/services/org.springframework.shell.core.CommandMarker +++ b/geode-gfsh/src/main/resources/META-INF/services/org.springframework.shell.core.CommandMarker @@ -86,6 +86,7 @@ org.apache.geode.management.internal.cli.commands.RestoreRedundancyCommand org.apache.geode.management.internal.cli.commands.ResumeAsyncEventQueueDispatcherCommand org.apache.geode.management.internal.cli.commands.ResumeGatewaySenderCommand org.apache.geode.management.internal.cli.commands.RevokeMissingDiskStoreCommand +org.apache.geode.management.internal.cli.commands.SetThreadLimitingProxyRequestCommand org.apache.geode.management.internal.cli.commands.SetVariableCommand org.apache.geode.management.internal.cli.commands.ShCommand org.apache.geode.management.internal.cli.commands.ShowDeadlockCommand From a1ee7c3ed641b21bec6d589c489713a4469a78ad Mon Sep 17 00:00:00 2001 From: Alberto Gomez Date: Thu, 29 Sep 2022 15:02:08 +0200 Subject: [PATCH 4/4] Add exceptions for new methods in japicmp and fix javadoc warnings and add function to sanctioned data serializables --- .../src/main/resources/japicmp_exceptions.json | 7 ++++++- .../src/integrationTest/resources/assembly_content.txt | 4 ++++ .../apache/geode/cache/ProxyRequestObserverHolder.java | 10 +++++++++- .../management/sanctioned-geode-gfsh-serializables.txt | 1 + 4 files changed, 20 insertions(+), 2 deletions(-) diff --git a/build-tools/geode-japicmp/src/main/resources/japicmp_exceptions.json b/build-tools/geode-japicmp/src/main/resources/japicmp_exceptions.json index f9d029ecb6ac..39f062677a15 100755 --- a/build-tools/geode-japicmp/src/main/resources/japicmp_exceptions.json +++ b/build-tools/geode-japicmp/src/main/resources/japicmp_exceptions.json @@ -13,5 +13,10 @@ "Class org.apache.geode.management.DiskStoreMXBean": "Added new methods.", "Method org.apache.geode.management.DiskStoreMXBean.getTotalRecoveredEntryCreates()": "Added new stat", "Method org.apache.geode.management.DiskStoreMXBean.getTotalRecoveredEntryDestroys()": "Added new stat", - "Method org.apache.geode.management.DiskStoreMXBean.getTotalRecoveredEntryUpdates()": "Added new stat" + "Method org.apache.geode.management.DiskStoreMXBean.getTotalRecoveredEntryUpdates()": "Added new stat", + "Class org.apache.geode.cache.server.CacheServer": "Added two methods", + "Method org.apache.geode.cache.server.CacheServer.getRejectedProxyRequests()": "added new counter", + "Method org.apache.geode.cache.server.CacheServer.incRejectedProxyRequests()": "added new counter", + "Class org.apache.geode.management.CacheServerMXBean":"Added new counter", + "Method org.apache.geode.management.CacheServerMXBean.getRejectedProxyRequests()": "added new counter" } diff --git a/geode-assembly/src/integrationTest/resources/assembly_content.txt b/geode-assembly/src/integrationTest/resources/assembly_content.txt index 966298fe1abc..d419ece9f4ba 100644 --- a/geode-assembly/src/integrationTest/resources/assembly_content.txt +++ b/geode-assembly/src/integrationTest/resources/assembly_content.txt @@ -223,6 +223,9 @@ javadoc/org/apache/geode/cache/PartitionAttributesFactory.html javadoc/org/apache/geode/cache/PartitionResolver.html javadoc/org/apache/geode/cache/PartitionedRegionDistributionException.html javadoc/org/apache/geode/cache/PartitionedRegionStorageException.html +javadoc/org/apache/geode/cache/ProxyRequestObserver.html +javadoc/org/apache/geode/cache/ProxyRequestObserverAdapter.html +javadoc/org/apache/geode/cache/ProxyRequestObserverHolder.html javadoc/org/apache/geode/cache/Region.Entry.html javadoc/org/apache/geode/cache/Region.html javadoc/org/apache/geode/cache/RegionAccessException.html @@ -248,6 +251,7 @@ javadoc/org/apache/geode/cache/SerializedCacheValue.html javadoc/org/apache/geode/cache/StatisticsDisabledException.html javadoc/org/apache/geode/cache/SubscriptionAttributes.html javadoc/org/apache/geode/cache/SynchronizationCommitConflictException.html +javadoc/org/apache/geode/cache/ThreadLimitingProxyRequestObserver.html javadoc/org/apache/geode/cache/TimeoutException.html javadoc/org/apache/geode/cache/TransactionDataNodeHasDepartedException.html javadoc/org/apache/geode/cache/TransactionDataNotColocatedException.html diff --git a/geode-core/src/main/java/org/apache/geode/cache/ProxyRequestObserverHolder.java b/geode-core/src/main/java/org/apache/geode/cache/ProxyRequestObserverHolder.java index 53b764f749c3..1430a1d3f017 100644 --- a/geode-core/src/main/java/org/apache/geode/cache/ProxyRequestObserverHolder.java +++ b/geode-core/src/main/java/org/apache/geode/cache/ProxyRequestObserverHolder.java @@ -63,6 +63,10 @@ public class ProxyRequestObserverHolder { /** * Set the given observer to be notified of proxy requests. Returns the current observer. + * + * @param observer the observer to be set + * + * @return the current observer instance */ public static ProxyRequestObserver setInstance( @NotNull ProxyRequestObserver observer) { @@ -70,7 +74,11 @@ public static ProxyRequestObserver setInstance( return _instance.getAndSet(observer); } - /** Return the current observer instance */ + /** + * Return the current observer instance + * + * @return the current observer instance + */ public static ProxyRequestObserver getInstance() { return _instance.get(); } diff --git a/geode-gfsh/src/main/resources/org/apache/geode/gfsh/internal/management/sanctioned-geode-gfsh-serializables.txt b/geode-gfsh/src/main/resources/org/apache/geode/gfsh/internal/management/sanctioned-geode-gfsh-serializables.txt index e9a15586e71a..0c39fc335899 100644 --- a/geode-gfsh/src/main/resources/org/apache/geode/gfsh/internal/management/sanctioned-geode-gfsh-serializables.txt +++ b/geode-gfsh/src/main/resources/org/apache/geode/gfsh/internal/management/sanctioned-geode-gfsh-serializables.txt @@ -94,6 +94,7 @@ org/apache/geode/management/internal/cli/functions/RegionFunctionArgs$EvictionAt org/apache/geode/management/internal/cli/functions/RegionFunctionArgs$ExpirationAttrs,true,1474255033398008063,action:org/apache/geode/cache/ExpirationAction,time:java/lang/Integer org/apache/geode/management/internal/cli/functions/RegionFunctionArgs$PartitionArgs,true,5907052187323280919,partitionResolver:java/lang/String,prColocatedWith:java/lang/String,prLocalMaxMemory:java/lang/Integer,prRecoveryDelay:java/lang/Long,prRedundantCopies:java/lang/Integer,prStartupRecoveryDelay:java/lang/Long,prTotalMaxMemory:java/lang/Long,prTotalNumBuckets:java/lang/Integer org/apache/geode/management/internal/cli/functions/ResumeAsyncEventQueueDispatcherFunction,false +org/apache/geode/management/internal/cli/functions/SetThreadLimitingProxyRequestFunction,true,1 org/apache/geode/management/internal/cli/functions/ShowMissingDiskStoresFunction,false org/apache/geode/management/internal/cli/functions/ShutDownFunction,true,1 org/apache/geode/management/internal/cli/functions/SizeExportLogsFunction,true,1