Merged hyracks_asterix_stabilization_yfix r2529, r2551, r2564, r2575, r2576, r2577, r2584, and r2597

git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_staging_bigmerge_target@2748 123451ca-8445-de46-9d55-352943316053
diff --git a/algebricks/algebricks-compiler/src/main/java/edu/uci/ics/hyracks/algebricks/compiler/api/AbstractCompilerFactoryBuilder.java b/algebricks/algebricks-compiler/src/main/java/edu/uci/ics/hyracks/algebricks/compiler/api/AbstractCompilerFactoryBuilder.java
index f1e7acb..dde4443 100644
--- a/algebricks/algebricks-compiler/src/main/java/edu/uci/ics/hyracks/algebricks/compiler/api/AbstractCompilerFactoryBuilder.java
+++ b/algebricks/algebricks-compiler/src/main/java/edu/uci/ics/hyracks/algebricks/compiler/api/AbstractCompilerFactoryBuilder.java
@@ -30,6 +30,7 @@
 import edu.uci.ics.hyracks.algebricks.data.IBinaryBooleanInspectorFactory;
 import edu.uci.ics.hyracks.algebricks.data.IBinaryComparatorFactoryProvider;
 import edu.uci.ics.hyracks.algebricks.data.IBinaryHashFunctionFactoryProvider;
+import edu.uci.ics.hyracks.algebricks.data.IBinaryHashFunctionFamilyProvider;
 import edu.uci.ics.hyracks.algebricks.data.IBinaryIntegerInspectorFactory;
 import edu.uci.ics.hyracks.algebricks.data.INormalizedKeyComputerFactoryProvider;
 import edu.uci.ics.hyracks.algebricks.data.IPrinterFactoryProvider;
@@ -44,6 +45,7 @@
     protected ITypeTraitProvider typeTraitProvider;
     protected ISerializerDeserializerProvider serializerDeserializerProvider;
     protected IBinaryHashFunctionFactoryProvider hashFunctionFactoryProvider;
+    protected IBinaryHashFunctionFamilyProvider hashFunctionFamilyProvider;
     protected IBinaryComparatorFactoryProvider comparatorFactoryProvider;
     protected IBinaryBooleanInspectorFactory binaryBooleanInspectorFactory;
     protected IBinaryIntegerInspectorFactory binaryIntegerInspectorFactory;
@@ -94,6 +96,14 @@
         return hashFunctionFactoryProvider;
     }
 
+    public void setHashFunctionFamilyProvider(IBinaryHashFunctionFamilyProvider hashFunctionFamilyProvider) {
+        this.hashFunctionFamilyProvider = hashFunctionFamilyProvider;
+    }
+
+    public IBinaryHashFunctionFamilyProvider getHashFunctionFamilyProvider() {
+        return hashFunctionFamilyProvider;
+    }
+
     public void setComparatorFactoryProvider(IBinaryComparatorFactoryProvider comparatorFactoryProvider) {
         this.comparatorFactoryProvider = comparatorFactoryProvider;
     }
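For orientation, a minimal sketch of how the new family provider would be wired alongside the existing factory provider. The helper method and the provider arguments are hypothetical; only setHashFunctionFamilyProvider (added here) and setHashFunctionFactoryProvider (pre-existing) come from the builder itself.

    import edu.uci.ics.hyracks.algebricks.compiler.api.AbstractCompilerFactoryBuilder;
    import edu.uci.ics.hyracks.algebricks.data.IBinaryHashFunctionFactoryProvider;
    import edu.uci.ics.hyracks.algebricks.data.IBinaryHashFunctionFamilyProvider;

    class CompilerBuilderWiring {
        // Hypothetical helper: supplies per-type hash function factories plus the seeded
        // hash function families that the optimized hybrid hash join (below) relies on.
        static void configureHashProviders(AbstractCompilerFactoryBuilder builder,
                IBinaryHashFunctionFactoryProvider factoryProvider,
                IBinaryHashFunctionFamilyProvider familyProvider) {
            builder.setHashFunctionFactoryProvider(factoryProvider);
            builder.setHashFunctionFamilyProvider(familyProvider);
        }
    }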
diff --git a/algebricks/algebricks-compiler/src/main/java/edu/uci/ics/hyracks/algebricks/compiler/api/HeuristicCompilerFactoryBuilder.java b/algebricks/algebricks-compiler/src/main/java/edu/uci/ics/hyracks/algebricks/compiler/api/HeuristicCompilerFactoryBuilder.java
index 1d21463..edc1b66 100644
--- a/algebricks/algebricks-compiler/src/main/java/edu/uci/ics/hyracks/algebricks/compiler/api/HeuristicCompilerFactoryBuilder.java
+++ b/algebricks/algebricks-compiler/src/main/java/edu/uci/ics/hyracks/algebricks/compiler/api/HeuristicCompilerFactoryBuilder.java
@@ -84,11 +84,12 @@
                     public JobSpecification createJob(Object appContext) throws AlgebricksException {
                         AlgebricksConfig.ALGEBRICKS_LOGGER.fine("Starting Job Generation.\n");
                         JobGenContext context = new JobGenContext(null, metadata, appContext,
-                                serializerDeserializerProvider, hashFunctionFactoryProvider, comparatorFactoryProvider,
-                                typeTraitProvider, binaryBooleanInspectorFactory, binaryIntegerInspectorFactory,
-                                printerProvider, nullWriterFactory, normalizedKeyComputerFactoryProvider,
-                                expressionRuntimeProvider, expressionTypeComputer, nullableTypeComputer, oc,
-                                expressionEvalSizeComputer, partialAggregationTypeComputer, frameSize, clusterLocations);
+                                serializerDeserializerProvider, hashFunctionFactoryProvider,
+                                hashFunctionFamilyProvider, comparatorFactoryProvider, typeTraitProvider,
+                                binaryBooleanInspectorFactory, binaryIntegerInspectorFactory, printerProvider,
+                                nullWriterFactory, normalizedKeyComputerFactoryProvider, expressionRuntimeProvider,
+                                expressionTypeComputer, nullableTypeComputer, oc, expressionEvalSizeComputer,
+                                partialAggregationTypeComputer, frameSize, clusterLocations);
                         PlanCompiler pc = new PlanCompiler(context);
                         return pc.compilePlan(plan, null);
                     }
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/HybridHashJoinPOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/HybridHashJoinPOperator.java
index c737cc4..6da42b4 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/HybridHashJoinPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/HybridHashJoinPOperator.java
@@ -32,14 +32,21 @@
 import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenContext;
 import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
 import edu.uci.ics.hyracks.algebricks.data.IBinaryComparatorFactoryProvider;
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFamily;
 import edu.uci.ics.hyracks.api.dataflow.value.INullWriterFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePairComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePairComparatorFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.job.IOperatorDescriptorRegistry;
 import edu.uci.ics.hyracks.dataflow.std.join.HybridHashJoinOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.join.OptimizedHybridHashJoinOperatorDescriptor;
 
 public class HybridHashJoinPOperator extends AbstractHashJoinPOperator {
 
@@ -90,6 +97,8 @@
         IVariableTypeEnvironment env = context.getTypeEnvironment(op);
         IBinaryHashFunctionFactory[] hashFunFactories = JobGenHelper.variablesToBinaryHashFunctionFactories(
                 keysLeftBranch, env, context);
+        IBinaryHashFunctionFamily[] hashFunFamilies = JobGenHelper.variablesToBinaryHashFunctionFamilies(
+                keysLeftBranch, env, context);
         IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[keysLeft.length];
         int i = 0;
         IBinaryComparatorFactoryProvider bcfp = context.getBinaryComparatorFactoryProvider();
@@ -97,33 +106,75 @@
             Object t = env.getVarType(v);
             comparatorFactories[i++] = bcfp.getBinaryComparatorFactory(t, true);
         }
-        RecordDescriptor recDescriptor = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), propagatedSchema, context);
+        RecordDescriptor recDescriptor = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op),
+                propagatedSchema, context);
         IOperatorDescriptorRegistry spec = builder.getJobSpec();
         IOperatorDescriptor opDesc = null;
-        try {
-            switch (kind) {
-                case INNER: {
-                    opDesc = new HybridHashJoinOperatorDescriptor(spec, getMemSizeInFrames(),
-                            maxInputBuildSizeInFrames, aveRecordsPerFrame, getFudgeFactor(), keysLeft, keysRight,
-                            hashFunFactories, comparatorFactories, recDescriptor);
-                    break;
-                }
-                case LEFT_OUTER: {
-                    INullWriterFactory[] nullWriterFactories = new INullWriterFactory[inputSchemas[1].getSize()];
-                    for (int j = 0; j < nullWriterFactories.length; j++) {
-                        nullWriterFactories[j] = context.getNullWriterFactory();
-                    }
-                    opDesc = new HybridHashJoinOperatorDescriptor(spec, getMemSizeInFrames(),
-                            maxInputBuildSizeInFrames, aveRecordsPerFrame, getFudgeFactor(), keysLeft, keysRight,
-                            hashFunFactories, comparatorFactories, recDescriptor, true, nullWriterFactories);
-                    break;
-                }
-                default: {
-                    throw new NotImplementedException();
-                }
+
+        boolean optimizedHashJoin = true;
+        for (IBinaryHashFunctionFamily family : hashFunFamilies) {
+            if (family == null) {
+                optimizedHashJoin = false;
+                break;
             }
-        } catch (HyracksDataException e) {
-            throw new AlgebricksException(e);
+        }
+
+        if (!optimizedHashJoin) {
+            try {
+                switch (kind) {
+                    case INNER: {
+                        opDesc = new HybridHashJoinOperatorDescriptor(spec, getMemSizeInFrames(),
+                                maxInputBuildSizeInFrames, aveRecordsPerFrame, getFudgeFactor(), keysLeft, keysRight,
+                                hashFunFactories, comparatorFactories, recDescriptor);
+                        break;
+                    }
+                    case LEFT_OUTER: {
+                        INullWriterFactory[] nullWriterFactories = new INullWriterFactory[inputSchemas[1].getSize()];
+                        for (int j = 0; j < nullWriterFactories.length; j++) {
+                            nullWriterFactories[j] = context.getNullWriterFactory();
+                        }
+                        opDesc = new HybridHashJoinOperatorDescriptor(spec, getMemSizeInFrames(),
+                                maxInputBuildSizeInFrames, aveRecordsPerFrame, getFudgeFactor(), keysLeft, keysRight,
+                                hashFunFactories, comparatorFactories, recDescriptor, true, nullWriterFactories);
+                        break;
+                    }
+                    default: {
+                        throw new NotImplementedException();
+                    }
+                }
+            } catch (HyracksDataException e) {
+                throw new AlgebricksException(e);
+            }
+        } else {
+            try {
+                switch (kind) {
+                    case INNER: {
+                        opDesc = new OptimizedHybridHashJoinOperatorDescriptor(spec, getMemSizeInFrames(),
+                                maxInputBuildSizeInFrames, getFudgeFactor(), keysLeft, keysRight, hashFunFamilies,
+                                comparatorFactories, recDescriptor, new JoinMultiComparatorFactory(comparatorFactories,
+                                        keysLeft, keysRight), new JoinMultiComparatorFactory(comparatorFactories,
+                                        keysRight, keysLeft));
+                        break;
+                    }
+                    case LEFT_OUTER: {
+                        INullWriterFactory[] nullWriterFactories = new INullWriterFactory[inputSchemas[1].getSize()];
+                        for (int j = 0; j < nullWriterFactories.length; j++) {
+                            nullWriterFactories[j] = context.getNullWriterFactory();
+                        }
+                        opDesc = new OptimizedHybridHashJoinOperatorDescriptor(spec, getMemSizeInFrames(),
+                                maxInputBuildSizeInFrames, getFudgeFactor(), keysLeft, keysRight, hashFunFamilies,
+                                comparatorFactories, recDescriptor, new JoinMultiComparatorFactory(comparatorFactories,
+                                        keysLeft, keysRight), new JoinMultiComparatorFactory(comparatorFactories,
+                                        keysRight, keysLeft), true, nullWriterFactories);
+                        break;
+                    }
+                    default: {
+                        throw new NotImplementedException();
+                    }
+                }
+            } catch (HyracksDataException e) {
+                throw new AlgebricksException(e);
+            }
         }
         contributeOpDesc(builder, (AbstractLogicalOperator) op, opDesc);
 
@@ -140,3 +191,72 @@
     }
 
 }
+
+/**
+ * {@link ITuplePairComparatorFactory} implementation for the optimized hybrid hash join.
+ */
+class JoinMultiComparatorFactory implements ITuplePairComparatorFactory {
+    private static final long serialVersionUID = 1L;
+
+    private final IBinaryComparatorFactory[] binaryComparatorFactories;
+    private final int[] keysLeft;
+    private final int[] keysRight;
+
+    public JoinMultiComparatorFactory(IBinaryComparatorFactory[] binaryComparatorFactory, int[] keysLeft,
+            int[] keysRight) {
+        this.binaryComparatorFactories = binaryComparatorFactory;
+        this.keysLeft = keysLeft;
+        this.keysRight = keysRight;
+    }
+
+    @Override
+    public ITuplePairComparator createTuplePairComparator(IHyracksTaskContext ctx) {
+        IBinaryComparator[] binaryComparators = new IBinaryComparator[binaryComparatorFactories.length];
+        for (int i = 0; i < binaryComparators.length; i++) {
+            binaryComparators[i] = binaryComparatorFactories[i].createBinaryComparator();
+        }
+        return new JoinMultiComparator(binaryComparators, keysLeft, keysRight);
+    }
+}
+
+/**
+ * {@link ITuplePairComparator} implementation for the optimized hybrid hash join.
+ * The comparator applies one binary comparator per key pair.
+ */
+class JoinMultiComparator implements ITuplePairComparator {
+    private final IBinaryComparator[] binaryComparators;
+    private final int[] keysLeft;
+    private final int[] keysRight;
+
+    public JoinMultiComparator(IBinaryComparator[] bComparator, int[] keysLeft, int[] keysRight) {
+        this.binaryComparators = bComparator;
+        this.keysLeft = keysLeft;
+        this.keysRight = keysRight;
+    }
+
+    @Override
+    public int compare(IFrameTupleAccessor accessor0, int tIndex0, IFrameTupleAccessor accessor1, int tIndex1) {
+        int tStart0 = accessor0.getTupleStartOffset(tIndex0);
+        int fStartOffset0 = accessor0.getFieldSlotsLength() + tStart0;
+
+        int tStart1 = accessor1.getTupleStartOffset(tIndex1);
+        int fStartOffset1 = accessor1.getFieldSlotsLength() + tStart1;
+
+        for (int i = 0; i < binaryComparators.length; i++) {
+            int fStart0 = accessor0.getFieldStartOffset(tIndex0, keysLeft[i]);
+            int fEnd0 = accessor0.getFieldEndOffset(tIndex0, keysLeft[i]);
+            int fLen0 = fEnd0 - fStart0;
+
+            int fStart1 = accessor1.getFieldStartOffset(tIndex1, keysRight[i]);
+            int fEnd1 = accessor1.getFieldEndOffset(tIndex1, keysRight[i]);
+            int fLen1 = fEnd1 - fStart1;
+
+            int c = binaryComparators[i].compare(accessor0.getBuffer().array(), fStart0 + fStartOffset0, fLen0,
+                    accessor1.getBuffer().array(), fStart1 + fStartOffset1, fLen1);
+            if (c != 0) {
+                return c;
+            }
+        }
+        return 0;
+    }
+}
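The two JoinMultiComparatorFactory instances passed to OptimizedHybridHashJoinOperatorDescriptor above differ only in key order, so tuple pairs can be compared in either the probe-versus-build or the build-versus-probe direction. A condensed restatement, using the same variable names as the generator code (fragment only, not a complete method):

    // comparatorFactories, keysLeft and keysRight are the arrays computed earlier in the generator.
    ITuplePairComparatorFactory leftAgainstRight =
            new JoinMultiComparatorFactory(comparatorFactories, keysLeft, keysRight);
    ITuplePairComparatorFactory rightAgainstLeft =
            new JoinMultiComparatorFactory(comparatorFactories, keysRight, keysLeft);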
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/jobgen/impl/JobGenContext.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/jobgen/impl/JobGenContext.java
index 22a1a81..365d1a5 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/jobgen/impl/JobGenContext.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/jobgen/impl/JobGenContext.java
@@ -34,6 +34,7 @@
 import edu.uci.ics.hyracks.algebricks.data.IBinaryBooleanInspectorFactory;
 import edu.uci.ics.hyracks.algebricks.data.IBinaryComparatorFactoryProvider;
 import edu.uci.ics.hyracks.algebricks.data.IBinaryHashFunctionFactoryProvider;
+import edu.uci.ics.hyracks.algebricks.data.IBinaryHashFunctionFamilyProvider;
 import edu.uci.ics.hyracks.algebricks.data.IBinaryIntegerInspectorFactory;
 import edu.uci.ics.hyracks.algebricks.data.INormalizedKeyComputerFactoryProvider;
 import edu.uci.ics.hyracks.algebricks.data.IPrinterFactoryProvider;
@@ -42,151 +43,167 @@
 import edu.uci.ics.hyracks.api.dataflow.value.INullWriterFactory;
 
 public class JobGenContext {
-    private final IOperatorSchema outerFlowSchema;
-    private final Map<ILogicalOperator, IOperatorSchema> schemaMap = new HashMap<ILogicalOperator, IOperatorSchema>();
-    private final ISerializerDeserializerProvider serializerDeserializerProvider;
-    private final IBinaryHashFunctionFactoryProvider hashFunctionFactoryProvider;
-    private final IBinaryComparatorFactoryProvider comparatorFactoryProvider;
-    private final IPrinterFactoryProvider printerFactoryProvider;
-    private final ITypeTraitProvider typeTraitProvider;
-    private final IMetadataProvider<?, ?> metadataProvider;
-    private final INullWriterFactory nullWriterFactory;
-    private final INormalizedKeyComputerFactoryProvider normalizedKeyComputerFactoryProvider;
-    private final Object appContext;
-    private final IBinaryBooleanInspectorFactory booleanInspectorFactory;
-    private final IBinaryIntegerInspectorFactory integerInspectorFactory;
-    private final IExpressionRuntimeProvider expressionRuntimeProvider;
-    private final IExpressionTypeComputer expressionTypeComputer;
-    private final IExpressionEvalSizeComputer expressionEvalSizeComputer;
-    private final IPartialAggregationTypeComputer partialAggregationTypeComputer;
-    private final int frameSize;
-    private AlgebricksPartitionConstraint clusterLocations;
-    private int varCounter;
-    private final ITypingContext typingContext;
+	private final IOperatorSchema outerFlowSchema;
+	private final Map<ILogicalOperator, IOperatorSchema> schemaMap = new HashMap<ILogicalOperator, IOperatorSchema>();
+	private final ISerializerDeserializerProvider serializerDeserializerProvider;
+	private final IBinaryHashFunctionFactoryProvider hashFunctionFactoryProvider;
+	private final IBinaryHashFunctionFamilyProvider hashFunctionFamilyProvider;
+	private final IBinaryComparatorFactoryProvider comparatorFactoryProvider;
+	private final IPrinterFactoryProvider printerFactoryProvider;
+	private final ITypeTraitProvider typeTraitProvider;
+	private final IMetadataProvider<?, ?> metadataProvider;
+	private final INullWriterFactory nullWriterFactory;
+	private final INormalizedKeyComputerFactoryProvider normalizedKeyComputerFactoryProvider;
+	private final Object appContext;
+	private final IBinaryBooleanInspectorFactory booleanInspectorFactory;
+	private final IBinaryIntegerInspectorFactory integerInspectorFactory;
+	private final IExpressionRuntimeProvider expressionRuntimeProvider;
+	private final IExpressionTypeComputer expressionTypeComputer;
+	private final IExpressionEvalSizeComputer expressionEvalSizeComputer;
+	private final IPartialAggregationTypeComputer partialAggregationTypeComputer;
+	private final int frameSize;
+	private AlgebricksPartitionConstraint clusterLocations;
+	private int varCounter;
+	private final ITypingContext typingContext;
 
-    public JobGenContext(IOperatorSchema outerFlowSchema, IMetadataProvider<?, ?> metadataProvider, Object appContext,
-            ISerializerDeserializerProvider serializerDeserializerProvider,
-            IBinaryHashFunctionFactoryProvider hashFunctionFactoryProvider,
-            IBinaryComparatorFactoryProvider comparatorFactoryProvider, ITypeTraitProvider typeTraitProvider,
-            IBinaryBooleanInspectorFactory booleanInspectorFactory,
-            IBinaryIntegerInspectorFactory integerInspectorFactory, IPrinterFactoryProvider printerFactoryProvider,
-            INullWriterFactory nullWriterFactory,
-            INormalizedKeyComputerFactoryProvider normalizedKeyComputerFactoryProvider,
-            IExpressionRuntimeProvider expressionRuntimeProvider, IExpressionTypeComputer expressionTypeComputer,
-            INullableTypeComputer nullableTypeComputer, ITypingContext typingContext,
-            IExpressionEvalSizeComputer expressionEvalSizeComputer,
-            IPartialAggregationTypeComputer partialAggregationTypeComputer, int frameSize,
-            AlgebricksPartitionConstraint clusterLocations) {
-        this.outerFlowSchema = outerFlowSchema;
-        this.metadataProvider = metadataProvider;
-        this.appContext = appContext;
-        this.serializerDeserializerProvider = serializerDeserializerProvider;
-        this.hashFunctionFactoryProvider = hashFunctionFactoryProvider;
-        this.comparatorFactoryProvider = comparatorFactoryProvider;
-        this.typeTraitProvider = typeTraitProvider;
-        this.booleanInspectorFactory = booleanInspectorFactory;
-        this.integerInspectorFactory = integerInspectorFactory;
-        this.printerFactoryProvider = printerFactoryProvider;
-        this.clusterLocations = clusterLocations;
-        this.normalizedKeyComputerFactoryProvider = normalizedKeyComputerFactoryProvider;
-        this.nullWriterFactory = nullWriterFactory;
-        this.expressionRuntimeProvider = expressionRuntimeProvider;
-        this.expressionTypeComputer = expressionTypeComputer;
-        this.typingContext = typingContext;
-        this.expressionEvalSizeComputer = expressionEvalSizeComputer;
-        this.partialAggregationTypeComputer = partialAggregationTypeComputer;
-        this.frameSize = frameSize;
-        this.varCounter = 0;
-    }
+	public JobGenContext(
+			IOperatorSchema outerFlowSchema,
+			IMetadataProvider<?, ?> metadataProvider,
+			Object appContext,
+			ISerializerDeserializerProvider serializerDeserializerProvider,
+			IBinaryHashFunctionFactoryProvider hashFunctionFactoryProvider,
+			IBinaryHashFunctionFamilyProvider hashFunctionFamilyProvider,
+			IBinaryComparatorFactoryProvider comparatorFactoryProvider,
+			ITypeTraitProvider typeTraitProvider,
+			IBinaryBooleanInspectorFactory booleanInspectorFactory,
+			IBinaryIntegerInspectorFactory integerInspectorFactory,
+			IPrinterFactoryProvider printerFactoryProvider,
+			INullWriterFactory nullWriterFactory,
+			INormalizedKeyComputerFactoryProvider normalizedKeyComputerFactoryProvider,
+			IExpressionRuntimeProvider expressionRuntimeProvider,
+			IExpressionTypeComputer expressionTypeComputer,
+			INullableTypeComputer nullableTypeComputer,
+			ITypingContext typingContext,
+			IExpressionEvalSizeComputer expressionEvalSizeComputer,
+			IPartialAggregationTypeComputer partialAggregationTypeComputer,
+			int frameSize, AlgebricksPartitionConstraint clusterLocations) {
+		this.outerFlowSchema = outerFlowSchema;
+		this.metadataProvider = metadataProvider;
+		this.appContext = appContext;
+		this.serializerDeserializerProvider = serializerDeserializerProvider;
+		this.hashFunctionFactoryProvider = hashFunctionFactoryProvider;
+		this.hashFunctionFamilyProvider = hashFunctionFamilyProvider;
+		this.comparatorFactoryProvider = comparatorFactoryProvider;
+		this.typeTraitProvider = typeTraitProvider;
+		this.booleanInspectorFactory = booleanInspectorFactory;
+		this.integerInspectorFactory = integerInspectorFactory;
+		this.printerFactoryProvider = printerFactoryProvider;
+		this.clusterLocations = clusterLocations;
+		this.normalizedKeyComputerFactoryProvider = normalizedKeyComputerFactoryProvider;
+		this.nullWriterFactory = nullWriterFactory;
+		this.expressionRuntimeProvider = expressionRuntimeProvider;
+		this.expressionTypeComputer = expressionTypeComputer;
+		this.typingContext = typingContext;
+		this.expressionEvalSizeComputer = expressionEvalSizeComputer;
+		this.partialAggregationTypeComputer = partialAggregationTypeComputer;
+		this.frameSize = frameSize;
+		this.varCounter = 0;
+	}
 
-    public IOperatorSchema getOuterFlowSchema() {
-        return outerFlowSchema;
-    }
+	public IOperatorSchema getOuterFlowSchema() {
+		return outerFlowSchema;
+	}
 
-    public AlgebricksPartitionConstraint getClusterLocations() {
-        return clusterLocations;
-    }
+	public AlgebricksPartitionConstraint getClusterLocations() {
+		return clusterLocations;
+	}
 
-    public IMetadataProvider<?, ?> getMetadataProvider() {
-        return metadataProvider;
-    }
+	public IMetadataProvider<?, ?> getMetadataProvider() {
+		return metadataProvider;
+	}
 
-    public Object getAppContext() {
-        return appContext;
-    }
+	public Object getAppContext() {
+		return appContext;
+	}
 
-    public ISerializerDeserializerProvider getSerializerDeserializerProvider() {
-        return serializerDeserializerProvider;
-    }
+	public ISerializerDeserializerProvider getSerializerDeserializerProvider() {
+		return serializerDeserializerProvider;
+	}
 
-    public IBinaryHashFunctionFactoryProvider getBinaryHashFunctionFactoryProvider() {
-        return hashFunctionFactoryProvider;
-    }
+	public IBinaryHashFunctionFactoryProvider getBinaryHashFunctionFactoryProvider() {
+		return hashFunctionFactoryProvider;
+	}
 
-    public IBinaryComparatorFactoryProvider getBinaryComparatorFactoryProvider() {
-        return comparatorFactoryProvider;
-    }
+	public IBinaryHashFunctionFamilyProvider getBinaryHashFunctionFamilyProvider() {
+		return hashFunctionFamilyProvider;
+	}
 
-    public ITypeTraitProvider getTypeTraitProvider() {
-        return typeTraitProvider;
-    }
+	public IBinaryComparatorFactoryProvider getBinaryComparatorFactoryProvider() {
+		return comparatorFactoryProvider;
+	}
 
-    public IBinaryBooleanInspectorFactory getBinaryBooleanInspectorFactory() {
-        return booleanInspectorFactory;
-    }
+	public ITypeTraitProvider getTypeTraitProvider() {
+		return typeTraitProvider;
+	}
 
-    public IBinaryIntegerInspectorFactory getBinaryIntegerInspectorFactory() {
-        return integerInspectorFactory;
-    }
+	public IBinaryBooleanInspectorFactory getBinaryBooleanInspectorFactory() {
+		return booleanInspectorFactory;
+	}
 
-    public IPrinterFactoryProvider getPrinterFactoryProvider() {
-        return printerFactoryProvider;
-    }
+	public IBinaryIntegerInspectorFactory getBinaryIntegerInspectorFactory() {
+		return integerInspectorFactory;
+	}
 
-    public IExpressionRuntimeProvider getExpressionRuntimeProvider() {
-        return expressionRuntimeProvider;
-    }
+	public IPrinterFactoryProvider getPrinterFactoryProvider() {
+		return printerFactoryProvider;
+	}
 
-    public IOperatorSchema getSchema(ILogicalOperator op) {
-        return schemaMap.get(op);
-    }
+	public IExpressionRuntimeProvider getExpressionRuntimeProvider() {
+		return expressionRuntimeProvider;
+	}
 
-    public void putSchema(ILogicalOperator op, IOperatorSchema schema) {
-        schemaMap.put(op, schema);
-    }
+	public IOperatorSchema getSchema(ILogicalOperator op) {
+		return schemaMap.get(op);
+	}
 
-    public LogicalVariable createNewVar() {
-        varCounter++;
-        LogicalVariable var = new LogicalVariable(-varCounter);
-        return var;
-    }
+	public void putSchema(ILogicalOperator op, IOperatorSchema schema) {
+		schemaMap.put(op, schema);
+	}
 
-    public Object getType(ILogicalExpression expr, IVariableTypeEnvironment env) throws AlgebricksException {
-        return expressionTypeComputer.getType(expr, typingContext.getMetadataProvider(), env);
-    }
+	public LogicalVariable createNewVar() {
+		varCounter++;
+		LogicalVariable var = new LogicalVariable(-varCounter);
+		return var;
+	}
 
-    public INullWriterFactory getNullWriterFactory() {
-        return nullWriterFactory;
-    }
+	public Object getType(ILogicalExpression expr, IVariableTypeEnvironment env)
+			throws AlgebricksException {
+		return expressionTypeComputer.getType(expr,
+				typingContext.getMetadataProvider(), env);
+	}
 
-    public INormalizedKeyComputerFactoryProvider getNormalizedKeyComputerFactoryProvider() {
-        return normalizedKeyComputerFactoryProvider;
-    }
+	public INullWriterFactory getNullWriterFactory() {
+		return nullWriterFactory;
+	}
 
-    public IExpressionEvalSizeComputer getExpressionEvalSizeComputer() {
-        return expressionEvalSizeComputer;
-    }
+	public INormalizedKeyComputerFactoryProvider getNormalizedKeyComputerFactoryProvider() {
+		return normalizedKeyComputerFactoryProvider;
+	}
 
-    public int getFrameSize() {
-        return frameSize;
-    }
+	public IExpressionEvalSizeComputer getExpressionEvalSizeComputer() {
+		return expressionEvalSizeComputer;
+	}
 
-    public IPartialAggregationTypeComputer getPartialAggregationTypeComputer() {
-        return partialAggregationTypeComputer;
-    }
+	public int getFrameSize() {
+		return frameSize;
+	}
 
-    public IVariableTypeEnvironment getTypeEnvironment(ILogicalOperator op) {
-        return typingContext.getOutputTypeEnvironment(op);
-    }
+	public IPartialAggregationTypeComputer getPartialAggregationTypeComputer() {
+		return partialAggregationTypeComputer;
+	}
+
+	public IVariableTypeEnvironment getTypeEnvironment(ILogicalOperator op) {
+		return typingContext.getOutputTypeEnvironment(op);
+	}
 
 }
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/jobgen/impl/JobGenHelper.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/jobgen/impl/JobGenHelper.java
index 790fb93..530d19c 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/jobgen/impl/JobGenHelper.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/jobgen/impl/JobGenHelper.java
@@ -24,6 +24,7 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
 import edu.uci.ics.hyracks.algebricks.data.IBinaryComparatorFactoryProvider;
 import edu.uci.ics.hyracks.algebricks.data.IBinaryHashFunctionFactoryProvider;
+import edu.uci.ics.hyracks.algebricks.data.IBinaryHashFunctionFamilyProvider;
 import edu.uci.ics.hyracks.algebricks.data.INormalizedKeyComputerFactoryProvider;
 import edu.uci.ics.hyracks.algebricks.data.IPrinterFactory;
 import edu.uci.ics.hyracks.algebricks.data.IPrinterFactoryProvider;
@@ -31,6 +32,7 @@
 import edu.uci.ics.hyracks.algebricks.data.ITypeTraitProvider;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFamily;
 import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
 import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
@@ -42,8 +44,8 @@
 
     @SuppressWarnings("rawtypes")
     public static RecordDescriptor mkRecordDescriptor(IVariableTypeEnvironment env, IOperatorSchema opSchema,
-            JobGenContext context) throws AlgebricksException {        
-		ISerializerDeserializer[] fields = new ISerializerDeserializer[opSchema.getSize()];
+            JobGenContext context) throws AlgebricksException {
+        ISerializerDeserializer[] fields = new ISerializerDeserializer[opSchema.getSize()];
         ITypeTraits[] typeTraits = new ITypeTraits[opSchema.getSize()];
         ISerializerDeserializerProvider sdp = context.getSerializerDeserializerProvider();
         ITypeTraitProvider ttp = context.getTypeTraitProvider();
@@ -59,7 +61,7 @@
         }
         return new RecordDescriptor(fields, typeTraits);
     }
-    
+
     public static IPrinterFactory[] mkPrinterFactories(IOperatorSchema opSchema, IVariableTypeEnvironment env,
             JobGenContext context, int[] printColumns) throws AlgebricksException {
         IPrinterFactory[] pf = new IPrinterFactory[printColumns.length];
@@ -94,7 +96,20 @@
         }
         return funFactories;
     }
-    
+
+    public static IBinaryHashFunctionFamily[] variablesToBinaryHashFunctionFamilies(
+            Collection<LogicalVariable> varLogical, IVariableTypeEnvironment env, JobGenContext context)
+            throws AlgebricksException {
+        IBinaryHashFunctionFamily[] funFamilies = new IBinaryHashFunctionFamily[varLogical.size()];
+        int i = 0;
+        IBinaryHashFunctionFamilyProvider bhffProvider = context.getBinaryHashFunctionFamilyProvider();
+        for (LogicalVariable var : varLogical) {
+            Object type = env.getVarType(var);
+            funFamilies[i++] = bhffProvider.getBinaryHashFunctionFamily(type);
+        }
+        return funFamilies;
+    }
+
     public static IBinaryComparatorFactory[] variablesToAscBinaryComparatorFactories(
             Collection<LogicalVariable> varLogical, IVariableTypeEnvironment env, JobGenContext context)
             throws AlgebricksException {
@@ -107,15 +122,14 @@
         }
         return compFactories;
     }
-    
-    public static IBinaryComparatorFactory[] variablesToAscBinaryComparatorFactories(
-            List<LogicalVariable> varLogical, int start, int size, IVariableTypeEnvironment env, JobGenContext context)
-            throws AlgebricksException {
+
+    public static IBinaryComparatorFactory[] variablesToAscBinaryComparatorFactories(List<LogicalVariable> varLogical,
+            int start, int size, IVariableTypeEnvironment env, JobGenContext context) throws AlgebricksException {
         IBinaryComparatorFactory[] compFactories = new IBinaryComparatorFactory[size];
         IBinaryComparatorFactoryProvider bcfProvider = context.getBinaryComparatorFactoryProvider();
         for (int i = 0; i < size; i++) {
-                Object type = env.getVarType(varLogical.get(start + i));
-                compFactories[i] = bcfProvider.getBinaryComparatorFactory(type, true);
+            Object type = env.getVarType(varLogical.get(start + i));
+            compFactories[i] = bcfProvider.getBinaryComparatorFactory(type, true);
         }
         return compFactories;
     }
@@ -144,15 +158,14 @@
         }
         return typeTraits;
     }
-    
-    public static ITypeTraits[] variablesToTypeTraits(
-            List<LogicalVariable> varLogical, int start, int size, IVariableTypeEnvironment env, JobGenContext context)
-            throws AlgebricksException {
+
+    public static ITypeTraits[] variablesToTypeTraits(List<LogicalVariable> varLogical, int start, int size,
+            IVariableTypeEnvironment env, JobGenContext context) throws AlgebricksException {
         ITypeTraits[] typeTraits = new ITypeTraits[size];
         ITypeTraitProvider typeTraitProvider = context.getTypeTraitProvider();
         for (int i = 0; i < size; i++) {
-                Object type = env.getVarType(varLogical.get(start + i));
-                typeTraits[i] = typeTraitProvider.getTypeTrait(type);
+            Object type = env.getVarType(varLogical.get(start + i));
+            typeTraits[i] = typeTraitProvider.getTypeTrait(type);
         }
         return typeTraits;
     }
diff --git a/algebricks/algebricks-data/src/main/java/edu/uci/ics/hyracks/algebricks/data/IBinaryHashFunctionFamilyProvider.java b/algebricks/algebricks-data/src/main/java/edu/uci/ics/hyracks/algebricks/data/IBinaryHashFunctionFamilyProvider.java
new file mode 100644
index 0000000..8a992b3
--- /dev/null
+++ b/algebricks/algebricks-data/src/main/java/edu/uci/ics/hyracks/algebricks/data/IBinaryHashFunctionFamilyProvider.java
@@ -0,0 +1,25 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.algebricks.data;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFamily;
+
+public interface IBinaryHashFunctionFamilyProvider {
+
+	public IBinaryHashFunctionFamily getBinaryHashFunctionFamily(Object type)
+			throws AlgebricksException;
+}
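A minimal sketch of an implementing provider; the class name and its return-a-single-family behavior are assumptions, only the interface itself comes from this patch. Note that returning null for a type makes HybridHashJoinPOperator (above) fall back to the non-optimized hybrid hash join.

    import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
    import edu.uci.ics.hyracks.algebricks.data.IBinaryHashFunctionFamilyProvider;
    import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFamily;

    public class SingleFamilyHashFunctionFamilyProvider implements IBinaryHashFunctionFamilyProvider {
        private final IBinaryHashFunctionFamily family;

        public SingleFamilyHashFunctionFamilyProvider(IBinaryHashFunctionFamily family) {
            this.family = family;
        }

        @Override
        public IBinaryHashFunctionFamily getBinaryHashFunctionFamily(Object type) throws AlgebricksException {
            // A real provider would dispatch on the compiler-specific type object.
            return family;
        }
    }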
diff --git a/hyracks/hyracks-data/hyracks-data-std/src/main/java/edu/uci/ics/hyracks/data/std/primitive/RawUTF8StringPointable.java b/hyracks/hyracks-data/hyracks-data-std/src/main/java/edu/uci/ics/hyracks/data/std/primitive/RawUTF8StringPointable.java
new file mode 100644
index 0000000..c90ce5a
--- /dev/null
+++ b/hyracks/hyracks-data/hyracks-data-std/src/main/java/edu/uci/ics/hyracks/data/std/primitive/RawUTF8StringPointable.java
@@ -0,0 +1,105 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.data.std.primitive;
+
+import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
+import edu.uci.ics.hyracks.data.std.api.AbstractPointable;
+import edu.uci.ics.hyracks.data.std.api.IComparable;
+import edu.uci.ics.hyracks.data.std.api.IHashable;
+import edu.uci.ics.hyracks.data.std.api.IPointable;
+import edu.uci.ics.hyracks.data.std.api.IPointableFactory;
+
+/**
+ * This class provides the raw bytes-based comparison and hash function for UTF8 strings.
+ * Note that the comparison may not deliver the correct ordering for languages whose characters
+ * are encoded in two or three bytes, but it works for single-byte character languages.
+ */
+public final class RawUTF8StringPointable extends AbstractPointable implements IHashable, IComparable {
+    public static final ITypeTraits TYPE_TRAITS = new ITypeTraits() {
+        private static final long serialVersionUID = 1L;
+
+        @Override
+        public boolean isFixedLength() {
+            return false;
+        }
+
+        @Override
+        public int getFixedLength() {
+            return 0;
+        }
+    };
+
+    public static final IPointableFactory FACTORY = new IPointableFactory() {
+        private static final long serialVersionUID = 1L;
+
+        @Override
+        public IPointable createPointable() {
+            return new RawUTF8StringPointable();
+        }
+
+        @Override
+        public ITypeTraits getTypeTraits() {
+            return TYPE_TRAITS;
+        }
+    };
+
+    @Override
+    public int compareTo(IPointable pointer) {
+        return compareTo(pointer.getByteArray(), pointer.getStartOffset(), pointer.getLength());
+    }
+
+    @Override
+    public int compareTo(byte[] bytes, int start, int length) {
+        int utflen1 = UTF8StringPointable.getUTFLength(this.bytes, this.start);
+        int utflen2 = UTF8StringPointable.getUTFLength(bytes, start);
+
+        int c1 = 0;
+        int c2 = 0;
+
+        int s1Start = this.start + 2;
+        int s2Start = start + 2;
+
+        while (c1 < utflen1 && c2 < utflen2) {
+            char ch1 = (char) this.bytes[s1Start + c1];
+            char ch2 = (char) bytes[s2Start + c2];
+
+            if (ch1 != ch2) {
+                return ch1 - ch2;
+            }
+            c1++;
+            c2++;
+        }
+        return utflen1 - utflen2;
+    }
+
+    @Override
+    public int hash() {
+        int h = 0;
+        int utflen = UTF8StringPointable.getUTFLength(bytes, start);
+        int sStart = start + 2;
+        int c = 0;
+
+        while (c < utflen) {
+            char ch = (char) bytes[sStart + c];
+            h = 31 * h + ch;
+            c++;
+        }
+        return h;
+    }
+
+    public void toString(StringBuilder buffer) {
+        UTF8StringPointable.toString(buffer, bytes, start);
+    }
+}
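A tiny self-contained usage sketch; the byte layout (a two-byte big-endian UTF length prefix followed by the character bytes) follows what UTF8StringPointable.getUTFLength reads, while the demo class and the literal values are made up.

    import edu.uci.ics.hyracks.data.std.primitive.RawUTF8StringPointable;

    public class RawUTF8StringPointableDemo {
        public static void main(String[] args) {
            // "abc" and "abd": two-byte length (3) followed by the raw single-byte characters.
            byte[] abc = new byte[] { 0, 3, 'a', 'b', 'c' };
            byte[] abd = new byte[] { 0, 3, 'a', 'b', 'd' };

            RawUTF8StringPointable p = (RawUTF8StringPointable) RawUTF8StringPointable.FACTORY.createPointable();
            p.set(abc, 0, abc.length);

            System.out.println(p.hash());                        // byte-wise hash of "abc"
            System.out.println(p.compareTo(abd, 0, abd.length)); // negative, since 'c' < 'd'
        }
    }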
diff --git a/hyracks/hyracks-data/hyracks-data-std/src/main/java/edu/uci/ics/hyracks/data/std/primitive/UTF8StringPointable.java b/hyracks/hyracks-data/hyracks-data-std/src/main/java/edu/uci/ics/hyracks/data/std/primitive/UTF8StringPointable.java
index f6d6093..866ebb0 100644
--- a/hyracks/hyracks-data/hyracks-data-std/src/main/java/edu/uci/ics/hyracks/data/std/primitive/UTF8StringPointable.java
+++ b/hyracks/hyracks-data/hyracks-data-std/src/main/java/edu/uci/ics/hyracks/data/std/primitive/UTF8StringPointable.java
@@ -216,4 +216,4 @@
     public void toString(StringBuilder buffer) {
         toString(buffer, bytes, start);
     }
-}
\ No newline at end of file
+}
diff --git a/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/normalizers/DoubleNormalizedKeyComputerFactory.java b/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/normalizers/DoubleNormalizedKeyComputerFactory.java
new file mode 100644
index 0000000..e95e9c2
--- /dev/null
+++ b/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/normalizers/DoubleNormalizedKeyComputerFactory.java
@@ -0,0 +1,28 @@
+package edu.uci.ics.hyracks.dataflow.common.data.normalizers;
+
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputer;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
+
+public class DoubleNormalizedKeyComputerFactory implements INormalizedKeyComputerFactory {
+
+    private static final long serialVersionUID = 1L;
+
+    @Override
+    public INormalizedKeyComputer createNormalizedKeyComputer() {
+        return new INormalizedKeyComputer() {
+
+            @Override
+            public int normalize(byte[] bytes, int start, int length) {
+                int prefix = IntegerSerializerDeserializer.getInt(bytes, start);
+                if (prefix >= 0) {
+                    return prefix ^ Integer.MIN_VALUE;
+                } else {
+                    return (int) ((long) 0xffffffff - (long) prefix);
+                }
+            }
+
+        };
+    }
+
+}
diff --git a/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/normalizers/FloatNormalizedKeyComputerFactory.java b/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/normalizers/FloatNormalizedKeyComputerFactory.java
new file mode 100644
index 0000000..d58afc1
--- /dev/null
+++ b/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/normalizers/FloatNormalizedKeyComputerFactory.java
@@ -0,0 +1,28 @@
+package edu.uci.ics.hyracks.dataflow.common.data.normalizers;
+
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputer;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
+
+public class FloatNormalizedKeyComputerFactory implements INormalizedKeyComputerFactory {
+
+    private static final long serialVersionUID = 1L;
+
+    @Override
+    public INormalizedKeyComputer createNormalizedKeyComputer() {
+        return new INormalizedKeyComputer() {
+
+            @Override
+            public int normalize(byte[] bytes, int start, int length) {
+                int prefix = IntegerSerializerDeserializer.getInt(bytes, start);
+                if (prefix >= 0) {
+                    return prefix ^ Integer.MIN_VALUE;
+                } else {
+                    return (int) ((long) 0xffffffff - (long) prefix);
+                }
+            }
+
+        };
+    }
+
+}
diff --git a/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/normalizers/Integer64NormalizedKeyComputerFactory.java b/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/normalizers/Integer64NormalizedKeyComputerFactory.java
new file mode 100644
index 0000000..4589909
--- /dev/null
+++ b/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/normalizers/Integer64NormalizedKeyComputerFactory.java
@@ -0,0 +1,55 @@
+package edu.uci.ics.hyracks.dataflow.common.data.normalizers;
+
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputer;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.Integer64SerializerDeserializer;
+
+public class Integer64NormalizedKeyComputerFactory implements INormalizedKeyComputerFactory {
+
+    private static final long serialVersionUID = 8735044913496854551L;
+
+    @Override
+    public INormalizedKeyComputer createNormalizedKeyComputer() {
+        return new INormalizedKeyComputer() {
+            private static final int POSITIVE_LONG_MASK = (3 << 30);
+            private static final int NON_NEGATIVE_INT_MASK = (2 << 30);
+            private static final int NEGATIVE_LONG_MASK = (0 << 30);
+
+            @Override
+            public int normalize(byte[] bytes, int start, int length) {
+                long value = Integer64SerializerDeserializer.getLong(bytes, start);
+                int highValue = (int) (value >> 32);
+                if (highValue > 0) {
+                    /**
+                     * high word is positive: the value is larger than Integer.MAX_VALUE
+                     */
+                    int highNmk = getKey(highValue);
+                    highNmk >>= 2;
+                    highNmk |= POSITIVE_LONG_MASK;
+                    return highNmk;
+                } else if (highValue == 0) {
+                    /**
+                     * high word is zero: the value is non-negative and fits in 32 bits
+                     */
+                    int lowNmk = (int) value;
+                    lowNmk >>= 2;
+                    lowNmk |= NON_NEGATIVE_INT_MASK;
+                    return lowNmk;
+                } else {
+                    /**
+                     * negative value: this case has not been optimized
+                     */
+                    int highNmk = getKey(highValue);
+                    highNmk >>= 2;
+                    highNmk |= NEGATIVE_LONG_MASK;
+                    return highNmk;
+                }
+            }
+
+            private int getKey(int value) {
+                return value ^ Integer.MIN_VALUE;
+            }
+
+        };
+    }
+}
diff --git a/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/normalizers/IntegerNormalizedKeyComputerFactory.java b/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/normalizers/IntegerNormalizedKeyComputerFactory.java
index 2f7a778..6a01842 100644
--- a/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/normalizers/IntegerNormalizedKeyComputerFactory.java
+++ b/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/normalizers/IntegerNormalizedKeyComputerFactory.java
@@ -27,8 +27,7 @@
             @Override
             public int normalize(byte[] bytes, int start, int length) {
                 int value = IntegerSerializerDeserializer.getInt(bytes, start);
-                long unsignedValue = (long) value;
-                return (int) ((unsignedValue - ((long) Integer.MIN_VALUE)) & 0xffffffffL);
+                return value ^ Integer.MIN_VALUE;
             }
         };
     }
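The new expression is equivalent to the old subtract-and-mask form: XOR-ing with Integer.MIN_VALUE just flips the sign bit, which maps the signed int range onto the unsigned 32-bit range in order-preserving fashion. A small standalone check (illustrative only):

    public class IntegerNormalizedKeyCheck {
        public static void main(String[] args) {
            int a = -5, b = 7;                  // a < b as signed ints
            int na = a ^ Integer.MIN_VALUE;     // 0x7FFFFFFB
            int nb = b ^ Integer.MIN_VALUE;     // 0x80000007
            // Comparing the normalized keys as unsigned 32-bit values preserves the signed order.
            System.out.println((na & 0xffffffffL) < (nb & 0xffffffffL)); // true
        }
    }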
diff --git a/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/parsers/LongParserFactory.java b/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/parsers/LongParserFactory.java
index 6a9dab7..d9191c7 100644
--- a/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/parsers/LongParserFactory.java
+++ b/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/parsers/LongParserFactory.java
@@ -32,7 +32,7 @@
         return new IValueParser() {
             @Override
             public void parse(char[] buffer, int start, int length, DataOutput out) throws HyracksDataException {
-                long n = 0;
+                int n = 0;
                 int sign = 1;
                 int i = 0;
                 boolean pre = true;
@@ -102,7 +102,7 @@
                             throw new HyracksDataException("Encountered " + ch);
                     }
                 }
-
+                
                 try {
                     out.writeLong(n * sign);
                 } catch (IOException e) {
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
index 2905574..6e2b16a 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
@@ -81,6 +81,8 @@
 
     private int[] buildPSizeInFrames; //Used for partition tuning
     private int freeFramesCounter; //Used for partition tuning
+
+    private boolean isTableEmpty; // Added to handle the case where the build side is empty (tableSize is 0)
 
     public OptimizedHybridHashJoin(IHyracksTaskContext ctx, int memForJoin, int numOfPartitions, String rel0Name,
             String rel1Name, int[] keys0, int[] keys1, IBinaryComparator[] comparators, RecordDescriptor buildRd,
@@ -89,10 +91,10 @@
         this.memForJoin = memForJoin;
         this.buildRd = buildRd;
         this.probeRd = probeRd;
-        this.buildHpc = probeHpc;
-        this.probeHpc = buildHpc;
-        this.buildKeys = keys0;
-        this.probeKeys = keys1;
+        this.buildHpc = buildHpc;
+        this.probeHpc = probeHpc;
+        this.buildKeys = keys1;
+        this.probeKeys = keys0;
         this.comparators = comparators;
         this.rel0Name = rel0Name;
         this.rel1Name = rel1Name;
@@ -117,10 +119,10 @@
         this.memForJoin = memForJoin;
         this.buildRd = buildRd;
         this.probeRd = probeRd;
-        this.buildHpc = probeHpc;
-        this.probeHpc = buildHpc;
-        this.buildKeys = keys0;
-        this.probeKeys = keys1;
+        this.buildHpc = buildHpc;
+        this.probeHpc = probeHpc;
+        this.buildKeys = keys1;
+        this.probeKeys = keys0;
         this.comparators = comparators;
         this.rel0Name = rel0Name;
         this.rel1Name = rel1Name;
@@ -171,6 +173,12 @@
     public void build(ByteBuffer buffer) throws HyracksDataException {
         accessorBuild.reset(buffer);
         int tupleCount = accessorBuild.getTupleCount();
+
+        boolean print = false;
+        if (print) {
+            accessorBuild.prettyPrint();
+        }
+
         for (int i = 0; i < tupleCount; ++i) {
             int pid = buildHpc.partition(accessorBuild, i, numOfPartitions);
             processTuple(i, pid);
@@ -338,6 +346,7 @@
 
         createInMemoryJoiner(inMemTupCount);
         cacheInMemJoin();
+        this.isTableEmpty = (inMemTupCount == 0);
     }
 
     private void partitionTune() throws HyracksDataException {
@@ -457,10 +466,14 @@
     }
 
     public void probe(ByteBuffer buffer, IFrameWriter writer) throws HyracksDataException {
-
         accessorProbe.reset(buffer);
         int tupleCount = accessorProbe.getTupleCount();
 
+        boolean print = false;
+        if (print) {
+            accessorProbe.prettyPrint();
+        }
+
         if (numOfSpilledParts == 0) {
             inMemJoiner.join(buffer, writer);
             return;
@@ -604,4 +617,8 @@
                 + freeFramesCounter;
         return s;
     }
+
+    public boolean isTableEmpty() {
+        return this.isTableEmpty;
+    }
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
index 3a7ee2c..4842ad7 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
@@ -167,10 +167,10 @@
         ProbeAndJoinActivityNode phase2 = new ProbeAndJoinActivityNode(probeAid, buildAid);
 
         builder.addActivity(this, phase1);
-        builder.addSourceEdge(0, phase1, 0);
+        builder.addSourceEdge(1, phase1, 0);
 
         builder.addActivity(this, phase2);
-        builder.addSourceEdge(1, phase2, 0);
+        builder.addSourceEdge(0, phase2, 0);
 
         builder.addBlockingEdge(phase1, phase2);
 
@@ -253,14 +253,7 @@
             for (int i = 0; i < comparatorFactories.length; i++) {
                 comparators[i] = comparatorFactories[i].createBinaryComparator();
             }
-
-            final INullWriter[] nullWriters1 = isLeftOuter ? new INullWriter[nullWriterFactories1.length] : null;
-            if (isLeftOuter) {
-                for (int i = 0; i < nullWriterFactories1.length; i++) {
-                    nullWriters1[i] = nullWriterFactories1[i].createNullWriter();
-                }
-            }
-
+            
             IOperatorNodePushable op = new AbstractUnaryInputSinkOperatorNodePushable() {
                 private BuildAndPartitionTaskState state = new BuildAndPartitionTaskState(ctx.getJobletContext()
                         .getJobId(), new TaskId(getActivityId(), partition));
@@ -278,9 +271,17 @@
                     state.memForJoin = memsize - 2;
                     state.numOfPartitions = getNumberOfPartitions(state.memForJoin, inputsize0, fudgeFactor,
                             nPartitions);
-                    state.hybridHJ = new OptimizedHybridHashJoin(ctx, state.memForJoin, state.numOfPartitions,
-                            PROBE_REL, BUILD_REL, probeKeys, buildKeys, comparators, probeRd, buildRd, probeHpc,
-                            buildHpc);
+                    if (!isLeftOuter) {
+                        state.hybridHJ = new OptimizedHybridHashJoin(ctx, state.memForJoin, state.numOfPartitions,
+                                PROBE_REL, BUILD_REL, probeKeys, buildKeys, comparators, probeRd, buildRd, probeHpc,
+                                buildHpc);
+                    } else {
+                        state.hybridHJ = new OptimizedHybridHashJoin(ctx, state.memForJoin, state.numOfPartitions,
+                                PROBE_REL, BUILD_REL, probeKeys, buildKeys, comparators, probeRd, buildRd, probeHpc,
+                                buildHpc, isLeftOuter, nullWriterFactories1);
+                    }
+
                     state.hybridHJ.initBuild();
                 }
 
@@ -368,7 +369,9 @@
 
                 @Override
                 public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
-                    state.hybridHJ.probe(buffer, writer);
+                    if (!state.hybridHJ.isTableEmpty()) {
+                        state.hybridHJ.probe(buffer, writer);
+                    }
                 }
 
                 @Override
diff --git a/hyracks/hyracks-dist/pom.xml b/hyracks/hyracks-dist/pom.xml
new file mode 100755
index 0000000..4f9fc20
--- /dev/null
+++ b/hyracks/hyracks-dist/pom.xml
@@ -0,0 +1,74 @@
+<?xml version="1.0"?>
+<project
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
+	xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+	<modelVersion>4.0.0</modelVersion>
+	<parent>
+		<artifactId>hyracks</artifactId>
+		<groupId>edu.uci.ics.hyracks</groupId>
+		<version>0.2.2-SNAPSHOT</version>
+	</parent>
+
+	<artifactId>hyracks-dist</artifactId>
+	<name>hyracks-dist</name>
+
+	<properties>
+		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+	</properties>
+
+	<build>
+		<plugins>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-compiler-plugin</artifactId>
+				<version>2.0.2</version>
+				<configuration>
+					<source>1.6</source>
+					<target>1.6</target>
+				</configuration>
+			</plugin>
+			<plugin>
+				<artifactId>maven-resources-plugin</artifactId>
+				<version>2.5</version>
+				<executions>
+					<execution>
+						<id>copy-scripts</id>
+						<!-- copy the distribution scripts during the package phase -->
+						<phase>package</phase>
+						<goals>
+							<goal>copy-resources</goal>
+						</goals>
+						<configuration>
+							<outputDirectory>target/appassembler/</outputDirectory>
+							<resources>
+								<resource>
+									<directory>src/main/resources</directory>
+								</resource>
+							</resources>
+							<directoryMode>0755</directoryMode>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-antrun-plugin</artifactId>
+				<version>1.6</version>
+				<executions>
+					<execution>
+						<id>process-test-classes</id>
+						<phase>package</phase>
+						<configuration>
+							<target>
+								<chmod perm="755" dir="target/appassembler/bin" includes="*" />
+							</target>
+						</configuration>
+						<goals>
+							<goal>run</goal>
+						</goals>
+					</execution>
+				</executions>
+			</plugin>
+		</plugins>
+	</build>
+</project>
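For reference, a minimal sketch of how this hyracks-dist module is expected to be exercised, assuming the build is run from the hyracks/ aggregator so the 0.2.2-SNAPSHOT parent resolves; the paths follow the pom above, but the commands themselves are illustrative rather than part of the patch:

    # Build the distribution module (and the modules it needs) from the aggregator.
    cd hyracks
    mvn -pl hyracks-dist -am package
    # The resources plugin copies src/main/resources into target/appassembler,
    # and the antrun execution marks the scripts executable, giving roughly:
    #   hyracks-dist/target/appassembler/bin/*.sh   (mode 755)
    #   hyracks-dist/target/appassembler/conf/{cluster.properties,debugnc.properties,master,slaves}
    ls -l hyracks-dist/target/appassembler/bin hyracks-dist/target/appassembler/conf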
diff --git a/hyracks/hyracks-dist/src/main/resources/bin/getip.sh b/hyracks/hyracks-dist/src/main/resources/bin/getip.sh
new file mode 100755
index 0000000..e0cdf73
--- /dev/null
+++ b/hyracks/hyracks-dist/src/main/resources/bin/getip.sh
@@ -0,0 +1,21 @@
+#Get the OS
+OS_NAME=`uname -a|awk '{print $1}'`
+LINUX_OS='Linux'
+
+if [ "$OS_NAME" = "$LINUX_OS" ]
+then
+        #Get the IP address of eth0, falling back to the loopback device
+        IPADDR=`/sbin/ifconfig eth0 | grep "inet " | awk '{print $2}' | cut -f 2 -d ':'`
+        if [ "$IPADDR" = "" ]
+        then
+                IPADDR=`/sbin/ifconfig lo | grep "inet " | awk '{print $2}' | cut -f 2 -d ':'`
+        fi
+else
+        IPADDR=`/sbin/ifconfig en1 | grep "inet " | awk '{print $2}' | cut -f 2 -d ':'`
+        if [ "$IPADDR" = "" ]
+        then
+                IPADDR=`/sbin/ifconfig lo0 | grep "inet " | awk '{print $2}' | cut -f 2 -d ':'`
+        fi
+
+fi
+echo $IPADDR
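getip.sh hard-codes eth0 (Linux) and en1 (Mac OS X) before falling back to the loopback device. On hosts whose primary interface has a different name, one alternative is to ask the routing table for the outbound source address; the snippet below is purely illustrative, is not part of the patch, and assumes the iproute2 `ip` tool is available:

    # Hypothetical fallback: derive the address this host would use to reach an external IP.
    IPADDR=`ip -4 route get 8.8.8.8 2>/dev/null | awk '{for (i = 1; i < NF; i++) if ($i == "src") print $(i + 1)}'`
    if [ "$IPADDR" = "" ]
    then
            IPADDR=127.0.0.1
    fi
    echo $IPADDR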
diff --git a/hyracks/hyracks-dist/src/main/resources/bin/startAllNCs.sh b/hyracks/hyracks-dist/src/main/resources/bin/startAllNCs.sh
new file mode 100755
index 0000000..629bd90
--- /dev/null
+++ b/hyracks/hyracks-dist/src/main/resources/bin/startAllNCs.sh
@@ -0,0 +1,6 @@
+HYRACKS_PATH=`pwd`
+
+for i in `cat conf/slaves`
+do
+   ssh $i "cd ${HYRACKS_PATH}; bin/startnc.sh"
+done
diff --git a/hyracks/hyracks-dist/src/main/resources/bin/startCluster.sh b/hyracks/hyracks-dist/src/main/resources/bin/startCluster.sh
new file mode 100755
index 0000000..a0c2063
--- /dev/null
+++ b/hyracks/hyracks-dist/src/main/resources/bin/startCluster.sh
@@ -0,0 +1,3 @@
+bin/startcc.sh
+sleep 5
+bin/startAllNCs.sh
diff --git a/hyracks/hyracks-dist/src/main/resources/bin/startDebugNc.sh b/hyracks/hyracks-dist/src/main/resources/bin/startDebugNc.sh
new file mode 100755
index 0000000..fe6cf27
--- /dev/null
+++ b/hyracks/hyracks-dist/src/main/resources/bin/startDebugNc.sh
@@ -0,0 +1,50 @@
+hostname
+
+#Get the IP address of the cc
+CCHOST_NAME=`cat conf/master`
+CURRENT_PATH=`pwd`
+CCHOST=`ssh ${CCHOST_NAME} "cd ${CURRENT_PATH}; bin/getip.sh"`
+
+#Import cluster properties
+. conf/cluster.properties
+. conf/debugnc.properties
+
+#Clean up temp dir
+
+rm -rf $NCTMP_DIR2
+mkdir $NCTMP_DIR2
+
+#Clean up log dir
+rm -rf $NCLOGS_DIR2
+mkdir $NCLOGS_DIR2
+
+
+#Clean up I/O working dir
+io_dirs=$(echo $IO_DIRS2 | tr "," "\n")
+for io_dir in $io_dirs
+do
+	rm -rf $io_dir
+	mkdir $io_dir
+done
+
+#Set JAVA_HOME
+export JAVA_HOME=$JAVA_HOME
+
+#Get the IP address of this node
+IPADDR=`bin/getip.sh`
+
+#Get node ID
+NODEID=`hostname | cut -d '.' -f 1`
+NODEID=${NODEID}2
+
+#Set JAVA_OPTS
+export JAVA_OPTS=$NCJAVA_OPTS2
+
+cd $HYRACKS_HOME
+HYRACKS_HOME=`pwd`
+
+#Enter the temp dir
+cd $NCTMP_DIR2
+
+#Launch hyracks nc
+$HYRACKS_HOME/hyracks-server/target/appassembler/bin/hyracksnc -cc-host $CCHOST -cc-port $CC_CLUSTERPORT -cluster-net-ip-address $IPADDR  -data-ip-address $IPADDR -node-id $NODEID -iodevices "${IO_DIRS2}" &> $NCLOGS_DIR2/$NODEID.log &
diff --git a/hyracks/hyracks-dist/src/main/resources/bin/startcc.sh b/hyracks/hyracks-dist/src/main/resources/bin/startcc.sh
new file mode 100755
index 0000000..fe2551d
--- /dev/null
+++ b/hyracks/hyracks-dist/src/main/resources/bin/startcc.sh
@@ -0,0 +1,25 @@
+#!/bin/bash
+hostname
+
+#Import cluster properties
+. conf/cluster.properties
+
+#Get the IP address of the cc
+CCHOST_NAME=`cat conf/master`
+CCHOST=`bin/getip.sh`
+
+#Remove the temp dir
+rm -rf $CCTMP_DIR
+mkdir $CCTMP_DIR
+
+#Remove the logs dir
+rm -rf $CCLOGS_DIR
+mkdir $CCLOGS_DIR
+
+#Export JAVA_HOME and JAVA_OPTS
+export JAVA_HOME=$JAVA_HOME
+export JAVA_OPTS=$CCJAVA_OPTS
+
+#Launch hyracks cc script
+chmod -R 755 $HYRACKS_HOME
+$HYRACKS_HOME/hyracks-server/target/appassembler/bin/hyrackscc -client-net-ip-address $CCHOST -cluster-net-ip-address $CCHOST -client-net-port $CC_CLIENTPORT -cluster-net-port $CC_CLUSTERPORT -max-heartbeat-lapse-periods 999999 -default-max-job-attempts 0 -job-history-size 3 &> $CCLOGS_DIR/cc.log &
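Once bin/startcc.sh has run, a quick sanity check is to look at the CC log and the two listening ports. The locations and port numbers below are the defaults from conf/cluster.properties later in this patch, so adjust them if those values are overridden:

    # CCLOGS_DIR defaults to $CCTMP_DIR/logs, i.e. /tmp/t1/logs.
    tail -n 20 /tmp/t1/logs/cc.log
    # CC_CLIENTPORT and CC_CLUSTERPORT default to 3099 and 1099.
    netstat -an | grep 3099
    netstat -an | grep 1099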
diff --git a/hyracks/hyracks-dist/src/main/resources/bin/startnc.sh b/hyracks/hyracks-dist/src/main/resources/bin/startnc.sh
new file mode 100755
index 0000000..6e0f90e
--- /dev/null
+++ b/hyracks/hyracks-dist/src/main/resources/bin/startnc.sh
@@ -0,0 +1,49 @@
+hostname
+
+MY_NAME=`hostname`
+#Get the IP address of the cc
+CCHOST_NAME=`cat conf/master`
+CURRENT_PATH=`pwd`
+CCHOST=`ssh ${CCHOST_NAME} "cd ${CURRENT_PATH}; bin/getip.sh"`
+
+#Import cluster properties
+. conf/cluster.properties
+
+#Clean up temp dir
+
+rm -rf $NCTMP_DIR
+mkdir $NCTMP_DIR
+
+#Clean up log dir
+rm -rf $NCLOGS_DIR
+mkdir $NCLOGS_DIR
+
+
+#Clean up I/O working dir
+io_dirs=$(echo $IO_DIRS | tr "," "\n")
+for io_dir in $io_dirs
+do
+	rm -rf $io_dir
+	mkdir $io_dir
+done
+
+#Set JAVA_HOME
+export JAVA_HOME=$JAVA_HOME
+
+IPADDR=`bin/getip.sh`
+#echo $IPADDR
+
+#Get node ID
+NODEID=`hostname | cut -d '.' -f 1`
+
+#Set JAVA_OPTS
+export JAVA_OPTS=$NCJAVA_OPTS
+
+cd $HYRACKS_HOME
+HYRACKS_HOME=`pwd`
+
+#Enter the temp dir
+cd $NCTMP_DIR
+
+#Launch hyracks nc
+$HYRACKS_HOME/hyracks-server/target/appassembler/bin/hyracksnc -cc-host $CCHOST -cc-port $CC_CLUSTERPORT -cluster-net-ip-address $IPADDR  -data-ip-address $IPADDR -node-id $NODEID -iodevices "${IO_DIRS}" &> $NCLOGS_DIR/$NODEID.log &
diff --git a/hyracks/hyracks-dist/src/main/resources/bin/stopAllNCs.sh b/hyracks/hyracks-dist/src/main/resources/bin/stopAllNCs.sh
new file mode 100755
index 0000000..12367c1
--- /dev/null
+++ b/hyracks/hyracks-dist/src/main/resources/bin/stopAllNCs.sh
@@ -0,0 +1,6 @@
+HYRACKS_PATH=`pwd`
+
+for i in `cat conf/slaves`
+do
+   ssh $i "cd ${HYRACKS_PATH}; bin/stopnc.sh"
+done
diff --git a/hyracks/hyracks-dist/src/main/resources/bin/stopCluster.sh b/hyracks/hyracks-dist/src/main/resources/bin/stopCluster.sh
new file mode 100755
index 0000000..4889934
--- /dev/null
+++ b/hyracks/hyracks-dist/src/main/resources/bin/stopCluster.sh
@@ -0,0 +1,3 @@
+bin/stopAllNCs.sh
+sleep 2
+bin/stopcc.sh
diff --git a/hyracks/hyracks-dist/src/main/resources/bin/stopcc.sh b/hyracks/hyracks-dist/src/main/resources/bin/stopcc.sh
new file mode 100755
index 0000000..c2f525a
--- /dev/null
+++ b/hyracks/hyracks-dist/src/main/resources/bin/stopcc.sh
@@ -0,0 +1,10 @@
+hostname
+. conf/cluster.properties
+
+#Kill process
+PID=`ps -ef|grep ${USER}|grep java|grep hyracks|awk '{print $2}'`
+echo $PID
+kill -9 $PID
+
+#Clean up CC temp dir
+rm -rf $CCTMP_DIR/*
diff --git a/hyracks/hyracks-dist/src/main/resources/bin/stopnc.sh b/hyracks/hyracks-dist/src/main/resources/bin/stopnc.sh
new file mode 100755
index 0000000..03ce4e7
--- /dev/null
+++ b/hyracks/hyracks-dist/src/main/resources/bin/stopnc.sh
@@ -0,0 +1,23 @@
+hostname
+. conf/cluster.properties
+
+#Kill process
+PID=`ps -ef|grep ${USER}|grep java|grep 'Dapp.name=hyracksnc'|awk '{print $2}'`
+
+if [ "$PID" == "" ]; then
+  USERID=`id | sed 's/^uid=//;s/(.*$//'`
+  PID=`ps -ef|grep ${USERID}|grep java|grep 'Dapp.name=hyracksnc'|awk '{print $2}'`
+fi
+
+echo $PID
+kill -9 $PID
+
+#Clean up I/O working dir
+io_dirs=$(echo $IO_DIRS | tr "," "\n")
+for io_dir in $io_dirs
+do
+	rm -rf $io_dir/*
+done
+
+#Clean up NC temp dir
+rm -rf $NCTMP_DIR/*
diff --git a/hyracks/hyracks-dist/src/main/resources/conf/cluster.properties b/hyracks/hyracks-dist/src/main/resources/conf/cluster.properties
new file mode 100755
index 0000000..3b382f7
--- /dev/null
+++ b/hyracks/hyracks-dist/src/main/resources/conf/cluster.properties
@@ -0,0 +1,37 @@
+#The CC port for Hyracks clients
+CC_CLIENTPORT=3099
+
+#The CC port for Hyracks cluster management
+CC_CLUSTERPORT=1099
+
+#The directory of hyracks binaries
+HYRACKS_HOME=../../../
+
+#The tmp directory for cc to install jars
+CCTMP_DIR=/tmp/t1
+
+#The tmp directory for nc to install jars
+NCTMP_DIR=/tmp/t2
+
+#The directory to put cc logs
+CCLOGS_DIR=$CCTMP_DIR/logs
+
+#The directory to put nc logs
+NCLOGS_DIR=$NCTMP_DIR/logs
+
+#Comma separated I/O directories for the spilling of external sort
+IO_DIRS="/tmp/t3,/tmp/t4"
+
+#The JAVA_HOME
+JAVA_HOME=$JAVA_HOME
+
+#The frame size of the internal dataflow engine
+FRAME_SIZE=65536
+
+#CC JAVA_OPTS
+CCJAVA_OPTS="-Xdebug -Xrunjdwp:transport=dt_socket,address=7001,server=y,suspend=n -Xmx1g -Djava.util.logging.config.file=logging.properties"
+# Yourkit option: -agentpath:/grid/0/dev/vborkar/tools/yjp-10.0.4/bin/linux-x86-64/libyjpagent.so=port=20001
+
+#NC JAVA_OPTS
+NCJAVA_OPTS="-Xdebug -Xrunjdwp:transport=dt_socket,address=7002,server=y,suspend=n -Xmx1g -Djava.util.logging.config.file=logging.properties"
+
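The values above are development defaults (everything under /tmp, debug-enabled JVMs). A real deployment would normally override them after copying the file; the settings below are example values only, not part of the patch:

    # Example site-specific cluster.properties overrides (illustrative values).
    HYRACKS_HOME=/opt/hyracks
    CCTMP_DIR=/data/hyracks/cctmp
    NCTMP_DIR=/data/hyracks/nctmp
    CCLOGS_DIR=$CCTMP_DIR/logs
    NCLOGS_DIR=$NCTMP_DIR/logs
    IO_DIRS="/data1/hyracks/io,/data2/hyracks/io"
    CCJAVA_OPTS="-Xmx4g -Djava.util.logging.config.file=logging.properties"
    NCJAVA_OPTS="-Xmx8g -Djava.util.logging.config.file=logging.properties"

Because the scripts simply source this file, the values stay in shell syntax and may reference earlier variables, as CCLOGS_DIR and NCLOGS_DIR do here.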
diff --git a/hyracks/hyracks-dist/src/main/resources/conf/debugnc.properties b/hyracks/hyracks-dist/src/main/resources/conf/debugnc.properties
new file mode 100755
index 0000000..27afa26
--- /dev/null
+++ b/hyracks/hyracks-dist/src/main/resources/conf/debugnc.properties
@@ -0,0 +1,12 @@
+#The tmp directory for nc to install jars
+NCTMP_DIR2=/tmp/t-1
+
+#The directory to put nc logs
+NCLOGS_DIR2=$NCTMP_DIR2/logs
+
+#Comma separated I/O directories for the spilling of external sort
+IO_DIRS2="/tmp/t-2,/tmp/t-3"
+
+#NC JAVA_OPTS
+NCJAVA_OPTS2="-Xdebug -Xrunjdwp:transport=dt_socket,address=7003,server=y,suspend=n -Xmx1g -Djava.util.logging.config.file=logging.properties"
+
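These properties drive bin/startDebugNc.sh, which brings up a second NC on a node with its own temp, log, and I/O directories and a JDWP agent on port 7003. A minimal debugging session, assuming the default port, might look like:

    # Start the extra, debuggable NC on this node.
    bin/startDebugNc.sh
    # Attach a debugger to the JDWP port from NCJAVA_OPTS2 (an IDE remote-debug
    # configuration pointed at port 7003 works equally well).
    jdb -attach localhost:7003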
diff --git a/hyracks/hyracks-dist/src/main/resources/conf/master b/hyracks/hyracks-dist/src/main/resources/conf/master
new file mode 100755
index 0000000..2fbb50c
--- /dev/null
+++ b/hyracks/hyracks-dist/src/main/resources/conf/master
@@ -0,0 +1 @@
+localhost
diff --git a/hyracks/hyracks-dist/src/main/resources/conf/slaves b/hyracks/hyracks-dist/src/main/resources/conf/slaves
new file mode 100755
index 0000000..2fbb50c
--- /dev/null
+++ b/hyracks/hyracks-dist/src/main/resources/conf/slaves
@@ -0,0 +1 @@
+localhost
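With conf/master and conf/slaves filled in, the intended end-to-end flow is roughly the following; the host names and install directory are placeholders, not values taken from the patch. The same path must exist on every node, since startAllNCs.sh and stopAllNCs.sh ssh in and cd to the directory they were launched from:

    # Assumed install directory on the master and on every slave (passwordless ssh required).
    cd /opt/hyracks-dist
    echo cc-host.example.com  > conf/master
    echo nc1.example.com      > conf/slaves
    echo nc2.example.com     >> conf/slaves

    bin/startCluster.sh   # startcc.sh here, sleep 5, then startnc.sh on each slave over ssh
    # ... submit jobs to the CC client port (CC_CLIENTPORT, 3099 by default) ...
    bin/stopCluster.sh    # stopAllNCs.sh first, then stopcc.sh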
diff --git a/hyracks/pom.xml b/hyracks/pom.xml
index 26bd647..1d4e094 100644
--- a/hyracks/pom.xml
+++ b/hyracks/pom.xml
@@ -1,4 +1,4 @@
-
+<?xml version="1.0" encoding="UTF-8"?>
 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
   <modelVersion>4.0.0</modelVersion>
   <groupId>edu.uci.ics.hyracks</groupId>
@@ -8,7 +8,7 @@
   <name>hyracks</name>
 
   <properties>
-    <jvm.extraargs />
+    <jvm.extraargs/>
   </properties>
 
   <build>
@@ -101,5 +101,6 @@
     <module>hyracks-hadoop-compat</module>
     <!--module>hyracks-yarn</module-->
     <module>hyracks-maven-plugins</module>
+    <module>hyracks-dist</module>
   </modules>
-</project>
+</project>
\ No newline at end of file