Add Pregelix codebase
git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_staging@1960 123451ca-8445-de46-9d55-352943316053
diff --git a/pregelix-core/src/main/assembly/binary-assembly.xml b/pregelix-core/src/main/assembly/binary-assembly.xml
new file mode 100755
index 0000000..0500499
--- /dev/null
+++ b/pregelix-core/src/main/assembly/binary-assembly.xml
@@ -0,0 +1,19 @@
+<assembly>
+ <id>binary-assembly</id>
+ <formats>
+ <format>zip</format>
+ <format>dir</format>
+ </formats>
+ <includeBaseDirectory>false</includeBaseDirectory>
+ <fileSets>
+ <fileSet>
+ <directory>target/appassembler/bin</directory>
+ <outputDirectory>bin</outputDirectory>
+ <fileMode>0755</fileMode>
+ </fileSet>
+ <fileSet>
+ <directory>target/appassembler/lib</directory>
+ <outputDirectory>lib</outputDirectory>
+ </fileSet>
+ </fileSets>
+</assembly>
diff --git a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/base/IDriver.java b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/base/IDriver.java
new file mode 100644
index 0000000..0a169e0
--- /dev/null
+++ b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/base/IDriver.java
@@ -0,0 +1,14 @@
+package edu.uci.ics.pregelix.core.base;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.pregelix.api.job.PregelixJob;
+
+public interface IDriver {
+
+ public static enum Plan {
+ INNER_JOIN, OUTER_JOIN, OUTER_JOIN_SORT, OUTER_JOIN_SINGLE_SORT
+ }
+
+ public void runJob(PregelixJob job, Plan planChoice, String ipAddress, int port, boolean profiling)
+ throws HyracksException;
+}
diff --git a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/base/IJobGen.java b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/base/IJobGen.java
new file mode 100644
index 0000000..2a91e0b
--- /dev/null
+++ b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/base/IJobGen.java
@@ -0,0 +1,28 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.core.base;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+
+public interface IJobGen {
+
+ public JobSpecification generateCreatingJob() throws HyracksException;
+
+ public JobSpecification generateLoadingJob() throws HyracksException;
+
+ public JobSpecification generateJob(int iteration) throws HyracksException;
+
+}
diff --git a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/base/INormalizedKeyComputerFactoryProvider.java b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/base/INormalizedKeyComputerFactoryProvider.java
new file mode 100644
index 0000000..d839f2a
--- /dev/null
+++ b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/base/INormalizedKeyComputerFactoryProvider.java
@@ -0,0 +1,12 @@
+package edu.uci.ics.pregelix.core.base;
+
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+
+public interface INormalizedKeyComputerFactoryProvider {
+
+ @SuppressWarnings("rawtypes")
+ INormalizedKeyComputerFactory getAscINormalizedKeyComputerFactory(Class keyClass);
+
+ @SuppressWarnings("rawtypes")
+ INormalizedKeyComputerFactory getDescINormalizedKeyComputerFactory(Class keyClass);
+}
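A minimal sketch of an implementation of this provider (not part of this patch; the concrete NormalizedKeyComputerFactoryProvider used by the jobgen code appears elsewhere in the codebase). Returning null simply disables normalized-key acceleration, which Hyracks sort operators generally tolerate, falling back to the binary comparators alone:

    import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
    import edu.uci.ics.pregelix.core.base.INormalizedKeyComputerFactoryProvider;

    public class NoOpNormalizedKeyComputerFactoryProvider implements INormalizedKeyComputerFactoryProvider {

        @SuppressWarnings("rawtypes")
        @Override
        public INormalizedKeyComputerFactory getAscINormalizedKeyComputerFactory(Class keyClass) {
            // No normalized-key optimization; the comparator factories decide the order.
            return null;
        }

        @SuppressWarnings("rawtypes")
        @Override
        public INormalizedKeyComputerFactory getDescINormalizedKeyComputerFactory(Class keyClass) {
            return null;
        }
    }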
diff --git a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/data/TypeTraits.java b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/data/TypeTraits.java
new file mode 100644
index 0000000..8cb2a50
--- /dev/null
+++ b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/data/TypeTraits.java
@@ -0,0 +1,31 @@
+package edu.uci.ics.pregelix.core.data;
+
+import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
+
+public class TypeTraits implements ITypeTraits {
+
+ private static final long serialVersionUID = 1L;
+ private final int length;
+ private final boolean isFixedLength;
+
+ public TypeTraits(boolean isFixedLength) {
+ this.isFixedLength = isFixedLength;
+ this.length = 0;
+ }
+
+ public TypeTraits(int length) {
+ this.isFixedLength = true;
+ this.length = length;
+ }
+
+ @Override
+ public boolean isFixedLength() {
+ return isFixedLength;
+ }
+
+ @Override
+ public int getFixedLength() {
+ return length;
+ }
+
+}
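For reference, these traits describe the two variable-length B-tree fields (vertex id, vertex value) in the jobgen code later in this patch; a typical construction looks like:

    ITypeTraits[] typeTraits = new ITypeTraits[2];
    typeTraits[0] = new TypeTraits(false); // variable-length vertex id
    typeTraits[1] = new TypeTraits(false); // variable-length vertex value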
diff --git a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java
new file mode 100644
index 0000000..7076f7f
--- /dev/null
+++ b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java
@@ -0,0 +1,220 @@
+package edu.uci.ics.pregelix.core.driver;
+
+import java.io.File;
+import java.io.FilenameFilter;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.EnumSet;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.UUID;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+
+import edu.uci.ics.hyracks.api.client.HyracksConnection;
+import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.job.JobFlag;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.hadoop.compat.util.Utilities;
+import edu.uci.ics.pregelix.api.job.PregelixJob;
+import edu.uci.ics.pregelix.core.base.IDriver;
+import edu.uci.ics.pregelix.core.jobgen.JobGen;
+import edu.uci.ics.pregelix.core.jobgen.JobGenInnerJoin;
+import edu.uci.ics.pregelix.core.jobgen.JobGenOuterJoin;
+import edu.uci.ics.pregelix.core.jobgen.JobGenOuterJoinSingleSort;
+import edu.uci.ics.pregelix.core.jobgen.JobGenOuterJoinSort;
+import edu.uci.ics.pregelix.core.jobgen.clusterconfig.ClusterConfig;
+import edu.uci.ics.pregelix.dataflow.util.IterationUtils;
+
+@SuppressWarnings("rawtypes")
+public class Driver implements IDriver {
+ private static final Log LOG = LogFactory.getLog(Driver.class);
+ private static String LIB = "lib";
+ private JobGen jobGen;
+ private PregelixJob job;
+ private boolean profiling;
+
+ private String applicationName;
+ private String GIRAPH_HOME = "GIRAPH_HOME";
+ private IHyracksClientConnection hcc;
+
+ private Class exampleClass;
+
+ public Driver(Class exampleClass) {
+ this.exampleClass = exampleClass;
+ }
+
+ @Override
+ public void runJob(PregelixJob job, Plan planChoice, String ipAddress, int port, boolean profiling)
+ throws HyracksException {
+ LOG.info("job started");
+ long start = System.currentTimeMillis();
+ long end = start;
+ long time = 0;
+
+ this.job = job;
+ this.profiling = profiling;
+ try {
+ switch (planChoice) {
+ case INNER_JOIN:
+ jobGen = new JobGenInnerJoin(job);
+ break;
+ case OUTER_JOIN:
+ jobGen = new JobGenOuterJoin(job);
+ break;
+ case OUTER_JOIN_SORT:
+ jobGen = new JobGenOuterJoinSort(job);
+ break;
+ case OUTER_JOIN_SINGLE_SORT:
+ jobGen = new JobGenOuterJoinSingleSort(job);
+ break;
+ default:
+ jobGen = new JobGenInnerJoin(job);
+ }
+
+ if (hcc == null)
+ hcc = new HyracksConnection(ipAddress, port);
+ ClusterConfig.loadClusterConfig(ipAddress, port);
+
+ URLClassLoader classLoader = (URLClassLoader) exampleClass.getClassLoader();
+ URL[] urls = classLoader.getURLs();
+ String jarFile = "";
+ for (URL url : urls)
+ if (url.toString().endsWith(".jar"))
+ jarFile = url.getPath();
+
+ installApplication(jarFile);
+ FileSystem dfs = FileSystem.get(job.getConfiguration());
+ dfs.delete(FileOutputFormat.getOutputPath(job), true);
+
+ runCreate(jobGen);
+ runDataLoad(jobGen);
+ end = System.currentTimeMillis();
+ time = end - start;
+ LOG.info("data loading finished " + time + "ms");
+ int i = 1;
+ boolean terminate = false;
+ do {
+ start = System.currentTimeMillis();
+ runLoopBodyIteration(jobGen, i);
+ end = System.currentTimeMillis();
+ time = end - start;
+ LOG.info("iteration " + i + " finished " + time + "ms");
+ terminate = IterationUtils.readTerminationState(job.getConfiguration(), jobGen.getJobId());
+ i++;
+ } while (!terminate);
+
+ start = System.currentTimeMillis();
+ runHDFSWrite(jobGen);
+ runCleanup(jobGen);
+ destroyApplication(applicationName);
+ end = System.currentTimeMillis();
+ time = end - start;
+ LOG.info("result writing finished " + time + "ms");
+ LOG.info("job finished");
+ } catch (Exception e) {
+ throw new HyracksException(e);
+ }
+ }
+
+ private void runCreate(JobGen jobGen) throws Exception {
+ JobSpecification treeCreateSpec = jobGen.generateCreatingJob();
+ execute(treeCreateSpec);
+ }
+
+ private void runDataLoad(JobGen jobGen) throws Exception {
+ JobSpecification bulkLoadJobSpec = jobGen.generateLoadingJob();
+ execute(bulkLoadJobSpec);
+ }
+
+ private void runLoopBodyIteration(JobGen jobGen, int iteration) throws Exception {
+ JobSpecification loopBody = jobGen.generateJob(iteration);
+ execute(loopBody);
+ }
+
+ private void runHDFSWrite(JobGen jobGen) throws Exception {
+ JobSpecification scanSortPrintJobSpec = jobGen.scanIndexWriteGraph();
+ execute(scanSortPrintJobSpec);
+ }
+
+ private void runCleanup(JobGen jobGen) throws Exception {
+ JobSpecification[] cleanups = jobGen.generateCleanup();
+ runJobArray(cleanups);
+ }
+
+ private void runJobArray(JobSpecification[] jobs) throws Exception {
+ for (JobSpecification job : jobs) {
+ execute(job);
+ }
+ }
+
+ private void execute(JobSpecification job) throws Exception {
+ JobId jobId = hcc.startJob(applicationName, job,
+ profiling ? EnumSet.of(JobFlag.PROFILE_RUNTIME) : EnumSet.noneOf(JobFlag.class));
+ hcc.waitForCompletion(jobId);
+ }
+
+ public void installApplication(String jarFile) throws Exception {
+ LOG.info(jarFile);
+ applicationName = job.getJobName() + new UUID(System.currentTimeMillis(), System.nanoTime());
+ String home = System.getProperty(GIRAPH_HOME);
+ if (home == null)
+ home = "./";
+ String libDir = home + File.separator + LIB;
+ File dir = new File(libDir);
+ if (!dir.isDirectory()) {
+ throw new HyracksException(libDir + " is not a directory!");
+ }
+ LOG.info(dir.getAbsolutePath());
+ File[] libJars = dir.listFiles(new FileFilter("jar"));
+ Set<String> allJars = new TreeSet<String>();
+ allJars.add(jarFile);
+ for (File jar : libJars) {
+ allJars.add(jar.getAbsolutePath());
+ }
+ File appZip = Utilities.getHyracksArchive(applicationName, allJars);
+ hcc.createApplication(applicationName, appZip);
+ }
+
+ public void destroyApplication(String appName) throws Exception {
+ hcc.destroyApplication(appName);
+ }
+
+}
+
+class FileFilter implements FilenameFilter {
+ private String ext;
+
+ public FileFilter(String ext) {
+ this.ext = "." + ext;
+ }
+
+ public boolean accept(File dir, String name) {
+ return name.endsWith(ext);
+ }
+}
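A usage sketch of the Driver (not part of this patch). It assumes PregelixJob exposes a String-name constructor and a setVertexClass setter, and that PageRankVertex, the GIRAPH_HOME path, the HDFS paths, and the cluster controller host/port are all placeholders; the vertex input/output format setters are omitted for brevity:

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    import edu.uci.ics.pregelix.api.job.PregelixJob;
    import edu.uci.ics.pregelix.core.base.IDriver.Plan;
    import edu.uci.ics.pregelix.core.driver.Driver;

    public class RunPageRank {
        public static void main(String[] args) throws Exception {
            // installApplication() resolves the "lib" directory of jars from this property (defaults to "./").
            System.setProperty("GIRAPH_HOME", "/opt/pregelix/");

            // PageRankVertex is a placeholder for a user-defined vertex program shipped in the example jar.
            PregelixJob job = new PregelixJob("PageRank");
            job.setVertexClass(PageRankVertex.class);
            FileInputFormat.addInputPaths(job, "/webmap");
            FileOutputFormat.setOutputPath(job, new Path("/result"));

            // Cluster controller host/port are deployment-specific.
            Driver driver = new Driver(PageRankVertex.class);
            driver.runJob(job, Plan.OUTER_JOIN, "127.0.0.1", 3099, false);
        }
    }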
diff --git a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/hadoop/config/ConfigurationFactory.java b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/hadoop/config/ConfigurationFactory.java
new file mode 100644
index 0000000..85f3e6b
--- /dev/null
+++ b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/hadoop/config/ConfigurationFactory.java
@@ -0,0 +1,46 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.core.hadoop.config;
+
+import org.apache.hadoop.conf.Configuration;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.pregelix.api.util.SerDeUtils;
+import edu.uci.ics.pregelix.dataflow.base.IConfigurationFactory;
+
+public class ConfigurationFactory implements IConfigurationFactory {
+ private static final long serialVersionUID = 1L;
+ private final byte[] data;
+
+ public ConfigurationFactory(Configuration conf) {
+ try {
+ data = SerDeUtils.serialize(conf);
+ } catch (Exception e) {
+ throw new IllegalStateException(e);
+ }
+ }
+
+ @Override
+ public Configuration createConfiguration() throws HyracksDataException {
+ try {
+ Configuration conf = new Configuration();
+ SerDeUtils.deserialize(conf, data);
+ conf.setClassLoader(this.getClass().getClassLoader());
+ return conf;
+ } catch (Exception e) {
+ throw new HyracksDataException(e);
+ }
+ }
+}
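A small sketch (not part of this patch) of the intended round trip: the factory captures a serialized Hadoop Configuration on the client side and rebuilds it on a node controller, where the bytes travel inside an operator descriptor:

    import org.apache.hadoop.conf.Configuration;

    import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
    import edu.uci.ics.pregelix.core.hadoop.config.ConfigurationFactory;
    import edu.uci.ics.pregelix.dataflow.base.IConfigurationFactory;

    public class ConfRoundTrip {
        public static void main(String[] args) throws HyracksDataException {
            Configuration conf = new Configuration();
            conf.set("custom.key", "value");
            // Serialize on the client side ...
            IConfigurationFactory confFactory = new ConfigurationFactory(conf);
            // ... and rebuild on a worker; the class loader is reset so job classes resolve.
            Configuration rebuilt = confFactory.createConfiguration();
            System.out.println(rebuilt.get("custom.key")); // prints "value"
        }
    }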
diff --git a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/hadoop/data/Message.java b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/hadoop/data/Message.java
new file mode 100644
index 0000000..38f12fe
--- /dev/null
+++ b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/hadoop/data/Message.java
@@ -0,0 +1,78 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.core.hadoop.data;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.util.ReflectionUtils;
+
+import edu.uci.ics.pregelix.api.util.BspUtils;
+
+public class Message<I extends Writable, M extends Writable> implements Writable {
+ private I receiverId;
+ private M body;
+ private Configuration conf;
+
+ public Message() {
+ }
+
+ public Message(I receiverId, M body) {
+ this.receiverId = receiverId;
+ this.body = body;
+ }
+
+ public void setConf(Configuration conf) {
+ this.conf = conf;
+ }
+
+ public Configuration getConf() {
+ return conf;
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void readFields(DataInput input) throws IOException {
+ if (this.receiverId == null && this.body == null) {
+ setClass((Class<I>) BspUtils.getVertexIndexClass(getConf()),
+ (Class<M>) BspUtils.getMessageValueClass(getConf()));
+ }
+ receiverId.readFields(input);
+ body.readFields(input);
+ }
+
+ @Override
+ public void write(DataOutput output) throws IOException {
+ receiverId.write(output);
+ body.write(output);
+ }
+
+ public I getReceiverVertexId() {
+ return receiverId;
+ }
+
+ public M getMessageBody() {
+ return body;
+ }
+
+ private void setClass(Class<I> idClass, Class<M> bodyClass) {
+ receiverId = ReflectionUtils.newInstance(idClass, getConf());
+ body = ReflectionUtils.newInstance(bodyClass, getConf());
+ }
+
+}
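A short sketch (not part of this patch) of the wire format: a Message pairs a receiver vertex id with a message body, written back-to-back with plain Writable serialization. The LongWritable/DoubleWritable choice below is only illustrative:

    import java.io.ByteArrayOutputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    import org.apache.hadoop.io.DoubleWritable;
    import org.apache.hadoop.io.LongWritable;

    import edu.uci.ics.pregelix.core.hadoop.data.Message;

    public class MessageDemo {
        public static void main(String[] args) throws IOException {
            Message<LongWritable, DoubleWritable> msg =
                    new Message<LongWritable, DoubleWritable>(new LongWritable(42L), new DoubleWritable(0.85));
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            msg.write(new DataOutputStream(bos));
            // The receiver id and body are now encoded back-to-back in bos;
            // readFields() on the receiving side needs a Configuration with the
            // vertex index and message value classes set (see BspUtils).
            System.out.println(bos.size() + " bytes for receiver " + msg.getReceiverVertexId());
        }
    }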
diff --git a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/hadoop/data/MessageList.java b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/hadoop/data/MessageList.java
new file mode 100644
index 0000000..880e75c
--- /dev/null
+++ b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/hadoop/data/MessageList.java
@@ -0,0 +1,28 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.core.hadoop.data;
+
+import edu.uci.ics.pregelix.api.util.ArrayListWritable;
+
+@SuppressWarnings("rawtypes")
+public class MessageList extends ArrayListWritable<Message> {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void setClass() {
+ setClass(Message.class);
+ }
+
+}
diff --git a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
new file mode 100644
index 0000000..9291e94
--- /dev/null
+++ b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
@@ -0,0 +1,441 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.core.jobgen;
+
+import java.io.DataOutput;
+import java.io.File;
+import java.lang.reflect.Type;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import java.util.logging.Logger;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.mapreduce.InputSplit;
+
+import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.hadoop.data.WritableComparingBinaryComparatorFactory;
+import edu.uci.ics.hyracks.dataflow.hadoop.util.DatatypeHelper;
+import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningMergingConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.file.ConstantFileSplitProvider;
+import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
+import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.hyracks.dataflow.std.misc.ConstantTupleSourceOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeDataflowHelperFactory;
+import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndex;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexRegistryProvider;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexBulkLoadOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexCreateOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexDropOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackProvider;
+import edu.uci.ics.hyracks.storage.common.IStorageManagerInterface;
+import edu.uci.ics.pregelix.api.graph.Vertex;
+import edu.uci.ics.pregelix.api.io.VertexInputFormat;
+import edu.uci.ics.pregelix.api.job.PregelixJob;
+import edu.uci.ics.pregelix.api.util.BspUtils;
+import edu.uci.ics.pregelix.api.util.ReflectionUtils;
+import edu.uci.ics.pregelix.core.base.IJobGen;
+import edu.uci.ics.pregelix.core.data.TypeTraits;
+import edu.uci.ics.pregelix.core.hadoop.config.ConfigurationFactory;
+import edu.uci.ics.pregelix.core.jobgen.clusterconfig.ClusterConfig;
+import edu.uci.ics.pregelix.core.jobgen.provider.NormalizedKeyComputerFactoryProvider;
+import edu.uci.ics.pregelix.core.util.DataflowUtils;
+import edu.uci.ics.pregelix.dataflow.HDFSFileWriteOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.VertexFileScanOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.base.IConfigurationFactory;
+import edu.uci.ics.pregelix.dataflow.std.FileWriteOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.std.base.IRecordDescriptorFactory;
+import edu.uci.ics.pregelix.dataflow.std.base.IRuntimeHookFactory;
+import edu.uci.ics.pregelix.runtime.bootstrap.StorageManagerInterface;
+import edu.uci.ics.pregelix.runtime.bootstrap.TreeIndexRegistryProvider;
+import edu.uci.ics.pregelix.runtime.touchpoint.RuntimeHookFactory;
+import edu.uci.ics.pregelix.runtime.touchpoint.VertexIdPartitionComputerFactory;
+
+public abstract class JobGen implements IJobGen {
+ private static final Logger LOGGER = Logger.getLogger(JobGen.class.getName());
+ protected static final int MB = 1048576;
+ protected static final float DEFAULT_BTREE_FILL_FACTOR = 1.00f;
+ protected static final int frameSize = 65536;
+ protected static final int maxFrameSize = (int) (((long) 32 * MB) / frameSize);
+ protected static final int tableSize = 10485767;
+ protected static final String PRIMARY_INDEX = "primary";
+ protected final Configuration conf;
+ protected final PregelixJob giraphJob;
+ protected IIndexRegistryProvider<IIndex> treeRegistryProvider = TreeIndexRegistryProvider.INSTANCE;
+ protected IStorageManagerInterface storageManagerInterface = StorageManagerInterface.INSTANCE;
+ protected String jobId = new UUID(System.currentTimeMillis(), System.nanoTime()).toString();
+
+ protected static final String SECONDARY_INDEX_ODD = "secondary1";
+ protected static final String SECONDARY_INDEX_EVEN = "secondary2";
+
+ public JobGen(PregelixJob job) {
+ this.conf = job.getConfiguration();
+ this.giraphJob = job;
+ this.initJobConfiguration();
+ }
+
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ private void initJobConfiguration() {
+ Class vertexClass = conf.getClass(PregelixJob.VERTEX_CLASS, Vertex.class);
+ List<Type> parameterTypes = ReflectionUtils.getTypeArguments(Vertex.class, vertexClass);
+ Type vertexIndexType = parameterTypes.get(0);
+ Type vertexValueType = parameterTypes.get(1);
+ Type edgeValueType = parameterTypes.get(2);
+ Type messageValueType = parameterTypes.get(3);
+ conf.setClass(PregelixJob.VERTEX_INDEX_CLASS, (Class<?>) vertexIndexType, WritableComparable.class);
+ conf.setClass(PregelixJob.VERTEX_VALUE_CLASS, (Class<?>) vertexValueType, Writable.class);
+ conf.setClass(PregelixJob.EDGE_VALUE_CLASS, (Class<?>) edgeValueType, Writable.class);
+ conf.setClass(PregelixJob.MESSAGE_VALUE_CLASS, (Class<?>) messageValueType, Writable.class);
+ }
+
+ public String getJobId() {
+ return jobId;
+ }
+
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ @Override
+ public JobSpecification generateCreatingJob() throws HyracksException {
+ Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(conf);
+ JobSpecification spec = new JobSpecification();
+ ITypeTraits[] typeTraits = new ITypeTraits[2];
+ typeTraits[0] = new TypeTraits(false);
+ typeTraits[1] = new TypeTraits(false);
+ IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
+ comparatorFactories[0] = new WritableComparingBinaryComparatorFactory(WritableComparator.get(vertexIdClass)
+ .getClass());
+
+ IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, PRIMARY_INDEX);
+ TreeIndexCreateOperatorDescriptor btreeCreate = new TreeIndexCreateOperatorDescriptor(spec,
+ storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits, comparatorFactories,
+ new BTreeDataflowHelperFactory(), NoOpOperationCallbackProvider.INSTANCE);
+ ClusterConfig.setLocationConstraint(spec, btreeCreate);
+ return spec;
+ }
+
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ @Override
+ public JobSpecification generateLoadingJob() throws HyracksException {
+ Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(conf);
+ Class<? extends Writable> vertexClass = BspUtils.getVertexClass(conf);
+ JobSpecification spec = new JobSpecification();
+ IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, PRIMARY_INDEX);
+
+ /**
+ * The graph file scan operator. A count constraint is used for now; an
+ * absolute location constraint will be used later.
+ */
+ VertexInputFormat inputFormat = BspUtils.createVertexInputFormat(conf);
+ List<InputSplit> splits = new ArrayList<InputSplit>();
+ try {
+ splits = inputFormat.getSplits(giraphJob, fileSplitProvider.getFileSplits().length);
+ LOGGER.info("number of splits: " + splits.size());
+ for (InputSplit split : splits)
+ LOGGER.info(split.toString());
+ } catch (Exception e) {
+ throw new HyracksDataException(e);
+ }
+ RecordDescriptor recordDescriptor = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
+ vertexIdClass.getName(), vertexClass.getName());
+ IConfigurationFactory confFactory = new ConfigurationFactory(conf);
+ VertexFileScanOperatorDescriptor scanner = new VertexFileScanOperatorDescriptor(spec, recordDescriptor, splits,
+ confFactory);
+ ClusterConfig.setLocationConstraint(spec, scanner, splits);
+
+ /**
+ * construct sort operator
+ */
+ int[] sortFields = new int[1];
+ sortFields[0] = 0;
+ INormalizedKeyComputerFactory nkmFactory = NormalizedKeyComputerFactoryProvider.INSTANCE
+ .getAscINormalizedKeyComputerFactory(vertexIdClass);
+ IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
+ comparatorFactories[0] = new WritableComparingBinaryComparatorFactory(WritableComparator.get(vertexIdClass)
+ .getClass());
+ ExternalSortOperatorDescriptor sorter = new ExternalSortOperatorDescriptor(spec, maxFrameSize, sortFields,
+ nkmFactory, comparatorFactories, recordDescriptor);
+ ClusterConfig.setLocationConstraint(spec, sorter);
+
+ /**
+ * construct tree bulk load operator
+ */
+ int[] fieldPermutation = new int[2];
+ fieldPermutation[0] = 0;
+ fieldPermutation[1] = 1;
+ ITypeTraits[] typeTraits = new ITypeTraits[2];
+ typeTraits[0] = new TypeTraits(false);
+ typeTraits[1] = new TypeTraits(false);
+ TreeIndexBulkLoadOperatorDescriptor btreeBulkLoad = new TreeIndexBulkLoadOperatorDescriptor(spec,
+ storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits, comparatorFactories,
+ fieldPermutation, DEFAULT_BTREE_FILL_FACTOR, new BTreeDataflowHelperFactory(),
+ NoOpOperationCallbackProvider.INSTANCE);
+ ClusterConfig.setLocationConstraint(spec, btreeBulkLoad);
+
+ /**
+ * connect operator descriptors
+ */
+ ITuplePartitionComputerFactory hashPartitionComputerFactory = new VertexIdPartitionComputerFactory(
+ DatatypeHelper.createSerializerDeserializer(vertexIdClass));
+ spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), scanner, 0, sorter, 0);
+ spec.connect(new OneToOneConnectorDescriptor(spec), sorter, 0, btreeBulkLoad, 0);
+ return spec;
+ }
+
+ @Override
+ public JobSpecification generateJob(int iteration) throws HyracksException {
+ if (iteration <= 0)
+ throw new IllegalStateException("iteration number cannot be less than 1");
+ if (iteration == 1)
+ return generateFirstIteration(iteration);
+ else
+ return generateNonFirstIteration(iteration);
+ }
+
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ public JobSpecification scanSortPrintGraph(String nodeName, String path) throws HyracksException {
+ Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(conf);
+ Class<? extends Writable> vertexClass = BspUtils.getVertexClass(conf);
+ int maxFrameLimit = (int) (((long) 512 * MB) / frameSize);
+ JobSpecification spec = new JobSpecification();
+ IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, PRIMARY_INDEX);
+
+ /**
+ * The graph file scan operator. A count constraint is used for now; an
+ * absolute location constraint will be used later.
+ */
+ VertexInputFormat inputFormat = BspUtils.createVertexInputFormat(conf);
+ List<InputSplit> splits = new ArrayList<InputSplit>();
+ try {
+ splits = inputFormat.getSplits(giraphJob, fileSplitProvider.getFileSplits().length);
+ } catch (Exception e) {
+ throw new HyracksDataException(e);
+ }
+ RecordDescriptor recordDescriptor = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
+ vertexIdClass.getName(), vertexClass.getName());
+ IConfigurationFactory confFactory = new ConfigurationFactory(conf);
+ VertexFileScanOperatorDescriptor scanner = new VertexFileScanOperatorDescriptor(spec, recordDescriptor, splits,
+ confFactory);
+ PartitionConstraintHelper.addPartitionCountConstraint(spec, scanner, splits.size());
+
+ /**
+ * construct sort operator
+ */
+ int[] sortFields = new int[1];
+ sortFields[0] = 0;
+ INormalizedKeyComputerFactory nkmFactory = NormalizedKeyComputerFactoryProvider.INSTANCE
+ .getAscINormalizedKeyComputerFactory(vertexIdClass);
+ IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
+ comparatorFactories[0] = new WritableComparingBinaryComparatorFactory(WritableComparator.get(vertexIdClass)
+ .getClass());
+ ExternalSortOperatorDescriptor sorter = new ExternalSortOperatorDescriptor(spec, maxFrameLimit, sortFields,
+ nkmFactory, comparatorFactories, recordDescriptor);
+ PartitionConstraintHelper.addPartitionCountConstraint(spec, sorter, splits.size());
+
+ /**
+ * construct write file operator
+ */
+ FileSplit resultFile = new FileSplit(nodeName, new FileReference(new File(path)));
+ FileSplit[] results = new FileSplit[1];
+ results[0] = resultFile;
+ IFileSplitProvider resultFileSplitProvider = new ConstantFileSplitProvider(results);
+ IRuntimeHookFactory preHookFactory = new RuntimeHookFactory(confFactory);
+ IRecordDescriptorFactory inputRdFactory = DataflowUtils.getWritableRecordDescriptorFactoryFromWritableClasses(
+ vertexIdClass.getName(), vertexClass.getName());
+ FileWriteOperatorDescriptor writer = new FileWriteOperatorDescriptor(spec, inputRdFactory,
+ resultFileSplitProvider, preHookFactory, null);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, writer, new String[] { "nc1" });
+ PartitionConstraintHelper.addPartitionCountConstraint(spec, writer, 1);
+
+ /**
+ * connect operator descriptors
+ */
+ ITuplePartitionComputerFactory hashPartitionComputerFactory = new VertexIdPartitionComputerFactory(
+ DatatypeHelper.createSerializerDeserializer(vertexIdClass));
+ spec.connect(new OneToOneConnectorDescriptor(spec), scanner, 0, sorter, 0);
+ spec.connect(new MToNPartitioningMergingConnectorDescriptor(spec, hashPartitionComputerFactory, sortFields,
+ comparatorFactories), sorter, 0, writer, 0);
+ return spec;
+ }
+
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ public JobSpecification scanIndexPrintGraph(String nodeName, String path) throws HyracksException {
+ Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(conf);
+ Class<? extends Writable> vertexClass = BspUtils.getVertexClass(conf);
+ JobSpecification spec = new JobSpecification();
+
+ /**
+ * construct empty tuple operator
+ */
+ ArrayTupleBuilder tb = new ArrayTupleBuilder(2);
+ DataOutput dos = tb.getDataOutput();
+ tb.reset();
+ UTF8StringSerializerDeserializer.INSTANCE.serialize("0", dos);
+ tb.addFieldEndOffset();
+ ISerializerDeserializer[] keyRecDescSers = { UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE };
+ RecordDescriptor keyRecDesc = new RecordDescriptor(keyRecDescSers);
+ ConstantTupleSourceOperatorDescriptor emptyTupleSource = new ConstantTupleSourceOperatorDescriptor(spec,
+ keyRecDesc, tb.getFieldEndOffsets(), tb.getByteArray(), tb.getSize());
+ ClusterConfig.setLocationConstraint(spec, emptyTupleSource);
+
+ /**
+ * construct btree search operator
+ */
+ IConfigurationFactory confFactory = new ConfigurationFactory(conf);
+ RecordDescriptor recordDescriptor = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
+ vertexIdClass.getName(), vertexClass.getName());
+ IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
+ comparatorFactories[0] = new WritableComparingBinaryComparatorFactory(WritableComparator.get(vertexIdClass)
+ .getClass());
+ IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, PRIMARY_INDEX);
+ ITypeTraits[] typeTraits = new ITypeTraits[2];
+ typeTraits[0] = new TypeTraits(false);
+ typeTraits[1] = new TypeTraits(false);
+ BTreeSearchOperatorDescriptor scanner = new BTreeSearchOperatorDescriptor(spec, recordDescriptor,
+ storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits, comparatorFactories,
+ null, null, true, true, new BTreeDataflowHelperFactory(), false, NoOpOperationCallbackProvider.INSTANCE);
+ ClusterConfig.setLocationConstraint(spec, scanner);
+
+ /**
+ * construct write file operator
+ */
+ FileSplit resultFile = new FileSplit(nodeName, new FileReference(new File(path)));
+ FileSplit[] results = new FileSplit[1];
+ results[0] = resultFile;
+ IFileSplitProvider resultFileSplitProvider = new ConstantFileSplitProvider(results);
+ IRuntimeHookFactory preHookFactory = new RuntimeHookFactory(confFactory);
+ IRecordDescriptorFactory inputRdFactory = DataflowUtils.getWritableRecordDescriptorFactoryFromWritableClasses(
+ vertexIdClass.getName(), vertexClass.getName());
+ FileWriteOperatorDescriptor writer = new FileWriteOperatorDescriptor(spec, inputRdFactory,
+ resultFileSplitProvider, preHookFactory, null);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, writer, new String[] { "nc1" });
+ PartitionConstraintHelper.addPartitionCountConstraint(spec, writer, 1);
+
+ /**
+ * connect operator descriptors
+ */
+ int[] sortFields = new int[1];
+ sortFields[0] = 0;
+ ITuplePartitionComputerFactory hashPartitionComputerFactory = new VertexIdPartitionComputerFactory(
+ DatatypeHelper.createSerializerDeserializer(vertexIdClass));
+ spec.connect(new OneToOneConnectorDescriptor(spec), emptyTupleSource, 0, scanner, 0);
+ spec.connect(new MToNPartitioningMergingConnectorDescriptor(spec, hashPartitionComputerFactory, sortFields,
+ comparatorFactories), scanner, 0, writer, 0);
+ spec.setFrameSize(frameSize);
+ return spec;
+ }
+
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ public JobSpecification scanIndexWriteGraph() throws HyracksException {
+ Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(conf);
+ Class<? extends Writable> vertexClass = BspUtils.getVertexClass(conf);
+ JobSpecification spec = new JobSpecification();
+
+ /**
+ * construct empty tuple operator
+ */
+ ArrayTupleBuilder tb = new ArrayTupleBuilder(2);
+ DataOutput dos = tb.getDataOutput();
+ tb.reset();
+ UTF8StringSerializerDeserializer.INSTANCE.serialize("0", dos);
+ tb.addFieldEndOffset();
+ ISerializerDeserializer[] keyRecDescSers = { UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE };
+ RecordDescriptor keyRecDesc = new RecordDescriptor(keyRecDescSers);
+ ConstantTupleSourceOperatorDescriptor emptyTupleSource = new ConstantTupleSourceOperatorDescriptor(spec,
+ keyRecDesc, tb.getFieldEndOffsets(), tb.getByteArray(), tb.getSize());
+ ClusterConfig.setLocationConstraint(spec, emptyTupleSource);
+
+ /**
+ * construct btree search operator
+ */
+ IConfigurationFactory confFactory = new ConfigurationFactory(conf);
+ RecordDescriptor recordDescriptor = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
+ vertexIdClass.getName(), vertexClass.getName());
+ IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
+ comparatorFactories[0] = new WritableComparingBinaryComparatorFactory(WritableComparator.get(vertexIdClass)
+ .getClass());
+ IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, PRIMARY_INDEX);
+
+ ITypeTraits[] typeTraits = new ITypeTraits[2];
+ typeTraits[0] = new TypeTraits(false);
+ typeTraits[1] = new TypeTraits(false);
+ BTreeSearchOperatorDescriptor scanner = new BTreeSearchOperatorDescriptor(spec, recordDescriptor,
+ storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits, comparatorFactories,
+ null, null, true, true, new BTreeDataflowHelperFactory(), false, NoOpOperationCallbackProvider.INSTANCE);
+ ClusterConfig.setLocationConstraint(spec, scanner);
+
+ /**
+ * construct write file operator
+ */
+ IRecordDescriptorFactory inputRdFactory = DataflowUtils.getWritableRecordDescriptorFactoryFromWritableClasses(
+ vertexIdClass.getName(), vertexClass.getName());
+ HDFSFileWriteOperatorDescriptor writer = new HDFSFileWriteOperatorDescriptor(spec, confFactory, inputRdFactory);
+ ClusterConfig.setLocationConstraint(spec, writer);
+
+ /**
+ * connect operator descriptors
+ */
+ spec.connect(new OneToOneConnectorDescriptor(spec), emptyTupleSource, 0, scanner, 0);
+ spec.connect(new OneToOneConnectorDescriptor(spec), scanner, 0, writer, 0);
+ return spec;
+ }
+
+ /**
+ * drop the index with the given name
+ *
+ * @return JobSpecification
+ * @throws HyracksException
+ */
+ protected JobSpecification dropIndex(String indexName) throws HyracksException {
+ JobSpecification spec = new JobSpecification();
+
+ IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, indexName);
+ TreeIndexDropOperatorDescriptor drop = new TreeIndexDropOperatorDescriptor(spec, storageManagerInterface,
+ treeRegistryProvider, fileSplitProvider);
+
+ ClusterConfig.setLocationConstraint(spec, drop);
+ spec.addRoot(drop);
+ return spec;
+ }
+
+ /** generate non-first iteration job */
+ protected abstract JobSpecification generateNonFirstIteration(int iteration) throws HyracksException;
+
+ /** generate first iteration job */
+ protected abstract JobSpecification generateFirstIteration(int iteration) throws HyracksException;
+
+ /** generate clean-up job */
+ public abstract JobSpecification[] generateCleanup() throws HyracksException;
+
+}
\ No newline at end of file
diff --git a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
new file mode 100644
index 0000000..12e1cd7
--- /dev/null
+++ b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
@@ -0,0 +1,418 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.core.jobgen;
+
+import org.apache.hadoop.io.VLongWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableComparator;
+
+import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.hadoop.data.WritableComparingBinaryComparatorFactory;
+import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningMergingConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
+import edu.uci.ics.hyracks.dataflow.std.group.preclustered.PreclusteredGroupOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeDataflowHelperFactory;
+import edu.uci.ics.hyracks.storage.am.btree.frames.BTreeNSMInteriorFrameFactory;
+import edu.uci.ics.hyracks.storage.am.btree.frames.BTreeNSMLeafFrameFactory;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
+import edu.uci.ics.hyracks.storage.am.common.tuples.TypeAwareTupleWriterFactory;
+import edu.uci.ics.pregelix.api.graph.MsgList;
+import edu.uci.ics.pregelix.api.job.PregelixJob;
+import edu.uci.ics.pregelix.api.util.BspUtils;
+import edu.uci.ics.pregelix.core.data.TypeTraits;
+import edu.uci.ics.pregelix.core.hadoop.config.ConfigurationFactory;
+import edu.uci.ics.pregelix.core.hadoop.data.MessageList;
+import edu.uci.ics.pregelix.core.jobgen.clusterconfig.ClusterConfig;
+import edu.uci.ics.pregelix.core.util.DataflowUtils;
+import edu.uci.ics.pregelix.dataflow.ConnectorPolicyAssignmentPolicy;
+import edu.uci.ics.pregelix.dataflow.EmptySinkOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.EmptyTupleSourceOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.MaterializingReadOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.MaterializingWriteOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.TerminationStateWriterOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.base.IConfigurationFactory;
+import edu.uci.ics.pregelix.dataflow.std.BTreeSearchFunctionUpdateOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.std.IndexNestedLoopJoinFunctionUpdateOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.std.IndexNestedLoopJoinOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.std.RuntimeHookOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.std.TreeIndexBulkReLoadOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.std.base.IRecordDescriptorFactory;
+import edu.uci.ics.pregelix.dataflow.std.base.IRuntimeHookFactory;
+import edu.uci.ics.pregelix.runtime.function.ComputeUpdateFunctionFactory;
+import edu.uci.ics.pregelix.runtime.function.StartComputeUpdateFunctionFactory;
+import edu.uci.ics.pregelix.runtime.touchpoint.MergePartitionComputerFactory;
+import edu.uci.ics.pregelix.runtime.touchpoint.PostSuperStepRuntimeHookFactory;
+import edu.uci.ics.pregelix.runtime.touchpoint.PreSuperStepRuntimeHookFactory;
+import edu.uci.ics.pregelix.runtime.touchpoint.RuntimeHookFactory;
+import edu.uci.ics.pregelix.runtime.touchpoint.VertexIdPartitionComputerFactory;
+
+public class JobGenInnerJoin extends JobGen {
+
+ public JobGenInnerJoin(PregelixJob job) {
+ super(job);
+ }
+
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ protected JobSpecification generateFirstIteration(int iteration) throws HyracksException {
+ Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(conf);
+ Class<? extends Writable> vertexClass = BspUtils.getVertexClass(conf);
+ Class<? extends Writable> messageValueClass = BspUtils.getMessageValueClass(conf);
+ IConfigurationFactory confFactory = new ConfigurationFactory(conf);
+ JobSpecification spec = new JobSpecification();
+
+ /**
+ * construct empty tuple operator
+ */
+ EmptyTupleSourceOperatorDescriptor emptyTupleSource = new EmptyTupleSourceOperatorDescriptor(spec);
+ ClusterConfig.setLocationConstraint(spec, emptyTupleSource);
+
+ /** construct runtime hook */
+ RuntimeHookOperatorDescriptor preSuperStep = new RuntimeHookOperatorDescriptor(spec,
+ new PreSuperStepRuntimeHookFactory(jobId, confFactory));
+ ClusterConfig.setLocationConstraint(spec, preSuperStep);
+
+ /**
+ * construct drop index operator
+ */
+ IFileSplitProvider secondaryFileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, SECONDARY_INDEX_ODD);
+ IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, PRIMARY_INDEX);
+
+ /**
+ * construct btree search operator
+ */
+ RecordDescriptor recordDescriptor = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
+ vertexIdClass.getName(), vertexClass.getName());
+ IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
+ comparatorFactories[0] = new WritableComparingBinaryComparatorFactory(WritableComparator.get(vertexIdClass)
+ .getClass());
+
+ ITypeTraits[] typeTraits = new ITypeTraits[2];
+ typeTraits[0] = new TypeTraits(false);
+ typeTraits[1] = new TypeTraits(false);
+ ITreeIndexFrameFactory interiorFrameFactory = new BTreeNSMInteriorFrameFactory(new TypeAwareTupleWriterFactory(
+ typeTraits));
+ ITreeIndexFrameFactory leafFrameFactory = new BTreeNSMLeafFrameFactory(new TypeAwareTupleWriterFactory(
+ typeTraits));
+
+ /**
+ * construct compute operator
+ */
+ RecordDescriptor rdDummy = DataflowUtils.getRecordDescriptorFromWritableClasses(VLongWritable.class.getName());
+ RecordDescriptor rdMessage = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
+ MessageList.class.getName());
+ IConfigurationFactory configurationFactory = new ConfigurationFactory(conf);
+ IRuntimeHookFactory preHookFactory = new RuntimeHookFactory(configurationFactory);
+ IRecordDescriptorFactory inputRdFactory = DataflowUtils.getWritableRecordDescriptorFactoryFromWritableClasses(
+ vertexIdClass.getName(), vertexClass.getName());
+ RecordDescriptor rdFinal = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
+ MsgList.class.getName());
+
+ BTreeSearchFunctionUpdateOperatorDescriptor scanner = new BTreeSearchFunctionUpdateOperatorDescriptor(spec,
+ recordDescriptor, storageManagerInterface, treeRegistryProvider, fileSplitProvider,
+ interiorFrameFactory, leafFrameFactory, typeTraits, comparatorFactories,
+ JobGenUtil.getForwardScan(iteration), null, null, true, true, new BTreeDataflowHelperFactory(),
+ inputRdFactory, 3, StartComputeUpdateFunctionFactory.INSTANCE, preHookFactory, null, rdMessage,
+ rdDummy, rdFinal);
+ ClusterConfig.setLocationConstraint(spec, scanner);
+
+ /**
+ * termination state write operator
+ */
+ TerminationStateWriterOperatorDescriptor terminateWriter = new TerminationStateWriterOperatorDescriptor(spec,
+ configurationFactory, jobId);
+ PartitionConstraintHelper.addPartitionCountConstraint(spec, terminateWriter, 1);
+
+ /**
+ * construct bulk-load index operator
+ */
+ int[] fieldPermutation = new int[] { 0, 1 };
+ IBinaryComparatorFactory[] indexCmpFactories = new IBinaryComparatorFactory[1];
+ indexCmpFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration + 1,
+ WritableComparator.get(vertexIdClass).getClass());
+ TreeIndexBulkReLoadOperatorDescriptor btreeBulkLoad = new TreeIndexBulkReLoadOperatorDescriptor(spec,
+ storageManagerInterface, treeRegistryProvider, secondaryFileSplitProvider, interiorFrameFactory,
+ leafFrameFactory, typeTraits, indexCmpFactories, fieldPermutation, DEFAULT_BTREE_FILL_FACTOR,
+ new BTreeDataflowHelperFactory());
+ ClusterConfig.setLocationConstraint(spec, btreeBulkLoad);
+
+ /**
+ * construct unnest operator
+ */
+ RecordDescriptor rdUnnestedMessage = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
+ vertexIdClass.getName(), messageValueClass.getName());
+ /**
+ * construct local sort operator
+ */
+ int[] keyFields = new int[] { 0 };
+ INormalizedKeyComputerFactory nkmFactory = JobGenUtil
+ .getINormalizedKeyComputerFactory(iteration, vertexIdClass);
+ IBinaryComparatorFactory[] sortCmpFactories = new IBinaryComparatorFactory[1];
+ sortCmpFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration, WritableComparator.get(vertexIdClass)
+ .getClass());
+ ExternalSortOperatorDescriptor localSort = new ExternalSortOperatorDescriptor(spec, maxFrameSize, keyFields,
+ nkmFactory, sortCmpFactories, rdUnnestedMessage);
+ ClusterConfig.setLocationConstraint(spec, localSort);
+
+ /**
+ * construct local pre-clustered group-by operator
+ */
+ IAggregatorDescriptorFactory aggregatorFactory = DataflowUtils.getAccumulatingAggregatorFactory(conf, false);
+ PreclusteredGroupOperatorDescriptor localGby = new PreclusteredGroupOperatorDescriptor(spec, keyFields,
+ sortCmpFactories, aggregatorFactory, rdUnnestedMessage);
+ ClusterConfig.setLocationConstraint(spec, localGby);
+
+ /**
+ * construct global group-by operator
+ */
+ IAggregatorDescriptorFactory aggregatorFactoryFinal = DataflowUtils
+ .getAccumulatingAggregatorFactory(conf, true);
+ PreclusteredGroupOperatorDescriptor globalGby = new PreclusteredGroupOperatorDescriptor(spec, keyFields,
+ sortCmpFactories, aggregatorFactoryFinal, rdFinal);
+ ClusterConfig.setLocationConstraint(spec, globalGby);
+
+ /**
+ * construct the materializing write operator
+ */
+ MaterializingWriteOperatorDescriptor materialize = new MaterializingWriteOperatorDescriptor(spec, rdFinal);
+ ClusterConfig.setLocationConstraint(spec, materialize);
+
+ /**
+ * do pre- & post- super step
+ */
+ RuntimeHookOperatorDescriptor postSuperStep = new RuntimeHookOperatorDescriptor(spec,
+ new PostSuperStepRuntimeHookFactory(jobId));
+ ClusterConfig.setLocationConstraint(spec, postSuperStep);
+
+ /** construct empty sink operator */
+ EmptySinkOperatorDescriptor emptySink = new EmptySinkOperatorDescriptor(spec);
+ ClusterConfig.setLocationConstraint(spec, emptySink);
+
+ ITuplePartitionComputerFactory partionFactory = new VertexIdPartitionComputerFactory(
+ rdUnnestedMessage.getFields()[0]);
+ ITuplePartitionComputerFactory hashPartitionComputerFactory = new MergePartitionComputerFactory();
+ /** connect all operators **/
+ spec.connect(new OneToOneConnectorDescriptor(spec), emptyTupleSource, 0, preSuperStep, 0);
+ spec.connect(new OneToOneConnectorDescriptor(spec), preSuperStep, 0, scanner, 0);
+ spec.connect(new OneToOneConnectorDescriptor(spec), scanner, 0, localSort, 0);
+ spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), scanner, 1,
+ terminateWriter, 0);
+ spec.connect(new OneToOneConnectorDescriptor(spec), scanner, 2, btreeBulkLoad, 0);
+ spec.connect(new OneToOneConnectorDescriptor(spec), localSort, 0, localGby, 0);
+ spec.connect(new MToNPartitioningMergingConnectorDescriptor(spec, partionFactory, keyFields, sortCmpFactories),
+ localGby, 0, globalGby, 0);
+ spec.connect(new OneToOneConnectorDescriptor(spec), globalGby, 0, materialize, 0);
+ spec.connect(new OneToOneConnectorDescriptor(spec), materialize, 0, postSuperStep, 0);
+ spec.connect(new OneToOneConnectorDescriptor(spec), postSuperStep, 0, emptySink, 0);
+
+ spec.addRoot(emptySink);
+ spec.addRoot(btreeBulkLoad);
+ spec.addRoot(terminateWriter);
+
+ spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
+ return spec;
+ }
+
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ @Override
+ protected JobSpecification generateNonFirstIteration(int iteration) throws HyracksException {
+ Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(conf);
+ Class<? extends Writable> vertexClass = BspUtils.getVertexClass(conf);
+ Class<? extends Writable> messageValueClass = BspUtils.getMessageValueClass(conf);
+ JobSpecification spec = new JobSpecification();
+
+ /**
+ * source aggregate
+ */
+ int[] keyFields = new int[] { 0 };
+ RecordDescriptor rdUnnestedMessage = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
+ vertexIdClass.getName(), messageValueClass.getName());
+ IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
+ comparatorFactories[0] = new WritableComparingBinaryComparatorFactory(WritableComparator.get(vertexIdClass)
+ .getClass());
+ RecordDescriptor rdFinal = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
+ MsgList.class.getName());
+
+ /**
+ * construct empty tuple operator
+ */
+ EmptyTupleSourceOperatorDescriptor emptyTupleSource = new EmptyTupleSourceOperatorDescriptor(spec);
+ ClusterConfig.setLocationConstraint(spec, emptyTupleSource);
+
+ /**
+ * construct pre-superstep
+ */
+ IConfigurationFactory confFactory = new ConfigurationFactory(conf);
+ RuntimeHookOperatorDescriptor preSuperStep = new RuntimeHookOperatorDescriptor(spec,
+ new PreSuperStepRuntimeHookFactory(jobId, confFactory));
+ ClusterConfig.setLocationConstraint(spec, preSuperStep);
+
+ /**
+ * construct the materializing write operator
+ */
+ MaterializingReadOperatorDescriptor materializeRead = new MaterializingReadOperatorDescriptor(spec, rdFinal);
+ ClusterConfig.setLocationConstraint(spec, materializeRead);
+
+ /**
+ * construct the index-set-union operator
+ */
+ String readFile = iteration % 2 == 0 ? SECONDARY_INDEX_ODD : SECONDARY_INDEX_EVEN;
+ IFileSplitProvider secondaryFileSplitProviderRead = ClusterConfig.getFileSplitProvider(jobId, readFile);
+
+ ITypeTraits[] typeTraits = new ITypeTraits[2];
+ typeTraits[0] = new TypeTraits(false);
+ typeTraits[1] = new TypeTraits(false);
+ ITreeIndexFrameFactory interiorFrameFactory = new BTreeNSMInteriorFrameFactory(new TypeAwareTupleWriterFactory(
+ typeTraits));
+ ITreeIndexFrameFactory leafFrameFactory = new BTreeNSMLeafFrameFactory(new TypeAwareTupleWriterFactory(
+ typeTraits));
+ IndexNestedLoopJoinOperatorDescriptor setUnion = new IndexNestedLoopJoinOperatorDescriptor(spec, rdFinal,
+ storageManagerInterface, treeRegistryProvider, secondaryFileSplitProviderRead, interiorFrameFactory,
+ leafFrameFactory, typeTraits, comparatorFactories, true, keyFields, keyFields, true, true,
+ new BTreeDataflowHelperFactory(), true);
+ ClusterConfig.setLocationConstraint(spec, setUnion);
+
+ /**
+ * construct index-join-function-update operator
+ */
+ IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, PRIMARY_INDEX);
+ RecordDescriptor rdDummy = DataflowUtils.getRecordDescriptorFromWritableClasses(VLongWritable.class.getName());
+ RecordDescriptor rdMessage = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
+ MessageList.class.getName());
+ IConfigurationFactory configurationFactory = new ConfigurationFactory(conf);
+ IRuntimeHookFactory preHookFactory = new RuntimeHookFactory(configurationFactory);
+ IRecordDescriptorFactory inputRdFactory = DataflowUtils.getWritableRecordDescriptorFactoryFromWritableClasses(
+ vertexIdClass.getName(), MsgList.class.getName(), vertexIdClass.getName(), vertexClass.getName());
+
+ IndexNestedLoopJoinFunctionUpdateOperatorDescriptor join = new IndexNestedLoopJoinFunctionUpdateOperatorDescriptor(
+ spec, storageManagerInterface, treeRegistryProvider, fileSplitProvider, interiorFrameFactory,
+ leafFrameFactory, typeTraits, comparatorFactories, JobGenUtil.getForwardScan(iteration), keyFields,
+ keyFields, true, true, new BTreeDataflowHelperFactory(), inputRdFactory, 3,
+ ComputeUpdateFunctionFactory.INSTANCE, preHookFactory, null, rdMessage, rdDummy, rdFinal);
+ ClusterConfig.setLocationConstraint(spec, join);
+
+ /**
+ * construct bulk-load index operator
+ */
+ int[] fieldPermutation = new int[] { 0, 1 };
+ IBinaryComparatorFactory[] indexCmpFactories = new IBinaryComparatorFactory[1];
+ indexCmpFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration + 1,
+ WritableComparator.get(vertexIdClass).getClass());
+ String writeFile = iteration % 2 == 0 ? SECONDARY_INDEX_EVEN : SECONDARY_INDEX_ODD;
+ IFileSplitProvider secondaryFileSplitProviderWrite = ClusterConfig.getFileSplitProvider(jobId, writeFile);
+ TreeIndexBulkReLoadOperatorDescriptor btreeBulkLoad = new TreeIndexBulkReLoadOperatorDescriptor(spec,
+ storageManagerInterface, treeRegistryProvider, secondaryFileSplitProviderWrite, interiorFrameFactory,
+ leafFrameFactory, typeTraits, indexCmpFactories, fieldPermutation, DEFAULT_BTREE_FILL_FACTOR,
+ new BTreeDataflowHelperFactory());
+ ClusterConfig.setLocationConstraint(spec, btreeBulkLoad);
+
+ /**
+ * construct local sort operator
+ */
+ INormalizedKeyComputerFactory nkmFactory = JobGenUtil
+ .getINormalizedKeyComputerFactory(iteration, vertexIdClass);
+ IBinaryComparatorFactory[] sortCmpFactories = new IBinaryComparatorFactory[1];
+ sortCmpFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration, WritableComparator.get(vertexIdClass)
+ .getClass());
+ ExternalSortOperatorDescriptor localSort = new ExternalSortOperatorDescriptor(spec, maxFrameSize, keyFields,
+ nkmFactory, sortCmpFactories, rdUnnestedMessage);
+ ClusterConfig.setLocationConstraint(spec, localSort);
+
+ /**
+ * construct local pre-clustered group-by operator
+ */
+ IAggregatorDescriptorFactory aggregatorFactory = DataflowUtils.getAccumulatingAggregatorFactory(conf, false);
+ PreclusteredGroupOperatorDescriptor localGby = new PreclusteredGroupOperatorDescriptor(spec, keyFields,
+ sortCmpFactories, aggregatorFactory, rdUnnestedMessage);
+ ClusterConfig.setLocationConstraint(spec, localGby);
+
+ /**
+ * construct global group-by operator
+ */
+ IAggregatorDescriptorFactory aggregatorFactoryFinal = DataflowUtils
+ .getAccumulatingAggregatorFactory(conf, true);
+ PreclusteredGroupOperatorDescriptor globalGby = new PreclusteredGroupOperatorDescriptor(spec, keyFields,
+ sortCmpFactories, aggregatorFactoryFinal, rdFinal);
+ ClusterConfig.setLocationConstraint(spec, globalGby);
+
+ /**
+ * construct the materializing write operator
+ */
+ MaterializingWriteOperatorDescriptor materialize = new MaterializingWriteOperatorDescriptor(spec, rdFinal);
+ ClusterConfig.setLocationConstraint(spec, materialize);
+
+ /** construct post-superstep runtime hook */
+ RuntimeHookOperatorDescriptor postSuperStep = new RuntimeHookOperatorDescriptor(spec,
+ new PostSuperStepRuntimeHookFactory(jobId));
+ ClusterConfig.setLocationConstraint(spec, postSuperStep);
+
+ /** construct empty sink operator */
+ EmptySinkOperatorDescriptor emptySink = new EmptySinkOperatorDescriptor(spec);
+ ClusterConfig.setLocationConstraint(spec, emptySink);
+
+ /**
+ * termination state write operator
+ */
+ TerminationStateWriterOperatorDescriptor terminateWriter = new TerminationStateWriterOperatorDescriptor(spec,
+ configurationFactory, jobId);
+ PartitionConstraintHelper.addPartitionCountConstraint(spec, terminateWriter, 1);
+ ITuplePartitionComputerFactory hashPartitionComputerFactory = new MergePartitionComputerFactory();
+
+ ITuplePartitionComputerFactory partionFactory = new VertexIdPartitionComputerFactory(
+ rdUnnestedMessage.getFields()[0]);
+ /** connect all operators **/
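+ /*
+ * messages from the compute function (output 0) are sorted and grouped
+ * locally, repartitioned by vertex id with a merging connector, grouped
+ * again globally and materialized for the next superstep
+ */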
+ spec.connect(new OneToOneConnectorDescriptor(spec), emptyTupleSource, 0, preSuperStep, 0);
+ spec.connect(new OneToOneConnectorDescriptor(spec), preSuperStep, 0, materializeRead, 0);
+ spec.connect(new OneToOneConnectorDescriptor(spec), materializeRead, 0, setUnion, 0);
+ spec.connect(new OneToOneConnectorDescriptor(spec), setUnion, 0, join, 0);
+ spec.connect(new OneToOneConnectorDescriptor(spec), join, 0, localSort, 0);
+ spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), join, 1,
+ terminateWriter, 0);
+ spec.connect(new OneToOneConnectorDescriptor(spec), join, 2, btreeBulkLoad, 0);
+ spec.connect(new OneToOneConnectorDescriptor(spec), localSort, 0, localGby, 0);
+ spec.connect(new MToNPartitioningMergingConnectorDescriptor(spec, partionFactory, keyFields, sortCmpFactories),
+ localGby, 0, globalGby, 0);
+ spec.connect(new OneToOneConnectorDescriptor(spec), globalGby, 0, materialize, 0);
+ spec.connect(new OneToOneConnectorDescriptor(spec), materialize, 0, postSuperStep, 0);
+ spec.connect(new OneToOneConnectorDescriptor(spec), postSuperStep, 0, emptySink, 0);
+
+ spec.addRoot(emptySink);
+ spec.addRoot(btreeBulkLoad);
+ spec.addRoot(terminateWriter);
+
+ spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
+ return spec;
+ }
+
+ @Override
+ public JobSpecification[] generateCleanup() throws HyracksException {
+ JobSpecification[] cleanups = new JobSpecification[3];
+ cleanups[0] = this.dropIndex(PRIMARY_INDEX);
+ cleanups[1] = this.dropIndex(SECONDARY_INDEX_ODD);
+ cleanups[2] = this.dropIndex(SECONDARY_INDEX_EVEN);
+ return cleanups;
+ }
+}
diff --git a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java
new file mode 100644
index 0000000..e1944eb
--- /dev/null
+++ b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java
@@ -0,0 +1,366 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.core.jobgen;
+
+import org.apache.hadoop.io.VLongWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableComparator;
+
+import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.INullWriterFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.hadoop.data.WritableComparingBinaryComparatorFactory;
+import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningMergingConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
+import edu.uci.ics.hyracks.dataflow.std.group.preclustered.PreclusteredGroupOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeDataflowHelperFactory;
+import edu.uci.ics.hyracks.storage.am.btree.frames.BTreeNSMInteriorFrameFactory;
+import edu.uci.ics.hyracks.storage.am.btree.frames.BTreeNSMLeafFrameFactory;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
+import edu.uci.ics.hyracks.storage.am.common.tuples.TypeAwareTupleWriterFactory;
+import edu.uci.ics.pregelix.api.graph.MsgList;
+import edu.uci.ics.pregelix.api.job.PregelixJob;
+import edu.uci.ics.pregelix.api.util.BspUtils;
+import edu.uci.ics.pregelix.core.data.TypeTraits;
+import edu.uci.ics.pregelix.core.hadoop.config.ConfigurationFactory;
+import edu.uci.ics.pregelix.core.hadoop.data.MessageList;
+import edu.uci.ics.pregelix.core.jobgen.clusterconfig.ClusterConfig;
+import edu.uci.ics.pregelix.core.util.DataflowUtils;
+import edu.uci.ics.pregelix.dataflow.ConnectorPolicyAssignmentPolicy;
+import edu.uci.ics.pregelix.dataflow.EmptySinkOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.EmptyTupleSourceOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.MaterializingReadOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.MaterializingWriteOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.TerminationStateWriterOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.base.IConfigurationFactory;
+import edu.uci.ics.pregelix.dataflow.std.BTreeSearchFunctionUpdateOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.std.IndexNestedLoopJoinFunctionUpdateOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.std.RuntimeHookOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.std.base.IRecordDescriptorFactory;
+import edu.uci.ics.pregelix.dataflow.std.base.IRuntimeHookFactory;
+import edu.uci.ics.pregelix.runtime.function.ComputeUpdateFunctionFactory;
+import edu.uci.ics.pregelix.runtime.function.StartComputeUpdateFunctionFactory;
+import edu.uci.ics.pregelix.runtime.touchpoint.MergePartitionComputerFactory;
+import edu.uci.ics.pregelix.runtime.touchpoint.MsgListNullWriterFactory;
+import edu.uci.ics.pregelix.runtime.touchpoint.PostSuperStepRuntimeHookFactory;
+import edu.uci.ics.pregelix.runtime.touchpoint.PreSuperStepRuntimeHookFactory;
+import edu.uci.ics.pregelix.runtime.touchpoint.RuntimeHookFactory;
+import edu.uci.ics.pregelix.runtime.touchpoint.VertexIdNullWriterFactory;
+import edu.uci.ics.pregelix.runtime.touchpoint.VertexIdPartitionComputerFactory;
+
+public class JobGenOuterJoin extends JobGen {
+
+ public JobGenOuterJoin(PregelixJob job) {
+ super(job);
+ }
+
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ @Override
+ protected JobSpecification generateFirstIteration(int iteration) throws HyracksException {
+ Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(conf);
+ Class<? extends Writable> vertexClass = BspUtils.getVertexClass(conf);
+ Class<? extends Writable> messageValueClass = BspUtils.getMessageValueClass(conf);
+ IConfigurationFactory confFactory = new ConfigurationFactory(conf);
+ JobSpecification spec = new JobSpecification();
+
+ /**
+ * construct empty tuple operator
+ */
+ EmptyTupleSourceOperatorDescriptor emptyTupleSource = new EmptyTupleSourceOperatorDescriptor(spec);
+ ClusterConfig.setLocationConstraint(spec, emptyTupleSource);
+
+ /** construct pre-superstep runtime hook */
+ RuntimeHookOperatorDescriptor preSuperStep = new RuntimeHookOperatorDescriptor(spec,
+ new PreSuperStepRuntimeHookFactory(jobId, confFactory));
+ ClusterConfig.setLocationConstraint(spec, preSuperStep);
+
+ /**
+ * construct btree search operator
+ */
+ RecordDescriptor recordDescriptor = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
+ vertexIdClass.getName(), vertexClass.getName());
+ IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
+ comparatorFactories[0] = new WritableComparingBinaryComparatorFactory(WritableComparator.get(vertexIdClass)
+ .getClass());
+ IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, PRIMARY_INDEX);
+
+ ITypeTraits[] typeTraits = new ITypeTraits[2];
+ typeTraits[0] = new TypeTraits(false);
+ typeTraits[1] = new TypeTraits(false);
+ ITreeIndexFrameFactory interiorFrameFactory = new BTreeNSMInteriorFrameFactory(new TypeAwareTupleWriterFactory(
+ typeTraits));
+ ITreeIndexFrameFactory leafFrameFactory = new BTreeNSMLeafFrameFactory(new TypeAwareTupleWriterFactory(
+ typeTraits));
+
+ /**
+ * construct compute operator
+ */
+ RecordDescriptor rdDummy = DataflowUtils.getRecordDescriptorFromWritableClasses(VLongWritable.class.getName());
+ RecordDescriptor rdMessage = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
+ MessageList.class.getName());
+ IConfigurationFactory configurationFactory = new ConfigurationFactory(conf);
+ IRuntimeHookFactory preHookFactory = new RuntimeHookFactory(configurationFactory);
+ IRecordDescriptorFactory inputRdFactory = DataflowUtils.getWritableRecordDescriptorFactoryFromWritableClasses(
+ vertexIdClass.getName(), vertexClass.getName());
+ BTreeSearchFunctionUpdateOperatorDescriptor scanner = new BTreeSearchFunctionUpdateOperatorDescriptor(spec,
+ recordDescriptor, storageManagerInterface, treeRegistryProvider, fileSplitProvider,
+ interiorFrameFactory, leafFrameFactory, typeTraits, comparatorFactories,
+ JobGenUtil.getForwardScan(iteration), null, null, true, true, new BTreeDataflowHelperFactory(),
+ inputRdFactory, 2, StartComputeUpdateFunctionFactory.INSTANCE, preHookFactory, null, rdMessage, rdDummy);
+ ClusterConfig.setLocationConstraint(spec, scanner);
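+ /*
+ * the scanner applies the start-compute function to every vertex in the
+ * primary index; output 0 carries the outgoing messages and output 1 the
+ * termination state
+ */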
+
+ RecordDescriptor rdUnnestedMessage = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
+ vertexIdClass.getName(), messageValueClass.getName());
+ /**
+ * construct local sort operator
+ */
+ int[] keyFields = new int[] { 0 };
+ INormalizedKeyComputerFactory nkmFactory = JobGenUtil
+ .getINormalizedKeyComputerFactory(iteration, vertexIdClass);
+ IBinaryComparatorFactory[] sortCmpFactories = new IBinaryComparatorFactory[1];
+ sortCmpFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration, WritableComparator.get(vertexIdClass)
+ .getClass());
+ ExternalSortOperatorDescriptor localSort = new ExternalSortOperatorDescriptor(spec, maxFrameSize, keyFields,
+ nkmFactory, sortCmpFactories, rdUnnestedMessage);
+ ClusterConfig.setLocationConstraint(spec, localSort);
+
+ /**
+ * construct local pre-clustered group-by operator
+ */
+ IAggregatorDescriptorFactory aggregatorFactory = DataflowUtils.getAccumulatingAggregatorFactory(conf, false);
+ PreclusteredGroupOperatorDescriptor localGby = new PreclusteredGroupOperatorDescriptor(spec, keyFields,
+ sortCmpFactories, aggregatorFactory, rdUnnestedMessage);
+ ClusterConfig.setLocationConstraint(spec, localGby);
+
+ /**
+ * construct global group-by operator
+ */
+ RecordDescriptor rdFinal = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
+ MsgList.class.getName());
+ IAggregatorDescriptorFactory aggregatorFactoryFinal = DataflowUtils
+ .getAccumulatingAggregatorFactory(conf, true);
+ PreclusteredGroupOperatorDescriptor globalGby = new PreclusteredGroupOperatorDescriptor(spec, keyFields,
+ sortCmpFactories, aggregatorFactoryFinal, rdFinal);
+ ClusterConfig.setLocationConstraint(spec, globalGby);
+
+ /**
+ * construct the materializing write operator
+ */
+ MaterializingWriteOperatorDescriptor materialize = new MaterializingWriteOperatorDescriptor(spec, rdFinal);
+ ClusterConfig.setLocationConstraint(spec, materialize);
+
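+ /** construct post-superstep runtime hook */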
+ RuntimeHookOperatorDescriptor postSuperStep = new RuntimeHookOperatorDescriptor(spec,
+ new PostSuperStepRuntimeHookFactory(jobId));
+ ClusterConfig.setLocationConstraint(spec, postSuperStep);
+
+ /** construct empty sink operator */
+ EmptySinkOperatorDescriptor emptySink2 = new EmptySinkOperatorDescriptor(spec);
+ ClusterConfig.setLocationConstraint(spec, emptySink2);
+
+ /**
+ * termination state write operator
+ */
+ TerminationStateWriterOperatorDescriptor terminateWriter = new TerminationStateWriterOperatorDescriptor(spec,
+ configurationFactory, jobId);
+ PartitionConstraintHelper.addPartitionCountConstraint(spec, terminateWriter, 1);
+ ITuplePartitionComputerFactory hashPartitionComputerFactory = new MergePartitionComputerFactory();
+
+ ITuplePartitionComputerFactory partionFactory = new VertexIdPartitionComputerFactory(
+ rdUnnestedMessage.getFields()[0]);
+ /** connect all operators **/
+ spec.connect(new OneToOneConnectorDescriptor(spec), emptyTupleSource, 0, preSuperStep, 0);
+ spec.connect(new OneToOneConnectorDescriptor(spec), preSuperStep, 0, scanner, 0);
+ spec.connect(new OneToOneConnectorDescriptor(spec), scanner, 0, localSort, 0);
+ spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), scanner, 1,
+ terminateWriter, 0);
+ spec.connect(new OneToOneConnectorDescriptor(spec), localSort, 0, localGby, 0);
+ spec.connect(new MToNPartitioningMergingConnectorDescriptor(spec, partionFactory, keyFields, sortCmpFactories),
+ localGby, 0, globalGby, 0);
+ spec.connect(new OneToOneConnectorDescriptor(spec), globalGby, 0, materialize, 0);
+ spec.connect(new OneToOneConnectorDescriptor(spec), materialize, 0, postSuperStep, 0);
+ spec.connect(new OneToOneConnectorDescriptor(spec), postSuperStep, 0, emptySink2, 0);
+
+ spec.addRoot(terminateWriter);
+ spec.addRoot(emptySink2);
+
+ spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
+ return spec;
+ }
+
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ @Override
+ protected JobSpecification generateNonFirstIteration(int iteration) throws HyracksException {
+ Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(conf);
+ Class<? extends Writable> vertexClass = BspUtils.getVertexClass(conf);
+ Class<? extends Writable> messageValueClass = BspUtils.getMessageValueClass(conf);
+ JobSpecification spec = new JobSpecification();
+
+ /**
+ * source aggregate
+ */
+ int[] keyFields = new int[] { 0 };
+ RecordDescriptor rdUnnestedMessage = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
+ vertexIdClass.getName(), messageValueClass.getName());
+ IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
+ comparatorFactories[0] = new WritableComparingBinaryComparatorFactory(WritableComparator.get(vertexIdClass)
+ .getClass());
+ RecordDescriptor rdFinal = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
+ MsgList.class.getName());
+
+ /**
+ * construct empty tuple operator
+ */
+ EmptyTupleSourceOperatorDescriptor emptyTupleSource = new EmptyTupleSourceOperatorDescriptor(spec);
+ ClusterConfig.setLocationConstraint(spec, emptyTupleSource);
+
+ /**
+ * construct pre-superstep hook
+ */
+ IConfigurationFactory confFactory = new ConfigurationFactory(conf);
+ RuntimeHookOperatorDescriptor preSuperStep = new RuntimeHookOperatorDescriptor(spec,
+ new PreSuperStepRuntimeHookFactory(jobId, confFactory));
+ ClusterConfig.setLocationConstraint(spec, preSuperStep);
+
+ /**
+ * construct the materializing read operator
+ */
+ MaterializingReadOperatorDescriptor materializeRead = new MaterializingReadOperatorDescriptor(spec, rdFinal);
+ ClusterConfig.setLocationConstraint(spec, materializeRead);
+
+ /**
+ * construct index join function update operator
+ */
+ IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, PRIMARY_INDEX);
+ ITypeTraits[] typeTraits = new ITypeTraits[2];
+ typeTraits[0] = new TypeTraits(false);
+ typeTraits[1] = new TypeTraits(false);
+ ITreeIndexFrameFactory interiorFrameFactory = new BTreeNSMInteriorFrameFactory(new TypeAwareTupleWriterFactory(
+ typeTraits));
+ ITreeIndexFrameFactory leafFrameFactory = new BTreeNSMLeafFrameFactory(new TypeAwareTupleWriterFactory(
+ typeTraits));
+ INullWriterFactory[] nullWriterFactories = new INullWriterFactory[2];
+ nullWriterFactories[0] = VertexIdNullWriterFactory.INSTANCE;
+ nullWriterFactories[1] = MsgListNullWriterFactory.INSTANCE;
+
+ RecordDescriptor rdDummy = DataflowUtils.getRecordDescriptorFromWritableClasses(VLongWritable.class.getName());
+ RecordDescriptor rdMessage = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
+ MessageList.class.getName());
+ IConfigurationFactory configurationFactory = new ConfigurationFactory(conf);
+ IRuntimeHookFactory preHookFactory = new RuntimeHookFactory(configurationFactory);
+ IRecordDescriptorFactory inputRdFactory = DataflowUtils.getWritableRecordDescriptorFactoryFromWritableClasses(
+ vertexIdClass.getName(), MsgList.class.getName(), vertexIdClass.getName(), vertexClass.getName());
+
+ IndexNestedLoopJoinFunctionUpdateOperatorDescriptor join = new IndexNestedLoopJoinFunctionUpdateOperatorDescriptor(
+ spec, storageManagerInterface, treeRegistryProvider, fileSplitProvider, interiorFrameFactory,
+ leafFrameFactory, typeTraits, comparatorFactories, JobGenUtil.getForwardScan(iteration), keyFields,
+ keyFields, true, true, new BTreeDataflowHelperFactory(), true, nullWriterFactories, inputRdFactory, 2,
+ ComputeUpdateFunctionFactory.INSTANCE, preHookFactory, null, rdMessage, rdDummy);
+ ClusterConfig.setLocationConstraint(spec, join);
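+ /*
+ * this is the outer-join plan: for vertices that received no messages the
+ * (vertex id, msg list) fields are padded by the null writer factories, so
+ * the compute function still visits every vertex
+ */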
+
+ /**
+ * construct local sort operator
+ */
+ INormalizedKeyComputerFactory nkmFactory = JobGenUtil
+ .getINormalizedKeyComputerFactory(iteration, vertexIdClass);
+ IBinaryComparatorFactory[] sortCmpFactories = new IBinaryComparatorFactory[1];
+ sortCmpFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration, WritableComparator.get(vertexIdClass)
+ .getClass());
+ ExternalSortOperatorDescriptor localSort = new ExternalSortOperatorDescriptor(spec, maxFrameSize, keyFields,
+ nkmFactory, sortCmpFactories, rdUnnestedMessage);
+ ClusterConfig.setLocationConstraint(spec, localSort);
+
+ /**
+ * construct local pre-clustered group-by operator
+ */
+ IAggregatorDescriptorFactory aggregatorFactory = DataflowUtils.getAccumulatingAggregatorFactory(conf, false);
+ PreclusteredGroupOperatorDescriptor localGby = new PreclusteredGroupOperatorDescriptor(spec, keyFields,
+ sortCmpFactories, aggregatorFactory, rdUnnestedMessage);
+ ClusterConfig.setLocationConstraint(spec, localGby);
+
+ /**
+ * construct global group-by operator
+ */
+ IAggregatorDescriptorFactory aggregatorFactoryFinal = DataflowUtils
+ .getAccumulatingAggregatorFactory(conf, true);
+ PreclusteredGroupOperatorDescriptor globalGby = new PreclusteredGroupOperatorDescriptor(spec, keyFields,
+ sortCmpFactories, aggregatorFactoryFinal, rdFinal);
+ ClusterConfig.setLocationConstraint(spec, globalGby);
+
+ /**
+ * construct the materializing write operator
+ */
+ MaterializingWriteOperatorDescriptor materialize = new MaterializingWriteOperatorDescriptor(spec, rdFinal);
+ ClusterConfig.setLocationConstraint(spec, materialize);
+
+ /** construct post-superstep runtime hook */
+ RuntimeHookOperatorDescriptor postSuperStep = new RuntimeHookOperatorDescriptor(spec,
+ new PostSuperStepRuntimeHookFactory(jobId));
+ ClusterConfig.setLocationConstraint(spec, postSuperStep);
+
+ /** construct empty sink operator */
+ EmptySinkOperatorDescriptor emptySink = new EmptySinkOperatorDescriptor(spec);
+ ClusterConfig.setLocationConstraint(spec, emptySink);
+
+ /**
+ * termination state write operator
+ */
+ TerminationStateWriterOperatorDescriptor terminateWriter = new TerminationStateWriterOperatorDescriptor(spec,
+ configurationFactory, jobId);
+ PartitionConstraintHelper.addPartitionCountConstraint(spec, terminateWriter, 1);
+
+ ITuplePartitionComputerFactory hashPartitionComputerFactory = new MergePartitionComputerFactory();
+ ITuplePartitionComputerFactory partionFactory = new VertexIdPartitionComputerFactory(
+ rdUnnestedMessage.getFields()[0]);
+
+ /** connect all operators **/
+ spec.connect(new OneToOneConnectorDescriptor(spec), emptyTupleSource, 0, preSuperStep, 0);
+ spec.connect(new OneToOneConnectorDescriptor(spec), preSuperStep, 0, materializeRead, 0);
+ spec.connect(new OneToOneConnectorDescriptor(spec), materializeRead, 0, join, 0);
+ spec.connect(new OneToOneConnectorDescriptor(spec), join, 0, localSort, 0);
+ spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), join, 1,
+ terminateWriter, 0);
+ spec.connect(new OneToOneConnectorDescriptor(spec), localSort, 0, localGby, 0);
+ spec.connect(new MToNPartitioningMergingConnectorDescriptor(spec, partionFactory, keyFields, sortCmpFactories),
+ localGby, 0, globalGby, 0);
+ spec.connect(new OneToOneConnectorDescriptor(spec), globalGby, 0, materialize, 0);
+ spec.connect(new OneToOneConnectorDescriptor(spec), materialize, 0, postSuperStep, 0);
+ spec.connect(new OneToOneConnectorDescriptor(spec), postSuperStep, 0, emptySink, 0);
+
+ spec.addRoot(terminateWriter);
+ spec.addRoot(emptySink);
+
+ spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
+ return spec;
+ }
+
+ @Override
+ public JobSpecification[] generateCleanup() throws HyracksException {
+ JobSpecification[] cleanups = new JobSpecification[1];
+ cleanups[0] = this.dropIndex(PRIMARY_INDEX);
+ return cleanups;
+ }
+
+}
\ No newline at end of file
diff --git a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java
new file mode 100644
index 0000000..80d11ab
--- /dev/null
+++ b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java
@@ -0,0 +1,345 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.core.jobgen;
+
+import org.apache.hadoop.io.VLongWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableComparator;
+
+import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.INullWriterFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.hadoop.data.WritableComparingBinaryComparatorFactory;
+import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
+import edu.uci.ics.hyracks.dataflow.std.group.preclustered.PreclusteredGroupOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeDataflowHelperFactory;
+import edu.uci.ics.hyracks.storage.am.btree.frames.BTreeNSMInteriorFrameFactory;
+import edu.uci.ics.hyracks.storage.am.btree.frames.BTreeNSMLeafFrameFactory;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
+import edu.uci.ics.hyracks.storage.am.common.tuples.TypeAwareTupleWriterFactory;
+import edu.uci.ics.pregelix.api.graph.MsgList;
+import edu.uci.ics.pregelix.api.job.PregelixJob;
+import edu.uci.ics.pregelix.api.util.BspUtils;
+import edu.uci.ics.pregelix.core.data.TypeTraits;
+import edu.uci.ics.pregelix.core.hadoop.config.ConfigurationFactory;
+import edu.uci.ics.pregelix.core.hadoop.data.MessageList;
+import edu.uci.ics.pregelix.core.jobgen.clusterconfig.ClusterConfig;
+import edu.uci.ics.pregelix.core.util.DataflowUtils;
+import edu.uci.ics.pregelix.dataflow.EmptySinkOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.EmptyTupleSourceOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.MaterializingReadOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.MaterializingWriteOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.NonCombinerConnectorPolicyAssignmentPolicy;
+import edu.uci.ics.pregelix.dataflow.TerminationStateWriterOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.base.IConfigurationFactory;
+import edu.uci.ics.pregelix.dataflow.std.BTreeSearchFunctionUpdateOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.std.IndexNestedLoopJoinFunctionUpdateOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.std.RuntimeHookOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.std.base.IRecordDescriptorFactory;
+import edu.uci.ics.pregelix.dataflow.std.base.IRuntimeHookFactory;
+import edu.uci.ics.pregelix.runtime.function.ComputeUpdateFunctionFactory;
+import edu.uci.ics.pregelix.runtime.function.StartComputeUpdateFunctionFactory;
+import edu.uci.ics.pregelix.runtime.touchpoint.MergePartitionComputerFactory;
+import edu.uci.ics.pregelix.runtime.touchpoint.MsgListNullWriterFactory;
+import edu.uci.ics.pregelix.runtime.touchpoint.PostSuperStepRuntimeHookFactory;
+import edu.uci.ics.pregelix.runtime.touchpoint.PreSuperStepRuntimeHookFactory;
+import edu.uci.ics.pregelix.runtime.touchpoint.RuntimeHookFactory;
+import edu.uci.ics.pregelix.runtime.touchpoint.VertexIdNullWriterFactory;
+import edu.uci.ics.pregelix.runtime.touchpoint.VertexIdPartitionComputerFactory;
+
+public class JobGenOuterJoinSingleSort extends JobGen {
+
+ public JobGenOuterJoinSingleSort(PregelixJob job) {
+ super(job);
+ }
+
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ @Override
+ protected JobSpecification generateFirstIteration(int iteration) throws HyracksException {
+ Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(conf);
+ Class<? extends Writable> vertexClass = BspUtils.getVertexClass(conf);
+ Class<? extends Writable> messageValueClass = BspUtils.getMessageValueClass(conf);
+ IConfigurationFactory confFactory = new ConfigurationFactory(conf);
+ JobSpecification spec = new JobSpecification();
+
+ /**
+ * construct empty tuple operator
+ */
+ EmptyTupleSourceOperatorDescriptor emptyTupleSource = new EmptyTupleSourceOperatorDescriptor(spec);
+ ClusterConfig.setLocationConstraint(spec, emptyTupleSource);
+
+ /** construct pre-superstep runtime hook */
+ RuntimeHookOperatorDescriptor preSuperStep = new RuntimeHookOperatorDescriptor(spec,
+ new PreSuperStepRuntimeHookFactory(jobId, confFactory));
+ ClusterConfig.setLocationConstraint(spec, preSuperStep);
+
+ /**
+ * construct btree search operator
+ */
+ RecordDescriptor recordDescriptor = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
+ vertexIdClass.getName(), vertexClass.getName());
+ IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
+ comparatorFactories[0] = new WritableComparingBinaryComparatorFactory(WritableComparator.get(vertexIdClass)
+ .getClass());
+ IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, PRIMARY_INDEX);
+
+ ITypeTraits[] typeTraits = new ITypeTraits[2];
+ typeTraits[0] = new TypeTraits(false);
+ typeTraits[1] = new TypeTraits(false);
+ ITreeIndexFrameFactory interiorFrameFactory = new BTreeNSMInteriorFrameFactory(new TypeAwareTupleWriterFactory(
+ typeTraits));
+ ITreeIndexFrameFactory leafFrameFactory = new BTreeNSMLeafFrameFactory(new TypeAwareTupleWriterFactory(
+ typeTraits));
+
+ /**
+ * construct compute operator
+ */
+ RecordDescriptor rdDummy = DataflowUtils.getRecordDescriptorFromWritableClasses(VLongWritable.class.getName());
+ RecordDescriptor rdMessage = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
+ MessageList.class.getName());
+ IConfigurationFactory configurationFactory = new ConfigurationFactory(conf);
+ IRuntimeHookFactory preHookFactory = new RuntimeHookFactory(configurationFactory);
+ IRecordDescriptorFactory inputRdFactory = DataflowUtils.getWritableRecordDescriptorFactoryFromWritableClasses(
+ vertexIdClass.getName(), vertexClass.getName());
+ BTreeSearchFunctionUpdateOperatorDescriptor scanner = new BTreeSearchFunctionUpdateOperatorDescriptor(spec,
+ recordDescriptor, storageManagerInterface, treeRegistryProvider, fileSplitProvider,
+ interiorFrameFactory, leafFrameFactory, typeTraits, comparatorFactories,
+ JobGenUtil.getForwardScan(iteration), null, null, true, true, new BTreeDataflowHelperFactory(),
+ inputRdFactory, 2, StartComputeUpdateFunctionFactory.INSTANCE, preHookFactory, null, rdMessage, rdDummy);
+ ClusterConfig.setLocationConstraint(spec, scanner);
+
+ /**
+ * construct global sort operator
+ */
+ RecordDescriptor rdUnnestedMessage = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
+ vertexIdClass.getName(), messageValueClass.getName());
+ int[] keyFields = new int[] { 0 };
+ INormalizedKeyComputerFactory nkmFactory = JobGenUtil
+ .getINormalizedKeyComputerFactory(iteration, vertexIdClass);
+ IBinaryComparatorFactory[] sortCmpFactories = new IBinaryComparatorFactory[1];
+ sortCmpFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration, WritableComparator.get(vertexIdClass)
+ .getClass());
+ ExternalSortOperatorDescriptor globalSort = new ExternalSortOperatorDescriptor(spec, maxFrameSize, keyFields,
+ nkmFactory, sortCmpFactories, rdUnnestedMessage);
+ ClusterConfig.setLocationConstraint(spec, globalSort);
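+ /*
+ * single-sort plan: messages are repartitioned by vertex id straight out
+ * of the compute function and sorted once globally; there is no local
+ * sort or local group-by stage
+ */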
+
+ /**
+ * construct global group-by operator
+ */
+ RecordDescriptor rdFinal = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
+ MsgList.class.getName());
+ IAggregatorDescriptorFactory aggregatorFactoryFinal = DataflowUtils
+ .getAccumulatingAggregatorFactory(conf, true);
+ PreclusteredGroupOperatorDescriptor globalGby = new PreclusteredGroupOperatorDescriptor(spec, keyFields,
+ sortCmpFactories, aggregatorFactoryFinal, rdFinal);
+ ClusterConfig.setLocationConstraint(spec, globalGby);
+
+ /**
+ * construct the materializing write operator
+ */
+ MaterializingWriteOperatorDescriptor materialize = new MaterializingWriteOperatorDescriptor(spec, rdFinal);
+ ClusterConfig.setLocationConstraint(spec, materialize);
+
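+ /** construct post-superstep runtime hook */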
+ RuntimeHookOperatorDescriptor postSuperStep = new RuntimeHookOperatorDescriptor(spec,
+ new PostSuperStepRuntimeHookFactory(jobId));
+ ClusterConfig.setLocationConstraint(spec, postSuperStep);
+
+ /** construct empty sink operator */
+ EmptySinkOperatorDescriptor emptySink2 = new EmptySinkOperatorDescriptor(spec);
+ ClusterConfig.setLocationConstraint(spec, emptySink2);
+
+ /**
+ * termination state write operator
+ */
+ TerminationStateWriterOperatorDescriptor terminateWriter = new TerminationStateWriterOperatorDescriptor(spec,
+ configurationFactory, jobId);
+ PartitionConstraintHelper.addPartitionCountConstraint(spec, terminateWriter, 1);
+ ITuplePartitionComputerFactory hashPartitionComputerFactory = new MergePartitionComputerFactory();
+
+ ITuplePartitionComputerFactory partionFactory = new VertexIdPartitionComputerFactory(
+ rdUnnestedMessage.getFields()[0]);
+ /** connect all operators **/
+ spec.connect(new OneToOneConnectorDescriptor(spec), emptyTupleSource, 0, preSuperStep, 0);
+ spec.connect(new OneToOneConnectorDescriptor(spec), preSuperStep, 0, scanner, 0);
+ spec.connect(new MToNPartitioningConnectorDescriptor(spec, partionFactory), scanner, 0, globalSort, 0);
+ spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), scanner, 1,
+ terminateWriter, 0);
+ spec.connect(new OneToOneConnectorDescriptor(spec), globalSort, 0, globalGby, 0);
+ spec.connect(new OneToOneConnectorDescriptor(spec), globalGby, 0, materialize, 0);
+ spec.connect(new OneToOneConnectorDescriptor(spec), materialize, 0, postSuperStep, 0);
+ spec.connect(new OneToOneConnectorDescriptor(spec), postSuperStep, 0, emptySink2, 0);
+
+ spec.addRoot(terminateWriter);
+ spec.addRoot(emptySink2);
+
+ spec.setConnectorPolicyAssignmentPolicy(new NonCombinerConnectorPolicyAssignmentPolicy());
+ return spec;
+ }
+
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ @Override
+ protected JobSpecification generateNonFirstIteration(int iteration) throws HyracksException {
+ Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(conf);
+ Class<? extends Writable> vertexClass = BspUtils.getVertexClass(conf);
+ Class<? extends Writable> messageValueClass = BspUtils.getMessageValueClass(conf);
+ JobSpecification spec = new JobSpecification();
+
+ /**
+ * source aggregate
+ */
+ int[] keyFields = new int[] { 0 };
+ RecordDescriptor rdUnnestedMessage = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
+ vertexIdClass.getName(), messageValueClass.getName());
+ IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
+ comparatorFactories[0] = new WritableComparingBinaryComparatorFactory(WritableComparator.get(vertexIdClass)
+ .getClass());
+ RecordDescriptor rdFinal = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
+ MsgList.class.getName());
+
+ /**
+ * construct empty tuple operator
+ */
+ EmptyTupleSourceOperatorDescriptor emptyTupleSource = new EmptyTupleSourceOperatorDescriptor(spec);
+ ClusterConfig.setLocationConstraint(spec, emptyTupleSource);
+
+ /**
+ * construct pre-superstep hook
+ */
+ IConfigurationFactory confFactory = new ConfigurationFactory(conf);
+ RuntimeHookOperatorDescriptor preSuperStep = new RuntimeHookOperatorDescriptor(spec,
+ new PreSuperStepRuntimeHookFactory(jobId, confFactory));
+ ClusterConfig.setLocationConstraint(spec, preSuperStep);
+
+ /**
+ * construct the materializing read operator
+ */
+ MaterializingReadOperatorDescriptor materializeRead = new MaterializingReadOperatorDescriptor(spec, rdFinal);
+ ClusterConfig.setLocationConstraint(spec, materializeRead);
+
+ /**
+ * construct index join function update operator
+ */
+ IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, PRIMARY_INDEX);
+ ITypeTraits[] typeTraits = new ITypeTraits[2];
+ typeTraits[0] = new TypeTraits(false);
+ typeTraits[1] = new TypeTraits(false);
+ ITreeIndexFrameFactory interiorFrameFactory = new BTreeNSMInteriorFrameFactory(new TypeAwareTupleWriterFactory(
+ typeTraits));
+ ITreeIndexFrameFactory leafFrameFactory = new BTreeNSMLeafFrameFactory(new TypeAwareTupleWriterFactory(
+ typeTraits));
+ INullWriterFactory[] nullWriterFactories = new INullWriterFactory[2];
+ nullWriterFactories[0] = VertexIdNullWriterFactory.INSTANCE;
+ nullWriterFactories[1] = MsgListNullWriterFactory.INSTANCE;
+
+ RecordDescriptor rdDummy = DataflowUtils.getRecordDescriptorFromWritableClasses(VLongWritable.class.getName());
+ RecordDescriptor rdMessage = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
+ MessageList.class.getName());
+ IConfigurationFactory configurationFactory = new ConfigurationFactory(conf);
+ IRuntimeHookFactory preHookFactory = new RuntimeHookFactory(configurationFactory);
+ IRecordDescriptorFactory inputRdFactory = DataflowUtils.getWritableRecordDescriptorFactoryFromWritableClasses(
+ vertexIdClass.getName(), MsgList.class.getName(), vertexIdClass.getName(), vertexClass.getName());
+
+ IndexNestedLoopJoinFunctionUpdateOperatorDescriptor join = new IndexNestedLoopJoinFunctionUpdateOperatorDescriptor(
+ spec, storageManagerInterface, treeRegistryProvider, fileSplitProvider, interiorFrameFactory,
+ leafFrameFactory, typeTraits, comparatorFactories, JobGenUtil.getForwardScan(iteration), keyFields,
+ keyFields, true, true, new BTreeDataflowHelperFactory(), true, nullWriterFactories, inputRdFactory, 2,
+ ComputeUpdateFunctionFactory.INSTANCE, preHookFactory, null, rdMessage, rdDummy);
+ ClusterConfig.setLocationConstraint(spec, join);
+
+ /**
+ * construct global sort operator
+ */
+ INormalizedKeyComputerFactory nkmFactory = JobGenUtil
+ .getINormalizedKeyComputerFactory(iteration, vertexIdClass);
+ IBinaryComparatorFactory[] sortCmpFactories = new IBinaryComparatorFactory[1];
+ sortCmpFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration, WritableComparator.get(vertexIdClass)
+ .getClass());
+ ExternalSortOperatorDescriptor globalSort = new ExternalSortOperatorDescriptor(spec, maxFrameSize, keyFields,
+ nkmFactory, sortCmpFactories, rdUnnestedMessage);
+ ClusterConfig.setLocationConstraint(spec, globalSort);
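+ /*
+ * as in the first iteration, a single global sort after repartitioning
+ * replaces the local sort and local group-by
+ */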
+
+ /**
+ * construct global group-by operator
+ */
+ IAggregatorDescriptorFactory aggregatorFactoryFinal = DataflowUtils
+ .getAccumulatingAggregatorFactory(conf, true);
+ PreclusteredGroupOperatorDescriptor globalGby = new PreclusteredGroupOperatorDescriptor(spec, keyFields,
+ sortCmpFactories, aggregatorFactoryFinal, rdFinal);
+ ClusterConfig.setLocationConstraint(spec, globalGby);
+
+ /**
+ * construct the materializing write operator
+ */
+ MaterializingWriteOperatorDescriptor materialize = new MaterializingWriteOperatorDescriptor(spec, rdFinal);
+ ClusterConfig.setLocationConstraint(spec, materialize);
+
+ /** construct post-superstep runtime hook */
+ RuntimeHookOperatorDescriptor postSuperStep = new RuntimeHookOperatorDescriptor(spec,
+ new PostSuperStepRuntimeHookFactory(jobId));
+ ClusterConfig.setLocationConstraint(spec, postSuperStep);
+
+ /** construct empty sink operator */
+ EmptySinkOperatorDescriptor emptySink = new EmptySinkOperatorDescriptor(spec);
+ ClusterConfig.setLocationConstraint(spec, emptySink);
+
+ /**
+ * termination state write operator
+ */
+ TerminationStateWriterOperatorDescriptor terminateWriter = new TerminationStateWriterOperatorDescriptor(spec,
+ configurationFactory, jobId);
+ PartitionConstraintHelper.addPartitionCountConstraint(spec, terminateWriter, 1);
+
+ ITuplePartitionComputerFactory hashPartitionComputerFactory = new MergePartitionComputerFactory();
+ ITuplePartitionComputerFactory partionFactory = new VertexIdPartitionComputerFactory(
+ rdUnnestedMessage.getFields()[0]);
+
+ /** connect all operators **/
+ spec.connect(new OneToOneConnectorDescriptor(spec), emptyTupleSource, 0, preSuperStep, 0);
+ spec.connect(new OneToOneConnectorDescriptor(spec), preSuperStep, 0, materializeRead, 0);
+ spec.connect(new OneToOneConnectorDescriptor(spec), materializeRead, 0, join, 0);
+ spec.connect(new MToNPartitioningConnectorDescriptor(spec, partionFactory), join, 0, globalSort, 0);
+ spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), join, 1,
+ terminateWriter, 0);
+ spec.connect(new OneToOneConnectorDescriptor(spec), globalSort, 0, globalGby, 0);
+ spec.connect(new OneToOneConnectorDescriptor(spec), globalGby, 0, materialize, 0);
+ spec.connect(new OneToOneConnectorDescriptor(spec), materialize, 0, postSuperStep, 0);
+ spec.connect(new OneToOneConnectorDescriptor(spec), postSuperStep, 0, emptySink, 0);
+
+ spec.addRoot(terminateWriter);
+ spec.addRoot(emptySink);
+
+ spec.setConnectorPolicyAssignmentPolicy(new NonCombinerConnectorPolicyAssignmentPolicy());
+ return spec;
+ }
+
+ @Override
+ public JobSpecification[] generateCleanup() throws HyracksException {
+ JobSpecification[] cleanups = new JobSpecification[1];
+ cleanups[0] = this.dropIndex(PRIMARY_INDEX);
+ return cleanups;
+ }
+
+}
\ No newline at end of file
diff --git a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java
new file mode 100644
index 0000000..f1f89b6
--- /dev/null
+++ b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java
@@ -0,0 +1,375 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.core.jobgen;
+
+import org.apache.hadoop.io.VLongWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableComparator;
+
+import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.INullWriterFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.hadoop.data.WritableComparingBinaryComparatorFactory;
+import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
+import edu.uci.ics.hyracks.dataflow.std.group.preclustered.PreclusteredGroupOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeDataflowHelperFactory;
+import edu.uci.ics.hyracks.storage.am.btree.frames.BTreeNSMInteriorFrameFactory;
+import edu.uci.ics.hyracks.storage.am.btree.frames.BTreeNSMLeafFrameFactory;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
+import edu.uci.ics.hyracks.storage.am.common.tuples.TypeAwareTupleWriterFactory;
+import edu.uci.ics.pregelix.api.graph.MsgList;
+import edu.uci.ics.pregelix.api.job.PregelixJob;
+import edu.uci.ics.pregelix.api.util.BspUtils;
+import edu.uci.ics.pregelix.core.data.TypeTraits;
+import edu.uci.ics.pregelix.core.hadoop.config.ConfigurationFactory;
+import edu.uci.ics.pregelix.core.hadoop.data.MessageList;
+import edu.uci.ics.pregelix.core.jobgen.clusterconfig.ClusterConfig;
+import edu.uci.ics.pregelix.core.util.DataflowUtils;
+import edu.uci.ics.pregelix.dataflow.EmptySinkOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.EmptyTupleSourceOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.MaterializingReadOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.MaterializingWriteOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.TerminationStateWriterOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.base.IConfigurationFactory;
+import edu.uci.ics.pregelix.dataflow.std.BTreeSearchFunctionUpdateOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.std.IndexNestedLoopJoinFunctionUpdateOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.std.RuntimeHookOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.std.base.IRecordDescriptorFactory;
+import edu.uci.ics.pregelix.dataflow.std.base.IRuntimeHookFactory;
+import edu.uci.ics.pregelix.runtime.function.ComputeUpdateFunctionFactory;
+import edu.uci.ics.pregelix.runtime.function.StartComputeUpdateFunctionFactory;
+import edu.uci.ics.pregelix.runtime.touchpoint.MergePartitionComputerFactory;
+import edu.uci.ics.pregelix.runtime.touchpoint.MsgListNullWriterFactory;
+import edu.uci.ics.pregelix.runtime.touchpoint.PostSuperStepRuntimeHookFactory;
+import edu.uci.ics.pregelix.runtime.touchpoint.PreSuperStepRuntimeHookFactory;
+import edu.uci.ics.pregelix.runtime.touchpoint.RuntimeHookFactory;
+import edu.uci.ics.pregelix.runtime.touchpoint.VertexIdNullWriterFactory;
+import edu.uci.ics.pregelix.runtime.touchpoint.VertexIdPartitionComputerFactory;
+
+public class JobGenOuterJoinSort extends JobGen {
+
+ public JobGenOuterJoinSort(PregelixJob job) {
+ super(job);
+ }
+
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ @Override
+ protected JobSpecification generateFirstIteration(int iteration) throws HyracksException {
+ Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(conf);
+ Class<? extends Writable> vertexClass = BspUtils.getVertexClass(conf);
+ Class<? extends Writable> messageValueClass = BspUtils.getMessageValueClass(conf);
+ IConfigurationFactory confFactory = new ConfigurationFactory(conf);
+ JobSpecification spec = new JobSpecification();
+
+ /**
+ * construct empty tuple operator
+ */
+ EmptyTupleSourceOperatorDescriptor emptyTupleSource = new EmptyTupleSourceOperatorDescriptor(spec);
+ ClusterConfig.setLocationConstraint(spec, emptyTupleSource);
+
+ /** construct pre-superstep runtime hook */
+ RuntimeHookOperatorDescriptor preSuperStep = new RuntimeHookOperatorDescriptor(spec,
+ new PreSuperStepRuntimeHookFactory(jobId, confFactory));
+ ClusterConfig.setLocationConstraint(spec, preSuperStep);
+
+ /**
+ * construct btree search operator
+ */
+ RecordDescriptor recordDescriptor = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
+ vertexIdClass.getName(), vertexClass.getName());
+ IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
+ comparatorFactories[0] = new WritableComparingBinaryComparatorFactory(WritableComparator.get(vertexIdClass)
+ .getClass());
+ IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, PRIMARY_INDEX);
+
+ ITypeTraits[] typeTraits = new ITypeTraits[2];
+ typeTraits[0] = new TypeTraits(false);
+ typeTraits[1] = new TypeTraits(false);
+ ITreeIndexFrameFactory interiorFrameFactory = new BTreeNSMInteriorFrameFactory(new TypeAwareTupleWriterFactory(
+ typeTraits));
+ ITreeIndexFrameFactory leafFrameFactory = new BTreeNSMLeafFrameFactory(new TypeAwareTupleWriterFactory(
+ typeTraits));
+
+ /**
+ * construct compute operator
+ */
+ RecordDescriptor rdDummy = DataflowUtils.getRecordDescriptorFromWritableClasses(VLongWritable.class.getName());
+ RecordDescriptor rdMessage = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
+ MessageList.class.getName());
+ IConfigurationFactory configurationFactory = new ConfigurationFactory(conf);
+ IRuntimeHookFactory preHookFactory = new RuntimeHookFactory(configurationFactory);
+ IRecordDescriptorFactory inputRdFactory = DataflowUtils.getWritableRecordDescriptorFactoryFromWritableClasses(
+ vertexIdClass.getName(), vertexClass.getName());
+ BTreeSearchFunctionUpdateOperatorDescriptor scanner = new BTreeSearchFunctionUpdateOperatorDescriptor(spec,
+ recordDescriptor, storageManagerInterface, treeRegistryProvider, fileSplitProvider,
+ interiorFrameFactory, leafFrameFactory, typeTraits, comparatorFactories,
+ JobGenUtil.getForwardScan(iteration), null, null, true, true, new BTreeDataflowHelperFactory(),
+ inputRdFactory, 2, StartComputeUpdateFunctionFactory.INSTANCE, preHookFactory, null, rdMessage, rdDummy);
+ ClusterConfig.setLocationConstraint(spec, scanner);
+
+ RecordDescriptor rdUnnestedMessage = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
+ vertexIdClass.getName(), messageValueClass.getName());
+ /**
+ * construct local sort operator
+ */
+ int[] keyFields = new int[] { 0 };
+ INormalizedKeyComputerFactory nkmFactory = JobGenUtil
+ .getINormalizedKeyComputerFactory(iteration, vertexIdClass);
+ IBinaryComparatorFactory[] sortCmpFactories = new IBinaryComparatorFactory[1];
+ sortCmpFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration, WritableComparator.get(vertexIdClass)
+ .getClass());
+ ExternalSortOperatorDescriptor localSort = new ExternalSortOperatorDescriptor(spec, maxFrameSize, keyFields,
+ nkmFactory, sortCmpFactories, rdUnnestedMessage);
+ ClusterConfig.setLocationConstraint(spec, localSort);
+
+ /**
+ * construct local pre-clustered group-by operator
+ */
+ IAggregatorDescriptorFactory aggregatorFactory = DataflowUtils.getAccumulatingAggregatorFactory(conf, false);
+ PreclusteredGroupOperatorDescriptor localGby = new PreclusteredGroupOperatorDescriptor(spec, keyFields,
+ sortCmpFactories, aggregatorFactory, rdUnnestedMessage);
+ ClusterConfig.setLocationConstraint(spec, localGby);
+
+ /**
+ * construct global sort operator
+ */
+ ExternalSortOperatorDescriptor globalSort = new ExternalSortOperatorDescriptor(spec, maxFrameSize, keyFields,
+ nkmFactory, sortCmpFactories, rdUnnestedMessage);
+ ClusterConfig.setLocationConstraint(spec, globalSort);
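+ /*
+ * this plan sorts at both levels: messages are sorted and grouped locally,
+ * then repartitioned by vertex id and sorted again globally before the
+ * final group-by
+ */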
+
+ /**
+ * construct global group-by operator
+ */
+ RecordDescriptor rdFinal = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
+ MsgList.class.getName());
+ IAggregatorDescriptorFactory aggregatorFactoryFinal = DataflowUtils
+ .getAccumulatingAggregatorFactory(conf, true);
+ PreclusteredGroupOperatorDescriptor globalGby = new PreclusteredGroupOperatorDescriptor(spec, keyFields,
+ sortCmpFactories, aggregatorFactoryFinal, rdFinal);
+ ClusterConfig.setLocationConstraint(spec, globalGby);
+
+ /**
+ * construct the materializing write operator
+ */
+ MaterializingWriteOperatorDescriptor materialize = new MaterializingWriteOperatorDescriptor(spec, rdFinal);
+ ClusterConfig.setLocationConstraint(spec, materialize);
+
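+ /** construct post-superstep runtime hook */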
+ RuntimeHookOperatorDescriptor postSuperStep = new RuntimeHookOperatorDescriptor(spec,
+ new PostSuperStepRuntimeHookFactory(jobId));
+ ClusterConfig.setLocationConstraint(spec, postSuperStep);
+
+ /** construct empty sink operator */
+ EmptySinkOperatorDescriptor emptySink2 = new EmptySinkOperatorDescriptor(spec);
+ ClusterConfig.setLocationConstraint(spec, emptySink2);
+
+ /**
+ * termination state write operator
+ */
+ TerminationStateWriterOperatorDescriptor terminateWriter = new TerminationStateWriterOperatorDescriptor(spec,
+ configurationFactory, jobId);
+ PartitionConstraintHelper.addPartitionCountConstraint(spec, terminateWriter, 1);
+ ITuplePartitionComputerFactory hashPartitionComputerFactory = new MergePartitionComputerFactory();
+
+ ITuplePartitionComputerFactory partionFactory = new VertexIdPartitionComputerFactory(
+ rdUnnestedMessage.getFields()[0]);
+ /** connect all operators **/
+ spec.connect(new OneToOneConnectorDescriptor(spec), emptyTupleSource, 0, preSuperStep, 0);
+ spec.connect(new OneToOneConnectorDescriptor(spec), preSuperStep, 0, scanner, 0);
+ spec.connect(new OneToOneConnectorDescriptor(spec), scanner, 0, localSort, 0);
+ spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), scanner, 1,
+ terminateWriter, 0);
+ spec.connect(new OneToOneConnectorDescriptor(spec), localSort, 0, localGby, 0);
+ spec.connect(new MToNPartitioningConnectorDescriptor(spec, partionFactory), localGby, 0, globalSort, 0);
+ spec.connect(new OneToOneConnectorDescriptor(spec), globalSort, 0, globalGby, 0);
+ spec.connect(new OneToOneConnectorDescriptor(spec), globalGby, 0, materialize, 0);
+ spec.connect(new OneToOneConnectorDescriptor(spec), materialize, 0, postSuperStep, 0);
+ spec.connect(new OneToOneConnectorDescriptor(spec), postSuperStep, 0, emptySink2, 0);
+
+ spec.addRoot(terminateWriter);
+ spec.addRoot(emptySink2);
+
+ return spec;
+ }
+
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ @Override
+ protected JobSpecification generateNonFirstIteration(int iteration) throws HyracksException {
+ Class<? extends WritableComparable<?>> vertexIdClass = BspUtils.getVertexIndexClass(conf);
+ Class<? extends Writable> vertexClass = BspUtils.getVertexClass(conf);
+ Class<? extends Writable> messageValueClass = BspUtils.getMessageValueClass(conf);
+ JobSpecification spec = new JobSpecification();
+
+ /**
+ * source aggregate
+ */
+ int[] keyFields = new int[] { 0 };
+ RecordDescriptor rdUnnestedMessage = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
+ vertexIdClass.getName(), messageValueClass.getName());
+ IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
+ comparatorFactories[0] = new WritableComparingBinaryComparatorFactory(WritableComparator.get(vertexIdClass)
+ .getClass());
+ RecordDescriptor rdFinal = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
+ MsgList.class.getName());
+
+ /**
+ * construct empty tuple operator
+ */
+ EmptyTupleSourceOperatorDescriptor emptyTupleSource = new EmptyTupleSourceOperatorDescriptor(spec);
+ ClusterConfig.setLocationConstraint(spec, emptyTupleSource);
+
+ /**
+ * construct pre-superstep hook
+ */
+ IConfigurationFactory confFactory = new ConfigurationFactory(conf);
+ RuntimeHookOperatorDescriptor preSuperStep = new RuntimeHookOperatorDescriptor(spec,
+ new PreSuperStepRuntimeHookFactory(jobId, confFactory));
+ ClusterConfig.setLocationConstraint(spec, preSuperStep);
+
+ /**
+ * construct the materializing read operator
+ */
+ MaterializingReadOperatorDescriptor materializeRead = new MaterializingReadOperatorDescriptor(spec, rdFinal);
+ ClusterConfig.setLocationConstraint(spec, materializeRead);
+
+ /**
+ * construct index join function update operator
+ */
+ IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, PRIMARY_INDEX);
+ ITypeTraits[] typeTraits = new ITypeTraits[2];
+ typeTraits[0] = new TypeTraits(false);
+ typeTraits[1] = new TypeTraits(false);
+ ITreeIndexFrameFactory interiorFrameFactory = new BTreeNSMInteriorFrameFactory(new TypeAwareTupleWriterFactory(
+ typeTraits));
+ ITreeIndexFrameFactory leafFrameFactory = new BTreeNSMLeafFrameFactory(new TypeAwareTupleWriterFactory(
+ typeTraits));
+ INullWriterFactory[] nullWriterFactories = new INullWriterFactory[2];
+ nullWriterFactories[0] = VertexIdNullWriterFactory.INSTANCE;
+ nullWriterFactories[1] = MsgListNullWriterFactory.INSTANCE;
+
+ RecordDescriptor rdDummy = DataflowUtils.getRecordDescriptorFromWritableClasses(VLongWritable.class.getName());
+ RecordDescriptor rdMessage = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
+ MessageList.class.getName());
+ IConfigurationFactory configurationFactory = new ConfigurationFactory(conf);
+ IRuntimeHookFactory preHookFactory = new RuntimeHookFactory(configurationFactory);
+ IRecordDescriptorFactory inputRdFactory = DataflowUtils.getWritableRecordDescriptorFactoryFromWritableClasses(
+ vertexIdClass.getName(), MsgList.class.getName(), vertexIdClass.getName(), vertexClass.getName());
+
+ IndexNestedLoopJoinFunctionUpdateOperatorDescriptor join = new IndexNestedLoopJoinFunctionUpdateOperatorDescriptor(
+ spec, storageManagerInterface, treeRegistryProvider, fileSplitProvider, interiorFrameFactory,
+ leafFrameFactory, typeTraits, comparatorFactories, JobGenUtil.getForwardScan(iteration), keyFields,
+ keyFields, true, true, new BTreeDataflowHelperFactory(), true, nullWriterFactories, inputRdFactory, 2,
+ ComputeUpdateFunctionFactory.INSTANCE, preHookFactory, null, rdMessage, rdDummy);
+ ClusterConfig.setLocationConstraint(spec, join);
+
+ /**
+ * construct local sort operator
+ */
+ INormalizedKeyComputerFactory nkmFactory = JobGenUtil
+ .getINormalizedKeyComputerFactory(iteration, vertexIdClass);
+ IBinaryComparatorFactory[] sortCmpFactories = new IBinaryComparatorFactory[1];
+ sortCmpFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration, WritableComparator.get(vertexIdClass)
+ .getClass());
+ ExternalSortOperatorDescriptor localSort = new ExternalSortOperatorDescriptor(spec, maxFrameSize, keyFields,
+ nkmFactory, sortCmpFactories, rdUnnestedMessage);
+ ClusterConfig.setLocationConstraint(spec, localSort);
+
+ /**
+ * construct local pre-clustered group-by operator
+ */
+ IAggregatorDescriptorFactory aggregatorFactory = DataflowUtils.getAccumulatingAggregatorFactory(conf, false);
+ PreclusteredGroupOperatorDescriptor localGby = new PreclusteredGroupOperatorDescriptor(spec, keyFields,
+ sortCmpFactories, aggregatorFactory, rdUnnestedMessage);
+ ClusterConfig.setLocationConstraint(spec, localGby);
+
+ /**
+ * construct global sort operator
+ */
+ ExternalSortOperatorDescriptor globalSort = new ExternalSortOperatorDescriptor(spec, maxFrameSize, keyFields,
+ nkmFactory, sortCmpFactories, rdUnnestedMessage);
+ ClusterConfig.setLocationConstraint(spec, globalSort);
+
+ /**
+ * construct global group-by operator
+ */
+ IAggregatorDescriptorFactory aggregatorFactoryFinal = DataflowUtils
+ .getAccumulatingAggregatorFactory(conf, true);
+ PreclusteredGroupOperatorDescriptor globalGby = new PreclusteredGroupOperatorDescriptor(spec, keyFields,
+ sortCmpFactories, aggregatorFactoryFinal, rdFinal);
+ ClusterConfig.setLocationConstraint(spec, globalGby);
+
+ /**
+ * construct the materializing write operator
+ */
+ MaterializingWriteOperatorDescriptor materialize = new MaterializingWriteOperatorDescriptor(spec, rdFinal);
+ ClusterConfig.setLocationConstraint(spec, materialize);
+
+ /** construct runtime hook */
+ RuntimeHookOperatorDescriptor postSuperStep = new RuntimeHookOperatorDescriptor(spec,
+ new PostSuperStepRuntimeHookFactory(jobId));
+ ClusterConfig.setLocationConstraint(spec, postSuperStep);
+
+ /** construct empty sink operator */
+ EmptySinkOperatorDescriptor emptySink = new EmptySinkOperatorDescriptor(spec);
+ ClusterConfig.setLocationConstraint(spec, emptySink);
+
+ /**
+ * termination state write operator
+ */
+ TerminationStateWriterOperatorDescriptor terminateWriter = new TerminationStateWriterOperatorDescriptor(spec,
+ configurationFactory, jobId);
+ PartitionConstraintHelper.addPartitionCountConstraint(spec, terminateWriter, 1);
+
+ ITuplePartitionComputerFactory hashPartitionComputerFactory = new MergePartitionComputerFactory();
+ ITuplePartitionComputerFactory partionFactory = new VertexIdPartitionComputerFactory(
+ rdUnnestedMessage.getFields()[0]);
+
+ /** connect all operators **/
+ spec.connect(new OneToOneConnectorDescriptor(spec), emptyTupleSource, 0, preSuperStep, 0);
+ spec.connect(new OneToOneConnectorDescriptor(spec), preSuperStep, 0, materializeRead, 0);
+ spec.connect(new OneToOneConnectorDescriptor(spec), materializeRead, 0, join, 0);
+ spec.connect(new OneToOneConnectorDescriptor(spec), join, 0, localSort, 0);
+ spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), join, 1,
+ terminateWriter, 0);
+ spec.connect(new OneToOneConnectorDescriptor(spec), localSort, 0, localGby, 0);
+ spec.connect(new MToNPartitioningConnectorDescriptor(spec, partionFactory), localGby, 0, globalSort, 0);
+ spec.connect(new OneToOneConnectorDescriptor(spec), globalSort, 0, globalGby, 0);
+ spec.connect(new OneToOneConnectorDescriptor(spec), globalGby, 0, materialize, 0);
+ spec.connect(new OneToOneConnectorDescriptor(spec), materialize, 0, postSuperStep, 0);
+ spec.connect(new OneToOneConnectorDescriptor(spec), postSuperStep, 0, emptySink, 0);
+
+ spec.addRoot(terminateWriter);
+ spec.addRoot(emptySink);
+ return spec;
+ }
+
+ @Override
+ public JobSpecification[] generateCleanup() throws HyracksException {
+ JobSpecification[] cleanups = new JobSpecification[1];
+ cleanups[0] = this.dropIndex(PRIMARY_INDEX);
+ return cleanups;
+ }
+
+}
\ No newline at end of file
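For orientation, a minimal sketch (not part of this patch) of how per-iteration specifications like the one built above could be driven from client code. The names jobGen, APP_NAME, and isConverged() are assumptions, and generateJob(iteration) is assumed to dispatch internally to the first/non-first iteration plans:

// Hypothetical superstep loop; jobGen, APP_NAME and isConverged() are assumed for illustration.
int iteration = 1;
do {
    JobSpecification spec = jobGen.generateJob(iteration);      // build the per-iteration dataflow
    PregelixHyracksIntegrationUtil.runJob(spec, APP_NAME);      // submit to Hyracks and wait for completion
    iteration++;
} while (!isConverged());                                       // assumed check of the written termination state
for (JobSpecification cleanup : jobGen.generateCleanup()) {     // finally drop the primary index
    PregelixHyracksIntegrationUtil.runJob(cleanup, APP_NAME);
}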
diff --git a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenUtil.java b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenUtil.java
new file mode 100644
index 0000000..608980c
--- /dev/null
+++ b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenUtil.java
@@ -0,0 +1,46 @@
+package edu.uci.ics.pregelix.core.jobgen;
+
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+import edu.uci.ics.hyracks.dataflow.hadoop.data.WritableComparingBinaryComparatorFactory;
+import edu.uci.ics.pregelix.core.jobgen.provider.NormalizedKeyComputerFactoryProvider;
+
+@SuppressWarnings({ "rawtypes", "unchecked" })
+public class JobGenUtil {
+
+ /**
+ * Get the normalized key computer factory used to sort messages in an
+ * iteration (iteration 1: ascending order, iteration 2: descending order).
+ *
+ * @param iteration
+ * @param keyClass
+ * @return
+ */
+ public static INormalizedKeyComputerFactory getINormalizedKeyComputerFactory(int iteration, Class keyClass) {
+ return NormalizedKeyComputerFactoryProvider.INSTANCE.getAscINormalizedKeyComputerFactory(keyClass);
+ }
+
+ /**
+ * Get the binary comparator factory used to sort messages in an iteration
+ * (iteration 1: ascending order, iteration 2: descending order).
+ *
+ * @param iteration
+ * @param keyClass
+ * @return
+ */
+ public static IBinaryComparatorFactory getIBinaryComparatorFactory(int iteration, Class keyClass) {
+ return new WritableComparingBinaryComparatorFactory(keyClass);
+ }
+
+ /**
+ * Get the B-tree scan order for an iteration (iteration 1: descending order,
+ * backward scan; iteration 2: ascending order, forward scan).
+ *
+ * @param iteration
+ * @return
+ */
+ public static boolean getForwardScan(int iteration) {
+ return true;
+ }
+
+}
diff --git a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/clusterconfig/ClusterConfig.java b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/clusterconfig/ClusterConfig.java
new file mode 100644
index 0000000..a22b6f4
--- /dev/null
+++ b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/clusterconfig/ClusterConfig.java
@@ -0,0 +1,212 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.core.jobgen.clusterconfig;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Random;
+
+import org.apache.hadoop.mapreduce.InputSplit;
+
+import edu.uci.ics.hyracks.api.client.HyracksConnection;
+import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
+import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
+import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.std.file.ConstantFileSplitProvider;
+import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
+import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+
+public class ClusterConfig {
+
+ private static String[] NCs;
+ private static String propertiesPath = "conf/stores.properties";
+ private static Map<String, List<String>> ipToNcMapping;
+ private static String[] stores;
+
+ /**
+ * Let tests override the store properties path.
+ *
+ * @param propertiesPath
+ */
+ public static void setStorePath(String propertiesPath) {
+ ClusterConfig.propertiesPath = propertiesPath;
+ }
+
+ /**
+ * get NC names running on one IP address
+ *
+ * @param ipAddress
+ * @return
+ * @throws HyracksException
+ */
+ public static List<String> getNCNames(String ipAddress) throws HyracksException {
+ return ipToNcMapping.get(ipAddress);
+ }
+
+ /**
+ * get file split provider
+ *
+ * @param jobId
+ * @param indexName
+ * @return
+ * @throws HyracksException
+ */
+ public static IFileSplitProvider getFileSplitProvider(String jobId, String indexName) throws HyracksException {
+ if (stores == null) {
+ loadStores();
+ }
+ FileSplit[] fileSplits = new FileSplit[stores.length * NCs.length];
+ int i = 0;
+ for (String nc : NCs) {
+ for (String st : stores) {
+ FileSplit split = new FileSplit(nc, st + File.separator + nc + "-data" + File.separator + jobId
+ + File.separator + indexName);
+ fileSplits[i++] = split;
+ }
+ }
+ return new ConstantFileSplitProvider(fileSplits);
+ }
+
+ private static void loadStores() throws HyracksException {
+ Properties properties = new Properties();
+ try {
+ properties.load(new FileInputStream(propertiesPath));
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ }
+ String store = properties.getProperty("store");
+ stores = store.split(";");
+ }
+
+ /**
+ * set location constraint
+ *
+ * @param spec
+ * @param operator
+ * @throws HyracksException
+ */
+ public static void setLocationConstraint(JobSpecification spec, IOperatorDescriptor operator,
+ List<InputSplit> splits) throws HyracksException {
+ if (stores == null) {
+ loadStores();
+ }
+ int count = splits.size();
+ String[] locations = new String[splits.size()];
+ Random random = new Random(System.currentTimeMillis());
+ for (int i = 0; i < splits.size(); i++) {
+ try {
+ String[] loc = splits.get(i).getLocations();
+ Collections.shuffle(Arrays.asList(loc), random);
+ if (loc.length > 0) {
+ InetAddress[] allIps = InetAddress.getAllByName(loc[0]);
+ for (InetAddress ip : allIps) {
+ if (ipToNcMapping.get(ip.getHostAddress()) != null) {
+ List<String> ncs = ipToNcMapping.get(ip.getHostAddress());
+ int pos = random.nextInt(ncs.size());
+ locations[i] = ncs.get(pos);
+ } else {
+ int pos = random.nextInt(NCs.length);
+ locations[i] = NCs[pos];
+ }
+ }
+ } else {
+ int pos = random.nextInt(NCs.length);
+ locations[i] = NCs[pos];
+ }
+ } catch (IOException e) {
+ throw new HyracksException(e);
+ } catch (InterruptedException e) {
+ throw new HyracksException(e);
+ }
+ }
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, operator, locations);
+ PartitionConstraintHelper.addPartitionCountConstraint(spec, operator, count);
+ }
+
+ /**
+ * set location constraint
+ *
+ * @param spec
+ * @param operator
+ * @throws HyracksException
+ */
+ public static void setLocationConstraint(JobSpecification spec, IOperatorDescriptor operator)
+ throws HyracksException {
+ if (stores == null) {
+ loadStores();
+ }
+ int count = 0;
+ String[] locations = new String[NCs.length * stores.length];
+ for (String nc : NCs) {
+ for (int i = 0; i < stores.length; i++) {
+ locations[count] = nc;
+ count++;
+ }
+ }
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, operator, locations);
+ }
+
+ /**
+ * set partition count constraint
+ *
+ * @param spec
+ * @param operator
+ * @throws HyracksException
+ */
+ public static void setCountConstraint(JobSpecification spec, IOperatorDescriptor operator) throws HyracksException {
+ if (stores == null) {
+ loadStores();
+ }
+ int count = NCs.length * stores.length;
+ PartitionConstraintHelper.addPartitionCountConstraint(spec, operator, count);
+ }
+
+ public static void loadClusterConfig(String ipAddress, int port) throws HyracksException {
+ try {
+ IHyracksClientConnection hcc = new HyracksConnection(ipAddress, port);
+ Map<String, NodeControllerInfo> ncNameToNcInfos = hcc.getNodeControllerInfos();
+ NCs = new String[ncNameToNcInfos.size()];
+ ipToNcMapping = new HashMap<String, List<String>>();
+ int i = 0;
+ for (Map.Entry<String, NodeControllerInfo> entry : ncNameToNcInfos.entrySet()) {
+ String ipAddr = InetAddress.getByAddress(entry.getValue().getNetworkAddress().getIpAddress())
+ .getHostAddress();
+ List<String> matchedNCs = ipToNcMapping.get(ipAddr);
+ if (matchedNCs == null) {
+ matchedNCs = new ArrayList<String>();
+ ipToNcMapping.put(ipAddr, matchedNCs);
+ }
+ matchedNCs.add(entry.getKey());
+ NCs[i] = entry.getKey();
+ i++;
+ }
+ } catch (Exception e) {
+ throw new IllegalStateException(e);
+ }
+ }
+}
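A short usage sketch for this class; spec and scanner are assumed to be an existing JobSpecification and operator, the cluster controller address and job id are illustrative. The call order matters: loadClusterConfig must run first, since it populates the NC list used by the constraint helpers above.

// Discover node controllers and build the IP-to-NC mapping (must run before setting constraints).
ClusterConfig.loadClusterConfig("localhost", 3099);
// Optionally point at a test store configuration instead of conf/stores.properties.
ClusterConfig.setStorePath("src/test/resources/conf/stores.properties");
// One file split per (NC, store directory) pair for this job's index.
IFileSplitProvider splits = ClusterConfig.getFileSplitProvider("job-1", "vertex-index");
// Pin an operator to one partition per (NC, store directory) pair.
ClusterConfig.setLocationConstraint(spec, scanner);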
diff --git a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/provider/NormalizedKeyComputerFactoryProvider.java b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/provider/NormalizedKeyComputerFactoryProvider.java
new file mode 100644
index 0000000..1c331c0
--- /dev/null
+++ b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/provider/NormalizedKeyComputerFactoryProvider.java
@@ -0,0 +1,33 @@
+package edu.uci.ics.pregelix.core.jobgen.provider;
+
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+import edu.uci.ics.pregelix.core.base.INormalizedKeyComputerFactoryProvider;
+import edu.uci.ics.pregelix.runtime.touchpoint.VLongAscNormalizedKeyComputerFactory;
+import edu.uci.ics.pregelix.runtime.touchpoint.VLongDescNormalizedKeyComputerFactory;
+
+public class NormalizedKeyComputerFactoryProvider implements INormalizedKeyComputerFactoryProvider {
+
+ public static INormalizedKeyComputerFactoryProvider INSTANCE = new NormalizedKeyComputerFactoryProvider();
+
+ private NormalizedKeyComputerFactoryProvider() {
+
+ }
+
+ @SuppressWarnings("rawtypes")
+ @Override
+ public INormalizedKeyComputerFactory getAscINormalizedKeyComputerFactory(Class keyClass) {
+ if (keyClass.getName().indexOf("VLongWritable") > 0)
+ return new VLongAscNormalizedKeyComputerFactory();
+ else
+ return null;
+ }
+
+ @SuppressWarnings("rawtypes")
+ @Override
+ public INormalizedKeyComputerFactory getDescINormalizedKeyComputerFactory(Class keyClass) {
+ if (keyClass.getName().indexOf("VLongWritable") > 0)
+ return new VLongDescNormalizedKeyComputerFactory();
+ else
+ return null;
+ }
+}
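A brief usage sketch: only VLongWritable keys get a normalized key computer; any other key class falls back to null, so the sort then relies solely on the binary comparator.

INormalizedKeyComputerFactory asc = NormalizedKeyComputerFactoryProvider.INSTANCE
        .getAscINormalizedKeyComputerFactory(VLongWritable.class);   // VLongAscNormalizedKeyComputerFactory
INormalizedKeyComputerFactory none = NormalizedKeyComputerFactoryProvider.INSTANCE
        .getAscINormalizedKeyComputerFactory(Text.class);            // null: no normalized keys for Text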
diff --git a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/runtime/touchpoint/WritableRecordDescriptorFactory.java b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/runtime/touchpoint/WritableRecordDescriptorFactory.java
new file mode 100644
index 0000000..d1d927d
--- /dev/null
+++ b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/runtime/touchpoint/WritableRecordDescriptorFactory.java
@@ -0,0 +1,40 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.core.runtime.touchpoint;
+
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.pregelix.core.util.DataflowUtils;
+import edu.uci.ics.pregelix.dataflow.std.base.IRecordDescriptorFactory;
+
+public class WritableRecordDescriptorFactory implements IRecordDescriptorFactory {
+ private static final long serialVersionUID = 1L;
+ private String[] fieldClasses;
+
+ public WritableRecordDescriptorFactory(String... fieldClasses) {
+ this.fieldClasses = fieldClasses;
+ }
+
+ @Override
+ public RecordDescriptor createRecordDescriptor() throws HyracksDataException {
+ try {
+ return DataflowUtils.getRecordDescriptorFromWritableClasses(fieldClasses);
+ } catch (HyracksException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+
+}
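A brief usage sketch: the factory carries only class names, so it can be shipped with the job and the RecordDescriptor rebuilt where it is actually needed.

IRecordDescriptorFactory rdFactory = new WritableRecordDescriptorFactory(
        VLongWritable.class.getName(), Text.class.getName());
RecordDescriptor rd = rdFactory.createRecordDescriptor();   // materialized at runtime from the class names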
diff --git a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/BufferSerDeUtils.java b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/BufferSerDeUtils.java
new file mode 100644
index 0000000..3f15197
--- /dev/null
+++ b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/BufferSerDeUtils.java
@@ -0,0 +1,81 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.core.util;
+
+public class BufferSerDeUtils {
+
+ public static double getDouble(byte[] bytes, int offset) {
+ return Double.longBitsToDouble(getLong(bytes, offset));
+ }
+
+ public static float getFloat(byte[] bytes, int offset) {
+ return Float.intBitsToFloat(getInt(bytes, offset));
+ }
+
+ public static boolean getBoolean(byte[] bytes, int offset) {
+ if (bytes[offset] == 0)
+ return false;
+ else
+ return true;
+ }
+
+ public static int getInt(byte[] bytes, int offset) {
+ return ((bytes[offset] & 0xff) << 24) + ((bytes[offset + 1] & 0xff) << 16) + ((bytes[offset + 2] & 0xff) << 8)
+ + ((bytes[offset + 3] & 0xff) << 0);
+ }
+
+ public static long getLong(byte[] bytes, int offset) {
+ return (((long) (bytes[offset] & 0xff)) << 56) + (((long) (bytes[offset + 1] & 0xff)) << 48)
+ + (((long) (bytes[offset + 2] & 0xff)) << 40) + (((long) (bytes[offset + 3] & 0xff)) << 32)
+ + (((long) (bytes[offset + 4] & 0xff)) << 24) + (((long) (bytes[offset + 5] & 0xff)) << 16)
+ + (((long) (bytes[offset + 6] & 0xff)) << 8) + (((long) (bytes[offset + 7] & 0xff)) << 0);
+ }
+
+ public static void writeBoolean(boolean value, byte[] bytes, int offset) {
+ if (value)
+ bytes[offset] = (byte) 1;
+ else
+ bytes[offset] = (byte) 0;
+ }
+
+ public static void writeInt(int value, byte[] bytes, int offset) {
+ bytes[offset++] = (byte) (value >> 24);
+ bytes[offset++] = (byte) (value >> 16);
+ bytes[offset++] = (byte) (value >> 8);
+ bytes[offset++] = (byte) (value);
+ }
+
+ public static void writeLong(long value, byte[] bytes, int offset) {
+ bytes[offset++] = (byte) (value >> 56);
+ bytes[offset++] = (byte) (value >> 48);
+ bytes[offset++] = (byte) (value >> 40);
+ bytes[offset++] = (byte) (value >> 32);
+ bytes[offset++] = (byte) (value >> 24);
+ bytes[offset++] = (byte) (value >> 16);
+ bytes[offset++] = (byte) (value >> 8);
+ bytes[offset++] = (byte) (value);
+ }
+
+ public static void writeDouble(double value, byte[] bytes, int offset) {
+ long lValue = Double.doubleToLongBits(value);
+ writeLong(lValue, bytes, offset);
+ }
+
+ public static void writeFloat(float value, byte[] bytes, int offset) {
+ int iValue = Float.floatToIntBits(value);
+ writeInt(iValue, bytes, offset);
+ }
+
+}
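A quick round-trip sketch of these big-endian byte-array helpers:

byte[] buf = new byte[12];
BufferSerDeUtils.writeDouble(3.25, buf, 0);        // 8 bytes at offset 0
BufferSerDeUtils.writeInt(42, buf, 8);             // 4 bytes at offset 8
double d = BufferSerDeUtils.getDouble(buf, 0);     // 3.25
int i = BufferSerDeUtils.getInt(buf, 8);           // 42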
diff --git a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/DataBalancer.java b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/DataBalancer.java
new file mode 100644
index 0000000..d0cff48
--- /dev/null
+++ b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/DataBalancer.java
@@ -0,0 +1,68 @@
+package edu.uci.ics.pregelix.core.util;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.compress.BZip2Codec;
+import org.apache.hadoop.io.compress.GzipCodec;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.TextInputFormat;
+
+@SuppressWarnings("deprecation")
+public class DataBalancer {
+
+ public static class MapRecordOnly extends MapReduceBase implements Mapper<LongWritable, Text, LongWritable, Text> {
+
+ public void map(LongWritable id, Text inputValue, OutputCollector<LongWritable, Text> output, Reporter reporter)
+ throws IOException {
+ output.collect(id, inputValue);
+ }
+ }
+
+ public static class ReduceRecordOnly extends MapReduceBase implements
+ Reducer<LongWritable, Text, NullWritable, Text> {
+
+ NullWritable key = NullWritable.get();
+
+ public void reduce(LongWritable inputKey, Iterator<Text> inputValue,
+ OutputCollector<NullWritable, Text> output, Reporter reporter) throws IOException {
+ while (inputValue.hasNext())
+ output.collect(key, inputValue.next());
+ }
+ }
+
+ public static void main(String[] args) throws IOException {
+ JobConf job = new JobConf(DataBalancer.class);
+
+ job.setJobName(DataBalancer.class.getSimpleName());
+ job.setMapperClass(MapRecordOnly.class);
+ job.setReducerClass(ReduceRecordOnly.class);
+ job.setMapOutputKeyClass(LongWritable.class);
+ job.setMapOutputValueClass(Text.class);
+
+ job.setInputFormat(TextInputFormat.class);
+ FileInputFormat.setInputPaths(job, args[0]);
+ FileOutputFormat.setOutputPath(job, new Path(args[1]));
+ job.setNumReduceTasks(Integer.parseInt(args[2]));
+
+ if (args.length > 3) {
+ if (args[3].startsWith("bzip"))
+ FileOutputFormat.setOutputCompressorClass(job, BZip2Codec.class);
+ if (args[3].startsWith("gz"))
+ FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
+ }
+ JobClient.runJob(job);
+ }
+}
\ No newline at end of file
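The main method expects positional arguments <inputPath> <outputPath> <numReducers> [bzip|gz]. A sketch of an invocation (paths and counts below are illustrative, not taken from this patch):

// Rebalance a text dataset into 8 evenly sized, gzip-compressed output files.
DataBalancer.main(new String[] { "/user/data/webmap", "/user/data/webmap-balanced", "8", "gz" });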
diff --git a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/DataGenerator.java b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/DataGenerator.java
new file mode 100644
index 0000000..aa63edf
--- /dev/null
+++ b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/DataGenerator.java
@@ -0,0 +1,213 @@
+package edu.uci.ics.pregelix.core.util;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.VLongWritable;
+import org.apache.hadoop.io.compress.BZip2Codec;
+import org.apache.hadoop.io.compress.GzipCodec;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.TextInputFormat;
+
+/**
+ * Generate a larger graph from a base dataset by duplicating it x times and
+ * offsetting the vertex ids of each copy so they do not collide.
+ *
+ */
+@SuppressWarnings("deprecation")
+public class DataGenerator {
+
+ public static class MapMaxId extends MapReduceBase implements
+ Mapper<LongWritable, Text, NullWritable, VLongWritable> {
+ private NullWritable key = NullWritable.get();
+ private VLongWritable value = new VLongWritable();
+
+ @Override
+ public void map(LongWritable id, Text inputValue, OutputCollector<NullWritable, VLongWritable> output,
+ Reporter reporter) throws IOException {
+ String[] vertices = inputValue.toString().split(" ");
+ long max = Long.parseLong(vertices[0]);
+ for (int i = 1; i < vertices.length; i++) {
+ long vid = Long.parseLong(vertices[i]);
+ if (vid > max)
+ max = vid;
+ }
+ value.set(max);
+ output.collect(key, value);
+ }
+ }
+
+ public static class ReduceMaxId extends MapReduceBase implements
+ Reducer<NullWritable, VLongWritable, NullWritable, Text> {
+
+ private NullWritable key = NullWritable.get();
+ private long max = Long.MIN_VALUE;
+ private OutputCollector<NullWritable, Text> output;
+
+ @Override
+ public void reduce(NullWritable inputKey, Iterator<VLongWritable> inputValue,
+ OutputCollector<NullWritable, Text> output, Reporter reporter) throws IOException {
+ while (inputValue.hasNext()) {
+ long vid = inputValue.next().get();
+ if (vid > max)
+ max = vid;
+ }
+ if (this.output == null)
+ this.output = output;
+
+ }
+
+ @Override
+ public void close() throws IOException {
+ output.collect(key, new Text(new VLongWritable(max).toString()));
+ }
+ }
+
+ public static class CombineMaxId extends MapReduceBase implements
+ Reducer<NullWritable, VLongWritable, NullWritable, VLongWritable> {
+
+ private NullWritable key = NullWritable.get();
+ private long max = Long.MIN_VALUE;
+ private OutputCollector<NullWritable, VLongWritable> output;
+
+ @Override
+ public void reduce(NullWritable inputKey, Iterator<VLongWritable> inputValue,
+ OutputCollector<NullWritable, VLongWritable> output, Reporter reporter) throws IOException {
+ while (inputValue.hasNext()) {
+ long vid = inputValue.next().get();
+ if (vid > max)
+ max = vid;
+ }
+ if (this.output == null)
+ this.output = output;
+ }
+
+ public void close() throws IOException {
+ output.collect(key, new VLongWritable(max));
+ }
+ }
+
+ public static class MapRecordGen extends MapReduceBase implements Mapper<LongWritable, Text, LongWritable, Text> {
+
+ private long maxId = 0;
+ private Text text = new Text();
+ private int x = 2;
+
+ @Override
+ public void configure(JobConf conf) {
+ try {
+ x = conf.getInt("hyracks.x", 2);
+ String fileName = conf.get("hyracks.maxid.file");
+ FileSystem dfs = FileSystem.get(conf);
+ dfs.delete(new Path(fileName + "/_SUCCESS"), true);
+ dfs.delete(new Path(fileName + "/_logs"), true);
+ FileStatus[] files = dfs.listStatus(new Path(fileName));
+
+ for (int i = 0; i < files.length; i++) {
+ if (!files[i].isDir()) {
+ DataInputStream input = dfs.open(files[i].getPath());
+ String id = input.readLine();
+ maxId = Long.parseLong(id) + 1;
+ input.close();
+ }
+ }
+ } catch (IOException e) {
+ throw new IllegalStateException(e);
+ }
+ }
+
+ @Override
+ public void map(LongWritable id, Text inputValue, OutputCollector<LongWritable, Text> output, Reporter reporter)
+ throws IOException {
+ String[] vertices = inputValue.toString().split(" ");
+
+ /**
+ * generate data x times
+ */
+ for (int k = 0; k < x; k++) {
+ long max = maxId * k;
+ StringBuilder sb = new StringBuilder();
+ for (int i = 0; i < vertices.length - 1; i++) {
+ long vid = Long.parseLong(vertices[i]) + max;
+ sb.append(vid);
+ sb.append(" ");
+ }
+ long vid = Long.parseLong(vertices[vertices.length - 1]) + max;
+ sb.append(vid);
+ text.set(sb.toString().getBytes());
+ output.collect(id, text);
+ }
+ }
+ }
+
+ public static class ReduceRecordGen extends MapReduceBase implements
+ Reducer<LongWritable, Text, NullWritable, Text> {
+
+ private NullWritable key = NullWritable.get();
+
+ public void reduce(LongWritable inputKey, Iterator<Text> inputValue,
+ OutputCollector<NullWritable, Text> output, Reporter reporter) throws IOException {
+ while (inputValue.hasNext())
+ output.collect(key, inputValue.next());
+ }
+ }
+
+ public static void main(String[] args) throws IOException {
+
+ JobConf job = new JobConf(DataGenerator.class);
+ FileSystem dfs = FileSystem.get(job);
+ String maxFile = "/maxtemp";
+ dfs.delete(new Path(maxFile), true);
+
+ job.setJobName(DataGenerator.class.getSimpleName() + " max ID");
+ job.setMapperClass(MapMaxId.class);
+ job.setCombinerClass(CombineMaxId.class);
+ job.setReducerClass(ReduceMaxId.class);
+ job.setMapOutputKeyClass(NullWritable.class);
+ job.setMapOutputValueClass(VLongWritable.class);
+
+ job.setInputFormat(TextInputFormat.class);
+ FileInputFormat.setInputPaths(job, args[0]);
+ FileOutputFormat.setOutputPath(job, new Path(maxFile));
+ job.setNumReduceTasks(1);
+ JobClient.runJob(job);
+
+ job = new JobConf(DataGenerator.class);
+ job.set("hyracks.maxid.file", maxFile);
+ job.setInt("hyracks.x", Integer.parseInt(args[2]));
+ dfs.delete(new Path(args[1]), true);
+
+ job.setJobName(DataGenerator.class.getSimpleName());
+ job.setMapperClass(MapRecordGen.class);
+ job.setReducerClass(ReduceRecordGen.class);
+ job.setMapOutputKeyClass(LongWritable.class);
+ job.setMapOutputValueClass(Text.class);
+
+ job.setInputFormat(TextInputFormat.class);
+ FileInputFormat.setInputPaths(job, args[0]);
+ FileOutputFormat.setOutputPath(job, new Path(args[1]));
+ job.setNumReduceTasks(Integer.parseInt(args[3]));
+
+ if (args.length > 4) {
+ if (args[4].startsWith("bzip"))
+ FileOutputFormat.setOutputCompressorClass(job, BZip2Codec.class);
+ if (args[4].startsWith("gz"))
+ FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
+ }
+ JobClient.runJob(job);
+ }
+}
\ No newline at end of file
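The main method expects positional arguments <inputPath> <outputPath> <x: duplication factor> <numReducers> [bzip|gz]; the first job finds the maximum vertex id and the second emits the shifted copies. A sketch of an invocation (paths and counts are illustrative):

// Scale the base graph 4x, writing the result with 8 reducers.
DataGenerator.main(new String[] { "/user/data/webmap", "/user/data/webmap-x4", "4", "8" });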
diff --git a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/DataflowUtils.java b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/DataflowUtils.java
new file mode 100644
index 0000000..f17802b
--- /dev/null
+++ b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/DataflowUtils.java
@@ -0,0 +1,81 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.core.util;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.dataflow.hadoop.util.DatatypeHelper;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
+import edu.uci.ics.pregelix.core.hadoop.config.ConfigurationFactory;
+import edu.uci.ics.pregelix.core.runtime.touchpoint.WritableRecordDescriptorFactory;
+import edu.uci.ics.pregelix.dataflow.std.base.IRecordDescriptorFactory;
+import edu.uci.ics.pregelix.runtime.base.IAggregateFunctionFactory;
+import edu.uci.ics.pregelix.runtime.simpleagg.AccumulatingAggregatorFactory;
+import edu.uci.ics.pregelix.runtime.simpleagg.AggregationFunctionFactory;
+
+public class DataflowUtils {
+
+ public enum AggregationMode {
+ PARTIAL, FINAL
+ }
+
+ @SuppressWarnings("unchecked")
+ public static RecordDescriptor getRecordDescriptorFromKeyValueClasses(String className1, String className2)
+ throws HyracksException {
+ RecordDescriptor recordDescriptor = null;
+ try {
+ recordDescriptor = DatatypeHelper.createKeyValueRecordDescriptor(
+ (Class<? extends Writable>) Class.forName(className1),
+ (Class<? extends Writable>) Class.forName(className2));
+ } catch (ClassNotFoundException cnfe) {
+ throw new HyracksException(cnfe);
+ }
+ return recordDescriptor;
+ }
+
+ @SuppressWarnings({ "unchecked", "rawtypes" })
+ public static RecordDescriptor getRecordDescriptorFromWritableClasses(String... classNames) throws HyracksException {
+ RecordDescriptor recordDescriptor = null;
+ ISerializerDeserializer[] serdes = new ISerializerDeserializer[classNames.length];
+ try {
+ int i = 0;
+ for (String className : classNames)
+ serdes[i++] = DatatypeHelper.createSerializerDeserializer((Class<? extends Writable>) Class
+ .forName(className));
+ } catch (ClassNotFoundException cnfe) {
+ throw new HyracksException(cnfe);
+ }
+ recordDescriptor = new RecordDescriptor(serdes);
+ return recordDescriptor;
+ }
+
+ public static IRecordDescriptorFactory getWritableRecordDescriptorFactoryFromWritableClasses(String... classNames)
+ throws HyracksException {
+ IRecordDescriptorFactory rdFactory = new WritableRecordDescriptorFactory(classNames);
+ return rdFactory;
+ }
+
+ public static IAggregatorDescriptorFactory getAccumulatingAggregatorFactory(Configuration conf, boolean isFinal) {
+ IAggregateFunctionFactory aggFuncFactory = new AggregationFunctionFactory(new ConfigurationFactory(conf),
+ isFinal);
+ IAggregatorDescriptorFactory aggregatorFactory = new AccumulatingAggregatorFactory(
+ new IAggregateFunctionFactory[] { aggFuncFactory }, new int[] { 0 }, new int[] {});
+ return aggregatorFactory;
+ }
+}
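A sketch of how these helpers are typically combined when assembling a job; the Writable classes chosen here are illustrative.

Configuration conf = new Configuration();
RecordDescriptor rdMessage = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
        VLongWritable.class.getName(), Text.class.getName());
IRecordDescriptorFactory rdFactory = DataflowUtils.getWritableRecordDescriptorFactoryFromWritableClasses(
        VLongWritable.class.getName(), Text.class.getName());
IAggregatorDescriptorFactory localAgg = DataflowUtils.getAccumulatingAggregatorFactory(conf, false);  // partial aggregation
IAggregatorDescriptorFactory finalAgg = DataflowUtils.getAccumulatingAggregatorFactory(conf, true);   // final aggregation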
diff --git a/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/PregelixHyracksIntegrationUtil.java b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/PregelixHyracksIntegrationUtil.java
new file mode 100644
index 0000000..ed04746
--- /dev/null
+++ b/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/PregelixHyracksIntegrationUtil.java
@@ -0,0 +1,104 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.core.util;
+
+import java.util.EnumSet;
+
+import edu.uci.ics.hyracks.api.client.HyracksConnection;
+import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
+import edu.uci.ics.hyracks.api.job.JobFlag;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
+import edu.uci.ics.hyracks.control.common.controllers.CCConfig;
+import edu.uci.ics.hyracks.control.common.controllers.NCConfig;
+import edu.uci.ics.hyracks.control.nc.NodeControllerService;
+import edu.uci.ics.pregelix.core.jobgen.clusterconfig.ClusterConfig;
+
+public class PregelixHyracksIntegrationUtil {
+
+ public static final String NC1_ID = "nc1";
+ public static final String NC2_ID = "nc2";
+
+ public static final int DEFAULT_HYRACKS_CC_PORT = 1099;
+ public static final int TEST_HYRACKS_CC_PORT = 1099;
+ public static final int TEST_HYRACKS_CC_CLIENT_PORT = 2099;
+ public static final String CC_HOST = "localhost";
+
+ public static final int FRAME_SIZE = 65536;
+
+ private static ClusterControllerService cc;
+ private static NodeControllerService nc1;
+ private static NodeControllerService nc2;
+ private static IHyracksClientConnection hcc;
+
+ public static void init() throws Exception {
+ CCConfig ccConfig = new CCConfig();
+ ccConfig.clientNetIpAddress = CC_HOST;
+ ccConfig.clusterNetIpAddress = CC_HOST;
+ ccConfig.clusterNetPort = TEST_HYRACKS_CC_PORT;
+ ccConfig.clientNetPort = TEST_HYRACKS_CC_CLIENT_PORT;
+ ccConfig.defaultMaxJobAttempts = 0;
+ ccConfig.jobHistorySize = 10;
+
+ // cluster controller
+ cc = new ClusterControllerService(ccConfig);
+ cc.start();
+
+ // two node controllers
+ NCConfig ncConfig1 = new NCConfig();
+ ncConfig1.ccHost = "localhost";
+ ncConfig1.clusterNetIPAddress = "localhost";
+ ncConfig1.ccPort = TEST_HYRACKS_CC_PORT;
+ ncConfig1.dataIPAddress = "127.0.0.1";
+ ncConfig1.nodeId = NC1_ID;
+ nc1 = new NodeControllerService(ncConfig1);
+ nc1.start();
+
+ NCConfig ncConfig2 = new NCConfig();
+ ncConfig2.ccHost = "localhost";
+ ncConfig2.clusterNetIPAddress = "localhost";
+ ncConfig2.ccPort = TEST_HYRACKS_CC_PORT;
+ ncConfig2.dataIPAddress = "127.0.0.1";
+ ncConfig2.nodeId = NC2_ID;
+ nc2 = new NodeControllerService(ncConfig2);
+ nc2.start();
+
+ // hyracks connection
+ hcc = new HyracksConnection(CC_HOST, TEST_HYRACKS_CC_CLIENT_PORT);
+ ClusterConfig.loadClusterConfig(CC_HOST, TEST_HYRACKS_CC_CLIENT_PORT);
+ }
+
+ public static void destroyApp(String hyracksAppName) throws Exception {
+ hcc.destroyApplication(hyracksAppName);
+ }
+
+ public static void createApp(String hyracksAppName) throws Exception {
+ hcc.createApplication(hyracksAppName, null);
+ }
+
+ public static void deinit() throws Exception {
+ nc2.stop();
+ nc1.stop();
+ cc.stop();
+ }
+
+ public static void runJob(JobSpecification spec, String appName) throws Exception {
+ spec.setFrameSize(FRAME_SIZE);
+ JobId jobId = hcc.startJob(appName, spec, EnumSet.of(JobFlag.PROFILE_RUNTIME));
+ hcc.waitForCompletion(jobId);
+ }
+
+}
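A typical test-harness sequence for this utility; spec is an assumed JobSpecification and "pregelix" an assumed application name.

PregelixHyracksIntegrationUtil.init();                       // start an in-process CC plus two NCs on localhost
PregelixHyracksIntegrationUtil.createApp("pregelix");
PregelixHyracksIntegrationUtil.runJob(spec, "pregelix");     // sets the frame size and waits for completion
PregelixHyracksIntegrationUtil.destroyApp("pregelix");
PregelixHyracksIntegrationUtil.deinit();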
diff --git a/pregelix-core/src/main/java/org/apache/hadoop/fs/Path.java b/pregelix-core/src/main/java/org/apache/hadoop/fs/Path.java
new file mode 100644
index 0000000..1fdfde6
--- /dev/null
+++ b/pregelix-core/src/main/java/org/apache/hadoop/fs/Path.java
@@ -0,0 +1,358 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.net.URI;
+import java.net.URISyntaxException;
+
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Names a file or directory in a {@link FileSystem}. Path strings use slash as
+ * the directory separator. A path string is absolute if it begins with a slash.
+ */
+@SuppressWarnings("rawtypes")
+public class Path implements Comparable, Serializable {
+ private static final long serialVersionUID = 1L;
+ /** The directory separator, a slash. */
+ public static final String SEPARATOR = "/";
+ public static final char SEPARATOR_CHAR = '/';
+
+ public static final String CUR_DIR = ".";
+
+ static final boolean WINDOWS = System.getProperty("os.name").startsWith("Windows");
+
+ private URI uri; // a hierarchical uri
+
+ /** Resolve a child path against a parent path. */
+ public Path(String parent, String child) {
+ this(new Path(parent), new Path(child));
+ }
+
+ /** Resolve a child path against a parent path. */
+ public Path(Path parent, String child) {
+ this(parent, new Path(child));
+ }
+
+ /** Resolve a child path against a parent path. */
+ public Path(String parent, Path child) {
+ this(new Path(parent), child);
+ }
+
+ /** Resolve a child path against a parent path. */
+ public Path(Path parent, Path child) {
+ // Add a slash to parent's path so resolution is compatible with URI's
+ URI parentUri = parent.uri;
+ String parentPath = parentUri.getPath();
+ if (!(parentPath.equals("/") || parentPath.equals("")))
+ try {
+ parentUri = new URI(parentUri.getScheme(), parentUri.getAuthority(), parentUri.getPath() + "/", null,
+ parentUri.getFragment());
+ } catch (URISyntaxException e) {
+ throw new IllegalArgumentException(e);
+ }
+ URI resolved = parentUri.resolve(child.uri);
+ initialize(resolved.getScheme(), resolved.getAuthority(), normalizePath(resolved.getPath()),
+ resolved.getFragment());
+ }
+
+ private void checkPathArg(String path) {
+ // disallow construction of a Path from an empty string
+ if (path == null) {
+ throw new IllegalArgumentException("Can not create a Path from a null string");
+ }
+ if (path.length() == 0) {
+ throw new IllegalArgumentException("Can not create a Path from an empty string");
+ }
+ }
+
+ /**
+ * Construct a path from a String. Path strings are URIs, but with unescaped
+ * elements and some additional normalization.
+ */
+ public Path(String pathString) {
+ checkPathArg(pathString);
+
+ // We can't use 'new URI(String)' directly, since it assumes things are
+ // escaped, which we don't require of Paths.
+
+ // add a slash in front of paths with Windows drive letters
+ if (hasWindowsDrive(pathString, false))
+ pathString = "/" + pathString;
+
+ // parse uri components
+ String scheme = null;
+ String authority = null;
+
+ int start = 0;
+
+ // parse uri scheme, if any
+ int colon = pathString.indexOf(':');
+ int slash = pathString.indexOf('/');
+ if ((colon != -1) && ((slash == -1) || (colon < slash))) { // has a
+ // scheme
+ scheme = pathString.substring(0, colon);
+ start = colon + 1;
+ }
+
+ // parse uri authority, if any
+ if (pathString.startsWith("//", start) && (pathString.length() - start > 2)) { // has
+ // authority
+ int nextSlash = pathString.indexOf('/', start + 2);
+ int authEnd = nextSlash > 0 ? nextSlash : pathString.length();
+ authority = pathString.substring(start + 2, authEnd);
+ start = authEnd;
+ }
+
+ // uri path is the rest of the string -- query & fragment not supported
+ String path = pathString.substring(start, pathString.length());
+
+ initialize(scheme, authority, path, null);
+ }
+
+ /** Construct a Path from components. */
+ public Path(String scheme, String authority, String path) {
+ checkPathArg(path);
+ initialize(scheme, authority, path, null);
+ }
+
+ /**
+ * Construct a path from a URI
+ */
+ public Path(URI aUri) {
+ uri = aUri;
+ }
+
+ private void initialize(String scheme, String authority, String path, String fragment) {
+ try {
+ this.uri = new URI(scheme, authority, normalizePath(path), null, fragment).normalize();
+ } catch (URISyntaxException e) {
+ throw new IllegalArgumentException(e);
+ }
+ }
+
+ private String normalizePath(String path) {
+ // remove double slashes & backslashes
+ if (path.indexOf("//") != -1) {
+ path = path.replace("//", "/");
+ }
+ if (path.indexOf("\\") != -1) {
+ path = path.replace("\\", "/");
+ }
+
+ // trim trailing slash from non-root path (ignoring windows drive)
+ int minLength = hasWindowsDrive(path, true) ? 4 : 1;
+ if (path.length() > minLength && path.endsWith("/")) {
+ path = path.substring(0, path.length() - 1);
+ }
+
+ return path;
+ }
+
+ private boolean hasWindowsDrive(String path, boolean slashed) {
+ if (!WINDOWS)
+ return false;
+ int start = slashed ? 1 : 0;
+ return path.length() >= start + 2
+ && (slashed ? path.charAt(0) == '/' : true)
+ && path.charAt(start + 1) == ':'
+ && ((path.charAt(start) >= 'A' && path.charAt(start) <= 'Z') || (path.charAt(start) >= 'a' && path
+ .charAt(start) <= 'z'));
+ }
+
+ /** Convert this to a URI. */
+ public URI toUri() {
+ return uri;
+ }
+
+ /** Return the FileSystem that owns this Path. */
+ public FileSystem getFileSystem(Configuration conf) throws IOException {
+ return FileSystem.get(this.toUri(), conf);
+ }
+
+ /** True if the directory of this path is absolute. */
+ public boolean isAbsolute() {
+ int start = hasWindowsDrive(uri.getPath(), true) ? 3 : 0;
+ return uri.getPath().startsWith(SEPARATOR, start);
+ }
+
+ /** Returns the final component of this path. */
+ public String getName() {
+ String path = uri.getPath();
+ int slash = path.lastIndexOf(SEPARATOR);
+ return path.substring(slash + 1);
+ }
+
+ /** Returns the parent of a path or null if at root. */
+ public Path getParent() {
+ String path = uri.getPath();
+ int lastSlash = path.lastIndexOf('/');
+ int start = hasWindowsDrive(path, true) ? 3 : 0;
+ if ((path.length() == start) || // empty path
+ (lastSlash == start && path.length() == start + 1)) { // at root
+ return null;
+ }
+ String parent;
+ if (lastSlash == -1) {
+ parent = CUR_DIR;
+ } else {
+ int end = hasWindowsDrive(path, true) ? 3 : 0;
+ parent = path.substring(0, lastSlash == end ? end + 1 : lastSlash);
+ }
+ return new Path(uri.getScheme(), uri.getAuthority(), parent);
+ }
+
+ /** Adds a suffix to the final name in the path. */
+ public Path suffix(String suffix) {
+ return new Path(getParent(), getName() + suffix);
+ }
+
+ public String toString() {
+ // we can't use uri.toString(), which escapes everything, because we
+ // want
+ // illegal characters unescaped in the string, for glob processing, etc.
+ StringBuffer buffer = new StringBuffer();
+ if (uri.getScheme() != null) {
+ buffer.append(uri.getScheme());
+ buffer.append(":");
+ }
+ if (uri.getAuthority() != null) {
+ buffer.append("//");
+ buffer.append(uri.getAuthority());
+ }
+ if (uri.getPath() != null) {
+ String path = uri.getPath();
+ if (path.indexOf('/') == 0 && hasWindowsDrive(path, true) && // has
+ // windows
+ // drive
+ uri.getScheme() == null && // but no scheme
+ uri.getAuthority() == null) // or authority
+ path = path.substring(1); // remove slash before drive
+ buffer.append(path);
+ }
+ if (uri.getFragment() != null) {
+ buffer.append("#");
+ buffer.append(uri.getFragment());
+ }
+ return buffer.toString();
+ }
+
+ public boolean equals(Object o) {
+ if (!(o instanceof Path)) {
+ return false;
+ }
+ Path that = (Path) o;
+ return this.uri.equals(that.uri);
+ }
+
+ public int hashCode() {
+ return uri.hashCode();
+ }
+
+ public int compareTo(Object o) {
+ Path that = (Path) o;
+ return this.uri.compareTo(that.uri);
+ }
+
+ /** Return the number of elements in this path. */
+ public int depth() {
+ String path = uri.getPath();
+ int depth = 0;
+ int slash = path.length() == 1 && path.charAt(0) == '/' ? -1 : 0;
+ while (slash != -1) {
+ depth++;
+ slash = path.indexOf(SEPARATOR, slash + 1);
+ }
+ return depth;
+ }
+
+ /** Returns a qualified path object. */
+ public Path makeQualified(FileSystem fs) {
+ Path path = this;
+ if (!isAbsolute()) {
+ path = new Path(fs.getWorkingDirectory(), this);
+ }
+
+ URI pathUri = path.toUri();
+ URI fsUri = fs.getUri();
+
+ String scheme = pathUri.getScheme();
+ String authority = pathUri.getAuthority();
+ String fragment = pathUri.getFragment();
+ if (scheme != null && (authority != null || fsUri.getAuthority() == null))
+ return path;
+
+ if (scheme == null) {
+ scheme = fsUri.getScheme();
+ }
+
+ if (authority == null) {
+ authority = fsUri.getAuthority();
+ if (authority == null) {
+ authority = "";
+ }
+ }
+
+ URI newUri = null;
+ try {
+ newUri = new URI(scheme, authority, normalizePath(pathUri.getPath()), null, fragment);
+ } catch (URISyntaxException e) {
+ throw new IllegalArgumentException(e);
+ }
+ return new Path(newUri);
+ }
+
+ /** Returns a qualified path object. */
+ public Path makeQualified(URI defaultUri, Path workingDir) {
+ Path path = this;
+ if (!isAbsolute()) {
+ path = new Path(workingDir, this);
+ }
+
+ URI pathUri = path.toUri();
+
+ String scheme = pathUri.getScheme();
+ String authority = pathUri.getAuthority();
+ String fragment = pathUri.getFragment();
+
+ if (scheme != null && (authority != null || defaultUri.getAuthority() == null))
+ return path;
+
+ if (scheme == null) {
+ scheme = defaultUri.getScheme();
+ }
+
+ if (authority == null) {
+ authority = defaultUri.getAuthority();
+ if (authority == null) {
+ authority = "";
+ }
+ }
+
+ URI newUri = null;
+ try {
+ newUri = new URI(scheme, authority, normalizePath(pathUri.getPath()), null, fragment);
+ } catch (URISyntaxException e) {
+ throw new IllegalArgumentException(e);
+ }
+ return new Path(newUri);
+ }
+}
\ No newline at end of file
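A small sketch of the resolution rules implemented above; the HDFS URI is illustrative.

Path parent = new Path("hdfs://namenode:9000/user/pregelix");
Path child = new Path(parent, "webmap/part-0");   // hdfs://namenode:9000/user/pregelix/webmap/part-0
String name = child.getName();                    // "part-0"
Path up = child.getParent();                      // hdfs://namenode:9000/user/pregelix/webmap
int depth = child.depth();                        // 4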
diff --git a/pregelix-core/src/main/java/org/apache/hadoop/mapreduce/InputSplit.java b/pregelix-core/src/main/java/org/apache/hadoop/mapreduce/InputSplit.java
new file mode 100644
index 0000000..6643ab5
--- /dev/null
+++ b/pregelix-core/src/main/java/org/apache/hadoop/mapreduce/InputSplit.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce;
+
+import java.io.IOException;
+import java.io.Serializable;
+
+/**
+ * <code>InputSplit</code> represents the data to be processed by an individual
+ * {@link Mapper}.
+ *
+ * <p>
+ * Typically, it presents a byte-oriented view on the input and is the
+ * responsibility of {@link RecordReader} of the job to process this and present
+ * a record-oriented view.
+ *
+ * @see InputFormat
+ * @see RecordReader
+ */
+public abstract class InputSplit implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ /**
+ * Get the size of the split, so that the input splits can be sorted by
+ * size.
+ *
+ * @return the number of bytes in the split
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ public abstract long getLength() throws IOException, InterruptedException;
+
+ /**
+ * Get the list of nodes by name where the data for the split would be
+ * local. The locations do not need to be serialized.
+ *
+ * @return a new array of the node nodes.
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ public abstract String[] getLocations() throws IOException, InterruptedException;
+}
\ No newline at end of file
diff --git a/pregelix-core/src/main/resources/conf/cluster.properties b/pregelix-core/src/main/resources/conf/cluster.properties
new file mode 100644
index 0000000..74a98b0
--- /dev/null
+++ b/pregelix-core/src/main/resources/conf/cluster.properties
@@ -0,0 +1,37 @@
+#The CC port for Hyracks clients
+CC_CLIENTPORT=3099
+
+#The CC port for Hyracks cluster management
+CC_CLUSTERPORT=1099
+
+#The directory of hyracks binaries
+HYRACKS_HOME=~/workspace/hyracks_asterix_stabilization
+
+#The tmp directory for cc to install jars
+CCTMP_DIR=/tmp/t1
+
+#The tmp directory for nc to install jars
+NCTMP_DIR=/tmp/t2
+
+#The directory to put cc logs
+CCLOGS_DIR=$CCTMP_DIR/logs
+
+#The directory to put nc logs
+NCLOGS_DIR=$NCTMP_DIR/logs
+
+#Comma separated I/O directories for the spilling of external sort
+IO_DIRS="/tmp/t3,/tmp/t4"
+
+#The JAVA_HOME
+JAVA_HOME=$JAVA_HOME
+
+#The frame size of the internal dataflow engine
+FRAME_SIZE=65536
+
+#CC JAVA_OPTS
+CCJAVA_OPTS="-Xdebug -Xrunjdwp:transport=dt_socket,address=7001,server=y,suspend=n -Xmx3g -Djava.util.logging.config.file=logging.properties"
+# Yourkit option: -agentpath:/grid/0/dev/vborkar/tools/yjp-10.0.4/bin/linux-x86-64/libyjpagent.so=port=20001"
+
+#NC JAVA_OPTS
+NCJAVA_OPTS="-Xdebug -Xrunjdwp:transport=dt_socket,address=7002,server=y,suspend=n -Xmx3g -Djava.util.logging.config.file=logging.properties"
+
diff --git a/pregelix-core/src/main/resources/conf/master b/pregelix-core/src/main/resources/conf/master
new file mode 100644
index 0000000..2fbb50c
--- /dev/null
+++ b/pregelix-core/src/main/resources/conf/master
@@ -0,0 +1 @@
+localhost
diff --git a/pregelix-core/src/main/resources/conf/slaves b/pregelix-core/src/main/resources/conf/slaves
new file mode 100644
index 0000000..2fbb50c
--- /dev/null
+++ b/pregelix-core/src/main/resources/conf/slaves
@@ -0,0 +1 @@
+localhost
diff --git a/pregelix-core/src/main/resources/conf/stores.properties b/pregelix-core/src/main/resources/conf/stores.properties
new file mode 100644
index 0000000..d1a4e10
--- /dev/null
+++ b/pregelix-core/src/main/resources/conf/stores.properties
@@ -0,0 +1,2 @@
+#Comma separated directories for storing the partitioned graph on each machine
+store=/tmp/teststore1,/tmp/teststore2
\ No newline at end of file
diff --git a/pregelix-core/src/main/resources/hyracks-deployment.properties b/pregelix-core/src/main/resources/hyracks-deployment.properties
new file mode 100644
index 0000000..8c84699
--- /dev/null
+++ b/pregelix-core/src/main/resources/hyracks-deployment.properties
@@ -0,0 +1,17 @@
+#-------------------------------------------------------------------------------
+# /*
+# * Copyright 2009-2010 by The Regents of the University of California
+# * Licensed under the Apache License, Version 2.0 (the "License");
+# * you may not use this file except in compliance with the License.
+# * you may obtain a copy of the License from
+# *
+# * http://www.apache.org/licenses/LICENSE-2.0
+# *
+# * Unless required by applicable law or agreed to in writing, software
+# * distributed under the License is distributed on an "AS IS" BASIS,
+# * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# * See the License for the specific language governing permissions and
+# * limitations under the License.
+# */
+#-------------------------------------------------------------------------------
+nc.bootstrap.class=edu.uci.ics.pregelix.runtime.bootstrap.NCBootstrapImpl
diff --git a/pregelix-core/src/main/resources/scripts/pregelix b/pregelix-core/src/main/resources/scripts/pregelix
new file mode 100644
index 0000000..275175d
--- /dev/null
+++ b/pregelix-core/src/main/resources/scripts/pregelix
@@ -0,0 +1,115 @@
+#!/bin/sh
+# ----------------------------------------------------------------------------
+# Copyright 2001-2006 The Apache Software Foundation.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ----------------------------------------------------------------------------
+#
+# Copyright (c) 2001-2006 The Apache Software Foundation. All rights
+# reserved.
+
+
+# resolve links - $0 may be a softlink
+PRG="$0"
+
+while [ -h "$PRG" ]; do
+ ls=`ls -ld "$PRG"`
+ link=`expr "$ls" : '.*-> \(.*\)$'`
+ if expr "$link" : '/.*' > /dev/null; then
+ PRG="$link"
+ else
+ PRG=`dirname "$PRG"`/"$link"
+ fi
+done
+
+PRGDIR=`dirname "$PRG"`
+BASEDIR=`cd "$PRGDIR/.." >/dev/null; pwd`
+
+
+
+# OS specific support. $var _must_ be set to either true or false.
+cygwin=false;
+darwin=false;
+case "`uname`" in
+ CYGWIN*) cygwin=true ;;
+ Darwin*) darwin=true
+ if [ -z "$JAVA_VERSION" ] ; then
+ JAVA_VERSION="CurrentJDK"
+ else
+ echo "Using Java version: $JAVA_VERSION"
+ fi
+ if [ -z "$JAVA_HOME" ] ; then
+ JAVA_HOME=/System/Library/Frameworks/JavaVM.framework/Versions/${JAVA_VERSION}/Home
+ fi
+ ;;
+esac
+
+if [ -z "$JAVA_HOME" ] ; then
+ if [ -r /etc/gentoo-release ] ; then
+ JAVA_HOME=`java-config --jre-home`
+ fi
+fi
+
+# For Cygwin, ensure paths are in UNIX format before anything is touched
+if $cygwin ; then
+ [ -n "$JAVA_HOME" ] && JAVA_HOME=`cygpath --unix "$JAVA_HOME"`
+ [ -n "$CLASSPATH" ] && CLASSPATH=`cygpath --path --unix "$CLASSPATH"`
+fi
+
+# If a specific java binary isn't specified search for the standard 'java' binary
+if [ -z "$JAVACMD" ] ; then
+ if [ -n "$JAVA_HOME" ] ; then
+ if [ -x "$JAVA_HOME/jre/sh/java" ] ; then
+ # IBM's JDK on AIX uses strange locations for the executables
+ JAVACMD="$JAVA_HOME/jre/sh/java"
+ else
+ JAVACMD="$JAVA_HOME/bin/java"
+ fi
+ else
+ JAVACMD=`which java`
+ fi
+fi
+
+if [ ! -x "$JAVACMD" ] ; then
+ echo "Error: JAVA_HOME is not defined correctly." 1>&2
+ echo " We cannot execute $JAVACMD" 1>&2
+ exit 1
+fi
+
+if [ -z "$REPO" ]
+then
+ REPO="$BASEDIR"/lib
+fi
+
+cp $BASEDIR"/../a-hadoop-patch.jar "$REPO"/
+
+CLASSPATH=$CLASSPATH_PREFIX:"$BASEDIR"/etc:"$REPO"/a-hadoop-patch.jar:"$REPO"/pregelix-api-0.0.1-SNAPSHOT.jar:"$REPO"/hyracks-dataflow-common-0.2.2-SNAPSHOT.jar:"$REPO"/hyracks-api-0.2.2-SNAPSHOT.jar:"$REPO"/json-20090211.jar:"$REPO"/httpclient-4.1-alpha2.jar:"$REPO"/httpcore-4.1-beta1.jar:"$REPO"/commons-logging-1.1.1.jar:"$REPO"/commons-codec-1.3.jar:"$REPO"/args4j-2.0.12.jar:"$REPO"/hyracks-ipc-0.2.2-SNAPSHOT.jar:"$REPO"/commons-lang3-3.1.jar:"$REPO"/hyracks-data-std-0.2.2-SNAPSHOT.jar:"$REPO"/hadoop-core-0.20.2.jar:"$REPO"/commons-cli-1.2.jar:"$REPO"/xmlenc-0.52.jar:"$REPO"/commons-httpclient-3.0.1.jar:"$REPO"/commons-net-1.4.1.jar:"$REPO"/oro-2.0.8.jar:"$REPO"/jetty-6.1.14.jar:"$REPO"/jetty-util-6.1.14.jar:"$REPO"/servlet-api-2.5-6.1.14.jar:"$REPO"/jasper-runtime-5.5.12.jar:"$REPO"/jasper-compiler-5.5.12.jar:"$REPO"/jsp-api-2.1-6.1.14.jar:"$REPO"/jsp-2.1-6.1.14.jar:"$REPO"/core-3.1.1.jar:"$REPO"/ant-1.6.5.jar:"$REPO"/commons-el-1.0.jar:"$REPO"/jets3t-0.7.1.jar:"$REPO"/kfs-0.3.jar:"$REPO"/hsqldb-1.8.0.10.jar:"$REPO"/pregelix-dataflow-std-0.0.1-SNAPSHOT.jar:"$REPO"/pregelix-dataflow-std-base-0.0.1-SNAPSHOT.jar:"$REPO"/hyracks-dataflow-std-0.2.2-SNAPSHOT.jar:"$REPO"/hyracks-dataflow-hadoop-0.2.2-SNAPSHOT.jar:"$REPO"/dcache-client-0.0.1.jar:"$REPO"/jetty-client-8.0.0.M0.jar:"$REPO"/jetty-http-8.0.0.RC0.jar:"$REPO"/jetty-io-8.0.0.RC0.jar:"$REPO"/jetty-util-8.0.0.RC0.jar:"$REPO"/hyracks-storage-am-common-0.2.2-SNAPSHOT.jar:"$REPO"/hyracks-storage-common-0.2.2-SNAPSHOT.jar:"$REPO"/hyracks-storage-am-btree-0.2.2-SNAPSHOT.jar:"$REPO"/btreehelper-0.2.2-SNAPSHOT.jar:"$REPO"/hyracks-control-cc-0.2.2-SNAPSHOT.jar:"$REPO"/hyracks-control-common-0.2.2-SNAPSHOT.jar:"$REPO"/commons-io-1.3.1.jar:"$REPO"/jetty-server-8.0.0.RC0.jar:"$REPO"/servlet-api-3.0.20100224.jar:"$REPO"/jetty-continuation-8.0.0.RC0.jar:"$REPO"/jetty-webapp-8.0.0.RC0.jar:"$REPO"/jetty-xml-8.0.0.RC0.jar:"$REPO"/jetty-servlet-8.0.0.RC0.jar:"$REPO"/jetty-security-8.0.0.RC0.jar:"$REPO"/wicket-core-1.5.2.jar:"$REPO"/wicket-util-1.5.2.jar:"$REPO"/slf4j-api-1.6.1.jar:"$REPO"/wicket-request-1.5.2.jar:"$REPO"/slf4j-jcl-1.6.3.jar:"$REPO"/hyracks-control-nc-0.2.2-SNAPSHOT.jar:"$REPO"/hyracks-net-0.2.2-SNAPSHOT.jar:"$REPO"/hyracks-hadoop-compat-0.2.2-SNAPSHOT.jar:"$REPO"/pregelix-dataflow-0.0.1-SNAPSHOT.jar:"$REPO"/pregelix-runtime-0.0.1-SNAPSHOT.jar:"$REPO"/hadoop-test-0.20.2.jar:"$REPO"/ftplet-api-1.0.0.jar:"$REPO"/mina-core-2.0.0-M5.jar:"$REPO"/ftpserver-core-1.0.0.jar:"$REPO"/ftpserver-deprecated-1.0.0-M2.jar:"$REPO"/javax.servlet-api-3.0.1.jar:"$REPO"/pregelix-core-0.0.1-SNAPSHOT.jar
+
+# For Cygwin, switch paths to Windows format before running java
+if $cygwin; then
+ [ -n "$CLASSPATH" ] && CLASSPATH=`cygpath --path --windows "$CLASSPATH"`
+ [ -n "$JAVA_HOME" ] && JAVA_HOME=`cygpath --path --windows "$JAVA_HOME"`
+ [ -n "$HOME" ] && HOME=`cygpath --path --windows "$HOME"`
+ [ -n "$BASEDIR" ] && BASEDIR=`cygpath --path --windows "$BASEDIR"`
+ [ -n "$REPO" ] && REPO=`cygpath --path --windows "$REPO"`
+fi
+
+exec "$JAVACMD" $JAVA_OPTS \
+ -classpath "$CLASSPATH" \
+ -Dapp.name="pregelix" \
+ -Dapp.pid="$$" \
+ -Dapp.repo="$REPO" \
+ -Dapp.home="$BASEDIR" \
+ -Dbasedir="$BASEDIR" \
+ org.apache.hadoop.util.RunJar \
+ "$@"
\ No newline at end of file
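Because the script hands its arguments straight to org.apache.hadoop.util.RunJar, a typical invocation passes the job jar, the main class (unless the jar's manifest names one), and the job's own arguments. The jar name, class name, and options below are placeholders, not part of the patch:

    # hypothetical job jar, main class, and job options
    bin/pregelix myjob.jar com.example.MyPregelixJob -inputpaths /graph/input -outputpath /graph/output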
diff --git a/pregelix-core/src/main/resources/scripts/pregelix.bat b/pregelix-core/src/main/resources/scripts/pregelix.bat
new file mode 100644
index 0000000..536e3c8
--- /dev/null
+++ b/pregelix-core/src/main/resources/scripts/pregelix.bat
@@ -0,0 +1,110 @@
+@REM ----------------------------------------------------------------------------
+@REM Copyright 2001-2006 The Apache Software Foundation.
+@REM
+@REM Licensed under the Apache License, Version 2.0 (the "License");
+@REM you may not use this file except in compliance with the License.
+@REM You may obtain a copy of the License at
+@REM
+@REM http://www.apache.org/licenses/LICENSE-2.0
+@REM
+@REM Unless required by applicable law or agreed to in writing, software
+@REM distributed under the License is distributed on an "AS IS" BASIS,
+@REM WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+@REM See the License for the specific language governing permissions and
+@REM limitations under the License.
+@REM ----------------------------------------------------------------------------
+@REM
+@REM Copyright (c) 2001-2006 The Apache Software Foundation. All rights
+@REM reserved.
+
+@echo off
+
+set ERROR_CODE=0
+
+:init
+@REM Decide how to startup depending on the version of windows
+
+@REM -- Win98ME
+if NOT "%OS%"=="Windows_NT" goto Win9xArg
+
+@REM set local scope for the variables with windows NT shell
+if "%OS%"=="Windows_NT" @setlocal
+
+@REM -- 4NT shell
+if "%eval[2+2]" == "4" goto 4NTArgs
+
+@REM -- Regular WinNT shell
+set CMD_LINE_ARGS=%*
+goto WinNTGetScriptDir
+
+@REM The 4NT Shell from jp software
+:4NTArgs
+set CMD_LINE_ARGS=%$
+goto WinNTGetScriptDir
+
+:Win9xArg
+@REM Slurp the command line arguments. This loop allows for an unlimited number
+@REM of arguments (up to the command line limit, anyway).
+set CMD_LINE_ARGS=
+:Win9xApp
+if %1a==a goto Win9xGetScriptDir
+set CMD_LINE_ARGS=%CMD_LINE_ARGS% %1
+shift
+goto Win9xApp
+
+:Win9xGetScriptDir
+set SAVEDIR=%CD%
+%0\
+cd %0\..\..
+set BASEDIR=%CD%
+cd %SAVEDIR%
+set SAVE_DIR=
+goto repoSetup
+
+:WinNTGetScriptDir
+set BASEDIR=%~dp0\..
+
+:repoSetup
+
+
+if "%JAVACMD%"=="" set JAVACMD=java
+
+if "%REPO%"=="" set REPO=%BASEDIR%\lib
+
+cp $BASEDIR"\..\a-hadoop-patch.jar "$REPO"\
+
+set CLASSPATH="%BASEDIR%"\etc;"%REPO%"\a-hadoop-patch.jar;"%REPO%"\pregelix-api-0.0.1-SNAPSHOT.jar;"%REPO%"\hyracks-dataflow-common-0.2.2-SNAPSHOT.jar;"%REPO%"\hyracks-api-0.2.2-SNAPSHOT.jar;"%REPO%"\json-20090211.jar;"%REPO%"\httpclient-4.1-alpha2.jar;"%REPO%"\httpcore-4.1-beta1.jar;"%REPO%"\commons-logging-1.1.1.jar;"%REPO%"\commons-codec-1.3.jar;"%REPO%"\args4j-2.0.12.jar;"%REPO%"\hyracks-ipc-0.2.2-SNAPSHOT.jar;"%REPO%"\commons-lang3-3.1.jar;"%REPO%"\hyracks-data-std-0.2.2-SNAPSHOT.jar;"%REPO%"\hadoop-core-0.20.2.jar;"%REPO%"\commons-cli-1.2.jar;"%REPO%"\xmlenc-0.52.jar;"%REPO%"\commons-httpclient-3.0.1.jar;"%REPO%"\commons-net-1.4.1.jar;"%REPO%"\oro-2.0.8.jar;"%REPO%"\jetty-6.1.14.jar;"%REPO%"\jetty-util-6.1.14.jar;"%REPO%"\servlet-api-2.5-6.1.14.jar;"%REPO%"\jasper-runtime-5.5.12.jar;"%REPO%"\jasper-compiler-5.5.12.jar;"%REPO%"\jsp-api-2.1-6.1.14.jar;"%REPO%"\jsp-2.1-6.1.14.jar;"%REPO%"\core-3.1.1.jar;"%REPO%"\ant-1.6.5.jar;"%REPO%"\commons-el-1.0.jar;"%REPO%"\jets3t-0.7.1.jar;"%REPO%"\kfs-0.3.jar;"%REPO%"\hsqldb-1.8.0.10.jar;"%REPO%"\pregelix-dataflow-std-0.0.1-SNAPSHOT.jar;"%REPO%"\pregelix-dataflow-std-base-0.0.1-SNAPSHOT.jar;"%REPO%"\hyracks-dataflow-std-0.2.2-SNAPSHOT.jar;"%REPO%"\hyracks-dataflow-hadoop-0.2.2-SNAPSHOT.jar;"%REPO%"\dcache-client-0.0.1.jar;"%REPO%"\jetty-client-8.0.0.M0.jar;"%REPO%"\jetty-http-8.0.0.RC0.jar;"%REPO%"\jetty-io-8.0.0.RC0.jar;"%REPO%"\jetty-util-8.0.0.RC0.jar;"%REPO%"\hyracks-storage-am-common-0.2.2-SNAPSHOT.jar;"%REPO%"\hyracks-storage-common-0.2.2-SNAPSHOT.jar;"%REPO%"\hyracks-storage-am-btree-0.2.2-SNAPSHOT.jar;"%REPO%"\btreehelper-0.2.2-SNAPSHOT.jar;"%REPO%"\hyracks-control-cc-0.2.2-SNAPSHOT.jar;"%REPO%"\hyracks-control-common-0.2.2-SNAPSHOT.jar;"%REPO%"\commons-io-1.3.1.jar;"%REPO%"\jetty-server-8.0.0.RC0.jar;"%REPO%"\servlet-api-3.0.20100224.jar;"%REPO%"\jetty-continuation-8.0.0.RC0.jar;"%REPO%"\jetty-webapp-8.0.0.RC0.jar;"%REPO%"\jetty-xml-8.0.0.RC0.jar;"%REPO%"\jetty-servlet-8.0.0.RC0.jar;"%REPO%"\jetty-security-8.0.0.RC0.jar;"%REPO%"\wicket-core-1.5.2.jar;"%REPO%"\wicket-util-1.5.2.jar;"%REPO%"\slf4j-api-1.6.1.jar;"%REPO%"\wicket-request-1.5.2.jar;"%REPO%"\slf4j-jcl-1.6.3.jar;"%REPO%"\hyracks-control-nc-0.2.2-SNAPSHOT.jar;"%REPO%"\hyracks-net-0.2.2-SNAPSHOT.jar;"%REPO%"\hyracks-hadoop-compat-0.2.2-SNAPSHOT.jar;"%REPO%"\pregelix-dataflow-0.0.1-SNAPSHOT.jar;"%REPO%"\pregelix-runtime-0.0.1-SNAPSHOT.jar;"%REPO%"\hadoop-test-0.20.2.jar;"%REPO%"\ftplet-api-1.0.0.jar;"%REPO%"\mina-core-2.0.0-M5.jar;"%REPO%"\ftpserver-core-1.0.0.jar;"%REPO%"\ftpserver-deprecated-1.0.0-M2.jar;"%REPO%"\javax.servlet-api-3.0.1.jar;"%REPO%"\pregelix-core-0.0.1-SNAPSHOT.jar
+goto endInit
+
+@REM Reaching here means variables are defined and arguments have been captured
+:endInit
+
+%JAVACMD% %JAVA_OPTS% -classpath %CLASSPATH_PREFIX%;%CLASSPATH% -Dapp.name="pregelix" -Dapp.repo="%REPO%" -Dapp.home="%BASEDIR%" -Dbasedir="%BASEDIR%" org.apache.hadoop.util.RunJar %CMD_LINE_ARGS%
+if ERRORLEVEL 1 goto error
+goto end
+
+:error
+if "%OS%"=="Windows_NT" @endlocal
+set ERROR_CODE=%ERRORLEVEL%
+
+:end
+@REM set local scope for the variables with windows NT shell
+if "%OS%"=="Windows_NT" goto endNT
+
+@REM For old DOS remove the set variables from ENV - we assume they were not set
+@REM before we started - at least we don't leave any baggage around
+set CMD_LINE_ARGS=
+goto postExec
+
+:endNT
+@REM If error code is set to 1 then the endlocal was done already in :error.
+if %ERROR_CODE% EQU 0 @endlocal
+
+
+:postExec
+
+if "%FORCE_EXIT_ON_ERROR%" == "on" (
+ if %ERROR_CODE% NEQ 0 exit %ERROR_CODE%
+)
+
+exit /B %ERROR_CODE%
\ No newline at end of file
diff --git a/pregelix-core/src/main/resources/scripts/startAllNCs.sh b/pregelix-core/src/main/resources/scripts/startAllNCs.sh
new file mode 100644
index 0000000..64b7fcf
--- /dev/null
+++ b/pregelix-core/src/main/resources/scripts/startAllNCs.sh
@@ -0,0 +1,6 @@
+PREGELIX_PATH=`pwd`
+
+for i in `cat conf/slaves`
+do
+ ssh $i "cd ${PREGELIX_PATH}; echo `pwd`; bin/startnc.sh"
+done
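The loop above assumes the Pregelix distribution already sits at the same path on every slave. A minimal, assumed helper (not in the patch) that pushes it there with rsync before starting the NCs could look like:

    # copy the local distribution to each slave at the same path (assumed helper)
    PREGELIX_PATH=`pwd`
    for i in `cat conf/slaves`
    do
        rsync -a "${PREGELIX_PATH}/" "${i}:${PREGELIX_PATH}/"
    done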
diff --git a/pregelix-core/src/main/resources/scripts/startCluster.sh b/pregelix-core/src/main/resources/scripts/startCluster.sh
new file mode 100644
index 0000000..bc1e7c4
--- /dev/null
+++ b/pregelix-core/src/main/resources/scripts/startCluster.sh
@@ -0,0 +1,3 @@
+bin/startcc.sh
+sleep 10
+bin/startAllNCs.sh
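The fixed 10-second sleep gives the CC time to come up before the NCs try to register. A hedged alternative is to poll the CC client port instead; CC_CLIENTPORT comes from conf/cluster.properties and is the same variable startcc.sh passes to hyrackscc, while the use of nc -z is an assumption about available tooling:

    . conf/cluster.properties
    CCHOST_NAME=`cat conf/master`
    CCHOST=`dig +short $CCHOST_NAME`
    bin/startcc.sh
    # wait until the CC accepts client connections, then start the NCs
    until nc -z $CCHOST $CC_CLIENTPORT; do sleep 1; done
    bin/startAllNCs.sh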
diff --git a/pregelix-core/src/main/resources/scripts/startcc.sh b/pregelix-core/src/main/resources/scripts/startcc.sh
new file mode 100644
index 0000000..aac9e60
--- /dev/null
+++ b/pregelix-core/src/main/resources/scripts/startcc.sh
@@ -0,0 +1,27 @@
+#!/bin/bash
+
+#Import cluster properties
+. conf/cluster.properties
+
+#Get the IP address of the cc
+CCHOST_NAME=`cat conf/master`
+CCHOST=`dig +short $CCHOST_NAME`
+
+#Remove the temp dir
+rm -rf $CCTMP_DIR
+mkdir $CCTMP_DIR
+
+#Remove the logs dir
+rm -rf $CCLOGS_DIR
+mkdir $CCLOGS_DIR
+
+#Export JAVA_HOME and JAVA_OPTS
+export JAVA_HOME=$JAVA_HOME
+export JAVA_OPTS=$CCJAVA_OPTS
+
+echo $JAVA_HOME
+echo $JAVA_OPTS
+
+#Launch hyracks cc script
+chmod -R 755 $HYRACKS_HOME
+$HYRACKS_HOME/hyracks-server/target/appassembler/bin/hyrackscc -client-net-ip-address $CCHOST -cluster-net-ip-address $CCHOST -client-net-port $CC_CLIENTPORT -cluster-net-port $CC_CLUSTERPORT -max-heartbeat-lapse-periods 999999 -default-max-job-attempts 0 &> $CCLOGS_DIR/cc.log &
diff --git a/pregelix-core/src/main/resources/scripts/startnc.sh b/pregelix-core/src/main/resources/scripts/startnc.sh
new file mode 100644
index 0000000..1da6248
--- /dev/null
+++ b/pregelix-core/src/main/resources/scripts/startnc.sh
@@ -0,0 +1,47 @@
+hostname
+
+#Get the IP address of the cc
+CCHOST_NAME=`cat conf/master`
+CCHOST=`dig +short $CCHOST_NAME`
+
+#Import cluster properties
+. conf/cluster.properties
+
+#Clean up temp dir
+
+rm -rf $NCTMP_DIR
+mkdir $NCTMP_DIR
+
+#Clean up log dir
+rm -rf $NCLOGS_DIR
+mkdir $NCLOGS_DIR
+
+
+#Clean up I/O working dir
+io_dirs=$(echo $IO_DIRS | tr "," "\n")
+for io_dir in $io_dirs
+do
+ rm -rf $io_dir
+ mkdir $io_dir
+done
+
+#Set JAVA_HOME
+export JAVA_HOME=$JAVA_HOME
+
+#Get IP Address
+IPADDR=`/sbin/ifconfig eth0 | grep "inet addr" | awk '{print $2}' | cut -f 2 -d ':'`
+#HOSTNAME=`hostname`
+#echo $HOSTNAME
+#IPADDR=`dig +short ${HOSTNAME}`
+NODEID=`hostname | cut -d '.' -f 1`
+
+#Set JAVA_OPTS
+export JAVA_OPTS=$NCJAVA_OPTS
+
+#Enter the temp dir
+cd $NCTMP_DIR
+
+#Launch hyracks nc
+chmod -R 755 $HYRACKS_HOME
+echo $HYRACKS_HOME/hyracks-server/target/appassembler/bin/hyracksnc -cc-host $CCHOST -cc-port $CC_CLUSTERPORT -cluster-net-ip-address $IPADDR -data-ip-address $IPADDR -node-id $NODEID -iodevices "${IO_DIRS}" -frame-size $FRAME_SIZE
+$HYRACKS_HOME/hyracks-server/target/appassembler/bin/hyracksnc -cc-host $CCHOST -cc-port $CC_CLUSTERPORT -cluster-net-ip-address $IPADDR -data-ip-address $IPADDR -node-id $NODEID -iodevices "${IO_DIRS}" -frame-size $FRAME_SIZE &> $NCLOGS_DIR/$NODEID.log &
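Each NC redirects its output to $NCLOGS_DIR/<node-id>.log, so a quick sanity check on a slave (an assumed step, not part of the patch) is to confirm the node registered with the CC by inspecting that file:

    . conf/cluster.properties
    NODEID=`hostname | cut -d '.' -f 1`
    # show the tail of this node's NC log to check for registration errors
    tail -n 20 $NCLOGS_DIR/$NODEID.log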