Merge branch 'master' into yingyi/fullstack_fix
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/IndexInsertDeletePOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/IndexInsertDeletePOperator.java
index 2f9417b..879f2a9 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/IndexInsertDeletePOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/IndexInsertDeletePOperator.java
@@ -83,6 +83,7 @@
         scanVariables.add(new LogicalVariable(-1));
         IPhysicalPropertiesVector r = dataSourceIndex.getDataSource().getPropertiesProvider()
                 .computePropertiesVector(scanVariables);
+        r.getLocalProperties().clear();
         IPhysicalPropertiesVector[] requirements = new IPhysicalPropertiesVector[1];
         requirements[0] = r;
         return new PhysicalRequirements(requirements, IPartitioningRequirementsCoordinator.NO_COORDINATION);
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/InsertDeletePOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/InsertDeletePOperator.java
index d8b7c33..c85bfb9 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/InsertDeletePOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/InsertDeletePOperator.java
@@ -73,6 +73,7 @@
         scanVariables.addAll(keys);
         scanVariables.add(new LogicalVariable(-1));
         IPhysicalPropertiesVector r = dataSource.getPropertiesProvider().computePropertiesVector(scanVariables);
+        r.getLocalProperties().clear();
         IPhysicalPropertiesVector[] requirements = new IPhysicalPropertiesVector[1];
         requirements[0] = r;
         return new PhysicalRequirements(requirements, IPartitioningRequirementsCoordinator.NO_COORDINATION);
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
index 9c5de9c..49ec269 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
@@ -349,7 +349,7 @@
     @Override
     public String visitExtensionOperator(ExtensionOperator op, Integer indent) throws AlgebricksException {
         StringBuilder buffer = new StringBuilder();
-        addIndent(buffer, indent).append("statistics collection");
+        addIndent(buffer, indent).append(op.toString());
         return buffer.toString();
     }
 
diff --git a/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDatasetReader.java b/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDatasetReader.java
index 03432a8..51e4950 100644
--- a/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDatasetReader.java
+++ b/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDatasetReader.java
@@ -104,8 +104,8 @@
                             getSocketAddress(knownRecords[lastReadPartition]), jobId, resultSetId, lastReadPartition,
                             NUM_READ_BUFFERS);
                     lastMonitor = getMonitor(lastReadPartition);
-                    resultChannel.open(datasetClientCtx);
                     resultChannel.registerMonitor(lastMonitor);
+                    resultChannel.open(datasetClientCtx);
                 } catch (Exception e) {
                     throw new HyracksDataException(e);
                 }
@@ -142,8 +142,8 @@
                                 getSocketAddress(knownRecords[lastReadPartition]), jobId, resultSetId,
                                 lastReadPartition, NUM_READ_BUFFERS);
                         lastMonitor = getMonitor(lastReadPartition);
-                        resultChannel.open(datasetClientCtx);
                         resultChannel.registerMonitor(lastMonitor);
+                        resultChannel.open(datasetClientCtx);
                     } catch (Exception e) {
                         throw new HyracksDataException(e);
                     }
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/MaterializingOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/MaterializingOperatorDescriptor.java
index ac5a627..89c20d6 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/MaterializingOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/MaterializingOperatorDescriptor.java
@@ -19,6 +19,7 @@
 import java.io.IOException;
 import java.nio.ByteBuffer;
 
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.dataflow.ActivityId;
 import edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder;
