move the super activity DAG initialization into the constructor of SuperActivityNodePushable
git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_job_rewriter@2987 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/rewriter/ActivityClusterGraphRewriter.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/rewriter/ActivityClusterGraphRewriter.java
index b81928b..f1aee37 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/rewriter/ActivityClusterGraphRewriter.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/rewriter/ActivityClusterGraphRewriter.java
@@ -37,10 +37,27 @@
import edu.uci.ics.hyracks.api.job.ActivityClusterId;
import edu.uci.ics.hyracks.api.rewriter.runtime.SuperActivity;
+/**
+ * This class rewrite the AcivityClusterGraph to eliminate
+ * all one-to-one connections and merge one-to-one connected
+ * DAGs into super activities.
+ * </p>
+ * Each super activity internally maintains a DAG and execute it at the runtime.
+ *
+ * @author yingyib
+ */
public class ActivityClusterGraphRewriter implements Serializable {
private static final long serialVersionUID = 1L;
private static String ONE_TO_ONE_CONNECTOR = "OneToOneConnectorDescriptor";
+ /**
+ * rewrite an activity cluster graph to eliminate
+ * all one-to-one connections and merge one-to-one connected
+ * DAGs into super activities.
+ *
+ * @param acg
+ * the activity cluster graph
+ */
public void rewrite(ActivityClusterGraph acg) {
acg.getActivityMap().clear();
acg.getConnectorMap().clear();
@@ -103,7 +120,6 @@
*
* @param ac
* the activity cluster to be rewritten
- * the activity cluster to be rewritten
*/
private void rewriteIntraActivityCluster(ActivityCluster ac,
Map<IActivity, SuperActivity> invertedActivitySuperActivityMap) {
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
index a1a0f1a..24efc7a 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
@@ -46,7 +46,6 @@
private final int partition;
private final int nPartitions;
private int inputArity = 0;
- private boolean initialized = false;
public SuperActivityOperatorNodePushable(SuperActivity parent, Map<ActivityId, IActivity> startActivities,
IHyracksTaskContext ctx, IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
@@ -56,11 +55,19 @@
this.recordDescProvider = recordDescProvider;
this.partition = partition;
this.nPartitions = nPartitions;
+
+ /**
+ * initialize the writer-relationship for the internal DAG of operator node pushables
+ */
+ try {
+ init();
+ } catch (Exception e) {
+ throw new IllegalStateException(e);
+ }
}
@Override
public synchronized void initialize() throws HyracksDataException {
- init();
/**
* initialize operator node pushables in the BFS order
*/
@@ -71,10 +78,6 @@
}
public void init() throws HyracksDataException {
- if (initialized) {
- return;
- }
-
Map<ActivityId, IOperatorNodePushable> startOperatorNodePushables = new HashMap<ActivityId, IOperatorNodePushable>();
Queue<Pair<Pair<IActivity, Integer>, Pair<IActivity, Integer>>> childQueue = new LinkedList<Pair<Pair<IActivity, Integer>, Pair<IActivity, Integer>>>();
List<IConnectorDescriptor> outputConnectors = null;
@@ -140,8 +143,6 @@
*/
outputConnectors = parent.getActivityOutputMap().get(destId);
}
-
- initialized = true;
}
@Override
@@ -162,11 +163,6 @@
@Override
public synchronized void setOutputFrameWriter(int clusterOutputIndex, IFrameWriter writer,
RecordDescriptor recordDesc) {
- try {
- init();
- } catch (Exception e) {
- throw new IllegalStateException(e);
- }
/**
* set the right output frame writer
*/
@@ -177,12 +173,6 @@
@Override
public synchronized IFrameWriter getInputFrameWriter(final int index) {
- try {
- init();
- } catch (Exception e) {
- throw new IllegalStateException(e);
- }
-
/**
* get the right IFrameWriter from the cluster input index
*/