[NO ISSUE][RT] Make StreamSelectRuntimeFactory Extensible

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
- Make StreamSelectRuntimeFactory and its runtime
  extensible by providing accessors and replacing
  the runtime anonymous class by a named class.

Change-Id: I9c575e6c037e5c8c1818cfa3c6b0bf65697bfb9e
Reviewed-on: https://asterix-gerrit.ics.uci.edu/3308
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Till Westmann <tillw@apache.org>
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamSelectRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamSelectRuntimeFactory.java
index 933e640..e84b5b4 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamSelectRuntimeFactory.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamSelectRuntimeFactory.java
@@ -40,22 +40,13 @@
 
     private static final long serialVersionUID = 1L;
     // Final
-    private final IScalarEvaluatorFactory cond;
-    private final IBinaryBooleanInspectorFactory binaryBooleanInspectorFactory;
-    private final IMissingWriterFactory missingWriterFactory;
+    protected final IScalarEvaluatorFactory cond;
+    protected final IBinaryBooleanInspectorFactory binaryBooleanInspectorFactory;
+    protected final IMissingWriterFactory missingWriterFactory;
     // Mutable
-    private boolean retainMissing;
+    protected boolean retainMissing;
     private int missingPlaceholderVariableIndex;
 
-    /**
-     * @param cond
-     * @param projectionList
-     *            if projectionList is null, then no projection is performed
-     * @param retainMissing
-     * @param missingPlaceholderVariableIndex
-     * @param missingWriterFactory
-     * @throws HyracksDataException
-     */
     public StreamSelectRuntimeFactory(IScalarEvaluatorFactory cond, int[] projectionList,
             IBinaryBooleanInspectorFactory binaryBooleanInspectorFactory, boolean retainMissing,
             int missingPlaceholderVariableIndex, IMissingWriterFactory missingWriterFactory) {
@@ -67,11 +58,6 @@
         this.missingWriterFactory = missingWriterFactory;
     }
 
