Merge branch 'fullstack_lsm_staging' into madhusudancs/parallelize_result_distribution.
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/AggregateOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/AggregateOperator.java
index 2f53f9b..2f7c0ed 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/AggregateOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/AggregateOperator.java
@@ -21,9 +21,11 @@
     // TODO type safe list of expressions
     private List<Mutable<ILogicalExpression>> mergeExpressions;
     private LogicalVariable partitioningVariable;
+    private boolean global;
 
     public AggregateOperator(List<LogicalVariable> variables, List<Mutable<ILogicalExpression>> expressions) {
         super(variables, expressions);
+        global = false;
     }
 
     @Override
@@ -77,6 +79,14 @@
         return partitioningVariable;
     }
 
+    public void setGlobal() {
+        global = true;
+    }
+
+    public boolean isGlobal() {
+        return global;
+    }
+
     @Override
     public IVariableTypeEnvironment computeOutputTypeEnvironment(ITypingContext ctx) throws AlgebricksException {
         IVariableTypeEnvironment env = new NonPropagatingTypeEnvironment(ctx.getExpressionTypeComputer(),
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/AggregatePOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/AggregatePOperator.java
index 81dc2c2..5d490e9 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/AggregatePOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/AggregatePOperator.java
@@ -35,6 +35,7 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
 import edu.uci.ics.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPartitioningProperty;
 import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPartitioningRequirementsCoordinator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
 import edu.uci.ics.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
@@ -70,9 +71,14 @@
         AggregateOperator aggOp = (AggregateOperator) op;
         if (aggOp.getExecutionMode() == ExecutionMode.PARTITIONED && aggOp.getPartitioningVariable() != null) {
             StructuralPropertiesVector[] pv = new StructuralPropertiesVector[1];
-            Set<LogicalVariable> partitioningVariables = new ListSet<LogicalVariable>();
-            partitioningVariables.add(aggOp.getPartitioningVariable());
-            pv[0] = new StructuralPropertiesVector(new UnorderedPartitionedProperty(partitioningVariables, null), null);
+            if (aggOp.isGlobal()) {
+                pv[0] = new StructuralPropertiesVector(IPartitioningProperty.UNPARTITIONED, null);
+            } else {
+                Set<LogicalVariable> partitioningVariables = new ListSet<LogicalVariable>();
+                partitioningVariables.add(aggOp.getPartitioningVariable());
+                pv[0] = new StructuralPropertiesVector(new UnorderedPartitionedProperty(partitioningVariables, null),
+                        null);
+            }
             return new PhysicalRequirements(pv, IPartitioningRequirementsCoordinator.NO_COORDINATION);
         } else {
             return emptyUnaryRequirements();
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/DistributeResultPOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/DistributeResultPOperator.java
index 302d4d2..9a96319 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/DistributeResultPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/DistributeResultPOperator.java
@@ -100,10 +100,9 @@
                 context, columns);
 
         Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> runtimeAndConstraints = mp.getResultHandleRuntime(
-                resultOp.getDataSink(), columns, pf, inputDesc, false, spec);
+                resultOp.getDataSink(), columns, pf, inputDesc, true, spec);
 
         builder.contributeHyracksOperator(resultOp, runtimeAndConstraints.first);
-        builder.contributeAlgebricksPartitionConstraint(runtimeAndConstraints.first, runtimeAndConstraints.second);
         ILogicalOperator src = resultOp.getInputs().get(0).getValue();
         builder.contributeGraphEdge(src, 0, resultOp, 0);
     }
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetPartitionReader.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/properties/ResultSetDomain.java
similarity index 68%
rename from hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetPartitionReader.java
rename to algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/properties/ResultSetDomain.java
index 8f5ed64..ca15346 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetPartitionReader.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/properties/ResultSetDomain.java
@@ -3,19 +3,25 @@
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * you may obtain a copy of the License from
- *
+ * 
  *     http://www.apache.org/licenses/LICENSE-2.0
- *
+ * 
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package edu.uci.ics.hyracks.api.dataset;
+package edu.uci.ics.hyracks.algebricks.core.algebra.properties;
 
