Merge fullstack_asterix_stabilization into fullstack_hyracks_result_distribution branch.

git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_hyracks_result_distribution@2862 123451ca-8445-de46-9d55-352943316053
diff --git a/pregelix/pregelix-dataflow/pom.xml b/pregelix/pregelix-dataflow/pom.xml
index aaa7186..31b6adc 100644
--- a/pregelix/pregelix-dataflow/pom.xml
+++ b/pregelix/pregelix-dataflow/pom.xml
@@ -42,6 +42,7 @@
 			</plugin>
 			<plugin>
 				<artifactId>maven-clean-plugin</artifactId>
+				<version>2.5</version>
 				<configuration>
 					<filesets>
 						<fileset>
@@ -143,5 +144,12 @@
 			<type>jar</type>
 			<scope>compile</scope>
 		</dependency>
+		<dependency>
+			<groupId>edu.uci.ics.hyracks</groupId>
+			<artifactId>hyracks-hdfs</artifactId>
+			<version>0.2.3-SNAPSHOT</version>
+			<type>jar</type>
+			<scope>compile</scope>
+		</dependency>
 	</dependencies>
 </project>
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/VertexFileScanOperatorDescriptor.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/VertexFileScanOperatorDescriptor.java
index f1b98f5..cb0e339 100644
--- a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/VertexFileScanOperatorDescriptor.java
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/VertexFileScanOperatorDescriptor.java
@@ -17,8 +17,9 @@
 import java.io.DataOutput;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
-import java.util.logging.Logger;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Writable;
@@ -42,6 +43,7 @@
 import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
+import edu.uci.ics.hyracks.hdfs2.dataflow.FileSplitsFactory;
 import edu.uci.ics.pregelix.api.graph.Vertex;
 import edu.uci.ics.pregelix.api.io.VertexInputFormat;
 import edu.uci.ics.pregelix.api.io.VertexReader;
@@ -50,26 +52,36 @@
 
 @SuppressWarnings("rawtypes")
 public class VertexFileScanOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
-    private static final Logger LOGGER = Logger.getLogger(VertexFileScanOperatorDescriptor.class.getName());
     private static final long serialVersionUID = 1L;
-    private final List<InputSplit> splits;
+    private final FileSplitsFactory splitsFactory;
     private final IConfigurationFactory confFactory;
     private final int fieldSize = 2;
