ASTERIXDB-1556, ASTERIXDB-1733: Hash Group By and Hash Join conform to the memory budget

 - External Hash Group By and Hash Join now conform to the memory budget (compiler.groupmemory and compiler.joinmemory)
 - For Optimzed Hybrid Hash Join, we calculate the expected hash table size when the build phase is done and
   try to spill one or more partitions if the freespace can't afford the hash table size.
 - For External Hash Group By, the number of hash entries (hash table size) is calculated based on
   an estimation of the aggregated tuple size and possible hash values for the given field size in that tuple.
 - Garbage Collection feature has been added to SerializableHashTable. For external hash group-by,
   whenever we spill a data partition to the disk, we also check the ratio of garbage in the hash table.
   If it's greater than the given threshold, we conduct a GC on Hash Table.

Change-Id: I2b323e9a2141b4c1dd1652a360d2d9354d3bc3f5
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1056
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
BAD: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Yingyi Bu <buyingyi@gmail.com>
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java
index 9632aad..5aaf87b 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java
@@ -163,7 +163,6 @@
                                     ExternalGroupByPOperator externalGby = new ExternalGroupByPOperator(
                                             gby.getGroupByList(),
                                             physicalOptimizationConfig.getMaxFramesExternalGroupBy(),
-                                            physicalOptimizationConfig.getExternalGroupByTableSize(),
                                             (long) physicalOptimizationConfig.getMaxFramesExternalGroupBy()
                                                     * physicalOptimizationConfig.getFrameSize());
                                     generateMergeAggregationExpressions(gby, context);
diff --git a/asterixdb/asterix-app/src/main/resources/asterix-build-configuration.xml b/asterixdb/asterix-app/src/main/resources/asterix-build-configuration.xml
index bad6a35..8a96882 100644
--- a/asterixdb/asterix-app/src/main/resources/asterix-build-configuration.xml
+++ b/asterixdb/asterix-app/src/main/resources/asterix-build-configuration.xml
@@ -64,7 +64,7 @@
   </property>
   <property>
     <name>compiler.joinmemory</name>
-    <value>160KB</value>
+    <value>256KB</value>
   </property>
   <property>
     <name>storage.buffercache.pagesize</name>
diff --git a/asterixdb/asterix-app/src/main/resources/asterix-build-configuration2.xml b/asterixdb/asterix-app/src/main/resources/asterix-build-configuration2.xml
index 7b8d274..2989fa9 100644
--- a/asterixdb/asterix-app/src/main/resources/asterix-build-configuration2.xml
+++ b/asterixdb/asterix-app/src/main/resources/asterix-build-configuration2.xml
@@ -64,7 +64,7 @@
     </property>
     <property>
         <name>compiler.joinmemory</name>
-        <value>160KB</value>
+        <value>256KB</value>
     </property>
     <property>
         <name>compiler.parallelism</name>
diff --git a/asterixdb/asterix-app/src/main/resources/asterix-build-configuration3.xml b/asterixdb/asterix-app/src/main/resources/asterix-build-configuration3.xml
index 85b0cdb..85881f9 100644
--- a/asterixdb/asterix-app/src/main/resources/asterix-build-configuration3.xml
+++ b/asterixdb/asterix-app/src/main/resources/asterix-build-configuration3.xml
@@ -64,7 +64,7 @@
     </property>
     <property>
         <name>compiler.joinmemory</name>
-        <value>160KB</value>
+        <value>256KB</value>
     </property>
     <property>
         <name>compiler.parallelism</name>
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.adm
index 1ebdc02..15b1baa 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.adm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.adm
@@ -14,7 +14,7 @@
         },
         "compiler.framesize": 32768,
         "compiler.groupmemory": 163840,
-        "compiler.joinmemory": 163840,
+        "compiler.joinmemory": 262144,
         "compiler.parallelism": 0,
         "compiler.pregelix.home": "~/pregelix",
         "compiler.sortmemory": 327680,
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.adm
index dbdf8f0..11e964c 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.adm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.adm
@@ -14,7 +14,7 @@
         },
         "compiler.framesize": 32768,
         "compiler.groupmemory": 163840,
-        "compiler.joinmemory": 163840,
+        "compiler.joinmemory": 262144,
         "compiler.parallelism": -1,
         "compiler.pregelix.home": "~/pregelix",
         "compiler.sortmemory": 327680,
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.adm
index fd268da..83f18f1 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.adm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.adm
@@ -14,7 +14,7 @@
         },
         "compiler.framesize": 32768,
         "compiler.groupmemory": 163840,
-        "compiler.joinmemory": 163840,
+        "compiler.joinmemory": 262144,
         "compiler.parallelism": 3,
         "compiler.pregelix.home": "~/pregelix",
         "compiler.sortmemory": 327680,
diff --git a/hyracks-fullstack/algebricks/algebricks-core/pom.xml b/hyracks-fullstack/algebricks/algebricks-core/pom.xml
index b77d567..c53a87b 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/pom.xml
+++ b/hyracks-fullstack/algebricks/algebricks-core/pom.xml
@@ -87,5 +87,16 @@
       <artifactId>commons-lang3</artifactId>
       <version>3.5</version>
     </dependency>
+    <dependency>
+      <groupId>com.e-movimento.tinytools</groupId>
+      <artifactId>privilegedaccessor</artifactId>
+      <version>1.2.2</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 </project>
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ExternalGroupByPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ExternalGroupByPOperator.java
index 7772620..8555ade 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ExternalGroupByPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ExternalGroupByPOperator.java
@@ -65,19 +65,18 @@
 import org.apache.hyracks.dataflow.std.group.HashSpillableTableFactory;
 import org.apache.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
 import org.apache.hyracks.dataflow.std.group.external.ExternalGroupOperatorDescriptor;
