recover Tesert to 4thread

git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_genomix@2985 123451ca-8445-de46-9d55-352943316053
diff --git a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/Tester.java b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/Tester.java
index f979070..a353ec7 100644
--- a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/Tester.java
+++ b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/Tester.java
@@ -43,6 +43,7 @@
 import edu.uci.ics.hyracks.dataflow.std.group.external.ExternalGroupOperatorDescriptor;

 import edu.uci.ics.hyracks.dataflow.std.group.hybridhash.HybridHashGroupOperatorDescriptor;

 import edu.uci.ics.hyracks.dataflow.std.group.preclustered.PreclusteredGroupOperatorDescriptor;

+import edu.uci.ics.hyracks.hdfs.utils.HyracksUtils;

 

 public class Tester {

 

@@ -64,38 +65,43 @@
 

 	public static void main(String[] args) throws Exception {

 

-		LOGGER.setLevel(Level.OFF);

+		try {

+			LOGGER.setLevel(Level.OFF);

 

-		init();

+			init();

 

-		// Options options = new Options();

+			// Options options = new Options();

 

-		IHyracksClientConnection hcc = new HyracksConnection("127.0.0.1", 39000);

+			IHyracksClientConnection hcc = new HyracksConnection("127.0.0.1",

+					39000);

 

-		/*

-		 * JobSpecification job =

-		 * createJob(parseFileSplits(options.inFileCustomerSplits),

-		 * parseFileSplits(options.inFileOrderSplits),

-		 * parseFileSplits(options.outFileSplits), options.numJoinPartitions,

-		 * options.algo, options.graceInputSize, options.graceRecordsPerFrame,

-		 * options.graceFactor, options.memSize, options.tableSize,

-		 * options.hasGroupBy);

-		 */

+			/*

+			 * JobSpecification job =

+			 * createJob(parseFileSplits(options.inFileCustomerSplits),

+			 * parseFileSplits(options.inFileOrderSplits),

+			 * parseFileSplits(options.outFileSplits),

+			 * options.numJoinPartitions, options.algo, options.graceInputSize,

+			 * options.graceRecordsPerFrame, options.graceFactor,

+			 * options.memSize, options.tableSize, options.hasGroupBy);

+			 */

 

-		int k, page_num;

-		String file_name = args[0];

-		k = Integer.parseInt(args[1]);

-		page_num = Integer.parseInt(args[2]);

-		int type = Integer.parseInt(args[3]);

+			int k, page_num;

+			String file_name = args[0];

+			k = Integer.parseInt(args[1]);

+			page_num = Integer.parseInt(args[2]);

+			int type = Integer.parseInt(args[3]);

 

-		JobSpecification job = createJob(file_name, k, page_num, type);

+			JobSpecification job = createJob(file_name, k, page_num, type);

 

-		long start = System.currentTimeMillis();

-		JobId jobId = hcc.startJob("test", job);

-		hcc.waitForCompletion(jobId);

-		long end = System.currentTimeMillis();

-		System.err.println(start + " " + end + " " + (end - start));

-

+			long start = System.currentTimeMillis();

+			JobId jobId = hcc.startJob("test", job);

+			hcc.waitForCompletion(jobId);

+			long end = System.currentTimeMillis();

+			System.err.println(start + " " + end + " " + (end - start));

+		} finally {

+			HyracksUtils.destroyApp("test");

+			HyracksUtils.deinit();

+		}

 		/*

 		 * 

 		 * String s = "g:\\data\\results.txt" ;

@@ -137,22 +143,30 @@
 		nc1 = new NodeControllerService(ncConfig1);

 		nc1.start();

 

-		/*

-		 * NCConfig ncConfig2 = new NCConfig(); ncConfig2.ccHost = "localhost";

-		 * ncConfig2.ccPort = 39001; ncConfig2.clusterNetIPAddress =

-		 * "127.0.0.1"; ncConfig2.dataIPAddress = "127.0.0.1"; ncConfig2.nodeId

-		 * = NC2_ID; nc2 = new NodeControllerService(ncConfig2); nc2.start();

-		 * 

-		 * NCConfig ncConfig3 = new NCConfig(); ncConfig3.ccHost = "localhost";

-		 * ncConfig3.ccPort = 39001; ncConfig3.clusterNetIPAddress =

-		 * "127.0.0.1"; ncConfig3.dataIPAddress = "127.0.0.1"; ncConfig3.nodeId

-		 * = NC3_ID; nc3 = new NodeControllerService(ncConfig3); nc3.start();

-		 * 

-		 * NCConfig ncConfig4 = new NCConfig(); ncConfig4.ccHost = "localhost";

-		 * ncConfig4.ccPort = 39001; ncConfig4.clusterNetIPAddress =

-		 * "127.0.0.1"; ncConfig4.dataIPAddress = "127.0.0.1"; ncConfig4.nodeId

-		 * = NC4_ID; nc4 = new NodeControllerService(ncConfig4); nc4.start();

-		 */

+		NCConfig ncConfig2 = new NCConfig();

+		ncConfig2.ccHost = "localhost";

+		ncConfig2.ccPort = 39001;

+		ncConfig2.clusterNetIPAddress = "127.0.0.1";

+		ncConfig2.dataIPAddress = "127.0.0.1";

+		ncConfig2.nodeId = NC2_ID;

+		nc2 = new NodeControllerService(ncConfig2);

+		nc2.start();

+		NCConfig ncConfig3 = new NCConfig();

+		ncConfig3.ccHost = "localhost";

+		ncConfig3.ccPort = 39001;

+		ncConfig3.clusterNetIPAddress = "127.0.0.1";

+		ncConfig3.dataIPAddress = "127.0.0.1";

+		ncConfig3.nodeId = NC3_ID;

+		nc3 = new NodeControllerService(ncConfig3);

+		nc3.start();

+		NCConfig ncConfig4 = new NCConfig();

+		ncConfig4.ccHost = "localhost";

+		ncConfig4.ccPort = 39001;

+		ncConfig4.clusterNetIPAddress = "127.0.0.1";

+		ncConfig4.dataIPAddress = "127.0.0.1";

+		ncConfig4.nodeId = NC4_ID;

+		nc4 = new NodeControllerService(ncConfig4);

+		nc4.start();

 

 		hcc = new HyracksConnection(ccConfig.clientNetIpAddress,

 				ccConfig.clientNetPort);

@@ -170,10 +184,10 @@
 		spec.setFrameSize(32768);

 

 		FileScanDescriptor scan = new FileScanDescriptor(spec, k, filename);

-		// PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, scan,

-		// NC1_ID, NC2_ID,NC3_ID,NC4_ID);

 		PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, scan,

-				NC1_ID);

+				NC1_ID, NC2_ID, NC3_ID, NC4_ID);

