IMRU: automatically generate cluster.conf from the Hyracks connection when no cluster-conf path is supplied


git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_imru@2637 123451ca-8445-de46-9d55-352943316053
diff --git a/imru/imru-core/src/main/java/edu/uci/ics/hyracks/imru/api2/IMRUJobControl.java b/imru/imru-core/src/main/java/edu/uci/ics/hyracks/imru/api2/IMRUJobControl.java
index c7752a8..2116003 100644
--- a/imru/imru-core/src/main/java/edu/uci/ics/hyracks/imru/api2/IMRUJobControl.java
+++ b/imru/imru-core/src/main/java/edu/uci/ics/hyracks/imru/api2/IMRUJobControl.java
@@ -36,7 +36,10 @@
         conf.addResource(new Path(hadoopConfPath + "/core-site.xml"));
         conf.addResource(new Path(hadoopConfPath + "/mapred-site.xml"));
         conf.addResource(new Path(hadoopConfPath + "/hdfs-site.xml"));
-        ClusterConfig.setConfPath(clusterConfPath);
+        if (clusterConfPath == null || !new File(clusterConfPath).exists())
+            ClusterConfig.setConf(hcc);
+        else
+            ClusterConfig.setConfPath(clusterConfPath);
         confFactory = new ConfigurationFactory(conf);
     }
 
