1. make limit operator work in nested plan
2. avoid send-side materialize but use receive-side materialize to reduce cc/nc communications.
Change-Id: I1a90c70be0514fcc0b286afa456664618d68910f
Reviewed-on: http://fulliautomatix.ics.uci.edu:8443/80
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Pouria Pirzadeh <pouria.pirzadeh@gmail.com>
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/StreamLimitRuntimeFactory.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/StreamLimitRuntimeFactory.java
index 0b3edc7..7d128aa 100644
--- a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/StreamLimitRuntimeFactory.java
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/StreamLimitRuntimeFactory.java
@@ -3,9 +3,9 @@
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* you may obtain a copy of the License from
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -32,9 +32,9 @@
private static final long serialVersionUID = 1L;
- private IScalarEvaluatorFactory maxObjectsEvalFactory;
- private IScalarEvaluatorFactory offsetEvalFactory;
- private IBinaryIntegerInspectorFactory binaryIntegerInspectorFactory;
+ private final IScalarEvaluatorFactory maxObjectsEvalFactory;
+ private final IScalarEvaluatorFactory offsetEvalFactory;
+ private final IBinaryIntegerInspectorFactory binaryIntegerInspectorFactory;
public StreamLimitRuntimeFactory(IScalarEvaluatorFactory maxObjectsEvalFactory,
IScalarEvaluatorFactory offsetEvalFactory, int[] projectionList,
@@ -59,7 +59,7 @@
public AbstractOneInputOneOutputOneFramePushRuntime createOneOutputPushRuntime(final IHyracksTaskContext ctx) {
final IBinaryIntegerInspector bii = binaryIntegerInspectorFactory.createBinaryIntegerInspector(ctx);
return new AbstractOneInputOneOutputOneFramePushRuntime() {
- private IPointable p = VoidPointable.FACTORY.createPointable();
+ private final IPointable p = VoidPointable.FACTORY.createPointable();
private IScalarEvaluator evalMaxObjects;
private IScalarEvaluator evalOffset = null;
private int toWrite = 0; // how many tuples still to write
@@ -128,6 +128,10 @@
@Override
public void close() throws HyracksDataException {
+ toWrite = 0; // how many tuples still to write
+ toSkip = 0; // how many tuples still to skip
+ firstTuple = true;
+ afterLastTuple = false;
// if (!afterLastTuple) {
super.close();
// }
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/ConnectorPolicyAssignmentPolicy.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/ConnectorPolicyAssignmentPolicy.java
index ca56bb6..950edce 100644
--- a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/ConnectorPolicyAssignmentPolicy.java
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/ConnectorPolicyAssignmentPolicy.java
@@ -3,9 +3,9 @@
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* you may obtain a copy of the License from
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -22,18 +22,17 @@
import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicy;
import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicyAssignmentPolicy;
import edu.uci.ics.hyracks.api.dataflow.connectors.PipeliningConnectorPolicy;
-import edu.uci.ics.hyracks.api.dataflow.connectors.SendSideMaterializedBlockingConnectorPolicy;
import edu.uci.ics.hyracks.api.dataflow.connectors.SendSideMaterializedPipeliningConnectorPolicy;
+import edu.uci.ics.hyracks.api.dataflow.connectors.SendSidePipeliningReceiveSideMaterializedBlockingConnectorPolicy;
import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexInsertUpdateDeleteOperatorDescriptor;
public class ConnectorPolicyAssignmentPolicy implements IConnectorPolicyAssignmentPolicy {
private static final long serialVersionUID = 1L;
- private IConnectorPolicy senderSideMatPipPolicy = new SendSideMaterializedPipeliningConnectorPolicy();
- //private IConnectorPolicy senderSidePipeliningReceiverSideMatBlkPolicy = new SendSidePipeliningReceiveSideMaterializedBlockingConnectorPolicy();
- private IConnectorPolicy senderSidePipeliningReceiverSideMatBlkPolicy = new SendSideMaterializedBlockingConnectorPolicy();
- private IConnectorPolicy pipeliningPolicy = new PipeliningConnectorPolicy();
- private JobSpecification spec;
+ private final IConnectorPolicy senderSideMatPipPolicy = new SendSideMaterializedPipeliningConnectorPolicy();
+ private final IConnectorPolicy senderSidePipeliningReceiverSideMatBlkPolicy = new SendSidePipeliningReceiveSideMaterializedBlockingConnectorPolicy();
+ private final IConnectorPolicy pipeliningPolicy = new PipeliningConnectorPolicy();
+ private final JobSpecification spec;
public ConnectorPolicyAssignmentPolicy(JobSpecification spec) {
this.spec = spec;