Several major changes in hyracks:
-- reduced CC/NC communications for reporting partition request and availability; partition request/availability are only reported for the case of send-side materialized (without pipelining) policies in case of task re-attempt.
-- changed buffer cache to dynamically allocate memory based on needs instead of pre-allocating
-- changed each network channel to lazily allocate memory based on needs, and changed materialized connectors to lazily allocate files based on needs
-- changed several major CCNCFunctions to use non-Java serde
-- added a sort-based group-by operator which pushes group-by aggregations into an external sort
-- made external sort a stable sort

1, 3, and 4 reduce the job overhead.
2 reduces unnecessary NC resource consumption, such as memory and files.
5 and 6 are improvements to runtime operators.

One change in algebricks:
-- implemented a rule to push group-by aggregation into sort, i.e., using the sort-based gby operator

Several important changes in pregelix:
-- removed static state in vertex
-- check the halt bit directly, without deserialization
-- optimized the sort algorithm by packing an additional 2-byte normalized key into the tPointers array

Change-Id: Id696f9a9f1647b4a025b8b33d20b3a89127c60d6
Reviewed-on: http://fulliautomatix.ics.uci.edu:8443/35
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Till Westmann <westmann@gmail.com>
diff --git a/hyracks/hyracks-api/pom.xml b/hyracks/hyracks-api/pom.xml
index 734d671..99f728f 100644
--- a/hyracks/hyracks-api/pom.xml
+++ b/hyracks/hyracks-api/pom.xml
@@ -47,7 +47,7 @@
   	<dependency>
   		<groupId>org.apache.httpcomponents</groupId>
   		<artifactId>httpclient</artifactId>
-  		<version>4.1-alpha2</version>
+  		<version>4.3</version>
   		<type>jar</type>
   		<scope>compile</scope>
   	</dependency>
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/impl/JobSpecificationActivityClusterGraphGeneratorFactory.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/impl/JobSpecificationActivityClusterGraphGeneratorFactory.java
index 48d7275..e71f3c9 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/impl/JobSpecificationActivityClusterGraphGeneratorFactory.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/impl/JobSpecificationActivityClusterGraphGeneratorFactory.java
@@ -74,6 +74,7 @@
         acg.setGlobalJobDataFactory(spec.getGlobalJobDataFactory());
         acg.setConnectorPolicyAssignmentPolicy(spec.getConnectorPolicyAssignmentPolicy());
         acg.setUseConnectorPolicyForScheduling(spec.isUseConnectorPolicyForScheduling());
