Merge fullstack_asterix_stabilization into fullstack_hyracks_result_distribution.
git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_hyracks_result_distribution@2965 123451ca-8445-de46-9d55-352943316053
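
Summary of the merged changes: the hard dependency on hadoop-core/hadoop-test
0.20.2 is replaced by hyracks-hdfs-core across the pregelix poms; Vertex and
RuntimeHookFactory hold a TaskAttemptContext obtained through the new
hyracks-hdfs ContextFactory instead of instantiating Mapper.Context directly;
the dataflow operators save and restore the thread context classloader around
open/close/fail; per-tuple configuration lookups (PageRank iteration count,
Reachability source id, dynamic vertex value size) are memoized; Driver tears
down the deployed application when a job fails; and the leftover "giraph"
naming in tests and log messages becomes "pregelix".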
diff --git a/pregelix/pregelix-api/pom.xml b/pregelix/pregelix-api/pom.xml
index 2caa93b..0643804 100644
--- a/pregelix/pregelix-api/pom.xml
+++ b/pregelix/pregelix-api/pom.xml
@@ -77,11 +77,11 @@
<scope>test</scope>
</dependency>
<dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-core</artifactId>
- <version>0.20.2</version>
- <type>jar</type>
- <scope>compile</scope>
- </dependency>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-hdfs-core</artifactId>
+ <version>0.2.3-SNAPSHOT</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
</dependencies>
</project>
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Vertex.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Vertex.java
index b7f9e3d..a8cd3db 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Vertex.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Vertex.java
@@ -26,7 +26,7 @@
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
@@ -66,7 +66,7 @@
/** List of incoming messages from the previous superstep */
private final List<M> msgList = new ArrayList<M>();
/** map context */
- private static Mapper.Context context = null;
+ private static TaskAttemptContext context = null;
/** a delegate for hyracks stuff */
private VertexDelegate<I, V, E, M> delegate = new VertexDelegate<I, V, E, M>(this);
/** this vertex is updated or not */
@@ -444,8 +444,10 @@
/**
* Add a new vertex into the graph
*
- * @param vertexId the vertex id
- * @param vertex the vertex
+ * @param vertexId
+ * the vertex id
+ * @param vertex
+ * the vertex
*/
public final void addVertex(I vertexId, V vertex) {
delegate.addVertex(vertexId, vertex);
@@ -454,7 +456,8 @@
/**
* Delete a vertex from id
*
- * @param vertexId the vertex id
+ * @param vertexId
+ * the vertex id
*/
public final void deleteVertex(I vertexId) {
delegate.deleteVertex(vertexId);
@@ -528,7 +531,7 @@
/**
* Pregelix internal use only
*/
- public static final Mapper<?, ?, ?, ?>.Context getContext() {
+ public static final TaskAttemptContext getContext() {
return context;
}
@@ -537,7 +540,7 @@
*
* @param context
*/
- public static final void setContext(Mapper<?, ?, ?, ?>.Context context) {
+ public static final void setContext(TaskAttemptContext context) {
Vertex.context = context;
}
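
The hunks above narrow the static context from Mapper<?, ?, ?, ?>.Context to
its supertype TaskAttemptContext: the graph API only ever reads configuration
values off the context, and a TaskAttemptContext can be built without a Mapper
instance. A minimal sketch of the resulting holder shape (the class name here
is illustrative, not the real Vertex):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;

    final class VertexContextHolder {
        private static TaskAttemptContext context;

        // Pregelix internal use only: the scan operator installs the context.
        static void setContext(TaskAttemptContext ctx) {
            context = ctx;
        }

        // User code reads job parameters through the context's configuration.
        static Configuration getConfiguration() {
            return context.getConfiguration();
        }
    }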
diff --git a/pregelix/pregelix-core/pom.xml b/pregelix/pregelix-core/pom.xml
index 34faa82..0908829 100644
--- a/pregelix/pregelix-core/pom.xml
+++ b/pregelix/pregelix-core/pom.xml
@@ -244,11 +244,6 @@
<groupId>edu.uci.ics.hyracks</groupId>
<artifactId>hyracks-data-std</artifactId>
<version>0.2.3-SNAPSHOT</version>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-core</artifactId>
- <version>0.20.2</version>
<type>jar</type>
<scope>compile</scope>
</dependency>
@@ -281,13 +276,6 @@
<scope>compile</scope>
</dependency>
<dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-test</artifactId>
- <version>0.20.2</version>
- <type>jar</type>
- <scope>compile</scope>
- </dependency>
- <dependency>
<groupId>com.kenai.nbpwr</groupId>
<artifactId>org-apache-commons-io</artifactId>
<version>1.3.1-201002241208</version>
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java
index 1b6f195..f07a246 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java
@@ -72,22 +72,23 @@
public void runJob(PregelixJob job, Plan planChoice, String ipAddress, int port, boolean profiling)
throws HyracksException {
applicationName = exampleClass.getSimpleName() + UUID.randomUUID();
- /** add hadoop configurations */
- URL hadoopCore = job.getClass().getClassLoader().getResource("core-site.xml");
- job.getConfiguration().addResource(hadoopCore);
- URL hadoopMapRed = job.getClass().getClassLoader().getResource("mapred-site.xml");
- job.getConfiguration().addResource(hadoopMapRed);
- URL hadoopHdfs = job.getClass().getClassLoader().getResource("hdfs-site.xml");
- job.getConfiguration().addResource(hadoopHdfs);
- ClusterConfig.loadClusterConfig(ipAddress, port);
-
- LOG.info("job started");
- long start = System.currentTimeMillis();
- long end = start;
- long time = 0;
-
- this.profiling = profiling;
try {
+ /** add hadoop configurations */
+ URL hadoopCore = job.getClass().getClassLoader().getResource("core-site.xml");
+ job.getConfiguration().addResource(hadoopCore);
+ URL hadoopMapRed = job.getClass().getClassLoader().getResource("mapred-site.xml");
+ job.getConfiguration().addResource(hadoopMapRed);
+ URL hadoopHdfs = job.getClass().getClassLoader().getResource("hdfs-site.xml");
+ job.getConfiguration().addResource(hadoopHdfs);
+ ClusterConfig.loadClusterConfig(ipAddress, port);
+
+ LOG.info("job started");
+ long start = System.currentTimeMillis();
+ long end = start;
+ long time = 0;
+
+ this.profiling = profiling;
+
switch (planChoice) {
case INNER_JOIN:
jobGen = new JobGenInnerJoin(job);
@@ -146,6 +147,16 @@
LOG.info("result writing finished " + time + "ms");
LOG.info("job finished");
} catch (Exception e) {
+ try {
+ /**
+ * destroy application if there is any exception
+ */
+ if (hcc != null) {
+ destroyApplication(applicationName);
+ }
+ } catch (Exception e2) {
+ throw new HyracksException(e2);
+ }
throw new HyracksException(e);
}
}
@@ -224,8 +235,8 @@
LOG.info("jar deployment finished " + (end - start) + "ms");
}
- public void destroyApplication(String jarFile) throws Exception {
- hcc.destroyApplication(applicationName);
+ public void destroyApplication(String appName) throws Exception {
+ hcc.destroyApplication(appName);
}
}
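
The restructured runJob performs all setup inside the try block so that any
failure, including configuration loading, reaches the new catch, which tears
down the application that was deployed before rethrowing. A compact sketch of
that ordering, with stand-in types for the Hyracks client connection:

    public final class CleanupOnFailure {
        interface ClusterClient {
            void destroyApplication(String appName) throws Exception;
        }

        private final ClusterClient hcc;
        private final String applicationName;

        CleanupOnFailure(ClusterClient hcc, String applicationName) {
            this.hcc = hcc;
            this.applicationName = applicationName;
        }

        void run(Runnable job) throws Exception {
            try {
                job.run();
            } catch (Exception e) {
                try {
                    if (hcc != null) {
                        hcc.destroyApplication(applicationName); // undo the deploy
                    }
                } catch (Exception e2) {
                    throw new Exception(e2); // cleanup itself failed
                }
                throw e; // otherwise propagate the original failure
            }
        }
    }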
diff --git a/pregelix/pregelix-dataflow-std/pom.xml b/pregelix/pregelix-dataflow-std/pom.xml
index 6c039bf..3f3ba00 100644
--- a/pregelix/pregelix-dataflow-std/pom.xml
+++ b/pregelix/pregelix-dataflow-std/pom.xml
@@ -1,14 +1,15 @@
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<artifactId>pregelix-dataflow-std</artifactId>
<packaging>jar</packaging>
<name>pregelix-dataflow-std</name>
<parent>
- <groupId>edu.uci.ics.hyracks</groupId>
- <artifactId>pregelix</artifactId>
- <version>0.2.3-SNAPSHOT</version>
- </parent>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>pregelix</artifactId>
+ <version>0.2.3-SNAPSHOT</version>
+ </parent>
<properties>
@@ -101,11 +102,13 @@
<groupId>edu.uci.ics.hyracks</groupId>
<artifactId>hyracks-data-std</artifactId>
<version>0.2.3-SNAPSHOT</version>
+ <type>jar</type>
+ <scope>compile</scope>
</dependency>
<dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-core</artifactId>
- <version>0.20.2</version>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-hdfs-core</artifactId>
+ <version>0.2.3-SNAPSHOT</version>
<type>jar</type>
<scope>compile</scope>
</dependency>
@@ -144,12 +147,5 @@
<type>jar</type>
<scope>compile</scope>
</dependency>
- <dependency>
- <groupId>edu.uci.ics.hyracks</groupId>
- <artifactId>hyracks-hdfs</artifactId>
- <version>0.2.3-SNAPSHOT</version>
- <type>jar</type>
- <scope>compile</scope>
- </dependency>
</dependencies>
</project>
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/FunctionCallOperatorDescriptor.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/FunctionCallOperatorDescriptor.java
index ee3ac82..4cbd6c4 100644
--- a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/FunctionCallOperatorDescriptor.java
+++ b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/FunctionCallOperatorDescriptor.java
@@ -61,22 +61,21 @@
private FrameDeserializer frameDeserializer;
private final IFrameWriter[] writers = new IFrameWriter[outputArity];
private final IFunction function = functionFactory.createFunction();
+ private ClassLoader ctxCL = Thread.currentThread().getContextClassLoader();
@Override
- public void close() throws HyracksDataException {
- if (postHookFactory != null)
- postHookFactory.createRuntimeHook().configure(ctx);
- function.close();
+ public void open() throws HyracksDataException {
+ rd0 = inputRdFactory == null ? recordDescProvider.getInputRecordDescriptor(getActivityId(), 0)
+ : inputRdFactory.createRecordDescriptor();
+ frameDeserializer = new FrameDeserializer(ctx.getFrameSize(), rd0);
+ ctxCL = Thread.currentThread().getContextClassLoader();
+ Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
for (IFrameWriter writer : writers) {
- writer.close();
+ writer.open();
}
- }
-
- @Override
- public void fail() throws HyracksDataException {
- for (IFrameWriter writer : writers) {
- writer.fail();
- }
+ if (preHookFactory != null)
+ preHookFactory.createRuntimeHook().configure(ctx);
+ function.open(ctx, rd0, writers);
}
@Override
@@ -89,17 +88,21 @@
}
@Override
- public void open() throws HyracksDataException {
- rd0 = inputRdFactory == null ? recordDescProvider.getInputRecordDescriptor(getActivityId(), 0)
- : inputRdFactory.createRecordDescriptor();
- frameDeserializer = new FrameDeserializer(ctx.getFrameSize(), rd0);
- Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
+ public void close() throws HyracksDataException {
+ if (postHookFactory != null)
+ postHookFactory.createRuntimeHook().configure(ctx);
+ function.close();
for (IFrameWriter writer : writers) {
- writer.open();
+ writer.close();
}
- if (preHookFactory != null)
- preHookFactory.createRuntimeHook().configure(ctx);
- function.open(ctx, rd0, writers);
+ Thread.currentThread().setContextClassLoader(ctxCL);
+ }
+
+ @Override
+ public void fail() throws HyracksDataException {
+ for (IFrameWriter writer : writers) {
+ writer.fail();
+ }
}
@Override
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/FunctionProxy.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/FunctionProxy.java
index 82ac18e..99bca1a 100644
--- a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/FunctionProxy.java
+++ b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/FunctionProxy.java
@@ -37,6 +37,7 @@
private final IFrameWriter[] writers;
private TupleDeserializer tupleDe;
private RecordDescriptor inputRd;
+ private ClassLoader ctxCL;
public FunctionProxy(IHyracksTaskContext ctx, IUpdateFunctionFactory functionFactory,
IRuntimeHookFactory preHookFactory, IRuntimeHookFactory postHookFactory,
@@ -57,6 +58,7 @@
public void functionOpen() throws HyracksDataException {
inputRd = inputRdFactory.createRecordDescriptor();
tupleDe = new TupleDeserializer(inputRd);
+ ctxCL = Thread.currentThread().getContextClassLoader();
Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
for (IFrameWriter writer : writers) {
writer.open();
@@ -124,5 +126,6 @@
for (IFrameWriter writer : writers) {
writer.close();
}
+ Thread.currentThread().setContextClassLoader(ctxCL);
}
}
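
Both FunctionCallOperatorDescriptor and FunctionProxy now remember the
caller's context classloader before switching the thread to the operator's own
loader, and put the saved loader back on close; without the restore, the
pooled worker thread would keep pointing at an application classloader whose
jar may later be undeployed. The discipline in isolation, as a self-contained
sketch:

    public final class ClassLoaderScope implements AutoCloseable {
        private final ClassLoader saved = Thread.currentThread().getContextClassLoader();

        public ClassLoaderScope(ClassLoader scoped) {
            Thread.currentThread().setContextClassLoader(scoped);
        }

        @Override
        public void close() {
            // Restore on every exit path; the diff does this in close(),
            // fail(), and finally blocks.
            Thread.currentThread().setContextClassLoader(saved);
        }

        public static void main(String[] args) {
            try (ClassLoaderScope scope = new ClassLoaderScope(ClassLoaderScope.class.getClassLoader())) {
                System.out.println("inside: " + Thread.currentThread().getContextClassLoader());
            }
            System.out.println("after:  " + Thread.currentThread().getContextClassLoader());
        }
    }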
diff --git a/pregelix/pregelix-dataflow/pom.xml b/pregelix/pregelix-dataflow/pom.xml
index 31b6adc..107f059 100644
--- a/pregelix/pregelix-dataflow/pom.xml
+++ b/pregelix/pregelix-dataflow/pom.xml
@@ -1,14 +1,15 @@
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<artifactId>pregelix-dataflow</artifactId>
<packaging>jar</packaging>
<name>pregelix-dataflow</name>
<parent>
- <groupId>edu.uci.ics.hyracks</groupId>
- <artifactId>pregelix</artifactId>
- <version>0.2.3-SNAPSHOT</version>
- </parent>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>pregelix</artifactId>
+ <version>0.2.3-SNAPSHOT</version>
+ </parent>
<properties>
@@ -103,13 +104,6 @@
<version>0.2.3-SNAPSHOT</version>
</dependency>
<dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-core</artifactId>
- <version>0.20.2</version>
- <type>jar</type>
- <scope>compile</scope>
- </dependency>
- <dependency>
<groupId>edu.uci.ics.hyracks</groupId>
<artifactId>hyracks-storage-am-common</artifactId>
<version>0.2.3-SNAPSHOT</version>
@@ -144,12 +138,5 @@
<type>jar</type>
<scope>compile</scope>
</dependency>
- <dependency>
- <groupId>edu.uci.ics.hyracks</groupId>
- <artifactId>hyracks-hdfs</artifactId>
- <version>0.2.3-SNAPSHOT</version>
- <type>jar</type>
- <scope>compile</scope>
- </dependency>
</dependencies>
</project>
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/HDFSFileWriteOperatorDescriptor.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/HDFSFileWriteOperatorDescriptor.java
index c25e4c6..0133d76 100644
--- a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/HDFSFileWriteOperatorDescriptor.java
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/HDFSFileWriteOperatorDescriptor.java
@@ -64,17 +64,20 @@
return new AbstractUnaryInputSinkOperatorNodePushable() {
private RecordDescriptor rd0;
private FrameDeserializer frameDeserializer;
- private Configuration conf = confFactory.createConfiguration();
+ private Configuration conf;
private VertexWriter vertexWriter;
private TaskAttemptContext context;
private String TEMP_DIR = "_temporary";
+ private ClassLoader ctxCL;
@Override
public void open() throws HyracksDataException {
rd0 = inputRdFactory == null ? recordDescProvider.getInputRecordDescriptor(getActivityId(), 0)
: inputRdFactory.createRecordDescriptor();
frameDeserializer = new FrameDeserializer(ctx.getFrameSize(), rd0);
+ ctxCL = Thread.currentThread().getContextClassLoader();
Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
+ conf = confFactory.createConfiguration();
VertexOutputFormat outputFormat = BspUtils.createVertexOutputFormat(conf);
TaskAttemptID tid = new TaskAttemptID("", 0, true, partition, 0);
@@ -107,7 +110,7 @@
@Override
public void fail() throws HyracksDataException {
-
+ Thread.currentThread().setContextClassLoader(ctxCL);
}
@Override
@@ -151,6 +154,8 @@
dfs.rename(srcFile, filePath);
} catch (IOException e) {
throw new HyracksDataException(e);
+ } finally {
+ Thread.currentThread().setContextClassLoader(ctxCL);
}
}
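
Besides the save/restore of the classloader, this operator defers building the
Hadoop Configuration: it used to be created in the field initializer, which
runs under the caller's classloader, and is now created inside open() after
the thread has switched to the operator's loader, so plugin classes named in
the configuration resolve against the job jar. A sketch of that ordering,
assuming a stand-in factory interface:

    import org.apache.hadoop.conf.Configuration;

    final class DeferredConf {
        interface ConfFactory {
            Configuration create();
        }

        private final ConfFactory confFactory;
        private Configuration conf; // built lazily, not at construction time

        DeferredConf(ConfFactory confFactory) {
            this.confFactory = confFactory;
        }

        void open() {
            ClassLoader saved = Thread.currentThread().getContextClassLoader();
            Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
            try {
                conf = confFactory.create(); // resolves under the operator's loader
            } finally {
                Thread.currentThread().setContextClassLoader(saved);
            }
        }
    }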
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/VertexFileScanOperatorDescriptor.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/VertexFileScanOperatorDescriptor.java
index cb0e339..a38b19e 100644
--- a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/VertexFileScanOperatorDescriptor.java
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/VertexFileScanOperatorDescriptor.java
@@ -25,8 +25,6 @@
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
@@ -43,6 +41,7 @@
import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
+import edu.uci.ics.hyracks.hdfs.ContextFactory;
import edu.uci.ics.hyracks.hdfs2.dataflow.FileSplitsFactory;
import edu.uci.ics.pregelix.api.graph.Vertex;
import edu.uci.ics.pregelix.api.io.VertexInputFormat;
@@ -83,12 +82,15 @@
final List<FileSplit> splits = splitsFactory.getSplits();
return new AbstractUnaryOutputSourceOperatorNodePushable() {
- private Configuration conf = confFactory.createConfiguration();
+ private ClassLoader ctxCL;
+ private ContextFactory ctxFactory = new ContextFactory();
@Override
public void initialize() throws HyracksDataException {
+ ctxCL = Thread.currentThread().getContextClassLoader();
try {
Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
+ Configuration conf = confFactory.createConfiguration();
writer.open();
for (int i = 0; i < scheduledLocations.length; i++) {
if (scheduledLocations[i].equals(ctx.getJobletContext().getApplicationContext().getNodeId())) {
@@ -102,12 +104,14 @@
continue;
}
}
- loadVertices(ctx, i);
+ loadVertices(ctx, conf, i);
}
}
writer.close();
} catch (Exception e) {
throw new HyracksDataException(e);
+ } finally {
+ Thread.currentThread().setContextClassLoader(ctxCL);
}
}
@@ -122,8 +126,9 @@
* @throws InterruptedException
*/
@SuppressWarnings("unchecked")
- private void loadVertices(final IHyracksTaskContext ctx, int splitId) throws IOException,
- ClassNotFoundException, InterruptedException, InstantiationException, IllegalAccessException {
+ private void loadVertices(final IHyracksTaskContext ctx, Configuration conf, int splitId)
+ throws IOException, ClassNotFoundException, InterruptedException, InstantiationException,
+ IllegalAccessException {
ByteBuffer frame = ctx.allocateFrame();
FrameTupleAppender appender = new FrameTupleAppender(ctx.getFrameSize());
appender.reset(frame, true);
@@ -141,8 +146,7 @@
/**
* set context
*/
- Context mapperContext = new Mapper().new Context(conf, new TaskAttemptID(), null, null, null, null,
- splits.get(splitId));
+ TaskAttemptContext mapperContext = ctxFactory.createContext(conf, splits.get(splitId));
Vertex.setContext(mapperContext);
/**
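
The replaced line used to do "new Mapper().new Context(conf, ...)", which ties
the code to Hadoop versions where Mapper.Context is a constructible inner
class. The new edu.uci.ics.hyracks.hdfs.ContextFactory hides that construction
behind one seam. The sketch below only illustrates the kind of
version-portable factory this enables; it is not the actual hyracks-hdfs
implementation:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.TaskAttemptID;

    final class PortableContextFactory {
        TaskAttemptContext create(Configuration conf) throws Exception {
            String impl;
            try {
                // Newer Hadoop: TaskAttemptContext is an interface with a
                // separate concrete implementation class.
                Class.forName("org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl");
                impl = "org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl";
            } catch (ClassNotFoundException e) {
                // Hadoop 0.20/1.x: TaskAttemptContext is itself instantiable.
                impl = "org.apache.hadoop.mapreduce.TaskAttemptContext";
            }
            return (TaskAttemptContext) Class.forName(impl)
                    .getConstructor(Configuration.class, TaskAttemptID.class)
                    .newInstance(conf, new TaskAttemptID());
        }
    }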
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/PageRankVertex.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/PageRankVertex.java
index 02e1625..b6d4da7 100644
--- a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/PageRankVertex.java
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/PageRankVertex.java
@@ -54,6 +54,7 @@
public static final String ITERATIONS = "HyracksPageRankVertex.iteration";
private DoubleWritable outputValue = new DoubleWritable();
private DoubleWritable tmpVertexValue = new DoubleWritable();
+ private int maxIteration = -1;
/**
* Test whether combiner is called by summing up the messages.
@@ -97,7 +98,9 @@
@Override
public void compute(Iterator<DoubleWritable> msgIterator) {
- int maxIteration = getContext().getConfiguration().getInt(ITERATIONS, 10);
+ if (maxIteration < 0) {
+ maxIteration = getContext().getConfiguration().getInt(ITERATIONS, 10);
+ }
if (getSuperstep() == 1) {
tmpVertexValue.set(1.0 / getNumVertices());
setVertexValue(tmpVertexValue);
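
maxIteration is now read from the configuration once, on the first compute()
call, instead of once per vertex per superstep; the ReachabilityVertex hunk
below memoizes sourceId the same way. The idea in isolation:

    import org.apache.hadoop.conf.Configuration;

    final class IterationLimit {
        static final String ITERATIONS = "HyracksPageRankVertex.iteration";

        private int maxIteration = -1; // sentinel: not read yet

        int get(Configuration conf) {
            if (maxIteration < 0) {
                maxIteration = conf.getInt(ITERATIONS, 10); // single lookup
            }
            return maxIteration;
        }
    }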
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ReachabilityVertex.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ReachabilityVertex.java
index daafc82..0895386 100644
--- a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ReachabilityVertex.java
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ReachabilityVertex.java
@@ -85,6 +85,7 @@
}
private ByteWritable tmpVertexValue = new ByteWritable();
+ private long sourceId = -1;
/** The source vertex id */
public static final String SOURCE_ID = "ReachibilityVertex.sourceId";
@@ -101,7 +102,7 @@
* @return True if the source id
*/
private boolean isSource(VLongWritable v) {
- return (v.get() == getContext().getConfiguration().getLong(SOURCE_ID, SOURCE_ID_DEFAULT));
+ return (v.get() == sourceId);
}
/**
@@ -115,6 +116,9 @@
@Override
public void compute(Iterator<ByteWritable> msgIterator) {
+ if (sourceId < 0) {
+ sourceId = getContext().getConfiguration().getLong(SOURCE_ID, SOURCE_ID_DEFAULT);
+ }
if (getSuperstep() == 1) {
boolean isSource = isSource(getVertexId());
if (isSource) {
@@ -160,7 +164,7 @@
}
voteToHalt();
}
-
+
@Override
public String toString() {
return getVertexId() + " " + getVertexValue();
diff --git a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobrun/RunJobTestCase.java b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobrun/RunJobTestCase.java
index 7a5bba6..5a556fa 100644
--- a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobrun/RunJobTestCase.java
+++ b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobrun/RunJobTestCase.java
@@ -38,7 +38,7 @@
public class RunJobTestCase extends TestCase {
private static final String NC1 = "nc1";
- private static final String HYRACKS_APP_NAME = "giraph";
+ private static final String HYRACKS_APP_NAME = "pregelix";
private static String HDFS_INPUTPATH = "/webmap";
private static String HDFS_OUTPUTPAH = "/result";
diff --git a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobrun/RunJobTestSuite.java b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobrun/RunJobTestSuite.java
index b5abd94..79a5c3c 100644
--- a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobrun/RunJobTestSuite.java
+++ b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobrun/RunJobTestSuite.java
@@ -41,168 +41,176 @@
@SuppressWarnings("deprecation")
public class RunJobTestSuite extends TestSuite {
- private static final Logger LOGGER = Logger.getLogger(RunJobTestSuite.class.getName());
+ private static final Logger LOGGER = Logger.getLogger(RunJobTestSuite.class
+ .getName());
- private static final String ACTUAL_RESULT_DIR = "actual";
- private static final String EXPECTED_RESULT_DIR = "src/test/resources/expected";
- private static final String PATH_TO_HADOOP_CONF = "src/test/resources/hadoop/conf";
- private static final String PATH_TO_CLUSTER_STORE = "src/test/resources/cluster/stores.properties";
- private static final String PATH_TO_CLUSTER_PROPERTIES = "src/test/resources/cluster/cluster.properties";
- private static final String PATH_TO_JOBS = "src/test/resources/jobs/";
- private static final String PATH_TO_IGNORE = "src/test/resources/ignore.txt";
- private static final String PATH_TO_ONLY = "src/test/resources/only.txt";
- private static final String FILE_EXTENSION_OF_RESULTS = "result";
+ private static final String ACTUAL_RESULT_DIR = "actual";
+ private static final String EXPECTED_RESULT_DIR = "src/test/resources/expected";
+ private static final String PATH_TO_HADOOP_CONF = "src/test/resources/hadoop/conf";
+ private static final String PATH_TO_CLUSTER_STORE = "src/test/resources/cluster/stores.properties";
+ private static final String PATH_TO_CLUSTER_PROPERTIES = "src/test/resources/cluster/cluster.properties";
+ private static final String PATH_TO_JOBS = "src/test/resources/jobs/";
+ private static final String PATH_TO_IGNORE = "src/test/resources/ignore.txt";
+ private static final String PATH_TO_ONLY = "src/test/resources/only.txt";
+ private static final String FILE_EXTENSION_OF_RESULTS = "result";
- private static final String DATA_PATH = "data/webmap/webmap_link.txt";
- private static final String HDFS_PATH = "/webmap/";
+ private static final String DATA_PATH = "data/webmap/webmap_link.txt";
+ private static final String HDFS_PATH = "/webmap/";
- private static final String DATA_PATH2 = "data/webmapcomplex/webmap_link.txt";
- private static final String HDFS_PATH2 = "/webmapcomplex/";
+ private static final String DATA_PATH2 = "data/webmapcomplex/webmap_link.txt";
+ private static final String HDFS_PATH2 = "/webmapcomplex/";
- private static final String DATA_PATH3 = "data/clique/clique.txt";
- private static final String HDFS_PATH3 = "/clique/";
+ private static final String DATA_PATH3 = "data/clique/clique.txt";
+ private static final String HDFS_PATH3 = "/clique/";
- private static final String HYRACKS_APP_NAME = "giraph";
- private static final String HADOOP_CONF_PATH = ACTUAL_RESULT_DIR + File.separator + "conf.xml";
- private MiniDFSCluster dfsCluster;
+ private static final String HYRACKS_APP_NAME = "pregelix";
+ private static final String HADOOP_CONF_PATH = ACTUAL_RESULT_DIR
+ + File.separator + "conf.xml";
+ private MiniDFSCluster dfsCluster;
- private JobConf conf = new JobConf();
- private int numberOfNC = 2;
+ private JobConf conf = new JobConf();
+ private int numberOfNC = 2;
- public void setUp() throws Exception {
- ClusterConfig.setStorePath(PATH_TO_CLUSTER_STORE);
- ClusterConfig.setClusterPropertiesPath(PATH_TO_CLUSTER_PROPERTIES);
- cleanupStores();
- PregelixHyracksIntegrationUtil.init();
- PregelixHyracksIntegrationUtil.createApp(HYRACKS_APP_NAME);
- LOGGER.info("Hyracks mini-cluster started");
- FileUtils.forceMkdir(new File(ACTUAL_RESULT_DIR));
- FileUtils.cleanDirectory(new File(ACTUAL_RESULT_DIR));
- startHDFS();
- }
+ public void setUp() throws Exception {
+ ClusterConfig.setStorePath(PATH_TO_CLUSTER_STORE);
+ ClusterConfig.setClusterPropertiesPath(PATH_TO_CLUSTER_PROPERTIES);
+ cleanupStores();
+ PregelixHyracksIntegrationUtil.init();
+ PregelixHyracksIntegrationUtil.createApp(HYRACKS_APP_NAME);
+ LOGGER.info("Hyracks mini-cluster started");
+ FileUtils.forceMkdir(new File(ACTUAL_RESULT_DIR));
+ FileUtils.cleanDirectory(new File(ACTUAL_RESULT_DIR));
+ startHDFS();
+ }
- private void cleanupStores() throws IOException {
- FileUtils.forceMkdir(new File("teststore"));
- FileUtils.forceMkdir(new File("build"));
- FileUtils.cleanDirectory(new File("teststore"));
- FileUtils.cleanDirectory(new File("build"));
- }
+ private void cleanupStores() throws IOException {
+ FileUtils.forceMkdir(new File("teststore"));
+ FileUtils.forceMkdir(new File("build"));
+ FileUtils.cleanDirectory(new File("teststore"));
+ FileUtils.cleanDirectory(new File("build"));
+ }
- private void startHDFS() throws IOException {
- conf.addResource(new Path(PATH_TO_HADOOP_CONF + "/core-site.xml"));
- conf.addResource(new Path(PATH_TO_HADOOP_CONF + "/mapred-site.xml"));
- conf.addResource(new Path(PATH_TO_HADOOP_CONF + "/hdfs-site.xml"));
- FileSystem lfs = FileSystem.getLocal(new Configuration());
- lfs.delete(new Path("build"), true);
- System.setProperty("hadoop.log.dir", "logs");
- dfsCluster = new MiniDFSCluster(conf, numberOfNC, true, null);
- FileSystem dfs = FileSystem.get(conf);
- Path src = new Path(DATA_PATH);
- Path dest = new Path(HDFS_PATH);
- dfs.mkdirs(dest);
- dfs.copyFromLocalFile(src, dest);
+ private void startHDFS() throws IOException {
+ conf.addResource(new Path(PATH_TO_HADOOP_CONF + "/core-site.xml"));
+ conf.addResource(new Path(PATH_TO_HADOOP_CONF + "/mapred-site.xml"));
+ conf.addResource(new Path(PATH_TO_HADOOP_CONF + "/hdfs-site.xml"));
+ FileSystem lfs = FileSystem.getLocal(new Configuration());
+ lfs.delete(new Path("build"), true);
+ System.setProperty("hadoop.log.dir", "logs");
+ dfsCluster = new MiniDFSCluster(conf, numberOfNC, true, null);
+ FileSystem dfs = FileSystem.get(conf);
+ Path src = new Path(DATA_PATH);
+ Path dest = new Path(HDFS_PATH);
+ dfs.mkdirs(dest);
+ dfs.copyFromLocalFile(src, dest);
- src = new Path(DATA_PATH2);
- dest = new Path(HDFS_PATH2);
- dfs.mkdirs(dest);
- dfs.copyFromLocalFile(src, dest);
+ src = new Path(DATA_PATH2);
+ dest = new Path(HDFS_PATH2);
+ dfs.mkdirs(dest);
+ dfs.copyFromLocalFile(src, dest);
- src = new Path(DATA_PATH3);
- dest = new Path(HDFS_PATH3);
- dfs.mkdirs(dest);
- dfs.copyFromLocalFile(src, dest);
+ src = new Path(DATA_PATH3);
+ dest = new Path(HDFS_PATH3);
+ dfs.mkdirs(dest);
+ dfs.copyFromLocalFile(src, dest);
- DataOutputStream confOutput = new DataOutputStream(new FileOutputStream(new File(HADOOP_CONF_PATH)));
- conf.writeXml(confOutput);
- confOutput.flush();
- confOutput.close();
- }
+ DataOutputStream confOutput = new DataOutputStream(
+ new FileOutputStream(new File(HADOOP_CONF_PATH)));
+ conf.writeXml(confOutput);
+ confOutput.flush();
+ confOutput.close();
+ }
- /**
- * cleanup hdfs cluster
- */
- private void cleanupHDFS() throws Exception {
- dfsCluster.shutdown();
- }
+ /**
+ * cleanup hdfs cluster
+ */
+ private void cleanupHDFS() throws Exception {
+ dfsCluster.shutdown();
+ }
- public void tearDown() throws Exception {
- PregelixHyracksIntegrationUtil.destroyApp(HYRACKS_APP_NAME);
- PregelixHyracksIntegrationUtil.deinit();
- LOGGER.info("Hyracks mini-cluster shut down");
- cleanupHDFS();
- }
+ public void tearDown() throws Exception {
+ PregelixHyracksIntegrationUtil.destroyApp(HYRACKS_APP_NAME);
+ PregelixHyracksIntegrationUtil.deinit();
+ LOGGER.info("Hyracks mini-cluster shut down");
+ cleanupHDFS();
+ }
- public static Test suite() throws Exception {
- List<String> ignores = getFileList(PATH_TO_IGNORE);
- List<String> onlys = getFileList(PATH_TO_ONLY);
- File testData = new File(PATH_TO_JOBS);
- File[] queries = testData.listFiles();
- RunJobTestSuite testSuite = new RunJobTestSuite();
- testSuite.setUp();
- boolean onlyEnabled = false;
+ public static Test suite() throws Exception {
+ List<String> ignores = getFileList(PATH_TO_IGNORE);
+ List<String> onlys = getFileList(PATH_TO_ONLY);
+ File testData = new File(PATH_TO_JOBS);
+ File[] queries = testData.listFiles();
+ RunJobTestSuite testSuite = new RunJobTestSuite();
+ testSuite.setUp();
+ boolean onlyEnabled = false;
- if (onlys.size() > 0) {
- onlyEnabled = true;
- }
- for (File qFile : queries) {
- if (isInList(ignores, qFile.getName()))
- continue;
+ if (onlys.size() > 0) {
+ onlyEnabled = true;
+ }
+ for (File qFile : queries) {
+ if (isInList(ignores, qFile.getName()))
+ continue;
- if (qFile.isFile()) {
- if (onlyEnabled && !isInList(onlys, qFile.getName())) {
- continue;
- } else {
- String resultFileName = ACTUAL_RESULT_DIR + File.separator + jobExtToResExt(qFile.getName());
- String expectedFileName = EXPECTED_RESULT_DIR + File.separator + jobExtToResExt(qFile.getName());
- testSuite.addTest(new RunJobTestCase(HADOOP_CONF_PATH, qFile.getName(), qFile.getAbsolutePath()
- .toString(), resultFileName, expectedFileName));
- }
- }
- }
- return testSuite;
- }
+ if (qFile.isFile()) {
+ if (onlyEnabled && !isInList(onlys, qFile.getName())) {
+ continue;
+ } else {
+ String resultFileName = ACTUAL_RESULT_DIR + File.separator
+ + jobExtToResExt(qFile.getName());
+ String expectedFileName = EXPECTED_RESULT_DIR
+ + File.separator + jobExtToResExt(qFile.getName());
+ testSuite.addTest(new RunJobTestCase(HADOOP_CONF_PATH,
+ qFile.getName(),
+ qFile.getAbsolutePath().toString(), resultFileName,
+ expectedFileName));
+ }
+ }
+ }
+ return testSuite;
+ }
- /**
- * Runs the tests and collects their result in a TestResult.
- */
- @Override
- public void run(TestResult result) {
- try {
- int testCount = countTestCases();
- for (int i = 0; i < testCount; i++) {
- // cleanupStores();
- Test each = this.testAt(i);
- if (result.shouldStop())
- break;
- runTest(each, result);
- }
- tearDown();
- } catch (Exception e) {
- throw new IllegalStateException(e);
- }
- }
+ /**
+ * Runs the tests and collects their result in a TestResult.
+ */
+ @Override
+ public void run(TestResult result) {
+ try {
+ int testCount = countTestCases();
+ for (int i = 0; i < testCount; i++) {
+ // cleanupStores();
+ Test each = this.testAt(i);
+ if (result.shouldStop())
+ break;
+ runTest(each, result);
+ }
+ tearDown();
+ } catch (Exception e) {
+ throw new IllegalStateException(e);
+ }
+ }
- protected static List<String> getFileList(String ignorePath) throws FileNotFoundException, IOException {
- BufferedReader reader = new BufferedReader(new FileReader(ignorePath));
- String s = null;
- List<String> ignores = new ArrayList<String>();
- while ((s = reader.readLine()) != null) {
- ignores.add(s);
- }
- reader.close();
- return ignores;
- }
+ protected static List<String> getFileList(String ignorePath)
+ throws FileNotFoundException, IOException {
+ BufferedReader reader = new BufferedReader(new FileReader(ignorePath));
+ String s = null;
+ List<String> ignores = new ArrayList<String>();
+ while ((s = reader.readLine()) != null) {
+ ignores.add(s);
+ }
+ reader.close();
+ return ignores;
+ }
- private static String jobExtToResExt(String fname) {
- int dot = fname.lastIndexOf('.');
- return fname.substring(0, dot + 1) + FILE_EXTENSION_OF_RESULTS;
- }
+ private static String jobExtToResExt(String fname) {
+ int dot = fname.lastIndexOf('.');
+ return fname.substring(0, dot + 1) + FILE_EXTENSION_OF_RESULTS;
+ }
- private static boolean isInList(List<String> onlys, String name) {
- for (String only : onlys)
- if (name.indexOf(only) >= 0)
- return true;
- return false;
- }
+ private static boolean isInList(List<String> onlys, String name) {
+ for (String only : onlys)
+ if (name.indexOf(only) >= 0)
+ return true;
+ return false;
+ }
}
diff --git a/pregelix/pregelix-runtime/pom.xml b/pregelix/pregelix-runtime/pom.xml
index 1c414ff..82be6b9 100644
--- a/pregelix/pregelix-runtime/pom.xml
+++ b/pregelix/pregelix-runtime/pom.xml
@@ -117,13 +117,6 @@
<version>0.2.3-SNAPSHOT</version>
</dependency>
<dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-core</artifactId>
- <version>0.20.2</version>
- <type>jar</type>
- <scope>compile</scope>
- </dependency>
- <dependency>
<groupId>edu.uci.ics.hyracks</groupId>
<artifactId>hyracks-storage-am-common</artifactId>
<version>0.2.3-SNAPSHOT</version>
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/bootstrap/NCBootstrapImpl.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/bootstrap/NCBootstrapImpl.java
index e7caeaf..76c725e 100644
--- a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/bootstrap/NCBootstrapImpl.java
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/bootstrap/NCBootstrapImpl.java
@@ -34,7 +34,7 @@
@Override
public void stop() throws Exception {
- LOGGER.info("Stopping Giraph NC Bootstrap");
+ LOGGER.info("Stopping NC Bootstrap");
RuntimeContext rCtx = (RuntimeContext) appCtx.getApplicationObject();
rCtx.close();
}
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java
index 1b8fce4..a0dca3d 100644
--- a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java
@@ -104,11 +104,13 @@
private final List<FrameTupleAppender> appenders = new ArrayList<FrameTupleAppender>();
private final List<ArrayTupleBuilder> tbs = new ArrayList<ArrayTupleBuilder>();
private Configuration conf;
+ private boolean dynamicStateLength;
@Override
public void open(IHyracksTaskContext ctx, RecordDescriptor rd, IFrameWriter... writers)
throws HyracksDataException {
this.conf = confFactory.createConfiguration();
+ this.dynamicStateLength = BspUtils.getDynamicVertexValueSize(conf);
this.aggregator = BspUtils.createGlobalAggregator(conf);
this.aggregator.init();
@@ -241,8 +243,8 @@
public void update(ITupleReference tupleRef, ArrayTupleBuilder cloneUpdateTb) throws HyracksDataException {
try {
if (vertex != null && vertex.hasUpdate()) {
- if (!BspUtils.getDynamicVertexValueSize(conf)) {
- //in-place update
+ if (!dynamicStateLength) {
+ // in-place update
int fieldCount = tupleRef.getFieldCount();
for (int i = 1; i < fieldCount; i++) {
byte[] data = tupleRef.getFieldData(i);
@@ -251,12 +253,12 @@
vertex.write(output);
}
} else {
- //write the vertex id
+ // write the vertex id
DataOutput tbOutput = cloneUpdateTb.getDataOutput();
vertex.getVertexId().write(tbOutput);
cloneUpdateTb.addFieldEndOffset();
- //write the vertex value
+ // write the vertex value
vertex.write(tbOutput);
cloneUpdateTb.addFieldEndOffset();
}
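
The same hoisting is applied to StartComputeUpdateFunctionFactory below:
BspUtils.getDynamicVertexValueSize(conf) was re-evaluated inside update(),
once per updated tuple, and is now read once in open() and cached in a field.
A minimal sketch of the shape; the configuration key is a stand-in, not the
real BspUtils constant:

    import org.apache.hadoop.conf.Configuration;

    final class UpdateFunctionSketch {
        private boolean dynamicStateLength; // read once, in open()

        void open(Configuration conf) {
            // Stand-in for BspUtils.getDynamicVertexValueSize(conf).
            dynamicStateLength = conf.getBoolean("pregelix.dynamic.vertex.value.size", false);
        }

        void update() {
            if (!dynamicStateLength) {
                // in-place update of the fixed-size vertex value bytes
            } else {
                // value may have grown: rebuild the tuple (id, then value)
            }
        }
    }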
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/StartComputeUpdateFunctionFactory.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/StartComputeUpdateFunctionFactory.java
index a4d54c8..3d8a355 100644
--- a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/StartComputeUpdateFunctionFactory.java
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/StartComputeUpdateFunctionFactory.java
@@ -79,7 +79,7 @@
private ByteBuffer bufferGlobalAggregate;
private GlobalAggregator aggregator;
- //for writing out the global aggregate
+ // for writing out the global aggregate
private IFrameWriter writerTerminate;
private FrameTupleAppender appenderTerminate;
private ByteBuffer bufferTerminate;
@@ -107,11 +107,13 @@
private final List<FrameTupleAppender> appenders = new ArrayList<FrameTupleAppender>();
private final List<ArrayTupleBuilder> tbs = new ArrayList<ArrayTupleBuilder>();
private Configuration conf;
+ private boolean dynamicStateLength;
@Override
public void open(IHyracksTaskContext ctx, RecordDescriptor rd, IFrameWriter... writers)
throws HyracksDataException {
this.conf = confFactory.createConfiguration();
+ this.dynamicStateLength = BspUtils.getDynamicVertexValueSize(conf);
this.aggregator = BspUtils.createGlobalAggregator(conf);
this.aggregator.init();
@@ -215,7 +217,8 @@
private void writeOutGlobalAggregate() throws HyracksDataException {
try {
/**
- * get partial aggregate result and flush to the final aggregator
+ * get partial aggregate result and flush to the final
+ * aggregator
*/
Writable agg = aggregator.finishPartial();
agg.write(tbGlobalAggregate.getDataOutput());
@@ -244,8 +247,8 @@
public void update(ITupleReference tupleRef, ArrayTupleBuilder cloneUpdateTb) throws HyracksDataException {
try {
if (vertex != null && vertex.hasUpdate()) {
- if (!BspUtils.getDynamicVertexValueSize(conf)) {
- //in-place update
+ if (!dynamicStateLength) {
+ // in-place update
int fieldCount = tupleRef.getFieldCount();
for (int i = 1; i < fieldCount; i++) {
byte[] data = tupleRef.getFieldData(i);
@@ -254,12 +257,12 @@
vertex.write(output);
}
} else {
- //write the vertex id
+ // write the vertex id
DataOutput tbOutput = cloneUpdateTb.getDataOutput();
vertex.getVertexId().write(tbOutput);
cloneUpdateTb.addFieldEndOffset();
- //write the vertex value
+ // write the vertex value
vertex.write(tbOutput);
cloneUpdateTb.addFieldEndOffset();
}
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/RuntimeHookFactory.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/RuntimeHookFactory.java
index f7d0018..d968262 100644
--- a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/RuntimeHookFactory.java
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/RuntimeHookFactory.java
@@ -15,12 +15,11 @@
package edu.uci.ics.pregelix.runtime.touchpoint;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.Mapper.Context;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.hdfs.ContextFactory;
import edu.uci.ics.pregelix.api.graph.Vertex;
import edu.uci.ics.pregelix.api.util.BspUtils;
import edu.uci.ics.pregelix.dataflow.base.IConfigurationFactory;
@@ -40,14 +39,13 @@
public IRuntimeHook createRuntimeHook() {
return new IRuntimeHook() {
+ private ContextFactory ctxFactory = new ContextFactory();
- @SuppressWarnings({ "rawtypes", "unchecked" })
@Override
public void configure(IHyracksTaskContext ctx) throws HyracksDataException {
Configuration conf = confFactory.createConfiguration();
try {
- Context mapperContext = new Mapper().new Context(conf, new TaskAttemptID(), null, null, null, null,
- null);
+ TaskAttemptContext mapperContext = ctxFactory.createContext(conf, null);
Vertex.setContext(mapperContext);
BspUtils.setDefaultConfiguration(conf);
} catch (Exception e) {