merge hyracks_dev_next r847:977

git-svn-id: https://hyracks.googlecode.com/svn/branches/aggregators_dev_next@978 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-api/pom.xml b/hyracks-api/pom.xml
index f73c27a..c18995d 100644
--- a/hyracks-api/pom.xml
+++ b/hyracks-api/pom.xml
@@ -1,9 +1,6 @@
 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
   <modelVersion>4.0.0</modelVersion>
-  <groupId>edu.uci.ics.hyracks</groupId>
   <artifactId>hyracks-api</artifactId>
-  <version>0.2.0-SNAPSHOT</version>
-
   <parent>
     <groupId>edu.uci.ics.hyracks</groupId>
     <artifactId>hyracks</artifactId>
@@ -45,5 +42,15 @@
   		<type>jar</type>
   		<scope>compile</scope>
   	</dependency>
+  	<dependency>
+  		<groupId>edu.uci.ics.hyracks</groupId>
+  		<artifactId>hyracks-ipc</artifactId>
+  		<version>0.2.0-SNAPSHOT</version>
+  	</dependency>
+  	<dependency>
+  		<groupId>org.apache.commons</groupId>
+  		<artifactId>commons-lang3</artifactId>
+  		<version>3.1</version>
+  	</dependency>
   </dependencies>
 </project>
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientInterface.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientInterface.java
index 6e1eedc..866d307 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientInterface.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientInterface.java
@@ -14,7 +14,6 @@
  */
 package edu.uci.ics.hyracks.api.client;
 
-import java.rmi.Remote;
 import java.util.EnumSet;
 import java.util.Map;
 
@@ -22,7 +21,7 @@
 import edu.uci.ics.hyracks.api.job.JobId;
 import edu.uci.ics.hyracks.api.job.JobStatus;
 
-public interface IHyracksClientInterface extends Remote {
+public interface IHyracksClientInterface {
     public ClusterControllerInfo getClusterControllerInfo() throws Exception;
 
     public void createApplication(String appName) throws Exception;
@@ -35,7 +34,7 @@
 
     public JobStatus getJobStatus(JobId jobId) throws Exception;
 
-    public void start(JobId jobId) throws Exception;
+    public void startJob(JobId jobId) throws Exception;
 
     public void waitForCompletion(JobId jobId) throws Exception;
 
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/ActivityId.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/ActivityId.java
index c6ca51c..75cc245 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/ActivityId.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/ActivityId.java
@@ -52,6 +52,16 @@
     }
 
     public String toString() {
-        return "ANID:[" + odId + "]:" + id;
+        return "ANID:" + odId + ":" + id;
+    }
+
+    public static ActivityId parse(String str) {
+        if (str.startsWith("ANID:")) {
+            str = str.substring(5);
+            int idIdx = str.lastIndexOf(':');
+            return new ActivityId(OperatorDescriptorId.parse(str.substring(0, idIdx)), Integer.parseInt(str
+                    .substring(idIdx + 1)));
+        }
+        throw new IllegalArgumentException("Unable to parse: " + str);
     }
 }
\ No newline at end of file
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/OperatorDescriptorId.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/OperatorDescriptorId.java
index b858736..8794e09 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/OperatorDescriptorId.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/OperatorDescriptorId.java
@@ -49,4 +49,12 @@
     public String toString() {
         return "ODID:" + id;
     }
+
+    public static OperatorDescriptorId parse(String str) {
+        if (str.startsWith("ODID:")) {
+            str = str.substring(5);
+            return new OperatorDescriptorId(Integer.parseInt(str));
+        }
+        throw new IllegalArgumentException("Unable to parse: " + str);
+    }
 }
\ No newline at end of file
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/TaskAttemptId.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/TaskAttemptId.java
index d4e6972..0fb44c1 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/TaskAttemptId.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/TaskAttemptId.java
@@ -52,6 +52,15 @@
 
     @Override
     public String toString() {
-        return "TAID:[" + taskId + "]:" + attempt;
+        return "TAID:" + taskId + ":" + attempt;
+    }
+
+    public static TaskAttemptId parse(String str) {
+        if (str.startsWith("TAID:")) {
+            str = str.substring(5);
+            int idIdx = str.lastIndexOf(':');
+            return new TaskAttemptId(TaskId.parse(str.substring(0, idIdx)), Integer.parseInt(str.substring(idIdx + 1)));
+        }
+        throw new IllegalArgumentException("Unable to parse: " + str);
     }
 }
\ No newline at end of file
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/TaskId.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/TaskId.java
index ee63355..7e0b22d 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/TaskId.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/TaskId.java
@@ -52,6 +52,15 @@
 
     @Override
     public String toString() {
-        return "TID:[" + activityId + "]:" + partition;
+        return "TID:" + activityId + ":" + partition;
+    }
+
+    public static TaskId parse(String str) {
+        if (str.startsWith("TID:")) {
+            str = str.substring(4);
+            int idIdx = str.lastIndexOf(':');
+            return new TaskId(ActivityId.parse(str.substring(0, idIdx)), Integer.parseInt(str.substring(idIdx + 1)));
+        }
+        throw new IllegalArgumentException("Unable to parse: " + str);
     }
 }
