make all fullstack tests pass

git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_job_rewriter@2968 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/rewriter/ActivityClusterGraphRewriter.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/rewriter/ActivityClusterGraphRewriter.java
index cb2acbc..4d6abcf 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/rewriter/ActivityClusterGraphRewriter.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/rewriter/ActivityClusterGraphRewriter.java
@@ -39,16 +39,16 @@
 
 public class ActivityClusterGraphRewriter implements Serializable {
     private static final long serialVersionUID = 1L;
-    private Map<IActivity, SuperActivity> invertedActivitySuperActivityMap = new HashMap<IActivity, SuperActivity>();
 
     public void rewrite(ActivityClusterGraph acg) {
         acg.getActivityMap().clear();
         acg.getConnectorMap().clear();
+        Map<IActivity, SuperActivity> invertedActivitySuperActivityMap = new HashMap<IActivity, SuperActivity>();
         for (Entry<ActivityClusterId, ActivityCluster> entry : acg.getActivityClusterMap().entrySet()) {
-            rewriteIntraActivityCluster(entry.getValue());
+            rewriteIntraActivityCluster(entry.getValue(), invertedActivitySuperActivityMap);
         }
         for (Entry<ActivityClusterId, ActivityCluster> entry : acg.getActivityClusterMap().entrySet()) {
-            rewriteInterActivityCluster(entry.getValue());
+            rewriteInterActivityCluster(entry.getValue(), invertedActivitySuperActivityMap);
         }
         invertedActivitySuperActivityMap.clear();
     }
@@ -59,7 +59,8 @@
      * @param ac
      *            the activity cluster to be rewritten
      */
-    private void rewriteInterActivityCluster(ActivityCluster ac) {
+    private void rewriteInterActivityCluster(ActivityCluster ac,
+            Map<IActivity, SuperActivity> invertedActivitySuperActivityMap) {
         Map<ActivityId, Set<ActivityId>> blocked2BlockerMap = ac.getBlocked2BlockerMap();
         Map<ActivityId, ActivityId> invertedAid2SuperAidMap = new HashMap<ActivityId, ActivityId>();
         for (Entry<IActivity, SuperActivity> entry : invertedActivitySuperActivityMap.entrySet()) {
@@ -75,11 +76,22 @@
                 replacedBlockers = new HashSet<ActivityId>();
                 for (ActivityId blocker : blockers) {
                     replacedBlockers.add(invertedAid2SuperAidMap.get(blocker));
-                    ac.getDependencies().add(
-                            ac.getActivityClusterGraph().getActivityMap().get(invertedAid2SuperAidMap.get(blocker)));
+                    ActivityCluster dependingAc = ac.getActivityClusterGraph().getActivityMap()
+                            .get(invertedAid2SuperAidMap.get(blocker));
+                    if (!ac.getDependencies().contains(dependingAc)) {
+                        ac.getDependencies().add(dependingAc);
+                    }
                 }
             }
-            replacedBlocked2BlockerMap.put(replacedBlocked, replacedBlockers);
+            if (replacedBlockers != null) {
+                Set<ActivityId> existingBlockers = replacedBlocked2BlockerMap.get(replacedBlocked);
+                if (existingBlockers == null) {
+                    replacedBlocked2BlockerMap.put(replacedBlocked, replacedBlockers);
+                } else {
+                    existingBlockers.addAll(replacedBlockers);
+                    replacedBlocked2BlockerMap.put(replacedBlocked, existingBlockers);
+                }
+            }
         }
         blocked2BlockerMap.clear();
         blocked2BlockerMap.putAll(replacedBlocked2BlockerMap);
@@ -92,7 +104,8 @@
      *            the activity cluster to be rewritten
      *            the activity cluster to be rewritten
      */
-    private void rewriteIntraActivityCluster(ActivityCluster ac) {
+    private void rewriteIntraActivityCluster(ActivityCluster ac,
+            Map<IActivity, SuperActivity> invertedActivitySuperActivityMap) {
         Map<ActivityId, IActivity> activities = ac.getActivityMap();
         Map<ActivityId, List<IConnectorDescriptor>> activityInputMap = ac.getActivityInputMap();
         Map<ActivityId, List<IConnectorDescriptor>> activityOutputMap = ac.getActivityOutputMap();
@@ -164,6 +177,9 @@
                             if (existingSuperActivity == null) {
                                 superActivity.addActivity(newActivity);
                                 toBeExpended.add(newActivity);
+                                if (newActivity.getClass().getName().contains("SortActivity")) {
+                                    System.out.println(newActivity);
+                                }
                                 invertedActivitySuperActivityMap.put(newActivity, superActivity);
                             } else {
                                 /**
@@ -258,6 +274,12 @@
                 consumerSuperActivity.setClusterInputIndex(consumerSAPort, consumerActivity.getActivityId(),
                         consumerPort);
                 acg.getConnectorMap().put(connectorId, newActivityCluster);
+
+                /**
+                 * increasing the port number for the producer and consumer
+                 */
+                superActivityProducerPort.put(producerSuperActivity, ++producerSAPort);
+                superActivityConsumerPort.put(consumerSuperActivity, ++consumerSAPort);
             }
         }
 
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/rewriter/runtime/SuperActivity.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/rewriter/runtime/SuperActivity.java
index 3fe7b61..ce5214d 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/rewriter/runtime/SuperActivity.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/rewriter/runtime/SuperActivity.java
@@ -119,4 +119,9 @@
     public ActivityId getActivityId() {
         return activityId;
     }
+    
+    @Override
+    public String toString(){
+        return getActivityMap().toString();
+    }
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
index d9a1551..10842bd 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
@@ -106,7 +106,9 @@
              */
             if (outputConnectors != null && outputConnectors.size() > 0) {
                 for (IConnectorDescriptor conn : outputConnectors) {
-                    childQueue.add(parent.getConnectorActivityMap().get(conn.getConnectorId()));
+                    if (conn != null) {
+                        childQueue.add(parent.getConnectorActivityMap().get(conn.getConnectorId()));
+                    }
                 }
             }