Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ZOOKEEPER-4749: Respect zookeeper.request.timeout also for asynchronous api #2168

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
150 changes: 86 additions & 64 deletions zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxn.java
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ static class AuthData {
* If any request's response is not received within the configured requestTimeout
* then it is assumed that the response packet is lost.
*/
private long requestTimeout;
private final long requestTimeout;

ZKWatchManager getWatcherManager() {
return watchManager;
Expand Down Expand Up @@ -286,6 +286,8 @@ static class Packet {

WatchDeregistration watchDeregistration;

long deadline = Long.MAX_VALUE;

/** Convenience ctor */
Packet(
RequestHeader requestHeader,
Expand Down Expand Up @@ -414,7 +416,12 @@ public ClientCnxn(

this.sendThread = new SendThread(clientCnxnSocket);
this.eventThread = new EventThread();
initRequestTimeout();
this.requestTimeout = clientConfig.getRequestTimeout();
LOG.info(
"{} value is {}. feature enabled={}",
ZKClientConfig.ZOOKEEPER_REQUEST_TIMEOUT,
requestTimeout,
requestTimeout > 0);
}

public void start() {
Expand Down Expand Up @@ -728,8 +735,6 @@ protected void finishPacket(Packet p) {
p.replyHeader.setErr(Code.OK.intValue());
}
}
} catch (KeeperException.NoWatcherException nwe) {
p.replyHeader.setErr(nwe.code().intValue());
} catch (KeeperException ke) {
p.replyHeader.setErr(ke.code().intValue());
}
Expand Down Expand Up @@ -765,21 +770,26 @@ protected void onConnecting(InetSocketAddress addr) {

}

private void conLossPacket(Packet p) {
private Code abortPacket(Packet p, Code cause) {
if (p.replyHeader == null) {
return;
return cause;
}
switch (state) {
case AUTH_FAILED:
p.replyHeader.setErr(KeeperException.Code.AUTHFAILED.intValue());
break;
case CLOSED:
p.replyHeader.setErr(KeeperException.Code.SESSIONEXPIRED.intValue());
break;
default:
p.replyHeader.setErr(KeeperException.Code.CONNECTIONLOSS.intValue());
case AUTH_FAILED:
p.replyHeader.setErr(KeeperException.Code.AUTHFAILED.intValue());
break;
case CLOSED:
p.replyHeader.setErr(KeeperException.Code.SESSIONEXPIRED.intValue());
break;
default:
p.replyHeader.setErr(cause.intValue());
}
finishPacket(p);
return Code.CONNECTIONLOSS;
}

/**
 * Aborts the given packet, passing {@code Code.CONNECTIONLOSS} as the cause.
 * Delegates to {@code abortPacket} and discards its return value.
 */
private void conLossPacket(Packet p) {
abortPacket(p, Code.CONNECTIONLOSS);
}

private volatile long lastZxid;
Expand Down Expand Up @@ -852,6 +862,8 @@ void readResponse(ByteBuffer incomingBuffer) throws IOException {

replyHdr.deserialize(bbia, "header");
switch (replyHdr.getXid()) {
case SET_WATCHES_XID:
return;
case PING_XID:
LOG.debug("Got ping response for session id: 0x{} after {}ms.",
Long.toHexString(sessionId),
Expand Down Expand Up @@ -1116,6 +1128,39 @@ private void logStartConnect(InetSocketAddress addr) {
}
}

/**
 * Finds the deadline of the oldest request that is still waiting for a response.
 *
 * <p>The head of {@code pendingQueue} is consulted first; if it is empty, the first
 * packet in {@code outgoingQueue} whose request header carries a non-negative xid is used.
 *
 * @return the deadline of that packet, or {@code Long.MAX_VALUE} when the request
 *         timeout is disabled or no such packet is queued
 */
private long requestDeadline() {
    // A non-positive timeout means the feature is disabled. Checking only for zero would
    // let a negative value through, and any deadline computed from it lies in the past,
    // so every request would be reported as timed out immediately.
    if (requestTimeout <= 0) {
        return Long.MAX_VALUE;
    }

    // The correctness of the following code depends on several implementation details:
    // 1. Polling of outgoingQueue happens only in SendThread.
    // 2. Adding to pendingQueue happens only in SendThread.
    //
    // It is possible for the netty socket to readResponse for the first pendingQueue entry
    // while we are checking the deadline for that same entry. So a request that was answered
    // very close to its deadline may still cause the session to be disconnected. Given
    // that we are dealing with a timeout, this should not matter much.
    //
    // In the long term, all pendingQueue operations should be sequenced to SendThread.

    Packet p;
    synchronized (pendingQueue) {
        p = pendingQueue.peek();
    }
    if (p != null) {
        return p.deadline;
    }

    // Nothing is awaiting a response yet: fall back to the first queued packet with a
    // non-negative xid.
    for (Packet packet : outgoingQueue) {
        if (packet.requestHeader != null && packet.requestHeader.getXid() >= 0) {
            return packet.deadline;
        }
    }

    return Long.MAX_VALUE;
}

@Override
@SuppressFBWarnings("JLM_JSR166_UTILCONCURRENT_MONITORENTER")
public void run() {
Expand Down Expand Up @@ -1192,6 +1237,14 @@ public void run() {
LOG.warn(warnInfo);
throw new SessionTimeoutException(warnInfo);
}
long deadline = requestDeadline();
if (deadline != Long.MAX_VALUE) {
long now = Time.currentElapsedTime();
if (now >= deadline) {
throw new KeeperException.RequestTimeoutException();
}
to = Integer.min(to, (int) (deadline - now));
}
if (state.isConnected()) {
//1000(1 second) is to prevent race condition missing to send the second ping
//also make sure not to send too many pings when readTimeout is small
Expand Down Expand Up @@ -1240,9 +1293,14 @@ public void run() {
serverAddress,
e);

Code cause = Code.CONNECTIONLOSS;
if (e instanceof KeeperException) {
cause = ((KeeperException) e).code();
}

// At this point, there might still be new packets appended to outgoingQueue.
// They will be handled in the next connection or cleaned up if closed.
cleanAndNotifyState();
cleanAndNotifyState(cause);
}
}
}
Expand All @@ -1268,8 +1326,8 @@ public void run() {
"SendThread exited loop for session: 0x" + Long.toHexString(getSessionId()));
}

private void cleanAndNotifyState() {
cleanup();
private void cleanAndNotifyState(Code cause) {
cleanup(cause);
if (state.isAlive()) {
eventThread.queueEvent(new WatchedEvent(Event.EventType.None, Event.KeeperState.Disconnected, null));
}
Expand Down Expand Up @@ -1328,10 +1386,14 @@ private void pingRwServer() throws RWServerFoundException {
}

/**
 * Runs {@code cleanup(Code)} with {@code Code.CONNECTIONLOSS} as the cause.
 */
private void cleanup() {
cleanup(Code.CONNECTIONLOSS);
}

private void cleanup(Code cause) {
clientCnxnSocket.cleanup();
synchronized (pendingQueue) {
for (Packet p : pendingQueue) {
conLossPacket(p);
cause = abortPacket(p, cause);
}
pendingQueue.clear();
}
Expand All @@ -1341,7 +1403,7 @@ private void cleanup() {
Iterator<Packet> iter = outgoingQueue.iterator();
while (iter.hasNext()) {
Packet p = iter.next();
conLossPacket(p);
cause = abortPacket(p, cause);
iter.remove();
}
}
Expand Down Expand Up @@ -1525,37 +1587,13 @@ public ReplyHeader submitRequest(
watchRegistration,
watchDeregistration);
synchronized (packet) {
if (requestTimeout > 0) {
// Wait for request completion with timeout
waitForPacketFinish(r, packet);
} else {
// Wait for request completion infinitely
while (!packet.finished) {
packet.wait();
}
while (!packet.finished) {
packet.wait();
}
}
if (r.getErr() == Code.REQUESTTIMEOUT.intValue()) {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This does not guarantee that Code.REQUESTTIMEOUT is returned, because sendThread.cleanAndNotifyState() will reset r.err to Code.CONNECTIONLOSS. The old testing code misled us because it modified client-side behavior.

sendThread.cleanAndNotifyState();
}
return r;
}

/**
 * Waits for the packet to be finished, giving up once {@code requestTimeout} has elapsed.
 *
 * <p>On timeout the reply header's error is set to {@code Code.REQUESTTIMEOUT} and the
 * method returns with the packet still unfinished. The caller must hold the packet's
 * monitor, since {@code packet.wait} is invoked.
 *
 * @param r reply header that receives the timeout error code
 * @param packet packet whose {@code finished} flag is awaited
 * @throws InterruptedException if the waiting thread is interrupted
 */
private void waitForPacketFinish(ReplyHeader r, Packet packet) throws InterruptedException {
    final long waitStartTime = Time.currentElapsedTime();
    while (!packet.finished) {
        // Wait only for what is left of the budget. Re-waiting the full timeout after an
        // early or spurious wakeup could stretch the total wait well past requestTimeout.
        long remaining = requestTimeout - (Time.currentElapsedTime() - waitStartTime);
        if (remaining <= 0) {
            LOG.error("Timeout error occurred for the packet '{}'.", packet);
            r.setErr(Code.REQUESTTIMEOUT.intValue());
            break;
        }
        packet.wait(remaining);
    }
}

public void saslCompleted() {
sendThread.getClientCnxnSocket().saslCompleted();
}
Expand Down Expand Up @@ -1612,6 +1650,9 @@ public Packet queuePacket(
packet.clientPath = clientPath;
packet.serverPath = serverPath;
packet.watchDeregistration = watchDeregistration;
if (requestTimeout != 0 && h.getXid() >= 0) {
packet.deadline = Time.currentElapsedTime() + requestTimeout;
}
// The synchronized block here is for two purpose:
// 1. synchronize with the final cleanup() in SendThread.run() to avoid race
// 2. synchronized against each packet. So if a closeSession packet is added,
Expand Down Expand Up @@ -1669,25 +1710,6 @@ public LocalCallback(AsyncCallback cb, int rc, String path, Object ctx) {

}

/**
 * Loads the request timeout from the client configuration into {@code requestTimeout}
 * and logs the resulting value.
 *
 * @throws NumberFormatException rethrown, after logging, when the configured value
 *         cannot be parsed
 */
private void initRequestTimeout() {
    final String key = ZKClientConfig.ZOOKEEPER_REQUEST_TIMEOUT;
    try {
        requestTimeout = clientConfig.getLong(key, ZKClientConfig.ZOOKEEPER_REQUEST_TIMEOUT_DEFAULT);
    } catch (NumberFormatException e) {
        LOG.error(
            "Configured value {} for property {} can not be parsed to long.",
            clientConfig.getProperty(key),
            key);
        throw e;
    }
    final boolean enabled = requestTimeout > 0;
    LOG.info("{} value is {}. feature enabled={}", key, requestTimeout, enabled);
}

public ZooKeeperSaslClient getZooKeeperSaslClient() {
return sendThread.getZooKeeperSaslClient();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import java.util.concurrent.LinkedBlockingDeque;
import org.apache.zookeeper.ClientCnxn.EndOfStreamException;
import org.apache.zookeeper.ClientCnxn.Packet;
import org.apache.zookeeper.ZooDefs.OpCode;
import org.apache.zookeeper.client.ZKClientConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -109,9 +108,7 @@ void doIO(Queue<Packet> pendingQueue, ClientCnxn cnxn) throws InterruptedExcepti
updateLastSend();
// If we already started writing p, p.bb will already exist
if (p.bb == null) {
if ((p.requestHeader != null)
&& (p.requestHeader.getType() != OpCode.ping)
&& (p.requestHeader.getType() != OpCode.auth)) {
if (p.requestHeader != null && p.requestHeader.getXid() >= 0) {
p.requestHeader.setXid(cnxn.getXid());
}
p.createBB();
Expand All @@ -120,9 +117,7 @@ void doIO(Queue<Packet> pendingQueue, ClientCnxn cnxn) throws InterruptedExcepti
if (!p.bb.hasRemaining()) {
sentCount.getAndIncrement();
outgoingQueue.removeFirstOccurrence(p);
if (p.requestHeader != null
&& p.requestHeader.getType() != OpCode.ping
&& p.requestHeader.getType() != OpCode.auth) {
if (p.requestHeader != null && p.requestHeader.getXid() >= 0) {
synchronized (pendingQueue) {
pendingQueue.add(p);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -354,9 +354,7 @@ private void doWrite(Queue<Packet> pendingQueue, Packet p, ClientCnxn cnxn) thro
boolean anyPacketsSent = false;
while (true) {
if (p != WakeupPacket.getInstance()) {
if ((p.requestHeader != null)
&& (p.requestHeader.getType() != ZooDefs.OpCode.ping)
&& (p.requestHeader.getType() != ZooDefs.OpCode.auth)) {
if (p.requestHeader != null && p.requestHeader.getXid() >= 0) {
p.requestHeader.setXid(cnxn.getXid());
synchronized (pendingQueue) {
pendingQueue.add(p);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -514,6 +514,8 @@ static String getCodeMessage(Code code) {
return "Quota has exceeded";
case THROTTLEDOP:
return "Op throttled due to high load";
case REQUESTTIMEOUT:
return "Request timeout";
default:
return "Unknown error " + code;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,16 @@
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.common.ZKConfig;
import org.apache.zookeeper.server.quorum.QuorumPeerConfig.ConfigException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Handles client specific properties
* @since 3.5.2
*/
@InterfaceAudience.Public
public class ZKClientConfig extends ZKConfig {
private static final Logger LOG = LoggerFactory.getLogger(ZKClientConfig.class);

public static final String ZK_SASL_CLIENT_USERNAME = "zookeeper.sasl.client.username";
public static final String ZK_SASL_CLIENT_USERNAME_DEFAULT = "zookeeper";
Expand Down Expand Up @@ -142,4 +145,18 @@ public long getLong(String key, long defaultValue) {
return defaultValue;
}

/**
 * Looks up the request timeout property, falling back to its default.
 *
 * @return the value obtained from {@code getLong} for the request timeout key
 * @throws NumberFormatException rethrown, after logging the offending value, when the
 *         configured property cannot be parsed
 */
@InterfaceAudience.Private
public long getRequestTimeout() {
    final String key = ZKClientConfig.ZOOKEEPER_REQUEST_TIMEOUT;
    final long fallback = ZKClientConfig.ZOOKEEPER_REQUEST_TIMEOUT_DEFAULT;
    try {
        return getLong(key, fallback);
    } catch (NumberFormatException e) {
        LOG.error(
            "Configured value {} for property {} can not be parsed to long.",
            getProperty(key),
            key);
        throw e;
    }
}
}
Loading
Loading