revert Tester.java to r2868 to make compilable

git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_genomix@2884 123451ca-8445-de46-9d55-352943316053
diff --git a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/Tester.java b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/Tester.java
index 5e00d7e..0b0aa07 100644
--- a/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/Tester.java
+++ b/genomix/genomix-core/src/main/java/edu/uci/ics/genomix/dataflow/Tester.java
@@ -7,6 +7,7 @@
 import edu.uci.ics.genomix.data.normalizers.VLongNormalizedKeyComputerFactory;

 import edu.uci.ics.genomix.data.partition.KmerHashPartitioncomputerFactory;

 import edu.uci.ics.genomix.data.serde.ByteSerializerDeserializer;

+import edu.uci.ics.genomix.data.std.accessors.LongBinaryHashFunctionFamily;

 import edu.uci.ics.genomix.data.std.accessors.VLongBinaryHashFunctionFamily;

 import edu.uci.ics.genomix.data.std.primitive.VLongPointable;

 import edu.uci.ics.genomix.dataflow.aggregators.DistributedMergeLmerAggregateFactory;

@@ -46,370 +47,313 @@
 

 public class Tester {

 

-	private static final Logger LOGGER = Logger.getLogger(Tester.class

-			.getName());

-	public static final String NC1_ID = "nc1";

-	public static final String NC2_ID = "nc2";

-	public static final String NC3_ID = "nc3";

-	public static final String NC4_ID = "nc4";

+    private static final Logger LOGGER = Logger.getLogger(Tester.class.getName());

+    public static final String NC1_ID = "nc1";

+    public static final String NC2_ID = "nc2";

+    public static final String NC3_ID = "nc3";

+    public static final String NC4_ID = "nc4";

 

-	private static ClusterControllerService cc;

-	private static NodeControllerService nc1;

-	private static NodeControllerService nc2;

-	private static NodeControllerService nc3;

-	private static NodeControllerService nc4;

-	private static IHyracksClientConnection hcc;

+    private static ClusterControllerService cc;

+    private static NodeControllerService nc1;

+    private static NodeControllerService nc2;

+    private static NodeControllerService nc3;

+    private static NodeControllerService nc4;

+    private static IHyracksClientConnection hcc;

 

-	// private static final boolean DEBUG = true;

+    //private static final boolean DEBUG = true;

 

-	public static void main(String[] args) throws Exception {

+    public static void main(String[] args) throws Exception {

 

-		LOGGER.setLevel(Level.OFF);

+        LOGGER.setLevel(Level.OFF);

 

-		init();

+        init();

 

-		// Options options = new Options();

+        // Options options = new Options();

 

-		IHyracksClientConnection hcc = new HyracksConnection("127.0.0.1", 39000);

+        IHyracksClientConnection hcc = new HyracksConnection("127.0.0.1", 39000);

 

-		/*

-		 * JobSpecification job =

-		 * createJob(parseFileSplits(options.inFileCustomerSplits),

-		 * parseFileSplits(options.inFileOrderSplits),

-		 * parseFileSplits(options.outFileSplits), options.numJoinPartitions,

-		 * options.algo, options.graceInputSize, options.graceRecordsPerFrame,

-		 * options.graceFactor, options.memSize, options.tableSize,

-		 * options.hasGroupBy);

-		 */

+        /*

+         * JobSpecification job =

+         * createJob(parseFileSplits(options.inFileCustomerSplits),

+         * parseFileSplits(options.inFileOrderSplits),

+         * parseFileSplits(options.outFileSplits), options.numJoinPartitions,

+         * options.algo, options.graceInputSize, options.graceRecordsPerFrame,

+         * options.graceFactor, options.memSize, options.tableSize,

+         * options.hasGroupBy);

+         */

 

-		int k, page_num;

-		String file_name = args[0];

-		k = Integer.parseInt(args[1]);

-		page_num = Integer.parseInt(args[2]);

-		int type = Integer.parseInt(args[3]);

+        int k, page_num;

+        String file_name = args[0];

+        k = Integer.parseInt(args[1]);

+        page_num = Integer.parseInt(args[2]);

+        int type = Integer.parseInt(args[3]);

 

-		JobSpecification job = createJob(file_name, k, page_num, type);

+        JobSpecification job = createJob(file_name, k, page_num, type);

 

-		long start = System.currentTimeMillis();

-		JobId jobId = hcc.startJob("test", job);

-		hcc.waitForCompletion(jobId);

-		long end = System.currentTimeMillis();

-		System.err.println(start + " " + end + " " + (end - start));

+        long start = System.currentTimeMillis();

+        JobId jobId = hcc.startJob("test", job);

+        hcc.waitForCompletion(jobId);

+        long end = System.currentTimeMillis();

+        System.err.println(start + " " + end + " " + (end - start));

+        

+    /*   

 

-		/*

-		 * 

-		 * String s = "g:\\data\\results.txt" ;

-		 * 

-		 * filenames = new FileOutputStream(s); // filenames = new

-		 * FileInputStream("filename.txt");

-		 * 

-		 * BufferedWriter writer = new BufferedWriter(new

-		 * OutputStreamWriter(filenames)); writer.write((int) (end-start));

-		 * writer.close();

-		 */

+        String s = "g:\\data\\results.txt" ;

 

-	}

+        filenames = new FileOutputStream(s);

+        // filenames = new FileInputStream("filename.txt");

 

-	public static void init() throws Exception {

-		CCConfig ccConfig = new CCConfig();

-		ccConfig.clientNetIpAddress = "127.0.0.1";

-		ccConfig.clientNetPort = 39000;

-		ccConfig.clusterNetIpAddress = "127.0.0.1";

-		ccConfig.clusterNetPort = 39001;

-		ccConfig.profileDumpPeriod = -1;

-		File outDir = new File("target/ClusterController");

-		outDir.mkdirs();

-		File ccRoot = File.createTempFile(Tester.class.getName(), ".data",

-				outDir);

-		ccRoot.delete();

-		ccRoot.mkdir();

-		ccConfig.ccRoot = ccRoot.getAbsolutePath();

-		cc = new ClusterControllerService(ccConfig);

-		cc.start();

-		ccConfig.defaultMaxJobAttempts = 0;

+        BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(filenames));

+        writer.write((int) (end-start));

+        writer.close();*/

+        

+    }

 

-		NCConfig ncConfig1 = new NCConfig();

-		ncConfig1.ccHost = "localhost";

-		ncConfig1.ccPort = 39001;

-		ncConfig1.clusterNetIPAddress = "127.0.0.1";

-		ncConfig1.dataIPAddress = "127.0.0.1";

-		ncConfig1.nodeId = NC1_ID;

-		nc1 = new NodeControllerService(ncConfig1);

-		nc1.start();

+    public static void init() throws Exception {

+        CCConfig ccConfig = new CCConfig();

+        ccConfig.clientNetIpAddress = "127.0.0.1";

+        ccConfig.clientNetPort = 39000;

+        ccConfig.clusterNetIpAddress = "127.0.0.1";

+        ccConfig.clusterNetPort = 39001;

+        ccConfig.profileDumpPeriod = -1;

+        File outDir = new File("target/ClusterController");

+        outDir.mkdirs();

+        File ccRoot = File.createTempFile(Tester.class.getName(), ".data", outDir);

+        ccRoot.delete();

+        ccRoot.mkdir();

+        ccConfig.ccRoot = ccRoot.getAbsolutePath();

+        cc = new ClusterControllerService(ccConfig);

+        cc.start();

+        ccConfig.defaultMaxJobAttempts = 0;

 

-		NCConfig ncConfig2 = new NCConfig();

-		ncConfig2.ccHost = "localhost";

-		ncConfig2.ccPort = 39001;

-		ncConfig2.clusterNetIPAddress = "127.0.0.1";

-		ncConfig2.dataIPAddress = "127.0.0.1";

-		ncConfig2.nodeId = NC2_ID;

-		nc2 = new NodeControllerService(ncConfig2);

-		nc2.start();

+        NCConfig ncConfig1 = new NCConfig();

+        ncConfig1.ccHost = "localhost";

+        ncConfig1.ccPort = 39001;

+        ncConfig1.clusterNetIPAddress = "127.0.0.1";

+        ncConfig1.dataIPAddress = "127.0.0.1";

+        ncConfig1.nodeId = NC1_ID;

+        nc1 = new NodeControllerService(ncConfig1);

+        nc1.start();

 

-		NCConfig ncConfig3 = new NCConfig();

-		ncConfig3.ccHost = "localhost";

-		ncConfig3.ccPort = 39001;

-		ncConfig3.clusterNetIPAddress = "127.0.0.1";

-		ncConfig3.dataIPAddress = "127.0.0.1";

-		ncConfig3.nodeId = NC3_ID;

-		nc3 = new NodeControllerService(ncConfig3);

-		nc3.start();

+        NCConfig ncConfig2 = new NCConfig();

+        ncConfig2.ccHost = "localhost";

+        ncConfig2.ccPort = 39001;

+        ncConfig2.clusterNetIPAddress = "127.0.0.1";

+        ncConfig2.dataIPAddress = "127.0.0.1";

+        ncConfig2.nodeId = NC2_ID;

+        nc2 = new NodeControllerService(ncConfig2);

+        nc2.start();

+        

+        NCConfig ncConfig3 = new NCConfig();

+        ncConfig3.ccHost = "localhost";

+        ncConfig3.ccPort = 39001;

+        ncConfig3.clusterNetIPAddress = "127.0.0.1";

+        ncConfig3.dataIPAddress = "127.0.0.1";

+        ncConfig3.nodeId = NC3_ID;

+        nc3 = new NodeControllerService(ncConfig3);

+        nc3.start();

+        

+        NCConfig ncConfig4 = new NCConfig();

+        ncConfig4.ccHost = "localhost";

+        ncConfig4.ccPort = 39001;

+        ncConfig4.clusterNetIPAddress = "127.0.0.1";

+        ncConfig4.dataIPAddress = "127.0.0.1";

+        ncConfig4.nodeId = NC4_ID;

+        nc4 = new NodeControllerService(ncConfig4);

+        nc4.start();

 

-		NCConfig ncConfig4 = new NCConfig();

-		ncConfig4.ccHost = "localhost";

-		ncConfig4.ccPort = 39001;

-		ncConfig4.clusterNetIPAddress = "127.0.0.1";

-		ncConfig4.dataIPAddress = "127.0.0.1";

-		ncConfig4.nodeId = NC4_ID;

-		nc4 = new NodeControllerService(ncConfig4);

-		nc4.start();

+        hcc = new HyracksConnection(ccConfig.clientNetIpAddress, ccConfig.clientNetPort);

+        hcc.createApplication("test", null);

+        if (LOGGER.isLoggable(Level.INFO)) {

+            LOGGER.info("Starting CC in " + ccRoot.getAbsolutePath());

+        }

+    }

 

-		hcc = new HyracksConnection(ccConfig.clientNetIpAddress,

-				ccConfig.clientNetPort);

-		hcc.createApplication("test", null);

-		if (LOGGER.isLoggable(Level.INFO)) {

-			LOGGER.info("Starting CC in " + ccRoot.getAbsolutePath());

-		}

-	}

