Bug fixes: 
- external aggregation integration test: fixed the temporal path generator;
- utf-8 compiling failure of storage-am-invertedindex-test on mac.

Features improvement:
- tpch-join client: added algorithm selector "-algo"; added parameters for hybrid/grace-hash operators; added a parameter "-has-groupby" to control whether group-by is performed after the join. 

git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_dev_next@832 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/spillable/ExternalAggregateTest.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/spillable/ExternalAggregateTest.java
index 2f66bfd..a134a39 100644
--- a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/spillable/ExternalAggregateTest.java
+++ b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/spillable/ExternalAggregateTest.java
@@ -15,6 +15,7 @@
 package edu.uci.ics.hyracks.tests.spillable;
 
 import java.io.File;
+import java.io.IOException;
 
 import org.junit.Test;
 
@@ -59,61 +60,65 @@
 import edu.uci.ics.hyracks.tests.integration.AbstractIntegrationTest;
 
 /**
- * @author jarodwen
+ * 
  */
 public class ExternalAggregateTest extends AbstractIntegrationTest {
 
-    final IFileSplitProvider splitProvider = new ConstantFileSplitProvider(new FileSplit[] { new FileSplit(NC2_ID,
-            new FileReference(new File("data/tpch0.001/lineitem.tbl"))) });
-
-    static final String outSplitsPrefix = System.getProperty("java.io.tmpdir");
-
-    static final String outSplits1 = "nc1:" + outSplitsPrefix + "/aggregation_";
-    static final String outSplits2 = "nc2:" + outSplitsPrefix + "/aggregation_";
+    final IFileSplitProvider splitProvider = new ConstantFileSplitProvider(
+            new FileSplit[] { new FileSplit(NC2_ID, new FileReference(new File(
+                    "data/tpch0.001/lineitem.tbl"))) });
 
     static final boolean isOutputFile = true;
 
-    final RecordDescriptor desc = new RecordDescriptor(new ISerializerDeserializer[] {
-            UTF8StringSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
-            IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
-            IntegerSerializerDeserializer.INSTANCE, FloatSerializerDeserializer.INSTANCE,
-            FloatSerializerDeserializer.INSTANCE, FloatSerializerDeserializer.INSTANCE,
-            UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
-            UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
-            UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
-            UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });
+    final RecordDescriptor desc = new RecordDescriptor(
+            new ISerializerDeserializer[] {
+                    UTF8StringSerializerDeserializer.INSTANCE,
+                    IntegerSerializerDeserializer.INSTANCE,
+                    IntegerSerializerDeserializer.INSTANCE,
+                    IntegerSerializerDeserializer.INSTANCE,
+                    IntegerSerializerDeserializer.INSTANCE,
+                    FloatSerializerDeserializer.INSTANCE,
+                    FloatSerializerDeserializer.INSTANCE,
+                    FloatSerializerDeserializer.INSTANCE,
+                    UTF8StringSerializerDeserializer.INSTANCE,
+                    UTF8StringSerializerDeserializer.INSTANCE,
+                    UTF8StringSerializerDeserializer.INSTANCE,
+                    UTF8StringSerializerDeserializer.INSTANCE,
+                    UTF8StringSerializerDeserializer.INSTANCE,
+                    UTF8StringSerializerDeserializer.INSTANCE,
+                    UTF8StringSerializerDeserializer.INSTANCE,
+                    UTF8StringSerializerDeserializer.INSTANCE });
 
-    final ITupleParserFactory tupleParserFactory = new DelimitedDataTupleParserFactory(new IValueParserFactory[] {
-            UTF8StringParserFactory.INSTANCE, IntegerParserFactory.INSTANCE, IntegerParserFactory.INSTANCE,
-            IntegerParserFactory.INSTANCE, IntegerParserFactory.INSTANCE, FloatParserFactory.INSTANCE,
-            FloatParserFactory.INSTANCE, FloatParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-            UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-            UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-            UTF8StringParserFactory.INSTANCE, }, '|');
+    final ITupleParserFactory tupleParserFactory = new DelimitedDataTupleParserFactory(
+            new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
+                    IntegerParserFactory.INSTANCE,
+                    IntegerParserFactory.INSTANCE,
+                    IntegerParserFactory.INSTANCE,
+                    IntegerParserFactory.INSTANCE, FloatParserFactory.INSTANCE,
+                    FloatParserFactory.INSTANCE, FloatParserFactory.INSTANCE,
+                    UTF8StringParserFactory.INSTANCE,
+                    UTF8StringParserFactory.INSTANCE,
+                    UTF8StringParserFactory.INSTANCE,
+                    UTF8StringParserFactory.INSTANCE,
+                    UTF8StringParserFactory.INSTANCE,
+                    UTF8StringParserFactory.INSTANCE,
+                    UTF8StringParserFactory.INSTANCE,
+                    UTF8StringParserFactory.INSTANCE, }, '|');
 
