Merge branch 'master' into jarodwen/features/positionvar
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/UnnestOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/UnnestOperator.java
index 7efee75..b9ac62f 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/UnnestOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/UnnestOperator.java
@@ -23,12 +23,22 @@
import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.VariablePropagationPolicy;
import edu.uci.ics.hyracks.algebricks.core.algebra.typing.ITypingContext;
import edu.uci.ics.hyracks.algebricks.core.algebra.visitors.ILogicalOperatorVisitor;
public class UnnestOperator extends AbstractUnnestOperator {
private LogicalVariable positionalVariable;
+
+ /**
+ * Expression evaluated to obtain the offset that is added to the positional variable's value
+ */
+ private ILogicalExpression positionOffsetExpr;
+
+ /**
+ * Type of the positional variable
+ */
private Object positionalVariableType;
public UnnestOperator(LogicalVariable variable, Mutable<ILogicalExpression> expression) {
@@ -67,6 +77,14 @@
return positionalVariableType;
}
+ public void setPositionOffsetExpr(ILogicalExpression posOffsetExpr) {
+ this.positionOffsetExpr = posOffsetExpr;
+ }
+
+ public ILogicalExpression getPositionOffsetExpr() {
+ return this.positionOffsetExpr;
+ }
+
@Override
public <R, T> R accept(ILogicalOperatorVisitor<R, T> visitor, T arg) throws AlgebricksException {
return visitor.visitUnnestOperator(this, arg);
@@ -88,4 +106,24 @@
}
return env;
}
+
+ @Override
+ public VariablePropagationPolicy getVariablePropagationPolicy() {
+ return new VariablePropagationPolicy() {
+
+ @Override
+ public void propagateVariables(IOperatorSchema target, IOperatorSchema... sources)
+ throws AlgebricksException {
+ if (sources.length > 0) {
+ target.addAllVariables(sources[0]);
+ }
+ for (LogicalVariable v : variables) {
+ target.addVariable(v);
+ }
+ if (positionalVariable != null) {
+ target.addVariable(positionalVariable);
+ }
+ }
+ };
+ }
}
\ No newline at end of file
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/UnnestPOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/UnnestPOperator.java
index bf45e18..bf2f68a 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/UnnestPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/UnnestPOperator.java
@@ -30,6 +30,7 @@
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenContext;
import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
import edu.uci.ics.hyracks.algebricks.runtime.base.IUnnestingEvaluatorFactory;
import edu.uci.ics.hyracks.algebricks.runtime.operators.std.UnnestRuntimeFactory;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
@@ -57,10 +58,7 @@
IOperatorSchema opSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
throws AlgebricksException {
UnnestOperator unnest = (UnnestOperator) op;
- if (unnest.getPositionalVariable() != null) {
- throw new AlgebricksException("Cannot generate runtime for unnest with positional variable "
- + unnest.getPositionalVariable());
- }
+
int outCol = opSchema.findVariable(unnest.getVariable());
ILogicalExpression unnestExpr = unnest.getExpressionRef().getValue();
IExpressionRuntimeProvider expressionRuntimeProvider = context.getExpressionRuntimeProvider();
@@ -79,8 +77,18 @@
UnnestingFunctionCallExpression agg = (UnnestingFunctionCallExpression) unnestExpr;
IUnnestingEvaluatorFactory unnestingFactory = expressionRuntimeProvider.createUnnestingFunctionFactory(agg,
context.getTypeEnvironment(op.getInputs().get(0).getValue()), inputSchemas, context);
+
+ // Create the evaluator factory for the position offset expression, when one is set
+ ILogicalExpression posOffsetExpr = unnest.getPositionOffsetExpr();
+ IScalarEvaluatorFactory posOffsetExprEvalFactory = null;
+ if (posOffsetExpr != null) {
+ posOffsetExprEvalFactory = expressionRuntimeProvider.createEvaluatorFactory(posOffsetExpr,
+ context.getTypeEnvironment(op.getInputs().get(0).getValue()), inputSchemas, context);
+ }
+
int[] projectionList = JobGenHelper.projectAllVariables(opSchema);
- UnnestRuntimeFactory unnestRuntime = new UnnestRuntimeFactory(outCol, unnestingFactory, projectionList);
+ UnnestRuntimeFactory unnestRuntime = new UnnestRuntimeFactory(outCol, unnestingFactory, projectionList,
+ unnest.getPositionalVariable() != null, posOffsetExprEvalFactory);
RecordDescriptor recDesc = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), opSchema, context);
builder.contributeMicroOperator(unnest, unnestRuntime, recDesc);
ILogicalOperator src = unnest.getInputs().get(0).getValue();
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java
index df70437..81f0afa 100644
--- a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java
@@ -14,11 +14,15 @@
*/
package edu.uci.ics.hyracks.algebricks.runtime.operators.std;
+import java.io.IOException;
import java.nio.ByteBuffer;
import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IScalarEvaluator;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
import edu.uci.ics.hyracks.algebricks.runtime.base.IUnnestingEvaluator;
import edu.uci.ics.hyracks.algebricks.runtime.base.IUnnestingEvaluatorFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.evaluators.ConstantEvaluatorFactory;
import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputOneFramePushRuntime;
import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputRuntimeFactory;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
@@ -26,6 +30,7 @@
import edu.uci.ics.hyracks.data.std.api.IPointable;
import edu.uci.ics.hyracks.data.std.primitive.VoidPointable;
import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
public class UnnestRuntimeFactory extends AbstractOneInputOneOutputRuntimeFactory {
@@ -36,12 +41,20 @@
private int outColPos;
private final boolean outColIsProjected;
+ private final boolean hasPositionalVariable;
+ private IScalarEvaluatorFactory posOffsetEvalFactory;
+
// Each time step() is called on the aggregate, a new value is written in
// its output. One byte is written before that value and is neglected.
// By convention, if the aggregate function writes nothing, it means it
// produced the last value.
public UnnestRuntimeFactory(int outCol, IUnnestingEvaluatorFactory unnestingFactory, int[] projectionList) {
+ this(outCol, unnestingFactory, projectionList, false, null);
+ }
+
+ public UnnestRuntimeFactory(int outCol, IUnnestingEvaluatorFactory unnestingFactory, int[] projectionList,
+ boolean hasPositionalVariable, IScalarEvaluatorFactory posOffsetEvalFactory) {
super(projectionList);
this.outCol = outCol;
this.unnestingFactory = unnestingFactory;
@@ -52,6 +65,11 @@
}
}
outColIsProjected = outColPos >= 0;
+ this.hasPositionalVariable = hasPositionalVariable;
+ this.posOffsetEvalFactory = posOffsetEvalFactory;
+ if (this.posOffsetEvalFactory == null) { // no offset expression: use an all-zero constant, read as offset 0
+ this.posOffsetEvalFactory = new ConstantEvaluatorFactory(new byte[5]);
+ }
}
@Override
@@ -68,6 +86,9 @@
private IUnnestingEvaluator agg;
private ArrayTupleBuilder tupleBuilder;
+ private int tupleCount;
+ private IScalarEvaluator offsetEval = posOffsetEvalFactory.createScalarEvaluator(ctx);
+
@Override
public void open() throws HyracksDataException {
initAccessAppendRef(ctx);
@@ -77,6 +98,7 @@
throw new HyracksDataException(ae);
}
tupleBuilder = new ArrayTupleBuilder(projectionList.length);
+ tupleCount = 1;
writer.open();
}
@@ -86,6 +108,16 @@
int nTuple = tAccess.getTupleCount();
for (int t = 0; t < nTuple; t++) {
tRef.reset(tAccess, t);
+
+ try {
+ offsetEval.evaluate(tRef, p);
+ } catch (AlgebricksException e) {
+ throw new HyracksDataException(e);
+ }
+
+ int offset = IntegerSerializerDeserializer.getInt(p.getByteArray(), p.getStartOffset());
+ // Positions are counted per input tuple: restart at 1 before unnesting this tuple's items.
+ tupleCount = 1;
try {
agg.init(tRef);
boolean goon = true;
@@ -94,21 +126,32 @@
if (!agg.step(p)) {
goon = false;
} else {
- if (!outColIsProjected) {
+
+ if (!outColIsProjected && !hasPositionalVariable) {
appendProjectionToFrame(t, projectionList);
} else {
for (int f = 0; f < outColPos; f++) {
tupleBuilder.addField(tAccess, t, f);
}
- tupleBuilder.addField(p.getByteArray(), p.getStartOffset(), p.getLength());
- for (int f = outColPos + 1; f < projectionList.length; f++) {
+ if (outColIsProjected) {
+ tupleBuilder.addField(p.getByteArray(), p.getStartOffset(), p.getLength());
+ } else {
+ tupleBuilder.addField(tAccess, t, outColPos);
+ }
+ for (int f = outColPos + 1; f < (hasPositionalVariable ? projectionList.length - 1
+ : projectionList.length); f++) {
tupleBuilder.addField(tAccess, t, f);
}
}
+ if (hasPositionalVariable) {
+ tupleBuilder.getDataOutput().writeByte(0);
+ tupleBuilder.getDataOutput().writeInt(offset + tupleCount++);
+ tupleBuilder.addFieldEndOffset();
+ }
appendToFrameFromTupleBuilder(tupleBuilder);
}
} while (goon);
- } catch (AlgebricksException ae) {
+ } catch (AlgebricksException | IOException ae) {
throw new HyracksDataException(ae);
}
}