@@ -36,6 +37,7 @@
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractStateObject;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
 
 public class MaterializingOperatorDescriptor extends AbstractOperatorDescriptor {
@@ -43,24 +45,43 @@
 
     private final static int MATERIALIZER_ACTIVITY_ID = 0;
     private final static int READER_ACTIVITY_ID = 1;
+    private final static int MATERIALIZER_READER_ACTIVITY_ID = 2;
+
+    private final boolean isSingleActivity;
 
     public MaterializingOperatorDescriptor(IOperatorDescriptorRegistry spec, RecordDescriptor recordDescriptor) {
+        this(spec, recordDescriptor, false);
+    }
+
+    public MaterializingOperatorDescriptor(IOperatorDescriptorRegistry spec, RecordDescriptor recordDescriptor,
+            boolean isSingleActivity) {
         super(spec, 1, 1);
         recordDescriptors[0] = recordDescriptor;
+        this.isSingleActivity = isSingleActivity;
     }
 
     @Override
     public void contributeActivities(IActivityGraphBuilder builder) {
-        MaterializerActivityNode ma = new MaterializerActivityNode(new ActivityId(odId, MATERIALIZER_ACTIVITY_ID));
-        ReaderActivityNode ra = new ReaderActivityNode(new ActivityId(odId, READER_ACTIVITY_ID));
+        if (isSingleActivity) {
+            MaterializerReaderActivityNode mra = new MaterializerReaderActivityNode(new ActivityId(odId,
+                    MATERIALIZER_READER_ACTIVITY_ID));
 
-        builder.addActivity(this, ma);
-        builder.addSourceEdge(0, ma, 0);
+            builder.addActivity(this, mra);
+            builder.addSourceEdge(0, mra, 0);
+            builder.addTargetEdge(0, mra, 0);
+        } else {
+            MaterializerActivityNode ma = new MaterializerActivityNode(new ActivityId(odId, MATERIALIZER_ACTIVITY_ID));
+            ReaderActivityNode ra = new ReaderActivityNode(new ActivityId(odId, READER_ACTIVITY_ID));
 
-        builder.addActivity(this, ra);
-        builder.addTargetEdge(0, ra, 0);
+            builder.addActivity(this, ma);
+            builder.addSourceEdge(0, ma, 0);
 
-        builder.addBlockingEdge(ma, ra);
+            builder.addActivity(this, ra);
+            builder.addTargetEdge(0, ra, 0);
+
+            builder.addBlockingEdge(ma, ra);
+        }
+
     }
 
     public static class MaterializerTaskState extends AbstractStateObject {
@@ -82,6 +103,118 @@
         public void fromBytes(DataInput in) throws IOException {
 
         }
+
+        /**
+         * Creates a managed workspace file for this task and opens a run file writer on it.
+         *
+         * @param ctx task context supplying the joblet workspace and the IO manager
+         * @throws HyracksDataException propagated from creating or opening the run file
+         */
+        public void open(IHyracksTaskContext ctx) throws HyracksDataException {
+            FileReference file = ctx.getJobletContext().createManagedWorkspaceFile(
+                    MaterializingOperatorDescriptor.class.getSimpleName());
+            out = new RunFileWriter(file, ctx.getIOManager());
+            out.open();
+        }
+
+        /**
+         * Appends one frame to the run file.
+         *
+         * @param buffer frame to materialize
+         * @throws HyracksDataException propagated from the run file writer
+         */
+        public void appendFrame(ByteBuffer buffer) throws HyracksDataException {
+            out.nextFrame(buffer);
+        }
+
+        /**
+         * Replays the materialized run file into {@code writer}, opening the writer first and
+         * always closing it afterwards.
+         *
+         * @param writer downstream writer that receives the frames
+         * @param frame scratch buffer each frame is read into before being forwarded
+         * @throws HyracksDataException if reading or forwarding fails; {@code writer.fail()} is
+         *             called before the exception is rethrown
+         */
+        public void writeOut(IFrameWriter writer, ByteBuffer frame) throws HyracksDataException {
+            RunFileReader in = out.createReader();
+            writer.open();
+            try {
+                in.open();
+                try {
+                    while (in.nextFrame(frame)) {
+                        frame.flip();
+                        writer.nextFrame(frame);
+                        frame.clear();
+                    }
+                } finally {
+                    // Release the reader even when forwarding a frame fails.
+                    in.close();
+                }
+            } catch (Exception e) {
+                writer.fail();
+                throw new HyracksDataException(e);
+            } finally {
+                writer.close();
+            }
+        }
+    }
+
+    /**
+     * Single activity that materializes its whole input and then replays it to its output from
+     * {@code close()}.
+     */
+    private final class MaterializerReaderActivityNode extends AbstractActivityNode {
+        private static final long serialVersionUID = 1L;
+
+        public MaterializerReaderActivityNode(ActivityId id) {
+            super(id);
+        }
+
+        @Override
+        public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
+                IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions) {
+            return new AbstractUnaryInputUnaryOutputOperatorNodePushable() {
+                private MaterializerTaskState state;
+                // Set by fail(); close() then signals failure downstream instead of replaying the run file.
+                private boolean failed = false;
+
+                @Override
+                public void open() throws HyracksDataException {
+                    state = new MaterializerTaskState(ctx.getJobletContext().getJobId(), new TaskId(getActivityId(),
+                            partition));
+                    state.open(ctx);
+                }
+
+                @Override
+                public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+                    state.appendFrame(buffer);
+                }
+
+                @Override
+                public void fail() throws HyracksDataException {
+                    failed = true;
+                }
+
+                @Override
+                public void close() throws HyracksDataException {
+                    state.out.close();
+                    if (failed) {
+                        // Upstream failed: signal the consumer rather than replaying a partial run file.
+                        writer.open();
+                        try {
+                            writer.fail();
+                        } finally {
+                            writer.close();
+                        }
+                        return;
+                    }
+                    ByteBuffer frame = ctx.allocateFrame();
+                    state.writeOut(writer, frame);
+                }
+
+            };
+        }
     }
 
     private final class MaterializerActivityNode extends AbstractActivityNode {
@@ -101,15 +234,12 @@
                 public void open() throws HyracksDataException {
                     state = new MaterializerTaskState(ctx.getJobletContext().getJobId(), new TaskId(getActivityId(),
                             partition));
-                    FileReference file = ctx.getJobletContext().createManagedWorkspaceFile(
-                            MaterializingOperatorDescriptor.class.getSimpleName());
-                    state.out = new RunFileWriter(file, ctx.getIOManager());
-                    state.out.open();
+                    state.open(ctx);
                 }
 
                 @Override
                 public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
-                    state.out.nextFrame(buffer);
+                    state.appendFrame(buffer);
                 }
 
                 @Override