-    private static FileSplit[] parseFileSplits(String fileSplits) {
-        String[] splits = fileSplits.split(",");
-        FileSplit[] fSplits = new FileSplit[splits.length];
-        for (int i = 0; i < splits.length; ++i) {
-            String s = splits[i].trim();
-            int idx = s.indexOf(':');
-            if (idx < 0) {
-                throw new IllegalArgumentException("File split " + s + " not well formed");
-            }
-            fSplits[i] = new FileSplit(s.substring(0, idx), new FileReference(new File(s.substring(idx + 1))));
-        }
-        return fSplits;
-    }
-
-    private static AbstractSingleActivityOperatorDescriptor getPrinter(JobSpecification spec, boolean isFile,
-            String prefix) {
+    private AbstractSingleActivityOperatorDescriptor getPrinter(
+            JobSpecification spec, boolean isFile, String prefix)
+            throws IOException {
         AbstractSingleActivityOperatorDescriptor printer;
 
         if (!isOutputFile)
             printer = new PrinterOperatorDescriptor(spec);
         else
-            printer = new PlainFileWriterOperatorDescriptor(spec, new ConstantFileSplitProvider(
-                    parseFileSplits(outSplits1 + prefix + ".nc1, " + outSplits2 + prefix + ".nc2")), "\t");
+            printer = new PlainFileWriterOperatorDescriptor(spec,
+                    new ConstantFileSplitProvider(new FileSplit[] {
+                            new FileSplit(NC1_ID, createTempFile()
+                                    .getAbsolutePath()),
+                            new FileSplit(NC2_ID, createTempFile()
+                                    .getAbsolutePath()) }), "\t");
 
         return printer;
     }
@@ -122,37 +127,51 @@
     public void hashSingleKeyScalarGroupTest() throws Exception {
         JobSpecification spec = new JobSpecification();
 
-        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(spec, splitProvider, tupleParserFactory,
-                desc);
+        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
+                spec, splitProvider, tupleParserFactory, desc);
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, csvScanner, NC2_ID);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
+                csvScanner, NC2_ID);
 
-        RecordDescriptor outputRec = new RecordDescriptor(new ISerializerDeserializer[] {
-                UTF8StringSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE });
+        RecordDescriptor outputRec = new RecordDescriptor(
+                new ISerializerDeserializer[] {
+                        UTF8StringSerializerDeserializer.INSTANCE,
+                        IntegerSerializerDeserializer.INSTANCE });
 
         int[] keyFields = new int[] { 0 };
         int frameLimits = 3;
         int tableSize = 8;
 
-        ExternalGroupOperatorDescriptor grouper = new ExternalGroupOperatorDescriptor(spec, keyFields, frameLimits,
+        ExternalGroupOperatorDescriptor grouper = new ExternalGroupOperatorDescriptor(
+                spec,
+                keyFields,
+                frameLimits,
                 new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
-                new UTF8StringNormalizedKeyComputerFactory(), new CountAggregatorDescriptorFactory(),
-                new IntSumAggregatorDescriptorFactory(keyFields.length), outputRec,
-                new HashSpillableGroupingTableFactory(new FieldHashPartitionComputerFactory(keyFields,
-                        new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }), tableSize),
-                true);
+                new UTF8StringNormalizedKeyComputerFactory(),
+                new CountAggregatorDescriptorFactory(),
+                new IntSumAggregatorDescriptorFactory(keyFields.length),
+                outputRec,
+                new HashSpillableGroupingTableFactory(
+                        new FieldHashPartitionComputerFactory(
+                                keyFields,
+                                new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }),
+                        tableSize), true);
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper, NC2_ID, NC1_ID);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
+                NC2_ID, NC1_ID);
 
-        IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(spec,
-                new FieldHashPartitionComputerFactory(keyFields,
+        IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
+                spec,
+                new FieldHashPartitionComputerFactory(
+                        keyFields,
                         new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
         spec.connect(conn1, csvScanner, 0, grouper, 0);
 
-        AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec, isOutputFile,
-                "hashSingleKeyScalarGroupTest");
+        AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
+                isOutputFile, "hashSingleKeyScalarGroupTest");
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC2_ID, NC1_ID);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
+                NC2_ID, NC1_ID);
 
         IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
         spec.connect(conn2, grouper, 0, printer, 0);
@@ -165,37 +184,56 @@
     public void hashMultipleKeyScalarGroupTest() throws Exception {
         JobSpecification spec = new JobSpecification();
 
-        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(spec, splitProvider, tupleParserFactory,
-                desc);
+        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
+                spec, splitProvider, tupleParserFactory, desc);
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, csvScanner, NC2_ID);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
+                csvScanner, NC2_ID);
 
-        RecordDescriptor outputRec = new RecordDescriptor(new ISerializerDeserializer[] {
-                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
-                IntegerSerializerDeserializer.INSTANCE, });
+        RecordDescriptor outputRec = new RecordDescriptor(
+                new ISerializerDeserializer[] {
+                        UTF8StringSerializerDeserializer.INSTANCE,
+                        UTF8StringSerializerDeserializer.INSTANCE,
+                        IntegerSerializerDeserializer.INSTANCE, });
 
         int[] keyFields = new int[] { 0, 9 };
         int frameLimits = 3;
         int tableSize = 8;
 
-        ExternalGroupOperatorDescriptor grouper = new ExternalGroupOperatorDescriptor(spec, keyFields, frameLimits,
-                new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE,
-                        UTF8StringBinaryComparatorFactory.INSTANCE }, new UTF8StringNormalizedKeyComputerFactory(),
-                new IntSumAggregatorDescriptorFactory(1), new IntSumAggregatorDescriptorFactory(keyFields.length),
-                outputRec, new HashSpillableGroupingTableFactory(new FieldHashPartitionComputerFactory(keyFields,
-                        new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE,
-                                UTF8StringBinaryHashFunctionFactory.INSTANCE }), tableSize), true);
+        ExternalGroupOperatorDescriptor grouper = new ExternalGroupOperatorDescriptor(
+                spec,
+                keyFields,
+                frameLimits,
+                new IBinaryComparatorFactory[] {
+                        UTF8StringBinaryComparatorFactory.INSTANCE,
+                        UTF8StringBinaryComparatorFactory.INSTANCE },
+                new UTF8StringNormalizedKeyComputerFactory(),
+                new IntSumAggregatorDescriptorFactory(1),
+                new IntSumAggregatorDescriptorFactory(keyFields.length),
+                outputRec,
+                new HashSpillableGroupingTableFactory(
+                        new FieldHashPartitionComputerFactory(
+                                keyFields,
+                                new IBinaryHashFunctionFactory[] {
+                                        UTF8StringBinaryHashFunctionFactory.INSTANCE,
+                                        UTF8StringBinaryHashFunctionFactory.INSTANCE }),
+                        tableSize), true);
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper, NC2_ID, NC1_ID);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
+                NC2_ID, NC1_ID);
 