+    private static JobSpecification createJob(String filename, int k, int page_num, int type) throws HyracksDataException {

+        JobSpecification spec = new JobSpecification();

 

         //spec.setFrameSize(32768);

-        spec.setFrameSize(32768);

+        spec.setFrameSize(64);

 

-		// spec.setFrameSize(32768);

-		spec.setFrameSize(64);

+        FileScanDescriptor scan = new FileScanDescriptor(spec, k, filename);

+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, scan, NC1_ID, NC2_ID,NC3_ID,NC4_ID);

+        //PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, scan, NC1_ID);

 

-		FileScanDescriptor scan = new FileScanDescriptor(spec, k, filename);

-		PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, scan,

-				NC1_ID, NC2_ID, NC3_ID, NC4_ID);

-		// PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, scan,

-		// NC1_ID);

+        RecordDescriptor outputRec = new RecordDescriptor(new ISerializerDeserializer[] {null, ByteSerializerDeserializer.INSTANCE, ByteSerializerDeserializer.INSTANCE});

+                //Integer64SerializerDeserializer.INSTANCE, ByteSerializerDeserializer.INSTANCE,

+                //ByteSerializerDeserializer.INSTANCE });

 

        int[] keyFields = new int[] { 0 };

         int frameLimits = 4096;		// hyracks oriented

