Add an additional parameter to ResultWriter operator to take the query or job's execution mode, synchronous or asynchronous as the boolean parameter.
Pass it down all the way to result writers and the result state.
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetPartitionManager.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetPartitionManager.java
index dea077d..e370949 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetPartitionManager.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetPartitionManager.java
@@ -22,7 +22,7 @@
public interface IDatasetPartitionManager {
public IFrameWriter createDatasetPartitionWriter(IHyracksTaskContext ctx, ResultSetId rsId, boolean orderedResult,
- int partition, int nPartitions) throws HyracksException;
+ boolean asyncMode, int partition, int nPartitions) throws HyracksException;
public void reportPartitionWriteCompletion(JobId jobId, ResultSetId resultSetId, int partition)
throws HyracksException;
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java
index 6d2c043..2399320 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java
@@ -76,14 +76,15 @@
@Override
public IFrameWriter createDatasetPartitionWriter(IHyracksTaskContext ctx, ResultSetId rsId, boolean orderedResult,
- int partition, int nPartitions) throws HyracksException {
+ boolean asyncMode, int partition, int nPartitions) throws HyracksException {
DatasetPartitionWriter dpw = null;
JobId jobId = ctx.getJobletContext().getJobId();
try {
synchronized (this) {
ncs.getClusterController().registerResultPartitionLocation(jobId, rsId, orderedResult, partition,
nPartitions, ncs.getDatasetNetworkManager().getNetworkAddress());
- dpw = new DatasetPartitionWriter(ctx, this, jobId, rsId, partition, datasetMemoryManager, fileFactory);
+ dpw = new DatasetPartitionWriter(ctx, this, jobId, rsId, asyncMode, partition, datasetMemoryManager,
+ fileFactory);
Map<ResultSetId, ResultState[]> rsIdMap = partitionResultStateMap.get(jobId);
if (rsIdMap == null) {
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionWriter.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionWriter.java
index 8f4b639..97c9bca 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionWriter.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionWriter.java
@@ -37,6 +37,8 @@
private final ResultSetId resultSetId;
+ private final boolean asyncMode;
+
private final int partition;
private final DatasetMemoryManager datasetMemoryManager;
@@ -46,16 +48,18 @@
private final ResultState resultState;
public DatasetPartitionWriter(IHyracksTaskContext ctx, IDatasetPartitionManager manager, JobId jobId,
- ResultSetId rsId, int partition, DatasetMemoryManager datasetMemoryManager,
+ ResultSetId rsId, boolean asyncMode, int partition, DatasetMemoryManager datasetMemoryManager,
IWorkspaceFileFactory fileFactory) {
this.manager = manager;
this.jobId = jobId;
this.resultSetId = rsId;
+ this.asyncMode = asyncMode;
this.partition = partition;
this.datasetMemoryManager = datasetMemoryManager;
resultSetPartitionId = new ResultSetPartitionId(jobId, rsId, partition);
- resultState = new ResultState(resultSetPartitionId, ctx.getIOManager(), fileFactory, ctx.getFrameSize());
+ resultState = new ResultState(resultSetPartitionId, asyncMode, ctx.getIOManager(), fileFactory,
+ ctx.getFrameSize());
}
public ResultState getResultState() {
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/ResultState.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/ResultState.java
index 911f372..b076185 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/ResultState.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/ResultState.java
@@ -37,6 +37,8 @@
private final ResultSetPartitionId resultSetPartitionId;
+ private final boolean asyncMode;
+
private final int frameSize;
private final IIOManager ioManager;
@@ -59,9 +61,10 @@
private long persistentSize;
- ResultState(ResultSetPartitionId resultSetPartitionId, IIOManager ioManager, IWorkspaceFileFactory fileFactory,
- int frameSize) {
+ ResultState(ResultSetPartitionId resultSetPartitionId, boolean asyncMode, IIOManager ioManager,
+ IWorkspaceFileFactory fileFactory, int frameSize) {
this.resultSetPartitionId = resultSetPartitionId;
+ this.asyncMode = asyncMode;
this.ioManager = ioManager;
this.fileFactory = fileFactory;
this.frameSize = frameSize;
@@ -251,6 +254,10 @@
return ioManager;
}
+ public boolean getAsyncMode() {
+ return asyncMode;
+ }
+
@Override
public JobId getJobId() {
return resultSetPartitionId.getJobId();
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java
index edca60a..4c27903 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java
@@ -42,13 +42,16 @@
private final boolean ordered;
+ private final boolean asyncMode;
+
private final IResultSerializerFactory resultSerializerFactory;
public ResultWriterOperatorDescriptor(IOperatorDescriptorRegistry spec, ResultSetId rsId, boolean ordered,
- IResultSerializerFactory resultSerializerFactory) throws IOException {
+ boolean asyncMode, IResultSerializerFactory resultSerializerFactory) throws IOException {
super(spec, 1, 0);
this.rsId = rsId;
this.ordered = ordered;
+ this.asyncMode = asyncMode;
this.resultSerializerFactory = resultSerializerFactory;
}
@@ -75,7 +78,7 @@
@Override
public void open() throws HyracksDataException {
try {
- datasetPartitionWriter = dpm.createDatasetPartitionWriter(ctx, rsId, ordered, partition,
+ datasetPartitionWriter = dpm.createDatasetPartitionWriter(ctx, rsId, ordered, asyncMode, partition,
nPartitions);
datasetPartitionWriter.open();
resultSerializer.init();