Fixed bug in SortMergePartitionCollector. Fixed over-synchronization in collectors.
git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_scheduling@452 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/PartitionManager.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/PartitionManager.java
index 4adb20a..649ec46 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/PartitionManager.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/PartitionManager.java
@@ -14,8 +14,10 @@
*/
package edu.uci.ics.hyracks.control.nc.partitions;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
+import java.util.List;
import java.util.Map;
import java.util.UUID;
@@ -36,7 +38,7 @@
private final NodeControllerService ncs;
- private final Map<PartitionId, IPartition> partitionMap;
+ private final Map<PartitionId, List<IPartition>> partitionMap;
private final DefaultDeallocatableRegistry deallocatableRegistry;
@@ -45,14 +47,19 @@
public PartitionManager(NodeControllerService ncs, NetworkAddress dataPort) {
this.dataPort = dataPort;
this.ncs = ncs;
- partitionMap = new HashMap<PartitionId, IPartition>();
+ partitionMap = new HashMap<PartitionId, List<IPartition>>();
deallocatableRegistry = new DefaultDeallocatableRegistry();
fileFactory = new WorkspaceFileFactory(deallocatableRegistry, (IOManager) ncs.getRootContext().getIOManager());
}
public void registerPartition(PartitionId pid, IPartition partition) throws HyracksDataException {
synchronized (this) {
- partitionMap.put(pid, partition);
+ List<IPartition> pList = partitionMap.get(pid);
+ if (pList == null) {
+ pList = new ArrayList<IPartition>();
+ partitionMap.put(pid, pList);
+ }
+ pList.add(partition);
}
try {
ncs.getClusterController().registerPartitionProvider(pid, dataPort);
@@ -62,15 +69,17 @@
}
public synchronized IPartition getPartition(PartitionId pid) {
- return partitionMap.get(pid);
+ return partitionMap.get(pid).get(0);
}
public synchronized void unregisterPartitions(UUID jobId) {
- for (Iterator<Map.Entry<PartitionId, IPartition>> i = partitionMap.entrySet().iterator(); i.hasNext();) {
- Map.Entry<PartitionId, IPartition> e = i.next();
+ for (Iterator<Map.Entry<PartitionId, List<IPartition>>> i = partitionMap.entrySet().iterator(); i.hasNext();) {
+ Map.Entry<PartitionId, List<IPartition>> e = i.next();
PartitionId pid = e.getKey();
if (jobId.equals(pid.getJobId())) {
- e.getValue().deallocate();
+ for (IPartition p : e.getValue()) {
+ p.deallocate();
+ }
i.remove();
}
}
@@ -79,8 +88,9 @@
@Override
public synchronized void registerPartitionRequest(PartitionId partitionId, IFrameWriter writer)
throws HyracksException {
- IPartition partition = partitionMap.get(partitionId);
- if (partition != null) {
+ List<IPartition> pList = partitionMap.get(partitionId);
+ if (pList != null && !pList.isEmpty()) {
+ IPartition partition = pList.get(0);
partition.writeTo(writer);
if (!partition.isReusable()) {
partitionMap.remove(partitionId);
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/collectors/NonDeterministicPartitionCollector.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/collectors/NonDeterministicPartitionCollector.java
index b640142..5ff528e 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/collectors/NonDeterministicPartitionCollector.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/collectors/NonDeterministicPartitionCollector.java
@@ -99,6 +99,17 @@
@Override
public boolean nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ findNextSender();
+ if (lastReadSender >= 0) {
+ ByteBuffer srcFrame = channels[lastReadSender].getNextBuffer();
+ FrameUtils.copy(srcFrame, buffer);
+ channels[lastReadSender].recycleBuffer(srcFrame);
+ return true;
+ }
+ return false;
+ }
+
+ private void findNextSender() throws HyracksDataException {
synchronized (NonDeterministicPartitionCollector.this) {
while (true) {
switch (lastReadSender) {
@@ -115,10 +126,7 @@
if (--availableFrameCounts[lastReadSender] == 0) {
frameAvailability.clear(lastReadSender);
}
- ByteBuffer srcFrame = channels[lastReadSender].getNextBuffer();
- FrameUtils.copy(srcFrame, buffer);
- channels[lastReadSender].recycleBuffer(srcFrame);
- return true;
+ return;
}
for (int i = eosSenders.nextSetBit(0); i >= 0; i = eosSenders.nextSetBit(i)) {
channels[i].close();
@@ -127,7 +135,8 @@
}
int nextClosedBitIndex = closedSenders.nextClearBit(0);
if (nextClosedBitIndex < 0 || nextClosedBitIndex >= nSenderPartitions) {
- return false;
+ lastReadSender = -1;
+ return;
}
try {
NonDeterministicPartitionCollector.this.wait();
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/collectors/SortMergePartitionCollector.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/collectors/SortMergePartitionCollector.java
index 70355c7..8faee7b 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/collectors/SortMergePartitionCollector.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/collectors/SortMergePartitionCollector.java
@@ -79,10 +79,11 @@
InputChannelFrameReader channelReader = new InputChannelFrameReader(channel);
channel.registerMonitor(channelReader);
channel.setAttachment(channelReader);
+ int senderIndex = pid.getSenderIndex();
synchronized (this) {
- channels[pid.getSenderIndex()] = channel;
+ channels[senderIndex] = channel;
}
- pbm.addPartition(pid.getSenderIndex());
+ pbm.addPartition(senderIndex);
channel.open();
}
}
@@ -188,19 +189,21 @@
}
@Override
- public synchronized boolean nextFrame(ByteBuffer buffer) throws HyracksDataException {
- while (!eos && availableFrames <= 0) {
- try {
- wait();
- } catch (InterruptedException e) {
- throw new HyracksDataException(e);
+ public boolean nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ synchronized (this) {
+ while (!eos && availableFrames <= 0) {
+ try {
+ wait();
+ } catch (InterruptedException e) {
+ throw new HyracksDataException(e);
+ }
}
- }
- if (eos) {
- return false;
+ if (availableFrames <=0 && eos) {
+ return false;
+ }
+ --availableFrames;
}
ByteBuffer srcBuffer = channel.getNextBuffer();
- --availableFrames;
FrameUtils.copy(srcBuffer, buffer);
channel.recycleBuffer(srcBuffer);
return true;
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/RunMergingFrameReader.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/RunMergingFrameReader.java
index ddd24d8..e791582 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/RunMergingFrameReader.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/RunMergingFrameReader.java
@@ -77,6 +77,7 @@
setNextTopTuple(runIndex, tupleIndexes, runCursors, tupleAccessors, topTuples);
} else {
closeRun(runIndex, runCursors, tupleAccessors);
+ topTuples.pop();
}
}
}