Fix addVertex in the inner join plan

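In the inner-join plans, the output that feeds the B-tree bulk load is
now M-to-N partitioned by vertex id instead of connected one-to-one,
and the scan-based plan also partitions its insert output by vertex id.
VertexDelegate pushes an alive tuple for a vertex that has not halted
when alive propagation is requested. ConnectorPolicyAssignmentPolicy
now takes the JobSpecification so it can inspect a connector's consumer
and pick a send-side materialized blocking policy for B-tree
insert/update/delete consumers; all job generators install this policy.
The sort and single-sort plans additionally pass rdInsert and rdDelete
to the compute function operators, and RunJobTestCase re-enables the
inner-join, sort, and single-sort job generators.

A minimal sketch of how a generator wires in the spec-aware policy
(illustrative only; the exact calls appear in the hunks below, and the
spec is built earlier in each generator):

    // The policy needs the spec so it can look up each connector's
    // consumer operator when choosing a connector policy.
    JobSpecification spec = new JobSpecification();
    // ... create operators and spec.connect(...) the dataflow ...
    spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy(spec));
    spec.setFrameSize(frameSize);
    return spec;
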
git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_release_cleanup@3090 123451ca-8445-de46-9d55-352943316053
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/VertexDelegate.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/VertexDelegate.java
index 1499157..4b153c1 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/VertexDelegate.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/VertexDelegate.java
@@ -114,6 +114,19 @@
             vertex.write(outputInsert);
             insertTb.addFieldEndOffset();
             FrameTupleUtils.flushTuple(appenderInsert, insertTb, insertWriter);
+
+            /**
+             * push alive when necessary
+             */
+            if (pushAlive && !vertex.isHalted()) {
+                alive.reset();
+                DataOutput outputAlive = alive.getDataOutput();
+                vertexId.write(outputAlive);
+                alive.addFieldEndOffset();
+                dummyMessageList.write(outputAlive);
+                alive.addFieldEndOffset();
+                FrameTupleUtils.flushTuple(appenderAlive, alive, aliveWriter);
+            }
         } catch (Exception e) {
             throw new IllegalStateException(e);
         }
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
index 727e7fe..b1730d3 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
@@ -246,28 +246,28 @@
 
         ITuplePartitionComputerFactory partionFactory = new VertexIdPartitionComputerFactory(
                 rdUnnestedMessage.getFields()[0]);
-        ITuplePartitionComputerFactory hashPartitionComputerFactory = new MergePartitionComputerFactory();
+        ITuplePartitionComputerFactory unifyingPartitionComputerFactory = new MergePartitionComputerFactory();
 
         /** connect all operators **/
         spec.connect(new OneToOneConnectorDescriptor(spec), emptyTupleSource, 0, preSuperStep, 0);
         spec.connect(new OneToOneConnectorDescriptor(spec), preSuperStep, 0, scanner, 0);
         spec.connect(new OneToOneConnectorDescriptor(spec), scanner, 0, localSort, 0);
-        spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), scanner, 1,
+        spec.connect(new MToNPartitioningConnectorDescriptor(spec, unifyingPartitionComputerFactory), scanner, 1,
                 terminateWriter, 0);
-        spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), scanner, 2,
+        spec.connect(new MToNPartitioningConnectorDescriptor(spec, unifyingPartitionComputerFactory), scanner, 2,
                 finalAggregator, 0);
 
         /**
          * connect the insert/delete operator
          */
-        spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), scanner, 3, insertOp,
-                0);
+        spec.connect(new MToNPartitioningConnectorDescriptor(spec, partionFactory), scanner, 3, insertOp, 0);
         spec.connect(new OneToOneConnectorDescriptor(spec), insertOp, 0, emptySink3, 0);
-        spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), scanner, 4, deleteOp,
-                0);
+        spec.connect(new MToNPartitioningConnectorDescriptor(spec, unifyingPartitionComputerFactory), scanner, 4,
+                deleteOp, 0);
         spec.connect(new OneToOneConnectorDescriptor(spec), deleteOp, 0, emptySink4, 0);
 
-        spec.connect(new OneToOneConnectorDescriptor(spec), scanner, 5, btreeBulkLoad, 0);
+        spec.connect(new MToNPartitioningConnectorDescriptor(spec, partionFactory), scanner, 5,
+                btreeBulkLoad, 0);
         spec.connect(new OneToOneConnectorDescriptor(spec), localSort, 0, localGby, 0);
         spec.connect(new MToNPartitioningMergingConnectorDescriptor(spec, partionFactory, keyFields, sortCmpFactories),
                 localGby, 0, globalGby, 0);
@@ -282,7 +282,7 @@
         spec.addRoot(emptySink3);
         spec.addRoot(emptySink4);
 
-        spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
+        spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy(spec));
         spec.setFrameSize(frameSize);
         return spec;
     }
@@ -470,7 +470,7 @@
         EmptySinkOperatorDescriptor emptySink4 = new EmptySinkOperatorDescriptor(spec);
         ClusterConfig.setLocationConstraint(spec, emptySink4);
 
-        ITuplePartitionComputerFactory hashPartitionComputerFactory = new MergePartitionComputerFactory();
+        ITuplePartitionComputerFactory unifyingPartitionComputerFactory = new MergePartitionComputerFactory();
         ITuplePartitionComputerFactory partionFactory = new VertexIdPartitionComputerFactory(
                 rdUnnestedMessage.getFields()[0]);
         /** connect all operators **/
@@ -479,20 +479,22 @@
         spec.connect(new OneToOneConnectorDescriptor(spec), materializeRead, 0, setUnion, 0);
         spec.connect(new OneToOneConnectorDescriptor(spec), setUnion, 0, join, 0);
         spec.connect(new OneToOneConnectorDescriptor(spec), join, 0, localSort, 0);
-        spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), join, 1,
+        spec.connect(new MToNPartitioningConnectorDescriptor(spec, unifyingPartitionComputerFactory), join, 1,
                 terminateWriter, 0);
-        spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), join, 2,
+        spec.connect(new MToNPartitioningConnectorDescriptor(spec, unifyingPartitionComputerFactory), join, 2,
                 finalAggregator, 0);
 
         /**
          * connect the insert/delete operator
          */
-        spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), join, 3, insertOp, 0);
+        spec.connect(new MToNPartitioningConnectorDescriptor(spec, unifyingPartitionComputerFactory), join, 3,
+                insertOp, 0);
         spec.connect(new OneToOneConnectorDescriptor(spec), insertOp, 0, emptySink3, 0);
-        spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), join, 4, deleteOp, 0);
+        spec.connect(new MToNPartitioningConnectorDescriptor(spec, unifyingPartitionComputerFactory), join, 4,
+                deleteOp, 0);
         spec.connect(new OneToOneConnectorDescriptor(spec), deleteOp, 0, emptySink4, 0);
 
-        spec.connect(new OneToOneConnectorDescriptor(spec), join, 5, btreeBulkLoad, 0);
+        spec.connect(new MToNPartitioningConnectorDescriptor(spec, partionFactory), join, 5, btreeBulkLoad, 0);
         spec.connect(new OneToOneConnectorDescriptor(spec), localSort, 0, localGby, 0);
         spec.connect(new MToNPartitioningMergingConnectorDescriptor(spec, partionFactory, keyFields, sortCmpFactories),
                 localGby, 0, globalGby, 0);
@@ -506,7 +508,7 @@
         spec.addRoot(emptySink3);
         spec.addRoot(emptySink4);
 
-        spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
+        spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy(spec));
         spec.setFrameSize(frameSize);
         return spec;
     }
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java
index 9bad169..87f2156 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java
@@ -189,7 +189,7 @@
         TerminationStateWriterOperatorDescriptor terminateWriter = new TerminationStateWriterOperatorDescriptor(spec,
                 configurationFactory, jobId);
         PartitionConstraintHelper.addPartitionCountConstraint(spec, terminateWriter, 1);
-        ITuplePartitionComputerFactory hashPartitionComputerFactory = new MergePartitionComputerFactory();
+        ITuplePartitionComputerFactory unifyingPartitionComputerFactory = new MergePartitionComputerFactory();
 
         /**
          * final aggregate write operator
@@ -233,18 +233,18 @@
         spec.connect(new OneToOneConnectorDescriptor(spec), emptyTupleSource, 0, preSuperStep, 0);
         spec.connect(new OneToOneConnectorDescriptor(spec), preSuperStep, 0, scanner, 0);
         spec.connect(new OneToOneConnectorDescriptor(spec), scanner, 0, localSort, 0);
-        spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), scanner, 1,
+        spec.connect(new MToNPartitioningConnectorDescriptor(spec, unifyingPartitionComputerFactory), scanner, 1,
                 terminateWriter, 0);
-        spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), scanner, 2,
+        spec.connect(new MToNPartitioningConnectorDescriptor(spec, unifyingPartitionComputerFactory), scanner, 2,
                 finalAggregator, 0);
 
         /**
          * connect the insert/delete operator
          */
-        spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), scanner, 3, insertOp,
+        spec.connect(new MToNPartitioningConnectorDescriptor(spec, unifyingPartitionComputerFactory), scanner, 3, insertOp,
                 0);
         spec.connect(new OneToOneConnectorDescriptor(spec), insertOp, 0, emptySink3, 0);
-        spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), scanner, 4, deleteOp,
+        spec.connect(new MToNPartitioningConnectorDescriptor(spec, unifyingPartitionComputerFactory), scanner, 4, deleteOp,
                 0);
         spec.connect(new OneToOneConnectorDescriptor(spec), deleteOp, 0, emptySink4, 0);
 
@@ -264,7 +264,7 @@
         spec.addRoot(emptySink3);
         spec.addRoot(emptySink4);
 
-        spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
+        spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy(spec));
         spec.setFrameSize(frameSize);
         return spec;
     }
@@ -432,7 +432,7 @@
         EmptySinkOperatorDescriptor emptySink4 = new EmptySinkOperatorDescriptor(spec);
         ClusterConfig.setLocationConstraint(spec, emptySink4);
 
-        ITuplePartitionComputerFactory hashPartitionComputerFactory = new MergePartitionComputerFactory();
+        ITuplePartitionComputerFactory unifyingPartitionComputerFactory = new MergePartitionComputerFactory();
         ITuplePartitionComputerFactory partionFactory = new VertexIdPartitionComputerFactory(
                 rdUnnestedMessage.getFields()[0]);
 
@@ -441,17 +441,17 @@
         spec.connect(new OneToOneConnectorDescriptor(spec), preSuperStep, 0, materializeRead, 0);
         spec.connect(new OneToOneConnectorDescriptor(spec), materializeRead, 0, join, 0);
         spec.connect(new OneToOneConnectorDescriptor(spec), join, 0, localSort, 0);
-        spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), join, 1,
+        spec.connect(new MToNPartitioningConnectorDescriptor(spec, unifyingPartitionComputerFactory), join, 1,
                 terminateWriter, 0);
-        spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), join, 2,
+        spec.connect(new MToNPartitioningConnectorDescriptor(spec, unifyingPartitionComputerFactory), join, 2,
                 finalAggregator, 0);
 
         /**
          * connect the insert/delete operator
          */
-        spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), join, 3, insertOp, 0);
+        spec.connect(new MToNPartitioningConnectorDescriptor(spec, unifyingPartitionComputerFactory), join, 3, insertOp, 0);
         spec.connect(new OneToOneConnectorDescriptor(spec), insertOp, 0, emptySink3, 0);
-        spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), join, 4, deleteOp, 0);
+        spec.connect(new MToNPartitioningConnectorDescriptor(spec, unifyingPartitionComputerFactory), join, 4, deleteOp, 0);
         spec.connect(new OneToOneConnectorDescriptor(spec), deleteOp, 0, emptySink4, 0);
 
         spec.connect(new OneToOneConnectorDescriptor(spec), localSort, 0, localGby, 0);
@@ -465,7 +465,7 @@
         spec.addRoot(finalAggregator);
         spec.addRoot(emptySink);
 
-        spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
+        spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy(spec));
         spec.setFrameSize(frameSize);
         return spec;
     }
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java
index ffdef10..6ea258e 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java
@@ -50,12 +50,12 @@
 import edu.uci.ics.pregelix.core.jobgen.clusterconfig.ClusterConfig;
 import edu.uci.ics.pregelix.core.runtime.touchpoint.WritableComparingBinaryComparatorFactory;
 import edu.uci.ics.pregelix.core.util.DataflowUtils;
+import edu.uci.ics.pregelix.dataflow.ConnectorPolicyAssignmentPolicy;
 import edu.uci.ics.pregelix.dataflow.EmptySinkOperatorDescriptor;
 import edu.uci.ics.pregelix.dataflow.EmptyTupleSourceOperatorDescriptor;
 import edu.uci.ics.pregelix.dataflow.FinalAggregateOperatorDescriptor;
 import edu.uci.ics.pregelix.dataflow.MaterializingReadOperatorDescriptor;
 import edu.uci.ics.pregelix.dataflow.MaterializingWriteOperatorDescriptor;
-import edu.uci.ics.pregelix.dataflow.NonCombinerConnectorPolicyAssignmentPolicy;
 import edu.uci.ics.pregelix.dataflow.TerminationStateWriterOperatorDescriptor;
 import edu.uci.ics.pregelix.dataflow.base.IConfigurationFactory;
 import edu.uci.ics.pregelix.dataflow.std.BTreeSearchFunctionUpdateOperatorDescriptor;
@@ -135,7 +135,7 @@
                 comparatorFactories, JobGenUtil.getForwardScan(iteration), null, null, true, true,
                 new BTreeDataflowHelperFactory(), inputRdFactory, 5,
                 new StartComputeUpdateFunctionFactory(confFactory), preHookFactory, null, rdUnnestedMessage, rdDummy,
-                rdPartialAggregate);
+                rdPartialAggregate, rdInsert, rdDelete);
         ClusterConfig.setLocationConstraint(spec, scanner);
 
         /**
@@ -252,7 +252,7 @@
         spec.addRoot(emptySink3);
         spec.addRoot(emptySink4);
 
-        spec.setConnectorPolicyAssignmentPolicy(new NonCombinerConnectorPolicyAssignmentPolicy());
+        spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy(spec));
         spec.setFrameSize(frameSize);
         return spec;
     }
@@ -329,7 +329,7 @@
                 leafFrameFactory, typeTraits, comparatorFactories, JobGenUtil.getForwardScan(iteration), keyFields,
                 keyFields, true, true, new BTreeDataflowHelperFactory(), true, nullWriterFactories, inputRdFactory, 5,
                 new ComputeUpdateFunctionFactory(confFactory), preHookFactory, null, rdUnnestedMessage, rdDummy,
-                rdPartialAggregate);
+                rdPartialAggregate, rdInsert, rdDelete);
         ClusterConfig.setLocationConstraint(spec, join);
 
         /**
@@ -441,7 +441,7 @@
         spec.addRoot(finalAggregator);
         spec.addRoot(emptySink);
 
-        spec.setConnectorPolicyAssignmentPolicy(new NonCombinerConnectorPolicyAssignmentPolicy());
+        spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy(spec));
         spec.setFrameSize(frameSize);
         return spec;
     }
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java
index cc12523..c6dabb0 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java
@@ -50,6 +50,7 @@
 import edu.uci.ics.pregelix.core.jobgen.clusterconfig.ClusterConfig;
 import edu.uci.ics.pregelix.core.runtime.touchpoint.WritableComparingBinaryComparatorFactory;
 import edu.uci.ics.pregelix.core.util.DataflowUtils;
+import edu.uci.ics.pregelix.dataflow.ConnectorPolicyAssignmentPolicy;
 import edu.uci.ics.pregelix.dataflow.EmptySinkOperatorDescriptor;
 import edu.uci.ics.pregelix.dataflow.EmptyTupleSourceOperatorDescriptor;
 import edu.uci.ics.pregelix.dataflow.FinalAggregateOperatorDescriptor;
@@ -131,7 +132,7 @@
                 comparatorFactories, JobGenUtil.getForwardScan(iteration), null, null, true, true,
                 new BTreeDataflowHelperFactory(), inputRdFactory, 5,
                 new StartComputeUpdateFunctionFactory(confFactory), preHookFactory, null, rdUnnestedMessage, rdDummy,
-                rdPartialAggregate);
+                rdPartialAggregate, rdInsert, rdDelete);
         ClusterConfig.setLocationConstraint(spec, scanner);
 
         /**
@@ -266,6 +267,7 @@
         spec.addRoot(emptySink4);
 
         spec.setFrameSize(frameSize);
+        spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy(spec));
         return spec;
     }
 
@@ -341,7 +343,7 @@
                 leafFrameFactory, typeTraits, comparatorFactories, JobGenUtil.getForwardScan(iteration), keyFields,
                 keyFields, true, true, new BTreeDataflowHelperFactory(), true, nullWriterFactories, inputRdFactory, 5,
                 new ComputeUpdateFunctionFactory(confFactory), preHookFactory, null, rdUnnestedMessage, rdDummy,
-                rdPartialAggregate);
+                rdPartialAggregate, rdInsert, rdDelete);
         ClusterConfig.setLocationConstraint(spec, join);
 
         /**
@@ -474,6 +476,7 @@
         spec.addRoot(emptySink4);
 
         spec.setFrameSize(frameSize);
+        spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy(spec));
         return spec;
     }
 
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/ConnectorPolicyAssignmentPolicy.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/ConnectorPolicyAssignmentPolicy.java
index d29afca..ae47ed8 100644
--- a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/ConnectorPolicyAssignmentPolicy.java
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/ConnectorPolicyAssignmentPolicy.java
@@ -15,25 +15,44 @@
 
 package edu.uci.ics.pregelix.dataflow;
 
+import org.apache.commons.lang3.tuple.Pair;
+
 import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
 import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicy;
 import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicyAssignmentPolicy;
 import edu.uci.ics.hyracks.api.dataflow.connectors.PipeliningConnectorPolicy;
+import edu.uci.ics.hyracks.api.dataflow.connectors.SendSideMaterializedBlockingConnectorPolicy;
 import edu.uci.ics.hyracks.api.dataflow.connectors.SendSideMaterializedPipeliningConnectorPolicy;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
 import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningMergingConnectorDescriptor;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexInsertUpdateDeleteOperatorDescriptor;
 
 public class ConnectorPolicyAssignmentPolicy implements IConnectorPolicyAssignmentPolicy {
     private static final long serialVersionUID = 1L;
-    private IConnectorPolicy senderSideMaterializePolicy = new SendSideMaterializedPipeliningConnectorPolicy();
+    private IConnectorPolicy senderSideMatPipPolicy = new SendSideMaterializedPipeliningConnectorPolicy();
+    private IConnectorPolicy senderSideMatBlkPolicy = new SendSideMaterializedBlockingConnectorPolicy();
     private IConnectorPolicy pipeliningPolicy = new PipeliningConnectorPolicy();
+    private JobSpecification spec;
+
+    public ConnectorPolicyAssignmentPolicy(JobSpecification spec) {
+        this.spec = spec;
+    }
 
     @Override
     public IConnectorPolicy getConnectorPolicyAssignment(IConnectorDescriptor c, int nProducers, int nConsumers,
             int[] fanouts) {
         if (c instanceof MToNPartitioningMergingConnectorDescriptor) {
-            return senderSideMaterializePolicy;
+            return senderSideMatPipPolicy;
         } else {
-            return pipeliningPolicy;
+            Pair<Pair<IOperatorDescriptor, Integer>, Pair<IOperatorDescriptor, Integer>> endPoints = spec
+                    .getConnectorOperatorMap().get(c.getConnectorId());
+            IOperatorDescriptor consumer = endPoints.getRight().getLeft();
+            if (consumer instanceof TreeIndexInsertUpdateDeleteOperatorDescriptor) {
+                return senderSideMatBlkPolicy;
+            } else {
+                return pipeliningPolicy;
+            }
         }
     }
 }
diff --git a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobrun/RunJobTestCase.java b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobrun/RunJobTestCase.java
index 219de10..5a556fa 100644
--- a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobrun/RunJobTestCase.java
+++ b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobrun/RunJobTestCase.java
@@ -28,7 +28,10 @@
 import edu.uci.ics.hyracks.api.job.JobSpecification;
 import edu.uci.ics.pregelix.api.job.PregelixJob;
 import edu.uci.ics.pregelix.core.jobgen.JobGen;
+import edu.uci.ics.pregelix.core.jobgen.JobGenInnerJoin;
 import edu.uci.ics.pregelix.core.jobgen.JobGenOuterJoin;
+import edu.uci.ics.pregelix.core.jobgen.JobGenOuterJoinSingleSort;
+import edu.uci.ics.pregelix.core.jobgen.JobGenOuterJoinSort;
 import edu.uci.ics.pregelix.core.util.PregelixHyracksIntegrationUtil;
 import edu.uci.ics.pregelix.dataflow.util.IterationUtils;
 import edu.uci.ics.pregelix.example.util.TestUtils;
@@ -72,14 +75,14 @@
         job.setJobName(jobName);
         this.resultFileName = resultFile;
         this.expectedFileName = expectedFile;
-        giraphJobGens = new JobGen[1];
+        giraphJobGens = new JobGen[4];
         giraphJobGens[0] = new JobGenOuterJoin(job);
-        //        waitawhile();
-        //        giraphJobGens[1] = new JobGenInnerJoin(job);
-        //        waitawhile();
-        //        giraphJobGens[2] = new JobGenOuterJoinSort(job);
-        //        waitawhile();
-        //        giraphJobGens[3] = new JobGenOuterJoinSingleSort(job);
+        waitawhile();
+        giraphJobGens[1] = new JobGenInnerJoin(job);
+        waitawhile();
+        giraphJobGens[2] = new JobGenOuterJoinSort(job);
+        waitawhile();
+        giraphJobGens[3] = new JobGenOuterJoinSingleSort(job);
     }
 
     private void waitawhile() throws InterruptedException {