-        IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(spec,
-                new FieldHashPartitionComputerFactory(keyFields, new IBinaryHashFunctionFactory[] {
-                        UTF8StringBinaryHashFunctionFactory.INSTANCE, UTF8StringBinaryHashFunctionFactory.INSTANCE, }));
+        IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
+                spec,
+                new FieldHashPartitionComputerFactory(keyFields,
+                        new IBinaryHashFunctionFactory[] {
+                                UTF8StringBinaryHashFunctionFactory.INSTANCE,
+                                UTF8StringBinaryHashFunctionFactory.INSTANCE, }));
         spec.connect(conn1, csvScanner, 0, grouper, 0);
 
-        AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec, isOutputFile,
-                "hashMultipleKeyScalarGroupTest");
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC2_ID, NC1_ID);
+        AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
+                isOutputFile, "hashMultipleKeyScalarGroupTest");
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
+                NC2_ID, NC1_ID);
 
         IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
         spec.connect(conn2, grouper, 0, printer, 0);
@@ -208,40 +246,63 @@
     public void hashMultipleKeyMultipleScalarGroupTest() throws Exception {
         JobSpecification spec = new JobSpecification();
 
-        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(spec, splitProvider, tupleParserFactory,
-                desc);
+        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
+                spec, splitProvider, tupleParserFactory, desc);
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, csvScanner, NC2_ID);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
+                csvScanner, NC2_ID);
 
-        RecordDescriptor outputRec = new RecordDescriptor(new ISerializerDeserializer[] {
-                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
-                IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE, });
+        RecordDescriptor outputRec = new RecordDescriptor(
+                new ISerializerDeserializer[] {
+                        UTF8StringSerializerDeserializer.INSTANCE,
+                        UTF8StringSerializerDeserializer.INSTANCE,
+                        IntegerSerializerDeserializer.INSTANCE,
+                        IntegerSerializerDeserializer.INSTANCE, });
 
         int[] keyFields = new int[] { 0, 9 };
         int frameLimits = 3;
         int tableSize = 8;
 
-        ExternalGroupOperatorDescriptor grouper = new ExternalGroupOperatorDescriptor(spec, keyFields, frameLimits,
-                new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE,
-                        UTF8StringBinaryComparatorFactory.INSTANCE }, new UTF8StringNormalizedKeyComputerFactory(),
-                new MultiAggregatorDescriptorFactory(new IAggregatorDescriptorFactory[] {
-                        new IntSumAggregatorDescriptorFactory(1, 2), new IntSumAggregatorDescriptorFactory(2, 3) }),
-                new MultiAggregatorDescriptorFactory(new IAggregatorDescriptorFactory[] {
-                        new IntSumAggregatorDescriptorFactory(2, 2), new IntSumAggregatorDescriptorFactory(3, 3) }),
-                outputRec, new HashSpillableGroupingTableFactory(new FieldHashPartitionComputerFactory(keyFields,
-                        new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE,
-                                UTF8StringBinaryHashFunctionFactory.INSTANCE }), tableSize), true);
+        ExternalGroupOperatorDescriptor grouper = new ExternalGroupOperatorDescriptor(
+                spec,
+                keyFields,
+                frameLimits,
+                new IBinaryComparatorFactory[] {
+                        UTF8StringBinaryComparatorFactory.INSTANCE,
+                        UTF8StringBinaryComparatorFactory.INSTANCE },
+                new UTF8StringNormalizedKeyComputerFactory(),
+                new MultiAggregatorDescriptorFactory(
+                        new IAggregatorDescriptorFactory[] {
+                                new IntSumAggregatorDescriptorFactory(1, 2),
+                                new IntSumAggregatorDescriptorFactory(2, 3) }),
+                new MultiAggregatorDescriptorFactory(
+                        new IAggregatorDescriptorFactory[] {
+                                new IntSumAggregatorDescriptorFactory(2, 2),
+                                new IntSumAggregatorDescriptorFactory(3, 3) }),
+                outputRec,
+                new HashSpillableGroupingTableFactory(
+                        new FieldHashPartitionComputerFactory(
+                                keyFields,
+                                new IBinaryHashFunctionFactory[] {
+                                        UTF8StringBinaryHashFunctionFactory.INSTANCE,
+                                        UTF8StringBinaryHashFunctionFactory.INSTANCE }),
+                        tableSize), true);
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper, NC2_ID, NC1_ID);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
+                NC2_ID, NC1_ID);
 
-        IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(spec,
-                new FieldHashPartitionComputerFactory(keyFields, new IBinaryHashFunctionFactory[] {
-                        UTF8StringBinaryHashFunctionFactory.INSTANCE, UTF8StringBinaryHashFunctionFactory.INSTANCE, }));
+        IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
+                spec,
+                new FieldHashPartitionComputerFactory(keyFields,
+                        new IBinaryHashFunctionFactory[] {
+                                UTF8StringBinaryHashFunctionFactory.INSTANCE,
+                                UTF8StringBinaryHashFunctionFactory.INSTANCE, }));
         spec.connect(conn1, csvScanner, 0, grouper, 0);
 
-        AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec, isOutputFile,
-                "hashMultipleKeyMultipleScalarGroupTest");
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC2_ID, NC1_ID);
+        AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
+                isOutputFile, "hashMultipleKeyMultipleScalarGroupTest");
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
+                NC2_ID, NC1_ID);
 
         IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
         spec.connect(conn2, grouper, 0, printer, 0);
@@ -254,36 +315,50 @@
     public void hashMultipleKeyNonScalarGroupTest() throws Exception {
         JobSpecification spec = new JobSpecification();
 
-        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(spec, splitProvider, tupleParserFactory,
-                desc);
+        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
+                spec, splitProvider, tupleParserFactory, desc);
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, csvScanner, NC2_ID);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
+                csvScanner, NC2_ID);
 
