reduced communication during result distribution
- when reporting the location of results, the NCs also report if the
  result partition is empty
- the client does not try to read empty partitions
better toString() for subclasses of AbstractWork

Change-Id: Ia39f657e689ea305d49d55bd27c9a512e1ff970f
Reviewed-on: http://fulliautomatix.ics.uci.edu:8443/39
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Yingyi Bu <buyingyi@gmail.com>
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/DatasetDirectoryRecord.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/DatasetDirectoryRecord.java
index e141d06..019d018 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/DatasetDirectoryRecord.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/DatasetDirectoryRecord.java
@@ -3,9 +3,9 @@
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * you may obtain a copy of the License from
- * 
+ *
  *     http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -34,6 +34,8 @@
 
     private Status status;
 
+    private boolean empty;
+
     public DatasetDirectoryRecord() {
         this.address = null;
         this.readEOS = false;
@@ -48,6 +50,14 @@
         return address;
     }
 
+    public void setEmpty(boolean empty) {
+        this.empty = empty;
+    }
+
+    public boolean getEmpty() {
+        return empty;
+    }
+
     public void readEOS() {
         this.readEOS = true;
     }
@@ -82,4 +92,9 @@
         }
         return address.equals(((DatasetDirectoryRecord) o).address);
     }
+
+    @Override
+    public String toString() {
+        return address.toString() + " " + status + (empty ? " (empty)" : "") + (readEOS ? " (EOS)" : "");
+    }
 }
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetManager.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetManager.java
index b6cd1af..60366d3 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetManager.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetManager.java
@@ -3,9 +3,9 @@
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * you may obtain a copy of the License from
- * 
+ *
  *     http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -14,12 +14,15 @@
  */
 package edu.uci.ics.hyracks.api.dataset;
 
-import java.util.Map;
+import java.util.Set;
 
 import edu.uci.ics.hyracks.api.job.JobId;
 
 public interface IDatasetManager {
-    public Map<JobId, IDatasetStateRecord> getStateMap();
+
+    public Set<JobId> getJobIds();
+
+    public IDatasetStateRecord getState(JobId jobId);
 
     public void deinitState(JobId jobId);
 }
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetPartitionManager.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetPartitionManager.java
index 32910ac..4b7d27a 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetPartitionManager.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetPartitionManager.java
@@ -24,6 +24,9 @@
     public IFrameWriter createDatasetPartitionWriter(IHyracksTaskContext ctx, ResultSetId rsId, boolean orderedResult,
             boolean asyncMode, int partition, int nPartitions) throws HyracksException;
 
+    public void registerResultPartitionLocation(JobId jobId, ResultSetId rsId, int partition, int nPartitions,
+            boolean orderedResult, boolean emptyResult) throws HyracksException;
+
     public void reportPartitionWriteCompletion(JobId jobId, ResultSetId resultSetId, int partition)
             throws HyracksException;
 
diff --git a/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDatasetReader.java b/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDatasetReader.java
index 51e4950..187358c 100644
--- a/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDatasetReader.java
+++ b/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDatasetReader.java
@@ -3,9 +3,9 @@
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * you may obtain a copy of the License from
- * 
+ *
  *     http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -89,64 +89,54 @@
         return status;
     }
 