@@ -63,6 +66,7 @@
 
     /**
      * run job using low level interface
+     * 
      * @param job
      * @param initialModel
      * @param tempPath
@@ -79,6 +83,7 @@
 
     /**
      * run job using middle level interface
+     * 
      * @param job2
      * @param tempPath
      * @param app
@@ -95,6 +100,7 @@
 
     /**
      * run job using high level interface
+     * 
      * @param job
      * @param tempPath
      * @param app
diff --git a/imru/imru-core/src/main/java/edu/uci/ics/hyracks/imru/jobgen/clusterconfig/ClusterConfig.java b/imru/imru-core/src/main/java/edu/uci/ics/hyracks/imru/jobgen/clusterconfig/ClusterConfig.java
index dffc918..eeccda1 100644
--- a/imru/imru-core/src/main/java/edu/uci/ics/hyracks/imru/jobgen/clusterconfig/ClusterConfig.java
+++ b/imru/imru-core/src/main/java/edu/uci/ics/hyracks/imru/jobgen/clusterconfig/ClusterConfig.java
@@ -34,6 +34,8 @@
 import org.apache.commons.lang3.mutable.MutableInt;
 import org.apache.hadoop.mapreduce.InputSplit;
 
+import edu.uci.ics.hyracks.api.client.HyracksConnection;
+import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
 import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
 import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
@@ -50,7 +52,7 @@
 
     /**
      * let tests set config path to be whatever
-     *
+     * 
      * @param confPath
      */
     public static void setConfPath(String confPath) {
@@ -59,12 +61,13 @@
 
     /**
      * get NC names running on one IP address
-     *
+     * 
      * @param ipAddress
      * @return
      * @throws HyracksDataException
      */
-    public static List<String> getNCNames(String ipAddress) throws HyracksException {
+    public static List<String> getNCNames(String ipAddress)
+            throws HyracksException {
         if (NCs == null) {
             loadClusterConfig();
         }
@@ -75,7 +78,7 @@
      * Set location constraints for an operator based on the locations of input
      * files in HDFS. Randomly assigns partitions to NCs where the HDFS files
      * are local; assigns the rest randomly.
-     *
+     * 
      * @param spec
      *            A job specification.
      * @param operator
@@ -89,8 +92,9 @@
      * @return The assigned partition locations.
      * @throws HyracksException
      */
-    public static String[] setLocationConstraint(JobSpecification spec, IOperatorDescriptor operator,
-            List<InputSplit> splits, Random random) throws HyracksException {
+    public static String[] setLocationConstraint(JobSpecification spec,
+            IOperatorDescriptor operator, List<InputSplit> splits, Random random)
+            throws HyracksException {
         if (NCs == null) {
             loadClusterConfig();
         }
@@ -105,16 +109,20 @@
                 Collections.sort(Arrays.asList(localHosts));
                 Collections.shuffle(Arrays.asList(localHosts), random);
                 if (localHosts.length > 0) {
-                    LOG.info("Partition " + partition + " is local at " + localHosts.length + " hosts: "
+                    LOG.info("Partition " + partition + " is local at "
+                            + localHosts.length + " hosts: "
                             + StringUtils.join(localHosts, ", "));
                     for (int host = 0; host < localHosts.length; host++) {
-                        InetAddress[] hostIps = InetAddress.getAllByName(localHosts[host]);
+                        InetAddress[] hostIps = InetAddress
+                                .getAllByName(localHosts[host]);
                         for (InetAddress ip : hostIps) {
                             if (ipToNcMapping.get(ip.getHostAddress()) != null) {
-                                List<String> ncs = ipToNcMapping.get(ip.getHostAddress());
+                                List<String> ncs = ipToNcMapping.get(ip
+                                        .getHostAddress());
                                 int pos = random.nextInt(ncs.size());
                                 partitionLocations[partition] = ncs.get(pos);
-                                LOG.info("Partition " + partition + " assigned to " + ncs.get(pos)
+                                LOG.info("Partition " + partition
+                                        + " assigned to " + ncs.get(pos)
                                         + ", where it is local.");
                                 localAssignments++;
                                 break;
@@ -128,14 +136,16 @@
                         int pos = random.nextInt(NCs.length);
                         partitionLocations[partition] = NCs[pos];
                         nonlocalAssignments++;
-                        LOG.info("Partition " + partition + " assigned to " + NCs[pos]
+                        LOG.info("Partition " + partition + " assigned to "
+                                + NCs[pos]
                                 + " because there is no NC where it is local.");
                     }
                 } else {
                     int pos = random.nextInt(NCs.length);
                     partitionLocations[partition] = NCs[pos];
                     nonlocalAssignments++;
-                    LOG.info("Partition " + partition + " assigned to " + NCs[pos]
+                    LOG.info("Partition " + partition + " assigned to "
+                            + NCs[pos]
                             + " becasue getLocations() returned no locations.");
 
                 }
@@ -150,35 +160,68 @@
             Map<String, MutableInt> ncPartitionCounts = new HashMap<String, MutableInt>();
             for (int i = 0; i < partitionLocations.length; i++) {
                 if (ncPartitionCounts.get(partitionLocations[i]) == null) {
-                    ncPartitionCounts.put(partitionLocations[i], new MutableInt(1));
+                    ncPartitionCounts.put(partitionLocations[i],
+                            new MutableInt(1));
                 } else {
                     ncPartitionCounts.get(partitionLocations[i]).increment();
                 }
             }
-            for (Map.Entry<String, MutableInt> entry : ncPartitionCounts.entrySet()) {
-                LOG.info(entry.getKey() + ": " + entry.getValue().intValue() + " partitions");
+            for (Map.Entry<String, MutableInt> entry : ncPartitionCounts
+                    .entrySet()) {
+                LOG.info(entry.getKey() + ": " + entry.getValue().intValue()
+                        + " partitions");
             }
         }
         double localityPercentage = ((1.0 * localAssignments) / (localAssignments + nonlocalAssignments)) * 100;
-        LOG.info(operator.getClass().getSimpleName() + ": " + localAssignments + " local; " + nonlocalAssignments
-                + " non-local; " + localityPercentage + "% locality");
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, operator, partitionLocations);
-        PartitionConstraintHelper.addPartitionCountConstraint(spec, operator, partitionCount);
+        LOG.info(operator.getClass().getSimpleName() + ": " + localAssignments
+                + " local; " + nonlocalAssignments + " non-local; "
+                + localityPercentage + "% locality");
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, operator,
+                partitionLocations);
+        PartitionConstraintHelper.addPartitionCountConstraint(spec, operator,
+                partitionCount);
         return partitionLocations;
     }
 
-
+    public static void setConf(HyracksConnection hcc) throws Exception {
+        Map<String, NodeControllerInfo> map = hcc.getNodeControllerInfos();
+        List<String> ncNames = new ArrayList<String>();
+        ipToNcMapping = new HashMap<String, List<String>>();
+        for (String key : map.keySet()) {
+            NodeControllerInfo info = map.get(key);
+            String id = info.getNodeId();
+            byte[] ip = info.getNetworkAddress().getIpAddress();
+            StringBuilder sb = new StringBuilder(); // dotted-quad IP text
+            for (byte b : ip) {
+                if (sb.length() > 0)
+                    sb.append(".");
+                sb.append(b & 0xFF);
+            }
+            LOG.info(id + " " + sb);
+            ncNames.add(id);
+            List<String> ncs = ipToNcMapping.get(sb.toString()); // key by IP (was node id): setLocationConstraint looks up by ip.getHostAddress()
+            if (ncs == null) {
+                ncs = new ArrayList<String>();
+                ipToNcMapping.put(sb.toString(), ncs);
+            }
+            ncs.add(id); // value is the NC name, matching loadClusterConfig's layout
+        }
+        NCs = ncNames.toArray(new String[ncNames.size()]);
+    }
 
     private static void loadClusterConfig() throws HyracksException {
         String line = "";
         ipToNcMapping = new HashMap<String, List<String>>();
         if (!new File(confPath).exists()) {
-            throw new HyracksException("Can't find "+ confPath);
-//            NCs=new String[0];
-//            return;
+            if (NCs.length > 0)
+                return;
+            throw new HyracksException("Can't find " + confPath);
+            // NCs=new String[0];
+            // return;
         }
         try {
-            BufferedReader reader = new BufferedReader(new InputStreamReader(new FileInputStream(confPath)));
+            BufferedReader reader = new BufferedReader(new InputStreamReader(
+                    new FileInputStream(confPath)));
             List<String> ncNames = new ArrayList<String>();
             while ((line = reader.readLine()) != null) {
                 String[] ncConfig = line.split(" ");
diff --git a/imru/imru-example/src/main/java/edu/uci/ics/hyracks/imru/example/bgd2/BGD.java b/imru/imru-example/src/main/java/edu/uci/ics/hyracks/imru/example/bgd2/BGD.java
index 0e021f7..daa7275 100644
--- a/imru/imru-example/src/main/java/edu/uci/ics/hyracks/imru/example/bgd2/BGD.java
+++ b/imru/imru-example/src/main/java/edu/uci/ics/hyracks/imru/example/bgd2/BGD.java
@@ -24,7 +24,6 @@
                         + " -num-rounds 5"//
                         + " -temp-path /tmp"//
                         + " -model-file /tmp/__imru.txt"//
-                        + " -cluster-conf imru/imru-core/src/main/resources/conf/cluster.conf"//
                         + " -example-paths /input/data.txt").split(" ");
             }
             Client<LinearModel, LossGradient> client = new Client<LinearModel, LossGradient>(
diff --git a/imru/imru-example/src/main/java/edu/uci/ics/hyracks/imru/example/helloworld/HelloWorldCluster.java b/imru/imru-example/src/main/java/edu/uci/ics/hyracks/imru/example/helloworld/HelloWorldCluster.java
index 0a1fd5d..035e5b8 100644
--- a/imru/imru-example/src/main/java/edu/uci/ics/hyracks/imru/example/helloworld/HelloWorldCluster.java
+++ b/imru/imru-example/src/main/java/edu/uci/ics/hyracks/imru/example/helloworld/HelloWorldCluster.java
@@ -1,7 +1,9 @@
 package edu.uci.ics.hyracks.imru.example.helloworld;
 
 import java.io.File;
+import java.util.Map;
 
+import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
 import edu.uci.ics.hyracks.api.job.JobStatus;
 import edu.uci.ics.hyracks.imru.example.utils.Client;
 
@@ -15,9 +17,6 @@
         if (args.length == 0) {
             // if no argument is given, the following code
             // create default arguments to run the example
-            Client.generateClusterConfig(new File(
-                    "src/main/resources/cluster.conf"), Client.getLocalIp(),
-                    Client.getLocalHostName());
             String cmdline = "";
             // hostname of cluster controller
             cmdline += "-host " + Client.getLocalIp();
@@ -28,8 +27,6 @@
             // hadoop config path
             cmdline += " -hadoop-conf " + System.getProperty("user.home")
                     + "/hadoop-0.20.2/conf";
-            // ip address and node names
-            cmdline += " -cluster-conf src/main/resources/cluster.conf";
             // HDFS path to hold intermediate models
             cmdline += " -temp-path /helloworld";
             // HDFS path of input data
diff --git a/imru/imru-example/src/main/java/edu/uci/ics/hyracks/imru/example/helloworld/HelloWorldIndependent.java b/imru/imru-example/src/main/java/edu/uci/ics/hyracks/imru/example/helloworld/HelloWorldIndependent.java
index a9b33e5..9bb4342 100644
--- a/imru/imru-example/src/main/java/edu/uci/ics/hyracks/imru/example/helloworld/HelloWorldIndependent.java
+++ b/imru/imru-example/src/main/java/edu/uci/ics/hyracks/imru/example/helloworld/HelloWorldIndependent.java
@@ -16,9 +16,6 @@
         if (args.length == 0) {
             // if no argument is given, the following code
             // create default arguments to run the example
-            Client.generateClusterConfig(new File(
-                    "src/main/resources/cluster.conf"), "127.0.0.1", "nc1",
-                    "127.0.0.1", "nc2");
             String cmdline = "";
             // hostname of cluster controller
             cmdline += "-host localhost";
@@ -29,8 +26,6 @@
             // hadoop config path
             cmdline += " -hadoop-conf " + System.getProperty("user.home")
                     + "/hadoop-0.20.2/conf";
-            // ip address and node names
-            cmdline += " -cluster-conf src/main/resources/cluster.conf";
             // HDFS path to hold intermediate models
             cmdline += " -temp-path /helloworld";
             // HDFS path of input data
diff --git a/imru/imru-example/src/main/java/edu/uci/ics/hyracks/imru/example/utils/Client.java b/imru/imru-example/src/main/java/edu/uci/ics/hyracks/imru/example/utils/Client.java
index 76ef22c..2439b04 100644
--- a/imru/imru-example/src/main/java/edu/uci/ics/hyracks/imru/example/utils/Client.java
+++ b/imru/imru-example/src/main/java/edu/uci/ics/hyracks/imru/example/utils/Client.java
@@ -35,6 +35,7 @@
 import edu.uci.ics.hyracks.imru.api2.IMRUJob;
 import edu.uci.ics.hyracks.imru.api2.IMRUJob2;
 import edu.uci.ics.hyracks.imru.api2.IMRUJobControl;
+import edu.uci.ics.hyracks.imru.jobgen.clusterconfig.ClusterConfig;
 
 /**
  * This class wraps IMRU common functions.
@@ -182,6 +183,7 @@
         this.control = new IMRUJobControl<Model, T>();
         control.connect(options.host, options.port, options.hadoopConfPath,
                 options.clusterConfPath);
+        hcc = control.hcc;
         conf = control.conf;
         // set aggregation type
         if (options.aggTreeType.equals("none")) {
@@ -195,7 +197,6 @@
             throw new IllegalArgumentException("Invalid aggregation tree type");
         }
         // hyracks connection
-        hcc = control.hcc;
     }
 
     /**
diff --git a/imru/imru-example/src/main/resources/cluster.conf b/imru/imru-example/src/main/resources/cluster.conf
index d7b4550..2df2a2d 100644
--- a/imru/imru-example/src/main/resources/cluster.conf
+++ b/imru/imru-example/src/main/resources/cluster.conf
@@ -1 +1 @@
-10.0.2.15 vm
+192.168.56.103 vm