merge hyracks_dev_next r847:977
git-svn-id: https://hyracks.googlecode.com/svn/branches/aggregators_dev_next@978 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-api/pom.xml b/hyracks-api/pom.xml
index f73c27a..c18995d 100644
--- a/hyracks-api/pom.xml
+++ b/hyracks-api/pom.xml
@@ -1,9 +1,6 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
- <groupId>edu.uci.ics.hyracks</groupId>
<artifactId>hyracks-api</artifactId>
- <version>0.2.0-SNAPSHOT</version>
-
<parent>
<groupId>edu.uci.ics.hyracks</groupId>
<artifactId>hyracks</artifactId>
@@ -45,5 +42,15 @@
<type>jar</type>
<scope>compile</scope>
</dependency>
+ <dependency>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-ipc</artifactId>
+ <version>0.2.0-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-lang3</artifactId>
+ <version>3.1</version>
+ </dependency>
</dependencies>
</project>
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientInterface.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientInterface.java
index 6e1eedc..866d307 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientInterface.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientInterface.java
@@ -14,7 +14,6 @@
*/
package edu.uci.ics.hyracks.api.client;
-import java.rmi.Remote;
import java.util.EnumSet;
import java.util.Map;
@@ -22,7 +21,7 @@
import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.api.job.JobStatus;
-public interface IHyracksClientInterface extends Remote {
+public interface IHyracksClientInterface {
public ClusterControllerInfo getClusterControllerInfo() throws Exception;
public void createApplication(String appName) throws Exception;
@@ -35,7 +34,7 @@
public JobStatus getJobStatus(JobId jobId) throws Exception;
- public void start(JobId jobId) throws Exception;
+ public void startJob(JobId jobId) throws Exception;
public void waitForCompletion(JobId jobId) throws Exception;
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/ActivityId.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/ActivityId.java
index c6ca51c..75cc245 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/ActivityId.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/ActivityId.java
@@ -52,6 +52,32 @@
}
public String toString() {
- return "ANID:[" + odId + "]:" + id;
+ return "ANID:" + odId + ":" + id;
+ }
+
+    /**
+     * Parses the text produced by {@link #toString()}: the "ANID:" prefix, the operator
+     * descriptor id, a ':' separator and the integer id.
+     *
+     * @param str
+     *            the text to parse
+     * @return the ActivityId described by str
+     * @throws IllegalArgumentException
+     *             if str lacks the "ANID:" prefix, has no ':' separator after the prefix, or
+     *             ends in something that is not an integer
+     */
+    public static ActivityId parse(String str) {
+        if (!str.startsWith("ANID:")) {
+            throw new IllegalArgumentException("Unable to parse: " + str);
+        }
+        String body = str.substring(5);
+        // The leading part is handed to OperatorDescriptorId.parse and may contain ':' itself,
+        // so split at the last ':'.
+        int idIdx = body.lastIndexOf(':');
+        if (idIdx < 0) {
+            // Without this check substring(0, -1) fails with StringIndexOutOfBoundsException.
+            throw new IllegalArgumentException("Unable to parse: " + str);
+        }
+        return new ActivityId(OperatorDescriptorId.parse(body.substring(0, idIdx)),
+                Integer.parseInt(body.substring(idIdx + 1)));
}
}
\ No newline at end of file
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/OperatorDescriptorId.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/OperatorDescriptorId.java
index b858736..8794e09 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/OperatorDescriptorId.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/OperatorDescriptorId.java
@@ -49,4 +49,20 @@
public String toString() {
return "ODID:" + id;
}
+
+    /**
+     * Builds an OperatorDescriptorId from its {@link #toString()} form: the "ODID:" prefix
+     * followed by the integer id.
+     *
+     * @param str
+     *            the text to parse
+     * @return the OperatorDescriptorId described by str
+     * @throws IllegalArgumentException
+     *             if str lacks the "ODID:" prefix or the remainder is not an integer
+     */
+    public static OperatorDescriptorId parse(String str) {
+        if (!str.startsWith("ODID:")) {
+            throw new IllegalArgumentException("Unable to parse: " + str);
+        }
+        return new OperatorDescriptorId(Integer.parseInt(str.substring(5)));
+    }
}
\ No newline at end of file
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/TaskAttemptId.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/TaskAttemptId.java
index d4e6972..0fb44c1 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/TaskAttemptId.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/TaskAttemptId.java
@@ -52,6 +52,32 @@
@Override
public String toString() {
- return "TAID:[" + taskId + "]:" + attempt;
+ return "TAID:" + taskId + ":" + attempt;
+ }
+
+    /**
+     * Parses the text produced by {@link #toString()}: the "TAID:" prefix, the task id, a ':'
+     * separator and the integer attempt number.
+     *
+     * @param str
+     *            the text to parse
+     * @return the TaskAttemptId described by str
+     * @throws IllegalArgumentException
+     *             if str lacks the "TAID:" prefix, has no ':' separator after the prefix, or
+     *             ends in something that is not an integer
+     */
+    public static TaskAttemptId parse(String str) {
+        if (!str.startsWith("TAID:")) {
+            throw new IllegalArgumentException("Unable to parse: " + str);
+        }
+        String body = str.substring(5);
+        // The leading part is handed to TaskId.parse and may contain ':' itself, so split at
+        // the last ':'.
+        int idIdx = body.lastIndexOf(':');
+        if (idIdx < 0) {
+            // Without this check substring(0, -1) fails with StringIndexOutOfBoundsException.
+            throw new IllegalArgumentException("Unable to parse: " + str);
+        }
+        return new TaskAttemptId(TaskId.parse(body.substring(0, idIdx)),
+                Integer.parseInt(body.substring(idIdx + 1)));
}
}
\ No newline at end of file
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/TaskId.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/TaskId.java
index ee63355..7e0b22d 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/TaskId.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/TaskId.java
@@ -52,6 +52,32 @@
@Override
public String toString() {
- return "TID:[" + activityId + "]:" + partition;
+ return "TID:" + activityId + ":" + partition;
+ }
+
+    /**
+     * Parses the text produced by {@link #toString()}: the "TID:" prefix, the activity id, a
+     * ':' separator and the integer partition number.
+     *
+     * @param str
+     *            the text to parse
+     * @return the TaskId described by str
+     * @throws IllegalArgumentException
+     *             if str lacks the "TID:" prefix, has no ':' separator after the prefix, or
+     *             ends in something that is not an integer
+     */
+    public static TaskId parse(String str) {
+        if (!str.startsWith("TID:")) {
+            throw new IllegalArgumentException("Unable to parse: " + str);
+        }
+        String body = str.substring(4);
+        // The leading part is handed to ActivityId.parse and may contain ':' itself, so split
+        // at the last ':'.
+        int idIdx = body.lastIndexOf(':');
+        if (idIdx < 0) {
+            // Without this check substring(0, -1) fails with StringIndexOutOfBoundsException.
+            throw new IllegalArgumentException("Unable to parse: " + str);
+        }
+        return new TaskId(ActivityId.parse(body.substring(0, idIdx)),
+                Integer.parseInt(body.substring(idIdx + 1)));
}
}
\ No newline at end of file
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/RecordDescriptor.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/RecordDescriptor.java
index 4922723..b04fe86 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/RecordDescriptor.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/RecordDescriptor.java
@@ -21,25 +21,29 @@
private static final long serialVersionUID = 1L;
private final ISerializerDeserializer[] fields;
- private final ITypeTrait[] typeTraits;
-
+ private final ITypeTraits[] typeTraits;
+
// leaving this constructor for backwards-compatibility
public RecordDescriptor(ISerializerDeserializer[] fields) {
this.fields = fields;
this.typeTraits = null;
}
-
+
// temporarily adding constructor to include type traits
- public RecordDescriptor(ISerializerDeserializer[] fields, ITypeTrait[] typeTraits) {
- this.fields = fields;
+ public RecordDescriptor(ISerializerDeserializer[] fields, ITypeTraits[] typeTraits) {
+ this.fields = fields;
this.typeTraits = typeTraits;
}
+ public int getFieldCount() {
+ return fields.length;
+ }
+
public ISerializerDeserializer[] getFields() {
return fields;
}
-
- public ITypeTrait[] getTypeTraits() {
- return typeTraits;
+
+ public ITypeTraits[] getTypeTraits() {
+ return typeTraits;
}
}
\ No newline at end of file
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobActivityGraph.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobActivityGraph.java
index 930d299..2b6d361 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobActivityGraph.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobActivityGraph.java
@@ -22,6 +22,7 @@
import java.util.Map;
import java.util.Set;
+import org.apache.commons.lang3.tuple.Pair;
import org.json.JSONArray;
import org.json.JSONException;
import org.json.JSONObject;
@@ -33,7 +34,6 @@
import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-import edu.uci.ics.hyracks.api.util.Pair;
public class JobActivityGraph implements Serializable {
private static final long serialVersionUID = 1L;
@@ -148,8 +148,8 @@
Pair<Pair<IOperatorDescriptor, Integer>, Pair<IOperatorDescriptor, Integer>> connEdge = jobSpec
.getConnectorOperatorMap().get(cdId);
- OperatorDescriptorId consumerOpId = connEdge.second.first.getOperatorId();
- int consumerInputIdx = connEdge.second.second;
+ OperatorDescriptorId consumerOpId = connEdge.getRight().getLeft().getOperatorId();
+ int consumerInputIdx = connEdge.getRight().getRight();
for (ActivityId anId : operatorActivityMap.get(consumerOpId)) {
List<Integer> anInputs = activityInputMap.get(anId);
@@ -168,8 +168,8 @@
Pair<Pair<IOperatorDescriptor, Integer>, Pair<IOperatorDescriptor, Integer>> connEdge = jobSpec
.getConnectorOperatorMap().get(cdId);
- OperatorDescriptorId producerOpId = connEdge.first.first.getOperatorId();
- int producerInputIdx = connEdge.first.second;
+ OperatorDescriptorId producerOpId = connEdge.getLeft().getLeft().getOperatorId();
+ int producerInputIdx = connEdge.getLeft().getRight();
for (ActivityId anId : operatorActivityMap.get(producerOpId)) {
List<Integer> anOutputs = activityOutputMap.get(anId);
@@ -209,13 +209,11 @@
public JSONObject toJSON() throws JSONException {
JSONObject jplan = new JSONObject();
- jplan.put("type", "plan");
jplan.put("flags", jobFlags.toString());
JSONArray jans = new JSONArray();
for (IActivity an : activityNodes.values()) {
JSONObject jan = new JSONObject();
- jan.put("type", "activity");
jan.put("id", an.getActivityId().toString());
jan.put("java-class", an.getClass().getName());
jan.put("operator-id", an.getActivityId().getOperatorDescriptorId().toString());
@@ -225,7 +223,6 @@
JSONArray jInputs = new JSONArray();
for (int i = 0; i < inputs.size(); ++i) {
JSONObject jInput = new JSONObject();
- jInput.put("type", "activity-input");
jInput.put("input-port", i);
jInput.put("connector-id", inputs.get(i).getConnectorId().toString());
jInputs.put(jInput);
@@ -238,7 +235,6 @@
JSONArray jOutputs = new JSONArray();
for (int i = 0; i < outputs.size(); ++i) {
JSONObject jOutput = new JSONObject();
- jOutput.put("type", "activity-output");
jOutput.put("output-port", i);
jOutput.put("connector-id", outputs.get(i).getConnectorId().toString());
jOutputs.put(jOutput);
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobSpecification.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobSpecification.java
index 23b33f0..9c6b88c 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobSpecification.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobSpecification.java
@@ -22,6 +22,7 @@
import java.util.Map;
import java.util.Set;
+import org.apache.commons.lang3.tuple.Pair;
import org.json.JSONArray;
import org.json.JSONException;
import org.json.JSONObject;
@@ -33,7 +34,6 @@
import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicyAssignmentPolicy;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-import edu.uci.ics.hyracks.api.util.Pair;
public class JobSpecification implements Serializable {
private static final long serialVersionUID = 1L;
@@ -94,10 +94,11 @@
IOperatorDescriptor consumerOp, int consumerPort) {
insertIntoIndexedMap(opInputMap, consumerOp.getOperatorId(), consumerPort, conn);
insertIntoIndexedMap(opOutputMap, producerOp.getOperatorId(), producerPort, conn);
- connectorOpMap.put(conn.getConnectorId(),
- new Pair<Pair<IOperatorDescriptor, Integer>, Pair<IOperatorDescriptor, Integer>>(
- new Pair<IOperatorDescriptor, Integer>(producerOp, producerPort),
- new Pair<IOperatorDescriptor, Integer>(consumerOp, consumerPort)));
+ connectorOpMap.put(
+ conn.getConnectorId(),
+ Pair.<Pair<IOperatorDescriptor, Integer>, Pair<IOperatorDescriptor, Integer>> of(
+ Pair.<IOperatorDescriptor, Integer> of(producerOp, producerPort),
+ Pair.<IOperatorDescriptor, Integer> of(consumerOp, consumerPort)));
}
public void setProperty(String name, Serializable value) {
@@ -126,19 +127,19 @@
public RecordDescriptor getConnectorRecordDescriptor(IConnectorDescriptor conn) {
Pair<Pair<IOperatorDescriptor, Integer>, Pair<IOperatorDescriptor, Integer>> connInfo = connectorOpMap.get(conn
.getConnectorId());
- return connInfo.first.first.getOutputRecordDescriptors()[connInfo.first.second];
+ return connInfo.getLeft().getLeft().getOutputRecordDescriptors()[connInfo.getLeft().getRight()];
}
public IOperatorDescriptor getConsumer(IConnectorDescriptor conn) {
Pair<Pair<IOperatorDescriptor, Integer>, Pair<IOperatorDescriptor, Integer>> connInfo = connectorOpMap.get(conn
.getConnectorId());
- return connInfo.second.first;
+ return connInfo.getRight().getLeft();
}
public int getConsumerInputIndex(IConnectorDescriptor conn) {
Pair<Pair<IOperatorDescriptor, Integer>, Pair<IOperatorDescriptor, Integer>> connInfo = connectorOpMap.get(conn
.getConnectorId());
- return connInfo.second.second;
+ return connInfo.getRight().getRight();
}
public IConnectorDescriptor getInputConnectorDescriptor(IOperatorDescriptor op, int inputIndex) {
@@ -180,13 +181,13 @@
public IOperatorDescriptor getProducer(IConnectorDescriptor conn) {
Pair<Pair<IOperatorDescriptor, Integer>, Pair<IOperatorDescriptor, Integer>> connInfo = connectorOpMap.get(conn
.getConnectorId());
- return connInfo.first.first;
+ return connInfo.getLeft().getLeft();
}
public int getProducerOutputIndex(IConnectorDescriptor conn) {
Pair<Pair<IOperatorDescriptor, Integer>, Pair<IOperatorDescriptor, Integer>> connInfo = connectorOpMap.get(conn
.getConnectorId());
- return connInfo.first.second;
+ return connInfo.getLeft().getRight();
}
public List<OperatorDescriptorId> getRoots() {
@@ -264,8 +265,6 @@
public JSONObject toJSON() throws JSONException {
JSONObject jjob = new JSONObject();
- jjob.put("type", "job");
-
JSONArray jopArray = new JSONArray();
for (Map.Entry<OperatorDescriptorId, IOperatorDescriptor> e : opMap.entrySet()) {
jopArray.put(e.getValue().toJSON());
@@ -277,12 +276,11 @@
JSONObject conn = new JSONObject();
Pair<Pair<IOperatorDescriptor, Integer>, Pair<IOperatorDescriptor, Integer>> connection = connectorOpMap
.get(e.getKey());
- conn.put("type", "connector-info");
if (connection != null) {
- conn.put("in-operator-id", connection.first.first.getOperatorId().toString());
- conn.put("in-operator-port", connection.first.second.intValue());
- conn.put("out-operator-id", connection.second.first.getOperatorId().toString());
- conn.put("out-operator-port", connection.second.second.intValue());
+ conn.put("in-operator-id", connection.getLeft().getLeft().getOperatorId().toString());
+ conn.put("in-operator-port", connection.getLeft().getRight().intValue());
+ conn.put("out-operator-id", connection.getRight().getLeft().getOperatorId().toString());
+ conn.put("out-operator-port", connection.getRight().getRight().intValue());
}
conn.put("connector", e.getValue().toJSON());
jcArray.put(conn);