merged r1380:1386 from hyracks_asterix_stabilization
git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_lsm_tree@1394 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/CCNCFunctions.java b/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/CCNCFunctions.java
index 9ea4636..f45f449 100644
--- a/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/CCNCFunctions.java
+++ b/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/CCNCFunctions.java
@@ -24,6 +24,8 @@
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
+import java.util.logging.Level;
+import java.util.logging.Logger;
import edu.uci.ics.hyracks.api.comm.NetworkAddress;
import edu.uci.ics.hyracks.api.dataflow.ActivityId;
@@ -49,6 +51,8 @@
import edu.uci.ics.hyracks.ipc.impl.JavaSerializationBasedPayloadSerializerDeserializer;
public class CCNCFunctions {
+ private static final Logger LOGGER = Logger.getLogger(CCNCFunctions.class.getName());
+
private static final int FID_CODE_SIZE = 1;
public enum FunctionId {
@@ -690,8 +694,12 @@
private byte[] serialize(Object object, byte fid) throws Exception {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
baos.write(fid);
- serialize(baos, object, fid);
- JavaSerializationBasedPayloadSerializerDeserializer.serialize(baos, object);
+ try {
+ serialize(baos, object, fid);
+ } catch (Exception e) {
+ LOGGER.log(Level.SEVERE, "Error serializing " + object, e);
+ throw e;
+ }
baos.close();
return baos.toByteArray();
}
diff --git a/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkInputChannel.java b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkInputChannel.java
index 609d7f0..257e2a8 100644
--- a/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkInputChannel.java
+++ b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkInputChannel.java
@@ -25,7 +25,6 @@
import edu.uci.ics.hyracks.api.channels.IInputChannelMonitor;
import edu.uci.ics.hyracks.api.context.IHyracksRootContext;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.api.exceptions.HyracksException;
import edu.uci.ics.hyracks.api.partitions.PartitionId;
import edu.uci.ics.hyracks.net.buffers.IBufferAcceptor;
import edu.uci.ics.hyracks.net.buffers.ICloseableBufferAcceptor;
diff --git a/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/partition/FieldHashPartitionComputerFactory.java b/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/partition/FieldHashPartitionComputerFactory.java
index 7769539..f6d4d51 100644
--- a/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/partition/FieldHashPartitionComputerFactory.java
+++ b/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/partition/FieldHashPartitionComputerFactory.java
@@ -55,7 +55,7 @@
h = h * 31 + fh;
}
if (h < 0) {
- h = -h;
+ h = -(h + 1);
}
return h % nParts;
}
diff --git a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/ChannelSet.java b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/ChannelSet.java
index ffdb5c6..4bea8a4 100644
--- a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/ChannelSet.java
+++ b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/ChannelSet.java
@@ -183,9 +183,23 @@
void markEOSAck(int channelId) {
synchronized (mConn) {
- assert !pendingEOSAckBitmap.get(channelId);
- pendingEOSAckBitmap.set(channelId);
- pendingWriteEventsCounter.increment();
+ if (!pendingEOSAckBitmap.get(channelId)) {
+ pendingEOSAckBitmap.set(channelId);
+ pendingWriteEventsCounter.increment();
+ }
+ }
+ }
+
+ void notifyIOError() {
+ synchronized (mConn) {
+ for (int i = 0; i < ccbArray.length; ++i) {
+ ChannelControlBlock ccb = ccbArray[i];
+ if (ccb != null && !ccb.getRemoteEOS()) {
+ ccb.reportRemoteError(-1);
+ markEOSAck(i);
+ unmarkPendingCredits(i);
+ }
+ }
}
}
@@ -197,7 +211,7 @@
throw new NetException("More than " + MAX_OPEN_CHANNELS + " opened concurrently");
}
if (ccbArray[idx] != null) {
- assert ccbArray[idx].completelyClosed();
+ assert ccbArray[idx].completelyClosed() : ccbArray[idx].toString();
if (ccbArray[idx].completelyClosed()) {
if (LOGGER.isLoggable(Level.FINE)) {
LOGGER.fine("Cleaning free channel: " + ccbArray[idx]);
diff --git a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MultiplexedConnection.java b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MultiplexedConnection.java
index c905f57..e717c45 100644
--- a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MultiplexedConnection.java
+++ b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MultiplexedConnection.java
@@ -49,6 +49,8 @@
private boolean connectionFailure;
+ private Exception error;
+
public MultiplexedConnection(MuxDemux muxDemux) {
this.muxDemux = muxDemux;
pendingWriteEventsCounter = new IEventCounter() {
@@ -119,7 +121,19 @@
}
}
+ @Override
+ public synchronized void notifyIOError(Exception e) {
+ connectionFailure = true;
+ error = e;
+ cSet.notifyIOError();
+ }
+
public ChannelControlBlock openChannel() throws NetException, InterruptedException {
+ synchronized (this) {
+ if (connectionFailure) {
+ throw new NetException(error);
+ }
+ }
ChannelControlBlock channel = cSet.allocateChannel();
int channelId = channel.getChannelId();
cSet.initiateChannelSyn(channelId);
diff --git a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/tcp/ITCPConnectionEventListener.java b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/tcp/ITCPConnectionEventListener.java
index 607bf31..79c21ee 100644
--- a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/tcp/ITCPConnectionEventListener.java
+++ b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/tcp/ITCPConnectionEventListener.java
@@ -21,4 +21,6 @@
public interface ITCPConnectionEventListener {
public void notifyIOReady(TCPConnection connection, boolean readable, boolean writable) throws IOException,
NetException;
+
+ public void notifyIOError(Exception e);
}
\ No newline at end of file
diff --git a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/tcp/TCPEndpoint.java b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/tcp/TCPEndpoint.java
index 1a73bdc..7632391 100644
--- a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/tcp/TCPEndpoint.java
+++ b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/tcp/TCPEndpoint.java
@@ -153,7 +153,13 @@
if (readable || writable) {
TCPConnection connection = (TCPConnection) key.attachment();
- connection.getEventListener().notifyIOReady(connection, readable, writable);
+ try {
+ connection.getEventListener().notifyIOReady(connection, readable, writable);
+ } catch (Exception e) {
+ connection.getEventListener().notifyIOError(e);
+ connection.close();
+ continue;
+ }
}
if (key.isAcceptable()) {
assert sc == serverSocketChannel;