rewrite hyracks activity cluster graphs to eliminate one-to-one connectors

git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_job_rewriter@2964 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/impl/JobSpecificationActivityClusterGraphGeneratorFactory.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/impl/JobSpecificationActivityClusterGraphGeneratorFactory.java
index f36b7b3..91a39d7 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/impl/JobSpecificationActivityClusterGraphGeneratorFactory.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/impl/JobSpecificationActivityClusterGraphGeneratorFactory.java
@@ -17,11 +17,13 @@
 import edu.uci.ics.hyracks.api.job.JobFlag;
 import edu.uci.ics.hyracks.api.job.JobId;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.api.rewriter.ActivityClusterGraphRewriter;
 
 public class JobSpecificationActivityClusterGraphGeneratorFactory implements IActivityClusterGraphGeneratorFactory {
     private static final long serialVersionUID = 1L;
 
     private final JobSpecification spec;
+    private final ActivityClusterGraphRewriter rewriter = new ActivityClusterGraphRewriter();
 
     public JobSpecificationActivityClusterGraphGeneratorFactory(JobSpecification jobSpec) {
         this.spec = jobSpec;
@@ -78,6 +80,7 @@
         return new IActivityClusterGraphGenerator() {
             @Override
             public ActivityClusterGraph initialize() {
+                rewriter.rewrite(acg);
                 return acg;
             }
 
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/ActivityCluster.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/ActivityCluster.java
index 6698ff7..9fb2b08 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/ActivityCluster.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/ActivityCluster.java
@@ -33,7 +33,7 @@
 import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicyAssignmentPolicy;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 
-public final class ActivityCluster implements Serializable {
+public class ActivityCluster implements Serializable {
     private static final long serialVersionUID = 1L;
 
     private final ActivityClusterGraph acg;
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/rewriter/ActivityClusterGraphRewriter.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/rewriter/ActivityClusterGraphRewriter.java
new file mode 100644
index 0000000..cb2acbc
--- /dev/null
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/rewriter/ActivityClusterGraphRewriter.java
@@ -0,0 +1,332 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.api.rewriter;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Queue;
+import java.util.Set;
+
+import org.apache.commons.lang3.tuple.Pair;
+
+import edu.uci.ics.hyracks.api.dataflow.ActivityId;
+import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
+import edu.uci.ics.hyracks.api.dataflow.IActivity;
+import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.job.ActivityCluster;
+import edu.uci.ics.hyracks.api.job.ActivityClusterGraph;
+import edu.uci.ics.hyracks.api.job.ActivityClusterId;
+import edu.uci.ics.hyracks.api.rewriter.runtime.SuperActivity;
+
+public class ActivityClusterGraphRewriter implements Serializable {
+    private static final long serialVersionUID = 1L;
+    private Map<IActivity, SuperActivity> invertedActivitySuperActivityMap = new HashMap<IActivity, SuperActivity>();
+
+    public void rewrite(ActivityClusterGraph acg) {
+        acg.getActivityMap().clear();
+        acg.getConnectorMap().clear();
+        for (Entry<ActivityClusterId, ActivityCluster> entry : acg.getActivityClusterMap().entrySet()) {
+            rewriteIntraActivityCluster(entry.getValue());
+        }
+        for (Entry<ActivityClusterId, ActivityCluster> entry : acg.getActivityClusterMap().entrySet()) {
+            rewriteInterActivityCluster(entry.getValue());
+        }
+        invertedActivitySuperActivityMap.clear();
+    }
+
+    /**
+     * rewrite the blocking relationship among activity cluster
+     * 
+     * @param ac
+     *            the activity cluster to be rewritten
+     */
+    private void rewriteInterActivityCluster(ActivityCluster ac) {
+        Map<ActivityId, Set<ActivityId>> blocked2BlockerMap = ac.getBlocked2BlockerMap();
+        Map<ActivityId, ActivityId> invertedAid2SuperAidMap = new HashMap<ActivityId, ActivityId>();
+        for (Entry<IActivity, SuperActivity> entry : invertedActivitySuperActivityMap.entrySet()) {
+            invertedAid2SuperAidMap.put(entry.getKey().getActivityId(), entry.getValue().getActivityId());
+        }
+        Map<ActivityId, Set<ActivityId>> replacedBlocked2BlockerMap = new HashMap<ActivityId, Set<ActivityId>>();
+        for (Entry<ActivityId, Set<ActivityId>> entry : blocked2BlockerMap.entrySet()) {
+            ActivityId blocked = entry.getKey();
+            ActivityId replacedBlocked = invertedAid2SuperAidMap.get(blocked);
+            Set<ActivityId> blockers = entry.getValue();
+            Set<ActivityId> replacedBlockers = null;
+            if (blockers != null) {
+                replacedBlockers = new HashSet<ActivityId>();
+                for (ActivityId blocker : blockers) {
+                    replacedBlockers.add(invertedAid2SuperAidMap.get(blocker));
+                    ac.getDependencies().add(
+                            ac.getActivityClusterGraph().getActivityMap().get(invertedAid2SuperAidMap.get(blocker)));
+                }
+            }
+            replacedBlocked2BlockerMap.put(replacedBlocked, replacedBlockers);
+        }
+        blocked2BlockerMap.clear();
+        blocked2BlockerMap.putAll(replacedBlocked2BlockerMap);
+    }
+
+    /**
+     * rewrite an activity cluster internally
+     * 
+     * @param ac
+     *            the activity cluster to be rewritten
+     *            the activity cluster to be rewritten
+     */
+    private void rewriteIntraActivityCluster(ActivityCluster ac) {
+        Map<ActivityId, IActivity> activities = ac.getActivityMap();
+        Map<ActivityId, List<IConnectorDescriptor>> activityInputMap = ac.getActivityInputMap();
+        Map<ActivityId, List<IConnectorDescriptor>> activityOutputMap = ac.getActivityOutputMap();
+        Map<ConnectorDescriptorId, Pair<Pair<IActivity, Integer>, Pair<IActivity, Integer>>> connectorActivityMap = ac
+                .getConnectorActivityMap();
+        ActivityClusterGraph acg = ac.getActivityClusterGraph();
+        Map<ActivityId, IActivity> startActivities = new HashMap<ActivityId, IActivity>();
+        Map<ActivityId, SuperActivity> superActivities = new HashMap<ActivityId, SuperActivity>();
+        Map<ActivityId, Queue<IActivity>> toBeExpendedMap = new HashMap<ActivityId, Queue<IActivity>>();
+
+        /**
+         * Build the initial super activities
+         */
+        for (Entry<ActivityId, IActivity> entry : activities.entrySet()) {
+            ActivityId activityId = entry.getKey();
+            IActivity activity = entry.getValue();
+            if (activityInputMap.get(activityId) == null) {
+                startActivities.put(activityId, activity);
+                /**
+                 * use the start activity's id as the id of the super activity
+                 */
+                createNewSuperActivity(ac, superActivities, toBeExpendedMap, invertedActivitySuperActivityMap,
+                        activityId, activity);
+            }
+        }
+
+        /**
+         * expend one-to-one connected activity cluster by the BFS order
+         * after the while-loop, the original activities are partitioned
+         * into equivalent classes, one-per-super-activity
+         */
+        Map<ActivityId, SuperActivity> clonedSuperActivities = new HashMap<ActivityId, SuperActivity>();
+        while (toBeExpendedMap.size() > 0) {
+            clonedSuperActivities.clear();
+            clonedSuperActivities.putAll(superActivities);
+            for (Entry<ActivityId, SuperActivity> entry : clonedSuperActivities.entrySet()) {
+                ActivityId superActivityId = entry.getKey();
+                SuperActivity superActivity = entry.getValue();
+
+                /**
+                 * for the case where the super activity has already been swallowed
+                 */
+                if (superActivities.get(superActivityId) == null) {
+                    continue;
+                }
+
+                /**
+                 * expend the super activity
+                 */
+                Queue<IActivity> toBeExpended = toBeExpendedMap.get(superActivityId);
+                if (toBeExpended == null) {
+                    /**
+                     * Nothing to expand
+                     */
+                    continue;
+                }
+                IActivity expendingActivity = toBeExpended.poll();
+                List<IConnectorDescriptor> outputConnectors = activityOutputMap.get(expendingActivity.getActivityId());
+                if (outputConnectors != null) {
+                    for (IConnectorDescriptor outputConn : outputConnectors) {
+                        Pair<Pair<IActivity, Integer>, Pair<IActivity, Integer>> endPoints = connectorActivityMap
+                                .get(outputConn.getConnectorId());
+                        IActivity newActivity = endPoints.getRight().getLeft();
+                        SuperActivity existingSuperActivity = invertedActivitySuperActivityMap.get(newActivity);
+                        if (outputConn.getClass().getName().contains("OneToOneConnectorDescriptor")) {
+                            /**
+                             * expend the super activity cluster on an one-to-one out-bound connection
+                             */
+                            if (existingSuperActivity == null) {
+                                superActivity.addActivity(newActivity);
+                                toBeExpended.add(newActivity);
+                                invertedActivitySuperActivityMap.put(newActivity, superActivity);
+                            } else {
+                                /**
+                                 * the two activities already in the same super activity
+                                 */
+                                if (existingSuperActivity == superActivity) {
+                                    continue;
+                                }
+                                /**
+                                 * swallow an existing super activity
+                                 */
+                                swallowExistingSuperActivity(superActivities, toBeExpendedMap,
+                                        invertedActivitySuperActivityMap, superActivity, superActivityId,
+                                        existingSuperActivity);
+                            }
+                        } else {
+                            if (existingSuperActivity == null) {
+                                /**
+                                 * create new activity
+                                 */
+                                createNewSuperActivity(ac, superActivities, toBeExpendedMap,
+                                        invertedActivitySuperActivityMap, newActivity.getActivityId(), newActivity);
+                            }
+                        }
+                    }
+                }
+
+                /**
+                 * remove the to-be-expended queue if it is empty
+                 */
+                if (toBeExpended.size() == 0) {
+                    toBeExpendedMap.remove(superActivityId);
+                }
+            }
+        }
+
+        Map<ConnectorDescriptorId, IConnectorDescriptor> connMap = ac.getConnectorMap();
+        Map<ConnectorDescriptorId, RecordDescriptor> connRecordDesc = ac.getConnectorRecordDescriptorMap();
+        Map<SuperActivity, Integer> superActivityProducerPort = new HashMap<SuperActivity, Integer>();
+        Map<SuperActivity, Integer> superActivityConsumerPort = new HashMap<SuperActivity, Integer>();
+        for (Entry<ActivityId, SuperActivity> entry : superActivities.entrySet()) {
+            superActivityProducerPort.put(entry.getValue(), 0);
+            superActivityConsumerPort.put(entry.getValue(), 0);
+        }
+
+        /**
+         * create a new activity cluster to replace the old activity cluster
+         */
+        ActivityCluster newActivityCluster = new ActivityCluster(acg, ac.getId());
+        newActivityCluster.setConnectorPolicyAssignmentPolicy(ac.getConnectorPolicyAssignmentPolicy());
+        for (Entry<ActivityId, SuperActivity> entry : superActivities.entrySet()) {
+            newActivityCluster.addActivity(entry.getValue());
+            acg.getActivityMap().put(entry.getKey(), newActivityCluster);
+        }
+
+        /**
+         * Setup connectors: either inside a super activity or among super activities
+         */
+        for (Entry<ConnectorDescriptorId, Pair<Pair<IActivity, Integer>, Pair<IActivity, Integer>>> entry : connectorActivityMap
+                .entrySet()) {
+            ConnectorDescriptorId connectorId = entry.getKey();
+            Pair<Pair<IActivity, Integer>, Pair<IActivity, Integer>> endPoints = entry.getValue();
+            IActivity producerActivity = endPoints.getLeft().getLeft();
+            IActivity consumerActivity = endPoints.getRight().getLeft();
+            int producerPort = endPoints.getLeft().getRight();
+            int consumerPort = endPoints.getRight().getRight();
+            RecordDescriptor recordDescriptor = connRecordDesc.get(connectorId);
+            IConnectorDescriptor conn = connMap.get(connectorId);
+            if (conn.getClass().getName().contains("OneToOneConnectorDescriptor")) {
+                /**
+                 * connection edge between inner activities
+                 */
+                SuperActivity residingSuperActivity = invertedActivitySuperActivityMap.get(producerActivity);
+                residingSuperActivity.connect(conn, producerActivity, producerPort, consumerActivity, consumerPort,
+                        recordDescriptor);
+            } else {
+                /**
+                 * connection edge between super activities
+                 */
+                SuperActivity producerSuperActivity = invertedActivitySuperActivityMap.get(producerActivity);
+                SuperActivity consumerSuperActivity = invertedActivitySuperActivityMap.get(consumerActivity);
+                int producerSAPort = superActivityProducerPort.get(producerSuperActivity);
+                int consumerSAPort = superActivityConsumerPort.get(consumerSuperActivity);
+                newActivityCluster.connect(conn, producerSuperActivity, producerSAPort, consumerSuperActivity,
+                        consumerSAPort, recordDescriptor);
+
+                /**
+                 * bridge the port
+                 */
+                producerSuperActivity.setClusterOutputIndex(producerSAPort, producerActivity.getActivityId(),
+                        producerPort);
+                consumerSuperActivity.setClusterInputIndex(consumerSAPort, consumerActivity.getActivityId(),
+                        consumerPort);
+                acg.getConnectorMap().put(connectorId, newActivityCluster);
+            }
+        }
+
+        /**
+         * Set up the roots of the new activity cluster
+         */
+        for (Entry<ActivityId, SuperActivity> entry : superActivities.entrySet()) {
+            List<IConnectorDescriptor> connIds = newActivityCluster.getActivityOutputMap().get(entry.getKey());
+            if (connIds == null || connIds.size() == 0) {
+                newActivityCluster.addRoot(entry.getValue());
+            }
+        }
+
+        /**
+         * set up the blocked2Blocker mapping, which will be updated in the rewriteInterActivityCluster call
+         */
+        newActivityCluster.getBlocked2BlockerMap().putAll(ac.getBlocked2BlockerMap());
+
+        /**
+         * replace the old activity cluster with the new activity cluster
+         */
+        acg.getActivityClusterMap().put(ac.getId(), newActivityCluster);
+    }
+
+    /**
+     * Create a new super activity
+     * 
+     * @param acg
+     * @param superActivities
+     * @param toBeExpendedMap
+     * @param invertedActivitySuperActivityMap
+     * @param activityId
+     * @param activity
+     */
+    private void createNewSuperActivity(ActivityCluster acg, Map<ActivityId, SuperActivity> superActivities,
+            Map<ActivityId, Queue<IActivity>> toBeExpendedMap,
+            Map<IActivity, SuperActivity> invertedActivitySuperActivityMap, ActivityId activityId, IActivity activity) {
+        SuperActivity superActivity = new SuperActivity(acg.getActivityClusterGraph(), acg.getId(), activityId);
+        superActivities.put(activityId, superActivity);
+        superActivity.addActivity(activity);
+        Queue<IActivity> toBeExpended = new LinkedList<IActivity>();
+        toBeExpended.add(activity);
+        toBeExpendedMap.put(activityId, toBeExpended);
+        invertedActivitySuperActivityMap.put(activity, superActivity);
+    }
+
+    /**
+     * One super activity swallows another existing super activity
+     * 
+     * @param superActivities
+     * @param toBeExpendedMap
+     * @param invertedActivitySuperActivityMap
+     * @param superActivity
+     * @param superActivityId
+     * @param existingSuperActivity
+     */
+    private void swallowExistingSuperActivity(Map<ActivityId, SuperActivity> superActivities,
+            Map<ActivityId, Queue<IActivity>> toBeExpendedMap,
+            Map<IActivity, SuperActivity> invertedActivitySuperActivityMap, SuperActivity superActivity,
+            ActivityId superActivityId, SuperActivity existingSuperActivity) {
+        ActivityId existingSuperActivityId = existingSuperActivity.getActivityId();
+        superActivities.remove(existingSuperActivityId);
+        for (Entry<ActivityId, IActivity> existingEntry : existingSuperActivity.getActivityMap().entrySet()) {
+            IActivity existingActivity = existingEntry.getValue();
+            superActivity.addActivity(existingActivity);
+            invertedActivitySuperActivityMap.put(existingActivity, superActivity);
+        }
+        Queue<IActivity> tbeQueue = toBeExpendedMap.get(superActivityId);
+        Queue<IActivity> existingTbeQueque = toBeExpendedMap.remove(existingSuperActivityId);
+        tbeQueue.addAll(existingTbeQueque);
+    }
+}
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/rewriter/OneToOneConnectedActivityCluster.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/rewriter/OneToOneConnectedActivityCluster.java
new file mode 100644
index 0000000..2538bb9
--- /dev/null
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/rewriter/OneToOneConnectedActivityCluster.java
@@ -0,0 +1,71 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.api.rewriter;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.lang3.tuple.Pair;
+
+import edu.uci.ics.hyracks.api.dataflow.ActivityId;
+import edu.uci.ics.hyracks.api.job.ActivityCluster;
+import edu.uci.ics.hyracks.api.job.ActivityClusterGraph;
+import edu.uci.ics.hyracks.api.job.ActivityClusterId;
+
+/**
+ * All the connectors in an OneToOneConnectedCluster are OneToOneConnectorDescriptors.
+ * 
+ * @author yingyib
+ */
+public class OneToOneConnectedActivityCluster extends ActivityCluster {
+
+    private static final long serialVersionUID = 1L;
+    protected final Map<Integer, Pair<ActivityId, Integer>> clusterInputIndexMap = new HashMap<Integer, Pair<ActivityId, Integer>>();
+    protected final Map<Integer, Pair<ActivityId, Integer>> clusterOutputIndexMap = new HashMap<Integer, Pair<ActivityId, Integer>>();
+    protected final Map<Pair<ActivityId, Integer>, Integer> invertedClusterOutputIndexMap = new HashMap<Pair<ActivityId, Integer>, Integer>();
+    protected final Map<Pair<ActivityId, Integer>, Integer> invertedClusterInputIndexMap = new HashMap<Pair<ActivityId, Integer>, Integer>();
+
+    public OneToOneConnectedActivityCluster(ActivityClusterGraph acg, ActivityClusterId id) {
+        super(acg, id);
+    }
+
+    public void setClusterOutputIndex(int clusterOutputIndex, ActivityId activityId, int activityOutputIndex) {
+        clusterOutputIndexMap.put(clusterOutputIndex, Pair.of(activityId, activityOutputIndex));
+        invertedClusterOutputIndexMap.put(Pair.of(activityId, activityOutputIndex), clusterOutputIndex);
+    }
+
+    public Pair<ActivityId, Integer> getActivityIdOutputIndex(int clusterOutputIndex) {
+        return clusterOutputIndexMap.get(clusterOutputIndex);
+    }
+
+    public void setClusterInputIndex(int clusterInputIndex, ActivityId activityId, int activityInputIndex) {
+        clusterInputIndexMap.put(clusterInputIndex, Pair.of(activityId, activityInputIndex));
+        invertedClusterInputIndexMap.put(Pair.of(activityId, activityInputIndex), clusterInputIndex);
+    }
+
+    public Pair<ActivityId, Integer> getActivityIdInputIndex(int clusterInputIndex) {
+        return clusterInputIndexMap.get(clusterInputIndex);
+    }
+
+    public int getClusterInputIndex(Pair<ActivityId, Integer> activityInputChannel) {
+        return invertedClusterInputIndexMap.get(activityInputChannel);
+    }
+
+    public int getClusterOutputIndex(Pair<ActivityId, Integer> activityOutputChannel) {
+        return invertedClusterOutputIndexMap.get(activityOutputChannel);
+    }
+
+}
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/rewriter/runtime/SuperActivity.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/rewriter/runtime/SuperActivity.java
new file mode 100644
index 0000000..3fe7b61
--- /dev/null
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/rewriter/runtime/SuperActivity.java
@@ -0,0 +1,122 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.api.rewriter.runtime;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.commons.lang3.tuple.Pair;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.ActivityId;
+import edu.uci.ics.hyracks.api.dataflow.IActivity;
+import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.ActivityCluster;
+import edu.uci.ics.hyracks.api.job.ActivityClusterGraph;
+import edu.uci.ics.hyracks.api.job.ActivityClusterId;
+import edu.uci.ics.hyracks.api.rewriter.OneToOneConnectedActivityCluster;
+
+/**
+ * This class can be used to execute a DAG of activities inside which
+ * there are only one-to-one connectors.
+ * 
+ * @author yingyib
+ */
+public class SuperActivity extends OneToOneConnectedActivityCluster implements IActivity {
+    private static final long serialVersionUID = 1L;
+    private final ActivityId activityId;
+
+    public SuperActivity(ActivityClusterGraph acg, ActivityClusterId id, ActivityId activityId) {
+        super(acg, id);
+        this.activityId = activityId;
+    }
+
+    @Override
+    public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
+            final IRecordDescriptorProvider recordDescProvider, final int partition, final int nPartitions)
+            throws HyracksDataException {
+        final Map<ActivityId, IActivity> startActivities = new HashMap<ActivityId, IActivity>();
+        Map<ActivityId, IActivity> activities = getActivityMap();
+        for (Entry<ActivityId, IActivity> entry : activities.entrySet()) {
+            /**
+             * extract start activities
+             */
+            List<IConnectorDescriptor> conns = getActivityInputMap().get(entry.getKey());
+            if (conns == null || conns.size() == 0) {
+                startActivities.put(entry.getKey(), entry.getValue());
+            }
+        }
+
+        IRecordDescriptorProvider wrappedRecDescProvider = new IRecordDescriptorProvider() {
+
+            @Override
+            public RecordDescriptor getInputRecordDescriptor(ActivityId aid, int inputIndex) {
+                if (startActivities.get(aid) != null) {
+                    int superActivityInputChannel = SuperActivity.this.getClusterInputIndex(Pair.of(aid, inputIndex));
+                    return recordDescProvider.getInputRecordDescriptor(activityId, superActivityInputChannel);
+                } else if (SuperActivity.this.getActivityMap().get(aid) != null) {
+                    IConnectorDescriptor conn = getActivityInputMap().get(aid).get(inputIndex);
+                    return getConnectorRecordDescriptorMap().get(conn.getConnectorId());
+                } else {
+                    ActivityClusterGraph acg = SuperActivity.this.getActivityClusterGraph();
+                    for (Entry<ActivityClusterId, ActivityCluster> entry : acg.getActivityClusterMap().entrySet()) {
+                        ActivityCluster ac = entry.getValue();
+                        for (Entry<ActivityId, IActivity> saEntry : ac.getActivityMap().entrySet()) {
+                            SuperActivity sa = (SuperActivity) saEntry.getValue();
+                            if (sa.getActivityMap().get(aid) != null) {
+                                List<IConnectorDescriptor> conns = sa.getActivityInputMap().get(aid);
+                                if (conns != null && conns.size() >= inputIndex) {
+                                    IConnectorDescriptor conn = conns.get(inputIndex);
+                                    return sa.getConnectorRecordDescriptorMap().get(conn.getConnectorId());
+                                } else {
+                                    int superActivityInputChannel = sa.getClusterInputIndex(Pair.of(aid, inputIndex));
+                                    return recordDescProvider.getInputRecordDescriptor(sa.getActivityId(),
+                                            superActivityInputChannel);
+                                }
+                            }
+                        }
+                    }
+                    return null;
+                }
+            }
+
+            @Override
+            public RecordDescriptor getOutputRecordDescriptor(ActivityId aid, int outputIndex) {
+                RecordDescriptor providedDesc = recordDescProvider.getOutputRecordDescriptor(aid, outputIndex);
+                if (providedDesc != null) {
+                    return providedDesc;
+                } else {
+                    IConnectorDescriptor conn = getActivityOutputMap().get(aid).get(outputIndex);
+                    return getConnectorRecordDescriptorMap().get(conn.getConnectorId());
+                }
+            }
+
+        };
+        return new SuperActivityOperatorNodePushable(this, startActivities, ctx, wrappedRecDescProvider, partition,
+                nPartitions);
+    }
+
+    @Override
+    public ActivityId getActivityId() {
+        return activityId;
+    }
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
new file mode 100644
index 0000000..d9a1551
--- /dev/null
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
@@ -0,0 +1,221 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.api.rewriter.runtime;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Queue;
+
+import org.apache.commons.lang3.tuple.Pair;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.ActivityId;
+import edu.uci.ics.hyracks.api.dataflow.IActivity;
+import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+public class SuperActivityOperatorNodePushable implements IOperatorNodePushable {
+    private final Map<ActivityId, IOperatorNodePushable> operatorNodePushables = new HashMap<ActivityId, IOperatorNodePushable>();
+    private final List<IOperatorNodePushable> operatprNodePushablesBFSOrder = new ArrayList<IOperatorNodePushable>();
+    private final Map<ActivityId, IActivity> startActivities;
+    private final SuperActivity parent;
+    private final IHyracksTaskContext ctx;
+    private final IRecordDescriptorProvider recordDescProvider;
+    private final int partition;
+    private final int nPartitions;
+    private int inputArity = 0;
+    private boolean initialized = false;
+
+    public SuperActivityOperatorNodePushable(SuperActivity parent, Map<ActivityId, IActivity> startActivities,
+            IHyracksTaskContext ctx, IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
+        this.parent = parent;
+        this.startActivities = startActivities;
+        this.ctx = ctx;
+        this.recordDescProvider = recordDescProvider;
+        this.partition = partition;
+        this.nPartitions = nPartitions;
+    }
+
+    @Override
+    public synchronized void initialize() throws HyracksDataException {
+        init();
+        /**
+         * initialize operator node pushables in the BFS order
+         */
+        for (IOperatorNodePushable op : operatprNodePushablesBFSOrder) {
+            op.initialize();
+        }
+
+    }
+
+    public void init() throws HyracksDataException {
+        if (initialized) {
+            return;
+        }
+
+        Map<ActivityId, IOperatorNodePushable> startOperatorNodePushables = new HashMap<ActivityId, IOperatorNodePushable>();
+        Queue<Pair<Pair<IActivity, Integer>, Pair<IActivity, Integer>>> childQueue = new LinkedList<Pair<Pair<IActivity, Integer>, Pair<IActivity, Integer>>>();
+        List<IConnectorDescriptor> outputConnectors = null;
+
+        /**
+         * Set up the source operators
+         */
+        for (Entry<ActivityId, IActivity> entry : startActivities.entrySet()) {
+            IOperatorNodePushable opPushable = entry.getValue().createPushRuntime(ctx, recordDescProvider, partition,
+                    nPartitions);
+            startOperatorNodePushables.put(entry.getKey(), opPushable);
+            operatprNodePushablesBFSOrder.add(opPushable);
+            operatorNodePushables.put(entry.getKey(), opPushable);
+            inputArity += opPushable.getInputArity();
+            outputConnectors = parent.getActivityOutputMap().get(entry.getKey());
+            if (outputConnectors != null) {
+                for (IConnectorDescriptor conn : outputConnectors) {
+                    childQueue.add(parent.getConnectorActivityMap().get(conn.getConnectorId()));
+                }
+            }
+        }
+
+        /**
+         * Using BFS (breadth-first search) to construct to runtime execution DAG;
+         */
+        while (childQueue.size() > 0) {
+            /**
+             * expend the executing activities further to the downstream
+             */
+            if (outputConnectors != null && outputConnectors.size() > 0) {
+                for (IConnectorDescriptor conn : outputConnectors) {
+                    childQueue.add(parent.getConnectorActivityMap().get(conn.getConnectorId()));
+                }
+            }
+
+            /**
+             * construct the source to destination information
+             */
+            Pair<Pair<IActivity, Integer>, Pair<IActivity, Integer>> channel = childQueue.poll();
+            ActivityId sourceId = channel.getLeft().getLeft().getActivityId();
+            int outputChannel = channel.getLeft().getRight();
+            ActivityId destId = channel.getRight().getLeft().getActivityId();
+            int inputChannel = channel.getRight().getRight();
+            IOperatorNodePushable sourceOp = operatorNodePushables.get(sourceId);
+            IOperatorNodePushable destOp = operatorNodePushables.get(destId);
+            if (destOp == null) {
+                destOp = channel.getRight().getLeft()
+                        .createPushRuntime(ctx, recordDescProvider, partition, nPartitions);
+                operatprNodePushablesBFSOrder.add(destOp);
+                operatorNodePushables.put(destId, destOp);
+            }
+
+            /**
+             * construct the dataflow connection from a producer to a consumer
+             */
+            sourceOp.setOutputFrameWriter(outputChannel, destOp.getInputFrameWriter(inputChannel),
+                    recordDescProvider.getInputRecordDescriptor(destId, inputChannel));
+
+            /**
+             * traverse to the child of the current activity
+             */
+            outputConnectors = parent.getActivityOutputMap().get(destId);
+        }
+
+        initialized = true;
+    }
+
+    @Override
+    public void deinitialize() throws HyracksDataException {
+        /**
+         * de-initialize operator node pushables
+         */
+        for (IOperatorNodePushable op : operatprNodePushablesBFSOrder) {
+            op.deinitialize();
+        }
+    }
+
+    @Override
+    public int getInputArity() {
+        return inputArity;
+    }
+
+    @Override
+    public synchronized void setOutputFrameWriter(int clusterOutputIndex, IFrameWriter writer,
+            RecordDescriptor recordDesc) {
+        try {
+            init();
+        } catch (Exception e) {
+            throw new IllegalStateException(e);
+        }
+        /**
+         * set the right output frame writer
+         */
+        Pair<ActivityId, Integer> activityIdOutputIndex = parent.getActivityIdOutputIndex(clusterOutputIndex);
+        IOperatorNodePushable opPushable = operatorNodePushables.get(activityIdOutputIndex.getLeft());
+        opPushable.setOutputFrameWriter(activityIdOutputIndex.getRight(), writer, recordDesc);
+    }
+
+    @Override
+    public synchronized IFrameWriter getInputFrameWriter(final int index) {
+        try {
+            init();
+        } catch (Exception e) {
+            throw new IllegalStateException(e);
+        }
+
+        /**
+         * get the right IFrameWriter from the cluster input index
+         */
+        Pair<ActivityId, Integer> activityIdInputIndex = parent.getActivityIdInputIndex(index);
+        IOperatorNodePushable operatorNodePushable = operatorNodePushables.get(activityIdInputIndex.getLeft());
+        final IFrameWriter writer = operatorNodePushable.getInputFrameWriter(activityIdInputIndex.getRight());
+
+        return new IFrameWriter() {
+
+            @Override
+            public void open() throws HyracksDataException {
+                writer.open();
+            }
+
+            @Override
+            public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+                writer.nextFrame(buffer);
+            }
+
+            @Override
+            public void fail() throws HyracksDataException {
+                writer.fail();
+            }
+
+            @Override
+            public void close() throws HyracksDataException {
+                writer.close();
+            }
+
+        };
+    }
+
+    @Override
+    public String getDisplayName() {
+        return "Meta Activity";
+    }
+
+}