merged r1380:1386 from hyracks_asterix_stabilization

git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_lsm_tree@1394 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/CCNCFunctions.java b/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/CCNCFunctions.java
index 9ea4636..f45f449 100644
--- a/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/CCNCFunctions.java
+++ b/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/CCNCFunctions.java
@@ -24,6 +24,8 @@
 import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.Map;
+import java.util.logging.Level;
+import java.util.logging.Logger;
 
 import edu.uci.ics.hyracks.api.comm.NetworkAddress;
 import edu.uci.ics.hyracks.api.dataflow.ActivityId;
@@ -49,6 +51,8 @@
 import edu.uci.ics.hyracks.ipc.impl.JavaSerializationBasedPayloadSerializerDeserializer;
 
 public class CCNCFunctions {
+    private static final Logger LOGGER = Logger.getLogger(CCNCFunctions.class.getName());
+
     private static final int FID_CODE_SIZE = 1;
 
     public enum FunctionId {
@@ -690,8 +694,12 @@
         private byte[] serialize(Object object, byte fid) throws Exception {
             ByteArrayOutputStream baos = new ByteArrayOutputStream();
             baos.write(fid);
-            serialize(baos, object, fid);
-            JavaSerializationBasedPayloadSerializerDeserializer.serialize(baos, object);
+            try {
+                serialize(baos, object, fid);
+            } catch (Exception e) {
+                LOGGER.log(Level.SEVERE, "Error serializing " + object, e);
+                throw e;
+            }
             baos.close();
             return baos.toByteArray();
         }
diff --git a/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkInputChannel.java b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkInputChannel.java
index 609d7f0..257e2a8 100644
--- a/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkInputChannel.java
+++ b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkInputChannel.java
@@ -25,7 +25,6 @@
 import edu.uci.ics.hyracks.api.channels.IInputChannelMonitor;
 import edu.uci.ics.hyracks.api.context.IHyracksRootContext;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.api.exceptions.HyracksException;
 import edu.uci.ics.hyracks.api.partitions.PartitionId;
 import edu.uci.ics.hyracks.net.buffers.IBufferAcceptor;
 import edu.uci.ics.hyracks.net.buffers.ICloseableBufferAcceptor;
diff --git a/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/partition/FieldHashPartitionComputerFactory.java b/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/partition/FieldHashPartitionComputerFactory.java
index 7769539..f6d4d51 100644
--- a/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/partition/FieldHashPartitionComputerFactory.java
+++ b/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/partition/FieldHashPartitionComputerFactory.java
@@ -55,7 +55,7 @@
                     h = h * 31 + fh;
                 }
                 if (h < 0) {
-                    h = -h;
+                    h = -(h + 1);
                 }
                 return h % nParts;
             }
diff --git a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/ChannelSet.java b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/ChannelSet.java
index ffdb5c6..4bea8a4 100644
--- a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/ChannelSet.java
+++ b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/ChannelSet.java
@@ -183,9 +183,23 @@
 
     void markEOSAck(int channelId) {
         synchronized (mConn) {
-            assert !pendingEOSAckBitmap.get(channelId);
-            pendingEOSAckBitmap.set(channelId);
-            pendingWriteEventsCounter.increment();
+            if (!pendingEOSAckBitmap.get(channelId)) {
+                pendingEOSAckBitmap.set(channelId);
+                pendingWriteEventsCounter.increment();
+            }
+        }
+    }
+
+    void notifyIOError() {
+        synchronized (mConn) {
+            for (int i = 0; i < ccbArray.length; ++i) {
+                ChannelControlBlock ccb = ccbArray[i];
+                if (ccb != null && !ccb.getRemoteEOS()) {
+                    ccb.reportRemoteError(-1);
+                    markEOSAck(i);
+                    unmarkPendingCredits(i);
+                }
+            }
         }
     }
 
@@ -197,7 +211,7 @@
             throw new NetException("More than " + MAX_OPEN_CHANNELS + " opened concurrently");
         }
         if (ccbArray[idx] != null) {
-            assert ccbArray[idx].completelyClosed();
+            assert ccbArray[idx].completelyClosed() : ccbArray[idx].toString();
             if (ccbArray[idx].completelyClosed()) {
                 if (LOGGER.isLoggable(Level.FINE)) {
                     LOGGER.fine("Cleaning free channel: " + ccbArray[idx]);
diff --git a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MultiplexedConnection.java b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MultiplexedConnection.java
index c905f57..e717c45 100644
--- a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MultiplexedConnection.java
+++ b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MultiplexedConnection.java
@@ -49,6 +49,8 @@
 
     private boolean connectionFailure;
 
+    private Exception error;
+
     public MultiplexedConnection(MuxDemux muxDemux) {
         this.muxDemux = muxDemux;
         pendingWriteEventsCounter = new IEventCounter() {
@@ -119,7 +121,19 @@
         }
     }
 
+    @Override
+    public synchronized void notifyIOError(Exception e) {
+        connectionFailure = true;
+        error = e;
+        cSet.notifyIOError();
+    }
+
     public ChannelControlBlock openChannel() throws NetException, InterruptedException {
+        synchronized (this) {
+            if (connectionFailure) {
+                throw new NetException(error);
+            }
+        }
         ChannelControlBlock channel = cSet.allocateChannel();
         int channelId = channel.getChannelId();
         cSet.initiateChannelSyn(channelId);
diff --git a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/tcp/ITCPConnectionEventListener.java b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/tcp/ITCPConnectionEventListener.java
index 607bf31..79c21ee 100644
--- a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/tcp/ITCPConnectionEventListener.java
+++ b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/tcp/ITCPConnectionEventListener.java
@@ -21,4 +21,6 @@
 public interface ITCPConnectionEventListener {
     public void notifyIOReady(TCPConnection connection, boolean readable, boolean writable) throws IOException,
             NetException;
+
+    public void notifyIOError(Exception e);
 }
\ No newline at end of file
diff --git a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/tcp/TCPEndpoint.java b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/tcp/TCPEndpoint.java
index 1a73bdc..7632391 100644
--- a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/tcp/TCPEndpoint.java
+++ b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/tcp/TCPEndpoint.java
@@ -153,7 +153,13 @@
 
                             if (readable || writable) {
                                 TCPConnection connection = (TCPConnection) key.attachment();
-                                connection.getEventListener().notifyIOReady(connection, readable, writable);
+                                try {
+                                    connection.getEventListener().notifyIOReady(connection, readable, writable);
+                                } catch (Exception e) {
+                                    connection.getEventListener().notifyIOError(e);
+                                    connection.close();
+                                    continue;
+                                }
                             }
                             if (key.isAcceptable()) {
                                 assert sc == serverSocketChannel;