Write debug information for each operation in the Dataset Partition Manager.
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java
index 04dfb33..688a725 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java
@@ -17,6 +17,7 @@
 import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.concurrent.Executor;
+import java.util.logging.Logger;
 
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
@@ -32,6 +33,8 @@
 import edu.uci.ics.hyracks.control.nc.resources.DefaultDeallocatableRegistry;
 
 public class DatasetPartitionManager implements IDatasetPartitionManager {
+    private static final Logger LOGGER = Logger.getLogger(DatasetPartitionManager.class.getName());
+
     private final NodeControllerService ncs;
 
     private final Executor executor;
@@ -62,6 +65,7 @@
                             ResultState state = resultStates[i];
                             if (state != null) {
                                 state.deinit();
+                                LOGGER.fine("Removing partition: " + i + " for JobId: " + eldest.getKey());
                             }
                         }
                         return true;
@@ -94,12 +98,15 @@
             throw new HyracksException(e);
         }
 
+        LOGGER.fine("Initialized partition writer: JobId: " + jobId + ":partition: " + partition);
         return dpw;
     }
 
     @Override
     public void reportPartitionWriteCompletion(JobId jobId, ResultSetId rsId, int partition) throws HyracksException {
         try {
+            LOGGER.fine("Reporting partition write completion: JobId: " + jobId + ": ResultSetId: " + rsId
+                    + ":partition: " + partition);
             ncs.getClusterController().reportResultPartitionWriteCompletion(jobId, rsId, partition);
         } catch (Exception e) {
             throw new HyracksException(e);
@@ -109,6 +116,8 @@
     @Override
     public void reportPartitionFailure(JobId jobId, ResultSetId rsId, int partition) throws HyracksException {
         try {
+            LOGGER.info("Reporting partition failure: JobId: " + jobId + ": ResultSetId: " + rsId + ":partition: "
+                    + partition);
             ncs.getClusterController().reportResultPartitionFailure(jobId, rsId, partition);
         } catch (Exception e) {
             throw new HyracksException(e);
@@ -134,6 +143,7 @@
 
         IDatasetPartitionReader dpr = new DatasetPartitionReader(datasetMemoryManager, executor, resultState);
         dpr.writeTo(writer);
+        LOGGER.fine("Initialized partition reader: JobId: " + jobId + ":partition: " + partition);
     }
 
     @Override