Add support for registering the result set id with the ResultWriterOperatorDescriptor.

Along with that pass the result set id information all through the stack to
register the partition address along with this result set id in the dataset
directory service.

git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_hyracks_result_distribution@2533 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetDirectoryService.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetDirectoryService.java
index 3ff2c46..62dc6c1 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetDirectoryService.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetDirectoryService.java
@@ -19,7 +19,7 @@
 import edu.uci.ics.hyracks.api.job.JobId;
 
 public interface IDatasetDirectoryService {
-    public void registerResultPartitionLocation(JobId jobId, int partition, int nPartitions,
+    public void registerResultPartitionLocation(JobId jobId, ResultSetId rsId, int partition, int nPartitions,
             NetworkAddress networkAddress);
 
     public NetworkAddress[] getResultPartitionLocations(JobId jobId, NetworkAddress[] knownLocations)
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetPartitionManager.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetPartitionManager.java
index 862d9aa..6c1ff10 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetPartitionManager.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetPartitionManager.java
@@ -21,8 +21,8 @@
 import edu.uci.ics.hyracks.api.job.JobId;
 
 public interface IDatasetPartitionManager {
-    public IFrameWriter createDatasetPartitionWriter(IHyracksTaskContext ctx, int partition, int nPartitions)
-            throws HyracksException;
+    public IFrameWriter createDatasetPartitionWriter(IHyracksTaskContext ctx, ResultSetId rsId, int partition,
+            int nPartitions) throws HyracksException;
 
     public void initializeDatasetPartitionReader(JobId jobId, int partition, IFrameWriter noc) throws HyracksException;
 
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
index 94f7015..36ee412 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
@@ -451,7 +451,8 @@
                 case REGISTER_RESULT_PARTITION_LOCATION: {
                     CCNCFunctions.RegisterResultPartitionLocationFunction rrplf = (CCNCFunctions.RegisterResultPartitionLocationFunction) fn;
                     workQueue.schedule(new RegisterResultPartitionLocationWork(ClusterControllerService.this, rrplf
-                            .getJobId(), rrplf.getPartition(), rrplf.getNPartitions(), rrplf.getNetworkAddress()));
+                            .getJobId(), rrplf.getResultSetId(), rrplf.getPartition(), rrplf.getNPartitions(), rrplf
+                            .getNetworkAddress()));
                     return;
                 }
 
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/DatasetDirectoryService.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/DatasetDirectoryService.java
index e42b178..5d49bbf 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/DatasetDirectoryService.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/DatasetDirectoryService.java
@@ -20,6 +20,7 @@
 
 import edu.uci.ics.hyracks.api.comm.NetworkAddress;
 import edu.uci.ics.hyracks.api.dataset.IDatasetDirectoryService;
+import edu.uci.ics.hyracks.api.dataset.ResultSetId;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.job.JobId;
 
@@ -30,22 +31,31 @@
  * all the results) completely. Then we can just get rid of the location information for that job.
  */
 public class DatasetDirectoryService implements IDatasetDirectoryService {
-    private final Map<JobId, NetworkAddress[]> jobPartitionLocationsMap;
+    private final Map<JobId, Map<ResultSetId, DatasetDirectoryRecord[]>> jobResultLocationsMap;
 
     public DatasetDirectoryService() {
-        jobPartitionLocationsMap = new HashMap<JobId, NetworkAddress[]>();
+        jobResultLocationsMap = new HashMap<JobId, Map<ResultSetId, DatasetDirectoryRecord[]>>();
     }
 
     @Override
-    public synchronized void registerResultPartitionLocation(JobId jobId, int partition, int nPartitions,
-            NetworkAddress networkAddress) {
-        NetworkAddress[] partitionLocations = jobPartitionLocationsMap.get(jobId);
-        if (partitionLocations == null) {
-            partitionLocations = new NetworkAddress[nPartitions];
-            jobPartitionLocationsMap.put(jobId, partitionLocations);
+    public synchronized void registerResultPartitionLocation(JobId jobId, ResultSetId rsId, int partition,
+            int nPartitions, NetworkAddress networkAddress) {
+        Map<ResultSetId, DatasetDirectoryRecord[]> resultSetsMap = jobResultLocationsMap.get(jobId);
+        if (resultSetsMap == null) {
+            resultSetsMap = new HashMap<ResultSetId, DatasetDirectoryRecord[]>();
+            jobResultLocationsMap.put(jobId, resultSetsMap);
         }
 
-        partitionLocations[partition] = networkAddress;
+        DatasetDirectoryRecord[] records = resultSetsMap.get(rsId);
+        if (records == null) {
+            records = new DatasetDirectoryRecord[nPartitions];
+            resultSetsMap.put(rsId, records);
+        }
+
+        if (records[partition] == null) {
+            records[partition] = new DatasetDirectoryRecord();
+        }
+        records[partition].setNetworkAddress(networkAddress);
         notifyAll();
     }
 
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/RegisterResultPartitionLocationWork.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/RegisterResultPartitionLocationWork.java
index e18572c..3c762eb 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/RegisterResultPartitionLocationWork.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/RegisterResultPartitionLocationWork.java
@@ -15,21 +15,29 @@
 package edu.uci.ics.hyracks.control.cc.work;
 
 import edu.uci.ics.hyracks.api.comm.NetworkAddress;
+import edu.uci.ics.hyracks.api.dataset.ResultSetId;
 import edu.uci.ics.hyracks.api.job.JobId;
 import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
 import edu.uci.ics.hyracks.control.common.work.AbstractWork;
 
 public class RegisterResultPartitionLocationWork extends AbstractWork {
     private final ClusterControllerService ccs;
+
     private final JobId jobId;
+
+    private final ResultSetId rsId;
+
     private final int partition;
+
     private final int nPartitions;
+
     private final NetworkAddress networkAddress;
 
-    public RegisterResultPartitionLocationWork(ClusterControllerService ccs, JobId jobId, int partition,
-            int nPartitions, NetworkAddress networkAddress) {
+    public RegisterResultPartitionLocationWork(ClusterControllerService ccs, JobId jobId, ResultSetId rsId,
+            int partition, int nPartitions, NetworkAddress networkAddress) {
         this.ccs = ccs;
         this.jobId = jobId;
+        this.rsId = rsId;
         this.partition = partition;
         this.nPartitions = nPartitions;
         this.networkAddress = networkAddress;
@@ -37,12 +45,13 @@
 
     @Override
     public void run() {
-        ccs.getDatasetDirectoryService().registerResultPartitionLocation(jobId, partition, nPartitions, networkAddress);
+        ccs.getDatasetDirectoryService().registerResultPartitionLocation(jobId, rsId, partition, nPartitions,
+                networkAddress);
     }
 
     @Override
     public String toString() {
-        return "JobId@" + jobId + "Partition@" + partition + "NPartitions@" + nPartitions + "ResultPartitionLocation@"
-                + networkAddress;
+        return "JobId@" + jobId + " Partition@" + partition + " ResultSetId@" + rsId + " NPartitions@" + nPartitions
+                + " ResultPartitionLocation@" + networkAddress;
     }
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/IClusterController.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/IClusterController.java
index b0b81f4..086fc4d 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/IClusterController.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/IClusterController.java
@@ -18,6 +18,7 @@
 
 import edu.uci.ics.hyracks.api.comm.NetworkAddress;
 import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
+import edu.uci.ics.hyracks.api.dataset.ResultSetId;
 import edu.uci.ics.hyracks.api.job.JobId;
 import edu.uci.ics.hyracks.control.common.application.ApplicationStatus;
 import edu.uci.ics.hyracks.control.common.controllers.NodeRegistration;
@@ -47,8 +48,8 @@
 
     public void registerPartitionRequest(PartitionRequest partitionRequest) throws Exception;
 
-    public void registerResultPartitionLocation(JobId jobId, int partition, int nPartitions, NetworkAddress addr)
-            throws Exception;
+    public void registerResultPartitionLocation(JobId jobId, ResultSetId rsId, int partition, int nPartitions,
+            NetworkAddress addr) throws Exception;
 
     public void notifyApplicationStateChange(String nodeId, String appName, ApplicationStatus status) throws Exception;
 
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/CCNCFunctions.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/CCNCFunctions.java
index 56e9830..e3591ce 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/CCNCFunctions.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/CCNCFunctions.java
@@ -36,6 +36,7 @@
 import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
 import edu.uci.ics.hyracks.api.dataflow.TaskId;
 import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicy;
+import edu.uci.ics.hyracks.api.dataset.ResultSetId;
 import edu.uci.ics.hyracks.api.job.JobFlag;
 import edu.uci.ics.hyracks.api.job.JobId;
 import edu.uci.ics.hyracks.api.job.JobStatus;
@@ -444,14 +445,18 @@
 
         private final JobId jobId;
 
+        private final ResultSetId rsId;
+
         private final int partition;
 
         private final int nPartitions;
 
         private NetworkAddress networkAddress;
 
-        public RegisterResultPartitionLocationFunction(JobId jobId, int partition, int nPartitions, NetworkAddress networkAddress) {
+        public RegisterResultPartitionLocationFunction(JobId jobId, ResultSetId rsId, int partition, int nPartitions,
+                NetworkAddress networkAddress) {
             this.jobId = jobId;
+            this.rsId = rsId;
             this.partition = partition;
             this.nPartitions = nPartitions;
             this.networkAddress = networkAddress;
@@ -466,6 +471,10 @@
             return jobId;
         }
 
+        public ResultSetId getResultSetId() {
+            return rsId;
+        }
+
         public int getPartition() {
             return partition;
         }
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
index 76b7043..595efd1 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
@@ -18,6 +18,7 @@
 
 import edu.uci.ics.hyracks.api.comm.NetworkAddress;
 import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
+import edu.uci.ics.hyracks.api.dataset.ResultSetId;
 import edu.uci.ics.hyracks.api.job.JobId;
 import edu.uci.ics.hyracks.control.common.application.ApplicationStatus;
 import edu.uci.ics.hyracks.control.common.base.IClusterController;
@@ -96,10 +97,10 @@
     }
 
     @Override
-    public void registerResultPartitionLocation(JobId jobId, int partition, int nPartitions,
+    public void registerResultPartitionLocation(JobId jobId, ResultSetId rsId, int partition, int nPartitions,
             NetworkAddress networkAddress) throws Exception {
         CCNCFunctions.RegisterResultPartitionLocationFunction fn = new CCNCFunctions.RegisterResultPartitionLocationFunction(
-                jobId, partition, nPartitions, networkAddress);
+                jobId, rsId, partition, nPartitions, networkAddress);
         ipcHandle.send(-1, fn, null);
     }
 
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java
index 9d9d5fc..7d9b10e 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java
@@ -21,6 +21,7 @@
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.dataset.IDatasetPartitionManager;
+import edu.uci.ics.hyracks.api.dataset.ResultSetId;
 import edu.uci.ics.hyracks.api.exceptions.HyracksException;
 import edu.uci.ics.hyracks.api.io.IWorkspaceFileFactory;
 import edu.uci.ics.hyracks.api.job.JobId;
@@ -49,12 +50,12 @@
     }
 
     @Override
-    public IFrameWriter createDatasetPartitionWriter(IHyracksTaskContext ctx, int partition, int nPartitions)
-            throws HyracksException {
+    public IFrameWriter createDatasetPartitionWriter(IHyracksTaskContext ctx, ResultSetId rsId, int partition,
+            int nPartitions) throws HyracksException {
         DatasetPartitionWriter dpw = null;
         JobId jobId = ctx.getJobletContext().getJobId();
         try {
-            ncs.getClusterController().registerResultPartitionLocation(jobId, partition, nPartitions,
+            ncs.getClusterController().registerResultPartitionLocation(jobId, rsId, partition, nPartitions,
                     ncs.getDatasetNetworkManager().getNetworkAddress());
             dpw = new DatasetPartitionWriter(ctx, this, partition, executor);
 
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java
index 31b9af8..dde8669 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java
@@ -21,6 +21,7 @@
 import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
 import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
 import edu.uci.ics.hyracks.api.dataset.IDatasetPartitionManager;
+import edu.uci.ics.hyracks.api.dataset.ResultSetId;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.exceptions.HyracksException;
 import edu.uci.ics.hyracks.api.job.IOperatorDescriptorRegistry;
@@ -30,8 +31,11 @@
 public class ResultWriterOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
     private static final long serialVersionUID = 1L;
 
-    public ResultWriterOperatorDescriptor(IOperatorDescriptorRegistry spec) {
+    private final ResultSetId rsId;
+
+    public ResultWriterOperatorDescriptor(IOperatorDescriptorRegistry spec, ResultSetId rsId) {
         super(spec, 1, 0);
+        this.rsId = rsId;
     }
 
     @Override
@@ -45,7 +49,7 @@
             @Override
             public void open() throws HyracksDataException {
                 try {
-                    datasetPartitionWriter = dpm.createDatasetPartitionWriter(ctx, partition, nPartitions);
+                    datasetPartitionWriter = dpm.createDatasetPartitionWriter(ctx, rsId, partition, nPartitions);
                     datasetPartitionWriter.open();
                 } catch (HyracksException e) {
                     throw new HyracksDataException(e);