Read results for each result set id.
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetPartitionManager.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetPartitionManager.java
index 0c5a5bb..dea077d 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetPartitionManager.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetPartitionManager.java
@@ -29,7 +29,8 @@
public void reportPartitionFailure(JobId jobId, ResultSetId resultSetId, int partition) throws HyracksException;
- public void initializeDatasetPartitionReader(JobId jobId, int partition, IFrameWriter noc) throws HyracksException;
+ public void initializeDatasetPartitionReader(JobId jobId, ResultSetId resultSetId, int partition, IFrameWriter noc)
+ throws HyracksException;
public void abortReader(JobId jobId);
diff --git a/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDatasetReader.java b/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDatasetReader.java
index 08ad0d7..33e1b01 100644
--- a/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDatasetReader.java
+++ b/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDatasetReader.java
@@ -101,7 +101,7 @@
knownRecords);
lastReadPartition = 0;
resultChannel = new DatasetNetworkInputChannel(netManager,
- getSocketAddress(knownRecords[lastReadPartition]), jobId, lastReadPartition,
+ getSocketAddress(knownRecords[lastReadPartition]), jobId, resultSetId, lastReadPartition,
NUM_READ_BUFFERS);
lastMonitor = getMonitor(lastReadPartition);
resultChannel.open(datasetClientCtx);
@@ -143,8 +143,8 @@
}
resultChannel = new DatasetNetworkInputChannel(netManager,
- getSocketAddress(knownRecords[lastReadPartition]), jobId, lastReadPartition,
- NUM_READ_BUFFERS);
+ getSocketAddress(knownRecords[lastReadPartition]), jobId, resultSetId,
+ lastReadPartition, NUM_READ_BUFFERS);
lastMonitor = getMonitor(lastReadPartition);
resultChannel.open(datasetClientCtx);
resultChannel.registerMonitor(lastMonitor);
diff --git a/hyracks/hyracks-comm/src/main/java/edu/uci/ics/hyracks/comm/channels/DatasetNetworkInputChannel.java b/hyracks/hyracks-comm/src/main/java/edu/uci/ics/hyracks/comm/channels/DatasetNetworkInputChannel.java
index fac2949..1ab315b 100644
--- a/hyracks/hyracks-comm/src/main/java/edu/uci/ics/hyracks/comm/channels/DatasetNetworkInputChannel.java
+++ b/hyracks/hyracks-comm/src/main/java/edu/uci/ics/hyracks/comm/channels/DatasetNetworkInputChannel.java
@@ -24,6 +24,7 @@
import edu.uci.ics.hyracks.api.channels.IInputChannel;
import edu.uci.ics.hyracks.api.channels.IInputChannelMonitor;
import edu.uci.ics.hyracks.api.context.IHyracksCommonContext;
+import edu.uci.ics.hyracks.api.dataset.ResultSetId;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.net.buffers.IBufferAcceptor;
@@ -41,6 +42,8 @@
private final JobId jobId;
+ private final ResultSetId resultSetId;
+
private final int partition;
private final Queue<ByteBuffer> fullQueue;
@@ -54,10 +57,11 @@
private Object attachment;
public DatasetNetworkInputChannel(IChannelConnectionFactory netManager, SocketAddress remoteAddress, JobId jobId,
- int partition, int nBuffers) {
+ ResultSetId resultSetId, int partition, int nBuffers) {
this.netManager = netManager;
this.remoteAddress = remoteAddress;
this.jobId = jobId;
+ this.resultSetId = resultSetId;
this.partition = partition;
fullQueue = new ArrayDeque<ByteBuffer>(nBuffers);
this.nBuffers = nBuffers;
@@ -103,6 +107,7 @@
}
ByteBuffer writeBuffer = ByteBuffer.allocate(INITIAL_MESSAGE_SIZE);
writeBuffer.putLong(jobId.getId());
+ writeBuffer.putLong(resultSetId.getId());
writeBuffer.putInt(partition);
writeBuffer.flip();
if (LOGGER.isLoggable(Level.FINE)) {
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java
index ef2902e..2ec7acc 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java
@@ -14,6 +14,7 @@
*/
package edu.uci.ics.hyracks.control.nc.dataset;
+import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.Executor;
@@ -38,7 +39,7 @@
private final Executor executor;
- private final Map<JobId, ResultState[]> partitionResultStateMap;
+ private final Map<JobId, Map<ResultSetId, ResultState[]>> partitionResultStateMap;
private final DefaultDeallocatableRegistry deallocatableRegistry;
@@ -57,18 +58,23 @@
} else {
datasetMemoryManager = null;
}
- partitionResultStateMap = new LinkedHashMap<JobId, ResultState[]>() {
+ partitionResultStateMap = new LinkedHashMap<JobId, Map<ResultSetId, ResultState[]>>() {
private static final long serialVersionUID = 1L;
- protected boolean removeEldestEntry(Map.Entry<JobId, ResultState[]> eldest) {
+ protected boolean removeEldestEntry(Map.Entry<JobId, Map<ResultSetId, ResultState[]>> eldest) {
synchronized (DatasetPartitionManager.this) {
if (size() > resultHistorySize) {
- ResultState[] resultStates = eldest.getValue();
- for (int i = 0; i < resultStates.length; i++) {
- ResultState state = resultStates[i];
- if (state != null) {
- state.closeAndDelete();
- LOGGER.fine("Removing partition: " + i + " for JobId: " + eldest.getKey());
+ Map<ResultSetId, ResultState[]> rsIdMap = partitionResultStateMap.get(eldest.getValue());
+ for (ResultSetId rsId : rsIdMap.keySet()) {
+ ResultState[] resultStates = rsIdMap.get(rsId);
+ if (resultStates != null) {
+ for (int i = 0; i < resultStates.length; i++) {
+ ResultState state = resultStates[i];
+ if (state != null) {
+ state.closeAndDelete();
+ LOGGER.fine("Removing partition: " + i + " for JobId: " + eldest.getKey());
+ }
+ }
}
}
return true;
@@ -90,10 +96,16 @@
nPartitions, ncs.getDatasetNetworkManager().getNetworkAddress());
dpw = new DatasetPartitionWriter(ctx, this, jobId, rsId, partition, datasetMemoryManager, fileFactory);
- ResultState[] resultStates = partitionResultStateMap.get(jobId);
+ Map<ResultSetId, ResultState[]> rsIdMap = partitionResultStateMap.get(jobId);
+ if (rsIdMap == null) {
+ rsIdMap = new HashMap<ResultSetId, ResultState[]>();
+ partitionResultStateMap.put(jobId, rsIdMap);
+ }
+
+ ResultState[] resultStates = rsIdMap.get(rsId);
if (resultStates == null) {
resultStates = new ResultState[nPartitions];
- partitionResultStateMap.put(jobId, resultStates);
+ rsIdMap.put(rsId, resultStates);
}
resultStates[partition] = dpw.getResultState();
}
@@ -128,16 +140,21 @@
}
@Override
- public void initializeDatasetPartitionReader(JobId jobId, int partition, IFrameWriter writer)
- throws HyracksException {
+ public void initializeDatasetPartitionReader(JobId jobId, ResultSetId resultSetId, int partition,
+ IFrameWriter writer) throws HyracksException {
ResultState resultState;
synchronized (this) {
- ResultState[] resultStates = partitionResultStateMap.get(jobId);
+ Map<ResultSetId, ResultState[]> rsIdMap = partitionResultStateMap.get(jobId);
- if (resultStates == null) {
+ if (rsIdMap == null) {
throw new HyracksException("Unknown JobId " + jobId);
}
+ ResultState[] resultStates = rsIdMap.get(resultSetId);
+ if (resultStates == null) {
+ throw new HyracksException("Unknown JobId: " + jobId + " ResultSetId: " + resultSetId);
+ }
+
resultState = resultStates[partition];
if (resultState == null) {
throw new HyracksException("No DatasetPartitionWriter for partition " + partition);
@@ -146,19 +163,26 @@
DatasetPartitionReader dpr = new DatasetPartitionReader(datasetMemoryManager, executor, resultState);
dpr.writeTo(writer);
- LOGGER.fine("Initialized partition reader: JobId: " + jobId + ":partition: " + partition);
+ LOGGER.fine("Initialized partition reader: JobId: " + jobId + ":ResultSetId: " + resultSetId + ":partition: "
+ + partition);
}
@Override
public synchronized void abortReader(JobId jobId) {
- ResultState[] resultStates = partitionResultStateMap.get(jobId);
+ Map<ResultSetId, ResultState[]> rsIdMap = partitionResultStateMap.get(jobId);
- if (resultStates == null) {
+ if (rsIdMap == null) {
return;
}
- for (ResultState state : resultStates) {
- if (state != null) {
- state.abort();
+
+ for (ResultSetId rsId : rsIdMap.keySet()) {
+ ResultState[] resultStates = rsIdMap.get(rsId);
+ if (resultStates != null) {
+ for (ResultState state : resultStates) {
+ if (state != null) {
+ state.abort();
+ }
+ }
}
}
}
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/DatasetNetworkManager.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/DatasetNetworkManager.java
index 5b8b333..84baf49 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/DatasetNetworkManager.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/DatasetNetworkManager.java
@@ -24,6 +24,7 @@
import edu.uci.ics.hyracks.api.comm.NetworkAddress;
import edu.uci.ics.hyracks.api.dataset.IDatasetPartitionManager;
+import edu.uci.ics.hyracks.api.dataset.ResultSetId;
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.comm.channels.IChannelConnectionFactory;
@@ -95,6 +96,7 @@
@Override
public void accept(ByteBuffer buffer) {
JobId jobId = new JobId(buffer.getLong());
+ ResultSetId rsId = new ResultSetId(buffer.getLong());
int partition = buffer.getInt();
if (LOGGER.isLoggable(Level.FINE)) {
LOGGER.fine("Received initial dataset partition read request for JobId: " + jobId + " partition: "
@@ -102,7 +104,7 @@
}
noc = new NetworkOutputChannel(ccb, 1);
try {
- partitionManager.initializeDatasetPartitionReader(jobId, partition, noc);
+ partitionManager.initializeDatasetPartitionReader(jobId, rsId, partition, noc);
} catch (HyracksException e) {
noc.abort();
}
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractIntegrationTest.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractIntegrationTest.java
index e74e54c..715f822 100644
--- a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractIntegrationTest.java
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractIntegrationTest.java
@@ -60,7 +60,7 @@
private static IHyracksClientConnection hcc;
private final List<File> outputFiles;
-
+
protected static int DEFAULT_MEM_PAGE_SIZE = 32768;
protected static int DEFAULT_MEM_NUM_PAGES = 1000;