reintegrate fullstack_dynamic_deployment
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java
index 72256f9..1f071bf 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java
@@ -22,9 +22,6 @@
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
-import java.util.Set;
-import java.util.TreeSet;
-import java.util.UUID;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -33,6 +30,7 @@
import edu.uci.ics.hyracks.api.client.HyracksConnection;
import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
+import edu.uci.ics.hyracks.api.deployment.DeploymentId;
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
import edu.uci.ics.hyracks.api.job.JobFlag;
import edu.uci.ics.hyracks.api.job.JobId;
@@ -45,7 +43,6 @@
import edu.uci.ics.pregelix.core.jobgen.JobGenOuterJoinSingleSort;
import edu.uci.ics.pregelix.core.jobgen.JobGenOuterJoinSort;
import edu.uci.ics.pregelix.core.jobgen.clusterconfig.ClusterConfig;
-import edu.uci.ics.pregelix.core.util.Utilities;
import edu.uci.ics.pregelix.dataflow.util.IterationUtils;
@SuppressWarnings("rawtypes")
@@ -54,9 +51,7 @@
private JobGen jobGen;
private boolean profiling;
- private String applicationName;
private IHyracksClientConnection hcc;
-
private Class exampleClass;
public Driver(Class exampleClass) {
@@ -71,7 +66,6 @@
@Override
public void runJob(PregelixJob job, Plan planChoice, String ipAddress, int port, boolean profiling)
throws HyracksException {
- applicationName = exampleClass.getSimpleName() + UUID.randomUUID();
try {
/** add hadoop configurations */
URL hadoopCore = job.getClass().getClassLoader().getResource("core-site.xml");
@@ -121,13 +115,13 @@
for (URL url : urls)
if (url.toString().endsWith(".jar"))
jars.add(new File(url.getPath()));
- installApplication(jars);
+ DeploymentId deploymentId = installApplication(jars);
start = System.currentTimeMillis();
FileSystem dfs = FileSystem.get(job.getConfiguration());
dfs.delete(FileOutputFormat.getOutputPath(job), true);
- runCreate(jobGen);
- runDataLoad(jobGen);
+ runCreate(deploymentId, jobGen);
+ runDataLoad(deploymentId, jobGen);
end = System.currentTimeMillis();
time = end - start;
LOG.info("data loading finished " + time + "ms");
@@ -135,7 +129,7 @@
boolean terminate = false;
do {
start = System.currentTimeMillis();
- runLoopBodyIteration(jobGen, i);
+ runLoopBodyIteration(deploymentId, jobGen, i);
end = System.currentTimeMillis();
time = end - start;
LOG.info("iteration " + i + " finished " + time + "ms");
@@ -145,90 +139,86 @@
} while (!terminate);
start = System.currentTimeMillis();
- runHDFSWRite(jobGen);
- runCleanup(jobGen);
+ runHDFSWRite(deploymentId, jobGen);
+ runCleanup(deploymentId, jobGen);
end = System.currentTimeMillis();
time = end - start;
LOG.info("result writing finished " + time + "ms");
+ hcc.unDeployBinary(deploymentId);
LOG.info("job finished");
} catch (Exception e) {
throw new HyracksException(e);
}
}
- private void runCreate(JobGen jobGen) throws Exception {
+ private void runCreate(DeploymentId deploymentId, JobGen jobGen) throws Exception {
try {
JobSpecification treeCreateSpec = jobGen.generateCreatingJob();
- execute(treeCreateSpec);
+ execute(deploymentId, treeCreateSpec);
} catch (Exception e) {
throw e;
}
}
- private void runDataLoad(JobGen jobGen) throws Exception {
+ private void runDataLoad(DeploymentId deploymentId, JobGen jobGen) throws Exception {
try {
JobSpecification bulkLoadJobSpec = jobGen.generateLoadingJob();
- execute(bulkLoadJobSpec);
+ execute(deploymentId, bulkLoadJobSpec);
} catch (Exception e) {
throw e;
}
}
- private void runLoopBodyIteration(JobGen jobGen, int iteration) throws Exception {
+ private void runLoopBodyIteration(DeploymentId deploymentId, JobGen jobGen, int iteration) throws Exception {
try {
JobSpecification loopBody = jobGen.generateJob(iteration);
- execute(loopBody);
+ execute(deploymentId, loopBody);
} catch (Exception e) {
throw e;
}
}
- private void runHDFSWRite(JobGen jobGen) throws Exception {
+ private void runHDFSWRite(DeploymentId deploymentId, JobGen jobGen) throws Exception {
try {
JobSpecification scanSortPrintJobSpec = jobGen.scanIndexWriteGraph();
- execute(scanSortPrintJobSpec);
+ execute(deploymentId, scanSortPrintJobSpec);
} catch (Exception e) {
throw e;
}
}
- private void runCleanup(JobGen jobGen) throws Exception {
+ private void runCleanup(DeploymentId deploymentId, JobGen jobGen) throws Exception {
try {
JobSpecification[] cleanups = jobGen.generateCleanup();
- runJobArray(cleanups);
+ runJobArray(deploymentId, cleanups);
} catch (Exception e) {
throw e;
}
}
- private void runJobArray(JobSpecification[] jobs) throws Exception {
+ private void runJobArray(DeploymentId deploymentId, JobSpecification[] jobs) throws Exception {
for (JobSpecification job : jobs) {
- execute(job);
+ execute(deploymentId, job);
}
}
- private void execute(JobSpecification job) throws Exception {
+ private void execute(DeploymentId deploymentId, JobSpecification job) throws Exception {
job.setUseConnectorPolicyForScheduling(false);
- JobId jobId = hcc
- .startJob(job, profiling ? EnumSet.of(JobFlag.PROFILE_RUNTIME) : EnumSet.noneOf(JobFlag.class));
+ JobId jobId = hcc.startJob(deploymentId, job,
+ profiling ? EnumSet.of(JobFlag.PROFILE_RUNTIME) : EnumSet.noneOf(JobFlag.class));
hcc.waitForCompletion(jobId);
}
- public void installApplication(List<File> jars) throws Exception {
- Set<String> allJars = new TreeSet<String>();
+ public DeploymentId installApplication(List<File> jars) throws Exception {
+ List<String> allJars = new ArrayList<String>();
for (File jar : jars) {
allJars.add(jar.getAbsolutePath());
}
long start = System.currentTimeMillis();
- File appZip = Utilities.getHyracksArchive(applicationName, allJars);
+ DeploymentId deploymentId = hcc.deployBinary(allJars);
long end = System.currentTimeMillis();
- LOG.info("jar packing finished " + (end - start) + "ms");
-
- start = System.currentTimeMillis();
- // TODO: Fix this step to use Yarn
- //hcc.createApplication(applicationName, appZip);
- end = System.currentTimeMillis();
LOG.info("jar deployment finished " + (end - start) + "ms");
+ return deploymentId;
}
}
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/hadoop/config/ConfigurationFactory.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/hadoop/config/ConfigurationFactory.java
index f3089ba..d225eb4 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/hadoop/config/ConfigurationFactory.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/hadoop/config/ConfigurationFactory.java
@@ -17,6 +17,7 @@
import org.apache.hadoop.conf.Configuration;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.pregelix.api.util.SerDeUtils;
import edu.uci.ics.pregelix.dataflow.base.IConfigurationFactory;
@@ -34,11 +35,11 @@
}
@Override
- public Configuration createConfiguration() throws HyracksDataException {
+ public Configuration createConfiguration(IHyracksTaskContext ctx) throws HyracksDataException {
try {
Configuration conf = new Configuration();
+ conf.setClassLoader(ctx.getJobletContext().getClassLoader());
SerDeUtils.deserialize(conf, data);
- conf.setClassLoader(this.getClass().getClassLoader());
return conf;
} catch (Exception e) {
throw new HyracksDataException(e);
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
index 77fd1a7..ce1a34d 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
@@ -73,7 +73,6 @@
import edu.uci.ics.pregelix.core.jobgen.provider.NormalizedKeyComputerFactoryProvider;
import edu.uci.ics.pregelix.core.runtime.touchpoint.WritableComparingBinaryComparatorFactory;
import edu.uci.ics.pregelix.core.util.DataflowUtils;
-import edu.uci.ics.pregelix.core.util.DatatypeHelper;
import edu.uci.ics.pregelix.dataflow.HDFSFileWriteOperatorDescriptor;
import edu.uci.ics.pregelix.dataflow.VertexFileScanOperatorDescriptor;
import edu.uci.ics.pregelix.dataflow.VertexWriteOperatorDescriptor;
@@ -84,6 +83,7 @@
import edu.uci.ics.pregelix.runtime.bootstrap.StorageManagerInterface;
import edu.uci.ics.pregelix.runtime.touchpoint.RuntimeHookFactory;
import edu.uci.ics.pregelix.runtime.touchpoint.VertexIdPartitionComputerFactory;
+import edu.uci.ics.pregelix.runtime.touchpoint.WritableSerializerDeserializerFactory;
public abstract class JobGen implements IJobGen {
private static final Logger LOGGER = Logger.getLogger(JobGen.class.getName());
@@ -239,7 +239,7 @@
* connect operator descriptors
*/
ITuplePartitionComputerFactory hashPartitionComputerFactory = new VertexIdPartitionComputerFactory(
- DatatypeHelper.createSerializerDeserializer(vertexIdClass));
+ new WritableSerializerDeserializerFactory(vertexIdClass));
spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), scanner, 0, sorter, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), sorter, 0, btreeBulkLoad, 0);
return spec;
@@ -315,7 +315,7 @@
* connect operator descriptors
*/
ITuplePartitionComputerFactory hashPartitionComputerFactory = new VertexIdPartitionComputerFactory(
- DatatypeHelper.createSerializerDeserializer(vertexIdClass));
+ new WritableSerializerDeserializerFactory(vertexIdClass));
spec.connect(new OneToOneConnectorDescriptor(spec), scanner, 0, sorter, 0);
spec.connect(new MToNPartitioningMergingConnectorDescriptor(spec, hashPartitionComputerFactory, sortFields,
comparatorFactories), sorter, 0, writer, 0);
@@ -382,7 +382,7 @@
int[] sortFields = new int[1];
sortFields[0] = 0;
ITuplePartitionComputerFactory hashPartitionComputerFactory = new VertexIdPartitionComputerFactory(
- DatatypeHelper.createSerializerDeserializer(vertexIdClass));
+ new WritableSerializerDeserializerFactory(vertexIdClass));
spec.connect(new OneToOneConnectorDescriptor(spec), emptyTupleSource, 0, scanner, 0);
spec.connect(new MToNPartitioningMergingConnectorDescriptor(spec, hashPartitionComputerFactory, sortFields,
comparatorFactories), scanner, 0, writer, 0);
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
index fe2fcac..ff2f6a0 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
@@ -73,6 +73,7 @@
import edu.uci.ics.pregelix.runtime.touchpoint.PreSuperStepRuntimeHookFactory;
import edu.uci.ics.pregelix.runtime.touchpoint.RuntimeHookFactory;
import edu.uci.ics.pregelix.runtime.touchpoint.VertexIdPartitionComputerFactory;
+import edu.uci.ics.pregelix.runtime.touchpoint.WritableSerializerDeserializerFactory;
public class JobGenInnerJoin extends JobGen {
@@ -245,7 +246,7 @@
ClusterConfig.setLocationConstraint(spec, emptySink4);
ITuplePartitionComputerFactory partionFactory = new VertexIdPartitionComputerFactory(
- rdUnnestedMessage.getFields()[0]);
+ new WritableSerializerDeserializerFactory(vertexIdClass));
ITuplePartitionComputerFactory unifyingPartitionComputerFactory = new MergePartitionComputerFactory();
/** connect all operators **/
@@ -470,7 +471,7 @@
ITuplePartitionComputerFactory unifyingPartitionComputerFactory = new MergePartitionComputerFactory();
ITuplePartitionComputerFactory partionFactory = new VertexIdPartitionComputerFactory(
- rdUnnestedMessage.getFields()[0]);
+ new WritableSerializerDeserializerFactory(vertexIdClass));
/** connect all operators **/
spec.connect(new OneToOneConnectorDescriptor(spec), emptyTupleSource, 0, preSuperStep, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), preSuperStep, 0, materializeRead, 0);
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java
index f1eceb7..ee385f1 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java
@@ -73,6 +73,7 @@
import edu.uci.ics.pregelix.runtime.touchpoint.RuntimeHookFactory;
import edu.uci.ics.pregelix.runtime.touchpoint.VertexIdNullWriterFactory;
import edu.uci.ics.pregelix.runtime.touchpoint.VertexIdPartitionComputerFactory;
+import edu.uci.ics.pregelix.runtime.touchpoint.WritableSerializerDeserializerFactory;
public class JobGenOuterJoin extends JobGen {
@@ -221,14 +222,13 @@
/** construct empty sink operator */
EmptySinkOperatorDescriptor emptySink3 = new EmptySinkOperatorDescriptor(spec);
- ClusterConfig.setLocationConstraint(spec, emptySink3);
/** construct empty sink operator */
EmptySinkOperatorDescriptor emptySink4 = new EmptySinkOperatorDescriptor(spec);
ClusterConfig.setLocationConstraint(spec, emptySink4);
ITuplePartitionComputerFactory partionFactory = new VertexIdPartitionComputerFactory(
- rdUnnestedMessage.getFields()[0]);
+ new WritableSerializerDeserializerFactory(vertexIdClass));
/** connect all operators **/
spec.connect(new OneToOneConnectorDescriptor(spec), emptyTupleSource, 0, preSuperStep, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), preSuperStep, 0, scanner, 0);
@@ -432,7 +432,7 @@
ITuplePartitionComputerFactory unifyingPartitionComputerFactory = new MergePartitionComputerFactory();
ITuplePartitionComputerFactory partionFactory = new VertexIdPartitionComputerFactory(
- rdUnnestedMessage.getFields()[0]);
+ new WritableSerializerDeserializerFactory(vertexIdClass));
/** connect all operators **/
spec.connect(new OneToOneConnectorDescriptor(spec), emptyTupleSource, 0, preSuperStep, 0);
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java
index 314c393..40b5f45 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java
@@ -72,6 +72,7 @@
import edu.uci.ics.pregelix.runtime.touchpoint.RuntimeHookFactory;
import edu.uci.ics.pregelix.runtime.touchpoint.VertexIdNullWriterFactory;
import edu.uci.ics.pregelix.runtime.touchpoint.VertexIdPartitionComputerFactory;
+import edu.uci.ics.pregelix.runtime.touchpoint.WritableSerializerDeserializerFactory;
public class JobGenOuterJoinSingleSort extends JobGen {
@@ -221,7 +222,7 @@
ITuplePartitionComputerFactory unifyingPartitionComputerFactory = new MergePartitionComputerFactory();
ITuplePartitionComputerFactory partionFactory = new VertexIdPartitionComputerFactory(
- rdUnnestedMessage.getFields()[0]);
+ new WritableSerializerDeserializerFactory(vertexIdClass));
/** connect all operators **/
spec.connect(new OneToOneConnectorDescriptor(spec), emptyTupleSource, 0, preSuperStep, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), preSuperStep, 0, scanner, 0);
@@ -408,7 +409,7 @@
ITuplePartitionComputerFactory unifyingPartitionComputerFactory = new MergePartitionComputerFactory();
ITuplePartitionComputerFactory partionFactory = new VertexIdPartitionComputerFactory(
- rdUnnestedMessage.getFields()[0]);
+ new WritableSerializerDeserializerFactory(vertexIdClass));
/** connect all operators **/
spec.connect(new OneToOneConnectorDescriptor(spec), emptyTupleSource, 0, preSuperStep, 0);
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java
index 0c3db38..3351a2c 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java
@@ -72,6 +72,7 @@
import edu.uci.ics.pregelix.runtime.touchpoint.RuntimeHookFactory;
import edu.uci.ics.pregelix.runtime.touchpoint.VertexIdNullWriterFactory;
import edu.uci.ics.pregelix.runtime.touchpoint.VertexIdPartitionComputerFactory;
+import edu.uci.ics.pregelix.runtime.touchpoint.WritableSerializerDeserializerFactory;
public class JobGenOuterJoinSort extends JobGen {
@@ -234,7 +235,7 @@
ClusterConfig.setLocationConstraint(spec, emptySink4);
ITuplePartitionComputerFactory partionFactory = new VertexIdPartitionComputerFactory(
- rdUnnestedMessage.getFields()[0]);
+ new WritableSerializerDeserializerFactory(vertexIdClass));
/** connect all operators **/
spec.connect(new OneToOneConnectorDescriptor(spec), emptyTupleSource, 0, preSuperStep, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), preSuperStep, 0, scanner, 0);
@@ -441,7 +442,7 @@
ITuplePartitionComputerFactory unifyingPartitionComputerFactory = new MergePartitionComputerFactory();
ITuplePartitionComputerFactory partionFactory = new VertexIdPartitionComputerFactory(
- rdUnnestedMessage.getFields()[0]);
+ new WritableSerializerDeserializerFactory(vertexIdClass));
/** connect all operators **/
spec.connect(new OneToOneConnectorDescriptor(spec), emptyTupleSource, 0, preSuperStep, 0);
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/runtime/touchpoint/WritableRecordDescriptorFactory.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/runtime/touchpoint/WritableRecordDescriptorFactory.java
index d1d927d..145169e 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/runtime/touchpoint/WritableRecordDescriptorFactory.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/runtime/touchpoint/WritableRecordDescriptorFactory.java
@@ -14,6 +14,7 @@
*/
package edu.uci.ics.pregelix.core.runtime.touchpoint;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
@@ -29,9 +30,9 @@
}
@Override
- public RecordDescriptor createRecordDescriptor() throws HyracksDataException {
+ public RecordDescriptor createRecordDescriptor(IHyracksTaskContext ctx) throws HyracksDataException {
try {
- return DataflowUtils.getRecordDescriptorFromWritableClasses(fieldClasses);
+ return DataflowUtils.getRecordDescriptorFromWritableClasses(ctx, fieldClasses);
} catch (HyracksException e) {
throw new HyracksDataException(e);
}
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/DataflowUtils.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/DataflowUtils.java
index bcf3ffc..e9132c4 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/DataflowUtils.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/DataflowUtils.java
@@ -17,6 +17,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
@@ -27,6 +28,7 @@
import edu.uci.ics.pregelix.dataflow.std.base.IRecordDescriptorFactory;
import edu.uci.ics.pregelix.runtime.simpleagg.AccumulatingAggregatorFactory;
import edu.uci.ics.pregelix.runtime.simpleagg.AggregationFunctionFactory;
+import edu.uci.ics.pregelix.runtime.touchpoint.DatatypeHelper;
public class DataflowUtils {
@@ -40,9 +42,10 @@
throws HyracksException {
RecordDescriptor recordDescriptor = null;
try {
+ ClassLoader loader = DataflowUtils.class.getClassLoader();
recordDescriptor = DatatypeHelper.createKeyValueRecordDescriptor(
- (Class<? extends Writable>) Class.forName(className1),
- (Class<? extends Writable>) Class.forName(className2));
+ (Class<? extends Writable>) loader.loadClass(className1),
+ (Class<? extends Writable>) loader.loadClass(className2));
} catch (ClassNotFoundException cnfe) {
throw new HyracksException(cnfe);
}
@@ -53,11 +56,12 @@
public static RecordDescriptor getRecordDescriptorFromWritableClasses(String... classNames) throws HyracksException {
RecordDescriptor recordDescriptor = null;
ISerializerDeserializer[] serdes = new ISerializerDeserializer[classNames.length];
+ ClassLoader loader = DataflowUtils.class.getClassLoader();
try {
int i = 0;
for (String className : classNames)
- serdes[i++] = DatatypeHelper.createSerializerDeserializer((Class<? extends Writable>) Class
- .forName(className));
+ serdes[i++] = DatatypeHelper.createSerializerDeserializer((Class<? extends Writable>) loader
+ .loadClass(className));
} catch (ClassNotFoundException cnfe) {
throw new HyracksException(cnfe);
}
@@ -79,4 +83,35 @@
new IAggregateFunctionFactory[] { aggFuncFactory });
return aggregatorFactory;
}
+
+ @SuppressWarnings("unchecked")
+ public static RecordDescriptor getRecordDescriptorFromKeyValueClasses(IHyracksTaskContext ctx, String className1,
+ String className2) throws HyracksException {
+ RecordDescriptor recordDescriptor = null;
+ try {
+ recordDescriptor = DatatypeHelper.createKeyValueRecordDescriptor((Class<? extends Writable>) ctx
+ .getJobletContext().loadClass(className1), (Class<? extends Writable>) ctx.getJobletContext()
+ .loadClass(className2));
+ } catch (Exception cnfe) {
+ throw new HyracksException(cnfe);
+ }
+ return recordDescriptor;
+ }
+
+ @SuppressWarnings({ "unchecked", "rawtypes" })
+ public static RecordDescriptor getRecordDescriptorFromWritableClasses(IHyracksTaskContext ctx, String... classNames)
+ throws HyracksException {
+ RecordDescriptor recordDescriptor = null;
+ ISerializerDeserializer[] serdes = new ISerializerDeserializer[classNames.length];
+ try {
+ int i = 0;
+ for (String className : classNames)
+ serdes[i++] = DatatypeHelper.createSerializerDeserializer((Class<? extends Writable>) ctx
+ .getJobletContext().loadClass(className));
+ } catch (Exception cnfe) {
+ throw new HyracksException(cnfe);
+ }
+ recordDescriptor = new RecordDescriptor(serdes);
+ return recordDescriptor;
+ }
}
diff --git a/pregelix/pregelix-core/src/main/resources/conf/cluster.properties b/pregelix/pregelix-core/src/main/resources/conf/cluster.properties
index 2d2401a..056cce4 100644
--- a/pregelix/pregelix-core/src/main/resources/conf/cluster.properties
+++ b/pregelix/pregelix-core/src/main/resources/conf/cluster.properties
@@ -4,9 +4,6 @@
#The CC port for Hyracks cluster management
CC_CLUSTERPORT=1099
-#The directory of hyracks binaries
-HYRACKS_HOME=../../../../hyracks
-
#The tmp directory for cc to install jars
CCTMP_DIR=/tmp/t1
@@ -29,9 +26,11 @@
FRAME_SIZE=65536
#CC JAVA_OPTS
-CCJAVA_OPTS="-Xdebug -Xrunjdwp:transport=dt_socket,address=7001,server=y,suspend=n -Xmx1g -Djava.util.logging.config.file=logging.properties"
+CCJAVA_OPTS="-Xmx1g -Djava.util.logging.config.file=logging.properties"
+# debug option: CCJAVA_OPTS="-Xdebug -Xrunjdwp:transport=dt_socket,address=7001,server=y,suspend=n -Xmx1g -Djava.util.logging.config.file=logging.properties"
# Yourkit option: -agentpath:/grid/0/dev/vborkar/tools/yjp-10.0.4/bin/linux-x86-64/libyjpagent.so=port=20001"
#NC JAVA_OPTS
-NCJAVA_OPTS="-Xdebug -Xrunjdwp:transport=dt_socket,address=7002,server=y,suspend=n -Xmx1g -Djava.util.logging.config.file=logging.properties"
-
+NCJAVA_OPTS="-Xmx1g -Djava.util.logging.config.file=logging.properties"
+# debug option: NCJAVA_OPTS="-Xdebug -Xrunjdwp:transport=dt_socket,address=7002,server=y,suspend=n -Xmx1g -Djava.util.logging.config.file=logging.properties"
+# Yourkit option: -agentpath:/grid/0/dev/vborkar/tools/yjp-10.0.4/bin/linux-x86-64/libyjpagent.so=port=20001"
diff --git a/pregelix/pregelix-core/src/main/resources/hyracks-deployment.properties b/pregelix/pregelix-core/src/main/resources/hyracks-deployment.properties
deleted file mode 100644
index d5d7cd0..0000000
--- a/pregelix/pregelix-core/src/main/resources/hyracks-deployment.properties
+++ /dev/null
@@ -1 +0,0 @@
-nc.bootstrap.class=edu.uci.ics.pregelix.runtime.bootstrap.NCBootstrapImpl
\ No newline at end of file
diff --git a/pregelix/pregelix-core/src/main/resources/scripts/getip.sh b/pregelix/pregelix-core/src/main/resources/scripts/getip.sh
index a691c0f..1b44d09 100755
--- a/pregelix/pregelix-core/src/main/resources/scripts/getip.sh
+++ b/pregelix/pregelix-core/src/main/resources/scripts/getip.sh
@@ -1,3 +1,21 @@
+#!/bin/bash
+#
+#------------------------------------------------------------------------
+# Copyright 2009-2013 by The Regents of the University of California
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# you may obtain a copy of the License from
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ------------------------------------------------------------------------
+#
+
#get the OS
OS_NAME=`uname -a|awk '{print $1}'`
LINUX_OS='Linux'
diff --git a/pregelix/pregelix-core/src/main/resources/scripts/pregelix b/pregelix/pregelix-core/src/main/resources/scripts/pregelix
index b1a2f74..7232ccc 100644
--- a/pregelix/pregelix-core/src/main/resources/scripts/pregelix
+++ b/pregelix/pregelix-core/src/main/resources/scripts/pregelix
@@ -1,22 +1,20 @@
#!/bin/sh
-# ----------------------------------------------------------------------------
-# Copyright 2001-2006 The Apache Software Foundation.
#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
+#------------------------------------------------------------------------
+# Copyright 2009-2013 by The Regents of the University of California
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# you may obtain a copy of the License from
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ------------------------------------------------------------------------
#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-# ----------------------------------------------------------------------------
-#
-# Copyright (c) 2001-2006 The Apache Software Foundation. All rights
-# reserved.
# resolve links - $0 may be a softlink
diff --git a/pregelix/pregelix-core/src/main/resources/scripts/pregelixcc b/pregelix/pregelix-core/src/main/resources/scripts/pregelixcc
new file mode 100755
index 0000000..c1ee3f2
--- /dev/null
+++ b/pregelix/pregelix-core/src/main/resources/scripts/pregelixcc
@@ -0,0 +1,114 @@
+#!/bin/sh
+#
+#------------------------------------------------------------------------
+# Copyright 2009-2013 by The Regents of the University of California
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# you may obtain a copy of the License from
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ------------------------------------------------------------------------
+#
+
+# resolve links - $0 may be a softlink
+PRG="$0"
+
+while [ -h "$PRG" ]; do
+ ls=`ls -ld "$PRG"`
+ link=`expr "$ls" : '.*-> \(.*\)$'`
+ if expr "$link" : '/.*' > /dev/null; then
+ PRG="$link"
+ else
+ PRG=`dirname "$PRG"`/"$link"
+ fi
+done
+
+PRGDIR=`dirname "$PRG"`
+BASEDIR=`cd "$PRGDIR/.." >/dev/null; pwd`
+
+
+
+# OS specific support. $var _must_ be set to either true or false.
+cygwin=false;
+darwin=false;
+case "`uname`" in
+ CYGWIN*) cygwin=true ;;
+ Darwin*) darwin=true
+ if [ -z "$JAVA_VERSION" ] ; then
+ JAVA_VERSION="CurrentJDK"
+ else
+ echo "Using Java version: $JAVA_VERSION"
+ fi
+ if [ -z "$JAVA_HOME" ] ; then
+ JAVA_HOME=/System/Library/Frameworks/JavaVM.framework/Versions/${JAVA_VERSION}/Home
+ fi
+ ;;
+esac
+
+if [ -z "$JAVA_HOME" ] ; then
+ if [ -r /etc/gentoo-release ] ; then
+ JAVA_HOME=`java-config --jre-home`
+ fi
+fi
+
+# For Cygwin, ensure paths are in UNIX format before anything is touched
+if $cygwin ; then
+ [ -n "$JAVA_HOME" ] && JAVA_HOME=`cygpath --unix "$JAVA_HOME"`
+ [ -n "$CLASSPATH" ] && CLASSPATH=`cygpath --path --unix "$CLASSPATH"`
+fi
+
+# If a specific java binary isn't specified search for the standard 'java' binary
+if [ -z "$JAVACMD" ] ; then
+ if [ -n "$JAVA_HOME" ] ; then
+ if [ -x "$JAVA_HOME/jre/sh/java" ] ; then
+ # IBM's JDK on AIX uses strange locations for the executables
+ JAVACMD="$JAVA_HOME/jre/sh/java"
+ else
+ JAVACMD="$JAVA_HOME/bin/java"
+ fi
+ else
+ JAVACMD=`which java`
+ fi
+fi
+
+if [ ! -x "$JAVACMD" ] ; then
+ echo "Error: JAVA_HOME is not defined correctly." 1>&2
+ echo " We cannot execute $JAVACMD" 1>&2
+ exit 1
+fi
+
+if [ -z "$REPO" ]
+then
+ REPO="$BASEDIR"/lib
+fi
+
+CLASSPATH=$CLASSPATH_PREFIX:"$HADOOP_HOME"/conf:/etc/hadoop/conf:"$BASEDIR"/etc:$1
+
+for f in ${REPO}/*.jar; do
+ CLASSPATH=${CLASSPATH}:$f;
+done
+
+# For Cygwin, switch paths to Windows format before running java
+if $cygwin; then
+ [ -n "$CLASSPATH" ] && CLASSPATH=`cygpath --path --windows "$CLASSPATH"`
+ [ -n "$JAVA_HOME" ] && JAVA_HOME=`cygpath --path --windows "$JAVA_HOME"`
+ [ -n "$HOME" ] && HOME=`cygpath --path --windows "$HOME"`
+ [ -n "$BASEDIR" ] && BASEDIR=`cygpath --path --windows "$BASEDIR"`
+ [ -n "$REPO" ] && REPO=`cygpath --path --windows "$REPO"`
+fi
+
+exec "$JAVACMD" $JAVA_OPTS \
+ -classpath "$CLASSPATH" \
+ -Dapp.name="pregelixcc" \
+ -Dapp.pid="$$" \
+ -Dapp.repo="$REPO" \
+ -Dapp.home="$BASEDIR" \
+ -Dbasedir="$BASEDIR" \
+ edu.uci.ics.hyracks.control.cc.CCDriver \
+ "$@"
diff --git a/pregelix/pregelix-core/src/main/resources/scripts/pregelixnc b/pregelix/pregelix-core/src/main/resources/scripts/pregelixnc
new file mode 100755
index 0000000..c01b4b4
--- /dev/null
+++ b/pregelix/pregelix-core/src/main/resources/scripts/pregelixnc
@@ -0,0 +1,115 @@
+#!/bin/sh
+#
+#------------------------------------------------------------------------
+# Copyright 2009-2013 by The Regents of the University of California
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# you may obtain a copy of the License from
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ------------------------------------------------------------------------
+#
+
+# resolve links - $0 may be a softlink
+PRG="$0"
+
+while [ -h "$PRG" ]; do
+ ls=`ls -ld "$PRG"`
+ link=`expr "$ls" : '.*-> \(.*\)$'`
+ if expr "$link" : '/.*' > /dev/null; then
+ PRG="$link"
+ else
+ PRG=`dirname "$PRG"`/"$link"
+ fi
+done
+
+PRGDIR=`dirname "$PRG"`
+BASEDIR=`cd "$PRGDIR/.." >/dev/null; pwd`
+
+
+
+# OS specific support. $var _must_ be set to either true or false.
+cygwin=false;
+darwin=false;
+case "`uname`" in
+ CYGWIN*) cygwin=true ;;
+ Darwin*) darwin=true
+ if [ -z "$JAVA_VERSION" ] ; then
+ JAVA_VERSION="CurrentJDK"
+ else
+ echo "Using Java version: $JAVA_VERSION"
+ fi
+ if [ -z "$JAVA_HOME" ] ; then
+ JAVA_HOME=/System/Library/Frameworks/JavaVM.framework/Versions/${JAVA_VERSION}/Home
+ fi
+ ;;
+esac
+
+if [ -z "$JAVA_HOME" ] ; then
+ if [ -r /etc/gentoo-release ] ; then
+ JAVA_HOME=`java-config --jre-home`
+ fi
+fi
+
+# For Cygwin, ensure paths are in UNIX format before anything is touched
+if $cygwin ; then
+ [ -n "$JAVA_HOME" ] && JAVA_HOME=`cygpath --unix "$JAVA_HOME"`
+ [ -n "$CLASSPATH" ] && CLASSPATH=`cygpath --path --unix "$CLASSPATH"`
+fi
+
+# If a specific java binary isn't specified search for the standard 'java' binary
+if [ -z "$JAVACMD" ] ; then
+ if [ -n "$JAVA_HOME" ] ; then
+ if [ -x "$JAVA_HOME/jre/sh/java" ] ; then
+ # IBM's JDK on AIX uses strange locations for the executables
+ JAVACMD="$JAVA_HOME/jre/sh/java"
+ else
+ JAVACMD="$JAVA_HOME/bin/java"
+ fi
+ else
+ JAVACMD=`which java`
+ fi
+fi
+
+if [ ! -x "$JAVACMD" ] ; then
+ echo "Error: JAVA_HOME is not defined correctly." 1>&2
+ echo " We cannot execute $JAVACMD" 1>&2
+ exit 1
+fi
+
+if [ -z "$REPO" ]
+then
+ REPO="$BASEDIR"/lib
+fi
+
+CLASSPATH=$CLASSPATH_PREFIX:"$HADOOP_HOME"/conf:/etc/hadoop/conf:"$BASEDIR"/etc:$1
+
+for f in ${REPO}/*.jar; do
+ CLASSPATH=${CLASSPATH}:$f;
+done
+
+
+# For Cygwin, switch paths to Windows format before running java
+if $cygwin; then
+ [ -n "$CLASSPATH" ] && CLASSPATH=`cygpath --path --windows "$CLASSPATH"`
+ [ -n "$JAVA_HOME" ] && JAVA_HOME=`cygpath --path --windows "$JAVA_HOME"`
+ [ -n "$HOME" ] && HOME=`cygpath --path --windows "$HOME"`2
+ [ -n "$BASEDIR" ] && BASEDIR=`cygpath --path --windows "$BASEDIR"`
+ [ -n "$REPO" ] && REPO=`cygpath --path --windows "$REPO"`
+fi
+
+exec "$JAVACMD" $JAVA_OPTS \
+ -classpath "$CLASSPATH" \
+ -Dapp.name="pregelixnc" \
+ -Dapp.pid="$$" \
+ -Dapp.repo="$REPO" \
+ -Dapp.home="$BASEDIR" \
+ -Dbasedir="$BASEDIR" \
+ edu.uci.ics.hyracks.control.nc.NCDriver \
+ -app-nc-main-class edu.uci.ics.pregelix.runtime.bootstrap.NCApplicationEntryPoint "$@"
diff --git a/pregelix/pregelix-core/src/main/resources/scripts/startAllNCs.sh b/pregelix/pregelix-core/src/main/resources/scripts/startAllNCs.sh
index d30da26..821be00 100644
--- a/pregelix/pregelix-core/src/main/resources/scripts/startAllNCs.sh
+++ b/pregelix/pregelix-core/src/main/resources/scripts/startAllNCs.sh
@@ -1,3 +1,21 @@
+#!/bin/bash
+#
+#------------------------------------------------------------------------
+# Copyright 2009-2013 by The Regents of the University of California
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# you may obtain a copy of the License from
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ------------------------------------------------------------------------
+#
+
PREGELIX_PATH=`pwd`
for i in `cat conf/slaves`
diff --git a/pregelix/pregelix-core/src/main/resources/scripts/startCluster.sh b/pregelix/pregelix-core/src/main/resources/scripts/startCluster.sh
index a0c2063..cddf9a4 100644
--- a/pregelix/pregelix-core/src/main/resources/scripts/startCluster.sh
+++ b/pregelix/pregelix-core/src/main/resources/scripts/startCluster.sh
@@ -1,3 +1,21 @@
+#!/bin/bash
+#
+#------------------------------------------------------------------------
+# Copyright 2009-2013 by The Regents of the University of California
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# you may obtain a copy of the License from
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ------------------------------------------------------------------------
+#
+
bin/startcc.sh
sleep 5
bin/startAllNCs.sh
diff --git a/pregelix/pregelix-core/src/main/resources/scripts/startDebugNc.sh b/pregelix/pregelix-core/src/main/resources/scripts/startDebugNc.sh
index fe6cf27..bb43686 100755
--- a/pregelix/pregelix-core/src/main/resources/scripts/startDebugNc.sh
+++ b/pregelix/pregelix-core/src/main/resources/scripts/startDebugNc.sh
@@ -1,3 +1,21 @@
+#!/bin/bash
+#
+#------------------------------------------------------------------------
+# Copyright 2009-2013 by The Regents of the University of California
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# you may obtain a copy of the License from
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ------------------------------------------------------------------------
+#
+
hostname
#Get the IP address of the cc
@@ -40,11 +58,11 @@
#Set JAVA_OPTS
export JAVA_OPTS=$NCJAVA_OPTS2
-cd $HYRACKS_HOME
-HYRACKS_HOME=`pwd`
+PREGELIX_HOME=`pwd`
#Enter the temp dir
cd $NCTMP_DIR2
#Launch hyracks nc
-$HYRACKS_HOME/hyracks-server/target/appassembler/bin/hyracksnc -cc-host $CCHOST -cc-port $CC_CLUSTERPORT -cluster-net-ip-address $IPADDR -data-ip-address $IPADDR -node-id $NODEID -iodevices "${IO_DIRS2}" &> $NCLOGS_DIR2/$NODEID.log &
+#echo ${PREGELIX_HOME}/bin/pregelixnc -cc-host $CCHOST -cc-port $CC_CLUSTERPORT -cluster-net-ip-address $IPADDR -data-ip-address $IPADDR -result-ip-address $IPADDR -node-id $NODEID -iodevices "${IO_DIRS}"
+${PREGELIX_HOME}/bin/pregelixnc -cc-host $CCHOST -cc-port $CC_CLUSTERPORT -cluster-net-ip-address $IPADDR -data-ip-address $IPADDR -result-ip-address $IPADDR -node-id $NODEID -iodevices "${IO_DIRS2}" &> $NCLOGS_DIR2/$NODEID.log &
diff --git a/pregelix/pregelix-core/src/main/resources/scripts/startcc.sh b/pregelix/pregelix-core/src/main/resources/scripts/startcc.sh
index 133b604..282a6e7 100644
--- a/pregelix/pregelix-core/src/main/resources/scripts/startcc.sh
+++ b/pregelix/pregelix-core/src/main/resources/scripts/startcc.sh
@@ -1,4 +1,21 @@
#!/bin/bash
+#
+#------------------------------------------------------------------------
+# Copyright 2009-2013 by The Regents of the University of California
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# you may obtain a copy of the License from
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ------------------------------------------------------------------------
+#
+
hostname
#Import cluster properties
@@ -20,12 +37,15 @@
export JAVA_HOME=$JAVA_HOME
export JAVA_OPTS=$CCJAVA_OPTS
+PREGELIX_HOME=`pwd`
-chmod -R 755 $HYRACKS_HOME
+#Enter the temp dir
+cd $CCTMP_DIR
+
if [ -f "conf/topology.xml" ]; then
#Launch hyracks cc script with topology
-$HYRACKS_HOME/hyracks-server/target/appassembler/bin/hyrackscc -client-net-ip-address $CCHOST -cluster-net-ip-address $CCHOST -client-net-port $CC_CLIENTPORT -cluster-net-port $CC_CLUSTERPORT -max-heartbeat-lapse-periods 999999 -default-max-job-attempts 0 -job-history-size 0 -cluster-topology "conf/topology.xml" &> $CCLOGS_DIR/cc.log &
+${PREGELIX_HOME}/bin/pregelixcc -client-net-ip-address $CCHOST -cluster-net-ip-address $CCHOST -client-net-port $CC_CLIENTPORT -cluster-net-port $CC_CLUSTERPORT -max-heartbeat-lapse-periods 999999 -default-max-job-attempts 0 -job-history-size 0 -cluster-topology "conf/topology.xml" &> $CCLOGS_DIR/cc.log &
else
#Launch hyracks cc script without toplogy
-$HYRACKS_HOME/hyracks-server/target/appassembler/bin/hyrackscc -client-net-ip-address $CCHOST -cluster-net-ip-address $CCHOST -client-net-port $CC_CLIENTPORT -cluster-net-port $CC_CLUSTERPORT -max-heartbeat-lapse-periods 999999 -default-max-job-attempts 0 -job-history-size 0 &> $CCLOGS_DIR/cc.log &
+${PREGELIX_HOME}/bin/pregelixcc -client-net-ip-address $CCHOST -cluster-net-ip-address $CCHOST -client-net-port $CC_CLIENTPORT -cluster-net-port $CC_CLUSTERPORT -max-heartbeat-lapse-periods 999999 -default-max-job-attempts 0 -job-history-size 0 &> $CCLOGS_DIR/cc.log &
fi
diff --git a/pregelix/pregelix-core/src/main/resources/scripts/startnc.sh b/pregelix/pregelix-core/src/main/resources/scripts/startnc.sh
index b059aad..970c30b 100644
--- a/pregelix/pregelix-core/src/main/resources/scripts/startnc.sh
+++ b/pregelix/pregelix-core/src/main/resources/scripts/startnc.sh
@@ -1,3 +1,21 @@
+#!/bin/bash
+#
+#------------------------------------------------------------------------
+# Copyright 2009-2013 by The Regents of the University of California
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# you may obtain a copy of the License from
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ------------------------------------------------------------------------
+#
+
hostname
MY_NAME=`hostname`
@@ -39,11 +57,10 @@
#Set JAVA_OPTS
export JAVA_OPTS=$NCJAVA_OPTS
-cd $HYRACKS_HOME
-HYRACKS_HOME=`pwd`
+PREGELIX_HOME=`pwd`
#Enter the temp dir
cd $NCTMP_DIR
#Launch hyracks nc
-$HYRACKS_HOME/hyracks-server/target/appassembler/bin/hyracksnc -cc-host $CCHOST -cc-port $CC_CLUSTERPORT -cluster-net-ip-address $IPADDR -data-ip-address $IPADDR -result-ip-address $IPADDR -node-id $NODEID -iodevices "${IO_DIRS}" &> $NCLOGS_DIR/$NODEID.log &
+${PREGELIX_HOME}/bin/pregelixnc -cc-host $CCHOST -cc-port $CC_CLUSTERPORT -cluster-net-ip-address $IPADDR -data-ip-address $IPADDR -result-ip-address $IPADDR -node-id $NODEID -iodevices "${IO_DIRS}" &> $NCLOGS_DIR/$NODEID.log &
diff --git a/pregelix/pregelix-core/src/main/resources/scripts/stopAllNCs.sh b/pregelix/pregelix-core/src/main/resources/scripts/stopAllNCs.sh
index 12367c1..dc576d1 100644
--- a/pregelix/pregelix-core/src/main/resources/scripts/stopAllNCs.sh
+++ b/pregelix/pregelix-core/src/main/resources/scripts/stopAllNCs.sh
@@ -1,3 +1,21 @@
+#!/bin/bash
+#
+#------------------------------------------------------------------------
+# Copyright 2009-2013 by The Regents of the University of California
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# you may obtain a copy of the License from
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ------------------------------------------------------------------------
+#
+
PREGELIX_PATH=`pwd`
for i in `cat conf/slaves`
diff --git a/pregelix/pregelix-core/src/main/resources/scripts/stopCluster.sh b/pregelix/pregelix-core/src/main/resources/scripts/stopCluster.sh
index 4889934..6b829aa 100644
--- a/pregelix/pregelix-core/src/main/resources/scripts/stopCluster.sh
+++ b/pregelix/pregelix-core/src/main/resources/scripts/stopCluster.sh
@@ -1,3 +1,21 @@
+#!/bin/bash
+#
+#------------------------------------------------------------------------
+# Copyright 2009-2013 by The Regents of the University of California
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# you may obtain a copy of the License from
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ------------------------------------------------------------------------
+#
+
bin/stopAllNCs.sh
sleep 2
bin/stopcc.sh
diff --git a/pregelix/pregelix-core/src/main/resources/scripts/stopcc.sh b/pregelix/pregelix-core/src/main/resources/scripts/stopcc.sh
index c2f525a..8f94fb7 100644
--- a/pregelix/pregelix-core/src/main/resources/scripts/stopcc.sh
+++ b/pregelix/pregelix-core/src/main/resources/scripts/stopcc.sh
@@ -1,8 +1,37 @@
+#!/bin/bash
+#
+#------------------------------------------------------------------------
+# Copyright 2009-2013 by The Regents of the University of California
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# you may obtain a copy of the License from
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ------------------------------------------------------------------------
+#
+
hostname
. conf/cluster.properties
#Kill process
-PID=`ps -ef|grep ${USER}|grep java|grep hyracks|awk '{print $2}'`
+#Kill process
+PID=`ps -ef|grep ${USER}|grep java|grep 'Dapp.name=pregelixcc'|awk '{print $2}'`
+
+if [ "$PID" == "" ]; then
+ PID=`ps -ef|grep ${USER}|grep java|grep 'hyracks'|awk '{print $2}'`
+fi
+
+if [ "$PID" == "" ]; then
+ USERID=`id | sed 's/^uid=//;s/(.*$//'`
+ PID=`ps -ef|grep ${USERID}|grep java|grep 'Dapp.name=pregelixcc'|awk '{print $2}'`
+fi
+
echo $PID
kill -9 $PID
diff --git a/pregelix/pregelix-core/src/main/resources/scripts/stopnc.sh b/pregelix/pregelix-core/src/main/resources/scripts/stopnc.sh
index 35c4794..26ef089 100644
--- a/pregelix/pregelix-core/src/main/resources/scripts/stopnc.sh
+++ b/pregelix/pregelix-core/src/main/resources/scripts/stopnc.sh
@@ -1,8 +1,26 @@
+#!/bin/bash
+#
+#------------------------------------------------------------------------
+# Copyright 2009-2013 by The Regents of the University of California
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# you may obtain a copy of the License from
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ------------------------------------------------------------------------
+#
+
hostname
. conf/cluster.properties
#Kill process
-PID=`ps -ef|grep ${USER}|grep java|grep 'Dapp.name=hyracksnc'|awk '{print $2}'`
+PID=`ps -ef|grep ${USER}|grep java|grep 'Dapp.name=pregelixnc'|awk '{print $2}'`
if [ "$PID" == "" ]; then
PID=`ps -ef|grep ${USER}|grep java|grep 'hyracks'|awk '{print $2}'`
@@ -10,7 +28,7 @@
if [ "$PID" == "" ]; then
USERID=`id | sed 's/^uid=//;s/(.*$//'`
- PID=`ps -ef|grep ${USERID}|grep java|grep 'Dapp.name=hyracksnc'|awk '{print $2}'`
+ PID=`ps -ef|grep ${USERID}|grep java|grep 'Dapp.name=pregelixnc'|awk '{print $2}'`
fi
echo $PID
diff --git a/pregelix/pregelix-dataflow-std-base/src/main/java/edu/uci/ics/pregelix/dataflow/std/base/IAggregateFunctionFactory.java b/pregelix/pregelix-dataflow-std-base/src/main/java/edu/uci/ics/pregelix/dataflow/std/base/IAggregateFunctionFactory.java
index 4be0bed..dc75b07 100644
--- a/pregelix/pregelix-dataflow-std-base/src/main/java/edu/uci/ics/pregelix/dataflow/std/base/IAggregateFunctionFactory.java
+++ b/pregelix/pregelix-dataflow-std-base/src/main/java/edu/uci/ics/pregelix/dataflow/std/base/IAggregateFunctionFactory.java
@@ -16,9 +16,11 @@
import java.io.Serializable;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
public interface IAggregateFunctionFactory extends Serializable {
- public IAggregateFunction createAggregateFunction(IDataOutputProvider provider) throws HyracksException;
+ public IAggregateFunction createAggregateFunction(IHyracksTaskContext ctx,
+ IDataOutputProvider provider) throws HyracksException;
}
\ No newline at end of file
diff --git a/pregelix/pregelix-dataflow-std-base/src/main/java/edu/uci/ics/pregelix/dataflow/std/base/IRecordDescriptorFactory.java b/pregelix/pregelix-dataflow-std-base/src/main/java/edu/uci/ics/pregelix/dataflow/std/base/IRecordDescriptorFactory.java
index e7de650..7454f2d 100644
--- a/pregelix/pregelix-dataflow-std-base/src/main/java/edu/uci/ics/pregelix/dataflow/std/base/IRecordDescriptorFactory.java
+++ b/pregelix/pregelix-dataflow-std-base/src/main/java/edu/uci/ics/pregelix/dataflow/std/base/IRecordDescriptorFactory.java
@@ -16,11 +16,12 @@
import java.io.Serializable;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
public interface IRecordDescriptorFactory extends Serializable {
- public RecordDescriptor createRecordDescriptor() throws HyracksDataException;
+ public RecordDescriptor createRecordDescriptor(IHyracksTaskContext ctx) throws HyracksDataException;
}
diff --git a/pregelix/pregelix-dataflow-std-base/src/main/java/edu/uci/ics/pregelix/dataflow/std/base/ISerializerDeserializerFactory.java b/pregelix/pregelix-dataflow-std-base/src/main/java/edu/uci/ics/pregelix/dataflow/std/base/ISerializerDeserializerFactory.java
new file mode 100644
index 0000000..16b067a
--- /dev/null
+++ b/pregelix/pregelix-dataflow-std-base/src/main/java/edu/uci/ics/pregelix/dataflow/std/base/ISerializerDeserializerFactory.java
@@ -0,0 +1,11 @@
+package edu.uci.ics.pregelix.dataflow.std.base;
+
+import java.io.Serializable;
+
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+
+public interface ISerializerDeserializerFactory<T> extends Serializable {
+
+ public ISerializerDeserializer<T> getSerializerDeserializer();
+
+}
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/FunctionCallOperatorDescriptor.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/FunctionCallOperatorDescriptor.java
index 4cbd6c4..24d28b0 100644
--- a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/FunctionCallOperatorDescriptor.java
+++ b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/FunctionCallOperatorDescriptor.java
@@ -66,7 +66,7 @@
@Override
public void open() throws HyracksDataException {
rd0 = inputRdFactory == null ? recordDescProvider.getInputRecordDescriptor(getActivityId(), 0)
- : inputRdFactory.createRecordDescriptor();
+ : inputRdFactory.createRecordDescriptor(ctx);
frameDeserializer = new FrameDeserializer(ctx.getFrameSize(), rd0);
ctxCL = Thread.currentThread().getContextClassLoader();
Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/FunctionProxy.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/FunctionProxy.java
index 99bca1a..f75dab2 100644
--- a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/FunctionProxy.java
+++ b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/util/FunctionProxy.java
@@ -56,7 +56,7 @@
* @throws HyracksDataException
*/
public void functionOpen() throws HyracksDataException {
- inputRd = inputRdFactory.createRecordDescriptor();
+ inputRd = inputRdFactory.createRecordDescriptor(ctx);
tupleDe = new TupleDeserializer(inputRd);
ctxCL = Thread.currentThread().getContextClassLoader();
Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/FinalAggregateOperatorDescriptor.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/FinalAggregateOperatorDescriptor.java
index eda7754..71592ee 100644
--- a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/FinalAggregateOperatorDescriptor.java
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/FinalAggregateOperatorDescriptor.java
@@ -56,11 +56,11 @@
public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
return new AbstractUnaryInputSinkOperatorNodePushable() {
- private Configuration conf = confFactory.createConfiguration();
+ private Configuration conf = confFactory.createConfiguration(ctx);
@SuppressWarnings("rawtypes")
private GlobalAggregator aggregator = BspUtils.createGlobalAggregator(conf);
private FrameTupleAccessor accessor = new FrameTupleAccessor(ctx.getFrameSize(),
- inputRdFactory.createRecordDescriptor());
+ inputRdFactory.createRecordDescriptor(ctx));
private ByteBufferInputStream inputStream = new ByteBufferInputStream();
private DataInput input = new DataInputStream(inputStream);
private Writable partialAggregateValue = BspUtils.createFinalAggregateValue(conf);
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/HDFSFileWriteOperatorDescriptor.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/HDFSFileWriteOperatorDescriptor.java
index 2402748..1d9c778 100644
--- a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/HDFSFileWriteOperatorDescriptor.java
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/HDFSFileWriteOperatorDescriptor.java
@@ -75,14 +75,15 @@
@Override
public void open() throws HyracksDataException {
rd0 = inputRdFactory == null ? recordDescProvider.getInputRecordDescriptor(getActivityId(), 0)
- : inputRdFactory.createRecordDescriptor();
+ : inputRdFactory.createRecordDescriptor(ctx);
frameDeserializer = new FrameDeserializer(ctx.getFrameSize(), rd0);
ctxCL = Thread.currentThread().getContextClassLoader();
Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
- conf = confFactory.createConfiguration();
+ conf = confFactory.createConfiguration(ctx);
VertexOutputFormat outputFormat = BspUtils.createVertexOutputFormat(conf);
context = ctxFactory.createContext(conf, partition);
+ context.getConfiguration().setClassLoader(ctx.getJobletContext().getClassLoader());
try {
vertexWriter = outputFormat.createVertexWriter(context);
} catch (InterruptedException e) {
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/MaterializingReadOperatorDescriptor.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/MaterializingReadOperatorDescriptor.java
index b1bb555..9a87f02 100644
--- a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/MaterializingReadOperatorDescriptor.java
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/MaterializingReadOperatorDescriptor.java
@@ -66,14 +66,7 @@
} catch (Exception e) {
writer.fail();
throw new HyracksDataException(e);
- } finally {
- /**
- * remove last iteration's state
- */
- IterationUtils.removeIterationState(ctx, partition);
- writer.close();
}
- complete = true;
}
}
@@ -84,7 +77,12 @@
@Override
public void close() throws HyracksDataException {
-
+ /**
+ * remove last iteration's state
+ */
+ IterationUtils.removeIterationState(ctx, partition);
+ writer.close();
+ complete = true;
}
};
}
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/TerminationStateWriterOperatorDescriptor.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/TerminationStateWriterOperatorDescriptor.java
index 88a0dda..f54d176 100644
--- a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/TerminationStateWriterOperatorDescriptor.java
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/TerminationStateWriterOperatorDescriptor.java
@@ -45,10 +45,10 @@
}
@Override
- public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
+ public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
return new AbstractUnaryInputSinkOperatorNodePushable() {
- private Configuration conf = confFactory.createConfiguration();
+ private Configuration conf = confFactory.createConfiguration(ctx);
private boolean terminate = true;
@Override
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/VertexFileScanOperatorDescriptor.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/VertexFileScanOperatorDescriptor.java
index 0da7baf..343ba7e 100644
--- a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/VertexFileScanOperatorDescriptor.java
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/VertexFileScanOperatorDescriptor.java
@@ -81,15 +81,12 @@
final List<FileSplit> splits = splitsFactory.getSplits();
return new AbstractUnaryOutputSourceOperatorNodePushable() {
- private ClassLoader ctxCL;
private ContextFactory ctxFactory = new ContextFactory();
@Override
public void initialize() throws HyracksDataException {
- ctxCL = Thread.currentThread().getContextClassLoader();
try {
- Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
- Configuration conf = confFactory.createConfiguration();
+ Configuration conf = confFactory.createConfiguration(ctx);
writer.open();
for (int i = 0; i < scheduledLocations.length; i++) {
if (scheduledLocations[i].equals(ctx.getJobletContext().getApplicationContext().getNodeId())) {
@@ -109,8 +106,6 @@
writer.close();
} catch (Exception e) {
throw new HyracksDataException(e);
- } finally {
- Thread.currentThread().setContextClassLoader(ctxCL);
}
}
@@ -135,10 +130,11 @@
VertexInputFormat vertexInputFormat = BspUtils.createVertexInputFormat(conf);
InputSplit split = splits.get(splitId);
TaskAttemptContext mapperContext = ctxFactory.createContext(conf, splitId);
+ mapperContext.getConfiguration().setClassLoader(ctx.getJobletContext().getClassLoader());
VertexReader vertexReader = vertexInputFormat.createVertexReader(split, mapperContext);
vertexReader.initialize(split, mapperContext);
- Vertex readerVertex = (Vertex) BspUtils.createVertex(conf);
+ Vertex readerVertex = (Vertex) BspUtils.createVertex(mapperContext.getConfiguration());
ArrayTupleBuilder tb = new ArrayTupleBuilder(fieldSize);
DataOutput dos = tb.getDataOutput();
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/VertexWriteOperatorDescriptor.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/VertexWriteOperatorDescriptor.java
index d7cbb3a..5da0239 100644
--- a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/VertexWriteOperatorDescriptor.java
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/VertexWriteOperatorDescriptor.java
@@ -64,7 +64,7 @@
@Override
public void open() throws HyracksDataException {
rd0 = inputRdFactory == null ? recordDescProvider.getInputRecordDescriptor(getActivityId(), 0)
- : inputRdFactory.createRecordDescriptor();
+ : inputRdFactory.createRecordDescriptor(ctx);
frameDeserializer = new FrameDeserializer(ctx.getFrameSize(), rd0);
try {
outputWriter = new PrintWriter(new OutputStreamWriter(new FileOutputStream(splits[partition]
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/base/IConfigurationFactory.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/base/IConfigurationFactory.java
index b31f376..2e58196 100644
--- a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/base/IConfigurationFactory.java
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/base/IConfigurationFactory.java
@@ -18,10 +18,11 @@
import org.apache.hadoop.conf.Configuration;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
public interface IConfigurationFactory extends Serializable {
- public Configuration createConfiguration() throws HyracksDataException;
+ public Configuration createConfiguration(IHyracksTaskContext ctx) throws HyracksDataException;
}
diff --git a/pregelix/pregelix-example/data/clique2/clique.txt b/pregelix/pregelix-example/data/clique2/clique.txt
new file mode 100755
index 0000000..68fbaea
--- /dev/null
+++ b/pregelix/pregelix-example/data/clique2/clique.txt
@@ -0,0 +1,6 @@
+1 2 3 4
+2 1 3
+3 1 2 4 5
+4 1 3
+5 3 6
+6 5
\ No newline at end of file
diff --git a/pregelix/pregelix-example/data/clique3/clique.txt b/pregelix/pregelix-example/data/clique3/clique.txt
new file mode 100755
index 0000000..ed0b7c1
--- /dev/null
+++ b/pregelix/pregelix-example/data/clique3/clique.txt
@@ -0,0 +1,20 @@
+0 1 19
+1 2 3 4 5 6 7 8 9
+2 1 3 4 5 6 7 8 9
+3 1 2 4 5 6 7 8 9
+4 1 2 3 5 6 7 8 9
+5 1 2 3 4 6 7 8 9
+6 1 2 3 4 5 7 8 9
+7 1 2 3 4 5 6 8 9
+8 1 2 3 4 5 6 7 9
+9 1 2 3 4 5 6 7 8 10
+10 9 11
+11 10 12 13 14 15 16 17 18 19
+12 11 13 14 15 16 17 18 19
+13 11 12 14 15 16 17 18 19
+14 11 12 13 15 16 17 18 19
+15 11 12 13 14 16 17 18 19
+16 11 12 13 14 15 17 18 19
+17 11 12 13 14 15 16 18 19
+18 11 12 13 14 15 16 17 19
+19 0 11 12 13 14 15 16 17 18
\ No newline at end of file
diff --git a/pregelix/pregelix-example/pom.xml b/pregelix/pregelix-example/pom.xml
index 37da84f..7ca7e5d 100644
--- a/pregelix/pregelix-example/pom.xml
+++ b/pregelix/pregelix-example/pom.xml
@@ -69,7 +69,7 @@
<version>2.7.2</version>
<configuration>
<forkMode>pertest</forkMode>
- <argLine>-enableassertions -Xmx512m -XX:MaxPermSize=300m -Dfile.encoding=UTF-8
+ <argLine>-enableassertions -Xmx2047m -XX:MaxPermSize=300m -Dfile.encoding=UTF-8
-Djava.util.logging.config.file=src/test/resources/logging.properties</argLine>
<includes>
<include>**/*TestSuite.java</include>
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/inputformat/TextPageRankInputFormat.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/inputformat/TextPageRankInputFormat.java
index a8a752e..8d0b776 100644
--- a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/inputformat/TextPageRankInputFormat.java
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/inputformat/TextPageRankInputFormat.java
@@ -39,7 +39,7 @@
@Override
public VertexReader<VLongWritable, DoubleWritable, FloatWritable, DoubleWritable> createVertexReader(
InputSplit split, TaskAttemptContext context) throws IOException {
- return new TextPageRankGraphReader(textInputFormat.createRecordReader(split, context));
+ return new TextPageRankGraphReader(textInputFormat.createRecordReader(split, context), context);
}
}
@@ -52,7 +52,7 @@
private List<VLongWritable> pool = new ArrayList<VLongWritable>();
private int used = 0;
- public TextPageRankGraphReader(RecordReader<LongWritable, Text> lineRecordReader) {
+ public TextPageRankGraphReader(RecordReader<LongWritable, Text> lineRecordReader, TaskAttemptContext context) {
super(lineRecordReader);
}
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/maximalclique/CliquesWritable.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/maximalclique/CliquesWritable.java
index 0e22ea1..d2c5e6f 100644
--- a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/maximalclique/CliquesWritable.java
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/maximalclique/CliquesWritable.java
@@ -32,6 +32,7 @@
private List<VLongWritable> cliques = new ArrayList<VLongWritable>();
private int sizeOfClique = 0;
+ private VLongWritable srcId = new VLongWritable(0);
public CliquesWritable(List<VLongWritable> cliques, int sizeOfClique) {
this.cliques = cliques;
@@ -43,6 +44,16 @@
}
/**
+ * Set the srcId
+ *
+ * @param srcId
+ * the source vertex Id
+ */
+ public void setSrcId(VLongWritable srcId) {
+ this.srcId = srcId;
+ }
+
+ /**
* Set the size of cliques.
*
* @param sizeOfClique
@@ -103,6 +114,11 @@
cliques.add(vid);
}
}
+
+ if (srcId == null) {
+ srcId = new VLongWritable();
+ }
+ srcId.readFields(input);
}
@Override
@@ -117,6 +133,8 @@
for (int i = 0; i < cliques.size(); i++) {
cliques.get(i).write(output);
}
+
+ srcId.write(output);
}
@Override
@@ -126,11 +144,14 @@
StringBuffer sb = new StringBuffer();
int numCliques = cliques.size() / sizeOfClique;
for (int i = 0; i < numCliques; i++) {
+ sb.append(srcId);
+ sb.append(",");
+ int start = i * sizeOfClique;
for (int j = 0; j < sizeOfClique - 1; j++) {
- sb.append(cliques.get(j));
+ sb.append(cliques.get(start + j));
sb.append(",");
}
- sb.append(cliques.get(sizeOfClique - 1));
+ sb.append(cliques.get(start + sizeOfClique - 1));
sb.append(";");
}
return sb.toString();
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/maximalclique/MaximalCliqueVertex.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/maximalclique/MaximalCliqueVertex.java
index 266feb7..85a139e 100644
--- a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/maximalclique/MaximalCliqueVertex.java
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/maximalclique/MaximalCliqueVertex.java
@@ -44,8 +44,8 @@
/**
* The maximal clique example -- find maximal cliques in an undirected graph.
- * The result cliques contains vertexes ordered by the vertex id ascendingly. The algorithm takes
- * advantage of that property to do effective pruning.
+ * The result cliques contains vertexes ordered by the vertex id ascendingly.
+ * The algorithm takes advantage of that property to do effective pruning.
*/
public class MaximalCliqueVertex extends Vertex<VLongWritable, CliquesWritable, NullWritable, AdjacencyListWritable> {
@@ -55,7 +55,7 @@
private int largestCliqueSizeSoFar = 0;
private List<BitSet> currentMaximalCliques = new ArrayList<BitSet>();
private CliquesWritable tmpValue = new CliquesWritable();
- private List<VLongWritable> cliques = new ArrayList<VLongWritable>();
+ private List<VLongWritable> cliqueList = new ArrayList<VLongWritable>();
/**
* Update the current maximal cliques
@@ -68,7 +68,6 @@
vertexList.clear();
invertedMap.clear();
currentMaximalCliques.clear();
- cliques.clear();
tmpValue.reset();
// build the initial sub graph
@@ -76,8 +75,6 @@
AdjacencyListWritable adj = values.next();
map.put(adj.getSource(), adj);
}
- VLongWritable srcId = getVertexId();
- map.put(srcId, new AdjacencyListWritable());
// build the vertex list (vertex id in ascending order) and the inverted list of vertexes
int i = 0;
@@ -86,12 +83,14 @@
invertedMap.put(v, i++);
}
- //clean up adjacency list --- remove vertexes who are not neighbors of key
+ // clean up adjacency list --- remove vertexes who are not neighbors of
+ // key
for (AdjacencyListWritable adj : map.values()) {
adj.cleanNonMatch(vertexList);
}
- // get the h-index of the subgraph --- which is the maximum depth to explore
+ // get the h-index of the subgraph --- which is the maximum depth to
+ // explore
int[] neighborCounts = new int[map.size()];
i = 0;
for (AdjacencyListWritable adj : map.values()) {
@@ -105,11 +104,13 @@
}
h++;
}
- if (h < largestCliqueSizeSoFar) {
+
+ // the clique size is upper-bounded by h+1
+ if (h + 1 < largestCliqueSizeSoFar) {
return;
}
- //start depth-first search
+ // start depth-first search
BitSet cliqueSoFar = new BitSet(h);
for (VLongWritable v : vertexList) {
cliqueSoFar.set(invertedMap.get(v));
@@ -117,16 +118,15 @@
cliqueSoFar.clear();
}
- //output local maximal cliques
+ // output local maximal cliques
+ tmpValue.setSrcId(getVertexId());
for (BitSet clique : currentMaximalCliques) {
- int keyIndex = invertedMap.get(srcId);
- clique.set(keyIndex);
generateClique(clique);
- tmpValue.addCliques(cliques);
+ tmpValue.addCliques(cliqueList);
tmpValue.setCliqueSize(clique.cardinality());
}
- //update the vertex state
+ // update the vertex state
setVertexValue(tmpValue);
}
@@ -136,13 +136,15 @@
* @param clique
* the bitmap representation of a clique
*/
- private void generateClique(BitSet clique) {
+ private List<VLongWritable> generateClique(BitSet clique) {
+ cliqueList.clear();
for (int j = 0; j < clique.length();) {
j = clique.nextSetBit(j);
VLongWritable v = vertexList.get(j);
- cliques.add(v);
+ cliqueList.add(v);
j++;
}
+ return cliqueList;
}
/**
@@ -170,7 +172,7 @@
while (neighbors.hasNext()) {
VLongWritable neighbor = neighbors.next();
if (!isTested(neighbor, cliqueSoFar) && isClique(neighbor, cliqueSoFar)) {
- //snapshot the clique
+ // snapshot the clique
int cliqueLength = cliqueSoFar.length();
// expand the clique
cliqueSoFar.set(invertedMap.get(neighbor));
@@ -217,7 +219,8 @@
int largestSetIndex = cliqueSoFar.length() - 1;
if (index > largestSetIndex) {
// we only return cliques with vertexes in the ascending order
- // hence, the new vertex must be larger than the largesetSetIndex in the clique
+ // hence, the new vertex must be larger than the largesetSetIndex in
+ // the clique
return false;
} else {
// otherwise, we think the vertex is "tested"
@@ -236,7 +239,8 @@
*/
private boolean isClique(VLongWritable newVertex, BitSet cliqueSoFar) {
AdjacencyListWritable adj = map.get(newVertex);
- // check whether each existing vertex is in the neighbor set of newVertex
+ // check whether each existing vertex is in the neighbor set of
+ // newVertex
for (int i = 0; i < cliqueSoFar.length();) {
i = cliqueSoFar.nextSetBit(i);
VLongWritable v = vertexList.get(i);
@@ -249,9 +253,8 @@
}
/**
- * For superstep 1, send outgoing mesages.
- * For superstep 2, calculate maximal cliques.
- * otherwise, vote to halt.
+ * For superstep 1, send outgoing mesages. For superstep 2, calculate
+ * maximal cliques. otherwise, vote to halt.
*/
@Override
public void compute(Iterator<AdjacencyListWritable> msgIterator) {
@@ -300,9 +303,11 @@
private void sendOutgoingMsgs(List<Edge<VLongWritable, NullWritable>> edges) {
for (int i = 0; i < edges.size(); i++) {
if (edges.get(i).getDestVertexId().get() < getVertexId().get()) {
- // only add emit for the vertexes whose id is smaller than the vertex id
+ // only add emit for the vertexes whose id is smaller than the
+ // vertex id
// to avoid the duplicate removal step,
- // because all the resulting cliques will have vertexes in the ascending order.
+ // because all the resulting cliques will have vertexes in the
+ // ascending order.
AdjacencyListWritable msg = new AdjacencyListWritable();
msg.setSource(getVertexId());
for (int j = i + 1; j < edges.size(); j++) {
diff --git a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobgen/JobGenerator.java b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobgen/JobGenerator.java
index ca5a1c4..0a5b214 100644
--- a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobgen/JobGenerator.java
+++ b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobgen/JobGenerator.java
@@ -55,6 +55,8 @@
private static String HDFS_OUTPUTPAH2 = "/resultcomplex";
private static String HDFS_INPUTPATH3 = "/clique";
+ private static String HDFS_INPUTPATH4 = "/clique2";
+ private static String HDFS_INPUTPATH5 = "/clique3";
private static String HDFS_OUTPUTPAH3 = "/resultclique";
private static void generatePageRankJobReal(String jobName, String outputPath) throws IOException {
@@ -218,6 +220,30 @@
FileOutputFormat.setOutputPath(job, new Path(HDFS_OUTPUTPAH3));
job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
}
+
+ private static void generateMaximalCliqueJob2(String jobName, String outputPath) throws IOException {
+ PregelixJob job = new PregelixJob(jobName);
+ job.setVertexClass(MaximalCliqueVertex.class);
+ job.setGlobalAggregatorClass(MaximalCliqueAggregator.class);
+ job.setDynamicVertexValueSize(true);
+ job.setVertexInputFormatClass(TextMaximalCliqueInputFormat.class);
+ job.setVertexOutputFormatClass(MaximalCliqueVertexOutputFormat.class);
+ FileInputFormat.setInputPaths(job, HDFS_INPUTPATH4);
+ FileOutputFormat.setOutputPath(job, new Path(HDFS_OUTPUTPAH3));
+ job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
+ }
+
+ private static void generateMaximalCliqueJob3(String jobName, String outputPath) throws IOException {
+ PregelixJob job = new PregelixJob(jobName);
+ job.setVertexClass(MaximalCliqueVertex.class);
+ job.setGlobalAggregatorClass(MaximalCliqueAggregator.class);
+ job.setDynamicVertexValueSize(true);
+ job.setVertexInputFormatClass(TextMaximalCliqueInputFormat.class);
+ job.setVertexOutputFormatClass(MaximalCliqueVertexOutputFormat.class);
+ FileInputFormat.setInputPaths(job, HDFS_INPUTPATH5);
+ FileOutputFormat.setOutputPath(job, new Path(HDFS_OUTPUTPAH3));
+ job.getConfiguration().writeXml(new FileOutputStream(new File(outputPath)));
+ }
private static void generateGraphMutationJob(String jobName, String outputPath) throws IOException {
PregelixJob job = new PregelixJob(jobName);
@@ -261,6 +287,8 @@
private static void genMaximalClique() throws IOException {
generateMaximalCliqueJob("Maximal Clique", outputBase + "MaximalClique.xml");
+ generateMaximalCliqueJob2("Maximal Clique 2", outputBase + "MaximalClique2.xml");
+ generateMaximalCliqueJob3("Maximal Clique 3", outputBase + "MaximalClique3.xml");
}
private static void genGraphMutation() throws IOException {
diff --git a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobrun/RunJobTestCase.java b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobrun/RunJobTestCase.java
index 5a556fa..00f6f54 100644
--- a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobrun/RunJobTestCase.java
+++ b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobrun/RunJobTestCase.java
@@ -25,20 +25,13 @@
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.junit.Test;
-import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.pregelix.api.job.PregelixJob;
-import edu.uci.ics.pregelix.core.jobgen.JobGen;
-import edu.uci.ics.pregelix.core.jobgen.JobGenInnerJoin;
-import edu.uci.ics.pregelix.core.jobgen.JobGenOuterJoin;
-import edu.uci.ics.pregelix.core.jobgen.JobGenOuterJoinSingleSort;
-import edu.uci.ics.pregelix.core.jobgen.JobGenOuterJoinSort;
+import edu.uci.ics.pregelix.core.base.IDriver.Plan;
+import edu.uci.ics.pregelix.core.driver.Driver;
import edu.uci.ics.pregelix.core.util.PregelixHyracksIntegrationUtil;
-import edu.uci.ics.pregelix.dataflow.util.IterationUtils;
import edu.uci.ics.pregelix.example.util.TestUtils;
public class RunJobTestCase extends TestCase {
- private static final String NC1 = "nc1";
- private static final String HYRACKS_APP_NAME = "pregelix";
private static String HDFS_INPUTPATH = "/webmap";
private static String HDFS_OUTPUTPAH = "/result";
@@ -48,14 +41,21 @@
private static String HDFS_INPUTPATH3 = "/clique";
private static String HDFS_OUTPUTPAH3 = "/resultclique";
- private final PregelixJob job;
- private JobGen[] giraphJobGens;
- private final String resultFileName;
- private final String expectedFileName;
- private final String jobFile;
+ private static String HDFS_INPUTPATH4 = "/clique2";
+ private static String HDFS_OUTPUTPAH4 = "/resultclique";
- public RunJobTestCase(String hadoopConfPath, String jobName, String jobFile, String resultFile, String expectedFile)
- throws Exception {
+ private static String HDFS_INPUTPATH5 = "/clique3";
+ private static String HDFS_OUTPUTPAH5 = "/resultclique";
+
+ private final PregelixJob job;
+ private final String resultFileDir;
+ private final String expectedFileDir;
+ private final String jobFile;
+ private final Driver driver = new Driver(this.getClass());
+ private final FileSystem dfs;
+
+ public RunJobTestCase(String hadoopConfPath, String jobName, String jobFile, String resultFile,
+ String expectedFile, FileSystem dfs) throws Exception {
super("test");
this.jobFile = jobFile;
this.job = new PregelixJob("test");
@@ -68,21 +68,20 @@
} else if (inputPaths[0].toString().endsWith(HDFS_INPUTPATH2)) {
FileInputFormat.setInputPaths(job, HDFS_INPUTPATH2);
FileOutputFormat.setOutputPath(job, new Path(HDFS_OUTPUTPAH2));
- } else {
+ } else if (inputPaths[0].toString().endsWith(HDFS_INPUTPATH3)) {
FileInputFormat.setInputPaths(job, HDFS_INPUTPATH3);
FileOutputFormat.setOutputPath(job, new Path(HDFS_OUTPUTPAH3));
+ } else if (inputPaths[0].toString().endsWith(HDFS_INPUTPATH4)) {
+ FileInputFormat.setInputPaths(job, HDFS_INPUTPATH4);
+ FileOutputFormat.setOutputPath(job, new Path(HDFS_OUTPUTPAH4));
+ } else if (inputPaths[0].toString().endsWith(HDFS_INPUTPATH5)) {
+ FileInputFormat.setInputPaths(job, HDFS_INPUTPATH5);
+ FileOutputFormat.setOutputPath(job, new Path(HDFS_OUTPUTPAH5));
}
job.setJobName(jobName);
- this.resultFileName = resultFile;
- this.expectedFileName = expectedFile;
- giraphJobGens = new JobGen[4];
- giraphJobGens[0] = new JobGenOuterJoin(job);
- waitawhile();
- giraphJobGens[1] = new JobGenInnerJoin(job);
- waitawhile();
- giraphJobGens[2] = new JobGenOuterJoinSort(job);
- waitawhile();
- giraphJobGens[3] = new JobGenOuterJoinSingleSort(job);
+ this.resultFileDir = resultFile;
+ this.expectedFileDir = expectedFile;
+ this.dfs = dfs;
}
private void waitawhile() throws InterruptedException {
@@ -94,89 +93,19 @@
@Test
public void test() throws Exception {
setUp();
- for (JobGen jobGen : giraphJobGens) {
- FileSystem dfs = FileSystem.get(job.getConfiguration());
- dfs.delete(new Path(HDFS_OUTPUTPAH), true);
- runCreate(jobGen);
- runDataLoad(jobGen);
- int i = 1;
- boolean terminate = false;
- do {
- runLoopBodyIteration(jobGen, i);
- terminate = IterationUtils.readTerminationState(job.getConfiguration(), jobGen.getJobId());
- i++;
- } while (!terminate);
- runIndexScan(jobGen);
- runHDFSWRite(jobGen);
- runCleanup(jobGen);
- compareResults();
+ Plan[] plans = new Plan[] { Plan.OUTER_JOIN };
+ for (Plan plan : plans) {
+ driver.runJob(job, plan, PregelixHyracksIntegrationUtil.CC_HOST,
+ PregelixHyracksIntegrationUtil.TEST_HYRACKS_CC_CLIENT_PORT, false);
}
+ compareResults();
tearDown();
waitawhile();
}
- private void runCreate(JobGen jobGen) throws Exception {
- try {
- JobSpecification treeCreateJobSpec = jobGen.generateCreatingJob();
- PregelixHyracksIntegrationUtil.runJob(treeCreateJobSpec, HYRACKS_APP_NAME);
- } catch (Exception e) {
- throw e;
- }
- }
-
- private void runDataLoad(JobGen jobGen) throws Exception {
- try {
- JobSpecification bulkLoadJobSpec = jobGen.generateLoadingJob();
- PregelixHyracksIntegrationUtil.runJob(bulkLoadJobSpec, HYRACKS_APP_NAME);
- } catch (Exception e) {
- throw e;
- }
- }
-
- private void runLoopBodyIteration(JobGen jobGen, int iteration) throws Exception {
- try {
- JobSpecification loopBody = jobGen.generateJob(iteration);
- PregelixHyracksIntegrationUtil.runJob(loopBody, HYRACKS_APP_NAME);
- } catch (Exception e) {
- throw e;
- }
- }
-
- private void runIndexScan(JobGen jobGen) throws Exception {
- try {
- JobSpecification scanSortPrintJobSpec = jobGen.scanIndexPrintGraph(NC1, resultFileName);
- PregelixHyracksIntegrationUtil.runJob(scanSortPrintJobSpec, HYRACKS_APP_NAME);
- } catch (Exception e) {
- throw e;
- }
- }
-
- private void runHDFSWRite(JobGen jobGen) throws Exception {
- try {
- JobSpecification scanSortPrintJobSpec = jobGen.scanIndexWriteGraph();
- PregelixHyracksIntegrationUtil.runJob(scanSortPrintJobSpec, HYRACKS_APP_NAME);
- } catch (Exception e) {
- throw e;
- }
- }
-
- private void runCleanup(JobGen jobGen) throws Exception {
- try {
- JobSpecification[] cleanups = jobGen.generateCleanup();
- runJobArray(cleanups);
- } catch (Exception e) {
- throw e;
- }
- }
-
- private void runJobArray(JobSpecification[] jobs) throws Exception {
- for (JobSpecification job : jobs) {
- PregelixHyracksIntegrationUtil.runJob(job, HYRACKS_APP_NAME);
- }
- }
-
private void compareResults() throws Exception {
- TestUtils.compareWithResult(new File(resultFileName), new File(expectedFileName));
+ dfs.copyToLocalFile(FileOutputFormat.getOutputPath(job), new Path(resultFileDir));
+ TestUtils.compareWithResultDir(new File(expectedFileDir), new File(resultFileDir));
}
public String toString() {
diff --git a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobrun/RunJobTestSuite.java b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobrun/RunJobTestSuite.java
index 4bf83e6..87bc40d 100644
--- a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobrun/RunJobTestSuite.java
+++ b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobrun/RunJobTestSuite.java
@@ -62,7 +62,12 @@
private static final String DATA_PATH3 = "data/clique/clique.txt";
private static final String HDFS_PATH3 = "/clique/";
- private static final String HYRACKS_APP_NAME = "pregelix";
+ private static final String DATA_PATH4 = "data/clique2/clique.txt";
+ private static final String HDFS_PATH4 = "/clique2/";
+
+ private static final String DATA_PATH5 = "data/clique3/clique.txt";
+ private static final String HDFS_PATH5 = "/clique3/";
+
private static final String HADOOP_CONF_PATH = ACTUAL_RESULT_DIR + File.separator + "conf.xml";
private MiniDFSCluster dfsCluster;
@@ -111,6 +116,16 @@
dfs.mkdirs(dest);
dfs.copyFromLocalFile(src, dest);
+ src = new Path(DATA_PATH4);
+ dest = new Path(HDFS_PATH4);
+ dfs.mkdirs(dest);
+ dfs.copyFromLocalFile(src, dest);
+
+ src = new Path(DATA_PATH5);
+ dest = new Path(HDFS_PATH5);
+ dfs.mkdirs(dest);
+ dfs.copyFromLocalFile(src, dest);
+
DataOutputStream confOutput = new DataOutputStream(new FileOutputStream(new File(HADOOP_CONF_PATH)));
conf.writeXml(confOutput);
confOutput.flush();
@@ -138,6 +153,7 @@
RunJobTestSuite testSuite = new RunJobTestSuite();
testSuite.setUp();
boolean onlyEnabled = false;
+ FileSystem dfs = FileSystem.get(testSuite.conf);
if (onlys.size() > 0) {
onlyEnabled = true;
@@ -153,7 +169,7 @@
String resultFileName = ACTUAL_RESULT_DIR + File.separator + jobExtToResExt(qFile.getName());
String expectedFileName = EXPECTED_RESULT_DIR + File.separator + jobExtToResExt(qFile.getName());
testSuite.addTest(new RunJobTestCase(HADOOP_CONF_PATH, qFile.getName(), qFile.getAbsolutePath()
- .toString(), resultFileName, expectedFileName));
+ .toString(), resultFileName, expectedFileName, dfs));
}
}
}
@@ -193,7 +209,7 @@
private static String jobExtToResExt(String fname) {
int dot = fname.lastIndexOf('.');
- return fname.substring(0, dot + 1) + FILE_EXTENSION_OF_RESULTS;
+ return fname.substring(0, dot);
}
private static boolean isInList(List<String> onlys, String name) {
diff --git a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/util/TestUtils.java b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/util/TestUtils.java
index d89ec46..d406125 100644
--- a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/util/TestUtils.java
+++ b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/util/TestUtils.java
@@ -20,6 +20,13 @@
public class TestUtils {
+ public static void compareWithResultDir(File expectedFileDir, File actualFileDir) throws Exception {
+ String[] fileNames = expectedFileDir.list();
+ for (String fileName : fileNames) {
+ compareWithResult(new File(expectedFileDir, fileName), new File(actualFileDir, fileName));
+ }
+ }
+
public static void compareWithResult(File expectedFile, File actualFile) throws Exception {
BufferedReader readerExpected = new BufferedReader(new FileReader(expectedFile));
BufferedReader readerActual = new BufferedReader(new FileReader(actualFile));
@@ -28,7 +35,6 @@
try {
while ((lineExpected = readerExpected.readLine()) != null) {
lineActual = readerActual.readLine();
- // Assert.assertEquals(lineExpected, lineActual);
if (lineActual == null) {
throw new Exception("Actual result changed at line " + num + ":\n< " + lineExpected + "\n> ");
}
@@ -62,8 +68,10 @@
if (row1.equals(row2))
continue;
- String[] fields1 = row1.split(" ");
- String[] fields2 = row2.split(" ");
+ boolean spaceOrTab = false;
+ spaceOrTab = row1.contains(" ");
+ String[] fields1 = spaceOrTab ? row1.split(" ") : row1.split("\t");
+ String[] fields2 = spaceOrTab ? row2.split(" ") : row2.split("\t");
for (int j = 0; j < fields1.length; j++) {
if (fields1[j].equals(fields2[j])) {
@@ -76,7 +84,7 @@
float float1 = (float) double1.doubleValue();
float float2 = (float) double2.doubleValue();
- if (Math.abs(float1 - float2) == 0)
+ if (Math.abs(float1 - float2) < 1.0e-7)
continue;
else {
return false;
diff --git a/pregelix/pregelix-example/src/test/resources/cluster/stores.properties b/pregelix/pregelix-example/src/test/resources/cluster/stores.properties
index daf881e..04732be 100644
--- a/pregelix/pregelix-example/src/test/resources/cluster/stores.properties
+++ b/pregelix/pregelix-example/src/test/resources/cluster/stores.properties
@@ -1 +1 @@
-store=teststore
\ No newline at end of file
+store=teststore1,teststore2
\ No newline at end of file
diff --git a/pregelix/pregelix-example/src/test/resources/expected/ConnectedComponentsReal.result b/pregelix/pregelix-example/src/test/resources/expected/ConnectedComponentsReal.result
deleted file mode 100644
index 45376e2..0000000
--- a/pregelix/pregelix-example/src/test/resources/expected/ConnectedComponentsReal.result
+++ /dev/null
@@ -1,20 +0,0 @@
-0 0
-1 0
-2 0
-3 0
-4 0
-5 0
-6 0
-7 0
-8 0
-9 0
-10 0
-11 0
-12 0
-13 0
-14 0
-15 0
-16 0
-17 0
-18 0
-19 0
diff --git a/pregelix/pregelix-example/src/test/resources/expected/ConnectedComponentsReal/part-0 b/pregelix/pregelix-example/src/test/resources/expected/ConnectedComponentsReal/part-0
new file mode 100755
index 0000000..f1f1d9b
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/resources/expected/ConnectedComponentsReal/part-0
@@ -0,0 +1,5 @@
+0 0
+4 0
+8 0
+12 0
+16 0
diff --git a/pregelix/pregelix-example/src/test/resources/expected/ConnectedComponentsReal/part-1 b/pregelix/pregelix-example/src/test/resources/expected/ConnectedComponentsReal/part-1
new file mode 100755
index 0000000..0fa02c1
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/resources/expected/ConnectedComponentsReal/part-1
@@ -0,0 +1,5 @@
+1 0
+5 0
+9 0
+13 0
+17 0
diff --git a/pregelix/pregelix-example/src/test/resources/expected/ConnectedComponentsReal/part-2 b/pregelix/pregelix-example/src/test/resources/expected/ConnectedComponentsReal/part-2
new file mode 100755
index 0000000..542ccae
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/resources/expected/ConnectedComponentsReal/part-2
@@ -0,0 +1,5 @@
+2 0
+6 0
+10 0
+14 0
+18 0
diff --git a/pregelix/pregelix-example/src/test/resources/expected/ConnectedComponentsReal/part-3 b/pregelix/pregelix-example/src/test/resources/expected/ConnectedComponentsReal/part-3
new file mode 100755
index 0000000..1d5d6d9
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/resources/expected/ConnectedComponentsReal/part-3
@@ -0,0 +1,5 @@
+3 0
+7 0
+11 0
+15 0
+19 0
diff --git a/pregelix/pregelix-example/src/test/resources/expected/ConnectedComponentsRealComplex.result b/pregelix/pregelix-example/src/test/resources/expected/ConnectedComponentsRealComplex.result
deleted file mode 100644
index dbc30fc..0000000
--- a/pregelix/pregelix-example/src/test/resources/expected/ConnectedComponentsRealComplex.result
+++ /dev/null
@@ -1,23 +0,0 @@
-0 0
-1 0
-2 0
-3 0
-4 0
-5 0
-6 0
-7 0
-8 0
-9 0
-10 0
-11 0
-12 0
-13 0
-14 0
-15 0
-16 0
-17 0
-18 0
-19 0
-21 21
-25 25
-27 27
diff --git a/pregelix/pregelix-example/src/test/resources/expected/ConnectedComponentsRealComplex/part-0 b/pregelix/pregelix-example/src/test/resources/expected/ConnectedComponentsRealComplex/part-0
new file mode 100755
index 0000000..f1f1d9b
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/resources/expected/ConnectedComponentsRealComplex/part-0
@@ -0,0 +1,5 @@
+0 0
+4 0
+8 0
+12 0
+16 0
diff --git a/pregelix/pregelix-example/src/test/resources/expected/ConnectedComponentsRealComplex/part-1 b/pregelix/pregelix-example/src/test/resources/expected/ConnectedComponentsRealComplex/part-1
new file mode 100755
index 0000000..4e7d87a
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/resources/expected/ConnectedComponentsRealComplex/part-1
@@ -0,0 +1,7 @@
+1 0
+5 0
+9 0
+13 0
+17 0
+21 21
+25 25
diff --git a/pregelix/pregelix-example/src/test/resources/expected/ConnectedComponentsRealComplex/part-2 b/pregelix/pregelix-example/src/test/resources/expected/ConnectedComponentsRealComplex/part-2
new file mode 100755
index 0000000..542ccae
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/resources/expected/ConnectedComponentsRealComplex/part-2
@@ -0,0 +1,5 @@
+2 0
+6 0
+10 0
+14 0
+18 0
diff --git a/pregelix/pregelix-example/src/test/resources/expected/ConnectedComponentsRealComplex/part-3 b/pregelix/pregelix-example/src/test/resources/expected/ConnectedComponentsRealComplex/part-3
new file mode 100755
index 0000000..513f3ff
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/resources/expected/ConnectedComponentsRealComplex/part-3
@@ -0,0 +1,6 @@
+3 0
+7 0
+11 0
+15 0
+19 0
+27 27
diff --git a/pregelix/pregelix-example/src/test/resources/expected/GraphMutation.result b/pregelix/pregelix-example/src/test/resources/expected/GraphMutation.result
deleted file mode 100644
index a30166c..0000000
--- a/pregelix/pregelix-example/src/test/resources/expected/GraphMutation.result
+++ /dev/null
@@ -1,13 +0,0 @@
-1 0.0
-5 0.0
-7 0.0
-11 0.0
-13 0.0
-17 0.0
-19 0.0
-100 0.0
-500 0.0
-700 0.0
-1100 0.0
-1300 0.0
-1700 0.0
diff --git a/pregelix/pregelix-example/src/test/resources/expected/GraphMutation/part-0 b/pregelix/pregelix-example/src/test/resources/expected/GraphMutation/part-0
new file mode 100755
index 0000000..b5f7ed3
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/resources/expected/GraphMutation/part-0
@@ -0,0 +1,6 @@
+100 0.0
+500 0.0
+700 0.0
+1100 0.0
+1300 0.0
+1700 0.0
diff --git a/pregelix/pregelix-example/src/test/resources/expected/GraphMutation/part-1 b/pregelix/pregelix-example/src/test/resources/expected/GraphMutation/part-1
new file mode 100755
index 0000000..4eca51d
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/resources/expected/GraphMutation/part-1
@@ -0,0 +1,4 @@
+1 0.0
+5 0.0
+13 0.0
+17 0.0
diff --git a/pregelix/pregelix-example/src/test/resources/expected/GraphMutation/part-2 b/pregelix/pregelix-example/src/test/resources/expected/GraphMutation/part-2
new file mode 100755
index 0000000..e69de29
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/resources/expected/GraphMutation/part-2
diff --git a/pregelix/pregelix-example/src/test/resources/expected/GraphMutation/part-3 b/pregelix/pregelix-example/src/test/resources/expected/GraphMutation/part-3
new file mode 100755
index 0000000..9446ff7
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/resources/expected/GraphMutation/part-3
@@ -0,0 +1,3 @@
+7 0.0
+11 0.0
+19 0.0
diff --git a/pregelix/pregelix-example/src/test/resources/expected/MaximalClique.result b/pregelix/pregelix-example/src/test/resources/expected/MaximalClique.result
deleted file mode 100644
index d238037..0000000
--- a/pregelix/pregelix-example/src/test/resources/expected/MaximalClique.result
+++ /dev/null
@@ -1,7 +0,0 @@
-1 1,2,3,4;
-2 2,3,4;
-3
-4
-5
-6
-7
diff --git a/pregelix/pregelix-example/src/test/resources/expected/MaximalClique/part-0 b/pregelix/pregelix-example/src/test/resources/expected/MaximalClique/part-0
new file mode 100755
index 0000000..902fadf
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/resources/expected/MaximalClique/part-0
@@ -0,0 +1 @@
+4
diff --git a/pregelix/pregelix-example/src/test/resources/expected/MaximalClique/part-1 b/pregelix/pregelix-example/src/test/resources/expected/MaximalClique/part-1
new file mode 100755
index 0000000..ba34424
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/resources/expected/MaximalClique/part-1
@@ -0,0 +1,2 @@
+1 1,2,3,4;
+5
diff --git a/pregelix/pregelix-example/src/test/resources/expected/MaximalClique/part-2 b/pregelix/pregelix-example/src/test/resources/expected/MaximalClique/part-2
new file mode 100755
index 0000000..834e389
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/resources/expected/MaximalClique/part-2
@@ -0,0 +1,2 @@
+2 2,3,4;
+6
diff --git a/pregelix/pregelix-example/src/test/resources/expected/MaximalClique/part-3 b/pregelix/pregelix-example/src/test/resources/expected/MaximalClique/part-3
new file mode 100755
index 0000000..b8e2461
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/resources/expected/MaximalClique/part-3
@@ -0,0 +1,2 @@
+3 3,4;
+7
diff --git a/pregelix/pregelix-example/src/test/resources/expected/MaximalClique2/part-0 b/pregelix/pregelix-example/src/test/resources/expected/MaximalClique2/part-0
new file mode 100755
index 0000000..902fadf
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/resources/expected/MaximalClique2/part-0
@@ -0,0 +1 @@
+4
diff --git a/pregelix/pregelix-example/src/test/resources/expected/MaximalClique2/part-1 b/pregelix/pregelix-example/src/test/resources/expected/MaximalClique2/part-1
new file mode 100755
index 0000000..b83e1a3
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/resources/expected/MaximalClique2/part-1
@@ -0,0 +1,2 @@
+1 1,2,3;1,3,4;
+5
diff --git a/pregelix/pregelix-example/src/test/resources/expected/MaximalClique2/part-2 b/pregelix/pregelix-example/src/test/resources/expected/MaximalClique2/part-2
new file mode 100755
index 0000000..45e2a23
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/resources/expected/MaximalClique2/part-2
@@ -0,0 +1,2 @@
+2 2,3;
+6
diff --git a/pregelix/pregelix-example/src/test/resources/expected/MaximalClique2/part-3 b/pregelix/pregelix-example/src/test/resources/expected/MaximalClique2/part-3
new file mode 100755
index 0000000..447aa38
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/resources/expected/MaximalClique2/part-3
@@ -0,0 +1 @@
+3 3,4;3,5;
diff --git a/pregelix/pregelix-example/src/test/resources/expected/MaximalClique3/part-0 b/pregelix/pregelix-example/src/test/resources/expected/MaximalClique3/part-0
new file mode 100755
index 0000000..b4dced4
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/resources/expected/MaximalClique3/part-0
@@ -0,0 +1,5 @@
+0 0,19;
+4 4,5,6,7,8,9;
+8
+12 12,13,14,15,16,17,18,19;
+16
diff --git a/pregelix/pregelix-example/src/test/resources/expected/MaximalClique3/part-1 b/pregelix/pregelix-example/src/test/resources/expected/MaximalClique3/part-1
new file mode 100755
index 0000000..f554dfe
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/resources/expected/MaximalClique3/part-1
@@ -0,0 +1,5 @@
+1 1,2,3,4,5,6,7,8,9;
+5
+9
+13
+17
diff --git a/pregelix/pregelix-example/src/test/resources/expected/MaximalClique3/part-2 b/pregelix/pregelix-example/src/test/resources/expected/MaximalClique3/part-2
new file mode 100755
index 0000000..4df9b8c
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/resources/expected/MaximalClique3/part-2
@@ -0,0 +1,5 @@
+2 2,3,4,5,6,7,8,9;
+6
+10
+14
+18
diff --git a/pregelix/pregelix-example/src/test/resources/expected/MaximalClique3/part-3 b/pregelix/pregelix-example/src/test/resources/expected/MaximalClique3/part-3
new file mode 100755
index 0000000..5131560
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/resources/expected/MaximalClique3/part-3
@@ -0,0 +1,5 @@
+3 3,4,5,6,7,8,9;
+7
+11 11,12,13,14,15,16,17,18,19;
+15
+19
diff --git a/pregelix/pregelix-example/src/test/resources/expected/PageRank.result b/pregelix/pregelix-example/src/test/resources/expected/PageRank.result
deleted file mode 100644
index 9c4d83a..0000000
--- a/pregelix/pregelix-example/src/test/resources/expected/PageRank.result
+++ /dev/null
@@ -1,20 +0,0 @@
-0 0.008290140026154316
-1 0.1535152819247165
-2 0.14646839195826475
-3 0.08125113985998214
-4 0.03976979906329426
-5 0.0225041581462058
-6 0.015736276824953852
-7 0.012542224114863661
-8 0.010628239626209894
-9 0.009294348455354817
-10 0.008290140026154316
-11 0.15351528192471647
-12 0.14646839195826472
-13 0.08125113985998214
-14 0.03976979906329425
-15 0.0225041581462058
-16 0.015736276824953852
-17 0.012542224114863661
-18 0.010628239626209894
-19 0.009294348455354817
diff --git a/pregelix/pregelix-example/src/test/resources/expected/PageRank/part-0 b/pregelix/pregelix-example/src/test/resources/expected/PageRank/part-0
new file mode 100755
index 0000000..5b07add
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/resources/expected/PageRank/part-0
@@ -0,0 +1,10 @@
+0 0.008290140026154316
+4 0.039769799063294246
+8 0.010628239626209894
+12 0.14646839195826478
+16 0.015736276824953852
+20 0.008290140026154316
+24 0.039769799063294246
+28 0.010628239626209894
+32 0.14646839195826478
+36 0.015736276824953852
diff --git a/pregelix/pregelix-example/src/test/resources/expected/PageRank/part-1 b/pregelix/pregelix-example/src/test/resources/expected/PageRank/part-1
new file mode 100755
index 0000000..caf8a4c
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/resources/expected/PageRank/part-1
@@ -0,0 +1,10 @@
+1 0.15351528192471647
+5 0.0225041581462058
+9 0.009294348455354817
+13 0.08125113985998214
+17 0.012542224114863661
+21 0.15351528192471647
+25 0.0225041581462058
+29 0.009294348455354817
+33 0.08125113985998214
+37 0.012542224114863661
diff --git a/pregelix/pregelix-example/src/test/resources/expected/PageRank/part-2 b/pregelix/pregelix-example/src/test/resources/expected/PageRank/part-2
new file mode 100755
index 0000000..29a9d52
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/resources/expected/PageRank/part-2
@@ -0,0 +1,10 @@
+2 0.14646839195826475
+6 0.015736276824953852
+10 0.008290140026154316
+14 0.03976979906329426
+18 0.010628239626209894
+22 0.14646839195826472
+26 0.01573627682495385
+30 0.008290140026154316
+34 0.03976979906329426
+38 0.010628239626209894
diff --git a/pregelix/pregelix-example/src/test/resources/expected/PageRank/part-3 b/pregelix/pregelix-example/src/test/resources/expected/PageRank/part-3
new file mode 100755
index 0000000..c9bfeb8
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/resources/expected/PageRank/part-3
@@ -0,0 +1,10 @@
+3 0.08125113985998214
+7 0.012542224114863663
+11 0.1535152819247165
+15 0.0225041581462058
+19 0.009294348455354817
+23 0.08125113985998211
+27 0.012542224114863661
+31 0.1535152819247165
+35 0.0225041581462058
+39 0.009294348455354817
diff --git a/pregelix/pregelix-example/src/test/resources/expected/PageRankReal.result b/pregelix/pregelix-example/src/test/resources/expected/PageRankReal.result
deleted file mode 100644
index 6432eda..0000000
--- a/pregelix/pregelix-example/src/test/resources/expected/PageRankReal.result
+++ /dev/null
@@ -1,20 +0,0 @@
-0 0.008290140026154316
-1 0.1535152819247165
-2 0.14646839195826475
-3 0.08125113985998214
-4 0.03976979906329426
-5 0.0225041581462058
-6 0.015736276824953852
-7 0.012542224114863661
-8 0.010628239626209894
-9 0.009294348455354817
-10 0.008290140026154316
-11 0.15351528192471647
-12 0.14646839195826472
-13 0.08125113985998214
-14 0.03976979906329426
-15 0.0225041581462058
-16 0.015736276824953852
-17 0.012542224114863661
-18 0.010628239626209894
-19 0.009294348455354817
diff --git a/pregelix/pregelix-example/src/test/resources/expected/PageRankReal/part-0 b/pregelix/pregelix-example/src/test/resources/expected/PageRankReal/part-0
new file mode 100755
index 0000000..383076e
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/resources/expected/PageRankReal/part-0
@@ -0,0 +1,5 @@
+0 0.008290140026154316
+4 0.03976979906329426
+8 0.010628239626209894
+12 0.14646839195826478
+16 0.015736276824953852
diff --git a/pregelix/pregelix-example/src/test/resources/expected/PageRankReal/part-1 b/pregelix/pregelix-example/src/test/resources/expected/PageRankReal/part-1
new file mode 100755
index 0000000..0b1a38c
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/resources/expected/PageRankReal/part-1
@@ -0,0 +1,5 @@
+1 0.15351528192471653
+5 0.0225041581462058
+9 0.009294348455354817
+13 0.08125113985998214
+17 0.012542224114863661
diff --git a/pregelix/pregelix-example/src/test/resources/expected/PageRankReal/part-2 b/pregelix/pregelix-example/src/test/resources/expected/PageRankReal/part-2
new file mode 100755
index 0000000..ec995b2
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/resources/expected/PageRankReal/part-2
@@ -0,0 +1,5 @@
+2 0.14646839195826478
+6 0.015736276824953852
+10 0.008290140026154316
+14 0.03976979906329426
+18 0.010628239626209894
diff --git a/pregelix/pregelix-example/src/test/resources/expected/PageRankReal/part-3 b/pregelix/pregelix-example/src/test/resources/expected/PageRankReal/part-3
new file mode 100755
index 0000000..edb7484
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/resources/expected/PageRankReal/part-3
@@ -0,0 +1,5 @@
+3 0.08125113985998214
+7 0.012542224114863661
+11 0.15351528192471653
+15 0.0225041581462058
+19 0.009294348455354817
diff --git a/pregelix/pregelix-example/src/test/resources/expected/PageRankRealComplex.result b/pregelix/pregelix-example/src/test/resources/expected/PageRankRealComplex.result
deleted file mode 100644
index 2bd09e1..0000000
--- a/pregelix/pregelix-example/src/test/resources/expected/PageRankRealComplex.result
+++ /dev/null
@@ -1,23 +0,0 @@
-0 0.0072088164890121405
-1 0.12352056961948686
-2 0.12045670441668178
-3 0.06798545786459467
-4 0.03387281259892814
-5 0.01942600635480669
-6 0.013661020012182747
-7 0.0109034351563503
-8 0.009241684574402657
-9 0.008082028259564783
-10 0.007208817414047232
-11 0.07555839219845861
-12 0.07249452699565352
-13 0.05063539695954156
-14 0.029644452692487822
-15 0.018670183493927354
-16 0.013558283213067561
-17 0.010892790899883237
-18 0.009240874593661061
-19 0.008081987856433137
-21 0.006521739130434782
-25 0.006521739130434782
-27 0.006521739130434782
diff --git a/pregelix/pregelix-example/src/test/resources/expected/PageRankRealComplex/part-0 b/pregelix/pregelix-example/src/test/resources/expected/PageRankRealComplex/part-0
new file mode 100755
index 0000000..1d26aee2
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/resources/expected/PageRankRealComplex/part-0
@@ -0,0 +1,5 @@
+0 0.0072088164890121405
+4 0.03387281259892814
+8 0.009241684574402657
+12 0.07249452699565351
+16 0.013558283213067561
diff --git a/pregelix/pregelix-example/src/test/resources/expected/PageRankRealComplex/part-1 b/pregelix/pregelix-example/src/test/resources/expected/PageRankRealComplex/part-1
new file mode 100755
index 0000000..d013c3e
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/resources/expected/PageRankRealComplex/part-1
@@ -0,0 +1,7 @@
+1 0.12352056961948689
+5 0.01942600635480669
+9 0.008082028259564783
+13 0.050635396959541557
+17 0.010892790899883237
+21 0.006521739130434782
+25 0.006521739130434782
diff --git a/pregelix/pregelix-example/src/test/resources/expected/PageRankRealComplex/part-2 b/pregelix/pregelix-example/src/test/resources/expected/PageRankRealComplex/part-2
new file mode 100755
index 0000000..e8aa0e1
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/resources/expected/PageRankRealComplex/part-2
@@ -0,0 +1,5 @@
+2 0.12045670441668178
+6 0.013661020012182747
+10 0.007208817414047232
+14 0.029644452692487822
+18 0.009240874593661061
diff --git a/pregelix/pregelix-example/src/test/resources/expected/PageRankRealComplex/part-3 b/pregelix/pregelix-example/src/test/resources/expected/PageRankRealComplex/part-3
new file mode 100755
index 0000000..e52970d
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/resources/expected/PageRankRealComplex/part-3
@@ -0,0 +1,6 @@
+3 0.06798545786459467
+7 0.0109034351563503
+11 0.0755583921984586
+15 0.018670183493927354
+19 0.008081987856433137
+27 0.006521739130434782
diff --git a/pregelix/pregelix-example/src/test/resources/expected/PageRankRealDynamic.result b/pregelix/pregelix-example/src/test/resources/expected/PageRankRealDynamic.result
deleted file mode 100644
index 6432eda..0000000
--- a/pregelix/pregelix-example/src/test/resources/expected/PageRankRealDynamic.result
+++ /dev/null
@@ -1,20 +0,0 @@
-0 0.008290140026154316
-1 0.1535152819247165
-2 0.14646839195826475
-3 0.08125113985998214
-4 0.03976979906329426
-5 0.0225041581462058
-6 0.015736276824953852
-7 0.012542224114863661
-8 0.010628239626209894
-9 0.009294348455354817
-10 0.008290140026154316
-11 0.15351528192471647
-12 0.14646839195826472
-13 0.08125113985998214
-14 0.03976979906329426
-15 0.0225041581462058
-16 0.015736276824953852
-17 0.012542224114863661
-18 0.010628239626209894
-19 0.009294348455354817
diff --git a/pregelix/pregelix-example/src/test/resources/expected/PageRankRealDynamic/part-0 b/pregelix/pregelix-example/src/test/resources/expected/PageRankRealDynamic/part-0
new file mode 100755
index 0000000..383076e
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/resources/expected/PageRankRealDynamic/part-0
@@ -0,0 +1,5 @@
+0 0.008290140026154316
+4 0.03976979906329426
+8 0.010628239626209894
+12 0.14646839195826478
+16 0.015736276824953852
diff --git a/pregelix/pregelix-example/src/test/resources/expected/PageRankRealDynamic/part-1 b/pregelix/pregelix-example/src/test/resources/expected/PageRankRealDynamic/part-1
new file mode 100755
index 0000000..0b1a38c
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/resources/expected/PageRankRealDynamic/part-1
@@ -0,0 +1,5 @@
+1 0.15351528192471653
+5 0.0225041581462058
+9 0.009294348455354817
+13 0.08125113985998214
+17 0.012542224114863661
diff --git a/pregelix/pregelix-example/src/test/resources/expected/PageRankRealDynamic/part-2 b/pregelix/pregelix-example/src/test/resources/expected/PageRankRealDynamic/part-2
new file mode 100755
index 0000000..ec995b2
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/resources/expected/PageRankRealDynamic/part-2
@@ -0,0 +1,5 @@
+2 0.14646839195826478
+6 0.015736276824953852
+10 0.008290140026154316
+14 0.03976979906329426
+18 0.010628239626209894
diff --git a/pregelix/pregelix-example/src/test/resources/expected/PageRankRealDynamic/part-3 b/pregelix/pregelix-example/src/test/resources/expected/PageRankRealDynamic/part-3
new file mode 100755
index 0000000..5593738
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/resources/expected/PageRankRealDynamic/part-3
@@ -0,0 +1,5 @@
+3 0.08125113985998214
+7 0.012542224114863661
+11 0.15351528192471653
+15 0.022504158146205808
+19 0.009294348455354817
diff --git a/pregelix/pregelix-example/src/test/resources/expected/PageRankRealNoCombiner.result b/pregelix/pregelix-example/src/test/resources/expected/PageRankRealNoCombiner.result
deleted file mode 100755
index 9a747a6..0000000
--- a/pregelix/pregelix-example/src/test/resources/expected/PageRankRealNoCombiner.result
+++ /dev/null
@@ -1,20 +0,0 @@
-0 0.008290140026154316
-1 0.15351528192471647
-2 0.14646839195826475
-3 0.08125113985998211
-4 0.03976979906329425
-5 0.0225041581462058
-6 0.01573627682495385
-7 0.012542224114863661
-8 0.010628239626209894
-9 0.009294348455354817
-10 0.008290140026154316
-11 0.1535152819247165
-12 0.14646839195826475
-13 0.08125113985998214
-14 0.03976979906329426
-15 0.0225041581462058
-16 0.015736276824953852
-17 0.012542224114863661
-18 0.010628239626209894
-19 0.009294348455354817
diff --git a/pregelix/pregelix-example/src/test/resources/expected/PageRankRealNoCombiner/part-0 b/pregelix/pregelix-example/src/test/resources/expected/PageRankRealNoCombiner/part-0
new file mode 100755
index 0000000..6fd5f60
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/resources/expected/PageRankRealNoCombiner/part-0
@@ -0,0 +1,5 @@
+0 0.008290140026154316
+4 0.03976979906329425
+8 0.010628239626209894
+12 0.14646839195826475
+16 0.01573627682495385
diff --git a/pregelix/pregelix-example/src/test/resources/expected/PageRankRealNoCombiner/part-1 b/pregelix/pregelix-example/src/test/resources/expected/PageRankRealNoCombiner/part-1
new file mode 100755
index 0000000..0b1a38c
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/resources/expected/PageRankRealNoCombiner/part-1
@@ -0,0 +1,5 @@
+1 0.15351528192471653
+5 0.0225041581462058
+9 0.009294348455354817
+13 0.08125113985998214
+17 0.012542224114863661
diff --git a/pregelix/pregelix-example/src/test/resources/expected/PageRankRealNoCombiner/part-2 b/pregelix/pregelix-example/src/test/resources/expected/PageRankRealNoCombiner/part-2
new file mode 100755
index 0000000..69a2803
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/resources/expected/PageRankRealNoCombiner/part-2
@@ -0,0 +1,5 @@
+2 0.14646839195826478
+6 0.015736276824953852
+10 0.008290140026154316
+14 0.03976979906329425
+18 0.010628239626209894
diff --git a/pregelix/pregelix-example/src/test/resources/expected/PageRankRealNoCombiner/part-3 b/pregelix/pregelix-example/src/test/resources/expected/PageRankRealNoCombiner/part-3
new file mode 100755
index 0000000..7dcb359
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/resources/expected/PageRankRealNoCombiner/part-3
@@ -0,0 +1,5 @@
+3 0.08125113985998214
+7 0.012542224114863661
+11 0.1535152819247165
+15 0.0225041581462058
+19 0.009294348455354817
diff --git a/pregelix/pregelix-example/src/test/resources/expected/ReachibilityRealComplex.result b/pregelix/pregelix-example/src/test/resources/expected/ReachibilityRealComplex.result
deleted file mode 100644
index a1dfc0f..0000000
--- a/pregelix/pregelix-example/src/test/resources/expected/ReachibilityRealComplex.result
+++ /dev/null
@@ -1,23 +0,0 @@
-0 2
-1 3
-2 1
-3 1
-4 1
-5 1
-6 1
-7 1
-8 1
-9 1
-10 3
-11 2
-12 2
-13 2
-14 2
-15 2
-16 2
-17 2
-18 2
-19 2
-21 0
-25 0
-27 0
diff --git a/pregelix/pregelix-example/src/test/resources/expected/ReachibilityRealComplex/part-0 b/pregelix/pregelix-example/src/test/resources/expected/ReachibilityRealComplex/part-0
new file mode 100755
index 0000000..6e3ba89
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/resources/expected/ReachibilityRealComplex/part-0
@@ -0,0 +1,5 @@
+0 0
+4 1
+8 1
+12 2
+16 2
diff --git a/pregelix/pregelix-example/src/test/resources/expected/ReachibilityRealComplex/part-1 b/pregelix/pregelix-example/src/test/resources/expected/ReachibilityRealComplex/part-1
new file mode 100755
index 0000000..2fcc8eb
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/resources/expected/ReachibilityRealComplex/part-1
@@ -0,0 +1,7 @@
+1 1
+5 1
+9 1
+13 2
+17 2
+21 0
+25 0
diff --git a/pregelix/pregelix-example/src/test/resources/expected/ReachibilityRealComplex/part-2 b/pregelix/pregelix-example/src/test/resources/expected/ReachibilityRealComplex/part-2
new file mode 100755
index 0000000..c93732c
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/resources/expected/ReachibilityRealComplex/part-2
@@ -0,0 +1,5 @@
+2 1
+6 1
+10 3
+14 2
+18 2
diff --git a/pregelix/pregelix-example/src/test/resources/expected/ReachibilityRealComplex/part-3 b/pregelix/pregelix-example/src/test/resources/expected/ReachibilityRealComplex/part-3
new file mode 100755
index 0000000..c745349
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/resources/expected/ReachibilityRealComplex/part-3
@@ -0,0 +1,6 @@
+3 1
+7 1
+11 2
+15 2
+19 2
+27 0
diff --git a/pregelix/pregelix-example/src/test/resources/expected/ReachibilityRealComplexNoConnectivity.result b/pregelix/pregelix-example/src/test/resources/expected/ReachibilityRealComplexNoConnectivity.result
deleted file mode 100644
index 1693fb2..0000000
--- a/pregelix/pregelix-example/src/test/resources/expected/ReachibilityRealComplexNoConnectivity.result
+++ /dev/null
@@ -1,23 +0,0 @@
-0 1
-1 1
-2 1
-3 1
-4 1
-5 1
-6 1
-7 1
-8 1
-9 1
-10 1
-11 1
-12 1
-13 1
-14 1
-15 1
-16 1
-17 1
-18 1
-19 1
-21 0
-25 2
-27 0
diff --git a/pregelix/pregelix-example/src/test/resources/expected/ReachibilityRealComplexNoConnectivity/part-0 b/pregelix/pregelix-example/src/test/resources/expected/ReachibilityRealComplexNoConnectivity/part-0
new file mode 100755
index 0000000..8e02e13
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/resources/expected/ReachibilityRealComplexNoConnectivity/part-0
@@ -0,0 +1,5 @@
+0 1
+4 1
+8 1
+12 1
+16 1
diff --git a/pregelix/pregelix-example/src/test/resources/expected/ReachibilityRealComplexNoConnectivity/part-1 b/pregelix/pregelix-example/src/test/resources/expected/ReachibilityRealComplexNoConnectivity/part-1
new file mode 100755
index 0000000..cfbb359
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/resources/expected/ReachibilityRealComplexNoConnectivity/part-1
@@ -0,0 +1,7 @@
+1 1
+5 1
+9 1
+13 1
+17 1
+21 0
+25 2
diff --git a/pregelix/pregelix-example/src/test/resources/expected/ReachibilityRealComplexNoConnectivity/part-2 b/pregelix/pregelix-example/src/test/resources/expected/ReachibilityRealComplexNoConnectivity/part-2
new file mode 100755
index 0000000..8e3ca7c
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/resources/expected/ReachibilityRealComplexNoConnectivity/part-2
@@ -0,0 +1,5 @@
+2 1
+6 1
+10 1
+14 1
+18 1
diff --git a/pregelix/pregelix-example/src/test/resources/expected/ReachibilityRealComplexNoConnectivity/part-3 b/pregelix/pregelix-example/src/test/resources/expected/ReachibilityRealComplexNoConnectivity/part-3
new file mode 100755
index 0000000..5f83a3d
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/resources/expected/ReachibilityRealComplexNoConnectivity/part-3
@@ -0,0 +1,6 @@
+3 1
+7 1
+11 1
+15 1
+19 1
+27 0
diff --git a/pregelix/pregelix-example/src/test/resources/expected/ShortestPaths.result b/pregelix/pregelix-example/src/test/resources/expected/ShortestPaths.result
deleted file mode 100644
index 46d1c73..0000000
--- a/pregelix/pregelix-example/src/test/resources/expected/ShortestPaths.result
+++ /dev/null
@@ -1,20 +0,0 @@
-0 0.0
-1 0.0
-2 100.0
-3 300.0
-4 600.0
-5 1000.0
-6 1500.0
-7 2100.0
-8 2800.0
-9 3600.0
-10 4500.0
-11 5500.0
-12 6600.0
-13 7800.0
-14 9100.0
-15 10500.0
-16 12000.0
-17 13600.0
-18 15300.0
-19 17100.0
diff --git a/pregelix/pregelix-example/src/test/resources/expected/ShortestPaths/part-0 b/pregelix/pregelix-example/src/test/resources/expected/ShortestPaths/part-0
new file mode 100755
index 0000000..e8eadcd
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/resources/expected/ShortestPaths/part-0
@@ -0,0 +1,10 @@
+0 0.0
+4 600.0
+8 2800.0
+12 6600.0
+16 12000.0
+20 19000.0
+24 27600.0
+28 37800.0
+32 49600.0
+36 63000.0
diff --git a/pregelix/pregelix-example/src/test/resources/expected/ShortestPaths/part-1 b/pregelix/pregelix-example/src/test/resources/expected/ShortestPaths/part-1
new file mode 100755
index 0000000..c3d200b
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/resources/expected/ShortestPaths/part-1
@@ -0,0 +1,10 @@
+1 0.0
+5 1000.0
+9 3600.0
+13 7800.0
+17 13600.0
+21 21000.0
+25 30000.0
+29 40600.0
+33 52800.0
+37 66600.0
diff --git a/pregelix/pregelix-example/src/test/resources/expected/ShortestPaths/part-2 b/pregelix/pregelix-example/src/test/resources/expected/ShortestPaths/part-2
new file mode 100755
index 0000000..779a683
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/resources/expected/ShortestPaths/part-2
@@ -0,0 +1,10 @@
+2 100.0
+6 1500.0
+10 4500.0
+14 9100.0
+18 15300.0
+22 23100.0
+26 32500.0
+30 43500.0
+34 56100.0
+38 70300.0
diff --git a/pregelix/pregelix-example/src/test/resources/expected/ShortestPaths/part-3 b/pregelix/pregelix-example/src/test/resources/expected/ShortestPaths/part-3
new file mode 100755
index 0000000..57d26c9
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/resources/expected/ShortestPaths/part-3
@@ -0,0 +1,10 @@
+3 300.0
+7 2100.0
+11 5500.0
+15 10500.0
+19 17100.0
+23 25300.0
+27 35100.0
+31 46500.0
+35 59500.0
+39 74100.0
diff --git a/pregelix/pregelix-example/src/test/resources/expected/ShortestPathsReal.result b/pregelix/pregelix-example/src/test/resources/expected/ShortestPathsReal.result
deleted file mode 100644
index b42462f..0000000
--- a/pregelix/pregelix-example/src/test/resources/expected/ShortestPathsReal.result
+++ /dev/null
@@ -1,20 +0,0 @@
-0 0.0
-1 1.0
-2 2.0
-3 3.0
-4 4.0
-5 5.0
-6 6.0
-7 7.0
-8 8.0
-9 9.0
-10 10.0
-11 11.0
-12 12.0
-13 13.0
-14 14.0
-15 15.0
-16 16.0
-17 17.0
-18 18.0
-19 19.0
diff --git a/pregelix/pregelix-example/src/test/resources/expected/ShortestPathsReal/part-0 b/pregelix/pregelix-example/src/test/resources/expected/ShortestPathsReal/part-0
new file mode 100755
index 0000000..a1cabe2
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/resources/expected/ShortestPathsReal/part-0
@@ -0,0 +1,5 @@
+0 0.0
+4 4.0
+8 8.0
+12 12.0
+16 16.0
diff --git a/pregelix/pregelix-example/src/test/resources/expected/ShortestPathsReal/part-1 b/pregelix/pregelix-example/src/test/resources/expected/ShortestPathsReal/part-1
new file mode 100755
index 0000000..303ed2a
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/resources/expected/ShortestPathsReal/part-1
@@ -0,0 +1,5 @@
+1 1.0
+5 5.0
+9 9.0
+13 13.0
+17 17.0
diff --git a/pregelix/pregelix-example/src/test/resources/expected/ShortestPathsReal/part-2 b/pregelix/pregelix-example/src/test/resources/expected/ShortestPathsReal/part-2
new file mode 100755
index 0000000..0020d25
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/resources/expected/ShortestPathsReal/part-2
@@ -0,0 +1,5 @@
+2 2.0
+6 6.0
+10 10.0
+14 14.0
+18 18.0
diff --git a/pregelix/pregelix-example/src/test/resources/expected/ShortestPathsReal/part-3 b/pregelix/pregelix-example/src/test/resources/expected/ShortestPathsReal/part-3
new file mode 100755
index 0000000..e2657bd
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/resources/expected/ShortestPathsReal/part-3
@@ -0,0 +1,5 @@
+3 3.0
+7 7.0
+11 11.0
+15 15.0
+19 19.0
diff --git a/pregelix/pregelix-example/src/test/resources/expected/TriangleCounting.result b/pregelix/pregelix-example/src/test/resources/expected/TriangleCounting.result
deleted file mode 100644
index 4818e13..0000000
--- a/pregelix/pregelix-example/src/test/resources/expected/TriangleCounting.result
+++ /dev/null
@@ -1,7 +0,0 @@
-1 3
-2 2
-3 0
-4 0
-5 1
-6 0
-7 0
diff --git a/pregelix/pregelix-example/src/test/resources/expected/TriangleCounting/.part-0.crc b/pregelix/pregelix-example/src/test/resources/expected/TriangleCounting/.part-0.crc
new file mode 100644
index 0000000..61e7df3
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/resources/expected/TriangleCounting/.part-0.crc
Binary files differ
diff --git a/pregelix/pregelix-example/src/test/resources/expected/TriangleCounting/.part-1.crc b/pregelix/pregelix-example/src/test/resources/expected/TriangleCounting/.part-1.crc
new file mode 100644
index 0000000..9f34827
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/resources/expected/TriangleCounting/.part-1.crc
Binary files differ
diff --git a/pregelix/pregelix-example/src/test/resources/expected/TriangleCounting/.part-2.crc b/pregelix/pregelix-example/src/test/resources/expected/TriangleCounting/.part-2.crc
new file mode 100644
index 0000000..cfe7fa2
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/resources/expected/TriangleCounting/.part-2.crc
Binary files differ
diff --git a/pregelix/pregelix-example/src/test/resources/expected/TriangleCounting/.part-3.crc b/pregelix/pregelix-example/src/test/resources/expected/TriangleCounting/.part-3.crc
new file mode 100644
index 0000000..9959654
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/resources/expected/TriangleCounting/.part-3.crc
Binary files differ
diff --git a/pregelix/pregelix-example/src/test/resources/expected/TriangleCounting/part-0 b/pregelix/pregelix-example/src/test/resources/expected/TriangleCounting/part-0
new file mode 100755
index 0000000..b36d2b5
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/resources/expected/TriangleCounting/part-0
@@ -0,0 +1 @@
+4 0
diff --git a/pregelix/pregelix-example/src/test/resources/expected/TriangleCounting/part-1 b/pregelix/pregelix-example/src/test/resources/expected/TriangleCounting/part-1
new file mode 100755
index 0000000..2812c76
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/resources/expected/TriangleCounting/part-1
@@ -0,0 +1,2 @@
+1 3
+5 1
diff --git a/pregelix/pregelix-example/src/test/resources/expected/TriangleCounting/part-2 b/pregelix/pregelix-example/src/test/resources/expected/TriangleCounting/part-2
new file mode 100755
index 0000000..25cfb7b
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/resources/expected/TriangleCounting/part-2
@@ -0,0 +1,2 @@
+2 2
+6 0
diff --git a/pregelix/pregelix-example/src/test/resources/expected/TriangleCounting/part-3 b/pregelix/pregelix-example/src/test/resources/expected/TriangleCounting/part-3
new file mode 100755
index 0000000..2bd8d38
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/resources/expected/TriangleCounting/part-3
@@ -0,0 +1,2 @@
+3 0
+7 0
diff --git a/pregelix/pregelix-example/src/test/resources/hyracks-deployment.properties b/pregelix/pregelix-example/src/test/resources/hyracks-deployment.properties
deleted file mode 100644
index 9c42b89..0000000
--- a/pregelix/pregelix-example/src/test/resources/hyracks-deployment.properties
+++ /dev/null
@@ -1,2 +0,0 @@
-#cc.bootstrap.class=edu.uci.ics.asterix.hyracks.bootstrap.CCBootstrapImpl
-nc.bootstrap.class=edu.uci.ics.pregelix.runtime.bootstrap.NCBootstrapImpl
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/MaximalClique2.xml b/pregelix/pregelix-example/src/test/resources/jobs/MaximalClique2.xml
new file mode 100644
index 0000000..5621259
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/resources/jobs/MaximalClique2.xml
@@ -0,0 +1,142 @@
+<?xml version="1.0" encoding="UTF-8" standalone="no"?><configuration>
+<property><name>mapred.tasktracker.dns.nameserver</name><value>default</value></property>
+<property><name>mapred.queue.default.acl-administer-jobs</name><value>*</value></property>
+<property><name>mapred.skip.map.auto.incr.proc.count</name><value>true</value></property>
+<property><name>mapred.jobtracker.instrumentation</name><value>org.apache.hadoop.mapred.JobTrackerMetricsInst</value></property>
+<property><name>mapred.skip.reduce.auto.incr.proc.count</name><value>true</value></property>
+<property><name>fs.hsftp.impl</name><value>org.apache.hadoop.hdfs.HsftpFileSystem</value></property>
+<property><name>mapred.input.dir</name><value>file:/clique2</value></property>
+<property><name>mapred.submit.replication</name><value>10</value></property>
+<property><name>ipc.server.tcpnodelay</name><value>false</value></property>
+<property><name>fs.checkpoint.dir</name><value>${hadoop.tmp.dir}/dfs/namesecondary</value></property>
+<property><name>mapred.output.compression.type</name><value>RECORD</value></property>
+<property><name>mapred.job.shuffle.merge.percent</name><value>0.66</value></property>
+<property><name>mapred.child.java.opts</name><value>-Xmx200m</value></property>
+<property><name>mapred.queue.default.acl-submit-job</name><value>*</value></property>
+<property><name>keep.failed.task.files</name><value>false</value></property>
+<property><name>mapred.jobtracker.job.history.block.size</name><value>3145728</value></property>
+<property><name>io.bytes.per.checksum</name><value>512</value></property>
+<property><name>mapred.task.tracker.report.address</name><value>127.0.0.1:0</value></property>
+<property><name>hadoop.util.hash.type</name><value>murmur</value></property>
+<property><name>fs.hdfs.impl</name><value>org.apache.hadoop.hdfs.DistributedFileSystem</value></property>
+<property><name>fs.ramfs.impl</name><value>org.apache.hadoop.fs.InMemoryFileSystem</value></property>
+<property><name>mapred.jobtracker.restart.recover</name><value>false</value></property>
+<property><name>fs.hftp.impl</name><value>org.apache.hadoop.hdfs.HftpFileSystem</value></property>
+<property><name>fs.checkpoint.period</name><value>3600</value></property>
+<property><name>mapred.child.tmp</name><value>./tmp</value></property>
+<property><name>mapred.local.dir.minspacekill</name><value>0</value></property>
+<property><name>map.sort.class</name><value>org.apache.hadoop.util.QuickSort</value></property>
+<property><name>hadoop.logfile.count</name><value>10</value></property>
+<property><name>ipc.client.connection.maxidletime</name><value>10000</value></property>
+<property><name>mapred.output.dir</name><value>/resultclique</value></property>
+<property><name>io.map.index.skip</name><value>0</value></property>
+<property><name>mapred.tasktracker.expiry.interval</name><value>600000</value></property>
+<property><name>mapred.output.compress</name><value>false</value></property>
+<property><name>io.seqfile.lazydecompress</name><value>true</value></property>
+<property><name>mapred.reduce.parallel.copies</name><value>5</value></property>
+<property><name>fs.checkpoint.size</name><value>67108864</value></property>
+<property><name>mapred.job.reduce.input.buffer.percent</name><value>0.0</value></property>
+<property><name>mapred.job.name</name><value>Maximal Clique 2</value></property>
+<property><name>local.cache.size</name><value>10737418240</value></property>
+<property><name>fs.s3n.impl</name><value>org.apache.hadoop.fs.s3native.NativeS3FileSystem</value></property>
+<property><name>mapred.userlog.limit.kb</name><value>0</value></property>
+<property><name>fs.file.impl</name><value>org.apache.hadoop.fs.LocalFileSystem</value></property>
+<property><name>mapred.task.tracker.http.address</name><value>0.0.0.0:50060</value></property>
+<property><name>mapred.task.timeout</name><value>600000</value></property>
+<property><name>fs.kfs.impl</name><value>org.apache.hadoop.fs.kfs.KosmosFileSystem</value></property>
+<property><name>mapred.max.tracker.blacklists</name><value>4</value></property>
+<property><name>fs.s3.buffer.dir</name><value>${hadoop.tmp.dir}/s3</value></property>
+<property><name>mapred.job.tracker.persist.jobstatus.dir</name><value>/jobtracker/jobsInfo</value></property>
+<property><name>ipc.client.kill.max</name><value>10</value></property>
+<property><name>mapred.tasktracker.instrumentation</name><value>org.apache.hadoop.mapred.TaskTrackerMetricsInst</value></property>
+<property><name>mapred.reduce.tasks.speculative.execution</name><value>true</value></property>
+<property><name>io.sort.record.percent</name><value>0.05</value></property>
+<property><name>hadoop.security.authorization</name><value>false</value></property>
+<property><name>mapred.max.tracker.failures</name><value>4</value></property>
+<property><name>mapred.jobtracker.taskScheduler</name><value>org.apache.hadoop.mapred.JobQueueTaskScheduler</value></property>
+<property><name>mapred.tasktracker.dns.interface</name><value>default</value></property>
+<property><name>mapred.map.tasks</name><value>2</value></property>
+<property><name>mapred.job.tracker.persist.jobstatus.hours</name><value>0</value></property>
+<property><name>fs.s3.sleepTimeSeconds</name><value>10</value></property>
+<property><name>fs.default.name</name><value>file:///</value></property>
+<property><name>tasktracker.http.threads</name><value>40</value></property>
+<property><name>mapred.tasktracker.taskmemorymanager.monitoring-interval</name><value>5000</value></property>
+<property><name>hadoop.rpc.socket.factory.class.default</name><value>org.apache.hadoop.net.StandardSocketFactory</value></property>
+<property><name>mapred.reduce.tasks</name><value>1</value></property>
+<property><name>topology.node.switch.mapping.impl</name><value>org.apache.hadoop.net.ScriptBasedMapping</value></property>
+<property><name>pregelix.vertexClass</name><value>edu.uci.ics.pregelix.example.maximalclique.MaximalCliqueVertex</value></property>
+<property><name>mapred.skip.reduce.max.skip.groups</name><value>0</value></property>
+<property><name>io.file.buffer.size</name><value>4096</value></property>
+<property><name>mapred.jobtracker.maxtasks.per.job</name><value>-1</value></property>
+<property><name>mapred.tasktracker.indexcache.mb</name><value>10</value></property>
+<property><name>mapred.tasktracker.map.tasks.maximum</name><value>2</value></property>
+<property><name>fs.har.impl.disable.cache</name><value>true</value></property>
+<property><name>mapred.task.profile.maps</name><value>0-2</value></property>
+<property><name>hadoop.native.lib</name><value>true</value></property>
+<property><name>fs.s3.block.size</name><value>67108864</value></property>
+<property><name>mapred.job.reuse.jvm.num.tasks</name><value>1</value></property>
+<property><name>mapred.job.tracker.http.address</name><value>0.0.0.0:50030</value></property>
+<property><name>mapred.tasktracker.reduce.tasks.maximum</name><value>2</value></property>
+<property><name>io.compression.codecs</name><value>org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.BZip2Codec</value></property>
+<property><name>mapred.job.shuffle.input.buffer.percent</name><value>0.70</value></property>
+<property><name>io.seqfile.compress.blocksize</name><value>1000000</value></property>
+<property><name>mapred.queue.names</name><value>default</value></property>
+<property><name>fs.har.impl</name><value>org.apache.hadoop.fs.HarFileSystem</value></property>
+<property><name>io.mapfile.bloom.error.rate</name><value>0.005</value></property>
+<property><name>mapred.job.tracker</name><value>local</value></property>
+<property><name>io.skip.checksum.errors</name><value>false</value></property>
+<property><name>mapred.reduce.max.attempts</name><value>4</value></property>
+<property><name>fs.s3.maxRetries</name><value>4</value></property>
+<property><name>ipc.server.listen.queue.size</name><value>128</value></property>
+<property><name>fs.trash.interval</name><value>0</value></property>
+<property><name>mapred.local.dir.minspacestart</name><value>0</value></property>
+<property><name>fs.s3.impl</name><value>org.apache.hadoop.fs.s3.S3FileSystem</value></property>
+<property><name>io.seqfile.sorter.recordlimit</name><value>1000000</value></property>
+<property><name>io.mapfile.bloom.size</name><value>1048576</value></property>
+<property><name>io.sort.mb</name><value>100</value></property>
+<property><name>mapred.local.dir</name><value>${hadoop.tmp.dir}/mapred/local</value></property>
+<property><name>io.sort.factor</name><value>10</value></property>
+<property><name>mapred.task.profile</name><value>false</value></property>
+<property><name>job.end.retry.interval</name><value>30000</value></property>
+<property><name>mapred.tasktracker.procfsbasedprocesstree.sleeptime-before-sigkill</name><value>5000</value></property>
+<property><name>mapred.jobtracker.completeuserjobs.maximum</name><value>100</value></property>
+<property><name>mapred.task.profile.reduces</name><value>0-2</value></property>
+<property><name>webinterface.private.actions</name><value>false</value></property>
+<property><name>hadoop.tmp.dir</name><value>/tmp/hadoop-${user.name}</value></property>
+<property><name>mapred.output.compression.codec</name><value>org.apache.hadoop.io.compress.DefaultCodec</value></property>
+<property><name>mapred.skip.attempts.to.start.skipping</name><value>2</value></property>
+<property><name>mapred.temp.dir</name><value>${hadoop.tmp.dir}/mapred/temp</value></property>
+<property><name>mapred.merge.recordsBeforeProgress</name><value>10000</value></property>
+<property><name>mapred.map.output.compression.codec</name><value>org.apache.hadoop.io.compress.DefaultCodec</value></property>
+<property><name>mapred.compress.map.output</name><value>false</value></property>
+<property><name>io.sort.spill.percent</name><value>0.80</value></property>
+<property><name>fs.checkpoint.edits.dir</name><value>${fs.checkpoint.dir}</value></property>
+<property><name>mapred.userlog.retain.hours</name><value>24</value></property>
+<property><name>mapred.system.dir</name><value>${hadoop.tmp.dir}/mapred/system</value></property>
+<property><name>mapred.line.input.format.linespermap</name><value>1</value></property>
+<property><name>job.end.retry.attempts</name><value>0</value></property>
+<property><name>ipc.client.idlethreshold</name><value>4000</value></property>
+<property><name>pregelix.vertexOutputFormatClass</name><value>edu.uci.ics.pregelix.example.maximalclique.MaximalCliqueVertex$MaximalCliqueVertexOutputFormat</value></property>
+<property><name>mapred.reduce.copy.backoff</name><value>300</value></property>
+<property><name>mapred.map.tasks.speculative.execution</name><value>true</value></property>
+<property><name>mapred.inmem.merge.threshold</name><value>1000</value></property>
+<property><name>hadoop.logfile.size</name><value>10000000</value></property>
+<property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.pregelix.example.maximalclique.TextMaximalCliqueInputFormat</value></property>
+<property><name>pregelix.aggregatorClass</name><value>edu.uci.ics.pregelix.example.maximalclique.MaximalCliqueAggregator</value></property>
+<property><name>mapred.job.queue.name</name><value>default</value></property>
+<property><name>mapred.job.tracker.persist.jobstatus.active</name><value>false</value></property>
+<property><name>pregelix.incStateLength</name><value>true</value></property>
+<property><name>mapred.reduce.slowstart.completed.maps</name><value>0.05</value></property>
+<property><name>topology.script.number.args</name><value>100</value></property>
+<property><name>mapred.skip.map.max.skip.records</name><value>0</value></property>
+<property><name>fs.ftp.impl</name><value>org.apache.hadoop.fs.ftp.FTPFileSystem</value></property>
+<property><name>mapred.task.cache.levels</name><value>2</value></property>
+<property><name>mapred.job.tracker.handler.count</name><value>10</value></property>
+<property><name>io.serializations</name><value>org.apache.hadoop.io.serializer.WritableSerialization</value></property>
+<property><name>ipc.client.connect.max.retries</name><value>10</value></property>
+<property><name>mapred.min.split.size</name><value>0</value></property>
+<property><name>mapred.map.max.attempts</name><value>4</value></property>
+<property><name>jobclient.output.filter</name><value>FAILED</value></property>
+<property><name>ipc.client.tcpnodelay</name><value>false</value></property>
+<property><name>mapred.acls.enabled</name><value>false</value></property>
+</configuration>
\ No newline at end of file
diff --git a/pregelix/pregelix-example/src/test/resources/jobs/MaximalClique3.xml b/pregelix/pregelix-example/src/test/resources/jobs/MaximalClique3.xml
new file mode 100644
index 0000000..d4f81ba
--- /dev/null
+++ b/pregelix/pregelix-example/src/test/resources/jobs/MaximalClique3.xml
@@ -0,0 +1,142 @@
+<?xml version="1.0" encoding="UTF-8" standalone="no"?><configuration>
+<property><name>mapred.tasktracker.dns.nameserver</name><value>default</value></property>
+<property><name>mapred.queue.default.acl-administer-jobs</name><value>*</value></property>
+<property><name>mapred.skip.map.auto.incr.proc.count</name><value>true</value></property>
+<property><name>mapred.jobtracker.instrumentation</name><value>org.apache.hadoop.mapred.JobTrackerMetricsInst</value></property>
+<property><name>mapred.skip.reduce.auto.incr.proc.count</name><value>true</value></property>
+<property><name>fs.hsftp.impl</name><value>org.apache.hadoop.hdfs.HsftpFileSystem</value></property>
+<property><name>mapred.input.dir</name><value>file:/clique3</value></property>
+<property><name>mapred.submit.replication</name><value>10</value></property>
+<property><name>ipc.server.tcpnodelay</name><value>false</value></property>
+<property><name>fs.checkpoint.dir</name><value>${hadoop.tmp.dir}/dfs/namesecondary</value></property>
+<property><name>mapred.output.compression.type</name><value>RECORD</value></property>
+<property><name>mapred.job.shuffle.merge.percent</name><value>0.66</value></property>
+<property><name>mapred.child.java.opts</name><value>-Xmx200m</value></property>
+<property><name>mapred.queue.default.acl-submit-job</name><value>*</value></property>
+<property><name>keep.failed.task.files</name><value>false</value></property>
+<property><name>mapred.jobtracker.job.history.block.size</name><value>3145728</value></property>
+<property><name>io.bytes.per.checksum</name><value>512</value></property>
+<property><name>mapred.task.tracker.report.address</name><value>127.0.0.1:0</value></property>
+<property><name>hadoop.util.hash.type</name><value>murmur</value></property>
+<property><name>fs.hdfs.impl</name><value>org.apache.hadoop.hdfs.DistributedFileSystem</value></property>
+<property><name>fs.ramfs.impl</name><value>org.apache.hadoop.fs.InMemoryFileSystem</value></property>
+<property><name>mapred.jobtracker.restart.recover</name><value>false</value></property>
+<property><name>fs.hftp.impl</name><value>org.apache.hadoop.hdfs.HftpFileSystem</value></property>
+<property><name>fs.checkpoint.period</name><value>3600</value></property>
+<property><name>mapred.child.tmp</name><value>./tmp</value></property>
+<property><name>mapred.local.dir.minspacekill</name><value>0</value></property>
+<property><name>map.sort.class</name><value>org.apache.hadoop.util.QuickSort</value></property>
+<property><name>hadoop.logfile.count</name><value>10</value></property>
+<property><name>ipc.client.connection.maxidletime</name><value>10000</value></property>
+<property><name>mapred.output.dir</name><value>/resultclique</value></property>
+<property><name>io.map.index.skip</name><value>0</value></property>
+<property><name>mapred.tasktracker.expiry.interval</name><value>600000</value></property>
+<property><name>mapred.output.compress</name><value>false</value></property>
+<property><name>io.seqfile.lazydecompress</name><value>true</value></property>
+<property><name>mapred.reduce.parallel.copies</name><value>5</value></property>
+<property><name>fs.checkpoint.size</name><value>67108864</value></property>
+<property><name>mapred.job.reduce.input.buffer.percent</name><value>0.0</value></property>
+<property><name>mapred.job.name</name><value>Maximal Clique 3</value></property>
+<property><name>local.cache.size</name><value>10737418240</value></property>
+<property><name>fs.s3n.impl</name><value>org.apache.hadoop.fs.s3native.NativeS3FileSystem</value></property>
+<property><name>mapred.userlog.limit.kb</name><value>0</value></property>
+<property><name>fs.file.impl</name><value>org.apache.hadoop.fs.LocalFileSystem</value></property>
+<property><name>mapred.task.tracker.http.address</name><value>0.0.0.0:50060</value></property>
+<property><name>mapred.task.timeout</name><value>600000</value></property>
+<property><name>fs.kfs.impl</name><value>org.apache.hadoop.fs.kfs.KosmosFileSystem</value></property>
+<property><name>mapred.max.tracker.blacklists</name><value>4</value></property>
+<property><name>fs.s3.buffer.dir</name><value>${hadoop.tmp.dir}/s3</value></property>
+<property><name>mapred.job.tracker.persist.jobstatus.dir</name><value>/jobtracker/jobsInfo</value></property>
+<property><name>ipc.client.kill.max</name><value>10</value></property>
+<property><name>mapred.tasktracker.instrumentation</name><value>org.apache.hadoop.mapred.TaskTrackerMetricsInst</value></property>
+<property><name>mapred.reduce.tasks.speculative.execution</name><value>true</value></property>
+<property><name>io.sort.record.percent</name><value>0.05</value></property>
+<property><name>hadoop.security.authorization</name><value>false</value></property>
+<property><name>mapred.max.tracker.failures</name><value>4</value></property>
+<property><name>mapred.jobtracker.taskScheduler</name><value>org.apache.hadoop.mapred.JobQueueTaskScheduler</value></property>
+<property><name>mapred.tasktracker.dns.interface</name><value>default</value></property>
+<property><name>mapred.map.tasks</name><value>2</value></property>
+<property><name>mapred.job.tracker.persist.jobstatus.hours</name><value>0</value></property>
+<property><name>fs.s3.sleepTimeSeconds</name><value>10</value></property>
+<property><name>fs.default.name</name><value>file:///</value></property>
+<property><name>tasktracker.http.threads</name><value>40</value></property>
+<property><name>mapred.tasktracker.taskmemorymanager.monitoring-interval</name><value>5000</value></property>
+<property><name>hadoop.rpc.socket.factory.class.default</name><value>org.apache.hadoop.net.StandardSocketFactory</value></property>
+<property><name>mapred.reduce.tasks</name><value>1</value></property>
+<property><name>topology.node.switch.mapping.impl</name><value>org.apache.hadoop.net.ScriptBasedMapping</value></property>
+<property><name>pregelix.vertexClass</name><value>edu.uci.ics.pregelix.example.maximalclique.MaximalCliqueVertex</value></property>
+<property><name>mapred.skip.reduce.max.skip.groups</name><value>0</value></property>
+<property><name>io.file.buffer.size</name><value>4096</value></property>
+<property><name>mapred.jobtracker.maxtasks.per.job</name><value>-1</value></property>
+<property><name>mapred.tasktracker.indexcache.mb</name><value>10</value></property>
+<property><name>mapred.tasktracker.map.tasks.maximum</name><value>2</value></property>
+<property><name>fs.har.impl.disable.cache</name><value>true</value></property>
+<property><name>mapred.task.profile.maps</name><value>0-2</value></property>
+<property><name>hadoop.native.lib</name><value>true</value></property>
+<property><name>fs.s3.block.size</name><value>67108864</value></property>
+<property><name>mapred.job.reuse.jvm.num.tasks</name><value>1</value></property>
+<property><name>mapred.job.tracker.http.address</name><value>0.0.0.0:50030</value></property>
+<property><name>mapred.tasktracker.reduce.tasks.maximum</name><value>2</value></property>
+<property><name>io.compression.codecs</name><value>org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.BZip2Codec</value></property>
+<property><name>mapred.job.shuffle.input.buffer.percent</name><value>0.70</value></property>
+<property><name>io.seqfile.compress.blocksize</name><value>1000000</value></property>
+<property><name>mapred.queue.names</name><value>default</value></property>
+<property><name>fs.har.impl</name><value>org.apache.hadoop.fs.HarFileSystem</value></property>
+<property><name>io.mapfile.bloom.error.rate</name><value>0.005</value></property>
+<property><name>mapred.job.tracker</name><value>local</value></property>
+<property><name>io.skip.checksum.errors</name><value>false</value></property>
+<property><name>mapred.reduce.max.attempts</name><value>4</value></property>
+<property><name>fs.s3.maxRetries</name><value>4</value></property>
+<property><name>ipc.server.listen.queue.size</name><value>128</value></property>
+<property><name>fs.trash.interval</name><value>0</value></property>
+<property><name>mapred.local.dir.minspacestart</name><value>0</value></property>
+<property><name>fs.s3.impl</name><value>org.apache.hadoop.fs.s3.S3FileSystem</value></property>
+<property><name>io.seqfile.sorter.recordlimit</name><value>1000000</value></property>
+<property><name>io.mapfile.bloom.size</name><value>1048576</value></property>
+<property><name>io.sort.mb</name><value>100</value></property>
+<property><name>mapred.local.dir</name><value>${hadoop.tmp.dir}/mapred/local</value></property>
+<property><name>io.sort.factor</name><value>10</value></property>
+<property><name>mapred.task.profile</name><value>false</value></property>
+<property><name>job.end.retry.interval</name><value>30000</value></property>
+<property><name>mapred.tasktracker.procfsbasedprocesstree.sleeptime-before-sigkill</name><value>5000</value></property>
+<property><name>mapred.jobtracker.completeuserjobs.maximum</name><value>100</value></property>
+<property><name>mapred.task.profile.reduces</name><value>0-2</value></property>
+<property><name>webinterface.private.actions</name><value>false</value></property>
+<property><name>hadoop.tmp.dir</name><value>/tmp/hadoop-${user.name}</value></property>
+<property><name>mapred.output.compression.codec</name><value>org.apache.hadoop.io.compress.DefaultCodec</value></property>
+<property><name>mapred.skip.attempts.to.start.skipping</name><value>2</value></property>
+<property><name>mapred.temp.dir</name><value>${hadoop.tmp.dir}/mapred/temp</value></property>
+<property><name>mapred.merge.recordsBeforeProgress</name><value>10000</value></property>
+<property><name>mapred.map.output.compression.codec</name><value>org.apache.hadoop.io.compress.DefaultCodec</value></property>
+<property><name>mapred.compress.map.output</name><value>false</value></property>
+<property><name>io.sort.spill.percent</name><value>0.80</value></property>
+<property><name>fs.checkpoint.edits.dir</name><value>${fs.checkpoint.dir}</value></property>
+<property><name>mapred.userlog.retain.hours</name><value>24</value></property>
+<property><name>mapred.system.dir</name><value>${hadoop.tmp.dir}/mapred/system</value></property>
+<property><name>mapred.line.input.format.linespermap</name><value>1</value></property>
+<property><name>job.end.retry.attempts</name><value>0</value></property>
+<property><name>ipc.client.idlethreshold</name><value>4000</value></property>
+<property><name>pregelix.vertexOutputFormatClass</name><value>edu.uci.ics.pregelix.example.maximalclique.MaximalCliqueVertex$MaximalCliqueVertexOutputFormat</value></property>
+<property><name>mapred.reduce.copy.backoff</name><value>300</value></property>
+<property><name>mapred.map.tasks.speculative.execution</name><value>true</value></property>
+<property><name>mapred.inmem.merge.threshold</name><value>1000</value></property>
+<property><name>hadoop.logfile.size</name><value>10000000</value></property>
+<property><name>pregelix.vertexInputFormatClass</name><value>edu.uci.ics.pregelix.example.maximalclique.TextMaximalCliqueInputFormat</value></property>
+<property><name>pregelix.aggregatorClass</name><value>edu.uci.ics.pregelix.example.maximalclique.MaximalCliqueAggregator</value></property>
+<property><name>mapred.job.queue.name</name><value>default</value></property>
+<property><name>mapred.job.tracker.persist.jobstatus.active</name><value>false</value></property>
+<property><name>pregelix.incStateLength</name><value>true</value></property>
+<property><name>mapred.reduce.slowstart.completed.maps</name><value>0.05</value></property>
+<property><name>topology.script.number.args</name><value>100</value></property>
+<property><name>mapred.skip.map.max.skip.records</name><value>0</value></property>
+<property><name>fs.ftp.impl</name><value>org.apache.hadoop.fs.ftp.FTPFileSystem</value></property>
+<property><name>mapred.task.cache.levels</name><value>2</value></property>
+<property><name>mapred.job.tracker.handler.count</name><value>10</value></property>
+<property><name>io.serializations</name><value>org.apache.hadoop.io.serializer.WritableSerialization</value></property>
+<property><name>ipc.client.connect.max.retries</name><value>10</value></property>
+<property><name>mapred.min.split.size</name><value>0</value></property>
+<property><name>mapred.map.max.attempts</name><value>4</value></property>
+<property><name>jobclient.output.filter</name><value>FAILED</value></property>
+<property><name>ipc.client.tcpnodelay</name><value>false</value></property>
+<property><name>mapred.acls.enabled</name><value>false</value></property>
+</configuration>
\ No newline at end of file
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java
index 0c09757..0a0a14f 100644
--- a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java
@@ -109,7 +109,7 @@
@Override
public void open(IHyracksTaskContext ctx, RecordDescriptor rd, IFrameWriter... writers)
throws HyracksDataException {
- this.conf = confFactory.createConfiguration();
+ this.conf = confFactory.createConfiguration(ctx);
this.dynamicStateLength = BspUtils.getDynamicVertexValueSize(conf);
this.aggregator = BspUtils.createGlobalAggregator(conf);
this.aggregator.init();
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/StartComputeUpdateFunctionFactory.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/StartComputeUpdateFunctionFactory.java
index 1bf6a2b..9998205 100644
--- a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/StartComputeUpdateFunctionFactory.java
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/StartComputeUpdateFunctionFactory.java
@@ -112,7 +112,7 @@
@Override
public void open(IHyracksTaskContext ctx, RecordDescriptor rd, IFrameWriter... writers)
throws HyracksDataException {
- this.conf = confFactory.createConfiguration();
+ this.conf = confFactory.createConfiguration(ctx);
this.dynamicStateLength = BspUtils.getDynamicVertexValueSize(conf);
this.aggregator = BspUtils.createGlobalAggregator(conf);
this.aggregator.init();
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AccumulatingAggregatorFactory.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AccumulatingAggregatorFactory.java
index 8f63b6e..851a83a 100644
--- a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AccumulatingAggregatorFactory.java
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AccumulatingAggregatorFactory.java
@@ -40,7 +40,7 @@
@SuppressWarnings("unchecked")
@Override
- public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDesc,
+ public IAggregatorDescriptor createAggregator(final IHyracksTaskContext ctx, RecordDescriptor inRecordDesc,
RecordDescriptor outRecordDescriptor, int[] aggKeys, int[] partialKeys) throws HyracksDataException {
return new IAggregatorDescriptor() {
@@ -113,7 +113,7 @@
for (int i = 0; i < agg.length; i++) {
aggOutput[i] = new ArrayBackedValueStorage();
try {
- agg[i] = aggFactories[i].createAggregateFunction(aggOutput[i]);
+ agg[i] = aggFactories[i].createAggregateFunction(ctx, aggOutput[i]);
} catch (Exception e) {
throw new IllegalStateException(e);
}
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AggregationFunction.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AggregationFunction.java
index 1813dcc..3cf46a2 100644
--- a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AggregationFunction.java
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AggregationFunction.java
@@ -26,6 +26,7 @@
import org.apache.hadoop.io.WritableComparable;
import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.common.comm.util.ByteBufferInputStream;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.FrameTupleReference;
@@ -53,9 +54,9 @@
private MsgList msgList = new MsgList();
private boolean keyRead = false;
- public AggregationFunction(IConfigurationFactory confFactory, DataOutput output, boolean isFinalStage,
- boolean partialAggAsInput) throws HyracksDataException {
- this.conf = confFactory.createConfiguration();
+ public AggregationFunction(IHyracksTaskContext ctx, IConfigurationFactory confFactory, DataOutput output,
+ boolean isFinalStage, boolean partialAggAsInput) throws HyracksDataException {
+ this.conf = confFactory.createConfiguration(ctx);
this.output = output;
this.isFinalStage = isFinalStage;
this.partialAggAsInput = partialAggAsInput;
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AggregationFunctionFactory.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AggregationFunctionFactory.java
index a09f688..7ce9e1d 100644
--- a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AggregationFunctionFactory.java
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/simpleagg/AggregationFunctionFactory.java
@@ -17,6 +17,7 @@
import java.io.DataOutput;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
import edu.uci.ics.pregelix.dataflow.base.IConfigurationFactory;
@@ -36,8 +37,9 @@
}
@Override
- public IAggregateFunction createAggregateFunction(IDataOutputProvider provider) throws HyracksException {
+ public IAggregateFunction createAggregateFunction(IHyracksTaskContext ctx, IDataOutputProvider provider)
+ throws HyracksException {
DataOutput output = provider.getDataOutput();
- return new AggregationFunction(confFactory, output, isFinalStage, partialAggAsInput);
+ return new AggregationFunction(ctx, confFactory, output, isFinalStage, partialAggAsInput);
}
}
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/DatatypeHelper.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/DatatypeHelper.java
similarity index 98%
rename from pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/DatatypeHelper.java
rename to pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/DatatypeHelper.java
index ee319c6..f9085c4 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/DatatypeHelper.java
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/DatatypeHelper.java
@@ -12,7 +12,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package edu.uci.ics.pregelix.core.util;
+package edu.uci.ics.pregelix.runtime.touchpoint;
import java.io.DataInput;
import java.io.DataOutput;
@@ -80,6 +80,7 @@
throw new HyracksDataException(e);
}
}
+
}
@SuppressWarnings({ "rawtypes", "unchecked" })
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/PreSuperStepRuntimeHookFactory.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/PreSuperStepRuntimeHookFactory.java
index 5f0ed9e..850ae1e 100644
--- a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/PreSuperStepRuntimeHookFactory.java
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/PreSuperStepRuntimeHookFactory.java
@@ -39,7 +39,7 @@
@Override
public void configure(IHyracksTaskContext ctx) throws HyracksDataException {
- Configuration conf = confFactory.createConfiguration();
+ Configuration conf = confFactory.createConfiguration(ctx);
IterationUtils.setProperties(giraphJobId, ctx, conf);
}
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/RuntimeHookFactory.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/RuntimeHookFactory.java
index c025f85..05b1542 100644
--- a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/RuntimeHookFactory.java
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/RuntimeHookFactory.java
@@ -44,9 +44,10 @@
@Override
public void configure(IHyracksTaskContext ctx) throws HyracksDataException {
- Configuration conf = confFactory.createConfiguration();
+ Configuration conf = confFactory.createConfiguration(ctx);
try {
TaskAttemptContext mapperContext = ctxFactory.createContext(conf, new TaskAttemptID());
+ mapperContext.getConfiguration().setClassLoader(ctx.getJobletContext().getClassLoader());
Vertex.setContext(mapperContext);
BspUtils.setDefaultConfiguration(conf);
} catch (Exception e) {
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/VertexIdPartitionComputerFactory.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/VertexIdPartitionComputerFactory.java
index 5eff497..5b4b1f0 100644
--- a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/VertexIdPartitionComputerFactory.java
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/VertexIdPartitionComputerFactory.java
@@ -24,20 +24,22 @@
import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.common.comm.util.ByteBufferInputStream;
+import edu.uci.ics.pregelix.dataflow.std.base.ISerializerDeserializerFactory;
public class VertexIdPartitionComputerFactory<K extends Writable, V extends Writable> implements
ITuplePartitionComputerFactory {
private static final long serialVersionUID = 1L;
- private final ISerializerDeserializer<K> keyIO;
+ private final ISerializerDeserializerFactory<K> keyIOFactory;
- public VertexIdPartitionComputerFactory(ISerializerDeserializer<K> keyIO) {
- this.keyIO = keyIO;
+ public VertexIdPartitionComputerFactory(ISerializerDeserializerFactory<K> keyIOFactory) {
+ this.keyIOFactory = keyIOFactory;
}
public ITuplePartitionComputer createPartitioner() {
return new ITuplePartitionComputer() {
private final ByteBufferInputStream bbis = new ByteBufferInputStream();
private final DataInputStream dis = new DataInputStream(bbis);
+ private final ISerializerDeserializer<K> keyIO = keyIOFactory.getSerializerDeserializer();
public int partition(IFrameTupleAccessor accessor, int tIndex, int nParts) throws HyracksDataException {
int keyStart = accessor.getTupleStartOffset(tIndex) + accessor.getFieldSlotsLength()
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/WritableSerializerDeserializerFactory.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/WritableSerializerDeserializerFactory.java
new file mode 100644
index 0000000..96f781c
--- /dev/null
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/touchpoint/WritableSerializerDeserializerFactory.java
@@ -0,0 +1,21 @@
+package edu.uci.ics.pregelix.runtime.touchpoint;
+
+import org.apache.hadoop.io.Writable;
+
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.pregelix.dataflow.std.base.ISerializerDeserializerFactory;
+
+public class WritableSerializerDeserializerFactory<T extends Writable> implements ISerializerDeserializerFactory<T> {
+ private static final long serialVersionUID = 1L;
+ private final Class<T> clazz;
+
+ public WritableSerializerDeserializerFactory(Class<T> clazz) {
+ this.clazz = clazz;
+ }
+
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ @Override
+ public ISerializerDeserializer getSerializerDeserializer() {
+ return DatatypeHelper.createSerializerDeserializer(clazz);
+ }
+}