[NO ISSUE][ING] Follow the IFrameWriter contract in Feed Pipeline
- user model changes: no
- storage format changes: no
- interface changes: yes
- it removes unneeded ITupleForwarder interface.
Details:
- Previously, if a failure happens at the writer.open
call in the FeedIntake operator, the IFrameWriter
protocol is not followed since we don't fail before
close is called.
- Previously, fail calls can be done as well inside
of the feed adapter.
- This change moves failing the pipeline outside the
adapter and so the adapter's responsibility is
getting records from outside the system while
the operator takes care of opening, failing, and
closing the writer.
Change-Id: Ife679fb9643dc6b39d035e0eecdb915b227503a5
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2463
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: abdullah alamoudi <bamousaa@gmail.com>
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/ITupleForwarder.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/ITupleForwarder.java
deleted file mode 100644
index 22d0d6b..0000000
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/ITupleForwarder.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.api;
-
-import org.apache.hyracks.api.comm.IFrameWriter;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-
-public interface ITupleForwarder {
-
- public static final String FORWARD_POLICY = "forward-policy";
-
- public enum TupleForwardPolicy {
- FRAME_FULL,
- COUNTER_TIMER_EXPIRED,
- RATE_CONTROLLED,
- FEED
- }
-
- public void initialize(IHyracksTaskContext ctx, IFrameWriter frameWriter) throws HyracksDataException;
-
- public void addTuple(ArrayTupleBuilder tb) throws HyracksDataException;
-
- public void close() throws HyracksDataException;
-
-}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractDataFlowController.java
index bbd93c2..a324496 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractDataFlowController.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractDataFlowController.java
@@ -19,16 +19,13 @@
package org.apache.asterix.external.dataflow;
import org.apache.asterix.external.api.IDataFlowController;
-import org.apache.asterix.external.api.ITupleForwarder;
import org.apache.hyracks.api.context.IHyracksTaskContext;
public abstract class AbstractDataFlowController implements IDataFlowController {
- protected final ITupleForwarder tupleForwarder;
protected final IHyracksTaskContext ctx;
- public AbstractDataFlowController(IHyracksTaskContext ctx, ITupleForwarder tupleForwarder) {
+ public AbstractDataFlowController(IHyracksTaskContext ctx) {
this.ctx = ctx;
- this.tupleForwarder = tupleForwarder;
}
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractFeedDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractFeedDataFlowController.java
index 3437de1..2533af5 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractFeedDataFlowController.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractFeedDataFlowController.java
@@ -18,42 +18,31 @@
*/
package org.apache.asterix.external.dataflow;
+import java.io.Closeable;
+import java.io.IOException;
+
import org.apache.asterix.external.api.IDataFlowController;
import org.apache.asterix.external.util.FeedLogManager;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-public abstract class AbstractFeedDataFlowController implements IDataFlowController {
- protected final FeedTupleForwarder tupleForwarder;
+public abstract class AbstractFeedDataFlowController implements IDataFlowController, Closeable {
+ protected TupleForwarder tupleForwarder;
protected final IHyracksTaskContext ctx;
protected final int numOfFields;
protected final ArrayTupleBuilder tb;
protected final FeedLogManager feedLogManager;
protected boolean flushing;
- public AbstractFeedDataFlowController(IHyracksTaskContext ctx, FeedTupleForwarder tupleForwarder,
- FeedLogManager feedLogManager, int numOfFields) {
+ public AbstractFeedDataFlowController(IHyracksTaskContext ctx, FeedLogManager feedLogManager, int numOfFields) {
this.feedLogManager = feedLogManager;
this.numOfFields = numOfFields;
this.ctx = ctx;
- this.tupleForwarder = tupleForwarder;
this.tb = new ArrayTupleBuilder(numOfFields);
}
@Override
- public boolean pause() {
- tupleForwarder.pause();
- return true;
- }
-
- @Override
- public boolean resume() {
- tupleForwarder.resume();
- return true;
- }
-
- @Override
public void flush() throws HyracksDataException {
flushing = true;
tupleForwarder.flush();
@@ -61,4 +50,9 @@
}
public abstract String getStats();
+
+ @Override
+ public void close() throws IOException {
+ feedLogManager.close();
+ }
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedDataFlowController.java
index 2db17e2..b14722b 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedDataFlowController.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedDataFlowController.java
@@ -30,11 +30,10 @@
private final IRecordWithPKDataParser<T> dataParser;
- public ChangeFeedDataFlowController(final IHyracksTaskContext ctx, final FeedTupleForwarder tupleForwarder,
- final FeedLogManager feedLogManager, final int numOfOutputFields,
- final IRecordWithPKDataParser<T> dataParser, final IRecordReader<T> recordReader)
- throws HyracksDataException {
- super(ctx, tupleForwarder, feedLogManager, numOfOutputFields, dataParser, recordReader);
+ public ChangeFeedDataFlowController(final IHyracksTaskContext ctx, final FeedLogManager feedLogManager,
+ final int numOfOutputFields, final IRecordWithPKDataParser<T> dataParser,
+ final IRecordReader<T> recordReader) throws HyracksDataException {
+ super(ctx, feedLogManager, numOfOutputFields, dataParser, recordReader);
this.dataParser = dataParser;
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedWithMetaDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedWithMetaDataFlowController.java
index 4447b28..621397b 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedWithMetaDataFlowController.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedWithMetaDataFlowController.java
@@ -28,11 +28,10 @@
public class ChangeFeedWithMetaDataFlowController<T> extends FeedWithMetaDataFlowController<T> {
- public ChangeFeedWithMetaDataFlowController(final IHyracksTaskContext ctx, final FeedTupleForwarder tupleForwarder,
- final FeedLogManager feedLogManager, final int numOfOutputFields,
- final IRecordWithMetadataParser<T> dataParser, final IRecordReader<T> recordReader)
- throws HyracksDataException {
- super(ctx, tupleForwarder, feedLogManager, numOfOutputFields, dataParser, recordReader);
+ public ChangeFeedWithMetaDataFlowController(final IHyracksTaskContext ctx, final FeedLogManager feedLogManager,
+ final int numOfOutputFields, final IRecordWithMetadataParser<T> dataParser,
+ final IRecordReader<T> recordReader) throws HyracksDataException {
+ super(ctx, feedLogManager, numOfOutputFields, dataParser, recordReader);
}
@Override
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/CounterTimerTupleForwarder.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/CounterTimerTupleForwarder.java
deleted file mode 100644
index 10815d9..0000000
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/CounterTimerTupleForwarder.java
+++ /dev/null
@@ -1,165 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.dataflow;
-
-import java.util.Map;
-import java.util.Timer;
-import java.util.TimerTask;
-
-import org.apache.asterix.common.exceptions.ErrorCode;
-import org.apache.asterix.common.exceptions.RuntimeDataException;
-import org.apache.asterix.external.api.ITupleForwarder;
-import org.apache.hyracks.api.comm.IFrame;
-import org.apache.hyracks.api.comm.IFrameWriter;
-import org.apache.hyracks.api.comm.VSizeFrame;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
-import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
-public class CounterTimerTupleForwarder implements ITupleForwarder {
-
- public static final String BATCH_SIZE = "batch-size";
- public static final String BATCH_INTERVAL = "batch-interval";
-
- private static final Logger LOGGER = LogManager.getLogger();
-
- private FrameTupleAppender appender;
- private IFrame frame;
- private IFrameWriter writer;
- private int batchSize;
- private long batchInterval;
- private int tuplesInFrame = 0;
- private TimeBasedFlushTask flushTask;
- private Timer timer;
- private Object lock = new Object();
- private boolean activeTimer = false;
-
- private CounterTimerTupleForwarder(int batchSize, long batchInterval) {
- this.batchSize = batchSize;
- this.batchInterval = batchInterval;
- if (batchInterval > 0L) {
- activeTimer = true;
- }
- }
-
- // Factory method
- public static CounterTimerTupleForwarder create(Map<String, String> configuration) {
- int batchSize = -1;
- long batchInterval = 0L;
- String propValue = configuration.get(BATCH_SIZE);
- if (propValue != null) {
- batchSize = Integer.parseInt(propValue);
- }
- propValue = configuration.get(BATCH_INTERVAL);
- if (propValue != null) {
- batchInterval = Long.parseLong(propValue);
- }
- return new CounterTimerTupleForwarder(batchSize, batchInterval);
- }
-
- @Override
- public void initialize(IHyracksTaskContext ctx, IFrameWriter writer) throws HyracksDataException {
- this.appender = new FrameTupleAppender();
- this.frame = new VSizeFrame(ctx);
- appender.reset(frame, true);
- this.writer = writer;
- if (activeTimer) {
- this.timer = new Timer();
- this.flushTask = new TimeBasedFlushTask(writer, lock);
- timer.scheduleAtFixedRate(flushTask, 0, batchInterval);
- }
- }
-
- @Override
- public void addTuple(ArrayTupleBuilder tb) throws HyracksDataException {
- if (activeTimer) {
- synchronized (lock) {
- addTupleToFrame(tb);
- }
- } else {
- addTupleToFrame(tb);
- }
- tuplesInFrame++;
- }
-
- private void addTupleToFrame(ArrayTupleBuilder tb) throws HyracksDataException {
- if (tuplesInFrame == batchSize
- || !appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
- if (LOGGER.isInfoEnabled()) {
- LOGGER.info("flushing frame containg (" + tuplesInFrame + ") tuples");
- }
- FrameUtils.flushFrame(frame.getBuffer(), writer);
- tuplesInFrame = 0;
- appender.reset(frame, true);
- if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
- throw new RuntimeDataException(ErrorCode.DATAFLOW_ILLEGAL_STATE);
- }
- }
- }
-
- @Override
- public void close() throws HyracksDataException {
- if (appender.getTupleCount() > 0) {
- if (activeTimer) {
- synchronized (lock) {
- FrameUtils.flushFrame(frame.getBuffer(), writer);
- }
- } else {
- FrameUtils.flushFrame(frame.getBuffer(), writer);
- }
- }
-
- if (timer != null) {
- timer.cancel();
- }
- }
-
- private class TimeBasedFlushTask extends TimerTask {
-
- private IFrameWriter writer;
- private final Object lock;
-
- public TimeBasedFlushTask(IFrameWriter writer, Object lock) {
- this.writer = writer;
- this.lock = lock;
- }
-
- @Override
- public void run() {
- try {
- if (tuplesInFrame > 0) {
- if (LOGGER.isInfoEnabled()) {
- LOGGER.info("TTL expired flushing frame (" + tuplesInFrame + ")");
- }
- synchronized (lock) {
- FrameUtils.flushFrame(frame.getBuffer(), writer);
- appender.reset(frame, true);
- tuplesInFrame = 0;
- }
- }
- } catch (HyracksDataException e) {
- e.printStackTrace();
- }
- }
- }
-}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
index 9826be7..164ff68 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
@@ -26,14 +26,12 @@
import org.apache.asterix.external.api.IRawRecord;
import org.apache.asterix.external.api.IRecordDataParser;
import org.apache.asterix.external.api.IRecordReader;
-import org.apache.asterix.external.util.DataflowUtils;
import org.apache.asterix.external.util.ExternalDataConstants;
import org.apache.asterix.external.util.FeedLogManager;
import org.apache.hyracks.api.comm.IFrameWriter;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.util.CleanupUtils;
-import org.apache.hyracks.api.util.ExceptionUtils;
import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
@@ -58,10 +56,9 @@
protected long incomingRecordsCount = 0;
protected long failedRecordsCount = 0;
- public FeedRecordDataFlowController(IHyracksTaskContext ctx, FeedTupleForwarder tupleForwarder,
- FeedLogManager feedLogManager, int numOfOutputFields, IRecordDataParser<T> dataParser,
- IRecordReader<T> recordReader) throws HyracksDataException {
- super(ctx, tupleForwarder, feedLogManager, numOfOutputFields);
+ public FeedRecordDataFlowController(IHyracksTaskContext ctx, FeedLogManager feedLogManager, int numOfOutputFields,
+ IRecordDataParser<T> dataParser, IRecordReader<T> recordReader) throws HyracksDataException {
+ super(ctx, feedLogManager, numOfOutputFields);
this.dataParser = dataParser;
this.recordReader = recordReader;
recordReader.setFeedLogManager(feedLogManager);
@@ -79,7 +76,7 @@
}
Throwable failure = null;
try {
- tupleForwarder.initialize(ctx, writer);
+ this.tupleForwarder = new TupleForwarder(ctx, writer);
while (hasNext()) {
IRawRecord<? extends T> record = next();
if (record == null) {
@@ -102,17 +99,14 @@
try {
flush();
} catch (Exception flushException) {
- tupleForwarder.fail();
flushException.addSuppressed(e);
failure = flushException;
}
} else {
failure = e;
- tupleForwarder.fail();
}
} catch (Throwable e) {
failure = e;
- tupleForwarder.fail();
LOGGER.log(Level.WARN, "Failure while operating a feed source", e);
} finally {
failure = finish(failure);
@@ -168,11 +162,17 @@
}
private Throwable finish(Throwable failure) {
- Throwable th = CleanupUtils.close(recordReader, null);
- th = DataflowUtils.close(tupleForwarder, th);
+ Throwable th = CleanupUtils.close(recordReader, failure);
+ if (th == null) {
+ try {
+ tupleForwarder.complete();
+ } catch (Throwable completeFailure) {
+ th = completeFailure;
+ }
+ }
closeSignal();
setState(State.STOPPED);
- return ExceptionUtils.suppress(failure, th);
+ return th;
}
private boolean parseAndForward(IRawRecord<? extends T> record) throws IOException {
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedStreamDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedStreamDataFlowController.java
index 025520e..ffa42e5 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedStreamDataFlowController.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedStreamDataFlowController.java
@@ -31,9 +31,9 @@
private final AsterixInputStream stream;
protected long incomingRecordsCount = 0;
- public FeedStreamDataFlowController(IHyracksTaskContext ctx, FeedTupleForwarder tupleForwarder,
- FeedLogManager feedLogManager, IStreamDataParser streamParser, AsterixInputStream inputStream) {
- super(ctx, tupleForwarder, feedLogManager, 1);
+ public FeedStreamDataFlowController(IHyracksTaskContext ctx, FeedLogManager feedLogManager,
+ IStreamDataParser streamParser, AsterixInputStream inputStream) {
+ super(ctx, feedLogManager, 1);
this.dataParser = streamParser;
this.stream = inputStream;
}
@@ -41,7 +41,7 @@
@Override
public void start(IFrameWriter writer) throws HyracksDataException {
try {
- tupleForwarder.initialize(ctx, writer);
+ tupleForwarder = new TupleForwarder(ctx, writer);
while (true) {
if (!parseNext()) {
break;
@@ -50,10 +50,9 @@
tupleForwarder.addTuple(tb);
incomingRecordsCount++;
}
- } catch (Exception e) {
+ tupleForwarder.complete();
+ } catch (Throwable e) {
throw HyracksDataException.create(e);
- } finally {
- tupleForwarder.close();
}
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedTupleForwarder.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedTupleForwarder.java
deleted file mode 100644
index f824b67..0000000
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedTupleForwarder.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.dataflow;
-
-import java.io.IOException;
-
-import org.apache.asterix.external.api.ITupleForwarder;
-import org.apache.asterix.external.util.DataflowUtils;
-import org.apache.asterix.external.util.FeedLogManager;
-import org.apache.hyracks.api.comm.IFrame;
-import org.apache.hyracks.api.comm.IFrameWriter;
-import org.apache.hyracks.api.comm.VSizeFrame;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
-import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
-
-public class FeedTupleForwarder implements ITupleForwarder {
-
- private final FeedLogManager feedLogManager;
- private FrameTupleAppender appender;
- private IFrame frame;
- private IFrameWriter writer;
- private boolean paused = false;
- private boolean initialized;
- private boolean failed;
-
- public FeedTupleForwarder(FeedLogManager feedLogManager) {
- this.feedLogManager = feedLogManager;
- }
-
- public FeedLogManager getFeedLogManager() {
- return feedLogManager;
- }
-
- @Override
- public void initialize(IHyracksTaskContext ctx, IFrameWriter writer) throws HyracksDataException {
- if (!initialized) {
- this.frame = new VSizeFrame(ctx);
- this.writer = writer;
- this.appender = new FrameTupleAppender(frame);
- initialized = true;
- }
- }
-
- @Override
- public void addTuple(ArrayTupleBuilder tb) throws HyracksDataException {
- if (paused) {
- synchronized (this) {
- while (paused) {
- try {
- wait();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw HyracksDataException.create(e);
- }
- }
- }
- }
- DataflowUtils.addTupleToFrame(appender, tb, writer);
- }
-
- public void pause() {
- paused = true;
- }
-
- public synchronized void resume() {
- paused = false;
- notifyAll();
- }
-
- @Override
- public void close() throws HyracksDataException {
- Throwable throwable = null;
- try {
- if (!failed && appender.getTupleCount() > 0) {
- FrameUtils.flushFrame(frame.getBuffer(), writer);
- }
- } catch (Throwable th) {
- throwable = th;
- throw th;
- } finally {
- try {
- feedLogManager.close();
- } catch (IOException e) {
- if (throwable != null) {
- throwable.addSuppressed(e);
- } else {
- throw HyracksDataException.create(e);
- }
- } catch (Throwable th) {
- if (throwable != null) {
- throwable.addSuppressed(th);
- } else {
- throw th;
- }
- }
- }
- }
-
- public void flush() throws HyracksDataException {
- appender.flush(writer);
- }
-
- public void fail() throws HyracksDataException {
- failed = true;
- writer.fail();
- }
-}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedWithMetaDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedWithMetaDataFlowController.java
index c7f6d9c..289c16f 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedWithMetaDataFlowController.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedWithMetaDataFlowController.java
@@ -30,10 +30,9 @@
protected final IRecordWithMetadataParser<T> dataParser;
- public FeedWithMetaDataFlowController(IHyracksTaskContext ctx, FeedTupleForwarder tupleForwarder,
- FeedLogManager feedLogManager, int numOfOutputFields, IRecordWithMetadataParser<T> dataParser,
- IRecordReader<T> recordReader) throws HyracksDataException {
- super(ctx, tupleForwarder, feedLogManager, numOfOutputFields, dataParser, recordReader);
+ public FeedWithMetaDataFlowController(IHyracksTaskContext ctx, FeedLogManager feedLogManager, int numOfOutputFields,
+ IRecordWithMetadataParser<T> dataParser, IRecordReader<T> recordReader) throws HyracksDataException {
+ super(ctx, feedLogManager, numOfOutputFields, dataParser, recordReader);
this.dataParser = dataParser;
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FrameFullTupleForwarder.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FrameFullTupleForwarder.java
deleted file mode 100644
index 18927cd..0000000
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FrameFullTupleForwarder.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.dataflow;
-
-import org.apache.asterix.common.exceptions.RuntimeDataException;
-import org.apache.asterix.external.api.ITupleForwarder;
-import org.apache.asterix.common.exceptions.ErrorCode;
-import org.apache.hyracks.api.comm.IFrame;
-import org.apache.hyracks.api.comm.IFrameWriter;
-import org.apache.hyracks.api.comm.VSizeFrame;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
-import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
-
-public class FrameFullTupleForwarder implements ITupleForwarder {
-
- private FrameTupleAppender appender;
- private IFrame frame;
- private IFrameWriter writer;
-
- @Override
- public void initialize(IHyracksTaskContext ctx, IFrameWriter writer) throws HyracksDataException {
- this.appender = new FrameTupleAppender();
- this.frame = new VSizeFrame(ctx);
- this.writer = writer;
- appender.reset(frame, true);
- }
-
- @Override
- public void addTuple(ArrayTupleBuilder tb) throws HyracksDataException {
- boolean success = appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize());
- if (!success) {
- FrameUtils.flushFrame(frame.getBuffer(), writer);
- appender.reset(frame, true);
- success = appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize());
- if (!success) {
- throw new RuntimeDataException(ErrorCode.DATAFLOW_ILLEGAL_STATE);
- }
- }
- }
-
- @Override
- public void close() throws HyracksDataException {
- if (appender.getTupleCount() > 0) {
- FrameUtils.flushFrame(frame.getBuffer(), writer);
- }
-
- }
-}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/IndexingDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/IndexingDataFlowController.java
index c4f75e3..b956295 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/IndexingDataFlowController.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/IndexingDataFlowController.java
@@ -23,7 +23,6 @@
import org.apache.asterix.external.api.IExternalIndexer;
import org.apache.asterix.external.api.IRecordDataParser;
import org.apache.asterix.external.api.IRecordReader;
-import org.apache.asterix.external.api.ITupleForwarder;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
@@ -31,10 +30,9 @@
public class IndexingDataFlowController<T> extends RecordDataFlowController<T> {
private final IExternalIndexer indexer;
- public IndexingDataFlowController(IHyracksTaskContext ctx, ITupleForwarder tupleForwarder,
- IRecordDataParser<T> dataParser, IRecordReader<? extends T> recordReader, IExternalIndexer indexer)
- throws IOException {
- super(ctx, tupleForwarder, dataParser, recordReader, 1 + indexer.getNumberOfFields());
+ public IndexingDataFlowController(IHyracksTaskContext ctx, IRecordDataParser<T> dataParser,
+ IRecordReader<? extends T> recordReader, IExternalIndexer indexer) throws IOException {
+ super(ctx, dataParser, recordReader, 1 + indexer.getNumberOfFields());
this.indexer = indexer;
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/RateControlledTupleForwarder.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/RateControlledTupleForwarder.java
deleted file mode 100644
index f34b77d..0000000
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/RateControlledTupleForwarder.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.dataflow;
-
-import java.util.Map;
-
-import org.apache.asterix.common.exceptions.ErrorCode;
-import org.apache.asterix.common.exceptions.RuntimeDataException;
-import org.apache.asterix.external.api.ITupleForwarder;
-import org.apache.hyracks.api.comm.IFrame;
-import org.apache.hyracks.api.comm.IFrameWriter;
-import org.apache.hyracks.api.comm.VSizeFrame;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
-import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
-
-public class RateControlledTupleForwarder implements ITupleForwarder {
-
- private FrameTupleAppender appender;
- private IFrame frame;
- private IFrameWriter writer;
- private long interTupleInterval;
- private boolean delayConfigured;
-
- public static final String INTER_TUPLE_INTERVAL = "tuple-interval";
-
- private RateControlledTupleForwarder(long interTupleInterval) {
- this.interTupleInterval = interTupleInterval;
- delayConfigured = interTupleInterval != 0L;
- }
-
- // Factory method
- public static RateControlledTupleForwarder create(Map<String, String> configuration) {
- long interTupleInterval = 0L;
- String propValue = configuration.get(INTER_TUPLE_INTERVAL);
- if (propValue != null) {
- interTupleInterval = Long.parseLong(propValue);
- }
- return new RateControlledTupleForwarder(interTupleInterval);
- }
-
- @Override
- public void initialize(IHyracksTaskContext ctx, IFrameWriter writer) throws HyracksDataException {
- this.appender = new FrameTupleAppender();
- this.frame = new VSizeFrame(ctx);
- this.writer = writer;
- appender.reset(frame, true);
- }
-
- @Override
- public void addTuple(ArrayTupleBuilder tb) throws HyracksDataException {
- if (delayConfigured) {
- try {
- Thread.sleep(interTupleInterval);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw HyracksDataException.create(e);
- }
- }
- boolean success = appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize());
- if (!success) {
- FrameUtils.flushFrame(frame.getBuffer(), writer);
- appender.reset(frame, true);
- success = appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize());
- if (!success) {
- throw new RuntimeDataException(ErrorCode.DATAFLOW_ILLEGAL_STATE);
- }
- }
- }
-
- @Override
- public void close() throws HyracksDataException {
- if (appender.getTupleCount() > 0) {
- FrameUtils.flushFrame(frame.getBuffer(), writer);
- }
- }
-}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/RecordDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/RecordDataFlowController.java
index 0f9572d..aebdefb 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/RecordDataFlowController.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/RecordDataFlowController.java
@@ -21,7 +21,6 @@
import org.apache.asterix.external.api.IRawRecord;
import org.apache.asterix.external.api.IRecordDataParser;
import org.apache.asterix.external.api.IRecordReader;
-import org.apache.asterix.external.api.ITupleForwarder;
import org.apache.hyracks.api.comm.IFrameWriter;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -33,9 +32,9 @@
protected final IRecordReader<? extends T> recordReader;
protected final int numOfTupleFields;
- public RecordDataFlowController(IHyracksTaskContext ctx, ITupleForwarder tupleForwarder,
- IRecordDataParser<T> dataParser, IRecordReader<? extends T> recordReader, int numOfTupleFields) {
- super(ctx, tupleForwarder);
+ public RecordDataFlowController(IHyracksTaskContext ctx, IRecordDataParser<T> dataParser,
+ IRecordReader<? extends T> recordReader, int numOfTupleFields) {
+ super(ctx);
this.dataParser = dataParser;
this.recordReader = recordReader;
this.numOfTupleFields = numOfTupleFields;
@@ -45,7 +44,7 @@
public void start(IFrameWriter writer) throws HyracksDataException {
try {
ArrayTupleBuilder tb = new ArrayTupleBuilder(numOfTupleFields);
- tupleForwarder.initialize(ctx, writer);
+ TupleForwarder tupleForwarder = new TupleForwarder(ctx, writer);
while (recordReader.hasNext()) {
IRawRecord<? extends T> record = recordReader.next();
tb.reset();
@@ -54,7 +53,7 @@
appendOtherTupleFields(tb);
tupleForwarder.addTuple(tb);
}
- tupleForwarder.close();
+ tupleForwarder.complete();
recordReader.close();
} catch (Exception e) {
throw new HyracksDataException(e);
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/StreamDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/StreamDataFlowController.java
index ccf22da..a28c484 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/StreamDataFlowController.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/StreamDataFlowController.java
@@ -19,7 +19,6 @@
package org.apache.asterix.external.dataflow;
import org.apache.asterix.external.api.IStreamDataParser;
-import org.apache.asterix.external.api.ITupleForwarder;
import org.apache.hyracks.api.comm.IFrameWriter;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -28,9 +27,8 @@
public class StreamDataFlowController extends AbstractDataFlowController {
private final IStreamDataParser dataParser;
- public StreamDataFlowController(IHyracksTaskContext ctx, ITupleForwarder tupleForwarder,
- IStreamDataParser dataParser) {
- super(ctx, tupleForwarder);
+ public StreamDataFlowController(IHyracksTaskContext ctx, IStreamDataParser dataParser) {
+ super(ctx);
this.dataParser = dataParser;
}
@@ -38,7 +36,7 @@
public void start(IFrameWriter writer) throws HyracksDataException {
try {
ArrayTupleBuilder tb = new ArrayTupleBuilder(1);
- tupleForwarder.initialize(ctx, writer);
+ TupleForwarder tupleForwarder = new TupleForwarder(ctx, writer);
while (true) {
tb.reset();
if (!dataParser.parse(tb.getDataOutput())) {
@@ -47,7 +45,7 @@
tb.addFieldEndOffset();
tupleForwarder.addTuple(tb);
}
- tupleForwarder.close();
+ tupleForwarder.complete();
} catch (Exception e) {
throw new HyracksDataException(e);
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/TupleForwarder.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/TupleForwarder.java
new file mode 100644
index 0000000..afb00b0
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/TupleForwarder.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.dataflow;
+
+import org.apache.asterix.external.util.DataflowUtils;
+import org.apache.hyracks.api.comm.IFrame;
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.comm.VSizeFrame;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+
+public class TupleForwarder {
+
+ private final FrameTupleAppender appender;
+ private final IFrame frame;
+ private final IFrameWriter writer;
+
+ public TupleForwarder(IHyracksTaskContext ctx, IFrameWriter writer) throws HyracksDataException {
+ this.frame = new VSizeFrame(ctx);
+ this.writer = writer;
+ this.appender = new FrameTupleAppender(frame);
+ }
+
+ public void addTuple(ArrayTupleBuilder tb) throws HyracksDataException {
+ DataflowUtils.addTupleToFrame(appender, tb, writer);
+ }
+
+ public void flush() throws HyracksDataException {
+ appender.flush(writer);
+ }
+
+ public void complete() throws HyracksDataException {
+ appender.write(writer, false);
+ }
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/FeedAdapter.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/FeedAdapter.java
index fd9db7e..2a92d40 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/FeedAdapter.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/FeedAdapter.java
@@ -18,12 +18,15 @@
*/
package org.apache.asterix.external.dataset.adapter;
+import java.io.Closeable;
+import java.io.IOException;
+
import org.apache.asterix.external.api.IDataSourceAdapter;
import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
import org.apache.hyracks.api.comm.IFrameWriter;
import org.apache.hyracks.api.exceptions.HyracksDataException;
-public class FeedAdapter implements IDataSourceAdapter {
+public class FeedAdapter implements IDataSourceAdapter, Closeable {
private final AbstractFeedDataFlowController controller;
public FeedAdapter(AbstractFeedDataFlowController controller) {
@@ -54,4 +57,9 @@
public AbstractFeedDataFlowController getController() {
return controller;
}
+
+ @Override
+ public void close() throws IOException {
+ controller.close();
+ }
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
index 0503677..867fb60 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
@@ -29,6 +29,7 @@
import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
import org.apache.hyracks.api.exceptions.ErrorCode;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.util.CleanupUtils;
import org.apache.hyracks.api.util.HyracksConstants;
import org.apache.hyracks.dataflow.common.io.MessagingFrameTupleAppender;
import org.apache.hyracks.dataflow.common.utils.TaskUtil;
@@ -60,6 +61,7 @@
@Override
protected void start() throws HyracksDataException, InterruptedException {
+ Throwable failure = null;
Thread.currentThread().setName("Intake Thread");
try {
writer.open();
@@ -79,11 +81,16 @@
message.getBuffer().put(MessagingFrameTupleAppender.NULL_FEED_MESSAGE);
message.getBuffer().flip();
run();
- } catch (Exception e) {
+ } catch (Throwable e) {
+ failure = e;
+ CleanupUtils.fail(writer, e);
LOGGER.log(Level.WARN, "Failure during data ingestion", e);
- throw e;
} finally {
- writer.close();
+ failure = CleanupUtils.close(adapter, failure);
+ failure = CleanupUtils.close(writer, failure);
+ }
+ if (failure != null) {
+ throw HyracksDataException.create(failure);
}
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java
index 78f24a5..3e53b52 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java
@@ -42,12 +42,10 @@
import org.apache.asterix.external.dataflow.ChangeFeedWithMetaDataFlowController;
import org.apache.asterix.external.dataflow.FeedRecordDataFlowController;
import org.apache.asterix.external.dataflow.FeedStreamDataFlowController;
-import org.apache.asterix.external.dataflow.FeedTupleForwarder;
import org.apache.asterix.external.dataflow.FeedWithMetaDataFlowController;
import org.apache.asterix.external.dataflow.IndexingDataFlowController;
import org.apache.asterix.external.dataflow.RecordDataFlowController;
import org.apache.asterix.external.dataflow.StreamDataFlowController;
-import org.apache.asterix.external.util.DataflowUtils;
import org.apache.asterix.external.util.ExternalDataUtils;
import org.apache.asterix.external.util.FeedLogManager;
import org.apache.asterix.om.types.ARecordType;
@@ -70,35 +68,29 @@
IRecordDataParserFactory<?> recordParserFactory = (IRecordDataParserFactory<?>) dataParserFactory;
IRecordDataParser<?> dataParser = recordParserFactory.createRecordParser(ctx);
if (indexingOp) {
- return new IndexingDataFlowController(ctx,
- DataflowUtils.getTupleForwarder(configuration, feedLogManager), dataParser,
- recordReader, ((IIndexingDatasource) recordReader).getIndexer());
+ return new IndexingDataFlowController(ctx, dataParser, recordReader,
+ ((IIndexingDatasource) recordReader).getIndexer());
} else if (isFeed) {
- FeedTupleForwarder tupleForwarder =
- (FeedTupleForwarder) DataflowUtils.getTupleForwarder(configuration, feedLogManager);
boolean isChangeFeed = ExternalDataUtils.isChangeFeed(configuration);
boolean isRecordWithMeta = ExternalDataUtils.isRecordWithMeta(configuration);
if (isRecordWithMeta) {
if (isChangeFeed) {
int numOfKeys = ExternalDataUtils.getNumberOfKeys(configuration);
- return new ChangeFeedWithMetaDataFlowController(ctx, tupleForwarder, feedLogManager,
- numOfKeys + 2, (IRecordWithMetadataParser) dataParser, recordReader);
+ return new ChangeFeedWithMetaDataFlowController(ctx, feedLogManager, numOfKeys + 2,
+ (IRecordWithMetadataParser) dataParser, recordReader);
} else {
- return new FeedWithMetaDataFlowController(ctx, tupleForwarder, feedLogManager, 2,
+ return new FeedWithMetaDataFlowController(ctx, feedLogManager, 2,
(IRecordWithMetadataParser) dataParser, recordReader);
}
} else if (isChangeFeed) {
int numOfKeys = ExternalDataUtils.getNumberOfKeys(configuration);
- return new ChangeFeedDataFlowController(ctx, tupleForwarder, feedLogManager, numOfKeys + 1,
+ return new ChangeFeedDataFlowController(ctx, feedLogManager, numOfKeys + 1,
(IRecordWithPKDataParser) dataParser, recordReader);
} else {
- return new FeedRecordDataFlowController(ctx, tupleForwarder, feedLogManager, 1, dataParser,
- recordReader);
+ return new FeedRecordDataFlowController(ctx, feedLogManager, 1, dataParser, recordReader);
}
} else {
- return new RecordDataFlowController(ctx,
- DataflowUtils.getTupleForwarder(configuration, feedLogManager), dataParser,
- recordReader, 1);
+ return new RecordDataFlowController(ctx, dataParser, recordReader, 1);
}
case STREAM:
IInputStreamFactory streamFactory = (IInputStreamFactory) dataSourceFactory;
@@ -107,12 +99,9 @@
IStreamDataParser streamParser = streamParserFactory.createInputStreamParser(ctx, partition);
streamParser.setInputStream(stream);
if (isFeed) {
- return new FeedStreamDataFlowController(ctx,
- (FeedTupleForwarder) DataflowUtils.getTupleForwarder(configuration, feedLogManager),
- feedLogManager, streamParser, stream);
+ return new FeedStreamDataFlowController(ctx, feedLogManager, streamParser, stream);
} else {
- return new StreamDataFlowController(ctx, DataflowUtils.getTupleForwarder(configuration, null),
- streamParser);
+ return new StreamDataFlowController(ctx, streamParser);
}
default:
throw new RuntimeDataException(ErrorCode.PROVIDER_DATAFLOW_CONTROLLER_UNKNOWN_DATA_SOURCE,
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/DataflowUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/DataflowUtils.java
index 438f1df..37d400f 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/DataflowUtils.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/DataflowUtils.java
@@ -18,28 +18,15 @@
*/
package org.apache.asterix.external.util;
-import java.util.Map;
-
import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.common.exceptions.RuntimeDataException;
-import org.apache.asterix.external.api.ITupleForwarder;
-import org.apache.asterix.external.api.ITupleForwarder.TupleForwardPolicy;
-import org.apache.asterix.external.dataflow.CounterTimerTupleForwarder;
-import org.apache.asterix.external.dataflow.FeedTupleForwarder;
-import org.apache.asterix.external.dataflow.FrameFullTupleForwarder;
-import org.apache.asterix.external.dataflow.RateControlledTupleForwarder;
import org.apache.hyracks.api.comm.IFrameWriter;
import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.util.ExceptionUtils;
import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
-import org.apache.logging.log4j.Level;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
public class DataflowUtils {
- private static final Logger LOGGER = LogManager.getLogger();
private DataflowUtils() {
}
@@ -54,32 +41,6 @@
}
}
- public static ITupleForwarder getTupleForwarder(Map<String, String> configuration, FeedLogManager feedLogManager)
- throws HyracksDataException {
- ITupleForwarder.TupleForwardPolicy policyType = null;
- String propValue = configuration.get(ITupleForwarder.FORWARD_POLICY);
- if (ExternalDataUtils.isFeed(configuration)) {
- // TODO pass this value in the configuration and avoid this check for feeds
- policyType = TupleForwardPolicy.FEED;
- } else if (propValue == null) {
- policyType = TupleForwardPolicy.FRAME_FULL;
- } else {
- policyType = TupleForwardPolicy.valueOf(propValue.trim().toUpperCase());
- }
- switch (policyType) {
- case FEED:
- return new FeedTupleForwarder(feedLogManager);
- case FRAME_FULL:
- return new FrameFullTupleForwarder();
- case COUNTER_TIMER_EXPIRED:
- return CounterTimerTupleForwarder.create(configuration);
- case RATE_CONTROLLED:
- return RateControlledTupleForwarder.create(configuration);
- default:
- throw new RuntimeDataException(ErrorCode.UTIL_DATAFLOW_UTILS_UNKNOWN_FORWARD_POLICY);
- }
- }
-
public static void addTupleToFrame(FrameTupleAppender appender, ITupleReference tuple, IFrameWriter writer)
throws HyracksDataException {
if (!appender.append(tuple)) {
@@ -89,30 +50,4 @@
}
}
}
-
- /**
- * Close the ITupleForwarder and suppress any Throwable thrown by the close call.
- * This method must NEVER throw any Throwable
- *
- * @param indexHelper
- * the indexHelper to close
- * @param root
- * the first exception encountered during release of resources
- * @return the root Throwable if not null or a new Throwable if any was thrown, otherwise, it returns null
- */
- public static Throwable close(ITupleForwarder tupleForwarder, Throwable root) {
- if (tupleForwarder != null) {
- try {
- tupleForwarder.close();
- } catch (Throwable th) { // NOSONAR Will be re-thrown
- try {
- LOGGER.log(Level.WARN, "Failure closing a closeable resource", th);
- } catch (Throwable ignore) { // NOSONAR Logging exception will be ignored
- // NOSONAR ignore
- }
- root = ExceptionUtils.suppress(root, th);
- }
- }
- return root;
- }
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedLogManager.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedLogManager.java
index 9d887b6..42ebbd4 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedLogManager.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedLogManager.java
@@ -20,6 +20,7 @@
import java.io.BufferedReader;
import java.io.BufferedWriter;
+import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
@@ -35,7 +36,7 @@
import org.apache.commons.io.FileUtils;
import org.apache.hyracks.api.exceptions.HyracksDataException;
-public class FeedLogManager {
+public class FeedLogManager implements Closeable {
public enum LogEntryType {
START, // partition start
@@ -64,7 +65,7 @@
public FeedLogManager(File file) throws HyracksDataException {
try {
this.dir = file.toPath();
- this.completed = new TreeSet<String>();
+ this.completed = new TreeSet<>();
if (!exists()) {
create();
}
@@ -100,16 +101,16 @@
public synchronized void open() throws IOException {
// read content of logs.
- BufferedReader reader = Files.newBufferedReader(
- Paths.get(dir.toAbsolutePath().toString() + File.separator + PROGRESS_LOG_FILE_NAME));
- String log = reader.readLine();
- while (log != null) {
- if (log.startsWith(END_PREFIX)) {
- completed.add(getSplitId(log));
+ try (BufferedReader reader = Files.newBufferedReader(
+ Paths.get(dir.toAbsolutePath().toString() + File.separator + PROGRESS_LOG_FILE_NAME))) {
+ String log = reader.readLine();
+ while (log != null) {
+ if (log.startsWith(END_PREFIX)) {
+ completed.add(getSplitId(log));
+ }
+ log = reader.readLine();
}
- log = reader.readLine();
}
- reader.close();
progressLogger = Files.newBufferedWriter(
Paths.get(dir.toAbsolutePath().toString() + File.separator + PROGRESS_LOG_FILE_NAME),
@@ -122,6 +123,7 @@
StandardCharsets.UTF_8, StandardOpenOption.APPEND);
}
+ @Override
public synchronized void close() throws IOException {
count--;
if (count > 0) {
diff --git a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java
index 6fe938c..8ee8a57 100644
--- a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java
+++ b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java
@@ -28,11 +28,8 @@
import org.apache.asterix.external.api.IAdapterFactory;
import org.apache.asterix.external.api.IDataSourceAdapter;
import org.apache.asterix.external.api.IExternalDataSourceFactory;
-import org.apache.asterix.external.api.ITupleForwarder;
+import org.apache.asterix.external.dataflow.TupleForwarder;
import org.apache.asterix.external.parser.ADMDataParser;
-import org.apache.asterix.external.util.DataflowUtils;
-import org.apache.asterix.external.util.ExternalDataUtils;
-import org.apache.asterix.external.util.FeedUtils;
import org.apache.asterix.om.types.ARecordType;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
@@ -79,26 +76,19 @@
@Override
public ITupleParser createTupleParser(IHyracksTaskContext ctx) throws HyracksDataException {
ADMDataParser parser;
- ITupleForwarder forwarder;
+
ArrayTupleBuilder tb;
IApplicationContext appCtx =
(IApplicationContext) ctx.getJobletContext().getServiceContext().getApplicationContext();
ClusterPartition nodePartition = appCtx.getMetadataProperties().getNodePartitions().get(nodeId)[0];
parser = new ADMDataParser(outputType, true);
- forwarder =
- DataflowUtils
- .getTupleForwarder(configuration,
- FeedUtils.getFeedLogManager(ctx, FeedUtils.splitsForAdapter(
- ExternalDataUtils.getDataverse(configuration),
- ExternalDataUtils.getFeedName(configuration), nodeId, nodePartition)));
tb = new ArrayTupleBuilder(1);
return new ITupleParser() {
-
@Override
public void parse(InputStream in, IFrameWriter writer) throws HyracksDataException {
try {
parser.setInputStream(in);
- forwarder.initialize(ctx, writer);
+ TupleForwarder forwarder = new TupleForwarder(ctx, writer);
while (true) {
tb.reset();
if (!parser.parse(tb.getDataOutput())) {
@@ -107,7 +97,7 @@
tb.addFieldEndOffset();
forwarder.addTuple(tb);
}
- forwarder.close();
+ forwarder.complete();
} catch (Exception e) {
throw HyracksDataException.create(e);
}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/EnforceFrameWriter.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/EnforceFrameWriter.java
index bf54e01..a116320 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/EnforceFrameWriter.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/EnforceFrameWriter.java
@@ -107,11 +107,11 @@
@Override
public void close() throws HyracksDataException {
- writer.close();
- downstreamOpen = false;
if (downstreamFailed && !failCalledByUpstream) {
throw HyracksDataException.create(ErrorCode.MISSED_FAIL_CALL);
}
+ writer.close();
+ downstreamOpen = false;
}
public static IFrameWriter enforce(IFrameWriter writer) {