clean up hivesterix client ip/port issues

git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_asterix_stabilization@3115 123451ca-8445-de46-9d55-352943316053
diff --git a/hivesterix/hivesterix-common/src/main/java/edu/uci/ics/hivesterix/common/config/ConfUtil.java b/hivesterix/hivesterix-common/src/main/java/edu/uci/ics/hivesterix/common/config/ConfUtil.java
index 666d361..648deb6 100644
--- a/hivesterix/hivesterix-common/src/main/java/edu/uci/ics/hivesterix/common/config/ConfUtil.java
+++ b/hivesterix/hivesterix-common/src/main/java/edu/uci/ics/hivesterix/common/config/ConfUtil.java
@@ -1,7 +1,6 @@
 package edu.uci.ics.hivesterix.common.config;

 

 import java.io.BufferedReader;

-import java.io.DataInputStream;

 import java.io.FileInputStream;

 import java.io.InputStream;

 import java.io.InputStreamReader;

@@ -28,6 +27,7 @@
 @SuppressWarnings({ "rawtypes", "deprecation" })

 public class ConfUtil {

     private static final String clusterPropertiesPath = "conf/cluster.properties";

+    private static final String masterFilePath = "conf/master";

 

     private static JobConf job;

     private static HiveConf hconf;

@@ -134,15 +134,27 @@
                 clusterProps.load(confIn);

                 confIn.close();

             }

-            Process process = Runtime.getRuntime().exec("src/main/resources/scripts/getip.sh");

-            BufferedReader ipReader = new BufferedReader(new InputStreamReader(new DataInputStream(

-                    process.getInputStream())));

-            String ipAddress = ipReader.readLine();

-            ipReader.close();

-            int port = Integer.parseInt(clusterProps.getProperty("CC_CLIENTPORT"));

-            int mpl = Integer.parseInt(hconf.get("hive.hyracks.parrallelism"));

 

-            hcc = new HyracksConnection(ipAddress, port);

+            if (hcc == null) {

+                BufferedReader ipReader = new BufferedReader(new InputStreamReader(new FileInputStream(masterFilePath)));

+                String masterNode = ipReader.readLine();

+                ipReader.close();

+

+                InetAddress[] ips = InetAddress.getAllByName(masterNode);

+                int port = Integer.parseInt(clusterProps.getProperty("CC_CLIENTPORT"));

+                for (InetAddress ip : ips) {

+                    if (ip.getAddress().length <= 4) {

+                        try {

+                            hcc = new HyracksConnection(ip.getHostAddress(), port);

+                            break;

+                        } catch (Exception e) {

+                            continue;

+                        }

+                    }

+                }

+            }

+

+            int mpl = Integer.parseInt(hconf.get("hive.hyracks.parrallelism"));

             topology = hcc.getClusterTopology();

             ncNameToNcInfos = hcc.getNodeControllerInfos();

             NCs = new String[ncNameToNcInfos.size() * mpl];

diff --git a/hivesterix/hivesterix-dist/src/main/java/edu/uci/ics/hivesterix/runtime/exec/HyracksExecutionEngine.java b/hivesterix/hivesterix-dist/src/main/java/edu/uci/ics/hivesterix/runtime/exec/HyracksExecutionEngine.java
index e6f47cf..379737f 100644
--- a/hivesterix/hivesterix-dist/src/main/java/edu/uci/ics/hivesterix/runtime/exec/HyracksExecutionEngine.java
+++ b/hivesterix/hivesterix-dist/src/main/java/edu/uci/ics/hivesterix/runtime/exec/HyracksExecutionEngine.java
@@ -1,12 +1,12 @@
 package edu.uci.ics.hivesterix.runtime.exec;

 

 import java.io.BufferedReader;

-import java.io.DataInputStream;

 import java.io.FileInputStream;

 import java.io.InputStream;

 import java.io.InputStreamReader;

 import java.io.PrintWriter;

 import java.io.Serializable;

+import java.net.InetAddress;

 import java.util.ArrayList;

 import java.util.HashMap;

 import java.util.Iterator;

@@ -80,6 +80,7 @@
 

     private static final Log LOG = LogFactory.getLog(HyracksExecutionEngine.class.getName());

     private static final String clusterPropertiesPath = "conf/cluster.properties";

+    private static final String masterFilePath = "conf/master";

 

     private static List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>> DEFAULT_LOGICAL_REWRITES = new ArrayList<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>>();

     private static List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>> DEFAULT_PHYSICAL_REWRITES = new ArrayList<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>>();

