Implement the metadata provider method to let algebricks compile the result writer operator instead of print file operator to distribute the results.

git-svn-id: https://asterixdb.googlecode.com/svn/branches/asterix_stabilization_result_distribution@1144 eaa15691-b419-025a-1212-ee371bd00084
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
index d92c76d..7a65070 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
@@ -16,6 +16,7 @@
 package edu.uci.ics.asterix.metadata.declared;
 
 import java.io.File;
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -90,6 +91,7 @@
 import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
 import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.dataset.ResultSetId;
 import edu.uci.ics.hyracks.api.io.FileReference;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
 import edu.uci.ics.hyracks.dataflow.std.file.ConstantFileSplitProvider;
@@ -97,6 +99,7 @@
 import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
 import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
 import edu.uci.ics.hyracks.dataflow.std.file.ITupleParserFactory;
+import edu.uci.ics.hyracks.dataflow.std.result.ResultWriterOperatorDescriptor;
 import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeDataflowHelperFactory;
 import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor;
 import edu.uci.ics.hyracks.storage.am.btree.frames.BTreeNSMInteriorFrameFactory;
@@ -119,6 +122,7 @@
     private Map<String, String> config;
     private IAWriterFactory writerFactory;
     private FileSplit outputFile;
+    private ResultSetId resultSetId;
     private long jobTxnId;
 
     private final Dataverse defaultDataverse;
@@ -183,6 +187,14 @@
         this.outputFile = outputFile;
     }
 
+    public ResultSetId getResultSetId() {
+        return resultSetId;
+    }
+
+    public void setResultSetId(ResultSetId resultSetId) {
+        this.resultSetId = resultSetId;
+    }
+
     @Override
     public AqlDataSource findDataSource(AqlSourceId id) throws AlgebricksException {
         AqlSourceId aqlId = (AqlSourceId) id;
@@ -541,6 +553,25 @@
     }
 
     @Override
+    public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getResultHandleRuntime(IDataSink sink,
+            RecordDescriptor recordDescriptor, boolean ordered, JobSpecification spec) throws AlgebricksException {
+        ResultSetDataSink rsds = (ResultSetDataSink) sink;
+        ResultSetSinkId rssId = (ResultSetSinkId) rsds.getId();
+        ResultSetId rsId = rssId.getResultSetId();
+        String nodeName = rssId.getResultNodeName();
+
+        ResultWriterOperatorDescriptor resultWriter = null;
+        try {
+            resultWriter = new ResultWriterOperatorDescriptor(spec, rsId, ordered, recordDescriptor);
+        } catch (IOException e) {
+            throw new AlgebricksException(e);
+        }
+
+        AlgebricksPartitionConstraint apc = new AlgebricksAbsolutePartitionConstraint(new String[] { nodeName });
+        return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(resultWriter, apc);
+    }
+
+    @Override
     public IDataSourceIndex<String, AqlSourceId> findDataSourceIndex(String indexId, AqlSourceId dataSourceId)
             throws AlgebricksException {
         AqlDataSource ads = findDataSource(dataSourceId);