-        RecordDescriptor outputRec = new RecordDescriptor(new ISerializerDeserializer[] {
-                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });
+        RecordDescriptor outputRec = new RecordDescriptor(
+                new ISerializerDeserializer[] {
+                        UTF8StringSerializerDeserializer.INSTANCE,
+                        UTF8StringSerializerDeserializer.INSTANCE });
 
         int[] keyFields = new int[] { 0 };
         int frameLimits = 3;
         int tableSize = 8;
 
-        ExternalGroupOperatorDescriptor grouper = new ExternalGroupOperatorDescriptor(spec, keyFields, frameLimits,
+        ExternalGroupOperatorDescriptor grouper = new ExternalGroupOperatorDescriptor(
+                spec,
+                keyFields,
+                frameLimits,
                 new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
-                new UTF8StringNormalizedKeyComputerFactory(), new ConcatAggregatorDescriptorFactory(9),
-                new ConcatAggregatorDescriptorFactory(keyFields.length), outputRec,
-                new HashSpillableGroupingTableFactory(new FieldHashPartitionComputerFactory(keyFields,
-                        new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }), tableSize),
-                true);
+                new UTF8StringNormalizedKeyComputerFactory(),
+                new ConcatAggregatorDescriptorFactory(9),
+                new ConcatAggregatorDescriptorFactory(keyFields.length),
+                outputRec,
+                new HashSpillableGroupingTableFactory(
+                        new FieldHashPartitionComputerFactory(
+                                keyFields,
+                                new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }),
+                        tableSize), true);
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper, NC2_ID, NC1_ID);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
+                NC2_ID, NC1_ID);
 
-        IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(spec,
-                new FieldHashPartitionComputerFactory(keyFields,
+        IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
+                spec,
+                new FieldHashPartitionComputerFactory(
+                        keyFields,
                         new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
         spec.connect(conn1, csvScanner, 0, grouper, 0);
 
-        AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec, isOutputFile,
-                "hashMultipleKeyNonScalarGroupTest");
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC2_ID, NC1_ID);
+        AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
+                isOutputFile, "hashMultipleKeyNonScalarGroupTest");
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
+                NC2_ID, NC1_ID);
 
         IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
         spec.connect(conn2, grouper, 0, printer, 0);
@@ -296,43 +371,66 @@
     public void hashMultipleKeyMultipleFieldsGroupTest() throws Exception {
         JobSpecification spec = new JobSpecification();
 
-        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(spec, splitProvider, tupleParserFactory,
-                desc);
+        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
+                spec, splitProvider, tupleParserFactory, desc);
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, csvScanner, NC2_ID);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
+                csvScanner, NC2_ID);
 
-        RecordDescriptor outputRec = new RecordDescriptor(new ISerializerDeserializer[] {
-                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
-                IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
-                UTF8StringSerializerDeserializer.INSTANCE });
+        RecordDescriptor outputRec = new RecordDescriptor(
+                new ISerializerDeserializer[] {
+                        UTF8StringSerializerDeserializer.INSTANCE,
+                        UTF8StringSerializerDeserializer.INSTANCE,
+                        IntegerSerializerDeserializer.INSTANCE,
+                        IntegerSerializerDeserializer.INSTANCE,
+                        UTF8StringSerializerDeserializer.INSTANCE });
 
         int[] keyFields = new int[] { 0, 9 };
         int frameLimits = 3;
         int tableSize = 8;
 
-        ExternalGroupOperatorDescriptor grouper = new ExternalGroupOperatorDescriptor(spec, keyFields, frameLimits,
-                new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE,
-                        UTF8StringBinaryComparatorFactory.INSTANCE }, new UTF8StringNormalizedKeyComputerFactory(),
-                new MultiAggregatorDescriptorFactory(new IAggregatorDescriptorFactory[] {
-                        new IntSumAggregatorDescriptorFactory(1, 2), new IntSumAggregatorDescriptorFactory(2, 3),
-                        new ConcatAggregatorDescriptorFactory(9, 4) }), new MultiAggregatorDescriptorFactory(
-                        new IAggregatorDescriptorFactory[] { new IntSumAggregatorDescriptorFactory(2, 2),
+        ExternalGroupOperatorDescriptor grouper = new ExternalGroupOperatorDescriptor(
+                spec,
+                keyFields,
+                frameLimits,
+                new IBinaryComparatorFactory[] {
+                        UTF8StringBinaryComparatorFactory.INSTANCE,
+                        UTF8StringBinaryComparatorFactory.INSTANCE },
+                new UTF8StringNormalizedKeyComputerFactory(),
+                new MultiAggregatorDescriptorFactory(
+                        new IAggregatorDescriptorFactory[] {
+                                new IntSumAggregatorDescriptorFactory(1, 2),
+                                new IntSumAggregatorDescriptorFactory(2, 3),
+                                new ConcatAggregatorDescriptorFactory(9, 4) }),
+                new MultiAggregatorDescriptorFactory(
+                        new IAggregatorDescriptorFactory[] {
+                                new IntSumAggregatorDescriptorFactory(2, 2),
                                 new IntSumAggregatorDescriptorFactory(3, 3),
-                                new ConcatAggregatorDescriptorFactory(4, 4) }), outputRec,
-                new HashSpillableGroupingTableFactory(new FieldHashPartitionComputerFactory(keyFields,
-                        new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE,
-                                UTF8StringBinaryHashFunctionFactory.INSTANCE }), tableSize), true);
+                                new ConcatAggregatorDescriptorFactory(4, 4) }),
+                outputRec,
+                new HashSpillableGroupingTableFactory(
+                        new FieldHashPartitionComputerFactory(
+                                keyFields,
+                                new IBinaryHashFunctionFactory[] {
+                                        UTF8StringBinaryHashFunctionFactory.INSTANCE,
+                                        UTF8StringBinaryHashFunctionFactory.INSTANCE }),
+                        tableSize), true);
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper, NC2_ID, NC1_ID);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
+                NC2_ID, NC1_ID);
 
