Completely rewrite the HyracksDataset client API class to use the new serializer and fetch the records buffer by before.
git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_hyracks_result_distribution@2835 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IHyracksDataset.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IHyracksDataset.java
index c737b22..23b15f6 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IHyracksDataset.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IHyracksDataset.java
@@ -14,8 +14,16 @@
*/
package edu.uci.ics.hyracks.api.dataset;
+import java.io.IOException;
import java.nio.ByteBuffer;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.JobId;
+
public interface IHyracksDataset {
- public ByteBuffer getResults();
+ public void open(JobId jobId, ResultSetId resultSetId) throws IOException;
+
+ public byte[] getSerializedRecordDescriptor();
+
+ public int read(ByteBuffer buffer) throws HyracksDataException;
}
diff --git a/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDataset.java b/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDataset.java
index 41b3a17..522fe2d 100644
--- a/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDataset.java
+++ b/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDataset.java
@@ -14,22 +14,18 @@
*/
package edu.uci.ics.hyracks.client.dataset;
-import java.io.DataInputStream;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
-import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import edu.uci.ics.hyracks.api.channels.IInputChannel;
import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
import edu.uci.ics.hyracks.api.comm.NetworkAddress;
-import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.dataset.DatasetDirectoryRecord;
import edu.uci.ics.hyracks.api.dataset.IDatasetInputChannelMonitor;
import edu.uci.ics.hyracks.api.dataset.IHyracksDataset;
@@ -38,50 +34,158 @@
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
import edu.uci.ics.hyracks.api.job.JobId;
-import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.client.net.ClientNetworkManager;
import edu.uci.ics.hyracks.comm.channels.DatasetNetworkInputChannel;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import edu.uci.ics.hyracks.dataflow.common.comm.util.ByteBufferInputStream;
-import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
// TODO(madhusudancs): Should this implementation be moved to edu.uci.ics.hyracks.client?
public class HyracksDataset implements IHyracksDataset {
- private final JobId jobId;
-
- private final List<ResultSetId> rsIds;
-
private final IHyracksDatasetDirectoryServiceConnection datasetDirectoryServiceConnection;
private final ClientNetworkManager netManager;
private final DatasetClientContext datasetClientCtx;
+ private JobId jobId;
+
+ private ResultSetId resultSetId;
+
private DatasetDirectoryRecord[] knownRecords;
private IDatasetInputChannelMonitor[] monitors;
- public HyracksDataset(IHyracksClientConnection hcc, JobSpecification jobSpec, JobId jobId, int nReaders)
- throws Exception {
- this.jobId = jobId;
- this.rsIds = jobSpec.getResultSetIds();
+ private int lastReadPartition;
- NetworkAddress ddsAddress = hcc.getDatasetDirectoryServiceInfo(jobId);
+ private IDatasetInputChannelMonitor lastMonitor;
+
+ private DatasetNetworkInputChannel resultChannel;
+
+ private static int NUM_READ_BUFFERS = 1;
+
+ public HyracksDataset(IHyracksClientConnection hcc, DatasetClientContext datasetClientCtx, int nReaders)
+ throws Exception {
+ NetworkAddress ddsAddress = hcc.getDatasetDirectoryServiceInfo();
datasetDirectoryServiceConnection = new HyracksDatasetDirectoryServiceConnection(new String(
ddsAddress.getIpAddress()), ddsAddress.getPort());
netManager = new ClientNetworkManager(nReaders);
- datasetClientCtx = new DatasetClientContext(jobSpec.getFrameSize());
+ this.datasetClientCtx = datasetClientCtx;
knownRecords = null;
monitors = null;
+ lastReadPartition = -1;
+ lastMonitor = null;
+ resultChannel = null;
}
- private void start() throws IOException {
+ @Override
+ public void open(JobId jobId, ResultSetId resultSetId) throws IOException {
+ this.jobId = jobId;
+ this.resultSetId = resultSetId;
netManager.start();
}
+ @Override
+ public byte[] getSerializedRecordDescriptor() {
+ byte[] serializedRecordDescriptor = null;
+ try {
+ serializedRecordDescriptor = datasetDirectoryServiceConnection.getDatasetSerializedRecordDescriptor(jobId,
+ resultSetId);
+ } catch (Exception e) {
+ // TODO(madhusudancs): Decide what to do in case of error
+ }
+ return serializedRecordDescriptor;
+ }
+
+ @Override
+ public int read(ByteBuffer buffer) throws HyracksDataException {
+ ByteBuffer readBuffer;
+ int readSize = 0;
+
+ if (lastReadPartition == -1) {
+ while (knownRecords == null || knownRecords[0] == null) {
+ try {
+ knownRecords = datasetDirectoryServiceConnection.getDatasetResultLocationsFunction(jobId,
+ resultSetId, knownRecords);
+ lastReadPartition = 0;
+ resultChannel = new DatasetNetworkInputChannel(netManager,
+ getSocketAddress(knownRecords[lastReadPartition]), jobId, lastReadPartition,
+ NUM_READ_BUFFERS);
+ lastMonitor = getMonitor(lastReadPartition);
+ resultChannel.open(datasetClientCtx);
+ resultChannel.registerMonitor(lastMonitor);
+ } catch (HyracksException e) {
+ throw new HyracksDataException(e);
+ } catch (UnknownHostException e) {
+ throw new HyracksDataException(e);
+ } catch (Exception e) {
+ // Do nothing here.
+ }
+ }
+ }
+
+ while (lastMonitor.getNFramesAvailable() < 0 && !lastMonitor.eosReached()) {
+ synchronized (lastMonitor) {
+ try {
+ lastMonitor.wait();
+ } catch (InterruptedException e) {
+ //
+ }
+ }
+ }
+
+ while (readSize <= 0 && !((lastReadPartition == knownRecords.length - 1) && (lastMonitor.eosReached()))) {
+ while (lastMonitor.getNFramesAvailable() <= 0 && !lastMonitor.eosReached()) {
+ synchronized (lastMonitor) {
+ try {
+ lastMonitor.wait();
+ } catch (InterruptedException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+ }
+
+ if (lastMonitor.eosReached()) {
+ if ((lastReadPartition == knownRecords.length - 1)) {
+ break;
+ } else {
+ try {
+ lastReadPartition++;
+ while (knownRecords[lastReadPartition] == null) {
+ try {
+ knownRecords = datasetDirectoryServiceConnection.getDatasetResultLocationsFunction(
+ jobId, resultSetId, knownRecords);
+ } catch (Exception e) {
+ // Do nothing here.
+ }
+ }
+
+ resultChannel = new DatasetNetworkInputChannel(netManager,
+ getSocketAddress(knownRecords[lastReadPartition]), jobId, lastReadPartition,
+ NUM_READ_BUFFERS);
+ lastMonitor = getMonitor(lastReadPartition);
+ resultChannel.open(datasetClientCtx);
+ resultChannel.registerMonitor(lastMonitor);
+ } catch (HyracksException e) {
+ throw new HyracksDataException(e);
+ } catch (UnknownHostException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+ } else {
+ readBuffer = resultChannel.getNextBuffer();
+ if (readBuffer != null) {
+ buffer.put(readBuffer);
+ buffer.flip();
+ readSize = buffer.limit();
+ resultChannel.recycleBuffer(readBuffer);
+ }
+ }
+ }
+
+ return readSize;
+ }
+
private boolean nullExists(DatasetDirectoryRecord[] locations) {
if (locations == null) {
return true;
@@ -94,7 +198,12 @@
return false;
}
- private IDatasetInputChannelMonitor getMontior(int partition) throws HyracksException {
+ private SocketAddress getSocketAddress(DatasetDirectoryRecord addr) throws UnknownHostException {
+ NetworkAddress netAddr = addr.getNetworkAddress();
+ return new InetSocketAddress(InetAddress.getByAddress(netAddr.getIpAddress()), netAddr.getPort());
+ }
+
+ private IDatasetInputChannelMonitor getMonitor(int partition) throws HyracksException {
if (knownRecords == null || knownRecords[partition] == null) {
throw new HyracksException("Accessing monitors before the obtaining the corresponding addresses.");
}
@@ -153,111 +262,4 @@
return nAvailableFrames.get();
}
}
-
- /* TODO(madhusudancs): This method is used purely during development for debugging and must be removed before
- * shipping the code
- */
- private void printBuffer(ByteBuffer buffer) {
- String delim = ",";
- ByteBufferInputStream bbis = new ByteBufferInputStream();
- DataInputStream di = new DataInputStream(bbis);
- RecordDescriptor recordDescriptor = new RecordDescriptor(new ISerializerDeserializer[] {
- UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
- UTF8StringSerializerDeserializer.INSTANCE });
-
- final FrameTupleAccessor frameTupleAccessor = new FrameTupleAccessor(32768, recordDescriptor);
-
- try {
- frameTupleAccessor.reset(buffer);
- for (int tIndex = 0; tIndex < frameTupleAccessor.getTupleCount(); tIndex++) {
- int start = frameTupleAccessor.getTupleStartOffset(tIndex) + frameTupleAccessor.getFieldSlotsLength();
- bbis.setByteBuffer(buffer, start);
- Object[] record = new Object[recordDescriptor.getFieldCount()];
- for (int i = 0; i < record.length; ++i) {
- Object instance = recordDescriptor.getFields()[i].deserialize(di);
- if (i == 0) {
- System.out.write(String.valueOf(instance).getBytes());
- } else {
- System.out.write((delim + String.valueOf(instance)).getBytes());
- }
- }
- System.out.write("\n".getBytes());
- }
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
-
- private void readResults() throws HyracksDataException {
- ByteBuffer buffer = null;
-
- if (knownRecords == null) {
- return;
- }
- for (int i = 0; i < knownRecords.length; i++) {
- final DatasetDirectoryRecord addr = knownRecords[i];
- if (addr != null) {
- try {
- DatasetNetworkInputChannel resultChannel = new DatasetNetworkInputChannel(netManager,
- getSocketAddress(addr), jobId, i, 100);
-
- IDatasetInputChannelMonitor monitor = getMontior(i);
- resultChannel.open(datasetClientCtx);
- resultChannel.registerMonitor(monitor);
-
- while (!monitor.eosReached()) {
- synchronized (monitor) {
- try {
- monitor.wait();
- } catch (InterruptedException e) {
- //
- }
- }
- buffer = resultChannel.getNextBuffer();
-
- if (buffer != null) {
- // TODO(madhusudancs): This is a development time debugging statement and should be removed
- printBuffer(buffer);
-
- resultChannel.recycleBuffer(buffer);
- }
- }
-
- } catch (UnknownHostException e) {
- throw new HyracksDataException(e);
- } catch (HyracksException e) {
- throw new HyracksDataException(e);
- }
- }
- }
- }
-
- @Override
- public ByteBuffer getResults() {
- try {
- ResultSetId rsId = rsIds.get(0);
- start();
- while (nullExists(knownRecords)) {
- try {
- knownRecords = datasetDirectoryServiceConnection.getDatasetResultLocationsFunction(jobId, rsId,
- knownRecords);
- readResults();
- } catch (Exception e) {
- // TODO(madhusudancs) Do something here
- }
- }
- } catch (IOException e) {
- // TODO(madhusudancs): Do something here
- }
-
- return null;
- }
-
- private SocketAddress getSocketAddress(DatasetDirectoryRecord addr) throws UnknownHostException {
- NetworkAddress netAddr = addr.getNetworkAddress();
- return new InetSocketAddress(InetAddress.getByAddress(netAddr.getIpAddress()), netAddr.getPort());
- }
}