make all fullstack tests pass
git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_job_rewriter@2968 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/rewriter/ActivityClusterGraphRewriter.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/rewriter/ActivityClusterGraphRewriter.java
index cb2acbc..4d6abcf 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/rewriter/ActivityClusterGraphRewriter.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/rewriter/ActivityClusterGraphRewriter.java
@@ -39,16 +39,16 @@
public class ActivityClusterGraphRewriter implements Serializable {
private static final long serialVersionUID = 1L;
- private Map<IActivity, SuperActivity> invertedActivitySuperActivityMap = new HashMap<IActivity, SuperActivity>();
public void rewrite(ActivityClusterGraph acg) {
acg.getActivityMap().clear();
acg.getConnectorMap().clear();
+ Map<IActivity, SuperActivity> invertedActivitySuperActivityMap = new HashMap<IActivity, SuperActivity>();
for (Entry<ActivityClusterId, ActivityCluster> entry : acg.getActivityClusterMap().entrySet()) {
- rewriteIntraActivityCluster(entry.getValue());
+ rewriteIntraActivityCluster(entry.getValue(), invertedActivitySuperActivityMap);
}
for (Entry<ActivityClusterId, ActivityCluster> entry : acg.getActivityClusterMap().entrySet()) {
- rewriteInterActivityCluster(entry.getValue());
+ rewriteInterActivityCluster(entry.getValue(), invertedActivitySuperActivityMap);
}
invertedActivitySuperActivityMap.clear();
}
@@ -59,7 +59,8 @@
* @param ac
* the activity cluster to be rewritten
*/
- private void rewriteInterActivityCluster(ActivityCluster ac) {
+ private void rewriteInterActivityCluster(ActivityCluster ac,
+ Map<IActivity, SuperActivity> invertedActivitySuperActivityMap) {
Map<ActivityId, Set<ActivityId>> blocked2BlockerMap = ac.getBlocked2BlockerMap();
Map<ActivityId, ActivityId> invertedAid2SuperAidMap = new HashMap<ActivityId, ActivityId>();
for (Entry<IActivity, SuperActivity> entry : invertedActivitySuperActivityMap.entrySet()) {
@@ -75,11 +76,22 @@
replacedBlockers = new HashSet<ActivityId>();
for (ActivityId blocker : blockers) {
replacedBlockers.add(invertedAid2SuperAidMap.get(blocker));
- ac.getDependencies().add(
- ac.getActivityClusterGraph().getActivityMap().get(invertedAid2SuperAidMap.get(blocker)));
+ ActivityCluster dependingAc = ac.getActivityClusterGraph().getActivityMap()
+ .get(invertedAid2SuperAidMap.get(blocker));
+ if (!ac.getDependencies().contains(dependingAc)) {
+ ac.getDependencies().add(dependingAc);
+ }
}
}
- replacedBlocked2BlockerMap.put(replacedBlocked, replacedBlockers);
+ if (replacedBlockers != null) {
+ Set<ActivityId> existingBlockers = replacedBlocked2BlockerMap.get(replacedBlocked);
+ if (existingBlockers == null) {
+ replacedBlocked2BlockerMap.put(replacedBlocked, replacedBlockers);
+ } else {
+ existingBlockers.addAll(replacedBlockers);
+ replacedBlocked2BlockerMap.put(replacedBlocked, existingBlockers);
+ }
+ }
}
blocked2BlockerMap.clear();
blocked2BlockerMap.putAll(replacedBlocked2BlockerMap);
@@ -92,7 +104,8 @@
* the activity cluster to be rewritten
* the activity cluster to be rewritten
*/
- private void rewriteIntraActivityCluster(ActivityCluster ac) {
+ private void rewriteIntraActivityCluster(ActivityCluster ac,
+ Map<IActivity, SuperActivity> invertedActivitySuperActivityMap) {
Map<ActivityId, IActivity> activities = ac.getActivityMap();
Map<ActivityId, List<IConnectorDescriptor>> activityInputMap = ac.getActivityInputMap();
Map<ActivityId, List<IConnectorDescriptor>> activityOutputMap = ac.getActivityOutputMap();
@@ -164,6 +177,9 @@
if (existingSuperActivity == null) {
superActivity.addActivity(newActivity);
toBeExpended.add(newActivity);
+ if (newActivity.getClass().getName().contains("SortActivity")) {
+ System.out.println(newActivity);
+ }
invertedActivitySuperActivityMap.put(newActivity, superActivity);
} else {
/**
@@ -258,6 +274,12 @@
consumerSuperActivity.setClusterInputIndex(consumerSAPort, consumerActivity.getActivityId(),
consumerPort);
acg.getConnectorMap().put(connectorId, newActivityCluster);
+
+ /**
+ * increasing the port number for the producer and consumer
+ */
+ superActivityProducerPort.put(producerSuperActivity, ++producerSAPort);
+ superActivityConsumerPort.put(consumerSuperActivity, ++consumerSAPort);
}
}
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/rewriter/runtime/SuperActivity.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/rewriter/runtime/SuperActivity.java
index 3fe7b61..ce5214d 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/rewriter/runtime/SuperActivity.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/rewriter/runtime/SuperActivity.java
@@ -119,4 +119,9 @@
public ActivityId getActivityId() {
return activityId;
}
+
+ @Override
+ public String toString(){
+ return getActivityMap().toString();
+ }
}
\ No newline at end of file
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
index d9a1551..10842bd 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
@@ -106,7 +106,9 @@
*/
if (outputConnectors != null && outputConnectors.size() > 0) {
for (IConnectorDescriptor conn : outputConnectors) {
- childQueue.add(parent.getConnectorActivityMap().get(conn.getConnectorId()));
+ if (conn != null) {
+ childQueue.add(parent.getConnectorActivityMap().get(conn.getConnectorId()));
+ }
}
}