Use DatasetDirectoryRecords instead of NetworkAddresses everywhere we access that information in the dataset stack.

Also pass the result id through every method that works either on fetching
the metadata for results or the results themselves.

git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_hyracks_result_distribution@2534 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceFunctions.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceFunctions.java
index 36e348b..abadc7e 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceFunctions.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceFunctions.java
@@ -17,7 +17,8 @@
 import java.io.Serializable;
 import java.util.EnumSet;
 
-import edu.uci.ics.hyracks.api.comm.NetworkAddress;
+import edu.uci.ics.hyracks.api.dataset.DatasetDirectoryRecord;
+import edu.uci.ics.hyracks.api.dataset.ResultSetId;
 import edu.uci.ics.hyracks.api.job.JobFlag;
 import edu.uci.ics.hyracks.api.job.JobId;
 
@@ -182,11 +183,15 @@
         private static final long serialVersionUID = 1L;
 
         private final JobId jobId;
-        private final NetworkAddress[] knownLocations;
 
-        public GetDatasetResultLocationsFunction(JobId jobId, NetworkAddress[] knownLocations) {
+        private final ResultSetId rsId;
+
+        private final DatasetDirectoryRecord[] knownRecords;
+
+        public GetDatasetResultLocationsFunction(JobId jobId, ResultSetId rsId, DatasetDirectoryRecord[] knownRecords) {
             this.jobId = jobId;
-            this.knownLocations = knownLocations;
+            this.rsId = rsId;
+            this.knownRecords = knownRecords;
         }
 
         @Override
@@ -198,8 +203,12 @@
             return jobId;
         }
 
-        public NetworkAddress[] getKnownLocations() {
-            return knownLocations;
+        public ResultSetId getResultSetId() {
+            return rsId;
+        }
+
+        public DatasetDirectoryRecord[] getKnownRecords() {
+            return knownRecords;
         }
     }
 
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetDirectoryService.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetDirectoryService.java
index 62dc6c1..c7aabb0 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetDirectoryService.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetDirectoryService.java
@@ -22,6 +22,6 @@
     public void registerResultPartitionLocation(JobId jobId, ResultSetId rsId, int partition, int nPartitions,
             NetworkAddress networkAddress);
 
-    public NetworkAddress[] getResultPartitionLocations(JobId jobId, NetworkAddress[] knownLocations)
-            throws HyracksDataException;
+    public DatasetDirectoryRecord[] getResultPartitionLocations(JobId jobId, ResultSetId rsId,
+            DatasetDirectoryRecord[] knownLocations) throws HyracksDataException;
 }
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IHyracksDatasetDirectoryServiceConnection.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IHyracksDatasetDirectoryServiceConnection.java
index 715581c..29fabe2 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IHyracksDatasetDirectoryServiceConnection.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IHyracksDatasetDirectoryServiceConnection.java
@@ -14,7 +14,6 @@
  */
 package edu.uci.ics.hyracks.api.dataset;
 
-import edu.uci.ics.hyracks.api.comm.NetworkAddress;
 import edu.uci.ics.hyracks.api.job.JobId;
 
 public interface IHyracksDatasetDirectoryServiceConnection {
@@ -28,6 +27,6 @@
      * @return {@link NetworkAddress[]}
      * @throws Exception
      */
-    public NetworkAddress[] getDatasetResultLocationsFunction(JobId jobId, NetworkAddress[] knownLocations)
-            throws Exception;
+    public DatasetDirectoryRecord[] getDatasetResultLocationsFunction(JobId jobId, ResultSetId rsId,
+            DatasetDirectoryRecord[] knownRecords) throws Exception;
 }
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IHyracksDatasetDirectoryServiceInterface.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IHyracksDatasetDirectoryServiceInterface.java
index b4b3a34..d1db448 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IHyracksDatasetDirectoryServiceInterface.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IHyracksDatasetDirectoryServiceInterface.java
@@ -14,7 +14,6 @@
  */
 package edu.uci.ics.hyracks.api.dataset;
 
