Use the new HyracksDatasetReader infrastructure to reuse the client rpc connection object for different queries that lets creating a new reader for every new job.
git-svn-id: https://asterixdb.googlecode.com/svn/branches/asterix_stabilization_result_distribution@1303 eaa15691-b419-025a-1212-ee371bd00084
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/APIServlet.java b/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/APIServlet.java
index 2122ada..9a572c2 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/APIServlet.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/APIServlet.java
@@ -19,14 +19,19 @@
import edu.uci.ics.asterix.aql.parser.ParseException;
import edu.uci.ics.asterix.aql.translator.AqlTranslator;
import edu.uci.ics.asterix.metadata.MetadataManager;
+import edu.uci.ics.asterix.result.ResultReader;
import edu.uci.ics.hyracks.api.client.HyracksConnection;
import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
+import edu.uci.ics.hyracks.api.dataset.IHyracksDataset;
+import edu.uci.ics.hyracks.client.dataset.HyracksDataset;
public class APIServlet extends HttpServlet {
private static final long serialVersionUID = 1L;
private static final String HYRACKS_CONNECTION_ATTR = "edu.uci.ics.asterix.HYRACKS_CONNECTION";
+ private static final String HYRACKS_DATASET_ATTR = "edu.uci.ics.asterix.HYRACKS_DATASET";
+
private static final String HTML_HEADER_TEMPLATE = "<!DOCTYPE html>"
+ "<html lang=\"en\">"
+ "<head>"
@@ -329,6 +334,8 @@
printInHtml(out, query);
ServletContext context = getServletContext();
IHyracksClientConnection hcc;
+ IHyracksDataset hds;
+
try {
synchronized (context) {
hcc = (IHyracksClientConnection) context.getAttribute(HYRACKS_CONNECTION_ATTR);
@@ -336,6 +343,12 @@
hcc = new HyracksConnection(strIP, port);
context.setAttribute(HYRACKS_CONNECTION_ATTR, hcc);
}
+
+ hds = (IHyracksDataset) context.getAttribute(HYRACKS_DATASET_ATTR);
+ if (hds == null) {
+ hds = new HyracksDataset(hcc, ResultReader.FRAME_SIZE, ResultReader.NUM_READERS);
+ context.setAttribute(HYRACKS_DATASET_ATTR, hds);
+ }
}
AQLParser parser = new AQLParser(query);
List<Statement> aqlStatements = parser.Statement();
@@ -346,7 +359,7 @@
AqlTranslator aqlTranslator = new AqlTranslator(aqlStatements, out, sessionConfig, format);
double duration = 0;
long startTime = System.currentTimeMillis();
- aqlTranslator.compileAndExecute(hcc, false);
+ aqlTranslator.compileAndExecute(hcc, hds, false);
long endTime = System.currentTimeMillis();
duration = (endTime - startTime) / 1000.00;
out.println("<PRE>Duration of all jobs: " + duration + "</PRE>");
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/QueryResultAPIServlet.java b/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/QueryResultAPIServlet.java
index 2b0aff4..8eb5ec2 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/QueryResultAPIServlet.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/QueryResultAPIServlet.java
@@ -30,14 +30,18 @@
import edu.uci.ics.asterix.result.ResultUtils;
import edu.uci.ics.hyracks.api.client.HyracksConnection;
import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
+import edu.uci.ics.hyracks.api.dataset.IHyracksDataset;
import edu.uci.ics.hyracks.api.dataset.ResultSetId;
import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.client.dataset.HyracksDataset;
public class QueryResultAPIServlet extends HttpServlet {
private static final long serialVersionUID = 1L;
private static final String HYRACKS_CONNECTION_ATTR = "edu.uci.ics.asterix.HYRACKS_CONNECTION";
+ private static final String HYRACKS_DATASET_ATTR = "edu.uci.ics.asterix.HYRACKS_DATASET";
+
@Override
public void doGet(HttpServletRequest request, HttpServletResponse response) throws IOException {
String strHandle = request.getParameter("handle");
@@ -45,6 +49,8 @@
response.setContentType("text/html");
ServletContext context = getServletContext();
IHyracksClientConnection hcc;
+ IHyracksDataset hds;
+
try {
HyracksProperties hp = new HyracksProperties();
String strIP = hp.getHyracksIPAddress();
@@ -56,16 +62,19 @@
hcc = new HyracksConnection(strIP, port);
context.setAttribute(HYRACKS_CONNECTION_ATTR, hcc);
}
+
+ hds = (IHyracksDataset) context.getAttribute(HYRACKS_DATASET_ATTR);
+ if (hds == null) {
+ hds = new HyracksDataset(hcc, ResultReader.FRAME_SIZE, ResultReader.NUM_READERS);
+ context.setAttribute(HYRACKS_DATASET_ATTR, hds);
+ }
}
JSONObject handleObj = new JSONObject(strHandle);
JSONArray handle = handleObj.getJSONArray("handle");
JobId jobId = new JobId(handle.getLong(0));
ResultSetId rsId = new ResultSetId(handle.getLong(1));
ByteBuffer buffer = ByteBuffer.allocate(ResultReader.FRAME_SIZE);
- /* TODO(madhusudancs): We need to find a way to JSON serialize default format obtained from
- * metadataProvider in the AQLTranslator and store it as part of the result handle.
- */
- ResultReader resultReader = new ResultReader(hcc);
+ ResultReader resultReader = new ResultReader(hcc, hds);
resultReader.open(jobId, rsId);
buffer.clear();
JSONObject jsonResponse = new JSONObject();
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/QueryStatusAPIServlet.java b/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/QueryStatusAPIServlet.java
index f36e4e3..9fd3bae 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/QueryStatusAPIServlet.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/QueryStatusAPIServlet.java
@@ -28,14 +28,18 @@
import edu.uci.ics.asterix.result.ResultReader;
import edu.uci.ics.hyracks.api.client.HyracksConnection;
import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
+import edu.uci.ics.hyracks.api.dataset.IHyracksDataset;
import edu.uci.ics.hyracks.api.dataset.ResultSetId;
import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.client.dataset.HyracksDataset;
public class QueryStatusAPIServlet extends HttpServlet {
private static final long serialVersionUID = 1L;
private static final String HYRACKS_CONNECTION_ATTR = "edu.uci.ics.asterix.HYRACKS_CONNECTION";
+ private static final String HYRACKS_DATASET_ATTR = "edu.uci.ics.asterix.HYRACKS_DATASET";
+
@Override
public void doGet(HttpServletRequest request, HttpServletResponse response) throws IOException {
String strHandle = request.getParameter("handle");
@@ -43,6 +47,8 @@
response.setContentType("text/html");
ServletContext context = getServletContext();
IHyracksClientConnection hcc;
+ IHyracksDataset hds;
+
try {
HyracksProperties hp = new HyracksProperties();
String strIP = hp.getHyracksIPAddress();
@@ -54,6 +60,12 @@
hcc = new HyracksConnection(strIP, port);
context.setAttribute(HYRACKS_CONNECTION_ATTR, hcc);
}
+
+ hds = (IHyracksDataset) context.getAttribute(HYRACKS_DATASET_ATTR);
+ if (hds == null) {
+ hds = new HyracksDataset(hcc, ResultReader.FRAME_SIZE, ResultReader.NUM_READERS);
+ context.setAttribute(HYRACKS_DATASET_ATTR, hds);
+ }
}
JSONObject handleObj = new JSONObject(strHandle);
JSONArray handle = handleObj.getJSONArray("handle");
@@ -63,7 +75,7 @@
/* TODO(madhusudancs): We need to find a way to JSON serialize default format obtained from
* metadataProvider in the AQLTranslator and store it as part of the result handle.
*/
- ResultReader resultReader = new ResultReader(hcc);
+ ResultReader resultReader = new ResultReader(hcc, hds);
resultReader.open(jobId, rsId);
JSONObject jsonResponse = new JSONObject();
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/RESTAPIServlet.java b/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/RESTAPIServlet.java
index f0aa3c5..2180555 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/RESTAPIServlet.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/RESTAPIServlet.java
@@ -16,7 +16,6 @@
import java.io.IOException;
import java.io.PrintWriter;
-import java.io.StringReader;
import java.util.List;
import javax.servlet.ServletContext;
@@ -34,15 +33,20 @@
import edu.uci.ics.asterix.aql.parser.ParseException;
import edu.uci.ics.asterix.aql.translator.AqlTranslator;
import edu.uci.ics.asterix.metadata.MetadataManager;
+import edu.uci.ics.asterix.result.ResultReader;
import edu.uci.ics.asterix.result.ResultUtils;
import edu.uci.ics.hyracks.api.client.HyracksConnection;
import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
+import edu.uci.ics.hyracks.api.dataset.IHyracksDataset;
+import edu.uci.ics.hyracks.client.dataset.HyracksDataset;
abstract class RESTAPIServlet extends HttpServlet {
private static final long serialVersionUID = 1L;
private static final String HYRACKS_CONNECTION_ATTR = "edu.uci.ics.asterix.HYRACKS_CONNECTION";
+ private static final String HYRACKS_DATASET_ATTR = "edu.uci.ics.asterix.HYRACKS_DATASET";
+
@Override
public void doGet(HttpServletRequest request, HttpServletResponse response) throws IOException {
PrintWriter out = response.getWriter();
@@ -63,6 +67,7 @@
response.setContentType("application/json");
ServletContext context = getServletContext();
IHyracksClientConnection hcc;
+ IHyracksDataset hds;
try {
HyracksProperties hp = new HyracksProperties();
@@ -75,6 +80,12 @@
hcc = new HyracksConnection(strIP, port);
context.setAttribute(HYRACKS_CONNECTION_ATTR, hcc);
}
+
+ hds = (IHyracksDataset) context.getAttribute(HYRACKS_DATASET_ATTR);
+ if (hds == null) {
+ hds = new HyracksDataset(hcc, ResultReader.FRAME_SIZE, ResultReader.NUM_READERS);
+ context.setAttribute(HYRACKS_DATASET_ATTR, hds);
+ }
}
AQLParser parser = new AQLParser(query);
@@ -88,7 +99,7 @@
AqlTranslator aqlTranslator = new AqlTranslator(aqlStatements, out, sessionConfig, format);
- aqlTranslator.compileAndExecute(hcc, asyncResults);
+ aqlTranslator.compileAndExecute(hcc, hds, asyncResults);
} catch (ParseException pe) {
StringBuilder errorMessage = new StringBuilder();
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/api/java/AsterixJavaClient.java b/asterix-app/src/main/java/edu/uci/ics/asterix/api/java/AsterixJavaClient.java
index fb09299..3f773a1 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/api/java/AsterixJavaClient.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/api/java/AsterixJavaClient.java
@@ -68,7 +68,7 @@
generateBinaryRuntime, printJob);
AqlTranslator aqlTranslator = new AqlTranslator(aqlStatements, writer, pc, DisplayFormat.TEXT);
- aqlTranslator.compileAndExecute(hcc, false);
+ aqlTranslator.compileAndExecute(hcc, null, false);
writer.flush();
}
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java b/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
index 7248972..0499c4c 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
@@ -109,6 +109,7 @@
import edu.uci.ics.hyracks.algebricks.data.IResultSerializerFactoryProvider;
import edu.uci.ics.hyracks.algebricks.runtime.writers.PrinterBasedWriterFactory;
import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
+import edu.uci.ics.hyracks.api.dataset.IHyracksDataset;
import edu.uci.ics.hyracks.api.dataset.ResultSetId;
import edu.uci.ics.hyracks.api.io.FileReference;
import edu.uci.ics.hyracks.api.job.JobId;
@@ -149,14 +150,18 @@
/**
* Compiles and submits for execution a list of AQL statements.
+ *
* @param hcc
* A Hyracks client connection that is used to submit a jobspec to Hyracks.
+ * @param hdc
+ * A Hyracks dataset client object that is used to read the results.
* @param asyncResults
* True if the results should be read asynchronously or false if we should wait for results to be read.
* @return A List<QueryResult> containing a QueryResult instance corresponding to each submitted query.
* @throws Exception
*/
- public List<QueryResult> compileAndExecute(IHyracksClientConnection hcc, boolean asyncResults) throws Exception {
+ public List<QueryResult> compileAndExecute(IHyracksClientConnection hcc, IHyracksDataset hdc, boolean asyncResults)
+ throws Exception {
int resultSetIdCounter = 0;
List<QueryResult> executionResult = new ArrayList<QueryResult>();
FileSplit outputFile = null;
@@ -298,7 +303,7 @@
response.put("handle", handle);
} else {
ByteBuffer buffer = ByteBuffer.allocate(ResultReader.FRAME_SIZE);
- ResultReader resultReader = new ResultReader(hcc);
+ ResultReader resultReader = new ResultReader(hcc, hdc);
resultReader.open(jobId, metadataProvider.getResultSetId());
buffer.clear();
JSONArray results = new JSONArray();
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/result/ResultReader.java b/asterix-app/src/main/java/edu/uci/ics/asterix/result/ResultReader.java
index a366abd..48bde1c 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/result/ResultReader.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/result/ResultReader.java
@@ -14,49 +14,47 @@
*/
package edu.uci.ics.asterix.result;
-import java.io.IOException;
import java.nio.ByteBuffer;
+import edu.uci.ics.asterix.common.config.GlobalConfig;
import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
import edu.uci.ics.hyracks.api.dataset.DatasetDirectoryRecord.Status;
import edu.uci.ics.hyracks.api.dataset.IHyracksDataset;
+import edu.uci.ics.hyracks.api.dataset.IHyracksDatasetReader;
import edu.uci.ics.hyracks.api.dataset.ResultSetId;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.job.JobId;
-import edu.uci.ics.hyracks.client.dataset.DatasetClientContext;
-import edu.uci.ics.hyracks.client.dataset.HyracksDataset;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ResultFrameTupleAccessor;
public class ResultReader {
- private final DatasetClientContext datasetClientCtx;
-
private final IHyracksDataset hyracksDataset;
+ private IHyracksDatasetReader reader;
+
private IFrameTupleAccessor frameTupleAccessor;
// Number of parallel result reader buffers
- private static final int NUM_READERS = 1;
+ public static final int NUM_READERS = 1;
- // 32K buffer size;
- public static final int FRAME_SIZE = 32768;
+ public static final int FRAME_SIZE = GlobalConfig.getFrameSize();
- public ResultReader(IHyracksClientConnection hcc) throws Exception {
- datasetClientCtx = new DatasetClientContext(FRAME_SIZE);
- hyracksDataset = new HyracksDataset(hcc, datasetClientCtx, NUM_READERS);
+ public ResultReader(IHyracksClientConnection hcc, IHyracksDataset hdc) throws Exception {
+ hyracksDataset = hdc;
}
- public void open(JobId jobId, ResultSetId resultSetId) throws IOException, ClassNotFoundException {
- hyracksDataset.open(jobId, resultSetId);
+ public void open(JobId jobId, ResultSetId resultSetId) throws HyracksDataException {
+ reader = hyracksDataset.createReader(jobId, resultSetId);
- frameTupleAccessor = new ResultFrameTupleAccessor(datasetClientCtx.getFrameSize());
+ frameTupleAccessor = new ResultFrameTupleAccessor(FRAME_SIZE);
}
public Status getStatus() {
- return hyracksDataset.getResultStatus();
+ return reader.getResultStatus();
}
public int read(ByteBuffer buffer) throws HyracksDataException {
- return hyracksDataset.read(buffer);
+ return reader.read(buffer);
}
public IFrameTupleAccessor getFrameTupleAccessor() {
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
index 7fa3659..ac775be 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
@@ -65,6 +65,7 @@
import edu.uci.ics.asterix.runtime.formats.NonTaggedDataFormat;
import edu.uci.ics.asterix.runtime.transaction.TreeIndexInsertUpdateDeleteOperatorDescriptor;
import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksCountPartitionConstraint;
import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
import edu.uci.ics.hyracks.algebricks.common.utils.Pair;