[NO ISSUE][*DB] Refactoring AssignRuntime Factory
Change-Id: Ieb583580f7bc8a40a15839ce15c492aa0bfb410c
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18249
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Peeyush Gupta <peeyush.gupta@couchbase.com>
Tested-by: Murtadha Hubail <mhubail@apache.org>
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/AssignRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/AssignRuntimeFactory.java
index 5343069..300fac6 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/AssignRuntimeFactory.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/AssignRuntimeFactory.java
@@ -39,10 +39,9 @@
public class AssignRuntimeFactory extends AbstractOneInputOneOutputRuntimeFactory {
private static final long serialVersionUID = 1L;
-
- private int[] outColumns;
- private IScalarEvaluatorFactory[] evalFactories;
- private final boolean flushFramesRapidly;
+ protected int[] outColumns;
+ protected IScalarEvaluatorFactory[] evalFactories;
+ protected final boolean flushFramesRapidly;
/**
* @param outColumns
@@ -64,6 +63,14 @@
this.flushFramesRapidly = flushFramesRapidly;
}
+ public int[] getOutColumns() {
+ return outColumns;
+ }
+
+ public IScalarEvaluatorFactory[] getEvalFactories() {
+ return evalFactories;
+ }
+
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
@@ -88,92 +95,106 @@
@Override
public AbstractOneInputOneOutputOneFramePushRuntime createOneOutputPushRuntime(final IHyracksTaskContext ctx)
throws HyracksDataException {
- IEvaluatorContext evalCtx = new EvaluatorContext(ctx);
- final int[] projectionToOutColumns = new int[projectionList.length];
- for (int j = 0; j < projectionList.length; j++) {
- projectionToOutColumns[j] = Arrays.binarySearch(outColumns, projectionList[j]);
+ return new AssignRuntime(ctx);
+ }
+
+ public int[] getProjectionList() {
+ return projectionList;
+ }
+
+ public class AssignRuntime extends AbstractOneInputOneOutputOneFramePushRuntime {
+ private IPointable result;
+ private IScalarEvaluator[] eval;
+ protected ArrayTupleBuilder tupleBuilder;
+ private final int[] projectionToOutColumns;
+ private boolean first = true;
+ protected int tupleIndex = 0;
+ protected final IHyracksTaskContext ctx;
+ protected final IEvaluatorContext evalCtx;
+
+ public AssignRuntime(IHyracksTaskContext ctx) {
+ this.ctx = ctx;
+ this.evalCtx = new EvaluatorContext(ctx);
+ projectionToOutColumns = new int[projectionList.length];
+ for (int j = 0; j < projectionList.length; j++) {
+ projectionToOutColumns[j] = Arrays.binarySearch(outColumns, projectionList[j]);
+ }
+ tupleBuilder = new ArrayTupleBuilder(projectionList.length);
+ eval = new IScalarEvaluator[evalFactories.length];
+ result = VoidPointable.FACTORY.createPointable();
}
- return new AbstractOneInputOneOutputOneFramePushRuntime() {
- private IPointable result = VoidPointable.FACTORY.createPointable();
- private IScalarEvaluator[] eval = new IScalarEvaluator[evalFactories.length];
- private ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(projectionList.length);
- private boolean first = true;
- private int tupleIndex = 0;
-
- @Override
- public void open() throws HyracksDataException {
- if (first) {
- initAccessAppendRef(ctx);
- first = false;
- int n = evalFactories.length;
- for (int i = 0; i < n; i++) {
- eval[i] = evalFactories[i].createScalarEvaluator(evalCtx);
- }
+ @Override
+ public void open() throws HyracksDataException {
+ if (first) {
+ initAccessAppendRef(ctx);
+ first = false;
+ int n = evalFactories.length;
+ for (int i = 0; i < n; i++) {
+ eval[i] = evalFactories[i].createScalarEvaluator(evalCtx);
}
- super.open();
}
+ super.open();
+ }
- @Override
- public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
- // what if nTuple is 0?
- tAccess.reset(buffer);
- int nTuple = tAccess.getTupleCount();
- if (nTuple < 1) {
- if (nTuple < 0) {
- throw new HyracksDataException("Negative number of tuples in the frame: " + nTuple);
- }
- appender.flush(writer);
- } else {
- if (nTuple > 1) {
- for (; tupleIndex < nTuple - 1; tupleIndex++) {
- tRef.reset(tAccess, tupleIndex);
- produceTuple(tupleBuilder, tAccess, tupleIndex, tRef);
- appendToFrameFromTupleBuilder(tupleBuilder);
- }
- }
-
- if (tupleIndex < nTuple) {
+ @Override
+ public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ // what if nTuple is 0?
+ tAccess.reset(buffer);
+ int nTuple = tAccess.getTupleCount();
+ if (nTuple < 1) {
+ if (nTuple < 0) {
+ throw new HyracksDataException("Negative number of tuples in the frame: " + nTuple);
+ }
+ appender.flush(writer);
+ } else {
+ if (nTuple > 1) {
+ for (; tupleIndex < nTuple - 1; tupleIndex++) {
tRef.reset(tAccess, tupleIndex);
produceTuple(tupleBuilder, tAccess, tupleIndex, tRef);
- if (flushFramesRapidly) {
- // Whenever all the tuples in the incoming frame have been consumed, the assign operator
- // will push its frame to the next operator; i.e., it won't wait until the frame gets full.
- appendToFrameFromTupleBuilder(tupleBuilder, true);
- } else {
- appendToFrameFromTupleBuilder(tupleBuilder);
- }
+ appendToFrameFromTupleBuilder(tupleBuilder);
+ }
+ }
+ if (tupleIndex < nTuple) {
+ tRef.reset(tAccess, tupleIndex);
+ produceTuple(tupleBuilder, tAccess, tupleIndex, tRef);
+ if (flushFramesRapidly) {
+ // Whenever all the tuples in the incoming frame have been consumed, the assign operator
+ // will push its frame to the next operator; i.e., it won't wait until the frame gets full.
+ appendToFrameFromTupleBuilder(tupleBuilder, true);
} else {
- if (flushFramesRapidly) {
- flushAndReset();
- }
+ appendToFrameFromTupleBuilder(tupleBuilder);
+ }
+ } else {
+ if (flushFramesRapidly) {
+ flushAndReset();
}
}
- tupleIndex = 0;
}
+ tupleIndex = 0;
+ }
- private void produceTuple(ArrayTupleBuilder tb, IFrameTupleAccessor accessor, int tIndex,
- FrameTupleReference tupleRef) throws HyracksDataException {
- try {
- tb.reset();
- for (int f = 0; f < projectionList.length; f++) {
- int k = projectionToOutColumns[f];
- if (k >= 0) {
- eval[k].evaluate(tupleRef, result);
- tb.addField(result.getByteArray(), result.getStartOffset(), result.getLength());
- } else {
- tb.addField(accessor, tIndex, projectionList[f]);
- }
+ protected void produceTuple(ArrayTupleBuilder tb, IFrameTupleAccessor accessor, int tIndex,
+ FrameTupleReference tupleRef) throws HyracksDataException {
+ try {
+ tb.reset();
+ for (int f = 0; f < projectionList.length; f++) {
+ int k = projectionToOutColumns[f];
+ if (k >= 0) {
+ eval[k].evaluate(tupleRef, result);
+ tb.addField(result.getByteArray(), result.getStartOffset(), result.getLength());
+ } else {
+ tb.addField(accessor, tIndex, projectionList[f]);
}
- } catch (HyracksDataException e) {
- throw HyracksDataException.create(ErrorCode.ERROR_PROCESSING_TUPLE, e, sourceLoc, tupleIndex);
}
+ } catch (HyracksDataException e) {
+ throw HyracksDataException.create(ErrorCode.ERROR_PROCESSING_TUPLE, e, sourceLoc, tupleIndex);
}
+ }
- @Override
- public void flush() throws HyracksDataException {
- appender.flush(writer);
- }
- };
+ @Override
+ public void flush() throws HyracksDataException {
+ appender.flush(writer);
+ }
}
-}
+}
\ No newline at end of file