-        IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(spec,
-                new FieldHashPartitionComputerFactory(keyFields, new IBinaryHashFunctionFactory[] {
-                        UTF8StringBinaryHashFunctionFactory.INSTANCE, UTF8StringBinaryHashFunctionFactory.INSTANCE, }));
+        IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
+                spec,
+                new FieldHashPartitionComputerFactory(keyFields,
+                        new IBinaryHashFunctionFactory[] {
+                                UTF8StringBinaryHashFunctionFactory.INSTANCE,
+                                UTF8StringBinaryHashFunctionFactory.INSTANCE, }));
         spec.connect(conn1, csvScanner, 0, grouper, 0);
 
-        AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec, isOutputFile,
-                "hashMultipleKeyMultipleFieldsGroupTest");
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC2_ID, NC1_ID);
+        AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
+                isOutputFile, "hashMultipleKeyMultipleFieldsGroupTest");
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
+                NC2_ID, NC1_ID);
 
         IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
         spec.connect(conn2, grouper, 0, printer, 0);
@@ -345,37 +443,51 @@
     public void hashSingleKeyScalarAvgGroupTest() throws Exception {
         JobSpecification spec = new JobSpecification();
 
-        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(spec, splitProvider, tupleParserFactory,
-                desc);
+        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
+                spec, splitProvider, tupleParserFactory, desc);
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, csvScanner, NC2_ID);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
+                csvScanner, NC2_ID);
 
-        RecordDescriptor outputRec = new RecordDescriptor(new ISerializerDeserializer[] {
-                UTF8StringSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE });
+        RecordDescriptor outputRec = new RecordDescriptor(
+                new ISerializerDeserializer[] {
+                        UTF8StringSerializerDeserializer.INSTANCE,
+                        IntegerSerializerDeserializer.INSTANCE });
 
         int[] keyFields = new int[] { 0 };
         int frameLimits = 3;
         int tableSize = 8;
 
-        ExternalGroupOperatorDescriptor grouper = new ExternalGroupOperatorDescriptor(spec, keyFields, frameLimits,
+        ExternalGroupOperatorDescriptor grouper = new ExternalGroupOperatorDescriptor(
+                spec,
+                keyFields,
+                frameLimits,
                 new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
-                new UTF8StringNormalizedKeyComputerFactory(), new AvgAggregatorDescriptorFactory(1),
-                new AvgAggregatorDescriptorFactory(keyFields.length), outputRec, new HashSpillableGroupingTableFactory(
-                        new FieldHashPartitionComputerFactory(keyFields,
+                new UTF8StringNormalizedKeyComputerFactory(),
+                new AvgAggregatorDescriptorFactory(1),
+                new AvgAggregatorDescriptorFactory(keyFields.length),
+                outputRec,
+                new HashSpillableGroupingTableFactory(
+                        new FieldHashPartitionComputerFactory(
+                                keyFields,
                                 new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }),
                         tableSize), true);
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper, NC2_ID, NC1_ID);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
+                NC2_ID, NC1_ID);
 
-        IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(spec,
-                new FieldHashPartitionComputerFactory(keyFields,
+        IConnectorDescriptor conn1 = new MToNPartitioningConnectorDescriptor(
+                spec,
+                new FieldHashPartitionComputerFactory(
+                        keyFields,
                         new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
         spec.connect(conn1, csvScanner, 0, grouper, 0);
 
-        AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec, isOutputFile,
-                "hashSingleKeyScalarGroupTest");
+        AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
+                isOutputFile, "hashSingleKeyScalarGroupTest");
 
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC2_ID, NC1_ID);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
+                NC2_ID, NC1_ID);
 
         IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
         spec.connect(conn2, grouper, 0, printer, 0);
diff --git a/hyracks-examples/tpch-example/tpchclient/src/main/java/edu/uci/ics/hyracks/examples/tpch/client/Main.java b/hyracks-examples/tpch-example/tpchclient/src/main/java/edu/uci/ics/hyracks/examples/tpch/client/Main.java
index 09d0e5a..a43aafb 100644
--- a/hyracks-examples/tpch-example/tpchclient/src/main/java/edu/uci/ics/hyracks/examples/tpch/client/Main.java
+++ b/hyracks-examples/tpch-example/tpchclient/src/main/java/edu/uci/ics/hyracks/examples/tpch/client/Main.java
@@ -22,13 +22,18 @@
 
 import edu.uci.ics.hyracks.api.client.HyracksRMIConnection;
 import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
 import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
 import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
 import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePairComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePairComparatorFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.io.FileReference;
 import edu.uci.ics.hyracks.api.job.JobFlag;
 import edu.uci.ics.hyracks.api.job.JobId;
