Add support for chunked result transfers.
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/SessionConfig.java b/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/SessionConfig.java
index af0b4b4..bcab4a2 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/SessionConfig.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/SessionConfig.java
@@ -9,10 +9,11 @@
     private final boolean printPhysicalOpsOnly;
     private final boolean generateJobSpec;
     private final boolean printJob;
+    private final int bufferSize;
 
     public SessionConfig(boolean optimize, boolean printExprParam, boolean printRewrittenExprParam,
             boolean printLogicalPlanParam, boolean printOptimizedLogicalPlanParam, boolean printPhysicalOpsOnly,
-            boolean generateJobSpec, boolean printJob) {
+            boolean generateJobSpec, boolean printJob, int bufferSize) {
         this.optimize = optimize;
         this.printExprParam = printExprParam;
         this.printRewrittenExprParam = printRewrittenExprParam;
@@ -21,6 +22,7 @@
         this.printPhysicalOpsOnly = printPhysicalOpsOnly;
         this.generateJobSpec = generateJobSpec;
         this.printJob = printJob;
+        this.bufferSize = bufferSize;
     }
 
     public boolean isPrintExprParam() {
@@ -54,4 +56,8 @@
     public boolean isGenerateJobSpec() {
         return generateJobSpec;
     }
+
+    public int getBufferSize() {
+        return bufferSize;
+    }
 }
\ No newline at end of file
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/APIServlet.java b/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/APIServlet.java
index 12fa51a..90c0bb0 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/APIServlet.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/APIServlet.java
@@ -23,6 +23,7 @@
 import edu.uci.ics.asterix.common.config.GlobalConfig;
 import edu.uci.ics.asterix.metadata.MetadataManager;
 import edu.uci.ics.asterix.result.ResultReader;
+import edu.uci.ics.asterix.result.ResultUtils;
 import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
 import edu.uci.ics.hyracks.api.dataset.IHyracksDataset;
 import edu.uci.ics.hyracks.client.dataset.HyracksDataset;
@@ -43,6 +44,8 @@
             format = DisplayFormat.TEXT;
         }
 
+        response.setBufferSize(ResultUtils.DEFAULT_BUFFER_SIZE);
+
         String query = request.getParameter("query");
         String printExprParam = request.getParameter("print-expr-tree");
         String printRewrittenExprParam = request.getParameter("print-rewritten-expr-tree");
@@ -70,7 +73,8 @@
             List<Statement> aqlStatements = parser.Statement();
             SessionConfig sessionConfig = new SessionConfig(true, isSet(printExprParam),
                     isSet(printRewrittenExprParam), isSet(printLogicalPlanParam),
-                    isSet(printOptimizedLogicalPlanParam), false, true, isSet(printJob));
+                    isSet(printOptimizedLogicalPlanParam), false, true, isSet(printJob),
+                    ResultUtils.DEFAULT_BUFFER_SIZE);
             MetadataManager.INSTANCE.init();
             AqlTranslator aqlTranslator = new AqlTranslator(aqlStatements, out, sessionConfig, format);
             double duration = 0;
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/QueryResultAPIServlet.java b/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/QueryResultAPIServlet.java
index c6708b3..a763300 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/QueryResultAPIServlet.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/QueryResultAPIServlet.java
@@ -16,7 +16,6 @@
 
 import java.io.IOException;
 import java.io.PrintWriter;
-import java.nio.ByteBuffer;
 
 import javax.servlet.ServletContext;
 import javax.servlet.http.HttpServlet;
@@ -26,6 +25,7 @@
 import org.json.JSONArray;
 import org.json.JSONObject;
 
+import edu.uci.ics.asterix.api.common.APIFramework.DisplayFormat;
 import edu.uci.ics.asterix.result.ResultReader;
 import edu.uci.ics.asterix.result.ResultUtils;
 import edu.uci.ics.hyracks.api.client.HyracksConnection;
