Merged hyracks_dev_next -r 1287 into trunk

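The tpchclient main program now chooses its join algorithm from the command line. A representative invocation might look like the following sketch (classpath, host, application name, and the pre-existing customer/order/output file-split options are placeholders, unchanged by this merge):

    java -cp <client classpath> edu.uci.ics.hyracks.examples.tpch.client.Main \
        -host localhost -port 1098 -app tpch \
        -algo hybridhash -mem-size 32 \
        -input-size 10 -records-per-frame 200 -grace-factor 1.2 \
        <customer/order/output file-split options>

Recognized values for -algo are nestedloop, gracehash, and hybridhash; any other value falls back to the in-memory hash join. The -mem-size option is required for every algorithm, while -input-size, -records-per-frame, and -grace-factor only affect the grace and hybrid hash joins.
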
git-svn-id: https://hyracks.googlecode.com/svn/trunk@1288 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks/hyracks-examples/tpch-example/.project b/hyracks/hyracks-examples/tpch-example/.project
deleted file mode 100644
index dfa44a1..0000000
--- a/hyracks/hyracks-examples/tpch-example/.project
+++ /dev/null
@@ -1,17 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<projectDescription>
-	<name>tpch-example</name>
-	<comment></comment>
-	<projects>
-	</projects>
-	<buildSpec>
-		<buildCommand>
-			<name>org.maven.ide.eclipse.maven2Builder</name>
-			<arguments>
-			</arguments>
-		</buildCommand>
-	</buildSpec>
-	<natures>
-		<nature>org.maven.ide.eclipse.maven2Nature</nature>
-	</natures>
-</projectDescription>
diff --git a/hyracks/hyracks-examples/tpch-example/.settings/org.maven.ide.eclipse.prefs b/hyracks/hyracks-examples/tpch-example/.settings/org.maven.ide.eclipse.prefs
deleted file mode 100644
index e6f9a9e..0000000
--- a/hyracks/hyracks-examples/tpch-example/.settings/org.maven.ide.eclipse.prefs
+++ /dev/null
@@ -1,9 +0,0 @@
-#Sun Aug 29 19:38:10 PDT 2010
-activeProfiles=
-eclipse.preferences.version=1
-fullBuildGoals=process-test-resources
-includeModules=false
-resolveWorkspaceProjects=true
-resourceFilterGoals=process-resources resources\:testResources
-skipCompilerPlugin=true
-version=1
diff --git a/hyracks/hyracks-examples/tpch-example/pom.xml b/hyracks/hyracks-examples/tpch-example/pom.xml
index db0db56..f2c8786 100644
--- a/hyracks/hyracks-examples/tpch-example/pom.xml
+++ b/hyracks/hyracks-examples/tpch-example/pom.xml
@@ -2,13 +2,13 @@
   <modelVersion>4.0.0</modelVersion>
   <groupId>edu.uci.ics.hyracks.examples</groupId>
   <artifactId>tpch-example</artifactId>
-  <version>0.1.9-SNAPSHOT</version>
+  <version>0.2.0-SNAPSHOT</version>
   <packaging>pom</packaging>
 
   <parent>
     <groupId>edu.uci.ics.hyracks</groupId>
     <artifactId>hyracks-examples</artifactId>
-    <version>0.1.9-SNAPSHOT</version>
+    <version>0.2.0-SNAPSHOT</version>
   </parent>
 
   <modules>
diff --git a/hyracks/hyracks-examples/tpch-example/tpchapp/.classpath b/hyracks/hyracks-examples/tpch-example/tpchapp/.classpath
deleted file mode 100644
index d0bec0f..0000000
--- a/hyracks/hyracks-examples/tpch-example/tpchapp/.classpath
+++ /dev/null
@@ -1,6 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<classpath>
-	<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/J2SE-1.5"/>
-	<classpathentry kind="con" path="org.maven.ide.eclipse.MAVEN2_CLASSPATH_CONTAINER"/>
-	<classpathentry kind="output" path="target/classes"/>
-</classpath>
diff --git a/hyracks/hyracks-examples/tpch-example/tpchapp/.project b/hyracks/hyracks-examples/tpch-example/tpchapp/.project
deleted file mode 100644
index 46037da..0000000
--- a/hyracks/hyracks-examples/tpch-example/tpchapp/.project
+++ /dev/null
@@ -1,23 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<projectDescription>
-	<name>tpchapp</name>
-	<comment></comment>
-	<projects>
-	</projects>
-	<buildSpec>
-		<buildCommand>
-			<name>org.eclipse.jdt.core.javabuilder</name>
-			<arguments>
-			</arguments>
-		</buildCommand>
-		<buildCommand>
-			<name>org.maven.ide.eclipse.maven2Builder</name>
-			<arguments>
-			</arguments>
-		</buildCommand>
-	</buildSpec>
-	<natures>
-		<nature>org.eclipse.jdt.core.javanature</nature>
-		<nature>org.maven.ide.eclipse.maven2Nature</nature>
-	</natures>
-</projectDescription>
diff --git a/hyracks/hyracks-examples/tpch-example/tpchapp/.settings/org.eclipse.jdt.core.prefs b/hyracks/hyracks-examples/tpch-example/tpchapp/.settings/org.eclipse.jdt.core.prefs
deleted file mode 100644
index b61b886..0000000
--- a/hyracks/hyracks-examples/tpch-example/tpchapp/.settings/org.eclipse.jdt.core.prefs
+++ /dev/null
@@ -1,6 +0,0 @@
-#Thu Aug 04 11:50:35 PDT 2011
-eclipse.preferences.version=1
-org.eclipse.jdt.core.compiler.codegen.targetPlatform=1.5
-org.eclipse.jdt.core.compiler.compliance=1.5
-org.eclipse.jdt.core.compiler.problem.forbiddenReference=warning
-org.eclipse.jdt.core.compiler.source=1.5
diff --git a/hyracks/hyracks-examples/tpch-example/tpchapp/.settings/org.maven.ide.eclipse.prefs b/hyracks/hyracks-examples/tpch-example/tpchapp/.settings/org.maven.ide.eclipse.prefs
deleted file mode 100644
index e6f9a9e..0000000
--- a/hyracks/hyracks-examples/tpch-example/tpchapp/.settings/org.maven.ide.eclipse.prefs
+++ /dev/null
@@ -1,9 +0,0 @@
-#Sun Aug 29 19:38:10 PDT 2010
-activeProfiles=
-eclipse.preferences.version=1
-fullBuildGoals=process-test-resources
-includeModules=false
-resolveWorkspaceProjects=true
-resourceFilterGoals=process-resources resources\:testResources
-skipCompilerPlugin=true
-version=1
diff --git a/hyracks/hyracks-examples/tpch-example/tpchapp/pom.xml b/hyracks/hyracks-examples/tpch-example/tpchapp/pom.xml
index 8b9798a..05256a3 100644
--- a/hyracks/hyracks-examples/tpch-example/tpchapp/pom.xml
+++ b/hyracks/hyracks-examples/tpch-example/tpchapp/pom.xml
@@ -2,12 +2,10 @@
   <modelVersion>4.0.0</modelVersion>
   <groupId>edu.uci.ics.hyracks.examples.tpch</groupId>
   <artifactId>tpchapp</artifactId>
-  <version>0.1.9-SNAPSHOT</version>
-
   <parent>
     <groupId>edu.uci.ics.hyracks.examples</groupId>
     <artifactId>tpch-example</artifactId>
-    <version>0.1.9-SNAPSHOT</version>
+    <version>0.2.0-SNAPSHOT</version>
   </parent>
 
   <build>
@@ -80,8 +78,13 @@
     <dependency>
         <groupId>edu.uci.ics.hyracks</groupId>
         <artifactId>hyracks-dataflow-std</artifactId>
-        <version>0.1.9-SNAPSHOT</version>
+        <version>0.2.0-SNAPSHOT</version>
         <scope>compile</scope>
     </dependency>
+    <dependency>
+    	<groupId>edu.uci.ics.hyracks</groupId>
+    	<artifactId>hyracks-data-std</artifactId>
+    	<version>0.2.0-SNAPSHOT</version>
+    </dependency>
   </dependencies>
 </project>
diff --git a/hyracks/hyracks-examples/tpch-example/tpchclient/.classpath b/hyracks/hyracks-examples/tpch-example/tpchclient/.classpath
deleted file mode 100644
index 1f3c1ff..0000000
--- a/hyracks/hyracks-examples/tpch-example/tpchclient/.classpath
+++ /dev/null
@@ -1,7 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<classpath>
-	<classpathentry kind="src" output="target/classes" path="src/main/java"/>
-	<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-1.6"/>
-	<classpathentry kind="con" path="org.maven.ide.eclipse.MAVEN2_CLASSPATH_CONTAINER"/>
-	<classpathentry kind="output" path="target/classes"/>
-</classpath>
diff --git a/hyracks/hyracks-examples/tpch-example/tpchclient/.project b/hyracks/hyracks-examples/tpch-example/tpchclient/.project
deleted file mode 100644
index b0effc6..0000000
--- a/hyracks/hyracks-examples/tpch-example/tpchclient/.project
+++ /dev/null
@@ -1,23 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<projectDescription>
-	<name>tpchclient</name>
-	<comment></comment>
-	<projects>
-	</projects>
-	<buildSpec>
-		<buildCommand>
-			<name>org.eclipse.jdt.core.javabuilder</name>
-			<arguments>
-			</arguments>
-		</buildCommand>
-		<buildCommand>
-			<name>org.maven.ide.eclipse.maven2Builder</name>
-			<arguments>
-			</arguments>
-		</buildCommand>
-	</buildSpec>
-	<natures>
-		<nature>org.maven.ide.eclipse.maven2Nature</nature>
-		<nature>org.eclipse.jdt.core.javanature</nature>
-	</natures>
-</projectDescription>
diff --git a/hyracks/hyracks-examples/tpch-example/tpchclient/.settings/org.eclipse.jdt.core.prefs b/hyracks/hyracks-examples/tpch-example/tpchclient/.settings/org.eclipse.jdt.core.prefs
deleted file mode 100644
index 375e12e..0000000
--- a/hyracks/hyracks-examples/tpch-example/tpchclient/.settings/org.eclipse.jdt.core.prefs
+++ /dev/null
@@ -1,6 +0,0 @@
-#Fri May 20 19:34:07 PDT 2011
-eclipse.preferences.version=1
-org.eclipse.jdt.core.compiler.codegen.targetPlatform=1.6
-org.eclipse.jdt.core.compiler.compliance=1.6
-org.eclipse.jdt.core.compiler.problem.forbiddenReference=warning
-org.eclipse.jdt.core.compiler.source=1.6
diff --git a/hyracks/hyracks-examples/tpch-example/tpchclient/.settings/org.maven.ide.eclipse.prefs b/hyracks/hyracks-examples/tpch-example/tpchclient/.settings/org.maven.ide.eclipse.prefs
deleted file mode 100644
index 1b13d8b..0000000
--- a/hyracks/hyracks-examples/tpch-example/tpchclient/.settings/org.maven.ide.eclipse.prefs
+++ /dev/null
@@ -1,9 +0,0 @@
-#Wed Aug 11 19:09:15 PDT 2010
-activeProfiles=
-eclipse.preferences.version=1
-fullBuildGoals=process-test-resources
-includeModules=false
-resolveWorkspaceProjects=true
-resourceFilterGoals=process-resources resources\:testResources
-skipCompilerPlugin=true
-version=1
diff --git a/hyracks/hyracks-examples/tpch-example/tpchclient/pom.xml b/hyracks/hyracks-examples/tpch-example/tpchclient/pom.xml
index f9fe4d2..d9b5fce 100644
--- a/hyracks/hyracks-examples/tpch-example/tpchclient/pom.xml
+++ b/hyracks/hyracks-examples/tpch-example/tpchclient/pom.xml
@@ -2,21 +2,24 @@
   <modelVersion>4.0.0</modelVersion>
   <groupId>edu.uci.ics.hyracks.examples.tpch</groupId>
   <artifactId>tpchclient</artifactId>
-  <version>0.1.9-SNAPSHOT</version>
-
   <parent>
     <groupId>edu.uci.ics.hyracks.examples</groupId>
     <artifactId>tpch-example</artifactId>
-    <version>0.1.9-SNAPSHOT</version>
+    <version>0.2.0-SNAPSHOT</version>
   </parent>
 
   <dependencies>
   	<dependency>
   		<groupId>edu.uci.ics.hyracks</groupId>
   		<artifactId>hyracks-dataflow-std</artifactId>
-  		<version>0.1.9-SNAPSHOT</version>
+  		<version>0.2.0-SNAPSHOT</version>
   		<scope>compile</scope>
   	</dependency>
+  	<dependency>
+  		<groupId>edu.uci.ics.hyracks</groupId>
+  		<artifactId>hyracks-data-std</artifactId>
+  		<version>0.2.0-SNAPSHOT</version>
+  	</dependency>
   </dependencies>
   <build>
     <plugins>
diff --git a/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/edu/uci/ics/hyracks/examples/tpch/client/Main.java b/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/edu/uci/ics/hyracks/examples/tpch/client/Main.java
index b1a1224..72533e7 100644
--- a/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/edu/uci/ics/hyracks/examples/tpch/client/Main.java
+++ b/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/edu/uci/ics/hyracks/examples/tpch/client/Main.java
@@ -16,34 +16,37 @@
 
 import java.io.File;
 import java.util.EnumSet;
-import java.util.UUID;
 
 import org.kohsuke.args4j.CmdLineParser;
 import org.kohsuke.args4j.Option;
 
-import edu.uci.ics.hyracks.api.client.HyracksRMIConnection;
+import edu.uci.ics.hyracks.api.client.HyracksConnection;
 import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
 import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
 import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
 import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePairComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePairComparatorFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.io.FileReference;
 import edu.uci.ics.hyracks.api.job.JobFlag;
+import edu.uci.ics.hyracks.api.job.JobId;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
-import edu.uci.ics.hyracks.dataflow.common.data.comparators.UTF8StringBinaryComparatorFactory;
-import edu.uci.ics.hyracks.dataflow.common.data.hash.UTF8StringBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
+import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.data.std.primitive.UTF8StringPointable;
 import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
 import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
 import edu.uci.ics.hyracks.dataflow.common.data.parsers.IValueParserFactory;
 import edu.uci.ics.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory;
 import edu.uci.ics.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
-import edu.uci.ics.hyracks.dataflow.std.aggregators.CountAggregatorFactory;
-import edu.uci.ics.hyracks.dataflow.std.aggregators.IFieldValueResultingAggregatorFactory;
-import edu.uci.ics.hyracks.dataflow.std.aggregators.MultiAggregatorFactory;
-import edu.uci.ics.hyracks.dataflow.std.connectors.MToNHashPartitioningConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningConnectorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.file.ConstantFileSplitProvider;
 import edu.uci.ics.hyracks.dataflow.std.file.DelimitedDataTupleParserFactory;
@@ -52,7 +55,13 @@
 import edu.uci.ics.hyracks.dataflow.std.file.FrameFileWriterOperatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
 import edu.uci.ics.hyracks.dataflow.std.group.HashGroupOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.group.IFieldAggregateDescriptorFactory;
+import edu.uci.ics.hyracks.dataflow.std.group.aggregators.CountFieldAggregatorFactory;
+import edu.uci.ics.hyracks.dataflow.std.group.aggregators.MultiFieldsAggregatorFactory;
+import edu.uci.ics.hyracks.dataflow.std.join.GraceHashJoinOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.join.HybridHashJoinOperatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.join.InMemoryHashJoinOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.join.NestedLoopJoinOperatorDescriptor;
 
 public class Main {
     private static class Options {
@@ -60,7 +69,7 @@
         public String host;
 
         @Option(name = "-port", usage = "Hyracks Cluster Controller Port (default: 1099)", required = false)
-        public int port = 1099;
+        public int port = 1098;
 
         @Option(name = "-app", usage = "Hyracks Application name", required = true)
         public String app;
@@ -79,6 +88,29 @@
 
         @Option(name = "-profile", usage = "Enable/Disable profiling. (default: enabled)")
         public boolean profile = true;
+
+        @Option(name = "-table-size", usage = "Table size for in-memory hash join", required = false)
+        public int tableSize = 8191;
+
+        @Option(name = "-algo", usage = "Join types", required = true)
+        public String algo;
+
+        // For grace/hybrid hash join only
+        @Option(name = "-mem-size", usage = "Memory size for hash join", required = true)
+        public int memSize;
+
+        @Option(name = "-input-size", usage = "Input size of the grace/hybrid hash join", required = false)
+        public int graceInputSize = 10;
+
+        @Option(name = "-records-per-frame", usage = "Records per frame for grace/hybrid hash join", required = false)
+        public int graceRecordsPerFrame = 200;
+
+        @Option(name = "-grace-factor", usage = "Factor of the grace/hybrid hash join", required = false)
+        public double graceFactor = 1.2;
+
+        // Whether a group-by is applied after the join
+        @Option(name = "-has-groupby", usage = "Whether to apply a group-by after the join (default: disabled)", required = false)
+        public boolean hasGroupBy = false;
     }
 
     public static void main(String[] args) throws Exception {
@@ -86,14 +118,15 @@
         CmdLineParser parser = new CmdLineParser(options);
         parser.parseArgument(args);
 
-        IHyracksClientConnection hcc = new HyracksRMIConnection(options.host, options.port);
+        IHyracksClientConnection hcc = new HyracksConnection(options.host, options.port);
 
         JobSpecification job = createJob(parseFileSplits(options.inFileCustomerSplits),
                 parseFileSplits(options.inFileOrderSplits), parseFileSplits(options.outFileSplits),
-                options.numJoinPartitions);
+                options.numJoinPartitions, options.algo, options.graceInputSize, options.graceRecordsPerFrame,
+                options.graceFactor, options.memSize, options.tableSize, options.hasGroupBy);
 
         long start = System.currentTimeMillis();
-        UUID jobId = hcc.createJob(options.app, job,
+        JobId jobId = hcc.createJob(options.app, job,
                 options.profile ? EnumSet.of(JobFlag.PROFILE_RUNTIME) : EnumSet.noneOf(JobFlag.class));
         hcc.start(jobId);
         hcc.waitForCompletion(jobId);
@@ -116,7 +149,8 @@
     }
 
     private static JobSpecification createJob(FileSplit[] customerSplits, FileSplit[] orderSplits,
-            FileSplit[] resultSplits, int numJoinPartitions) {
+            FileSplit[] resultSplits, int numJoinPartitions, String algo, int graceInputSize, int graceRecordsPerFrame,
+            double graceFactor, int memSize, int tableSize, boolean hasGroupBy) throws HyracksDataException {
         JobSpecification spec = new JobSpecification();
 
         IFileSplitProvider custSplitsProvider = new ConstantFileSplitProvider(customerSplits);
@@ -161,46 +195,99 @@
                         UTF8StringParserFactory.INSTANCE }, '|'), custDesc);
         createPartitionConstraint(spec, custScanner, customerSplits);
 
-        InMemoryHashJoinOperatorDescriptor join = new InMemoryHashJoinOperatorDescriptor(spec, new int[] { 0 },
-                new int[] { 1 }, new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE },
-                new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE }, custOrderJoinDesc,
-                6000000);
+        IOperatorDescriptor join;
+
+        if ("nestedloop".equalsIgnoreCase(algo)) {
+            join = new NestedLoopJoinOperatorDescriptor(spec, new JoinComparatorFactory(
+                    PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), 0, 1), custOrderJoinDesc, memSize);
+
+        } else if ("gracehash".equalsIgnoreCase(algo)) {
+            join = new GraceHashJoinOperatorDescriptor(
+                    spec,
+                    memSize,
+                    graceInputSize,
+                    graceRecordsPerFrame,
+                    graceFactor,
+                    new int[] { 0 },
+                    new int[] { 1 },
+                    new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
+                            .of(UTF8StringPointable.FACTORY) },
+                    new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
+                    custOrderJoinDesc);
+
+        } else if ("hybridhash".equalsIgnoreCase(algo)) {
+            join = new HybridHashJoinOperatorDescriptor(
+                    spec,
+                    memSize,
+                    graceInputSize,
+                    graceRecordsPerFrame,
+                    graceFactor,
+                    new int[] { 0 },
+                    new int[] { 1 },
+                    new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
+                            .of(UTF8StringPointable.FACTORY) },
+                    new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
+                    custOrderJoinDesc);
+
+        } else {
+            join = new InMemoryHashJoinOperatorDescriptor(
+                    spec,
+                    new int[] { 0 },
+                    new int[] { 1 },
+                    new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
+                            .of(UTF8StringPointable.FACTORY) },
+                    new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
+                    custOrderJoinDesc, 6000000);
+        }
+
         PartitionConstraintHelper.addPartitionCountConstraint(spec, join, numJoinPartitions);
 
-        RecordDescriptor groupResultDesc = new RecordDescriptor(new ISerializerDeserializer[] {
-                UTF8StringSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE });
+        IConnectorDescriptor ordJoinConn = new MToNPartitioningConnectorDescriptor(spec,
+                new FieldHashPartitionComputerFactory(new int[] { 1 },
+                        new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
+                                .of(UTF8StringPointable.FACTORY) }));
+        spec.connect(ordJoinConn, ordScanner, 0, join, 1);
 
-        HashGroupOperatorDescriptor gby = new HashGroupOperatorDescriptor(
-                spec,
-                new int[] { 6 },
-                new FieldHashPartitionComputerFactory(new int[] { 6 },
-                        new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }),
-                new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE },
-                new MultiAggregatorFactory(new IFieldValueResultingAggregatorFactory[] { new CountAggregatorFactory() }),
-                groupResultDesc, 16);
-        createPartitionConstraint(spec, gby, resultSplits);
+        IConnectorDescriptor custJoinConn = new MToNPartitioningConnectorDescriptor(spec,
+                new FieldHashPartitionComputerFactory(new int[] { 0 },
+                        new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
+                                .of(UTF8StringPointable.FACTORY) }));
+        spec.connect(custJoinConn, custScanner, 0, join, 0);
+
+        IOperatorDescriptor endingOp = join;
+
+        if (hasGroupBy) {
+
+            RecordDescriptor groupResultDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+                    UTF8StringSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE });
+
+            HashGroupOperatorDescriptor gby = new HashGroupOperatorDescriptor(
+                    spec,
+                    new int[] { 6 },
+                    new FieldHashPartitionComputerFactory(new int[] { 6 },
+                            new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
+                                    .of(UTF8StringPointable.FACTORY) }),
+                    new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
+                    new MultiFieldsAggregatorFactory(
+                            new IFieldAggregateDescriptorFactory[] { new CountFieldAggregatorFactory(true) }),
+                    groupResultDesc, 16);
+            createPartitionConstraint(spec, gby, resultSplits);
+
+            IConnectorDescriptor joinGroupConn = new MToNPartitioningConnectorDescriptor(spec,
+                    new FieldHashPartitionComputerFactory(new int[] { 6 },
+                            new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
+                                    .of(UTF8StringPointable.FACTORY) }));
+            spec.connect(joinGroupConn, join, 0, gby, 0);
+
+            endingOp = gby;
+        }
 
         IFileSplitProvider outSplitProvider = new ConstantFileSplitProvider(resultSplits);
         FrameFileWriterOperatorDescriptor writer = new FrameFileWriterOperatorDescriptor(spec, outSplitProvider);
         createPartitionConstraint(spec, writer, resultSplits);
 
-        IConnectorDescriptor ordJoinConn = new MToNHashPartitioningConnectorDescriptor(spec,
-                new FieldHashPartitionComputerFactory(new int[] { 1 },
-                        new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
-        spec.connect(ordJoinConn, ordScanner, 0, join, 1);
-
-        IConnectorDescriptor custJoinConn = new MToNHashPartitioningConnectorDescriptor(spec,
-                new FieldHashPartitionComputerFactory(new int[] { 0 },
-                        new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
-        spec.connect(custJoinConn, custScanner, 0, join, 0);
-
-        IConnectorDescriptor joinGroupConn = new MToNHashPartitioningConnectorDescriptor(spec,
-                new FieldHashPartitionComputerFactory(new int[] { 6 },
-                        new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE }));
-        spec.connect(joinGroupConn, join, 0, gby, 0);
-
-        IConnectorDescriptor gbyPrinterConn = new OneToOneConnectorDescriptor(spec);
-        spec.connect(gbyPrinterConn, gby, 0, writer, 0);
+        IConnectorDescriptor endingPrinterConn = new OneToOneConnectorDescriptor(spec);
+        spec.connect(endingPrinterConn, endingOp, 0, writer, 0);
 
         spec.addRoot(writer);
         return spec;
@@ -213,4 +300,60 @@
         }
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, op, parts);
     }
+
+    static class JoinComparatorFactory implements ITuplePairComparatorFactory {
+        private static final long serialVersionUID = 1L;
+
+        private final IBinaryComparatorFactory bFactory;
+        private final int pos0;
+        private final int pos1;
+
+        public JoinComparatorFactory(IBinaryComparatorFactory bFactory, int pos0, int pos1) {
+            this.bFactory = bFactory;
+            this.pos0 = pos0;
+            this.pos1 = pos1;
+        }
+
+        @Override
+        public ITuplePairComparator createTuplePairComparator() {
+            return new JoinComparator(bFactory.createBinaryComparator(), pos0, pos1);
+        }
+    }
+
+    static class JoinComparator implements ITuplePairComparator {
+
+        private final IBinaryComparator bComparator;
+        private final int field0;
+        private final int field1;
+
+        public JoinComparator(IBinaryComparator bComparator, int field0, int field1) {
+            this.bComparator = bComparator;
+            this.field0 = field0;
+            this.field1 = field1;
+        }
+
+        @Override
+        public int compare(IFrameTupleAccessor accessor0, int tIndex0, IFrameTupleAccessor accessor1, int tIndex1) {
+            int tStart0 = accessor0.getTupleStartOffset(tIndex0);
+            int fStartOffset0 = accessor0.getFieldSlotsLength() + tStart0;
+
+            int tStart1 = accessor1.getTupleStartOffset(tIndex1);
+            int fStartOffset1 = accessor1.getFieldSlotsLength() + tStart1;
+
+            int fStart0 = accessor0.getFieldStartOffset(tIndex0, field0);
+            int fEnd0 = accessor0.getFieldEndOffset(tIndex0, field0);
+            int fLen0 = fEnd0 - fStart0;
+
+            int fStart1 = accessor1.getFieldStartOffset(tIndex1, field1);
+            int fEnd1 = accessor1.getFieldEndOffset(tIndex1, field1);
+            int fLen1 = fEnd1 - fStart1;
+
+            int c = bComparator.compare(accessor0.getBuffer().array(), fStart0 + fStartOffset0, fLen0, accessor1
+                    .getBuffer().array(), fStart1 + fStartOffset1, fLen1);
+            if (c != 0) {
+                return c;
+            }
+            return 0;
+        }
+    }
 }
\ No newline at end of file