Implement memory-bounded hash group-by and hash join for BigObject

This change contains both the hash group-by and the hash join updates.

The main changes are:
1. Update the ExternalGroupby operator to a hash-based group-by.
2. Update the join operators to use the buffer manager.

The buffer manager code is moved from the sort package up one level
so that it can be shared by all operators.
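
For illustration, the memory-bounded hash group-by is wired roughly as
follows (a minimal sketch mirroring the new Groupby.java client added
below; spec, outDesc, htSize, fileSize, and frameLimit are assumed to
be set up as in that client):

    int[] keys = new int[] { 0 };
    AbstractOperatorDescriptor grouper = new ExternalGroupOperatorDescriptor(
            spec, htSize, fileSize, keys, frameLimit,
            new IBinaryComparatorFactory[] {
                    PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY) },
            new IntegerNormalizedKeyComputerFactory(),
            // partial aggregator: count tuples per group in memory
            new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] {
                    new CountFieldAggregatorFactory(false) }),
            // merge aggregator: sum partial counts when spilled runs are merged
            new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] {
                    new IntSumFieldAggregatorFactory(keys.length, false) }),
            outDesc, outDesc,
            new HashSpillableTableFactory(new IBinaryHashFunctionFamily[] {
                    MurmurHash3BinaryHashFunctionFamily.INSTANCE }));

Here frameLimit caps the number of frames the grouper may hold, so the
hash table spills to disk once the memory budget is exceeded.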

Change-Id: I248f3a374fdacad7d57e49cf18d8233745e55460
Reviewed-on: https://asterix-gerrit.ics.uci.edu/398
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Yingyi Bu <buyingyi@gmail.com>
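
With the appassembler changes below, the example clients are generated
as three separate scripts (sort, group, join). A group-by run might
look like this (a sketch; the host, paths, and sizes are placeholders):

    bin/group -host <cc-host> -port 1098 \
        -infile-splits NC1:/data/tpch/lineitem.tbl \
        -outfile-splits NC1:/tmp/gby_result \
        -input-tuples 6000000 -input-size 759863287 \
        -frame-limit 64 -algo hash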
diff --git a/hyracks/hyracks-examples/tpch-example/tpchclient/pom.xml b/hyracks/hyracks-examples/tpch-example/tpchclient/pom.xml
index cd45536..a52d21e 100644
--- a/hyracks/hyracks-examples/tpch-example/tpchclient/pom.xml
+++ b/hyracks/hyracks-examples/tpch-example/tpchclient/pom.xml
@@ -27,44 +27,79 @@
     <version>0.2.17-SNAPSHOT</version>
   </parent>
 
-  <dependencies>
-  	<dependency>
-  		<groupId>org.apache.hyracks</groupId>
-  		<artifactId>hyracks-dataflow-std</artifactId>
-  		<version>0.2.17-SNAPSHOT</version>
-  		<scope>compile</scope>
-  	</dependency>
-  	<dependency>
-  		<groupId>org.apache.hyracks</groupId>
-  		<artifactId>hyracks-data-std</artifactId>
-  		<version>0.2.17-SNAPSHOT</version>
-  	</dependency>
-  </dependencies>
-  <build>
-    <plugins>
-      <plugin>
-        <groupId>org.codehaus.mojo</groupId>
-        <artifactId>appassembler-maven-plugin</artifactId>
-        <version>1.3</version>
-        <executions>
-          <execution>
-            <configuration>
-              <programs>
-                <program>
-                  <mainClass>org.apache.hyracks.examples.tpch.client.Sort</mainClass>
-                  <name>tpchclient</name>
-                </program>
-              </programs>
-              <repositoryLayout>flat</repositoryLayout>
-              <repositoryName>lib</repositoryName>
-            </configuration>
-            <phase>package</phase>
-            <goals>
-              <goal>assemble</goal>
-            </goals>
-          </execution>
-        </executions>
-      </plugin>
-    </plugins>
-  </build>
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.hyracks</groupId>
+            <artifactId>hyracks-dataflow-std</artifactId>
+            <version>0.2.17-SNAPSHOT</version>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hyracks</groupId>
+            <artifactId>hyracks-data-std</artifactId>
+            <version>0.2.17-SNAPSHOT</version>
+        </dependency>
+    </dependencies>
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>appassembler-maven-plugin</artifactId>
+                <version>1.3</version>
+                <executions>
+                    <execution>
+                        <id>sort</id>
+                        <configuration>
+                            <programs>
+                                <program>
+                                    <mainClass>org.apache.hyracks.examples.tpch.client.Sort</mainClass>
+                                    <name>sort</name>
+                                </program>
+                            </programs>
+                            <repositoryLayout>flat</repositoryLayout>
+                            <repositoryName>lib</repositoryName>
+                        </configuration>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>assemble</goal>
+                        </goals>
+                    </execution>
+                    <execution>
+                        <id>group</id>
+                        <configuration>
+                            <programs>
+                                <program>
+                                    <mainClass>org.apache.hyracks.examples.tpch.client.Groupby</mainClass>
+                                    <name>group</name>
+                                </program>
+                            </programs>
+                            <repositoryLayout>flat</repositoryLayout>
+                            <repositoryName>lib</repositoryName>
+                        </configuration>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>assemble</goal>
+                        </goals>
+                    </execution>
+                    <execution>
+                        <id>join</id>
+                        <configuration>
+                            <programs>
+                                <program>
+                                    <mainClass>org.apache.hyracks.examples.tpch.client.Join</mainClass>
+                                    <name>join</name>
+                                </program>
+                            </programs>
+                            <repositoryLayout>flat</repositoryLayout>
+                            <repositoryName>lib</repositoryName>
+                        </configuration>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>assemble</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
 </project>
diff --git a/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Common.java b/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Common.java
index ac172fd..15d7f66 100644
--- a/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Common.java
+++ b/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Common.java
@@ -27,8 +27,12 @@
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.io.FileReference;
 import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.dataflow.common.data.marshalling.FloatSerializerDeserializer;
+import org.apache.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
 import org.apache.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
+import org.apache.hyracks.dataflow.common.data.parsers.FloatParserFactory;
 import org.apache.hyracks.dataflow.common.data.parsers.IValueParserFactory;
+import org.apache.hyracks.dataflow.common.data.parsers.IntegerParserFactory;
 import org.apache.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory;
 import org.apache.hyracks.dataflow.std.file.FileSplit;
 
@@ -56,12 +60,36 @@
             new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
             new UTF8StringSerializerDeserializer() });
 
-    static IValueParserFactory[] orderParserFactories = new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE };
+    static RecordDescriptor lineitemDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+            IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
+            IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
+            IntegerSerializerDeserializer.INSTANCE, FloatSerializerDeserializer.INSTANCE,
+            FloatSerializerDeserializer.INSTANCE, FloatSerializerDeserializer.INSTANCE,
+            new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
+            new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
+            new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
+            new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer() });
 
+    static IValueParserFactory[] lineitemParserFactories = new IValueParserFactory[] {
+            IntegerParserFactory.INSTANCE, IntegerParserFactory.INSTANCE,
+            IntegerParserFactory.INSTANCE, IntegerParserFactory.INSTANCE,
+            IntegerParserFactory.INSTANCE, FloatParserFactory.INSTANCE,
+            FloatParserFactory.INSTANCE, FloatParserFactory.INSTANCE,
+            UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+            UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+            UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+            UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, };
+
+    static IValueParserFactory[] custParserFactories = new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
+            UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+            UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+            UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+            UTF8StringParserFactory.INSTANCE };
+    static IValueParserFactory[] orderParserFactories = new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
+            UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+            UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+            UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+            UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE };
 
     static FileSplit[] parseFileSplits(String fileSplits) {
         String[] splits = fileSplits.split(",");
@@ -77,6 +105,21 @@
         return fSplits;
     }
 
+    static FileSplit[] parseFileSplits(String fileSplits, int count) {
+        String[] splits = fileSplits.split(",");
+        FileSplit[] fSplits = new FileSplit[splits.length];
+        for (int i = 0; i < splits.length; ++i) {
+            String s = splits[i].trim();
+            int idx = s.indexOf(':');
+            if (idx < 0) {
+                throw new IllegalArgumentException("File split " + s + " not well formed");
+            }
+            fSplits[i] = new FileSplit(s.substring(0, idx), new FileReference(new File(s.substring(idx + 1) + "_"
+                    + count)));
+        }
+        return fSplits;
+    }
+
     static void createPartitionConstraint(JobSpecification spec, IOperatorDescriptor op, FileSplit[] splits) {
         String[] parts = new String[splits.length];
         for (int i = 0; i < splits.length; ++i) {
diff --git a/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Groupby.java b/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Groupby.java
new file mode 100644
index 0000000..2ef5835
--- /dev/null
+++ b/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Groupby.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.examples.tpch.client;
+
+import static org.apache.hyracks.examples.tpch.client.Common.createPartitionConstraint;
+import static org.apache.hyracks.examples.tpch.client.Common.lineitemDesc;
+import static org.apache.hyracks.examples.tpch.client.Common.lineitemParserFactories;
+import static org.apache.hyracks.examples.tpch.client.Common.parseFileSplits;
+
+import org.apache.hyracks.api.client.HyracksConnection;
+import org.apache.hyracks.api.client.IHyracksClientConnection;
+import org.apache.hyracks.api.dataflow.IConnectorDescriptor;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
+import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFamily;
+import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.data.std.accessors.MurmurHash3BinaryHashFunctionFamily;
+import org.apache.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
+import org.apache.hyracks.data.std.accessors.PointableBinaryHashFunctionFactory;
+import org.apache.hyracks.data.std.primitive.IntegerPointable;
+import org.apache.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
+import org.apache.hyracks.dataflow.common.data.normalizers.IntegerNormalizedKeyComputerFactory;
+import org.apache.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
+import org.apache.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
+import org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+import org.apache.hyracks.dataflow.std.connectors.MToNPartitioningConnectorDescriptor;
+import org.apache.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
+import org.apache.hyracks.dataflow.std.file.ConstantFileSplitProvider;
+import org.apache.hyracks.dataflow.std.file.DelimitedDataTupleParserFactory;
+import org.apache.hyracks.dataflow.std.file.FileScanOperatorDescriptor;
+import org.apache.hyracks.dataflow.std.file.FileSplit;
+import org.apache.hyracks.dataflow.std.file.FrameFileWriterOperatorDescriptor;
+import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
+import org.apache.hyracks.dataflow.std.file.PlainFileWriterOperatorDescriptor;
+import org.apache.hyracks.dataflow.std.group.HashSpillableTableFactory;
+import org.apache.hyracks.dataflow.std.group.IFieldAggregateDescriptorFactory;
+import org.apache.hyracks.dataflow.std.group.aggregators.CountFieldAggregatorFactory;
+import org.apache.hyracks.dataflow.std.group.aggregators.IntSumFieldAggregatorFactory;
+import org.apache.hyracks.dataflow.std.group.aggregators.MultiFieldsAggregatorFactory;
+import org.apache.hyracks.dataflow.std.group.external.ExternalGroupOperatorDescriptor;
+import org.apache.hyracks.dataflow.std.group.sort.SortGroupByOperatorDescriptor;
+import org.kohsuke.args4j.CmdLineParser;
+import org.kohsuke.args4j.Option;
+
+/**
+ * The application client for performance tests of the group-by
+ * operator.
+ */
+public class Groupby {
+    private static class Options {
+        @Option(name = "-host", usage = "Hyracks Cluster Controller Host name", required = true)
+        public String host;
+
+        @Option(name = "-port", usage = "Hyracks Cluster Controller Port (default: 1098)")
+        public int port = 1098;
+
+        @Option(name = "-infile-splits", usage = "Comma separated list of file-splits for the input. A file-split is <node-name>:<path>", required = true)
+        public String inFileSplits;
+
+        @Option(name = "-outfile-splits", usage = "Comma separated list of file-splits for the output", required = true)
+        public String outFileSplits;
+
+        @Option(name = "-input-tuples", usage = "Hash table size ", required = true)
+        public int htSize;
+
+        @Option(name = "-input-size", usage = "Physical file size in bytes ", required = true)
+        public long fileSize;
+
+        @Option(name = "-frame-size", usage = "Frame size (default: 32768)", required = false)
+        public int frameSize = 32768;
+
+        @Option(name = "-frame-limit", usage = "memory limit for sorting (default: 4)", required = false)
+        public int frameLimit = 4;
+
+        @Option(name = "-out-plain", usage = "Whether to output plain text (default: true)", required = false)
+        public boolean outPlain = true;
+
+        @Option(name = "-algo", usage = "The algorithm to be used: hash|sort", required = true)
+        public String algo = "hash";
+    }
+
+    public static void main(String[] args) throws Exception {
+        Options options = new Options();
+        CmdLineParser parser = new CmdLineParser(options);
+        if (args.length == 0) {
+            parser.printUsage(System.err);
+            return;
+        }
+        parser.parseArgument(args);
+
+        IHyracksClientConnection hcc = new HyracksConnection(options.host, options.port);
+
+        JobSpecification job;
+
+        long start = System.currentTimeMillis();
+        job = createJob(parseFileSplits(options.inFileSplits), parseFileSplits(options.outFileSplits), options.htSize,
+                options.fileSize, options.frameLimit, options.frameSize, options.algo, options.outPlain);
+        if (job != null) {
+            System.out.print("CreateJobTime:" + (System.currentTimeMillis() - start));
+            start = System.currentTimeMillis();
+            JobId jobId = hcc.startJob(job);
+            hcc.waitForCompletion(jobId);
+            System.out.println("JobExecuteTime:" + (System.currentTimeMillis() - start));
+        }
+    }
+
+    private static JobSpecification createJob(FileSplit[] inSplits, FileSplit[] outSplits, int htSize, long fileSize,
+            int frameLimit, int frameSize, String alg, boolean outPlain) {
+        JobSpecification spec = new JobSpecification(frameSize);
+        IFileSplitProvider splitsProvider = new ConstantFileSplitProvider(inSplits);
+
+        FileScanOperatorDescriptor fileScanner = new FileScanOperatorDescriptor(spec, splitsProvider,
+                new DelimitedDataTupleParserFactory(lineitemParserFactories, '|'), lineitemDesc);
+
+        createPartitionConstraint(spec, fileScanner, inSplits);
+
+        // Output: each unique string with an integer count
+        RecordDescriptor outDesc = new RecordDescriptor(
+                new ISerializerDeserializer[] { IntegerSerializerDeserializer.INSTANCE,
+                        // IntegerSerializerDeserializer.INSTANCE,
+                        IntegerSerializerDeserializer.INSTANCE });
+
+        // Specify the grouping key, which will be the string extracted during
+        // the scan.
+        int[] keys = new int[] { 0,
+                // 1
+        };
+
+        AbstractOperatorDescriptor grouper;
+
+        if (alg.equalsIgnoreCase("hash")) {// external hash graph
+            grouper = new ExternalGroupOperatorDescriptor(spec, htSize, fileSize, keys, frameLimit,
+                    new IBinaryComparatorFactory[] {
+                            // PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY),
+                            PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY) },
+                    new IntegerNormalizedKeyComputerFactory(),
+                    new MultiFieldsAggregatorFactory(
+                            new IFieldAggregateDescriptorFactory[] { new CountFieldAggregatorFactory(false) }),
+                    new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] {
+                            new IntSumFieldAggregatorFactory(keys.length, false) }),
+                    outDesc, outDesc, new HashSpillableTableFactory(
+                            new IBinaryHashFunctionFamily[] { MurmurHash3BinaryHashFunctionFamily.INSTANCE }));
+
+            createPartitionConstraint(spec, grouper, outSplits);
+        } else if (alg.equalsIgnoreCase("sort")) {
+            grouper = new SortGroupByOperatorDescriptor(spec, frameLimit, keys, keys,
+                    new IntegerNormalizedKeyComputerFactory(),
+                    new IBinaryComparatorFactory[] {
+                            // PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY),
+                            PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY) },
+                    new MultiFieldsAggregatorFactory(
+                            new IFieldAggregateDescriptorFactory[] { new CountFieldAggregatorFactory(true) }),
+                    new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] {
+                            new IntSumFieldAggregatorFactory(keys.length, true) }),
+                    outDesc, outDesc, false);
+
+            createPartitionConstraint(spec, grouper, outSplits);
+        } else {
+            System.err.println("unknow groupby alg:" + alg);
+            return null;
+        }
+        // Connect scanner with the grouper
+        IConnectorDescriptor scanGroupConnDef2 = new MToNPartitioningConnectorDescriptor(spec,
+                new FieldHashPartitionComputerFactory(keys,
+                        new IBinaryHashFunctionFactory[] {
+                                // PointableBinaryHashFunctionFactory.of(IntegerPointable.FACTORY),
+                                PointableBinaryHashFunctionFactory.of(IntegerPointable.FACTORY) }));
+        spec.connect(scanGroupConnDef2, fileScanner, 0, grouper, 0);
+
+        IFileSplitProvider outSplitProvider = new ConstantFileSplitProvider(outSplits);
+
+        AbstractSingleActivityOperatorDescriptor writer;
+
+        if (outPlain)
+            writer = new PlainFileWriterOperatorDescriptor(spec, outSplitProvider, "|");
+        else
+            writer = new FrameFileWriterOperatorDescriptor(spec, outSplitProvider);
+
+        createPartitionConstraint(spec, writer, outSplits);
+
+        IConnectorDescriptor groupOutConn = new OneToOneConnectorDescriptor(spec);
+        spec.connect(groupOutConn, grouper, 0, writer, 0);
+
+        spec.addRoot(writer);
+        return spec;
+    }
+
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Join.java b/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Join.java
index fbfc5f5..507e1c7 100644
--- a/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Join.java
+++ b/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Join.java
@@ -18,26 +18,22 @@
  */
 package org.apache.hyracks.examples.tpch.client;
 
-import static org.apache.hyracks.examples.tpch.client.Common.*;
+import static org.apache.hyracks.examples.tpch.client.Common.createPartitionConstraint;
+import static org.apache.hyracks.examples.tpch.client.Common.custParserFactories;
+import static org.apache.hyracks.examples.tpch.client.Common.orderParserFactories;
+import static org.apache.hyracks.examples.tpch.client.Common.parseFileSplits;
 
 import java.util.EnumSet;
 
-import org.kohsuke.args4j.CmdLineParser;
-import org.kohsuke.args4j.Option;
-
 import org.apache.hyracks.api.client.HyracksConnection;
 import org.apache.hyracks.api.client.IHyracksClientConnection;
-import org.apache.hyracks.api.comm.IFrameTupleAccessor;
 import org.apache.hyracks.api.constraints.PartitionConstraintHelper;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.IConnectorDescriptor;
 import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
-import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
 import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
+import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFamily;
 import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
-import org.apache.hyracks.api.dataflow.value.ITuplePairComparator;
-import org.apache.hyracks.api.dataflow.value.ITuplePairComparatorFactory;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.JobFlag;
@@ -45,11 +41,11 @@
 import org.apache.hyracks.api.job.JobSpecification;
 import org.apache.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
 import org.apache.hyracks.data.std.accessors.PointableBinaryHashFunctionFactory;
+import org.apache.hyracks.data.std.accessors.UTF8StringBinaryHashFunctionFamily;
 import org.apache.hyracks.data.std.primitive.UTF8StringPointable;
 import org.apache.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
 import org.apache.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
-import org.apache.hyracks.dataflow.common.data.parsers.IValueParserFactory;
-import org.apache.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory;
+import org.apache.hyracks.dataflow.common.data.normalizers.UTF8StringNormalizedKeyComputerFactory;
 import org.apache.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
 import org.apache.hyracks.dataflow.std.connectors.MToNPartitioningConnectorDescriptor;
 import org.apache.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
@@ -57,16 +53,21 @@
 import org.apache.hyracks.dataflow.std.file.DelimitedDataTupleParserFactory;
 import org.apache.hyracks.dataflow.std.file.FileScanOperatorDescriptor;
 import org.apache.hyracks.dataflow.std.file.FileSplit;
-import org.apache.hyracks.dataflow.std.file.FrameFileWriterOperatorDescriptor;
 import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
+import org.apache.hyracks.dataflow.std.file.PlainFileWriterOperatorDescriptor;
+import org.apache.hyracks.dataflow.std.group.HashSpillableTableFactory;
 import org.apache.hyracks.dataflow.std.group.IFieldAggregateDescriptorFactory;
-import org.apache.hyracks.dataflow.std.group.aggregators.CountFieldAggregatorFactory;
+import org.apache.hyracks.dataflow.std.group.aggregators.FloatSumFieldAggregatorFactory;
+import org.apache.hyracks.dataflow.std.group.aggregators.IntSumFieldAggregatorFactory;
 import org.apache.hyracks.dataflow.std.group.aggregators.MultiFieldsAggregatorFactory;
-import org.apache.hyracks.dataflow.std.group.hash.HashGroupOperatorDescriptor;
+import org.apache.hyracks.dataflow.std.group.external.ExternalGroupOperatorDescriptor;
 import org.apache.hyracks.dataflow.std.join.GraceHashJoinOperatorDescriptor;
-import org.apache.hyracks.dataflow.std.join.HybridHashJoinOperatorDescriptor;
 import org.apache.hyracks.dataflow.std.join.InMemoryHashJoinOperatorDescriptor;
+import org.apache.hyracks.dataflow.std.join.JoinComparatorFactory;
 import org.apache.hyracks.dataflow.std.join.NestedLoopJoinOperatorDescriptor;
+import org.apache.hyracks.dataflow.std.join.OptimizedHybridHashJoinOperatorDescriptor;
+import org.kohsuke.args4j.CmdLineParser;
+import org.kohsuke.args4j.Option;
 
 public class Join {
     private static class Options {
@@ -91,23 +92,23 @@
         @Option(name = "-profile", usage = "Enable/Disable profiling. (default: enabled)")
         public boolean profile = true;
 
-        @Option(name = "-table-size", usage = "Table size for in-memory hash join", required = false)
+        @Option(name = "-table-size", usage = "Table size for in-memory hash join. (default: 8191)", required = false)
         public int tableSize = 8191;
 
-        @Option(name = "-algo", usage = "Join types", required = true)
+        @Option(name = "-algo", usage = "Join types:InMem|NestedLoop|Hybrid|Grace", required = true)
         public String algo;
 
         // For grace/hybrid hash join only
         @Option(name = "-mem-size", usage = "Memory size for hash join", required = true)
         public int memSize;
 
-        @Option(name = "-input-size", usage = "Input size of the grace/hybrid hash join", required = false)
-        public int graceInputSize = 10;
+        @Option(name = "-input-size", usage = "Input size of the hybrid hash join", required = false)
+        public int graceInputSize = 100000;
 
-        @Option(name = "-records-per-frame", usage = "Records per frame for grace/hybrid hash join", required = false)
+        @Option(name = "-records-per-frame", usage = "Records per frame for hybrid hash join", required = false)
         public int graceRecordsPerFrame = 200;
 
-        @Option(name = "-grace-factor", usage = "Factor of the grace/hybrid hash join", required = false)
+        @Option(name = "-grace-factor", usage = "Factor of the grace/hybrid hash join, (default:1.2)", required = false)
         public double graceFactor = 1.2;
 
         // Whether group-by is processed after the join
@@ -121,6 +122,10 @@
     public static void main(String[] args) throws Exception {
         Options options = new Options();
         CmdLineParser parser = new CmdLineParser(options);
+        if (args.length == 0) {
+            parser.printUsage(System.err);
+            return;
+        }
         parser.parseArgument(args);
 
         IHyracksClientConnection hcc = new HyracksConnection(options.host, options.port);
@@ -129,6 +134,9 @@
                 parseFileSplits(options.inFileOrderSplits), parseFileSplits(options.outFileSplits),
                 options.numJoinPartitions, options.algo, options.graceInputSize, options.graceRecordsPerFrame,
                 options.graceFactor, options.memSize, options.tableSize, options.hasGroupBy, options.frameSize);
+        if (job == null) {
+            return;
+        }
 
         long start = System.currentTimeMillis();
         JobId jobId = hcc.startJob(job,
@@ -141,87 +149,76 @@
     private static JobSpecification createJob(FileSplit[] customerSplits, FileSplit[] orderSplits,
             FileSplit[] resultSplits, int numJoinPartitions, String algo, int graceInputSize, int graceRecordsPerFrame,
             double graceFactor, int memSize, int tableSize, boolean hasGroupBy, int frameSize)
-            throws HyracksDataException {
+                    throws HyracksDataException {
         JobSpecification spec = new JobSpecification(frameSize);
 
         IFileSplitProvider custSplitsProvider = new ConstantFileSplitProvider(customerSplits);
+        long custFileSize = 0;
+        for (int i = 0; i < customerSplits.length; i++) {
+            custFileSize += customerSplits[i].getLocalFile().getFile().length();
+        }
 
         IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(orderSplits);
+        long orderFileSize = 0;
+        for (int i = 0; i < orderSplits.length; i++) {
+            orderFileSize += orderSplits[i].getLocalFile().getFile().length();
+        }
 
         FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitsProvider,
-                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'), Common.ordersDesc);
+                new DelimitedDataTupleParserFactory(orderParserFactories, '|'), Common.ordersDesc);
         createPartitionConstraint(spec, ordScanner, orderSplits);
 
         FileScanOperatorDescriptor custScanner = new FileScanOperatorDescriptor(spec, custSplitsProvider,
-                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE }, '|'), Common.custDesc);
+                new DelimitedDataTupleParserFactory(custParserFactories, '|'), Common.custDesc);
         createPartitionConstraint(spec, custScanner, customerSplits);
 
         IOperatorDescriptor join;
 
         if ("nestedloop".equalsIgnoreCase(algo)) {
-            join = new NestedLoopJoinOperatorDescriptor(spec, new JoinComparatorFactory(
-                    PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), 0, 1), Common.custOrderJoinDesc,
-                    memSize, false, null);
+            join = new NestedLoopJoinOperatorDescriptor(spec,
+                    new JoinComparatorFactory(PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), 0, 1),
+                    Common.custOrderJoinDesc, memSize, false, null);
 
-        } else if ("gracehash".equalsIgnoreCase(algo)) {
-            join = new GraceHashJoinOperatorDescriptor(
-                    spec,
-                    memSize,
-                    graceInputSize,
-                    graceRecordsPerFrame,
-                    graceFactor,
-                    new int[] { 0 },
-                    new int[] { 1 },
-                    new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
-                            .of(UTF8StringPointable.FACTORY) },
+        } else if ("inmem".equalsIgnoreCase(algo)) {
+            join = new InMemoryHashJoinOperatorDescriptor(spec, new int[] { 0 }, new int[] { 1 },
+                    new IBinaryHashFunctionFactory[] {
+                            PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) },
                     new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
-                    Common.custOrderJoinDesc, null);
+                    Common.custOrderJoinDesc, tableSize, null);
 
-        } else if ("hybridhash".equalsIgnoreCase(algo)) {
-            join = new HybridHashJoinOperatorDescriptor(
-                    spec,
-                    memSize,
-                    graceInputSize,
-                    graceRecordsPerFrame,
-                    graceFactor,
-                    new int[] { 0 },
-                    new int[] { 1 },
-                    new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
-                            .of(UTF8StringPointable.FACTORY) },
+        } else if ("hybrid".equalsIgnoreCase(algo)) {
+            join = new OptimizedHybridHashJoinOperatorDescriptor(spec, memSize, graceInputSize, graceFactor,
+                    new int[] { 0 }, new int[] { 1 },
+                    new IBinaryHashFunctionFamily[] { UTF8StringBinaryHashFunctionFamily.INSTANCE },
+                    new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
+                    Common.custOrderJoinDesc,
+                    new JoinComparatorFactory(PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), 0, 1),
+                    new JoinComparatorFactory(PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), 1, 0),
+                    null);
+
+        } else if ("grace".equalsIgnoreCase(algo)) {
+            join = new GraceHashJoinOperatorDescriptor(spec, memSize, graceInputSize, graceRecordsPerFrame, graceFactor,
+                    new int[] { 0 }, new int[] { 1 },
+                    new IBinaryHashFunctionFactory[] {
+                            PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) },
                     new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
                     Common.custOrderJoinDesc, null);
 
         } else {
-            join = new InMemoryHashJoinOperatorDescriptor(
-                    spec,
-                    new int[] { 0 },
-                    new int[] { 1 },
-                    new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
-                            .of(UTF8StringPointable.FACTORY) },
-                    new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
-                    Common.custOrderJoinDesc, 6000000, null);
+            System.err.println("unknown algorithm:" + algo);
+            return null;
         }
 
         PartitionConstraintHelper.addPartitionCountConstraint(spec, join, numJoinPartitions);
 
         IConnectorDescriptor ordJoinConn = new MToNPartitioningConnectorDescriptor(spec,
-                new FieldHashPartitionComputerFactory(new int[] { 1 },
-                        new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
-                                .of(UTF8StringPointable.FACTORY) }));
+                new FieldHashPartitionComputerFactory(new int[] { 1 }, new IBinaryHashFunctionFactory[] {
+                        PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) }));
         spec.connect(ordJoinConn, ordScanner, 0, join, 1);
 
         IConnectorDescriptor custJoinConn = new MToNPartitioningConnectorDescriptor(spec,
-                new FieldHashPartitionComputerFactory(new int[] { 0 },
-                        new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
-                                .of(UTF8StringPointable.FACTORY) }));
+                new FieldHashPartitionComputerFactory(new int[] { 0 }, new IBinaryHashFunctionFactory[] {
+                        PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) }));
         spec.connect(custJoinConn, custScanner, 0, join, 0);
 
         IOperatorDescriptor endingOp = join;
@@ -231,29 +228,33 @@
             RecordDescriptor groupResultDesc = new RecordDescriptor(new ISerializerDeserializer[] {
                     new UTF8StringSerializerDeserializer(), IntegerSerializerDeserializer.INSTANCE });
 
-            HashGroupOperatorDescriptor gby = new HashGroupOperatorDescriptor(
-                    spec,
-                    new int[] { 6 },
-                    new FieldHashPartitionComputerFactory(new int[] { 6 },
-                            new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
-                                    .of(UTF8StringPointable.FACTORY) }),
+            ExternalGroupOperatorDescriptor gby = new ExternalGroupOperatorDescriptor(spec, tableSize,
+                    custFileSize + orderFileSize, new int[] { 6 }, memSize,
                     new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
-                    new MultiFieldsAggregatorFactory(
-                            new IFieldAggregateDescriptorFactory[] { new CountFieldAggregatorFactory(true) }),
-                    groupResultDesc, 16);
+                    new UTF8StringNormalizedKeyComputerFactory(),
+                    new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] {
+                            new IntSumFieldAggregatorFactory(1, false), new IntSumFieldAggregatorFactory(3, false),
+                            new FloatSumFieldAggregatorFactory(5, false) }),
+                    new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] {
+                            new IntSumFieldAggregatorFactory(1, false), new IntSumFieldAggregatorFactory(2, false),
+                            new FloatSumFieldAggregatorFactory(3, false) }),
+                    groupResultDesc, groupResultDesc, new HashSpillableTableFactory(
+                            new IBinaryHashFunctionFamily[] { UTF8StringBinaryHashFunctionFamily.INSTANCE }));
+
             createPartitionConstraint(spec, gby, resultSplits);
 
             IConnectorDescriptor joinGroupConn = new MToNPartitioningConnectorDescriptor(spec,
-                    new FieldHashPartitionComputerFactory(new int[] { 6 },
-                            new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
-                                    .of(UTF8StringPointable.FACTORY) }));
+                    new FieldHashPartitionComputerFactory(new int[] { 6 }, new IBinaryHashFunctionFactory[] {
+                            PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) }));
             spec.connect(joinGroupConn, join, 0, gby, 0);
 
             endingOp = gby;
         }
 
         IFileSplitProvider outSplitProvider = new ConstantFileSplitProvider(resultSplits);
-        FrameFileWriterOperatorDescriptor writer = new FrameFileWriterOperatorDescriptor(spec, outSplitProvider);
+        //FrameFileWriterOperatorDescriptor writer = new FrameFileWriterOperatorDescriptor(spec, outSplitProvider);
+        IOperatorDescriptor writer = new PlainFileWriterOperatorDescriptor(spec, outSplitProvider, "|");
+
         createPartitionConstraint(spec, writer, resultSplits);
 
         IConnectorDescriptor endingPrinterConn = new OneToOneConnectorDescriptor(spec);
@@ -262,63 +263,4 @@
         spec.addRoot(writer);
         return spec;
     }
-
-
-
-    static class JoinComparatorFactory implements ITuplePairComparatorFactory {
-        private static final long serialVersionUID = 1L;
-
-        private final IBinaryComparatorFactory bFactory;
-        private final int pos0;
-        private final int pos1;
-
-        public JoinComparatorFactory(IBinaryComparatorFactory bFactory, int pos0, int pos1) {
-            this.bFactory = bFactory;
-            this.pos0 = pos0;
-            this.pos1 = pos1;
-        }
-
-        @Override
-        public ITuplePairComparator createTuplePairComparator(IHyracksTaskContext ctx) {
-            return new JoinComparator(bFactory.createBinaryComparator(), pos0, pos1);
-        }
-    }
-
-    static class JoinComparator implements ITuplePairComparator {
-
-        private final IBinaryComparator bComparator;
-        private final int field0;
-        private final int field1;
-
-        public JoinComparator(IBinaryComparator bComparator, int field0, int field1) {
-            this.bComparator = bComparator;
-            this.field0 = field0;
-            this.field1 = field1;
-        }
-
-        @Override
-        public int compare(IFrameTupleAccessor accessor0, int tIndex0, IFrameTupleAccessor accessor1, int tIndex1)
-                throws HyracksDataException {
-            int tStart0 = accessor0.getTupleStartOffset(tIndex0);
-            int fStartOffset0 = accessor0.getFieldSlotsLength() + tStart0;
-
-            int tStart1 = accessor1.getTupleStartOffset(tIndex1);
-            int fStartOffset1 = accessor1.getFieldSlotsLength() + tStart1;
-
-            int fStart0 = accessor0.getFieldStartOffset(tIndex0, field0);
-            int fEnd0 = accessor0.getFieldEndOffset(tIndex0, field0);
-            int fLen0 = fEnd0 - fStart0;
-
-            int fStart1 = accessor1.getFieldStartOffset(tIndex1, field1);
-            int fEnd1 = accessor1.getFieldEndOffset(tIndex1, field1);
-            int fLen1 = fEnd1 - fStart1;
-
-            int c = bComparator.compare(accessor0.getBuffer().array(), fStart0 + fStartOffset0, fLen0, accessor1
-                    .getBuffer().array(), fStart1 + fStartOffset1, fLen1);
-            if (c != 0) {
-                return c;
-            }
-            return 0;
-        }
-    }
 }
diff --git a/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Sort.java b/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Sort.java
index f2dd519..28e6dd9 100644
--- a/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Sort.java
+++ b/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Sort.java
@@ -42,6 +42,7 @@
 import org.apache.hyracks.data.std.primitive.UTF8StringPointable;
 import org.apache.hyracks.dataflow.common.data.normalizers.UTF8StringNormalizedKeyComputerFactory;
 import org.apache.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
+import org.apache.hyracks.dataflow.std.buffermanager.EnumFreeSlotPolicy;
 import org.apache.hyracks.dataflow.std.connectors.MToNPartitioningMergingConnectorDescriptor;
 import org.apache.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
 import org.apache.hyracks.dataflow.std.file.ConstantFileSplitProvider;
@@ -54,7 +55,6 @@
 import org.apache.hyracks.dataflow.std.sort.Algorithm;
 import org.apache.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
 import org.apache.hyracks.dataflow.std.sort.TopKSorterOperatorDescriptor;
-import org.apache.hyracks.dataflow.std.sort.buffermanager.EnumFreeSlotPolicy;
 
 public class Sort {
     private static class Options {