@@ -46,6 +46,16 @@
     public void doGet(HttpServletRequest request, HttpServletResponse response) throws IOException {
         response.setContentType("text/html");
         response.setCharacterEncoding("utf-8");
+
+        String bufferSizeStr = request.getParameter("chunk_size");
+        int bufferSize;
+        try {
+            bufferSize = Integer.parseInt(bufferSizeStr);
+        } catch (Exception e) {
+            bufferSize = ResultUtils.DEFAULT_BUFFER_SIZE;
+        }
+        response.setBufferSize(bufferSize);
+
         String strHandle = request.getParameter("handle");
         PrintWriter out = response.getWriter();
         ServletContext context = getServletContext();
@@ -74,17 +84,9 @@
             JSONArray handle = handleObj.getJSONArray("handle");
             JobId jobId = new JobId(handle.getLong(0));
             ResultSetId rsId = new ResultSetId(handle.getLong(1));
-            ByteBuffer buffer = ByteBuffer.allocate(ResultReader.FRAME_SIZE);
             ResultReader resultReader = new ResultReader(hcc, hds);
             resultReader.open(jobId, rsId);
-            buffer.clear();
-            JSONObject jsonResponse = new JSONObject();
-            JSONArray results = new JSONArray();
-            while (resultReader.read(buffer) > 0) {
-                results.put(ResultUtils.getJSONFromBuffer(buffer, resultReader.getFrameTupleAccessor()));
-            }
-            jsonResponse.put("results", results);
-            out.write(jsonResponse.toString());
+            ResultUtils.writeResult(out, resultReader, bufferSize, DisplayFormat.JSON);
 
         } catch (Exception e) {
             out.println(e.getMessage());
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/RESTAPIServlet.java b/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/RESTAPIServlet.java
index 29feb5e..5622713 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/RESTAPIServlet.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/RESTAPIServlet.java
@@ -50,6 +50,14 @@
     public void doGet(HttpServletRequest request, HttpServletResponse response) throws IOException {
         response.setContentType("application/json");
         response.setCharacterEncoding("utf-8");
+        String bufferSizeStr = request.getParameter("chunk_size");
+        int bufferSize;
+        try {
+            bufferSize = Integer.parseInt(bufferSizeStr);
+        } catch (Exception e) {
+            bufferSize = ResultUtils.DEFAULT_BUFFER_SIZE;
+        }
+        response.setBufferSize(bufferSize);
 
         PrintWriter out = response.getWriter();
 
@@ -86,7 +94,8 @@
             if (checkForbiddenStatements(aqlStatements, out)) {
                 return;
             }
-            SessionConfig sessionConfig = new SessionConfig(true, false, false, false, false, false, true, false);
+            SessionConfig sessionConfig = new SessionConfig(true, false, false, false, false, false, true, false,
+                    bufferSize);
 
             MetadataManager.INSTANCE.init();
 
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/api/java/AsterixJavaClient.java b/asterix-app/src/main/java/edu/uci/ics/asterix/api/java/AsterixJavaClient.java
index cb786d7..30aa174 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/api/java/AsterixJavaClient.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/api/java/AsterixJavaClient.java
@@ -14,6 +14,7 @@
 import edu.uci.ics.asterix.aql.translator.AqlTranslator;
 import edu.uci.ics.asterix.common.exceptions.AsterixException;
 import edu.uci.ics.asterix.metadata.MetadataManager;
+import edu.uci.ics.asterix.result.ResultUtils;
 import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
 
@@ -63,7 +64,8 @@
         MetadataManager.INSTANCE.init();
 
         SessionConfig pc = new SessionConfig(optimize, false, printRewrittenExpressions, printLogicalPlan,
-                printOptimizedPlan, printPhysicalOpsOnly, generateBinaryRuntime, printJob);
+                printOptimizedPlan, printPhysicalOpsOnly, generateBinaryRuntime, printJob,
+                ResultUtils.DEFAULT_BUFFER_SIZE);
 
         AqlTranslator aqlTranslator = new AqlTranslator(aqlStatements, writer, pc, DisplayFormat.TEXT);
         aqlTranslator.compileAndExecute(hcc, null, false);
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java b/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
index 976ec7c..d802fc9 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
@@ -16,7 +16,6 @@
 
 import java.io.File;
 import java.io.PrintWriter;
-import java.nio.ByteBuffer;
 import java.rmi.RemoteException;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -1350,34 +1349,24 @@
                 GlobalConfig.ASTERIX_LOGGER.info(compiled.toJSON().toString(1));
                 JobId jobId = runJob(hcc, compiled, false);
 
-                JSONObject response = new JSONObject();
                 if (asyncResults) {
+                    JSONObject response = new JSONObject();
                     JSONArray handle = new JSONArray();
                     handle.put(jobId.getId());
                     handle.put(metadataProvider.getResultSetId().getId());
                     response.put("handle", handle);
+                    out.print(response);
+                    out.flush();
                 } else {
-                    ByteBuffer buffer = ByteBuffer.allocate(ResultReader.FRAME_SIZE);
+                    if (pdf == DisplayFormat.HTML) {
+                        out.println("<pre>");
+                    }
                     ResultReader resultReader = new ResultReader(hcc, hdc);
                     resultReader.open(jobId, metadataProvider.getResultSetId());
-                    buffer.clear();
-                    JSONArray results = new JSONArray();
-                    while (resultReader.read(buffer) > 0) {
-                        results.put(ResultUtils.getJSONFromBuffer(buffer, resultReader.getFrameTupleAccessor()));
-                        buffer.clear();
-                    }
-                    response.put("results", results);
-                }
-                switch (pdf) {
-                    case HTML:
-                        out.println("<pre>");
-                        ResultUtils.prettyPrintHTML(out, response);
+                    ResultUtils.writeResult(out, resultReader, sessionConfig.getBufferSize(), pdf);
+                    if (pdf == DisplayFormat.HTML) {
                         out.println("</pre>");
-                        break;
-                    case TEXT:
-                    case JSON:
-                        out.print(response);
-                        break;
+                    }
                 }
                 hcc.waitForCompletion(jobId);
             }
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/result/ResultUtils.java b/asterix-app/src/main/java/edu/uci/ics/asterix/result/ResultUtils.java
index dec3128..398692e 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/result/ResultUtils.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/result/ResultUtils.java
@@ -22,33 +22,73 @@
 import org.json.JSONException;
 import org.json.JSONObject;
 
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonGenerator;
+
+import edu.uci.ics.asterix.api.common.APIFramework.DisplayFormat;
 import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.dataflow.common.comm.util.ByteBufferInputStream;
