Cross-merge fullstack_release_candidate into trunk
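
Summary of the merged changes:

* pregelix-core/pom.xml: pin plugin versions (maven-jar-plugin 2.4,
  appassembler-maven-plugin 1.3, maven-clean-plugin 2.5), collapse the
  three maven-jar-plugin declarations into a single one, drop the
  a-hadoop patch jar plus the explicit hadoop-core/hadoop-test
  dependencies, compile for Java 1.7, and move hyracks-integration-tests
  to 0.2.3-SNAPSHOT.
* Driver: add core-site.xml/mapred-site.xml/hdfs-site.xml only when they
  are actually on the classpath, and destroy the deployed application if
  a job fails.
* JobGen and its subclasses: make the frame size a per-job setting
  (BspUtils.getFrameSize), add insert/delete output arms to the compute
  functions, wired to TreeIndexInsertUpdateDeleteOperatorDescriptor for
  vertex mutations, and drive vertex file scans with the HDFS read
  schedule from ClusterConfig.

A minimal sketch of the per-job frame-size override, assuming a
PregelixJob(String) constructor and a configuration key read by
BspUtils.getFrameSize (the key name below is hypothetical; per JobGen,
the method returns a non-positive value when the key is unset):

    PregelixJob job = new PregelixJob("example-job");
    // hypothetical key name; when absent, JobGen keeps the
    // cluster-wide frame size from ClusterConfig.getFrameSize()
    job.getConfiguration().setInt("pregelix.framesize", 64 * 1024);
    // JobGen then recomputes maxFrameNumber = (32 MB) / frameSize
    // and clamps it to at least 1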
git-svn-id: https://hyracks.googlecode.com/svn/trunk@3208 123451ca-8445-de46-9d55-352943316053
diff --git a/fullstack/pregelix/pregelix-core/pom.xml b/fullstack/pregelix/pregelix-core/pom.xml
index 17ea6c3..5238068 100644
--- a/fullstack/pregelix/pregelix-core/pom.xml
+++ b/fullstack/pregelix/pregelix-core/pom.xml
@@ -1,4 +1,5 @@
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<artifactId>pregelix-core</artifactId>
<packaging>jar</packaging>
@@ -19,6 +20,7 @@
<plugins>
<plugin>
<artifactId>maven-jar-plugin</artifactId>
+ <version>2.4</version>
<executions>
<execution>
<id>balancer</id>
@@ -38,11 +40,6 @@
</includes>
</configuration>
</execution>
- </executions>
- </plugin>
- <plugin>
- <artifactId>maven-jar-plugin</artifactId>
- <executions>
<execution>
<id>generator</id>
<goals>
@@ -64,36 +61,19 @@
</executions>
</plugin>
<plugin>
- <artifactId>maven-jar-plugin</artifactId>
- <executions>
- <execution>
- <id>patch</id>
- <goals>
- <goal>jar</goal>
- </goals>
- <phase>package</phase>
- <configuration>
- <classifier>patch</classifier>
- <finalName>a-hadoop</finalName>
- <includes>
- <include>**/org/apache/**</include>
- </includes>
- </configuration>
- </execution>
- </executions>
- </plugin>
- <plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>2.0.2</version>
<configuration>
- <source>1.6</source>
- <target>1.6</target>
+ <source>1.7</source>
+ <target>1.7</target>
+ <fork>true</fork>
</configuration>
</plugin>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>appassembler-maven-plugin</artifactId>
+ <version>1.3</version>
<executions>
<execution>
<configuration>
@@ -166,25 +146,6 @@
</resources>
</configuration>
</execution>
- <execution>
- <id>copy-hadoop-patch</id>
- <!-- here the phase you need -->
- <phase>package</phase>
- <goals>
- <goal>copy-resources</goal>
- </goals>
- <configuration>
- <outputDirectory>target/appassembler/lib</outputDirectory>
- <resources>
- <resource>
- <directory>target</directory>
- <includes>
- <include>a-hadoop-patch.jar</include>
- </includes>
- </resource>
- </resources>
- </configuration>
- </execution>
</executions>
</plugin>
<plugin>
@@ -193,7 +154,8 @@
<version>2.7.2</version>
<configuration>
<forkMode>pertest</forkMode>
- <argLine>-enableassertions -Xmx512m -XX:MaxPermSize=300m -Dfile.encoding=UTF-8
+ <argLine>-enableassertions -Xmx512m -XX:MaxPermSize=300m
+ -Dfile.encoding=UTF-8
-Djava.util.logging.config.file=src/test/resources/logging.properties</argLine>
<includes>
<include>**/*TestSuite.java</include>
@@ -203,6 +165,7 @@
</plugin>
<plugin>
<artifactId>maven-clean-plugin</artifactId>
+ <version>2.5</version>
<configuration>
<filesets>
<fileset>
@@ -282,11 +245,6 @@
<groupId>edu.uci.ics.hyracks</groupId>
<artifactId>hyracks-data-std</artifactId>
<version>0.2.3-SNAPSHOT</version>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-core</artifactId>
- <version>0.20.2</version>
<type>jar</type>
<scope>compile</scope>
</dependency>
@@ -319,13 +277,6 @@
<scope>compile</scope>
</dependency>
<dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-test</artifactId>
- <version>0.20.2</version>
- <type>jar</type>
- <scope>compile</scope>
- </dependency>
- <dependency>
<groupId>com.kenai.nbpwr</groupId>
<artifactId>org-apache-commons-io</artifactId>
<version>1.3.1-201002241208</version>
@@ -335,7 +286,7 @@
<dependency>
<groupId>edu.uci.ics.hyracks.examples</groupId>
<artifactId>hyracks-integration-tests</artifactId>
- <version>0.2.1</version>
+ <version>0.2.3-SNAPSHOT</version>
<scope>test</scope>
</dependency>
<dependency>
diff --git a/fullstack/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java b/fullstack/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java
index 1b6f195..3a4c41b 100644
--- a/fullstack/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java
+++ b/fullstack/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java
@@ -72,22 +72,29 @@
public void runJob(PregelixJob job, Plan planChoice, String ipAddress, int port, boolean profiling)
throws HyracksException {
applicationName = exampleClass.getSimpleName() + UUID.randomUUID();
- /** add hadoop configurations */
- URL hadoopCore = job.getClass().getClassLoader().getResource("core-site.xml");
- job.getConfiguration().addResource(hadoopCore);
- URL hadoopMapRed = job.getClass().getClassLoader().getResource("mapred-site.xml");
- job.getConfiguration().addResource(hadoopMapRed);
- URL hadoopHdfs = job.getClass().getClassLoader().getResource("hdfs-site.xml");
- job.getConfiguration().addResource(hadoopHdfs);
- ClusterConfig.loadClusterConfig(ipAddress, port);
-
- LOG.info("job started");
- long start = System.currentTimeMillis();
- long end = start;
- long time = 0;
-
- this.profiling = profiling;
try {
+            /** add Hadoop configurations */
+ URL hadoopCore = job.getClass().getClassLoader().getResource("core-site.xml");
+ if (hadoopCore != null) {
+ job.getConfiguration().addResource(hadoopCore);
+ }
+ URL hadoopMapRed = job.getClass().getClassLoader().getResource("mapred-site.xml");
+ if (hadoopMapRed != null) {
+ job.getConfiguration().addResource(hadoopMapRed);
+ }
+ URL hadoopHdfs = job.getClass().getClassLoader().getResource("hdfs-site.xml");
+ if (hadoopHdfs != null) {
+ job.getConfiguration().addResource(hadoopHdfs);
+ }
+ ClusterConfig.loadClusterConfig(ipAddress, port);
+
+ LOG.info("job started");
+ long start = System.currentTimeMillis();
+ long end = start;
+ long time = 0;
+
+ this.profiling = profiling;
+
switch (planChoice) {
case INNER_JOIN:
jobGen = new JobGenInnerJoin(job);
@@ -146,6 +153,16 @@
LOG.info("result writing finished " + time + "ms");
LOG.info("job finished");
} catch (Exception e) {
+ try {
+ /**
+                 * destroy the application if any exception occurs
+ */
+ if (hcc != null) {
+ destroyApplication(applicationName);
+ }
+ } catch (Exception e2) {
+ throw new HyracksException(e2);
+ }
throw new HyracksException(e);
}
}
@@ -224,8 +241,8 @@
LOG.info("jar deployment finished " + (end - start) + "ms");
}
- public void destroyApplication(String jarFile) throws Exception {
- hcc.destroyApplication(applicationName);
+ public void destroyApplication(String appName) throws Exception {
+ hcc.destroyApplication(appName);
}
}
diff --git a/fullstack/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java b/fullstack/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
index de29dbc..0b1be61 100644
--- a/fullstack/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
+++ b/fullstack/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
@@ -76,8 +76,8 @@
import edu.uci.ics.pregelix.core.util.DatatypeHelper;
import edu.uci.ics.pregelix.dataflow.HDFSFileWriteOperatorDescriptor;
import edu.uci.ics.pregelix.dataflow.VertexFileScanOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.VertexWriteOperatorDescriptor;
import edu.uci.ics.pregelix.dataflow.base.IConfigurationFactory;
-import edu.uci.ics.pregelix.dataflow.std.FileWriteOperatorDescriptor;
import edu.uci.ics.pregelix.dataflow.std.base.IRecordDescriptorFactory;
import edu.uci.ics.pregelix.dataflow.std.base.IRuntimeHookFactory;
import edu.uci.ics.pregelix.runtime.bootstrap.StorageManagerInterface;
@@ -89,8 +89,6 @@
private static final Logger LOGGER = Logger.getLogger(JobGen.class.getName());
protected static final int MB = 1048576;
protected static final float DEFAULT_BTREE_FILL_FACTOR = 1.00f;
- protected static final int frameSize = ClusterConfig.getFrameSize();
- protected static final int maxFrameSize = (int) (((long) 32 * MB) / frameSize);
protected static final int tableSize = 10485767;
protected static final String PRIMARY_INDEX = "primary";
protected final Configuration conf;
@@ -98,6 +96,8 @@
protected IIndexRegistryProvider<IIndex> treeRegistryProvider = TreeIndexRegistryProvider.INSTANCE;
protected IStorageManagerInterface storageManagerInterface = StorageManagerInterface.INSTANCE;
protected String jobId = new UUID(System.currentTimeMillis(), System.nanoTime()).toString();
+ protected int frameSize = ClusterConfig.getFrameSize();
+ protected int maxFrameNumber = (int) (((long) 32 * MB) / frameSize);
protected static final String SECONDARY_INDEX_ODD = "secondary1";
protected static final String SECONDARY_INDEX_EVEN = "secondary2";
@@ -107,6 +107,17 @@
this.giraphJob = job;
this.initJobConfiguration();
job.setJobId(jobId);
+
+        // set the frame size to the user-specified value, if the user
+        // provided one.
+ int specifiedFrameSize = BspUtils.getFrameSize(job.getConfiguration());
+ if (specifiedFrameSize > 0) {
+ frameSize = specifiedFrameSize;
+ maxFrameNumber = (int) (((long) 32 * MB) / frameSize);
+ }
+ if (maxFrameNumber <= 0) {
+ maxFrameNumber = 1;
+ }
}
@SuppressWarnings({ "rawtypes", "unchecked" })
@@ -189,9 +200,10 @@
RecordDescriptor recordDescriptor = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
vertexIdClass.getName(), vertexClass.getName());
IConfigurationFactory confFactory = new ConfigurationFactory(conf);
+ String[] readSchedule = ClusterConfig.getHdfsScheduler().getLocationConstraints(splits);
VertexFileScanOperatorDescriptor scanner = new VertexFileScanOperatorDescriptor(spec, recordDescriptor, splits,
- confFactory);
- ClusterConfig.setLocationConstraint(spec, scanner, splits);
+ readSchedule, confFactory);
+ ClusterConfig.setLocationConstraint(spec, scanner);
/**
* construct sort operator
@@ -203,7 +215,7 @@
IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
comparatorFactories[0] = new WritableComparingBinaryComparatorFactory(WritableComparator.get(vertexIdClass)
.getClass());
- ExternalSortOperatorDescriptor sorter = new ExternalSortOperatorDescriptor(spec, maxFrameSize, sortFields,
+ ExternalSortOperatorDescriptor sorter = new ExternalSortOperatorDescriptor(spec, maxFrameNumber, sortFields,
nkmFactory, comparatorFactories, recordDescriptor);
ClusterConfig.setLocationConstraint(spec, sorter);
@@ -264,9 +276,10 @@
RecordDescriptor recordDescriptor = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
vertexIdClass.getName(), vertexClass.getName());
IConfigurationFactory confFactory = new ConfigurationFactory(conf);
+ String[] readSchedule = ClusterConfig.getHdfsScheduler().getLocationConstraints(splits);
VertexFileScanOperatorDescriptor scanner = new VertexFileScanOperatorDescriptor(spec, recordDescriptor, splits,
- confFactory);
- PartitionConstraintHelper.addPartitionCountConstraint(spec, scanner, splits.size());
+ readSchedule, confFactory);
+ ClusterConfig.setLocationConstraint(spec, scanner);
/**
* construct sort operator
@@ -280,7 +293,7 @@
.getClass());
ExternalSortOperatorDescriptor sorter = new ExternalSortOperatorDescriptor(spec, maxFrameLimit, sortFields,
nkmFactory, comparatorFactories, recordDescriptor);
- PartitionConstraintHelper.addPartitionCountConstraint(spec, sorter, splits.size());
+ ClusterConfig.setLocationConstraint(spec, sorter);
/**
* construct write file operator
@@ -292,7 +305,7 @@
IRuntimeHookFactory preHookFactory = new RuntimeHookFactory(confFactory);
IRecordDescriptorFactory inputRdFactory = DataflowUtils.getWritableRecordDescriptorFactoryFromWritableClasses(
vertexIdClass.getName(), vertexClass.getName());
- FileWriteOperatorDescriptor writer = new FileWriteOperatorDescriptor(spec, inputRdFactory,
+ VertexWriteOperatorDescriptor writer = new VertexWriteOperatorDescriptor(spec, inputRdFactory,
resultFileSplitProvider, preHookFactory, null);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, writer, new String[] { "nc1" });
PartitionConstraintHelper.addPartitionCountConstraint(spec, writer, 1);
@@ -357,7 +370,7 @@
IRuntimeHookFactory preHookFactory = new RuntimeHookFactory(confFactory);
IRecordDescriptorFactory inputRdFactory = DataflowUtils.getWritableRecordDescriptorFactoryFromWritableClasses(
vertexIdClass.getName(), vertexClass.getName());
- FileWriteOperatorDescriptor writer = new FileWriteOperatorDescriptor(spec, inputRdFactory,
+ VertexWriteOperatorDescriptor writer = new VertexWriteOperatorDescriptor(spec, inputRdFactory,
resultFileSplitProvider, preHookFactory, null);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, writer, new String[] { "nc1" });
PartitionConstraintHelper.addPartitionCountConstraint(spec, writer, 1);
diff --git a/fullstack/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java b/fullstack/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
index 00cdf07..9de4c04 100644
--- a/fullstack/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
+++ b/fullstack/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
@@ -39,6 +39,9 @@
import edu.uci.ics.hyracks.storage.am.btree.frames.BTreeNSMInteriorFrameFactory;
import edu.uci.ics.hyracks.storage.am.btree.frames.BTreeNSMLeafFrameFactory;
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexInsertUpdateDeleteOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackProvider;
+import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOp;
import edu.uci.ics.hyracks.storage.am.common.tuples.TypeAwareTupleWriterFactory;
import edu.uci.ics.pregelix.api.graph.MsgList;
import edu.uci.ics.pregelix.api.job.PregelixJob;
@@ -127,13 +130,16 @@
MsgList.class.getName());
RecordDescriptor rdUnnestedMessage = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
vertexIdClass.getName(), messageValueClass.getName());
+ RecordDescriptor rdInsert = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
+ vertexClass.getName());
+ RecordDescriptor rdDelete = DataflowUtils.getRecordDescriptorFromWritableClasses(vertexIdClass.getName());
BTreeSearchFunctionUpdateOperatorDescriptor scanner = new BTreeSearchFunctionUpdateOperatorDescriptor(spec,
recordDescriptor, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
comparatorFactories, JobGenUtil.getForwardScan(iteration), null, null, true, true,
- new BTreeDataflowHelperFactory(), inputRdFactory, 4,
+ new BTreeDataflowHelperFactory(), inputRdFactory, 6,
new StartComputeUpdateFunctionFactory(confFactory), preHookFactory, null, rdUnnestedMessage, rdDummy,
- rdPartialAggregate, rdFinal);
+ rdPartialAggregate, rdInsert, rdDelete, rdFinal);
ClusterConfig.setLocationConstraint(spec, scanner);
/**
@@ -173,7 +179,7 @@
IBinaryComparatorFactory[] sortCmpFactories = new IBinaryComparatorFactory[1];
sortCmpFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration, WritableComparator.get(vertexIdClass)
.getClass());
- ExternalSortOperatorDescriptor localSort = new ExternalSortOperatorDescriptor(spec, maxFrameSize, keyFields,
+ ExternalSortOperatorDescriptor localSort = new ExternalSortOperatorDescriptor(spec, maxFrameNumber, keyFields,
nkmFactory, sortCmpFactories, rdUnnestedMessage);
ClusterConfig.setLocationConstraint(spec, localSort);
@@ -212,18 +218,54 @@
EmptySinkOperatorDescriptor emptySink = new EmptySinkOperatorDescriptor(spec);
ClusterConfig.setLocationConstraint(spec, emptySink);
+ /**
+         * add the insert operator to insert vertices
+ */
+ TreeIndexInsertUpdateDeleteOperatorDescriptor insertOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
+ spec, rdInsert, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
+ comparatorFactories, fieldPermutation, IndexOp.INSERT, new BTreeDataflowHelperFactory(), null,
+ NoOpOperationCallbackProvider.INSTANCE);
+ ClusterConfig.setLocationConstraint(spec, insertOp);
+
+ /**
+         * add the delete operator to delete vertices
+ */
+ TreeIndexInsertUpdateDeleteOperatorDescriptor deleteOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
+ spec, rdDelete, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
+ comparatorFactories, fieldPermutation, IndexOp.DELETE, new BTreeDataflowHelperFactory(), null,
+ NoOpOperationCallbackProvider.INSTANCE);
+ ClusterConfig.setLocationConstraint(spec, deleteOp);
+
+ /** construct empty sink operator */
+ EmptySinkOperatorDescriptor emptySink3 = new EmptySinkOperatorDescriptor(spec);
+ ClusterConfig.setLocationConstraint(spec, emptySink3);
+
+ /** construct empty sink operator */
+ EmptySinkOperatorDescriptor emptySink4 = new EmptySinkOperatorDescriptor(spec);
+ ClusterConfig.setLocationConstraint(spec, emptySink4);
+
ITuplePartitionComputerFactory partionFactory = new VertexIdPartitionComputerFactory(
rdUnnestedMessage.getFields()[0]);
- ITuplePartitionComputerFactory hashPartitionComputerFactory = new MergePartitionComputerFactory();
+ ITuplePartitionComputerFactory unifyingPartitionComputerFactory = new MergePartitionComputerFactory();
+
/** connect all operators **/
spec.connect(new OneToOneConnectorDescriptor(spec), emptyTupleSource, 0, preSuperStep, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), preSuperStep, 0, scanner, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), scanner, 0, localSort, 0);
- spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), scanner, 1,
+ spec.connect(new MToNPartitioningConnectorDescriptor(spec, unifyingPartitionComputerFactory), scanner, 1,
terminateWriter, 0);
- spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), scanner, 2,
+ spec.connect(new MToNPartitioningConnectorDescriptor(spec, unifyingPartitionComputerFactory), scanner, 2,
finalAggregator, 0);
- spec.connect(new OneToOneConnectorDescriptor(spec), scanner, 3, btreeBulkLoad, 0);
+
+ /**
+         * connect the insert/delete operators
+ */
+ spec.connect(new MToNPartitioningConnectorDescriptor(spec, partionFactory), scanner, 3, insertOp, 0);
+ spec.connect(new OneToOneConnectorDescriptor(spec), insertOp, 0, emptySink3, 0);
+ spec.connect(new MToNPartitioningConnectorDescriptor(spec, partionFactory), scanner, 4, deleteOp, 0);
+ spec.connect(new OneToOneConnectorDescriptor(spec), deleteOp, 0, emptySink4, 0);
+
+ spec.connect(new MToNPartitioningConnectorDescriptor(spec, partionFactory), scanner, 5, btreeBulkLoad, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), localSort, 0, localGby, 0);
spec.connect(new MToNPartitioningMergingConnectorDescriptor(spec, partionFactory, keyFields, sortCmpFactories),
localGby, 0, globalGby, 0);
@@ -235,8 +277,10 @@
spec.addRoot(btreeBulkLoad);
spec.addRoot(terminateWriter);
spec.addRoot(finalAggregator);
+ spec.addRoot(emptySink3);
+ spec.addRoot(emptySink4);
- spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
+ spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy(spec));
spec.setFrameSize(frameSize);
return spec;
}
@@ -261,6 +305,9 @@
.getClass());
RecordDescriptor rdFinal = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
MsgList.class.getName());
+ RecordDescriptor rdInsert = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
+ vertexClass.getName());
+ RecordDescriptor rdDelete = DataflowUtils.getRecordDescriptorFromWritableClasses(vertexIdClass.getName());
/**
* construct empty tuple operator
@@ -316,8 +363,8 @@
IndexNestedLoopJoinFunctionUpdateOperatorDescriptor join = new IndexNestedLoopJoinFunctionUpdateOperatorDescriptor(
spec, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
comparatorFactories, JobGenUtil.getForwardScan(iteration), keyFields, keyFields, true, true,
- new BTreeDataflowHelperFactory(), inputRdFactory, 4, new ComputeUpdateFunctionFactory(confFactory),
- preHookFactory, null, rdUnnestedMessage, rdDummy, rdPartialAggregate, rdFinal);
+ new BTreeDataflowHelperFactory(), inputRdFactory, 6, new ComputeUpdateFunctionFactory(confFactory),
+ preHookFactory, null, rdUnnestedMessage, rdDummy, rdPartialAggregate, rdInsert, rdDelete, rdFinal);
ClusterConfig.setLocationConstraint(spec, join);
/**
@@ -342,7 +389,7 @@
IBinaryComparatorFactory[] sortCmpFactories = new IBinaryComparatorFactory[1];
sortCmpFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration, WritableComparator.get(vertexIdClass)
.getClass());
- ExternalSortOperatorDescriptor localSort = new ExternalSortOperatorDescriptor(spec, maxFrameSize, keyFields,
+ ExternalSortOperatorDescriptor localSort = new ExternalSortOperatorDescriptor(spec, maxFrameNumber, keyFields,
nkmFactory, sortCmpFactories, rdUnnestedMessage);
ClusterConfig.setLocationConstraint(spec, localSort);
@@ -395,7 +442,33 @@
configurationFactory, aggRdFactory, jobId);
PartitionConstraintHelper.addPartitionCountConstraint(spec, finalAggregator, 1);
- ITuplePartitionComputerFactory hashPartitionComputerFactory = new MergePartitionComputerFactory();
+ /**
+         * add the insert operator to insert vertices
+ */
+ TreeIndexInsertUpdateDeleteOperatorDescriptor insertOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
+ spec, rdInsert, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
+ comparatorFactories, fieldPermutation, IndexOp.INSERT, new BTreeDataflowHelperFactory(), null,
+ NoOpOperationCallbackProvider.INSTANCE);
+ ClusterConfig.setLocationConstraint(spec, insertOp);
+
+ /**
+         * add the delete operator to delete vertices
+ */
+ TreeIndexInsertUpdateDeleteOperatorDescriptor deleteOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
+ spec, rdDelete, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
+ comparatorFactories, fieldPermutation, IndexOp.DELETE, new BTreeDataflowHelperFactory(), null,
+ NoOpOperationCallbackProvider.INSTANCE);
+ ClusterConfig.setLocationConstraint(spec, deleteOp);
+
+ /** construct empty sink operator */
+ EmptySinkOperatorDescriptor emptySink3 = new EmptySinkOperatorDescriptor(spec);
+ ClusterConfig.setLocationConstraint(spec, emptySink3);
+
+ /** construct empty sink operator */
+ EmptySinkOperatorDescriptor emptySink4 = new EmptySinkOperatorDescriptor(spec);
+ ClusterConfig.setLocationConstraint(spec, emptySink4);
+
+ ITuplePartitionComputerFactory unifyingPartitionComputerFactory = new MergePartitionComputerFactory();
ITuplePartitionComputerFactory partionFactory = new VertexIdPartitionComputerFactory(
rdUnnestedMessage.getFields()[0]);
/** connect all operators **/
@@ -404,12 +477,20 @@
spec.connect(new OneToOneConnectorDescriptor(spec), materializeRead, 0, setUnion, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), setUnion, 0, join, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), join, 0, localSort, 0);
- spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), join, 1,
+ spec.connect(new MToNPartitioningConnectorDescriptor(spec, unifyingPartitionComputerFactory), join, 1,
terminateWriter, 0);
-
- spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), join, 2,
+ spec.connect(new MToNPartitioningConnectorDescriptor(spec, unifyingPartitionComputerFactory), join, 2,
finalAggregator, 0);
- spec.connect(new OneToOneConnectorDescriptor(spec), join, 3, btreeBulkLoad, 0);
+
+ /**
+         * connect the insert/delete operators
+ */
+ spec.connect(new MToNPartitioningConnectorDescriptor(spec, partionFactory), join, 3, insertOp, 0);
+ spec.connect(new OneToOneConnectorDescriptor(spec), insertOp, 0, emptySink3, 0);
+ spec.connect(new MToNPartitioningConnectorDescriptor(spec, partionFactory), join, 4, deleteOp, 0);
+ spec.connect(new OneToOneConnectorDescriptor(spec), deleteOp, 0, emptySink4, 0);
+
+ spec.connect(new MToNPartitioningConnectorDescriptor(spec, partionFactory), join, 5, btreeBulkLoad, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), localSort, 0, localGby, 0);
spec.connect(new MToNPartitioningMergingConnectorDescriptor(spec, partionFactory, keyFields, sortCmpFactories),
localGby, 0, globalGby, 0);
@@ -420,8 +501,10 @@
spec.addRoot(emptySink);
spec.addRoot(btreeBulkLoad);
spec.addRoot(terminateWriter);
+ spec.addRoot(emptySink3);
+ spec.addRoot(emptySink4);
- spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
+ spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy(spec));
spec.setFrameSize(frameSize);
return spec;
}
diff --git a/fullstack/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java b/fullstack/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java
index 3847aa7..91c15b2 100644
--- a/fullstack/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java
+++ b/fullstack/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java
@@ -39,6 +39,9 @@
import edu.uci.ics.hyracks.storage.am.btree.frames.BTreeNSMInteriorFrameFactory;
import edu.uci.ics.hyracks.storage.am.btree.frames.BTreeNSMLeafFrameFactory;
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexInsertUpdateDeleteOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackProvider;
+import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOp;
import edu.uci.ics.hyracks.storage.am.common.tuples.TypeAwareTupleWriterFactory;
import edu.uci.ics.pregelix.api.graph.MsgList;
import edu.uci.ics.pregelix.api.job.PregelixJob;
@@ -121,13 +124,16 @@
vertexIdClass.getName(), vertexClass.getName());
RecordDescriptor rdUnnestedMessage = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
vertexIdClass.getName(), messageValueClass.getName());
+ RecordDescriptor rdInsert = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
+ vertexClass.getName());
+ RecordDescriptor rdDelete = DataflowUtils.getRecordDescriptorFromWritableClasses(vertexIdClass.getName());
BTreeSearchFunctionUpdateOperatorDescriptor scanner = new BTreeSearchFunctionUpdateOperatorDescriptor(spec,
recordDescriptor, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
comparatorFactories, JobGenUtil.getForwardScan(iteration), null, null, true, true,
- new BTreeDataflowHelperFactory(), inputRdFactory, 3,
+ new BTreeDataflowHelperFactory(), inputRdFactory, 5,
new StartComputeUpdateFunctionFactory(confFactory), preHookFactory, null, rdUnnestedMessage, rdDummy,
- rdPartialAggregate);
+ rdPartialAggregate, rdInsert, rdDelete);
ClusterConfig.setLocationConstraint(spec, scanner);
/**
@@ -139,14 +145,15 @@
IBinaryComparatorFactory[] sortCmpFactories = new IBinaryComparatorFactory[1];
sortCmpFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration, WritableComparator.get(vertexIdClass)
.getClass());
- ExternalSortOperatorDescriptor localSort = new ExternalSortOperatorDescriptor(spec, maxFrameSize, keyFields,
+ ExternalSortOperatorDescriptor localSort = new ExternalSortOperatorDescriptor(spec, maxFrameNumber, keyFields,
nkmFactory, sortCmpFactories, rdUnnestedMessage);
ClusterConfig.setLocationConstraint(spec, localSort);
/**
* construct local pre-clustered group-by operator
*/
- IAggregatorDescriptorFactory aggregatorFactory = DataflowUtils.getAccumulatingAggregatorFactory(conf, false, false);
+ IAggregatorDescriptorFactory aggregatorFactory = DataflowUtils.getAccumulatingAggregatorFactory(conf, false,
+ false);
PreclusteredGroupOperatorDescriptor localGby = new PreclusteredGroupOperatorDescriptor(spec, keyFields,
sortCmpFactories, aggregatorFactory, rdUnnestedMessage);
ClusterConfig.setLocationConstraint(spec, localGby);
@@ -156,8 +163,8 @@
*/
RecordDescriptor rdFinal = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
MsgList.class.getName());
- IAggregatorDescriptorFactory aggregatorFactoryFinal = DataflowUtils
- .getAccumulatingAggregatorFactory(conf, true, true);
+ IAggregatorDescriptorFactory aggregatorFactoryFinal = DataflowUtils.getAccumulatingAggregatorFactory(conf,
+ true, true);
PreclusteredGroupOperatorDescriptor globalGby = new PreclusteredGroupOperatorDescriptor(spec, keyFields,
sortCmpFactories, aggregatorFactoryFinal, rdFinal);
ClusterConfig.setLocationConstraint(spec, globalGby);
@@ -182,7 +189,7 @@
TerminationStateWriterOperatorDescriptor terminateWriter = new TerminationStateWriterOperatorDescriptor(spec,
configurationFactory, jobId);
PartitionConstraintHelper.addPartitionCountConstraint(spec, terminateWriter, 1);
- ITuplePartitionComputerFactory hashPartitionComputerFactory = new MergePartitionComputerFactory();
+ ITuplePartitionComputerFactory unifyingPartitionComputerFactory = new MergePartitionComputerFactory();
/**
* final aggregate write operator
@@ -193,16 +200,55 @@
configurationFactory, aggRdFactory, jobId);
PartitionConstraintHelper.addPartitionCountConstraint(spec, finalAggregator, 1);
+ /**
+         * add the insert operator to insert vertices
+ */
+ int[] fieldPermutation = new int[] { 0, 1 };
+ TreeIndexInsertUpdateDeleteOperatorDescriptor insertOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
+ spec, rdInsert, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
+ comparatorFactories, fieldPermutation, IndexOp.INSERT, new BTreeDataflowHelperFactory(), null,
+ NoOpOperationCallbackProvider.INSTANCE);
+ ClusterConfig.setLocationConstraint(spec, insertOp);
+
+ /**
+         * add the delete operator to delete vertices
+ */
+ TreeIndexInsertUpdateDeleteOperatorDescriptor deleteOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
+ spec, rdDelete, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
+ comparatorFactories, fieldPermutation, IndexOp.DELETE, new BTreeDataflowHelperFactory(), null,
+ NoOpOperationCallbackProvider.INSTANCE);
+ ClusterConfig.setLocationConstraint(spec, deleteOp);
+
+ /** construct empty sink operator */
+ EmptySinkOperatorDescriptor emptySink3 = new EmptySinkOperatorDescriptor(spec);
+ ClusterConfig.setLocationConstraint(spec, emptySink3);
+
+ /** construct empty sink operator */
+ EmptySinkOperatorDescriptor emptySink4 = new EmptySinkOperatorDescriptor(spec);
+ ClusterConfig.setLocationConstraint(spec, emptySink4);
+
ITuplePartitionComputerFactory partionFactory = new VertexIdPartitionComputerFactory(
rdUnnestedMessage.getFields()[0]);
/** connect all operators **/
spec.connect(new OneToOneConnectorDescriptor(spec), emptyTupleSource, 0, preSuperStep, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), preSuperStep, 0, scanner, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), scanner, 0, localSort, 0);
- spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), scanner, 1,
+ spec.connect(new MToNPartitioningConnectorDescriptor(spec, unifyingPartitionComputerFactory), scanner, 1,
terminateWriter, 0);
- spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), scanner, 2,
+ spec.connect(new MToNPartitioningConnectorDescriptor(spec, unifyingPartitionComputerFactory), scanner, 2,
finalAggregator, 0);
+
+ /**
+         * connect the insert/delete operators
+ */
+ spec.connect(new MToNPartitioningConnectorDescriptor(spec, partionFactory), scanner, 3, insertOp, 0);
+ spec.connect(new OneToOneConnectorDescriptor(spec), insertOp, 0, emptySink3, 0);
+ spec.connect(new MToNPartitioningConnectorDescriptor(spec, partionFactory), scanner, 4, deleteOp, 0);
+ spec.connect(new OneToOneConnectorDescriptor(spec), deleteOp, 0, emptySink4, 0);
+
+ /**
+ * connect the group-by operator
+ */
spec.connect(new OneToOneConnectorDescriptor(spec), localSort, 0, localGby, 0);
spec.connect(new MToNPartitioningMergingConnectorDescriptor(spec, partionFactory, keyFields, sortCmpFactories),
localGby, 0, globalGby, 0);
@@ -213,8 +259,10 @@
spec.addRoot(terminateWriter);
spec.addRoot(finalAggregator);
spec.addRoot(emptySink2);
+ spec.addRoot(emptySink3);
+ spec.addRoot(emptySink4);
- spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
+ spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy(spec));
spec.setFrameSize(frameSize);
return spec;
}
@@ -239,6 +287,9 @@
.getClass());
RecordDescriptor rdFinal = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
MsgList.class.getName());
+ RecordDescriptor rdInsert = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
+ vertexClass.getName());
+ RecordDescriptor rdDelete = DataflowUtils.getRecordDescriptorFromWritableClasses(vertexIdClass.getName());
/**
* construct empty tuple operator
@@ -286,9 +337,9 @@
IndexNestedLoopJoinFunctionUpdateOperatorDescriptor join = new IndexNestedLoopJoinFunctionUpdateOperatorDescriptor(
spec, storageManagerInterface, treeRegistryProvider, fileSplitProvider, interiorFrameFactory,
leafFrameFactory, typeTraits, comparatorFactories, JobGenUtil.getForwardScan(iteration), keyFields,
- keyFields, true, true, new BTreeDataflowHelperFactory(), true, nullWriterFactories, inputRdFactory, 3,
+ keyFields, true, true, new BTreeDataflowHelperFactory(), true, nullWriterFactories, inputRdFactory, 5,
new ComputeUpdateFunctionFactory(confFactory), preHookFactory, null, rdUnnestedMessage, rdDummy,
- rdPartialAggregate);
+ rdPartialAggregate, rdInsert, rdDelete);
ClusterConfig.setLocationConstraint(spec, join);
/**
@@ -299,14 +350,15 @@
IBinaryComparatorFactory[] sortCmpFactories = new IBinaryComparatorFactory[1];
sortCmpFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration, WritableComparator.get(vertexIdClass)
.getClass());
- ExternalSortOperatorDescriptor localSort = new ExternalSortOperatorDescriptor(spec, maxFrameSize, keyFields,
+ ExternalSortOperatorDescriptor localSort = new ExternalSortOperatorDescriptor(spec, maxFrameNumber, keyFields,
nkmFactory, sortCmpFactories, rdUnnestedMessage);
ClusterConfig.setLocationConstraint(spec, localSort);
/**
* construct local pre-clustered group-by operator
*/
- IAggregatorDescriptorFactory aggregatorFactory = DataflowUtils.getAccumulatingAggregatorFactory(conf, false, false);
+ IAggregatorDescriptorFactory aggregatorFactory = DataflowUtils.getAccumulatingAggregatorFactory(conf, false,
+ false);
PreclusteredGroupOperatorDescriptor localGby = new PreclusteredGroupOperatorDescriptor(spec, keyFields,
sortCmpFactories, aggregatorFactory, rdUnnestedMessage);
ClusterConfig.setLocationConstraint(spec, localGby);
@@ -314,8 +366,8 @@
/**
* construct global group-by operator
*/
- IAggregatorDescriptorFactory aggregatorFactoryFinal = DataflowUtils
- .getAccumulatingAggregatorFactory(conf, true, true);
+ IAggregatorDescriptorFactory aggregatorFactoryFinal = DataflowUtils.getAccumulatingAggregatorFactory(conf,
+ true, true);
PreclusteredGroupOperatorDescriptor globalGby = new PreclusteredGroupOperatorDescriptor(spec, keyFields,
sortCmpFactories, aggregatorFactoryFinal, rdFinal);
ClusterConfig.setLocationConstraint(spec, globalGby);
@@ -351,7 +403,34 @@
configurationFactory, aggRdFactory, jobId);
PartitionConstraintHelper.addPartitionCountConstraint(spec, finalAggregator, 1);
- ITuplePartitionComputerFactory hashPartitionComputerFactory = new MergePartitionComputerFactory();
+ /**
+         * add the insert operator to insert vertices
+ */
+ int[] fieldPermutation = new int[] { 0, 1 };
+ TreeIndexInsertUpdateDeleteOperatorDescriptor insertOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
+ spec, rdInsert, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
+ comparatorFactories, fieldPermutation, IndexOp.INSERT, new BTreeDataflowHelperFactory(), null,
+ NoOpOperationCallbackProvider.INSTANCE);
+ ClusterConfig.setLocationConstraint(spec, insertOp);
+
+ /**
+         * add the delete operator to delete vertices
+ */
+ TreeIndexInsertUpdateDeleteOperatorDescriptor deleteOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
+ spec, rdDelete, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
+ comparatorFactories, fieldPermutation, IndexOp.DELETE, new BTreeDataflowHelperFactory(), null,
+ NoOpOperationCallbackProvider.INSTANCE);
+ ClusterConfig.setLocationConstraint(spec, deleteOp);
+
+ /** construct empty sink operator */
+ EmptySinkOperatorDescriptor emptySink3 = new EmptySinkOperatorDescriptor(spec);
+ ClusterConfig.setLocationConstraint(spec, emptySink3);
+
+ /** construct empty sink operator */
+ EmptySinkOperatorDescriptor emptySink4 = new EmptySinkOperatorDescriptor(spec);
+ ClusterConfig.setLocationConstraint(spec, emptySink4);
+
+ ITuplePartitionComputerFactory unifyingPartitionComputerFactory = new MergePartitionComputerFactory();
ITuplePartitionComputerFactory partionFactory = new VertexIdPartitionComputerFactory(
rdUnnestedMessage.getFields()[0]);
@@ -360,10 +439,19 @@
spec.connect(new OneToOneConnectorDescriptor(spec), preSuperStep, 0, materializeRead, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), materializeRead, 0, join, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), join, 0, localSort, 0);
- spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), join, 1,
+ spec.connect(new MToNPartitioningConnectorDescriptor(spec, unifyingPartitionComputerFactory), join, 1,
terminateWriter, 0);
- spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), join, 2,
+ spec.connect(new MToNPartitioningConnectorDescriptor(spec, unifyingPartitionComputerFactory), join, 2,
finalAggregator, 0);
+
+ /**
+         * connect the insert/delete operators
+ */
+ spec.connect(new MToNPartitioningConnectorDescriptor(spec, partionFactory), join, 3, insertOp, 0);
+ spec.connect(new OneToOneConnectorDescriptor(spec), insertOp, 0, emptySink3, 0);
+ spec.connect(new MToNPartitioningConnectorDescriptor(spec, partionFactory), join, 4, deleteOp, 0);
+ spec.connect(new OneToOneConnectorDescriptor(spec), deleteOp, 0, emptySink4, 0);
+
spec.connect(new OneToOneConnectorDescriptor(spec), localSort, 0, localGby, 0);
spec.connect(new MToNPartitioningMergingConnectorDescriptor(spec, partionFactory, keyFields, sortCmpFactories),
localGby, 0, globalGby, 0);
@@ -375,7 +463,7 @@
spec.addRoot(finalAggregator);
spec.addRoot(emptySink);
- spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
+ spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy(spec));
spec.setFrameSize(frameSize);
return spec;
}
diff --git a/fullstack/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java b/fullstack/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java
index ec783a7..ee1fd0f 100644
--- a/fullstack/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java
+++ b/fullstack/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java
@@ -38,6 +38,9 @@
import edu.uci.ics.hyracks.storage.am.btree.frames.BTreeNSMInteriorFrameFactory;
import edu.uci.ics.hyracks.storage.am.btree.frames.BTreeNSMLeafFrameFactory;
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexInsertUpdateDeleteOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackProvider;
+import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOp;
import edu.uci.ics.hyracks.storage.am.common.tuples.TypeAwareTupleWriterFactory;
import edu.uci.ics.pregelix.api.graph.MsgList;
import edu.uci.ics.pregelix.api.job.PregelixJob;
@@ -47,12 +50,12 @@
import edu.uci.ics.pregelix.core.jobgen.clusterconfig.ClusterConfig;
import edu.uci.ics.pregelix.core.runtime.touchpoint.WritableComparingBinaryComparatorFactory;
import edu.uci.ics.pregelix.core.util.DataflowUtils;
+import edu.uci.ics.pregelix.dataflow.ConnectorPolicyAssignmentPolicy;
import edu.uci.ics.pregelix.dataflow.EmptySinkOperatorDescriptor;
import edu.uci.ics.pregelix.dataflow.EmptyTupleSourceOperatorDescriptor;
import edu.uci.ics.pregelix.dataflow.FinalAggregateOperatorDescriptor;
import edu.uci.ics.pregelix.dataflow.MaterializingReadOperatorDescriptor;
import edu.uci.ics.pregelix.dataflow.MaterializingWriteOperatorDescriptor;
-import edu.uci.ics.pregelix.dataflow.NonCombinerConnectorPolicyAssignmentPolicy;
import edu.uci.ics.pregelix.dataflow.TerminationStateWriterOperatorDescriptor;
import edu.uci.ics.pregelix.dataflow.base.IConfigurationFactory;
import edu.uci.ics.pregelix.dataflow.std.BTreeSearchFunctionUpdateOperatorDescriptor;
@@ -123,12 +126,16 @@
vertexIdClass.getName(), vertexClass.getName());
RecordDescriptor rdUnnestedMessage = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
vertexIdClass.getName(), messageValueClass.getName());
+ RecordDescriptor rdInsert = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
+ vertexClass.getName());
+ RecordDescriptor rdDelete = DataflowUtils.getRecordDescriptorFromWritableClasses(vertexIdClass.getName());
+
BTreeSearchFunctionUpdateOperatorDescriptor scanner = new BTreeSearchFunctionUpdateOperatorDescriptor(spec,
recordDescriptor, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
comparatorFactories, JobGenUtil.getForwardScan(iteration), null, null, true, true,
- new BTreeDataflowHelperFactory(), inputRdFactory, 3,
+ new BTreeDataflowHelperFactory(), inputRdFactory, 5,
new StartComputeUpdateFunctionFactory(confFactory), preHookFactory, null, rdUnnestedMessage, rdDummy,
- rdPartialAggregate);
+ rdPartialAggregate, rdInsert, rdDelete);
ClusterConfig.setLocationConstraint(spec, scanner);
/**
@@ -140,7 +147,7 @@
IBinaryComparatorFactory[] sortCmpFactories = new IBinaryComparatorFactory[1];
sortCmpFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration, WritableComparator.get(vertexIdClass)
.getClass());
- ExternalSortOperatorDescriptor globalSort = new ExternalSortOperatorDescriptor(spec, maxFrameSize, keyFields,
+ ExternalSortOperatorDescriptor globalSort = new ExternalSortOperatorDescriptor(spec, maxFrameNumber, keyFields,
nkmFactory, sortCmpFactories, rdUnnestedMessage);
ClusterConfig.setLocationConstraint(spec, globalSort);
@@ -185,17 +192,53 @@
configurationFactory, aggRdFactory, jobId);
PartitionConstraintHelper.addPartitionCountConstraint(spec, finalAggregator, 1);
- ITuplePartitionComputerFactory hashPartitionComputerFactory = new MergePartitionComputerFactory();
+ /**
+         * add the insert operator to insert vertices
+ */
+ int[] fieldPermutation = new int[] { 0, 1 };
+ TreeIndexInsertUpdateDeleteOperatorDescriptor insertOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
+ spec, rdInsert, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
+ comparatorFactories, fieldPermutation, IndexOp.INSERT, new BTreeDataflowHelperFactory(), null,
+ NoOpOperationCallbackProvider.INSTANCE);
+ ClusterConfig.setLocationConstraint(spec, insertOp);
+
+ /**
+         * add the delete operator to delete vertices
+ */
+ TreeIndexInsertUpdateDeleteOperatorDescriptor deleteOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
+ spec, rdDelete, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
+ comparatorFactories, fieldPermutation, IndexOp.DELETE, new BTreeDataflowHelperFactory(), null,
+ NoOpOperationCallbackProvider.INSTANCE);
+ ClusterConfig.setLocationConstraint(spec, deleteOp);
+
+ /** construct empty sink operator */
+ EmptySinkOperatorDescriptor emptySink3 = new EmptySinkOperatorDescriptor(spec);
+ ClusterConfig.setLocationConstraint(spec, emptySink3);
+
+ /** construct empty sink operator */
+ EmptySinkOperatorDescriptor emptySink4 = new EmptySinkOperatorDescriptor(spec);
+ ClusterConfig.setLocationConstraint(spec, emptySink4);
+
+ ITuplePartitionComputerFactory unifyingPartitionComputerFactory = new MergePartitionComputerFactory();
ITuplePartitionComputerFactory partionFactory = new VertexIdPartitionComputerFactory(
rdUnnestedMessage.getFields()[0]);
/** connect all operators **/
spec.connect(new OneToOneConnectorDescriptor(spec), emptyTupleSource, 0, preSuperStep, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), preSuperStep, 0, scanner, 0);
spec.connect(new MToNPartitioningConnectorDescriptor(spec, partionFactory), scanner, 0, globalSort, 0);
- spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), scanner, 1,
+ spec.connect(new MToNPartitioningConnectorDescriptor(spec, unifyingPartitionComputerFactory), scanner, 1,
terminateWriter, 0);
- spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), scanner, 2,
+ spec.connect(new MToNPartitioningConnectorDescriptor(spec, unifyingPartitionComputerFactory), scanner, 2,
finalAggregator, 0);
+
+ /**
+         * connect the insert/delete operators
+ */
+ spec.connect(new MToNPartitioningConnectorDescriptor(spec, partionFactory), scanner, 3, insertOp, 0);
+ spec.connect(new OneToOneConnectorDescriptor(spec), insertOp, 0, emptySink3, 0);
+ spec.connect(new MToNPartitioningConnectorDescriptor(spec, partionFactory), scanner, 4, deleteOp, 0);
+ spec.connect(new OneToOneConnectorDescriptor(spec), deleteOp, 0, emptySink4, 0);
+
spec.connect(new OneToOneConnectorDescriptor(spec), globalSort, 0, globalGby, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), globalGby, 0, materialize, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), materialize, 0, postSuperStep, 0);
@@ -204,8 +247,10 @@
spec.addRoot(terminateWriter);
spec.addRoot(finalAggregator);
spec.addRoot(emptySink2);
+ spec.addRoot(emptySink3);
+ spec.addRoot(emptySink4);
- spec.setConnectorPolicyAssignmentPolicy(new NonCombinerConnectorPolicyAssignmentPolicy());
+ spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy(spec));
spec.setFrameSize(frameSize);
return spec;
}
@@ -230,6 +275,9 @@
.getClass());
RecordDescriptor rdFinal = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
MsgList.class.getName());
+ RecordDescriptor rdInsert = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
+ vertexClass.getName());
+ RecordDescriptor rdDelete = DataflowUtils.getRecordDescriptorFromWritableClasses(vertexIdClass.getName());
/**
* construct empty tuple operator
@@ -277,9 +325,9 @@
IndexNestedLoopJoinFunctionUpdateOperatorDescriptor join = new IndexNestedLoopJoinFunctionUpdateOperatorDescriptor(
spec, storageManagerInterface, treeRegistryProvider, fileSplitProvider, interiorFrameFactory,
leafFrameFactory, typeTraits, comparatorFactories, JobGenUtil.getForwardScan(iteration), keyFields,
- keyFields, true, true, new BTreeDataflowHelperFactory(), true, nullWriterFactories, inputRdFactory, 3,
+ keyFields, true, true, new BTreeDataflowHelperFactory(), true, nullWriterFactories, inputRdFactory, 5,
new ComputeUpdateFunctionFactory(confFactory), preHookFactory, null, rdUnnestedMessage, rdDummy,
- rdPartialAggregate);
+ rdPartialAggregate, rdInsert, rdDelete);
ClusterConfig.setLocationConstraint(spec, join);
/**
@@ -290,7 +338,7 @@
IBinaryComparatorFactory[] sortCmpFactories = new IBinaryComparatorFactory[1];
sortCmpFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration, WritableComparator.get(vertexIdClass)
.getClass());
- ExternalSortOperatorDescriptor globalSort = new ExternalSortOperatorDescriptor(spec, maxFrameSize, keyFields,
+ ExternalSortOperatorDescriptor globalSort = new ExternalSortOperatorDescriptor(spec, maxFrameNumber, keyFields,
nkmFactory, sortCmpFactories, rdUnnestedMessage);
ClusterConfig.setLocationConstraint(spec, globalSort);
@@ -334,7 +382,31 @@
configurationFactory, aggRdFactory, jobId);
PartitionConstraintHelper.addPartitionCountConstraint(spec, finalAggregator, 1);
- ITuplePartitionComputerFactory hashPartitionComputerFactory = new MergePartitionComputerFactory();
+ int[] fieldPermutation = new int[] { 0, 1 };
+ TreeIndexInsertUpdateDeleteOperatorDescriptor insertOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
+ spec, rdInsert, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
+ comparatorFactories, fieldPermutation, IndexOp.INSERT, new BTreeDataflowHelperFactory(), null,
+ NoOpOperationCallbackProvider.INSTANCE);
+ ClusterConfig.setLocationConstraint(spec, insertOp);
+
+ /**
+         * add the delete operator to delete vertices
+ */
+ TreeIndexInsertUpdateDeleteOperatorDescriptor deleteOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
+ spec, rdDelete, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
+ comparatorFactories, fieldPermutation, IndexOp.DELETE, new BTreeDataflowHelperFactory(), null,
+ NoOpOperationCallbackProvider.INSTANCE);
+ ClusterConfig.setLocationConstraint(spec, deleteOp);
+
+ /** construct empty sink operator */
+ EmptySinkOperatorDescriptor emptySink3 = new EmptySinkOperatorDescriptor(spec);
+ ClusterConfig.setLocationConstraint(spec, emptySink3);
+
+ /** construct empty sink operator */
+ EmptySinkOperatorDescriptor emptySink4 = new EmptySinkOperatorDescriptor(spec);
+ ClusterConfig.setLocationConstraint(spec, emptySink4);
+
+ ITuplePartitionComputerFactory unifyingPartitionComputerFactory = new MergePartitionComputerFactory();
ITuplePartitionComputerFactory partionFactory = new VertexIdPartitionComputerFactory(
rdUnnestedMessage.getFields()[0]);
@@ -343,10 +415,18 @@
spec.connect(new OneToOneConnectorDescriptor(spec), preSuperStep, 0, materializeRead, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), materializeRead, 0, join, 0);
spec.connect(new MToNPartitioningConnectorDescriptor(spec, partionFactory), join, 0, globalSort, 0);
- spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), join, 1,
+ spec.connect(new MToNPartitioningConnectorDescriptor(spec, unifyingPartitionComputerFactory), join, 1,
terminateWriter, 0);
- spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), join, 2,
+ spec.connect(new MToNPartitioningConnectorDescriptor(spec, unifyingPartitionComputerFactory), join, 2,
finalAggregator, 0);
+ /**
+         * connect the insert/delete operators
+ */
+ spec.connect(new MToNPartitioningConnectorDescriptor(spec, partionFactory), join, 3, insertOp, 0);
+ spec.connect(new OneToOneConnectorDescriptor(spec), insertOp, 0, emptySink3, 0);
+ spec.connect(new MToNPartitioningConnectorDescriptor(spec, partionFactory), join, 4, deleteOp, 0);
+ spec.connect(new OneToOneConnectorDescriptor(spec), deleteOp, 0, emptySink4, 0);
+
spec.connect(new OneToOneConnectorDescriptor(spec), globalSort, 0, globalGby, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), globalGby, 0, materialize, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), materialize, 0, postSuperStep, 0);
@@ -356,7 +436,7 @@
spec.addRoot(finalAggregator);
spec.addRoot(emptySink);
- spec.setConnectorPolicyAssignmentPolicy(new NonCombinerConnectorPolicyAssignmentPolicy());
+ spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy(spec));
spec.setFrameSize(frameSize);
return spec;
}
diff --git a/fullstack/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java b/fullstack/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java
index bb939e3..628e9ce 100644
--- a/fullstack/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java
+++ b/fullstack/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java
@@ -38,6 +38,9 @@
import edu.uci.ics.hyracks.storage.am.btree.frames.BTreeNSMInteriorFrameFactory;
import edu.uci.ics.hyracks.storage.am.btree.frames.BTreeNSMLeafFrameFactory;
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexInsertUpdateDeleteOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackProvider;
+import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOp;
import edu.uci.ics.hyracks.storage.am.common.tuples.TypeAwareTupleWriterFactory;
import edu.uci.ics.pregelix.api.graph.MsgList;
import edu.uci.ics.pregelix.api.job.PregelixJob;
@@ -47,6 +50,7 @@
import edu.uci.ics.pregelix.core.jobgen.clusterconfig.ClusterConfig;
import edu.uci.ics.pregelix.core.runtime.touchpoint.WritableComparingBinaryComparatorFactory;
import edu.uci.ics.pregelix.core.util.DataflowUtils;
+import edu.uci.ics.pregelix.dataflow.ConnectorPolicyAssignmentPolicy;
import edu.uci.ics.pregelix.dataflow.EmptySinkOperatorDescriptor;
import edu.uci.ics.pregelix.dataflow.EmptyTupleSourceOperatorDescriptor;
import edu.uci.ics.pregelix.dataflow.FinalAggregateOperatorDescriptor;
@@ -119,12 +123,16 @@
vertexIdClass.getName(), vertexClass.getName());
RecordDescriptor rdUnnestedMessage = DataflowUtils.getRecordDescriptorFromKeyValueClasses(
vertexIdClass.getName(), messageValueClass.getName());
+ RecordDescriptor rdInsert = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
+ vertexClass.getName());
+ RecordDescriptor rdDelete = DataflowUtils.getRecordDescriptorFromWritableClasses(vertexIdClass.getName());
+
BTreeSearchFunctionUpdateOperatorDescriptor scanner = new BTreeSearchFunctionUpdateOperatorDescriptor(spec,
recordDescriptor, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
comparatorFactories, JobGenUtil.getForwardScan(iteration), null, null, true, true,
- new BTreeDataflowHelperFactory(), inputRdFactory, 3,
+ new BTreeDataflowHelperFactory(), inputRdFactory, 5,
new StartComputeUpdateFunctionFactory(confFactory), preHookFactory, null, rdUnnestedMessage, rdDummy,
- rdPartialAggregate);
+ rdPartialAggregate, rdInsert, rdDelete);
ClusterConfig.setLocationConstraint(spec, scanner);
/**
@@ -136,7 +144,7 @@
IBinaryComparatorFactory[] sortCmpFactories = new IBinaryComparatorFactory[1];
sortCmpFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration, WritableComparator.get(vertexIdClass)
.getClass());
- ExternalSortOperatorDescriptor localSort = new ExternalSortOperatorDescriptor(spec, maxFrameSize, keyFields,
+ ExternalSortOperatorDescriptor localSort = new ExternalSortOperatorDescriptor(spec, maxFrameNumber, keyFields,
nkmFactory, sortCmpFactories, rdUnnestedMessage);
ClusterConfig.setLocationConstraint(spec, localSort);
@@ -152,7 +160,7 @@
/**
* construct global sort operator
*/
- ExternalSortOperatorDescriptor globalSort = new ExternalSortOperatorDescriptor(spec, maxFrameSize, keyFields,
+ ExternalSortOperatorDescriptor globalSort = new ExternalSortOperatorDescriptor(spec, maxFrameNumber, keyFields,
nkmFactory, sortCmpFactories, rdUnnestedMessage);
ClusterConfig.setLocationConstraint(spec, globalSort);
@@ -187,7 +195,7 @@
TerminationStateWriterOperatorDescriptor terminateWriter = new TerminationStateWriterOperatorDescriptor(spec,
configurationFactory, jobId);
PartitionConstraintHelper.addPartitionCountConstraint(spec, terminateWriter, 1);
- ITuplePartitionComputerFactory hashPartitionComputerFactory = new MergePartitionComputerFactory();
+ ITuplePartitionComputerFactory unifyingPartitionComputerFactory = new MergePartitionComputerFactory();
/**
* final aggregate write operator
@@ -198,16 +206,51 @@
configurationFactory, aggRdFactory, jobId);
PartitionConstraintHelper.addPartitionCountConstraint(spec, finalAggregator, 1);
+ /**
+         * add the insert operator to insert vertices
+ */
+ int[] fieldPermutation = new int[] { 0, 1 };
+ TreeIndexInsertUpdateDeleteOperatorDescriptor insertOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
+ spec, rdInsert, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
+ comparatorFactories, fieldPermutation, IndexOp.INSERT, new BTreeDataflowHelperFactory(), null,
+ NoOpOperationCallbackProvider.INSTANCE);
+ ClusterConfig.setLocationConstraint(spec, insertOp);
+
+ /**
+         * add the delete operator to delete vertices
+ */
+ TreeIndexInsertUpdateDeleteOperatorDescriptor deleteOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
+ spec, rdDelete, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
+ comparatorFactories, fieldPermutation, IndexOp.DELETE, new BTreeDataflowHelperFactory(), null,
+ NoOpOperationCallbackProvider.INSTANCE);
+ ClusterConfig.setLocationConstraint(spec, deleteOp);
+
+ /** construct empty sink operator */
+ EmptySinkOperatorDescriptor emptySink3 = new EmptySinkOperatorDescriptor(spec);
+ ClusterConfig.setLocationConstraint(spec, emptySink3);
+
+ /** construct empty sink operator */
+ EmptySinkOperatorDescriptor emptySink4 = new EmptySinkOperatorDescriptor(spec);
+ ClusterConfig.setLocationConstraint(spec, emptySink4);
+
ITuplePartitionComputerFactory partionFactory = new VertexIdPartitionComputerFactory(
rdUnnestedMessage.getFields()[0]);
/** connect all operators **/
spec.connect(new OneToOneConnectorDescriptor(spec), emptyTupleSource, 0, preSuperStep, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), preSuperStep, 0, scanner, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), scanner, 0, localSort, 0);
- spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), scanner, 1,
+ spec.connect(new MToNPartitioningConnectorDescriptor(spec, unifyingPartitionComputerFactory), scanner, 1,
terminateWriter, 0);
- spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), scanner, 2,
+ spec.connect(new MToNPartitioningConnectorDescriptor(spec, unifyingPartitionComputerFactory), scanner, 2,
finalAggregator, 0);
+ /**
+ * connect the insert/delete operators
+ */
+ spec.connect(new MToNPartitioningConnectorDescriptor(spec, partionFactory), scanner, 3, insertOp, 0);
+ spec.connect(new OneToOneConnectorDescriptor(spec), insertOp, 0, emptySink3, 0);
+ spec.connect(new MToNPartitioningConnectorDescriptor(spec, partionFactory), scanner, 4, deleteOp, 0);
+ spec.connect(new OneToOneConnectorDescriptor(spec), deleteOp, 0, emptySink4, 0);
+
spec.connect(new OneToOneConnectorDescriptor(spec), localSort, 0, localGby, 0);
spec.connect(new MToNPartitioningConnectorDescriptor(spec, partionFactory), localGby, 0, globalSort, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), globalSort, 0, globalGby, 0);
@@ -218,8 +261,11 @@
spec.addRoot(terminateWriter);
spec.addRoot(finalAggregator);
spec.addRoot(emptySink2);
+ spec.addRoot(emptySink3);
+ spec.addRoot(emptySink4);
spec.setFrameSize(frameSize);
+ spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy(spec));
return spec;
}
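The maxFrameSize -> maxFrameNumber fixes above matter because the second argument of ExternalSortOperatorDescriptor is a memory budget counted in frames, not bytes. A minimal sketch of the assumed arithmetic, with illustrative values and hypothetical names:

    // Hedged sketch: convert a per-sorter byte budget into the frame count that
    // ExternalSortOperatorDescriptor expects; the numbers here are illustrative.
    class SortBudgetSketch {
        public static void main(String[] args) {
            int frameSize = 65536;                            // bytes per frame, cf. spec.setFrameSize(frameSize)
            int sortBudgetBytes = 32 * 1024 * 1024;           // assumed memory each sorter may use
            int maxFrameNumber = sortBudgetBytes / frameSize; // 512 frames, the unit the sorter takes
            System.out.println(maxFrameNumber);
        }
    }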
@@ -243,6 +289,9 @@
.getClass());
RecordDescriptor rdFinal = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
MsgList.class.getName());
+ RecordDescriptor rdInsert = DataflowUtils.getRecordDescriptorFromKeyValueClasses(vertexIdClass.getName(),
+ vertexClass.getName());
+ RecordDescriptor rdDelete = DataflowUtils.getRecordDescriptorFromWritableClasses(vertexIdClass.getName());
/**
* construct empty tuple operator
@@ -290,9 +339,9 @@
IndexNestedLoopJoinFunctionUpdateOperatorDescriptor join = new IndexNestedLoopJoinFunctionUpdateOperatorDescriptor(
spec, storageManagerInterface, treeRegistryProvider, fileSplitProvider, interiorFrameFactory,
leafFrameFactory, typeTraits, comparatorFactories, JobGenUtil.getForwardScan(iteration), keyFields,
- keyFields, true, true, new BTreeDataflowHelperFactory(), true, nullWriterFactories, inputRdFactory, 3,
+ keyFields, true, true, new BTreeDataflowHelperFactory(), true, nullWriterFactories, inputRdFactory, 5,
new ComputeUpdateFunctionFactory(confFactory), preHookFactory, null, rdUnnestedMessage, rdDummy,
- rdPartialAggregate);
+ rdPartialAggregate, rdInsert, rdDelete);
ClusterConfig.setLocationConstraint(spec, join);
/**
@@ -303,7 +352,7 @@
IBinaryComparatorFactory[] sortCmpFactories = new IBinaryComparatorFactory[1];
sortCmpFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration, WritableComparator.get(vertexIdClass)
.getClass());
- ExternalSortOperatorDescriptor localSort = new ExternalSortOperatorDescriptor(spec, maxFrameSize, keyFields,
+ ExternalSortOperatorDescriptor localSort = new ExternalSortOperatorDescriptor(spec, maxFrameNumber, keyFields,
nkmFactory, sortCmpFactories, rdUnnestedMessage);
ClusterConfig.setLocationConstraint(spec, localSort);
@@ -319,7 +368,7 @@
/**
* construct global sort operator
*/
- ExternalSortOperatorDescriptor globalSort = new ExternalSortOperatorDescriptor(spec, maxFrameSize, keyFields,
+ ExternalSortOperatorDescriptor globalSort = new ExternalSortOperatorDescriptor(spec, maxFrameNumber, keyFields,
nkmFactory, sortCmpFactories, rdUnnestedMessage);
ClusterConfig.setLocationConstraint(spec, globalSort);
@@ -363,7 +412,34 @@
configurationFactory, aggRdFactory, jobId);
PartitionConstraintHelper.addPartitionCountConstraint(spec, finalAggregator, 1);
- ITuplePartitionComputerFactory hashPartitionComputerFactory = new MergePartitionComputerFactory();
+ /**
+ * add the insert operator to insert vertices
+ */
+ int[] fieldPermutation = new int[] { 0, 1 };
+ TreeIndexInsertUpdateDeleteOperatorDescriptor insertOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
+ spec, rdInsert, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
+ comparatorFactories, fieldPermutation, IndexOp.INSERT, new BTreeDataflowHelperFactory(), null,
+ NoOpOperationCallbackProvider.INSTANCE);
+ ClusterConfig.setLocationConstraint(spec, insertOp);
+
+ /**
+ * add the delete operator to delete vertices; the delete tuple carries only
+ * the vertex id, so it needs a single-field permutation of its own
+ */
+ int[] fieldPermutationDelete = new int[] { 0 };
+ TreeIndexInsertUpdateDeleteOperatorDescriptor deleteOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
+ spec, rdDelete, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
+ comparatorFactories, fieldPermutationDelete, IndexOp.DELETE, new BTreeDataflowHelperFactory(), null,
+ NoOpOperationCallbackProvider.INSTANCE);
+ ClusterConfig.setLocationConstraint(spec, deleteOp);
+
+ /** construct empty sink operator */
+ EmptySinkOperatorDescriptor emptySink3 = new EmptySinkOperatorDescriptor(spec);
+ ClusterConfig.setLocationConstraint(spec, emptySink3);
+
+ /** construct empty sink operator */
+ EmptySinkOperatorDescriptor emptySink4 = new EmptySinkOperatorDescriptor(spec);
+ ClusterConfig.setLocationConstraint(spec, emptySink4);
+
+ ITuplePartitionComputerFactory unifyingPartitionComputerFactory = new MergePartitionComputerFactory();
ITuplePartitionComputerFactory partionFactory = new VertexIdPartitionComputerFactory(
rdUnnestedMessage.getFields()[0]);
@@ -372,10 +448,18 @@
spec.connect(new OneToOneConnectorDescriptor(spec), preSuperStep, 0, materializeRead, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), materializeRead, 0, join, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), join, 0, localSort, 0);
- spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), join, 1,
+ spec.connect(new MToNPartitioningConnectorDescriptor(spec, unifyingPartitionComputerFactory), join, 1,
terminateWriter, 0);
- spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), join, 2,
+ spec.connect(new MToNPartitioningConnectorDescriptor(spec, unifyingPartitionComputerFactory), join, 2,
finalAggregator, 0);
+ /**
+ * connect the insert/delete operators
+ */
+ spec.connect(new MToNPartitioningConnectorDescriptor(spec, partionFactory), join, 3, insertOp, 0);
+ spec.connect(new OneToOneConnectorDescriptor(spec), insertOp, 0, emptySink3, 0);
+ spec.connect(new MToNPartitioningConnectorDescriptor(spec, partionFactory), join, 4, deleteOp, 0);
+ spec.connect(new OneToOneConnectorDescriptor(spec), deleteOp, 0, emptySink4, 0);
+
spec.connect(new OneToOneConnectorDescriptor(spec), localSort, 0, localGby, 0);
spec.connect(new MToNPartitioningConnectorDescriptor(spec, partionFactory), localGby, 0, globalSort, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), globalSort, 0, globalGby, 0);
@@ -386,8 +470,11 @@
spec.addRoot(terminateWriter);
spec.addRoot(finalAggregator);
spec.addRoot(emptySink);
+ spec.addRoot(emptySink3);
+ spec.addRoot(emptySink4);
spec.setFrameSize(frameSize);
+ spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy(spec));
return spec;
}
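Both job shapes now also install a connector policy via setConnectorPolicyAssignmentPolicy. A sketch of what such a policy typically decides, assuming the Hyracks IConnectorPolicyAssignmentPolicy interface and signature; this is illustrative, not the actual Pregelix ConnectorPolicyAssignmentPolicy:

    import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
    import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicy;
    import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicyAssignmentPolicy;
    import edu.uci.ics.hyracks.api.dataflow.connectors.PipeliningConnectorPolicy;
    import edu.uci.ics.hyracks.api.dataflow.connectors.SendSideMaterializedPipeliningConnectorPolicy;
    import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningConnectorDescriptor;

    // Illustrative only (interface signature assumed): materialize the send side
    // of repartitioning edges so a blocking consumer such as a sort cannot
    // deadlock the pipeline; leave one-to-one edges fully pipelined.
    class SketchConnectorPolicy implements IConnectorPolicyAssignmentPolicy {
        private static final long serialVersionUID = 1L;

        public IConnectorPolicy getConnectorPolicyAssignment(IConnectorDescriptor c, int nProducers,
                int nConsumers, int[] fanouts) {
            if (c instanceof MToNPartitioningConnectorDescriptor) {
                return new SendSideMaterializedPipeliningConnectorPolicy();
            }
            return new PipeliningConnectorPolicy();
        }
    }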
diff --git a/fullstack/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/clusterconfig/ClusterConfig.java b/fullstack/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/clusterconfig/ClusterConfig.java
index 8eadab9..d26e637 100644
--- a/fullstack/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/clusterconfig/ClusterConfig.java
+++ b/fullstack/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/clusterconfig/ClusterConfig.java
@@ -40,6 +40,7 @@
import edu.uci.ics.hyracks.dataflow.std.file.ConstantFileSplitProvider;
import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.hyracks.hdfs2.scheduler.Scheduler;
public class ClusterConfig {
@@ -49,6 +50,7 @@
private static Properties clusterProperties = new Properties();
private static Map<String, List<String>> ipToNcMapping;
private static String[] stores;
+ private static Scheduler hdfsScheduler;
/**
* let tests set config path to be whatever
@@ -211,6 +213,8 @@
NCs[i] = entry.getKey();
i++;
}
+
+ hdfsScheduler = new Scheduler(ipAddress, port);
} catch (Exception e) {
throw new IllegalStateException(e);
}
@@ -218,4 +222,8 @@
loadClusterProperties();
loadStores();
}
+
+ public static Scheduler getHdfsScheduler() {
+ return hdfsScheduler;
+ }
}
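The cached scheduler gives job generation one shared place to turn HDFS splits into data-local partition constraints. A minimal sketch, assuming the hdfs2 Scheduler exposes a getLocationConstraints method over new-API splits (method name and parameter type assumed, not verified here):

    import java.util.List;
    import org.apache.hadoop.mapreduce.InputSplit;
    import edu.uci.ics.hyracks.hdfs2.scheduler.Scheduler;

    // Hedged sketch: map each HDFS split onto a node controller so scan
    // partitions land next to their blocks.
    class ScanLocationSketch {
        static String[] computeScanLocations(List<InputSplit> splits) throws Exception {
            Scheduler scheduler = ClusterConfig.getHdfsScheduler();
            return scheduler.getLocationConstraints(splits);
        }
    }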
diff --git a/fullstack/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/PregelixHyracksIntegrationUtil.java b/fullstack/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/PregelixHyracksIntegrationUtil.java
index ed04746..cd2a864 100644
--- a/fullstack/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/PregelixHyracksIntegrationUtil.java
+++ b/fullstack/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/PregelixHyracksIntegrationUtil.java
@@ -14,6 +14,7 @@
*/
package edu.uci.ics.pregelix.core.util;
+import java.io.File;
import java.util.EnumSet;
import edu.uci.ics.hyracks.api.client.HyracksConnection;
@@ -44,14 +45,16 @@
private static NodeControllerService nc2;
private static IHyracksClientConnection hcc;
- public static void init() throws Exception {
+ public static void init(String topologyFilePath) throws Exception {
CCConfig ccConfig = new CCConfig();
ccConfig.clientNetIpAddress = CC_HOST;
ccConfig.clusterNetIpAddress = CC_HOST;
ccConfig.clusterNetPort = TEST_HYRACKS_CC_PORT;
ccConfig.clientNetPort = TEST_HYRACKS_CC_CLIENT_PORT;
ccConfig.defaultMaxJobAttempts = 0;
- ccConfig.jobHistorySize = 10;
+ ccConfig.jobHistorySize = 0;
+ ccConfig.profileDumpPeriod = -1;
+ ccConfig.clusterTopologyDefinition = new File(topologyFilePath);
// cluster controller
cc = new ClusterControllerService(ccConfig);
@@ -63,6 +66,7 @@
ncConfig1.clusterNetIPAddress = "localhost";
ncConfig1.ccPort = TEST_HYRACKS_CC_PORT;
ncConfig1.dataIPAddress = "127.0.0.1";
+ ncConfig1.datasetIPAddress = "127.0.0.1";
ncConfig1.nodeId = NC1_ID;
nc1 = new NodeControllerService(ncConfig1);
nc1.start();
@@ -72,6 +76,7 @@
ncConfig2.clusterNetIPAddress = "localhost";
ncConfig2.ccPort = TEST_HYRACKS_CC_PORT;
ncConfig2.dataIPAddress = "127.0.0.1";
+ ncConfig2.datasetIPAddress = "127.0.0.1";
ncConfig2.nodeId = NC2_ID;
nc2 = new NodeControllerService(ncConfig2);
nc2.start();
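With the widened init signature, every caller now names a cluster topology file explicitly; the updated test suite later in this patch calls it exactly this way:

    // Call site as used by the updated tests (path taken from the JoinTest hunk below):
    PregelixHyracksIntegrationUtil.init("src/test/resources/topology.xml");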
diff --git a/fullstack/pregelix/pregelix-core/src/main/java/org/apache/hadoop/fs/Path.java b/fullstack/pregelix/pregelix-core/src/main/java/org/apache/hadoop/fs/Path.java
deleted file mode 100644
index 5efdde8..0000000
--- a/fullstack/pregelix/pregelix-core/src/main/java/org/apache/hadoop/fs/Path.java
+++ /dev/null
@@ -1,355 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.fs;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.net.URI;
-import java.net.URISyntaxException;
-
-import org.apache.hadoop.conf.Configuration;
-
-/**
- * Names a file or directory in a {@link FileSystem}. Path strings use slash as
- * the directory separator. A path string is absolute if it begins with a slash.
- */
-@SuppressWarnings("rawtypes")
-public class Path implements Comparable, Serializable {
- private static final long serialVersionUID = 1L;
- /** The directory separator, a slash. */
- public static final String SEPARATOR = "/";
- public static final char SEPARATOR_CHAR = '/';
-
- public static final String CUR_DIR = ".";
-
- static final boolean WINDOWS = System.getProperty("os.name").startsWith("Windows");
-
- private URI uri; // a hierarchical uri
-
- /** Resolve a child path against a parent path. */
- public Path(String parent, String child) {
- this(new Path(parent), new Path(child));
- }
-
- /** Resolve a child path against a parent path. */
- public Path(Path parent, String child) {
- this(parent, new Path(child));
- }
-
- /** Resolve a child path against a parent path. */
- public Path(String parent, Path child) {
- this(new Path(parent), child);
- }
-
- /** Resolve a child path against a parent path. */
- public Path(Path parent, Path child) {
- // Add a slash to parent's path so resolution is compatible with URI's
- URI parentUri = parent.uri;
- String parentPath = parentUri.getPath();
- if (!(parentPath.equals("/") || parentPath.equals("")))
- try {
- parentUri = new URI(parentUri.getScheme(), parentUri.getAuthority(), parentUri.getPath() + "/", null,
- parentUri.getFragment());
- } catch (URISyntaxException e) {
- throw new IllegalArgumentException(e);
- }
- URI resolved = parentUri.resolve(child.uri);
- initialize(resolved.getScheme(), resolved.getAuthority(), normalizePath(resolved.getPath()),
- resolved.getFragment());
- }
-
- private void checkPathArg(String path) {
- // disallow construction of a Path from an empty string
- if (path == null) {
- throw new IllegalArgumentException("Can not create a Path from a null string");
- }
- if (path.length() == 0) {
- throw new IllegalArgumentException("Can not create a Path from an empty string");
- }
- }
-
- /**
- * Construct a path from a String. Path strings are URIs, but with unescaped
- * elements and some additional normalization.
- */
- public Path(String pathString) {
- checkPathArg(pathString);
-
- // We can't use 'new URI(String)' directly, since it assumes things are
- // escaped, which we don't require of Paths.
-
- // add a slash in front of paths with Windows drive letters
- if (hasWindowsDrive(pathString, false))
- pathString = "/" + pathString;
-
- // parse uri components
- String scheme = null;
- String authority = null;
-
- int start = 0;
-
- // parse uri scheme, if any
- int colon = pathString.indexOf(':');
- int slash = pathString.indexOf('/');
- if ((colon != -1) && ((slash == -1) || (colon < slash))) { // has a
- // scheme
- scheme = pathString.substring(0, colon);
- start = colon + 1;
- }
-
- // parse uri authority, if any
- if (pathString.startsWith("//", start) && (pathString.length() - start > 2)) { // has
- // authority
- int nextSlash = pathString.indexOf('/', start + 2);
- int authEnd = nextSlash > 0 ? nextSlash : pathString.length();
- authority = pathString.substring(start + 2, authEnd);
- start = authEnd;
- }
-
- // uri path is the rest of the string -- query & fragment not supported
- String path = pathString.substring(start, pathString.length());
-
- initialize(scheme, authority, path, null);
- }
-
- /** Construct a Path from components. */
- public Path(String scheme, String authority, String path) {
- checkPathArg(path);
- initialize(scheme, authority, path, null);
- }
-
- /**
- * Construct a path from a URI
- */
- public Path(URI aUri) {
- uri = aUri;
- }
-
- private void initialize(String scheme, String authority, String path, String fragment) {
- try {
- this.uri = new URI(scheme, authority, normalizePath(path), null, fragment).normalize();
- } catch (URISyntaxException e) {
- throw new IllegalArgumentException(e);
- }
- }
-
- private String normalizePath(String path) {
- // remove double slashes & backslashes
- if (path.indexOf("//") != -1) {
- path = path.replace("//", "/");
- }
- if (path.indexOf("\\") != -1) {
- path = path.replace("\\", "/");
- }
-
- // trim trailing slash from non-root path (ignoring windows drive)
- int minLength = hasWindowsDrive(path, true) ? 4 : 1;
- if (path.length() > minLength && path.endsWith("/")) {
- path = path.substring(0, path.length() - 1);
- }
-
- return path;
- }
-
- private boolean hasWindowsDrive(String path, boolean slashed) {
- if (!WINDOWS)
- return false;
- int start = slashed ? 1 : 0;
- return path.length() >= start + 2
- && (slashed ? path.charAt(0) == '/' : true)
- && path.charAt(start + 1) == ':'
- && ((path.charAt(start) >= 'A' && path.charAt(start) <= 'Z') || (path.charAt(start) >= 'a' && path
- .charAt(start) <= 'z'));
- }
-
- /** Convert this to a URI. */
- public URI toUri() {
- return uri;
- }
-
- /** Return the FileSystem that owns this Path. */
- public FileSystem getFileSystem(Configuration conf) throws IOException {
- return FileSystem.get(this.toUri(), conf);
- }
-
- /** True if the directory of this path is absolute. */
- public boolean isAbsolute() {
- int start = hasWindowsDrive(uri.getPath(), true) ? 3 : 0;
- return uri.getPath().startsWith(SEPARATOR, start);
- }
-
- /** Returns the final component of this path. */
- public String getName() {
- String path = uri.getPath();
- int slash = path.lastIndexOf(SEPARATOR);
- return path.substring(slash + 1);
- }
-
- /** Returns the parent of a path or null if at root. */
- public Path getParent() {
- String path = uri.getPath();
- int lastSlash = path.lastIndexOf('/');
- int start = hasWindowsDrive(path, true) ? 3 : 0;
- if ((path.length() == start) || // empty path
- (lastSlash == start && path.length() == start + 1)) { // at root
- return null;
- }
- String parent;
- if (lastSlash == -1) {
- parent = CUR_DIR;
- } else {
- int end = hasWindowsDrive(path, true) ? 3 : 0;
- parent = path.substring(0, lastSlash == end ? end + 1 : lastSlash);
- }
- return new Path(uri.getScheme(), uri.getAuthority(), parent);
- }
-
- /** Adds a suffix to the final name in the path. */
- public Path suffix(String suffix) {
- return new Path(getParent(), getName() + suffix);
- }
-
- public String toString() {
- // we can't use uri.toString(), which escapes everything, because we
- // want
- // illegal characters unescaped in the string, for glob processing, etc.
- StringBuffer buffer = new StringBuffer();
- if (uri.getScheme() != null) {
- buffer.append(uri.getScheme());
- buffer.append(":");
- }
- if (uri.getAuthority() != null) {
- buffer.append("//");
- buffer.append(uri.getAuthority());
- }
- if (uri.getPath() != null) {
- String path = uri.getPath();
- if (path.indexOf('/') == 0 && hasWindowsDrive(path, true) && // has
- // windows
- // drive
- uri.getScheme() == null && // but no scheme
- uri.getAuthority() == null) // or authority
- path = path.substring(1); // remove slash before drive
- buffer.append(path);
- }
- if (uri.getFragment() != null) {
- buffer.append("#");
- buffer.append(uri.getFragment());
- }
- return buffer.toString();
- }
-
- public boolean equals(Object o) {
- if (!(o instanceof Path)) {
- return false;
- }
- Path that = (Path) o;
- return this.uri.equals(that.uri);
- }
-
- public int hashCode() {
- return uri.hashCode();
- }
-
- public int compareTo(Object o) {
- Path that = (Path) o;
- return this.uri.compareTo(that.uri);
- }
-
- /** Return the number of elements in this path. */
- public int depth() {
- String path = uri.getPath();
- int depth = 0;
- int slash = path.length() == 1 && path.charAt(0) == '/' ? -1 : 0;
- while (slash != -1) {
- depth++;
- slash = path.indexOf(SEPARATOR, slash + 1);
- }
- return depth;
- }
-
- /** Returns a qualified path object. */
- public Path makeQualified(FileSystem fs) {
- Path path = this;
- if (!isAbsolute()) {
- path = new Path(fs.getWorkingDirectory(), this);
- }
-
- URI pathUri = path.toUri();
- URI fsUri = fs.getUri();
-
- String scheme = pathUri.getScheme();
- String authority = pathUri.getAuthority();
- String fragment = pathUri.getFragment();
- if (scheme != null && (authority != null || fsUri.getAuthority() == null))
- return path;
-
- if (scheme == null) {
- scheme = fsUri.getScheme();
- }
-
- if (authority == null) {
- authority = fsUri.getAuthority();
- if (authority == null) {
- authority = "";
- }
- }
-
- URI newUri = null;
- try {
- newUri = new URI(scheme, authority, normalizePath(pathUri.getPath()), null, fragment);
- } catch (URISyntaxException e) {
- throw new IllegalArgumentException(e);
- }
- return new Path(newUri);
- }
-
- /** Returns a qualified path object. */
- public Path makeQualified(URI defaultUri, Path workingDir) {
- Path path = this;
- if (!isAbsolute()) {
- path = new Path(workingDir, this);
- }
-
- URI pathUri = path.toUri();
-
- String scheme = pathUri.getScheme();
- String authority = pathUri.getAuthority();
- String fragment = pathUri.getFragment();
-
- if (scheme != null && (authority != null || defaultUri.getAuthority() == null))
- return path;
-
- if (scheme == null) {
- scheme = defaultUri.getScheme();
- }
-
- if (authority == null) {
- authority = defaultUri.getAuthority();
- if (authority == null) {
- authority = "";
- }
- }
-
- URI newUri = null;
- try {
- newUri = new URI(scheme, authority, normalizePath(pathUri.getPath()), null, fragment);
- } catch (URISyntaxException e) {
- throw new IllegalArgumentException(e);
- }
- return new Path(newUri);
- }
-}
\ No newline at end of file
diff --git a/fullstack/pregelix/pregelix-core/src/main/java/org/apache/hadoop/mapreduce/InputSplit.java b/fullstack/pregelix/pregelix-core/src/main/java/org/apache/hadoop/mapreduce/InputSplit.java
deleted file mode 100644
index ac72160..0000000
--- a/fullstack/pregelix/pregelix-core/src/main/java/org/apache/hadoop/mapreduce/InputSplit.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.mapreduce;
-
-import java.io.IOException;
-import java.io.Serializable;
-
-/**
- * <code>InputSplit</code> represents the data to be processed by an individual {@link Mapper}.
- * <p>
- * Typically, it presents a byte-oriented view on the input and is the responsibility of {@link RecordReader} of the job to process this and present a record-oriented view.
- *
- * @see InputFormat
- * @see RecordReader
- */
-public abstract class InputSplit implements Serializable {
- private static final long serialVersionUID = 1L;
-
- /**
- * Get the size of the split, so that the input splits can be sorted by
- * size.
- *
- * @return the number of bytes in the split
- * @throws IOException
- * @throws InterruptedException
- */
- public abstract long getLength() throws IOException, InterruptedException;
-
- /**
- * Get the list of nodes by name where the data for the split would be
- * local. The locations do not need to be serialized.
- *
- * @return a new array of the node nodes.
- * @throws IOException
- * @throws InterruptedException
- */
- public abstract String[] getLocations() throws IOException, InterruptedException;
-}
\ No newline at end of file
diff --git a/fullstack/pregelix/pregelix-core/src/main/resources/conf/topology-template.xml b/fullstack/pregelix/pregelix-core/src/main/resources/conf/topology-template.xml
new file mode 100755
index 0000000..4710706
--- /dev/null
+++ b/fullstack/pregelix/pregelix-core/src/main/resources/conf/topology-template.xml
@@ -0,0 +1,7 @@
+<cluster-topology>
+ <network-switch name="Global">
+ <network-switch name="local">
+ <terminal name="127.0.0.1"/>
+ </network-switch>
+ </network-switch>
+</cluster-topology>
\ No newline at end of file
diff --git a/fullstack/pregelix/pregelix-core/src/main/resources/scripts/getip.sh b/fullstack/pregelix/pregelix-core/src/main/resources/scripts/getip.sh
index e0cdf73..a691c0f 100755
--- a/fullstack/pregelix/pregelix-core/src/main/resources/scripts/getip.sh
+++ b/fullstack/pregelix/pregelix-core/src/main/resources/scripts/getip.sh
@@ -6,6 +6,10 @@
then
#Get IP Address
IPADDR=`/sbin/ifconfig eth0 | grep "inet " | awk '{print $2}' | cut -f 2 -d ':'`
+ if [ "$IPADDR" = "" ]
+ then
+ IPADDR=`/sbin/ifconfig em1 | grep "inet " | awk '{print $2}' | cut -f 2 -d ':'`
+ fi
if [ "$IPADDR" = "" ]
then
IPADDR=`/sbin/ifconfig lo | grep "inet " | awk '{print $2}' | cut -f 2 -d ':'`
diff --git a/fullstack/pregelix/pregelix-core/src/main/resources/scripts/pregelix b/fullstack/pregelix/pregelix-core/src/main/resources/scripts/pregelix
index c3fd27b..b1a2f74 100644
--- a/fullstack/pregelix/pregelix-core/src/main/resources/scripts/pregelix
+++ b/fullstack/pregelix/pregelix-core/src/main/resources/scripts/pregelix
@@ -91,7 +91,7 @@
REPO="$BASEDIR"/lib
fi
-CLASSPATH=$CLASSPATH_PREFIX:"$HADOOP_HOME"/conf:"$BASEDIR"/etc:$(echo ${REPO}/*.jar | tr ' ' ':'):$1
+CLASSPATH=$CLASSPATH_PREFIX:"$HADOOP_HOME"/conf:/etc/hadoop/conf:"$BASEDIR"/etc:$1
# For Cygwin, switch paths to Windows format before running java
if $cygwin; then
diff --git a/fullstack/pregelix/pregelix-core/src/main/resources/scripts/startAllNCs.sh b/fullstack/pregelix/pregelix-core/src/main/resources/scripts/startAllNCs.sh
index 629bd90..d30da26 100644
--- a/fullstack/pregelix/pregelix-core/src/main/resources/scripts/startAllNCs.sh
+++ b/fullstack/pregelix/pregelix-core/src/main/resources/scripts/startAllNCs.sh
@@ -2,5 +2,5 @@
for i in `cat conf/slaves`
do
- ssh $i "cd ${PREGELIX_PATH}; bin/startnc.sh"
+ ssh $i "cd ${PREGELIX_PATH}; export JAVA_HOME=${JAVA_HOME}; bin/startnc.sh"
done
diff --git a/fullstack/pregelix/pregelix-core/src/main/resources/scripts/startcc.sh b/fullstack/pregelix/pregelix-core/src/main/resources/scripts/startcc.sh
index fe2551d..133b604 100644
--- a/fullstack/pregelix/pregelix-core/src/main/resources/scripts/startcc.sh
+++ b/fullstack/pregelix/pregelix-core/src/main/resources/scripts/startcc.sh
@@ -20,6 +20,12 @@
export JAVA_HOME=$JAVA_HOME
export JAVA_OPTS=$CCJAVA_OPTS
-#Launch hyracks cc script
+
chmod -R 755 $HYRACKS_HOME
-$HYRACKS_HOME/hyracks-server/target/appassembler/bin/hyrackscc -client-net-ip-address $CCHOST -cluster-net-ip-address $CCHOST -client-net-port $CC_CLIENTPORT -cluster-net-port $CC_CLUSTERPORT -max-heartbeat-lapse-periods 999999 -default-max-job-attempts 0 -job-history-size 3 &> $CCLOGS_DIR/cc.log &
+if [ -f "conf/topology.xml" ]; then
+#Launch hyracks cc script with topology
+$HYRACKS_HOME/hyracks-server/target/appassembler/bin/hyrackscc -client-net-ip-address $CCHOST -cluster-net-ip-address $CCHOST -client-net-port $CC_CLIENTPORT -cluster-net-port $CC_CLUSTERPORT -max-heartbeat-lapse-periods 999999 -default-max-job-attempts 0 -job-history-size 0 -cluster-topology "conf/topology.xml" &> $CCLOGS_DIR/cc.log &
+else
+#Launch hyracks cc script without topology
+$HYRACKS_HOME/hyracks-server/target/appassembler/bin/hyrackscc -client-net-ip-address $CCHOST -cluster-net-ip-address $CCHOST -client-net-port $CC_CLIENTPORT -cluster-net-port $CC_CLUSTERPORT -max-heartbeat-lapse-periods 999999 -default-max-job-attempts 0 -job-history-size 0 &> $CCLOGS_DIR/cc.log &
+fi
diff --git a/fullstack/pregelix/pregelix-core/src/main/resources/scripts/startnc.sh b/fullstack/pregelix/pregelix-core/src/main/resources/scripts/startnc.sh
index 6e0f90e..b059aad 100644
--- a/fullstack/pregelix/pregelix-core/src/main/resources/scripts/startnc.sh
+++ b/fullstack/pregelix/pregelix-core/src/main/resources/scripts/startnc.sh
@@ -46,4 +46,4 @@
cd $NCTMP_DIR
#Launch hyracks nc
-$HYRACKS_HOME/hyracks-server/target/appassembler/bin/hyracksnc -cc-host $CCHOST -cc-port $CC_CLUSTERPORT -cluster-net-ip-address $IPADDR -data-ip-address $IPADDR -node-id $NODEID -iodevices "${IO_DIRS}" &> $NCLOGS_DIR/$NODEID.log &
+$HYRACKS_HOME/hyracks-server/target/appassembler/bin/hyracksnc -cc-host $CCHOST -cc-port $CC_CLUSTERPORT -cluster-net-ip-address $IPADDR -data-ip-address $IPADDR -result-ip-address $IPADDR -node-id $NODEID -iodevices "${IO_DIRS}" &> $NCLOGS_DIR/$NODEID.log &
diff --git a/fullstack/pregelix/pregelix-core/src/main/resources/scripts/stopnc.sh b/fullstack/pregelix/pregelix-core/src/main/resources/scripts/stopnc.sh
index 03ce4e7..35c4794 100644
--- a/fullstack/pregelix/pregelix-core/src/main/resources/scripts/stopnc.sh
+++ b/fullstack/pregelix/pregelix-core/src/main/resources/scripts/stopnc.sh
@@ -5,6 +5,10 @@
PID=`ps -ef|grep ${USER}|grep java|grep 'Dapp.name=hyracksnc'|awk '{print $2}'`
if [ "$PID" == "" ]; then
+ PID=`ps -ef|grep ${USER}|grep java|grep 'hyracks'|awk '{print $2}'`
+fi
+
+if [ "$PID" == "" ]; then
USERID=`id | sed 's/^uid=//;s/(.*$//'`
PID=`ps -ef|grep ${USERID}|grep java|grep 'Dapp.name=hyracksnc'|awk '{print $2}'`
fi
diff --git a/fullstack/pregelix/pregelix-core/src/test/java/edu/uci/ics/pregelix/core/join/JoinTest.java b/fullstack/pregelix/pregelix-core/src/test/java/edu/uci/ics/pregelix/core/join/JoinTest.java
index 4dfe57d..f7cadf6 100644
--- a/fullstack/pregelix/pregelix-core/src/test/java/edu/uci/ics/pregelix/core/join/JoinTest.java
+++ b/fullstack/pregelix/pregelix-core/src/test/java/edu/uci/ics/pregelix/core/join/JoinTest.java
@@ -62,7 +62,7 @@
import edu.uci.ics.pregelix.core.jobgen.clusterconfig.ClusterConfig;
import edu.uci.ics.pregelix.core.util.PregelixHyracksIntegrationUtil;
import edu.uci.ics.pregelix.core.util.TestUtils;
-import edu.uci.ics.pregelix.dataflow.std.FileWriteOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.VertexWriteOperatorDescriptor;
import edu.uci.ics.pregelix.dataflow.std.IndexNestedLoopJoinOperatorDescriptor;
import edu.uci.ics.pregelix.dataflow.std.ProjectOperatorDescriptor;
import edu.uci.ics.pregelix.runtime.bootstrap.StorageManagerInterface;
@@ -102,7 +102,7 @@
ClusterConfig.setStorePath(PATH_TO_CLUSTER_STORE);
ClusterConfig.setClusterPropertiesPath(PATH_TO_CLUSTER_PROPERTIES);
cleanupStores();
- PregelixHyracksIntegrationUtil.init();
+ PregelixHyracksIntegrationUtil.init("src/test/resources/topology.xml");
PregelixHyracksIntegrationUtil.createApp(HYRACKS_APP_NAME);
FileUtils.forceMkdir(new File(EXPECT_RESULT_DIR));
@@ -195,7 +195,7 @@
FileSplit[] results = new FileSplit[1];
results[0] = resultFile;
IFileSplitProvider resultFileSplitProvider = new ConstantFileSplitProvider(results);
- FileWriteOperatorDescriptor writer = new FileWriteOperatorDescriptor(spec, null, resultFileSplitProvider, null,
+ VertexWriteOperatorDescriptor writer = new VertexWriteOperatorDescriptor(spec, null, resultFileSplitProvider, null,
null);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, writer, new String[] { NC1_ID });
PartitionConstraintHelper.addPartitionCountConstraint(spec, writer, 1);
@@ -362,7 +362,7 @@
FileSplit[] results = new FileSplit[1];
results[0] = resultFile;
IFileSplitProvider resultFileSplitProvider = new ConstantFileSplitProvider(results);
- FileWriteOperatorDescriptor writer = new FileWriteOperatorDescriptor(spec, null, resultFileSplitProvider, null,
+ VertexWriteOperatorDescriptor writer = new VertexWriteOperatorDescriptor(spec, null, resultFileSplitProvider, null,
null);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, writer, new String[] { NC1_ID });
PartitionConstraintHelper.addPartitionCountConstraint(spec, writer, 1);
@@ -459,7 +459,7 @@
FileSplit[] results = new FileSplit[1];
results[0] = resultFile;
IFileSplitProvider resultFileSplitProvider = new ConstantFileSplitProvider(results);
- FileWriteOperatorDescriptor writer = new FileWriteOperatorDescriptor(spec, null, resultFileSplitProvider, null,
+ VertexWriteOperatorDescriptor writer = new VertexWriteOperatorDescriptor(spec, null, resultFileSplitProvider, null,
null);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, writer, new String[] { NC1_ID });
PartitionConstraintHelper.addPartitionCountConstraint(spec, writer, 1);
@@ -566,7 +566,7 @@
FileSplit[] results = new FileSplit[1];
results[0] = resultFile;
IFileSplitProvider resultFileSplitProvider = new ConstantFileSplitProvider(results);
- FileWriteOperatorDescriptor writer = new FileWriteOperatorDescriptor(spec, null, resultFileSplitProvider, null,
+ VertexWriteOperatorDescriptor writer = new VertexWriteOperatorDescriptor(spec, null, resultFileSplitProvider, null,
null);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, writer, new String[] { NC1_ID });
PartitionConstraintHelper.addPartitionCountConstraint(spec, writer, 1);
diff --git a/fullstack/pregelix/pregelix-core/src/test/resources/hadoop/conf/mapred-site.xml b/fullstack/pregelix/pregelix-core/src/test/resources/hadoop/conf/mapred-site.xml
index 1b9a4d6..f89dd79 100644
--- a/fullstack/pregelix/pregelix-core/src/test/resources/hadoop/conf/mapred-site.xml
+++ b/fullstack/pregelix/pregelix-core/src/test/resources/hadoop/conf/mapred-site.xml
@@ -18,8 +18,8 @@
<value>20</value>
</property>
<property>
- <name>mapred.min.split.size</name>
- <value>65536</value>
+ <name>mapred.max.split.size</name>
+ <value>4096</value>
</property>
</configuration>
diff --git a/fullstack/pregelix/pregelix-core/src/test/resources/topology.xml b/fullstack/pregelix/pregelix-core/src/test/resources/topology.xml
new file mode 100755
index 0000000..4710706
--- /dev/null
+++ b/fullstack/pregelix/pregelix-core/src/test/resources/topology.xml
@@ -0,0 +1,7 @@
+<cluster-topology>
+ <network-switch name="Global">
+ <network-switch name="local">
+ <terminal name="127.0.0.1"/>
+ </network-switch>
+ </network-switch>
+</cluster-topology>
\ No newline at end of file