Create a new project called hyracks-client and move all the dataset api implementation classes into this new project.

git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_hyracks_result_distribution@2507 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks/hyracks-client/pom.xml b/hyracks/hyracks-client/pom.xml
new file mode 100644
index 0000000..4bd7c1a
--- /dev/null
+++ b/hyracks/hyracks-client/pom.xml
@@ -0,0 +1,52 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <artifactId>hyracks-client</artifactId>
+  <name>hyracks-client</name>
+  <parent>
+    <groupId>edu.uci.ics.hyracks</groupId>
+    <artifactId>hyracks</artifactId>
+    <version>0.2.3-SNAPSHOT</version>
+  </parent>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-compiler-plugin</artifactId>
+        <version>2.0.2</version>
+        <configuration>
+          <source>1.6</source>
+          <target>1.6</target>
+        </configuration>
+      </plugin>
+    </plugins>
+  </build>
+  <dependencies>
+    <dependency>
+      <groupId>edu.uci.ics.hyracks</groupId>
+      <artifactId>hyracks-api</artifactId>
+      <version>0.2.3-SNAPSHOT</version>
+    </dependency>
+    <!-- Declared explicitly: the dataset client classes import edu.uci.ics.hyracks.ipc directly. -->
+    <dependency>
+      <groupId>edu.uci.ics.hyracks</groupId>
+      <artifactId>hyracks-ipc</artifactId>
+      <version>0.2.3-SNAPSHOT</version>
+    </dependency>
+    <dependency>
+      <groupId>edu.uci.ics.hyracks</groupId>
+      <artifactId>hyracks-net</artifactId>
+      <version>0.2.3-SNAPSHOT</version>
+    </dependency>
+    <dependency>
+      <groupId>edu.uci.ics.hyracks</groupId>
+      <artifactId>hyracks-comm</artifactId>
+      <version>0.2.3-SNAPSHOT</version>
+    </dependency>
+    <dependency>
+      <groupId>edu.uci.ics.hyracks</groupId>
+      <artifactId>hyracks-dataflow-common</artifactId>
+      <version>0.2.3-SNAPSHOT</version>
+    </dependency>
+  </dependencies>
+</project>
diff --git a/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDataset.java b/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDataset.java
new file mode 100644
index 0000000..2248320
--- /dev/null
+++ b/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDataset.java
@@ -0,0 +1,311 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.client.dataset;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.net.UnknownHostException;
+import java.nio.ByteBuffer;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.hyracks.api.comm.NetworkAddress;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.dataset.IDatasetInputChannel;
+import edu.uci.ics.hyracks.api.dataset.IDatasetInputChannelMonitor;
+import edu.uci.ics.hyracks.api.dataset.IHyracksDataset;
+import edu.uci.ics.hyracks.api.dataset.IHyracksDatasetDirectoryServiceConnection;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.client.ClientNetworkManager;
+import edu.uci.ics.hyracks.client.DatasetNetworkInputChannel;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.ByteBufferInputStream;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
+
+// TODO(madhusudancs): Should this implementation be moved to edu.uci.ics.hyracks.client?
+public class HyracksDataset implements IHyracksDataset {
+    private static final Logger LOGGER = Logger.getLogger(HyracksDataset.class.getName());
+
+    // TODO: we should probably allow clients to specify this. 32K is the size for now.
+    private static final int FRAME_SIZE = 32 * 1024;
+
+    private final JobId jobId;
+    private final IHyracksDatasetDirectoryServiceConnection datasetDirectoryServiceConnection;
+    private final ClientNetworkManager netManager;
+
+    // Result location of each partition; the array or single entries are null until they have been looked up.
+    private NetworkAddress[] knownLocations;
+
+    // One monitor per partition, created on demand by getMonitor().
+    private IDatasetInputChannelMonitor[] monitors;
+
+    /**
+     * Creates a reader for the results of the given job.
+     *
+     * @param jobId job whose result partitions are read
+     * @param ddsc connection used to look up the network locations of the result partitions
+     * @param nReaders passed on to the {@link ClientNetworkManager} constructor
+     * @throws Exception if the network manager cannot be created
+     */
+    public HyracksDataset(JobId jobId, IHyracksDatasetDirectoryServiceConnection ddsc, int nReaders) throws Exception {
+        this.jobId = jobId;
+        this.datasetDirectoryServiceConnection = ddsc;
+        netManager = new ClientNetworkManager(nReaders);
+        knownLocations = null;
+        monitors = null;
+    }
+
+    /** Starts the client network manager. */
+    private void start() throws IOException {
+        netManager.start();
+    }
+
+    /** Returns true when the array itself is null or at least one of its entries is null. */
+    private boolean nullExists(NetworkAddress[] locations) {
+        if (locations == null) {
+            return true;
+        }
+        for (NetworkAddress location : locations) {
+            if (location == null) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    /**
+     * Returns the monitor of the given partition, creating it on first use.
+     *
+     * @throws HyracksException if the network address of the partition is not known yet
+     */
+    private IDatasetInputChannelMonitor getMonitor(int partition) throws HyracksException {
+        if (knownLocations == null || partition < 0 || partition >= knownLocations.length
+                || knownLocations[partition] == null) {
+            throw new HyracksException("Accessing the monitor of partition " + partition
+                    + " before obtaining the corresponding address.");
+        }
+        if (monitors == null) {
+            monitors = new IDatasetInputChannelMonitor[knownLocations.length];
+        }
+        if (monitors[partition] == null) {
+            monitors[partition] = new DatasetInputChannelMonitor();
+        }
+        return monitors[partition];
+    }
+
+    /** Records the notifications sent by an input channel and wakes up all waiting threads on every change. */
+    private static class DatasetInputChannelMonitor implements IDatasetInputChannelMonitor {
+        // Total number of frames announced so far; it is never decremented.
+        private final AtomicInteger nAvailableFrames;
+
+        private final AtomicBoolean eos;
+
+        private final AtomicBoolean failed;
+
+        public DatasetInputChannelMonitor() {
+            nAvailableFrames = new AtomicInteger(0);
+            eos = new AtomicBoolean(false);
+            failed = new AtomicBoolean(false);
+        }
+
+        @Override
+        public synchronized void notifyFailure(IDatasetInputChannel channel) {
+            failed.set(true);
+            notifyAll();
+        }
+
+        @Override
+        public synchronized void notifyDataAvailability(IDatasetInputChannel channel, int nFrames) {
+            nAvailableFrames.addAndGet(nFrames);
+            notifyAll();
+        }
+
+        @Override
+        public synchronized void notifyEndOfStream(IDatasetInputChannel channel) {
+            eos.set(true);
+            notifyAll();
+        }
+
+        @Override
+        public synchronized boolean eosReached() {
+            return eos.get();
+        }
+
+        @Override
+        public synchronized boolean failed() {
+            return failed.get();
+        }
+
+        @Override
+        public synchronized int getNFramesAvailable() {
+            return nAvailableFrames.get();
+        }
+    }
+
+    /* TODO(madhusudancs): This method is used purely during development for debugging and must be removed before
+     * shipping the code
+     */
+    private void printBuffer(ByteBuffer buffer) {
+        String delim = ",";
+        ByteBufferInputStream bbis = new ByteBufferInputStream();
+        DataInputStream di = new DataInputStream(bbis);
+        RecordDescriptor recordDescriptor = new RecordDescriptor(new ISerializerDeserializer[] {
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE });
+
+        final FrameTupleAccessor frameTupleAccessor = new FrameTupleAccessor(FRAME_SIZE, recordDescriptor);
+
+        try {
+            frameTupleAccessor.reset(buffer);
+            for (int tIndex = 0; tIndex < frameTupleAccessor.getTupleCount(); tIndex++) {
+                int start = frameTupleAccessor.getTupleStartOffset(tIndex)
+                        + frameTupleAccessor.getFieldSlotsLength();
+                bbis.setByteBuffer(buffer, start);
+                Object[] record = new Object[recordDescriptor.getFieldCount()];
+                for (int i = 0; i < record.length; ++i) {
+                    Object instance = recordDescriptor.getFields()[i].deserialize(di);
+                    if (i == 0) {
+                        System.out.write(String.valueOf(instance).getBytes());
+                    } else {
+                        System.out.write((delim + String.valueOf(instance)).getBytes());
+                    }
+                }
+                System.out.write("\n".getBytes());
+            }
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+    }
+
+    /**
+     * Reads every partition whose location is known and that has not reached end-of-stream yet.
+     *
+     * @throws HyracksDataException if a partition cannot be read
+     */
+    private void readResults() throws HyracksDataException {
+        if (knownLocations == null) {
+            return;
+        }
+        for (int i = 0; i < knownLocations.length; i++) {
+            if (knownLocations[i] == null) {
+                continue;
+            }
+            try {
+                IDatasetInputChannelMonitor monitor = getMonitor(i);
+                // A partition at end-of-stream was consumed by an earlier pass; do not open another channel to it.
+                if (!monitor.eosReached()) {
+                    readPartition(i, knownLocations[i], monitor);
+                }
+            } catch (UnknownHostException e) {
+                throw new HyracksDataException(e);
+            } catch (HyracksDataException e) {
+                throw e;
+            } catch (HyracksException e) {
+                throw new HyracksDataException(e);
+            }
+        }
+    }
+
+    /**
+     * Opens a channel to one partition and consumes its frames until the monitor reports end-of-stream.
+     *
+     * @throws HyracksException if the channel cannot be read, reports a failure, or the thread is interrupted
+     * @throws UnknownHostException if the raw IP address of the partition is of illegal length
+     */
+    private void readPartition(int partition, NetworkAddress addr, IDatasetInputChannelMonitor monitor)
+            throws HyracksException, UnknownHostException {
+        DatasetNetworkInputChannel resultChannel = new DatasetNetworkInputChannel(netManager,
+                getSocketAddress(addr), jobId, partition, 100);
+        // NOTE(review): notifications sent between open() and registerMonitor() may be missed; confirm whether the
+        // channel allows registering the monitor before it is opened.
+        resultChannel.open(FRAME_SIZE);
+        resultChannel.registerMonitor(monitor);
+
+        int framesSeen = 0;
+        boolean eos = false;
+        while (true) {
+            // Drain what has been delivered so far. Assumes getNextBuffer() returns null when nothing is queued and
+            // that a frame is queued before it is announced to the monitor - TODO confirm.
+            ByteBuffer buffer = resultChannel.getNextBuffer();
+            while (buffer != null) {
+                // TODO(madhusudancs): This is a development time debugging statement and should be removed
+                printBuffer(buffer);
+                // Hand the buffer back only after its content has been used.
+                resultChannel.recycleBuffer(buffer);
+                buffer = resultChannel.getNextBuffer();
+            }
+            if (eos) {
+                return;
+            }
+            synchronized (monitor) {
+                // Re-check the state under the lock so that no notification is lost and spurious wake-ups are
+                // harmless.
+                while (monitor.getNFramesAvailable() == framesSeen && !monitor.eosReached() && !monitor.failed()) {
+                    try {
+                        monitor.wait();
+                    } catch (InterruptedException e) {
+                        // Keep the interrupt visible to the caller instead of silently resuming the wait.
+                        Thread.currentThread().interrupt();
+                        throw new HyracksDataException(e);
+                    }
+                }
+                if (monitor.failed()) {
+                    throw new HyracksDataException("Reading partition " + partition + " of job " + jobId
+                            + " failed.");
+                }
+                framesSeen = monitor.getNFramesAvailable();
+                eos = monitor.eosReached();
+            }
+        }
+    }
+
+    /**
+     * Starts the network manager, then keeps asking the directory service for result locations and reading the
+     * partitions that became known, until the locations of all partitions are known.
+     *
+     * @return currently always null; the frames that are read are only printed to standard output
+     */
+    @Override
+    public ByteBuffer getResults() {
+        try {
+            start();
+            while (nullExists(knownLocations)) {
+                knownLocations = datasetDirectoryServiceConnection.getDatasetResultLocationsFunction(jobId,
+                        knownLocations);
+                readResults();
+            }
+        } catch (Exception e) {
+            // Retrying here would spin forever on a persistent error, so report the failure and give up.
+            LOGGER.log(Level.SEVERE, "Failed to read the results of job " + jobId, e);
+        }
+        return null;
+    }
+
+    /** Converts a Hyracks network address into a socket address. */
+    private SocketAddress getSocketAddress(NetworkAddress addr) throws UnknownHostException {
+        return new InetSocketAddress(InetAddress.getByAddress(addr.getIpAddress()), addr.getPort());
+    }
+}
diff --git a/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDatasetDirectoryServiceConnection.java b/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDatasetDirectoryServiceConnection.java
new file mode 100644
index 0000000..3907709
--- /dev/null
+++ b/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDatasetDirectoryServiceConnection.java
@@ -0,0 +1,61 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.client.dataset;
+
+import java.net.InetSocketAddress;
+
+import edu.uci.ics.hyracks.api.comm.NetworkAddress;
+import edu.uci.ics.hyracks.api.dataset.IHyracksDatasetDirectoryServiceConnection;
+import edu.uci.ics.hyracks.api.dataset.IHyracksDatasetDirectoryServiceInterface;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.ipc.api.IIPCHandle;
+import edu.uci.ics.hyracks.ipc.api.RPCInterface;
+import edu.uci.ics.hyracks.ipc.impl.IPCSystem;
+import edu.uci.ics.hyracks.ipc.impl.JavaSerializationBasedPayloadSerializerDeserializer;
+
+//TODO(madhusudancs): Should this implementation be moved to edu.uci.ics.hyracks.client?
+/**
+ * Client-side connection to the dataset directory service. It starts its own IPC system and forwards lookups through
+ * a remote proxy.
+ */
+public class HyracksDatasetDirectoryServiceConnection implements IHyracksDatasetDirectoryServiceConnection {
+    private final IPCSystem ipc;
+    private final IHyracksDatasetDirectoryServiceInterface ddsi;
+
+    /**
+     * Starts a local IPC system and obtains a handle to the directory service at the given address.
+     *
+     * @param ddsHost host name of the dataset directory service
+     * @param ddsPort port of the dataset directory service
+     * @throws Exception if the IPC system cannot be started or the handle cannot be obtained
+     */
+    public HyracksDatasetDirectoryServiceConnection(String ddsHost, int ddsPort) throws Exception {
+        final RPCInterface rpcInterface = new RPCInterface();
+        final InetSocketAddress localAddress = new InetSocketAddress(0);
+        this.ipc = new IPCSystem(localAddress, rpcInterface, new JavaSerializationBasedPayloadSerializerDeserializer());
+        this.ipc.start();
+
+        final InetSocketAddress serviceAddress = new InetSocketAddress(ddsHost, ddsPort);
+        final IIPCHandle serviceHandle = this.ipc.getHandle(serviceAddress);
+        this.ddsi = new HyracksDatasetDirectoryServiceInterfaceRemoteProxy(serviceHandle, rpcInterface);
+    }
+
+    /** Delegates the lookup to the remote directory service. */
+    @Override
+    public NetworkAddress[] getDatasetResultLocationsFunction(JobId jobId, NetworkAddress[] knownLocations)
+            throws Exception {
+        return ddsi.getDatasetResultLocationsFunction(jobId, knownLocations);
+    }
+}
diff --git a/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDatasetDirectoryServiceInterfaceRemoteProxy.java b/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDatasetDirectoryServiceInterfaceRemoteProxy.java
new file mode 100644
index 0000000..6a134af
--- /dev/null
+++ b/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDatasetDirectoryServiceInterfaceRemoteProxy.java
@@ -0,0 +1,51 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.client.dataset;
+
+import edu.uci.ics.hyracks.api.client.HyracksClientInterfaceFunctions;
+import edu.uci.ics.hyracks.api.comm.NetworkAddress;
+import edu.uci.ics.hyracks.api.dataset.IHyracksDatasetDirectoryServiceInterface;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.ipc.api.IIPCHandle;
+import edu.uci.ics.hyracks.ipc.api.RPCInterface;
+
+//TODO(madhusudancs): Should this implementation be moved to edu.uci.ics.hyracks.client?
+/** Client-side stub that turns directory service calls into RPC requests sent over an IPC handle. */
+public class HyracksDatasetDirectoryServiceInterfaceRemoteProxy implements IHyracksDatasetDirectoryServiceInterface {
+    private final IIPCHandle ipcHandle;
+    private final RPCInterface rpci;
+
+    /**
+     * Creates a proxy that issues its calls through {@code rpci} over {@code ipcHandle}.
+     */
+    public HyracksDatasetDirectoryServiceInterfaceRemoteProxy(IIPCHandle ipcHandle, RPCInterface rpci) {
+        this.rpci = rpci;
+        this.ipcHandle = ipcHandle;
+    }
+
+    /**
+     * Sends a {@code GetDatasetResultLocationsFunction} request and returns the reply cast to an address array.
+     *
+     * @throws Exception propagated from the RPC call
+     */
+    @Override
+    public NetworkAddress[] getDatasetResultLocationsFunction(JobId jobId, NetworkAddress[] knownLocations)
+            throws Exception {
+        final HyracksClientInterfaceFunctions.GetDatasetResultLocationsFunction request =
+                new HyracksClientInterfaceFunctions.GetDatasetResultLocationsFunction(jobId, knownLocations);
+        final Object reply = rpci.call(ipcHandle, request);
+        return (NetworkAddress[]) reply;
+    }
+}