Pass the serialized record descriptor all the way upto dataset directory service in the RPC chain.
git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_hyracks_result_distribution@2829 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/RegisterResultPartitionLocationWork.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/RegisterResultPartitionLocationWork.java
index 4c36f27..bc67146 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/RegisterResultPartitionLocationWork.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/RegisterResultPartitionLocationWork.java
@@ -25,9 +25,11 @@
private final JobId jobId;
+ private final ResultSetId rsId;
+
private final boolean orderedResult;
- private final ResultSetId rsId;
+ private final byte[] serializedRecordDescriptor;
private final int partition;
@@ -35,12 +37,14 @@
private final NetworkAddress networkAddress;
- public RegisterResultPartitionLocationWork(ClusterControllerService ccs, JobId jobId, boolean orderedResult,
- ResultSetId rsId, int partition, int nPartitions, NetworkAddress networkAddress) {
+ public RegisterResultPartitionLocationWork(ClusterControllerService ccs, JobId jobId, ResultSetId rsId,
+ boolean orderedResult, byte[] serializedRecordDescriptor, int partition, int nPartitions,
+ NetworkAddress networkAddress) {
this.ccs = ccs;
this.jobId = jobId;
- this.orderedResult = orderedResult;
this.rsId = rsId;
+ this.orderedResult = orderedResult;
+ this.serializedRecordDescriptor = serializedRecordDescriptor;
this.partition = partition;
this.nPartitions = nPartitions;
this.networkAddress = networkAddress;
@@ -48,8 +52,8 @@
@Override
public void run() {
- ccs.getDatasetDirectoryService().registerResultPartitionLocation(jobId, orderedResult, rsId, partition,
- nPartitions, networkAddress);
+ ccs.getDatasetDirectoryService().registerResultPartitionLocation(jobId, rsId, orderedResult,
+ serializedRecordDescriptor, partition, nPartitions, networkAddress);
}
@Override
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/IClusterController.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/IClusterController.java
index a79110d..b9dd665 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/IClusterController.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/IClusterController.java
@@ -48,8 +48,9 @@
public void registerPartitionRequest(PartitionRequest partitionRequest) throws Exception;
- public void registerResultPartitionLocation(JobId jobId, boolean orderedResult, ResultSetId rsId, int partition,
- int nPartitions, NetworkAddress addr) throws Exception;
+ public void registerResultPartitionLocation(JobId jobId, ResultSetId rsId, boolean orderedResult,
+ byte[] serializedRecordDescriptor, int partition, int nPartitions, NetworkAddress networkAddress)
+ throws Exception;
public void notifyApplicationStateChange(String nodeId, String appName, ApplicationStatus status) throws Exception;
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/CCNCFunctions.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/CCNCFunctions.java
index 2d48155..5d84683 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/CCNCFunctions.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/CCNCFunctions.java
@@ -445,9 +445,11 @@
private final JobId jobId;
+ private final ResultSetId rsId;
+
private final boolean orderedResult;
- private final ResultSetId rsId;
+ private final byte[] serializedRecordDescriptor;
private final int partition;
@@ -455,11 +457,12 @@
private NetworkAddress networkAddress;
- public RegisterResultPartitionLocationFunction(JobId jobId, boolean orderedResult, ResultSetId rsId,
- int partition, int nPartitions, NetworkAddress networkAddress) {
+ public RegisterResultPartitionLocationFunction(JobId jobId, ResultSetId rsId, boolean orderedResult,
+ byte[] serializedRecordDescriptor, int partition, int nPartitions, NetworkAddress networkAddress) {
this.jobId = jobId;
- this.orderedResult = orderedResult;
this.rsId = rsId;
+ this.orderedResult = orderedResult;
+ this.serializedRecordDescriptor = serializedRecordDescriptor;
this.partition = partition;
this.nPartitions = nPartitions;
this.networkAddress = networkAddress;
@@ -474,14 +477,18 @@
return jobId;
}
- public boolean getOrderedResult() {
- return orderedResult;
- }
-
public ResultSetId getResultSetId() {
return rsId;
}
+ public byte[] getSerializedRecordDescriptor() {
+ return serializedRecordDescriptor;
+ }
+
+ public boolean getOrderedResult() {
+ return orderedResult;
+ }
+
public int getPartition() {
return partition;
}
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
index ad3a82e..a5efcff 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
@@ -97,10 +97,11 @@
}
@Override
- public void registerResultPartitionLocation(JobId jobId, boolean orderedResult, ResultSetId rsId, int partition,
- int nPartitions, NetworkAddress networkAddress) throws Exception {
+ public void registerResultPartitionLocation(JobId jobId, ResultSetId rsId, boolean orderedResult,
+ byte[] serializedRecordDescriptor, int partition, int nPartitions, NetworkAddress networkAddress)
+ throws Exception {
CCNCFunctions.RegisterResultPartitionLocationFunction fn = new CCNCFunctions.RegisterResultPartitionLocationFunction(
- jobId, orderedResult, rsId, partition, nPartitions, networkAddress);
+ jobId, rsId, orderedResult, serializedRecordDescriptor, partition, nPartitions, networkAddress);
ipcHandle.send(-1, fn, null);
}
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java
index b9a8b69..ed8a278 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java
@@ -50,13 +50,14 @@
}
@Override
- public IFrameWriter createDatasetPartitionWriter(IHyracksTaskContext ctx, boolean orderedResult, ResultSetId rsId,
- int partition, int nPartitions) throws HyracksException {
+ public IFrameWriter createDatasetPartitionWriter(IHyracksTaskContext ctx, ResultSetId rsId, boolean orderedResult,
+ byte[] serializedRecordDescriptor, int partition, int nPartitions) throws HyracksException {
DatasetPartitionWriter dpw = null;
JobId jobId = ctx.getJobletContext().getJobId();
try {
- ncs.getClusterController().registerResultPartitionLocation(jobId, orderedResult, rsId, partition,
- nPartitions, ncs.getDatasetNetworkManager().getNetworkAddress());
+ ncs.getClusterController().registerResultPartitionLocation(jobId, rsId, orderedResult,
+ serializedRecordDescriptor, partition, nPartitions,
+ ncs.getDatasetNetworkManager().getNetworkAddress());
dpw = new DatasetPartitionWriter(ctx, this, partition, executor);
DatasetPartitionWriter[] writers = partitionDatasetWriterMap.get(jobId);