+import edu.uci.ics.hyracks.data.std.util.ByteArrayAccessibleOutputStream;
 
 public class ResultUtils {
-    public static JSONArray getJSONFromBuffer(ByteBuffer buffer, IFrameTupleAccessor fta) throws HyracksDataException {
-        JSONArray resultRecords = new JSONArray();
-        ByteBufferInputStream bbis = new ByteBufferInputStream();
+    // Default content length is 65536.
+    public static final int DEFAULT_BUFFER_SIZE = 65536;
 
-        try {
+    // About 100 bytes of the response buffer is left for HTTP data.
+    public static int HTTP_DATA_LENTH = 100;
+
+    public static void writeResult(PrintWriter out, ResultReader resultReader, int bufferSize, DisplayFormat pdf)
+            throws IOException, HyracksDataException {
+        JsonFactory jsonFactory = new JsonFactory();
+        ByteArrayAccessibleOutputStream baos = new ByteArrayAccessibleOutputStream(bufferSize);
+        baos.reset();
+
+        JsonGenerator generator = jsonFactory.createGenerator(baos);
+
+        ByteBuffer buffer = ByteBuffer.allocate(ResultReader.FRAME_SIZE);
+        buffer.clear();
+
+        IFrameTupleAccessor fta = resultReader.getFrameTupleAccessor();
+
+        String response;
+
+        generator.writeStartObject();
+        generator.writeArrayFieldStart("results");
+        while (resultReader.read(buffer) > 0) {
             fta.reset(buffer);
             for (int tIndex = 0; tIndex < fta.getTupleCount(); tIndex++) {
                 int start = fta.getTupleStartOffset(tIndex);
                 int length = fta.getTupleEndOffset(tIndex) - start;
-                bbis.setByteBuffer(buffer, start);
-                byte[] recordBytes = new byte[length];
-                bbis.read(recordBytes, 0, length);
-                resultRecords.put(new String(recordBytes, 0, length));
+                if (pdf == DisplayFormat.HTML) {
+                    response = new String(buffer.array(), start, length);
+                    out.print(response);
+                } else {
+                    if ((baos.size() + length + HTTP_DATA_LENTH) > bufferSize) {
+                        generator.writeEndArray();
+                        generator.writeEndObject();
+                        generator.close();
+                        response = new String(baos.getByteArray(), 0, baos.size());
+                        out.print(response);
+                        out.flush();
+                        baos.reset();
+                        generator = jsonFactory.createGenerator(baos);
+                        generator.writeStartObject();
+                        generator.writeArrayFieldStart("results");
+                    }
+                    generator.writeUTF8String(buffer.array(), start, length);
+                }
             }
-        } finally {
-            try {
-                bbis.close();
-            } catch (IOException e) {
-                throw new HyracksDataException(e);
-            }
+            buffer.clear();
         }
-        return resultRecords;
+        if (pdf != DisplayFormat.HTML) {
+            generator.writeEndArray();
+            generator.writeEndObject();
+            generator.close();
+            response = new String(baos.getByteArray(), 0, baos.size());
+            out.print(response);
+        }
+
+        out.flush();
     }
 
     public static JSONObject getErrorResponse(int errorCode, String errorMessage) {
@@ -63,20 +103,4 @@
         }
         return errorResp;
     }
-
-    public static void prettyPrintHTML(PrintWriter out, JSONObject jsonResultObj) {
-        JSONArray resultsWrapper;
-        JSONArray resultsArray;
-        try {
-            resultsWrapper = jsonResultObj.getJSONArray("results");
-            for (int i = 0; i < resultsWrapper.length(); i++) {
-                resultsArray = resultsWrapper.getJSONArray(i);
-                for (int j = 0; j < resultsArray.length(); j++) {
-                    out.print(resultsArray.getString(j));
-                }
-            }
-        } catch (JSONException e) {
-            // TODO(madhusudancs): Figure out what to do when JSONException occurs while building the results.
-        }
-    }
 }