Fixed bug in SortMergePartitionCollector. Fixed over-synchronization in collectors.

git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_scheduling@452 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/PartitionManager.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/PartitionManager.java
index 4adb20a..649ec46 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/PartitionManager.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/PartitionManager.java
@@ -14,8 +14,10 @@
  */
 package edu.uci.ics.hyracks.control.nc.partitions;
 
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 
@@ -36,7 +38,7 @@
 
     private final NodeControllerService ncs;
 
-    private final Map<PartitionId, IPartition> partitionMap;
+    private final Map<PartitionId, List<IPartition>> partitionMap;
 
     private final DefaultDeallocatableRegistry deallocatableRegistry;
 
@@ -45,14 +47,19 @@
     public PartitionManager(NodeControllerService ncs, NetworkAddress dataPort) {
         this.dataPort = dataPort;
         this.ncs = ncs;
-        partitionMap = new HashMap<PartitionId, IPartition>();
+        partitionMap = new HashMap<PartitionId, List<IPartition>>();
         deallocatableRegistry = new DefaultDeallocatableRegistry();
         fileFactory = new WorkspaceFileFactory(deallocatableRegistry, (IOManager) ncs.getRootContext().getIOManager());
     }
 
     public void registerPartition(PartitionId pid, IPartition partition) throws HyracksDataException {
         synchronized (this) {
-            partitionMap.put(pid, partition);
+            List<IPartition> pList = partitionMap.get(pid);
+            if (pList == null) {
+                pList = new ArrayList<IPartition>();
+                partitionMap.put(pid, pList);
+            }
+            pList.add(partition);
         }
         try {
             ncs.getClusterController().registerPartitionProvider(pid, dataPort);
@@ -62,15 +69,17 @@
     }
 
     public synchronized IPartition getPartition(PartitionId pid) {
-        return partitionMap.get(pid);
+        return partitionMap.get(pid).get(0);
     }
 
     public synchronized void unregisterPartitions(UUID jobId) {
-        for (Iterator<Map.Entry<PartitionId, IPartition>> i = partitionMap.entrySet().iterator(); i.hasNext();) {
-            Map.Entry<PartitionId, IPartition> e = i.next();
+        for (Iterator<Map.Entry<PartitionId, List<IPartition>>> i = partitionMap.entrySet().iterator(); i.hasNext();) {
+            Map.Entry<PartitionId, List<IPartition>> e = i.next();
             PartitionId pid = e.getKey();
             if (jobId.equals(pid.getJobId())) {
-                e.getValue().deallocate();
+                for (IPartition p : e.getValue()) {
+                    p.deallocate();
+                }
                 i.remove();
             }
         }
@@ -79,8 +88,9 @@
     @Override
     public synchronized void registerPartitionRequest(PartitionId partitionId, IFrameWriter writer)
             throws HyracksException {
-        IPartition partition = partitionMap.get(partitionId);
-        if (partition != null) {
+        List<IPartition> pList = partitionMap.get(partitionId);
+        if (pList != null && !pList.isEmpty()) {
+            IPartition partition = pList.get(0);
             partition.writeTo(writer);
             if (!partition.isReusable()) {
                 partitionMap.remove(partitionId);
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/collectors/NonDeterministicPartitionCollector.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/collectors/NonDeterministicPartitionCollector.java
index b640142..5ff528e 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/collectors/NonDeterministicPartitionCollector.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/collectors/NonDeterministicPartitionCollector.java
@@ -99,6 +99,17 @@
 
         @Override
         public boolean nextFrame(ByteBuffer buffer) throws HyracksDataException {
+            findNextSender();
+            if (lastReadSender >= 0) {
+                ByteBuffer srcFrame = channels[lastReadSender].getNextBuffer();
+                FrameUtils.copy(srcFrame, buffer);
+                channels[lastReadSender].recycleBuffer(srcFrame);
+                return true;
+            }
+            return false;
+        }
+
+        private void findNextSender() throws HyracksDataException {
             synchronized (NonDeterministicPartitionCollector.this) {
                 while (true) {
                     switch (lastReadSender) {
@@ -115,10 +126,7 @@
                         if (--availableFrameCounts[lastReadSender] == 0) {
                             frameAvailability.clear(lastReadSender);
                         }
-                        ByteBuffer srcFrame = channels[lastReadSender].getNextBuffer();
-                        FrameUtils.copy(srcFrame, buffer);
-                        channels[lastReadSender].recycleBuffer(srcFrame);
-                        return true;
+                        return;
                     }
                     for (int i = eosSenders.nextSetBit(0); i >= 0; i = eosSenders.nextSetBit(i)) {
                         channels[i].close();
@@ -127,7 +135,8 @@
                     }
                     int nextClosedBitIndex = closedSenders.nextClearBit(0);
                     if (nextClosedBitIndex < 0 || nextClosedBitIndex >= nSenderPartitions) {
-                        return false;
+                        lastReadSender = -1;
+                        return;
                     }
                     try {
                         NonDeterministicPartitionCollector.this.wait();
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/collectors/SortMergePartitionCollector.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/collectors/SortMergePartitionCollector.java
index 70355c7..8faee7b 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/collectors/SortMergePartitionCollector.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/collectors/SortMergePartitionCollector.java
@@ -79,10 +79,11 @@
             InputChannelFrameReader channelReader = new InputChannelFrameReader(channel);
             channel.registerMonitor(channelReader);
             channel.setAttachment(channelReader);
+            int senderIndex = pid.getSenderIndex();
             synchronized (this) {
-                channels[pid.getSenderIndex()] = channel;
+                channels[senderIndex] = channel;
             }
-            pbm.addPartition(pid.getSenderIndex());
+            pbm.addPartition(senderIndex);
             channel.open();
         }
     }
@@ -188,19 +189,21 @@
         }
 
         @Override
-        public synchronized boolean nextFrame(ByteBuffer buffer) throws HyracksDataException {
-            while (!eos && availableFrames <= 0) {
-                try {
-                    wait();
-                } catch (InterruptedException e) {
-                    throw new HyracksDataException(e);
+        public boolean nextFrame(ByteBuffer buffer) throws HyracksDataException {
+            synchronized (this) {
+                while (!eos && availableFrames <= 0) {
+                    try {
+                        wait();
+                    } catch (InterruptedException e) {
+                        throw new HyracksDataException(e);
+                    }
                 }
-            }
-            if (eos) {
-                return false;
+                if (availableFrames <=0 && eos) {
+                    return false;
+                }
+                --availableFrames;
             }
             ByteBuffer srcBuffer = channel.getNextBuffer();
-            --availableFrames;
             FrameUtils.copy(srcBuffer, buffer);
             channel.recycleBuffer(srcBuffer);
             return true;
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/RunMergingFrameReader.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/RunMergingFrameReader.java
index ddd24d8..e791582 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/RunMergingFrameReader.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/RunMergingFrameReader.java
@@ -77,6 +77,7 @@
                 setNextTopTuple(runIndex, tupleIndexes, runCursors, tupleAccessors, topTuples);
             } else {
                 closeRun(runIndex, runCursors, tupleAccessors);
+                topTuples.pop();
             }
         }
     }