-import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+public class ResultSetDomain implements INodeDomain {
+    @Override
+    public boolean sameAs(INodeDomain domain) {
+        return true;
+    }
 
-public interface IDatasetPartitionReader {
-    public void writeTo(IFrameWriter writer);
+    @Override
+    public Integer cardinality() {
+        return 0;
+    }
 }
diff --git a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/AbstractIntroduceCombinerRule.java b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/AbstractIntroduceCombinerRule.java
index 08271c1..30ab542 100644
--- a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/AbstractIntroduceCombinerRule.java
+++ b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/AbstractIntroduceCombinerRule.java
@@ -120,6 +120,7 @@
                 context.computeAndSetTypeEnvironmentForOperator(trueAssignOp);
                 initAgg.setPartitioningVariable(trueVar);
                 initAgg.getInputs().get(0).setValue(trueAssignOp);
+                initAgg.setGlobal();
             }
             return new Pair<Boolean, Mutable<ILogicalOperator>>(true, new MutableObject<ILogicalOperator>(pushedAgg));
         } else {
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetPartitionManager.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetPartitionManager.java
index ae38c7f..0c5a5bb 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetPartitionManager.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetPartitionManager.java
@@ -31,6 +31,8 @@
 
     public void initializeDatasetPartitionReader(JobId jobId, int partition, IFrameWriter noc) throws HyracksException;
 
+    public void abortReader(JobId jobId);
+
     public IWorkspaceFileFactory getFileFactory();
 
     public void close();
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetPartitionWriter.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetPartitionWriter.java
deleted file mode 100644
index 42dc157..0000000
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetPartitionWriter.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.api.dataset;
-
-import edu.uci.ics.hyracks.api.comm.IFrameWriter;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-
-public interface IDatasetPartitionWriter extends IFrameWriter {
-    public Page returnPage() throws HyracksDataException;
-}
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IHyracksDatasetDirectoryServiceConnection.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IHyracksDatasetDirectoryServiceConnection.java
index d49d5cd..f29356b 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IHyracksDatasetDirectoryServiceConnection.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IHyracksDatasetDirectoryServiceConnection.java
@@ -14,7 +14,7 @@
  */
 package edu.uci.ics.hyracks.api.dataset;
 
-import edu.uci.ics.hyracks.api.dataset.DatasetDirectoryRecord.Status;
+import edu.uci.ics.hyracks.api.dataset.DatasetJobRecord.Status;
 import edu.uci.ics.hyracks.api.job.JobId;
 
 public interface IHyracksDatasetDirectoryServiceConnection {
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IHyracksDatasetDirectoryServiceInterface.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IHyracksDatasetDirectoryServiceInterface.java
index ba21a84..3fe4ada 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IHyracksDatasetDirectoryServiceInterface.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IHyracksDatasetDirectoryServiceInterface.java
@@ -14,7 +14,7 @@
  */
 package edu.uci.ics.hyracks.api.dataset;
 
-import edu.uci.ics.hyracks.api.dataset.DatasetDirectoryRecord.Status;
+import edu.uci.ics.hyracks.api.dataset.DatasetJobRecord.Status;
 import edu.uci.ics.hyracks.api.job.JobId;
 
 public interface IHyracksDatasetDirectoryServiceInterface {
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IHyracksDatasetReader.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IHyracksDatasetReader.java
index b928a49..c397a94 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IHyracksDatasetReader.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IHyracksDatasetReader.java
@@ -16,7 +16,7 @@
 
 import java.nio.ByteBuffer;
 
-import edu.uci.ics.hyracks.api.dataset.DatasetDirectoryRecord.Status;
+import edu.uci.ics.hyracks.api.dataset.DatasetJobRecord.Status;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 
 public interface IHyracksDatasetReader {
diff --git a/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDatasetDirectoryServiceConnection.java b/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDatasetDirectoryServiceConnection.java
index 095fd7d..c882448 100644
--- a/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDatasetDirectoryServiceConnection.java
+++ b/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDatasetDirectoryServiceConnection.java
@@ -17,7 +17,7 @@
 import java.net.InetSocketAddress;
 
 import edu.uci.ics.hyracks.api.dataset.DatasetDirectoryRecord;
-import edu.uci.ics.hyracks.api.dataset.DatasetDirectoryRecord.Status;
+import edu.uci.ics.hyracks.api.dataset.DatasetJobRecord.Status;
 import edu.uci.ics.hyracks.api.dataset.IHyracksDatasetDirectoryServiceConnection;
 import edu.uci.ics.hyracks.api.dataset.IHyracksDatasetDirectoryServiceInterface;
 import edu.uci.ics.hyracks.api.dataset.ResultSetId;
diff --git a/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDatasetDirectoryServiceInterfaceRemoteProxy.java b/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDatasetDirectoryServiceInterfaceRemoteProxy.java
index 47cdf97..9c3b918 100644
--- a/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDatasetDirectoryServiceInterfaceRemoteProxy.java
+++ b/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDatasetDirectoryServiceInterfaceRemoteProxy.java
@@ -16,7 +16,7 @@
 
 import edu.uci.ics.hyracks.api.client.HyracksClientInterfaceFunctions;
 import edu.uci.ics.hyracks.api.dataset.DatasetDirectoryRecord;
-import edu.uci.ics.hyracks.api.dataset.DatasetDirectoryRecord.Status;
+import edu.uci.ics.hyracks.api.dataset.DatasetJobRecord.Status;
 import edu.uci.ics.hyracks.api.dataset.IHyracksDatasetDirectoryServiceInterface;
 import edu.uci.ics.hyracks.api.dataset.ResultSetId;
 import edu.uci.ics.hyracks.api.job.JobId;
diff --git a/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDatasetReader.java b/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDatasetReader.java
index 6419983..08ad0d7 100644
--- a/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDatasetReader.java
+++ b/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDatasetReader.java
@@ -26,7 +26,7 @@
 import edu.uci.ics.hyracks.api.channels.IInputChannel;
 import edu.uci.ics.hyracks.api.comm.NetworkAddress;
 import edu.uci.ics.hyracks.api.dataset.DatasetDirectoryRecord;
-import edu.uci.ics.hyracks.api.dataset.DatasetDirectoryRecord.Status;
+import edu.uci.ics.hyracks.api.dataset.DatasetJobRecord.Status;
 import edu.uci.ics.hyracks.api.dataset.IDatasetInputChannelMonitor;
 import edu.uci.ics.hyracks.api.dataset.IHyracksDatasetDirectoryServiceConnection;
 import edu.uci.ics.hyracks.api.dataset.IHyracksDatasetReader;
@@ -118,7 +118,7 @@
 
         while (readSize <= 0 && !(isLastPartitionReadComplete())) {
             synchronized (lastMonitor) {
-                while (lastMonitor.getNFramesAvailable() <= 0 && !lastMonitor.eosReached()) {
+                while (lastMonitor.getNFramesAvailable() <= 0 && !lastMonitor.eosReached() && !lastMonitor.failed()) {
                     try {
                         lastMonitor.wait();
                     } catch (InterruptedException e) {
@@ -127,6 +127,9 @@
                 }
             }
 
+            if (lastMonitor.failed()) {
+                throw new HyracksDataException("Job Failed.");
+            }
             if (isPartitionReadComplete(lastMonitor)) {
                 knownRecords[lastReadPartition].readEOS();
                 if ((lastReadPartition == knownRecords.length - 1)) {
@@ -135,12 +138,8 @@
                     try {
                         lastReadPartition++;
                         while (knownRecords[lastReadPartition] == null) {
-                            try {
-                                knownRecords = datasetDirectoryServiceConnection.getDatasetResultLocations(jobId,
-                                        resultSetId, knownRecords);
-                            } catch (Exception e) {
-                                // Do nothing here.
-                            }
+                            knownRecords = datasetDirectoryServiceConnection.getDatasetResultLocations(jobId,
+                                    resultSetId, knownRecords);
                         }
 
                         resultChannel = new DatasetNetworkInputChannel(netManager,
@@ -149,9 +148,7 @@
                         lastMonitor = getMonitor(lastReadPartition);
                         resultChannel.open(datasetClientCtx);
                         resultChannel.registerMonitor(lastMonitor);
-                    } catch (HyracksException e) {
-                        throw new HyracksDataException(e);
-                    } catch (UnknownHostException e) {
+                    } catch (Exception e) {
                         throw new HyracksDataException(e);
                     }
                 }
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/DatasetDirectoryService.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/DatasetDirectoryService.java
index e648733..21b05d4 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/DatasetDirectoryService.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/DatasetDirectoryService.java
@@ -99,7 +99,7 @@
         records[partition].writeEOS();
 
         for (DatasetDirectoryRecord record : records) {
-            if (record.getStatus() == DatasetDirectoryRecord.Status.SUCCESS) {
+            if ((record != null) && (record.getStatus() == DatasetDirectoryRecord.Status.SUCCESS)) {
                 successCount++;
             }
         }
@@ -112,14 +112,18 @@
     @Override
     public synchronized void reportResultPartitionFailure(JobId jobId, ResultSetId rsId, int partition) {
         DatasetJobRecord djr = jobResultLocations.get(jobId);
-        djr.fail();
+        if (djr != null) {
+            djr.fail();
+        }
         notifyAll();
     }
 
     @Override
     public synchronized void reportJobFailure(JobId jobId) {
         DatasetJobRecord djr = jobResultLocations.get(jobId);
-        djr.fail();
+        if (djr != null) {
+            djr.fail();
+        }
         notifyAll();
     }
 
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetMemoryManager.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetMemoryManager.java
index cecd677..4e27f12 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetMemoryManager.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetMemoryManager.java
@@ -21,12 +21,13 @@
 import java.util.Map;
 import java.util.Set;
 
-import edu.uci.ics.hyracks.api.dataset.IDatasetPartitionWriter;
 import edu.uci.ics.hyracks.api.dataset.Page;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.partitions.ResultSetPartitionId;
 
 public class DatasetMemoryManager {
+    private int availableMemory;
+
     private final Set<Page> availPages;
 
     private final LeastRecentlyUsedList leastRecentlyUsedList;
@@ -36,29 +37,32 @@
     private final static int FRAME_SIZE = 32768;
 
     public DatasetMemoryManager(int availableMemory) {
+        this.availableMemory = availableMemory;
+
         availPages = new HashSet<Page>();
 
         // Atleast have one page for temporarily storing the results.
-        if (availableMemory <= 0)
-            availableMemory = FRAME_SIZE;
-
-        while (availableMemory >= FRAME_SIZE) {
-            /* TODO(madhusudancs): Should we have some way of accounting this memory usage by using Hyrack's allocateFrame()
-             * instead of direct ByteBuffer.allocate()?
-             */
-            availPages.add(new Page(ByteBuffer.allocate(FRAME_SIZE)));
-            availableMemory -= FRAME_SIZE;
-        }
+        if (this.availableMemory <= FRAME_SIZE)
+            this.availableMemory = FRAME_SIZE;
 
         leastRecentlyUsedList = new LeastRecentlyUsedList();
         resultPartitionNodesMap = new HashMap<ResultSetPartitionId, PartitionNode>();
     }
 
-    public Page requestPage(ResultSetPartitionId resultSetPartitionId, IDatasetPartitionWriter dpw)
-            throws OutOfMemoryError, HyracksDataException {
+    public synchronized Page requestPage(ResultSetPartitionId resultSetPartitionId, ResultState resultState)
+            throws HyracksDataException {
         Page page;
         if (availPages.isEmpty()) {
-            page = evictPage();
+            if (availableMemory >= FRAME_SIZE) {
+                /* TODO(madhusudancs): Should we have some way of accounting this memory usage by using Hyrack's allocateFrame()
+                 * instead of direct ByteBuffer.allocate()?
+                 */
+                availPages.add(new Page(ByteBuffer.allocate(FRAME_SIZE)));
+                availableMemory -= FRAME_SIZE;
+                page = getAvailablePage();
+            } else {
+                page = evictPage();
+            }
         } else {
             page = getAvailablePage();
         }
@@ -71,7 +75,7 @@
          * update reference call before a page is pushed on to the element of the LRU list. So we first obtain the page,
          * then make a updateReference call which in turn creates a new node in the LRU list and then add the page to it.
          */
-        PartitionNode pn = updateReference(resultSetPartitionId, dpw);
+        PartitionNode pn = updateReference(resultSetPartitionId, resultState);
         pn.add(page);
         return page;
     }
@@ -81,7 +85,7 @@
         updateReference(resultSetPartitionId, null);
     }
 
-    public int getPageSize() {
+    public static int getPageSize() {
         return FRAME_SIZE;
     }
 
@@ -90,28 +94,29 @@
         resultPartitionNodesMap.put(resultSetPartitionId, pn);
     }
 
-    protected synchronized PartitionNode updateReference(ResultSetPartitionId resultSetPartitionId,
-            IDatasetPartitionWriter dpw) {
+    protected PartitionNode updateReference(ResultSetPartitionId resultSetPartitionId, ResultState resultState) {
         PartitionNode pn = null;
 
         if (!resultPartitionNodesMap.containsKey(resultSetPartitionId)) {
-            if (dpw != null) {
-                pn = new PartitionNode(resultSetPartitionId, dpw);
+            if (resultState != null) {
+                pn = new PartitionNode(resultSetPartitionId, resultState);
                 insertPartitionNode(resultSetPartitionId, pn);
             }
             return pn;
         }
-        pn = resultPartitionNodesMap.get(resultSetPartitionId);
-        leastRecentlyUsedList.remove(pn);
-        insertPartitionNode(resultSetPartitionId, pn);
+        synchronized (this) {
+            pn = resultPartitionNodesMap.get(resultSetPartitionId);
+            leastRecentlyUsedList.remove(pn);
+            insertPartitionNode(resultSetPartitionId, pn);
+        }
 
         return pn;
     }
 
-    protected synchronized Page evictPage() throws HyracksDataException {
+    protected Page evictPage() throws HyracksDataException {
         PartitionNode pn = leastRecentlyUsedList.getFirst();
-        IDatasetPartitionWriter dpw = pn.getDatasetPartitionWriter();
-        Page page = dpw.returnPage();
+        ResultState resultState = pn.getResultState();
+        Page page = resultState.returnPage();
 
         /* If the partition holding the pages breaks the contract by not returning the page or it has no page, just take
          * away all the pages allocated to it and add to the available pages set.
@@ -140,7 +145,7 @@
         return page;
     }
 
-    protected synchronized Page getAvailablePage() {
+    protected Page getAvailablePage() {
         Iterator<Page> iter = availPages.iterator();
         Page page = iter.next();
         iter.remove();
@@ -197,15 +202,15 @@
 
         private final ResultSetPartitionId resultSetPartitionId;
 
-        private final IDatasetPartitionWriter datasetPartitionWriter;
+        private final ResultState resultState;
 
         private PartitionNode prev;
 
         private PartitionNode next;
 
-        public PartitionNode(ResultSetPartitionId resultSetPartitionId, IDatasetPartitionWriter datasetPartitionWriter) {
+        public PartitionNode(ResultSetPartitionId resultSetPartitionId, ResultState resultState) {
             this.resultSetPartitionId = resultSetPartitionId;
-            this.datasetPartitionWriter = datasetPartitionWriter;
+            this.resultState = resultState;
             prev = null;
             next = null;
         }
@@ -214,8 +219,8 @@
             return resultSetPartitionId;
         }
 
-        public IDatasetPartitionWriter getDatasetPartitionWriter() {
-            return datasetPartitionWriter;
+        public ResultState getResultState() {
+            return resultState;
         }
 
         public void setPrev(PartitionNode node) {
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java
index af9a607..ef2902e 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java
@@ -17,11 +17,11 @@
 import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.concurrent.Executor;
+import java.util.logging.Logger;
 
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.dataset.IDatasetPartitionManager;
-import edu.uci.ics.hyracks.api.dataset.IDatasetPartitionReader;
 import edu.uci.ics.hyracks.api.dataset.ResultSetId;
 import edu.uci.ics.hyracks.api.exceptions.HyracksException;
 import edu.uci.ics.hyracks.api.io.IWorkspaceFileFactory;
@@ -32,6 +32,8 @@
 import edu.uci.ics.hyracks.control.nc.resources.DefaultDeallocatableRegistry;
 
 public class DatasetPartitionManager implements IDatasetPartitionManager {
+    private static final Logger LOGGER = Logger.getLogger(DatasetPartitionManager.class.getName());
+
     private final NodeControllerService ncs;
 
     private final Executor executor;
@@ -50,18 +52,29 @@
         this.executor = executor;
         deallocatableRegistry = new DefaultDeallocatableRegistry();
         fileFactory = new WorkspaceFileFactory(deallocatableRegistry, (IOManager) ncs.getRootContext().getIOManager());
-        datasetMemoryManager = new DatasetMemoryManager(availableMemory);
+        if (availableMemory >= DatasetMemoryManager.getPageSize()) {
+            datasetMemoryManager = new DatasetMemoryManager(availableMemory);
+        } else {
+            datasetMemoryManager = null;
+        }
         partitionResultStateMap = new LinkedHashMap<JobId, ResultState[]>() {
             private static final long serialVersionUID = 1L;
 
             protected boolean removeEldestEntry(Map.Entry<JobId, ResultState[]> eldest) {
-                if (size() > resultHistorySize) {
-                    for (ResultState state : eldest.getValue()) {
-                        state.deinit();
+                synchronized (DatasetPartitionManager.this) {
+                    if (size() > resultHistorySize) {
+                        ResultState[] resultStates = eldest.getValue();
+                        for (int i = 0; i < resultStates.length; i++) {
+                            ResultState state = resultStates[i];
+                            if (state != null) {
+                                state.closeAndDelete();
+                                LOGGER.fine("Removing partition: " + i + " for JobId: " + eldest.getKey());
+                            }
+                        }
+                        return true;
                     }
-                    return true;
+                    return false;
                 }
-                return false;
             }
         };
     }
@@ -72,10 +85,10 @@
         DatasetPartitionWriter dpw = null;
         JobId jobId = ctx.getJobletContext().getJobId();
         try {
-            synchronized (partitionResultStateMap) {
+            synchronized (this) {
                 ncs.getClusterController().registerResultPartitionLocation(jobId, rsId, orderedResult, partition,
                         nPartitions, ncs.getDatasetNetworkManager().getNetworkAddress());
-                dpw = new DatasetPartitionWriter(ctx, this, jobId, rsId, partition, datasetMemoryManager);
+                dpw = new DatasetPartitionWriter(ctx, this, jobId, rsId, partition, datasetMemoryManager, fileFactory);
 
                 ResultState[] resultStates = partitionResultStateMap.get(jobId);
                 if (resultStates == null) {
@@ -88,12 +101,15 @@
             throw new HyracksException(e);
         }
 
+        LOGGER.fine("Initialized partition writer: JobId: " + jobId + ":partition: " + partition);
         return dpw;
     }
 
     @Override
     public void reportPartitionWriteCompletion(JobId jobId, ResultSetId rsId, int partition) throws HyracksException {
         try {
+            LOGGER.fine("Reporting partition write completion: JobId: " + jobId + ": ResultSetId: " + rsId
+                    + ":partition: " + partition);
             ncs.getClusterController().reportResultPartitionWriteCompletion(jobId, rsId, partition);
         } catch (Exception e) {
             throw new HyracksException(e);
@@ -103,6 +119,8 @@
     @Override
     public void reportPartitionFailure(JobId jobId, ResultSetId rsId, int partition) throws HyracksException {
         try {
+            LOGGER.info("Reporting partition failure: JobId: " + jobId + ": ResultSetId: " + rsId + ":partition: "
+                    + partition);
             ncs.getClusterController().reportResultPartitionFailure(jobId, rsId, partition);
         } catch (Exception e) {
             throw new HyracksException(e);
@@ -113,7 +131,7 @@
     public void initializeDatasetPartitionReader(JobId jobId, int partition, IFrameWriter writer)
             throws HyracksException {
         ResultState resultState;
-        synchronized (partitionResultStateMap) {
+        synchronized (this) {
             ResultState[] resultStates = partitionResultStateMap.get(jobId);
 
             if (resultStates == null) {
@@ -126,8 +144,23 @@
             }
         }
 
-        IDatasetPartitionReader dpr = new DatasetPartitionReader(datasetMemoryManager, executor, resultState);
+        DatasetPartitionReader dpr = new DatasetPartitionReader(datasetMemoryManager, executor, resultState);
         dpr.writeTo(writer);
+        LOGGER.fine("Initialized partition reader: JobId: " + jobId + ":partition: " + partition);
+    }
+
+    @Override
+    public synchronized void abortReader(JobId jobId) {
+        ResultState[] resultStates = partitionResultStateMap.get(jobId);
+
+        if (resultStates == null) {
+            return;
+        }
+        for (ResultState state : resultStates) {
+            if (state != null) {
+                state.abort();
+            }
+        }
     }
 
     @Override
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionReader.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionReader.java
index a584b4b..07624de 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionReader.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionReader.java
@@ -20,14 +20,10 @@
 import java.util.logging.Logger;
 
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
-import edu.uci.ics.hyracks.api.dataset.IDatasetPartitionReader;
-import edu.uci.ics.hyracks.api.dataset.Page;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.api.io.IFileHandle;
-import edu.uci.ics.hyracks.api.io.IIOManager;
 import edu.uci.ics.hyracks.comm.channels.NetworkOutputChannel;
 
-public class DatasetPartitionReader implements IDatasetPartitionReader {
+public class DatasetPartitionReader {
     private static final Logger LOGGER = Logger.getLogger(DatasetPartitionReader.class.getName());
 
     private final DatasetMemoryManager datasetMemoryManager;
@@ -36,51 +32,12 @@
 
     private final ResultState resultState;
 
-    private IFileHandle fileHandle;
-
     public DatasetPartitionReader(DatasetMemoryManager datasetMemoryManager, Executor executor, ResultState resultState) {
         this.datasetMemoryManager = datasetMemoryManager;
         this.executor = executor;
         this.resultState = resultState;
     }
 
-    private long read(long offset, ByteBuffer buffer) throws HyracksDataException {
-        long readSize = 0;
-        synchronized (resultState) {
-            while (offset >= resultState.getSize() && !resultState.getEOS()) {
-                try {
-                    resultState.wait();
-                } catch (InterruptedException e) {
-                    throw new HyracksDataException(e);
-                }
-            }
-        }
-
-        if (offset >= resultState.getSize() && resultState.getEOS()) {
-            return readSize;
-        }
-
-        if (offset < resultState.getPersistentSize()) {
-            readSize = resultState.getIOManager().syncRead(fileHandle, offset, buffer);
-        }
-
-        if (readSize < buffer.capacity()) {
-            long localPageOffset = offset - resultState.getPersistentSize();
-            int localPageIndex = (int) (localPageOffset / datasetMemoryManager.getPageSize());
-            int pageOffset = (int) (localPageOffset % datasetMemoryManager.getPageSize());
-            Page page = resultState.getPage(localPageIndex);
-            if (page == null) {
-            	return readSize;
-            }
-            readSize += buffer.remaining();
-            buffer.put(page.getBuffer().array(), pageOffset, buffer.remaining());
-        }
-
-        datasetMemoryManager.pageReferenced(resultState.getResultSetPartitionId());
-        return readSize;
-    }
-
-    @Override
     public void writeTo(final IFrameWriter writer) {
         executor.execute(new Runnable() {
             @Override
@@ -88,8 +45,7 @@
                 NetworkOutputChannel channel = (NetworkOutputChannel) writer;
                 channel.setFrameSize(resultState.getFrameSize());
                 try {
-                    fileHandle = resultState.getIOManager().open(resultState.getValidFileReference(),
-                            IIOManager.FileReadWriteMode.READ_ONLY, IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC);
+                    resultState.readOpen();
                     channel.open();
                     try {
                         long offset = 0;
@@ -109,10 +65,8 @@
                         }
                     } finally {
                         channel.close();
-                        resultState.getIOManager().close(fileHandle);
+                        resultState.readClose();
                     }
-                } catch (InterruptedException e) {
-                    throw new RuntimeException(e);
                 } catch (HyracksDataException e) {
                     throw new RuntimeException(e);
                 }
@@ -120,6 +74,14 @@
                     LOGGER.info("result reading successful(" + resultState.getResultSetPartitionId() + ")");
                 }
             }
+
+            private long read(long offset, ByteBuffer buffer) throws HyracksDataException {
+                if (datasetMemoryManager == null) {
+                    return resultState.read(offset, buffer);
+                } else {
+                    return resultState.read(datasetMemoryManager, offset, buffer);
+                }
+            }
         });
     }
 }
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionWriter.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionWriter.java
index 317f553..8f4b639 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionWriter.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionWriter.java
@@ -18,24 +18,19 @@
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.dataset.IDatasetPartitionManager;
-import edu.uci.ics.hyracks.api.dataset.IDatasetPartitionWriter;
-import edu.uci.ics.hyracks.api.dataset.Page;
 import edu.uci.ics.hyracks.api.dataset.ResultSetId;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.exceptions.HyracksException;
-import edu.uci.ics.hyracks.api.io.FileReference;
-import edu.uci.ics.hyracks.api.io.IFileHandle;
-import edu.uci.ics.hyracks.api.io.IIOManager;
+import edu.uci.ics.hyracks.api.io.IWorkspaceFileFactory;
 import edu.uci.ics.hyracks.api.job.JobId;
 import edu.uci.ics.hyracks.api.partitions.ResultSetPartitionId;
 
-public class DatasetPartitionWriter implements IDatasetPartitionWriter {
+public class DatasetPartitionWriter implements IFrameWriter {
     private static final Logger LOGGER = Logger.getLogger(DatasetPartitionWriter.class.getName());
 
-    private static final String FILE_PREFIX = "result_";
-
     private final IDatasetPartitionManager manager;
 
     private final JobId jobId;
@@ -50,10 +45,9 @@
 
     private final ResultState resultState;
 
-    private IFileHandle fileHandle;
-
     public DatasetPartitionWriter(IHyracksTaskContext ctx, IDatasetPartitionManager manager, JobId jobId,
-            ResultSetId rsId, int partition, DatasetMemoryManager datasetMemoryManager) {
+            ResultSetId rsId, int partition, DatasetMemoryManager datasetMemoryManager,
+            IWorkspaceFileFactory fileFactory) {
         this.manager = manager;
         this.jobId = jobId;
         this.resultSetId = rsId;
@@ -61,7 +55,7 @@
         this.datasetMemoryManager = datasetMemoryManager;
 
         resultSetPartitionId = new ResultSetPartitionId(jobId, rsId, partition);
-        resultState = new ResultState(resultSetPartitionId, ctx.getIOManager(), ctx.getFrameSize());
+        resultState = new ResultState(resultSetPartitionId, ctx.getIOManager(), fileFactory, ctx.getFrameSize());
     }
 
     public ResultState getResultState() {
@@ -69,41 +63,27 @@
     }
 
     @Override
-    public void open() throws HyracksDataException {
+    public void open() {
         if (LOGGER.isLoggable(Level.INFO)) {
             LOGGER.info("open(" + partition + ")");
         }
-        String fName = FILE_PREFIX + String.valueOf(partition);
-        FileReference fRef = manager.getFileFactory().createUnmanagedWorkspaceFile(fName);
-        fileHandle = resultState.getIOManager().open(fRef, IIOManager.FileReadWriteMode.READ_WRITE,
-                IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC);
-        resultState.init(fRef, fileHandle);
+        resultState.open();
     }
 
     @Override
     public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
-        int srcOffset = 0;
-        Page destPage = resultState.getLastPage();
-
-        while (srcOffset < buffer.limit()) {
-            if ((destPage == null) || (destPage.getBuffer().remaining() <= 0)) {
-                destPage = datasetMemoryManager.requestPage(resultSetPartitionId, this);
-                resultState.addPage(destPage);
-            }
-            int srcLength = Math.min(buffer.limit() - srcOffset, destPage.getBuffer().remaining());
-            destPage.getBuffer().put(buffer.array(), srcOffset, srcLength);
-            srcOffset += srcLength;
-            resultState.incrementSize(srcLength);
-        }
-
-        synchronized (resultState) {
-            resultState.notifyAll();
+        if (datasetMemoryManager == null) {
+            resultState.write(buffer);
+        } else {
+            resultState.write(datasetMemoryManager, buffer);
         }
     }
 
     @Override
     public void fail() throws HyracksDataException {
         try {
+            resultState.closeAndDelete();
+            resultState.abort();
             manager.reportPartitionFailure(jobId, resultSetId, partition);
         } catch (HyracksException e) {
             throw new HyracksDataException(e);
@@ -117,32 +97,10 @@
         }
 
         try {
-            synchronized (resultState) {
-                resultState.setEOS(true);
-                resultState.notifyAll();
-            }
+            resultState.close();
             manager.reportPartitionWriteCompletion(jobId, resultSetId, partition);
         } catch (HyracksException e) {
             throw new HyracksDataException(e);
         }
     }
-
-    @Override
-    public Page returnPage() throws HyracksDataException {
-        Page page = resultState.removePage(0);
-
-        IIOManager ioManager = resultState.getIOManager();
-
-        // If we do not have any pages to be given back close the write channel since we don't write any more, return null.
-        if (page == null) {
-            ioManager.close(fileHandle);
-            return null;
-        }
-
-        page.getBuffer().flip();
-
-        long delta = ioManager.syncWrite(fileHandle, resultState.getPersistentSize(), page.getBuffer());
-        resultState.incrementPersistentSize(delta);
-        return page;
-    }
 }
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/ResultState.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/ResultState.java
index 661df93..911f372 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/ResultState.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/ResultState.java
@@ -17,28 +17,35 @@
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import edu.uci.ics.hyracks.api.dataflow.state.IStateObject;
 import edu.uci.ics.hyracks.api.dataset.Page;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.io.FileReference;
 import edu.uci.ics.hyracks.api.io.IFileHandle;
 import edu.uci.ics.hyracks.api.io.IIOManager;
+import edu.uci.ics.hyracks.api.io.IWorkspaceFileFactory;
 import edu.uci.ics.hyracks.api.job.JobId;
 import edu.uci.ics.hyracks.api.partitions.ResultSetPartitionId;
 
 public class ResultState implements IStateObject {
+    private static final String FILE_PREFIX = "result_";
+
     private final ResultSetPartitionId resultSetPartitionId;
 
     private final int frameSize;
 
     private final IIOManager ioManager;
 
+    private final IWorkspaceFileFactory fileFactory;
+
     private final AtomicBoolean eos;
 
-    private final AtomicBoolean readEOS;
+    private final AtomicBoolean failed;
 
     private final List<Page> localPageList;
 
@@ -46,29 +53,40 @@
 
     private IFileHandle writeFileHandle;
 
+    private IFileHandle readFileHandle;
+
     private long size;
 
     private long persistentSize;
 
-    ResultState(ResultSetPartitionId resultSetPartitionId, IIOManager ioManager, int frameSize) {
+    ResultState(ResultSetPartitionId resultSetPartitionId, IIOManager ioManager, IWorkspaceFileFactory fileFactory,
+            int frameSize) {
         this.resultSetPartitionId = resultSetPartitionId;
         this.ioManager = ioManager;
+        this.fileFactory = fileFactory;
         this.frameSize = frameSize;
         eos = new AtomicBoolean(false);
-        readEOS = new AtomicBoolean(false);
+        failed = new AtomicBoolean(false);
         localPageList = new ArrayList<Page>();
+
+        fileRef = null;
+        writeFileHandle = null;
     }
 
-    public synchronized void init(FileReference fileRef, IFileHandle writeFileHandle) {
-        this.fileRef = fileRef;
-        this.writeFileHandle = writeFileHandle;
-
+    public synchronized void open() {
         size = 0;
         persistentSize = 0;
+    }
+
+    public synchronized void close() {
+        eos.set(true);
         notifyAll();
     }
 
-    public synchronized void deinit() {
+    public synchronized void closeAndDelete() {
+        // Deleting a job is equivalent to aborting the job for all practical purposes, so the same action, needs
+        // to be taken when there are more requests to these result states.
+        failed.set(true);
         if (writeFileHandle != null) {
             try {
                 ioManager.close(writeFileHandle);
@@ -76,7 +94,149 @@
                 // Since file handle could not be closed, just ignore.
             }
         }
-        fileRef.delete();
+        if (fileRef != null) {
+            fileRef.delete();
+        }
+    }
+
+    public synchronized void write(ByteBuffer buffer) throws HyracksDataException {
+        if (fileRef == null) {
+            String fName = FILE_PREFIX + String.valueOf(resultSetPartitionId.getPartition());
+            fileRef = fileFactory.createUnmanagedWorkspaceFile(fName);
+            writeFileHandle = ioManager.open(fileRef, IIOManager.FileReadWriteMode.READ_WRITE,
+                    IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC);
+        }
+
+        size += ioManager.syncWrite(writeFileHandle, size, buffer);
+
+        notifyAll();
+    }
+
+    public synchronized void write(DatasetMemoryManager datasetMemoryManager, ByteBuffer buffer)
+            throws HyracksDataException {
+        int srcOffset = 0;
+        Page destPage = null;
+
+        if (!localPageList.isEmpty()) {
+            destPage = localPageList.get(localPageList.size() - 1);
+        }
+
+        while (srcOffset < buffer.limit()) {
+            if ((destPage == null) || (destPage.getBuffer().remaining() <= 0)) {
+                destPage = datasetMemoryManager.requestPage(resultSetPartitionId, this);
+                localPageList.add(destPage);
+            }
+            int srcLength = Math.min(buffer.limit() - srcOffset, destPage.getBuffer().remaining());
+            destPage.getBuffer().put(buffer.array(), srcOffset, srcLength);
+            srcOffset += srcLength;
+            size += srcLength;
+        }
+
+        notifyAll();
+    }
+
+    public synchronized void readOpen() {
+        // It is a noOp for now, leaving here to keep the API stable for future usage.
+    }
+
+    public synchronized void readClose() throws HyracksDataException {
+        if (readFileHandle != null) {
+            ioManager.close(readFileHandle);
+        }
+    }
+
+    public synchronized long read(long offset, ByteBuffer buffer) throws HyracksDataException {
+        long readSize = 0;
+
+        while (offset >= size && !eos.get() && !failed.get()) {
+            try {
+                wait();
+            } catch (InterruptedException e) {
+                throw new HyracksDataException(e);
+            }
+        }
+        if ((offset >= size && eos.get()) || failed.get()) {
+            return readSize;
+        }
+
+        if (readFileHandle == null) {
+            initReadFileHandle();
+        }
+        readSize = ioManager.syncRead(readFileHandle, offset, buffer);
+
+        return readSize;
+    }
+
+    public long read(DatasetMemoryManager datasetMemoryManager, long offset, ByteBuffer buffer)
+            throws HyracksDataException {
+        long readSize = 0;
+        synchronized (this) {
+            while (offset >= size && !eos.get() && !failed.get()) {
+                try {
+                    wait();
+                } catch (InterruptedException e) {
+                    throw new HyracksDataException(e);
+                }
+            }
+
+            if ((offset >= size && eos.get()) || failed.get()) {
+                return readSize;
+            }
+
+            if (offset < persistentSize) {
+                if (readFileHandle == null) {
+                    initReadFileHandle();
+                }
+                readSize = ioManager.syncRead(readFileHandle, offset, buffer);
+            }
+
+            if (readSize < buffer.capacity()) {
+                long localPageOffset = offset - persistentSize;
+                int localPageIndex = (int) (localPageOffset / DatasetMemoryManager.getPageSize());
+                int pageOffset = (int) (localPageOffset % DatasetMemoryManager.getPageSize());
+                Page page = getPage(localPageIndex);
+                if (page == null) {
+                    return readSize;
+                }
+                readSize += buffer.remaining();
+                buffer.put(page.getBuffer().array(), pageOffset, buffer.remaining());
+            }
+        }
+        datasetMemoryManager.pageReferenced(resultSetPartitionId);
+        return readSize;
+    }
+
+    public synchronized void abort() {
+        failed.set(true);
+        notifyAll();
+    }
+
+    public synchronized Page returnPage() throws HyracksDataException {
+        Page page = removePage();
+
+        // If we do not have any pages to be given back close the write channel since we don't write any more, return null.
+        if (page == null) {
+            ioManager.close(writeFileHandle);
+            return null;
+        }
+
+        page.getBuffer().flip();
+
+        if (fileRef == null) {
+            String fName = FILE_PREFIX + String.valueOf(resultSetPartitionId.getPartition());
+            fileRef = fileFactory.createUnmanagedWorkspaceFile(fName);
+            writeFileHandle = ioManager.open(fileRef, IIOManager.FileReadWriteMode.READ_WRITE,
+                    IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC);
+            notifyAll();
+        }
+
+        long delta = ioManager.syncWrite(writeFileHandle, persistentSize, page.getBuffer());
+        persistentSize += delta;
+        return page;
+    }
+
+    public synchronized void setEOS(boolean eos) {
+        this.eos.set(eos);
     }
 
     public ResultSetPartitionId getResultSetPartitionId() {
@@ -91,76 +251,6 @@
         return ioManager;
     }
 
-    public synchronized void incrementSize(long delta) {
-        size += delta;
-    }
-
-    public synchronized long getSize() {
-        return size;
-    }
-
-    public synchronized void incrementPersistentSize(long delta) {
-        persistentSize += delta;
-    }
-
-    public synchronized long getPersistentSize() {
-        return persistentSize;
-    }
-
-    public void setEOS(boolean eos) {
-        this.eos.set(eos);
-    }
-
-    public boolean getEOS() {
-        return eos.get();
-    }
-
-    public boolean getReadEOS() {
-        return readEOS.get();
-    }
-
-    public synchronized void addPage(Page page) {
-        localPageList.add(page);
-    }
-
-    public synchronized Page removePage(int index) {
-        Page page = null;
-        if (!localPageList.isEmpty()) {
-            page = localPageList.remove(index);
-        }
-        return page;
-    }
-
-    public synchronized Page getPage(int index) {
-        Page page = null;
-        if (!localPageList.isEmpty()) {
-            page = localPageList.get(index);
-        }
-        return page;
-    }
-
-    public synchronized Page getLastPage() {
-        Page page = null;
-        if (!localPageList.isEmpty()) {
-            page = localPageList.get(localPageList.size() - 1);
-        }
-        return page;
-    }
-
-    public synchronized Page getFirstPage() {
-        Page page = null;
-        if (!localPageList.isEmpty()) {
-            page = localPageList.get(0);
-        }
-        return page;
-    }
-
-    public synchronized FileReference getValidFileReference() throws InterruptedException {
-        while (fileRef == null)
-            wait();
-        return fileRef;
-    }
-
     @Override
     public JobId getJobId() {
         return resultSetPartitionId.getJobId();
@@ -185,4 +275,36 @@
     public void fromBytes(DataInput in) throws IOException {
         throw new UnsupportedOperationException();
     }
+
+    private Page getPage(int index) {
+        Page page = null;
+        if (!localPageList.isEmpty()) {
+            page = localPageList.get(index);
+        }
+        return page;
+    }
+
+    private Page removePage() {
+        Page page = null;
+        if (!localPageList.isEmpty()) {
+            page = localPageList.remove(localPageList.size() - 1);
+        }
+        return page;
+    }
+
+    private void initReadFileHandle() throws HyracksDataException {
+        while (fileRef == null && !failed.get()) {
+            try {
+                wait();
+            } catch (InterruptedException e) {
+                throw new HyracksDataException(e);
+            }
+        }
+        if (failed.get()) {
+            return;
+        }
+
+        readFileHandle = ioManager.open(fileRef, IIOManager.FileReadWriteMode.READ_ONLY,
+                IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC);
+    }
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/AbortTasksWork.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/AbortTasksWork.java
index 8f8c032..a078f50 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/AbortTasksWork.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/AbortTasksWork.java
@@ -46,6 +46,7 @@
         if (LOGGER.isLoggable(Level.INFO)) {
             LOGGER.info("Aborting Tasks: " + jobId + ":" + tasks);
         }
+        ncs.getDatasetPartitionManager().abortReader(jobId);
         Map<JobId, Joblet> jobletMap = ncs.getJobletMap();
         Joblet ji = jobletMap.get(jobId);
         if (ji != null) {
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/NotifyTaskFailureWork.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/NotifyTaskFailureWork.java
index 3957934..8b9d15a 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/NotifyTaskFailureWork.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/NotifyTaskFailureWork.java
@@ -14,6 +14,7 @@
  */
 package edu.uci.ics.hyracks.control.nc.work;
 
+import edu.uci.ics.hyracks.api.job.JobId;
 import edu.uci.ics.hyracks.control.common.work.AbstractWork;
 import edu.uci.ics.hyracks.control.nc.NodeControllerService;
 import edu.uci.ics.hyracks.control.nc.Task;
@@ -32,8 +33,9 @@
     @Override
     public void run() {
         try {
-            ncs.getClusterController().notifyTaskFailure(task.getJobletContext().getJobId(), task.getTaskAttemptId(),
-                    ncs.getId(), details);
+            JobId jobId = task.getJobletContext().getJobId();
+            ncs.getDatasetPartitionManager().abortReader(jobId);
+            ncs.getClusterController().notifyTaskFailure(jobId, task.getTaskAttemptId(), ncs.getId(), details);
         } catch (Exception e) {
             e.printStackTrace();
         }
diff --git a/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/io/FrameOutputStream.java b/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/io/FrameOutputStream.java
index 07f6ba2..c403501 100644
--- a/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/io/FrameOutputStream.java
+++ b/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/io/FrameOutputStream.java
@@ -32,6 +32,9 @@
     }
 
     public void reset(ByteBuffer buffer, boolean clear) {
+        if (clear) {
+            buffer.clear();
+        }
         frameTupleAppender.reset(buffer, clear);
     }