Merge branch 'master' into ceej/tiered-tests
diff --git a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/PushUnnestDownThroughProductRule.java b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/PushUnnestDownThroughProductRule.java
new file mode 100644
index 0000000..c70fca9
--- /dev/null
+++ b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/PushUnnestDownThroughProductRule.java
@@ -0,0 +1,87 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.rewriter.rules;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.lang3.mutable.Mutable;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.ConstantExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractBinaryJoinOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
+import edu.uci.ics.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
+
+/**
+ * Moves an UNNEST below an inner join whose condition is the constant TRUE (a plain product), into the
+ * first join branch (left, then right) whose live variables cover every variable the UNNEST uses.
+ */
+public class PushUnnestDownThroughProductRule implements IAlgebraicRewriteRule {
+
+    /** Nothing is done on the way down the plan; all work happens in {@link #rewritePost}. */
+    @Override
+    public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context) throws AlgebricksException {
+        return false;
+    }
+
+    /**
+     * Fires when {@code opRef} is an UNNEST whose first input is an inner join with the TRUE condition.
+     * @return true if the UNNEST was pushed into one of the join branches, false if the plan is unchanged
+     */
+    @Override
+    public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
+            throws AlgebricksException {
+        AbstractLogicalOperator unnest = (AbstractLogicalOperator) opRef.getValue();
+        if (unnest.getOperatorTag() != LogicalOperatorTag.UNNEST) {
+            return false;
+        }
+        Mutable<ILogicalOperator> unnestInputRef = unnest.getInputs().get(0);
+        AbstractLogicalOperator below = (AbstractLogicalOperator) unnestInputRef.getValue();
+        if (below.getOperatorTag() != LogicalOperatorTag.INNERJOIN) {
+            return false;
+        }
+        // Only a join whose condition is the TRUE constant object itself is treated as a product.
+        AbstractBinaryJoinOperator join = (AbstractBinaryJoinOperator) below;
+        if (join.getCondition().getValue() != ConstantExpression.TRUE) {
+            return false;
+        }
+
+        List<LogicalVariable> needed = new ArrayList<LogicalVariable>();
+        VariableUtilities.getUsedVariables(unnest, needed);
+
+        // Input 0 is tried before input 1; the first branch that supplies all needed variables wins.
+        for (int side = 0; side < 2; side++) {
+            Mutable<ILogicalOperator> branchRef = below.getInputs().get(side);
+            ILogicalOperator branch = branchRef.getValue();
+            List<LogicalVariable> branchLive = new ArrayList<LogicalVariable>();
+            VariableUtilities.getLiveVariables(branch, branchLive);
+            if (branchLive.containsAll(needed)) {
+                // Re-link: unnest reads from the branch, the join reads from the unnest, the join becomes the root.
+                unnestInputRef.setValue(branch);
+                branchRef.setValue(unnest);
+                opRef.setValue(below);
+                return true;
+            }
+        }
+        return false;
+    }
+
+}
diff --git a/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/io/FrameTupleAppender.java b/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/io/FrameTupleAppender.java
index 501dadf..98b8e79 100644
--- a/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/io/FrameTupleAppender.java
+++ b/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/io/FrameTupleAppender.java
@@ -28,10 +28,19 @@
 
     private int tupleDataEndOffset;
 
+    private int numberOfFields = -1; // fields per tuple for appendField; a negative value means not configured
+    private int currentField = 0; // index of the next field to be appended to the tuple being built
+    private int lastFieldEndOffset = 0; // end offset of the last appended field, relative to the tuple's field data
+
     public FrameTupleAppender(int frameSize) {
         this.frameSize = frameSize;
     }
 
+    public FrameTupleAppender(int frameSize, int numberOfFields) {
+        this(frameSize); // the single-argument constructor stores frameSize
+        this.numberOfFields = numberOfFields; // a non-negative value enables the appendField methods
+    }
+
     public void reset(ByteBuffer buffer, boolean clear) {
         this.buffer = buffer;
         if (clear) {
@@ -91,6 +100,44 @@
         return false;
     }
 
+    /** Appends the next field of the tuple being built; returns false (abandoning that tuple) if it does not fit. */
+    public boolean appendField(byte[] bytes, int offset, int length) {
+        if (numberOfFields < 0) {
+            throw new IllegalStateException("uninitialized number of fields " + numberOfFields);
+        }
+        int fieldDataStart = tupleDataEndOffset + numberOfFields * 4 + lastFieldEndOffset;
+        if (fieldDataStart + length + 4 + (tupleCount + 1) * 4 > frameSize) {
+            // Out of space: forget the partial tuple so the next call starts over at field 0.
+            currentField = 0;
+            lastFieldEndOffset = 0;
+            return false;
+        }
+        System.arraycopy(bytes, offset, buffer.array(), fieldDataStart, length);
+        lastFieldEndOffset += length;
+        buffer.putInt(tupleDataEndOffset + currentField * 4, lastFieldEndOffset);
+        if (++currentField == numberOfFields) {
+            // Last field: record the tuple's end offset in its slot and bump the frame's tuple count.
+            tupleDataEndOffset += numberOfFields * 4 + lastFieldEndOffset;
+            buffer.putInt(FrameHelper.getTupleCountOffset(frameSize) - 4 * (tupleCount + 1), tupleDataEndOffset);
+            ++tupleCount;
+            buffer.putInt(FrameHelper.getTupleCountOffset(frameSize), tupleCount);
+            // reset for the next tuple
+            currentField = 0;
+            lastFieldEndOffset = 0;
+        }
+        return true;
+    }
+
+    /** Appends field fIndex of tuple tIndex read from fta by delegating to appendField(byte[], int, int). */
+    public boolean appendField(IFrameTupleAccessor fta, int tIndex, int fIndex) {
+        if (numberOfFields < 0) {
+            throw new IllegalStateException("uninitialized number of fields " + numberOfFields);
+        }
+        int fStart = fta.getFieldStartOffset(tIndex, fIndex);
+        int dataStart = fta.getTupleStartOffset(tIndex) + fta.getFieldSlotsLength() + fStart;
+        return appendField(fta.getBuffer().array(), dataStart, fta.getFieldEndOffset(tIndex, fIndex) - fStart);
+    }
+
     public boolean append(IFrameTupleAccessor tupleAccessor, int tStartOffset, int tEndOffset) {
         int length = tEndOffset - tStartOffset;
         if (tupleDataEndOffset + length + 4 + (tupleCount + 1) * 4 <= frameSize) {