Completely rewrite the HyracksDataset client API class to use the new serializer and fetch the records buffer by before.

git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_hyracks_result_distribution@2835 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IHyracksDataset.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IHyracksDataset.java
index c737b22..23b15f6 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IHyracksDataset.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IHyracksDataset.java
@@ -14,8 +14,16 @@
  */
 package edu.uci.ics.hyracks.api.dataset;
 
+import java.io.IOException;
 import java.nio.ByteBuffer;
 
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.JobId;
+
 public interface IHyracksDataset {
-    public ByteBuffer getResults();
+    public void open(JobId jobId, ResultSetId resultSetId) throws IOException;
+
+    public byte[] getSerializedRecordDescriptor();
+
+    public int read(ByteBuffer buffer) throws HyracksDataException;
 }
diff --git a/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDataset.java b/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDataset.java
index 41b3a17..522fe2d 100644
--- a/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDataset.java
+++ b/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDataset.java
@@ -14,22 +14,18 @@
  */
 package edu.uci.ics.hyracks.client.dataset;
 
-import java.io.DataInputStream;
 import java.io.IOException;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
 import java.net.UnknownHostException;
 import java.nio.ByteBuffer;
-import java.util.List;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import edu.uci.ics.hyracks.api.channels.IInputChannel;
 import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
 import edu.uci.ics.hyracks.api.comm.NetworkAddress;
-import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.dataset.DatasetDirectoryRecord;
 import edu.uci.ics.hyracks.api.dataset.IDatasetInputChannelMonitor;
 import edu.uci.ics.hyracks.api.dataset.IHyracksDataset;
@@ -38,50 +34,158 @@
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.exceptions.HyracksException;
 import edu.uci.ics.hyracks.api.job.JobId;
-import edu.uci.ics.hyracks.api.job.JobSpecification;
 import edu.uci.ics.hyracks.client.net.ClientNetworkManager;
 import edu.uci.ics.hyracks.comm.channels.DatasetNetworkInputChannel;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import edu.uci.ics.hyracks.dataflow.common.comm.util.ByteBufferInputStream;
