Implement the client side of RPCs to give the clients the ability to retrieve serialized record descriptor.
git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_hyracks_result_distribution@2831 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceFunctions.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceFunctions.java
index abadc7e..f27a44e 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceFunctions.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceFunctions.java
@@ -33,6 +33,7 @@
GET_JOB_STATUS,
START_JOB,
GET_DATASET_DIRECTORY_SERIVICE_INFO,
+ GET_DATASET_RECORD_DESCRIPTOR,
GET_DATASET_RESULT_LOCATIONS,
WAIT_FOR_COMPLETION,
GET_NODE_CONTROLLERS_INFO
@@ -163,20 +164,36 @@
public static class GetDatasetDirectoryServiceInfoFunction extends Function {
private static final long serialVersionUID = 1L;
- private final JobId jobId;
-
- public GetDatasetDirectoryServiceInfoFunction(JobId jobId) {
- this.jobId = jobId;
- }
-
@Override
public FunctionId getFunctionId() {
return FunctionId.GET_DATASET_DIRECTORY_SERIVICE_INFO;
}
+ }
+
+ public static class GetDatasetRecordDescriptorFunction extends Function {
+ private static final long serialVersionUID = 1L;
+
+ private final JobId jobId;
+
+ private final ResultSetId rsId;
+
+ public GetDatasetRecordDescriptorFunction(JobId jobId, ResultSetId rsId) {
+ this.jobId = jobId;
+ this.rsId = rsId;
+ }
+
+ @Override
+ public FunctionId getFunctionId() {
+ return FunctionId.GET_DATASET_RECORD_DESCRIPTOR;
+ }
public JobId getJobId() {
return jobId;
}
+
+ public ResultSetId getResultSetId() {
+ return rsId;
+ }
}
public static class GetDatasetResultLocationsFunction extends Function {
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetDirectoryService.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetDirectoryService.java
index f47e41b..b8b67bb 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetDirectoryService.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetDirectoryService.java
@@ -22,6 +22,8 @@
public void registerResultPartitionLocation(JobId jobId, boolean orderedResult, ResultSetId rsId, int partition,
int nPartitions, NetworkAddress networkAddress);
+ public byte[] getRecordDescriptor(JobId jobId, ResultSetId rsId) throws HyracksDataException;
+
public DatasetDirectoryRecord[] getResultPartitionLocations(JobId jobId, ResultSetId rsId,
DatasetDirectoryRecord[] knownLocations) throws HyracksDataException;
}
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IHyracksDatasetDirectoryServiceConnection.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IHyracksDatasetDirectoryServiceConnection.java
index 29fabe2..ae04474 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IHyracksDatasetDirectoryServiceConnection.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IHyracksDatasetDirectoryServiceConnection.java
@@ -18,6 +18,18 @@
public interface IHyracksDatasetDirectoryServiceConnection {
/**
+ * Gets the record descriptor for the given result set.
+ *
+ * @param jobId
+ * ID of the job
+ * @param rsId
+ * ID of the result set
+ * @return {@link byte[]}
+ * @throws Exception
+ */
+ public byte[] getDatasetSerializedRecordDescriptor(JobId jobId, ResultSetId rsId) throws Exception;
+
+ /**
* Gets the IP Addresses and ports for the partition generating the result for each location.
*
* @param jobId
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IHyracksDatasetDirectoryServiceInterface.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IHyracksDatasetDirectoryServiceInterface.java
index d1db448..8d536d6 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IHyracksDatasetDirectoryServiceInterface.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IHyracksDatasetDirectoryServiceInterface.java
@@ -18,6 +18,18 @@
public interface IHyracksDatasetDirectoryServiceInterface {
/**
+ * Gets the record descriptor for the given result set.
+ *
+ * @param jobId
+ * ID of the job
+ * @param rsId
+ * ID of the result set
+ * @return {@link byte[]}
+ * @throws Exception
+ */
+ public byte[] getDatasetSerializedRecordDescriptorFunction(JobId jobId, ResultSetId rsId) throws Exception;
+
+ /**
* Gets the IP Addresses and ports for the partition generating the result for each location.
*
* @param jobId
diff --git a/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDatasetDirectoryServiceConnection.java b/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDatasetDirectoryServiceConnection.java
index 1399dcc..1050983 100644
--- a/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDatasetDirectoryServiceConnection.java
+++ b/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDatasetDirectoryServiceConnection.java
@@ -40,6 +40,11 @@
}
@Override
+ public byte[] getDatasetSerializedRecordDescriptor(JobId jobId, ResultSetId rsId) throws Exception {
+ return ddsi.getDatasetSerializedRecordDescriptorFunction(jobId, rsId);
+ }
+
+ @Override
public DatasetDirectoryRecord[] getDatasetResultLocationsFunction(JobId jobId, ResultSetId rsId,
DatasetDirectoryRecord[] knownRecords) throws Exception {
return ddsi.getDatasetResultLocationsFunction(jobId, rsId, knownRecords);
diff --git a/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDatasetDirectoryServiceInterfaceRemoteProxy.java b/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDatasetDirectoryServiceInterfaceRemoteProxy.java
index fb6e41f..9de26dd 100644
--- a/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDatasetDirectoryServiceInterfaceRemoteProxy.java
+++ b/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDatasetDirectoryServiceInterfaceRemoteProxy.java
@@ -34,6 +34,13 @@
}
@Override
+ public byte[] getDatasetSerializedRecordDescriptorFunction(JobId jobId, ResultSetId rsId) throws Exception {
+ HyracksClientInterfaceFunctions.GetDatasetRecordDescriptorFunction gdrdf = new HyracksClientInterfaceFunctions.GetDatasetRecordDescriptorFunction(
+ jobId, rsId);
+ return (byte[]) rpci.call(ipcHandle, gdrdf);
+ }
+
+ @Override
public DatasetDirectoryRecord[] getDatasetResultLocationsFunction(JobId jobId, ResultSetId rsId,
DatasetDirectoryRecord[] knownRecords) throws Exception {
HyracksClientInterfaceFunctions.GetDatasetResultLocationsFunction gdrlf = new HyracksClientInterfaceFunctions.GetDatasetResultLocationsFunction(
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
index 8097ad4..4b61d09 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
@@ -56,6 +56,7 @@
import edu.uci.ics.hyracks.control.cc.work.GetIpAddressNodeNameMapWork;
import edu.uci.ics.hyracks.control.cc.work.GetJobStatusWork;
import edu.uci.ics.hyracks.control.cc.work.GetNodeControllersInfoWork;
+import edu.uci.ics.hyracks.control.cc.work.GetRecordDescriptorWork;
import edu.uci.ics.hyracks.control.cc.work.GetResultPartitionLocationsWork;
import edu.uci.ics.hyracks.control.cc.work.JobStartWork;
import edu.uci.ics.hyracks.control.cc.work.JobletCleanupNotificationWork;
@@ -346,6 +347,13 @@
return;
}
+ case GET_DATASET_RECORD_DESCRIPTOR: {
+ HyracksClientInterfaceFunctions.GetDatasetRecordDescriptorFunction gdrdf = (HyracksClientInterfaceFunctions.GetDatasetRecordDescriptorFunction) fn;
+ workQueue.schedule(new GetRecordDescriptorWork(ClusterControllerService.this, gdrdf.getJobId(),
+ gdrdf.getResultSetId(), new IPCResponder<byte[]>(handle, mid)));
+ return;
+ }
+
case GET_DATASET_RESULT_LOCATIONS: {
HyracksClientInterfaceFunctions.GetDatasetResultLocationsFunction gdrlf = (HyracksClientInterfaceFunctions.GetDatasetResultLocationsFunction) fn;
workQueue.schedule(new GetResultPartitionLocationsWork(ClusterControllerService.this, gdrlf
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetRecordDescriptorWork.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetRecordDescriptorWork.java
new file mode 100644
index 0000000..02ad1d8
--- /dev/null
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetRecordDescriptorWork.java
@@ -0,0 +1,57 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.cc.work;
+
+import edu.uci.ics.hyracks.api.dataset.IDatasetDirectoryService;
+import edu.uci.ics.hyracks.api.dataset.ResultSetId;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
+import edu.uci.ics.hyracks.control.common.work.IResultCallback;
+import edu.uci.ics.hyracks.control.common.work.SynchronizableWork;
+
+public class GetRecordDescriptorWork extends SynchronizableWork {
+ private final ClusterControllerService ccs;
+
+ private final JobId jobId;
+
+ private final ResultSetId rsId;
+
+ private final IResultCallback<byte[]> callback;
+
+ public GetRecordDescriptorWork(ClusterControllerService ccs, JobId jobId, ResultSetId rsId,
+ IResultCallback<byte[]> callback) {
+ this.ccs = ccs;
+ this.jobId = jobId;
+ this.rsId = rsId;
+ this.callback = callback;
+ }
+
+ @Override
+ public void doRun() {
+ final IDatasetDirectoryService dds = ccs.getDatasetDirectoryService();
+ ccs.getExecutor().execute(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ byte[] recDesc = dds.getRecordDescriptor(jobId, rsId);
+ callback.setValue(recDesc);
+ } catch (HyracksDataException e) {
+ callback.setException(e);
+ }
+ }
+ });
+ }
+}
\ No newline at end of file