Add a parameter for the ResultWriterOperatorDescriptor to support ordered result distribution.

Let this parameter percolate throughout the layers of indirection and the
network until it hits the dataset directory service.

git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_hyracks_result_distribution@2547 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetDirectoryService.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetDirectoryService.java
index c7aabb0..f47e41b 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetDirectoryService.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetDirectoryService.java
@@ -19,8 +19,8 @@
 import edu.uci.ics.hyracks.api.job.JobId;
 
 public interface IDatasetDirectoryService {
-    public void registerResultPartitionLocation(JobId jobId, ResultSetId rsId, int partition, int nPartitions,
-            NetworkAddress networkAddress);
+    public void registerResultPartitionLocation(JobId jobId, boolean orderedResult, ResultSetId rsId, int partition,
+            int nPartitions, NetworkAddress networkAddress);
 
     public DatasetDirectoryRecord[] getResultPartitionLocations(JobId jobId, ResultSetId rsId,
             DatasetDirectoryRecord[] knownLocations) throws HyracksDataException;
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetPartitionManager.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetPartitionManager.java
index 6c1ff10..db952a0 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetPartitionManager.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetPartitionManager.java
@@ -21,8 +21,8 @@
 import edu.uci.ics.hyracks.api.job.JobId;
 
 public interface IDatasetPartitionManager {
-    public IFrameWriter createDatasetPartitionWriter(IHyracksTaskContext ctx, ResultSetId rsId, int partition,
-            int nPartitions) throws HyracksException;
+    public IFrameWriter createDatasetPartitionWriter(IHyracksTaskContext ctx, boolean orderedResult, ResultSetId rsId,
+            int partition, int nPartitions) throws HyracksException;
 
     public void initializeDatasetPartitionReader(JobId jobId, int partition, IFrameWriter noc) throws HyracksException;
 
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
index 95eecab..8097ad4 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
@@ -452,8 +452,8 @@
                 case REGISTER_RESULT_PARTITION_LOCATION: {
                     CCNCFunctions.RegisterResultPartitionLocationFunction rrplf = (CCNCFunctions.RegisterResultPartitionLocationFunction) fn;
                     workQueue.schedule(new RegisterResultPartitionLocationWork(ClusterControllerService.this, rrplf
-                            .getJobId(), rrplf.getResultSetId(), rrplf.getPartition(), rrplf.getNPartitions(), rrplf
-                            .getNetworkAddress()));
+                            .getJobId(), rrplf.getOrderedResult(), rrplf.getResultSetId(), rrplf.getPartition(), rrplf
+                            .getNPartitions(), rrplf.getNetworkAddress()));
                     return;
                 }
 
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/DatasetDirectoryService.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/DatasetDirectoryService.java
index eaa3bab..fab870e 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/DatasetDirectoryService.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/DatasetDirectoryService.java
@@ -40,12 +40,12 @@
     }
 
     @Override
