Remove patched Hadoop code for Pregelix
git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_asterix_stabilization@2854 123451ca-8445-de46-9d55-352943316053
diff --git a/pregelix/pregelix-dataflow/pom.xml b/pregelix/pregelix-dataflow/pom.xml
index 9454ece..31b6adc 100644
--- a/pregelix/pregelix-dataflow/pom.xml
+++ b/pregelix/pregelix-dataflow/pom.xml
@@ -144,5 +144,12 @@
<type>jar</type>
<scope>compile</scope>
</dependency>
+ <dependency>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-hdfs</artifactId>
+ <version>0.2.3-SNAPSHOT</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
</dependencies>
</project>
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/VertexFileScanOperatorDescriptor.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/VertexFileScanOperatorDescriptor.java
index 0736e1d..cb0e339 100644
--- a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/VertexFileScanOperatorDescriptor.java
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/VertexFileScanOperatorDescriptor.java
@@ -17,9 +17,9 @@
import java.io.DataOutput;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
-import java.util.logging.Logger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
@@ -43,6 +43,7 @@
import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
+import edu.uci.ics.hyracks.hdfs2.dataflow.FileSplitsFactory;
import edu.uci.ics.pregelix.api.graph.Vertex;
import edu.uci.ics.pregelix.api.io.VertexInputFormat;
import edu.uci.ics.pregelix.api.io.VertexReader;
@@ -51,9 +52,8 @@
@SuppressWarnings("rawtypes")
public class VertexFileScanOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
- private static final Logger LOGGER = Logger.getLogger(VertexFileScanOperatorDescriptor.class.getName());
private static final long serialVersionUID = 1L;
- private final List<InputSplit> splits;
+ private final FileSplitsFactory splitsFactory;
private final IConfigurationFactory confFactory;
private final int fieldSize = 2;
private final String[] scheduledLocations;
@@ -65,7 +65,11 @@
public VertexFileScanOperatorDescriptor(JobSpecification spec, RecordDescriptor rd, List<InputSplit> splits,
String[] scheduledLocations, IConfigurationFactory confFactory) throws HyracksException {
super(spec, 0, 1);
- this.splits = splits;
+ List<FileSplit> fileSplits = new ArrayList<FileSplit>();
+ for (int i = 0; i < splits.size(); i++) {
+ fileSplits.add((FileSplit) splits.get(i));
+ }
+ this.splitsFactory = new FileSplitsFactory(fileSplits);
this.confFactory = confFactory;
this.scheduledLocations = scheduledLocations;
this.executed = new boolean[scheduledLocations.length];
@@ -76,6 +80,8 @@
public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
IRecordDescriptorProvider recordDescProvider, final int partition, final int nPartitions)
throws HyracksDataException {
+ final List<FileSplit> splits = splitsFactory.getSplits();
+
return new AbstractUnaryOutputSourceOperatorNodePushable() {
private Configuration conf = confFactory.createConfiguration();
@@ -116,7 +122,7 @@
* @throws InterruptedException
*/
@SuppressWarnings("unchecked")
- private void loadVertices(final IHyracksTaskContext ctx, int partitionId) throws IOException,
+ private void loadVertices(final IHyracksTaskContext ctx, int splitId) throws IOException,
ClassNotFoundException, InterruptedException, InstantiationException, IllegalAccessException {
ByteBuffer frame = ctx.allocateFrame();
FrameTupleAppender appender = new FrameTupleAppender(ctx.getFrameSize());
@@ -124,14 +130,8 @@
VertexInputFormat vertexInputFormat = BspUtils.createVertexInputFormat(conf);
TaskAttemptContext context = new TaskAttemptContext(conf, new TaskAttemptID());
- InputSplit split = splits.get(partitionId);
+ InputSplit split = splits.get(splitId);
- if (split instanceof FileSplit) {
- FileSplit fileSplit = (FileSplit) split;
- LOGGER.info("read file split: " + fileSplit.getPath() + " location:" + fileSplit.getLocations()[0]
- + " start:" + fileSplit.getStart() + " length:" + split.getLength() + " partition:"
- + partitionId);
- }
VertexReader vertexReader = vertexInputFormat.createVertexReader(split, context);
vertexReader.initialize(split, context);
Vertex readerVertex = (Vertex) BspUtils.createVertex(conf);
@@ -142,7 +142,7 @@
* set context
*/
Context mapperContext = new Mapper().new Context(conf, new TaskAttemptID(), null, null, null, null,
- splits.get(partitionId));
+ splits.get(splitId));
Vertex.setContext(mapperContext);
/**