@@ -141,22 +271,7 @@
                     ByteBuffer frame = ctx.allocateFrame();
                     MaterializerTaskState state = (MaterializerTaskState) ctx.getStateObject(new TaskId(new ActivityId(
                             getOperatorId(), MATERIALIZER_ACTIVITY_ID), partition));
-                    RunFileReader in = state.out.createReader();
-                    writer.open();
-                    try {
-                        in.open();
-                        while (in.nextFrame(frame)) {
-                            frame.flip();
-                            writer.nextFrame(frame);
-                            frame.clear();
-                        }
-                        in.close();
-                    } catch (Exception e) {
-                        writer.fail();
-                        throw new HyracksDataException(e);
-                    } finally {
-                        writer.close();
-                    }
+                    state.writeOut(writer, frame);
                 }
 
                 @Override
diff --git a/hyracks/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCConnectionManager.java b/hyracks/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCConnectionManager.java
index c52dbd8..81294c2 100644
--- a/hyracks/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCConnectionManager.java
+++ b/hyracks/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCConnectionManager.java
@@ -17,6 +17,7 @@
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.ServerSocket;
+import java.net.StandardSocketOptions;
 import java.nio.ByteBuffer;
 import java.nio.channels.ClosedChannelException;
 import java.nio.channels.SelectableChannel;
@@ -175,6 +176,7 @@
                     if (!workingPendingConnections.isEmpty()) {
                         for (IPCHandle handle : workingPendingConnections) {
                             SocketChannel channel = SocketChannel.open();
+                            channel.setOption(StandardSocketOptions.TCP_NODELAY, true);
                             channel.configureBlocking(false);
                             SelectionKey cKey = null;
                             if (channel.connect(handle.getRemoteAddress())) {
@@ -267,6 +269,7 @@
                             } else if (key.isAcceptable()) {
                                 assert sc == serverSocketChannel;
                                 SocketChannel channel = serverSocketChannel.accept();
+                                channel.setOption(StandardSocketOptions.TCP_NODELAY, true);
                                 channel.configureBlocking(false);
                                 IPCHandle handle = new IPCHandle(system, null);
                                 SelectionKey cKey = channel.register(selector, SelectionKey.OP_READ);
diff --git a/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/tcp/TCPEndpoint.java b/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/tcp/TCPEndpoint.java
index 932d71a..067898f 100644
--- a/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/tcp/TCPEndpoint.java
+++ b/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/tcp/TCPEndpoint.java
@@ -17,6 +17,7 @@
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.ServerSocket;
+import java.net.StandardSocketOptions;
 import java.nio.channels.SelectableChannel;
 import java.nio.channels.SelectionKey;
 import java.nio.channels.Selector;
@@ -118,6 +119,7 @@
                     if (!workingPendingConnections.isEmpty()) {
                         for (InetSocketAddress address : workingPendingConnections) {
                             SocketChannel channel = SocketChannel.open();
+                            channel.setOption(StandardSocketOptions.TCP_NODELAY, true);
                             channel.configureBlocking(false);
                             boolean connect = false;
                             boolean failure = false;
@@ -143,6 +145,7 @@
                     }
                     if (!workingIncomingConnections.isEmpty()) {
                         for (SocketChannel channel : workingIncomingConnections) {
+                            channel.setOption(StandardSocketOptions.TCP_NODELAY, true);
                             channel.configureBlocking(false);
                             SelectionKey sKey = channel.register(selector, 0);
                             TCPConnection connection = new TCPConnection(TCPEndpoint.this, channel, sKey, selector);