Addressed code review comments.
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/ProjectOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/ProjectOperator.java
index d2593cb..459e5dd 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/ProjectOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/ProjectOperator.java
@@ -30,17 +30,14 @@
public class ProjectOperator extends AbstractLogicalOperator {
private final List<LogicalVariable> variables;
- private boolean flushFramesRapidly;
public ProjectOperator(List<LogicalVariable> variables) {
this.variables = variables;
- this.flushFramesRapidly = false;
}
public ProjectOperator(LogicalVariable v) {
this.variables = new ArrayList<LogicalVariable>(1);
this.getVariables().add(v);
- this.flushFramesRapidly = false;
}
@Override
@@ -78,14 +75,6 @@
return variables;
}
- public boolean isRapidFrameFlush() {
- return flushFramesRapidly;
- }
-
- public void setRapidFrameFlush(boolean flushFramesRapidly) {
- this.flushFramesRapidly = flushFramesRapidly;
- }
-
@Override
public IVariableTypeEnvironment computeOutputTypeEnvironment(ITypingContext ctx) throws AlgebricksException {
return createPropagatingAllInputsTypeEnvironment(ctx);
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/StreamProjectPOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/StreamProjectPOperator.java
index 90c58a0..7861dc0 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/StreamProjectPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/StreamProjectPOperator.java
@@ -31,6 +31,8 @@
public class StreamProjectPOperator extends AbstractPropagatePropertiesForUsedVariablesPOperator {
+ private boolean flushFramesRapidly;
+
@Override
public PhysicalOperatorTag getOperatorTag() {
return PhysicalOperatorTag.STREAM_PROJECT;
@@ -61,8 +63,7 @@
}
projectionList[i++] = pos;
}
- StreamProjectRuntimeFactory runtime = new StreamProjectRuntimeFactory(projectionList,
- project.isRapidFrameFlush());
+ StreamProjectRuntimeFactory runtime = new StreamProjectRuntimeFactory(projectionList, flushFramesRapidly);
RecordDescriptor recDesc = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), propagatedSchema,
context);
builder.contributeMicroOperator(project, runtime, recDesc);
@@ -76,4 +77,8 @@
computeDeliveredPropertiesForUsedVariables(p, p.getVariables());
}
+ public void setRapidFrameFlush(boolean flushFramesRapidly) {
+ this.flushFramesRapidly = flushFramesRapidly;
+ }
+
}
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java
index 78d2bed..79e964b 100644
--- a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java
@@ -54,14 +54,7 @@
}
protected void appendProjectionToFrame(int tIndex, int[] projectionList) throws HyracksDataException {
- if (!appender.appendProjection(tAccess, tIndex, projectionList)) {
- FrameUtils.flushFrame(frame, writer);
- appender.reset(frame, true);
- if (!appender.appendProjection(tAccess, tIndex, projectionList)) {
- throw new IllegalStateException(
- "Could not write frame (AbstractOneInputOneOutputOneFramePushRuntime.appendProjectionToFrame).");
- }
- }
+ appendProjectionToFrame(tIndex, projectionList, false);
}
protected void appendProjectionToFrame(int tIndex, int[] projectionList, boolean flushFrame)
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/StreamProjectRuntimeFactory.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/StreamProjectRuntimeFactory.java
index da881ec..0e6fc15 100644
--- a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/StreamProjectRuntimeFactory.java
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/StreamProjectRuntimeFactory.java
@@ -64,18 +64,17 @@
public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
tAccess.reset(buffer);
int nTuple = tAccess.getTupleCount();
+
+ int t = 0;
+ for (; t < nTuple - 1; t++) {
+ appendProjectionToFrame(t, projectionList);
+ }
if (flushFramesRapidly) {
// Whenever all the tuples in the incoming frame have been consumed, the project operator
// will push its frame to the next operator; i.e., it won't wait until the frame gets full.
- int t = 0;
- for (; t < nTuple - 1; t++) {
- appendProjectionToFrame(t, projectionList);
- }
appendProjectionToFrame(t, projectionList, true);
} else {
- for (int t = 0; t < nTuple; t++) {
- appendProjectionToFrame(t, projectionList);
- }
+ appendProjectionToFrame(t, projectionList);
}
}