Merge branch 'master' into yingyi/fullstack_fix
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/AssignPOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/AssignPOperator.java
index aea04b2..55da00e 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/AssignPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/AssignPOperator.java
@@ -38,6 +38,8 @@
public class AssignPOperator extends AbstractPhysicalOperator {
+ private boolean flushFramesRapidly;
+
@Override
public PhysicalOperatorTag getOperatorTag() {
return PhysicalOperatorTag.ASSIGN;
@@ -76,7 +78,8 @@
// TODO push projections into the operator
int[] projectionList = JobGenHelper.projectAllVariables(opSchema);
- AssignRuntimeFactory runtime = new AssignRuntimeFactory(outColumns, evalFactories, projectionList);
+ AssignRuntimeFactory runtime = new AssignRuntimeFactory(outColumns, evalFactories, projectionList,
+ flushFramesRapidly);
// contribute one Asterix framewriter
RecordDescriptor recDesc = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), opSchema, context);
@@ -92,4 +95,8 @@
return true;
}
+ public void setRapidFrameFlush(boolean flushFramesRapidly) {
+ this.flushFramesRapidly = flushFramesRapidly;
+ }
+
}
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java
index 082e98a..6c1dd5e 100644
--- a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java
@@ -43,6 +43,10 @@
}
protected void appendToFrameFromTupleBuilder(ArrayTupleBuilder tb) throws HyracksDataException {
+ appendToFrameFromTupleBuilder(tb, false);
+ }
+
+ protected void appendToFrameFromTupleBuilder(ArrayTupleBuilder tb, boolean flushFrame) throws HyracksDataException {
if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
FrameUtils.flushFrame(frame, writer);
appender.reset(frame, true);
@@ -51,6 +55,10 @@
"Could not write frame (AbstractOneInputOneOutputOneFramePushRuntime.appendToFrameFromTupleBuilder).");
}
}
+ if (flushFrame) {
+ FrameUtils.flushFrame(frame, writer);
+ appender.reset(frame, true);
+ }
}
protected void appendProjectionToFrame(int tIndex, int[] projectionList) throws HyracksDataException {
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/AssignRuntimeFactory.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/AssignRuntimeFactory.java
index fa99c15..fb889ea 100644
--- a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/AssignRuntimeFactory.java
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/AssignRuntimeFactory.java
@@ -36,6 +36,7 @@
private int[] outColumns;
private IScalarEvaluatorFactory[] evalFactories;
+ private final boolean flushFramesRapidly;
/**
* @param outColumns
@@ -46,9 +47,15 @@
*/
public AssignRuntimeFactory(int[] outColumns, IScalarEvaluatorFactory[] evalFactories, int[] projectionList) {
+ this(outColumns, evalFactories, projectionList, false);
+ }
+
+ public AssignRuntimeFactory(int[] outColumns, IScalarEvaluatorFactory[] evalFactories, int[] projectionList,
+ boolean flushFramesRapidly) {
super(projectionList);
this.outColumns = outColumns;
this.evalFactories = evalFactories;
+ this.flushFramesRapidly = flushFramesRapidly;
}
@Override
@@ -107,9 +114,22 @@
public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
tAccess.reset(buffer);
int nTuple = tAccess.getTupleCount();
- for (int t = 0; t < nTuple; t++) {
- tRef.reset(tAccess, t);
- produceTuple(tupleBuilder, tAccess, t, tRef);
+ if (nTuple < 1) {
+ // Empty frame: there is no last tuple to produce, so emit nothing (as the replaced loop did).
+ return;
+ }
+ int t = 0;
+ for (; t < nTuple - 1; t++) {
+ tRef.reset(tAccess, t);
+ produceTuple(tupleBuilder, tAccess, t, tRef);
+ appendToFrameFromTupleBuilder(tupleBuilder);
+ }
+ tRef.reset(tAccess, t);
+ produceTuple(tupleBuilder, tAccess, t, tRef);
+ if (flushFramesRapidly) {
+ // Incoming frame fully consumed: push the output frame downstream now instead of waiting for it to fill.
+ appendToFrameFromTupleBuilder(tupleBuilder, true);
+ } else {
appendToFrameFromTupleBuilder(tupleBuilder);
}
}
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/StreamProjectRuntimeFactory.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/StreamProjectRuntimeFactory.java
index 7f10948..3e87f31 100644
--- a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/StreamProjectRuntimeFactory.java
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/StreamProjectRuntimeFactory.java
@@ -34,8 +34,7 @@
}
public StreamProjectRuntimeFactory(int[] projectionList) {
- super(projectionList);
- this.flushFramesRapidly = false;
+ this(projectionList, false);
}
@Override
@@ -66,8 +65,10 @@
int nTuple = tAccess.getTupleCount();
int t = 0;
- for (; t < nTuple - 1; t++) {
- appendProjectionToFrame(t, projectionList);
+ if (nTuple > 1) {
+ for (; t < nTuple - 1; t++) {
+ appendProjectionToFrame(t, projectionList);
+ }
}
if (flushFramesRapidly) {
// Whenever all the tuples in the incoming frame have been consumed, the project operator
@@ -76,10 +77,8 @@
} else {
appendProjectionToFrame(t, projectionList);
}
-
}
};
}
-
}