small refactoring
Change-Id: I37eab1645416e3aad6119bba527c5e3b4b98fddc
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1052
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: abdullah alamoudi <bamousaa@gmail.com>
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
index 4189dbf..10e9125 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
@@ -54,7 +54,7 @@
protected final boolean sendMarker;
protected boolean failed = false;
private FeedRecordDataFlowController<T>.DataflowMarker dataflowMarker;
- private Future<?> result;
+ private Future<?> dataflowMarkerResult;
public FeedRecordDataFlowController(IHyracksTaskContext ctx, FeedTupleForwarder tupleForwarder,
@Nonnull FeedLogManager feedLogManager, int numOfOutputFields, @Nonnull IRecordDataParser<T> dataParser,
@@ -69,12 +69,7 @@
@Override
public void start(IFrameWriter writer) throws HyracksDataException {
- ExecutorService executorService = sendMarker ? Executors.newSingleThreadExecutor() : null;
- if (sendMarker && dataflowMarker == null) {
- dataflowMarker = new DataflowMarker(recordReader.getProgressReporter(),
- TaskUtils.<VSizeFrame> get(HyracksConstants.KEY_MESSAGE, ctx));
- result = executorService.submit(dataflowMarker);
- }
+ startDataflowMarker();
HyracksDataException hde = null;
try {
failed = false;
@@ -102,9 +97,7 @@
LOGGER.warn("Failure while operating a feed source", e);
throw new HyracksDataException(e);
}
- if(dataflowMarker != null){
- dataflowMarker.stop();
- }
+ stopDataflowMarker();
try {
tupleForwarder.close();
} catch (Throwable th) {
@@ -117,8 +110,8 @@
hde = ExternalDataExceptionUtils.suppressIntoHyracksDataException(hde, th);
} finally {
closeSignal();
- if (sendMarker && result != null) {
- result.cancel(true);
+ if (sendMarker && dataflowMarkerResult != null) {
+ dataflowMarkerResult.cancel(true);
}
}
if (hde != null) {
@@ -149,6 +142,21 @@
protected void addPrimaryKeys(ArrayTupleBuilder tb, IRawRecord<? extends T> record) throws IOException {
}
+ private void startDataflowMarker() {
+ ExecutorService executorService = sendMarker ? Executors.newSingleThreadExecutor() : null;
+ if (sendMarker && dataflowMarker == null) {
+ dataflowMarker = new DataflowMarker(recordReader.getProgressReporter(),
+ TaskUtils.<VSizeFrame> get(HyracksConstants.KEY_MESSAGE, ctx));
+ dataflowMarkerResult = executorService.submit(dataflowMarker);
+ }
+ }
+
+ private void stopDataflowMarker() {
+ if (dataflowMarker != null) {
+ dataflowMarker.stop();
+ }
+ }
+
private void closeSignal() {
synchronized (closed) {
closed.set(true);
@@ -166,9 +174,7 @@
@Override
public boolean stop() throws HyracksDataException {
- if (dataflowMarker != null) {
- dataflowMarker.stop();
- }
+ stopDataflowMarker();
HyracksDataException hde = null;
if (recordReader.stop()) {
if (failed) {