Control Number of Readers for LocalFS Data

Instead of creating one localfs reader per input file split, the adapter now
creates as many readers per node as that node has storage partitions. All
readers on a node share a single FileSystemWatcher, and all feed adapter
instances on a node share one reference-counted FeedLogManager whose log
storage is laid out per node rather than per adapter instance. Line-based
readers now skip per-file headers through the new IStreamNotificationHandler
callback whenever the stream switches to a new file.

Change-Id: Ib9d5ece656220d5f562cc385f882c5ddfd3283a6
Reviewed-on: https://asterix-gerrit.ics.uci.edu/776
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <hubailmor@gmail.com>
diff --git a/asterix-app/src/main/java/org/apache/asterix/app/external/FeedOperations.java b/asterix-app/src/main/java/org/apache/asterix/app/external/FeedOperations.java
index b98618d..d8f1893 100644
--- a/asterix-app/src/main/java/org/apache/asterix/app/external/FeedOperations.java
+++ b/asterix-app/src/main/java/org/apache/asterix/app/external/FeedOperations.java
@@ -20,6 +20,8 @@
import java.util.Collection;
import java.util.List;
+import java.util.Set;
+import java.util.TreeSet;
import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.common.utils.StoragePathUtil;
@@ -64,6 +66,7 @@
/**
* Builds the job spec for ingesting a (primary) feed from its external source via the feed adaptor.
+ *
* @param primaryFeed
* @param metadataProvider
* @return JobSpecification the Hyracks job specification for receiving data from external source
@@ -251,12 +254,18 @@
public static JobSpecification buildRemoveFeedStorageJob(Feed feed) throws Exception {
JobSpecification spec = JobSpecificationUtils.createJobSpecification();
- AlgebricksAbsolutePartitionConstraint locations = AsterixClusterProperties.INSTANCE.getClusterLocations();
+ AlgebricksAbsolutePartitionConstraint allCluster = AsterixClusterProperties.INSTANCE.getClusterLocations();
+ Set<String> nodes = new TreeSet<>();
+ for (String node : allCluster.getLocations()) {
+ nodes.add(node);
+ }
+ AlgebricksAbsolutePartitionConstraint locations = new AlgebricksAbsolutePartitionConstraint(
+ nodes.toArray(new String[nodes.size()]));
FileSplit[] feedLogFileSplits = FeedUtils.splitsForAdapter(feed.getDataverseName(), feed.getFeedName(),
locations);
Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = StoragePathUtil
.splitProviderAndPartitionConstraints(feedLogFileSplits);
- FileRemoveOperatorDescriptor frod = new FileRemoveOperatorDescriptor(spec, splitsAndConstraint.first);
+ FileRemoveOperatorDescriptor frod = new FileRemoveOperatorDescriptor(spec, splitsAndConstraint.first, true);
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, frod, splitsAndConstraint.second);
spec.addRoot(frod);
return spec;
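
For context on the hunk above: AsterixClusterProperties.getClusterLocations() can list a node once per partition, so the remove job is now constrained to one location per node by collapsing the list into a sorted set. A minimal standalone sketch of that deduplication, using plain strings in place of AlgebricksAbsolutePartitionConstraint:

```java
import java.util.Arrays;
import java.util.Set;
import java.util.TreeSet;

public final class LocationDedup {
    // Collapse a location list that may repeat a node (once per partition)
    // into a sorted, unique array -- one entry per node.
    public static String[] uniqueNodes(String[] allLocations) {
        Set<String> nodes = new TreeSet<>();
        for (String node : allLocations) {
            nodes.add(node);
        }
        return nodes.toArray(new String[nodes.size()]);
    }

    public static void main(String[] args) {
        String[] locations = { "nc2", "nc1", "nc1", "nc2" };
        // Prints [nc1, nc2]: the remove operator is scheduled once per node.
        System.out.println(Arrays.toString(uniqueNodes(locations)));
    }
}
```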
diff --git a/asterix-app/src/main/java/org/apache/asterix/file/DataverseOperations.java b/asterix-app/src/main/java/org/apache/asterix/file/DataverseOperations.java
index c77ca10..d5765f1 100644
--- a/asterix-app/src/main/java/org/apache/asterix/file/DataverseOperations.java
+++ b/asterix-app/src/main/java/org/apache/asterix/file/DataverseOperations.java
@@ -32,7 +32,7 @@
JobSpecification jobSpec = JobSpecificationUtils.createJobSpecification();
Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = metadata
.splitProviderAndPartitionConstraintsForDataverse(dataverse.getDataverseName());
- FileRemoveOperatorDescriptor frod = new FileRemoveOperatorDescriptor(jobSpec, splitsAndConstraint.first);
+ FileRemoveOperatorDescriptor frod = new FileRemoveOperatorDescriptor(jobSpec, splitsAndConstraint.first, false);
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(jobSpec, frod, splitsAndConstraint.second);
jobSpec.addRoot(frod);
return jobSpec;
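
The third FileRemoveOperatorDescriptor argument added here is a boolean whose exact semantics are not visible in this diff; the sketch below assumes it means "tolerate a missing target" (true for feed log storage, which may never have been created, false for dataverse storage). The helper name and signature are illustrative only:

```java
import java.io.File;
import java.io.IOException;

public final class FileRemoveSketch {
    // Hypothetical helper: delete a directory tree. When 'quiet' is true,
    // a missing target is tolerated; when false, it is treated as an error.
    public static void remove(File target, boolean quiet) throws IOException {
        if (!target.exists()) {
            if (quiet) {
                return; // nothing to do, e.g. a feed log that was never created
            }
            throw new IOException("Cannot delete non-existing " + target);
        }
        File[] children = target.listFiles();
        if (children != null) {
            for (File child : children) {
                remove(child, quiet);
            }
        }
        if (!target.delete()) {
            throw new IOException("Failed to delete " + target);
        }
    }
}
```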
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java
index 041f706..d3abd50 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java
@@ -39,6 +39,7 @@
import org.apache.asterix.external.util.ExternalDataCompatibilityUtils;
import org.apache.asterix.external.util.ExternalDataConstants;
import org.apache.asterix.external.util.ExternalDataUtils;
+import org.apache.asterix.external.util.FeedLogManager;
import org.apache.asterix.external.util.FeedUtils;
import org.apache.asterix.om.types.ARecordType;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
@@ -58,6 +59,7 @@
private boolean isFeed;
private FileSplit[] feedLogFileSplits;
private ARecordType metaType;
+ private FeedLogManager feedLogManager = null;
@Override
public void setSnapshot(List<ExternalFile> files, boolean indexingOp) {
@@ -86,8 +88,14 @@
} catch (AsterixException e) {
throw new HyracksDataException(e);
}
+ if (isFeed) {
+ if (feedLogManager == null) {
+ feedLogManager = FeedUtils.getFeedLogManager(ctx, partition, feedLogFileSplits);
+ }
+ feedLogManager.touch();
+ }
IDataFlowController controller = DataflowControllerProvider.getDataflowController(recordType, ctx, partition,
- dataSourceFactory, dataParserFactory, configuration, indexingOp, isFeed, feedLogFileSplits);
+ dataSourceFactory, dataParserFactory, configuration, indexingOp, isFeed, feedLogManager);
if (isFeed) {
return new FeedAdapter((AbstractFeedDataFlowController) controller);
} else {
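
To make the flow above easier to follow: the factory lazily creates a single FeedLogManager for the node and registers every adapter instance with touch(), so all partitions share one log. A self-contained sketch of that lazy-create-and-share pattern, with SharedLog and createAdapterLog standing in for the real FeedLogManager and adapter-creation path:

```java
public final class AdapterLogSharingSketch {
    // Stand-in for FeedLogManager: opened once per node, shared, counted via touch().
    static final class SharedLog {
        private int users = 0;
        synchronized void touch() { users++; }
        synchronized int users() { return users; }
    }

    private SharedLog feedLog = null;

    // Mirrors the factory change: the log is created on the first adapter of
    // this node and only touch()'d for every further adapter instance.
    public synchronized SharedLog createAdapterLog() {
        if (feedLog == null) {
            feedLog = new SharedLog();
        }
        feedLog.touch();
        return feedLog;
    }

    public static void main(String[] args) {
        AdapterLogSharingSketch factory = new AdapterLogSharingSketch();
        SharedLog a = factory.createAdapterLog(); // partition 0
        SharedLog b = factory.createAdapterLog(); // partition 1
        System.out.println(a == b);    // true: one shared instance per node
        System.out.println(a.users()); // 2: both adapters registered
    }
}
```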
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/api/AsterixInputStream.java b/asterix-external-data/src/main/java/org/apache/asterix/external/api/AsterixInputStream.java
index 83d7a3a..a4c2fae 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/api/AsterixInputStream.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/api/AsterixInputStream.java
@@ -22,11 +22,13 @@
import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
import org.apache.asterix.external.util.FeedLogManager;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
public abstract class AsterixInputStream extends InputStream {
protected AbstractFeedDataFlowController controller;
protected FeedLogManager logManager;
+ protected IStreamNotificationHandler notificationHandler;
public abstract boolean stop() throws Exception;
@@ -38,7 +40,11 @@
}
// TODO: Find a better way to send notifications
- public void setFeedLogManager(FeedLogManager logManager) {
+ public void setFeedLogManager(FeedLogManager logManager) throws HyracksDataException {
this.logManager = logManager;
}
+
+ public void setNotificationHandler(IStreamNotificationHandler notificationHandler) {
+ this.notificationHandler = notificationHandler;
+ }
}
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordReader.java
index 11e2472..9cce1c9 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordReader.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordReader.java
@@ -23,9 +23,11 @@
import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
import org.apache.asterix.external.util.FeedLogManager;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
/**
* This interface represents a record reader that reads data from external source as a set of records
+ *
* @param <T>
*/
public interface IRecordReader<T> extends Closeable {
@@ -33,7 +35,7 @@
/**
* @return true if the reader has more records remaining, false, otherwise.
* @throws Exception
- * if an error takes place
+ * if an error takes place
*/
public boolean hasNext() throws Exception;
@@ -46,6 +48,7 @@
/**
* used to stop reader from producing more records.
+ *
* @return true if the connection to the external source has been suspended, false otherwise.
*/
public boolean stop();
@@ -61,8 +64,10 @@
/**
* set a pointer to the log manager of the feed. the log manager can be used to log
* progress and errors
+ *
+ * @throws HyracksDataException
*/
- public void setFeedLogManager(FeedLogManager feedLogManager);
+ public void setFeedLogManager(FeedLogManager feedLogManager) throws HyracksDataException;
/**
* gives the record reader a chance to recover from IO errors during feed intake
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/api/IStreamNotificationHandler.java b/asterix-external-data/src/main/java/org/apache/asterix/external/api/IStreamNotificationHandler.java
new file mode 100644
index 0000000..8b014ad
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/api/IStreamNotificationHandler.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.api;
+
+public interface IStreamNotificationHandler {
+
+ /**
+ * Used to notify a handler that the stream is about to start reading data from a new source.
+ * An example use is by the parser to skip CSV file headers in case the stream reads from a set of files.
+ */
+ public void notifyNewSource();
+}
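
A handler typically just records the notification and lets the next read cycle act on it, which is how LineRecordReader below uses the callback to drop per-file CSV headers. An illustrative implementation (HeaderFlaggingHandler is not part of this patch):

```java
import org.apache.asterix.external.api.IStreamNotificationHandler;

// Illustrative only: flags the next record as a header whenever the
// underlying stream switches to a new file.
public class HeaderFlaggingHandler implements IStreamNotificationHandler {

    private boolean nextIsHeader = false;

    @Override
    public void notifyNewSource() {
        nextIsHeader = true; // first record of the new source is a header
    }

    public boolean isNextHeader() {
        boolean header = nextIsHeader;
        nextIsHeader = false; // consume the flag
        return header;
    }
}
```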
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedDataFlowController.java b/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedDataFlowController.java
index 8ec422f..a301ac9 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedDataFlowController.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedDataFlowController.java
@@ -25,6 +25,7 @@
import org.apache.asterix.external.api.IRecordWithPKDataParser;
import org.apache.asterix.external.util.FeedLogManager;
import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
public class ChangeFeedDataFlowController<T> extends FeedRecordDataFlowController<T> {
@@ -33,7 +34,8 @@
public ChangeFeedDataFlowController(final IHyracksTaskContext ctx, final FeedTupleForwarder tupleForwarder,
final FeedLogManager feedLogManager, final int numOfOutputFields,
- final IRecordWithPKDataParser<T> dataParser, final IRecordReader<T> recordReader) {
+ final IRecordWithPKDataParser<T> dataParser, final IRecordReader<T> recordReader)
+ throws HyracksDataException {
super(ctx, tupleForwarder, feedLogManager, numOfOutputFields, dataParser, recordReader);
this.dataParser = dataParser;
}
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedWithMetaDataFlowController.java b/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedWithMetaDataFlowController.java
index 370eec0..aac7be2 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedWithMetaDataFlowController.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedWithMetaDataFlowController.java
@@ -25,13 +25,15 @@
import org.apache.asterix.external.parser.RecordWithMetadataParser;
import org.apache.asterix.external.util.FeedLogManager;
import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
public class ChangeFeedWithMetaDataFlowController<T, O> extends FeedWithMetaDataFlowController<T, O> {
public ChangeFeedWithMetaDataFlowController(final IHyracksTaskContext ctx, final FeedTupleForwarder tupleForwarder,
final FeedLogManager feedLogManager, final int numOfOutputFields,
- final RecordWithMetadataParser<T, O> dataParser, final IRecordReader<T> recordReader) {
+ final RecordWithMetadataParser<T, O> dataParser, final IRecordReader<T> recordReader)
+ throws HyracksDataException {
super(ctx, tupleForwarder, feedLogManager, numOfOutputFields, dataParser, recordReader);
}
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java b/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
index 387e2dc..a092620 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
@@ -45,7 +45,7 @@
public FeedRecordDataFlowController(IHyracksTaskContext ctx, FeedTupleForwarder tupleForwarder,
@Nonnull FeedLogManager feedLogManager, int numOfOutputFields, @Nonnull IRecordDataParser<T> dataParser,
- @Nonnull IRecordReader<T> recordReader) {
+ @Nonnull IRecordReader<T> recordReader) throws HyracksDataException {
super(ctx, tupleForwarder, feedLogManager, numOfOutputFields);
this.dataParser = dataParser;
this.recordReader = recordReader;
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedWithMetaDataFlowController.java b/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedWithMetaDataFlowController.java
index 203b5a7..e7c396b 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedWithMetaDataFlowController.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedWithMetaDataFlowController.java
@@ -25,6 +25,7 @@
import org.apache.asterix.external.parser.RecordWithMetadataParser;
import org.apache.asterix.external.util.FeedLogManager;
import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
public class FeedWithMetaDataFlowController<T, O> extends FeedRecordDataFlowController<T> {
@@ -34,7 +35,7 @@
public FeedWithMetaDataFlowController(IHyracksTaskContext ctx, FeedTupleForwarder tupleForwarder,
FeedLogManager feedLogManager, int numOfOutputFields, RecordWithMetadataParser<T, O> dataParser,
- IRecordReader<T> recordReader) {
+ IRecordReader<T> recordReader) throws HyracksDataException {
super(ctx, tupleForwarder, feedLogManager, numOfOutputFields, dataParser, recordReader);
this.dataParser = dataParser;
}
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/IndexingStreamRecordReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/IndexingStreamRecordReader.java
index 2c2dd98..6eee892 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/IndexingStreamRecordReader.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/IndexingStreamRecordReader.java
@@ -31,6 +31,7 @@
import org.apache.asterix.external.util.FeedLogManager;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
public class IndexingStreamRecordReader implements IRecordReader<char[]>, IIndexingDatasource {
@@ -73,7 +74,7 @@
}
@Override
- public void setFeedLogManager(FeedLogManager feedLogManager) {
+ public void setFeedLogManager(FeedLogManager feedLogManager) throws HyracksDataException {
reader.setFeedLogManager(feedLogManager);
}
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/LineRecordReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/LineRecordReader.java
index 8572fc7..59b72e4 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/LineRecordReader.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/LineRecordReader.java
@@ -26,87 +26,96 @@
public class LineRecordReader extends StreamRecordReader {
+ private final boolean hasHeader;
protected boolean prevCharCR;
protected int newlineLength;
protected int recordNumber = 0;
+ protected boolean nextIsHeader = false;
public LineRecordReader(final boolean hasHeader, final AsterixInputStream stream) throws HyracksDataException {
super(stream);
- try {
- if (hasHeader) {
- if (hasNext()) {
- next();
- }
- }
- } catch (final IOException e) {
- throw new HyracksDataException(e);
+ this.hasHeader = hasHeader;
+ if (hasHeader) {
+ stream.setNotificationHandler(this);
}
+ }
+ @Override
+ public void notifyNewSource() {
+ if (hasHeader) {
+ nextIsHeader = true;
+ }
}
@Override
public boolean hasNext() throws IOException {
- if (done) {
- return false;
- }
- /*
- * We're reading data from in, but the head of the stream may be
- * already buffered in buffer, so we have several cases:
- * 1. No newline characters are in the buffer, so we need to copy
- * everything and read another buffer from the stream.
- * 2. An unambiguously terminated line is in buffer, so we just
- * copy to record.
- * 3. Ambiguously terminated line is in buffer, i.e. buffer ends
- * in CR. In this case we copy everything up to CR to record, but
- * we also need to see what follows CR: if it's LF, then we
- * need consume LF as well, so next call to readLine will read
- * from after that.
- * We use a flag prevCharCR to signal if previous character was CR
- * and, if it happens to be at the end of the buffer, delay
- * consuming it until we have a chance to look at the char that
- * follows.
- */
- newlineLength = 0; //length of terminating newline
- prevCharCR = false; //true of prev char was CR
- record.reset();
- int readLength = 0;
- do {
- int startPosn = bufferPosn; //starting from where we left off the last time
- if (bufferPosn >= bufferLength) {
- startPosn = bufferPosn = 0;
- bufferLength = reader.read(inputBuffer);
- if (bufferLength <= 0) {
- if (readLength > 0) {
- record.endRecord();
- recordNumber++;
- return true;
+ while (true) {
+ if (done) {
+ return false;
+ }
+ /*
+ * We're reading data from in, but the head of the stream may be
+ * already buffered in buffer, so we have several cases:
+ * 1. No newline characters are in the buffer, so we need to copy
+ * everything and read another buffer from the stream.
+ * 2. An unambiguously terminated line is in buffer, so we just
+ * copy to record.
+ * 3. Ambiguously terminated line is in buffer, i.e. buffer ends
+ * in CR. In this case we copy everything up to CR to record, but
+ * we also need to see what follows CR: if it's LF, then we
+ * need to consume LF as well, so next call to readLine will read
+ * from after that.
+ * We use a flag prevCharCR to signal if previous character was CR
+ * and, if it happens to be at the end of the buffer, delay
+ * consuming it until we have a chance to look at the char that
+ * follows.
+ */
+ newlineLength = 0; //length of terminating newline
+ prevCharCR = false; //true if prev char was CR
+ record.reset();
+ int readLength = 0;
+ do {
+ int startPosn = bufferPosn; //starting from where we left off the last time
+ if (bufferPosn >= bufferLength) {
+ startPosn = bufferPosn = 0;
+ bufferLength = reader.read(inputBuffer);
+ if (bufferLength <= 0) {
+ if (readLength > 0) {
+ record.endRecord();
+ recordNumber++;
+ return true;
+ }
+ close();
+ return false; //EOF
}
- close();
- return false; //EOF
}
- }
- for (; bufferPosn < bufferLength; ++bufferPosn) { //search for newline
- if (inputBuffer[bufferPosn] == ExternalDataConstants.LF) {
- newlineLength = (prevCharCR) ? 2 : 1;
- ++bufferPosn; // at next invocation proceed from following byte
- break;
+ for (; bufferPosn < bufferLength; ++bufferPosn) { //search for newline
+ if (inputBuffer[bufferPosn] == ExternalDataConstants.LF) {
+ newlineLength = (prevCharCR) ? 2 : 1;
+ ++bufferPosn; // at next invocation proceed from following byte
+ break;
+ }
+ if (prevCharCR) { //CR + notLF, we are at notLF
+ newlineLength = 1;
+ break;
+ }
+ prevCharCR = (inputBuffer[bufferPosn] == ExternalDataConstants.CR);
}
- if (prevCharCR) { //CR + notLF, we are at notLF
- newlineLength = 1;
- break;
+ readLength = bufferPosn - startPosn;
+ if (prevCharCR && newlineLength == 0) {
+ --readLength; //CR at the end of the buffer
+ prevCharCR = false;
}
- prevCharCR = (inputBuffer[bufferPosn] == ExternalDataConstants.CR);
+ if (readLength > 0) {
+ record.append(inputBuffer, startPosn, readLength);
+ }
+ } while (newlineLength == 0);
+ if (nextIsHeader) {
+ nextIsHeader = false;
+ continue;
}
- readLength = bufferPosn - startPosn;
- if (prevCharCR && newlineLength == 0) {
- --readLength; //CR at the end of the buffer
- prevCharCR = false;
- }
- if (readLength > 0) {
- record.append(inputBuffer, startPosn, readLength);
- }
- } while (newlineLength == 0);
- recordNumber++;
- return true;
+ recordNumber++;
+ return true;
+ }
}
}
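
The rewritten hasNext() wraps the old logic in a loop: once a full record has been assembled, a record flagged by notifyNewSource() as the header of a new file is discarded and reading continues. A condensed, runnable sketch of that control flow over a BufferedReader (the real reader works on its own char buffer and record object):

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;

public final class HeaderSkipSketch {
    // Condensed version of the new hasNext() control flow: read one record,
    // and if it was flagged as a header, drop it and read the next one.
    static String nextRecord(BufferedReader in, boolean[] nextIsHeader) throws IOException {
        while (true) {
            String line = in.readLine();
            if (line == null) {
                return null; // EOF
            }
            if (nextIsHeader[0]) {
                nextIsHeader[0] = false; // discard the header silently
                continue;
            }
            return line;
        }
    }

    public static void main(String[] args) throws IOException {
        BufferedReader in = new BufferedReader(new StringReader("id,name\n1,ann\n2,bob\n"));
        boolean[] nextIsHeader = { true }; // set by notifyNewSource() in the real reader
        String rec;
        while ((rec = nextRecord(in, nextIsHeader)) != null) {
            System.out.println(rec); // prints "1,ann" and "2,bob"
        }
    }
}
```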
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/QuotedLineRecordReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/QuotedLineRecordReader.java
index 515e0e5..88964a1 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/QuotedLineRecordReader.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/QuotedLineRecordReader.java
@@ -44,76 +44,82 @@
@Override
public boolean hasNext() throws IOException {
- if (done) {
- return false;
- }
- newlineLength = 0;
- prevCharCR = false;
- prevCharEscape = false;
- record.reset();
- int readLength = 0;
- inQuote = false;
- do {
- int startPosn = bufferPosn;
- if (bufferPosn >= bufferLength) {
- startPosn = bufferPosn = 0;
- bufferLength = reader.read(inputBuffer);
- if (bufferLength <= 0) {
- {
- if (readLength > 0) {
- if (inQuote) {
- throw new IOException("malformed input record ended inside quote");
+ while (true) {
+ if (done) {
+ return false;
+ }
+ newlineLength = 0;
+ prevCharCR = false;
+ prevCharEscape = false;
+ record.reset();
+ int readLength = 0;
+ inQuote = false;
+ do {
+ int startPosn = bufferPosn;
+ if (bufferPosn >= bufferLength) {
+ startPosn = bufferPosn = 0;
+ bufferLength = reader.read(inputBuffer);
+ if (bufferLength <= 0) {
+ {
+ if (readLength > 0) {
+ if (inQuote) {
+ throw new IOException("malformed input record ended inside quote");
+ }
+ record.endRecord();
+ recordNumber++;
+ return true;
}
- record.endRecord();
- recordNumber++;
- return true;
+ close();
+ return false;
}
- close();
- return false;
}
}
- }
- for (; bufferPosn < bufferLength; ++bufferPosn) {
- if (!inQuote) {
- if (inputBuffer[bufferPosn] == ExternalDataConstants.LF) {
- newlineLength = (prevCharCR) ? 2 : 1;
- ++bufferPosn;
- break;
- }
- if (prevCharCR) {
- newlineLength = 1;
- break;
- }
- prevCharCR = (inputBuffer[bufferPosn] == ExternalDataConstants.CR);
- if (inputBuffer[bufferPosn] == quote) {
- if (!prevCharEscape) {
- inQuote = true;
+ for (; bufferPosn < bufferLength; ++bufferPosn) {
+ if (!inQuote) {
+ if (inputBuffer[bufferPosn] == ExternalDataConstants.LF) {
+ newlineLength = (prevCharCR) ? 2 : 1;
+ ++bufferPosn;
+ break;
}
- }
- if (prevCharEscape) {
- prevCharEscape = false;
+ if (prevCharCR) {
+ newlineLength = 1;
+ break;
+ }
+ prevCharCR = (inputBuffer[bufferPosn] == ExternalDataConstants.CR);
+ if (inputBuffer[bufferPosn] == quote) {
+ if (!prevCharEscape) {
+ inQuote = true;
+ }
+ }
+ if (prevCharEscape) {
+ prevCharEscape = false;
+ } else {
+ prevCharEscape = inputBuffer[bufferPosn] == ExternalDataConstants.ESCAPE;
+ }
} else {
+ // only look for next quote
+ if (inputBuffer[bufferPosn] == quote) {
+ if (!prevCharEscape) {
+ inQuote = false;
+ }
+ }
prevCharEscape = inputBuffer[bufferPosn] == ExternalDataConstants.ESCAPE;
}
- } else {
- // only look for next quote
- if (inputBuffer[bufferPosn] == quote) {
- if (!prevCharEscape) {
- inQuote = false;
- }
- }
- prevCharEscape = inputBuffer[bufferPosn] == ExternalDataConstants.ESCAPE;
}
+ readLength = bufferPosn - startPosn;
+ if (prevCharCR && newlineLength == 0) {
+ --readLength;
+ }
+ if (readLength > 0) {
+ record.append(inputBuffer, startPosn, readLength);
+ }
+ } while (newlineLength == 0);
+ if (nextIsHeader) {
+ nextIsHeader = false;
+ continue;
}
- readLength = bufferPosn - startPosn;
- if (prevCharCR && newlineLength == 0) {
- --readLength;
- }
- if (readLength > 0) {
- record.append(inputBuffer, startPosn, readLength);
- }
- } while (newlineLength == 0);
- recordNumber++;
- return true;
+ recordNumber++;
+ return true;
+ }
}
}
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReader.java
index 57ef3ae..7dc5bce 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReader.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReader.java
@@ -23,13 +23,16 @@
import org.apache.asterix.external.api.AsterixInputStream;
import org.apache.asterix.external.api.IRawRecord;
import org.apache.asterix.external.api.IRecordReader;
+import org.apache.asterix.external.api.IStreamNotificationHandler;
import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
import org.apache.asterix.external.input.record.CharArrayRecord;
import org.apache.asterix.external.input.stream.AsterixInputStreamReader;
import org.apache.asterix.external.util.ExternalDataConstants;
import org.apache.asterix.external.util.FeedLogManager;
+import org.apache.commons.lang.mutable.MutableBoolean;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
-public abstract class StreamRecordReader implements IRecordReader<char[]> {
+public abstract class StreamRecordReader implements IRecordReader<char[]>, IStreamNotificationHandler {
protected final AsterixInputStreamReader reader;
protected CharArrayRecord record;
protected char[] inputBuffer;
@@ -37,6 +40,7 @@
protected int bufferPosn = 0;
protected boolean done = false;
protected FeedLogManager feedLogManager;
+ protected MutableBoolean newFile = new MutableBoolean(false);
public StreamRecordReader(AsterixInputStream inputStream) {
this.reader = new AsterixInputStreamReader(inputStream);
@@ -72,7 +76,8 @@
public abstract boolean hasNext() throws IOException;
@Override
- public void setFeedLogManager(FeedLogManager feedLogManager) {
+ public void setFeedLogManager(FeedLogManager feedLogManager) throws HyracksDataException {
+ this.feedLogManager = feedLogManager;
reader.setFeedLogManager(feedLogManager);
}
@@ -85,4 +90,9 @@
public boolean handleException(Throwable th) {
return reader.handleException(th);
}
+
+ @Override
+ public void notifyNewSource() {
+ throw new UnsupportedOperationException();
+ }
}
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/AsterixInputStreamReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/AsterixInputStreamReader.java
index 7e280a5..94333d1 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/AsterixInputStreamReader.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/AsterixInputStreamReader.java
@@ -29,6 +29,7 @@
import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
import org.apache.asterix.external.util.ExternalDataConstants;
import org.apache.asterix.external.util.FeedLogManager;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
public class AsterixInputStreamReader extends Reader {
private AsterixInputStream in;
@@ -56,7 +57,7 @@
in.setController(controller);
}
- public void setFeedLogManager(FeedLogManager feedLogManager) {
+ public void setFeedLogManager(FeedLogManager feedLogManager) throws HyracksDataException {
in.setFeedLogManager(feedLogManager);
}
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/LocalFSInputStream.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/LocalFSInputStream.java
index 00c1eb7..2519177 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/LocalFSInputStream.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/LocalFSInputStream.java
@@ -21,49 +21,39 @@
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
-import java.nio.file.Path;
-import java.util.Map;
import org.apache.asterix.external.api.AsterixInputStream;
import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
import org.apache.asterix.external.util.ExternalDataConstants;
import org.apache.asterix.external.util.FeedLogManager;
import org.apache.asterix.external.util.FileSystemWatcher;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.dataflow.std.file.FileSplit;
import org.apache.log4j.Logger;
public class LocalFSInputStream extends AsterixInputStream {
private static final Logger LOGGER = Logger.getLogger(LocalFSInputStream.class.getName());
- private final Path path;
private final FileSystemWatcher watcher;
private FileInputStream in;
private byte lastByte;
private File currentFile;
- public LocalFSInputStream(final FileSplit[] fileSplits, final IHyracksTaskContext ctx,
- final Map<String, String> configuration, final int partition, final String expression, final boolean isFeed)
- throws IOException {
- this.path = fileSplits[partition].getLocalFile().getFile().toPath();
- this.watcher = new FileSystemWatcher(path, expression, isFeed);
- this.watcher.init();
- }
-
- @Override
- public void setFeedLogManager(FeedLogManager logManager) {
- super.setFeedLogManager(logManager);
- watcher.setFeedLogManager(logManager);
+ public LocalFSInputStream(FileSystemWatcher watcher) throws IOException {
+ this.watcher = watcher;
}
@Override
public void setController(AbstractFeedDataFlowController controller) {
super.setController(controller);
- watcher.setController(controller);
}
@Override
+ public void setFeedLogManager(FeedLogManager logManager) throws HyracksDataException {
+ super.setFeedLogManager(logManager);
+ watcher.setFeedLogManager(logManager);
+ }
+
+ @Override
public void close() throws IOException {
IOException ioe = null;
if (in != null) {
@@ -86,6 +76,9 @@
private void closeFile() throws IOException {
if (in != null) {
+ if (logManager != null) {
+ logManager.endPartition(currentFile.getAbsolutePath());
+ }
try {
in.close();
} finally {
@@ -100,9 +93,18 @@
*/
private boolean advance() throws IOException {
closeFile();
- if (watcher.hasNext()) {
- currentFile = watcher.next();
+ currentFile = watcher.poll();
+ if (currentFile == null) {
+ if (controller != null) {
+ controller.flush();
+ }
+ currentFile = watcher.take();
+ }
+ if (currentFile != null) {
in = new FileInputStream(currentFile);
+ if (notificationHandler != null) {
+ notificationHandler.notifyNewSource();
+ }
return true;
}
return false;
@@ -141,6 +143,7 @@
@Override
public boolean stop() throws Exception {
+ closeFile();
watcher.close();
return true;
}
@@ -165,18 +168,11 @@
advance();
return true;
} catch (Exception e) {
- return false;
- }
- } else {
- try {
- watcher.init();
- } catch (IOException e) {
- LOGGER.warn("Failed to initialize watcher during failure recovery", e);
- return false;
+ LOGGER.warn("An exception was thrown while trying to skip a file", e);
}
}
- return true;
}
+ LOGGER.warn("Failed to recover from failure", th);
return false;
}
}
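
The new advance() consumes files in two steps: a non-blocking poll() first, then a controller flush so buffered tuples are not held back, then a blocking take() that waits on the file system watcher. A minimal sketch of that pattern, with a hypothetical Watcher interface standing in for FileSystemWatcher and java.io.Flushable for the data flow controller:

```java
import java.io.File;
import java.io.Flushable;
import java.io.IOException;

public final class AdvanceSketch {
    // Stand-in for the watcher API introduced in this change.
    interface Watcher {
        File poll() throws IOException; // non-blocking; null when no file is ready
        File take() throws IOException; // blocks until a file arrives or the feed closes
    }

    // Mirrors the new advance(): try the cheap non-blocking path first, flush
    // whatever the controller has buffered so downstream operators see it,
    // then block until the next file shows up.
    static File advance(Watcher watcher, Flushable controller) throws IOException {
        File next = watcher.poll();
        if (next == null) {
            if (controller != null) {
                controller.flush();
            }
            next = watcher.take();
        }
        return next; // null means the watcher was closed, i.e. no more input
    }
}
```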
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/LocalFSInputStreamFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/LocalFSInputStreamFactory.java
index 85d0e41..08fce87 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/LocalFSInputStreamFactory.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/LocalFSInputStreamFactory.java
@@ -20,10 +20,14 @@
import java.io.File;
import java.io.IOException;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;
+import org.apache.asterix.common.cluster.ClusterPartition;
import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.external.api.AsterixInputStream;
import org.apache.asterix.external.api.IInputStreamFactory;
@@ -32,8 +36,9 @@
import org.apache.asterix.external.input.stream.LocalFSInputStream;
import org.apache.asterix.external.util.ExternalDataConstants;
import org.apache.asterix.external.util.ExternalDataUtils;
-import org.apache.asterix.external.util.FeedUtils;
+import org.apache.asterix.external.util.FileSystemWatcher;
import org.apache.asterix.external.util.NodeResolverFactory;
+import org.apache.asterix.om.util.AsterixAppContextInfo;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -49,16 +54,27 @@
protected static INodeResolver nodeResolver;
protected Map<String, String> configuration;
protected FileSplit[] inputFileSplits;
- protected FileSplit[] feedLogFileSplits; // paths where instances of this feed can use as log storage
protected boolean isFeed;
protected String expression;
// transient fields (They don't need to be serialized and transferred)
private transient AlgebricksAbsolutePartitionConstraint constraints;
+ private transient FileSystemWatcher watcher;
@Override
- public AsterixInputStream createInputStream(IHyracksTaskContext ctx, int partition) throws HyracksDataException {
+ public synchronized AsterixInputStream createInputStream(IHyracksTaskContext ctx, int partition)
+ throws HyracksDataException {
+ if (watcher == null) {
+ String nodeName = ctx.getJobletContext().getApplicationContext().getNodeId();
+ ArrayList<Path> inputResources = new ArrayList<>();
+ for (int i = 0; i < inputFileSplits.length; i++) {
+ if (inputFileSplits[i].getNodeName().equals(nodeName)) {
+ inputResources.add(inputFileSplits[i].getLocalFile().getFile().toPath());
+ }
+ }
+ watcher = new FileSystemWatcher(inputResources, expression, isFeed);
+ }
try {
- return new LocalFSInputStream(inputFileSplits, ctx, configuration, partition, expression, isFeed);
+ return new LocalFSInputStream(watcher);
} catch (IOException e) {
throw new HyracksDataException(e);
}
@@ -81,10 +97,6 @@
configureFileSplits(splits);
configurePartitionConstraint();
this.isFeed = ExternalDataUtils.isFeed(configuration) && ExternalDataUtils.keepDataSourceOpen(configuration);
- if (isFeed) {
- feedLogFileSplits = FeedUtils.splitsForAdapter(ExternalDataUtils.getDataverse(configuration),
- ExternalDataUtils.getFeedName(configuration), constraints);
- }
this.expression = configuration.get(ExternalDataConstants.KEY_EXPRESSION);
}
@@ -94,6 +106,7 @@
}
private void configureFileSplits(String[] splits) throws AsterixException {
+ INodeResolver resolver = getNodeResolver();
if (inputFileSplits == null) {
inputFileSplits = new FileSplit[splits.length];
String nodeName;
@@ -106,7 +119,7 @@
throw new AsterixException(
"Invalid path: " + splitPath + "\nUsage- path=\"Host://Absolute File Path\"");
}
- nodeName = trimmedValue.split(":")[0];
+ nodeName = resolver.resolveNode(trimmedValue.split(":")[0]);
nodeLocalPath = trimmedValue.split("://")[1];
FileSplit fileSplit = new FileSplit(nodeName, new FileReference(new File(nodeLocalPath)));
inputFileSplits[count++] = fileSplit;
@@ -115,13 +128,21 @@
}
private void configurePartitionConstraint() throws AsterixException {
- String[] locs = new String[inputFileSplits.length];
- String location;
+ Map<String, ClusterPartition[]> partitions = AsterixAppContextInfo.getInstance().getMetadataProperties()
+ .getNodePartitions();
+ List<String> locs = new ArrayList<>();
for (int i = 0; i < inputFileSplits.length; i++) {
- location = getNodeResolver().resolveNode(inputFileSplits[i].getNodeName());
- locs[i] = location;
+ String location = inputFileSplits[i].getNodeName();
+ if (!locs.contains(location)) {
+ int numOfPartitions = partitions.get(location).length;
+ int j = 0;
+ while (j < numOfPartitions) {
+ locs.add(location);
+ j++;
+ }
+ }
}
- constraints = new AlgebricksAbsolutePartitionConstraint(locs);
+ constraints = new AlgebricksAbsolutePartitionConstraint(locs.toArray(new String[locs.size()]));
}
protected INodeResolver getNodeResolver() {
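
This is the core of "controlling the number of readers": instead of one reader location per file split, each node that owns a split is added once per storage partition it hosts. A standalone sketch of that constraint computation, with a plain map replacing AsterixAppContextInfo's node-partition lookup:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public final class ReaderConstraintSketch {
    // Each node that owns at least one input split gets as many reader slots
    // as it has storage partitions; nodes are never repeated per split.
    static String[] readerLocations(String[] splitNodes, Map<String, Integer> partitionsPerNode) {
        List<String> locs = new ArrayList<>();
        for (String node : splitNodes) {
            if (!locs.contains(node)) {
                int numOfPartitions = partitionsPerNode.get(node);
                for (int j = 0; j < numOfPartitions; j++) {
                    locs.add(node);
                }
            }
        }
        return locs.toArray(new String[locs.size()]);
    }

    public static void main(String[] args) {
        Map<String, Integer> partitions = new HashMap<>();
        partitions.put("nc1", 2);
        partitions.put("nc2", 2);
        // Two splits on nc1 and one on nc2 -> [nc1, nc1, nc2, nc2]: the number
        // of readers per node follows its partition count, not the number of
        // file splits that happen to live there.
        System.out.println(Arrays.toString(
                readerLocations(new String[] { "nc1", "nc1", "nc2" }, partitions)));
    }
}
```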
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java b/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java
index d362201..6ba27d8 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java
@@ -52,7 +52,6 @@
import org.apache.asterix.om.types.ARecordType;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.dataflow.std.file.FileSplit;
public class DataflowControllerProvider {
@@ -60,13 +59,9 @@
@SuppressWarnings({ "rawtypes", "unchecked" })
public static IDataFlowController getDataflowController(ARecordType recordType, IHyracksTaskContext ctx,
int partition, IExternalDataSourceFactory dataSourceFactory, IDataParserFactory dataParserFactory,
- Map<String, String> configuration, boolean indexingOp, boolean isFeed, FileSplit[] feedLogFileSplits)
- throws HyracksDataException {
+ Map<String, String> configuration, boolean indexingOp, boolean isFeed, FeedLogManager feedLogManager)
+ throws HyracksDataException {
try {
- FeedLogManager feedLogManager = null;
- if (isFeed) {
- feedLogManager = FeedUtils.getFeedLogManager(ctx, partition, feedLogFileSplits);
- }
switch (dataSourceFactory.getDataSourceType()) {
case RECORDS:
IRecordReaderFactory<?> recordReaderFactory = (IRecordReaderFactory<?>) dataSourceFactory;
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedLogManager.java b/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedLogManager.java
index fc15d3c..5bb8ec3 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedLogManager.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedLogManager.java
@@ -38,8 +38,7 @@
START, // partition start
END, // partition end
COMMIT, // a record commit within a partition
- SNAPSHOT // an identifier that partitions with identifiers before this one should be
- // ignored
+ SNAPSHOT // an identifier that partitions with identifiers before this one should be ignored
}
public static final String PROGRESS_LOG_FILE_NAME = "progress.log";
@@ -55,6 +54,7 @@
private BufferedWriter errorLogger;
private BufferedWriter recordLogger;
private final StringBuilder stringBuilder = new StringBuilder();
+ private int count = 0;
public FeedLogManager(File file) throws HyracksDataException {
try {
@@ -69,18 +69,22 @@
}
}
- public void endPartition() throws IOException {
+ public synchronized void touch() {
+ count++;
+ }
+
+ public synchronized void endPartition() throws IOException {
logProgress(END_PREFIX + currentPartition);
completed.add(currentPartition);
}
- public void endPartition(String partition) throws IOException {
+ public synchronized void endPartition(String partition) throws IOException {
currentPartition = partition;
logProgress(END_PREFIX + currentPartition);
completed.add(currentPartition);
}
- public void startPartition(String partition) throws IOException {
+ public synchronized void startPartition(String partition) throws IOException {
currentPartition = partition;
logProgress(START_PREFIX + currentPartition);
}
@@ -89,7 +93,7 @@
return Files.exists(dir);
}
- public void open() throws IOException {
+ public synchronized void open() throws IOException {
// read content of logs.
BufferedReader reader = Files.newBufferedReader(
Paths.get(dir.toAbsolutePath().toString() + File.separator + PROGRESS_LOG_FILE_NAME));
@@ -113,13 +117,17 @@
StandardCharsets.UTF_8, StandardOpenOption.APPEND);
}
- public void close() throws IOException {
+ public synchronized void close() throws IOException {
+ count--;
+ if (count > 0) {
+ return;
+ }
progressLogger.close();
errorLogger.close();
recordLogger.close();
}
- public boolean create() throws IOException {
+ public synchronized boolean create() throws IOException {
File f = dir.toFile();
f.mkdirs();
new File(f, PROGRESS_LOG_FILE_NAME).createNewFile();
@@ -128,13 +136,13 @@
return true;
}
- public boolean destroy() throws IOException {
+ public synchronized boolean destroy() throws IOException {
File f = dir.toFile();
FileUtils.deleteDirectory(f);
return true;
}
- public void logProgress(String log) throws IOException {
+ public synchronized void logProgress(String log) throws IOException {
stringBuilder.setLength(0);
stringBuilder.append(log);
stringBuilder.append(ExternalDataConstants.LF);
@@ -142,7 +150,7 @@
progressLogger.flush();
}
- public void logError(String error, Throwable th) throws IOException {
+ public synchronized void logError(String error, Throwable th) throws IOException {
stringBuilder.setLength(0);
stringBuilder.append(error);
stringBuilder.append(ExternalDataConstants.LF);
@@ -152,7 +160,7 @@
errorLogger.flush();
}
- public void logRecord(String record, String errorMessage) throws IOException {
+ public synchronized void logRecord(String record, String errorMessage) throws IOException {
stringBuilder.setLength(0);
stringBuilder.append(record);
stringBuilder.append(ExternalDataConstants.LF);
@@ -166,7 +174,7 @@
return log.substring(PREFIX_SIZE);
}
- public boolean isSplitRead(String split) {
+ public synchronized boolean isSplitRead(String split) {
return completed.contains(split);
}
}
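
The log manager is now shared per node, so open/close must be balanced: touch() counts a new user and close() only releases the writers once the last user is gone. A small sketch of that reference-counting contract (CountingLog is a stand-in; the real class also closes its progress, error, and record writers):

```java
public final class RefCountSketch {
    static final class CountingLog {
        private int count = 0;
        private boolean released = false;
        synchronized void touch() { count++; }
        synchronized void close() {
            count--;
            if (count > 0) {
                return; // other adapters on this node still use the log
            }
            released = true; // last user: safe to release underlying writers
        }
        synchronized boolean isReleased() { return released; }
    }

    public static void main(String[] args) {
        CountingLog log = new CountingLog();
        log.touch(); // adapter for partition 0
        log.touch(); // adapter for partition 1
        log.close(); // partition 0 done -> log stays open
        System.out.println(log.isReleased()); // false
        log.close(); // partition 1 done -> log released
        System.out.println(log.isReleased()); // true
    }
}
```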
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java b/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java
index 5ab41af..502a432 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java
@@ -45,17 +45,16 @@
return dataverseName + File.separator + feedName;
}
- public static FileSplit splitsForAdapter(String dataverseName, String feedName, int partition,
- ClusterPartition[] nodePartitions) {
+ public static FileSplit splitsForAdapter(String dataverseName, String feedName, String nodeName,
+ ClusterPartition partition) {
File relPathFile = new File(prepareDataverseFeedName(dataverseName, feedName));
String storageDirName = AsterixClusterProperties.INSTANCE.getStorageDirectoryName();
- ClusterPartition nodePartition = nodePartitions[0];
String storagePartitionPath = StoragePathUtil.prepareStoragePartitionPath(storageDirName,
- nodePartition.getPartitionId());
- // format: 'storage dir name'/partition_#/dataverse/feed/adapter_#
- File f = new File(storagePartitionPath + File.separator + relPathFile + File.separator
- + StoragePathUtil.ADAPTER_INSTANCE_PREFIX + partition);
- return StoragePathUtil.getFileSplitForClusterPartition(nodePartition, f);
+ partition.getPartitionId());
+ // Note: feed adapter instances in a single node share the feed logger
+ // format: 'storage dir name'/partition_#/dataverse/feed/node
+ File f = new File(storagePartitionPath + File.separator + relPathFile + File.separator + nodeName);
+ return StoragePathUtil.getFileSplitForClusterPartition(partition, f);
}
public static FileSplit[] splitsForAdapter(String dataverseName, String feedName,
@@ -63,22 +62,11 @@
if (partitionConstraints.getPartitionConstraintType() == PartitionConstraintType.COUNT) {
throw new AsterixException("Can't create file splits for adapter with count partitioning constraints");
}
- File relPathFile = new File(prepareDataverseFeedName(dataverseName, feedName));
- String[] locations = null;
- locations = ((AlgebricksAbsolutePartitionConstraint) partitionConstraints).getLocations();
+ String[] locations = ((AlgebricksAbsolutePartitionConstraint) partitionConstraints).getLocations();
List<FileSplit> splits = new ArrayList<FileSplit>();
- String storageDirName = AsterixClusterProperties.INSTANCE.getStorageDirectoryName();
- int i = 0;
for (String nd : locations) {
- // Always get the first partition
- ClusterPartition nodePartition = AsterixClusterProperties.INSTANCE.getNodePartitions(nd)[0];
- String storagePartitionPath = StoragePathUtil.prepareStoragePartitionPath(storageDirName,
- nodePartition.getPartitionId());
- // format: 'storage dir name'/partition_#/dataverse/feed/adapter_#
- File f = new File(storagePartitionPath + File.separator + relPathFile + File.separator
- + StoragePathUtil.ADAPTER_INSTANCE_PREFIX + i);
- splits.add(StoragePathUtil.getFileSplitForClusterPartition(nodePartition, f));
- i++;
+ splits.add(splitsForAdapter(dataverseName, feedName, nd,
+ AsterixClusterProperties.INSTANCE.getNodePartitions(nd)[0]));
}
return splits.toArray(new FileSplit[] {});
}
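
With this change every adapter instance on a node resolves to the same per-node feed log directory. A sketch of the new path layout, assuming the partition_<id> directory naming used by the storage layer; names and values in main are illustrative:

```java
import java.io.File;

public final class FeedLogPathSketch {
    // New layout: one feed log directory per node (shared by all adapter
    // instances on that node) instead of one per adapter instance.
    // format: <storage dir>/partition_<id>/<dataverse>/<feed>/<node>
    static File feedLogDir(String storageDir, int partitionId, String dataverse, String feed, String node) {
        String storagePartitionPath = storageDir + File.separator + "partition_" + partitionId;
        File relPath = new File(dataverse + File.separator + feed);
        return new File(storagePartitionPath + File.separator + relPath + File.separator + node);
    }

    public static void main(String[] args) {
        // e.g. storage/partition_0/feeds_dv/my_feed/nc1
        System.out.println(feedLogDir("storage", 0, "feeds_dv", "my_feed", "nc1"));
    }
}
```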
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/util/FileSystemWatcher.java b/asterix-external-data/src/main/java/org/apache/asterix/external/util/FileSystemWatcher.java
index 4eec348..b15d097 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/util/FileSystemWatcher.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/util/FileSystemWatcher.java
@@ -33,8 +33,9 @@
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.locks.ReentrantLock;
-import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
@@ -48,44 +49,51 @@
private Iterator<File> it;
private final String expression;
private FeedLogManager logManager;
- private final Path path;
+ private final List<Path> paths;
private final boolean isFeed;
private boolean done;
- private File current;
- private AbstractFeedDataFlowController controller;
private final LinkedList<Path> dirs;
+ private final ReentrantLock lock = new ReentrantLock();
- public FileSystemWatcher(Path inputResource, String expression, boolean isFeed) {
+ public FileSystemWatcher(List<Path> inputResources, String expression, boolean isFeed) throws HyracksDataException {
+ this.isFeed = isFeed;
this.keys = isFeed ? new HashMap<WatchKey, Path>() : null;
this.expression = expression;
- this.path = inputResource;
- this.isFeed = isFeed;
+ this.paths = inputResources;
this.dirs = new LinkedList<Path>();
+ if (!isFeed) {
+ init();
+ }
}
- public void setFeedLogManager(FeedLogManager feedLogManager) {
- this.logManager = feedLogManager;
+ public synchronized void setFeedLogManager(FeedLogManager feedLogManager) throws HyracksDataException {
+ if (logManager == null) {
+ this.logManager = feedLogManager;
+ init();
+ }
}
- public void init() throws HyracksDataException {
+ public synchronized void init() throws HyracksDataException {
try {
dirs.clear();
- LocalFileSystemUtils.traverse(files, path.toFile(), expression, dirs);
- it = files.iterator();
- if (isFeed) {
- keys.clear();
- if (watcher != null) {
- try {
- watcher.close();
- } catch (IOException e) {
- LOGGER.warn("Failed to close watcher service", e);
+ for (Path path : paths) {
+ LocalFileSystemUtils.traverse(files, path.toFile(), expression, dirs);
+ it = files.iterator();
+ if (isFeed) {
+ keys.clear();
+ if (watcher != null) {
+ try {
+ watcher.close();
+ } catch (IOException e) {
+ LOGGER.warn("Failed to close watcher service", e);
+ }
}
+ watcher = FileSystems.getDefault().newWatchService();
+ for (Path dirPath : dirs) {
+ register(dirPath);
+ }
+ resume();
}
- watcher = FileSystems.getDefault().newWatchService();
- for (Path path : dirs) {
- register(path);
- }
- resume();
}
} catch (IOException e) {
throw new HyracksDataException(e);
@@ -102,7 +110,7 @@
keys.put(key, dir);
}
- private void resume() throws IOException {
+ private synchronized void resume() throws IOException {
if (logManager == null) {
return;
}
@@ -142,14 +150,12 @@
}
for (WatchEvent<?> event : key.pollEvents()) {
Kind<?> kind = event.kind();
- // TODO: Do something about overflow events
// An overflow event means that some events were dropped
if (kind == StandardWatchEventKinds.OVERFLOW) {
if (LOGGER.isEnabledFor(Level.WARN)) {
LOGGER.warn("Overflow event. Some events might have been missed");
}
// need to read and validate all files.
- //TODO: use btrees for all logs
init();
return;
}
@@ -174,33 +180,90 @@
}
}
}
+ it = files.iterator();
}
- public void close() throws IOException {
+ public synchronized void close() throws IOException {
if (!done) {
if (watcher != null) {
watcher.close();
watcher = null;
}
- if (logManager != null) {
- if (current != null) {
- logManager.startPartition(current.getAbsolutePath());
- logManager.endPartition();
- }
- logManager.close();
- current = null;
- }
done = true;
}
}
- public File next() throws IOException {
- if ((current != null) && (logManager != null)) {
- logManager.startPartition(current.getAbsolutePath());
- logManager.endPartition();
+ // poll is not blocking
+ public synchronized File poll() throws IOException {
+ if (it.hasNext()) {
+ return it.next();
}
- current = it.next();
- return current;
+ if (done || !isFeed) {
+ return null;
+ }
+ files.clear();
+ it = files.iterator();
+ if (keys.isEmpty()) {
+ close();
+ return null;
+ }
+ // Read new Events (Polling first to add all available files)
+ WatchKey key;
+ key = watcher.poll();
+ while (key != null) {
+ handleEvents(key);
+ if (endOfEvents(key)) {
+ close();
+ return null;
+ }
+ key = watcher.poll();
+ }
+ return null;
+ }
+
+ // take is blocking
+ public synchronized File take() throws IOException {
+ File next = poll();
+ if (next != null) {
+ return next;
+ }
+ if (done || !isFeed) {
+ return null;
+ }
+ // No file was found, wait for the filesystem to push events
+ WatchKey key = null;
+ lock.lock();
+ try {
+ while (!it.hasNext()) {
+ try {
+ key = watcher.take();
+ } catch (InterruptedException x) {
+ if (LOGGER.isEnabledFor(Level.WARN)) {
+ LOGGER.warn("Feed Closed");
+ }
+ if (watcher == null) {
+ return null;
+ }
+ continue;
+ } catch (ClosedWatchServiceException e) {
+ if (LOGGER.isEnabledFor(Level.WARN)) {
+ LOGGER.warn("The watcher has exited");
+ }
+ if (watcher == null) {
+ return null;
+ }
+ continue;
+ }
+ handleEvents(key);
+ if (endOfEvents(key)) {
+ return null;
+ }
+ }
+ } finally {
+ lock.unlock();
+ }
+ // files were found, re-create the iterator and move it one step
+ return it.next();
}
private boolean endOfEvents(WatchKey key) {
@@ -213,64 +276,4 @@
}
return false;
}
-
- public boolean hasNext() throws IOException {
- if (it.hasNext()) {
- return true;
- }
- if (done || !isFeed) {
- return false;
- }
- files.clear();
- if (keys.isEmpty()) {
- return false;
- }
- // Read new Events (Polling first to add all available files)
- WatchKey key;
- key = watcher.poll();
- while (key != null) {
- handleEvents(key);
- if (endOfEvents(key)) {
- close();
- return false;
- }
- key = watcher.poll();
- }
- // No file was found, wait for the filesystem to push events
- if (controller != null) {
- controller.flush();
- }
- while (files.isEmpty()) {
- try {
- key = watcher.take();
- } catch (InterruptedException x) {
- if (LOGGER.isEnabledFor(Level.WARN)) {
- LOGGER.warn("Feed Closed");
- }
- if (watcher == null) {
- return false;
- }
- continue;
- } catch (ClosedWatchServiceException e) {
- if (LOGGER.isEnabledFor(Level.WARN)) {
- LOGGER.warn("The watcher has exited");
- }
- if (watcher == null) {
- return false;
- }
- continue;
- }
- handleEvents(key);
- if (endOfEvents(key)) {
- return false;
- }
- }
- // files were found, re-create the iterator and move it one step
- it = files.iterator();
- return it.hasNext();
- }
-
- public void setController(AbstractFeedDataFlowController controller) {
- this.controller = controller;
- }
}
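
poll() and take() ultimately sit on top of java.nio.file.WatchService, which offers the same non-blocking/blocking pair. A self-contained sketch of that underlying API (a timed poll is used in place of take() so the example terminates even when no file ever arrives):

```java
import java.io.IOException;
import java.nio.file.FileSystems;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardWatchEventKinds;
import java.nio.file.WatchEvent;
import java.nio.file.WatchKey;
import java.nio.file.WatchService;
import java.util.concurrent.TimeUnit;

public final class WatchServiceSketch {
    public static void main(String[] args) throws IOException, InterruptedException {
        Path dir = Paths.get(".");
        try (WatchService watcher = FileSystems.getDefault().newWatchService()) {
            dir.register(watcher, StandardWatchEventKinds.ENTRY_CREATE);

            // Non-blocking check, like FileSystemWatcher.poll(): null when idle.
            WatchKey key = watcher.poll();
            System.out.println("poll() -> " + key);

            // Blocking wait, like FileSystemWatcher.take(); a timeout keeps the
            // sketch finite, while the real code loops and also handles
            // InterruptedException and ClosedWatchServiceException.
            key = watcher.poll(5, TimeUnit.SECONDS);
            if (key != null) {
                for (WatchEvent<?> event : key.pollEvents()) {
                    System.out.println(event.kind() + " " + event.context());
                }
                key.reset(); // re-arm the key, or the directory stops reporting
            }
        }
    }
}
```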
diff --git a/asterix-external-data/src/test/java/org/apache/asterix/external/classad/test/ClassAdToADMTest.java b/asterix-external-data/src/test/java/org/apache/asterix/external/classad/test/ClassAdToADMTest.java
index 354aedb..876639d 100644
--- a/asterix-external-data/src/test/java/org/apache/asterix/external/classad/test/ClassAdToADMTest.java
+++ b/asterix-external-data/src/test/java/org/apache/asterix/external/classad/test/ClassAdToADMTest.java
@@ -18,7 +18,10 @@
*/
package org.apache.asterix.external.classad.test;
+import java.nio.file.Path;
import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
@@ -32,8 +35,7 @@
import org.apache.asterix.external.input.record.reader.stream.SemiStructuredRecordReader;
import org.apache.asterix.external.input.stream.LocalFSInputStream;
import org.apache.asterix.external.library.ClassAdParser;
-import org.apache.hyracks.api.io.FileReference;
-import org.apache.hyracks.dataflow.std.file.FileSplit;
+import org.apache.asterix.external.util.FileSystemWatcher;
import junit.framework.Test;
import junit.framework.TestCase;
@@ -69,10 +71,10 @@
ClassAdParser parser = new ClassAdParser(objectPool);
CharArrayLexerSource lexerSource = new CharArrayLexerSource();
for (String path : files) {
- LocalFSInputStream in = new LocalFSInputStream(
- new FileSplit[] { new FileSplit("",
- new FileReference(Paths.get(getClass().getResource(path).toURI()).toFile())) },
- null, null, 0, null, false);
+ List<Path> paths = new ArrayList<>();
+ paths.add(Paths.get(getClass().getResource(path).toURI()));
+ FileSystemWatcher watcher = new FileSystemWatcher(paths, null, false);
+ LocalFSInputStream in = new LocalFSInputStream(watcher);
SemiStructuredRecordReader recordReader = new SemiStructuredRecordReader(in, "[", "]");
Value val = new Value(objectPool);
while (recordReader.hasNext()) {
diff --git a/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java b/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java
index 7da6389..d1a4532 100644
--- a/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java
+++ b/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java
@@ -80,16 +80,16 @@
IAsterixPropertiesProvider propertiesProvider = (IAsterixPropertiesProvider) ((NodeControllerService) ctx
.getJobletContext().getApplicationContext().getControllerService()).getApplicationContext()
.getApplicationObject();
- ClusterPartition[] nodePartitions = propertiesProvider.getMetadataProperties().getNodePartitions()
- .get(nodeId);
+ ClusterPartition nodePartition = propertiesProvider.getMetadataProperties().getNodePartitions()
+ .get(nodeId)[0];
try {
parser = new ADMDataParser(outputType, true);
forwarder = DataflowUtils
.getTupleForwarder(configuration,
FeedUtils.getFeedLogManager(ctx,
FeedUtils.splitsForAdapter(ExternalDataUtils.getDataverse(configuration),
- ExternalDataUtils.getFeedName(configuration), partition,
- nodePartitions)));
+ ExternalDataUtils.getFeedName(configuration), nodeId,
+ nodePartition)));
tb = new ArrayTupleBuilder(1);
} catch (Exception e) {
throw new HyracksDataException(e);
diff --git a/asterix-external-data/src/test/java/org/apache/asterix/external/parser/test/RecordWithMetaTest.java b/asterix-external-data/src/test/java/org/apache/asterix/external/parser/test/RecordWithMetaTest.java
index fc6e725..faff9df 100644
--- a/asterix-external-data/src/test/java/org/apache/asterix/external/parser/test/RecordWithMetaTest.java
+++ b/asterix-external-data/src/test/java/org/apache/asterix/external/parser/test/RecordWithMetaTest.java
@@ -21,6 +21,7 @@
import java.io.File;
import java.io.PrintStream;
import java.nio.file.Files;
+import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
@@ -32,6 +33,7 @@
import org.apache.asterix.external.parser.ADMDataParser;
import org.apache.asterix.external.parser.RecordWithMetadataParser;
import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.external.util.FileSystemWatcher;
import org.apache.asterix.formats.nontagged.AqlADMPrinterFactoryProvider;
import org.apache.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
import org.apache.asterix.om.types.ARecordType;
@@ -42,9 +44,7 @@
import org.apache.hyracks.algebricks.data.IPrinterFactory;
import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.io.FileReference;
import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import org.apache.hyracks.dataflow.std.file.FileSplit;
import org.junit.Assert;
import org.junit.Test;
@@ -79,12 +79,11 @@
int[] pkIndexes = { 0 };
int[] pkIndicators = { 1 };
+ List<Path> paths = new ArrayList<>();
+ paths.add(Paths.get(getClass().getResource("/beer.csv").toURI()));
+ FileSystemWatcher watcher = new FileSystemWatcher(paths, null, false);
// create input stream
- LocalFSInputStream inputStream = new LocalFSInputStream(
- new FileSplit[] { new FileSplit(null,
- new FileReference(Paths.get(getClass().getResource("/beer.csv").toURI()).toFile())) },
- null, null, 0, null, false);
-
+ LocalFSInputStream inputStream = new LocalFSInputStream(watcher);
// create reader record reader
QuotedLineRecordReader lineReader = new QuotedLineRecordReader(true, inputStream,
ExternalDataConstants.DEFAULT_QUOTE);
diff --git a/asterix-om/src/main/java/org/apache/asterix/om/util/AsterixRuntimeUtil.java b/asterix-om/src/main/java/org/apache/asterix/om/util/AsterixRuntimeUtil.java
index 51c3802..0e9aa0c 100644
--- a/asterix-om/src/main/java/org/apache/asterix/om/util/AsterixRuntimeUtil.java
+++ b/asterix-om/src/main/java/org/apache/asterix/om/util/AsterixRuntimeUtil.java
@@ -26,6 +26,8 @@
import java.util.Map;
import java.util.Set;
+import org.apache.hyracks.control.cc.ClusterControllerService;
+
/**
* Utility class for obtaining information on the set of Hyracks NodeController
* processes that are running on a given host.
@@ -54,6 +56,8 @@
}
public static void getNodeControllerMap(Map<InetAddress, Set<String>> map) throws Exception {
- AsterixAppContextInfo.getInstance().getCCApplicationContext().getCCContext().getIPAddressNodeMap(map);
+ ClusterControllerService ccs = (ClusterControllerService) AsterixAppContextInfo.getInstance()
+ .getCCApplicationContext().getControllerService();
+ map.putAll(ccs.getIpAddressNodeNameMap());
}
}