-        //int tableSize = 10485767;	// hyracks oriented

-        int tableSize = 2351137;	// hyracks oriented

+        int tableSize = 10485767;	// hyracks oriented

 

-		int[] keyFields = new int[] { 0 };

-		int frameLimits = 4096; // hyracks oriented

-		int tableSize = 10485767; // hyracks oriented

+        AbstractOperatorDescriptor single_grouper;

+        IConnectorDescriptor conn_partition;

+        AbstractOperatorDescriptor cross_grouper;

 

-		AbstractOperatorDescriptor single_grouper;

-		IConnectorDescriptor conn_partition;

-		AbstractOperatorDescriptor cross_grouper;

+        

+        if(0 == type){//external group by

+            single_grouper = new ExternalGroupOperatorDescriptor(spec, keyFields,

+                    frameLimits,

+                    new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(VLongPointable.FACTORY) },

+                    new VLongNormalizedKeyComputerFactory(), new MergeKmerAggregateFactory(),

+                    // new IntSumFieldAggregatorFactory(1, false) }),

 

-		if (0 == type) {// external group by

-			single_grouper = new ExternalGroupOperatorDescriptor(

-					spec,

-					keyFields,

-					frameLimits,

-					new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory

-							.of(VLongPointable.FACTORY) },

-					new VLongNormalizedKeyComputerFactory(),

-					new MergeKmerAggregateFactory(),

-					// new IntSumFieldAggregatorFactory(1, false) }),

+                    new DistributedMergeLmerAggregateFactory(),

+                    // new IntSumFieldAggregatorFactory(1, false) }),

+                    outputRec, new HashSpillableTableFactory(

+                            new FieldHashPartitionComputerFactory(keyFields,

+                                    new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory

+                                            .of(VLongPointable.FACTORY) }), tableSize), true);

