Remove all the serialization code from the result writer client library since it is now implemented by a utility.

git-svn-id: https://asterixdb.googlecode.com/svn/branches/asterix_stabilization_result_distribution@1153 eaa15691-b419-025a-1212-ee371bd00084
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/result/ResultReader.java b/asterix-app/src/main/java/edu/uci/ics/asterix/result/ResultReader.java
index 50de154..3ae7615 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/result/ResultReader.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/result/ResultReader.java
@@ -14,9 +14,7 @@
  */
 package edu.uci.ics.asterix.result;
 
-import java.io.DataInputStream;
 import java.io.IOException;
-import java.io.PrintWriter;
 import java.nio.ByteBuffer;
 
 import edu.uci.ics.asterix.formats.base.IDataFormat;
@@ -30,143 +28,50 @@
 import edu.uci.ics.hyracks.client.dataset.DatasetClientContext;
 import edu.uci.ics.hyracks.client.dataset.HyracksDataset;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import edu.uci.ics.hyracks.dataflow.common.comm.util.ByteBufferInputStream;
 
-public class ResultReader implements Runnable {
+public class ResultReader {
     private final DatasetClientContext datasetClientCtx;
 
     private final IHyracksDataset hyracksDataset;
 
-    private final PrintWriter responseWriter;
-
-    private final ByteBufferInputStream bbis;
-
-    private final DataInputStream di;
-
-    private final ByteBuffer readBuffer;
-
-    private IDataFormat format;
-
-    private int queryCount;
-
-    private JobId jobId;
-
-    private ResultSetId resultSetId;
+    private final IDataFormat format;
 
     private RecordDescriptor recordDescriptor;
 
     private FrameTupleAccessor frameTupleAccessor;
 
-    // 32K buffer size;
-    private static final int FRAME_SIZE = 32768;
-
     // Number of parallel result reader buffers
     private static final int NUM_READERS = 1;
 
-    private static final String DELIM = "; ";
+    // 32K buffer size;
+    public static final int FRAME_SIZE = 32768;
 
-    public ResultReader(IHyracksClientConnection hcc, PrintWriter responseWriter) throws Exception {
-        datasetClientCtx = new DatasetClientContext(FRAME_SIZE);
-
-        hyracksDataset = new HyracksDataset(hcc, datasetClientCtx, NUM_READERS);
-
-        this.responseWriter = responseWriter;
-
-        bbis = new ByteBufferInputStream();
-        di = new DataInputStream(bbis);
-
-        readBuffer = datasetClientCtx.allocateFrame();
-
-        queryCount = 1;
-    }
-
-    public void setFormat(IDataFormat format) {
+    public ResultReader(IHyracksClientConnection hcc, IDataFormat format) throws Exception {
         this.format = format;
+
+        datasetClientCtx = new DatasetClientContext(FRAME_SIZE);
+        hyracksDataset = new HyracksDataset(hcc, datasetClientCtx, NUM_READERS);
     }
 
-    public synchronized void notifyJobStart(JobId jobId, ResultSetId rsId) {
-        this.jobId = jobId;
-        this.resultSetId = rsId;
-        notifyAll();
+    public void open(JobId jobId, ResultSetId resultSetId) throws IOException, ClassNotFoundException {
+        hyracksDataset.open(jobId, resultSetId);
+        byte[] serializedRecordDescriptor = hyracksDataset.getSerializedRecordDescriptor();
+
+        recordDescriptor = (RecordDescriptor) JavaSerializationUtils.deserialize(serializedRecordDescriptor, format
+                .getSerdeProvider().getClass().getClassLoader());
+
+        frameTupleAccessor = new FrameTupleAccessor(datasetClientCtx.getFrameSize(), recordDescriptor);
     }
 
-    @Override
-    public void run() {
-        synchronized (this) {
-            while (jobId == null || resultSetId == null) {
-                try {
-                    wait();
-                } catch (InterruptedException e) {
-                    // Do nothing here
-                }
-            }
-        }
-
-        try {
-            hyracksDataset.open(jobId, resultSetId);
-            byte[] serializedRecordDescriptor = hyracksDataset.getSerializedRecordDescriptor();
-
-            recordDescriptor = (RecordDescriptor) JavaSerializationUtils.deserialize(serializedRecordDescriptor, format
-                    .getSerdeProvider().getClass().getClassLoader());
-
-            frameTupleAccessor = new FrameTupleAccessor(datasetClientCtx.getFrameSize(), recordDescriptor);
-
-            int i = 0;
-
-            responseWriter.println("<H1>Result:</H1>");
-            responseWriter.println("<PRE>");
-            responseWriter.println("Query:" + queryCount++);
-
-            while (true) {
-                int size = hyracksDataset.read(readBuffer);
-
-                if (size <= 0) {
-                    break;
-                }
-                i += writeOutputLines(readBuffer);
-                if (i > 500) {
-                    responseWriter.println("...");
-                    responseWriter.println("SKIPPING THE REST OF THE RESULTS");
-                    break;
-                }
-            }
-
-            responseWriter.println();
-            responseWriter.println("</PRE>");
-        } catch (HyracksDataException e) {
-            // TODO(madhusudancs): Do something here
-            e.printStackTrace(responseWriter);
-        } catch (IOException e) {
-            // TODO(madhusudancs): Do something here
-            e.printStackTrace(responseWriter);
-        } catch (ClassNotFoundException e) {
-            // TODO(madhusudancs): Do something here
-            e.printStackTrace(responseWriter);
-        }
+    public int read(ByteBuffer buffer) throws HyracksDataException {
+        return hyracksDataset.read(buffer);
     }
 
-    private int writeOutputLines(ByteBuffer buffer) throws HyracksDataException {
-        int lineCount = 0;
-        try {
-            frameTupleAccessor.reset(buffer);
-            for (int tIndex = 0; tIndex < frameTupleAccessor.getTupleCount(); tIndex++) {
-                int start = frameTupleAccessor.getTupleStartOffset(tIndex) + frameTupleAccessor.getFieldSlotsLength();
-                bbis.setByteBuffer(buffer, start);
-                Object[] record = new Object[recordDescriptor.getFieldCount()];
-                for (int i = 0; i < record.length; ++i) {
-                    Object instance = recordDescriptor.getFields()[i].deserialize(di);
-                    if (i == 0) {
-                        responseWriter.print(String.valueOf(instance));
-                    } else {
-                        responseWriter.print(DELIM + String.valueOf(instance));
-                    }
-                }
-                responseWriter.println();
-                lineCount++;
-            }
-        } catch (IOException e) {
-            throw new HyracksDataException(e);
-        }
-        return lineCount;
+    public FrameTupleAccessor getFrameTupleAccessor() {
+        return frameTupleAccessor;
+    }
+
+    public RecordDescriptor getRecordDescriptor() {
+        return recordDescriptor;
     }
 }