Create a new project called hyracks-client and move all the dataset api implementation classes into this new project.
git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_hyracks_result_distribution@2507 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/HyracksDataset.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/HyracksDataset.java
deleted file mode 100644
index f58f522..0000000
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/HyracksDataset.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.api.dataset;
-
-import java.nio.ByteBuffer;
-
-import edu.uci.ics.hyracks.api.comm.NetworkAddress;
-import edu.uci.ics.hyracks.api.job.JobId;
-
-public class HyracksDataset implements IHyracksDataset {
- private final JobId jobId;
- private final IHyracksDatasetDirectoryServiceConnection datasetDirectoryServiceConnection;
- private NetworkAddress[] knownLocations;
-
- public HyracksDataset(JobId jobId, IHyracksDatasetDirectoryServiceConnection ddsc) {
- this.jobId = jobId;
- this.datasetDirectoryServiceConnection = ddsc;
- knownLocations = null;
- }
-
- private boolean nullExists(NetworkAddress[] locations) {
- if (locations == null) {
- return true;
- }
- for (int i = 0; i < locations.length; i++) {
- if (locations[i] == null) {
- return true;
- }
- }
- return false;
- }
-
- @Override
- public ByteBuffer getResults() {
- while (nullExists(knownLocations)) {
- try {
- knownLocations = datasetDirectoryServiceConnection.getDatasetResultLocationsFunction(jobId,
- knownLocations);
- if (knownLocations != null) {
- }
- } catch (Exception e) {
- // TODO(madhusudancs) Do something here
- }
- }
-
- return null;
- }
-
-}
diff --git a/hyracks/hyracks-client/pom.xml b/hyracks/hyracks-client/pom.xml
new file mode 100644
index 0000000..4bd7c1a
--- /dev/null
+++ b/hyracks/hyracks-client/pom.xml
@@ -0,0 +1,46 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <artifactId>hyracks-client</artifactId>
+ <name>hyracks-client</name>
+ <parent>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks</artifactId>
+ <version>0.2.3-SNAPSHOT</version>
+ </parent>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>2.0.2</version>
+ <configuration>
+ <source>1.6</source>
+ <target>1.6</target>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+ <dependencies>
+ <dependency>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-api</artifactId>
+ <version>0.2.3-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-net</artifactId>
+ <version>0.2.3-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-comm</artifactId>
+ <version>0.2.3-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-dataflow-common</artifactId>
+ <version>0.2.3-SNAPSHOT</version>
+ </dependency>
+ </dependencies>
+</project>
diff --git a/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDataset.java b/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDataset.java
new file mode 100644
index 0000000..2248320
--- /dev/null
+++ b/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDataset.java
@@ -0,0 +1,243 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.client.dataset;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.net.UnknownHostException;
+import java.nio.ByteBuffer;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import edu.uci.ics.hyracks.api.comm.NetworkAddress;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.dataset.IDatasetInputChannel;
+import edu.uci.ics.hyracks.api.dataset.IDatasetInputChannelMonitor;
+import edu.uci.ics.hyracks.api.dataset.IHyracksDataset;
+import edu.uci.ics.hyracks.api.dataset.IHyracksDatasetDirectoryServiceConnection;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.client.ClientNetworkManager;
+import edu.uci.ics.hyracks.client.DatasetNetworkInputChannel;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.ByteBufferInputStream;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
+
+// TODO(madhusudancs): Should this implementation be moved to edu.uci.ics.hyracks.client?
+public class HyracksDataset implements IHyracksDataset {
+ private final JobId jobId;
+ private final IHyracksDatasetDirectoryServiceConnection datasetDirectoryServiceConnection;
+ private final ClientNetworkManager netManager;
+ private NetworkAddress[] knownLocations;
+
+ private IDatasetInputChannelMonitor[] monitors;
+
+ // TODO:we should probably allow clients to specify this. 32K is the size for now.
+ private static int FRAME_SIZE = 32 * 1024;
+
+ public HyracksDataset(JobId jobId, IHyracksDatasetDirectoryServiceConnection ddsc, int nReaders) throws Exception {
+ this.jobId = jobId;
+ this.datasetDirectoryServiceConnection = ddsc;
+ netManager = new ClientNetworkManager(nReaders);
+ knownLocations = null;
+ monitors = null;
+ }
+
+ private void start() throws IOException {
+ netManager.start();
+ }
+
+ private boolean nullExists(NetworkAddress[] locations) {
+ if (locations == null) {
+ return true;
+ }
+ for (int i = 0; i < locations.length; i++) {
+ if (locations[i] == null) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ private IDatasetInputChannelMonitor getMontior(int partition) throws HyracksException {
+ if (knownLocations == null || knownLocations[partition] == null) {
+ throw new HyracksException("Accessing monitors before the obtaining the corresponding addresses.");
+ }
+ if (monitors == null) {
+ monitors = new DatasetInputChannelMonitor[knownLocations.length];
+ }
+ if (monitors[partition] == null) {
+ monitors[partition] = new DatasetInputChannelMonitor();
+ }
+ return monitors[partition];
+ }
+
+ private class DatasetInputChannelMonitor implements IDatasetInputChannelMonitor {
+ private final AtomicInteger nAvailableFrames;
+
+ private final AtomicBoolean eos;
+
+ private final AtomicBoolean failed;
+
+ public DatasetInputChannelMonitor() {
+ nAvailableFrames = new AtomicInteger(0);
+ eos = new AtomicBoolean(false);
+ failed = new AtomicBoolean(false);
+ }
+
+ @Override
+ public synchronized void notifyFailure(IDatasetInputChannel channel) {
+ failed.set(true);
+ notifyAll();
+ }
+
+ @Override
+ public synchronized void notifyDataAvailability(IDatasetInputChannel channel, int nFrames) {
+ nAvailableFrames.addAndGet(nFrames);
+ notifyAll();
+ }
+
+ @Override
+ public synchronized void notifyEndOfStream(IDatasetInputChannel channel) {
+ eos.set(true);
+ notifyAll();
+ }
+
+ @Override
+ public synchronized boolean eosReached() {
+ return eos.get();
+ }
+
+ @Override
+ public synchronized boolean failed() {
+ return failed.get();
+ }
+
+ @Override
+ public synchronized int getNFramesAvailable() {
+ return nAvailableFrames.get();
+ }
+ }
+
+ /* TODO(madhusudancs): This method is used purely during development for debugging and must be removed before
+ * shipping the code
+ */
+ private void printBuffer(ByteBuffer buffer) {
+ String delim = ",";
+ ByteBufferInputStream bbis = new ByteBufferInputStream();
+ DataInputStream di = new DataInputStream(bbis);
+ RecordDescriptor recordDescriptor = new RecordDescriptor(new ISerializerDeserializer[] {
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE });
+
+ final FrameTupleAccessor frameTupleAccessor = new FrameTupleAccessor(FRAME_SIZE, recordDescriptor);
+
+ try {
+ frameTupleAccessor.reset(buffer);
+ for (int tIndex = 0; tIndex < frameTupleAccessor.getTupleCount(); tIndex++) {
+ int start = frameTupleAccessor.getTupleStartOffset(tIndex)
+ + frameTupleAccessor.getFieldSlotsLength();
+ bbis.setByteBuffer(buffer, start);
+ Object[] record = new Object[recordDescriptor.getFieldCount()];
+ for (int i = 0; i < record.length; ++i) {
+ Object instance = recordDescriptor.getFields()[i].deserialize(di);
+ if (i == 0) {
+ System.out.write(String.valueOf(instance).getBytes());
+ } else {
+ System.out.write((delim + String.valueOf(instance)).getBytes());
+ }
+ }
+ System.out.write("\n".getBytes());
+ }
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+
+ private void readResults() throws HyracksDataException {
+ ByteBuffer buffer = null;
+
+ if (knownLocations == null) {
+ return;
+ }
+ for (int i = 0; i < knownLocations.length; i++) {
+ final NetworkAddress addr = knownLocations[i];
+ if (addr != null) {
+ try {
+ DatasetNetworkInputChannel resultChannel = new DatasetNetworkInputChannel(netManager,
+ getSocketAddress(addr), jobId, i, 100);
+
+ IDatasetInputChannelMonitor monitor = getMontior(i);
+ resultChannel.open(FRAME_SIZE);
+ resultChannel.registerMonitor(monitor);
+
+ while (!monitor.eosReached()) {
+ synchronized (monitor) {
+ try {
+ monitor.wait();
+ } catch (InterruptedException e) {
+ //
+ }
+ }
+ buffer = resultChannel.getNextBuffer();
+
+ if (buffer != null) {
+ resultChannel.recycleBuffer(buffer);
+ // TODO(madhusudancs): This is a development time debugging statement and should be removed
+ printBuffer(buffer);
+ }
+ }
+
+ } catch (UnknownHostException e) {
+ throw new HyracksDataException(e);
+ } catch (HyracksException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+ }
+ }
+
+ @Override
+ public ByteBuffer getResults() {
+ try {
+ start();
+ while (nullExists(knownLocations)) {
+ try {
+ knownLocations = datasetDirectoryServiceConnection.getDatasetResultLocationsFunction(jobId,
+ knownLocations);
+ readResults();
+ } catch (Exception e) {
+ // TODO(madhusudancs) Do something here
+ }
+ }
+ } catch (IOException e) {
+ // Do something here
+ }
+
+ return null;
+ }
+
+ private SocketAddress getSocketAddress(NetworkAddress addr) throws UnknownHostException {
+ return new InetSocketAddress(InetAddress.getByAddress(addr.getIpAddress()), addr.getPort());
+ }
+}
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/HyracksDatasetDirectoryServiceConnection.java b/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDatasetDirectoryServiceConnection.java
similarity index 86%
rename from hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/HyracksDatasetDirectoryServiceConnection.java
rename to hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDatasetDirectoryServiceConnection.java
index 971c9b3..3907709 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/HyracksDatasetDirectoryServiceConnection.java
+++ b/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDatasetDirectoryServiceConnection.java
@@ -12,17 +12,20 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package edu.uci.ics.hyracks.api.dataset;
+package edu.uci.ics.hyracks.client.dataset;
import java.net.InetSocketAddress;
import edu.uci.ics.hyracks.api.comm.NetworkAddress;
+import edu.uci.ics.hyracks.api.dataset.IHyracksDatasetDirectoryServiceConnection;
+import edu.uci.ics.hyracks.api.dataset.IHyracksDatasetDirectoryServiceInterface;
import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.ipc.api.IIPCHandle;
import edu.uci.ics.hyracks.ipc.api.RPCInterface;
import edu.uci.ics.hyracks.ipc.impl.IPCSystem;
import edu.uci.ics.hyracks.ipc.impl.JavaSerializationBasedPayloadSerializerDeserializer;
+//TODO(madhusudancs): Should this implementation be moved to edu.uci.ics.hyracks.client?
public class HyracksDatasetDirectoryServiceConnection implements IHyracksDatasetDirectoryServiceConnection {
private final IPCSystem ipc;
private final IHyracksDatasetDirectoryServiceInterface ddsi;
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/HyracksDatasetDirectoryServiceInterfaceRemoteProxy.java b/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDatasetDirectoryServiceInterfaceRemoteProxy.java
similarity index 88%
rename from hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/HyracksDatasetDirectoryServiceInterfaceRemoteProxy.java
rename to hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDatasetDirectoryServiceInterfaceRemoteProxy.java
index 6bee84c..6a134af 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/HyracksDatasetDirectoryServiceInterfaceRemoteProxy.java
+++ b/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDatasetDirectoryServiceInterfaceRemoteProxy.java
@@ -12,14 +12,16 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package edu.uci.ics.hyracks.api.dataset;
+package edu.uci.ics.hyracks.client.dataset;
import edu.uci.ics.hyracks.api.client.HyracksClientInterfaceFunctions;
import edu.uci.ics.hyracks.api.comm.NetworkAddress;
+import edu.uci.ics.hyracks.api.dataset.IHyracksDatasetDirectoryServiceInterface;
import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.ipc.api.IIPCHandle;
import edu.uci.ics.hyracks.ipc.api.RPCInterface;
+//TODO(madhusudancs): Should this implementation be moved to edu.uci.ics.hyracks.client?
public class HyracksDatasetDirectoryServiceInterfaceRemoteProxy implements IHyracksDatasetDirectoryServiceInterface {
private final IIPCHandle ipcHandle;