Let Pregelix support dynamic vertex value sizes
git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_staging@2677 123451ca-8445-de46-9d55-352943316053
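For reference, a job opts into this feature through the new PregelixJob switch. A minimal setup, condensed from generatePageRankJobRealDynamic in this patch (the input/output paths and vertex count used by the test are omitted here):

    PregelixJob job = new PregelixJob("PageRank");
    job.setVertexClass(PageRankVertex.class);
    job.setVertexInputFormatClass(TextPageRankInputFormat.class);
    job.setVertexOutputFormatClass(SimplePageRankVertexOutputFormat.class);
    job.setMessageCombinerClass(PageRankVertex.SimpleSumCombiner.class);
    job.setDynamicVertexValueSize(true); // vertex values may grow across supersteps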
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java
index d62c525..bba2229 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/job/PregelixJob.java
@@ -151,7 +151,7 @@
*
* @param jobId
*/
- final public void setIncStateLengthDynamically(boolean incStateLengthDynamically) {
+ final public void setDynamicVertexValueSize(boolean incStateLengthDynamically) {
getConfiguration().setBoolean(INCREASE_STATE_LENGTH, incStateLengthDynamically);
}
}
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java
index 74bccb9..6066dfb 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/util/BspUtils.java
@@ -418,7 +418,7 @@
* the job configuration
* @return the boolean setting of the parameter, by default it is false
*/
- public static boolean getIncStateLengthDynamically(Configuration conf) {
+ public static boolean getDynamicVertexValueSize(Configuration conf) {
return conf.getBoolean(PregelixJob.INCREASE_STATE_LENGTH, false);
}
}
diff --git a/pregelix/pregelix-dataflow-std-base/src/main/java/edu/uci/ics/pregelix/dataflow/std/base/IUpdateFunction.java b/pregelix/pregelix-dataflow-std-base/src/main/java/edu/uci/ics/pregelix/dataflow/std/base/IUpdateFunction.java
index 62f92dd..a0d365f 100644
--- a/pregelix/pregelix-dataflow-std-base/src/main/java/edu/uci/ics/pregelix/dataflow/std/base/IUpdateFunction.java
+++ b/pregelix/pregelix-dataflow-std-base/src/main/java/edu/uci/ics/pregelix/dataflow/std/base/IUpdateFunction.java
@@ -16,17 +16,19 @@
package edu.uci.ics.pregelix.dataflow.std.base;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
public interface IUpdateFunction extends IFunction {
- /**
- * update the tuple pointed by tupleRef called after process,
- * one-input-tuple-at-a-time
- *
- * @param tupleRef
- * @throws HyracksDataException
- */
- public void update(ITupleReference tupleRef) throws HyracksDataException;
+ /**
+ * Update the tuple pointed to by tupleRef; called after process(),
+ * one input tuple at a time.
+ *
+ * @param tupleRef
+ * the tuple to be updated, in place when possible
+ * @param cloneUpdateTb
+ * the builder that collects a cloned (vertex id, vertex value)
+ * tuple when the update cannot be applied in place
+ * @throws HyracksDataException
+ */
+ public void update(ITupleReference tupleRef, ArrayTupleBuilder cloneUpdateTb)
+ throws HyracksDataException;
}
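Implementation note: a condensed sketch of the intended contract, modeled on ComputeUpdateFunctionFactory later in this patch. With a fixed-size vertex value the update is written back in place through tupleRef; with setDynamicVertexValueSize(true) the (vertex id, vertex value) pair is serialized into cloneUpdateTb instead, and the operator applies it afterwards through the UpdateBuffer:

    @Override
    public void update(ITupleReference tupleRef, ArrayTupleBuilder cloneUpdateTb) throws HyracksDataException {
        try {
            if (vertex != null && vertex.hasUpdate()) {
                if (!BspUtils.getDynamicVertexValueSize(conf)) {
                    // fixed size: overwrite the value bytes in place via tupleRef
                } else {
                    // dynamic size: clone (id, value) into the builder; the
                    // operator batches these updates through UpdateBuffer
                    DataOutput out = cloneUpdateTb.getDataOutput();
                    vertex.getVertexId().write(out);
                    cloneUpdateTb.addFieldEndOffset();
                    vertex.write(out);
                    cloneUpdateTb.addFieldEndOffset();
                }
            }
        } catch (IOException e) {
            throw new HyracksDataException(e);
        }
    }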
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/BTreeSearchFunctionUpdateOperatorNodePushable.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/BTreeSearchFunctionUpdateOperatorNodePushable.java
index fb84aa0..9389ab6 100644
--- a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/BTreeSearchFunctionUpdateOperatorNodePushable.java
+++ b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/BTreeSearchFunctionUpdateOperatorNodePushable.java
@@ -43,6 +43,7 @@
import edu.uci.ics.pregelix.dataflow.std.base.IRuntimeHookFactory;
import edu.uci.ics.pregelix.dataflow.std.base.IUpdateFunctionFactory;
import edu.uci.ics.pregelix.dataflow.util.FunctionProxy;
+import edu.uci.ics.pregelix.dataflow.util.UpdateBuffer;
public class BTreeSearchFunctionUpdateOperatorNodePushable extends AbstractUnaryInputOperatorNodePushable {
protected TreeIndexDataflowHelper treeIndexHelper;
@@ -70,6 +71,8 @@
private final IFrameWriter[] writers;
private final FunctionProxy functionProxy;
+ private ArrayTupleBuilder cloneUpdateTb;
+ private final UpdateBuffer updateBuffer;
public BTreeSearchFunctionUpdateOperatorNodePushable(AbstractTreeIndexOperatorDescriptor opDesc,
IHyracksTaskContext ctx, int partition, IRecordDescriptorProvider recordDescProvider, boolean isForward,
@@ -94,6 +97,7 @@
this.writers = new IFrameWriter[outputArity];
this.functionProxy = new FunctionProxy(ctx, functionFactory, preHookFactory, postHookFactory, inputRdFactory,
writers);
+ this.updateBuffer = new UpdateBuffer(ctx, 2);
}
@Override
@@ -122,6 +126,8 @@
appender = new FrameTupleAppender(treeIndexHelper.getHyracksTaskContext().getFrameSize());
appender.reset(writeBuffer, true);
indexAccessor = btree.createAccessor();
+
+ cloneUpdateTb = new ArrayTupleBuilder(btree.getFieldCount());
} catch (Exception e) {
treeIndexHelper.deinit();
throw new HyracksDataException(e);
@@ -136,7 +142,24 @@
while (cursor.hasNext()) {
cursor.next();
ITupleReference tuple = cursor.getTuple();
- functionProxy.functionCall(tuple);
+ functionProxy.functionCall(tuple, cloneUpdateTb);
+
+ //handle the cloned update, if any
+ if (cloneUpdateTb.getSize() > 0) {
+ if (!updateBuffer.appendTuple(cloneUpdateTb)) {
+ //release the cursor/latch
+ cursor.close();
+ //batch update
+ updateBuffer.updateBTree(indexAccessor);
+
+ //search again
+ cursor.reset();
+ rangePred.setLowKey(tuple, true);
+ rangePred.setHighKey(highKey, highKeyInclusive);
+ indexAccessor.search(cursor, rangePred);
+ }
+ }
+ cloneUpdateTb.reset();
}
}
@@ -168,6 +191,8 @@
try {
try {
cursor.close();
+ //batch update
+ updateBuffer.updateBTree(indexAccessor);
} catch (Exception e) {
throw new HyracksDataException(e);
}
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinFunctionUpdateOperatorNodePushable.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinFunctionUpdateOperatorNodePushable.java
index 7237537..69bb6a2 100644
--- a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinFunctionUpdateOperatorNodePushable.java
+++ b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinFunctionUpdateOperatorNodePushable.java
@@ -23,6 +23,7 @@
import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
@@ -42,6 +43,7 @@
import edu.uci.ics.pregelix.dataflow.std.base.IRuntimeHookFactory;
import edu.uci.ics.pregelix.dataflow.std.base.IUpdateFunctionFactory;
import edu.uci.ics.pregelix.dataflow.util.FunctionProxy;
+import edu.uci.ics.pregelix.dataflow.util.UpdateBuffer;
public class IndexNestedLoopJoinFunctionUpdateOperatorNodePushable extends AbstractUnaryInputOperatorNodePushable {
private TreeIndexDataflowHelper treeIndexOpHelper;
@@ -64,6 +66,8 @@
private RecordDescriptor recDesc;
private final IFrameWriter[] writers;
private final FunctionProxy functionProxy;
+ private ArrayTupleBuilder cloneUpdateTb;
+ private final UpdateBuffer updateBuffer;
public IndexNestedLoopJoinFunctionUpdateOperatorNodePushable(AbstractTreeIndexOperatorDescriptor opDesc,
IHyracksTaskContext ctx, int partition, IRecordDescriptorProvider recordDescProvider, boolean isForward,
@@ -87,6 +91,7 @@
this.writers = new IFrameWriter[outputArity];
this.functionProxy = new FunctionProxy(ctx, functionFactory, preHookFactory, postHookFactory, inputRdFactory,
writers);
+ this.updateBuffer = new UpdateBuffer(ctx, 2);
}
protected void setCursor() {
@@ -140,6 +145,7 @@
appender.reset(writeBuffer, true);
indexAccessor = btree.createAccessor();
+ cloneUpdateTb = new ArrayTupleBuilder(btree.getFieldCount());
} catch (Exception e) {
treeIndexOpHelper.deinit();
throw new HyracksDataException(e);
@@ -154,7 +160,23 @@
/**
* call the update function
*/
- functionProxy.functionCall(leftAccessor, tIndex, tupleRef);
+ functionProxy.functionCall(leftAccessor, tIndex, tupleRef, cloneUpdateTb);
+
+ if (cloneUpdateTb.getSize() > 0) {
+ if (!updateBuffer.appendTuple(cloneUpdateTb)) {
+ //release the cursor/latch
+ cursor.close();
+ //batch update
+ updateBuffer.updateBTree(indexAccessor);
+
+ //search again
+ cursor.reset();
+ rangePred.setLowKey(tupleRef, true);
+ rangePred.setHighKey(highKey, highKeyInclusive);
+ indexAccessor.search(cursor, rangePred);
+ }
+ }
+ cloneUpdateTb.reset();
}
}
@@ -186,6 +208,8 @@
try {
try {
cursor.close();
+ //batch update
+ updateBuffer.updateBTree(indexAccessor);
} catch (Exception e) {
throw new HyracksDataException(e);
}
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopRightOuterJoinFunctionUpdateOperatorNodePushable.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopRightOuterJoinFunctionUpdateOperatorNodePushable.java
index af53abe..fbab036 100644
--- a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopRightOuterJoinFunctionUpdateOperatorNodePushable.java
+++ b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopRightOuterJoinFunctionUpdateOperatorNodePushable.java
@@ -45,6 +45,7 @@
import edu.uci.ics.pregelix.dataflow.std.base.IRuntimeHookFactory;
import edu.uci.ics.pregelix.dataflow.std.base.IUpdateFunctionFactory;
import edu.uci.ics.pregelix.dataflow.util.FunctionProxy;
+import edu.uci.ics.pregelix.dataflow.util.UpdateBuffer;
public class IndexNestedLoopRightOuterJoinFunctionUpdateOperatorNodePushable extends
AbstractUnaryInputOperatorNodePushable {
@@ -76,6 +77,8 @@
private final IFrameWriter[] writers;
private final FunctionProxy functionProxy;
+ private ArrayTupleBuilder cloneUpdateTb;
+ private UpdateBuffer updateBuffer;
public IndexNestedLoopRightOuterJoinFunctionUpdateOperatorNodePushable(AbstractTreeIndexOperatorDescriptor opDesc,
IHyracksTaskContext ctx, int partition, IRecordDescriptorProvider recordDescProvider, boolean isForward,
@@ -100,6 +103,7 @@
this.writers = new IFrameWriter[outputArity];
this.functionProxy = new FunctionProxy(ctx, functionFactory, preHookFactory, postHookFactory, inputRdFactory,
writers);
+ this.updateBuffer = new UpdateBuffer(ctx, 2);
}
protected void setCursor() {
@@ -144,15 +148,15 @@
rangePred = new RangePredicate(null, null, true, true, lowKeySearchCmp, highKeySearchCmp);
writeBuffer = treeIndexOpHelper.getHyracksTaskContext().allocateFrame();
-
- nullTupleBuilder = new ArrayTupleBuilder(inputRecDesc.getFields().length);
+
+ nullTupleBuilder = new ArrayTupleBuilder(inputRecDesc.getFields().length);
dos = nullTupleBuilder.getDataOutput();
nullTupleBuilder.reset();
for (int i = 0; i < inputRecDesc.getFields().length; i++) {
nullWriter[i].writeNull(dos);
nullTupleBuilder.addFieldEndOffset();
}
-
+
appender = new FrameTupleAppender(treeIndexOpHelper.getHyracksTaskContext().getFrameSize());
appender.reset(writeBuffer, true);
@@ -171,18 +175,38 @@
match = false;
}
+ cloneUpdateTb = new ArrayTupleBuilder(btree.getFieldCount());
+
} catch (Exception e) {
treeIndexOpHelper.deinit();
throw new HyracksDataException(e);
}
}
+ //for the join match cases
private void writeResults(IFrameTupleAccessor leftAccessor, int tIndex, ITupleReference frameTuple)
throws Exception {
/**
* function call
*/
- functionProxy.functionCall(leftAccessor, tIndex, frameTuple);
+ functionProxy.functionCall(leftAccessor, tIndex, frameTuple, cloneUpdateTb);
+
+ //handle the cloned update, if any
+ if (cloneUpdateTb.getSize() > 0) {
+ if (!updateBuffer.appendTuple(cloneUpdateTb)) {
+ //release the cursor/latch
+ cursor.close();
+ //batch update
+ updateBuffer.updateBTree(indexAccessor);
+
+ //search again and recover the cursor
+ cursor.reset();
+ rangePred.setLowKey(frameTuple, true);
+ rangePred.setHighKey(null, true);
+ indexAccessor.search(cursor, rangePred);
+ }
+ cloneUpdateTb.reset();
+ }
}
@Override
@@ -236,6 +260,8 @@
}
try {
cursor.close();
+ //batch update
+ updateBuffer.updateBTree(indexAccessor);
} catch (Exception e) {
throw new HyracksDataException(e);
}
@@ -267,7 +293,24 @@
/**
* function call
*/
- functionProxy.functionCall(nullTupleBuilder, frameTuple);
+ functionProxy.functionCall(nullTupleBuilder, frameTuple, cloneUpdateTb);
+
+ //handle the cloned update, if any
+ if (cloneUpdateTb.getSize() > 0) {
+ if (!updateBuffer.appendTuple(cloneUpdateTb)) {
+ //release the cursor/latch
+ cursor.close();
+ //batch update
+ updateBuffer.updateBTree(indexAccessor);
+
+ //search again and recover the cursor
+ cursor.reset();
+ rangePred.setLowKey(frameTuple, true);
+ rangePred.setHighKey(null, true);
+ indexAccessor.search(cursor, rangePred);
+ }
+ cloneUpdateTb.reset();
+ }
}
@Override
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopSetUnionFunctionUpdateOperatorNodePushable.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopSetUnionFunctionUpdateOperatorNodePushable.java
index 1aa044d..de083df 100644
--- a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopSetUnionFunctionUpdateOperatorNodePushable.java
+++ b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopSetUnionFunctionUpdateOperatorNodePushable.java
@@ -23,6 +23,7 @@
import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
@@ -42,6 +43,7 @@
import edu.uci.ics.pregelix.dataflow.std.base.IRuntimeHookFactory;
import edu.uci.ics.pregelix.dataflow.std.base.IUpdateFunctionFactory;
import edu.uci.ics.pregelix.dataflow.util.FunctionProxy;
+import edu.uci.ics.pregelix.dataflow.util.UpdateBuffer;
public class IndexNestedLoopSetUnionFunctionUpdateOperatorNodePushable extends AbstractUnaryInputOperatorNodePushable {
private TreeIndexDataflowHelper treeIndexOpHelper;
@@ -67,6 +69,8 @@
private final IFrameWriter[] writers;
private final FunctionProxy functionProxy;
+ private ArrayTupleBuilder cloneUpdateTb;
+ private UpdateBuffer updateBuffer;
public IndexNestedLoopSetUnionFunctionUpdateOperatorNodePushable(AbstractTreeIndexOperatorDescriptor opDesc,
IHyracksTaskContext ctx, int partition, IRecordDescriptorProvider recordDescProvider, boolean isForward,
@@ -90,6 +94,7 @@
this.writers = new IFrameWriter[outputArity];
this.functionProxy = new FunctionProxy(ctx, functionFactory, preHookFactory, postHookFactory, inputRdFactory,
writers);
+ this.updateBuffer = new UpdateBuffer(ctx, 2);
}
protected void setCursor() {
@@ -133,6 +138,7 @@
currentTopTuple = cursor.getTuple();
match = false;
}
+ cloneUpdateTb = new ArrayTupleBuilder(btree.getFieldCount());
} catch (Exception e) {
treeIndexOpHelper.deinit();
@@ -198,6 +204,9 @@
}
try {
cursor.close();
+
+ //batch update
+ updateBuffer.updateBTree(indexAccessor);
} catch (Exception e) {
throw new HyracksDataException(e);
}
@@ -222,13 +231,47 @@
/** write the right result */
private void writeRightResults(ITupleReference frameTuple) throws Exception {
- functionProxy.functionCall(frameTuple);
+ functionProxy.functionCall(frameTuple, cloneUpdateTb);
+
+ //handle the cloned update, if any
+ if (cloneUpdateTb.getSize() > 0) {
+ if (!updateBuffer.appendTuple(cloneUpdateTb)) {
+ //release the cursor/latch
+ cursor.close();
+ //batch update
+ updateBuffer.updateBTree(indexAccessor);
+
+ //search again
+ cursor.reset();
+ rangePred.setLowKey(frameTuple, true);
+ rangePred.setHighKey(null, true);
+ indexAccessor.search(cursor, rangePred);
+ }
+ cloneUpdateTb.reset();
+ }
}
/** write the left result */
private void writeLeftResults(IFrameTupleAccessor leftAccessor, int tIndex, ITupleReference frameTuple)
throws Exception {
- functionProxy.functionCall(leftAccessor, tIndex, frameTuple);
+ functionProxy.functionCall(leftAccessor, tIndex, frameTuple, cloneUpdateTb);
+
+ //handle the cloned update, if any
+ if (cloneUpdateTb.getSize() > 0) {
+ if (!updateBuffer.appendTuple(cloneUpdateTb)) {
+ //release the cursor/latch
+ cursor.close();
+ //batch update
+ updateBuffer.updateBTree(indexAccessor);
+
+ //search again
+ cursor.reset();
+ rangePred.setLowKey(frameTuple, true);
+ rangePred.setHighKey(null, true);
+ indexAccessor.search(cursor, rangePred);
+ }
+ cloneUpdateTb.reset();
+ }
}
@Override
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/FunctionProxy.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/FunctionProxy.java
index bb69ff8..82ac18e 100644
--- a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/FunctionProxy.java
+++ b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/FunctionProxy.java
@@ -77,11 +77,11 @@
* update pointer
* @throws HyracksDataException
*/
- public void functionCall(IFrameTupleAccessor leftAccessor, int leftTupleIndex, ITupleReference right)
- throws HyracksDataException {
+ public void functionCall(IFrameTupleAccessor leftAccessor, int leftTupleIndex, ITupleReference right,
+ ArrayTupleBuilder cloneUpdateTb) throws HyracksDataException {
Object[] tuple = tupleDe.deserializeRecord(leftAccessor, leftTupleIndex, right);
function.process(tuple);
- function.update(right);
+ function.update(right, cloneUpdateTb);
}
/**
@@ -90,10 +90,10 @@
* @param updateRef
* @throws HyracksDataException
*/
- public void functionCall(ITupleReference updateRef) throws HyracksDataException {
+ public void functionCall(ITupleReference updateRef, ArrayTupleBuilder cloneUpdateTb) throws HyracksDataException {
Object[] tuple = tupleDe.deserializeRecord(updateRef);
function.process(tuple);
- function.update(updateRef);
+ function.update(updateRef, cloneUpdateTb);
}
/**
@@ -101,14 +101,15 @@
*
* @param tb
* input data
- * @param updateRef
+ * @param inPlaceUpdateRef
* update pointer
* @throws HyracksDataException
*/
- public void functionCall(ArrayTupleBuilder tb, ITupleReference updateRef) throws HyracksDataException {
- Object[] tuple = tupleDe.deserializeRecord(tb, updateRef);
+ public void functionCall(ArrayTupleBuilder tb, ITupleReference inPlaceUpdateRef, ArrayTupleBuilder cloneUpdateTb)
+ throws HyracksDataException {
+ Object[] tuple = tupleDe.deserializeRecord(tb, inPlaceUpdateRef);
function.process(tuple);
- function.update(updateRef);
+ function.update(inPlaceUpdateRef, cloneUpdateTb);
}
/**
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/UpdateBuffer.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/UpdateBuffer.java
new file mode 100644
index 0000000..85503aa
--- /dev/null
+++ b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/UpdateBuffer.java
@@ -0,0 +1,104 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package edu.uci.ics.pregelix.dataflow.util;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.FrameTupleReference;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexAccessor;
+import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
+
+/**
+ * A buffer that holds pending updates.
+ * Updates to the B-tree are applied in batches during index search and join,
+ * so that cursors do not have to be opened and closed for every single update.
+ */
+public class UpdateBuffer {
+
+ private int currentInUse = 0;
+ private final int pageLimit;
+ private final List<ByteBuffer> buffers = new ArrayList<ByteBuffer>();
+ private final FrameTupleAppender appender;
+ private final IHyracksTaskContext ctx;
+ private final FrameTupleReference tuple = new FrameTupleReference();
+ private final IFrameTupleAccessor fta;
+
+ public UpdateBuffer(int numPages, IHyracksTaskContext ctx, int fieldCount) {
+ this.appender = new FrameTupleAppender(ctx.getFrameSize());
+ ByteBuffer buffer = ctx.allocateFrame();
+ this.buffers.add(buffer);
+ this.appender.reset(buffer, true);
+ this.pageLimit = numPages;
+ this.ctx = ctx;
+ this.fta = new UpdateBufferTupleAccessor(ctx.getFrameSize(), fieldCount);
+ }
+
+ public UpdateBuffer(IHyracksTaskContext ctx, int fieldCount) {
+ //by default, the update buffer has 1000 pages
+ this(1000, ctx, fieldCount);
+ }
+
+ public boolean appendTuple(ArrayTupleBuilder tb) throws HyracksDataException {
+ if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
+ if (currentInUse + 1 < pageLimit) {
+ // move to the new buffer
+ currentInUse++;
+ allocate(currentInUse);
+ ByteBuffer buffer = buffers.get(currentInUse);
+ appender.reset(buffer, true);
+
+ if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
+ throw new HyracksDataException("the tuple is too large to fit into an empty frame!");
+ }
+ return true;
+ } else {
+ return false;
+ }
+ } else {
+ return true;
+ }
+ }
+
+ public void updateBTree(ITreeIndexAccessor bta) throws HyracksDataException, IndexException {
+ // batch update
+ for (int i = 0; i <= currentInUse; i++) {
+ ByteBuffer buffer = buffers.get(i);
+ fta.reset(buffer);
+ for (int j = 0; j < fta.getTupleCount(); j++) {
+ tuple.reset(fta, j);
+ bta.update(tuple);
+ }
+ }
+
+ //reset the buffer for reuse
+ currentInUse = 0;
+ ByteBuffer buffer = buffers.get(0);
+ appender.reset(buffer, true);
+ }
+
+ private void allocate(int index) {
+ if (index >= buffers.size()) {
+ buffers.add(ctx.allocateFrame());
+ }
+ }
+}
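Usage sketch (names follow the operator code earlier in this patch; error handling elided): each cloned update is staged into the buffer, the buffer is drained into the B-tree whenever it fills up, and any remainder is drained when the operator closes:

    // inside the scan loop
    if (cloneUpdateTb.getSize() > 0) {
        if (!updateBuffer.appendTuple(cloneUpdateTb)) {
            cursor.close();                          // release latches first
            updateBuffer.updateBTree(indexAccessor); // drain the full buffer
            // ...then reopen the cursor and resume the search from the current key
        }
        cloneUpdateTb.reset();
    }

    // in close()
    cursor.close();
    updateBuffer.updateBTree(indexAccessor);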
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/UpdateBufferTupleAccessor.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/UpdateBufferTupleAccessor.java
new file mode 100644
index 0000000..39f1361
--- /dev/null
+++ b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/UpdateBufferTupleAccessor.java
@@ -0,0 +1,81 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.dataflow.util;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.comm.FrameHelper;
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+
+public final class UpdateBufferTupleAccessor implements IFrameTupleAccessor {
+ private final int frameSize;
+ private final int fieldCount;
+ private ByteBuffer buffer;
+
+ public UpdateBufferTupleAccessor(int frameSize, int fieldCount) {
+ this.frameSize = frameSize;
+ this.fieldCount = fieldCount;
+ }
+
+ @Override
+ public void reset(ByteBuffer buffer) {
+ this.buffer = buffer;
+ }
+
+ @Override
+ public ByteBuffer getBuffer() {
+ return buffer;
+ }
+
+ @Override
+ public int getTupleCount() {
+ return buffer.getInt(FrameHelper.getTupleCountOffset(frameSize));
+ }
+
+ @Override
+ public int getTupleStartOffset(int tupleIndex) {
+ return tupleIndex == 0 ? 0 : buffer.getInt(FrameHelper.getTupleCountOffset(frameSize) - 4 * tupleIndex);
+ }
+
+ @Override
+ public int getTupleEndOffset(int tupleIndex) {
+ return buffer.getInt(FrameHelper.getTupleCountOffset(frameSize) - 4 * (tupleIndex + 1));
+ }
+
+ @Override
+ public int getFieldStartOffset(int tupleIndex, int fIdx) {
+ return fIdx == 0 ? 0 : buffer.getInt(getTupleStartOffset(tupleIndex) + (fIdx - 1) * 4);
+ }
+
+ @Override
+ public int getFieldEndOffset(int tupleIndex, int fIdx) {
+ return buffer.getInt(getTupleStartOffset(tupleIndex) + fIdx * 4);
+ }
+
+ @Override
+ public int getFieldLength(int tupleIndex, int fIdx) {
+ return getFieldEndOffset(tupleIndex, fIdx) - getFieldStartOffset(tupleIndex, fIdx);
+ }
+
+ @Override
+ public int getFieldSlotsLength() {
+ return getFieldCount() * 4;
+ }
+
+ @Override
+ public int getFieldCount() {
+ return fieldCount;
+ }
+}
\ No newline at end of file
diff --git a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobgen/JobGenerator.java b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobgen/JobGenerator.java
index c0b4a10..2c580c4 100644
--- a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobgen/JobGenerator.java
+++ b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobgen/JobGenerator.java
@@ -157,13 +157,6 @@
job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
}
- private static void genPageRank() throws IOException {
- generatePageRankJob("PageRank", outputBase + "PageRank.xml");
- generatePageRankJobReal("PageRank", outputBase + "PageRankReal.xml");
- generatePageRankJobRealComplex("PageRank", outputBase + "PageRankRealComplex.xml");
- generatePageRankJobRealNoCombiner("PageRank", outputBase + "PageRankRealNoCombiner.xml");
- }
-
private static void generateShortestPathJob(String jobName, String outputPath) throws IOException {
PregelixJob job = new PregelixJob(jobName);
job.setVertexClass(ShortestPathsVertex.class);
@@ -177,6 +170,27 @@
job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
}
+ private static void generatePageRankJobRealDynamic(String jobName, String outputPath) throws IOException {
+ PregelixJob job = new PregelixJob(jobName);
+ job.setVertexClass(PageRankVertex.class);
+ job.setVertexInputFormatClass(TextPageRankInputFormat.class);
+ job.setVertexOutputFormatClass(SimplePageRankVertexOutputFormat.class);
+ job.setMessageCombinerClass(PageRankVertex.SimpleSumCombiner.class);
+ job.setDynamicVertexValueSize(true);
+ FileInputFormat.setInputPaths(job, HDFS_INPUTPATH);
+ FileOutputFormat.setOutputPath(job, new Path(HDFS_OUTPUTPAH));
+ job.getConfiguration().setLong(PregelixJob.NUM_VERTICE, 20);
+ job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
+ }
+
+ private static void genPageRank() throws IOException {
+ generatePageRankJob("PageRank", outputBase + "PageRank.xml");
+ generatePageRankJobReal("PageRank", outputBase + "PageRankReal.xml");
+ generatePageRankJobRealDynamic("PageRank", outputBase + "PageRankRealDynamic.xml");
+ generatePageRankJobRealComplex("PageRank", outputBase + "PageRankRealComplex.xml");
+ generatePageRankJobRealNoCombiner("PageRank", outputBase + "PageRankRealNoCombiner.xml");
+ }
+
private static void genShortestPath() throws IOException {
generateShortestPathJob("ShortestPaths", outputBase + "ShortestPaths.xml");
generateShortestPathJobReal("ShortestPaths", outputBase + "ShortestPathsReal.xml");
diff --git a/pregelix/pregelix-example/src/test/resources/expected/PageRankRealDynamic.result b/pregelix/pregelix-example/src/test/resources/expected/PageRankRealDynamic.result
new file mode 100644
index 0000000..ab05d38
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/resources/expected/PageRankRealDynamic.result
@@ -0,0 +1,20 @@
+0|Vertex(id=0,value=0.008290140026154316, edges=(1,))
+1|Vertex(id=1,value=0.1535152819247165, edges=(1,2,))
+2|Vertex(id=2,value=0.14646839195826475, edges=(1,2,3,))
+3|Vertex(id=3,value=0.08125113985998214, edges=(1,2,3,4,))
+4|Vertex(id=4,value=0.03976979906329426, edges=(1,2,3,4,5,))
+5|Vertex(id=5,value=0.0225041581462058, edges=(1,2,3,4,5,6,))
+6|Vertex(id=6,value=0.015736276824953852, edges=(1,2,3,4,5,6,7,))
+7|Vertex(id=7,value=0.012542224114863661, edges=(1,2,3,4,5,6,7,8,))
+8|Vertex(id=8,value=0.010628239626209894, edges=(1,2,3,4,5,6,7,8,9,))
+9|Vertex(id=9,value=0.009294348455354817, edges=(1,2,3,4,5,6,7,8,9,10,))
+10|Vertex(id=10,value=0.008290140026154316, edges=(11,))
+11|Vertex(id=11,value=0.15351528192471647, edges=(11,12,))
+12|Vertex(id=12,value=0.14646839195826472, edges=(11,12,13,))
+13|Vertex(id=13,value=0.08125113985998214, edges=(11,12,13,14,))
+14|Vertex(id=14,value=0.03976979906329426, edges=(11,12,13,14,15,))
+15|Vertex(id=15,value=0.0225041581462058, edges=(11,12,13,14,15,16,))
+16|Vertex(id=16,value=0.015736276824953852, edges=(11,12,13,14,15,16,17,))
+17|Vertex(id=17,value=0.012542224114863661, edges=(11,12,13,14,15,16,17,18,))
+18|Vertex(id=18,value=0.010628239626209894, edges=(11,12,13,14,15,16,17,18,19,))
+19|Vertex(id=19,value=0.009294348455354817, edges=(0,11,12,13,14,15,16,17,18,19,))
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/PageRankRealDynamic.xml b/pregelix/pregelix-example/src/test/resources/jobs/PageRankRealDynamic.xml
new file mode 100644
index 0000000..c1a04ae
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/resources/jobs/PageRankRealDynamic.xml
@@ -0,0 +1,143 @@
+<?xml version="1.0" encoding="UTF-8" standalone="no"?><configuration>
+<property><name>mapred.tasktracker.dns.nameserver</name><value>default</value></property>
+<property><name>mapred.queue.default.acl-administer-jobs</name><value>*</value></property>
+<property><name>mapred.skip.map.auto.incr.proc.count</name><value>true</value></property>
+<property><name>mapred.jobtracker.instrumentation</name><value>org.apache.hadoop.mapred.JobTrackerMetricsInst</value></property>
+<property><name>mapred.skip.reduce.auto.incr.proc.count</name><value>true</value></property>
+<property><name>fs.hsftp.impl</name><value>org.apache.hadoop.hdfs.HsftpFileSystem</value></property>
+<property><name>mapred.input.dir</name><value>file:/webmap</value></property>
+<property><name>mapred.submit.replication</name><value>10</value></property>
+<property><name>ipc.server.tcpnodelay</name><value>false</value></property>
+<property><name>fs.checkpoint.dir</name><value>${hadoop.tmp.dir}/dfs/namesecondary</value></property>
+<property><name>mapred.output.compression.type</name><value>RECORD</value></property>
+<property><name>mapred.job.shuffle.merge.percent</name><value>0.66</value></property>
+<property><name>mapred.child.java.opts</name><value>-Xmx200m</value></property>
+<property><name>mapred.queue.default.acl-submit-job</name><value>*</value></property>
+<property><name>keep.failed.task.files</name><value>false</value></property>
+<property><name>mapred.jobtracker.job.history.block.size</name><value>3145728</value></property>
+<property><name>io.bytes.per.checksum</name><value>512</value></property>
+<property><name>mapred.task.tracker.report.address</name><value>127.0.0.1:0</value></property>
+<property><name>hadoop.util.hash.type</name><value>murmur</value></property>
+<property><name>fs.hdfs.impl</name><value>org.apache.hadoop.hdfs.DistributedFileSystem</value></property>
+<property><name>fs.ramfs.impl</name><value>org.apache.hadoop.fs.InMemoryFileSystem</value></property>
+<property><name>mapred.jobtracker.restart.recover</name><value>false</value></property>
+<property><name>fs.hftp.impl</name><value>org.apache.hadoop.hdfs.HftpFileSystem</value></property>
+<property><name>fs.checkpoint.period</name><value>3600</value></property>
+<property><name>mapred.child.tmp</name><value>./tmp</value></property>
+<property><name>mapred.local.dir.minspacekill</name><value>0</value></property>
+<property><name>map.sort.class</name><value>org.apache.hadoop.util.QuickSort</value></property>
+<property><name>hadoop.logfile.count</name><value>10</value></property>
+<property><name>ipc.client.connection.maxidletime</name><value>10000</value></property>
+<property><name>mapred.output.dir</name><value>/result</value></property>
+<property><name>io.map.index.skip</name><value>0</value></property>
+<property><name>mapred.tasktracker.expiry.interval</name><value>600000</value></property>
+<property><name>mapred.output.compress</name><value>false</value></property>
+<property><name>io.seqfile.lazydecompress</name><value>true</value></property>
+<property><name>mapred.reduce.parallel.copies</name><value>5</value></property>
+<property><name>fs.checkpoint.size</name><value>67108864</value></property>
+<property><name>mapred.job.reduce.input.buffer.percent</name><value>0.0</value></property>
+<property><name>mapred.job.name</name><value>PageRank</value></property>
+<property><name>local.cache.size</name><value>10737418240</value></property>
+<property><name>fs.s3n.impl</name><value>org.apache.hadoop.fs.s3native.NativeS3FileSystem</value></property>
+<property><name>mapred.userlog.limit.kb</name><value>0</value></property>
+<property><name>fs.file.impl</name><value>org.apache.hadoop.fs.LocalFileSystem</value></property>
+<property><name>mapred.task.tracker.http.address</name><value>0.0.0.0:50060</value></property>
+<property><name>mapred.task.timeout</name><value>600000</value></property>
+<property><name>fs.kfs.impl</name><value>org.apache.hadoop.fs.kfs.KosmosFileSystem</value></property>
+<property><name>mapred.max.tracker.blacklists</name><value>4</value></property>
+<property><name>fs.s3.buffer.dir</name><value>${hadoop.tmp.dir}/s3</value></property>
+<property><name>mapred.job.tracker.persist.jobstatus.dir</name><value>/jobtracker/jobsInfo</value></property>
+<property><name>ipc.client.kill.max</name><value>10</value></property>
+<property><name>mapred.tasktracker.instrumentation</name><value>org.apache.hadoop.mapred.TaskTrackerMetricsInst</value></property>
+<property><name>mapred.reduce.tasks.speculative.execution</name><value>true</value></property>
+<property><name>io.sort.record.percent</name><value>0.05</value></property>
+<property><name>hadoop.security.authorization</name><value>false</value></property>
+<property><name>mapred.max.tracker.failures</name><value>4</value></property>
+<property><name>mapred.jobtracker.taskScheduler</name><value>org.apache.hadoop.mapred.JobQueueTaskScheduler</value></property>
+<property><name>pregelix.numVertices</name><value>20</value></property>
+<property><name>mapred.tasktracker.dns.interface</name><value>default</value></property>
+<property><name>mapred.map.tasks</name><value>2</value></property>
+<property><name>mapred.job.tracker.persist.jobstatus.hours</name><value>0</value></property>
+<property><name>fs.s3.sleepTimeSeconds</name><value>10</value></property>
+<property><name>fs.default.name</name><value>file:///</value></property>
+<property><name>tasktracker.http.threads</name><value>40</value></property>
+<property><name>mapred.tasktracker.taskmemorymanager.monitoring-interval</name><value>5000</value></property>
+<property><name>hadoop.rpc.socket.factory.class.default</name><value>org.apache.hadoop.net.StandardSocketFactory</value></property>
+<property><name>mapred.reduce.tasks</name><value>1</value></property>
+<property><name>topology.node.switch.mapping.impl</name><value>org.apache.hadoop.net.ScriptBasedMapping</value></property>
+<property><name>pregelix.vertexClass</name><value>edu.uci.ics.pregelix.example.PageRankVertex</value></property>
+<property><name>mapred.skip.reduce.max.skip.groups</name><value>0</value></property>
+<property><name>io.file.buffer.size</name><value>4096</value></property>
+<property><name>mapred.jobtracker.maxtasks.per.job</name><value>-1</value></property>
+<property><name>mapred.tasktracker.indexcache.mb</name><value>10</value></property>
+<property><name>mapred.tasktracker.map.tasks.maximum</name><value>2</value></property>
+<property><name>fs.har.impl.disable.cache</name><value>true</value></property>
+<property><name>mapred.task.profile.maps</name><value>0-2</value></property>
+<property><name>hadoop.native.lib</name><value>true</value></property>
+<property><name>fs.s3.block.size</name><value>67108864</value></property>
+<property><name>mapred.job.reuse.jvm.num.tasks</name><value>1</value></property>
+<property><name>mapred.job.tracker.http.address</name><value>0.0.0.0:50030</value></property>
+<property><name>mapred.tasktracker.reduce.tasks.maximum</name><value>2</value></property>
+<property><name>io.compression.codecs</name><value>org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.BZip2Codec</value></property>
+<property><name>mapred.job.shuffle.input.buffer.percent</name><value>0.70</value></property>
+<property><name>io.seqfile.compress.blocksize</name><value>1000000</value></property>
+<property><name>mapred.queue.names</name><value>default</value></property>
+<property><name>fs.har.impl</name><value>org.apache.hadoop.fs.HarFileSystem</value></property>
+<property><name>io.mapfile.bloom.error.rate</name><value>0.005</value></property>
+<property><name>mapred.job.tracker</name><value>local</value></property>
+<property><name>io.skip.checksum.errors</name><value>false</value></property>
+<property><name>mapred.reduce.max.attempts</name><value>4</value></property>
+<property><name>fs.s3.maxRetries</name><value>4</value></property>
+<property><name>ipc.server.listen.queue.size</name><value>128</value></property>
+<property><name>fs.trash.interval</name><value>0</value></property>
+<property><name>mapred.local.dir.minspacestart</name><value>0</value></property>
+<property><name>fs.s3.impl</name><value>org.apache.hadoop.fs.s3.S3FileSystem</value></property>
+<property><name>io.seqfile.sorter.recordlimit</name><value>1000000</value></property>
+<property><name>io.mapfile.bloom.size</name><value>1048576</value></property>
+<property><name>io.sort.mb</name><value>100</value></property>
+<property><name>mapred.local.dir</name><value>${hadoop.tmp.dir}/mapred/local</value></property>
+<property><name>io.sort.factor</name><value>10</value></property>
+<property><name>mapred.task.profile</name><value>false</value></property>
+<property><name>job.end.retry.interval</name><value>30000</value></property>
+<property><name>mapred.tasktracker.procfsbasedprocesstree.sleeptime-before-sigkill</name><value>5000</value></property>
+<property><name>mapred.jobtracker.completeuserjobs.maximum</name><value>100</value></property>
+<property><name>mapred.task.profile.reduces</name><value>0-2</value></property>
+<property><name>webinterface.private.actions</name><value>false</value></property>
+<property><name>hadoop.tmp.dir</name><value>/tmp/hadoop-${user.name}</value></property>
+<property><name>pregelix.combinerClass</name><value>edu.uci.ics.pregelix.example.PageRankVertex$SimpleSumCombiner</value></property>
+<property><name>mapred.output.compression.codec</name><value>org.apache.hadoop.io.compress.DefaultCodec</value></property>
+<property><name>mapred.skip.attempts.to.start.skipping</name><value>2</value></property>
+<property><name>mapred.temp.dir</name><value>${hadoop.tmp.dir}/mapred/temp</value></property>
+<property><name>mapred.merge.recordsBeforeProgress</name><value>10000</value></property>
+<property><name>mapred.map.output.compression.codec</name><value>org.apache.hadoop.io.compress.DefaultCodec</value></property>
+<property><name>mapred.compress.map.output</name><value>false</value></property>
+<property><name>io.sort.spill.percent</name><value>0.80</value></property>
+<property><name>fs.checkpoint.edits.dir</name><value>${fs.checkpoint.dir}</value></property>
+<property><name>mapred.userlog.retain.hours</name><value>24</value></property>
+<property><name>mapred.system.dir</name><value>${hadoop.tmp.dir}/mapred/system</value></property>
+<property><name>mapred.line.input.format.linespermap</name><value>1</value></property>
+<property><name>job.end.retry.attempts</name><value>0</value></property>
+<property><name>ipc.client.idlethreshold</name><value>4000</value></property>
+<property><name>pregelix.vertexOutputFormatClass</name><value>edu.uci.ics.pregelix.example.PageRankVertex$SimplePageRankVertexOutputFormat</value></property>
+<property><name>mapred.reduce.copy.backoff</name><value>300</value></property>
+<property><name>mapred.map.tasks.speculative.execution</name><value>true</value></property>
+<property><name>mapred.inmem.merge.threshold</name><value>1000</value></property>
+<property><name>hadoop.logfile.size</name><value>10000000</value></property>
+<property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.pregelix.example.inputformat.TextPageRankInputFormat</value></property>
+<property><name>mapred.job.queue.name</name><value>default</value></property>
+<property><name>mapred.job.tracker.persist.jobstatus.active</name><value>false</value></property>
+<property><name>pregelix.incStateLength</name><value>true</value></property>
+<property><name>mapred.reduce.slowstart.completed.maps</name><value>0.05</value></property>
+<property><name>topology.script.number.args</name><value>100</value></property>
+<property><name>mapred.skip.map.max.skip.records</name><value>0</value></property>
+<property><name>fs.ftp.impl</name><value>org.apache.hadoop.fs.ftp.FTPFileSystem</value></property>
+<property><name>mapred.task.cache.levels</name><value>2</value></property>
+<property><name>mapred.job.tracker.handler.count</name><value>10</value></property>
+<property><name>io.serializations</name><value>org.apache.hadoop.io.serializer.WritableSerialization</value></property>
+<property><name>ipc.client.connect.max.retries</name><value>10</value></property>
+<property><name>mapred.min.split.size</name><value>0</value></property>
+<property><name>mapred.map.max.attempts</name><value>4</value></property>
+<property><name>jobclient.output.filter</name><value>FAILED</value></property>
+<property><name>ipc.client.tcpnodelay</name><value>false</value></property>
+<property><name>mapred.acls.enabled</name><value>false</value></property>
+</configuration>
\ No newline at end of file
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java
index 105d3e2..5d8eaf6 100644
--- a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java
@@ -21,6 +21,7 @@
import java.util.ArrayList;
import java.util.List;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
@@ -90,11 +91,13 @@
private final List<IFrameWriter> writers = new ArrayList<IFrameWriter>();
private final List<FrameTupleAppender> appenders = new ArrayList<FrameTupleAppender>();
private final List<ArrayTupleBuilder> tbs = new ArrayList<ArrayTupleBuilder>();
+ private Configuration conf;
@Override
public void open(IHyracksTaskContext ctx, RecordDescriptor rd, IFrameWriter... writers)
throws HyracksDataException {
- this.aggregator = BspUtils.createGlobalAggregator(confFactory.createConfiguration());
+ this.conf = confFactory.createConfiguration();
+ this.aggregator = BspUtils.createGlobalAggregator(conf);
this.aggregator.init();
this.writerMsg = writers[0];
@@ -177,7 +180,8 @@
private void writeOutGlobalAggregate() throws HyracksDataException {
try {
/**
- * get partial aggregate result and flush to the final aggregator
+ * get partial aggregate result and flush to the final
+ * aggregator
*/
Writable agg = aggregator.finishPartial();
agg.write(tbGlobalAggregate.getDataOutput());
@@ -203,15 +207,27 @@
}
@Override
- public void update(ITupleReference tupleRef) throws HyracksDataException {
+ public void update(ITupleReference tupleRef, ArrayTupleBuilder cloneUpdateTb) throws HyracksDataException {
try {
if (vertex != null && vertex.hasUpdate()) {
- int fieldCount = tupleRef.getFieldCount();
- for (int i = 1; i < fieldCount; i++) {
- byte[] data = tupleRef.getFieldData(i);
- int offset = tupleRef.getFieldStart(i);
- bbos.setByteArray(data, offset);
- vertex.write(output);
+ if (!BspUtils.getDynamicVertexValueSize(conf)) {
+ //in-place update
+ int fieldCount = tupleRef.getFieldCount();
+ for (int i = 1; i < fieldCount; i++) {
+ byte[] data = tupleRef.getFieldData(i);
+ int offset = tupleRef.getFieldStart(i);
+ bbos.setByteArray(data, offset);
+ vertex.write(output);
+ }
+ } else {
+ //write the vertex id
+ DataOutput tbOutput = cloneUpdateTb.getDataOutput();
+ vertex.getVertexId().write(tbOutput);
+ cloneUpdateTb.addFieldEndOffset();
+
+ //write the vertex value
+ vertex.write(tbOutput);
+ cloneUpdateTb.addFieldEndOffset();
}
}
} catch (IOException e) {
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/StartComputeUpdateFunctionFactory.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/StartComputeUpdateFunctionFactory.java
index f72b059..eb08b51 100644
--- a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/StartComputeUpdateFunctionFactory.java
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/StartComputeUpdateFunctionFactory.java
@@ -21,6 +21,7 @@
import java.util.ArrayList;
import java.util.List;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
@@ -93,11 +94,13 @@
private final List<IFrameWriter> writers = new ArrayList<IFrameWriter>();
private final List<FrameTupleAppender> appenders = new ArrayList<FrameTupleAppender>();
private final List<ArrayTupleBuilder> tbs = new ArrayList<ArrayTupleBuilder>();
+ private Configuration conf;
@Override
public void open(IHyracksTaskContext ctx, RecordDescriptor rd, IFrameWriter... writers)
throws HyracksDataException {
- this.aggregator = BspUtils.createGlobalAggregator(confFactory.createConfiguration());
+ this.conf = confFactory.createConfiguration();
+ this.aggregator = BspUtils.createGlobalAggregator(conf);
this.aggregator.init();
this.writerMsg = writers[0];
@@ -173,8 +176,8 @@
if (!terminate) {
writeOutTerminationState();
}
-
- /**write out global aggregate value*/
+
+ /** write out global aggregate value */
writeOutGlobalAggregate();
}
@@ -207,15 +210,27 @@
}
@Override
- public void update(ITupleReference tupleRef) throws HyracksDataException {
+ public void update(ITupleReference tupleRef, ArrayTupleBuilder cloneUpdateTb) throws HyracksDataException {
try {
if (vertex != null && vertex.hasUpdate()) {
- int fieldCount = tupleRef.getFieldCount();
- for (int i = 1; i < fieldCount; i++) {
- byte[] data = tupleRef.getFieldData(i);
- int offset = tupleRef.getFieldStart(i);
- bbos.setByteArray(data, offset);
- vertex.write(output);
+ if (!BspUtils.getDynamicVertexValueSize(conf)) {
+ //in-place update
+ int fieldCount = tupleRef.getFieldCount();
+ for (int i = 1; i < fieldCount; i++) {
+ byte[] data = tupleRef.getFieldData(i);
+ int offset = tupleRef.getFieldStart(i);
+ bbos.setByteArray(data, offset);
+ vertex.write(output);
+ }
+ } else {
+ //write the vertex id
+ DataOutput tbOutput = cloneUpdateTb.getDataOutput();
+ vertex.getVertexId().write(tbOutput);
+ cloneUpdateTb.addFieldEndOffset();
+
+ //write the vertex value
+ vertex.write(tbOutput);
+ cloneUpdateTb.addFieldEndOffset();
}
}
} catch (IOException e) {
@@ -224,5 +239,4 @@
}
};
}
-
}