Index-only plan step 2: Added SplitOperator

 - Introduced SplitOperator that sends a tuple to only one output frame unlike the ReplicateOperator
   that propagates a tuple into all outputs frames.
 - Removed PartitioningSplitOperator and PartitioningSplitOperatorDescriptor that are not functional
   (lacking physical operator)
 - Added a unit test case of SplitOperatorDescriptor in PushRuntimeTest.

Change-Id: Ice190827513cd8632764b52c9d0338d65c830740
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1196
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Yingyi Bu <buyingyi@gmail.com>
diff --git a/hyracks-fullstack/algebricks/algebricks-tests/data/simple/int-string-part1-split-0.tbl b/hyracks-fullstack/algebricks/algebricks-tests/data/simple/int-string-part1-split-0.tbl
new file mode 100644
index 0000000..0ea8a88
--- /dev/null
+++ b/hyracks-fullstack/algebricks/algebricks-tests/data/simple/int-string-part1-split-0.tbl
@@ -0,0 +1,4 @@
+0,first branch1
+0,first branch2
+0,first branch3
+0,first branch4
diff --git a/hyracks-fullstack/algebricks/algebricks-tests/data/simple/int-string-part1-split-1.tbl b/hyracks-fullstack/algebricks/algebricks-tests/data/simple/int-string-part1-split-1.tbl
new file mode 100644
index 0000000..53588ef
--- /dev/null
+++ b/hyracks-fullstack/algebricks/algebricks-tests/data/simple/int-string-part1-split-1.tbl
@@ -0,0 +1,3 @@
+1,second branch1
+1,second branch2
+1,second branch3
diff --git a/hyracks-fullstack/algebricks/algebricks-tests/data/simple/int-string-part1.tbl b/hyracks-fullstack/algebricks/algebricks-tests/data/simple/int-string-part1.tbl
new file mode 100644
index 0000000..ceb859a
--- /dev/null
+++ b/hyracks-fullstack/algebricks/algebricks-tests/data/simple/int-string-part1.tbl
@@ -0,0 +1,7 @@
+0|first branch1
+1|second branch1
+0|first branch2
+1|second branch2
+0|first branch3
+1|second branch3
+0|first branch4
\ No newline at end of file
diff --git a/hyracks-fullstack/algebricks/algebricks-tests/src/test/java/org/apache/hyracks/algebricks/tests/pushruntime/PushRuntimeTest.java b/hyracks-fullstack/algebricks/algebricks-tests/src/test/java/org/apache/hyracks/algebricks/tests/pushruntime/PushRuntimeTest.java
index 020cffe..1276518 100644
--- a/hyracks-fullstack/algebricks/algebricks-tests/src/test/java/org/apache/hyracks/algebricks/tests/pushruntime/PushRuntimeTest.java
+++ b/hyracks-fullstack/algebricks/algebricks-tests/src/test/java/org/apache/hyracks/algebricks/tests/pushruntime/PushRuntimeTest.java
@@ -50,6 +50,7 @@
 import org.apache.hyracks.algebricks.runtime.operators.std.PrinterRuntimeFactory;
 import org.apache.hyracks.algebricks.runtime.operators.std.RunningAggregateRuntimeFactory;
 import org.apache.hyracks.algebricks.runtime.operators.std.SinkWriterRuntimeFactory;
+import org.apache.hyracks.algebricks.runtime.operators.std.SplitOperatorDescriptor;
 import org.apache.hyracks.algebricks.runtime.operators.std.StreamLimitRuntimeFactory;
 import org.apache.hyracks.algebricks.runtime.operators.std.StreamProjectRuntimeFactory;
 import org.apache.hyracks.algebricks.runtime.operators.std.StreamSelectRuntimeFactory;
@@ -83,7 +84,7 @@
 import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
 import org.apache.hyracks.dataflow.std.file.LineFileWriteOperatorDescriptor;
 import org.apache.hyracks.dataflow.std.group.preclustered.PreclusteredGroupOperatorDescriptor;
-import org.apache.hyracks.dataflow.std.misc.SplitOperatorDescriptor;
+import org.apache.hyracks.dataflow.std.misc.ReplicateOperatorDescriptor;
 import org.apache.hyracks.dataflow.std.sort.InMemorySortOperatorDescriptor;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
@@ -570,7 +571,7 @@
     }
 
     @Test
