reduce unnecessary B-Tree (non-inplace) update
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/base/INormalizedKeyComputerFactoryProvider.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/base/INormalizedKeyComputerFactoryProvider.java
deleted file mode 100644
index 4a4f5b3..0000000
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/base/INormalizedKeyComputerFactoryProvider.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package edu.uci.ics.pregelix.core.base;
-
-import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
-
-public interface INormalizedKeyComputerFactoryProvider {
-
- @SuppressWarnings("rawtypes")
- INormalizedKeyComputerFactory getAscINormalizedKeyComputerFactory();
-}
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/CopyUpdateUtil.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/CopyUpdateUtil.java
index 3f89543..30df916 100644
--- a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/CopyUpdateUtil.java
+++ b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/CopyUpdateUtil.java
@@ -29,6 +29,17 @@
UpdateBuffer updateBuffer, ArrayTupleBuilder cloneUpdateTb, ITreeIndexAccessor indexAccessor,
ITreeIndexCursor cursor, RangePredicate rangePred) throws HyracksDataException, IndexException {
if (cloneUpdateTb.getSize() > 0) {
+ int[] fieldEndOffsets = cloneUpdateTb.getFieldEndOffsets();
+ int srcStart = fieldEndOffsets[0];
+ int srcLen = fieldEndOffsets[1] - fieldEndOffsets[0]; // the updated vertex size
+ int frSize = frameTuple.getFieldLength(1); // the vertex binary size in the leaf page
+ if (srcLen <= frSize) {
+ //doing in-place update if possible, save the "real update" overhead
+ System.arraycopy(cloneUpdateTb.getByteArray(), srcStart, frameTuple.getFieldData(1),
+ frameTuple.getFieldStart(1), srcLen);
+ cloneUpdateTb.reset();
+ return;
+ }
if (!updateBuffer.appendTuple(cloneUpdateTb)) {
tempTupleReference.reset(frameTuple.getFieldData(0), frameTuple.getFieldStart(0),
frameTuple.getFieldLength(0));
@@ -49,5 +60,4 @@
cloneUpdateTb.reset();
}
}
-
}
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java
index 0a0a14f..caeeb10 100644
--- a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java
@@ -249,13 +249,10 @@
if (vertex != null && vertex.hasUpdate()) {
if (!dynamicStateLength) {
// in-place update
- int fieldCount = tupleRef.getFieldCount();
- for (int i = 1; i < fieldCount; i++) {
- byte[] data = tupleRef.getFieldData(i);
- int offset = tupleRef.getFieldStart(i);
- bbos.setByteArray(data, offset);
- vertex.write(output);
- }
+ byte[] data = tupleRef.getFieldData(1);
+ int offset = tupleRef.getFieldStart(1);
+ bbos.setByteArray(data, offset);
+ vertex.write(output);
} else {
// write the vertex id
DataOutput tbOutput = cloneUpdateTb.getDataOutput();
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/StartComputeUpdateFunctionFactory.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/StartComputeUpdateFunctionFactory.java
index 9998205..48d4d80 100644
--- a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/StartComputeUpdateFunctionFactory.java
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/StartComputeUpdateFunctionFactory.java
@@ -253,13 +253,10 @@
if (vertex != null && vertex.hasUpdate()) {
if (!dynamicStateLength) {
// in-place update
- int fieldCount = tupleRef.getFieldCount();
- for (int i = 1; i < fieldCount; i++) {
- byte[] data = tupleRef.getFieldData(i);
- int offset = tupleRef.getFieldStart(i);
- bbos.setByteArray(data, offset);
- vertex.write(output);
- }
+ byte[] data = tupleRef.getFieldData(1);
+ int offset = tupleRef.getFieldStart(1);
+ bbos.setByteArray(data, offset);
+ vertex.write(output);
} else {
// write the vertex id
DataOutput tbOutput = cloneUpdateTb.getDataOutput();
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/VLongAscNormalizedKeyComputerFactory.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/VLongAscNormalizedKeyComputerFactory.java
deleted file mode 100644
index 9181691..0000000
--- a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/VLongAscNormalizedKeyComputerFactory.java
+++ /dev/null
@@ -1,56 +0,0 @@
-package edu.uci.ics.pregelix.runtime.touchpoint;
-
-import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputer;
-import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
-import edu.uci.ics.pregelix.api.util.SerDeUtils;
-
-public class VLongAscNormalizedKeyComputerFactory implements INormalizedKeyComputerFactory {
- private static final long serialVersionUID = 1L;
-
- @Override
- public INormalizedKeyComputer createNormalizedKeyComputer() {
- return new INormalizedKeyComputer() {
- private static final int POSTIVE_LONG_MASK = (3 << 30);
- private static final int NON_NEGATIVE_INT_MASK = (2 << 30);
- private static final int NEGATIVE_LONG_MASK = (0 << 30);
-
- @Override
- public int normalize(byte[] bytes, int start, int length) {
- long value = SerDeUtils.readVLong(bytes, start, length);
- int highValue = (int) (value >> 32);
- if (highValue > 0) {
- /**
- * larger than Integer.MAX
- */
- int highNmk = getKey(highValue);
- highNmk >>= 2;
- highNmk |= POSTIVE_LONG_MASK;
- return highNmk;
- } else if (highValue == 0) {
- /**
- * smaller than Integer.MAX but >=0
- */
- int lowNmk = (int) value;
- lowNmk >>= 2;
- lowNmk |= NON_NEGATIVE_INT_MASK;
- return lowNmk;
- } else {
- /**
- * less than 0; TODO: have not optimized for that
- */
- int highNmk = getKey(highValue);
- highNmk >>= 2;
- highNmk |= NEGATIVE_LONG_MASK;
- return highNmk;
- }
- }
-
- private int getKey(int value) {
- long unsignedFirstValue = (long) value;
- int nmk = (int) ((unsignedFirstValue - ((long) Integer.MIN_VALUE)) & 0xffffffffL);
- return nmk;
- }
-
- };
- }
-}
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/VLongDescNormalizedKeyComputerFactory.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/VLongDescNormalizedKeyComputerFactory.java
deleted file mode 100644
index 6b2738b..0000000
--- a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/VLongDescNormalizedKeyComputerFactory.java
+++ /dev/null
@@ -1,23 +0,0 @@
-package edu.uci.ics.pregelix.runtime.touchpoint;
-
-import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputer;
-import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
-
-public class VLongDescNormalizedKeyComputerFactory implements INormalizedKeyComputerFactory {
- private static final long serialVersionUID = 1L;
- private final INormalizedKeyComputerFactory ascNormalizedKeyComputerFactory = new VLongAscNormalizedKeyComputerFactory();
-
- @Override
- public INormalizedKeyComputer createNormalizedKeyComputer() {
- return new INormalizedKeyComputer() {
- private INormalizedKeyComputer nmkComputer = ascNormalizedKeyComputerFactory.createNormalizedKeyComputer();
-
- @Override
- public int normalize(byte[] bytes, int start, int length) {
- int nk = nmkComputer.normalize(bytes, start, length);
- return (int) ((long) Integer.MAX_VALUE - (long) (nk - Integer.MIN_VALUE));
- }
-
- };
- }
-}