+        acg.setReportTaskDetails(spec.isReportTaskDetails());
         final Set<Constraint> constraints = new HashSet<Constraint>();
         final IConstraintAcceptor acceptor = new IConstraintAcceptor() {
             @Override
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/NetworkAddress.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/NetworkAddress.java
index e93ebeb..3bb7d22 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/NetworkAddress.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/NetworkAddress.java
@@ -14,15 +14,30 @@
  */
 package edu.uci.ics.hyracks.api.comm;
 
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
 import java.io.Serializable;
 import java.util.Arrays;
 
-public final class NetworkAddress implements Serializable {
+import edu.uci.ics.hyracks.api.io.IWritable;
+
+public final class NetworkAddress implements IWritable, Serializable {
     private static final long serialVersionUID = 1L;
 
-    private final byte[] ipAddress;
+    private byte[] ipAddress;
 
-    private final int port;
+    private int port;
+
+    public static NetworkAddress create(DataInput dis) throws IOException {
+        NetworkAddress networkAddress = new NetworkAddress();
+        networkAddress.readFields(dis);
+        return networkAddress;
+    }
+
+    private NetworkAddress() {
+
+    }
 
     public NetworkAddress(byte[] ipAddress, int port) {
         this.ipAddress = ipAddress;
@@ -55,4 +70,19 @@
         NetworkAddress on = (NetworkAddress) o;
         return on.port == port && Arrays.equals(on.ipAddress, ipAddress);
     }
+
+    @Override
+    public void writeFields(DataOutput output) throws IOException {
+        output.writeInt(ipAddress.length);
+        output.write(ipAddress);
+        output.writeInt(port);
+    }
+
+    @Override
+    public void readFields(DataInput input) throws IOException {
+        int size = input.readInt();
+        ipAddress = new byte[size];
+        input.readFully(ipAddress);
+        port = input.readInt();
+    }
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/ActivityId.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/ActivityId.java
index af63632..68560e1 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/ActivityId.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/ActivityId.java
@@ -14,12 +14,27 @@
  */
 package edu.uci.ics.hyracks.api.dataflow;
 
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
 import java.io.Serializable;
 
-public final class ActivityId implements Serializable {
+import edu.uci.ics.hyracks.api.io.IWritable;
+
+public final class ActivityId implements IWritable, Serializable {
     private static final long serialVersionUID = 1L;
-    private final OperatorDescriptorId odId;
-    private final int id;
+    private OperatorDescriptorId odId;
+    private int id;
+
+    public static ActivityId create(DataInput dis) throws IOException {
+        ActivityId activityId = new ActivityId();
+        activityId.readFields(dis);
+        return activityId;
+    }
+
+    private ActivityId() {
+
+    }
 
     public ActivityId(OperatorDescriptorId odId, int id) {
         this.odId = odId;
@@ -64,4 +79,16 @@
         }
         throw new IllegalArgumentException("Unable to parse: " + str);
     }
+
+    @Override
+    public void writeFields(DataOutput output) throws IOException {
+        odId.writeFields(output);
+        output.writeInt(id);
+    }
+
+    @Override
+    public void readFields(DataInput input) throws IOException {
+        odId = OperatorDescriptorId.create(input);
+        id = input.readInt();
+    }
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/ConnectorDescriptorId.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/ConnectorDescriptorId.java
index b363556..5190cae 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/ConnectorDescriptorId.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/ConnectorDescriptorId.java
@@ -14,13 +14,28 @@
  */
 package edu.uci.ics.hyracks.api.dataflow;
 
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
 import java.io.Serializable;
 
-public final class ConnectorDescriptorId implements Serializable {
+import edu.uci.ics.hyracks.api.io.IWritable;
+
+public final class ConnectorDescriptorId implements IWritable, Serializable {
     private static final long serialVersionUID = 1L;
 
     private int id;
 
+    public static ConnectorDescriptorId create(DataInput dis) throws IOException {
+        ConnectorDescriptorId connectorDescriptorId = new ConnectorDescriptorId();
+        connectorDescriptorId.readFields(dis);
+        return connectorDescriptorId;
+    }
+
+    private ConnectorDescriptorId() {
+
+    }
+
     public ConnectorDescriptorId(int id) {
         this.id = id;
     }
@@ -50,4 +65,14 @@
     public String toString() {
         return "CDID:" + id;
     }
+
+    @Override
+    public void writeFields(DataOutput output) throws IOException {
+        output.writeInt(id);
+    }
+
+    @Override
+    public void readFields(DataInput input) throws IOException {
+        id = input.readInt();
+    }
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/OperatorDescriptorId.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/OperatorDescriptorId.java
index 0c23465..5351c78 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/OperatorDescriptorId.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/OperatorDescriptorId.java
@@ -14,12 +14,27 @@
  */
 package edu.uci.ics.hyracks.api.dataflow;
 
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
 import java.io.Serializable;
 
-public final class OperatorDescriptorId implements Serializable {
+import edu.uci.ics.hyracks.api.io.IWritable;
+
+public final class OperatorDescriptorId implements IWritable, Serializable {
     private static final long serialVersionUID = 1L;
 
-    private final int id;
+    private int id;
+
+    public static OperatorDescriptorId create(DataInput dis) throws IOException {
+        OperatorDescriptorId operatorDescriptorId = new OperatorDescriptorId();
+        operatorDescriptorId.readFields(dis);
+        return operatorDescriptorId;
+    }
+
+    private OperatorDescriptorId() {
+
+    }
 
     public OperatorDescriptorId(int id) {
         this.id = id;
@@ -57,4 +72,14 @@
         }
         throw new IllegalArgumentException("Unable to parse: " + str);
     }
+
+    @Override
+    public void writeFields(DataOutput output) throws IOException {
+        output.writeInt(id);
+    }
+
+    @Override
+    public void readFields(DataInput input) throws IOException {
+        id = input.readInt();
+    }
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/TaskAttemptId.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/TaskAttemptId.java
index 65fa2e5..2355e98 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/TaskAttemptId.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/TaskAttemptId.java
@@ -14,14 +14,29 @@
  */
 package edu.uci.ics.hyracks.api.dataflow;
 
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
 import java.io.Serializable;
 
-public final class TaskAttemptId implements Serializable {
+import edu.uci.ics.hyracks.api.io.IWritable;
+
+public final class TaskAttemptId implements IWritable, Serializable {
     private static final long serialVersionUID = 1L;
 
-    private final TaskId taskId;
+    private TaskId taskId;
 
-    private final int attempt;
+    private int attempt;
+
+    public static TaskAttemptId create(DataInput dis) throws IOException {
+        TaskAttemptId taskAttemptId = new TaskAttemptId();
+        taskAttemptId.readFields(dis);
+        return taskAttemptId;
+    }
+
+    private TaskAttemptId() {
+
+    }
 
     public TaskAttemptId(TaskId taskId, int attempt) {
         this.taskId = taskId;
@@ -63,4 +78,16 @@
         }
         throw new IllegalArgumentException("Unable to parse: " + str);
     }
+
+    @Override
+    public void writeFields(DataOutput output) throws IOException {
+        taskId.writeFields(output);
+        output.writeInt(attempt);
+    }
+
+    @Override
+    public void readFields(DataInput input) throws IOException {
+        taskId = TaskId.create(input);
+        attempt = input.readInt();
+    }
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/TaskId.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/TaskId.java
index 6d58bd9..6b9eecc 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/TaskId.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/TaskId.java
@@ -14,14 +14,29 @@
  */
 package edu.uci.ics.hyracks.api.dataflow;
 
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
 import java.io.Serializable;
 
-public final class TaskId implements Serializable {
+import edu.uci.ics.hyracks.api.io.IWritable;
+
+public final class TaskId implements IWritable, Serializable {
     private static final long serialVersionUID = 1L;
 
-    private final ActivityId activityId;
+    private ActivityId activityId;
 
-    private final int partition;
+    private int partition;
+
+    public static TaskId create(DataInput dis) throws IOException {
+        TaskId taskId = new TaskId();
+        taskId.readFields(dis);
+        return taskId;
+    }
+
+    private TaskId() {
+
+    }
 
     public TaskId(ActivityId activityId, int partition) {
         this.activityId = activityId;
@@ -63,4 +78,16 @@
         }
         throw new IllegalArgumentException("Unable to parse: " + str);
     }
+
+    @Override
+    public void writeFields(DataOutput output) throws IOException {
+        activityId.writeFields(output);
+        output.writeInt(partition);
+    }
+
+    @Override
+    public void readFields(DataInput input) throws IOException {
+        activityId = ActivityId.create(input);
+        partition = input.readInt();
+    }
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/connectors/ConnectorPolicyFactory.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/connectors/ConnectorPolicyFactory.java
new file mode 100644
index 0000000..8b416da
--- /dev/null
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/connectors/ConnectorPolicyFactory.java
@@ -0,0 +1,107 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.dataflow.connectors;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * Encodes and decodes the connector policies handled below as a single int tag.
+ * {@link #writeConnectorPolicy} writes the tag and {@link #getConnectorPolicy} switches on it
+ * to construct a fresh policy instance.
+ *
+ * @author yingyib
+ */
+public class ConnectorPolicyFactory {
+    /** Shared instance; the constructor is private. */
+    public static final ConnectorPolicyFactory INSTANCE = new ConnectorPolicyFactory();
+
+    // Wire tags. The reader and the writer must agree on these values.
+    private static final int PIPELINING = 0;
+    private static final int SEND_MATERIALIZED_BLOCKING = 1;
+    private static final int SEND_MATERIALIZED_PIPELINING = 2;
+    private static final int SEND_MATERIALIZED_RECEIVE_MATERIALIZED_BLOCKING = 3;
+    private static final int SEND_MATERIALIZED_RECEIVE_MATERIALIZED_PIPELINING = 4;
+    private static final int SEND_PIPELINING_RECEIVE_MATERIALIZED_BLOCKING = 5;
+
+    private ConnectorPolicyFactory() {
+
+    }
+
+    /**
+     * Reads one int tag from {@code input} and returns a new policy of the matching type.
+     *
+     * @param input
+     *            source positioned at a tag produced by {@link #writeConnectorPolicy}
+     * @return a newly constructed policy; never {@code null}
+     * @throws IOException
+     *             if reading fails or the tag does not identify a known policy
+     */
+    public IConnectorPolicy getConnectorPolicy(DataInput input) throws IOException {
+        int kind = input.readInt();
+        switch (kind) {
+            case PIPELINING:
+                return new PipeliningConnectorPolicy();
+            case SEND_MATERIALIZED_BLOCKING:
+                return new SendSideMaterializedBlockingConnectorPolicy();
+            case SEND_MATERIALIZED_PIPELINING:
+                return new SendSideMaterializedPipeliningConnectorPolicy();
+            case SEND_MATERIALIZED_RECEIVE_MATERIALIZED_BLOCKING:
+                return new SendSideMaterializedReceiveSideMaterializedBlockingConnectorPolicy();
+            case SEND_MATERIALIZED_RECEIVE_MATERIALIZED_PIPELINING:
+                return new SendSideMaterializedReceiveSideMaterializedPipeliningConnectorPolicy();
+            case SEND_PIPELINING_RECEIVE_MATERIALIZED_BLOCKING:
+                return new SendSidePipeliningReceiveSideMaterializedBlockingConnectorPolicy();
+            default:
+                // Fail here rather than hand back null and let the caller trip over it later.
+                throw new IOException("Unknown connector policy kind: " + kind);
+        }
+    }
+
+    /**
+     * Writes the int tag identifying the concrete type of {@code policy} to {@code output}.
+     *
+     * @param policy
+     *            the policy to encode; must be an instance of one of the types handled here
+     * @param output
+     *            destination of the tag
+     * @throws IOException
+     *             if writing fails, or if {@code policy} is null or of an unsupported type
+     */
+    public void writeConnectorPolicy(IConnectorPolicy policy, DataOutput output) throws IOException {
+        output.writeInt(kindOf(policy));
+    }
+
+    // Maps a policy to its wire tag; the instanceof checks keep their original order.
+    private static int kindOf(IConnectorPolicy policy) throws IOException {
+        if (policy instanceof PipeliningConnectorPolicy) {
+            return PIPELINING;
+        } else if (policy instanceof SendSideMaterializedBlockingConnectorPolicy) {
+            return SEND_MATERIALIZED_BLOCKING;
+        } else if (policy instanceof SendSideMaterializedPipeliningConnectorPolicy) {
+            return SEND_MATERIALIZED_PIPELINING;
+        } else if (policy instanceof SendSideMaterializedReceiveSideMaterializedBlockingConnectorPolicy) {
+            return SEND_MATERIALIZED_RECEIVE_MATERIALIZED_BLOCKING;
+        } else if (policy instanceof SendSideMaterializedReceiveSideMaterializedPipeliningConnectorPolicy) {
+            return SEND_MATERIALIZED_RECEIVE_MATERIALIZED_PIPELINING;
+        } else if (policy instanceof SendSidePipeliningReceiveSideMaterializedBlockingConnectorPolicy) {
+            return SEND_PIPELINING_RECEIVE_MATERIALIZED_BLOCKING;
+        }
+        // Writing no tag would desynchronize the stream, because the reader always consumes one int.
+        throw new IOException("Unsupported connector policy: " + (policy == null ? "null" : policy.getClass().getName()));
+    }
+
+}
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/connectors/SendSidePipeliningReceiveSideMaterializedBlockingConnectorPolicy.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/connectors/SendSidePipeliningReceiveSideMaterializedBlockingConnectorPolicy.java
new file mode 100644
index 0000000..8beb2f6
--- /dev/null
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/connectors/SendSidePipeliningReceiveSideMaterializedBlockingConnectorPolicy.java
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.dataflow.connectors;
+
+/**
+ * Connector policy that reports no materialization on the send side and materialization on the
+ * receive side. It requires producer and consumer to be co-scheduled and reports that the
+ * consumer does not wait for the producer to finish.
+ * <p>
+ * NOTE(review): the class name says "Blocking" while {@link #consumerWaitsForProducerToFinish()}
+ * returns {@code false}; confirm that the blocking is meant to come from receive-side
+ * materialization rather than from this flag.
+ */
+public class SendSidePipeliningReceiveSideMaterializedBlockingConnectorPolicy implements IConnectorPolicy {
+    private static final long serialVersionUID = 1L;
+
+    @Override
+    public boolean requiresProducerConsumerCoscheduling() {
+        return true;
+    }
+
+    @Override
+    public boolean consumerWaitsForProducerToFinish() {
+        return false;
+    }
+
+    @Override
+    public boolean materializeOnSendSide() {
+        return false;
+    }
+
+    @Override
+    public boolean materializeOnReceiveSide() {
+        return true;
+    }
+
+}
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/deployment/DeploymentId.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/deployment/DeploymentId.java
index 6eab7a7..f461e52 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/deployment/DeploymentId.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/deployment/DeploymentId.java
@@ -15,17 +15,32 @@
 
 package edu.uci.ics.hyracks.api.deployment;
 
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
 import java.io.Serializable;
 
+import edu.uci.ics.hyracks.api.io.IWritable;
+
 /**
  * The representation of a deployment id
  * 
  * @author yingyib
  */
-public class DeploymentId implements Serializable {
+public class DeploymentId implements IWritable, Serializable {
     private static final long serialVersionUID = 1L;
 
-    private final String deploymentKey;
+    private String deploymentKey;
+
+    public static DeploymentId create(DataInput dis) throws IOException {
+        DeploymentId deploymentId = new DeploymentId();
+        deploymentId.readFields(dis);
+        return deploymentId;
+    }
+
+    private DeploymentId() {
+
+    }
 
     public DeploymentId(String deploymentKey) {
         this.deploymentKey = deploymentKey;
@@ -50,4 +65,14 @@
     public String toString() {
         return deploymentKey;
     }
+
+    @Override
+    public void writeFields(DataOutput output) throws IOException {
+        output.writeUTF(deploymentKey);
+    }
+
+    @Override
+    public void readFields(DataInput input) throws IOException {
+        deploymentKey = input.readUTF();
+    }
 }
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/io/IWritable.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/io/IWritable.java
new file mode 100644
index 0000000..9e7e8c8
--- /dev/null
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/io/IWritable.java
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.io;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * Contract for objects that write their own fields to a {@link DataOutput} and restore them
+ * from a {@link DataInput}.
+ *
+ * @author yingyib
+ */
+public interface IWritable {
+
+    /**
+     * Writes this object's fields to {@code output}.
+     *
+     * @param output
+     *            destination of the encoded fields
+     * @throws IOException
+     *             if writing to {@code output} fails
+     */
+    void writeFields(DataOutput output) throws IOException;
+
+    /**
+     * Overwrites this object's fields with values read from {@code input}.
+     *
+     * @param input
+     *            source of the encoded fields
+     * @throws IOException
+     *             if reading from {@code input} fails
+     */
+    void readFields(DataInput input) throws IOException;
+
+}
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/ActivityClusterGraph.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/ActivityClusterGraph.java
index e80168b..12c4e6e 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/ActivityClusterGraph.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/ActivityClusterGraph.java
@@ -17,6 +17,7 @@
 import java.io.Serializable;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 import org.json.JSONArray;
@@ -25,6 +26,7 @@
 
 import edu.uci.ics.hyracks.api.dataflow.ActivityId;
 import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
+import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
 import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicyAssignmentPolicy;
 
 public class ActivityClusterGraph implements Serializable {
@@ -50,12 +52,15 @@
 
     private boolean useConnectorPolicyForScheduling;
 
+    private boolean reportTaskDetails;
+
     public ActivityClusterGraph() {
         version = 0;
         activityClusterMap = new HashMap<ActivityClusterId, ActivityCluster>();
         activityMap = new HashMap<ActivityId, ActivityCluster>();
         connectorMap = new HashMap<ConnectorDescriptorId, ActivityCluster>();
         frameSize = 32768;
+        reportTaskDetails = true;
     }
 
     public Map<ActivityId, ActivityCluster> getActivityMap() {
@@ -135,6 +140,24 @@
         this.useConnectorPolicyForScheduling = useConnectorPolicyForScheduling;
     }
 
+    public boolean isReportTaskDetails() {
+        return reportTaskDetails;
+    }
+
+    public void setReportTaskDetails(boolean reportTaskDetails) {
+        this.reportTaskDetails = reportTaskDetails;
+    }
+
+    public List<IConnectorDescriptor> getActivityInputs(ActivityId activityId) {
+        ActivityCluster ac = activityMap.get(activityId);
+        return ac.getActivityInputMap().get(activityId);
+    }
+
+    public ActivityId getProducerActivity(ConnectorDescriptorId cid) {
+        ActivityCluster ac = connectorMap.get(cid);
+        return ac.getProducerActivity(cid);
+    }
+
     public JSONObject toJSON() throws JSONException {
         JSONObject acgj = new JSONObject();
 
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobId.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobId.java
index b8eb61b..c9027ba 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobId.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobId.java
@@ -14,12 +14,26 @@
  */
 package edu.uci.ics.hyracks.api.job;
 
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
 import java.io.Serializable;
 
-public final class JobId implements Serializable {
-    private static final long serialVersionUID = 1L;
+import edu.uci.ics.hyracks.api.io.IWritable;
 
-    private final long id;
+public final class JobId implements IWritable, Serializable {
+    private static final long serialVersionUID = 1L;
+    private long id;
+
+    public static JobId create(DataInput dis) throws IOException {
+        JobId jobId = new JobId();
+        jobId.readFields(dis);
+        return jobId;
+    }
+
+    private JobId() {
+
+    }
 
     public JobId(long id) {
         this.id = id;
@@ -57,4 +71,14 @@
         }
         throw new IllegalArgumentException();
     }
+
+    @Override
+    public void writeFields(DataOutput output) throws IOException {
+        output.writeLong(id);
+    }
+
+    @Override
+    public void readFields(DataInput input) throws IOException {
+        id = input.readLong();
+    }
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobSpecification.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobSpecification.java
index 128978b..19904dd 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobSpecification.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobSpecification.java
@@ -75,6 +75,8 @@
 
     private boolean useConnectorPolicyForScheduling;
 
+    private boolean reportTaskDetails;
+
     private transient int operatorIdCounter;
 
     private transient int connectorIdCounter;
@@ -98,7 +100,8 @@
         operatorIdCounter = 0;
         connectorIdCounter = 0;
         maxReattempts = 2;
-        useConnectorPolicyForScheduling = true;
+        useConnectorPolicyForScheduling = false;
+        reportTaskDetails = true;
         setFrameSize(frameSize);
     }
 
@@ -288,6 +291,14 @@
         this.useConnectorPolicyForScheduling = useConnectorPolicyForScheduling;
     }
 
+    public boolean isReportTaskDetails() {
+        return reportTaskDetails;
+    }
+
+    public void setReportTaskDetails(boolean reportTaskDetails) {
+        this.reportTaskDetails = reportTaskDetails;
+    }
+
     private <K, V> void insertIntoIndexedMap(Map<K, List<V>> map, K key, int index, V value) {
         List<V> vList = map.get(key);
         if (vList == null) {
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/partitions/PartitionId.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/partitions/PartitionId.java
index 2ff71d5..c7e01e6 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/partitions/PartitionId.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/partitions/PartitionId.java
@@ -14,21 +14,35 @@
  */
 package edu.uci.ics.hyracks.api.partitions;
 
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
 import java.io.Serializable;
 
 import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
+import edu.uci.ics.hyracks.api.io.IWritable;
 import edu.uci.ics.hyracks.api.job.JobId;
 
-public final class PartitionId implements Serializable {
+public final class PartitionId implements IWritable, Serializable {
     private static final long serialVersionUID = 1L;
 
-    private final JobId jobId;
+    private JobId jobId;
 
-    private final ConnectorDescriptorId cdId;
+    private ConnectorDescriptorId cdId;
 
-    private final int senderIndex;
+    private int senderIndex;
 
-    private final int receiverIndex;
+    private int receiverIndex;
+
+    public static PartitionId create(DataInput dis) throws IOException {
+        PartitionId partitionId = new PartitionId();
+        partitionId.readFields(dis);
+        return partitionId;
+    }
+
+    private PartitionId() {
+
+    }
 
     public PartitionId(JobId jobId, ConnectorDescriptorId cdId, int senderIndex, int receiverIndex) {
         this.jobId = jobId;
@@ -94,4 +108,20 @@
     public String toString() {
         return jobId.toString() + ":" + cdId + ":" + senderIndex + ":" + receiverIndex;
     }
+
+    @Override
+    public void writeFields(DataOutput output) throws IOException {
+        cdId.writeFields(output);
+        jobId.writeFields(output);
+        output.writeInt(receiverIndex);
+        output.writeInt(senderIndex);
+    }
+
+    @Override
+    public void readFields(DataInput input) throws IOException {
+        cdId = ConnectorDescriptorId.create(input);
+        jobId = JobId.create(input);
+        receiverIndex = input.readInt();
+        senderIndex = input.readInt();
+    }
 }
\ No newline at end of file