reduce the memory copies in the index join runtime
git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_staging@2674 123451ca-8445-de46-9d55-352943316053
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java
index 6ef7e13..d62c525 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java
@@ -60,6 +60,8 @@
public static final String NUM_VERTICE = "pregelix.numVertices";
/** num of edges */
public static final String NUM_EDGES = "pregelix.numEdges";
+ /** whether the vertex state length may be increased dynamically at runtime */
+ public static final String INCREASE_STATE_LENGTH = "pregelix.incStateLength";
/** job id */
public static final String JOB_ID = "pregelix.jobid";
@@ -130,8 +132,8 @@
/**
* Set the global aggregator class (optional)
*
- * @param vertexCombinerClass
- * Determines how vertex messages are combined
+ * @param globalAggregatorClass
+ * Determines how messages are globally aggregated
*/
final public void setGlobalAggregatorClass(Class<?> globalAggregatorClass) {
getConfiguration().setClass(GLOBAL_AGGREGATOR_CLASS, globalAggregatorClass, GlobalAggregator.class);
@@ -139,11 +141,17 @@
/**
* Set the job Id
- *
- * @param vertexCombinerClass
- * Determines how vertex messages are combined
*/
final public void setJobId(String jobId) {
getConfiguration().set(JOB_ID, jobId);
}
+
+ /**
+ * Set whether the vertex state length can be dynamically increased
+ *
+ * @param incStateLengthDynamically
+ *            true if the vertex state length may grow dynamically; false otherwise
+ */
+ final public void setIncStateLengthDynamically(boolean incStateLengthDynamically) {
+ getConfiguration().setBoolean(INCREASE_STATE_LENGTH, incStateLengthDynamically);
+ }
}
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java
index 7c4853f..74bccb9 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java
@@ -410,4 +410,15 @@
throw new IllegalArgumentException("createMessageValue: Illegally accessed", e);
}
}
+
+ /**
+ * Get the job configuration parameter indicating whether the vertex state length can increase dynamically
+ *
+ * @param conf
+ * the job configuration
+ * @return the boolean setting of the parameter, by default it is false
+ */
+ public static boolean getIncStateLengthDynamically(Configuration conf) {
+ return conf.getBoolean(PregelixJob.INCREASE_STATE_LENGTH, false);
+ }
}
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinFunctionUpdateOperatorNodePushable.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinFunctionUpdateOperatorNodePushable.java
index 75a8087..7237537 100644
--- a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinFunctionUpdateOperatorNodePushable.java
+++ b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinFunctionUpdateOperatorNodePushable.java
@@ -14,7 +14,6 @@
*/
package edu.uci.ics.pregelix.dataflow.std;
-import java.io.DataOutput;
import java.nio.ByteBuffer;
import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
@@ -24,7 +23,6 @@
import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
@@ -51,9 +49,6 @@
private ByteBuffer writeBuffer;
private FrameTupleAppender appender;
- private ArrayTupleBuilder tb;
- private DataOutput dos;
-
private BTree btree;
private PermutingFrameTupleReference lowKey;
private PermutingFrameTupleReference highKey;
@@ -67,8 +62,6 @@
protected ITreeIndexAccessor indexAccessor;
private RecordDescriptor recDesc;
- private final RecordDescriptor inputRecDesc;
-
private final IFrameWriter[] writers;
private final FunctionProxy functionProxy;
@@ -77,7 +70,6 @@
int[] lowKeyFields, int[] highKeyFields, boolean lowKeyInclusive, boolean highKeyInclusive,
IUpdateFunctionFactory functionFactory, IRuntimeHookFactory preHookFactory,
IRuntimeHookFactory postHookFactory, IRecordDescriptorFactory inputRdFactory, int outputArity) {
- inputRecDesc = recordDescProvider.getInputRecordDescriptor(opDesc.getActivityId(), 0);
treeIndexOpHelper = (TreeIndexDataflowHelper) opDesc.getIndexDataflowHelperFactory().createIndexDataflowHelper(
opDesc, ctx, partition);
this.lowKeyInclusive = lowKeyInclusive;
@@ -144,8 +136,6 @@
rangePred = new RangePredicate(null, null, lowKeyInclusive, highKeyInclusive, lowKeySearchCmp,
highKeySearchCmp);
writeBuffer = treeIndexOpHelper.getHyracksTaskContext().allocateFrame();
- tb = new ArrayTupleBuilder(inputRecDesc.getFields().length + btree.getFieldCount());
- dos = tb.getDataOutput();
appender = new FrameTupleAppender(treeIndexOpHelper.getHyracksTaskContext().getFrameSize());
appender.reset(writeBuffer, true);
@@ -158,27 +148,13 @@
private void writeSearchResults(IFrameTupleAccessor leftAccessor, int tIndex) throws Exception {
while (cursor.hasNext()) {
- tb.reset();
cursor.next();
-
ITupleReference tupleRef = cursor.getTuple();
- for (int i = 0; i < inputRecDesc.getFields().length; i++) {
- int tupleStart = leftAccessor.getTupleStartOffset(tIndex);
- int fieldStart = leftAccessor.getFieldStartOffset(tIndex, i);
- int offset = leftAccessor.getFieldSlotsLength() + tupleStart + fieldStart;
- int len = leftAccessor.getFieldEndOffset(tIndex, i) - fieldStart;
- dos.write(leftAccessor.getBuffer().array(), offset, len);
- tb.addFieldEndOffset();
- }
- for (int i = 0; i < tupleRef.getFieldCount(); i++) {
- dos.write(tupleRef.getFieldData(i), tupleRef.getFieldStart(i), tupleRef.getFieldLength(i));
- tb.addFieldEndOffset();
- }
/**
* call the update function
*/
- functionProxy.functionCall(tb, tupleRef);
+ functionProxy.functionCall(leftAccessor, tIndex, tupleRef);
}
}
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopRightOuterJoinFunctionUpdateOperatorNodePushable.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopRightOuterJoinFunctionUpdateOperatorNodePushable.java
index c31ebd4..af53abe 100644
--- a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopRightOuterJoinFunctionUpdateOperatorNodePushable.java
+++ b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopRightOuterJoinFunctionUpdateOperatorNodePushable.java
@@ -53,7 +53,7 @@
private ByteBuffer writeBuffer;
private FrameTupleAppender appender;
- private ArrayTupleBuilder tb;
+ private ArrayTupleBuilder nullTupleBuilder;
private DataOutput dos;
private BTree btree;
@@ -144,8 +144,15 @@
rangePred = new RangePredicate(null, null, true, true, lowKeySearchCmp, highKeySearchCmp);
writeBuffer = treeIndexOpHelper.getHyracksTaskContext().allocateFrame();
- tb = new ArrayTupleBuilder(inputRecDesc.getFields().length + btree.getFieldCount());
- dos = tb.getDataOutput();
+
+ nullTupleBuilder = new ArrayTupleBuilder(inputRecDesc.getFields().length);
+ dos = nullTupleBuilder.getDataOutput();
+ nullTupleBuilder.reset();
+ for (int i = 0; i < inputRecDesc.getFields().length; i++) {
+ nullWriter[i].writeNull(dos);
+ nullTupleBuilder.addFieldEndOffset();
+ }
+
appender = new FrameTupleAppender(treeIndexOpHelper.getHyracksTaskContext().getFrameSize());
appender.reset(writeBuffer, true);
@@ -172,24 +179,10 @@
private void writeResults(IFrameTupleAccessor leftAccessor, int tIndex, ITupleReference frameTuple)
throws Exception {
- tb.reset();
- for (int i = 0; i < inputRecDesc.getFields().length; i++) {
- int tupleStart = leftAccessor.getTupleStartOffset(tIndex);
- int fieldStart = leftAccessor.getFieldStartOffset(tIndex, i);
- int offset = leftAccessor.getFieldSlotsLength() + tupleStart + fieldStart;
- int len = leftAccessor.getFieldEndOffset(tIndex, i) - fieldStart;
- dos.write(leftAccessor.getBuffer().array(), offset, len);
- tb.addFieldEndOffset();
- }
- for (int i = 0; i < frameTuple.getFieldCount(); i++) {
- dos.write(frameTuple.getFieldData(i), frameTuple.getFieldStart(i), frameTuple.getFieldLength(i));
- tb.addFieldEndOffset();
- }
-
/**
* function call
*/
- functionProxy.functionCall(tb, frameTuple);
+ functionProxy.functionCall(leftAccessor, tIndex, frameTuple);
}
@Override
@@ -271,20 +264,10 @@
/** write result for outer case */
private void writeResults(ITupleReference frameTuple) throws Exception {
- tb.reset();
- for (int i = 0; i < inputRecDesc.getFields().length; i++) {
- nullWriter[i].writeNull(dos);
- tb.addFieldEndOffset();
- }
- for (int i = 0; i < frameTuple.getFieldCount(); i++) {
- dos.write(frameTuple.getFieldData(i), frameTuple.getFieldStart(i), frameTuple.getFieldLength(i));
- tb.addFieldEndOffset();
- }
-
/**
* function call
*/
- functionProxy.functionCall(tb, frameTuple);
+ functionProxy.functionCall(nullTupleBuilder, frameTuple);
}
@Override
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopSetUnionFunctionUpdateOperatorNodePushable.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopSetUnionFunctionUpdateOperatorNodePushable.java
index 0a966b5..1aa044d 100644
--- a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopSetUnionFunctionUpdateOperatorNodePushable.java
+++ b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopSetUnionFunctionUpdateOperatorNodePushable.java
@@ -14,7 +14,6 @@
*/
package edu.uci.ics.pregelix.dataflow.std;
-import java.io.DataOutput;
import java.nio.ByteBuffer;
import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
@@ -24,7 +23,6 @@
import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
@@ -51,8 +49,6 @@
private ByteBuffer writeBuffer;
private FrameTupleAppender appender;
- private ArrayTupleBuilder tb;
- private DataOutput dos;
private BTree btree;
private boolean isForward;
@@ -63,8 +59,6 @@
protected ITreeIndexAccessor indexAccessor;
private RecordDescriptor recDesc;
- private final RecordDescriptor inputRecDesc;
-
private PermutingFrameTupleReference lowKey;
private PermutingFrameTupleReference highKey;
@@ -79,7 +73,6 @@
int[] lowKeyFields, int[] highKeyFields, IUpdateFunctionFactory functionFactory,
IRuntimeHookFactory preHookFactory, IRuntimeHookFactory postHookFactory,
IRecordDescriptorFactory inputRdFactory, int outputArity) {
- inputRecDesc = recordDescProvider.getInputRecordDescriptor(opDesc.getActivityId(), 0);
treeIndexOpHelper = (TreeIndexDataflowHelper) opDesc.getIndexDataflowHelperFactory().createIndexDataflowHelper(
opDesc, ctx, partition);
this.isForward = isForward;
@@ -123,8 +116,6 @@
lowKeySearchCmp = new MultiComparator(lowKeySearchComparators);
writeBuffer = treeIndexOpHelper.getHyracksTaskContext().allocateFrame();
- tb = new ArrayTupleBuilder(btree.getFieldCount());
- dos = tb.getDataOutput();
appender = new FrameTupleAppender(treeIndexOpHelper.getHyracksTaskContext().getFrameSize());
appender.reset(writeBuffer, true);
@@ -231,29 +222,13 @@
/** write the right result */
private void writeRightResults(ITupleReference frameTuple) throws Exception {
- tb.reset();
- for (int i = 0; i < frameTuple.getFieldCount(); i++) {
- dos.write(frameTuple.getFieldData(i), frameTuple.getFieldStart(i), frameTuple.getFieldLength(i));
- tb.addFieldEndOffset();
- }
-
- functionProxy.functionCall(tb, frameTuple);
+ functionProxy.functionCall(frameTuple);
}
/** write the left result */
private void writeLeftResults(IFrameTupleAccessor leftAccessor, int tIndex, ITupleReference frameTuple)
throws Exception {
- tb.reset();
- for (int i = 0; i < inputRecDesc.getFields().length; i++) {
- int tupleStart = leftAccessor.getTupleStartOffset(tIndex);
- int fieldStart = leftAccessor.getFieldStartOffset(tIndex, i);
- int offset = leftAccessor.getFieldSlotsLength() + tupleStart + fieldStart;
- int len = leftAccessor.getFieldEndOffset(tIndex, i) - fieldStart;
- dos.write(leftAccessor.getBuffer().array(), offset, len);
- tb.addFieldEndOffset();
- }
-
- functionProxy.functionCall(tb, frameTuple);
+ functionProxy.functionCall(leftAccessor, tIndex, frameTuple);
}
@Override
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/FunctionProxy.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/FunctionProxy.java
index 4b0f4a5..bb69ff8 100644
--- a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/FunctionProxy.java
+++ b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/FunctionProxy.java
@@ -15,6 +15,7 @@
package edu.uci.ics.pregelix.dataflow.util;
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
@@ -68,16 +69,19 @@
/**
* Call the function
*
- * @param tb
- * input data
+ * @param leftAccessor
+ * input page accessor
+ * @param leftTupleIndex
+ * the tuple index in the page
* @param updateRef
* update pointer
* @throws HyracksDataException
*/
- public void functionCall(ArrayTupleBuilder tb, ITupleReference updateRef) throws HyracksDataException {
- Object[] tuple = tupleDe.deserializeRecord(tb);
+ public void functionCall(IFrameTupleAccessor leftAccessor, int leftTupleIndex, ITupleReference right)
+ throws HyracksDataException {
+ Object[] tuple = tupleDe.deserializeRecord(leftAccessor, leftTupleIndex, right);
function.process(tuple);
- function.update(updateRef);
+ function.update(right);
}
/**
@@ -93,6 +97,21 @@
}
/**
+ * Call the function
+ *
+ * @param tb
+ * input data
+ * @param updateRef
+ * update pointer
+ * @throws HyracksDataException
+ */
+ public void functionCall(ArrayTupleBuilder tb, ITupleReference updateRef) throws HyracksDataException {
+ Object[] tuple = tupleDe.deserializeRecord(tb, updateRef);
+ function.process(tuple);
+ function.update(updateRef);
+ }
+
+ /**
* Close the function
*
* @throws HyracksDataException
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/TupleDeserializer.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/TupleDeserializer.java
index 5ae1d81..4fe83db 100644
--- a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/TupleDeserializer.java
+++ b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/TupleDeserializer.java
@@ -20,6 +20,7 @@
import java.util.logging.Level;
import java.util.logging.Logger;
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
@@ -42,7 +43,7 @@
}
public Object[] deserializeRecord(ITupleReference tupleRef) throws HyracksDataException {
- for (int i = 0; i < record.length; ++i) {
+ for (int i = 0; i < tupleRef.getFieldCount(); ++i) {
byte[] data = tupleRef.getFieldData(i);
int offset = tupleRef.getFieldStart(i);
bbis.setByteArray(data, offset);
@@ -65,11 +66,65 @@
return record;
}
- public Object[] deserializeRecord(ArrayTupleBuilder tb) throws HyracksDataException {
+ public Object[] deserializeRecord(IFrameTupleAccessor left, int tIndex, ITupleReference right)
+ throws HyracksDataException {
+ byte[] data = left.getBuffer().array();
+ int tStart = left.getTupleStartOffset(tIndex) + left.getFieldSlotsLength();
+ int leftFieldCount = left.getFieldCount();
+ int fStart = tStart;
+ for (int i = 0; i < leftFieldCount; ++i) {
+ /**
+ * reset the input
+ */
+ fStart = tStart + left.getFieldStartOffset(tIndex, i);
+ bbis.setByteArray(data, fStart);
+
+ /**
+ * do deserialization
+ */
+ Object instance = recordDescriptor.getFields()[i].deserialize(di);
+ if (LOGGER.isLoggable(Level.FINEST)) {
+ LOGGER.finest(i + " " + instance);
+ }
+ record[i] = instance;
+ if (FrameConstants.DEBUG_FRAME_IO) {
+ try {
+ if (di.readInt() != FrameConstants.FRAME_FIELD_MAGIC) {
+ throw new HyracksDataException("Field magic mismatch");
+ }
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+ for (int i = leftFieldCount; i < record.length; ++i) {
+ byte[] rightData = right.getFieldData(i - leftFieldCount);
+ int rightOffset = right.getFieldStart(i - leftFieldCount);
+ bbis.setByteArray(rightData, rightOffset);
+
+ Object instance = recordDescriptor.getFields()[i].deserialize(di);
+ if (LOGGER.isLoggable(Level.FINEST)) {
+ LOGGER.finest(i + " " + instance);
+ }
+ record[i] = instance;
+ if (FrameConstants.DEBUG_FRAME_IO) {
+ try {
+ if (di.readInt() != FrameConstants.FRAME_FIELD_MAGIC) {
+ throw new HyracksDataException("Field magic mismatch");
+ }
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+ return record;
+ }
+
+ public Object[] deserializeRecord(ArrayTupleBuilder tb, ITupleReference right) throws HyracksDataException {
byte[] data = tb.getByteArray();
int[] offset = tb.getFieldEndOffsets();
int start = 0;
- for (int i = 0; i < record.length; ++i) {
+ for (int i = 0; i < offset.length; ++i) {
/**
* reset the input
*/
@@ -94,6 +149,26 @@
}
}
}
+ for (int i = offset.length; i < record.length; ++i) {
+ byte[] rightData = right.getFieldData(i - offset.length);
+ int rightOffset = right.getFieldStart(i - offset.length);
+ bbis.setByteArray(rightData, rightOffset);
+
+ Object instance = recordDescriptor.getFields()[i].deserialize(di);
+ if (LOGGER.isLoggable(Level.FINEST)) {
+ LOGGER.finest(i + " " + instance);
+ }
+ record[i] = instance;
+ if (FrameConstants.DEBUG_FRAME_IO) {
+ try {
+ if (di.readInt() != FrameConstants.FRAME_FIELD_MAGIC) {
+ throw new HyracksDataException("Field magic mismatch");
+ }
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+ }
return record;
}
}