Merge branch 'master' into jarodwen/features/positionvar
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/UnnestOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/UnnestOperator.java
index 7efee75..b9ac62f 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/UnnestOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/UnnestOperator.java
@@ -23,12 +23,22 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
 import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.VariablePropagationPolicy;
 import edu.uci.ics.hyracks.algebricks.core.algebra.typing.ITypingContext;
 import edu.uci.ics.hyracks.algebricks.core.algebra.visitors.ILogicalOperatorVisitor;
 
 public class UnnestOperator extends AbstractUnnestOperator {
 
     private LogicalVariable positionalVariable;
+
+    /**
+     * Used to set the position offset for positional variable
+     */
+    private ILogicalExpression positionOffsetExpr;
+
+    /**
+     * Specify the type of the positional variable
+     */
     private Object positionalVariableType;
 
     public UnnestOperator(LogicalVariable variable, Mutable<ILogicalExpression> expression) {
@@ -67,6 +77,14 @@
         return positionalVariableType;
     }
 
+    public void setPositionOffsetExpr(ILogicalExpression posOffsetExpr) {
+        this.positionOffsetExpr = posOffsetExpr;
+    }
+
+    public ILogicalExpression getPositionOffsetExpr() {
+        return this.positionOffsetExpr;
+    }
+
     @Override
     public <R, T> R accept(ILogicalOperatorVisitor<R, T> visitor, T arg) throws AlgebricksException {
         return visitor.visitUnnestOperator(this, arg);
@@ -88,4 +106,24 @@
         }
         return env;
     }
+
+    @Override
+    public VariablePropagationPolicy getVariablePropagationPolicy() {
+        return new VariablePropagationPolicy() {
+
+            @Override
+            public void propagateVariables(IOperatorSchema target, IOperatorSchema... sources)
+                    throws AlgebricksException {
+                if (sources.length > 0) {
+                    target.addAllVariables(sources[0]);
+                }
+                for (LogicalVariable v : variables) {
+                    target.addVariable(v);
+                }
+                if (positionalVariable != null) {
+                    target.addVariable(positionalVariable);
+                }
+            }
+        };
+    }
 }
\ No newline at end of file
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/UnnestPOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/UnnestPOperator.java
index bf45e18..bf2f68a 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/UnnestPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/UnnestPOperator.java
@@ -30,6 +30,7 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
 import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenContext;
 import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
 import edu.uci.ics.hyracks.algebricks.runtime.base.IUnnestingEvaluatorFactory;
 import edu.uci.ics.hyracks.algebricks.runtime.operators.std.UnnestRuntimeFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
@@ -57,10 +58,7 @@
             IOperatorSchema opSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
             throws AlgebricksException {
         UnnestOperator unnest = (UnnestOperator) op;
-        if (unnest.getPositionalVariable() != null) {
-            throw new AlgebricksException("Cannot generate runtime for unnest with positional variable "
-                    + unnest.getPositionalVariable());
-        }
+
         int outCol = opSchema.findVariable(unnest.getVariable());
         ILogicalExpression unnestExpr = unnest.getExpressionRef().getValue();
         IExpressionRuntimeProvider expressionRuntimeProvider = context.getExpressionRuntimeProvider();
@@ -79,8 +77,18 @@
         UnnestingFunctionCallExpression agg = (UnnestingFunctionCallExpression) unnestExpr;
         IUnnestingEvaluatorFactory unnestingFactory = expressionRuntimeProvider.createUnnestingFunctionFactory(agg,
                 context.getTypeEnvironment(op.getInputs().get(0).getValue()), inputSchemas, context);
+
+        // for position offset
+        ILogicalExpression posOffsetExpr = unnest.getPositionOffsetExpr();
+        IScalarEvaluatorFactory posOffsetExprEvalFactory = null;
+        if (posOffsetExpr != null) {
+            posOffsetExprEvalFactory = expressionRuntimeProvider.createEvaluatorFactory(posOffsetExpr,
+                    context.getTypeEnvironment(op.getInputs().get(0).getValue()), inputSchemas, context);
+        }
+
         int[] projectionList = JobGenHelper.projectAllVariables(opSchema);
-        UnnestRuntimeFactory unnestRuntime = new UnnestRuntimeFactory(outCol, unnestingFactory, projectionList);
+        UnnestRuntimeFactory unnestRuntime = new UnnestRuntimeFactory(outCol, unnestingFactory, projectionList,
+                unnest.getPositionalVariable() != null, posOffsetExprEvalFactory);
         RecordDescriptor recDesc = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), opSchema, context);
         builder.contributeMicroOperator(unnest, unnestRuntime, recDesc);
         ILogicalOperator src = unnest.getInputs().get(0).getValue();
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java
index df70437..81f0afa 100644
--- a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java
@@ -14,11 +14,15 @@
  */
 package edu.uci.ics.hyracks.algebricks.runtime.operators.std;
 
