Merge branch 'master' into jarodwen/features/positionvar_local
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/UnnestOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/UnnestOperator.java
index 7efee75..b9ac62f 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/UnnestOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/UnnestOperator.java
@@ -23,12 +23,22 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
 import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.VariablePropagationPolicy;
 import edu.uci.ics.hyracks.algebricks.core.algebra.typing.ITypingContext;
 import edu.uci.ics.hyracks.algebricks.core.algebra.visitors.ILogicalOperatorVisitor;
 
 public class UnnestOperator extends AbstractUnnestOperator {
 
     private LogicalVariable positionalVariable;
+
+    /**
+     * Used to set the position offset for positional variable
+     */
+    private ILogicalExpression positionOffsetExpr;
+
+    /**
+     * Specify the type of the positional variable
+     */
     private Object positionalVariableType;
 
     public UnnestOperator(LogicalVariable variable, Mutable<ILogicalExpression> expression) {
@@ -67,6 +77,14 @@
         return positionalVariableType;
     }
 
+    public void setPositionOffsetExpr(ILogicalExpression posOffsetExpr) {
+        this.positionOffsetExpr = posOffsetExpr;
+    }
+
+    public ILogicalExpression getPositionOffsetExpr() {
+        return this.positionOffsetExpr;
+    }
+
     @Override
     public <R, T> R accept(ILogicalOperatorVisitor<R, T> visitor, T arg) throws AlgebricksException {
         return visitor.visitUnnestOperator(this, arg);
@@ -88,4 +106,24 @@
         }
         return env;
     }
+
+    @Override
+    public VariablePropagationPolicy getVariablePropagationPolicy() {
+        return new VariablePropagationPolicy() {
+
+            @Override
+            public void propagateVariables(IOperatorSchema target, IOperatorSchema... sources)
+                    throws AlgebricksException {
+                if (sources.length > 0) {
+                    target.addAllVariables(sources[0]);
+                }
+                for (LogicalVariable v : variables) {
+                    target.addVariable(v);
+                }
+                if (positionalVariable != null) {
+                    target.addVariable(positionalVariable);
+                }
+            }
+        };
+    }
 }
\ No newline at end of file
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/UnnestPOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/UnnestPOperator.java
index bf45e18..bf2f68a 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/UnnestPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/UnnestPOperator.java
@@ -30,6 +30,7 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
 import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenContext;
 import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
 import edu.uci.ics.hyracks.algebricks.runtime.base.IUnnestingEvaluatorFactory;
 import edu.uci.ics.hyracks.algebricks.runtime.operators.std.UnnestRuntimeFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
@@ -57,10 +58,7 @@
             IOperatorSchema opSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
             throws AlgebricksException {
         UnnestOperator unnest = (UnnestOperator) op;
-        if (unnest.getPositionalVariable() != null) {
-            throw new AlgebricksException("Cannot generate runtime for unnest with positional variable "
-                    + unnest.getPositionalVariable());
-        }
+
         int outCol = opSchema.findVariable(unnest.getVariable());
         ILogicalExpression unnestExpr = unnest.getExpressionRef().getValue();
         IExpressionRuntimeProvider expressionRuntimeProvider = context.getExpressionRuntimeProvider();
@@ -79,8 +77,18 @@
         UnnestingFunctionCallExpression agg = (UnnestingFunctionCallExpression) unnestExpr;
         IUnnestingEvaluatorFactory unnestingFactory = expressionRuntimeProvider.createUnnestingFunctionFactory(agg,
                 context.getTypeEnvironment(op.getInputs().get(0).getValue()), inputSchemas, context);
+
+        // for position offset
+        ILogicalExpression posOffsetExpr = unnest.getPositionOffsetExpr();
+        IScalarEvaluatorFactory posOffsetExprEvalFactory = null;
+        if (posOffsetExpr != null) {
+            posOffsetExprEvalFactory = expressionRuntimeProvider.createEvaluatorFactory(posOffsetExpr,
+                    context.getTypeEnvironment(op.getInputs().get(0).getValue()), inputSchemas, context);
+        }
+
         int[] projectionList = JobGenHelper.projectAllVariables(opSchema);
-        UnnestRuntimeFactory unnestRuntime = new UnnestRuntimeFactory(outCol, unnestingFactory, projectionList);
+        UnnestRuntimeFactory unnestRuntime = new UnnestRuntimeFactory(outCol, unnestingFactory, projectionList,
+                unnest.getPositionalVariable() != null, posOffsetExprEvalFactory);
         RecordDescriptor recDesc = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), opSchema, context);
         builder.contributeMicroOperator(unnest, unnestRuntime, recDesc);
         ILogicalOperator src = unnest.getInputs().get(0).getValue();
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java
index 6c1dd5e..b079c3e 100644
--- a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java
@@ -51,8 +51,8 @@
             FrameUtils.flushFrame(frame, writer);
             appender.reset(frame, true);
             if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