+    private DatasetDirectoryRecord getRecord(int partition) throws Exception {
+        while (knownRecords == null || knownRecords[partition] == null) {
+            knownRecords = datasetDirectoryServiceConnection
+                    .getDatasetResultLocations(jobId, resultSetId, knownRecords);
+        }
+        return knownRecords[partition];
+    }
+
+    private boolean nextPartition() throws HyracksDataException {
+        ++lastReadPartition;
+        try {
+            DatasetDirectoryRecord record = getRecord(lastReadPartition);
+            while (record.getEmpty() && (++lastReadPartition) < knownRecords.length) {
+                record = getRecord(lastReadPartition);
+            }
+            if (lastReadPartition == knownRecords.length) {
+                return false;
+            }
+            resultChannel = new DatasetNetworkInputChannel(netManager, getSocketAddress(record), jobId, resultSetId,
+                    lastReadPartition, NUM_READ_BUFFERS);
+            lastMonitor = getMonitor(lastReadPartition);
+            resultChannel.registerMonitor(lastMonitor);
+            resultChannel.open(datasetClientCtx);
+            return true;
+        } catch (Exception e) {
+            throw new HyracksDataException(e);
+        }
+    }
+
     @Override
     public int read(ByteBuffer buffer) throws HyracksDataException {
         ByteBuffer readBuffer;
         int readSize = 0;
 
         if (lastReadPartition == -1) {
-            while (knownRecords == null || knownRecords[0] == null) {
-                try {
-                    knownRecords = datasetDirectoryServiceConnection.getDatasetResultLocations(jobId, resultSetId,
-                            knownRecords);
-                    lastReadPartition = 0;
-                    resultChannel = new DatasetNetworkInputChannel(netManager,
-                            getSocketAddress(knownRecords[lastReadPartition]), jobId, resultSetId, lastReadPartition,
-                            NUM_READ_BUFFERS);
-                    lastMonitor = getMonitor(lastReadPartition);
-                    resultChannel.registerMonitor(lastMonitor);
-                    resultChannel.open(datasetClientCtx);
-                } catch (Exception e) {
-                    throw new HyracksDataException(e);
-                }
+            if (!nextPartition()) {
+                return readSize;
             }
         }
 
-        while (readSize <= 0 && !(isLastPartitionReadComplete())) {
-            synchronized (lastMonitor) {
-                while (lastMonitor.getNFramesAvailable() <= 0 && !lastMonitor.eosReached() && !lastMonitor.failed()) {
-                    try {
-                        lastMonitor.wait();
-                    } catch (InterruptedException e) {
-                        throw new HyracksDataException(e);
-                    }
-                }
-            }
-
-            if (lastMonitor.failed()) {
-                throw new HyracksDataException("Job Failed.");
-            }
+        while (readSize <= 0
+                && !((lastReadPartition == knownRecords.length - 1) && isPartitionReadComplete(lastMonitor))) {
+            waitForNextFrame(lastMonitor);
             if (isPartitionReadComplete(lastMonitor)) {
                 knownRecords[lastReadPartition].readEOS();
-                if ((lastReadPartition == knownRecords.length - 1)) {
+                resultChannel.close();
+                if ((lastReadPartition == knownRecords.length - 1) || !nextPartition()) {
                     break;
-                } else {
-                    try {
-                        lastReadPartition++;
-                        while (knownRecords[lastReadPartition] == null) {
-                            knownRecords = datasetDirectoryServiceConnection.getDatasetResultLocations(jobId,
-                                    resultSetId, knownRecords);
-                        }
-
-                        resultChannel = new DatasetNetworkInputChannel(netManager,
-                                getSocketAddress(knownRecords[lastReadPartition]), jobId, resultSetId,
-                                lastReadPartition, NUM_READ_BUFFERS);
-                        lastMonitor = getMonitor(lastReadPartition);
-                        resultChannel.registerMonitor(lastMonitor);
-                        resultChannel.open(datasetClientCtx);
-                    } catch (Exception e) {
-                        throw new HyracksDataException(e);
-                    }
                 }
             } else {
                 readBuffer = resultChannel.getNextBuffer();
@@ -163,26 +153,25 @@
         return readSize;
     }
 
-    private boolean nullExists(DatasetDirectoryRecord[] locations) {
-        if (locations == null) {
-            return true;
-        }
-        for (int i = 0; i < locations.length; i++) {
-            if (locations[i] == null) {
-                return true;
+    private static void waitForNextFrame(IDatasetInputChannelMonitor monitor) throws HyracksDataException {
+        synchronized (monitor) {
+            while (monitor.getNFramesAvailable() <= 0 && !monitor.eosReached() && !monitor.failed()) {
+                try {
+                    monitor.wait();
+                } catch (InterruptedException e) {
+                    throw new HyracksDataException(e);
+                }
             }
         }
-        return false;
+        if (monitor.failed()) {
+            throw new HyracksDataException("Job Failed.");
+        }
     }
 
     private boolean isPartitionReadComplete(IDatasetInputChannelMonitor monitor) {
         return (monitor.getNFramesAvailable() <= 0) && (monitor.eosReached());
     }
 
-    private boolean isLastPartitionReadComplete() {
-        return ((lastReadPartition == knownRecords.length - 1) && isPartitionReadComplete(lastMonitor));
-    }
-
     private SocketAddress getSocketAddress(DatasetDirectoryRecord addr) throws UnknownHostException {
         NetworkAddress netAddr = addr.getNetworkAddress();
         return new InetSocketAddress(InetAddress.getByAddress(netAddr.getIpAddress()), netAddr.getPort());
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
index b665a4f..c994dfb 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
@@ -513,8 +513,8 @@
                 case REGISTER_RESULT_PARTITION_LOCATION: {
                     CCNCFunctions.RegisterResultPartitionLocationFunction rrplf = (CCNCFunctions.RegisterResultPartitionLocationFunction) fn;
                     workQueue.schedule(new RegisterResultPartitionLocationWork(ClusterControllerService.this, rrplf
-                            .getJobId(), rrplf.getResultSetId(), rrplf.getOrderedResult(), rrplf.getPartition(), rrplf
-                            .getNPartitions(), rrplf.getNetworkAddress()));
+                            .getJobId(), rrplf.getResultSetId(), rrplf.getOrderedResult(), rrplf.getEmptyResult(),
+                            rrplf.getPartition(), rrplf.getNPartitions(), rrplf.getNetworkAddress()));
                     return;
                 }
 
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/DatasetDirectoryService.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/DatasetDirectoryService.java
index ee1cc67..66e7e7a 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/DatasetDirectoryService.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/DatasetDirectoryService.java
@@ -3,9 +3,9 @@
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * you may obtain a copy of the License from
- * 
+ *
  *     http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -15,9 +15,11 @@
 package edu.uci.ics.hyracks.control.cc.dataset;
 
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ExecutorService;
 
 import edu.uci.ics.hyracks.api.comm.NetworkAddress;
@@ -32,6 +34,7 @@
 import edu.uci.ics.hyracks.api.job.IActivityClusterGraphGeneratorFactory;
 import edu.uci.ics.hyracks.api.job.JobId;
 import edu.uci.ics.hyracks.control.common.dataset.ResultStateSweeper;
+import edu.uci.ics.hyracks.control.common.work.IResultCallback;
 
 /**
  * TODO(madhusudancs): The potential perils of this global dataset directory service implementation is that, the jobs
@@ -45,12 +48,12 @@
 
     private final long resultSweepThreshold;
 
-    private final Map<JobId, IDatasetStateRecord> jobResultLocations;
+    private final Map<JobId, JobResultInfo> jobResultLocations;
 
     public DatasetDirectoryService(long resultTTL, long resultSweepThreshold) {
         this.resultTTL = resultTTL;
         this.resultSweepThreshold = resultSweepThreshold;
-        jobResultLocations = new LinkedHashMap<JobId, IDatasetStateRecord>();
+        jobResultLocations = new LinkedHashMap<JobId, JobResultInfo>();
     }
 
     @Override
@@ -61,10 +64,10 @@
     @Override
     public synchronized void notifyJobCreation(JobId jobId, IActivityClusterGraphGeneratorFactory acggf)
             throws HyracksException {
-        DatasetJobRecord djr = (DatasetJobRecord) jobResultLocations.get(jobId);
+        DatasetJobRecord djr = getDatasetJobRecord(jobId);
         if (djr == null) {
             djr = new DatasetJobRecord();
-            jobResultLocations.put(jobId, djr);
+            jobResultLocations.put(jobId, new JobResultInfo(djr, null));
         }
     }
 
@@ -78,10 +81,15 @@
         // Auto-generated method stub
     }
 
+    private DatasetJobRecord getDatasetJobRecord(JobId jobId) {
+        final JobResultInfo jri = jobResultLocations.get(jobId);
+        return jri == null ? null : jri.record;
+    }
+
     @Override
     public synchronized void registerResultPartitionLocation(JobId jobId, ResultSetId rsId, boolean orderedResult,
-            int partition, int nPartitions, NetworkAddress networkAddress) {
-        DatasetJobRecord djr = (DatasetJobRecord) jobResultLocations.get(jobId);
+            boolean emptyResult, int partition, int nPartitions, NetworkAddress networkAddress) {
+        DatasetJobRecord djr = getDatasetJobRecord(jobId);
 
         ResultSetMetaData resultSetMetaData = djr.get(rsId);
         if (resultSetMetaData == null) {
@@ -94,7 +102,22 @@
             records[partition] = new DatasetDirectoryRecord();
         }
         records[partition].setNetworkAddress(networkAddress);
+        records[partition].setEmpty(emptyResult);
         records[partition].start();
+
+        Waiters waiters = jobResultLocations.get(jobId).waiters;
+        Waiter waiter = waiters != null ? waiters.get(rsId) : null;
+        if (waiter != null) {
+            try {
+                DatasetDirectoryRecord[] updatedRecords = updatedRecords(jobId, rsId, waiter.knownRecords);
+                if (updatedRecords != null) {
+                    waiters.remove(rsId);
+                    waiter.callback.setValue(updatedRecords);
+                }
+            } catch (Exception e) {
+                waiter.callback.setException(e);
+            }
+        }
         notifyAll();
     }
 
@@ -102,7 +125,7 @@
     public synchronized void reportResultPartitionWriteCompletion(JobId jobId, ResultSetId rsId, int partition) {
         int successCount = 0;
 
-        DatasetJobRecord djr = (DatasetJobRecord) jobResultLocations.get(jobId);
+        DatasetJobRecord djr = getDatasetJobRecord(jobId);
         ResultSetMetaData resultSetMetaData = djr.get(rsId);
         DatasetDirectoryRecord[] records = resultSetMetaData.getRecords();
         records[partition].writeEOS();
@@ -120,26 +143,38 @@
 
     @Override
     public synchronized void reportResultPartitionFailure(JobId jobId, ResultSetId rsId, int partition) {
-        DatasetJobRecord djr = (DatasetJobRecord) jobResultLocations.get(jobId);
+        DatasetJobRecord djr = getDatasetJobRecord(jobId);
         if (djr != null) {
             djr.fail();
         }
+        final Waiters waiters = jobResultLocations.get(jobId).waiters;
+        if (waiters != null) {
+            waiters.get(rsId).callback.setException(new Exception());
+            waiters.remove(rsId);
+        }
         notifyAll();
     }
 
     @Override
     public synchronized void reportJobFailure(JobId jobId, List<Exception> exceptions) {
-        DatasetJobRecord djr = (DatasetJobRecord) jobResultLocations.get(jobId);
+        DatasetJobRecord djr = getDatasetJobRecord(jobId);
         if (djr != null) {
             djr.fail(exceptions);
         }
+        final Waiters waiters = jobResultLocations.get(jobId).waiters;
+        if (waiters != null) {
+            for (ResultSetId rsId : waiters.keySet()) {
+                waiters.get(rsId).callback.setException(exceptions.get(0));
+                waiters.remove(rsId);
+            }
+        }
         notifyAll();
     }
 
     @Override
     public synchronized Status getResultStatus(JobId jobId, ResultSetId rsId) throws HyracksDataException {
         DatasetJobRecord djr;
-        while ((djr = (DatasetJobRecord) jobResultLocations.get(jobId)) == null) {
+        while ((djr = getDatasetJobRecord(jobId)) == null) {
             try {
                 wait();
             } catch (InterruptedException e) {
@@ -151,8 +186,13 @@
     }
 
     @Override
-    public Map<JobId, IDatasetStateRecord> getStateMap() {
-        return jobResultLocations;
+    public Set<JobId> getJobIds() {
+        return jobResultLocations.keySet();
+    }
+
+    @Override
+    public IDatasetStateRecord getState(JobId jobId) {
+        return jobResultLocations.get(jobId).record;
     }
 
     @Override
@@ -161,39 +201,35 @@
     }
 
     @Override
-    public synchronized DatasetDirectoryRecord[] getResultPartitionLocations(JobId jobId, ResultSetId rsId,
-            DatasetDirectoryRecord[] knownRecords) throws HyracksDataException {
-        DatasetDirectoryRecord[] newRecords;
-        while ((newRecords = updatedRecords(jobId, rsId, knownRecords)) == null) {
-            try {
-                wait();
-            } catch (InterruptedException e) {
-                throw new HyracksDataException(e);
+    public synchronized void getResultPartitionLocations(JobId jobId, ResultSetId rsId,
+            DatasetDirectoryRecord[] knownRecords, IResultCallback<DatasetDirectoryRecord[]> callback)
+            throws HyracksDataException {
+        DatasetDirectoryRecord[] updatedRecords = updatedRecords(jobId, rsId, knownRecords);
+        if (updatedRecords == null) {
+            JobResultInfo jri = jobResultLocations.get(jobId);
+            Waiters waiters;
+            if (jri == null) {
+                waiters = new Waiters();
+                jri = new JobResultInfo(null, waiters);
+                jobResultLocations.put(jobId, jri);
+            } else {
+                waiters = jri.waiters;
+                if (waiters == null) {
+                    waiters = new Waiters();
+                    jri.waiters = waiters;
+                }
             }
+            waiters.put(rsId, new Waiter(knownRecords, callback));
+        } else {
+            callback.setValue(updatedRecords);
         }
-        return newRecords;
     }
 
     /**
      * Compares the records already known by the client for the given job's result set id with the records that the
      * dataset directory service knows and if there are any newly discovered records returns a whole array with the
      * new records filled in.
-     * This method has a very convoluted logic. Here is the explanation of how it works.
-     * If the ordering constraint has to be enforced, the method obtains the first null record in the known records in
-     * the order of the partitions. It always traverses the array in the first to last order!
-     * If known records array or the first element in that array is null in the but the record for that partition now
-     * known to the directory service, the method fills in that record in the array and returns the array back.
-     * However, if the first known null record is not a first element in the array, by induction, all the previous
-     * known records should be known already be known to client and none of the records for the partitions ahead is
-     * known by the client yet. So, we check if the client has reached the end of stream for the partition corresponding
-     * to the record before the first known null record, i.e. the last known non-null record. If not, we just return
-     * null because we cannot expose any new locations until the client reaches end of stream for the last known record.
-     * If the client has reached the end of stream record for the last known non-null record, we check if the next record
-     * is discovered by the dataset directory service and if so, we fill it in the records array and return it back or
-     * send null otherwise.
-     * If the ordering is not required, we are free to return any newly discovered records back, so we just check if
-     * arrays are equal and if they are not we send the entire new updated array.
-     * 
+     *
      * @param jobId
      *            - Id of the job for which the directory records should be retrieved.
      * @param rsId
@@ -201,14 +237,14 @@
      * @param knownRecords
      *            - An array of directory records that the client is already aware of.
      * @return
-     *         - Returns null if there aren't any newly discovered partitions enforcing the ordering constraint
+     *         Returns the updated records if new record were discovered, null otherwise
      * @throws HyracksDataException
      *             TODO(madhusudancs): Think about caching (and still be stateless) instead of this ugly O(n) iterations for
      *             every check. This already looks very expensive.
      */
     private DatasetDirectoryRecord[] updatedRecords(JobId jobId, ResultSetId rsId, DatasetDirectoryRecord[] knownRecords)
             throws HyracksDataException {
-        DatasetJobRecord djr = (DatasetJobRecord) jobResultLocations.get(jobId);
+        DatasetJobRecord djr = getDatasetJobRecord(jobId);
 
         if (djr == null) {
             throw new HyracksDataException("Requested JobId " + jobId + " doesn't exist");
@@ -228,36 +264,32 @@
             return null;
         }
 
-        boolean ordered = resultSetMetaData.getOrderedResult();
         DatasetDirectoryRecord[] records = resultSetMetaData.getRecords();
-        /* If ordering is required, we should expose the dataset directory records only in the order, otherwise
-         * we can simply check if there are any newly discovered records and send the whole array back if there are.
-         */
-        if (ordered) {
-            // Iterate over the known records and find the last record which is not null.
-            int i = 0;
-            for (i = 0; i < records.length; i++) {
-                if (knownRecords == null) {
-                    if (records[0] != null) {
-                        knownRecords = new DatasetDirectoryRecord[records.length];
-                        knownRecords[0] = records[0];
-                        return knownRecords;
-                    }
-                    return null;
-                }
-                if (knownRecords[i] == null) {
-                    if ((i == 0 || knownRecords[i - 1].hasReachedReadEOS()) && records[i] != null) {
-                        knownRecords[i] = records[i];
-                        return knownRecords;
-                    }
-                    return null;
-                }
-            }
-        } else {
-            if (!Arrays.equals(records, knownRecords)) {
-                return records;
-            }
-        }
-        return null;
+
+        return Arrays.equals(records, knownRecords) ? null : records;
     }
-}
\ No newline at end of file
+}
+
+class JobResultInfo {
+    JobResultInfo(DatasetJobRecord record, Waiters waiters) {
+        this.record = record;
+        this.waiters = waiters;
+    }
+
+    DatasetJobRecord record;
+    Waiters waiters;
+}
+
+class Waiters extends HashMap<ResultSetId, Waiter> {
+    private static final long serialVersionUID = 1L;
+}
+
+class Waiter {
+    Waiter(DatasetDirectoryRecord[] knownRecords, IResultCallback<DatasetDirectoryRecord[]> callback) {
+        this.knownRecords = knownRecords;
+        this.callback = callback;
+    }
+
+    DatasetDirectoryRecord[] knownRecords;
+    IResultCallback<DatasetDirectoryRecord[]> callback;
+}
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/IDatasetDirectoryService.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/IDatasetDirectoryService.java
index 3de3c50..30d1d15 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/IDatasetDirectoryService.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/IDatasetDirectoryService.java
@@ -25,12 +25,13 @@
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.job.IJobLifecycleListener;
 import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.control.common.work.IResultCallback;
 
 public interface IDatasetDirectoryService extends IJobLifecycleListener, IDatasetManager {
     public void init(ExecutorService executor);
 
-    public void registerResultPartitionLocation(JobId jobId, ResultSetId rsId, boolean orderedResult, int partition,
-            int nPartitions, NetworkAddress networkAddress);
+    public void registerResultPartitionLocation(JobId jobId, ResultSetId rsId, boolean orderedResult,
+            boolean emptyResult, int partition, int nPartitions, NetworkAddress networkAddress);
 
     public void reportResultPartitionWriteCompletion(JobId jobId, ResultSetId rsId, int partition);
 
@@ -40,6 +41,6 @@
 
     public Status getResultStatus(JobId jobId, ResultSetId rsId) throws HyracksDataException;
 
-    public DatasetDirectoryRecord[] getResultPartitionLocations(JobId jobId, ResultSetId rsId,
-            DatasetDirectoryRecord[] knownLocations) throws HyracksDataException;
+    public void getResultPartitionLocations(JobId jobId, ResultSetId rsId, DatasetDirectoryRecord[] knownLocations,
+            IResultCallback<DatasetDirectoryRecord[]> callback) throws HyracksDataException;
 }
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/ApplicationMessageWork.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/ApplicationMessageWork.java
index c4d202f..842c7c8 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/ApplicationMessageWork.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/ApplicationMessageWork.java
@@ -3,9 +3,9 @@
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * you may obtain a copy of the License from
- * 
+ *
  *     http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -61,6 +61,6 @@
 
     @Override
     public String toString() {
-        return "nodeID: " + nodeId;
+        return getName() + ": nodeID: " + nodeId;
     }
-}
\ No newline at end of file
+}
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetResultPartitionLocationsWork.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetResultPartitionLocationsWork.java
index fd6b690..2bed845 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetResultPartitionLocationsWork.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetResultPartitionLocationsWork.java
@@ -3,9 +3,9 @@
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * you may obtain a copy of the License from
- * 
+ *
  *     http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -14,6 +14,8 @@
  */
 package edu.uci.ics.hyracks.control.cc.work;
 
+import java.util.Arrays;
+
 import edu.uci.ics.hyracks.api.dataset.DatasetDirectoryRecord;
 import edu.uci.ics.hyracks.api.dataset.ResultSetId;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
@@ -50,13 +52,16 @@
             @Override
             public void run() {
                 try {
-                    DatasetDirectoryRecord[] partitionLocations = dds.getResultPartitionLocations(jobId, rsId,
-                            knownRecords);
-                    callback.setValue(partitionLocations);
+                    dds.getResultPartitionLocations(jobId, rsId, knownRecords, callback);
                 } catch (HyracksDataException e) {
                     callback.setException(e);
                 }
             }
         });
     }
