clean up comments and code

git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_job_rewriter@3021 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/rewriter/ActivityClusterGraphRewriter.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/rewriter/ActivityClusterGraphRewriter.java
index 6f5f74b..b11e350 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/rewriter/ActivityClusterGraphRewriter.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/rewriter/ActivityClusterGraphRewriter.java
@@ -148,9 +148,9 @@
         }
 
         /**
-         * expend one-to-one connected activity cluster by the BFS order
+         * expend one-to-one connected activity cluster by the BFS order.
          * after the while-loop, the original activities are partitioned
-         * into equivalent classes, one-per-super-activity
+         * into equivalent classes, one-per-super-activity.
          */
         Map<ActivityId, SuperActivity> clonedSuperActivities = new HashMap<ActivityId, SuperActivity>();
         while (toBeExpendedMap.size() > 0) {
@@ -275,6 +275,7 @@
                 SuperActivity consumerSuperActivity = invertedActivitySuperActivityMap.get(consumerActivity);
                 int producerSAPort = superActivityProducerPort.get(producerSuperActivity);
                 int consumerSAPort = superActivityConsumerPort.get(consumerSuperActivity);
+                newActivityCluster.addConnector(conn);
                 newActivityCluster.connect(conn, producerSuperActivity, producerSAPort, consumerSuperActivity,
                         consumerSAPort, recordDescriptor);
 
@@ -320,11 +321,17 @@
      * Create a new super activity
      * 
      * @param acg
+     *            the activity cluster
      * @param superActivities
+     *            the map from activity id to current super activities
      * @param toBeExpendedMap
+     *            the map from an existing super activity to its BFS expansion queue of the original activities
      * @param invertedActivitySuperActivityMap
+     *            the map from the original activities to their hosted super activities
      * @param activityId
+     *            the activity id for the new super activity, which is the first added acitivty's id in the super activity
      * @param activity
+     *            the first activity added to the new super activity
      */
     private void createNewSuperActivity(ActivityCluster acg, Map<ActivityId, SuperActivity> superActivities,
             Map<ActivityId, Queue<IActivity>> toBeExpendedMap,
@@ -339,14 +346,20 @@
     }
 
     /**
-     * One super activity swallows another existing super activity
+     * One super activity swallows another existing super activity.
      * 
      * @param superActivities
+     *            the map from activity id to current super activities
      * @param toBeExpendedMap
+     *            the map from an existing super activity to its BFS expansion queue of the original activities
      * @param invertedActivitySuperActivityMap
+     *            the map from the original activities to their hosted super activities
      * @param superActivity
+     *            the "swallowing" super activity
      * @param superActivityId
+     *            the activity id for the "swallowing" super activity, which is also the first added acitivty's id in the super activity
      * @param existingSuperActivity
+     *            an existing super activity which is to be swallowed by the "swallowing" super activity
      */
     private void swallowExistingSuperActivity(Map<ActivityId, SuperActivity> superActivities,
             Map<ActivityId, Queue<IActivity>> toBeExpendedMap,
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/rewriter/OneToOneConnectedActivityCluster.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/rewriter/OneToOneConnectedActivityCluster.java
index 2538bb9..07b7ffc 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/rewriter/OneToOneConnectedActivityCluster.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/rewriter/OneToOneConnectedActivityCluster.java
@@ -33,6 +33,7 @@
 public class OneToOneConnectedActivityCluster extends ActivityCluster {
 
     private static final long serialVersionUID = 1L;
+
     protected final Map<Integer, Pair<ActivityId, Integer>> clusterInputIndexMap = new HashMap<Integer, Pair<ActivityId, Integer>>();
     protected final Map<Integer, Pair<ActivityId, Integer>> clusterOutputIndexMap = new HashMap<Integer, Pair<ActivityId, Integer>>();
     protected final Map<Pair<ActivityId, Integer>, Integer> invertedClusterOutputIndexMap = new HashMap<Pair<ActivityId, Integer>, Integer>();
@@ -42,30 +43,80 @@
         super(acg, id);
     }
 
+    /**
+     * Set up the mapping of the cluster's output channel to an internal activity and its output channel
+     * 
+     * @param clusterOutputIndex
+     *            the output channel index for the cluster
+     * @param activityId
+     *            the id of the internal activity which produces the corresponding output
+     * @param activityOutputIndex
+     *            the output channel index of the internal activity which corresponds to the output channel of the cluster of activities
+     */
     public void setClusterOutputIndex(int clusterOutputIndex, ActivityId activityId, int activityOutputIndex) {
         clusterOutputIndexMap.put(clusterOutputIndex, Pair.of(activityId, activityOutputIndex));
         invertedClusterOutputIndexMap.put(Pair.of(activityId, activityOutputIndex), clusterOutputIndex);
     }
 
+    /**
+     * get the an internal activity and its output channel of a cluster output channel
+     * 
+     * @param clusterOutputIndex
+     *            the output channel index for the cluster
+     * @return a pair containing the activity id of the corresponding internal activity and the output channel index
+     */
     public Pair<ActivityId, Integer> getActivityIdOutputIndex(int clusterOutputIndex) {
         return clusterOutputIndexMap.get(clusterOutputIndex);
     }
 
+    /**
+     * Set up the mapping of the cluster's input channel to an internal activity and input output channel
+     * 
+     * @param clusterInputIndex
+     *            the input channel index for the cluster
+     * @param activityId
+     *            the id of the internal activity which consumes the corresponding input
+     * @param activityInputIndex
+     *            the output channel index of the internal activity which corresponds to the input channel of the cluster of activities
+     */
     public void setClusterInputIndex(int clusterInputIndex, ActivityId activityId, int activityInputIndex) {
         clusterInputIndexMap.put(clusterInputIndex, Pair.of(activityId, activityInputIndex));
         invertedClusterInputIndexMap.put(Pair.of(activityId, activityInputIndex), clusterInputIndex);
     }
 
+    /**
+     * get the an internal activity and its input channel of a cluster input channel
+     * 
+     * @param clusterOutputIndex
+     *            the output channel index for the cluster
+     * @return a pair containing the activity id of the corresponding internal activity and the output channel index
+     */
     public Pair<ActivityId, Integer> getActivityIdInputIndex(int clusterInputIndex) {
         return clusterInputIndexMap.get(clusterInputIndex);
     }
 
+    /**
+     * Get the cluster input channel of an input-boundary activity and its input channel
+     * 
+     * @param activityInputChannel
+     *            the input-boundary activity and its input channel
+     * @return the cluster input channel
+     */
     public int getClusterInputIndex(Pair<ActivityId, Integer> activityInputChannel) {
-        return invertedClusterInputIndexMap.get(activityInputChannel);
+        Integer channel = invertedClusterInputIndexMap.get(activityInputChannel);
+        return channel == null ? -1 : channel;
     }
 
+    /**
+     * Get the cluster output channel of an input-boundary activity and its output channel
+     * 
+     * @param activityOutputChannel
+     *            the output-boundary activity and its output channel
+     * @return the cluster output channel
+     */
     public int getClusterOutputIndex(Pair<ActivityId, Integer> activityOutputChannel) {
-        return invertedClusterOutputIndexMap.get(activityOutputChannel);
+        Integer channel = invertedClusterOutputIndexMap.get(activityOutputChannel);
+        return channel == null ? -1 : channel;
     }
 
 }
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/rewriter/runtime/SuperActivity.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/rewriter/runtime/SuperActivity.java
index f78cfda..734ff85 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/rewriter/runtime/SuperActivity.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/rewriter/runtime/SuperActivity.java
@@ -66,48 +66,98 @@
             }
         }
 
