Several major changes in hyracks:
-- reduced CC/NC communications for reporting partition request and availability; partition request/availability are only reported for the case of send-side materialized (without pipelining) policies in case of task re-attempt.
-- changed buffer cache to dynamically allocate memory based on needs instead of pre-allocating
-- changed each network channel to lazily allocate memory based on needs, and changed materialized connectors to lazily allocate files based on needs
-- changed several major CCNCFunctions to use non-Java serde
-- added a sort-based group-by operator which pushes group-by aggregations into an external sort
-- make external sort a stable sort

1, 3, and 4 are to reduce the job overhead.
2 is to reduce unnecessary NC resource consumption, such as memory and files.
5 and 6 are improvements to runtime operators.

One change in algebricks:
-- implemented a rule to push group-by aggregation into sort, i.e., using the sort-based gby operator

Several important changes in pregelix:
-- remove static states in vertex
-- direct check halt bit without deserialization
-- optimize the sort algorithm by packing yet-another 2-byte normalized key into the tPointers array

Change-Id: Id696f9a9f1647b4a025b8b33d20b3a89127c60d6
Reviewed-on: http://fulliautomatix.ics.uci.edu:8443/35
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Till Westmann <westmann@gmail.com>
diff --git a/hyracks/hyracks-api/pom.xml b/hyracks/hyracks-api/pom.xml
index 734d671..99f728f 100644
--- a/hyracks/hyracks-api/pom.xml
+++ b/hyracks/hyracks-api/pom.xml
@@ -47,7 +47,7 @@
   	<dependency>
   		<groupId>org.apache.httpcomponents</groupId>
   		<artifactId>httpclient</artifactId>
-  		<version>4.1-alpha2</version>
+  		<version>4.3</version>
   		<type>jar</type>
   		<scope>compile</scope>
   	</dependency>
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/impl/JobSpecificationActivityClusterGraphGeneratorFactory.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/impl/JobSpecificationActivityClusterGraphGeneratorFactory.java
index 48d7275..e71f3c9 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/impl/JobSpecificationActivityClusterGraphGeneratorFactory.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/impl/JobSpecificationActivityClusterGraphGeneratorFactory.java
@@ -74,6 +74,7 @@
         acg.setGlobalJobDataFactory(spec.getGlobalJobDataFactory());
         acg.setConnectorPolicyAssignmentPolicy(spec.getConnectorPolicyAssignmentPolicy());
         acg.setUseConnectorPolicyForScheduling(spec.isUseConnectorPolicyForScheduling());
