Fix for issue 367. Project ops between the commit and the first insert op will push the frames without waiting until they get full.
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/ProjectOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/ProjectOperator.java
index 459e5dd..d2593cb 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/ProjectOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/ProjectOperator.java
@@ -30,14 +30,17 @@
public class ProjectOperator extends AbstractLogicalOperator {
private final List<LogicalVariable> variables;
+ private boolean flushFramesRapidly;
public ProjectOperator(List<LogicalVariable> variables) {
this.variables = variables;
+ this.flushFramesRapidly = false;
}
public ProjectOperator(LogicalVariable v) {
this.variables = new ArrayList<LogicalVariable>(1);
this.getVariables().add(v);
+ this.flushFramesRapidly = false;
}
@Override
@@ -75,6 +78,14 @@
return variables;
}
+ public boolean isRapidFrameFlush() {
+ return flushFramesRapidly;
+ }
+
+ public void setRapidFrameFlush(boolean flushFramesRapidly) {
+ this.flushFramesRapidly = flushFramesRapidly;
+ }
+
@Override
public IVariableTypeEnvironment computeOutputTypeEnvironment(ITypingContext ctx) throws AlgebricksException {
return createPropagatingAllInputsTypeEnvironment(ctx);
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/StreamProjectPOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/StreamProjectPOperator.java
index 54c0505..90c58a0 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/StreamProjectPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/StreamProjectPOperator.java
@@ -61,8 +61,10 @@
}
projectionList[i++] = pos;
}
- StreamProjectRuntimeFactory runtime = new StreamProjectRuntimeFactory(projectionList);
- RecordDescriptor recDesc = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), propagatedSchema, context);
+ StreamProjectRuntimeFactory runtime = new StreamProjectRuntimeFactory(projectionList,
+ project.isRapidFrameFlush());
+ RecordDescriptor recDesc = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), propagatedSchema,
+ context);
builder.contributeMicroOperator(project, runtime, recDesc);
ILogicalOperator src = project.getInputs().get(0).getValue();
builder.contributeGraphEdge(src, 0, project, 0);
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java
index cd9931b..78d2bed 100644
--- a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java
@@ -64,6 +64,23 @@
}
}
+ protected void appendProjectionToFrame(int tIndex, int[] projectionList, boolean flushFrame)
+ throws HyracksDataException {
+ if (!appender.appendProjection(tAccess, tIndex, projectionList)) {
+ FrameUtils.flushFrame(frame, writer);
+ appender.reset(frame, true);
+ if (!appender.appendProjection(tAccess, tIndex, projectionList)) {
+ throw new IllegalStateException(
+ "Could not write frame (AbstractOneInputOneOutputOneFramePushRuntime.appendProjectionToFrame).");
+ }
+ return;
+ }
+ if (flushFrame) {
+ FrameUtils.flushFrame(frame, writer);
+ appender.reset(frame, true);
+ }
+ }
+
protected void appendTupleToFrame(int tIndex) throws HyracksDataException {
if (!appender.append(tAccess, tIndex)) {
FrameUtils.flushFrame(frame, writer);
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/StreamProjectRuntimeFactory.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/StreamProjectRuntimeFactory.java
index 455ad8e..da881ec 100644
--- a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/StreamProjectRuntimeFactory.java
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/StreamProjectRuntimeFactory.java
@@ -26,9 +26,16 @@
public class StreamProjectRuntimeFactory extends AbstractOneInputOneOutputRuntimeFactory {
private static final long serialVersionUID = 1L;
+ private final boolean flushFramesRapidly;
+
+ public StreamProjectRuntimeFactory(int[] projectionList, boolean flushFramesRapidly) {
+ super(projectionList);
+ this.flushFramesRapidly = flushFramesRapidly;
+ }
public StreamProjectRuntimeFactory(int[] projectionList) {
super(projectionList);
+ this.flushFramesRapidly = false;
}
@Override
@@ -57,8 +64,18 @@
public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
tAccess.reset(buffer);
int nTuple = tAccess.getTupleCount();
- for (int t = 0; t < nTuple; t++) {
- appendProjectionToFrame(t, projectionList);
+ if (flushFramesRapidly) {
+ // Whenever all the tuples in the incoming frame have been consumed, the project operator
+ // will push its frame to the next operator; i.e., it won't wait until the frame gets full.
+ int t = 0;
+ for (; t < nTuple - 1; t++) {
+ appendProjectionToFrame(t, projectionList);
+ }
+ appendProjectionToFrame(t, projectionList, true);
+ } else {
+ for (int t = 0; t < nTuple; t++) {
+ appendProjectionToFrame(t, projectionList);
+ }
}
}