Added fail() call to IFrameWriter
git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_dev_next@549 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/channels/IInputChannelMonitor.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/channels/IInputChannelMonitor.java
index dd91f28..065f4c5 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/channels/IInputChannelMonitor.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/channels/IInputChannelMonitor.java
@@ -15,6 +15,8 @@
package edu.uci.ics.hyracks.api.channels;
public interface IInputChannelMonitor {
+ public void notifyFailure(IInputChannel channel);
+
public void notifyDataAvailability(IInputChannel channel, int nFrames);
public void notifyEndOfStream(IInputChannel channel);
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/IFrameWriter.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/IFrameWriter.java
index 0ded474..a85bd7e 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/IFrameWriter.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/IFrameWriter.java
@@ -18,12 +18,76 @@
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+/**
+ * {@link IFrameWriter} is the interface implemented by a stream consumer. An
+ * {@link IFrameWriter} could be in one of the following states:
+ * <ul>
+ * <li>INITIAL</li>
+ * <li>OPENED</li>
+ * <li>CLOSED</li>
+ * <li>FAILED</li>
+ * </ul>
+ * A producer follows the following protocol when using an {@link IFrameWriter}.
+ * Initially, the {@link IFrameWriter} is in the INITIAL state.
+ * The first valid call to an {@link IFrameWriter} is always the
+ * {@link IFrameWriter#open()}. This call provides the opportunity for the
+ * {@link IFrameWriter} implementation to allocate any resources for its
+ * processing. Once this call returns, the {@link IFrameWriter} is in the OPENED
+ * state. If an error occurs
+ * during the {@link IFrameWriter#open()} call, a {@link HyracksDataException}
+ * is thrown and it stays in the INITIAL state.
+ * While the {@link IFrameWriter} is in the OPENED state, the producer can call
+ * one of:
+ * <ul>
+ * <li> {@link IFrameWriter#close()} to give up any resources owned by the
+ * {@link IFrameWriter} and enter the CLOSED state.</li>
+ * <li> {@link IFrameWriter#nextFrame(ByteBuffer)} to provide data to the
+ * {@link IFrameWriter}. The call returns normally on success and the
+ * {@link IFrameWriter} remains in the OPENED state. On failure, the call throws
+ * a {@link HyracksDataException}, and the {@link IFrameWriter} enters the ERROR
+ * state.</li>
+ * <li> {@link IFrameWriter#fail()} to indicate that stream is to be aborted. The
+ * {@link IFrameWriter} enters the FAILED state.</li>
+ * </ul>
+ * In the FAILED state, the only call allowed is the
+ * {@link IFrameWriter#close()} to move the {@link IFrameWriter} into the CLOSED
+ * state and give up all resources.
+ * No calls are allowed when the {@link IFrameWriter} is in the CLOSED state.
+ *
+ * Note: If the call to {@link IFrameWriter#open()} failed, the
+ * {@link IFrameWriter#close()} is not called by the producer. So an exceptional
+ * return from the {@link IFrameWriter#open()} call must clean up all partially
+ * allocated resources.
+ *
+ * @author vinayakb
+ */
public interface IFrameWriter {
+ /**
+ * First call to allocate any resources.
+ */
public void open() throws HyracksDataException;
+ /**
+ * Provide data to the stream of this {@link IFrameWriter}.
+ *
+ * @param buffer
+ * - Buffer containing data.
+ * @throws HyracksDataException
+ */
public void nextFrame(ByteBuffer buffer) throws HyracksDataException;
- public void flush() throws HyracksDataException;
+ /**
+ * Indicate that a failure was encountered and the current stream is to be
+ * aborted.
+ *
+ * @throws HyracksDataException
+ */
+ public void fail() throws HyracksDataException;
+ /**
+ * Close this {@link IFrameWriter} and give up all resources.
+ *
+ * @throws HyracksDataException
+ */
public void close() throws HyracksDataException;
}
\ No newline at end of file
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IDataWriter.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IDataWriter.java
index 128576c..4fb6f18 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IDataWriter.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IDataWriter.java
@@ -26,12 +26,20 @@
* Pushes data to the acceptor.
*
* @param data
- * - Data pushed to the acceptor. <code>null</code> indicates the end of stream.
+ * - Data pushed to the acceptor. <code>null</code> indicates the
+ * end of stream.
* @throws HyracksDataException
*/
public void writeData(T data) throws HyracksDataException;
/**
+ * Indicates that the stream has failed.
+ *
+ * @throws HyracksDataException
+ */
+ public void fail() throws HyracksDataException;
+
+ /**
* Closes this writer.
*
* @throws Exception
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobSpecification.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobSpecification.java
index e3d28ea..ef44033 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobSpecification.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobSpecification.java
@@ -73,6 +73,7 @@
userConstraints = new HashSet<Constraint>();
operatorIdCounter = 0;
connectorIdCounter = 0;
+ maxAttempts = 5;
}
public OperatorDescriptorId createOperatorDescriptorId() {
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/ActivityClusterPlanner.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/ActivityClusterPlanner.java
index e5d9f64..a3ca82d 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/ActivityClusterPlanner.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/ActivityClusterPlanner.java
@@ -34,7 +34,7 @@
import edu.uci.ics.hyracks.api.dataflow.TaskId;
import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicy;
import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicyAssignmentPolicy;
-import edu.uci.ics.hyracks.api.dataflow.connectors.PipeliningConnectorPolicy;
+import edu.uci.ics.hyracks.api.dataflow.connectors.SendSideMaterializedReceiveSideMaterializedBlockingConnectorPolicy;
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
import edu.uci.ics.hyracks.api.job.JobActivityGraph;
import edu.uci.ics.hyracks.api.partitions.PartitionId;
@@ -275,7 +275,7 @@
if (cpap != null) {
return cpap.getConnectorPolicyAssignment(c, nProducers, nConsumers, fanouts);
}
- return new PipeliningConnectorPolicy();
+ return new SendSideMaterializedReceiveSideMaterializedBlockingConnectorPolicy();
}
private Map<ActivityId, ActivityPartitionDetails> computePartitionCounts(ActivityCluster ac)
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java
index e65f59e..6a73902 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java
@@ -73,8 +73,7 @@
private volatile boolean aborted;
- public Task(Joblet joblet, TaskAttemptId taskId, String displayName,
- Executor executor) {
+ public Task(Joblet joblet, TaskAttemptId taskId, String displayName, Executor executor) {
this.joblet = joblet;
this.taskAttemptId = taskId;
this.displayName = displayName;
@@ -82,13 +81,13 @@
fileFactory = new WorkspaceFileFactory(this, (IOManager) joblet.getIOManager());
deallocatableRegistry = new DefaultDeallocatableRegistry();
counterMap = new HashMap<String, Counter>();
-// this.inputGlobalVariables = inputGlobalVariables;
+ // this.inputGlobalVariables = inputGlobalVariables;
inputGlobalVariables = Collections.emptyMap();
outputVariables = new HashMap<MultipartName, Object>();
outputVariableDescriptorMap = new HashMap<MultipartName, WorkflowVariableDescriptor>();
-// for (WorkflowVariableDescriptor wvd : outputVariableDescriptors) {
-// outputVariableDescriptorMap.put(wvd.getName(), wvd);
-// }
+ // for (WorkflowVariableDescriptor wvd : outputVariableDescriptors) {
+ // outputVariableDescriptorMap.put(wvd.getName(), wvd);
+ // }
}
public void setTaskRuntime(IPartitionCollector[] collectors, IOperatorNodePushable operator) {
@@ -270,19 +269,19 @@
reader.open();
try {
writer.open();
- try {
- ByteBuffer buffer = allocateFrame();
- while (reader.nextFrame(buffer)) {
- if (aborted) {
- return;
- }
- buffer.flip();
- writer.nextFrame(buffer);
- buffer.compact();
+ ByteBuffer buffer = allocateFrame();
+ while (reader.nextFrame(buffer)) {
+ if (aborted) {
+ return;
}
- } finally {
- writer.close();
+ buffer.flip();
+ writer.nextFrame(buffer);
+ buffer.compact();
}
+ writer.close();
+ } catch (Exception e) {
+ writer.fail();
+ throw e;
} finally {
reader.close();
}
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkOutputChannel.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkOutputChannel.java
index 74542b3..31ce924 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkOutputChannel.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkOutputChannel.java
@@ -42,6 +42,8 @@
private boolean eosSent;
+ private boolean failed;
+
private ByteBuffer currentBuffer;
public NetworkOutputChannel(IHyracksRootContext ctx, int nBuffers) {
@@ -55,7 +57,7 @@
@Override
public synchronized boolean dispatchNetworkEvent() throws IOException {
- if (aborted) {
+ if (failed || aborted) {
eos = true;
return true;
} else if (key.isWritable()) {
@@ -154,15 +156,15 @@
}
@Override
- public void flush() throws HyracksDataException {
-
+ public void fail() throws HyracksDataException {
+ failed = true;
}
@Override
public synchronized void close() throws HyracksDataException {
eos = true;
key.interestOps(SelectionKey.OP_WRITE);
- key.selector().wakeup();
+ key.selector().wakeup();
}
@Override
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializedPartitionInputChannel.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializedPartitionInputChannel.java
index 971f2b6..e579da7 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializedPartitionInputChannel.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializedPartitionInputChannel.java
@@ -120,7 +120,7 @@
}
@Override
- public void flush() throws HyracksDataException {
+ public void fail() throws HyracksDataException {
}
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializedPartitionWriter.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializedPartitionWriter.java
index db2e405..b989805 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializedPartitionWriter.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializedPartitionWriter.java
@@ -16,6 +16,8 @@
import java.nio.ByteBuffer;
import java.util.concurrent.Executor;
+import java.util.logging.Level;
+import java.util.logging.Logger;
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
import edu.uci.ics.hyracks.api.context.IHyracksRootContext;
@@ -29,6 +31,8 @@
import edu.uci.ics.hyracks.control.nc.io.IOManager;
public class MaterializedPartitionWriter implements IFrameWriter {
+ private static final Logger LOGGER = Logger.getLogger(MaterializedPartitionWriter.class.getName());
+
protected final IHyracksRootContext ctx;
protected final PartitionManager manager;
@@ -45,6 +49,8 @@
private long size;
+ private boolean failed;
+
public MaterializedPartitionWriter(IHyracksRootContext ctx, PartitionManager manager, PartitionId pid,
TaskAttemptId taId, Executor executor) {
this.ctx = ctx;
@@ -56,10 +62,14 @@
@Override
public void open() throws HyracksDataException {
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("open(" + pid + " by " + taId);
+ }
fRef = manager.getFileFactory().createUnmanagedWorkspaceFile(pid.toString());
handle = ctx.getIOManager().open(fRef, IIOManager.FileReadWriteMode.READ_WRITE,
IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC);
size = 0;
+ failed = false;
}
@Override
@@ -68,14 +78,20 @@
}
@Override
- public void flush() throws HyracksDataException {
+ public void fail() throws HyracksDataException {
+ failed = true;
}
@Override
public void close() throws HyracksDataException {
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("close(" + pid + " by " + taId);
+ }
ctx.getIOManager().close(handle);
- manager.registerPartition(pid, taId,
- new MaterializedPartition(ctx, fRef, executor, (IOManager) ctx.getIOManager()),
- PartitionState.COMMITTED);
+ if (!failed) {
+ manager.registerPartition(pid, taId,
+ new MaterializedPartition(ctx, fRef, executor, (IOManager) ctx.getIOManager()),
+ PartitionState.COMMITTED);
+ }
}
}
\ No newline at end of file
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/PipelinedPartition.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/PipelinedPartition.java
index 1a9fb53..9403736 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/PipelinedPartition.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/PipelinedPartition.java
@@ -32,6 +32,8 @@
private IFrameWriter delegate;
+ private boolean failed;
+
public PipelinedPartition(PartitionManager manager, PartitionId pid, TaskAttemptId taId) {
this.manager = manager;
this.pid = pid;
@@ -57,6 +59,7 @@
@Override
public synchronized void open() throws HyracksDataException {
manager.registerPartition(pid, taId, this, PartitionState.STARTED);
+ failed = false;
while (delegate == null) {
try {
wait();
@@ -73,13 +76,16 @@
}
@Override
- public void flush() throws HyracksDataException {
- delegate.flush();
+ public void fail() throws HyracksDataException {
+ failed = true;
+ delegate.fail();
}
@Override
public void close() throws HyracksDataException {
- manager.updatePartitionState(pid, taId, this, PartitionState.COMMITTED);
+ if (!failed) {
+ manager.updatePartitionState(pid, taId, this, PartitionState.COMMITTED);
+ }
delegate.close();
}
}
\ No newline at end of file
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/ReceiveSideMaterializingCollector.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/ReceiveSideMaterializingCollector.java
index c108bef..df76604 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/ReceiveSideMaterializingCollector.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/ReceiveSideMaterializingCollector.java
@@ -27,6 +27,7 @@
import edu.uci.ics.hyracks.api.context.IHyracksRootContext;
import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.api.partitions.PartitionId;
@@ -86,6 +87,8 @@
private boolean eos;
+ private boolean failed;
+
public PartitionWriter(PartitionChannel pc) {
this.pc = pc;
nAvailableFrames = 0;
@@ -93,6 +96,12 @@
}
@Override
+ public synchronized void notifyFailure(IInputChannel channel) {
+ failed = true;
+ notifyAll();
+ }
+
+ @Override
public synchronized void notifyDataAvailability(IInputChannel channel, int nFrames) {
nAvailableFrames += nFrames;
notifyAll();
@@ -121,6 +130,8 @@
channel.recycleBuffer(buffer);
} else if (eos) {
break;
+ } else if (failed) {
+ throw new HyracksDataException("Failure occurred on input");
} else {
try {
wait();
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/profiling/ConnectorSenderProfilingFrameWriter.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/profiling/ConnectorSenderProfilingFrameWriter.java
index c64e187..5d592be 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/profiling/ConnectorSenderProfilingFrameWriter.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/profiling/ConnectorSenderProfilingFrameWriter.java
@@ -58,7 +58,7 @@
}
@Override
- public void flush() throws HyracksDataException {
- writer.flush();
+ public void fail() throws HyracksDataException {
+ writer.fail();
}
}
\ No newline at end of file
diff --git a/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/io/FrameDeserializingDataWriter.java b/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/io/FrameDeserializingDataWriter.java
index 96a8102..2ec4b47 100644
--- a/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/io/FrameDeserializingDataWriter.java
+++ b/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/io/FrameDeserializingDataWriter.java
@@ -52,6 +52,6 @@
}
@Override
- public void flush() {
+ public void fail() {
}
}
\ No newline at end of file
diff --git a/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/io/SerializingDataWriter.java b/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/io/SerializingDataWriter.java
index 7c7979b..e8f43cd 100644
--- a/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/io/SerializingDataWriter.java
+++ b/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/io/SerializingDataWriter.java
@@ -98,4 +98,9 @@
buffer.limit(buffer.capacity());
frameWriter.nextFrame(buffer);
}
+
+ @Override
+ public void fail() throws HyracksDataException {
+ frameWriter.fail();
+ }
}
\ No newline at end of file
diff --git a/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/io/RunFileWriter.java b/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/io/RunFileWriter.java
index 8d1ec1d..6c5a957 100644
--- a/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/io/RunFileWriter.java
+++ b/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/io/RunFileWriter.java
@@ -25,6 +25,7 @@
public class RunFileWriter implements IFrameWriter {
private final FileReference file;
private final IIOManager ioManager;
+ private boolean failed;
private FileHandle handle;
private long size;
@@ -39,6 +40,13 @@
handle = ioManager.open(file, IIOManager.FileReadWriteMode.READ_WRITE,
IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC);
size = 0;
+ failed = false;
+ }
+
+ @Override
+ public void fail() throws HyracksDataException {
+ ioManager.close(handle);
+ failed = true;
}
@Override
@@ -48,7 +56,9 @@
@Override
public void close() throws HyracksDataException {
- ioManager.close(handle);
+ if (!failed) {
+ ioManager.close(handle);
+ }
}
public FileReference getFileReference() {
@@ -59,11 +69,10 @@
return size;
}
- @Override
- public void flush() throws HyracksDataException {
- }
-
- public RunFileReader createReader() {
+ public RunFileReader createReader() throws HyracksDataException {
+ if (failed) {
+ throw new HyracksDataException("createReader() called on a failed RunFileWriter");
+ }
return new RunFileReader(file, ioManager, size);
}
}
\ No newline at end of file
diff --git a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopMapperOperatorDescriptor.java b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopMapperOperatorDescriptor.java
index 14c034a..7aa7419 100644
--- a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopMapperOperatorDescriptor.java
+++ b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopMapperOperatorDescriptor.java
@@ -56,7 +56,7 @@
protected OutputCollector<K2, V2> output;
protected Reporter reporter;
protected Object mapper;
- //protected Mapper<K1, V1, K2, V2> mapper;
+ // protected Mapper<K1, V1, K2, V2> mapper;
protected int partition;
protected JobConf conf;
protected IOpenableDataWriter<Object[]> writer;
@@ -96,7 +96,8 @@
if (!conf.getUseNewMapper()) {
((org.apache.hadoop.mapred.Mapper) mapper).close();
} else {
- // do nothing. closing the mapper is handled internally by run method on context.
+ // do nothing. closing the mapper is handled internally by
+ // run method on context.
}
} catch (IOException ioe) {
throw new HyracksDataException(ioe);
@@ -118,6 +119,11 @@
}
@Override
+ public void fail() throws HyracksDataException {
+ writer.fail();
+ }
+
+ @Override
public void open() throws HyracksDataException {
initializeMapper();
writer.open();
@@ -172,16 +178,18 @@
} else if (splitRead instanceof org.apache.hadoop.mapreduce.lib.input.FileSplit) {
conf.set("map.input.file", ((org.apache.hadoop.mapreduce.lib.input.FileSplit) splitRead).getPath()
.toString());
- conf.setLong("map.input.start", ((org.apache.hadoop.mapreduce.lib.input.FileSplit) splitRead)
- .getStart());
- conf.setLong("map.input.length", ((org.apache.hadoop.mapreduce.lib.input.FileSplit) splitRead)
- .getLength());
+ conf.setLong("map.input.start",
+ ((org.apache.hadoop.mapreduce.lib.input.FileSplit) splitRead).getStart());
+ conf.setLong("map.input.length",
+ ((org.apache.hadoop.mapreduce.lib.input.FileSplit) splitRead).getLength());
}
} catch (Exception e) {
e.printStackTrace();
- // we do not throw the exception here as we are setting additional parameters that may not be
- // required by the mapper. If they are indeed required, the configure method invoked on the mapper
- // shall report an exception because of the missing parameters.
+ // we do not throw the exception here as we are setting
+ // additional parameters that may not be
+ // required by the mapper. If they are indeed required, the
+ // configure method invoked on the mapper
+ // shall report an exception because of the missing parameters.
}
}
@@ -223,7 +231,9 @@
data[1] = value;
writer.writeData(data);
}
- };;;
+ };
+ ;
+ ;
OutputCommitter outputCommitter = new org.apache.hadoop.mapreduce.lib.output.NullOutputFormat()
.getOutputCommitter(new TaskAttemptContext(conf, new TaskAttemptID()));
@@ -245,7 +255,9 @@
public Counter getCounter(Enum<?> arg0) {
return null;
}
- };;;
+ };
+ ;
+ ;
context = new org.apache.hadoop.mapreduce.Mapper().new Context(conf, new TaskAttemptID(),
newReader, recordWriter, outputCommitter, statusReporter,
(org.apache.hadoop.mapreduce.InputSplit) inputSplit);
@@ -315,9 +327,9 @@
String mapOutputValueClassName = conf.getMapOutputValueClass().getName();
try {
if (hadoopClassFactory == null) {
- recordDescriptor = DatatypeHelper.createKeyValueRecordDescriptor((Class<? extends Writable>) Class
- .forName(mapOutputKeyClassName), (Class<? extends Writable>) Class
- .forName(mapOutputValueClassName));
+ recordDescriptor = DatatypeHelper.createKeyValueRecordDescriptor(
+ (Class<? extends Writable>) Class.forName(mapOutputKeyClassName),
+ (Class<? extends Writable>) Class.forName(mapOutputValueClassName));
} else {
recordDescriptor = DatatypeHelper.createKeyValueRecordDescriptor(
(Class<? extends Writable>) hadoopClassFactory.loadClass(mapOutputKeyClassName),
@@ -361,8 +373,8 @@
} else {
Class inputFormatClass = conf.getInputFormat().getClass();
InputFormat inputFormat = (InputFormat) ReflectionUtils.newInstance(inputFormatClass, conf);
- return inputFormat.getRecordReader((org.apache.hadoop.mapred.InputSplit) inputSplit, conf, super
- .createReporter());
+ return inputFormat.getRecordReader((org.apache.hadoop.mapred.InputSplit) inputSplit, conf,
+ super.createReporter());
}
}
@@ -400,8 +412,8 @@
}
return createSelfReadingMapper(ctx, recordDescriptor, partition);
} else {
- return new DeserializedOperatorNodePushable(ctx, new MapperOperator(partition), recordDescProvider
- .getInputRecordDescriptor(this.odId, 0));
+ return new DeserializedOperatorNodePushable(ctx, new MapperOperator(partition),
+ recordDescProvider.getInputRecordDescriptor(this.odId, 0));
}
} catch (Exception e) {
throw new HyracksDataException(e);
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/collectors/NonDeterministicPartitionCollector.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/collectors/NonDeterministicPartitionCollector.java
index 5ff528e..33819a8 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/collectors/NonDeterministicPartitionCollector.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/collectors/NonDeterministicPartitionCollector.java
@@ -45,6 +45,8 @@
private final BitSet eosSenders;
+ private final BitSet failSenders;
+
private BitSet closedSenders;
private int lastReadSender;
@@ -57,6 +59,7 @@
reader = new FrameReader();
channels = new IInputChannel[nSenderPartitions];
eosSenders = new BitSet(nSenderPartitions);
+ failSenders = new BitSet(nSenderPartitions);
closedSenders = new BitSet(nSenderPartitions);
closedSenders.or(expectedPartitions);
closedSenders.flip(0, nSenderPartitions);
@@ -128,6 +131,9 @@
}
return;
}
+ if (!failSenders.isEmpty()) {
+ throw new HyracksDataException("Failure occurred on input");
+ }
for (int i = eosSenders.nextSetBit(0); i >= 0; i = eosSenders.nextSetBit(i)) {
channels[i].close();
eosSenders.clear(i);
@@ -161,6 +167,16 @@
}
@Override
+ public void notifyFailure(IInputChannel channel) {
+ synchronized (NonDeterministicPartitionCollector.this) {
+ PartitionId pid = (PartitionId) channel.getAttachment();
+ int senderIndex = pid.getSenderIndex();
+ failSenders.set(senderIndex);
+ NonDeterministicPartitionCollector.this.notifyAll();
+ }
+ }
+
+ @Override
public void notifyDataAvailability(IInputChannel channel, int nFrames) {
synchronized (NonDeterministicPartitionCollector.this) {
PartitionId pid = (PartitionId) channel.getAttachment();
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/collectors/SortMergePartitionCollector.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/collectors/SortMergePartitionCollector.java
index 8faee7b..96b2a69 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/collectors/SortMergePartitionCollector.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/collectors/SortMergePartitionCollector.java
@@ -178,10 +178,13 @@
private boolean eos;
+ private boolean failed;
+
public InputChannelFrameReader(IInputChannel channel) {
this.channel = channel;
availableFrames = 0;
eos = false;
+ failed = false;
}
@Override
@@ -191,14 +194,17 @@
@Override
public boolean nextFrame(ByteBuffer buffer) throws HyracksDataException {
synchronized (this) {
- while (!eos && availableFrames <= 0) {
+ while (!failed && !eos && availableFrames <= 0) {
try {
wait();
} catch (InterruptedException e) {
throw new HyracksDataException(e);
}
}
- if (availableFrames <=0 && eos) {
+ if (failed) {
+ throw new HyracksDataException("Failure occurred on input");
+ }
+ if (availableFrames <= 0 && eos) {
return false;
}
--availableFrames;
@@ -215,6 +221,12 @@
}
@Override
+ public synchronized void notifyFailure(IInputChannel channel) {
+ failed = true;
+ notifyAll();
+ }
+
+ @Override
public synchronized void notifyDataAvailability(IInputChannel channel, int nFrames) {
availableFrames += nFrames;
notifyAll();
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/MToNReplicatingConnectorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/MToNReplicatingConnectorDescriptor.java
index 5297a99..ba66ebc 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/MToNReplicatingConnectorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/MToNReplicatingConnectorDescriptor.java
@@ -55,6 +55,13 @@
}
@Override
+ public void fail() throws HyracksDataException {
+ for (int i = 0; i < epWriters.length; ++i) {
+ epWriters[i].fail();
+ }
+ }
+
+ @Override
public void close() throws HyracksDataException {
for (int i = 0; i < epWriters.length; ++i) {
epWriters[i].close();
@@ -67,13 +74,6 @@
epWriters[i].open();
}
}
-
- @Override
- public void flush() throws HyracksDataException {
- for (int i = 0; i < epWriters.length; ++i) {
- epWriters[i].flush();
- }
- }
};
}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/PartitionDataWriter.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/PartitionDataWriter.java
index 7054535..6b3f3c0 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/PartitionDataWriter.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/PartitionDataWriter.java
@@ -94,16 +94,9 @@
}
@Override
- public void flush() throws HyracksDataException {
+ public void fail() throws HyracksDataException {
for (int i = 0; i < appenders.length; ++i) {
- FrameTupleAppender appender = appenders[i];
- if (appender.getTupleCount() > 0) {
- ByteBuffer buffer = appender.getBuffer();
- IFrameWriter frameWriter = pWriters[i];
- flushFrame(buffer, frameWriter);
- pWriters[i].flush();
- appender.reset(buffer, true);
- }
+ pWriters[i].fail();
}
}
}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/AbstractDeserializedFileScanOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/AbstractDeserializedFileScanOperatorDescriptor.java
index 03438d3..f6511b4 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/AbstractDeserializedFileScanOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/AbstractDeserializedFileScanOperatorDescriptor.java
@@ -64,28 +64,31 @@
public void open() throws HyracksDataException {
FileSplit split = splits[index];
RecordDescriptor desc = recordDescriptors[0];
+ IRecordReader reader;
try {
- IRecordReader reader = createRecordReader(split.getLocalFile().getFile(), desc);
- if (desc == null) {
- desc = recordDescriptors[0];
- }
- writer.open();
- try {
- while (true) {
- Object[] record = new Object[desc.getFields().length];
- if (!reader.read(record)) {
- break;
- }
- writer.writeData(record);
- }
- } finally {
- reader.close();
- writer.close();
- }
+ reader = createRecordReader(split.getLocalFile().getFile(), desc);
} catch (Exception e) {
throw new HyracksDataException(e);
}
-
+ if (desc == null) {
+ desc = recordDescriptors[0];
+ }
+ writer.open();
+ try {
+ while (true) {
+ Object[] record = new Object[desc.getFields().length];
+ if (!reader.read(record)) {
+ break;
+ }
+ writer.writeData(record);
+ }
+ } catch (Exception e) {
+ writer.fail();
+ throw new HyracksDataException(e);
+ } finally {
+ reader.close();
+ writer.close();
+ }
}
@Override
@@ -97,6 +100,11 @@
public void writeData(Object[] data) throws HyracksDataException {
throw new UnsupportedOperationException();
}
+
+ @Override
+ public void fail() throws HyracksDataException {
+ // do nothing
+ }
}
@Override
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/AbstractFileWriteOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/AbstractFileWriteOperatorDescriptor.java
index ae1a722..4bb4eb9 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/AbstractFileWriteOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/AbstractFileWriteOperatorDescriptor.java
@@ -55,6 +55,10 @@
}
@Override
+ public void fail() throws HyracksDataException {
+ }
+
+ @Override
public void writeData(Object[] data) throws HyracksDataException {
try {
writer.write(data);
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/FrameFileWriterOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/FrameFileWriterOperatorDescriptor.java
index 21e197d..24d76aa 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/FrameFileWriterOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/FrameFileWriterOperatorDescriptor.java
@@ -65,12 +65,7 @@
}
@Override
- public void flush() throws HyracksDataException {
- try {
- out.flush();
- } catch (IOException e) {
- throw new HyracksDataException(e);
- }
+ public void fail() throws HyracksDataException {
}
@Override
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/PlainFileWriterOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/PlainFileWriterOperatorDescriptor.java
index cd320dd1..42c86ca 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/PlainFileWriterOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/PlainFileWriterOperatorDescriptor.java
@@ -114,12 +114,7 @@
}
@Override
- public void flush() throws HyracksDataException {
- try {
- out.flush();
- } catch (IOException e) {
- throw new HyracksDataException(e);
- }
+ public void fail() throws HyracksDataException {
}
@Override
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/DeserializedPreclusteredGroupOperator.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/DeserializedPreclusteredGroupOperator.java
index fc5f282..4ac72f6 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/DeserializedPreclusteredGroupOperator.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/DeserializedPreclusteredGroupOperator.java
@@ -90,6 +90,11 @@
}
@Override
+ public void fail() throws HyracksDataException {
+ writer.fail();
+ }
+
+ @Override
public void setDataWriter(int index, IOpenableDataWriter<Object[]> writer) {
if (index != 0) {
throw new IllegalArgumentException();
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ExternalGroupOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ExternalGroupOperatorDescriptor.java
index 73de7f5..085efd9 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ExternalGroupOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ExternalGroupOperatorDescriptor.java
@@ -163,7 +163,7 @@
}
@Override
- public void flush() throws HyracksDataException {
+ public void fail() throws HyracksDataException {
}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashGroupOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashGroupOperatorDescriptor.java
index cf954f6..66ecd85 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashGroupOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashGroupOperatorDescriptor.java
@@ -106,7 +106,7 @@
}
@Override
- public void flush() throws HyracksDataException {
+ public void fail() throws HyracksDataException {
}
};
}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/PreclusteredGroupOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/PreclusteredGroupOperatorDescriptor.java
index b5568e6..a865456 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/PreclusteredGroupOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/PreclusteredGroupOperatorDescriptor.java
@@ -78,8 +78,8 @@
}
@Override
- public void flush() throws HyracksDataException {
- pgw.flush();
+ public void fail() throws HyracksDataException {
+ pgw.fail();
}
@Override
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/PreclusteredGroupWriter.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/PreclusteredGroupWriter.java
index 60a1dc3..df7f9f9 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/PreclusteredGroupWriter.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/PreclusteredGroupWriter.java
@@ -112,9 +112,8 @@
}
@Override
- public void flush() throws HyracksDataException {
- FrameUtils.flushFrame(appender.getBuffer(), writer);
- appender.reset(appender.getBuffer(), true);
+ public void fail() throws HyracksDataException {
+ writer.fail();
}
@Override
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/GraceHashJoinOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/GraceHashJoinOperatorDescriptor.java
index c60c0af..4b2986a 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/GraceHashJoinOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/GraceHashJoinOperatorDescriptor.java
@@ -216,7 +216,7 @@
}
@Override
- public void flush() throws HyracksDataException {
+ public void fail() throws HyracksDataException {
}
};
return op;
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java
index 7c9134f..720ea99 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java
@@ -295,7 +295,7 @@
}
@Override
- public void flush() throws HyracksDataException {
+ public void fail() throws HyracksDataException {
}
private void closeWriter(int i) throws HyracksDataException {
@@ -501,11 +501,6 @@
env.set(NUM_PARTITION, null);
}
- @Override
- public void flush() throws HyracksDataException {
- writer.flush();
- }
-
private void closeWriter(int i) throws HyracksDataException {
RunFileWriter writer = probeWriters[i];
if (writer != null) {
@@ -523,6 +518,11 @@
}
writer.nextFrame(head);
}
+
+ @Override
+ public void fail() throws HyracksDataException {
+ writer.fail();
+ }
};
return op;
}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java
index ce8be1c..5a28d1c 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java
@@ -147,7 +147,7 @@
}
@Override
- public void flush() throws HyracksDataException {
+ public void fail() throws HyracksDataException {
}
};
return op;
@@ -186,8 +186,8 @@
}
@Override
- public void flush() throws HyracksDataException {
- writer.flush();
+ public void fail() throws HyracksDataException {
+ writer.fail();
}
};
return op;
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java
index 9e69e4b..6cabbe6 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java
@@ -103,7 +103,7 @@
}
@Override
- public void flush() throws HyracksDataException {
+ public void fail() throws HyracksDataException {
}
};
return op;
@@ -143,8 +143,8 @@
}
@Override
- public void flush() throws HyracksDataException {
- writer.flush();
+ public void fail() throws HyracksDataException {
+ writer.fail();
}
};
return op;
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/map/DeserializedMapperOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/map/DeserializedMapperOperatorDescriptor.java
index a142c94..3ed7c07 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/map/DeserializedMapperOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/map/DeserializedMapperOperatorDescriptor.java
@@ -33,11 +33,15 @@
@Override
public void close() throws HyracksDataException {
- // writer.writeData(null);
writer.close();
}
@Override
+ public void fail() throws HyracksDataException {
+ writer.fail();
+ }
+
+ @Override
public void open() throws HyracksDataException {
mapper = mapperFactory.createMapper();
writer.open();
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/MaterializingOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/MaterializingOperatorDescriptor.java
index e8a6eae..deeff38 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/MaterializingOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/MaterializingOperatorDescriptor.java
@@ -89,7 +89,7 @@
}
@Override
- public void flush() throws HyracksDataException {
+ public void fail() throws HyracksDataException {
}
};
}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/NullSinkOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/NullSinkOperatorDescriptor.java
index 5a4ea37..80b2212 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/NullSinkOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/NullSinkOperatorDescriptor.java
@@ -49,7 +49,7 @@
}
@Override
- public void flush() throws HyracksDataException {
+ public void fail() throws HyracksDataException {
}
};
}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/PrinterOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/PrinterOperatorDescriptor.java
index 2177409..d937784 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/PrinterOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/PrinterOperatorDescriptor.java
@@ -43,6 +43,10 @@
}
@Override
+ public void fail() throws HyracksDataException {
+ }
+
+ @Override
public void writeData(Object[] data) throws HyracksDataException {
for (int i = 0; i < data.length; ++i) {
System.err.print(StringSerializationUtils.toString(data[i]));
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/SplitOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/SplitOperatorDescriptor.java
index ff39cc8..8c83e3a 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/SplitOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/SplitOperatorDescriptor.java
@@ -39,7 +39,10 @@
}
@Override
- public void flush() throws HyracksDataException {
+ public void fail() throws HyracksDataException {
+ for (IFrameWriter writer : writers) {
+ writer.fail();
+ }
}
@Override
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/SplitVectorOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/SplitVectorOperatorDescriptor.java
index 8a3addb..a06bb45 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/SplitVectorOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/SplitVectorOperatorDescriptor.java
@@ -71,6 +71,11 @@
public void writeData(Object[] data) throws HyracksDataException {
buffer.add(data);
}
+
+ @Override
+ public void fail() throws HyracksDataException {
+
+ }
};
return new DeserializedOperatorNodePushable(ctx, op, recordDescProvider.getInputRecordDescriptor(
getOperatorId(), 0));
@@ -117,6 +122,11 @@
}
writer.close();
}
+
+ @Override
+ public void fail() throws HyracksDataException {
+ writer.fail();
+ }
};
return new DeserializedOperatorNodePushable(ctx, op, recordDescProvider.getInputRecordDescriptor(
getOperatorId(), 0));
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortOperatorDescriptor.java
index 3bde4c4..55659a4 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortOperatorDescriptor.java
@@ -109,8 +109,8 @@
}
@Override
- public void flush() throws HyracksDataException {
- runGen.flush();
+ public void fail() throws HyracksDataException {
+ runGen.fail();
}
};
return op;
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortRunGenerator.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortRunGenerator.java
index eee6f49..bcfdb89 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortRunGenerator.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortRunGenerator.java
@@ -84,7 +84,7 @@
}
@Override
- public void flush() throws HyracksDataException {
+ public void fail() throws HyracksDataException {
}
public FrameSorter getFrameSorter() {
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/InMemorySortOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/InMemorySortOperatorDescriptor.java
index d3015b7..0882898 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/InMemorySortOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/InMemorySortOperatorDescriptor.java
@@ -99,7 +99,7 @@
}
@Override
- public void flush() throws HyracksDataException {
+ public void fail() throws HyracksDataException {
}
};
return op;
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/union/UnionAllOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/union/UnionAllOperatorDescriptor.java
index f5ecde6..373ed48 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/union/UnionAllOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/union/UnionAllOperatorDescriptor.java
@@ -69,6 +69,8 @@
private int nClosed;
+ private boolean failed;
+
public UnionOperator(IHyracksTaskContext ctx, RecordDescriptor inRecordDesc) {
nOpened = 0;
nClosed = 0;
@@ -99,9 +101,12 @@
}
@Override
- public void flush() throws HyracksDataException {
+ public void fail() throws HyracksDataException {
synchronized (UnionOperator.this) {
- writer.flush();
+ if (failed) {
+ writer.fail();
+ }
+ failed = true;
}
}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/util/DeserializedOperatorNodePushable.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/util/DeserializedOperatorNodePushable.java
index b2c6feb..3345dd4 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/util/DeserializedOperatorNodePushable.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/util/DeserializedOperatorNodePushable.java
@@ -63,7 +63,8 @@
}
@Override
- public void flush() throws HyracksDataException {
+ public void fail() throws HyracksDataException {
+ delegate.fail();
}
@Override
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/util/SynchronizedBoundedBuffer.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/util/SynchronizedBoundedBuffer.java
deleted file mode 100644
index a8ac4c2..0000000
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/util/SynchronizedBoundedBuffer.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.dataflow.std.util;
-
-import java.util.Collection;
-
-public class SynchronizedBoundedBuffer<T> {
- private static final int QUEUE_SIZE = 8192;
- private Object[] buffer;
- private int head;
- private int tail;
-
- public SynchronizedBoundedBuffer() {
- buffer = new Object[QUEUE_SIZE];
- head = 0;
- tail = 0;
- }
-
- public synchronized void put(T o) throws InterruptedException {
- while (full()) {
- wait();
- }
- buffer[tail] = o;
- tail = (tail + 1) % QUEUE_SIZE;
- notifyAll();
- }
-
- public synchronized void putAll(Collection<? extends T> c) throws InterruptedException {
- for (T o : c) {
- while (full()) {
- wait();
- }
- buffer[tail] = o;
- tail = (tail + 1) % QUEUE_SIZE;
- }
- notifyAll();
- }
-
- public synchronized T get() throws InterruptedException {
- while (empty()) {
- wait();
- }
- T o = (T) buffer[head];
- head = (head + 1) % QUEUE_SIZE;
- notifyAll();
- return o;
- }
-
- private boolean empty() {
- return head == tail;
- }
-
- private boolean full() {
- return (tail + 1) % QUEUE_SIZE == head;
- }
-}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/util/SynchronizedBoundedBufferDataReader.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/util/SynchronizedBoundedBufferDataReader.java
deleted file mode 100644
index b218024..0000000
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/util/SynchronizedBoundedBufferDataReader.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.dataflow.std.util;
-
-import edu.uci.ics.hyracks.api.dataflow.IOpenableDataReader;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-
-public class SynchronizedBoundedBufferDataReader implements IOpenableDataReader<Object[]> {
- private SynchronizedBoundedBuffer<Object[]> queue;
-
- public SynchronizedBoundedBufferDataReader(SynchronizedBoundedBuffer<Object[]> queue) {
- this.queue = queue;
- }
-
- @Override
- public Object[] readData() throws HyracksDataException {
- try {
- return queue.get();
- } catch (InterruptedException e) {
- throw new HyracksDataException(e);
- }
- }
-
- @Override
- public void close() {
- queue = null;
- }
-
- @Override
- public void open() {
- // do nothing
- }
-}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/util/SynchronizedBoundedBufferDataWriter.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/util/SynchronizedBoundedBufferDataWriter.java
deleted file mode 100644
index 05d3360..0000000
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/util/SynchronizedBoundedBufferDataWriter.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.dataflow.std.util;
-
-import edu.uci.ics.hyracks.api.dataflow.IOpenableDataWriter;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-
-public class SynchronizedBoundedBufferDataWriter implements IOpenableDataWriter<Object[]> {
- private SynchronizedBoundedBuffer<Object[]> queue;
-
- public SynchronizedBoundedBufferDataWriter(SynchronizedBoundedBuffer<Object[]> queue) {
- this.queue = queue;
- }
-
- @Override
- public void writeData(Object[] data) throws HyracksDataException {
- try {
- queue.put(data);
- } catch (InterruptedException e) {
- throw new HyracksDataException(e);
- }
- }
-
- @Override
- public void close() throws HyracksDataException {
- try {
- queue.put(null);
- } catch (InterruptedException e) {
- throw new HyracksDataException(e);
- }
- queue = null;
- }
-
- @Override
- public void open() {
- // do nothing
- }
-}
\ No newline at end of file
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/comm/SerializationDeserializationTest.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/comm/SerializationDeserializationTest.java
index 99204d8..8a78e49 100644
--- a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/comm/SerializationDeserializationTest.java
+++ b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/comm/SerializationDeserializationTest.java
@@ -68,11 +68,10 @@
@Override
public void close() throws HyracksDataException {
-
}
@Override
- public void flush() throws HyracksDataException {
+ public void fail() throws HyracksDataException {
}
});
}
diff --git a/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorNodePushable.java b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorNodePushable.java
index 8a7e59e..f53bfd6 100644
--- a/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorNodePushable.java
+++ b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorNodePushable.java
@@ -206,9 +206,7 @@
}
@Override
- public void flush() throws HyracksDataException {
- if (appender.getTupleCount() > 0) {
- FrameUtils.flushFrame(writeBuffer, writer);
- }
+ public void fail() throws HyracksDataException {
+ writer.fail();
}
}
\ No newline at end of file
diff --git a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexBulkLoadOperatorNodePushable.java b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexBulkLoadOperatorNodePushable.java
index 817aeb4..aced388 100644
--- a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexBulkLoadOperatorNodePushable.java
+++ b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexBulkLoadOperatorNodePushable.java
@@ -84,6 +84,6 @@
}
@Override
- public void flush() throws HyracksDataException {
+ public void fail() throws HyracksDataException {
}
}
\ No newline at end of file
diff --git a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexInsertUpdateDeleteOperatorNodePushable.java b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexInsertUpdateDeleteOperatorNodePushable.java
index 496678b..5779805 100644
--- a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexInsertUpdateDeleteOperatorNodePushable.java
+++ b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexInsertUpdateDeleteOperatorNodePushable.java
@@ -116,6 +116,7 @@
}
@Override
- public void flush() throws HyracksDataException {
+ public void fail() throws HyracksDataException {
+ writer.fail();
}
}
\ No newline at end of file
diff --git a/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/dataflow/BinaryTokenizerOperatorNodePushable.java b/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/dataflow/BinaryTokenizerOperatorNodePushable.java
index 1a0091a..0af27d6 100644
--- a/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/dataflow/BinaryTokenizerOperatorNodePushable.java
+++ b/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/dataflow/BinaryTokenizerOperatorNodePushable.java
@@ -120,6 +120,7 @@
}
@Override
- public void flush() throws HyracksDataException {
+ public void fail() throws HyracksDataException {
+ writer.fail();
}
}
diff --git a/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/dataflow/InvertedIndexBulkLoadOperatorNodePushable.java b/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/dataflow/InvertedIndexBulkLoadOperatorNodePushable.java
index d2bd395..9d421b0 100644
--- a/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/dataflow/InvertedIndexBulkLoadOperatorNodePushable.java
+++ b/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/dataflow/InvertedIndexBulkLoadOperatorNodePushable.java
@@ -106,6 +106,6 @@
}
@Override
- public void flush() throws HyracksDataException {
+ public void fail() throws HyracksDataException {
}
}
\ No newline at end of file
diff --git a/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/dataflow/RTreeSearchOperatorNodePushable.java b/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/dataflow/RTreeSearchOperatorNodePushable.java
index 2afd6d8..69e48c0 100644
--- a/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/dataflow/RTreeSearchOperatorNodePushable.java
+++ b/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/dataflow/RTreeSearchOperatorNodePushable.java
@@ -175,9 +175,7 @@
}
@Override
- public void flush() throws HyracksDataException {
- if (appender.getTupleCount() > 0) {
- FrameUtils.flushFrame(writeBuffer, writer);
- }
+ public void fail() throws HyracksDataException {
+ writer.fail();
}
}
\ No newline at end of file