@@ -52,7 +57,10 @@
 import edu.uci.ics.hyracks.dataflow.std.file.FrameFileWriterOperatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
 import edu.uci.ics.hyracks.dataflow.std.group.HashGroupOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.join.GraceHashJoinOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.join.HybridHashJoinOperatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.join.InMemoryHashJoinOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.join.NestedLoopJoinOperatorDescriptor;
 
 public class Main {
     private static class Options {
@@ -79,6 +87,29 @@
 
         @Option(name = "-profile", usage = "Enable/Disable profiling. (default: enabled)")
         public boolean profile = true;
+
+        @Option(name = "-table-size", usage = "Table size for in-memory hash join", required = false)
+        public int tableSize = 8191;
+
+        @Option(name = "-algo", usage = "Join types", required = true)
+        public String algo;
+
+        // For grace/hybrid hash join only
+        @Option(name = "-mem-size", usage = "Memory size for hash join", required = true)
+        public int memSize;
+
+        @Option(name = "-input-size", usage = "Input size of the grace/hybrid hash join", required = false)
+        public int graceInputSize = 10;
+
+        @Option(name = "-records-per-frame", usage = "Records per frame for grace/hybrid hash join", required = false)
+        public int graceRecordsPerFrame = 200;
+
+        @Option(name = "-grace-factor", usage = "Factor of the grace/hybrid hash join", required = false)
+        public double graceFactor = 1.2;
+
+        // Whether group-by is processed after the join
+        @Option(name = "-has-groupby", usage = "Whether to have group-by operation after join (default: disabled)", required = false)
+        public boolean hasGroupBy = false;
     }
 
     public static void main(String[] args) throws Exception {
@@ -86,15 +117,24 @@
         CmdLineParser parser = new CmdLineParser(options);
         parser.parseArgument(args);
 
-        IHyracksClientConnection hcc = new HyracksRMIConnection(options.host, options.port);
+        IHyracksClientConnection hcc = new HyracksRMIConnection(options.host,
+                options.port);
 
-        JobSpecification job = createJob(parseFileSplits(options.inFileCustomerSplits),
-                parseFileSplits(options.inFileOrderSplits), parseFileSplits(options.outFileSplits),
-                options.numJoinPartitions);
+        JobSpecification job = createJob(
+                parseFileSplits(options.inFileCustomerSplits),
+                parseFileSplits(options.inFileOrderSplits),
+                parseFileSplits(options.outFileSplits),
+                options.numJoinPartitions, options.algo,
+                options.graceInputSize, options.graceRecordsPerFrame,
+                options.graceFactor, options.memSize, options.tableSize,
+                options.hasGroupBy);
 
         long start = System.currentTimeMillis();
-        JobId jobId = hcc.createJob(options.app, job,
-                options.profile ? EnumSet.of(JobFlag.PROFILE_RUNTIME) : EnumSet.noneOf(JobFlag.class));
+        JobId jobId = hcc.createJob(
+                options.app,
+                job,
+                options.profile ? EnumSet.of(JobFlag.PROFILE_RUNTIME) : EnumSet
+                        .noneOf(JobFlag.class));
         hcc.start(jobId);
         hcc.waitForCompletion(jobId);
         long end = System.currentTimeMillis();
@@ -108,109 +148,271 @@
             String s = splits[i].trim();
             int idx = s.indexOf(':');
             if (idx < 0) {
-                throw new IllegalArgumentException("File split " + s + " not well formed");
+                throw new IllegalArgumentException("File split " + s
+                        + " not well formed");
             }
-            fSplits[i] = new FileSplit(s.substring(0, idx), new FileReference(new File(s.substring(idx + 1))));
+            fSplits[i] = new FileSplit(s.substring(0, idx), new FileReference(
+                    new File(s.substring(idx + 1))));
         }
         return fSplits;
     }
 
-    private static JobSpecification createJob(FileSplit[] customerSplits, FileSplit[] orderSplits,
-            FileSplit[] resultSplits, int numJoinPartitions) {
+    private static JobSpecification createJob(FileSplit[] customerSplits,
+            FileSplit[] orderSplits, FileSplit[] resultSplits,
+            int numJoinPartitions, String algo, int graceInputSize,
+            int graceRecordsPerFrame, double graceFactor, int memSize,
+            int tableSize, boolean hasGroupBy) throws HyracksDataException {
         JobSpecification spec = new JobSpecification();
 
-        IFileSplitProvider custSplitsProvider = new ConstantFileSplitProvider(customerSplits);
-        RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] {
-                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
-                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
-                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
-                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });
+        IFileSplitProvider custSplitsProvider = new ConstantFileSplitProvider(
+                customerSplits);
+        RecordDescriptor custDesc = new RecordDescriptor(
+                new ISerializerDeserializer[] {
+                        UTF8StringSerializerDeserializer.INSTANCE,
+                        UTF8StringSerializerDeserializer.INSTANCE,
+                        UTF8StringSerializerDeserializer.INSTANCE,
+                        UTF8StringSerializerDeserializer.INSTANCE,
+                        UTF8StringSerializerDeserializer.INSTANCE,
+                        UTF8StringSerializerDeserializer.INSTANCE,
+                        UTF8StringSerializerDeserializer.INSTANCE,
+                        UTF8StringSerializerDeserializer.INSTANCE });
 
-        IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(orderSplits);
-        RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] {
-                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
-                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
-                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
-                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
-                UTF8StringSerializerDeserializer.INSTANCE });
+        IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(
+                orderSplits);
+        RecordDescriptor ordersDesc = new RecordDescriptor(
+                new ISerializerDeserializer[] {
+                        UTF8StringSerializerDeserializer.INSTANCE,
+                        UTF8StringSerializerDeserializer.INSTANCE,
+                        UTF8StringSerializerDeserializer.INSTANCE,
+                        UTF8StringSerializerDeserializer.INSTANCE,
+                        UTF8StringSerializerDeserializer.INSTANCE,
+                        UTF8StringSerializerDeserializer.INSTANCE,
+                        UTF8StringSerializerDeserializer.INSTANCE,
+                        UTF8StringSerializerDeserializer.INSTANCE,
+                        UTF8StringSerializerDeserializer.INSTANCE });
 
