Commit

Avoid caching presto worker nodes qubole#462
James Taylor committed Oct 7, 2020
1 parent 46c8440 commit dfe43b3
Showing 15 changed files with 590 additions and 401 deletions.
@@ -48,7 +48,7 @@
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.net.URI;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;

import static com.google.common.base.Preconditions.checkState;
@@ -401,7 +401,7 @@ public BlockLocation[] getFileBlockLocations(FileStatus file, long start, long len)
return fs.getFileBlockLocations(file, start, len);
}

List<String> nodes = clusterManager.getNodes();
Set<String> nodes = clusterManager.getNodes();

if (nodes == null) {
return fs.getFileBlockLocations(file, start, len);
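
The hunk above switches clusterManager.getNodes() from List<String> to Set<String>, so callers can no longer rely on an ordering. Where a stable node order is still needed (for example before consistent hashing over block locations), it can be derived from the set; the helper below is an illustrative sketch, not code from this commit:

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;

// Hedged sketch: turn the unordered membership set into a deterministic list,
// so every worker computes the same ordering from the same set of nodes.
static List<String> orderedNodes(Set<String> nodes)
{
  List<String> ordered = new ArrayList<>(nodes);
  Collections.sort(ordered);
  return ordered;
}
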
@@ -12,23 +12,25 @@
*/
package com.qubole.rubix.core.utils;

import com.qubole.rubix.spi.ClusterManager;
import com.qubole.rubix.spi.AsyncClusterManager;
import com.qubole.rubix.spi.ClusterType;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/**
* Created by Abhishek on 6/8/18.
*/
public class DummyClusterManager extends ClusterManager
public class DummyClusterManager extends AsyncClusterManager
{
@Override
public List<String> getNodesInternal()
public Set<String> getNodesInternal()
{
List<String> list = new ArrayList<String>();
Set<String> list = new HashSet<>();
String hostName = "";
try {
hostName = InetAddress.getLocalHost().getCanonicalHostName();
@@ -51,6 +53,6 @@ public ClusterType getClusterType()
@Override
public String getCurrentNodeName()
{
return getNodes().get(0);
return getNodes().iterator().next();
}
}
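
With a Set there is no index to read, so getCurrentNodeName() now takes the first element from the iterator; that is deterministic here only because DummyClusterManager exposes a single host. A short usage sketch, assuming initialize(conf) is enough to populate membership as it is in the project's tests:

import org.apache.hadoop.conf.Configuration;
import java.util.Set;

static void exampleUsage() throws Exception
{
  DummyClusterManager manager = new DummyClusterManager();
  manager.initialize(new Configuration());        // assumed sufficient to populate membership
  Set<String> nodes = manager.getNodes();          // single element: the local canonical hostname
  String current = manager.getCurrentNodeName();   // the same element, via iterator().next()
  assert nodes.contains(current);
}
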
@@ -12,32 +12,41 @@
*/
package com.qubole.rubix.core.utils;

import com.qubole.rubix.spi.ClusterManager;
import com.qubole.rubix.spi.AsyncClusterManager;
import com.qubole.rubix.spi.ClusterType;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class DummyClusterManagerMultinode extends ClusterManager
public class DummyClusterManagerMultinode extends AsyncClusterManager
{
@Override
public List<String> getNodesInternal()
private final String currentNode;
private final String otherNode;
private final Set<String> nodes = new HashSet<>();

public DummyClusterManagerMultinode()
{
List<String> list = new ArrayList<String>();
String hostName = "";
String currentNode;
try {
hostName = InetAddress.getLocalHost().getCanonicalHostName();
currentNode = InetAddress.getLocalHost().getCanonicalHostName();
}
catch (UnknownHostException e) {
hostName = "localhost";
currentNode = "localhost";
}
this.currentNode = currentNode;
nodes.add(currentNode);
this.otherNode = currentNode + "_copy";
nodes.add(otherNode);
}

list.add(hostName);
list.add(hostName + "_copy");

return list;
@Override
public Set<String> getNodesInternal()
{
return nodes;
}

@Override
@@ -49,11 +58,11 @@ public ClusterType getClusterType()
@Override
public String getCurrentNodeName()
{
return getNodes().get(0);
return currentNode;
}

public String locateKey(String key)
{
return getNodes().get(1);
return otherNode;
}
}
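
DummyClusterManagerMultinode previously answered getCurrentNodeName() and locateKey() with get(0) and get(1); a HashSet has no positions and its iteration order is unspecified, so the two hosts are now fixed once in the constructor. A minimal illustration of the pitfall the new fields avoid (hostnames are hypothetical):

import java.util.HashSet;
import java.util.Set;

Set<String> nodes = new HashSet<>();
nodes.add("worker-a");
nodes.add("worker-a_copy");
// Which element comes out first is an implementation detail of HashSet, so a test built on
// positional access could flip between runs; capturing currentNode/otherNode as fields cannot.
String first = nodes.iterator().next();
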
@@ -13,17 +13,18 @@
package com.qubole.rubix.core.utils;

import com.google.common.collect.Lists;
import com.qubole.rubix.spi.ClusterManager;
import com.google.common.collect.Sets;
import com.qubole.rubix.spi.AsyncClusterManager;
import com.qubole.rubix.spi.ClusterType;

import java.util.List;
import java.util.Set;

public class DockerTestClusterManager extends ClusterManager
public class DockerTestClusterManager extends AsyncClusterManager
{
@Override
public List<String> getNodesInternal()
public Set<String> getNodesInternal()
{
return Lists.newArrayList("172.18.8.1", "172.18.8.2");
return Sets.newHashSet("172.18.8.1", "172.18.8.2");
}

@Override
@@ -12,10 +12,12 @@
*/
package com.qubole.rubix.hadoop2;

import com.google.api.client.util.Sets;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.qubole.rubix.spi.ClusterManager;
import com.qubole.rubix.spi.AsyncClusterManager;
import com.qubole.rubix.spi.ClusterType;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -30,7 +32,7 @@
/**
* Created by sakshia on 28/7/16.
*/
public class Hadoop2ClusterManager extends ClusterManager
public class Hadoop2ClusterManager extends AsyncClusterManager
{
YarnConfiguration yconf;
private Log log = LogFactory.getLog(Hadoop2ClusterManager.class);
@@ -44,7 +46,7 @@ public void initialize(Configuration conf)
}

@Override
public List<String> getNodesInternal()
public Set<String> getNodesInternal()
{
try {
List<Hadoop2ClusterManagerUtil.Node> allNodes = Hadoop2ClusterManagerUtil.getAllNodes(yconf);
@@ -53,7 +55,7 @@ public List<String> getNodesInternal()
}

if (allNodes.isEmpty()) {
return ImmutableList.of();
return ImmutableSet.of();
}

Set<String> hosts = new HashSet<>();
@@ -70,8 +72,7 @@ public List<String> getNodesInternal()
throw new Exception("No healthy data nodes found.");
}

List<String> hostList = Lists.newArrayList(hosts.toArray(new String[0]));
return hostList;
return ImmutableSet.copyOf(hosts);
}
catch (Exception e) {
throw Throwables.propagate(e);
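
getNodesInternal() now ends by returning ImmutableSet.copyOf(hosts) instead of copying the host set into a List. A hedged illustration of what that return type buys callers (the addresses are borrowed from the Docker test manager; only the copyOf call appears in the hunk):

import com.google.common.collect.ImmutableSet;
import java.util.HashSet;
import java.util.Set;

Set<String> hosts = new HashSet<>();
hosts.add("172.18.8.1");
hosts.add("172.18.8.2");
Set<String> nodes = ImmutableSet.copyOf(hosts);
// The copy is immutable: nodes.add("172.18.8.3") would throw UnsupportedOperationException,
// so membership handed out by the cluster manager cannot be mutated by callers.
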
@@ -24,7 +24,7 @@
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.List;
import java.util.Set;

import static org.testng.Assert.assertTrue;

@@ -66,13 +66,13 @@ static ClusterManager buildClusterManager()
public void testGetNodes_multipleWorkers()
throws IOException
{
final List<String> nodeHostnames = TestHadoop2ClusterManagerUtil.getNodeHostnamesFromCluster(
final Set<String> nodeHostnames = TestHadoop2ClusterManagerUtil.getNodeHostnamesFromCluster(
TestHadoop2ClusterManagerUtil.CLUSTER_NODES_ENDPOINT,
worker.new MultipleRunningWorkers(), conf, ClusterType.HADOOP2_CLUSTER_MANAGER);

assertTrue(nodeHostnames.size() == 2, "Should only have two nodes");
assertTrue(nodeHostnames.get(0).equals(TestHadoop2ClusterManagerUtil.WORKER_HOSTNAME_1) &&
nodeHostnames.get(1).equals(TestHadoop2ClusterManagerUtil.WORKER_HOSTNAME_2), "Wrong nodes data");
assertTrue(nodeHostnames.contains(TestHadoop2ClusterManagerUtil.WORKER_HOSTNAME_1) &&
nodeHostnames.contains(TestHadoop2ClusterManagerUtil.WORKER_HOSTNAME_2), "Wrong nodes data");
}

@Test
@@ -82,12 +82,12 @@ public void testGetNodes_multipleWorkers()
public void testGetNodes_oneWorker()
throws IOException
{
final List<String> nodeHostnames = TestHadoop2ClusterManagerUtil.getNodeHostnamesFromCluster(
final Set<String> nodeHostnames = TestHadoop2ClusterManagerUtil.getNodeHostnamesFromCluster(
TestHadoop2ClusterManagerUtil.CLUSTER_NODES_ENDPOINT,
worker.new OneRunningWorker(), conf, ClusterType.HADOOP2_CLUSTER_MANAGER);

assertTrue(nodeHostnames.size() == 1, "Should only have one node");
assertTrue(nodeHostnames.get(0).equals(TestHadoop2ClusterManagerUtil.WORKER_HOSTNAME_1));
assertTrue(nodeHostnames.contains(TestHadoop2ClusterManagerUtil.WORKER_HOSTNAME_1));
}

@Test
@@ -97,13 +97,13 @@ public void testGetNodes_oneWorker()
public void testGetNodes_oneNewWorker()
throws IOException
{
final List<String> nodeHostnames = TestHadoop2ClusterManagerUtil.getNodeHostnamesFromCluster(
final Set<String> nodeHostnames = TestHadoop2ClusterManagerUtil.getNodeHostnamesFromCluster(
TestHadoop2ClusterManagerUtil.CLUSTER_NODES_ENDPOINT,
worker.new MultipleWorkersOneNew(), conf, ClusterType.HADOOP2_CLUSTER_MANAGER);

assertTrue(nodeHostnames.size() == 2, "Should only have two nodes");
assertTrue(nodeHostnames.get(0).equals(TestHadoop2ClusterManagerUtil.WORKER_HOSTNAME_1) &&
nodeHostnames.get(1).equals(TestHadoop2ClusterManagerUtil.WORKER_HOSTNAME_2), "Wrong nodes data");
assertTrue(nodeHostnames.contains(TestHadoop2ClusterManagerUtil.WORKER_HOSTNAME_1) &&
nodeHostnames.contains(TestHadoop2ClusterManagerUtil.WORKER_HOSTNAME_2), "Wrong nodes data");
}

@Test
@@ -113,13 +113,13 @@ public void testGetNodes_oneNewWorker()
public void testGetNodes_oneRebootedWorker()
throws IOException
{
final List<String> nodeHostnames = TestHadoop2ClusterManagerUtil.getNodeHostnamesFromCluster(
final Set<String> nodeHostnames = TestHadoop2ClusterManagerUtil.getNodeHostnamesFromCluster(
TestHadoop2ClusterManagerUtil.CLUSTER_NODES_ENDPOINT,
worker.new MultipleWorkersOneRebooted(), conf, ClusterType.HADOOP2_CLUSTER_MANAGER);

assertTrue(nodeHostnames.size() == 2, "Should only have two nodes");
assertTrue(nodeHostnames.get(0).equals(TestHadoop2ClusterManagerUtil.WORKER_HOSTNAME_1) &&
nodeHostnames.get(1).equals(TestHadoop2ClusterManagerUtil.WORKER_HOSTNAME_2), "Wrong nodes data");
assertTrue(nodeHostnames.contains(TestHadoop2ClusterManagerUtil.WORKER_HOSTNAME_1) &&
nodeHostnames.contains(TestHadoop2ClusterManagerUtil.WORKER_HOSTNAME_2), "Wrong nodes data");
}

@Test
@@ -129,12 +129,12 @@ public void testGetNodes_oneRebootedWorker()
public void testMasterOnlyCluster()
throws IOException
{
final List<String> nodeHostnames = TestHadoop2ClusterManagerUtil.getNodeHostnamesFromCluster(
final Set<String> nodeHostnames = TestHadoop2ClusterManagerUtil.getNodeHostnamesFromCluster(
TestHadoop2ClusterManagerUtil.CLUSTER_NODES_ENDPOINT,
worker.new NoWorkers(), conf, ClusterType.HADOOP2_CLUSTER_MANAGER);

assertTrue(nodeHostnames.size() == 1, "Should have added localhost in list");
assertTrue(nodeHostnames.get(0).equals(InetAddress.getLocalHost().getHostAddress()), "Not added right hostname");
assertTrue(nodeHostnames.contains(InetAddress.getLocalHost().getHostAddress()), "Not added right hostname");
}

@Test
@@ -144,12 +144,12 @@ public void testMasterOnlyCluster()
public void testUnhealthyNodeCluster_decommissioned()
throws IOException
{
final List<String> nodeHostnames = TestHadoop2ClusterManagerUtil.getNodeHostnamesFromCluster(
final Set<String> nodeHostnames = TestHadoop2ClusterManagerUtil.getNodeHostnamesFromCluster(
TestHadoop2ClusterManagerUtil.CLUSTER_NODES_ENDPOINT,
worker.new MultipleWorkersOneDecommissioned(), conf, ClusterType.HADOOP2_CLUSTER_MANAGER);

assertTrue(nodeHostnames.size() == 1, "Should only have one node");
assertTrue(nodeHostnames.get(0).equals(TestHadoop2ClusterManagerUtil.WORKER_HOSTNAME_2), "Wrong nodes data");
assertTrue(nodeHostnames.contains(TestHadoop2ClusterManagerUtil.WORKER_HOSTNAME_2), "Wrong nodes data");
}

@Test
@@ -159,12 +159,12 @@ public void testUnhealthyNodeCluster_decommissioned()
public void testUnhealthyNodeCluster_decommissioning()
throws IOException
{
final List<String> nodeHostnames = TestHadoop2ClusterManagerUtil.getNodeHostnamesFromCluster(
final Set<String> nodeHostnames = TestHadoop2ClusterManagerUtil.getNodeHostnamesFromCluster(
TestHadoop2ClusterManagerUtil.CLUSTER_NODES_ENDPOINT,
worker.new MultipleWorkersOneDecommissioning(), conf, ClusterType.HADOOP2_CLUSTER_MANAGER);

assertTrue(nodeHostnames.size() == 1, "Should only have one node");
assertTrue(nodeHostnames.get(0).equals(TestHadoop2ClusterManagerUtil.WORKER_HOSTNAME_2), "Wrong nodes data");
assertTrue(nodeHostnames.contains(TestHadoop2ClusterManagerUtil.WORKER_HOSTNAME_2), "Wrong nodes data");
}

@Test
@@ -174,12 +174,12 @@ public void testUnhealthyNodeCluster_decommissioning()
public void testUnhealthyNodeCluster_lost()
throws IOException
{
final List<String> nodeHostnames = TestHadoop2ClusterManagerUtil.getNodeHostnamesFromCluster(
final Set<String> nodeHostnames = TestHadoop2ClusterManagerUtil.getNodeHostnamesFromCluster(
TestHadoop2ClusterManagerUtil.CLUSTER_NODES_ENDPOINT,
worker.new MultipleWorkersOneLost(), conf, ClusterType.HADOOP2_CLUSTER_MANAGER);

assertTrue(nodeHostnames.size() == 1, "Should only have one node");
assertTrue(nodeHostnames.get(0).equals(TestHadoop2ClusterManagerUtil.WORKER_HOSTNAME_2), "Wrong nodes data");
assertTrue(nodeHostnames.contains(TestHadoop2ClusterManagerUtil.WORKER_HOSTNAME_2), "Wrong nodes data");
}

@Test
@@ -189,12 +189,12 @@ public void testUnhealthyNodeCluster_lost()
public void testUnhealthyNodeCluster_unhealthy()
throws IOException
{
final List<String> nodeHostnames = TestHadoop2ClusterManagerUtil.getNodeHostnamesFromCluster(
final Set<String> nodeHostnames = TestHadoop2ClusterManagerUtil.getNodeHostnamesFromCluster(
TestHadoop2ClusterManagerUtil.CLUSTER_NODES_ENDPOINT,
worker.new MultipleWorkersOneUnhealthy(), conf, ClusterType.HADOOP2_CLUSTER_MANAGER);

assertTrue(nodeHostnames.size() == 1, "Should only have one node");
assertTrue(nodeHostnames.get(0).equals(TestHadoop2ClusterManagerUtil.WORKER_HOSTNAME_2), "Wrong nodes data");
assertTrue(nodeHostnames.contains(TestHadoop2ClusterManagerUtil.WORKER_HOSTNAME_2), "Wrong nodes data");
}

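
Throughout this test class the assertions switch from positional get(0)/get(1) checks to contains(), since a Set offers no stable order. An equivalent and slightly tighter form, offered only as a suggestion rather than what the commit uses, compares against the expected set and covers the size check as well:

import static org.testng.Assert.assertTrue;
import com.google.common.collect.ImmutableSet;

// Set.equals() is order-insensitive, so this single assertion replaces both the
// size check and the two contains() checks for the two-worker cases.
assertTrue(nodeHostnames.equals(ImmutableSet.of(
    TestHadoop2ClusterManagerUtil.WORKER_HOSTNAME_1,
    TestHadoop2ClusterManagerUtil.WORKER_HOSTNAME_2)), "Wrong nodes data");
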
@@ -84,7 +84,7 @@ static HttpServer createServer(String endpoint, HttpHandler handler)
* @return A list of hostnames for the nodes in the cluster.
* @throws IOException if the cluster server could not be created.
*/
static List<String> getNodeHostnamesFromCluster(String endpoint, HttpHandler responseHandler,
static Set<String> getNodeHostnamesFromCluster(String endpoint, HttpHandler responseHandler,
Configuration conf, ClusterType clusterType)
throws IOException
{
@@ -93,7 +93,7 @@ static List<String> getNodeHostnamesFromCluster(String endpoint, HttpHandler responseHandler,

ClusterManager clusterManager = getClusterManagerInstance(clusterType, conf);
clusterManager.initialize(conf);
final List<String> nodes = clusterManager.getNodes();
final Set<String> nodes = clusterManager.getNodes();
log.info("Got nodes: " + nodes);

server.stop(0);
@@ -157,10 +157,10 @@ static int matchMemberships(TestWorker prevWorker, TestWorker newWorker, Set<String> keys,
Configuration conf, ClusterType clusterType)
throws IOException
{
final List<String> nodeHostnames1 = getNodeHostnamesFromCluster(CLUSTER_NODES_ENDPOINT, prevWorker, conf, clusterType);
final Set<String> nodeHostnames1 = getNodeHostnamesFromCluster(CLUSTER_NODES_ENDPOINT, prevWorker, conf, clusterType);
Map<String, String> keyMembership1 = getConsistentHashedMembership(prevWorker, keys, conf, clusterType);

final List<String> nodeHostnames2 = getNodeHostnamesFromCluster(CLUSTER_NODES_ENDPOINT, newWorker, conf, clusterType);
final Set<String> nodeHostnames2 = getNodeHostnamesFromCluster(CLUSTER_NODES_ENDPOINT, newWorker, conf, clusterType);
Map<String, String> keyMembership2 = getConsistentHashedMembership(newWorker, keys, conf, clusterType);

int match = 0;
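
getNodeHostnamesFromCluster() now hands matchMemberships() Set-based membership snapshots; the comparison itself runs over the per-key node assignments. The hunk stops at the match counter, so the continuation below is only an assumption about how the count likely proceeds, not commit content:

// Hypothetical continuation of the method body above (not shown in this hunk):
// count keys that hash to the same node before and after the membership change.
for (String key : keys) {
  if (keyMembership1.get(key).equals(keyMembership2.get(key))) {
    match++;
  }
}
return match;
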