+    private final String[] scheduledLocations;
+    private final boolean[] executed;
 
     /**
      * @param spec
      */
     public VertexFileScanOperatorDescriptor(JobSpecification spec, RecordDescriptor rd, List<InputSplit> splits,
-            IConfigurationFactory confFactory) throws HyracksException {
+            String[] scheduledLocations, IConfigurationFactory confFactory) throws HyracksException {
         super(spec, 0, 1);
-        this.splits = splits;
+        List<FileSplit> fileSplits = new ArrayList<FileSplit>();
+        for (int i = 0; i < splits.size(); i++) {
+            fileSplits.add((FileSplit) splits.get(i));
+        }
+        this.splitsFactory = new FileSplitsFactory(fileSplits);
         this.confFactory = confFactory;
+        this.scheduledLocations = scheduledLocations;
+        this.executed = new boolean[scheduledLocations.length];
+        Arrays.fill(executed, false);
         this.recordDescriptors[0] = rd;
     }
 
     public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
             IRecordDescriptorProvider recordDescProvider, final int partition, final int nPartitions)
             throws HyracksDataException {
+        final List<FileSplit> splits = splitsFactory.getSplits();
+
         return new AbstractUnaryOutputSourceOperatorNodePushable() {
             private Configuration conf = confFactory.createConfiguration();
 
@@ -78,7 +90,21 @@
                 try {
                     Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
                     writer.open();
-                    loadVertices(ctx, partition);
+                    for (int i = 0; i < scheduledLocations.length; i++) {
+                        if (scheduledLocations[i].equals(ctx.getJobletContext().getApplicationContext().getNodeId())) {
+                            /**
+                             * pick one from the FileSplit queue
+                             */
+                            synchronized (executed) {
+                                if (!executed[i]) {
+                                    executed[i] = true;
+                                } else {
+                                    continue;
+                                }
+                            }
+                            loadVertices(ctx, i);
+                        }
+                    }
                     writer.close();
                 } catch (Exception e) {
                     throw new HyracksDataException(e);
@@ -96,7 +122,7 @@
              * @throws InterruptedException
              */
             @SuppressWarnings("unchecked")
-            private void loadVertices(final IHyracksTaskContext ctx, int partitionId) throws IOException,
+            private void loadVertices(final IHyracksTaskContext ctx, int splitId) throws IOException,
                     ClassNotFoundException, InterruptedException, InstantiationException, IllegalAccessException {
                 ByteBuffer frame = ctx.allocateFrame();
                 FrameTupleAppender appender = new FrameTupleAppender(ctx.getFrameSize());
@@ -104,14 +130,8 @@
 
                 VertexInputFormat vertexInputFormat = BspUtils.createVertexInputFormat(conf);
                 TaskAttemptContext context = new TaskAttemptContext(conf, new TaskAttemptID());
-                InputSplit split = splits.get(partition);
+                InputSplit split = splits.get(splitId);
 
-                if (split instanceof FileSplit) {
-                    FileSplit fileSplit = (FileSplit) split;
-                    LOGGER.info("read file split: " + fileSplit.getPath() + " location:" + fileSplit.getLocations()[0]
-                            + " start:" + fileSplit.getStart() + " length:" + split.getLength() + " partition:"
-                            + partition);
-                }
                 VertexReader vertexReader = vertexInputFormat.createVertexReader(split, context);
                 vertexReader.initialize(split, context);
                 Vertex readerVertex = (Vertex) BspUtils.createVertex(conf);
@@ -122,7 +142,7 @@
                  * set context
                  */
                 Context mapperContext = new Mapper().new Context(conf, new TaskAttemptID(), null, null, null, null,
-                        splits.get(partition));
+                        splits.get(splitId));
                 Vertex.setContext(mapperContext);
 
                 /**
@@ -166,5 +186,4 @@
             }
         };
     }
-
 }
\ No newline at end of file
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/VertexWriteOperatorDescriptor.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/VertexWriteOperatorDescriptor.java
new file mode 100644
index 0000000..d7cbb3a
--- /dev/null
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/VertexWriteOperatorDescriptor.java
@@ -0,0 +1,105 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.dataflow;
+
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameDeserializer;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
+import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
+import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.hyracks.dataflow.std.util.StringSerializationUtils;
+import edu.uci.ics.pregelix.dataflow.std.base.IRecordDescriptorFactory;
+import edu.uci.ics.pregelix.dataflow.std.base.IRuntimeHookFactory;
+
+public class VertexWriteOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
+    private static final long serialVersionUID = 1L;
+    private final FileSplit[] splits;
+    private final IRuntimeHookFactory preHookFactory;
+    private final IRuntimeHookFactory postHookFactory;
+    private final IRecordDescriptorFactory inputRdFactory;
+
+    public VertexWriteOperatorDescriptor(JobSpecification spec, IRecordDescriptorFactory inputRdFactory,
+            IFileSplitProvider fileSplitProvider, IRuntimeHookFactory preHookFactory,
+            IRuntimeHookFactory postHookFactory) {
+        super(spec, 1, 0);
+        this.splits = fileSplitProvider.getFileSplits();
+        this.preHookFactory = preHookFactory;
+        this.postHookFactory = postHookFactory;
+        this.inputRdFactory = inputRdFactory;
+    }
+
+    @Override
+    public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
+            final IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions)
+            throws HyracksDataException {
+        IOperatorNodePushable op = new AbstractUnaryInputSinkOperatorNodePushable() {
+            private RecordDescriptor rd0;
+            private FrameDeserializer frameDeserializer;
+            private PrintWriter outputWriter;
+
+            @Override
+            public void open() throws HyracksDataException {
+                rd0 = inputRdFactory == null ? recordDescProvider.getInputRecordDescriptor(getActivityId(), 0)
+                        : inputRdFactory.createRecordDescriptor();
+                frameDeserializer = new FrameDeserializer(ctx.getFrameSize(), rd0);
+                try {
+                    outputWriter = new PrintWriter(new OutputStreamWriter(new FileOutputStream(splits[partition]
+                            .getLocalFile().getFile())));
+                    if (preHookFactory != null)
+                        preHookFactory.createRuntimeHook().configure(ctx);
+                } catch (IOException e) {
+                    throw new HyracksDataException(e);
+                }
+            }
+
+            @Override
+            public void nextFrame(ByteBuffer frame) throws HyracksDataException {
+                frameDeserializer.reset(frame);
+                while (!frameDeserializer.done()) {
+                    Object[] tuple = frameDeserializer.deserializeRecord();
+                    // output the vertex
+                    outputWriter.print(StringSerializationUtils.toString(tuple[tuple.length - 1]));
+                    outputWriter.println();
+                }
+            }
+
+            @Override
+            public void fail() throws HyracksDataException {
+
+            }
+
+            @Override
+            public void close() throws HyracksDataException {
+                if (postHookFactory != null)
+                    postHookFactory.createRuntimeHook().configure(ctx);
+                outputWriter.close();
+            }
+
+        };
+        return op;
+    }
+}
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/RuntimeContext.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/RuntimeContext.java
index 43b6d17..567e220 100644
--- a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/RuntimeContext.java
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/RuntimeContext.java
@@ -119,23 +119,23 @@
             Vertex.setNumEdges(numEdges);
             giraphJobIdToSuperStep.put(giraphJobId, superStep);
             giraphJobIdToMove.put(giraphJobId, false);
-            LOGGER.info("start iteration " + Vertex.getCurrentSuperstep());
+            LOGGER.info("start iteration " + Vertex.getSuperstep());
         }
         System.gc();
     }
 
     public synchronized void endSuperStep(String giraphJobId) {
         giraphJobIdToMove.put(giraphJobId, true);
-        LOGGER.info("end iteration " + Vertex.getCurrentSuperstep());
+        LOGGER.info("end iteration " + Vertex.getSuperstep());
     }
 
     @Override
     public FileReference createManagedWorkspaceFile(String prefix) throws HyracksDataException {
         final FileReference fRef = ioManager.createWorkspaceFile(prefix);
-        List<FileReference> files = iterationToFiles.get(Vertex.getCurrentSuperstep());
+        List<FileReference> files = iterationToFiles.get(Vertex.getSuperstep());
         if (files == null) {
             files = new ArrayList<FileReference>();
-            iterationToFiles.put(Vertex.getCurrentSuperstep(), files);
+            iterationToFiles.put(Vertex.getSuperstep(), files);
         }
         files.add(fRef);
         return fRef;