+        

+            conn_partition = new MToNPartitioningConnectorDescriptor(spec,

+                    new KmerHashPartitioncomputerFactory());

+            cross_grouper = new ExternalGroupOperatorDescriptor(spec, keyFields,

+                    frameLimits,

+                    new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(VLongPointable.FACTORY) },

+                    new VLongNormalizedKeyComputerFactory(), new DistributedMergeLmerAggregateFactory(),

+                    // new IntSumFieldAggregatorFactory(1, false) }),

 

-					new DistributedMergeLmerAggregateFactory(),

-					// new IntSumFieldAggregatorFactory(1, false) }),

-					outputRec,

-					new HashSpillableTableFactory(

-							new FieldHashPartitionComputerFactory(

-									keyFields,

-									new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory

-											.of(VLongPointable.FACTORY) }),

-							tableSize), true);

+                    new DistributedMergeLmerAggregateFactory(),

+                    // new IntSumFieldAggregatorFactory(1, false) }),

+                    outputRec, new HashSpillableTableFactory(

+                            new FieldHashPartitionComputerFactory(keyFields,

+                                    new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory

+                                            .of(VLongPointable.FACTORY) }), tableSize), true);

+        }

+        else if( 1 == type){

+            single_grouper = new ExternalGroupOperatorDescriptor(spec, keyFields,

+                    frameLimits,

+                    new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(VLongPointable.FACTORY) },

+                    new VLongNormalizedKeyComputerFactory(), new MergeKmerAggregateFactory(),

+                    // new IntSumFieldAggregatorFactory(1, false) }),

+                    new DistributedMergeLmerAggregateFactory(),

+                    // new IntSumFieldAggregatorFactory(1, false) }),

+                    outputRec, new HashSpillableTableFactory(

+                            new FieldHashPartitionComputerFactory(keyFields,

+                                    new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory

+                                            .of(VLongPointable.FACTORY) }), tableSize), true);

+            conn_partition = new  MToNPartitioningMergingConnectorDescriptor(spec, new KmerHashPartitioncomputerFactory(), 

+                  keyFields, new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(VLongPointable.FACTORY)} );

+            cross_grouper = new PreclusteredGroupOperatorDescriptor(spec, keyFields, 

+                    new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(VLongPointable.FACTORY) },  

+                    new DistributedMergeLmerAggregateFactory(), 

+                    outputRec);

+        }

