Index-only plan step 2: Added SplitOperator
- Introduced SplitOperator, which sends a tuple to only one output frame, unlike ReplicateOperator,
which propagates a tuple to all output frames.
- Removed PartitioningSplitOperator and PartitioningSplitOperatorDescriptor, which were not functional
(they lacked a physical operator).
- Added a unit test case for SplitOperatorDescriptor to PushRuntimeTest.
Change-Id: Ice190827513cd8632764b52c9d0338d65c830740
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1196
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Yingyi Bu <buyingyi@gmail.com>
diff --git a/hyracks-fullstack/algebricks/algebricks-tests/data/simple/int-string-part1-split-0.tbl b/hyracks-fullstack/algebricks/algebricks-tests/data/simple/int-string-part1-split-0.tbl
new file mode 100644
index 0000000..0ea8a88
--- /dev/null
+++ b/hyracks-fullstack/algebricks/algebricks-tests/data/simple/int-string-part1-split-0.tbl
@@ -0,0 +1,4 @@
+0,first branch1
+0,first branch2
+0,first branch3
+0,first branch4
diff --git a/hyracks-fullstack/algebricks/algebricks-tests/data/simple/int-string-part1-split-1.tbl b/hyracks-fullstack/algebricks/algebricks-tests/data/simple/int-string-part1-split-1.tbl
new file mode 100644
index 0000000..53588ef
--- /dev/null
+++ b/hyracks-fullstack/algebricks/algebricks-tests/data/simple/int-string-part1-split-1.tbl
@@ -0,0 +1,3 @@
+1,second branch1
+1,second branch2
+1,second branch3
diff --git a/hyracks-fullstack/algebricks/algebricks-tests/data/simple/int-string-part1.tbl b/hyracks-fullstack/algebricks/algebricks-tests/data/simple/int-string-part1.tbl
new file mode 100644
index 0000000..ceb859a
--- /dev/null
+++ b/hyracks-fullstack/algebricks/algebricks-tests/data/simple/int-string-part1.tbl
@@ -0,0 +1,7 @@
+0|first branch1
+1|second branch1
+0|first branch2
+1|second branch2
+0|first branch3
+1|second branch3
+0|first branch4
\ No newline at end of file
diff --git a/hyracks-fullstack/algebricks/algebricks-tests/src/test/java/org/apache/hyracks/algebricks/tests/pushruntime/PushRuntimeTest.java b/hyracks-fullstack/algebricks/algebricks-tests/src/test/java/org/apache/hyracks/algebricks/tests/pushruntime/PushRuntimeTest.java
index 020cffe..1276518 100644
--- a/hyracks-fullstack/algebricks/algebricks-tests/src/test/java/org/apache/hyracks/algebricks/tests/pushruntime/PushRuntimeTest.java
+++ b/hyracks-fullstack/algebricks/algebricks-tests/src/test/java/org/apache/hyracks/algebricks/tests/pushruntime/PushRuntimeTest.java
@@ -50,6 +50,7 @@
import org.apache.hyracks.algebricks.runtime.operators.std.PrinterRuntimeFactory;
import org.apache.hyracks.algebricks.runtime.operators.std.RunningAggregateRuntimeFactory;
import org.apache.hyracks.algebricks.runtime.operators.std.SinkWriterRuntimeFactory;
+import org.apache.hyracks.algebricks.runtime.operators.std.SplitOperatorDescriptor;
import org.apache.hyracks.algebricks.runtime.operators.std.StreamLimitRuntimeFactory;
import org.apache.hyracks.algebricks.runtime.operators.std.StreamProjectRuntimeFactory;
import org.apache.hyracks.algebricks.runtime.operators.std.StreamSelectRuntimeFactory;
@@ -83,7 +84,7 @@
import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
import org.apache.hyracks.dataflow.std.file.LineFileWriteOperatorDescriptor;
import org.apache.hyracks.dataflow.std.group.preclustered.PreclusteredGroupOperatorDescriptor;
-import org.apache.hyracks.dataflow.std.misc.SplitOperatorDescriptor;
+import org.apache.hyracks.dataflow.std.misc.ReplicateOperatorDescriptor;
import org.apache.hyracks.dataflow.std.sort.InMemorySortOperatorDescriptor;
import org.junit.AfterClass;
import org.junit.BeforeClass;
@@ -570,7 +571,7 @@
}
@Test
- public void scanSplitWrite() throws Exception {
+ public void scanReplicateWrite() throws Exception {
final int outputArity = 2;
JobSpecification spec = new JobSpecification(FRAME_SIZE);
@@ -596,7 +597,69 @@
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, scanOp,
new String[] { AlgebricksHyracksIntegrationUtil.NC1_ID });
- SplitOperatorDescriptor splitOp = new SplitOperatorDescriptor(spec, stringRec, outputArity);
+ ReplicateOperatorDescriptor replicateOp = new ReplicateOperatorDescriptor(spec, stringRec, outputArity);
+
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, replicateOp,
+ new String[] { AlgebricksHyracksIntegrationUtil.NC1_ID });
+
+ IOperatorDescriptor outputOp[] = new IOperatorDescriptor[outputFile.length];
+ for (int i = 0; i < outputArity; i++) {
+ outputOp[i] = new LineFileWriteOperatorDescriptor(spec, new FileSplit[] {
+ new FileSplit(AlgebricksHyracksIntegrationUtil.NC1_ID, new FileReference(outputFile[i])) });
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, outputOp[i],
+ new String[] { AlgebricksHyracksIntegrationUtil.NC1_ID });
+ }
+
+ spec.connect(new OneToOneConnectorDescriptor(spec), scanOp, 0, replicateOp, 0);
+ for (int i = 0; i < outputArity; i++) {
+ spec.connect(new OneToOneConnectorDescriptor(spec), replicateOp, i, outputOp[i], 0);
+ }
+
+ for (int i = 0; i < outputArity; i++) {
+ spec.addRoot(outputOp[i]);
+ }
+ AlgebricksHyracksIntegrationUtil.runJob(spec);
+
+ for (int i = 0; i < outputArity; i++) {
+ compareFiles(inputFileName, outputFile[i].getAbsolutePath());
+ }
+ }
+
+ @Test
+ public void scanSplitWrite() throws Exception {
+ final int outputArity = 2;
+
+ JobSpecification spec = new JobSpecification(FRAME_SIZE);
+
+ String inputFileName[] = { "data/simple/int-string-part1.tbl", "data/simple/int-string-part1-split-0.tbl",
+ "data/simple/int-string-part1-split-1.tbl" };
+ File[] inputFiles = new File[inputFileName.length];
+ for (int i=0; i<inputFileName.length; i++) {
+ inputFiles[i] = new File(inputFileName[i]);
+ }
+ File[] outputFile = new File[outputArity];
+ for (int i = 0; i < outputArity; i++) {
+ outputFile[i] = File.createTempFile("splitop", null);
+ }
+
+ FileSplit[] inputSplits = new FileSplit[] {
+ new FileSplit(AlgebricksHyracksIntegrationUtil.NC1_ID, new FileReference(inputFiles[0])) };
+ IFileSplitProvider intSplitProvider = new ConstantFileSplitProvider(inputSplits);
+
+ RecordDescriptor scannerDesc = new RecordDescriptor(
+ new ISerializerDeserializer[] { IntegerSerializerDeserializer.INSTANCE,
+ new UTF8StringSerializerDeserializer() });
+
+ IValueParserFactory[] valueParsers = new IValueParserFactory[] { IntegerParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE };
+
+ FileScanOperatorDescriptor intScanner = new FileScanOperatorDescriptor(spec, intSplitProvider,
+ new DelimitedDataTupleParserFactory(valueParsers, '|'), scannerDesc);
+
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, intScanner, DEFAULT_NODES);
+
+ SplitOperatorDescriptor splitOp = new SplitOperatorDescriptor(spec, scannerDesc, outputArity,
+ new TupleFieldEvaluatorFactory(0), BinaryIntegerInspectorImpl.FACTORY);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, splitOp,
new String[] { AlgebricksHyracksIntegrationUtil.NC1_ID });
@@ -609,7 +672,7 @@
new String[] { AlgebricksHyracksIntegrationUtil.NC1_ID });
}
- spec.connect(new OneToOneConnectorDescriptor(spec), scanOp, 0, splitOp, 0);
+ spec.connect(new OneToOneConnectorDescriptor(spec), intScanner, 0, splitOp, 0);
for (int i = 0; i < outputArity; i++) {
spec.connect(new OneToOneConnectorDescriptor(spec), splitOp, i, outputOp[i], 0);
}
@@ -620,7 +683,7 @@
AlgebricksHyracksIntegrationUtil.runJob(spec);
for (int i = 0; i < outputArity; i++) {
- compareFiles(inputFileName, outputFile[i].getAbsolutePath());
+ compareFiles(inputFileName[i + 1], outputFile[i].getAbsolutePath());
}
}