Added ability to register TaskState with the infrastructure.

IOperatorEnvironment now exchanges typed ITaskState objects keyed by TaskId instead of arbitrary Object values keyed by String, with the Joblet holding the TaskId -> ITaskState map behind each operator environment. ExternalGroupOperatorDescriptor, HashGroupOperatorDescriptor, GraceHashJoinOperatorDescriptor, and HybridHashJoinOperatorDescriptor are ported to the new API via a new AbstractTaskState base class, and the scheduler's default connector policy changes from send-side/receive-side materialized blocking to pipelining.

git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_dev_next@565 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/state/ITaskState.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/state/ITaskState.java
new file mode 100644
index 0000000..c24e488
--- /dev/null
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/state/ITaskState.java
@@ -0,0 +1,34 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.dataflow.state;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import edu.uci.ics.hyracks.api.dataflow.TaskId;
+import edu.uci.ics.hyracks.api.job.JobId;
+
+public interface ITaskState {
+    public JobId getJobId();
+
+    public TaskId getTaskId();
+
+    public long getMemoryOccupancy();
+
+    public void toBytes(DataOutput out) throws IOException;
+
+    public void fromBytes(DataInput in) throws IOException;
+}
\ No newline at end of file
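
For orientation, a minimal sketch of a concrete ITaskState (the class below is hypothetical, not part of this change). The toBytes/fromBytes pair covers only the payload; the JobId/TaskId identity fields are assumed to be restored separately by the infrastructure, which is consistent with the setters on AbstractTaskState later in this patch.

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;

    import edu.uci.ics.hyracks.api.dataflow.TaskId;
    import edu.uci.ics.hyracks.api.dataflow.state.ITaskState;
    import edu.uci.ics.hyracks.api.job.JobId;

    // Hypothetical example: carries a single counter from one activity to another.
    public class RunCountState implements ITaskState {
        private JobId jobId;
        private TaskId taskId;
        private long runCount;

        public RunCountState(JobId jobId, TaskId taskId, long runCount) {
            this.jobId = jobId;
            this.taskId = taskId;
            this.runCount = runCount;
        }

        @Override
        public JobId getJobId() {
            return jobId;
        }

        @Override
        public TaskId getTaskId() {
            return taskId;
        }

        @Override
        public long getMemoryOccupancy() {
            return 0; // no frames or tables are pinned by this state
        }

        @Override
        public void toBytes(DataOutput out) throws IOException {
            out.writeLong(runCount); // payload only; ids are assumed restored by the framework
        }

        @Override
        public void fromBytes(DataInput in) throws IOException {
            runCount = in.readLong();
        }
    }
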
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IOperatorEnvironment.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IOperatorEnvironment.java
index b41cb35..ecc44c1 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IOperatorEnvironment.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IOperatorEnvironment.java
@@ -14,8 +14,11 @@
  */
 package edu.uci.ics.hyracks.api.job;
 
-public interface IOperatorEnvironment {
-    public void set(String name, Object value);
+import edu.uci.ics.hyracks.api.dataflow.TaskId;
+import edu.uci.ics.hyracks.api.dataflow.state.ITaskState;
 
-    public Object get(String name);
+public interface IOperatorEnvironment {
+    public void setTaskState(ITaskState taskState);
+
+    public ITaskState getTaskState(TaskId taskId);
 }
\ No newline at end of file
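
The new contract replaces a shared name space with keyed handoff: the producing activity registers its state under its own TaskId, and the consuming activity rebuilds that TaskId from the operator id, the producer's activity id constant, and its partition number. Every operator ported below follows the same pattern; in sketch form:

    // Producer side, typically in close():
    env.setTaskState(state); // state.getTaskId() == new TaskId(producerActivityId, partition)

    // Consumer side, typically in open()/initialize():
    ITaskState state = env.getTaskState(
            new TaskId(new ActivityId(getOperatorId(), PRODUCER_ACTIVITY_ID), partition));

This works because both activities of an operator run with the same number of partitions, so partition i of the consumer reads exactly what partition i of the producer wrote.
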
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/ActivityClusterPlanner.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/ActivityClusterPlanner.java
index a3ca82d..e5d9f64 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/ActivityClusterPlanner.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/ActivityClusterPlanner.java
@@ -34,7 +34,7 @@
 import edu.uci.ics.hyracks.api.dataflow.TaskId;
 import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicy;
 import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicyAssignmentPolicy;
-import edu.uci.ics.hyracks.api.dataflow.connectors.SendSideMaterializedReceiveSideMaterializedBlockingConnectorPolicy;
+import edu.uci.ics.hyracks.api.dataflow.connectors.PipeliningConnectorPolicy;
 import edu.uci.ics.hyracks.api.exceptions.HyracksException;
 import edu.uci.ics.hyracks.api.job.JobActivityGraph;
 import edu.uci.ics.hyracks.api.partitions.PartitionId;
@@ -275,7 +275,7 @@
         if (cpap != null) {
             return cpap.getConnectorPolicyAssignment(c, nProducers, nConsumers, fanouts);
         }
-        return new SendSideMaterializedReceiveSideMaterializedBlockingConnectorPolicy();
+        return new PipeliningConnectorPolicy();
     }
 
     private Map<ActivityId, ActivityPartitionDetails> computePartitionCounts(ActivityCluster ac)
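
With this change the scheduler defaults to pipelined connectors, so a job that still needs fully materialized, blocking edges must supply an IConnectorPolicyAssignmentPolicy. A sketch, with the method signature inferred from the call site above:

    // Hedged sketch: restores the previous default for every connector.
    public class MaterializeAllPolicy implements IConnectorPolicyAssignmentPolicy {
        private static final long serialVersionUID = 1L;

        @Override
        public IConnectorPolicy getConnectorPolicyAssignment(IConnectorDescriptor c, int nProducers,
                int nConsumers, int[] fanouts) {
            return new SendSideMaterializedReceiveSideMaterializedBlockingConnectorPolicy();
        }
    }
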
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
index 3026359..2176dda 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
@@ -27,6 +27,8 @@
 import edu.uci.ics.hyracks.api.context.IHyracksJobletContext;
 import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
 import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
+import edu.uci.ics.hyracks.api.dataflow.TaskId;
+import edu.uci.ics.hyracks.api.dataflow.state.ITaskState;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.exceptions.HyracksException;
 import edu.uci.ics.hyracks.api.io.FileReference;
@@ -61,6 +63,8 @@
 
     private final Map<OperatorDescriptorId, Map<Integer, IOperatorEnvironment>> envMap;
 
+    private final Map<TaskId, ITaskState> taskStateMap;
+
     private final Map<TaskAttemptId, Task> taskMap;
 
     private final Map<String, Counter> counterMap;
@@ -77,6 +81,7 @@
         this.jobId = jobId;
         partitionRequestMap = new HashMap<PartitionId, IPartitionCollector>();
         envMap = new HashMap<OperatorDescriptorId, Map<Integer, IOperatorEnvironment>>();
+        taskStateMap = new HashMap<TaskId, ITaskState>();
         taskMap = new HashMap<TaskAttemptId, Task>();
         counterMap = new HashMap<String, Counter>();
         localVariableMap = new HashMap<MultipartName, Object>();
@@ -119,28 +124,26 @@
         localVariableMap.put(name, value);
     }
 
-    private static final class OperatorEnvironmentImpl implements IOperatorEnvironment {
+    private final class OperatorEnvironmentImpl implements IOperatorEnvironment {
         private final String nodeId;
-        private final Map<String, Object> map;
 
         public OperatorEnvironmentImpl(String nodeId) {
             this.nodeId = nodeId;
-            map = new HashMap<String, Object>();
-        }
-
-        @Override
-        public Object get(String name) {
-            return map.get(name);
-        }
-
-        @Override
-        public void set(String name, Object value) {
-            map.put(name, value);
         }
 
         public String toString() {
             return super.toString() + "@" + nodeId;
         }
+
+        @Override
+        public void setTaskState(ITaskState taskState) {
+            taskStateMap.put(taskState.getTaskId(), taskState);
+        }
+
+        @Override
+        public ITaskState getTaskState(TaskId taskId) {
+            return taskStateMap.get(taskId);
+        }
     }
 
     public Executor getExecutor() {
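
One caveat: taskStateMap is a plain HashMap shared by every OperatorEnvironmentImpl in the joblet, and setTaskState can be invoked from several concurrently finishing producer tasks on the same node. If that interleaving is possible, the minimal hardening would be a synchronized wrapper (a sketch, not part of this change):

    taskStateMap = Collections.synchronizedMap(new HashMap<TaskId, ITaskState>());

The blocking edge between producer and consumer activities should already order writes before the corresponding reads, so concurrent writers are the only concern.
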
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractTaskState.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractTaskState.java
new file mode 100644
index 0000000..ba4dfd1
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractTaskState.java
@@ -0,0 +1,48 @@
+package edu.uci.ics.hyracks.dataflow.std.base;
+
+import edu.uci.ics.hyracks.api.dataflow.TaskId;
+import edu.uci.ics.hyracks.api.dataflow.state.ITaskState;
+import edu.uci.ics.hyracks.api.job.JobId;
+
+public abstract class AbstractTaskState implements ITaskState {
+    protected JobId jobId;
+
+    protected TaskId taId;
+
+    protected long memoryOccupancy;
+
+    protected AbstractTaskState() {
+    }
+
+    protected AbstractTaskState(JobId jobId, TaskId taId) {
+        this.jobId = jobId;
+        this.taId = taId;
+    }
+
+    @Override
+    public final JobId getJobId() {
+        return jobId;
+    }
+
+    public final void setJobId(JobId jobId) {
+        this.jobId = jobId;
+    }
+
+    @Override
+    public final TaskId getTaskId() {
+        return taId;
+    }
+
+    public final void setTaskId(TaskId tId) {
+        this.taId = tId;
+    }
+
+    @Override
+    public final long getMemoryOccupancy() {
+        return memoryOccupancy;
+    }
+
+    public void setMemoryOccupancy(long memoryOccupancy) {
+        this.memoryOccupancy = memoryOccupancy;
+    }
+}
\ No newline at end of file
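
The no-argument constructor together with the setJobId/setTaskId setters suggests that states are meant to be rehydrated by the infrastructure after deserialization, roughly as follows (an assumption; no deserialization path appears in this patch):

    // Hypothetical framework-side rehydration, given a Class<? extends AbstractTaskState>:
    AbstractTaskState state = stateClass.newInstance();
    state.setJobId(jobId);
    state.setTaskId(taskId);
    state.fromBytes(in);
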
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ExternalGroupOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ExternalGroupOperatorDescriptor.java
index 692a55a..5f78421 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ExternalGroupOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ExternalGroupOperatorDescriptor.java
@@ -14,6 +14,8 @@
  */
 package edu.uci.ics.hyracks.dataflow.std.group;
 
+import java.io.DataInput;
+import java.io.DataOutput;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -27,6 +29,7 @@
 import edu.uci.ics.hyracks.api.dataflow.ActivityId;
 import edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder;
 import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.TaskId;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
@@ -35,6 +38,7 @@
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.io.FileReference;
 import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
+import edu.uci.ics.hyracks.api.job.JobId;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
@@ -46,21 +50,18 @@
 import edu.uci.ics.hyracks.dataflow.std.aggregators.IAggregatorDescriptorFactory;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractActivityNode;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractTaskState;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
 import edu.uci.ics.hyracks.dataflow.std.util.ReferenceEntry;
 import edu.uci.ics.hyracks.dataflow.std.util.ReferencedPriorityQueue;
 
 public class ExternalGroupOperatorDescriptor extends AbstractOperatorDescriptor {
+    private static final int AGGREGATE_ACTIVITY_ID = 0;
+
+    private static final int MERGE_ACTIVITY_ID = 1;
+
     private static final long serialVersionUID = 1L;
-    /**
-     * The input frame identifier (in the job environment)
-     */
-    private static final String GROUPTABLES = "gtables";
-    /**
-     * The runs files identifier (in the job environment)
-     */
-    private static final String RUNS = "runs";
     private final int[] keyFields;
     private final IBinaryComparatorFactory[] comparatorFactories;
     private final INormalizedKeyComputerFactory firstNormalizerFactory;
@@ -102,8 +103,8 @@
 
     @Override
     public void contributeActivities(IActivityGraphBuilder builder) {
-        AggregateActivity aggregateAct = new AggregateActivity(new ActivityId(getOperatorId(), 0));
-        MergeActivity mergeAct = new MergeActivity(new ActivityId(odId, 1));
+        AggregateActivity aggregateAct = new AggregateActivity(new ActivityId(getOperatorId(), AGGREGATE_ACTIVITY_ID));
+        MergeActivity mergeAct = new MergeActivity(new ActivityId(odId, MERGE_ACTIVITY_ID));
 
         builder.addActivity(aggregateAct);
         builder.addSourceEdge(0, aggregateAct, 0);
@@ -114,6 +115,29 @@
         builder.addBlockingEdge(aggregateAct, mergeAct);
     }
 
+    public static class AggregateActivityState extends AbstractTaskState {
+        private LinkedList<RunFileReader> runs;
+
+        private ISpillableTable gTable;
+
+        public AggregateActivityState() {
+        }
+
+        private AggregateActivityState(JobId jobId, TaskId tId) {
+            super(jobId, tId);
+        }
+
+        @Override
+        public void toBytes(DataOutput out) throws IOException {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public void fromBytes(DataInput in) throws IOException {
+            throw new UnsupportedOperationException();
+        }
+    }
+
     private class AggregateActivity extends AbstractActivityNode {
         private static final long serialVersionUID = 1L;
 
@@ -123,25 +147,23 @@
 
         @Override
         public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx, final IOperatorEnvironment env,
-                IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions)
+                final IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions)
                 throws HyracksDataException {
-            final ISpillableTable gTable = spillableTableFactory.buildSpillableTable(ctx, keyFields,
-                    comparatorFactories, firstNormalizerFactory, aggregatorFactory,
-                    recordDescProvider.getInputRecordDescriptor(getOperatorId(), 0), recordDescriptors[0],
-                    ExternalGroupOperatorDescriptor.this.framesLimit);
             final FrameTupleAccessor accessor = new FrameTupleAccessor(ctx.getFrameSize(),
                     recordDescProvider.getInputRecordDescriptor(getOperatorId(), 0));
             IOperatorNodePushable op = new AbstractUnaryInputSinkOperatorNodePushable() {
-
-                /**
-                 * Run files
-                 */
-                private LinkedList<RunFileReader> runs;
+                private AggregateActivityState state;
 
                 @Override
                 public void open() throws HyracksDataException {
-                    runs = new LinkedList<RunFileReader>();
-                    gTable.reset();
+                    state = new AggregateActivityState(ctx.getJobletContext().getJobId(), new TaskId(getActivityId(),
+                            partition));
+                    state.runs = new LinkedList<RunFileReader>();
+                    state.gTable = spillableTableFactory.buildSpillableTable(ctx, keyFields, comparatorFactories,
+                            firstNormalizerFactory, aggregatorFactory,
+                            recordDescProvider.getInputRecordDescriptor(getOperatorId(), 0), recordDescriptors[0],
+                            ExternalGroupOperatorDescriptor.this.framesLimit);
+                    state.gTable.reset();
                 }
 
                 @Override
@@ -153,9 +175,9 @@
                          * If the group table is too large, flush the table into
                          * a run file.
                          */
-                        if (!gTable.insert(accessor, i)) {
+                        if (!state.gTable.insert(accessor, i)) {
                             flushFramesToRun();
-                            if (!gTable.insert(accessor, i))
+                            if (!state.gTable.insert(accessor, i))
                                 throw new HyracksDataException(
                                         "Failed to insert a new buffer into the aggregate operator!");
                         }
@@ -169,21 +191,17 @@
 
                 @Override
                 public void close() throws HyracksDataException {
-                    if (gTable.getFrameCount() >= 0) {
-                        if (runs.size() <= 0) {
-                            /**
-                             * All in memory
-                             */
-                            env.set(GROUPTABLES, gTable);
-                        } else {
+                    if (state.gTable.getFrameCount() >= 0) {
+                        if (state.runs.size() > 0) {
                             /**
                              * flush the memory into the run file.
                              */
                             flushFramesToRun();
-                            gTable.close();
+                            state.gTable.close();
+                            state.gTable = null;
                         }
                     }
-                    env.set(RUNS, runs);
+                    env.setTaskState(state);
                 }
 
                 private void flushFramesToRun() throws HyracksDataException {
@@ -197,15 +215,15 @@
                     RunFileWriter writer = new RunFileWriter(runFile, ctx.getIOManager());
                     writer.open();
                     try {
-                        gTable.sortFrames();
-                        gTable.flushFrames(writer, true);
+                        state.gTable.sortFrames();
+                        state.gTable.flushFrames(writer, true);
                     } catch (Exception ex) {
                         throw new HyracksDataException(ex);
                     } finally {
                         writer.close();
                     }
-                    gTable.reset();
-                    runs.add(((RunFileWriter) writer).createReader());
+                    state.gTable.reset();
+                    state.runs.add(((RunFileWriter) writer).createReader());
                 }
             };
             return op;
@@ -221,7 +239,7 @@
 
         @Override
         public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx, final IOperatorEnvironment env,
-                IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions)
+                IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions)
                 throws HyracksDataException {
             final IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
             for (int i = 0; i < comparatorFactories.length; ++i) {
@@ -252,11 +270,10 @@
                  */
                 private ByteBuffer outFrame, writerFrame;
 
-                /**
-                 * List of the run files to be merged
-                 */
                 private LinkedList<RunFileReader> runs;
 
+                private AggregateActivityState aggState;
+
                 /**
                  * how many frames to be read ahead once
                  */
@@ -270,21 +287,21 @@
                 private ArrayTupleBuilder finalTupleBuilder;
                 private FrameTupleAppender writerFrameAppender;
 
-                @SuppressWarnings("unchecked")
                 public void initialize() throws HyracksDataException {
-                    runs = (LinkedList<RunFileReader>) env.get(RUNS);
+                    aggState = (AggregateActivityState) env.getTaskState(new TaskId(new ActivityId(getOperatorId(),
+                            AGGREGATE_ACTIVITY_ID), partition));
+                    runs = aggState.runs;
                     writer.open();
                     try {
                         if (runs.size() <= 0) {
-                            ISpillableTable gTable = (ISpillableTable) env.get(GROUPTABLES);
+                            ISpillableTable gTable = aggState.gTable;
                             if (gTable != null) {
                                 if (isOutputSorted)
                                     gTable.sortFrames();
                                 gTable.flushFrames(writer, false);
                             }
-                            env.set(GROUPTABLES, null);
                         } else {
-                            long start = System.currentTimeMillis();
+                            runs = new LinkedList<RunFileReader>(runs);
                             inFrames = new ArrayList<ByteBuffer>();
                             outFrame = ctx.allocateFrame();
                             outFrameAppender.reset(outFrame, true);
@@ -297,8 +314,6 @@
                                 }
                             }
                             inFrames.clear();
-                            long end = System.currentTimeMillis();
-                            System.out.println("merge time " + (end - start));
                         }
                     } catch (Exception e) {
                         writer.fail();
@@ -306,7 +321,6 @@
                     } finally {
                         writer.close();
                     }
-                    env.set(RUNS, null);
                 }
 
                 private void doPass(LinkedList<RunFileReader> runs) throws HyracksDataException {
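
One loose end: none of the new state classes ever call setMemoryOccupancy, so getMemoryOccupancy() reports 0 even while AggregateActivityState pins up to framesLimit frames in gTable. If the infrastructure is meant to account for that memory, close() could record it before registering the state (a sketch, assuming bytes are the intended unit):

    state.setMemoryOccupancy((long) state.gTable.getFrameCount() * ctx.getFrameSize());
    env.setTaskState(state);
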
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashGroupOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashGroupOperatorDescriptor.java
index a01c80e..625ee25 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashGroupOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashGroupOperatorDescriptor.java
@@ -14,27 +14,35 @@
  */
 package edu.uci.ics.hyracks.dataflow.std.group;
 
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
 import java.nio.ByteBuffer;
 
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.dataflow.ActivityId;
 import edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder;
 import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.TaskId;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
 import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
+import edu.uci.ics.hyracks.api.job.JobId;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractActivityNode;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractTaskState;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
 
 public class HashGroupOperatorDescriptor extends AbstractOperatorDescriptor {
-    private static final String HASHTABLE = "hashtable";
+    private static final int HASH_BUILD_ACTIVITY_ID = 0;
+
+    private static final int OUTPUT_ACTIVITY_ID = 1;
 
     private static final long serialVersionUID = 1L;
 
@@ -58,10 +66,10 @@
 
     @Override
     public void contributeActivities(IActivityGraphBuilder builder) {
-        HashBuildActivity ha = new HashBuildActivity(new ActivityId(odId, 0));
+        HashBuildActivity ha = new HashBuildActivity(new ActivityId(odId, HASH_BUILD_ACTIVITY_ID));
         builder.addActivity(ha);
 
-        OutputActivity oa = new OutputActivity(new ActivityId(odId, 1));
+        OutputActivity oa = new OutputActivity(new ActivityId(odId, OUTPUT_ACTIVITY_ID));
         builder.addActivity(oa);
 
         builder.addSourceEdge(0, ha, 0);
@@ -69,6 +77,27 @@
         builder.addBlockingEdge(ha, oa);
     }
 
+    public static class HashBuildActivityState extends AbstractTaskState {
+        private GroupingHashTable table;
+
+        public HashBuildActivityState() {
+        }
+
+        private HashBuildActivityState(JobId jobId, TaskId tId) {
+            super(jobId, tId);
+        }
+
+        @Override
+        public void toBytes(DataOutput out) throws IOException {
+            // No-op for now: this state is only ever passed in memory.
+        }
+
+        @Override
+        public void fromBytes(DataInput in) throws IOException {
+            // No-op for now: deserialization is never exercised yet.
+        }
+    }
+
     private class HashBuildActivity extends AbstractActivityNode {
         private static final long serialVersionUID = 1L;
 
@@ -78,15 +107,17 @@
 
         @Override
         public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx, final IOperatorEnvironment env,
-                final IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
+                final IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions) {
             final FrameTupleAccessor accessor = new FrameTupleAccessor(ctx.getFrameSize(),
                     recordDescProvider.getInputRecordDescriptor(getOperatorId(), 0));
             return new AbstractUnaryInputSinkOperatorNodePushable() {
-                private GroupingHashTable table;
+                private HashBuildActivityState state;
 
                 @Override
                 public void open() throws HyracksDataException {
-                    table = new GroupingHashTable(ctx, keys, comparatorFactories, tpcf, aggregatorFactory,
+                    state = new HashBuildActivityState(ctx.getJobletContext().getJobId(), new TaskId(getActivityId(),
+                            partition));
+                    state.table = new GroupingHashTable(ctx, keys, comparatorFactories, tpcf, aggregatorFactory,
                             recordDescProvider.getInputRecordDescriptor(getOperatorId(), 0), recordDescriptors[0],
                             tableSize);
                 }
@@ -96,13 +127,13 @@
                     accessor.reset(buffer);
                     int tupleCount = accessor.getTupleCount();
                     for (int i = 0; i < tupleCount; ++i) {
-                        table.insert(accessor, i);
+                        state.table.insert(accessor, i);
                     }
                 }
 
                 @Override
                 public void close() throws HyracksDataException {
-                    env.set(HASHTABLE, table);
+                    env.setTaskState(state);
                 }
 
                 @Override
@@ -121,11 +152,13 @@
 
         @Override
         public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx, final IOperatorEnvironment env,
-                IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
+                IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions) {
             return new AbstractUnaryOutputSourceOperatorNodePushable() {
                 @Override
                 public void initialize() throws HyracksDataException {
-                    GroupingHashTable table = (GroupingHashTable) env.get(HASHTABLE);
+                    HashBuildActivityState buildState = (HashBuildActivityState) env.getTaskState(new TaskId(
+                            new ActivityId(getOperatorId(), HASH_BUILD_ACTIVITY_ID), partition));
+                    GroupingHashTable table = buildState.table;
                     writer.open();
                     try {
                         table.write(writer);
@@ -135,7 +168,6 @@
                     } finally {
                         writer.close();
                     }
-                    env.set(HASHTABLE, null);
                 }
             };
         }
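
Unlike AggregateActivityState, which throws from its serializers, HashBuildActivityState leaves toBytes/fromBytes as silent no-ops; if the infrastructure ever tried to persist this state, the hash table would be dropped without an error. Failing fast would be safer (a sketch mirroring the external-group state):

    @Override
    public void toBytes(DataOutput out) throws IOException {
        throw new UnsupportedOperationException(); // in-memory only
    }

The same applies to HashPartitionTaskState and BuildAndPartitionTaskState in the two join operators below.
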
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/GraceHashJoinOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/GraceHashJoinOperatorDescriptor.java
index d530560..7f33bc4 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/GraceHashJoinOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/GraceHashJoinOperatorDescriptor.java
@@ -14,12 +14,16 @@
  */
 package edu.uci.ics.hyracks.dataflow.std.join;
 
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
 import java.nio.ByteBuffer;
 
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.dataflow.ActivityId;
 import edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder;
 import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.TaskId;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
@@ -31,6 +35,7 @@
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.io.FileReference;
 import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
+import edu.uci.ics.hyracks.api.job.JobId;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
@@ -42,12 +47,14 @@
 import edu.uci.ics.hyracks.dataflow.common.io.RunFileWriter;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractActivityNode;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractTaskState;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
 
 public class GraceHashJoinOperatorDescriptor extends AbstractOperatorDescriptor {
-    private static final String RELATION0 = "Rel0";
-    private static final String RELATION1 = "Rel1";
+    private static final int RPARTITION_ACTIVITY_ID = 0;
+    private static final int SPARTITION_ACTIVITY_ID = 1;
+    private static final int JOIN_ACTIVITY_ID = 2;
 
     private static final long serialVersionUID = 1L;
     private final int[] keys0;
@@ -98,9 +105,11 @@
 
     @Override
     public void contributeActivities(IActivityGraphBuilder builder) {
-        HashPartitionActivityNode rpart = new HashPartitionActivityNode(new ActivityId(odId, 0), RELATION0, keys0, 0);
-        HashPartitionActivityNode spart = new HashPartitionActivityNode(new ActivityId(odId, 1), RELATION1, keys1, 1);
-        JoinActivityNode join = new JoinActivityNode(new ActivityId(odId, 2));
+        HashPartitionActivityNode rpart = new HashPartitionActivityNode(new ActivityId(odId, RPARTITION_ACTIVITY_ID),
+                keys0, 0);
+        HashPartitionActivityNode spart = new HashPartitionActivityNode(new ActivityId(odId, SPARTITION_ACTIVITY_ID),
+                keys1, 1);
+        JoinActivityNode join = new JoinActivityNode(new ActivityId(odId, JOIN_ACTIVITY_ID));
 
         builder.addActivity(rpart);
         builder.addSourceEdge(0, rpart, 0);
@@ -119,26 +128,43 @@
         return memsize;
     }
 
+    public static class HashPartitionTaskState extends AbstractTaskState {
+        private RunFileWriter[] fWriters;
+
+        public HashPartitionTaskState(JobId jobId, TaskId taskId) {
+            super(jobId, taskId);
+        }
+
+        @Override
+        public void toBytes(DataOutput out) throws IOException {
+            // No-op for now: this state is only ever passed in memory.
+        }
+
+        @Override
+        public void fromBytes(DataInput in) throws IOException {
+            // No-op for now: deserialization is never exercised yet.
+        }
+    }
+
     private class HashPartitionActivityNode extends AbstractActivityNode {
         private static final long serialVersionUID = 1L;
-        private String partitionsKey;
         private int operatorInputIndex;
         private int keys[];
 
-        public HashPartitionActivityNode(ActivityId id, String partitionsKey, int keys[], int operatorInputIndex) {
+        public HashPartitionActivityNode(ActivityId id, int keys[], int operatorInputIndex) {
             super(id);
-            this.partitionsKey = partitionsKey;
             this.keys = keys;
             this.operatorInputIndex = operatorInputIndex;
         }
 
         @Override
         public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx, final IOperatorEnvironment env,
-                final IRecordDescriptorProvider recordDescProvider, int partition, final int nPartitions) {
+                final IRecordDescriptorProvider recordDescProvider, final int partition, final int nPartitions) {
             final IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
             for (int i = 0; i < comparatorFactories.length; ++i) {
                 comparators[i] = comparatorFactories[i].createBinaryComparator();
             }
+            final int numPartitions = (int) Math.ceil(Math.sqrt(inputsize0 * factor / nPartitions));
             IOperatorNodePushable op = new AbstractUnaryInputSinkOperatorNodePushable() {
                 private final FrameTupleAccessor accessor0 = new FrameTupleAccessor(ctx.getFrameSize(),
                         recordDescProvider.getInputRecordDescriptor(getOperatorId(), operatorInputIndex));
@@ -147,9 +173,10 @@
                         hashFunctionFactories).createPartitioner();
 
                 private final FrameTupleAppender appender = new FrameTupleAppender(ctx.getFrameSize());
+
                 private ByteBuffer[] outbufs;
-                private RunFileWriter[] fWriters;
-                private final int numPartitions = (int) Math.ceil(Math.sqrt(inputsize0 * factor / nPartitions));
+
+                private HashPartitionTaskState state;
 
                 @Override
                 public void close() throws HyracksDataException {
@@ -162,23 +189,24 @@
                         closeWriter(i);
                     }
 
-                    env.set(partitionsKey, fWriters);
+                    env.setTaskState(state);
                 }
 
                 private void closeWriter(int i) throws HyracksDataException {
-                    RunFileWriter writer = fWriters[i];
+                    RunFileWriter writer = state.fWriters[i];
                     if (writer != null) {
                         writer.close();
                     }
                 }
 
                 private void write(int i, ByteBuffer head) throws HyracksDataException {
-                    RunFileWriter writer = fWriters[i];
+                    RunFileWriter writer = state.fWriters[i];
                     if (writer == null) {
-                        FileReference file = ctx.getJobletContext().createManagedWorkspaceFile(partitionsKey);
+                        FileReference file = ctx.getJobletContext().createManagedWorkspaceFile(
+                                GraceHashJoinOperatorDescriptor.class.getSimpleName());
                         writer = new RunFileWriter(file, ctx.getIOManager());
                         writer.open();
-                        fWriters[i] = writer;
+                        state.fWriters[i] = writer;
                     }
                     writer.nextFrame(head);
                 }
@@ -208,8 +236,10 @@
 
                 @Override
                 public void open() throws HyracksDataException {
+                    state = new HashPartitionTaskState(ctx.getJobletContext().getJobId(), new TaskId(getActivityId(),
+                            partition));
                     outbufs = new ByteBuffer[numPartitions];
-                    fWriters = new RunFileWriter[numPartitions];
+                    state.fWriters = new RunFileWriter[numPartitions];
                     for (int i = 0; i < numPartitions; i++) {
                         outbufs[i] = ctx.allocateFrame();
                     }
@@ -232,7 +262,7 @@
 
         @Override
         public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx, final IOperatorEnvironment env,
-                final IRecordDescriptorProvider recordDescProvider, int partition, final int nPartitions) {
+                final IRecordDescriptorProvider recordDescProvider, final int partition, final int nPartitions) {
             final IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
             for (int i = 0; i < comparatorFactories.length; ++i) {
                 comparators[i] = comparatorFactories[i].createBinaryComparator();
@@ -255,8 +285,12 @@
 
                 @Override
                 public void initialize() throws HyracksDataException {
-                    buildWriters = (RunFileWriter[]) env.get(RELATION1);
-                    probeWriters = (RunFileWriter[]) env.get(RELATION0);
+                    HashPartitionTaskState rState = (HashPartitionTaskState) env.getTaskState(new TaskId(
+                            new ActivityId(getOperatorId(), RPARTITION_ACTIVITY_ID), partition));
+                    HashPartitionTaskState sState = (HashPartitionTaskState) env.getTaskState(new TaskId(
+                            new ActivityId(getOperatorId(), SPARTITION_ACTIVITY_ID), partition));
+                    buildWriters = sState.fWriters;
+                    probeWriters = rState.fWriters;
 
                     ITuplePartitionComputer hpcRep0 = new RepartitionComputerFactory(numPartitions,
                             new FieldHashPartitionComputerFactory(keys0, hashFunctionFactories)).createPartitioner();
@@ -310,12 +344,6 @@
                         writer.close();
                     }
                 }
-
-                @Override
-                public void deinitialize() throws HyracksDataException {
-                    env.set(RELATION1, null);
-                    env.set(RELATION0, null);
-                }
             };
             return op;
         }
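
Note that, unlike HybridHashJoinOperatorDescriptor below, which carries nPartitions inside BuildAndPartitionTaskState, the Grace join does not store its fan-out in task state: each createPushRuntime re-evaluates

    final int numPartitions = (int) Math.ceil(Math.sqrt(inputsize0 * factor / nPartitions));

so the partitioning activities and the join activity agree only because they recompute the identical expression from the same operator fields.
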
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java
index 720ea99..23064f0 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java
@@ -14,12 +14,16 @@
  */
 package edu.uci.ics.hyracks.dataflow.std.join;
 
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
 import java.nio.ByteBuffer;
 
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.dataflow.ActivityId;
 import edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder;
 import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.TaskId;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
@@ -32,6 +36,7 @@
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.io.FileReference;
 import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
+import edu.uci.ics.hyracks.api.job.JobId;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
@@ -43,15 +48,14 @@
 import edu.uci.ics.hyracks.dataflow.common.io.RunFileWriter;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractActivityNode;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractTaskState;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
 
 public class HybridHashJoinOperatorDescriptor extends AbstractOperatorDescriptor {
-    private static final String JOINER0 = "joiner0";
-    private static final String BUILDRELATION = "BuildRel";
-    private static final String PROBERELATION = "ProbeRel";
-    private static final String MEM_HASHTABLE = "MEMORY_HASHTABLE";
-    private static final String NUM_PARTITION = "NUMBER_B_PARTITIONS"; // B
+    private static final int BUILD_AND_PARTITION_ACTIVITY_ID = 0;
+    private static final int PARTITION_AND_JOIN_ACTIVITY_ID = 1;
+
     private final int memsize;
     private static final long serialVersionUID = 1L;
     private final int inputsize0;
@@ -117,8 +121,10 @@
 
     @Override
     public void contributeActivities(IActivityGraphBuilder builder) {
-        BuildAndPartitionActivityNode phase1 = new BuildAndPartitionActivityNode(new ActivityId(odId, 0), BUILDRELATION);
-        PartitionAndJoinActivityNode phase2 = new PartitionAndJoinActivityNode(new ActivityId(odId, 1), PROBERELATION);
+        BuildAndPartitionActivityNode phase1 = new BuildAndPartitionActivityNode(new ActivityId(odId,
+                BUILD_AND_PARTITION_ACTIVITY_ID));
+        PartitionAndJoinActivityNode phase2 = new PartitionAndJoinActivityNode(new ActivityId(odId,
+                PARTITION_AND_JOIN_ACTIVITY_ID));
 
         builder.addActivity(phase1);
         builder.addSourceEdge(1, phase1, 0);
@@ -131,18 +137,41 @@
         builder.addTargetEdge(0, phase2, 0);
     }
 
+    public static class BuildAndPartitionTaskState extends AbstractTaskState {
+        private RunFileWriter[] fWriters;
+        private InMemoryHashJoin joiner;
+        private int nPartitions;
+        private int memoryForHashtable;
+
+        public BuildAndPartitionTaskState() {
+        }
+
+        private BuildAndPartitionTaskState(JobId jobId, TaskId taskId) {
+            super(jobId, taskId);
+        }
+
+        @Override
+        public void toBytes(DataOutput out) throws IOException {
+            // No-op for now: this state is only ever passed in memory.
+        }
+
+        @Override
+        public void fromBytes(DataInput in) throws IOException {
+            // No-op for now: deserialization is never exercised yet.
+        }
+
+    }
+
     private class BuildAndPartitionActivityNode extends AbstractActivityNode {
         private static final long serialVersionUID = 1L;
-        private String relationName;
 
-        public BuildAndPartitionActivityNode(ActivityId id, String relationName) {
+        public BuildAndPartitionActivityNode(ActivityId id) {
             super(id);
-            this.relationName = relationName;
         }
 
         @Override
         public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx, final IOperatorEnvironment env,
-                IRecordDescriptorProvider recordDescProvider, int partition, final int nPartitions) {
+                IRecordDescriptorProvider recordDescProvider, final int partition, final int nPartitions) {
             final RecordDescriptor rd0 = recordDescProvider.getInputRecordDescriptor(getOperatorId(), 0);
             final RecordDescriptor rd1 = recordDescProvider.getInputRecordDescriptor(getOperatorId(), 1);
             final IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
@@ -157,7 +186,8 @@
             }
 
             IOperatorNodePushable op = new AbstractUnaryInputSinkOperatorNodePushable() {
-                private InMemoryHashJoin joiner0;
+                private BuildAndPartitionTaskState state = new BuildAndPartitionTaskState(ctx.getJobletContext()
+                        .getJobId(), new TaskId(getActivityId(), partition));
                 private final FrameTupleAccessor accessorBuild = new FrameTupleAccessor(ctx.getFrameSize(), rd1);
                 private final ITuplePartitionComputer hpcBuild = new FieldHashPartitionComputerFactory(keys1,
                         hashFunctionFactories).createPartitioner();
@@ -165,16 +195,13 @@
                 private final FrameTupleAppender ftappender = new FrameTupleAppender(ctx.getFrameSize());
                 private ByteBuffer[] bufferForPartitions;
                 private final ByteBuffer inBuffer = ctx.allocateFrame();
-                private RunFileWriter[] fWriters;
-                private int memoryForHashtable;
-                private int B;
 
                 @Override
                 public void close() throws HyracksDataException {
-                    if (memoryForHashtable != 0)
+                    if (state.memoryForHashtable != 0)
                         build(inBuffer);
 
-                    for (int i = 0; i < B; i++) {
+                    for (int i = 0; i < state.nPartitions; i++) {
                         ByteBuffer buf = bufferForPartitions[i];
                         accessorBuild.reset(buf);
                         if (accessorBuild.getTupleCount() > 0) {
@@ -183,22 +210,19 @@
                         closeWriter(i);
                     }
 
-                    env.set(relationName, fWriters);
-                    env.set(JOINER0, joiner0);
-                    env.set(NUM_PARTITION, B);
-                    env.set(MEM_HASHTABLE, memoryForHashtable);
+                    env.setTaskState(state);
                 }
 
                 @Override
                 public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
 
-                    if (memoryForHashtable != memsize - 2) {
+                    if (state.memoryForHashtable != memsize - 2) {
                         accessorBuild.reset(buffer);
                         int tCount = accessorBuild.getTupleCount();
                         for (int i = 0; i < tCount; ++i) {
                             int entry = -1;
-                            if (memoryForHashtable == 0) {
-                                entry = hpcBuild.partition(accessorBuild, i, B);
+                            if (state.memoryForHashtable == 0) {
+                                entry = hpcBuild.partition(accessorBuild, i, state.nPartitions);
                                 boolean newBuffer = false;
                                 ByteBuffer bufBi = bufferForPartitions[entry];
                                 while (true) {
@@ -213,7 +237,7 @@
                                 }
                             } else {
                                 entry = hpcBuild.partition(accessorBuild, i, (int) (inputsize0 * factor / nPartitions));
-                                if (entry < memoryForHashtable) {
+                                if (entry < state.memoryForHashtable) {
                                     while (true) {
                                         if (!ftappender.append(accessorBuild, i)) {
                                             build(inBuffer);
@@ -224,7 +248,7 @@
                                         }
                                     }
                                 } else {
-                                    entry %= B;
+                                    entry %= state.nPartitions;
                                     boolean newBuffer = false;
                                     ByteBuffer bufBi = bufferForPartitions[entry];
                                     while (true) {
@@ -250,27 +274,27 @@
                 private void build(ByteBuffer inBuffer) throws HyracksDataException {
                     ByteBuffer copyBuffer = ctx.allocateFrame();
                     FrameUtils.copy(inBuffer, copyBuffer);
-                    joiner0.build(copyBuffer);
+                    state.joiner.build(copyBuffer);
                 }
 
                 @Override
                 public void open() throws HyracksDataException {
                     if (memsize > 1) {
                         if (memsize > inputsize0) {
-                            B = 0;
+                            state.nPartitions = 0;
                         } else {
-                            B = (int) (Math.ceil((double) (inputsize0 * factor / nPartitions - memsize)
+                            state.nPartitions = (int) (Math.ceil((double) (inputsize0 * factor / nPartitions - memsize)
                                     / (double) (memsize - 1)));
                         }
-                        if (B <= 0) {
+                        if (state.nPartitions <= 0) {
                             // becomes in-memory HJ
-                            memoryForHashtable = memsize - 2;
-                            B = 0;
+                            state.memoryForHashtable = memsize - 2;
+                            state.nPartitions = 0;
                         } else {
-                            memoryForHashtable = memsize - B - 2;
-                            if (memoryForHashtable < 0) {
-                                memoryForHashtable = 0;
-                                B = (int) Math.ceil(Math.sqrt(inputsize0 * factor / nPartitions));
+                            state.memoryForHashtable = memsize - state.nPartitions - 2;
+                            if (state.memoryForHashtable < 0) {
+                                state.memoryForHashtable = 0;
+                                state.nPartitions = (int) Math.ceil(Math.sqrt(inputsize0 * factor / nPartitions));
                             }
                         }
                     } else {
@@ -281,13 +305,14 @@
                             .createPartitioner();
                     ITuplePartitionComputer hpc1 = new FieldHashPartitionComputerFactory(keys1, hashFunctionFactories)
                             .createPartitioner();
-                    int tableSize = (int) (memoryForHashtable * recordsPerFrame * factor);
-                    joiner0 = new InMemoryHashJoin(ctx, tableSize, new FrameTupleAccessor(ctx.getFrameSize(), rd0),
-                            hpc0, new FrameTupleAccessor(ctx.getFrameSize(), rd1), hpc1, new FrameTuplePairComparator(
-                                    keys0, keys1, comparators), isLeftOuter, nullWriters1);
-                    bufferForPartitions = new ByteBuffer[B];
-                    fWriters = new RunFileWriter[B];
-                    for (int i = 0; i < B; i++) {
+                    int tableSize = (int) (state.memoryForHashtable * recordsPerFrame * factor);
+                    state.joiner = new InMemoryHashJoin(ctx, tableSize,
+                            new FrameTupleAccessor(ctx.getFrameSize(), rd0), hpc0, new FrameTupleAccessor(
+                                    ctx.getFrameSize(), rd1), hpc1, new FrameTuplePairComparator(keys0, keys1,
+                                    comparators), isLeftOuter, nullWriters1);
+                    bufferForPartitions = new ByteBuffer[state.nPartitions];
+                    state.fWriters = new RunFileWriter[state.nPartitions];
+                    for (int i = 0; i < state.nPartitions; i++) {
                         bufferForPartitions[i] = ctx.allocateFrame();
                     }
 
@@ -299,19 +324,20 @@
                 }
 
                 private void closeWriter(int i) throws HyracksDataException {
-                    RunFileWriter writer = fWriters[i];
+                    RunFileWriter writer = state.fWriters[i];
                     if (writer != null) {
                         writer.close();
                     }
                 }
 
                 private void write(int i, ByteBuffer head) throws HyracksDataException {
-                    RunFileWriter writer = fWriters[i];
+                    RunFileWriter writer = state.fWriters[i];
                     if (writer == null) {
-                        FileReference file = ctx.getJobletContext().createManagedWorkspaceFile(relationName);
+                        FileReference file = ctx.getJobletContext().createManagedWorkspaceFile(
+                                BuildAndPartitionActivityNode.class.getSimpleName());
                         writer = new RunFileWriter(file, ctx.getIOManager());
                         writer.open();
-                        fWriters[i] = writer;
+                        state.fWriters[i] = writer;
                     }
                     writer.nextFrame(head);
                 }
@@ -322,16 +348,14 @@
 
     private class PartitionAndJoinActivityNode extends AbstractActivityNode {
         private static final long serialVersionUID = 1L;
-        private String relationName;
 
-        public PartitionAndJoinActivityNode(ActivityId id, String relationName) {
+        public PartitionAndJoinActivityNode(ActivityId id) {
             super(id);
-            this.relationName = relationName;
         }
 
         @Override
         public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx, final IOperatorEnvironment env,
-                IRecordDescriptorProvider recordDescProvider, int partition, final int nPartitions) {
+                IRecordDescriptorProvider recordDescProvider, final int partition, final int nPartitions) {
             final RecordDescriptor rd0 = recordDescProvider.getInputRecordDescriptor(getOperatorId(), 0);
             final RecordDescriptor rd1 = recordDescProvider.getInputRecordDescriptor(getOperatorId(), 1);
             final IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
@@ -346,7 +370,7 @@
             }
 
             IOperatorNodePushable op = new AbstractUnaryInputUnaryOutputOperatorNodePushable() {
-                private InMemoryHashJoin joiner0;
+                private BuildAndPartitionTaskState state;
                 private final FrameTupleAccessor accessorProbe = new FrameTupleAccessor(ctx.getFrameSize(), rd0);
                 private final ITuplePartitionComputerFactory hpcf0 = new FieldHashPartitionComputerFactory(keys0,
                         hashFunctionFactories);
@@ -361,19 +385,16 @@
                 private RunFileWriter[] buildWriters;
                 private RunFileWriter[] probeWriters;
                 private ByteBuffer[] bufferForPartitions;
-                private int B;
-                private int memoryForHashtable;
 
                 @Override
                 public void open() throws HyracksDataException {
-                    joiner0 = (InMemoryHashJoin) env.get(JOINER0);
+                    state = (BuildAndPartitionTaskState) env.getTaskState(new TaskId(new ActivityId(getOperatorId(),
+                            BUILD_AND_PARTITION_ACTIVITY_ID), partition));
                     writer.open();
-                    buildWriters = (RunFileWriter[]) env.get(BUILDRELATION);
-                    B = (Integer) env.get(NUM_PARTITION);
-                    memoryForHashtable = (Integer) env.get(MEM_HASHTABLE);
-                    probeWriters = new RunFileWriter[B];
-                    bufferForPartitions = new ByteBuffer[B];
-                    for (int i = 0; i < B; i++) {
+                    buildWriters = state.fWriters;
+                    probeWriters = new RunFileWriter[state.nPartitions];
+                    bufferForPartitions = new ByteBuffer[state.nPartitions];
+                    for (int i = 0; i < state.nPartitions; i++) {
                         bufferForPartitions[i] = ctx.allocateFrame();
                     }
                     appender.reset(outBuffer, true);
@@ -382,14 +403,14 @@
 
                 @Override
                 public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
-                    if (memoryForHashtable != memsize - 2) {
+                    if (state.memoryForHashtable != memsize - 2) {
                         accessorProbe.reset(buffer);
                         int tupleCount0 = accessorProbe.getTupleCount();
                         for (int i = 0; i < tupleCount0; ++i) {
 
                             int entry = -1;
-                            if (memoryForHashtable == 0) {
-                                entry = hpcProbe.partition(accessorProbe, i, B);
+                            if (state.memoryForHashtable == 0) {
+                                entry = hpcProbe.partition(accessorProbe, i, state.nPartitions);
                                 boolean newBuffer = false;
                                 ByteBuffer outbuf = bufferForPartitions[entry];
                                 while (true) {
@@ -404,17 +425,17 @@
                                 }
                             } else {
                                 entry = hpcProbe.partition(accessorProbe, i, (int) (inputsize0 * factor / nPartitions));
-                                if (entry < memoryForHashtable) {
+                                if (entry < state.memoryForHashtable) {
                                     while (true) {
                                         if (!ftap.append(accessorProbe, i)) {
-                                            joiner0.join(inBuffer, writer);
+                                            state.joiner.join(inBuffer, writer);
                                             ftap.reset(inBuffer, true);
                                         } else
                                             break;
                                     }
 
                                 } else {
-                                    entry %= B;
+                                    entry %= state.nPartitions;
                                     boolean newBuffer = false;
                                     ByteBuffer outbuf = bufferForPartitions[entry];
                                     while (true) {
@@ -431,18 +452,20 @@
                             }
                         }
                     } else {
-                        joiner0.join(buffer, writer);
+                        state.joiner.join(buffer, writer);
                     }
                 }
 
                 @Override
                 public void close() throws HyracksDataException {
-                    joiner0.join(inBuffer, writer);
-                    joiner0.closeJoin(writer);
-                    ITuplePartitionComputer hpcRep0 = new RepartitionComputerFactory(B, hpcf0).createPartitioner();
-                    ITuplePartitionComputer hpcRep1 = new RepartitionComputerFactory(B, hpcf1).createPartitioner();
-                    if (memoryForHashtable != memsize - 2) {
-                        for (int i = 0; i < B; i++) {
+                    state.joiner.join(inBuffer, writer);
+                    state.joiner.closeJoin(writer);
+                    ITuplePartitionComputer hpcRep0 = new RepartitionComputerFactory(state.nPartitions, hpcf0)
+                            .createPartitioner();
+                    ITuplePartitionComputer hpcRep1 = new RepartitionComputerFactory(state.nPartitions, hpcf1)
+                            .createPartitioner();
+                    if (state.memoryForHashtable != memsize - 2) {
+                        for (int i = 0; i < state.nPartitions; i++) {
                             ByteBuffer buf = bufferForPartitions[i];
                             accessorProbe.reset(buf);
                             if (accessorProbe.getTupleCount() > 0) {
@@ -453,12 +476,12 @@
 
                         inBuffer.clear();
                         int tableSize = -1;
-                        if (memoryForHashtable == 0) {
-                            tableSize = (int) (B * recordsPerFrame * factor);
+                        if (state.memoryForHashtable == 0) {
+                            tableSize = (int) (state.nPartitions * recordsPerFrame * factor);
                         } else {
                             tableSize = (int) (memsize * recordsPerFrame * factor);
                         }
-                        for (int partitionid = 0; partitionid < B; partitionid++) {
+                        for (int partitionid = 0; partitionid < state.nPartitions; partitionid++) {
                             RunFileWriter buildWriter = buildWriters[partitionid];
                             RunFileWriter probeWriter = probeWriters[partitionid];
                             if ((buildWriter == null && !isLeftOuter) || probeWriter == null) {
@@ -494,11 +517,6 @@
                         }
                     }
                     writer.close();
-                    env.set(PROBERELATION, null);
-                    env.set(BUILDRELATION, null);
-                    env.set(JOINER0, null);
-                    env.set(MEM_HASHTABLE, null);
-                    env.set(NUM_PARTITION, null);
                 }
 
                 private void closeWriter(int i) throws HyracksDataException {
@@ -511,7 +529,8 @@
                 private void write(int i, ByteBuffer head) throws HyracksDataException {
                     RunFileWriter writer = probeWriters[i];
                     if (writer == null) {
-                        FileReference file = ctx.createManagedWorkspaceFile(relationName);
+                        FileReference file = ctx.createManagedWorkspaceFile(PartitionAndJoinActivityNode.class
+                                .getSimpleName());
                         writer = new RunFileWriter(file, ctx.getIOManager());
                         writer.open();
                         probeWriters[i] = writer;
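
The hunks above are the probe side of the migration applied throughout this change: the build activity used to publish its products under string keys (BUILDRELATION, JOINER0, MEM_HASHTABLE, NUM_PARTITION) that the probe side read back with unchecked casts and nulled out by hand; it now publishes one typed BuildAndPartitionTaskState keyed by (activity id, partition). A minimal sketch of the handshake, using hypothetical names (ExampleTaskState, PRODUCER_ACTIVITY_ID) and the AbstractTaskState base class that the following files import:

    // Producer activity, in close(): register the state under this task's identity.
    ExampleTaskState state = new ExampleTaskState(ctx.getJobletContext().getJobId(),
            new TaskId(getActivityId(), partition));
    state.nPartitions = nPartitions;
    env.setTaskState(state);

    // Consumer activity, in open(): rebuild the producer's TaskId for the same
    // partition and fetch the registered state.
    ExampleTaskState built = (ExampleTaskState) env.getTaskState(new TaskId(
            new ActivityId(getOperatorId(), PRODUCER_ACTIVITY_ID), partition));

The blocking edge between the two activities is what makes the lookup safe: the consumer's tasks cannot start until the producer's tasks have closed, and registration happens in close().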
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java
index 5a28d1c..4a13e16 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java
@@ -14,12 +14,16 @@
  */
 package edu.uci.ics.hyracks.dataflow.std.join;
 
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
 import java.nio.ByteBuffer;
 
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.dataflow.ActivityId;
 import edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder;
 import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.TaskId;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
@@ -30,6 +34,7 @@
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
+import edu.uci.ics.hyracks.api.job.JobId;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTuplePairComparator;
@@ -37,11 +42,13 @@
 import edu.uci.ics.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractActivityNode;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractTaskState;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
 
 public class InMemoryHashJoinOperatorDescriptor extends AbstractOperatorDescriptor {
-    private static final String JOINER = "joiner";
+    private static final int BUILD_ACTIVITY_ID = 0;
+    private static final int PROBE_ACTIVITY_ID = 1;
 
     private static final long serialVersionUID = 1L;
     private final int[] keys0;
@@ -97,6 +104,27 @@
         builder.addBlockingEdge(hba, hpa);
     }
 
+    public static class HashBuildTaskState extends AbstractTaskState {
+        private InMemoryHashJoin joiner;
+
+        public HashBuildTaskState() {
+        }
+
+        private HashBuildTaskState(JobId jobId, TaskId taskId) {
+            super(jobId, taskId);
+        }
+
+        @Override
+        public void toBytes(DataOutput out) throws IOException {
+            // No-op: the live InMemoryHashJoin has no wire format.
+        }
+
+        @Override
+        public void fromBytes(DataInput in) throws IOException {
+            // No-op: this state never leaves the node that built it.
+        }
+    }
+
     private class HashBuildActivityNode extends AbstractActivityNode {
         private static final long serialVersionUID = 1L;
 
@@ -106,9 +134,11 @@
 
         @Override
         public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx, final IOperatorEnvironment env,
-                IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
-            final RecordDescriptor rd0 = recordDescProvider.getInputRecordDescriptor(getOperatorId(), 0);
-            final RecordDescriptor rd1 = recordDescProvider.getInputRecordDescriptor(getOperatorId(), 1);
+                IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions) {
+            final RecordDescriptor rd0 = recordDescProvider
+                    .getInputRecordDescriptor(getOperatorId(), BUILD_ACTIVITY_ID);
+            final RecordDescriptor rd1 = recordDescProvider
+                    .getInputRecordDescriptor(getOperatorId(), PROBE_ACTIVITY_ID);
             final IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
             for (int i = 0; i < comparatorFactories.length; ++i) {
                 comparators[i] = comparatorFactories[i].createBinaryComparator();
@@ -121,7 +151,7 @@
             }
 
             IOperatorNodePushable op = new AbstractUnaryInputSinkOperatorNodePushable() {
-                private InMemoryHashJoin joiner;
+                private HashBuildTaskState state;
 
                 @Override
                 public void open() throws HyracksDataException {
@@ -129,21 +159,24 @@
                             .createPartitioner();
                     ITuplePartitionComputer hpc1 = new FieldHashPartitionComputerFactory(keys1, hashFunctionFactories)
                             .createPartitioner();
-                    joiner = new InMemoryHashJoin(ctx, tableSize, new FrameTupleAccessor(ctx.getFrameSize(), rd0),
-                            hpc0, new FrameTupleAccessor(ctx.getFrameSize(), rd1), hpc1, new FrameTuplePairComparator(
-                                    keys0, keys1, comparators), isLeftOuter, nullWriters1);
+                    state = new HashBuildTaskState(ctx.getJobletContext().getJobId(), new TaskId(getActivityId(),
+                            partition));
+                    state.joiner = new InMemoryHashJoin(ctx, tableSize,
+                            new FrameTupleAccessor(ctx.getFrameSize(), rd0), hpc0, new FrameTupleAccessor(
+                                    ctx.getFrameSize(), rd1), hpc1, new FrameTuplePairComparator(keys0, keys1,
+                                    comparators), isLeftOuter, nullWriters1);
                 }
 
                 @Override
                 public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
                     ByteBuffer copyBuffer = ctx.allocateFrame();
                     FrameUtils.copy(buffer, copyBuffer);
-                    joiner.build(copyBuffer);
+                    state.joiner.build(copyBuffer);
                 }
 
                 @Override
                 public void close() throws HyracksDataException {
-                    env.set(JOINER, joiner);
+                    env.setTaskState(state);
                 }
 
                 @Override
@@ -163,26 +196,26 @@
 
         @Override
         public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx, final IOperatorEnvironment env,
-                IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
+                IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions) {
             IOperatorNodePushable op = new AbstractUnaryInputUnaryOutputOperatorNodePushable() {
-                private InMemoryHashJoin joiner;
+                private HashBuildTaskState state;
 
                 @Override
                 public void open() throws HyracksDataException {
-                    joiner = (InMemoryHashJoin) env.get(JOINER);
+                    state = (HashBuildTaskState) env.getTaskState(new TaskId(new ActivityId(getOperatorId(),
+                            BUILD_ACTIVITY_ID), partition));
                     writer.open();
                 }
 
                 @Override
                 public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
-                    joiner.join(buffer, writer);
+                    state.joiner.join(buffer, writer);
                 }
 
                 @Override
                 public void close() throws HyracksDataException {
-                    joiner.closeJoin(writer);
+                    state.joiner.closeJoin(writer);
                     writer.close();
-                    env.set(JOINER, null);
                 }
 
                 @Override
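
Every operator in this change declares a nested public state class extending AbstractTaskState, a new base class in hyracks-dataflow-std (dataflow/std/base/AbstractTaskState.java) whose definition falls outside this excerpt. Sketched only for orientation, and assuming it simply carries the identity fields, it would look roughly like:

    // A plausible shape for the base class, not the actual file.
    public abstract class AbstractTaskState implements ITaskState {
        private JobId jobId;
        private TaskId taskId;

        protected AbstractTaskState() {
            // No-arg constructor so the infrastructure can instantiate the
            // concrete class reflectively and then populate it via fromBytes().
        }

        protected AbstractTaskState(JobId jobId, TaskId taskId) {
            this.jobId = jobId;
            this.taskId = taskId;
        }

        @Override
        public final JobId getJobId() {
            return jobId;
        }

        @Override
        public final TaskId getTaskId() {
            return taskId;
        }

        @Override
        public long getMemoryOccupancy() {
            return 0; // subclasses that pin frames would report their footprint
        }
    }

The public no-arg constructor that each concrete state class keeps next to its private (JobId, TaskId) constructor fits this reading.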
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java
index 6cabbe6..de1b0ea 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java
@@ -15,28 +15,35 @@
 
 package edu.uci.ics.hyracks.dataflow.std.join;
 
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
 import java.nio.ByteBuffer;
 
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.dataflow.ActivityId;
 import edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder;
 import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.TaskId;
 import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
 import edu.uci.ics.hyracks.api.dataflow.value.ITuplePairComparator;
 import edu.uci.ics.hyracks.api.dataflow.value.ITuplePairComparatorFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
+import edu.uci.ics.hyracks.api.job.JobId;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
 import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractActivityNode;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractTaskState;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
 
 public class NestedLoopJoinOperatorDescriptor extends AbstractOperatorDescriptor {
-    private static final String JOINER = "joiner";
+    private static final int JOIN_CACHE_ACTIVITY_ID = 0;
+    private static final int NL_JOIN_ACTIVITY_ID = 1;
 
     private static final long serialVersionUID = 1L;
     private final ITuplePairComparatorFactory comparatorFactory;
@@ -52,8 +59,9 @@
 
     @Override
     public void contributeActivities(IActivityGraphBuilder builder) {
-        JoinCacheActivityNode jc = new JoinCacheActivityNode(new ActivityId(getOperatorId(), 0));
-        NestedLoopJoinActivityNode nlj = new NestedLoopJoinActivityNode(new ActivityId(getOperatorId(), 1));
+        JoinCacheActivityNode jc = new JoinCacheActivityNode(new ActivityId(getOperatorId(), JOIN_CACHE_ACTIVITY_ID));
+        NestedLoopJoinActivityNode nlj = new NestedLoopJoinActivityNode(new ActivityId(getOperatorId(),
+                NL_JOIN_ACTIVITY_ID));
 
         builder.addActivity(jc);
         builder.addSourceEdge(1, jc, 0);
@@ -65,6 +73,27 @@
         builder.addBlockingEdge(jc, nlj);
     }
 
+    public static class JoinCacheTaskState extends AbstractTaskState {
+        private NestedLoopJoin joiner;
+
+        public JoinCacheTaskState() {
+        }
+
+        private JoinCacheTaskState(JobId jobId, TaskId taskId) {
+            super(jobId, taskId);
+        }
+
+        @Override
+        public void toBytes(DataOutput out) throws IOException {
+            // No-op: the cached NestedLoopJoin holds raw frames and run files.
+        }
+
+        @Override
+        public void fromBytes(DataInput in) throws IOException {
+            // No-op: this state is only consumed on the node that produced it.
+        }
+    }
+
     private class JoinCacheActivityNode extends AbstractActivityNode {
         private static final long serialVersionUID = 1L;
 
@@ -74,17 +103,19 @@
 
         @Override
         public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx, final IOperatorEnvironment env,
-                IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
+                IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions) {
             final RecordDescriptor rd0 = recordDescProvider.getInputRecordDescriptor(getOperatorId(), 0);
             final RecordDescriptor rd1 = recordDescProvider.getInputRecordDescriptor(getOperatorId(), 1);
             final ITuplePairComparator comparator = comparatorFactory.createTuplePairComparator();
 
             IOperatorNodePushable op = new AbstractUnaryInputSinkOperatorNodePushable() {
-                private NestedLoopJoin joiner;
+                private JoinCacheTaskState state;
 
                 @Override
                 public void open() throws HyracksDataException {
-                    joiner = new NestedLoopJoin(ctx, new FrameTupleAccessor(ctx.getFrameSize(), rd0),
+                    state = new JoinCacheTaskState(ctx.getJobletContext().getJobId(), new TaskId(getActivityId(),
+                            partition));
+                    state.joiner = new NestedLoopJoin(ctx, new FrameTupleAccessor(ctx.getFrameSize(), rd0),
                             new FrameTupleAccessor(ctx.getFrameSize(), rd1), comparator, memSize);
                 }
 
@@ -93,13 +124,13 @@
                     ByteBuffer copyBuffer = ctx.allocateFrame();
                     FrameUtils.copy(buffer, copyBuffer);
                     FrameUtils.makeReadable(copyBuffer);
-                    joiner.cache(copyBuffer);
+                    state.joiner.cache(copyBuffer);
                 }
 
                 @Override
                 public void close() throws HyracksDataException {
-                    joiner.closeCache();
-                    env.set(JOINER, joiner);
+                    state.joiner.closeCache();
+                    env.setTaskState(state);
                 }
 
                 @Override
@@ -119,27 +150,27 @@
 
         @Override
         public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx, final IOperatorEnvironment env,
-                IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
+                IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions) {
 
             IOperatorNodePushable op = new AbstractUnaryInputUnaryOutputOperatorNodePushable() {
-                private NestedLoopJoin joiner;
+                private JoinCacheTaskState state;
 
                 @Override
                 public void open() throws HyracksDataException {
-                    joiner = (NestedLoopJoin) env.get(JOINER);
+                    state = (JoinCacheTaskState) env.getTaskState(new TaskId(new ActivityId(getOperatorId(),
+                            JOIN_CACHE_ACTIVITY_ID), partition));
                     writer.open();
                 }
 
                 @Override
                 public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
-                    joiner.join(buffer, writer);
+                    state.joiner.join(buffer, writer);
                 }
 
                 @Override
                 public void close() throws HyracksDataException {
-                    joiner.closeJoin(writer);
+                    state.joiner.closeJoin(writer);
                     writer.close();
-                    env.set(JOINER, null);
                 }
 
                 @Override
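
As in the hash join, toBytes() and fromBytes() stay empty here: JoinCacheTaskState wraps a live NestedLoopJoin holding cached frames and file handles, which has no sensible wire format, so the state is only usable on the node that produced it. For a state whose payload is plain data the pair is mechanical; a sketch for a hypothetical long[] samples field:

    @Override
    public void toBytes(DataOutput out) throws IOException {
        out.writeInt(samples.length);
        for (long v : samples) {
            out.writeLong(v);
        }
    }

    @Override
    public void fromBytes(DataInput in) throws IOException {
        samples = new long[in.readInt()];
        for (int i = 0; i < samples.length; ++i) {
            samples[i] = in.readLong();
        }
    }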
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/MaterializingOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/MaterializingOperatorDescriptor.java
index 448cd8a..32bfc09 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/MaterializingOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/MaterializingOperatorDescriptor.java
@@ -14,28 +14,36 @@
  */
 package edu.uci.ics.hyracks.dataflow.std.misc;
 
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
 import java.nio.ByteBuffer;
 
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.dataflow.ActivityId;
 import edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder;
 import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.TaskId;
 import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.io.FileReference;
 import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
+import edu.uci.ics.hyracks.api.job.JobId;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
 import edu.uci.ics.hyracks.dataflow.common.io.RunFileReader;
 import edu.uci.ics.hyracks.dataflow.common.io.RunFileWriter;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractActivityNode;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractTaskState;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
 
 public class MaterializingOperatorDescriptor extends AbstractOperatorDescriptor {
     private static final long serialVersionUID = 1L;
-    protected static final String MATERIALIZED_FILE = "materialized-file";
+
+    private static final int MATERIALIZER_ACTIVITY_ID = 0;
+    private static final int READER_ACTIVITY_ID = 1;
 
     public MaterializingOperatorDescriptor(JobSpecification spec, RecordDescriptor recordDescriptor) {
         super(spec, 1, 1);
@@ -44,8 +52,8 @@
 
     @Override
     public void contributeActivities(IActivityGraphBuilder builder) {
-        MaterializerActivityNode ma = new MaterializerActivityNode(new ActivityId(odId, 0));
-        ReaderActivityNode ra = new ReaderActivityNode(new ActivityId(odId, 1));
+        MaterializerActivityNode ma = new MaterializerActivityNode(new ActivityId(odId, MATERIALIZER_ACTIVITY_ID));
+        ReaderActivityNode ra = new ReaderActivityNode(new ActivityId(odId, READER_ACTIVITY_ID));
 
         builder.addActivity(ma);
         builder.addSourceEdge(0, ma, 0);
@@ -56,6 +64,27 @@
         builder.addBlockingEdge(ma, ra);
     }
 
+    public static class MaterializerTaskState extends AbstractTaskState {
+        private RunFileWriter out;
+
+        public MaterializerTaskState() {
+        }
+
+        private MaterializerTaskState(JobId jobId, TaskId taskId) {
+            super(jobId, taskId);
+        }
+
+        @Override
+        public void toBytes(DataOutput out) throws IOException {
+            // No-op: the materialized run file lives on local disk.
+        }
+
+        @Override
+        public void fromBytes(DataInput in) throws IOException {
+            // No-op: only the co-located reader task consumes this state.
+        }
+    }
+
     private final class MaterializerActivityNode extends AbstractActivityNode {
         private static final long serialVersionUID = 1L;
 
@@ -65,27 +94,29 @@
 
         @Override
         public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx, final IOperatorEnvironment env,
-                IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
+                IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions) {
             return new AbstractUnaryInputSinkOperatorNodePushable() {
-                private RunFileWriter out;
+                private MaterializerTaskState state;
 
                 @Override
                 public void open() throws HyracksDataException {
+                    state = new MaterializerTaskState(ctx.getJobletContext().getJobId(), new TaskId(getActivityId(),
+                            partition));
                     FileReference file = ctx.getJobletContext().createManagedWorkspaceFile(
                             MaterializingOperatorDescriptor.class.getSimpleName());
-                    out = new RunFileWriter(file, ctx.getIOManager());
-                    out.open();
+                    state.out = new RunFileWriter(file, ctx.getIOManager());
+                    state.out.open();
                 }
 
                 @Override
                 public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
-                    out.nextFrame(buffer);
+                    state.out.nextFrame(buffer);
                 }
 
                 @Override
                 public void close() throws HyracksDataException {
-                    out.close();
-                    env.set(MATERIALIZED_FILE, out);
+                    state.out.close();
+                    env.setTaskState(state);
                 }
 
                 @Override
@@ -104,13 +135,14 @@
 
         @Override
         public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx, final IOperatorEnvironment env,
-                IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
+                IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions) {
             return new AbstractUnaryOutputSourceOperatorNodePushable() {
                 @Override
                 public void initialize() throws HyracksDataException {
                     ByteBuffer frame = ctx.allocateFrame();
-                    RunFileWriter out = (RunFileWriter) env.get(MATERIALIZED_FILE);
-                    RunFileReader in = out.createReader();
+                    MaterializerTaskState state = (MaterializerTaskState) env.getTaskState(new TaskId(new ActivityId(
+                            getOperatorId(), MATERIALIZER_ACTIVITY_ID), partition));
+                    RunFileReader in = state.out.createReader();
                     writer.open();
                     try {
                         in.open();
@@ -130,7 +162,6 @@
 
                 @Override
                 public void deinitialize() throws HyracksDataException {
-                    env.set(MATERIALIZED_FILE, null);
                 }
             };
         }
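
The reader activity's initialize() continues past the hunk above; the drain loop it sets up looks, in sketch form, like the following (assuming RunFileReader.nextFrame(ByteBuffer) returns false at end of file, as it does elsewhere in this codebase):

    RunFileReader in = state.out.createReader();
    writer.open();
    try {
        in.open();
        while (in.nextFrame(frame)) {
            frame.flip();
            writer.nextFrame(frame);
            frame.compact();
        }
        in.close();
    } catch (Exception e) {
        writer.fail();
        throw new HyracksDataException(e);
    } finally {
        writer.close();
    }

Note that deinitialize() shrinks to a no-op: clearing the old MATERIALIZED_FILE key was the operator's responsibility, whereas the lifetime of a registered task state belongs to the infrastructure.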
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/SplitVectorOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/SplitVectorOperatorDescriptor.java
index a06bb45..351c8a6 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/SplitVectorOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/SplitVectorOperatorDescriptor.java
@@ -14,25 +14,52 @@
  */
 package edu.uci.ics.hyracks.dataflow.std.misc;
 
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
 import java.util.ArrayList;
-import java.util.List;
 
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.dataflow.ActivityId;
 import edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder;
 import edu.uci.ics.hyracks.api.dataflow.IOpenableDataWriter;
 import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.TaskId;
 import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
+import edu.uci.ics.hyracks.api.job.JobId;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractActivityNode;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractTaskState;
 import edu.uci.ics.hyracks.dataflow.std.base.IOpenableDataWriterOperator;
 import edu.uci.ics.hyracks.dataflow.std.util.DeserializedOperatorNodePushable;
 
 public class SplitVectorOperatorDescriptor extends AbstractOperatorDescriptor {
+    private static final int COLLECT_ACTIVITY_ID = 0;
+    private static final int SPLIT_ACTIVITY_ID = 1;
+
+    public static class CollectTaskState extends AbstractTaskState {
+        private ArrayList<Object[]> buffer;
+
+        public CollectTaskState() {
+        }
+
+        private CollectTaskState(JobId jobId, TaskId taskId) {
+            super(jobId, taskId);
+        }
+
+        @Override
+        public void toBytes(DataOutput out) throws IOException {
+        }
+
+        @Override
+        public void fromBytes(DataInput in) throws IOException {
+        }
+    }
+
     private class CollectActivity extends AbstractActivityNode {
         private static final long serialVersionUID = 1L;
 
@@ -46,10 +73,10 @@
         }
 
         @Override
-        public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx, final IOperatorEnvironment env,
-                IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
+        public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx, final IOperatorEnvironment env,
+                IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions) {
             IOpenableDataWriterOperator op = new IOpenableDataWriterOperator() {
-                private ArrayList<Object[]> buffer;
+                private CollectTaskState state;
 
                 @Override
                 public void setDataWriter(int index, IOpenableDataWriter<Object[]> writer) {
@@ -58,18 +85,19 @@
 
                 @Override
                 public void open() throws HyracksDataException {
-                    buffer = new ArrayList<Object[]>();
-                    env.set(BUFFER, buffer);
+                    state = new CollectTaskState(ctx.getJobletContext().getJobId(), new TaskId(getActivityId(),
+                            partition));
+                    state.buffer = new ArrayList<Object[]>();
                 }
 
                 @Override
                 public void close() throws HyracksDataException {
-
+                    env.setTaskState(state);
                 }
 
                 @Override
                 public void writeData(Object[] data) throws HyracksDataException {
-                    buffer.add(data);
+                    state.buffer.add(data);
                 }
 
                 @Override
@@ -91,10 +119,12 @@
 
         @Override
         public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx, final IOperatorEnvironment env,
-                IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
+                IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions) {
             IOpenableDataWriterOperator op = new IOpenableDataWriterOperator() {
                 private IOpenableDataWriter<Object[]> writer;
 
+                private CollectTaskState state;
+
                 @Override
                 public void setDataWriter(int index, IOpenableDataWriter<Object[]> writer) {
                     if (index != 0) {
@@ -105,6 +135,8 @@
 
                 @Override
                 public void open() throws HyracksDataException {
+                    state = (CollectTaskState) env.getTaskState(new TaskId(new ActivityId(getOperatorId(),
+                            COLLECT_ACTIVITY_ID), partition));
                 }
 
                 @Override
@@ -113,12 +145,11 @@
 
                 @Override
                 public void writeData(Object[] data) throws HyracksDataException {
-                    List<Object[]> buffer = (List<Object[]>) env.get(BUFFER);
-                    int n = buffer.size();
+                    int n = state.buffer.size();
                     int step = (int) Math.floor(n / (float) splits);
                     writer.open();
                     for (int i = 0; i < splits; ++i) {
-                        writer.writeData(buffer.get(step * (i + 1) - 1));
+                        writer.writeData(state.buffer.get(step * (i + 1) - 1));
                     }
                     writer.close();
                 }
@@ -133,8 +164,6 @@
         }
     }
 
-    private static final String BUFFER = "buffer";
-
     private static final long serialVersionUID = 1L;
 
     private final int splits;
@@ -147,8 +176,8 @@
 
     @Override
     public void contributeActivities(IActivityGraphBuilder builder) {
-        CollectActivity ca = new CollectActivity(new ActivityId(odId, 0));
-        SplitActivity sa = new SplitActivity(new ActivityId(odId, 1));
+        CollectActivity ca = new CollectActivity(new ActivityId(odId, COLLECT_ACTIVITY_ID));
+        SplitActivity sa = new SplitActivity(new ActivityId(odId, SPLIT_ACTIVITY_ID));
 
         builder.addActivity(ca);
         builder.addSourceEdge(0, ca, 0);
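
For reference, SplitActivity's writeData() samples splits evenly spaced elements from the collected buffer: step = floor(n / splits), emitting the elements at indices step * (i + 1) - 1. A worked example:

    int n = 100, splits = 4;
    int step = (int) Math.floor(n / (float) splits); // 25
    for (int i = 0; i < splits; ++i) {
        System.out.println(step * (i + 1) - 1);      // prints 24, 49, 74, 99
    }

Two small observations: the Math.floor() is redundant for non-negative inputs, since the int cast already truncates, and if n < splits the first index evaluates to -1, so the operator evidently assumes at least splits collected tuples.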
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortOperatorDescriptor.java
index 55659a4..52b292b 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortOperatorDescriptor.java
@@ -14,6 +14,9 @@
  */
 package edu.uci.ics.hyracks.dataflow.std.sort;
 
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.List;
 
@@ -22,6 +25,7 @@
 import edu.uci.ics.hyracks.api.dataflow.ActivityId;
 import edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder;
 import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.TaskId;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
@@ -29,17 +33,20 @@
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
+import edu.uci.ics.hyracks.api.job.JobId;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractActivityNode;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractTaskState;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
 
 public class ExternalSortOperatorDescriptor extends AbstractOperatorDescriptor {
-    private static final String FRAMESORTER = "framesorter";
-    private static final String RUNS = "runs";
-
     private static final long serialVersionUID = 1L;
+
+    private static final int SORT_ACTIVITY_ID = 0;
+    private static final int MERGE_ACTIVITY_ID = 1;
+
     private final int[] sortFields;
     private final INormalizedKeyComputerFactory firstKeyNormalizerFactory;
     private final IBinaryComparatorFactory[] comparatorFactories;
@@ -66,8 +73,8 @@
 
     @Override
     public void contributeActivities(IActivityGraphBuilder builder) {
-        SortActivity sa = new SortActivity(new ActivityId(odId, 0));
-        MergeActivity ma = new MergeActivity(new ActivityId(odId, 1));
+        SortActivity sa = new SortActivity(new ActivityId(odId, SORT_ACTIVITY_ID));
+        MergeActivity ma = new MergeActivity(new ActivityId(odId, MERGE_ACTIVITY_ID));
 
         builder.addActivity(sa);
         builder.addSourceEdge(0, sa, 0);
@@ -78,6 +85,28 @@
         builder.addBlockingEdge(sa, ma);
     }
 
+    public static class SortTaskState extends AbstractTaskState {
+        private List<IFrameReader> runs;
+        private FrameSorter frameSorter;
+
+        public SortTaskState() {
+        }
+
+        private SortTaskState(JobId jobId, TaskId taskId) {
+            super(jobId, taskId);
+        }
+
+        @Override
+        public void toBytes(DataOutput out) throws IOException {
+            // No-op: the run readers and frame sorter are node-local resources.
+        }
+
+        @Override
+        public void fromBytes(DataInput in) throws IOException {
+            // No-op: only the co-located merge task consumes this state.
+        }
+    }
+
     private class SortActivity extends AbstractActivityNode {
         private static final long serialVersionUID = 1L;
 
@@ -87,12 +116,14 @@
 
         @Override
         public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx, final IOperatorEnvironment env,
-                IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
-            final ExternalSortRunGenerator runGen = new ExternalSortRunGenerator(ctx, sortFields,
-                    firstKeyNormalizerFactory, comparatorFactories, recordDescriptors[0], framesLimit);
+                IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions) {
             IOperatorNodePushable op = new AbstractUnaryInputSinkOperatorNodePushable() {
+                private ExternalSortRunGenerator runGen;
+
                 @Override
                 public void open() throws HyracksDataException {
+                    runGen = new ExternalSortRunGenerator(ctx, sortFields, firstKeyNormalizerFactory,
+                            comparatorFactories, recordDescriptors[0], framesLimit);
                     runGen.open();
                 }
 
@@ -103,9 +134,12 @@
 
                 @Override
                 public void close() throws HyracksDataException {
+                    SortTaskState state = new SortTaskState(ctx.getJobletContext().getJobId(), new TaskId(
+                            getActivityId(), partition));
                     runGen.close();
-                    env.set(FRAMESORTER, runGen.getFrameSorter());
-                    env.set(RUNS, runGen.getRuns());
+                    state.runs = runGen.getRuns();
+                    state.frameSorter = runGen.getFrameSorter();
+                    env.setTaskState(state);
                 }
 
                 @Override
@@ -126,12 +160,14 @@
 
         @Override
         public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx, final IOperatorEnvironment env,
-                IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
+                IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions) {
             IOperatorNodePushable op = new AbstractUnaryOutputSourceOperatorNodePushable() {
                 @Override
                 public void initialize() throws HyracksDataException {
-                    List<IFrameReader> runs = (List<IFrameReader>) env.get(RUNS);
-                    FrameSorter frameSorter = (FrameSorter) env.get(FRAMESORTER);
+                    SortTaskState state = (SortTaskState) env.getTaskState(new TaskId(new ActivityId(getOperatorId(),
+                            SORT_ACTIVITY_ID), partition));
+                    List<IFrameReader> runs = state.runs;
+                    FrameSorter frameSorter = state.frameSorter;
                     IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
                     for (int i = 0; i < comparatorFactories.length; ++i) {
                         comparators[i] = comparatorFactories[i].createBinaryComparator();
@@ -139,8 +175,6 @@
                     ExternalSortRunMerger merger = new ExternalSortRunMerger(ctx, frameSorter, runs, sortFields,
                             comparators, recordDescriptors[0], framesLimit, writer);
                     merger.process();
-                    env.set(FRAMESORTER, null);
-                    env.set(RUNS, null);
                 }
             };
             return op;
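
Two changes ride along with the state migration in the sort operators: the ExternalSortRunGenerator is now constructed in open() rather than in createPushRuntime(), deferring frame allocation until the task actually starts, and the partition parameter becomes final so the anonymous pushables can capture it when building TaskIds. The merge side trusts the blocking edge to make its lookup succeed; a defensive variant, not part of this commit, might read:

    TaskId sorterTaskId = new TaskId(new ActivityId(getOperatorId(), SORT_ACTIVITY_ID), partition);
    SortTaskState state = (SortTaskState) env.getTaskState(sorterTaskId);
    if (state == null) {
        // Should be unreachable: the blocking edge from SortActivity guarantees
        // its close() ran, and thus registered the state, before this task starts.
        throw new HyracksDataException("no task state registered for " + sorterTaskId);
    }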
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/InMemorySortOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/InMemorySortOperatorDescriptor.java
index 75929ce..eafe738 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/InMemorySortOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/InMemorySortOperatorDescriptor.java
@@ -14,28 +14,36 @@
  */
 package edu.uci.ics.hyracks.dataflow.std.sort;
 
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
 import java.nio.ByteBuffer;
 
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.dataflow.ActivityId;
 import edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder;
 import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.TaskId;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
+import edu.uci.ics.hyracks.api.job.JobId;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractActivityNode;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractTaskState;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
 
 public class InMemorySortOperatorDescriptor extends AbstractOperatorDescriptor {
-    private static final String FRAMESORTER = "framesorter";
-
     private static final long serialVersionUID = 1L;
+
+    private static final int SORT_ACTIVITY_ID = 0;
+    private static final int MERGE_ACTIVITY_ID = 1;
+
     private final int[] sortFields;
     private INormalizedKeyComputerFactory firstKeyNormalizerFactory;
     private IBinaryComparatorFactory[] comparatorFactories;
@@ -57,8 +65,8 @@
 
     @Override
     public void contributeActivities(IActivityGraphBuilder builder) {
-        SortActivity sa = new SortActivity(new ActivityId(odId, 0));
-        MergeActivity ma = new MergeActivity(new ActivityId(odId, 1));
+        SortActivity sa = new SortActivity(new ActivityId(odId, SORT_ACTIVITY_ID));
+        MergeActivity ma = new MergeActivity(new ActivityId(odId, MERGE_ACTIVITY_ID));
 
         builder.addActivity(sa);
         builder.addSourceEdge(0, sa, 0);
@@ -69,6 +77,25 @@
         builder.addBlockingEdge(sa, ma);
     }
 
+    public static class SortTaskState extends AbstractTaskState {
+        private FrameSorter frameSorter;
+
+        public SortTaskState() {
+        }
+
+        private SortTaskState(JobId jobId, TaskId taskId) {
+            super(jobId, taskId);
+        }
+
+        @Override
+        public void toBytes(DataOutput out) throws IOException {
+        }
+
+        @Override
+        public void fromBytes(DataInput in) throws IOException {
+        }
+    }
+
     private class SortActivity extends AbstractActivityNode {
         private static final long serialVersionUID = 1L;
 
@@ -78,24 +105,27 @@
 
         @Override
         public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx, final IOperatorEnvironment env,
-                IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
-            final FrameSorter frameSorter = new FrameSorter(ctx, sortFields, firstKeyNormalizerFactory,
-                    comparatorFactories, recordDescriptors[0]);
+                IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions) {
             IOperatorNodePushable op = new AbstractUnaryInputSinkOperatorNodePushable() {
+                private SortTaskState state;
+
                 @Override
                 public void open() throws HyracksDataException {
-                    frameSorter.reset();
+                    state = new SortTaskState(ctx.getJobletContext().getJobId(), new TaskId(getActivityId(), partition));
+                    state.frameSorter = new FrameSorter(ctx, sortFields, firstKeyNormalizerFactory,
+                            comparatorFactories, recordDescriptors[0]);
+                    state.frameSorter.reset();
                 }
 
                 @Override
                 public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
-                    frameSorter.insertFrame(buffer);
+                    state.frameSorter.insertFrame(buffer);
                 }
 
                 @Override
                 public void close() throws HyracksDataException {
-                    frameSorter.sortFrames();
-                    env.set(FRAMESORTER, frameSorter);
+                    state.frameSorter.sortFrames();
+                    env.setTaskState(state);
                 }
 
                 @Override
@@ -115,21 +145,21 @@
 
         @Override
         public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx, final IOperatorEnvironment env,
-                IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
+                IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions) {
             IOperatorNodePushable op = new AbstractUnaryOutputSourceOperatorNodePushable() {
                 @Override
                 public void initialize() throws HyracksDataException {
                     writer.open();
                     try {
-                        FrameSorter frameSorter = (FrameSorter) env.get(FRAMESORTER);
-                        frameSorter.flushFrames(writer);
+                        SortTaskState state = (SortTaskState) env.getTaskState(new TaskId(new ActivityId(
+                                getOperatorId(), SORT_ACTIVITY_ID), partition));
+                        state.frameSorter.flushFrames(writer);
                     } catch (Exception e) {
                         writer.fail();
                         throw new HyracksDataException(e);
                     } finally {
                         writer.close();
                     }
-                    env.set(FRAMESORTER, null);
                 }
             };
             return op;
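
Taken together, every stateful operator in hyracks-dataflow-std now follows the same before/after shape; schematically:

    // Before: untyped property bag, unchecked casts, manual cleanup.
    env.set(FRAMESORTER, frameSorter);                   // producer, in close()
    FrameSorter fs = (FrameSorter) env.get(FRAMESORTER); // consumer, in open()
    env.set(FRAMESORTER, null);                          // consumer, in close()

    // After: typed state keyed by (activity id, partition); the infrastructure
    // owns the state's lifetime, so the manual null-out disappears.
    env.setTaskState(state);                             // producer, in close()
    SortTaskState s = (SortTaskState) env.getTaskState(  // consumer, in open()
            new TaskId(new ActivityId(getOperatorId(), SORT_ACTIVITY_ID), partition));

Beyond type safety, registering states centrally presumably also lets the scheduler see what each task has produced and how much memory it occupies.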