+		// PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, scan,

+		// NC1_ID);

 

 		RecordDescriptor outputRec = new RecordDescriptor(

 				new ISerializerDeserializer[] { null,

@@ -308,29 +322,30 @@
 					new DistributedMergeLmerAggregateFactory(), outputRec, true);

 		}

 

-		PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,

-				single_grouper, NC1_ID);

 		// PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,

-		// single_grouper, NC1_ID, NC2_ID,NC3_ID,NC4_ID);

+		// single_grouper, NC1_ID);

+		PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,

+				single_grouper, NC1_ID, NC2_ID, NC3_ID, NC4_ID);

 

 		IConnectorDescriptor readfileConn = new OneToOneConnectorDescriptor(

 				spec);

 		spec.connect(readfileConn, scan, 0, single_grouper, 0);

 

-		PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,

-				cross_grouper, NC1_ID);

 		// PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,

-		// cross_grouper, NC1_ID, NC2_ID,NC3_ID,NC4_ID);

+		// cross_grouper, NC1_ID);

+		PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,

+				cross_grouper, NC1_ID, NC2_ID, NC3_ID, NC4_ID);

 		spec.connect(conn_partition, single_grouper, 0, cross_grouper, 0);

 

 		// PrinterOperatorDescriptor printer = new

 		// PrinterOperatorDescriptor(spec);

 		PrinterOperatorDescriptor printer = new PrinterOperatorDescriptor(spec,

 				"G:\\data\\result");

-		// PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,

-		// printer, NC1_ID, NC2_ID,NC3_ID,NC4_ID);

 		PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,

-				NC1_ID);

+				NC1_ID, NC2_ID, NC3_ID, NC4_ID);

+		// PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,

+		// printer,

+		// NC1_ID);

 

 		IConnectorDescriptor printConn = new OneToOneConnectorDescriptor(spec);

 		spec.connect(printConn, cross_grouper, 0, printer, 0);