remove patched hadoop code for pregelix
git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_asterix_stabilization@2854 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/ConfFactory.java b/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/ConfFactory.java
index aa75019..4fa0164 100644
--- a/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/ConfFactory.java
+++ b/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/ConfFactory.java
@@ -11,7 +11,7 @@
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
@SuppressWarnings("deprecation")
-class ConfFactory implements Serializable {
+public class ConfFactory implements Serializable {
private static final long serialVersionUID = 1L;
private byte[] confBytes;
diff --git a/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/InputSplitsFactory.java b/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/InputSplitsFactory.java
index e7227d2..9cc9ebc 100644
--- a/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/InputSplitsFactory.java
+++ b/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/InputSplitsFactory.java
@@ -28,7 +28,7 @@
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
@SuppressWarnings({ "deprecation", "rawtypes" })
-class InputSplitsFactory implements Serializable {
+public class InputSplitsFactory implements Serializable {
private static final long serialVersionUID = 1L;
private byte[] splitBytes;
diff --git a/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/FileSplitsFactory.java b/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/FileSplitsFactory.java
index 01fc2f3..14dc70c 100644
--- a/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/FileSplitsFactory.java
+++ b/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/FileSplitsFactory.java
@@ -30,7 +30,7 @@
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
@SuppressWarnings("rawtypes")
-class FileSplitsFactory implements Serializable {
+public class FileSplitsFactory implements Serializable {
private static final long serialVersionUID = 1L;
private byte[] splitBytes;
diff --git a/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/HDFSReadOperatorDescriptor.java b/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/HDFSReadOperatorDescriptor.java
index 22559e6..8a1ba6d 100644
--- a/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/HDFSReadOperatorDescriptor.java
+++ b/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/HDFSReadOperatorDescriptor.java
@@ -125,7 +125,6 @@
synchronized (executed) {
if (executed[i] == false) {
executed[i] = true;
- System.out.println("thread " + Thread.currentThread().getId() + " setting " + i);
} else {
continue;
}
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/BasicGenInputSplit.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/BasicGenInputSplit.java
index 7179737..ea33691 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/BasicGenInputSplit.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/io/BasicGenInputSplit.java
@@ -21,60 +21,64 @@
import java.io.Serializable;
import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
/**
* This InputSplit will not give any ordering or location data. It is used
* internally by BspInputFormat (which determines how many tasks to run the
* application on). Users should not use this directly.
*/
-public class BasicGenInputSplit extends InputSplit implements Writable, Serializable {
- private static final long serialVersionUID = 1L;
- /** Number of splits */
- private int numSplits = -1;
- /** Split index */
- private int splitIndex = -1;
+public class BasicGenInputSplit extends FileSplit implements Writable,
+ Serializable {
+ private static final long serialVersionUID = 1L;
+ /** Number of splits */
+ private int numSplits = -1;
+ /** Split index */
+ private int splitIndex = -1;
- public BasicGenInputSplit() {
- }
+ public BasicGenInputSplit() {
+ super(null, 0, 0, null);
+ }
- public BasicGenInputSplit(int splitIndex, int numSplits) {
- this.splitIndex = splitIndex;
- this.numSplits = numSplits;
- }
+ public BasicGenInputSplit(int splitIndex, int numSplits) {
+ super(null, 0, 0, null);
+ this.splitIndex = splitIndex;
+ this.numSplits = numSplits;
+ }
- @Override
- public long getLength() throws IOException, InterruptedException {
- return 0;
- }
+ @Override
+ public long getLength() {
+ return 0;
+ }
- @Override
- public String[] getLocations() throws IOException, InterruptedException {
- return new String[] {};
- }
+ @Override
+ public String[] getLocations() throws IOException {
+ return new String[] {};
+ }
- @Override
- public void readFields(DataInput in) throws IOException {
- splitIndex = in.readInt();
- numSplits = in.readInt();
- }
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ splitIndex = in.readInt();
+ numSplits = in.readInt();
+ }
- @Override
- public void write(DataOutput out) throws IOException {
- out.writeInt(splitIndex);
- out.writeInt(numSplits);
- }
+ @Override
+ public void write(DataOutput out) throws IOException {
+ out.writeInt(splitIndex);
+ out.writeInt(numSplits);
+ }
- public int getSplitIndex() {
- return splitIndex;
- }
+ public int getSplitIndex() {
+ return splitIndex;
+ }
- public int getNumSplits() {
- return numSplits;
- }
+ public int getNumSplits() {
+ return numSplits;
+ }
- @Override
- public String toString() {
- return "'" + getClass().getCanonicalName() + ", index=" + getSplitIndex() + ", num=" + getNumSplits();
- }
+ @Override
+ public String toString() {
+ return "'" + getClass().getCanonicalName() + ", index="
+ + getSplitIndex() + ", num=" + getNumSplits();
+ }
}
diff --git a/pregelix/pregelix-core/pom.xml b/pregelix/pregelix-core/pom.xml
index 1618810..34faa82 100644
--- a/pregelix/pregelix-core/pom.xml
+++ b/pregelix/pregelix-core/pom.xml
@@ -1,4 +1,5 @@
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<artifactId>pregelix-core</artifactId>
<packaging>jar</packaging>
@@ -57,20 +58,6 @@
</includes>
</configuration>
</execution>
- <execution>
- <id>patch</id>
- <goals>
- <goal>jar</goal>
- </goals>
- <phase>package</phase>
- <configuration>
- <classifier>patch</classifier>
- <finalName>a-hadoop</finalName>
- <includes>
- <include>**/org/apache/**</include>
- </includes>
- </configuration>
- </execution>
</executions>
</plugin>
<plugin>
@@ -158,25 +145,6 @@
</resources>
</configuration>
</execution>
- <execution>
- <id>copy-hadoop-patch</id>
- <!-- here the phase you need -->
- <phase>package</phase>
- <goals>
- <goal>copy-resources</goal>
- </goals>
- <configuration>
- <outputDirectory>target/appassembler/lib</outputDirectory>
- <resources>
- <resource>
- <directory>target</directory>
- <includes>
- <include>a-hadoop-patch.jar</include>
- </includes>
- </resource>
- </resources>
- </configuration>
- </execution>
</executions>
</plugin>
<plugin>
@@ -185,7 +153,8 @@
<version>2.7.2</version>
<configuration>
<forkMode>pertest</forkMode>
- <argLine>-enableassertions -Xmx512m -XX:MaxPermSize=300m -Dfile.encoding=UTF-8
+ <argLine>-enableassertions -Xmx512m -XX:MaxPermSize=300m
+ -Dfile.encoding=UTF-8
-Djava.util.logging.config.file=src/test/resources/logging.properties</argLine>
<includes>
<include>**/*TestSuite.java</include>
diff --git a/pregelix/pregelix-core/src/main/java/org/apache/hadoop/fs/Path.java b/pregelix/pregelix-core/src/main/java/org/apache/hadoop/fs/Path.java
deleted file mode 100644
index 5efdde8..0000000
--- a/pregelix/pregelix-core/src/main/java/org/apache/hadoop/fs/Path.java
+++ /dev/null
@@ -1,355 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.fs;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.net.URI;
-import java.net.URISyntaxException;
-
-import org.apache.hadoop.conf.Configuration;
-
-/**
- * Names a file or directory in a {@link FileSystem}. Path strings use slash as
- * the directory separator. A path string is absolute if it begins with a slash.
- */
-@SuppressWarnings("rawtypes")
-public class Path implements Comparable, Serializable {
- private static final long serialVersionUID = 1L;
- /** The directory separator, a slash. */
- public static final String SEPARATOR = "/";
- public static final char SEPARATOR_CHAR = '/';
-
- public static final String CUR_DIR = ".";
-
- static final boolean WINDOWS = System.getProperty("os.name").startsWith("Windows");
-
- private URI uri; // a hierarchical uri
-
- /** Resolve a child path against a parent path. */
- public Path(String parent, String child) {
- this(new Path(parent), new Path(child));
- }
-
- /** Resolve a child path against a parent path. */
- public Path(Path parent, String child) {
- this(parent, new Path(child));
- }
-
- /** Resolve a child path against a parent path. */
- public Path(String parent, Path child) {
- this(new Path(parent), child);
- }
-
- /** Resolve a child path against a parent path. */
- public Path(Path parent, Path child) {
- // Add a slash to parent's path so resolution is compatible with URI's
- URI parentUri = parent.uri;
- String parentPath = parentUri.getPath();
- if (!(parentPath.equals("/") || parentPath.equals("")))
- try {
- parentUri = new URI(parentUri.getScheme(), parentUri.getAuthority(), parentUri.getPath() + "/", null,
- parentUri.getFragment());
- } catch (URISyntaxException e) {
- throw new IllegalArgumentException(e);
- }
- URI resolved = parentUri.resolve(child.uri);
- initialize(resolved.getScheme(), resolved.getAuthority(), normalizePath(resolved.getPath()),
- resolved.getFragment());
- }
-
- private void checkPathArg(String path) {
- // disallow construction of a Path from an empty string
- if (path == null) {
- throw new IllegalArgumentException("Can not create a Path from a null string");
- }
- if (path.length() == 0) {
- throw new IllegalArgumentException("Can not create a Path from an empty string");
- }
- }
-
- /**
- * Construct a path from a String. Path strings are URIs, but with unescaped
- * elements and some additional normalization.
- */
- public Path(String pathString) {
- checkPathArg(pathString);
-
- // We can't use 'new URI(String)' directly, since it assumes things are
- // escaped, which we don't require of Paths.
-
- // add a slash in front of paths with Windows drive letters
- if (hasWindowsDrive(pathString, false))
- pathString = "/" + pathString;
-
- // parse uri components
- String scheme = null;
- String authority = null;
-
- int start = 0;
-
- // parse uri scheme, if any
- int colon = pathString.indexOf(':');
- int slash = pathString.indexOf('/');
- if ((colon != -1) && ((slash == -1) || (colon < slash))) { // has a
- // scheme
- scheme = pathString.substring(0, colon);
- start = colon + 1;
- }
-
- // parse uri authority, if any
- if (pathString.startsWith("//", start) && (pathString.length() - start > 2)) { // has
- // authority
- int nextSlash = pathString.indexOf('/', start + 2);
- int authEnd = nextSlash > 0 ? nextSlash : pathString.length();
- authority = pathString.substring(start + 2, authEnd);
- start = authEnd;
- }
-
- // uri path is the rest of the string -- query & fragment not supported
- String path = pathString.substring(start, pathString.length());
-
- initialize(scheme, authority, path, null);
- }
-
- /** Construct a Path from components. */
- public Path(String scheme, String authority, String path) {
- checkPathArg(path);
- initialize(scheme, authority, path, null);
- }
-
- /**
- * Construct a path from a URI
- */
- public Path(URI aUri) {
- uri = aUri;
- }
-
- private void initialize(String scheme, String authority, String path, String fragment) {
- try {
- this.uri = new URI(scheme, authority, normalizePath(path), null, fragment).normalize();
- } catch (URISyntaxException e) {
- throw new IllegalArgumentException(e);
- }
- }
-
- private String normalizePath(String path) {
- // remove double slashes & backslashes
- if (path.indexOf("//") != -1) {
- path = path.replace("//", "/");
- }
- if (path.indexOf("\\") != -1) {
- path = path.replace("\\", "/");
- }
-
- // trim trailing slash from non-root path (ignoring windows drive)
- int minLength = hasWindowsDrive(path, true) ? 4 : 1;
- if (path.length() > minLength && path.endsWith("/")) {
- path = path.substring(0, path.length() - 1);
- }
-
- return path;
- }
-
- private boolean hasWindowsDrive(String path, boolean slashed) {
- if (!WINDOWS)
- return false;
- int start = slashed ? 1 : 0;
- return path.length() >= start + 2
- && (slashed ? path.charAt(0) == '/' : true)
- && path.charAt(start + 1) == ':'
- && ((path.charAt(start) >= 'A' && path.charAt(start) <= 'Z') || (path.charAt(start) >= 'a' && path
- .charAt(start) <= 'z'));
- }
-
- /** Convert this to a URI. */
- public URI toUri() {
- return uri;
- }
-
- /** Return the FileSystem that owns this Path. */
- public FileSystem getFileSystem(Configuration conf) throws IOException {
- return FileSystem.get(this.toUri(), conf);
- }
-
- /** True if the directory of this path is absolute. */
- public boolean isAbsolute() {
- int start = hasWindowsDrive(uri.getPath(), true) ? 3 : 0;
- return uri.getPath().startsWith(SEPARATOR, start);
- }
-
- /** Returns the final component of this path. */
- public String getName() {
- String path = uri.getPath();
- int slash = path.lastIndexOf(SEPARATOR);
- return path.substring(slash + 1);
- }
-
- /** Returns the parent of a path or null if at root. */
- public Path getParent() {
- String path = uri.getPath();
- int lastSlash = path.lastIndexOf('/');
- int start = hasWindowsDrive(path, true) ? 3 : 0;
- if ((path.length() == start) || // empty path
- (lastSlash == start && path.length() == start + 1)) { // at root
- return null;
- }
- String parent;
- if (lastSlash == -1) {
- parent = CUR_DIR;
- } else {
- int end = hasWindowsDrive(path, true) ? 3 : 0;
- parent = path.substring(0, lastSlash == end ? end + 1 : lastSlash);
- }
- return new Path(uri.getScheme(), uri.getAuthority(), parent);
- }
-
- /** Adds a suffix to the final name in the path. */
- public Path suffix(String suffix) {
- return new Path(getParent(), getName() + suffix);
- }
-
- public String toString() {
- // we can't use uri.toString(), which escapes everything, because we
- // want
- // illegal characters unescaped in the string, for glob processing, etc.
- StringBuffer buffer = new StringBuffer();
- if (uri.getScheme() != null) {
- buffer.append(uri.getScheme());
- buffer.append(":");
- }
- if (uri.getAuthority() != null) {
- buffer.append("//");
- buffer.append(uri.getAuthority());
- }
- if (uri.getPath() != null) {
- String path = uri.getPath();
- if (path.indexOf('/') == 0 && hasWindowsDrive(path, true) && // has
- // windows
- // drive
- uri.getScheme() == null && // but no scheme
- uri.getAuthority() == null) // or authority
- path = path.substring(1); // remove slash before drive
- buffer.append(path);
- }
- if (uri.getFragment() != null) {
- buffer.append("#");
- buffer.append(uri.getFragment());
- }
- return buffer.toString();
- }
-
- public boolean equals(Object o) {
- if (!(o instanceof Path)) {
- return false;
- }
- Path that = (Path) o;
- return this.uri.equals(that.uri);
- }
-
- public int hashCode() {
- return uri.hashCode();
- }
-
- public int compareTo(Object o) {
- Path that = (Path) o;
- return this.uri.compareTo(that.uri);
- }
-
- /** Return the number of elements in this path. */
- public int depth() {
- String path = uri.getPath();
- int depth = 0;
- int slash = path.length() == 1 && path.charAt(0) == '/' ? -1 : 0;
- while (slash != -1) {
- depth++;
- slash = path.indexOf(SEPARATOR, slash + 1);
- }
- return depth;
- }
-
- /** Returns a qualified path object. */
- public Path makeQualified(FileSystem fs) {
- Path path = this;
- if (!isAbsolute()) {
- path = new Path(fs.getWorkingDirectory(), this);
- }
-
- URI pathUri = path.toUri();
- URI fsUri = fs.getUri();
-
- String scheme = pathUri.getScheme();
- String authority = pathUri.getAuthority();
- String fragment = pathUri.getFragment();
- if (scheme != null && (authority != null || fsUri.getAuthority() == null))
- return path;
-
- if (scheme == null) {
- scheme = fsUri.getScheme();
- }
-
- if (authority == null) {
- authority = fsUri.getAuthority();
- if (authority == null) {
- authority = "";
- }
- }
-
- URI newUri = null;
- try {
- newUri = new URI(scheme, authority, normalizePath(pathUri.getPath()), null, fragment);
- } catch (URISyntaxException e) {
- throw new IllegalArgumentException(e);
- }
- return new Path(newUri);
- }
-
- /** Returns a qualified path object. */
- public Path makeQualified(URI defaultUri, Path workingDir) {
- Path path = this;
- if (!isAbsolute()) {
- path = new Path(workingDir, this);
- }
-
- URI pathUri = path.toUri();
-
- String scheme = pathUri.getScheme();
- String authority = pathUri.getAuthority();
- String fragment = pathUri.getFragment();
-
- if (scheme != null && (authority != null || defaultUri.getAuthority() == null))
- return path;
-
- if (scheme == null) {
- scheme = defaultUri.getScheme();
- }
-
- if (authority == null) {
- authority = defaultUri.getAuthority();
- if (authority == null) {
- authority = "";
- }
- }
-
- URI newUri = null;
- try {
- newUri = new URI(scheme, authority, normalizePath(pathUri.getPath()), null, fragment);
- } catch (URISyntaxException e) {
- throw new IllegalArgumentException(e);
- }
- return new Path(newUri);
- }
-}
\ No newline at end of file
diff --git a/pregelix/pregelix-core/src/main/java/org/apache/hadoop/mapreduce/InputSplit.java b/pregelix/pregelix-core/src/main/java/org/apache/hadoop/mapreduce/InputSplit.java
deleted file mode 100644
index ac72160..0000000
--- a/pregelix/pregelix-core/src/main/java/org/apache/hadoop/mapreduce/InputSplit.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.mapreduce;
-
-import java.io.IOException;
-import java.io.Serializable;
-
-/**
- * <code>InputSplit</code> represents the data to be processed by an individual {@link Mapper}.
- * <p>
- * Typically, it presents a byte-oriented view on the input and is the responsibility of {@link RecordReader} of the job to process this and present a record-oriented view.
- *
- * @see InputFormat
- * @see RecordReader
- */
-public abstract class InputSplit implements Serializable {
- private static final long serialVersionUID = 1L;
-
- /**
- * Get the size of the split, so that the input splits can be sorted by
- * size.
- *
- * @return the number of bytes in the split
- * @throws IOException
- * @throws InterruptedException
- */
- public abstract long getLength() throws IOException, InterruptedException;
-
- /**
- * Get the list of nodes by name where the data for the split would be
- * local. The locations do not need to be serialized.
- *
- * @return a new array of the node nodes.
- * @throws IOException
- * @throws InterruptedException
- */
- public abstract String[] getLocations() throws IOException, InterruptedException;
-}
\ No newline at end of file
diff --git a/pregelix/pregelix-core/src/main/resources/scripts/pregelix b/pregelix/pregelix-core/src/main/resources/scripts/pregelix
index c3fd27b..6997078 100644
--- a/pregelix/pregelix-core/src/main/resources/scripts/pregelix
+++ b/pregelix/pregelix-core/src/main/resources/scripts/pregelix
@@ -91,7 +91,7 @@
REPO="$BASEDIR"/lib
fi
-CLASSPATH=$CLASSPATH_PREFIX:"$HADOOP_HOME"/conf:"$BASEDIR"/etc:$(echo ${REPO}/*.jar | tr ' ' ':'):$1
+CLASSPATH=$CLASSPATH_PREFIX:"$HADOOP_HOME"/conf:"$BASEDIR"/etc:$1
# For Cygwin, switch paths to Windows format before running java
if $cygwin; then
diff --git a/pregelix/pregelix-dataflow/pom.xml b/pregelix/pregelix-dataflow/pom.xml
index 9454ece..31b6adc 100644
--- a/pregelix/pregelix-dataflow/pom.xml
+++ b/pregelix/pregelix-dataflow/pom.xml
@@ -144,5 +144,12 @@
<type>jar</type>
<scope>compile</scope>
</dependency>
+ <dependency>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-hdfs</artifactId>
+ <version>0.2.3-SNAPSHOT</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
</dependencies>
</project>
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/VertexFileScanOperatorDescriptor.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/VertexFileScanOperatorDescriptor.java
index 0736e1d..cb0e339 100644
--- a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/VertexFileScanOperatorDescriptor.java
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/VertexFileScanOperatorDescriptor.java
@@ -17,9 +17,9 @@
import java.io.DataOutput;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
-import java.util.logging.Logger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
@@ -43,6 +43,7 @@
import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
+import edu.uci.ics.hyracks.hdfs2.dataflow.FileSplitsFactory;
import edu.uci.ics.pregelix.api.graph.Vertex;
import edu.uci.ics.pregelix.api.io.VertexInputFormat;
import edu.uci.ics.pregelix.api.io.VertexReader;
@@ -51,9 +52,8 @@
@SuppressWarnings("rawtypes")
public class VertexFileScanOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
- private static final Logger LOGGER = Logger.getLogger(VertexFileScanOperatorDescriptor.class.getName());
private static final long serialVersionUID = 1L;
- private final List<InputSplit> splits;
+ private final FileSplitsFactory splitsFactory;
private final IConfigurationFactory confFactory;
private final int fieldSize = 2;
private final String[] scheduledLocations;
@@ -65,7 +65,11 @@
public VertexFileScanOperatorDescriptor(JobSpecification spec, RecordDescriptor rd, List<InputSplit> splits,
String[] scheduledLocations, IConfigurationFactory confFactory) throws HyracksException {
super(spec, 0, 1);
- this.splits = splits;
+ List<FileSplit> fileSplits = new ArrayList<FileSplit>();
+ for (int i = 0; i < splits.size(); i++) {
+ fileSplits.add((FileSplit) splits.get(i));
+ }
+ this.splitsFactory = new FileSplitsFactory(fileSplits);
this.confFactory = confFactory;
this.scheduledLocations = scheduledLocations;
this.executed = new boolean[scheduledLocations.length];
@@ -76,6 +80,8 @@
public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
IRecordDescriptorProvider recordDescProvider, final int partition, final int nPartitions)
throws HyracksDataException {
+ final List<FileSplit> splits = splitsFactory.getSplits();
+
return new AbstractUnaryOutputSourceOperatorNodePushable() {
private Configuration conf = confFactory.createConfiguration();
@@ -116,7 +122,7 @@
* @throws InterruptedException
*/
@SuppressWarnings("unchecked")
- private void loadVertices(final IHyracksTaskContext ctx, int partitionId) throws IOException,
+ private void loadVertices(final IHyracksTaskContext ctx, int splitId) throws IOException,
ClassNotFoundException, InterruptedException, InstantiationException, IllegalAccessException {
ByteBuffer frame = ctx.allocateFrame();
FrameTupleAppender appender = new FrameTupleAppender(ctx.getFrameSize());
@@ -124,14 +130,8 @@
VertexInputFormat vertexInputFormat = BspUtils.createVertexInputFormat(conf);
TaskAttemptContext context = new TaskAttemptContext(conf, new TaskAttemptID());
- InputSplit split = splits.get(partitionId);
+ InputSplit split = splits.get(splitId);
- if (split instanceof FileSplit) {
- FileSplit fileSplit = (FileSplit) split;
- LOGGER.info("read file split: " + fileSplit.getPath() + " location:" + fileSplit.getLocations()[0]
- + " start:" + fileSplit.getStart() + " length:" + split.getLength() + " partition:"
- + partitionId);
- }
VertexReader vertexReader = vertexInputFormat.createVertexReader(split, context);
vertexReader.initialize(split, context);
Vertex readerVertex = (Vertex) BspUtils.createVertex(conf);
@@ -142,7 +142,7 @@
* set context
*/
Context mapperContext = new Mapper().new Context(conf, new TaskAttemptID(), null, null, null, null,
- splits.get(partitionId));
+ splits.get(splitId));
Vertex.setContext(mapperContext);
/**