+        else{

+            long inputSizeInRawRecords = 154000000;

+            long inputSizeInUniqueKeys = 38500000;

+            int recordSizeInBytes = 4;

+            int hashfuncStartLevel = 1;

+            single_grouper = new HybridHashGroupOperatorDescriptor(spec, keyFields,

+                    frameLimits, inputSizeInRawRecords, inputSizeInUniqueKeys, recordSizeInBytes, tableSize,

+                    new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(VLongPointable.FACTORY) },

+                    new IBinaryHashFunctionFamily[] {new VLongBinaryHashFunctionFamily()},

+                    //new IBinaryHashFunctionFamily[] {MurmurHash3BinaryHashFunctionFamily.INSTANCE},

+                    hashfuncStartLevel, 

+                    new VLongNormalizedKeyComputerFactory(),

+                    new MergeKmerAggregateFactory(),

+                    new DistributedMergeLmerAggregateFactory(),

+                    outputRec, true);

+            conn_partition = new MToNPartitioningConnectorDescriptor(spec,

+                    new KmerHashPartitioncomputerFactory());

+            recordSizeInBytes = 13;

+            cross_grouper = new HybridHashGroupOperatorDescriptor(spec, keyFields,

+                    frameLimits, inputSizeInRawRecords, inputSizeInUniqueKeys, recordSizeInBytes, tableSize,

+                    new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(VLongPointable.FACTORY) },

+                    new IBinaryHashFunctionFamily[] {new VLongBinaryHashFunctionFamily()},

+                    //new IBinaryHashFunctionFamily[] {MurmurHash3BinaryHashFunctionFamily.INSTANCE},

+                    hashfuncStartLevel, 

+                    new VLongNormalizedKeyComputerFactory(),

+                    new DistributedMergeLmerAggregateFactory(),

+                    new DistributedMergeLmerAggregateFactory(),

+                    outputRec, true);            

+        }

+        

+        //PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, single_grouper, NC1_ID);

+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, single_grouper, NC1_ID, NC2_ID,NC3_ID,NC4_ID);

+        

+        IConnectorDescriptor readfileConn = new OneToOneConnectorDescriptor(spec);

+        spec.connect(readfileConn, scan, 0, single_grouper, 0);

+        

 

-			conn_partition = new MToNPartitioningConnectorDescriptor(spec,

-					new KmerHashPartitioncomputerFactory());

-			cross_grouper = new ExternalGroupOperatorDescriptor(

-					spec,

-					keyFields,

-					frameLimits,

-					new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory

-							.of(VLongPointable.FACTORY) },

-					new VLongNormalizedKeyComputerFactory(),

-					new DistributedMergeLmerAggregateFactory(),

-					// new IntSumFieldAggregatorFactory(1, false) }),

+        //PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, cross_grouper,NC1_ID);

+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, cross_grouper, NC1_ID, NC2_ID,NC3_ID,NC4_ID);

+        spec.connect(conn_partition, single_grouper, 0, cross_grouper, 0);

 

-					new DistributedMergeLmerAggregateFactory(),

-					// new IntSumFieldAggregatorFactory(1, false) }),

-					outputRec,

-					new HashSpillableTableFactory(

-							new FieldHashPartitionComputerFactory(

-									keyFields,

-									new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory

-											.of(VLongPointable.FACTORY) }),

-							tableSize), true);

-		} else if (1 == type) {

-			single_grouper = new ExternalGroupOperatorDescriptor(

-					spec,

-					keyFields,

-					frameLimits,

-					new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory

-							.of(VLongPointable.FACTORY) },

-					new VLongNormalizedKeyComputerFactory(),

-					new MergeKmerAggregateFactory(),

-					// new IntSumFieldAggregatorFactory(1, false) }),

-					new DistributedMergeLmerAggregateFactory(),

-					// new IntSumFieldAggregatorFactory(1, false) }),

-					outputRec,

-					new HashSpillableTableFactory(

-							new FieldHashPartitionComputerFactory(

-									keyFields,

-									new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory

-											.of(VLongPointable.FACTORY) }),

-							tableSize), true);

-			conn_partition = new MToNPartitioningMergingConnectorDescriptor(

-					spec,

-					new KmerHashPartitioncomputerFactory(),

-					keyFields,

-					new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory

-							.of(VLongPointable.FACTORY) });

-			cross_grouper = new PreclusteredGroupOperatorDescriptor(

-					spec,

-					keyFields,

-					new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory

-							.of(VLongPointable.FACTORY) },

-					new DistributedMergeLmerAggregateFactory(), outputRec);

