Merge branch 'master' into yingyi/fullstack_fix
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/IndexInsertDeletePOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/IndexInsertDeletePOperator.java
index 2f9417b..879f2a9 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/IndexInsertDeletePOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/IndexInsertDeletePOperator.java
@@ -83,6 +83,7 @@
scanVariables.add(new LogicalVariable(-1));
IPhysicalPropertiesVector r = dataSourceIndex.getDataSource().getPropertiesProvider()
.computePropertiesVector(scanVariables);
+ r.getLocalProperties().clear();
IPhysicalPropertiesVector[] requirements = new IPhysicalPropertiesVector[1];
requirements[0] = r;
return new PhysicalRequirements(requirements, IPartitioningRequirementsCoordinator.NO_COORDINATION);
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/InsertDeletePOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/InsertDeletePOperator.java
index d8b7c33..c85bfb9 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/InsertDeletePOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/InsertDeletePOperator.java
@@ -73,6 +73,7 @@
scanVariables.addAll(keys);
scanVariables.add(new LogicalVariable(-1));
IPhysicalPropertiesVector r = dataSource.getPropertiesProvider().computePropertiesVector(scanVariables);
+ r.getLocalProperties().clear();
IPhysicalPropertiesVector[] requirements = new IPhysicalPropertiesVector[1];
requirements[0] = r;
return new PhysicalRequirements(requirements, IPartitioningRequirementsCoordinator.NO_COORDINATION);
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
index 9c5de9c..49ec269 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
@@ -349,7 +349,7 @@
@Override
public String visitExtensionOperator(ExtensionOperator op, Integer indent) throws AlgebricksException {
StringBuilder buffer = new StringBuilder();
- addIndent(buffer, indent).append("statistics collection");
+ addIndent(buffer, indent).append(op.toString()); // print the operator's own toString() instead of one fixed label for every extension operator
return buffer.toString();
}
diff --git a/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDatasetReader.java b/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDatasetReader.java
index 03432a8..51e4950 100644
--- a/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDatasetReader.java
+++ b/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDatasetReader.java
@@ -104,8 +104,8 @@
getSocketAddress(knownRecords[lastReadPartition]), jobId, resultSetId, lastReadPartition,
NUM_READ_BUFFERS);
lastMonitor = getMonitor(lastReadPartition);
- resultChannel.open(datasetClientCtx);
resultChannel.registerMonitor(lastMonitor);
+ resultChannel.open(datasetClientCtx);
} catch (Exception e) {
throw new HyracksDataException(e);
}
@@ -142,8 +142,8 @@
getSocketAddress(knownRecords[lastReadPartition]), jobId, resultSetId,
lastReadPartition, NUM_READ_BUFFERS);
lastMonitor = getMonitor(lastReadPartition);
- resultChannel.open(datasetClientCtx);
resultChannel.registerMonitor(lastMonitor);
+ resultChannel.open(datasetClientCtx);
} catch (Exception e) {
throw new HyracksDataException(e);
}
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/MaterializingOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/MaterializingOperatorDescriptor.java
index ac5a627..89c20d6 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/MaterializingOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/MaterializingOperatorDescriptor.java
@@ -19,6 +19,7 @@
import java.io.IOException;
import java.nio.ByteBuffer;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataflow.ActivityId;
import edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder;
@@ -36,6 +37,7 @@
import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractStateObject;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
public class MaterializingOperatorDescriptor extends AbstractOperatorDescriptor {
@@ -43,24 +45,43 @@
private final static int MATERIALIZER_ACTIVITY_ID = 0;
private final static int READER_ACTIVITY_ID = 1;
+ private final static int MATERIALIZER_READER_ACTIVITY_ID = 2;
+
+ private boolean isSingleActivity;
public MaterializingOperatorDescriptor(IOperatorDescriptorRegistry spec, RecordDescriptor recordDescriptor) {
+        this(spec, recordDescriptor, false); // delegate with isSingleActivity=false: contributeActivities then builds the separate materializer and reader activities
+    }
+
+    public MaterializingOperatorDescriptor(IOperatorDescriptorRegistry spec, RecordDescriptor recordDescriptor,
+            boolean isSingleActivity) { // true: contributeActivities builds one combined materializer-reader activity
super(spec, 1, 1);
recordDescriptors[0] = recordDescriptor;
+        this.isSingleActivity = isSingleActivity; // only read by contributeActivities
}
@Override
public void contributeActivities(IActivityGraphBuilder builder) {
-        MaterializerActivityNode ma = new MaterializerActivityNode(new ActivityId(odId, MATERIALIZER_ACTIVITY_ID));
-        ReaderActivityNode ra = new ReaderActivityNode(new ActivityId(odId, READER_ACTIVITY_ID));
+        if (isSingleActivity) { // single-activity mode: one node is registered for both the source edge and the target edge
+            MaterializerReaderActivityNode mra = new MaterializerReaderActivityNode(new ActivityId(odId,
+                    MATERIALIZER_READER_ACTIVITY_ID));
-        builder.addActivity(this, ma);
-        builder.addSourceEdge(0, ma, 0);
+            builder.addActivity(this, mra);
+            builder.addSourceEdge(0, mra, 0);
+            builder.addTargetEdge(0, mra, 0); // no blocking edge is added in this branch
+        } else { // two-activity mode: same wiring as before this change, now nested in the else branch
+            MaterializerActivityNode ma = new MaterializerActivityNode(new ActivityId(odId, MATERIALIZER_ACTIVITY_ID));
+            ReaderActivityNode ra = new ReaderActivityNode(new ActivityId(odId, READER_ACTIVITY_ID));
-        builder.addActivity(this, ra);
-        builder.addTargetEdge(0, ra, 0);
+            builder.addActivity(this, ma);
+            builder.addSourceEdge(0, ma, 0); // the materializer gets the source edge
-        builder.addBlockingEdge(ma, ra);
+            builder.addActivity(this, ra);
+            builder.addTargetEdge(0, ra, 0); // the reader gets the target edge
+
+            builder.addBlockingEdge(ma, ra); // NOTE(review): presumably makes ra wait until ma has finished - confirm against IActivityGraphBuilder
+        }
+
}
public static class MaterializerTaskState extends AbstractStateObject {
@@ -82,6 +103,76 @@
public void fromBytes(DataInput in) throws IOException {
}
+
+        public void open(IHyracksTaskContext ctx) throws HyracksDataException { // Creates the backing workspace file and opens the run-file writer `out` on it.
+            FileReference file = ctx.getJobletContext().createManagedWorkspaceFile( // file is requested from the joblet context
+                    MaterializingOperatorDescriptor.class.getSimpleName()); // the descriptor's simple class name is the only argument passed
+            out = new RunFileWriter(file, ctx.getIOManager());
+            out.open();
+        }
+
+        public void appendFrame(ByteBuffer buffer) throws HyracksDataException { // Passes one frame to the run-file writer; `out` must already be set (open(ctx) does this).
+            out.nextFrame(buffer);
+        }
+
+        public void writeOut(IFrameWriter writer, ByteBuffer frame) throws HyracksDataException { // Reads the run file back frame by frame and pushes each frame to writer, reusing `frame` as the buffer.
+            RunFileReader in = out.createReader();
+            writer.open(); // outside the try: if this throws, neither writer.fail() nor writer.close() is called here
+            try {
+                in.open();
+                while (in.nextFrame(frame)) { // loop ends when the reader returns false
+                    frame.flip(); // NOTE(review): presumably the reader leaves the buffer in write mode - confirm against RunFileReader
+                    writer.nextFrame(frame);
+                    frame.clear(); // reset the shared buffer before the next read
+                }
+                in.close(); // NOTE(review): only reached on success; the reader is not closed when the loop or in.open() throws
+            } catch (Exception e) { // any failure: signal the writer, then rethrow wrapped in HyracksDataException
+                writer.fail();
+                throw new HyracksDataException(e);
+            } finally {
+                writer.close(); // runs on both the success and the failure path
+            }
+        }
+ }
+
+    private final class MaterializerReaderActivityNode extends AbstractActivityNode { // Single activity that writes all input frames to a run file and replays them to its output on close().
+        private static final long serialVersionUID = 1L;
+
+        public MaterializerReaderActivityNode(ActivityId id) {
+            super(id);
+        }
+
+        @Override
+        public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
+                IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions) { // recordDescProvider and nPartitions are not used in this method
+            return new AbstractUnaryInputUnaryOutputOperatorNodePushable() {
+                private MaterializerTaskState state; // held only in this field; this class never stores it on ctx
+
+                @Override
+                public void open() throws HyracksDataException { // creates the task state and opens its run file; `writer` is not opened here
+                    state = new MaterializerTaskState(ctx.getJobletContext().getJobId(), new TaskId(getActivityId(),
+                            partition));
+                    state.open(ctx);
+                }
+
+                @Override
+                public void nextFrame(ByteBuffer buffer) throws HyracksDataException { // frames are only appended to the run file; nothing goes to `writer` yet
+                    state.appendFrame(buffer);
+                }
+
+                @Override
+                public void fail() throws HyracksDataException { // NOTE(review): no-op, so a failure is not recorded; if close() is still called afterwards it replays whatever was written - confirm this is intended
+                }
+
+                @Override
+                public void close() throws HyracksDataException { // finishes the run file, then replays it to `writer`
+                    state.out.close(); // close the run-file writer before a reader is created from it
+                    ByteBuffer frame = ctx.allocateFrame();
+                    state.writeOut(writer, frame); // writeOut opens `writer`, pushes every frame, and closes it
+                }
+
+            };
+        }
}
private final class MaterializerActivityNode extends AbstractActivityNode {
@@ -101,15 +192,12 @@
public void open() throws HyracksDataException {
state = new MaterializerTaskState(ctx.getJobletContext().getJobId(), new TaskId(getActivityId(),
partition));
- FileReference file = ctx.getJobletContext().createManagedWorkspaceFile(
- MaterializingOperatorDescriptor.class.getSimpleName());
- state.out = new RunFileWriter(file, ctx.getIOManager());
- state.out.open();
+ state.open(ctx);
}
@Override
public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
- state.out.nextFrame(buffer);
+ state.appendFrame(buffer);
}
@Override
@@ -141,22 +229,7 @@
ByteBuffer frame = ctx.allocateFrame();
MaterializerTaskState state = (MaterializerTaskState) ctx.getStateObject(new TaskId(new ActivityId(
getOperatorId(), MATERIALIZER_ACTIVITY_ID), partition));
- RunFileReader in = state.out.createReader();
- writer.open();
- try {
- in.open();
- while (in.nextFrame(frame)) {
- frame.flip();
- writer.nextFrame(frame);
- frame.clear();
- }
- in.close();
- } catch (Exception e) {
- writer.fail();
- throw new HyracksDataException(e);
- } finally {
- writer.close();
- }
+ state.writeOut(writer, frame);
}
@Override
diff --git a/hyracks/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCConnectionManager.java b/hyracks/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCConnectionManager.java
index c52dbd8..81294c2 100644
--- a/hyracks/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCConnectionManager.java
+++ b/hyracks/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCConnectionManager.java
@@ -17,6 +17,7 @@
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
+import java.net.StandardSocketOptions;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectableChannel;
@@ -175,6 +176,7 @@
if (!workingPendingConnections.isEmpty()) {
for (IPCHandle handle : workingPendingConnections) {
SocketChannel channel = SocketChannel.open();
+ channel.setOption(StandardSocketOptions.TCP_NODELAY, true);
channel.configureBlocking(false);
SelectionKey cKey = null;
if (channel.connect(handle.getRemoteAddress())) {
@@ -267,6 +269,7 @@
} else if (key.isAcceptable()) {
assert sc == serverSocketChannel;
SocketChannel channel = serverSocketChannel.accept();
+ channel.setOption(StandardSocketOptions.TCP_NODELAY, true);
channel.configureBlocking(false);
IPCHandle handle = new IPCHandle(system, null);
SelectionKey cKey = channel.register(selector, SelectionKey.OP_READ);
diff --git a/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/tcp/TCPEndpoint.java b/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/tcp/TCPEndpoint.java
index 932d71a..067898f 100644
--- a/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/tcp/TCPEndpoint.java
+++ b/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/tcp/TCPEndpoint.java
@@ -17,6 +17,7 @@
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
+import java.net.StandardSocketOptions;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
@@ -118,6 +119,7 @@
if (!workingPendingConnections.isEmpty()) {
for (InetSocketAddress address : workingPendingConnections) {
SocketChannel channel = SocketChannel.open();
+ channel.setOption(StandardSocketOptions.TCP_NODELAY, true);
channel.configureBlocking(false);
boolean connect = false;
boolean failure = false;
@@ -143,6 +145,7 @@
}
if (!workingIncomingConnections.isEmpty()) {
for (SocketChannel channel : workingIncomingConnections) {
+ channel.setOption(StandardSocketOptions.TCP_NODELAY, true);
channel.configureBlocking(false);
SelectionKey sKey = channel.register(selector, 0);
TCPConnection connection = new TCPConnection(TCPEndpoint.this, channel, sKey, selector);