svn merge -r3096:3112 https://hyracks.googlecode.com/svn/branches/fullstack_asterix_stabilization
git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_genomix@3113 123451ca-8445-de46-9d55-352943316053
diff --git a/hivesterix/hivesterix-dist/pom.xml b/hivesterix/hivesterix-dist/pom.xml
index 2a7b798..ce61422 100644
--- a/hivesterix/hivesterix-dist/pom.xml
+++ b/hivesterix/hivesterix-dist/pom.xml
@@ -439,6 +439,25 @@
</resources>
</configuration>
</execution>
+ <execution>
+ <id>copy-jar</id>
+ <!-- here the phase you need -->
+ <phase>package</phase>
+ <goals>
+ <goal>copy-resources</goal>
+ </goals>
+ <configuration>
+ <outputDirectory>target/appassembler/lib</outputDirectory>
+ <resources>
+ <resource>
+ <directory>target</directory>
+ <includes>
+ <include>*patch.jar</include>
+ </includes>
+ </resource>
+ </resources>
+ </configuration>
+ </execution>
</executions>
</plugin>
<plugin>
diff --git a/hivesterix/hivesterix-dist/src/main/resources/scripts/hive b/hivesterix/hivesterix-dist/src/main/resources/scripts/hive
index 8a83bde..f98f340 100755
--- a/hivesterix/hivesterix-dist/src/main/resources/scripts/hive
+++ b/hivesterix/hivesterix-dist/src/main/resources/scripts/hive
@@ -81,7 +81,7 @@
exit 3;
fi
-CLASSPATH=${CLASSPATH}:${HIVE_LIB}/algebricks-hivesterix-0.0.1-SNAPSHOT.jar
+CLASSPATH=${CLASSPATH}:${HIVE_LIB}/a-hive-path.jar
for f in ${HIVE_LIB}/*.jar; do
CLASSPATH=${CLASSPATH}:$f;
diff --git a/hivesterix/hivesterix-dist/src/main/resources/scripts/pregelix b/hivesterix/hivesterix-dist/src/main/resources/scripts/pregelix
deleted file mode 100644
index 6997078..0000000
--- a/hivesterix/hivesterix-dist/src/main/resources/scripts/pregelix
+++ /dev/null
@@ -1,113 +0,0 @@
-#!/bin/sh
-# ----------------------------------------------------------------------------
-# Copyright 2001-2006 The Apache Software Foundation.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-# ----------------------------------------------------------------------------
-#
-# Copyright (c) 2001-2006 The Apache Software Foundation. All rights
-# reserved.
-
-
-# resolve links - $0 may be a softlink
-PRG="$0"
-
-while [ -h "$PRG" ]; do
- ls=`ls -ld "$PRG"`
- link=`expr "$ls" : '.*-> \(.*\)$'`
- if expr "$link" : '/.*' > /dev/null; then
- PRG="$link"
- else
- PRG=`dirname "$PRG"`/"$link"
- fi
-done
-
-PRGDIR=`dirname "$PRG"`
-BASEDIR=`cd "$PRGDIR/.." >/dev/null; pwd`
-
-
-
-# OS specific support. $var _must_ be set to either true or false.
-cygwin=false;
-darwin=false;
-case "`uname`" in
- CYGWIN*) cygwin=true ;;
- Darwin*) darwin=true
- if [ -z "$JAVA_VERSION" ] ; then
- JAVA_VERSION="CurrentJDK"
- else
- echo "Using Java version: $JAVA_VERSION"
- fi
- if [ -z "$JAVA_HOME" ] ; then
- JAVA_HOME=/System/Library/Frameworks/JavaVM.framework/Versions/${JAVA_VERSION}/Home
- fi
- ;;
-esac
-
-if [ -z "$JAVA_HOME" ] ; then
- if [ -r /etc/gentoo-release ] ; then
- JAVA_HOME=`java-config --jre-home`
- fi
-fi
-
-# For Cygwin, ensure paths are in UNIX format before anything is touched
-if $cygwin ; then
- [ -n "$JAVA_HOME" ] && JAVA_HOME=`cygpath --unix "$JAVA_HOME"`
- [ -n "$CLASSPATH" ] && CLASSPATH=`cygpath --path --unix "$CLASSPATH"`
-fi
-
-# If a specific java binary isn't specified search for the standard 'java' binary
-if [ -z "$JAVACMD" ] ; then
- if [ -n "$JAVA_HOME" ] ; then
- if [ -x "$JAVA_HOME/jre/sh/java" ] ; then
- # IBM's JDK on AIX uses strange locations for the executables
- JAVACMD="$JAVA_HOME/jre/sh/java"
- else
- JAVACMD="$JAVA_HOME/bin/java"
- fi
- else
- JAVACMD=`which java`
- fi
-fi
-
-if [ ! -x "$JAVACMD" ] ; then
- echo "Error: JAVA_HOME is not defined correctly." 1>&2
- echo " We cannot execute $JAVACMD" 1>&2
- exit 1
-fi
-
-if [ -z "$REPO" ]
-then
- REPO="$BASEDIR"/lib
-fi
-
-CLASSPATH=$CLASSPATH_PREFIX:"$HADOOP_HOME"/conf:"$BASEDIR"/etc:$1
-
-# For Cygwin, switch paths to Windows format before running java
-if $cygwin; then
- [ -n "$CLASSPATH" ] && CLASSPATH=`cygpath --path --windows "$CLASSPATH"`
- [ -n "$JAVA_HOME" ] && JAVA_HOME=`cygpath --path --windows "$JAVA_HOME"`
- [ -n "$HOME" ] && HOME=`cygpath --path --windows "$HOME"`
- [ -n "$BASEDIR" ] && BASEDIR=`cygpath --path --windows "$BASEDIR"`
- [ -n "$REPO" ] && REPO=`cygpath --path --windows "$REPO"`
-fi
-
-exec "$JAVACMD" $JAVA_OPTS \
- -classpath "$CLASSPATH" \
- -Dapp.name="pregelix" \
- -Dapp.pid="$$" \
- -Dapp.repo="$REPO" \
- -Dapp.home="$BASEDIR" \
- -Dbasedir="$BASEDIR" \
- org.apache.hadoop.util.RunJar \
- "$@"
diff --git a/hivesterix/hivesterix-dist/src/main/resources/scripts/pregelix.bat b/hivesterix/hivesterix-dist/src/main/resources/scripts/pregelix.bat
deleted file mode 100644
index 536e3c8..0000000
--- a/hivesterix/hivesterix-dist/src/main/resources/scripts/pregelix.bat
+++ /dev/null
@@ -1,110 +0,0 @@
-@REM ----------------------------------------------------------------------------
-@REM Copyright 2001-2006 The Apache Software Foundation.
-@REM
-@REM Licensed under the Apache License, Version 2.0 (the "License");
-@REM you may not use this file except in compliance with the License.
-@REM You may obtain a copy of the License at
-@REM
-@REM http://www.apache.org/licenses/LICENSE-2.0
-@REM
-@REM Unless required by applicable law or agreed to in writing, software
-@REM distributed under the License is distributed on an "AS IS" BASIS,
-@REM WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-@REM See the License for the specific language governing permissions and
-@REM limitations under the License.
-@REM ----------------------------------------------------------------------------
-@REM
-@REM Copyright (c) 2001-2006 The Apache Software Foundation. All rights
-@REM reserved.
-
-@echo off
-
-set ERROR_CODE=0
-
-:init
-@REM Decide how to startup depending on the version of windows
-
-@REM -- Win98ME
-if NOT "%OS%"=="Windows_NT" goto Win9xArg
-
-@REM set local scope for the variables with windows NT shell
-if "%OS%"=="Windows_NT" @setlocal
-
-@REM -- 4NT shell
-if "%eval[2+2]" == "4" goto 4NTArgs
-
-@REM -- Regular WinNT shell
-set CMD_LINE_ARGS=%*
-goto WinNTGetScriptDir
-
-@REM The 4NT Shell from jp software
-:4NTArgs
-set CMD_LINE_ARGS=%$
-goto WinNTGetScriptDir
-
-:Win9xArg
-@REM Slurp the command line arguments. This loop allows for an unlimited number
-@REM of arguments (up to the command line limit, anyway).
-set CMD_LINE_ARGS=
-:Win9xApp
-if %1a==a goto Win9xGetScriptDir
-set CMD_LINE_ARGS=%CMD_LINE_ARGS% %1
-shift
-goto Win9xApp
-
-:Win9xGetScriptDir
-set SAVEDIR=%CD%
-%0\
-cd %0\..\..
-set BASEDIR=%CD%
-cd %SAVEDIR%
-set SAVE_DIR=
-goto repoSetup
-
-:WinNTGetScriptDir
-set BASEDIR=%~dp0\..
-
-:repoSetup
-
-
-if "%JAVACMD%"=="" set JAVACMD=java
-
-if "%REPO%"=="" set REPO=%BASEDIR%\lib
-
-cp $BASEDIR"\..\a-hadoop-patch.jar "$REPO"\
-
-set CLASSPATH="%BASEDIR%"\etc;"%REPO%"\a-hadoop-patch.jar;"%REPO%"\pregelix-api-0.0.1-SNAPSHOT.jar;"%REPO%"\hyracks-dataflow-common-0.2.2-SNAPSHOT.jar;"%REPO%"\hyracks-api-0.2.2-SNAPSHOT.jar;"%REPO%"\json-20090211.jar;"%REPO%"\httpclient-4.1-alpha2.jar;"%REPO%"\httpcore-4.1-beta1.jar;"%REPO%"\commons-logging-1.1.1.jar;"%REPO%"\commons-codec-1.3.jar;"%REPO%"\args4j-2.0.12.jar;"%REPO%"\hyracks-ipc-0.2.2-SNAPSHOT.jar;"%REPO%"\commons-lang3-3.1.jar;"%REPO%"\hyracks-data-std-0.2.2-SNAPSHOT.jar;"%REPO%"\hadoop-core-0.20.2.jar;"%REPO%"\commons-cli-1.2.jar;"%REPO%"\xmlenc-0.52.jar;"%REPO%"\commons-httpclient-3.0.1.jar;"%REPO%"\commons-net-1.4.1.jar;"%REPO%"\oro-2.0.8.jar;"%REPO%"\jetty-6.1.14.jar;"%REPO%"\jetty-util-6.1.14.jar;"%REPO%"\servlet-api-2.5-6.1.14.jar;"%REPO%"\jasper-runtime-5.5.12.jar;"%REPO%"\jasper-compiler-5.5.12.jar;"%REPO%"\jsp-api-2.1-6.1.14.jar;"%REPO%"\jsp-2.1-6.1.14.jar;"%REPO%"\core-3.1.1.jar;"%REPO%"\ant-1.6.5.jar;"%REPO%"\commons-el-1.0.jar;"%REPO%"\jets3t-0.7.1.jar;"%REPO%"\kfs-0.3.jar;"%REPO%"\hsqldb-1.8.0.10.jar;"%REPO%"\pregelix-dataflow-std-0.0.1-SNAPSHOT.jar;"%REPO%"\pregelix-dataflow-std-base-0.0.1-SNAPSHOT.jar;"%REPO%"\hyracks-dataflow-std-0.2.2-SNAPSHOT.jar;"%REPO%"\hyracks-dataflow-hadoop-0.2.2-SNAPSHOT.jar;"%REPO%"\dcache-client-0.0.1.jar;"%REPO%"\jetty-client-8.0.0.M0.jar;"%REPO%"\jetty-http-8.0.0.RC0.jar;"%REPO%"\jetty-io-8.0.0.RC0.jar;"%REPO%"\jetty-util-8.0.0.RC0.jar;"%REPO%"\hyracks-storage-am-common-0.2.2-SNAPSHOT.jar;"%REPO%"\hyracks-storage-common-0.2.2-SNAPSHOT.jar;"%REPO%"\hyracks-storage-am-btree-0.2.2-SNAPSHOT.jar;"%REPO%"\btreehelper-0.2.2-SNAPSHOT.jar;"%REPO%"\hyracks-control-cc-0.2.2-SNAPSHOT.jar;"%REPO%"\hyracks-control-common-0.2.2-SNAPSHOT.jar;"%REPO%"\commons-io-1.3.1.jar;"%REPO%"\jetty-server-8.0.0.RC0.jar;"%REPO%"\servlet-api-3.0.20100224.jar;"%REPO%"\jetty-continuation-8.0.0.RC0.jar;"%REPO%"\jetty-webapp-8.0.0.RC0.jar;"%REPO%"\jetty-xml-8.0.0.RC0.jar;"%REPO%"\jetty-servlet-8.0.0.RC0.jar;"%REPO%"\jetty-security-8.0.0.RC0.jar;"%REPO%"\wicket-core-1.5.2.jar;"%REPO%"\wicket-util-1.5.2.jar;"%REPO%"\slf4j-api-1.6.1.jar;"%REPO%"\wicket-request-1.5.2.jar;"%REPO%"\slf4j-jcl-1.6.3.jar;"%REPO%"\hyracks-control-nc-0.2.2-SNAPSHOT.jar;"%REPO%"\hyracks-net-0.2.2-SNAPSHOT.jar;"%REPO%"\hyracks-hadoop-compat-0.2.2-SNAPSHOT.jar;"%REPO%"\pregelix-dataflow-0.0.1-SNAPSHOT.jar;"%REPO%"\pregelix-runtime-0.0.1-SNAPSHOT.jar;"%REPO%"\hadoop-test-0.20.2.jar;"%REPO%"\ftplet-api-1.0.0.jar;"%REPO%"\mina-core-2.0.0-M5.jar;"%REPO%"\ftpserver-core-1.0.0.jar;"%REPO%"\ftpserver-deprecated-1.0.0-M2.jar;"%REPO%"\javax.servlet-api-3.0.1.jar;"%REPO%"\pregelix-core-0.0.1-SNAPSHOT.jar
-goto endInit
-
-@REM Reaching here means variables are defined and arguments have been captured
-:endInit
-
-%JAVACMD% %JAVA_OPTS% -classpath %CLASSPATH_PREFIX%;%CLASSPATH% -Dapp.name="pregelix" -Dapp.repo="%REPO%" -Dapp.home="%BASEDIR%" -Dbasedir="%BASEDIR%" org.apache.hadoop.util.RunJar %CMD_LINE_ARGS%
-if ERRORLEVEL 1 goto error
-goto end
-
-:error
-if "%OS%"=="Windows_NT" @endlocal
-set ERROR_CODE=%ERRORLEVEL%
-
-:end
-@REM set local scope for the variables with windows NT shell
-if "%OS%"=="Windows_NT" goto endNT
-
-@REM For old DOS remove the set variables from ENV - we assume they were not set
-@REM before we started - at least we don't leave any baggage around
-set CMD_LINE_ARGS=
-goto postExec
-
-:endNT
-@REM If error code is set to 1 then the endlocal was done already in :error.
-if %ERROR_CODE% EQU 0 @endlocal
-
-
-:postExec
-
-if "%FORCE_EXIT_ON_ERROR%" == "on" (
- if %ERROR_CODE% NEQ 0 exit %ERROR_CODE%
-)
-
-exit /B %ERROR_CODE%
\ No newline at end of file
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashSpillableTableFactory.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashSpillableTableFactory.java
index e82006c..f86d9fb 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashSpillableTableFactory.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashSpillableTableFactory.java
@@ -256,8 +256,6 @@
outputAppender.reset(outputFrame, true);
- //writer.open();
-
if (tPointers == null) {
// Not sorted
for (int i = 0; i < tableSize; ++i) {
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/HDFSWriteOperatorDescriptor.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/HDFSWriteOperatorDescriptor.java
index 71ec2d1..f3ec2fd 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/HDFSWriteOperatorDescriptor.java
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/HDFSWriteOperatorDescriptor.java
@@ -97,15 +97,15 @@
String fileName = outputDirPath + File.separator + "part-"
+ partition;
- tupleWriter = tupleWriterFactory.getTupleWriter(ctx);
- try {
- FileSystem dfs = FileSystem.get(conf);
- dos = dfs.create(new Path(fileName), true);
- tupleWriter.open(dos);
- } catch (Exception e) {
- throw new HyracksDataException(e);
- }
- }
+ tupleWriter = tupleWriterFactory.getTupleWriter(ctx);
+ try {
+ FileSystem dfs = FileSystem.get(conf);
+ dos = dfs.create(new Path(fileName), true);
+ tupleWriter.open(dos);
+ } catch (Exception e) {
+ throw new HyracksDataException(e);
+ }
+ }
@Override
public void nextFrame(ByteBuffer buffer)
@@ -123,17 +123,17 @@
}
- @Override
- public void close() throws HyracksDataException {
- try {
- tupleWriter.close(dos);
- dos.close();
- } catch (Exception e) {
- throw new HyracksDataException(e);
- } finally {
- Thread.currentThread().setContextClassLoader(ctxCL);
- }
- }
+ @Override
+ public void close() throws HyracksDataException {
+ try {
+ tupleWriter.close(dos);
+ dos.close();
+ } catch (Exception e) {
+ throw new HyracksDataException(e);
+ } finally {
+ Thread.currentThread().setContextClassLoader(ctxCL);
+ }
+ }
};
}
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/HDFSWriteOperatorDescriptor.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/HDFSWriteOperatorDescriptor.java
index d292673..c1c227c 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/HDFSWriteOperatorDescriptor.java
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/HDFSWriteOperatorDescriptor.java
@@ -39,8 +39,8 @@
import edu.uci.ics.hyracks.hdfs.api.ITupleWriterFactory;
/**
- * The HDFS file write operator using the Hadoop new API.
- * To use this operator, a user need to provide an ITupleWriterFactory.
+ * The HDFS file write operator using the Hadoop new API. To use this operator,
+ * a user need to provide an ITupleWriterFactory.
*/
public class HDFSWriteOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
@@ -116,7 +116,7 @@
@Override
public void close() throws HyracksDataException {
try {
- tupleWriter.close(dos);
+ tupleWriter.close(dos);
dos.close();
} catch (Exception e) {
throw new HyracksDataException(e);
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
index b1730d3..9de4c04 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
@@ -262,12 +262,10 @@
*/
spec.connect(new MToNPartitioningConnectorDescriptor(spec, partionFactory), scanner, 3, insertOp, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), insertOp, 0, emptySink3, 0);
- spec.connect(new MToNPartitioningConnectorDescriptor(spec, unifyingPartitionComputerFactory), scanner, 4,
- deleteOp, 0);
+ spec.connect(new MToNPartitioningConnectorDescriptor(spec, partionFactory), scanner, 4, deleteOp, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), deleteOp, 0, emptySink4, 0);
- spec.connect(new MToNPartitioningConnectorDescriptor(spec, partionFactory), scanner, 5,
- btreeBulkLoad, 0);
+ spec.connect(new MToNPartitioningConnectorDescriptor(spec, partionFactory), scanner, 5, btreeBulkLoad, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), localSort, 0, localGby, 0);
spec.connect(new MToNPartitioningMergingConnectorDescriptor(spec, partionFactory, keyFields, sortCmpFactories),
localGby, 0, globalGby, 0);
@@ -487,11 +485,9 @@
/**
* connect the insert/delete operator
*/
- spec.connect(new MToNPartitioningConnectorDescriptor(spec, unifyingPartitionComputerFactory), join, 3,
- insertOp, 0);
+ spec.connect(new MToNPartitioningConnectorDescriptor(spec, partionFactory), join, 3, insertOp, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), insertOp, 0, emptySink3, 0);
- spec.connect(new MToNPartitioningConnectorDescriptor(spec, unifyingPartitionComputerFactory), join, 4,
- deleteOp, 0);
+ spec.connect(new MToNPartitioningConnectorDescriptor(spec, partionFactory), join, 4, deleteOp, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), deleteOp, 0, emptySink4, 0);
spec.connect(new MToNPartitioningConnectorDescriptor(spec, partionFactory), join, 5, btreeBulkLoad, 0);
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java
index 87f2156..91c15b2 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java
@@ -241,11 +241,9 @@
/**
* connect the insert/delete operator
*/
- spec.connect(new MToNPartitioningConnectorDescriptor(spec, unifyingPartitionComputerFactory), scanner, 3, insertOp,
- 0);
+ spec.connect(new MToNPartitioningConnectorDescriptor(spec, partionFactory), scanner, 3, insertOp, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), insertOp, 0, emptySink3, 0);
- spec.connect(new MToNPartitioningConnectorDescriptor(spec, unifyingPartitionComputerFactory), scanner, 4, deleteOp,
- 0);
+ spec.connect(new MToNPartitioningConnectorDescriptor(spec, partionFactory), scanner, 4, deleteOp, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), deleteOp, 0, emptySink4, 0);
/**
@@ -449,9 +447,9 @@
/**
* connect the insert/delete operator
*/
- spec.connect(new MToNPartitioningConnectorDescriptor(spec, unifyingPartitionComputerFactory), join, 3, insertOp, 0);
+ spec.connect(new MToNPartitioningConnectorDescriptor(spec, partionFactory), join, 3, insertOp, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), insertOp, 0, emptySink3, 0);
- spec.connect(new MToNPartitioningConnectorDescriptor(spec, unifyingPartitionComputerFactory), join, 4, deleteOp, 0);
+ spec.connect(new MToNPartitioningConnectorDescriptor(spec, partionFactory), join, 4, deleteOp, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), deleteOp, 0, emptySink4, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), localSort, 0, localGby, 0);
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java
index 6ea258e..ee1fd0f 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java
@@ -219,26 +219,24 @@
EmptySinkOperatorDescriptor emptySink4 = new EmptySinkOperatorDescriptor(spec);
ClusterConfig.setLocationConstraint(spec, emptySink4);
- ITuplePartitionComputerFactory hashPartitionComputerFactory = new MergePartitionComputerFactory();
+ ITuplePartitionComputerFactory unifyingPartitionComputerFactory = new MergePartitionComputerFactory();
ITuplePartitionComputerFactory partionFactory = new VertexIdPartitionComputerFactory(
rdUnnestedMessage.getFields()[0]);
/** connect all operators **/
spec.connect(new OneToOneConnectorDescriptor(spec), emptyTupleSource, 0, preSuperStep, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), preSuperStep, 0, scanner, 0);
spec.connect(new MToNPartitioningConnectorDescriptor(spec, partionFactory), scanner, 0, globalSort, 0);
- spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), scanner, 1,
+ spec.connect(new MToNPartitioningConnectorDescriptor(spec, unifyingPartitionComputerFactory), scanner, 1,
terminateWriter, 0);
- spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), scanner, 2,
+ spec.connect(new MToNPartitioningConnectorDescriptor(spec, unifyingPartitionComputerFactory), scanner, 2,
finalAggregator, 0);
/**
* connect the insert/delete operator
*/
- spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), scanner, 3, insertOp,
- 0);
+ spec.connect(new MToNPartitioningConnectorDescriptor(spec, partionFactory), scanner, 3, insertOp, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), insertOp, 0, emptySink3, 0);
- spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), scanner, 4, deleteOp,
- 0);
+ spec.connect(new MToNPartitioningConnectorDescriptor(spec, partionFactory), scanner, 4, deleteOp, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), deleteOp, 0, emptySink4, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), globalSort, 0, globalGby, 0);
@@ -383,7 +381,6 @@
FinalAggregateOperatorDescriptor finalAggregator = new FinalAggregateOperatorDescriptor(spec,
configurationFactory, aggRdFactory, jobId);
PartitionConstraintHelper.addPartitionCountConstraint(spec, finalAggregator, 1);
-
int[] fieldPermutation = new int[] { 0, 1 };
TreeIndexInsertUpdateDeleteOperatorDescriptor insertOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
@@ -400,7 +397,7 @@
comparatorFactories, fieldPermutation, IndexOp.DELETE, new BTreeDataflowHelperFactory(), null,
NoOpOperationCallbackProvider.INSTANCE);
ClusterConfig.setLocationConstraint(spec, deleteOp);
-
+
/** construct empty sink operator */
EmptySinkOperatorDescriptor emptySink3 = new EmptySinkOperatorDescriptor(spec);
ClusterConfig.setLocationConstraint(spec, emptySink3);
@@ -409,7 +406,7 @@
EmptySinkOperatorDescriptor emptySink4 = new EmptySinkOperatorDescriptor(spec);
ClusterConfig.setLocationConstraint(spec, emptySink4);
- ITuplePartitionComputerFactory hashPartitionComputerFactory = new MergePartitionComputerFactory();
+ ITuplePartitionComputerFactory unifyingPartitionComputerFactory = new MergePartitionComputerFactory();
ITuplePartitionComputerFactory partionFactory = new VertexIdPartitionComputerFactory(
rdUnnestedMessage.getFields()[0]);
@@ -418,20 +415,18 @@
spec.connect(new OneToOneConnectorDescriptor(spec), preSuperStep, 0, materializeRead, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), materializeRead, 0, join, 0);
spec.connect(new MToNPartitioningConnectorDescriptor(spec, partionFactory), join, 0, globalSort, 0);
- spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), join, 1,
+ spec.connect(new MToNPartitioningConnectorDescriptor(spec, unifyingPartitionComputerFactory), join, 1,
terminateWriter, 0);
- spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), join, 2,
+ spec.connect(new MToNPartitioningConnectorDescriptor(spec, unifyingPartitionComputerFactory), join, 2,
finalAggregator, 0);
/**
* connect the insert/delete operator
*/
- spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), join, 3, insertOp,
- 0);
+ spec.connect(new MToNPartitioningConnectorDescriptor(spec, partionFactory), join, 3, insertOp, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), insertOp, 0, emptySink3, 0);
- spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), join, 4, deleteOp,
- 0);
+ spec.connect(new MToNPartitioningConnectorDescriptor(spec, partionFactory), join, 4, deleteOp, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), deleteOp, 0, emptySink4, 0);
-
+
spec.connect(new OneToOneConnectorDescriptor(spec), globalSort, 0, globalGby, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), globalGby, 0, materialize, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), materialize, 0, postSuperStep, 0);
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java
index c6dabb0..628e9ce 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java
@@ -195,7 +195,7 @@
TerminationStateWriterOperatorDescriptor terminateWriter = new TerminationStateWriterOperatorDescriptor(spec,
configurationFactory, jobId);
PartitionConstraintHelper.addPartitionCountConstraint(spec, terminateWriter, 1);
- ITuplePartitionComputerFactory hashPartitionComputerFactory = new MergePartitionComputerFactory();
+ ITuplePartitionComputerFactory unifyingPartitionComputerFactory = new MergePartitionComputerFactory();
/**
* final aggregate write operator
@@ -239,18 +239,16 @@
spec.connect(new OneToOneConnectorDescriptor(spec), emptyTupleSource, 0, preSuperStep, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), preSuperStep, 0, scanner, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), scanner, 0, localSort, 0);
- spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), scanner, 1,
+ spec.connect(new MToNPartitioningConnectorDescriptor(spec, unifyingPartitionComputerFactory), scanner, 1,
terminateWriter, 0);
- spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), scanner, 2,
+ spec.connect(new MToNPartitioningConnectorDescriptor(spec, unifyingPartitionComputerFactory), scanner, 2,
finalAggregator, 0);
/**
* connect the insert/delete operator
*/
- spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), scanner, 3, insertOp,
- 0);
+ spec.connect(new MToNPartitioningConnectorDescriptor(spec, partionFactory), scanner, 3, insertOp, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), insertOp, 0, emptySink3, 0);
- spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), scanner, 4, deleteOp,
- 0);
+ spec.connect(new MToNPartitioningConnectorDescriptor(spec, partionFactory), scanner, 4, deleteOp, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), deleteOp, 0, emptySink4, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), localSort, 0, localGby, 0);
@@ -441,7 +439,7 @@
EmptySinkOperatorDescriptor emptySink4 = new EmptySinkOperatorDescriptor(spec);
ClusterConfig.setLocationConstraint(spec, emptySink4);
- ITuplePartitionComputerFactory hashPartitionComputerFactory = new MergePartitionComputerFactory();
+ ITuplePartitionComputerFactory unifyingPartitionComputerFactory = new MergePartitionComputerFactory();
ITuplePartitionComputerFactory partionFactory = new VertexIdPartitionComputerFactory(
rdUnnestedMessage.getFields()[0]);
@@ -450,16 +448,16 @@
spec.connect(new OneToOneConnectorDescriptor(spec), preSuperStep, 0, materializeRead, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), materializeRead, 0, join, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), join, 0, localSort, 0);
- spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), join, 1,
+ spec.connect(new MToNPartitioningConnectorDescriptor(spec, unifyingPartitionComputerFactory), join, 1,
terminateWriter, 0);
- spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), join, 2,
+ spec.connect(new MToNPartitioningConnectorDescriptor(spec, unifyingPartitionComputerFactory), join, 2,
finalAggregator, 0);
/**
* connect the insert/delete operator
*/
- spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), join, 3, insertOp, 0);
+ spec.connect(new MToNPartitioningConnectorDescriptor(spec, partionFactory), join, 3, insertOp, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), insertOp, 0, emptySink3, 0);
- spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), join, 4, deleteOp, 0);
+ spec.connect(new MToNPartitioningConnectorDescriptor(spec, partionFactory), join, 4, deleteOp, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), deleteOp, 0, emptySink4, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), localSort, 0, localGby, 0);
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/GraphMutationVertex.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/GraphMutationVertex.java
index 5773602..e54373f 100644
--- a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/GraphMutationVertex.java
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/GraphMutationVertex.java
@@ -47,7 +47,7 @@
if (newVertex == null) {
newVertex = new GraphMutationVertex();
}
- if (getVertexId().get() % 2 == 0) {
+ if (getVertexId().get() % 2 == 0 || getVertexId().get() % 3 == 0) {
deleteVertex(getVertexId());
} else {
vid.set(100 * getVertexId().get());
@@ -59,7 +59,7 @@
} else {
if (getVertexId().get() % 190 == 0) {
deleteVertex(getVertexId());
- }
+ }
voteToHalt();
}
}
diff --git a/pregelix/pregelix-example/src/test/resources/expected/GraphMutation.result b/pregelix/pregelix-example/src/test/resources/expected/GraphMutation.result
index 3f2fd60..a30166c 100644
--- a/pregelix/pregelix-example/src/test/resources/expected/GraphMutation.result
+++ b/pregelix/pregelix-example/src/test/resources/expected/GraphMutation.result
@@ -1,19 +1,13 @@
1 0.0
-3 0.0
5 0.0
7 0.0
-9 0.0
11 0.0
13 0.0
-15 0.0
17 0.0
19 0.0
100 0.0
-300 0.0
500 0.0
700 0.0
-900 0.0
1100 0.0
1300 0.0
-1500 0.0
1700 0.0