Remove the usage of serialized descriptor.
We do not have the necessity to pass the serialized record descriptor to
clients, so there is no need to pass it to DatasetDirectoryService to
store it in the record since we serialize all the results in the Hyracks
operator before sending the results now.
git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_hyracks_result_distribution@2978 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceFunctions.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceFunctions.java
index ddf542e..cd2b698 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceFunctions.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceFunctions.java
@@ -197,32 +197,6 @@
}
}
- public static class GetDatasetRecordDescriptorFunction extends Function {
- private static final long serialVersionUID = 1L;
-
- private final JobId jobId;
-
- private final ResultSetId rsId;
-
- public GetDatasetRecordDescriptorFunction(JobId jobId, ResultSetId rsId) {
- this.jobId = jobId;
- this.rsId = rsId;
- }
-
- @Override
- public FunctionId getFunctionId() {
- return FunctionId.GET_DATASET_RECORD_DESCRIPTOR;
- }
-
- public JobId getJobId() {
- return jobId;
- }
-
- public ResultSetId getResultSetId() {
- return rsId;
- }
- }
-
public static class GetDatasetResultLocationsFunction extends Function {
private static final long serialVersionUID = 1L;
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetDirectoryService.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetDirectoryService.java
index 1325150..5266333 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetDirectoryService.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetDirectoryService.java
@@ -20,8 +20,8 @@
import edu.uci.ics.hyracks.api.job.JobId;
public interface IDatasetDirectoryService {
- public void registerResultPartitionLocation(JobId jobId, ResultSetId rsId, boolean orderedResult,
- byte[] serializedRecordDescriptor, int partition, int nPartitions, NetworkAddress networkAddress);
+ public void registerResultPartitionLocation(JobId jobId, ResultSetId rsId, boolean orderedResult, int partition,
+ int nPartitions, NetworkAddress networkAddress);
public void reportResultPartitionWriteCompletion(JobId jobId, ResultSetId rsId, int partition);
@@ -29,8 +29,6 @@
public Status getResultStatus(JobId jobId, ResultSetId rsId) throws HyracksDataException;
- public byte[] getRecordDescriptor(JobId jobId, ResultSetId rsId) throws HyracksDataException;
-
public DatasetDirectoryRecord[] getResultPartitionLocations(JobId jobId, ResultSetId rsId,
DatasetDirectoryRecord[] knownLocations) throws HyracksDataException;
}
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetPartitionManager.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetPartitionManager.java
index 54ee11b..ae38c7f 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetPartitionManager.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetPartitionManager.java
@@ -22,9 +22,10 @@
public interface IDatasetPartitionManager {
public IFrameWriter createDatasetPartitionWriter(IHyracksTaskContext ctx, ResultSetId rsId, boolean orderedResult,
- byte[] serializedRecordDescriptor, int partition, int nPartitions) throws HyracksException;
+ int partition, int nPartitions) throws HyracksException;
- public void reportPartitionWriteCompletion(JobId jobId, ResultSetId resultSetId, int partition) throws HyracksException;
+ public void reportPartitionWriteCompletion(JobId jobId, ResultSetId resultSetId, int partition)
+ throws HyracksException;
public void reportPartitionFailure(JobId jobId, ResultSetId resultSetId, int partition) throws HyracksException;
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IHyracksDataset.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IHyracksDataset.java
index b437975..40db7a7 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IHyracksDataset.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IHyracksDataset.java
@@ -26,7 +26,5 @@
public Status getResultStatus();
- public byte[] getSerializedRecordDescriptor();
-
public int read(ByteBuffer buffer) throws HyracksDataException;
}
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IHyracksDatasetDirectoryServiceConnection.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IHyracksDatasetDirectoryServiceConnection.java
index 9358e4c..d49d5cd 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IHyracksDatasetDirectoryServiceConnection.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IHyracksDatasetDirectoryServiceConnection.java
@@ -31,18 +31,6 @@
public Status getDatasetResultStatus(JobId jobId, ResultSetId rsId) throws Exception;
/**
- * Gets the record descriptor for the given result set.
- *
- * @param jobId
- * ID of the job
- * @param rsId
- * ID of the result set
- * @return {@link byte[]}
- * @throws Exception
- */
- public byte[] getDatasetSerializedRecordDescriptor(JobId jobId, ResultSetId rsId) throws Exception;
-
- /**
* Gets the IP Addresses and ports for the partition generating the result for each location.
*
* @param jobId
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IHyracksDatasetDirectoryServiceInterface.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IHyracksDatasetDirectoryServiceInterface.java
index 0cf61a6..ba21a84 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IHyracksDatasetDirectoryServiceInterface.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IHyracksDatasetDirectoryServiceInterface.java
@@ -31,18 +31,6 @@
public Status getDatasetResultStatus(JobId jobId, ResultSetId rsId) throws Exception;
/**
- * Gets the record descriptor for the given result set.
- *
- * @param jobId
- * ID of the job
- * @param rsId
- * ID of the result set
- * @return {@link byte[]}
- * @throws Exception
- */
- public byte[] getDatasetSerializedRecordDescriptor(JobId jobId, ResultSetId rsId) throws Exception;
-
- /**
* Gets the IP Addresses and ports for the partition generating the result for each location.
*
* @param jobId
diff --git a/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDataset.java b/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDataset.java
index d7ef55c..94779a6 100644
--- a/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDataset.java
+++ b/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDataset.java
@@ -98,18 +98,6 @@
}
@Override
- public byte[] getSerializedRecordDescriptor() {
- byte[] serializedRecordDescriptor = null;
- try {
- serializedRecordDescriptor = datasetDirectoryServiceConnection.getDatasetSerializedRecordDescriptor(jobId,
- resultSetId);
- } catch (Exception e) {
- // TODO(madhusudancs): Decide what to do in case of error
- }
- return serializedRecordDescriptor;
- }
-
- @Override
public int read(ByteBuffer buffer) throws HyracksDataException {
ByteBuffer readBuffer;
int readSize = 0;
diff --git a/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDatasetDirectoryServiceConnection.java b/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDatasetDirectoryServiceConnection.java
index 637d15c..095fd7d 100644
--- a/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDatasetDirectoryServiceConnection.java
+++ b/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDatasetDirectoryServiceConnection.java
@@ -46,11 +46,6 @@
}
@Override
- public byte[] getDatasetSerializedRecordDescriptor(JobId jobId, ResultSetId rsId) throws Exception {
- return ddsi.getDatasetSerializedRecordDescriptor(jobId, rsId);
- }
-
- @Override
public DatasetDirectoryRecord[] getDatasetResultLocations(JobId jobId, ResultSetId rsId,
DatasetDirectoryRecord[] knownRecords) throws Exception {
return ddsi.getDatasetResultLocations(jobId, rsId, knownRecords);
diff --git a/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDatasetDirectoryServiceInterfaceRemoteProxy.java b/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDatasetDirectoryServiceInterfaceRemoteProxy.java
index b5cba65..47cdf97 100644
--- a/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDatasetDirectoryServiceInterfaceRemoteProxy.java
+++ b/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDatasetDirectoryServiceInterfaceRemoteProxy.java
@@ -42,13 +42,6 @@
}
@Override
- public byte[] getDatasetSerializedRecordDescriptor(JobId jobId, ResultSetId rsId) throws Exception {
- HyracksClientInterfaceFunctions.GetDatasetRecordDescriptorFunction gdrdf = new HyracksClientInterfaceFunctions.GetDatasetRecordDescriptorFunction(
- jobId, rsId);
- return (byte[]) rpci.call(ipcHandle, gdrdf);
- }
-
- @Override
public DatasetDirectoryRecord[] getDatasetResultLocations(JobId jobId, ResultSetId rsId,
DatasetDirectoryRecord[] knownRecords) throws Exception {
HyracksClientInterfaceFunctions.GetDatasetResultLocationsFunction gdrlf = new HyracksClientInterfaceFunctions.GetDatasetResultLocationsFunction(
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
index 93c9685..82457fe 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
@@ -57,7 +57,6 @@
import edu.uci.ics.hyracks.control.cc.work.GetIpAddressNodeNameMapWork;
import edu.uci.ics.hyracks.control.cc.work.GetJobStatusWork;
import edu.uci.ics.hyracks.control.cc.work.GetNodeControllersInfoWork;
-import edu.uci.ics.hyracks.control.cc.work.GetRecordDescriptorWork;
import edu.uci.ics.hyracks.control.cc.work.GetResultPartitionLocationsWork;
import edu.uci.ics.hyracks.control.cc.work.GetResultStatusWork;
import edu.uci.ics.hyracks.control.cc.work.JobStartWork;
@@ -357,13 +356,6 @@
return;
}
- case GET_DATASET_RECORD_DESCRIPTOR: {
- HyracksClientInterfaceFunctions.GetDatasetRecordDescriptorFunction gdrdf = (HyracksClientInterfaceFunctions.GetDatasetRecordDescriptorFunction) fn;
- workQueue.schedule(new GetRecordDescriptorWork(ClusterControllerService.this, gdrdf.getJobId(),
- gdrdf.getResultSetId(), new IPCResponder<byte[]>(handle, mid)));
- return;
- }
-
case GET_DATASET_RESULT_LOCATIONS: {
HyracksClientInterfaceFunctions.GetDatasetResultLocationsFunction gdrlf = (HyracksClientInterfaceFunctions.GetDatasetResultLocationsFunction) fn;
workQueue.schedule(new GetResultPartitionLocationsWork(ClusterControllerService.this, gdrlf
@@ -470,9 +462,8 @@
case REGISTER_RESULT_PARTITION_LOCATION: {
CCNCFunctions.RegisterResultPartitionLocationFunction rrplf = (CCNCFunctions.RegisterResultPartitionLocationFunction) fn;
workQueue.schedule(new RegisterResultPartitionLocationWork(ClusterControllerService.this, rrplf
- .getJobId(), rrplf.getResultSetId(), rrplf.getOrderedResult(), rrplf
- .getSerializedRecordDescriptor(), rrplf.getPartition(), rrplf.getNPartitions(), rrplf
- .getNetworkAddress()));
+ .getJobId(), rrplf.getResultSetId(), rrplf.getOrderedResult(), rrplf.getPartition(), rrplf
+ .getNPartitions(), rrplf.getNetworkAddress()));
return;
}
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/DatasetDirectoryService.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/DatasetDirectoryService.java
index 832e10a..13d0c30 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/DatasetDirectoryService.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/DatasetDirectoryService.java
@@ -43,7 +43,7 @@
@Override
public synchronized void registerResultPartitionLocation(JobId jobId, ResultSetId rsId, boolean orderedResult,
- byte[] serializedRecordDescriptor, int partition, int nPartitions, NetworkAddress networkAddress) {
+ int partition, int nPartitions, NetworkAddress networkAddress) {
Map<ResultSetId, ResultSetMetaData> rsMap = jobResultLocationsMap.get(jobId);
if (rsMap == null) {
rsMap = new HashMap<ResultSetId, ResultSetMetaData>();
@@ -52,8 +52,7 @@
ResultSetMetaData resultSetMetaData = rsMap.get(rsId);
if (resultSetMetaData == null) {
- resultSetMetaData = new ResultSetMetaData(orderedResult, new DatasetDirectoryRecord[nPartitions],
- serializedRecordDescriptor);
+ resultSetMetaData = new ResultSetMetaData(orderedResult, new DatasetDirectoryRecord[nPartitions]);
rsMap.put(rsId, resultSetMetaData);
}
@@ -124,25 +123,6 @@
}
@Override
- public synchronized byte[] getRecordDescriptor(JobId jobId, ResultSetId rsId) throws HyracksDataException {
- Map<ResultSetId, ResultSetMetaData> rsMap;
- while ((rsMap = jobResultLocationsMap.get(jobId)) == null) {
- try {
- wait();
- } catch (InterruptedException e) {
- throw new HyracksDataException(e);
- }
- }
-
- ResultSetMetaData resultSetMetaData = rsMap.get(rsId);
- if (resultSetMetaData == null || resultSetMetaData.getRecords() == null) {
- throw new HyracksDataException("ResultSet locations uninitialized when it is expected to be initialized.");
- }
-
- return resultSetMetaData.getSerializedRecordDescriptor();
- }
-
- @Override
public synchronized DatasetDirectoryRecord[] getResultPartitionLocations(JobId jobId, ResultSetId rsId,
DatasetDirectoryRecord[] knownRecords) throws HyracksDataException {
DatasetDirectoryRecord[] newRecords;
@@ -245,12 +225,9 @@
private final DatasetDirectoryRecord[] records;
- private final byte[] serializedRecordDescriptor;
-
- public ResultSetMetaData(boolean ordered, DatasetDirectoryRecord[] records, byte[] serializedRecordDescriptor) {
+ public ResultSetMetaData(boolean ordered, DatasetDirectoryRecord[] records) {
this.ordered = ordered;
this.records = records;
- this.serializedRecordDescriptor = serializedRecordDescriptor;
}
public boolean getOrderedResult() {
@@ -260,9 +237,5 @@
public DatasetDirectoryRecord[] getRecords() {
return records;
}
-
- public byte[] getSerializedRecordDescriptor() {
- return serializedRecordDescriptor;
- }
}
}
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetRecordDescriptorWork.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetRecordDescriptorWork.java
deleted file mode 100644
index 02ad1d8..0000000
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetRecordDescriptorWork.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.control.cc.work;
-
-import edu.uci.ics.hyracks.api.dataset.IDatasetDirectoryService;
-import edu.uci.ics.hyracks.api.dataset.ResultSetId;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.api.job.JobId;
-import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
-import edu.uci.ics.hyracks.control.common.work.IResultCallback;
-import edu.uci.ics.hyracks.control.common.work.SynchronizableWork;
-
-public class GetRecordDescriptorWork extends SynchronizableWork {
- private final ClusterControllerService ccs;
-
- private final JobId jobId;
-
- private final ResultSetId rsId;
-
- private final IResultCallback<byte[]> callback;
-
- public GetRecordDescriptorWork(ClusterControllerService ccs, JobId jobId, ResultSetId rsId,
- IResultCallback<byte[]> callback) {
- this.ccs = ccs;
- this.jobId = jobId;
- this.rsId = rsId;
- this.callback = callback;
- }
-
- @Override
- public void doRun() {
- final IDatasetDirectoryService dds = ccs.getDatasetDirectoryService();
- ccs.getExecutor().execute(new Runnable() {
- @Override
- public void run() {
- try {
- byte[] recDesc = dds.getRecordDescriptor(jobId, rsId);
- callback.setValue(recDesc);
- } catch (HyracksDataException e) {
- callback.setException(e);
- }
- }
- });
- }
-}
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/RegisterResultPartitionLocationWork.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/RegisterResultPartitionLocationWork.java
index 1e013cd..f86e924 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/RegisterResultPartitionLocationWork.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/RegisterResultPartitionLocationWork.java
@@ -29,8 +29,6 @@
private final boolean orderedResult;
- private final byte[] serializedRecordDescriptor;
-
private final int partition;
private final int nPartitions;
@@ -38,13 +36,11 @@
private final NetworkAddress networkAddress;
public RegisterResultPartitionLocationWork(ClusterControllerService ccs, JobId jobId, ResultSetId rsId,
- boolean orderedResult, byte[] serializedRecordDescriptor, int partition, int nPartitions,
- NetworkAddress networkAddress) {
+ boolean orderedResult, int partition, int nPartitions, NetworkAddress networkAddress) {
this.ccs = ccs;
this.jobId = jobId;
this.rsId = rsId;
this.orderedResult = orderedResult;
- this.serializedRecordDescriptor = serializedRecordDescriptor;
this.partition = partition;
this.nPartitions = nPartitions;
this.networkAddress = networkAddress;
@@ -52,8 +48,8 @@
@Override
public void run() {
- ccs.getDatasetDirectoryService().registerResultPartitionLocation(jobId, rsId, orderedResult,
- serializedRecordDescriptor, partition, nPartitions, networkAddress);
+ ccs.getDatasetDirectoryService().registerResultPartitionLocation(jobId, rsId, orderedResult, partition,
+ nPartitions, networkAddress);
}
@Override
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/IClusterController.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/IClusterController.java
index d72d4f7..55e4479 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/IClusterController.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/IClusterController.java
@@ -48,9 +48,8 @@
public void registerPartitionRequest(PartitionRequest partitionRequest) throws Exception;
- public void registerResultPartitionLocation(JobId jobId, ResultSetId rsId, boolean orderedResult,
- byte[] serializedRecordDescriptor, int partition, int nPartitions, NetworkAddress networkAddress)
- throws Exception;
+ public void registerResultPartitionLocation(JobId jobId, ResultSetId rsId, boolean orderedResult, int partition,
+ int nPartitions, NetworkAddress networkAddress) throws Exception;
public void reportResultPartitionWriteCompletion(JobId jobId, ResultSetId rsId, int partition) throws Exception;
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/CCNCFunctions.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/CCNCFunctions.java
index e23f1a2..b506b12 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/CCNCFunctions.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/CCNCFunctions.java
@@ -451,8 +451,6 @@
private final boolean orderedResult;
- private final byte[] serializedRecordDescriptor;
-
private final int partition;
private final int nPartitions;
@@ -460,11 +458,10 @@
private NetworkAddress networkAddress;
public RegisterResultPartitionLocationFunction(JobId jobId, ResultSetId rsId, boolean orderedResult,
- byte[] serializedRecordDescriptor, int partition, int nPartitions, NetworkAddress networkAddress) {
+ int partition, int nPartitions, NetworkAddress networkAddress) {
this.jobId = jobId;
this.rsId = rsId;
this.orderedResult = orderedResult;
- this.serializedRecordDescriptor = serializedRecordDescriptor;
this.partition = partition;
this.nPartitions = nPartitions;
this.networkAddress = networkAddress;
@@ -483,10 +480,6 @@
return rsId;
}
- public byte[] getSerializedRecordDescriptor() {
- return serializedRecordDescriptor;
- }
-
public boolean getOrderedResult() {
return orderedResult;
}
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java
index 71dc15b..85f17b1 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java
@@ -51,13 +51,12 @@
@Override
public IFrameWriter createDatasetPartitionWriter(IHyracksTaskContext ctx, ResultSetId rsId, boolean orderedResult,
- byte[] serializedRecordDescriptor, int partition, int nPartitions) throws HyracksException {
+ int partition, int nPartitions) throws HyracksException {
DatasetPartitionWriter dpw = null;
JobId jobId = ctx.getJobletContext().getJobId();
try {
- ncs.getClusterController().registerResultPartitionLocation(jobId, rsId, orderedResult,
- serializedRecordDescriptor, partition, nPartitions,
- ncs.getDatasetNetworkManager().getNetworkAddress());
+ ncs.getClusterController().registerResultPartitionLocation(jobId, rsId, orderedResult, partition,
+ nPartitions, ncs.getDatasetNetworkManager().getNetworkAddress());
dpw = new DatasetPartitionWriter(ctx, this, jobId, rsId, partition, executor);
DatasetPartitionWriter[] writers = partitionDatasetWriterMap.get(jobId);
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java
index 97814aa..66b8fbd 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java
@@ -47,17 +47,14 @@
private final IResultSerializedAppenderFactory resultSerializedAppenderFactory;
- private final byte[] serializedRecordDescriptor;
public ResultWriterOperatorDescriptor(IOperatorDescriptorRegistry spec, ResultSetId rsId, boolean ordered,
- RecordDescriptor recordDescriptor, IResultSerializedAppenderFactory resultSerializedAppenderFactory)
- throws IOException {
+ RecordDescriptor recordDescriptor, IResultSerializerFactory resultSerializerFactory) throws IOException {
super(spec, 1, 0);
this.rsId = rsId;
this.ordered = ordered;
this.recordDescriptor = recordDescriptor;
this.resultSerializedAppenderFactory = resultSerializedAppenderFactory;
- this.serializedRecordDescriptor = JavaSerializationUtils.serialize(recordDescriptor);
}
@Override
@@ -84,8 +81,7 @@
@Override
public void open() throws HyracksDataException {
try {
- datasetPartitionWriter = dpm.createDatasetPartitionWriter(ctx, rsId, ordered,
- serializedRecordDescriptor, partition, nPartitions);
+ datasetPartitionWriter = dpm.createDatasetPartitionWriter(ctx, rsId, ordered, partition, nPartitions);
datasetPartitionWriter.open();
} catch (HyracksException e) {
throw new HyracksDataException(e);