-    public void retainMissing(boolean retainMissing, int index) {
-        this.retainMissing = retainMissing;
-        this.missingPlaceholderVariableIndex = index;
-    }
-
     @Override
     public String toString() {
         return "stream-select " + cond.toString();
@@ -80,61 +66,103 @@
     @Override
     public AbstractOneInputOneOutputOneFramePushRuntime createOneOutputPushRuntime(final IHyracksTaskContext ctx) {
         final IBinaryBooleanInspector bbi = binaryBooleanInspectorFactory.createBinaryBooleanInspector(ctx);
-        return new AbstractOneInputOneOutputOneFieldFramePushRuntime() {
-            private IPointable p = VoidPointable.FACTORY.createPointable();
-            private IScalarEvaluator eval;
-            private IMissingWriter missingWriter = null;
-            private ArrayTupleBuilder missingTupleBuilder = null;
+        return new StreamSelectRuntime(ctx, bbi);
+    }
 
-            @Override
-            public void open() throws HyracksDataException {
-                if (eval == null) {
-                    initAccessAppendFieldRef(ctx);
-                    eval = cond.createScalarEvaluator(ctx);
-                }
-                super.open();
-                //prepare nullTupleBuilder
-                if (retainMissing && missingWriter == null) {
-                    missingWriter = missingWriterFactory.createMissingWriter();
-                    missingTupleBuilder = new ArrayTupleBuilder(1);
-                    DataOutput out = missingTupleBuilder.getDataOutput();
-                    missingWriter.writeMissing(out);
-                    missingTupleBuilder.addFieldEndOffset();
-                }
+    public void retainMissing(boolean retainMissing, int index) {
+        this.retainMissing = retainMissing;
+        this.missingPlaceholderVariableIndex = index;
+    }
+
+    public IScalarEvaluatorFactory getCond() {
+        return cond;
+    }
+
+    public IBinaryBooleanInspectorFactory getBinaryBooleanInspectorFactory() {
+        return binaryBooleanInspectorFactory;
+    }
+
+    public IMissingWriterFactory getMissingWriterFactory() {
+        return missingWriterFactory;
+    }
+
+    public boolean isRetainMissing() {
+        return retainMissing;
+    }
+
+    public int getMissingPlaceholderVariableIndex() {
+        return missingPlaceholderVariableIndex;
+    }
+
+    public int[] getProjectionList() {
+        return projectionList;
+    }
+
+    public class StreamSelectRuntime extends AbstractOneInputOneOutputOneFieldFramePushRuntime {
+
+        protected final IPointable p = VoidPointable.FACTORY.createPointable();
+        protected final IHyracksTaskContext ctx;
+        protected final IBinaryBooleanInspector bbi;
+        protected IScalarEvaluator eval;
+        protected IMissingWriter missingWriter;
+        protected ArrayTupleBuilder missingTupleBuilder;
+
+        public StreamSelectRuntime(IHyracksTaskContext ctx, IBinaryBooleanInspector bbi) {
+            this.ctx = ctx;
+            this.bbi = bbi;
+        }
+
+        @Override
+        public void open() throws HyracksDataException {
+            if (eval == null) {
+                initAccessAppendFieldRef(ctx);
+                eval = cond.createScalarEvaluator(ctx);
             }
+            super.open();
+            if (retainMissing && missingWriter == null) {
+                missingWriter = missingWriterFactory.createMissingWriter();
+                missingTupleBuilder = new ArrayTupleBuilder(1);
+                DataOutput out = missingTupleBuilder.getDataOutput();
+                missingWriter.writeMissing(out);
+                missingTupleBuilder.addFieldEndOffset();
+            }
+        }
 
-            @Override
-            public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
-                tAccess.reset(buffer);
-                int nTuple = tAccess.getTupleCount();
-                for (int t = 0; t < nTuple; t++) {
-                    tRef.reset(tAccess, t);
-                    eval.evaluate(tRef, p);
-                    if (bbi.getBooleanValue(p.getByteArray(), p.getStartOffset(), p.getLength())) {
-                        if (projectionList != null) {
-                            appendProjectionToFrame(t, projectionList);
-                        } else {
-                            appendTupleToFrame(t);
-                        }
+        @Override
+        public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+            tAccess.reset(buffer);
+            int nTuple = tAccess.getTupleCount();
+            for (int t = 0; t < nTuple; t++) {
+                tRef.reset(tAccess, t);
+                eval.evaluate(tRef, p);
+                if (bbi.getBooleanValue(p.getByteArray(), p.getStartOffset(), p.getLength())) {
+                    if (projectionList != null) {
+                        appendProjectionToFrame(t, projectionList);
                     } else {
-                        if (retainMissing) {
-                            for (int i = 0; i < tRef.getFieldCount(); i++) {
-                                if (i == missingPlaceholderVariableIndex) {
-                                    appendField(missingTupleBuilder.getByteArray(), 0, missingTupleBuilder.getSize());
-                                } else {
-                                    appendField(tAccess, t, i);
-                                }
-                            }
-                        }
+                        appendTupleToFrame(t);
+                    }
+                } else {
+                    if (retainMissing) {
+                        retainMissingTuple(t);
                     }
                 }
             }
+        }
 
-            @Override
-            public void flush() throws HyracksDataException {
-                appender.flush(writer);
+        @Override
+        public void flush() throws HyracksDataException {
+            appender.flush(writer);
+        }
+
+        protected void retainMissingTuple(int t) throws HyracksDataException {
+            for (int i = 0; i < tRef.getFieldCount(); i++) {
+                if (i == missingPlaceholderVariableIndex) {
+                    appendField(missingTupleBuilder.getByteArray(), 0, missingTupleBuilder.getSize());
+                } else {
+                    appendField(tAccess, t, i);
+                }
             }
-        };
+        }
     }
 
 }