-}
\ No newline at end of file
+
+    @Override
+    public String toString() {
+        return getName() + ": JobId@" + jobId + " ResultSetId@" + rsId + " Known@" + Arrays.toString(knownRecords);
+    }
+}
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetResultStatusWork.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetResultStatusWork.java
index 7aca3ff..adce1ef 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetResultStatusWork.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetResultStatusWork.java
@@ -3,9 +3,9 @@
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * you may obtain a copy of the License from
- * 
+ *
  *     http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -51,6 +51,6 @@
 
     @Override
     public String toString() {
-        return "JobId@" + jobId + " ResultSetId@" + rsId;
+        return getName() + ": JobId@" + jobId + " ResultSetId@" + rsId;
     }
 }
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobCleanupWork.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobCleanupWork.java
index 46a7c16..340134e 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobCleanupWork.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobCleanupWork.java
@@ -3,9 +3,9 @@
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * you may obtain a copy of the License from
- * 
+ *
  *     http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -123,4 +123,10 @@
         }
         return jobLogObject;
     }
-}
\ No newline at end of file
+
+    @Override
+    public String toString() {
+        return getName() + ": JobId@" + jobId + " Status@" + status
+                + (exceptions == null ? "" : " Exceptions@" + exceptions);
+    }
+}
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/RegisterPartitionAvailibilityWork.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/RegisterPartitionAvailibilityWork.java
index 92a0f5a..3e6be30 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/RegisterPartitionAvailibilityWork.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/RegisterPartitionAvailibilityWork.java
@@ -3,9 +3,9 @@
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * you may obtain a copy of the License from
- * 
+ *
  *     http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -57,6 +57,6 @@
 
     @Override
     public String toString() {
-        return "PartitionAvailable@" + partitionDescriptor;
+        return getName() + ": " + partitionDescriptor;
     }
