Add a parameter for the ResultWriterOperatorDescriptor to support ordered result distribution.
Let this parameter percolate throughout the layers of indirection and the
network until it hits the dataset directory service.
git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_hyracks_result_distribution@2547 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetDirectoryService.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetDirectoryService.java
index c7aabb0..f47e41b 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetDirectoryService.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetDirectoryService.java
@@ -19,8 +19,8 @@
import edu.uci.ics.hyracks.api.job.JobId;
public interface IDatasetDirectoryService {
- public void registerResultPartitionLocation(JobId jobId, ResultSetId rsId, int partition, int nPartitions,
- NetworkAddress networkAddress);
+ public void registerResultPartitionLocation(JobId jobId, boolean orderedResult, ResultSetId rsId, int partition,
+ int nPartitions, NetworkAddress networkAddress);
public DatasetDirectoryRecord[] getResultPartitionLocations(JobId jobId, ResultSetId rsId,
DatasetDirectoryRecord[] knownLocations) throws HyracksDataException;
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetPartitionManager.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetPartitionManager.java
index 6c1ff10..db952a0 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetPartitionManager.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetPartitionManager.java
@@ -21,8 +21,8 @@
import edu.uci.ics.hyracks.api.job.JobId;
public interface IDatasetPartitionManager {
- public IFrameWriter createDatasetPartitionWriter(IHyracksTaskContext ctx, ResultSetId rsId, int partition,
- int nPartitions) throws HyracksException;
+ public IFrameWriter createDatasetPartitionWriter(IHyracksTaskContext ctx, boolean orderedResult, ResultSetId rsId,
+ int partition, int nPartitions) throws HyracksException;
public void initializeDatasetPartitionReader(JobId jobId, int partition, IFrameWriter noc) throws HyracksException;
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
index 95eecab..8097ad4 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
@@ -452,8 +452,8 @@
case REGISTER_RESULT_PARTITION_LOCATION: {
CCNCFunctions.RegisterResultPartitionLocationFunction rrplf = (CCNCFunctions.RegisterResultPartitionLocationFunction) fn;
workQueue.schedule(new RegisterResultPartitionLocationWork(ClusterControllerService.this, rrplf
- .getJobId(), rrplf.getResultSetId(), rrplf.getPartition(), rrplf.getNPartitions(), rrplf
- .getNetworkAddress()));
+ .getJobId(), rrplf.getOrderedResult(), rrplf.getResultSetId(), rrplf.getPartition(), rrplf
+ .getNPartitions(), rrplf.getNetworkAddress()));
return;
}
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/DatasetDirectoryService.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/DatasetDirectoryService.java
index eaa3bab..fab870e 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/DatasetDirectoryService.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/DatasetDirectoryService.java
@@ -40,12 +40,12 @@
}
@Override
- public synchronized void registerResultPartitionLocation(JobId jobId, ResultSetId rsId, int partition,
- int nPartitions, NetworkAddress networkAddress) {
- Map<ResultSetId, DatasetDirectoryRecord[]> resultSetsMap = jobResultLocationsMap.get(jobId);
- if (resultSetsMap == null) {
- resultSetsMap = new HashMap<ResultSetId, DatasetDirectoryRecord[]>();
- jobResultLocationsMap.put(jobId, resultSetsMap);
+ public synchronized void registerResultPartitionLocation(JobId jobId, boolean orderedResult, ResultSetId rsId,
+ int partition, int nPartitions, NetworkAddress networkAddress) {
+ Map<ResultSetId, Pair<Boolean, DatasetDirectoryRecord[]>> rsMap = jobResultLocationsMap.get(jobId);
+ if (rsMap == null) {
+ rsMap = new HashMap<ResultSetId, Pair<Boolean, DatasetDirectoryRecord[]>>();
+ jobResultLocationsMap.put(jobId, rsMap);
}
DatasetDirectoryRecord[] records = resultSetsMap.get(rsId);
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/RegisterResultPartitionLocationWork.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/RegisterResultPartitionLocationWork.java
index 3c762eb..4c36f27 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/RegisterResultPartitionLocationWork.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/RegisterResultPartitionLocationWork.java
@@ -25,6 +25,8 @@
private final JobId jobId;
+ private final boolean orderedResult;
+
private final ResultSetId rsId;
private final int partition;
@@ -33,10 +35,11 @@
private final NetworkAddress networkAddress;
- public RegisterResultPartitionLocationWork(ClusterControllerService ccs, JobId jobId, ResultSetId rsId,
- int partition, int nPartitions, NetworkAddress networkAddress) {
+ public RegisterResultPartitionLocationWork(ClusterControllerService ccs, JobId jobId, boolean orderedResult,
+ ResultSetId rsId, int partition, int nPartitions, NetworkAddress networkAddress) {
this.ccs = ccs;
this.jobId = jobId;
+ this.orderedResult = orderedResult;
this.rsId = rsId;
this.partition = partition;
this.nPartitions = nPartitions;
@@ -45,13 +48,13 @@
@Override
public void run() {
- ccs.getDatasetDirectoryService().registerResultPartitionLocation(jobId, rsId, partition, nPartitions,
- networkAddress);
+ ccs.getDatasetDirectoryService().registerResultPartitionLocation(jobId, orderedResult, rsId, partition,
+ nPartitions, networkAddress);
}
@Override
public String toString() {
return "JobId@" + jobId + " Partition@" + partition + " ResultSetId@" + rsId + " NPartitions@" + nPartitions
- + " ResultPartitionLocation@" + networkAddress;
+ + " ResultPartitionLocation@" + networkAddress + " OrderedResult@" + orderedResult;
}
}
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/IClusterController.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/IClusterController.java
index 086fc4d..a79110d 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/IClusterController.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/IClusterController.java
@@ -48,8 +48,8 @@
public void registerPartitionRequest(PartitionRequest partitionRequest) throws Exception;
- public void registerResultPartitionLocation(JobId jobId, ResultSetId rsId, int partition, int nPartitions,
- NetworkAddress addr) throws Exception;
+ public void registerResultPartitionLocation(JobId jobId, boolean orderedResult, ResultSetId rsId, int partition,
+ int nPartitions, NetworkAddress addr) throws Exception;
public void notifyApplicationStateChange(String nodeId, String appName, ApplicationStatus status) throws Exception;
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/CCNCFunctions.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/CCNCFunctions.java
index e3591ce..2d48155 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/CCNCFunctions.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/CCNCFunctions.java
@@ -445,6 +445,8 @@
private final JobId jobId;
+ private final boolean orderedResult;
+
private final ResultSetId rsId;
private final int partition;
@@ -453,9 +455,10 @@
private NetworkAddress networkAddress;
- public RegisterResultPartitionLocationFunction(JobId jobId, ResultSetId rsId, int partition, int nPartitions,
- NetworkAddress networkAddress) {
+ public RegisterResultPartitionLocationFunction(JobId jobId, boolean orderedResult, ResultSetId rsId,
+ int partition, int nPartitions, NetworkAddress networkAddress) {
this.jobId = jobId;
+ this.orderedResult = orderedResult;
this.rsId = rsId;
this.partition = partition;
this.nPartitions = nPartitions;
@@ -471,6 +474,10 @@
return jobId;
}
+ public boolean getOrderedResult() {
+ return orderedResult;
+ }
+
public ResultSetId getResultSetId() {
return rsId;
}
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
index 595efd1..ad3a82e 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
@@ -97,10 +97,10 @@
}
@Override
- public void registerResultPartitionLocation(JobId jobId, ResultSetId rsId, int partition, int nPartitions,
- NetworkAddress networkAddress) throws Exception {
+ public void registerResultPartitionLocation(JobId jobId, boolean orderedResult, ResultSetId rsId, int partition,
+ int nPartitions, NetworkAddress networkAddress) throws Exception {
CCNCFunctions.RegisterResultPartitionLocationFunction fn = new CCNCFunctions.RegisterResultPartitionLocationFunction(
- jobId, rsId, partition, nPartitions, networkAddress);
+ jobId, orderedResult, rsId, partition, nPartitions, networkAddress);
ipcHandle.send(-1, fn, null);
}
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java
index 7d9b10e..b9a8b69 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java
@@ -50,13 +50,13 @@
}
@Override
- public IFrameWriter createDatasetPartitionWriter(IHyracksTaskContext ctx, ResultSetId rsId, int partition,
- int nPartitions) throws HyracksException {
+ public IFrameWriter createDatasetPartitionWriter(IHyracksTaskContext ctx, boolean orderedResult, ResultSetId rsId,
+ int partition, int nPartitions) throws HyracksException {
DatasetPartitionWriter dpw = null;
JobId jobId = ctx.getJobletContext().getJobId();
try {
- ncs.getClusterController().registerResultPartitionLocation(jobId, rsId, partition, nPartitions,
- ncs.getDatasetNetworkManager().getNetworkAddress());
+ ncs.getClusterController().registerResultPartitionLocation(jobId, orderedResult, rsId, partition,
+ nPartitions, ncs.getDatasetNetworkManager().getNetworkAddress());
dpw = new DatasetPartitionWriter(ctx, this, partition, executor);
DatasetPartitionWriter[] writers = partitionDatasetWriterMap.get(jobId);
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java
index dde8669..8c44494 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java
@@ -33,9 +33,12 @@
private final ResultSetId rsId;
- public ResultWriterOperatorDescriptor(IOperatorDescriptorRegistry spec, ResultSetId rsId) {
+ private final boolean ordered;
+
+ public ResultWriterOperatorDescriptor(IOperatorDescriptorRegistry spec, ResultSetId rsId, boolean ordered) {
super(spec, 1, 0);
this.rsId = rsId;
+ this.ordered = ordered;
}
@Override
@@ -49,7 +52,8 @@
@Override
public void open() throws HyracksDataException {
try {
- datasetPartitionWriter = dpm.createDatasetPartitionWriter(ctx, rsId, partition, nPartitions);
+ datasetPartitionWriter = dpm.createDatasetPartitionWriter(ctx, ordered, rsId, partition,
+ nPartitions);
datasetPartitionWriter.open();
} catch (HyracksException e) {
throw new HyracksDataException(e);