Merge fullstack_asterix_stabilization into fullstack_hyracks_result_distribution.

git-svn-id: 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks/hyracks-api/pom.xml b/hyracks/hyracks-api/pom.xml
index 09307e4..6807f76 100644
--- a/hyracks/hyracks-api/pom.xml
+++ b/hyracks/hyracks-api/pom.xml
@@ -17,6 +17,7 @@
+          <fork>true</fork>
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/impl/ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/impl/
index f36b7b3..0eac9a2 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/impl/
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/impl/
@@ -17,6 +17,7 @@
 import edu.uci.ics.hyracks.api.job.JobFlag;
 import edu.uci.ics.hyracks.api.job.JobId;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.api.rewriter.ActivityClusterGraphRewriter;
 public class JobSpecificationActivityClusterGraphGeneratorFactory implements IActivityClusterGraphGeneratorFactory {
     private static final long serialVersionUID = 1L;
@@ -78,6 +79,8 @@
         return new IActivityClusterGraphGenerator() {
             public ActivityClusterGraph initialize() {
+                ActivityClusterGraphRewriter rewriter = new ActivityClusterGraphRewriter();
+                rewriter.rewrite(acg);
                 return acg;
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/
index 6698ff7..9fb2b08 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/
@@ -33,7 +33,7 @@
 import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicyAssignmentPolicy;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-public final class ActivityCluster implements Serializable {
+public class ActivityCluster implements Serializable {
     private static final long serialVersionUID = 1L;
     private final ActivityClusterGraph acg;
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/rewriter/ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/rewriter/
new file mode 100644
index 0000000..c6761e9
--- /dev/null
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/rewriter/
@@ -0,0 +1,381 @@
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.rewriter;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Queue;
+import java.util.Set;
+import org.apache.commons.lang3.tuple.Pair;
+import edu.uci.ics.hyracks.api.dataflow.ActivityId;
+import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
+import edu.uci.ics.hyracks.api.dataflow.IActivity;
+import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.job.ActivityCluster;
+import edu.uci.ics.hyracks.api.job.ActivityClusterGraph;
+import edu.uci.ics.hyracks.api.job.ActivityClusterId;
+import edu.uci.ics.hyracks.api.rewriter.runtime.SuperActivity;
+ * This class rewrites the ActivityClusterGraph to eliminate
+ * all one-to-one connections and merge one-to-one connected
+ * DAGs into super activities.
+ * <p>
+ * Each super activity internally maintains a DAG and executes it at runtime.
+ * 
+ * @author yingyib
+ */
+public class ActivityClusterGraphRewriter {
+    /**
+     * Fully-qualified class name used to recognize one-to-one connectors: a
+     * connector is treated as one-to-one when its runtime class name contains
+     * this string. Declared final because it is a constant that is never reassigned.
+     */
+    private static final String ONE_TO_ONE_CONNECTOR = "edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor";
+    /**
+     * Rewrites the given activity cluster graph in place so that activities
+     * linked by one-to-one connectors are merged into super activities.
+     * <p>
+     * The graph-level activity and connector maps are emptied first. Every
+     * activity cluster is then rewritten internally, and afterwards the
+     * blocking relationships of every cluster are rewritten.
+     * 
+     * @param acg
+     *            the activity cluster graph
+     */
+    public void rewrite(ActivityClusterGraph acg) {
+        acg.getActivityMap().clear();
+        acg.getConnectorMap().clear();
+        Map<IActivity, SuperActivity> activityToSuperActivity = new HashMap<IActivity, SuperActivity>();
+        // first pass: merge one-to-one connected activities inside each cluster
+        for (ActivityCluster cluster : acg.getActivityClusterMap().values()) {
+            rewriteIntraActivityCluster(cluster, activityToSuperActivity);
+        }
+        // second pass: re-express the blocking relationships over the super activities
+        for (ActivityCluster cluster : acg.getActivityClusterMap().values()) {
+            rewriteInterActivityCluster(cluster, activityToSuperActivity);
+        }
+        activityToSuperActivity.clear();
+    }
+    /**
+     * Rewrites the blocking relationships of an activity cluster so that they
+     * refer to super activity ids instead of the original activity ids, and
+     * records the clusters hosting the blockers as dependencies of the cluster.
+     * 
+     * @param ac
+     *            the activity cluster to be rewritten
+     * @param invertedActivitySuperActivityMap
+     *            the map from the original activities to their hosted super activities
+     */
+    private void rewriteInterActivityCluster(ActivityCluster ac,
+            Map<IActivity, SuperActivity> invertedActivitySuperActivityMap) {
+        // original activity id -> id of the super activity that hosts it
+        Map<ActivityId, ActivityId> superIdByActivityId = new HashMap<ActivityId, ActivityId>();
+        for (Entry<IActivity, SuperActivity> hosting : invertedActivitySuperActivityMap.entrySet()) {
+            superIdByActivityId.put(hosting.getKey().getActivityId(), hosting.getValue().getActivityId());
+        }
+        Map<ActivityId, Set<ActivityId>> blocked2BlockerMap = ac.getBlocked2BlockerMap();
+        Map<ActivityId, Set<ActivityId>> rewrittenBlocking = new HashMap<ActivityId, Set<ActivityId>>();
+        for (Entry<ActivityId, Set<ActivityId>> blocking : blocked2BlockerMap.entrySet()) {
+            Set<ActivityId> originalBlockers = blocking.getValue();
+            if (originalBlockers == null) {
+                // nothing blocks this activity; the entry is dropped from the rewritten map
+                continue;
+            }
+            ActivityId superBlocked = superIdByActivityId.get(blocking.getKey());
+            Set<ActivityId> superBlockers = new HashSet<ActivityId>();
+            for (ActivityId originalBlocker : originalBlockers) {
+                ActivityId superBlocker = superIdByActivityId.get(originalBlocker);
+                superBlockers.add(superBlocker);
+                // the cluster hosting the blocker becomes a dependency of this cluster
+                ActivityCluster dependingAc = ac.getActivityClusterGraph().getActivityMap().get(superBlocker);
+                if (!ac.getDependencies().contains(dependingAc)) {
+                    ac.getDependencies().add(dependingAc);
+                }
+            }
+            // several original activities may map to the same super activity: merge their blockers
+            Set<ActivityId> alreadyCollected = rewrittenBlocking.get(superBlocked);
+            if (alreadyCollected == null) {
+                rewrittenBlocking.put(superBlocked, superBlockers);
+            } else {
+                alreadyCollected.addAll(superBlockers);
+            }
+        }
+        blocked2BlockerMap.clear();
+        blocked2BlockerMap.putAll(rewrittenBlocking);
+    }
+    /**
+     * Rewrites one activity cluster internally: partitions its activities into
+     * super activities along one-to-one connectors, builds a new activity
+     * cluster made of those super activities, and registers the new cluster in
+     * the graph under the id of the original cluster.
+     * 
+     * @param ac
+     *            the activity cluster to be rewritten
+     * @param invertedActivitySuperActivityMap
+     *            the map from the original activities to their hosted super activities; entries are added by this method
+     */
+    private void rewriteIntraActivityCluster(ActivityCluster ac,
+            Map<IActivity, SuperActivity> invertedActivitySuperActivityMap) {
+        Map<ActivityId, IActivity> activities = ac.getActivityMap();
+        Map<ActivityId, List<IConnectorDescriptor>> activityInputMap = ac.getActivityInputMap();
+        Map<ActivityId, List<IConnectorDescriptor>> activityOutputMap = ac.getActivityOutputMap();
+        Map<ConnectorDescriptorId, Pair<Pair<IActivity, Integer>, Pair<IActivity, Integer>>> connectorActivityMap = ac
+                .getConnectorActivityMap();
+        ActivityClusterGraph acg = ac.getActivityClusterGraph();
+        Map<ActivityId, IActivity> startActivities = new HashMap<ActivityId, IActivity>(); // NOTE(review): populated below but never read in this method
+        Map<ActivityId, SuperActivity> superActivities = new HashMap<ActivityId, SuperActivity>();
+        Map<ActivityId, Queue<IActivity>> toBeExpendedMap = new HashMap<ActivityId, Queue<IActivity>>();
+        /**
+         * Build the initial super activities: every activity without an entry
+         * in the activity input map seeds its own super activity.
+         */
+        for (Entry<ActivityId, IActivity> entry : activities.entrySet()) {
+            ActivityId activityId = entry.getKey();
+            IActivity activity = entry.getValue();
+            if (activityInputMap.get(activityId) == null) {
+                startActivities.put(activityId, activity);
+                /**
+                 * use the start activity's id as the id of the super activity
+                 */
+                createNewSuperActivity(ac, superActivities, toBeExpendedMap, invertedActivitySuperActivityMap,
+                        activityId, activity);
+            }
+        }
+        /**
+         * Expand the one-to-one connected super activities in BFS order.
+         * After the while-loop, the original activities are partitioned
+         * into equivalence classes, one per super activity.
+         * Each pass iterates over a snapshot of the super activities because
+         * the live map can gain entries (createNewSuperActivity) or lose
+         * entries (swallowExistingSuperActivity) during the pass.
+         */
+        Map<ActivityId, SuperActivity> clonedSuperActivities = new HashMap<ActivityId, SuperActivity>();
+        while (toBeExpendedMap.size() > 0) {
+            clonedSuperActivities.clear();
+            clonedSuperActivities.putAll(superActivities);
+            for (Entry<ActivityId, SuperActivity> entry : clonedSuperActivities.entrySet()) {
+                ActivityId superActivityId = entry.getKey();
+                SuperActivity superActivity = entry.getValue();
+                /**
+                 * for the case where the super activity has already been swallowed
+                 * earlier in this pass
+                 */
+                if (superActivities.get(superActivityId) == null) {
+                    continue;
+                }
+                /**
+                 * expand the super activity by one queued activity
+                 */
+                Queue<IActivity> toBeExpended = toBeExpendedMap.get(superActivityId);
+                if (toBeExpended == null) {
+                    /**
+                     * Nothing to expand
+                     */
+                    continue;
+                }
+                IActivity expendingActivity = toBeExpended.poll();
+                List<IConnectorDescriptor> outputConnectors = activityOutputMap.get(expendingActivity.getActivityId());
+                if (outputConnectors != null) {
+                    for (IConnectorDescriptor outputConn : outputConnectors) {
+                        Pair<Pair<IActivity, Integer>, Pair<IActivity, Integer>> endPoints = connectorActivityMap
+                                .get(outputConn.getConnectorId());
+                        IActivity newActivity = endPoints.getRight().getLeft(); // the activity on the consuming end of the connector
+                        SuperActivity existingSuperActivity = invertedActivitySuperActivityMap.get(newActivity);
+                        if (outputConn.getClass().getName().contains(ONE_TO_ONE_CONNECTOR)) {
+                            /**
+                             * expand the super activity along a one-to-one outbound connection
+                             */
+                            if (existingSuperActivity == null) {
+                                superActivity.addActivity(newActivity);
+                                toBeExpended.add(newActivity);
+                                invertedActivitySuperActivityMap.put(newActivity, superActivity);
+                            } else {
+                                /**
+                                 * the two activities are already in the same super activity
+                                 */
+                                if (existingSuperActivity == superActivity) {
+                                    continue;
+                                }
+                                /**
+                                 * swallow an existing super activity
+                                 */
+                                swallowExistingSuperActivity(superActivities, toBeExpendedMap,
+                                        invertedActivitySuperActivityMap, superActivity, superActivityId,
+                                        existingSuperActivity);
+                            }
+                        } else {
+                            if (existingSuperActivity == null) {
+                                /**
+                                 * the consuming activity is not hosted by any super activity yet:
+                                 * it seeds a new super activity that carries its own id
+                                 */
+                                createNewSuperActivity(ac, superActivities, toBeExpendedMap,
+                                        invertedActivitySuperActivityMap, newActivity.getActivityId(), newActivity);
+                            }
+                        }
+                    }
+                }
+                /**
+                 * remove the to-be-expanded queue if it is empty
+                 */
+                if (toBeExpended.size() == 0) {
+                    toBeExpendedMap.remove(superActivityId);
+                }
+            }
+        }
+        Map<ConnectorDescriptorId, IConnectorDescriptor> connMap = ac.getConnectorMap();
+        Map<ConnectorDescriptorId, RecordDescriptor> connRecordDesc = ac.getConnectorRecordDescriptorMap();
+        Map<SuperActivity, Integer> superActivityProducerPort = new HashMap<SuperActivity, Integer>(); // next output port to assign, per super activity
+        Map<SuperActivity, Integer> superActivityConsumerPort = new HashMap<SuperActivity, Integer>(); // next input port to assign, per super activity
+        for (Entry<ActivityId, SuperActivity> entry : superActivities.entrySet()) {
+            superActivityProducerPort.put(entry.getValue(), 0);
+            superActivityConsumerPort.put(entry.getValue(), 0);
+        }
+        /**
+         * create a new activity cluster to replace the old activity cluster;
+         * it keeps the id and the connector policy assignment policy of the old one
+         */
+        ActivityCluster newActivityCluster = new ActivityCluster(acg, ac.getId());
+        newActivityCluster.setConnectorPolicyAssignmentPolicy(ac.getConnectorPolicyAssignmentPolicy());
+        for (Entry<ActivityId, SuperActivity> entry : superActivities.entrySet()) {
+            newActivityCluster.addActivity(entry.getValue());
+            acg.getActivityMap().put(entry.getKey(), newActivityCluster);
+        }
+        /**
+         * Setup connectors: either inside a super activity or among super activities
+         */
+        for (Entry<ConnectorDescriptorId, Pair<Pair<IActivity, Integer>, Pair<IActivity, Integer>>> entry : connectorActivityMap
+                .entrySet()) {
+            ConnectorDescriptorId connectorId = entry.getKey();
+            Pair<Pair<IActivity, Integer>, Pair<IActivity, Integer>> endPoints = entry.getValue();
+            IActivity producerActivity = endPoints.getLeft().getLeft();
+            IActivity consumerActivity = endPoints.getRight().getLeft();
+            int producerPort = endPoints.getLeft().getRight();
+            int consumerPort = endPoints.getRight().getRight();
+            RecordDescriptor recordDescriptor = connRecordDesc.get(connectorId);
+            IConnectorDescriptor conn = connMap.get(connectorId);
+            if (conn.getClass().getName().contains(ONE_TO_ONE_CONNECTOR)) {
+                /**
+                 * connection edge between inner activities: it is wired inside
+                 * the super activity that hosts the producer
+                 */
+                SuperActivity residingSuperActivity = invertedActivitySuperActivityMap.get(producerActivity);
+                residingSuperActivity.connect(conn, producerActivity, producerPort, consumerActivity, consumerPort,
+                        recordDescriptor);
+            } else {
+                /**
+                 * connection edge between super activities
+                 */
+                SuperActivity producerSuperActivity = invertedActivitySuperActivityMap.get(producerActivity);
+                SuperActivity consumerSuperActivity = invertedActivitySuperActivityMap.get(consumerActivity);
+                int producerSAPort = superActivityProducerPort.get(producerSuperActivity);
+                int consumerSAPort = superActivityConsumerPort.get(consumerSuperActivity);
+                newActivityCluster.addConnector(conn);
+                newActivityCluster.connect(conn, producerSuperActivity, producerSAPort, consumerSuperActivity,
+                        consumerSAPort, recordDescriptor);
+                /**
+                 * bridge the port: map the super activity's port to the port of
+                 * the inner activity that actually produces/consumes the data
+                 */
+                producerSuperActivity.setClusterOutputIndex(producerSAPort, producerActivity.getActivityId(),
+                        producerPort);
+                consumerSuperActivity.setClusterInputIndex(consumerSAPort, consumerActivity.getActivityId(),
+                        consumerPort);
+                acg.getConnectorMap().put(connectorId, newActivityCluster);
+                /**
+                 * increasing the port number for the producer and consumer
+                 */
+                superActivityProducerPort.put(producerSuperActivity, ++producerSAPort);
+                superActivityConsumerPort.put(consumerSuperActivity, ++consumerSAPort);
+            }
+        }
+        /**
+         * Set up the roots of the new activity cluster: a super activity with
+         * no output connectors in the new cluster is added as a root
+         */
+        for (Entry<ActivityId, SuperActivity> entry : superActivities.entrySet()) {
+            List<IConnectorDescriptor> connIds = newActivityCluster.getActivityOutputMap().get(entry.getKey());
+            if (connIds == null || connIds.size() == 0) {
+                newActivityCluster.addRoot(entry.getValue());
+            }
+        }
+        /**
+         * set up the blocked2Blocker mapping, which will be updated in the rewriteInterActivityCluster call
+         */
+        newActivityCluster.getBlocked2BlockerMap().putAll(ac.getBlocked2BlockerMap());
+        /**
+         * replace the old activity cluster with the new activity cluster
+         */
+        acg.getActivityClusterMap().put(ac.getId(), newActivityCluster);
+    }
+    /**
+     * Starts a new super activity seeded with one original activity and
+     * registers it in all bookkeeping maps.
+     * 
+     * @param acg
+     *            the activity cluster that contains the activity (an activity cluster, despite the parameter name)
+     * @param superActivities
+     *            the map from activity id to current super activities
+     * @param toBeExpendedMap
+     *            the map from an existing super activity to its BFS expansion queue of the original activities
+     * @param invertedActivitySuperActivityMap
+     *            the map from the original activities to their hosted super activities
+     * @param activityId
+     *            the activity id for the new super activity, which is the id of the first activity added to it
+     * @param activity
+     *            the first activity added to the new super activity
+     */
+    private void createNewSuperActivity(ActivityCluster acg, Map<ActivityId, SuperActivity> superActivities,
+            Map<ActivityId, Queue<IActivity>> toBeExpendedMap,
+            Map<IActivity, SuperActivity> invertedActivitySuperActivityMap, ActivityId activityId, IActivity activity) {
+        SuperActivity created = new SuperActivity(acg.getActivityClusterGraph(), acg.getId(), activityId);
+        superActivities.put(activityId, created);
+        created.addActivity(activity);
+        // the seed activity is the first one to be expanded
+        Queue<IActivity> expansionQueue = new LinkedList<IActivity>();
+        expansionQueue.offer(activity);
+        toBeExpendedMap.put(activityId, expansionQueue);
+        invertedActivitySuperActivityMap.put(activity, created);
+    }
+    /**
+     * Merges an existing super activity into another one: all activities of
+     * the swallowed super activity move to the swallowing one, and its pending
+     * expansion queue (if any) is appended to the swallowing one's queue.
+     * 
+     * @param superActivities
+     *            the map from activity id to current super activities
+     * @param toBeExpendedMap
+     *            the map from an existing super activity to its BFS expansion queue of the original activities
+     * @param invertedActivitySuperActivityMap
+     *            the map from the original activities to their hosted super activities
+     * @param superActivity
+     *            the "swallowing" super activity
+     * @param superActivityId
+     *            the activity id for the "swallowing" super activity, which is also the id of the first activity added to it
+     * @param existingSuperActivity
+     *            an existing super activity which is to be swallowed by the "swallowing" super activity
+     */
+    private void swallowExistingSuperActivity(Map<ActivityId, SuperActivity> superActivities,
+            Map<ActivityId, Queue<IActivity>> toBeExpendedMap,
+            Map<IActivity, SuperActivity> invertedActivitySuperActivityMap, SuperActivity superActivity,
+            ActivityId superActivityId, SuperActivity existingSuperActivity) {
+        ActivityId swallowedId = existingSuperActivity.getActivityId();
+        superActivities.remove(swallowedId);
+        // move every member of the swallowed super activity to the swallowing one
+        for (IActivity member : existingSuperActivity.getActivityMap().values()) {
+            superActivity.addActivity(member);
+            invertedActivitySuperActivityMap.put(member, superActivity);
+        }
+        Queue<IActivity> swallowingQueue = toBeExpendedMap.get(superActivityId);
+        Queue<IActivity> swallowedQueue = toBeExpendedMap.remove(swallowedId);
+        if (swallowedQueue == null) {
+            return;
+        }
+        swallowingQueue.addAll(swallowedQueue);
+    }
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/rewriter/ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/rewriter/
new file mode 100644
index 0000000..07b7ffc
--- /dev/null
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/rewriter/
@@ -0,0 +1,122 @@
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.rewriter;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.commons.lang3.tuple.Pair;
+import edu.uci.ics.hyracks.api.dataflow.ActivityId;
+import edu.uci.ics.hyracks.api.job.ActivityCluster;
+import edu.uci.ics.hyracks.api.job.ActivityClusterGraph;
+import edu.uci.ics.hyracks.api.job.ActivityClusterId;
+ * All the connectors in an OneToOneConnectedCluster are OneToOneConnectorDescriptors.
+ * 
+ * @author yingyib
+ */
+public class OneToOneConnectedActivityCluster extends ActivityCluster {
+    private static final long serialVersionUID = 1L;
+    protected final Map<Integer, Pair<ActivityId, Integer>> clusterInputIndexMap = new HashMap<Integer, Pair<ActivityId, Integer>>(); // cluster input index -> (activity id, activity input index)
+    protected final Map<Integer, Pair<ActivityId, Integer>> clusterOutputIndexMap = new HashMap<Integer, Pair<ActivityId, Integer>>(); // cluster output index -> (activity id, activity output index)
+    protected final Map<Pair<ActivityId, Integer>, Integer> invertedClusterOutputIndexMap = new HashMap<Pair<ActivityId, Integer>, Integer>(); // (activity id, activity output index) -> cluster output index
+    protected final Map<Pair<ActivityId, Integer>, Integer> invertedClusterInputIndexMap = new HashMap<Pair<ActivityId, Integer>, Integer>(); // (activity id, activity input index) -> cluster input index
+    public OneToOneConnectedActivityCluster(ActivityClusterGraph acg, ActivityClusterId id) {
+        super(acg, id); // forwards the graph and the cluster id to the ActivityCluster constructor
+    }
+    /**
+     * Maps an output channel of the cluster to an internal activity and that
+     * activity's output channel, and records the reverse mapping as well.
+     * 
+     * @param clusterOutputIndex
+     *            the output channel index for the cluster
+     * @param activityId
+     *            the id of the internal activity which produces the corresponding output
+     * @param activityOutputIndex
+     *            the output channel index of the internal activity which corresponds to the output channel of the cluster of activities
+     */
+    public void setClusterOutputIndex(int clusterOutputIndex, ActivityId activityId, int activityOutputIndex) {
+        Pair<ActivityId, Integer> innerChannel = Pair.of(activityId, activityOutputIndex);
+        clusterOutputIndexMap.put(clusterOutputIndex, innerChannel);
+        Pair<ActivityId, Integer> reverseKey = Pair.of(activityId, activityOutputIndex);
+        invertedClusterOutputIndexMap.put(reverseKey, clusterOutputIndex);
+    }
+    /**
+     * Looks up the internal activity and its output channel that back a
+     * cluster output channel.
+     * 
+     * @param clusterOutputIndex
+     *            the output channel index for the cluster
+     * @return a pair containing the activity id of the corresponding internal activity and its output channel index,
+     *         or null if no mapping has been set for the given cluster output channel
+     */
+    public Pair<ActivityId, Integer> getActivityIdOutputIndex(int clusterOutputIndex) {
+        Pair<ActivityId, Integer> innerChannel = clusterOutputIndexMap.get(clusterOutputIndex);
+        return innerChannel;
+    }
+    /**
+     * Maps an input channel of the cluster to an internal activity and that
+     * activity's input channel, and records the reverse mapping as well.
+     * 
+     * @param clusterInputIndex
+     *            the input channel index for the cluster
+     * @param activityId
+     *            the id of the internal activity which consumes the corresponding input
+     * @param activityInputIndex
+     *            the input channel index of the internal activity which corresponds to the input channel of the cluster of activities
+     */
+    public void setClusterInputIndex(int clusterInputIndex, ActivityId activityId, int activityInputIndex) {
+        Pair<ActivityId, Integer> innerChannel = Pair.of(activityId, activityInputIndex);
+        clusterInputIndexMap.put(clusterInputIndex, innerChannel);
+        Pair<ActivityId, Integer> reverseKey = Pair.of(activityId, activityInputIndex);
+        invertedClusterInputIndexMap.put(reverseKey, clusterInputIndex);
+    }
+    /**
+     * Looks up the internal activity and its input channel that back a
+     * cluster input channel.
+     * 
+     * @param clusterInputIndex
+     *            the input channel index for the cluster
+     * @return a pair containing the activity id of the corresponding internal activity and its input channel index,
+     *         or null if no mapping has been set for the given cluster input channel
+     */
+    public Pair<ActivityId, Integer> getActivityIdInputIndex(int clusterInputIndex) {
+        Pair<ActivityId, Integer> innerChannel = clusterInputIndexMap.get(clusterInputIndex);
+        return innerChannel;
+    }
+    /**
+     * Finds the cluster input channel that is bridged to the given input
+     * channel of an input-boundary activity.
+     * 
+     * @param activityInputChannel
+     *            the input-boundary activity and its input channel
+     * @return the cluster input channel, or -1 if the given activity channel is not mapped
+     */
+    public int getClusterInputIndex(Pair<ActivityId, Integer> activityInputChannel) {
+        Integer clusterChannel = invertedClusterInputIndexMap.get(activityInputChannel);
+        if (clusterChannel != null) {
+            return clusterChannel.intValue();
+        }
+        return -1;
+    }
+    /**
+     * Finds the cluster output channel that is bridged to the given output
+     * channel of an output-boundary activity.
+     * 
+     * @param activityOutputChannel
+     *            the output-boundary activity and its output channel
+     * @return the cluster output channel, or -1 if the given activity channel is not mapped
+     */
+    public int getClusterOutputIndex(Pair<ActivityId, Integer> activityOutputChannel) {
+        Integer clusterChannel = invertedClusterOutputIndexMap.get(activityOutputChannel);
+        if (clusterChannel != null) {
+            return clusterChannel.intValue();
+        }
+        return -1;
+    }
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/rewriter/runtime/ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/rewriter/runtime/
new file mode 100644
index 0000000..734ff85
--- /dev/null
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/rewriter/runtime/
@@ -0,0 +1,177 @@
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.rewriter.runtime;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import org.apache.commons.lang3.tuple.Pair;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.ActivityId;
+import edu.uci.ics.hyracks.api.dataflow.IActivity;
+import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.ActivityCluster;
+import edu.uci.ics.hyracks.api.job.ActivityClusterGraph;
+import edu.uci.ics.hyracks.api.job.ActivityClusterId;
+import edu.uci.ics.hyracks.api.rewriter.OneToOneConnectedActivityCluster;
+ * This class can be used to execute a DAG of activities inside which
+ * there are only one-to-one connectors.
+ * 
+ * @author yingyib
+ */
+public class SuperActivity extends OneToOneConnectedActivityCluster implements IActivity {
+    private static final long serialVersionUID = 1L;
+    private final ActivityId activityId;
+    /**
+     * Creates a super activity.
+     *
+     * @param acg
+     *            the activity cluster graph, passed through to the parent cluster constructor
+     * @param id
+     *            the activity cluster id, passed through to the parent cluster constructor
+     * @param activityId
+     *            the activity id reported by {@link #getActivityId()}
+     */
+    public SuperActivity(ActivityClusterGraph acg, ActivityClusterId id, ActivityId activityId) {
+        super(acg, id);
+        this.activityId = activityId;
+    }
+    /**
+     * Creates the runtime of this super activity: a {@link SuperActivityOperatorNodePushable} that is given
+     * the start activities (internal activities without any internal input connector) and a record
+     * descriptor provider that resolves descriptors for internal activities.
+     *
+     * @param ctx
+     *            the task context handed to the created runtime
+     * @param recordDescProvider
+     *            the provider used to resolve descriptors at the boundary of a super activity
+     * @param partition
+     *            the partition handed to the created runtime
+     * @param nPartitions
+     *            the number of partitions handed to the created runtime
+     * @return the operator node pushable that executes the internal activities
+     * @throws HyracksDataException
+     *             declared by the interface; construction failures of the runtime are not caught here
+     */
+    @Override
+    public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
+            final IRecordDescriptorProvider recordDescProvider, final int partition, final int nPartitions)
+            throws HyracksDataException {
+        final Map<ActivityId, IActivity> startActivities = new HashMap<ActivityId, IActivity>();
+        Map<ActivityId, IActivity> activities = getActivityMap();
+        for (Entry<ActivityId, IActivity> entry : activities.entrySet()) {
+            /**
+             * extract start activities, i.e., activities without internal input connectors
+             */
+            List<IConnectorDescriptor> conns = getActivityInputMap().get(entry.getKey());
+            if (conns == null || conns.size() == 0) {
+                startActivities.put(entry.getKey(), entry.getValue());
+            }
+        }
+        /**
+         * wrap a RecordDescriptorProvider for the super activity
+         */
+        IRecordDescriptorProvider wrappedRecDescProvider = new IRecordDescriptorProvider() {
+            @Override
+            public RecordDescriptor getInputRecordDescriptor(ActivityId aid, int inputIndex) {
+                if (startActivities.get(aid) != null) {
+                    /**
+                     * if the activity is a start (input boundary) activity
+                     */
+                    int superActivityInputChannel = SuperActivity.this.getClusterInputIndex(Pair.of(aid, inputIndex));
+                    if (superActivityInputChannel >= 0) {
+                        return recordDescProvider.getInputRecordDescriptor(activityId, superActivityInputChannel);
+                    }
+                }
+                if (SuperActivity.this.getActivityMap().get(aid) != null) {
+                    /**
+                     * if the activity is an internal activity of the super activity
+                     */
+                    IConnectorDescriptor conn = getActivityInputMap().get(aid).get(inputIndex);
+                    return getConnectorRecordDescriptorMap().get(conn.getConnectorId());
+                }
+                /**
+                 * the following is for the case where the activity is in other SuperActivities
+                 */
+                ActivityClusterGraph acg = SuperActivity.this.getActivityClusterGraph();
+                for (Entry<ActivityClusterId, ActivityCluster> entry : acg.getActivityClusterMap().entrySet()) {
+                    ActivityCluster ac = entry.getValue();
+                    for (Entry<ActivityId, IActivity> saEntry : ac.getActivityMap().entrySet()) {
+                        SuperActivity sa = (SuperActivity) saEntry.getValue();
+                        if (sa.getActivityMap().get(aid) != null) {
+                            List<IConnectorDescriptor> conns = sa.getActivityInputMap().get(aid);
+                            /**
+                             * the index must be strictly smaller than the list size; otherwise the channel is
+                             * not an internal connector and is looked up as a boundary channel instead
+                             */
+                            if (conns != null && conns.size() > inputIndex) {
+                                IConnectorDescriptor conn = conns.get(inputIndex);
+                                return sa.getConnectorRecordDescriptorMap().get(conn.getConnectorId());
+                            } else {
+                                int superActivityInputChannel = sa.getClusterInputIndex(Pair.of(aid, inputIndex));
+                                if (superActivityInputChannel >= 0) {
+                                    return recordDescProvider.getInputRecordDescriptor(sa.getActivityId(),
+                                            superActivityInputChannel);
+                                }
+                            }
+                        }
+                    }
+                }
+                return null;
+            }
+            @Override
+            public RecordDescriptor getOutputRecordDescriptor(ActivityId aid, int outputIndex) {
+                /**
+                 * if the activity is an output-boundary activity
+                 */
+                int superActivityOutputChannel = SuperActivity.this.getClusterOutputIndex(Pair.of(aid, outputIndex));
+                if (superActivityOutputChannel >= 0) {
+                    return recordDescProvider.getOutputRecordDescriptor(activityId, superActivityOutputChannel);
+                }
+                if (SuperActivity.this.getActivityMap().get(aid) != null) {
+                    /**
+                     * if the activity is an internal activity of the super activity
+                     */
+                    IConnectorDescriptor conn = getActivityOutputMap().get(aid).get(outputIndex);
+                    return getConnectorRecordDescriptorMap().get(conn.getConnectorId());
+                }
+                /**
+                 * the following is for the case where the activity is in other SuperActivities
+                 */
+                ActivityClusterGraph acg = SuperActivity.this.getActivityClusterGraph();
+                for (Entry<ActivityClusterId, ActivityCluster> entry : acg.getActivityClusterMap().entrySet()) {
+                    ActivityCluster ac = entry.getValue();
+                    for (Entry<ActivityId, IActivity> saEntry : ac.getActivityMap().entrySet()) {
+                        SuperActivity sa = (SuperActivity) saEntry.getValue();
+                        if (sa.getActivityMap().get(aid) != null) {
+                            List<IConnectorDescriptor> conns = sa.getActivityOutputMap().get(aid);
+                            /**
+                             * the index must be strictly smaller than the list size; otherwise the channel is
+                             * not an internal connector and is looked up as a boundary channel instead
+                             */
+                            if (conns != null && conns.size() > outputIndex) {
+                                IConnectorDescriptor conn = conns.get(outputIndex);
+                                return sa.getConnectorRecordDescriptorMap().get(conn.getConnectorId());
+                            } else {
+                                superActivityOutputChannel = sa.getClusterOutputIndex(Pair.of(aid, outputIndex));
+                                if (superActivityOutputChannel >= 0) {
+                                    return recordDescProvider.getOutputRecordDescriptor(sa.getActivityId(),
+                                            superActivityOutputChannel);
+                                }
+                            }
+                        }
+                    }
+                }
+                return null;
+            }
+        };
+        return new SuperActivityOperatorNodePushable(this, startActivities, ctx, wrappedRecDescProvider, partition,
+                nPartitions);
+    }
+    /**
+     * @return the activity id this super activity was constructed with
+     */
+    @Override
+    public ActivityId getActivityId() {
+        return activityId;
+    }
+    @Override
+    public String toString() {
+        return getActivityMap().values().toString();
+    }
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/rewriter/runtime/ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/rewriter/runtime/
new file mode 100644
index 0000000..7d50fa0
--- /dev/null
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/rewriter/runtime/
@@ -0,0 +1,195 @@
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.rewriter.runtime;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Queue;
+import org.apache.commons.lang3.tuple.Pair;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.ActivityId;
+import edu.uci.ics.hyracks.api.dataflow.IActivity;
+import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+ * The runtime of a SuperActivity, which internally executes a DAG of one-to-one
+ * connected activities in a single thread.
+ * 
+ * @author yingyib
+ */
+public class SuperActivityOperatorNodePushable implements IOperatorNodePushable {
+    private final Map<ActivityId, IOperatorNodePushable> operatorNodePushables = new HashMap<ActivityId, IOperatorNodePushable>();
+    private final List<IOperatorNodePushable> operatprNodePushablesBFSOrder = new ArrayList<IOperatorNodePushable>();
+    private final Map<ActivityId, IActivity> startActivities;
+    private final SuperActivity parent;
+    private final IHyracksTaskContext ctx;
+    private final IRecordDescriptorProvider recordDescProvider;
+    private final int partition;
+    private final int nPartitions;
+    private int inputArity = 0;
+    /**
+     * Stores the construction arguments and immediately builds the internal DAG of operator node
+     * pushables by calling {@link #init()}.
+     *
+     * @param parent
+     *            the super activity whose internal activities are executed
+     * @param startActivities
+     *            the activities from which the internal DAG construction starts
+     * @param ctx
+     *            the task context passed to every internal activity runtime
+     * @param recordDescProvider
+     *            the record descriptor provider passed to every internal activity runtime
+     * @param partition
+     *            the partition passed to every internal activity runtime
+     * @param nPartitions
+     *            the number of partitions passed to every internal activity runtime
+     * @throws IllegalStateException
+     *             wrapping any exception thrown while building the internal DAG
+     */
+    public SuperActivityOperatorNodePushable(SuperActivity parent, Map<ActivityId, IActivity> startActivities,
+            IHyracksTaskContext ctx, IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
+        this.parent = parent;
+        this.startActivities = startActivities;
+        this.ctx = ctx;
+        this.recordDescProvider = recordDescProvider;
+        this.partition = partition;
+        this.nPartitions = nPartitions;
+        /**
+         * initialize the writer-relationship for the internal DAG of operator
+         * node pushables
+         */
+        try {
+            init();
+        } catch (Exception e) {
+            throw new IllegalStateException(e);
+        }
+    }
+    /**
+     * Initializes every internal operator node pushable, in the order in which they were created.
+     */
+    @Override
+    public void initialize() throws HyracksDataException {
+        final int opCount = operatprNodePushablesBFSOrder.size();
+        for (int i = 0; i < opCount; i++) {
+            operatprNodePushablesBFSOrder.get(i).initialize();
+        }
+    }
+    /**
+     * Builds the runtime DAG of the super activity: creates one operator node pushable per internal
+     * activity reachable from the start activities and connects each producer output channel to the
+     * input frame writer of its consumer.
+     * <p>
+     * The output channels of an activity are enqueued exactly once, at the moment its pushable is
+     * created, so every reachable connector is wired once and no downstream activity is skipped when
+     * the queue runs empty.
+     *
+     * @throws HyracksDataException
+     *             if creating the runtime of an internal activity fails
+     */
+    public void init() throws HyracksDataException {
+        Queue<Pair<Pair<IActivity, Integer>, Pair<IActivity, Integer>>> childQueue = new LinkedList<Pair<Pair<IActivity, Integer>, Pair<IActivity, Integer>>>();
+        /**
+         * Set up the source operators
+         */
+        for (Entry<ActivityId, IActivity> entry : startActivities.entrySet()) {
+            IOperatorNodePushable opPushable = entry.getValue().createPushRuntime(ctx, recordDescProvider, partition,
+                    nPartitions);
+            operatprNodePushablesBFSOrder.add(opPushable);
+            operatorNodePushables.put(entry.getKey(), opPushable);
+            inputArity += opPushable.getInputArity();
+            enqueueOutputChannels(entry.getKey(), childQueue);
+        }
+        /**
+         * Use BFS (breadth-first search) to construct the runtime execution
+         * DAG
+         */
+        while (!childQueue.isEmpty()) {
+            /**
+             * construct the source to destination information
+             */
+            Pair<Pair<IActivity, Integer>, Pair<IActivity, Integer>> channel = childQueue.poll();
+            ActivityId sourceId = channel.getLeft().getLeft().getActivityId();
+            int outputChannel = channel.getLeft().getRight();
+            ActivityId destId = channel.getRight().getLeft().getActivityId();
+            int inputChannel = channel.getRight().getRight();
+            IOperatorNodePushable sourceOp = operatorNodePushables.get(sourceId);
+            IOperatorNodePushable destOp = operatorNodePushables.get(destId);
+            if (destOp == null) {
+                destOp = channel.getRight().getLeft()
+                        .createPushRuntime(ctx, recordDescProvider, partition, nPartitions);
+                operatprNodePushablesBFSOrder.add(destOp);
+                operatorNodePushables.put(destId, destOp);
+                /**
+                 * expand the executing activities further downstream; done only on first visit so that
+                 * a consumer with several inputs does not enqueue its outputs more than once
+                 */
+                enqueueOutputChannels(destId, childQueue);
+            }
+            /**
+             * construct the dataflow connection from a producer to a consumer
+             */
+            sourceOp.setOutputFrameWriter(outputChannel, destOp.getInputFrameWriter(inputChannel),
+                    recordDescProvider.getInputRecordDescriptor(destId, inputChannel));
+        }
+    }
+    /**
+     * Enqueues the producer/consumer channel of every internal output connector of the given activity.
+     */
+    private void enqueueOutputChannels(ActivityId producerId,
+            Queue<Pair<Pair<IActivity, Integer>, Pair<IActivity, Integer>>> childQueue) {
+        List<IConnectorDescriptor> outputConnectors = parent.getActivityOutputMap().get(producerId);
+        if (outputConnectors == null) {
+            return;
+        }
+        for (IConnectorDescriptor conn : outputConnectors) {
+            if (conn != null) {
+                childQueue.add(parent.getConnectorActivityMap().get(conn.getConnectorId()));
+            }
+        }
+    }
+    /**
+     * De-initializes every internal operator node pushable, in the order in which they were created.
+     */
+    @Override
+    public void deinitialize() throws HyracksDataException {
+        final int opCount = operatprNodePushablesBFSOrder.size();
+        for (int i = 0; i < opCount; i++) {
+            operatprNodePushablesBFSOrder.get(i).deinitialize();
+        }
+    }
+    /**
+     * @return the sum of the input arities of the start operators, as accumulated by {@link #init()}
+     */
+    @Override
+    public int getInputArity() {
+        return inputArity;
+    }
+    /**
+     * Routes the writer of a cluster output channel to the internal operator that produces it.
+     *
+     * @param clusterOutputIndex
+     *            the output channel index of the super activity
+     * @param writer
+     *            the frame writer to attach
+     * @param recordDesc
+     *            the record descriptor passed along with the writer
+     */
+    @Override
+    public void setOutputFrameWriter(int clusterOutputIndex, IFrameWriter writer, RecordDescriptor recordDesc) {
+        // Translate the cluster-level channel into (internal activity, activity output channel).
+        Pair<ActivityId, Integer> producerChannel = parent.getActivityIdOutputIndex(clusterOutputIndex);
+        IOperatorNodePushable producer = operatorNodePushables.get(producerChannel.getLeft());
+        producer.setOutputFrameWriter(producerChannel.getRight(), writer, recordDesc);
+    }
+    /**
+     * Resolves the input frame writer of the internal operator that consumes a cluster input channel.
+     *
+     * @param index
+     *            the input channel index of the super activity
+     * @return the input frame writer of the corresponding internal operator
+     */
+    @Override
+    public IFrameWriter getInputFrameWriter(final int index) {
+        // Translate the cluster-level channel into (internal activity, activity input channel).
+        Pair<ActivityId, Integer> consumerChannel = parent.getActivityIdInputIndex(index);
+        IOperatorNodePushable consumer = operatorNodePushables.get(consumerChannel.getLeft());
+        return consumer.getInputFrameWriter(consumerChannel.getRight());
+    }
+    @Override
+    public String getDisplayName() {
+        return "Super Activity " + parent.getActivityMap().values().toString();
+    }
diff --git a/hyracks/hyracks-cli/pom.xml b/hyracks/hyracks-cli/pom.xml
index 0456625..991ce2a 100644
--- a/hyracks/hyracks-cli/pom.xml
+++ b/hyracks/hyracks-cli/pom.xml
@@ -20,6 +20,7 @@
+          <fork>true</fork>
diff --git a/hyracks/hyracks-control/hyracks-control-cc/pom.xml b/hyracks/hyracks-control/hyracks-control-cc/pom.xml
index 95598da..d644673 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/pom.xml
+++ b/hyracks/hyracks-control/hyracks-control-cc/pom.xml
@@ -17,6 +17,7 @@
+          <fork>true</fork>
diff --git a/hyracks/hyracks-control/hyracks-control-common/pom.xml b/hyracks/hyracks-control/hyracks-control-common/pom.xml
index 2efdd42..ce1298e 100644
--- a/hyracks/hyracks-control/hyracks-control-common/pom.xml
+++ b/hyracks/hyracks-control/hyracks-control-common/pom.xml
@@ -20,6 +20,7 @@
+          <fork>true</fork>
diff --git a/hyracks/hyracks-control/hyracks-control-nc/pom.xml b/hyracks/hyracks-control/hyracks-control-nc/pom.xml
index f103f4a..c44cec9 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/pom.xml
+++ b/hyracks/hyracks-control/hyracks-control-nc/pom.xml
@@ -17,6 +17,7 @@
+          <fork>true</fork>
diff --git a/hyracks/hyracks-data/hyracks-data-std/pom.xml b/hyracks/hyracks-data/hyracks-data-std/pom.xml
index 85c8fd5..8f5f04e 100644
--- a/hyracks/hyracks-data/hyracks-data-std/pom.xml
+++ b/hyracks/hyracks-data/hyracks-data-std/pom.xml
@@ -18,6 +18,7 @@
+          <fork>true</fork>
diff --git a/hyracks/hyracks-dataflow-common/pom.xml b/hyracks/hyracks-dataflow-common/pom.xml
index 393e97f..1a2950b 100644
--- a/hyracks/hyracks-dataflow-common/pom.xml
+++ b/hyracks/hyracks-dataflow-common/pom.xml
@@ -17,6 +17,7 @@
+          <fork>true</fork>
diff --git a/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/partition/ b/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/partition/
index 51645c4..ec3c2be 100644
--- a/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/partition/
+++ b/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/partition/
@@ -52,10 +52,10 @@
                     h += fh;
                 if (h < 0) {
-                    h = -h;
+                    h = -(h+1);
                 return h % nParts;
\ No newline at end of file
diff --git a/hyracks/hyracks-dataflow-hadoop/pom.xml b/hyracks/hyracks-dataflow-hadoop/pom.xml
index c01a6a7..f5135f8 100644
--- a/hyracks/hyracks-dataflow-hadoop/pom.xml
+++ b/hyracks/hyracks-dataflow-hadoop/pom.xml
@@ -1,8 +1,6 @@
 <project xmlns="" xmlns:xsi="" xsi:schemaLocation="">
-  <groupId>edu.uci.ics.hyracks</groupId>
-  <version>0.2.3-SNAPSHOT</version>
diff --git a/hyracks/hyracks-dataflow-std/pom.xml b/hyracks/hyracks-dataflow-std/pom.xml
index 0cb5516..2cf0fdc 100644
--- a/hyracks/hyracks-dataflow-std/pom.xml
+++ b/hyracks/hyracks-dataflow-std/pom.xml
@@ -1,8 +1,6 @@
 <project xmlns="" xmlns:xsi="" xsi:schemaLocation="">
-  <groupId>edu.uci.ics.hyracks</groupId>
-  <version>0.2.3-SNAPSHOT</version>
@@ -20,6 +18,7 @@
+          <fork>true</fork>
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/
index f2b56fa..f86d9fb 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/
@@ -256,8 +256,6 @@
                 outputAppender.reset(outputFrame, true);
-      ;
                 if (tPointers == null) {
                     // Not sorted
                     for (int i = 0; i < tableSize; ++i) {
diff --git a/hyracks/hyracks-dist/src/main/resources/bin/ b/hyracks/hyracks-dist/src/main/resources/bin/
index e0cdf73..a691c0f 100755
--- a/hyracks/hyracks-dist/src/main/resources/bin/
+++ b/hyracks/hyracks-dist/src/main/resources/bin/
@@ -6,6 +6,10 @@
         #Get IP Address
         IPADDR=`/sbin/ifconfig eth0 | grep "inet " | awk '{print $2}' | cut -f 2 -d ':'`
+    	if [ "$IPADDR" = "" ]
+        then
+		IPADDR=`/sbin/ifconfig em1 | grep "inet " | awk '{print $2}' | cut -f 2 -d ':'`
+        fi 
 	if [ "$IPADDR" = "" ]
 		IPADDR=`/sbin/ifconfig lo | grep "inet " | awk '{print $2}' | cut -f 2 -d ':'`
diff --git a/hyracks/hyracks-dist/src/main/resources/bin/ b/hyracks/hyracks-dist/src/main/resources/bin/
index fe2551d..efb79ce 100755
--- a/hyracks/hyracks-dist/src/main/resources/bin/
+++ b/hyracks/hyracks-dist/src/main/resources/bin/
@@ -22,4 +22,4 @@
 #Launch hyracks cc script
 chmod -R 755 $HYRACKS_HOME
-$HYRACKS_HOME/hyracks-server/target/appassembler/bin/hyrackscc -client-net-ip-address $CCHOST -cluster-net-ip-address $CCHOST -client-net-port $CC_CLIENTPORT -cluster-net-port $CC_CLUSTERPORT -max-heartbeat-lapse-periods 999999 -default-max-job-attempts 0 -job-history-size 3 &> $CCLOGS_DIR/cc.log &
+$HYRACKS_HOME/hyracks-server/target/appassembler/bin/hyrackscc -client-net-ip-address $CCHOST -cluster-net-ip-address $CCHOST -client-net-port $CC_CLIENTPORT -cluster-net-port $CC_CLUSTERPORT -max-heartbeat-lapse-periods 999999 -default-max-job-attempts 0 -job-history-size 0 &> $CCLOGS_DIR/cc.log &
diff --git a/hyracks/hyracks-documentation/pom.xml b/hyracks/hyracks-documentation/pom.xml
index ed24adb..7aedd57 100644
--- a/hyracks/hyracks-documentation/pom.xml
+++ b/hyracks/hyracks-documentation/pom.xml
@@ -1,8 +1,6 @@
 <project xmlns="" xmlns:xsi="" xsi:schemaLocation="">
-  <groupId>edu.uci.ics.hyracks</groupId>
-  <version>0.2.3-SNAPSHOT</version>
diff --git a/hyracks/hyracks-examples/btree-example/btreeapp/pom.xml b/hyracks/hyracks-examples/btree-example/btreeapp/pom.xml
index 6350054..792cedd 100644
--- a/hyracks/hyracks-examples/btree-example/btreeapp/pom.xml
+++ b/hyracks/hyracks-examples/btree-example/btreeapp/pom.xml
@@ -2,7 +2,6 @@
-  <version>0.2.3-SNAPSHOT</version>
diff --git a/hyracks/hyracks-examples/btree-example/btreeclient/pom.xml b/hyracks/hyracks-examples/btree-example/btreeclient/pom.xml
index dce275f..f941a5b 100644
--- a/hyracks/hyracks-examples/btree-example/btreeclient/pom.xml
+++ b/hyracks/hyracks-examples/btree-example/btreeclient/pom.xml
@@ -39,6 +39,7 @@
+          <fork>true</fork>
diff --git a/hyracks/hyracks-examples/btree-example/btreehelper/pom.xml b/hyracks/hyracks-examples/btree-example/btreehelper/pom.xml
index d94feb7..eb651ce 100644
--- a/hyracks/hyracks-examples/btree-example/btreehelper/pom.xml
+++ b/hyracks/hyracks-examples/btree-example/btreehelper/pom.xml
@@ -43,6 +43,7 @@
+          <fork>true</fork>
diff --git a/hyracks/hyracks-examples/hadoop-compat-example/hadoopcompatapp/pom.xml b/hyracks/hyracks-examples/hadoop-compat-example/hadoopcompatapp/pom.xml
index 95cfaff..4cf9745 100644
--- a/hyracks/hyracks-examples/hadoop-compat-example/hadoopcompatapp/pom.xml
+++ b/hyracks/hyracks-examples/hadoop-compat-example/hadoopcompatapp/pom.xml
@@ -2,7 +2,6 @@
-  <version>0.2.3-SNAPSHOT</version>
@@ -149,6 +148,7 @@
+          <fork>true</fork>
diff --git a/hyracks/hyracks-examples/hadoop-compat-example/hadoopcompatclient/pom.xml b/hyracks/hyracks-examples/hadoop-compat-example/hadoopcompatclient/pom.xml
index d260601..f52536c 100644
--- a/hyracks/hyracks-examples/hadoop-compat-example/hadoopcompatclient/pom.xml
+++ b/hyracks/hyracks-examples/hadoop-compat-example/hadoopcompatclient/pom.xml
@@ -2,7 +2,6 @@
-  <version>0.2.3-SNAPSHOT</version>
diff --git a/hyracks/hyracks-examples/hadoop-compat-example/hadoopcompathelper/pom.xml b/hyracks/hyracks-examples/hadoop-compat-example/hadoopcompathelper/pom.xml
index f61b9e8..c397a72 100644
--- a/hyracks/hyracks-examples/hadoop-compat-example/hadoopcompathelper/pom.xml
+++ b/hyracks/hyracks-examples/hadoop-compat-example/hadoopcompathelper/pom.xml
@@ -2,7 +2,6 @@
-  <version>0.2.3-SNAPSHOT</version>
@@ -34,6 +33,7 @@
+          <fork>true</fork>
diff --git a/hyracks/hyracks-examples/hadoop-compat-example/pom.xml b/hyracks/hyracks-examples/hadoop-compat-example/pom.xml
index a2cf5ae..16787ca 100644
--- a/hyracks/hyracks-examples/hadoop-compat-example/pom.xml
+++ b/hyracks/hyracks-examples/hadoop-compat-example/pom.xml
@@ -2,7 +2,6 @@
-  <version>0.2.3-SNAPSHOT</version>
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/pom.xml b/hyracks/hyracks-examples/hyracks-integration-tests/pom.xml
index 0df38e7..5e7b5c9 100644
--- a/hyracks/hyracks-examples/hyracks-integration-tests/pom.xml
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/pom.xml
@@ -18,6 +18,7 @@
+          <fork>true</fork>
diff --git a/hyracks/hyracks-examples/pom.xml b/hyracks/hyracks-examples/pom.xml
index 8ce8108..551e2be 100644
--- a/hyracks/hyracks-examples/pom.xml
+++ b/hyracks/hyracks-examples/pom.xml
@@ -1,8 +1,6 @@
 <project xmlns="" xmlns:xsi="" xsi:schemaLocation="">
-  <groupId>edu.uci.ics.hyracks</groupId>
-  <version>0.2.3-SNAPSHOT</version>
diff --git a/hyracks/hyracks-examples/text-example/pom.xml b/hyracks/hyracks-examples/text-example/pom.xml
index ba8649e..0476bb3 100644
--- a/hyracks/hyracks-examples/text-example/pom.xml
+++ b/hyracks/hyracks-examples/text-example/pom.xml
@@ -2,7 +2,6 @@
-  <version>0.2.3-SNAPSHOT</version>
diff --git a/hyracks/hyracks-examples/text-example/textapp/pom.xml b/hyracks/hyracks-examples/text-example/textapp/pom.xml
index 3834d08..945df0b 100644
--- a/hyracks/hyracks-examples/text-example/textapp/pom.xml
+++ b/hyracks/hyracks-examples/text-example/textapp/pom.xml
@@ -2,7 +2,6 @@
-  <version>0.2.3-SNAPSHOT</version>
@@ -144,6 +143,7 @@
+          <fork>true</fork>
diff --git a/hyracks/hyracks-examples/text-example/textclient/pom.xml b/hyracks/hyracks-examples/text-example/textclient/pom.xml
index af72b71..4aace73 100644
--- a/hyracks/hyracks-examples/text-example/textclient/pom.xml
+++ b/hyracks/hyracks-examples/text-example/textclient/pom.xml
@@ -2,7 +2,6 @@
-  <version>0.2.3-SNAPSHOT</version>
@@ -35,6 +34,7 @@
+          <fork>true</fork>
diff --git a/hyracks/hyracks-examples/text-example/texthelper/pom.xml b/hyracks/hyracks-examples/text-example/texthelper/pom.xml
index 5095db8..bcb280c 100644
--- a/hyracks/hyracks-examples/text-example/texthelper/pom.xml
+++ b/hyracks/hyracks-examples/text-example/texthelper/pom.xml
@@ -38,6 +38,7 @@
+          <fork>true</fork>
diff --git a/hyracks/hyracks-examples/tpch-example/pom.xml b/hyracks/hyracks-examples/tpch-example/pom.xml
index b237c9b..9951792 100644
--- a/hyracks/hyracks-examples/tpch-example/pom.xml
+++ b/hyracks/hyracks-examples/tpch-example/pom.xml
@@ -2,7 +2,6 @@
-  <version>0.2.3-SNAPSHOT</version>
diff --git a/hyracks/hyracks-examples/tpch-example/tpchclient/pom.xml b/hyracks/hyracks-examples/tpch-example/tpchclient/pom.xml
index 0f8d8fc..4e0d9f0 100644
--- a/hyracks/hyracks-examples/tpch-example/tpchclient/pom.xml
+++ b/hyracks/hyracks-examples/tpch-example/tpchclient/pom.xml
@@ -31,6 +31,7 @@
+          <fork>true</fork>
diff --git a/hyracks/hyracks-hadoop-compat/pom.xml b/hyracks/hyracks-hadoop-compat/pom.xml
index 9a907b8..87aaaa7 100644
--- a/hyracks/hyracks-hadoop-compat/pom.xml
+++ b/hyracks/hyracks-hadoop-compat/pom.xml
@@ -20,6 +20,7 @@
+          <fork>true</fork>
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-0.20.2/pom.xml b/hyracks/hyracks-hdfs/hyracks-hdfs-0.20.2/pom.xml
index b33e8e2..9092655 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-0.20.2/pom.xml
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-0.20.2/pom.xml
@@ -20,6 +20,7 @@
+					<fork>true</fork>
@@ -63,6 +64,10 @@
+				<property>
+					<name>hadoop</name>
+					<value>1.0.4</value>
+				</property>
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-0.20.2/src/main/java/edu/uci/ics/hyracks/hdfs/ b/hyracks/hyracks-hdfs/hyracks-hdfs-0.20.2/src/main/java/edu/uci/ics/hyracks/hdfs/
index a2b16c6..16ce76b 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-0.20.2/src/main/java/edu/uci/ics/hyracks/hdfs/
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-0.20.2/src/main/java/edu/uci/ics/hyracks/hdfs/
@@ -1,7 +1,8 @@
 package edu.uci.ics.hyracks.hdfs;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
@@ -14,12 +15,25 @@
 public class ContextFactory {
     @SuppressWarnings({ "unchecked", "rawtypes" })
-    public TaskAttemptContext createContext(Configuration conf, InputSplit split) throws HyracksDataException {
+    public TaskAttemptContext createContext(Configuration conf, TaskAttemptID tid) throws HyracksDataException {
         try {
-            return new Mapper().new Context(conf, new TaskAttemptID(), null, null, null, null, split);
+            return new Mapper().new Context(conf, tid, null, null, null, null, null);
         } catch (Exception e) {
             throw new HyracksDataException(e);
+    /**
+     * Creates a task attempt context whose task attempt id carries the given partition as its task id.
+     *
+     * @param conf
+     *            the Hadoop configuration for the context
+     * @param partition
+     *            the partition number used as the task id of the attempt
+     * @return the newly created task attempt context
+     * @throws HyracksDataException
+     *             wrapping any exception raised while creating the context
+     */
+    public TaskAttemptContext createContext(Configuration conf, int partition) throws HyracksDataException {
+        final TaskAttemptContext context;
+        try {
+            context = new TaskAttemptContext(conf, new TaskAttemptID("", 0, true, partition, 0));
+        } catch (Exception e) {
+            throw new HyracksDataException(e);
+        }
+        return context;
+    }
+    public JobContext createJobContext(Configuration conf) {
+        return new JobContext(conf, new JobID("0", 0));
+    }
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-0.23.1/pom.xml b/hyracks/hyracks-hdfs/hyracks-hdfs-0.23.1/pom.xml
index 07b244f..8b7ecf0 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-0.23.1/pom.xml
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-0.23.1/pom.xml
@@ -19,6 +19,7 @@
+					<fork>true</fork>
@@ -40,6 +41,10 @@
+				<property>
+					<name>hadoop</name>
+					<value>0.23.1</value>
+				</property>
@@ -77,6 +82,10 @@
+				<property>
+					<name>hadoop</name>
+					<value>0.23.6</value>
+				</property>
@@ -109,6 +118,86 @@
+		<profile>
+			<activation>
+				<activeByDefault>false</activeByDefault>
+				<property>
+					<name>hadoop</name>
+					<value>cdh-4.2</value>
+				</property>
+			</activation>
+			<id>cdh-4.2</id>
+			<dependencies>
+				<dependency>
+					<groupId>org.apache.hadoop</groupId>
+					<artifactId>hadoop-common</artifactId>
+					<version>2.0.0-cdh4.2.0</version>
+					<type>jar</type>
+					<scope>compile</scope>
+				</dependency>
+				<dependency>
+					<groupId>org.apache.hadoop</groupId>
+					<artifactId>hadoop-mapreduce-client-core</artifactId>
+					<version>2.0.0-cdh4.2.0</version>
+					<type>jar</type>
+					<scope>compile</scope>
+				</dependency>
+				<dependency>
+					<groupId>org.apache.hadoop</groupId>
+					<artifactId>hadoop-hdfs</artifactId>
+					<version>2.0.0-cdh4.2.0</version>
+					<type>jar</type>
+					<scope>compile</scope>
+				</dependency>
+				<dependency>
+					<groupId>org.apache.hadoop</groupId>
+					<artifactId>hadoop-minicluster</artifactId>
+					<version>2.0.0-cdh4.2.0</version>
+					<type>jar</type>
+					<scope>compile</scope>
+				</dependency>
+			</dependencies>
+		</profile>
+		<profile>
+			<activation>
+				<activeByDefault>false</activeByDefault>
+				<property>
+					<name>hadoop</name>
+					<value>cdh-4.1</value>
+				</property>
+			</activation>
+			<id>cdh-4.1</id>
+			<dependencies>
+				<dependency>
+					<groupId>org.apache.hadoop</groupId>
+					<artifactId>hadoop-common</artifactId>
+					<version>2.0.0-cdh4.1.0</version>
+					<type>jar</type>
+					<scope>compile</scope>
+				</dependency>
+				<dependency>
+					<groupId>org.apache.hadoop</groupId>
+					<artifactId>hadoop-mapreduce-client-core</artifactId>
+					<version>2.0.0-cdh4.1.0</version>
+					<type>jar</type>
+					<scope>compile</scope>
+				</dependency>
+				<dependency>
+					<groupId>org.apache.hadoop</groupId>
+					<artifactId>hadoop-hdfs</artifactId>
+					<version>2.0.0-cdh4.1.0</version>
+					<type>jar</type>
+					<scope>compile</scope>
+				</dependency>
+				<dependency>
+					<groupId>org.apache.hadoop</groupId>
+					<artifactId>hadoop-minicluster</artifactId>
+					<version>2.0.0-cdh4.1.0</version>
+					<type>jar</type>
+					<scope>compile</scope>
+				</dependency>
+			</dependencies>
+		</profile>
@@ -120,4 +209,11 @@
+	<repositories>
+		<repository>
+			<id>cloudera</id>
+			<url></url>
+		</repository>
+	</repositories>
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-0.23.1/src/main/java/edu/uci/ics/hyracks/hdfs/ b/hyracks/hyracks-hdfs/hyracks-hdfs-0.23.1/src/main/java/edu/uci/ics/hyracks/hdfs/
index 60ae5d3..ddcce64 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-0.23.1/src/main/java/edu/uci/ics/hyracks/hdfs/
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-0.23.1/src/main/java/edu/uci/ics/hyracks/hdfs/
@@ -1,9 +1,12 @@
 package edu.uci.ics.hyracks.hdfs;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.task.JobContextImpl;
 import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
@@ -13,12 +16,25 @@
 public class ContextFactory {
-    public TaskAttemptContext createContext(Configuration conf, InputSplit split) throws HyracksDataException {
+    public TaskAttemptContext createContext(Configuration conf, TaskAttemptID tid) throws HyracksDataException {
         try {
-            return new TaskAttemptContextImpl(conf, new TaskAttemptID());
+            return new TaskAttemptContextImpl(conf, tid);
         } catch (Exception e) {
             throw new HyracksDataException(e);
+    public TaskAttemptContext createContext(Configuration conf, int partition) throws HyracksDataException {
+        try {
+            TaskAttemptID tid = new TaskAttemptID("", 0, TaskType.REDUCE, partition, 0);
+            return new TaskAttemptContextImpl(conf, tid);
+        } catch (Exception e) {
+            throw new HyracksDataException(e);
+        }
+    }
+    public JobContext createJobContext(Configuration conf) {
+        return new JobContextImpl(conf, new JobID("0", 0));
+    }
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/pom.xml b/hyracks/hyracks-hdfs/hyracks-hdfs-core/pom.xml
index 6557b08..a28c698a 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-core/pom.xml
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/pom.xml
@@ -20,6 +20,7 @@
+					<fork>true</fork>
@@ -75,6 +76,10 @@
+				<property>
+					<name>hadoop</name>
+					<value>1.0.4</value>
+				</property>
@@ -90,6 +95,10 @@
+				<property>
+					<name>hadoop</name>
+					<value>0.23.1</value>
+				</property>
@@ -105,6 +114,10 @@
+				<property>
+					<name>hadoop</name>
+					<value>0.23.6</value>
+				</property>
@@ -117,6 +130,44 @@
+		<profile>
+			<activation>
+				<activeByDefault>false</activeByDefault>
+				<property>
+					<name>hadoop</name>
+					<value>cdh-4.1</value>
+				</property>
+			</activation>
+			<id>cdh-4.1</id>
+			<dependencies>
+				<dependency>
+					<groupId>edu.uci.ics.hyracks</groupId>
+					<artifactId>hyracks-hdfs-0.23.1</artifactId>
+					<version>0.2.3-SNAPSHOT</version>
+					<type>jar</type>
+					<scope>compile</scope>
+				</dependency>
+			</dependencies>
+		</profile>
+		<profile>
+			<activation>
+				<activeByDefault>false</activeByDefault>
+				<property>
+					<name>hadoop</name>
+					<value>cdh-4.2</value>
+				</property>
+			</activation>
+			<id>cdh-4.2</id>
+			<dependencies>
+				<dependency>
+					<groupId>edu.uci.ics.hyracks</groupId>
+					<artifactId>hyracks-hdfs-0.23.1</artifactId>
+					<version>0.2.3-SNAPSHOT</version>
+					<type>jar</type>
+					<scope>compile</scope>
+				</dependency>
+			</dependencies>
+		</profile>
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/api/ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/api/
index 5923e1e..5d35ec5 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/api/
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/api/
@@ -29,12 +29,24 @@
 public interface IKeyValueParser<K, V> {
+     * Initialize the key value parser.
+     * 
+     * @param writer
+     *            The hyracks writer for outputting data.
+     * @throws HyracksDataException
+     */
+    public void open(IFrameWriter writer) throws HyracksDataException;
+    /**
      * Parse a key-value pair returned by HDFS record reader to a tuple.
      * when the parsers' internal buffer is full, it can flush the buffer to the writer
      * @param key
+     *            The key returned from Hadoop's InputReader.
      * @param value
+     *            The value returned from Hadoop's InputReader.
      * @param writer
+     *            The hyracks writer for outputting data.
      * @throws HyracksDataException
     public void parse(K key, V value, IFrameWriter writer) throws HyracksDataException;
@@ -44,7 +56,8 @@
      * This method is called in the close() of HDFSReadOperatorDescriptor.
      * @param writer
+     *            The hyracks writer for outputting data.
      * @throws HyracksDataException
-    public void flush(IFrameWriter writer) throws HyracksDataException;
+    public void close(IFrameWriter writer) throws HyracksDataException;
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/api/ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/api/
index 6e943ad..7d6f868 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/api/
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/api/
@@ -18,6 +18,7 @@
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
  * Users need to implement this interface to use the HDFSReadOperatorDescriptor.
@@ -36,6 +37,6 @@
      *            the IHyracksTaskContext
      * @return a key-value parser instance.
-    public IKeyValueParser<K, V> createKeyValueParser(IHyracksTaskContext ctx);
+    public IKeyValueParser<K, V> createKeyValueParser(IHyracksTaskContext ctx) throws HyracksDataException;
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/api/ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/api/
index 25b9523..8e85627 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/api/
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/api/
@@ -26,6 +26,15 @@
 public interface ITupleWriter {
+     * Initialize the tuple writer.
+     * 
+     * @param output
+     *            The channel for output data.
+     * @throws HyracksDataException
+     */
+    public void open(DataOutput output) throws HyracksDataException;
+    /**
      * Write the tuple to the DataOutput.
      * @param output
@@ -36,4 +45,13 @@
     public void write(DataOutput output, ITupleReference tuple) throws HyracksDataException;
+    /**
+     * Close the writer.
+     * 
+     * @param output
+     *            The channel for output data.
+     * @throws HyracksDataException
+     */
+    public void close(DataOutput output) throws HyracksDataException;
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/api/ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/api/
index 839de8f..9a025c2 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/api/
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/api/
@@ -17,14 +17,19 @@
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
  * Users need to implement this interface to use the HDFSWriteOperatorDescriptor.
 public interface ITupleWriterFactory extends Serializable {
+     * @param ctx
+     *            the IHyracksTaskContext
      * @return a tuple writer instance
-    public ITupleWriter getTupleWriter();
+    public ITupleWriter getTupleWriter(IHyracksTaskContext ctx) throws HyracksDataException;
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/
index e924650..f49688b 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/
@@ -102,6 +102,7 @@
                     JobConf conf = confFactory.getConf();
                     IKeyValueParser parser = tupleParserFactory.createKeyValueParser(ctx);
+          ;
                     InputFormat inputFormat = conf.getInputFormat();
                     for (int i = 0; i < inputSplits.length; i++) {
@@ -131,7 +132,7 @@
-                    parser.flush(writer);
+                    parser.close(writer);
                 } catch (Exception e) {
                     throw new HyracksDataException(e);
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/
index ff97a29..3ce6b2a 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/
@@ -89,10 +89,11 @@
                 String outputDirPath = FileOutputFormat.getOutputPath(conf).toString();
                 String fileName = outputDirPath + File.separator + "part-" + partition;
-                tupleWriter = tupleWriterFactory.getTupleWriter();
+                tupleWriter = tupleWriterFactory.getTupleWriter(ctx);
                 try {
                     FileSystem dfs = FileSystem.get(conf);
                     dos = dfs.create(new Path(fileName), true);
+          ;
                 } catch (Exception e) {
                     throw new HyracksDataException(e);
@@ -116,6 +117,7 @@
             public void close() throws HyracksDataException {
                 try {
+                    tupleWriter.close(dos);
                 } catch (Exception e) {
                     throw new HyracksDataException(e);
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/lib/ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/lib/
index c691f5d..9574bb4 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/lib/
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/lib/
@@ -43,6 +43,11 @@
         return new IKeyValueParser<LongWritable, Text>() {
+            public void open(IFrameWriter writer) {
+            }
+            @Override
             public void parse(LongWritable key, Text value, IFrameWriter writer) throws HyracksDataException {
                 tb.addField(value.getBytes(), 0, value.getLength());
@@ -56,7 +61,7 @@
-            public void flush(IFrameWriter writer) throws HyracksDataException {
+            public void close(IFrameWriter writer) throws HyracksDataException {
                 FrameUtils.flushFrame(buffer, writer);
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/lib/ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/lib/
index d26721d..0da14e5 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/lib/
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/lib/
@@ -17,6 +17,7 @@
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.hdfs.api.ITupleWriter;
@@ -26,9 +27,14 @@
     private static final long serialVersionUID = 1L;
-    public ITupleWriter getTupleWriter() {
+    public ITupleWriter getTupleWriter(IHyracksTaskContext ctx) {
         return new ITupleWriter() {
-            byte newLine = "\n".getBytes()[0];
+            private byte newLine = "\n".getBytes()[0];
+            @Override
+            public void open(DataOutput output) {
+            }
             public void write(DataOutput output, ITupleReference tuple) throws HyracksDataException {
@@ -43,6 +49,11 @@
+            @Override
+            public void close(DataOutput output) {
+            }
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/scheduler/ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/scheduler/
index e7309d4..3f287cf 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/scheduler/
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/scheduler/
@@ -17,6 +17,7 @@
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -49,7 +50,7 @@
     private Map<String, Integer> ncNameToIndex = new HashMap<String, Integer>();
-     * The constructor of the scheduler
+     * The constructor of the scheduler.
      * @param ncNameToNcInfos
      * @throws HyracksException
@@ -64,12 +65,20 @@
+    /**
+     * The constructor of the scheduler.
+     * 
+     * @param ncNameToNcInfos the mapping from nc names to nc infos
+     * @throws HyracksException
+     */
     public Scheduler(Map<String, NodeControllerInfo> ncNameToNcInfos) throws HyracksException {
-     * Set location constraints for a file scan operator with a list of file splits
+     * Set location constraints for a file scan operator with a list of file splits.
+     * It guarantees that the maximum number of slots a machine can get is at most one more than
+     * the minimum number of slots a machine can get.
      * @throws HyracksDataException
@@ -77,93 +86,38 @@
         int[] capacity = new int[NCs.length];
         Arrays.fill(capacity, 0);
         String[] locations = new String[splits.length];
-        int slots = splits.length % capacity.length == 0 ? (splits.length / capacity.length) : (splits.length
+        /**
+         * upper bound number of slots that a machine can get
+         */
+        int upperBoundSlots = splits.length % capacity.length == 0 ? (splits.length / capacity.length) : (splits.length
                 / capacity.length + 1);
+        /**
+         * lower bound number of slots that a machine can get
+         */
+        int lowerBoundSlots = splits.length % capacity.length == 0 ? upperBoundSlots : upperBoundSlots - 1;
         try {
             Random random = new Random(System.currentTimeMillis());
             boolean scheduled[] = new boolean[splits.length];
             Arrays.fill(scheduled, false);
-            for (int i = 0; i < splits.length; i++) {
-                /**
-                 * get the location of all the splits
-                 */
-                String[] loc = splits[i].getLocations();
-                if (loc.length > 0) {
-                    for (int j = 0; j < loc.length; j++) {
-                        /**
-                         * get all the IP addresses from the name
-                         */
-                        InetAddress[] allIps = InetAddress.getAllByName(loc[j]);
-                        /**
-                         * iterate overa all ips
-                         */
-                        for (InetAddress ip : allIps) {
-                            /**
-                             * if the node controller exists
-                             */
-                            if (ipToNcMapping.get(ip.getHostAddress()) != null) {
-                                /**
-                                 * set the ncs
-                                 */
-                                List<String> dataLocations = ipToNcMapping.get(ip.getHostAddress());
-                                int arrayPos = random.nextInt(dataLocations.size());
-                                String nc = dataLocations.get(arrayPos);
-                                int pos = ncNameToIndex.get(nc);
-                                /**
-                                 * check if the node is already full
-                                 */
-                                if (capacity[pos] < slots) {
-                                    locations[i] = nc;
-                                    capacity[pos]++;
-                                    scheduled[i] = true;
-                                }
-                            }
-                        }
-                        /**
-                         * break the loop for data-locations if the schedule has already been found
-                         */
-                        if (scheduled[i] == true) {
-                            break;
-                        }
-                    }
-                }
-            }
+            /**
+             * push data-local lower-bounds slots to each machine
+             */
+            scheduleLocalSlots(splits, capacity, locations, lowerBoundSlots, random, scheduled);
+            /**
+             * push data-local upper-bounds slots to each machine
+             */
+            scheduleLocalSlots(splits, capacity, locations, upperBoundSlots, random, scheduled);
-             * find the lowest index the current available NCs
+             * push non-data-local lower-bounds slots to each machine
-            int currentAvailableNC = 0;
-            for (int i = 0; i < capacity.length; i++) {
-                if (capacity[i] < slots) {
-                    currentAvailableNC = i;
-                    break;
-                }
-            }
+            scheduleNoLocalSlots(splits, capacity, locations, lowerBoundSlots, scheduled);
-             * schedule no-local file reads
+             * push non-data-local upper-bounds slots to each machine
-            for (int i = 0; i < splits.length; i++) {
-                // if there is no data-local NC choice, choose a random one
-                if (!scheduled[i]) {
-                    locations[i] = NCs[currentAvailableNC];
-                    capacity[currentAvailableNC]++;
-                    scheduled[i] = true;
-                    /**
-                     * move the available NC cursor to the next one
-                     */
-                    for (int j = currentAvailableNC; j < capacity.length; j++) {
-                        if (capacity[j] < slots) {
-                            currentAvailableNC = j;
-                            break;
-                        }
-                    }
-                }
-            }
+            scheduleNoLocalSlots(splits, capacity, locations, upperBoundSlots, scheduled);
             return locations;
         } catch (IOException e) {
             throw new HyracksException(e);
@@ -171,6 +125,124 @@
+     * Schedule non-local slots to each machine
+     * 
+     * @param splits
+     *            The HDFS file splits.
+     * @param capacity
+     *            The number of slots currently assigned to each machine.
+     * @param locations
+     *            The result schedule.
+     * @param slots
+     *            The maximum slots of each machine.
+     * @param scheduled
+     *            Indicates which splits have already been scheduled.
+     */
+    private void scheduleNoLocalSlots(InputSplit[] splits, int[] capacity, String[] locations, int slots,
+            boolean[] scheduled) {
+        /**
+         * find the lowest index among the currently available NCs
+         */
+        int currentAvailableNC = 0;
+        for (int i = 0; i < capacity.length; i++) {
+            if (capacity[i] < slots) {
+                currentAvailableNC = i;
+                break;
+            }
+        }
+        /**
+         * schedule non-local file reads
+         */
+        for (int i = 0; i < splits.length; i++) {
+            // no data-local NC was found for this split, so use the current available NC
+            if (!scheduled[i]) {
+                locations[i] = NCs[currentAvailableNC];
+                capacity[currentAvailableNC]++;
+                scheduled[i] = true;
+                /**
+                 * move the available NC cursor to the next one
+                 */
+                for (int j = currentAvailableNC; j < capacity.length; j++) {
+                    if (capacity[j] < slots) {
+                        currentAvailableNC = j;
+                        break;
+                    }
+                }
+            }
+        }
+    }
+    /**
+     * Schedule data-local slots to each machine.
+     * 
+     * @param splits
+     *            The HDFS file splits.
+     * @param capacity
+     *            The number of slots currently assigned to each machine.
+     * @param locations
+     *            The result schedule.
+     * @param slots
+     *            The maximum slots of each machine.
+     * @param random
+     *            The random generator.
+     * @param scheduled
+     *            Indicates which splits have already been scheduled.
+     * @throws IOException
+     * @throws UnknownHostException
+     */
+    private void scheduleLocalSlots(InputSplit[] splits, int[] capacity, String[] locations, int slots, Random random,
+            boolean[] scheduled) throws IOException, UnknownHostException {
+        for (int i = 0; i < splits.length; i++) {
+            /**
+             * get the location of all the splits
+             */
+            String[] loc = splits[i].getLocations();
+            if (loc.length > 0) {
+                for (int j = 0; j < loc.length; j++) {
+                    /**
+                     * get all the IP addresses from the name
+                     */
+                    InetAddress[] allIps = InetAddress.getAllByName(loc[j]);
+                    /**
+                     * iterate over all ips
+                     */
+                    for (InetAddress ip : allIps) {
+                        /**
+                         * if the node controller exists
+                         */
+                        if (ipToNcMapping.get(ip.getHostAddress()) != null) {
+                            /**
+                             * set the ncs
+                             */
+                            List<String> dataLocations = ipToNcMapping.get(ip.getHostAddress());
+                            int arrayPos = random.nextInt(dataLocations.size());
+                            String nc = dataLocations.get(arrayPos);
+                            int pos = ncNameToIndex.get(nc);
+                            /**
+                             * check if the node is already full
+                             */
+                            if (capacity[pos] < slots) {
+                                locations[i] = nc;
+                                capacity[pos]++;
+                                scheduled[i] = true;
+                            }
+                        }
+                    }
+                    /**
+                     * break the loop for data-locations if the schedule has already been found
+                     */
+                    if (scheduled[i] == true) {
+                        break;
+                    }
+                }
+            }
+        }
+    }
+    /**
      * Load the IP-address-to-NC map from the NCNameToNCInfoMap
      * @param ncNameToNcInfos
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/
index 90f5603..9e9abdf 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/
@@ -139,8 +139,7 @@
                              * read the split
-                            TaskAttemptContext context = ctxFactory.createContext(job.getConfiguration(),
-                                    inputSplits.get(i));
+                            TaskAttemptContext context = ctxFactory.createContext(job.getConfiguration(), i);
                             RecordReader reader = inputFormat.createRecordReader(inputSplits.get(i), context);
                             reader.initialize(inputSplits.get(i), context);
                             while (reader.nextKeyValue() == true) {
@@ -148,7 +147,7 @@
-                    parser.flush(writer);
+                    parser.close(writer);
                 } catch (Exception e) {
                     throw new HyracksDataException(e);
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/
index 390a7b5..c1c227c 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/
@@ -39,8 +39,8 @@
 import edu.uci.ics.hyracks.hdfs.api.ITupleWriterFactory;
- * The HDFS file write operator using the Hadoop new API.
- * To use this operator, a user need to provide an ITupleWriterFactory.
+ * The HDFS file write operator using the Hadoop new API. To use this operator,
+ * a user needs to provide an ITupleWriterFactory.
 public class HDFSWriteOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
@@ -88,10 +88,11 @@
                 String outputPath = FileOutputFormat.getOutputPath(conf).toString();
                 String fileName = outputPath + File.separator + "part-" + partition;
-                tupleWriter = tupleWriterFactory.getTupleWriter();
+                tupleWriter = tupleWriterFactory.getTupleWriter(ctx);
                 try {
                     FileSystem dfs = FileSystem.get(conf.getConfiguration());
                     dos = dfs.create(new Path(fileName), true);
+          ;
                 } catch (Exception e) {
                     throw new HyracksDataException(e);
@@ -115,6 +116,7 @@
             public void close() throws HyracksDataException {
                 try {
+                    tupleWriter.close(dos);
                 } catch (Exception e) {
                     throw new HyracksDataException(e);
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs2/scheduler/ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs2/scheduler/
index 3445d68..cb97ca1 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs2/scheduler/
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs2/scheduler/
@@ -15,18 +15,11 @@
 package edu.uci.ics.hyracks.hdfs2.scheduler;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Random;
 import org.apache.hadoop.mapreduce.InputSplit;
-import edu.uci.ics.hyracks.api.client.HyracksConnection;
-import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
 import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.exceptions.HyracksException;
@@ -35,16 +28,10 @@
  * The scheduler conduct data-local scheduling for data reading on HDFS.
  * This class works for Hadoop new API.
 public class Scheduler {
-    /** a list of NCs */
-    private String[] NCs;
-    /** a map from ip to NCs */
-    private Map<String, List<String>> ipToNcMapping = new HashMap<String, List<String>>();
-    /** a map from the NC name to the index */
-    private Map<String, Integer> ncNameToIndex = new HashMap<String, Integer>();
+    private edu.uci.ics.hyracks.hdfs.scheduler.Scheduler scheduler;
      * The constructor of the scheduler
@@ -53,17 +40,18 @@
      * @throws HyracksException
     public Scheduler(String ipAddress, int port) throws HyracksException {
-        try {
-            IHyracksClientConnection hcc = new HyracksConnection(ipAddress, port);
-            Map<String, NodeControllerInfo> ncNameToNcInfos = hcc.getNodeControllerInfos();
-            loadIPAddressToNCMap(ncNameToNcInfos);
-        } catch (Exception e) {
-            throw new HyracksException(e);
-        }
+        scheduler = new edu.uci.ics.hyracks.hdfs.scheduler.Scheduler(ipAddress, port);
+    /**
+     * The constructor of the scheduler.
+     * 
+     * @param ncNameToNcInfos
+     *            the mapping from nc names to nc infos
+     * @throws HyracksException
+     */
     public Scheduler(Map<String, NodeControllerInfo> ncNameToNcInfos) throws HyracksException {
-        loadIPAddressToNCMap(ncNameToNcInfos);
+        scheduler = new edu.uci.ics.hyracks.hdfs.scheduler.Scheduler(ncNameToNcInfos);
@@ -72,135 +60,11 @@
      * @throws HyracksDataException
     public String[] getLocationConstraints(List<InputSplit> splits) throws HyracksException {
-        int[] capacity = new int[NCs.length];
-        Arrays.fill(capacity, 0);
-        String[] locations = new String[splits.size()];
-        int slots = splits.size() % capacity.length == 0 ? (splits.size() / capacity.length) : (splits.size()
-                / capacity.length + 1);
         try {
-            Random random = new Random(System.currentTimeMillis());
-            boolean scheduled[] = new boolean[splits.size()];
-            Arrays.fill(scheduled, false);
-            for (int i = 0; i < splits.size(); i++) {
-                /**
-                 * get the location of all the splits
-                 */
-                String[] loc = splits.get(i).getLocations();
-                if (loc.length > 0) {
-                    for (int j = 0; j < loc.length; j++) {
-                        /**
-                         * get all the IP addresses from the name
-                         */
-                        InetAddress[] allIps = InetAddress.getAllByName(loc[j]);
-                        /**
-                         * iterate overa all ips
-                         */
-                        for (InetAddress ip : allIps) {
-                            /**
-                             * if the node controller exists
-                             */
-                            if (ipToNcMapping.get(ip.getHostAddress()) != null) {
-                                /**
-                                 * set the ncs
-                                 */
-                                List<String> dataLocations = ipToNcMapping.get(ip.getHostAddress());
-                                int arrayPos = random.nextInt(dataLocations.size());
-                                String nc = dataLocations.get(arrayPos);
-                                int pos = ncNameToIndex.get(nc);
-                                /**
-                                 * check if the node is already full
-                                 */
-                                if (capacity[pos] < slots) {
-                                    locations[i] = nc;
-                                    capacity[pos]++;
-                                    scheduled[i] = true;
-                                }
-                            }
-                        }
-                        /**
-                         * break the loop for data-locations if the schedule has already been found
-                         */
-                        if (scheduled[i] == true) {
-                            break;
-                        }
-                    }
-                }
-            }
-            /**
-             * find the lowest index the current available NCs
-             */
-            int currentAvailableNC = 0;
-            for (int i = 0; i < capacity.length; i++) {
-                if (capacity[i] < slots) {
-                    currentAvailableNC = i;
-                    break;
-                }
-            }
-            /**
-             * schedule no-local file reads
-             */
-            for (int i = 0; i < splits.size(); i++) {
-                // if there is no data-local NC choice, choose a random one
-                if (!scheduled[i]) {
-                    locations[i] = NCs[currentAvailableNC];
-                    capacity[currentAvailableNC]++;
-                    scheduled[i] = true;
-                    /**
-                     * move the available NC cursor to the next one
-                     */
-                    for (int j = currentAvailableNC; j < capacity.length; j++) {
-                        if (capacity[j] < slots) {
-                            currentAvailableNC = j;
-                            break;
-                        }
-                    }
-                }
-            }
-            return locations;
-        } catch (Exception e) {
-            throw new HyracksException(e);
-        }
-    }
-    /**
-     * Load the IP-address-to-NC map from the NCNameToNCInfoMap
-     * 
-     * @param ncNameToNcInfos
-     * @throws HyracksException
-     */
-    private void loadIPAddressToNCMap(Map<String, NodeControllerInfo> ncNameToNcInfos) throws HyracksException {
-        try {
-            NCs = new String[ncNameToNcInfos.size()];
-            int i = 0;
-            /**
-             * build the IP address to NC map
-             */
-            for (Map.Entry<String, NodeControllerInfo> entry : ncNameToNcInfos.entrySet()) {
-                String ipAddr = InetAddress.getByAddress(entry.getValue().getNetworkAddress().getIpAddress())
-                        .getHostAddress();
-                List<String> matchedNCs = ipToNcMapping.get(ipAddr);
-                if (matchedNCs == null) {
-                    matchedNCs = new ArrayList<String>();
-                    ipToNcMapping.put(ipAddr, matchedNCs);
-                }
-                matchedNCs.add(entry.getKey());
-                NCs[i] = entry.getKey();
-                i++;
-            }
-            /**
-             * set up the NC name to index mapping
-             */
-            for (i = 0; i < NCs.length; i++) {
-                ncNameToIndex.put(NCs[i], i);
-            }
+            org.apache.hadoop.mapred.InputSplit[] inputSplits = new org.apache.hadoop.mapred.InputSplit[splits.size()];
+            for (int i = 0; i < inputSplits.length; i++)
+                inputSplits[i] = new WrappedFileSplit(splits.get(i).getLocations(), splits.get(i).getLength());
+            return scheduler.getLocationConstraints(inputSplits);
         } catch (Exception e) {
             throw new HyracksException(e);
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs2/scheduler/ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs2/scheduler/
new file mode 100644
index 0000000..1deb469
--- /dev/null
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs2/scheduler/
@@ -0,0 +1,51 @@
+package edu.uci.ics.hyracks.hdfs2.scheduler;
+import org.apache.hadoop.mapred.InputSplit;
+/**
+ * The wrapped implementation of InputSplit, for the new API scheduler
+ * to reuse the old API scheduler
+ */
+public class WrappedFileSplit implements InputSplit {
+    private String[] locations;
+    private long length;
+    /**
+     * Creates a split that simply carries the given locations and length.
+     * The locations array is stored by reference; no copy is made.
+     *
+     * @param locations
+     *            the value later returned by getLocations()
+     * @param length
+     *            the value later returned by getLength()
+     */
+    public WrappedFileSplit(String[] locations, long length) {
+        this.locations = locations;
+        this.length = length;
+    }
+    /**
+     * Restores this split from a data stream: reads an int giving the number
+     * of locations, then that many UTF strings, then the split length as a
+     * long.
+     *
+     * @param input
+     *            the stream to read the serialized split from
+     * @throws IOException
+     *             if reading from the input fails
+     */
+    @Override
+    public void readFields(DataInput input) throws IOException {
+        final int count = input.readInt();
+        locations = new String[count];
+        for (int idx = 0; idx < count; idx++) {
+            locations[idx] = input.readUTF();
+        }
+        length = input.readLong();
+    }
+    /**
+     * Serializes this split: the number of locations as a 4-byte int, then
+     * each location as a UTF string, then the split length as a long.
+     * The count must be written with writeInt so that it matches the
+     * readInt() call in readFields; DataOutput.write(int) would emit only the
+     * low-order byte and leave the stream unreadable.
+     *
+     * @param output
+     *            the stream to write the serialized split to
+     * @throws IOException
+     *             if writing to the output fails
+     */
+    @Override
+    public void write(DataOutput output) throws IOException {
+        output.writeInt(locations.length);
+        for (String location : locations) {
+            output.writeUTF(location);
+        }
+        output.writeLong(length);
+    }
+    /**
+     * Returns the length held by this split.
+     *
+     * @return the stored length value
+     */
+    @Override
+    public long getLength() throws IOException {
+        return length;
+    }
+    /**
+     * Returns the locations held by this split.
+     * The internal array is returned directly, not a copy.
+     *
+     * @return the stored locations array
+     */
+    @Override
+    public String[] getLocations() throws IOException {
+        return locations;
+    }
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs/scheduler/ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs/scheduler/
index 0087307..eccd5ee 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs/scheduler/
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs/scheduler/
@@ -223,8 +223,8 @@
         Scheduler scheduler = new Scheduler(ncNameToNcInfos);
         String[] locationConstraints = scheduler.getLocationConstraints(fileSplits);
-        String[] expectedResults = new String[] { "nc1", "nc3", "nc4", "nc2", "nc3", "nc2", "nc1", "nc3", "nc4", "nc2",
-                "nc4", "nc5", "nc5" };
+        String[] expectedResults = new String[] { "nc1", "nc3", "nc4", "nc2", "nc3", "nc2", "nc1", "nc4", "nc5", "nc6",
+                "nc5", "nc5", "nc6" };
         for (int i = 0; i < locationConstraints.length; i++) {
             Assert.assertEquals(locationConstraints[i], expectedResults[i]);
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs2/scheduler/ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs2/scheduler/
index 79f0874..df3feb2 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs2/scheduler/
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/edu/uci/ics/hyracks/hdfs2/scheduler/
@@ -34,7 +34,6 @@
  * Test case for the new HDFS API scheduler
- * 
 public class SchedulerTest extends TestCase {
@@ -228,8 +227,8 @@
         Scheduler scheduler = new Scheduler(ncNameToNcInfos);
         String[] locationConstraints = scheduler.getLocationConstraints(fileSplits);
-        String[] expectedResults = new String[] { "nc1", "nc3", "nc4", "nc2", "nc3", "nc2", "nc1", "nc3", "nc4", "nc2",
-                "nc4", "nc5", "nc5" };
+        String[] expectedResults = new String[] { "nc1", "nc3", "nc4", "nc2", "nc3", "nc2", "nc1", "nc4", "nc5", "nc6",
+                "nc5", "nc5", "nc6" };
         for (int i = 0; i < locationConstraints.length; i++) {
             Assert.assertEquals(locationConstraints[i], expectedResults[i]);
diff --git a/hyracks/hyracks-ipc/pom.xml b/hyracks/hyracks-ipc/pom.xml
index a5e3662..6f5e09f 100644
--- a/hyracks/hyracks-ipc/pom.xml
+++ b/hyracks/hyracks-ipc/pom.xml
@@ -17,6 +17,7 @@
+          <fork>true</fork>
diff --git a/hyracks/hyracks-maven-plugins/hyracks-virtualcluster-maven-plugin/pom.xml b/hyracks/hyracks-maven-plugins/hyracks-virtualcluster-maven-plugin/pom.xml
index e95f7f0..a8fc29e 100644
--- a/hyracks/hyracks-maven-plugins/hyracks-virtualcluster-maven-plugin/pom.xml
+++ b/hyracks/hyracks-maven-plugins/hyracks-virtualcluster-maven-plugin/pom.xml
@@ -19,6 +19,7 @@
+          <fork>true</fork>
diff --git a/hyracks/hyracks-net/pom.xml b/hyracks/hyracks-net/pom.xml
index 5eb88b5..fb486df 100644
--- a/hyracks/hyracks-net/pom.xml
+++ b/hyracks/hyracks-net/pom.xml
@@ -17,6 +17,7 @@
+          <fork>true</fork>
diff --git a/hyracks/hyracks-server/pom.xml b/hyracks/hyracks-server/pom.xml
index f514820..e0fc40a 100644
--- a/hyracks/hyracks-server/pom.xml
+++ b/hyracks/hyracks-server/pom.xml
@@ -17,6 +17,7 @@
+          <fork>true</fork>
diff --git a/hyracks/hyracks-storage-am-btree/pom.xml b/hyracks/hyracks-storage-am-btree/pom.xml
index ccb3b41..f251d51 100644
--- a/hyracks/hyracks-storage-am-btree/pom.xml
+++ b/hyracks/hyracks-storage-am-btree/pom.xml
@@ -20,6 +20,7 @@
+          <fork>true</fork>
diff --git a/hyracks/hyracks-storage-am-common/pom.xml b/hyracks/hyracks-storage-am-common/pom.xml
index a90e91e..dbc4f41d 100644
--- a/hyracks/hyracks-storage-am-common/pom.xml
+++ b/hyracks/hyracks-storage-am-common/pom.xml
@@ -20,6 +20,7 @@
+          <fork>true</fork>
diff --git a/hyracks/hyracks-storage-am-invertedindex/pom.xml b/hyracks/hyracks-storage-am-invertedindex/pom.xml
index 2ba4980..5fe2d96 100644
--- a/hyracks/hyracks-storage-am-invertedindex/pom.xml
+++ b/hyracks/hyracks-storage-am-invertedindex/pom.xml
@@ -20,6 +20,7 @@
+          <fork>true</fork>
diff --git a/hyracks/hyracks-storage-am-rtree/pom.xml b/hyracks/hyracks-storage-am-rtree/pom.xml
index ade69ef..6c2d734 100644
--- a/hyracks/hyracks-storage-am-rtree/pom.xml
+++ b/hyracks/hyracks-storage-am-rtree/pom.xml
@@ -20,6 +20,7 @@
+          <fork>true</fork>
diff --git a/hyracks/hyracks-storage-common/pom.xml b/hyracks/hyracks-storage-common/pom.xml
index ee507cc..3360097 100644
--- a/hyracks/hyracks-storage-common/pom.xml
+++ b/hyracks/hyracks-storage-common/pom.xml
@@ -20,6 +20,7 @@
+          <fork>true</fork>
diff --git a/hyracks/hyracks-test-support/pom.xml b/hyracks/hyracks-test-support/pom.xml
index 2f10556..89233c9 100644
--- a/hyracks/hyracks-test-support/pom.xml
+++ b/hyracks/hyracks-test-support/pom.xml
@@ -20,6 +20,7 @@
+          <fork>true</fork>
diff --git a/hyracks/hyracks-tests/hyracks-storage-am-btree-test/pom.xml b/hyracks/hyracks-tests/hyracks-storage-am-btree-test/pom.xml
index ebd8bcc..d0bb883 100644
--- a/hyracks/hyracks-tests/hyracks-storage-am-btree-test/pom.xml
+++ b/hyracks/hyracks-tests/hyracks-storage-am-btree-test/pom.xml
@@ -20,6 +20,7 @@
+          <fork>true</fork>
diff --git a/hyracks/hyracks-tests/hyracks-storage-am-invertedindex-test/pom.xml b/hyracks/hyracks-tests/hyracks-storage-am-invertedindex-test/pom.xml
index bba6a0e..59c8c46 100644
--- a/hyracks/hyracks-tests/hyracks-storage-am-invertedindex-test/pom.xml
+++ b/hyracks/hyracks-tests/hyracks-storage-am-invertedindex-test/pom.xml
@@ -21,6 +21,7 @@
+          <fork>true</fork>
diff --git a/hyracks/hyracks-tests/hyracks-storage-am-rtree-test/pom.xml b/hyracks/hyracks-tests/hyracks-storage-am-rtree-test/pom.xml
index 72d9a78..7b1a3f3 100644
--- a/hyracks/hyracks-tests/hyracks-storage-am-rtree-test/pom.xml
+++ b/hyracks/hyracks-tests/hyracks-storage-am-rtree-test/pom.xml
@@ -20,6 +20,7 @@
+          <fork>true</fork>
diff --git a/hyracks/hyracks-tests/hyracks-storage-common-test/pom.xml b/hyracks/hyracks-tests/hyracks-storage-common-test/pom.xml
index 6063de5..8e429f9 100644
--- a/hyracks/hyracks-tests/hyracks-storage-common-test/pom.xml
+++ b/hyracks/hyracks-tests/hyracks-storage-common-test/pom.xml
@@ -20,6 +20,7 @@
+          <fork>true</fork>