IMRU: automatically generate cluster.conf
git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_imru@2637 123451ca-8445-de46-9d55-352943316053
diff --git a/imru/imru-core/src/main/java/edu/uci/ics/hyracks/imru/api2/IMRUJobControl.java b/imru/imru-core/src/main/java/edu/uci/ics/hyracks/imru/api2/IMRUJobControl.java
index c7752a8..2116003 100644
--- a/imru/imru-core/src/main/java/edu/uci/ics/hyracks/imru/api2/IMRUJobControl.java
+++ b/imru/imru-core/src/main/java/edu/uci/ics/hyracks/imru/api2/IMRUJobControl.java
@@ -36,7 +36,10 @@
conf.addResource(new Path(hadoopConfPath + "/core-site.xml"));
conf.addResource(new Path(hadoopConfPath + "/mapred-site.xml"));
conf.addResource(new Path(hadoopConfPath + "/hdfs-site.xml"));
- ClusterConfig.setConfPath(clusterConfPath);
+ if (clusterConfPath == null || !new File(clusterConfPath).exists())
+ ClusterConfig.setConf(hcc);
+ else
+ ClusterConfig.setConfPath(clusterConfPath);
confFactory = new ConfigurationFactory(conf);
}
@@ -63,6 +66,7 @@
/**
* run job using low level interface
+ *
* @param job
* @param initialModel
* @param tempPath
@@ -79,6 +83,7 @@
/**
* run job using middle level interface
+ *
* @param job2
* @param tempPath
* @param app
@@ -95,6 +100,7 @@
/**
* run job using high level interface
+ *
* @param job
* @param tempPath
* @param app
diff --git a/imru/imru-core/src/main/java/edu/uci/ics/hyracks/imru/jobgen/clusterconfig/ClusterConfig.java b/imru/imru-core/src/main/java/edu/uci/ics/hyracks/imru/jobgen/clusterconfig/ClusterConfig.java
index dffc918..eeccda1 100644
--- a/imru/imru-core/src/main/java/edu/uci/ics/hyracks/imru/jobgen/clusterconfig/ClusterConfig.java
+++ b/imru/imru-core/src/main/java/edu/uci/ics/hyracks/imru/jobgen/clusterconfig/ClusterConfig.java
@@ -34,6 +34,8 @@
import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.hadoop.mapreduce.InputSplit;
+import edu.uci.ics.hyracks.api.client.HyracksConnection;
+import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
@@ -50,7 +52,7 @@
/**
* let tests set config path to be whatever
- *
+ *
* @param confPath
*/
public static void setConfPath(String confPath) {
@@ -59,12 +61,13 @@
/**
* get NC names running on one IP address
- *
+ *
* @param ipAddress
* @return
* @throws HyracksDataException
*/
- public static List<String> getNCNames(String ipAddress) throws HyracksException {
+ public static List<String> getNCNames(String ipAddress)
+ throws HyracksException {
if (NCs == null) {
loadClusterConfig();
}
@@ -75,7 +78,7 @@
* Set location constraints for an operator based on the locations of input
* files in HDFS. Randomly assigns partitions to NCs where the HDFS files
* are local; assigns the rest randomly.
- *
+ *
* @param spec
* A job specification.
* @param operator
@@ -89,8 +92,9 @@
* @return The assigned partition locations.
* @throws HyracksException
*/
- public static String[] setLocationConstraint(JobSpecification spec, IOperatorDescriptor operator,
- List<InputSplit> splits, Random random) throws HyracksException {
+ public static String[] setLocationConstraint(JobSpecification spec,
+ IOperatorDescriptor operator, List<InputSplit> splits, Random random)
+ throws HyracksException {
if (NCs == null) {
loadClusterConfig();
}
@@ -105,16 +109,20 @@
Collections.sort(Arrays.asList(localHosts));
Collections.shuffle(Arrays.asList(localHosts), random);
if (localHosts.length > 0) {
- LOG.info("Partition " + partition + " is local at " + localHosts.length + " hosts: "
+ LOG.info("Partition " + partition + " is local at "
+ + localHosts.length + " hosts: "
+ StringUtils.join(localHosts, ", "));
for (int host = 0; host < localHosts.length; host++) {
- InetAddress[] hostIps = InetAddress.getAllByName(localHosts[host]);
+ InetAddress[] hostIps = InetAddress
+ .getAllByName(localHosts[host]);
for (InetAddress ip : hostIps) {
if (ipToNcMapping.get(ip.getHostAddress()) != null) {
- List<String> ncs = ipToNcMapping.get(ip.getHostAddress());
+ List<String> ncs = ipToNcMapping.get(ip
+ .getHostAddress());
int pos = random.nextInt(ncs.size());
partitionLocations[partition] = ncs.get(pos);
- LOG.info("Partition " + partition + " assigned to " + ncs.get(pos)
+ LOG.info("Partition " + partition
+ + " assigned to " + ncs.get(pos)
+ ", where it is local.");
localAssignments++;
break;
@@ -128,14 +136,16 @@
int pos = random.nextInt(NCs.length);
partitionLocations[partition] = NCs[pos];
nonlocalAssignments++;
- LOG.info("Partition " + partition + " assigned to " + NCs[pos]
+ LOG.info("Partition " + partition + " assigned to "
+ + NCs[pos]
+ " because there is no NC where it is local.");
}
} else {
int pos = random.nextInt(NCs.length);
partitionLocations[partition] = NCs[pos];
nonlocalAssignments++;
- LOG.info("Partition " + partition + " assigned to " + NCs[pos]
+ LOG.info("Partition " + partition + " assigned to "
+ + NCs[pos]
+ " becasue getLocations() returned no locations.");
}
@@ -150,35 +160,68 @@
Map<String, MutableInt> ncPartitionCounts = new HashMap<String, MutableInt>();
for (int i = 0; i < partitionLocations.length; i++) {
if (ncPartitionCounts.get(partitionLocations[i]) == null) {
- ncPartitionCounts.put(partitionLocations[i], new MutableInt(1));
+ ncPartitionCounts.put(partitionLocations[i],
+ new MutableInt(1));
} else {
ncPartitionCounts.get(partitionLocations[i]).increment();
}
}
- for (Map.Entry<String, MutableInt> entry : ncPartitionCounts.entrySet()) {
- LOG.info(entry.getKey() + ": " + entry.getValue().intValue() + " partitions");
+ for (Map.Entry<String, MutableInt> entry : ncPartitionCounts
+ .entrySet()) {
+ LOG.info(entry.getKey() + ": " + entry.getValue().intValue()
+ + " partitions");
}
}
double localityPercentage = ((1.0 * localAssignments) / (localAssignments + nonlocalAssignments)) * 100;
- LOG.info(operator.getClass().getSimpleName() + ": " + localAssignments + " local; " + nonlocalAssignments
- + " non-local; " + localityPercentage + "% locality");
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, operator, partitionLocations);
- PartitionConstraintHelper.addPartitionCountConstraint(spec, operator, partitionCount);
+ LOG.info(operator.getClass().getSimpleName() + ": " + localAssignments
+ + " local; " + nonlocalAssignments + " non-local; "
+ + localityPercentage + "% locality");
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, operator,
+ partitionLocations);
+ PartitionConstraintHelper.addPartitionCountConstraint(spec, operator,
+ partitionCount);
return partitionLocations;
}
-
+    /**
+     * Derive the cluster configuration directly from a live Hyracks
+     * connection instead of a cluster.conf file. Populates NCs (all node
+     * controller ids) and ipToNcMapping (IP address string -> NC ids on
+     * that host), matching the shape loadClusterConfig() produces and
+     * setLocationConstraint() consumes via ipToNcMapping.get(ip.getHostAddress()).
+     *
+     * @param hcc
+     *            an established Hyracks connection
+     * @throws Exception
+     *             if the node controller info cannot be retrieved
+     */
+    public static void setConf(HyracksConnection hcc) throws Exception {
+        Map<String, NodeControllerInfo> map = hcc.getNodeControllerInfos();
+        List<String> ncNames = new ArrayList<String>();
+        ipToNcMapping = new HashMap<String, List<String>>();
+        for (String key : map.keySet()) {
+            NodeControllerInfo info = map.get(key);
+            String id = info.getNodeId();
+            byte[] ip = info.getNetworkAddress().getIpAddress();
+            // Render the raw IPv4 bytes as a dotted-quad string; mask with
+            // 0xFF because Java bytes are signed.
+            StringBuilder sb = new StringBuilder();
+            for (byte b : ip) {
+                if (sb.length() > 0)
+                    sb.append(".");
+                sb.append(b & 0xFF);
+            }
+            LOG.info(id + " " + sb);
+            ncNames.add(id);
+            // Key by IP address, value is the list of NC ids on that IP
+            // (previously inverted: keyed by id with the IP as the value,
+            // which made locality lookups by host address always miss).
+            String ipAddress = sb.toString();
+            List<String> ncs = ipToNcMapping.get(ipAddress);
+            if (ncs == null) {
+                ncs = new ArrayList<String>();
+                ipToNcMapping.put(ipAddress, ncs);
+            }
+            ncs.add(id);
+        }
+        NCs = ncNames.toArray(new String[ncNames.size()]);
+    }
private static void loadClusterConfig() throws HyracksException {
String line = "";
ipToNcMapping = new HashMap<String, List<String>>();
if (!new File(confPath).exists()) {
- throw new HyracksException("Can't find "+ confPath);
-// NCs=new String[0];
-// return;
+ if (NCs.length > 0)
+ return;
+ throw new HyracksException("Can't find " + confPath);
+ // NCs=new String[0];
+ // return;
}
try {
- BufferedReader reader = new BufferedReader(new InputStreamReader(new FileInputStream(confPath)));
+ BufferedReader reader = new BufferedReader(new InputStreamReader(
+ new FileInputStream(confPath)));
List<String> ncNames = new ArrayList<String>();
while ((line = reader.readLine()) != null) {
String[] ncConfig = line.split(" ");
diff --git a/imru/imru-example/src/main/java/edu/uci/ics/hyracks/imru/example/bgd2/BGD.java b/imru/imru-example/src/main/java/edu/uci/ics/hyracks/imru/example/bgd2/BGD.java
index 0e021f7..daa7275 100644
--- a/imru/imru-example/src/main/java/edu/uci/ics/hyracks/imru/example/bgd2/BGD.java
+++ b/imru/imru-example/src/main/java/edu/uci/ics/hyracks/imru/example/bgd2/BGD.java
@@ -24,7 +24,6 @@
+ " -num-rounds 5"//
+ " -temp-path /tmp"//
+ " -model-file /tmp/__imru.txt"//
- + " -cluster-conf imru/imru-core/src/main/resources/conf/cluster.conf"//
+ " -example-paths /input/data.txt").split(" ");
}
Client<LinearModel, LossGradient> client = new Client<LinearModel, LossGradient>(
diff --git a/imru/imru-example/src/main/java/edu/uci/ics/hyracks/imru/example/helloworld/HelloWorldCluster.java b/imru/imru-example/src/main/java/edu/uci/ics/hyracks/imru/example/helloworld/HelloWorldCluster.java
index 0a1fd5d..035e5b8 100644
--- a/imru/imru-example/src/main/java/edu/uci/ics/hyracks/imru/example/helloworld/HelloWorldCluster.java
+++ b/imru/imru-example/src/main/java/edu/uci/ics/hyracks/imru/example/helloworld/HelloWorldCluster.java
@@ -1,7 +1,9 @@
package edu.uci.ics.hyracks.imru.example.helloworld;
import java.io.File;
+import java.util.Map;
+import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
import edu.uci.ics.hyracks.api.job.JobStatus;
import edu.uci.ics.hyracks.imru.example.utils.Client;
@@ -15,9 +17,6 @@
if (args.length == 0) {
// if no argument is given, the following code
// create default arguments to run the example
- Client.generateClusterConfig(new File(
- "src/main/resources/cluster.conf"), Client.getLocalIp(),
- Client.getLocalHostName());
String cmdline = "";
// hostname of cluster controller
cmdline += "-host " + Client.getLocalIp();
@@ -28,8 +27,6 @@
// hadoop config path
cmdline += " -hadoop-conf " + System.getProperty("user.home")
+ "/hadoop-0.20.2/conf";
- // ip address and node names
- cmdline += " -cluster-conf src/main/resources/cluster.conf";
// HDFS path to hold intermediate models
cmdline += " -temp-path /helloworld";
// HDFS path of input data
diff --git a/imru/imru-example/src/main/java/edu/uci/ics/hyracks/imru/example/helloworld/HelloWorldIndependent.java b/imru/imru-example/src/main/java/edu/uci/ics/hyracks/imru/example/helloworld/HelloWorldIndependent.java
index a9b33e5..9bb4342 100644
--- a/imru/imru-example/src/main/java/edu/uci/ics/hyracks/imru/example/helloworld/HelloWorldIndependent.java
+++ b/imru/imru-example/src/main/java/edu/uci/ics/hyracks/imru/example/helloworld/HelloWorldIndependent.java
@@ -16,9 +16,6 @@
if (args.length == 0) {
// if no argument is given, the following code
// create default arguments to run the example
- Client.generateClusterConfig(new File(
- "src/main/resources/cluster.conf"), "127.0.0.1", "nc1",
- "127.0.0.1", "nc2");
String cmdline = "";
// hostname of cluster controller
cmdline += "-host localhost";
@@ -29,8 +26,6 @@
// hadoop config path
cmdline += " -hadoop-conf " + System.getProperty("user.home")
+ "/hadoop-0.20.2/conf";
- // ip address and node names
- cmdline += " -cluster-conf src/main/resources/cluster.conf";
// HDFS path to hold intermediate models
cmdline += " -temp-path /helloworld";
// HDFS path of input data
diff --git a/imru/imru-example/src/main/java/edu/uci/ics/hyracks/imru/example/utils/Client.java b/imru/imru-example/src/main/java/edu/uci/ics/hyracks/imru/example/utils/Client.java
index 76ef22c..2439b04 100644
--- a/imru/imru-example/src/main/java/edu/uci/ics/hyracks/imru/example/utils/Client.java
+++ b/imru/imru-example/src/main/java/edu/uci/ics/hyracks/imru/example/utils/Client.java
@@ -35,6 +35,7 @@
import edu.uci.ics.hyracks.imru.api2.IMRUJob;
import edu.uci.ics.hyracks.imru.api2.IMRUJob2;
import edu.uci.ics.hyracks.imru.api2.IMRUJobControl;
+import edu.uci.ics.hyracks.imru.jobgen.clusterconfig.ClusterConfig;
/**
* This class wraps IMRU common functions.
@@ -182,6 +183,7 @@
this.control = new IMRUJobControl<Model, T>();
control.connect(options.host, options.port, options.hadoopConfPath,
options.clusterConfPath);
+ hcc = control.hcc;
conf = control.conf;
// set aggregation type
if (options.aggTreeType.equals("none")) {
@@ -195,7 +197,6 @@
throw new IllegalArgumentException("Invalid aggregation tree type");
}
// hyracks connection
- hcc = control.hcc;
}
/**
diff --git a/imru/imru-example/src/main/resources/cluster.conf b/imru/imru-example/src/main/resources/cluster.conf
index d7b4550..2df2a2d 100644
--- a/imru/imru-example/src/main/resources/cluster.conf
+++ b/imru/imru-example/src/main/resources/cluster.conf
@@ -1 +1 @@
-10.0.2.15 vm
+192.168.56.103 vm