Merge branch 'master' of https://code.google.com/p/hyracks
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/DatasetJobRecord.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/DatasetJobRecord.java
index 478d64e..bff23bc 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/DatasetJobRecord.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/DatasetJobRecord.java
@@ -17,7 +17,7 @@
 import java.util.HashMap;
 import java.util.List;
 
-public class DatasetJobRecord extends HashMap<ResultSetId, ResultSetMetaData> {
+public class DatasetJobRecord extends HashMap<ResultSetId, ResultSetMetaData> implements IDatasetStateRecord {
     public enum Status {
         RUNNING,
         SUCCESS,
@@ -26,11 +26,14 @@
 
     private static final long serialVersionUID = 1L;
 
+    private final long timestamp;
+
     private Status status;
 
     private List<Exception> exceptions;
 
     public DatasetJobRecord() {
+        this.timestamp = System.currentTimeMillis();
         this.status = Status.RUNNING;
     }
 
@@ -51,6 +54,10 @@
         this.exceptions = exceptions;
     }
 
+    public long getTimestamp() {
+        return timestamp;
+    }
+
     public Status getStatus() {
         return status;
     }
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/Page.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetManager.java
similarity index 68%
copy from hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/Page.java
copy to hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetManager.java
index aad5b55..b6cd1af 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/Page.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetManager.java
@@ -14,20 +14,12 @@
  */
 package edu.uci.ics.hyracks.api.dataset;
 
-import java.nio.ByteBuffer;
+import java.util.Map;
 
-public class Page {
-    private final ByteBuffer buffer;
+import edu.uci.ics.hyracks.api.job.JobId;
 
-    public Page(ByteBuffer buffer) {
-        this.buffer = buffer;
-    }
+public interface IDatasetManager {
+    public Map<JobId, IDatasetStateRecord> getStateMap();
 
-    public ByteBuffer getBuffer() {
-        return buffer;
-    }
-
-    public ByteBuffer clear() {
-        return (ByteBuffer) buffer.clear();
-    }
+    public void deinitState(JobId jobId);
 }
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetPartitionManager.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetPartitionManager.java
index e8cceb9..32910ac 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetPartitionManager.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetPartitionManager.java
@@ -20,9 +20,9 @@
 import edu.uci.ics.hyracks.api.io.IWorkspaceFileFactory;
 import edu.uci.ics.hyracks.api.job.JobId;
 
-public interface IDatasetPartitionManager {
+public interface IDatasetPartitionManager extends IDatasetManager {
     public IFrameWriter createDatasetPartitionWriter(IHyracksTaskContext ctx, ResultSetId rsId, boolean orderedResult,
-            int partition, int nPartitions) throws HyracksException;
+            boolean asyncMode, int partition, int nPartitions) throws HyracksException;
 
     public void reportPartitionWriteCompletion(JobId jobId, ResultSetId resultSetId, int partition)
             throws HyracksException;
@@ -32,6 +32,8 @@
     public void initializeDatasetPartitionReader(JobId jobId, ResultSetId resultSetId, int partition, IFrameWriter noc)
             throws HyracksException;
 
+    public void removePartition(JobId jobId, ResultSetId resultSetId, int partition);
+
     public void abortReader(JobId jobId);
 
     public IWorkspaceFileFactory getFileFactory();
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/Page.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetStateRecord.java
similarity index 68%
copy from hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/Page.java
copy to hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetStateRecord.java
index aad5b55..1a400e3 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/Page.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetStateRecord.java
@@ -14,20 +14,6 @@
  */
 package edu.uci.ics.hyracks.api.dataset;
 
-import java.nio.ByteBuffer;
-
-public class Page {
-    private final ByteBuffer buffer;
-
-    public Page(ByteBuffer buffer) {
-        this.buffer = buffer;
-    }
-
-    public ByteBuffer getBuffer() {
-        return buffer;
-    }
-
-    public ByteBuffer clear() {
-        return (ByteBuffer) buffer.clear();
-    }
+public interface IDatasetStateRecord {
+    public long getTimestamp();
 }
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
index e80c263..0463350 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
@@ -40,7 +40,6 @@
 import edu.uci.ics.hyracks.api.context.ICCContext;
 import edu.uci.ics.hyracks.api.dataset.DatasetDirectoryRecord;
 import edu.uci.ics.hyracks.api.dataset.DatasetJobRecord.Status;
-import edu.uci.ics.hyracks.api.dataset.IDatasetDirectoryService;
 import edu.uci.ics.hyracks.api.deployment.DeploymentId;
 import edu.uci.ics.hyracks.api.job.JobId;
 import edu.uci.ics.hyracks.api.job.JobStatus;
@@ -48,6 +47,7 @@
 import edu.uci.ics.hyracks.api.topology.TopologyDefinitionParser;
 import edu.uci.ics.hyracks.control.cc.application.CCApplicationContext;
 import edu.uci.ics.hyracks.control.cc.dataset.DatasetDirectoryService;
+import edu.uci.ics.hyracks.control.cc.dataset.IDatasetDirectoryService;
 import edu.uci.ics.hyracks.control.cc.job.JobRun;
 import edu.uci.ics.hyracks.control.cc.web.WebServer;
 import edu.uci.ics.hyracks.control.cc.work.ApplicationMessageWork;
@@ -188,7 +188,7 @@
             }
         };
         sweeper = new DeadNodeSweeper();
-        datasetDirectoryService = new DatasetDirectoryService(ccConfig.jobHistorySize);
+        datasetDirectoryService = new DatasetDirectoryService(ccConfig.resultTTL, ccConfig.resultSweepThreshold);
         jobCounter = 0;
 
         deploymentRunMap = new HashMap<DeploymentId, DeploymentRun>();
@@ -220,6 +220,8 @@
         timer.schedule(sweeper, 0, ccConfig.heartbeatPeriod);
         jobLog.open();
         startApplication();
+
+        datasetDirectoryService.init(executor);
         LOGGER.log(Level.INFO, "Started ClusterControllerService");
     }
 
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/DatasetDirectoryService.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/DatasetDirectoryService.java
index 0db1c7a..ee1cc67 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/DatasetDirectoryService.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/DatasetDirectoryService.java
@@ -18,18 +18,20 @@
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ExecutorService;
 
 import edu.uci.ics.hyracks.api.comm.NetworkAddress;
 import edu.uci.ics.hyracks.api.dataset.DatasetDirectoryRecord;
 import edu.uci.ics.hyracks.api.dataset.DatasetJobRecord;
 import edu.uci.ics.hyracks.api.dataset.DatasetJobRecord.Status;
-import edu.uci.ics.hyracks.api.dataset.IDatasetDirectoryService;
+import edu.uci.ics.hyracks.api.dataset.IDatasetStateRecord;
 import edu.uci.ics.hyracks.api.dataset.ResultSetId;
 import edu.uci.ics.hyracks.api.dataset.ResultSetMetaData;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.exceptions.HyracksException;
 import edu.uci.ics.hyracks.api.job.IActivityClusterGraphGeneratorFactory;
 import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.control.common.dataset.ResultStateSweeper;
 
 /**
  * TODO(madhusudancs): The potential perils of this global dataset directory service implementation is that, the jobs
@@ -39,22 +41,27 @@
  * job.
  */
 public class DatasetDirectoryService implements IDatasetDirectoryService {
-    private final Map<JobId, DatasetJobRecord> jobResultLocations;
+    private final long resultTTL;
 
-    public DatasetDirectoryService(final int jobHistorySize) {
-        jobResultLocations = new LinkedHashMap<JobId, DatasetJobRecord>() {
-            private static final long serialVersionUID = 1L;
+    private final long resultSweepThreshold;
 
-            protected boolean removeEldestEntry(Map.Entry<JobId, DatasetJobRecord> eldest) {
-                return size() > jobHistorySize;
-            }
-        };
+    private final Map<JobId, IDatasetStateRecord> jobResultLocations;
+
+    public DatasetDirectoryService(long resultTTL, long resultSweepThreshold) {
+        this.resultTTL = resultTTL;
+        this.resultSweepThreshold = resultSweepThreshold;
+        jobResultLocations = new LinkedHashMap<JobId, IDatasetStateRecord>();
+    }
+
+    @Override
+    public void init(ExecutorService executor) {
+        executor.execute(new ResultStateSweeper(this, resultTTL, resultSweepThreshold));
     }
 
     @Override
     public synchronized void notifyJobCreation(JobId jobId, IActivityClusterGraphGeneratorFactory acggf)
             throws HyracksException {
-        DatasetJobRecord djr = jobResultLocations.get(jobId);
+        DatasetJobRecord djr = (DatasetJobRecord) jobResultLocations.get(jobId);
         if (djr == null) {
             djr = new DatasetJobRecord();
             jobResultLocations.put(jobId, djr);
@@ -74,7 +81,7 @@
     @Override
     public synchronized void registerResultPartitionLocation(JobId jobId, ResultSetId rsId, boolean orderedResult,
             int partition, int nPartitions, NetworkAddress networkAddress) {
-        DatasetJobRecord djr = jobResultLocations.get(jobId);
+        DatasetJobRecord djr = (DatasetJobRecord) jobResultLocations.get(jobId);
 
         ResultSetMetaData resultSetMetaData = djr.get(rsId);
         if (resultSetMetaData == null) {
@@ -95,7 +102,7 @@
     public synchronized void reportResultPartitionWriteCompletion(JobId jobId, ResultSetId rsId, int partition) {
         int successCount = 0;
 
-        DatasetJobRecord djr = jobResultLocations.get(jobId);
+        DatasetJobRecord djr = (DatasetJobRecord) jobResultLocations.get(jobId);
         ResultSetMetaData resultSetMetaData = djr.get(rsId);
         DatasetDirectoryRecord[] records = resultSetMetaData.getRecords();
         records[partition].writeEOS();
@@ -113,7 +120,7 @@
 
     @Override
     public synchronized void reportResultPartitionFailure(JobId jobId, ResultSetId rsId, int partition) {
-        DatasetJobRecord djr = jobResultLocations.get(jobId);
+        DatasetJobRecord djr = (DatasetJobRecord) jobResultLocations.get(jobId);
         if (djr != null) {
             djr.fail();
         }
@@ -122,7 +129,7 @@
 
     @Override
     public synchronized void reportJobFailure(JobId jobId, List<Exception> exceptions) {
-        DatasetJobRecord djr = jobResultLocations.get(jobId);
+        DatasetJobRecord djr = (DatasetJobRecord) jobResultLocations.get(jobId);
         if (djr != null) {
             djr.fail(exceptions);
         }
@@ -132,7 +139,7 @@
     @Override
     public synchronized Status getResultStatus(JobId jobId, ResultSetId rsId) throws HyracksDataException {
         DatasetJobRecord djr;
-        while ((djr = jobResultLocations.get(jobId)) == null) {
+        while ((djr = (DatasetJobRecord) jobResultLocations.get(jobId)) == null) {
             try {
                 wait();
             } catch (InterruptedException e) {
@@ -144,6 +151,16 @@
     }
 
     @Override
+    public Map<JobId, IDatasetStateRecord> getStateMap() {
+        return jobResultLocations;
+    }
+
+    @Override
+    public void deinitState(JobId jobId) {
+        jobResultLocations.remove(jobId);
+    }
+
+    @Override
     public synchronized DatasetDirectoryRecord[] getResultPartitionLocations(JobId jobId, ResultSetId rsId,
             DatasetDirectoryRecord[] knownRecords) throws HyracksDataException {
         DatasetDirectoryRecord[] newRecords;
@@ -191,7 +208,7 @@
      */
     private DatasetDirectoryRecord[] updatedRecords(JobId jobId, ResultSetId rsId, DatasetDirectoryRecord[] knownRecords)
             throws HyracksDataException {
-        DatasetJobRecord djr = jobResultLocations.get(jobId);
+        DatasetJobRecord djr = (DatasetJobRecord) jobResultLocations.get(jobId);
 
         if (djr == null) {
             throw new HyracksDataException("Requested JobId " + jobId + " doesn't exist");
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetDirectoryService.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/IDatasetDirectoryService.java
similarity index 83%
rename from hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetDirectoryService.java
rename to hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/IDatasetDirectoryService.java
index ea5da5e..3de3c50 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetDirectoryService.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/IDatasetDirectoryService.java
@@ -12,17 +12,23 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package edu.uci.ics.hyracks.api.dataset;
+package edu.uci.ics.hyracks.control.cc.dataset;
 
 import java.util.List;
+import java.util.concurrent.ExecutorService;
 
 import edu.uci.ics.hyracks.api.comm.NetworkAddress;
+import edu.uci.ics.hyracks.api.dataset.DatasetDirectoryRecord;
 import edu.uci.ics.hyracks.api.dataset.DatasetJobRecord.Status;
+import edu.uci.ics.hyracks.api.dataset.IDatasetManager;
+import edu.uci.ics.hyracks.api.dataset.ResultSetId;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.job.IJobLifecycleListener;
 import edu.uci.ics.hyracks.api.job.JobId;
 
-public interface IDatasetDirectoryService extends IJobLifecycleListener {
+public interface IDatasetDirectoryService extends IJobLifecycleListener, IDatasetManager {
+    public void init(ExecutorService executor);
+
     public void registerResultPartitionLocation(JobId jobId, ResultSetId rsId, boolean orderedResult, int partition,
             int nPartitions, NetworkAddress networkAddress);
 
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetResultPartitionLocationsWork.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetResultPartitionLocationsWork.java
index f1f1dc4..fd6b690 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetResultPartitionLocationsWork.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetResultPartitionLocationsWork.java
@@ -15,11 +15,11 @@
 package edu.uci.ics.hyracks.control.cc.work;
 
 import edu.uci.ics.hyracks.api.dataset.DatasetDirectoryRecord;
-import edu.uci.ics.hyracks.api.dataset.IDatasetDirectoryService;
 import edu.uci.ics.hyracks.api.dataset.ResultSetId;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.job.JobId;
 import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
+import edu.uci.ics.hyracks.control.cc.dataset.IDatasetDirectoryService;
 import edu.uci.ics.hyracks.control.common.work.IResultCallback;
 import edu.uci.ics.hyracks.control.common.work.SynchronizableWork;
 
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/CCConfig.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/CCConfig.java
index c4acd47..c7244fb 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/CCConfig.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/CCConfig.java
@@ -52,6 +52,12 @@
     @Option(name = "-job-history-size", usage = "Limits the number of historical jobs remembered by the system to the specified value. (default: 10)")
     public int jobHistorySize = 10;
 
+    @Option(name = "-result-time-to-live", usage = "Limits the amount of time results for asynchronous jobs should be retained by the system in milliseconds. (default: 24 hours)")
+    public long resultTTL = 86400000;
+
+    @Option(name = "-result-sweep-threshold", usage = "The duration within which an instance of the result cleanup should be invoked in milliseconds. (default: 1 minute)")
+    public long resultSweepThreshold = 60000;
+
     @Option(name = "-cc-root", usage = "Sets the root folder used for file operations. (default: ClusterControllerService)")
     public String ccRoot = "ClusterControllerService";
 
@@ -86,6 +92,10 @@
         cList.add(String.valueOf(defaultMaxJobAttempts));
         cList.add("-job-history-size");
         cList.add(String.valueOf(jobHistorySize));
+        cList.add("-result-time-to-live");
+        cList.add(String.valueOf(resultTTL));
+        cList.add("-result-sweep-threshold");
+        cList.add(String.valueOf(resultSweepThreshold));
         cList.add("-cc-root");
         cList.add(ccRoot);
         if (clusterTopologyDefinition != null) {
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/NCConfig.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/NCConfig.java
index 32d8bf0..43129f5 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/NCConfig.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/NCConfig.java
@@ -23,100 +23,104 @@
 import org.kohsuke.args4j.spi.StopOptionHandler;
 
 public class NCConfig implements Serializable {
-	private static final long serialVersionUID = 1L;
+    private static final long serialVersionUID = 1L;
 
-	@Option(name = "-cc-host", usage = "Cluster Controller host name", required = true)
-	public String ccHost;
+    @Option(name = "-cc-host", usage = "Cluster Controller host name", required = true)
+    public String ccHost;
 
-	@Option(name = "-cc-port", usage = "Cluster Controller port (default: 1099)")
-	public int ccPort = 1099;
+    @Option(name = "-cc-port", usage = "Cluster Controller port (default: 1099)")
+    public int ccPort = 1099;
 
-	@Option(name = "-cluster-net-ip-address", usage = "IP Address to bind cluster listener", required = true)
-	public String clusterNetIPAddress;
+    @Option(name = "-cluster-net-ip-address", usage = "IP Address to bind cluster listener", required = true)
+    public String clusterNetIPAddress;
 
-	@Option(name = "-node-id", usage = "Logical name of node controller unique within the cluster", required = true)
-	public String nodeId;
+    @Option(name = "-node-id", usage = "Logical name of node controller unique within the cluster", required = true)
+    public String nodeId;
 
-	@Option(name = "-data-ip-address", usage = "IP Address to bind data listener", required = true)
-	public String dataIPAddress;
+    @Option(name = "-data-ip-address", usage = "IP Address to bind data listener", required = true)
+    public String dataIPAddress;
 
-	@Option(name = "-result-ip-address", usage = "IP Address to bind dataset result distribution listener", required = true)
-	public String datasetIPAddress;
+    @Option(name = "-result-ip-address", usage = "IP Address to bind dataset result distribution listener", required = true)
+    public String datasetIPAddress;
 
-	@Option(name = "-iodevices", usage = "Comma separated list of IO Device mount points (default: One device in default temp folder)", required = false)
-	public String ioDevices = System.getProperty("java.io.tmpdir");
+    @Option(name = "-iodevices", usage = "Comma separated list of IO Device mount points (default: One device in default temp folder)", required = false)
+    public String ioDevices = System.getProperty("java.io.tmpdir");
 
-	@Option(name = "-net-thread-count", usage = "Number of threads to use for Network I/O (default: 1)")
-	public int nNetThreads = 1;
+    @Option(name = "-net-thread-count", usage = "Number of threads to use for Network I/O (default: 1)")
+    public int nNetThreads = 1;
 
-	@Option(name = "-max-memory", usage = "Maximum memory usable at this Node Controller in bytes (default: -1 auto)")
-	public int maxMemory = -1;
+    @Option(name = "-max-memory", usage = "Maximum memory usable at this Node Controller in bytes (default: -1 auto)")
+    public int maxMemory = -1;
 
-	@Option(name = "-result-history-size", usage = "Limits the number of jobs whose results should be remembered by the system to the specified value. (default: 10)")
-	public int resultHistorySize = 100;
+    @Option(name = "-result-time-to-live", usage = "Limits the amount of time results for asynchronous jobs should be retained by the system in milliseconds. (default: 24 hours)")
+    public long resultTTL = 86400000;
 
-	@Option(name = "-result-manager-memory", usage = "Memory usable for result caching at this Node Controller in bytes (default: -1 auto)")
-	public int resultManagerMemory = -1;
+    @Option(name = "-result-sweep-threshold", usage = "The duration within which an instance of the result cleanup should be invoked in milliseconds. (default: 1 minute)")
+    public long resultSweepThreshold = 60000;
 
-	@Option(name = "-app-nc-main-class", usage = "Application NC Main Class")
-	public String appNCMainClass;
+    @Option(name = "-result-manager-memory", usage = "Memory usable for result caching at this Node Controller in bytes (default: -1 auto)")
+    public int resultManagerMemory = -1;
 
-	@Argument
-	@Option(name = "--", handler = StopOptionHandler.class)
-	public List<String> appArgs;
+    @Option(name = "-app-nc-main-class", usage = "Application NC Main Class")
+    public String appNCMainClass;
 
-	public void toCommandLine(List<String> cList) {
-		cList.add("-cc-host");
-		cList.add(ccHost);
-		cList.add("-cc-port");
-		cList.add(String.valueOf(ccPort));
-		cList.add("-cluster-net-ip-address");
-		cList.add(clusterNetIPAddress);
-		cList.add("-node-id");
-		cList.add(nodeId);
-		cList.add("-data-ip-address");
-		cList.add(dataIPAddress);
-		cList.add(datasetIPAddress);
-		cList.add("-iodevices");
-		cList.add(ioDevices);
-		cList.add("-net-thread-count");
-		cList.add(String.valueOf(nNetThreads));
-		cList.add("-max-memory");
-		cList.add(String.valueOf(maxMemory));
-		cList.add("-result-history-size");
-		cList.add(String.valueOf(resultHistorySize));
-		cList.add("-result-manager-memory");
-		cList.add(String.valueOf(resultManagerMemory));
+    @Argument
+    @Option(name = "--", handler = StopOptionHandler.class)
+    public List<String> appArgs;
 
-		if (appNCMainClass != null) {
-			cList.add("-app-nc-main-class");
-			cList.add(appNCMainClass);
-		}
-		if (appArgs != null && !appArgs.isEmpty()) {
-			cList.add("--");
-			for (String appArg : appArgs) {
-				cList.add(appArg);
-			}
-		}
-	}
+    public void toCommandLine(List<String> cList) {
+        cList.add("-cc-host");
+        cList.add(ccHost);
+        cList.add("-cc-port");
+        cList.add(String.valueOf(ccPort));
+        cList.add("-cluster-net-ip-address");
+        cList.add(clusterNetIPAddress);
+        cList.add("-node-id");
+        cList.add(nodeId);
+        cList.add("-data-ip-address");
+        cList.add(dataIPAddress);
+        cList.add(datasetIPAddress);
+        cList.add("-iodevices");
+        cList.add(ioDevices);
+        cList.add("-net-thread-count");
+        cList.add(String.valueOf(nNetThreads));
+        cList.add("-max-memory");
+        cList.add(String.valueOf(maxMemory));
+        cList.add("-result-time-to-live");
+        cList.add(String.valueOf(resultTTL));
+        cList.add("-result-sweep-threshold");
+        cList.add(String.valueOf(resultSweepThreshold));
+        cList.add("-result-manager-memory");
+        cList.add(String.valueOf(resultManagerMemory));
 
-	public void toMap(Map<String, String> configuration) {
-		configuration.put("cc-host", ccHost);
-		configuration.put("cc-port", (String.valueOf(ccPort)));
-		configuration.put("cluster-net-ip-address", clusterNetIPAddress);
-		configuration.put("node-id", nodeId);
-		configuration.put("data-ip-address", dataIPAddress);
-		configuration.put("iodevices", ioDevices);
-		configuration.put("net-thread-count", String.valueOf(nNetThreads));
-		configuration.put("max-memory", String.valueOf(maxMemory));
-		configuration.put("result-history-size",
-				String.valueOf(resultHistorySize));
-		configuration.put("result-manager-memory",
-				String.valueOf(resultManagerMemory));
+        if (appNCMainClass != null) {
+            cList.add("-app-nc-main-class");
+            cList.add(appNCMainClass);
+        }
+        if (appArgs != null && !appArgs.isEmpty()) {
+            cList.add("--");
+            for (String appArg : appArgs) {
+                cList.add(appArg);
+            }
+        }
+    }
 
-		if (appNCMainClass != null) {
-			configuration.put("app-nc-main-class", appNCMainClass);
-		}
+    public void toMap(Map<String, String> configuration) {
+        configuration.put("cc-host", ccHost);
+        configuration.put("cc-port", (String.valueOf(ccPort)));
+        configuration.put("cluster-net-ip-address", clusterNetIPAddress);
+        configuration.put("node-id", nodeId);
+        configuration.put("data-ip-address", dataIPAddress);
+        configuration.put("iodevices", ioDevices);
+        configuration.put("net-thread-count", String.valueOf(nNetThreads));
+        configuration.put("max-memory", String.valueOf(maxMemory));
+        configuration.put("result-time-to-live", String.valueOf(resultTTL));
+        configuration.put("result-sweep-threshold", String.valueOf(resultSweepThreshold));
+        configuration.put("result-manager-memory", String.valueOf(resultManagerMemory));
 
-	}
+        if (appNCMainClass != null) {
+            configuration.put("app-nc-main-class", appNCMainClass);
+        }
+
+    }
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/dataset/ResultStateSweeper.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/dataset/ResultStateSweeper.java
new file mode 100644
index 0000000..69b560c
--- /dev/null
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/dataset/ResultStateSweeper.java
@@ -0,0 +1,79 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.control.common.dataset;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.hyracks.api.dataset.IDatasetManager;
+import edu.uci.ics.hyracks.api.dataset.IDatasetStateRecord;
+import edu.uci.ics.hyracks.api.job.JobId;
+
+/**
+ * Sweeper to clean up the stale result distribution files and result states.
+ */
+public class ResultStateSweeper implements Runnable {
+    private static final Logger LOGGER = Logger.getLogger(ResultStateSweeper.class.getName());
+
+    private final IDatasetManager datasetManager;
+
+    private final long resultTTL;
+
+    private final long resultSweepThreshold;
+
+    private final List<JobId> toBeCollected;
+
+    public ResultStateSweeper(IDatasetManager datasetManager, long resultTTL, long resultSweepThreshold) {
+        this.datasetManager = datasetManager;
+        this.resultTTL = resultTTL;
+        this.resultSweepThreshold = resultSweepThreshold;
+        toBeCollected = new ArrayList<JobId>();
+    }
+
+    @Override
+    public void run() {
+        while (true) {
+            try {
+                Thread.sleep(resultSweepThreshold);
+                sweep();
+            } catch (InterruptedException e) {
+                LOGGER.severe("Result cleaner thread interrupted, but we continue running it.");
+                // There isn't much we can do really here
+            }
+        }
+
+    }
+
+    private void sweep() {
+        synchronized (datasetManager) {
+            toBeCollected.clear();
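+            // Collect the ids of jobs whose results have outlived the configured TTL.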
+            for (Map.Entry<JobId, IDatasetStateRecord> entry : datasetManager.getStateMap().entrySet()) {
+                if (System.currentTimeMillis() > entry.getValue().getTimestamp() + resultTTL) {
+                    toBeCollected.add(entry.getKey());
+                }
+            }
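+            // Deinitialize the expired jobs outside the iteration above to avoid concurrent modification of the state map.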
+            for (JobId jobId : toBeCollected) {
+                datasetManager.deinitState(jobId);
+            }
+        }
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("Result state cleanup instance successfully completed.");
+        }
+    }
+}
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
index 37b64a7..60859e3 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
@@ -217,7 +217,7 @@
     private void init() throws Exception {
         ctx.getIOManager().setExecutor(executor);
         datasetPartitionManager = new DatasetPartitionManager(this, executor, ncConfig.resultManagerMemory,
-                ncConfig.resultHistorySize);
+                ncConfig.resultTTL, ncConfig.resultSweepThreshold);
         datasetNetworkManager = new DatasetNetworkManager(getIpAddress(ncConfig.datasetIPAddress),
                 datasetPartitionManager, ncConfig.nNetThreads);
     }
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetMemoryManager.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetMemoryManager.java
index e8ef169..d9f7302 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetMemoryManager.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetMemoryManager.java
@@ -21,7 +21,6 @@
 import java.util.Map;
 import java.util.Set;
 
-import edu.uci.ics.hyracks.api.dataset.Page;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.partitions.ResultSetPartitionId;
 
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java
index fe48c17..325e2bc 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java
@@ -24,10 +24,12 @@
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.dataset.IDatasetPartitionManager;
+import edu.uci.ics.hyracks.api.dataset.IDatasetStateRecord;
 import edu.uci.ics.hyracks.api.dataset.ResultSetId;
 import edu.uci.ics.hyracks.api.exceptions.HyracksException;
 import edu.uci.ics.hyracks.api.io.IWorkspaceFileFactory;
 import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.control.common.dataset.ResultStateSweeper;
 import edu.uci.ics.hyracks.control.nc.NodeControllerService;
 import edu.uci.ics.hyracks.control.nc.io.IOManager;
 import edu.uci.ics.hyracks.control.nc.io.WorkspaceFileFactory;
@@ -40,7 +42,7 @@
 
     private final Executor executor;
 
-    private final Map<JobId, Map<ResultSetId, ResultState[]>> partitionResultStateMap;
+    private final Map<JobId, IDatasetStateRecord> partitionResultStateMap;
 
     private final DefaultDeallocatableRegistry deallocatableRegistry;
 
@@ -48,8 +50,8 @@
 
     private final DatasetMemoryManager datasetMemoryManager;
 
-    public DatasetPartitionManager(NodeControllerService ncs, Executor executor, int availableMemory,
-            final int resultHistorySize) {
+    public DatasetPartitionManager(NodeControllerService ncs, Executor executor, int availableMemory, long resultTTL,
+            long resultSweepThreshold) {
         this.ncs = ncs;
         this.executor = executor;
         deallocatableRegistry = new DefaultDeallocatableRegistry();
@@ -59,35 +61,25 @@
         } else {
             datasetMemoryManager = null;
         }
-        partitionResultStateMap = new LinkedHashMap<JobId, Map<ResultSetId, ResultState[]>>() {
-            private static final long serialVersionUID = 1L;
-
-            protected boolean removeEldestEntry(Entry<JobId, Map<ResultSetId, ResultState[]>> eldest) {
-                synchronized (DatasetPartitionManager.this) {
-                    if (size() > resultHistorySize) {
-                        deinitState(eldest);
-                        return true;
-                    }
-                    return false;
-                }
-            }
-        };
+        partitionResultStateMap = new LinkedHashMap<JobId, IDatasetStateRecord>();
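+        // Sweep expired result state periodically instead of evicting by result history size.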
+        executor.execute(new ResultStateSweeper(this, resultTTL, resultSweepThreshold));
     }
 
     @Override
     public IFrameWriter createDatasetPartitionWriter(IHyracksTaskContext ctx, ResultSetId rsId, boolean orderedResult,
-            int partition, int nPartitions) throws HyracksException {
+            boolean asyncMode, int partition, int nPartitions) throws HyracksException {
         DatasetPartitionWriter dpw = null;
         JobId jobId = ctx.getJobletContext().getJobId();
         try {
             synchronized (this) {
                 ncs.getClusterController().registerResultPartitionLocation(jobId, rsId, orderedResult, partition,
                         nPartitions, ncs.getDatasetNetworkManager().getNetworkAddress());
-                dpw = new DatasetPartitionWriter(ctx, this, jobId, rsId, partition, datasetMemoryManager, fileFactory);
+                dpw = new DatasetPartitionWriter(ctx, this, jobId, rsId, asyncMode, partition, datasetMemoryManager,
+                        fileFactory);
 
-                Map<ResultSetId, ResultState[]> rsIdMap = partitionResultStateMap.get(jobId);
+                ResultSetMap rsIdMap = (ResultSetMap) partitionResultStateMap.get(jobId);
                 if (rsIdMap == null) {
-                    rsIdMap = new HashMap<ResultSetId, ResultState[]>();
+                    rsIdMap = new ResultSetMap();
                     partitionResultStateMap.put(jobId, rsIdMap);
                 }
 
@@ -133,7 +125,7 @@
             IFrameWriter writer) throws HyracksException {
         ResultState resultState;
         synchronized (this) {
-            Map<ResultSetId, ResultState[]> rsIdMap = partitionResultStateMap.get(jobId);
+            ResultSetMap rsIdMap = (ResultSetMap) partitionResultStateMap.get(jobId);
 
             if (rsIdMap == null) {
                 throw new HyracksException("Unknown JobId " + jobId);
@@ -150,15 +142,44 @@
             }
         }
 
-        DatasetPartitionReader dpr = new DatasetPartitionReader(datasetMemoryManager, executor, resultState);
+        DatasetPartitionReader dpr = new DatasetPartitionReader(this, datasetMemoryManager, executor, resultState);
         dpr.writeTo(writer);
         LOGGER.fine("Initialized partition reader: JobId: " + jobId + ":ResultSetId: " + resultSetId + ":partition: "
                 + partition);
     }
 
     @Override
+    public synchronized void removePartition(JobId jobId, ResultSetId resultSetId, int partition) {
+        ResultSetMap rsIdMap = (ResultSetMap) partitionResultStateMap.get(jobId);
+        if (rsIdMap != null) {
+            ResultState[] resultStates = rsIdMap.get(resultSetId);
+            if (resultStates != null) {
+                ResultState state = resultStates[partition];
+                if (state != null) {
+                    state.closeAndDelete();
+                    LOGGER.fine("Removing partition: " + partition + " for JobId: " + jobId);
+                }
+                resultStates[partition] = null;
+                boolean stateEmpty = true;
+                for (int i = 0; i < resultStates.length; i++) {
+                    if (resultStates[i] != null) {
+                        stateEmpty = false;
+                        break;
+                    }
+                }
+                if (stateEmpty) {
+                    rsIdMap.remove(resultSetId);
+                }
+            }
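+            // Drop the job entry once all of its result sets have been removed.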
+            if (rsIdMap.isEmpty()) {
+                partitionResultStateMap.remove(jobId);
+            }
+        }
+    }
+
+    @Override
     public synchronized void abortReader(JobId jobId) {
-        Map<ResultSetId, ResultState[]> rsIdMap = partitionResultStateMap.get(jobId);
+        ResultSetMap rsIdMap = (ResultSetMap) partitionResultStateMap.get(jobId);
 
         if (rsIdMap == null) {
             return;
@@ -183,14 +204,25 @@
 
     @Override
     public synchronized void close() {
-        for (Entry<JobId, Map<ResultSetId, ResultState[]>> entry : partitionResultStateMap.entrySet()) {
-            deinitState(entry);
+        for (Entry<JobId, IDatasetStateRecord> entry : partitionResultStateMap.entrySet()) {
+            deinit(entry.getKey());
         }
         deallocatableRegistry.close();
     }
 
-    public void deinitState(Entry<JobId, Map<ResultSetId, ResultState[]>> entry) {
-        Map<ResultSetId, ResultState[]> rsIdMap = entry.getValue();
+    @Override
+    public Map<JobId, IDatasetStateRecord> getStateMap() {
+        return partitionResultStateMap;
+    }
+
+    @Override
+    public void deinitState(JobId jobId) {
+        deinit(jobId);
+        partitionResultStateMap.remove(jobId);
+    }
+
+    private void deinit(JobId jobId) {
+        ResultSetMap rsIdMap = (ResultSetMap) partitionResultStateMap.get(jobId);
         if (rsIdMap != null) {
             for (ResultSetId rsId : rsIdMap.keySet()) {
                 ResultState[] resultStates = rsIdMap.get(rsId);
@@ -199,11 +231,26 @@
                         ResultState state = resultStates[i];
                         if (state != null) {
                             state.closeAndDelete();
-                            LOGGER.fine("Removing partition: " + i + " for JobId: " + entry.getKey());
+                            LOGGER.fine("Removing partition: " + i + " for JobId: " + jobId);
                         }
                     }
                 }
             }
         }
     }
+
+    private class ResultSetMap extends HashMap<ResultSetId, ResultState[]> implements IDatasetStateRecord {
+        private static final long serialVersionUID = 1L;
+
+        private final long timestamp;
+
+        public ResultSetMap() {
+            super();
+            timestamp = System.currentTimeMillis();
+        }
+
+        @Override
+        public long getTimestamp() {
+            return timestamp;
+        }
+    }
 }
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionReader.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionReader.java
index 9cab183..6d6fc83 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionReader.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionReader.java
@@ -26,13 +26,17 @@
 public class DatasetPartitionReader {
     private static final Logger LOGGER = Logger.getLogger(DatasetPartitionReader.class.getName());
 
+    private final DatasetPartitionManager datasetPartitionManager;
+
     private final DatasetMemoryManager datasetMemoryManager;
 
     private final Executor executor;
 
     private final ResultState resultState;
 
-    public DatasetPartitionReader(DatasetMemoryManager datasetMemoryManager, Executor executor, ResultState resultState) {
+    public DatasetPartitionReader(DatasetPartitionManager datasetPartitionManager,
+            DatasetMemoryManager datasetMemoryManager, Executor executor, ResultState resultState) {
+        this.datasetPartitionManager = datasetPartitionManager;
         this.datasetMemoryManager = datasetMemoryManager;
         this.executor = executor;
         this.resultState = resultState;
@@ -66,6 +70,12 @@
                     } finally {
                         channel.close();
                         resultState.readClose();
+                        // If the query is a synchronous query, remove its partition as soon as it is read.
+                        if (!resultState.getAsyncMode()) {
+                            datasetPartitionManager.removePartition(resultState.getResultSetPartitionId().getJobId(),
+                                    resultState.getResultSetPartitionId().getResultSetId(), resultState
+                                            .getResultSetPartitionId().getPartition());
+                        }
                     }
                 } catch (HyracksDataException e) {
                     throw new RuntimeException(e);
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionWriter.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionWriter.java
index 58f0806..d61da67 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionWriter.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionWriter.java
@@ -37,6 +37,8 @@
 
     private final ResultSetId resultSetId;
 
+    private final boolean asyncMode;
+
     private final int partition;
 
     private final DatasetMemoryManager datasetMemoryManager;
@@ -46,16 +48,18 @@
     private final ResultState resultState;
 
     public DatasetPartitionWriter(IHyracksTaskContext ctx, IDatasetPartitionManager manager, JobId jobId,
-            ResultSetId rsId, int partition, DatasetMemoryManager datasetMemoryManager,
+            ResultSetId rsId, boolean asyncMode, int partition, DatasetMemoryManager datasetMemoryManager,
             IWorkspaceFileFactory fileFactory) {
         this.manager = manager;
         this.jobId = jobId;
         this.resultSetId = rsId;
+        this.asyncMode = asyncMode;
         this.partition = partition;
         this.datasetMemoryManager = datasetMemoryManager;
 
         resultSetPartitionId = new ResultSetPartitionId(jobId, rsId, partition);
-        resultState = new ResultState(resultSetPartitionId, ctx.getIOManager(), fileFactory, ctx.getFrameSize());
+        resultState = new ResultState(resultSetPartitionId, asyncMode, ctx.getIOManager(), fileFactory,
+                ctx.getFrameSize());
     }
 
     public ResultState getResultState() {
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/Page.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/Page.java
similarity index 94%
rename from hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/Page.java
rename to hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/Page.java
index aad5b55..4387e2b 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/Page.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/Page.java
@@ -12,7 +12,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package edu.uci.ics.hyracks.api.dataset;
+package edu.uci.ics.hyracks.control.nc.dataset;
 
 import java.nio.ByteBuffer;
 
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/ResultState.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/ResultState.java
index 277e241..c24034d 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/ResultState.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/ResultState.java
@@ -23,7 +23,6 @@
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import edu.uci.ics.hyracks.api.dataflow.state.IStateObject;
-import edu.uci.ics.hyracks.api.dataset.Page;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.io.FileReference;
 import edu.uci.ics.hyracks.api.io.IFileHandle;
@@ -37,6 +36,8 @@
 
     private final ResultSetPartitionId resultSetPartitionId;
 
+    private final boolean asyncMode;
+
     private final int frameSize;
 
     private final IIOManager ioManager;
@@ -59,9 +60,10 @@
 
     private long persistentSize;
 
-    ResultState(ResultSetPartitionId resultSetPartitionId, IIOManager ioManager, IWorkspaceFileFactory fileFactory,
-            int frameSize) {
+    ResultState(ResultSetPartitionId resultSetPartitionId, boolean asyncMode, IIOManager ioManager,
+            IWorkspaceFileFactory fileFactory, int frameSize) {
         this.resultSetPartitionId = resultSetPartitionId;
+        this.asyncMode = asyncMode;
         this.ioManager = ioManager;
         this.fileFactory = fileFactory;
         this.frameSize = frameSize;
@@ -251,6 +253,10 @@
         return ioManager;
     }
 
+    public boolean getAsyncMode() {
+        return asyncMode;
+    }
+
     @Override
     public JobId getJobId() {
         return resultSetPartitionId.getJobId();
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java
index f1bd693..7330590 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java
@@ -42,13 +42,16 @@
 
     private final boolean ordered;
 
+    private final boolean asyncMode;
+
     private final IResultSerializerFactory resultSerializerFactory;
 
     public ResultWriterOperatorDescriptor(IOperatorDescriptorRegistry spec, ResultSetId rsId, boolean ordered,
-            IResultSerializerFactory resultSerializerFactory) throws IOException {
+            boolean asyncMode, IResultSerializerFactory resultSerializerFactory) throws IOException {
         super(spec, 1, 0);
         this.rsId = rsId;
         this.ordered = ordered;
+        this.asyncMode = asyncMode;
         this.resultSerializerFactory = resultSerializerFactory;
     }
 
@@ -75,7 +78,7 @@
             @Override
             public void open() throws HyracksDataException {
                 try {
-                    datasetPartitionWriter = dpm.createDatasetPartitionWriter(ctx, rsId, ordered, partition,
+                    datasetPartitionWriter = dpm.createDatasetPartitionWriter(ctx, rsId, ordered, asyncMode, partition,
                             nPartitions);
                     datasetPartitionWriter.open();
                     resultSerializer.init();
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AggregationTest.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AggregationTest.java
index 07b57c8..0a5f2c5 100644
--- a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AggregationTest.java
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AggregationTest.java
@@ -94,7 +94,7 @@
             throws IOException {
 
         ResultSetId rsId = new ResultSetId(1);
-        AbstractSingleActivityOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, true,
+        AbstractSingleActivityOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, true, false,
                 ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
         spec.addResultSetId(rsId);
 
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/CountOfCountsTest.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/CountOfCountsTest.java
index cee8a0f..906a750 100644
--- a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/CountOfCountsTest.java
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/CountOfCountsTest.java
@@ -97,7 +97,7 @@
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, group2, NC1_ID);
 
         ResultSetId rsId = new ResultSetId(1);
-        IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, true,
+        IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, true, false,
                 ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
         spec.addResultSetId(rsId);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
@@ -169,7 +169,7 @@
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, group2, NC1_ID, NC2_ID);
 
         ResultSetId rsId = new ResultSetId(1);
-        IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, true,
+        IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, true, false,
                 ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
         spec.addResultSetId(rsId);
 
@@ -242,7 +242,7 @@
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, group2, NC1_ID, NC2_ID);
 
         ResultSetId rsId = new ResultSetId(1);
-        IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, true,
+        IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, true, false,
                 ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
         spec.addResultSetId(rsId);
 
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/LocalityAwareConnectorTest.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/LocalityAwareConnectorTest.java
index 7c8048d..42c823a 100644
--- a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/LocalityAwareConnectorTest.java
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/LocalityAwareConnectorTest.java
@@ -205,7 +205,7 @@
             throws IOException {
 
         ResultSetId rsId = new ResultSetId(1);
-        AbstractSingleActivityOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, true,
+        AbstractSingleActivityOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, true, false,
                 ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
         spec.addResultSetId(rsId);
 
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/OptimizedSortMergeTest.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/OptimizedSortMergeTest.java
index 845cd50..380ba29 100644
--- a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/OptimizedSortMergeTest.java
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/OptimizedSortMergeTest.java
@@ -80,7 +80,7 @@
         ResultSetId rsId = new ResultSetId(1);
         spec.addResultSetId(rsId);
 
-        IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false,
+        IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false, false,
                 ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
 
@@ -135,7 +135,7 @@
         ResultSetId rsId = new ResultSetId(1);
         spec.addResultSetId(rsId);
 
-        IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false,
+        IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false, false,
                 ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
 
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/ScanPrintTest.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/ScanPrintTest.java
index 9c93874..c42dc75 100644
--- a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/ScanPrintTest.java
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/ScanPrintTest.java
@@ -66,7 +66,7 @@
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, csvScanner, NC2_ID, NC1_ID);
 
         ResultSetId rsId = new ResultSetId(1);
-        IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, true,
+        IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, true, false,
                 ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
         spec.addResultSetId(rsId);
 
@@ -102,7 +102,7 @@
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC1_ID);
 
         ResultSetId rsId = new ResultSetId(1);
-        IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, true,
+        IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, true, false,
                 ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
         spec.addResultSetId(rsId);
 
@@ -141,7 +141,7 @@
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC1_ID);
 
         ResultSetId rsId = new ResultSetId(1);
-        IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, true,
+        IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, true, false,
                 ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
         spec.addResultSetId(rsId);
 
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/SortMergeTest.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/SortMergeTest.java
index 16179da..48db176 100644
--- a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/SortMergeTest.java
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/SortMergeTest.java
@@ -78,7 +78,7 @@
         ResultSetId rsId = new ResultSetId(1);
         spec.addResultSetId(rsId);
 
-        IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, true,
+        IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, true, false,
                 ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
 
@@ -125,7 +125,7 @@
         ResultSetId rsId = new ResultSetId(1);
         spec.addResultSetId(rsId);
 
-        IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false,
+        IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false, false,
                 ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
 
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/SplitOperatorTest.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/SplitOperatorTest.java
index 6494b28..8446b83 100644
--- a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/SplitOperatorTest.java
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/SplitOperatorTest.java
@@ -90,7 +90,7 @@
             ResultSetId rsId = new ResultSetId(i);
             spec.addResultSetId(rsId);
 
-            outputOp[i] = new ResultWriterOperatorDescriptor(spec, rsId, true,
+            outputOp[i] = new ResultWriterOperatorDescriptor(spec, rsId, true, false,
                     ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
             PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, outputOp[i], locations);
         }
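Note: the hunks above and below all make the same mechanical change to the integration tests: a second boolean argument (uniformly false at these call sites) is inserted into the ResultWriterOperatorDescriptor constructor call, between the existing boolean and the result-serializer factory argument. A minimal sketch of the updated call pattern, assuming the fixtures these tests already declare (spec, NC1_ID); the inline comments are illustrative and not part of the patch:

    // Sketch of the updated ResultWriterOperatorDescriptor construction used in these tests.
    // The second boolean is the argument added by this patch; every call site in this
    // change passes false for it.
    ResultSetId rsId = new ResultSetId(1);
    spec.addResultSetId(rsId);

    IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId,
            false, false, // existing boolean, then the newly added boolean
            ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
    PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
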
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java
index 2e8997d..734c3da 100644
--- a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java
@@ -128,7 +128,7 @@
         ResultSetId rsId = new ResultSetId(1);
         spec.addResultSetId(rsId);
 
-        IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false,
+        IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false, false,
                 ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
 
@@ -211,7 +211,7 @@
         ResultSetId rsId = new ResultSetId(1);
         spec.addResultSetId(rsId);
 
-        IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false,
+        IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false, false,
                 ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
 
@@ -294,7 +294,7 @@
         ResultSetId rsId = new ResultSetId(1);
         spec.addResultSetId(rsId);
 
-        IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false,
+        IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false, false,
                 ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
 
@@ -378,7 +378,7 @@
         ResultSetId rsId = new ResultSetId(1);
         spec.addResultSetId(rsId);
 
-        IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false,
+        IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false, false,
                 ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
 
@@ -466,7 +466,7 @@
         ResultSetId rsId = new ResultSetId(1);
         spec.addResultSetId(rsId);
 
-        IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false,
+        IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false, false,
                 ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
 
@@ -554,7 +554,7 @@
         ResultSetId rsId = new ResultSetId(1);
         spec.addResultSetId(rsId);
 
-        IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false,
+        IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false, false,
                 ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
 
@@ -635,7 +635,7 @@
         ResultSetId rsId = new ResultSetId(1);
         spec.addResultSetId(rsId);
 
-        IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false,
+        IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false, false,
                 ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
 
@@ -726,7 +726,7 @@
         ResultSetId rsId = new ResultSetId(1);
         spec.addResultSetId(rsId);
 
-        IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false,
+        IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false, false,
                 ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
 
@@ -817,7 +817,7 @@
         ResultSetId rsId = new ResultSetId(1);
         spec.addResultSetId(rsId);
 
-        IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false,
+        IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false, false,
                 ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
 
@@ -904,7 +904,7 @@
         ResultSetId rsId = new ResultSetId(1);
         spec.addResultSetId(rsId);
 
-        IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false,
+        IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false, false,
                 ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
 
@@ -997,7 +997,7 @@
         ResultSetId rsId = new ResultSetId(1);
         spec.addResultSetId(rsId);
 
-        IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false,
+        IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false, false,
                 ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
 
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/TPCHCustomerOrderNestedLoopJoinTest.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/TPCHCustomerOrderNestedLoopJoinTest.java
index ba4404f..daab141 100644
--- a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/TPCHCustomerOrderNestedLoopJoinTest.java
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/TPCHCustomerOrderNestedLoopJoinTest.java
@@ -176,7 +176,7 @@
         ResultSetId rsId = new ResultSetId(1);
         spec.addResultSetId(rsId);
 
-        IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false,
+        IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false, false,
                 ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
 
@@ -253,7 +253,7 @@
         ResultSetId rsId = new ResultSetId(1);
         spec.addResultSetId(rsId);
 
-        IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false,
+        IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false, false,
                 ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
 
@@ -330,7 +330,7 @@
         ResultSetId rsId = new ResultSetId(1);
         spec.addResultSetId(rsId);
 
-        IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false,
+        IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false, false,
                 ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
 
@@ -412,7 +412,7 @@
         ResultSetId rsId = new ResultSetId(1);
         spec.addResultSetId(rsId);
 
-        IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false,
+        IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false, false,
                 ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
 
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/UnionTest.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/UnionTest.java
index b338601..ef31508 100644
--- a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/UnionTest.java
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/UnionTest.java
@@ -70,7 +70,7 @@
         ResultSetId rsId = new ResultSetId(1);
         spec.addResultSetId(rsId);
 
-        IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false,
+        IOperatorDescriptor printer = new ResultWriterOperatorDescriptor(spec, rsId, false, false,
                 ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC2_ID, NC1_ID);