+import java.io.IOException;
 import java.nio.ByteBuffer;
 
 import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IScalarEvaluator;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
 import edu.uci.ics.hyracks.algebricks.runtime.base.IUnnestingEvaluator;
 import edu.uci.ics.hyracks.algebricks.runtime.base.IUnnestingEvaluatorFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.evaluators.ConstantEvaluatorFactory;
 import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputOneFramePushRuntime;
 import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputRuntimeFactory;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
@@ -26,6 +30,7 @@
 import edu.uci.ics.hyracks.data.std.api.IPointable;
 import edu.uci.ics.hyracks.data.std.primitive.VoidPointable;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
 
 public class UnnestRuntimeFactory extends AbstractOneInputOneOutputRuntimeFactory {
 
@@ -36,12 +41,20 @@
     private int outColPos;
     private final boolean outColIsProjected;
 
+    private final boolean hasPositionalVariable;
+    private IScalarEvaluatorFactory posOffsetEvalFactory;
+
     // Each time step() is called on the aggregate, a new value is written in
     // its output. One byte is written before that value and is neglected.
     // By convention, if the aggregate function writes nothing, it means it
     // produced the last value.
 
     public UnnestRuntimeFactory(int outCol, IUnnestingEvaluatorFactory unnestingFactory, int[] projectionList) {
+        this(outCol, unnestingFactory, projectionList, false, null);
+    }
+
+    public UnnestRuntimeFactory(int outCol, IUnnestingEvaluatorFactory unnestingFactory, int[] projectionList,
+            boolean hashPositionalVariable, IScalarEvaluatorFactory posOffsetEvalFactory) {
         super(projectionList);
         this.outCol = outCol;
         this.unnestingFactory = unnestingFactory;
@@ -52,6 +65,11 @@
             }
         }
         outColIsProjected = outColPos >= 0;
+        this.hasPositionalVariable = hashPositionalVariable;
+        this.posOffsetEvalFactory = posOffsetEvalFactory;
+        if (this.posOffsetEvalFactory == null) {
+            this.posOffsetEvalFactory = new ConstantEvaluatorFactory(new byte[5]);
+        }
     }
 
     @Override
@@ -68,6 +86,9 @@
             private IUnnestingEvaluator agg;
             private ArrayTupleBuilder tupleBuilder;
 
+            private int tupleCount;
+            private IScalarEvaluator offsetEval = posOffsetEvalFactory.createScalarEvaluator(ctx);
+
             @Override
             public void open() throws HyracksDataException {
                 initAccessAppendRef(ctx);
@@ -77,6 +98,7 @@
                     throw new HyracksDataException(ae);
                 }
                 tupleBuilder = new ArrayTupleBuilder(projectionList.length);
+                tupleCount = 1;
                 writer.open();
             }
 
@@ -86,6 +108,16 @@
                 int nTuple = tAccess.getTupleCount();
                 for (int t = 0; t < nTuple; t++) {
                     tRef.reset(tAccess, t);
+
+                    try {
+                        offsetEval.evaluate(tRef, p);
+                    } catch (AlgebricksException e) {
+                        throw new HyracksDataException(e);
+                    }
+
+                    @SuppressWarnings("static-access")
+                    int offset = IntegerSerializerDeserializer.INSTANCE.getInt(p.getByteArray(), p.getStartOffset());
+
                     try {
                         agg.init(tRef);
                         boolean goon = true;
@@ -94,21 +126,32 @@
                             if (!agg.step(p)) {
                                 goon = false;
                             } else {
-                                if (!outColIsProjected) {
+
+                                if (!outColIsProjected && !hasPositionalVariable) {
                                     appendProjectionToFrame(t, projectionList);
                                 } else {
                                     for (int f = 0; f < outColPos; f++) {
                                         tupleBuilder.addField(tAccess, t, f);
                                     }
-                                    tupleBuilder.addField(p.getByteArray(), p.getStartOffset(), p.getLength());
-                                    for (int f = outColPos + 1; f < projectionList.length; f++) {
+                                    if (outColIsProjected) {
+                                        tupleBuilder.addField(p.getByteArray(), p.getStartOffset(), p.getLength());
+                                    } else {
+                                        tupleBuilder.addField(tAccess, t, outColPos);
+                                    }
+                                    for (int f = outColPos + 1; f < (hasPositionalVariable ? projectionList.length - 1
+                                            : projectionList.length); f++) {
                                         tupleBuilder.addField(tAccess, t, f);
                                     }
                                 }
+                                if (hasPositionalVariable) {
+                                    tupleBuilder.getDataOutput().writeByte(0);
+                                    tupleBuilder.getDataOutput().writeInt(offset + tupleCount++);
+                                    tupleBuilder.addFieldEndOffset();
+                                }
                                 appendToFrameFromTupleBuilder(tupleBuilder);
                             }
                         } while (goon);
-                    } catch (AlgebricksException ae) {
+                    } catch (AlgebricksException | IOException ae) {
                         throw new HyracksDataException(ae);
                     }
                 }