-import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
 
 // TODO(madhusudancs): Should this implementation be moved to edu.uci.ics.hyracks.client?
 public class HyracksDataset implements IHyracksDataset {
-    private final JobId jobId;
-
-    private final List<ResultSetId> rsIds;
-
     private final IHyracksDatasetDirectoryServiceConnection datasetDirectoryServiceConnection;
 
     private final ClientNetworkManager netManager;
 
     private final DatasetClientContext datasetClientCtx;
 
+    private JobId jobId;
+
+    private ResultSetId resultSetId;
+
     private DatasetDirectoryRecord[] knownRecords;
 
     private IDatasetInputChannelMonitor[] monitors;
 
-    public HyracksDataset(IHyracksClientConnection hcc, JobSpecification jobSpec, JobId jobId, int nReaders)
-            throws Exception {
-        this.jobId = jobId;
-        this.rsIds = jobSpec.getResultSetIds();
+    private int lastReadPartition;
 
-        NetworkAddress ddsAddress = hcc.getDatasetDirectoryServiceInfo(jobId);
+    private IDatasetInputChannelMonitor lastMonitor;
+
+    private DatasetNetworkInputChannel resultChannel;
+
+    private static int NUM_READ_BUFFERS = 1;
+
+    public HyracksDataset(IHyracksClientConnection hcc, DatasetClientContext datasetClientCtx, int nReaders)
+            throws Exception {
+        NetworkAddress ddsAddress = hcc.getDatasetDirectoryServiceInfo();
         datasetDirectoryServiceConnection = new HyracksDatasetDirectoryServiceConnection(new String(
                 ddsAddress.getIpAddress()), ddsAddress.getPort());
 
         netManager = new ClientNetworkManager(nReaders);
 
-        datasetClientCtx = new DatasetClientContext(jobSpec.getFrameSize());
+        this.datasetClientCtx = datasetClientCtx;
 
         knownRecords = null;
         monitors = null;
+        lastReadPartition = -1;
+        lastMonitor = null;
+        resultChannel = null;
     }
 
-    private void start() throws IOException {
+    @Override
+    public void open(JobId jobId, ResultSetId resultSetId) throws IOException {
+        this.jobId = jobId;
+        this.resultSetId = resultSetId;
         netManager.start();
     }
 
+    @Override
+    public byte[] getSerializedRecordDescriptor() {
+        byte[] serializedRecordDescriptor = null;
+        try {
+            serializedRecordDescriptor = datasetDirectoryServiceConnection.getDatasetSerializedRecordDescriptor(jobId,
+                    resultSetId);
+        } catch (Exception e) {
+            // TODO(madhusudancs): Decide what to do in case of error
+        }
+        return serializedRecordDescriptor;
+    }
+
+    @Override
+    public int read(ByteBuffer buffer) throws HyracksDataException {
+        ByteBuffer readBuffer;
+        int readSize = 0;
+
+        if (lastReadPartition == -1) {
+            while (knownRecords == null || knownRecords[0] == null) {
+                try {
+                    knownRecords = datasetDirectoryServiceConnection.getDatasetResultLocationsFunction(jobId,
+                            resultSetId, knownRecords);
+                    lastReadPartition = 0;
+                    resultChannel = new DatasetNetworkInputChannel(netManager,
+                            getSocketAddress(knownRecords[lastReadPartition]), jobId, lastReadPartition,
+                            NUM_READ_BUFFERS);
+                    lastMonitor = getMonitor(lastReadPartition);
+                    resultChannel.open(datasetClientCtx);
+                    resultChannel.registerMonitor(lastMonitor);
+                } catch (HyracksException e) {
+                    throw new HyracksDataException(e);
+                } catch (UnknownHostException e) {
+                    throw new HyracksDataException(e);
+                } catch (Exception e) {
+                    // Do nothing here.
+                }
+            }
+        }
+
+        while (lastMonitor.getNFramesAvailable() < 0 && !lastMonitor.eosReached()) {
+            synchronized (lastMonitor) {
+                try {
+                    lastMonitor.wait();
+                } catch (InterruptedException e) {
+                    //
+                }
+            }
+        }
+
+        while (readSize <= 0 && !((lastReadPartition == knownRecords.length - 1) && (lastMonitor.eosReached()))) {
+            while (lastMonitor.getNFramesAvailable() <= 0 && !lastMonitor.eosReached()) {
+                synchronized (lastMonitor) {
+                    try {
+                        lastMonitor.wait();
+                    } catch (InterruptedException e) {
+                        throw new HyracksDataException(e);
+                    }
+                }
+            }
+
+            if (lastMonitor.eosReached()) {
+                if ((lastReadPartition == knownRecords.length - 1)) {
+                    break;
+                } else {
+                    try {
+                        lastReadPartition++;
+                        while (knownRecords[lastReadPartition] == null) {
+                            try {
+                                knownRecords = datasetDirectoryServiceConnection.getDatasetResultLocationsFunction(
+                                        jobId, resultSetId, knownRecords);
+                            } catch (Exception e) {
+                                // Do nothing here.
+                            }
+                        }
+
+                        resultChannel = new DatasetNetworkInputChannel(netManager,
+                                getSocketAddress(knownRecords[lastReadPartition]), jobId, lastReadPartition,
+                                NUM_READ_BUFFERS);
+                        lastMonitor = getMonitor(lastReadPartition);
+                        resultChannel.open(datasetClientCtx);
+                        resultChannel.registerMonitor(lastMonitor);
+                    } catch (HyracksException e) {
+                        throw new HyracksDataException(e);
+                    } catch (UnknownHostException e) {
+                        throw new HyracksDataException(e);
+                    }
+                }
+            } else {
+                readBuffer = resultChannel.getNextBuffer();
+                if (readBuffer != null) {
+                    buffer.put(readBuffer);
+                    buffer.flip();
+                    readSize = buffer.limit();
+                    resultChannel.recycleBuffer(readBuffer);
+                }
+            }
+        }
+
+        return readSize;
+    }
+
     private boolean nullExists(DatasetDirectoryRecord[] locations) {
         if (locations == null) {
             return true;
@@ -94,7 +198,12 @@
         return false;
     }
 
-    private IDatasetInputChannelMonitor getMontior(int partition) throws HyracksException {
+    private SocketAddress getSocketAddress(DatasetDirectoryRecord addr) throws UnknownHostException {
+        NetworkAddress netAddr = addr.getNetworkAddress();
+        return new InetSocketAddress(InetAddress.getByAddress(netAddr.getIpAddress()), netAddr.getPort());
+    }
+
+    private IDatasetInputChannelMonitor getMonitor(int partition) throws HyracksException {
         if (knownRecords == null || knownRecords[partition] == null) {
             throw new HyracksException("Accessing monitors before the obtaining the corresponding addresses.");
         }
@@ -153,111 +262,4 @@
             return nAvailableFrames.get();
         }
     }
-
-    /* TODO(madhusudancs): This method is used purely during development for debugging and must be removed before
-     * shipping the code 
-     */
-    private void printBuffer(ByteBuffer buffer) {
-        String delim = ",";
-        ByteBufferInputStream bbis = new ByteBufferInputStream();
-        DataInputStream di = new DataInputStream(bbis);
-        RecordDescriptor recordDescriptor = new RecordDescriptor(new ISerializerDeserializer[] {
-                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
-                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
-                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
-                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
-                UTF8StringSerializerDeserializer.INSTANCE });
-
-        final FrameTupleAccessor frameTupleAccessor = new FrameTupleAccessor(32768, recordDescriptor);
-
-        try {
-            frameTupleAccessor.reset(buffer);
-            for (int tIndex = 0; tIndex < frameTupleAccessor.getTupleCount(); tIndex++) {
-                int start = frameTupleAccessor.getTupleStartOffset(tIndex) + frameTupleAccessor.getFieldSlotsLength();
-                bbis.setByteBuffer(buffer, start);
-                Object[] record = new Object[recordDescriptor.getFieldCount()];
-                for (int i = 0; i < record.length; ++i) {
-                    Object instance = recordDescriptor.getFields()[i].deserialize(di);
-                    if (i == 0) {
-                        System.out.write(String.valueOf(instance).getBytes());
-                    } else {
-                        System.out.write((delim + String.valueOf(instance)).getBytes());
-                    }
-                }
-                System.out.write("\n".getBytes());
-            }
-        } catch (IOException e) {
-            e.printStackTrace();
-        }
-    }
-
-    private void readResults() throws HyracksDataException {
-        ByteBuffer buffer = null;
-
-        if (knownRecords == null) {
-            return;
-        }
-        for (int i = 0; i < knownRecords.length; i++) {
-            final DatasetDirectoryRecord addr = knownRecords[i];
-            if (addr != null) {
-                try {
-                    DatasetNetworkInputChannel resultChannel = new DatasetNetworkInputChannel(netManager,
-                            getSocketAddress(addr), jobId, i, 100);
-
-                    IDatasetInputChannelMonitor monitor = getMontior(i);
-                    resultChannel.open(datasetClientCtx);
-                    resultChannel.registerMonitor(monitor);
-
-                    while (!monitor.eosReached()) {
-                        synchronized (monitor) {
-                            try {
-                                monitor.wait();
-                            } catch (InterruptedException e) {
-                                //
-                            }
-                        }
-                        buffer = resultChannel.getNextBuffer();
-
-                        if (buffer != null) {
-                            // TODO(madhusudancs): This is a development time debugging statement and should be removed
-                            printBuffer(buffer);
-
-                            resultChannel.recycleBuffer(buffer);
-                        }
-                    }
-
-                } catch (UnknownHostException e) {
-                    throw new HyracksDataException(e);
-                } catch (HyracksException e) {
-                    throw new HyracksDataException(e);
-                }
-            }
-        }
-    }
-
-    @Override
-    public ByteBuffer getResults() {
-        try {
-            ResultSetId rsId = rsIds.get(0);
-            start();
-            while (nullExists(knownRecords)) {
-                try {
-                    knownRecords = datasetDirectoryServiceConnection.getDatasetResultLocationsFunction(jobId, rsId,
-                            knownRecords);
-                    readResults();
-                } catch (Exception e) {
-                    // TODO(madhusudancs) Do something here
-                }
-            }
-        } catch (IOException e) {
-            // TODO(madhusudancs): Do something here
-        }
-
-        return null;
-    }
-
-    private SocketAddress getSocketAddress(DatasetDirectoryRecord addr) throws UnknownHostException {
-        NetworkAddress netAddr = addr.getNetworkAddress();
-        return new InetSocketAddress(InetAddress.getByAddress(netAddr.getIpAddress()), netAddr.getPort());
-    }
 }