+        acg.setReportTaskDetails(spec.isReportTaskDetails());
         final Set<Constraint> constraints = new HashSet<Constraint>();
         final IConstraintAcceptor acceptor = new IConstraintAcceptor() {
             @Override
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/NetworkAddress.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/NetworkAddress.java
index e93ebeb..3bb7d22 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/NetworkAddress.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/NetworkAddress.java
@@ -14,15 +14,30 @@
  */
 package edu.uci.ics.hyracks.api.comm;
 
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
 import java.io.Serializable;
 import java.util.Arrays;
 
-public final class NetworkAddress implements Serializable {
+import edu.uci.ics.hyracks.api.io.IWritable;
+
+public final class NetworkAddress implements IWritable, Serializable {
     private static final long serialVersionUID = 1L;
 
-    private final byte[] ipAddress;
+    private byte[] ipAddress;
 
-    private final int port;
+    private int port;
+
+    public static NetworkAddress create(DataInput dis) throws IOException {
+        NetworkAddress networkAddress = new NetworkAddress();
+        networkAddress.readFields(dis);
+        return networkAddress;
+    }
+
+    private NetworkAddress() {
+
+    }
 
     public NetworkAddress(byte[] ipAddress, int port) {
         this.ipAddress = ipAddress;
@@ -55,4 +70,19 @@
         NetworkAddress on = (NetworkAddress) o;
         return on.port == port && Arrays.equals(on.ipAddress, ipAddress);
     }
+
+    @Override
+    public void writeFields(DataOutput output) throws IOException {
+        output.writeInt(ipAddress.length);
+        output.write(ipAddress);
+        output.writeInt(port);
+    }
+
+    @Override
+    public void readFields(DataInput input) throws IOException {
+        int size = input.readInt();
+        ipAddress = new byte[size];
+        input.readFully(ipAddress);
+        port = input.readInt();
+    }
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/ActivityId.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/ActivityId.java
index af63632..68560e1 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/ActivityId.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/ActivityId.java
@@ -14,12 +14,27 @@
  */
 package edu.uci.ics.hyracks.api.dataflow;
 
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
 import java.io.Serializable;
 
-public final class ActivityId implements Serializable {
+import edu.uci.ics.hyracks.api.io.IWritable;
+
+public final class ActivityId implements IWritable, Serializable {
     private static final long serialVersionUID = 1L;
-    private final OperatorDescriptorId odId;
-    private final int id;
+    private OperatorDescriptorId odId;
+    private int id;
+
+    public static ActivityId create(DataInput dis) throws IOException {
+        ActivityId activityId = new ActivityId();
+        activityId.readFields(dis);
+        return activityId;
+    }
+
+    private ActivityId() {
+
+    }
 
     public ActivityId(OperatorDescriptorId odId, int id) {
         this.odId = odId;
@@ -64,4 +79,16 @@
         }
         throw new IllegalArgumentException("Unable to parse: " + str);
     }
+
+    @Override
+    public void writeFields(DataOutput output) throws IOException {
+        odId.writeFields(output);
+        output.writeInt(id);
+    }
+
+    @Override
+    public void readFields(DataInput input) throws IOException {
+        odId = OperatorDescriptorId.create(input);
+        id = input.readInt();
+    }
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/ConnectorDescriptorId.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/ConnectorDescriptorId.java
index b363556..5190cae 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/ConnectorDescriptorId.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/ConnectorDescriptorId.java
@@ -14,13 +14,28 @@
  */
 package edu.uci.ics.hyracks.api.dataflow;
 
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
 import java.io.Serializable;
 
-public final class ConnectorDescriptorId implements Serializable {
+import edu.uci.ics.hyracks.api.io.IWritable;
+
+public final class ConnectorDescriptorId implements IWritable, Serializable {
     private static final long serialVersionUID = 1L;
 
     private int id;
 
+    public static ConnectorDescriptorId create(DataInput dis) throws IOException {
+        ConnectorDescriptorId connectorDescriptorId = new ConnectorDescriptorId();
+        connectorDescriptorId.readFields(dis);
+        return connectorDescriptorId;
+    }
+
+    private ConnectorDescriptorId() {
+
+    }
+
     public ConnectorDescriptorId(int id) {
         this.id = id;
     }
@@ -50,4 +65,14 @@
     public String toString() {
         return "CDID:" + id;
     }
+
+    @Override
+    public void writeFields(DataOutput output) throws IOException {
+        output.writeInt(id);
+    }
+
+    @Override
+    public void readFields(DataInput input) throws IOException {
+        id = input.readInt();
+    }
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/OperatorDescriptorId.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/OperatorDescriptorId.java
index 0c23465..5351c78 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/OperatorDescriptorId.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/OperatorDescriptorId.java
@@ -14,12 +14,27 @@
  */
 package edu.uci.ics.hyracks.api.dataflow;
 
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
 import java.io.Serializable;
 
-public final class OperatorDescriptorId implements Serializable {
+import edu.uci.ics.hyracks.api.io.IWritable;
+
+public final class OperatorDescriptorId implements IWritable, Serializable {
     private static final long serialVersionUID = 1L;
 
-    private final int id;
+    private int id;
+
+    public static OperatorDescriptorId create(DataInput dis) throws IOException {
+        OperatorDescriptorId operatorDescriptorId = new OperatorDescriptorId();
+        operatorDescriptorId.readFields(dis);
+        return operatorDescriptorId;
+    }
+
+    private OperatorDescriptorId() {
+
+    }
 
     public OperatorDescriptorId(int id) {
         this.id = id;
@@ -57,4 +72,14 @@
         }
         throw new IllegalArgumentException("Unable to parse: " + str);
     }
+
+    @Override
+    public void writeFields(DataOutput output) throws IOException {
+        output.writeInt(id);
+    }
+
+    @Override
+    public void readFields(DataInput input) throws IOException {
+        id = input.readInt();
+    }
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/TaskAttemptId.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/TaskAttemptId.java
index 65fa2e5..2355e98 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/TaskAttemptId.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/TaskAttemptId.java
@@ -14,14 +14,29 @@
  */
 package edu.uci.ics.hyracks.api.dataflow;
 
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
 import java.io.Serializable;
 
-public final class TaskAttemptId implements Serializable {
+import edu.uci.ics.hyracks.api.io.IWritable;
+
+public final class TaskAttemptId implements IWritable, Serializable {
     private static final long serialVersionUID = 1L;
 
-    private final TaskId taskId;
+    private TaskId taskId;
 
-    private final int attempt;
+    private int attempt;
+
+    public static TaskAttemptId create(DataInput dis) throws IOException {
+        TaskAttemptId taskAttemptId = new TaskAttemptId();
+        taskAttemptId.readFields(dis);
+        return taskAttemptId;
+    }
+
+    private TaskAttemptId() {
+
+    }
 
     public TaskAttemptId(TaskId taskId, int attempt) {
         this.taskId = taskId;
@@ -63,4 +78,16 @@
         }
         throw new IllegalArgumentException("Unable to parse: " + str);
     }
+
+    @Override
+    public void writeFields(DataOutput output) throws IOException {
+        taskId.writeFields(output);
+        output.writeInt(attempt);
+    }
+
+    @Override
+    public void readFields(DataInput input) throws IOException {
+        taskId = TaskId.create(input);
+        attempt = input.readInt();
+    }
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/TaskId.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/TaskId.java
index 6d58bd9..6b9eecc 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/TaskId.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/TaskId.java
@@ -14,14 +14,29 @@
  */
 package edu.uci.ics.hyracks.api.dataflow;
 
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
 import java.io.Serializable;
 
-public final class TaskId implements Serializable {
+import edu.uci.ics.hyracks.api.io.IWritable;
+
+public final class TaskId implements IWritable, Serializable {
     private static final long serialVersionUID = 1L;
 
-    private final ActivityId activityId;
+    private ActivityId activityId;
 
-    private final int partition;
+    private int partition;
+
+    public static TaskId create(DataInput dis) throws IOException {
+        TaskId taskId = new TaskId();
+        taskId.readFields(dis);
+        return taskId;
+    }
+
+    private TaskId() {
+
+    }
 
     public TaskId(ActivityId activityId, int partition) {
         this.activityId = activityId;
@@ -63,4 +78,16 @@
         }
         throw new IllegalArgumentException("Unable to parse: " + str);
     }
+
+    @Override
+    public void writeFields(DataOutput output) throws IOException {
+        activityId.writeFields(output);
+        output.writeInt(partition);
+    }
+
+    @Override
+    public void readFields(DataInput input) throws IOException {
+        activityId = ActivityId.create(input);
+        partition = input.readInt();
+    }
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/connectors/ConnectorPolicyFactory.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/connectors/ConnectorPolicyFactory.java
new file mode 100644
index 0000000..8b416da
--- /dev/null
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/connectors/ConnectorPolicyFactory.java
@@ -0,0 +1,66 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.dataflow.connectors;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * @author yingyib
+ */
+public class ConnectorPolicyFactory {
+    public static ConnectorPolicyFactory INSTANCE = new ConnectorPolicyFactory();
+
+    private ConnectorPolicyFactory() {
+
+    }
+
+    public IConnectorPolicy getConnectorPolicy(DataInput input) throws IOException {
+        int kind = input.readInt();
+        switch (kind) {
+            case 0:
+                return new PipeliningConnectorPolicy();
+            case 1:
+                return new SendSideMaterializedBlockingConnectorPolicy();
+            case 2:
+                return new SendSideMaterializedPipeliningConnectorPolicy();
+            case 3:
+                return new SendSideMaterializedReceiveSideMaterializedBlockingConnectorPolicy();
+            case 4:
+                return new SendSideMaterializedReceiveSideMaterializedPipeliningConnectorPolicy();
+            case 5:
+                return new SendSidePipeliningReceiveSideMaterializedBlockingConnectorPolicy();
+        }
+        return null;
+    }
+
+    public void writeConnectorPolicy(IConnectorPolicy policy, DataOutput output) throws IOException {
+        if (policy instanceof PipeliningConnectorPolicy) {
+            output.writeInt(0);
+        } else if (policy instanceof SendSideMaterializedBlockingConnectorPolicy) {
+            output.writeInt(1);
+        } else if (policy instanceof SendSideMaterializedPipeliningConnectorPolicy) {
+            output.writeInt(2);
+        } else if (policy instanceof SendSideMaterializedReceiveSideMaterializedBlockingConnectorPolicy) {
+            output.writeInt(3);
+        } else if (policy instanceof SendSideMaterializedReceiveSideMaterializedPipeliningConnectorPolicy) {
+            output.writeInt(4);
+        } else if (policy instanceof SendSidePipeliningReceiveSideMaterializedBlockingConnectorPolicy) {
+            output.writeInt(5);
+        }
+    }
+
+}
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/connectors/SendSidePipeliningReceiveSideMaterializedBlockingConnectorPolicy.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/connectors/SendSidePipeliningReceiveSideMaterializedBlockingConnectorPolicy.java
new file mode 100644
index 0000000..8beb2f6
--- /dev/null
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/connectors/SendSidePipeliningReceiveSideMaterializedBlockingConnectorPolicy.java
@@ -0,0 +1,26 @@
+package edu.uci.ics.hyracks.api.dataflow.connectors;
+
+public class SendSidePipeliningReceiveSideMaterializedBlockingConnectorPolicy implements IConnectorPolicy {
+    private static final long serialVersionUID = 1L;
+
+    @Override
+    public boolean requiresProducerConsumerCoscheduling() {
+        return true;
+    }
+
+    @Override
+    public boolean consumerWaitsForProducerToFinish() {
+        return false;
+    }
+
+    @Override
+    public boolean materializeOnSendSide() {
+        return false;
+    }
+
+    @Override
+    public boolean materializeOnReceiveSide() {
+        return true;
+    }
+
+}
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/deployment/DeploymentId.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/deployment/DeploymentId.java
index 6eab7a7..f461e52 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/deployment/DeploymentId.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/deployment/DeploymentId.java
@@ -15,17 +15,32 @@
 
 package edu.uci.ics.hyracks.api.deployment;
 
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
 import java.io.Serializable;
 
+import edu.uci.ics.hyracks.api.io.IWritable;
+
 /**
  * The representation of a deployment id
  * 
  * @author yingyib
  */
-public class DeploymentId implements Serializable {
+public class DeploymentId implements IWritable, Serializable {
     private static final long serialVersionUID = 1L;
 
-    private final String deploymentKey;
+    private String deploymentKey;
+
+    public static DeploymentId create(DataInput dis) throws IOException {
+        DeploymentId deploymentId = new DeploymentId();
+        deploymentId.readFields(dis);
+        return deploymentId;
+    }
+
+    private DeploymentId() {
+
+    }
 
     public DeploymentId(String deploymentKey) {
         this.deploymentKey = deploymentKey;
@@ -50,4 +65,14 @@
     public String toString() {
         return deploymentKey;
     }
+
+    @Override
+    public void writeFields(DataOutput output) throws IOException {
+        output.writeUTF(deploymentKey);
+    }
+
+    @Override
+    public void readFields(DataInput input) throws IOException {
+        deploymentKey = input.readUTF();
+    }
 }
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/io/IWritable.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/io/IWritable.java
new file mode 100644
index 0000000..9e7e8c8
--- /dev/null
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/io/IWritable.java
@@ -0,0 +1,30 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.io;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * @author yingyib
+ */
+public interface IWritable {
+
+    public void writeFields(DataOutput output) throws IOException;
+
+    public void readFields(DataInput input) throws IOException;
+
+}
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/ActivityClusterGraph.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/ActivityClusterGraph.java
index e80168b..12c4e6e 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/ActivityClusterGraph.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/ActivityClusterGraph.java
@@ -17,6 +17,7 @@
 import java.io.Serializable;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 import org.json.JSONArray;
@@ -25,6 +26,7 @@
 
 import edu.uci.ics.hyracks.api.dataflow.ActivityId;
 import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
+import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
 import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicyAssignmentPolicy;
 
 public class ActivityClusterGraph implements Serializable {
@@ -50,12 +52,15 @@
 
     private boolean useConnectorPolicyForScheduling;
 
+    private boolean reportTaskDetails;
+
     public ActivityClusterGraph() {
         version = 0;
         activityClusterMap = new HashMap<ActivityClusterId, ActivityCluster>();
         activityMap = new HashMap<ActivityId, ActivityCluster>();
         connectorMap = new HashMap<ConnectorDescriptorId, ActivityCluster>();
         frameSize = 32768;
+        reportTaskDetails = true;
     }
 
     public Map<ActivityId, ActivityCluster> getActivityMap() {
@@ -135,6 +140,24 @@
         this.useConnectorPolicyForScheduling = useConnectorPolicyForScheduling;
     }
 
+    public boolean isReportTaskDetails() {
+        return reportTaskDetails;
+    }
+
+    public void setReportTaskDetails(boolean reportTaskDetails) {
+        this.reportTaskDetails = reportTaskDetails;
+    }
+
+    public List<IConnectorDescriptor> getActivityInputs(ActivityId activityId) {
+        ActivityCluster ac = activityMap.get(activityId);
+        return ac.getActivityInputMap().get(activityId);
+    }
+
+    public ActivityId getProducerActivity(ConnectorDescriptorId cid) {
+        ActivityCluster ac = connectorMap.get(cid);
+        return ac.getProducerActivity(cid);
+    }
+
     public JSONObject toJSON() throws JSONException {
         JSONObject acgj = new JSONObject();
 
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobId.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobId.java
index b8eb61b..c9027ba 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobId.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobId.java
@@ -14,12 +14,26 @@
  */
 package edu.uci.ics.hyracks.api.job;
 
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
 import java.io.Serializable;
 
-public final class JobId implements Serializable {
-    private static final long serialVersionUID = 1L;
+import edu.uci.ics.hyracks.api.io.IWritable;
 
-    private final long id;
+public final class JobId implements IWritable, Serializable {
+    private static final long serialVersionUID = 1L;
+    private long id;
+
+    public static JobId create(DataInput dis) throws IOException {
+        JobId jobId = new JobId();
+        jobId.readFields(dis);
+        return jobId;
+    }
+
+    private JobId() {
+
+    }
 
     public JobId(long id) {
         this.id = id;
@@ -57,4 +71,14 @@
         }
         throw new IllegalArgumentException();
     }
+
+    @Override
+    public void writeFields(DataOutput output) throws IOException {
+        output.writeLong(id);
+    }
+
+    @Override
+    public void readFields(DataInput input) throws IOException {
+        id = input.readLong();
+    }
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobSpecification.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobSpecification.java
index 128978b..19904dd 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobSpecification.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobSpecification.java
@@ -75,6 +75,8 @@
 
     private boolean useConnectorPolicyForScheduling;
 
+    private boolean reportTaskDetails;
+
     private transient int operatorIdCounter;
 
     private transient int connectorIdCounter;
@@ -98,7 +100,8 @@
         operatorIdCounter = 0;
         connectorIdCounter = 0;
         maxReattempts = 2;
-        useConnectorPolicyForScheduling = true;
+        useConnectorPolicyForScheduling = false;
+        reportTaskDetails = true;
         setFrameSize(frameSize);
     }
 
@@ -288,6 +291,14 @@
         this.useConnectorPolicyForScheduling = useConnectorPolicyForScheduling;
     }
 
+    public boolean isReportTaskDetails() {
+        return reportTaskDetails;
+    }
+
+    public void setReportTaskDetails(boolean reportTaskDetails) {
+        this.reportTaskDetails = reportTaskDetails;
+    }
+
     private <K, V> void insertIntoIndexedMap(Map<K, List<V>> map, K key, int index, V value) {
         List<V> vList = map.get(key);
         if (vList == null) {
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/partitions/PartitionId.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/partitions/PartitionId.java
index 2ff71d5..c7e01e6 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/partitions/PartitionId.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/partitions/PartitionId.java
@@ -14,21 +14,35 @@
  */
 package edu.uci.ics.hyracks.api.partitions;
 
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
 import java.io.Serializable;
 
 import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
+import edu.uci.ics.hyracks.api.io.IWritable;
 import edu.uci.ics.hyracks.api.job.JobId;
 
-public final class PartitionId implements Serializable {
+public final class PartitionId implements IWritable, Serializable {
     private static final long serialVersionUID = 1L;
 
-    private final JobId jobId;
+    private JobId jobId;
 
-    private final ConnectorDescriptorId cdId;
+    private ConnectorDescriptorId cdId;
 
-    private final int senderIndex;
+    private int senderIndex;
 
-    private final int receiverIndex;
+    private int receiverIndex;
+
+    public static PartitionId create(DataInput dis) throws IOException {
+        PartitionId partitionId = new PartitionId();
+        partitionId.readFields(dis);
+        return partitionId;
+    }
+
+    private PartitionId() {
+
+    }
 
     public PartitionId(JobId jobId, ConnectorDescriptorId cdId, int senderIndex, int receiverIndex) {
         this.jobId = jobId;
@@ -94,4 +108,20 @@
     public String toString() {
         return jobId.toString() + ":" + cdId + ":" + senderIndex + ":" + receiverIndex;
     }
+
+    @Override
+    public void writeFields(DataOutput output) throws IOException {
+        cdId.writeFields(output);
+        jobId.writeFields(output);
+        output.writeInt(receiverIndex);
+        output.writeInt(senderIndex);
+    }
+
+    @Override
+    public void readFields(DataInput input) throws IOException {
+        cdId = ConnectorDescriptorId.create(input);
+        jobId = JobId.create(input);
+        receiverIndex = input.readInt();
+        senderIndex = input.readInt();
+    }
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/stats/Counters.java b/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/stats/Counters.java
index da30e20..08284cc 100644
--- a/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/stats/Counters.java
+++ b/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/stats/Counters.java
@@ -22,6 +22,8 @@
 
     public static final String MEMORY_USAGE = "heap-used-sizes";
 
+    public static final String MEMORY_MAX = "heap-max-sizes";
+
     public static final String NETWORK_IO_READ = "net-payload-bytes-read";
 
     public static final String NETWORK_IO_WRITE = "net-payload-bytes-written";
diff --git a/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/stats/impl/ClientCounterContext.java b/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/stats/impl/ClientCounterContext.java
index c39cba7..62bb943 100644
--- a/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/stats/impl/ClientCounterContext.java
+++ b/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/stats/impl/ClientCounterContext.java
@@ -15,6 +15,7 @@
 package edu.uci.ics.hyracks.client.stats.impl;
 
 import java.io.BufferedReader;
+import java.io.IOException;
 import java.io.InputStreamReader;
 import java.net.HttpURLConnection;
 import java.net.URL;
@@ -39,7 +40,7 @@
  */
 public class ClientCounterContext implements IClusterCounterContext {
     private static String[] RESET_COUNTERS = { Counters.NETWORK_IO_READ, Counters.NETWORK_IO_WRITE,
-            Counters.MEMORY_USAGE, Counters.DISK_READ, Counters.DISK_WRITE, Counters.NUM_PROCESSOR };
+            Counters.MEMORY_USAGE, Counters.MEMORY_MAX, Counters.DISK_READ, Counters.DISK_WRITE, Counters.NUM_PROCESSOR };
     private static String[] AGG_COUNTERS = { Counters.SYSTEM_LOAD };
     private static int UPDATE_INTERVAL = 10000;
 
@@ -135,7 +136,7 @@
                 }
             }
         } catch (Exception e) {
-            throw new IllegalStateException(e);
+            //ignore
         }
     }
 
@@ -173,16 +174,24 @@
         } else if (counterObject instanceof JSONArray) {
             JSONArray jArray = (JSONArray) counterObject;
             Object[] values = jArray.toArray();
+            /**
+             * use the last non-zero value as the counter value
+             */
             for (Object value : values) {
                 if (value instanceof Double) {
                     Double dValue = (Double) value;
-                    counterValue += dValue.doubleValue();
+                    double currentVal = dValue.doubleValue();
+                    if (currentVal != 0) {
+                        counterValue = (long) currentVal;
+                    }
                 } else if (value instanceof Long) {
                     Long lValue = (Long) value;
-                    counterValue += lValue.longValue();
+                    long currentVal = lValue.longValue();
+                    if (currentVal != 0) {
+                        counterValue = lValue.longValue();
+                    }
                 }
             }
-            counterValue /= values.length;
         } else {
             Long val = (Long) counterObject;
             counterValue = val.longValue();
@@ -215,7 +224,11 @@
             in.close();
             return response.toString();
         } catch (Exception e) {
-            throw new IllegalStateException(e);
+            if (!(e instanceof java.net.ConnectException || e instanceof IOException)) {
+                throw new IllegalStateException(e);
+            } else {
+                return "";
+            }
         }
     }
 
diff --git a/hyracks/hyracks-client/src/test/java/edu/uci/ics/hyracks/client/stats/ClientCounterContextTest.java b/hyracks/hyracks-client/src/test/java/edu/uci/ics/hyracks/client/stats/ClientCounterContextTest.java
index bbf212f..2ba2631 100644
--- a/hyracks/hyracks-client/src/test/java/edu/uci/ics/hyracks/client/stats/ClientCounterContextTest.java
+++ b/hyracks/hyracks-client/src/test/java/edu/uci/ics/hyracks/client/stats/ClientCounterContextTest.java
@@ -33,8 +33,9 @@
         synchronized (this) {
             wait(20000);
         }
-        String[] counters = { Counters.MEMORY_USAGE, Counters.NETWORK_IO_READ, Counters.NETWORK_IO_WRITE,
-                Counters.SYSTEM_LOAD, Counters.NUM_PROCESSOR, Counters.DISK_READ, Counters.DISK_WRITE };
+        String[] counters = { Counters.MEMORY_USAGE, Counters.MEMORY_MAX, Counters.NETWORK_IO_READ,
+                Counters.NETWORK_IO_WRITE, Counters.SYSTEM_LOAD, Counters.NUM_PROCESSOR, Counters.DISK_READ,
+                Counters.DISK_WRITE };
         for (String counterName : counters) {
             ICounter counter = ccContext.getCounter(counterName, false);
             System.out.println(counterName + ": " + counter.get());
diff --git a/hyracks/hyracks-comm/src/main/java/edu/uci/ics/hyracks/comm/channels/NetworkInputChannel.java b/hyracks/hyracks-comm/src/main/java/edu/uci/ics/hyracks/comm/channels/NetworkInputChannel.java
index ffb1ace..c5cb7d0 100644
--- a/hyracks/hyracks-comm/src/main/java/edu/uci/ics/hyracks/comm/channels/NetworkInputChannel.java
+++ b/hyracks/hyracks-comm/src/main/java/edu/uci/ics/hyracks/comm/channels/NetworkInputChannel.java
@@ -95,9 +95,7 @@
         }
         ccb.getReadInterface().setFullBufferAcceptor(new ReadFullBufferAcceptor());
         ccb.getWriteInterface().setEmptyBufferAcceptor(new WriteEmptyBufferAcceptor());
-        for (int i = 0; i < nBuffers; ++i) {
-            ccb.getReadInterface().getEmptyBufferAcceptor().accept(ctx.allocateFrame());
-        }
+        ccb.getReadInterface().setBufferFactory(new ReadBufferFactory(nBuffers, ctx), nBuffers, ctx.getFrameSize());
         ByteBuffer writeBuffer = ByteBuffer.allocate(INITIAL_MESSAGE_SIZE);
         writeBuffer.putLong(partitionId.getJobId().getId());
         writeBuffer.putInt(partitionId.getConnectorDescriptorId().getId());
diff --git a/hyracks/hyracks-comm/src/main/java/edu/uci/ics/hyracks/comm/channels/NetworkOutputChannel.java b/hyracks/hyracks-comm/src/main/java/edu/uci/ics/hyracks/comm/channels/NetworkOutputChannel.java
index af1f8f6..b573b73 100644
--- a/hyracks/hyracks-comm/src/main/java/edu/uci/ics/hyracks/comm/channels/NetworkOutputChannel.java
+++ b/hyracks/hyracks-comm/src/main/java/edu/uci/ics/hyracks/comm/channels/NetworkOutputChannel.java
@@ -32,6 +32,10 @@
 
     private boolean aborted;
 
+    private int frameSize = 32768;
+
+    private int allocateCounter = 0;
+
     public NetworkOutputChannel(ChannelControlBlock ccb, int nBuffers) {
         this.ccb = ccb;
         this.nBuffers = nBuffers;
@@ -40,9 +44,7 @@
     }
 
     public void setFrameSize(int frameSize) {
-        for (int i = 0; i < nBuffers; ++i) {
-            emptyStack.push(ByteBuffer.allocateDirect(frameSize));
-        }
+        this.frameSize = frameSize;
     }
 
     @Override
@@ -58,6 +60,10 @@
                     throw new HyracksDataException("Connection has been aborted");
                 }
                 destBuffer = emptyStack.poll();
+                if (destBuffer == null && allocateCounter < nBuffers) {
+                    destBuffer = ByteBuffer.allocateDirect(frameSize);
+                    allocateCounter++;
+                }
                 if (destBuffer != null) {
                     break;
                 }
diff --git a/hyracks/hyracks-comm/src/main/java/edu/uci/ics/hyracks/comm/channels/ReadBufferFactory.java b/hyracks/hyracks-comm/src/main/java/edu/uci/ics/hyracks/comm/channels/ReadBufferFactory.java
new file mode 100644
index 0000000..c59398c
--- /dev/null
+++ b/hyracks/hyracks-comm/src/main/java/edu/uci/ics/hyracks/comm/channels/ReadBufferFactory.java
@@ -0,0 +1,51 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.comm.channels;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.context.IHyracksCommonContext;
+import edu.uci.ics.hyracks.net.protocols.muxdemux.IBufferFactory;
+
+/**
+ * @author yingyib
+ */
+public class ReadBufferFactory implements IBufferFactory { // lazily allocates read frames for a network channel, up to a fixed limit
+
+    private final int limit; // maximum number of buffers this factory may ever hand out
+    private final int frameSize; // size in bytes of each buffer, taken from the context's frame size
+    private int counter = 0; // number of buffers created so far
+
+    public ReadBufferFactory(int limit, IHyracksCommonContext ctx) {
+        this.limit = limit;
+        this.frameSize = ctx.getFrameSize();
+    }
+
+    @Override
+    public ByteBuffer createBuffer() { // returns a fresh frame, or null once the limit has been reached
+        try {
+            if (counter >= limit) {
+                return null;
+            } else {
+                ByteBuffer frame = ByteBuffer.allocate(frameSize); // NOTE(review): heap buffer here vs. allocateDirect on the send side (NetworkOutputChannel) — confirm intentional
+                counter++;
+                return frame;
+            }
+        } catch (Exception e) {
+            throw new IllegalStateException(e); // NOTE(review): allocate throws only unchecked exceptions, so this wrapper is likely dead code
+        }
+    }
+
+}
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
index c994dfb..e0bc9e2 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
@@ -84,8 +84,8 @@
 import edu.uci.ics.hyracks.control.common.controllers.CCConfig;
 import edu.uci.ics.hyracks.control.common.deployment.DeploymentRun;
 import edu.uci.ics.hyracks.control.common.ipc.CCNCFunctions;
-import edu.uci.ics.hyracks.control.common.ipc.CCNCFunctions.StateDumpResponseFunction;
 import edu.uci.ics.hyracks.control.common.ipc.CCNCFunctions.Function;
+import edu.uci.ics.hyracks.control.common.ipc.CCNCFunctions.StateDumpResponseFunction;
 import edu.uci.ics.hyracks.control.common.logs.LogFile;
 import edu.uci.ics.hyracks.control.common.work.IPCResponder;
 import edu.uci.ics.hyracks.control.common.work.IResultCallback;
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java
index ad4744b..dbd64a7 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java
@@ -30,11 +30,13 @@
 import org.json.JSONException;
 import org.json.JSONObject;
 
+import edu.uci.ics.hyracks.api.comm.NetworkAddress;
 import edu.uci.ics.hyracks.api.constraints.Constraint;
 import edu.uci.ics.hyracks.api.constraints.expressions.LValueConstraintExpression;
 import edu.uci.ics.hyracks.api.constraints.expressions.PartitionLocationExpression;
 import edu.uci.ics.hyracks.api.dataflow.ActivityId;
 import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
+import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
 import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
 import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
 import edu.uci.ics.hyracks.api.dataflow.TaskId;
@@ -338,29 +340,45 @@
         tcAttempt.initializePendingTaskCounter();
         tcAttempts.add(tcAttempt);
 
-        /* TODO - Further improvement for reducing messages -- not yet complete.
+        /**
+         * Improvement for reducing master/slave message communications, for each TaskAttemptDescriptor,
+         * we set the NetworkAddress[][] partitionLocations, in which each row is for an incoming connector descriptor
+         * and each column is for an input channel of the connector.
+         */
         for (Map.Entry<String, List<TaskAttemptDescriptor>> e : taskAttemptMap.entrySet()) {
             List<TaskAttemptDescriptor> tads = e.getValue();
             for (TaskAttemptDescriptor tad : tads) {
-                TaskId tid = tad.getTaskAttemptId().getTaskId();
+                TaskAttemptId taid = tad.getTaskAttemptId();
+                int attempt = taid.getAttempt();
+                TaskId tid = taid.getTaskId();
                 ActivityId aid = tid.getActivityId();
-                List<IConnectorDescriptor> inConnectors = jag.getActivityInputConnectorDescriptors(aid);
+                List<IConnectorDescriptor> inConnectors = acg.getActivityInputs(aid);
                 int[] inPartitionCounts = tad.getInputPartitionCounts();
-                NetworkAddress[][] partitionLocations = new NetworkAddress[inPartitionCounts.length][];
-                for (int i = 0; i < inPartitionCounts.length; ++i) {
-                    ConnectorDescriptorId cdId = inConnectors.get(i).getConnectorId();
-                    ActivityId producerAid = jag.getProducerActivity(cdId);
-                    partitionLocations[i] = new NetworkAddress[inPartitionCounts[i]];
-                    for (int j = 0; j < inPartitionCounts[i]; ++j) {
-                        TaskId producerTaskId = new TaskId(producerAid, j);
-                        String nodeId = findTaskLocation(producerTaskId);
-                        partitionLocations[i][j] = ccs.getNodeMap().get(nodeId).getDataPort();
+                if (inPartitionCounts != null) {
+                    NetworkAddress[][] partitionLocations = new NetworkAddress[inPartitionCounts.length][];
+                    for (int i = 0; i < inPartitionCounts.length; ++i) {
+                        ConnectorDescriptorId cdId = inConnectors.get(i).getConnectorId();
+                        IConnectorPolicy policy = jobRun.getConnectorPolicyMap().get(cdId);
+                        /**
+                         * carry sender location information into a task
+                         * when it is not the case that it is an re-attempt and the send-side
+                         * is materialized blocking.
+                         */
+                        if (!(attempt > 0 && policy.materializeOnSendSide() && policy
+                                .consumerWaitsForProducerToFinish())) {
+                            ActivityId producerAid = acg.getProducerActivity(cdId);
+                            partitionLocations[i] = new NetworkAddress[inPartitionCounts[i]];
+                            for (int j = 0; j < inPartitionCounts[i]; ++j) {
+                                TaskId producerTaskId = new TaskId(producerAid, j);
+                                String nodeId = findTaskLocation(producerTaskId);
+                                partitionLocations[i][j] = ccs.getNodeMap().get(nodeId).getDataPort();
+                            }
+                        }
                     }
+                    tad.setInputPartitionLocations(partitionLocations);
                 }
-                tad.setInputPartitionLocations(partitionLocations);
             }
         }
-        */
 
         tcAttempt.setStatus(TaskClusterAttempt.TaskClusterStatus.RUNNING);
         tcAttempt.setStartTime(System.currentTimeMillis());
@@ -442,24 +460,25 @@
         final ActivityClusterGraph acg = jobRun.getActivityClusterGraph();
         final Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies = new HashMap<ConnectorDescriptorId, IConnectorPolicy>(
                 jobRun.getConnectorPolicyMap());
-        for (Map.Entry<String, List<TaskAttemptDescriptor>> entry : taskAttemptMap.entrySet()) {
-            String nodeId = entry.getKey();
-            final List<TaskAttemptDescriptor> taskDescriptors = entry.getValue();
-            final NodeControllerState node = ccs.getNodeMap().get(nodeId);
-            if (node != null) {
-                node.getActiveJobIds().add(jobRun.getJobId());
-                boolean changed = jobRun.getParticipatingNodeIds().add(nodeId);
-                if (LOGGER.isLoggable(Level.FINE)) {
-                    LOGGER.fine("Starting: " + taskDescriptors + " at " + entry.getKey());
-                }
-                try {
-                    byte[] jagBytes = changed ? JavaSerializationUtils.serialize(acg) : null;
+        try {
+            byte[] acgBytes = JavaSerializationUtils.serialize(acg);
+            for (Map.Entry<String, List<TaskAttemptDescriptor>> entry : taskAttemptMap.entrySet()) {
+                String nodeId = entry.getKey();
+                final List<TaskAttemptDescriptor> taskDescriptors = entry.getValue();
+                final NodeControllerState node = ccs.getNodeMap().get(nodeId);
+                if (node != null) {
+                    node.getActiveJobIds().add(jobRun.getJobId());
+                    boolean changed = jobRun.getParticipatingNodeIds().add(nodeId);
+                    if (LOGGER.isLoggable(Level.FINE)) {
+                        LOGGER.fine("Starting: " + taskDescriptors + " at " + entry.getKey());
+                    }
+                    byte[] jagBytes = changed ? acgBytes : null;
                     node.getNodeController().startTasks(deploymentId, jobId, jagBytes, taskDescriptors,
                             connectorPolicies, jobRun.getFlags());
-                } catch (Exception e) {
-                    e.printStackTrace();
                 }
             }
+        } catch (Exception e) {
+            throw new HyracksException(e);
         }
     }
 
@@ -702,10 +721,16 @@
         ccs.getActiveRunMap().remove(jobId);
         ccs.getRunMapArchive().put(jobId, run);
         ccs.getRunHistory().put(jobId, run.getExceptions());
-        try {
-            ccs.getJobLogFile().log(createJobLogObject(run));
-        } catch (Exception e) {
-            throw new RuntimeException(e);
+
+        if (run.getActivityClusterGraph().isReportTaskDetails()) {
+            /**
+             * log job details when task-profiling is enabled
+             */
+            try {
+                ccs.getJobLogFile().log(createJobLogObject(run));
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
         }
     }
 
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobCleanupWork.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobCleanupWork.java
index 340134e..5c9aad5 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobCleanupWork.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobCleanupWork.java
@@ -105,10 +105,16 @@
         ccs.getActiveRunMap().remove(jobId);
         ccs.getRunMapArchive().put(jobId, run);
         ccs.getRunHistory().put(jobId, run.getExceptions());
-        try {
-            ccs.getJobLogFile().log(createJobLogObject(run));
-        } catch (Exception e) {
-            throw new RuntimeException(e);
+
+        if (run.getActivityClusterGraph().isReportTaskDetails()) {
+            /**
+             * log job details when profiling is enabled
+             */
+            try {
+                ccs.getJobLogFile().log(createJobLogObject(run));
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
         }
     }
 
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobletCleanupNotificationWork.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobletCleanupNotificationWork.java
index 2d6bdea..9f1c4b2 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobletCleanupNotificationWork.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobletCleanupNotificationWork.java
@@ -18,11 +18,7 @@
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
-import org.json.JSONException;
-import org.json.JSONObject;
-
 import edu.uci.ics.hyracks.api.exceptions.HyracksException;
-import edu.uci.ics.hyracks.api.job.ActivityClusterGraph;
 import edu.uci.ics.hyracks.api.job.JobId;
 import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
 import edu.uci.ics.hyracks.control.cc.NodeControllerState;
@@ -71,24 +67,6 @@
             ccs.getActiveRunMap().remove(jobId);
             ccs.getRunMapArchive().put(jobId, run);
             ccs.getRunHistory().put(jobId, run.getExceptions());
-            try {
-                ccs.getJobLogFile().log(createJobLogObject(run));
-            } catch (Exception e) {
-                throw new RuntimeException(e);
-            }
-
         }
     }
-
-    private JSONObject createJobLogObject(final JobRun run) {
-        JSONObject jobLogObject = new JSONObject();
-        try {
-            ActivityClusterGraph acg = run.getActivityClusterGraph();
-            jobLogObject.put("activity-cluster-graph", acg.toJSON());
-            jobLogObject.put("job-run", run.toJSON());
-        } catch (JSONException e) {
-            throw new RuntimeException(e);
-        }
-        return jobLogObject;
-    }
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/NCConfig.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/NCConfig.java
index 01525e4..74e9710 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/NCConfig.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/NCConfig.java
@@ -49,6 +49,9 @@
     @Option(name = "-net-thread-count", usage = "Number of threads to use for Network I/O (default: 1)")
     public int nNetThreads = 1;
 
+    @Option(name = "-net-buffer-count", usage = "Number of network buffers per input/output channel (default:1)", required = false)
+    public int nNetBuffers = 1;
+
     @Option(name = "-max-memory", usage = "Maximum memory usable at this Node Controller in bytes (default: -1 auto)")
     public int maxMemory = -1;
 
@@ -84,6 +87,8 @@
         cList.add(ioDevices);
         cList.add("-net-thread-count");
         cList.add(String.valueOf(nNetThreads));
+        cList.add("-net-buffer-count");
+        cList.add(String.valueOf(nNetBuffers));
         cList.add("-max-memory");
         cList.add(String.valueOf(maxMemory));
         cList.add("-result-time-to-live");
@@ -113,6 +118,7 @@
         configuration.put("data-ip-address", dataIPAddress);
         configuration.put("iodevices", ioDevices);
         configuration.put("net-thread-count", String.valueOf(nNetThreads));
+        configuration.put("net-buffer-count", String.valueOf(nNetBuffers));
         configuration.put("max-memory", String.valueOf(maxMemory));
         configuration.put("result-time-to-live", String.valueOf(resultTTL));
         configuration.put("result-sweep-threshold", String.valueOf(resultSweepThreshold));
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/deployment/DeploymentUtils.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/deployment/DeploymentUtils.java
index 2407c10..a39a159 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/deployment/DeploymentUtils.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/deployment/DeploymentUtils.java
@@ -28,7 +28,7 @@
 import org.apache.http.HttpResponse;
 import org.apache.http.client.HttpClient;
 import org.apache.http.client.methods.HttpGet;
-import org.apache.http.impl.client.DefaultHttpClient;
+import org.apache.http.impl.client.HttpClientBuilder;
 
 import edu.uci.ics.hyracks.api.application.IApplicationContext;
 import edu.uci.ics.hyracks.api.deployment.DeploymentId;
@@ -200,7 +200,7 @@
                     String filePath = deploymentDir + File.separator + fileName;
                     File targetFile = new File(filePath);
                     if (isNC) {
-                        HttpClient hc = new DefaultHttpClient();
+                        HttpClient hc = HttpClientBuilder.create().build();
                         HttpGet get = new HttpGet(url.toString());
                         HttpResponse response = hc.execute(get);
                         InputStream is = response.getEntity().getContent();
@@ -216,6 +216,7 @@
                 }
                 return downloadedFileURLs;
             } catch (Exception e) {
+                e.printStackTrace();
                 trace = e;
             }
         }
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/heartbeat/HeartbeatData.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/heartbeat/HeartbeatData.java
index e999913..d574435 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/heartbeat/HeartbeatData.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/heartbeat/HeartbeatData.java
@@ -14,10 +14,11 @@
  */
 package edu.uci.ics.hyracks.control.common.heartbeat;
 
-import java.io.Serializable;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
 
-public class HeartbeatData implements Serializable {
-    private static final long serialVersionUID = 1L;
+public class HeartbeatData {
 
     public long heapInitSize;
     public long heapUsedSize;
@@ -47,4 +48,83 @@
     public long ipcMessageBytesReceived;
     public long diskReads;
     public long diskWrites;
+
+    public void readFields(DataInput dis) throws IOException { // field order must mirror write(DataOutput) exactly
+        heapInitSize = dis.readLong();
+        heapUsedSize = dis.readLong();
+        heapCommittedSize = dis.readLong();
+        heapMaxSize = dis.readLong();
+        nonheapInitSize = dis.readLong();
+        nonheapUsedSize = dis.readLong();
+        nonheapCommittedSize = dis.readLong();
+        nonheapMaxSize = dis.readLong();
+        threadCount = dis.readInt();
+        peakThreadCount = dis.readInt();
+        totalStartedThreadCount = dis.readLong();
+        systemLoadAverage = dis.readDouble();
+        netPayloadBytesRead = dis.readLong();
+        netPayloadBytesWritten = dis.readLong();
+        netSignalingBytesRead = dis.readLong();
+        netSignalingBytesWritten = dis.readLong();
+        datasetNetPayloadBytesRead = dis.readLong(); // FIX: was a duplicated read into netSignalingBytesWritten, which desynced every later field from write()
+        datasetNetPayloadBytesWritten = dis.readLong();
+        datasetNetSignalingBytesRead = dis.readLong();
+        datasetNetSignalingBytesWritten = dis.readLong();
+        ipcMessagesSent = dis.readLong();
+        ipcMessageBytesSent = dis.readLong();
+        ipcMessagesReceived = dis.readLong();
+        ipcMessageBytesReceived = dis.readLong();
+        diskReads = dis.readLong();
+        diskWrites = dis.readLong();
+
+        int gcCounts = dis.readInt();
+        gcCollectionCounts = new long[gcCounts];
+        for (int i = 0; i < gcCollectionCounts.length; i++) {
+            gcCollectionCounts[i] = dis.readLong();
+        }
+        int gcTimeCounts = dis.readInt();
+        gcCollectionTimes = new long[gcTimeCounts];
+        for (int i = 0; i < gcCollectionTimes.length; i++) {
+            gcCollectionTimes[i] = dis.readLong();
+        }
+    }
+
+    public void write(DataOutput dos) throws IOException { // serialization order is the contract: readFields must read in this exact order
+        dos.writeLong(heapInitSize);
+        dos.writeLong(heapUsedSize);
+        dos.writeLong(heapCommittedSize);
+        dos.writeLong(heapMaxSize);
+        dos.writeLong(nonheapInitSize);
+        dos.writeLong(nonheapUsedSize);
+        dos.writeLong(nonheapCommittedSize);
+        dos.writeLong(nonheapMaxSize);
+        dos.writeInt(threadCount);
+        dos.writeInt(peakThreadCount);
+        dos.writeLong(totalStartedThreadCount);
+        dos.writeDouble(systemLoadAverage);
+        dos.writeLong(netPayloadBytesRead);
+        dos.writeLong(netPayloadBytesWritten);
+        dos.writeLong(netSignalingBytesRead);
+        dos.writeLong(netSignalingBytesWritten);
+        dos.writeLong(datasetNetPayloadBytesRead);
+        dos.writeLong(datasetNetPayloadBytesWritten);
+        dos.writeLong(datasetNetSignalingBytesRead);
+        dos.writeLong(datasetNetSignalingBytesWritten);
+        dos.writeLong(ipcMessagesSent);
+        dos.writeLong(ipcMessageBytesSent);
+        dos.writeLong(ipcMessagesReceived);
+        dos.writeLong(ipcMessageBytesReceived);
+        dos.writeLong(diskReads);
+        dos.writeLong(diskWrites);
+
+        dos.writeInt(gcCollectionCounts.length); // length-prefixed so readFields can size the arrays
+        for (int i = 0; i < gcCollectionCounts.length; i++) {
+            dos.writeLong(gcCollectionCounts[i]);
+        }
+        dos.writeInt(gcCollectionTimes.length);
+        for (int i = 0; i < gcCollectionTimes.length; i++) {
+            dos.writeLong(gcCollectionTimes[i]);
+        }
+    }
+
+
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/CCNCFunctions.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/CCNCFunctions.java
index 6be2294..1417df9 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/CCNCFunctions.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/CCNCFunctions.java
@@ -23,9 +23,12 @@
 import java.io.Serializable;
 import java.net.URL;
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.EnumSet;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
@@ -36,6 +39,7 @@
 import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
 import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
 import edu.uci.ics.hyracks.api.dataflow.TaskId;
+import edu.uci.ics.hyracks.api.dataflow.connectors.ConnectorPolicyFactory;
 import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicy;
 import edu.uci.ics.hyracks.api.dataset.ResultSetId;
 import edu.uci.ics.hyracks.api.deployment.DeploymentId;
@@ -176,10 +180,10 @@
     public static class NotifyTaskCompleteFunction extends Function {
         private static final long serialVersionUID = 1L;
 
-        private final JobId jobId;
-        private final TaskAttemptId taskId;
-        private final String nodeId;
-        private final TaskProfile statistics;
+        private JobId jobId;
+        private TaskAttemptId taskId;
+        private String nodeId;
+        private TaskProfile statistics;
 
         public NotifyTaskCompleteFunction(JobId jobId, TaskAttemptId taskId, String nodeId, TaskProfile statistics) {
             this.jobId = jobId;
@@ -208,6 +212,26 @@
         public TaskProfile getStatistics() {
             return statistics;
         }
+
+        public static Object deserialize(ByteBuffer buffer, int length) throws Exception {
+            ByteArrayInputStream bais = new ByteArrayInputStream(buffer.array(), buffer.position(), length);
+            DataInputStream dis = new DataInputStream(bais);
+
+            JobId jobId = JobId.create(dis);
+            String nodeId = dis.readUTF();
+            TaskAttemptId taskId = TaskAttemptId.create(dis);
+            TaskProfile statistics = TaskProfile.create(dis);
+            return new NotifyTaskCompleteFunction(jobId, taskId, nodeId, statistics);
+        }
+
+        public static void serialize(OutputStream out, Object object) throws Exception {
+            NotifyTaskCompleteFunction fn = (NotifyTaskCompleteFunction) object;
+            DataOutputStream dos = new DataOutputStream(out);
+            fn.jobId.writeFields(dos);
+            dos.writeUTF(fn.nodeId);
+            fn.taskId.writeFields(dos);
+            fn.statistics.writeFields(dos);
+        }
     }
 
     public static class NotifyTaskFailureFunction extends Function {
@@ -270,6 +294,23 @@
         public String getNodeId() {
             return nodeId;
         }
+
+        public static Object deserialize(ByteBuffer buffer, int length) throws Exception {
+            ByteArrayInputStream bais = new ByteArrayInputStream(buffer.array(), buffer.position(), length);
+            DataInputStream dis = new DataInputStream(bais);
+
+            JobId jobId = JobId.create(dis);
+            String nodeId = dis.readUTF();
+
+            return new NotifyJobletCleanupFunction(jobId, nodeId);
+        }
+
+        public static void serialize(OutputStream out, Object object) throws Exception {
+            NotifyJobletCleanupFunction fn = (NotifyJobletCleanupFunction) object;
+            DataOutputStream dos = new DataOutputStream(out);
+            fn.jobId.writeFields(dos);
+            dos.writeUTF(fn.nodeId);
+        }
     }
 
     public static class NodeHeartbeatFunction extends Function {
@@ -295,6 +336,23 @@
         public HeartbeatData getHeartbeatData() {
             return hbData;
         }
+
+        public static Object deserialize(ByteBuffer buffer, int length) throws Exception {
+            ByteArrayInputStream bais = new ByteArrayInputStream(buffer.array(), buffer.position(), length);
+            DataInputStream dis = new DataInputStream(bais);
+
+            String nodeId = dis.readUTF();
+            HeartbeatData hbData = new HeartbeatData();
+            hbData.readFields(dis);
+            return new NodeHeartbeatFunction(nodeId, hbData);
+        }
+
+        public static void serialize(OutputStream out, Object object) throws Exception {
+            NodeHeartbeatFunction fn = (NodeHeartbeatFunction) object;
+            DataOutputStream dos = new DataOutputStream(out);
+            dos.writeUTF(fn.nodeId);
+            fn.hbData.write(dos);
+        }
     }
 
     public static class ReportProfileFunction extends Function {
@@ -650,6 +708,90 @@
         public EnumSet<JobFlag> getFlags() {
             return flags;
         }
+
+        public static Object deserialize(ByteBuffer buffer, int length) throws Exception {
+            ByteArrayInputStream bais = new ByteArrayInputStream(buffer.array(), buffer.position(), length);
+            DataInputStream dis = new DataInputStream(bais);
+
+            //read jobId and taskId
+            JobId jobId = JobId.create(dis);
+            DeploymentId deploymentId = null;
+            boolean hasDeployed = dis.readBoolean();
+            if (hasDeployed) {
+                deploymentId = DeploymentId.create(dis);
+            }
+
+            // read plan bytes
+            int planBytesSize = dis.readInt();
+            byte[] planBytes = null;
+            if (planBytesSize >= 0) {
+                planBytes = new byte[planBytesSize];
+                dis.read(planBytes, 0, planBytesSize);
+            }
+
+            // read task attempt descriptors
+            int tadSize = dis.readInt();
+            List<TaskAttemptDescriptor> taskDescriptors = new ArrayList<TaskAttemptDescriptor>();
+            for (int i = 0; i < tadSize; i++) {
+                TaskAttemptDescriptor tad = TaskAttemptDescriptor.create(dis);
+                taskDescriptors.add(tad);
+            }
+
+            //read connector policies
+            int cpSize = dis.readInt();
+            Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies = new HashMap<ConnectorDescriptorId, IConnectorPolicy>();
+            for (int i = 0; i < cpSize; i++) {
+                ConnectorDescriptorId cid = ConnectorDescriptorId.create(dis);
+                IConnectorPolicy policy = ConnectorPolicyFactory.INSTANCE.getConnectorPolicy(dis);
+                connectorPolicies.put(cid, policy);
+            }
+
+            // read flags
+            int flagSize = dis.readInt();
+            EnumSet<JobFlag> flags = EnumSet.noneOf(JobFlag.class);
+            for (int i = 0; i < flagSize; i++) {
+                flags.add(JobFlag.values()[(dis.readInt())]);
+            }
+
+            return new StartTasksFunction(deploymentId, jobId, planBytes, taskDescriptors, connectorPolicies, flags);
+        }
+
+        public static void serialize(OutputStream out, Object object) throws Exception {
+            StartTasksFunction fn = (StartTasksFunction) object;
+            DataOutputStream dos = new DataOutputStream(out);
+
+            //write jobId and deploymentId
+            fn.jobId.writeFields(dos);
+            dos.writeBoolean(fn.deploymentId == null ? false : true);
+            if (fn.deploymentId != null) {
+                fn.deploymentId.writeFields(dos);
+            }
+
+            //write plan bytes
+            dos.writeInt(fn.planBytes == null ? -1 : fn.planBytes.length);
+            if (fn.planBytes != null) {
+                dos.write(fn.planBytes, 0, fn.planBytes.length);
+            }
+
+            //write task descriptors
+            dos.writeInt(fn.taskDescriptors.size());
+            for (int i = 0; i < fn.taskDescriptors.size(); i++) {
+                fn.taskDescriptors.get(i).writeFields(dos);
+            }
+
+            //write connector policies
+            dos.writeInt(fn.connectorPolicies.size());
+            for (Entry<ConnectorDescriptorId, IConnectorPolicy> entry : fn.connectorPolicies.entrySet()) {
+                entry.getKey().writeFields(dos);
+                ConnectorPolicyFactory.INSTANCE.writeConnectorPolicy(entry.getValue(), dos);
+            }
+
+            //write flags
+            dos.writeInt(fn.flags.size());
+            for (JobFlag flag : fn.flags) {
+                dos.writeInt(flag.ordinal());
+            }
+        }
     }
 
     public static class AbortTasksFunction extends Function {
@@ -700,6 +842,23 @@
         public JobStatus getStatus() {
             return status;
         }
+
+        public static Object deserialize(ByteBuffer buffer, int length) throws Exception {
+            ByteArrayInputStream bais = new ByteArrayInputStream(buffer.array(), buffer.position(), length);
+            DataInputStream dis = new DataInputStream(bais);
+
+            JobId jobId = JobId.create(dis);
+            JobStatus status = JobStatus.values()[dis.readInt()];
+
+            return new CleanupJobletFunction(jobId, status);
+        }
+
+        public static void serialize(OutputStream out, Object object) throws Exception {
+            CleanupJobletFunction fn = (CleanupJobletFunction) object;
+            DataOutputStream dos = new DataOutputStream(out);
+            fn.jobId.writeFields(dos);
+            dos.writeInt(fn.status.ordinal());
+        }
     }
 
     public static class GetNodeControllersInfoFunction extends Function {
@@ -978,6 +1137,25 @@
                 case REPORT_PARTITION_AVAILABILITY:
                     ReportPartitionAvailabilityFunction.serialize(out, object);
                     return;
+
+                case NODE_HEARTBEAT:
+                    NodeHeartbeatFunction.serialize(out, object);
+                    return;
+
+                case START_TASKS:
+                    StartTasksFunction.serialize(out, object);
+                    return;
+
+                case NOTIFY_TASK_COMPLETE:
+                    NotifyTaskCompleteFunction.serialize(out, object);
+                    return;
+
+                case NOTIFY_JOBLET_CLEANUP:
+                    NotifyJobletCleanupFunction.serialize(out, object);
+                    return;
+                case CLEANUP_JOBLET:
+                    CleanupJobletFunction.serialize(out, object);
+                    return;
             }
             JavaSerializationBasedPayloadSerializerDeserializer.serialize(out, object);
         }
@@ -992,6 +1170,21 @@
 
                 case REPORT_PARTITION_AVAILABILITY:
                     return ReportPartitionAvailabilityFunction.deserialize(buffer, length);
+
+                case NODE_HEARTBEAT:
+                    return NodeHeartbeatFunction.deserialize(buffer, length);
+
+                case START_TASKS:
+                    return StartTasksFunction.deserialize(buffer, length);
+
+                case NOTIFY_TASK_COMPLETE:
+                    return NotifyTaskCompleteFunction.deserialize(buffer, length);
+
+                case NOTIFY_JOBLET_CLEANUP:
+                    return NotifyJobletCleanupFunction.deserialize(buffer, length);
+
+                case CLEANUP_JOBLET:
+                    return CleanupJobletFunction.deserialize(buffer, length);
             }
 
             return javaSerde.deserializeObject(buffer, length);
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/TaskAttemptDescriptor.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/TaskAttemptDescriptor.java
index 6018132..43b39d3 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/TaskAttemptDescriptor.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/TaskAttemptDescriptor.java
@@ -14,25 +14,39 @@
  */
 package edu.uci.ics.hyracks.control.common.job;
 
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
 import java.io.Serializable;
 import java.util.Arrays;
 
 import edu.uci.ics.hyracks.api.comm.NetworkAddress;
 import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
+import edu.uci.ics.hyracks.api.io.IWritable;
 
-public class TaskAttemptDescriptor implements Serializable {
+public class TaskAttemptDescriptor implements IWritable, Serializable {
     private static final long serialVersionUID = 1L;
 
-    private final TaskAttemptId taId;
+    private TaskAttemptId taId;
 
-    private final int nPartitions;
+    private int nPartitions;
 
-    private final int[] nInputPartitions;
+    private int[] nInputPartitions;
 
-    private final int[] nOutputPartitions;
+    private int[] nOutputPartitions;
 
     private NetworkAddress[][] inputPartitionLocations;
 
+    public static TaskAttemptDescriptor create(DataInput dis) throws IOException {
+        TaskAttemptDescriptor taskAttemptDescriptor = new TaskAttemptDescriptor();
+        taskAttemptDescriptor.readFields(dis);
+        return taskAttemptDescriptor;
+    }
+
+    private TaskAttemptDescriptor() {
+
+    }
+
     public TaskAttemptDescriptor(TaskAttemptId taId, int nPartitions, int[] nInputPartitions, int[] nOutputPartitions) {
         this.taId = taId;
         this.nPartitions = nPartitions;
@@ -70,4 +84,74 @@
                 + Arrays.toString(nInputPartitions) + ", nOutputPartitions = " + Arrays.toString(nOutputPartitions)
                 + "]";
     }
+
+    @Override
+    public void writeFields(DataOutput output) throws IOException {
+        taId.writeFields(output);
+        output.writeInt(nPartitions);
+
+        output.writeInt(nInputPartitions == null ? -1 : nInputPartitions.length);
+        if (nInputPartitions != null) {
+            for (int i = 0; i < nInputPartitions.length; i++) {
+                output.writeInt(nInputPartitions[i]);
+            }
+        }
+
+        output.writeInt(nOutputPartitions == null ? -1 : nOutputPartitions.length);
+        if (nOutputPartitions != null) {
+            for (int i = 0; i < nOutputPartitions.length; i++) {
+                output.writeInt(nOutputPartitions[i]);
+            }
+        }
+
+        output.writeInt(inputPartitionLocations == null ? -1 : inputPartitionLocations.length);
+        if (inputPartitionLocations != null) {
+            for (int i = 0; i < inputPartitionLocations.length; i++) {
+                if (inputPartitionLocations[i] != null) {
+                    output.writeInt(inputPartitionLocations[i].length);
+                    for (int j = 0; j < inputPartitionLocations[i].length; j++) {
+                        inputPartitionLocations[i][j].writeFields(output);
+                    }
+                } else {
+                    output.writeInt(-1);
+                }
+            }
+        }
+    }
+
+    @Override
+    public void readFields(DataInput input) throws IOException {
+        taId = TaskAttemptId.create(input);
+        nPartitions = input.readInt();
+
+        int inputCount = input.readInt();
+        if (inputCount >= 0) {
+            nInputPartitions = new int[inputCount];
+            for (int i = 0; i < nInputPartitions.length; i++) {
+                nInputPartitions[i] = input.readInt();
+            }
+        }
+
+        int outputCount = input.readInt();
+        if (outputCount >= 0) {
+            nOutputPartitions = new int[outputCount];
+            for (int i = 0; i < nOutputPartitions.length; i++) {
+                nOutputPartitions[i] = input.readInt();
+            }
+        }
+
+        int addrCount = input.readInt();
+        if (addrCount >= 0) {
+            inputPartitionLocations = new NetworkAddress[addrCount][];
+            for (int i = 0; i < inputPartitionLocations.length; i++) {
+                int columns = input.readInt();
+                if (columns >= 0) {
+                    inputPartitionLocations[i] = new NetworkAddress[columns];
+                    for (int j = 0; j < columns; j++) {
+                        inputPartitionLocations[i][j] = NetworkAddress.create(input);
+                    }
+                }
+            }
+        }
+    }
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/profiling/counters/MultiResolutionEventProfiler.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/profiling/counters/MultiResolutionEventProfiler.java
index 2080718..b8b90cb 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/profiling/counters/MultiResolutionEventProfiler.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/profiling/counters/MultiResolutionEventProfiler.java
@@ -14,12 +14,17 @@
  */
 package edu.uci.ics.hyracks.control.common.job.profiling.counters;
 
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
 import java.io.Serializable;
 
-public class MultiResolutionEventProfiler implements Serializable {
+import edu.uci.ics.hyracks.api.io.IWritable;
+
+public class MultiResolutionEventProfiler implements IWritable, Serializable {
     private static final long serialVersionUID = 1L;
 
-    private final int[] times;
+    private int[] times;
 
     private long offset;
 
@@ -29,6 +34,16 @@
 
     private int eventCounter;
 
+    public static MultiResolutionEventProfiler create(DataInput dis) throws IOException {
+        MultiResolutionEventProfiler multiResolutionEventProfiler = new MultiResolutionEventProfiler();
+        multiResolutionEventProfiler.readFields(dis);
+        return multiResolutionEventProfiler;
+    }
+
+    private MultiResolutionEventProfiler() {
+
+    }
+
     public MultiResolutionEventProfiler(int nSamples) {
         times = new int[nSamples];
         offset = -1;
@@ -78,4 +93,29 @@
     public long getOffset() {
         return offset;
     }
+
+    @Override
+    public void writeFields(DataOutput output) throws IOException {
+        output.writeInt(eventCounter);
+        output.writeLong(offset);
+        output.writeInt(ptr);
+        output.writeInt(resolution);
+        output.writeInt(times.length);
+        for (int i = 0; i < times.length; i++) {
+            output.writeInt(times[i]);
+        }
+    }
+
+    @Override
+    public void readFields(DataInput input) throws IOException {
+        eventCounter = input.readInt();
+        offset = input.readLong();
+        ptr = input.readInt();
+        resolution = input.readInt();
+        int nSamples = input.readInt();
+        times = new int[nSamples];
+        for (int i = 0; i < times.length; i++) {
+            times[i] = input.readInt();
+        }
+    }
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/profiling/om/AbstractProfile.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/profiling/om/AbstractProfile.java
index 2cb4191..12cd4b1 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/profiling/om/AbstractProfile.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/profiling/om/AbstractProfile.java
@@ -14,18 +14,24 @@
  */
 package edu.uci.ics.hyracks.control.common.job.profiling.om;
 
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
 import java.io.Serializable;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Map.Entry;
 
 import org.json.JSONArray;
 import org.json.JSONException;
 import org.json.JSONObject;
 
-public abstract class AbstractProfile implements Serializable {
+import edu.uci.ics.hyracks.api.io.IWritable;
+
+public abstract class AbstractProfile implements IWritable, Serializable {
     private static final long serialVersionUID = 1L;
 
-    protected final Map<String, Long> counters;
+    protected Map<String, Long> counters;
 
     public AbstractProfile() {
         counters = new HashMap<String, Long>();
@@ -51,4 +57,24 @@
     protected void merge(AbstractProfile profile) {
         counters.putAll(profile.counters);
     }
+
+    @Override
+    public void writeFields(DataOutput output) throws IOException {
+        output.writeInt(counters.size());
+        for (Entry<String, Long> entry : counters.entrySet()) {
+            output.writeUTF(entry.getKey());
+            output.writeLong(entry.getValue());
+        }
+    }
+
+    @Override
+    public void readFields(DataInput input) throws IOException {
+        int size = input.readInt();
+        counters = new HashMap<String, Long>();
+        for (int i = 0; i < size; i++) {
+            String key = input.readUTF();
+            long value = input.readLong();
+            counters.put(key, value);
+        }
+    }
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/profiling/om/JobProfile.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/profiling/om/JobProfile.java
index a941187..a3f7e41 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/profiling/om/JobProfile.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/profiling/om/JobProfile.java
@@ -14,8 +14,12 @@
  */
 package edu.uci.ics.hyracks.control.common.job.profiling.om;
 
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Map.Entry;
 
 import org.json.JSONArray;
 import org.json.JSONException;
@@ -26,9 +30,19 @@
 public class JobProfile extends AbstractProfile {
     private static final long serialVersionUID = 1L;
 
-    private final JobId jobId;
+    private JobId jobId;
 
-    private final Map<String, JobletProfile> jobletProfiles;
+    private Map<String, JobletProfile> jobletProfiles;
+
+    public static JobProfile create(DataInput dis) throws IOException {
+        JobProfile jobProfile = new JobProfile();
+        jobProfile.readFields(dis);
+        return jobProfile;
+    }
+
+    private JobProfile() {
+
+    }
 
     public JobProfile(JobId jobId) {
         this.jobId = jobId;
@@ -68,4 +82,26 @@
             }
         }
     }
+
+    @Override
+    public void readFields(DataInput input) throws IOException {
+        jobId = JobId.create(input);
+        int size = input.readInt();
+        jobletProfiles = new HashMap<String, JobletProfile>();
+        for (int i = 0; i < size; i++) {
+            String key = input.readUTF();
+            JobletProfile value = JobletProfile.create(input);
+            jobletProfiles.put(key, value);
+        }
+    }
+
+    @Override
+    public void writeFields(DataOutput output) throws IOException {
+        jobId.writeFields(output);
+        output.writeInt(jobletProfiles.size());
+        for (Entry<String, JobletProfile> entry : jobletProfiles.entrySet()) {
+            output.writeUTF(entry.getKey());
+            entry.getValue().writeFields(output);
+        }
+    }
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/profiling/om/JobletProfile.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/profiling/om/JobletProfile.java
index 16d08d7..a879873 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/profiling/om/JobletProfile.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/profiling/om/JobletProfile.java
@@ -14,8 +14,12 @@
  */
 package edu.uci.ics.hyracks.control.common.job.profiling.om;
 
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Map.Entry;
 
 import org.json.JSONArray;
 import org.json.JSONException;
@@ -26,9 +30,19 @@
 public class JobletProfile extends AbstractProfile {
     private static final long serialVersionUID = 1L;
 
-    private final String nodeId;
+    private String nodeId;
 
-    private final Map<TaskAttemptId, TaskProfile> taskProfiles;
+    private Map<TaskAttemptId, TaskProfile> taskProfiles;
+
+    public static JobletProfile create(DataInput dis) throws IOException {
+        JobletProfile jobletProfile = new JobletProfile();
+        jobletProfile.readFields(dis);
+        return jobletProfile;
+    }
+
+    private JobletProfile() {
+
+    }
 
     public JobletProfile(String nodeId) {
         this.nodeId = nodeId;
@@ -68,4 +82,28 @@
             }
         }
     }
+
+    @Override
+    public void readFields(DataInput input) throws IOException {
+        super.readFields(input);
+        nodeId = input.readUTF();
+        int size = input.readInt();
+        taskProfiles = new HashMap<TaskAttemptId, TaskProfile>();
+        for (int i = 0; i < size; i++) {
+            TaskAttemptId key = TaskAttemptId.create(input);
+            TaskProfile value = TaskProfile.create(input);
+            taskProfiles.put(key, value);
+        }
+    }
+
+    @Override
+    public void writeFields(DataOutput output) throws IOException {
+        super.writeFields(output);
+        output.writeUTF(nodeId);
+        output.writeInt(taskProfiles.size());
+        for (Entry<TaskAttemptId, TaskProfile> entry : taskProfiles.entrySet()) {
+            entry.getKey().writeFields(output);
+            entry.getValue().writeFields(output);
+        }
+    }
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/profiling/om/PartitionProfile.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/profiling/om/PartitionProfile.java
index a9cc979..3fc456d 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/profiling/om/PartitionProfile.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/profiling/om/PartitionProfile.java
@@ -14,21 +14,35 @@
  */
 package edu.uci.ics.hyracks.control.common.job.profiling.om;
 
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
 import java.io.Serializable;
 
+import edu.uci.ics.hyracks.api.io.IWritable;
 import edu.uci.ics.hyracks.api.partitions.PartitionId;
 import edu.uci.ics.hyracks.control.common.job.profiling.counters.MultiResolutionEventProfiler;
 
-public class PartitionProfile implements Serializable {
+public class PartitionProfile implements IWritable, Serializable {
     private static final long serialVersionUID = 1L;
 
-    private final PartitionId pid;
+    private PartitionId pid;
 
-    private final long openTime;
+    private long openTime;
 
-    private final long closeTime;
+    private long closeTime;
 
-    private final MultiResolutionEventProfiler mrep;
+    private MultiResolutionEventProfiler mrep;
+
+    public static PartitionProfile create(DataInput dis) throws IOException {
+        PartitionProfile partitionProfile = new PartitionProfile();
+        partitionProfile.readFields(dis);
+        return partitionProfile;
+    }
+
+    private PartitionProfile() {
+
+    }
 
     public PartitionProfile(PartitionId pid, long openTime, long closeTime, MultiResolutionEventProfiler mrep) {
         this.pid = pid;
@@ -52,4 +66,20 @@
     public MultiResolutionEventProfiler getSamples() {
         return mrep;
     }
+
+    @Override
+    public void writeFields(DataOutput output) throws IOException {
+        output.writeLong(closeTime);
+        output.writeLong(openTime);
+        mrep.writeFields(output);
+        pid.writeFields(output);
+    }
+
+    @Override
+    public void readFields(DataInput input) throws IOException {
+        closeTime = input.readLong();
+        openTime = input.readLong();
+        mrep = MultiResolutionEventProfiler.create(input);
+        pid = PartitionId.create(input);
+    }
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/profiling/om/TaskProfile.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/profiling/om/TaskProfile.java
index 6918af4..8774a50 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/profiling/om/TaskProfile.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/profiling/om/TaskProfile.java
@@ -14,8 +14,12 @@
  */
 package edu.uci.ics.hyracks.control.common.job.profiling.om;
 
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Map.Entry;
 
 import org.json.JSONArray;
 import org.json.JSONException;
@@ -28,9 +32,19 @@
 public class TaskProfile extends AbstractProfile {
     private static final long serialVersionUID = 1L;
 
-    private final TaskAttemptId taskAttemptId;
+    private TaskAttemptId taskAttemptId;
 
-    private final Map<PartitionId, PartitionProfile> partitionSendProfile;
+    private Map<PartitionId, PartitionProfile> partitionSendProfile;
+
+    public static TaskProfile create(DataInput dis) throws IOException {
+        TaskProfile taskProfile = new TaskProfile();
+        taskProfile.readFields(dis);
+        return taskProfile;
+    }
+
+    private TaskProfile() {
+
+    }
 
     public TaskProfile(TaskAttemptId taskAttemptId, Map<PartitionId, PartitionProfile> partitionSendProfile) {
         this.taskAttemptId = taskAttemptId;
@@ -84,4 +98,28 @@
 
         return json;
     }
+
+    @Override
+    public void readFields(DataInput input) throws IOException {
+        super.readFields(input);
+        taskAttemptId = TaskAttemptId.create(input);
+        int size = input.readInt();
+        partitionSendProfile = new HashMap<PartitionId, PartitionProfile>();
+        for (int i = 0; i < size; i++) {
+            PartitionId key = PartitionId.create(input);
+            PartitionProfile value = PartitionProfile.create(input);
+            partitionSendProfile.put(key, value);
+        }
+    }
+
+    @Override
+    public void writeFields(DataOutput output) throws IOException {
+        super.writeFields(output);
+        taskAttemptId.writeFields(output);
+        output.writeInt(partitionSendProfile.size());
+        for (Entry<PartitionId, PartitionProfile> entry : partitionSendProfile.entrySet()) {
+            entry.getKey().writeFields(output);
+            entry.getValue().writeFields(output);
+        }
+    }
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
index 56b6654..89c5b75 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
@@ -267,6 +267,10 @@
         return globalJobData;
     }
 
+    public IJobletEventListener getJobletEventListener() {
+        return jobletEventListener;
+    }
+
     public synchronized void advertisePartitionRequest(TaskAttemptId taId, Collection<PartitionId> pids,
             IPartitionCollector collector, PartitionState minState) throws Exception {
         for (PartitionId pid : pids) {
@@ -283,10 +287,6 @@
         }
     }
 
-    public IJobletEventListener getJobletEventListener() {
-        return jobletEventListener;
-    }
-
     public void cleanup(JobStatus status) {
         cleanupStatus = status;
         cleanupPending = true;
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
index f0b570e..5eec7bb 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
@@ -168,7 +168,8 @@
             throw new Exception("id not set");
         }
         partitionManager = new PartitionManager(this);
-        netManager = new NetworkManager(getIpAddress(ncConfig.dataIPAddress), partitionManager, ncConfig.nNetThreads);
+        netManager = new NetworkManager(getIpAddress(ncConfig.dataIPAddress), partitionManager, ncConfig.nNetThreads,
+                ncConfig.nNetBuffers);
 
         lccm = new LifeCycleComponentManager();
         queue = new WorkQueue();
@@ -243,7 +244,7 @@
         datasetPartitionManager = new DatasetPartitionManager(this, executor, ncConfig.resultManagerMemory,
                 ncConfig.resultTTL, ncConfig.resultSweepThreshold);
         datasetNetworkManager = new DatasetNetworkManager(getIpAddress(ncConfig.datasetIPAddress),
-                datasetPartitionManager, ncConfig.nNetThreads);
+                datasetPartitionManager, ncConfig.nNetThreads, ncConfig.nNetBuffers);
     }
 
     @Override
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java
index fa9b6b3..3014024 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java
@@ -28,6 +28,7 @@
 import edu.uci.ics.hyracks.api.comm.IFrameReader;
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
 import edu.uci.ics.hyracks.api.comm.IPartitionCollector;
+import edu.uci.ics.hyracks.api.comm.PartitionChannel;
 import edu.uci.ics.hyracks.api.context.IHyracksJobletContext;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
@@ -89,7 +90,10 @@
 
     private NodeControllerService ncs;
 
-    public Task(Joblet joblet, TaskAttemptId taskId, String displayName, Executor executor, NodeControllerService ncs) {
+    private List<List<PartitionChannel>> inputChannelsFromConnectors;
+
+    public Task(Joblet joblet, TaskAttemptId taskId, String displayName, Executor executor, NodeControllerService ncs,
+            List<List<PartitionChannel>> inputChannelsFromConnectors) {
         this.joblet = joblet;
         this.taskAttemptId = taskId;
         this.displayName = displayName;
@@ -102,6 +106,7 @@
         pendingThreads = new LinkedHashSet<Thread>();
         exceptions = new ArrayList<>();
         this.ncs = ncs;
+        this.inputChannelsFromConnectors = inputChannelsFromConnectors;
     }
 
     public void setTaskRuntime(IPartitionCollector[] collectors, IOperatorNodePushable operator) {
@@ -113,7 +118,7 @@
     public ByteBuffer allocateFrame() throws HyracksDataException {
         return joblet.allocateFrame();
     }
-    
+
     @Override
     public void deallocateFrames(int frameCount) {
         joblet.deallocateFrames(frameCount);
@@ -242,7 +247,7 @@
                         final int cIdx = i;
                         executor.execute(new Runnable() {
                             @Override
-							public void run() {
+                            public void run() {
                                 if (aborted) {
                                     return;
                                 }
@@ -252,7 +257,7 @@
                                 thread.setName(displayName + ":" + taskAttemptId + ":" + cIdx);
                                 thread.setPriority(Thread.MIN_PRIORITY);
                                 try {
-                                    pushFrames(collector, writer);
+                                    pushFrames(collector, inputChannelsFromConnectors.get(cIdx), writer);
                                 } catch (HyracksDataException e) {
                                     synchronized (Task.this) {
                                         exceptions.add(e);
@@ -266,7 +271,7 @@
                         });
                     }
                     try {
-                        pushFrames(collectors[0], operator.getInputFrameWriter(0));
+                        pushFrames(collectors[0], inputChannelsFromConnectors.get(0), operator.getInputFrameWriter(0));
                     } finally {
                         sem.acquire(collectors.length - 1);
                     }
@@ -293,15 +298,20 @@
         }
     }
 
-    private void pushFrames(IPartitionCollector collector, IFrameWriter writer) throws HyracksDataException {
+    private void pushFrames(IPartitionCollector collector, List<PartitionChannel> inputChannels, IFrameWriter writer)
+            throws HyracksDataException {
         if (aborted) {
             return;
         }
         try {
             collector.open();
             try {
-                joblet.advertisePartitionRequest(taskAttemptId, collector.getRequiredPartitionIds(), collector,
-                        PartitionState.STARTED);
+                if (inputChannels.size() <= 0) {
+                    joblet.advertisePartitionRequest(taskAttemptId, collector.getRequiredPartitionIds(), collector,
+                            PartitionState.STARTED);
+                } else {
+                    collector.addPartitions(inputChannels);
+                }
                 IFrameReader reader = collector.getReader();
                 reader.open();
                 try {
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/io/profiling/IOCounterLinux.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/io/profiling/IOCounterLinux.java
index 1e8baa1..804f61d 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/io/profiling/IOCounterLinux.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/io/profiling/IOCounterLinux.java
@@ -16,17 +16,18 @@
 package edu.uci.ics.hyracks.control.nc.io.profiling;
 
 import java.io.BufferedReader;
+import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStreamReader;
 import java.util.StringTokenizer;
 
 public class IOCounterLinux implements IIOCounter {
     public static final String COMMAND = "iostat";
-    public static final String COMMAND2 = "cat /proc/self/io";
+    public static final String STATFILE = "/proc/self/io";
     public static final int PAGE_SIZE = 4096;
 
-    private final long baseReads;
-    private final long baseWrites;
+    private long baseReads = 0;
+    private long baseWrites = 0;
 
     public IOCounterLinux() {
         baseReads = getReads();
@@ -36,12 +37,12 @@
     @Override
     public long getReads() {
         try {
-            long reads = extractColumn(4);
-            return reads - baseReads;
+            long reads = extractRow(4);
+            return reads;
         } catch (IOException e) {
             try {
-                long reads = extractRow(4);
-                return reads / PAGE_SIZE;
+                long reads = extractColumn(4) * PAGE_SIZE;
+                return reads - baseReads;
             } catch (IOException e2) {
                 return 0;
             }
@@ -51,13 +52,13 @@
     @Override
     public long getWrites() {
         try {
-            long writes = extractColumn(5);
-            return writes - baseWrites;
+            long writes = extractRow(5);
+            long cancelledWrites = extractRow(6);
+            return (writes - cancelledWrites);
         } catch (IOException e) {
             try {
-                long writes = extractRow(5);
-                long cancelledWrites = extractRow(6);
-                return (writes - cancelledWrites) / PAGE_SIZE;
+                long writes = extractColumn(5) * PAGE_SIZE;
+                return writes - baseWrites;
             } catch (IOException e2) {
                 return 0;
             }
@@ -92,7 +93,7 @@
     }
 
     private long extractRow(int rowIndex) throws IOException {
-        BufferedReader reader = exec(COMMAND2);
+        BufferedReader reader = new BufferedReader(new InputStreamReader(new FileInputStream(STATFILE)));
         String line = null;
         long ios = 0;
         int i = 0;
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/DatasetNetworkManager.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/DatasetNetworkManager.java
index 130c967..348a37c5 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/DatasetNetworkManager.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/DatasetNetworkManager.java
@@ -48,11 +48,14 @@
 
     private final MuxDemux md;
 
+    private final int nBuffers;
+
     private NetworkAddress networkAddress;
 
-    public DatasetNetworkManager(InetAddress inetAddress, IDatasetPartitionManager partitionManager, int nThreads)
-            throws IOException {
+    public DatasetNetworkManager(InetAddress inetAddress, IDatasetPartitionManager partitionManager, int nThreads,
+            int nBuffers) throws IOException {
         this.partitionManager = partitionManager;
+        this.nBuffers = nBuffers;
         md = new MuxDemux(new InetSocketAddress(inetAddress, 0), new ChannelOpenListener(), nThreads,
                 MAX_CONNECTION_ATTEMPTS);
     }
@@ -102,7 +105,7 @@
                 LOGGER.fine("Received initial dataset partition read request for JobId: " + jobId + " partition: "
                         + partition + " on channel: " + ccb);
             }
-            noc = new NetworkOutputChannel(ccb, 1);
+            noc = new NetworkOutputChannel(ccb, nBuffers);
             try {
                 partitionManager.initializeDatasetPartitionReader(jobId, rsId, partition, noc);
             } catch (HyracksException e) {
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkManager.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkManager.java
index 4d9cd22..8791aa1 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkManager.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkManager.java
@@ -47,12 +47,16 @@
 
     private final PartitionManager partitionManager;
 
+    private final int nBuffers;
+
     private final MuxDemux md;
 
     private NetworkAddress networkAddress;
 
-    public NetworkManager(InetAddress inetAddress, PartitionManager partitionManager, int nThreads) throws IOException {
+    public NetworkManager(InetAddress inetAddress, PartitionManager partitionManager, int nThreads, int nBuffers)
+            throws IOException {
         this.partitionManager = partitionManager;
+        this.nBuffers = nBuffers;
         md = new MuxDemux(new InetSocketAddress(inetAddress, 0), new ChannelOpenListener(), nThreads,
                 MAX_CONNECTION_ATTEMPTS);
     }
@@ -99,10 +103,11 @@
             if (LOGGER.isLoggable(Level.FINE)) {
                 LOGGER.fine("Received initial partition request: " + pid + " on channel: " + ccb);
             }
-            noc = new NetworkOutputChannel(ccb, 1);
+            noc = new NetworkOutputChannel(ccb, nBuffers);
             try {
                 partitionManager.registerPartitionRequest(pid, noc);
             } catch (HyracksException e) {
+                e.printStackTrace();
                 noc.abort();
             }
         }
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializedPartition.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializedPartition.java
index 433c45a..6c4570d 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializedPartition.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializedPartition.java
@@ -50,7 +50,9 @@
 
     @Override
     public void deallocate() {
-        partitionFile.delete();
+        if (partitionFile != null) {
+            partitionFile.delete();
+        }
     }
 
     @Override
@@ -59,6 +61,11 @@
             @Override
             public void run() {
                 try {
+                    if (partitionFile == null) {
+                        writer.open();
+                        writer.close();
+                        return;
+                    }
                     IFileHandle fh = ioManager.open(partitionFile, IIOManager.FileReadWriteMode.READ_ONLY,
                             IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC);
                     try {
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializedPartitionWriter.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializedPartitionWriter.java
index 0e9005a..b94d714 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializedPartitionWriter.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializedPartitionWriter.java
@@ -65,15 +65,17 @@
         if (LOGGER.isLoggable(Level.INFO)) {
             LOGGER.info("open(" + pid + " by " + taId);
         }
-        fRef = manager.getFileFactory().createUnmanagedWorkspaceFile(pid.toString());
-        handle = ctx.getIOManager().open(fRef, IIOManager.FileReadWriteMode.READ_WRITE,
-                IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC);
-        size = 0;
         failed = false;
     }
 
     @Override
     public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+        if (handle == null) {
+            fRef = manager.getFileFactory().createUnmanagedWorkspaceFile(pid.toString());
+            handle = ctx.getIOManager().open(fRef, IIOManager.FileReadWriteMode.READ_WRITE,
+                    IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC);
+            size = 0;
+        }
         size += ctx.getIOManager().syncWrite(handle, size, buffer);
     }
 
@@ -87,11 +89,14 @@
         if (LOGGER.isLoggable(Level.INFO)) {
             LOGGER.info("close(" + pid + " by " + taId);
         }
-        ctx.getIOManager().close(handle);
+        if (handle != null) {
+            ctx.getIOManager().close(handle);
+        }
         if (!failed) {
             manager.registerPartition(pid, taId,
                     new MaterializedPartition(ctx, fRef, executor, (IOManager) ctx.getIOManager()),
-                    PartitionState.COMMITTED);
+                    PartitionState.COMMITTED, taId.getAttempt() == 0 ? false : true);
+
         }
     }
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializingPipelinedPartition.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializingPipelinedPartition.java
index 0e63485..6393979 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializingPipelinedPartition.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializingPipelinedPartition.java
@@ -73,7 +73,9 @@
 
     @Override
     public void deallocate() {
-        fRef.delete();
+        if (fRef != null) {
+            fRef.delete();
+        }
     }
 
     @Override
@@ -82,47 +84,56 @@
             @Override
             public void run() {
                 try {
-                    IFileHandle fh = ioManager.open(fRef, IIOManager.FileReadWriteMode.READ_ONLY,
-                            IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC);
+                    synchronized (MaterializingPipelinedPartition.this) {
+                        while (fRef == null && eos == false) {
+                            MaterializingPipelinedPartition.this.wait();
+                        }
+                    }
+                    IFileHandle fh = fRef == null ? null : ioManager.open(fRef,
+                            IIOManager.FileReadWriteMode.READ_ONLY, IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC);
                     try {
                         writer.open();
                         try {
-                            long offset = 0;
-                            ByteBuffer buffer = ctx.allocateFrame();
-                            boolean fail = false;
-                            boolean done = false;
-                            while (!fail && !done) {
-                                synchronized (MaterializingPipelinedPartition.this) {
-                                    while (offset >= size && !eos && !failed) {
-                                        try {
-                                            MaterializingPipelinedPartition.this.wait();
-                                        } catch (InterruptedException e) {
-                                            throw new HyracksDataException(e);
+                            if (fh != null) {
+                                long offset = 0;
+                                ByteBuffer buffer = ctx.allocateFrame();
+                                boolean fail = false;
+                                boolean done = false;
+                                while (!fail && !done) {
+                                    synchronized (MaterializingPipelinedPartition.this) {
+                                        while (offset >= size && !eos && !failed) {
+                                            try {
+                                                MaterializingPipelinedPartition.this.wait();
+                                            } catch (InterruptedException e) {
+                                                throw new HyracksDataException(e);
+                                            }
                                         }
+                                        fail = failed;
+                                        done = eos && offset >= size;
                                     }
-                                    fail = failed;
-                                    done = eos && offset >= size;
-                                }
-                                if (fail) {
-                                    writer.fail();
-                                } else if (!done) {
-                                    buffer.clear();
-                                    long readLen = ioManager.syncRead(fh, offset, buffer);
-                                    if (readLen < buffer.capacity()) {
-                                        throw new HyracksDataException("Premature end of file");
+                                    if (fail) {
+                                        writer.fail();
+                                    } else if (!done) {
+                                        buffer.clear();
+                                        long readLen = ioManager.syncRead(fh, offset, buffer);
+                                        if (readLen < buffer.capacity()) {
+                                            throw new HyracksDataException("Premature end of file");
+                                        }
+                                        offset += readLen;
+                                        buffer.flip();
+                                        writer.nextFrame(buffer);
                                     }
-                                    offset += readLen;
-                                    buffer.flip();
-                                    writer.nextFrame(buffer);
                                 }
                             }
                         } finally {
                             writer.close();
                         }
                     } finally {
-                        ioManager.close(fh);
+                        if (fh != null) {
+                            ioManager.close(fh);
+                        }
                     }
-                } catch (HyracksDataException e) {
+                } catch (Exception e) {
                     throw new RuntimeException(e);
                 }
             }
@@ -139,17 +150,23 @@
         if (LOGGER.isLoggable(Level.INFO)) {
             LOGGER.info("open(" + pid + " by " + taId);
         }
-        fRef = manager.getFileFactory().createUnmanagedWorkspaceFile(pid.toString().replace(":", "$"));
-        handle = ctx.getIOManager().open(fRef, IIOManager.FileReadWriteMode.READ_WRITE,
-                IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC);
         size = 0;
         eos = false;
         failed = false;
-        manager.registerPartition(pid, taId, this, PartitionState.STARTED);
+        manager.registerPartition(pid, taId, this, PartitionState.STARTED, false);
+    }
+
+    private void checkOrCreateFile() throws HyracksDataException {
+        if (fRef == null) {
+            fRef = manager.getFileFactory().createUnmanagedWorkspaceFile(pid.toString().replace(":", "$"));
+            handle = ctx.getIOManager().open(fRef, IIOManager.FileReadWriteMode.READ_WRITE,
+                    IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC);
+        }
     }
 
     @Override
     public synchronized void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+        checkOrCreateFile();
         size += ctx.getIOManager().syncWrite(handle, size, buffer);
         notifyAll();
     }
@@ -165,16 +182,13 @@
         if (LOGGER.isLoggable(Level.INFO)) {
             LOGGER.info("close(" + pid + " by " + taId);
         }
-        boolean commit = false;
         synchronized (this) {
             eos = true;
-            ctx.getIOManager().close(handle);
+            if (handle != null) {
+                ctx.getIOManager().close(handle);
+            }
             handle = null;
-            commit = !failed;
             notifyAll();
         }
-        if (commit) {
-            manager.updatePartitionState(pid, taId, this, PartitionState.COMMITTED);
-        }
     }
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/PartitionManager.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/PartitionManager.java
index ea966c7..b209cc1 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/PartitionManager.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/PartitionManager.java
@@ -37,51 +37,68 @@
 import edu.uci.ics.hyracks.control.nc.resources.DefaultDeallocatableRegistry;
 
 public class PartitionManager {
+
     private final NodeControllerService ncs;
 
-    private final Map<PartitionId, List<IPartition>> partitionMap;
+    private final Map<PartitionId, List<IPartition>> availablePartitionMap;
 
     private final DefaultDeallocatableRegistry deallocatableRegistry;
 
     private final IWorkspaceFileFactory fileFactory;
 
+    private final Map<PartitionId, NetworkOutputChannel> partitionRequests = new HashMap<PartitionId, NetworkOutputChannel>();
+
     public PartitionManager(NodeControllerService ncs) {
         this.ncs = ncs;
-        partitionMap = new HashMap<PartitionId, List<IPartition>>();
-        deallocatableRegistry = new DefaultDeallocatableRegistry();
-        fileFactory = new WorkspaceFileFactory(deallocatableRegistry, (IOManager) ncs.getRootContext().getIOManager());
+        this.availablePartitionMap = new HashMap<PartitionId, List<IPartition>>();
+        this.deallocatableRegistry = new DefaultDeallocatableRegistry();
+        this.fileFactory = new WorkspaceFileFactory(deallocatableRegistry, (IOManager) ncs.getRootContext()
+                .getIOManager());
     }
 
-    public void registerPartition(PartitionId pid, TaskAttemptId taId, IPartition partition, PartitionState state)
-            throws HyracksDataException {
-        synchronized (this) {
-            List<IPartition> pList = partitionMap.get(pid);
+    public synchronized void registerPartition(PartitionId pid, TaskAttemptId taId, IPartition partition,
+            PartitionState state, boolean updateToCC) throws HyracksDataException {
+        try {
+            /**
+             * process pending requests
+             */
+            NetworkOutputChannel writer = partitionRequests.remove(pid);
+            if (writer != null) {
+                writer.setFrameSize(partition.getTaskContext().getFrameSize());
+                partition.writeTo(writer);
+                if (!partition.isReusable()) {
+                    return;
+                }
+            }
+
+            /**
+             * put a coming available partition into the available partition map
+             */
+            List<IPartition> pList = availablePartitionMap.get(pid);
             if (pList == null) {
                 pList = new ArrayList<IPartition>();
-                partitionMap.put(pid, pList);
+                availablePartitionMap.put(pid, pList);
             }
             pList.add(partition);
-        }
-        updatePartitionState(pid, taId, partition, state);
-    }
 
-    public void updatePartitionState(PartitionId pid, TaskAttemptId taId, IPartition partition, PartitionState state)
-            throws HyracksDataException {
-        PartitionDescriptor desc = new PartitionDescriptor(pid, ncs.getId(), taId, partition.isReusable());
-        desc.setState(state);
-        try {
-            ncs.getClusterController().registerPartitionProvider(desc);
+            /**
+             * update to CC only when necessary
+             */
+            if (updateToCC) {
+                updatePartitionState(pid, taId, partition, state);
+            }
         } catch (Exception e) {
             throw new HyracksDataException(e);
         }
     }
 
     public synchronized IPartition getPartition(PartitionId pid) {
-        return partitionMap.get(pid).get(0);
+        return availablePartitionMap.get(pid).get(0);
     }
 
     public synchronized void unregisterPartitions(JobId jobId, Collection<IPartition> unregisteredPartitions) {
-        for (Iterator<Map.Entry<PartitionId, List<IPartition>>> i = partitionMap.entrySet().iterator(); i.hasNext();) {
+        for (Iterator<Map.Entry<PartitionId, List<IPartition>>> i = availablePartitionMap.entrySet().iterator(); i
+                .hasNext();) {
             Map.Entry<PartitionId, List<IPartition>> e = i.next();
             PartitionId pid = e.getKey();
             if (jobId.equals(pid.getJobId())) {
@@ -95,16 +112,21 @@
 
     public synchronized void registerPartitionRequest(PartitionId partitionId, NetworkOutputChannel writer)
             throws HyracksException {
-        List<IPartition> pList = partitionMap.get(partitionId);
-        if (pList != null && !pList.isEmpty()) {
-            IPartition partition = pList.get(0);
-            writer.setFrameSize(partition.getTaskContext().getFrameSize());
-            partition.writeTo(writer);
-            if (!partition.isReusable()) {
-                partitionMap.remove(partitionId);
+        try {
+            List<IPartition> pList = availablePartitionMap.get(partitionId);
+            if (pList != null && !pList.isEmpty()) {
+                IPartition partition = pList.get(0);
+                writer.setFrameSize(partition.getTaskContext().getFrameSize());
+                partition.writeTo(writer);
+                if (!partition.isReusable()) {
+                    availablePartitionMap.remove(partitionId);
+                }
+            } else {
+                // Partition not yet registered: park the request until the producer registers it (previously this threw HyracksException).
+                partitionRequests.put(partitionId, writer);
             }
-        } else {
-            throw new HyracksException("Request for unknown partition " + partitionId);
+        } catch (Exception e) {
+            throw new HyracksDataException(e);
         }
     }
 
@@ -115,4 +137,15 @@
     public void close() {
         deallocatableRegistry.close();
     }
+
+    public void updatePartitionState(PartitionId pid, TaskAttemptId taId, IPartition partition, PartitionState state)
+            throws HyracksDataException {
+        PartitionDescriptor desc = new PartitionDescriptor(pid, ncs.getId(), taId, partition.isReusable());
+        desc.setState(state);
+        try {
+            ncs.getClusterController().registerPartitionProvider(desc);
+        } catch (Exception e) {
+            throw new HyracksDataException(e);
+        }
+    }
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/PipelinedPartition.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/PipelinedPartition.java
index 76345bc..c1abdc3 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/PipelinedPartition.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/PipelinedPartition.java
@@ -69,7 +69,7 @@
 
     @Override
     public void open() throws HyracksDataException {
-        manager.registerPartition(pid, taId, this, PartitionState.STARTED);
+        manager.registerPartition(pid, taId, this, PartitionState.STARTED, false);
         failed = false;
         pendingConnection = true;
     }
@@ -108,7 +108,6 @@
     public void close() throws HyracksDataException {
         if (!failed) {
             ensureConnected();
-            manager.updatePartitionState(pid, taId, this, PartitionState.COMMITTED);
             delegate.close();
         }
     }
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/StartTasksWork.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/StartTasksWork.java
index b9ee504..1be5fc6 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/StartTasksWork.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/StartTasksWork.java
@@ -14,6 +14,9 @@
  */
 package edu.uci.ics.hyracks.control.nc.work;
 
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
 import java.util.ArrayList;
 import java.util.EnumSet;
 import java.util.List;
@@ -25,6 +28,8 @@
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
 import edu.uci.ics.hyracks.api.comm.IPartitionCollector;
 import edu.uci.ics.hyracks.api.comm.IPartitionWriterFactory;
+import edu.uci.ics.hyracks.api.comm.NetworkAddress;
+import edu.uci.ics.hyracks.api.comm.PartitionChannel;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.dataflow.ActivityId;
 import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
@@ -43,6 +48,7 @@
 import edu.uci.ics.hyracks.api.job.JobFlag;
 import edu.uci.ics.hyracks.api.job.JobId;
 import edu.uci.ics.hyracks.api.partitions.PartitionId;
+import edu.uci.ics.hyracks.comm.channels.NetworkInputChannel;
 import edu.uci.ics.hyracks.control.common.deployment.DeploymentUtils;
 import edu.uci.ics.hyracks.control.common.job.TaskAttemptDescriptor;
 import edu.uci.ics.hyracks.control.common.work.AbstractWork;
@@ -89,8 +95,7 @@
     public void run() {
         try {
             NCApplicationContext appCtx = ncs.getApplicationContext();
-            final Joblet joblet = getOrCreateLocalJoblet(deploymentId, jobId, appCtx, acgBytes == null ? null
-                    : (ActivityClusterGraph) DeploymentUtils.deserialize(acgBytes, deploymentId, appCtx));
+            final Joblet joblet = getOrCreateLocalJoblet(deploymentId, jobId, appCtx, acgBytes);
             final ActivityClusterGraph acg = joblet.getActivityClusterGraph();
 
             IRecordDescriptorProvider rdp = new IRecordDescriptorProvider() {
@@ -119,12 +124,13 @@
                     LOGGER.info("Initializing " + taId + " -> " + han);
                 }
                 final int partition = tid.getPartition();
-                Task task = new Task(joblet, taId, han.getClass().getName(), ncs.getExecutor(), ncs);
+                List<IConnectorDescriptor> inputs = ac.getActivityInputMap().get(aid);
+                Task task = new Task(joblet, taId, han.getClass().getName(), ncs.getExecutor(), ncs,
+                        createInputChannels(td, inputs));
                 IOperatorNodePushable operator = han.createPushRuntime(task, rdp, partition, td.getPartitionCount());
 
                 List<IPartitionCollector> collectors = new ArrayList<IPartitionCollector>();
 
-                List<IConnectorDescriptor> inputs = ac.getActivityInputMap().get(aid);
                 if (inputs != null) {
                     for (int i = 0; i < inputs.size(); ++i) {
                         IConnectorDescriptor conn = inputs.get(i);
@@ -169,13 +175,15 @@
     }
 
     private Joblet getOrCreateLocalJoblet(DeploymentId deploymentId, JobId jobId, INCApplicationContext appCtx,
-            ActivityClusterGraph acg) throws Exception {
+            byte[] acgBytes) throws Exception {
         Map<JobId, Joblet> jobletMap = ncs.getJobletMap();
         Joblet ji = jobletMap.get(jobId);
         if (ji == null) {
-            if (acg == null) {
+            if (acgBytes == null) {
                 throw new NullPointerException("JobActivityGraph was null");
             }
+            ActivityClusterGraph acg = (ActivityClusterGraph) DeploymentUtils.deserialize(acgBytes, deploymentId,
+                    appCtx);
             ji = new Joblet(ncs, deploymentId, jobId, appCtx, acg);
             jobletMap.put(jobId, ji);
         }
@@ -231,4 +239,38 @@
         }
         return factory;
     }
+
+    /**
+     * Create a list of known channels for each input connector
+     * 
+     * @param td
+     *            the task attempt descriptor (supplies the attempt id and input partition locations)
+     * @param inputs
+     *            the input connector descriptors
+     * @return a list of known channels, one for each connector
+     * @throws UnknownHostException
+     */
+    private List<List<PartitionChannel>> createInputChannels(TaskAttemptDescriptor td, List<IConnectorDescriptor> inputs)
+            throws UnknownHostException {
+        NetworkAddress[][] inputAddresses = td.getInputPartitionLocations();
+        List<List<PartitionChannel>> channelsForInputConnectors = new ArrayList<List<PartitionChannel>>();
+        if (inputAddresses != null) {
+            for (int i = 0; i < inputAddresses.length; i++) {
+                List<PartitionChannel> channels = new ArrayList<PartitionChannel>();
+                if (inputAddresses[i] != null) {
+                    for (int j = 0; j < inputAddresses[i].length; j++) {
+                        NetworkAddress networkAddress = inputAddresses[i][j];
+                        PartitionId pid = new PartitionId(jobId, inputs.get(i).getConnectorId(), j, td
+                                .getTaskAttemptId().getTaskId().getPartition());
+                        PartitionChannel channel = new PartitionChannel(pid, new NetworkInputChannel(
+                                ncs.getNetworkManager(), new InetSocketAddress(InetAddress.getByAddress(networkAddress
+                                        .getIpAddress()), networkAddress.getPort()), pid, 5));
+                        channels.add(channel);
+                    }
+                }
+                channelsForInputConnectors.add(channels);
+            }
+        }
+        return channelsForInputConnectors;
+    }
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-data/hyracks-data-std/src/main/java/edu/uci/ics/hyracks/data/std/util/ByteArrayAccessibleOutputStream.java b/hyracks/hyracks-data/hyracks-data-std/src/main/java/edu/uci/ics/hyracks/data/std/util/ByteArrayAccessibleOutputStream.java
index 1b26c00..f894e38 100644
--- a/hyracks/hyracks-data/hyracks-data-std/src/main/java/edu/uci/ics/hyracks/data/std/util/ByteArrayAccessibleOutputStream.java
+++ b/hyracks/hyracks-data/hyracks-data-std/src/main/java/edu/uci/ics/hyracks/data/std/util/ByteArrayAccessibleOutputStream.java
@@ -15,11 +15,9 @@
 package edu.uci.ics.hyracks.data.std.util;
 
 import java.io.ByteArrayOutputStream;
-import java.util.logging.Level;
-import java.util.logging.Logger;
+import java.util.Arrays;
 
 public class ByteArrayAccessibleOutputStream extends ByteArrayOutputStream {
-    private static final Logger LOGGER = Logger.getLogger(ByteArrayAccessibleOutputStream.class.getName());
 
     public ByteArrayAccessibleOutputStream() {
         super();
@@ -34,17 +32,45 @@
     }
 
     public void write(int b) {
-        if (LOGGER.isLoggable(Level.FINEST)) {
-            LOGGER.finest("write(byte) value: " + b);
-        }
-        super.write(b);
+        // Append the low byte of b directly instead of delegating to super.write(b).
+        // NOTE(review): unlike the JDK ByteArrayOutputStream method this replaces,
+        // this append is unsynchronized -- confirm the stream is never shared
+        // across threads.
+        ensureCapacity(count + 1);
+        buf[count] = (byte) b;
+        count += 1;
     }
 
     @Override
-    public void write(byte[] bytes, int offset, int length) {
-        if (LOGGER.isLoggable(Level.FINEST)) {
-            LOGGER.finest("write(byte[], int, int) offset: " + offset + " length" + length);
+    // Bulk append of len bytes from b starting at off. The subtraction-based bounds
+    // check ((off + len) - b.length > 0) rejects out-of-range requests even when
+    // off + len overflows int.
+    public void write(byte[] b, int off, int len) {
+        if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) - b.length > 0)) {
+            throw new IndexOutOfBoundsException();
         }
-        super.write(bytes, offset, length);
+        ensureCapacity(count + len);
+        System.arraycopy(b, off, buf, count, len);
+        count += len;
+    }
+
+    /**
+     * Grows the backing array if it cannot hold {@code minCapacity} bytes;
+     * otherwise does nothing.
+     */
+    private void ensureCapacity(int minCapacity) {
+        // overflow-conscious code: compare via subtraction so the check behaves
+        // sensibly for capacities near Integer.MAX_VALUE
+        if (minCapacity - buf.length > 0)
+            grow(minCapacity);
+    }
+
+    /**
+     * Increases the capacity to ensure that it can hold at least the
+     * number of elements specified by the minimum capacity argument.
+     * The capacity is doubled each time, falling back to exactly
+     * {@code minCapacity} when doubling is not enough.
+     * 
+     * @param minCapacity
+     *            the desired minimum capacity
+     * @throws OutOfMemoryError
+     *             if {@code minCapacity} is negative, i.e., the requested
+     *             size overflowed an int
+     */
+    private void grow(int minCapacity) {
+        // overflow-conscious code
+        int oldCapacity = buf.length;
+        int newCapacity = oldCapacity << 1;
+        if (newCapacity - minCapacity < 0)
+            newCapacity = minCapacity;
+        if (newCapacity < 0) {
+            if (minCapacity < 0) // overflow
+                throw new OutOfMemoryError();
+            newCapacity = Integer.MAX_VALUE;
+        }
+        buf = Arrays.copyOf(buf, newCapacity);
     }
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/util/ByteBufferInputStream.java b/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/util/ByteBufferInputStream.java
index 9cdd692..eee4758 100644
--- a/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/util/ByteBufferInputStream.java
+++ b/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/util/ByteBufferInputStream.java
@@ -16,11 +16,8 @@
 
 import java.io.InputStream;
 import java.nio.ByteBuffer;
-import java.util.logging.Level;
-import java.util.logging.Logger;
 
 public class ByteBufferInputStream extends InputStream {
-    private static final Logger LOGGER = Logger.getLogger(ByteBufferInputStream.class.getName());
 
     private ByteBuffer buffer;
 
@@ -37,20 +34,13 @@
     @Override
     public int read() {
         int remaining = buffer.capacity() - position;
-        int value = remaining > 0 ? (buffer.get(position++) & 0xff) : -1;
-        if (LOGGER.isLoggable(Level.FINEST)) {
-            LOGGER.finest("read(): value: " + value + " remaining: " + remaining + " position: " + position);
-        }
+        // Read straight from the backing array rather than buffer.get(position++),
+        // skipping the per-call checks. NOTE(review): ByteBuffer.array() throws for
+        // direct or read-only buffers and this index ignores arrayOffset() -- confirm
+        // only non-sliced heap buffers reach this stream.
+        int value = remaining > 0 ? (buffer.array()[position++] & 0xff) : -1;
         return value;
     }
 
     @Override
     public int read(byte[] bytes, int offset, int length) {
         int remaining = buffer.capacity() - position;
-        if (LOGGER.isLoggable(Level.FINEST)) {
-            LOGGER.finest("read(bytes[], int, int): remaining: " + remaining + " offset: " + offset + " length: "
-                    + length + " position: " + position);
-        }
         if (remaining == 0) {
             return -1;
         }
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/PartitionDataWriter.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/PartitionDataWriter.java
index a1d24e7..ea586fc 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/PartitionDataWriter.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/PartitionDataWriter.java
@@ -32,6 +32,8 @@
     private final FrameTupleAppender[] appenders;
     private final FrameTupleAccessor tupleAccessor;
     private final ITuplePartitionComputer tpc;
+    private final IHyracksTaskContext ctx;
+    private boolean allocated = false;
 
     public PartitionDataWriter(IHyracksTaskContext ctx, int consumerPartitionCount, IPartitionWriterFactory pwFactory,
             RecordDescriptor recordDescriptor, ITuplePartitionComputer tpc) throws HyracksDataException {
@@ -42,20 +44,22 @@
             try {
                 pWriters[i] = pwFactory.createFrameWriter(i);
                 appenders[i] = new FrameTupleAppender(ctx.getFrameSize());
-                appenders[i].reset(ctx.allocateFrame(), true);
             } catch (IOException e) {
                 throw new HyracksDataException(e);
             }
         }
         tupleAccessor = new FrameTupleAccessor(ctx.getFrameSize(), recordDescriptor);
         this.tpc = tpc;
+        this.ctx = ctx;
     }
 
     @Override
     public void close() throws HyracksDataException {
         for (int i = 0; i < pWriters.length; ++i) {
-            if (appenders[i].getTupleCount() > 0) {
-                flushFrame(appenders[i].getBuffer(), pWriters[i]);
+            if (allocated) {
+                if (appenders[i].getTupleCount() > 0) {
+                    flushFrame(appenders[i].getBuffer(), pWriters[i]);
+                }
             }
             pWriters[i].close();
         }
@@ -71,12 +75,15 @@
     public void open() throws HyracksDataException {
+        // Frames are no longer allocated here: allocation is deferred to the first
+        // nextFrame() call so a writer that never receives data consumes no buffers.
         for (int i = 0; i < pWriters.length; ++i) {
             pWriters[i].open();
-            appenders[i].reset(appenders[i].getBuffer(), true);
         }
     }
 
     @Override
     public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+        if (!allocated) {
+            allocateFrames();
+            allocated = true;
+        }
         tupleAccessor.reset(buffer);
         int tupleCount = tupleAccessor.getTupleCount();
         for (int i = 0; i < tupleCount; ++i) {
@@ -87,12 +94,23 @@
                 flushFrame(appenderBuffer, pWriters[h]);
                 appender.reset(appenderBuffer, true);
                 if (!appender.append(tupleAccessor, i)) {
-                    throw new HyracksDataException("Record size (" + (tupleAccessor.getTupleEndOffset(i) - tupleAccessor.getTupleStartOffset(i)) + ") larger than frame size (" + appender.getBuffer().capacity() + ")");
+                    throw new HyracksDataException("Record size ("
+                            + (tupleAccessor.getTupleEndOffset(i) - tupleAccessor.getTupleStartOffset(i))
+                            + ") larger than frame size (" + appender.getBuffer().capacity() + ")");
                 }
             }
         }
     }
 
+    /**
+     * Lazily allocates one output frame per destination partition and resets each
+     * appender onto its frame. Invoked once, from the first nextFrame() call.
+     * 
+     * @throws HyracksDataException
+     *             if frame allocation fails
+     */
+    private void allocateFrames() throws HyracksDataException {
+        for (int i = 0; i < appenders.length; ++i) {
+            appenders[i].reset(ctx.allocateFrame(), true);
+        }
+    }
+
     @Override
     public void fail() throws HyracksDataException {
         for (int i = 0; i < appenders.length; ++i) {
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java
index ba9ff49..114463f 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java
@@ -41,6 +41,7 @@
     private final ByteBuffer outFrame;
     private final FrameTupleAppender appender;
     private final ArrayTupleBuilder tupleBuilder;
+    private boolean outputPartial = false;
 
     private boolean first;
 
@@ -48,6 +49,13 @@
 
+    /**
+     * Variant constructor that lets the caller choose whether each group emits a
+     * partial aggregation result (outputPartial == true) or a final one.
+     */
     public PreclusteredGroupWriter(IHyracksTaskContext ctx, int[] groupFields, IBinaryComparator[] comparators,
             IAggregatorDescriptor aggregator, RecordDescriptor inRecordDesc, RecordDescriptor outRecordDesc,
+            IFrameWriter writer, boolean outputPartial) throws HyracksDataException {
+        this(ctx, groupFields, comparators, aggregator, inRecordDesc, outRecordDesc, writer);
+        this.outputPartial = outputPartial;
+    }
+
+    public PreclusteredGroupWriter(IHyracksTaskContext ctx, int[] groupFields, IBinaryComparator[] comparators,
+            IAggregatorDescriptor aggregator, RecordDescriptor inRecordDesc, RecordDescriptor outRecordDesc,
             IFrameWriter writer) throws HyracksDataException {
         this.groupFields = groupFields;
         this.comparators = comparators;
@@ -121,10 +129,13 @@
         for (int j = 0; j < groupFields.length; j++) {
             tupleBuilder.addField(lastTupleAccessor, lastTupleIndex, groupFields[j]);
         }
-        boolean hasOutput = aggregator.outputFinalResult(tupleBuilder, lastTupleAccessor, lastTupleIndex, aggregateState);
+        boolean hasOutput = outputPartial ? aggregator.outputPartialResult(tupleBuilder, lastTupleAccessor,
+                lastTupleIndex, aggregateState) : aggregator.outputFinalResult(tupleBuilder, lastTupleAccessor,
+                lastTupleIndex, aggregateState);
 
-        if (hasOutput && !appender.appendSkipEmptyField(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(), 0,
-                tupleBuilder.getSize())) {
+        if (hasOutput
+                && !appender.appendSkipEmptyField(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(), 0,
+                        tupleBuilder.getSize())) {
             FrameUtils.flushFrame(outFrame, writer);
             appender.reset(outFrame, true);
             if (!appender.appendSkipEmptyField(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(), 0,
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/sort/ExternalSortGroupByRunGenerator.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/sort/ExternalSortGroupByRunGenerator.java
new file mode 100644
index 0000000..2a28dea
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/sort/ExternalSortGroupByRunGenerator.java
@@ -0,0 +1,140 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.group.sort;
+
+import java.nio.ByteBuffer;
+import java.util.LinkedList;
+import java.util.List;
+
+import edu.uci.ics.hyracks.api.comm.IFrameReader;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.dataflow.common.io.RunFileWriter;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
+import edu.uci.ics.hyracks.dataflow.std.group.preclustered.PreclusteredGroupWriter;
+import edu.uci.ics.hyracks.dataflow.std.sort.Algorithm;
+import edu.uci.ics.hyracks.dataflow.std.sort.FrameSorterMergeSort;
+import edu.uci.ics.hyracks.dataflow.std.sort.FrameSorterQuickSort;
+import edu.uci.ics.hyracks.dataflow.std.sort.IFrameSorter;
+
+/**
+ * Group-by aggregation is pushed before run file generation.
+ * 
+ * @author yingyib
+ */
+public class ExternalSortGroupByRunGenerator implements IFrameWriter {
+    private final IHyracksTaskContext ctx;
+    // In-memory sorter holding the frames of the run currently being built.
+    private final IFrameSorter frameSorter;
+    // Readers over the sorted, pre-aggregated run files generated so far.
+    private final List<IFrameReader> runs;
+    // Frame budget for in-memory sorting: framesLimit - 1 (one frame reserved).
+    private final int maxSortFrames;
+
+    private final int[] groupFields;
+    private final IBinaryComparatorFactory[] comparatorFactories;
+    private final IAggregatorDescriptorFactory aggregatorFactory;
+    private final RecordDescriptor inRecordDesc;
+    private final RecordDescriptor outRecordDesc;
+
+    /**
+     * @param alg
+     *            selects the in-memory sort implementation; MERGE_SORT picks the
+     *            stable FrameSorterMergeSort, anything else FrameSorterQuickSort.
+     */
+    public ExternalSortGroupByRunGenerator(IHyracksTaskContext ctx, int[] sortFields, RecordDescriptor recordDesc,
+            int framesLimit, int[] groupFields, INormalizedKeyComputerFactory firstKeyNormalizerFactory,
+            IBinaryComparatorFactory[] comparatorFactories, IAggregatorDescriptorFactory aggregatorFactory,
+            RecordDescriptor outRecordDesc, Algorithm alg) throws HyracksDataException {
+        this.ctx = ctx;
+        if (alg == Algorithm.MERGE_SORT) {
+            frameSorter = new FrameSorterMergeSort(ctx, sortFields, firstKeyNormalizerFactory, comparatorFactories,
+                    recordDesc);
+        } else {
+            frameSorter = new FrameSorterQuickSort(ctx, sortFields, firstKeyNormalizerFactory, comparatorFactories,
+                    recordDesc);
+        }
+        this.runs = new LinkedList<IFrameReader>();
+        this.maxSortFrames = framesLimit - 1;
+        this.groupFields = groupFields;
+        this.comparatorFactories = comparatorFactories;
+        this.aggregatorFactory = aggregatorFactory;
+        this.inRecordDesc = recordDesc;
+        this.outRecordDesc = outRecordDesc;
+    }
+
+    @Override
+    public void open() throws HyracksDataException {
+        // Start a fresh generation pass: drop previous runs and buffered frames.
+        runs.clear();
+        frameSorter.reset();
+    }
+
+    @Override
+    public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+        // Flush the buffered frames to a sorted, pre-aggregated run file once the
+        // in-memory frame budget is exhausted, then buffer the incoming frame.
+        if (frameSorter.getFrameCount() >= maxSortFrames) {
+            flushFramesToRun();
+        }
+        frameSorter.insertFrame(buffer);
+    }
+
+    @Override
+    public void close() throws HyracksDataException {
+        if (frameSorter.getFrameCount() > 0) {
+            if (runs.size() <= 0) {
+                // Everything fit in memory: just sort; the merger will flush the
+                // sorted frames downstream without creating a run file.
+                frameSorter.sortFrames();
+            } else {
+                // Spilling already happened; flush the tail as one more run.
+                flushFramesToRun();
+            }
+        }
+    }
+
+    /**
+     * Sorts the buffered frames, streams them through a PreclusteredGroupWriter
+     * (emitting partial aggregates) into a new managed run file, and registers a
+     * reader for the run.
+     */
+    private void flushFramesToRun() throws HyracksDataException {
+        frameSorter.sortFrames();
+        FileReference file = ctx.getJobletContext().createManagedWorkspaceFile(
+                ExternalSortGroupByRunGenerator.class.getSimpleName());
+        RunFileWriter writer = new RunFileWriter(file, ctx.getIOManager());
+
+        //create group-by comparators (only as many as both arrays provide)
+        IBinaryComparator[] comparators = new IBinaryComparator[Math
+                .min(groupFields.length, comparatorFactories.length)];
+        for (int i = 0; i < comparators.length; i++) {
+            comparators[i] = comparatorFactories[i].createBinaryComparator();
+        }
+        IAggregatorDescriptor aggregator = aggregatorFactory.createAggregator(ctx, inRecordDesc, outRecordDesc,
+                groupFields, groupFields, writer);
+        // outputPartial == true: runs hold partial aggregates, finalized at merge time.
+        PreclusteredGroupWriter pgw = new PreclusteredGroupWriter(ctx, groupFields, comparators, aggregator,
+                this.inRecordDesc, this.outRecordDesc, writer, true);
+        pgw.open();
+
+        try {
+            frameSorter.flushFrames(pgw);
+        } finally {
+            // Closing the group writer flushes the last group and closes the run file.
+            pgw.close();
+        }
+        frameSorter.reset();
+        runs.add(writer.createReader());
+    }
+
+    @Override
+    public void fail() throws HyracksDataException {
+        // No-op. NOTE(review): runs generated so far are not discarded here;
+        // presumably the managed workspace files are reclaimed by the joblet --
+        // confirm cleanup responsibility.
+    }
+
+    public IFrameSorter getFrameSorter() {
+        return frameSorter;
+    }
+
+    public List<IFrameReader> getRuns() {
+        return runs;
+    }
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/sort/ExternalSortGroupByRunMerger.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/sort/ExternalSortGroupByRunMerger.java
new file mode 100644
index 0000000..1f9b358
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/sort/ExternalSortGroupByRunMerger.java
@@ -0,0 +1,200 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.group.sort;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+
+import edu.uci.ics.hyracks.api.comm.IFrameReader;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputer;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+import edu.uci.ics.hyracks.dataflow.common.io.RunFileReader;
+import edu.uci.ics.hyracks.dataflow.common.io.RunFileWriter;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
+import edu.uci.ics.hyracks.dataflow.std.group.preclustered.PreclusteredGroupWriter;
+import edu.uci.ics.hyracks.dataflow.std.sort.IFrameSorter;
+import edu.uci.ics.hyracks.dataflow.std.sort.RunMergingFrameReader;
+
+/**
+ * Group-by aggregation is pushed into multi-pass merge of external sort.
+ * 
+ * @author yingyib
+ */
+public class ExternalSortGroupByRunMerger {
+
+    private final IHyracksTaskContext ctx;
+    // Working list of run readers; shrinks as generations of runs are merged.
+    private final List<IFrameReader> runs;
+    private final RecordDescriptor inputRecordDesc;
+    private final RecordDescriptor partialAggRecordDesc;
+    private final RecordDescriptor outRecordDesc;
+    private final int framesLimit;
+    // Final downstream consumer of the fully merged, aggregated output.
+    private final IFrameWriter writer;
+    private List<ByteBuffer> inFrames;
+    private ByteBuffer outFrame;
+    private FrameTupleAppender outFrameAppender;
+
+    private IFrameSorter frameSorter; // Used in External sort, no replacement
+                                      // selection
+
+    private final int[] groupFields;
+    private final INormalizedKeyComputer firstKeyNkc;
+    private final IBinaryComparator[] comparators;
+    private final IAggregatorDescriptorFactory mergeAggregatorFactory;
+    private final IAggregatorDescriptorFactory partialAggregatorFactory;
+    // Presumably true on the local (partial-aggregation) side of a two-stage
+    // group-by; swaps which aggregator factory is used where -- TODO confirm.
+    private final boolean localSide;
+
+    // Field indexes into the partial-aggregate records (0..n-1) used during merge.
+    private final int[] mergeSortFields;
+    private final int[] mergeGroupFields;
+    private IBinaryComparator[] groupByComparators;
+
+    // Constructor for external sort, no replacement selection
+    public ExternalSortGroupByRunMerger(IHyracksTaskContext ctx, IFrameSorter frameSorter, List<IFrameReader> runs,
+            int[] sortFields, RecordDescriptor inRecordDesc, RecordDescriptor partialAggRecordDesc,
+            RecordDescriptor outRecordDesc, int framesLimit, IFrameWriter writer, int[] groupFields,
+            INormalizedKeyComputer nmk, IBinaryComparator[] comparators,
+            IAggregatorDescriptorFactory partialAggregatorFactory, IAggregatorDescriptorFactory aggregatorFactory,
+            boolean localStage) {
+        this.ctx = ctx;
+        this.frameSorter = frameSorter;
+        // Defensive copy: the merge loop mutates this list.
+        this.runs = new LinkedList<IFrameReader>(runs);
+        this.inputRecordDesc = inRecordDesc;
+        this.partialAggRecordDesc = partialAggRecordDesc;
+        this.outRecordDesc = outRecordDesc;
+        this.framesLimit = framesLimit;
+        this.writer = writer;
+
+        this.groupFields = groupFields;
+        this.firstKeyNkc = nmk;
+        this.comparators = comparators;
+        this.mergeAggregatorFactory = aggregatorFactory;
+        this.partialAggregatorFactory = partialAggregatorFactory;
+        this.localSide = localStage;
+
+        //create merge sort fields: sort keys occupy the leading fields of the
+        //partial-aggregate records, so the merge sorts on 0..numSortFields-1
+        int numSortFields = sortFields.length;
+        mergeSortFields = new int[numSortFields];
+        for (int i = 0; i < numSortFields; i++) {
+            mergeSortFields[i] = i;
+        }
+
+        //create merge group fields (likewise remapped to leading positions)
+        int numGroupFields = groupFields.length;
+        mergeGroupFields = new int[numGroupFields];
+        for (int i = 0; i < numGroupFields; i++) {
+            mergeGroupFields[i] = i;
+        }
+
+        //setup comparators for grouping (prefix of the sort comparators)
+        groupByComparators = new IBinaryComparator[Math.min(mergeGroupFields.length, comparators.length)];
+        for (int i = 0; i < groupByComparators.length; i++) {
+            groupByComparators[i] = comparators[i];
+        }
+    }
+
+    /**
+     * Produces the final aggregated output: if no runs were spilled, flushes the
+     * in-memory sorted frames straight through a group writer; otherwise performs
+     * a multi-pass merge of the runs, re-aggregating at each pass, and sends the
+     * last pass to the downstream writer.
+     */
+    public void process() throws HyracksDataException {
+        IAggregatorDescriptorFactory aggregatorFactory = localSide ? partialAggregatorFactory : mergeAggregatorFactory;
+        IAggregatorDescriptor aggregator = aggregatorFactory.createAggregator(ctx, partialAggRecordDesc, outRecordDesc,
+                groupFields, groupFields, writer);
+        PreclusteredGroupWriter pgw = new PreclusteredGroupWriter(ctx, groupFields, groupByComparators, aggregator,
+                inputRecordDesc, outRecordDesc, writer, false);
+        try {
+            if (runs.size() <= 0) {
+                // In-memory case: no run files were generated.
+                pgw.open();
+                if (frameSorter != null && frameSorter.getFrameCount() > 0) {
+                    frameSorter.flushFrames(pgw);
+                }
+                /** recycle sort buffer */
+                // NOTE(review): frameSorter is null-checked above but dereferenced
+                // unconditionally here -- confirm it can never be null.
+                frameSorter.close();
+            } else {
+                /** recycle sort buffer */
+                frameSorter.close();
+
+                inFrames = new ArrayList<ByteBuffer>();
+                outFrame = ctx.allocateFrame();
+                outFrameAppender = new FrameTupleAppender(ctx.getFrameSize());
+                outFrameAppender.reset(outFrame, true);
+                for (int i = 0; i < framesLimit - 1; ++i) {
+                    inFrames.add(ctx.allocateFrame());
+                }
+                int maxMergeWidth = framesLimit - 1;
+                // Intermediate passes: while too many runs remain, merge groups of
+                // runs into new (partially re-aggregated) run files.
+                while (runs.size() > maxMergeWidth) {
+                    int generationSeparator = 0;
+                    while (generationSeparator < runs.size() && runs.size() > maxMergeWidth) {
+                        // Width is capped so the final pass is left with at most
+                        // maxMergeWidth runs -- TODO confirm intent of the last term.
+                        int mergeWidth = Math.min(Math.min(runs.size() - generationSeparator, maxMergeWidth),
+                                runs.size() - maxMergeWidth + 1);
+                        FileReference newRun = ctx.createManagedWorkspaceFile(ExternalSortGroupByRunMerger.class
+                                .getSimpleName());
+                        IFrameWriter mergeResultWriter = new RunFileWriter(newRun, ctx.getIOManager());
+
+                        aggregatorFactory = localSide ? mergeAggregatorFactory : partialAggregatorFactory;
+                        aggregator = aggregatorFactory.createAggregator(ctx, partialAggRecordDesc,
+                                partialAggRecordDesc, mergeGroupFields, mergeGroupFields, mergeResultWriter);
+                        // Intermediate output stays partial (outputPartial == true).
+                        pgw = new PreclusteredGroupWriter(ctx, mergeGroupFields, groupByComparators, aggregator,
+                                partialAggRecordDesc, partialAggRecordDesc, mergeResultWriter, true);
+                        pgw.open();
+
+                        IFrameReader[] runCursors = new RunFileReader[mergeWidth];
+                        for (int i = 0; i < mergeWidth; i++) {
+                            runCursors[i] = runs.get(generationSeparator + i);
+                        }
+                        merge(pgw, runCursors);
+                        pgw.close();
+                        // Replace the merged runs with the single new run.
+                        runs.subList(generationSeparator, mergeWidth + generationSeparator).clear();
+                        runs.add(generationSeparator++, ((RunFileWriter) mergeResultWriter).createReader());
+                    }
+                }
+                // Final pass: merge the remaining runs directly into the downstream
+                // writer, emitting final aggregates (outputPartial == false).
+                if (!runs.isEmpty()) {
+                    aggregator = mergeAggregatorFactory.createAggregator(ctx, partialAggRecordDesc, outRecordDesc,
+                            mergeGroupFields, mergeGroupFields, writer);
+                    pgw = new PreclusteredGroupWriter(ctx, mergeGroupFields, groupByComparators, aggregator,
+                            partialAggRecordDesc, outRecordDesc, writer, false);
+                    pgw.open();
+                    IFrameReader[] runCursors = new RunFileReader[runs.size()];
+                    for (int i = 0; i < runCursors.length; i++) {
+                        runCursors[i] = runs.get(i);
+                    }
+                    merge(pgw, runCursors);
+                }
+            }
+        } catch (Exception e) {
+            // NOTE(review): the failure is propagated to the group writer via fail(),
+            // but the exception itself is swallowed (neither rethrown nor chained),
+            // so callers cannot observe why the merge failed -- confirm intended.
+            pgw.fail();
+        } finally {
+            pgw.close();
+        }
+    }
+
+    /**
+     * Merges the given run cursors with a priority-queue frame reader and streams
+     * each merged frame to the supplied writer.
+     */
+    private void merge(IFrameWriter mergeResultWriter, IFrameReader[] runCursors) throws HyracksDataException {
+        RunMergingFrameReader merger = new RunMergingFrameReader(ctx, runCursors, inFrames, mergeSortFields,
+                comparators, firstKeyNkc, partialAggRecordDesc);
+        merger.open();
+        try {
+            while (merger.nextFrame(outFrame)) {
+                FrameUtils.flushFrame(outFrame, mergeResultWriter);
+            }
+        } finally {
+            merger.close();
+        }
+    }
+}
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/sort/SortGroupByOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/sort/SortGroupByOperatorDescriptor.java
new file mode 100644
index 0000000..cee105b
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/sort/SortGroupByOperatorDescriptor.java
@@ -0,0 +1,274 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.group.sort;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import edu.uci.ics.hyracks.api.comm.IFrameReader;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.ActivityId;
+import edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.TaskId;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputer;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.IOperatorDescriptorRegistry;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractActivityNode;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractStateObject;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
+import edu.uci.ics.hyracks.dataflow.std.sort.Algorithm;
+import edu.uci.ics.hyracks.dataflow.std.sort.IFrameSorter;
+
+/**
+ * This Operator pushes group-by aggregation into the external sort.
+ * After the in-memory sort, it aggregates the sorted data before writing it to a run file.
+ * During the merge phase, it does an aggregation over sorted results.
+ * 
+ * @author yingyib
+ */
+public class SortGroupByOperatorDescriptor extends AbstractOperatorDescriptor {
+    private static final long serialVersionUID = 1L;
+
+    private static final int SORT_ACTIVITY_ID = 0;
+    private static final int MERGE_ACTIVITY_ID = 1;
+
+    private final int framesLimit;
+    private final int[] sortFields;
+    private final int[] groupFields;
+    private final INormalizedKeyComputerFactory firstKeyNormalizerFactory;
+    private final IBinaryComparatorFactory[] comparatorFactories;
+    private final IAggregatorDescriptorFactory mergeAggregatorFactory;
+    private final IAggregatorDescriptorFactory partialAggregatorFactory;
+    private final RecordDescriptor partialAggRecordDesc;
+    private final RecordDescriptor outputRecordDesc;
+    private final boolean finalStage;
+    private Algorithm alg = Algorithm.MERGE_SORT; // in-memory sort algorithm; overridable via the extended constructor
+
+    /***
+     * @param spec
+     *            , the Hyracks job specification
+     * @param framesLimit
+     *            , the frame limit for this operator
+     * @param sortFields
+     *            , the fields to sort
+     * @param groupFields
+     *            , the fields to group, which can be a prefix subset of sortFields
+     * @param firstKeyNormalizerFactory
+     *            , the normalized key computer factory of the first key
+     * @param comparatorFactories
+     *            , the comparator factories of sort keys
+     * @param partialAggregatorFactory
+     *            , for aggregating the input of this operator
+     * @param mergeAggregatorFactory
+     *            , for aggregating the intermediate data of this operator
+     * @param partialAggRecordDesc
+     *            , the record descriptor of intermediate data
+     * @param outRecordDesc
+     *            , the record descriptor of output data
+     * @param finalStage
+     *            , whether the operator is used for final stage aggregation
+     */
+    public SortGroupByOperatorDescriptor(IOperatorDescriptorRegistry spec, int framesLimit, int[] sortFields,
+            int[] groupFields, INormalizedKeyComputerFactory firstKeyNormalizerFactory,
+            IBinaryComparatorFactory[] comparatorFactories, IAggregatorDescriptorFactory partialAggregatorFactory,
+            IAggregatorDescriptorFactory mergeAggregatorFactory, RecordDescriptor partialAggRecordDesc,
+            RecordDescriptor outRecordDesc, boolean finalStage) {
+        super(spec, 1, 1);
+        if (framesLimit <= 1) {
+            throw new IllegalStateException("frame limit should at least be 2, but it is " + framesLimit + "!");// minimum of 2 frames (1 in, 1 out)
+        }
+        this.framesLimit = framesLimit;
+        this.sortFields = sortFields;
+        this.recordDescriptors[0] = outRecordDesc;
+
+        this.groupFields = groupFields;
+        this.firstKeyNormalizerFactory = firstKeyNormalizerFactory;
+        this.comparatorFactories = comparatorFactories;
+        this.mergeAggregatorFactory = mergeAggregatorFactory;
+        this.partialAggregatorFactory = partialAggregatorFactory;
+        this.partialAggRecordDesc = partialAggRecordDesc;
+        this.outputRecordDesc = outRecordDesc;
+        this.finalStage = finalStage;
+    }
+
+    /***
+     * @param spec
+     *            , the Hyracks job specification
+     * @param framesLimit
+     *            , the frame limit for this operator
+     * @param sortFields
+     *            , the fields to sort
+     * @param groupFields
+     *            , the fields to group, which can be a prefix subset of sortFields
+     * @param firstKeyNormalizerFactory
+     *            , the normalized key computer factory of the first key
+     * @param comparatorFactories
+     *            , the comparator factories of sort keys
+     * @param partialAggregatorFactory
+     *            , for aggregating the input of this operator
+     * @param mergeAggregatorFactory
+     *            , for aggregating the intermediate data of this operator
+     * @param partialAggRecordDesc
+     *            , the record descriptor of intermediate data
+     * @param outRecordDesc
+     *            , the record descriptor of output data
+     * @param finalStage
+     *            , whether the operator is used for final stage aggregation
+     * @param alg
+     *            , the in-memory sort algorithm
+     */
+    public SortGroupByOperatorDescriptor(IOperatorDescriptorRegistry spec, int framesLimit, int[] sortFields,
+            int[] groupFields, INormalizedKeyComputerFactory firstKeyNormalizerFactory,
+            IBinaryComparatorFactory[] comparatorFactories, IAggregatorDescriptorFactory partialAggregatorFactory,
+            IAggregatorDescriptorFactory mergeAggregatorFactory, RecordDescriptor partialAggRecordDesc,
+            RecordDescriptor outRecordDesc, boolean finalStage, Algorithm alg) {
+        this(spec, framesLimit, sortFields, groupFields, firstKeyNormalizerFactory, comparatorFactories,
+                partialAggregatorFactory, mergeAggregatorFactory, partialAggRecordDesc, outRecordDesc, finalStage);
+        this.alg = alg;
+    }
+
+    @Override
+    public void contributeActivities(IActivityGraphBuilder builder) {
+        SortActivity sa = new SortActivity(new ActivityId(odId, SORT_ACTIVITY_ID));
+        MergeActivity ma = new MergeActivity(new ActivityId(odId, MERGE_ACTIVITY_ID));
+
+        builder.addActivity(this, sa);
+        builder.addSourceEdge(0, sa, 0);
+
+        builder.addActivity(this, ma);
+        builder.addTargetEdge(0, ma, 0);
+
+        builder.addBlockingEdge(sa, ma); // merge may start only after the sort activity completes
+    }
+
+    public static class SortTaskState extends AbstractStateObject {
+        private List<IFrameReader> runs;
+        private IFrameSorter frameSorter;
+
+        public SortTaskState() {
+        }
+
+        private SortTaskState(JobId jobId, TaskId taskId) {
+            super(jobId, taskId);
+        }
+
+        @Override
+        public void toBytes(DataOutput out) throws IOException {
+            // intentionally a no-op: this state is only consumed locally by the merge activity
+        }
+
+        @Override
+        public void fromBytes(DataInput in) throws IOException {
+            // intentionally a no-op: this state is never deserialized
+        }
+    }
+
+    private class SortActivity extends AbstractActivityNode {
+        private static final long serialVersionUID = 1L;
+
+        public SortActivity(ActivityId id) {
+            super(id);
+        }
+
+        @Override
+        public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
+                final IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions) {
+            IOperatorNodePushable op = new AbstractUnaryInputSinkOperatorNodePushable() {
+                private ExternalSortGroupByRunGenerator runGen;
+
+                @Override
+                public void open() throws HyracksDataException {
+                    runGen = new ExternalSortGroupByRunGenerator(ctx, sortFields,
+                            recordDescProvider.getInputRecordDescriptor(SortActivity.this.getActivityId(), 0),
+                            framesLimit, groupFields, firstKeyNormalizerFactory, comparatorFactories,
+                            partialAggregatorFactory, partialAggRecordDesc, alg);
+                    runGen.open();
+                }
+
+                @Override
+                public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+                    runGen.nextFrame(buffer);
+                }
+
+                @Override
+                public void close() throws HyracksDataException {
+                    SortTaskState state = new SortTaskState(ctx.getJobletContext().getJobId(), new TaskId(
+                            getActivityId(), partition));
+                    runGen.close();
+                    state.runs = runGen.getRuns();
+                    state.frameSorter = runGen.getFrameSorter();
+                    ctx.setStateObject(state); // hand off generated runs + in-memory sorter to the merge activity
+                }
+
+                @Override
+                public void fail() throws HyracksDataException {
+                    runGen.fail();
+                }
+            };
+            return op;
+        }
+    }
+
+    private class MergeActivity extends AbstractActivityNode {
+        private static final long serialVersionUID = 1L;
+
+        public MergeActivity(ActivityId id) {
+            super(id);
+        }
+
+        @Override
+        public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
+                final IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions) {
+            IOperatorNodePushable op = new AbstractUnaryOutputSourceOperatorNodePushable() {
+                @Override
+                public void initialize() throws HyracksDataException {
+                    SortTaskState state = (SortTaskState) ctx.getStateObject(new TaskId(new ActivityId(getOperatorId(),
+                            SORT_ACTIVITY_ID), partition));
+                    List<IFrameReader> runs = state.runs;
+                    IFrameSorter frameSorter = state.frameSorter;
+                    int necessaryFrames = Math.min(runs.size() + 2, framesLimit); // one frame per run plus 2, capped at framesLimit
+
+                    IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
+                    for (int i = 0; i < comparators.length; i++) {
+                        comparators[i] = comparatorFactories[i].createBinaryComparator();
+                    }
+                    INormalizedKeyComputer nkc = firstKeyNormalizerFactory == null ? null : firstKeyNormalizerFactory
+                            .createNormalizedKeyComputer();
+
+                    ExternalSortGroupByRunMerger merger = new ExternalSortGroupByRunMerger(ctx, frameSorter, runs,
+                            sortFields, recordDescProvider.getInputRecordDescriptor(new ActivityId(odId,
+                                    SORT_ACTIVITY_ID), 0), partialAggRecordDesc, outputRecordDesc, necessaryFrames,
+                            writer, groupFields, nkc, comparators, partialAggregatorFactory, mergeAggregatorFactory,
+                            !finalStage);
+                    merger.process();
+                }
+            };
+            return op;
+        }
+    }
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortRunMerger.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortRunMerger.java
index eaf4162..9178094 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortRunMerger.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortRunMerger.java
@@ -145,6 +145,7 @@
                             runCursors[i] = runs.get(generationSeparator + i);
                         }
                         merge(mergeResultWriter, runCursors);
+                        mergeResultWriter.close();
                         runs.subList(generationSeparator, mergeWidth + generationSeparator).clear();
                         runs.add(generationSeparator++, ((RunFileWriter) mergeResultWriter).createReader());
                     }
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/RunMergingFrameReader.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/RunMergingFrameReader.java
index 00fbe9b..24c8cb9 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/RunMergingFrameReader.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/RunMergingFrameReader.java
@@ -149,14 +149,10 @@
         return new Comparator<ReferenceEntry>() {
             public int compare(ReferenceEntry tp1, ReferenceEntry tp2) {
                 int nmk1 = tp1.getNormalizedKey();
-                int nmk2 = tp1.getNormalizedKey();
-                if (nmk1 > nmk2) {
-                    return 1;
+                int nmk2 = tp2.getNormalizedKey();
+                if (nmk1 != nmk2) {
+                    return ((((long) nmk1) & 0xffffffffL) < (((long) nmk2) & 0xffffffffL)) ? -1 : 1;
                 }
-                if (nmk1 < nmk2) {
-                    return -1;
-                }
-
                 FrameTupleAccessor fta1 = (FrameTupleAccessor) tp1.getAccessor();
                 FrameTupleAccessor fta2 = (FrameTupleAccessor) tp2.getAccessor();
                 byte[] b1 = fta1.getBuffer().array();
@@ -171,7 +167,9 @@
                         return c;
                     }
                 }
-                return 0;
+                int runid1 = tp1.getRunid();
+                int runid2 = tp2.getRunid();
+                return runid1 < runid2 ? -1 : (runid1 == runid2 ? 0 : 1);
             }
         };
     }
diff --git a/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/edu/uci/ics/hyracks/examples/btree/helper/RuntimeContext.java b/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/edu/uci/ics/hyracks/examples/btree/helper/RuntimeContext.java
index 9dd303d..e20211f 100644
--- a/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/edu/uci/ics/hyracks/examples/btree/helper/RuntimeContext.java
+++ b/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/edu/uci/ics/hyracks/examples/btree/helper/RuntimeContext.java
@@ -53,9 +53,9 @@
     public RuntimeContext(INCApplicationContext appCtx) throws HyracksDataException {
         fileMapManager = new TransientFileMapManager();
         ICacheMemoryAllocator allocator = new HeapBufferAllocator();
-        IPageReplacementStrategy prs = new ClockPageReplacementStrategy();
-        bufferCache = new BufferCache(appCtx.getRootContext().getIOManager(), allocator, prs,
-                new DelayPageCleanerPolicy(1000), fileMapManager, 32768, 50, 100, threadFactory);
+        IPageReplacementStrategy prs = new ClockPageReplacementStrategy(allocator, 32768, 50);
+        bufferCache = new BufferCache(appCtx.getRootContext().getIOManager(), prs, new DelayPageCleanerPolicy(1000),
+                fileMapManager, 100, threadFactory);
         lcManager = new IndexLifecycleManager();
         ILocalResourceRepositoryFactory localResourceRepositoryFactory = new TransientLocalResourceRepositoryFactory();
         localResourceRepository = localResourceRepositoryFactory.createRepository();
diff --git a/hyracks/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCSystem.java b/hyracks/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCSystem.java
index e32e409..0f76343 100644
--- a/hyracks/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCSystem.java
+++ b/hyracks/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCSystem.java
@@ -77,6 +77,7 @@
         Exception exception = null;
         if (message.getFlag() == Message.ERROR) {
             exception = (Exception) message.getPayload();
+            exception.printStackTrace(); // TODO(review): debugging leftover — route through a logger or remove before release
         } else {
             payload = message.getPayload();
         }
diff --git a/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/ChannelControlBlock.java b/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/ChannelControlBlock.java
index 2e95cde..6b9364d 100644
--- a/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/ChannelControlBlock.java
+++ b/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/ChannelControlBlock.java
@@ -108,12 +108,20 @@
 
         private ByteBuffer currentReadBuffer;
 
+        private IBufferFactory bufferFactory;
+
         ReadInterface() {
             riEmptyStack = new ArrayDeque<ByteBuffer>();
             credits = 0;
         }
 
         @Override
+        public void setBufferFactory(IBufferFactory bufferFactory, int limit, int frameSize) {
+            this.bufferFactory = bufferFactory;
+            cSet.addPendingCredits(channelId, limit * frameSize);
+        }
+
+        @Override
         public void setFullBufferAcceptor(ICloseableBufferAcceptor fullBufferAcceptor) {
             fba = fullBufferAcceptor;
         }
@@ -130,6 +138,11 @@
                 }
                 if (currentReadBuffer == null) {
                     currentReadBuffer = riEmptyStack.poll();
+                    // No recycled buffer is available: lazily allocate a fresh one
+                    // from the channel's buffer factory.
+                    if (currentReadBuffer == null) {
+                        currentReadBuffer = bufferFactory.createBuffer();
+                    }
                     assert currentReadBuffer != null;
                 }
                 int rSize = Math.min(size, currentReadBuffer.remaining());
@@ -171,6 +184,8 @@
 
         private boolean channelWritabilityState;
 
+        private IBufferFactory bufferFactory;
+
         private final ICloseableBufferAcceptor fba = new ICloseableBufferAcceptor() {
             @Override
             public void accept(ByteBuffer buffer) {
@@ -227,6 +242,15 @@
         }
 
         @Override
+        public void setBufferFactory(IBufferFactory bufferFactory, int limit, int frameSize) {
+            this.bufferFactory = bufferFactory; // lazily supplies write buffers for this channel
+            if (!channelWritabilityState) {
+                cSet.markPendingWrite(channelId); // a factory can make a previously unwritable channel writable
+            }
+            channelWritabilityState = true;
+        }
+
+        @Override
         public void setEmptyBufferAcceptor(IBufferAcceptor emptyBufferAcceptor) {
             eba = emptyBufferAcceptor;
         }
diff --git a/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/IBufferFactory.java b/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/IBufferFactory.java
new file mode 100644
index 0000000..5abba95
--- /dev/null
+++ b/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/IBufferFactory.java
@@ -0,0 +1,26 @@
+/*
+ * Copyright 2009-2014 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.net.protocols.muxdemux;
+
+import java.nio.ByteBuffer;
+
+/**
+ * A factory that creates buffers on demand, enabling lazy per-channel memory allocation.
+ */
+public interface IBufferFactory {
+
+    public ByteBuffer createBuffer(); // returns a newly allocated buffer; callers enforce the buffer-count limit — TODO confirm
+
+}
diff --git a/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/IChannelReadInterface.java b/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/IChannelReadInterface.java
index 2e961fd..eb683eb 100644
--- a/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/IChannelReadInterface.java
+++ b/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/IChannelReadInterface.java
@@ -39,4 +39,17 @@
      * @return the empty buffer acceptor.
      */
     public IBufferAcceptor getEmptyBufferAcceptor();
+
+    /**
+     * Sets the buffer factory, which is in charge of creating buffers as long as the request
+     * does not push the number of allocated buffers beyond the limit.
+     * 
+     * @param bufferFactory
+     *            - the buffer factory
+     * @param limit
+     *            - the limit of buffers
+     * @param frameSize
+     *            - the size of each buffer
+     */
+    public void setBufferFactory(IBufferFactory bufferFactory, int limit, int frameSize);
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/IChannelWriteInterface.java b/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/IChannelWriteInterface.java
index e2fb764..42516ea 100644
--- a/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/IChannelWriteInterface.java
+++ b/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/IChannelWriteInterface.java
@@ -39,4 +39,17 @@
      * @return the full buffer acceptor.
      */
     public ICloseableBufferAcceptor getFullBufferAcceptor();
+
+    /**
+     * Sets the buffer factory, which is in charge of creating buffers as long as the request
+     * does not push the number of allocated buffers beyond the limit.
+     * 
+     * @param bufferFactory
+     *            - the buffer factory
+     * @param limit
+     *            - the limit of buffers
+     * @param frameSize
+     *            - the size of each buffer
+     */
+    public void setBufferFactory(IBufferFactory bufferFactory, int limit, int frameSize);
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/BufferCache.java b/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/BufferCache.java
index 03d57f5..740c447 100644
--- a/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/BufferCache.java
+++ b/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/BufferCache.java
@@ -16,18 +16,15 @@
 
 import java.io.IOException;
 import java.io.OutputStream;
-import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Executor;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
@@ -46,11 +43,9 @@
     private static final int MIN_CLEANED_COUNT_DIFF = 3;
     private static final int PIN_MAX_WAIT_TIME = 50;
 
-    private final int maxOpenFiles;
-
-    private final IIOManager ioManager;
     private final int pageSize;
-    private final int numPages;
+    private final int maxOpenFiles;
+    private final IIOManager ioManager;
     private final CacheBucket[] pageMap;
     private final IPageReplacementStrategy pageReplacementStrategy;
     private final IPageCleanerPolicy pageCleanerPolicy;
@@ -58,27 +53,21 @@
     private final CleanerThread cleanerThread;
     private final Map<Integer, BufferedFileHandle> fileInfoMap;
 
-    private CachedPage[] cachedPages;
+    private List<ICachedPageInternal> cachedPages = new ArrayList<ICachedPageInternal>();
 
     private boolean closed;
 
-    public BufferCache(IIOManager ioManager, ICacheMemoryAllocator allocator,
-            IPageReplacementStrategy pageReplacementStrategy, IPageCleanerPolicy pageCleanerPolicy,
-            IFileMapManager fileMapManager, int pageSize, int numPages, int maxOpenFiles, ThreadFactory threadFactory) {
+    public BufferCache(IIOManager ioManager, IPageReplacementStrategy pageReplacementStrategy,
+            IPageCleanerPolicy pageCleanerPolicy, IFileMapManager fileMapManager, int maxOpenFiles,
+            ThreadFactory threadFactory) {
         this.ioManager = ioManager;
-        this.pageSize = pageSize;
-        this.numPages = numPages;
+        this.pageSize = pageReplacementStrategy.getPageSize();
         this.maxOpenFiles = maxOpenFiles;
         pageReplacementStrategy.setBufferCache(this);
-        pageMap = new CacheBucket[numPages * MAP_FACTOR];
+        pageMap = new CacheBucket[pageReplacementStrategy.getMaxAllowedNumPages() * MAP_FACTOR];
         for (int i = 0; i < pageMap.length; ++i) {
             pageMap[i] = new CacheBucket();
         }
-        ByteBuffer[] buffers = allocator.allocate(pageSize, numPages);
-        cachedPages = new CachedPage[buffers.length];
-        for (int i = 0; i < buffers.length; ++i) {
-            cachedPages[i] = new CachedPage(i, buffers[i], pageReplacementStrategy);
-        }
         this.pageReplacementStrategy = pageReplacementStrategy;
         this.pageCleanerPolicy = pageCleanerPolicy;
         this.fileMapManager = fileMapManager;
@@ -96,7 +85,7 @@
 
     @Override
     public int getNumPages() {
-        return numPages;
+        return pageReplacementStrategy.getMaxAllowedNumPages();
     }
 
     private void pinSanityCheck(long dpid) throws HyracksDataException {
@@ -338,7 +327,8 @@
         StringBuilder buffer = new StringBuilder();
         buffer.append("Buffer cache state\n");
         buffer.append("Page Size: ").append(pageSize).append('\n');
-        buffer.append("Number of physical pages: ").append(numPages).append('\n');
+        buffer.append("Number of physical pages: ").append(pageReplacementStrategy.getMaxAllowedNumPages())
+                .append('\n');
         buffer.append("Hash table size: ").append(pageMap.length).append('\n');
         buffer.append("Page Map:\n");
         int nCachedPages = 0;
@@ -416,88 +406,9 @@
         }
     }
 
-    private class CachedPage implements ICachedPageInternal {
-        private final int cpid;
-        private final ByteBuffer buffer;
-        private final AtomicInteger pinCount;
-        private final AtomicBoolean dirty;
-        private final ReadWriteLock latch;
-        private final Object replacementStrategyObject;
-        volatile long dpid;
-        CachedPage next;
-        volatile boolean valid;
-
-        public CachedPage(int cpid, ByteBuffer buffer, IPageReplacementStrategy pageReplacementStrategy) {
-            this.cpid = cpid;
-            this.buffer = buffer;
-            pinCount = new AtomicInteger();
-            dirty = new AtomicBoolean();
-            latch = new ReentrantReadWriteLock(true);
-            replacementStrategyObject = pageReplacementStrategy.createPerPageStrategyObject(cpid);
-            dpid = -1;
-            valid = false;
-        }
-
-        public void reset(long dpid) {
-            this.dpid = dpid;
-            dirty.set(false);
-            valid = false;
-            pageReplacementStrategy.notifyCachePageReset(this);
-        }
-
-        public void invalidate() {
-            reset(-1);
-        }
-
-        @Override
-        public ByteBuffer getBuffer() {
-            return buffer;
-        }
-
-        @Override
-        public Object getReplacementStrategyObject() {
-            return replacementStrategyObject;
-        }
-
-        @Override
-        public boolean pinIfGoodVictim() {
-            return pinCount.compareAndSet(0, 1);
-        }
-
-        @Override
-        public int getCachedPageId() {
-            return cpid;
-        }
-
-        @Override
-        public void acquireReadLatch() {
-            latch.readLock().lock();
-        }
-
-        @Override
-        public void acquireWriteLatch() {
-            latch.writeLock().lock();
-        }
-
-        @Override
-        public void releaseReadLatch() {
-            latch.readLock().unlock();
-        }
-
-        @Override
-        public void releaseWriteLatch(boolean markDirty) {
-            if (markDirty) {
-                if (dirty.compareAndSet(false, true)) {
-                    pinCount.incrementAndGet();
-                }
-            }
-            latch.writeLock().unlock();
-        }
-    }
-
     @Override
     public ICachedPageInternal getPage(int cpid) {
-        return cachedPages[cpid];
+        return cachedPages.get(cpid);
     }
 
     private class CleanerThread extends Thread {
@@ -564,8 +475,9 @@
             try {
                 while (true) {
                     pageCleanerPolicy.notifyCleanCycleStart(this);
+                    int numPages = pageReplacementStrategy.getNumPages();
                     for (int i = 0; i < numPages; ++i) {
-                        CachedPage cPage = cachedPages[i];
+                        CachedPage cPage = (CachedPage) cachedPages.get(i);
                         cleanPage(cPage, false);
                     }
                     if (shutdownStart) {
@@ -715,7 +627,7 @@
             } else {
                 pinCount = cPage.pinCount.get();
             }
-            if (pinCount != 0) {
+            if (pinCount > 0) {
                 throw new IllegalStateException("Page is pinned and file is being closed. Pincount is: " + pinCount);
             }
             cPage.invalidate();
@@ -808,7 +720,12 @@
     }
 
     @Override
+    public void addPage(ICachedPageInternal page) {
+        cachedPages.add(page); // list index serves as the cached-page id used by getPage(cpid)
+    }
+
+    @Override
     public void dumpState(OutputStream os) throws IOException {
         os.write(dumpState().getBytes());
     }
-}
\ No newline at end of file
+}
diff --git a/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/CachedPage.java b/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/CachedPage.java
new file mode 100644
index 0000000..d57a356
--- /dev/null
+++ b/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/CachedPage.java
@@ -0,0 +1,105 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.storage.common.buffercache;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+/**
+ * A single in-memory page frame owned by the buffer cache. Frames are created
+ * lazily by the page replacement strategy as demand grows (rather than being
+ * pre-allocated), which is why construction receives an already-allocated
+ * buffer.
+ *
+ * @author yingyib
+ */
+class CachedPage implements ICachedPageInternal {
+    // Identifier of this frame within the buffer cache's page list.
+    final int cpid;
+    // The raw page memory; allocated once and reused across resets.
+    final ByteBuffer buffer;
+    // Outstanding pins; pinIfGoodVictim only succeeds when this is 0.
+    final AtomicInteger pinCount;
+    // Set on the first write-latch release with markDirty == true.
+    final AtomicBoolean dirty;
+    // Fair (true) read/write latch guarding the page contents.
+    final ReadWriteLock latch;
+    private final Object replacementStrategyObject;
+    private final IPageReplacementStrategy pageReplacementStrategy;
+    // Disk page id currently mapped into this frame; -1 means unmapped.
+    volatile long dpid;
+    CachedPage next;
+    // Whether the frame holds valid data for dpid.
+    volatile boolean valid;
+
+    // Creates a frame around an externally allocated buffer and obtains the
+    // per-page bookkeeping object from the replacement strategy.
+    public CachedPage(int cpid, ByteBuffer buffer, IPageReplacementStrategy pageReplacementStrategy) {
+        this.cpid = cpid;
+        this.buffer = buffer;
+        this.pageReplacementStrategy = pageReplacementStrategy;
+        pinCount = new AtomicInteger();
+        dirty = new AtomicBoolean();
+        latch = new ReentrantReadWriteLock(true);
+        replacementStrategyObject = pageReplacementStrategy.createPerPageStrategyObject(cpid);
+        dpid = -1;
+        valid = false;
+    }
+
+    // Re-targets this frame at a new disk page id, clearing dirty/valid state
+    // and notifying the replacement strategy of the reset.
+    public void reset(long dpid) {
+        this.dpid = dpid;
+        dirty.set(false);
+        valid = false;
+        pageReplacementStrategy.notifyCachePageReset(this);
+    }
+
+    // Unmaps the frame (dpid = -1) via reset.
+    public void invalidate() {
+        reset(-1);
+    }
+
+    @Override
+    public ByteBuffer getBuffer() {
+        return buffer;
+    }
+
+    @Override
+    public Object getReplacementStrategyObject() {
+        return replacementStrategyObject;
+    }
+
+    // Atomically pins an unpinned page (CAS 0 -> 1); returns false when the
+    // page is already pinned and therefore not a viable eviction victim.
+    @Override
+    public boolean pinIfGoodVictim() {
+        return pinCount.compareAndSet(0, 1);
+    }
+
+    @Override
+    public int getCachedPageId() {
+        return cpid;
+    }
+
+    @Override
+    public void acquireReadLatch() {
+        latch.readLock().lock();
+    }
+
+    @Override
+    public void acquireWriteLatch() {
+        latch.writeLock().lock();
+    }
+
+    @Override
+    public void releaseReadLatch() {
+        latch.readLock().unlock();
+    }
+
+    // On the false -> true dirty transition an extra pin is taken so the page
+    // cannot be evicted before it is flushed; presumably the cleaner drops
+    // that pin after writing the page out -- TODO confirm against the
+    // buffer cache's cleanPage logic (not visible here).
+    @Override
+    public void releaseWriteLatch(boolean markDirty) {
+        if (markDirty) {
+            if (dirty.compareAndSet(false, true)) {
+                pinCount.incrementAndGet();
+            }
+        }
+        latch.writeLock().unlock();
+    }
+}
diff --git a/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/ClockPageReplacementStrategy.java b/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/ClockPageReplacementStrategy.java
index ec97344..611bf48 100644
--- a/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/ClockPageReplacementStrategy.java
+++ b/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/ClockPageReplacementStrategy.java
@@ -24,9 +24,16 @@
     private final Lock lock;
     private IBufferCacheInternal bufferCache;
     private int clockPtr;
+    // Source of page-sized ByteBuffers for lazily created frames.
+    private ICacheMemoryAllocator allocator;
+    // Frames allocated so far; grows monotonically up to maxAllowedNumPages.
+    private int numPages = 0;
+    private final int pageSize;
+    private final int maxAllowedNumPages;
 
-    public ClockPageReplacementStrategy() {
+    // The strategy now owns memory allocation so pages can be created on
+    // demand instead of being pre-allocated by the buffer cache.
+    public ClockPageReplacementStrategy(ICacheMemoryAllocator allocator, int pageSize, int maxAllowedNumPages) {
         this.lock = new ReentrantLock();
+        this.allocator = allocator;
+        this.pageSize = pageSize;
+        this.maxAllowedNumPages = maxAllowedNumPages;
         clockPtr = 0;
     }
 
@@ -53,38 +60,83 @@
     @Override
     public ICachedPageInternal findVictim() {
         lock.lock();
+        ICachedPageInternal cachedPage = null;
         try {
-            int startClockPtr = clockPtr;
-            int cycleCount = 0;
-            do {
-                ICachedPageInternal cPage = bufferCache.getPage(clockPtr);
-
-                /*
-                 * We do two things here:
-                 * 1. If the page has been accessed, then we skip it -- The CAS would return
-                 * false if the current value is false which makes the page a possible candidate
-                 * for replacement.
-                 * 2. We check with the buffer manager if it feels its a good idea to use this
-                 * page as a victim.
-                 */
-                AtomicBoolean accessedFlag = getPerPageObject(cPage);
-                if (!accessedFlag.compareAndSet(true, false)) {
-                    if (cPage.pinIfGoodVictim()) {
-                        return cPage;
-                    }
-                }
-                clockPtr = (clockPtr + 1) % bufferCache.getNumPages();
-                if (clockPtr == startClockPtr) {
-                    ++cycleCount;
-                }
-            } while (cycleCount < MAX_UNSUCCESSFUL_CYCLE_COUNT);
+            // Lazy growth: only run clock eviction once the budget of
+            // maxAllowedNumPages frames has been spent; until then, satisfy
+            // the request by allocating a brand-new page instead.
+            if (numPages >= maxAllowedNumPages) {
+                cachedPage = findVictimByEviction();
+            } else {
+                cachedPage = allocatePage();
+            }
         } finally {
             lock.unlock();
         }
+        // May be null when no unpinned victim was found within the cycle bound.
+        return cachedPage;
+    }
+
+    /**
+     * Classic clock sweep over the frames allocated so far: skip recently
+     * accessed pages (clearing their reference bit via CAS) and return the
+     * first unpinned page that agrees to be a victim. Gives up and returns
+     * null after MAX_UNSUCCESSFUL_CYCLE_COUNT full revolutions.
+     * Caller must hold {@code lock} so that {@code numPages} is stable.
+     */
+    private ICachedPageInternal findVictimByEviction() {
+        int startClockPtr = clockPtr;
+        int cycleCount = 0;
+        do {
+            ICachedPageInternal cPage = bufferCache.getPage(clockPtr);
+
+            /*
+             * We do two things here:
+             * 1. If the page has been accessed, then we skip it -- The CAS would return
+             * false if the current value is false which makes the page a possible candidate
+             * for replacement.
+             * 2. We check with the buffer manager if it feels its a good idea to use this
+             * page as a victim.
+             */
+            AtomicBoolean accessedFlag = getPerPageObject(cPage);
+            if (!accessedFlag.compareAndSet(true, false)) {
+                if (cPage.pinIfGoodVictim()) {
+                    return cPage;
+                }
+            }
+            clockPtr = (clockPtr + 1) % numPages;
+            if (clockPtr == startClockPtr) {
+                ++cycleCount;
+            }
+        } while (cycleCount < MAX_UNSUCCESSFUL_CYCLE_COUNT);
+        return null;
+    }
+
+    /**
+     * @return the number of page frames allocated so far; read under the lock
+     *         so a concurrent allocation is not observed half-done.
+     */
+    @Override
+    public int getNumPages() {
+        int retNumPages = 0;
+        lock.lock();
+        try {
+            retNumPages = numPages;
+        } finally {
+            lock.unlock();
+        }
+        return retNumPages;
+    }
+
+    /**
+     * Grows the cache by one frame: allocates a fresh page-sized buffer,
+     * registers the new frame with the buffer cache, and pins it for the
+     * requester using the same CAS dance as findVictimByEviction.
+     * NOTE(review): assumes the per-page accessed flag starts out false so the
+     * compareAndSet(true, false) fails and the pin branch is taken -- the
+     * initial value comes from createPerPageStrategyObject (not visible here);
+     * confirm.
+     */
+    private ICachedPageInternal allocatePage() {
+        CachedPage cPage = new CachedPage(numPages, allocator.allocate(pageSize, 1)[0], this);
+        bufferCache.addPage(cPage);
+        numPages++;
+        AtomicBoolean accessedFlag = getPerPageObject(cPage);
+        if (!accessedFlag.compareAndSet(true, false)) {
+            if (cPage.pinIfGoodVictim()) {
+                return cPage;
+            }
+        }
         return null;
     }
 
     private AtomicBoolean getPerPageObject(ICachedPageInternal cPage) {
         return (AtomicBoolean) cPage.getReplacementStrategyObject();
     }
+
+    /** @return the uniform size in bytes of each page this strategy allocates. */
+    @Override
+    public int getPageSize() {
+        return pageSize;
+    }
+
+    /** @return the hard cap on frames; once reached, victims are evicted instead. */
+    @Override
+    public int getMaxAllowedNumPages() {
+        return maxAllowedNumPages;
+    }
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/IBufferCacheInternal.java b/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/IBufferCacheInternal.java
index 4c9e949..cd2e853 100644
--- a/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/IBufferCacheInternal.java
+++ b/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/IBufferCacheInternal.java
@@ -16,4 +16,6 @@
 
 public interface IBufferCacheInternal extends IBufferCache {
     public ICachedPageInternal getPage(int cpid);
+
+    /**
+     * Registers a newly created page frame with the cache. Invoked by the
+     * page replacement strategy when it allocates frames on demand.
+     */
+    public void addPage(ICachedPageInternal page);
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/IPageReplacementStrategy.java b/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/IPageReplacementStrategy.java
index b6bfdc3..0adcf68 100644
--- a/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/IPageReplacementStrategy.java
+++ b/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/IPageReplacementStrategy.java
@@ -24,4 +24,10 @@
     public void notifyCachePageAccess(ICachedPageInternal cPage);
 
     public ICachedPageInternal findVictim();
+
+    /** @return the number of page frames allocated so far. */
+    public int getNumPages();
+
+    /** @return the size in bytes of each page frame. */
+    public int getPageSize();
+
+    /** @return the maximum number of page frames this strategy may allocate. */
+    public int getMaxAllowedNumPages();
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestStorageManagerComponentHolder.java b/hyracks/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestStorageManagerComponentHolder.java
index dd85945..2370b84 100644
--- a/hyracks/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestStorageManagerComponentHolder.java
+++ b/hyracks/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestStorageManagerComponentHolder.java
@@ -79,10 +79,10 @@
     public synchronized static IBufferCache getBufferCache(IHyracksTaskContext ctx) {
         if (bufferCache == null) {
             ICacheMemoryAllocator allocator = new HeapBufferAllocator();
-            IPageReplacementStrategy prs = new ClockPageReplacementStrategy();
+            // The replacement strategy now owns the allocator and the
+            // pageSize/numPages budget, so BufferCache no longer takes them
+            // and frames are allocated lazily instead of up front.
+            IPageReplacementStrategy prs = new ClockPageReplacementStrategy(allocator, pageSize, numPages);
             IFileMapProvider fileMapProvider = getFileMapProvider(ctx);
-            bufferCache = new BufferCache(ctx.getIOManager(), allocator, prs, new DelayPageCleanerPolicy(1000),
-                    (IFileMapManager) fileMapProvider, pageSize, numPages, maxOpenFiles, threadFactory);
+            bufferCache = new BufferCache(ctx.getIOManager(), prs, new DelayPageCleanerPolicy(1000),
+                    (IFileMapManager) fileMapProvider, maxOpenFiles, threadFactory);
         }
         return bufferCache;
     }