reduce the memory copies in the index join runtime

Pass the left frame accessor and the right index tuple to the update function
directly, instead of first re-materializing the joined tuple with an
ArrayTupleBuilder; build the null tuple for the right-outer-join case once and
reuse it. Also add a job option (pregelix.incStateLength) to declare that the
vertex state length may grow dynamically.

git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_staging@2674 123451ca-8445-de46-9d55-352943316053
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java
index 6ef7e13..d62c525 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java
@@ -60,6 +60,8 @@
     public static final String NUM_VERTICE = "pregelix.numVertices";
     /** num of edges */
     public static final String NUM_EDGES = "pregelix.numEdges";
+    /** whether the vertex state length can be dynamically increased */
+    public static final String INCREASE_STATE_LENGTH = "pregelix.incStateLength";
     /** job id */
     public static final String JOB_ID = "pregelix.jobid";
 
@@ -130,8 +132,8 @@
     /**
      * Set the global aggregator class (optional)
      * 
-     * @param vertexCombinerClass
-     *            Determines how vertex messages are combined
+     * @param globalAggregatorClass
+     *            Determines how messages are globally aggregated
      */
     final public void setGlobalAggregatorClass(Class<?> globalAggregatorClass) {
         getConfiguration().setClass(GLOBAL_AGGREGATOR_CLASS, globalAggregatorClass, GlobalAggregator.class);
@@ -139,11 +141,17 @@
 
     /**
      * Set the job Id
-     * 
-     * @param vertexCombinerClass
-     *            Determines how vertex messages are combined
      */
     final public void setJobId(String jobId) {
         getConfiguration().set(JOB_ID, jobId);
     }
+
+    /**
+     * Set whether the vertex state length can be dynamically increased
+     * 
+     * @param incStateLengthDynamically
+     *            true if the vertex state length may be increased at runtime
+     */
+    final public void setIncStateLengthDynamically(boolean incStateLengthDynamically) {
+        getConfiguration().setBoolean(INCREASE_STATE_LENGTH, incStateLengthDynamically);
+    }
 }
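
For context, a minimal sketch of how a job driver could enable the new option. Only setJobId and setIncStateLengthDynamically come from this change; the constructor argument and job name are assumed for illustration.

    PregelixJob job = new PregelixJob("connected-components");  // hypothetical job name
    job.setJobId("cc-job-1");
    // declare that the serialized vertex state (value + edges) may grow between
    // supersteps, so the runtime cannot assume a fixed record length
    job.setIncStateLengthDynamically(true);
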
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java
index 7c4853f..74bccb9 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java
@@ -410,4 +410,15 @@
             throw new IllegalArgumentException("createMessageValue: Illegally accessed", e);
         }
     }
+
+    /**
+     * Get the job configuration parameter that indicates whether the vertex state length may increase dynamically
+     * 
+     * @param conf
+     *            the job configuration
+     * @return the boolean value of the parameter; false by default
+     */
+    public static boolean getIncStateLengthDynamically(Configuration conf) {
+        return conf.getBoolean(PregelixJob.INCREASE_STATE_LENGTH, false);
+    }
 }
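
A hedged sketch of how a runtime component might read the flag; the two strategy methods are hypothetical placeholders, only BspUtils.getIncStateLengthDynamically(conf) is introduced here.

    if (BspUtils.getIncStateLengthDynamically(conf)) {
        // hypothetical: use an update path that can relocate a vertex whose
        // serialized state no longer fits its original B-tree slot
        chooseGrowableUpdatePath();
    } else {
        // hypothetical: update the vertex state in place at a fixed length
        chooseInPlaceUpdatePath();
    }
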
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinFunctionUpdateOperatorNodePushable.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinFunctionUpdateOperatorNodePushable.java
index 75a8087..7237537 100644
--- a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinFunctionUpdateOperatorNodePushable.java
+++ b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinFunctionUpdateOperatorNodePushable.java
@@ -14,7 +14,6 @@
  */
 package edu.uci.ics.pregelix.dataflow.std;
 
-import java.io.DataOutput;
 import java.nio.ByteBuffer;
 
 import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
@@ -24,7 +23,6 @@
 import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
 import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
@@ -51,9 +49,6 @@
 
     private ByteBuffer writeBuffer;
     private FrameTupleAppender appender;
-    private ArrayTupleBuilder tb;
-    private DataOutput dos;
-
     private BTree btree;
     private PermutingFrameTupleReference lowKey;
     private PermutingFrameTupleReference highKey;
@@ -67,8 +62,6 @@
     protected ITreeIndexAccessor indexAccessor;
 
     private RecordDescriptor recDesc;
-    private final RecordDescriptor inputRecDesc;
-
     private final IFrameWriter[] writers;
     private final FunctionProxy functionProxy;
 
@@ -77,7 +70,6 @@
             int[] lowKeyFields, int[] highKeyFields, boolean lowKeyInclusive, boolean highKeyInclusive,
             IUpdateFunctionFactory functionFactory, IRuntimeHookFactory preHookFactory,
             IRuntimeHookFactory postHookFactory, IRecordDescriptorFactory inputRdFactory, int outputArity) {
-        inputRecDesc = recordDescProvider.getInputRecordDescriptor(opDesc.getActivityId(), 0);
         treeIndexOpHelper = (TreeIndexDataflowHelper) opDesc.getIndexDataflowHelperFactory().createIndexDataflowHelper(
                 opDesc, ctx, partition);
         this.lowKeyInclusive = lowKeyInclusive;
@@ -144,8 +136,6 @@
             rangePred = new RangePredicate(null, null, lowKeyInclusive, highKeyInclusive, lowKeySearchCmp,
                     highKeySearchCmp);
             writeBuffer = treeIndexOpHelper.getHyracksTaskContext().allocateFrame();
-            tb = new ArrayTupleBuilder(inputRecDesc.getFields().length + btree.getFieldCount());
-            dos = tb.getDataOutput();
             appender = new FrameTupleAppender(treeIndexOpHelper.getHyracksTaskContext().getFrameSize());
             appender.reset(writeBuffer, true);
 
@@ -158,27 +148,13 @@
 
     private void writeSearchResults(IFrameTupleAccessor leftAccessor, int tIndex) throws Exception {
         while (cursor.hasNext()) {
-            tb.reset();
             cursor.next();
-
             ITupleReference tupleRef = cursor.getTuple();
-            for (int i = 0; i < inputRecDesc.getFields().length; i++) {
-                int tupleStart = leftAccessor.getTupleStartOffset(tIndex);
-                int fieldStart = leftAccessor.getFieldStartOffset(tIndex, i);
-                int offset = leftAccessor.getFieldSlotsLength() + tupleStart + fieldStart;
-                int len = leftAccessor.getFieldEndOffset(tIndex, i) - fieldStart;
-                dos.write(leftAccessor.getBuffer().array(), offset, len);
-                tb.addFieldEndOffset();
-            }
-            for (int i = 0; i < tupleRef.getFieldCount(); i++) {
-                dos.write(tupleRef.getFieldData(i), tupleRef.getFieldStart(i), tupleRef.getFieldLength(i));
-                tb.addFieldEndOffset();
-            }
 
             /**
              * call the update function
              */
-            functionProxy.functionCall(tb, tupleRef);
+            functionProxy.functionCall(leftAccessor, tIndex, tupleRef);
         }
     }
 
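
The removed loop is exactly the copy being eliminated: every left-side field was re-written into an ArrayTupleBuilder before the function call. Below is a sketch of the offset arithmetic that loop used, which TupleDeserializer now applies directly to the frame buffer; the helper methods are illustrative, but the accessor calls are the same ones visible in the removed code.

    // illustrative helpers: locate field i of tuple tIndex inside the frame, without copying
    static int fieldOffset(IFrameTupleAccessor accessor, int tIndex, int i) {
        int tupleStart = accessor.getTupleStartOffset(tIndex);
        int fieldStart = accessor.getFieldStartOffset(tIndex, i);
        return accessor.getFieldSlotsLength() + tupleStart + fieldStart;
    }

    static int fieldLength(IFrameTupleAccessor accessor, int tIndex, int i) {
        return accessor.getFieldEndOffset(tIndex, i) - accessor.getFieldStartOffset(tIndex, i);
    }

With those offsets, the deserializer can point its byte-array input stream at the frame itself, so the only copy left is the deserialization into Java objects.
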
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopRightOuterJoinFunctionUpdateOperatorNodePushable.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopRightOuterJoinFunctionUpdateOperatorNodePushable.java
index c31ebd4..af53abe 100644
--- a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopRightOuterJoinFunctionUpdateOperatorNodePushable.java
+++ b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopRightOuterJoinFunctionUpdateOperatorNodePushable.java
@@ -53,7 +53,7 @@
 
     private ByteBuffer writeBuffer;
     private FrameTupleAppender appender;
-    private ArrayTupleBuilder tb;
+    private ArrayTupleBuilder nullTupleBuilder;
     private DataOutput dos;
 
     private BTree btree;
@@ -144,8 +144,15 @@
             rangePred = new RangePredicate(null, null, true, true, lowKeySearchCmp, highKeySearchCmp);
 
             writeBuffer = treeIndexOpHelper.getHyracksTaskContext().allocateFrame();
-            tb = new ArrayTupleBuilder(inputRecDesc.getFields().length + btree.getFieldCount());
-            dos = tb.getDataOutput();
+
+            /** build the null (left-side) tuple once so every outer match can reuse it */
+            nullTupleBuilder = new ArrayTupleBuilder(inputRecDesc.getFields().length);
+            dos = nullTupleBuilder.getDataOutput();
+            nullTupleBuilder.reset();
+            for (int i = 0; i < inputRecDesc.getFields().length; i++) {
+                nullWriter[i].writeNull(dos);
+                nullTupleBuilder.addFieldEndOffset();
+            }
+
             appender = new FrameTupleAppender(treeIndexOpHelper.getHyracksTaskContext().getFrameSize());
             appender.reset(writeBuffer, true);
 
@@ -172,24 +179,10 @@
 
     private void writeResults(IFrameTupleAccessor leftAccessor, int tIndex, ITupleReference frameTuple)
             throws Exception {
-        tb.reset();
-        for (int i = 0; i < inputRecDesc.getFields().length; i++) {
-            int tupleStart = leftAccessor.getTupleStartOffset(tIndex);
-            int fieldStart = leftAccessor.getFieldStartOffset(tIndex, i);
-            int offset = leftAccessor.getFieldSlotsLength() + tupleStart + fieldStart;
-            int len = leftAccessor.getFieldEndOffset(tIndex, i) - fieldStart;
-            dos.write(leftAccessor.getBuffer().array(), offset, len);
-            tb.addFieldEndOffset();
-        }
-        for (int i = 0; i < frameTuple.getFieldCount(); i++) {
-            dos.write(frameTuple.getFieldData(i), frameTuple.getFieldStart(i), frameTuple.getFieldLength(i));
-            tb.addFieldEndOffset();
-        }
-
         /**
          * function call
          */
-        functionProxy.functionCall(tb, frameTuple);
+        functionProxy.functionCall(leftAccessor, tIndex, frameTuple);
     }
 
     @Override
@@ -271,20 +264,10 @@
 
     /** write result for outer case */
     private void writeResults(ITupleReference frameTuple) throws Exception {
-        tb.reset();
-        for (int i = 0; i < inputRecDesc.getFields().length; i++) {
-            nullWriter[i].writeNull(dos);
-            tb.addFieldEndOffset();
-        }
-        for (int i = 0; i < frameTuple.getFieldCount(); i++) {
-            dos.write(frameTuple.getFieldData(i), frameTuple.getFieldStart(i), frameTuple.getFieldLength(i));
-            tb.addFieldEndOffset();
-        }
-
         /**
          * function call
          */
-        functionProxy.functionCall(tb, frameTuple);
+        functionProxy.functionCall(nullTupleBuilder, frameTuple);
     }
 
     @Override
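
The outer-join path now builds its null-padded left side once in open() instead of once per unmatched index tuple. A standalone sketch of the pattern, using the same Hyracks types the file already imports; leftFieldCount and nullWriters stand in for the operator's fields.

    // build the all-null left tuple once; reuse it for every right tuple with no left match
    ArrayTupleBuilder nullTupleBuilder = new ArrayTupleBuilder(leftFieldCount);
    DataOutput out = nullTupleBuilder.getDataOutput();
    nullTupleBuilder.reset();
    for (int i = 0; i < leftFieldCount; i++) {
        nullWriters[i].writeNull(out);          // one INullWriter per left field
        nullTupleBuilder.addFieldEndOffset();
    }
    // per unmatched right tuple: functionProxy.functionCall(nullTupleBuilder, rightTuple);
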
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopSetUnionFunctionUpdateOperatorNodePushable.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopSetUnionFunctionUpdateOperatorNodePushable.java
index 0a966b5..1aa044d 100644
--- a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopSetUnionFunctionUpdateOperatorNodePushable.java
+++ b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopSetUnionFunctionUpdateOperatorNodePushable.java
@@ -14,7 +14,6 @@
  */
 package edu.uci.ics.pregelix.dataflow.std;
 
-import java.io.DataOutput;
 import java.nio.ByteBuffer;
 
 import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
@@ -24,7 +23,6 @@
 import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
 import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
@@ -51,8 +49,6 @@
 
     private ByteBuffer writeBuffer;
     private FrameTupleAppender appender;
-    private ArrayTupleBuilder tb;
-    private DataOutput dos;
 
     private BTree btree;
     private boolean isForward;
@@ -63,8 +59,6 @@
     protected ITreeIndexAccessor indexAccessor;
 
     private RecordDescriptor recDesc;
-    private final RecordDescriptor inputRecDesc;
-
     private PermutingFrameTupleReference lowKey;
     private PermutingFrameTupleReference highKey;
 
@@ -79,7 +73,6 @@
             int[] lowKeyFields, int[] highKeyFields, IUpdateFunctionFactory functionFactory,
             IRuntimeHookFactory preHookFactory, IRuntimeHookFactory postHookFactory,
             IRecordDescriptorFactory inputRdFactory, int outputArity) {
-        inputRecDesc = recordDescProvider.getInputRecordDescriptor(opDesc.getActivityId(), 0);
         treeIndexOpHelper = (TreeIndexDataflowHelper) opDesc.getIndexDataflowHelperFactory().createIndexDataflowHelper(
                 opDesc, ctx, partition);
         this.isForward = isForward;
@@ -123,8 +116,6 @@
             lowKeySearchCmp = new MultiComparator(lowKeySearchComparators);
 
             writeBuffer = treeIndexOpHelper.getHyracksTaskContext().allocateFrame();
-            tb = new ArrayTupleBuilder(btree.getFieldCount());
-            dos = tb.getDataOutput();
             appender = new FrameTupleAppender(treeIndexOpHelper.getHyracksTaskContext().getFrameSize());
             appender.reset(writeBuffer, true);
 
@@ -231,29 +222,13 @@
 
     /** write the right result */
     private void writeRightResults(ITupleReference frameTuple) throws Exception {
-        tb.reset();
-        for (int i = 0; i < frameTuple.getFieldCount(); i++) {
-            dos.write(frameTuple.getFieldData(i), frameTuple.getFieldStart(i), frameTuple.getFieldLength(i));
-            tb.addFieldEndOffset();
-        }
-
-        functionProxy.functionCall(tb, frameTuple);
+        functionProxy.functionCall(frameTuple);
     }
 
     /** write the left result */
     private void writeLeftResults(IFrameTupleAccessor leftAccessor, int tIndex, ITupleReference frameTuple)
             throws Exception {
-        tb.reset();
-        for (int i = 0; i < inputRecDesc.getFields().length; i++) {
-            int tupleStart = leftAccessor.getTupleStartOffset(tIndex);
-            int fieldStart = leftAccessor.getFieldStartOffset(tIndex, i);
-            int offset = leftAccessor.getFieldSlotsLength() + tupleStart + fieldStart;
-            int len = leftAccessor.getFieldEndOffset(tIndex, i) - fieldStart;
-            dos.write(leftAccessor.getBuffer().array(), offset, len);
-            tb.addFieldEndOffset();
-        }
-
-        functionProxy.functionCall(tb, frameTuple);
+        functionProxy.functionCall(leftAccessor, tIndex, frameTuple);
     }
 
     @Override
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/FunctionProxy.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/FunctionProxy.java
index 4b0f4a5..bb69ff8 100644
--- a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/FunctionProxy.java
+++ b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/FunctionProxy.java
@@ -15,6 +15,7 @@
 
 package edu.uci.ics.pregelix.dataflow.util;
 
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
@@ -68,16 +69,19 @@
     /**
      * Call the function
      * 
-     * @param tb
-     *            input data
+     * @param leftAccessor
+     *            accessor for the input frame that holds the left tuple
+     * @param leftTupleIndex
+     *            the index of the left tuple within the frame
-     * @param updateRef
-     *            update pointer
+     * @param right
+     *            the right (index-side) tuple; the update pointer passed to the function
      * @throws HyracksDataException
      */
-    public void functionCall(ArrayTupleBuilder tb, ITupleReference updateRef) throws HyracksDataException {
-        Object[] tuple = tupleDe.deserializeRecord(tb);
+    public void functionCall(IFrameTupleAccessor leftAccessor, int leftTupleIndex, ITupleReference right)
+            throws HyracksDataException {
+        Object[] tuple = tupleDe.deserializeRecord(leftAccessor, leftTupleIndex, right);
         function.process(tuple);
-        function.update(updateRef);
+        function.update(right);
     }
 
     /**
@@ -93,6 +97,21 @@
     }
 
     /**
+     * Call the function with a pre-built left side, e.g., the reusable null tuple in the outer-join case
+     * 
+     * @param tb
+     *            the builder holding the already-materialized left-side fields
+     * @param updateRef
+     *            update pointer
+     * @throws HyracksDataException
+     */
+    public void functionCall(ArrayTupleBuilder tb, ITupleReference updateRef) throws HyracksDataException {
+        Object[] tuple = tupleDe.deserializeRecord(tb, updateRef);
+        function.process(tuple);
+        function.update(updateRef);
+    }
+
+    /**
      * Close the function
      * 
      * @throws HyracksDataException
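
After this change a caller sees three functionCall shapes, one per join case above; a summary sketch with illustrative variable names.

    // inner/left match: left fields are read straight from the frame, the right tuple is passed by reference
    functionProxy.functionCall(leftAccessor, leftTupleIndex, rightTuple);

    // index-only result (set union): no left side at all
    functionProxy.functionCall(rightTuple);

    // right-outer match: a pre-built left side (the reusable null tuple) plus the right tuple
    functionProxy.functionCall(nullTupleBuilder, rightTuple);
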
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/TupleDeserializer.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/TupleDeserializer.java
index 5ae1d81..4fe83db 100644
--- a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/TupleDeserializer.java
+++ b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/TupleDeserializer.java
@@ -20,6 +20,7 @@
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
@@ -42,7 +43,7 @@
     }
 
     public Object[] deserializeRecord(ITupleReference tupleRef) throws HyracksDataException {
-        for (int i = 0; i < record.length; ++i) {
+        for (int i = 0; i < tupleRef.getFieldCount(); ++i) {
             byte[] data = tupleRef.getFieldData(i);
             int offset = tupleRef.getFieldStart(i);
             bbis.setByteArray(data, offset);
@@ -65,11 +66,65 @@
         return record;
     }
 
-    public Object[] deserializeRecord(ArrayTupleBuilder tb) throws HyracksDataException {
+    public Object[] deserializeRecord(IFrameTupleAccessor left, int tIndex, ITupleReference right)
+            throws HyracksDataException {
+        byte[] data = left.getBuffer().array();
+        int tStart = left.getTupleStartOffset(tIndex) + left.getFieldSlotsLength();
+        int leftFieldCount = left.getFieldCount();
+        int fStart = tStart;
+        for (int i = 0; i < leftFieldCount; ++i) {
+            /**
+             * reset the input
+             */
+            fStart = tStart + left.getFieldStartOffset(tIndex, i);
+            bbis.setByteArray(data, fStart);
+
+            /**
+             * do deserialization
+             */
+            Object instance = recordDescriptor.getFields()[i].deserialize(di);
+            if (LOGGER.isLoggable(Level.FINEST)) {
+                LOGGER.finest(i + " " + instance);
+            }
+            record[i] = instance;
+            if (FrameConstants.DEBUG_FRAME_IO) {
+                try {
+                    if (di.readInt() != FrameConstants.FRAME_FIELD_MAGIC) {
+                        throw new HyracksDataException("Field magic mismatch");
+                    }
+                } catch (IOException e) {
+                    e.printStackTrace();
+                }
+            }
+        }
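+        /**
+         * the right (index-side) fields follow the left fields in the output record
+         */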
+        for (int i = leftFieldCount; i < record.length; ++i) {
+            byte[] rightData = right.getFieldData(i - leftFieldCount);
+            int rightOffset = right.getFieldStart(i - leftFieldCount);
+            bbis.setByteArray(rightData, rightOffset);
+
+            Object instance = recordDescriptor.getFields()[i].deserialize(di);
+            if (LOGGER.isLoggable(Level.FINEST)) {
+                LOGGER.finest(i + " " + instance);
+            }
+            record[i] = instance;
+            if (FrameConstants.DEBUG_FRAME_IO) {
+                try {
+                    if (di.readInt() != FrameConstants.FRAME_FIELD_MAGIC) {
+                        throw new HyracksDataException("Field magic mismatch");
+                    }
+                } catch (IOException e) {
+                    e.printStackTrace();
+                }
+            }
+        }
+        return record;
+    }
+
+    public Object[] deserializeRecord(ArrayTupleBuilder tb, ITupleReference right) throws HyracksDataException {
         byte[] data = tb.getByteArray();
         int[] offset = tb.getFieldEndOffsets();
         int start = 0;
-        for (int i = 0; i < record.length; ++i) {
+        for (int i = 0; i < offset.length; ++i) {
             /**
              * reset the input
              */
@@ -94,6 +149,26 @@
                 }
             }
         }
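+        /**
+         * the right (index-side) fields follow the builder's left-side fields in the output record
+         */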
+        for (int i = offset.length; i < record.length; ++i) {
+            byte[] rightData = right.getFieldData(i - offset.length);
+            int rightOffset = right.getFieldStart(i - offset.length);
+            bbis.setByteArray(rightData, rightOffset);
+
+            Object instance = recordDescriptor.getFields()[i].deserialize(di);
+            if (LOGGER.isLoggable(Level.FINEST)) {
+                LOGGER.finest(i + " " + instance);
+            }
+            record[i] = instance;
+            if (FrameConstants.DEBUG_FRAME_IO) {
+                try {
+                    if (di.readInt() != FrameConstants.FRAME_FIELD_MAGIC) {
+                        throw new HyracksDataException("Field magic mismatch");
+                    }
+                } catch (IOException e) {
+                    e.printStackTrace();
+                }
+            }
+        }
         return record;
     }
 }
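
In both new deserializeRecord variants the output record is simply the left fields followed by the right fields, so right field j lands at record[leftFieldCount + j]. A worked layout, assuming two left fields and a two-field vertex tuple on the right (field counts chosen only for illustration).

    // leftFieldCount = 2, right tuple has 2 fields, record.length = 4
    // record[0] <- left field 0   (read in place from the frame buffer)
    // record[1] <- left field 1   (read in place from the frame buffer)
    // record[2] <- right field 0  (read from right.getFieldData(0) at right.getFieldStart(0))
    // record[3] <- right field 1  (read from right.getFieldData(1) at right.getFieldStart(1))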