Several major changes in hyracks:
-- reduced CC/NC communication for reporting partition requests and availability; partition requests and availability are now reported only for send-side materialized (non-pipelined) policies, in case a task is re-attempted.
-- changed buffer cache to dynamically allocate memory based on needs instead of pre-allocating
-- changed each network channel to lazily allocate memory based on needs, and changed materialized connectors to lazily allocate files based on needs
-- changed several major CCNCFunctions to use non-Java serialization/deserialization (serde)
-- added a sort-based group-by operator which pushes group-by aggregations into an external sort
-- made external sort a stable sort
Changes 1, 3, and 4 reduce the job overhead.
Change 2 reduces unnecessary NC resource consumption, such as memory and files.
Changes 5 and 6 are improvements to runtime operators.
One change in algebricks:
-- implemented a rule to push group-by aggregation into sort, i.e., using the sort-based gby operator
Several important changes in pregelix:
-- removed static state in vertex
-- check the halt bit directly, without deserialization
-- optimized the sort algorithm by packing yet another 2-byte normalized key into the tPointers array
Change-Id: Id696f9a9f1647b4a025b8b33d20b3a89127c60d6
Reviewed-on: http://fulliautomatix.ics.uci.edu:8443/35
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Till Westmann <westmann@gmail.com>
diff --git a/pregelix-runtime/pom.xml b/pregelix-runtime/pom.xml
index 56a52b2..4268444 100644
--- a/pregelix-runtime/pom.xml
+++ b/pregelix-runtime/pom.xml
@@ -1,18 +1,14 @@
-<!--
- ! Copyright 2009-2013 by The Regents of the University of California
- ! Licensed under the Apache License, Version 2.0 (the "License");
- ! you may not use this file except in compliance with the License.
- ! you may obtain a copy of the License from
- !
- ! http://www.apache.org/licenses/LICENSE-2.0
- !
- ! Unless required by applicable law or agreed to in writing, software
- ! distributed under the License is distributed on an "AS IS" BASIS,
- ! WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- ! See the License for the specific language governing permissions and
- ! limitations under the License.
- !-->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<!-- ! Copyright 2009-2013 by The Regents of the University of California
+ ! Licensed under the Apache License, Version 2.0 (the "License"); ! you may
+ not use this file except in compliance with the License. ! you may obtain
+ a copy of the License from ! ! http://www.apache.org/licenses/LICENSE-2.0
+ ! ! Unless required by applicable law or agreed to in writing, software !
+ distributed under the License is distributed on an "AS IS" BASIS, ! WITHOUT
+ WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ! See the
+ License for the specific language governing permissions and ! limitations
+ under the License. ! -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<artifactId>pregelix-runtime</artifactId>
<packaging>jar</packaging>
@@ -111,6 +107,8 @@
<groupId>edu.uci.ics.hyracks</groupId>
<artifactId>hyracks-data-std</artifactId>
<version>0.2.12-SNAPSHOT</version>
+ <type>jar</type>
+ <scope>compile</scope>
</dependency>
<dependency>
<groupId>edu.uci.ics.hyracks</groupId>
diff --git a/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AccumulatingAggregatorFactory.java b/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/agg/AccumulatingAggregatorFactory.java
similarity index 88%
rename from pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AccumulatingAggregatorFactory.java
rename to pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/agg/AccumulatingAggregatorFactory.java
index d243c8a..12fb642 100644
--- a/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AccumulatingAggregatorFactory.java
+++ b/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/agg/AccumulatingAggregatorFactory.java
@@ -12,7 +12,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package edu.uci.ics.pregelix.runtime.simpleagg;
+package edu.uci.ics.pregelix.runtime.agg;
import java.nio.ByteBuffer;
@@ -29,10 +29,10 @@
import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.FrameTupleReference;
import edu.uci.ics.hyracks.dataflow.std.group.AggregateState;
-import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
-import edu.uci.ics.pregelix.dataflow.group.IClusteredAggregatorDescriptorFactory;
import edu.uci.ics.pregelix.dataflow.std.base.IAggregateFunction;
import edu.uci.ics.pregelix.dataflow.std.base.IAggregateFunctionFactory;
+import edu.uci.ics.pregelix.dataflow.std.group.IAggregatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.std.group.IClusteredAggregatorDescriptorFactory;
public class AccumulatingAggregatorFactory implements IClusteredAggregatorDescriptorFactory {
@@ -73,8 +73,8 @@
}
@Override
- public void init(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
- AggregateState state) throws HyracksDataException {
+ public void init(IFrameTupleAccessor accessor, int tIndex, AggregateState state)
+ throws HyracksDataException {
setGroupKeySize(accessor, tIndex);
initAggregateFunctions(state, true);
int stateSize = estimateStep(accessor, tIndex, state);
@@ -88,8 +88,8 @@
}
@Override
- public void aggregate(IFrameTupleAccessor accessor, int tIndex, IFrameTupleAccessor stateAccessor,
- int stateTupleIndex, AggregateState state) throws HyracksDataException {
+ public void aggregate(IFrameTupleAccessor accessor, int tIndex, AggregateState state)
+ throws HyracksDataException {
int stateSize = estimateStep(accessor, tIndex, state);
if (stateSize > frameSize) {
emitResultTuple(accessor, tIndex, state);
@@ -99,20 +99,31 @@
}
@Override
- public boolean outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
- AggregateState state) throws HyracksDataException {
+ public boolean outputFinalResult(IFrameTupleAccessor accessor, int tIndex, AggregateState state,
+ FrameTupleAppender appender) throws HyracksDataException {
Pair<ArrayBackedValueStorage[], IAggregateFunction[]> aggState = (Pair<ArrayBackedValueStorage[], IAggregateFunction[]>) state.state;
ArrayBackedValueStorage[] aggOutput = aggState.getLeft();
IAggregateFunction[] agg = aggState.getRight();
for (int i = 0; i < agg.length; i++) {
try {
agg[i].finishAll();
- tupleBuilder.addField(aggOutput[i].getByteArray(), aggOutput[i].getStartOffset(),
- aggOutput[i].getLength());
} catch (Exception e) {
throw new HyracksDataException(e);
}
}
+ //write group Keys
+ for (int i = 0; i < groupFields.length; i++) {
+ if (!appender.appendField(accessor, tIndex, groupFields[i])) {
+ return false;
+ }
+ }
+ //write aggregate fields
+ for (int i = 0; i < agg.length; i++) {
+ if (!appender.appendField(aggOutput[i].getByteArray(), aggOutput[i].getStartOffset(),
+ aggOutput[i].getLength())) {
+ return false;
+ }
+ }
return true;
}
@@ -122,8 +133,8 @@
}
@Override
- public boolean outputPartialResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
- AggregateState state) throws HyracksDataException {
+ public boolean outputPartialResult(IFrameTupleAccessor accessor, int tIndex, AggregateState state,
+ FrameTupleAppender appender) throws HyracksDataException {
throw new IllegalStateException("this method should not be called");
}
diff --git a/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AggregationFunction.java b/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/agg/AggregationFunction.java
similarity index 81%
rename from pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AggregationFunction.java
rename to pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/agg/AggregationFunction.java
index 5bc30a2..0070c91 100644
--- a/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AggregationFunction.java
+++ b/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/agg/AggregationFunction.java
@@ -13,7 +13,7 @@
* limitations under the License.
*/
-package edu.uci.ics.pregelix.runtime.simpleagg;
+package edu.uci.ics.pregelix.runtime.agg;
import java.io.DataInput;
import java.io.DataInputStream;
@@ -55,6 +55,7 @@
private Writable combinedResult;
private MsgList msgList = new MsgList();
private boolean keyRead = false;
+ private boolean skipKey = false;
public AggregationFunction(IHyracksTaskContext ctx, IConfigurationFactory confFactory, DataOutput tmpOutput,
IFrameWriter groupByOutputWriter, boolean isFinalStage, boolean partialAggAsInput)
@@ -68,6 +69,7 @@
combiner = BspUtils.createMessageCombiner(conf);
key = BspUtils.createVertexIndex(conf);
value = !partialAggAsInput ? BspUtils.createMessageValue(conf) : BspUtils.createPartialCombineValue(conf);
+ skipKey = BspUtils.getSkipCombinerKey(conf);
}
@Override
@@ -84,8 +86,12 @@
@Override
public void step(IFrameTupleReference tuple) throws HyracksDataException {
- if (!partialAggAsInput) {
- combiner.stepPartial(key, (WritableSizable) value);
+ if (!isFinalStage) {
+ if (!partialAggAsInput) {
+ combiner.stepPartial(key, (WritableSizable) value);
+ } else {
+ combiner.stepPartial2(key, value);
+ }
} else {
combiner.stepFinal(key, value);
}
@@ -95,12 +101,16 @@
public void finish() throws HyracksDataException {
try {
if (!isFinalStage) {
- combinedResult = combiner.finishPartial();
+ if (!partialAggAsInput) {
+ combinedResult = combiner.finishPartial();
+ } else {
+ combinedResult = combiner.finishPartial2();
+ }
} else {
combinedResult = combiner.finishFinal();
}
combinedResult.write(output);
- } catch (IOException e) {
+ } catch (Exception e) {
throw new HyracksDataException(e);
}
}
@@ -109,7 +119,11 @@
public void finishAll() throws HyracksDataException {
try {
if (!isFinalStage) {
- combinedResult = combiner.finishPartial();
+ if (!partialAggAsInput) {
+ combinedResult = combiner.finishPartial();
+ } else {
+ combinedResult = combiner.finishPartial2();
+ }
} else {
combinedResult = combiner.finishFinalAll();
}
@@ -134,13 +148,20 @@
valueInputStream.setByteBuffer(buffer, valueStart);
try {
- if (!keyRead) {
+ //read key if necessary
+ if (!keyRead && !skipKey) {
key.readFields(keyInput);
keyRead = true;
}
+ //read value
value.readFields(valueInput);
- if (!partialAggAsInput) {
- return combiner.estimateAccumulatedStateByteSizePartial(key, (WritableSizable) value);
+
+ if (!isFinalStage) {
+ if (!partialAggAsInput) {
+ return combiner.estimateAccumulatedStateByteSizePartial(key, (WritableSizable) value);
+ } else {
+ return combiner.estimateAccumulatedStateByteSizePartial2(key, value);
+ }
} else {
return combiner.estimateAccumulatedStateByteSizeFinal(key, value);
}
@@ -148,5 +169,4 @@
throw new HyracksDataException(e);
}
}
-
}
diff --git a/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AggregationFunctionFactory.java b/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/agg/AggregationFunctionFactory.java
similarity index 97%
rename from pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AggregationFunctionFactory.java
rename to pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/agg/AggregationFunctionFactory.java
index 54eccf5..a0deb46 100644
--- a/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AggregationFunctionFactory.java
+++ b/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/agg/AggregationFunctionFactory.java
@@ -13,7 +13,7 @@
* limitations under the License.
*/
-package edu.uci.ics.pregelix.runtime.simpleagg;
+package edu.uci.ics.pregelix.runtime.agg;
import java.io.DataOutput;
diff --git a/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/agg/SerializableAggregateFunction.java b/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/agg/SerializableAggregateFunction.java
new file mode 100644
index 0000000..3906676
--- /dev/null
+++ b/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/agg/SerializableAggregateFunction.java
@@ -0,0 +1,230 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.runtime.agg;
+
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.ByteBufferInputStream;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.FrameTupleReference;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+import edu.uci.ics.pregelix.api.graph.MessageCombiner;
+import edu.uci.ics.pregelix.api.graph.MsgList;
+import edu.uci.ics.pregelix.api.io.WritableSizable;
+import edu.uci.ics.pregelix.api.util.BspUtils;
+import edu.uci.ics.pregelix.dataflow.base.IConfigurationFactory;
+import edu.uci.ics.pregelix.dataflow.std.base.ISerializableAggregateFunction;
+import edu.uci.ics.pregelix.dataflow.std.util.ResetableByteArrayOutputStream;
+
+@SuppressWarnings("rawtypes")
+public class SerializableAggregateFunction implements ISerializableAggregateFunction {
+ private final Configuration conf;
+ private final boolean partialAggAsInput;
+ private MessageCombiner combiner;
+ private ByteBufferInputStream keyInputStream = new ByteBufferInputStream();
+ private ByteBufferInputStream valueInputStream = new ByteBufferInputStream();
+ private ByteBufferInputStream stateInputStream = new ByteBufferInputStream();
+ private DataInput keyInput = new DataInputStream(keyInputStream);
+ private DataInput valueInput = new DataInputStream(valueInputStream);
+ private DataInput stateInput = new DataInputStream(stateInputStream);
+ private ResetableByteArrayOutputStream stateBos = new ResetableByteArrayOutputStream();
+ private DataOutput stateOutput = new DataOutputStream(stateBos);
+ private WritableComparable key;
+ private Writable value;
+ private Writable combinedResult;
+ private Writable finalResult;
+ private MsgList msgList = new MsgList();
+
+ public SerializableAggregateFunction(IHyracksTaskContext ctx, IConfigurationFactory confFactory,
+ boolean partialAggAsInput) throws HyracksDataException {
+ this.conf = confFactory.createConfiguration(ctx);
+ this.partialAggAsInput = partialAggAsInput;
+ msgList.setConf(this.conf);
+
+ combiner = BspUtils.createMessageCombiner(conf);
+ key = BspUtils.createVertexIndex(conf);
+ value = !partialAggAsInput ? BspUtils.createMessageValue(conf) : BspUtils.createPartialCombineValue(conf);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void init(IFrameTupleReference tuple, ArrayTupleBuilder state) throws HyracksDataException {
+ try {
+ /**
+ * bind key and value
+ */
+ bindKeyValue(tuple);
+ key.readFields(keyInput);
+ value.readFields(valueInput);
+
+ combiner.init(msgList);
+
+ /**
+ * call the step function of the aggregator
+ */
+ if (!partialAggAsInput) {
+ combiner.stepPartial(key, (WritableSizable) value);
+ } else {
+ combiner.stepFinal(key, (WritableSizable) value);
+ }
+
+ /**
+ * output state to the array tuple builder
+ */
+ combinedResult = combiner.finishPartial();
+ combinedResult.write(state.getDataOutput());
+ state.addFieldEndOffset();
+ } catch (Exception e) {
+ throw new HyracksDataException(e);
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void step(IFrameTupleReference tuple, IFrameTupleReference state) throws HyracksDataException {
+ try {
+ /**
+ * bind key and value
+ */
+ bindKeyValue(tuple);
+ key.readFields(keyInput);
+ value.readFields(valueInput);
+
+ /**
+ * bind state
+ */
+ bindState(state);
+ combinedResult.readFields(stateInput);
+
+ /**
+ * set the partial state
+ */
+ combiner.setPartialCombineState(combinedResult);
+
+ /**
+ * call the step function of the aggregator
+ */
+ if (!partialAggAsInput) {
+ combiner.stepPartial(key, (WritableSizable) value);
+ } else {
+ combiner.stepFinal(key, (WritableSizable) value);
+ }
+
+ /**
+ * write out partial state
+ */
+ combinedResult = combiner.finishPartial();
+ combinedResult.write(stateOutput);
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ public void finishPartial(IFrameTupleReference state, ArrayTupleBuilder output) throws HyracksDataException {
+ try {
+ /**
+ * bind state
+ */
+ bindState(state);
+ combinedResult.readFields(stateInput);
+
+ /**
+ * set the partial state
+ */
+ combiner.setPartialCombineState(combinedResult);
+ combinedResult = combiner.finishPartial();
+ combinedResult.write(output.getDataOutput());
+ output.addFieldEndOffset();
+ } catch (Exception e) {
+ throw new HyracksDataException(e);
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ public void finishFinal(IFrameTupleReference state, ArrayTupleBuilder output) throws HyracksDataException {
+ try {
+ /**
+ * bind key and value
+ */
+ bindKeyValue(state);
+ key.readFields(keyInput);
+
+ /**
+ * bind state
+ */
+ bindState(state);
+ combinedResult.readFields(stateInput);
+
+ /**
+ * set the partial state
+ */
+ if (!partialAggAsInput) {
+ combiner.setPartialCombineState(combinedResult);
+ combinedResult = combiner.finishPartial();
+ combinedResult.write(output.getDataOutput());
+ } else {
+ combiner.setPartialCombineState(combinedResult);
+ finalResult = combiner.finishFinal();
+ finalResult.write(output.getDataOutput());
+ }
+ output.addFieldEndOffset();
+ } catch (Exception e) {
+ throw new HyracksDataException(e);
+ }
+ }
+
+ /**
+ * @param state
+ */
+ private void bindState(IFrameTupleReference state) {
+ FrameTupleReference ftr = (FrameTupleReference) state;
+ IFrameTupleAccessor fta = ftr.getFrameTupleAccessor();
+ ByteBuffer buffer = fta.getBuffer();
+ int tIndex = ftr.getTupleIndex();
+ int combinedStateStart = fta.getFieldSlotsLength() + fta.getTupleStartOffset(tIndex)
+ + fta.getFieldStartOffset(tIndex, 1);
+ stateInputStream.setByteBuffer(buffer, combinedStateStart);
+ stateBos.setByteArray(buffer.array(), combinedStateStart);
+ }
+
+ /**
+ * @param tuple
+ */
+ private void bindKeyValue(IFrameTupleReference tuple) {
+ FrameTupleReference ftr = (FrameTupleReference) tuple;
+ IFrameTupleAccessor fta = ftr.getFrameTupleAccessor();
+ ByteBuffer buffer = fta.getBuffer();
+ int tIndex = ftr.getTupleIndex();
+ int keyStart = fta.getFieldSlotsLength() + fta.getTupleStartOffset(tIndex) + fta.getFieldStartOffset(tIndex, 0);
+ int valueStart = fta.getFieldSlotsLength() + fta.getTupleStartOffset(tIndex)
+ + fta.getFieldStartOffset(tIndex, 1);
+ keyInputStream.setByteBuffer(buffer, keyStart);
+ valueInputStream.setByteBuffer(buffer, valueStart);
+ }
+
+}
diff --git a/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/agg/SerializableAggregationFunctionFactory.java b/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/agg/SerializableAggregationFunctionFactory.java
new file mode 100644
index 0000000..c6e41b9
--- /dev/null
+++ b/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/agg/SerializableAggregationFunctionFactory.java
@@ -0,0 +1,40 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.pregelix.runtime.agg;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.pregelix.dataflow.base.IConfigurationFactory;
+import edu.uci.ics.pregelix.dataflow.std.base.ISerializableAggregateFunction;
+import edu.uci.ics.pregelix.dataflow.std.base.ISerializableAggregateFunctionFactory;
+
+public class SerializableAggregationFunctionFactory implements ISerializableAggregateFunctionFactory {
+ private static final long serialVersionUID = 1L;
+ private final IConfigurationFactory confFactory;
+ private final boolean partialAggAsInput;
+
+ public SerializableAggregationFunctionFactory(IConfigurationFactory confFactory, boolean partialAggAsInput) {
+ this.confFactory = confFactory;
+ this.partialAggAsInput = partialAggAsInput;
+ }
+
+ @Override
+ public ISerializableAggregateFunction createAggregateFunction(IHyracksTaskContext ctx, IFrameWriter writer)
+ throws HyracksException {
+ return new SerializableAggregateFunction(ctx, confFactory, partialAggAsInput);
+ }
+}
diff --git a/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/agg/SerializableAggregatorDescriptorFactory.java b/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/agg/SerializableAggregatorDescriptorFactory.java
new file mode 100644
index 0000000..11b7b63
--- /dev/null
+++ b/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/agg/SerializableAggregatorDescriptorFactory.java
@@ -0,0 +1,103 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.runtime.agg;
+
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.FrameTupleReference;
+import edu.uci.ics.hyracks.dataflow.std.group.AggregateState;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
+import edu.uci.ics.pregelix.dataflow.std.base.ISerializableAggregateFunction;
+import edu.uci.ics.pregelix.dataflow.std.base.ISerializableAggregateFunctionFactory;
+
+public class SerializableAggregatorDescriptorFactory implements IAggregatorDescriptorFactory {
+ private static final long serialVersionUID = 1L;
+ private ISerializableAggregateFunctionFactory aggFuncFactory;
+
+ public SerializableAggregatorDescriptorFactory(ISerializableAggregateFunctionFactory aggFuncFactory) {
+ this.aggFuncFactory = aggFuncFactory;
+ }
+
+ @Override
+ public IAggregatorDescriptor createAggregator(final IHyracksTaskContext ctx, RecordDescriptor inRecordDescriptor,
+ RecordDescriptor outRecordDescriptor, int[] keyFields, int[] keyFieldsInPartialResults, IFrameWriter writer)
+ throws HyracksDataException {
+ try {
+ final FrameTupleReference tupleRef = new FrameTupleReference();
+ final FrameTupleReference stateRef = new FrameTupleReference();
+ final ISerializableAggregateFunction aggFunc = aggFuncFactory.createAggregateFunction(ctx, writer);
+
+ /**
+ * The serializable version aggregator itself is stateless
+ */
+ return new IAggregatorDescriptor() {
+
+ @Override
+ public AggregateState createAggregateStates() {
+ return new AggregateState();
+ }
+
+ @Override
+ public void init(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor, int tIndex,
+ AggregateState state) throws HyracksDataException {
+ tupleRef.reset(accessor, tIndex);
+ aggFunc.init(tupleRef, tupleBuilder);
+ }
+
+ @Override
+ public void reset() {
+
+ }
+
+ @Override
+ public void aggregate(IFrameTupleAccessor accessor, int tIndex, IFrameTupleAccessor stateAccessor,
+ int stateTupleIndex, AggregateState state) throws HyracksDataException {
+ tupleRef.reset(accessor, tIndex);
+ stateRef.reset(stateAccessor, stateTupleIndex);
+ aggFunc.step(tupleRef, stateRef);
+ }
+
+ @Override
+ public boolean outputPartialResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor,
+ int tIndex, AggregateState state) throws HyracksDataException {
+ stateRef.reset(accessor, tIndex);
+ aggFunc.finishPartial(stateRef, tupleBuilder);
+ return true;
+ }
+
+ @Override
+ public boolean outputFinalResult(ArrayTupleBuilder tupleBuilder, IFrameTupleAccessor accessor,
+ int tIndex, AggregateState state) throws HyracksDataException {
+ stateRef.reset(accessor, tIndex);
+ aggFunc.finishFinal(stateRef, tupleBuilder);
+ return true;
+ }
+
+ @Override
+ public void close() {
+
+ }
+
+ };
+ } catch (Exception e) {
+ throw new HyracksDataException(e);
+ }
+ }
+}
diff --git a/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java b/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java
index 3e4a811..bd05687 100644
--- a/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java
+++ b/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java
@@ -42,7 +42,7 @@
import edu.uci.ics.pregelix.dataflow.base.IConfigurationFactory;
import edu.uci.ics.pregelix.dataflow.std.base.IUpdateFunction;
import edu.uci.ics.pregelix.dataflow.std.base.IUpdateFunctionFactory;
-import edu.uci.ics.pregelix.dataflow.util.ResetableByteArrayOutputStream;
+import edu.uci.ics.pregelix.dataflow.std.util.ResetableByteArrayOutputStream;
@SuppressWarnings({ "rawtypes", "unchecked" })
public class ComputeUpdateFunctionFactory implements IUpdateFunctionFactory {
@@ -107,6 +107,7 @@
private final List<ArrayTupleBuilder> tbs = new ArrayList<ArrayTupleBuilder>();
private Configuration conf;
private boolean dynamicStateLength;
+ private boolean userConfigured;
@Override
public void open(IHyracksTaskContext ctx, RecordDescriptor rd, IFrameWriter... writers)
@@ -115,6 +116,7 @@
//LSM index does not have in-place update
this.dynamicStateLength = BspUtils.getDynamicVertexValueSize(conf) || BspUtils.useLSM(conf);
this.aggregators = BspUtils.createGlobalAggregators(conf);
+ this.userConfigured = false;
for (int i = 0; i < aggregators.size(); i++) {
this.aggregators.get(i).init();
}
@@ -123,7 +125,7 @@
this.writerMsg = writers[0];
this.bufferMsg = ctx.allocateFrame();
- this.appenderMsg = new FrameTupleAppender(ctx.getFrameSize());
+ this.appenderMsg = new FrameTupleAppender(ctx.getFrameSize(), 2);
this.appenderMsg.reset(bufferMsg, true);
this.writers.add(writerMsg);
this.appenders.add(appenderMsg);
@@ -155,7 +157,7 @@
if (writers.length > 5) {
this.writerAlive = writers[5];
this.bufferAlive = ctx.allocateFrame();
- this.appenderAlive = new FrameTupleAppender(ctx.getFrameSize());
+ this.appenderAlive = new FrameTupleAppender(ctx.getFrameSize(), 2);
this.appenderAlive.reset(bufferAlive, true);
this.pushAlive = true;
this.writers.add(writerAlive);
@@ -195,6 +197,10 @@
}
try {
+ if (!userConfigured) {
+ vertex.configure(conf);
+ userConfigured = true;
+ }
if (msgContentList.segmentStart()) {
vertex.open();
}
@@ -239,6 +245,11 @@
/** write out global aggregate value */
writeOutGlobalAggregate();
+
+ /** end of a superstep, for vertices to release resources */
+ if (userConfigured) {
+ vertex.endSuperstep(conf);
+ }
}
private void writeOutGlobalAggregate() throws HyracksDataException {
@@ -255,7 +266,7 @@
if (!appenderGlobalAggregate.append(tbGlobalAggregate.getFieldEndOffsets(),
tbGlobalAggregate.getByteArray(), 0, tbGlobalAggregate.getSize())) {
// aggregate state exceed the page size, write to HDFS
- FrameTupleUtils.flushTupleToHDFS(tbGlobalAggregate, conf, Vertex.getSuperstep());
+ FrameTupleUtils.flushTupleToHDFS(tbGlobalAggregate, conf, vertex.getSuperstep());
appenderGlobalAggregate.reset(bufferGlobalAggregate, true);
}
FrameTupleUtils.flushTuplesFinal(appenderGlobalAggregate, writerGlobalAggregate);
diff --git a/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/StartComputeUpdateFunctionFactory.java b/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/StartComputeUpdateFunctionFactory.java
index 9ddcce5..774c180 100644
--- a/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/StartComputeUpdateFunctionFactory.java
+++ b/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/StartComputeUpdateFunctionFactory.java
@@ -42,7 +42,7 @@
import edu.uci.ics.pregelix.dataflow.base.IConfigurationFactory;
import edu.uci.ics.pregelix.dataflow.std.base.IUpdateFunction;
import edu.uci.ics.pregelix.dataflow.std.base.IUpdateFunctionFactory;
-import edu.uci.ics.pregelix.dataflow.util.ResetableByteArrayOutputStream;
+import edu.uci.ics.pregelix.dataflow.std.util.ResetableByteArrayOutputStream;
@SuppressWarnings({ "rawtypes", "unchecked" })
public class StartComputeUpdateFunctionFactory implements IUpdateFunctionFactory {
@@ -110,13 +110,15 @@
private final List<ArrayTupleBuilder> tbs = new ArrayList<ArrayTupleBuilder>();
private Configuration conf;
private boolean dynamicStateLength;
+ private boolean userConfigured;
@Override
public void open(IHyracksTaskContext ctx, RecordDescriptor rd, IFrameWriter... writers)
throws HyracksDataException {
this.conf = confFactory.createConfiguration(ctx);
//LSM index does not have in-place update
- this.dynamicStateLength = BspUtils.getDynamicVertexValueSize(conf) || BspUtils.useLSM(conf);;
+ this.dynamicStateLength = BspUtils.getDynamicVertexValueSize(conf) || BspUtils.useLSM(conf);
+ this.userConfigured = false;
this.aggregators = BspUtils.createGlobalAggregators(conf);
for (int i = 0; i < aggregators.size(); i++) {
this.aggregators.get(i).init();
@@ -126,7 +128,7 @@
this.writerMsg = writers[0];
this.bufferMsg = ctx.allocateFrame();
- this.appenderMsg = new FrameTupleAppender(ctx.getFrameSize());
+ this.appenderMsg = new FrameTupleAppender(ctx.getFrameSize(), 2);
this.appenderMsg.reset(bufferMsg, true);
this.writers.add(writerMsg);
this.appenders.add(appenderMsg);
@@ -158,7 +160,7 @@
if (writers.length > 5) {
this.writerAlive = writers[5];
this.bufferAlive = ctx.allocateFrame();
- this.appenderAlive = new FrameTupleAppender(ctx.getFrameSize());
+ this.appenderAlive = new FrameTupleAppender(ctx.getFrameSize(), 2);
this.appenderAlive.reset(bufferAlive, true);
this.pushAlive = true;
this.writers.add(writerAlive);
@@ -192,6 +194,10 @@
}
try {
+ if (!userConfigured) {
+ vertex.configure(conf);
+ userConfigured = true;
+ }
vertex.open();
vertex.compute(msgIterator);
vertex.close();
@@ -228,6 +234,11 @@
/** write out global aggregate value */
writeOutGlobalAggregate();
+
+ /** end of a superstep, for vertices to release resources */
+ if (userConfigured) {
+ vertex.endSuperstep(conf);
+ }
}
private void writeOutGlobalAggregate() throws HyracksDataException {
@@ -244,7 +255,7 @@
if (!appenderGlobalAggregate.append(tbGlobalAggregate.getFieldEndOffsets(),
tbGlobalAggregate.getByteArray(), 0, tbGlobalAggregate.getSize())) {
// aggregate state exceed the page size, write to HDFS
- FrameTupleUtils.flushTupleToHDFS(tbGlobalAggregate, conf, Vertex.getSuperstep());
+ FrameTupleUtils.flushTupleToHDFS(tbGlobalAggregate, conf, vertex.getSuperstep());
appenderGlobalAggregate.reset(bufferGlobalAggregate, true);
}
FrameTupleUtils.flushTuplesFinal(appenderGlobalAggregate, writerGlobalAggregate);
diff --git a/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/DatatypeHelper.java b/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/DatatypeHelper.java
index e99fcb3..b7a896d 100644
--- a/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/DatatypeHelper.java
+++ b/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/DatatypeHelper.java
@@ -22,10 +22,14 @@
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Writable;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.pregelix.api.graph.Vertex;
import edu.uci.ics.pregelix.api.util.ArrayListWritable;
+import edu.uci.ics.pregelix.api.util.BspUtils;
+import edu.uci.ics.pregelix.dataflow.util.IterationUtils;
public class DatatypeHelper {
private static final class WritableSerializerDeserializer<T extends Writable> implements ISerializerDeserializer<T> {
@@ -33,11 +37,13 @@
private final Class<T> clazz;
private transient Configuration conf;
+ private transient IHyracksTaskContext ctx; // transient like conf; NOTE(review): assumes the task context is never meant to be Java-serialized with this serde - confirm
private T object;
- private WritableSerializerDeserializer(Class<T> clazz, Configuration conf) {
+ private WritableSerializerDeserializer(Class<T> clazz, Configuration conf, IHyracksTaskContext ctx) {
this.clazz = clazz;
this.conf = conf;
+ this.ctx = ctx;
}
@SuppressWarnings({ "unchecked", "rawtypes" })
@@ -49,6 +55,12 @@
}
try {
T t = clazz.newInstance();
+ if (t instanceof Vertex) {
+ Vertex vertex = (Vertex) t;
+ if (vertex.getVertexContext() == null && ctx != null) {
+ vertex.setVertexContext(IterationUtils.getVertexContext(BspUtils.getJobId(conf), ctx));
+ }
+ }
if (t instanceof ArrayListWritable) {
((ArrayListWritable) t).setConf(conf);
}
@@ -87,16 +99,16 @@
@SuppressWarnings({ "rawtypes", "unchecked" })
public static ISerializerDeserializer<? extends Writable> createSerializerDeserializer(
- Class<? extends Writable> fClass, Configuration conf) {
- return new WritableSerializerDeserializer(fClass, conf);
+ Class<? extends Writable> fClass, Configuration conf, IHyracksTaskContext ctx) {
+ return new WritableSerializerDeserializer(fClass, conf, ctx);
}
public static RecordDescriptor createKeyValueRecordDescriptor(Class<? extends Writable> keyClass,
Class<? extends Writable> valueClass, Configuration conf) {
@SuppressWarnings("rawtypes")
ISerializerDeserializer[] fields = new ISerializerDeserializer[2];
- fields[0] = createSerializerDeserializer(keyClass, conf);
- fields[1] = createSerializerDeserializer(valueClass, conf);
+ fields[0] = createSerializerDeserializer(keyClass, conf, null);
+ fields[1] = createSerializerDeserializer(valueClass, conf, null);
return new RecordDescriptor(fields);
}
}
\ No newline at end of file
diff --git a/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/RuntimeHookFactory.java b/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/RuntimeHookFactory.java
index 3151df2..3489578 100644
--- a/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/RuntimeHookFactory.java
+++ b/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/RuntimeHookFactory.java
@@ -14,8 +14,6 @@
*/
package edu.uci.ics.pregelix.runtime.touchpoint;
-import java.lang.reflect.Field;
-
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
@@ -23,9 +21,11 @@
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.hdfs.ContextFactory;
+import edu.uci.ics.pregelix.api.util.BspUtils;
import edu.uci.ics.pregelix.dataflow.base.IConfigurationFactory;
import edu.uci.ics.pregelix.dataflow.std.base.IRuntimeHook;
import edu.uci.ics.pregelix.dataflow.std.base.IRuntimeHookFactory;
+import edu.uci.ics.pregelix.dataflow.util.IterationUtils;
public class RuntimeHookFactory implements IRuntimeHookFactory {
@@ -48,12 +48,10 @@
try {
TaskAttemptContext mapperContext = ctxFactory.createContext(conf, new TaskAttemptID());
mapperContext.getConfiguration().setClassLoader(ctx.getJobletContext().getClassLoader());
-
- ClassLoader cl = ctx.getJobletContext().getClassLoader();
- Class<?> vClass = (Class<?>) cl.loadClass("edu.uci.ics.pregelix.api.graph.Vertex");
- Field contextField = vClass.getDeclaredField("context");
- contextField.setAccessible(true);
- contextField.set(null, mapperContext);
+ if (BspUtils.getJobId(conf) == null) { // stays non-fatal: report the missing id instead of a bare debug print
+ java.util.logging.Logger.getLogger(RuntimeHookFactory.class.getName()).warning("Pregelix job id is not set in the job configuration");
+ }
+ IterationUtils.setJobContext(BspUtils.getJobId(conf), ctx, mapperContext);
} catch (Exception e) {
throw new HyracksDataException(e);
}
diff --git a/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/VertexIdPartitionComputerFactory.java b/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/VertexIdPartitionComputerFactory.java
index c9b67fb..4c934d3 100644
--- a/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/VertexIdPartitionComputerFactory.java
+++ b/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/VertexIdPartitionComputerFactory.java
@@ -14,50 +14,45 @@
*/
package edu.uci.ics.pregelix.runtime.touchpoint;
-import java.io.DataInputStream;
-
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
-import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputer;
import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.dataflow.common.comm.util.ByteBufferInputStream;
import edu.uci.ics.pregelix.dataflow.base.IConfigurationFactory;
import edu.uci.ics.pregelix.dataflow.std.base.ISerializerDeserializerFactory;
public class VertexIdPartitionComputerFactory<K extends Writable, V extends Writable> implements
ITuplePartitionComputerFactory {
private static final long serialVersionUID = 1L;
- private final ISerializerDeserializerFactory<K> keyIOFactory;
- private final IConfigurationFactory confFactory;
public VertexIdPartitionComputerFactory(ISerializerDeserializerFactory<K> keyIOFactory,
IConfigurationFactory confFactory) {
- this.keyIOFactory = keyIOFactory;
- this.confFactory = confFactory;
}
public ITuplePartitionComputer createPartitioner() {
try {
- final Configuration conf = confFactory.createConfiguration();
return new ITuplePartitionComputer() {
- private final ByteBufferInputStream bbis = new ByteBufferInputStream();
- private final DataInputStream dis = new DataInputStream(bbis);
- private final ISerializerDeserializer<K> keyIO = keyIOFactory.getSerializerDeserializer(conf);
public int partition(IFrameTupleAccessor accessor, int tIndex, int nParts) throws HyracksDataException {
int keyStart = accessor.getTupleStartOffset(tIndex) + accessor.getFieldSlotsLength()
+ accessor.getFieldStartOffset(tIndex, 0);
- bbis.setByteBuffer(accessor.getBuffer(), keyStart);
- K key = keyIO.deserialize(dis);
- return Math.abs(key.hashCode() % nParts);
+ int len = accessor.getFieldLength(tIndex, 0);
+ return Math.abs(hash(accessor.getBuffer().array(), keyStart, len) % nParts);
+ }
+
+ private int hash(byte[] bytes, int offset, int length) { // 31-based polynomial hash over bytes[offset, offset + length), seeded with 1
+ int value = 1;
+ for (int i = offset, end = offset + length; i < end; i++) {
+ value = value * 31 + bytes[i];
+ }
+ return value;
}
};
} catch (Exception e) {
throw new IllegalStateException(e);
}
}
+
}
\ No newline at end of file
diff --git a/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/WritableSerializerDeserializerFactory.java b/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/WritableSerializerDeserializerFactory.java
index c11ac5b..8b89877 100644
--- a/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/WritableSerializerDeserializerFactory.java
+++ b/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/WritableSerializerDeserializerFactory.java
@@ -31,6 +31,6 @@
@SuppressWarnings({ "rawtypes", "unchecked" })
@Override
public ISerializerDeserializer getSerializerDeserializer(Configuration conf) {
- return DatatypeHelper.createSerializerDeserializer(clazz, conf);
+ return DatatypeHelper.createSerializerDeserializer(clazz, conf, null);
}
}