Merged fullstack_asterix_stabilization -r 2813:2933
git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_hyracks_ioc@2995 123451ca-8445-de46-9d55-352943316053
diff --git a/pregelix/pregelix-api/pom.xml b/pregelix/pregelix-api/pom.xml
index 0082b7a9..f580752 100644
--- a/pregelix/pregelix-api/pom.xml
+++ b/pregelix/pregelix-api/pom.xml
@@ -78,11 +78,11 @@
<scope>test</scope>
</dependency>
<dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-core</artifactId>
- <version>0.20.2</version>
- <type>jar</type>
- <scope>compile</scope>
- </dependency>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-hdfs-core</artifactId>
+ <version>0.2.3-SNAPSHOT</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
</dependencies>
</project>
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Vertex.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Vertex.java
index b7f9e3d..a8cd3db 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Vertex.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Vertex.java
@@ -26,7 +26,7 @@
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
@@ -66,7 +66,7 @@
/** List of incoming messages from the previous superstep */
private final List<M> msgList = new ArrayList<M>();
/** map context */
- private static Mapper.Context context = null;
+ private static TaskAttemptContext context = null;
/** a delegate for hyracks stuff */
private VertexDelegate<I, V, E, M> delegate = new VertexDelegate<I, V, E, M>(this);
/** this vertex is updated or not */
@@ -444,8 +444,10 @@
/**
* Add a new vertex into the graph
*
- * @param vertexId the vertex id
- * @param vertex the vertex
+ * @param vertexId
+ * the vertex id
+ * @param vertex
+ * the vertex
*/
public final void addVertex(I vertexId, V vertex) {
delegate.addVertex(vertexId, vertex);
@@ -454,7 +456,8 @@
/**
* Delete a vertex from id
*
- * @param vertexId the vertex id
+ * @param vertexId
+ * the vertex id
*/
public final void deleteVertex(I vertexId) {
delegate.deleteVertex(vertexId);
@@ -528,7 +531,7 @@
/**
* Pregelix internal use only
*/
- public static final Mapper<?, ?, ?, ?>.Context getContext() {
+ public static final TaskAttemptContext getContext() {
return context;
}
@@ -537,7 +540,7 @@
*
* @param context
*/
- public static final void setContext(Mapper<?, ?, ?, ?>.Context context) {
+ public static final void setContext(TaskAttemptContext context) {
Vertex.context = context;
}
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/BasicGenInputSplit.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/BasicGenInputSplit.java
index 7179737..ea33691 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/BasicGenInputSplit.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/BasicGenInputSplit.java
@@ -21,60 +21,64 @@
import java.io.Serializable;
import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
/**
* This InputSplit will not give any ordering or location data. It is used
* internally by BspInputFormat (which determines how many tasks to run the
* application on). Users should not use this directly.
*/
-public class BasicGenInputSplit extends InputSplit implements Writable, Serializable {
- private static final long serialVersionUID = 1L;
- /** Number of splits */
- private int numSplits = -1;
- /** Split index */
- private int splitIndex = -1;
+public class BasicGenInputSplit extends FileSplit implements Writable,
+ Serializable {
+ private static final long serialVersionUID = 1L;
+ /** Number of splits */
+ private int numSplits = -1;
+ /** Split index */
+ private int splitIndex = -1;
- public BasicGenInputSplit() {
- }
+ public BasicGenInputSplit() {
+ super(null, 0, 0, null);
+ }
- public BasicGenInputSplit(int splitIndex, int numSplits) {
- this.splitIndex = splitIndex;
- this.numSplits = numSplits;
- }
+ public BasicGenInputSplit(int splitIndex, int numSplits) {
+ super(null, 0, 0, null);
+ this.splitIndex = splitIndex;
+ this.numSplits = numSplits;
+ }
- @Override
- public long getLength() throws IOException, InterruptedException {
- return 0;
- }
+ @Override
+ public long getLength() {
+ return 0;
+ }
- @Override
- public String[] getLocations() throws IOException, InterruptedException {
- return new String[] {};
- }
+ @Override
+ public String[] getLocations() throws IOException {
+ return new String[] {};
+ }
- @Override
- public void readFields(DataInput in) throws IOException {
- splitIndex = in.readInt();
- numSplits = in.readInt();
- }
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ splitIndex = in.readInt();
+ numSplits = in.readInt();
+ }
- @Override
- public void write(DataOutput out) throws IOException {
- out.writeInt(splitIndex);
- out.writeInt(numSplits);
- }
+ @Override
+ public void write(DataOutput out) throws IOException {
+ out.writeInt(splitIndex);
+ out.writeInt(numSplits);
+ }
- public int getSplitIndex() {
- return splitIndex;
- }
+ public int getSplitIndex() {
+ return splitIndex;
+ }
- public int getNumSplits() {
- return numSplits;
- }
+ public int getNumSplits() {
+ return numSplits;
+ }
- @Override
- public String toString() {
- return "'" + getClass().getCanonicalName() + ", index=" + getSplitIndex() + ", num=" + getNumSplits();
- }
+ @Override
+ public String toString() {
+ return "'" + getClass().getCanonicalName() + ", index="
+ + getSplitIndex() + ", num=" + getNumSplits();
+ }
}
diff --git a/pregelix/pregelix-core/pom.xml b/pregelix/pregelix-core/pom.xml
index 44e9547..972d0ec 100644
--- a/pregelix/pregelix-core/pom.xml
+++ b/pregelix/pregelix-core/pom.xml
@@ -1,4 +1,5 @@
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<artifactId>pregelix-core</artifactId>
<packaging>jar</packaging>
@@ -58,20 +59,6 @@
</includes>
</configuration>
</execution>
- <execution>
- <id>patch</id>
- <goals>
- <goal>jar</goal>
- </goals>
- <phase>package</phase>
- <configuration>
- <classifier>patch</classifier>
- <finalName>a-hadoop</finalName>
- <includes>
- <include>**/org/apache/**</include>
- </includes>
- </configuration>
- </execution>
</executions>
</plugin>
<plugin>
@@ -159,25 +146,6 @@
</resources>
</configuration>
</execution>
- <execution>
- <id>copy-hadoop-patch</id>
- <!-- here the phase you need -->
- <phase>package</phase>
- <goals>
- <goal>copy-resources</goal>
- </goals>
- <configuration>
- <outputDirectory>target/appassembler/lib</outputDirectory>
- <resources>
- <resource>
- <directory>target</directory>
- <includes>
- <include>a-hadoop-patch.jar</include>
- </includes>
- </resource>
- </resources>
- </configuration>
- </execution>
</executions>
</plugin>
<plugin>
@@ -186,7 +154,8 @@
<version>2.7.2</version>
<configuration>
<forkMode>pertest</forkMode>
- <argLine>-enableassertions -Xmx512m -XX:MaxPermSize=300m -Dfile.encoding=UTF-8
+ <argLine>-enableassertions -Xmx512m -XX:MaxPermSize=300m
+ -Dfile.encoding=UTF-8
-Djava.util.logging.config.file=src/test/resources/logging.properties</argLine>
<includes>
<include>**/*TestSuite.java</include>
@@ -276,11 +245,6 @@
<groupId>edu.uci.ics.hyracks</groupId>
<artifactId>hyracks-data-std</artifactId>
<version>0.2.3-SNAPSHOT</version>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-core</artifactId>
- <version>0.20.2</version>
<type>jar</type>
<scope>compile</scope>
</dependency>
@@ -313,13 +277,6 @@
<scope>compile</scope>
</dependency>
<dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-test</artifactId>
- <version>0.20.2</version>
- <type>jar</type>
- <scope>compile</scope>
- </dependency>
- <dependency>
<groupId>com.kenai.nbpwr</groupId>
<artifactId>org-apache-commons-io</artifactId>
<version>1.3.1-201002241208</version>
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java
index 7d48c06..d9b267d 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java
@@ -72,22 +72,23 @@
public void runJob(PregelixJob job, Plan planChoice, String ipAddress, int port, boolean profiling)
throws HyracksException {
applicationName = exampleClass.getSimpleName() + UUID.randomUUID();
- /** add hadoop configurations */
- URL hadoopCore = job.getClass().getClassLoader().getResource("core-site.xml");
- job.getConfiguration().addResource(hadoopCore);
- URL hadoopMapRed = job.getClass().getClassLoader().getResource("mapred-site.xml");
- job.getConfiguration().addResource(hadoopMapRed);
- URL hadoopHdfs = job.getClass().getClassLoader().getResource("hdfs-site.xml");
- job.getConfiguration().addResource(hadoopHdfs);
- ClusterConfig.loadClusterConfig(ipAddress, port);
-
- LOG.info("job started");
- long start = System.currentTimeMillis();
- long end = start;
- long time = 0;
-
- this.profiling = profiling;
try {
+ /** add hadoop configurations */
+ URL hadoopCore = job.getClass().getClassLoader().getResource("core-site.xml");
+ job.getConfiguration().addResource(hadoopCore);
+ URL hadoopMapRed = job.getClass().getClassLoader().getResource("mapred-site.xml");
+ job.getConfiguration().addResource(hadoopMapRed);
+ URL hadoopHdfs = job.getClass().getClassLoader().getResource("hdfs-site.xml");
+ job.getConfiguration().addResource(hadoopHdfs);
+ ClusterConfig.loadClusterConfig(ipAddress, port);
+
+ LOG.info("job started");
+ long start = System.currentTimeMillis();
+ long end = start;
+ long time = 0;
+
+ this.profiling = profiling;
+
switch (planChoice) {
case INNER_JOIN:
jobGen = new JobGenInnerJoin(job);
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
index efdbd41..0b1be61 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
@@ -74,9 +74,9 @@
import edu.uci.ics.pregelix.core.runtime.touchpoint.WritableComparingBinaryComparatorFactory;
import edu.uci.ics.pregelix.core.util.DataflowUtils;
import edu.uci.ics.pregelix.core.util.DatatypeHelper;
-import edu.uci.ics.pregelix.dataflow.VertexWriteOperatorDescriptor;
import edu.uci.ics.pregelix.dataflow.HDFSFileWriteOperatorDescriptor;
import edu.uci.ics.pregelix.dataflow.VertexFileScanOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.VertexWriteOperatorDescriptor;
import edu.uci.ics.pregelix.dataflow.base.IConfigurationFactory;
import edu.uci.ics.pregelix.dataflow.std.base.IRecordDescriptorFactory;
import edu.uci.ics.pregelix.dataflow.std.base.IRuntimeHookFactory;
@@ -86,462 +86,390 @@
import edu.uci.ics.pregelix.runtime.touchpoint.VertexIdPartitionComputerFactory;
public abstract class JobGen implements IJobGen {
- private static final Logger LOGGER = Logger.getLogger(JobGen.class
- .getName());
- protected static final int MB = 1048576;
- protected static final float DEFAULT_BTREE_FILL_FACTOR = 1.00f;
- protected static final int tableSize = 10485767;
- protected static final String PRIMARY_INDEX = "primary";
- protected final Configuration conf;
- protected final PregelixJob giraphJob;
- protected IIndexRegistryProvider<IIndex> treeRegistryProvider = TreeIndexRegistryProvider.INSTANCE;
- protected IStorageManagerInterface storageManagerInterface = StorageManagerInterface.INSTANCE;
- protected String jobId = new UUID(System.currentTimeMillis(),
- System.nanoTime()).toString();
- protected int frameSize = ClusterConfig.getFrameSize();
- protected int maxFrameNumber = (int) (((long) 32 * MB) / frameSize);
+ private static final Logger LOGGER = Logger.getLogger(JobGen.class.getName());
+ protected static final int MB = 1048576;
+ protected static final float DEFAULT_BTREE_FILL_FACTOR = 1.00f;
+ protected static final int tableSize = 10485767;
+ protected static final String PRIMARY_INDEX = "primary";
+ protected final Configuration conf;
+ protected final PregelixJob giraphJob;
+ protected IIndexRegistryProvider<IIndex> treeRegistryProvider = TreeIndexRegistryProvider.INSTANCE;
+ protected IStorageManagerInterface storageManagerInterface = StorageManagerInterface.INSTANCE;
+ protected String jobId = new UUID(System.currentTimeMillis(), System.nanoTime()).toString();
+ protected int frameSize = ClusterConfig.getFrameSize();
+ protected int maxFrameNumber = (int) (((long) 32 * MB) / frameSize);
- protected static final String SECONDARY_INDEX_ODD = "secondary1";
- protected static final String SECONDARY_INDEX_EVEN = "secondary2";
+ protected static final String SECONDARY_INDEX_ODD = "secondary1";
+ protected static final String SECONDARY_INDEX_EVEN = "secondary2";
- public JobGen(PregelixJob job) {
- this.conf = job.getConfiguration();
- this.giraphJob = job;
- this.initJobConfiguration();
- job.setJobId(jobId);
-
- //set the frame size to be the one user specified if the user did specify.
- int specifiedFrameSize = BspUtils.getFrameSize(job.getConfiguration());
- if (specifiedFrameSize > 0) {
- frameSize = specifiedFrameSize;
- maxFrameNumber = (int) (((long) 32 * MB) / frameSize);
- }
- if (maxFrameNumber <= 0) {
- maxFrameNumber = 1;
- }
- }
+ public JobGen(PregelixJob job) {
+ this.conf = job.getConfiguration();
+ this.giraphJob = job;
+ this.initJobConfiguration();
+ job.setJobId(jobId);
- @SuppressWarnings({ "rawtypes", "unchecked" })
- private void initJobConfiguration() {
- Class vertexClass = conf.getClass(PregelixJob.VERTEX_CLASS,
- Vertex.class);
- List<Type> parameterTypes = ReflectionUtils.getTypeArguments(
- Vertex.class, vertexClass);
- Type vertexIndexType = parameterTypes.get(0);
- Type vertexValueType = parameterTypes.get(1);
- Type edgeValueType = parameterTypes.get(2);
- Type messageValueType = parameterTypes.get(3);
- conf.setClass(PregelixJob.VERTEX_INDEX_CLASS,
- (Class<?>) vertexIndexType, WritableComparable.class);
- conf.setClass(PregelixJob.VERTEX_VALUE_CLASS,
- (Class<?>) vertexValueType, Writable.class);
- conf.setClass(PregelixJob.EDGE_VALUE_CLASS, (Class<?>) edgeValueType,
- Writable.class);
- conf.setClass(PregelixJob.MESSAGE_VALUE_CLASS,
- (Class<?>) messageValueType, Writable.class);
+ // set the frame size to be the one user specified if the user did
+ // specify.
+ int specifiedFrameSize = BspUtils.getFrameSize(job.getConfiguration());
+ if (specifiedFrameSize > 0) {
+ frameSize = specifiedFrameSize;
+ maxFrameNumber = (int) (((long) 32 * MB) / frameSize);
+ }
+ if (maxFrameNumber <= 0) {
+ maxFrameNumber = 1;
+ }
+ }
- Class aggregatorClass = BspUtils.getGlobalAggregatorClass(conf);
- if (!aggregatorClass.equals(GlobalAggregator.class)) {
- List<Type> argTypes = ReflectionUtils.getTypeArguments(
- GlobalAggregator.class, aggregatorClass);
- Type partialAggregateValueType = argTypes.get(4);
- conf.setClass(PregelixJob.PARTIAL_AGGREGATE_VALUE_CLASS,
- (Class<?>) partialAggregateValueType, Writable.class);
- Type finalAggregateValueType = argTypes.get(5);
- conf.setClass(PregelixJob.FINAL_AGGREGATE_VALUE_CLASS,
- (Class<?>) finalAggregateValueType, Writable.class);
- }
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ private void initJobConfiguration() {
+ Class vertexClass = conf.getClass(PregelixJob.VERTEX_CLASS, Vertex.class);
+ List<Type> parameterTypes = ReflectionUtils.getTypeArguments(Vertex.class, vertexClass);
+ Type vertexIndexType = parameterTypes.get(0);
+ Type vertexValueType = parameterTypes.get(1);
+ Type edgeValueType = parameterTypes.get(2);
+ Type messageValueType = parameterTypes.get(3);
+ conf.setClass(PregelixJob.VERTEX_INDEX_CLASS, (Class<?>) vertexIndexType, WritableComparable.class);
+ conf.setClass(PregelixJob.VERTEX_VALUE_CLASS, (Class<?>) vertexValueType, Writable.class);
+ conf.setClass(PregelixJob.EDGE_VALUE_CLASS, (Class<?>) edgeValueType, Writable.class);
+ conf.setClass(PregelixJob.MESSAGE_VALUE_CLASS, (Class<?>) messageValueType, Writable.class);
- Class combinerClass = BspUtils.getMessageCombinerClass(conf);
- if (!combinerClass.equals(MessageCombiner.class)) {
- List<Type> argTypes = ReflectionUtils.getTypeArguments(
- MessageCombiner.class, combinerClass);
- Type partialCombineValueType = argTypes.get(2);
- conf.setClass(PregelixJob.PARTIAL_COMBINE_VALUE_CLASS,
- (Class<?>) partialCombineValueType, Writable.class);
- }
- }
+ Class aggregatorClass = BspUtils.getGlobalAggregatorClass(conf);
+ if (!aggregatorClass.equals(GlobalAggregator.class)) {
+ List<Type> argTypes = ReflectionUtils.getTypeArguments(GlobalAggregator.class, aggregatorClass);
+ Type partialAggregateValueType = argTypes.get(4);
+ conf.setClass(PregelixJob.PARTIAL_AGGREGATE_VALUE_CLASS, (Class<?>) partialAggregateValueType,
+ Writable.class);
+ Type finalAggregateValueType = argTypes.get(5);
+ conf.setClass(PregelixJob.FINAL_AGGREGATE_VALUE_CLASS, (Class<?>) finalAggregateValueType, Writable.class);
+ }
- public String getJobId() {
- return jobId;
- }
+ Class combinerClass = BspUtils.getMessageCombinerClass(conf);
+ if (!combinerClass.equals(MessageCombiner.class)) {
+ List<Type> argTypes = ReflectionUtils.getTypeArguments(MessageCombiner.class, combinerClass);
+ Type partialCombineValueType = argTypes.get(2);
+ conf.setClass(PregelixJob.PARTIAL_COMBINE_VALUE_CLASS, (Class<?>) partialCombineValueType, Writable.class);
+ }
+ }
- @SuppressWarnings({ "rawtypes", "unchecked" })
- @Override
- public JobSpecification generateCreatingJob() throws HyracksException {
- Class<? extends WritableComparable<?>> vertexIdClass = BspUtils
- .getVertexIndexClass(conf);
- JobSpecification spec = new JobSpecification();
- ITypeTraits[] typeTraits = new ITypeTraits[2];
- typeTraits[0] = new TypeTraits(false);
- typeTraits[1] = new TypeTraits(false);
- IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
- comparatorFactories[0] = new WritableComparingBinaryComparatorFactory(
- WritableComparator.get(vertexIdClass).getClass());
+ public String getJobId() {
+ return jobId;
+ }
- IFileSplitProvider fileSplitProvider = ClusterConfig
- .getFileSplitProvider(jobId, PRIMARY_INDEX);
- TreeIndexCreateOperatorDescriptor btreeCreate = new TreeIndexCreateOperatorDescriptor(
- spec, storageManagerInterface, treeRegistryProvider,
- fileSplitProvider, typeTraits, comparatorFactories,
- new BTreeDataflowHelperFactory(),
- NoOpOperationCallbackProvider.INSTANCE);
- ClusterConfig.setLocationConstraint(spec, btreeCreate);
- return spec;
- }
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ @Override
+ public JobSpecification generateCreatingJob() throws HyracksException {
+ Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(conf);
+ JobSpecification spec = new JobSpecification();
+ ITypeTraits[] typeTraits = new ITypeTraits[2];
+ typeTraits[0] = new TypeTraits(false);
+ typeTraits[1] = new TypeTraits(false);
+ IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
+ comparatorFactories[0] = new WritableComparingBinaryComparatorFactory(WritableComparator.get(vertexIdClass)
+ .getClass());
- @SuppressWarnings({ "rawtypes", "unchecked" })
- @Override
- public JobSpecification generateLoadingJob() throws HyracksException {
- Class<? extends WritableComparable<?>> vertexIdClass = BspUtils
- .getVertexIndexClass(conf);
- Class<? extends Writable> vertexClass = BspUtils.getVertexClass(conf);
- JobSpecification spec = new JobSpecification();
- IFileSplitProvider fileSplitProvider = ClusterConfig
- .getFileSplitProvider(jobId, PRIMARY_INDEX);
+ IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, PRIMARY_INDEX);
+ TreeIndexCreateOperatorDescriptor btreeCreate = new TreeIndexCreateOperatorDescriptor(spec,
+ storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits, comparatorFactories,
+ new BTreeDataflowHelperFactory(), NoOpOperationCallbackProvider.INSTANCE);
+ ClusterConfig.setLocationConstraint(spec, btreeCreate);
+ return spec;
+ }
- /**
- * the graph file scan operator and use count constraint first, will use
- * absolute constraint later
- */
- VertexInputFormat inputFormat = BspUtils.createVertexInputFormat(conf);
- List<InputSplit> splits = new ArrayList<InputSplit>();
- try {
- splits = inputFormat.getSplits(giraphJob,
- fileSplitProvider.getFileSplits().length);
- LOGGER.info("number of splits: " + splits.size());
- for (InputSplit split : splits)
- LOGGER.info(split.toString());
- } catch (Exception e) {
- throw new HyracksDataException(e);
- }
- RecordDescriptor recordDescriptor = DataflowUtils
- .getRecordDescriptorFromKeyValueClasses(
- vertexIdClass.getName(), vertexClass.getName());
- IConfigurationFactory confFactory = new ConfigurationFactory(conf);
- VertexFileScanOperatorDescriptor scanner = new VertexFileScanOperatorDescriptor(
- spec, recordDescriptor, splits, confFactory);
- ClusterConfig.setLocationConstraint(spec, scanner, splits);
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ @Override
+ public JobSpecification generateLoadingJob() throws HyracksException {
+ Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(conf);
+ Class<? extends Writable> vertexClass = BspUtils.getVertexClass(conf);
+ JobSpecification spec = new JobSpecification();
+ IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, PRIMARY_INDEX);
- /**
- * construct sort operator
- */
- int[] sortFields = new int[1];
- sortFields[0] = 0;
- INormalizedKeyComputerFactory nkmFactory = NormalizedKeyComputerFactoryProvider.INSTANCE
- .getAscINormalizedKeyComputerFactory(vertexIdClass);
- IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
- comparatorFactories[0] = new WritableComparingBinaryComparatorFactory(
- WritableComparator.get(vertexIdClass).getClass());
- ExternalSortOperatorDescriptor sorter = new ExternalSortOperatorDescriptor(
- spec, maxFrameNumber, sortFields, nkmFactory,
- comparatorFactories, recordDescriptor);
- ClusterConfig.setLocationConstraint(spec, sorter);
+ /**
+ * the graph file scan operator and use count constraint first, will use
+ * absolute constraint later
+ */
+ VertexInputFormat inputFormat = BspUtils.createVertexInputFormat(conf);
+ List<InputSplit> splits = new ArrayList<InputSplit>();
+ try {
+ splits = inputFormat.getSplits(giraphJob, fileSplitProvider.getFileSplits().length);
+ LOGGER.info("number of splits: " + splits.size());
+ for (InputSplit split : splits)
+ LOGGER.info(split.toString());
+ } catch (Exception e) {
+ throw new HyracksDataException(e);
+ }
+ RecordDescriptor recordDescriptor = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
+ vertexIdClass.getName(), vertexClass.getName());
+ IConfigurationFactory confFactory = new ConfigurationFactory(conf);
+ String[] readSchedule = ClusterConfig.getHdfsScheduler().getLocationConstraints(splits);
+ VertexFileScanOperatorDescriptor scanner = new VertexFileScanOperatorDescriptor(spec, recordDescriptor, splits,
+ readSchedule, confFactory);
+ ClusterConfig.setLocationConstraint(spec, scanner);
- /**
- * construct tree bulk load operator
- */
- int[] fieldPermutation = new int[2];
- fieldPermutation[0] = 0;
- fieldPermutation[1] = 1;
- ITypeTraits[] typeTraits = new ITypeTraits[2];
- typeTraits[0] = new TypeTraits(false);
- typeTraits[1] = new TypeTraits(false);
- TreeIndexBulkLoadOperatorDescriptor btreeBulkLoad = new TreeIndexBulkLoadOperatorDescriptor(
- spec, storageManagerInterface, treeRegistryProvider,
- fileSplitProvider, typeTraits, comparatorFactories,
- fieldPermutation, DEFAULT_BTREE_FILL_FACTOR,
- new BTreeDataflowHelperFactory(),
- NoOpOperationCallbackProvider.INSTANCE);
- ClusterConfig.setLocationConstraint(spec, btreeBulkLoad);
+ /**
+ * construct sort operator
+ */
+ int[] sortFields = new int[1];
+ sortFields[0] = 0;
+ INormalizedKeyComputerFactory nkmFactory = NormalizedKeyComputerFactoryProvider.INSTANCE
+ .getAscINormalizedKeyComputerFactory(vertexIdClass);
+ IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
+ comparatorFactories[0] = new WritableComparingBinaryComparatorFactory(WritableComparator.get(vertexIdClass)
+ .getClass());
+ ExternalSortOperatorDescriptor sorter = new ExternalSortOperatorDescriptor(spec, maxFrameNumber, sortFields,
+ nkmFactory, comparatorFactories, recordDescriptor);
+ ClusterConfig.setLocationConstraint(spec, sorter);
- /**
- * connect operator descriptors
- */
- ITuplePartitionComputerFactory hashPartitionComputerFactory = new VertexIdPartitionComputerFactory(
- DatatypeHelper.createSerializerDeserializer(vertexIdClass));
- spec.connect(new MToNPartitioningConnectorDescriptor(spec,
- hashPartitionComputerFactory), scanner, 0, sorter, 0);
- spec.connect(new OneToOneConnectorDescriptor(spec), sorter, 0,
- btreeBulkLoad, 0);
- return spec;
- }
+ /**
+ * construct tree bulk load operator
+ */
+ int[] fieldPermutation = new int[2];
+ fieldPermutation[0] = 0;
+ fieldPermutation[1] = 1;
+ ITypeTraits[] typeTraits = new ITypeTraits[2];
+ typeTraits[0] = new TypeTraits(false);
+ typeTraits[1] = new TypeTraits(false);
+ TreeIndexBulkLoadOperatorDescriptor btreeBulkLoad = new TreeIndexBulkLoadOperatorDescriptor(spec,
+ storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits, comparatorFactories,
+ fieldPermutation, DEFAULT_BTREE_FILL_FACTOR, new BTreeDataflowHelperFactory(),
+ NoOpOperationCallbackProvider.INSTANCE);
+ ClusterConfig.setLocationConstraint(spec, btreeBulkLoad);
- @Override
- public JobSpecification generateJob(int iteration) throws HyracksException {
- if (iteration <= 0)
- throw new IllegalStateException(
- "iteration number cannot be less than 1");
- if (iteration == 1)
- return generateFirstIteration(iteration);
- else
- return generateNonFirstIteration(iteration);
- }
+ /**
+ * connect operator descriptors
+ */
+ ITuplePartitionComputerFactory hashPartitionComputerFactory = new VertexIdPartitionComputerFactory(
+ DatatypeHelper.createSerializerDeserializer(vertexIdClass));
+ spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), scanner, 0, sorter, 0);
+ spec.connect(new OneToOneConnectorDescriptor(spec), sorter, 0, btreeBulkLoad, 0);
+ return spec;
+ }
- @SuppressWarnings({ "rawtypes", "unchecked" })
- public JobSpecification scanSortPrintGraph(String nodeName, String path)
- throws HyracksException {
- Class<? extends WritableComparable<?>> vertexIdClass = BspUtils
- .getVertexIndexClass(conf);
- Class<? extends Writable> vertexClass = BspUtils.getVertexClass(conf);
- int maxFrameLimit = (int) (((long) 512 * MB) / frameSize);
- JobSpecification spec = new JobSpecification();
- IFileSplitProvider fileSplitProvider = ClusterConfig
- .getFileSplitProvider(jobId, PRIMARY_INDEX);
+ @Override
+ public JobSpecification generateJob(int iteration) throws HyracksException {
+ if (iteration <= 0)
+ throw new IllegalStateException("iteration number cannot be less than 1");
+ if (iteration == 1)
+ return generateFirstIteration(iteration);
+ else
+ return generateNonFirstIteration(iteration);
+ }
- /**
- * the graph file scan operator and use count constraint first, will use
- * absolute constraint later
- */
- VertexInputFormat inputFormat = BspUtils.createVertexInputFormat(conf);
- List<InputSplit> splits = new ArrayList<InputSplit>();
- try {
- splits = inputFormat.getSplits(giraphJob,
- fileSplitProvider.getFileSplits().length);
- } catch (Exception e) {
- throw new HyracksDataException(e);
- }
- RecordDescriptor recordDescriptor = DataflowUtils
- .getRecordDescriptorFromKeyValueClasses(
- vertexIdClass.getName(), vertexClass.getName());
- IConfigurationFactory confFactory = new ConfigurationFactory(conf);
- VertexFileScanOperatorDescriptor scanner = new VertexFileScanOperatorDescriptor(
- spec, recordDescriptor, splits, confFactory);
- PartitionConstraintHelper.addPartitionCountConstraint(spec, scanner,
- splits.size());
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ public JobSpecification scanSortPrintGraph(String nodeName, String path) throws HyracksException {
+ Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(conf);
+ Class<? extends Writable> vertexClass = BspUtils.getVertexClass(conf);
+ int maxFrameLimit = (int) (((long) 512 * MB) / frameSize);
+ JobSpecification spec = new JobSpecification();
+ IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, PRIMARY_INDEX);
- /**
- * construct sort operator
- */
- int[] sortFields = new int[1];
- sortFields[0] = 0;
- INormalizedKeyComputerFactory nkmFactory = NormalizedKeyComputerFactoryProvider.INSTANCE
- .getAscINormalizedKeyComputerFactory(vertexIdClass);
- IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
- comparatorFactories[0] = new WritableComparingBinaryComparatorFactory(
- WritableComparator.get(vertexIdClass).getClass());
- ExternalSortOperatorDescriptor sorter = new ExternalSortOperatorDescriptor(
- spec, maxFrameLimit, sortFields, nkmFactory,
- comparatorFactories, recordDescriptor);
- PartitionConstraintHelper.addPartitionCountConstraint(spec, sorter,
- splits.size());
+ /**
+ * the graph file scan operator and use count constraint first, will use
+ * absolute constraint later
+ */
+ VertexInputFormat inputFormat = BspUtils.createVertexInputFormat(conf);
+ List<InputSplit> splits = new ArrayList<InputSplit>();
+ try {
+ splits = inputFormat.getSplits(giraphJob, fileSplitProvider.getFileSplits().length);
+ } catch (Exception e) {
+ throw new HyracksDataException(e);
+ }
+ RecordDescriptor recordDescriptor = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
+ vertexIdClass.getName(), vertexClass.getName());
+ IConfigurationFactory confFactory = new ConfigurationFactory(conf);
+ String[] readSchedule = ClusterConfig.getHdfsScheduler().getLocationConstraints(splits);
+ VertexFileScanOperatorDescriptor scanner = new VertexFileScanOperatorDescriptor(spec, recordDescriptor, splits,
+ readSchedule, confFactory);
+ ClusterConfig.setLocationConstraint(spec, scanner);
- /**
- * construct write file operator
- */
- FileSplit resultFile = new FileSplit(nodeName, new FileReference(
- new File(path)));
- FileSplit[] results = new FileSplit[1];
- results[0] = resultFile;
- IFileSplitProvider resultFileSplitProvider = new ConstantFileSplitProvider(
- results);
- IRuntimeHookFactory preHookFactory = new RuntimeHookFactory(confFactory);
- IRecordDescriptorFactory inputRdFactory = DataflowUtils
- .getWritableRecordDescriptorFactoryFromWritableClasses(
- vertexIdClass.getName(), vertexClass.getName());
- VertexWriteOperatorDescriptor writer = new VertexWriteOperatorDescriptor(
- spec, inputRdFactory, resultFileSplitProvider, preHookFactory,
- null);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, writer,
- new String[] { "nc1" });
- PartitionConstraintHelper.addPartitionCountConstraint(spec, writer, 1);
+ /**
+ * construct sort operator
+ */
+ int[] sortFields = new int[1];
+ sortFields[0] = 0;
+ INormalizedKeyComputerFactory nkmFactory = NormalizedKeyComputerFactoryProvider.INSTANCE
+ .getAscINormalizedKeyComputerFactory(vertexIdClass);
+ IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
+ comparatorFactories[0] = new WritableComparingBinaryComparatorFactory(WritableComparator.get(vertexIdClass)
+ .getClass());
+ ExternalSortOperatorDescriptor sorter = new ExternalSortOperatorDescriptor(spec, maxFrameLimit, sortFields,
+ nkmFactory, comparatorFactories, recordDescriptor);
+ ClusterConfig.setLocationConstraint(spec, sorter);
- /**
- * connect operator descriptors
- */
- ITuplePartitionComputerFactory hashPartitionComputerFactory = new VertexIdPartitionComputerFactory(
- DatatypeHelper.createSerializerDeserializer(vertexIdClass));
- spec.connect(new OneToOneConnectorDescriptor(spec), scanner, 0, sorter,
- 0);
- spec.connect(new MToNPartitioningMergingConnectorDescriptor(spec,
- hashPartitionComputerFactory, sortFields, comparatorFactories),
- sorter, 0, writer, 0);
- return spec;
- }
+ /**
+ * construct write file operator
+ */
+ FileSplit resultFile = new FileSplit(nodeName, new FileReference(new File(path)));
+ FileSplit[] results = new FileSplit[1];
+ results[0] = resultFile;
+ IFileSplitProvider resultFileSplitProvider = new ConstantFileSplitProvider(results);
+ IRuntimeHookFactory preHookFactory = new RuntimeHookFactory(confFactory);
+ IRecordDescriptorFactory inputRdFactory = DataflowUtils.getWritableRecordDescriptorFactoryFromWritableClasses(
+ vertexIdClass.getName(), vertexClass.getName());
+ VertexWriteOperatorDescriptor writer = new VertexWriteOperatorDescriptor(spec, inputRdFactory,
+ resultFileSplitProvider, preHookFactory, null);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, writer, new String[] { "nc1" });
+ PartitionConstraintHelper.addPartitionCountConstraint(spec, writer, 1);
- @SuppressWarnings({ "rawtypes", "unchecked" })
- public JobSpecification scanIndexPrintGraph(String nodeName, String path)
- throws HyracksException {
- Class<? extends WritableComparable<?>> vertexIdClass = BspUtils
- .getVertexIndexClass(conf);
- Class<? extends Writable> vertexClass = BspUtils.getVertexClass(conf);
- JobSpecification spec = new JobSpecification();
+ /**
+ * connect operator descriptors
+ */
+ ITuplePartitionComputerFactory hashPartitionComputerFactory = new VertexIdPartitionComputerFactory(
+ DatatypeHelper.createSerializerDeserializer(vertexIdClass));
+ spec.connect(new OneToOneConnectorDescriptor(spec), scanner, 0, sorter, 0);
+ spec.connect(new MToNPartitioningMergingConnectorDescriptor(spec, hashPartitionComputerFactory, sortFields,
+ comparatorFactories), sorter, 0, writer, 0);
+ return spec;
+ }
- /**
- * construct empty tuple operator
- */
- ArrayTupleBuilder tb = new ArrayTupleBuilder(2);
- DataOutput dos = tb.getDataOutput();
- tb.reset();
- UTF8StringSerializerDeserializer.INSTANCE.serialize("0", dos);
- tb.addFieldEndOffset();
- ISerializerDeserializer[] keyRecDescSers = {
- UTF8StringSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE };
- RecordDescriptor keyRecDesc = new RecordDescriptor(keyRecDescSers);
- ConstantTupleSourceOperatorDescriptor emptyTupleSource = new ConstantTupleSourceOperatorDescriptor(
- spec, keyRecDesc, tb.getFieldEndOffsets(), tb.getByteArray(),
- tb.getSize());
- ClusterConfig.setLocationConstraint(spec, emptyTupleSource);
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ public JobSpecification scanIndexPrintGraph(String nodeName, String path) throws HyracksException {
+ Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(conf);
+ Class<? extends Writable> vertexClass = BspUtils.getVertexClass(conf);
+ JobSpecification spec = new JobSpecification();
- /**
- * construct btree search operator
- */
- IConfigurationFactory confFactory = new ConfigurationFactory(conf);
- RecordDescriptor recordDescriptor = DataflowUtils
- .getRecordDescriptorFromKeyValueClasses(
- vertexIdClass.getName(), vertexClass.getName());
- IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
- comparatorFactories[0] = new WritableComparingBinaryComparatorFactory(
- WritableComparator.get(vertexIdClass).getClass());
- IFileSplitProvider fileSplitProvider = ClusterConfig
- .getFileSplitProvider(jobId, PRIMARY_INDEX);
- ITypeTraits[] typeTraits = new ITypeTraits[2];
- typeTraits[0] = new TypeTraits(false);
- typeTraits[1] = new TypeTraits(false);
- BTreeSearchOperatorDescriptor scanner = new BTreeSearchOperatorDescriptor(
- spec, recordDescriptor, storageManagerInterface,
- treeRegistryProvider, fileSplitProvider, typeTraits,
- comparatorFactories, null, null, true, true,
- new BTreeDataflowHelperFactory(), false,
- NoOpOperationCallbackProvider.INSTANCE);
- ClusterConfig.setLocationConstraint(spec, scanner);
+ /**
+ * construct empty tuple operator
+ */
+ ArrayTupleBuilder tb = new ArrayTupleBuilder(2);
+ DataOutput dos = tb.getDataOutput();
+ tb.reset();
+ UTF8StringSerializerDeserializer.INSTANCE.serialize("0", dos);
+ tb.addFieldEndOffset();
+ ISerializerDeserializer[] keyRecDescSers = { UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE };
+ RecordDescriptor keyRecDesc = new RecordDescriptor(keyRecDescSers);
+ ConstantTupleSourceOperatorDescriptor emptyTupleSource = new ConstantTupleSourceOperatorDescriptor(spec,
+ keyRecDesc, tb.getFieldEndOffsets(), tb.getByteArray(), tb.getSize());
+ ClusterConfig.setLocationConstraint(spec, emptyTupleSource);
- /**
- * construct write file operator
- */
- FileSplit resultFile = new FileSplit(nodeName, new FileReference(
- new File(path)));
- FileSplit[] results = new FileSplit[1];
- results[0] = resultFile;
- IFileSplitProvider resultFileSplitProvider = new ConstantFileSplitProvider(
- results);
- IRuntimeHookFactory preHookFactory = new RuntimeHookFactory(confFactory);
- IRecordDescriptorFactory inputRdFactory = DataflowUtils
- .getWritableRecordDescriptorFactoryFromWritableClasses(
- vertexIdClass.getName(), vertexClass.getName());
- VertexWriteOperatorDescriptor writer = new VertexWriteOperatorDescriptor(
- spec, inputRdFactory, resultFileSplitProvider, preHookFactory,
- null);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, writer,
- new String[] { "nc1" });
- PartitionConstraintHelper.addPartitionCountConstraint(spec, writer, 1);
+ /**
+ * construct btree search operator
+ */
+ IConfigurationFactory confFactory = new ConfigurationFactory(conf);
+ RecordDescriptor recordDescriptor = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
+ vertexIdClass.getName(), vertexClass.getName());
+ IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
+ comparatorFactories[0] = new WritableComparingBinaryComparatorFactory(WritableComparator.get(vertexIdClass)
+ .getClass());
+ IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, PRIMARY_INDEX);
+ ITypeTraits[] typeTraits = new ITypeTraits[2];
+ typeTraits[0] = new TypeTraits(false);
+ typeTraits[1] = new TypeTraits(false);
+ BTreeSearchOperatorDescriptor scanner = new BTreeSearchOperatorDescriptor(spec, recordDescriptor,
+ storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits, comparatorFactories,
+ null, null, true, true, new BTreeDataflowHelperFactory(), false, NoOpOperationCallbackProvider.INSTANCE);
+ ClusterConfig.setLocationConstraint(spec, scanner);
- /**
- * connect operator descriptors
- */
- int[] sortFields = new int[1];
- sortFields[0] = 0;
- ITuplePartitionComputerFactory hashPartitionComputerFactory = new VertexIdPartitionComputerFactory(
- DatatypeHelper.createSerializerDeserializer(vertexIdClass));
- spec.connect(new OneToOneConnectorDescriptor(spec), emptyTupleSource,
- 0, scanner, 0);
- spec.connect(new MToNPartitioningMergingConnectorDescriptor(spec,
- hashPartitionComputerFactory, sortFields, comparatorFactories),
- scanner, 0, writer, 0);
- spec.setFrameSize(frameSize);
- return spec;
- }
+ /**
+ * construct write file operator
+ */
+ FileSplit resultFile = new FileSplit(nodeName, new FileReference(new File(path)));
+ FileSplit[] results = new FileSplit[1];
+ results[0] = resultFile;
+ IFileSplitProvider resultFileSplitProvider = new ConstantFileSplitProvider(results);
+ IRuntimeHookFactory preHookFactory = new RuntimeHookFactory(confFactory);
+ IRecordDescriptorFactory inputRdFactory = DataflowUtils.getWritableRecordDescriptorFactoryFromWritableClasses(
+ vertexIdClass.getName(), vertexClass.getName());
+ VertexWriteOperatorDescriptor writer = new VertexWriteOperatorDescriptor(spec, inputRdFactory,
+ resultFileSplitProvider, preHookFactory, null);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, writer, new String[] { "nc1" });
+ PartitionConstraintHelper.addPartitionCountConstraint(spec, writer, 1);
- @SuppressWarnings({ "rawtypes", "unchecked" })
- public JobSpecification scanIndexWriteGraph() throws HyracksException {
- Class<? extends WritableComparable<?>> vertexIdClass = BspUtils
- .getVertexIndexClass(conf);
- Class<? extends Writable> vertexClass = BspUtils.getVertexClass(conf);
- JobSpecification spec = new JobSpecification();
+ /**
+ * connect operator descriptors
+ */
+ int[] sortFields = new int[1];
+ sortFields[0] = 0;
+ ITuplePartitionComputerFactory hashPartitionComputerFactory = new VertexIdPartitionComputerFactory(
+ DatatypeHelper.createSerializerDeserializer(vertexIdClass));
+ spec.connect(new OneToOneConnectorDescriptor(spec), emptyTupleSource, 0, scanner, 0);
+ spec.connect(new MToNPartitioningMergingConnectorDescriptor(spec, hashPartitionComputerFactory, sortFields,
+ comparatorFactories), scanner, 0, writer, 0);
+ spec.setFrameSize(frameSize);
+ return spec;
+ }
- /**
- * construct empty tuple operator
- */
- ArrayTupleBuilder tb = new ArrayTupleBuilder(2);
- DataOutput dos = tb.getDataOutput();
- tb.reset();
- UTF8StringSerializerDeserializer.INSTANCE.serialize("0", dos);
- tb.addFieldEndOffset();
- ISerializerDeserializer[] keyRecDescSers = {
- UTF8StringSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE };
- RecordDescriptor keyRecDesc = new RecordDescriptor(keyRecDescSers);
- ConstantTupleSourceOperatorDescriptor emptyTupleSource = new ConstantTupleSourceOperatorDescriptor(
- spec, keyRecDesc, tb.getFieldEndOffsets(), tb.getByteArray(),
- tb.getSize());
- ClusterConfig.setLocationConstraint(spec, emptyTupleSource);
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ public JobSpecification scanIndexWriteGraph() throws HyracksException {
+ Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(conf);
+ Class<? extends Writable> vertexClass = BspUtils.getVertexClass(conf);
+ JobSpecification spec = new JobSpecification();
- /**
- * construct btree search operator
- */
- IConfigurationFactory confFactory = new ConfigurationFactory(conf);
- RecordDescriptor recordDescriptor = DataflowUtils
- .getRecordDescriptorFromKeyValueClasses(
- vertexIdClass.getName(), vertexClass.getName());
- IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
- comparatorFactories[0] = new WritableComparingBinaryComparatorFactory(
- WritableComparator.get(vertexIdClass).getClass());
- IFileSplitProvider fileSplitProvider = ClusterConfig
- .getFileSplitProvider(jobId, PRIMARY_INDEX);
+ /**
+ * construct empty tuple operator
+ */
+ ArrayTupleBuilder tb = new ArrayTupleBuilder(2);
+ DataOutput dos = tb.getDataOutput();
+ tb.reset();
+ UTF8StringSerializerDeserializer.INSTANCE.serialize("0", dos);
+ tb.addFieldEndOffset();
+ ISerializerDeserializer[] keyRecDescSers = { UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE };
+ RecordDescriptor keyRecDesc = new RecordDescriptor(keyRecDescSers);
+ ConstantTupleSourceOperatorDescriptor emptyTupleSource = new ConstantTupleSourceOperatorDescriptor(spec,
+ keyRecDesc, tb.getFieldEndOffsets(), tb.getByteArray(), tb.getSize());
+ ClusterConfig.setLocationConstraint(spec, emptyTupleSource);
- ITypeTraits[] typeTraits = new ITypeTraits[2];
- typeTraits[0] = new TypeTraits(false);
- typeTraits[1] = new TypeTraits(false);
- BTreeSearchOperatorDescriptor scanner = new BTreeSearchOperatorDescriptor(
- spec, recordDescriptor, storageManagerInterface,
- treeRegistryProvider, fileSplitProvider, typeTraits,
- comparatorFactories, null, null, true, true,
- new BTreeDataflowHelperFactory(), false,
- NoOpOperationCallbackProvider.INSTANCE);
- ClusterConfig.setLocationConstraint(spec, scanner);
+ /**
+ * construct btree search operator
+ */
+ IConfigurationFactory confFactory = new ConfigurationFactory(conf);
+ RecordDescriptor recordDescriptor = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
+ vertexIdClass.getName(), vertexClass.getName());
+ IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
+ comparatorFactories[0] = new WritableComparingBinaryComparatorFactory(WritableComparator.get(vertexIdClass)
+ .getClass());
+ IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, PRIMARY_INDEX);
- /**
- * construct write file operator
- */
- IRecordDescriptorFactory inputRdFactory = DataflowUtils
- .getWritableRecordDescriptorFactoryFromWritableClasses(
- vertexIdClass.getName(), vertexClass.getName());
- HDFSFileWriteOperatorDescriptor writer = new HDFSFileWriteOperatorDescriptor(
- spec, confFactory, inputRdFactory);
- ClusterConfig.setLocationConstraint(spec, writer);
+ ITypeTraits[] typeTraits = new ITypeTraits[2];
+ typeTraits[0] = new TypeTraits(false);
+ typeTraits[1] = new TypeTraits(false);
+ BTreeSearchOperatorDescriptor scanner = new BTreeSearchOperatorDescriptor(spec, recordDescriptor,
+ storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits, comparatorFactories,
+ null, null, true, true, new BTreeDataflowHelperFactory(), false, NoOpOperationCallbackProvider.INSTANCE);
+ ClusterConfig.setLocationConstraint(spec, scanner);
- /**
- * connect operator descriptors
- */
- spec.connect(new OneToOneConnectorDescriptor(spec), emptyTupleSource,
- 0, scanner, 0);
- spec.connect(new OneToOneConnectorDescriptor(spec), scanner, 0, writer,
- 0);
- return spec;
- }
+ /**
+ * construct write file operator
+ */
+ IRecordDescriptorFactory inputRdFactory = DataflowUtils.getWritableRecordDescriptorFactoryFromWritableClasses(
+ vertexIdClass.getName(), vertexClass.getName());
+ HDFSFileWriteOperatorDescriptor writer = new HDFSFileWriteOperatorDescriptor(spec, confFactory, inputRdFactory);
+ ClusterConfig.setLocationConstraint(spec, writer);
- /***
- * drop the sindex
- *
- * @return JobSpecification
- * @throws HyracksException
- */
- protected JobSpecification dropIndex(String indexName)
- throws HyracksException {
- JobSpecification spec = new JobSpecification();
+ /**
+ * connect operator descriptors
+ */
+ spec.connect(new OneToOneConnectorDescriptor(spec), emptyTupleSource, 0, scanner, 0);
+ spec.connect(new OneToOneConnectorDescriptor(spec), scanner, 0, writer, 0);
+ return spec;
+ }
- IFileSplitProvider fileSplitProvider = ClusterConfig
- .getFileSplitProvider(jobId, indexName);
- TreeIndexDropOperatorDescriptor drop = new TreeIndexDropOperatorDescriptor(
- spec, storageManagerInterface, treeRegistryProvider,
- fileSplitProvider);
+ /***
+ * drop the sindex
+ *
+ * @return JobSpecification
+ * @throws HyracksException
+ */
+ protected JobSpecification dropIndex(String indexName) throws HyracksException {
+ JobSpecification spec = new JobSpecification();
- ClusterConfig.setLocationConstraint(spec, drop);
- spec.addRoot(drop);
- return spec;
- }
+ IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, indexName);
+ TreeIndexDropOperatorDescriptor drop = new TreeIndexDropOperatorDescriptor(spec, storageManagerInterface,
+ treeRegistryProvider, fileSplitProvider);
- /** generate non-first iteration job */
- protected abstract JobSpecification generateNonFirstIteration(int iteration)
- throws HyracksException;
+ ClusterConfig.setLocationConstraint(spec, drop);
+ spec.addRoot(drop);
+ return spec;
+ }
- /** generate first iteration job */
- protected abstract JobSpecification generateFirstIteration(int iteration)
- throws HyracksException;
+ /** generate non-first iteration job */
+ protected abstract JobSpecification generateNonFirstIteration(int iteration) throws HyracksException;
- /** generate clean-up job */
- public abstract JobSpecification[] generateCleanup()
- throws HyracksException;
+ /** generate first iteration job */
+ protected abstract JobSpecification generateFirstIteration(int iteration) throws HyracksException;
+
+ /** generate clean-up job */
+ public abstract JobSpecification[] generateCleanup() throws HyracksException;
}
\ No newline at end of file
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/clusterconfig/ClusterConfig.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/clusterconfig/ClusterConfig.java
index 8eadab9..d26e637 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/clusterconfig/ClusterConfig.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/clusterconfig/ClusterConfig.java
@@ -40,6 +40,7 @@
import edu.uci.ics.hyracks.dataflow.std.file.ConstantFileSplitProvider;
import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.hyracks.hdfs2.scheduler.Scheduler;
public class ClusterConfig {
@@ -49,6 +50,7 @@
private static Properties clusterProperties = new Properties();
private static Map<String, List<String>> ipToNcMapping;
private static String[] stores;
+ private static Scheduler hdfsScheduler;
/**
* let tests set config path to be whatever
@@ -211,6 +213,8 @@
NCs[i] = entry.getKey();
i++;
}
+
+ hdfsScheduler = new Scheduler(ipAddress, port);
} catch (Exception e) {
throw new IllegalStateException(e);
}
@@ -218,4 +222,8 @@
loadClusterProperties();
loadStores();
}
+
+ public static Scheduler getHdfsScheduler() {
+ return hdfsScheduler;
+ }
}
diff --git a/pregelix/pregelix-core/src/main/java/org/apache/hadoop/fs/Path.java b/pregelix/pregelix-core/src/main/java/org/apache/hadoop/fs/Path.java
deleted file mode 100644
index 5efdde8..0000000
--- a/pregelix/pregelix-core/src/main/java/org/apache/hadoop/fs/Path.java
+++ /dev/null
@@ -1,355 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.fs;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.net.URI;
-import java.net.URISyntaxException;
-
-import org.apache.hadoop.conf.Configuration;
-
-/**
- * Names a file or directory in a {@link FileSystem}. Path strings use slash as
- * the directory separator. A path string is absolute if it begins with a slash.
- */
-@SuppressWarnings("rawtypes")
-public class Path implements Comparable, Serializable {
- private static final long serialVersionUID = 1L;
- /** The directory separator, a slash. */
- public static final String SEPARATOR = "/";
- public static final char SEPARATOR_CHAR = '/';
-
- public static final String CUR_DIR = ".";
-
- static final boolean WINDOWS = System.getProperty("os.name").startsWith("Windows");
-
- private URI uri; // a hierarchical uri
-
- /** Resolve a child path against a parent path. */
- public Path(String parent, String child) {
- this(new Path(parent), new Path(child));
- }
-
- /** Resolve a child path against a parent path. */
- public Path(Path parent, String child) {
- this(parent, new Path(child));
- }
-
- /** Resolve a child path against a parent path. */
- public Path(String parent, Path child) {
- this(new Path(parent), child);
- }
-
- /** Resolve a child path against a parent path. */
- public Path(Path parent, Path child) {
- // Add a slash to parent's path so resolution is compatible with URI's
- URI parentUri = parent.uri;
- String parentPath = parentUri.getPath();
- if (!(parentPath.equals("/") || parentPath.equals("")))
- try {
- parentUri = new URI(parentUri.getScheme(), parentUri.getAuthority(), parentUri.getPath() + "/", null,
- parentUri.getFragment());
- } catch (URISyntaxException e) {
- throw new IllegalArgumentException(e);
- }
- URI resolved = parentUri.resolve(child.uri);
- initialize(resolved.getScheme(), resolved.getAuthority(), normalizePath(resolved.getPath()),
- resolved.getFragment());
- }
-
- private void checkPathArg(String path) {
- // disallow construction of a Path from an empty string
- if (path == null) {
- throw new IllegalArgumentException("Can not create a Path from a null string");
- }
- if (path.length() == 0) {
- throw new IllegalArgumentException("Can not create a Path from an empty string");
- }
- }
-
- /**
- * Construct a path from a String. Path strings are URIs, but with unescaped
- * elements and some additional normalization.
- */
- public Path(String pathString) {
- checkPathArg(pathString);
-
- // We can't use 'new URI(String)' directly, since it assumes things are
- // escaped, which we don't require of Paths.
-
- // add a slash in front of paths with Windows drive letters
- if (hasWindowsDrive(pathString, false))
- pathString = "/" + pathString;
-
- // parse uri components
- String scheme = null;
- String authority = null;
-
- int start = 0;
-
- // parse uri scheme, if any
- int colon = pathString.indexOf(':');
- int slash = pathString.indexOf('/');
- if ((colon != -1) && ((slash == -1) || (colon < slash))) { // has a
- // scheme
- scheme = pathString.substring(0, colon);
- start = colon + 1;
- }
-
- // parse uri authority, if any
- if (pathString.startsWith("//", start) && (pathString.length() - start > 2)) { // has
- // authority
- int nextSlash = pathString.indexOf('/', start + 2);
- int authEnd = nextSlash > 0 ? nextSlash : pathString.length();
- authority = pathString.substring(start + 2, authEnd);
- start = authEnd;
- }
-
- // uri path is the rest of the string -- query & fragment not supported
- String path = pathString.substring(start, pathString.length());
-
- initialize(scheme, authority, path, null);
- }
-
- /** Construct a Path from components. */
- public Path(String scheme, String authority, String path) {
- checkPathArg(path);
- initialize(scheme, authority, path, null);
- }
-
- /**
- * Construct a path from a URI
- */
- public Path(URI aUri) {
- uri = aUri;
- }
-
- private void initialize(String scheme, String authority, String path, String fragment) {
- try {
- this.uri = new URI(scheme, authority, normalizePath(path), null, fragment).normalize();
- } catch (URISyntaxException e) {
- throw new IllegalArgumentException(e);
- }
- }
-
- private String normalizePath(String path) {
- // remove double slashes & backslashes
- if (path.indexOf("//") != -1) {
- path = path.replace("//", "/");
- }
- if (path.indexOf("\\") != -1) {
- path = path.replace("\\", "/");
- }
-
- // trim trailing slash from non-root path (ignoring windows drive)
- int minLength = hasWindowsDrive(path, true) ? 4 : 1;
- if (path.length() > minLength && path.endsWith("/")) {
- path = path.substring(0, path.length() - 1);
- }
-
- return path;
- }
-
- private boolean hasWindowsDrive(String path, boolean slashed) {
- if (!WINDOWS)
- return false;
- int start = slashed ? 1 : 0;
- return path.length() >= start + 2
- && (slashed ? path.charAt(0) == '/' : true)
- && path.charAt(start + 1) == ':'
- && ((path.charAt(start) >= 'A' && path.charAt(start) <= 'Z') || (path.charAt(start) >= 'a' && path
- .charAt(start) <= 'z'));
- }
-
- /** Convert this to a URI. */
- public URI toUri() {
- return uri;
- }
-
- /** Return the FileSystem that owns this Path. */
- public FileSystem getFileSystem(Configuration conf) throws IOException {
- return FileSystem.get(this.toUri(), conf);
- }
-
- /** True if the directory of this path is absolute. */
- public boolean isAbsolute() {
- int start = hasWindowsDrive(uri.getPath(), true) ? 3 : 0;
- return uri.getPath().startsWith(SEPARATOR, start);
- }
-
- /** Returns the final component of this path. */
- public String getName() {
- String path = uri.getPath();
- int slash = path.lastIndexOf(SEPARATOR);
- return path.substring(slash + 1);
- }
-
- /** Returns the parent of a path or null if at root. */
- public Path getParent() {
- String path = uri.getPath();
- int lastSlash = path.lastIndexOf('/');
- int start = hasWindowsDrive(path, true) ? 3 : 0;
- if ((path.length() == start) || // empty path
- (lastSlash == start && path.length() == start + 1)) { // at root
- return null;
- }
- String parent;
- if (lastSlash == -1) {
- parent = CUR_DIR;
- } else {
- int end = hasWindowsDrive(path, true) ? 3 : 0;
- parent = path.substring(0, lastSlash == end ? end + 1 : lastSlash);
- }
- return new Path(uri.getScheme(), uri.getAuthority(), parent);
- }
-
- /** Adds a suffix to the final name in the path. */
- public Path suffix(String suffix) {
- return new Path(getParent(), getName() + suffix);
- }
-
- public String toString() {
- // we can't use uri.toString(), which escapes everything, because we
- // want
- // illegal characters unescaped in the string, for glob processing, etc.
- StringBuffer buffer = new StringBuffer();
- if (uri.getScheme() != null) {
- buffer.append(uri.getScheme());
- buffer.append(":");
- }
- if (uri.getAuthority() != null) {
- buffer.append("//");
- buffer.append(uri.getAuthority());
- }
- if (uri.getPath() != null) {
- String path = uri.getPath();
- if (path.indexOf('/') == 0 && hasWindowsDrive(path, true) && // has
- // windows
- // drive
- uri.getScheme() == null && // but no scheme
- uri.getAuthority() == null) // or authority
- path = path.substring(1); // remove slash before drive
- buffer.append(path);
- }
- if (uri.getFragment() != null) {
- buffer.append("#");
- buffer.append(uri.getFragment());
- }
- return buffer.toString();
- }
-
- public boolean equals(Object o) {
- if (!(o instanceof Path)) {
- return false;
- }
- Path that = (Path) o;
- return this.uri.equals(that.uri);
- }
-
- public int hashCode() {
- return uri.hashCode();
- }
-
- public int compareTo(Object o) {
- Path that = (Path) o;
- return this.uri.compareTo(that.uri);
- }
-
- /** Return the number of elements in this path. */
- public int depth() {
- String path = uri.getPath();
- int depth = 0;
- int slash = path.length() == 1 && path.charAt(0) == '/' ? -1 : 0;
- while (slash != -1) {
- depth++;
- slash = path.indexOf(SEPARATOR, slash + 1);
- }
- return depth;
- }
-
- /** Returns a qualified path object. */
- public Path makeQualified(FileSystem fs) {
- Path path = this;
- if (!isAbsolute()) {
- path = new Path(fs.getWorkingDirectory(), this);
- }
-
- URI pathUri = path.toUri();
- URI fsUri = fs.getUri();
-
- String scheme = pathUri.getScheme();
- String authority = pathUri.getAuthority();
- String fragment = pathUri.getFragment();
- if (scheme != null && (authority != null || fsUri.getAuthority() == null))
- return path;
-
- if (scheme == null) {
- scheme = fsUri.getScheme();
- }
-
- if (authority == null) {
- authority = fsUri.getAuthority();
- if (authority == null) {
- authority = "";
- }
- }
-
- URI newUri = null;
- try {
- newUri = new URI(scheme, authority, normalizePath(pathUri.getPath()), null, fragment);
- } catch (URISyntaxException e) {
- throw new IllegalArgumentException(e);
- }
- return new Path(newUri);
- }
-
- /** Returns a qualified path object. */
- public Path makeQualified(URI defaultUri, Path workingDir) {
- Path path = this;
- if (!isAbsolute()) {
- path = new Path(workingDir, this);
- }
-
- URI pathUri = path.toUri();
-
- String scheme = pathUri.getScheme();
- String authority = pathUri.getAuthority();
- String fragment = pathUri.getFragment();
-
- if (scheme != null && (authority != null || defaultUri.getAuthority() == null))
- return path;
-
- if (scheme == null) {
- scheme = defaultUri.getScheme();
- }
-
- if (authority == null) {
- authority = defaultUri.getAuthority();
- if (authority == null) {
- authority = "";
- }
- }
-
- URI newUri = null;
- try {
- newUri = new URI(scheme, authority, normalizePath(pathUri.getPath()), null, fragment);
- } catch (URISyntaxException e) {
- throw new IllegalArgumentException(e);
- }
- return new Path(newUri);
- }
-}
\ No newline at end of file
diff --git a/pregelix/pregelix-core/src/main/java/org/apache/hadoop/mapreduce/InputSplit.java b/pregelix/pregelix-core/src/main/java/org/apache/hadoop/mapreduce/InputSplit.java
deleted file mode 100644
index ac72160..0000000
--- a/pregelix/pregelix-core/src/main/java/org/apache/hadoop/mapreduce/InputSplit.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.mapreduce;
-
-import java.io.IOException;
-import java.io.Serializable;
-
-/**
- * <code>InputSplit</code> represents the data to be processed by an individual {@link Mapper}.
- * <p>
- * Typically, it presents a byte-oriented view on the input and is the responsibility of {@link RecordReader} of the job to process this and present a record-oriented view.
- *
- * @see InputFormat
- * @see RecordReader
- */
-public abstract class InputSplit implements Serializable {
- private static final long serialVersionUID = 1L;
-
- /**
- * Get the size of the split, so that the input splits can be sorted by
- * size.
- *
- * @return the number of bytes in the split
- * @throws IOException
- * @throws InterruptedException
- */
- public abstract long getLength() throws IOException, InterruptedException;
-
- /**
- * Get the list of nodes by name where the data for the split would be
- * local. The locations do not need to be serialized.
- *
- * @return a new array of the node nodes.
- * @throws IOException
- * @throws InterruptedException
- */
- public abstract String[] getLocations() throws IOException, InterruptedException;
-}
\ No newline at end of file
diff --git a/pregelix/pregelix-core/src/main/resources/scripts/pregelix b/pregelix/pregelix-core/src/main/resources/scripts/pregelix
index c3fd27b..6997078 100644
--- a/pregelix/pregelix-core/src/main/resources/scripts/pregelix
+++ b/pregelix/pregelix-core/src/main/resources/scripts/pregelix
@@ -91,7 +91,7 @@
REPO="$BASEDIR"/lib
fi
-CLASSPATH=$CLASSPATH_PREFIX:"$HADOOP_HOME"/conf:"$BASEDIR"/etc:$(echo ${REPO}/*.jar | tr ' ' ':'):$1
+CLASSPATH=$CLASSPATH_PREFIX:"$HADOOP_HOME"/conf:"$BASEDIR"/etc:$1
# For Cygwin, switch paths to Windows format before running java
if $cygwin; then
diff --git a/pregelix/pregelix-core/src/test/resources/hadoop/conf/mapred-site.xml b/pregelix/pregelix-core/src/test/resources/hadoop/conf/mapred-site.xml
index 1b9a4d6..f89dd79 100644
--- a/pregelix/pregelix-core/src/test/resources/hadoop/conf/mapred-site.xml
+++ b/pregelix/pregelix-core/src/test/resources/hadoop/conf/mapred-site.xml
@@ -18,8 +18,8 @@
<value>20</value>
</property>
<property>
- <name>mapred.min.split.size</name>
- <value>65536</value>
+ <name>mapred.max.split.size</name>
+ <value>4096</value>
</property>
</configuration>
diff --git a/pregelix/pregelix-dataflow-std/pom.xml b/pregelix/pregelix-dataflow-std/pom.xml
index db10aef..b27d88d 100644
--- a/pregelix/pregelix-dataflow-std/pom.xml
+++ b/pregelix/pregelix-dataflow-std/pom.xml
@@ -1,14 +1,15 @@
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<artifactId>pregelix-dataflow-std</artifactId>
<packaging>jar</packaging>
<name>pregelix-dataflow-std</name>
<parent>
- <groupId>edu.uci.ics.hyracks</groupId>
- <artifactId>pregelix</artifactId>
- <version>0.2.3-SNAPSHOT</version>
- </parent>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>pregelix</artifactId>
+ <version>0.2.3-SNAPSHOT</version>
+ </parent>
<properties>
@@ -102,11 +103,13 @@
<groupId>edu.uci.ics.hyracks</groupId>
<artifactId>hyracks-data-std</artifactId>
<version>0.2.3-SNAPSHOT</version>
+ <type>jar</type>
+ <scope>compile</scope>
</dependency>
<dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-core</artifactId>
- <version>0.20.2</version>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-hdfs-core</artifactId>
+ <version>0.2.3-SNAPSHOT</version>
<type>jar</type>
<scope>compile</scope>
</dependency>
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/FunctionCallOperatorDescriptor.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/FunctionCallOperatorDescriptor.java
index ee3ac82..4cbd6c4 100644
--- a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/FunctionCallOperatorDescriptor.java
+++ b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/FunctionCallOperatorDescriptor.java
@@ -61,22 +61,21 @@
private FrameDeserializer frameDeserializer;
private final IFrameWriter[] writers = new IFrameWriter[outputArity];
private final IFunction function = functionFactory.createFunction();
+ private ClassLoader ctxCL = Thread.currentThread().getContextClassLoader();
@Override
- public void close() throws HyracksDataException {
- if (postHookFactory != null)
- postHookFactory.createRuntimeHook().configure(ctx);
- function.close();
+ public void open() throws HyracksDataException {
+ rd0 = inputRdFactory == null ? recordDescProvider.getInputRecordDescriptor(getActivityId(), 0)
+ : inputRdFactory.createRecordDescriptor();
+ frameDeserializer = new FrameDeserializer(ctx.getFrameSize(), rd0);
+ ctxCL = Thread.currentThread().getContextClassLoader();
+ Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
for (IFrameWriter writer : writers) {
- writer.close();
+ writer.open();
}
- }
-
- @Override
- public void fail() throws HyracksDataException {
- for (IFrameWriter writer : writers) {
- writer.fail();
- }
+ if (preHookFactory != null)
+ preHookFactory.createRuntimeHook().configure(ctx);
+ function.open(ctx, rd0, writers);
}
@Override
@@ -89,17 +88,21 @@
}
@Override
- public void open() throws HyracksDataException {
- rd0 = inputRdFactory == null ? recordDescProvider.getInputRecordDescriptor(getActivityId(), 0)
- : inputRdFactory.createRecordDescriptor();
- frameDeserializer = new FrameDeserializer(ctx.getFrameSize(), rd0);
- Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
+ public void close() throws HyracksDataException {
+ if (postHookFactory != null)
+ postHookFactory.createRuntimeHook().configure(ctx);
+ function.close();
for (IFrameWriter writer : writers) {
- writer.open();
+ writer.close();
}
- if (preHookFactory != null)
- preHookFactory.createRuntimeHook().configure(ctx);
- function.open(ctx, rd0, writers);
+ Thread.currentThread().setContextClassLoader(ctxCL);
+ }
+
+ @Override
+ public void fail() throws HyracksDataException {
+ for (IFrameWriter writer : writers) {
+ writer.fail();
+ }
}
@Override
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/FunctionProxy.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/FunctionProxy.java
index 82ac18e..99bca1a 100644
--- a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/FunctionProxy.java
+++ b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/FunctionProxy.java
@@ -37,6 +37,7 @@
private final IFrameWriter[] writers;
private TupleDeserializer tupleDe;
private RecordDescriptor inputRd;
+ private ClassLoader ctxCL;
public FunctionProxy(IHyracksTaskContext ctx, IUpdateFunctionFactory functionFactory,
IRuntimeHookFactory preHookFactory, IRuntimeHookFactory postHookFactory,
@@ -57,6 +58,7 @@
public void functionOpen() throws HyracksDataException {
inputRd = inputRdFactory.createRecordDescriptor();
tupleDe = new TupleDeserializer(inputRd);
+ ctxCL = Thread.currentThread().getContextClassLoader();
Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
for (IFrameWriter writer : writers) {
writer.open();
@@ -124,5 +126,6 @@
for (IFrameWriter writer : writers) {
writer.close();
}
+ Thread.currentThread().setContextClassLoader(ctxCL);
}
}
diff --git a/pregelix/pregelix-dataflow/pom.xml b/pregelix/pregelix-dataflow/pom.xml
index 763ed72..77a76aa 100644
--- a/pregelix/pregelix-dataflow/pom.xml
+++ b/pregelix/pregelix-dataflow/pom.xml
@@ -1,14 +1,15 @@
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<artifactId>pregelix-dataflow</artifactId>
<packaging>jar</packaging>
<name>pregelix-dataflow</name>
<parent>
- <groupId>edu.uci.ics.hyracks</groupId>
- <artifactId>pregelix</artifactId>
- <version>0.2.3-SNAPSHOT</version>
- </parent>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>pregelix</artifactId>
+ <version>0.2.3-SNAPSHOT</version>
+ </parent>
<properties>
@@ -104,13 +105,6 @@
<version>0.2.3-SNAPSHOT</version>
</dependency>
<dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-core</artifactId>
- <version>0.20.2</version>
- <type>jar</type>
- <scope>compile</scope>
- </dependency>
- <dependency>
<groupId>edu.uci.ics.hyracks</groupId>
<artifactId>hyracks-storage-am-common</artifactId>
<version>0.2.3-SNAPSHOT</version>
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/HDFSFileWriteOperatorDescriptor.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/HDFSFileWriteOperatorDescriptor.java
index c25e4c6..0133d761 100644
--- a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/HDFSFileWriteOperatorDescriptor.java
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/HDFSFileWriteOperatorDescriptor.java
@@ -64,17 +64,20 @@
return new AbstractUnaryInputSinkOperatorNodePushable() {
private RecordDescriptor rd0;
private FrameDeserializer frameDeserializer;
- private Configuration conf = confFactory.createConfiguration();
+ private Configuration conf;
private VertexWriter vertexWriter;
private TaskAttemptContext context;
private String TEMP_DIR = "_temporary";
+ private ClassLoader ctxCL;
@Override
public void open() throws HyracksDataException {
rd0 = inputRdFactory == null ? recordDescProvider.getInputRecordDescriptor(getActivityId(), 0)
: inputRdFactory.createRecordDescriptor();
frameDeserializer = new FrameDeserializer(ctx.getFrameSize(), rd0);
+ ctxCL = Thread.currentThread().getContextClassLoader();
Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
+ conf = confFactory.createConfiguration();
VertexOutputFormat outputFormat = BspUtils.createVertexOutputFormat(conf);
TaskAttemptID tid = new TaskAttemptID("", 0, true, partition, 0);
@@ -107,7 +110,7 @@
@Override
public void fail() throws HyracksDataException {
-
+ Thread.currentThread().setContextClassLoader(ctxCL);
}
@Override
@@ -151,6 +154,8 @@
dfs.rename(srcFile, filePath);
} catch (IOException e) {
throw new HyracksDataException(e);
+ } finally {
+ Thread.currentThread().setContextClassLoader(ctxCL);
}
}
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/VertexFileScanOperatorDescriptor.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/VertexFileScanOperatorDescriptor.java
index f1b98f5..a38b19e 100644
--- a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/VertexFileScanOperatorDescriptor.java
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/VertexFileScanOperatorDescriptor.java
@@ -17,15 +17,14 @@
import java.io.DataOutput;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
-import java.util.logging.Logger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
@@ -42,6 +41,8 @@
import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
+import edu.uci.ics.hyracks.hdfs.ContextFactory;
+import edu.uci.ics.hyracks.hdfs2.dataflow.FileSplitsFactory;
import edu.uci.ics.pregelix.api.graph.Vertex;
import edu.uci.ics.pregelix.api.io.VertexInputFormat;
import edu.uci.ics.pregelix.api.io.VertexReader;
@@ -50,38 +51,67 @@
@SuppressWarnings("rawtypes")
public class VertexFileScanOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
- private static final Logger LOGGER = Logger.getLogger(VertexFileScanOperatorDescriptor.class.getName());
private static final long serialVersionUID = 1L;
- private final List<InputSplit> splits;
+ private final FileSplitsFactory splitsFactory;
private final IConfigurationFactory confFactory;
private final int fieldSize = 2;
+ private final String[] scheduledLocations;
+ private final boolean[] executed;
/**
* @param spec
*/
public VertexFileScanOperatorDescriptor(JobSpecification spec, RecordDescriptor rd, List<InputSplit> splits,
- IConfigurationFactory confFactory) throws HyracksException {
+ String[] scheduledLocations, IConfigurationFactory confFactory) throws HyracksException {
super(spec, 0, 1);
- this.splits = splits;
+ List<FileSplit> fileSplits = new ArrayList<FileSplit>();
+ for (int i = 0; i < splits.size(); i++) {
+ fileSplits.add((FileSplit) splits.get(i));
+ }
+ this.splitsFactory = new FileSplitsFactory(fileSplits);
this.confFactory = confFactory;
+ this.scheduledLocations = scheduledLocations;
+ this.executed = new boolean[scheduledLocations.length];
+ Arrays.fill(executed, false);
this.recordDescriptors[0] = rd;
}
public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
IRecordDescriptorProvider recordDescProvider, final int partition, final int nPartitions)
throws HyracksDataException {
+ final List<FileSplit> splits = splitsFactory.getSplits();
+
return new AbstractUnaryOutputSourceOperatorNodePushable() {
- private Configuration conf = confFactory.createConfiguration();
+ private ClassLoader ctxCL;
+ private ContextFactory ctxFactory = new ContextFactory();
@Override
public void initialize() throws HyracksDataException {
+ ctxCL = Thread.currentThread().getContextClassLoader();
try {
Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
+ Configuration conf = confFactory.createConfiguration();
writer.open();
- loadVertices(ctx, partition);
+ for (int i = 0; i < scheduledLocations.length; i++) {
+ if (scheduledLocations[i].equals(ctx.getJobletContext().getApplicationContext().getNodeId())) {
+ /**
+ * pick one from the FileSplit queue
+ */
+ synchronized (executed) {
+ if (!executed[i]) {
+ executed[i] = true;
+ } else {
+ continue;
+ }
+ }
+ loadVertices(ctx, conf, i);
+ }
+ }
writer.close();
} catch (Exception e) {
throw new HyracksDataException(e);
+ } finally {
+ Thread.currentThread().setContextClassLoader(ctxCL);
}
}
@@ -96,22 +126,17 @@
* @throws InterruptedException
*/
@SuppressWarnings("unchecked")
- private void loadVertices(final IHyracksTaskContext ctx, int partitionId) throws IOException,
- ClassNotFoundException, InterruptedException, InstantiationException, IllegalAccessException {
+ private void loadVertices(final IHyracksTaskContext ctx, Configuration conf, int splitId)
+ throws IOException, ClassNotFoundException, InterruptedException, InstantiationException,
+ IllegalAccessException {
ByteBuffer frame = ctx.allocateFrame();
FrameTupleAppender appender = new FrameTupleAppender(ctx.getFrameSize());
appender.reset(frame, true);
VertexInputFormat vertexInputFormat = BspUtils.createVertexInputFormat(conf);
TaskAttemptContext context = new TaskAttemptContext(conf, new TaskAttemptID());
- InputSplit split = splits.get(partition);
+ InputSplit split = splits.get(splitId);
- if (split instanceof FileSplit) {
- FileSplit fileSplit = (FileSplit) split;
- LOGGER.info("read file split: " + fileSplit.getPath() + " location:" + fileSplit.getLocations()[0]
- + " start:" + fileSplit.getStart() + " length:" + split.getLength() + " partition:"
- + partition);
- }
VertexReader vertexReader = vertexInputFormat.createVertexReader(split, context);
vertexReader.initialize(split, context);
Vertex readerVertex = (Vertex) BspUtils.createVertex(conf);
@@ -121,8 +146,7 @@
/**
* set context
*/
- Context mapperContext = new Mapper().new Context(conf, new TaskAttemptID(), null, null, null, null,
- splits.get(partition));
+ TaskAttemptContext mapperContext = ctxFactory.createContext(conf, splits.get(splitId));
Vertex.setContext(mapperContext);
/**
@@ -166,5 +190,4 @@
}
};
}
-
}
\ No newline at end of file
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/PageRankVertex.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/PageRankVertex.java
index 02e1625..b6d4da7 100644
--- a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/PageRankVertex.java
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/PageRankVertex.java
@@ -54,6 +54,7 @@
public static final String ITERATIONS = "HyracksPageRankVertex.iteration";
private DoubleWritable outputValue = new DoubleWritable();
private DoubleWritable tmpVertexValue = new DoubleWritable();
+ private int maxIteration = -1;
/**
* Test whether combiner is called by summing up the messages.
@@ -97,7 +98,9 @@
@Override
public void compute(Iterator<DoubleWritable> msgIterator) {
- int maxIteration = getContext().getConfiguration().getInt(ITERATIONS, 10);
+ if (maxIteration < 0) {
+ maxIteration = getContext().getConfiguration().getInt(ITERATIONS, 10);
+ }
if (getSuperstep() == 1) {
tmpVertexValue.set(1.0 / getNumVertices());
setVertexValue(tmpVertexValue);
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ReachabilityVertex.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ReachabilityVertex.java
index daafc82..0895386 100644
--- a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ReachabilityVertex.java
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ReachabilityVertex.java
@@ -85,6 +85,7 @@
}
private ByteWritable tmpVertexValue = new ByteWritable();
+ private long sourceId = -1;
/** The source vertex id */
public static final String SOURCE_ID = "ReachibilityVertex.sourceId";
@@ -101,7 +102,7 @@
* @return True if the source id
*/
private boolean isSource(VLongWritable v) {
- return (v.get() == getContext().getConfiguration().getLong(SOURCE_ID, SOURCE_ID_DEFAULT));
+ return (v.get() == sourceId);
}
/**
@@ -115,6 +116,9 @@
@Override
public void compute(Iterator<ByteWritable> msgIterator) {
+ if (sourceId < 0) {
+ sourceId = getContext().getConfiguration().getLong(SOURCE_ID, SOURCE_ID_DEFAULT);
+ }
if (getSuperstep() == 1) {
boolean isSource = isSource(getVertexId());
if (isSource) {
@@ -160,7 +164,7 @@
}
voteToHalt();
}
-
+
@Override
public String toString() {
return getVertexId() + " " + getVertexValue();
diff --git a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobrun/RunJobTestCase.java b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobrun/RunJobTestCase.java
index 7a5bba6..5a556fa 100644
--- a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobrun/RunJobTestCase.java
+++ b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobrun/RunJobTestCase.java
@@ -38,7 +38,7 @@
public class RunJobTestCase extends TestCase {
private static final String NC1 = "nc1";
- private static final String HYRACKS_APP_NAME = "giraph";
+ private static final String HYRACKS_APP_NAME = "pregelix";
private static String HDFS_INPUTPATH = "/webmap";
private static String HDFS_OUTPUTPAH = "/result";
diff --git a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobrun/RunJobTestSuite.java b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobrun/RunJobTestSuite.java
index ca16c15..4bf83e6 100644
--- a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobrun/RunJobTestSuite.java
+++ b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobrun/RunJobTestSuite.java
@@ -62,7 +62,7 @@
private static final String DATA_PATH3 = "data/clique/clique.txt";
private static final String HDFS_PATH3 = "/clique/";
- private static final String HYRACKS_APP_NAME = "giraph";
+ private static final String HYRACKS_APP_NAME = "pregelix";
private static final String HADOOP_CONF_PATH = ACTUAL_RESULT_DIR + File.separator + "conf.xml";
private MiniDFSCluster dfsCluster;
diff --git a/pregelix/pregelix-example/src/test/resources/hadoop/conf/mapred-site.xml b/pregelix/pregelix-example/src/test/resources/hadoop/conf/mapred-site.xml
index 1b9a4d6..71450f1 100644
--- a/pregelix/pregelix-example/src/test/resources/hadoop/conf/mapred-site.xml
+++ b/pregelix/pregelix-example/src/test/resources/hadoop/conf/mapred-site.xml
@@ -18,8 +18,8 @@
<value>20</value>
</property>
<property>
- <name>mapred.min.split.size</name>
- <value>65536</value>
+ <name>mapred.max.split.size</name>
+ <value>128</value>
</property>
</configuration>
diff --git a/pregelix/pregelix-runtime/pom.xml b/pregelix/pregelix-runtime/pom.xml
index 41baab0..a0d25cc 100644
--- a/pregelix/pregelix-runtime/pom.xml
+++ b/pregelix/pregelix-runtime/pom.xml
@@ -118,13 +118,6 @@
<version>0.2.3-SNAPSHOT</version>
</dependency>
<dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-core</artifactId>
- <version>0.20.2</version>
- <type>jar</type>
- <scope>compile</scope>
- </dependency>
- <dependency>
<groupId>edu.uci.ics.hyracks</groupId>
<artifactId>hyracks-storage-am-common</artifactId>
<version>0.2.3-SNAPSHOT</version>
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java
index 1b8fce4..a0dca3d 100644
--- a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java
@@ -104,11 +104,13 @@
private final List<FrameTupleAppender> appenders = new ArrayList<FrameTupleAppender>();
private final List<ArrayTupleBuilder> tbs = new ArrayList<ArrayTupleBuilder>();
private Configuration conf;
+ private boolean dynamicStateLength;
@Override
public void open(IHyracksTaskContext ctx, RecordDescriptor rd, IFrameWriter... writers)
throws HyracksDataException {
this.conf = confFactory.createConfiguration();
+ this.dynamicStateLength = BspUtils.getDynamicVertexValueSize(conf);
this.aggregator = BspUtils.createGlobalAggregator(conf);
this.aggregator.init();
@@ -241,8 +243,8 @@
public void update(ITupleReference tupleRef, ArrayTupleBuilder cloneUpdateTb) throws HyracksDataException {
try {
if (vertex != null && vertex.hasUpdate()) {
- if (!BspUtils.getDynamicVertexValueSize(conf)) {
- //in-place update
+ if (!dynamicStateLength) {
+ // in-place update
int fieldCount = tupleRef.getFieldCount();
for (int i = 1; i < fieldCount; i++) {
byte[] data = tupleRef.getFieldData(i);
@@ -251,12 +253,12 @@
vertex.write(output);
}
} else {
- //write the vertex id
+ // write the vertex id
DataOutput tbOutput = cloneUpdateTb.getDataOutput();
vertex.getVertexId().write(tbOutput);
cloneUpdateTb.addFieldEndOffset();
- //write the vertex value
+ // write the vertex value
vertex.write(tbOutput);
cloneUpdateTb.addFieldEndOffset();
}
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/StartComputeUpdateFunctionFactory.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/StartComputeUpdateFunctionFactory.java
index a4d54c8..3d8a355 100644
--- a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/StartComputeUpdateFunctionFactory.java
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/StartComputeUpdateFunctionFactory.java
@@ -79,7 +79,7 @@
private ByteBuffer bufferGlobalAggregate;
private GlobalAggregator aggregator;
- //for writing out the global aggregate
+ // for writing out the global aggregate
private IFrameWriter writerTerminate;
private FrameTupleAppender appenderTerminate;
private ByteBuffer bufferTerminate;
@@ -107,11 +107,13 @@
private final List<FrameTupleAppender> appenders = new ArrayList<FrameTupleAppender>();
private final List<ArrayTupleBuilder> tbs = new ArrayList<ArrayTupleBuilder>();
private Configuration conf;
+ private boolean dynamicStateLength;
@Override
public void open(IHyracksTaskContext ctx, RecordDescriptor rd, IFrameWriter... writers)
throws HyracksDataException {
this.conf = confFactory.createConfiguration();
+ this.dynamicStateLength = BspUtils.getDynamicVertexValueSize(conf);
this.aggregator = BspUtils.createGlobalAggregator(conf);
this.aggregator.init();
@@ -215,7 +217,8 @@
private void writeOutGlobalAggregate() throws HyracksDataException {
try {
/**
- * get partial aggregate result and flush to the final aggregator
+ * get partial aggregate result and flush to the final
+ * aggregator
*/
Writable agg = aggregator.finishPartial();
agg.write(tbGlobalAggregate.getDataOutput());
@@ -244,8 +247,8 @@
public void update(ITupleReference tupleRef, ArrayTupleBuilder cloneUpdateTb) throws HyracksDataException {
try {
if (vertex != null && vertex.hasUpdate()) {
- if (!BspUtils.getDynamicVertexValueSize(conf)) {
- //in-place update
+ if (!dynamicStateLength) {
+ // in-place update
int fieldCount = tupleRef.getFieldCount();
for (int i = 1; i < fieldCount; i++) {
byte[] data = tupleRef.getFieldData(i);
@@ -254,12 +257,12 @@
vertex.write(output);
}
} else {
- //write the vertex id
+ // write the vertex id
DataOutput tbOutput = cloneUpdateTb.getDataOutput();
vertex.getVertexId().write(tbOutput);
cloneUpdateTb.addFieldEndOffset();
- //write the vertex value
+ // write the vertex value
vertex.write(tbOutput);
cloneUpdateTb.addFieldEndOffset();
}
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/RuntimeHookFactory.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/RuntimeHookFactory.java
index f7d0018..d968262 100644
--- a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/RuntimeHookFactory.java
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/RuntimeHookFactory.java
@@ -15,12 +15,11 @@
package edu.uci.ics.pregelix.runtime.touchpoint;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.Mapper.Context;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.hdfs.ContextFactory;
import edu.uci.ics.pregelix.api.graph.Vertex;
import edu.uci.ics.pregelix.api.util.BspUtils;
import edu.uci.ics.pregelix.dataflow.base.IConfigurationFactory;
@@ -40,14 +39,13 @@
public IRuntimeHook createRuntimeHook() {
return new IRuntimeHook() {
+ private ContextFactory ctxFactory = new ContextFactory();
- @SuppressWarnings({ "rawtypes", "unchecked" })
@Override
public void configure(IHyracksTaskContext ctx) throws HyracksDataException {
Configuration conf = confFactory.createConfiguration();
try {
- Context mapperContext = new Mapper().new Context(conf, new TaskAttemptID(), null, null, null, null,
- null);
+ TaskAttemptContext mapperContext = ctxFactory.createContext(conf, null);
Vertex.setContext(mapperContext);
BspUtils.setDefaultConfiguration(conf);
} catch (Exception e) {