git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_genomix@2713 123451ca-8445-de46-9d55-352943316053
diff --git a/genomix/src/main/java/edu/uci/ics/genomix/dataflow/Tester.java b/genomix/src/main/java/edu/uci/ics/genomix/dataflow/Tester.java
index eeeecea..23822a5 100644
--- a/genomix/src/main/java/edu/uci/ics/genomix/dataflow/Tester.java
+++ b/genomix/src/main/java/edu/uci/ics/genomix/dataflow/Tester.java
@@ -1,6 +1,13 @@
 package edu.uci.ics.genomix.dataflow;

 

+import java.io.BufferedReader;

+import java.io.BufferedWriter;

 import java.io.File;

+import java.io.FileInputStream;

+import java.io.FileOutputStream;

+import java.io.InputStream;

+import java.io.InputStreamReader;

+import java.io.OutputStreamWriter;

 import java.util.logging.Level;

 import java.util.logging.Logger;

 

@@ -18,11 +25,16 @@
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;

 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;

 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;

+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFamily;

+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;

 import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;

 import edu.uci.ics.hyracks.api.dataflow.value.ITuplePairComparator;

 import edu.uci.ics.hyracks.api.dataflow.value.ITuplePairComparatorFactory;

+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;

 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;

 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;

+import edu.uci.ics.hyracks.api.job.IConnectorDescriptorRegistry;

+import edu.uci.ics.hyracks.api.job.IOperatorDescriptorRegistry;

 import edu.uci.ics.hyracks.api.job.JobId;

 import edu.uci.ics.hyracks.api.job.JobSpecification;

 import edu.uci.ics.hyracks.control.cc.ClusterControllerService;

@@ -31,14 +43,21 @@
 import edu.uci.ics.hyracks.control.nc.NodeControllerService;

 import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryComparatorFactory;

 import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryHashFunctionFactory;

+import edu.uci.ics.hyracks.data.std.accessors.UTF8StringBinaryHashFunctionFamily;

 import edu.uci.ics.hyracks.data.std.primitive.LongPointable;

+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;

 import edu.uci.ics.hyracks.dataflow.common.data.marshalling.Integer64SerializerDeserializer;

 import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;

 import edu.uci.ics.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;

+import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;

 import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningConnectorDescriptor;

+import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningMergingConnectorDescriptor;

 import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;

 import edu.uci.ics.hyracks.dataflow.std.group.HashSpillableTableFactory;

+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;

 import edu.uci.ics.hyracks.dataflow.std.group.external.ExternalGroupOperatorDescriptor;

+import edu.uci.ics.hyracks.dataflow.std.group.hybridhash.HybridHashGroupOperatorDescriptor;

+import edu.uci.ics.hyracks.dataflow.std.group.preclustered.PreclusteredGroupOperatorDescriptor;

 import edu.uci.ics.hyracks.dataflow.std.misc.PrinterOperatorDescriptor;

 

 public class Tester {

@@ -46,10 +65,14 @@
     private static final Logger LOGGER = Logger.getLogger(Tester.class.getName());

     public static final String NC1_ID = "nc1";

     public static final String NC2_ID = "nc2";

+    public static final String NC3_ID = "nc3";

+    public static final String NC4_ID = "nc4";

 

     private static ClusterControllerService cc;

     private static NodeControllerService nc1;

     private static NodeControllerService nc2;

+    private static NodeControllerService nc3;

+    private static NodeControllerService nc4;

     private static IHyracksClientConnection hcc;

 

     //private static final boolean DEBUG = true;

@@ -78,29 +101,28 @@
         String file_name = args[0];

         k = Integer.parseInt(args[1]);

         page_num = Integer.parseInt(args[2]);
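
+        // args[3] chooses the grouping plan: 0 = external group-by, 1 = sort-merge with preclustered group-by, 2 = hybrid-hash group-by.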

+        int type = Integer.parseInt(args[3]);

 

-        JobSpecification job = createJob(file_name, k, page_num);

+        JobSpecification job = createJob(file_name, k, page_num, type);

 

         long start = System.currentTimeMillis();

         JobId jobId = hcc.startJob("test", job);

         hcc.waitForCompletion(jobId);

         long end = System.currentTimeMillis();

         System.err.println(start + " " + end + " " + (end - start));

-    }

+
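
+        // Write the elapsed job time (ms) to a local results file (overwritten on each run).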

+        FileOutputStream filenames;

 

-    //    private static FileSplit[] parseFileSplits(String fileSplits) {

-    //        String[] splits = fileSplits.split(",");

-    //        FileSplit[] fSplits = new FileSplit[splits.length];

-    //        for (int i = 0; i < splits.length; ++i) {

-    //            String s = splits[i].trim();

-    //            int idx = s.indexOf(':');

-    //            if (idx < 0) {

-    //                throw new IllegalArgumentException("File split " + s + " not well formed");

-    //            }

-    //            fSplits[i] = new FileSplit(s.substring(0, idx), new FileReference(new File(s.substring(idx + 1))));

-    //        }

-    //        return fSplits;

-    //    }

+        String s = "g:\\data\\results.txt";

+

+        filenames = new FileOutputStream(s);

+        // filenames = new FileInputStream("filename.txt");

+

+        BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(filenames));

+        writer.write(String.valueOf(end - start));

+        writer.close();

+

+    }

 

     public static void init() throws Exception {

         CCConfig ccConfig = new CCConfig();

@@ -135,6 +157,24 @@
         ncConfig2.nodeId = NC2_ID;

         nc2 = new NodeControllerService(ncConfig2);

         nc2.start();

+
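
+        // Two more in-process node controllers (nc3, nc4) so the job can run on four partitions.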

+        NCConfig ncConfig3 = new NCConfig();

+        ncConfig3.ccHost = "localhost";

+        ncConfig3.ccPort = 39001;

+        ncConfig3.clusterNetIPAddress = "127.0.0.1";

+        ncConfig3.dataIPAddress = "127.0.0.1";

+        ncConfig3.nodeId = NC3_ID;

+        nc3 = new NodeControllerService(ncConfig3);

+        nc3.start();

+

+        NCConfig ncConfig4 = new NCConfig();

+        ncConfig4.ccHost = "localhost";

+        ncConfig4.ccPort = 39001;

+        ncConfig4.clusterNetIPAddress = "127.0.0.1";

+        ncConfig4.dataIPAddress = "127.0.0.1";

+        ncConfig4.nodeId = NC4_ID;

+        nc4 = new NodeControllerService(ncConfig4);

+        nc4.start();

 

         hcc = new HyracksConnection(ccConfig.clientNetIpAddress, ccConfig.clientNetPort);

         hcc.createApplication("test", null);

@@ -143,263 +183,132 @@
         }

     }

 

-    private static JobSpecification createJob(String filename, int k, int page_num) throws HyracksDataException {

+    private static JobSpecification createJob(String filename, int k, int page_num, int type) throws HyracksDataException {

         JobSpecification spec = new JobSpecification();

 

-        /*

-         * IFileSplitProvider custSplitsProvider = new

-         * ConstantFileSplitProvider(customerSplits);

-         * //ConstantFileSplitProvider is the operator to provide the tuples

-         * RecordDescriptor custDesc = new RecordDescriptor(new

-         * ISerializerDeserializer[] {

-         * UTF8StringSerializerDeserializer.INSTANCE,

-         * UTF8StringSerializerDeserializer.INSTANCE,

-         * UTF8StringSerializerDeserializer.INSTANCE,

-         * UTF8StringSerializerDeserializer.INSTANCE,

-         * UTF8StringSerializerDeserializer.INSTANCE,

-         * UTF8StringSerializerDeserializer.INSTANCE,

-         * UTF8StringSerializerDeserializer.INSTANCE,

-         * UTF8StringSerializerDeserializer.INSTANCE });

-         * 

-         * IFileSplitProvider ordersSplitsProvider = new

-         * ConstantFileSplitProvider(orderSplits); RecordDescriptor ordersDesc =

-         * new RecordDescriptor(new ISerializerDeserializer[] {

-         * UTF8StringSerializerDeserializer.INSTANCE,

-         * UTF8StringSerializerDeserializer.INSTANCE,

-         * UTF8StringSerializerDeserializer.INSTANCE,

-         * UTF8StringSerializerDeserializer.INSTANCE,

-         * UTF8StringSerializerDeserializer.INSTANCE,

-         * UTF8StringSerializerDeserializer.INSTANCE,

-         * UTF8StringSerializerDeserializer.INSTANCE,

-         * UTF8StringSerializerDeserializer.INSTANCE,

-         * UTF8StringSerializerDeserializer.INSTANCE });

-         * 

-         * RecordDescriptor custOrderJoinDesc = new RecordDescriptor(new

-         * ISerializerDeserializer[] {

-         * UTF8StringSerializerDeserializer.INSTANCE,

-         * UTF8StringSerializerDeserializer.INSTANCE,

-         * UTF8StringSerializerDeserializer.INSTANCE,

-         * UTF8StringSerializerDeserializer.INSTANCE,

-         * UTF8StringSerializerDeserializer.INSTANCE,

-         * UTF8StringSerializerDeserializer.INSTANCE,

-         * UTF8StringSerializerDeserializer.INSTANCE,

-         * UTF8StringSerializerDeserializer.INSTANCE,

-         * UTF8StringSerializerDeserializer.INSTANCE,

-         * UTF8StringSerializerDeserializer.INSTANCE,

-         * UTF8StringSerializerDeserializer.INSTANCE,

-         * UTF8StringSerializerDeserializer.INSTANCE,

-         * UTF8StringSerializerDeserializer.INSTANCE,

-         * UTF8StringSerializerDeserializer.INSTANCE,

-         * UTF8StringSerializerDeserializer.INSTANCE,

-         * UTF8StringSerializerDeserializer.INSTANCE,

-         * UTF8StringSerializerDeserializer.INSTANCE });

-         * 

-         * FileScanOperatorDescriptor ordScanner = new

-         * FileScanOperatorDescriptor(spec, ordersSplitsProvider, new

-         * DelimitedDataTupleParserFactory(new IValueParserFactory[] {

-         * UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,

-         * UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,

-         * UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,

-         * UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,

-         * UTF8StringParserFactory.INSTANCE }, '|'), ordersDesc);

-         * createPartitionConstraint(spec, ordScanner, orderSplits);

-         * 

-         * FileScanOperatorDescriptor custScanner = new

-         * FileScanOperatorDescriptor(spec, custSplitsProvider, new

-         * DelimitedDataTupleParserFactory(new IValueParserFactory[] {

-         * UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,

-         * UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,

-         * UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,

-         * UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE },

-         * '|'), custDesc); createPartitionConstraint(spec, custScanner,

-         * customerSplits);

-         * 

-         * IOperatorDescriptor join;

-         * 

-         * if ("nestedloop".equalsIgnoreCase(algo)) { join = new

-         * NestedLoopJoinOperatorDescriptor(spec, new JoinComparatorFactory(

-         * PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), 0,

-         * 1), custOrderJoinDesc, memSize);

-         * 

-         * } else if ("gracehash".equalsIgnoreCase(algo)) { join = new

-         * GraceHashJoinOperatorDescriptor( spec, memSize, graceInputSize,

-         * graceRecordsPerFrame, graceFactor, new int[] { 0 }, new int[] { 1 },

-         * new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory

-         * .of(UTF8StringPointable.FACTORY) }, new IBinaryComparatorFactory[] {

-         * PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },

-         * custOrderJoinDesc);

-         * 

-         * } else if ("hybridhash".equalsIgnoreCase(algo)) { join = new

-         * HybridHashJoinOperatorDescriptor( spec, memSize, graceInputSize,

-         * graceRecordsPerFrame, graceFactor, new int[] { 0 }, new int[] { 1 },

-         * new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory

-         * .of(UTF8StringPointable.FACTORY) }, new IBinaryComparatorFactory[] {

-         * PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },

-         * custOrderJoinDesc);

-         * 

-         * } else { join = new InMemoryHashJoinOperatorDescriptor( spec, new

-         * int[] { 0 }, new int[] { 1 }, new IBinaryHashFunctionFactory[] {

-         * PointableBinaryHashFunctionFactory .of(UTF8StringPointable.FACTORY)

-         * }, new IBinaryComparatorFactory[] {

-         * PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },

-         * custOrderJoinDesc, 6000000); }

-         * 

-         * PartitionConstraintHelper.addPartitionCountConstraint(spec, join,

-         * numJoinPartitions);

-         * 

-         * IConnectorDescriptor ordJoinConn = new

-         * MToNPartitioningConnectorDescriptor(spec, new

-         * FieldHashPartitionComputerFactory(new int[] { 1 }, new

-         * IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory

-         * .of(UTF8StringPointable.FACTORY) })); spec.connect(ordJoinConn,

-         * ordScanner, 0, join, 1);

-         * 

-         * IConnectorDescriptor custJoinConn = new

-         * MToNPartitioningConnectorDescriptor(spec, new

-         * FieldHashPartitionComputerFactory(new int[] { 0 }, new

-         * IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory

-         * .of(UTF8StringPointable.FACTORY) })); spec.connect(custJoinConn,

-         * custScanner, 0, join, 0);

-         * 

-         * IOperatorDescriptor endingOp = join;

-         * 

-         * if (hasGroupBy) {

-         * 

-         * RecordDescriptor groupResultDesc = new RecordDescriptor(new

-         * ISerializerDeserializer[] {

-         * UTF8StringSerializerDeserializer.INSTANCE,

-         * IntegerSerializerDeserializer.INSTANCE });

-         * 

-         * HashGroupOperatorDescriptor gby = new HashGroupOperatorDescriptor(

-         * spec, new int[] { 6 }, new FieldHashPartitionComputerFactory(new

-         * int[] { 6 }, new IBinaryHashFunctionFactory[] {

-         * PointableBinaryHashFunctionFactory .of(UTF8StringPointable.FACTORY)

-         * }), new IBinaryComparatorFactory[] {

-         * PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },

-         * new MultiFieldsAggregatorFactory( new

-         * IFieldAggregateDescriptorFactory[] { new

-         * CountFieldAggregatorFactory(true) }), groupResultDesc, 16);

-         * createPartitionConstraint(spec, gby, resultSplits);

-         * 

-         * IConnectorDescriptor joinGroupConn = new

-         * MToNPartitioningConnectorDescriptor(spec, new

-         * FieldHashPartitionComputerFactory(new int[] { 6 }, new

-         * IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory

-         * .of(UTF8StringPointable.FACTORY) })); spec.connect(joinGroupConn,

-         * join, 0, gby, 0);

-         * 

-         * endingOp = gby; }

-         */

-

-        // IOperatorDescriptor printer = DEBUG ? new

-        // PrinterOperatorDescriptor(spec)

-        // : new NullSinkOperatorDescriptor(spec);

-        // PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,

-        // printer, NC1_ID);

-

-        spec.setFrameSize(128);
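
+        // Use 32 KB frames; the previous 128-byte size was presumably a debug setting.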

+        spec.setFrameSize(32768);

 

         FileScanDescriptor scan = new FileScanDescriptor(spec, k);

-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, scan, NC1_ID, NC2_ID);

+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, scan, NC1_ID, NC2_ID, NC3_ID, NC4_ID);

+        //PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, scan, NC1_ID);

 

         RecordDescriptor outputRec = new RecordDescriptor(new ISerializerDeserializer[] {

                 Integer64SerializerDeserializer.INSTANCE, ByteSerializerDeserializer.INSTANCE,

                 IntegerSerializerDeserializer.INSTANCE });

 

         int[] keyFields = new int[] { 0 };

-        int frameLimits = 10;

-        int tableSize = 128;

+        int frameLimits = 4096;

+        int tableSize = 10485767;

 

-        ExternalGroupOperatorDescriptor single_grouper = new ExternalGroupOperatorDescriptor(spec, keyFields,

-                frameLimits,

-                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(LongPointable.FACTORY) },

-                new Integer64NormalizedKeyComputerFactory(), new MergeKmerAggregateFactory(),

-                // new IntSumFieldAggregatorFactory(1, false) }),
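
+        // The plan: a local pre-aggregator, a repartitioning connector, and a global aggregator; 'type' picks the implementations.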

+        AbstractOperatorDescriptor single_grouper;

+        IConnectorDescriptor conn_partition;

+        AbstractOperatorDescriptor cross_grouper;

+

 

-                new MergeKmerAggregateFactory(),

-                // new IntSumFieldAggregatorFactory(1, false) }),

-                outputRec, new HashSpillableTableFactory(

-                        new FieldHashPartitionComputerFactory(keyFields,

-                                new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory

-                                        .of(LongPointable.FACTORY) }), tableSize), true);

-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, single_grouper, NC1_ID, NC2_ID);

+

+        if (0 == type) { // external group-by

+            single_grouper = new ExternalGroupOperatorDescriptor(spec, keyFields,

+                    frameLimits,

+                    new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(LongPointable.FACTORY) },

+                    new Integer64NormalizedKeyComputerFactory(), new MergeKmerAggregateFactory(),

+                    // new IntSumFieldAggregatorFactory(1, false) }),

 

+                    new DistributedMergeLmerAggregateFactory(),

+                    // new IntSumFieldAggregatorFactory(1, false) }),

+                    outputRec, new HashSpillableTableFactory(

+                            new FieldHashPartitionComputerFactory(keyFields,

+                                    new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory

+                                            .of(LongPointable.FACTORY) }), tableSize), true);

+

+            conn_partition = new MToNPartitioningConnectorDescriptor(spec,

+                    new KmerHashPartitioncomputerFactory());

+            cross_grouper = new ExternalGroupOperatorDescriptor(spec, keyFields,

+                    frameLimits,

+                    new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(LongPointable.FACTORY) },

+                    new Integer64NormalizedKeyComputerFactory(), new DistributedMergeLmerAggregateFactory(),

+                    // new IntSumFieldAggregatorFactory(1, false) }),

+

+                    new DistributedMergeLmerAggregateFactory(),

+                    // new IntSumFieldAggregatorFactory(1, false) }),

+                    outputRec, new HashSpillableTableFactory(

+                            new FieldHashPartitionComputerFactory(keyFields,

+                                    new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory

+                                            .of(LongPointable.FACTORY) }), tableSize), true);

+        }

+        else if (1 == type) { // sort-merge with preclustered group-by

+            single_grouper = new ExternalGroupOperatorDescriptor(spec, keyFields,

+                    frameLimits,

+                    new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(LongPointable.FACTORY) },

+                    new Integer64NormalizedKeyComputerFactory(), new MergeKmerAggregateFactory(),

+                    // new IntSumFieldAggregatorFactory(1, false) }),

+

+                    new DistributedMergeLmerAggregateFactory(),

+                    // new IntSumFieldAggregatorFactory(1, false) }),

+                    outputRec, new HashSpillableTableFactory(

+                            new FieldHashPartitionComputerFactory(keyFields,

+                                    new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory

+                                            .of(LongPointable.FACTORY) }), tableSize), true);
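
+            // The merging connector keeps the repartitioned stream sorted on the key, so the global step can be a single-pass preclustered group-by.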

+            conn_partition = new MToNPartitioningMergingConnectorDescriptor(spec, new KmerHashPartitioncomputerFactory(),

+                    keyFields, new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(LongPointable.FACTORY) });

+            cross_grouper = new PreclusteredGroupOperatorDescriptor(spec, keyFields,

+                    new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(LongPointable.FACTORY) },

+                    new DistributedMergeLmerAggregateFactory(), outputRec);

+        }

+        else { // hybrid-hash group-by
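
+            // Input-size hints for the hybrid-hash grouper (rough estimates, not measured statistics).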

+            long inputSizeInRawRecords = 32768;

+            long inputSizeInUniqueKeys = 32768;

+            int recordSizeInBytes = 9;

+            int hashfuncStartLevel = 1;

+            single_grouper = new HybridHashGroupOperatorDescriptor(spec, keyFields,

+                    frameLimits, inputSizeInRawRecords, inputSizeInUniqueKeys, recordSizeInBytes, tableSize,

+                    new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(LongPointable.FACTORY) },

+                    new IBinaryHashFunctionFamily[] { UTF8StringBinaryHashFunctionFamily.INSTANCE }, 

+                    hashfuncStartLevel, 

+                    new Integer64NormalizedKeyComputerFactory(),

+                    new DistributedMergeLmerAggregateFactory(),

+                    new DistributedMergeLmerAggregateFactory(),

+                    outputRec);

+            conn_partition = new MToNPartitioningConnectorDescriptor(spec,

+                    new KmerHashPartitioncomputerFactory());

+            recordSizeInBytes = 13;

+            cross_grouper = new HybridHashGroupOperatorDescriptor(spec, keyFields,

+                    frameLimits, inputSizeInRawRecords, inputSizeInUniqueKeys, recordSizeInBytes, tableSize,

+                    new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(LongPointable.FACTORY) },

+                    new IBinaryHashFunctionFamily[] { UTF8StringBinaryHashFunctionFamily.INSTANCE }, 

+                    hashfuncStartLevel, 

+                    new Integer64NormalizedKeyComputerFactory(),

+                    new DistributedMergeLmerAggregateFactory(),

+                    new DistributedMergeLmerAggregateFactory(),

+                    outputRec);

+

+        }

+

+        //PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, single_grouper, NC1_ID);

+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, single_grouper, NC1_ID, NC2_ID, NC3_ID, NC4_ID);

+

         IConnectorDescriptor readfileConn = new OneToOneConnectorDescriptor(spec);

         spec.connect(readfileConn, scan, 0, single_grouper, 0);

 

-        IConnectorDescriptor conn_partition = new MToNPartitioningConnectorDescriptor(spec,

-                new KmerHashPartitioncomputerFactory());

-

-        ExternalGroupOperatorDescriptor cross_grouper = new ExternalGroupOperatorDescriptor(spec, keyFields,

-                frameLimits,

-                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(LongPointable.FACTORY) },

-                new Integer64NormalizedKeyComputerFactory(), new DistributedMergeLmerAggregateFactory(),

-                // new IntSumFieldAggregatorFactory(1, false) }),

-

-                new DistributedMergeLmerAggregateFactory(),

-                // new IntSumFieldAggregatorFactory(1, false) }),

-                outputRec, new HashSpillableTableFactory(

-                        new FieldHashPartitionComputerFactory(keyFields,

-                                new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory

-                                        .of(LongPointable.FACTORY) }), tableSize), true);

-

-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, cross_grouper, NC1_ID, NC2_ID);

+        //PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, cross_grouper,NC1_ID);

+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, cross_grouper, NC1_ID, NC2_ID, NC3_ID, NC4_ID);

         spec.connect(conn_partition, single_grouper, 0, cross_grouper, 0);

 

         PrinterOperatorDescriptor printer = new PrinterOperatorDescriptor(spec);

-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID, NC2_ID);

+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID, NC2_ID, NC3_ID, NC4_ID);

+        //PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);

 

         IConnectorDescriptor printConn = new OneToOneConnectorDescriptor(spec);

         spec.connect(printConn, cross_grouper, 0, printer, 0);

 

-        //spec.connect(printConn, cross_grouper, 0, printer, 0);

-

-        /*

-         * GenKmerDescriptor kmerGen = new GenKmerDescriptor(spec, page_num, k);

-         * PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,

-         * kmerGen, NC1_ID);

-         * 

-         * IConnectorDescriptor readfileConn = new

-         * OneToOneConnectorDescriptor(spec); spec.connect(readfileConn, scan,

-         * 0, kmerGen, 0);

-         * 

-         * PrinterOperatorDescriptor printer = new

-         * PrinterOperatorDescriptor(spec);

-         * PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,

-         * printer, NC1_ID);

-         * 

-         * IConnectorDescriptor printConn = new

-         * OneToOneConnectorDescriptor(spec); //spec.connect(printConn, scan, 0,

-         * printer, 0);

-         * 

-         * //IConnectorDescriptor printConn = new

-         * OneToOneConnectorDescriptor(spec); spec.connect(printConn, kmerGen,

-         * 0, printer, 0);

-         * 

-         * //IFileSplitProvider outSplitProvider = new

-         * ConstantFileSplitProvider(resultSplits);

-         * //FrameFileWriterOperatorDescriptor writer = new

-         * FrameFileWriterOperatorDescriptor(spec, outSplitProvider);

-         * //createPartitionConstraint(spec, writer, resultSplits);

-         * 

-         * //IConnectorDescriptor endingPrinterConn = new

-         * OneToOneConnectorDescriptor(spec); //spec.connect(endingPrinterConn,

-         * endingOp, 0, writer, 0);

-         */

-

         spec.addRoot(printer);

 

+        if (1 == type) {
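
+            // The sort-merge plan's merging connector needs an explicit connector policy, likely to avoid pipeline deadlock.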

+            spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());

+        }

         // System.out.println(spec.toString());

         return spec;

     }

 

-    //    private static void createPartitionConstraint(JobSpecification spec, IOperatorDescriptor op, FileSplit[] splits) {

-    //        String[] parts = new String[splits.length];

-    //        for (int i = 0; i < splits.length; ++i) {

-    //            parts[i] = splits[i].getNodeName();

-    //        }

-    //        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, op, parts);

-    //    }

 

     static class JoinComparatorFactory implements ITuplePairComparatorFactory {

         private static final long serialVersionUID = 1L;

diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializingPipelinedPartition.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializingPipelinedPartition.java
index 62320c5..1cd98fb 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializingPipelinedPartition.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializingPipelinedPartition.java
@@ -139,7 +139,7 @@
         if (LOGGER.isLoggable(Level.INFO)) {
             LOGGER.info("open(" + pid + " by " + taId);
         }
-        fRef = manager.getFileFactory().createUnmanagedWorkspaceFile(pid.toString());
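+        // ':' is not a legal character in Windows file names; sanitize so the workspace file can be created there.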
+        fRef = manager.getFileFactory().createUnmanagedWorkspaceFile(pid.toString().replace(':', '_'));
         handle = ctx.getIOManager().open(fRef, IIOManager.FileReadWriteMode.READ_WRITE,
                 IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC);
         size = 0;
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/PrinterOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/PrinterOperatorDescriptor.java
index 47a8616..0af2dd8 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/PrinterOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/PrinterOperatorDescriptor.java
@@ -48,10 +48,10 @@
         @Override
         public void writeData(Object[] data) throws HyracksDataException {
             for (int i = 0; i < data.length; ++i) {
-                System.err.print(StringSerializationUtils.toString(data[i]));
-                System.err.print(", ");
+            //    System.err.print(StringSerializationUtils.toString(data[i]));
+            //    System.err.print(", ");
             }
-            System.err.println();
+            //System.err.println();
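+            // Output muted; the printer now just drains its input (presumably to keep benchmark timings free of console I/O).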
         }
 
         @Override