-                throw new IllegalStateException(
-                        "Could not write frame (AbstractOneInputOneOutputOneFramePushRuntime.appendToFrameFromTupleBuilder).");
+                throw new HyracksDataException(
+                        "Could not write frame: the size of the tuple is too long to be fit into a single frame. (AbstractOneInputOneOutputOneFramePushRuntime.appendToFrameFromTupleBuilder)");
             }
         }
         if (flushFrame) {
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java
index 0499684..e5bede2 100644
--- a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java
@@ -107,7 +107,8 @@
                             FrameUtils.flushFrame(frame, writer);
                             appender.reset(frame, true);
                             if (!appender.appendConcat(tRef.getFrameTupleAccessor(), tRef.getTupleIndex(), ta, t)) {
-                                throw new IllegalStateException("Could not write frame.");
+                                throw new HyracksDataException(
+                                        "Could not write frame: subplan result is larger than the single-frame limit.");
                             }
                         }
                     }
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java
index df70437..88dc19f 100644
--- a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java
@@ -14,11 +14,15 @@
  */
 package edu.uci.ics.hyracks.algebricks.runtime.operators.std;
 
+import java.io.IOException;
 import java.nio.ByteBuffer;
 
 import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IScalarEvaluator;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
 import edu.uci.ics.hyracks.algebricks.runtime.base.IUnnestingEvaluator;
 import edu.uci.ics.hyracks.algebricks.runtime.base.IUnnestingEvaluatorFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.evaluators.ConstantEvaluatorFactory;
 import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputOneFramePushRuntime;
 import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputRuntimeFactory;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
@@ -26,6 +30,7 @@
 import edu.uci.ics.hyracks.data.std.api.IPointable;
 import edu.uci.ics.hyracks.data.std.primitive.VoidPointable;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
 
 public class UnnestRuntimeFactory extends AbstractOneInputOneOutputRuntimeFactory {
 
@@ -36,12 +41,20 @@
     private int outColPos;
     private final boolean outColIsProjected;
 
+    private final boolean hasPositionalVariable;
+    private IScalarEvaluatorFactory posOffsetEvalFactory;
+
     // Each time step() is called on the aggregate, a new value is written in
     // its output. One byte is written before that value and is neglected.
     // By convention, if the aggregate function writes nothing, it means it
     // produced the last value.
 
     public UnnestRuntimeFactory(int outCol, IUnnestingEvaluatorFactory unnestingFactory, int[] projectionList) {
+        this(outCol, unnestingFactory, projectionList, false, null);
+    }
+
+    public UnnestRuntimeFactory(int outCol, IUnnestingEvaluatorFactory unnestingFactory, int[] projectionList,
+            boolean hashPositionalVariable, IScalarEvaluatorFactory posOffsetEvalFactory) {
         super(projectionList);
         this.outCol = outCol;
         this.unnestingFactory = unnestingFactory;
@@ -52,6 +65,11 @@
             }
         }
         outColIsProjected = outColPos >= 0;
+        this.hasPositionalVariable = hashPositionalVariable;
+        this.posOffsetEvalFactory = posOffsetEvalFactory;
+        if (this.posOffsetEvalFactory == null) {
+            this.posOffsetEvalFactory = new ConstantEvaluatorFactory(new byte[5]);
+        }
     }
 
     @Override
@@ -68,6 +86,9 @@
             private IUnnestingEvaluator agg;
             private ArrayTupleBuilder tupleBuilder;
 
+            private int tupleCount;
+            private IScalarEvaluator offsetEval = posOffsetEvalFactory.createScalarEvaluator(ctx);
+
             @Override
             public void open() throws HyracksDataException {
                 initAccessAppendRef(ctx);
@@ -77,6 +98,7 @@
                     throw new HyracksDataException(ae);
                 }
                 tupleBuilder = new ArrayTupleBuilder(projectionList.length);
+                tupleCount = 1;
                 writer.open();
             }
 