-        RecordDescriptor custOrderJoinDesc = new RecordDescriptor(new ISerializerDeserializer[] {
-                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
-                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
-                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
-                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
-                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
-                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
-                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
-                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
-                UTF8StringSerializerDeserializer.INSTANCE });
+        RecordDescriptor custOrderJoinDesc = new RecordDescriptor(
+                new ISerializerDeserializer[] {
+                        UTF8StringSerializerDeserializer.INSTANCE,
+                        UTF8StringSerializerDeserializer.INSTANCE,
+                        UTF8StringSerializerDeserializer.INSTANCE,
+                        UTF8StringSerializerDeserializer.INSTANCE,
+                        UTF8StringSerializerDeserializer.INSTANCE,
+                        UTF8StringSerializerDeserializer.INSTANCE,
+                        UTF8StringSerializerDeserializer.INSTANCE,
+                        UTF8StringSerializerDeserializer.INSTANCE,
+                        UTF8StringSerializerDeserializer.INSTANCE,
+                        UTF8StringSerializerDeserializer.INSTANCE,
+                        UTF8StringSerializerDeserializer.INSTANCE,
+                        UTF8StringSerializerDeserializer.INSTANCE,
+                        UTF8StringSerializerDeserializer.INSTANCE,
+                        UTF8StringSerializerDeserializer.INSTANCE,
+                        UTF8StringSerializerDeserializer.INSTANCE,
+                        UTF8StringSerializerDeserializer.INSTANCE,
+                        UTF8StringSerializerDeserializer.INSTANCE });
 
-        FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitsProvider,
-                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'), ordersDesc);
+        FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(
+                spec, ordersSplitsProvider,
+                new DelimitedDataTupleParserFactory(new IValueParserFactory[] {
+                        UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE }, '|'), ordersDesc);
         createPartitionConstraint(spec, ordScanner, orderSplits);
 
-        FileScanOperatorDescriptor custScanner = new FileScanOperatorDescriptor(spec, custSplitsProvider,
-                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE }, '|'), custDesc);
+        FileScanOperatorDescriptor custScanner = new FileScanOperatorDescriptor(
+                spec, custSplitsProvider, new DelimitedDataTupleParserFactory(
+                        new IValueParserFactory[] {
+                                UTF8StringParserFactory.INSTANCE,
+                                UTF8StringParserFactory.INSTANCE,
+                                UTF8StringParserFactory.INSTANCE,
+                                UTF8StringParserFactory.INSTANCE,
+                                UTF8StringParserFactory.INSTANCE,
+                                UTF8StringParserFactory.INSTANCE,
+                                UTF8StringParserFactory.INSTANCE,
+                                UTF8StringParserFactory.INSTANCE }, '|'),
+                custDesc);
         createPartitionConstraint(spec, custScanner, customerSplits);
 
-        InMemoryHashJoinOperatorDescriptor join = new InMemoryHashJoinOperatorDescriptor(spec, new int[] { 0 },
-                new int[] { 1 }, new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE },
-                new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE }, custOrderJoinDesc,
-                6000000);
-        PartitionConstraintHelper.addPartitionCountConstraint(spec, join, numJoinPartitions);
+        IOperatorDescriptor join;
 
