Refactored Operator and Connector touchpoints
git-svn-id: https://hyracks.googlecode.com/svn/trunk/hyracks@31 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/context/IHyracksContext.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/context/IHyracksContext.java
new file mode 100644
index 0000000..8173785
--- /dev/null
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/context/IHyracksContext.java
@@ -0,0 +1,23 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.context;
+
+import edu.uci.ics.hyracks.api.resources.IResourceManager;
+
+public interface IHyracksContext {
+ public IResourceManager getResourceManager();
+
+ public int getFrameSize();
+}
\ No newline at end of file
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/dataflow/IActivityNode.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/dataflow/IActivityNode.java
index 6a3a447..82d0e9e 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/dataflow/IActivityNode.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/dataflow/IActivityNode.java
@@ -16,22 +16,15 @@
import java.io.Serializable;
+import edu.uci.ics.hyracks.api.context.IHyracksContext;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
-import edu.uci.ics.hyracks.api.job.JobPlan;
-import edu.uci.ics.hyracks.context.HyracksContext;
public interface IActivityNode extends Serializable {
public ActivityNodeId getActivityNodeId();
public IOperatorDescriptor getOwner();
- public boolean supportsPushInterface();
-
- public boolean supportsPullInterface();
-
- public IOperatorNodePushable createPushRuntime(HyracksContext ctx, JobPlan plan, IOperatorEnvironment env,
- int partition, int nPartitions);
-
- public IOperatorNodePullable createPullRuntime(HyracksContext ctx, JobPlan plan, IOperatorEnvironment env,
- int partition, int nPartitions);
+ public IOperatorNodePushable createPushRuntime(IHyracksContext ctx, IOperatorEnvironment env,
+ IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions);
}
\ No newline at end of file
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/dataflow/IConnectorDescriptor.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/dataflow/IConnectorDescriptor.java
index b3feef8..25be8b3 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/dataflow/IConnectorDescriptor.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/dataflow/IConnectorDescriptor.java
@@ -22,9 +22,9 @@
import edu.uci.ics.hyracks.api.comm.IConnectionDemultiplexer;
import edu.uci.ics.hyracks.api.comm.IFrameReader;
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksContext;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.api.job.JobPlan;
-import edu.uci.ics.hyracks.context.HyracksContext;
/**
* Connector that connects operators in a Job.
@@ -44,8 +44,8 @@
*
* @param ctx
* Context
- * @param plan
- * Job plan
+ * @param recordDesc
+ * Record Descriptor
* @param edwFactory
* Endpoint writer factory.
* @param index
@@ -57,15 +57,16 @@
* @return data writer.
* @throws Exception
*/
- public IFrameWriter createSendSideWriter(HyracksContext ctx, JobPlan plan, IEndpointDataWriterFactory edwFactory,
- int index, int nProducerPartitions, int nConsumerPartitions) throws HyracksDataException;
+ public IFrameWriter createSendSideWriter(IHyracksContext ctx, RecordDescriptor recordDesc,
+ IEndpointDataWriterFactory edwFactory, int index, int nProducerPartitions, int nConsumerPartitions)
+ throws HyracksDataException;
/**
* Factory metod to create the receive side reader that reads data from this connector.
*
* @param ctx
* Context
- * @param plan
+ * @param recordDesc
* Job plan
* @param demux
* Connection Demultiplexer
@@ -78,8 +79,9 @@
* @return data reader
* @throws HyracksDataException
*/
- public IFrameReader createReceiveSideReader(HyracksContext ctx, JobPlan plan, IConnectionDemultiplexer demux,
- int index, int nProducerPartitions, int nConsumerPartitions) throws HyracksDataException;
+ public IFrameReader createReceiveSideReader(IHyracksContext ctx, RecordDescriptor recordDesc,
+ IConnectionDemultiplexer demux, int index, int nProducerPartitions, int nConsumerPartitions)
+ throws HyracksDataException;
/**
* Translate this connector descriptor to JSON.
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/dataflow/IOperatorNodePushable.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/dataflow/IOperatorNodePushable.java
index 13ac15c..f019b13 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/dataflow/IOperatorNodePushable.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/dataflow/IOperatorNodePushable.java
@@ -15,7 +15,8 @@
package edu.uci.ics.hyracks.api.dataflow;
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
public interface IOperatorNodePushable extends IFrameWriter {
- public void setFrameWriter(int index, IFrameWriter writer);
+ public void setFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc);
}
\ No newline at end of file
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/IRecordDescriptorProvider.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/IRecordDescriptorProvider.java
new file mode 100644
index 0000000..96d43a7
--- /dev/null
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/IRecordDescriptorProvider.java
@@ -0,0 +1,23 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.dataflow.value;
+
+import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
+
+public interface IRecordDescriptorProvider {
+ public RecordDescriptor getInputRecordDescriptor(OperatorDescriptorId opId, int inputIndex);
+
+ public RecordDescriptor getOutputRecordDescriptor(OperatorDescriptorId opId, int outputIndex);
+}
\ No newline at end of file
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/resources/IResourceManager.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/resources/IResourceManager.java
new file mode 100644
index 0000000..495db70
--- /dev/null
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/resources/IResourceManager.java
@@ -0,0 +1,27 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.resources;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+public interface IResourceManager {
+
+ public ByteBuffer allocateFrame();
+
+ public File createFile(String prefix, String suffix) throws IOException;
+
+}
\ No newline at end of file
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/ConnectionEntry.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/ConnectionEntry.java
index ea8ede3..bfc79e7 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/ConnectionEntry.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/ConnectionEntry.java
@@ -24,7 +24,7 @@
import edu.uci.ics.hyracks.api.comm.IConnectionEntry;
import edu.uci.ics.hyracks.api.comm.IDataReceiveListener;
-import edu.uci.ics.hyracks.context.HyracksContext;
+import edu.uci.ics.hyracks.api.context.IHyracksContext;
public class ConnectionEntry implements IConnectionEntry {
private static final Logger LOGGER = Logger.getLogger(ConnectionEntry.class.getName());
@@ -47,7 +47,7 @@
private boolean aborted;
- public ConnectionEntry(HyracksContext ctx, SocketChannel socketChannel, SelectionKey key) {
+ public ConnectionEntry(IHyracksContext ctx, SocketChannel socketChannel, SelectionKey key) {
this.socketChannel = socketChannel;
readBuffer = ctx.getResourceManager().allocateFrame();
readBuffer.clear();
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/ConnectionManager.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/ConnectionManager.java
index 55e2962..ce752f3 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/ConnectionManager.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/ConnectionManager.java
@@ -41,9 +41,9 @@
import edu.uci.ics.hyracks.api.comm.IDataReceiveListenerFactory;
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
import edu.uci.ics.hyracks.api.comm.NetworkAddress;
+import edu.uci.ics.hyracks.api.context.IHyracksContext;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.comm.io.FrameHelper;
-import edu.uci.ics.hyracks.context.HyracksContext;
public class ConnectionManager {
private static final Logger LOGGER = Logger.getLogger(ConnectionManager.class.getName());
@@ -54,7 +54,7 @@
private ServerSocketChannel serverSocketChannel;
- private final HyracksContext ctx;
+ private final IHyracksContext ctx;
private final Map<UUID, IDataReceiveListenerFactory> pendingConnectionReceivers;
@@ -70,7 +70,7 @@
private ByteBuffer emptyFrame;
- public ConnectionManager(HyracksContext ctx, InetAddress address) throws IOException {
+ public ConnectionManager(IHyracksContext ctx, InetAddress address) throws IOException {
this.ctx = ctx;
serverSocketChannel = ServerSocketChannel.open();
ServerSocket serverSocket = serverSocketChannel.socket();
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/DemuxDataReceiveListenerFactory.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/DemuxDataReceiveListenerFactory.java
index 694e61a..af7c6fc 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/DemuxDataReceiveListenerFactory.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/DemuxDataReceiveListenerFactory.java
@@ -28,22 +28,22 @@
import edu.uci.ics.hyracks.api.comm.IDataReceiveListener;
import edu.uci.ics.hyracks.api.comm.IDataReceiveListenerFactory;
import edu.uci.ics.hyracks.api.comm.IFrameReader;
+import edu.uci.ics.hyracks.api.context.IHyracksContext;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.context.HyracksContext;
public class DemuxDataReceiveListenerFactory implements IDataReceiveListenerFactory, IConnectionDemultiplexer,
IDataReceiveListener {
private static final Logger LOGGER = Logger.getLogger(DemuxDataReceiveListenerFactory.class.getName());
private final NonDeterministicFrameReader frameReader;
- private final HyracksContext ctx;
+ private final IHyracksContext ctx;
private final BitSet readyBits;
private IConnectionEntry senders[];
private int openSenderCount;
private UUID jobId;
private UUID stageId;
- public DemuxDataReceiveListenerFactory(HyracksContext ctx, UUID jobId, UUID stageId) {
+ public DemuxDataReceiveListenerFactory(IHyracksContext ctx, UUID jobId, UUID stageId) {
frameReader = new NonDeterministicFrameReader(ctx, this);
this.ctx = ctx;
this.jobId = jobId;
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/NonDeterministicFrameReader.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/NonDeterministicFrameReader.java
index 91350735..0ddc5d6 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/NonDeterministicFrameReader.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/NonDeterministicFrameReader.java
@@ -21,19 +21,19 @@
import edu.uci.ics.hyracks.api.comm.IConnectionDemultiplexer;
import edu.uci.ics.hyracks.api.comm.IConnectionEntry;
import edu.uci.ics.hyracks.api.comm.IFrameReader;
+import edu.uci.ics.hyracks.api.context.IHyracksContext;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.comm.io.FrameHelper;
-import edu.uci.ics.hyracks.context.HyracksContext;
public class NonDeterministicFrameReader implements IFrameReader {
private static final Logger LOGGER = Logger.getLogger(NonDeterministicFrameReader.class.getName());
- private final HyracksContext ctx;
+ private final IHyracksContext ctx;
private final IConnectionDemultiplexer demux;
private int lastReadSender;
private boolean eos;
- public NonDeterministicFrameReader(HyracksContext ctx, IConnectionDemultiplexer demux) {
+ public NonDeterministicFrameReader(IHyracksContext ctx, IConnectionDemultiplexer demux) {
this.ctx = ctx;
this.demux = demux;
}
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/SortMergeFrameReader.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/SortMergeFrameReader.java
index 62372dc..5c97397 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/SortMergeFrameReader.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/SortMergeFrameReader.java
@@ -27,6 +27,7 @@
import edu.uci.ics.hyracks.api.comm.IConnectionDemultiplexer;
import edu.uci.ics.hyracks.api.comm.IConnectionEntry;
import edu.uci.ics.hyracks.api.comm.IFrameReader;
+import edu.uci.ics.hyracks.api.context.IHyracksContext;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
@@ -34,12 +35,11 @@
import edu.uci.ics.hyracks.comm.io.FrameTupleAccessor;
import edu.uci.ics.hyracks.comm.io.FrameTupleAppender;
import edu.uci.ics.hyracks.comm.io.FrameTuplePairComparator;
-import edu.uci.ics.hyracks.context.HyracksContext;
public class SortMergeFrameReader implements IFrameReader {
private static final Logger LOGGER = Logger.getLogger(SortMergeFrameReader.class.getName());
- private final HyracksContext ctx;
+ private final IHyracksContext ctx;
private final IConnectionDemultiplexer demux;
private final FrameTuplePairComparator tpc;
private final FrameTupleAppender appender;
@@ -50,7 +50,7 @@
private int lastReadSender;
private boolean first;
- public SortMergeFrameReader(HyracksContext ctx, IConnectionDemultiplexer demux, int[] sortFields,
+ public SortMergeFrameReader(IHyracksContext ctx, IConnectionDemultiplexer demux, int[] sortFields,
IBinaryComparator[] comparators, RecordDescriptor recordDescriptor) {
this.ctx = ctx;
this.demux = demux;
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/io/FrameDeserializer.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/io/FrameDeserializer.java
index 22d4961..c6b365b 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/io/FrameDeserializer.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/io/FrameDeserializer.java
@@ -21,10 +21,10 @@
import java.util.logging.Logger;
import edu.uci.ics.hyracks.api.comm.FrameConstants;
+import edu.uci.ics.hyracks.api.context.IHyracksContext;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.comm.util.ByteBufferInputStream;
-import edu.uci.ics.hyracks.context.HyracksContext;
public class FrameDeserializer {
private static final Logger LOGGER = Logger.getLogger(FrameDeserializer.class.getName());
@@ -43,7 +43,7 @@
private ByteBuffer buffer;
- public FrameDeserializer(HyracksContext ctx, RecordDescriptor recordDescriptor) {
+ public FrameDeserializer(IHyracksContext ctx, RecordDescriptor recordDescriptor) {
this.bbis = new ByteBufferInputStream();
this.di = new DataInputStream(bbis);
this.recordDescriptor = recordDescriptor;
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/io/FrameDeserializingDataReader.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/io/FrameDeserializingDataReader.java
index 31d37b7..08f3472 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/io/FrameDeserializingDataReader.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/io/FrameDeserializingDataReader.java
@@ -17,10 +17,10 @@
import java.nio.ByteBuffer;
import edu.uci.ics.hyracks.api.comm.IFrameReader;
+import edu.uci.ics.hyracks.api.context.IHyracksContext;
import edu.uci.ics.hyracks.api.dataflow.IOpenableDataReader;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.context.HyracksContext;
public class FrameDeserializingDataReader implements IOpenableDataReader<Object[]> {
private final ByteBuffer buffer;
@@ -33,7 +33,7 @@
private final FrameDeserializer frameDeserializer;
- public FrameDeserializingDataReader(HyracksContext ctx, IFrameReader frameReader, RecordDescriptor recordDescriptor) {
+ public FrameDeserializingDataReader(IHyracksContext ctx, IFrameReader frameReader, RecordDescriptor recordDescriptor) {
buffer = ctx.getResourceManager().allocateFrame();
this.frameReader = frameReader;
this.frameDeserializer = new FrameDeserializer(ctx, recordDescriptor);
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/io/FrameDeserializingDataWriter.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/io/FrameDeserializingDataWriter.java
index e0a7250..413ded6 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/io/FrameDeserializingDataWriter.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/io/FrameDeserializingDataWriter.java
@@ -17,16 +17,16 @@
import java.nio.ByteBuffer;
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksContext;
import edu.uci.ics.hyracks.api.dataflow.IOpenableDataWriter;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.context.HyracksContext;
public class FrameDeserializingDataWriter implements IFrameWriter {
private final IOpenableDataWriter<Object[]> writer;
private final FrameDeserializer frameDeserializer;
- public FrameDeserializingDataWriter(HyracksContext ctx, IOpenableDataWriter<Object[]> writer,
+ public FrameDeserializingDataWriter(IHyracksContext ctx, IOpenableDataWriter<Object[]> writer,
RecordDescriptor recordDescriptor) {
this.writer = writer;
this.frameDeserializer = new FrameDeserializer(ctx, recordDescriptor);
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/io/FrameHelper.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/io/FrameHelper.java
index a82a051..52ecfce 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/io/FrameHelper.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/io/FrameHelper.java
@@ -14,10 +14,10 @@
*/
package edu.uci.ics.hyracks.comm.io;
-import edu.uci.ics.hyracks.context.HyracksContext;
+import edu.uci.ics.hyracks.api.context.IHyracksContext;
public class FrameHelper {
- public static int getTupleCountOffset(HyracksContext ctx) {
+ public static int getTupleCountOffset(IHyracksContext ctx) {
return ctx.getFrameSize() - 4;
}
}
\ No newline at end of file
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/io/FrameTupleAccessor.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/io/FrameTupleAccessor.java
index 05d83cd..06c5a17 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/io/FrameTupleAccessor.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/io/FrameTupleAccessor.java
@@ -17,10 +17,10 @@
import java.io.DataInputStream;
import java.nio.ByteBuffer;
+import edu.uci.ics.hyracks.api.context.IHyracksContext;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.comm.util.ByteBufferInputStream;
-import edu.uci.ics.hyracks.context.HyracksContext;
/**
* FrameTupleCursor is used to navigate over tuples in a Frame.
@@ -33,12 +33,12 @@
* @author vinayakb
*/
public final class FrameTupleAccessor {
- private final HyracksContext ctx;
+ private final IHyracksContext ctx;
private final RecordDescriptor recordDescriptor;
private ByteBuffer buffer;
- public FrameTupleAccessor(HyracksContext ctx, RecordDescriptor recordDescriptor) {
+ public FrameTupleAccessor(IHyracksContext ctx, RecordDescriptor recordDescriptor) {
this.ctx = ctx;
this.recordDescriptor = recordDescriptor;
}
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/io/FrameTupleAppender.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/io/FrameTupleAppender.java
index 70f0248..c57e589 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/io/FrameTupleAppender.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/io/FrameTupleAppender.java
@@ -16,10 +16,10 @@
import java.nio.ByteBuffer;
-import edu.uci.ics.hyracks.context.HyracksContext;
+import edu.uci.ics.hyracks.api.context.IHyracksContext;
public class FrameTupleAppender {
- private final HyracksContext ctx;
+ private final IHyracksContext ctx;
private ByteBuffer buffer;
@@ -27,7 +27,7 @@
private int tupleDataEndOffset;
- public FrameTupleAppender(HyracksContext ctx) {
+ public FrameTupleAppender(IHyracksContext ctx) {
this.ctx = ctx;
}
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/io/SerializingDataWriter.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/io/SerializingDataWriter.java
index 4103dd6..593eb3b 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/io/SerializingDataWriter.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/io/SerializingDataWriter.java
@@ -19,10 +19,10 @@
import java.util.logging.Logger;
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksContext;
import edu.uci.ics.hyracks.api.dataflow.IOpenableDataWriter;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.context.HyracksContext;
public class SerializingDataWriter implements IOpenableDataWriter<Object[]> {
private static final Logger LOGGER = Logger.getLogger(SerializingDataWriter.class.getName());
@@ -39,7 +39,7 @@
private boolean open;
- public SerializingDataWriter(HyracksContext ctx, RecordDescriptor recordDescriptor, IFrameWriter frameWriter) {
+ public SerializingDataWriter(IHyracksContext ctx, RecordDescriptor recordDescriptor, IFrameWriter frameWriter) {
buffer = ctx.getResourceManager().allocateFrame();
tb = new ArrayTupleBuilder(recordDescriptor.getFields().length);
this.recordDescriptor = recordDescriptor;
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/context/HyracksContext.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/context/HyracksContext.java
index 7fe088d..b061919 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/context/HyracksContext.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/context/HyracksContext.java
@@ -14,10 +14,12 @@
*/
package edu.uci.ics.hyracks.context;
+import edu.uci.ics.hyracks.api.context.IHyracksContext;
+import edu.uci.ics.hyracks.api.resources.IResourceManager;
import edu.uci.ics.hyracks.resources.ResourceManager;
-public class HyracksContext {
- private final ResourceManager resourceManager;
+public class HyracksContext implements IHyracksContext {
+ private final IResourceManager resourceManager;
private final int frameSize;
public HyracksContext(int frameSize) {
@@ -25,10 +27,12 @@
this.frameSize = frameSize;
}
- public ResourceManager getResourceManager() {
+ @Override
+ public IResourceManager getResourceManager() {
return resourceManager;
}
+ @Override
public int getFrameSize() {
return frameSize;
}
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/nodecontroller/NodeControllerService.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/nodecontroller/NodeControllerService.java
index 758a2a0..682d6a2 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/nodecontroller/NodeControllerService.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/nodecontroller/NodeControllerService.java
@@ -38,6 +38,7 @@
import edu.uci.ics.hyracks.api.comm.IConnectionDemultiplexer;
import edu.uci.ics.hyracks.api.comm.IFrameReader;
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksContext;
import edu.uci.ics.hyracks.api.controller.IClusterController;
import edu.uci.ics.hyracks.api.controller.INodeController;
import edu.uci.ics.hyracks.api.controller.NodeCapability;
@@ -52,6 +53,8 @@
import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
import edu.uci.ics.hyracks.api.dataflow.OperatorInstanceId;
import edu.uci.ics.hyracks.api.dataflow.PortInstanceId;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.job.JobFlag;
import edu.uci.ics.hyracks.api.job.JobPlan;
@@ -71,7 +74,7 @@
private final String id;
- private final HyracksContext ctx;
+ private final IHyracksContext ctx;
private final NodeCapability nodeCapability;
@@ -162,71 +165,90 @@
}
@Override
- public Map<PortInstanceId, Endpoint> initializeJobletPhase1(UUID jobId, JobPlan plan, UUID stageId, int attempt,
- Map<ActivityNodeId, Set<Integer>> tasks, Map<OperatorDescriptorId, Set<Integer>> opPartitions)
+ public Map<PortInstanceId, Endpoint> initializeJobletPhase1(UUID jobId, final JobPlan plan, UUID stageId,
+ int attempt, Map<ActivityNodeId, Set<Integer>> tasks, Map<OperatorDescriptorId, Set<Integer>> opPartitions)
throws Exception {
- LOGGER.log(Level.INFO, String.valueOf(jobId) + "[" + id + ":" + stageId + "]: Initializing Joblet Phase 1");
+ try {
+ LOGGER.log(Level.INFO, String.valueOf(jobId) + "[" + id + ":" + stageId + "]: Initializing Joblet Phase 1");
- final Joblet joblet = getLocalJoblet(jobId);
-
- Stagelet stagelet = new Stagelet(joblet, stageId, attempt, id);
- joblet.setStagelet(stageId, stagelet);
-
- final Map<PortInstanceId, Endpoint> portMap = new HashMap<PortInstanceId, Endpoint>();
- Map<OperatorInstanceId, OperatorRunnable> honMap = stagelet.getOperatorMap();
-
- List<Endpoint> endpointList = new ArrayList<Endpoint>();
-
- for (ActivityNodeId hanId : tasks.keySet()) {
- IActivityNode han = plan.getActivityNodeMap().get(hanId);
- if (LOGGER.isLoggable(Level.FINEST)) {
- LOGGER.finest("Initializing " + hanId + " -> " + han);
- }
- IOperatorDescriptor op = han.getOwner();
- List<IConnectorDescriptor> inputs = plan.getTaskInputs(hanId);
- for (int i : tasks.get(hanId)) {
- IOperatorNodePushable hon = han.createPushRuntime(ctx, plan, joblet.getEnvironment(op, i), i,
- opPartitions.get(op.getOperatorId()).size());
- OperatorRunnable or = new OperatorRunnable(ctx, hon);
- stagelet.setOperator(op.getOperatorId(), i, or);
- if (inputs != null) {
- for (int j = 0; j < inputs.size(); ++j) {
- if (j >= 1) {
- throw new IllegalStateException();
- }
- IConnectorDescriptor conn = inputs.get(j);
- OperatorDescriptorId producerOpId = plan.getJobSpecification().getProducer(conn)
- .getOperatorId();
- OperatorDescriptorId consumerOpId = plan.getJobSpecification().getConsumer(conn)
- .getOperatorId();
- Endpoint endpoint = new Endpoint(connectionManager.getNetworkAddress(), i);
- endpointList.add(endpoint);
- DemuxDataReceiveListenerFactory drlf = new DemuxDataReceiveListenerFactory(ctx, jobId, stageId);
- connectionManager.acceptConnection(endpoint.getEndpointId(), drlf);
- PortInstanceId piId = new PortInstanceId(op.getOperatorId(), Direction.INPUT, plan
- .getTaskInputMap().get(hanId).get(j), i);
- if (LOGGER.isLoggable(Level.FINEST)) {
- LOGGER.finest("Created endpoint " + piId + " -> " + endpoint);
- }
- portMap.put(piId, endpoint);
- IFrameReader reader = createReader(conn, drlf, i, plan, stagelet, opPartitions
- .get(producerOpId).size(), opPartitions.get(consumerOpId).size());
- or.setFrameReader(reader);
- }
+ IRecordDescriptorProvider rdp = new IRecordDescriptorProvider() {
+ @Override
+ public RecordDescriptor getOutputRecordDescriptor(OperatorDescriptorId opId, int outputIndex) {
+ return plan.getJobSpecification().getOperatorOutputRecordDescriptor(opId, outputIndex);
}
- honMap.put(new OperatorInstanceId(op.getOperatorId(), i), or);
+
+ @Override
+ public RecordDescriptor getInputRecordDescriptor(OperatorDescriptorId opId, int inputIndex) {
+ return plan.getJobSpecification().getOperatorInputRecordDescriptor(opId, inputIndex);
+ }
+ };
+
+ final Joblet joblet = getLocalJoblet(jobId);
+
+ Stagelet stagelet = new Stagelet(joblet, stageId, attempt, id);
+ joblet.setStagelet(stageId, stagelet);
+
+ final Map<PortInstanceId, Endpoint> portMap = new HashMap<PortInstanceId, Endpoint>();
+ Map<OperatorInstanceId, OperatorRunnable> honMap = stagelet.getOperatorMap();
+
+ List<Endpoint> endpointList = new ArrayList<Endpoint>();
+
+ for (ActivityNodeId hanId : tasks.keySet()) {
+ IActivityNode han = plan.getActivityNodeMap().get(hanId);
+ if (LOGGER.isLoggable(Level.FINEST)) {
+ LOGGER.finest("Initializing " + hanId + " -> " + han);
+ }
+ IOperatorDescriptor op = han.getOwner();
+ List<IConnectorDescriptor> inputs = plan.getTaskInputs(hanId);
+ for (int i : tasks.get(hanId)) {
+ IOperatorNodePushable hon = han.createPushRuntime(ctx, joblet.getEnvironment(op, i), rdp, i,
+ opPartitions.get(op.getOperatorId()).size());
+ OperatorRunnable or = new OperatorRunnable(ctx, hon);
+ stagelet.setOperator(op.getOperatorId(), i, or);
+ if (inputs != null) {
+ for (int j = 0; j < inputs.size(); ++j) {
+ if (j >= 1) {
+ throw new IllegalStateException();
+ }
+ IConnectorDescriptor conn = inputs.get(j);
+ OperatorDescriptorId producerOpId = plan.getJobSpecification().getProducer(conn)
+ .getOperatorId();
+ OperatorDescriptorId consumerOpId = plan.getJobSpecification().getConsumer(conn)
+ .getOperatorId();
+ Endpoint endpoint = new Endpoint(connectionManager.getNetworkAddress(), i);
+ endpointList.add(endpoint);
+ DemuxDataReceiveListenerFactory drlf = new DemuxDataReceiveListenerFactory(ctx, jobId,
+ stageId);
+ connectionManager.acceptConnection(endpoint.getEndpointId(), drlf);
+ PortInstanceId piId = new PortInstanceId(op.getOperatorId(), Direction.INPUT, plan
+ .getTaskInputMap().get(hanId).get(j), i);
+ if (LOGGER.isLoggable(Level.FINEST)) {
+ LOGGER.finest("Created endpoint " + piId + " -> " + endpoint);
+ }
+ portMap.put(piId, endpoint);
+ IFrameReader reader = createReader(conn, drlf, i, plan, stagelet,
+ opPartitions.get(producerOpId).size(), opPartitions.get(consumerOpId).size());
+ or.setFrameReader(reader);
+ }
+ }
+ honMap.put(new OperatorInstanceId(op.getOperatorId(), i), or);
+ }
}
+
+ stagelet.setEndpointList(endpointList);
+
+ return portMap;
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw e;
}
-
- stagelet.setEndpointList(endpointList);
-
- return portMap;
}
private IFrameReader createReader(final IConnectorDescriptor conn, IConnectionDemultiplexer demux,
final int receiverIndex, JobPlan plan, final Stagelet stagelet, int nProducerCount, int nConsumerCount)
throws HyracksDataException {
- final IFrameReader reader = conn.createReceiveSideReader(ctx, plan, demux, receiverIndex, nProducerCount,
+ final IFrameReader reader = conn.createReceiveSideReader(ctx, plan.getJobSpecification()
+ .getConnectorRecordDescriptor(conn), demux, receiverIndex, nProducerCount,
nConsumerCount);
return plan.getJobFlags().contains(JobFlag.COLLECT_FRAME_COUNTS) ? new IFrameReader() {
@@ -262,52 +284,59 @@
public void initializeJobletPhase2(UUID jobId, final JobPlan plan, UUID stageId,
Map<ActivityNodeId, Set<Integer>> tasks, Map<OperatorDescriptorId, Set<Integer>> opPartitions,
final Map<PortInstanceId, Endpoint> globalPortMap) throws Exception {
- LOGGER.log(Level.INFO, String.valueOf(jobId) + "[" + id + ":" + stageId + "]: Initializing Joblet Phase 2");
- final Joblet ji = getLocalJoblet(jobId);
- Stagelet si = (Stagelet) ji.getStagelet(stageId);
- final Map<OperatorInstanceId, OperatorRunnable> honMap = si.getOperatorMap();
+ try {
+ LOGGER.log(Level.INFO, String.valueOf(jobId) + "[" + id + ":" + stageId + "]: Initializing Joblet Phase 2");
+ final Joblet ji = getLocalJoblet(jobId);
+ Stagelet si = (Stagelet) ji.getStagelet(stageId);
+ final Map<OperatorInstanceId, OperatorRunnable> honMap = si.getOperatorMap();
- final Stagelet stagelet = (Stagelet) ji.getStagelet(stageId);
+ final Stagelet stagelet = (Stagelet) ji.getStagelet(stageId);
- final JobSpecification spec = plan.getJobSpecification();
+ final JobSpecification spec = plan.getJobSpecification();
- for (ActivityNodeId hanId : tasks.keySet()) {
- IActivityNode han = plan.getActivityNodeMap().get(hanId);
- IOperatorDescriptor op = han.getOwner();
- List<IConnectorDescriptor> outputs = plan.getTaskOutputs(hanId);
- for (int i : tasks.get(hanId)) {
- OperatorRunnable or = honMap.get(new OperatorInstanceId(op.getOperatorId(), i));
- if (outputs != null) {
- for (int j = 0; j < outputs.size(); ++j) {
- final IConnectorDescriptor conn = outputs.get(j);
- OperatorDescriptorId producerOpId = plan.getJobSpecification().getProducer(conn)
- .getOperatorId();
- OperatorDescriptorId consumerOpId = plan.getJobSpecification().getConsumer(conn)
- .getOperatorId();
- final int senderIndex = i;
- IEndpointDataWriterFactory edwFactory = new IEndpointDataWriterFactory() {
- @Override
- public IFrameWriter createFrameWriter(int index) throws HyracksDataException {
- PortInstanceId piId = new PortInstanceId(spec.getConsumer(conn).getOperatorId(),
- Direction.INPUT, spec.getConsumerInputIndex(conn), index);
- Endpoint ep = globalPortMap.get(piId);
- if (ep == null) {
- LOGGER.info("Got null Endpoint for " + piId);
- throw new NullPointerException();
+ for (ActivityNodeId hanId : tasks.keySet()) {
+ IActivityNode han = plan.getActivityNodeMap().get(hanId);
+ IOperatorDescriptor op = han.getOwner();
+ List<IConnectorDescriptor> outputs = plan.getTaskOutputs(hanId);
+ for (int i : tasks.get(hanId)) {
+ OperatorRunnable or = honMap.get(new OperatorInstanceId(op.getOperatorId(), i));
+ if (outputs != null) {
+ for (int j = 0; j < outputs.size(); ++j) {
+ final IConnectorDescriptor conn = outputs.get(j);
+ OperatorDescriptorId producerOpId = plan.getJobSpecification().getProducer(conn)
+ .getOperatorId();
+ OperatorDescriptorId consumerOpId = plan.getJobSpecification().getConsumer(conn)
+ .getOperatorId();
+ final int senderIndex = i;
+ IEndpointDataWriterFactory edwFactory = new IEndpointDataWriterFactory() {
+ @Override
+ public IFrameWriter createFrameWriter(int index) throws HyracksDataException {
+ PortInstanceId piId = new PortInstanceId(spec.getConsumer(conn).getOperatorId(),
+ Direction.INPUT, spec.getConsumerInputIndex(conn), index);
+ Endpoint ep = globalPortMap.get(piId);
+ if (ep == null) {
+ LOGGER.info("Got null Endpoint for " + piId);
+ throw new NullPointerException();
+ }
+ if (LOGGER.isLoggable(Level.FINEST)) {
+ LOGGER.finest("Probed endpoint " + piId + " -> " + ep);
+ }
+ return createWriter(connectionManager.connect(ep.getNetworkAddress(),
+ ep.getEndpointId(), senderIndex), plan, conn, senderIndex, index, stagelet);
}
- if (LOGGER.isLoggable(Level.FINEST)) {
- LOGGER.finest("Probed endpoint " + piId + " -> " + ep);
- }
- return createWriter(connectionManager.connect(ep.getNetworkAddress(),
- ep.getEndpointId(), senderIndex), plan, conn, senderIndex, index, stagelet);
- }
- };
- or.setFrameWriter(j, conn.createSendSideWriter(ctx, plan, edwFactory, i,
- opPartitions.get(producerOpId).size(), opPartitions.get(consumerOpId).size()));
+ };
+ or.setFrameWriter(j, conn.createSendSideWriter(ctx, plan.getJobSpecification()
+ .getConnectorRecordDescriptor(conn), edwFactory, i,
+ opPartitions.get(producerOpId).size(), opPartitions.get(consumerOpId).size()), spec
+ .getConnectorRecordDescriptor(conn));
+ }
}
+ stagelet.installRunnable(new OperatorInstanceId(op.getOperatorId(), i));
}
- stagelet.installRunnable(new OperatorInstanceId(op.getOperatorId(), i));
}
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw e;
}
}
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/ExternalSortOperatorDescriptor.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/ExternalSortOperatorDescriptor.java
index 457a44c..9f94086 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/ExternalSortOperatorDescriptor.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/ExternalSortOperatorDescriptor.java
@@ -27,20 +27,19 @@
import edu.uci.ics.hyracks.api.comm.IFrameReader;
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksContext;
import edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder;
import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
-import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePullable;
import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
-import edu.uci.ics.hyracks.api.job.JobPlan;
import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.comm.io.FrameTupleAccessor;
import edu.uci.ics.hyracks.comm.io.FrameTupleAppender;
-import edu.uci.ics.hyracks.context.HyracksContext;
import edu.uci.ics.hyracks.coreops.base.AbstractActivityNode;
import edu.uci.ics.hyracks.coreops.base.AbstractOperatorDescriptor;
import edu.uci.ics.hyracks.coreops.util.ReferenceEntry;
@@ -91,14 +90,8 @@
}
@Override
- public IOperatorNodePullable createPullRuntime(HyracksContext ctx, JobPlan plan, IOperatorEnvironment env,
- int partition, int nPartitions) {
- return null;
- }
-
- @Override
- public IOperatorNodePushable createPushRuntime(final HyracksContext ctx, JobPlan plan,
- final IOperatorEnvironment env, int partition, int nPartitions) {
+ public IOperatorNodePushable createPushRuntime(final IHyracksContext ctx, final IOperatorEnvironment env,
+ IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
final IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
for (int i = 0; i < comparatorFactories.length; ++i) {
comparators[i] = comparatorFactories[i].createBinaryComparator();
@@ -278,22 +271,12 @@
}
@Override
- public void setFrameWriter(int index, IFrameWriter writer) {
+ public void setFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
throw new IllegalArgumentException();
}
};
return op;
}
-
- @Override
- public boolean supportsPullInterface() {
- return false;
- }
-
- @Override
- public boolean supportsPushInterface() {
- return false;
- }
}
private class MergeActivity extends AbstractActivityNode {
@@ -305,14 +288,8 @@
}
@Override
- public IOperatorNodePullable createPullRuntime(HyracksContext ctx, JobPlan plan, IOperatorEnvironment env,
- int partition, int nPartitions) {
- return null;
- }
-
- @Override
- public IOperatorNodePushable createPushRuntime(final HyracksContext ctx, JobPlan plan,
- final IOperatorEnvironment env, int partition, int nPartitions) {
+ public IOperatorNodePushable createPushRuntime(final IHyracksContext ctx, final IOperatorEnvironment env,
+ IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
final IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
for (int i = 0; i < comparatorFactories.length; ++i) {
comparators[i] = comparatorFactories[i].createBinaryComparator();
@@ -368,7 +345,7 @@
}
@Override
- public void setFrameWriter(int index, IFrameWriter writer) {
+ public void setFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
if (index != 0) {
throw new IllegalArgumentException();
}
@@ -472,16 +449,6 @@
};
return op;
}
-
- @Override
- public boolean supportsPullInterface() {
- return false;
- }
-
- @Override
- public boolean supportsPushInterface() {
- return true;
- }
}
private Comparator<ReferenceEntry> createEntryComparator(final IBinaryComparator[] comparators) {
@@ -511,7 +478,7 @@
};
}
- private void flushFrames(HyracksContext ctx, List<ByteBuffer> inFrames, ByteBuffer outFrame, long[] tPointers,
+ private void flushFrames(IHyracksContext ctx, List<ByteBuffer> inFrames, ByteBuffer outFrame, long[] tPointers,
IFrameWriter writer) throws HyracksDataException {
FrameTupleAccessor accessor = new FrameTupleAccessor(ctx, recordDescriptors[0]);
FrameTupleAppender outFrameAppender = new FrameTupleAppender(ctx);
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/HashDataWriter.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/HashDataWriter.java
index 2c1e28a..76b2ae4 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/HashDataWriter.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/HashDataWriter.java
@@ -18,13 +18,13 @@
import java.nio.ByteBuffer;
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksContext;
import edu.uci.ics.hyracks.api.dataflow.IEndpointDataWriterFactory;
import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputer;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.comm.io.FrameTupleAccessor;
import edu.uci.ics.hyracks.comm.io.FrameTupleAppender;
-import edu.uci.ics.hyracks.context.HyracksContext;
public class HashDataWriter implements IFrameWriter {
private final int consumerPartitionCount;
@@ -33,7 +33,7 @@
private final FrameTupleAccessor tupleAccessor;
private final ITuplePartitionComputer tpc;
- public HashDataWriter(HyracksContext ctx, int consumerPartitionCount, IEndpointDataWriterFactory edwFactory,
+ public HashDataWriter(IHyracksContext ctx, int consumerPartitionCount, IEndpointDataWriterFactory edwFactory,
RecordDescriptor recordDescriptor, ITuplePartitionComputer tpc) throws HyracksDataException {
this.consumerPartitionCount = consumerPartitionCount;
epWriters = new IFrameWriter[consumerPartitionCount];
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/InMemorySortOperatorDescriptor.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/InMemorySortOperatorDescriptor.java
index 1c7e6c7..f33b2b0 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/InMemorySortOperatorDescriptor.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/InMemorySortOperatorDescriptor.java
@@ -19,21 +19,20 @@
import java.util.List;
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksContext;
import edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder;
import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
-import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePullable;
import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
-import edu.uci.ics.hyracks.api.job.JobPlan;
import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.comm.io.FrameTupleAccessor;
import edu.uci.ics.hyracks.comm.io.FrameTupleAppender;
import edu.uci.ics.hyracks.comm.util.FrameUtils;
-import edu.uci.ics.hyracks.context.HyracksContext;
import edu.uci.ics.hyracks.coreops.base.AbstractActivityNode;
import edu.uci.ics.hyracks.coreops.base.AbstractOperatorDescriptor;
@@ -76,14 +75,8 @@
}
@Override
- public IOperatorNodePullable createPullRuntime(HyracksContext ctx, JobPlan plan, IOperatorEnvironment env,
- int partition, int nPartitions) {
- return null;
- }
-
- @Override
- public IOperatorNodePushable createPushRuntime(final HyracksContext ctx, JobPlan plan,
- final IOperatorEnvironment env, int partition, int nPartitions) {
+ public IOperatorNodePushable createPushRuntime(final IHyracksContext ctx, final IOperatorEnvironment env,
+ IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
final IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
for (int i = 0; i < comparatorFactories.length; ++i) {
comparators[i] = comparatorFactories[i].createBinaryComparator();
@@ -212,22 +205,12 @@
}
@Override
- public void setFrameWriter(int index, IFrameWriter writer) {
+ public void setFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
throw new IllegalArgumentException();
}
};
return op;
}
-
- @Override
- public boolean supportsPullInterface() {
- return false;
- }
-
- @Override
- public boolean supportsPushInterface() {
- return false;
- }
}
private class MergeActivity extends AbstractActivityNode {
@@ -239,14 +222,8 @@
}
@Override
- public IOperatorNodePullable createPullRuntime(HyracksContext ctx, JobPlan plan, IOperatorEnvironment env,
- int partition, int nPartitions) {
- return null;
- }
-
- @Override
- public IOperatorNodePushable createPushRuntime(final HyracksContext ctx, JobPlan plan,
- final IOperatorEnvironment env, int partition, int nPartitions) {
+ public IOperatorNodePushable createPushRuntime(final IHyracksContext ctx, final IOperatorEnvironment env,
+ IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
IOperatorNodePushable op = new IOperatorNodePushable() {
private IFrameWriter writer;
@@ -298,7 +275,7 @@
}
@Override
- public void setFrameWriter(int index, IFrameWriter writer) {
+ public void setFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
if (index != 0) {
throw new IllegalArgumentException();
}
@@ -307,15 +284,5 @@
};
return op;
}
-
- @Override
- public boolean supportsPullInterface() {
- return false;
- }
-
- @Override
- public boolean supportsPushInterface() {
- return true;
- }
}
}
\ No newline at end of file
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/MToNHashPartitioningConnectorDescriptor.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/MToNHashPartitioningConnectorDescriptor.java
index 5013812..5281042 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/MToNHashPartitioningConnectorDescriptor.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/MToNHashPartitioningConnectorDescriptor.java
@@ -17,13 +17,13 @@
import edu.uci.ics.hyracks.api.comm.IConnectionDemultiplexer;
import edu.uci.ics.hyracks.api.comm.IFrameReader;
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksContext;
import edu.uci.ics.hyracks.api.dataflow.IEndpointDataWriterFactory;
import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.api.job.JobPlan;
import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.comm.NonDeterministicFrameReader;
-import edu.uci.ics.hyracks.context.HyracksContext;
import edu.uci.ics.hyracks.coreops.base.AbstractConnectorDescriptor;
public class MToNHashPartitioningConnectorDescriptor extends AbstractConnectorDescriptor {
@@ -36,17 +36,18 @@
}
@Override
- public IFrameWriter createSendSideWriter(HyracksContext ctx, JobPlan plan, IEndpointDataWriterFactory edwFactory,
- int index, int nProducerPartitions, int nConsumerPartitions) throws HyracksDataException {
- JobSpecification spec = plan.getJobSpecification();
- final HashDataWriter hashWriter = new HashDataWriter(ctx, nConsumerPartitions, edwFactory, spec
- .getConnectorRecordDescriptor(this), tpcf.createPartitioner());
+ public IFrameWriter createSendSideWriter(IHyracksContext ctx, RecordDescriptor recordDesc,
+ IEndpointDataWriterFactory edwFactory, int index, int nProducerPartitions, int nConsumerPartitions)
+ throws HyracksDataException {
+ final HashDataWriter hashWriter = new HashDataWriter(ctx, nConsumerPartitions, edwFactory, recordDesc,
+ tpcf.createPartitioner());
return hashWriter;
}
@Override
- public IFrameReader createReceiveSideReader(HyracksContext ctx, JobPlan plan, IConnectionDemultiplexer demux,
- int index, int nProducerPartitions, int nConsumerPartitions) throws HyracksDataException {
+ public IFrameReader createReceiveSideReader(IHyracksContext ctx, RecordDescriptor recordDesc,
+ IConnectionDemultiplexer demux, int index, int nProducerPartitions, int nConsumerPartitions)
+ throws HyracksDataException {
return new NonDeterministicFrameReader(ctx, demux);
}
}
\ No newline at end of file
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/MToNHashPartitioningMergingConnectorDescriptor.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/MToNHashPartitioningMergingConnectorDescriptor.java
index 28cd1fa..ffc42a1 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/MToNHashPartitioningMergingConnectorDescriptor.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/MToNHashPartitioningMergingConnectorDescriptor.java
@@ -17,15 +17,15 @@
import edu.uci.ics.hyracks.api.comm.IConnectionDemultiplexer;
import edu.uci.ics.hyracks.api.comm.IFrameReader;
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksContext;
import edu.uci.ics.hyracks.api.dataflow.IEndpointDataWriterFactory;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.api.job.JobPlan;
import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.comm.SortMergeFrameReader;
-import edu.uci.ics.hyracks.context.HyracksContext;
import edu.uci.ics.hyracks.coreops.base.AbstractConnectorDescriptor;
public class MToNHashPartitioningMergingConnectorDescriptor extends AbstractConnectorDescriptor {
@@ -36,7 +36,7 @@
private final IBinaryComparatorFactory[] comparatorFactories;
public MToNHashPartitioningMergingConnectorDescriptor(JobSpecification spec, ITuplePartitionComputerFactory tpcf,
- int[] sortFields, IBinaryComparatorFactory[] comparatorFactories) {
+ int[] sortFields, IBinaryComparatorFactory[] comparatorFactories) {
super(spec);
this.tpcf = tpcf;
this.sortFields = sortFields;
@@ -44,22 +44,22 @@
}
@Override
- public IFrameWriter createSendSideWriter(HyracksContext ctx, JobPlan plan, IEndpointDataWriterFactory edwFactory,
- int index, int nProducerPartitions, int nConsumerPartitions) throws HyracksDataException {
- JobSpecification spec = plan.getJobSpecification();
- final HashDataWriter hashWriter = new HashDataWriter(ctx, nConsumerPartitions, edwFactory, spec
- .getConnectorRecordDescriptor(this), tpcf.createPartitioner());
+ public IFrameWriter createSendSideWriter(IHyracksContext ctx, RecordDescriptor recordDesc,
+ IEndpointDataWriterFactory edwFactory, int index, int nProducerPartitions, int nConsumerPartitions)
+ throws HyracksDataException {
+ final HashDataWriter hashWriter = new HashDataWriter(ctx, nConsumerPartitions, edwFactory, recordDesc,
+ tpcf.createPartitioner());
return hashWriter;
}
@Override
- public IFrameReader createReceiveSideReader(HyracksContext ctx, JobPlan plan, IConnectionDemultiplexer demux,
- int index, int nProducerPartitions, int nConsumerPartitions) throws HyracksDataException {
+ public IFrameReader createReceiveSideReader(IHyracksContext ctx, RecordDescriptor recordDesc,
+ IConnectionDemultiplexer demux, int index, int nProducerPartitions, int nConsumerPartitions)
+ throws HyracksDataException {
IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
for (int i = 0; i < comparatorFactories.length; ++i) {
comparators[i] = comparatorFactories[i].createBinaryComparator();
}
- JobSpecification spec = plan.getJobSpecification();
- return new SortMergeFrameReader(ctx, demux, sortFields, comparators, spec.getConnectorRecordDescriptor(this));
+ return new SortMergeFrameReader(ctx, demux, sortFields, comparators, recordDesc);
}
}
\ No newline at end of file
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/MToNRangePartitioningConnectorDescriptor.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/MToNRangePartitioningConnectorDescriptor.java
index d3a8af9..c9d6ee1 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/MToNRangePartitioningConnectorDescriptor.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/MToNRangePartitioningConnectorDescriptor.java
@@ -20,29 +20,26 @@
import edu.uci.ics.hyracks.api.comm.IConnectionDemultiplexer;
import edu.uci.ics.hyracks.api.comm.IFrameReader;
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksContext;
import edu.uci.ics.hyracks.api.dataflow.IEndpointDataWriterFactory;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.api.job.JobPlan;
import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.comm.NonDeterministicFrameReader;
import edu.uci.ics.hyracks.comm.io.FrameTupleAccessor;
import edu.uci.ics.hyracks.comm.io.FrameTupleAppender;
-import edu.uci.ics.hyracks.context.HyracksContext;
import edu.uci.ics.hyracks.coreops.base.AbstractConnectorDescriptor;
public class MToNRangePartitioningConnectorDescriptor extends AbstractConnectorDescriptor {
private static final long serialVersionUID = 1L;
private class RangeDataWriter implements IFrameWriter {
- private final int consumerPartitionCount;
private final IFrameWriter[] epWriters;
private final FrameTupleAppender[] appenders;
private final FrameTupleAccessor tupleAccessor;
- public RangeDataWriter(HyracksContext ctx, int consumerPartitionCount, IFrameWriter[] epWriters,
- FrameTupleAppender[] appenders, RecordDescriptor recordDescriptor) {
- this.consumerPartitionCount = consumerPartitionCount;
+ public RangeDataWriter(IHyracksContext ctx, int consumerPartitionCount, IFrameWriter[] epWriters,
+ FrameTupleAppender[] appenders, RecordDescriptor recordDescriptor) {
this.epWriters = epWriters;
this.appenders = appenders;
tupleAccessor = new FrameTupleAccessor(ctx, recordDescriptor);
@@ -105,9 +102,9 @@
}
@Override
- public IFrameWriter createSendSideWriter(HyracksContext ctx, JobPlan plan, IEndpointDataWriterFactory edwFactory,
- int index, int nProducerPartitions, int nConsumerPartitions) throws HyracksDataException {
- JobSpecification spec = plan.getJobSpecification();
+ public IFrameWriter createSendSideWriter(IHyracksContext ctx, RecordDescriptor recordDesc,
+ IEndpointDataWriterFactory edwFactory, int index, int nProducerPartitions, int nConsumerPartitions)
+ throws HyracksDataException {
final IFrameWriter[] epWriters = new IFrameWriter[nConsumerPartitions];
final FrameTupleAppender[] appenders = new FrameTupleAppender[nConsumerPartitions];
for (int i = 0; i < nConsumerPartitions; ++i) {
@@ -119,14 +116,15 @@
throw new HyracksDataException(e);
}
}
- final RangeDataWriter rangeWriter = new RangeDataWriter(ctx, nConsumerPartitions, epWriters, appenders, spec
- .getConnectorRecordDescriptor(this));
+ final RangeDataWriter rangeWriter = new RangeDataWriter(ctx, nConsumerPartitions, epWriters, appenders,
+ recordDesc);
return rangeWriter;
}
@Override
- public IFrameReader createReceiveSideReader(HyracksContext ctx, JobPlan plan, IConnectionDemultiplexer demux,
- int index, int nProducerPartitions, int nConsumerPartitions) throws HyracksDataException {
+ public IFrameReader createReceiveSideReader(IHyracksContext ctx, RecordDescriptor recordDesc,
+ IConnectionDemultiplexer demux, int index, int nProducerPartitions, int nConsumerPartitions)
+ throws HyracksDataException {
return new NonDeterministicFrameReader(ctx, demux);
}
}
\ No newline at end of file
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/MToNReplicatingConnectorDescriptor.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/MToNReplicatingConnectorDescriptor.java
index 324c39f..988fb5c 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/MToNReplicatingConnectorDescriptor.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/MToNReplicatingConnectorDescriptor.java
@@ -19,12 +19,12 @@
import edu.uci.ics.hyracks.api.comm.IConnectionDemultiplexer;
import edu.uci.ics.hyracks.api.comm.IFrameReader;
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksContext;
import edu.uci.ics.hyracks.api.dataflow.IEndpointDataWriterFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.api.job.JobPlan;
import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.comm.NonDeterministicFrameReader;
-import edu.uci.ics.hyracks.context.HyracksContext;
import edu.uci.ics.hyracks.coreops.base.AbstractConnectorDescriptor;
public class MToNReplicatingConnectorDescriptor extends AbstractConnectorDescriptor {
@@ -35,7 +35,7 @@
private static final long serialVersionUID = 1L;
@Override
- public IFrameWriter createSendSideWriter(HyracksContext ctx, JobPlan plan, IEndpointDataWriterFactory edwFactory,
+ public IFrameWriter createSendSideWriter(IHyracksContext ctx, RecordDescriptor recordDesc, IEndpointDataWriterFactory edwFactory,
int index, int nProducerPartitions, int nConsumerPartitions) throws HyracksDataException {
final IFrameWriter[] epWriters = new IFrameWriter[nConsumerPartitions];
for (int i = 0; i < nConsumerPartitions; ++i) {
@@ -70,7 +70,7 @@
}
@Override
- public IFrameReader createReceiveSideReader(HyracksContext ctx, JobPlan plan, IConnectionDemultiplexer demux,
+ public IFrameReader createReceiveSideReader(IHyracksContext ctx, RecordDescriptor recordDesc, IConnectionDemultiplexer demux,
int index, int nProducerPartitions, int nConsumerPartitions) throws HyracksDataException {
return new NonDeterministicFrameReader(ctx, demux);
}
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/MapperOperatorDescriptor.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/MapperOperatorDescriptor.java
index 807e890..59cf71b 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/MapperOperatorDescriptor.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/MapperOperatorDescriptor.java
@@ -14,15 +14,14 @@
*/
package edu.uci.ics.hyracks.coreops;
+import edu.uci.ics.hyracks.api.context.IHyracksContext;
import edu.uci.ics.hyracks.api.dataflow.IOpenableDataWriter;
-import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePullable;
import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
-import edu.uci.ics.hyracks.api.job.JobPlan;
import edu.uci.ics.hyracks.api.job.JobSpecification;
-import edu.uci.ics.hyracks.context.HyracksContext;
import edu.uci.ics.hyracks.coreops.base.AbstractSingleActivityOperatorDescriptor;
import edu.uci.ics.hyracks.coreops.base.IOpenableDataWriterOperator;
import edu.uci.ics.hyracks.coreops.util.DeserializedOperatorNodePushable;
@@ -70,24 +69,9 @@
}
@Override
- public IOperatorNodePullable createPullRuntime(HyracksContext ctx, JobPlan plan, IOperatorEnvironment env,
- int partition, int nPartitions) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public IOperatorNodePushable createPushRuntime(HyracksContext ctx, JobPlan plan, IOperatorEnvironment env,
- int partition, int nPartitions) {
- return new DeserializedOperatorNodePushable(ctx, new MapperOperator(), plan, getActivityNodeId());
- }
-
- @Override
- public boolean supportsPullInterface() {
- return false;
- }
-
- @Override
- public boolean supportsPushInterface() {
- return true;
+ public IOperatorNodePushable createPushRuntime(IHyracksContext ctx, IOperatorEnvironment env,
+ IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
+ return new DeserializedOperatorNodePushable(ctx, new MapperOperator(),
+ recordDescProvider.getInputRecordDescriptor(getOperatorId(), 0));
}
}
\ No newline at end of file
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/MaterializingOperatorDescriptor.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/MaterializingOperatorDescriptor.java
index 9f06d7b..735af5a 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/MaterializingOperatorDescriptor.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/MaterializingOperatorDescriptor.java
@@ -21,16 +21,15 @@
import java.nio.channels.FileChannel;
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksContext;
import edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder;
import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
-import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePullable;
import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
-import edu.uci.ics.hyracks.api.job.JobPlan;
import edu.uci.ics.hyracks.api.job.JobSpecification;
-import edu.uci.ics.hyracks.context.HyracksContext;
import edu.uci.ics.hyracks.coreops.base.AbstractActivityNode;
import edu.uci.ics.hyracks.coreops.base.AbstractOperatorDescriptor;
@@ -62,14 +61,8 @@
private static final long serialVersionUID = 1L;
@Override
- public IOperatorNodePullable createPullRuntime(HyracksContext ctx, JobPlan plan, IOperatorEnvironment env,
- int partition, int nPartitions) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public IOperatorNodePushable createPushRuntime(final HyracksContext ctx, JobPlan plan,
- final IOperatorEnvironment env, int partition, int nPartitions) {
+ public IOperatorNodePushable createPushRuntime(final IHyracksContext ctx, final IOperatorEnvironment env,
+ IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
return new IOperatorNodePushable() {
private FileChannel out;
private int frameCount;
@@ -114,7 +107,7 @@
}
@Override
- public void setFrameWriter(int index, IFrameWriter writer) {
+ public void setFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
throw new IllegalArgumentException();
}
};
@@ -124,30 +117,14 @@
public IOperatorDescriptor getOwner() {
return MaterializingOperatorDescriptor.this;
}
-
- @Override
- public boolean supportsPullInterface() {
- return false;
- }
-
- @Override
- public boolean supportsPushInterface() {
- return true;
- }
}
private final class ReaderActivityNode extends AbstractActivityNode {
private static final long serialVersionUID = 1L;
@Override
- public IOperatorNodePullable createPullRuntime(HyracksContext ctx, JobPlan plan, IOperatorEnvironment env,
- int partition, int nPartitions) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public IOperatorNodePushable createPushRuntime(final HyracksContext ctx, JobPlan plan,
- final IOperatorEnvironment env, int partition, int nPartitions) {
+ public IOperatorNodePushable createPushRuntime(final IHyracksContext ctx, final IOperatorEnvironment env,
+ IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
return new IOperatorNodePushable() {
private IFrameWriter writer;
@@ -187,7 +164,7 @@
}
@Override
- public void setFrameWriter(int index, IFrameWriter writer) {
+ public void setFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
if (index != 0) {
throw new IllegalArgumentException();
}
@@ -200,15 +177,5 @@
public IOperatorDescriptor getOwner() {
return MaterializingOperatorDescriptor.this;
}
-
- @Override
- public boolean supportsPullInterface() {
- return false;
- }
-
- @Override
- public boolean supportsPushInterface() {
- return true;
- }
}
}
\ No newline at end of file
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/OneToOneConnectorDescriptor.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/OneToOneConnectorDescriptor.java
index fff3aa6..c2ce792 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/OneToOneConnectorDescriptor.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/OneToOneConnectorDescriptor.java
@@ -17,12 +17,12 @@
import edu.uci.ics.hyracks.api.comm.IConnectionDemultiplexer;
import edu.uci.ics.hyracks.api.comm.IFrameReader;
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksContext;
import edu.uci.ics.hyracks.api.dataflow.IEndpointDataWriterFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.api.job.JobPlan;
import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.comm.NonDeterministicFrameReader;
-import edu.uci.ics.hyracks.context.HyracksContext;
import edu.uci.ics.hyracks.coreops.base.AbstractConnectorDescriptor;
public class OneToOneConnectorDescriptor extends AbstractConnectorDescriptor {
@@ -33,14 +33,16 @@
}
@Override
- public IFrameWriter createSendSideWriter(HyracksContext ctx, JobPlan plan, IEndpointDataWriterFactory edwFactory,
- int index, int nProducerPartitions, int nConsumerPartitions) throws HyracksDataException {
+ public IFrameWriter createSendSideWriter(IHyracksContext ctx, RecordDescriptor recordDesc,
+ IEndpointDataWriterFactory edwFactory, int index, int nProducerPartitions, int nConsumerPartitions)
+ throws HyracksDataException {
return edwFactory.createFrameWriter(index);
}
@Override
- public IFrameReader createReceiveSideReader(HyracksContext ctx, JobPlan plan, IConnectionDemultiplexer demux,
- int index, int nProducerPartitions, int nConsumerPartitions) throws HyracksDataException {
+ public IFrameReader createReceiveSideReader(IHyracksContext ctx, RecordDescriptor recordDesc,
+ IConnectionDemultiplexer demux, int index, int nProducerPartitions, int nConsumerPartitions)
+ throws HyracksDataException {
return new NonDeterministicFrameReader(ctx, demux);
}
}
\ No newline at end of file
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/PrinterOperatorDescriptor.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/PrinterOperatorDescriptor.java
index a07a2f9..c378961 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/PrinterOperatorDescriptor.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/PrinterOperatorDescriptor.java
@@ -14,14 +14,13 @@
*/
package edu.uci.ics.hyracks.coreops;
+import edu.uci.ics.hyracks.api.context.IHyracksContext;
import edu.uci.ics.hyracks.api.dataflow.IOpenableDataWriter;
-import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePullable;
import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
-import edu.uci.ics.hyracks.api.job.JobPlan;
import edu.uci.ics.hyracks.api.job.JobSpecification;
-import edu.uci.ics.hyracks.context.HyracksContext;
import edu.uci.ics.hyracks.coreops.base.AbstractSingleActivityOperatorDescriptor;
import edu.uci.ics.hyracks.coreops.base.IOpenableDataWriterOperator;
import edu.uci.ics.hyracks.coreops.util.DeserializedOperatorNodePushable;
@@ -58,24 +57,9 @@
}
@Override
- public IOperatorNodePullable createPullRuntime(HyracksContext ctx, JobPlan plan, IOperatorEnvironment env,
- int partition, int nPartitions) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public IOperatorNodePushable createPushRuntime(HyracksContext ctx, JobPlan plan, IOperatorEnvironment env,
- int partition, int nPartitions) {
- return new DeserializedOperatorNodePushable(ctx, new PrinterOperator(), plan, getActivityNodeId());
- }
-
- @Override
- public boolean supportsPullInterface() {
- return false;
- }
-
- @Override
- public boolean supportsPushInterface() {
- return true;
+ public IOperatorNodePushable createPushRuntime(IHyracksContext ctx, IOperatorEnvironment env,
+ IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
+ return new DeserializedOperatorNodePushable(ctx, new PrinterOperator(),
+ recordDescProvider.getInputRecordDescriptor(getOperatorId(), 0));
}
}
\ No newline at end of file
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/SplitVectorOperatorDescriptor.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/SplitVectorOperatorDescriptor.java
index 08ec2bb..f90eaea 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/SplitVectorOperatorDescriptor.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/SplitVectorOperatorDescriptor.java
@@ -17,18 +17,17 @@
import java.util.ArrayList;
import java.util.List;
+import edu.uci.ics.hyracks.api.context.IHyracksContext;
import edu.uci.ics.hyracks.api.dataflow.ActivityNodeId;
import edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder;
import edu.uci.ics.hyracks.api.dataflow.IOpenableDataWriter;
import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
-import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePullable;
import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
-import edu.uci.ics.hyracks.api.job.JobPlan;
import edu.uci.ics.hyracks.api.job.JobSpecification;
-import edu.uci.ics.hyracks.context.HyracksContext;
import edu.uci.ics.hyracks.coreops.base.AbstractActivityNode;
import edu.uci.ics.hyracks.coreops.base.AbstractOperatorDescriptor;
import edu.uci.ics.hyracks.coreops.base.IOpenableDataWriterOperator;
@@ -49,14 +48,8 @@
}
@Override
- public IOperatorNodePullable createPullRuntime(HyracksContext ctx, JobPlan plan, IOperatorEnvironment env,
- int partition, int nPartitions) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public IOperatorNodePushable createPushRuntime(HyracksContext ctx, JobPlan plan,
- final IOperatorEnvironment env, int partition, int nPartitions) {
+ public IOperatorNodePushable createPushRuntime(IHyracksContext ctx, final IOperatorEnvironment env,
+ IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
IOpenableDataWriterOperator op = new IOpenableDataWriterOperator() {
private ArrayList<Object[]> buffer;
@@ -81,17 +74,8 @@
buffer.add(data);
}
};
- return new DeserializedOperatorNodePushable(ctx, op, plan, getActivityNodeId());
- }
-
- @Override
- public boolean supportsPullInterface() {
- return false;
- }
-
- @Override
- public boolean supportsPushInterface() {
- return true;
+ return new DeserializedOperatorNodePushable(ctx, op, recordDescProvider.getInputRecordDescriptor(
+ getOperatorId(), 0));
}
}
@@ -104,14 +88,8 @@
}
@Override
- public IOperatorNodePullable createPullRuntime(HyracksContext ctx, JobPlan plan, IOperatorEnvironment env,
- int partition, int nPartitions) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public IOperatorNodePushable createPushRuntime(HyracksContext ctx, JobPlan plan,
- final IOperatorEnvironment env, int partition, int nPartitions) {
+ public IOperatorNodePushable createPushRuntime(IHyracksContext ctx, final IOperatorEnvironment env,
+ IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
IOpenableDataWriterOperator op = new IOpenableDataWriterOperator() {
private IOpenableDataWriter<Object[]> writer;
@@ -143,17 +121,8 @@
writer.close();
}
};
- return new DeserializedOperatorNodePushable(ctx, op, plan, getActivityNodeId());
- }
-
- @Override
- public boolean supportsPullInterface() {
- return false;
- }
-
- @Override
- public boolean supportsPushInterface() {
- return true;
+ return new DeserializedOperatorNodePushable(ctx, op, recordDescProvider.getInputRecordDescriptor(
+ getOperatorId(), 0));
}
}
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/file/AbstractFileScanOperatorDescriptor.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/file/AbstractFileScanOperatorDescriptor.java
index a4571ee..b862e16 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/file/AbstractFileScanOperatorDescriptor.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/file/AbstractFileScanOperatorDescriptor.java
@@ -16,15 +16,14 @@
import java.io.File;
+import edu.uci.ics.hyracks.api.context.IHyracksContext;
import edu.uci.ics.hyracks.api.dataflow.IOpenableDataWriter;
-import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePullable;
import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
-import edu.uci.ics.hyracks.api.job.JobPlan;
import edu.uci.ics.hyracks.api.job.JobSpecification;
-import edu.uci.ics.hyracks.context.HyracksContext;
import edu.uci.ics.hyracks.coreops.base.AbstractSingleActivityOperatorDescriptor;
import edu.uci.ics.hyracks.coreops.base.IOpenableDataWriterOperator;
import edu.uci.ics.hyracks.coreops.util.DeserializedOperatorNodePushable;
@@ -101,24 +100,8 @@
}
@Override
- public IOperatorNodePullable createPullRuntime(HyracksContext ctx, JobPlan plan, IOperatorEnvironment env,
- int partition, int nPartitions) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public IOperatorNodePushable createPushRuntime(HyracksContext ctx, JobPlan plan, IOperatorEnvironment env,
- int partition, int nPartitions) {
- return new DeserializedOperatorNodePushable(ctx, new FileScanOperator(partition), plan, getActivityNodeId());
- }
-
- @Override
- public boolean supportsPullInterface() {
- return false;
- }
-
- @Override
- public boolean supportsPushInterface() {
- return true;
+ public IOperatorNodePushable createPushRuntime(IHyracksContext ctx, IOperatorEnvironment env,
+ IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
+ return new DeserializedOperatorNodePushable(ctx, new FileScanOperator(partition), null);
}
}
\ No newline at end of file
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/file/AbstractFileWriteOperatorDescriptor.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/file/AbstractFileWriteOperatorDescriptor.java
index d04df29..84e30d3 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/file/AbstractFileWriteOperatorDescriptor.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/file/AbstractFileWriteOperatorDescriptor.java
@@ -16,14 +16,13 @@
import java.io.File;
+import edu.uci.ics.hyracks.api.context.IHyracksContext;
import edu.uci.ics.hyracks.api.dataflow.IOpenableDataWriter;
-import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePullable;
import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
-import edu.uci.ics.hyracks.api.job.JobPlan;
import edu.uci.ics.hyracks.api.job.JobSpecification;
-import edu.uci.ics.hyracks.context.HyracksContext;
import edu.uci.ics.hyracks.coreops.base.AbstractSingleActivityOperatorDescriptor;
import edu.uci.ics.hyracks.coreops.base.IOpenableDataWriterOperator;
import edu.uci.ics.hyracks.coreops.util.DeserializedOperatorNodePushable;
@@ -88,24 +87,9 @@
protected abstract IRecordWriter createRecordWriter(File file, int index) throws Exception;
@Override
- public IOperatorNodePullable createPullRuntime(HyracksContext ctx, JobPlan plan, IOperatorEnvironment env,
- int partition, int nPartitions) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public IOperatorNodePushable createPushRuntime(HyracksContext ctx, JobPlan plan, IOperatorEnvironment env,
- int partition, int nPartitions) {
- return new DeserializedOperatorNodePushable(ctx, new FileWriteOperator(partition), plan, getActivityNodeId());
- }
-
- @Override
- public boolean supportsPullInterface() {
- return false;
- }
-
- @Override
- public boolean supportsPushInterface() {
- return true;
+ public IOperatorNodePushable createPushRuntime(IHyracksContext ctx, IOperatorEnvironment env,
+ IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
+ return new DeserializedOperatorNodePushable(ctx, new FileWriteOperator(partition),
+ recordDescProvider.getInputRecordDescriptor(getOperatorId(), 0));
}
}
\ No newline at end of file
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/group/GroupingHashTable.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/group/GroupingHashTable.java
index b2dc234..faa6ac4 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/group/GroupingHashTable.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/group/GroupingHashTable.java
@@ -20,6 +20,7 @@
import java.util.List;
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksContext;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
@@ -30,7 +31,6 @@
import edu.uci.ics.hyracks.comm.io.FrameTupleAccessor;
import edu.uci.ics.hyracks.comm.io.FrameTupleAppender;
import edu.uci.ics.hyracks.comm.io.FrameTuplePairComparator;
-import edu.uci.ics.hyracks.context.HyracksContext;
class GroupingHashTable {
/**
@@ -60,7 +60,7 @@
}
private static final int INIT_ACCUMULATORS_SIZE = 8;
- private final HyracksContext ctx;
+ private final IHyracksContext ctx;
private final FrameTupleAppender appender;
private final List<ByteBuffer> buffers;
private final Link[] table;
@@ -79,7 +79,7 @@
private final FrameTupleAccessor storedKeysAccessor;
- GroupingHashTable(HyracksContext ctx, int[] fields, IBinaryComparatorFactory[] comparatorFactories,
+ GroupingHashTable(IHyracksContext ctx, int[] fields, IBinaryComparatorFactory[] comparatorFactories,
ITuplePartitionComputerFactory tpcf, IAccumulatingAggregatorFactory aggregatorFactory,
RecordDescriptor inRecordDescriptor, RecordDescriptor outRecordDescriptor, int tableSize) {
this.ctx = ctx;
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/group/HashGroupOperatorDescriptor.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/group/HashGroupOperatorDescriptor.java
index 664c84d..6e91c7d 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/group/HashGroupOperatorDescriptor.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/group/HashGroupOperatorDescriptor.java
@@ -17,19 +17,18 @@
import java.nio.ByteBuffer;
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksContext;
import edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder;
import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
-import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePullable;
import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
-import edu.uci.ics.hyracks.api.job.JobPlan;
import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.comm.io.FrameTupleAccessor;
-import edu.uci.ics.hyracks.context.HyracksContext;
import edu.uci.ics.hyracks.coreops.base.AbstractActivityNode;
import edu.uci.ics.hyracks.coreops.base.AbstractOperatorDescriptor;
@@ -73,24 +72,18 @@
private static final long serialVersionUID = 1L;
@Override
- public IOperatorNodePullable createPullRuntime(HyracksContext ctx, JobPlan plan, IOperatorEnvironment env,
- int partition, int nPartitions) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public IOperatorNodePushable createPushRuntime(final HyracksContext ctx, JobPlan plan,
- final IOperatorEnvironment env, int partition, int nPartitions) {
- final RecordDescriptor rd0 = plan.getJobSpecification()
- .getOperatorInputRecordDescriptor(getOperatorId(), 0);
- final FrameTupleAccessor accessor = new FrameTupleAccessor(ctx, rd0);
+ public IOperatorNodePushable createPushRuntime(final IHyracksContext ctx, final IOperatorEnvironment env,
+ final IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
+ final FrameTupleAccessor accessor = new FrameTupleAccessor(ctx,
+ recordDescProvider.getInputRecordDescriptor(getOperatorId(), 0));
return new IOperatorNodePushable() {
private GroupingHashTable table;
@Override
public void open() throws HyracksDataException {
- table = new GroupingHashTable(ctx, keys, comparatorFactories, tpcf, aggregatorFactory, rd0,
- recordDescriptors[0], tableSize);
+ table = new GroupingHashTable(ctx, keys, comparatorFactories, tpcf, aggregatorFactory,
+ recordDescProvider.getInputRecordDescriptor(getOperatorId(), 0), recordDescriptors[0],
+ tableSize);
}
@Override
@@ -108,7 +101,7 @@
}
@Override
- public void setFrameWriter(int index, IFrameWriter writer) {
+ public void setFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
throw new IllegalArgumentException();
}
};
@@ -118,30 +111,14 @@
public IOperatorDescriptor getOwner() {
return HashGroupOperatorDescriptor.this;
}
-
- @Override
- public boolean supportsPullInterface() {
- return false;
- }
-
- @Override
- public boolean supportsPushInterface() {
- return true;
- }
}
private class OutputActivity extends AbstractActivityNode {
private static final long serialVersionUID = 1L;
@Override
- public IOperatorNodePullable createPullRuntime(HyracksContext ctx, JobPlan plan, IOperatorEnvironment env,
- int partition, int nPartitions) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public IOperatorNodePushable createPushRuntime(HyracksContext ctx, JobPlan plan,
- final IOperatorEnvironment env, int partition, int nPartitions) {
+ public IOperatorNodePushable createPushRuntime(IHyracksContext ctx, final IOperatorEnvironment env,
+ IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
return new IOperatorNodePushable() {
private IFrameWriter writer;
@@ -165,7 +142,7 @@
}
@Override
- public void setFrameWriter(int index, IFrameWriter writer) {
+ public void setFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
if (index != 0) {
throw new IllegalArgumentException();
}
@@ -178,15 +155,5 @@
public IOperatorDescriptor getOwner() {
return HashGroupOperatorDescriptor.this;
}
-
- @Override
- public boolean supportsPullInterface() {
- return false;
- }
-
- @Override
- public boolean supportsPushInterface() {
- return true;
- }
}
}
\ No newline at end of file
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/group/PreclusteredGroupOperatorDescriptor.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/group/PreclusteredGroupOperatorDescriptor.java
index e7e04a5..acfdafe 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/group/PreclusteredGroupOperatorDescriptor.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/group/PreclusteredGroupOperatorDescriptor.java
@@ -14,15 +14,14 @@
*/
package edu.uci.ics.hyracks.coreops.group;
-import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePullable;
+import edu.uci.ics.hyracks.api.context.IHyracksContext;
import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
import edu.uci.ics.hyracks.api.dataflow.value.IComparator;
import edu.uci.ics.hyracks.api.dataflow.value.IComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
-import edu.uci.ics.hyracks.api.job.JobPlan;
import edu.uci.ics.hyracks.api.job.JobSpecification;
-import edu.uci.ics.hyracks.context.HyracksContext;
import edu.uci.ics.hyracks.coreops.base.AbstractSingleActivityOperatorDescriptor;
import edu.uci.ics.hyracks.coreops.util.DeserializedOperatorNodePushable;
@@ -43,29 +42,13 @@
}
@Override
- public IOperatorNodePullable createPullRuntime(HyracksContext ctx, JobPlan plan, IOperatorEnvironment env,
- int partition, int nPartitions) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public IOperatorNodePushable createPushRuntime(HyracksContext ctx, JobPlan plan, IOperatorEnvironment env,
- int partition, int nPartitions) {
+ public IOperatorNodePushable createPushRuntime(IHyracksContext ctx, IOperatorEnvironment env,
+ IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
IComparator[] comparators = new IComparator[comparatorFactories.length];
for (int i = 0; i < comparatorFactories.length; ++i) {
comparators[i] = comparatorFactories[i].createComparator();
}
return new DeserializedOperatorNodePushable(ctx, new PreclusteredGroupOperator(groupFields, comparators,
- aggregator), plan, getActivityNodeId());
- }
-
- @Override
- public boolean supportsPullInterface() {
- return false;
- }
-
- @Override
- public boolean supportsPushInterface() {
- return true;
+ aggregator), recordDescProvider.getInputRecordDescriptor(getOperatorId(), 0));
}
}
\ No newline at end of file
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/hadoop/AbstractHadoopFileScanOperatorDescriptor.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/hadoop/AbstractHadoopFileScanOperatorDescriptor.java
index 90502b0..064743d 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/hadoop/AbstractHadoopFileScanOperatorDescriptor.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/hadoop/AbstractHadoopFileScanOperatorDescriptor.java
@@ -18,15 +18,14 @@
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.Reporter;
+import edu.uci.ics.hyracks.api.context.IHyracksContext;
import edu.uci.ics.hyracks.api.dataflow.IOpenableDataWriter;
-import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePullable;
import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
-import edu.uci.ics.hyracks.api.job.JobPlan;
import edu.uci.ics.hyracks.api.job.JobSpecification;
-import edu.uci.ics.hyracks.context.HyracksContext;
import edu.uci.ics.hyracks.coreops.base.AbstractSingleActivityOperatorDescriptor;
import edu.uci.ics.hyracks.coreops.base.IOpenableDataWriterOperator;
import edu.uci.ics.hyracks.coreops.file.IRecordReader;
@@ -142,24 +141,9 @@
}
@Override
- public IOperatorNodePullable createPullRuntime(HyracksContext ctx, JobPlan plan, IOperatorEnvironment env,
- int partition, int nPartitions) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public IOperatorNodePushable createPushRuntime(HyracksContext ctx, JobPlan plan, IOperatorEnvironment env,
- int partition, int nPartitions) {
- return new DeserializedOperatorNodePushable(ctx, new FileScanOperator(partition), plan, getActivityNodeId());
- }
-
- @Override
- public boolean supportsPullInterface() {
- return false;
- }
-
- @Override
- public boolean supportsPushInterface() {
- return true;
+ public IOperatorNodePushable createPushRuntime(IHyracksContext ctx, IOperatorEnvironment env,
+ IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
+ return new DeserializedOperatorNodePushable(ctx, new FileScanOperator(partition),
+ recordDescProvider.getInputRecordDescriptor(getOperatorId(), 0));
}
}
\ No newline at end of file
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/hadoop/HadoopMapperOperatorDescriptor.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/hadoop/HadoopMapperOperatorDescriptor.java
index c144365..0e81de7 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/hadoop/HadoopMapperOperatorDescriptor.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/hadoop/HadoopMapperOperatorDescriptor.java
@@ -22,15 +22,14 @@
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
+import edu.uci.ics.hyracks.api.context.IHyracksContext;
import edu.uci.ics.hyracks.api.dataflow.IOpenableDataWriter;
-import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePullable;
import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
-import edu.uci.ics.hyracks.api.job.JobPlan;
import edu.uci.ics.hyracks.api.job.JobSpecification;
-import edu.uci.ics.hyracks.context.HyracksContext;
import edu.uci.ics.hyracks.coreops.base.IOpenableDataWriterOperator;
import edu.uci.ics.hyracks.coreops.util.DeserializedOperatorNodePushable;
import edu.uci.ics.hyracks.hadoop.util.DatatypeHelper;
@@ -133,28 +132,13 @@
}
@Override
- public IOperatorNodePullable createPullRuntime(HyracksContext ctx, JobPlan plan, IOperatorEnvironment env,
- int partition, int nPartitions) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public IOperatorNodePushable createPushRuntime(HyracksContext ctx, JobPlan plan, IOperatorEnvironment env,
- int partition, int nPartitions) {
- return new DeserializedOperatorNodePushable(ctx, new MapperOperator(), plan, getActivityNodeId());
+ public IOperatorNodePushable createPushRuntime(IHyracksContext ctx, IOperatorEnvironment env,
+ IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
+ return new DeserializedOperatorNodePushable(ctx, new MapperOperator(),
+ recordDescProvider.getInputRecordDescriptor(getOperatorId(), 0));
}
public Class<? extends Mapper> getMapperClass() {
return mapperClass;
}
-
- @Override
- public boolean supportsPullInterface() {
- return false;
- }
-
- @Override
- public boolean supportsPushInterface() {
- return true;
- }
}
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/hadoop/HadoopReducerOperatorDescriptor.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/hadoop/HadoopReducerOperatorDescriptor.java
index e7bfa31..95bfaea 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/hadoop/HadoopReducerOperatorDescriptor.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/hadoop/HadoopReducerOperatorDescriptor.java
@@ -28,18 +28,17 @@
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
+import edu.uci.ics.hyracks.api.context.IHyracksContext;
import edu.uci.ics.hyracks.api.dataflow.IDataReader;
import edu.uci.ics.hyracks.api.dataflow.IDataWriter;
-import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePullable;
import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
import edu.uci.ics.hyracks.api.dataflow.value.IComparator;
import edu.uci.ics.hyracks.api.dataflow.value.IComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
-import edu.uci.ics.hyracks.api.job.JobPlan;
import edu.uci.ics.hyracks.api.job.JobSpecification;
-import edu.uci.ics.hyracks.context.HyracksContext;
import edu.uci.ics.hyracks.coreops.base.IOpenableDataWriterOperator;
import edu.uci.ics.hyracks.coreops.group.IGroupAggregator;
import edu.uci.ics.hyracks.coreops.group.PreclusteredGroupOperator;
@@ -206,14 +205,8 @@
}
@Override
- public IOperatorNodePullable createPullRuntime(HyracksContext ctx, JobPlan plan, IOperatorEnvironment env,
- int partition, int nPartitions) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public IOperatorNodePushable createPushRuntime(HyracksContext ctx, JobPlan plan, IOperatorEnvironment env,
- int partition, int nPartitions) {
+ public IOperatorNodePushable createPushRuntime(IHyracksContext ctx, IOperatorEnvironment env,
+ IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
try {
if (this.comparatorFactory == null) {
String comparatorClassName = getJobConfMap().get(comparatorClassKey);
@@ -235,7 +228,8 @@
}
IOpenableDataWriterOperator op = new PreclusteredGroupOperator(new int[] { 0 },
new IComparator[] { comparatorFactory.createComparator() }, new ReducerAggregator(createReducer()));
- return new DeserializedOperatorNodePushable(ctx, op, plan, getActivityNodeId());
+ return new DeserializedOperatorNodePushable(ctx, op, recordDescProvider.getInputRecordDescriptor(
+ getOperatorId(), 0));
} catch (Exception e) {
throw new RuntimeException(e);
}
@@ -250,16 +244,6 @@
}
@Override
- public boolean supportsPullInterface() {
- return false;
- }
-
- @Override
- public boolean supportsPushInterface() {
- return true;
- }
-
- @Override
public RecordDescriptor getRecordDescriptor(JobConf conf) {
String outputKeyClassName = conf.get("mapred.output.key.class");
String outputValueClassName = conf.get("mapred.output.value.class");
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/join/GraceHashJoinOperatorDescriptor.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/join/GraceHashJoinOperatorDescriptor.java
index c38b9ce..c9605e6 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/join/GraceHashJoinOperatorDescriptor.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/join/GraceHashJoinOperatorDescriptor.java
@@ -21,24 +21,23 @@
import java.nio.channels.FileChannel;
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksContext;
import edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder;
import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
-import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePullable;
import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputer;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
-import edu.uci.ics.hyracks.api.job.JobPlan;
import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.comm.io.FrameTupleAccessor;
import edu.uci.ics.hyracks.comm.io.FrameTupleAppender;
import edu.uci.ics.hyracks.comm.io.FrameTuplePairComparator;
import edu.uci.ics.hyracks.comm.util.FrameUtils;
-import edu.uci.ics.hyracks.context.HyracksContext;
import edu.uci.ics.hyracks.coreops.FieldHashPartitionComputerFactory;
import edu.uci.ics.hyracks.coreops.RepartitionComputerFactory;
import edu.uci.ics.hyracks.coreops.base.AbstractActivityNode;
@@ -113,22 +112,15 @@
}
@Override
- public IOperatorNodePullable createPullRuntime(HyracksContext ctx, JobPlan plan, IOperatorEnvironment env,
- int partition, int nPartitions) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public IOperatorNodePushable createPushRuntime(final HyracksContext ctx, JobPlan plan,
- final IOperatorEnvironment env, int partition, final int nPartitions) {
- final RecordDescriptor rd0 = plan.getJobSpecification().getOperatorInputRecordDescriptor(getOperatorId(),
- operatorInputIndex);
+ public IOperatorNodePushable createPushRuntime(final IHyracksContext ctx, final IOperatorEnvironment env,
+ final IRecordDescriptorProvider recordDescProvider, int partition, final int nPartitions) {
final IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
for (int i = 0; i < comparatorFactories.length; ++i) {
comparators[i] = comparatorFactories[i].createBinaryComparator();
}
IOperatorNodePushable op = new IOperatorNodePushable() {
- private final FrameTupleAccessor accessor0 = new FrameTupleAccessor(ctx, rd0);
+ private final FrameTupleAccessor accessor0 = new FrameTupleAccessor(ctx,
+ recordDescProvider.getInputRecordDescriptor(getOperatorId(), operatorInputIndex));
ITuplePartitionComputer hpc = new FieldHashPartitionComputerFactory(keys, hashFunctionFactories)
.createPartitioner();
@@ -140,7 +132,7 @@
private final int numPartitions = (int) Math.ceil(Math.sqrt(inputsize0 * factor / nPartitions));
@Override
- public void setFrameWriter(int index, IFrameWriter writer) {
+ public void setFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
throw new IllegalArgumentException();
}
@@ -229,39 +221,20 @@
public IOperatorDescriptor getOwner() {
return GraceHashJoinOperatorDescriptor.this;
}
-
- @Override
- public boolean supportsPullInterface() {
- return false;
- }
-
- @Override
- public boolean supportsPushInterface() {
- return true;
- }
-
}
private class JoinActivityNode extends AbstractActivityNode {
private static final long serialVersionUID = 1L;
@Override
- public IOperatorNodePullable createPullRuntime(HyracksContext ctx, JobPlan plan, IOperatorEnvironment env,
- int partition, int nPartitions) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public IOperatorNodePushable createPushRuntime(final HyracksContext ctx, JobPlan plan,
- final IOperatorEnvironment env, int partition, int nPartitions) {
- final RecordDescriptor rd0 = plan.getJobSpecification()
- .getOperatorInputRecordDescriptor(getOperatorId(), 0);
- final RecordDescriptor rd1 = plan.getJobSpecification()
- .getOperatorInputRecordDescriptor(getOperatorId(), 1);
+ public IOperatorNodePushable createPushRuntime(final IHyracksContext ctx, final IOperatorEnvironment env,
+ final IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
final IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
for (int i = 0; i < comparatorFactories.length; ++i) {
comparators[i] = comparatorFactories[i].createBinaryComparator();
}
+ final RecordDescriptor rd0 = recordDescProvider.getInputRecordDescriptor(getOperatorId(), 0);
+ final RecordDescriptor rd1 = recordDescProvider.getInputRecordDescriptor(getOperatorId(), 1);
IOperatorNodePushable op = new IOperatorNodePushable() {
private InMemoryHashJoin joiner;
@@ -357,7 +330,7 @@
}
@Override
- public void setFrameWriter(int index, IFrameWriter writer) {
+ public void setFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
if (index != 0) {
throw new IllegalStateException();
}
@@ -371,15 +344,5 @@
public IOperatorDescriptor getOwner() {
return GraceHashJoinOperatorDescriptor.this;
}
-
- @Override
- public boolean supportsPullInterface() {
- return false;
- }
-
- @Override
- public boolean supportsPushInterface() {
- return true;
- }
}
}
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/join/HybridHashJoinOperatorDescriptor.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/join/HybridHashJoinOperatorDescriptor.java
index 1462f52..82f4506 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/join/HybridHashJoinOperatorDescriptor.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/join/HybridHashJoinOperatorDescriptor.java
@@ -21,25 +21,24 @@
import java.nio.channels.FileChannel;
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksContext;
import edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder;
import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
-import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePullable;
import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputer;
import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
-import edu.uci.ics.hyracks.api.job.JobPlan;
import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.comm.io.FrameTupleAccessor;
import edu.uci.ics.hyracks.comm.io.FrameTupleAppender;
import edu.uci.ics.hyracks.comm.io.FrameTuplePairComparator;
import edu.uci.ics.hyracks.comm.util.FrameUtils;
-import edu.uci.ics.hyracks.context.HyracksContext;
import edu.uci.ics.hyracks.coreops.FieldHashPartitionComputerFactory;
import edu.uci.ics.hyracks.coreops.RepartitionComputerFactory;
import edu.uci.ics.hyracks.coreops.base.AbstractActivityNode;
@@ -124,18 +123,10 @@
}
@Override
- public IOperatorNodePullable createPullRuntime(HyracksContext ctx, JobPlan plan, IOperatorEnvironment env,
- int partition, int nPartitions) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public IOperatorNodePushable createPushRuntime(final HyracksContext ctx, JobPlan plan,
- final IOperatorEnvironment env, int partition, final int nPartitions) {
- final RecordDescriptor rd0 = plan.getJobSpecification()
- .getOperatorInputRecordDescriptor(getOperatorId(), 0);
- final RecordDescriptor rd1 = plan.getJobSpecification()
- .getOperatorInputRecordDescriptor(getOperatorId(), 1);
+ public IOperatorNodePushable createPushRuntime(final IHyracksContext ctx, final IOperatorEnvironment env,
+ IRecordDescriptorProvider recordDescProvider, int partition, final int nPartitions) {
+ final RecordDescriptor rd0 = recordDescProvider.getInputRecordDescriptor(getOperatorId(), 0);
+ final RecordDescriptor rd1 = recordDescProvider.getInputRecordDescriptor(getOperatorId(), 1);
final IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
for (int i = 0; i < comparatorFactories.length; ++i) {
comparators[i] = comparatorFactories[i].createBinaryComparator();
@@ -156,7 +147,7 @@
private int B;
@Override
- public void setFrameWriter(int index, IFrameWriter writer) {
+ public void setFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
throw new IllegalArgumentException();
}
@@ -330,17 +321,6 @@
public IOperatorDescriptor getOwner() {
return HybridHashJoinOperatorDescriptor.this;
}
-
- @Override
- public boolean supportsPullInterface() {
- return false;
- }
-
- @Override
- public boolean supportsPushInterface() {
- return true;
- }
-
}
private class PartitionAndJoinActivityNode extends AbstractActivityNode {
@@ -353,18 +333,10 @@
}
@Override
- public IOperatorNodePullable createPullRuntime(HyracksContext ctx, JobPlan plan, IOperatorEnvironment env,
- int partition, int nPartitions) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public IOperatorNodePushable createPushRuntime(final HyracksContext ctx, JobPlan plan,
- final IOperatorEnvironment env, int partition, final int nPartitions) {
- final RecordDescriptor rd0 = plan.getJobSpecification()
- .getOperatorInputRecordDescriptor(getOperatorId(), 0);
- final RecordDescriptor rd1 = plan.getJobSpecification()
- .getOperatorInputRecordDescriptor(getOperatorId(), 1);
+ public IOperatorNodePushable createPushRuntime(final IHyracksContext ctx, final IOperatorEnvironment env,
+ IRecordDescriptorProvider recordDescProvider, int partition, final int nPartitions) {
+ final RecordDescriptor rd0 = recordDescProvider.getInputRecordDescriptor(getOperatorId(), 0);
+ final RecordDescriptor rd1 = recordDescProvider.getInputRecordDescriptor(getOperatorId(), 1);
final IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
for (int i = 0; i < comparatorFactories.length; ++i) {
comparators[i] = comparatorFactories[i].createBinaryComparator();
@@ -575,7 +547,7 @@
}
@Override
- public void setFrameWriter(int index, IFrameWriter writer) {
+ public void setFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
if (index != 0) {
throw new IllegalStateException();
}
@@ -589,15 +561,5 @@
public IOperatorDescriptor getOwner() {
return HybridHashJoinOperatorDescriptor.this;
}
-
- @Override
- public boolean supportsPullInterface() {
- return false;
- }
-
- @Override
- public boolean supportsPushInterface() {
- return true;
- }
}
}
\ No newline at end of file
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/join/InMemoryHashJoin.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/join/InMemoryHashJoin.java
index 0962895..ab896fa 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/join/InMemoryHashJoin.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/join/InMemoryHashJoin.java
@@ -20,12 +20,12 @@
import java.util.List;
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksContext;
import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputer;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.comm.io.FrameTupleAccessor;
import edu.uci.ics.hyracks.comm.io.FrameTupleAppender;
import edu.uci.ics.hyracks.comm.io.FrameTuplePairComparator;
-import edu.uci.ics.hyracks.context.HyracksContext;
public class InMemoryHashJoin {
private final Link[] table;
@@ -38,7 +38,7 @@
private final FrameTuplePairComparator tpComparator;
private final ByteBuffer outBuffer;
- public InMemoryHashJoin(HyracksContext ctx, int tableSize, FrameTupleAccessor accessor0,
+ public InMemoryHashJoin(IHyracksContext ctx, int tableSize, FrameTupleAccessor accessor0,
ITuplePartitionComputer tpc0, FrameTupleAccessor accessor1, ITuplePartitionComputer tpc1,
FrameTuplePairComparator comparator) {
table = new Link[tableSize];
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/join/InMemoryHashJoinOperatorDescriptor.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/join/InMemoryHashJoinOperatorDescriptor.java
index 51fc7c1..cc1cdb7 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/join/InMemoryHashJoinOperatorDescriptor.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/join/InMemoryHashJoinOperatorDescriptor.java
@@ -17,23 +17,22 @@
import java.nio.ByteBuffer;
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksContext;
import edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder;
import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
-import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePullable;
import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputer;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
-import edu.uci.ics.hyracks.api.job.JobPlan;
import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.comm.io.FrameTupleAccessor;
import edu.uci.ics.hyracks.comm.io.FrameTuplePairComparator;
import edu.uci.ics.hyracks.comm.util.FrameUtils;
-import edu.uci.ics.hyracks.context.HyracksContext;
import edu.uci.ics.hyracks.coreops.FieldHashPartitionComputerFactory;
import edu.uci.ics.hyracks.coreops.base.AbstractActivityNode;
import edu.uci.ics.hyracks.coreops.base.AbstractOperatorDescriptor;
@@ -79,18 +78,10 @@
private static final long serialVersionUID = 1L;
@Override
- public IOperatorNodePullable createPullRuntime(HyracksContext ctx, JobPlan plan, IOperatorEnvironment env,
- int partition, int nPartitions) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public IOperatorNodePushable createPushRuntime(final HyracksContext ctx, JobPlan plan,
- final IOperatorEnvironment env, int partition, int nPartitions) {
- final RecordDescriptor rd0 = plan.getJobSpecification()
- .getOperatorInputRecordDescriptor(getOperatorId(), 0);
- final RecordDescriptor rd1 = plan.getJobSpecification()
- .getOperatorInputRecordDescriptor(getOperatorId(), 1);
+ public IOperatorNodePushable createPushRuntime(final IHyracksContext ctx, final IOperatorEnvironment env,
+ IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
+ final RecordDescriptor rd0 = recordDescProvider.getInputRecordDescriptor(getOperatorId(), 0);
+ final RecordDescriptor rd1 = recordDescProvider.getInputRecordDescriptor(getOperatorId(), 1);
final IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
for (int i = 0; i < comparatorFactories.length; ++i) {
comparators[i] = comparatorFactories[i].createBinaryComparator();
@@ -122,7 +113,7 @@
}
@Override
- public void setFrameWriter(int index, IFrameWriter writer) {
+ public void setFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
throw new IllegalArgumentException();
}
};
@@ -133,30 +124,14 @@
public IOperatorDescriptor getOwner() {
return InMemoryHashJoinOperatorDescriptor.this;
}
-
- @Override
- public boolean supportsPullInterface() {
- return false;
- }
-
- @Override
- public boolean supportsPushInterface() {
- return true;
- }
}
private class HashProbeActivityNode extends AbstractActivityNode {
private static final long serialVersionUID = 1L;
@Override
- public IOperatorNodePullable createPullRuntime(HyracksContext ctx, JobPlan plan, IOperatorEnvironment env,
- int partition, int nPartitions) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public IOperatorNodePushable createPushRuntime(final HyracksContext ctx, JobPlan plan,
- final IOperatorEnvironment env, int partition, int nPartitions) {
+ public IOperatorNodePushable createPushRuntime(final IHyracksContext ctx, final IOperatorEnvironment env,
+ IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
IOperatorNodePushable op = new IOperatorNodePushable() {
private IFrameWriter writer;
private InMemoryHashJoin joiner;
@@ -180,7 +155,7 @@
}
@Override
- public void setFrameWriter(int index, IFrameWriter writer) {
+ public void setFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
if (index != 0) {
throw new IllegalStateException();
}
@@ -194,15 +169,5 @@
public IOperatorDescriptor getOwner() {
return InMemoryHashJoinOperatorDescriptor.this;
}
-
- @Override
- public boolean supportsPullInterface() {
- return false;
- }
-
- @Override
- public boolean supportsPushInterface() {
- return true;
- }
}
}
\ No newline at end of file
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/util/DeserializedOperatorNodePushable.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/util/DeserializedOperatorNodePushable.java
index 610cc51..58f6a9f 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/util/DeserializedOperatorNodePushable.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/util/DeserializedOperatorNodePushable.java
@@ -15,43 +15,33 @@
package edu.uci.ics.hyracks.coreops.util;
import java.nio.ByteBuffer;
-import java.util.List;
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
-import edu.uci.ics.hyracks.api.dataflow.ActivityNodeId;
+import edu.uci.ics.hyracks.api.context.IHyracksContext;
import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.api.job.JobPlan;
import edu.uci.ics.hyracks.comm.io.FrameDeserializer;
import edu.uci.ics.hyracks.comm.io.SerializingDataWriter;
-import edu.uci.ics.hyracks.context.HyracksContext;
import edu.uci.ics.hyracks.coreops.base.IOpenableDataWriterOperator;
public final class DeserializedOperatorNodePushable implements IOperatorNodePushable {
- private final HyracksContext ctx;
+ private final IHyracksContext ctx;
private final IOpenableDataWriterOperator delegate;
- private final JobPlan plan;
-
- private final ActivityNodeId aid;
-
private final FrameDeserializer deserializer;
- public DeserializedOperatorNodePushable(HyracksContext ctx, IOpenableDataWriterOperator delegate, JobPlan plan,
- ActivityNodeId aid) {
+ public DeserializedOperatorNodePushable(IHyracksContext ctx, IOpenableDataWriterOperator delegate,
+ RecordDescriptor inRecordDesc) {
this.ctx = ctx;
this.delegate = delegate;
- this.plan = plan;
- this.aid = aid;
- List<Integer> inList = plan.getTaskInputMap().get(aid);
- deserializer = inList == null ? null : new FrameDeserializer(ctx, plan.getTaskInputRecordDescriptor(aid, 0));
+ deserializer = inRecordDesc == null ? null : new FrameDeserializer(ctx, inRecordDesc);
}
@Override
- public void setFrameWriter(int index, IFrameWriter writer) {
- delegate.setDataWriter(index, new SerializingDataWriter(ctx, plan.getTaskOutputRecordDescriptor(aid, index),
- writer));
+ public void setFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
+ delegate.setDataWriter(index, new SerializingDataWriter(ctx, recordDesc, writer));
}
@Override
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/util/ReferencedPriorityQueue.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/util/ReferencedPriorityQueue.java
index f64320b..5f7dc54 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/util/ReferencedPriorityQueue.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/util/ReferencedPriorityQueue.java
@@ -18,12 +18,12 @@
import java.util.BitSet;
import java.util.Comparator;
+import edu.uci.ics.hyracks.api.context.IHyracksContext;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.comm.io.FrameTupleAccessor;
-import edu.uci.ics.hyracks.context.HyracksContext;
public class ReferencedPriorityQueue {
- private final HyracksContext ctx;
+ private final IHyracksContext ctx;
private final RecordDescriptor recordDescriptor;
private final ReferenceEntry entries[];
private final int size;
@@ -32,7 +32,7 @@
private final Comparator<ReferenceEntry> comparator;
- public ReferencedPriorityQueue(HyracksContext ctx, RecordDescriptor recordDescriptor, int initSize,
+ public ReferencedPriorityQueue(IHyracksContext ctx, RecordDescriptor recordDescriptor, int initSize,
Comparator<ReferenceEntry> comparator) throws IOException {
this.ctx = ctx;
this.recordDescriptor = recordDescriptor;
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/resources/ResourceManager.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/resources/ResourceManager.java
index 38be3ff..eff017a 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/resources/ResourceManager.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/resources/ResourceManager.java
@@ -18,19 +18,22 @@
import java.io.IOException;
import java.nio.ByteBuffer;
-import edu.uci.ics.hyracks.context.HyracksContext;
+import edu.uci.ics.hyracks.api.context.IHyracksContext;
+import edu.uci.ics.hyracks.api.resources.IResourceManager;
-public final class ResourceManager {
- private final HyracksContext ctx;
+public final class ResourceManager implements IResourceManager {
+ private final IHyracksContext ctx;
- public ResourceManager(HyracksContext ctx) {
+ public ResourceManager(IHyracksContext ctx) {
this.ctx = ctx;
}
+ @Override
public ByteBuffer allocateFrame() {
return ByteBuffer.allocate(ctx.getFrameSize());
}
+ @Override
public File createFile(String prefix, String suffix) throws IOException {
return File.createTempFile(prefix, suffix);
}
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/runtime/OperatorRunnable.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/runtime/OperatorRunnable.java
index 149f867..6ea3bf1 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/runtime/OperatorRunnable.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/runtime/OperatorRunnable.java
@@ -18,8 +18,9 @@
import edu.uci.ics.hyracks.api.comm.IFrameReader;
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksContext;
import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
-import edu.uci.ics.hyracks.context.HyracksContext;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
public class OperatorRunnable implements Runnable {
private IOperatorNodePushable opNode;
@@ -27,13 +28,13 @@
private ByteBuffer buffer;
private volatile boolean abort;
- public OperatorRunnable(HyracksContext ctx, IOperatorNodePushable opNode) {
+ public OperatorRunnable(IHyracksContext ctx, IOperatorNodePushable opNode) {
this.opNode = opNode;
buffer = ctx.getResourceManager().allocateFrame();
}
- public void setFrameWriter(int index, IFrameWriter writer) {
- opNode.setFrameWriter(index, writer);
+ public void setFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
+ opNode.setFrameWriter(index, writer, recordDesc);
}
public void setFrameReader(IFrameReader reader) {
diff --git a/hyracks-core/src/test/java/edu/uci/ics/hyracks/tests/comm/SerializationDeserializationTest.java b/hyracks-core/src/test/java/edu/uci/ics/hyracks/tests/comm/SerializationDeserializationTest.java
index 1467de5..92f53af 100644
--- a/hyracks-core/src/test/java/edu/uci/ics/hyracks/tests/comm/SerializationDeserializationTest.java
+++ b/hyracks-core/src/test/java/edu/uci/ics/hyracks/tests/comm/SerializationDeserializationTest.java
@@ -24,6 +24,7 @@
import edu.uci.ics.hyracks.api.comm.IFrameReader;
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksContext;
import edu.uci.ics.hyracks.api.dataflow.IDataWriter;
import edu.uci.ics.hyracks.api.dataflow.IOpenableDataReader;
import edu.uci.ics.hyracks.api.dataflow.IOpenableDataWriter;
@@ -40,7 +41,7 @@
private static final String DBLP_FILE = "data/dblp.txt";
private static class SerDeserRunner {
- private final HyracksContext ctx;
+ private final IHyracksContext ctx;
private static final int FRAME_SIZE = 32768;
private RecordDescriptor rDes;
private List<ByteBuffer> buffers;