move the super activity DAG initialization into the constructor of SuperActivityNodePushable

git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_job_rewriter@2987 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/rewriter/ActivityClusterGraphRewriter.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/rewriter/ActivityClusterGraphRewriter.java
index b81928b..f1aee37 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/rewriter/ActivityClusterGraphRewriter.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/rewriter/ActivityClusterGraphRewriter.java
@@ -37,10 +37,27 @@
 import edu.uci.ics.hyracks.api.job.ActivityClusterId;
 import edu.uci.ics.hyracks.api.rewriter.runtime.SuperActivity;
 
+/**
+ * This class rewrite the AcivityClusterGraph to eliminate
+ * all one-to-one connections and merge one-to-one connected
+ * DAGs into super activities.
+ * </p>
+ * Each super activity internally maintains a DAG and execute it at the runtime.
+ * 
+ * @author yingyib
+ */
 public class ActivityClusterGraphRewriter implements Serializable {
     private static final long serialVersionUID = 1L;
     private static String ONE_TO_ONE_CONNECTOR = "OneToOneConnectorDescriptor";
 
+    /**
+     * rewrite an activity cluster graph to eliminate
+     * all one-to-one connections and merge one-to-one connected
+     * DAGs into super activities.
+     * 
+     * @param acg
+     *            the activity cluster graph
+     */
     public void rewrite(ActivityClusterGraph acg) {
         acg.getActivityMap().clear();
         acg.getConnectorMap().clear();
@@ -103,7 +120,6 @@
      * 
      * @param ac
      *            the activity cluster to be rewritten
-     *            the activity cluster to be rewritten
      */
     private void rewriteIntraActivityCluster(ActivityCluster ac,
             Map<IActivity, SuperActivity> invertedActivitySuperActivityMap) {
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
index a1a0f1a..24efc7a 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
@@ -46,7 +46,6 @@
     private final int partition;
     private final int nPartitions;
     private int inputArity = 0;
-    private boolean initialized = false;
 
     public SuperActivityOperatorNodePushable(SuperActivity parent, Map<ActivityId, IActivity> startActivities,
             IHyracksTaskContext ctx, IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
@@ -56,11 +55,19 @@
         this.recordDescProvider = recordDescProvider;
         this.partition = partition;
         this.nPartitions = nPartitions;
+
+        /**
+         * initialize the writer-relationship for the internal DAG of operator node pushables
+         */
+        try {
+            init();
+        } catch (Exception e) {
+            throw new IllegalStateException(e);
+        }
     }
 
     @Override
     public synchronized void initialize() throws HyracksDataException {
-        init();
         /**
          * initialize operator node pushables in the BFS order
          */
@@ -71,10 +78,6 @@
     }
 
     public void init() throws HyracksDataException {
-        if (initialized) {
-            return;
-        }
-
         Map<ActivityId, IOperatorNodePushable> startOperatorNodePushables = new HashMap<ActivityId, IOperatorNodePushable>();
         Queue<Pair<Pair<IActivity, Integer>, Pair<IActivity, Integer>>> childQueue = new LinkedList<Pair<Pair<IActivity, Integer>, Pair<IActivity, Integer>>>();
         List<IConnectorDescriptor> outputConnectors = null;
@@ -140,8 +143,6 @@
              */
             outputConnectors = parent.getActivityOutputMap().get(destId);
         }
-
-        initialized = true;
     }
 
     @Override
@@ -162,11 +163,6 @@
     @Override
     public synchronized void setOutputFrameWriter(int clusterOutputIndex, IFrameWriter writer,
             RecordDescriptor recordDesc) {
-        try {
-            init();
-        } catch (Exception e) {
-            throw new IllegalStateException(e);
-        }
         /**
          * set the right output frame writer
          */
@@ -177,12 +173,6 @@
 
     @Override
     public synchronized IFrameWriter getInputFrameWriter(final int index) {
-        try {
-            init();
-        } catch (Exception e) {
-            throw new IllegalStateException(e);
-        }
-
         /**
          * get the right IFrameWriter from the cluster input index
          */