merged r1422:1429 from hyracks_asterix_stabilization

git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_lsm_tree@1431 123451ca-8445-de46-9d55-352943316053
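
In short: ITaskState is generalized to IStateObject, keyed by an arbitrary
Object id instead of a TaskId; the inline push runtimes of
ExternalGroupOperatorDescriptor move into the new
ExternalGroupBuildOperatorNodePushable and
ExternalGroupMergeOperatorNodePushable classes; and JobScheduler now hands
each node a defensive copy of the connector-policy map. A minimal sketch of
the renamed API follows (MyState is hypothetical, and the sketch assumes
IStateObject keeps the toBytes/fromBytes methods that fall outside the hunk
below):

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;

    import edu.uci.ics.hyracks.api.job.JobId;
    import edu.uci.ics.hyracks.dataflow.std.base.AbstractStateObject;

    // Hypothetical state class, for illustration only.
    class MyState extends AbstractStateObject {
        MyState(JobId jobId, Object id) {
            super(jobId, id);
        }

        @Override
        public void toBytes(DataOutput out) throws IOException {
            throw new UnsupportedOperationException();
        }

        @Override
        public void fromBytes(DataInput in) throws IOException {
            throw new UnsupportedOperationException();
        }
    }

    // Producer activity, typically in close():
    //     ctx.setStateObject(new MyState(ctx.getJobletContext().getJobId(), stateId));
    // Consumer activity, typically in initialize():
    //     MyState state = (MyState) ctx.getStateObject(stateId);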
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/state/ITaskState.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/state/IStateObject.java
similarity index 89%
rename from hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/state/ITaskState.java
rename to hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/state/IStateObject.java
index c24e488..f216773 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/state/ITaskState.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/state/IStateObject.java
@@ -18,13 +18,12 @@
 import java.io.DataOutput;
 import java.io.IOException;
 
-import edu.uci.ics.hyracks.api.dataflow.TaskId;
 import edu.uci.ics.hyracks.api.job.JobId;
 
-public interface ITaskState {
+public interface IStateObject {
     public JobId getJobId();
 
-    public TaskId getTaskId();
+    public Object getId();
 
     public long getMemoryOccupancy();
 
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IOperatorEnvironment.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IOperatorEnvironment.java
index ecc44c1..eaa1ec2 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IOperatorEnvironment.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IOperatorEnvironment.java
@@ -14,11 +14,10 @@
  */
 package edu.uci.ics.hyracks.api.job;
 
-import edu.uci.ics.hyracks.api.dataflow.TaskId;
-import edu.uci.ics.hyracks.api.dataflow.state.ITaskState;
+import edu.uci.ics.hyracks.api.dataflow.state.IStateObject;
 
 public interface IOperatorEnvironment {
-    public void setTaskState(ITaskState taskState);
+    public void setStateObject(IStateObject taskState);
 
-    public ITaskState getTaskState(TaskId taskId);
+    public IStateObject getStateObject(Object id);
 }
\ No newline at end of file
diff --git a/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java b/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java
index f2c6993..a72f64c 100644
--- a/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java
+++ b/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java
@@ -465,7 +465,8 @@
         final JobId jobId = jobRun.getJobId();
         final JobActivityGraph jag = jobRun.getJobActivityGraph();
         final String appName = jag.getApplicationName();
-        final Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies = jobRun.getConnectorPolicyMap();
+        final Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies = new HashMap<ConnectorDescriptorId, IConnectorPolicy>(
+                jobRun.getConnectorPolicyMap());
         for (Map.Entry<String, List<TaskAttemptDescriptor>> entry : taskAttemptMap.entrySet()) {
             String nodeId = entry.getKey();
             final List<TaskAttemptDescriptor> taskDescriptors = entry.getValue();
diff --git a/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
index 8f7bc4a..c62b446 100644
--- a/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
+++ b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
@@ -26,8 +26,7 @@
 import edu.uci.ics.hyracks.api.comm.PartitionChannel;
 import edu.uci.ics.hyracks.api.context.IHyracksJobletContext;
 import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
-import edu.uci.ics.hyracks.api.dataflow.TaskId;
-import edu.uci.ics.hyracks.api.dataflow.state.ITaskState;
+import edu.uci.ics.hyracks.api.dataflow.state.IStateObject;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.exceptions.HyracksException;
 import edu.uci.ics.hyracks.api.io.FileReference;
@@ -65,7 +64,7 @@
 
     private final IOperatorEnvironment env;
 
-    private final Map<TaskId, ITaskState> taskStateMap;
+    private final Map<Object, IStateObject> stateObjectMap;
 
     private final Map<TaskAttemptId, Task> taskMap;
 
@@ -88,7 +87,7 @@
         this.jag = jag;
         partitionRequestMap = new HashMap<PartitionId, IPartitionCollector>();
         env = new OperatorEnvironmentImpl(nodeController.getId());
-        taskStateMap = new HashMap<TaskId, ITaskState>();
+        stateObjectMap = new HashMap<Object, IStateObject>();
         taskMap = new HashMap<TaskAttemptId, Task>();
         counterMap = new HashMap<String, Counter>();
         deallocatableRegistry = new DefaultDeallocatableRegistry();
@@ -136,13 +135,13 @@
         }
 
         @Override
-        public void setTaskState(ITaskState taskState) {
-            taskStateMap.put(taskState.getTaskId(), taskState);
+        public void setStateObject(IStateObject taskState) {
+            stateObjectMap.put(taskState.getId(), taskState);
         }
 
         @Override
-        public ITaskState getTaskState(TaskId taskId) {
-            return taskStateMap.get(taskId);
+        public IStateObject getStateObject(Object id) {
+            return stateObjectMap.get(id);
         }
     }
 
diff --git a/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java
index 1bb0b2f..7addda8 100644
--- a/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java
+++ b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java
@@ -33,8 +33,7 @@
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
 import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
-import edu.uci.ics.hyracks.api.dataflow.TaskId;
-import edu.uci.ics.hyracks.api.dataflow.state.ITaskState;
+import edu.uci.ics.hyracks.api.dataflow.state.IStateObject;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.exceptions.HyracksException;
 import edu.uci.ics.hyracks.api.io.FileReference;
@@ -333,12 +332,12 @@
     }
 
     @Override
-    public void setTaskState(ITaskState taskState) {
-        opEnv.setTaskState(taskState);
+    public void setStateObject(IStateObject taskState) {
+        opEnv.setStateObject(taskState);
     }
 
     @Override
-    public ITaskState getTaskState(TaskId taskId) {
-        return opEnv.getTaskState(taskId);
+    public IStateObject getStateObject(Object id) {
+        return opEnv.getStateObject(id);
     }
 }
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractStateObject.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractStateObject.java
new file mode 100644
index 0000000..753fb98
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractStateObject.java
@@ -0,0 +1,47 @@
+package edu.uci.ics.hyracks.dataflow.std.base;
+
+import edu.uci.ics.hyracks.api.dataflow.state.IStateObject;
+import edu.uci.ics.hyracks.api.job.JobId;
+
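+/**
+ * Base class for state objects handed between the activities of an operator;
+ * in this patch the id is a TaskId, but any Object key works.
+ */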
+public abstract class AbstractStateObject implements IStateObject {
+    protected JobId jobId;
+
+    protected Object id;
+
+    protected long memoryOccupancy;
+
+    protected AbstractStateObject() {
+    }
+
+    protected AbstractStateObject(JobId jobId, Object id) {
+        this.jobId = jobId;
+        this.id = id;
+    }
+
+    @Override
+    public final JobId getJobId() {
+        return jobId;
+    }
+
+    public final void setJobId(JobId jobId) {
+        this.jobId = jobId;
+    }
+
+    @Override
+    public final Object getId() {
+        return id;
+    }
+
+    public final void setId(Object id) {
+        this.id = id;
+    }
+
+    @Override
+    public final long getMemoryOccupancy() {
+        return memoryOccupancy;
+    }
+
+    public void setMemoryOccupancy(long memoryOccupancy) {
+        this.memoryOccupancy = memoryOccupancy;
+    }
+}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractTaskState.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractTaskState.java
deleted file mode 100644
index ba4dfd1..0000000
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractTaskState.java
+++ /dev/null
@@ -1,48 +0,0 @@
-package edu.uci.ics.hyracks.dataflow.std.base;
-
-import edu.uci.ics.hyracks.api.dataflow.TaskId;
-import edu.uci.ics.hyracks.api.dataflow.state.ITaskState;
-import edu.uci.ics.hyracks.api.job.JobId;
-
-public abstract class AbstractTaskState implements ITaskState {
-    protected JobId jobId;
-
-    protected TaskId taId;
-
-    protected long memoryOccupancy;
-
-    protected AbstractTaskState() {
-    }
-
-    protected AbstractTaskState(JobId jobId, TaskId taId) {
-        this.jobId = jobId;
-        this.taId = taId;
-    }
-
-    @Override
-    public final JobId getJobId() {
-        return jobId;
-    }
-
-    public final void setJobId(JobId jobId) {
-        this.jobId = jobId;
-    }
-
-    @Override
-    public final TaskId getTaskId() {
-        return taId;
-    }
-
-    public final void setTaskId(TaskId tId) {
-        this.taId = tId;
-    }
-
-    @Override
-    public final long getMemoryOccupancy() {
-        return memoryOccupancy;
-    }
-
-    public void setMemoryOccupancy(long memoryOccupancy) {
-        this.memoryOccupancy = memoryOccupancy;
-    }
-}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ExternalGroupBuildOperatorNodePushable.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ExternalGroupBuildOperatorNodePushable.java
new file mode 100644
index 0000000..f36f681
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ExternalGroupBuildOperatorNodePushable.java
@@ -0,0 +1,135 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.group;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.LinkedList;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.io.RunFileReader;
+import edu.uci.ics.hyracks.dataflow.common.io.RunFileWriter;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
+
+class ExternalGroupBuildOperatorNodePushable extends AbstractUnaryInputSinkOperatorNodePushable {
+    private final IHyracksTaskContext ctx;
+    private final Object stateId;
+    private final int[] keyFields;
+    private final IBinaryComparatorFactory[] comparatorFactories;
+    private final INormalizedKeyComputerFactory firstNormalizerFactory;
+    private final IAggregatorDescriptorFactory aggregatorFactory;
+    private final int framesLimit;
+    private final ISpillableTableFactory spillableTableFactory;
+    private final RecordDescriptor inRecordDescriptor;
+    private final RecordDescriptor outRecordDescriptor;
+    private final FrameTupleAccessor accessor;
+
+    private ExternalGroupState state;
+
+    ExternalGroupBuildOperatorNodePushable(IHyracksTaskContext ctx, Object stateId, int[] keyFields, int framesLimit,
+            IBinaryComparatorFactory[] comparatorFactories, INormalizedKeyComputerFactory firstNormalizerFactory,
+            IAggregatorDescriptorFactory aggregatorFactory, RecordDescriptor inRecordDescriptor,
+            RecordDescriptor outRecordDescriptor, ISpillableTableFactory spillableTableFactory) {
+        this.ctx = ctx;
+        this.stateId = stateId;
+        this.framesLimit = framesLimit;
+        this.aggregatorFactory = aggregatorFactory;
+        this.keyFields = keyFields;
+        this.comparatorFactories = comparatorFactories;
+        this.firstNormalizerFactory = firstNormalizerFactory;
+        this.spillableTableFactory = spillableTableFactory;
+        this.inRecordDescriptor = inRecordDescriptor;
+        this.outRecordDescriptor = outRecordDescriptor;
+        this.accessor = new FrameTupleAccessor(ctx.getFrameSize(), inRecordDescriptor);
+    }
+
+    @Override
+    public void open() throws HyracksDataException {
+        state = new ExternalGroupState(ctx.getJobletContext().getJobId(), stateId);
+        state.setRuns(new LinkedList<RunFileReader>());
+        ISpillableTable table = spillableTableFactory.buildSpillableTable(ctx, keyFields, comparatorFactories,
+                firstNormalizerFactory, aggregatorFactory, inRecordDescriptor, outRecordDescriptor, framesLimit);
+        table.reset();
+        state.setSpillableTable(table);
+    }
+
+    @Override
+    public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+        accessor.reset(buffer);
+        int tupleCount = accessor.getTupleCount();
+        ISpillableTable gTable = state.getSpillableTable();
+        for (int i = 0; i < tupleCount; i++) {
+            /**
+             * If the group table is too large, flush the table into
+             * a run file.
+             */
+            if (!gTable.insert(accessor, i)) {
+                flushFramesToRun();
+                if (!gTable.insert(accessor, i))
+                    throw new HyracksDataException("Failed to insert a new buffer into the aggregate operator!");
+            }
+        }
+    }
+
+    @Override
+    public void fail() throws HyracksDataException {
+        //do nothing for failures
+    }
+
+    @Override
+    public void close() throws HyracksDataException {
+        ISpillableTable gTable = state.getSpillableTable();
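+        // If any groups were spilled earlier, flush the remaining in-memory
+        // groups to one final run; otherwise the table stays in memory and is
+        // handed to the merge activity through the state object.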
+        if (gTable.getFrameCount() >= 0) {
+            if (state.getRuns().size() > 0) {
+                /**
+                 * flush the memory into the run file.
+                 */
+                flushFramesToRun();
+                gTable.close();
+                gTable = null;
+            }
+        }
+        ctx.setStateObject(state);
+    }
+
+    private void flushFramesToRun() throws HyracksDataException {
+        FileReference runFile;
+        try {
+            runFile = ctx.getJobletContext().createManagedWorkspaceFile(
+                    ExternalGroupOperatorDescriptor.class.getSimpleName());
+        } catch (IOException e) {
+            throw new HyracksDataException(e);
+        }
+        RunFileWriter writer = new RunFileWriter(runFile, ctx.getIOManager());
+        writer.open();
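+        // Sort the in-memory groups and spill them as one sorted run file.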
+        ISpillableTable gTable = state.getSpillableTable();
+        try {
+            gTable.sortFrames();
+            gTable.flushFrames(writer, true);
+        } catch (Exception ex) {
+            throw new HyracksDataException(ex);
+        } finally {
+            writer.close();
+        }
+        gTable.reset();
+        state.getRuns().add(writer.createReader());
+    }
+}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ExternalGroupMergeOperatorNodePushable.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ExternalGroupMergeOperatorNodePushable.java
new file mode 100644
index 0000000..d08ad0e
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ExternalGroupMergeOperatorNodePushable.java
@@ -0,0 +1,458 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.group;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.LinkedList;
+import java.util.List;
+
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+import edu.uci.ics.hyracks.dataflow.common.io.RunFileReader;
+import edu.uci.ics.hyracks.dataflow.common.io.RunFileWriter;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
+import edu.uci.ics.hyracks.dataflow.std.util.ReferenceEntry;
+import edu.uci.ics.hyracks.dataflow.std.util.ReferencedPriorityQueue;
+
+class ExternalGroupMergeOperatorNodePushable extends AbstractUnaryOutputSourceOperatorNodePushable {
+    private final IHyracksTaskContext ctx;
+    private final Object stateId;
+    private final int[] keyFields;
+    private final IBinaryComparator[] comparators;
+    private final AggregateState aggregateState;
+    private final ArrayTupleBuilder tupleBuilder;
+    private final int[] storedKeys;
+    private final IAggregatorDescriptor aggregator;
+    private final boolean isOutputSorted;
+    private final int framesLimit;
+    private final RecordDescriptor outRecordDescriptor;
+    /**
+     * Input frames: a window of runFrameLimit frames per run file.
+     */
+    private List<ByteBuffer> inFrames;
+    /**
+     * Output frame.
+     */
+    private ByteBuffer outFrame, writerFrame;
+    private final FrameTupleAppender outAppender;
+    private FrameTupleAppender writerAppender;
+    private LinkedList<RunFileReader> runs;
+    private ExternalGroupState aggState;
+    private ArrayTupleBuilder finalTupleBuilder;
+    /**
+     * how many frames to read ahead at once
+     */
+    private int runFrameLimit = 1;
+    private int[] currentFrameIndexInRun;
+    private int[] currentRunFrames;
+    private final FrameTupleAccessor outFrameAccessor;
+
+    ExternalGroupMergeOperatorNodePushable(IHyracksTaskContext ctx, Object stateId,
+            IBinaryComparatorFactory[] comparatorFactories, int[] keyFields,
+            IAggregatorDescriptorFactory mergerFactory, boolean isOutputSorted, int framesLimit,
+            RecordDescriptor outRecordDescriptor) throws HyracksDataException {
+        this.stateId = stateId;
+        this.keyFields = keyFields;
+        comparators = new IBinaryComparator[comparatorFactories.length];
+        for (int i = 0; i < comparatorFactories.length; ++i) {
+            comparators[i] = comparatorFactories[i].createBinaryComparator();
+        }
+        int[] keyFieldsInPartialResults = new int[keyFields.length];
+        for (int i = 0; i < keyFieldsInPartialResults.length; i++) {
+            keyFieldsInPartialResults[i] = i;
+        }
+
+        aggregator = mergerFactory.createAggregator(ctx, outRecordDescriptor, outRecordDescriptor, keyFields,
+                keyFieldsInPartialResults);
+        aggregateState = aggregator.createAggregateStates();
+
+        storedKeys = new int[keyFields.length];
+        /**
+         * Get the list of the fields in the stored records.
+         */
+        for (int i = 0; i < keyFields.length; ++i) {
+            storedKeys[i] = i;
+        }
+
+        tupleBuilder = new ArrayTupleBuilder(outRecordDescriptor.getFields().length);
+        this.ctx = ctx;
+        outAppender = new FrameTupleAppender(ctx.getFrameSize());
+        outFrameAccessor = new FrameTupleAccessor(ctx.getFrameSize(), outRecordDescriptor);
+        this.isOutputSorted = isOutputSorted;
+        this.framesLimit = framesLimit;
+        this.outRecordDescriptor = outRecordDescriptor;
+    }
+
+    @Override
+    public void initialize() throws HyracksDataException {
+        aggState = (ExternalGroupState) ctx.getStateObject(stateId);
+        runs = aggState.getRuns();
+        writer.open();
+        try {
+            if (runs.size() <= 0) {
+                ISpillableTable gTable = aggState.getSpillableTable();
+                if (gTable != null) {
+                    if (isOutputSorted)
+                        gTable.sortFrames();
+                    gTable.flushFrames(writer, false);
+                }
+                gTable = null;
+                aggState = null;
+            } else {
+                aggState = null;
+                runs = new LinkedList<RunFileReader>(runs);
+                inFrames = new ArrayList<ByteBuffer>();
+                outFrame = ctx.allocateFrame();
+                outAppender.reset(outFrame, true);
+                outFrameAccessor.reset(outFrame);
+                while (runs.size() > 0) {
+                    try {
+                        doPass(runs);
+                    } catch (Exception e) {
+                        throw new HyracksDataException(e);
+                    }
+                }
+                inFrames.clear();
+            }
+        } catch (Exception e) {
+            writer.fail();
+            throw new HyracksDataException(e);
+        } finally {
+            aggregateState.close();
+            writer.close();
+        }
+    }
+
+    private void doPass(LinkedList<RunFileReader> runs) throws HyracksDataException {
+        FileReference newRun = null;
+        IFrameWriter writer = this.writer;
+        boolean finalPass = false;
+
+        while (inFrames.size() + 2 < framesLimit) {
+            inFrames.add(ctx.allocateFrame());
+        }
+        int runNumber;
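+        // Two frames are reserved for output buffering; if every remaining
+        // run gets at least one input frame, this pass can merge directly to
+        // the final output instead of a new run file.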
+        if (runs.size() + 2 <= framesLimit) {
+            finalPass = true;
+            runFrameLimit = (framesLimit - 2) / runs.size();
+            runNumber = runs.size();
+        } else {
+            runNumber = framesLimit - 2;
+            newRun = ctx.getJobletContext().createManagedWorkspaceFile(
+                    ExternalGroupOperatorDescriptor.class.getSimpleName());
+            writer = new RunFileWriter(newRun, ctx.getIOManager());
+            writer.open();
+        }
+        try {
+            currentFrameIndexInRun = new int[runNumber];
+            currentRunFrames = new int[runNumber];
+            /**
+             * Create file readers for each input run file, but only
+             * for the ones that fit into the inFrames.
+             */
+            RunFileReader[] runFileReaders = new RunFileReader[runNumber];
+            FrameTupleAccessor[] tupleAccessors = new FrameTupleAccessor[inFrames.size()];
+            Comparator<ReferenceEntry> comparator = createEntryComparator(comparators);
+            ReferencedPriorityQueue topTuples = new ReferencedPriorityQueue(ctx.getFrameSize(), outRecordDescriptor,
+                    runNumber, comparator);
+            /**
+             * current tuple index in each run
+             */
+            int[] tupleIndices = new int[runNumber];
+
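+            // Seed the priority queue: load the first batch of frames from
+            // each run and push each run's first tuple.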
+            for (int i = 0; i < runNumber; i++) {
+                int runIndex = topTuples.peek().getRunid();
+                tupleIndices[runIndex] = 0;
+                // Load the run file
+                runFileReaders[runIndex] = runs.get(runIndex);
+                runFileReaders[runIndex].open();
+
+                currentRunFrames[runIndex] = 0;
+                currentFrameIndexInRun[runIndex] = runIndex * runFrameLimit;
+                for (int j = 0; j < runFrameLimit; j++) {
+                    int frameIndex = currentFrameIndexInRun[runIndex] + j;
+                    if (runFileReaders[runIndex].nextFrame(inFrames.get(frameIndex))) {
+                        tupleAccessors[frameIndex] = new FrameTupleAccessor(ctx.getFrameSize(), outRecordDescriptor);
+                        tupleAccessors[frameIndex].reset(inFrames.get(frameIndex));
+                        currentRunFrames[runIndex]++;
+                        if (j == 0)
+                            setNextTopTuple(runIndex, tupleIndices, runFileReaders, tupleAccessors, topTuples);
+                    } else {
+                        break;
+                    }
+                }
+            }
+
+            /**
+             * Start merging
+             */
+            while (!topTuples.areRunsExhausted()) {
+                /**
+                 * Get the top record
+                 */
+                ReferenceEntry top = topTuples.peek();
+                int tupleIndex = top.getTupleIndex();
+                int runIndex = topTuples.peek().getRunid();
+                FrameTupleAccessor fta = top.getAccessor();
+
+                int currentTupleInOutFrame = outFrameAccessor.getTupleCount() - 1;
+                if (currentTupleInOutFrame < 0
+                        || compareFrameTuples(fta, tupleIndex, outFrameAccessor, currentTupleInOutFrame) != 0) {
+                    /**
+                     * Initialize the first output record: reset the
+                     * tuple builder.
+                     */
+
+                    tupleBuilder.reset();
+
+                    for (int k = 0; k < storedKeys.length; k++) {
+                        tupleBuilder.addField(fta, tupleIndex, storedKeys[k]);
+                    }
+
+                    aggregator.init(tupleBuilder, fta, tupleIndex, aggregateState);
+
+                    if (!outAppender.appendSkipEmptyField(tupleBuilder.getFieldEndOffsets(),
+                            tupleBuilder.getByteArray(), 0, tupleBuilder.getSize())) {
+                        flushOutFrame(writer, finalPass);
+                        if (!outAppender.appendSkipEmptyField(tupleBuilder.getFieldEndOffsets(),
+                                tupleBuilder.getByteArray(), 0, tupleBuilder.getSize())) {
+                            throw new HyracksDataException(
+                                    "The partial result is too large to be initialized in a frame.");
+                        }
+                    }
+
+                } else {
+                    /**
+                     * If the new tuple is in the same group as the
+                     * current one, merge it and output to the
+                     * outFrame.
+                     */
+
+                    aggregator.aggregate(fta, tupleIndex, outFrameAccessor, currentTupleInOutFrame, aggregateState);
+
+                }
+                tupleIndices[runIndex]++;
+                setNextTopTuple(runIndex, tupleIndices, runFileReaders, tupleAccessors, topTuples);
+            }
+
+            if (outAppender.getTupleCount() > 0) {
+                flushOutFrame(writer, finalPass);
+                outAppender.reset(outFrame, true);
+            }
+
+            aggregator.close();
+
+            runs.subList(0, runNumber).clear();
+            /**
+             * insert the new run file into the beginning of the run
+             * file list
+             */
+            if (!finalPass) {
+                runs.add(0, ((RunFileWriter) writer).createReader());
+            }
+        } finally {
+            if (!finalPass) {
+                writer.close();
+            }
+        }
+    }
+
+    private void flushOutFrame(IFrameWriter writer, boolean isFinal) throws HyracksDataException {
+
+        if (finalTupleBuilder == null) {
+            finalTupleBuilder = new ArrayTupleBuilder(outRecordDescriptor.getFields().length);
+        }
+
+        if (writerFrame == null) {
+            writerFrame = ctx.allocateFrame();
+        }
+
+        if (writerAppender == null) {
+            writerAppender = new FrameTupleAppender(ctx.getFrameSize());
+            writerAppender.reset(writerFrame, true);
+        }
+
+        outFrameAccessor.reset(outFrame);
+
+        for (int i = 0; i < outFrameAccessor.getTupleCount(); i++) {
+
+            finalTupleBuilder.reset();
+
+            for (int k = 0; k < storedKeys.length; k++) {
+                finalTupleBuilder.addField(outFrameAccessor, i, storedKeys[k]);
+            }
+
+            if (isFinal) {
+
+                aggregator.outputFinalResult(finalTupleBuilder, outFrameAccessor, i, aggregateState);
+
+            } else {
+
+                aggregator.outputPartialResult(finalTupleBuilder, outFrameAccessor, i, aggregateState);
+            }
+
+            if (!writerAppender.appendSkipEmptyField(finalTupleBuilder.getFieldEndOffsets(),
+                    finalTupleBuilder.getByteArray(), 0, finalTupleBuilder.getSize())) {
+                FrameUtils.flushFrame(writerFrame, writer);
+                writerAppender.reset(writerFrame, true);
+                if (!writerAppender.appendSkipEmptyField(finalTupleBuilder.getFieldEndOffsets(),
+                        finalTupleBuilder.getByteArray(), 0, finalTupleBuilder.getSize())) {
+                    throw new HyracksDataException("Aggregation output is too large to be fit into a frame.");
+                }
+            }
+        }
+        if (writerAppender.getTupleCount() > 0) {
+            FrameUtils.flushFrame(writerFrame, writer);
+            writerAppender.reset(writerFrame, true);
+        }
+
+        outAppender.reset(outFrame, true);
+    }
+
+    private void setNextTopTuple(int runIndex, int[] tupleIndices, RunFileReader[] runCursors,
+            FrameTupleAccessor[] tupleAccessors, ReferencedPriorityQueue topTuples) throws HyracksDataException {
+        int runStart = runIndex * runFrameLimit;
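+        // Each run owns a window of runFrameLimit consecutive slots in
+        // inFrames, starting at runStart.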
+        boolean existNext = false;
+        if (tupleAccessors[currentFrameIndexInRun[runIndex]] == null || runCursors[runIndex] == null) {
+            /**
+             * run already closed
+             */
+            existNext = false;
+        } else if (currentFrameIndexInRun[runIndex] - runStart < currentRunFrames[runIndex] - 1) {
+            /**
+             * not the last frame for this run
+             */
+            existNext = true;
+            if (tupleIndices[runIndex] >= tupleAccessors[currentFrameIndexInRun[runIndex]].getTupleCount()) {
+                tupleIndices[runIndex] = 0;
+                currentFrameIndexInRun[runIndex]++;
+            }
+        } else if (tupleIndices[runIndex] < tupleAccessors[currentFrameIndexInRun[runIndex]].getTupleCount()) {
+            /**
+             * tuples remain in the last loaded frame of this run
+             */
+            existNext = true;
+        } else {
+            /**
+             * If all tuples in the targeting frame have been
+             * checked.
+             */
+            tupleIndices[runIndex] = 0;
+            currentFrameIndexInRun[runIndex] = runStart;
+            /**
+             * read in batch
+             */
+            currentRunFrames[runIndex] = 0;
+            for (int j = 0; j < runFrameLimit; j++) {
+                int frameIndex = currentFrameIndexInRun[runIndex] + j;
+                if (runCursors[runIndex].nextFrame(inFrames.get(frameIndex))) {
+                    tupleAccessors[frameIndex].reset(inFrames.get(frameIndex));
+                    existNext = true;
+                    currentRunFrames[runIndex]++;
+                } else {
+                    break;
+                }
+            }
+        }
+
+        if (existNext) {
+            topTuples.popAndReplace(tupleAccessors[currentFrameIndexInRun[runIndex]], tupleIndices[runIndex]);
+        } else {
+            topTuples.pop();
+            closeRun(runIndex, runCursors, tupleAccessors);
+        }
+    }
+
+    /**
+     * Close the run file, and also the corresponding readers and
+     * input frame.
+     * 
+     * @param index
+     * @param runCursors
+     * @param tupleAccessor
+     * @throws HyracksDataException
+     */
+    private void closeRun(int index, RunFileReader[] runCursors, IFrameTupleAccessor[] tupleAccessor)
+            throws HyracksDataException {
+        if (runCursors[index] != null) {
+            runCursors[index].close();
+            runCursors[index] = null;
+            int frameOffset = index * runFrameLimit;
+            for (int j = 0; j < runFrameLimit; j++) {
+                tupleAccessor[frameOffset + j] = null;
+            }
+        }
+    }
+
+    private int compareFrameTuples(IFrameTupleAccessor fta1, int j1, IFrameTupleAccessor fta2, int j2) {
+        byte[] b1 = fta1.getBuffer().array();
+        byte[] b2 = fta2.getBuffer().array();
+        for (int f = 0; f < keyFields.length; ++f) {
+            int fIdx = f;
+            int s1 = fta1.getTupleStartOffset(j1) + fta1.getFieldSlotsLength() + fta1.getFieldStartOffset(j1, fIdx);
+            int l1 = fta1.getFieldLength(j1, fIdx);
+            int s2 = fta2.getTupleStartOffset(j2) + fta2.getFieldSlotsLength() + fta2.getFieldStartOffset(j2, fIdx);
+            int l2 = fta2.getFieldLength(j2, fIdx);
+            int c = comparators[f].compare(b1, s1, l1, b2, s2, l2);
+            if (c != 0) {
+                return c;
+            }
+        }
+        return 0;
+    }
+
+    private Comparator<ReferenceEntry> createEntryComparator(final IBinaryComparator[] comparators) {
+        return new Comparator<ReferenceEntry>() {
+
+            @Override
+            public int compare(ReferenceEntry o1, ReferenceEntry o2) {
+                FrameTupleAccessor fta1 = (FrameTupleAccessor) o1.getAccessor();
+                FrameTupleAccessor fta2 = (FrameTupleAccessor) o2.getAccessor();
+                int j1 = o1.getTupleIndex();
+                int j2 = o2.getTupleIndex();
+                byte[] b1 = fta1.getBuffer().array();
+                byte[] b2 = fta2.getBuffer().array();
+                for (int f = 0; f < keyFields.length; ++f) {
+                    int fIdx = f;
+                    int s1 = fta1.getTupleStartOffset(j1) + fta1.getFieldSlotsLength()
+                            + fta1.getFieldStartOffset(j1, fIdx);
+                    int l1 = fta1.getFieldEndOffset(j1, fIdx) - fta1.getFieldStartOffset(j1, fIdx);
+                    int s2 = fta2.getTupleStartOffset(j2) + fta2.getFieldSlotsLength()
+                            + fta2.getFieldStartOffset(j2, fIdx);
+                    int l2 = fta2.getFieldEndOffset(j2, fIdx) - fta2.getFieldStartOffset(j2, fIdx);
+                    int c = comparators[f].compare(b1, s1, l1, b2, s2, l2);
+                    if (c != 0) {
+                        return c;
+                    }
+                }
+                return 0;
+            }
+
+        };
+    }
+}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ExternalGroupOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ExternalGroupOperatorDescriptor.java
index 6f7b706..1045f14 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ExternalGroupOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ExternalGroupOperatorDescriptor.java
@@ -14,44 +14,19 @@
  */
 package edu.uci.ics.hyracks.dataflow.std.group;
 
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Comparator;
-import java.util.LinkedList;
-import java.util.List;
-
-import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
-import edu.uci.ics.hyracks.api.comm.IFrameWriter;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.dataflow.ActivityId;
 import edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder;
 import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
 import edu.uci.ics.hyracks.api.dataflow.TaskId;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.api.io.FileReference;
-import edu.uci.ics.hyracks.api.job.JobId;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
-import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
-import edu.uci.ics.hyracks.dataflow.common.io.RunFileReader;
-import edu.uci.ics.hyracks.dataflow.common.io.RunFileWriter;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractActivityNode;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.base.AbstractTaskState;
-import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
-import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
-import edu.uci.ics.hyracks.dataflow.std.util.ReferenceEntry;
-import edu.uci.ics.hyracks.dataflow.std.util.ReferencedPriorityQueue;
 
 /**
  *
@@ -123,29 +98,6 @@
         builder.addBlockingEdge(aggregateAct, mergeAct);
     }
 
-    public static class AggregateActivityState extends AbstractTaskState {
-        private LinkedList<RunFileReader> runs;
-
-        private ISpillableTable gTable;
-
-        public AggregateActivityState() {
-        }
-
-        private AggregateActivityState(JobId jobId, TaskId tId) {
-            super(jobId, tId);
-        }
-
-        @Override
-        public void toBytes(DataOutput out) throws IOException {
-            throw new UnsupportedOperationException();
-        }
-
-        @Override
-        public void fromBytes(DataInput in) throws IOException {
-            throw new UnsupportedOperationException();
-        }
-    }
-
     private class AggregateActivity extends AbstractActivityNode {
         private static final long serialVersionUID = 1L;
 
@@ -157,84 +109,10 @@
         public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
                 final IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions)
                 throws HyracksDataException {
-            final FrameTupleAccessor accessor = new FrameTupleAccessor(ctx.getFrameSize(),
-                    recordDescProvider.getInputRecordDescriptor(getOperatorId(), 0));
-            IOperatorNodePushable op = new AbstractUnaryInputSinkOperatorNodePushable() {
-                private AggregateActivityState state;
-
-                @Override
-                public void open() throws HyracksDataException {
-                    state = new AggregateActivityState(ctx.getJobletContext().getJobId(), new TaskId(getActivityId(),
-                            partition));
-                    state.runs = new LinkedList<RunFileReader>();
-                    state.gTable = spillableTableFactory.buildSpillableTable(ctx, keyFields, comparatorFactories,
-                            firstNormalizerFactory, aggregatorFactory,
-                            recordDescProvider.getInputRecordDescriptor(getOperatorId(), 0), recordDescriptors[0],
-                            ExternalGroupOperatorDescriptor.this.framesLimit);
-                    state.gTable.reset();
-                }
-
-                @Override
-                public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
-                    accessor.reset(buffer);
-                    int tupleCount = accessor.getTupleCount();
-                    for (int i = 0; i < tupleCount; i++) {
-                        /**
-                         * If the group table is too large, flush the table into
-                         * a run file.
-                         */
-                        if (!state.gTable.insert(accessor, i)) {
-                            flushFramesToRun();
-                            if (!state.gTable.insert(accessor, i))
-                                throw new HyracksDataException(
-                                        "Failed to insert a new buffer into the aggregate operator!");
-                        }
-                    }
-                }
-
-                @Override
-                public void fail() throws HyracksDataException {
-                    //do nothing for failures
-                }
-
-                @Override
-                public void close() throws HyracksDataException {
-                    if (state.gTable.getFrameCount() >= 0) {
-                        if (state.runs.size() > 0) {
-                            /**
-                             * flush the memory into the run file.
-                             */
-                            flushFramesToRun();
-                            state.gTable.close();
-                            state.gTable = null;
-                        }
-                    }
-                    ctx.setTaskState(state);
-                }
-
-                private void flushFramesToRun() throws HyracksDataException {
-                    FileReference runFile;
-                    try {
-                        runFile = ctx.getJobletContext().createManagedWorkspaceFile(
-                                ExternalGroupOperatorDescriptor.class.getSimpleName());
-                    } catch (IOException e) {
-                        throw new HyracksDataException(e);
-                    }
-                    RunFileWriter writer = new RunFileWriter(runFile, ctx.getIOManager());
-                    writer.open();
-                    try {
-                        state.gTable.sortFrames();
-                        state.gTable.flushFrames(writer, true);
-                    } catch (Exception ex) {
-                        throw new HyracksDataException(ex);
-                    } finally {
-                        writer.close();
-                    }
-                    state.gTable.reset();
-                    state.runs.add(((RunFileWriter) writer).createReader());
-                }
-            };
-            return op;
+            return new ExternalGroupBuildOperatorNodePushable(ctx, new TaskId(getActivityId(), partition), keyFields,
+                    framesLimit, comparatorFactories, firstNormalizerFactory, aggregatorFactory,
+                    recordDescProvider.getInputRecordDescriptor(getOperatorId(), 0), recordDescriptors[0],
+                    spillableTableFactory);
         }
     }
 
@@ -249,423 +127,9 @@
         public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
                 IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions)
                 throws HyracksDataException {
-            final IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
-            for (int i = 0; i < comparatorFactories.length; ++i) {
-                comparators[i] = comparatorFactories[i].createBinaryComparator();
-            }
-
-            int[] keyFieldsInPartialResults = new int[keyFields.length];
-            for (int i = 0; i < keyFieldsInPartialResults.length; i++) {
-                keyFieldsInPartialResults[i] = i;
-            }
-
-            final IAggregatorDescriptor aggregator = mergerFactory.createAggregator(ctx, recordDescriptors[0],
-                    recordDescriptors[0], keyFields, keyFieldsInPartialResults);
-            final AggregateState aggregateState = aggregator.createAggregateStates();
-
-            final int[] storedKeys = new int[keyFields.length];
-            /**
-             * Get the list of the fields in the stored records.
-             */
-            for (int i = 0; i < keyFields.length; ++i) {
-                storedKeys[i] = i;
-            }
-
-            final ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(recordDescriptors[0].getFields().length);
-
-            IOperatorNodePushable op = new AbstractUnaryOutputSourceOperatorNodePushable() {
-                /**
-                 * Input frames, one for each run file.
-                 */
-                private List<ByteBuffer> inFrames;
-
-                /**
-                 * Output frame.
-                 */
-                private ByteBuffer outFrame, writerFrame;
-                private final FrameTupleAppender outAppender = new FrameTupleAppender(ctx.getFrameSize());
-                private FrameTupleAppender writerAppender;
-
-                private LinkedList<RunFileReader> runs;
-
-                private AggregateActivityState aggState;
-
-                private ArrayTupleBuilder finalTupleBuilder;
-
-                /**
-                 * how many frames to be read ahead once
-                 */
-                private int runFrameLimit = 1;
-
-                private int[] currentFrameIndexInRun;
-                private int[] currentRunFrames;
-
-                private final FrameTupleAccessor outFrameAccessor = new FrameTupleAccessor(ctx.getFrameSize(),
-                        recordDescriptors[0]);
-
-                public void initialize() throws HyracksDataException {
-                    aggState = (AggregateActivityState) ctx.getTaskState(new TaskId(new ActivityId(getOperatorId(),
-                            AGGREGATE_ACTIVITY_ID), partition));
-                    runs = aggState.runs;
-                    writer.open();
-                    try {
-                        if (runs.size() <= 0) {
-                            ISpillableTable gTable = aggState.gTable;
-                            if (gTable != null) {
-                                if (isOutputSorted)
-                                    gTable.sortFrames();
-                                gTable.flushFrames(writer, false);
-                            }
-                            gTable = null;
-                            aggState = null;
-                        } else {
-                            aggState = null;
-                            runs = new LinkedList<RunFileReader>(runs);
-                            inFrames = new ArrayList<ByteBuffer>();
-                            outFrame = ctx.allocateFrame();
-                            outAppender.reset(outFrame, true);
-                            outFrameAccessor.reset(outFrame);
-                            while (runs.size() > 0) {
-                                try {
-                                    doPass(runs);
-                                } catch (Exception e) {
-                                    throw new HyracksDataException(e);
-                                }
-                            }
-                            inFrames.clear();
-                        }
-                    } catch (Exception e) {
-                        writer.fail();
-                        throw new HyracksDataException(e);
-                    } finally {
-                        aggregateState.close();
-                        writer.close();
-                    }
-                }
-
-                private void doPass(LinkedList<RunFileReader> runs) throws HyracksDataException {
-                    FileReference newRun = null;
-                    IFrameWriter writer = this.writer;
-                    boolean finalPass = false;
-
-                    while (inFrames.size() + 2 < framesLimit) {
-                        inFrames.add(ctx.allocateFrame());
-                    }
-                    int runNumber;
-                    if (runs.size() + 2 <= framesLimit) {
-                        finalPass = true;
-                        runFrameLimit = (framesLimit - 2) / runs.size();
-                        runNumber = runs.size();
-                    } else {
-                        runNumber = framesLimit - 2;
-                        newRun = ctx.getJobletContext().createManagedWorkspaceFile(
-                                ExternalGroupOperatorDescriptor.class.getSimpleName());
-                        writer = new RunFileWriter(newRun, ctx.getIOManager());
-                        writer.open();
-                    }
-                    try {
-                        currentFrameIndexInRun = new int[runNumber];
-                        currentRunFrames = new int[runNumber];
-                        /**
-                         * Create file readers for each input run file, only for
-                         * the ones fit into the inFrames
-                         */
-                        RunFileReader[] runFileReaders = new RunFileReader[runNumber];
-                        FrameTupleAccessor[] tupleAccessors = new FrameTupleAccessor[inFrames.size()];
-                        Comparator<ReferenceEntry> comparator = createEntryComparator(comparators);
-                        ReferencedPriorityQueue topTuples = new ReferencedPriorityQueue(ctx.getFrameSize(),
-                                recordDescriptors[0], runNumber, comparator);
-                        /**
-                         * current tuple index in each run
-                         */
-                        int[] tupleIndices = new int[runNumber];
-
-                        for (int i = 0; i < runNumber; i++) {
-                            int runIndex = topTuples.peek().getRunid();
-                            tupleIndices[runIndex] = 0;
-                            // Load the run file
-                            runFileReaders[runIndex] = runs.get(runIndex);
-                            runFileReaders[runIndex].open();
-
-                            currentRunFrames[runIndex] = 0;
-                            currentFrameIndexInRun[runIndex] = runIndex * runFrameLimit;
-                            for (int j = 0; j < runFrameLimit; j++) {
-                                int frameIndex = currentFrameIndexInRun[runIndex] + j;
-                                if (runFileReaders[runIndex].nextFrame(inFrames.get(frameIndex))) {
-                                    tupleAccessors[frameIndex] = new FrameTupleAccessor(ctx.getFrameSize(),
-                                            recordDescriptors[0]);
-                                    tupleAccessors[frameIndex].reset(inFrames.get(frameIndex));
-                                    currentRunFrames[runIndex]++;
-                                    if (j == 0)
-                                        setNextTopTuple(runIndex, tupleIndices, runFileReaders, tupleAccessors,
-                                                topTuples);
-                                } else {
-                                    break;
-                                }
-                            }
-                        }
-
-                        /**
-                         * Start merging
-                         */
-                        while (!topTuples.areRunsExhausted()) {
-                            /**
-                             * Get the top record
-                             */
-                            ReferenceEntry top = topTuples.peek();
-                            int tupleIndex = top.getTupleIndex();
-                            int runIndex = topTuples.peek().getRunid();
-                            FrameTupleAccessor fta = top.getAccessor();
-
-                            int currentTupleInOutFrame = outFrameAccessor.getTupleCount() - 1;
-                            if (currentTupleInOutFrame < 0
-                                    || compareFrameTuples(fta, tupleIndex, outFrameAccessor, currentTupleInOutFrame) != 0) {
-                                /**
-                                 * Initialize the first output record Reset the
-                                 * tuple builder
-                                 */
-
-                                tupleBuilder.reset();
-                                
-                                for(int k = 0; k < storedKeys.length; k++){
-                                	tupleBuilder.addField(fta, tupleIndex, storedKeys[k]);
-                                }
-
-                                aggregator.init(tupleBuilder, fta, tupleIndex, aggregateState);
-
-                                if (!outAppender.appendSkipEmptyField(tupleBuilder.getFieldEndOffsets(),
-                                        tupleBuilder.getByteArray(), 0, tupleBuilder.getSize())) {
-                                    flushOutFrame(writer, finalPass);
-                                    if (!outAppender.appendSkipEmptyField(tupleBuilder.getFieldEndOffsets(),
-                                            tupleBuilder.getByteArray(), 0, tupleBuilder.getSize())) {
-                                        throw new HyracksDataException(
-                                                "The partial result is too large to be initialized in a frame.");
-                                    }
-                                }
-
-                            } else {
-                                /**
-                                 * if new tuple is in the same group of the
-                                 * current aggregator do merge and output to the
-                                 * outFrame
-                                 */
-
-                                aggregator.aggregate(fta, tupleIndex, outFrameAccessor, currentTupleInOutFrame,
-                                        aggregateState);
-
-                            }
-                            tupleIndices[runIndex]++;
-                            setNextTopTuple(runIndex, tupleIndices, runFileReaders, tupleAccessors, topTuples);
-                        }
-
-                        if (outAppender.getTupleCount() > 0) {
-                            flushOutFrame(writer, finalPass);
-                            outAppender.reset(outFrame, true);
-                        }
-
-                        aggregator.close();
-
-                        runs.subList(0, runNumber).clear();
-                        /**
-                         * insert the new run file into the beginning of the run
-                         * file list
-                         */
-                        if (!finalPass) {
-                            runs.add(0, ((RunFileWriter) writer).createReader());
-                        }
-                    } finally {
-                        if (!finalPass) {
-                            writer.close();
-                        }
-                    }
-                }
-
-                private void flushOutFrame(IFrameWriter writer, boolean isFinal) throws HyracksDataException {
-
-                    if (finalTupleBuilder == null) {
-                        finalTupleBuilder = new ArrayTupleBuilder(recordDescriptors[0].getFields().length);
-                    }
-
-                    if (writerFrame == null) {
-                        writerFrame = ctx.allocateFrame();
-                    }
-
-                    if (writerAppender == null) {
-                        writerAppender = new FrameTupleAppender(ctx.getFrameSize());
-                        writerAppender.reset(writerFrame, true);
-                    }
-
-                    outFrameAccessor.reset(outFrame);
-
-                    for (int i = 0; i < outFrameAccessor.getTupleCount(); i++) {
-
-                        finalTupleBuilder.reset();
-
-                        for (int k = 0; k < storedKeys.length; k++) {
-                            finalTupleBuilder.addField(outFrameAccessor, i, storedKeys[k]);
-                        }
-
-                        if (isFinal) {
-
-                            aggregator.outputFinalResult(finalTupleBuilder, outFrameAccessor, i, aggregateState);
-
-                        } else {
-
-                            aggregator.outputPartialResult(finalTupleBuilder, outFrameAccessor, i, aggregateState);
-                        }
-
-                        if (!writerAppender.appendSkipEmptyField(finalTupleBuilder.getFieldEndOffsets(),
-                                finalTupleBuilder.getByteArray(), 0, finalTupleBuilder.getSize())) {
-                            FrameUtils.flushFrame(writerFrame, writer);
-                            writerAppender.reset(writerFrame, true);
-                            if (!writerAppender.appendSkipEmptyField(finalTupleBuilder.getFieldEndOffsets(),
-                                    finalTupleBuilder.getByteArray(), 0, finalTupleBuilder.getSize())) {
-                                throw new HyracksDataException(
-                                        "Aggregation output is too large to fit into a frame.");
-                            }
-                        }
-                    }
-                    if (writerAppender.getTupleCount() > 0) {
-                        FrameUtils.flushFrame(writerFrame, writer);
-                        writerAppender.reset(writerFrame, true);
-                    }
-
-                    outAppender.reset(outFrame, true);
-                }
-
-                private void setNextTopTuple(int runIndex, int[] tupleIndices, RunFileReader[] runCursors,
-                        FrameTupleAccessor[] tupleAccessors, ReferencedPriorityQueue topTuples)
-                        throws HyracksDataException {
-                    int runStart = runIndex * runFrameLimit;
-                    boolean existNext = false;
-                    if (tupleAccessors[currentFrameIndexInRun[runIndex]] == null || runCursors[runIndex] == null) {
-                        /**
-                         * run already closed
-                         */
-                        existNext = false;
-                    } else if (currentFrameIndexInRun[runIndex] - runStart < currentRunFrames[runIndex] - 1) {
-                        /**
-                         * not the last frame for this run
-                         */
-                        existNext = true;
-                        if (tupleIndices[runIndex] >= tupleAccessors[currentFrameIndexInRun[runIndex]].getTupleCount()) {
-                            tupleIndices[runIndex] = 0;
-                            currentFrameIndexInRun[runIndex]++;
-                        }
-                    } else if (tupleIndices[runIndex] < tupleAccessors[currentFrameIndexInRun[runIndex]]
-                            .getTupleCount()) {
-                        /**
-                         * still tuples left in the last loaded frame of this run
-                         */
-                        existNext = true;
-                    } else {
-                        /**
-                         * All tuples in the current frame window have been
-                         * consumed; rewind to the start of this run's window.
-                         */
-                        tupleIndices[runIndex] = 0;
-                        currentFrameIndexInRun[runIndex] = runStart;
-                        /**
-                         * read the next batch of frames
-                         */
-                        currentRunFrames[runIndex] = 0;
-                        for (int j = 0; j < runFrameLimit; j++) {
-                            int frameIndex = currentFrameIndexInRun[runIndex] + j;
-                            if (runCursors[runIndex].nextFrame(inFrames.get(frameIndex))) {
-                                tupleAccessors[frameIndex].reset(inFrames.get(frameIndex));
-                                existNext = true;
-                                currentRunFrames[runIndex]++;
-                            } else {
-                                break;
-                            }
-                        }
-                    }
-
-                    if (existNext) {
-                        topTuples.popAndReplace(tupleAccessors[currentFrameIndexInRun[runIndex]],
-                                tupleIndices[runIndex]);
-                    } else {
-                        topTuples.pop();
-                        closeRun(runIndex, runCursors, tupleAccessors);
-                    }
-                }
-
-                /**
-                 * Close the run file, and also the corresponding readers and
-                 * input frame.
-                 * 
-                 * @param index
-                 * @param runCursors
-                 * @param tupleAccessor
-                 * @throws HyracksDataException
-                 */
-                private void closeRun(int index, RunFileReader[] runCursors, IFrameTupleAccessor[] tupleAccessor)
-                        throws HyracksDataException {
-                    if (runCursors[index] != null) {
-                        runCursors[index].close();
-                        runCursors[index] = null;
-                        int frameOffset = index * runFrameLimit;
-                        for (int j = 0; j < runFrameLimit; j++) {
-                            tupleAccessor[frameOffset + j] = null;
-                        }
-                    }
-                }
-
-                private int compareFrameTuples(IFrameTupleAccessor fta1, int j1, IFrameTupleAccessor fta2, int j2) {
-                    byte[] b1 = fta1.getBuffer().array();
-                    byte[] b2 = fta2.getBuffer().array();
-                    for (int f = 0; f < keyFields.length; ++f) {
-                        int fIdx = f;
-                        int s1 = fta1.getTupleStartOffset(j1) + fta1.getFieldSlotsLength()
-                                + fta1.getFieldStartOffset(j1, fIdx);
-                        int l1 = fta1.getFieldLength(j1, fIdx);
-                        int s2 = fta2.getTupleStartOffset(j2) + fta2.getFieldSlotsLength()
-                                + fta2.getFieldStartOffset(j2, fIdx);
-                        int l2_start = fta2.getFieldStartOffset(j2, fIdx);
-                        int l2_end = fta2.getFieldEndOffset(j2, fIdx);
-                        int l2 = l2_end - l2_start;
-                        int c = comparators[f].compare(b1, s1, l1, b2, s2, l2);
-                        if (c != 0) {
-                            return c;
-                        }
-                    }
-                    return 0;
-                }
-            };
-            return op;
-        }
-
-        private Comparator<ReferenceEntry> createEntryComparator(final IBinaryComparator[] comparators) {
-            return new Comparator<ReferenceEntry>() {
-
-                @Override
-                public int compare(ReferenceEntry o1, ReferenceEntry o2) {
-                    FrameTupleAccessor fta1 = (FrameTupleAccessor) o1.getAccessor();
-                    FrameTupleAccessor fta2 = (FrameTupleAccessor) o2.getAccessor();
-                    int j1 = o1.getTupleIndex();
-                    int j2 = o2.getTupleIndex();
-                    byte[] b1 = fta1.getBuffer().array();
-                    byte[] b2 = fta2.getBuffer().array();
-                    for (int f = 0; f < keyFields.length; ++f) {
-                        int fIdx = f;
-                        int s1 = fta1.getTupleStartOffset(j1) + fta1.getFieldSlotsLength()
-                                + fta1.getFieldStartOffset(j1, fIdx);
-                        int l1 = fta1.getFieldEndOffset(j1, fIdx) - fta1.getFieldStartOffset(j1, fIdx);
-                        int s2 = fta2.getTupleStartOffset(j2) + fta2.getFieldSlotsLength()
-                                + fta2.getFieldStartOffset(j2, fIdx);
-                        int l2 = fta2.getFieldEndOffset(j2, fIdx) - fta2.getFieldStartOffset(j2, fIdx);
-                        int c = comparators[f].compare(b1, s1, l1, b2, s2, l2);
-                        if (c != 0) {
-                            return c;
-                        }
-                    }
-                    return 0;
-                }
-
-            };
+            return new ExternalGroupMergeOperatorNodePushable(ctx, new TaskId(new ActivityId(getOperatorId(),
+                    AGGREGATE_ACTIVITY_ID), partition), comparatorFactories, keyFields, mergerFactory, isOutputSorted,
+                    framesLimit, recordDescriptors[0]);
         }
 
     }
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ExternalGroupState.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ExternalGroupState.java
new file mode 100644
index 0000000..365c333
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ExternalGroupState.java
@@ -0,0 +1,63 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.group;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.LinkedList;
+
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.dataflow.common.io.RunFileReader;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractStateObject;
+
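+/**
+ * State object shared between the external group-by's aggregate and merge
+ * activities: it carries the spilled run file readers and the in-memory
+ * spillable table.
+ */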
+public class ExternalGroupState extends AbstractStateObject {
+    private LinkedList<RunFileReader> runs;
+
+    private ISpillableTable gTable;
+
+    public ExternalGroupState() {
+    }
+
+    ExternalGroupState(JobId jobId, Object id) {
+        super(jobId, id);
+    }
+
+    public LinkedList<RunFileReader> getRuns() {
+        return runs;
+    }
+
+    public void setRuns(LinkedList<RunFileReader> runs) {
+        this.runs = runs;
+    }
+
+    public ISpillableTable getSpillableTable() {
+        return gTable;
+    }
+
+    public void setSpillableTable(ISpillableTable gTable) {
+        this.gTable = gTable;
+    }
+
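+    // This state never leaves the producing node, so (de)serialization is unsupported.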
+    @Override
+    public void toBytes(DataOutput out) throws IOException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void fromBytes(DataInput in) throws IOException {
+        throw new UnsupportedOperationException();
+    }
+}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashGroupBuildOperatorNodePushable.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashGroupBuildOperatorNodePushable.java
new file mode 100644
index 0000000..98617c0
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashGroupBuildOperatorNodePushable.java
@@ -0,0 +1,88 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.group;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
+
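+/**
+ * Build side of the hash group-by: inserts each incoming tuple into a
+ * GroupingHashTable and publishes the table as task state on close.
+ */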
+class HashGroupBuildOperatorNodePushable extends AbstractUnaryInputSinkOperatorNodePushable {
+    private final IHyracksTaskContext ctx;
+    private final FrameTupleAccessor accessor;
+    private final Object stateId;
+    private final int[] keys;
+    private final ITuplePartitionComputerFactory tpcf;
+    private final IBinaryComparatorFactory[] comparatorFactories;
+    private final IAggregatorDescriptorFactory aggregatorFactory;
+    private final int tableSize;
+    private final RecordDescriptor inRecordDescriptor;
+    private final RecordDescriptor outRecordDescriptor;
+
+    private HashGroupState state;
+
+    HashGroupBuildOperatorNodePushable(IHyracksTaskContext ctx, Object stateId, int[] keys,
+            ITuplePartitionComputerFactory tpcf, IBinaryComparatorFactory[] comparatorFactories,
+            IAggregatorDescriptorFactory aggregatorFactory, int tableSize, RecordDescriptor inRecordDescriptor,
+            RecordDescriptor outRecordDescriptor) {
+        this.ctx = ctx;
+        this.accessor = new FrameTupleAccessor(ctx.getFrameSize(), inRecordDescriptor);
+        this.stateId = stateId;
+        this.keys = keys;
+        this.tpcf = tpcf;
+        this.comparatorFactories = comparatorFactories;
+        this.aggregatorFactory = aggregatorFactory;
+        this.tableSize = tableSize;
+        this.inRecordDescriptor = inRecordDescriptor;
+        this.outRecordDescriptor = outRecordDescriptor;
+    }
+
+    @Override
+    public void open() throws HyracksDataException {
+        state = new HashGroupState(ctx.getJobletContext().getJobId(), stateId);
+        state.setHashTable(new GroupingHashTable(ctx, keys, comparatorFactories, tpcf, aggregatorFactory,
+                inRecordDescriptor, outRecordDescriptor, tableSize));
+    }
+
+    @Override
+    public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+        GroupingHashTable table = state.getHashTable();
+        accessor.reset(buffer);
+        int tupleCount = accessor.getTupleCount();
+        for (int i = 0; i < tupleCount; ++i) {
+            try {
+                table.insert(accessor, i);
+            } catch (Exception e) {
+                throw new HyracksDataException(e);
+            }
+        }
+    }
+
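+    // Publish the populated hash table so the output activity can look it up by state id.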
+    @Override
+    public void close() throws HyracksDataException {
+        ctx.setStateObject(state);
+    }
+
+    @Override
+    public void fail() throws HyracksDataException {
+        throw new HyracksDataException("HashGroupOperator failed.");
+    }
+}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashGroupOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashGroupOperatorDescriptor.java
index 49443d1..d9052b1 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashGroupOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashGroupOperatorDescriptor.java
@@ -14,11 +14,6 @@
  */
 package edu.uci.ics.hyracks.dataflow.std.group;
 
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.dataflow.ActivityId;
 import edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder;
@@ -28,15 +23,9 @@
 import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
 import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.api.job.JobId;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractActivityNode;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.base.AbstractTaskState;
-import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
-import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
 
 /**
  *
@@ -89,27 +78,6 @@
         builder.addBlockingEdge(ha, oa);
     }
 
-    public static class HashBuildActivityState extends AbstractTaskState {
-        private GroupingHashTable table;
-
-        public HashBuildActivityState() {
-        }
-
-        private HashBuildActivityState(JobId jobId, TaskId tId) {
-            super(jobId, tId);
-        }
-
-        @Override
-        public void toBytes(DataOutput out) throws IOException {
-
-        }
-
-        @Override
-        public void fromBytes(DataInput in) throws IOException {
-
-        }
-    }
-
     private class HashBuildActivity extends AbstractActivityNode {
         private static final long serialVersionUID = 1L;
 
@@ -120,44 +88,9 @@
         @Override
         public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
                 final IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions) {
-            final FrameTupleAccessor accessor = new FrameTupleAccessor(ctx.getFrameSize(),
-                    recordDescProvider.getInputRecordDescriptor(getOperatorId(), 0));
-            return new AbstractUnaryInputSinkOperatorNodePushable() {
-                private HashBuildActivityState state;
-
-                @Override
-                public void open() throws HyracksDataException {
-                    state = new HashBuildActivityState(ctx.getJobletContext().getJobId(), new TaskId(getActivityId(),
-                            partition));
-                    state.table = new GroupingHashTable(ctx, keys, comparatorFactories, tpcf, aggregatorFactory,
-                            recordDescProvider.getInputRecordDescriptor(getOperatorId(), 0), recordDescriptors[0],
-                            tableSize);
-                }
-
-                @Override
-                public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
-                    accessor.reset(buffer);
-                    int tupleCount = accessor.getTupleCount();
-                    for (int i = 0; i < tupleCount; ++i) {
-                        try {
-                            state.table.insert(accessor, i);
-                        } catch (Exception e) {
-                            System.out.println(e.toString());
-                            throw new HyracksDataException(e);
-                        }
-                    }
-                }
-
-                @Override
-                public void close() throws HyracksDataException {
-                    ctx.setTaskState(state);
-                }
-
-                @Override
-                public void fail() throws HyracksDataException {
-                    throw new HyracksDataException("HashGroupOperator failed.");
-                }
-            };
+            return new HashGroupBuildOperatorNodePushable(ctx, new TaskId(getActivityId(), partition), keys, tpcf,
+                    comparatorFactories, aggregatorFactory, tableSize, recordDescProvider.getInputRecordDescriptor(
+                            getOperatorId(), 0), recordDescriptors[0]);
         }
     }
 
@@ -171,23 +104,8 @@
         @Override
         public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
                 IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions) {
-            return new AbstractUnaryOutputSourceOperatorNodePushable() {
-                @Override
-                public void initialize() throws HyracksDataException {
-                    HashBuildActivityState buildState = (HashBuildActivityState) ctx.getTaskState(new TaskId(
-                            new ActivityId(getOperatorId(), HASH_BUILD_ACTIVITY_ID), partition));
-                    GroupingHashTable table = buildState.table;
-                    writer.open();
-                    try {
-                        table.write(writer);
-                    } catch (Exception e) {
-                        writer.fail();
-                        throw new HyracksDataException(e);
-                    } finally {
-                        writer.close();
-                    }
-                }
-            };
+            return new HashGroupOutputOperatorNodePushable(ctx, new TaskId(new ActivityId(getOperatorId(),
+                    HASH_BUILD_ACTIVITY_ID), partition));
         }
     }
 }
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashGroupOutputOperatorNodePushable.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashGroupOutputOperatorNodePushable.java
new file mode 100644
index 0000000..b120297
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashGroupOutputOperatorNodePushable.java
@@ -0,0 +1,44 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.group;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
+
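+/**
+ * Output side of the hash group-by: looks up the hash table built by the
+ * build activity and streams the aggregated groups to the frame writer.
+ */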
+class HashGroupOutputOperatorNodePushable extends AbstractUnaryOutputSourceOperatorNodePushable {
+    private final IHyracksTaskContext ctx;
+    private final Object stateId;
+
+    HashGroupOutputOperatorNodePushable(IHyracksTaskContext ctx, Object stateId) {
+        this.ctx = ctx;
+        this.stateId = stateId;
+    }
+
+    @Override
+    public void initialize() throws HyracksDataException {
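+        // Retrieve the state object published by the build activity for this partition.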
+        HashGroupState buildState = (HashGroupState) ctx.getStateObject(stateId);
+        GroupingHashTable table = buildState.getHashTable();
+        writer.open();
+        try {
+            table.write(writer);
+        } catch (Exception e) {
+            writer.fail();
+            throw new HyracksDataException(e);
+        } finally {
+            writer.close();
+        }
+    }
+}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashGroupState.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashGroupState.java
new file mode 100644
index 0000000..b0eb5b8
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashGroupState.java
@@ -0,0 +1,51 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.group;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractStateObject;
+
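+/**
+ * Task state wrapping the GroupingHashTable handed from the hash group-by
+ * build activity to the output activity.
+ */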
+public class HashGroupState extends AbstractStateObject {
+    private GroupingHashTable table;
+
+    public HashGroupState() {
+    }
+
+    HashGroupState(JobId jobId, Object id) {
+        super(jobId, id);
+    }
+
+    public GroupingHashTable getHashTable() {
+        return table;
+    }
+
+    public void setHashTable(GroupingHashTable table) {
+        this.table = table;
+    }
+
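+    // The table lives only on the producing node; (de)serialization is a no-op.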
+    @Override
+    public void toBytes(DataOutput out) throws IOException {
+
+    }
+
+    @Override
+    public void fromBytes(DataInput in) throws IOException {
+
+    }
+}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/GraceHashJoinOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/GraceHashJoinOperatorDescriptor.java
index 773e1e5..6a0535b 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/GraceHashJoinOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/GraceHashJoinOperatorDescriptor.java
@@ -14,43 +14,19 @@
  */
 package edu.uci.ics.hyracks.dataflow.std.join;
 
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.dataflow.ActivityId;
 import edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder;
 import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
 import edu.uci.ics.hyracks.api.dataflow.TaskId;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.INullWriter;
 import edu.uci.ics.hyracks.api.dataflow.value.INullWriterFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
-import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputer;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.api.io.FileReference;
-import edu.uci.ics.hyracks.api.job.JobId;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTuplePairComparator;
-import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
-import edu.uci.ics.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
-import edu.uci.ics.hyracks.dataflow.common.data.partition.RepartitionComputerFactory;
-import edu.uci.ics.hyracks.dataflow.common.io.RunFileReader;
-import edu.uci.ics.hyracks.dataflow.common.io.RunFileWriter;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractActivityNode;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.base.AbstractTaskState;
-import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
-import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
-import edu.uci.ics.hyracks.dataflow.std.structures.ISerializableTable;
-import edu.uci.ics.hyracks.dataflow.std.structures.SerializableHashTable;
 
 public class GraceHashJoinOperatorDescriptor extends AbstractOperatorDescriptor {
     private static final int RPARTITION_ACTIVITY_ID = 0;
@@ -129,24 +105,6 @@
         return memsize;
     }
 
-    public static class HashPartitionTaskState extends AbstractTaskState {
-        private RunFileWriter[] fWriters;
-
-        public HashPartitionTaskState(JobId jobId, TaskId taskId) {
-            super(jobId, taskId);
-        }
-
-        @Override
-        public void toBytes(DataOutput out) throws IOException {
-
-        }
-
-        @Override
-        public void fromBytes(DataInput in) throws IOException {
-
-        }
-    }
-
     private class HashPartitionActivityNode extends AbstractActivityNode {
         private static final long serialVersionUID = 1L;
         private int operatorInputIndex;
@@ -159,97 +117,12 @@
         }
 
         @Override
-        public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
-                final IRecordDescriptorProvider recordDescProvider, final int partition, final int nPartitions) {
-            final IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
-            for (int i = 0; i < comparatorFactories.length; ++i) {
-                comparators[i] = comparatorFactories[i].createBinaryComparator();
-            }
-            final int numPartitions = (int) Math.ceil(Math.sqrt(inputsize0 * factor / nPartitions));
-            IOperatorNodePushable op = new AbstractUnaryInputSinkOperatorNodePushable() {
-                private final FrameTupleAccessor accessor0 = new FrameTupleAccessor(ctx.getFrameSize(),
-                        recordDescProvider.getInputRecordDescriptor(getOperatorId(), operatorInputIndex));
-
-                private final ITuplePartitionComputer hpc = new FieldHashPartitionComputerFactory(keys,
-                        hashFunctionFactories).createPartitioner();
-
-                private final FrameTupleAppender appender = new FrameTupleAppender(ctx.getFrameSize());
-
-                private ByteBuffer[] outbufs;
-
-                private HashPartitionTaskState state;
-
-                @Override
-                public void close() throws HyracksDataException {
-                    for (int i = 0; i < numPartitions; i++) {
-                        ByteBuffer head = outbufs[i];
-                        accessor0.reset(head);
-                        if (accessor0.getTupleCount() > 0) {
-                            write(i, head);
-                        }
-                        closeWriter(i);
-                    }
-
-                    ctx.setTaskState(state);
-                }
-
-                private void closeWriter(int i) throws HyracksDataException {
-                    RunFileWriter writer = state.fWriters[i];
-                    if (writer != null) {
-                        writer.close();
-                    }
-                }
-
-                private void write(int i, ByteBuffer head) throws HyracksDataException {
-                    RunFileWriter writer = state.fWriters[i];
-                    if (writer == null) {
-                        FileReference file = ctx.getJobletContext().createManagedWorkspaceFile(
-                                GraceHashJoinOperatorDescriptor.class.getSimpleName());
-                        writer = new RunFileWriter(file, ctx.getIOManager());
-                        writer.open();
-                        state.fWriters[i] = writer;
-                    }
-                    writer.nextFrame(head);
-                }
-
-                @Override
-                public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
-                    accessor0.reset(buffer);
-                    int tCount = accessor0.getTupleCount();
-                    for (int i = 0; i < tCount; ++i) {
-
-                        int entry = hpc.partition(accessor0, i, numPartitions);
-                        ByteBuffer outbuf = outbufs[entry];
-                        appender.reset(outbuf, false);
-                        if (!appender.append(accessor0, i)) {
-                            // buffer is full, i.e. we cannot fit the tuple
-                            // into the buffer -- write it to disk
-                            write(entry, outbuf);
-                            outbuf.clear();
-                            appender.reset(outbuf, true);
-                            if (!appender.append(accessor0, i)) {
-                                throw new HyracksDataException("Item too big to fit in frame");
-                            }
-                        }
-                    }
-                }
-
-                @Override
-                public void open() throws HyracksDataException {
-                    state = new HashPartitionTaskState(ctx.getJobletContext().getJobId(), new TaskId(getActivityId(),
-                            partition));
-                    outbufs = new ByteBuffer[numPartitions];
-                    state.fWriters = new RunFileWriter[numPartitions];
-                    for (int i = 0; i < numPartitions; i++) {
-                        outbufs[i] = ctx.allocateFrame();
-                    }
-                }
-
-                @Override
-                public void fail() throws HyracksDataException {
-                }
-            };
-            return op;
+        public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
+                IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
+            return new GraceHashJoinPartitionBuildOperatorNodePushable(ctx, new TaskId(getActivityId(), partition),
+                    keys, hashFunctionFactories, comparatorFactories, (int) Math.ceil(Math.sqrt(inputsize0 * factor
+                            / nPartitions)), recordDescProvider.getInputRecordDescriptor(getOperatorId(),
+                            operatorInputIndex));
         }
     }
 
@@ -263,92 +136,15 @@
         @Override
         public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
                 final IRecordDescriptorProvider recordDescProvider, final int partition, final int nPartitions) {
-            final IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
-            for (int i = 0; i < comparatorFactories.length; ++i) {
-                comparators[i] = comparatorFactories[i].createBinaryComparator();
-            }
             final RecordDescriptor rd0 = recordDescProvider.getInputRecordDescriptor(getOperatorId(), 0);
             final RecordDescriptor rd1 = recordDescProvider.getInputRecordDescriptor(getOperatorId(), 1);
-            final INullWriter[] nullWriters1 = isLeftOuter ? new INullWriter[nullWriterFactories1.length] : null;
-            if (isLeftOuter) {
-                for (int i = 0; i < nullWriterFactories1.length; i++) {
-                    nullWriters1[i] = nullWriterFactories1[i].createNullWriter();
-                }
-            }
+            int numPartitions = (int) Math.ceil(Math.sqrt(inputsize0 * factor / nPartitions));
 
-            IOperatorNodePushable op = new AbstractUnaryOutputSourceOperatorNodePushable() {
-                private InMemoryHashJoin joiner;
-
-                private RunFileWriter[] buildWriters;
-                private RunFileWriter[] probeWriters;
-                private final int numPartitions = (int) Math.ceil(Math.sqrt(inputsize0 * factor / nPartitions));
-
-                @Override
-                public void initialize() throws HyracksDataException {
-                    HashPartitionTaskState rState = (HashPartitionTaskState) ctx.getTaskState(new TaskId(
-                            new ActivityId(getOperatorId(), RPARTITION_ACTIVITY_ID), partition));
-                    HashPartitionTaskState sState = (HashPartitionTaskState) ctx.getTaskState(new TaskId(
-                            new ActivityId(getOperatorId(), SPARTITION_ACTIVITY_ID), partition));
-                    buildWriters = sState.fWriters;
-                    probeWriters = rState.fWriters;
-
-                    ITuplePartitionComputer hpcRep0 = new RepartitionComputerFactory(numPartitions,
-                            new FieldHashPartitionComputerFactory(keys0, hashFunctionFactories)).createPartitioner();
-                    ITuplePartitionComputer hpcRep1 = new RepartitionComputerFactory(numPartitions,
-                            new FieldHashPartitionComputerFactory(keys1, hashFunctionFactories)).createPartitioner();
-
-                    writer.open(); // open for probe
-
-                    try {
-                        ByteBuffer buffer = ctx.allocateFrame(); // input buffer
-                        int tableSize = (int) (numPartitions * recordsPerFrame * factor);
-                        ISerializableTable table = new SerializableHashTable(tableSize, ctx);
-                        
-                        for (int partitionid = 0; partitionid < numPartitions; partitionid++) {
-                            RunFileWriter buildWriter = buildWriters[partitionid];
-                            RunFileWriter probeWriter = probeWriters[partitionid];
-                            if ((buildWriter == null && !isLeftOuter) || probeWriter == null) {
-                                continue;
-                            }
-                            table.reset();
-                            joiner = new InMemoryHashJoin(ctx, tableSize, new FrameTupleAccessor(ctx.getFrameSize(),
-                                    rd0), hpcRep0, new FrameTupleAccessor(ctx.getFrameSize(), rd1), hpcRep1,
-                                    new FrameTuplePairComparator(keys0, keys1, comparators), isLeftOuter, nullWriters1, table);
-
-                            // build
-                            if (buildWriter != null) {
-                                RunFileReader buildReader = buildWriter.createReader();
-                                buildReader.open();
-                                while (buildReader.nextFrame(buffer)) {
-                                    ByteBuffer copyBuffer = ctx.allocateFrame();
-                                    FrameUtils.copy(buffer, copyBuffer);
-                                    joiner.build(copyBuffer);
-                                    buffer.clear();
-                                }
-                                buildReader.close();
-                            }
-
-                            // probe
-                            RunFileReader probeReader = probeWriter.createReader();
-                            probeReader.open();
-                            while (probeReader.nextFrame(buffer)) {
-                                joiner.join(buffer, writer);
-                                buffer.clear();
-                            }
-                            probeReader.close();
-                            joiner.closeJoin(writer);
-                        }
-                    } catch (Exception e) {
-                        writer.fail();
-                        throw new HyracksDataException(e);
-                    } finally {
-                        writer.close();
-                    }
-                }
-            };
-            return op;
+            return new GraceHashJoinOperatorNodePushable(ctx, new TaskId(new ActivityId(getOperatorId(),
+                    RPARTITION_ACTIVITY_ID), partition), new TaskId(new ActivityId(getOperatorId(),
+                    SPARTITION_ACTIVITY_ID), partition), recordsPerFrame, factor, keys0, keys1, hashFunctionFactories,
+                    comparatorFactories, nullWriterFactories1, rd1, rd0, recordDescriptors[0], numPartitions,
+                    isLeftOuter);
         }
     }
 }
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/GraceHashJoinOperatorNodePushable.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/GraceHashJoinOperatorNodePushable.java
new file mode 100644
index 0000000..91f509d
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/GraceHashJoinOperatorNodePushable.java
@@ -0,0 +1,149 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.join;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.INullWriter;
+import edu.uci.ics.hyracks.api.dataflow.value.INullWriterFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputer;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTuplePairComparator;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+import edu.uci.ics.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.partition.RepartitionComputerFactory;
+import edu.uci.ics.hyracks.dataflow.common.io.RunFileReader;
+import edu.uci.ics.hyracks.dataflow.common.io.RunFileWriter;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
+import edu.uci.ics.hyracks.dataflow.std.structures.ISerializableTable;
+import edu.uci.ics.hyracks.dataflow.std.structures.SerializableHashTable;
+
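+/**
+ * Join phase of the Grace hash join: for each spilled partition, rebuilds an
+ * in-memory hash table from the build-side run file and probes it with the
+ * corresponding probe-side run file.
+ */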
+class GraceHashJoinOperatorNodePushable extends AbstractUnaryOutputSourceOperatorNodePushable {
+    private final IHyracksTaskContext ctx;
+    private final Object state0Id;
+    private final Object state1Id;
+    private final int[] keys0;
+    private final int[] keys1;
+    private final IBinaryHashFunctionFactory[] hashFunctionFactories;
+    private final IBinaryComparatorFactory[] comparatorFactories;
+    private final INullWriterFactory[] nullWriterFactories;
+    private final RecordDescriptor rd0;
+    private final RecordDescriptor rd1;
+    private final int recordsPerFrame;
+    private final double factor;
+    private final int numPartitions;
+    private final boolean isLeftOuter;
+
+    GraceHashJoinOperatorNodePushable(IHyracksTaskContext ctx, Object state0Id, Object state1Id, int recordsPerFrame,
+            double factor, int[] keys0, int[] keys1, IBinaryHashFunctionFactory[] hashFunctionFactories,
+            IBinaryComparatorFactory[] comparatorFactories, INullWriterFactory[] nullWriterFactories,
+            RecordDescriptor rd1, RecordDescriptor rd0, RecordDescriptor outRecordDescriptor, int numPartitions,
+            boolean isLeftOuter) {
+        this.ctx = ctx;
+        this.state0Id = state0Id;
+        this.state1Id = state1Id;
+        this.keys0 = keys0;
+        this.keys1 = keys1;
+        this.hashFunctionFactories = hashFunctionFactories;
+        this.comparatorFactories = comparatorFactories;
+        this.nullWriterFactories = nullWriterFactories;
+        this.rd0 = rd0;
+        this.rd1 = rd1;
+        this.numPartitions = numPartitions;
+        this.recordsPerFrame = recordsPerFrame;
+        this.factor = factor;
+        this.isLeftOuter = isLeftOuter;
+    }
+
+    @Override
+    public void initialize() throws HyracksDataException {
+        GraceHashJoinPartitionState rState = (GraceHashJoinPartitionState) ctx.getStateObject(state0Id);
+        GraceHashJoinPartitionState sState = (GraceHashJoinPartitionState) ctx.getStateObject(state1Id);
+        RunFileWriter[] buildWriters = sState.getRunWriters();
+        RunFileWriter[] probeWriters = rState.getRunWriters();
+
+        IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
+        for (int i = 0; i < comparatorFactories.length; ++i) {
+            comparators[i] = comparatorFactories[i].createBinaryComparator();
+        }
+        ITuplePartitionComputer hpcRep0 = new RepartitionComputerFactory(numPartitions,
+                new FieldHashPartitionComputerFactory(keys0, hashFunctionFactories)).createPartitioner();
+        ITuplePartitionComputer hpcRep1 = new RepartitionComputerFactory(numPartitions,
+                new FieldHashPartitionComputerFactory(keys1, hashFunctionFactories)).createPartitioner();
+
+        final INullWriter[] nullWriters1 = isLeftOuter ? new INullWriter[nullWriterFactories.length] : null;
+        if (isLeftOuter) {
+            for (int i = 0; i < nullWriterFactories.length; i++) {
+                nullWriters1[i] = nullWriterFactories[i].createNullWriter();
+            }
+        }
+
+        writer.open(); // open for probe
+
+        try {
+            ByteBuffer buffer = ctx.allocateFrame(); // input buffer
+            int tableSize = (int) (numPartitions * recordsPerFrame * factor);
+            ISerializableTable table = new SerializableHashTable(tableSize, ctx);
+
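+            // Process one spilled partition at a time: rebuild its hash table, then probe it.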
+            for (int partitionid = 0; partitionid < numPartitions; partitionid++) {
+                RunFileWriter buildWriter = buildWriters[partitionid];
+                RunFileWriter probeWriter = probeWriters[partitionid];
+                if ((buildWriter == null && !isLeftOuter) || probeWriter == null) {
+                    continue;
+                }
+                table.reset();
+                InMemoryHashJoin joiner = new InMemoryHashJoin(ctx, tableSize, new FrameTupleAccessor(
+                        ctx.getFrameSize(), rd0), hpcRep0, new FrameTupleAccessor(ctx.getFrameSize(), rd1), hpcRep1,
+                        new FrameTuplePairComparator(keys0, keys1, comparators), isLeftOuter, nullWriters1, table);
+
+                // build
+                if (buildWriter != null) {
+                    RunFileReader buildReader = buildWriter.createReader();
+                    buildReader.open();
+                    while (buildReader.nextFrame(buffer)) {
+                        ByteBuffer copyBuffer = ctx.allocateFrame();
+                        FrameUtils.copy(buffer, copyBuffer);
+                        joiner.build(copyBuffer);
+                        buffer.clear();
+                    }
+                    buildReader.close();
+                }
+
+                // probe
+                RunFileReader probeReader = probeWriter.createReader();
+                probeReader.open();
+                while (probeReader.nextFrame(buffer)) {
+                    joiner.join(buffer, writer);
+                    buffer.clear();
+                }
+                probeReader.close();
+                joiner.closeJoin(writer);
+            }
+        } catch (Exception e) {
+            writer.fail();
+            throw new HyracksDataException(e);
+        } finally {
+            writer.close();
+        }
+    }
+}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/GraceHashJoinPartitionBuildOperatorNodePushable.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/GraceHashJoinPartitionBuildOperatorNodePushable.java
new file mode 100644
index 0000000..719b736
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/GraceHashJoinPartitionBuildOperatorNodePushable.java
@@ -0,0 +1,128 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.join;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputer;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
+import edu.uci.ics.hyracks.dataflow.common.io.RunFileWriter;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
+
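+/**
+ * Partition phase of the Grace hash join: hash-partitions incoming tuples
+ * into per-partition output frames and spills full frames to run files.
+ */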
+class GraceHashJoinPartitionBuildOperatorNodePushable extends
+        AbstractUnaryInputSinkOperatorNodePushable {
+    private final IHyracksTaskContext ctx;
+    private final Object stateId;
+    private final int numPartitions;
+    private final IBinaryComparator[] comparators;
+    private final FrameTupleAccessor accessor0;
+    private final ITuplePartitionComputer hpc;
+    private final FrameTupleAppender appender;
+    private ByteBuffer[] outbufs;
+    private GraceHashJoinPartitionState state;
+
+    GraceHashJoinPartitionBuildOperatorNodePushable(IHyracksTaskContext ctx, Object stateId, int[] keys,
+            IBinaryHashFunctionFactory[] hashFunctionFactories, IBinaryComparatorFactory[] comparatorFactories,
+            int numPartitions, RecordDescriptor inRecordDescriptor) {
+        this.ctx = ctx;
+        this.stateId = stateId;
+        this.numPartitions = numPartitions;
+        accessor0 = new FrameTupleAccessor(ctx.getFrameSize(), inRecordDescriptor);
+        appender = new FrameTupleAppender(ctx.getFrameSize());
+        hpc = new FieldHashPartitionComputerFactory(keys, hashFunctionFactories).createPartitioner();
+        comparators = new IBinaryComparator[comparatorFactories.length];
+        for (int i = 0; i < comparatorFactories.length; ++i) {
+            comparators[i] = comparatorFactories[i].createBinaryComparator();
+        }
+    }
+
+    @Override
+    public void close() throws HyracksDataException {
+        for (int i = 0; i < numPartitions; i++) {
+            ByteBuffer head = outbufs[i];
+            accessor0.reset(head);
+            if (accessor0.getTupleCount() > 0) {
+                write(i, head);
+            }
+            closeWriter(i);
+        }
+
+        ctx.setStateObject(state);
+    }
+
+    private void closeWriter(int i) throws HyracksDataException {
+        RunFileWriter writer = state.getRunWriters()[i];
+        if (writer != null) {
+            writer.close();
+        }
+    }
+
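+    // Append a frame to partition i's run file, creating the writer lazily on first use.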
+    private void write(int i, ByteBuffer head) throws HyracksDataException {
+        RunFileWriter writer = state.getRunWriters()[i];
+        if (writer == null) {
+            FileReference file = ctx.getJobletContext().createManagedWorkspaceFile(
+                    GraceHashJoinOperatorDescriptor.class.getSimpleName());
+            writer = new RunFileWriter(file, ctx.getIOManager());
+            writer.open();
+            state.getRunWriters()[i] = writer;
+        }
+        writer.nextFrame(head);
+    }
+
+    @Override
+    public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+        accessor0.reset(buffer);
+        int tCount = accessor0.getTupleCount();
+        for (int i = 0; i < tCount; ++i) {
+
+            int entry = hpc.partition(accessor0, i, numPartitions);
+            ByteBuffer outbuf = outbufs[entry];
+            appender.reset(outbuf, false);
+            if (!appender.append(accessor0, i)) {
+                // buffer is full, i.e. we cannot fit the tuple
+                // into the buffer -- write it to disk
+                write(entry, outbuf);
+                outbuf.clear();
+                appender.reset(outbuf, true);
+                if (!appender.append(accessor0, i)) {
+                    throw new HyracksDataException("Item too big to fit in frame");
+                }
+            }
+        }
+    }
+
+    @Override
+    public void open() throws HyracksDataException {
+        state = new GraceHashJoinPartitionState(ctx.getJobletContext().getJobId(), stateId);
+        outbufs = new ByteBuffer[numPartitions];
+        state.setRunWriters(new RunFileWriter[numPartitions]);
+        for (int i = 0; i < numPartitions; i++) {
+            outbufs[i] = ctx.allocateFrame();
+        }
+    }
+
+    @Override
+    public void fail() throws HyracksDataException {
+    }
+}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/GraceHashJoinPartitionState.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/GraceHashJoinPartitionState.java
new file mode 100644
index 0000000..906042e
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/GraceHashJoinPartitionState.java
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.join;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.dataflow.common.io.RunFileWriter;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractStateObject;
+
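+/**
+ * Task state holding the per-partition run file writers produced by the
+ * partition activity and consumed by the join activity.
+ */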
+public class GraceHashJoinPartitionState extends AbstractStateObject {
+    private RunFileWriter[] fWriters;
+
+    public GraceHashJoinPartitionState(JobId jobId, Object id) {
+        super(jobId, id);
+    }
+
+    public RunFileWriter[] getRunWriters() {
+        return fWriters;
+    }
+
+    public void setRunWriters(RunFileWriter[] fWriters) {
+        this.fWriters = fWriters;
+    }
+
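+    // Node-local state; (de)serialization is intentionally a no-op.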
+    @Override
+    public void toBytes(DataOutput out) throws IOException {
+
+    }
+
+    @Override
+    public void fromBytes(DataInput in) throws IOException {
+
+    }
+}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java
index 99f0020..2521830 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java
@@ -47,7 +47,7 @@
 import edu.uci.ics.hyracks.dataflow.common.io.RunFileWriter;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractActivityNode;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.base.AbstractTaskState;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractStateObject;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
 import edu.uci.ics.hyracks.dataflow.std.structures.ISerializableTable;
@@ -138,7 +138,7 @@
         builder.addTargetEdge(0, phase2, 0);
     }
 
-    public static class BuildAndPartitionTaskState extends AbstractTaskState {
+    public static class BuildAndPartitionTaskState extends AbstractStateObject {
         private RunFileWriter[] fWriters;
         private InMemoryHashJoin joiner;
         private int nPartitions;
@@ -211,7 +211,7 @@
                         closeWriter(i);
                     }
 
-                    ctx.setTaskState(state);
+                    ctx.setStateObject(state);
                 }
 
                 @Override
@@ -390,7 +390,7 @@
 
                 @Override
                 public void open() throws HyracksDataException {
-                    state = (BuildAndPartitionTaskState) ctx.getTaskState(new TaskId(new ActivityId(getOperatorId(),
+                    state = (BuildAndPartitionTaskState) ctx.getStateObject(new TaskId(new ActivityId(getOperatorId(),
                             BUILD_AND_PARTITION_ACTIVITY_ID), partition));
                     writer.open();
                     buildWriters = state.fWriters;
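This lookup pattern repeats in every two-activity operator touched by this change: the producer activity registers its state under its own TaskId, and the consumer rebuilds the identical key (same operator id, the producer's activity id, the same partition) to fetch it. In isolation, assuming the constants of the class above:

TaskId stateKey = new TaskId(new ActivityId(getOperatorId(), BUILD_AND_PARTITION_ACTIVITY_ID), partition);
BuildAndPartitionTaskState state = (BuildAndPartitionTaskState) ctx.getStateObject(stateKey);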
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java
index 61fc0b3..89ae09e 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java
@@ -41,7 +41,7 @@
 import edu.uci.ics.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractActivityNode;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.base.AbstractTaskState;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractStateObject;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
 import edu.uci.ics.hyracks.dataflow.std.structures.ISerializableTable;
@@ -105,7 +105,7 @@
         builder.addBlockingEdge(hba, hpa);
     }
 
-    public static class HashBuildTaskState extends AbstractTaskState {
+    public static class HashBuildTaskState extends AbstractStateObject {
         private InMemoryHashJoin joiner;
 
         public HashBuildTaskState() {
@@ -178,7 +178,7 @@
 
                 @Override
                 public void close() throws HyracksDataException {
-                    ctx.setTaskState(state);
+                    ctx.setStateObject(state);
                 }
 
                 @Override
@@ -204,7 +204,7 @@
 
                 @Override
                 public void open() throws HyracksDataException {
-                    state = (HashBuildTaskState) ctx.getTaskState(new TaskId(new ActivityId(getOperatorId(),
+                    state = (HashBuildTaskState) ctx.getStateObject(new TaskId(new ActivityId(getOperatorId(),
                             BUILD_ACTIVITY_ID), partition));
                     writer.open();
                 }
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java
index 21a828c..7082948 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java
@@ -36,7 +36,7 @@
 import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractActivityNode;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.base.AbstractTaskState;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractStateObject;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
 
@@ -72,7 +72,7 @@
         builder.addBlockingEdge(jc, nlj);
     }
 
-    public static class JoinCacheTaskState extends AbstractTaskState {
+    public static class JoinCacheTaskState extends AbstractStateObject {
         private NestedLoopJoin joiner;
 
         public JoinCacheTaskState() {
@@ -129,7 +129,7 @@
                 @Override
                 public void close() throws HyracksDataException {
                     state.joiner.closeCache();
-                    ctx.setTaskState(state);
+                    ctx.setStateObject(state);
                 }
 
                 @Override
@@ -156,7 +156,7 @@
 
                 @Override
                 public void open() throws HyracksDataException {
-                    state = (JoinCacheTaskState) ctx.getTaskState(new TaskId(new ActivityId(getOperatorId(),
+                    state = (JoinCacheTaskState) ctx.getStateObject(new TaskId(new ActivityId(getOperatorId(),
                             JOIN_CACHE_ACTIVITY_ID), partition));
                     writer.open();
                 }
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
index 87e2b2c..fd91b66 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
@@ -47,7 +47,7 @@
 import edu.uci.ics.hyracks.dataflow.common.io.RunFileReader;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractActivityNode;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.base.AbstractTaskState;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractStateObject;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
 import edu.uci.ics.hyracks.dataflow.std.structures.ISerializableTable;
@@ -201,7 +201,7 @@
         return numberOfPartitions;
     }
 
-    public static class BuildAndPartitionTaskState extends AbstractTaskState {
+    public static class BuildAndPartitionTaskState extends AbstractStateObject {
 
         private int memForJoin;
         private int numOfPartitions;
@@ -290,7 +290,7 @@
                 @Override
                 public void close() throws HyracksDataException {
                     state.hybridHJ.closeBuild();
-                    ctx.setTaskState(state);
+                    ctx.setStateObject(state);
                 }
 
                 @Override
@@ -353,7 +353,7 @@
 
                 @Override
                 public void open() throws HyracksDataException {
-                    state = (BuildAndPartitionTaskState) ctx.getTaskState(new TaskId(new ActivityId(getOperatorId(),
+                    state = (BuildAndPartitionTaskState) ctx.getStateObject(new TaskId(new ActivityId(getOperatorId(),
                             BUILD_AND_PARTITION_ACTIVITY_ID), partition));
 
                     writer.open();
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/MaterializingOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/MaterializingOperatorDescriptor.java
index 83160e5..993ff27 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/MaterializingOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/MaterializingOperatorDescriptor.java
@@ -34,7 +34,7 @@
 import edu.uci.ics.hyracks.dataflow.common.io.RunFileWriter;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractActivityNode;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.base.AbstractTaskState;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractStateObject;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
 
@@ -63,7 +63,7 @@
         builder.addBlockingEdge(ma, ra);
     }
 
-    public static class MaterializerTaskState extends AbstractTaskState {
+    public static class MaterializerTaskState extends AbstractStateObject {
         private RunFileWriter out;
 
         public MaterializerTaskState() {
@@ -115,7 +115,7 @@
                 @Override
                 public void close() throws HyracksDataException {
                     state.out.close();
-                    ctx.setTaskState(state);
+                    ctx.setStateObject(state);
                 }
 
                 @Override
@@ -139,7 +139,7 @@
                 @Override
                 public void initialize() throws HyracksDataException {
                     ByteBuffer frame = ctx.allocateFrame();
-                    MaterializerTaskState state = (MaterializerTaskState) ctx.getTaskState(new TaskId(new ActivityId(
+                    MaterializerTaskState state = (MaterializerTaskState) ctx.getStateObject(new TaskId(new ActivityId(
                             getOperatorId(), MATERIALIZER_ACTIVITY_ID), partition));
                     RunFileReader in = state.out.createReader();
                     writer.open();
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/SplitVectorOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/SplitVectorOperatorDescriptor.java
index 3e99000..b2f516b 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/SplitVectorOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/SplitVectorOperatorDescriptor.java
@@ -32,7 +32,7 @@
 import edu.uci.ics.hyracks.api.job.JobSpecification;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractActivityNode;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.base.AbstractTaskState;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractStateObject;
 import edu.uci.ics.hyracks.dataflow.std.base.IOpenableDataWriterOperator;
 import edu.uci.ics.hyracks.dataflow.std.util.DeserializedOperatorNodePushable;
 
@@ -40,7 +40,7 @@
     private static final int COLLECT_ACTIVITY_ID = 0;
     private static final int SPLIT_ACTIVITY_ID = 1;
 
-    public static class CollectTaskState extends AbstractTaskState {
+    public static class CollectTaskState extends AbstractStateObject {
         private ArrayList<Object[]> buffer;
 
         public CollectTaskState() {
@@ -91,7 +91,7 @@
 
                 @Override
                 public void close() throws HyracksDataException {
-                    ctx.setTaskState(state);
+                    ctx.setStateObject(state);
                 }
 
                 @Override
@@ -134,7 +134,7 @@
 
                 @Override
                 public void open() throws HyracksDataException {
-                    state = (CollectTaskState) ctx.getTaskState(new TaskId(new ActivityId(getOperatorId(),
+                    state = (CollectTaskState) ctx.getStateObject(new TaskId(new ActivityId(getOperatorId(),
                             COLLECT_ACTIVITY_ID), partition));
                 }
 
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortOperatorDescriptor.java
index 0effefb..db3edaa 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortOperatorDescriptor.java
@@ -36,7 +36,7 @@
 import edu.uci.ics.hyracks.api.job.JobSpecification;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractActivityNode;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.base.AbstractTaskState;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractStateObject;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
 
@@ -84,7 +84,7 @@
         builder.addBlockingEdge(sa, ma);
     }
 
-    public static class SortTaskState extends AbstractTaskState {
+    public static class SortTaskState extends AbstractStateObject {
         private List<IFrameReader> runs;
         private FrameSorter frameSorter;
 
@@ -138,7 +138,7 @@
                     runGen.close();
                     state.runs = runGen.getRuns();
                     state.frameSorter = runGen.getFrameSorter();
-                    ctx.setTaskState(state);
+                    ctx.setStateObject(state);
                 }
 
                 @Override
@@ -163,7 +163,7 @@
             IOperatorNodePushable op = new AbstractUnaryOutputSourceOperatorNodePushable() {
                 @Override
                 public void initialize() throws HyracksDataException {
-                    SortTaskState state = (SortTaskState) ctx.getTaskState(new TaskId(new ActivityId(getOperatorId(),
+                    SortTaskState state = (SortTaskState) ctx.getStateObject(new TaskId(new ActivityId(getOperatorId(),
                             SORT_ACTIVITY_ID), partition));
                     List<IFrameReader> runs = state.runs;
                     FrameSorter frameSorter = state.frameSorter;
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/InMemorySortOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/InMemorySortOperatorDescriptor.java
index 30bebe9..2689375 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/InMemorySortOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/InMemorySortOperatorDescriptor.java
@@ -33,7 +33,7 @@
 import edu.uci.ics.hyracks.api.job.JobSpecification;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractActivityNode;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.base.AbstractTaskState;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractStateObject;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
 
@@ -76,7 +76,7 @@
         builder.addBlockingEdge(sa, ma);
     }
 
-    public static class SortTaskState extends AbstractTaskState {
+    public static class SortTaskState extends AbstractStateObject {
         private FrameSorter frameSorter;
 
         public SortTaskState() {
@@ -124,7 +124,7 @@
                 @Override
                 public void close() throws HyracksDataException {
                     state.frameSorter.sortFrames();
-                    ctx.setTaskState(state);
+                    ctx.setStateObject(state);
                 }
 
                 @Override
@@ -150,7 +150,7 @@
                 public void initialize() throws HyracksDataException {
                     writer.open();
                     try {
-                        SortTaskState state = (SortTaskState) ctx.getTaskState(new TaskId(new ActivityId(
+                        SortTaskState state = (SortTaskState) ctx.getStateObject(new TaskId(new ActivityId(
                                 getOperatorId(), SORT_ACTIVITY_ID), partition));
                         state.frameSorter.flushFrames(writer);
                     } catch (Exception e) {
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/OptimizedExternalSortOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/OptimizedExternalSortOperatorDescriptor.java
index a8653d1..1b9d2fe 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/OptimizedExternalSortOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/OptimizedExternalSortOperatorDescriptor.java
@@ -36,7 +36,7 @@
 import edu.uci.ics.hyracks.api.job.JobSpecification;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractActivityNode;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.base.AbstractTaskState;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractStateObject;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
 
@@ -107,7 +107,7 @@
         builder.addBlockingEdge(osa, oma);
     }
 
-    public static class OptimizedSortTaskState extends AbstractTaskState {
+    public static class OptimizedSortTaskState extends AbstractStateObject {
         private List<IFrameReader> runs;
 
         public OptimizedSortTaskState() {
@@ -165,7 +165,7 @@
                             new TaskId(getActivityId(), partition));
                     runGen.close();
                     state.runs = runGen.getRuns();
-                    ctx.setTaskState(state);
+                    ctx.setStateObject(state);
 
                 }
 
@@ -191,7 +191,7 @@
             IOperatorNodePushable op = new AbstractUnaryOutputSourceOperatorNodePushable() {
                 @Override
                 public void initialize() throws HyracksDataException {
-                    OptimizedSortTaskState state = (OptimizedSortTaskState) ctx.getTaskState(new TaskId(new ActivityId(
+                    OptimizedSortTaskState state = (OptimizedSortTaskState) ctx.getStateObject(new TaskId(new ActivityId(
                             getOperatorId(), SORT_ACTIVITY_ID), partition));
 
                     List<IFrameReader> runs = state.runs;
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/OptimizedSortMergeTest.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/OptimizedSortMergeTest.java
index c2d3ebf..1ee4400 100644
--- a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/OptimizedSortMergeTest.java
+++ b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/OptimizedSortMergeTest.java
@@ -19,6 +19,7 @@
 import org.junit.Test;
 
 import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
@@ -39,8 +40,8 @@
 import edu.uci.ics.hyracks.dataflow.std.file.FileScanOperatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
 import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.hyracks.dataflow.std.file.PlainFileWriterOperatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.misc.LimitOperatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.misc.PrinterOperatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.sort.OptimizedExternalSortOperatorDescriptor;
 
 public class OptimizedSortMergeTest extends AbstractIntegrationTest {
@@ -74,7 +75,9 @@
                         PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) }, ordersDesc);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, sorter, NC1_ID, NC2_ID);
 
-        PrinterOperatorDescriptor printer = new PrinterOperatorDescriptor(spec);
+        IFileSplitProvider outSplits = new ConstantFileSplitProvider(new FileSplit[] { new FileSplit(NC1_ID,
+                createTempFile().getAbsolutePath()) });
+        IOperatorDescriptor printer = new PlainFileWriterOperatorDescriptor(spec, outSplits, ",");
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
 
         spec.connect(new OneToOneConnectorDescriptor(spec), ordScanner, 0, sorter, 0);
@@ -125,7 +128,9 @@
         LimitOperatorDescriptor filter = new LimitOperatorDescriptor(spec, ordersDesc, outputLimit);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, filter, NC1_ID);
 
-        PrinterOperatorDescriptor printer = new PrinterOperatorDescriptor(spec);
+        IFileSplitProvider outSplits = new ConstantFileSplitProvider(new FileSplit[] { new FileSplit(NC1_ID,
+                createTempFile().getAbsolutePath()) });
+        IOperatorDescriptor printer = new PlainFileWriterOperatorDescriptor(spec, outSplits, ",");
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
 
         spec.connect(new OneToOneConnectorDescriptor(spec), ordScanner, 0, sorter, 0);
diff --git a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/buffers/IBufferAcceptor.java b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/buffers/IBufferAcceptor.java
index 2f27bf0..26e335e 100644
--- a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/buffers/IBufferAcceptor.java
+++ b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/buffers/IBufferAcceptor.java
@@ -16,6 +16,16 @@
 
 import java.nio.ByteBuffer;
 
+/**
+ * Accepts buffers.
+ * 
+ * @author vinayakb
+ */
 public interface IBufferAcceptor {
+    /**
+     * Accept a buffer.
+     * 
+     * @param buffer
+     *            - the buffer being accepted.
+     */
     public void accept(ByteBuffer buffer);
 }
\ No newline at end of file
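A minimal sketch of an IBufferAcceptor implementation, assuming a hypothetical consumer that drains buffers from a queue:

import java.nio.ByteBuffer;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

import edu.uci.ics.hyracks.net.buffers.IBufferAcceptor;

public class QueueingBufferAcceptor implements IBufferAcceptor {
    private final BlockingQueue<ByteBuffer> queue = new LinkedBlockingQueue<ByteBuffer>();

    @Override
    public void accept(ByteBuffer buffer) {
        queue.add(buffer); // hand the buffer off to a consumer thread
    }

    public ByteBuffer take() throws InterruptedException {
        return queue.take(); // consumer side: block until a buffer arrives
    }
}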
diff --git a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/buffers/ICloseableBufferAcceptor.java b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/buffers/ICloseableBufferAcceptor.java
index c395ac9..5e9bf02 100644
--- a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/buffers/ICloseableBufferAcceptor.java
+++ b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/buffers/ICloseableBufferAcceptor.java
@@ -14,8 +14,23 @@
  */
 package edu.uci.ics.hyracks.net.buffers;
 
+/**
+ * A buffer acceptor that can be closed to indicate end of transmission, or given an
+ * error code to indicate an error in transmission.
+ * 
+ * @author vinayakb
+ */
 public interface ICloseableBufferAcceptor extends IBufferAcceptor {
+    /**
+     * Close the buffer acceptor.
+     */
     public void close();
 
+    /**
+     * Indicate that an error occurred.
+     * 
+     * @param ecode
+     *            - the error code.
+     */
     public void error(int ecode);
 }
\ No newline at end of file
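A sketch of the extended contract, with hypothetical handling for each of the three callbacks:

import java.nio.ByteBuffer;

import edu.uci.ics.hyracks.net.buffers.ICloseableBufferAcceptor;

public class ConsumingBufferAcceptor implements ICloseableBufferAcceptor {
    @Override
    public void accept(ByteBuffer buffer) {
        // consume buffer.remaining() bytes, then return the buffer to the
        // channel's empty-buffer acceptor so it can be refilled
    }

    @Override
    public void close() {
        // the remote side signalled end of transmission
    }

    @Override
    public void error(int ecode) {
        // the remote side reported an error with code ecode
    }
}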
diff --git a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/ChannelControlBlock.java b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/ChannelControlBlock.java
index 4b55d4b..63cb6b5 100644
--- a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/ChannelControlBlock.java
+++ b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/ChannelControlBlock.java
@@ -28,6 +28,11 @@
 import edu.uci.ics.hyracks.net.buffers.ICloseableBufferAcceptor;
 import edu.uci.ics.hyracks.net.exceptions.NetException;
 
+/**
+ * Handle to a channel that represents a logical full-duplex communication end-point.
+ * 
+ * @author vinayakb
+ */
 public class ChannelControlBlock {
     private static final Logger LOGGER = Logger.getLogger(ChannelControlBlock.class.getName());
 
@@ -45,6 +50,8 @@
 
     private final AtomicBoolean remoteClose;
 
+    private final AtomicBoolean remoteCloseAck;
+
     ChannelControlBlock(ChannelSet cSet, int channelId) {
         this.cSet = cSet;
         this.channelId = channelId;
@@ -53,16 +60,27 @@
         localClose = new AtomicBoolean();
         localCloseAck = new AtomicBoolean();
         remoteClose = new AtomicBoolean();
+        remoteCloseAck = new AtomicBoolean();
     }
 
     int getChannelId() {
         return channelId;
     }
 
+    /**
+     * Get the read interface of this channel.
+     * 
+     * @return the read interface.
+     */
     public IChannelReadInterface getReadInterface() {
         return ri;
     }
 
+    /**
+     * Get the write interface of this channel.
+     * 
+     * @return the write interface.
+     */
     public IChannelWriteInterface getWriteInterface() {
         return wi;
     }
@@ -321,6 +339,10 @@
         remoteClose.set(true);
     }
 
+    void reportRemoteEOSAck() {
+        remoteCloseAck.set(true);
+    }
+
     boolean getRemoteEOS() {
         return remoteClose.get();
     }
@@ -336,12 +358,13 @@
     }
 
     boolean completelyClosed() {
-        return localCloseAck.get() && remoteClose.get();
+        return localCloseAck.get() && remoteCloseAck.get();
     }
 
     @Override
     public String toString() {
         return "Channel:" + channelId + "[localClose: " + localClose + " localCloseAck: " + localCloseAck
-                + " remoteClose: " + remoteClose + " readCredits: " + ri.credits + " writeCredits: " + wi.credits + "]";
+                + " remoteClose: " + remoteClose + " remoteCloseAck:" + remoteCloseAck + " readCredits: " + ri.credits
+                + " writeCredits: " + wi.credits + "]";
     }
 }
\ No newline at end of file
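The four flags above implement a symmetric close handshake; completelyClosed() now requires that our EOS was acknowledged (localCloseAck) and that we acknowledged the remote EOS (remoteCloseAck), instead of only observing the remote EOS. Reduced to its essentials:

import java.util.concurrent.atomic.AtomicBoolean;

class CloseHandshake {
    final AtomicBoolean localClose = new AtomicBoolean();     // we sent CLOSE_CHANNEL
    final AtomicBoolean localCloseAck = new AtomicBoolean();  // peer acked our close
    final AtomicBoolean remoteClose = new AtomicBoolean();    // peer sent CLOSE_CHANNEL
    final AtomicBoolean remoteCloseAck = new AtomicBoolean(); // we acked the peer's close

    boolean completelyClosed() {
        // both directions must be closed *and* acknowledged before the
        // channel's resources can be reclaimed safely
        return localCloseAck.get() && remoteCloseAck.get();
    }
}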
diff --git a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/IChannelOpenListener.java b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/IChannelOpenListener.java
index 0fc9b2a..feb605f 100644
--- a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/IChannelOpenListener.java
+++ b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/IChannelOpenListener.java
@@ -14,6 +14,17 @@
  */
 package edu.uci.ics.hyracks.net.protocols.muxdemux;
 
+/**
+ * Callback interface to report opening of channels.
+ * 
+ * @author vinayakb
+ */
 public interface IChannelOpenListener {
+    /**
+     * Indicates that a remote endpoint has opened a channel to this receiver.
+     * 
+     * @param channel
+     *            - The newly opened channel.
+     */
     public void channelOpened(ChannelControlBlock channel);
 }
\ No newline at end of file
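A sketch of a listener implementation, reusing the hypothetical ConsumingBufferAcceptor from the earlier sketch:

import edu.uci.ics.hyracks.net.protocols.muxdemux.ChannelControlBlock;
import edu.uci.ics.hyracks.net.protocols.muxdemux.IChannelOpenListener;

public class WiringChannelOpenListener implements IChannelOpenListener {
    @Override
    public void channelOpened(ChannelControlBlock channel) {
        // install the read-side callback before any data arrives on the channel
        channel.getReadInterface().setFullBufferAcceptor(new ConsumingBufferAcceptor());
    }
}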
diff --git a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/IChannelReadInterface.java b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/IChannelReadInterface.java
index 468a617..178ad78 100644
--- a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/IChannelReadInterface.java
+++ b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/IChannelReadInterface.java
@@ -17,8 +17,26 @@
 import edu.uci.ics.hyracks.net.buffers.IBufferAcceptor;
 import edu.uci.ics.hyracks.net.buffers.ICloseableBufferAcceptor;
 
+/**
+ * Represents the read interface of a {@link ChannelControlBlock}.
+ * 
+ * @author vinayakb
+ */
 public interface IChannelReadInterface {
+    /**
+     * Set the callback that will be invoked by the network layer when a buffer has been
+     * filled with data received from the remote side.
+     * 
+     * @param fullBufferAcceptor
+     *            - the full buffer acceptor.
+     */
     public void setFullBufferAcceptor(ICloseableBufferAcceptor fullBufferAcceptor);
 
+    /**
+     * Get the acceptor that collects empty buffers when the client has finished consuming
+     * a previously full buffer.
+     * 
+     * @return the empty buffer acceptor.
+     */
     public IBufferAcceptor getEmptyBufferAcceptor();
 }
\ No newline at end of file
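Wiring the read side, as a sketch: filled buffers arrive through the full-buffer acceptor and are recycled through the empty-buffer acceptor once consumed:

import java.nio.ByteBuffer;

import edu.uci.ics.hyracks.net.buffers.ICloseableBufferAcceptor;
import edu.uci.ics.hyracks.net.protocols.muxdemux.ChannelControlBlock;
import edu.uci.ics.hyracks.net.protocols.muxdemux.IChannelReadInterface;

public class ReadSideWiring {
    public static void wire(ChannelControlBlock ccb) {
        final IChannelReadInterface ri = ccb.getReadInterface();
        ri.setFullBufferAcceptor(new ICloseableBufferAcceptor() {
            public void accept(ByteBuffer buffer) {
                // ... consume the buffer's contents here ...
                ri.getEmptyBufferAcceptor().accept(buffer); // recycle for refill
            }

            public void close() { /* remote signalled end of stream */ }

            public void error(int ecode) { /* remote reported error ecode */ }
        });
    }
}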
diff --git a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/IChannelWriteInterface.java b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/IChannelWriteInterface.java
index 1e53d71..8e71246 100644
--- a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/IChannelWriteInterface.java
+++ b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/IChannelWriteInterface.java
@@ -17,8 +17,26 @@
 import edu.uci.ics.hyracks.net.buffers.IBufferAcceptor;
 import edu.uci.ics.hyracks.net.buffers.ICloseableBufferAcceptor;
 
+/**
+ * Represents the write interface of a {@link ChannelControlBlock}.
+ * 
+ * @author vinayakb
+ */
 public interface IChannelWriteInterface {
+    /**
+     * Set the callback that is invoked once a full buffer has been emptied by
+     * writing its data to the remote end.
+     * 
+     * @param emptyBufferAcceptor
+     *            - the empty buffer acceptor.
+     */
     public void setEmptyBufferAcceptor(IBufferAcceptor emptyBufferAcceptor);
 
+    /**
+     * Get the full buffer acceptor that accepts buffers filled with data that need to be written
+     * to the remote end.
+     * 
+     * @return the full buffer acceptor.
+     */
     public ICloseableBufferAcceptor getFullBufferAcceptor();
 }
\ No newline at end of file
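Mirroring the read side, a write-side sketch, assuming a ChannelControlBlock ccb and an application-filled frame:

import java.nio.ByteBuffer;

import edu.uci.ics.hyracks.net.buffers.ICloseableBufferAcceptor;
import edu.uci.ics.hyracks.net.protocols.muxdemux.ChannelControlBlock;

public class WriteSideWiring {
    public static void sendAndClose(ChannelControlBlock ccb, ByteBuffer frame) {
        ICloseableBufferAcceptor full = ccb.getWriteInterface().getFullBufferAcceptor();
        full.accept(frame); // queue a filled frame for transmission
        full.close();       // signal end of transmission to the remote side
    }
}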
diff --git a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MultiplexedConnection.java b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MultiplexedConnection.java
index e717c45..3772df3 100644
--- a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MultiplexedConnection.java
+++ b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MultiplexedConnection.java
@@ -26,6 +26,12 @@
 import edu.uci.ics.hyracks.net.protocols.tcp.ITCPConnectionEventListener;
 import edu.uci.ics.hyracks.net.protocols.tcp.TCPConnection;
 
+/**
+ * A {@link MultiplexedConnection} can be used by clients to create multiple "channels"
+ * that can have independent full-duplex conversations.
+ * 
+ * @author vinayakb
+ */
 public class MultiplexedConnection implements ITCPConnectionEventListener {
     private static final Logger LOGGER = Logger.getLogger(MultiplexedConnection.class.getName());
 
@@ -51,7 +57,7 @@
 
     private Exception error;
 
-    public MultiplexedConnection(MuxDemux muxDemux) {
+    MultiplexedConnection(MuxDemux muxDemux) {
         this.muxDemux = muxDemux;
         pendingWriteEventsCounter = new IEventCounter() {
             private int counter;
@@ -128,7 +134,14 @@
         cSet.notifyIOError();
     }
 
-    public ChannelControlBlock openChannel() throws NetException, InterruptedException {
+    /**
+     * Open a channel to the other side.
+     * 
+     * @return a {@link ChannelControlBlock} representing the newly opened channel.
+     * @throws NetException
+     *             - A network failure occurred.
+     */
+    public ChannelControlBlock openChannel() throws NetException {
         synchronized (this) {
             if (connectionFailure) {
                 throw new NetException(error);
@@ -264,6 +277,8 @@
                 BitSet pendingEOSAckBitmap = cSet.getPendingEOSAckBitmap();
                 for (int j = pendingEOSAckBitmap.nextSetBit(0); j >= 0; j = pendingEOSAckBitmap.nextSetBit(j)) {
                     pendingEOSAckBitmap.clear(j);
+                    ChannelControlBlock ccb = cSet.getCCB(j);
+                    ccb.reportRemoteEOSAck();
                     writerState.command.setChannelId(j);
                     writerState.command.setCommandType(MuxDemuxCommand.CommandType.CLOSE_CHANNEL_ACK);
                     writerState.command.setData(0);
diff --git a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MuxDemux.java b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MuxDemux.java
index 8548bb8..22a57f8 100644
--- a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MuxDemux.java
+++ b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MuxDemux.java
@@ -24,6 +24,13 @@
 import edu.uci.ics.hyracks.net.protocols.tcp.TCPConnection;
 import edu.uci.ics.hyracks.net.protocols.tcp.TCPEndpoint;
 
+/**
+ * Multiplexed Connection Manager.
+ * Every participant that wants to use multiplexed connections must create an instance
+ * of this class.
+ * 
+ * @author vinayakb
+ */
 public class MuxDemux {
     private final InetSocketAddress localAddress;
 
@@ -35,6 +42,16 @@
 
     private final MuxDemuxPerformanceCounters perfCounters;
 
+    /**
+     * Constructor.
+     * 
+     * @param localAddress
+     *            - TCP/IP socket address to listen on
+     * @param listener
+     *            - Callback interface to report channel events
+     * @param nThreads
+     *            - Number of threads to use for data transfer
+     */
     public MuxDemux(InetSocketAddress localAddress, IChannelOpenListener listener, int nThreads) {
         this.localAddress = localAddress;
         this.channelOpenListener = listener;
@@ -80,10 +97,27 @@
         perfCounters = new MuxDemuxPerformanceCounters();
     }
 
+    /**
+     * Start this {@link MuxDemux}. Once started, it listens for incoming remote
+     * connections and can initiate outgoing ones.
+     * 
+     * @throws IOException
+     *             - The local endpoint could not be started.
+     */
     public void start() throws IOException {
         tcpEndpoint.start(localAddress);
     }
 
+    /**
+     * Create a {@link MultiplexedConnection} that can open channels to the specified remote address.
+     * The remote address must have a {@link MuxDemux} listening for connections.
+     * 
+     * @param remoteAddress
+     *            - Address of the remote {@link MuxDemux}
+     * @return a {@link MultiplexedConnection} connected to the remote address.
+     * @throws InterruptedException
+     *             - This call was interrupted while waiting for connection setup.
+     *             In such an event, it is safe to retry the {@link #connect(InetSocketAddress)} call.
+     * @throws NetException
+     *             - A network failure occurred.
+     */
     public MultiplexedConnection connect(InetSocketAddress remoteAddress) throws InterruptedException, NetException {
         MultiplexedConnection mConn = null;
         synchronized (this) {
@@ -102,10 +136,20 @@
         return channelOpenListener;
     }
 
+    /**
+     * Get the local address on which this {@link MuxDemux} listens for connections.
+     * 
+     * @return local TCP/IP socket address.
+     */
     public InetSocketAddress getLocalAddress() {
         return tcpEndpoint.getLocalAddress();
     }
 
+    /**
+     * Get performance counters useful for collecting efficiency metrics.
+     * 
+     * @return the {@link MuxDemuxPerformanceCounters} for this {@link MuxDemux}.
+     */
     public MuxDemuxPerformanceCounters getPerformanceCounters() {
         return perfCounters;
     }
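Putting the pieces together, a usage sketch (the remote host, port, and thread count are placeholders):

import java.net.InetSocketAddress;

import edu.uci.ics.hyracks.net.protocols.muxdemux.ChannelControlBlock;
import edu.uci.ics.hyracks.net.protocols.muxdemux.IChannelOpenListener;
import edu.uci.ics.hyracks.net.protocols.muxdemux.MultiplexedConnection;
import edu.uci.ics.hyracks.net.protocols.muxdemux.MuxDemux;

public class MuxDemuxUsage {
    public static ChannelControlBlock openOne(IChannelOpenListener listener) throws Exception {
        // listen on an ephemeral local port with 2 I/O threads (both arbitrary here)
        MuxDemux md = new MuxDemux(new InetSocketAddress("0.0.0.0", 0), listener, 2);
        md.start();
        // "remote-host:3030" is a placeholder for another MuxDemux's listen address
        MultiplexedConnection mc = md.connect(new InetSocketAddress("remote-host", 3030));
        return mc.openChannel();
    }
}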
diff --git a/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestTaskContext.java b/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestTaskContext.java
index e1743a1..b776f55 100644
--- a/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestTaskContext.java
+++ b/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestTaskContext.java
@@ -19,8 +19,7 @@
 import edu.uci.ics.hyracks.api.context.IHyracksJobletContext;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
-import edu.uci.ics.hyracks.api.dataflow.TaskId;
-import edu.uci.ics.hyracks.api.dataflow.state.ITaskState;
+import edu.uci.ics.hyracks.api.dataflow.state.IStateObject;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.exceptions.HyracksException;
 import edu.uci.ics.hyracks.api.io.FileReference;
@@ -92,12 +91,12 @@
     }
 
     @Override
-    public void setTaskState(ITaskState taskState) {
+    public void setStateObject(IStateObject taskState) {
 
     }
 
     @Override
-    public ITaskState getTaskState(TaskId taskId) {
+    public IStateObject getStateObject(Object id) {
         return null;
     }
 }
\ No newline at end of file
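The stubs above satisfy the interface but drop all state; if a test needs real handoff, a map keyed by the state object's id is enough. A sketch:

import java.util.HashMap;
import java.util.Map;

import edu.uci.ics.hyracks.api.dataflow.state.IStateObject;

public class InMemoryStateStore {
    private final Map<Object, IStateObject> states = new HashMap<Object, IStateObject>();

    public void setStateObject(IStateObject so) {
        states.put(so.getId(), so); // keyed by the object's id, per IStateObject
    }

    public IStateObject getStateObject(Object id) {
        return states.get(id);
    }
}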