Update the implementation of DatasetPartitionManger along with its interface to support the network communication completely.

git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_hyracks_result_distribution@2515 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetPartitionManager.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetPartitionManager.java
index adc94b0..9e1067e 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetPartitionManager.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetPartitionManager.java
@@ -15,10 +15,19 @@
 package edu.uci.ics.hyracks.api.dataset;
 
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.io.IWorkspaceFileFactory;
 import edu.uci.ics.hyracks.api.job.JobId;
 
 public interface IDatasetPartitionManager {
-    public IFrameWriter createDatasetPartitionWriter(JobId jobId, int partition, int nPartitions)
-            throws HyracksDataException;
+    public IFrameWriter createDatasetPartitionWriter(IHyracksTaskContext ctx, int partition, int nPartitions)
+            throws HyracksException;
+
+    public void initializeDatasetPartitionReader(JobId jobId, int partition, IFrameWriter noc)
+            throws HyracksException;
+
+    public IWorkspaceFileFactory getFileFactory();
+
+    public void close();
 }
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java
index 6b04dcd..9d9d5fc 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java
@@ -16,38 +16,84 @@
 
 import java.util.HashMap;
 import java.util.Map;
-
-import org.apache.commons.lang3.tuple.Pair;
+import java.util.concurrent.Executor;
 
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.dataset.IDatasetPartitionManager;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.io.IWorkspaceFileFactory;
 import edu.uci.ics.hyracks.api.job.JobId;
 import edu.uci.ics.hyracks.control.nc.NodeControllerService;
+import edu.uci.ics.hyracks.control.nc.io.IOManager;
+import edu.uci.ics.hyracks.control.nc.io.WorkspaceFileFactory;
+import edu.uci.ics.hyracks.control.nc.resources.DefaultDeallocatableRegistry;
 
 public class DatasetPartitionManager implements IDatasetPartitionManager {
     private final NodeControllerService ncs;
 
-    private final Map<Pair<JobId, Integer>, IFrameWriter> partitionDatasetWriterMap;
+    private final Executor executor;
 
-    public DatasetPartitionManager(NodeControllerService ncs) {
+    private final Map<JobId, DatasetPartitionWriter[]> partitionDatasetWriterMap;
+
+    private final DefaultDeallocatableRegistry deallocatableRegistry;
+
+    private final IWorkspaceFileFactory fileFactory;
+
+    public DatasetPartitionManager(NodeControllerService ncs, Executor executor) {
         this.ncs = ncs;
-        partitionDatasetWriterMap = new HashMap<Pair<JobId, Integer>, IFrameWriter>();
+        this.executor = executor;
+        partitionDatasetWriterMap = new HashMap<JobId, DatasetPartitionWriter[]>();
+        deallocatableRegistry = new DefaultDeallocatableRegistry();
+        fileFactory = new WorkspaceFileFactory(deallocatableRegistry, (IOManager) ncs.getRootContext().getIOManager());
     }
 
     @Override
-    public IFrameWriter createDatasetPartitionWriter(JobId jobId, int partition, int nPartitions)
-            throws HyracksDataException {
+    public IFrameWriter createDatasetPartitionWriter(IHyracksTaskContext ctx, int partition, int nPartitions)
+            throws HyracksException {
         DatasetPartitionWriter dpw = null;
+        JobId jobId = ctx.getJobletContext().getJobId();
         try {
             ncs.getClusterController().registerResultPartitionLocation(jobId, partition, nPartitions,
-                    ncs.getNetworkManager().getNetworkAddress());
-            dpw = new DatasetPartitionWriter();
-            partitionDatasetWriterMap.put(Pair.<JobId, Integer> of(jobId, partition), dpw);
+                    ncs.getDatasetNetworkManager().getNetworkAddress());
+            dpw = new DatasetPartitionWriter(ctx, this, partition, executor);
+
+            DatasetPartitionWriter[] writers = partitionDatasetWriterMap.get(jobId);
+            if (writers == null) {
+                writers = new DatasetPartitionWriter[nPartitions];
+                partitionDatasetWriterMap.put(jobId, writers);
+            }
+            writers[partition] = dpw;
         } catch (Exception e) {
-            throw new HyracksDataException(e);
+            throw new HyracksException(e);
         }
 
         return dpw;
     }
+
+    @Override
+    public void initializeDatasetPartitionReader(JobId jobId, int partition, IFrameWriter writer)
+            throws HyracksException {
+        DatasetPartitionWriter[] writers = partitionDatasetWriterMap.get(jobId);
+        if (writers == null) {
+            throw new HyracksException("Unknown JobId " + jobId);
+        }
+
+        DatasetPartitionWriter dpw = writers[partition];
+        if (dpw == null) {
+            throw new HyracksException("No DatasetPartitionWriter for partition " + partition);
+        }
+
+        dpw.writeTo(writer);
+    }
+
+    @Override
+    public IWorkspaceFileFactory getFileFactory() {
+        return fileFactory;
+    }
+
+    @Override
+    public void close() {
+        deallocatableRegistry.close();
+    }
 }