The Hyracks issue #137 shows a hard coded value. The solution was to create a
IUnnestPositionWriter to write the position variable in the form the type defined
outside Hyracks.
The update attempts to make the fewest number of changes to add the writer.
Change-Id: I6ebab5b3dfabcd36c732067acefe33da22307fc7
Reviewed-on: http://fulliautomatix.ics.uci.edu:8443/67
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Till Westmann <westmann@gmail.com>
Reviewed-by: Preston Carman <ecarm002@ucr.edu>
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/UnnestOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/UnnestOperator.java
index b9ac62f..dba9e19 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/UnnestOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/UnnestOperator.java
@@ -26,6 +26,7 @@
import edu.uci.ics.hyracks.algebricks.core.algebra.properties.VariablePropagationPolicy;
import edu.uci.ics.hyracks.algebricks.core.algebra.typing.ITypingContext;
import edu.uci.ics.hyracks.algebricks.core.algebra.visitors.ILogicalOperatorVisitor;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IUnnestingPositionWriter;
public class UnnestOperator extends AbstractUnnestOperator {
@@ -37,6 +38,11 @@
private ILogicalExpression positionOffsetExpr;
/**
+ * Specify the writer of the positional variable
+ */
+ private IUnnestingPositionWriter positionWriter;
+
+ /**
* Specify the type of the positional variable
*/
private Object positionalVariableType;
@@ -46,10 +52,11 @@
}
public UnnestOperator(LogicalVariable variable, Mutable<ILogicalExpression> expression,
- LogicalVariable positionalVariable, Object positionalVariableType) {
+ LogicalVariable positionalVariable, Object positionalVariableType, IUnnestingPositionWriter positionWriter) {
this(variable, expression);
this.setPositionalVariable(positionalVariable);
this.setPositionalVariableType(positionalVariableType);
+ this.setPositionWriter(positionWriter);
}
@Override
@@ -69,6 +76,14 @@
return positionalVariable;
}
+ public void setPositionWriter(IUnnestingPositionWriter positionWriter) {
+ this.positionWriter = positionWriter;
+ }
+
+ public IUnnestingPositionWriter getPositionWriter() {
+ return positionalVariable != null ? positionWriter : null;
+ }
+
public void setPositionalVariableType(Object positionalVariableType) {
this.positionalVariableType = positionalVariableType;
}
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java
index 3b761da..9157878 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java
@@ -720,7 +720,7 @@
@Override
public ILogicalOperator visitUnnestOperator(UnnestOperator op, Void arg) throws AlgebricksException {
return new UnnestOperator(op.getVariable(), deepCopyExpressionRef(op.getExpressionRef()),
- op.getPositionalVariable(), op.getPositionalVariableType());
+ op.getPositionalVariable(), op.getPositionalVariableType(), op.getPositionWriter());
}
@Override
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/UnnestPOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/UnnestPOperator.java
index bf2f68a..d60d4f7 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/UnnestPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/UnnestPOperator.java
@@ -88,7 +88,7 @@
int[] projectionList = JobGenHelper.projectAllVariables(opSchema);
UnnestRuntimeFactory unnestRuntime = new UnnestRuntimeFactory(outCol, unnestingFactory, projectionList,
- unnest.getPositionalVariable() != null, posOffsetExprEvalFactory);
+ unnest.getPositionWriter(), posOffsetExprEvalFactory);
RecordDescriptor recDesc = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), opSchema, context);
builder.contributeMicroOperator(unnest, unnestRuntime, recDesc);
ILogicalOperator src = unnest.getInputs().get(0).getValue();
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/IUnnestingPositionWriter.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/IUnnestingPositionWriter.java
new file mode 100644
index 0000000..b8d549a
--- /dev/null
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/base/IUnnestingPositionWriter.java
@@ -0,0 +1,22 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.runtime.base;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+public interface IUnnestingPositionWriter {
+ public void write(DataOutput dataOutput, int position) throws IOException;
+}
\ No newline at end of file
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java
index 415e718..bdd6d36 100644
--- a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java
@@ -22,6 +22,7 @@
import edu.uci.ics.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
import edu.uci.ics.hyracks.algebricks.runtime.base.IUnnestingEvaluator;
import edu.uci.ics.hyracks.algebricks.runtime.base.IUnnestingEvaluatorFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IUnnestingPositionWriter;
import edu.uci.ics.hyracks.algebricks.runtime.evaluators.ConstantEvaluatorFactory;
import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputOneFramePushRuntime;
import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputRuntimeFactory;
@@ -41,7 +42,7 @@
private int outColPos;
private final boolean outColIsProjected;
- private final boolean hasPositionalVariable;
+ private final IUnnestingPositionWriter positionWriter;
private IScalarEvaluatorFactory posOffsetEvalFactory;
// Each time step() is called on the aggregate, a new value is written in
@@ -50,11 +51,11 @@
// produced the last value.
public UnnestRuntimeFactory(int outCol, IUnnestingEvaluatorFactory unnestingFactory, int[] projectionList) {
- this(outCol, unnestingFactory, projectionList, false, null);
+ this(outCol, unnestingFactory, projectionList, null, null);
}
public UnnestRuntimeFactory(int outCol, IUnnestingEvaluatorFactory unnestingFactory, int[] projectionList,
- boolean hashPositionalVariable, IScalarEvaluatorFactory posOffsetEvalFactory) {
+ IUnnestingPositionWriter positionWriter, IScalarEvaluatorFactory posOffsetEvalFactory) {
super(projectionList);
this.outCol = outCol;
this.unnestingFactory = unnestingFactory;
@@ -65,7 +66,7 @@
}
}
outColIsProjected = outColPos >= 0;
- this.hasPositionalVariable = hashPositionalVariable;
+ this.positionWriter = positionWriter;
this.posOffsetEvalFactory = posOffsetEvalFactory;
if (this.posOffsetEvalFactory == null) {
this.posOffsetEvalFactory = new ConstantEvaluatorFactory(new byte[5]);
@@ -129,7 +130,7 @@
goon = false;
} else {
- if (!outColIsProjected && !hasPositionalVariable) {
+ if (!outColIsProjected && positionWriter == null) {
appendProjectionToFrame(t, projectionList);
} else {
for (int f = 0; f < outColPos; f++) {
@@ -140,15 +141,14 @@
} else {
tupleBuilder.addField(tAccess, t, outColPos);
}
- for (int f = outColPos + 1; f < (hasPositionalVariable ? projectionList.length - 1
+ for (int f = outColPos + 1; f < (positionWriter != null ? projectionList.length - 1
: projectionList.length); f++) {
tupleBuilder.addField(tAccess, t, f);
}
}
- if (hasPositionalVariable) {
- // Write the positional variable as an INT32
- tupleBuilder.getDataOutput().writeByte(3);
- tupleBuilder.getDataOutput().writeInt(offset + positionIndex++);
+ if (positionWriter != null) {
+ // Write the positional variable
+ positionWriter.write(tupleBuilder.getDataOutput(), offset + positionIndex++);
tupleBuilder.addFieldEndOffset();
}
appendToFrameFromTupleBuilder(tupleBuilder);