-    public void scanSplitWrite() throws Exception {
+    public void scanReplicateWrite() throws Exception {
         final int outputArity = 2;
 
         JobSpecification spec = new JobSpecification(FRAME_SIZE);
@@ -596,7 +597,69 @@
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, scanOp,
                 new String[] { AlgebricksHyracksIntegrationUtil.NC1_ID });
 
-        SplitOperatorDescriptor splitOp = new SplitOperatorDescriptor(spec, stringRec, outputArity);
+        ReplicateOperatorDescriptor replicateOp = new ReplicateOperatorDescriptor(spec, stringRec, outputArity);
+
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, replicateOp,
+                new String[] { AlgebricksHyracksIntegrationUtil.NC1_ID });
+
+        IOperatorDescriptor outputOp[] = new IOperatorDescriptor[outputFile.length];
+        for (int i = 0; i < outputArity; i++) {
+            outputOp[i] = new LineFileWriteOperatorDescriptor(spec, new FileSplit[] {
+                    new FileSplit(AlgebricksHyracksIntegrationUtil.NC1_ID, new FileReference(outputFile[i])) });
+            PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, outputOp[i],
+                    new String[] { AlgebricksHyracksIntegrationUtil.NC1_ID });
+        }
+
+        spec.connect(new OneToOneConnectorDescriptor(spec), scanOp, 0, replicateOp, 0);
+        for (int i = 0; i < outputArity; i++) {
+            spec.connect(new OneToOneConnectorDescriptor(spec), replicateOp, i, outputOp[i], 0);
+        }
+
+        for (int i = 0; i < outputArity; i++) {
+            spec.addRoot(outputOp[i]);
+        }
+        AlgebricksHyracksIntegrationUtil.runJob(spec);
+
+        for (int i = 0; i < outputArity; i++) {
+            compareFiles(inputFileName, outputFile[i].getAbsolutePath());
+        }
+    }
+
+    @Test
+    public void scanSplitWrite() throws Exception {
+        final int outputArity = 2;
+
+        JobSpecification spec = new JobSpecification(FRAME_SIZE);
+
+        String inputFileName[] = { "data/simple/int-string-part1.tbl", "data/simple/int-string-part1-split-0.tbl",
+                "data/simple/int-string-part1-split-1.tbl" };
+        File[] inputFiles = new File[inputFileName.length];
+        for (int i=0; i<inputFileName.length; i++) {
+            inputFiles[i] = new File(inputFileName[i]);
+        }
+        File[] outputFile = new File[outputArity];
+        for (int i = 0; i < outputArity; i++) {
+            outputFile[i] = File.createTempFile("splitop", null);
+        }
+
+        FileSplit[] inputSplits = new FileSplit[] {
+                new FileSplit(AlgebricksHyracksIntegrationUtil.NC1_ID, new FileReference(inputFiles[0])) };
+        IFileSplitProvider intSplitProvider = new ConstantFileSplitProvider(inputSplits);
+
+        RecordDescriptor scannerDesc = new RecordDescriptor(
+                new ISerializerDeserializer[] { IntegerSerializerDeserializer.INSTANCE,
+                        new UTF8StringSerializerDeserializer() });
+
+        IValueParserFactory[] valueParsers = new IValueParserFactory[] { IntegerParserFactory.INSTANCE,
+                UTF8StringParserFactory.INSTANCE };
+
+        FileScanOperatorDescriptor intScanner = new FileScanOperatorDescriptor(spec, intSplitProvider,
+                new DelimitedDataTupleParserFactory(valueParsers, '|'), scannerDesc);
+
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, intScanner, DEFAULT_NODES);
+
+        SplitOperatorDescriptor splitOp = new SplitOperatorDescriptor(spec, scannerDesc, outputArity,
+                new TupleFieldEvaluatorFactory(0), BinaryIntegerInspectorImpl.FACTORY);
 
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, splitOp,
                 new String[] { AlgebricksHyracksIntegrationUtil.NC1_ID });
@@ -609,7 +672,7 @@
                     new String[] { AlgebricksHyracksIntegrationUtil.NC1_ID });
         }
 
-        spec.connect(new OneToOneConnectorDescriptor(spec), scanOp, 0, splitOp, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), intScanner, 0, splitOp, 0);
         for (int i = 0; i < outputArity; i++) {
             spec.connect(new OneToOneConnectorDescriptor(spec), splitOp, i, outputOp[i], 0);
         }
@@ -620,7 +683,7 @@
         AlgebricksHyracksIntegrationUtil.runJob(spec);
 
         for (int i = 0; i < outputArity; i++) {
-            compareFiles(inputFileName, outputFile[i].getAbsolutePath());
+            compareFiles(inputFileName[i + 1], outputFile[i].getAbsolutePath());
         }
     }