-		} else {

-			long inputSizeInRawRecords = 154000000;

-			long inputSizeInUniqueKeys = 38500000;

-			int recordSizeInBytes = 4;

-			int hashfuncStartLevel = 1;

-			single_grouper = new HybridHashGroupOperatorDescriptor(

-					spec,

-					keyFields,

-					frameLimits,

-					inputSizeInRawRecords,

-					inputSizeInUniqueKeys,

-					recordSizeInBytes,

-					tableSize,

-					new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory

-							.of(VLongPointable.FACTORY) },

-					new IBinaryHashFunctionFamily[] { new VLongBinaryHashFunctionFamily() },

-					// new IBinaryHashFunctionFamily[]

-					// {MurmurHash3BinaryHashFunctionFamily.INSTANCE},

-					hashfuncStartLevel,

-					new VLongNormalizedKeyComputerFactory(),

-					new MergeKmerAggregateFactory(),

-					new DistributedMergeLmerAggregateFactory(), outputRec, true);

-			conn_partition = new MToNPartitioningConnectorDescriptor(spec,

-					new KmerHashPartitioncomputerFactory());

-			recordSizeInBytes = 13;

-			cross_grouper = new HybridHashGroupOperatorDescriptor(

-					spec,

-					keyFields,

-					frameLimits,

-					inputSizeInRawRecords,

-					inputSizeInUniqueKeys,

-					recordSizeInBytes,

-					tableSize,

-					new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory

-							.of(VLongPointable.FACTORY) },

-					new IBinaryHashFunctionFamily[] { new VLongBinaryHashFunctionFamily() },

-					// new IBinaryHashFunctionFamily[]

-					// {MurmurHash3BinaryHashFunctionFamily.INSTANCE},

-					hashfuncStartLevel,

-					new VLongNormalizedKeyComputerFactory(),

-					new DistributedMergeLmerAggregateFactory(),

-					new DistributedMergeLmerAggregateFactory(), outputRec, true);

-		}

+        //PrinterOperatorDescriptor printer = new PrinterOperatorDescriptor(spec, "G:\\data\\result");

+        PrinterOperatorDescriptor printer = new PrinterOperatorDescriptor(spec);

+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID, NC2_ID,NC3_ID,NC4_ID);

+        //PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);

 

-		// PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,

-		// single_grouper, NC1_ID);

-		PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,

-				single_grouper, NC1_ID, NC2_ID, NC3_ID, NC4_ID);

+        IConnectorDescriptor printConn = new OneToOneConnectorDescriptor(spec);

+        spec.connect(printConn, cross_grouper, 0, printer, 0);

+        //spec.connect(readfileConn, scan, 0, printer, 0);

 

-		IConnectorDescriptor readfileConn = new OneToOneConnectorDescriptor(

-				spec);

-		spec.connect(readfileConn, scan, 0, single_grouper, 0);

+        spec.addRoot(printer);

 

-		// PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,

-		// cross_grouper,NC1_ID);

-		PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,

-				cross_grouper, NC1_ID, NC2_ID, NC3_ID, NC4_ID);

-		spec.connect(conn_partition, single_grouper, 0, cross_grouper, 0);

+        if( 1 == type ){

+            spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());

+        }

+        // System.out.println(spec.toString());

+        return spec;

+    }

 

-		// PrinterOperatorDescriptor printer = new

-		// PrinterOperatorDescriptor(spec, "G:\\data\\result");

-		PrinterOperatorDescriptor printer = new PrinterOperatorDescriptor(spec);

-		PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,

-				NC1_ID, NC2_ID, NC3_ID, NC4_ID);

-		// PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,

-		// printer, NC1_ID);

 

-		IConnectorDescriptor printConn = new OneToOneConnectorDescriptor(spec);

-		spec.connect(printConn, cross_grouper, 0, printer, 0);

-		// spec.connect(readfileConn, scan, 0, printer, 0);

+    static class JoinComparatorFactory implements ITuplePairComparatorFactory {

+        private static final long serialVersionUID = 1L;

 

-		spec.addRoot(printer);

+        private final IBinaryComparatorFactory bFactory;

+        private final int pos0;

+        private final int pos1;

 

-		if (1 == type) {

-			spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());

-		}

-		// System.out.println(spec.toString());

-		return spec;

-	}