-}
\ No newline at end of file
+}
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/RegisterPartitionRequestWork.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/RegisterPartitionRequestWork.java
index 3aa7e41..bb0d013 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/RegisterPartitionRequestWork.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/RegisterPartitionRequestWork.java
@@ -3,9 +3,9 @@
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * you may obtain a copy of the License from
- * 
+ *
  *     http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -54,6 +54,6 @@
 
     @Override
     public String toString() {
-        return "PartitionRequest@" + partitionRequest;
+        return getName() + ": " + partitionRequest;
     }
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/RegisterResultPartitionLocationWork.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/RegisterResultPartitionLocationWork.java
index 564d1d3..f87a7fa 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/RegisterResultPartitionLocationWork.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/RegisterResultPartitionLocationWork.java
@@ -3,9 +3,9 @@
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * you may obtain a copy of the License from
- * 
+ *
  *     http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -29,6 +29,8 @@
 
     private final boolean orderedResult;
 
+    private final boolean emptyResult;
+
     private final int partition;
 
     private final int nPartitions;
@@ -36,11 +38,12 @@
     private final NetworkAddress networkAddress;
 
     public RegisterResultPartitionLocationWork(ClusterControllerService ccs, JobId jobId, ResultSetId rsId,
-            boolean orderedResult, int partition, int nPartitions, NetworkAddress networkAddress) {
+            boolean orderedResult, boolean emptyResult, int partition, int nPartitions, NetworkAddress networkAddress) {
         this.ccs = ccs;
         this.jobId = jobId;
         this.rsId = rsId;
         this.orderedResult = orderedResult;
+        this.emptyResult = emptyResult;
         this.partition = partition;
         this.nPartitions = nPartitions;
         this.networkAddress = networkAddress;
@@ -48,13 +51,13 @@
 
     @Override
     public void run() {
-        ccs.getDatasetDirectoryService().registerResultPartitionLocation(jobId, rsId, orderedResult, partition,
-                nPartitions, networkAddress);
+        ccs.getDatasetDirectoryService().registerResultPartitionLocation(jobId, rsId, orderedResult, emptyResult,
+                partition, nPartitions, networkAddress);
     }
 
     @Override
     public String toString() {
-        return "JobId@" + jobId + " ResultSetId@" + rsId + " Partition@" + partition + " NPartitions@" + nPartitions
-                + " ResultPartitionLocation@" + networkAddress + " OrderedResult@" + orderedResult;
+        return getName() + ": JobId@" + jobId + " ResultSetId@" + rsId + " Partition@" + partition + " NPartitions@" + nPartitions
+                + " ResultPartitionLocation@" + networkAddress + " OrderedResult@" + orderedResult + " EmptyResult@" + emptyResult;
     }
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/ReportResultPartitionWriteCompletionWork.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/ReportResultPartitionWriteCompletionWork.java
index 5ea171e..fe2bb3a 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/ReportResultPartitionWriteCompletionWork.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/ReportResultPartitionWriteCompletionWork.java
@@ -3,9 +3,9 @@
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * you may obtain a copy of the License from
- * 
+ *
  *     http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -43,6 +43,6 @@
 
     @Override
     public String toString() {
-        return "JobId@" + jobId + " ResultSetId@" + rsId + " Partition@" + partition;
+        return getName() + ": JobId@" + jobId + " ResultSetId@" + rsId + " Partition@" + partition;
     }
 }
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/TaskCompleteWork.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/TaskCompleteWork.java
index 8e5050c..d8d00f5 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/TaskCompleteWork.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/TaskCompleteWork.java
@@ -3,9 +3,9 @@
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * you may obtain a copy of the License from
- * 
+ *
  *     http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -57,6 +57,6 @@
 
     @Override
     public String toString() {
-        return "TaskCompleteEvent@[" + nodeId + "[" + jobId + ":" + taId + "]";
+        return getName() + ": [" + nodeId + "[" + jobId + ":" + taId + "]";
     }