-import edu.uci.ics.hyracks.api.comm.NetworkAddress;
 import edu.uci.ics.hyracks.api.job.JobId;
 
 public interface IHyracksDatasetDirectoryServiceInterface {
@@ -23,11 +22,11 @@
      * 
      * @param jobId
      *            ID of the job
-     * @param knownLocations
-     *            Locations that are already known to the client
+     * @param knownRecords
+     *            Locations from the dataset directory that are already known to the client
      * @return {@link NetworkAddress[]}
      * @throws Exception
      */
-    public NetworkAddress[] getDatasetResultLocationsFunction(JobId jobId, NetworkAddress[] knownLocations)
-            throws Exception;
+    public DatasetDirectoryRecord[] getDatasetResultLocationsFunction(JobId jobId, ResultSetId rsId,
+            DatasetDirectoryRecord[] knownRecords) throws Exception;
 }
diff --git a/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDataset.java b/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDataset.java
index a102d18..f2bba2a 100644
--- a/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDataset.java
+++ b/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDataset.java
@@ -21,19 +21,24 @@
 import java.net.SocketAddress;
 import java.net.UnknownHostException;
 import java.nio.ByteBuffer;
+import java.util.List;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
 import edu.uci.ics.hyracks.api.comm.NetworkAddress;
 import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.dataset.DatasetDirectoryRecord;
 import edu.uci.ics.hyracks.api.dataset.IDatasetInputChannel;
 import edu.uci.ics.hyracks.api.dataset.IDatasetInputChannelMonitor;
 import edu.uci.ics.hyracks.api.dataset.IHyracksDataset;
 import edu.uci.ics.hyracks.api.dataset.IHyracksDatasetDirectoryServiceConnection;
+import edu.uci.ics.hyracks.api.dataset.ResultSetId;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.exceptions.HyracksException;
 import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
 import edu.uci.ics.hyracks.client.net.ClientNetworkManager;
 import edu.uci.ics.hyracks.comm.channels.DatasetNetworkInputChannel;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
