Skip to content

Commit

Permalink
ZOOKEEPER-4749: Respect zookeeper.request.timeout also for asynchrono…
Browse files Browse the repository at this point in the history
…us api

Currently, `zookeeper.request.timeout` is only respected in synchronous
api. I think there are should be no much differences between following two.

```java
String createdPath = zk.create("/path", data, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
```

```java
CompletableFuture<String> future = new CompletableFuture<>();
zk.create("/path", data, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, (rc, path, ctx, name) -> {
    if (rc == 0) {
        future.complete(name);
    } else {
        future.completeExceptionally(KeeperException.create(KeeperException.Code.get(rc), path));
    }
}, null);
String createdPath = future.join();
```

After this pr, we are able to unify synchronous api through calls to
asynchronous api as [review comments][review-comments] pointed out if we
feel there are too much identical code between synchronous and asynchronous
api.

[review-comments]: #2068 (comment)
  • Loading branch information
kezhuw committed May 29, 2024
1 parent 66202cb commit 4b686ed
Show file tree
Hide file tree
Showing 6 changed files with 194 additions and 192 deletions.
150 changes: 86 additions & 64 deletions zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxn.java
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ static class AuthData {
* If any request's response in not received in configured requestTimeout
* then it is assumed that the response packet is lost.
*/
private long requestTimeout;
private final long requestTimeout;

ZKWatchManager getWatcherManager() {
return watchManager;
Expand Down Expand Up @@ -286,6 +286,8 @@ static class Packet {

WatchDeregistration watchDeregistration;

long deadline = Long.MAX_VALUE;

/** Convenience ctor */
Packet(
RequestHeader requestHeader,
Expand Down Expand Up @@ -414,7 +416,12 @@ public ClientCnxn(

this.sendThread = new SendThread(clientCnxnSocket);
this.eventThread = new EventThread();
initRequestTimeout();
this.requestTimeout = clientConfig.getRequestTimeout();
LOG.info(
"{} value is {}. feature enabled={}",
ZKClientConfig.ZOOKEEPER_REQUEST_TIMEOUT,
requestTimeout,
requestTimeout > 0);
}

public void start() {
Expand Down Expand Up @@ -728,8 +735,6 @@ protected void finishPacket(Packet p) {
p.replyHeader.setErr(Code.OK.intValue());
}
}
} catch (KeeperException.NoWatcherException nwe) {
p.replyHeader.setErr(nwe.code().intValue());
} catch (KeeperException ke) {
p.replyHeader.setErr(ke.code().intValue());
}
Expand Down Expand Up @@ -765,21 +770,26 @@ protected void onConnecting(InetSocketAddress addr) {

}

private void conLossPacket(Packet p) {
private Code abortPacket(Packet p, Code cause) {
if (p.replyHeader == null) {
return;
return cause;
}
switch (state) {
case AUTH_FAILED:
p.replyHeader.setErr(KeeperException.Code.AUTHFAILED.intValue());
break;
case CLOSED:
p.replyHeader.setErr(KeeperException.Code.SESSIONEXPIRED.intValue());
break;
default:
p.replyHeader.setErr(KeeperException.Code.CONNECTIONLOSS.intValue());
case AUTH_FAILED:
p.replyHeader.setErr(KeeperException.Code.AUTHFAILED.intValue());
break;
case CLOSED:
p.replyHeader.setErr(KeeperException.Code.SESSIONEXPIRED.intValue());
break;
default:
p.replyHeader.setErr(cause.intValue());
}
finishPacket(p);
return Code.CONNECTIONLOSS;
}

private void conLossPacket(Packet p) {
abortPacket(p, Code.CONNECTIONLOSS);
}

private volatile long lastZxid;
Expand Down Expand Up @@ -852,6 +862,8 @@ void readResponse(ByteBuffer incomingBuffer) throws IOException {

replyHdr.deserialize(bbia, "header");
switch (replyHdr.getXid()) {
case SET_WATCHES_XID:
return;
case PING_XID:
LOG.debug("Got ping response for session id: 0x{} after {}ms.",
Long.toHexString(sessionId),
Expand Down Expand Up @@ -1116,6 +1128,39 @@ private void logStartConnect(InetSocketAddress addr) {
}
}

private long requestDeadline() {
if (requestTimeout == 0) {
return Long.MAX_VALUE;
}

// The correctness of following code depends on several implementation details:
// 1. Polling of outgoingQueue happens only in SendThread.
// 2. Adding to pendingQueue happens only in SendThread.
//
// It is possible for netty socket to readResponse for first pendingQueue entry
// while we are checking deadline for the same entry. So, it is possible that
// a request was responded near deadline, but we disconnect the session. Given
// that we are dealing with timeout, this should not be much matter.
//
// In long term, we should sequence all pendingQueue operations to SendThread.

Packet p;
synchronized (pendingQueue) {
p = pendingQueue.peek();
}
if (p != null) {
return p.deadline;
}

for (Packet packet : outgoingQueue) {
if (packet.requestHeader != null && packet.requestHeader.getXid() >= 0) {
return packet.deadline;
}
}

return Long.MAX_VALUE;
}

@Override
@SuppressFBWarnings("JLM_JSR166_UTILCONCURRENT_MONITORENTER")
public void run() {
Expand Down Expand Up @@ -1192,6 +1237,14 @@ public void run() {
LOG.warn(warnInfo);
throw new SessionTimeoutException(warnInfo);
}
long deadline = requestDeadline();
if (deadline != Long.MAX_VALUE) {
long now = Time.currentElapsedTime();
if (now >= deadline) {
throw new KeeperException.RequestTimeoutException();
}
to = Integer.min(to, (int) (deadline - now));
}
if (state.isConnected()) {
//1000(1 second) is to prevent race condition missing to send the second ping
//also make sure not to send too many pings when readTimeout is small
Expand Down Expand Up @@ -1240,9 +1293,14 @@ public void run() {
serverAddress,
e);

Code cause = Code.CONNECTIONLOSS;
if (e instanceof KeeperException) {
cause = ((KeeperException) e).code();
}

// At this point, there might still be new packets appended to outgoingQueue.
// they will be handled in next connection or cleared up if closed.
cleanAndNotifyState();
cleanAndNotifyState(cause);
}
}
}
Expand All @@ -1268,8 +1326,8 @@ public void run() {
"SendThread exited loop for session: 0x" + Long.toHexString(getSessionId()));
}

private void cleanAndNotifyState() {
cleanup();
private void cleanAndNotifyState(Code cause) {
cleanup(cause);
if (state.isAlive()) {
eventThread.queueEvent(new WatchedEvent(Event.EventType.None, Event.KeeperState.Disconnected, null));
}
Expand Down Expand Up @@ -1328,10 +1386,14 @@ private void pingRwServer() throws RWServerFoundException {
}

private void cleanup() {
cleanup(Code.CONNECTIONLOSS);
}

private void cleanup(Code cause) {
clientCnxnSocket.cleanup();
synchronized (pendingQueue) {
for (Packet p : pendingQueue) {
conLossPacket(p);
cause = abortPacket(p, cause);
}
pendingQueue.clear();
}
Expand All @@ -1341,7 +1403,7 @@ private void cleanup() {
Iterator<Packet> iter = outgoingQueue.iterator();
while (iter.hasNext()) {
Packet p = iter.next();
conLossPacket(p);
cause = abortPacket(p, cause);
iter.remove();
}
}
Expand Down Expand Up @@ -1525,37 +1587,13 @@ public ReplyHeader submitRequest(
watchRegistration,
watchDeregistration);
synchronized (packet) {
if (requestTimeout > 0) {
// Wait for request completion with timeout
waitForPacketFinish(r, packet);
} else {
// Wait for request completion infinitely
while (!packet.finished) {
packet.wait();
}
while (!packet.finished) {
packet.wait();
}
}
if (r.getErr() == Code.REQUESTTIMEOUT.intValue()) {
sendThread.cleanAndNotifyState();
}
return r;
}

/**
* Wait for request completion with timeout.
*/
private void waitForPacketFinish(ReplyHeader r, Packet packet) throws InterruptedException {
long waitStartTime = Time.currentElapsedTime();
while (!packet.finished) {
packet.wait(requestTimeout);
if (!packet.finished && ((Time.currentElapsedTime() - waitStartTime) >= requestTimeout)) {
LOG.error("Timeout error occurred for the packet '{}'.", packet);
r.setErr(Code.REQUESTTIMEOUT.intValue());
break;
}
}
}

public void saslCompleted() {
sendThread.getClientCnxnSocket().saslCompleted();
}
Expand Down Expand Up @@ -1612,6 +1650,9 @@ public Packet queuePacket(
packet.clientPath = clientPath;
packet.serverPath = serverPath;
packet.watchDeregistration = watchDeregistration;
if (requestTimeout != 0 && h.getXid() >= 0) {
packet.deadline = Time.currentElapsedTime() + requestTimeout;
}
// The synchronized block here is for two purpose:
// 1. synchronize with the final cleanup() in SendThread.run() to avoid race
// 2. synchronized against each packet. So if a closeSession packet is added,
Expand Down Expand Up @@ -1669,25 +1710,6 @@ public LocalCallback(AsyncCallback cb, int rc, String path, Object ctx) {

}

private void initRequestTimeout() {
try {
requestTimeout = clientConfig.getLong(
ZKClientConfig.ZOOKEEPER_REQUEST_TIMEOUT,
ZKClientConfig.ZOOKEEPER_REQUEST_TIMEOUT_DEFAULT);
LOG.info(
"{} value is {}. feature enabled={}",
ZKClientConfig.ZOOKEEPER_REQUEST_TIMEOUT,
requestTimeout,
requestTimeout > 0);
} catch (NumberFormatException e) {
LOG.error(
"Configured value {} for property {} can not be parsed to long.",
clientConfig.getProperty(ZKClientConfig.ZOOKEEPER_REQUEST_TIMEOUT),
ZKClientConfig.ZOOKEEPER_REQUEST_TIMEOUT);
throw e;
}
}

public ZooKeeperSaslClient getZooKeeperSaslClient() {
return sendThread.getZooKeeperSaslClient();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import java.util.concurrent.LinkedBlockingDeque;
import org.apache.zookeeper.ClientCnxn.EndOfStreamException;
import org.apache.zookeeper.ClientCnxn.Packet;
import org.apache.zookeeper.ZooDefs.OpCode;
import org.apache.zookeeper.client.ZKClientConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -109,9 +108,7 @@ void doIO(Queue<Packet> pendingQueue, ClientCnxn cnxn) throws InterruptedExcepti
updateLastSend();
// If we already started writing p, p.bb will already exist
if (p.bb == null) {
if ((p.requestHeader != null)
&& (p.requestHeader.getType() != OpCode.ping)
&& (p.requestHeader.getType() != OpCode.auth)) {
if (p.requestHeader != null && p.requestHeader.getXid() >= 0) {
p.requestHeader.setXid(cnxn.getXid());
}
p.createBB();
Expand All @@ -120,9 +117,7 @@ void doIO(Queue<Packet> pendingQueue, ClientCnxn cnxn) throws InterruptedExcepti
if (!p.bb.hasRemaining()) {
sentCount.getAndIncrement();
outgoingQueue.removeFirstOccurrence(p);
if (p.requestHeader != null
&& p.requestHeader.getType() != OpCode.ping
&& p.requestHeader.getType() != OpCode.auth) {
if (p.requestHeader != null && p.requestHeader.getXid() >= 0) {
synchronized (pendingQueue) {
pendingQueue.add(p);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -354,9 +354,7 @@ private void doWrite(Queue<Packet> pendingQueue, Packet p, ClientCnxn cnxn) thro
boolean anyPacketsSent = false;
while (true) {
if (p != WakeupPacket.getInstance()) {
if ((p.requestHeader != null)
&& (p.requestHeader.getType() != ZooDefs.OpCode.ping)
&& (p.requestHeader.getType() != ZooDefs.OpCode.auth)) {
if (p.requestHeader != null && p.requestHeader.getXid() >= 0) {
p.requestHeader.setXid(cnxn.getXid());
synchronized (pendingQueue) {
pendingQueue.add(p);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -514,6 +514,8 @@ static String getCodeMessage(Code code) {
return "Quota has exceeded";
case THROTTLEDOP:
return "Op throttled due to high load";
case REQUESTTIMEOUT:
return "Request timeout";
default:
return "Unknown error " + code;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,16 @@
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.common.ZKConfig;
import org.apache.zookeeper.server.quorum.QuorumPeerConfig.ConfigException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Handles client specific properties
* @since 3.5.2
*/
@InterfaceAudience.Public
public class ZKClientConfig extends ZKConfig {
private static final Logger LOG = LoggerFactory.getLogger(ZKClientConfig.class);

public static final String ZK_SASL_CLIENT_USERNAME = "zookeeper.sasl.client.username";
public static final String ZK_SASL_CLIENT_USERNAME_DEFAULT = "zookeeper";
Expand Down Expand Up @@ -142,4 +145,18 @@ public long getLong(String key, long defaultValue) {
return defaultValue;
}

@InterfaceAudience.Private
public long getRequestTimeout() {
try {
return getLong(
ZKClientConfig.ZOOKEEPER_REQUEST_TIMEOUT,
ZKClientConfig.ZOOKEEPER_REQUEST_TIMEOUT_DEFAULT);
} catch (NumberFormatException e) {
LOG.error(
"Configured value {} for property {} can not be parsed to long.",
getProperty(ZKClientConfig.ZOOKEEPER_REQUEST_TIMEOUT),
ZKClientConfig.ZOOKEEPER_REQUEST_TIMEOUT);
throw e;
}
}
}
Loading

0 comments on commit 4b686ed

Please sign in to comment.