Fix merge and build issues.

Promote ResultSetMetaData to a top-level class, add DatasetJobRecord to track per-job result status in DatasetDirectoryService, add reportJobFailure() to IDatasetDirectoryService, remove the getResultHandleRuntime() stub from HiveMetaDataProvider, drop the -result-manager-memory option from NCConfig, and bump hyracks-dist to the 0.2.3-SNAPSHOT parent.
git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_hyracks_result_distribution@3172 123451ca-8445-de46-9d55-352943316053
diff --git a/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/jobgen/HiveMetaDataProvider.java b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/jobgen/HiveMetaDataProvider.java
index ce8cc77..282bfb8 100644
--- a/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/jobgen/HiveMetaDataProvider.java
+++ b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/jobgen/HiveMetaDataProvider.java
@@ -102,13 +102,6 @@
}
@Override
- public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getResultHandleRuntime(IDataSink sink,
- int[] printColumns, IPrinterFactory[] printerFactories, RecordDescriptor inputDesc, boolean ordered,
- JobSpecification spec) throws AlgebricksException {
- return null;
- }
-
- @Override
public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getWriteResultRuntime(IDataSource<S> arg0,
IOperatorSchema arg1, List<LogicalVariable> arg2, LogicalVariable arg3, JobGenContext arg4,
JobSpecification arg5) throws AlgebricksException {
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/DatasetJobRecord.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/DatasetJobRecord.java
new file mode 100644
index 0000000..dc99ef3
--- /dev/null
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/DatasetJobRecord.java
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.dataset;
+
+import java.util.HashMap;
+
+public class DatasetJobRecord extends HashMap<ResultSetId, ResultSetMetaData> {
+ public enum Status {
+ RUNNING,
+ SUCCESS,
+ FAILED
+ }
+
+ private static final long serialVersionUID = 1L;
+
+ private Status status;
+
+ public DatasetJobRecord() {
+ this.status = Status.RUNNING;
+ }
+
+ public void start() {
+ status = Status.RUNNING;
+ }
+
+ public void success() {
+ status = Status.SUCCESS;
+ }
+
+ public void fail() {
+ status = Status.FAILED;
+ }
+
+ public Status getStatus() {
+ return status;
+ }
+}
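
Note: the following is a standalone sketch of the status model this patch introduces, not the hyracks classes themselves; the names JobRecord, ResultSet, and "rs-0" are made up for illustration. It mirrors the logic added here: a DatasetJobRecord starts RUNNING, flips to SUCCESS only once every result partition of a result set has reported completion, and any failure marks it FAILED.

// Standalone sketch (assumed names): job-level status driven by per-partition completion.
import java.util.HashMap;
import java.util.Map;

public class JobStatusSketch {
    enum Status { RUNNING, SUCCESS, FAILED }

    // One flag per result partition: true once that partition has written EOS.
    static class ResultSet {
        final boolean[] partitionDone;
        ResultSet(int nPartitions) { partitionDone = new boolean[nPartitions]; }
    }

    static class JobRecord {
        final Map<String, ResultSet> resultSets = new HashMap<>();
        Status status = Status.RUNNING;          // starts RUNNING, as in DatasetJobRecord()

        void partitionCompleted(String rsId, int partition) {
            ResultSet rs = resultSets.get(rsId);
            rs.partitionDone[partition] = true;
            // The job becomes SUCCESS only when every partition has completed,
            // mirroring reportResultPartitionWriteCompletion() in this patch.
            boolean allDone = true;
            for (boolean done : rs.partitionDone) {
                allDone &= done;
            }
            if (allDone) {
                status = Status.SUCCESS;
            }
        }

        void fail() { status = Status.FAILED; }  // any partition or job failure wins
    }

    public static void main(String[] args) {
        JobRecord job = new JobRecord();
        job.resultSets.put("rs-0", new ResultSet(2));
        job.partitionCompleted("rs-0", 0);
        System.out.println(job.status);          // RUNNING: one partition still pending
        job.partitionCompleted("rs-0", 1);
        System.out.println(job.status);          // SUCCESS: all partitions completed
    }
}
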
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetDirectoryService.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetDirectoryService.java
index 5266333..371169f 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetDirectoryService.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetDirectoryService.java
@@ -15,7 +15,7 @@
package edu.uci.ics.hyracks.api.dataset;
import edu.uci.ics.hyracks.api.comm.NetworkAddress;
-import edu.uci.ics.hyracks.api.dataset.DatasetDirectoryRecord.Status;
+import edu.uci.ics.hyracks.api.dataset.DatasetJobRecord.Status;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.job.JobId;
@@ -27,6 +27,8 @@
public void reportResultPartitionFailure(JobId jobId, ResultSetId rsId, int partition);
+ public void reportJobFailure(JobId jobId);
+
public Status getResultStatus(JobId jobId, ResultSetId rsId) throws HyracksDataException;
public DatasetDirectoryRecord[] getResultPartitionLocations(JobId jobId, ResultSetId rsId,
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/ResultSetMetaData.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/ResultSetMetaData.java
new file mode 100644
index 0000000..e3ad69a3
--- /dev/null
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/ResultSetMetaData.java
@@ -0,0 +1,34 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.dataset;
+
+public class ResultSetMetaData {
+ private final boolean ordered;
+
+ private final DatasetDirectoryRecord[] records;
+
+ public ResultSetMetaData(boolean ordered, DatasetDirectoryRecord[] records) {
+ this.ordered = ordered;
+ this.records = records;
+ }
+
+ public boolean getOrderedResult() {
+ return ordered;
+ }
+
+ public DatasetDirectoryRecord[] getRecords() {
+ return records;
+ }
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/DatasetDirectoryService.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/DatasetDirectoryService.java
index 13d0c30..13ae426 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/DatasetDirectoryService.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/DatasetDirectoryService.java
@@ -14,16 +14,17 @@
*/
package edu.uci.ics.hyracks.control.cc.dataset;
-import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import edu.uci.ics.hyracks.api.comm.NetworkAddress;
import edu.uci.ics.hyracks.api.dataset.DatasetDirectoryRecord;
-import edu.uci.ics.hyracks.api.dataset.DatasetDirectoryRecord.Status;
+import edu.uci.ics.hyracks.api.dataset.DatasetJobRecord;
+import edu.uci.ics.hyracks.api.dataset.DatasetJobRecord.Status;
import edu.uci.ics.hyracks.api.dataset.IDatasetDirectoryService;
import edu.uci.ics.hyracks.api.dataset.ResultSetId;
+import edu.uci.ics.hyracks.api.dataset.ResultSetMetaData;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.job.JobId;
@@ -35,25 +36,21 @@
* job.
*/
public class DatasetDirectoryService implements IDatasetDirectoryService {
- private final Map<JobId, Map<ResultSetId, ResultSetMetaData>> jobResultLocationsMap;
+ private final Map<JobId, DatasetJobRecord> jobResultLocations;
public DatasetDirectoryService() {
- jobResultLocationsMap = new HashMap<JobId, Map<ResultSetId, ResultSetMetaData>>();
+ jobResultLocations = new HashMap<JobId, DatasetJobRecord>();
}
@Override
public synchronized void registerResultPartitionLocation(JobId jobId, ResultSetId rsId, boolean orderedResult,
int partition, int nPartitions, NetworkAddress networkAddress) {
- Map<ResultSetId, ResultSetMetaData> rsMap = jobResultLocationsMap.get(jobId);
- if (rsMap == null) {
- rsMap = new HashMap<ResultSetId, ResultSetMetaData>();
- jobResultLocationsMap.put(jobId, rsMap);
- }
+ DatasetJobRecord djr = getDatasetJobRecord(jobId);
- ResultSetMetaData resultSetMetaData = rsMap.get(rsId);
+ ResultSetMetaData resultSetMetaData = djr.get(rsId);
if (resultSetMetaData == null) {
resultSetMetaData = new ResultSetMetaData(orderedResult, new DatasetDirectoryRecord[nPartitions]);
- rsMap.put(rsId, resultSetMetaData);
+ djr.put(rsId, resultSetMetaData);
}
DatasetDirectoryRecord[] records = resultSetMetaData.getRecords();
@@ -67,20 +64,42 @@
@Override
public synchronized void reportResultPartitionWriteCompletion(JobId jobId, ResultSetId rsId, int partition) {
- DatasetDirectoryRecord ddr = getDatasetDirectoryRecord(jobId, rsId, partition);
- ddr.writeEOS();
+ int successCount = 0;
+
+ DatasetJobRecord djr = jobResultLocations.get(jobId);
+ ResultSetMetaData resultSetMetaData = djr.get(rsId);
+ DatasetDirectoryRecord[] records = resultSetMetaData.getRecords();
+ records[partition].writeEOS();
+
+ for (DatasetDirectoryRecord record : records) {
+ if (record.getStatus() == DatasetDirectoryRecord.Status.SUCCESS) {
+ successCount++;
+ }
+ }
+ if (successCount == records.length) {
+ djr.success();
+ }
+ notifyAll();
}
@Override
public synchronized void reportResultPartitionFailure(JobId jobId, ResultSetId rsId, int partition) {
- DatasetDirectoryRecord ddr = getDatasetDirectoryRecord(jobId, rsId, partition);
- ddr.fail();
+ DatasetJobRecord djr = getDatasetJobRecord(jobId);
+ djr.fail();
+ notifyAll();
+ }
+
+ @Override
+ public synchronized void reportJobFailure(JobId jobId) {
+ DatasetJobRecord djr = getDatasetJobRecord(jobId);
+ djr.fail();
+ notifyAll();
}
@Override
public synchronized Status getResultStatus(JobId jobId, ResultSetId rsId) throws HyracksDataException {
- Map<ResultSetId, ResultSetMetaData> rsMap;
- while ((rsMap = jobResultLocationsMap.get(jobId)) == null) {
+ DatasetJobRecord djr;
+ while ((djr = jobResultLocations.get(jobId)) == null) {
try {
wait();
} catch (InterruptedException e) {
@@ -88,38 +107,7 @@
}
}
- ResultSetMetaData resultSetMetaData = rsMap.get(rsId);
- if (resultSetMetaData == null || resultSetMetaData.getRecords() == null) {
- throw new HyracksDataException("ResultSet locations uninitialized when it is expected to be initialized.");
- }
- DatasetDirectoryRecord[] records = resultSetMetaData.getRecords();
-
- ArrayList<Status> statuses = new ArrayList<Status>(records.length);
- for (int i = 0; i < records.length; i++) {
- statuses.add(records[i].getStatus());
- }
-
- // Default status is idle
- Status status = Status.IDLE;
- if (statuses.contains(Status.FAILED)) {
- // Even if there is at least one failed entry we should return failed status.
- return Status.FAILED;
- } else if (statuses.contains(Status.RUNNING)) {
- // If there are not failed entry and if there is at least one running entry we should return running status.
- return Status.RUNNING;
- } else {
- // If each and every partition has reported success do we report success as the status.
- int successCount = 0;
- for (int i = 0; i < statuses.size(); i++) {
- if (statuses.get(i) == Status.SUCCESS) {
- successCount++;
- }
- }
- if (successCount == statuses.size()) {
- return Status.SUCCESS;
- }
- }
- return status;
+ return djr.getStatus();
}
@Override
@@ -136,13 +124,6 @@
return newRecords;
}
- public DatasetDirectoryRecord getDatasetDirectoryRecord(JobId jobId, ResultSetId rsId, int partition) {
- Map<ResultSetId, ResultSetMetaData> rsMap = jobResultLocationsMap.get(jobId);
- ResultSetMetaData resultSetMetaData = rsMap.get(rsId);
- DatasetDirectoryRecord[] records = resultSetMetaData.getRecords();
- return records[partition];
- }
-
/**
* Compares the records already known by the client for the given job's result set id with the records that the
* dataset directory service knows and if there are any newly discovered records returns a whole array with the
@@ -177,14 +158,15 @@
*/
private DatasetDirectoryRecord[] updatedRecords(JobId jobId, ResultSetId rsId, DatasetDirectoryRecord[] knownRecords)
throws HyracksDataException {
- Map<ResultSetId, ResultSetMetaData> rsMap = jobResultLocationsMap.get(jobId);
- if (rsMap == null) {
- return null;
+ DatasetJobRecord djr = getDatasetJobRecord(jobId);
+
+ if (djr.getStatus() == Status.FAILED) {
+ throw new HyracksDataException("Job failed.");
}
- ResultSetMetaData resultSetMetaData = rsMap.get(rsId);
+ ResultSetMetaData resultSetMetaData = djr.get(rsId);
if (resultSetMetaData == null || resultSetMetaData.getRecords() == null) {
- throw new HyracksDataException("ResultSet locations uninitialized when it is expected to be initialized.");
+ return null;
}
boolean ordered = resultSetMetaData.getOrderedResult();
@@ -220,22 +202,12 @@
return null;
}
- private class ResultSetMetaData {
- private final boolean ordered;
-
- private final DatasetDirectoryRecord[] records;
-
- public ResultSetMetaData(boolean ordered, DatasetDirectoryRecord[] records) {
- this.ordered = ordered;
- this.records = records;
+ private DatasetJobRecord getDatasetJobRecord(JobId jobId) {
+ DatasetJobRecord djr = jobResultLocations.get(jobId);
+ if (djr == null) {
+ djr = new DatasetJobRecord();
+ jobResultLocations.put(jobId, djr);
}
-
- public boolean getOrderedResult() {
- return ordered;
- }
-
- public DatasetDirectoryRecord[] getRecords() {
- return records;
- }
+ return djr;
}
}
\ No newline at end of file
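
Note: a standalone sketch (assumed names; not the hyracks classes) of the monitor pattern DatasetDirectoryService relies on after this patch: callers of getResultStatus()/updatedRecords() block in wait() until a DatasetJobRecord exists, and the registration, completion, and failure paths call notifyAll() to wake them.

// Standalone sketch (assumed names): synchronized wait()/notifyAll() between a
// status reporter and a blocked reader, as used by the directory service.
import java.util.HashMap;
import java.util.Map;

public class DirectoryMonitorSketch {
    private final Map<Long, String> jobStatus = new HashMap<>();

    public synchronized void report(long jobId, String status) {
        jobStatus.put(jobId, status);
        notifyAll();                      // wake any thread blocked in awaitStatus()
    }

    public synchronized String awaitStatus(long jobId) throws InterruptedException {
        String status;
        while ((status = jobStatus.get(jobId)) == null) {
            wait();                       // releases the monitor until report() notifies
        }
        return status;
    }

    public static void main(String[] args) throws Exception {
        DirectoryMonitorSketch dir = new DirectoryMonitorSketch();
        Thread reader = new Thread(() -> {
            try {
                System.out.println("status = " + dir.awaitStatus(42L));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        reader.start();
        Thread.sleep(100);                // reader is now blocked in wait()
        dir.report(42L, "SUCCESS");       // unblocks the reader
        reader.join();
    }
}
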
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/NCConfig.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/NCConfig.java
index 614ce2d..83c0f98 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/NCConfig.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/NCConfig.java
@@ -61,9 +61,6 @@
@Option(name = "--", handler = StopOptionHandler.class)
public List<String> appArgs;
- @Option(name = "-result-manager-memory", usage = "Memory usable for result caching at this Node Controller in bytes (default: -1 auto)")
- public int resultManagerMemory = -1;
-
public void toCommandLine(List<String> cList) {
cList.add("-cc-host");
cList.add(ccHost);
@@ -94,7 +91,5 @@
cList.add(appArg);
}
}
- cList.add("-result-manager-memory");
- cList.add(String.valueOf(resultManagerMemory));
- }
+ }
}
diff --git a/hyracks/hyracks-dist/pom.xml b/hyracks/hyracks-dist/pom.xml
index 4f9fc20..c86c4e6 100755
--- a/hyracks/hyracks-dist/pom.xml
+++ b/hyracks/hyracks-dist/pom.xml
@@ -6,7 +6,7 @@
<parent>
<artifactId>hyracks</artifactId>
<groupId>edu.uci.ics.hyracks</groupId>
- <version>0.2.2-SNAPSHOT</version>
+ <version>0.2.3-SNAPSHOT</version>
</parent>
<artifactId>hyracks-dist</artifactId>