-}
\ No newline at end of file
+}
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/TaskFailureWork.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/TaskFailureWork.java
index e00025d..d6938d9 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/TaskFailureWork.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/TaskFailureWork.java
@@ -3,9 +3,9 @@
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * you may obtain a copy of the License from
- * 
+ *
  *     http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -42,6 +42,6 @@
 
     @Override
     public String toString() {
-        return "TaskFailureEvent[" + jobId + ":" + taId + ":" + nodeId + "]";
+        return getName() + ": [" + jobId + ":" + taId + ":" + nodeId + "]";
     }
-}
\ No newline at end of file
+}
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/IClusterController.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/IClusterController.java
index 926c83f..659dc23 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/IClusterController.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/IClusterController.java
@@ -56,7 +56,7 @@
 
     public void sendApplicationMessageToCC(byte[] data, DeploymentId deploymentId, String nodeId) throws Exception;
 
-    public void registerResultPartitionLocation(JobId jobId, ResultSetId rsId, boolean orderedResult, int partition,
+    public void registerResultPartitionLocation(JobId jobId, ResultSetId rsId, boolean orderedResult, boolean emptyResult, int partition,
             int nPartitions, NetworkAddress networkAddress) throws Exception;
 
     public void reportResultPartitionWriteCompletion(JobId jobId, ResultSetId rsId, int partition) throws Exception;
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/dataset/ResultStateSweeper.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/dataset/ResultStateSweeper.java
index 5a6d849..a5f0d8f 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/dataset/ResultStateSweeper.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/dataset/ResultStateSweeper.java
@@ -3,9 +3,9 @@
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * you may obtain a copy of the License from
- * 
+ *
  *     http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -17,12 +17,10 @@
 
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Map;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
 import edu.uci.ics.hyracks.api.dataset.IDatasetManager;
-import edu.uci.ics.hyracks.api.dataset.IDatasetStateRecord;
 import edu.uci.ics.hyracks.api.job.JobId;
 
 /**
@@ -64,9 +62,9 @@
     private void sweep() {
         synchronized (datasetManager) {
             toBeCollected.clear();
-            for (Map.Entry<JobId, IDatasetStateRecord> entry : datasetManager.getStateMap().entrySet()) {
-                if (System.currentTimeMillis() > entry.getValue().getTimestamp() + resultTTL) {
-                    toBeCollected.add(entry.getKey());
+            for (JobId jobId : datasetManager.getJobIds()) {
+                if (System.currentTimeMillis() > datasetManager.getState(jobId).getTimestamp() + resultTTL) {
+                    toBeCollected.add(jobId);
                 }
             }
             for (JobId jobId : toBeCollected) {
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/CCNCFunctions.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/CCNCFunctions.java
index 9077cd8..6be2294 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/CCNCFunctions.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/CCNCFunctions.java
@@ -456,6 +456,8 @@
 
         private final boolean orderedResult;
 
+        private final boolean emptyResult;
+
         private final int partition;
 
         private final int nPartitions;
@@ -463,10 +465,11 @@
         private NetworkAddress networkAddress;
 
         public RegisterResultPartitionLocationFunction(JobId jobId, ResultSetId rsId, boolean orderedResult,
-                int partition, int nPartitions, NetworkAddress networkAddress) {
+                boolean emptyResult, int partition, int nPartitions, NetworkAddress networkAddress) {
             this.jobId = jobId;
             this.rsId = rsId;
             this.orderedResult = orderedResult;
+            this.emptyResult = emptyResult;
             this.partition = partition;
             this.nPartitions = nPartitions;
             this.networkAddress = networkAddress;
@@ -489,6 +492,10 @@
             return orderedResult;
         }
 
+        public boolean getEmptyResult() {
+            return emptyResult;
+        }
+
         public int getPartition() {
             return partition;
         }
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
index b52d0ae..6bb1a93 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
@@ -112,10 +112,10 @@
         ipcHandle.send(-1, fn, null);
     }
 
-    public void registerResultPartitionLocation(JobId jobId, ResultSetId rsId, boolean orderedResult, int partition,
+    public void registerResultPartitionLocation(JobId jobId, ResultSetId rsId, boolean orderedResult, boolean emptyResult, int partition,
             int nPartitions, NetworkAddress networkAddress) throws Exception {
         CCNCFunctions.RegisterResultPartitionLocationFunction fn = new CCNCFunctions.RegisterResultPartitionLocationFunction(
-                jobId, rsId, orderedResult, partition, nPartitions, networkAddress);
+                jobId, rsId, orderedResult, emptyResult, partition, nPartitions, networkAddress);
         ipcHandle.send(-1, fn, null);
     }
 
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/work/AbstractWork.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/work/AbstractWork.java
index adffba1..7203a36 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/work/AbstractWork.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/work/AbstractWork.java
@@ -3,9 +3,9 @@
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * you may obtain a copy of the License from
- * 
+ *
  *     http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -20,4 +20,15 @@
     public Level logLevel() {
         return Level.INFO;
     }
-}
\ No newline at end of file
+
+    public String getName() {
+        final String className = getClass().getName();
+        final int endIndex = className.endsWith("Work") ? className.length() - 4 : className.length();
+        return className.substring(className.lastIndexOf('.') + 1, endIndex);
+    }
+
+    @Override
+    public String toString() {
+        return getName();
+    }
+}
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java
index 325e2bc..b1f4147 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java
@@ -3,9 +3,9 @@
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * you may obtain a copy of the License from
- * 
+ *
  *     http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -18,6 +18,7 @@
 import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Set;
 import java.util.concurrent.Executor;
 import java.util.logging.Logger;
 
@@ -70,28 +71,22 @@
             boolean asyncMode, int partition, int nPartitions) throws HyracksException {
         DatasetPartitionWriter dpw = null;
         JobId jobId = ctx.getJobletContext().getJobId();
-        try {
-            synchronized (this) {
-                ncs.getClusterController().registerResultPartitionLocation(jobId, rsId, orderedResult, partition,
-                        nPartitions, ncs.getDatasetNetworkManager().getNetworkAddress());
-                dpw = new DatasetPartitionWriter(ctx, this, jobId, rsId, asyncMode, partition, datasetMemoryManager,
-                        fileFactory);
+        synchronized (this) {
+            dpw = new DatasetPartitionWriter(ctx, this, jobId, rsId, asyncMode, orderedResult, partition, nPartitions,
+                    datasetMemoryManager, fileFactory);
 
-                ResultSetMap rsIdMap = (ResultSetMap) partitionResultStateMap.get(jobId);
-                if (rsIdMap == null) {
-                    rsIdMap = new ResultSetMap();
-                    partitionResultStateMap.put(jobId, rsIdMap);
-                }
-
-                ResultState[] resultStates = rsIdMap.get(rsId);
-                if (resultStates == null) {
-                    resultStates = new ResultState[nPartitions];
-                    rsIdMap.put(rsId, resultStates);
-                }
-                resultStates[partition] = dpw.getResultState();
+            ResultSetMap rsIdMap = (ResultSetMap) partitionResultStateMap.get(jobId);
+            if (rsIdMap == null) {
+                rsIdMap = new ResultSetMap();
+                partitionResultStateMap.put(jobId, rsIdMap);
             }
-        } catch (Exception e) {
-            throw new HyracksException(e);
+
+            ResultState[] resultStates = rsIdMap.get(rsId);
+            if (resultStates == null) {
+                resultStates = new ResultState[nPartitions];
+                rsIdMap.put(rsId, resultStates);
+            }
+            resultStates[partition] = dpw.getResultState();
         }
 
         LOGGER.fine("Initialized partition writer: JobId: " + jobId + ":partition: " + partition);
@@ -99,6 +94,17 @@
     }
 
     @Override
+    public void registerResultPartitionLocation(JobId jobId, ResultSetId rsId, int partition, int nPartitions,
+            boolean orderedResult, boolean emptyResult) throws HyracksException {
+        try {
+            ncs.getClusterController().registerResultPartitionLocation(jobId, rsId, orderedResult, emptyResult,
+                    partition, nPartitions, ncs.getDatasetNetworkManager().getNetworkAddress());
+        } catch (Exception e) {
+            throw new HyracksException(e);
+        }
+    }
+
+    @Override
     public void reportPartitionWriteCompletion(JobId jobId, ResultSetId rsId, int partition) throws HyracksException {
         try {
             LOGGER.fine("Reporting partition write completion: JobId: " + jobId + ": ResultSetId: " + rsId
@@ -211,8 +217,13 @@
     }
 
     @Override
-    public Map<JobId, IDatasetStateRecord> getStateMap() {
-        return partitionResultStateMap;
+    public Set<JobId> getJobIds() {
+        return partitionResultStateMap.keySet();
+    }
+
+    @Override
+    public IDatasetStateRecord getState(JobId jobId) {
+        return partitionResultStateMap.get(jobId);
     }
 
     @Override
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionWriter.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionWriter.java
index d61da67..d3a6fb5 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionWriter.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionWriter.java
@@ -39,22 +39,30 @@
 
     private final boolean asyncMode;
 
+    private final boolean orderedResult;
+
     private final int partition;
 
+    private final int nPartitions;
+
     private final DatasetMemoryManager datasetMemoryManager;
 
     private final ResultSetPartitionId resultSetPartitionId;
 
     private final ResultState resultState;
 
+    private boolean partitionRegistered;
+
     public DatasetPartitionWriter(IHyracksTaskContext ctx, IDatasetPartitionManager manager, JobId jobId,
-            ResultSetId rsId, boolean asyncMode, int partition, DatasetMemoryManager datasetMemoryManager,
-            IWorkspaceFileFactory fileFactory) {
+            ResultSetId rsId, boolean asyncMode, boolean orderedResult, int partition, int nPartitions,
+            DatasetMemoryManager datasetMemoryManager, IWorkspaceFileFactory fileFactory) {
         this.manager = manager;
         this.jobId = jobId;
         this.resultSetId = rsId;
         this.asyncMode = asyncMode;
+        this.orderedResult = orderedResult;
         this.partition = partition;
+        this.nPartitions = nPartitions;
         this.datasetMemoryManager = datasetMemoryManager;
 
         resultSetPartitionId = new ResultSetPartitionId(jobId, rsId, partition);
@@ -71,11 +79,16 @@
         if (LOGGER.isLoggable(Level.INFO)) {
             LOGGER.info("open(" + partition + ")");
         }
+        partitionRegistered = false;
         resultState.open();
     }
 
     @Override
     public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+        if (!partitionRegistered) {
+            registerResultPartitionLocation(false);
+            partitionRegistered = true;
+        }
         if (datasetMemoryManager == null) {
             resultState.write(buffer);
         } else {
@@ -99,12 +112,28 @@
         if (LOGGER.isLoggable(Level.INFO)) {
             LOGGER.info("close(" + partition + ")");
         }
-
+        if (!partitionRegistered) {
+            registerResultPartitionLocation(true);
+            partitionRegistered = true;
+        }
+        resultState.close();
         try {
-            resultState.close();
             manager.reportPartitionWriteCompletion(jobId, resultSetId, partition);
         } catch (HyracksException e) {
             throw new HyracksDataException(e);
         }
     }
+
+    void registerResultPartitionLocation(boolean empty) throws HyracksDataException {
+        try {
+            manager.registerResultPartitionLocation(jobId, resultSetId, partition, nPartitions, orderedResult, empty);
+        } catch (HyracksException e) {
+            if (e instanceof HyracksDataException) {
+                throw (HyracksDataException) e;
+            } else {
+                throw new HyracksDataException(e);
+            }
+        }
+    }
+
 }
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/ApplicationMessageWork.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/ApplicationMessageWork.java
index 41069d8..d08cd0a 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/ApplicationMessageWork.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/ApplicationMessageWork.java
@@ -3,9 +3,9 @@
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * you may obtain a copy of the License from
- * 
+ *
  *     http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -59,6 +59,6 @@
 
     @Override
     public String toString() {
-        return "nodeID: " + nodeId;
+        return getName() + ": nodeID: " + nodeId;
     }
 }
\ No newline at end of file