-        RecordDescriptor groupResultDesc = new RecordDescriptor(new ISerializerDeserializer[] {
-                UTF8StringSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE });
+        if ("nestedloop".equalsIgnoreCase(algo)) {
+            join = new NestedLoopJoinOperatorDescriptor(spec,
+                    new JoinComparatorFactory(
+                            UTF8StringBinaryComparatorFactory.INSTANCE, 0, 1),
+                    custOrderJoinDesc, memSize);
 
-        HashGroupOperatorDescriptor gby = new HashGroupOperatorDescriptor(
+        } else if ("gracehash".equalsIgnoreCase(algo)) {
+            join = new GraceHashJoinOperatorDescriptor(
+                    spec,
+                    memSize,
+                    graceInputSize,
+                    graceRecordsPerFrame,
+                    graceFactor,
+                    new int[] { 0 },
+                    new int[] { 1 },
+                    new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE },
+                    new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
+                    custOrderJoinDesc);
+
+        } else if ("hybridhash".equalsIgnoreCase(algo)) {
+            join = new HybridHashJoinOperatorDescriptor(
+                    spec,
+                    memSize,
+                    graceInputSize,
+                    graceRecordsPerFrame,
+                    graceFactor,
+                    new int[] { 0 },
+                    new int[] { 1 },
+                    new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE },
+                    new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
+                    custOrderJoinDesc);
+
+        } else {
+            join = new InMemoryHashJoinOperatorDescriptor(
+                    spec,
+                    new int[] { 0 },
+                    new int[] { 1 },
+                    new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE },
+                    new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
+                    custOrderJoinDesc, 6000000);
+        }
+
+        PartitionConstraintHelper.addPartitionCountConstraint(spec, join,
+                numJoinPartitions);
+
+        IConnectorDescriptor ordJoinConn = new MToNPartitioningConnectorDescriptor(
                 spec,
-                new int[] { 6 },
-                new FieldHashPartitionComputerFactory(new int[] { 6 },
-                        new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }),
-                new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
-                new MultiAggregatorFactory(new IFieldValueResultingAggregatorFactory[] { new CountAggregatorFactory() }),
-                groupResultDesc, 16);
-        createPartitionConstraint(spec, gby, resultSplits);
-
-        IFileSplitProvider outSplitProvider = new ConstantFileSplitProvider(resultSplits);
-        FrameFileWriterOperatorDescriptor writer = new FrameFileWriterOperatorDescriptor(spec, outSplitProvider);
-        createPartitionConstraint(spec, writer, resultSplits);
-
-        IConnectorDescriptor ordJoinConn = new MToNPartitioningConnectorDescriptor(spec,
-                new FieldHashPartitionComputerFactory(new int[] { 1 },
+                new FieldHashPartitionComputerFactory(
+                        new int[] { 1 },
                         new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
         spec.connect(ordJoinConn, ordScanner, 0, join, 1);
 
-        IConnectorDescriptor custJoinConn = new MToNPartitioningConnectorDescriptor(spec,
-                new FieldHashPartitionComputerFactory(new int[] { 0 },
+        IConnectorDescriptor custJoinConn = new MToNPartitioningConnectorDescriptor(
+                spec,
+                new FieldHashPartitionComputerFactory(
+                        new int[] { 0 },
                         new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
         spec.connect(custJoinConn, custScanner, 0, join, 0);
 
-        IConnectorDescriptor joinGroupConn = new MToNPartitioningConnectorDescriptor(spec,
-                new FieldHashPartitionComputerFactory(new int[] { 6 },
-                        new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
-        spec.connect(joinGroupConn, join, 0, gby, 0);
+        IOperatorDescriptor endingOp = join;
 
-        IConnectorDescriptor gbyPrinterConn = new OneToOneConnectorDescriptor(spec);
-        spec.connect(gbyPrinterConn, gby, 0, writer, 0);
+        if (hasGroupBy) {
+
+            RecordDescriptor groupResultDesc = new RecordDescriptor(
+                    new ISerializerDeserializer[] {
+                            UTF8StringSerializerDeserializer.INSTANCE,
+                            IntegerSerializerDeserializer.INSTANCE });
+
+            HashGroupOperatorDescriptor gby = new HashGroupOperatorDescriptor(
+                    spec,
+                    new int[] { 6 },
+                    new FieldHashPartitionComputerFactory(
+                            new int[] { 6 },
+                            new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }),
+                    new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
+                    new MultiAggregatorFactory(
+                            new IFieldValueResultingAggregatorFactory[] { new CountAggregatorFactory() }),
+                    groupResultDesc, 16);
+            createPartitionConstraint(spec, gby, resultSplits);
+
+            IConnectorDescriptor joinGroupConn = new MToNPartitioningConnectorDescriptor(
+                    spec,
+                    new FieldHashPartitionComputerFactory(
+                            new int[] { 6 },
+                            new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
+            spec.connect(joinGroupConn, join, 0, gby, 0);
+
+            endingOp = gby;
+        }
+
+        IFileSplitProvider outSplitProvider = new ConstantFileSplitProvider(
+                resultSplits);
+        FrameFileWriterOperatorDescriptor writer = new FrameFileWriterOperatorDescriptor(
+                spec, outSplitProvider);
+        createPartitionConstraint(spec, writer, resultSplits);
+
+        IConnectorDescriptor endingPrinterConn = new OneToOneConnectorDescriptor(
+                spec);
+        spec.connect(endingPrinterConn, endingOp, 0, writer, 0);
 
         spec.addRoot(writer);
         return spec;
     }
 
-    private static void createPartitionConstraint(JobSpecification spec, IOperatorDescriptor op, FileSplit[] splits) {
+    private static void createPartitionConstraint(JobSpecification spec,
+            IOperatorDescriptor op, FileSplit[] splits) {
         String[] parts = new String[splits.length];
         for (int i = 0; i < splits.length; ++i) {
             parts[i] = splits[i].getNodeName();
         }
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, op, parts);
+        PartitionConstraintHelper
+                .addAbsoluteLocationConstraint(spec, op, parts);
+    }
+
+    static class JoinComparatorFactory implements ITuplePairComparatorFactory {
+        private static final long serialVersionUID = 1L;
+
+        private final IBinaryComparatorFactory bFactory;
+        private final int pos0;
+        private final int pos1;
+
+        public JoinComparatorFactory(IBinaryComparatorFactory bFactory,
+                int pos0, int pos1) {
+            this.bFactory = bFactory;
+            this.pos0 = pos0;
+            this.pos1 = pos1;
+        }
+
+        @Override
+        public ITuplePairComparator createTuplePairComparator() {
+            return new JoinComparator(bFactory.createBinaryComparator(), pos0,
+                    pos1);
+        }
+    }
+
+    static class JoinComparator implements ITuplePairComparator {
+
+        private final IBinaryComparator bComparator;
+        private final int field0;
+        private final int field1;
+
+        public JoinComparator(IBinaryComparator bComparator, int field0,
+                int field1) {
+            this.bComparator = bComparator;
+            this.field0 = field0;
+            this.field1 = field1;
+        }
+
+        @Override
+        public int compare(IFrameTupleAccessor accessor0, int tIndex0,
+                IFrameTupleAccessor accessor1, int tIndex1) {
+            int tStart0 = accessor0.getTupleStartOffset(tIndex0);
+            int fStartOffset0 = accessor0.getFieldSlotsLength() + tStart0;
+
+            int tStart1 = accessor1.getTupleStartOffset(tIndex1);
+            int fStartOffset1 = accessor1.getFieldSlotsLength() + tStart1;
+
+            int fStart0 = accessor0.getFieldStartOffset(tIndex0, field0);
+            int fEnd0 = accessor0.getFieldEndOffset(tIndex0, field0);
+            int fLen0 = fEnd0 - fStart0;
+
+            int fStart1 = accessor1.getFieldStartOffset(tIndex1, field1);
+            int fEnd1 = accessor1.getFieldEndOffset(tIndex1, field1);
+            int fLen1 = fEnd1 - fStart1;
+
+            int c = bComparator.compare(accessor0.getBuffer().array(), fStart0
+                    + fStartOffset0, fLen0, accessor1.getBuffer().array(),
+                    fStart1 + fStartOffset1, fLen1);
+            if (c != 0) {
+                return c;
+            }
+            return 0;
+        }
     }
 }
\ No newline at end of file
diff --git a/hyracks-tests/hyracks-storage-am-invertedindex-test/pom.xml b/hyracks-tests/hyracks-storage-am-invertedindex-test/pom.xml
index 7d40c81..50bea01 100644
--- a/hyracks-tests/hyracks-storage-am-invertedindex-test/pom.xml
+++ b/hyracks-tests/hyracks-storage-am-invertedindex-test/pom.xml
@@ -19,6 +19,7 @@
         <configuration>
           <source>1.6</source>
           <target>1.6</target>
+          <encoding>UTF-8</encoding>
         </configuration>
       </plugin>
     </plugins>