clean up comments and code
git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_job_rewriter@3021 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/rewriter/ActivityClusterGraphRewriter.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/rewriter/ActivityClusterGraphRewriter.java
index 6f5f74b..b11e350 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/rewriter/ActivityClusterGraphRewriter.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/rewriter/ActivityClusterGraphRewriter.java
@@ -148,9 +148,9 @@
}
/**
- * expend one-to-one connected activity cluster by the BFS order
+ * expend one-to-one connected activity cluster by the BFS order.
* after the while-loop, the original activities are partitioned
- * into equivalent classes, one-per-super-activity
+ * into equivalent classes, one-per-super-activity.
*/
Map<ActivityId, SuperActivity> clonedSuperActivities = new HashMap<ActivityId, SuperActivity>();
while (toBeExpendedMap.size() > 0) {
@@ -275,6 +275,7 @@
SuperActivity consumerSuperActivity = invertedActivitySuperActivityMap.get(consumerActivity);
int producerSAPort = superActivityProducerPort.get(producerSuperActivity);
int consumerSAPort = superActivityConsumerPort.get(consumerSuperActivity);
+ newActivityCluster.addConnector(conn);
newActivityCluster.connect(conn, producerSuperActivity, producerSAPort, consumerSuperActivity,
consumerSAPort, recordDescriptor);
@@ -320,11 +321,17 @@
* Create a new super activity
*
* @param acg
+ * the activity cluster
* @param superActivities
+ * the map from activity id to current super activities
* @param toBeExpendedMap
+ * the map from an existing super activity to its BFS expansion queue of the original activities
* @param invertedActivitySuperActivityMap
+ * the map from the original activities to their hosted super activities
* @param activityId
+ * the activity id for the new super activity, which is the first added acitivty's id in the super activity
* @param activity
+ * the first activity added to the new super activity
*/
private void createNewSuperActivity(ActivityCluster acg, Map<ActivityId, SuperActivity> superActivities,
Map<ActivityId, Queue<IActivity>> toBeExpendedMap,
@@ -339,14 +346,20 @@
}
/**
- * One super activity swallows another existing super activity
+ * One super activity swallows another existing super activity.
*
* @param superActivities
+ * the map from activity id to current super activities
* @param toBeExpendedMap
+ * the map from an existing super activity to its BFS expansion queue of the original activities
* @param invertedActivitySuperActivityMap
+ * the map from the original activities to their hosted super activities
* @param superActivity
+ * the "swallowing" super activity
* @param superActivityId
+ * the activity id for the "swallowing" super activity, which is also the first added acitivty's id in the super activity
* @param existingSuperActivity
+ * an existing super activity which is to be swallowed by the "swallowing" super activity
*/
private void swallowExistingSuperActivity(Map<ActivityId, SuperActivity> superActivities,
Map<ActivityId, Queue<IActivity>> toBeExpendedMap,
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/rewriter/OneToOneConnectedActivityCluster.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/rewriter/OneToOneConnectedActivityCluster.java
index 2538bb9..07b7ffc 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/rewriter/OneToOneConnectedActivityCluster.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/rewriter/OneToOneConnectedActivityCluster.java
@@ -33,6 +33,7 @@
public class OneToOneConnectedActivityCluster extends ActivityCluster {
private static final long serialVersionUID = 1L;
+
protected final Map<Integer, Pair<ActivityId, Integer>> clusterInputIndexMap = new HashMap<Integer, Pair<ActivityId, Integer>>();
protected final Map<Integer, Pair<ActivityId, Integer>> clusterOutputIndexMap = new HashMap<Integer, Pair<ActivityId, Integer>>();
protected final Map<Pair<ActivityId, Integer>, Integer> invertedClusterOutputIndexMap = new HashMap<Pair<ActivityId, Integer>, Integer>();
@@ -42,30 +43,80 @@
super(acg, id);
}
+ /**
+ * Set up the mapping of the cluster's output channel to an internal activity and its output channel
+ *
+ * @param clusterOutputIndex
+ * the output channel index for the cluster
+ * @param activityId
+ * the id of the internal activity which produces the corresponding output
+ * @param activityOutputIndex
+ * the output channel index of the internal activity which corresponds to the output channel of the cluster of activities
+ */
public void setClusterOutputIndex(int clusterOutputIndex, ActivityId activityId, int activityOutputIndex) {
clusterOutputIndexMap.put(clusterOutputIndex, Pair.of(activityId, activityOutputIndex));
invertedClusterOutputIndexMap.put(Pair.of(activityId, activityOutputIndex), clusterOutputIndex);
}
+ /**
+ * get the an internal activity and its output channel of a cluster output channel
+ *
+ * @param clusterOutputIndex
+ * the output channel index for the cluster
+ * @return a pair containing the activity id of the corresponding internal activity and the output channel index
+ */
public Pair<ActivityId, Integer> getActivityIdOutputIndex(int clusterOutputIndex) {
return clusterOutputIndexMap.get(clusterOutputIndex);
}
+ /**
+ * Set up the mapping of the cluster's input channel to an internal activity and input output channel
+ *
+ * @param clusterInputIndex
+ * the input channel index for the cluster
+ * @param activityId
+ * the id of the internal activity which consumes the corresponding input
+ * @param activityInputIndex
+ * the output channel index of the internal activity which corresponds to the input channel of the cluster of activities
+ */
public void setClusterInputIndex(int clusterInputIndex, ActivityId activityId, int activityInputIndex) {
clusterInputIndexMap.put(clusterInputIndex, Pair.of(activityId, activityInputIndex));
invertedClusterInputIndexMap.put(Pair.of(activityId, activityInputIndex), clusterInputIndex);
}
+ /**
+ * get the an internal activity and its input channel of a cluster input channel
+ *
+ * @param clusterOutputIndex
+ * the output channel index for the cluster
+ * @return a pair containing the activity id of the corresponding internal activity and the output channel index
+ */
public Pair<ActivityId, Integer> getActivityIdInputIndex(int clusterInputIndex) {
return clusterInputIndexMap.get(clusterInputIndex);
}
+ /**
+ * Get the cluster input channel of an input-boundary activity and its input channel
+ *
+ * @param activityInputChannel
+ * the input-boundary activity and its input channel
+ * @return the cluster input channel
+ */
public int getClusterInputIndex(Pair<ActivityId, Integer> activityInputChannel) {
- return invertedClusterInputIndexMap.get(activityInputChannel);
+ Integer channel = invertedClusterInputIndexMap.get(activityInputChannel);
+ return channel == null ? -1 : channel;
}
+ /**
+ * Get the cluster output channel of an input-boundary activity and its output channel
+ *
+ * @param activityOutputChannel
+ * the output-boundary activity and its output channel
+ * @return the cluster output channel
+ */
public int getClusterOutputIndex(Pair<ActivityId, Integer> activityOutputChannel) {
- return invertedClusterOutputIndexMap.get(activityOutputChannel);
+ Integer channel = invertedClusterOutputIndexMap.get(activityOutputChannel);
+ return channel == null ? -1 : channel;
}
}
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/rewriter/runtime/SuperActivity.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/rewriter/runtime/SuperActivity.java
index f78cfda..734ff85 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/rewriter/runtime/SuperActivity.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/rewriter/runtime/SuperActivity.java
@@ -66,48 +66,98 @@
}
}
+ /**
+ * wrap a RecordDescriptorProvider for the super activity
+ */
IRecordDescriptorProvider wrappedRecDescProvider = new IRecordDescriptorProvider() {
@Override
public RecordDescriptor getInputRecordDescriptor(ActivityId aid, int inputIndex) {
if (startActivities.get(aid) != null) {
+ /**
+ * if the activity is a start (input boundary) activity
+ */
int superActivityInputChannel = SuperActivity.this.getClusterInputIndex(Pair.of(aid, inputIndex));
- return recordDescProvider.getInputRecordDescriptor(activityId, superActivityInputChannel);
- } else if (SuperActivity.this.getActivityMap().get(aid) != null) {
+ if (superActivityInputChannel >= 0) {
+ return recordDescProvider.getInputRecordDescriptor(activityId, superActivityInputChannel);
+ }
+ }
+ if (SuperActivity.this.getActivityMap().get(aid) != null) {
+ /**
+ * if the activity is an internal activity of the super activity
+ */
IConnectorDescriptor conn = getActivityInputMap().get(aid).get(inputIndex);
return getConnectorRecordDescriptorMap().get(conn.getConnectorId());
- } else {
- ActivityClusterGraph acg = SuperActivity.this.getActivityClusterGraph();
- for (Entry<ActivityClusterId, ActivityCluster> entry : acg.getActivityClusterMap().entrySet()) {
- ActivityCluster ac = entry.getValue();
- for (Entry<ActivityId, IActivity> saEntry : ac.getActivityMap().entrySet()) {
- SuperActivity sa = (SuperActivity) saEntry.getValue();
- if (sa.getActivityMap().get(aid) != null) {
- List<IConnectorDescriptor> conns = sa.getActivityInputMap().get(aid);
- if (conns != null && conns.size() >= inputIndex) {
- IConnectorDescriptor conn = conns.get(inputIndex);
- return sa.getConnectorRecordDescriptorMap().get(conn.getConnectorId());
- } else {
- int superActivityInputChannel = sa.getClusterInputIndex(Pair.of(aid, inputIndex));
+ }
+
+ /**
+ * the following is for the case where the activity is in other SuperActivities
+ */
+ ActivityClusterGraph acg = SuperActivity.this.getActivityClusterGraph();
+ for (Entry<ActivityClusterId, ActivityCluster> entry : acg.getActivityClusterMap().entrySet()) {
+ ActivityCluster ac = entry.getValue();
+ for (Entry<ActivityId, IActivity> saEntry : ac.getActivityMap().entrySet()) {
+ SuperActivity sa = (SuperActivity) saEntry.getValue();
+ if (sa.getActivityMap().get(aid) != null) {
+ List<IConnectorDescriptor> conns = sa.getActivityInputMap().get(aid);
+ if (conns != null && conns.size() >= inputIndex) {
+ IConnectorDescriptor conn = conns.get(inputIndex);
+ return sa.getConnectorRecordDescriptorMap().get(conn.getConnectorId());
+ } else {
+ int superActivityInputChannel = sa.getClusterInputIndex(Pair.of(aid, inputIndex));
+ if (superActivityInputChannel >= 0) {
return recordDescProvider.getInputRecordDescriptor(sa.getActivityId(),
superActivityInputChannel);
}
}
}
}
- return null;
}
+ return null;
}
@Override
public RecordDescriptor getOutputRecordDescriptor(ActivityId aid, int outputIndex) {
- RecordDescriptor providedDesc = recordDescProvider.getOutputRecordDescriptor(aid, outputIndex);
- if (providedDesc != null) {
- return providedDesc;
- } else {
+ /**
+ * if the activity is an output-boundary activity
+ */
+ int superActivityOutputChannel = SuperActivity.this.getClusterOutputIndex(Pair.of(aid, outputIndex));
+ if (superActivityOutputChannel >= 0) {
+ return recordDescProvider.getOutputRecordDescriptor(activityId, superActivityOutputChannel);
+ }
+
+ if (SuperActivity.this.getActivityMap().get(aid) != null) {
+ /**
+ * if the activity is an internal activity of the super activity
+ */
IConnectorDescriptor conn = getActivityOutputMap().get(aid).get(outputIndex);
return getConnectorRecordDescriptorMap().get(conn.getConnectorId());
}
+
+ /**
+ * the following is for the case where the activity is in other SuperActivities
+ */
+ ActivityClusterGraph acg = SuperActivity.this.getActivityClusterGraph();
+ for (Entry<ActivityClusterId, ActivityCluster> entry : acg.getActivityClusterMap().entrySet()) {
+ ActivityCluster ac = entry.getValue();
+ for (Entry<ActivityId, IActivity> saEntry : ac.getActivityMap().entrySet()) {
+ SuperActivity sa = (SuperActivity) saEntry.getValue();
+ if (sa.getActivityMap().get(aid) != null) {
+ List<IConnectorDescriptor> conns = sa.getActivityOutputMap().get(aid);
+ if (conns != null && conns.size() >= outputIndex) {
+ IConnectorDescriptor conn = conns.get(outputIndex);
+ return sa.getConnectorRecordDescriptorMap().get(conn.getConnectorId());
+ } else {
+ superActivityOutputChannel = sa.getClusterOutputIndex(Pair.of(aid, outputIndex));
+ if (superActivityOutputChannel >= 0) {
+ return recordDescProvider.getOutputRecordDescriptor(sa.getActivityId(),
+ superActivityOutputChannel);
+ }
+ }
+ }
+ }
+ }
+ return null;
}
};
@@ -119,9 +169,9 @@
public ActivityId getActivityId() {
return activityId;
}
-
+
@Override
- public String toString(){
+ public String toString() {
return getActivityMap().values().toString();
}
-}
\ No newline at end of file
+}
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
index 24efc7a..6971ef9 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
@@ -36,6 +36,12 @@
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+/**
+ * The runtime of a SuperActivity, which internally executes a DAG of one-to-one connected
+ * activities in a single thread.
+ *
+ * @author yingyib
+ */
public class SuperActivityOperatorNodePushable implements IOperatorNodePushable {
private final Map<ActivityId, IOperatorNodePushable> operatorNodePushables = new HashMap<ActivityId, IOperatorNodePushable>();
private final List<IOperatorNodePushable> operatprNodePushablesBFSOrder = new ArrayList<IOperatorNodePushable>();
@@ -74,7 +80,6 @@
for (IOperatorNodePushable op : operatprNodePushablesBFSOrder) {
op.initialize();
}
-
}
public void init() throws HyracksDataException {