Add support for chunked result transfers.
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/SessionConfig.java b/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/SessionConfig.java
index af0b4b4..bcab4a2 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/SessionConfig.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/SessionConfig.java
@@ -9,10 +9,11 @@
private final boolean printPhysicalOpsOnly;
private final boolean generateJobSpec;
private final boolean printJob;
+ private final int bufferSize;
public SessionConfig(boolean optimize, boolean printExprParam, boolean printRewrittenExprParam,
boolean printLogicalPlanParam, boolean printOptimizedLogicalPlanParam, boolean printPhysicalOpsOnly,
- boolean generateJobSpec, boolean printJob) {
+ boolean generateJobSpec, boolean printJob, int bufferSize) {
this.optimize = optimize;
this.printExprParam = printExprParam;
this.printRewrittenExprParam = printRewrittenExprParam;
@@ -21,6 +22,7 @@
this.printPhysicalOpsOnly = printPhysicalOpsOnly;
this.generateJobSpec = generateJobSpec;
this.printJob = printJob;
+ this.bufferSize = bufferSize;
}
public boolean isPrintExprParam() {
@@ -54,4 +56,8 @@
public boolean isGenerateJobSpec() {
return generateJobSpec;
}
+
+ public int getBufferSize() {
+ return bufferSize;
+ }
}
\ No newline at end of file
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/APIServlet.java b/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/APIServlet.java
index 12fa51a..90c0bb0 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/APIServlet.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/APIServlet.java
@@ -23,6 +23,7 @@
import edu.uci.ics.asterix.common.config.GlobalConfig;
import edu.uci.ics.asterix.metadata.MetadataManager;
import edu.uci.ics.asterix.result.ResultReader;
+import edu.uci.ics.asterix.result.ResultUtils;
import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
import edu.uci.ics.hyracks.api.dataset.IHyracksDataset;
import edu.uci.ics.hyracks.client.dataset.HyracksDataset;
@@ -43,6 +44,8 @@
format = DisplayFormat.TEXT;
}
+ response.setBufferSize(ResultUtils.DEFAULT_BUFFER_SIZE);
+
String query = request.getParameter("query");
String printExprParam = request.getParameter("print-expr-tree");
String printRewrittenExprParam = request.getParameter("print-rewritten-expr-tree");
@@ -70,7 +73,8 @@
List<Statement> aqlStatements = parser.Statement();
SessionConfig sessionConfig = new SessionConfig(true, isSet(printExprParam),
isSet(printRewrittenExprParam), isSet(printLogicalPlanParam),
- isSet(printOptimizedLogicalPlanParam), false, true, isSet(printJob));
+ isSet(printOptimizedLogicalPlanParam), false, true, isSet(printJob),
+ ResultUtils.DEFAULT_BUFFER_SIZE);
MetadataManager.INSTANCE.init();
AqlTranslator aqlTranslator = new AqlTranslator(aqlStatements, out, sessionConfig, format);
double duration = 0;
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/QueryResultAPIServlet.java b/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/QueryResultAPIServlet.java
index c6708b3..a763300 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/QueryResultAPIServlet.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/QueryResultAPIServlet.java
@@ -16,7 +16,6 @@
import java.io.IOException;
import java.io.PrintWriter;
-import java.nio.ByteBuffer;
import javax.servlet.ServletContext;
import javax.servlet.http.HttpServlet;
@@ -26,6 +25,7 @@
import org.json.JSONArray;
import org.json.JSONObject;
+import edu.uci.ics.asterix.api.common.APIFramework.DisplayFormat;
import edu.uci.ics.asterix.result.ResultReader;
import edu.uci.ics.asterix.result.ResultUtils;
import edu.uci.ics.hyracks.api.client.HyracksConnection;
@@ -46,6 +46,16 @@
public void doGet(HttpServletRequest request, HttpServletResponse response) throws IOException {
response.setContentType("text/html");
response.setCharacterEncoding("utf-8");
+
+ String bufferSizeStr = request.getParameter("chunk_size");
+ int bufferSize;
+ try {
+ bufferSize = Integer.parseInt(bufferSizeStr);
+ } catch (Exception e) {
+ bufferSize = ResultUtils.DEFAULT_BUFFER_SIZE;
+ }
+ response.setBufferSize(bufferSize);
+
String strHandle = request.getParameter("handle");
PrintWriter out = response.getWriter();
ServletContext context = getServletContext();
@@ -74,17 +84,9 @@
JSONArray handle = handleObj.getJSONArray("handle");
JobId jobId = new JobId(handle.getLong(0));
ResultSetId rsId = new ResultSetId(handle.getLong(1));
- ByteBuffer buffer = ByteBuffer.allocate(ResultReader.FRAME_SIZE);
ResultReader resultReader = new ResultReader(hcc, hds);
resultReader.open(jobId, rsId);
- buffer.clear();
- JSONObject jsonResponse = new JSONObject();
- JSONArray results = new JSONArray();
- while (resultReader.read(buffer) > 0) {
- results.put(ResultUtils.getJSONFromBuffer(buffer, resultReader.getFrameTupleAccessor()));
- }
- jsonResponse.put("results", results);
- out.write(jsonResponse.toString());
+ ResultUtils.writeResult(out, resultReader, bufferSize, DisplayFormat.JSON);
} catch (Exception e) {
out.println(e.getMessage());
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/RESTAPIServlet.java b/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/RESTAPIServlet.java
index 29feb5e..5622713 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/RESTAPIServlet.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/RESTAPIServlet.java
@@ -50,6 +50,14 @@
public void doGet(HttpServletRequest request, HttpServletResponse response) throws IOException {
response.setContentType("application/json");
response.setCharacterEncoding("utf-8");
+ String bufferSizeStr = request.getParameter("chunk_size");
+ int bufferSize;
+ try {
+ bufferSize = Integer.parseInt(bufferSizeStr);
+ } catch (Exception e) {
+ bufferSize = ResultUtils.DEFAULT_BUFFER_SIZE;
+ }
+ response.setBufferSize(bufferSize);
PrintWriter out = response.getWriter();
@@ -86,7 +94,8 @@
if (checkForbiddenStatements(aqlStatements, out)) {
return;
}
- SessionConfig sessionConfig = new SessionConfig(true, false, false, false, false, false, true, false);
+ SessionConfig sessionConfig = new SessionConfig(true, false, false, false, false, false, true, false,
+ bufferSize);
MetadataManager.INSTANCE.init();
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/api/java/AsterixJavaClient.java b/asterix-app/src/main/java/edu/uci/ics/asterix/api/java/AsterixJavaClient.java
index cb786d7..30aa174 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/api/java/AsterixJavaClient.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/api/java/AsterixJavaClient.java
@@ -14,6 +14,7 @@
import edu.uci.ics.asterix.aql.translator.AqlTranslator;
import edu.uci.ics.asterix.common.exceptions.AsterixException;
import edu.uci.ics.asterix.metadata.MetadataManager;
+import edu.uci.ics.asterix.result.ResultUtils;
import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
import edu.uci.ics.hyracks.api.job.JobSpecification;
@@ -63,7 +64,8 @@
MetadataManager.INSTANCE.init();
SessionConfig pc = new SessionConfig(optimize, false, printRewrittenExpressions, printLogicalPlan,
- printOptimizedPlan, printPhysicalOpsOnly, generateBinaryRuntime, printJob);
+ printOptimizedPlan, printPhysicalOpsOnly, generateBinaryRuntime, printJob,
+ ResultUtils.DEFAULT_BUFFER_SIZE);
AqlTranslator aqlTranslator = new AqlTranslator(aqlStatements, writer, pc, DisplayFormat.TEXT);
aqlTranslator.compileAndExecute(hcc, null, false);
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java b/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
index 976ec7c..d802fc9 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
@@ -16,7 +16,6 @@
import java.io.File;
import java.io.PrintWriter;
-import java.nio.ByteBuffer;
import java.rmi.RemoteException;
import java.util.ArrayList;
import java.util.HashMap;
@@ -1350,34 +1349,24 @@
GlobalConfig.ASTERIX_LOGGER.info(compiled.toJSON().toString(1));
JobId jobId = runJob(hcc, compiled, false);
- JSONObject response = new JSONObject();
if (asyncResults) {
+ JSONObject response = new JSONObject();
JSONArray handle = new JSONArray();
handle.put(jobId.getId());
handle.put(metadataProvider.getResultSetId().getId());
response.put("handle", handle);
+ out.print(response);
+ out.flush();
} else {
- ByteBuffer buffer = ByteBuffer.allocate(ResultReader.FRAME_SIZE);
+ if (pdf == DisplayFormat.HTML) {
+ out.println("<pre>");
+ }
ResultReader resultReader = new ResultReader(hcc, hdc);
resultReader.open(jobId, metadataProvider.getResultSetId());
- buffer.clear();
- JSONArray results = new JSONArray();
- while (resultReader.read(buffer) > 0) {
- results.put(ResultUtils.getJSONFromBuffer(buffer, resultReader.getFrameTupleAccessor()));
- buffer.clear();
- }
- response.put("results", results);
- }
- switch (pdf) {
- case HTML:
- out.println("<pre>");
- ResultUtils.prettyPrintHTML(out, response);
+ ResultUtils.writeResult(out, resultReader, sessionConfig.getBufferSize(), pdf);
+ if (pdf == DisplayFormat.HTML) {
out.println("</pre>");
- break;
- case TEXT:
- case JSON:
- out.print(response);
- break;
+ }
}
hcc.waitForCompletion(jobId);
}
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/result/ResultUtils.java b/asterix-app/src/main/java/edu/uci/ics/asterix/result/ResultUtils.java
index dec3128..398692e 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/result/ResultUtils.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/result/ResultUtils.java
@@ -22,33 +22,73 @@
import org.json.JSONException;
import org.json.JSONObject;
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonGenerator;
+
+import edu.uci.ics.asterix.api.common.APIFramework.DisplayFormat;
import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.dataflow.common.comm.util.ByteBufferInputStream;
+import edu.uci.ics.hyracks.data.std.util.ByteArrayAccessibleOutputStream;
public class ResultUtils {
- public static JSONArray getJSONFromBuffer(ByteBuffer buffer, IFrameTupleAccessor fta) throws HyracksDataException {
- JSONArray resultRecords = new JSONArray();
- ByteBufferInputStream bbis = new ByteBufferInputStream();
+ // Default content length is 65536.
+ public static final int DEFAULT_BUFFER_SIZE = 65536;
- try {
+ // About 100 bytes of the response buffer is left for HTTP data.
+ public static int HTTP_DATA_LENTH = 100;
+
+ public static void writeResult(PrintWriter out, ResultReader resultReader, int bufferSize, DisplayFormat pdf)
+ throws IOException, HyracksDataException {
+ JsonFactory jsonFactory = new JsonFactory();
+ ByteArrayAccessibleOutputStream baos = new ByteArrayAccessibleOutputStream(bufferSize);
+ baos.reset();
+
+ JsonGenerator generator = jsonFactory.createGenerator(baos);
+
+ ByteBuffer buffer = ByteBuffer.allocate(ResultReader.FRAME_SIZE);
+ buffer.clear();
+
+ IFrameTupleAccessor fta = resultReader.getFrameTupleAccessor();
+
+ String response;
+
+ generator.writeStartObject();
+ generator.writeArrayFieldStart("results");
+ while (resultReader.read(buffer) > 0) {
fta.reset(buffer);
for (int tIndex = 0; tIndex < fta.getTupleCount(); tIndex++) {
int start = fta.getTupleStartOffset(tIndex);
int length = fta.getTupleEndOffset(tIndex) - start;
- bbis.setByteBuffer(buffer, start);
- byte[] recordBytes = new byte[length];
- bbis.read(recordBytes, 0, length);
- resultRecords.put(new String(recordBytes, 0, length));
+ if (pdf == DisplayFormat.HTML) {
+ response = new String(buffer.array(), start, length);
+ out.print(response);
+ } else {
+ if ((baos.size() + length + HTTP_DATA_LENTH) > bufferSize) {
+ generator.writeEndArray();
+ generator.writeEndObject();
+ generator.close();
+ response = new String(baos.getByteArray(), 0, baos.size());
+ out.print(response);
+ out.flush();
+ baos.reset();
+ generator = jsonFactory.createGenerator(baos);
+ generator.writeStartObject();
+ generator.writeArrayFieldStart("results");
+ }
+ generator.writeUTF8String(buffer.array(), start, length);
+ }
}
- } finally {
- try {
- bbis.close();
- } catch (IOException e) {
- throw new HyracksDataException(e);
- }
+ buffer.clear();
}
- return resultRecords;
+ if (pdf != DisplayFormat.HTML) {
+ generator.writeEndArray();
+ generator.writeEndObject();
+ generator.close();
+ response = new String(baos.getByteArray(), 0, baos.size());
+ out.print(response);
+ }
+
+ out.flush();
}
public static JSONObject getErrorResponse(int errorCode, String errorMessage) {
@@ -63,20 +103,4 @@
}
return errorResp;
}
-
- public static void prettyPrintHTML(PrintWriter out, JSONObject jsonResultObj) {
- JSONArray resultsWrapper;
- JSONArray resultsArray;
- try {
- resultsWrapper = jsonResultObj.getJSONArray("results");
- for (int i = 0; i < resultsWrapper.length(); i++) {
- resultsArray = resultsWrapper.getJSONArray(i);
- for (int j = 0; j < resultsArray.length(); j++) {
- out.print(resultsArray.getString(j));
- }
- }
- } catch (JSONException e) {
- // TODO(madhusudancs): Figure out what to do when JSONException occurs while building the results.
- }
- }
}