+import org.apache.hyracks.dataflow.std.structures.SerializableHashTable;
 
 public class ExternalGroupByPOperator extends AbstractPhysicalOperator {
 
-    private final int tableSize;
-    private final long fileSize;
+    private final long inputSize;
     private final int frameLimit;
     private List<LogicalVariable> columnSet = new ArrayList<LogicalVariable>();
 
     public ExternalGroupByPOperator(List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> gbyList, int frameLimit,
-            int tableSize, long fileSize) {
-        this.tableSize = tableSize;
+            long fileSize) {
         this.frameLimit = frameLimit;
-        this.fileSize = fileSize;
+        this.inputSize = fileSize;
         computeColumnSet(gbyList);
     }
 
@@ -256,7 +255,14 @@
 
         INormalizedKeyComputerFactory normalizedKeyFactory = JobGenHelper
                 .variablesToAscNormalizedKeyComputerFactory(gbyCols, aggOpInputEnv, context);
-        ExternalGroupOperatorDescriptor gbyOpDesc = new ExternalGroupOperatorDescriptor(spec, tableSize, fileSize,
+
+        // Calculates the hash table size (# of unique hash values) based on the budget and a tuple size.
+        int memoryBudgetInBytes = context.getFrameSize() * frameLimit;
+        int groupByColumnsCount = gby.getGroupByList().size() + numFds;
+        int hashTableSize = calculateGroupByTableCardinality(memoryBudgetInBytes, groupByColumnsCount,
+                context.getFrameSize());
+
+        ExternalGroupOperatorDescriptor gbyOpDesc = new ExternalGroupOperatorDescriptor(spec, hashTableSize, inputSize,
                 keyAndDecFields, frameLimit, comparatorFactories, normalizedKeyFactory, aggregatorFactory, mergeFactory,
                 recordDescriptor, recordDescriptor, new HashSpillableTableFactory(hashFunctionFactories));
         contributeOpDesc(builder, gby, gbyOpDesc);
@@ -275,4 +281,52 @@
     public boolean expensiveThanMaterialization() {
         return true;
     }
+
+    /**
+     * Based on a rough estimation of a tuple (each field size: 4 bytes) size and the number of possible hash values
+     * for the given number of group-by columns, calculates the number of hash entries for the hash table in Group-by.
+     * The formula is min(# of possible hash values, # of possible tuples in the data table).
+     * This method assumes that the group-by table consists of hash table that stores hash value of tuple pointer
+     * and data table actually stores the aggregated tuple.
+     * For more details, refer to this JIRA issue: https://issues.apache.org/jira/browse/ASTERIXDB-1556
+     *
+     * @param memoryBudgetByteSize
+     * @param numberOfGroupByColumns
+     * @return group-by table size (the cardinality of group-by table)
+     */
+    public static int calculateGroupByTableCardinality(long memoryBudgetByteSize, int numberOfGroupByColumns,
+            int frameSize) {
+        // Estimates a minimum tuple size with n fields:
+        // (4:tuple offset in a frame, 4n:each field offset in a tuple, 4n:each field size 4 bytes)
+        int tupleByteSize = 4 + 8 * numberOfGroupByColumns;
+
+        // Maximum number of tuples
+        long maxNumberOfTuplesInDataTable = memoryBudgetByteSize / tupleByteSize;
+
+        // To calculate possible hash values, this counts the number of bits.
+        // We assume that each field consists of 4 bytes.
+        // Also, too high range that is greater than Long.MAXVALUE (64 bits) is not necessary for our calculation.
+        // And, this should not generate negative numbers when shifting the number.
+        int numberOfBits = Math.min(61, numberOfGroupByColumns * 4 * 8);
+
+        // Possible number of unique hash entries
+        long possibleNumberOfHashEntries = 2L << numberOfBits;
+
+        // Between # of entries in Data table and # of possible hash values, we choose the smaller one.
+        long groupByTableCardinality = Math.min(possibleNumberOfHashEntries, maxNumberOfTuplesInDataTable);
+        long groupByTableByteSize = SerializableHashTable.getExpectedTableByteSize(groupByTableCardinality, frameSize);
+
+        // Gets the ratio of hash-table size in the total size (hash + data table).
+        double hashTableRatio = (double) groupByTableByteSize / (groupByTableByteSize + memoryBudgetByteSize);
+
+        // Gets the table size based on the ratio that we have calculated.
+        long finalGroupByTableByteSize = (long) (hashTableRatio * memoryBudgetByteSize);
+
+        long finalGroupByTableCardinality = finalGroupByTableByteSize
+                / SerializableHashTable.getExpectedByteSizePerHashValue();
+
+        // The maximum cardinality of a hash table: Integer.MAX_VALUE
+        return finalGroupByTableCardinality > Integer.MAX_VALUE ? Integer.MAX_VALUE
+                : (int) finalGroupByTableCardinality;
+    }
 }
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HybridHashJoinPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HybridHashJoinPOperator.java
index 248bc4f..ee0bec7 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HybridHashJoinPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HybridHashJoinPOperator.java
@@ -61,6 +61,7 @@
 
 public class HybridHashJoinPOperator extends AbstractHashJoinPOperator {
 
+    // The maximum number of in-memory frames that this hash join can use.
     private final int memSizeInFrames;
     private final int maxInputBuildSizeInFrames;
     private final int aveRecordsPerFrame;
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/InMemoryHashJoinPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/InMemoryHashJoinPOperator.java
index fff0fb4..a1d496d 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/InMemoryHashJoinPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/InMemoryHashJoinPOperator.java
@@ -50,15 +50,19 @@
 public class InMemoryHashJoinPOperator extends AbstractHashJoinPOperator {
 
     private final int tableSize;
+    // The maximum number of in-memory frames that this hash join can use.
+    private final int memSizeInFrames;
 
     /**
      * builds on the first operator and probes on the second.
      */
 
     public InMemoryHashJoinPOperator(JoinKind kind, JoinPartitioningType partitioningType,
-            List<LogicalVariable> sideLeftOfEqualities, List<LogicalVariable> sideRightOfEqualities, int tableSize) {
+            List<LogicalVariable> sideLeftOfEqualities, List<LogicalVariable> sideRightOfEqualities, int tableSize,
+            int memSizeInFrames) {
         super(kind, partitioningType, sideLeftOfEqualities, sideRightOfEqualities);
         this.tableSize = tableSize;
+        this.memSizeInFrames = memSizeInFrames;
     }
 
     @Override
@@ -106,7 +110,7 @@
         switch (kind) {
             case INNER: {
                 opDesc = new InMemoryHashJoinOperatorDescriptor(spec, keysLeft, keysRight, hashFunFactories,
-                        comparatorFactories, recDescriptor, tableSize, predEvaluatorFactory);
+                        comparatorFactories, recDescriptor, tableSize, predEvaluatorFactory, memSizeInFrames);
                 break;
             }
             case LEFT_OUTER: {
@@ -116,7 +120,7 @@
                 }
                 opDesc = new InMemoryHashJoinOperatorDescriptor(spec, keysLeft, keysRight, hashFunFactories,
                         comparatorFactories, predEvaluatorFactory, recDescriptor, true, nonMatchWriterFactories,
-                        tableSize);
+                        tableSize, memSizeInFrames);
                 break;
             }
             default: {
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/test/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ExternalGroupByPOperatorTest.java b/hyracks-fullstack/algebricks/algebricks-core/src/test/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ExternalGroupByPOperatorTest.java
new file mode 100644
index 0000000..09b8ec0
--- /dev/null
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/test/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ExternalGroupByPOperatorTest.java
@@ -0,0 +1,128 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  *   http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing,
+ *  * software distributed under the License is distributed on an
+ *  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  * KIND, either express or implied.  See the License for the
+ *  * specific language governing permissions and limitations
+ *  * under the License.
+ *
+ */
+
+package org.apache.hyracks.algebricks.core.algebra.operators.physical;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.commons.lang3.mutable.MutableObject;
+import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import org.apache.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
+import org.junit.Assert;
+import org.junit.Test;
+
+import junit.extensions.PA;
+
+public class ExternalGroupByPOperatorTest {
+
+    @Test
+    public void testCalculateGroupByTableCardinality() throws Exception {
+
+        // Creates a dummy variable and an expression that are needed by the operator. They are not used by this test.
+        LogicalVariable v = new LogicalVariable(0);
+        MutableObject<ILogicalExpression> e = new MutableObject<ILogicalExpression>(new VariableReferenceExpression(v));
+        List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> gbyList = new ArrayList<>();
+        gbyList.add(new Pair<>(v, e));
+        ExternalGroupByPOperator eGByOp = new ExternalGroupByPOperator(gbyList, 0, 0);
+
+        // Test 1: compiler.groupmemory: 512 bytes, frame size: 256 bytes, with 1 column group-by
+        long memoryBudgetInBytes = 512;
+        int numberOfGroupByColumns = 1;
+        int frameSize = 256;
+        int resultCardinality = (int) PA.invokeMethod(eGByOp, "calculateGroupByTableCardinality(long,int,int)",
+                memoryBudgetInBytes, numberOfGroupByColumns, frameSize);
+        Assert.assertTrue(resultCardinality == 9);
+
+        // Sets the frame size to 128KB.
+        frameSize = 128 * 1024;
+
+        // Test 2: memory size: 1 MB, frame size: 128 KB, 1 column group-by
+        memoryBudgetInBytes = 1024 * 1024;
+        resultCardinality = (int) PA.invokeMethod(eGByOp, "calculateGroupByTableCardinality(long,int,int)",
+                memoryBudgetInBytes, numberOfGroupByColumns, frameSize);
+        Assert.assertTrue(resultCardinality == 19660);
+
+        // Test 3: memory size: 100 MB, frame size: 128 KB, 1 column group-by
+        memoryBudgetInBytes = 1024 * 1024 * 100;
+        resultCardinality = (int) PA.invokeMethod(eGByOp, "calculateGroupByTableCardinality(long,int,int)",
+                memoryBudgetInBytes, numberOfGroupByColumns, frameSize);
+        Assert.assertTrue(resultCardinality == 1937883);
+
+        // Test 4: memory size: 1 GB, frame size: 128 KB, 1 column group-by
+        memoryBudgetInBytes = 1024 * 1024 * 1024;
+        resultCardinality = (int) PA.invokeMethod(eGByOp, "calculateGroupByTableCardinality(long,int,int)",
+                memoryBudgetInBytes, numberOfGroupByColumns, frameSize);
+        Assert.assertTrue(resultCardinality == 19841178);
+
+        // Test 5: memory size: 10 GB, frame size: 128 KB, 1 column group-by
+        memoryBudgetInBytes = 1024 * 1024 * 1024 * 10L;
+        resultCardinality = (int) PA.invokeMethod(eGByOp, "calculateGroupByTableCardinality(long,int,int)",
+                memoryBudgetInBytes, numberOfGroupByColumns, frameSize);
+        Assert.assertTrue(resultCardinality == 198409112);
+
+        // Test 6: memory size: 100 GB, frame size: 128 KB, 1 column group-by
+        memoryBudgetInBytes = 1024 * 1024 * 1024 * 100L;
+        resultCardinality = (int) PA.invokeMethod(eGByOp, "calculateGroupByTableCardinality(long,int,int)",
+                memoryBudgetInBytes, numberOfGroupByColumns, frameSize);
+        Assert.assertTrue(resultCardinality == 1962753871);
+
+        // Test 7: memory size: 1 TB, frame size: 128 KB, 1 column group-by
+        // The cardinality will be set to Integer.MAX_VALUE in this case since the budget is too huge.
+        memoryBudgetInBytes = 1024 * 1024 * 1024 * 1024L;
+        frameSize = 128 * 1024;
+        resultCardinality = (int) PA.invokeMethod(eGByOp, "calculateGroupByTableCardinality(long,int,int)",
+                memoryBudgetInBytes, numberOfGroupByColumns, frameSize);
+        Assert.assertTrue(resultCardinality == 2147483647);
+
+        // Test 8: memory size: 1 MB, frame size: 128 KB, 2 columns group-by
+        memoryBudgetInBytes = 1024 * 1024;
+        numberOfGroupByColumns = 2;
+        resultCardinality = (int) PA.invokeMethod(eGByOp, "calculateGroupByTableCardinality(long,int,int)",
+                memoryBudgetInBytes, numberOfGroupByColumns, frameSize);
+        Assert.assertTrue(resultCardinality == 16681);
+
+        // Test 9: memory size: 1 MB, frame size: 128 KB, 3 columns group-by
+        memoryBudgetInBytes = 1024 * 1024;
+        numberOfGroupByColumns = 3;
+        resultCardinality = (int) PA.invokeMethod(eGByOp, "calculateGroupByTableCardinality(long,int,int)",
+                memoryBudgetInBytes, numberOfGroupByColumns, frameSize);
+        Assert.assertTrue(resultCardinality == 15176);
+
+        // Test 10: memory size: 1 MB, frame size: 128 KB, 4 columns group-by
+        memoryBudgetInBytes = 1024 * 1024;
+        numberOfGroupByColumns = 4;
+        resultCardinality = (int) PA.invokeMethod(eGByOp, "calculateGroupByTableCardinality(long,int,int)",
+                memoryBudgetInBytes, numberOfGroupByColumns, frameSize);
+        Assert.assertTrue(resultCardinality == 13878);
+
+        // Test 11: memory size: 32 MB, frame size: 128 KB, 2 columns group-by
+        memoryBudgetInBytes = 1024 * 1024 * 32L;
+        numberOfGroupByColumns = 4;
+        resultCardinality = (int) PA.invokeMethod(eGByOp, "calculateGroupByTableCardinality(long,int,int)",
+                memoryBudgetInBytes, numberOfGroupByColumns, frameSize);
+        Assert.assertTrue(resultCardinality == 408503);
+    }
+
+}
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
index 5240781..6fdcfdf 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
@@ -167,7 +167,6 @@
                                     ExternalGroupByPOperator externalGby = new ExternalGroupByPOperator(
                                             gby.getGroupByList(),
                                             physicalOptimizationConfig.getMaxFramesExternalGroupBy(),
-                                            physicalOptimizationConfig.getExternalGroupByTableSize(),
                                             (long) physicalOptimizationConfig.getMaxFramesExternalGroupBy()
                                                     * physicalOptimizationConfig.getFrameSize());
                                     op.setPhysicalOperator(externalGby);
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/util/JoinUtils.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/util/JoinUtils.java
index 3332836..d66e6fc 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/util/JoinUtils.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/util/JoinUtils.java
@@ -24,7 +24,6 @@
 import java.util.List;
 
 import org.apache.commons.lang3.mutable.Mutable;
-
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
@@ -116,8 +115,9 @@
                 AlgebricksConfig.ALGEBRICKS_LOGGER
                         .fine("// HybridHashJoin inner branch " + opBuild + " fits in memory\n");
                 // maintains the local properties on the probe side
-                op.setPhysicalOperator(new InMemoryHashJoinPOperator(hhj.getKind(), hhj.getPartitioningType(),
-                        hhj.getKeysLeftBranch(), hhj.getKeysRightBranch(), v.getNumberOfTuples() * 2));
+                op.setPhysicalOperator(
+                        new InMemoryHashJoinPOperator(hhj.getKind(), hhj.getPartitioningType(), hhj.getKeysLeftBranch(),
+                                hhj.getKeysRightBranch(), v.getNumberOfTuples() * 2, hhj.getMemSizeInFrames()));
             }
         }
 
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IFrameTupleReversibleAppender.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IFrameTupleReversibleAppender.java
new file mode 100644
index 0000000..a74f6b8
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IFrameTupleReversibleAppender.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.api.comm;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * A Tuple Appender class whose last append action can be canceled.
+ */
+public interface IFrameTupleReversibleAppender {
+
+    /**
+     * Cancels the effect of the last append operation. i.e. undoes the last append operation.
+     */
+    boolean cancelAppend() throws HyracksDataException;
+
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FixedSizeFrameTupleAppender.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FixedSizeFrameTupleAppender.java
index 60c6e6d..f1ea839 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FixedSizeFrameTupleAppender.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FixedSizeFrameTupleAppender.java
@@ -19,9 +19,13 @@
 
 package org.apache.hyracks.dataflow.common.comm.io;
 
+import org.apache.hyracks.api.comm.FrameConstants;
+import org.apache.hyracks.api.comm.FrameHelper;
+import org.apache.hyracks.api.comm.IFrameTupleReversibleAppender;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.util.IntSerDeUtils;
 
-public class FixedSizeFrameTupleAppender extends FrameTupleAppender {
+public class FixedSizeFrameTupleAppender extends FrameTupleAppender implements IFrameTupleReversibleAppender {
     @Override
     protected boolean canHoldNewTuple(int fieldCount, int dataLength) throws HyracksDataException {
         if (hasEnoughSpace(fieldCount, dataLength)) {
@@ -29,4 +33,26 @@
         }
         return false;
     }
-}
+
+    /**
+     * Cancels the lastly performed append operation. i.e. decreases the tuple count and resets the data end offset.
+     */
+    @Override
+    public boolean cancelAppend() throws HyracksDataException {
+        // Decreases tupleCount by one.
+        tupleCount = IntSerDeUtils.getInt(array, FrameHelper.getTupleCountOffset(frame.getFrameSize()));
+        if (tupleCount == 0) {
+            // There is no inserted tuple in the given frame. This should not happen.
+            return false;
+        }
+        tupleCount = tupleCount - 1;
+
+        // Resets tupleCount and DataEndOffset.
+        IntSerDeUtils.putInt(array, FrameHelper.getTupleCountOffset(frame.getFrameSize()), tupleCount);
+        tupleDataEndOffset = tupleCount == 0 ? FrameConstants.TUPLE_START_OFFSET
+                : IntSerDeUtils.getInt(array,
+                        FrameHelper.getTupleCountOffset(frame.getFrameSize()) - tupleCount * FrameConstants.SIZE_LEN);
+        return true;
+    }
+
+}
\ No newline at end of file
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/RunFileReader.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/RunFileReader.java
index fb160f0..71ac6a6 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/RunFileReader.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/RunFileReader.java
@@ -88,4 +88,8 @@
     public long getFileSize() {
         return size;
     }
+
+    public void setDeleteAfterClose(boolean deleteAfterClose) {
+        this.deleteAfterClose = deleteAfterClose;
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/FrameBufferManager.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/FrameBufferManager.java
new file mode 100644
index 0000000..700500b
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/FrameBufferManager.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.dataflow.std.buffermanager;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * General Frame based buffer manager class
+ */
+public class FrameBufferManager implements IFrameBufferManager {
+
+    ArrayList<ByteBuffer> buffers = new ArrayList<>();
+
+    @Override
+    public void reset() throws HyracksDataException {
+        buffers.clear();
+    }
+
+    @Override
+    public BufferInfo getFrame(int frameIndex, BufferInfo returnedInfo) {
+        returnedInfo.reset(buffers.get(frameIndex), 0, buffers.get(frameIndex).capacity());
+        return returnedInfo;
+    }
+
+    @Override
+    public int getNumFrames() {
+        return buffers.size();
+    }
+
+    @Override
+    public int insertFrame(ByteBuffer frame) throws HyracksDataException {
+        buffers.add(frame);
+        return buffers.size() - 1;
+    }
+
+    @Override
+    public void close() {
+        buffers = null;
+    }
+
+}
+
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/FramePoolBackedFrameBufferManager.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/FramePoolBackedFrameBufferManager.java
new file mode 100644
index 0000000..9808797
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/FramePoolBackedFrameBufferManager.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.dataflow.std.buffermanager;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * This is a simple frame based buffer manager that uses a deallocatable frame pool.
+ * We assume that the list of assigned buffers are managed by another classes that call the methods of this class.
+ */
+public class FramePoolBackedFrameBufferManager implements ISimpleFrameBufferManager {
+
+    private final IDeallocatableFramePool framePool;
+
+    public FramePoolBackedFrameBufferManager(IDeallocatableFramePool framePool) {
+        this.framePool = framePool;
+    }
+
+    @Override
+    public ByteBuffer acquireFrame(int frameSize) throws HyracksDataException {
+        return framePool.allocateFrame(frameSize);
+    }
+
+    @Override
+    public void releaseFrame(ByteBuffer frame) {
+        framePool.deAllocateBuffer(frame);
+    }
+
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/IPartitionedTupleBufferManager.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/IPartitionedTupleBufferManager.java
index a0a9ab0..6c08be2 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/IPartitionedTupleBufferManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/IPartitionedTupleBufferManager.java
@@ -59,7 +59,7 @@
     /**
      * Insert tuple {@code tupleId} from the {@code tupleAccessor} into the given partition.
      * The returned handle is written into the tuplepointer
-     * 
+     *
      * @param partition
      *            the id of the partition to insert the tuple
      * @param tupleAccessor
@@ -75,8 +75,13 @@
             throws HyracksDataException;
 
     /**
+     * Cancels the effect of last insertTuple() operation. i.e. undoes the last insertTuple() operation.
+     */
+    void cancelInsertTuple(int partition) throws HyracksDataException;
+
+    /**
      * Reset to the initial states. The previous allocated resources won't be released in order to be used in the next round.
-     * 
+     *
      * @throws HyracksDataException
      */
     void reset() throws HyracksDataException;
@@ -93,7 +98,7 @@
      * This partition will not be cleared.
      * Currently it is used by Join where we flush the inner partition to the join (as a frameWriter),
      * but we will still keep the inner for the next outer partition.
-     * 
+     *
      * @param pid
      * @param writer
      * @throws HyracksDataException
@@ -102,7 +107,7 @@
 
     /**
      * Clear the memory occupation of the particular partition.
-     * 
+     *
      * @param partition
      * @throws HyracksDataException
      */
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/ISimpleFrameBufferManager.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/ISimpleFrameBufferManager.java
new file mode 100644
index 0000000..b0addfd
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/ISimpleFrameBufferManager.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.dataflow.std.buffermanager;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * Manages the buffer space in the unit of frame.
+ * This buffer manager is suitable for a structure that manages
+ * the list of assigned frames on its own (e.g., SerializableHashTable class).
+ */
+public interface ISimpleFrameBufferManager {
+
+    /**
+     * Gets a frame from this buffer manager.
+     */
+    public ByteBuffer acquireFrame(int frameSize) throws HyracksDataException;
+
+    /**
+     * Releases a frame to this buffer manager.
+     */
+    public void releaseFrame(ByteBuffer frame);
+
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/PreferToSpillFullyOccupiedFramePolicy.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/PreferToSpillFullyOccupiedFramePolicy.java
index f9387a9..12985c0 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/PreferToSpillFullyOccupiedFramePolicy.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/PreferToSpillFullyOccupiedFramePolicy.java
@@ -30,32 +30,34 @@
 
     private final IPartitionedTupleBufferManager bufferManager;
     private final BitSet spilledStatus;
-    private final int minFrameSize;
 
-    public PreferToSpillFullyOccupiedFramePolicy(IPartitionedTupleBufferManager bufferManager, BitSet spilledStatus,
-            int minFrameSize) {
+    public PreferToSpillFullyOccupiedFramePolicy(IPartitionedTupleBufferManager bufferManager, BitSet spilledStatus) {
         this.bufferManager = bufferManager;
         this.spilledStatus = spilledStatus;
-        this.minFrameSize = minFrameSize;
     }
 
+    /**
+     * This method tries to find a victim partition.
+     * We want to keep in-memory partitions (not spilled to the disk yet) as long as possible to reduce the overhead
+     * of writing to and reading from the disk.
+     * If the given partition contains one or more tuple, then try to spill the given partition.
+     * If not, try to flush another an in-memory partition.
+     * Note: right now, the createAtMostOneFrameForSpilledPartitionConstrain we are using for a spilled partition
+     * enforces that the number of maximum frame for a spilled partition is 1.
+     */
     public int selectVictimPartition(int failedToInsertPartition) {
-        // To avoid flush the half-full frame, it's better to spill itself.
+        // To avoid flushing another partition with the last half-full frame, it's better to spill the given partition
+        // since one partition needs to be spilled to the disk anyway. Another reason is that we know that
+        // the last frame in this partition is full.
         if (bufferManager.getNumTuples(failedToInsertPartition) > 0) {
             return failedToInsertPartition;
         }
-        int partitionToSpill = findSpilledPartitionWithMaxMemoryUsage();
-        int maxToSpillPartSize = 0;
-        // if we couldn't find the already spilled partition, or it is too small to flush that one,
-        // try to flush an in memory partition.
-        if (partitionToSpill < 0
-                || (maxToSpillPartSize = bufferManager.getPhysicalSize(partitionToSpill)) == minFrameSize) {
-            int partitionInMem = findInMemPartitionWithMaxMemoryUsage();
-            if (partitionInMem >= 0 && bufferManager.getPhysicalSize(partitionInMem) > maxToSpillPartSize) {
-                partitionToSpill = partitionInMem;
-            }
-        }
-        return partitionToSpill;
+        // If the given partition doesn't contain any tuple in memory, try to flush a different in-memory partition.
+        // We are not trying to steal a frame from another spilled partition since once spilled, a partition can only
+        // have only one frame and we don't know whether the frame is fully occupied or not.
+        // TODO: Once we change this policy (spilled partition can have only one frame in memory),
+        //       we need to revise this method, too.
+        return findInMemPartitionWithMaxMemoryUsage();
     }
 
     public int findInMemPartitionWithMaxMemoryUsage() {
@@ -80,8 +82,8 @@
     }
 
     /**
-     * Create an constrain for the already spilled partition that it can only use at most one frame.
-     * 
+     * Create a constrain for the already spilled partition that it can only use at most one frame.
+     *
      * @param spillStatus
      * @return
      */
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/TupleInFrameListAccessor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/TupleInFrameListAccessor.java
new file mode 100644
index 0000000..2c9d24a
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/TupleInFrameListAccessor.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.dataflow.std.buffermanager;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import org.apache.hyracks.api.comm.IFrameTupleAccessor;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.dataflow.std.sort.util.DeletableFrameTupleAppender;
+import org.apache.hyracks.dataflow.std.sort.util.IAppendDeletableFrameTupleAccessor;
+import org.apache.hyracks.dataflow.std.structures.TuplePointer;
+
+/**
+ * tuple accessor class for a frame in a frame list using a tuple pointer.
+ */
+public class TupleInFrameListAccessor extends AbstractTuplePointerAccessor {
+
+    private IAppendDeletableFrameTupleAccessor bufferAccessor;
+    private List<ByteBuffer> bufferFrames;
+
+    public TupleInFrameListAccessor(RecordDescriptor rd, List<ByteBuffer> bufferFrames) {
+        bufferAccessor = new DeletableFrameTupleAppender(rd);
+        this.bufferFrames = bufferFrames;
+    }
+
+    @Override
+    IFrameTupleAccessor getInnerAccessor() {
+        return bufferAccessor;
+    }
+
+    @Override
+    void resetInnerAccessor(TuplePointer tuplePointer) {
+        bufferAccessor.reset(bufferFrames.get(tuplePointer.getFrameIndex()));
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VPartitionTupleBufferManager.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VPartitionTupleBufferManager.java
index 4d4f279..4578c2e 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VPartitionTupleBufferManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VPartitionTupleBufferManager.java
@@ -20,7 +20,6 @@
 package org.apache.hyracks.dataflow.std.buffermanager;
 
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
 import java.util.Arrays;
 
 import org.apache.hyracks.api.comm.FixedSizeFrame;
@@ -54,6 +53,18 @@
     private BufferInfo tempInfo;
     private final IPartitionedMemoryConstrain constrain;
 
+    // In case where a frame pool is shared by one or more buffer manager(s), it can be provided from the caller.
+    public VPartitionTupleBufferManager(IPartitionedMemoryConstrain constrain, int partitions,
+            IDeallocatableFramePool framePool) throws HyracksDataException {
+        this.constrain = constrain;
+        this.framePool = framePool;
+        this.partitionArray = new IFrameBufferManager[partitions];
+        this.numTuples = new int[partitions];
+        this.appendFrame = new FixedSizeFrame();
+        this.appender = new FixedSizeFrameTupleAppender();
+        this.tempInfo = new BufferInfo(null, -1, -1);
+    }
+
     public VPartitionTupleBufferManager(IHyracksFrameMgrContext ctx, IPartitionedMemoryConstrain constrain,
             int partitions, int frameLimitInBytes) throws HyracksDataException {
         this.constrain = constrain;
@@ -146,6 +157,17 @@
                 tupleAccessor.getTupleStartOffset(tupleId), tupleAccessor.getTupleLength(tupleId), pointer);
     }
 
+    @Override
+    public void cancelInsertTuple(int partition) throws HyracksDataException {
+        int fid = getLastBuffer(partition);
+        if (fid < 0) {
+            throw new HyracksDataException("Couldn't get the last frame for the given partition.");
+        }
+        partitionArray[partition].getFrame(fid, tempInfo);
+        deleteTupleFromBuffer(tempInfo);
+        numTuples[partition]--;
+    }
+
     private static int calculateActualSize(int[] fieldEndOffsets, int size) {
         if (fieldEndOffsets != null) {
             return FrameHelper.calcRequiredSpace(fieldEndOffsets.length, size);
@@ -200,11 +222,25 @@
         return -1;
     }
 
+    private void deleteTupleFromBuffer(BufferInfo bufferInfo) throws HyracksDataException {
+        if (bufferInfo.getBuffer() != appendFrame.getBuffer()) {
+            appendFrame.reset(bufferInfo.getBuffer());
+            appender.reset(appendFrame, false);
+        }
+        if (!appender.cancelAppend()) {
+            throw new HyracksDataException("Undoing the last insertion in the given frame couldn't be done.");
+        }
+    }
+
     private int getLastBufferOrCreateNewIfNotExist(int partition, int actualSize) throws HyracksDataException {
         if (partitionArray[partition] == null || partitionArray[partition].getNumFrames() == 0) {
-            partitionArray[partition] = new PartitionFrameBufferManager();
+            partitionArray[partition] = new FrameBufferManager();
             return createNewBuffer(partition, actualSize);
         }
+        return getLastBuffer(partition);
+    }
+
+    private int getLastBuffer(int partition) throws HyracksDataException {
         return partitionArray[partition].getNumFrames() - 1;
     }
 
@@ -219,39 +255,6 @@
         Arrays.fill(partitionArray, null);
     }
 
-    private static class PartitionFrameBufferManager implements IFrameBufferManager {
-
-        ArrayList<ByteBuffer> buffers = new ArrayList<>();
-
-        @Override
-        public void reset() throws HyracksDataException {
-            buffers.clear();
-        }
-
-        @Override
-        public BufferInfo getFrame(int frameIndex, BufferInfo returnedInfo) {
-            returnedInfo.reset(buffers.get(frameIndex), 0, buffers.get(frameIndex).capacity());
-            return returnedInfo;
-        }
-
-        @Override
-        public int getNumFrames() {
-            return buffers.size();
-        }
-
-        @Override
-        public int insertFrame(ByteBuffer frame) throws HyracksDataException {
-            buffers.add(frame);
-            return buffers.size() - 1;
-        }
-
-        @Override
-        public void close() {
-            buffers = null;
-        }
-
-    }
-
     @Override
     public ITuplePointerAccessor getTuplePointerAccessor(final RecordDescriptor recordDescriptor) {
         return new AbstractTuplePointerAccessor() {
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/HashSpillableTableFactory.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/HashSpillableTableFactory.java
index 34fdc48..cd79a30 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/HashSpillableTableFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/HashSpillableTableFactory.java
@@ -36,7 +36,11 @@
 import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
 import org.apache.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFamily;
+import org.apache.hyracks.dataflow.std.buffermanager.DeallocatableFramePool;
+import org.apache.hyracks.dataflow.std.buffermanager.FramePoolBackedFrameBufferManager;
+import org.apache.hyracks.dataflow.std.buffermanager.IDeallocatableFramePool;
 import org.apache.hyracks.dataflow.std.buffermanager.IPartitionedTupleBufferManager;
+import org.apache.hyracks.dataflow.std.buffermanager.ISimpleFrameBufferManager;
 import org.apache.hyracks.dataflow.std.buffermanager.ITuplePointerAccessor;
 import org.apache.hyracks.dataflow.std.buffermanager.PreferToSpillFullyOccupiedFramePolicy;
 import org.apache.hyracks.dataflow.std.buffermanager.VPartitionTupleBufferManager;
@@ -51,6 +55,10 @@
     private static final double FUDGE_FACTOR = 1.1;
     private static final long serialVersionUID = 1L;
     private final IBinaryHashFunctionFamily[] hashFunctionFamilies;
+    private static final int MIN_DATA_TABLE_FRAME_LIMT = 1;
+    private static final int MIN_HASH_TABLE_FRAME_LIMT = 2;
+    private static final int OUTPUT_FRAME_LIMT = 1;
+    private static final int MIN_FRAME_LIMT = MIN_DATA_TABLE_FRAME_LIMT + MIN_HASH_TABLE_FRAME_LIMT + OUTPUT_FRAME_LIMT;
 
     public HashSpillableTableFactory(IBinaryHashFunctionFamily[] hashFunctionFamilies) {
         this.hashFunctionFamilies = hashFunctionFamilies;
@@ -62,11 +70,15 @@
             final INormalizedKeyComputer firstKeyNormalizerFactory, IAggregatorDescriptorFactory aggregateFactory,
             RecordDescriptor inRecordDescriptor, RecordDescriptor outRecordDescriptor, final int framesLimit,
             final int seed) throws HyracksDataException {
-        if (framesLimit < 2) {
-            throw new HyracksDataException("The frame limit is too small to partition the data");
-        }
         final int tableSize = suggestTableSize;
 
+        // For HashTable, we need to have at least two frames (one for header and one for content).
+        // For DataTable, we need to have at least one frame.
+        // For the output, we need to have at least one frame.
+        if (framesLimit < MIN_FRAME_LIMT) {
+            throw new HyracksDataException("The given frame limit is too small to partition the data.");
+        }
+
         final int[] intermediateResultKeys = new int[keyFields.length];
         for (int i = 0; i < keyFields.length; i++) {
             intermediateResultKeys[i] = i;
@@ -78,6 +90,12 @@
         final ITuplePartitionComputer tpc = new FieldHashPartitionComputerFamily(keyFields, hashFunctionFamilies)
                 .createPartitioner(seed);
 
+        // For calculating hash value for the already aggregated tuples (not incoming tuples)
+        // This computer is required to calculate the hash value of a aggregated tuple
+        // while doing the garbage collection work on Hash Table.
+        final ITuplePartitionComputer tpcIntermediate = new FieldHashPartitionComputerFamily(intermediateResultKeys,
+                hashFunctionFamilies).createPartitioner(seed);
+
         final IAggregatorDescriptor aggregator = aggregateFactory.createAggregator(ctx, inRecordDescriptor,
                 outRecordDescriptor, keyFields, intermediateResultKeys, null);
 
@@ -86,31 +104,42 @@
         final ArrayTupleBuilder stateTupleBuilder = new ArrayTupleBuilder(outRecordDescriptor.getFields().length);
 
         //TODO(jf) research on the optimized partition size
-        final int numPartitions = getNumOfPartitions((int) (inputDataBytesSize / ctx.getInitialFrameSize()),
-                framesLimit - 1);
+        long memoryBudget = Math.max(MIN_DATA_TABLE_FRAME_LIMT + MIN_HASH_TABLE_FRAME_LIMT,
+                framesLimit - OUTPUT_FRAME_LIMT - MIN_HASH_TABLE_FRAME_LIMT);
+
+        final int numPartitions = getNumOfPartitions(inputDataBytesSize / ctx.getInitialFrameSize(), memoryBudget);
         final int entriesPerPartition = (int) Math.ceil(1.0 * tableSize / numPartitions);
         if (LOGGER.isLoggable(Level.FINE)) {
             LOGGER.fine(
-                    "create hashtable, table size:" + tableSize + " file size:" + inputDataBytesSize + "  partitions:"
+                    "created hashtable, table size:" + tableSize + " file size:" + inputDataBytesSize + "  #partitions:"
                     + numPartitions);
         }
 
         final ArrayTupleBuilder outputTupleBuilder = new ArrayTupleBuilder(outRecordDescriptor.getFields().length);
 
-        final ISerializableTable hashTableForTuplePointer = new SerializableHashTable(tableSize, ctx);
-
         return new ISpillableTable() {
 
             private final TuplePointer pointer = new TuplePointer();
             private final BitSet spilledSet = new BitSet(numPartitions);
-            final IPartitionedTupleBufferManager bufferManager = new VPartitionTupleBufferManager(ctx,
+            // This frame pool will be shared by both data table and hash table.
+            private final IDeallocatableFramePool framePool = new DeallocatableFramePool(ctx,
+                    framesLimit * ctx.getInitialFrameSize());
+            // buffer manager for hash table
+            private final ISimpleFrameBufferManager bufferManagerForHashTable = new FramePoolBackedFrameBufferManager(
+                    framePool);
+
+            private final ISerializableTable hashTableForTuplePointer = new SerializableHashTable(tableSize, ctx,
+                    bufferManagerForHashTable);
+
+            // buffer manager for data table
+            final IPartitionedTupleBufferManager bufferManager = new VPartitionTupleBufferManager(
                     PreferToSpillFullyOccupiedFramePolicy.createAtMostOneFrameForSpilledPartitionConstrain(spilledSet),
-                    numPartitions, framesLimit * ctx.getInitialFrameSize());
+                    numPartitions, framePool);
 
             final ITuplePointerAccessor bufferAccessor = bufferManager.getTuplePointerAccessor(outRecordDescriptor);
 
             private final PreferToSpillFullyOccupiedFramePolicy spillPolicy = new PreferToSpillFullyOccupiedFramePolicy(
-                    bufferManager, spilledSet, ctx.getInitialFrameSize());
+                    bufferManager, spilledSet);
 
             private final FrameTupleAppender outputAppender = new FrameTupleAppender(new VSizeFrame(ctx));
 
@@ -125,6 +154,17 @@
                 for (int p = getFirstEntryInHashTable(partition); p < getLastEntryInHashTable(partition); p++) {
                     hashTableForTuplePointer.delete(p);
                 }
+
+                // Checks whether the garbage collection is required and conducts a garbage collection if so.
+                if (hashTableForTuplePointer.isGarbageCollectionNeeded()) {
+                    int numberOfFramesReclaimed = hashTableForTuplePointer.collectGarbage(bufferAccessor,
+                            tpcIntermediate);
+                    if (LOGGER.isLoggable(Level.FINE)) {
+                        LOGGER.fine("Garbage Collection on Hash table is done. Deallocated frames:"
+                                + numberOfFramesReclaimed);
+                    }
+                }
+
                 bufferManager.clearPartition(partition);
             }
 
@@ -152,20 +192,34 @@
                         return true;
                     }
                 }
-
                 return insertNewAggregateEntry(entryInHashTable, accessor, tIndex);
             }
 
+            /**
+             * Inserts a new aggregate entry into the data table and hash table.
+             * This insertion must be an atomic operation. We cannot have a partial success or failure.
+             * So, if an insertion succeeds on the data table and the same insertion on the hash table fails, then
+             * we need to revert the effect of data table insertion.
+             */
             private boolean insertNewAggregateEntry(int entryInHashTable, IFrameTupleAccessor accessor, int tIndex)
                     throws HyracksDataException {
                 initStateTupleBuilder(accessor, tIndex);
                 int pid = getPartition(entryInHashTable);
 
+                // Insertion to the data table
                 if (!bufferManager.insertTuple(pid, stateTupleBuilder.getByteArray(),
                         stateTupleBuilder.getFieldEndOffsets(), 0, stateTupleBuilder.getSize(), pointer)) {
                     return false;
                 }
-                hashTableForTuplePointer.insert(entryInHashTable, pointer);
+
+                // Insertion to the hash table
+                if (!hashTableForTuplePointer.insert(entryInHashTable, pointer)) {
+                    // To preserve the atomicity of this method, we need to undo the effect
+                    // of the above bufferManager.insertTuple() call since the given insertion has failed.
+                    bufferManager.cancelInsertTuple(pid);
+                    return false;
+                }
+
                 return true;
             }
 
@@ -240,25 +294,30 @@
     }
 
     /**
-     * Calculate the number of partitions for Data table. The formula is from Shapiro's paper -
+     * Calculates the number of partitions for Data table. The formula is from Shapiro's paper -
      * http://cs.stanford.edu/people/chrismre/cs345/rl/shapiro.pdf. Check the page 249 for more details.
      * If the required number of frames is greater than the number of available frames, we make sure that
      * at least two partitions will be created. Also, if the number of partitions is greater than the memory budget,
      * we may not allocate at least one frame for each partition in memory. So, we also deal with those cases
      * at the final part of the method.
+     * The maximum number of partitions is limited to Integer.MAX_VALUE.
      */
-    private int getNumOfPartitions(int nubmerOfInputFrames, int frameLimit) {
+    private int getNumOfPartitions(long nubmerOfInputFrames, long frameLimit) {
         if (frameLimit >= nubmerOfInputFrames * FUDGE_FACTOR) {
-            return 1; // all in memory, we will create a big partition.
+            // all in memory, we will create two big partitions. We set 2 (not 1) to avoid the corner case
+            // where the only partition may be spilled to the disk. This may happen since this formula doesn't consider
+            // the hash table size. If this is the case, we will have an indefinite loop - keep spilling the same
+            // partition again and again.
+            return 2;
         }
-        int numberOfPartitions = (int) (Math
+        long numberOfPartitions = (long) (Math
                 .ceil((nubmerOfInputFrames * FUDGE_FACTOR - frameLimit) / (frameLimit - 1)));
         numberOfPartitions = Math.max(2, numberOfPartitions);
         if (numberOfPartitions > frameLimit) {
-            numberOfPartitions = (int) Math.ceil(Math.sqrt(nubmerOfInputFrames * FUDGE_FACTOR));
-            return Math.max(2, Math.min(numberOfPartitions, frameLimit));
+            numberOfPartitions = (long) Math.ceil(Math.sqrt(nubmerOfInputFrames * FUDGE_FACTOR));
+            return (int) Math.min(Math.max(2, Math.min(numberOfPartitions, frameLimit)), Integer.MAX_VALUE);
         }
-        return numberOfPartitions;
+        return (int) Math.min(numberOfPartitions, Integer.MAX_VALUE);
     }
 
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/ISpillableTableFactory.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/ISpillableTableFactory.java
index dbe6858..629f211 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/ISpillableTableFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/ISpillableTableFactory.java
@@ -27,8 +27,8 @@
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 
 public interface ISpillableTableFactory extends Serializable {
-    ISpillableTable buildSpillableTable(IHyracksTaskContext ctx, int inputSizeInTuple, long dataBytesSize, int[] keyFields,
-            IBinaryComparator[] comparatorFactories, INormalizedKeyComputer firstKeyNormalizerFactory,
+    ISpillableTable buildSpillableTable(IHyracksTaskContext ctx, int inputSizeInTuple, long dataBytesSize,
+            int[] keyFields, IBinaryComparator[] comparatorFactories, INormalizedKeyComputer firstKeyNormalizerFactory,
             IAggregatorDescriptorFactory aggregateFactory, RecordDescriptor inRecordDescriptor,
             RecordDescriptor outRecordDescriptor, int framesLimit, int seed) throws HyracksDataException;
 
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupBuildOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupBuildOperatorNodePushable.java
index 852a160..7d10802 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupBuildOperatorNodePushable.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupBuildOperatorNodePushable.java
@@ -106,7 +106,7 @@
 
     @Override
     public void close() throws HyracksDataException {
-        if (isFailed) {
+        if (isFailed && state.getRuns() != null) {
             for (int i = 0; i < state.getRuns().length; i++) {
                 RunFileWriter run = state.getRuns()[i];
                 if (run != null) {
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupOperatorDescriptor.java
index 433b75d..4e0724c 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupOperatorDescriptor.java
@@ -65,12 +65,13 @@
             RecordDescriptor outRecordDesc, ISpillableTableFactory spillableTableFactory) {
         super(spec, 1, 1);
         this.framesLimit = framesLimit;
-        if (framesLimit <= 1) {
+        if (framesLimit <= 3) {
             /**
-             * Minimum of 2 frames: 1 for input records, and 1 for output
+             * Minimum of 4 frames: 1 for input records, 1 for output, and 2 for hash table (1 header and 1 content)
              * aggregation results.
              */
-            throw new IllegalStateException("frame limit should at least be 2, but it is " + framesLimit + "!");
+            throw new IllegalStateException(
+                    "Frame limit for the External Group Operator should at least be 4, but it is " + framesLimit + "!");
         }
         this.partialAggregatorFactory = partialAggregatorFactory;
         this.intermediateAggregateFactory = intermediateAggregateFactory;
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java
index 4354367..ad14bad 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java
@@ -58,7 +58,7 @@
 import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
 import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
 import org.apache.hyracks.dataflow.std.structures.ISerializableTable;
-import org.apache.hyracks.dataflow.std.structures.SerializableHashTable;
+import org.apache.hyracks.dataflow.std.structures.SimpleSerializableHashTable;
 import org.apache.hyracks.dataflow.std.util.FrameTuplePairComparator;
 
 public class HybridHashJoinOperatorDescriptor extends AbstractOperatorDescriptor {
@@ -308,10 +308,11 @@
                     ITuplePartitionComputer hpc1 = new FieldHashPartitionComputerFactory(keys1, hashFunctionFactories)
                             .createPartitioner();
                     int tableSize = (int) (state.memoryForHashtable * recordsPerFrame * factor);
-                    ISerializableTable table = new SerializableHashTable(tableSize, ctx);
+                    ISerializableTable table = new SimpleSerializableHashTable(tableSize, ctx);
                     state.joiner = new InMemoryHashJoin(ctx, tableSize, new FrameTupleAccessor(rd0), hpc0,
-                            new FrameTupleAccessor(rd1), hpc1, new FrameTuplePairComparator(keys0, keys1, comparators),
-                            isLeftOuter, nullWriters1, table, predEvaluator);
+                            new FrameTupleAccessor(rd1), rd1, hpc1,
+                            new FrameTuplePairComparator(keys0, keys1, comparators), isLeftOuter, nullWriters1, table,
+                            predEvaluator, null);
                     bufferForPartitions = new IFrame[state.nPartitions];
                     state.fWriters = new RunFileWriter[state.nPartitions];
                     for (int i = 0; i < state.nPartitions; i++) {
@@ -492,7 +493,7 @@
                             } else {
                                 tableSize = (int) (memsize * recordsPerFrame * factor);
                             }
-                            ISerializableTable table = new SerializableHashTable(tableSize, ctx);
+                            ISerializableTable table = new SimpleSerializableHashTable(tableSize, ctx);
                             for (int partitionid = 0; partitionid < state.nPartitions; partitionid++) {
                                 RunFileWriter buildWriter = buildWriters[partitionid];
                                 RunFileWriter probeWriter = probeWriters[partitionid];
@@ -501,9 +502,9 @@
                                 }
                                 table.reset();
                                 InMemoryHashJoin joiner = new InMemoryHashJoin(ctx, tableSize,
-                                        new FrameTupleAccessor(rd0), hpcRep0, new FrameTupleAccessor(rd1), hpcRep1,
+                                        new FrameTupleAccessor(rd0), hpcRep0, new FrameTupleAccessor(rd1), rd1, hpcRep1,
                                         new FrameTuplePairComparator(keys0, keys1, comparators), isLeftOuter,
-                                        nullWriters1, table, predEvaluator);
+                                        nullWriters1, table, predEvaluator, null);
 
                                 if (buildWriter != null) {
                                     RunFileReader buildReader = buildWriter.createDeleteOnCloseReader();
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoin.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoin.java
index ed7ae8e..8e52838 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoin.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoin.java
@@ -22,6 +22,7 @@
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.logging.Level;
 import java.util.logging.Logger;
 
 import org.apache.hyracks.api.comm.IFrameTupleAccessor;
@@ -31,18 +32,20 @@
 import org.apache.hyracks.api.dataflow.value.IMissingWriter;
 import org.apache.hyracks.api.dataflow.value.IPredicateEvaluator;
 import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputer;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
 import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
+import org.apache.hyracks.dataflow.std.buffermanager.ISimpleFrameBufferManager;
+import org.apache.hyracks.dataflow.std.buffermanager.TupleInFrameListAccessor;
 import org.apache.hyracks.dataflow.std.structures.ISerializableTable;
 import org.apache.hyracks.dataflow.std.structures.TuplePointer;
 import org.apache.hyracks.dataflow.std.util.FrameTuplePairComparator;
 
 public class InMemoryHashJoin {
 
-    private final IHyracksTaskContext ctx;
     private final List<ByteBuffer> buffers;
     private final FrameTupleAccessor accessorBuild;
     private final ITuplePartitionComputer tpcBuild;
@@ -57,23 +60,29 @@
     private final TuplePointer storedTuplePointer;
     private final boolean reverseOutputOrder; //Should we reverse the order of tuples, we are writing in output
     private final IPredicateEvaluator predEvaluator;
+    private TupleInFrameListAccessor tupleAccessor;
+    // To release frames
+    ISimpleFrameBufferManager bufferManager;
     private final boolean isTableCapacityNotZero;
 
     private static final Logger LOGGER = Logger.getLogger(InMemoryHashJoin.class.getName());
 
     public InMemoryHashJoin(IHyracksTaskContext ctx, int tableSize, FrameTupleAccessor accessorProbe,
-            ITuplePartitionComputer tpcProbe, FrameTupleAccessor accessorBuild, ITuplePartitionComputer tpcBuild,
-            FrameTuplePairComparator comparator, boolean isLeftOuter, IMissingWriter[] missingWritersBuild,
-            ISerializableTable table, IPredicateEvaluator predEval) throws HyracksDataException {
-        this(ctx, tableSize, accessorProbe, tpcProbe, accessorBuild, tpcBuild, comparator, isLeftOuter,
-                missingWritersBuild, table, predEval, false);
+            ITuplePartitionComputer tpcProbe, FrameTupleAccessor accessorBuild, RecordDescriptor rDBuild,
+            ITuplePartitionComputer tpcBuild, FrameTuplePairComparator comparator, boolean isLeftOuter,
+            IMissingWriter[] missingWritersBuild, ISerializableTable table, IPredicateEvaluator predEval,
+            ISimpleFrameBufferManager bufferManager)
+            throws HyracksDataException {
+        this(ctx, tableSize, accessorProbe, tpcProbe, accessorBuild, rDBuild, tpcBuild, comparator, isLeftOuter,
+                missingWritersBuild, table, predEval, false, bufferManager);
     }
 
     public InMemoryHashJoin(IHyracksTaskContext ctx, int tableSize, FrameTupleAccessor accessorProbe,
-            ITuplePartitionComputer tpcProbe, FrameTupleAccessor accessorBuild, ITuplePartitionComputer tpcBuild,
-            FrameTuplePairComparator comparator, boolean isLeftOuter, IMissingWriter[] missingWritersBuild,
-            ISerializableTable table, IPredicateEvaluator predEval, boolean reverse) throws HyracksDataException {
-        this.ctx = ctx;
+            ITuplePartitionComputer tpcProbe, FrameTupleAccessor accessorBuild,
+            RecordDescriptor rDBuild, ITuplePartitionComputer tpcBuild, FrameTuplePairComparator comparator,
+            boolean isLeftOuter, IMissingWriter[] missingWritersBuild, ISerializableTable table,
+            IPredicateEvaluator predEval, boolean reverse, ISimpleFrameBufferManager bufferManager)
+            throws HyracksDataException {
         this.tableSize = tableSize;
         this.table = table;
         storedTuplePointer = new TuplePointer();
@@ -98,6 +107,8 @@
             missingTupleBuild = null;
         }
         reverseOutputOrder = reverse;
+        this.tupleAccessor = new TupleInFrameListAccessor(rDBuild, buffers);
+        this.bufferManager = bufferManager;
         if (tableSize != 0) {
             isTableCapacityNotZero = true;
         } else {
@@ -115,10 +126,33 @@
         for (int i = 0; i < tCount; ++i) {
             int entry = tpcBuild.partition(accessorBuild, i, tableSize);
             storedTuplePointer.reset(bIndex, i);
-            table.insert(entry, storedTuplePointer);
+            // If an insertion fails, then tries to insert the same tuple pointer again after compacting the table.
+            if (!table.insert(entry, storedTuplePointer)) {
+                compactTableAndInsertAgain(entry, storedTuplePointer);
+            }
         }
     }
 
+    public boolean compactTableAndInsertAgain(int entry, TuplePointer tPointer) throws HyracksDataException {
+        boolean oneMoreTry = false;
+        if (compactHashTable() >= 0) {
+            oneMoreTry = table.insert(entry, tPointer);
+        }
+        return oneMoreTry;
+    }
+
+    /**
+     * Tries to compact the table to make some space.
+     *
+     * @return the number of frames that have been reclaimed. If no compaction has happened, the value -1 is returned.
+     */
+    public int compactHashTable() throws HyracksDataException {
+        if (table.isGarbageCollectionNeeded()) {
+            return table.collectGarbage(tupleAccessor, tpcBuild);
+        }
+        return -1;
+    }
+
     /**
      * Reads the given tuple from the probe side and joins it with tuples from the build side.
      * This method assumes that the accessorProbe is already set to the current probe frame.
@@ -165,14 +199,18 @@
     public void closeJoin(IFrameWriter writer) throws HyracksDataException {
         appender.write(writer, true);
         int nFrames = buffers.size();
-        int totalSize = 0;
-        for (int i = 0; i < nFrames; i++) {
-            totalSize += buffers.get(i).capacity();
+        // Frames assigned to the data table will be released here.
+        if (bufferManager != null) {
+            for (int i = 0; i < nFrames; i++) {
+                bufferManager.releaseFrame(buffers.get(i));
+            }
         }
         buffers.clear();
-        ctx.deallocateFrames(totalSize);
-        LOGGER.fine("InMemoryHashJoin has finished using " + nFrames + " frames for Thread ID "
-                + Thread.currentThread().getId() + ".");
+
+        if (LOGGER.isLoggable(Level.FINE)) {
+            LOGGER.fine("InMemoryHashJoin has finished using " + nFrames + " frames for Thread ID "
+                    + Thread.currentThread().getId() + ".");
+        }
     }
 
     public void closeTable() throws HyracksDataException {
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java
index 0d6d163..a8d3f7e 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java
@@ -49,6 +49,10 @@
 import org.apache.hyracks.dataflow.std.base.AbstractStateObject;
 import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
 import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
+import org.apache.hyracks.dataflow.std.buffermanager.DeallocatableFramePool;
+import org.apache.hyracks.dataflow.std.buffermanager.FramePoolBackedFrameBufferManager;
+import org.apache.hyracks.dataflow.std.buffermanager.IDeallocatableFramePool;
+import org.apache.hyracks.dataflow.std.buffermanager.ISimpleFrameBufferManager;
 import org.apache.hyracks.dataflow.std.structures.ISerializableTable;
 import org.apache.hyracks.dataflow.std.structures.SerializableHashTable;
 import org.apache.hyracks.dataflow.std.util.FrameTuplePairComparator;
@@ -63,10 +67,13 @@
     private final boolean isLeftOuter;
     private final IMissingWriterFactory[] nonMatchWriterFactories;
     private final int tableSize;
+    // The maximum number of in-memory frames that this hash join can use.
+    private final int memSizeInFrames;
 
     public InMemoryHashJoinOperatorDescriptor(IOperatorDescriptorRegistry spec, int[] keys0, int[] keys1,
             IBinaryHashFunctionFactory[] hashFunctionFactories, IBinaryComparatorFactory[] comparatorFactories,
-            RecordDescriptor recordDescriptor, int tableSize, IPredicateEvaluatorFactory predEvalFactory) {
+            RecordDescriptor recordDescriptor, int tableSize, IPredicateEvaluatorFactory predEvalFactory,
+            int memSizeInFrames) {
         super(spec, 2, 1);
         this.keys0 = keys0;
         this.keys1 = keys1;
@@ -77,12 +84,13 @@
         this.isLeftOuter = false;
         this.nonMatchWriterFactories = null;
         this.tableSize = tableSize;
+        this.memSizeInFrames = memSizeInFrames;
     }
 
     public InMemoryHashJoinOperatorDescriptor(IOperatorDescriptorRegistry spec, int[] keys0, int[] keys1,
             IBinaryHashFunctionFactory[] hashFunctionFactories, IBinaryComparatorFactory[] comparatorFactories,
             IPredicateEvaluatorFactory predEvalFactory, RecordDescriptor recordDescriptor, boolean isLeftOuter,
-            IMissingWriterFactory[] missingWriterFactories1, int tableSize) {
+            IMissingWriterFactory[] missingWriterFactories1, int tableSize, int memSizeInFrames) {
         super(spec, 2, 1);
         this.keys0 = keys0;
         this.keys1 = keys1;
@@ -93,20 +101,22 @@
         this.isLeftOuter = isLeftOuter;
         this.nonMatchWriterFactories = missingWriterFactories1;
         this.tableSize = tableSize;
+        this.memSizeInFrames = memSizeInFrames;
     }
 
     public InMemoryHashJoinOperatorDescriptor(IOperatorDescriptorRegistry spec, int[] keys0, int[] keys1,
             IBinaryHashFunctionFactory[] hashFunctionFactories, IBinaryComparatorFactory[] comparatorFactories,
-            RecordDescriptor recordDescriptor, int tableSize) {
-        this(spec, keys0, keys1, hashFunctionFactories, comparatorFactories, recordDescriptor, tableSize, null);
+            RecordDescriptor recordDescriptor, int tableSize, int memSizeInFrames) {
+        this(spec, keys0, keys1, hashFunctionFactories, comparatorFactories, recordDescriptor, tableSize, null,
+                memSizeInFrames);
     }
 
     public InMemoryHashJoinOperatorDescriptor(IOperatorDescriptorRegistry spec, int[] keys0, int[] keys1,
             IBinaryHashFunctionFactory[] hashFunctionFactories, IBinaryComparatorFactory[] comparatorFactories,
             RecordDescriptor recordDescriptor, boolean isLeftOuter, IMissingWriterFactory[] nullWriterFactories1,
-            int tableSize) {
+            int tableSize, int memSizeInFrames) {
         this(spec, keys0, keys1, hashFunctionFactories, comparatorFactories, null, recordDescriptor, isLeftOuter,
-                nullWriterFactories1, tableSize);
+                nullWriterFactories1, tableSize, memSizeInFrames);
     }
 
     @Override
@@ -177,6 +187,10 @@
             final IPredicateEvaluator predEvaluator = (predEvaluatorFactory == null ? null
                     : predEvaluatorFactory.createPredicateEvaluator());
 
+            final int memSizeInBytes = memSizeInFrames * ctx.getInitialFrameSize();
+            final IDeallocatableFramePool framePool = new DeallocatableFramePool(ctx, memSizeInBytes);
+            final ISimpleFrameBufferManager bufferManager = new FramePoolBackedFrameBufferManager(framePool);
+
             IOperatorNodePushable op = new AbstractUnaryInputSinkOperatorNodePushable() {
                 private HashBuildTaskState state;
 
@@ -188,19 +202,38 @@
                             .createPartitioner();
                     state = new HashBuildTaskState(ctx.getJobletContext().getJobId(),
                             new TaskId(getActivityId(), partition));
-                    ISerializableTable table = new SerializableHashTable(tableSize, ctx);
+                    ISerializableTable table = new SerializableHashTable(tableSize, ctx, bufferManager);
                     state.joiner = new InMemoryHashJoin(ctx, tableSize, new FrameTupleAccessor(rd0), hpc0,
-                            new FrameTupleAccessor(rd1), hpc1, new FrameTuplePairComparator(keys0, keys1, comparators),
-                            isLeftOuter, nullWriters1, table, predEvaluator);
+                            new FrameTupleAccessor(rd1), rd1, hpc1,
+                            new FrameTuplePairComparator(keys0, keys1, comparators), isLeftOuter, nullWriters1, table,
+                            predEvaluator, bufferManager);
                 }
 
                 @Override
                 public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
-                    ByteBuffer copyBuffer = ctx.allocateFrame(buffer.capacity());
+                    ByteBuffer copyBuffer = allocateBuffer(buffer.capacity());
                     FrameUtils.copyAndFlip(buffer, copyBuffer);
                     state.joiner.build(copyBuffer);
                 }
 
+                private ByteBuffer allocateBuffer(int frameSize) throws HyracksDataException {
+                    ByteBuffer newBuffer = bufferManager.acquireFrame(frameSize);
+                    if (newBuffer != null) {
+                        return newBuffer;
+                    }
+                    // At this moment, there is no enough memory since the newBuffer is null.
+                    // But, there may be a chance if we can compact the table, one or more frame may be reclaimed.
+                    if (state.joiner.compactHashTable() > 0) {
+                        newBuffer = bufferManager.acquireFrame(frameSize);
+                        if (newBuffer != null) {
+                            return newBuffer;
+                        }
+                    }
+                    // At this point, we have no way to get a frame.
+                    throw new HyracksDataException(
+                            "Can't allocate one more frame. Assign more memory to InMemoryHashJoin.");
+                }
+
                 @Override
                 public void close() throws HyracksDataException {
                     ctx.setStateObject(state);
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
index 4f85e57..17f009e 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
@@ -21,7 +21,6 @@
 import java.nio.ByteBuffer;
 import java.util.BitSet;
 
-import org.apache.commons.io.FileUtils;
 import org.apache.hyracks.api.comm.IFrame;
 import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.comm.VSizeFrame;
@@ -38,7 +37,11 @@
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
 import org.apache.hyracks.dataflow.common.io.RunFileReader;
 import org.apache.hyracks.dataflow.common.io.RunFileWriter;
+import org.apache.hyracks.dataflow.std.buffermanager.DeallocatableFramePool;
+import org.apache.hyracks.dataflow.std.buffermanager.FramePoolBackedFrameBufferManager;
+import org.apache.hyracks.dataflow.std.buffermanager.IDeallocatableFramePool;
 import org.apache.hyracks.dataflow.std.buffermanager.IPartitionedTupleBufferManager;
+import org.apache.hyracks.dataflow.std.buffermanager.ISimpleFrameBufferManager;
 import org.apache.hyracks.dataflow.std.buffermanager.PreferToSpillFullyOccupiedFramePolicy;
 import org.apache.hyracks.dataflow.std.buffermanager.VPartitionTupleBufferManager;
 import org.apache.hyracks.dataflow.std.structures.ISerializableTable;
@@ -85,7 +88,7 @@
 
     private final BitSet spilledStatus; //0=resident, 1=spilled
     private final int numOfPartitions;
-    private final int memForJoin;
+    private final int memSizeInFrames;
     private InMemoryHashJoin inMemJoiner; //Used for joining resident partitions
 
     private IPartitionedTupleBufferManager bufferManager;
@@ -94,6 +97,9 @@
     private final FrameTupleAccessor accessorBuild;
     private final FrameTupleAccessor accessorProbe;
 
+    private IDeallocatableFramePool framePool;
+    private ISimpleFrameBufferManager bufferManagerForHashTable;
+
     private boolean isReversed; //Added for handling correct calling for predicate-evaluator upon recursive calls that cause role-reversal
 
     // stats information
@@ -103,13 +109,14 @@
                                                        // we mainly use it to match the corresponding function signature.
     private int[] probePSizeInTups;
 
-    public OptimizedHybridHashJoin(IHyracksTaskContext ctx, int memForJoin, int numOfPartitions, String probeRelName,
+    public OptimizedHybridHashJoin(IHyracksTaskContext ctx, int memSizeInFrames, int numOfPartitions,
+            String probeRelName,
             String buildRelName, int[] probeKeys, int[] buildKeys, IBinaryComparator[] comparators,
             RecordDescriptor probeRd, RecordDescriptor buildRd, ITuplePartitionComputer probeHpc,
             ITuplePartitionComputer buildHpc, IPredicateEvaluator predEval, boolean isLeftOuter,
             IMissingWriterFactory[] nullWriterFactories1) {
         this.ctx = ctx;
-        this.memForJoin = memForJoin;
+        this.memSizeInFrames = memSizeInFrames;
         this.buildRd = buildRd;
         this.probeRd = probeRd;
         this.buildHpc = buildHpc;
@@ -142,11 +149,12 @@
     }
 
     public void initBuild() throws HyracksDataException {
-        bufferManager = new VPartitionTupleBufferManager(ctx,
+        framePool = new DeallocatableFramePool(ctx, memSizeInFrames * ctx.getInitialFrameSize());
+        bufferManagerForHashTable = new FramePoolBackedFrameBufferManager(framePool);
+        bufferManager = new VPartitionTupleBufferManager(
                 PreferToSpillFullyOccupiedFramePolicy.createAtMostOneFrameForSpilledPartitionConstrain(spilledStatus),
-                numOfPartitions, memForJoin * ctx.getInitialFrameSize());
-        spillPolicy = new PreferToSpillFullyOccupiedFramePolicy(bufferManager, spilledStatus,
-                ctx.getInitialFrameSize());
+                numOfPartitions, framePool);
+        spillPolicy = new PreferToSpillFullyOccupiedFramePolicy(bufferManager, spilledStatus);
         spilledStatus.clear();
         buildPSizeInTups = new int[numOfPartitions];
     }
@@ -173,7 +181,7 @@
         int victimPartition = spillPolicy.selectVictimPartition(pid);
         if (victimPartition < 0) {
             throw new HyracksDataException(
-                    "No more space left in the memory buffer, please give join more memory budgets.");
+                    "No more space left in the memory buffer, please assign more memory to hash-join.");
         }
         spillPartition(victimPartition);
     }
@@ -185,6 +193,13 @@
         spilledStatus.set(pid);
     }
 
+    private void closeBuildPartition(int pid) throws HyracksDataException {
+        if (buildRFWriters[pid] == null) {
+            throw new HyracksDataException("Tried to close the non-existing file writer.");
+        }
+        buildRFWriters[pid].close();
+    }
+
     private RunFileWriter getSpillWriterOrCreateNewOneIfNotExist(int pid, SIDE whichSide) throws HyracksDataException {
         RunFileWriter[] runFileWriters = null;
         String refName = null;
@@ -209,20 +224,17 @@
     }
 
     public void closeBuild() throws HyracksDataException {
-
+        // Flushes the remaining chunks of the all spilled partitions to the disk.
         closeAllSpilledPartitions(SIDE.BUILD);
 
-        bringBackSpilledPartitionIfHasMoreMemory(); //Trying to bring back as many spilled partitions as possible, making them resident
-
-        int inMemTupCount = 0;
-
-        for (int i = spilledStatus.nextClearBit(0); i >= 0
-                && i < numOfPartitions; i = spilledStatus.nextClearBit(i + 1)) {
-            inMemTupCount += buildPSizeInTups[i];
-        }
+        // Makes the space for the in-memory hash table (some partitions may need to be spilled to the disk
+        // during this step in order to make the space.)
+        // and tries to bring back as many spilled partitions as possible if there is free space.
+        int inMemTupCount = makeSpaceForHashTableAndBringBackSpilledPartitions();
 
         createInMemoryJoiner(inMemTupCount);
-        cacheInMemJoin();
+
+        loadDataInMemJoin();
     }
 
     /**
@@ -261,24 +273,150 @@
         }
     }
 
-    private void bringBackSpilledPartitionIfHasMoreMemory() throws HyracksDataException {
+    /**
+     * Makes the space for the hash table. If there is no enough space, one or more partitions will be spilled
+     * to the disk until the hash table can fit into the memory. After this, bring back spilled partitions
+     * if there is available memory.
+     *
+     * @return the number of tuples in memory after this method is executed.
+     * @throws HyracksDataException
+     */
+    private int makeSpaceForHashTableAndBringBackSpilledPartitions() throws HyracksDataException {
         // we need number of |spilledPartitions| buffers to store the probe data
-        int freeSpace = (memForJoin - spilledStatus.cardinality()) * ctx.getInitialFrameSize();
+        int frameSize = ctx.getInitialFrameSize();
+        long freeSpace = (long) (memSizeInFrames - spilledStatus.cardinality()) * frameSize;
+
+        // For partitions in main memory, we deduct their size from the free space.
+        int inMemTupCount = 0;
         for (int p = spilledStatus.nextClearBit(0); p >= 0
                 && p < numOfPartitions; p = spilledStatus.nextClearBit(p + 1)) {
             freeSpace -= bufferManager.getPhysicalSize(p);
+            inMemTupCount += buildPSizeInTups[p];
         }
 
-        int pid = 0;
-        while ((pid = selectPartitionsToReload(freeSpace, pid)) >= 0) {
-            if (!loadPartitionInMem(pid, buildRFWriters[pid])) {
-                return;
+        // Calculates the expected hash table size for the given number of tuples in main memory
+        // and deducts it from the free space.
+        long hashTableByteSizeForInMemTuples = SerializableHashTable.getExpectedTableByteSize(inMemTupCount,
+                frameSize);
+        freeSpace -= hashTableByteSizeForInMemTuples;
+
+        // In the case where free space is less than zero after considering the hash table size,
+        // we need to spill more partitions until we can accommodate the hash table in memory.
+        // TODO: there may be different policies (keep spilling minimum, spilling maximum, find a similar size to the
+        //                                        hash table, or keep spilling from the first partition)
+        boolean moreSpilled = false;
+
+        // No space to accommodate the hash table? Then, we spill one or more partitions to the disk.
+        if (freeSpace < 0) {
+            // Tries to find a best-fit partition not to spill many partitions.
+            int pidToSpill = selectSinglePartitionToSpill(freeSpace, inMemTupCount, frameSize);
+            if (pidToSpill >= 0) {
+                // There is a suitable one. We spill that partition to the disk.
+                long hashTableSizeDecrease = -SerializableHashTable.calculateByteSizeDeltaForTableSizeChange(
+                        inMemTupCount, -buildPSizeInTups[pidToSpill], frameSize);
+                freeSpace = freeSpace + bufferManager.getPhysicalSize(pidToSpill) + hashTableSizeDecrease;
+                inMemTupCount -= buildPSizeInTups[pidToSpill];
+                spillPartition(pidToSpill);
+                closeBuildPartition(pidToSpill);
+                moreSpilled = true;
+            } else {
+                // There is no single suitable partition. So, we need to spill multiple partitions to the disk
+                // in order to accommodate the hash table.
+                for (int p = spilledStatus.nextClearBit(0); p >= 0
+                        && p < numOfPartitions; p = spilledStatus.nextClearBit(p + 1)) {
+                    int spaceToBeReturned = bufferManager.getPhysicalSize(p);
+                    int numberOfTuplesToBeSpilled = buildPSizeInTups[p];
+                    if (spaceToBeReturned == 0 || numberOfTuplesToBeSpilled == 0) {
+                        continue;
+                    }
+                    spillPartition(p);
+                    closeBuildPartition(p);
+                    moreSpilled = true;
+                    // Since the number of tuples in memory has been decreased,
+                    // the hash table size will be decreased, too.
+                    // We put minus since the method returns a negative value to represent a newly reclaimed space.
+                    long expectedHashTableSizeDecrease = -SerializableHashTable
+                            .calculateByteSizeDeltaForTableSizeChange(inMemTupCount, -numberOfTuplesToBeSpilled,
+                                    frameSize);
+                    freeSpace = freeSpace + spaceToBeReturned + expectedHashTableSizeDecrease;
+                    // Adjusts the hash table size
+                    inMemTupCount -= numberOfTuplesToBeSpilled;
+                    if (freeSpace >= 0) {
+                        break;
+                    }
+                }
             }
-            freeSpace -= bufferManager.getPhysicalSize(pid);
         }
+
+        // If more partitions have been spilled to the disk, calculate the expected hash table size again
+        // before bringing some partitions to main memory.
+        if (moreSpilled) {
+            hashTableByteSizeForInMemTuples = SerializableHashTable.getExpectedTableByteSize(inMemTupCount,
+                    frameSize);
+        }
+
+        // Brings back some partitions if there is enough free space.
+        int pid = 0;
+        while ((pid = selectPartitionsToReload(freeSpace, pid, inMemTupCount)) >= 0) {
+            if (!loadSpilledPartitionToMem(pid, buildRFWriters[pid])) {
+                break;
+            }
+            long expectedHashTableByteSizeIncrease = SerializableHashTable
+                    .calculateByteSizeDeltaForTableSizeChange(inMemTupCount, buildPSizeInTups[pid], frameSize);
+            freeSpace = freeSpace - bufferManager.getPhysicalSize(pid) - expectedHashTableByteSizeIncrease;
+            inMemTupCount += buildPSizeInTups[pid];
+            // Adjusts the hash table size
+            hashTableByteSizeForInMemTuples += expectedHashTableByteSizeIncrease;
+        }
+
+        return inMemTupCount;
     }
 
-    private boolean loadPartitionInMem(int pid, RunFileWriter wr) throws HyracksDataException {
+    /**
+     * Finds a best-fit partition that will be spilled to the disk to make enough space to accommodate the hash table.
+     *
+     * @return the partition id that will be spilled to the disk. Returns -1 if there is no single suitable partition.
+     */
+    private int selectSinglePartitionToSpill(long currentFreeSpace, int currentInMemTupCount, int frameSize) {
+        long spaceAfterSpill;
+        long minSpaceAfterSpill = (long) memSizeInFrames * frameSize;
+        int minSpaceAfterSpillPartID = -1;
+
+        for (int p = spilledStatus.nextClearBit(0); p >= 0
+                && p < numOfPartitions; p = spilledStatus.nextClearBit(p + 1)) {
+            if (buildPSizeInTups[p] == 0 || bufferManager.getPhysicalSize(p) == 0) {
+                continue;
+            }
+            // We put minus since the method returns a negative value to represent a newly reclaimed space.
+            spaceAfterSpill = currentFreeSpace + bufferManager.getPhysicalSize(p) + (-SerializableHashTable
+                    .calculateByteSizeDeltaForTableSizeChange(currentInMemTupCount, -buildPSizeInTups[p], frameSize));
+            if (spaceAfterSpill == 0) {
+                // Found the perfect one. Just returns this partition.
+                return p;
+            } else if (spaceAfterSpill > 0 && spaceAfterSpill < minSpaceAfterSpill) {
+                // We want to find the best-fit partition to avoid many partition spills.
+                minSpaceAfterSpill = spaceAfterSpill;
+                minSpaceAfterSpillPartID = p;
+            }
+        }
+        return minSpaceAfterSpillPartID;
+    }
+
+    private int selectPartitionsToReload(long freeSpace, int pid, int inMemTupCount) {
+        for (int i = spilledStatus.nextSetBit(pid); i >= 0
+                && i < numOfPartitions; i = spilledStatus.nextSetBit(i + 1)) {
+            int spilledTupleCount = buildPSizeInTups[i];
+            // Expected hash table size increase after reloading this partition
+            long expectedHashTableByteSizeIncrease = SerializableHashTable.calculateByteSizeDeltaForTableSizeChange(
+                    inMemTupCount, spilledTupleCount, ctx.getInitialFrameSize());
+            if (freeSpace >= buildRFWriters[i].getFileSize() + expectedHashTableByteSizeIncrease) {
+                return i;
+            }
+        }
+        return -1;
+    }
+
+    private boolean loadSpilledPartitionToMem(int pid, RunFileWriter wr) throws HyracksDataException {
         RunFileReader r = wr.createReader();
         r.open();
         if (reloadBuffer == null) {
@@ -288,7 +426,8 @@
             accessorBuild.reset(reloadBuffer.getBuffer());
             for (int tid = 0; tid < accessorBuild.getTupleCount(); tid++) {
                 if (!bufferManager.insertTuple(pid, accessorBuild, tid, tempPtr)) {
-                    // for some reason (e.g. due to fragmentation) if the inserting failed, we need to clear the occupied frames
+                    // for some reason (e.g. due to fragmentation) if the inserting failed,
+                    // we need to clear the occupied frames
                     bufferManager.clearPartition(pid);
                     r.close();
                     return false;
@@ -296,33 +435,23 @@
             }
         }
 
-        FileUtils.deleteQuietly(wr.getFileReference().getFile()); // delete the runfile if it already loaded into memory.
+        // Closes and deletes the run file if it is already loaded into memory.
+        r.setDeleteAfterClose(true);
         r.close();
         spilledStatus.set(pid, false);
         buildRFWriters[pid] = null;
         return true;
     }
 
-    private int selectPartitionsToReload(int freeSpace, int pid) {
-        for (int i = spilledStatus.nextSetBit(pid); i >= 0
-                && i < numOfPartitions; i = spilledStatus.nextSetBit(i + 1)) {
-            assert buildRFWriters[i].getFileSize() > 0 : "How comes a spilled partition have size 0?";
-            if (freeSpace >= buildRFWriters[i].getFileSize()) {
-                return i;
-            }
-        }
-        return -1;
-    }
-
     private void createInMemoryJoiner(int inMemTupCount) throws HyracksDataException {
-        ISerializableTable table = new SerializableHashTable(inMemTupCount, ctx);
+        ISerializableTable table = new SerializableHashTable(inMemTupCount, ctx, bufferManagerForHashTable);
         this.inMemJoiner = new InMemoryHashJoin(ctx, inMemTupCount, new FrameTupleAccessor(probeRd), probeHpc,
-                new FrameTupleAccessor(buildRd), buildHpc,
+                new FrameTupleAccessor(buildRd), buildRd, buildHpc,
                 new FrameTuplePairComparator(probeKeys, buildKeys, comparators), isLeftOuter, nonMatchWriters, table,
-                predEvaluator, isReversed);
+                predEvaluator, isReversed, bufferManagerForHashTable);
     }
 
-    private void cacheInMemJoin() throws HyracksDataException {
+    private void loadDataInMemJoin() throws HyracksDataException {
 
         for (int pid = 0; pid < numOfPartitions; pid++) {
             if (!spilledStatus.get(pid)) {
@@ -391,7 +520,6 @@
                 probePSizeInTups[pid]++;
             }
         }
-
     }
 
     private void flushBigProbeObjectToDisk(int pid, FrameTupleAccessor accessorProbe, int i)
@@ -419,6 +547,7 @@
         bufferManager.close();
         inMemJoiner = null;
         bufferManager = null;
+        bufferManagerForHashTable = null;
     }
 
     /**
@@ -475,4 +604,71 @@
     public void setIsReversed(boolean b) {
         this.isReversed = b;
     }
+
+    /**
+     * Prints out the detailed information for partitions: in-memory and spilled partitions.
+     * This method exists for a debug purpose.
+     */
+    public String printPartitionInfo(SIDE whichSide) {
+        StringBuilder buf = new StringBuilder();
+        buf.append(">>> " + this + " " + Thread.currentThread().getId() + " printInfo():" + "\n");
+        if (whichSide == SIDE.BUILD) {
+            buf.append("BUILD side" + "\n");
+        } else {
+            buf.append("PROBE side" + "\n");
+        }
+        buf.append("# of partitions:\t" + numOfPartitions + "\t#spilled:\t" + spilledStatus.cardinality()
+                + "\t#in-memory:\t" + (numOfPartitions - spilledStatus.cardinality()) + "\n");
+        buf.append("(A) Spilled partitions" + "\n");
+        int spilledTupleCount = 0;
+        int spilledPartByteSize = 0;
+        for (int pid = spilledStatus.nextSetBit(0); pid >= 0
+                && pid < numOfPartitions; pid = spilledStatus.nextSetBit(pid + 1)) {
+            if (whichSide == SIDE.BUILD) {
+                spilledTupleCount += buildPSizeInTups[pid];
+                spilledPartByteSize += buildRFWriters[pid].getFileSize();
+                buf.append("part:\t" + pid + "\t#tuple:\t" + buildPSizeInTups[pid] + "\tsize(MB):\t"
+                        + ((double) buildRFWriters[pid].getFileSize() / 1048576) + "\n");
+            } else {
+                spilledTupleCount += probePSizeInTups[pid];
+                spilledPartByteSize += probeRFWriters[pid].getFileSize();
+            }
+        }
+        if (spilledStatus.cardinality() > 0) {
+            buf.append("# of spilled tuples:\t" + spilledTupleCount + "\tsize(MB):\t"
+                    + ((double) spilledPartByteSize / 1048576) + "avg #tuples per spilled part:\t"
+                    + (spilledTupleCount / spilledStatus.cardinality()) + "\tavg size per part(MB):\t"
+                    + ((double) spilledPartByteSize / 1048576 / spilledStatus.cardinality()) + "\n");
+        }
+        buf.append("(B) In-memory partitions" + "\n");
+        int inMemoryTupleCount = 0;
+        int inMemoryPartByteSize = 0;
+        for (int pid = spilledStatus.nextClearBit(0); pid >= 0
+                && pid < numOfPartitions; pid = spilledStatus.nextClearBit(pid + 1)) {
+            if (whichSide == SIDE.BUILD) {
+                inMemoryTupleCount += buildPSizeInTups[pid];
+                inMemoryPartByteSize += bufferManager.getPhysicalSize(pid);
+            } else {
+                inMemoryTupleCount += probePSizeInTups[pid];
+                inMemoryPartByteSize += bufferManager.getPhysicalSize(pid);
+            }
+        }
+        if (spilledStatus.cardinality() > 0) {
+            buf.append("# of in-memory tuples:\t" + inMemoryTupleCount + "\tsize(MB):\t"
+                    + ((double) inMemoryPartByteSize / 1048576) + "avg #tuples per spilled part:\t"
+                    + (inMemoryTupleCount / spilledStatus.cardinality()) + "\tavg size per part(MB):\t"
+                    + ((double) inMemoryPartByteSize / 1048576 / (numOfPartitions - spilledStatus.cardinality()))
+                    + "\n");
+        }
+        if (inMemoryTupleCount + spilledTupleCount > 0) {
+            buf.append("# of all tuples:\t" + (inMemoryTupleCount + spilledTupleCount) + "\tsize(MB):\t"
+                    + ((double) (inMemoryPartByteSize + spilledPartByteSize) / 1048576) + " ratio of spilled tuples:\t"
+                    + (spilledTupleCount / (inMemoryTupleCount + spilledTupleCount)) + "\n");
+        } else {
+            buf.append("# of all tuples:\t" + (inMemoryTupleCount + spilledTupleCount) + "\tsize(MB):\t"
+                    + ((double) (inMemoryPartByteSize + spilledPartByteSize) / 1048576) + " ratio of spilled tuples:\t"
+                    + "N/A" + "\n");
+        }
+        return buf.toString();
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
index e308dd8..a72c0c6 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
@@ -59,6 +59,10 @@
 import org.apache.hyracks.dataflow.std.base.AbstractStateObject;
 import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
 import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
+import org.apache.hyracks.dataflow.std.buffermanager.DeallocatableFramePool;
+import org.apache.hyracks.dataflow.std.buffermanager.FramePoolBackedFrameBufferManager;
+import org.apache.hyracks.dataflow.std.buffermanager.IDeallocatableFramePool;
+import org.apache.hyracks.dataflow.std.buffermanager.ISimpleFrameBufferManager;
 import org.apache.hyracks.dataflow.std.structures.ISerializableTable;
 import org.apache.hyracks.dataflow.std.structures.SerializableHashTable;
 import org.apache.hyracks.dataflow.std.util.FrameTuplePairComparator;
@@ -70,10 +74,12 @@
  *         partitions.
  *         - Operator overview:
  *         Assume we are trying to do (R Join S), with M buffers available, while we have an estimate on the size
- *         of R (in terms of buffers). HHJ (Hybrid Hash Join) has two main phases: Build and Probe, where in our implementation Probe phase
- *         can apply HHJ recursively, based on the value of M and size of R and S. HHJ phases proceed as follow:
+ *         of R (in terms of buffers). HHJ (Hybrid Hash Join) has two main phases: Build and Probe,
+ *         where in our implementation Probe phase can apply HHJ recursively, based on the value of M and size of
+ *         R and S. HHJ phases proceed as follow:
  *         BUILD:
- *         Calculate number of partitions (Based on the size of R, fudge factor and M) [See Shapiro's paper for the detailed discussion].
+ *         Calculate number of partitions (Based on the size of R, fudge factor and M)
+ *         [See Shapiro's paper for the detailed discussion].
  *         Initialize the build phase (one frame per partition, all partitions considered resident at first)
  *         Read tuples of R, frame by frame, and hash each tuple (based on a given hash function) to find
  *         its target partition and try to append it to that partition:
@@ -81,9 +87,9 @@
  *         if no free buffer is available, find the largest resident partition and spill it. Using its freed
  *         buffers after spilling, allocate a new buffer for the target partition.
  *         Being done with R, close the build phase. (During closing we write the very last buffer of each
- *         spilled partition to the disk, and we do partition tuning, where we try to bring back as many buffers, belonging to
- *         spilled partitions as possible into memory, based on the free buffers - We will stop at the point where remaining free buffers is not enough
- *         for reloading an entire partition back into memory)
+ *         spilled partition to the disk, and we do partition tuning, where we try to bring back as many buffers,
+ *         belonging to spilled partitions as possible into memory, based on the free buffers - We will stop at the
+ *         point where remaining free buffers is not enough for reloading an entire partition back into memory)
  *         Create the hash table for the resident partitions (basically we create an in-memory hash join here)
  *         PROBE:
  *         Initialize the probe phase on S (mainly allocate one buffer per spilled partition, and one buffer
@@ -112,7 +118,7 @@
     private static final String PROBE_REL = "RelR";
     private static final String BUILD_REL = "RelS";
 
-    private final int frameLimit;
+    private final int memSizeInFrames;
     private final int inputsize0;
     private final double fudgeFactor;
     private final int[] probeKeys;
@@ -127,21 +133,20 @@
     private final IMissingWriterFactory[] nonMatchWriterFactories;
 
     //Flags added for test purpose
-    private static boolean skipInMemoryHJ = false;
-    private static boolean forceNLJ = false;
-    private static boolean forceRR = false;
+    private boolean skipInMemoryHJ = false;
+    private boolean forceNLJ = false;
+    private boolean forceRoleReversal = false;
 
     private static final Logger LOGGER = Logger.getLogger(OptimizedHybridHashJoinOperatorDescriptor.class.getName());
 
-    public OptimizedHybridHashJoinOperatorDescriptor(IOperatorDescriptorRegistry spec, int frameLimit, int inputsize0,
-            double factor, int[] keys0, int[] keys1, IBinaryHashFunctionFamily[] hashFunctionGeneratorFactories,
-            IBinaryComparatorFactory[] comparatorFactories, RecordDescriptor recordDescriptor,
-            ITuplePairComparatorFactory tupPaircomparatorFactory01,
+    public OptimizedHybridHashJoinOperatorDescriptor(IOperatorDescriptorRegistry spec, int memSizeInFrames,
+            int inputsize0, double factor, int[] keys0, int[] keys1,
+            IBinaryHashFunctionFamily[] hashFunctionGeneratorFactories, IBinaryComparatorFactory[] comparatorFactories,
+            RecordDescriptor recordDescriptor, ITuplePairComparatorFactory tupPaircomparatorFactory01,
             ITuplePairComparatorFactory tupPaircomparatorFactory10, IPredicateEvaluatorFactory predEvaluatorFactory,
             boolean isLeftOuter, IMissingWriterFactory[] nonMatchWriterFactories) throws HyracksDataException {
-
         super(spec, 2, 1);
-        this.frameLimit = frameLimit;
+        this.memSizeInFrames = memSizeInFrames;
         this.inputsize0 = inputsize0;
         this.fudgeFactor = factor;
         this.probeKeys = keys0;
@@ -156,15 +161,15 @@
         this.nonMatchWriterFactories = nonMatchWriterFactories;
     }
 
-    public OptimizedHybridHashJoinOperatorDescriptor(IOperatorDescriptorRegistry spec, int frameLimit, int inputsize0,
-            double factor, int[] keys0, int[] keys1, IBinaryHashFunctionFamily[] hashFunctionGeneratorFactories,
-            IBinaryComparatorFactory[] comparatorFactories, RecordDescriptor recordDescriptor,
-            ITuplePairComparatorFactory tupPaircomparatorFactory01,
+    public OptimizedHybridHashJoinOperatorDescriptor(IOperatorDescriptorRegistry spec, int memSizeInFrames,
+            int inputsize0, double factor, int[] keys0, int[] keys1,
+            IBinaryHashFunctionFamily[] hashFunctionGeneratorFactories, IBinaryComparatorFactory[] comparatorFactories,
+            RecordDescriptor recordDescriptor, ITuplePairComparatorFactory tupPaircomparatorFactory01,
             ITuplePairComparatorFactory tupPaircomparatorFactory10, IPredicateEvaluatorFactory predEvaluatorFactory)
             throws HyracksDataException {
-        this(spec, frameLimit, inputsize0, factor, keys0, keys1, hashFunctionGeneratorFactories, comparatorFactories,
-                recordDescriptor, tupPaircomparatorFactory01, tupPaircomparatorFactory10, predEvaluatorFactory, false,
-                null);
+        this(spec, memSizeInFrames, inputsize0, factor, keys0, keys1, hashFunctionGeneratorFactories,
+                comparatorFactories, recordDescriptor, tupPaircomparatorFactory01, tupPaircomparatorFactory10,
+                predEvaluatorFactory, false, null);
     }
 
     @Override
@@ -190,19 +195,21 @@
     private int getNumberOfPartitions(int memorySize, int buildSize, double factor, int nPartitions)
             throws HyracksDataException {
         int numberOfPartitions = 0;
-        if (memorySize <= 1) {
-            throw new HyracksDataException("not enough memory is available for Hybrid Hash Join");
+        if (memorySize <= 2) {
+            throw new HyracksDataException("Not enough memory is available for Hybrid Hash Join.");
         }
-        if (memorySize > buildSize) {
-            return 1; //We will switch to in-Mem HJ eventually
+        if (memorySize > buildSize * factor) {
+            // We will switch to in-Mem HJ eventually: create two big partitions.
+            // We set 2 (not 1) to avoid a corner case where the only partition may be spilled to the disk.
+            // This may happen since this formula doesn't consider the hash table size. If this is the case,
+            // we will do a nested loop join after some iterations. But, this is not effective.
+            return 2;
         }
         numberOfPartitions = (int) (Math.ceil((buildSize * factor / nPartitions - memorySize) / (memorySize - 1)));
-        if (numberOfPartitions <= 0) {
-            numberOfPartitions = 1; //becomes in-memory hash join
-        }
+        numberOfPartitions = Math.max(2, numberOfPartitions);
         if (numberOfPartitions > memorySize) {
             numberOfPartitions = (int) Math.ceil(Math.sqrt(buildSize * factor / nPartitions));
-            return (numberOfPartitions < memorySize ? numberOfPartitions : memorySize);
+            return Math.max(2, Math.min(numberOfPartitions, memorySize));
         }
         return numberOfPartitions;
     }
@@ -276,10 +283,10 @@
 
                 @Override
                 public void open() throws HyracksDataException {
-                    if (frameLimit <= 2) { //Dedicated buffers: One buffer to read and one buffer for output
-                        throw new HyracksDataException("not enough memory for Hybrid Hash Join");
+                    if (memSizeInFrames <= 2) { //Dedicated buffers: One buffer to read and two buffers for output
+                        throw new HyracksDataException("Not enough memory is assigend for Hybrid Hash Join.");
                     }
-                    state.memForJoin = frameLimit - 2;
+                    state.memForJoin = memSizeInFrames - 2;
                     state.numOfPartitions = getNumberOfPartitions(state.memForJoin, inputsize0, fudgeFactor,
                             nPartitions);
                     state.hybridHJ = new OptimizedHybridHashJoin(ctx, state.memForJoin, state.numOfPartitions,
@@ -399,6 +406,7 @@
 
                 @Override
                 public void fail() throws HyracksDataException {
+                    state.hybridHJ.clearProbeTempFiles();
                     writer.fail();
                 }
 
@@ -426,6 +434,7 @@
                             int pSize = state.hybridHJ.getProbePartitionSizeInTup(pid);
                             joinPartitionPair(bReader, pReader, bSize, pSize, 1);
                         }
+
                     } finally {
                         writer.close();
                     }
@@ -442,8 +451,9 @@
                     ITuplePartitionComputer buildHpc = new FieldHashPartitionComputerFamily(buildKeys,
                             hashFunctionGeneratorFactories).createPartitioner(level);
 
-                    long buildPartSize = buildSideReader.getFileSize() / ctx.getInitialFrameSize();
-                    long probePartSize = probeSideReader.getFileSize() / ctx.getInitialFrameSize();
+                    int frameSize = ctx.getInitialFrameSize();
+                    long buildPartSize = buildSideReader.getFileSize() / frameSize;
+                    long probePartSize = probeSideReader.getFileSize() / frameSize;
                     int beforeMax = Math.max(buildSizeInTuple, probeSizeInTuple);
 
                     if (LOGGER.isLoggable(Level.FINE)) {
@@ -453,12 +463,20 @@
                                 + "  - LeftOuter is " + isLeftOuter);
                     }
 
+                    // Calculate the expected hash table size for the both side.
+                    long expectedHashTableSizeForBuildInFrame = SerializableHashTable
+                            .getExpectedTableFrameCount(buildSizeInTuple, frameSize);
+                    long expectedHashTableSizeForProbeInFrame = SerializableHashTable
+                            .getExpectedTableFrameCount(probeSizeInTuple, frameSize);
+
                     //Apply in-Mem HJ if possible
-                    if (!skipInMemoryHJ && ((buildPartSize < state.memForJoin)
-                            || (probePartSize < state.memForJoin && !isLeftOuter))) {
+                    if (!skipInMemoryHJ && ((buildPartSize + expectedHashTableSizeForBuildInFrame < state.memForJoin)
+                            || (probePartSize + expectedHashTableSizeForProbeInFrame < state.memForJoin
+                                    && !isLeftOuter))) {
+
                         int tabSize = -1;
-                        if (!forceRR && (isLeftOuter || (buildPartSize < probePartSize))) {
-                            //Case 1.1 - InMemHJ (wout Role-Reversal)
+                        if (!forceRoleReversal && (isLeftOuter || (buildPartSize < probePartSize))) {
+                            //Case 1.1 - InMemHJ (without Role-Reversal)
                             if (LOGGER.isLoggable(Level.FINE)) {
                                 LOGGER.fine("\t>>>Case 1.1 (IsLeftOuter || buildSize<probe) AND ApplyInMemHJ - [Level "
                                         + level + "]");
@@ -473,9 +491,8 @@
                                     buildSideReader, probeSideReader); // checked-confirmed
                         } else { //Case 1.2 - InMemHJ with Role Reversal
                             if (LOGGER.isLoggable(Level.FINE)) {
-                                LOGGER.fine(
-                                        "\t>>>Case 1.2. (NoIsLeftOuter || probe<build) AND ApplyInMemHJ WITH RoleReversal - [Level "
-                                                + level + "]");
+                                LOGGER.fine("\t>>>Case 1.2. (NoIsLeftOuter || probe<build) AND ApplyInMemHJ"
+                                        + "WITH RoleReversal - [Level " + level + "]");
                             }
                             tabSize = probeSizeInTuple;
                             if (tabSize == 0) {
@@ -492,8 +509,8 @@
                         if (LOGGER.isLoggable(Level.FINE)) {
                             LOGGER.fine("\t>>>Case 2. ApplyRecursiveHHJ - [Level " + level + "]");
                         }
-                        if (!forceRR && (isLeftOuter || buildPartSize < probePartSize)) {
-                            //Case 2.1 - Recursive HHJ (wout Role-Reversal)
+                        if (!forceRoleReversal && (isLeftOuter || buildPartSize < probePartSize)) {
+                            //Case 2.1 - Recursive HHJ (without Role-Reversal)
                             if (LOGGER.isLoggable(Level.FINE)) {
                                 LOGGER.fine("\t\t>>>Case 2.1 - RecursiveHHJ WITH (isLeftOuter || build<probe) - [Level "
                                         + level + "]");
@@ -543,7 +560,6 @@
                     }
                     rHHj.closeBuild();
                     buildSideReader.close();
-
                     probeSideReader.open();
                     rHHj.initProbe();
                     rPartbuff.reset();
@@ -560,9 +576,8 @@
                     BitSet rPStatus = rHHj.getPartitionStatus();
                     if (!forceNLJ && (afterMax < (NLJ_SWITCH_THRESHOLD * beforeMax))) { //Case 2.1.1 - Keep applying HHJ
                         if (LOGGER.isLoggable(Level.FINE)) {
-                            LOGGER.fine(
-                                    "\t\t>>>Case 2.1.1 - KEEP APPLYING RecursiveHHJ WITH (isLeftOuter || build<probe) - [Level "
-                                            + level + "]");
+                            LOGGER.fine("\t\t>>>Case 2.1.1 - KEEP APPLYING RecursiveHHJ WITH "
+                                    + "(isLeftOuter || build<probe) - [Level " + level + "]");
                         }
                         for (int rPid = rPStatus.nextSetBit(0); rPid >= 0; rPid = rPStatus.nextSetBit(rPid + 1)) {
                             RunFileReader rbrfw = rHHj.getBuildRFReader(rPid);
@@ -571,7 +586,7 @@
                             int rpSizeInTuple = rHHj.getProbePartitionSizeInTup(rPid);
 
                             if (rbrfw == null || rprfw == null) {
-                                if (isLeftOuter && rprfw != null) { // if outer join, we don't reverse
+                                if (isLeftOuter && rprfw != null) { // For the outer join, we don't reverse the role.
                                     appendNullToProbeTuples(rprfw);
                                 }
                                 continue;
@@ -587,15 +602,15 @@
                     } else { //Case 2.1.2 - Switch to NLJ
                         if (LOGGER.isLoggable(Level.FINE)) {
                             LOGGER.fine(
-                                    "\t\t>>>Case 2.1.2 - SWITCHED to NLJ RecursiveHHJ WITH (isLeftOuter || build<probe) - [Level "
-                                            + level + "]");
+                                    "\t\t>>>Case 2.1.2 - SWITCHED to NLJ RecursiveHHJ WITH (isLeftOuter || build<probe)"
+                                            + " - [Level " + level + "]");
                         }
                         for (int rPid = rPStatus.nextSetBit(0); rPid >= 0; rPid = rPStatus.nextSetBit(rPid + 1)) {
                             RunFileReader rbrfw = rHHj.getBuildRFReader(rPid);
                             RunFileReader rprfw = rHHj.getProbeRFReader(rPid);
 
                             if (rbrfw == null || rprfw == null) {
-                                if (isLeftOuter && rprfw != null) { // if outer join, we don't reverse
+                                if (isLeftOuter && rprfw != null) { // For the outer join, we don't reverse the role.
                                     appendNullToProbeTuples(rprfw);
                                 }
                                 continue;
@@ -605,9 +620,9 @@
                             int probeSideInTups = rHHj.getProbePartitionSizeInTup(rPid);
                             // NLJ order is outer + inner, the order is reversed from the other joins
                             if (isLeftOuter || probeSideInTups < buildSideInTups) {
-                                applyNestedLoopJoin(probeRd, buildRd, frameLimit, rprfw, rbrfw); //checked-modified
+                                applyNestedLoopJoin(probeRd, buildRd, memSizeInFrames, rprfw, rbrfw); //checked-modified
                             } else {
-                                applyNestedLoopJoin(buildRd, probeRd, frameLimit, rbrfw, rprfw); //checked-modified
+                                applyNestedLoopJoin(buildRd, probeRd, memSizeInFrames, rbrfw, rprfw); //checked-modified
                             }
                         }
                     }
@@ -642,17 +657,34 @@
 
                     assert isLeftOuter ? !isReversed : true : "LeftOut Join can not reverse roles";
 
-                    ISerializableTable table = new SerializableHashTable(tabSize, ctx);
+                    IDeallocatableFramePool framePool = new DeallocatableFramePool(ctx,
+                            state.memForJoin * ctx.getInitialFrameSize());
+                    ISimpleFrameBufferManager bufferManager = new FramePoolBackedFrameBufferManager(framePool);
+
+                    ISerializableTable table = new SerializableHashTable(tabSize, ctx, bufferManager);
                     InMemoryHashJoin joiner = new InMemoryHashJoin(ctx, tabSize, new FrameTupleAccessor(probeRDesc),
-                            hpcRepProbe, new FrameTupleAccessor(buildRDesc), hpcRepBuild,
+                            hpcRepProbe, new FrameTupleAccessor(buildRDesc), buildRDesc, hpcRepBuild,
                             new FrameTuplePairComparator(pKeys, bKeys, comparators), isLeftOuter, nonMatchWriter, table,
-                            predEvaluator, isReversed);
+                            predEvaluator, isReversed, bufferManager);
 
                     bReader.open();
                     rPartbuff.reset();
                     while (bReader.nextFrame(rPartbuff)) {
-                        //We need to allocate a copyBuffer, because this buffer gets added to the buffers list in the InMemoryHashJoin
-                        ByteBuffer copyBuffer = ctx.allocateFrame(rPartbuff.getFrameSize());
+                        // We need to allocate a copyBuffer, because this buffer gets added to the buffers list
+                        // in the InMemoryHashJoin.
+                        ByteBuffer copyBuffer = bufferManager.acquireFrame(rPartbuff.getFrameSize());
+                        // If a frame cannot be allocated, there may be a chance if we can compact the table,
+                        // one or more frame may be reclaimed.
+                        if (copyBuffer == null) {
+                            if (joiner.compactHashTable() > 0) {
+                                copyBuffer = bufferManager.acquireFrame(rPartbuff.getFrameSize());
+                            }
+                            if (copyBuffer == null) {
+                                // Still no frame is allocated? At this point, we have no way to get a frame.
+                                throw new HyracksDataException(
+                                        "Can't allocate one more frame. Assign more memory to InMemoryHashJoin.");
+                            }
+                        }
                         FrameUtils.copyAndFlip(rPartbuff.getBuffer(), copyBuffer);
                         joiner.build(copyBuffer);
                         rPartbuff.reset();
@@ -672,7 +704,8 @@
 
                 private void applyNestedLoopJoin(RecordDescriptor outerRd, RecordDescriptor innerRd, int memorySize,
                         RunFileReader outerReader, RunFileReader innerReader) throws HyracksDataException {
-                    // The nested loop join result is outer + inner. All the other operator is probe + build. Hence the reverse relation is different
+                    // The nested loop join result is outer + inner. All the other operator is probe + build.
+                    // Hence the reverse relation is different.
                     boolean isReversed = outerRd == buildRd && innerRd == probeRd;
                     assert isLeftOuter ? !isReversed : true : "LeftOut Join can not reverse roles";
                     ITuplePairComparator nljComptorOuterInner = isReversed ? nljComparatorBuild2Probe
@@ -716,6 +749,7 @@
     }
 
     public void setForceRR(boolean b) {
-        forceRR = (!isLeftOuter && b);
+        forceRoleReversal = !isLeftOuter && b;
     }
+
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/structures/ISerializableTable.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/structures/ISerializableTable.java
index 8cd6792..d0e0616 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/structures/ISerializableTable.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/structures/ISerializableTable.java
@@ -18,17 +18,22 @@
  */
 package org.apache.hyracks.dataflow.std.structures;
 
+import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputer;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.std.buffermanager.ITuplePointerAccessor;
 
 public interface ISerializableTable {
 
-    void insert(int entry, TuplePointer tuplePointer) throws HyracksDataException;
+    boolean insert(int entry, TuplePointer tuplePointer) throws HyracksDataException;
 
     void delete(int entry);
 
     boolean getTuplePointer(int entry, int offset, TuplePointer tuplePointer);
 
-    int getFrameCount();
+    /**
+     * Returns the byte size of entire frames that are currently allocated to the table.
+     */
+    int getCurrentByteSize();
 
     int getTupleCount();
 
@@ -37,4 +42,26 @@
     void reset();
 
     void close();
+
+    boolean isGarbageCollectionNeeded();
+
+    /**
+     * Collects garbages in the given table, if any. For example, compacts the table by
+     * removing the garbage created by internal migration or lazy deletion operation.
+     * The desired result of this method is a compacted table without any garbage (no wasted space).
+     *
+     * @param bufferAccessor:
+     *            required to access the real tuple to calculate the original hash value
+     * @param tpc:
+     *            hash function
+     * @return the number of frames that are reclaimed.
+     * @throws HyracksDataException
+     */
+    int collectGarbage(ITuplePointerAccessor bufferAccessor, ITuplePartitionComputer tpc)
+            throws HyracksDataException;
+
+    /**
+     * Prints out the internal information of this table.
+     */
+    String printInfo();
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/structures/SerializableHashTable.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/structures/SerializableHashTable.java
index 9584f26..ca97be3 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/structures/SerializableHashTable.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/structures/SerializableHashTable.java
@@ -18,307 +18,552 @@
  */
 package org.apache.hyracks.dataflow.std.structures;
 
-import java.util.ArrayList;
-import java.util.List;
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.TreeMap;
 
 import org.apache.hyracks.api.context.IHyracksFrameMgrContext;
+import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputer;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.std.buffermanager.ISimpleFrameBufferManager;
+import org.apache.hyracks.dataflow.std.buffermanager.ITuplePointerAccessor;
 
 /**
- * An entry in the table is: #elements, #no-empty elements; fIndex, tIndex;
- * fIndex, tIndex; .... <fIndex, tIndex> forms a tuple pointer
+ * This is an extension of SimpleSerializableHashTable class.
+ * A buffer manager needs to be assigned to allocate/release frames for this table so that
+ * the maximum memory usage can be bounded under the certain limit.
  */
-public class SerializableHashTable implements ISerializableTable {
+public class SerializableHashTable extends SimpleSerializableHashTable {
 
-    private static final int INT_SIZE = 4;
-    private static final int INIT_ENTRY_SIZE = 4;
+    protected double garbageCollectionThreshold;
+    protected int wastedIntSpaceCount = 0;
+    protected ISimpleFrameBufferManager bufferManager;
 
-    private IntSerDeBuffer[] headers;
-    private List<IntSerDeBuffer> contents = new ArrayList<>();
-    private List<Integer> frameCurrentIndex = new ArrayList<>();
-    private final IHyracksFrameMgrContext ctx;
-    private final int frameCapacity;
-    private int currentLargestFrameIndex = 0;
-    private int tupleCount = 0;
-    private int headerFrameCount = 0;
-    private TuplePointer tempTuplePointer = new TuplePointer();
+    public SerializableHashTable(int tableSize, final IHyracksFrameMgrContext ctx,
+            ISimpleFrameBufferManager bufferManager) throws HyracksDataException {
+        this(tableSize, ctx, bufferManager, 0.1);
+    }
 
-    public SerializableHashTable(int tableSize, final IHyracksFrameMgrContext ctx) throws HyracksDataException {
-        this.ctx = ctx;
-        int frameSize = ctx.getInitialFrameSize();
+    public SerializableHashTable(int tableSize, final IHyracksFrameMgrContext ctx,
+            ISimpleFrameBufferManager bufferManager, double garbageCollectionThreshold)
+            throws HyracksDataException {
+        super(tableSize, ctx, false);
+        this.bufferManager = bufferManager;
 
-        int residual = tableSize * INT_SIZE * 2 % frameSize == 0 ? 0 : 1;
-        int headerSize = tableSize * INT_SIZE * 2 / frameSize + residual;
-        headers = new IntSerDeBuffer[headerSize];
-
-        IntSerDeBuffer frame = new IntSerDeBuffer(ctx.allocateFrame().array());
-        contents.add(frame);
-        frameCurrentIndex.add(0);
+        ByteBuffer newFrame = getFrame(frameSize);
+        if (newFrame == null) {
+            throw new HyracksDataException("Can't allocate a frame for Hash Table. Please allocate more budget.");
+        }
+        IntSerDeBuffer frame = new IntSerDeBuffer(newFrame);
         frameCapacity = frame.capacity();
+        contents.add(frame);
+        currentOffsetInEachFrameList.add(0);
+        this.garbageCollectionThreshold = garbageCollectionThreshold;
     }
 
     @Override
-    public void insert(int entry, TuplePointer pointer) throws HyracksDataException {
-        int hFrameIndex = getHeaderFrameIndex(entry);
-        int headerOffset = getHeaderFrameOffset(entry);
-        IntSerDeBuffer header = headers[hFrameIndex];
-        if (header == null) {
-            header = new IntSerDeBuffer(ctx.allocateFrame().array());
-            headers[hFrameIndex] = header;
-            resetFrame(header);
-            headerFrameCount++;
+    ByteBuffer getFrame(int size) throws HyracksDataException {
+        ByteBuffer newFrame = bufferManager.acquireFrame(size);
+        if (newFrame != null) {
+            currentByteSize += size;
         }
-        int frameIndex = header.getInt(headerOffset);
-        int offsetIndex = header.getInt(headerOffset + 1);
-        if (frameIndex < 0) {
-            // insert first tuple into the entry
-            insertNewEntry(header, headerOffset, INIT_ENTRY_SIZE, pointer);
-        } else {
-            // insert non-first tuple into the entry
-            insertNonFirstTuple(header, headerOffset, frameIndex, offsetIndex, pointer);
-        }
-        tupleCount++;
+        return newFrame;
     }
 
     @Override
-    public void delete(int entry) {
-        int hFrameIndex = getHeaderFrameIndex(entry);
-        int headerOffset = getHeaderFrameOffset(entry);
-        IntSerDeBuffer header = headers[hFrameIndex];
-        if (header != null) {
-            int frameIndex = header.getInt(headerOffset);
-            int offsetIndex = header.getInt(headerOffset + 1);
-            if (frameIndex >= 0) {
-                IntSerDeBuffer frame = contents.get(frameIndex);
-                int entryUsedItems = frame.getInt(offsetIndex + 1);
-                frame.writeInt(offsetIndex + 1, 0);
-                tupleCount -= entryUsedItems;
-            }
-        }
-    }
-
-    @Override
-    public boolean getTuplePointer(int entry, int offset, TuplePointer dataPointer) {
-        int hFrameIndex = getHeaderFrameIndex(entry);
-        int headerOffset = getHeaderFrameOffset(entry);
-        IntSerDeBuffer header = headers[hFrameIndex];
-        if (header == null) {
-            dataPointer.reset(-1, -1);
-            return false;
-        }
-        int frameIndex = header.getInt(headerOffset);
-        int offsetIndex = header.getInt(headerOffset + 1);
-        if (frameIndex < 0) {
-            dataPointer.reset(-1, -1);
-            return false;
-        }
-        IntSerDeBuffer frame = contents.get(frameIndex);
-        int entryUsedItems = frame.getInt(offsetIndex + 1);
-        if (offset > entryUsedItems - 1) {
-            dataPointer.reset(-1, -1);
-            return false;
-        }
-        int startIndex = offsetIndex + 2 + offset * 2;
-        while (startIndex >= frameCapacity) {
-            ++frameIndex;
-            startIndex -= frameCapacity;
-        }
-        frame = contents.get(frameIndex);
-        dataPointer.reset(frame.getInt(startIndex), frame.getInt(startIndex + 1));
-        return true;
+    void increaseWastedSpaceCount(int size) {
+        wastedIntSpaceCount += size;
     }
 
     @Override
     public void reset() {
-        for (IntSerDeBuffer frame : headers)
-            if (frame != null)
-                resetFrame(frame);
-
-        frameCurrentIndex.clear();
-        for (int i = 0; i < contents.size(); i++) {
-            frameCurrentIndex.add(0);
-        }
-
-        currentLargestFrameIndex = 0;
-        tupleCount = 0;
-    }
-
-    @Override
-    public int getFrameCount() {
-        return headerFrameCount + contents.size();
-    }
-
-    @Override
-    public int getTupleCount() {
-        return tupleCount;
-    }
-
-    @Override
-    public int getTupleCount(int entry) {
-        int hFrameIndex = getHeaderFrameIndex(entry);
-        int headerOffset = getHeaderFrameOffset(entry);
-        IntSerDeBuffer header = headers[hFrameIndex];
-        if (header != null) {
-            int frameIndex = header.getInt(headerOffset);
-            int offsetIndex = header.getInt(headerOffset + 1);
-            if (frameIndex >= 0) {
-                IntSerDeBuffer frame = contents.get(frameIndex);
-                int entryUsedItems = frame.getInt(offsetIndex + 1);
-                return entryUsedItems;
-            }
-        }
-        return 0;
+        super.reset();
+        currentByteSize = 0;
     }
 
     @Override
     public void close() {
-        int nFrames = contents.size();
-        int hFrames = 0;
         for (int i = 0; i < headers.length; i++) {
             if (headers[i] != null) {
-                hFrames++;
+                bufferManager.releaseFrame(headers[i].getByteBuffer());
                 headers[i] = null;
             }
         }
+        for (int i = 0; i < contents.size(); i++) {
+            bufferManager.releaseFrame(contents.get(i).getByteBuffer());
+        }
         contents.clear();
-        frameCurrentIndex.clear();
+        currentOffsetInEachFrameList.clear();
         tupleCount = 0;
-        currentLargestFrameIndex = 0;
-        ctx.deallocateFrames((nFrames + hFrames) * frameCapacity * 4);
+        currentByteSize = 0;
+        currentLargestFrameNumber = 0;
     }
 
-    private void insertNewEntry(IntSerDeBuffer header, int headerOffset, int entryCapacity, TuplePointer pointer)
+    @Override
+    public boolean isGarbageCollectionNeeded() {
+        return wastedIntSpaceCount > frameCapacity * (currentLargestFrameNumber + 1) * garbageCollectionThreshold;
+    }
+
+    /**
+     * Collects garbages. The steps are as follows.
+     * #1. Initialize the Reader and Writer. The starting frame index is set to zero at this moment.
+     * #2. Read a content frame. Find and read a slot data. Check the number of used count for the slot.
+     * If it's not -1 (meaning that it is being used now), we move it to to the
+     * current writing offset of the Writer frame. Update the corresponding h() value pointer for this location
+     * in the header frame. We can find the h() value of the slot using a first tuple pointer in the slot.
+     * If the number is -1 (meaning that it is migrated to a new place due to an overflow or deleted),
+     * just reclaim the space by letting other slot move to this space.
+     * #3. Once a Reader reaches the end of a frame, read next frame by frame. This applies to the Writer, too. i.e.
+     * If the writing offset pointer reaches at the end of a frame, then writing frame will be set to the next frame.
+     * #4. Repeat #1 ~ #3 until all frames are read.
+     *
+     * @return the number of frames that are reclaimed. The value -1 is returned when no compaction was happened.
+     */
+    @Override
+    public int collectGarbage(ITuplePointerAccessor bufferAccessor, ITuplePartitionComputer tpc)
             throws HyracksDataException {
-        IntSerDeBuffer lastFrame = contents.get(currentLargestFrameIndex);
-        int lastIndex = frameCurrentIndex.get(currentLargestFrameIndex);
-        int requiredIntCapacity = entryCapacity * 2;
-        int startFrameIndex = currentLargestFrameIndex;
+        // Keeps the garbage collection related variable
+        GarbageCollectionInfo gcInfo = new GarbageCollectionInfo();
 
-        if (lastIndex + requiredIntCapacity >= frameCapacity) {
-            IntSerDeBuffer newFrame;
-            startFrameIndex++;
-            do {
-                if (currentLargestFrameIndex >= contents.size() - 1) {
-                    newFrame = new IntSerDeBuffer(ctx.allocateFrame().array());
-                    currentLargestFrameIndex++;
-                    contents.add(newFrame);
-                    frameCurrentIndex.add(0);
+        int slotCapacity;
+        int slotUsedCount;
+        int capacityInIntCount;
+        int nextSlotIntPosInPageForGC;
+        boolean currentPageChanged;
+        IntSerDeBuffer currentReadContentFrameForGC;
+        IntSerDeBuffer currentWriteContentFrameForGC = contents.get(gcInfo.currentGCWritePageForGC);
+        int lastOffsetInLastFrame = currentOffsetInEachFrameList.get(contents.size() - 1);
+
+        // Step #1. Reads a content frame until it reaches the end of content frames.
+        while (gcInfo.currentReadPageForGC <= currentLargestFrameNumber) {
+
+            gcInfo.currentReadIntOffsetInPageForGC = 0;
+            currentReadContentFrameForGC = contents.get(gcInfo.currentReadPageForGC);
+
+            // Step #2. Advances the reader until it hits the end of the given frame.
+            while (gcInfo.currentReadIntOffsetInPageForGC < frameCapacity) {
+                nextSlotIntPosInPageForGC = findNextSlotInPage(currentReadContentFrameForGC,
+                        gcInfo.currentReadIntOffsetInPageForGC);
+
+                if (nextSlotIntPosInPageForGC == INVALID_VALUE) {
+                    // There isn't a valid slot in the page. Exits the loop #2 and reads the next frame.
+                    break;
+                }
+
+                // Valid slot found. Reads the given slot information.
+                slotCapacity = currentReadContentFrameForGC.getInt(nextSlotIntPosInPageForGC);
+                slotUsedCount = currentReadContentFrameForGC.getInt(nextSlotIntPosInPageForGC + 1);
+                capacityInIntCount = (slotCapacity + 1) * 2;
+
+                // Used count should not be -1 (migrated or deleted).
+                if (slotUsedCount != INVALID_VALUE) {
+                    // To prepare hash pointer (header -> content) update, read the first tuple pointer in the old slot.
+                    tempTuplePointer.reset(currentReadContentFrameForGC.getInt(nextSlotIntPosInPageForGC + 2),
+                            currentReadContentFrameForGC.getInt(nextSlotIntPosInPageForGC + 3));
+
+                    // Check whether there is at least some space to put some part of the slot.
+                    // If not, advance the write pointer to the next page.
+                    if ((gcInfo.currentWriteIntOffsetInPageForGC + 4) > frameCapacity
+                            && gcInfo.currentGCWritePageForGC < currentLargestFrameNumber) {
+                        // Swipe the region that can't be used.
+                        currentWriteContentFrameForGC.writeInvalidVal(gcInfo.currentWriteIntOffsetInPageForGC,
+                                frameCapacity - gcInfo.currentWriteIntOffsetInPageForGC);
+                        gcInfo.currentGCWritePageForGC++;
+                        currentWriteContentFrameForGC = contents.get(gcInfo.currentGCWritePageForGC);
+                        gcInfo.currentWriteIntOffsetInPageForGC = 0;
+                    }
+
+                    // Migrates this slot to the current offset in Writer's Frame if possible.
+                    currentPageChanged = MigrateSlot(gcInfo, bufferAccessor, tpc, capacityInIntCount,
+                            nextSlotIntPosInPageForGC);
+
+                    if (currentPageChanged) {
+                        currentReadContentFrameForGC = contents.get(gcInfo.currentReadPageForGC);
+                        currentWriteContentFrameForGC = contents.get(gcInfo.currentGCWritePageForGC);
+                    }
                 } else {
-                    currentLargestFrameIndex++;
-                    frameCurrentIndex.set(currentLargestFrameIndex, 0);
+                    // A useless slot (either migrated or deleted) is found. Resets the space
+                    // so it will be occupied by the next valid slot.
+                    currentPageChanged = resetSlotSpace(gcInfo, nextSlotIntPosInPageForGC, capacityInIntCount);
+
+                    if (currentPageChanged) {
+                        currentReadContentFrameForGC = contents.get(gcInfo.currentReadPageForGC);
+                    }
+
                 }
-                requiredIntCapacity -= frameCapacity;
-            } while (requiredIntCapacity > 0);
-            lastIndex = 0;
-            lastFrame = contents.get(startFrameIndex);
-        }
-
-        // set header
-        header.writeInt(headerOffset, startFrameIndex);
-        header.writeInt(headerOffset + 1, lastIndex);
-
-        // set the entry
-        lastFrame.writeInt(lastIndex, entryCapacity - 1);
-        lastFrame.writeInt(lastIndex + 1, 1);
-        lastFrame.writeInt(lastIndex + 2, pointer.getFrameIndex());
-        lastFrame.writeInt(lastIndex + 3, pointer.getTupleIndex());
-        int newLastIndex = lastIndex + entryCapacity * 2;
-        newLastIndex = newLastIndex < frameCapacity ? newLastIndex : frameCapacity - 1;
-        frameCurrentIndex.set(startFrameIndex, newLastIndex);
-
-        requiredIntCapacity = entryCapacity * 2 - (frameCapacity - lastIndex);
-        while (requiredIntCapacity > 0) {
-            startFrameIndex++;
-            requiredIntCapacity -= frameCapacity;
-            newLastIndex = requiredIntCapacity < 0 ? requiredIntCapacity + frameCapacity : frameCapacity - 1;
-            frameCurrentIndex.set(startFrameIndex, newLastIndex);
-        }
-    }
-
-    private void insertNonFirstTuple(IntSerDeBuffer header, int headerOffset, int frameIndexArg, int offsetIndex,
-            TuplePointer pointer) throws HyracksDataException {
-        int frameIndex = frameIndexArg;
-        IntSerDeBuffer frame = contents.get(frameIndex);
-        int entryItems = frame.getInt(offsetIndex);
-        int entryUsedItems = frame.getInt(offsetIndex + 1);
-
-        if (entryUsedItems < entryItems) {
-            frame.writeInt(offsetIndex + 1, entryUsedItems + 1);
-            int startIndex = offsetIndex + 2 + entryUsedItems * 2;
-            while (startIndex >= frameCapacity) {
-                ++frameIndex;
-                startIndex -= frameCapacity;
             }
-            frame = contents.get(frameIndex);
-            frame.writeInt(startIndex, pointer.getFrameIndex());
-            frame.writeInt(startIndex + 1, pointer.getTupleIndex());
+
+            // Reached the end of a frame. Advances the Reader.
+            if (gcInfo.currentReadPageForGC == currentLargestFrameNumber) {
+                break;
+            }
+            gcInfo.currentReadPageForGC++;
+        }
+
+        // More unused frames at the end?
+        int extraFrames = 0;
+        if (contents.size() > (currentLargestFrameNumber + 1)) {
+            extraFrames = contents.size() - (currentLargestFrameNumber + 1);
+        }
+
+        // Done reading all frames. So, releases unnecessary frames.
+        int numberOfFramesToBeDeallocated = gcInfo.currentReadPageForGC + extraFrames - gcInfo.currentGCWritePageForGC;
+
+        if (numberOfFramesToBeDeallocated >= 1) {
+            for (int i = 0; i < numberOfFramesToBeDeallocated; i++) {
+                currentByteSize -= contents.get(gcInfo.currentGCWritePageForGC + 1).getByteCapacity();
+                bufferManager.releaseFrame(contents.get(gcInfo.currentGCWritePageForGC + 1).getByteBuffer());
+                contents.remove(gcInfo.currentGCWritePageForGC + 1);
+                currentOffsetInEachFrameList.remove(gcInfo.currentGCWritePageForGC + 1);
+            }
         } else {
-            int capacity = (entryItems + 1) * 2;
-            header.writeInt(headerOffset, -1);
-            header.writeInt(headerOffset + 1, -1);
-            int fIndex = frame.getInt(offsetIndex + 2);
-            int tIndex = frame.getInt(offsetIndex + 3);
-            tempTuplePointer.reset(fIndex, tIndex);
-            this.insertNewEntry(header, headerOffset, capacity, tempTuplePointer);
-            int newFrameIndex = header.getInt(headerOffset);
-            int newTupleIndex = header.getInt(headerOffset + 1);
-
-            for (int i = 1; i < entryUsedItems; i++) {
-                int startIndex = offsetIndex + 2 + i * 2;
-                int startFrameIndex = frameIndex;
-                while (startIndex >= frameCapacity) {
-                    ++startFrameIndex;
-                    startIndex -= frameCapacity;
-                }
-                frame = contents.get(startFrameIndex);
-                fIndex = frame.getInt(startIndex);
-                tIndex = frame.getInt(startIndex + 1);
-                tempTuplePointer.reset(fIndex, tIndex);
-                insertNonFirstTuple(header, headerOffset, newFrameIndex, newTupleIndex, tempTuplePointer);
+            // For this case, we check whether the last offset is changed.
+            // If not, we didn't get any space from the operation.
+            int afterLastOffsetInLastFrame = currentOffsetInEachFrameList.get(gcInfo.currentGCWritePageForGC);
+            if (lastOffsetInLastFrame == afterLastOffsetInLastFrame) {
+                numberOfFramesToBeDeallocated = -1;
             }
-            insertNonFirstTuple(header, headerOffset, newFrameIndex, newTupleIndex, pointer);
+        }
+
+        // Resets the current offset in the last frame so that the future insertions will work without an issue.
+        currentLargestFrameNumber = gcInfo.currentGCWritePageForGC;
+        currentOffsetInEachFrameList.set(gcInfo.currentGCWritePageForGC, gcInfo.currentWriteIntOffsetInPageForGC);
+
+        wastedIntSpaceCount = 0;
+        tempTuplePointer.reset(INVALID_VALUE, INVALID_VALUE);
+
+        return numberOfFramesToBeDeallocated;
+    }
+
+    /**
+     * Migrates the current slot to the designated place and reset the current space using INVALID_VALUE.
+     *
+     * @return true if the current page has been changed. false if not.
+     */
+    private boolean MigrateSlot(GarbageCollectionInfo gcInfo, ITuplePointerAccessor bufferAccessor,
+            ITuplePartitionComputer tpc, int capacityInIntCount, int nextSlotIntPosInPageForGC)
+            throws HyracksDataException {
+        boolean currentPageChanged = false;
+        // If the reader and writer indicate the same slot location, a move is not required.
+        if (gcInfo.isReaderWriterAtTheSamePos()) {
+            int intToRead = capacityInIntCount;
+            int intReadAtThisTime;
+            gcInfo.currentReadIntOffsetInPageForGC = nextSlotIntPosInPageForGC;
+            while (intToRead > 0) {
+                intReadAtThisTime = Math.min(intToRead, frameCapacity - gcInfo.currentReadIntOffsetInPageForGC);
+                gcInfo.currentReadIntOffsetInPageForGC += intReadAtThisTime;
+                if (gcInfo.currentReadIntOffsetInPageForGC >= frameCapacity
+                        && gcInfo.currentReadPageForGC < currentLargestFrameNumber) {
+                    gcInfo.currentReadPageForGC++;
+                    gcInfo.currentReadIntOffsetInPageForGC = 0;
+                    currentPageChanged = true;
+                }
+                intToRead -= intReadAtThisTime;
+            }
+
+            gcInfo.currentGCWritePageForGC = gcInfo.currentReadPageForGC;
+            gcInfo.currentWriteIntOffsetInPageForGC = gcInfo.currentReadIntOffsetInPageForGC;
+
+            return currentPageChanged;
+        }
+
+        // The reader is ahead of the writer. We can migrate the given slot towards to the beginning of
+        // the content frame(s).
+        int tempWriteIntPosInPage = gcInfo.currentWriteIntOffsetInPageForGC;
+        int tempReadIntPosInPage = nextSlotIntPosInPageForGC;
+        int chunksToMove = capacityInIntCount;
+        int chunksToMoveAtThisTime;
+
+        // To keep the original writing page that is going to be used for updating the header to content frame,
+        // we declare a local variable.
+        int tempWritePage = gcInfo.currentGCWritePageForGC;
+
+        // Keeps the maximum INT chunks that writer/reader can write in the current page.
+        int oneTimeIntCapacityForWriter;
+        int oneTimeIntCapacityForReader;
+
+        IntSerDeBuffer currentReadContentFrameForGC = contents.get(gcInfo.currentReadPageForGC);
+        IntSerDeBuffer currentWriteContentFrameForGC = contents.get(gcInfo.currentGCWritePageForGC);
+
+        // Moves the slot.
+        while (chunksToMove > 0) {
+            oneTimeIntCapacityForWriter = Math.min(chunksToMove, frameCapacity - tempWriteIntPosInPage);
+            oneTimeIntCapacityForReader = Math.min(chunksToMove, frameCapacity - tempReadIntPosInPage);
+
+            // Since the location of Reader and Writer are different, we can only move a minimum chunk
+            // before the current page of either Reader or Writer changes.
+            chunksToMoveAtThisTime = Math.min(oneTimeIntCapacityForWriter, oneTimeIntCapacityForReader);
+
+            // Moves a part of the slot from the Reader to Writer
+            System.arraycopy(currentReadContentFrameForGC.bytes, tempReadIntPosInPage * INT_SIZE,
+                    currentWriteContentFrameForGC.bytes, tempWriteIntPosInPage * INT_SIZE,
+                    chunksToMoveAtThisTime * INT_SIZE);
+
+            // Clears that part in the Reader
+            for (int i = 0; i < chunksToMoveAtThisTime; i++) {
+                // Do not blindly put -1 since there might be overlapping between writer and reader.
+                if ((gcInfo.currentReadPageForGC != tempWritePage)
+                        || (tempReadIntPosInPage + i >= tempWriteIntPosInPage + chunksToMoveAtThisTime)) {
+                    currentReadContentFrameForGC.writeInvalidVal(tempReadIntPosInPage + i, chunksToMoveAtThisTime - i);
+                    break;
+                }
+            }
+
+            // Advances the pointer
+            tempWriteIntPosInPage += chunksToMoveAtThisTime;
+            tempReadIntPosInPage += chunksToMoveAtThisTime;
+
+            // Once the writer pointer hits the end of the page, we move to the next content page.
+            if (tempWriteIntPosInPage >= frameCapacity && tempWritePage < currentLargestFrameNumber) {
+                tempWritePage++;
+                currentPageChanged = true;
+                currentWriteContentFrameForGC = contents.get(tempWritePage);
+                tempWriteIntPosInPage = 0;
+            }
+
+            // Once the reader pointer hits the end of the page, we move to the next content page.
+            if (tempReadIntPosInPage >= frameCapacity && gcInfo.currentReadPageForGC < currentLargestFrameNumber) {
+                gcInfo.currentReadPageForGC++;
+                currentPageChanged = true;
+                currentReadContentFrameForGC = contents.get(gcInfo.currentReadPageForGC);
+                tempReadIntPosInPage = 0;
+            }
+
+            chunksToMove -= chunksToMoveAtThisTime;
+        }
+
+        updateHeaderToContentPointerInHeaderFrame(bufferAccessor, tpc, tempTuplePointer, gcInfo.currentGCWritePageForGC,
+                gcInfo.currentWriteIntOffsetInPageForGC);
+
+        gcInfo.currentGCWritePageForGC = tempWritePage;
+        gcInfo.currentWriteIntOffsetInPageForGC = tempWriteIntPosInPage;
+        gcInfo.currentReadIntOffsetInPageForGC = tempReadIntPosInPage;
+
+        return currentPageChanged;
+    }
+
+    /**
+     * Completely removes the slot in the given content frame(s) and resets the space.
+     * For this method, we assume that this slot is not moved to somewhere else.
+     *
+     * @return true if the current page has been changed. false if not.
+     */
+    private boolean resetSlotSpace(GarbageCollectionInfo gcInfo, int slotIntPos, int capacityInIntCount) {
+        boolean currentPageChanged = false;
+        int tempReadIntPosInPage = slotIntPos;
+        int chunksToDelete = capacityInIntCount;
+        int chunksToDeleteAtThisTime;
+        IntSerDeBuffer currentReadContentFrameForGC = contents.get(gcInfo.currentReadPageForGC);
+
+        while (chunksToDelete > 0) {
+            chunksToDeleteAtThisTime = Math.min(chunksToDelete, frameCapacity - tempReadIntPosInPage);
+
+            // Clears that part in the Reader
+            currentReadContentFrameForGC.writeInvalidVal(tempReadIntPosInPage, chunksToDeleteAtThisTime);
+
+            // Advances the pointer
+            tempReadIntPosInPage += chunksToDeleteAtThisTime;
+
+            // Once the reader pointer hits the end of the page, we move to the next content page.
+            if (tempReadIntPosInPage >= frameCapacity && gcInfo.currentReadPageForGC < currentLargestFrameNumber) {
+                gcInfo.currentReadPageForGC++;
+                currentPageChanged = true;
+                currentReadContentFrameForGC = contents.get(gcInfo.currentReadPageForGC);
+                tempReadIntPosInPage = 0;
+            }
+
+            chunksToDelete -= chunksToDeleteAtThisTime;
+        }
+
+        gcInfo.currentReadIntOffsetInPageForGC = tempReadIntPosInPage;
+
+        return currentPageChanged;
+    }
+
+    /**
+     * Updates the given Header to Content Frame Pointer after calculating the corresponding hash value from the
+     * given tuple pointer.
+     */
+    private void updateHeaderToContentPointerInHeaderFrame(ITuplePointerAccessor bufferAccessor,
+            ITuplePartitionComputer tpc, TuplePointer hashedTuple, int newContentFrame,
+            int newOffsetInContentFrame) throws HyracksDataException {
+        // Finds the original hash value. We assume that bufferAccessor and tpc is already assigned.
+        bufferAccessor.reset(hashedTuple);
+        int entry = tpc.partition(bufferAccessor, hashedTuple.getTupleIndex(), tableSize);
+
+        // Finds the location of the hash value in the header frame arrays.
+        int headerFrameIndex = getHeaderFrameIndex(entry);
+        int offsetInHeaderFrame = getHeaderFrameOffset(entry);
+        IntSerDeBuffer headerFrame = headers[headerFrameIndex];
+
+        // Updates the hash value.
+        headerFrame.writeInt(offsetInHeaderFrame, newContentFrame);
+        headerFrame.writeInt(offsetInHeaderFrame + 1, newOffsetInContentFrame);
+    }
+
+
+    /**
+     * Tries to find the next valid slot position in the given content frame from the current position.
+     */
+    private int findNextSlotInPage(IntSerDeBuffer frame, int readIntPosAtPage) {
+        // Sanity check
+        if (readIntPosAtPage >= frameCapacity) {
+            return INVALID_VALUE;
+        }
+        int intOffset = readIntPosAtPage;
+        while (frame.getInt(intOffset) == INVALID_VALUE) {
+            intOffset++;
+            if (intOffset >= frameCapacity) {
+                // Couldn't find the next slot in the given page.
+                return INVALID_VALUE;
+            }
+        }
+        return intOffset;
+    }
+
+    /**
+     * Keeps the garbage collection related variables
+     */
+    private static class GarbageCollectionInfo {
+        int currentReadPageForGC;
+        int currentReadIntOffsetInPageForGC;
+        int currentGCWritePageForGC;
+        int currentWriteIntOffsetInPageForGC;
+
+        public GarbageCollectionInfo() {
+            currentReadPageForGC = 0;
+            currentReadIntOffsetInPageForGC = 0;
+            currentGCWritePageForGC = 0;
+            currentWriteIntOffsetInPageForGC = 0;
+        }
+
+        /**
+         * Checks whether the writing position and the reading position are the same.
+         */
+        public boolean isReaderWriterAtTheSamePos() {
+            return currentReadPageForGC == currentGCWritePageForGC
+                    && currentReadIntOffsetInPageForGC == currentWriteIntOffsetInPageForGC;
         }
     }
 
-    private void resetFrame(IntSerDeBuffer frame) {
-        for (int i = 0; i < frameCapacity; i++)
-            frame.writeInt(i, -1);
+    /**
+     * Returns the current status of this table: the number of slots, frames, space utilization, etc.
+     */
+    @Override
+    public String printInfo() {
+        SlotInfoPair<Integer, Integer> slotInfo = new SlotInfoPair<>(0, 0);
+
+        int nFrames = contents.size();
+        int hFrames = 0;
+        // Histogram Information - counts the number of used count per slot used count (e.g., 10,2 means that
+        // there are 10 hash slots that only has two hash entries in it.)
+        Map<Integer, Integer> headerSlotUsedCountMap = new TreeMap<>();
+
+        // Histogram Information - counts the number of capacity count per slot count (10,3 means that
+        // there are 10 hash slots whose capacity is 3.)
+        Map<Integer, Integer> headerSlotCapaCountMap = new TreeMap<>();
+
+        int headerSlotUsedCount = 0;
+        int headerSlotTotalCount;
+        double headerSlotUsedRatio = 0.0;
+        IntSerDeBuffer header;
+        int tupleUsedCount;
+        int tupleUsedCountFromMap;
+        int capacity;
+        int capacityFromMap;
+        for (int i = 0; i < headers.length; i++) {
+            if (headers[i] != null) {
+                header = headers[i];
+                for (int j = 0; j < frameCapacity; j = j + 2) {
+                    if (header.getInt(j) >= 0) {
+                        headerSlotUsedCount++;
+                        getSlotInfo(header.getInt(j), header.getInt(j + 1), slotInfo);
+                        capacity = slotInfo.first;
+                        tupleUsedCount = slotInfo.second;
+                        // UsedCount increase
+                        if (headerSlotUsedCountMap.containsKey(tupleUsedCount)) {
+                            tupleUsedCountFromMap = headerSlotUsedCountMap.get(tupleUsedCount);
+                            headerSlotUsedCountMap.put(tupleUsedCount, tupleUsedCountFromMap + 1);
+                        } else {
+                            headerSlotUsedCountMap.put(tupleUsedCount, 1);
+                        }
+                        // Capacity increase
+                        if (headerSlotCapaCountMap.containsKey(capacity)) {
+                            capacityFromMap = headerSlotCapaCountMap.get(capacity);
+                            headerSlotCapaCountMap.put(capacity, capacityFromMap + 1);
+                        } else {
+                            headerSlotCapaCountMap.put(capacity, 1);
+                        }
+                        headerSlotUsedCount++;
+                    }
+                }
+                hFrames++;
+            }
+        }
+        headerSlotTotalCount = hFrames * frameCapacity / 2;
+        if (headerSlotTotalCount > 0) {
+            headerSlotUsedRatio = (double) headerSlotUsedCount / (double) headerSlotTotalCount;
+        }
+        int total = hFrames + nFrames;
+        StringBuilder buf = new StringBuilder();
+        buf.append("\n>>> " + this + " " + Thread.currentThread().getId() + "::printInfo()" + "\n");
+        buf.append("(A) hash table cardinality (# of slot):\t" + tableSize + "\tExpected Table Size(MB):\t"
+                + ((double) getExpectedTableByteSize(tableSize, frameCapacity * 4) / 1048576) + "\twasted size(MB):\t"
+                + ((double) wastedIntSpaceCount * 4 / 1048576) + "\n");
+        buf.append("(B) # of header frames:\t" + hFrames + "\tsize(MB)\t"
+                + ((double) hFrames * frameCapacity * 4 / 1048576) + "\tratio (B/D)\t" + ((double) hFrames / total)
+                + "\n");
+        buf.append("(C) # of content frames:\t" + nFrames + "\tsize(MB)\t"
+                + ((double) nFrames * frameCapacity * 4 / 1048576) + "\tratio (C/D)\t" + ((double) nFrames / total)
+                + "\n");
+        buf.append("(D) # of total frames:\t" + total + "\tsize(MB)\t" + ((double) total * frameCapacity * 4 / 1048576)
+                + "\n");
+        buf.append("(E) # of used header entries:\t" + headerSlotUsedCount + "\n");
+        buf.append("(F) # of all possible header entries:\t" + headerSlotTotalCount + "\n");
+        buf.append("(G) header entries used ratio (E/F):\t" + headerSlotUsedRatio + "\n");
+        buf.append("(H) used count histogram (used count, its frequency):" + "\n");
+        int totalContentUsedCount = 0;
+        for (Map.Entry<Integer, Integer> entry : headerSlotUsedCountMap.entrySet()) {
+            buf.append(entry.getKey() + "\t" + entry.getValue() + "\n");
+            totalContentUsedCount += (entry.getKey() * entry.getValue());
+        }
+        buf.append("(H-1) total used count in content frames:\t" + totalContentUsedCount + "\n");
+
+        int totalContentCapaCount = 0;
+        buf.append("(I) capacity count histogram (capacity, its frequency):" + "\n");
+        for (Map.Entry<Integer, Integer> entry : headerSlotCapaCountMap.entrySet()) {
+            buf.append(entry.getKey() + "\t" + entry.getValue() + "\n");
+            totalContentCapaCount += (entry.getKey() * entry.getValue());
+        }
+        buf.append("(I-1) total capacity in content frames:\t" + totalContentCapaCount + "\n");
+        buf.append("(J) ratio of used count in content frames (H-1 / I-1):\t"
+                + ((double) totalContentUsedCount / totalContentCapaCount) + "\n");
+        return buf.toString();
     }
 
-    private int getHeaderFrameIndex(int entry) {
-        int frameIndex = entry * 2 / frameCapacity;
-        return frameIndex;
+    /**
+     * Returns the capacity and the usedCount for the given slot in this table.
+     */
+    public void getSlotInfo(int contentFrameIndex, int contentOffsetIndex, SlotInfoPair<Integer, Integer> slotInfo) {
+        IntSerDeBuffer frame = contents.get(contentFrameIndex);
+        int entryCapacity = frame.getInt(contentOffsetIndex);
+        int entryUsedItems = frame.getInt(contentOffsetIndex + 1);
+        slotInfo.reset(entryCapacity, entryUsedItems);
     }
 
-    private int getHeaderFrameOffset(int entry) {
-        int offset = entry * 2 % frameCapacity;
-        return offset;
-    }
+    private static class SlotInfoPair<T1, T2> {
 
-    private static class IntSerDeBuffer {
+        private T1 first;
+        private T2 second;
 
-        private byte[] bytes;
-
-        public IntSerDeBuffer(byte[] data) {
-            this.bytes = data;
+        public SlotInfoPair(T1 first, T2 second) {
+            this.first = first;
+            this.second = second;
         }
 
-        public int getInt(int pos) {
-            int offset = pos * 4;
-            return ((bytes[offset] & 0xff) << 24) + ((bytes[offset + 1] & 0xff) << 16)
-                    + ((bytes[offset + 2] & 0xff) << 8) + ((bytes[offset + 3] & 0xff) << 0);
-        }
-
-        public void writeInt(int pos, int value) {
-            int offset = pos * 4;
-            bytes[offset++] = (byte) (value >> 24);
-            bytes[offset++] = (byte) (value >> 16);
-            bytes[offset++] = (byte) (value >> 8);
-            bytes[offset++] = (byte) (value);
-        }
-
-        public int capacity() {
-            return bytes.length / 4;
+        public void reset(T1 first, T2 second) {
+            this.first = first;
+            this.second = second;
         }
     }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/structures/SimpleSerializableHashTable.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/structures/SimpleSerializableHashTable.java
new file mode 100644
index 0000000..5b7d364
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/structures/SimpleSerializableHashTable.java
@@ -0,0 +1,542 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.dataflow.std.structures;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.hyracks.api.context.IHyracksFrameMgrContext;
+import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputer;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.std.buffermanager.ITuplePointerAccessor;
+
+/**
+ * This table consists of header frames and content frames.
+ * Header indicates the first entry slot location for the given integer value.
+ * A header slot consists of [content frame number], [offset in that frame] to get
+ * the first tuple's pointer information that shares the same hash value.
+ * An entry slot in the content frame is as follows.
+ * [capacity of the slot], [# of occupied elements], {[frameIndex], [tupleIndex]}+;
+ * <fIndex, tIndex> forms a tuple pointer.
+ * WARNING: this hash table can grow up indefinitely and may generate Out Of Memory Exception.
+ * So, do not use this in production and use SerializableHashTable class instead
+ * since that should be managed by a buffer manager.
+ */
+public class SimpleSerializableHashTable implements ISerializableTable {
+
+    // unit size: int
+    protected static final int INT_SIZE = 4;
+    // Initial entry slot size
+    protected static final int INIT_ENTRY_SIZE = 4;
+    protected static final int INVALID_VALUE = 0xFFFFFFFF;
+    protected static final byte INVALID_BYTE_VALUE = (byte) 0xFF;
+
+    // Header frame array
+    protected IntSerDeBuffer[] headers;
+    // Content frame list
+    protected List<IntSerDeBuffer> contents = new ArrayList<>();
+    protected List<Integer> currentOffsetInEachFrameList = new ArrayList<>();
+    protected int frameCapacity;
+    protected int currentLargestFrameNumber = 0;
+    // The byte size of total frames that are allocated to the headers and contents
+    protected int currentByteSize = 0;
+    protected int tupleCount = 0;
+    protected TuplePointer tempTuplePointer = new TuplePointer();
+    protected int tableSize;
+    protected int frameSize;
+    protected IHyracksFrameMgrContext ctx;
+
+    public SimpleSerializableHashTable(int tableSize, final IHyracksFrameMgrContext ctx) throws HyracksDataException {
+        this(tableSize, ctx, true);
+    }
+
+    public SimpleSerializableHashTable(int tableSize, final IHyracksFrameMgrContext ctx, boolean frameInitRequired)
+            throws HyracksDataException {
+        this.ctx = ctx;
+        frameSize = ctx.getInitialFrameSize();
+        int residual = tableSize * INT_SIZE * 2 % frameSize == 0 ? 0 : 1;
+        int headerSize = tableSize * INT_SIZE * 2 / frameSize + residual;
+        headers = new IntSerDeBuffer[headerSize];
+        this.tableSize = tableSize;
+        if (frameInitRequired) {
+            ByteBuffer newFrame = getFrame(frameSize);
+            if (newFrame == null) {
+                throw new HyracksDataException("Can't initialize the Hash Table. Please assign more memory.");
+            }
+            IntSerDeBuffer frame = new IntSerDeBuffer(newFrame);
+            frameCapacity = frame.capacity();
+            contents.add(frame);
+            currentOffsetInEachFrameList.add(0);
+        }
+    }
+
+    ByteBuffer getFrame(int size) throws HyracksDataException {
+        currentByteSize += size;
+        return ctx.allocateFrame(size);
+    }
+
+    void increaseWastedSpaceCount(int size) {
+        // Do nothing. For this simple implementation, we don't count the wasted space.
+    }
+
+    @Override
+    public boolean insert(int entry, TuplePointer pointer) throws HyracksDataException {
+        int headerFrameIndex = getHeaderFrameIndex(entry);
+        int offsetInHeaderFrame = getHeaderFrameOffset(entry);
+        IntSerDeBuffer headerFrame = headers[headerFrameIndex];
+        if (headerFrame == null) {
+            ByteBuffer newFrame = getFrame(frameSize);
+            if (newFrame == null) {
+                return false;
+            }
+            headerFrame = new IntSerDeBuffer(newFrame);
+            headers[headerFrameIndex] = headerFrame;
+        }
+        int contentFrameIndex = headerFrame.getInt(offsetInHeaderFrame);
+        boolean result;
+        if (contentFrameIndex < 0) {
+            // Since the initial value of index and offset is -1, this means that the slot for
+            // this entry is not created yet. So, create the entry slot and insert first tuple into that slot.
+            // OR, the previous slot becomes full and the newly double-sized slot is about to be created.
+            result = insertNewEntry(headerFrame, offsetInHeaderFrame, INIT_ENTRY_SIZE, pointer);
+        } else {
+            // The entry slot already exists. Insert non-first tuple into the entry slot
+            int offsetInContentFrame = headerFrame.getInt(offsetInHeaderFrame + 1);
+            result = insertNonFirstTuple(headerFrame, offsetInHeaderFrame, contentFrameIndex, offsetInContentFrame,
+                    pointer);
+        }
+
+        if (result) {
+            tupleCount++;
+        }
+
+        return result;
+    }
+
+    @Override
+    /**
+     * Reset the slot information for the entry. The connection (pointer) between header frame and
+     * content frame will be also lost. Specifically, we reset the number of used count in the slot as -1
+     * so that the space could be reclaimed.
+     */
+    public void delete(int entry) {
+        int headerFrameIndex = getHeaderFrameIndex(entry);
+        int offsetInHeaderFrame = getHeaderFrameOffset(entry);
+        IntSerDeBuffer header = headers[headerFrameIndex];
+        if (header != null) {
+            int contentFrameIndex = header.getInt(offsetInHeaderFrame);
+            int offsetInContentFrame = header.getInt(offsetInHeaderFrame + 1);
+            if (contentFrameIndex >= 0) {
+                IntSerDeBuffer frame = contents.get(contentFrameIndex);
+                int entrySlotCapacity = frame.getInt(offsetInContentFrame);
+                int entryUsedItems = frame.getInt(offsetInContentFrame + 1);
+                // Set used count as -1 in the slot so that the slot space could be reclaimed.
+                frame.writeInvalidVal(offsetInContentFrame + 1, 1);
+                // Also reset the header (frmaeIdx, offset) to content frame pointer.
+                header.writeInvalidVal(offsetInHeaderFrame, 2);
+                tupleCount = tupleCount - entryUsedItems;
+                increaseWastedSpaceCount((entrySlotCapacity + 1) * 2);
+            }
+        }
+    }
+
+    @Override
+    /**
+     * For the given integer value, get the n-th (n = offsetInSlot) tuple pointer in the corresponding slot.
+     */
+    public boolean getTuplePointer(int entry, int offsetInSlot, TuplePointer dataPointer) {
+        int headerFrameIndex = getHeaderFrameIndex(entry);
+        int offsetInHeaderFrame = getHeaderFrameOffset(entry);
+        IntSerDeBuffer header = headers[headerFrameIndex];
+        if (header == null) {
+            dataPointer.reset(INVALID_VALUE, INVALID_VALUE);
+            return false;
+        }
+        int contentFrameIndex = header.getInt(offsetInHeaderFrame);
+        int offsetInContentFrame = header.getInt(offsetInHeaderFrame + 1);
+        if (contentFrameIndex < 0) {
+            dataPointer.reset(INVALID_VALUE, INVALID_VALUE);
+            return false;
+        }
+        IntSerDeBuffer frame = contents.get(contentFrameIndex);
+        int entryUsedCountInSlot = frame.getInt(offsetInContentFrame + 1);
+        if (offsetInSlot > entryUsedCountInSlot - 1) {
+            dataPointer.reset(INVALID_VALUE, INVALID_VALUE);
+            return false;
+        }
+        int startOffsetInContentFrame = offsetInContentFrame + 2 + offsetInSlot * 2;
+        while (startOffsetInContentFrame >= frameCapacity) {
+            ++contentFrameIndex;
+            startOffsetInContentFrame -= frameCapacity;
+        }
+        frame = contents.get(contentFrameIndex);
+        dataPointer.reset(frame.getInt(startOffsetInContentFrame), frame.getInt(startOffsetInContentFrame + 1));
+        return true;
+    }
+
+    @Override
+    public void reset() {
+        for (IntSerDeBuffer frame : headers) {
+            if (frame != null) {
+                frame.resetFrame();
+            }
+        }
+
+        currentOffsetInEachFrameList.clear();
+        for (int i = 0; i < contents.size(); i++) {
+            currentOffsetInEachFrameList.add(0);
+        }
+
+        currentLargestFrameNumber = 0;
+        tupleCount = 0;
+        currentByteSize = 0;
+    }
+
+    @Override
+    public int getCurrentByteSize() {
+        return currentByteSize;
+    }
+
+    @Override
+    public int getTupleCount() {
+        return tupleCount;
+    }
+
+    @Override
+    /**
+     * Returns the tuple count in the slot for the given entry.
+     */
+    public int getTupleCount(int entry) {
+        int headerFrameIndex = getHeaderFrameIndex(entry);
+        int offsetInHeaderFrame = getHeaderFrameOffset(entry);
+        IntSerDeBuffer headerFrame = headers[headerFrameIndex];
+        if (headerFrame != null) {
+            int contentFrameIndex = headerFrame.getInt(offsetInHeaderFrame);
+            int offsetInContentFrame = headerFrame.getInt(offsetInHeaderFrame + 1);
+            if (contentFrameIndex >= 0) {
+                IntSerDeBuffer frame = contents.get(contentFrameIndex);
+                int entryUsedCountInSlot = frame.getInt(offsetInContentFrame + 1);
+                return entryUsedCountInSlot;
+            }
+        }
+        return 0;
+    }
+
+    @Override
+    public void close() {
+        int nFrames = contents.size();
+        for (int i = 0; i < headers.length; i++) {
+            headers[i] = null;
+        }
+        contents.clear();
+        currentOffsetInEachFrameList.clear();
+        tupleCount = 0;
+        currentByteSize = 0;
+        currentLargestFrameNumber = 0;
+        ctx.deallocateFrames(nFrames);
+    }
+
+    protected boolean insertNewEntry(IntSerDeBuffer header, int offsetInHeaderFrame, int entryCapacity,
+            TuplePointer pointer) throws HyracksDataException {
+        IntSerDeBuffer lastContentFrame = contents.get(currentLargestFrameNumber);
+        int lastOffsetInCurrentFrame = currentOffsetInEachFrameList.get(currentLargestFrameNumber);
+        int requiredIntCapacity = entryCapacity * 2;
+        int currentFrameNumber = currentLargestFrameNumber;
+        boolean currentFrameNumberChanged = false;
+
+        if (lastOffsetInCurrentFrame + requiredIntCapacity >= frameCapacity) {
+            IntSerDeBuffer newContentFrame;
+            // At least we need to have the mata-data (slot capacity and used count) and
+            // one tuplePointer in the same frame (4 INT_SIZE).
+            // So, if there is not enough space for this, we just move on to the next page.
+            if ((lastOffsetInCurrentFrame + 4) > frameCapacity) {
+                // Swipe the region that can't be used.
+                lastContentFrame.writeInvalidVal(lastOffsetInCurrentFrame, frameCapacity - lastOffsetInCurrentFrame);
+                currentFrameNumber++;
+                lastOffsetInCurrentFrame = 0;
+                currentFrameNumberChanged = true;
+            }
+            do {
+                if (currentLargestFrameNumber >= contents.size() - 1) {
+                    ByteBuffer newFrame = getFrame(frameSize);
+                    if (newFrame == null) {
+                        return false;
+                    }
+                    newContentFrame = new IntSerDeBuffer(newFrame);
+                    currentLargestFrameNumber++;
+                    contents.add(newContentFrame);
+                    currentOffsetInEachFrameList.add(0);
+                } else {
+                    currentLargestFrameNumber++;
+                    currentOffsetInEachFrameList.set(currentLargestFrameNumber, 0);
+                }
+                requiredIntCapacity -= frameCapacity;
+            } while (requiredIntCapacity > 0);
+        }
+
+        if (currentFrameNumberChanged) {
+            lastContentFrame = contents.get(currentFrameNumber);
+        }
+
+        // sets the header
+        header.writeInt(offsetInHeaderFrame, currentFrameNumber);
+        header.writeInt(offsetInHeaderFrame + 1, lastOffsetInCurrentFrame);
+
+        // sets the entry & its slot.
+        // 1. slot capacity
+        lastContentFrame.writeInt(lastOffsetInCurrentFrame, entryCapacity - 1);
+        // 2. used count in the slot
+        lastContentFrame.writeInt(lastOffsetInCurrentFrame + 1, 1);
+        // 3. initial entry in the slot
+        lastContentFrame.writeInt(lastOffsetInCurrentFrame + 2, pointer.getFrameIndex());
+        lastContentFrame.writeInt(lastOffsetInCurrentFrame + 3, pointer.getTupleIndex());
+        int newLastOffsetInContentFrame = lastOffsetInCurrentFrame + entryCapacity * 2;
+        newLastOffsetInContentFrame = newLastOffsetInContentFrame < frameCapacity ? newLastOffsetInContentFrame
+                    : frameCapacity - 1;
+        currentOffsetInEachFrameList.set(currentFrameNumber, newLastOffsetInContentFrame);
+
+        requiredIntCapacity = entryCapacity * 2 - (frameCapacity - lastOffsetInCurrentFrame);
+        while (requiredIntCapacity > 0) {
+            currentFrameNumber++;
+            requiredIntCapacity -= frameCapacity;
+            newLastOffsetInContentFrame = requiredIntCapacity < 0 ? requiredIntCapacity + frameCapacity
+                    : frameCapacity - 1;
+            currentOffsetInEachFrameList.set(currentFrameNumber, newLastOffsetInContentFrame);
+        }
+
+        return true;
+    }
+
+    protected boolean insertNonFirstTuple(IntSerDeBuffer header, int offsetInHeaderFrame, int contentFrameIndex,
+            int offsetInContentFrame, TuplePointer pointer) throws HyracksDataException {
+        int frameIndex = contentFrameIndex;
+        IntSerDeBuffer contentFrame = contents.get(frameIndex);
+        int entrySlotCapacity = contentFrame.getInt(offsetInContentFrame);
+        int entryUsedCountInSlot = contentFrame.getInt(offsetInContentFrame + 1);
+        boolean frameIndexChanged = false;
+        if (entryUsedCountInSlot < entrySlotCapacity) {
+            // The slot has at least one space to accommodate this tuple pointer.
+            // Increase the used count by 1.
+            contentFrame.writeInt(offsetInContentFrame + 1, entryUsedCountInSlot + 1);
+            // Calculates the first empty spot in the slot.
+            // +2: (capacity, # of used entry count)
+            // *2: each tuplePointer's occupation (frame index + offset in that frame)
+            int startOffsetInContentFrame = offsetInContentFrame + 2 + entryUsedCountInSlot * 2;
+            while (startOffsetInContentFrame >= frameCapacity) {
+                ++frameIndex;
+                startOffsetInContentFrame -= frameCapacity;
+                frameIndexChanged = true;
+            }
+            // We don't have to read content frame again if the frame index has not been changed.
+            if (frameIndexChanged) {
+                contentFrame = contents.get(frameIndex);
+            }
+            contentFrame.writeInt(startOffsetInContentFrame, pointer.getFrameIndex());
+            contentFrame.writeInt(startOffsetInContentFrame + 1, pointer.getTupleIndex());
+        } else {
+            // There is no enough space in this slot. We need to increase the slot size and
+            // migrate the current entries in it.
+
+            // New capacity: double the original capacity
+            int capacity = (entrySlotCapacity + 1) * 2;
+
+            // Temporarily sets the header (frameIdx, offset) as (-1,-1) for the slot.
+            header.writeInvalidVal(offsetInHeaderFrame, 2);
+            // Marks the old slot as obsolete - set the used count as -1 so that its space can be reclaimed
+            // when a garbage collection is executed.
+            contentFrame.writeInvalidVal(offsetInContentFrame + 1, 1);
+
+            // Gets the location of the initial entry.
+            int fIndex = contentFrame.getInt(offsetInContentFrame + 2);
+            int tIndex = contentFrame.getInt(offsetInContentFrame + 3);
+            tempTuplePointer.reset(fIndex, tIndex);
+            // Creates a new double-sized slot for the current entries and
+            // migrates the initial entry in the slot to the new slot.
+            if (!this.insertNewEntry(header, offsetInHeaderFrame, capacity, tempTuplePointer)) {
+                // Reverses the effect of change.
+                header.writeInt(offsetInHeaderFrame, contentFrameIndex);
+                header.writeInt(offsetInHeaderFrame + 1, offsetInContentFrame);
+                contentFrame.writeInt(offsetInContentFrame + 1, entryUsedCountInSlot);
+                return false;
+            }
+
+            int newFrameIndex = header.getInt(offsetInHeaderFrame);
+            int newTupleIndex = header.getInt(offsetInHeaderFrame + 1);
+
+            // Migrates the existing entries (from 2nd to the last).
+            for (int i = 1; i < entryUsedCountInSlot; i++) {
+                int startOffsetInContentFrame = offsetInContentFrame + 2 + i * 2;
+                int startFrameIndex = frameIndex;
+                while (startOffsetInContentFrame >= frameCapacity) {
+                    ++startFrameIndex;
+                    startOffsetInContentFrame -= frameCapacity;
+                }
+                contentFrame = contents.get(startFrameIndex);
+                fIndex = contentFrame.getInt(startOffsetInContentFrame);
+                tIndex = contentFrame.getInt(startOffsetInContentFrame + 1);
+                tempTuplePointer.reset(fIndex, tIndex);
+                if (!insertNonFirstTuple(header, offsetInHeaderFrame, newFrameIndex, newTupleIndex, tempTuplePointer)) {
+                    return false;
+                }
+            }
+            // Now, inserts the new entry that caused an overflow to the old bucket.
+            if (!insertNonFirstTuple(header, offsetInHeaderFrame, newFrameIndex, newTupleIndex, pointer)) {
+                return false;
+            }
+            increaseWastedSpaceCount(capacity);
+        }
+        return true;
+    }
+
+    protected int getHeaderFrameIndex(int entry) {
+        int frameIndex = entry * 2 / frameCapacity;
+        return frameIndex;
+    }
+
+    protected int getHeaderFrameOffset(int entry) {
+        int offset = entry * 2 % frameCapacity;
+        return offset;
+    }
+
+    public static int getUnitSize() {
+        return INT_SIZE;
+    }
+
+    public static int getNumberOfEntryInSlot() {
+        return INIT_ENTRY_SIZE;
+    }
+
+    public static int getExpectedByteSizePerHashValue() {
+        // first constant 2: capacity, # of used count
+        // second constant 2: tuple pointer (frameIndex, offset)
+        return getUnitSize() * (2 + getNumberOfEntryInSlot() * 2);
+    }
+
+    /**
+     * Calculates the expected hash table size based on a scenario: there are no duplicated entries so that
+     * each entry is assigned to all possible slots.
+     *
+     * @param tableSize
+     *            : the cardinality of the hash table - number of slots
+     * @param frameSize
+     *            : the frame size
+     * @return
+     *         expected the byte size of the hash table
+     */
+    public static long getExpectedTableFrameCount(long tableSize, int frameSize) {
+        long numberOfHeaderFrame = (long) (Math.ceil((double) tableSize * 2 / (double) frameSize));
+        long numberOfContentFrame = (long) (Math
+                .ceil(((double) getNumberOfEntryInSlot() * 2 * getUnitSize() * tableSize) / (double) frameSize));
+        return numberOfHeaderFrame + numberOfContentFrame;
+    }
+
+    public static long getExpectedTableByteSize(long tableSize, int frameSize) {
+        return getExpectedTableFrameCount(tableSize, frameSize) * frameSize;
+    }
+
+    /**
+     * Calculates the frame count increment/decrement for a new table size with the original size.
+     *
+     * @param origTableSize
+     *            : the original table cardinality
+     * @param delta
+     *            : a delta (a positive value means that the cardinality of the table will be increased.)
+     *            a negative value means that the cardinality of the table will be decreased.
+     * @return the frame count increment/decrement: a positive number means that the table size will be increased.
+     *         a negative number means that the table size will be decreased.
+     */
+    public static long calculateFrameCountDeltaForTableSizeChange(long origTableSize, long delta, int frameSize) {
+        long originalFrameCount = getExpectedTableFrameCount(origTableSize, frameSize);
+        long newFrameCount = getExpectedTableFrameCount(origTableSize + delta, frameSize);
+        return newFrameCount - originalFrameCount;
+    }
+
+    public static long calculateByteSizeDeltaForTableSizeChange(long origTableSize, long delta, int frameSize) {
+        return calculateFrameCountDeltaForTableSizeChange(origTableSize, delta, frameSize) * frameSize;
+    }
+
+    @Override
+    public boolean isGarbageCollectionNeeded() {
+        // This class doesn't support the garbage collection.
+        return false;
+    }
+
+    @Override
+    public int collectGarbage(ITuplePointerAccessor bufferAccessor, ITuplePartitionComputer tpc)
+            throws HyracksDataException {
+        // This class doesn't support the garbage collection.
+        return -1;
+    }
+
+    static class IntSerDeBuffer {
+
+        ByteBuffer byteBuffer;
+        byte[] bytes;
+
+        public IntSerDeBuffer(ByteBuffer byteBuffer) {
+            this.byteBuffer = byteBuffer;
+            this.bytes = byteBuffer.array();
+            resetFrame();
+        }
+
+        public int getInt(int pos) {
+            int offset = pos * 4;
+            return ((bytes[offset] & 0xff) << 24) + ((bytes[offset + 1] & 0xff) << 16)
+                    + ((bytes[offset + 2] & 0xff) << 8) + (bytes[offset + 3] & 0xff);
+        }
+
+        public void writeInt(int pos, int value) {
+            int offset = pos * 4;
+            bytes[offset++] = (byte) (value >> 24);
+            bytes[offset++] = (byte) (value >> 16);
+            bytes[offset++] = (byte) (value >> 8);
+            bytes[offset] = (byte) (value);
+        }
+
+        public void writeInvalidVal(int intPos, int intRange) {
+            int offset = intPos * 4;
+            Arrays.fill(bytes, offset, offset + INT_SIZE * intRange, INVALID_BYTE_VALUE);
+        }
+
+        public int capacity() {
+            return bytes.length / 4;
+        }
+
+        public int getByteCapacity() {
+            return bytes.length;
+        }
+
+        public ByteBuffer getByteBuffer() {
+            return byteBuffer;
+        }
+
+        public void resetFrame() {
+            Arrays.fill(bytes, INVALID_BYTE_VALUE);
+        }
+
+    }
+
+    @Override
+    public String printInfo() {
+        return null;
+    }
+
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/structures/SerializableHashTableTest.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/structures/SerializableHashTableTest.java
index 43a1f6d..eeec223 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/structures/SerializableHashTableTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/structures/SerializableHashTableTest.java
@@ -23,8 +23,13 @@
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
+import org.apache.hyracks.api.context.IHyracksFrameMgrContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.control.nc.resources.memory.FrameManager;
+import org.apache.hyracks.dataflow.std.buffermanager.DeallocatableFramePool;
+import org.apache.hyracks.dataflow.std.buffermanager.FramePoolBackedFrameBufferManager;
+import org.apache.hyracks.dataflow.std.buffermanager.IDeallocatableFramePool;
+import org.apache.hyracks.dataflow.std.buffermanager.ISimpleFrameBufferManager;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -33,11 +38,17 @@
     SerializableHashTable nsTable;
     final int NUM_PART = 101;
     TuplePointer pointer = new TuplePointer(0, 0);
-    final int num = 1000;
+    final int num = 10000;
+    protected IHyracksFrameMgrContext ctx;
+    private IDeallocatableFramePool framePool;
+    private ISimpleFrameBufferManager bufferManager;
 
     @Before
     public void setup() throws HyracksDataException {
-        nsTable = new SerializableHashTable(NUM_PART, new FrameManager(256));
+        ctx = new FrameManager(256);
+        framePool = new DeallocatableFramePool(ctx, ctx.getInitialFrameSize() * 2048);
+        bufferManager = new FramePoolBackedFrameBufferManager(framePool);
+        nsTable = new SerializableHashTable(NUM_PART, ctx, bufferManager);
     }
 
     @Test
@@ -66,7 +77,7 @@
         assertGetValue();
     }
 
-    private void assertGetValue() {
+    protected void assertGetValue() {
         int loop = 0;
         for (int i = 0; i < num; i++) {
             assertTrue(nsTable.getTuplePointer(i % NUM_PART, loop, pointer));
@@ -75,8 +86,9 @@
                 loop++;
             }
         }
+        int tupleCntPerPart = (int) Math.ceil((double) num / NUM_PART);
         for (int i = 0; i < NUM_PART; i++) {
-            assertTrue(nsTable.getTupleCount(i) == 10 || nsTable.getTupleCount(i) == 9);
+            assertTrue(nsTable.getTupleCount(i) == tupleCntPerPart || nsTable.getTupleCount(i) == tupleCntPerPart - 1);
         }
 
     }
@@ -86,7 +98,7 @@
         assertAllPartitionsCountIsZero();
     }
 
-    private void assertAllPartitionsCountIsZero() {
+    protected void assertAllPartitionsCountIsZero() {
         for (int i = 0; i < NUM_PART; i++) {
             assertEquals(0, nsTable.getTupleCount(i));
         }
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/structures/SimpleSerializableHashTableTest.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/structures/SimpleSerializableHashTableTest.java
new file mode 100644
index 0000000..027ee27
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/structures/SimpleSerializableHashTableTest.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.dataflow.std.structures;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.hyracks.api.context.IHyracksFrameMgrContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.control.nc.resources.memory.FrameManager;
+import org.junit.Before;
+import org.junit.Test;
+
+public class SimpleSerializableHashTableTest {
+
+    SimpleSerializableHashTable nsTable;
+    final int NUM_PART = 101;
+    TuplePointer pointer = new TuplePointer(0, 0);
+    final int num = 10000;
+    private IHyracksFrameMgrContext ctx;
+
+    @Before
+    public void setup() throws HyracksDataException {
+        ctx = new FrameManager(256);
+        nsTable = new SimpleSerializableHashTable(NUM_PART, ctx);
+    }
+
+    @Test
+    public void testBatchDeletePartition() throws Exception {
+        testInsert();
+        for (int i = 0; i < NUM_PART; i++) {
+            nsTable.delete(i);
+            assertFalse(nsTable.getTuplePointer(i, 0, pointer));
+            assertEquals(0, nsTable.getTupleCount(i));
+
+            for (int j = i; j < num; j += NUM_PART) {
+                pointer.reset(j, j);
+                nsTable.insert(i, pointer);
+            }
+
+            assertGetValue();
+        }
+    }
+
+    @Test
+    public void testInsert() throws Exception {
+        for (int i = 0; i < num; i++) {
+            pointer.reset(i, i);
+            nsTable.insert(i % NUM_PART, pointer);
+        }
+        assertGetValue();
+    }
+
+    private void assertGetValue() {
+        int loop = 0;
+        for (int i = 0; i < num; i++) {
+            assertTrue(nsTable.getTuplePointer(i % NUM_PART, loop, pointer));
+            assertTrue(pointer.getFrameIndex() == i);
+            if (i % NUM_PART == NUM_PART - 1) {
+                loop++;
+            }
+        }
+        int tupleCntPerPart = (int) Math.ceil((double) num / NUM_PART);
+        for (int i = 0; i < NUM_PART; i++) {
+            assertTrue(nsTable.getTupleCount(i) == tupleCntPerPart || nsTable.getTupleCount(i) == tupleCntPerPart - 1);
+        }
+
+    }
+
+    @Test
+    public void testGetCount() throws Exception {
+        assertAllPartitionsCountIsZero();
+    }
+
+    private void assertAllPartitionsCountIsZero() {
+        for (int i = 0; i < NUM_PART; i++) {
+            assertEquals(0, nsTable.getTupleCount(i));
+        }
+    }
+}
\ No newline at end of file
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AggregationTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AggregationTest.java
index f20461c..f169054 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AggregationTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AggregationTest.java
@@ -160,7 +160,7 @@
                 IntegerSerializerDeserializer.INSTANCE, FloatSerializerDeserializer.INSTANCE });
 
         int[] keyFields = new int[] { 0 };
-        int frameLimits = 4;
+        int frameLimits = 5;
         int tableSize = 8;
         long fileSize = frameLimits * spec.getFrameSize();
 
@@ -249,7 +249,7 @@
                 IntegerSerializerDeserializer.INSTANCE, FloatSerializerDeserializer.INSTANCE });
 
         int[] keyFields = new int[] { 0 };
-        int frameLimits = 4;
+        int frameLimits = 5;
         int tableSize = 8;
         long fileSize = frameLimits * spec.getFrameSize();
 
@@ -338,7 +338,7 @@
                         IntegerSerializerDeserializer.INSTANCE, new UTF8StringSerializerDeserializer() });
 
         int[] keyFields = new int[] { 0 };
-        int frameLimits = 4;
+        int frameLimits = 5;
         int tableSize = 8;
         long fileSize = frameLimits * spec.getFrameSize();
 
@@ -429,7 +429,7 @@
                 IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE });
 
         int[] keyFields = new int[] { 8, 0 };
-        int frameLimits = 4;
+        int frameLimits = 5;
         int tableSize = 8;
         long fileSize = frameLimits * spec.getFrameSize();
 
@@ -524,7 +524,7 @@
                         IntegerSerializerDeserializer.INSTANCE, FloatSerializerDeserializer.INSTANCE });
 
         int[] keyFields = new int[] { 8, 0 };
-        int frameLimits = 4;
+        int frameLimits = 5;
         int tableSize = 8;
         long fileSize = frameLimits * spec.getFrameSize();
 
@@ -619,7 +619,7 @@
                 IntegerSerializerDeserializer.INSTANCE, new UTF8StringSerializerDeserializer() });
 
         int[] keyFields = new int[] { 8, 0 };
-        int frameLimits = 4;
+        int frameLimits = 5;
         int tableSize = 8;
         long fileSize = frameLimits * spec.getFrameSize();
 
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java
index f8c7487..7075fe9 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java
@@ -125,7 +125,7 @@
                 new int[] { 0 },
                 new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) },
                 new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
-                custOrderJoinDesc, 128, null);
+                custOrderJoinDesc, 128, null, 128);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID);
 
         ResultSetId rsId = new ResultSetId(1);
@@ -293,7 +293,7 @@
                 new int[] { 1 },
                 new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) },
                 new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
-                null, custOrderJoinDesc, true, nonMatchWriterFactories, 128);
+                null, custOrderJoinDesc, true, nonMatchWriterFactories, 128, 128);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID);
 
         ResultSetId rsId = new ResultSetId(1);
@@ -464,7 +464,7 @@
                 new int[] { 0 },
                 new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) },
                 new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
-                custOrderJoinDesc, 128, null);
+                custOrderJoinDesc, 128, null, 128);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID, NC2_ID);
 
         ResultSetId rsId = new ResultSetId(1);
@@ -552,7 +552,7 @@
                 custDesc);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, custScanner, NC1_ID, NC2_ID);
 
-        HybridHashJoinOperatorDescriptor join = new HybridHashJoinOperatorDescriptor(spec, 3, 20, 100, 1.2,
+        HybridHashJoinOperatorDescriptor join = new HybridHashJoinOperatorDescriptor(spec, 5, 20, 100, 1.2,
                 new int[] { 1 }, new int[] { 0 },
                 new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) },
                 new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
@@ -647,7 +647,7 @@
                 new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
                         .of(UTF8StringPointable.FACTORY) },
                 new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
-                custOrderJoinDesc, 128, null);
+                custOrderJoinDesc, 128, null, 128);
         PartitionConstraintHelper.addPartitionCountConstraint(spec, join, 2);
 
         ResultSetId rsId = new ResultSetId(1);
@@ -743,7 +743,7 @@
                 new int[] { 0 },
                 new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) },
                 new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
-                custOrderJoinDesc, 128, null);
+                custOrderJoinDesc, 128, null, 128);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID, NC2_ID);
 
         ResultSetId rsId = new ResultSetId(1);
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/AbstractExternalGroupbyTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/AbstractExternalGroupbyTest.java
index b8ec790..f83ab6a 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/AbstractExternalGroupbyTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/AbstractExternalGroupbyTest.java
@@ -28,9 +28,6 @@
 import java.util.HashMap;
 import java.util.Map;
 
-import org.junit.Assert;
-import org.junit.Test;
-
 import org.apache.hyracks.api.comm.IFrame;
 import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
@@ -56,6 +53,8 @@
 import org.apache.hyracks.dataflow.std.group.aggregators.IntSumFieldAggregatorFactory;
 import org.apache.hyracks.dataflow.std.group.aggregators.MultiFieldsAggregatorFactory;
 import org.apache.hyracks.test.support.TestUtils;
+import org.junit.Assert;
+import org.junit.Test;
 
 public abstract class AbstractExternalGroupbyTest {
 
@@ -176,8 +175,8 @@
 
     @Test
     public void testBuildAndMergeNormalFrameInMem() throws HyracksDataException {
-        int tableSize = 1001;
-        int numFrames = 3;
+        int tableSize = 101;
+        int numFrames = 23;
         int frameSize = 256;
         int minDataSize = frameSize;
         int minRecordSize = 20;
@@ -187,10 +186,10 @@
 
     @Test
     public void testBuildAndMergeNormalFrameSpill() throws HyracksDataException {
-        int tableSize = 1001;
-        int numFrames = 3;
+        int tableSize = 101;
+        int numFrames = 23;
         int frameSize = 256;
-        int minDataSize = frameSize * 4;
+        int minDataSize = frameSize * 40;
         int minRecordSize = 20;
         int maxRecordSize = 50;
         testBuildAndMerge(tableSize, numFrames, frameSize, minDataSize, minRecordSize, maxRecordSize, null);
@@ -198,16 +197,14 @@
 
     @Test
     public void testBuildAndMergeBigObj() throws HyracksDataException {
-        int tableSize = 1001;
-        int numFrames = 4;
+        int tableSize = 101;
+        int numFrames = 23;
         int frameSize = 256;
-        int minDataSize = frameSize * 5;
+        int minDataSize = frameSize * 40;
         int minRecordSize = 20;
         int maxRecordSize = 50;
-        HashMap<Integer, String> bigRecords = AbstractRunGeneratorTest.generateBigObject(frameSize, 2);
-        testBuildAndMerge(tableSize, numFrames, frameSize, minDataSize, minRecordSize, maxRecordSize,
-                bigRecords);
-
+        HashMap<Integer, String> bigRecords = AbstractRunGeneratorTest.generateBigObject(frameSize, 3);
+        testBuildAndMerge(tableSize, numFrames, frameSize, minDataSize, minRecordSize, maxRecordSize, bigRecords);
     }
 
     protected abstract void initial(IHyracksTaskContext ctx, int tableSize, int numFrames) throws HyracksDataException;
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Join.java b/hyracks-fullstack/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Join.java
index 65073e0..c3d0df1 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Join.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Join.java
@@ -148,7 +148,7 @@
     private static JobSpecification createJob(FileSplit[] customerSplits, FileSplit[] orderSplits,
             FileSplit[] resultSplits, int numJoinPartitions, String algo, int graceInputSize, int graceRecordsPerFrame,
             double graceFactor, int memSize, int tableSize, boolean hasGroupBy, int frameSize)
-                    throws HyracksDataException {
+            throws HyracksDataException {
         JobSpecification spec = new JobSpecification(frameSize);
 
         IFileSplitProvider custSplitsProvider = new ConstantFileSplitProvider(customerSplits);
@@ -183,7 +183,7 @@
                     new IBinaryHashFunctionFactory[] {
                             PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) },
                     new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
-                    Common.custOrderJoinDesc, tableSize, null);
+                    Common.custOrderJoinDesc, tableSize, null, memSize * frameSize);
 
         } else if ("hybrid".equalsIgnoreCase(algo)) {
             join = new OptimizedHybridHashJoinOperatorDescriptor(spec, memSize, graceInputSize, graceFactor,