\ No newline at end of file
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/RecordDescriptor.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/RecordDescriptor.java
index 4922723..b04fe86 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/RecordDescriptor.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/RecordDescriptor.java
@@ -21,25 +21,29 @@
     private static final long serialVersionUID = 1L;
 
     private final ISerializerDeserializer[] fields;
-    private final ITypeTrait[] typeTraits;
-    
+    private final ITypeTraits[] typeTraits;
+
     // leaving this constructor for backwards-compatibility
     public RecordDescriptor(ISerializerDeserializer[] fields) {
         this.fields = fields;
         this.typeTraits = null;
     }
-    
+
     // temporarily adding constructor to include type traits
-    public RecordDescriptor(ISerializerDeserializer[] fields, ITypeTrait[] typeTraits) {
-    	this.fields = fields;
+    public RecordDescriptor(ISerializerDeserializer[] fields, ITypeTraits[] typeTraits) {
+        this.fields = fields;
         this.typeTraits = typeTraits;
     }
 
+    public int getFieldCount() {
+        return fields.length;
+    }
+
     public ISerializerDeserializer[] getFields() {
         return fields;
     }
-    
-    public ITypeTrait[] getTypeTraits() {
-    	return typeTraits;
+
+    public ITypeTraits[] getTypeTraits() {
+        return typeTraits;
     }
 }
\ No newline at end of file
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobActivityGraph.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobActivityGraph.java
index 930d299..2b6d361 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobActivityGraph.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobActivityGraph.java
@@ -22,6 +22,7 @@
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.commons.lang3.tuple.Pair;
 import org.json.JSONArray;
 import org.json.JSONException;
 import org.json.JSONObject;
@@ -33,7 +34,6 @@
 import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
 import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-import edu.uci.ics.hyracks.api.util.Pair;
 
 public class JobActivityGraph implements Serializable {
     private static final long serialVersionUID = 1L;
@@ -148,8 +148,8 @@
         Pair<Pair<IOperatorDescriptor, Integer>, Pair<IOperatorDescriptor, Integer>> connEdge = jobSpec
                 .getConnectorOperatorMap().get(cdId);
 
-        OperatorDescriptorId consumerOpId = connEdge.second.first.getOperatorId();
-        int consumerInputIdx = connEdge.second.second;
+        OperatorDescriptorId consumerOpId = connEdge.getRight().getLeft().getOperatorId();
+        int consumerInputIdx = connEdge.getRight().getRight();
 
         for (ActivityId anId : operatorActivityMap.get(consumerOpId)) {
             List<Integer> anInputs = activityInputMap.get(anId);
@@ -168,8 +168,8 @@
         Pair<Pair<IOperatorDescriptor, Integer>, Pair<IOperatorDescriptor, Integer>> connEdge = jobSpec
                 .getConnectorOperatorMap().get(cdId);
 
-        OperatorDescriptorId producerOpId = connEdge.first.first.getOperatorId();
-        int producerInputIdx = connEdge.first.second;
+        OperatorDescriptorId producerOpId = connEdge.getLeft().getLeft().getOperatorId();
+        int producerInputIdx = connEdge.getLeft().getRight();
 
         for (ActivityId anId : operatorActivityMap.get(producerOpId)) {
             List<Integer> anOutputs = activityOutputMap.get(anId);
@@ -209,13 +209,11 @@
     public JSONObject toJSON() throws JSONException {
         JSONObject jplan = new JSONObject();
 
-        jplan.put("type", "plan");
         jplan.put("flags", jobFlags.toString());
 
         JSONArray jans = new JSONArray();
         for (IActivity an : activityNodes.values()) {
             JSONObject jan = new JSONObject();
-            jan.put("type", "activity");
             jan.put("id", an.getActivityId().toString());
             jan.put("java-class", an.getClass().getName());
             jan.put("operator-id", an.getActivityId().getOperatorDescriptorId().toString());
@@ -225,7 +223,6 @@
                 JSONArray jInputs = new JSONArray();
                 for (int i = 0; i < inputs.size(); ++i) {
                     JSONObject jInput = new JSONObject();
-                    jInput.put("type", "activity-input");
                     jInput.put("input-port", i);
                     jInput.put("connector-id", inputs.get(i).getConnectorId().toString());
                     jInputs.put(jInput);
@@ -238,7 +235,6 @@
                 JSONArray jOutputs = new JSONArray();
                 for (int i = 0; i < outputs.size(); ++i) {
                     JSONObject jOutput = new JSONObject();
-                    jOutput.put("type", "activity-output");
                     jOutput.put("output-port", i);
                     jOutput.put("connector-id", outputs.get(i).getConnectorId().toString());
                     jOutputs.put(jOutput);
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobSpecification.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobSpecification.java
index 23b33f0..9c6b88c 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobSpecification.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobSpecification.java
@@ -22,6 +22,7 @@
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.commons.lang3.tuple.Pair;
 import org.json.JSONArray;
 import org.json.JSONException;
 import org.json.JSONObject;
@@ -33,7 +34,6 @@
 import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
 import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicyAssignmentPolicy;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-import edu.uci.ics.hyracks.api.util.Pair;
 
 public class JobSpecification implements Serializable {
     private static final long serialVersionUID = 1L;
@@ -94,10 +94,11 @@
             IOperatorDescriptor consumerOp, int consumerPort) {
         insertIntoIndexedMap(opInputMap, consumerOp.getOperatorId(), consumerPort, conn);
         insertIntoIndexedMap(opOutputMap, producerOp.getOperatorId(), producerPort, conn);
-        connectorOpMap.put(conn.getConnectorId(),
-                new Pair<Pair<IOperatorDescriptor, Integer>, Pair<IOperatorDescriptor, Integer>>(
-                        new Pair<IOperatorDescriptor, Integer>(producerOp, producerPort),
-                        new Pair<IOperatorDescriptor, Integer>(consumerOp, consumerPort)));
+        connectorOpMap.put(
+                conn.getConnectorId(),
+                Pair.<Pair<IOperatorDescriptor, Integer>, Pair<IOperatorDescriptor, Integer>> of(
+                        Pair.<IOperatorDescriptor, Integer> of(producerOp, producerPort),
+                        Pair.<IOperatorDescriptor, Integer> of(consumerOp, consumerPort)));
     }
 
     public void setProperty(String name, Serializable value) {
@@ -126,19 +127,19 @@
     public RecordDescriptor getConnectorRecordDescriptor(IConnectorDescriptor conn) {
         Pair<Pair<IOperatorDescriptor, Integer>, Pair<IOperatorDescriptor, Integer>> connInfo = connectorOpMap.get(conn
                 .getConnectorId());
-        return connInfo.first.first.getOutputRecordDescriptors()[connInfo.first.second];
+        return connInfo.getLeft().getLeft().getOutputRecordDescriptors()[connInfo.getLeft().getRight()];
     }
 
     public IOperatorDescriptor getConsumer(IConnectorDescriptor conn) {
         Pair<Pair<IOperatorDescriptor, Integer>, Pair<IOperatorDescriptor, Integer>> connInfo = connectorOpMap.get(conn
                 .getConnectorId());
-        return connInfo.second.first;
+        return connInfo.getRight().getLeft();
     }
 
     public int getConsumerInputIndex(IConnectorDescriptor conn) {
         Pair<Pair<IOperatorDescriptor, Integer>, Pair<IOperatorDescriptor, Integer>> connInfo = connectorOpMap.get(conn
                 .getConnectorId());
-        return connInfo.second.second;
+        return connInfo.getRight().getRight();
     }
 
     public IConnectorDescriptor getInputConnectorDescriptor(IOperatorDescriptor op, int inputIndex) {
@@ -180,13 +181,13 @@
     public IOperatorDescriptor getProducer(IConnectorDescriptor conn) {
         Pair<Pair<IOperatorDescriptor, Integer>, Pair<IOperatorDescriptor, Integer>> connInfo = connectorOpMap.get(conn
                 .getConnectorId());
-        return connInfo.first.first;
+        return connInfo.getLeft().getLeft();
     }
 
     public int getProducerOutputIndex(IConnectorDescriptor conn) {
         Pair<Pair<IOperatorDescriptor, Integer>, Pair<IOperatorDescriptor, Integer>> connInfo = connectorOpMap.get(conn
                 .getConnectorId());
-        return connInfo.first.second;
+        return connInfo.getLeft().getRight();
     }
 
     public List<OperatorDescriptorId> getRoots() {
@@ -264,8 +265,6 @@
     public JSONObject toJSON() throws JSONException {
         JSONObject jjob = new JSONObject();
 
-        jjob.put("type", "job");
-
         JSONArray jopArray = new JSONArray();
         for (Map.Entry<OperatorDescriptorId, IOperatorDescriptor> e : opMap.entrySet()) {
             jopArray.put(e.getValue().toJSON());
@@ -277,12 +276,11 @@
             JSONObject conn = new JSONObject();
             Pair<Pair<IOperatorDescriptor, Integer>, Pair<IOperatorDescriptor, Integer>> connection = connectorOpMap
                     .get(e.getKey());
-            conn.put("type", "connector-info");
             if (connection != null) {
-                conn.put("in-operator-id", connection.first.first.getOperatorId().toString());
-                conn.put("in-operator-port", connection.first.second.intValue());
-                conn.put("out-operator-id", connection.second.first.getOperatorId().toString());
-                conn.put("out-operator-port", connection.second.second.intValue());
+                conn.put("in-operator-id", connection.getLeft().getLeft().getOperatorId().toString());
+                conn.put("in-operator-port", connection.getLeft().getRight().intValue());
+                conn.put("out-operator-id", connection.getRight().getLeft().getOperatorId().toString());
+                conn.put("out-operator-port", connection.getRight().getRight().intValue());
             }
             conn.put("connector", e.getValue().toJSON());
             jcArray.put(conn);