1. make limit operator work in nested plan
2. avoid send-side materialize but use receive-side materialize to reduce cc/nc communications.

Change-Id: I1a90c70be0514fcc0b286afa456664618d68910f
Reviewed-on: http://fulliautomatix.ics.uci.edu:8443/80
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Pouria Pirzadeh <pouria.pirzadeh@gmail.com>
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/StreamLimitRuntimeFactory.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/StreamLimitRuntimeFactory.java
index 0b3edc7..7d128aa 100644
--- a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/StreamLimitRuntimeFactory.java
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/StreamLimitRuntimeFactory.java
@@ -3,9 +3,9 @@
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * you may obtain a copy of the License from
- * 
+ *
  *     http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -32,9 +32,9 @@
 
     private static final long serialVersionUID = 1L;
 
-    private IScalarEvaluatorFactory maxObjectsEvalFactory;
-    private IScalarEvaluatorFactory offsetEvalFactory;
-    private IBinaryIntegerInspectorFactory binaryIntegerInspectorFactory;
+    private final IScalarEvaluatorFactory maxObjectsEvalFactory;
+    private final IScalarEvaluatorFactory offsetEvalFactory;
+    private final IBinaryIntegerInspectorFactory binaryIntegerInspectorFactory;
 
     public StreamLimitRuntimeFactory(IScalarEvaluatorFactory maxObjectsEvalFactory,
             IScalarEvaluatorFactory offsetEvalFactory, int[] projectionList,
@@ -59,7 +59,7 @@
     public AbstractOneInputOneOutputOneFramePushRuntime createOneOutputPushRuntime(final IHyracksTaskContext ctx) {
         final IBinaryIntegerInspector bii = binaryIntegerInspectorFactory.createBinaryIntegerInspector(ctx);
         return new AbstractOneInputOneOutputOneFramePushRuntime() {
-            private IPointable p = VoidPointable.FACTORY.createPointable();
+            private final IPointable p = VoidPointable.FACTORY.createPointable();
             private IScalarEvaluator evalMaxObjects;
             private IScalarEvaluator evalOffset = null;
             private int toWrite = 0; // how many tuples still to write
@@ -128,6 +128,10 @@
 
             @Override
             public void close() throws HyracksDataException {
+                toWrite = 0; // how many tuples still to write
+                toSkip = 0; // how many tuples still to skip
+                firstTuple = true;
+                afterLastTuple = false;
                 // if (!afterLastTuple) {
                 super.close();
                 // }
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/ConnectorPolicyAssignmentPolicy.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/ConnectorPolicyAssignmentPolicy.java
index ca56bb6..950edce 100644
--- a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/ConnectorPolicyAssignmentPolicy.java
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/ConnectorPolicyAssignmentPolicy.java
@@ -3,9 +3,9 @@
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * you may obtain a copy of the License from
- * 
+ *
  *     http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -22,18 +22,17 @@
 import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicy;
 import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicyAssignmentPolicy;
 import edu.uci.ics.hyracks.api.dataflow.connectors.PipeliningConnectorPolicy;
-import edu.uci.ics.hyracks.api.dataflow.connectors.SendSideMaterializedBlockingConnectorPolicy;
 import edu.uci.ics.hyracks.api.dataflow.connectors.SendSideMaterializedPipeliningConnectorPolicy;
+import edu.uci.ics.hyracks.api.dataflow.connectors.SendSidePipeliningReceiveSideMaterializedBlockingConnectorPolicy;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexInsertUpdateDeleteOperatorDescriptor;
 
 public class ConnectorPolicyAssignmentPolicy implements IConnectorPolicyAssignmentPolicy {
     private static final long serialVersionUID = 1L;
-    private IConnectorPolicy senderSideMatPipPolicy = new SendSideMaterializedPipeliningConnectorPolicy();
-    //private IConnectorPolicy senderSidePipeliningReceiverSideMatBlkPolicy = new SendSidePipeliningReceiveSideMaterializedBlockingConnectorPolicy();
-    private IConnectorPolicy senderSidePipeliningReceiverSideMatBlkPolicy = new SendSideMaterializedBlockingConnectorPolicy();
-    private IConnectorPolicy pipeliningPolicy = new PipeliningConnectorPolicy();
-    private JobSpecification spec;
+    private final IConnectorPolicy senderSideMatPipPolicy = new SendSideMaterializedPipeliningConnectorPolicy();
+    private final IConnectorPolicy senderSidePipeliningReceiverSideMatBlkPolicy = new SendSidePipeliningReceiveSideMaterializedBlockingConnectorPolicy();
+    private final IConnectorPolicy pipeliningPolicy = new PipeliningConnectorPolicy();
+    private final JobSpecification spec;
 
     public ConnectorPolicyAssignmentPolicy(JobSpecification spec) {
         this.spec = spec;