Recover Tester to 4 threads
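
Re-enable node controllers NC2-NC4 in init(), spread the scan, grouper,
and printer partition constraints across all four nodes, and wrap main()
in try/finally so HyracksUtils tears the test app down on exit.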
git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_genomix@2985 123451ca-8445-de46-9d55-352943316053
diff --git a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/Tester.java b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/Tester.java
index f979070..a353ec7 100644
--- a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/Tester.java
+++ b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/Tester.java
@@ -43,6 +43,7 @@
import edu.uci.ics.hyracks.dataflow.std.group.external.ExternalGroupOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.group.hybridhash.HybridHashGroupOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.group.preclustered.PreclusteredGroupOperatorDescriptor;
+import edu.uci.ics.hyracks.hdfs.utils.HyracksUtils;
public class Tester {
@@ -64,38 +65,43 @@
public static void main(String[] args) throws Exception {
- LOGGER.setLevel(Level.OFF);
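+ // Run the whole body inside try/finally so the cluster is always torn down.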
+ try {
+ LOGGER.setLevel(Level.OFF);
- init();
+ init();
- // Options options = new Options();
+ // Options options = new Options();
- IHyracksClientConnection hcc = new HyracksConnection("127.0.0.1", 39000);
+ IHyracksClientConnection hcc = new HyracksConnection("127.0.0.1",
+ 39000);
- /*
- * JobSpecification job =
- * createJob(parseFileSplits(options.inFileCustomerSplits),
- * parseFileSplits(options.inFileOrderSplits),
- * parseFileSplits(options.outFileSplits), options.numJoinPartitions,
- * options.algo, options.graceInputSize, options.graceRecordsPerFrame,
- * options.graceFactor, options.memSize, options.tableSize,
- * options.hasGroupBy);
- */
+ /*
+ * JobSpecification job =
+ * createJob(parseFileSplits(options.inFileCustomerSplits),
+ * parseFileSplits(options.inFileOrderSplits),
+ * parseFileSplits(options.outFileSplits),
+ * options.numJoinPartitions, options.algo, options.graceInputSize,
+ * options.graceRecordsPerFrame, options.graceFactor,
+ * options.memSize, options.tableSize, options.hasGroupBy);
+ */
- int k, page_num;
- String file_name = args[0];
- k = Integer.parseInt(args[1]);
- page_num = Integer.parseInt(args[2]);
- int type = Integer.parseInt(args[3]);
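+ // args: input file, k, page count, and type (appears to select the grouper variant).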
+ int k, page_num;
+ String file_name = args[0];
+ k = Integer.parseInt(args[1]);
+ page_num = Integer.parseInt(args[2]);
+ int type = Integer.parseInt(args[3]);
- JobSpecification job = createJob(file_name, k, page_num, type);
+ JobSpecification job = createJob(file_name, k, page_num, type);
- long start = System.currentTimeMillis();
- JobId jobId = hcc.startJob("test", job);
- hcc.waitForCompletion(jobId);
- long end = System.currentTimeMillis();
- System.err.println(start + " " + end + " " + (end - start));
-
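+ // Time the job from submission to completion.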
+ long start = System.currentTimeMillis();
+ JobId jobId = hcc.startJob("test", job);
+ hcc.waitForCompletion(jobId);
+ long end = System.currentTimeMillis();
+ System.err.println(start + " " + end + " " + (end - start));
+ } finally {
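+ // Tear down the deployed app and the embedded cluster, even if the job failed.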
+ HyracksUtils.destroyApp("test");
+ HyracksUtils.deinit();
+ }
/*
*
* String s = "g:\\data\\results.txt" ;
@@ -137,22 +143,30 @@
nc1 = new NodeControllerService(ncConfig1);
nc1.start();
- /*
- * NCConfig ncConfig2 = new NCConfig(); ncConfig2.ccHost = "localhost";
- * ncConfig2.ccPort = 39001; ncConfig2.clusterNetIPAddress =
- * "127.0.0.1"; ncConfig2.dataIPAddress = "127.0.0.1"; ncConfig2.nodeId
- * = NC2_ID; nc2 = new NodeControllerService(ncConfig2); nc2.start();
- *
- * NCConfig ncConfig3 = new NCConfig(); ncConfig3.ccHost = "localhost";
- * ncConfig3.ccPort = 39001; ncConfig3.clusterNetIPAddress =
- * "127.0.0.1"; ncConfig3.dataIPAddress = "127.0.0.1"; ncConfig3.nodeId
- * = NC3_ID; nc3 = new NodeControllerService(ncConfig3); nc3.start();
- *
- * NCConfig ncConfig4 = new NCConfig(); ncConfig4.ccHost = "localhost";
- * ncConfig4.ccPort = 39001; ncConfig4.clusterNetIPAddress =
- * "127.0.0.1"; ncConfig4.dataIPAddress = "127.0.0.1"; ncConfig4.nodeId
- * = NC4_ID; nc4 = new NodeControllerService(ncConfig4); nc4.start();
- */
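+ // Re-enable node controllers 2-4 so the job can run on four nodes.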
+ NCConfig ncConfig2 = new NCConfig();
+ ncConfig2.ccHost = "localhost";
+ ncConfig2.ccPort = 39001;
+ ncConfig2.clusterNetIPAddress = "127.0.0.1";
+ ncConfig2.dataIPAddress = "127.0.0.1";
+ ncConfig2.nodeId = NC2_ID;
+ nc2 = new NodeControllerService(ncConfig2);
+ nc2.start();
+ NCConfig ncConfig3 = new NCConfig();
+ ncConfig3.ccHost = "localhost";
+ ncConfig3.ccPort = 39001;
+ ncConfig3.clusterNetIPAddress = "127.0.0.1";
+ ncConfig3.dataIPAddress = "127.0.0.1";
+ ncConfig3.nodeId = NC3_ID;
+ nc3 = new NodeControllerService(ncConfig3);
+ nc3.start();
+ NCConfig ncConfig4 = new NCConfig();
+ ncConfig4.ccHost = "localhost";
+ ncConfig4.ccPort = 39001;
+ ncConfig4.clusterNetIPAddress = "127.0.0.1";
+ ncConfig4.dataIPAddress = "127.0.0.1";
+ ncConfig4.nodeId = NC4_ID;
+ nc4 = new NodeControllerService(ncConfig4);
+ nc4.start();
hcc = new HyracksConnection(ccConfig.clientNetIpAddress,
ccConfig.clientNetPort);
@@ -170,10 +184,10 @@
spec.setFrameSize(32768);
FileScanDescriptor scan = new FileScanDescriptor(spec, k, filename);
- // PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, scan,
- // NC1_ID, NC2_ID,NC3_ID,NC4_ID);
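+ // Run the file scan on all four node controllers instead of only NC1.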
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, scan,
- NC1_ID);
+ NC1_ID, NC2_ID, NC3_ID, NC4_ID);
+ // PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, scan,
+ // NC1_ID);
RecordDescriptor outputRec = new RecordDescriptor(
new ISerializerDeserializer[] { null,
@@ -308,29 +322,30 @@
new DistributedMergeLmerAggregateFactory(), outputRec, true);
}
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
- single_grouper, NC1_ID);
// PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
- // single_grouper, NC1_ID, NC2_ID,NC3_ID,NC4_ID);
+ // single_grouper, NC1_ID);
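+ // Spread the first-stage grouper across all four nodes.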
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
+ single_grouper, NC1_ID, NC2_ID, NC3_ID, NC4_ID);
IConnectorDescriptor readfileConn = new OneToOneConnectorDescriptor(
spec);
spec.connect(readfileConn, scan, 0, single_grouper, 0);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
- cross_grouper, NC1_ID);
// PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
- // cross_grouper, NC1_ID, NC2_ID,NC3_ID,NC4_ID);
+ // cross_grouper, NC1_ID);
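+ // Spread the second-stage grouper across all four nodes as well.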
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
+ cross_grouper, NC1_ID, NC2_ID, NC3_ID, NC4_ID);
spec.connect(conn_partition, single_grouper, 0, cross_grouper, 0);
// PrinterOperatorDescriptor printer = new
// PrinterOperatorDescriptor(spec);
PrinterOperatorDescriptor printer = new PrinterOperatorDescriptor(spec,
"G:\\data\\result");
- // PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
- // printer, NC1_ID, NC2_ID,NC3_ID,NC4_ID);
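+ // The printer likewise runs on all four nodes.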
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
- NC1_ID);
+ NC1_ID, NC2_ID, NC3_ID, NC4_ID);
+ // PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
+ // printer,
+ // NC1_ID);
IConnectorDescriptor printConn = new OneToOneConnectorDescriptor(spec);
spec.connect(printConn, cross_grouper, 0, printer, 0);