+        /**
+         * wrap a RecordDescriptorProvider for the super activity
+         */
         IRecordDescriptorProvider wrappedRecDescProvider = new IRecordDescriptorProvider() {
 
             @Override
             public RecordDescriptor getInputRecordDescriptor(ActivityId aid, int inputIndex) {
                 if (startActivities.get(aid) != null) {
+                    /**
+                     * if the activity is a start (input boundary) activity
+                     */
                     int superActivityInputChannel = SuperActivity.this.getClusterInputIndex(Pair.of(aid, inputIndex));
-                    return recordDescProvider.getInputRecordDescriptor(activityId, superActivityInputChannel);
-                } else if (SuperActivity.this.getActivityMap().get(aid) != null) {
+                    if (superActivityInputChannel >= 0) {
+                        return recordDescProvider.getInputRecordDescriptor(activityId, superActivityInputChannel);
+                    }
+                }
+                if (SuperActivity.this.getActivityMap().get(aid) != null) {
+                    /**
+                     * if the activity is an internal activity of the super activity
+                     */
                     IConnectorDescriptor conn = getActivityInputMap().get(aid).get(inputIndex);
                     return getConnectorRecordDescriptorMap().get(conn.getConnectorId());
-                } else {
-                    ActivityClusterGraph acg = SuperActivity.this.getActivityClusterGraph();
-                    for (Entry<ActivityClusterId, ActivityCluster> entry : acg.getActivityClusterMap().entrySet()) {
-                        ActivityCluster ac = entry.getValue();
-                        for (Entry<ActivityId, IActivity> saEntry : ac.getActivityMap().entrySet()) {
-                            SuperActivity sa = (SuperActivity) saEntry.getValue();
-                            if (sa.getActivityMap().get(aid) != null) {
-                                List<IConnectorDescriptor> conns = sa.getActivityInputMap().get(aid);
-                                if (conns != null && conns.size() >= inputIndex) {
-                                    IConnectorDescriptor conn = conns.get(inputIndex);
-                                    return sa.getConnectorRecordDescriptorMap().get(conn.getConnectorId());
-                                } else {
-                                    int superActivityInputChannel = sa.getClusterInputIndex(Pair.of(aid, inputIndex));
+                }
+
+                /**
+                 * the following is for the case where the activity is in other SuperActivities
+                 */
+                ActivityClusterGraph acg = SuperActivity.this.getActivityClusterGraph();
+                for (Entry<ActivityClusterId, ActivityCluster> entry : acg.getActivityClusterMap().entrySet()) {
+                    ActivityCluster ac = entry.getValue();
+                    for (Entry<ActivityId, IActivity> saEntry : ac.getActivityMap().entrySet()) {
+                        SuperActivity sa = (SuperActivity) saEntry.getValue();
+                        if (sa.getActivityMap().get(aid) != null) {
+                            List<IConnectorDescriptor> conns = sa.getActivityInputMap().get(aid);
+                            if (conns != null && conns.size() >= inputIndex) {
+                                IConnectorDescriptor conn = conns.get(inputIndex);
+                                return sa.getConnectorRecordDescriptorMap().get(conn.getConnectorId());
+                            } else {
+                                int superActivityInputChannel = sa.getClusterInputIndex(Pair.of(aid, inputIndex));
+                                if (superActivityInputChannel >= 0) {
                                     return recordDescProvider.getInputRecordDescriptor(sa.getActivityId(),
                                             superActivityInputChannel);
                                 }
                             }
                         }
                     }
-                    return null;
                 }
+                return null;
             }
 
             @Override
             public RecordDescriptor getOutputRecordDescriptor(ActivityId aid, int outputIndex) {
-                RecordDescriptor providedDesc = recordDescProvider.getOutputRecordDescriptor(aid, outputIndex);
-                if (providedDesc != null) {
-                    return providedDesc;
-                } else {
+                /**
+                 * if the activity is an output-boundary activity
+                 */
+                int superActivityOutputChannel = SuperActivity.this.getClusterOutputIndex(Pair.of(aid, outputIndex));
+                if (superActivityOutputChannel >= 0) {
+                    return recordDescProvider.getOutputRecordDescriptor(activityId, superActivityOutputChannel);
+                }
+
+                if (SuperActivity.this.getActivityMap().get(aid) != null) {
+                    /**
+                     * if the activity is an internal activity of the super activity
+                     */
                     IConnectorDescriptor conn = getActivityOutputMap().get(aid).get(outputIndex);
                     return getConnectorRecordDescriptorMap().get(conn.getConnectorId());
                 }
+
+                /**
+                 * the following is for the case where the activity is in other SuperActivities
+                 */
+                ActivityClusterGraph acg = SuperActivity.this.getActivityClusterGraph();
+                for (Entry<ActivityClusterId, ActivityCluster> entry : acg.getActivityClusterMap().entrySet()) {
+                    ActivityCluster ac = entry.getValue();
+                    for (Entry<ActivityId, IActivity> saEntry : ac.getActivityMap().entrySet()) {
+                        SuperActivity sa = (SuperActivity) saEntry.getValue();
+                        if (sa.getActivityMap().get(aid) != null) {
+                            List<IConnectorDescriptor> conns = sa.getActivityOutputMap().get(aid);
+                            if (conns != null && conns.size() >= outputIndex) {
+                                IConnectorDescriptor conn = conns.get(outputIndex);
+                                return sa.getConnectorRecordDescriptorMap().get(conn.getConnectorId());
+                            } else {
+                                superActivityOutputChannel = sa.getClusterOutputIndex(Pair.of(aid, outputIndex));
+                                if (superActivityOutputChannel >= 0) {
+                                    return recordDescProvider.getOutputRecordDescriptor(sa.getActivityId(),
+                                            superActivityOutputChannel);
+                                }
+                            }
+                        }
+                    }
+                }
+                return null;
             }
 
         };
@@ -119,9 +169,9 @@
     public ActivityId getActivityId() {
         return activityId;
     }
-    
+
     @Override
-    public String toString(){
+    public String toString() {
         return getActivityMap().values().toString();
     }
-}
\ No newline at end of file
+}
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
index 24efc7a..6971ef9 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
@@ -36,6 +36,12 @@
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 
+/**
+ * The runtime of a SuperActivity, which internally executes a DAG of one-to-one connected
+ * activities in a single thread.
+ * 
+ * @author yingyib
+ */
 public class SuperActivityOperatorNodePushable implements IOperatorNodePushable {
     private final Map<ActivityId, IOperatorNodePushable> operatorNodePushables = new HashMap<ActivityId, IOperatorNodePushable>();
     private final List<IOperatorNodePushable> operatprNodePushablesBFSOrder = new ArrayList<IOperatorNodePushable>();
@@ -74,7 +80,6 @@
         for (IOperatorNodePushable op : operatprNodePushablesBFSOrder) {
             op.initialize();
         }
-
     }
 
     public void init() throws HyracksDataException {