clean up hivesterix client ip/port issues
git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_asterix_stabilization@3115 123451ca-8445-de46-9d55-352943316053
diff --git a/hivesterix/hivesterix-common/src/main/java/edu/uci/ics/hivesterix/common/config/ConfUtil.java b/hivesterix/hivesterix-common/src/main/java/edu/uci/ics/hivesterix/common/config/ConfUtil.java
index 666d361..648deb6 100644
--- a/hivesterix/hivesterix-common/src/main/java/edu/uci/ics/hivesterix/common/config/ConfUtil.java
+++ b/hivesterix/hivesterix-common/src/main/java/edu/uci/ics/hivesterix/common/config/ConfUtil.java
@@ -1,7 +1,6 @@
package edu.uci.ics.hivesterix.common.config;
import java.io.BufferedReader;
-import java.io.DataInputStream;
import java.io.FileInputStream;
import java.io.InputStream;
import java.io.InputStreamReader;
@@ -28,6 +27,7 @@
@SuppressWarnings({ "rawtypes", "deprecation" })
public class ConfUtil {
private static final String clusterPropertiesPath = "conf/cluster.properties";
+ private static final String masterFilePath = "conf/master";
private static JobConf job;
private static HiveConf hconf;
@@ -134,15 +134,27 @@
clusterProps.load(confIn);
confIn.close();
}
- Process process = Runtime.getRuntime().exec("src/main/resources/scripts/getip.sh");
- BufferedReader ipReader = new BufferedReader(new InputStreamReader(new DataInputStream(
- process.getInputStream())));
- String ipAddress = ipReader.readLine();
- ipReader.close();
- int port = Integer.parseInt(clusterProps.getProperty("CC_CLIENTPORT"));
- int mpl = Integer.parseInt(hconf.get("hive.hyracks.parrallelism"));
- hcc = new HyracksConnection(ipAddress, port);
+ if (hcc == null) {
+ BufferedReader ipReader = new BufferedReader(new InputStreamReader(new FileInputStream(masterFilePath)));
+ String masterNode = ipReader.readLine();
+ ipReader.close();
+
+ InetAddress[] ips = InetAddress.getAllByName(masterNode);
+ int port = Integer.parseInt(clusterProps.getProperty("CC_CLIENTPORT"));
+ for (InetAddress ip : ips) {
+ if (ip.getAddress().length <= 4) {
+ try {
+ hcc = new HyracksConnection(ip.getHostAddress(), port);
+ break;
+ } catch (Exception e) {
+ continue;
+ }
+ }
+ }
+ }
+
+ int mpl = Integer.parseInt(hconf.get("hive.hyracks.parrallelism"));
topology = hcc.getClusterTopology();
ncNameToNcInfos = hcc.getNodeControllerInfos();
NCs = new String[ncNameToNcInfos.size() * mpl];
diff --git a/hivesterix/hivesterix-dist/src/main/java/edu/uci/ics/hivesterix/runtime/exec/HyracksExecutionEngine.java b/hivesterix/hivesterix-dist/src/main/java/edu/uci/ics/hivesterix/runtime/exec/HyracksExecutionEngine.java
index e6f47cf..379737f 100644
--- a/hivesterix/hivesterix-dist/src/main/java/edu/uci/ics/hivesterix/runtime/exec/HyracksExecutionEngine.java
+++ b/hivesterix/hivesterix-dist/src/main/java/edu/uci/ics/hivesterix/runtime/exec/HyracksExecutionEngine.java
@@ -1,12 +1,12 @@
package edu.uci.ics.hivesterix.runtime.exec;
import java.io.BufferedReader;
-import java.io.DataInputStream;
import java.io.FileInputStream;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.io.Serializable;
+import java.net.InetAddress;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
@@ -80,6 +80,7 @@
private static final Log LOG = LogFactory.getLog(HyracksExecutionEngine.class.getName());
private static final String clusterPropertiesPath = "conf/cluster.properties";
+ private static final String masterFilePath = "conf/master";
private static List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>> DEFAULT_LOGICAL_REWRITES = new ArrayList<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>>();
private static List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>> DEFAULT_PHYSICAL_REWRITES = new ArrayList<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>>();
@@ -151,6 +152,11 @@
*/
private Properties clusterProps;
+ /**
+ * the Hyracks client connection
+ */
+ private IHyracksClientConnection hcc;
+
public HyracksExecutionEngine(HiveConf conf) {
this.conf = conf;
init(conf);
@@ -533,15 +539,26 @@
confIn.close();
}
- Process process = Runtime.getRuntime().exec("src/main/resources/scripts/getip.sh");
- BufferedReader ipReader = new BufferedReader(new InputStreamReader(
- new DataInputStream(process.getInputStream())));
- String ipAddress = ipReader.readLine();
- ipReader.close();
- int port = Integer.parseInt(clusterProps.getProperty("CC_CLIENTPORT"));
- String applicationName = "hivesterix";
+ if (hcc == null) {
+ BufferedReader ipReader = new BufferedReader(new InputStreamReader(new FileInputStream(masterFilePath)));
+ String masterNode = ipReader.readLine();
+ ipReader.close();
- IHyracksClientConnection hcc = new HyracksConnection(ipAddress, port);
+ InetAddress[] ips = InetAddress.getAllByName(masterNode);
+ int port = Integer.parseInt(clusterProps.getProperty("CC_CLIENTPORT"));
+ for (InetAddress ip : ips) {
+ if (ip.getAddress().length <= 4) {
+ try {
+ hcc = new HyracksConnection(ip.getHostAddress(), port);
+ break;
+ } catch (Exception e) {
+ continue;
+ }
+ }
+ }
+ }
+
+ String applicationName = "hivesterix";
long start = System.currentTimeMillis();
JobId jobId = hcc.startJob(applicationName, job);
hcc.waitForCompletion(jobId);
diff --git a/hivesterix/hivesterix-dist/src/test/java/edu/uci/ics/hivesterix/test/base/AbstractTestSuiteClass.java b/hivesterix/hivesterix-dist/src/test/java/edu/uci/ics/hivesterix/test/base/AbstractTestSuiteClass.java
index e455527..802777c 100644
--- a/hivesterix/hivesterix-dist/src/test/java/edu/uci/ics/hivesterix/test/base/AbstractTestSuiteClass.java
+++ b/hivesterix/hivesterix-dist/src/test/java/edu/uci/ics/hivesterix/test/base/AbstractTestSuiteClass.java
@@ -1,7 +1,6 @@
package edu.uci.ics.hivesterix.test.base;
import java.io.BufferedReader;
-import java.io.DataInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
@@ -9,6 +8,7 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
+import java.net.InetAddress;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
@@ -45,8 +45,9 @@
private static final String PATH_TO_DATA = "src/test/resources/runtimefunctionts/data/";
private static final String clusterPropertiesPath = "conf/cluster.properties";
- private Properties clusterProps;
+ private static final String masterFilePath = "conf/master";
+ private Properties clusterProps;
private MiniDFSCluster dfsCluster;
private MiniMRCluster mrCluster;
@@ -109,11 +110,16 @@
clusterProps.load(confIn);
confIn.close();
}
- Process process = Runtime.getRuntime().exec("src/main/resources/scripts/getip.sh");
- BufferedReader ipReader = new BufferedReader(new InputStreamReader(
- new DataInputStream(process.getInputStream())));
- String ipAddress = ipReader.readLine();
+ BufferedReader ipReader = new BufferedReader(new InputStreamReader(new FileInputStream(masterFilePath)));
+ String masterNode = ipReader.readLine();
ipReader.close();
+ InetAddress[] ips = InetAddress.getAllByName(masterNode);
+ String ipAddress = null;
+ for (InetAddress ip : ips) {
+ if (ip.getAddress().length <= 4) {
+ ipAddress = ip.getHostAddress();
+ }
+ }
int clientPort = Integer.parseInt(clusterProps.getProperty("CC_CLIENTPORT"));
int netPort = Integer.parseInt(clusterProps.getProperty("CC_CLUSTERPORT"));
String applicationName = "hivesterix";