@@ -43,20 +48,32 @@
 // TODO(madhusudancs): Should this implementation be moved to edu.uci.ics.hyracks.client?
 public class HyracksDataset implements IHyracksDataset {
     private final JobId jobId;
+
+    private final List<ResultSetId> rsIds;
+
     private final IHyracksDatasetDirectoryServiceConnection datasetDirectoryServiceConnection;
+
     private final ClientNetworkManager netManager;
-    private NetworkAddress[] knownLocations;
+
+    private DatasetDirectoryRecord[] knownRecords;
 
     private IDatasetInputChannelMonitor[] monitors;
 
     // TODO:we should probably allow clients to specify this. 32K is the size for now.
     private static int FRAME_SIZE = 32 * 1024;
 
-    public HyracksDataset(JobId jobId, IHyracksDatasetDirectoryServiceConnection ddsc, int nReaders) throws Exception {
+    public HyracksDataset(IHyracksClientConnection hcc, JobSpecification jobSpec, JobId jobId, int nReaders)
+            throws Exception {
         this.jobId = jobId;
-        this.datasetDirectoryServiceConnection = ddsc;
+        this.rsIds = jobSpec.getResultSetIds();
+
+        NetworkAddress ddsAddress = hcc.getDatasetDirectoryServiceInfo(jobId);
+        datasetDirectoryServiceConnection = new HyracksDatasetDirectoryServiceConnection(new String(
+                ddsAddress.getIpAddress()), ddsAddress.getPort());
+
         netManager = new ClientNetworkManager(nReaders);
-        knownLocations = null;
+
+        knownRecords = null;
         monitors = null;
     }
 
@@ -64,7 +81,7 @@
         netManager.start();
     }
 
-    private boolean nullExists(NetworkAddress[] locations) {
+    private boolean nullExists(DatasetDirectoryRecord[] locations) {
         if (locations == null) {
             return true;
         }
@@ -77,11 +94,11 @@
     }
 
     private IDatasetInputChannelMonitor getMontior(int partition) throws HyracksException {
-        if (knownLocations == null || knownLocations[partition] == null) {
+        if (knownRecords == null || knownRecords[partition] == null) {
             throw new HyracksException("Accessing monitors before the obtaining the corresponding addresses.");
         }
         if (monitors == null) {
-            monitors = new DatasetInputChannelMonitor[knownLocations.length];
+            monitors = new DatasetInputChannelMonitor[knownRecords.length];
         }
         if (monitors[partition] == null) {
             monitors[partition] = new DatasetInputChannelMonitor();
@@ -176,11 +193,11 @@
     private void readResults() throws HyracksDataException {
         ByteBuffer buffer = null;
 
-        if (knownLocations == null) {
+        if (knownRecords == null) {
             return;
         }
-        for (int i = 0; i < knownLocations.length; i++) {
-            final NetworkAddress addr = knownLocations[i];
+        for (int i = 0; i < knownRecords.length; i++) {
+            final DatasetDirectoryRecord addr = knownRecords[i];
             if (addr != null) {
                 try {
                     DatasetNetworkInputChannel resultChannel = new DatasetNetworkInputChannel(netManager,
@@ -220,24 +237,26 @@
     @Override
     public ByteBuffer getResults() {
         try {
+            ResultSetId rsId = rsIds.get(0);
             start();
-            while (nullExists(knownLocations)) {
+            while (nullExists(knownRecords)) {
                 try {
-                    knownLocations = datasetDirectoryServiceConnection.getDatasetResultLocationsFunction(jobId,
-                            knownLocations);
+                    knownRecords = datasetDirectoryServiceConnection.getDatasetResultLocationsFunction(jobId, rsId,
+                            knownRecords);
                     readResults();
                 } catch (Exception e) {
                     // TODO(madhusudancs) Do something here
                 }
             }
         } catch (IOException e) {
-            // Do something here
+            // TODO(madhusudancs): Do something here
         }
 
         return null;
     }
 
-    private SocketAddress getSocketAddress(NetworkAddress addr) throws UnknownHostException {
-        return new InetSocketAddress(InetAddress.getByAddress(addr.getIpAddress()), addr.getPort());
+    private SocketAddress getSocketAddress(DatasetDirectoryRecord addr) throws UnknownHostException {
+        NetworkAddress netAddr = addr.getNetworkAddress();
+        return new InetSocketAddress(InetAddress.getByAddress(netAddr.getIpAddress()), netAddr.getPort());
     }
 }
diff --git a/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDatasetDirectoryServiceConnection.java b/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDatasetDirectoryServiceConnection.java
index 3907709..1399dcc 100644
--- a/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDatasetDirectoryServiceConnection.java
+++ b/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDatasetDirectoryServiceConnection.java
@@ -16,9 +16,10 @@
 
 import java.net.InetSocketAddress;
 
-import edu.uci.ics.hyracks.api.comm.NetworkAddress;
+import edu.uci.ics.hyracks.api.dataset.DatasetDirectoryRecord;
 import edu.uci.ics.hyracks.api.dataset.IHyracksDatasetDirectoryServiceConnection;
 import edu.uci.ics.hyracks.api.dataset.IHyracksDatasetDirectoryServiceInterface;
+import edu.uci.ics.hyracks.api.dataset.ResultSetId;
 import edu.uci.ics.hyracks.api.job.JobId;
 import edu.uci.ics.hyracks.ipc.api.IIPCHandle;
 import edu.uci.ics.hyracks.ipc.api.RPCInterface;
@@ -39,7 +40,8 @@
     }
 
     @Override
-    public NetworkAddress[] getDatasetResultLocationsFunction(JobId jobId, NetworkAddress[] knownLocations) throws Exception {
-        return ddsi.getDatasetResultLocationsFunction(jobId, knownLocations);
+    public DatasetDirectoryRecord[] getDatasetResultLocationsFunction(JobId jobId, ResultSetId rsId,
+            DatasetDirectoryRecord[] knownRecords) throws Exception {
+        return ddsi.getDatasetResultLocationsFunction(jobId, rsId, knownRecords);
     }
 }
diff --git a/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDatasetDirectoryServiceInterfaceRemoteProxy.java b/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDatasetDirectoryServiceInterfaceRemoteProxy.java
index 167fab8..fb6e41f 100644
--- a/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDatasetDirectoryServiceInterfaceRemoteProxy.java
+++ b/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDatasetDirectoryServiceInterfaceRemoteProxy.java
@@ -15,8 +15,9 @@
 package edu.uci.ics.hyracks.client.dataset;
 
 import edu.uci.ics.hyracks.api.client.HyracksClientInterfaceFunctions;
-import edu.uci.ics.hyracks.api.comm.NetworkAddress;
+import edu.uci.ics.hyracks.api.dataset.DatasetDirectoryRecord;
 import edu.uci.ics.hyracks.api.dataset.IHyracksDatasetDirectoryServiceInterface;
+import edu.uci.ics.hyracks.api.dataset.ResultSetId;
 import edu.uci.ics.hyracks.api.job.JobId;
 import edu.uci.ics.hyracks.ipc.api.IIPCHandle;
 import edu.uci.ics.hyracks.ipc.api.RPCInterface;
@@ -33,10 +34,10 @@
     }
 
     @Override
-    public NetworkAddress[] getDatasetResultLocationsFunction(JobId jobId, NetworkAddress[] knownLocations)
-            throws Exception {
+    public DatasetDirectoryRecord[] getDatasetResultLocationsFunction(JobId jobId, ResultSetId rsId,
+            DatasetDirectoryRecord[] knownRecords) throws Exception {
         HyracksClientInterfaceFunctions.GetDatasetResultLocationsFunction gdrlf = new HyracksClientInterfaceFunctions.GetDatasetResultLocationsFunction(
-                jobId, knownLocations);
-        return (NetworkAddress[]) rpci.call(ipcHandle, gdrlf);
+                jobId, rsId, knownRecords);
+        return (DatasetDirectoryRecord[]) rpci.call(ipcHandle, gdrlf);
     }
 }
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
index 36ee412..95eecab 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
@@ -37,6 +37,7 @@
 import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
 import edu.uci.ics.hyracks.api.comm.NetworkAddress;
 import edu.uci.ics.hyracks.api.context.ICCContext;
+import edu.uci.ics.hyracks.api.dataset.DatasetDirectoryRecord;
 import edu.uci.ics.hyracks.api.dataset.IDatasetDirectoryService;
 import edu.uci.ics.hyracks.api.job.JobId;
 import edu.uci.ics.hyracks.api.job.JobStatus;
@@ -346,10 +347,10 @@
                 }
 
                 case GET_DATASET_RESULT_LOCATIONS: {
-                    HyracksClientInterfaceFunctions.GetDatasetResultLocationsFunction gdrlf =
-                            (HyracksClientInterfaceFunctions.GetDatasetResultLocationsFunction) fn;
+                    HyracksClientInterfaceFunctions.GetDatasetResultLocationsFunction gdrlf = (HyracksClientInterfaceFunctions.GetDatasetResultLocationsFunction) fn;
                     workQueue.schedule(new GetResultPartitionLocationsWork(ClusterControllerService.this, gdrlf
-                            .getJobId(), gdrlf.getKnownLocations(), new IPCResponder<NetworkAddress[]>(handle, mid)));
+                            .getJobId(), gdrlf.getResultSetId(), gdrlf.getKnownRecords(),
+                            new IPCResponder<DatasetDirectoryRecord[]>(handle, mid)));
                     return;
                 }
 
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/DatasetDirectoryService.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/DatasetDirectoryService.java
index 5d49bbf..ba1f750 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/DatasetDirectoryService.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/DatasetDirectoryService.java
@@ -60,15 +60,24 @@
     }
 
     @Override
-    public synchronized NetworkAddress[] getResultPartitionLocations(JobId jobId, NetworkAddress[] knownLocations)
-            throws HyracksDataException {
-        while (Arrays.equals(jobPartitionLocationsMap.get(jobId), knownLocations)) {
+    public synchronized DatasetDirectoryRecord[] getResultPartitionLocations(JobId jobId, ResultSetId rsId,
+            DatasetDirectoryRecord[] knownRecords) throws HyracksDataException {
+        while (!newRecords(jobId, rsId, knownRecords)) {
             try {
                 wait();
             } catch (InterruptedException e) {
                 throw new HyracksDataException(e);
             }
         }
-        return jobPartitionLocationsMap.get(jobId);
+        return jobResultLocationsMap.get(jobId).get(rsId);
+    }
+
+    private boolean newRecords(JobId jobId, ResultSetId rsId, DatasetDirectoryRecord[] knownRecords) {
+        Map<ResultSetId, DatasetDirectoryRecord[]> rsMap = jobResultLocationsMap.get(jobId);
+        if (rsMap == null) {
+            return false;
+        }
+        DatasetDirectoryRecord[] records = rsMap.get(rsId);
+        return !Arrays.equals(records, knownRecords);
     }
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetResultPartitionLocationsWork.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetResultPartitionLocationsWork.java
index 814feb0..fd1d418 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetResultPartitionLocationsWork.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetResultPartitionLocationsWork.java
@@ -14,8 +14,9 @@
  */
 package edu.uci.ics.hyracks.control.cc.work;
 
-import edu.uci.ics.hyracks.api.comm.NetworkAddress;
+import edu.uci.ics.hyracks.api.dataset.DatasetDirectoryRecord;
 import edu.uci.ics.hyracks.api.dataset.IDatasetDirectoryService;
+import edu.uci.ics.hyracks.api.dataset.ResultSetId;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.job.JobId;
 import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
@@ -24,15 +25,21 @@
 
 public class GetResultPartitionLocationsWork extends SynchronizableWork {
     private final ClusterControllerService ccs;
-    private final JobId jobId;
-    private final NetworkAddress[] knownLocations;
-    private final IResultCallback<NetworkAddress[]> callback;
 
-    public GetResultPartitionLocationsWork(ClusterControllerService ccs, JobId jobId, NetworkAddress[] knownLocations,
-            IResultCallback<NetworkAddress[]> callback) {
+    private final JobId jobId;
+
+    private final ResultSetId rsId;
+
+    private final DatasetDirectoryRecord[] knownRecords;
+
+    private final IResultCallback<DatasetDirectoryRecord[]> callback;
+
+    public GetResultPartitionLocationsWork(ClusterControllerService ccs, JobId jobId, ResultSetId rsId,
+            DatasetDirectoryRecord[] knownRecords, IResultCallback<DatasetDirectoryRecord[]> callback) {
         this.ccs = ccs;
         this.jobId = jobId;
-        this.knownLocations = knownLocations;
+        this.rsId = rsId;
+        this.knownRecords = knownRecords;
         this.callback = callback;
     }
 
@@ -43,7 +50,8 @@
             @Override
             public void run() {
                 try {
-                    NetworkAddress[] partitionLocations = dds.getResultPartitionLocations(jobId, knownLocations);
+                    DatasetDirectoryRecord[] partitionLocations = dds.getResultPartitionLocations(jobId, rsId,
+                            knownRecords);
                     callback.setValue(partitionLocations);
                 } catch (HyracksDataException e) {
                     callback.setException(e);