Use the new HyracksDatasetReader infrastructure to reuse the client rpc connection object for different queries that lets creating a new reader for every new job.

git-svn-id: https://asterixdb.googlecode.com/svn/branches/asterix_stabilization_result_distribution@1303 eaa15691-b419-025a-1212-ee371bd00084
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/APIServlet.java b/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/APIServlet.java
index 2122ada..9a572c2 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/APIServlet.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/APIServlet.java
@@ -19,14 +19,19 @@
 import edu.uci.ics.asterix.aql.parser.ParseException;
 import edu.uci.ics.asterix.aql.translator.AqlTranslator;
 import edu.uci.ics.asterix.metadata.MetadataManager;
+import edu.uci.ics.asterix.result.ResultReader;
 import edu.uci.ics.hyracks.api.client.HyracksConnection;
 import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
+import edu.uci.ics.hyracks.api.dataset.IHyracksDataset;
+import edu.uci.ics.hyracks.client.dataset.HyracksDataset;
 
 public class APIServlet extends HttpServlet {
     private static final long serialVersionUID = 1L;
 
     private static final String HYRACKS_CONNECTION_ATTR = "edu.uci.ics.asterix.HYRACKS_CONNECTION";
 
+    private static final String HYRACKS_DATASET_ATTR = "edu.uci.ics.asterix.HYRACKS_DATASET";
+
     private static final String HTML_HEADER_TEMPLATE = "<!DOCTYPE html>"
             + "<html lang=\"en\">"
             + "<head>"
@@ -329,6 +334,8 @@
         printInHtml(out, query);
         ServletContext context = getServletContext();
         IHyracksClientConnection hcc;
+        IHyracksDataset hds;
+
         try {
             synchronized (context) {
                 hcc = (IHyracksClientConnection) context.getAttribute(HYRACKS_CONNECTION_ATTR);
@@ -336,6 +343,12 @@
                     hcc = new HyracksConnection(strIP, port);
                     context.setAttribute(HYRACKS_CONNECTION_ATTR, hcc);
                 }
+
+                hds = (IHyracksDataset) context.getAttribute(HYRACKS_DATASET_ATTR);
+                if (hds == null) {
+                    hds = new HyracksDataset(hcc, ResultReader.FRAME_SIZE, ResultReader.NUM_READERS);
+                    context.setAttribute(HYRACKS_DATASET_ATTR, hds);
+                }
             }
             AQLParser parser = new AQLParser(query);
             List<Statement> aqlStatements = parser.Statement();
@@ -346,7 +359,7 @@
             AqlTranslator aqlTranslator = new AqlTranslator(aqlStatements, out, sessionConfig, format);
             double duration = 0;
             long startTime = System.currentTimeMillis();
-            aqlTranslator.compileAndExecute(hcc, false);
+            aqlTranslator.compileAndExecute(hcc, hds, false);
             long endTime = System.currentTimeMillis();
             duration = (endTime - startTime) / 1000.00;
             out.println("<PRE>Duration of all jobs: " + duration + "</PRE>");
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/QueryResultAPIServlet.java b/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/QueryResultAPIServlet.java
index 2b0aff4..8eb5ec2 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/QueryResultAPIServlet.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/QueryResultAPIServlet.java
@@ -30,14 +30,18 @@
 import edu.uci.ics.asterix.result.ResultUtils;
 import edu.uci.ics.hyracks.api.client.HyracksConnection;
 import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
+import edu.uci.ics.hyracks.api.dataset.IHyracksDataset;
 import edu.uci.ics.hyracks.api.dataset.ResultSetId;
 import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.client.dataset.HyracksDataset;
 
 public class QueryResultAPIServlet extends HttpServlet {
     private static final long serialVersionUID = 1L;
 
     private static final String HYRACKS_CONNECTION_ATTR = "edu.uci.ics.asterix.HYRACKS_CONNECTION";
 
+    private static final String HYRACKS_DATASET_ATTR = "edu.uci.ics.asterix.HYRACKS_DATASET";
+
     @Override
     public void doGet(HttpServletRequest request, HttpServletResponse response) throws IOException {
         String strHandle = request.getParameter("handle");
@@ -45,6 +49,8 @@
         response.setContentType("text/html");
         ServletContext context = getServletContext();
         IHyracksClientConnection hcc;
+        IHyracksDataset hds;
+
         try {
             HyracksProperties hp = new HyracksProperties();
             String strIP = hp.getHyracksIPAddress();
@@ -56,16 +62,19 @@
                     hcc = new HyracksConnection(strIP, port);
                     context.setAttribute(HYRACKS_CONNECTION_ATTR, hcc);
                 }
+
+                hds = (IHyracksDataset) context.getAttribute(HYRACKS_DATASET_ATTR);
+                if (hds == null) {
+                    hds = new HyracksDataset(hcc, ResultReader.FRAME_SIZE, ResultReader.NUM_READERS);
+                    context.setAttribute(HYRACKS_DATASET_ATTR, hds);
+                }
             }
             JSONObject handleObj = new JSONObject(strHandle);
             JSONArray handle = handleObj.getJSONArray("handle");
             JobId jobId = new JobId(handle.getLong(0));
             ResultSetId rsId = new ResultSetId(handle.getLong(1));
             ByteBuffer buffer = ByteBuffer.allocate(ResultReader.FRAME_SIZE);
-            /* TODO(madhusudancs): We need to find a way to JSON serialize default format obtained from
-             * metadataProvider in the AQLTranslator and store it as part of the result handle.
-             */
-            ResultReader resultReader = new ResultReader(hcc);
+            ResultReader resultReader = new ResultReader(hcc, hds);
             resultReader.open(jobId, rsId);
             buffer.clear();
             JSONObject jsonResponse = new JSONObject();
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/QueryStatusAPIServlet.java b/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/QueryStatusAPIServlet.java
index f36e4e3..9fd3bae 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/QueryStatusAPIServlet.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/QueryStatusAPIServlet.java
@@ -28,14 +28,18 @@
 import edu.uci.ics.asterix.result.ResultReader;
 import edu.uci.ics.hyracks.api.client.HyracksConnection;
 import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
+import edu.uci.ics.hyracks.api.dataset.IHyracksDataset;
 import edu.uci.ics.hyracks.api.dataset.ResultSetId;
 import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.client.dataset.HyracksDataset;
 
 public class QueryStatusAPIServlet extends HttpServlet {
     private static final long serialVersionUID = 1L;
 
     private static final String HYRACKS_CONNECTION_ATTR = "edu.uci.ics.asterix.HYRACKS_CONNECTION";
 
+    private static final String HYRACKS_DATASET_ATTR = "edu.uci.ics.asterix.HYRACKS_DATASET";
+
     @Override
     public void doGet(HttpServletRequest request, HttpServletResponse response) throws IOException {
         String strHandle = request.getParameter("handle");
@@ -43,6 +47,8 @@
         response.setContentType("text/html");
         ServletContext context = getServletContext();
         IHyracksClientConnection hcc;
+        IHyracksDataset hds;
+
         try {
             HyracksProperties hp = new HyracksProperties();
             String strIP = hp.getHyracksIPAddress();
@@ -54,6 +60,12 @@
                     hcc = new HyracksConnection(strIP, port);
                     context.setAttribute(HYRACKS_CONNECTION_ATTR, hcc);
                 }
+
+                hds = (IHyracksDataset) context.getAttribute(HYRACKS_DATASET_ATTR);
+                if (hds == null) {
+                    hds = new HyracksDataset(hcc, ResultReader.FRAME_SIZE, ResultReader.NUM_READERS);
+                    context.setAttribute(HYRACKS_DATASET_ATTR, hds);
+                }
             }
             JSONObject handleObj = new JSONObject(strHandle);
             JSONArray handle = handleObj.getJSONArray("handle");
@@ -63,7 +75,7 @@
             /* TODO(madhusudancs): We need to find a way to JSON serialize default format obtained from
              * metadataProvider in the AQLTranslator and store it as part of the result handle.
              */
-            ResultReader resultReader = new ResultReader(hcc);
+            ResultReader resultReader = new ResultReader(hcc, hds);
             resultReader.open(jobId, rsId);
 
             JSONObject jsonResponse = new JSONObject();
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/RESTAPIServlet.java b/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/RESTAPIServlet.java
index f0aa3c5..2180555 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/RESTAPIServlet.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/RESTAPIServlet.java
@@ -16,7 +16,6 @@
 
 import java.io.IOException;
 import java.io.PrintWriter;
-import java.io.StringReader;
 import java.util.List;
 
 import javax.servlet.ServletContext;
@@ -34,15 +33,20 @@
 import edu.uci.ics.asterix.aql.parser.ParseException;
 import edu.uci.ics.asterix.aql.translator.AqlTranslator;
 import edu.uci.ics.asterix.metadata.MetadataManager;
+import edu.uci.ics.asterix.result.ResultReader;
 import edu.uci.ics.asterix.result.ResultUtils;
 import edu.uci.ics.hyracks.api.client.HyracksConnection;
 import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
+import edu.uci.ics.hyracks.api.dataset.IHyracksDataset;
+import edu.uci.ics.hyracks.client.dataset.HyracksDataset;
 
 abstract class RESTAPIServlet extends HttpServlet {
     private static final long serialVersionUID = 1L;
 
     private static final String HYRACKS_CONNECTION_ATTR = "edu.uci.ics.asterix.HYRACKS_CONNECTION";
 
+    private static final String HYRACKS_DATASET_ATTR = "edu.uci.ics.asterix.HYRACKS_DATASET";
+
     @Override
     public void doGet(HttpServletRequest request, HttpServletResponse response) throws IOException {
         PrintWriter out = response.getWriter();
@@ -63,6 +67,7 @@
         response.setContentType("application/json");
         ServletContext context = getServletContext();
         IHyracksClientConnection hcc;
+        IHyracksDataset hds;
 
         try {
             HyracksProperties hp = new HyracksProperties();
@@ -75,6 +80,12 @@
                     hcc = new HyracksConnection(strIP, port);
                     context.setAttribute(HYRACKS_CONNECTION_ATTR, hcc);
                 }
+
+                hds = (IHyracksDataset) context.getAttribute(HYRACKS_DATASET_ATTR);
+                if (hds == null) {
+                    hds = new HyracksDataset(hcc, ResultReader.FRAME_SIZE, ResultReader.NUM_READERS);
+                    context.setAttribute(HYRACKS_DATASET_ATTR, hds);
+                }
             }
 
             AQLParser parser = new AQLParser(query);
@@ -88,7 +99,7 @@
 
             AqlTranslator aqlTranslator = new AqlTranslator(aqlStatements, out, sessionConfig, format);
 
-            aqlTranslator.compileAndExecute(hcc, asyncResults);
+            aqlTranslator.compileAndExecute(hcc, hds, asyncResults);
 
         } catch (ParseException pe) {
             StringBuilder errorMessage = new StringBuilder();
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/api/java/AsterixJavaClient.java b/asterix-app/src/main/java/edu/uci/ics/asterix/api/java/AsterixJavaClient.java
index fb09299..3f773a1 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/api/java/AsterixJavaClient.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/api/java/AsterixJavaClient.java
@@ -68,7 +68,7 @@
                 generateBinaryRuntime, printJob);
 
         AqlTranslator aqlTranslator = new AqlTranslator(aqlStatements, writer, pc, DisplayFormat.TEXT);
-        aqlTranslator.compileAndExecute(hcc, false);
+        aqlTranslator.compileAndExecute(hcc, null, false);
         writer.flush();
     }
 
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java b/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
index 7248972..0499c4c 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
@@ -109,6 +109,7 @@
 import edu.uci.ics.hyracks.algebricks.data.IResultSerializerFactoryProvider;
 import edu.uci.ics.hyracks.algebricks.runtime.writers.PrinterBasedWriterFactory;
 import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
+import edu.uci.ics.hyracks.api.dataset.IHyracksDataset;
 import edu.uci.ics.hyracks.api.dataset.ResultSetId;
 import edu.uci.ics.hyracks.api.io.FileReference;
 import edu.uci.ics.hyracks.api.job.JobId;
@@ -149,14 +150,18 @@
 
     /**
      * Compiles and submits for execution a list of AQL statements.
+     * 
      * @param hcc
      *            A Hyracks client connection that is used to submit a jobspec to Hyracks.
+     * @param hdc
+     *            A Hyracks dataset client object that is used to read the results.
      * @param asyncResults
      *            True if the results should be read asynchronously or false if we should wait for results to be read.
      * @return A List<QueryResult> containing a QueryResult instance corresponding to each submitted query.
      * @throws Exception
      */
-    public List<QueryResult> compileAndExecute(IHyracksClientConnection hcc, boolean asyncResults) throws Exception {
+    public List<QueryResult> compileAndExecute(IHyracksClientConnection hcc, IHyracksDataset hdc, boolean asyncResults)
+            throws Exception {
         int resultSetIdCounter = 0;
         List<QueryResult> executionResult = new ArrayList<QueryResult>();
         FileSplit outputFile = null;
@@ -298,7 +303,7 @@
                         response.put("handle", handle);
                     } else {
                         ByteBuffer buffer = ByteBuffer.allocate(ResultReader.FRAME_SIZE);
-                        ResultReader resultReader = new ResultReader(hcc);
+                        ResultReader resultReader = new ResultReader(hcc, hdc);
                         resultReader.open(jobId, metadataProvider.getResultSetId());
                         buffer.clear();
                         JSONArray results = new JSONArray();
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/result/ResultReader.java b/asterix-app/src/main/java/edu/uci/ics/asterix/result/ResultReader.java
index a366abd..48bde1c 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/result/ResultReader.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/result/ResultReader.java
@@ -14,49 +14,47 @@
  */
 package edu.uci.ics.asterix.result;
 
-import java.io.IOException;
 import java.nio.ByteBuffer;
 
+import edu.uci.ics.asterix.common.config.GlobalConfig;
 import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
 import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
 import edu.uci.ics.hyracks.api.dataset.DatasetDirectoryRecord.Status;
 import edu.uci.ics.hyracks.api.dataset.IHyracksDataset;
+import edu.uci.ics.hyracks.api.dataset.IHyracksDatasetReader;
 import edu.uci.ics.hyracks.api.dataset.ResultSetId;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.job.JobId;
-import edu.uci.ics.hyracks.client.dataset.DatasetClientContext;
-import edu.uci.ics.hyracks.client.dataset.HyracksDataset;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ResultFrameTupleAccessor;
 
 public class ResultReader {
-    private final DatasetClientContext datasetClientCtx;
-
     private final IHyracksDataset hyracksDataset;
 
+    private IHyracksDatasetReader reader;
+
     private IFrameTupleAccessor frameTupleAccessor;
 
     // Number of parallel result reader buffers
-    private static final int NUM_READERS = 1;
+    public static final int NUM_READERS = 1;
 
-    // 32K buffer size;
-    public static final int FRAME_SIZE = 32768;
+    public static final int FRAME_SIZE = GlobalConfig.getFrameSize();
 
-    public ResultReader(IHyracksClientConnection hcc) throws Exception {
-        datasetClientCtx = new DatasetClientContext(FRAME_SIZE);
-        hyracksDataset = new HyracksDataset(hcc, datasetClientCtx, NUM_READERS);
+    public ResultReader(IHyracksClientConnection hcc, IHyracksDataset hdc) throws Exception {
+        hyracksDataset = hdc;
     }
 
-    public void open(JobId jobId, ResultSetId resultSetId) throws IOException, ClassNotFoundException {
-        hyracksDataset.open(jobId, resultSetId);
+    public void open(JobId jobId, ResultSetId resultSetId) throws HyracksDataException {
+        reader = hyracksDataset.createReader(jobId, resultSetId);
 
-        frameTupleAccessor = new ResultFrameTupleAccessor(datasetClientCtx.getFrameSize());
+        frameTupleAccessor = new ResultFrameTupleAccessor(FRAME_SIZE);
     }
 
     public Status getStatus() {
-        return hyracksDataset.getResultStatus();
+        return reader.getResultStatus();
     }
 
     public int read(ByteBuffer buffer) throws HyracksDataException {
-        return hyracksDataset.read(buffer);
+        return reader.read(buffer);
     }
 
     public IFrameTupleAccessor getFrameTupleAccessor() {
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
index 7fa3659..ac775be 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
@@ -65,6 +65,7 @@
 import edu.uci.ics.asterix.runtime.formats.NonTaggedDataFormat;
 import edu.uci.ics.asterix.runtime.transaction.TreeIndexInsertUpdateDeleteOperatorDescriptor;
 import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksCountPartitionConstraint;
 import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
 import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
 import edu.uci.ics.hyracks.algebricks.common.utils.Pair;