Merge fullstack_hyracks_result_distribution to fullstack_asterix_stabilization.

git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_asterix_stabilization@3189 123451ca-8445-de46-9d55-352943316053
diff --git a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/ExtractCommonOperatorsRule.java b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/ExtractCommonOperatorsRule.java
index 7e27b79..0d20f28 100644
--- a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/ExtractCommonOperatorsRule.java
+++ b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/ExtractCommonOperatorsRule.java
@@ -54,7 +54,8 @@
     @Override
     public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context) throws AlgebricksException {
         AbstractLogicalOperator op = (AbstractLogicalOperator) opRef.getValue();
-        if (op.getOperatorTag() != LogicalOperatorTag.WRITE && op.getOperatorTag() != LogicalOperatorTag.WRITE_RESULT) {
+        if (op.getOperatorTag() != LogicalOperatorTag.WRITE && op.getOperatorTag() != LogicalOperatorTag.WRITE_RESULT
+                && op.getOperatorTag() != LogicalOperatorTag.DISTRIBUTE_RESULT) {
             return false;
         }
         if (!roots.contains(op))
@@ -66,7 +67,8 @@
     public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
             throws AlgebricksException {
         AbstractLogicalOperator op = (AbstractLogicalOperator) opRef.getValue();
-        if (op.getOperatorTag() != LogicalOperatorTag.WRITE && op.getOperatorTag() != LogicalOperatorTag.WRITE_RESULT) {
+        if (op.getOperatorTag() != LogicalOperatorTag.WRITE && op.getOperatorTag() != LogicalOperatorTag.WRITE_RESULT
+                && op.getOperatorTag() != LogicalOperatorTag.DISTRIBUTE_RESULT) {
             return false;
         }
         boolean rewritten = false;
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/DatasetJobRecord.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/DatasetJobRecord.java
new file mode 100644
index 0000000..dc99ef3
--- /dev/null
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/DatasetJobRecord.java
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.dataset;
+
+import java.util.HashMap;
+/** Map from result set id to result set metadata for one job, plus a job-level status flag. */
+public class DatasetJobRecord extends HashMap<ResultSetId, ResultSetMetaData> {
+    public enum Status {
+        RUNNING,
+        SUCCESS,
+        FAILED
+    }
+
+    private static final long serialVersionUID = 1L;
+    /** Current status; changed only through start(), success() and fail(). */
+    private Status status;
+    /** Creates an empty record whose status is RUNNING. */
+    public DatasetJobRecord() {
+        this.status = Status.RUNNING;
+    }
+    /** Sets the status back to RUNNING. */
+    public void start() {
+        status = Status.RUNNING;
+    }
+    /** Sets the status to SUCCESS. */
+    public void success() {
+        status = Status.SUCCESS;
+    }
+    /** Sets the status to FAILED. */
+    public void fail() {
+        status = Status.FAILED;
+    }
+    /** Returns the current status. */
+    public Status getStatus() {
+        return status;
+    }
+}
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetDirectoryService.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetDirectoryService.java
index 5266333..371169f 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetDirectoryService.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetDirectoryService.java
@@ -15,7 +15,7 @@
 package edu.uci.ics.hyracks.api.dataset;
 
 import edu.uci.ics.hyracks.api.comm.NetworkAddress;
-import edu.uci.ics.hyracks.api.dataset.DatasetDirectoryRecord.Status;
+import edu.uci.ics.hyracks.api.dataset.DatasetJobRecord.Status;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.job.JobId;
 
@@ -27,6 +27,8 @@
 
     public void reportResultPartitionFailure(JobId jobId, ResultSetId rsId, int partition);
 
+    public void reportJobFailure(JobId jobId);
+
     public Status getResultStatus(JobId jobId, ResultSetId rsId) throws HyracksDataException;
 
     public DatasetDirectoryRecord[] getResultPartitionLocations(JobId jobId, ResultSetId rsId,
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/ResultSetMetaData.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/ResultSetMetaData.java
new file mode 100644
index 0000000..e3ad69a3
--- /dev/null
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/ResultSetMetaData.java
@@ -0,0 +1,34 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.dataset;
+/** Holds the "ordered" flag and the directory-record array of one result set. */
+public class ResultSetMetaData {
+    private final boolean ordered;
+    /** Directory records; DatasetDirectoryService indexes this array by partition number. */
+    private final DatasetDirectoryRecord[] records;
+    /** Stores both arguments as given; the records array is kept by reference, not copied. */
+    public ResultSetMetaData(boolean ordered, DatasetDirectoryRecord[] records) {
+        this.ordered = ordered;
+        this.records = records;
+    }
+    /** Returns the ordered flag passed to the constructor. */
+    public boolean getOrderedResult() {
+        return ordered;
+    }
+    /** Returns the internal array itself (no copy), so changes made by the caller are visible here. */
+    public DatasetDirectoryRecord[] getRecords() {
+        return records;
+    }
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
index a8055c9..05365d3 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
@@ -38,7 +38,7 @@
 import edu.uci.ics.hyracks.api.comm.NetworkAddress;
 import edu.uci.ics.hyracks.api.context.ICCContext;
 import edu.uci.ics.hyracks.api.dataset.DatasetDirectoryRecord;
-import edu.uci.ics.hyracks.api.dataset.DatasetDirectoryRecord.Status;
+import edu.uci.ics.hyracks.api.dataset.DatasetJobRecord.Status;
 import edu.uci.ics.hyracks.api.dataset.IDatasetDirectoryService;
 import edu.uci.ics.hyracks.api.job.JobId;
 import edu.uci.ics.hyracks.api.job.JobStatus;
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/DatasetDirectoryService.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/DatasetDirectoryService.java
index 13d0c30..13ae426 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/DatasetDirectoryService.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/DatasetDirectoryService.java
@@ -14,16 +14,17 @@
  */
 package edu.uci.ics.hyracks.control.cc.dataset;
 
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
 
 import edu.uci.ics.hyracks.api.comm.NetworkAddress;
 import edu.uci.ics.hyracks.api.dataset.DatasetDirectoryRecord;
-import edu.uci.ics.hyracks.api.dataset.DatasetDirectoryRecord.Status;
+import edu.uci.ics.hyracks.api.dataset.DatasetJobRecord;
+import edu.uci.ics.hyracks.api.dataset.DatasetJobRecord.Status;
 import edu.uci.ics.hyracks.api.dataset.IDatasetDirectoryService;
 import edu.uci.ics.hyracks.api.dataset.ResultSetId;
+import edu.uci.ics.hyracks.api.dataset.ResultSetMetaData;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.job.JobId;
 
@@ -35,25 +36,21 @@
  * job.
  */
 public class DatasetDirectoryService implements IDatasetDirectoryService {
-    private final Map<JobId, Map<ResultSetId, ResultSetMetaData>> jobResultLocationsMap;
+    private final Map<JobId, DatasetJobRecord> jobResultLocations;
 
     public DatasetDirectoryService() {
-        jobResultLocationsMap = new HashMap<JobId, Map<ResultSetId, ResultSetMetaData>>();
+        jobResultLocations = new HashMap<JobId, DatasetJobRecord>();
     }
 
     @Override
     public synchronized void registerResultPartitionLocation(JobId jobId, ResultSetId rsId, boolean orderedResult,
             int partition, int nPartitions, NetworkAddress networkAddress) {
-        Map<ResultSetId, ResultSetMetaData> rsMap = jobResultLocationsMap.get(jobId);
-        if (rsMap == null) {
-            rsMap = new HashMap<ResultSetId, ResultSetMetaData>();
-            jobResultLocationsMap.put(jobId, rsMap);
-        }
+        DatasetJobRecord djr = getDatasetJobRecord(jobId);
 
-        ResultSetMetaData resultSetMetaData = rsMap.get(rsId);
+        ResultSetMetaData resultSetMetaData = djr.get(rsId);
         if (resultSetMetaData == null) {
             resultSetMetaData = new ResultSetMetaData(orderedResult, new DatasetDirectoryRecord[nPartitions]);
-            rsMap.put(rsId, resultSetMetaData);
+            djr.put(rsId, resultSetMetaData);
         }
 
         DatasetDirectoryRecord[] records = resultSetMetaData.getRecords();
@@ -67,20 +64,42 @@
 
     @Override
     public synchronized void reportResultPartitionWriteCompletion(JobId jobId, ResultSetId rsId, int partition) {
-        DatasetDirectoryRecord ddr = getDatasetDirectoryRecord(jobId, rsId, partition);
-        ddr.writeEOS();
+        int successCount = 0;
+        // Mark this partition as completely written; it is expected to have been registered already.
+        DatasetJobRecord djr = jobResultLocations.get(jobId);
+        ResultSetMetaData resultSetMetaData = djr.get(rsId);
+        DatasetDirectoryRecord[] records = resultSetMetaData.getRecords();
+        records[partition].writeEOS();
+        // Slots of partitions that have not registered yet are still null: count them as unfinished, not NPE.
+        for (DatasetDirectoryRecord record : records) {
+            if (record != null && record.getStatus() == DatasetDirectoryRecord.Status.SUCCESS) {
+                successCount++;
+            }
+        }
+        if (successCount == records.length) {
+            djr.success();
+        }
+        notifyAll();
     }
 
     @Override
     public synchronized void reportResultPartitionFailure(JobId jobId, ResultSetId rsId, int partition) {
-        DatasetDirectoryRecord ddr = getDatasetDirectoryRecord(jobId, rsId, partition);
-        ddr.fail();
+        DatasetJobRecord djr = getDatasetJobRecord(jobId); // creates the job record if none exists yet
+        djr.fail(); // the whole job is flagged FAILED; rsId and partition are not consulted
+        notifyAll(); // wake any thread blocked in wait() on this service
+    }
+    /** Flags the job's record as FAILED (creating the record if needed) and wakes all waiting threads. */
+    @Override
+    public synchronized void reportJobFailure(JobId jobId) {
+        DatasetJobRecord djr = getDatasetJobRecord(jobId);
+        djr.fail();
+        notifyAll();
     }
 
     @Override
     public synchronized Status getResultStatus(JobId jobId, ResultSetId rsId) throws HyracksDataException {
-        Map<ResultSetId, ResultSetMetaData> rsMap;
-        while ((rsMap = jobResultLocationsMap.get(jobId)) == null) {
+        DatasetJobRecord djr;
+        while ((djr = jobResultLocations.get(jobId)) == null) {
             try {
                 wait();
             } catch (InterruptedException e) {
@@ -88,38 +107,7 @@
             }
         }
 
-        ResultSetMetaData resultSetMetaData = rsMap.get(rsId);
-        if (resultSetMetaData == null || resultSetMetaData.getRecords() == null) {
-            throw new HyracksDataException("ResultSet locations uninitialized when it is expected to be initialized.");
-        }
-        DatasetDirectoryRecord[] records = resultSetMetaData.getRecords();
-
-        ArrayList<Status> statuses = new ArrayList<Status>(records.length);
-        for (int i = 0; i < records.length; i++) {
-            statuses.add(records[i].getStatus());
-        }
-
-        // Default status is idle
-        Status status = Status.IDLE;
-        if (statuses.contains(Status.FAILED)) {
-            // Even if there is at least one failed entry we should return failed status.
-            return Status.FAILED;
-        } else if (statuses.contains(Status.RUNNING)) {
-            // If there are not failed entry and if there is at least one running entry we should return running status.
-            return Status.RUNNING;
-        } else {
-            // If each and every partition has reported success do we report success as the status.
-            int successCount = 0;
-            for (int i = 0; i < statuses.size(); i++) {
-                if (statuses.get(i) == Status.SUCCESS) {
-                    successCount++;
-                }
-            }
-            if (successCount == statuses.size()) {
-                return Status.SUCCESS;
-            }
-        }
-        return status;
+        return djr.getStatus();
     }
 
     @Override
@@ -136,13 +124,6 @@
         return newRecords;
     }
 
-    public DatasetDirectoryRecord getDatasetDirectoryRecord(JobId jobId, ResultSetId rsId, int partition) {
-        Map<ResultSetId, ResultSetMetaData> rsMap = jobResultLocationsMap.get(jobId);
-        ResultSetMetaData resultSetMetaData = rsMap.get(rsId);
-        DatasetDirectoryRecord[] records = resultSetMetaData.getRecords();
-        return records[partition];
-    }
-
     /**
      * Compares the records already known by the client for the given job's result set id with the records that the
      * dataset directory service knows and if there are any newly discovered records returns a whole array with the
@@ -177,14 +158,15 @@
      */
     private DatasetDirectoryRecord[] updatedRecords(JobId jobId, ResultSetId rsId, DatasetDirectoryRecord[] knownRecords)
             throws HyracksDataException {
-        Map<ResultSetId, ResultSetMetaData> rsMap = jobResultLocationsMap.get(jobId);
-        if (rsMap == null) {
-            return null;
+        DatasetJobRecord djr = getDatasetJobRecord(jobId);
+
+        if (djr.getStatus() == Status.FAILED) {
+            throw new HyracksDataException("Job failed.");
         }
 
-        ResultSetMetaData resultSetMetaData = rsMap.get(rsId);
+        ResultSetMetaData resultSetMetaData = djr.get(rsId);
         if (resultSetMetaData == null || resultSetMetaData.getRecords() == null) {
-            throw new HyracksDataException("ResultSet locations uninitialized when it is expected to be initialized.");
+            return null;
         }
 
         boolean ordered = resultSetMetaData.getOrderedResult();
@@ -220,22 +202,12 @@
         return null;
     }
 
-    private class ResultSetMetaData {
-        private final boolean ordered;
-
-        private final DatasetDirectoryRecord[] records;
-
-        public ResultSetMetaData(boolean ordered, DatasetDirectoryRecord[] records) {
-            this.ordered = ordered;
-            this.records = records;
+    private DatasetJobRecord getDatasetJobRecord(JobId jobId) { // get-or-create: never returns null
+        DatasetJobRecord djr = jobResultLocations.get(jobId);
+        if (djr == null) {
+            djr = new DatasetJobRecord(); // a fresh record starts in RUNNING state
+            jobResultLocations.put(jobId, djr); // side effect: the job is now known to the directory
         }
-
-        public boolean getOrderedResult() {
-            return ordered;
-        }
-
-        public DatasetDirectoryRecord[] getRecords() {
-            return records;
-        }
+        return djr;
     }
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetResultStatusWork.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetResultStatusWork.java
index d2dadf5..1bd0674 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetResultStatusWork.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetResultStatusWork.java
@@ -14,7 +14,7 @@
  */
 package edu.uci.ics.hyracks.control.cc.work;
 
-import edu.uci.ics.hyracks.api.dataset.DatasetDirectoryRecord.Status;
+import edu.uci.ics.hyracks.api.dataset.DatasetJobRecord.Status;
 import edu.uci.ics.hyracks.api.dataset.ResultSetId;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.job.JobId;
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/TaskFailureWork.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/TaskFailureWork.java
index 0fff257..bc8c314 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/TaskFailureWork.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/TaskFailureWork.java
@@ -32,6 +32,7 @@
     @Override
     protected void performEvent(TaskAttempt ta) {
         JobRun run = ccs.getActiveRunMap().get(jobId);
+        ccs.getDatasetDirectoryService().reportJobFailure(jobId); // flag the job's result record as FAILED
         ActivityCluster ac = ta.getTask().getTaskCluster().getActivityCluster();
         run.getScheduler().notifyTaskFailure(ta, ac, details);
     }
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/NCConfig.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/NCConfig.java
index 6a9747b..d1577bc 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/NCConfig.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/NCConfig.java
@@ -3,9 +3,9 @@
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * you may obtain a copy of the License from
- * 
+ *
  *     http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -51,6 +51,9 @@
     @Option(name = "-max-memory", usage = "Maximum memory usable at this Node Controller in bytes (default: -1 auto)")
     public int maxMemory = -1;
 
+    @Option(name = "-result-manager-memory", usage = "Memory usable for result caching at this Node Controller in bytes (default: -1 auto)")
+    public int resultManagerMemory = -1;
+
     @Option(name = "-app-nc-main-class", usage = "Application NC Main Class")
     public String appNCMainClass;
 
@@ -58,9 +61,6 @@
     @Option(name = "--", handler = StopOptionHandler.class)
     public List<String> appArgs;
 
-    @Option(name = "-result-manager-memory", usage = "Memory usable for result caching at this Node Controller in bytes (default: -1 auto)")
-    public int resultManagerMemory = -1;
-
     public void toCommandLine(List<String> cList) {
         cList.add("-cc-host");
         cList.add(ccHost);
@@ -79,7 +79,10 @@
         cList.add(String.valueOf(nNetThreads));
         cList.add("-max-memory");
         cList.add(String.valueOf(maxMemory));
-        if (appNCMainClass != null) {
+        cList.add("-result-manager-memory");
+        cList.add(String.valueOf(resultManagerMemory));
+
+       if (appNCMainClass != null) {
             cList.add("-app-nc-main-class");
             cList.add(appNCMainClass);
         }
@@ -89,7 +92,5 @@
                 cList.add(appArg);
             }
         }
-        cList.add("-result-manager-memory");
-        cList.add(String.valueOf(resultManagerMemory));
-    }
-}
\ No newline at end of file
+   }
+}