fix HashSpillableTableFactory: the writer is opened twice

git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_genomix@2721 123451ca-8445-de46-9d55-352943316053
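
A minimal sketch of the bug, with hypothetical names rather than the exact Hyracks call sites: the group-by operator already opens its output writer before asking the spillable table to flush, so the extra open() inside HashSpillableTableFactory opened the same writer a second time.

    // assumed caller flow (e.g. the external group-by operator)
    writer.open();                       // first open, performed by the operator
    spillableTable.flushFrames(writer);  // the flush path also called writer.open()
    writer.close();

The patch below removes the second open() from the flush path in HashSpillableTableFactory, leaving the single open() performed by the caller.
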
diff --git a/genomix/src/main/java/edu/uci/ics/genomix/core/jobgen/clusterconfig/ClusterConfig.java b/genomix/src/main/java/edu/uci/ics/genomix/core/jobgen/clusterconfig/ClusterConfig.java
deleted file mode 100644
index 50060d2..0000000
--- a/genomix/src/main/java/edu/uci/ics/genomix/core/jobgen/clusterconfig/ClusterConfig.java
+++ /dev/null
@@ -1,207 +0,0 @@
-package edu.uci.ics.genomix.core.jobgen.clusterconfig;

-

-import java.io.File;

-import java.io.FileInputStream;

-import java.io.IOException;

-import java.net.InetAddress;

-import java.util.ArrayList;

-import java.util.Arrays;

-import java.util.Collections;

-import java.util.HashMap;

-import java.util.List;

-import java.util.Map;

-import java.util.Properties;

-import java.util.Random;

-

-import org.apache.hadoop.mapreduce.InputSplit;

-

-import edu.uci.ics.hyracks.api.client.HyracksConnection;

-import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;

-import edu.uci.ics.hyracks.api.client.NodeControllerInfo;

-import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;

-import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;

-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;

-import edu.uci.ics.hyracks.api.exceptions.HyracksException;

-import edu.uci.ics.hyracks.api.job.JobSpecification;

-import edu.uci.ics.hyracks.dataflow.std.file.ConstantFileSplitProvider;

-import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;

-import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;

-

-public class ClusterConfig {

-

-    private static String[] NCs;

-    private static String storePropertiesPath = "conf/stores.properties";

-    private static String clusterPropertiesPath = "conf/cluster.properties";

-    private static Properties clusterProperties = new Properties();

-    private static Map<String, List<String>> ipToNcMapping;

-    private static String[] stores;

-

-    /**

-     * let tests set config path to be whatever

-     * 

-     * @param propertiesPath

-     *            stores properties file path

-     */

-    public static void setStorePath(String storePropertiesPath) throws HyracksException {

-        ClusterConfig.storePropertiesPath = storePropertiesPath;

-    }

-

-    public static void setClusterPropertiesPath(String clusterPropertiesPath) throws HyracksException {

-        ClusterConfig.clusterPropertiesPath = clusterPropertiesPath;

-    }

-

-    /**

-     * get NC names running on one IP address

-     * 

-     * @param ipAddress

-     * @return

-     * @throws HyracksDataException

-     */

-    public static List<String> getNCNames(String ipAddress) throws HyracksException {

-        return ipToNcMapping.get(ipAddress);

-    }

-

-    /**

-     * get file split provider

-     * 

-     * @param jobId

-     * @return

-     * @throws HyracksDataException

-     */

-    public static IFileSplitProvider getFileSplitProvider(String jobId, String indexName) throws HyracksException {

-        FileSplit[] fileSplits = new FileSplit[stores.length * NCs.length];

-        int i = 0;

-        for (String nc : NCs) {

-            for (String st : stores) {

-                FileSplit split = new FileSplit(nc, st + File.separator + nc + "-data" + File.separator + jobId

-                        + File.separator + indexName);

-                fileSplits[i++] = split;

-            }

-        }

-        return new ConstantFileSplitProvider(fileSplits);

-    }

-

-    private static void loadStores() throws HyracksException {

-        Properties properties = new Properties();

-        try {

-            properties.load(new FileInputStream(storePropertiesPath));

-        } catch (IOException e) {

-            throw new HyracksDataException(e);

-        }

-        String store = properties.getProperty("store");

-        stores = store.split(",");

-    }

-

-    private static void loadClusterProperties() throws HyracksException {

-        try {

-            clusterProperties.load(new FileInputStream(clusterPropertiesPath));

-        } catch (IOException e) {

-            throw new HyracksDataException(e);

-        }

-    }

-

-    public static int getFrameSize() {

-        return Integer.parseInt(clusterProperties.getProperty("FRAME_SIZE"));

-    }

-

-    /**

-     * set location constraint

-     * 

-     * @param spec

-     * @param operator

-     * @throws HyracksDataException

-     */

-    public static void setLocationConstraint(JobSpecification spec, IOperatorDescriptor operator,

-            List<InputSplit> splits) throws HyracksException {

-        int count = splits.size();

-        String[] locations = new String[splits.size()];

-        Random random = new Random(System.currentTimeMillis());

-        for (int i = 0; i < splits.size(); i++) {

-            try {

-                String[] loc = splits.get(i).getLocations();

-                Collections.shuffle(Arrays.asList(loc), random);

-                if (loc.length > 0) {

-                    InetAddress[] allIps = InetAddress.getAllByName(loc[0]);

-                    for (InetAddress ip : allIps) {

-                        if (ipToNcMapping.get(ip.getHostAddress()) != null) {

-                            List<String> ncs = ipToNcMapping.get(ip.getHostAddress());

-                            int pos = random.nextInt(ncs.size());

-                            locations[i] = ncs.get(pos);

-                        } else {

-                            int pos = random.nextInt(NCs.length);

-                            locations[i] = NCs[pos];

-                        }

-                    }

-                } else {

-                    int pos = random.nextInt(NCs.length);

-                    locations[i] = NCs[pos];

-                }

-            } catch (IOException e) {

-                throw new HyracksException(e);

-            } catch (InterruptedException e) {

-                throw new HyracksException(e);

-            }

-        }

-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, operator, locations);

-        PartitionConstraintHelper.addPartitionCountConstraint(spec, operator, count);

-    }

-

-    /**

-     * set location constraint

-     * 

-     * @param spec

-     * @param operator

-     * @throws HyracksDataException

-     */

-    public static void setLocationConstraint(JobSpecification spec, IOperatorDescriptor operator)

-            throws HyracksException {

-        int count = 0;

-        String[] locations = new String[NCs.length * stores.length];

-        for (String nc : NCs) {

-            for (int i = 0; i < stores.length; i++) {

-                locations[count] = nc;

-                count++;

-            }

-        }

-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, operator, locations);

-    }

-

-    /**

-     * set location constraint

-     * 

-     * @param spec

-     * @param operator

-     * @throws HyracksDataException

-     */

-    public static void setCountConstraint(JobSpecification spec, IOperatorDescriptor operator) throws HyracksException {

-        int count = NCs.length * stores.length;

-        PartitionConstraintHelper.addPartitionCountConstraint(spec, operator, count);

-    }

-

-    public static void loadClusterConfig(String ipAddress, int port) throws HyracksException {

-        try {

-            IHyracksClientConnection hcc = new HyracksConnection(ipAddress, port);

-            Map<String, NodeControllerInfo> ncNameToNcInfos = hcc.getNodeControllerInfos();

-            NCs = new String[ncNameToNcInfos.size()];

-            ipToNcMapping = new HashMap<String, List<String>>();

-            int i = 0;

-            for (Map.Entry<String, NodeControllerInfo> entry : ncNameToNcInfos.entrySet()) {

-                String ipAddr = InetAddress.getByAddress(entry.getValue().getNetworkAddress().getIpAddress())

-                        .getHostAddress();

-                List<String> matchedNCs = ipToNcMapping.get(ipAddr);

-                if (matchedNCs == null) {

-                    matchedNCs = new ArrayList<String>();

-                    ipToNcMapping.put(ipAddr, matchedNCs);

-                }

-                matchedNCs.add(entry.getKey());

-                NCs[i] = entry.getKey();

-                i++;

-            }

-        } catch (Exception e) {

-            throw new IllegalStateException(e);

-        }

-

-        loadClusterProperties();

-        loadStores();

-    }

-}

diff --git a/genomix/src/main/java/edu/uci/ics/genomix/data/std/accessors/MurmurHash3BinaryHashFunctionFamily.java b/genomix/src/main/java/edu/uci/ics/genomix/data/std/accessors/MurmurHash3BinaryHashFunctionFamily.java
deleted file mode 100644
index dd4ca25..0000000
--- a/genomix/src/main/java/edu/uci/ics/genomix/data/std/accessors/MurmurHash3BinaryHashFunctionFamily.java
+++ /dev/null
@@ -1,61 +0,0 @@
-package edu.uci.ics.genomix.data.std.accessors;

-

-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunction;

-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFamily;

-

-public class MurmurHash3BinaryHashFunctionFamily implements IBinaryHashFunctionFamily {

-    private static final long serialVersionUID = 1L;

-

-    private static final int C1 = 0xcc9e2d51;

-    private static final int C2 = 0x1b873593;

-    private static final int C3 = 5;

-    private static final int C4 = 0xe6546b64;

-    private static final int C5 = 0x85ebca6b;

-    private static final int C6 = 0xc2b2ae35;

-

-    @Override

-    public IBinaryHashFunction createBinaryHashFunction(final int seed) {

-        return new IBinaryHashFunction() {

-            @Override

-            public int hash(byte[] bytes, int offset, int length) {

-                int h = seed;

-                int p = offset;

-                int remain = length;

-                while (remain > 4) {

-                    int k = ((int) bytes[p]) | (((int) bytes[p + 1]) << 8) | (((int) bytes[p + 2]) << 16)

-                            | (((int) bytes[p + 3]) << 24);

-                    k *= C1;

-                    k = Integer.rotateLeft(k, 15);

-                    k *= C2;

-                    h ^= k;

-                    h = Integer.rotateLeft(h, 13);

-                    h = h * C3 + C4;

-                    p += 4;

-                    remain -= 4;

-                }

-                int k = 0;

-                switch (remain) {

-                    case 3:

-                        k = bytes[p++];

-                    case 2:

-                        k = (k << 8) | bytes[p++];

-                    case 1:

-                        k = (k << 8) | bytes[p++];

-                        k *= C1;

-                        k = Integer.rotateLeft(k, 15);

-                        k *= C2;

-                        h ^= k;

-                        h = Integer.rotateLeft(h, 13);

-                        h = h * C3 + C4;

-                }

-                h ^= length;

-                h ^= (h >>> 16);

-                h *= C5;

-                h ^= (h >>> 13);

-                h *= C6;

-                h ^= (h >>> 16);

-                return h;

-            }

-        };

-    }

-}

diff --git a/genomix/src/main/java/edu/uci/ics/genomix/dataflow/FileScanDescriptor.java b/genomix/src/main/java/edu/uci/ics/genomix/dataflow/FileScanDescriptor.java
index e7a8e6c..493ae1e 100644
--- a/genomix/src/main/java/edu/uci/ics/genomix/dataflow/FileScanDescriptor.java
+++ b/genomix/src/main/java/edu/uci/ics/genomix/dataflow/FileScanDescriptor.java
@@ -73,7 +73,7 @@
                     String s = "G:" + File.separator + "data"

                     		+ File.separator + "filename.txt";*/

 

-                    String s = "g:\\data\\filename" + String.valueOf(temp) + ".txt";

+                    String s = "/Users/yingyib/data/filename" + String.valueOf(temp) + ".txt";

 

                     filenames = new FileInputStream(s);

                     // filenames = new FileInputStream("filename.txt");

diff --git a/genomix/src/main/java/edu/uci/ics/genomix/dataflow/Tester.java b/genomix/src/main/java/edu/uci/ics/genomix/dataflow/Tester.java
index e6a6ebb..067452c 100644
--- a/genomix/src/main/java/edu/uci/ics/genomix/dataflow/Tester.java
+++ b/genomix/src/main/java/edu/uci/ics/genomix/dataflow/Tester.java
@@ -1,12 +1,8 @@
 package edu.uci.ics.genomix.dataflow;

 

-import java.io.BufferedReader;

 import java.io.BufferedWriter;

 import java.io.File;

-import java.io.FileInputStream;

 import java.io.FileOutputStream;

-import java.io.InputStream;

-import java.io.InputStreamReader;

 import java.io.OutputStreamWriter;

 import java.util.logging.Level;

 import java.util.logging.Logger;

@@ -14,7 +10,7 @@
 import edu.uci.ics.genomix.data.normalizers.Integer64NormalizedKeyComputerFactory;

 import edu.uci.ics.genomix.data.partition.KmerHashPartitioncomputerFactory;

 import edu.uci.ics.genomix.data.serde.ByteSerializerDeserializer;

-import edu.uci.ics.genomix.data.std.accessors.MurmurHash3BinaryHashFunctionFamily;

+import edu.uci.ics.genomix.data.std.accessors.LongBinaryHashFunctionFamily;

 import edu.uci.ics.genomix.dataflow.aggregators.DistributedMergeLmerAggregateFactory;

 import edu.uci.ics.genomix.dataflow.aggregators.MergeKmerAggregateFactory;

 import edu.uci.ics.hyracks.api.client.HyracksConnection;

@@ -27,15 +23,11 @@
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;

 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;

 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFamily;

-import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;

 import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;

 import edu.uci.ics.hyracks.api.dataflow.value.ITuplePairComparator;

 import edu.uci.ics.hyracks.api.dataflow.value.ITuplePairComparatorFactory;

-import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;

 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;

 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;

-import edu.uci.ics.hyracks.api.job.IConnectorDescriptorRegistry;

-import edu.uci.ics.hyracks.api.job.IOperatorDescriptorRegistry;

 import edu.uci.ics.hyracks.api.job.JobId;

 import edu.uci.ics.hyracks.api.job.JobSpecification;

 import edu.uci.ics.hyracks.control.cc.ClusterControllerService;

@@ -44,9 +36,7 @@
 import edu.uci.ics.hyracks.control.nc.NodeControllerService;

 import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryComparatorFactory;

 import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryHashFunctionFactory;

-import edu.uci.ics.hyracks.data.std.accessors.UTF8StringBinaryHashFunctionFamily;

 import edu.uci.ics.hyracks.data.std.primitive.LongPointable;

-import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;

 import edu.uci.ics.hyracks.dataflow.common.data.marshalling.Integer64SerializerDeserializer;

 import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;

 import edu.uci.ics.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;

@@ -55,7 +45,6 @@
 import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningMergingConnectorDescriptor;

 import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;

 import edu.uci.ics.hyracks.dataflow.std.group.HashSpillableTableFactory;

-import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;

 import edu.uci.ics.hyracks.dataflow.std.group.external.ExternalGroupOperatorDescriptor;

 import edu.uci.ics.hyracks.dataflow.std.group.hybridhash.HybridHashGroupOperatorDescriptor;

 import edu.uci.ics.hyracks.dataflow.std.group.preclustered.PreclusteredGroupOperatorDescriptor;

@@ -63,307 +52,367 @@
 

 public class Tester {

 

-    private static final Logger LOGGER = Logger.getLogger(Tester.class.getName());

-    public static final String NC1_ID = "nc1";

-    public static final String NC2_ID = "nc2";

-    public static final String NC3_ID = "nc3";

-    public static final String NC4_ID = "nc4";

+	private static final Logger LOGGER = Logger.getLogger(Tester.class

+			.getName());

+	public static final String NC1_ID = "nc1";

+	public static final String NC2_ID = "nc2";

+	public static final String NC3_ID = "nc3";

+	public static final String NC4_ID = "nc4";

 

-    private static ClusterControllerService cc;

-    private static NodeControllerService nc1;

-    private static NodeControllerService nc2;

-    private static NodeControllerService nc3;

-    private static NodeControllerService nc4;

-    private static IHyracksClientConnection hcc;

+	private static ClusterControllerService cc;

+	private static NodeControllerService nc1;

+	private static NodeControllerService nc2;

+	private static NodeControllerService nc3;

+	private static NodeControllerService nc4;

+	private static IHyracksClientConnection hcc;

 

-    //private static final boolean DEBUG = true;

+	// private static final boolean DEBUG = true;

 

-    public static void main(String[] args) throws Exception {

+	public static void main(String[] args) throws Exception {

 

-        LOGGER.setLevel(Level.OFF);

+		LOGGER.setLevel(Level.OFF);

 

-        init();

+		init();

 

-        // Options options = new Options();

+		// Options options = new Options();

 

-        IHyracksClientConnection hcc = new HyracksConnection("127.0.0.1", 39000);

+		IHyracksClientConnection hcc = new HyracksConnection("127.0.0.1", 39000);

 

-        /*

-         * JobSpecification job =

-         * createJob(parseFileSplits(options.inFileCustomerSplits),

-         * parseFileSplits(options.inFileOrderSplits),

-         * parseFileSplits(options.outFileSplits), options.numJoinPartitions,

-         * options.algo, options.graceInputSize, options.graceRecordsPerFrame,

-         * options.graceFactor, options.memSize, options.tableSize,

-         * options.hasGroupBy);

-         */

+		/*

+		 * JobSpecification job =

+		 * createJob(parseFileSplits(options.inFileCustomerSplits),

+		 * parseFileSplits(options.inFileOrderSplits),

+		 * parseFileSplits(options.outFileSplits), options.numJoinPartitions,

+		 * options.algo, options.graceInputSize, options.graceRecordsPerFrame,

+		 * options.graceFactor, options.memSize, options.tableSize,

+		 * options.hasGroupBy);

+		 */

 

-        int k, page_num;

-        String file_name = args[0];

-        k = Integer.parseInt(args[1]);

-        page_num = Integer.parseInt(args[2]);

-        int type = Integer.parseInt(args[3]);

+		int k, page_num;

+		String file_name = args[0];

+		k = Integer.parseInt(args[1]);

+		page_num = Integer.parseInt(args[2]);

+		int type = Integer.parseInt(args[3]);

 

-        JobSpecification job = createJob(file_name, k, page_num, type);

+		JobSpecification job = createJob(file_name, k, page_num, type);

 

-        long start = System.currentTimeMillis();

-        JobId jobId = hcc.startJob("test", job);

-        hcc.waitForCompletion(jobId);

-        long end = System.currentTimeMillis();

-        System.err.println(start + " " + end + " " + (end - start));

-        

-        FileOutputStream filenames;

+		long start = System.currentTimeMillis();

+		JobId jobId = hcc.startJob("test", job);

+		hcc.waitForCompletion(jobId);

+		long end = System.currentTimeMillis();

+		System.err.println(start + " " + end + " " + (end - start));

 

-        String s = "g:\\data\\results.txt" ;

+		FileOutputStream filenames;

 

-        filenames = new FileOutputStream(s);

-        // filenames = new FileInputStream("filename.txt");

+		String s = "/Users/yingyib/data/results.txt";

 

-        BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(filenames));

-        writer.write((int) (end-start));

-        writer.close();

-        

-    }

+		filenames = new FileOutputStream(s);

+		// filenames = new FileInputStream("filename.txt");

 

-    public static void init() throws Exception {

-        CCConfig ccConfig = new CCConfig();

-        ccConfig.clientNetIpAddress = "127.0.0.1";

-        ccConfig.clientNetPort = 39000;

-        ccConfig.clusterNetIpAddress = "127.0.0.1";

-        ccConfig.clusterNetPort = 39001;

-        ccConfig.profileDumpPeriod = 10000;

-        File outDir = new File("target/ClusterController");

-        outDir.mkdirs();

-        File ccRoot = File.createTempFile(Tester.class.getName(), ".data", outDir);

-        ccRoot.delete();

-        ccRoot.mkdir();

-        ccConfig.ccRoot = ccRoot.getAbsolutePath();

-        cc = new ClusterControllerService(ccConfig);

-        cc.start();

+		BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(

+				filenames));

+		writer.write((int) (end - start));

+		writer.close();

 

-        NCConfig ncConfig1 = new NCConfig();

-        ncConfig1.ccHost = "localhost";

-        ncConfig1.ccPort = 39001;

-        ncConfig1.clusterNetIPAddress = "127.0.0.1";

-        ncConfig1.dataIPAddress = "127.0.0.1";

-        ncConfig1.nodeId = NC1_ID;

-        nc1 = new NodeControllerService(ncConfig1);

-        nc1.start();

+	}

 

-        NCConfig ncConfig2 = new NCConfig();

-        ncConfig2.ccHost = "localhost";

-        ncConfig2.ccPort = 39001;

-        ncConfig2.clusterNetIPAddress = "127.0.0.1";

-        ncConfig2.dataIPAddress = "127.0.0.1";

-        ncConfig2.nodeId = NC2_ID;

-        nc2 = new NodeControllerService(ncConfig2);

-        nc2.start();

-        

-        NCConfig ncConfig3 = new NCConfig();

-        ncConfig3.ccHost = "localhost";

-        ncConfig3.ccPort = 39001;

-        ncConfig3.clusterNetIPAddress = "127.0.0.1";

-        ncConfig3.dataIPAddress = "127.0.0.1";

-        ncConfig3.nodeId = NC3_ID;

-        nc3 = new NodeControllerService(ncConfig3);

-        nc3.start();

-        

-        NCConfig ncConfig4 = new NCConfig();

-        ncConfig4.ccHost = "localhost";

-        ncConfig4.ccPort = 39001;

-        ncConfig4.clusterNetIPAddress = "127.0.0.1";

-        ncConfig4.dataIPAddress = "127.0.0.1";

-        ncConfig4.nodeId = NC4_ID;

-        nc4 = new NodeControllerService(ncConfig4);

-        nc4.start();

+	public static void init() throws Exception {

+		CCConfig ccConfig = new CCConfig();

+		ccConfig.clientNetIpAddress = "127.0.0.1";

+		ccConfig.clientNetPort = 39000;

+		ccConfig.clusterNetIpAddress = "127.0.0.1";

+		ccConfig.clusterNetPort = 39001;

+		ccConfig.profileDumpPeriod = -1;

+		File outDir = new File("target/ClusterController");

+		outDir.mkdirs();

+		File ccRoot = File.createTempFile(Tester.class.getName(), ".data",

+				outDir);

+		ccRoot.delete();

+		ccRoot.mkdir();

+		ccConfig.ccRoot = ccRoot.getAbsolutePath();

+		cc = new ClusterControllerService(ccConfig);

+		cc.start();

 

-        hcc = new HyracksConnection(ccConfig.clientNetIpAddress, ccConfig.clientNetPort);

-        hcc.createApplication("test", null);

-        if (LOGGER.isLoggable(Level.INFO)) {

-            LOGGER.info("Starting CC in " + ccRoot.getAbsolutePath());

-        }

-    }

+		NCConfig ncConfig1 = new NCConfig();

+		ncConfig1.ccHost = "localhost";

+		ncConfig1.ccPort = 39001;

+		ncConfig1.clusterNetIPAddress = "127.0.0.1";

+		ncConfig1.dataIPAddress = "127.0.0.1";

+		ncConfig1.nodeId = NC1_ID;

+		ncConfig1.ioDevices = "/tmp/t1";

+		nc1 = new NodeControllerService(ncConfig1);

+		nc1.start();

 

-    private static JobSpecification createJob(String filename, int k, int page_num, int type) throws HyracksDataException {

-        JobSpecification spec = new JobSpecification();

+		NCConfig ncConfig2 = new NCConfig();

+		ncConfig2.ccHost = "localhost";

+		ncConfig2.ccPort = 39001;

+		ncConfig2.clusterNetIPAddress = "127.0.0.1";

+		ncConfig2.dataIPAddress = "127.0.0.1";

+		ncConfig2.nodeId = NC2_ID;

+		ncConfig2.ioDevices = "/tmp/t2";

+		nc2 = new NodeControllerService(ncConfig2);

+		nc2.start();

 

-        spec.setFrameSize(32768);

+		NCConfig ncConfig3 = new NCConfig();

+		ncConfig3.ccHost = "localhost";

+		ncConfig3.ccPort = 39001;

+		ncConfig3.clusterNetIPAddress = "127.0.0.1";

+		ncConfig3.dataIPAddress = "127.0.0.1";

+		ncConfig3.nodeId = NC3_ID;

+		ncConfig3.ioDevices = "/tmp/t3";

+		nc3 = new NodeControllerService(ncConfig3);

+		nc3.start();

 

-        FileScanDescriptor scan = new FileScanDescriptor(spec, k);

-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, scan, NC1_ID, NC2_ID,NC3_ID,NC4_ID);

-        //PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, scan, NC1_ID);

+		NCConfig ncConfig4 = new NCConfig();

+		ncConfig4.ccHost = "localhost";

+		ncConfig4.ccPort = 39001;

+		ncConfig4.clusterNetIPAddress = "127.0.0.1";

+		ncConfig4.dataIPAddress = "127.0.0.1";

+		ncConfig4.nodeId = NC4_ID;

+		ncConfig4.ioDevices = "/tmp/t4";

+		nc4 = new NodeControllerService(ncConfig4);

+		nc4.start();

 

-        RecordDescriptor outputRec = new RecordDescriptor(new ISerializerDeserializer[] {

-                Integer64SerializerDeserializer.INSTANCE, ByteSerializerDeserializer.INSTANCE,

-                IntegerSerializerDeserializer.INSTANCE });

+		hcc = new HyracksConnection(ccConfig.clientNetIpAddress,

+				ccConfig.clientNetPort);

+		hcc.createApplication("test", null);

+		if (LOGGER.isLoggable(Level.INFO)) {

+			LOGGER.info("Starting CC in " + ccRoot.getAbsolutePath());

+		}

+	}

 

-        int[] keyFields = new int[] { 0 };

-        int frameLimits = 4096;

-        int tableSize = 10485767;

+	private static JobSpecification createJob(String filename, int k,

+			int page_num, int type) throws HyracksDataException {

+		JobSpecification spec = new JobSpecification();

 

-        AbstractOperatorDescriptor single_grouper;

-        IConnectorDescriptor conn_partition;

-        AbstractOperatorDescriptor cross_grouper;

-        

+		spec.setFrameSize(32768);

 

-        

-        if(0 == type){//external group by

-            single_grouper = new ExternalGroupOperatorDescriptor(spec, keyFields,

-                    frameLimits,

-                    new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(LongPointable.FACTORY) },

-                    new Integer64NormalizedKeyComputerFactory(), new MergeKmerAggregateFactory(),

-                    // new IntSumFieldAggregatorFactory(1, false) }),

+		FileScanDescriptor scan = new FileScanDescriptor(spec, k);

+		PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, scan,

+				NC1_ID, NC2_ID, NC3_ID, NC4_ID);

+		// PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, scan,

+		// NC1_ID);

 

-                    new DistributedMergeLmerAggregateFactory(),

-                    // new IntSumFieldAggregatorFactory(1, false) }),

-                    outputRec, new HashSpillableTableFactory(

-                            new FieldHashPartitionComputerFactory(keyFields,

-                                    new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory

-                                            .of(LongPointable.FACTORY) }), tableSize), true);

-        

-            conn_partition = new MToNPartitioningConnectorDescriptor(spec,

-                    new KmerHashPartitioncomputerFactory());

-            cross_grouper = new ExternalGroupOperatorDescriptor(spec, keyFields,

-                    frameLimits,

-                    new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(LongPointable.FACTORY) },

-                    new Integer64NormalizedKeyComputerFactory(), new DistributedMergeLmerAggregateFactory(),

-                    // new IntSumFieldAggregatorFactory(1, false) }),

+		RecordDescriptor outputRec = new RecordDescriptor(

+				new ISerializerDeserializer[] {

+						Integer64SerializerDeserializer.INSTANCE,

+						ByteSerializerDeserializer.INSTANCE,

+						IntegerSerializerDeserializer.INSTANCE });

 

-                    new DistributedMergeLmerAggregateFactory(),

-                    // new IntSumFieldAggregatorFactory(1, false) }),

-                    outputRec, new HashSpillableTableFactory(

-                            new FieldHashPartitionComputerFactory(keyFields,

-                                    new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory

-                                            .of(LongPointable.FACTORY) }), tableSize), true);

-        }

-        else if( 1 == type){

-            single_grouper = new ExternalGroupOperatorDescriptor(spec, keyFields,

-                    frameLimits,

-                    new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(LongPointable.FACTORY) },

-                    new Integer64NormalizedKeyComputerFactory(), new MergeKmerAggregateFactory(),

-                    // new IntSumFieldAggregatorFactory(1, false) }),

-                    new DistributedMergeLmerAggregateFactory(),

-                    // new IntSumFieldAggregatorFactory(1, false) }),

-                    outputRec, new HashSpillableTableFactory(

-                            new FieldHashPartitionComputerFactory(keyFields,

-                                    new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory

-                                            .of(LongPointable.FACTORY) }), tableSize), true);

-            conn_partition = new  MToNPartitioningMergingConnectorDescriptor(spec, new KmerHashPartitioncomputerFactory(), 

-                  keyFields, new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(LongPointable.FACTORY)} );

-            cross_grouper = new PreclusteredGroupOperatorDescriptor(spec, keyFields, 

-                    new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(LongPointable.FACTORY) },  

-                    new DistributedMergeLmerAggregateFactory(), 

-                    outputRec);

-        }

-        else{

-            long inputSizeInRawRecords = 154000000;

-            long inputSizeInUniqueKeys = 38500000;

-            int recordSizeInBytes = 9;

-            int hashfuncStartLevel = 1;

-            single_grouper = new HybridHashGroupOperatorDescriptor(spec, keyFields,

-                    frameLimits, inputSizeInRawRecords, inputSizeInUniqueKeys, recordSizeInBytes, tableSize,

-                    new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(LongPointable.FACTORY) },

-                    new IBinaryHashFunctionFamily[] {new MurmurHash3BinaryHashFunctionFamily()}, 

-                    hashfuncStartLevel, 

-                    new Integer64NormalizedKeyComputerFactory(),

-                    new MergeKmerAggregateFactory(),

-                    new DistributedMergeLmerAggregateFactory(),

-                    outputRec, true);

-            conn_partition = new MToNPartitioningConnectorDescriptor(spec,

-                    new KmerHashPartitioncomputerFactory());

-            recordSizeInBytes = 13;

-            cross_grouper = new HybridHashGroupOperatorDescriptor(spec, keyFields,

-                    frameLimits, inputSizeInRawRecords, inputSizeInUniqueKeys, recordSizeInBytes, tableSize,

-                    new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(LongPointable.FACTORY) },

-                    new IBinaryHashFunctionFamily[] {new MurmurHash3BinaryHashFunctionFamily()}, 

-                    hashfuncStartLevel, 

-                    new Integer64NormalizedKeyComputerFactory(),

-                    new DistributedMergeLmerAggregateFactory(),

-                    new DistributedMergeLmerAggregateFactory(),

-                    outputRec, true);            

-        }

-        

-        //PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, single_grouper, NC1_ID);

-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, single_grouper, NC1_ID, NC2_ID,NC3_ID,NC4_ID);

-        

-        IConnectorDescriptor readfileConn = new OneToOneConnectorDescriptor(spec);

-        spec.connect(readfileConn, scan, 0, single_grouper, 0);

+		int[] keyFields = new int[] { 0 };

+		int frameLimits = 4096;

+		int tableSize = 10485767;

 

-        //PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, cross_grouper,NC1_ID);

-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, cross_grouper, NC1_ID, NC2_ID,NC3_ID,NC4_ID);

-        spec.connect(conn_partition, single_grouper, 0, cross_grouper, 0);

+		AbstractOperatorDescriptor single_grouper;

+		IConnectorDescriptor conn_partition;

+		AbstractOperatorDescriptor cross_grouper;

 

-        PrinterOperatorDescriptor printer = new PrinterOperatorDescriptor(spec);

-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID, NC2_ID,NC3_ID,NC4_ID);

-        //PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);

+		if (0 == type) {// external group by

+			single_grouper = new ExternalGroupOperatorDescriptor(

+					spec,

+					keyFields,

+					frameLimits,

+					new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory

+							.of(LongPointable.FACTORY) },

+					new Integer64NormalizedKeyComputerFactory(),

+					new MergeKmerAggregateFactory(),

+					// new IntSumFieldAggregatorFactory(1, false) }),

 

-        IConnectorDescriptor printConn = new OneToOneConnectorDescriptor(spec);

-        spec.connect(printConn, cross_grouper, 0, printer, 0);

+					new DistributedMergeLmerAggregateFactory(),

+					// new IntSumFieldAggregatorFactory(1, false) }),

+					outputRec,

+					new HashSpillableTableFactory(

+							new FieldHashPartitionComputerFactory(

+									keyFields,

+									new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory

+											.of(LongPointable.FACTORY) }),

+							tableSize), true);

 

-        spec.addRoot(printer);

+			conn_partition = new MToNPartitioningConnectorDescriptor(spec,

+					new KmerHashPartitioncomputerFactory());

+			cross_grouper = new ExternalGroupOperatorDescriptor(

+					spec,

+					keyFields,

+					frameLimits,

+					new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory

+							.of(LongPointable.FACTORY) },

+					new Integer64NormalizedKeyComputerFactory(),

+					new DistributedMergeLmerAggregateFactory(),

+					// new IntSumFieldAggregatorFactory(1, false) }),

 

-        if( 1 == type ){

-            spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());

-        }

-        // System.out.println(spec.toString());

-        return spec;

-    }

+					new DistributedMergeLmerAggregateFactory(),

+					// new IntSumFieldAggregatorFactory(1, false) }),

+					outputRec,

+					new HashSpillableTableFactory(

+							new FieldHashPartitionComputerFactory(

+									keyFields,

+									new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory

+											.of(LongPointable.FACTORY) }),

+							tableSize), true);

+		} else if (1 == type) {

+			single_grouper = new ExternalGroupOperatorDescriptor(

+					spec,

+					keyFields,

+					frameLimits,

+					new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory

+							.of(LongPointable.FACTORY) },

+					new Integer64NormalizedKeyComputerFactory(),

+					new MergeKmerAggregateFactory(),

+					// new IntSumFieldAggregatorFactory(1, false) }),

+					new DistributedMergeLmerAggregateFactory(),

+					// new IntSumFieldAggregatorFactory(1, false) }),

+					outputRec,

+					new HashSpillableTableFactory(

+							new FieldHashPartitionComputerFactory(

+									keyFields,

+									new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory

+											.of(LongPointable.FACTORY) }),

+							tableSize), true);

+			conn_partition = new MToNPartitioningMergingConnectorDescriptor(

+					spec,

+					new KmerHashPartitioncomputerFactory(),

+					keyFields,

+					new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory

+							.of(LongPointable.FACTORY) });

+			cross_grouper = new PreclusteredGroupOperatorDescriptor(

+					spec,

+					keyFields,

+					new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory

+							.of(LongPointable.FACTORY) },

+					new DistributedMergeLmerAggregateFactory(), outputRec);

+		} else {

+			long inputSizeInRawRecords = 154000000;

+			long inputSizeInUniqueKeys = 38500000;

+			int recordSizeInBytes = 9;

+			int hashfuncStartLevel = 1;

+			single_grouper = new HybridHashGroupOperatorDescriptor(

+					spec,

+					keyFields,

+					frameLimits,

+					inputSizeInRawRecords,

+					inputSizeInUniqueKeys,

+					recordSizeInBytes,

+					tableSize,

+					new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory

+							.of(LongPointable.FACTORY) },

+					new IBinaryHashFunctionFamily[] { new LongBinaryHashFunctionFamily() },

+					hashfuncStartLevel,

+					new Integer64NormalizedKeyComputerFactory(),

+					new MergeKmerAggregateFactory(),

+					new DistributedMergeLmerAggregateFactory(), outputRec, true);

+			conn_partition = new MToNPartitioningConnectorDescriptor(spec,

+					new KmerHashPartitioncomputerFactory());

+			recordSizeInBytes = 13;

+			cross_grouper = new HybridHashGroupOperatorDescriptor(

+					spec,

+					keyFields,

+					frameLimits,

+					inputSizeInRawRecords,

+					inputSizeInUniqueKeys,

+					recordSizeInBytes,

+					tableSize,

+					new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory

+							.of(LongPointable.FACTORY) },

+					new IBinaryHashFunctionFamily[] { new LongBinaryHashFunctionFamily() },

+					hashfuncStartLevel,

+					new Integer64NormalizedKeyComputerFactory(),

+					new DistributedMergeLmerAggregateFactory(),

+					new DistributedMergeLmerAggregateFactory(), outputRec, true);

+		}

 

+		// PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,

+		// single_grouper, NC1_ID);

+		PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,

+				single_grouper, NC1_ID, NC2_ID, NC3_ID, NC4_ID);

 

-    static class JoinComparatorFactory implements ITuplePairComparatorFactory {

-        private static final long serialVersionUID = 1L;

+		IConnectorDescriptor readfileConn = new OneToOneConnectorDescriptor(

+				spec);

+		spec.connect(readfileConn, scan, 0, single_grouper, 0);

 

-        private final IBinaryComparatorFactory bFactory;

-        private final int pos0;

-        private final int pos1;

+		// PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,

+		// cross_grouper,NC1_ID);

+		PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,

+				cross_grouper, NC1_ID, NC2_ID, NC3_ID, NC4_ID);

+		spec.connect(conn_partition, single_grouper, 0, cross_grouper, 0);

 

-        public JoinComparatorFactory(IBinaryComparatorFactory bFactory, int pos0, int pos1) {

-            this.bFactory = bFactory;

-            this.pos0 = pos0;

-            this.pos1 = pos1;

-        }

+		PrinterOperatorDescriptor printer = new PrinterOperatorDescriptor(spec);

+		PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,

+				NC1_ID, NC2_ID, NC3_ID, NC4_ID);

+		// PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,

+		// printer, NC1_ID);

 

-        @Override

-        public ITuplePairComparator createTuplePairComparator(IHyracksTaskContext ctx) {

-            return new JoinComparator(bFactory.createBinaryComparator(), pos0, pos1);

-        }

-    }

+		IConnectorDescriptor printConn = new OneToOneConnectorDescriptor(spec);

+		spec.connect(printConn, cross_grouper, 0, printer, 0);

 

-    static class JoinComparator implements ITuplePairComparator {

+		spec.addRoot(printer);

 

-        private final IBinaryComparator bComparator;

-        private final int field0;

-        private final int field1;

+		if (1 == type) {

+			spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());

+		}

+		// System.out.println(spec.toString());

+		return spec;

+	}

 

-        public JoinComparator(IBinaryComparator bComparator, int field0, int field1) {

-            this.bComparator = bComparator;

-            this.field0 = field0;

-            this.field1 = field1;

-        }

+	static class JoinComparatorFactory implements ITuplePairComparatorFactory {

+		private static final long serialVersionUID = 1L;

 

-        @Override

-        public int compare(IFrameTupleAccessor accessor0, int tIndex0, IFrameTupleAccessor accessor1, int tIndex1) {

-            int tStart0 = accessor0.getTupleStartOffset(tIndex0);

-            int fStartOffset0 = accessor0.getFieldSlotsLength() + tStart0;

+		private final IBinaryComparatorFactory bFactory;

+		private final int pos0;

+		private final int pos1;

 

-            int tStart1 = accessor1.getTupleStartOffset(tIndex1);

-            int fStartOffset1 = accessor1.getFieldSlotsLength() + tStart1;

+		public JoinComparatorFactory(IBinaryComparatorFactory bFactory,

+				int pos0, int pos1) {

+			this.bFactory = bFactory;

+			this.pos0 = pos0;

+			this.pos1 = pos1;

+		}

 

-            int fStart0 = accessor0.getFieldStartOffset(tIndex0, field0);

-            int fEnd0 = accessor0.getFieldEndOffset(tIndex0, field0);

-            int fLen0 = fEnd0 - fStart0;

+		@Override

+		public ITuplePairComparator createTuplePairComparator(

+				IHyracksTaskContext ctx) {

+			return new JoinComparator(bFactory.createBinaryComparator(), pos0,

+					pos1);

+		}

+	}

 

-            int fStart1 = accessor1.getFieldStartOffset(tIndex1, field1);

-            int fEnd1 = accessor1.getFieldEndOffset(tIndex1, field1);

-            int fLen1 = fEnd1 - fStart1;

+	static class JoinComparator implements ITuplePairComparator {

 

-            int c = bComparator.compare(accessor0.getBuffer().array(), fStart0 + fStartOffset0, fLen0, accessor1

-                    .getBuffer().array(), fStart1 + fStartOffset1, fLen1);

-            if (c != 0) {

-                return c;

-            }

-            return 0;

-        }

-    }

+		private final IBinaryComparator bComparator;

+		private final int field0;

+		private final int field1;

+

+		public JoinComparator(IBinaryComparator bComparator, int field0,

+				int field1) {

+			this.bComparator = bComparator;

+			this.field0 = field0;

+			this.field1 = field1;

+		}

+

+		@Override

+		public int compare(IFrameTupleAccessor accessor0, int tIndex0,

+				IFrameTupleAccessor accessor1, int tIndex1) {

+			int tStart0 = accessor0.getTupleStartOffset(tIndex0);

+			int fStartOffset0 = accessor0.getFieldSlotsLength() + tStart0;

+

+			int tStart1 = accessor1.getTupleStartOffset(tIndex1);

+			int fStartOffset1 = accessor1.getFieldSlotsLength() + tStart1;

+

+			int fStart0 = accessor0.getFieldStartOffset(tIndex0, field0);

+			int fEnd0 = accessor0.getFieldEndOffset(tIndex0, field0);

+			int fLen0 = fEnd0 - fStart0;

+

+			int fStart1 = accessor1.getFieldStartOffset(tIndex1, field1);

+			int fEnd1 = accessor1.getFieldEndOffset(tIndex1, field1);

+			int fLen1 = fEnd1 - fStart1;

+

+			int c = bComparator.compare(accessor0.getBuffer().array(), fStart0

+					+ fStartOffset0, fLen0, accessor1.getBuffer().array(),

+					fStart1 + fStartOffset1, fLen1);

+			if (c != 0) {

+				return c;

+			}

+			return 0;

+		}

+	}

 

 }

diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashSpillableTableFactory.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashSpillableTableFactory.java
index f2b56fa..974ef3b 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashSpillableTableFactory.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashSpillableTableFactory.java
@@ -255,8 +255,7 @@
                 }
 
                 outputAppender.reset(outputFrame, true);
-
-                writer.open();
+                //writer.open();
 
                 if (tPointers == null) {
                     // Not sorted