[NO ISSUE][RT] Make StreamSelectRuntimeFactory Extensible
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
- Make StreamSelectRuntimeFactory and its runtime
extensible by providing accessors and replacing
the runtime anonymous class by a named class.
Change-Id: I9c575e6c037e5c8c1818cfa3c6b0bf65697bfb9e
Reviewed-on: https://asterix-gerrit.ics.uci.edu/3308
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Till Westmann <tillw@apache.org>
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamSelectRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamSelectRuntimeFactory.java
index 933e640..e84b5b4 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamSelectRuntimeFactory.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamSelectRuntimeFactory.java
@@ -40,22 +40,13 @@
private static final long serialVersionUID = 1L;
// Final
- private final IScalarEvaluatorFactory cond;
- private final IBinaryBooleanInspectorFactory binaryBooleanInspectorFactory;
- private final IMissingWriterFactory missingWriterFactory;
+ protected final IScalarEvaluatorFactory cond;
+ protected final IBinaryBooleanInspectorFactory binaryBooleanInspectorFactory;
+ protected final IMissingWriterFactory missingWriterFactory;
// Mutable
- private boolean retainMissing;
+ protected boolean retainMissing;
private int missingPlaceholderVariableIndex;
- /**
- * @param cond
- * @param projectionList
- * if projectionList is null, then no projection is performed
- * @param retainMissing
- * @param missingPlaceholderVariableIndex
- * @param missingWriterFactory
- * @throws HyracksDataException
- */
public StreamSelectRuntimeFactory(IScalarEvaluatorFactory cond, int[] projectionList,
IBinaryBooleanInspectorFactory binaryBooleanInspectorFactory, boolean retainMissing,
int missingPlaceholderVariableIndex, IMissingWriterFactory missingWriterFactory) {
@@ -67,11 +58,6 @@
this.missingWriterFactory = missingWriterFactory;
}
- public void retainMissing(boolean retainMissing, int index) {
- this.retainMissing = retainMissing;
- this.missingPlaceholderVariableIndex = index;
- }
-
@Override
public String toString() {
return "stream-select " + cond.toString();
@@ -80,61 +66,103 @@
@Override
public AbstractOneInputOneOutputOneFramePushRuntime createOneOutputPushRuntime(final IHyracksTaskContext ctx) {
final IBinaryBooleanInspector bbi = binaryBooleanInspectorFactory.createBinaryBooleanInspector(ctx);
- return new AbstractOneInputOneOutputOneFieldFramePushRuntime() {
- private IPointable p = VoidPointable.FACTORY.createPointable();
- private IScalarEvaluator eval;
- private IMissingWriter missingWriter = null;
- private ArrayTupleBuilder missingTupleBuilder = null;
+ return new StreamSelectRuntime(ctx, bbi);
+ }
- @Override
- public void open() throws HyracksDataException {
- if (eval == null) {
- initAccessAppendFieldRef(ctx);
- eval = cond.createScalarEvaluator(ctx);
- }
- super.open();
- //prepare nullTupleBuilder
- if (retainMissing && missingWriter == null) {
- missingWriter = missingWriterFactory.createMissingWriter();
- missingTupleBuilder = new ArrayTupleBuilder(1);
- DataOutput out = missingTupleBuilder.getDataOutput();
- missingWriter.writeMissing(out);
- missingTupleBuilder.addFieldEndOffset();
- }
+ public void retainMissing(boolean retainMissing, int index) {
+ this.retainMissing = retainMissing;
+ this.missingPlaceholderVariableIndex = index;
+ }
+
+ public IScalarEvaluatorFactory getCond() {
+ return cond;
+ }
+
+ public IBinaryBooleanInspectorFactory getBinaryBooleanInspectorFactory() {
+ return binaryBooleanInspectorFactory;
+ }
+
+ public IMissingWriterFactory getMissingWriterFactory() {
+ return missingWriterFactory;
+ }
+
+ public boolean isRetainMissing() {
+ return retainMissing;
+ }
+
+ public int getMissingPlaceholderVariableIndex() {
+ return missingPlaceholderVariableIndex;
+ }
+
+ public int[] getProjectionList() {
+ return projectionList;
+ }
+
+ public class StreamSelectRuntime extends AbstractOneInputOneOutputOneFieldFramePushRuntime {
+
+ protected final IPointable p = VoidPointable.FACTORY.createPointable();
+ protected final IHyracksTaskContext ctx;
+ protected final IBinaryBooleanInspector bbi;
+ protected IScalarEvaluator eval;
+ protected IMissingWriter missingWriter;
+ protected ArrayTupleBuilder missingTupleBuilder;
+
+ public StreamSelectRuntime(IHyracksTaskContext ctx, IBinaryBooleanInspector bbi) {
+ this.ctx = ctx;
+ this.bbi = bbi;
+ }
+
+ @Override
+ public void open() throws HyracksDataException {
+ if (eval == null) {
+ initAccessAppendFieldRef(ctx);
+ eval = cond.createScalarEvaluator(ctx);
}
+ super.open();
+ if (retainMissing && missingWriter == null) {
+ missingWriter = missingWriterFactory.createMissingWriter();
+ missingTupleBuilder = new ArrayTupleBuilder(1);
+ DataOutput out = missingTupleBuilder.getDataOutput();
+ missingWriter.writeMissing(out);
+ missingTupleBuilder.addFieldEndOffset();
+ }
+ }
- @Override
- public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
- tAccess.reset(buffer);
- int nTuple = tAccess.getTupleCount();
- for (int t = 0; t < nTuple; t++) {
- tRef.reset(tAccess, t);
- eval.evaluate(tRef, p);
- if (bbi.getBooleanValue(p.getByteArray(), p.getStartOffset(), p.getLength())) {
- if (projectionList != null) {
- appendProjectionToFrame(t, projectionList);
- } else {
- appendTupleToFrame(t);
- }
+ @Override
+ public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ tAccess.reset(buffer);
+ int nTuple = tAccess.getTupleCount();
+ for (int t = 0; t < nTuple; t++) {
+ tRef.reset(tAccess, t);
+ eval.evaluate(tRef, p);
+ if (bbi.getBooleanValue(p.getByteArray(), p.getStartOffset(), p.getLength())) {
+ if (projectionList != null) {
+ appendProjectionToFrame(t, projectionList);
} else {
- if (retainMissing) {
- for (int i = 0; i < tRef.getFieldCount(); i++) {
- if (i == missingPlaceholderVariableIndex) {
- appendField(missingTupleBuilder.getByteArray(), 0, missingTupleBuilder.getSize());
- } else {
- appendField(tAccess, t, i);
- }
- }
- }
+ appendTupleToFrame(t);
+ }
+ } else {
+ if (retainMissing) {
+ retainMissingTuple(t);
}
}
}
+ }
- @Override
- public void flush() throws HyracksDataException {
- appender.flush(writer);
+ @Override
+ public void flush() throws HyracksDataException {
+ appender.flush(writer);
+ }
+
+ protected void retainMissingTuple(int t) throws HyracksDataException {
+ for (int i = 0; i < tRef.getFieldCount(); i++) {
+ if (i == missingPlaceholderVariableIndex) {
+ appendField(missingTupleBuilder.getByteArray(), 0, missingTupleBuilder.getSize());
+ } else {
+ appendField(tAccess, t, i);
+ }
}
- };
+ }
}
}