@@ -151,6 +152,11 @@
      */

     private Properties clusterProps;

 

+    /**

+     * the Hyracks client connection

+     */

+    private IHyracksClientConnection hcc;

+

     public HyracksExecutionEngine(HiveConf conf) {

         this.conf = conf;

         init(conf);

@@ -533,15 +539,26 @@
             confIn.close();

         }

 

-        Process process = Runtime.getRuntime().exec("src/main/resources/scripts/getip.sh");

-        BufferedReader ipReader = new BufferedReader(new InputStreamReader(

-                new DataInputStream(process.getInputStream())));

-        String ipAddress = ipReader.readLine();

-        ipReader.close();

-        int port = Integer.parseInt(clusterProps.getProperty("CC_CLIENTPORT"));

-        String applicationName = "hivesterix";

+        if (hcc == null) {

+            BufferedReader ipReader = new BufferedReader(new InputStreamReader(new FileInputStream(masterFilePath)));

+            String masterNode = ipReader.readLine();

+            ipReader.close();

 

-        IHyracksClientConnection hcc = new HyracksConnection(ipAddress, port);

+            InetAddress[] ips = InetAddress.getAllByName(masterNode);

+            int port = Integer.parseInt(clusterProps.getProperty("CC_CLIENTPORT"));

+            for (InetAddress ip : ips) {

+                if (ip.getAddress().length <= 4) {

+                    try {

+                        hcc = new HyracksConnection(ip.getHostAddress(), port);

+                        break;

+                    } catch (Exception e) {

+                        continue;

+                    }

+                }

+            }

+        }

+

+        String applicationName = "hivesterix";

         long start = System.currentTimeMillis();

         JobId jobId = hcc.startJob(applicationName, job);

         hcc.waitForCompletion(jobId);

diff --git a/hivesterix/hivesterix-dist/src/test/java/edu/uci/ics/hivesterix/test/base/AbstractTestSuiteClass.java b/hivesterix/hivesterix-dist/src/test/java/edu/uci/ics/hivesterix/test/base/AbstractTestSuiteClass.java
index e455527..802777c 100644
--- a/hivesterix/hivesterix-dist/src/test/java/edu/uci/ics/hivesterix/test/base/AbstractTestSuiteClass.java
+++ b/hivesterix/hivesterix-dist/src/test/java/edu/uci/ics/hivesterix/test/base/AbstractTestSuiteClass.java
@@ -1,7 +1,6 @@
 package edu.uci.ics.hivesterix.test.base;

 

 import java.io.BufferedReader;

-import java.io.DataInputStream;

 import java.io.File;

 import java.io.FileInputStream;

 import java.io.FileNotFoundException;

@@ -9,6 +8,7 @@
 import java.io.IOException;

 import java.io.InputStream;

 import java.io.InputStreamReader;

+import java.net.InetAddress;

 import java.util.ArrayList;

 import java.util.HashMap;

 import java.util.Iterator;

@@ -45,8 +45,9 @@
     private static final String PATH_TO_DATA = "src/test/resources/runtimefunctionts/data/";

 

     private static final String clusterPropertiesPath = "conf/cluster.properties";

-    private Properties clusterProps;

+    private static final String masterFilePath = "conf/master";

 

+    private Properties clusterProps;

     private MiniDFSCluster dfsCluster;

     private MiniMRCluster mrCluster;

 

@@ -109,11 +110,16 @@
             clusterProps.load(confIn);

             confIn.close();

         }

-        Process process = Runtime.getRuntime().exec("src/main/resources/scripts/getip.sh");

-        BufferedReader ipReader = new BufferedReader(new InputStreamReader(

-                new DataInputStream(process.getInputStream())));

-        String ipAddress = ipReader.readLine();

+        BufferedReader ipReader = new BufferedReader(new InputStreamReader(new FileInputStream(masterFilePath)));

+        String masterNode = ipReader.readLine();

         ipReader.close();

+        InetAddress[] ips = InetAddress.getAllByName(masterNode);

+        String ipAddress = null;

+        for (InetAddress ip : ips) {

+            if (ip.getAddress().length <= 4) {

+                ipAddress = ip.getHostAddress();

+            }

+        }

         int clientPort = Integer.parseInt(clusterProps.getProperty("CC_CLIENTPORT"));

         int netPort = Integer.parseInt(clusterProps.getProperty("CC_CLUSTERPORT"));

         String applicationName = "hivesterix";