-    public synchronized void registerResultPartitionLocation(JobId jobId, ResultSetId rsId, int partition,
-            int nPartitions, NetworkAddress networkAddress) {
-        Map<ResultSetId, DatasetDirectoryRecord[]> resultSetsMap = jobResultLocationsMap.get(jobId);
-        if (resultSetsMap == null) {
-            resultSetsMap = new HashMap<ResultSetId, DatasetDirectoryRecord[]>();
-            jobResultLocationsMap.put(jobId, resultSetsMap);
+    public synchronized void registerResultPartitionLocation(JobId jobId, boolean orderedResult, ResultSetId rsId,
+            int partition, int nPartitions, NetworkAddress networkAddress) {
+        Map<ResultSetId, Pair<Boolean, DatasetDirectoryRecord[]>> rsMap = jobResultLocationsMap.get(jobId);
+        if (rsMap == null) {
+            rsMap = new HashMap<ResultSetId, Pair<Boolean, DatasetDirectoryRecord[]>>();
+            jobResultLocationsMap.put(jobId, rsMap);
         }
 
         DatasetDirectoryRecord[] records = resultSetsMap.get(rsId);
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/RegisterResultPartitionLocationWork.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/RegisterResultPartitionLocationWork.java
index 3c762eb..4c36f27 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/RegisterResultPartitionLocationWork.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/RegisterResultPartitionLocationWork.java
@@ -25,6 +25,8 @@
 
     private final JobId jobId;
 
+    private final boolean orderedResult;
+
     private final ResultSetId rsId;
 
     private final int partition;
@@ -33,10 +35,11 @@
 
     private final NetworkAddress networkAddress;
 
-    public RegisterResultPartitionLocationWork(ClusterControllerService ccs, JobId jobId, ResultSetId rsId,
-            int partition, int nPartitions, NetworkAddress networkAddress) {
+    public RegisterResultPartitionLocationWork(ClusterControllerService ccs, JobId jobId, boolean orderedResult,
+            ResultSetId rsId, int partition, int nPartitions, NetworkAddress networkAddress) {
         this.ccs = ccs;
         this.jobId = jobId;
+        this.orderedResult = orderedResult;
         this.rsId = rsId;
         this.partition = partition;
         this.nPartitions = nPartitions;
@@ -45,13 +48,13 @@
 
     @Override
     public void run() {
-        ccs.getDatasetDirectoryService().registerResultPartitionLocation(jobId, rsId, partition, nPartitions,
-                networkAddress);
+        ccs.getDatasetDirectoryService().registerResultPartitionLocation(jobId, orderedResult, rsId, partition,
+                nPartitions, networkAddress);
     }
 
     @Override
     public String toString() {
         return "JobId@" + jobId + " Partition@" + partition + " ResultSetId@" + rsId + " NPartitions@" + nPartitions
-                + " ResultPartitionLocation@" + networkAddress;
+                + " ResultPartitionLocation@" + networkAddress + " OrderedResult@" + orderedResult;
     }
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/IClusterController.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/IClusterController.java
index 086fc4d..a79110d 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/IClusterController.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/IClusterController.java
@@ -48,8 +48,8 @@
 
     public void registerPartitionRequest(PartitionRequest partitionRequest) throws Exception;
 
-    public void registerResultPartitionLocation(JobId jobId, ResultSetId rsId, int partition, int nPartitions,
-            NetworkAddress addr) throws Exception;
+    public void registerResultPartitionLocation(JobId jobId, boolean orderedResult, ResultSetId rsId, int partition,
+            int nPartitions, NetworkAddress addr) throws Exception;
 
     public void notifyApplicationStateChange(String nodeId, String appName, ApplicationStatus status) throws Exception;
 
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/CCNCFunctions.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/CCNCFunctions.java
index e3591ce..2d48155 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/CCNCFunctions.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/CCNCFunctions.java
@@ -445,6 +445,8 @@
 
         private final JobId jobId;
 
+        private final boolean orderedResult;
+
         private final ResultSetId rsId;
 
         private final int partition;
@@ -453,9 +455,10 @@
 
         private NetworkAddress networkAddress;
 
-        public RegisterResultPartitionLocationFunction(JobId jobId, ResultSetId rsId, int partition, int nPartitions,
-                NetworkAddress networkAddress) {
+        public RegisterResultPartitionLocationFunction(JobId jobId, boolean orderedResult, ResultSetId rsId,
+                int partition, int nPartitions, NetworkAddress networkAddress) {
             this.jobId = jobId;
+            this.orderedResult = orderedResult;
             this.rsId = rsId;
             this.partition = partition;
             this.nPartitions = nPartitions;
@@ -471,6 +474,10 @@
             return jobId;
         }
 
+        public boolean getOrderedResult() {
+            return orderedResult;
+        }
+
         public ResultSetId getResultSetId() {
             return rsId;
         }
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
index 595efd1..ad3a82e 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
@@ -97,10 +97,10 @@
     }
 
     @Override
-    public void registerResultPartitionLocation(JobId jobId, ResultSetId rsId, int partition, int nPartitions,
-            NetworkAddress networkAddress) throws Exception {
+    public void registerResultPartitionLocation(JobId jobId, boolean orderedResult, ResultSetId rsId, int partition,
+            int nPartitions, NetworkAddress networkAddress) throws Exception {
         CCNCFunctions.RegisterResultPartitionLocationFunction fn = new CCNCFunctions.RegisterResultPartitionLocationFunction(
-                jobId, rsId, partition, nPartitions, networkAddress);
+                jobId, orderedResult, rsId, partition, nPartitions, networkAddress);
         ipcHandle.send(-1, fn, null);
     }
 
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java
index 7d9b10e..b9a8b69 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java
@@ -50,13 +50,13 @@
     }
 
     @Override
-    public IFrameWriter createDatasetPartitionWriter(IHyracksTaskContext ctx, ResultSetId rsId, int partition,
-            int nPartitions) throws HyracksException {
+    public IFrameWriter createDatasetPartitionWriter(IHyracksTaskContext ctx, boolean orderedResult, ResultSetId rsId,
+            int partition, int nPartitions) throws HyracksException {
         DatasetPartitionWriter dpw = null;
         JobId jobId = ctx.getJobletContext().getJobId();
         try {
-            ncs.getClusterController().registerResultPartitionLocation(jobId, rsId, partition, nPartitions,
-                    ncs.getDatasetNetworkManager().getNetworkAddress());
+            ncs.getClusterController().registerResultPartitionLocation(jobId, orderedResult, rsId, partition,
+                    nPartitions, ncs.getDatasetNetworkManager().getNetworkAddress());
             dpw = new DatasetPartitionWriter(ctx, this, partition, executor);
 
             DatasetPartitionWriter[] writers = partitionDatasetWriterMap.get(jobId);
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java
index dde8669..8c44494 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java
@@ -33,9 +33,12 @@
 
     private final ResultSetId rsId;
 
-    public ResultWriterOperatorDescriptor(IOperatorDescriptorRegistry spec, ResultSetId rsId) {
+    private final boolean ordered;
+
+    public ResultWriterOperatorDescriptor(IOperatorDescriptorRegistry spec, ResultSetId rsId, boolean ordered) {
         super(spec, 1, 0);
         this.rsId = rsId;
+        this.ordered = ordered;
     }
 
     @Override
@@ -49,7 +52,8 @@
             @Override
             public void open() throws HyracksDataException {
                 try {
-                    datasetPartitionWriter = dpm.createDatasetPartitionWriter(ctx, rsId, partition, nPartitions);
+                    datasetPartitionWriter = dpm.createDatasetPartitionWriter(ctx, ordered, rsId, partition,
+                            nPartitions);
                     datasetPartitionWriter.open();
                 } catch (HyracksException e) {
                     throw new HyracksDataException(e);