@@ -86,6 +108,16 @@
                 int nTuple = tAccess.getTupleCount();
                 for (int t = 0; t < nTuple; t++) {
                     tRef.reset(tAccess, t);
+
+                    try {
+                        offsetEval.evaluate(tRef, p);
+                    } catch (AlgebricksException e) {
+                        throw new HyracksDataException(e);
+                    }
+
+                    @SuppressWarnings("static-access")
+                    int offset = IntegerSerializerDeserializer.INSTANCE.getInt(p.getByteArray(), p.getStartOffset());
+
                     try {
                         agg.init(tRef);
                         boolean goon = true;
@@ -94,21 +126,33 @@
                             if (!agg.step(p)) {
                                 goon = false;
                             } else {
-                                if (!outColIsProjected) {
+
+                                if (!outColIsProjected && !hasPositionalVariable) {
                                     appendProjectionToFrame(t, projectionList);
                                 } else {
                                     for (int f = 0; f < outColPos; f++) {
                                         tupleBuilder.addField(tAccess, t, f);
                                     }
-                                    tupleBuilder.addField(p.getByteArray(), p.getStartOffset(), p.getLength());
-                                    for (int f = outColPos + 1; f < projectionList.length; f++) {
+                                    if (outColIsProjected) {
+                                        tupleBuilder.addField(p.getByteArray(), p.getStartOffset(), p.getLength());
+                                    } else {
+                                        tupleBuilder.addField(tAccess, t, outColPos);
+                                    }
+                                    for (int f = outColPos + 1; f < (hasPositionalVariable ? projectionList.length - 1
+                                            : projectionList.length); f++) {
                                         tupleBuilder.addField(tAccess, t, f);
                                     }
                                 }
+                                if (hasPositionalVariable) {
+                                    // Write the positional variable as an INT32
+                                    tupleBuilder.getDataOutput().writeByte(3);
+                                    tupleBuilder.getDataOutput().writeInt(offset + tupleCount++);
+                                    tupleBuilder.addFieldEndOffset();
+                                }
                                 appendToFrameFromTupleBuilder(tupleBuilder);
                             }
                         } while (goon);
-                    } catch (AlgebricksException ae) {
+                    } catch (AlgebricksException | IOException ae) {
                         throw new HyracksDataException(ae);
                     }
                 }
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/IHyracksCommonContext.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/IHyracksCommonContext.java
index c75f9f9..1af32de 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/IHyracksCommonContext.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/IHyracksCommonContext.java
@@ -25,4 +25,6 @@
     public IIOManager getIOManager();
 
     public ByteBuffer allocateFrame() throws HyracksDataException;
+    
+    public void deallocateFrames(int frameCount);
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/DatasetClientContext.java b/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/DatasetClientContext.java
index aa7008f..3ea81ef 100644
--- a/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/DatasetClientContext.java
+++ b/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/DatasetClientContext.java
@@ -41,4 +41,10 @@
         return ByteBuffer.allocate(frameSize);
     }
 
+    @Override
+    public void deallocateFrames(int frameCount) {
+        // TODO Auto-generated method stub
+        
+    }
+
 }
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java
index 53e5a01..bbbe0f5 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java
@@ -114,6 +114,11 @@
     }
 
     @Override
+    public void deallocateFrames(int frameCount) {
+        joblet.deallocateFrames(frameCount);
+    }
+
+    @Override
     public int getFrameSize() {
         return joblet.getFrameSize();
     }
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java
index 7773765..66efc03 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java
@@ -44,6 +44,8 @@
 
     private boolean first;
 
+    private boolean isFailed = false;
+
     public PreclusteredGroupWriter(IHyracksTaskContext ctx, int[] groupFields, IBinaryComparator[] comparators,
             IAggregatorDescriptor aggregator, RecordDescriptor inRecordDesc, RecordDescriptor outRecordDesc,
             IFrameWriter writer) throws HyracksDataException {
@@ -149,12 +151,13 @@
 
     @Override
     public void fail() throws HyracksDataException {
+        isFailed = true;
         writer.fail();
     }
 
     @Override
     public void close() throws HyracksDataException {
-        if (!first) {
+        if (!isFailed && !first) {
             writeOutput(copyFrameAccessor, copyFrameAccessor.getTupleCount() - 1);
             if (appender.getTupleCount() > 0) {
                 FrameUtils.flushFrame(outFrame, writer);
diff --git a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndex.java b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndex.java
index 0ab2f4d..b03489c 100644
--- a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndex.java
+++ b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndex.java
@@ -561,6 +561,12 @@
         public ByteBuffer allocateFrame() {
             return ByteBuffer.allocate(FRAME_SIZE);
         }
+        
+        @Override
+        public void deallocateFrames(int frameCount) {
+            // TODO Auto-generated method stub
+            
+        }
     }
 
     @Override
diff --git a/hyracks/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestTaskContext.java b/hyracks/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestTaskContext.java
index 10b5582..f18099b 100644
--- a/hyracks/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestTaskContext.java
+++ b/hyracks/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestTaskContext.java
@@ -46,6 +46,10 @@
     public ByteBuffer allocateFrame() {
         return jobletContext.allocateFrame();
     }
+    
+    @Override
+    public void deallocateFrames(int frameCount) {
+    }
 
     @Override
     public int getFrameSize() {