cross merge fullstack_release_candidate into trunk
git-svn-id: https://hyracks.googlecode.com/svn/trunk@3208 123451ca-8445-de46-9d55-352943316053
diff --git a/fullstack/pregelix/pregelix-dataflow/pom.xml b/fullstack/pregelix/pregelix-dataflow/pom.xml
index aaa7186..d3f396e 100644
--- a/fullstack/pregelix/pregelix-dataflow/pom.xml
+++ b/fullstack/pregelix/pregelix-dataflow/pom.xml
@@ -1,14 +1,15 @@
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<artifactId>pregelix-dataflow</artifactId>
<packaging>jar</packaging>
<name>pregelix-dataflow</name>
<parent>
- <groupId>edu.uci.ics.hyracks</groupId>
- <artifactId>pregelix</artifactId>
- <version>0.2.3-SNAPSHOT</version>
- </parent>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>pregelix</artifactId>
+ <version>0.2.3-SNAPSHOT</version>
+ </parent>
<properties>
@@ -22,8 +23,9 @@
<artifactId>maven-compiler-plugin</artifactId>
<version>2.0.2</version>
<configuration>
- <source>1.6</source>
- <target>1.6</target>
+ <source>1.7</source>
+ <target>1.7</target>
+ <fork>true</fork>
</configuration>
</plugin>
<plugin>
@@ -42,6 +44,7 @@
</plugin>
<plugin>
<artifactId>maven-clean-plugin</artifactId>
+ <version>2.5</version>
<configuration>
<filesets>
<fileset>
@@ -102,13 +105,6 @@
<version>0.2.3-SNAPSHOT</version>
</dependency>
<dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-core</artifactId>
- <version>0.20.2</version>
- <type>jar</type>
- <scope>compile</scope>
- </dependency>
- <dependency>
<groupId>edu.uci.ics.hyracks</groupId>
<artifactId>hyracks-storage-am-common</artifactId>
<version>0.2.3-SNAPSHOT</version>
diff --git a/fullstack/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/ConnectorPolicyAssignmentPolicy.java b/fullstack/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/ConnectorPolicyAssignmentPolicy.java
index d29afca..ae47ed8 100644
--- a/fullstack/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/ConnectorPolicyAssignmentPolicy.java
+++ b/fullstack/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/ConnectorPolicyAssignmentPolicy.java
@@ -15,25 +15,44 @@
package edu.uci.ics.pregelix.dataflow;
+import org.apache.commons.lang3.tuple.Pair;
+
import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicy;
import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicyAssignmentPolicy;
import edu.uci.ics.hyracks.api.dataflow.connectors.PipeliningConnectorPolicy;
+import edu.uci.ics.hyracks.api.dataflow.connectors.SendSideMaterializedBlockingConnectorPolicy;
import edu.uci.ics.hyracks.api.dataflow.connectors.SendSideMaterializedPipeliningConnectorPolicy;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningMergingConnectorDescriptor;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexInsertUpdateDeleteOperatorDescriptor;
public class ConnectorPolicyAssignmentPolicy implements IConnectorPolicyAssignmentPolicy {
private static final long serialVersionUID = 1L;
- private IConnectorPolicy senderSideMaterializePolicy = new SendSideMaterializedPipeliningConnectorPolicy();
+ private IConnectorPolicy senderSideMatPipPolicy = new SendSideMaterializedPipeliningConnectorPolicy();
+ private IConnectorPolicy senderSideMatBlkPolicy = new SendSideMaterializedBlockingConnectorPolicy();
private IConnectorPolicy pipeliningPolicy = new PipeliningConnectorPolicy();
+ private JobSpecification spec;
+
+ public ConnectorPolicyAssignmentPolicy(JobSpecification spec) {
+ this.spec = spec;
+ }
@Override
public IConnectorPolicy getConnectorPolicyAssignment(IConnectorDescriptor c, int nProducers, int nConsumers,
int[] fanouts) {
if (c instanceof MToNPartitioningMergingConnectorDescriptor) {
- return senderSideMaterializePolicy;
+ return senderSideMatPipPolicy;
} else {
- return pipeliningPolicy;
+ Pair<Pair<IOperatorDescriptor, Integer>, Pair<IOperatorDescriptor, Integer>> endPoints = spec
+ .getConnectorOperatorMap().get(c.getConnectorId());
+ IOperatorDescriptor consumer = endPoints.getRight().getLeft();
+ if (consumer instanceof TreeIndexInsertUpdateDeleteOperatorDescriptor) {
+ return senderSideMatBlkPolicy;
+ } else {
+ return pipeliningPolicy;
+ }
}
}
}
diff --git a/fullstack/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/HDFSFileWriteOperatorDescriptor.java b/fullstack/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/HDFSFileWriteOperatorDescriptor.java
index c25e4c6..2402748 100644
--- a/fullstack/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/HDFSFileWriteOperatorDescriptor.java
+++ b/fullstack/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/HDFSFileWriteOperatorDescriptor.java
@@ -14,6 +14,8 @@
*/
package edu.uci.ics.pregelix.dataflow;
+import java.io.File;
+import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -23,9 +25,7 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
@@ -37,6 +37,7 @@
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameDeserializer;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
+import edu.uci.ics.hyracks.hdfs.ContextFactory;
import edu.uci.ics.pregelix.api.graph.Vertex;
import edu.uci.ics.pregelix.api.io.VertexOutputFormat;
import edu.uci.ics.pregelix.api.io.VertexWriter;
@@ -64,21 +65,24 @@
return new AbstractUnaryInputSinkOperatorNodePushable() {
private RecordDescriptor rd0;
private FrameDeserializer frameDeserializer;
- private Configuration conf = confFactory.createConfiguration();
+ private Configuration conf;
private VertexWriter vertexWriter;
private TaskAttemptContext context;
private String TEMP_DIR = "_temporary";
+ private ClassLoader ctxCL;
+ private ContextFactory ctxFactory = new ContextFactory();
@Override
public void open() throws HyracksDataException {
rd0 = inputRdFactory == null ? recordDescProvider.getInputRecordDescriptor(getActivityId(), 0)
: inputRdFactory.createRecordDescriptor();
frameDeserializer = new FrameDeserializer(ctx.getFrameSize(), rd0);
+ ctxCL = Thread.currentThread().getContextClassLoader();
Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
+ conf = confFactory.createConfiguration();
VertexOutputFormat outputFormat = BspUtils.createVertexOutputFormat(conf);
- TaskAttemptID tid = new TaskAttemptID("", 0, true, partition, 0);
- context = new TaskAttemptContext(conf, tid);
+ context = ctxFactory.createContext(conf, partition);
try {
vertexWriter = outputFormat.createVertexWriter(context);
} catch (InterruptedException e) {
@@ -107,7 +111,7 @@
@Override
public void fail() throws HyracksDataException {
-
+ Thread.currentThread().setContextClassLoader(ctxCL);
}
@Override
@@ -124,36 +128,63 @@
private void moveFilesToFinalPath() throws HyracksDataException {
try {
- JobContext job = new JobContext(conf, new JobID("0", 0));
+ JobContext job = ctxFactory.createJobContext(conf);
Path outputPath = FileOutputFormat.getOutputPath(job);
FileSystem dfs = FileSystem.get(conf);
Path filePath = new Path(outputPath, "part-" + new Integer(partition).toString());
- FileStatus[] tempPaths = dfs.listStatus(outputPath, new PathFilter() {
- @Override
- public boolean accept(Path dir) {
- return dir.getName().endsWith(TEMP_DIR);
- }
- });
- Path tempDir = tempPaths[0].getPath();
- FileStatus[] results = dfs.listStatus(tempDir, new PathFilter() {
- @Override
- public boolean accept(Path dir) {
- return dir.getName().indexOf(context.getTaskAttemptID().toString()) >= 0;
- }
- });
- Path srcDir = results[0].getPath();
- if (!dfs.exists(srcDir))
- throw new HyracksDataException("file " + srcDir.toString() + " does not exist!");
-
- FileStatus[] srcFiles = dfs.listStatus(srcDir);
- Path srcFile = srcFiles[0].getPath();
- dfs.delete(filePath, true);
- dfs.rename(srcFile, filePath);
+ FileStatus[] results = findPartitionPaths(outputPath, dfs);
+ if (results.length >= 1) {
+ /**
+ * for Hadoop-0.20.2
+ */
+ renameFile(dfs, filePath, results);
+ } else {
+ /**
+ * for Hadoop-0.23.1
+ */
+ int jobId = job.getJobID().getId();
+ outputPath = new Path(outputPath.toString() + File.separator + TEMP_DIR + File.separator
+ + jobId);
+ results = findPartitionPaths(outputPath, dfs);
+ renameFile(dfs, filePath, results);
+ }
} catch (IOException e) {
throw new HyracksDataException(e);
+ } finally {
+ Thread.currentThread().setContextClassLoader(ctxCL);
}
}
+            /**
+             * Finds this task's output entries beneath the temporary directory of an output path.
+             * <p>
+             * The children of {@code outputPath} whose name ends with {@code TEMP_DIR} are listed and the
+             * first one is taken as the temporary directory; its children whose name contains this task's
+             * attempt ID are returned.
+             *
+             * @param outputPath
+             *            directory that is searched for the temporary directory
+             * @param dfs
+             *            file system used for both listings
+             * @return the matching entries; an empty array (never {@code null}) when nothing matches
+             * @throws FileNotFoundException
+             *             if {@code outputPath} has no child whose name ends with {@code TEMP_DIR}
+             * @throws IOException
+             *             if a listing fails
+             */
+            private FileStatus[] findPartitionPaths(Path outputPath, FileSystem dfs) throws FileNotFoundException,
+                    IOException {
+                FileStatus[] tempPaths = dfs.listStatus(outputPath, new PathFilter() {
+                    @Override
+                    public boolean accept(Path dir) {
+                        return dir.getName().endsWith(TEMP_DIR);
+                    }
+                });
+                // Indexing [0] blindly used to raise an unchecked ArrayIndexOutOfBoundsException that
+                // bypassed the caller's IOException handling. A null listing is treated the same way
+                // (NOTE(review): some FileSystem versions may return null for a missing path - confirm).
+                if (tempPaths == null || tempPaths.length == 0) {
+                    throw new FileNotFoundException("no " + TEMP_DIR + " directory found under " + outputPath);
+                }
+                Path tempDir = tempPaths[0].getPath();
+                FileStatus[] results = dfs.listStatus(tempDir, new PathFilter() {
+                    @Override
+                    public boolean accept(Path dir) {
+                        return dir.getName().indexOf(context.getTaskAttemptID().toString()) >= 0;
+                    }
+                });
+                // Callers inspect results.length, so never hand back null.
+                return results == null ? new FileStatus[0] : results;
+            }
+
+            /**
+             * Moves the first file of the first candidate directory to the final partition path,
+             * deleting whatever is already stored at that path.
+             *
+             * @param dfs
+             *            file system holding both the source and the destination
+             * @param filePath
+             *            final location of this partition's output
+             * @param results
+             *            candidate task output directories; only the first entry is used
+             * @throws HyracksDataException
+             *             if there is no candidate directory, the directory does not exist or is empty,
+             *             or the file system refuses the rename
+             * @throws IOException
+             *             if a file system operation fails
+             */
+            private void renameFile(FileSystem dfs, Path filePath, FileStatus[] results) throws IOException,
+                    HyracksDataException, FileNotFoundException {
+                // An empty candidate list used to surface as an ArrayIndexOutOfBoundsException.
+                if (results == null || results.length == 0) {
+                    throw new HyracksDataException("no task output directory found for " + filePath);
+                }
+                Path srcDir = results[0].getPath();
+                if (!dfs.exists(srcDir)) {
+                    throw new HyracksDataException("file " + srcDir.toString() + " does not exist!");
+                }
+
+                FileStatus[] srcFiles = dfs.listStatus(srcDir);
+                if (srcFiles == null || srcFiles.length == 0) {
+                    throw new HyracksDataException("directory " + srcDir + " contains no output file");
+                }
+                Path srcFile = srcFiles[0].getPath();
+                dfs.delete(filePath, true);
+                // rename() reports failure through its return value; ignoring it loses the output silently.
+                if (!dfs.rename(srcFile, filePath)) {
+                    throw new HyracksDataException("failed to rename " + srcFile + " to " + filePath);
+                }
+            }
+
};
}
}
diff --git a/fullstack/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/VertexFileScanOperatorDescriptor.java b/fullstack/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/VertexFileScanOperatorDescriptor.java
index f1b98f5..0da7baf 100644
--- a/fullstack/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/VertexFileScanOperatorDescriptor.java
+++ b/fullstack/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/VertexFileScanOperatorDescriptor.java
@@ -17,17 +17,15 @@
import java.io.DataOutput;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
-import java.util.logging.Logger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
@@ -42,6 +40,8 @@
import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
+import edu.uci.ics.hyracks.hdfs.ContextFactory;
+import edu.uci.ics.hyracks.hdfs2.dataflow.FileSplitsFactory;
import edu.uci.ics.pregelix.api.graph.Vertex;
import edu.uci.ics.pregelix.api.io.VertexInputFormat;
import edu.uci.ics.pregelix.api.io.VertexReader;
@@ -50,38 +50,67 @@
@SuppressWarnings("rawtypes")
public class VertexFileScanOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
- private static final Logger LOGGER = Logger.getLogger(VertexFileScanOperatorDescriptor.class.getName());
private static final long serialVersionUID = 1L;
- private final List<InputSplit> splits;
+ private final FileSplitsFactory splitsFactory;
private final IConfigurationFactory confFactory;
private final int fieldSize = 2;
+ private final String[] scheduledLocations;
+ private final boolean[] executed;
/**
* @param spec
*/
public VertexFileScanOperatorDescriptor(JobSpecification spec, RecordDescriptor rd, List<InputSplit> splits,
- IConfigurationFactory confFactory) throws HyracksException {
+ String[] scheduledLocations, IConfigurationFactory confFactory) throws HyracksException {
super(spec, 0, 1);
- this.splits = splits;
+ List<FileSplit> fileSplits = new ArrayList<FileSplit>();
+ for (int i = 0; i < splits.size(); i++) {
+ fileSplits.add((FileSplit) splits.get(i));
+ }
+ this.splitsFactory = new FileSplitsFactory(fileSplits);
this.confFactory = confFactory;
+ this.scheduledLocations = scheduledLocations;
+ this.executed = new boolean[scheduledLocations.length];
+ Arrays.fill(executed, false);
this.recordDescriptors[0] = rd;
}
public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
IRecordDescriptorProvider recordDescProvider, final int partition, final int nPartitions)
throws HyracksDataException {
+ final List<FileSplit> splits = splitsFactory.getSplits();
+
return new AbstractUnaryOutputSourceOperatorNodePushable() {
- private Configuration conf = confFactory.createConfiguration();
+ private ClassLoader ctxCL;
+ private ContextFactory ctxFactory = new ContextFactory();
@Override
public void initialize() throws HyracksDataException {
+ ctxCL = Thread.currentThread().getContextClassLoader();
try {
Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
+ Configuration conf = confFactory.createConfiguration();
writer.open();
- loadVertices(ctx, partition);
+ for (int i = 0; i < scheduledLocations.length; i++) {
+ if (scheduledLocations[i].equals(ctx.getJobletContext().getApplicationContext().getNodeId())) {
+ /**
+ * pick one from the FileSplit queue
+ */
+ synchronized (executed) {
+ if (!executed[i]) {
+ executed[i] = true;
+ } else {
+ continue;
+ }
+ }
+ loadVertices(ctx, conf, i);
+ }
+ }
writer.close();
} catch (Exception e) {
throw new HyracksDataException(e);
+ } finally {
+ Thread.currentThread().setContextClassLoader(ctxCL);
}
}
@@ -96,24 +125,19 @@
* @throws InterruptedException
*/
@SuppressWarnings("unchecked")
- private void loadVertices(final IHyracksTaskContext ctx, int partitionId) throws IOException,
- ClassNotFoundException, InterruptedException, InstantiationException, IllegalAccessException {
+ private void loadVertices(final IHyracksTaskContext ctx, Configuration conf, int splitId)
+ throws IOException, ClassNotFoundException, InterruptedException, InstantiationException,
+ IllegalAccessException {
ByteBuffer frame = ctx.allocateFrame();
FrameTupleAppender appender = new FrameTupleAppender(ctx.getFrameSize());
appender.reset(frame, true);
VertexInputFormat vertexInputFormat = BspUtils.createVertexInputFormat(conf);
- TaskAttemptContext context = new TaskAttemptContext(conf, new TaskAttemptID());
- InputSplit split = splits.get(partition);
+ InputSplit split = splits.get(splitId);
+ TaskAttemptContext mapperContext = ctxFactory.createContext(conf, splitId);
- if (split instanceof FileSplit) {
- FileSplit fileSplit = (FileSplit) split;
- LOGGER.info("read file split: " + fileSplit.getPath() + " location:" + fileSplit.getLocations()[0]
- + " start:" + fileSplit.getStart() + " length:" + split.getLength() + " partition:"
- + partition);
- }
- VertexReader vertexReader = vertexInputFormat.createVertexReader(split, context);
- vertexReader.initialize(split, context);
+ VertexReader vertexReader = vertexInputFormat.createVertexReader(split, mapperContext);
+ vertexReader.initialize(split, mapperContext);
Vertex readerVertex = (Vertex) BspUtils.createVertex(conf);
ArrayTupleBuilder tb = new ArrayTupleBuilder(fieldSize);
DataOutput dos = tb.getDataOutput();
@@ -121,8 +145,6 @@
/**
* set context
*/
- Context mapperContext = new Mapper().new Context(conf, new TaskAttemptID(), null, null, null, null,
- splits.get(partition));
Vertex.setContext(mapperContext);
/**
@@ -166,5 +188,4 @@
}
};
}
-
}
\ No newline at end of file
diff --git a/fullstack/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/VertexWriteOperatorDescriptor.java b/fullstack/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/VertexWriteOperatorDescriptor.java
new file mode 100644
index 0000000..d7cbb3a
--- /dev/null
+++ b/fullstack/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/VertexWriteOperatorDescriptor.java
@@ -0,0 +1,105 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.dataflow;
+
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameDeserializer;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
+import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
+import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.hyracks.dataflow.std.util.StringSerializationUtils;
+import edu.uci.ics.pregelix.dataflow.std.base.IRecordDescriptorFactory;
+import edu.uci.ics.pregelix.dataflow.std.base.IRuntimeHookFactory;
+
+/**
+ * Sink operator that writes the string form of the last field of every incoming tuple, one per line,
+ * to the local file of the file split that corresponds to the running partition.
+ * <p>
+ * An optional pre-hook is configured once the output file has been opened and an optional post-hook
+ * is configured when the operator is closed, before the file itself is closed.
+ */
+public class VertexWriteOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
+    private static final long serialVersionUID = 1L;
+    private final FileSplit[] splits;
+    private final IRuntimeHookFactory preHookFactory;
+    private final IRuntimeHookFactory postHookFactory;
+    private final IRecordDescriptorFactory inputRdFactory;
+
+    /**
+     * @param spec
+     *            job specification this operator is registered with
+     * @param inputRdFactory
+     *            factory for the input record descriptor; when {@code null} the descriptor is obtained
+     *            from the record descriptor provider at runtime
+     * @param fileSplitProvider
+     *            supplies the output file splits, indexed by partition
+     * @param preHookFactory
+     *            hook configured in {@code open()}; may be {@code null}
+     * @param postHookFactory
+     *            hook configured in {@code close()}; may be {@code null}
+     */
+    public VertexWriteOperatorDescriptor(JobSpecification spec, IRecordDescriptorFactory inputRdFactory,
+            IFileSplitProvider fileSplitProvider, IRuntimeHookFactory preHookFactory,
+            IRuntimeHookFactory postHookFactory) {
+        super(spec, 1, 0);
+        this.splits = fileSplitProvider.getFileSplits();
+        this.preHookFactory = preHookFactory;
+        this.postHookFactory = postHookFactory;
+        this.inputRdFactory = inputRdFactory;
+    }
+
+    @Override
+    public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
+            final IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions)
+            throws HyracksDataException {
+        IOperatorNodePushable op = new AbstractUnaryInputSinkOperatorNodePushable() {
+            private RecordDescriptor rd0;
+            private FrameDeserializer frameDeserializer;
+            private PrintWriter outputWriter;
+
+            /**
+             * Opens the partition's output file and then runs the pre-hook, if any.
+             */
+            @Override
+            public void open() throws HyracksDataException {
+                rd0 = inputRdFactory == null ? recordDescProvider.getInputRecordDescriptor(getActivityId(), 0)
+                        : inputRdFactory.createRecordDescriptor();
+                frameDeserializer = new FrameDeserializer(ctx.getFrameSize(), rd0);
+                boolean opened = false;
+                try {
+                    // No charset is given, so the platform default encoding is used.
+                    outputWriter = new PrintWriter(new OutputStreamWriter(new FileOutputStream(
+                            splits[partition].getLocalFile().getFile())));
+                    if (preHookFactory != null) {
+                        preHookFactory.createRuntimeHook().configure(ctx);
+                    }
+                    opened = true;
+                } catch (IOException e) {
+                    throw new HyracksDataException(e);
+                } finally {
+                    // Do not leak the file handle when the pre-hook (or anything else above) fails.
+                    if (!opened) {
+                        closeWriter();
+                    }
+                }
+            }
+
+            /**
+             * Writes the last field of every tuple in the frame as one output line.
+             */
+            @Override
+            public void nextFrame(ByteBuffer frame) throws HyracksDataException {
+                frameDeserializer.reset(frame);
+                while (!frameDeserializer.done()) {
+                    Object[] tuple = frameDeserializer.deserializeRecord();
+                    // output the vertex
+                    outputWriter.print(StringSerializationUtils.toString(tuple[tuple.length - 1]));
+                    outputWriter.println();
+                }
+            }
+
+            @Override
+            public void fail() throws HyracksDataException {
+                // Nothing is released here.
+                // NOTE(review): presumably close() is still invoked after a failure - confirm.
+            }
+
+            /**
+             * Runs the post-hook, if any, and closes the output file even when the hook fails.
+             *
+             * @throws HyracksDataException
+             *             if the hook fails or the writer recorded an I/O error
+             */
+            @Override
+            public void close() throws HyracksDataException {
+                boolean writeFailed;
+                try {
+                    if (postHookFactory != null) {
+                        postHookFactory.createRuntimeHook().configure(ctx);
+                    }
+                } finally {
+                    writeFailed = closeWriter();
+                }
+                // PrintWriter never throws on I/O errors; surface them instead of truncating silently.
+                if (writeFailed) {
+                    throw new HyracksDataException("failed to write vertex output of partition " + partition);
+                }
+            }
+
+            /**
+             * Closes the writer if it is open; safe to call more than once.
+             *
+             * @return {@code true} if the writer recorded an I/O error while writing or closing
+             */
+            private boolean closeWriter() {
+                if (outputWriter == null) {
+                    return false;
+                }
+                PrintWriter writer = outputWriter;
+                outputWriter = null;
+                writer.close();
+                return writer.checkError();
+            }
+
+        };
+        return op;
+    }
+}
diff --git a/fullstack/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/RuntimeContext.java b/fullstack/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/RuntimeContext.java
index 43b6d17..567e220 100644
--- a/fullstack/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/RuntimeContext.java
+++ b/fullstack/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/RuntimeContext.java
@@ -119,23 +119,23 @@
Vertex.setNumEdges(numEdges);
giraphJobIdToSuperStep.put(giraphJobId, superStep);
giraphJobIdToMove.put(giraphJobId, false);
- LOGGER.info("start iteration " + Vertex.getCurrentSuperstep());
+ LOGGER.info("start iteration " + Vertex.getSuperstep());
}
System.gc();
}
public synchronized void endSuperStep(String giraphJobId) {
giraphJobIdToMove.put(giraphJobId, true);
- LOGGER.info("end iteration " + Vertex.getCurrentSuperstep());
+ LOGGER.info("end iteration " + Vertex.getSuperstep());
}
@Override
public FileReference createManagedWorkspaceFile(String prefix) throws HyracksDataException {
final FileReference fRef = ioManager.createWorkspaceFile(prefix);
- List<FileReference> files = iterationToFiles.get(Vertex.getCurrentSuperstep());
+ List<FileReference> files = iterationToFiles.get(Vertex.getSuperstep());
if (files == null) {
files = new ArrayList<FileReference>();
- iterationToFiles.put(Vertex.getCurrentSuperstep(), files);
+ iterationToFiles.put(Vertex.getSuperstep(), files);
}
files.add(fRef);
return fRef;