From dfe43b3bdc317a45df38961ee8e286a21cc2fe11 Mon Sep 17 00:00:00 2001
From: James Taylor
Date: Tue, 6 Oct 2020 15:32:25 -0700
Subject: [PATCH] Avoid caching presto worker nodes #462

---
 .../qubole/rubix/core/CachingFileSystem.java  |   4 +-
 .../rubix/core/utils/DummyClusterManager.java |  12 +-
 .../utils/DummyClusterManagerMultinode.java   |  37 ++-
 .../core/utils/DockerTestClusterManager.java  |  11 +-
 .../rubix/hadoop2/Hadoop2ClusterManager.java  |  13 +-
 .../hadoop2/TestHadoop2ClusterManager.java    |  44 +--
 .../TestHadoop2ClusterManagerUtil.java        |   8 +-
 .../rubix/presto/PrestoClusterManager.java    |  15 +-
 .../rubix/presto/TestClusterManager.java      |  13 +-
 .../rubix/prestosql/PrestoClusterManager.java | 275 ++---------------
 .../rubix/prestosql/TestClusterManager.java   |  67 ++--
 .../rubix/prestosql/TestingNodeManager.java   | 287 ++++++++++++++++++
 .../qubole/rubix/spi/AsyncClusterManager.java |  86 ++++++
 .../com/qubole/rubix/spi/ClusterManager.java  |  65 +---
 .../qubole/rubix/spi/SyncClusterManager.java  |  54 ++++
 15 files changed, 590 insertions(+), 401 deletions(-)
 create mode 100644 rubix-prestosql/src/test/java/com/qubole/rubix/prestosql/TestingNodeManager.java
 create mode 100644 rubix-spi/src/main/java/com/qubole/rubix/spi/AsyncClusterManager.java
 create mode 100644 rubix-spi/src/main/java/com/qubole/rubix/spi/SyncClusterManager.java

diff --git a/rubix-core/src/main/java/com/qubole/rubix/core/CachingFileSystem.java b/rubix-core/src/main/java/com/qubole/rubix/core/CachingFileSystem.java
index 196d5823..785336f9 100644
--- a/rubix-core/src/main/java/com/qubole/rubix/core/CachingFileSystem.java
+++ b/rubix-core/src/main/java/com/qubole/rubix/core/CachingFileSystem.java
@@ -48,7 +48,7 @@ import java.lang.reflect.ParameterizedType;
 import java.lang.reflect.Type;
 import java.net.URI;
-import java.util.List;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;

 import static com.google.common.base.Preconditions.checkState;
@@ -401,7 +401,7 @@ public BlockLocation[] getFileBlockLocations(FileStatus file, long start, long l
       return fs.getFileBlockLocations(file, start, len);
     }

-    List<String> nodes = clusterManager.getNodes();
+    Set<String> nodes = clusterManager.getNodes();

     if (nodes == null) {
       return fs.getFileBlockLocations(file, start, len);
diff --git a/rubix-core/src/main/java/com/qubole/rubix/core/utils/DummyClusterManager.java b/rubix-core/src/main/java/com/qubole/rubix/core/utils/DummyClusterManager.java
index dfbd49ca..c8afa812 100644
--- a/rubix-core/src/main/java/com/qubole/rubix/core/utils/DummyClusterManager.java
+++ b/rubix-core/src/main/java/com/qubole/rubix/core/utils/DummyClusterManager.java
@@ -12,23 +12,25 @@
  */
 package com.qubole.rubix.core.utils;

-import com.qubole.rubix.spi.ClusterManager;
+import com.qubole.rubix.spi.AsyncClusterManager;
 import com.qubole.rubix.spi.ClusterType;

 import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;

 /**
  * Created by Abhishek on 6/8/18.
*/ -public class DummyClusterManager extends ClusterManager +public class DummyClusterManager extends AsyncClusterManager { @Override - public List getNodesInternal() + public Set getNodesInternal() { - List list = new ArrayList(); + Set list = new HashSet<>(); String hostName = ""; try { hostName = InetAddress.getLocalHost().getCanonicalHostName(); @@ -51,6 +53,6 @@ public ClusterType getClusterType() @Override public String getCurrentNodeName() { - return getNodes().get(0); + return getNodes().iterator().next(); } } diff --git a/rubix-core/src/main/java/com/qubole/rubix/core/utils/DummyClusterManagerMultinode.java b/rubix-core/src/main/java/com/qubole/rubix/core/utils/DummyClusterManagerMultinode.java index eb82dec5..d529ec74 100644 --- a/rubix-core/src/main/java/com/qubole/rubix/core/utils/DummyClusterManagerMultinode.java +++ b/rubix-core/src/main/java/com/qubole/rubix/core/utils/DummyClusterManagerMultinode.java @@ -12,32 +12,41 @@ */ package com.qubole.rubix.core.utils; -import com.qubole.rubix.spi.ClusterManager; +import com.qubole.rubix.spi.AsyncClusterManager; import com.qubole.rubix.spi.ClusterType; import java.net.InetAddress; import java.net.UnknownHostException; import java.util.ArrayList; +import java.util.HashSet; import java.util.List; +import java.util.Set; -public class DummyClusterManagerMultinode extends ClusterManager +public class DummyClusterManagerMultinode extends AsyncClusterManager { - @Override - public List getNodesInternal() + private final String currentNode; + private final String otherNode; + private final Set nodes = new HashSet<>(); + + public DummyClusterManagerMultinode() { - List list = new ArrayList(); - String hostName = ""; + String currentNode; try { - hostName = InetAddress.getLocalHost().getCanonicalHostName(); + currentNode = InetAddress.getLocalHost().getCanonicalHostName(); } catch (UnknownHostException e) { - hostName = "localhost"; + currentNode = "localhost"; } + this.currentNode = currentNode; + nodes.add(currentNode); + this.otherNode = currentNode + "_copy"; + nodes.add(otherNode); + } - list.add(hostName); - list.add(hostName + "_copy"); - - return list; + @Override + public Set getNodesInternal() + { + return nodes; } @Override @@ -49,11 +58,11 @@ public ClusterType getClusterType() @Override public String getCurrentNodeName() { - return getNodes().get(0); + return currentNode; } public String locateKey(String key) { - return getNodes().get(1); + return otherNode; } } diff --git a/rubix-core/src/test/java/com/qubole/rubix/core/utils/DockerTestClusterManager.java b/rubix-core/src/test/java/com/qubole/rubix/core/utils/DockerTestClusterManager.java index 5d686f6c..aa33f726 100644 --- a/rubix-core/src/test/java/com/qubole/rubix/core/utils/DockerTestClusterManager.java +++ b/rubix-core/src/test/java/com/qubole/rubix/core/utils/DockerTestClusterManager.java @@ -13,17 +13,18 @@ package com.qubole.rubix.core.utils; import com.google.common.collect.Lists; -import com.qubole.rubix.spi.ClusterManager; +import com.google.common.collect.Sets; +import com.qubole.rubix.spi.AsyncClusterManager; import com.qubole.rubix.spi.ClusterType; -import java.util.List; +import java.util.Set; -public class DockerTestClusterManager extends ClusterManager +public class DockerTestClusterManager extends AsyncClusterManager { @Override - public List getNodesInternal() + public Set getNodesInternal() { - return Lists.newArrayList("172.18.8.1", "172.18.8.2"); + return Sets.newHashSet("172.18.8.1", "172.18.8.2"); } @Override diff --git 
a/rubix-hadoop2/src/main/java/com/qubole/rubix/hadoop2/Hadoop2ClusterManager.java b/rubix-hadoop2/src/main/java/com/qubole/rubix/hadoop2/Hadoop2ClusterManager.java index 708b565b..a0b8c849 100644 --- a/rubix-hadoop2/src/main/java/com/qubole/rubix/hadoop2/Hadoop2ClusterManager.java +++ b/rubix-hadoop2/src/main/java/com/qubole/rubix/hadoop2/Hadoop2ClusterManager.java @@ -12,10 +12,12 @@ */ package com.qubole.rubix.hadoop2; +import com.google.api.client.util.Sets; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; import com.google.common.collect.Lists; -import com.qubole.rubix.spi.ClusterManager; +import com.qubole.rubix.spi.AsyncClusterManager; import com.qubole.rubix.spi.ClusterType; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -30,7 +32,7 @@ /** * Created by sakshia on 28/7/16. */ -public class Hadoop2ClusterManager extends ClusterManager +public class Hadoop2ClusterManager extends AsyncClusterManager { YarnConfiguration yconf; private Log log = LogFactory.getLog(Hadoop2ClusterManager.class); @@ -44,7 +46,7 @@ public void initialize(Configuration conf) } @Override - public List getNodesInternal() + public Set getNodesInternal() { try { List allNodes = Hadoop2ClusterManagerUtil.getAllNodes(yconf); @@ -53,7 +55,7 @@ public List getNodesInternal() } if (allNodes.isEmpty()) { - return ImmutableList.of(); + return ImmutableSet.of(); } Set hosts = new HashSet<>(); @@ -70,8 +72,7 @@ public List getNodesInternal() throw new Exception("No healthy data nodes found."); } - List hostList = Lists.newArrayList(hosts.toArray(new String[0])); - return hostList; + return ImmutableSet.copyOf(hosts); } catch (Exception e) { throw Throwables.propagate(e); diff --git a/rubix-hadoop2/src/test/java/com/qubole/rubix/hadoop2/TestHadoop2ClusterManager.java b/rubix-hadoop2/src/test/java/com/qubole/rubix/hadoop2/TestHadoop2ClusterManager.java index 81ec7175..47a25621 100644 --- a/rubix-hadoop2/src/test/java/com/qubole/rubix/hadoop2/TestHadoop2ClusterManager.java +++ b/rubix-hadoop2/src/test/java/com/qubole/rubix/hadoop2/TestHadoop2ClusterManager.java @@ -24,7 +24,7 @@ import java.io.IOException; import java.net.InetAddress; import java.net.UnknownHostException; -import java.util.List; +import java.util.Set; import static org.testng.Assert.assertTrue; @@ -66,13 +66,13 @@ static ClusterManager buildClusterManager() public void testGetNodes_multipleWorkers() throws IOException { - final List nodeHostnames = TestHadoop2ClusterManagerUtil.getNodeHostnamesFromCluster( + final Set nodeHostnames = TestHadoop2ClusterManagerUtil.getNodeHostnamesFromCluster( TestHadoop2ClusterManagerUtil.CLUSTER_NODES_ENDPOINT, worker.new MultipleRunningWorkers(), conf, ClusterType.HADOOP2_CLUSTER_MANAGER); assertTrue(nodeHostnames.size() == 2, "Should only have two nodes"); - assertTrue(nodeHostnames.get(0).equals(TestHadoop2ClusterManagerUtil.WORKER_HOSTNAME_1) && - nodeHostnames.get(1).equals(TestHadoop2ClusterManagerUtil.WORKER_HOSTNAME_2), "Wrong nodes data"); + assertTrue(nodeHostnames.contains(TestHadoop2ClusterManagerUtil.WORKER_HOSTNAME_1) && + nodeHostnames.contains(TestHadoop2ClusterManagerUtil.WORKER_HOSTNAME_2), "Wrong nodes data"); } @Test @@ -82,12 +82,12 @@ public void testGetNodes_multipleWorkers() public void testGetNodes_oneWorker() throws IOException { - final List nodeHostnames = TestHadoop2ClusterManagerUtil.getNodeHostnamesFromCluster( + final Set nodeHostnames = 
TestHadoop2ClusterManagerUtil.getNodeHostnamesFromCluster( TestHadoop2ClusterManagerUtil.CLUSTER_NODES_ENDPOINT, worker.new OneRunningWorker(), conf, ClusterType.HADOOP2_CLUSTER_MANAGER); assertTrue(nodeHostnames.size() == 1, "Should only have one node"); - assertTrue(nodeHostnames.get(0).equals(TestHadoop2ClusterManagerUtil.WORKER_HOSTNAME_1)); + assertTrue(nodeHostnames.contains(TestHadoop2ClusterManagerUtil.WORKER_HOSTNAME_1)); } @Test @@ -97,13 +97,13 @@ public void testGetNodes_oneWorker() public void testGetNodes_oneNewWorker() throws IOException { - final List nodeHostnames = TestHadoop2ClusterManagerUtil.getNodeHostnamesFromCluster( + final Set nodeHostnames = TestHadoop2ClusterManagerUtil.getNodeHostnamesFromCluster( TestHadoop2ClusterManagerUtil.CLUSTER_NODES_ENDPOINT, worker.new MultipleWorkersOneNew(), conf, ClusterType.HADOOP2_CLUSTER_MANAGER); assertTrue(nodeHostnames.size() == 2, "Should only have two nodes"); - assertTrue(nodeHostnames.get(0).equals(TestHadoop2ClusterManagerUtil.WORKER_HOSTNAME_1) && - nodeHostnames.get(1).equals(TestHadoop2ClusterManagerUtil.WORKER_HOSTNAME_2), "Wrong nodes data"); + assertTrue(nodeHostnames.contains(TestHadoop2ClusterManagerUtil.WORKER_HOSTNAME_1) && + nodeHostnames.contains(TestHadoop2ClusterManagerUtil.WORKER_HOSTNAME_2), "Wrong nodes data"); } @Test @@ -113,13 +113,13 @@ public void testGetNodes_oneNewWorker() public void testGetNodes_oneRebootedWorker() throws IOException { - final List nodeHostnames = TestHadoop2ClusterManagerUtil.getNodeHostnamesFromCluster( + final Set nodeHostnames = TestHadoop2ClusterManagerUtil.getNodeHostnamesFromCluster( TestHadoop2ClusterManagerUtil.CLUSTER_NODES_ENDPOINT, worker.new MultipleWorkersOneRebooted(), conf, ClusterType.HADOOP2_CLUSTER_MANAGER); assertTrue(nodeHostnames.size() == 2, "Should only have two nodes"); - assertTrue(nodeHostnames.get(0).equals(TestHadoop2ClusterManagerUtil.WORKER_HOSTNAME_1) && - nodeHostnames.get(1).equals(TestHadoop2ClusterManagerUtil.WORKER_HOSTNAME_2), "Wrong nodes data"); + assertTrue(nodeHostnames.contains(TestHadoop2ClusterManagerUtil.WORKER_HOSTNAME_1) && + nodeHostnames.contains(TestHadoop2ClusterManagerUtil.WORKER_HOSTNAME_2), "Wrong nodes data"); } @Test @@ -129,12 +129,12 @@ public void testGetNodes_oneRebootedWorker() public void testMasterOnlyCluster() throws IOException { - final List nodeHostnames = TestHadoop2ClusterManagerUtil.getNodeHostnamesFromCluster( + final Set nodeHostnames = TestHadoop2ClusterManagerUtil.getNodeHostnamesFromCluster( TestHadoop2ClusterManagerUtil.CLUSTER_NODES_ENDPOINT, worker.new NoWorkers(), conf, ClusterType.HADOOP2_CLUSTER_MANAGER); assertTrue(nodeHostnames.size() == 1, "Should have added localhost in list"); - assertTrue(nodeHostnames.get(0).equals(InetAddress.getLocalHost().getHostAddress()), "Not added right hostname"); + assertTrue(nodeHostnames.contains(InetAddress.getLocalHost().getHostAddress()), "Not added right hostname"); } @Test @@ -144,12 +144,12 @@ public void testMasterOnlyCluster() public void testUnhealthyNodeCluster_decommissioned() throws IOException { - final List nodeHostnames = TestHadoop2ClusterManagerUtil.getNodeHostnamesFromCluster( + final Set nodeHostnames = TestHadoop2ClusterManagerUtil.getNodeHostnamesFromCluster( TestHadoop2ClusterManagerUtil.CLUSTER_NODES_ENDPOINT, worker.new MultipleWorkersOneDecommissioned(), conf, ClusterType.HADOOP2_CLUSTER_MANAGER); assertTrue(nodeHostnames.size() == 1, "Should only have one node"); - 
assertTrue(nodeHostnames.get(0).equals(TestHadoop2ClusterManagerUtil.WORKER_HOSTNAME_2), "Wrong nodes data"); + assertTrue(nodeHostnames.contains(TestHadoop2ClusterManagerUtil.WORKER_HOSTNAME_2), "Wrong nodes data"); } @Test @@ -159,12 +159,12 @@ public void testUnhealthyNodeCluster_decommissioned() public void testUnhealthyNodeCluster_decommissioning() throws IOException { - final List nodeHostnames = TestHadoop2ClusterManagerUtil.getNodeHostnamesFromCluster( + final Set nodeHostnames = TestHadoop2ClusterManagerUtil.getNodeHostnamesFromCluster( TestHadoop2ClusterManagerUtil.CLUSTER_NODES_ENDPOINT, worker.new MultipleWorkersOneDecommissioning(), conf, ClusterType.HADOOP2_CLUSTER_MANAGER); assertTrue(nodeHostnames.size() == 1, "Should only have one node"); - assertTrue(nodeHostnames.get(0).equals(TestHadoop2ClusterManagerUtil.WORKER_HOSTNAME_2), "Wrong nodes data"); + assertTrue(nodeHostnames.contains(TestHadoop2ClusterManagerUtil.WORKER_HOSTNAME_2), "Wrong nodes data"); } @Test @@ -174,12 +174,12 @@ public void testUnhealthyNodeCluster_decommissioning() public void testUnhealthyNodeCluster_lost() throws IOException { - final List nodeHostnames = TestHadoop2ClusterManagerUtil.getNodeHostnamesFromCluster( + final Set nodeHostnames = TestHadoop2ClusterManagerUtil.getNodeHostnamesFromCluster( TestHadoop2ClusterManagerUtil.CLUSTER_NODES_ENDPOINT, worker.new MultipleWorkersOneLost(), conf, ClusterType.HADOOP2_CLUSTER_MANAGER); assertTrue(nodeHostnames.size() == 1, "Should only have one node"); - assertTrue(nodeHostnames.get(0).equals(TestHadoop2ClusterManagerUtil.WORKER_HOSTNAME_2), "Wrong nodes data"); + assertTrue(nodeHostnames.contains(TestHadoop2ClusterManagerUtil.WORKER_HOSTNAME_2), "Wrong nodes data"); } @Test @@ -189,12 +189,12 @@ public void testUnhealthyNodeCluster_lost() public void testUnhealthyNodeCluster_unhealthy() throws IOException { - final List nodeHostnames = TestHadoop2ClusterManagerUtil.getNodeHostnamesFromCluster( + final Set nodeHostnames = TestHadoop2ClusterManagerUtil.getNodeHostnamesFromCluster( TestHadoop2ClusterManagerUtil.CLUSTER_NODES_ENDPOINT, worker.new MultipleWorkersOneUnhealthy(), conf, ClusterType.HADOOP2_CLUSTER_MANAGER); assertTrue(nodeHostnames.size() == 1, "Should only have one node"); - assertTrue(nodeHostnames.get(0).equals(TestHadoop2ClusterManagerUtil.WORKER_HOSTNAME_2), "Wrong nodes data"); + assertTrue(nodeHostnames.contains(TestHadoop2ClusterManagerUtil.WORKER_HOSTNAME_2), "Wrong nodes data"); } @Test diff --git a/rubix-hadoop2/src/test/java/com/qubole/rubix/hadoop2/TestHadoop2ClusterManagerUtil.java b/rubix-hadoop2/src/test/java/com/qubole/rubix/hadoop2/TestHadoop2ClusterManagerUtil.java index b5a04460..c28f6863 100644 --- a/rubix-hadoop2/src/test/java/com/qubole/rubix/hadoop2/TestHadoop2ClusterManagerUtil.java +++ b/rubix-hadoop2/src/test/java/com/qubole/rubix/hadoop2/TestHadoop2ClusterManagerUtil.java @@ -84,7 +84,7 @@ static HttpServer createServer(String endpoint, HttpHandler handler) * @return A list of hostnames for the nodes in the cluster. * @throws IOException if the cluster server could not be created. 
*/ - static List getNodeHostnamesFromCluster(String endpoint, HttpHandler responseHandler, + static Set getNodeHostnamesFromCluster(String endpoint, HttpHandler responseHandler, Configuration conf, ClusterType clusterType) throws IOException { @@ -93,7 +93,7 @@ static List getNodeHostnamesFromCluster(String endpoint, HttpHandler res ClusterManager clusterManager = getClusterManagerInstance(clusterType, conf); clusterManager.initialize(conf); - final List nodes = clusterManager.getNodes(); + final Set nodes = clusterManager.getNodes(); log.info("Got nodes: " + nodes); server.stop(0); @@ -157,10 +157,10 @@ static int matchMemberships(TestWorker prevWorker, TestWorker newWorker, Set nodeHostnames1 = getNodeHostnamesFromCluster(CLUSTER_NODES_ENDPOINT, prevWorker, conf, clusterType); + final Set nodeHostnames1 = getNodeHostnamesFromCluster(CLUSTER_NODES_ENDPOINT, prevWorker, conf, clusterType); Map keyMembership1 = getConsistentHashedMembership(prevWorker, keys, conf, clusterType); - final List nodeHostnames2 = getNodeHostnamesFromCluster(CLUSTER_NODES_ENDPOINT, newWorker, conf, clusterType); + final Set nodeHostnames2 = getNodeHostnamesFromCluster(CLUSTER_NODES_ENDPOINT, newWorker, conf, clusterType); Map keyMembership2 = getConsistentHashedMembership(newWorker, keys, conf, clusterType); int match = 0; diff --git a/rubix-presto/src/main/java/com/qubole/rubix/presto/PrestoClusterManager.java b/rubix-presto/src/main/java/com/qubole/rubix/presto/PrestoClusterManager.java index 3b7e6ad0..820da6df 100644 --- a/rubix-presto/src/main/java/com/qubole/rubix/presto/PrestoClusterManager.java +++ b/rubix-presto/src/main/java/com/qubole/rubix/presto/PrestoClusterManager.java @@ -14,11 +14,12 @@ import com.google.common.base.Throwables; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; import com.google.common.collect.Lists; import com.google.common.reflect.TypeToken; import com.google.gson.Gson; import com.qubole.rubix.common.utils.ClusterUtil; -import com.qubole.rubix.spi.ClusterManager; +import com.qubole.rubix.spi.AsyncClusterManager; import com.qubole.rubix.spi.ClusterType; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -43,7 +44,7 @@ /** * Created by stagra on 14/1/16. 
*/ -public class PrestoClusterManager extends ClusterManager +public class PrestoClusterManager extends AsyncClusterManager { private static final String DEFAULT_USER = "rubix"; private int serverPort = 8081; @@ -66,7 +67,7 @@ public void initialize(Configuration conf) } @Override - public List getNodesInternal() + public Set getNodesInternal() { try { URL allNodesRequest = getNodeUrl(); @@ -141,11 +142,7 @@ public List getNodesInternal() List allNodes = gson.fromJson(allResponse.toString(), type); List failedNodes = gson.fromJson(failedResponse.toString(), type); if (allNodes.isEmpty()) { - return ImmutableList.of(); - } - - if (failedNodes.isEmpty()) { - failedNodes = ImmutableList.of(); + return ImmutableSet.of(); } // keep only the healthy nodes @@ -156,7 +153,7 @@ public List getNodesInternal() for (Stats node : allNodes) { hosts.add(node.getUri().getHost()); } - return Lists.newArrayList(hosts); + return ImmutableSet.copyOf(hosts); } catch (IOException e) { throw Throwables.propagate(e); diff --git a/rubix-presto/src/test/java/com/qubole/rubix/presto/TestClusterManager.java b/rubix-presto/src/test/java/com/qubole/rubix/presto/TestClusterManager.java index f90abb72..f06d4a31 100644 --- a/rubix-presto/src/test/java/com/qubole/rubix/presto/TestClusterManager.java +++ b/rubix-presto/src/test/java/com/qubole/rubix/presto/TestClusterManager.java @@ -28,6 +28,7 @@ import java.net.InetSocketAddress; import java.net.UnknownHostException; import java.util.List; +import java.util.Set; import static org.testng.Assert.assertTrue; @@ -52,11 +53,11 @@ public void testGetNodes() log.info("STARTED SERVER"); ClusterManager clusterManager = getPrestoClusterManager(); - List nodes = clusterManager.getNodes(); + Set nodes = clusterManager.getNodes(); log.info("Got nodes: " + nodes); assertTrue(nodes.size() == 2, "Should only have two nodes"); - assertTrue(nodes.get(0).equals("192.168.1.3") && nodes.get(1).equals("192.168.2.252"), "Wrong nodes data"); + assertTrue(nodes.contains("192.168.1.3") && nodes.contains("192.168.2.252"), "Wrong nodes data"); server.stop(0); } @@ -73,11 +74,11 @@ public void testMasterOnlyCluster() log.info("STARTED SERVER"); ClusterManager clusterManager = getPrestoClusterManager(); - List nodes = clusterManager.getNodes(); + Set nodes = clusterManager.getNodes(); log.info("Got nodes: " + nodes); assertTrue(nodes.size() == 1, "Should have added localhost in list"); - assertTrue(nodes.get(0).equals(InetAddress.getLocalHost().getHostAddress()), "Not added right hostname"); + assertTrue(nodes.contains(InetAddress.getLocalHost().getHostAddress()), "Not added right hostname"); server.stop(0); } @@ -93,11 +94,11 @@ public void testFailedNodeCluster() log.info("STARTED SERVER"); ClusterManager clusterManager = getPrestoClusterManager(); - List nodes = clusterManager.getNodes(); + Set nodes = clusterManager.getNodes(); log.info("Got nodes: " + nodes); assertTrue(nodes.size() == 1, "Should only have two nodes"); - assertTrue(nodes.get(0).equals("192.168.2.252"), "Wrong nodes data"); + assertTrue(nodes.contains("192.168.2.252"), "Wrong nodes data"); server.stop(0); } diff --git a/rubix-prestosql/src/main/java/com/qubole/rubix/prestosql/PrestoClusterManager.java b/rubix-prestosql/src/main/java/com/qubole/rubix/prestosql/PrestoClusterManager.java index ad203a7a..61016cc6 100644 --- a/rubix-prestosql/src/main/java/com/qubole/rubix/prestosql/PrestoClusterManager.java +++ b/rubix-prestosql/src/main/java/com/qubole/rubix/prestosql/PrestoClusterManager.java @@ -12,34 +12,14 @@ */ package 
com.qubole.rubix.prestosql; -import com.google.common.base.Throwables; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Lists; -import com.google.common.reflect.TypeToken; -import com.google.gson.Gson; -import com.qubole.rubix.common.utils.ClusterUtil; -import com.qubole.rubix.spi.ClusterManager; import com.qubole.rubix.spi.ClusterType; +import com.qubole.rubix.spi.SyncClusterManager; import io.prestosql.spi.Node; import io.prestosql.spi.NodeManager; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import javax.annotation.Nullable; - -import java.io.BufferedReader; -import java.io.IOException; -import java.io.InputStreamReader; -import java.lang.reflect.Type; -import java.net.HttpURLConnection; -import java.net.MalformedURLException; -import java.net.URI; -import java.net.URL; import java.net.UnknownHostException; -import java.util.HashSet; -import java.util.List; -import java.util.Objects; import java.util.Set; import java.util.stream.Collectors; @@ -48,170 +28,51 @@ /** * Created by stagra on 14/1/16. */ -public class PrestoClusterManager extends ClusterManager +public class PrestoClusterManager extends SyncClusterManager { - private static final String DEFAULT_USER = "rubix"; - private int serverPort = 8081; - private String serverAddress = "localhost"; - - private Log log = LogFactory.getLog(PrestoClusterManager.class); - - @Nullable + private static Log log = LogFactory.getLog(PrestoClusterManager.class); private static volatile NodeManager nodeManager; - public static String serverPortConf = "caching.fs.presto-server-port"; + private volatile Set workerNodes; - // Safe to use single instance of HttpClient since Supplier.get() provides synchronization - @Override - public void initialize(Configuration conf) - throws UnknownHostException + public static void setNodeManager(NodeManager nodeManager) { - super.initialize(conf); - this.serverPort = conf.getInt(serverPortConf, serverPort); - this.serverAddress = ClusterUtil.getMasterHostname(conf); + PrestoClusterManager.nodeManager = requireNonNull(nodeManager, "nodeManager is null"); } @Override - public List getNodesInternal() - { - if (nodeManager != null) { - return getNodesFromNodeManager(); - } - - try { - URL allNodesRequest = getNodeUrl(); - URL failedNodesRequest = getFailedNodeUrl(); - - HttpURLConnection allHttpCon = getHttpURLConnection(allNodesRequest); - - int allNodesResponseCode = allHttpCon.getResponseCode(); - - StringBuilder allResponse = new StringBuilder(); - StringBuilder failedResponse = new StringBuilder(); - try { - if (allNodesResponseCode == HttpURLConnection.HTTP_OK) { - BufferedReader in = new BufferedReader(new InputStreamReader(allHttpCon.getInputStream())); - String inputLine = ""; - try { - while ((inputLine = in.readLine()) != null) { - allResponse.append(inputLine); - } - } - catch (IOException e) { - throw new IOException(e); - } - finally { - in.close(); - } - } - else { - log.warn("v1/node failed with code: " + allNodesResponseCode); - return null; - } - } - catch (IOException e) { - throw new IOException(e); - } - finally { - allHttpCon.disconnect(); - } - - HttpURLConnection failHttpConn = getHttpURLConnection(failedNodesRequest); - int failedNodesResponseCode = failHttpConn.getResponseCode(); - // check on failed nodes - try { - if (failedNodesResponseCode == HttpURLConnection.HTTP_OK) { - BufferedReader in = new BufferedReader(new InputStreamReader(failHttpConn.getInputStream())); - 
String inputLine; - try { - while ((inputLine = in.readLine()) != null) { - failedResponse.append(inputLine); - } - } - catch (IOException e) { - throw new IOException(e); - } - finally { - in.close(); - } - } - } - catch (IOException e) { - throw new IOException(e); - } - finally { - failHttpConn.disconnect(); - } - - Gson gson = new Gson(); - Type type = new TypeToken>() - { - }.getType(); - - List allNodes = gson.fromJson(allResponse.toString(), type); - List failedNodes = gson.fromJson(failedResponse.toString(), type); - - if (failedNodes.isEmpty()) { - failedNodes = ImmutableList.of(); - } - - // keep only the healthy nodes - allNodes.removeAll(failedNodes); - - Set hosts = new HashSet(); - for (Stats node : allNodes) { - hosts.add(node.getUri().getHost()); - } - - return Lists.newArrayList(hosts.toArray(new String[0])); - } - catch (IOException e) { - throw Throwables.propagate(e); - } - } - - private HttpURLConnection getHttpURLConnection(URL urlRequest) - throws IOException - { - requireNonNull(urlRequest, "urlRequest is null"); - HttpURLConnection allHttpCon = (HttpURLConnection) urlRequest.openConnection(); - allHttpCon.setConnectTimeout(500); //ms - allHttpCon.setRequestMethod("GET"); - allHttpCon.setRequestProperty("X-Presto-User", DEFAULT_USER); - return allHttpCon; + protected boolean hasStateChanged() { + requireNonNull(nodeManager, "nodeManager is null"); + Set workerNodes = nodeManager.getWorkerNodes(); + boolean hasChanged = !workerNodes.equals(this.workerNodes); + this.workerNodes = workerNodes; + return hasChanged; } - private List getNodesFromNodeManager() + @Override + public Set getNodesInternal() { requireNonNull(nodeManager, "nodeManager is null"); - List workers = nodeManager.getWorkerNodes().stream() - .filter(node -> !node.isCoordinator()) - .map(Node::getHost) - .collect(Collectors.toList()); - - return workers; + return nodeManager.getWorkerNodes().stream() + .map(Node::getHost) + .collect(Collectors.toSet()); } @Override protected String getCurrentNodeHostname() { - if (nodeManager != null) { - return nodeManager.getCurrentNode().getHost(); - } - - return super.getCurrentNodeHostname(); + requireNonNull(nodeManager, "nodeManager is null"); + return nodeManager.getCurrentNode().getHost(); } @Override protected String getCurrentNodeHostAddress() { - if (nodeManager != null) { - try { - return nodeManager.getCurrentNode().getHostAndPort().toInetAddress().getHostAddress(); - } - catch (UnknownHostException e) { - log.warn("Could not get HostAddress from NodeManager", e); - // fallback - } + requireNonNull(nodeManager, "nodeManager is null"); + try { + return nodeManager.getCurrentNode().getHostAndPort().toInetAddress().getHostAddress(); + } + catch (UnknownHostException e) { + log.warn("Could not get HostAddress from NodeManager", e); } return super.getCurrentNodeHostAddress(); @@ -222,90 +83,4 @@ public ClusterType getClusterType() { return ClusterType.PRESTOSQL_CLUSTER_MANAGER; } - - public static void setPrestoServerPort(Configuration conf, int port) - { - conf.setInt(serverPortConf, port); - } - - public static void setNodeManager(NodeManager nodeManager) - { - PrestoClusterManager.nodeManager = requireNonNull(nodeManager, "nodeManager is null"); - } - - private URL getNodeUrl() - throws MalformedURLException - { - return new URL("http://" + serverAddress + ":" + serverPort + "/v1/node"); - } - - private URL getFailedNodeUrl() - throws MalformedURLException - { - return new URL("http://" + serverAddress + ":" + serverPort + "/v1/node/failed"); - } - - public 
static class Stats - { - URI uri; - String lastResponseTime; - - public Stats() - { - } - - public Stats(URI uri, String lastResponseTime) - { - this.uri = uri; - this.lastResponseTime = lastResponseTime; - } - - public URI getUri() - { - return uri; - } - - public void setURI(URI uri) - { - this.uri = uri; - } - - String getLastResponseTime() - { - return lastResponseTime; - } - - public void setLastResponseTime(String lastResponseTime) - { - this.lastResponseTime = lastResponseTime; - } - - @Override - public boolean equals(Object other) - { - if (this == other) { - return true; - } - if (other == null || getClass() != other.getClass()) { - return false; - } - Stats o = (Stats) other; - - if (!uri.equals(o.getUri())) { - return false; - } - - if (lastResponseTime != null && o.getLastResponseTime() != null) { - return lastResponseTime.equals(o.getLastResponseTime()); - } - - return lastResponseTime == o.getLastResponseTime(); - } - - @Override - public int hashCode() - { - return Objects.hash(uri, lastResponseTime); - } - } } diff --git a/rubix-prestosql/src/test/java/com/qubole/rubix/prestosql/TestClusterManager.java b/rubix-prestosql/src/test/java/com/qubole/rubix/prestosql/TestClusterManager.java index 013ae4b5..1ccc6c9c 100644 --- a/rubix-prestosql/src/test/java/com/qubole/rubix/prestosql/TestClusterManager.java +++ b/rubix-prestosql/src/test/java/com/qubole/rubix/prestosql/TestClusterManager.java @@ -12,7 +12,6 @@ */ package com.qubole.rubix.prestosql; -import com.qubole.rubix.spi.CacheConfig; import com.qubole.rubix.spi.ClusterManager; import com.sun.net.httpserver.HttpExchange; import com.sun.net.httpserver.HttpHandler; @@ -26,8 +25,9 @@ import java.io.OutputStream; import java.net.InetAddress; import java.net.InetSocketAddress; +import java.net.URI; import java.net.UnknownHostException; -import java.util.List; +import java.util.Set; import static org.testng.Assert.assertTrue; @@ -49,16 +49,19 @@ public void testGetNodes() { HttpServer server = createServer("/v1/node", new MultipleWorkers(), "/v1/node/failed", new NoFailedNode()); - log.info("STARTED SERVER"); + try { + log.info("STARTED SERVER"); - ClusterManager clusterManager = getPrestoClusterManager(); - List nodes = clusterManager.getNodes(); - log.info("Got nodes: " + nodes); + ClusterManager clusterManager = getPrestoClusterManager(); + Set nodes = clusterManager.getNodes(); + log.info("Got nodes: " + nodes); - assertTrue(nodes.size() == 2, "Should only have two nodes"); - assertTrue(nodes.get(0).equals("192.168.1.3") && nodes.get(1).equals("192.168.2.252"), "Wrong nodes data"); - - server.stop(0); + assertTrue(nodes.size() == 2, "Should only have two nodes"); + assertTrue(nodes.contains("192.168.1.3") && nodes.contains("192.168.2.252"), "Wrong nodes data"); + } + finally { + server.stop(0); + } } @Test @@ -70,15 +73,20 @@ public void testMasterOnlyCluster() { HttpServer server = createServer("/v1/node", new NoWorker(), "/v1/node/failed", new NoFailedNode()); - log.info("STARTED SERVER"); + try { + log.info("STARTED SERVER"); - ClusterManager clusterManager = getPrestoClusterManager(); - List nodes = clusterManager.getNodes(); - log.info("Got nodes: " + nodes); + ClusterManager clusterManager = getPrestoClusterManager(); + Set nodes = clusterManager.getNodes(); + log.info("Got nodes: " + nodes); + log.info(" Host address: " + InetAddress.getLocalHost().getHostAddress()); - assertTrue(nodes.size() == 1, "Should have added localhost in list"); - assertTrue(nodes.get(0).equals(InetAddress.getLocalHost().getHostAddress()), "Not 
added right hostname"); - server.stop(0); + assertTrue(nodes.size() == 1, "Should have added localhost in list"); + assertTrue(nodes.contains(InetAddress.getLocalHost().getHostAddress()), "Not added right hostname"); + } + finally { + server.stop(0); + } } @Test @@ -89,17 +97,19 @@ public void testFailedNodeCluster() throws IOException { HttpServer server = createServer("/v1/node", new MultipleWorkers(), "/v1/node/failed", new OneFailedNode()); + try { + log.info("STARTED SERVER"); - log.info("STARTED SERVER"); + ClusterManager clusterManager = getPrestoClusterManager(); + Set nodes = clusterManager.getNodes(); + log.info("Got nodes: " + nodes); - ClusterManager clusterManager = getPrestoClusterManager(); - List nodes = clusterManager.getNodes(); - log.info("Got nodes: " + nodes); - - assertTrue(nodes.size() == 1, "Should only have two nodes"); - assertTrue(nodes.get(0).equals("192.168.2.252"), "Wrong nodes data"); - - server.stop(0); + assertTrue(nodes.size() == 1, "Should only have two nodes"); + assertTrue(nodes.contains("192.168.2.252"), "Wrong nodes data"); + } + finally { + server.stop(0); + } } private HttpServer createServer(String endpoint1, HttpHandler handler1, String endpoint2, HttpHandler handler2) @@ -116,10 +126,11 @@ private HttpServer createServer(String endpoint1, HttpHandler handler1, String e private ClusterManager getPrestoClusterManager() throws UnknownHostException { - ClusterManager clusterManager = new PrestoClusterManager(); + PrestoClusterManager clusterManager = new PrestoClusterManager(); Configuration conf = new Configuration(); - conf.setInt(PrestoClusterManager.serverPortConf, 45326); + conf.setInt(TestingNodeManager.SERVER_PORT_CONF_KEY, 45326); clusterManager.initialize(conf); + PrestoClusterManager.setNodeManager(new TestingNodeManager(conf, new TestingNodeManager.TestingNode(URI.create("http://" + InetAddress.getLocalHost().getHostAddress())))); return clusterManager; } diff --git a/rubix-prestosql/src/test/java/com/qubole/rubix/prestosql/TestingNodeManager.java b/rubix-prestosql/src/test/java/com/qubole/rubix/prestosql/TestingNodeManager.java new file mode 100644 index 00000000..add70a75 --- /dev/null +++ b/rubix-prestosql/src/test/java/com/qubole/rubix/prestosql/TestingNodeManager.java @@ -0,0 +1,287 @@ +/** + * Copyright (c) 2019. Qubole Inc + * Licensed under the Apache License, Version 2.0 (the License); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. See accompanying LICENSE file. 
+ */ +package com.qubole.rubix.prestosql; + +import com.google.common.base.Throwables; +import com.google.common.collect.ImmutableList; +import com.google.common.reflect.TypeToken; +import com.google.gson.Gson; +import com.qubole.rubix.common.utils.ClusterUtil; +import io.prestosql.spi.HostAddress; +import io.prestosql.spi.Node; +import io.prestosql.spi.NodeManager; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.lang.reflect.Type; +import java.net.*; +import java.util.*; + +import static java.util.Objects.requireNonNull; + +public class TestingNodeManager implements NodeManager { + private static Log LOG = LogFactory.getLog(TestingNodeManager.class); + private static final int DEFAULT_SERVER_PORT = 8081; + private static final String DEFAULT_USER = "rubix"; + public static final String SERVER_PORT_CONF_KEY = "caching.fs.presto-server-port"; + + private final String serverAddress; + private final Node currentNode; + private final int serverPort; + + public TestingNodeManager(Configuration conf, Node currentNode) + { + this.serverPort = conf.getInt(SERVER_PORT_CONF_KEY, DEFAULT_SERVER_PORT); + this.serverAddress = ClusterUtil.getMasterHostname(conf); + this.currentNode = currentNode; + } + + @Override + public Set getAllNodes() { + return getWorkerNodes(); + } + + @Override + public Set getWorkerNodes() { + try { + URL allNodesRequest = getNodeUrl(); + URL failedNodesRequest = getFailedNodeUrl(); + + HttpURLConnection allHttpCon = getHttpURLConnection(allNodesRequest); + + int allNodesResponseCode = allHttpCon.getResponseCode(); + + StringBuilder allResponse = new StringBuilder(); + StringBuilder failedResponse = new StringBuilder(); + try { + if (allNodesResponseCode == HttpURLConnection.HTTP_OK) { + BufferedReader in = new BufferedReader(new InputStreamReader(allHttpCon.getInputStream())); + String inputLine = ""; + try { + while ((inputLine = in.readLine()) != null) { + allResponse.append(inputLine); + } + } + catch (IOException e) { + throw new IOException(e); + } + finally { + in.close(); + } + } + else { + LOG.warn("v1/node failed with code: " + allNodesResponseCode); + return null; + } + } + catch (IOException e) { + throw new IOException(e); + } + finally { + allHttpCon.disconnect(); + } + + HttpURLConnection failHttpConn = getHttpURLConnection(failedNodesRequest); + int failedNodesResponseCode = failHttpConn.getResponseCode(); + // check on failed nodes + try { + if (failedNodesResponseCode == HttpURLConnection.HTTP_OK) { + BufferedReader in = new BufferedReader(new InputStreamReader(failHttpConn.getInputStream())); + String inputLine; + try { + while ((inputLine = in.readLine()) != null) { + failedResponse.append(inputLine); + } + } + catch (IOException e) { + throw new IOException(e); + } + finally { + in.close(); + } + } + } + catch (IOException e) { + throw new IOException(e); + } + finally { + failHttpConn.disconnect(); + } + + Gson gson = new Gson(); + Type type = new TypeToken>() + { + }.getType(); + + List allNodes = gson.fromJson(allResponse.toString(), type); + List failedNodes = gson.fromJson(failedResponse.toString(), type); + + if (failedNodes.isEmpty()) { + failedNodes = ImmutableList.of(); + } + + // keep only the healthy nodes + allNodes.removeAll(failedNodes); + + Set hosts = new HashSet(); + for (Stats node : allNodes) { + hosts.add(new TestingNode(node.getUri())); + } + 
+ return hosts; + } + catch (IOException e) { + throw Throwables.propagate(e); + } + } + + @Override + public Node getCurrentNode() { + return currentNode; + } + + @Override + public String getEnvironment() { + return "testenv"; + } + + private HttpURLConnection getHttpURLConnection(URL urlRequest) + throws IOException + { + requireNonNull(urlRequest, "urlRequest is null"); + HttpURLConnection allHttpCon = (HttpURLConnection) urlRequest.openConnection(); + allHttpCon.setConnectTimeout(500); //ms + allHttpCon.setRequestMethod("GET"); + allHttpCon.setRequestProperty("X-Presto-User", DEFAULT_USER); + return allHttpCon; + } + + private URL getNodeUrl() + throws MalformedURLException + { + return new URL("http://" + serverAddress + ":" + serverPort + "/v1/node"); + } + + private URL getFailedNodeUrl() + throws MalformedURLException + { + return new URL("http://" + serverAddress + ":" + serverPort + "/v1/node/failed"); + } + + public static class TestingNode implements Node + { + private final URI uri; + + public TestingNode(URI uri) { + this.uri = uri; + } + + @Override + public String getHost() { + return uri.getHost(); + } + + @Override + public HostAddress getHostAndPort() { + return HostAddress.fromUri(uri); + } + + @Override + public URI getHttpUri() { + return uri; + } + + @Override + public String getNodeIdentifier() { + return uri.toString(); + } + + @Override + public String getVersion() { + return ""; + } + + @Override + public boolean isCoordinator() { + return false; + } + } + + private static class Stats + { + URI uri; + String lastResponseTime; + + public Stats() + { + } + + public Stats(URI uri, String lastResponseTime) + { + this.uri = uri; + this.lastResponseTime = lastResponseTime; + } + + public URI getUri() + { + return uri; + } + + public void setURI(URI uri) + { + this.uri = uri; + } + + String getLastResponseTime() + { + return lastResponseTime; + } + + public void setLastResponseTime(String lastResponseTime) + { + this.lastResponseTime = lastResponseTime; + } + + @Override + public boolean equals(Object other) + { + if (this == other) { + return true; + } + if (other == null || getClass() != other.getClass()) { + return false; + } + Stats o = (Stats) other; + + if (!uri.equals(o.getUri())) { + return false; + } + + if (lastResponseTime != null && o.getLastResponseTime() != null) { + return lastResponseTime.equals(o.getLastResponseTime()); + } + + return lastResponseTime == o.getLastResponseTime(); + } + + @Override + public int hashCode() + { + return Objects.hash(uri, lastResponseTime); + } + } +} diff --git a/rubix-spi/src/main/java/com/qubole/rubix/spi/AsyncClusterManager.java b/rubix-spi/src/main/java/com/qubole/rubix/spi/AsyncClusterManager.java new file mode 100644 index 00000000..7db4820e --- /dev/null +++ b/rubix-spi/src/main/java/com/qubole/rubix/spi/AsyncClusterManager.java @@ -0,0 +1,86 @@ +/** + * Copyright (c) 2019. Qubole Inc + * Licensed under the Apache License, Version 2.0 (the License); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. See accompanying LICENSE file. 
+ */
+package com.qubole.rubix.spi;
+
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+
+import java.net.UnknownHostException;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Created by stagra on 14/1/16.
+ */
+
+/*
+ * This class should be implemented for each engine.
+ * The implementation should return the nodes in a form which the scheduler of that engine can recognize and route the splits to
+ */
+public abstract class AsyncClusterManager extends ClusterManager
+{
+  private static Log log = LogFactory.getLog(AsyncClusterManager.class);
+
+  private final AtomicReference<LoadingCache<String, Set<String>>> nodesCache = new AtomicReference<>();
+
+  public void initialize(Configuration conf)
+      throws UnknownHostException
+  {
+    super.initialize(conf);
+    if (nodesCache.get() == null) {
+      synchronized (nodesCache) {
+        if (nodesCache.get() == null) {
+          int nodeRefreshTime = CacheConfig.getClusterNodeRefreshTime(conf);
+          ExecutorService executor =
+              Executors.newSingleThreadExecutor(
+                  r -> {
+                    Thread t = Executors.defaultThreadFactory().newThread(r);
+                    t.setName("rubix-get-nodes-thread");
+                    t.setDaemon(true);
+                    return t;
+                  });
+
+          nodesCache.set(
+              CacheBuilder.newBuilder()
+                  .refreshAfterWrite(nodeRefreshTime, TimeUnit.SECONDS)
+                  .build(
+                      CacheLoader.asyncReloading(
+                          new CacheLoader<String, Set<String>>() {
+                            @Override
+                            public Set<String> load(String s) {
+                              return getNodesAndUpdateState();
+                            }
+                          },
+                          executor)));
+        }
+      }
+    }
+  }
+
+  // Returns the set of nodes in the cluster
+  public Set<String> getNodes()
+  {
+    requireNonNull(nodesCache, "ClusterManager used before initialization");
+    return nodesCache.get().getUnchecked("nodes");
+  }
+
+}
diff --git a/rubix-spi/src/main/java/com/qubole/rubix/spi/ClusterManager.java b/rubix-spi/src/main/java/com/qubole/rubix/spi/ClusterManager.java
index 11f537fe..8b71c8ac 100644
--- a/rubix-spi/src/main/java/com/qubole/rubix/spi/ClusterManager.java
+++ b/rubix-spi/src/main/java/com/qubole/rubix/spi/ClusterManager.java
@@ -12,10 +12,7 @@
  */
 package com.qubole.rubix.spi;

-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
-import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -26,13 +23,7 @@
 import java.net.InetAddress;
 import java.net.UnknownHostException;
-import java.util.List;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
-
-import static java.util.Objects.requireNonNull;
+import java.util.Set;

 /**
  * Created by stagra on 14/1/16.
@@ -46,10 +37,9 @@ public abstract class ClusterManager
 {
   private static Log log = LogFactory.getLog(ClusterManager.class);

-  private String currentNodeName;
+  protected String currentNodeName;
   private String nodeHostname;
   private String nodeHostAddress;
-  private final AtomicReference<LoadingCache<String, List<String>>> nodesCache = new AtomicReference<>();
   // Concluded from testing that Metro Hash results in better load distribution across the nodes in cluster.
   private final ConsistentHash consistentHashRing = HashRing.newBuilder()
       .hasher(DefaultHasher.METRO_HASH)
@@ -62,7 +52,10 @@ public abstract class ClusterManager
    * returns null in case node list cannot be fetched
    * returns empty in case of master-only setup
    */
-  protected abstract List<String> getNodesInternal();
+  protected abstract Set<String> getNodesInternal();
+
+  // Returns the set of nodes in the cluster
+  public abstract Set<String> getNodes();

   protected String getCurrentNodeHostname()
   {
@@ -74,16 +67,15 @@ protected String getCurrentNodeHostAddress()
     return nodeHostAddress;
   }

-  private List<String> getNodesAndUpdateState()
+  protected Set<String> getNodesAndUpdateState()
   {
-    requireNonNull(nodesCache, "ClusterManager used before initialization");
-    List<String> nodes = getNodesInternal();
+    Set<String> nodes = getNodesInternal();
     if (nodes == null) {
-      nodes = ImmutableList.of();
+      nodes = ImmutableSet.of();
     }
     else if (nodes.isEmpty()) {
       // Empty result set => server up and only master node running, return localhost has the only node
       // Do not need to consider failed nodes list as 1node cluster and server is up since it replied to allNodesRequest
-      nodes = ImmutableList.of(getCurrentNodeHostAddress());
+      nodes = ImmutableSet.of(getCurrentNodeHostAddress());
     }

     // remove stale nodes from consistent hash ring
@@ -122,32 +114,11 @@ else if (consistentHashRing.contains(SimpleNode.of(getCurrentNodeHostAddress())))
   public void initialize(Configuration conf)
       throws UnknownHostException
   {
-    if (nodesCache.get() == null) {
-      synchronized (nodesCache) {
-        if (nodesCache.get() == null) {
-          int nodeRefreshTime = CacheConfig.getClusterNodeRefreshTime(conf);
-
+    if (nodeHostname == null) {
+      synchronized (this) {
+        if (nodeHostname == null) {
           nodeHostname = InetAddress.getLocalHost().getCanonicalHostName();
           nodeHostAddress = InetAddress.getLocalHost().getHostAddress();
-
-          ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
-            Thread t = Executors.defaultThreadFactory().newThread(r);
-            t.setName("rubix-get-nodes-thread");
-            t.setDaemon(true);
-            return t;
-          });
-
-          nodesCache.set(
-              CacheBuilder.newBuilder()
-                  .refreshAfterWrite(nodeRefreshTime, TimeUnit.SECONDS)
-                  .build(CacheLoader.asyncReloading(new CacheLoader<String, List<String>>()
-                  {
-                    @Override
-                    public List<String> load(String s)
-                    {
-                      return getNodesAndUpdateState();
-                    }
-                  }, executor)));
-        }
-      }
-    }
@@ -158,12 +129,6 @@ public String locateKey(String key)
     return consistentHashRing.locate(key).orElseThrow(() -> new RuntimeException("Unable to locate key: " + key)).getKey();
   }

-  // Returns sorted list of nodes in the cluster
-  public List<String> getNodes()
-  {
-    return nodesCache.get().getUnchecked("nodes");
-  }
-
   public String getCurrentNodeName()
   {
     // refresh cluster nodes first, which updates currentNodeName if it is not set.
@@ -174,7 +139,7 @@ public String getCurrentNodeName()
   private void refreshClusterNodes()
   {
     // getNodes() updates the currentNodeName
-    List<String> nodes = getNodes();
+    Set<String> nodes = getNodes();
     if (nodes == null) {
       log.error("Initialization not done for Cluster Type: " + getClusterType());
       throw new RuntimeException("Unable to find current node name");
diff --git a/rubix-spi/src/main/java/com/qubole/rubix/spi/SyncClusterManager.java b/rubix-spi/src/main/java/com/qubole/rubix/spi/SyncClusterManager.java
new file mode 100644
index 00000000..f843de28
--- /dev/null
+++ b/rubix-spi/src/main/java/com/qubole/rubix/spi/SyncClusterManager.java
@@ -0,0 +1,54 @@
+/**
+ * Copyright (c) 2019. Qubole Inc
+ * Licensed under the Apache License, Version 2.0 (the License);
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License. See accompanying LICENSE file.
+ */
+package com.qubole.rubix.spi;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.util.Set;
+
+public abstract class SyncClusterManager extends ClusterManager
+{
+  private static Log log = LogFactory.getLog(SyncClusterManager.class);
+
+  private Set<String> currentNodes;
+
+  protected abstract boolean hasStateChanged();
+
+  private synchronized void updateStateIfChanged() {
+    if (hasStateChanged()) {
+      currentNodes = getNodesAndUpdateState();
+    }
+  }
+
+  @Override
+  public synchronized String locateKey(String key)
+  {
+    updateStateIfChanged();
+    return super.locateKey(key);
+  }
+
+  @Override
+  public synchronized String getCurrentNodeName()
+  {
+    updateStateIfChanged();
+    return super.getCurrentNodeName();
+  }
+  // Returns the set of nodes in the cluster
+  @Override
+  public synchronized Set<String> getNodes()
+  {
+    updateStateIfChanged();
+    return currentNodes;
+  }
+}
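For context, the split introduced above leaves two integration styles: AsyncClusterManager keeps serving nodes from a Guava LoadingCache refreshed in the background, while SyncClusterManager re-evaluates hasStateChanged() under a lock on every getNodes(), locateKey(), and getCurrentNodeName() call, so a change in Presto worker membership is visible on the very next call instead of being served from a stale cache. The following minimal sketch is not part of the patch; the package, class name, setLiveWorkers() hook, and the reuse of ClusterType.PRESTOSQL_CLUSTER_MANAGER are illustrative assumptions only.

package com.qubole.rubix.example; // hypothetical package, for illustration only

import com.qubole.rubix.spi.ClusterType;
import com.qubole.rubix.spi.SyncClusterManager;
import org.apache.hadoop.conf.Configuration;

import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Hypothetical cluster manager backed by a membership set that the embedding engine updates in place.
public class InMemoryClusterManager extends SyncClusterManager
{
  // Engine-owned view of the current workers; replaced atomically by the engine.
  private volatile Set<String> liveWorkers = new HashSet<>();
  // Snapshot used to detect membership changes between calls.
  private Set<String> lastSeenWorkers = new HashSet<>();

  public void setLiveWorkers(Set<String> workers)
  {
    this.liveWorkers = new HashSet<>(workers);
  }

  @Override
  protected boolean hasStateChanged()
  {
    // Invoked under SyncClusterManager's lock before every getNodes()/locateKey()/getCurrentNodeName() call
    Set<String> current = liveWorkers;
    boolean changed = !current.equals(lastSeenWorkers);
    lastSeenWorkers = current;
    return changed;
  }

  @Override
  protected Set<String> getNodesInternal()
  {
    return lastSeenWorkers;
  }

  @Override
  public ClusterType getClusterType()
  {
    return ClusterType.PRESTOSQL_CLUSTER_MANAGER; // illustrative; a real integration would register its own type
  }

  public static void main(String[] args) throws Exception
  {
    InMemoryClusterManager manager = new InMemoryClusterManager();
    manager.initialize(new Configuration());

    manager.setLiveWorkers(new HashSet<>(Arrays.asList("worker-1", "worker-2")));
    System.out.println(manager.getNodes());          // [worker-1, worker-2] (order not guaranteed)
    System.out.println(manager.locateKey("file-A")); // consistent-hash owner among the two workers

    // A worker drops out; the change is observed on the next call, with no refresh interval to wait out.
    manager.setLiveWorkers(new HashSet<>(Arrays.asList("worker-1")));
    System.out.println(manager.getNodes());          // [worker-1]
  }
}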