+        public JoinComparatorFactory(IBinaryComparatorFactory bFactory, int pos0, int pos1) {

+            this.bFactory = bFactory;

+            this.pos0 = pos0;

+            this.pos1 = pos1;

+        }

 

-	static class JoinComparatorFactory implements ITuplePairComparatorFactory {

-		private static final long serialVersionUID = 1L;

+        @Override

+        public ITuplePairComparator createTuplePairComparator(IHyracksTaskContext ctx) {

+            return new JoinComparator(bFactory.createBinaryComparator(), pos0, pos1);

+        }

+    }

 

-		private final IBinaryComparatorFactory bFactory;

-		private final int pos0;

-		private final int pos1;

+    static class JoinComparator implements ITuplePairComparator {

 

-		public JoinComparatorFactory(IBinaryComparatorFactory bFactory,

-				int pos0, int pos1) {

-			this.bFactory = bFactory;

-			this.pos0 = pos0;

-			this.pos1 = pos1;

-		}

+        private final IBinaryComparator bComparator;

+        private final int field0;

+        private final int field1;

 

-		@Override

-		public ITuplePairComparator createTuplePairComparator(

-				IHyracksTaskContext ctx) {

-			return new JoinComparator(bFactory.createBinaryComparator(), pos0,

-					pos1);

-		}

-	}

+        public JoinComparator(IBinaryComparator bComparator, int field0, int field1) {

+            this.bComparator = bComparator;

+            this.field0 = field0;

+            this.field1 = field1;

+        }

 

-	static class JoinComparator implements ITuplePairComparator {

+        @Override

+        public int compare(IFrameTupleAccessor accessor0, int tIndex0, IFrameTupleAccessor accessor1, int tIndex1) {

+            int tStart0 = accessor0.getTupleStartOffset(tIndex0);

+            int fStartOffset0 = accessor0.getFieldSlotsLength() + tStart0;

 

-		private final IBinaryComparator bComparator;

-		private final int field0;

-		private final int field1;

+            int tStart1 = accessor1.getTupleStartOffset(tIndex1);

+            int fStartOffset1 = accessor1.getFieldSlotsLength() + tStart1;

 

-		public JoinComparator(IBinaryComparator bComparator, int field0,

-				int field1) {

-			this.bComparator = bComparator;

-			this.field0 = field0;

-			this.field1 = field1;

-		}

+            int fStart0 = accessor0.getFieldStartOffset(tIndex0, field0);

+            int fEnd0 = accessor0.getFieldEndOffset(tIndex0, field0);

+            int fLen0 = fEnd0 - fStart0;

 

-		@Override

-		public int compare(IFrameTupleAccessor accessor0, int tIndex0,

-				IFrameTupleAccessor accessor1, int tIndex1) {

-			int tStart0 = accessor0.getTupleStartOffset(tIndex0);

-			int fStartOffset0 = accessor0.getFieldSlotsLength() + tStart0;

+            int fStart1 = accessor1.getFieldStartOffset(tIndex1, field1);

+            int fEnd1 = accessor1.getFieldEndOffset(tIndex1, field1);

+            int fLen1 = fEnd1 - fStart1;

 

-			int tStart1 = accessor1.getTupleStartOffset(tIndex1);

-			int fStartOffset1 = accessor1.getFieldSlotsLength() + tStart1;

-

-			int fStart0 = accessor0.getFieldStartOffset(tIndex0, field0);

-			int fEnd0 = accessor0.getFieldEndOffset(tIndex0, field0);

-			int fLen0 = fEnd0 - fStart0;

-

-			int fStart1 = accessor1.getFieldStartOffset(tIndex1, field1);

-			int fEnd1 = accessor1.getFieldEndOffset(tIndex1, field1);

-			int fLen1 = fEnd1 - fStart1;

-

-			int c = bComparator.compare(accessor0.getBuffer().array(), fStart0

-					+ fStartOffset0, fLen0, accessor1.getBuffer().array(),

-					fStart1 + fStartOffset1, fLen1);

-			if (c != 0) {

-				return c;

-			}

-			return 0;

-		}

-	}

+            int c = bComparator.compare(accessor0.getBuffer().array(), fStart0 + fStartOffset0, fLen0, accessor1

+                    .getBuffer().array(), fStart1 + fStartOffset1, fLen1);

+            if (c != 0) {

+                return c;

+            }

+            return 0;

+        }

+    }

 

 }