Implement the basic ResultReader client API that uses the new result distribution infrastructure.
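
A minimal sketch of the intended call pattern (the caller shown here is an
illustration and an assumption, not part of this change; only the ResultReader
class itself is added by this patch):

    // Hypothetical caller: construct the reader, start it on its own thread,
    // then unblock it once the job id and result set id are known.
    void readResults(IHyracksClientConnection hcc, PrintWriter out, IDataFormat format,
            JobId jobId, ResultSetId resultSetId) throws Exception {
        ResultReader resultReader = new ResultReader(hcc, out);
        resultReader.setFormat(format);
        new Thread(resultReader).start();        // run() waits for notifyJobStart()
        resultReader.notifyJobStart(jobId, resultSetId);
    }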

git-svn-id: https://asterixdb.googlecode.com/svn/branches/asterix_stabilization_result_distribution@1138 eaa15691-b419-025a-1212-ee371bd00084
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/result/ResultReader.java b/asterix-app/src/main/java/edu/uci/ics/asterix/result/ResultReader.java
new file mode 100644
index 0000000..50de154
--- /dev/null
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/result/ResultReader.java
@@ -0,0 +1,185 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.result;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.asterix.formats.base.IDataFormat;
+import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.dataset.IHyracksDataset;
+import edu.uci.ics.hyracks.api.dataset.ResultSetId;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.api.util.JavaSerializationUtils;
+import edu.uci.ics.hyracks.client.dataset.DatasetClientContext;
+import edu.uci.ics.hyracks.client.dataset.HyracksDataset;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.ByteBufferInputStream;
+
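+/**
+ * Reads a query job's result frames through the Hyracks dataset API and writes
+ * them to the given PrintWriter as an HTML preformatted block, one record per line.
+ */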
+public class ResultReader implements Runnable {
+    private final DatasetClientContext datasetClientCtx;
+
+    private final IHyracksDataset hyracksDataset;
+
+    private final PrintWriter responseWriter;
+
+    private final ByteBufferInputStream bbis;
+
+    private final DataInputStream di;
+
+    private final ByteBuffer readBuffer;
+
+    private IDataFormat format;
+
+    private int queryCount;
+
+    private JobId jobId;
+
+    private ResultSetId resultSetId;
+
+    private RecordDescriptor recordDescriptor;
+
+    private FrameTupleAccessor frameTupleAccessor;
+
+    // 32KB frame size
+    private static final int FRAME_SIZE = 32768;
+
+    // Number of parallel result reader buffers
+    private static final int NUM_READERS = 1;
+
+    private static final String DELIM = "; ";
+
+    public ResultReader(IHyracksClientConnection hcc, PrintWriter responseWriter) throws Exception {
+        datasetClientCtx = new DatasetClientContext(FRAME_SIZE);
+
+        hyracksDataset = new HyracksDataset(hcc, datasetClientCtx, NUM_READERS);
+
+        this.responseWriter = responseWriter;
+
+        bbis = new ByteBufferInputStream();
+        di = new DataInputStream(bbis);
+
+        readBuffer = datasetClientCtx.allocateFrame();
+
+        queryCount = 1;
+    }
+
+    public void setFormat(IDataFormat format) {
+        this.format = format;
+    }
+
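+    /**
+     * Called once the query job has been submitted; wakes up the reader thread.
+     */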
+    public synchronized void notifyJobStart(JobId jobId, ResultSetId rsId) {
+        this.jobId = jobId;
+        this.resultSetId = rsId;
+        notifyAll();
+    }
+
+    @Override
+    public void run() {
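+        // Block until notifyJobStart() supplies the job id and result set id.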
+        synchronized (this) {
+            while (jobId == null || resultSetId == null) {
+                try {
+                    wait();
+                } catch (InterruptedException e) {
+                    // Do nothing here
+                }
+            }
+        }
+
+        try {
+            hyracksDataset.open(jobId, resultSetId);
+            byte[] serializedRecordDescriptor = hyracksDataset.getSerializedRecordDescriptor();
+
+            recordDescriptor = (RecordDescriptor) JavaSerializationUtils.deserialize(serializedRecordDescriptor, format
+                    .getSerdeProvider().getClass().getClassLoader());
+
+            frameTupleAccessor = new FrameTupleAccessor(datasetClientCtx.getFrameSize(), recordDescriptor);
+
+            int i = 0;
+
+            responseWriter.println("<H1>Result:</H1>");
+            responseWriter.println("<PRE>");
+            responseWriter.println("Query:" + queryCount++);
+
+            while (true) {
+                int size = hyracksDataset.read(readBuffer);
+
+                if (size <= 0) {
+                    break;
+                }
+                i += writeOutputLines(readBuffer);
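+                // Cap the rendered output at roughly 500 lines.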
+                if (i > 500) {
+                    responseWriter.println("...");
+                    responseWriter.println("SKIPPING THE REST OF THE RESULTS");
+                    break;
+                }
+            }
+
+            responseWriter.println();
+            responseWriter.println("</PRE>");
+        } catch (HyracksDataException e) {
+            // TODO(madhusudancs): Do something here
+            e.printStackTrace(responseWriter);
+        } catch (IOException e) {
+            // TODO(madhusudancs): Do something here
+            e.printStackTrace(responseWriter);
+        } catch (ClassNotFoundException e) {
+            // TODO(madhusudancs): Do something here
+            e.printStackTrace(responseWriter);
+        }
+    }
+
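+    /**
+     * Deserializes every tuple in the given frame and prints one line per record,
+     * with fields separated by DELIM. Returns the number of lines written.
+     */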
+    private int writeOutputLines(ByteBuffer buffer) throws HyracksDataException {
+        int lineCount = 0;
+        try {
+            frameTupleAccessor.reset(buffer);
+            for (int tIndex = 0; tIndex < frameTupleAccessor.getTupleCount(); tIndex++) {
+                int start = frameTupleAccessor.getTupleStartOffset(tIndex) + frameTupleAccessor.getFieldSlotsLength();
+                bbis.setByteBuffer(buffer, start);
+                Object[] record = new Object[recordDescriptor.getFieldCount()];
+                for (int i = 0; i < record.length; ++i) {
+                    Object instance = recordDescriptor.getFields()[i].deserialize(di);
+                    if (i == 0) {
+                        responseWriter.print(String.valueOf(instance));
+                    } else {
+                        responseWriter.print(DELIM + String.valueOf(instance));
+                    }
+                }
+                responseWriter.println();
+                lineCount++;
+            }
+        } catch (IOException e) {
+            throw new HyracksDataException(e);
+        }
+        return lineCount;
+    }
+}