Remove all the serialization code from the result writer client library since it is now implemented by a utility.
git-svn-id: https://asterixdb.googlecode.com/svn/branches/asterix_stabilization_result_distribution@1153 eaa15691-b419-025a-1212-ee371bd00084
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/result/ResultReader.java b/asterix-app/src/main/java/edu/uci/ics/asterix/result/ResultReader.java
index 50de154..3ae7615 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/result/ResultReader.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/result/ResultReader.java
@@ -14,9 +14,7 @@
*/
package edu.uci.ics.asterix.result;
-import java.io.DataInputStream;
import java.io.IOException;
-import java.io.PrintWriter;
import java.nio.ByteBuffer;
import edu.uci.ics.asterix.formats.base.IDataFormat;
@@ -30,143 +28,50 @@
import edu.uci.ics.hyracks.client.dataset.DatasetClientContext;
import edu.uci.ics.hyracks.client.dataset.HyracksDataset;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import edu.uci.ics.hyracks.dataflow.common.comm.util.ByteBufferInputStream;
-public class ResultReader implements Runnable {
+public class ResultReader {
private final DatasetClientContext datasetClientCtx;
private final IHyracksDataset hyracksDataset;
- private final PrintWriter responseWriter;
-
- private final ByteBufferInputStream bbis;
-
- private final DataInputStream di;
-
- private final ByteBuffer readBuffer;
-
- private IDataFormat format;
-
- private int queryCount;
-
- private JobId jobId;
-
- private ResultSetId resultSetId;
+ private final IDataFormat format;
private RecordDescriptor recordDescriptor;
private FrameTupleAccessor frameTupleAccessor;
- // 32K buffer size;
- private static final int FRAME_SIZE = 32768;
-
// Number of parallel result reader buffers
private static final int NUM_READERS = 1;
- private static final String DELIM = "; ";
+ // 32K buffer size;
+ public static final int FRAME_SIZE = 32768;
- public ResultReader(IHyracksClientConnection hcc, PrintWriter responseWriter) throws Exception {
- datasetClientCtx = new DatasetClientContext(FRAME_SIZE);
-
- hyracksDataset = new HyracksDataset(hcc, datasetClientCtx, NUM_READERS);
-
- this.responseWriter = responseWriter;
-
- bbis = new ByteBufferInputStream();
- di = new DataInputStream(bbis);
-
- readBuffer = datasetClientCtx.allocateFrame();
-
- queryCount = 1;
- }
-
- public void setFormat(IDataFormat format) {
+ public ResultReader(IHyracksClientConnection hcc, IDataFormat format) throws Exception {
this.format = format;
+
+ datasetClientCtx = new DatasetClientContext(FRAME_SIZE);
+ hyracksDataset = new HyracksDataset(hcc, datasetClientCtx, NUM_READERS);
}
- public synchronized void notifyJobStart(JobId jobId, ResultSetId rsId) {
- this.jobId = jobId;
- this.resultSetId = rsId;
- notifyAll();
+ public void open(JobId jobId, ResultSetId resultSetId) throws IOException, ClassNotFoundException {
+ hyracksDataset.open(jobId, resultSetId);
+ byte[] serializedRecordDescriptor = hyracksDataset.getSerializedRecordDescriptor();
+
+ recordDescriptor = (RecordDescriptor) JavaSerializationUtils.deserialize(serializedRecordDescriptor, format
+ .getSerdeProvider().getClass().getClassLoader());
+
+ frameTupleAccessor = new FrameTupleAccessor(datasetClientCtx.getFrameSize(), recordDescriptor);
}
- @Override
- public void run() {
- synchronized (this) {
- while (jobId == null || resultSetId == null) {
- try {
- wait();
- } catch (InterruptedException e) {
- // Do nothing here
- }
- }
- }
-
- try {
- hyracksDataset.open(jobId, resultSetId);
- byte[] serializedRecordDescriptor = hyracksDataset.getSerializedRecordDescriptor();
-
- recordDescriptor = (RecordDescriptor) JavaSerializationUtils.deserialize(serializedRecordDescriptor, format
- .getSerdeProvider().getClass().getClassLoader());
-
- frameTupleAccessor = new FrameTupleAccessor(datasetClientCtx.getFrameSize(), recordDescriptor);
-
- int i = 0;
-
- responseWriter.println("<H1>Result:</H1>");
- responseWriter.println("<PRE>");
- responseWriter.println("Query:" + queryCount++);
-
- while (true) {
- int size = hyracksDataset.read(readBuffer);
-
- if (size <= 0) {
- break;
- }
- i += writeOutputLines(readBuffer);
- if (i > 500) {
- responseWriter.println("...");
- responseWriter.println("SKIPPING THE REST OF THE RESULTS");
- break;
- }
- }
-
- responseWriter.println();
- responseWriter.println("</PRE>");
- } catch (HyracksDataException e) {
- // TODO(madhusudancs): Do something here
- e.printStackTrace(responseWriter);
- } catch (IOException e) {
- // TODO(madhusudancs): Do something here
- e.printStackTrace(responseWriter);
- } catch (ClassNotFoundException e) {
- // TODO(madhusudancs): Do something here
- e.printStackTrace(responseWriter);
- }
+ public int read(ByteBuffer buffer) throws HyracksDataException {
+ return hyracksDataset.read(buffer);
}
- private int writeOutputLines(ByteBuffer buffer) throws HyracksDataException {
- int lineCount = 0;
- try {
- frameTupleAccessor.reset(buffer);
- for (int tIndex = 0; tIndex < frameTupleAccessor.getTupleCount(); tIndex++) {
- int start = frameTupleAccessor.getTupleStartOffset(tIndex) + frameTupleAccessor.getFieldSlotsLength();
- bbis.setByteBuffer(buffer, start);
- Object[] record = new Object[recordDescriptor.getFieldCount()];
- for (int i = 0; i < record.length; ++i) {
- Object instance = recordDescriptor.getFields()[i].deserialize(di);
- if (i == 0) {
- responseWriter.print(String.valueOf(instance));
- } else {
- responseWriter.print(DELIM + String.valueOf(instance));
- }
- }
- responseWriter.println();
- lineCount++;
- }
- } catch (IOException e) {
- throw new HyracksDataException(e);
- }
- return lineCount;
+ public FrameTupleAccessor getFrameTupleAccessor() {
+ return frameTupleAccessor;
+ }
+
+ public RecordDescriptor getRecordDescriptor() {
+ return recordDescriptor;
}
}