Add flush() to IFrameWriter
This method is intended to be used by feeds to push
frames all the way to storage when needed. Currently, it is
needed in two cases:
1. When the ingestion node is idle and buffered content must be pushed
so that it can be stored.
2. When the ingestion node needs to move the checkpoint ahead
under at-least-once semantics.
Two feeds make use of this function: the filesystem feed and the
Couchbase feed, which is also introduced in this change.
Change-Id: Id862ce9e9b1360864c6976f2aea2137092f51203
Reviewed-on: https://asterix-gerrit.ics.uci.edu/585
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <hubailmor@gmail.com>
diff --git a/asterix-external-data/pom.xml b/asterix-external-data/pom.xml
index 867c96b..7801fd7 100644
--- a/asterix-external-data/pom.xml
+++ b/asterix-external-data/pom.xml
@@ -277,5 +277,15 @@
<version>1.2.2</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>com.couchbase.client</groupId>
+ <artifactId>core-io</artifactId>
+ <version>1.2.3</version>
+ </dependency>
+ <dependency>
+ <groupId>io.reactivex</groupId>
+ <artifactId>rxjava</artifactId>
+ <version>1.0.15</version>
+ </dependency>
</dependencies>
</project>
\ No newline at end of file
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/LookupAdapterFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/LookupAdapterFactory.java
index 866910b..76c8b85 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/LookupAdapterFactory.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/LookupAdapterFactory.java
@@ -29,7 +29,7 @@
import org.apache.asterix.external.indexing.ExternalFileIndexAccessor;
import org.apache.asterix.external.indexing.RecordIdReader;
import org.apache.asterix.external.indexing.RecordIdReaderFactory;
-import org.apache.asterix.external.input.record.reader.LookupReaderFactoryProvider;
+import org.apache.asterix.external.provider.LookupReaderFactoryProvider;
import org.apache.asterix.external.provider.ParserFactoryProvider;
import org.apache.asterix.om.types.ARecordType;
import org.apache.hyracks.api.comm.IFrameWriter;
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataParser.java b/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataParser.java
index a4a5a43..4ad4c4f 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataParser.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataParser.java
@@ -37,6 +37,7 @@
import org.apache.asterix.om.types.AOrderedListType;
import org.apache.asterix.om.types.ARecordType;
import org.apache.asterix.om.types.AUnorderedListType;
+import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
@@ -124,4 +125,18 @@
break;
}
}
+
+ /**
+ * Serializes a value into a reusable buffer, replacing whatever the buffer held before.
+ *
+ * @param serializable the value to serialize
+ * @param buffer the destination; it is reset before anything is written
+ * @param serde the serializer that writes the value
+ * @throws HyracksDataException if the serializer fails
+ */
+ public static <T> void toBytes(T serializable, ArrayBackedValueStorage buffer, ISerializerDeserializer<T> serde)
+ throws HyracksDataException {
+ buffer.reset();
+ serde.serialize(serializable, buffer.getDataOutput());
+ }
}
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/api/IExternalDataSourceFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/api/IExternalDataSourceFactory.java
index 580ac99..370ea93 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/api/IExternalDataSourceFactory.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/api/IExternalDataSourceFactory.java
@@ -59,6 +59,9 @@
* Specify whether the external data source can be indexed
* @return
*/
- public boolean isIndexible();
+ public default boolean isIndexible() {
+ // Not indexible unless a factory overrides this method.
+ return false;
+ }
}
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRawRecord.java b/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRawRecord.java
index 92b500d..fe15244 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRawRecord.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRawRecord.java
@@ -35,11 +35,6 @@
public T get();
/**
- * @return The class of the record objects.
- */
- public Class<?> getRecordClass();
-
- /**
* Resets the object to prepare it for another write operation.
*/
public void reset();
@@ -48,4 +43,11 @@
* @return The size of the valid bytes of the object. If the object can't be serialized, this method returns -1
*/
int size();
+
+ /**
+ * Replaces the value held by this record.
+ *
+ * @param t the new value
+ */
+ public void set(T t);
}
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordReader.java
index 019fe8f..3cf467e 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordReader.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordReader.java
@@ -52,15 +52,21 @@
public IRawRecord<T> next() throws IOException, InterruptedException;
/**
- * @return the class of the java objects representing the records. used to check compatibility between readers and
- * parsers.
- * @throws IOException
- */
- public Class<? extends T> getRecordClass() throws IOException;
-
- /**
* used to stop reader from producing more records.
* @return true if the connection to the external source has been suspended, false otherwise.
*/
public boolean stop();
+
+ /**
+ * Gives the reader a reference to the feed's data flow controller. A reader that is waiting for
+ * more records to be pushed to it can use the controller to flush() the records parsed so far.
+ * <p>
+ * The default implementation ignores the controller. FeedRecordDataFlowController calls this for
+ * every reader it is given, so a reader that does not need the controller must not fail here.
+ *
+ * @param controller the controller driving this reader
+ */
+ public default void setController(IDataFlowController controller) {
+ // Readers that do not need the controller simply ignore it.
+ }
}
\ No newline at end of file
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractFeedDataFlowController.java b/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractFeedDataFlowController.java
index aab4bf6..461eaf9 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractFeedDataFlowController.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractFeedDataFlowController.java
@@ -65,4 +65,14 @@
tupleForwarder.resume();
return true;
}
+
+ /**
+ * Asks the tuple forwarder to push the records it has appended so far to the downstream
+ * writer.
+ *
+ * @throws HyracksDataException if the forwarder fails to push its records
+ */
+ public void flush() throws HyracksDataException {
+ tupleForwarder.flush();
+ }
}
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java b/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
index fe4557d..2a4eaf9 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
@@ -43,6 +43,7 @@
while (recordReader.hasNext()) {
IRawRecord<? extends T> record = recordReader.next();
if (record == null) {
+ flush();
Thread.sleep(interval);
continue;
}
@@ -110,5 +111,6 @@
@Override
public void setRecordReader(IRecordReader<T> recordReader) {
this.recordReader = recordReader;
+ recordReader.setController(this); // gives the reader access to this controller's flush()
}
}
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedTupleForwarder.java b/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedTupleForwarder.java
index d1707665..b46a338 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedTupleForwarder.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedTupleForwarder.java
@@ -80,4 +80,11 @@
FrameUtils.flushFrame(frame.getBuffer(), writer);
}
}
+
+ /**
+ * Pushes the tuples appended so far to the downstream writer via the frame appender.
+ */
+ public void flush() throws HyracksDataException {
+ appender.flush(writer);
+ }
}
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/LookupAdapter.java b/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/LookupAdapter.java
index ba6f83c..a97182c 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/LookupAdapter.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/LookupAdapter.java
@@ -154,9 +154,15 @@
@Override
public void close() throws HyracksDataException {
try {
- appender.flush(writer, true);
+ appender.write(writer, true);
} finally {
writer.close();
}
}
+
+ /** Pushes the tuples appended so far to the writer; unlike close(), the writer stays open. */
+ @Override
+ public void flush() throws HyracksDataException {
+ appender.flush(writer);
+ }
}
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/CollectTransformFeedFrameWriter.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/CollectTransformFeedFrameWriter.java
index 18b6ec0..3fc7ac8 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/CollectTransformFeedFrameWriter.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/CollectTransformFeedFrameWriter.java
@@ -63,6 +63,7 @@
@Override
public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ // always project the first field only. why?
inputFrameTupleAccessor.reset(buffer);
int nTuple = inputFrameTupleAccessor.getTupleCount();
for (int t = 0; t < nTuple; t++) {
@@ -116,4 +117,10 @@
this.downstreamWriter = writer;
}
+
+ /** Pushes the tuples appended to the tuple appender so far to the downstream writer. */
+ @Override
+ public void flush() throws HyracksDataException {
+ tupleAppender.flush(downstreamWriter);
+ }
}
\ No newline at end of file
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/DistributeFeedFrameWriter.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/DistributeFeedFrameWriter.java
index 7367d5a..d314f74 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/DistributeFeedFrameWriter.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/DistributeFeedFrameWriter.java
@@ -156,4 +156,10 @@
public FrameDistributor.DistributionMode getDistributionMode() {
return frameDistributor.getDistributionMode();
}
+
+ /** Delegates the flush to the frame distributor, which decides which collector(s) to flush. */
+ @Override
+ public void flush() throws HyracksDataException {
+ frameDistributor.flush();
+ }
}
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedFrameCache.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedFrameCache.java
index 0a595b7..159bc43 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedFrameCache.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedFrameCache.java
@@ -169,4 +169,10 @@
frameWriter.nextFrame(frame);
}
}
+
+ /** Invoked when the inbox has no pending message; flushes the wrapped frame writer. */
+ @Override
+ public void emptyInbox() throws HyracksDataException {
+ frameWriter.flush();
+ }
}
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedFrameCollector.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedFrameCollector.java
index 0d53524..ef4b87d 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedFrameCollector.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedFrameCollector.java
@@ -157,4 +157,18 @@
return connectionId.toString().hashCode();
}
+
+ /** Invoked when the inbox has no pending message; flushes the downstream writer. */
+ @Override
+ public void emptyInbox() throws HyracksDataException {
+ flush();
+ }
+
+ /**
+ * Flushes the downstream frame writer. Reached both from emptyInbox() and from
+ * FrameDistributor.flush(); the method is synchronized on this collector.
+ */
+ public synchronized void flush() throws HyracksDataException {
+ frameWriter.flush();
+ }
}
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedFrameHandlers.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedFrameHandlers.java
index 6ad00f1..3c45a20 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedFrameHandlers.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedFrameHandlers.java
@@ -272,6 +272,11 @@
};
}
+ @Override
+ public void emptyInbox() throws HyracksDataException {
+ // Intentionally a no-op.
+ }
+
}
@Override
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedRuntimeInputHandler.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedRuntimeInputHandler.java
index 3a46b1a..a00e732 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedRuntimeInputHandler.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedRuntimeInputHandler.java
@@ -112,7 +112,6 @@
this.mBuffer = MonitoredBuffer.getMonitoredBuffer(ctx, this, coreOperator, fta, recordDesc,
feedManager.getFeedMetricCollector(), connectionId, runtimeId, exceptionHandler, frameEventCallback,
nPartitions, fpa);
- this.mBuffer.start();
this.throttlingEnabled = false;
}
@@ -414,6 +413,7 @@
@Override
public void open() throws HyracksDataException {
coreOperator.open();
+ mBuffer.start(); // now started on open() instead of in the constructor
}
@Override
@@ -465,4 +465,16 @@
public void setBufferingEnabled(boolean bufferingEnabled) {
this.bufferingEnabled = bufferingEnabled;
}
+
+ /**
+ * Forwards a flush request to the core operator. Requests that arrive while the handler is not
+ * in {@code Mode.PROCESS} are ignored.
+ */
+ @Override
+ public void flush() throws HyracksDataException {
+ if (mode != Mode.PROCESS) {
+ return;
+ }
+ coreOperator.flush();
+ }
}
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FrameDistributor.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FrameDistributor.java
index 543efb2..85308df 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FrameDistributor.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FrameDistributor.java
@@ -25,8 +25,8 @@
import java.util.logging.Level;
import java.util.logging.Logger;
-import org.apache.asterix.external.feed.api.IFeedMemoryManager;
import org.apache.asterix.external.feed.api.IFeedMemoryComponent.Type;
+import org.apache.asterix.external.feed.api.IFeedMemoryManager;
import org.apache.asterix.external.feed.api.IFeedRuntime.FeedRuntimeType;
import org.apache.asterix.external.feed.management.FeedId;
import org.apache.hyracks.api.comm.IFrameWriter;
@@ -358,4 +358,19 @@
return fta;
}
+
+ /**
+ * Flushes the registered collector when frames are distributed to a single collector; in any
+ * other distribution mode this is currently a no-op.
+ *
+ * @throws HyracksDataException if the collector fails to flush
+ */
+ public void flush() throws HyracksDataException {
+ // Nothing to flush outside SINGLE mode, or before any collector has registered
+ // (iterator().next() would otherwise throw NoSuchElementException).
+ if (distributionMode != DistributionMode.SINGLE || registeredCollectors.isEmpty()) {
+ return;
+ }
+ registeredCollectors.values().iterator().next().flush();
+ }
}
\ No newline at end of file
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/MessageReceiver.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/MessageReceiver.java
index abeb994..f69c552 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/MessageReceiver.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/MessageReceiver.java
@@ -25,6 +25,7 @@
import java.util.logging.Logger;
import org.apache.asterix.external.feed.api.IMessageReceiver;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
public abstract class MessageReceiver<T> implements IMessageReceiver<T> {
@@ -74,12 +75,17 @@
this.inbox = messageReceiver.inbox;
this.messageReceiver = messageReceiver;
}
+ // TODO: this should handle exceptions better
@Override
public void run() {
while (true) {
try {
- T message = inbox.take();
+ T message = inbox.poll();
+ if (message == null) {
+ messageReceiver.emptyInbox();
+ message = inbox.take();
+ }
messageReceiver.processMessage(message);
} catch (InterruptedException e) {
e.printStackTrace();
@@ -108,4 +114,12 @@
}
}
+
+ /**
+ * Called by the receiving thread when the inbox has no pending message, right before it blocks
+ * waiting for the next one.
+ *
+ * @throws HyracksDataException if handling the idle inbox fails
+ */
+ public abstract void emptyInbox() throws HyracksDataException;
}
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/MonitoredBuffer.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/MonitoredBuffer.java
index db38edf..b93410a 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/MonitoredBuffer.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/MonitoredBuffer.java
@@ -393,4 +393,10 @@
return storageTimeTrackingRateTask;
}
+
+ /** Invoked when the inbox has no pending message; asks the input handler to flush. */
+ @Override
+ public void emptyInbox() throws HyracksDataException {
+ inputHandler.flush();
+ }
}
\ No newline at end of file
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/StorageSideMonitoredBuffer.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/StorageSideMonitoredBuffer.java
index 1f9551d..9db930e 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/StorageSideMonitoredBuffer.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/StorageSideMonitoredBuffer.java
@@ -38,8 +38,7 @@
public class StorageSideMonitoredBuffer extends MonitoredBuffer {
- private static final long STORAGE_TIME_TRACKING_FREQUENCY = 5000; // 10
- // seconds
+ private static final long STORAGE_TIME_TRACKING_FREQUENCY = 5000; // NOTE(review): unit not stated here (removed comment said "10 seconds", which did not match 5000); presumably milliseconds - confirm
private boolean ackingEnabled;
private final boolean timeTrackingEnabled;
@@ -207,5 +206,4 @@
protected boolean reportInflowRate() {
return false;
}
-
}
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/FileOffsetIndexer.java b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/FileOffsetIndexer.java
index 932aece..0fbbd2e 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/FileOffsetIndexer.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/FileOffsetIndexer.java
@@ -22,7 +22,7 @@
import org.apache.asterix.external.api.IExternalIndexer;
import org.apache.asterix.external.api.IRecordReader;
-import org.apache.asterix.external.input.record.reader.HDFSRecordReader;
+import org.apache.asterix.external.input.record.reader.hdfs.HDFSRecordReader;
import org.apache.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
import org.apache.asterix.om.base.AMutableInt32;
import org.apache.asterix.om.base.AMutableInt64;
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/RecordColumnarIndexer.java b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/RecordColumnarIndexer.java
index 14235c00..9fa26f0 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/RecordColumnarIndexer.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/RecordColumnarIndexer.java
@@ -22,7 +22,7 @@
import org.apache.asterix.external.api.IExternalIndexer;
import org.apache.asterix.external.api.IRecordReader;
-import org.apache.asterix.external.input.record.reader.HDFSRecordReader;
+import org.apache.asterix.external.input.record.reader.hdfs.HDFSRecordReader;
import org.apache.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
import org.apache.asterix.om.base.AMutableInt32;
import org.apache.asterix.om.base.AMutableInt64;
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java
index 7e9fdcb..aa35383 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java
@@ -29,8 +29,8 @@
import org.apache.asterix.external.api.IRecordReaderFactory;
import org.apache.asterix.external.indexing.ExternalFile;
import org.apache.asterix.external.indexing.IndexingScheduler;
-import org.apache.asterix.external.input.record.reader.HDFSRecordReader;
-import org.apache.asterix.external.input.stream.HDFSInputStreamProvider;
+import org.apache.asterix.external.input.record.reader.hdfs.HDFSRecordReader;
+import org.apache.asterix.external.input.stream.provider.HDFSInputStreamProvider;
import org.apache.asterix.external.provider.ExternalIndexerProvider;
import org.apache.asterix.external.util.ExternalDataUtils;
import org.apache.asterix.external.util.HDFSUtils;
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/CharArrayRecord.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/CharArrayRecord.java
index fd5c397..365aeb0 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/CharArrayRecord.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/CharArrayRecord.java
@@ -18,6 +18,7 @@
*/
package org.apache.asterix.external.input.record;
+import java.nio.CharBuffer;
import java.util.Arrays;
import org.apache.asterix.external.api.IRawRecord;
@@ -99,8 +100,36 @@
size++;
}
+ /**
+ * Appends every character of {@code recordBuffer} to the end of this record.
+ *
+ * @param recordBuffer the characters to append
+ */
+ public void append(char[] recordBuffer) {
+ ensureCapacity(size + recordBuffer.length);
+ System.arraycopy(recordBuffer, 0, value, size, recordBuffer.length);
+ size += recordBuffer.length;
+ }
+
+ /**
+ * Appends the characters remaining in {@code chars} (from its position up to its limit) to the
+ * end of this record, advancing the buffer's position past them.
+ *
+ * @param chars the characters to append
+ */
+ public void append(CharBuffer chars) {
+ // Use remaining(), not limit(): when the position is non-zero, reading limit() characters
+ // would run past the limit and throw BufferUnderflowException.
+ int length = chars.remaining();
+ ensureCapacity(size + length);
+ chars.get(value, size, length);
+ size += length;
+ }
+
@Override
- public Class<char[]> getRecordClass() {
- return char[].class;
+ public void set(char[] value) {
+ // Adopts the given array itself (no copy); all of it is treated as valid content.
+ this.value = value;
+ this.size = value.length;
}
}
\ No newline at end of file
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/GenericRecord.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/GenericRecord.java
index 365bc22..f405b82 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/GenericRecord.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/GenericRecord.java
@@ -47,10 +47,6 @@
}
@Override
- public Class<?> getRecordClass() {
- return record.getClass();
- }
-
public void set(T record) {
this.record = record;
}
@@ -58,5 +54,4 @@
@Override
public void reset() {
}
-
}
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/RecordWithMetadata.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/RecordWithMetadata.java
new file mode 100644
index 0000000..d5640a6
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/RecordWithMetadata.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record;
+
+import java.io.IOException;
+
+import org.apache.asterix.external.api.IDataParser;
+import org.apache.asterix.external.api.IRawRecord;
+import org.apache.asterix.external.util.ExternalDataUtils;
+import org.apache.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
+import org.apache.asterix.om.base.ABoolean;
+import org.apache.asterix.om.base.ADouble;
+import org.apache.asterix.om.base.AInt32;
+import org.apache.asterix.om.base.AInt64;
+import org.apache.asterix.om.base.AMutableDouble;
+import org.apache.asterix.om.base.AMutableInt32;
+import org.apache.asterix.om.base.AMutableInt64;
+import org.apache.asterix.om.base.AMutableString;
+import org.apache.asterix.om.base.AString;
+import org.apache.asterix.om.types.BuiltinType;
+import org.apache.asterix.om.types.IAType;
+import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
+import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
+import org.apache.hyracks.dataflow.common.data.parsers.IValueParserFactory;
+
+/**
+ * Wraps a raw record together with a fixed number of metadata fields. Each metadata field is
+ * kept in serialized form in its own {@link ArrayBackedValueStorage}; every setter replaces the
+ * field's previous content with exactly the bytes produced by the matching serializer (see
+ * {@link IDataParser#toBytes}, which resets the buffer before writing).
+ *
+ * @param <T> the type of the wrapped raw record's value
+ */
+public class RecordWithMetadata<T> {
+
+ private final ArrayBackedValueStorage[] fieldValueBuffers;
+ // NOTE(review): filled in by the constructor but never read in this class; confirm it is still needed.
+ private final IValueParserFactory[] valueParserFactories;
+ private IRawRecord<T> record;
+
+ // Serializers, each paired with a reusable mutable value
+ @SuppressWarnings("unchecked")
+ private final ISerializerDeserializer<ADouble> doubleSerde = AqlSerializerDeserializerProvider.INSTANCE
+ .getSerializerDeserializer(BuiltinType.ADOUBLE);
+ private final AMutableDouble mutableDouble = new AMutableDouble(0);
+ @SuppressWarnings("unchecked")
+ private final ISerializerDeserializer<AString> stringSerde = AqlSerializerDeserializerProvider.INSTANCE
+ .getSerializerDeserializer(BuiltinType.ASTRING);
+ private final AMutableString mutableString = new AMutableString(null);
+ @SuppressWarnings("unchecked")
+ private final ISerializerDeserializer<AInt32> int32Serde = AqlSerializerDeserializerProvider.INSTANCE
+ .getSerializerDeserializer(BuiltinType.AINT32);
+ private final AMutableInt32 mutableInt = new AMutableInt32(0);
+ @SuppressWarnings("unchecked")
+ protected ISerializerDeserializer<AInt64> int64Serde = AqlSerializerDeserializerProvider.INSTANCE
+ .getSerializerDeserializer(BuiltinType.AINT64);
+ private final AMutableInt64 mutableLong = new AMutableInt64(0);
+ @SuppressWarnings("unchecked")
+ private final ISerializerDeserializer<ABoolean> booleanSerde = AqlSerializerDeserializerProvider.INSTANCE
+ .getSerializerDeserializer(BuiltinType.ABOOLEAN);
+
+ /**
+ * Creates a wrapper that carries no metadata fields.
+ *
+ * @param recordClass currently unused
+ */
+ public RecordWithMetadata(Class<? extends T> recordClass) {
+ // Empty (rather than null) arrays keep reset() safe when there is no metadata.
+ this(new IAType[0], recordClass);
+ }
+
+ /**
+ * Creates a wrapper with one metadata field per entry of {@code metaTypes}.
+ *
+ * @param metaTypes the types of the metadata fields, in field-index order
+ * @param recordClass currently unused
+ */
+ public RecordWithMetadata(IAType[] metaTypes, Class<? extends T> recordClass) {
+ int n = metaTypes.length;
+ this.fieldValueBuffers = new ArrayBackedValueStorage[n];
+ this.valueParserFactories = new IValueParserFactory[n];
+ for (int i = 0; i < n; i++) {
+ fieldValueBuffers[i] = new ArrayBackedValueStorage();
+ valueParserFactories[i] = ExternalDataUtils.getParserFactory(metaTypes[i].getTypeTag());
+ }
+ }
+
+ public IRawRecord<T> getRecord() {
+ return record;
+ }
+
+ /**
+ * @return the buffer holding the serialized value of metadata field {@code index}
+ */
+ public ArrayBackedValueStorage getMetadata(int index) {
+ return fieldValueBuffers[index];
+ }
+
+ public void setRecord(IRawRecord<T> record) {
+ this.record = record;
+ }
+
+ /**
+ * Resets the wrapped record, if one has been set, and clears every metadata buffer.
+ */
+ public void reset() {
+ if (record != null) {
+ record.reset();
+ }
+ for (ArrayBackedValueStorage fieldBuffer : fieldValueBuffers) {
+ fieldBuffer.reset();
+ }
+ }
+
+ /** Serializes {@code value} as an int32 into metadata field {@code index}. */
+ public void setMetadata(int index, int value) throws IOException {
+ mutableInt.setValue(value);
+ IDataParser.toBytes(mutableInt, fieldValueBuffers[index], int32Serde);
+ }
+
+ /** Serializes {@code value} as an int64 into metadata field {@code index}. */
+ public void setMetadata(int index, long value) throws IOException {
+ mutableLong.setValue(value);
+ IDataParser.toBytes(mutableLong, fieldValueBuffers[index], int64Serde);
+ }
+
+ /** Serializes {@code value} as a string into metadata field {@code index}. */
+ public void setMetadata(int index, String value) throws IOException {
+ mutableString.setValue(value);
+ IDataParser.toBytes(mutableString, fieldValueBuffers[index], stringSerde);
+ }
+
+ /** Serializes {@code value} as a boolean into metadata field {@code index}. */
+ public void setMeta(int index, boolean value) throws IOException {
+ IDataParser.toBytes(value ? ABoolean.TRUE : ABoolean.FALSE, fieldValueBuffers[index], booleanSerde);
+ }
+
+ /** Serializes {@code value} as a double into metadata field {@code index}. */
+ public void setMeta(int index, double value) throws IOException {
+ mutableDouble.setValue(value);
+ IDataParser.toBytes(mutableDouble, fieldValueBuffers[index], doubleSerde);
+ }
+}
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/couchbase/CouchbaseReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/couchbase/CouchbaseReader.java
new file mode 100644
index 0000000..895af1b
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/couchbase/CouchbaseReader.java
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader.couchbase;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.CharBuffer;
+import java.nio.charset.CharsetDecoder;
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+import java.util.concurrent.ArrayBlockingQueue;
+
+import org.apache.asterix.external.api.IDataFlowController;
+import org.apache.asterix.external.api.IRawRecord;
+import org.apache.asterix.external.api.IRecordReader;
+import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
+import org.apache.asterix.external.input.record.CharArrayRecord;
+import org.apache.asterix.external.input.record.GenericRecord;
+import org.apache.asterix.external.input.record.RecordWithMetadata;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.om.types.BuiltinType;
+import org.apache.asterix.om.types.IAType;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.log4j.Logger;
+
+import com.couchbase.client.core.CouchbaseCore;
+import com.couchbase.client.core.dcp.BucketStreamAggregator;
+import com.couchbase.client.core.dcp.BucketStreamAggregatorState;
+import com.couchbase.client.core.dcp.BucketStreamState;
+import com.couchbase.client.core.dcp.BucketStreamStateUpdatedEvent;
+import com.couchbase.client.core.env.DefaultCoreEnvironment;
+import com.couchbase.client.core.env.DefaultCoreEnvironment.Builder;
+import com.couchbase.client.core.message.cluster.CloseBucketRequest;
+import com.couchbase.client.core.message.cluster.OpenBucketRequest;
+import com.couchbase.client.core.message.cluster.SeedNodesRequest;
+import com.couchbase.client.core.message.dcp.DCPRequest;
+import com.couchbase.client.core.message.dcp.MutationMessage;
+import com.couchbase.client.core.message.dcp.RemoveMessage;
+import com.couchbase.client.core.message.dcp.SnapshotMarkerMessage;
+import com.couchbase.client.deps.io.netty.buffer.ByteBuf;
+
+import rx.functions.Action1;
+
+/**
+ * Record reader that streams the documents of a Couchbase bucket over DCP.
+ * <p>
+ * {@link #configure(Map)} connects to the cluster, opens the bucket and starts a push thread that streams the
+ * vbuckets assigned to this reader. Every {@link MutationMessage} is put on a bounded queue; {@link #next()} takes
+ * messages from that queue and exposes the document content as characters together with ten metadata fields
+ * (see {@code metaTypes}). Remove messages are only logged.
+ */
+public class CouchbaseReader implements IRecordReader<RecordWithMetadata<char[]>> {
+
+    // Sentinel used to wake up a consumer blocked in next(); next() returns null when it takes it.
+    private static final MutationMessage POISON_PILL = new MutationMessage((short) 0, null, null, 0, 0L, 0L, 0, 0, 0L,
+            null);
+    // metaTypes = {key(string), bucket(string), vbucket(int32), seq(long), cas(long), creationTime(long),
+    // expiration(int32), flags(int32), revSeqNumber(long), lockTime(int32)}
+    private static final IAType[] metaTypes = new IAType[] { BuiltinType.ASTRING, BuiltinType.ASTRING,
+            BuiltinType.AINT32, BuiltinType.AINT64, BuiltinType.AINT64, BuiltinType.AINT64, BuiltinType.AINT32,
+            BuiltinType.AINT32, BuiltinType.AINT64, BuiltinType.AINT32 };
+    private static final Logger LOGGER = Logger.getLogger(CouchbaseReader.class);
+    private final String feedName;
+    private final short[] vbuckets;
+    private final String bucket;
+    private final String password;
+    private final String[] couchbaseNodes;
+    // Filled by the push thread, drained by next().
+    private final ArrayBlockingQueue<MutationMessage> messages;
+    private final GenericRecord<RecordWithMetadata<char[]>> record;
+    private final RecordWithMetadata<char[]> recordWithMetadata;
+    private final CharArrayRecord value;
+    private final CharsetDecoder decoder = StandardCharsets.UTF_8.newDecoder();
+    private final ByteBuffer bytes = ByteBuffer.allocateDirect(ExternalDataConstants.DEFAULT_BUFFER_SIZE);
+    private final CharBuffer chars = CharBuffer.allocate(ExternalDataConstants.DEFAULT_BUFFER_SIZE);
+    private AbstractFeedDataFlowController controller;
+    private Builder builder;
+    private BucketStreamAggregator bucketStreamAggregator;
+    private CouchbaseCore core;
+    private DefaultCoreEnvironment env;
+    private Thread pushThread;
+    // Written by the push thread (through close()) and by stop(), read by the consumer in hasNext(): must be
+    // volatile so the consumer observes the update.
+    private volatile boolean done = false;
+
+    public CouchbaseReader(String feedName, String bucket, String password, String[] couchbaseNodes, short[] vbuckets,
+            int queueSize) throws HyracksDataException {
+        this.feedName = feedName;
+        this.bucket = bucket;
+        this.password = password;
+        this.couchbaseNodes = couchbaseNodes;
+        this.vbuckets = vbuckets;
+        this.recordWithMetadata = new RecordWithMetadata<char[]>(metaTypes, char[].class);
+        this.messages = new ArrayBlockingQueue<MutationMessage>(queueSize);
+        this.value = new CharArrayRecord();
+        recordWithMetadata.setRecord(value);
+        this.record = new GenericRecord<RecordWithMetadata<char[]>>(recordWithMetadata);
+    }
+
+    /**
+     * Marks the reader as done. The first call also offers {@link #POISON_PILL} without blocking, so a consumer
+     * that is blocked in {@link #next()} wakes up instead of waiting forever.
+     */
+    @Override
+    public void close() {
+        if (!done) {
+            done = true;
+            messages.offer(POISON_PILL);
+        }
+    }
+
+    /**
+     * Builds the Couchbase core environment (DCP enabled), creates the stream aggregator and connects.
+     * NOTE(review): env/core are never shut down in this class — confirm they are released elsewhere.
+     */
+    @Override
+    public void configure(Map<String, String> configuration) throws Exception {
+        this.builder = DefaultCoreEnvironment.builder().dcpEnabled(CouchbaseReaderFactory.DCP_ENABLED)
+                .autoreleaseAfter(CouchbaseReaderFactory.AUTO_RELEASE_AFTER_MILLISECONDS);
+        this.env = builder.build();
+        this.core = new CouchbaseCore(env);
+        this.bucketStreamAggregator = new BucketStreamAggregator(feedName, core, bucket);
+        connect();
+    }
+
+    /**
+     * Seeds the cluster nodes, opens the bucket (each step waits up to the factory timeout) and starts the push
+     * thread, named after the feed.
+     */
+    private void connect() {
+        core.send(new SeedNodesRequest(couchbaseNodes))
+                .timeout(CouchbaseReaderFactory.TIMEOUT, CouchbaseReaderFactory.TIME_UNIT).toBlocking().single();
+        core.send(new OpenBucketRequest(bucket, password))
+                .timeout(CouchbaseReaderFactory.TIMEOUT, CouchbaseReaderFactory.TIME_UNIT).toBlocking().single();
+        this.pushThread = new Thread(new Runnable() {
+            @Override
+            public void run() {
+                CouchbaseReader.this.run(bucketStreamAggregator);
+            }
+        }, feedName);
+        pushThread.start();
+    }
+
+    /**
+     * Body of the push thread: streams the assigned vbuckets starting at sequence number 0.
+     * Snapshot markers update the stream state, mutations are queued (blocking while the queue is full) and
+     * removals are logged. If the stream fails, the reader is closed so that the consumer is released, and the
+     * failure is rethrown.
+     */
+    private void run(BucketStreamAggregator bucketStreamAggregator) {
+        final BucketStreamAggregatorState state = new BucketStreamAggregatorState();
+        for (short vbucket : vbuckets) {
+            state.put(new BucketStreamState(vbucket, 0, 0, 0xffffffff, 0, 0xffffffff));
+        }
+        try {
+            bucketStreamAggregator.feed(state).toBlocking().forEach(new Action1<DCPRequest>() {
+                @Override
+                public void call(final DCPRequest dcpRequest) {
+                    try {
+                        if (dcpRequest instanceof SnapshotMarkerMessage) {
+                            SnapshotMarkerMessage message = (SnapshotMarkerMessage) dcpRequest;
+                            final BucketStreamState oldState = state.get(message.partition());
+                            state.put(new BucketStreamState(message.partition(), oldState.vbucketUUID(),
+                                    message.endSequenceNumber(), oldState.endSequenceNumber(),
+                                    message.endSequenceNumber(), oldState.snapshotEndSequenceNumber()));
+                        } else if (dcpRequest instanceof MutationMessage) {
+                            messages.put((MutationMessage) dcpRequest);
+                        } else if (dcpRequest instanceof RemoveMessage) {
+                            RemoveMessage message = (RemoveMessage) dcpRequest;
+                            LOGGER.info(message.key() + " was deleted.");
+                        }
+                    } catch (Throwable th) {
+                        LOGGER.error(th);
+                    }
+                }
+            });
+        } catch (Throwable th) {
+            if (th.getCause() instanceof InterruptedException) {
+                LOGGER.warn("dcp thread was interrupted", th);
+            } else {
+                LOGGER.error("dcp stream failed", th);
+            }
+            // Without this the consumer could stay blocked in next() forever once the stream is gone.
+            synchronized (this) {
+                close();
+                notifyAll();
+            }
+            throw th;
+        }
+    }
+
+    @Override
+    public boolean hasNext() throws Exception {
+        return !done;
+    }
+
+    /**
+     * Takes the next mutation from the queue and fills the reusable record with its content and metadata.
+     * When the queue is empty, the controller is flushed first so that records already produced are pushed
+     * downstream before this call blocks.
+     *
+     * @return the reusable record, or {@code null} once {@link #POISON_PILL} is taken (reader stopped/closed)
+     */
+    @Override
+    public IRawRecord<RecordWithMetadata<char[]>> next() throws IOException, InterruptedException {
+        if (messages.isEmpty() && controller != null) {
+            controller.flush();
+        }
+        MutationMessage message = messages.take();
+        if (message == POISON_PILL) {
+            return null;
+        }
+        String key = message.key();
+        int vbucket = message.partition();
+        long seq = message.bySequenceNumber();
+        String messageBucket = message.bucket();
+        long cas = message.cas();
+        long creationTime = message.creationTime();
+        int expiration = message.expiration();
+        int flags = message.flags();
+        long revSeqNumber = message.revisionSequenceNumber();
+        int lockTime = message.lockTime();
+        recordWithMetadata.reset();
+        // Field indexes follow the order of metaTypes.
+        recordWithMetadata.setMetadata(0, key);
+        recordWithMetadata.setMetadata(1, messageBucket);
+        recordWithMetadata.setMetadata(2, vbucket);
+        recordWithMetadata.setMetadata(3, seq);
+        recordWithMetadata.setMetadata(4, cas);
+        recordWithMetadata.setMetadata(5, creationTime);
+        recordWithMetadata.setMetadata(6, expiration);
+        recordWithMetadata.setMetadata(7, flags);
+        recordWithMetadata.setMetadata(8, revSeqNumber);
+        recordWithMetadata.setMetadata(9, lockTime);
+        CouchbaseReader.set(message.content(), decoder, bytes, chars, value);
+        return record;
+    }
+
+    /**
+     * Marks the reader as done, sends a close request for the bucket and puts {@link #POISON_PILL} on the queue
+     * (blocking while the queue is full) to release the consumer.
+     *
+     * @return {@code false} if interrupted while queuing the poison pill
+     */
+    @Override
+    public boolean stop() {
+        done = true;
+        core.send(new CloseBucketRequest(bucket)).toBlocking();
+        try {
+            messages.put(CouchbaseReader.POISON_PILL);
+        } catch (InterruptedException e) {
+            LOGGER.warn(e);
+            return false;
+        }
+        return true;
+    }
+
+    @Override
+    public void setController(IDataFlowController controller) {
+        this.controller = (AbstractFeedDataFlowController) controller;
+    }
+
+    /**
+     * Decodes the readable bytes of {@code content} (from its reader index up to its writer index) with
+     * {@code decoder} into {@code record}, using {@code bytes} and {@code chars} as scratch buffers, and then
+     * ends the record.
+     * <p>
+     * Only bytes the decoder actually consumed are skipped, so a multi-byte character split across two chunks is
+     * decoded together with the next chunk instead of being lost. A byte the decoder cannot consume at the start
+     * of a chunk (malformed or truncated input) is dropped so the loop always makes progress.
+     */
+    public static void set(ByteBuf content, CharsetDecoder decoder, ByteBuffer bytes, CharBuffer chars,
+            CharArrayRecord record) {
+        int position = content.readerIndex();
+        int limit = content.writerIndex();
+        while (position < limit) {
+            bytes.clear();
+            chars.clear();
+            // Bound the copy by the writer index: bytes between writerIndex and capacity are not content.
+            int readable = limit - position;
+            if (readable < bytes.capacity()) {
+                bytes.limit(readable);
+            }
+            content.getBytes(position, bytes);
+            bytes.flip();
+            decoder.decode(bytes, chars, false);
+            int consumed = bytes.position();
+            position += consumed > 0 ? consumed : 1;
+            chars.flip();
+            record.append(chars);
+        }
+        record.endRecord();
+    }
+}
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/couchbase/CouchbaseReaderFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/couchbase/CouchbaseReaderFactory.java
new file mode 100644
index 0000000..b9b6f65
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/couchbase/CouchbaseReaderFactory.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader.couchbase;
+
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.external.api.IRecordReader;
+import org.apache.asterix.external.api.IRecordReaderFactory;
+import org.apache.asterix.external.input.record.RecordWithMetadata;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.external.util.ExternalDataUtils;
+import org.apache.asterix.om.util.AsterixClusterProperties;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+
+import com.couchbase.client.core.CouchbaseCore;
+import com.couchbase.client.core.config.CouchbaseBucketConfig;
+import com.couchbase.client.core.env.DefaultCoreEnvironment;
+import com.couchbase.client.core.env.DefaultCoreEnvironment.Builder;
+import com.couchbase.client.core.message.cluster.CloseBucketRequest;
+import com.couchbase.client.core.message.cluster.GetClusterConfigRequest;
+import com.couchbase.client.core.message.cluster.GetClusterConfigResponse;
+import com.couchbase.client.core.message.cluster.OpenBucketRequest;
+import com.couchbase.client.core.message.cluster.SeedNodesRequest;
+
+import rx.functions.Func1;
+
+/**
+ * Factory for {@link CouchbaseReader}s.
+ * <p>
+ * {@link #configure(Map)} validates the configuration, asks the cluster how many vbuckets the bucket has and
+ * assigns vbuckets to partitions round robin; {@link #createRecordReader} creates a reader that streams only the
+ * vbuckets assigned to its partition.
+ */
+public class CouchbaseReaderFactory implements IRecordReaderFactory<RecordWithMetadata<char[]>> {
+
+    private static final long serialVersionUID = 1L;
+    // Constant fields
+    public static final boolean DCP_ENABLED = true;
+    public static final long AUTO_RELEASE_AFTER_MILLISECONDS = 5000L;
+    public static final int TIMEOUT = 5;
+    public static final TimeUnit TIME_UNIT = TimeUnit.SECONDS;
+    // Dynamic fields
+    private Map<String, String> configuration;
+    private String bucket;
+    private String password = "";
+    private String[] couchbaseNodes;
+    private int numOfVBuckets;
+    // schedule[v] is the partition that streams vbucket v
+    private int[] schedule;
+    private String feedName;
+    // Transient fields: only used by configure() while querying the cluster
+    private transient CouchbaseCore core;
+    private transient Builder builder;
+    private transient DefaultCoreEnvironment env;
+
+    @Override
+    public DataSourceType getDataSourceType() {
+        return DataSourceType.RECORDS;
+    }
+
+    @Override
+    public AlgebricksPartitionConstraint getPartitionConstraint() throws Exception {
+        return AsterixClusterProperties.INSTANCE.getClusterLocations();
+    }
+
+    /**
+     * Validates and stores the configuration (bucket and nodes are required, password is optional), then queries
+     * the number of vbuckets and builds the vbucket-to-partition schedule.
+     * NOTE(review): env/core created here are never shut down in this class — confirm this is intended.
+     *
+     * @throws AsterixException if the bucket or the Couchbase nodes are not specified
+     */
+    @Override
+    public void configure(Map<String, String> configuration) throws Exception {
+        // validate first
+        if (!configuration.containsKey(ExternalDataConstants.KEY_BUCKET)) {
+            throw new AsterixException("Unspecified bucket");
+        }
+        if (!configuration.containsKey(ExternalDataConstants.KEY_NODES)) {
+            throw new AsterixException("Unspecified Couchbase nodes");
+        }
+        if (configuration.containsKey(ExternalDataConstants.KEY_PASSWORD)) {
+            password = configuration.get(ExternalDataConstants.KEY_PASSWORD);
+        }
+        this.configuration = configuration;
+        bucket = configuration.get(ExternalDataConstants.KEY_BUCKET);
+        couchbaseNodes = configuration.get(ExternalDataConstants.KEY_NODES).split(",");
+        feedName = configuration.get(ExternalDataConstants.KEY_FEED_NAME);
+        builder = DefaultCoreEnvironment.builder().dcpEnabled(DCP_ENABLED)
+                .autoreleaseAfter(AUTO_RELEASE_AFTER_MILLISECONDS);
+        env = builder.build();
+        core = new CouchbaseCore(env);
+        getNumberOfVbuckets();
+        schedule();
+    }
+
+    /*
+     * We distribute the work of streaming vbuckets between all the partitions in a round robin fashion.
+     */
+    private void schedule() {
+        schedule = new int[numOfVBuckets];
+        String[] locations = AsterixClusterProperties.INSTANCE.getClusterLocations().getLocations();
+        for (int i = 0; i < numOfVBuckets; i++) {
+            schedule[i] = i % locations.length;
+        }
+    }
+
+    /*
+     * Opens the bucket, reads its number of vbuckets (partitions) from the cluster configuration and stores it in
+     * numOfVBuckets. The CloseBucketRequest is sent from a finally block, so it is also sent when reading the
+     * configuration fails.
+     */
+    private void getNumberOfVbuckets() {
+        core.send(new SeedNodesRequest(couchbaseNodes)).timeout(TIMEOUT, TIME_UNIT).toBlocking().single();
+        core.send(new OpenBucketRequest(bucket, password)).timeout(TIMEOUT, TIME_UNIT).toBlocking().single();
+        try {
+            numOfVBuckets = core.<GetClusterConfigResponse> send(new GetClusterConfigRequest())
+                    .map(new Func1<GetClusterConfigResponse, Integer>() {
+                        @Override
+                        public Integer call(GetClusterConfigResponse response) {
+                            CouchbaseBucketConfig config = (CouchbaseBucketConfig) response.config()
+                                    .bucketConfig(bucket);
+                            return config.numberOfPartitions();
+                        }
+                    }).timeout(TIMEOUT, TIME_UNIT).toBlocking().single();
+        } finally {
+            // NOTE(review): toBlocking() without a terminal operation does not wait for the close to complete.
+            core.send(new CloseBucketRequest(bucket)).toBlocking();
+        }
+    }
+
+    /**
+     * Creates a reader for {@code partition} that streams exactly the vbuckets the schedule assigned to it.
+     * The reader is named {@code feedName:nodeId:partition} and is configured before being returned.
+     */
+    @Override
+    public IRecordReader<? extends RecordWithMetadata<char[]>> createRecordReader(IHyracksTaskContext ctx,
+            int partition) throws Exception {
+        String nodeName = ctx.getJobletContext().getApplicationContext().getNodeId();
+        // First pass sizes the array, second pass fills it (avoids boxing every vbucket id).
+        int assigned = 0;
+        for (int owner : schedule) {
+            if (owner == partition) {
+                assigned++;
+            }
+        }
+        short[] vbuckets = new short[assigned];
+        int next = 0;
+        for (int i = 0; i < schedule.length; i++) {
+            if (schedule[i] == partition) {
+                vbuckets[next++] = (short) i;
+            }
+        }
+        CouchbaseReader reader = new CouchbaseReader(feedName + ":" + nodeName + ":" + partition, bucket, password,
+                couchbaseNodes, vbuckets, ExternalDataUtils.getQueueSize(configuration));
+        reader.configure(configuration);
+        return reader;
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public Class<? extends RecordWithMetadata<char[]>> getRecordClass() {
+        // Same Class object an instance's getClass() would return, without allocating a throwaway record.
+        return (Class<? extends RecordWithMetadata<char[]>>) (Class<?>) RecordWithMetadata.class;
+    }
+}
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/AbstractCharRecordLookupReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/AbstractCharRecordLookupReader.java
similarity index 95%
rename from asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/AbstractCharRecordLookupReader.java
rename to asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/AbstractCharRecordLookupReader.java
index 1b84e7a..0627660 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/AbstractCharRecordLookupReader.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/AbstractCharRecordLookupReader.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.asterix.external.input.record.reader;
+package org.apache.asterix.external.input.record.reader.hdfs;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -69,9 +69,10 @@
reusableByteBuffer.put(value.getBytes(), 0, value.getLength());
reusableByteBuffer.flip();
while (reusableByteBuffer.hasRemaining()) {
- decoder.decode(reusableByteBuffer, reusableCharBuffer, false);
- record.append(reusableCharBuffer.array(), 0, reusableCharBuffer.position());
reusableCharBuffer.clear();
+ decoder.decode(reusableByteBuffer, reusableCharBuffer, false);
+ reusableCharBuffer.flip();
+ record.append(reusableCharBuffer);
}
record.endRecord();
}
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/AbstractHDFSLookupRecordReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/AbstractHDFSLookupRecordReader.java
similarity index 98%
rename from asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/AbstractHDFSLookupRecordReader.java
rename to asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/AbstractHDFSLookupRecordReader.java
index 5a20962..28abddb 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/AbstractHDFSLookupRecordReader.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/AbstractHDFSLookupRecordReader.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.asterix.external.input.record.reader;
+package org.apache.asterix.external.input.record.reader.hdfs;
import java.io.FileNotFoundException;
import java.io.IOException;
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/factory/HDFSLookupReaderFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/HDFSLookupReaderFactory.java
similarity index 92%
rename from asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/factory/HDFSLookupReaderFactory.java
rename to asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/HDFSLookupReaderFactory.java
index e9fad25..c302b9b 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/factory/HDFSLookupReaderFactory.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/HDFSLookupReaderFactory.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.asterix.external.input.record.reader.factory;
+package org.apache.asterix.external.input.record.reader.hdfs;
import java.util.Map;
@@ -24,9 +24,6 @@
import org.apache.asterix.external.api.ILookupReaderFactory;
import org.apache.asterix.external.api.ILookupRecordReader;
import org.apache.asterix.external.indexing.ExternalFileIndexAccessor;
-import org.apache.asterix.external.input.record.reader.RCLookupReader;
-import org.apache.asterix.external.input.record.reader.SequenceLookupReader;
-import org.apache.asterix.external.input.record.reader.TextLookupReader;
import org.apache.asterix.external.util.ExternalDataConstants;
import org.apache.asterix.external.util.HDFSUtils;
import org.apache.hadoop.fs.FileSystem;
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/HDFSRecordReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/HDFSRecordReader.java
similarity index 95%
rename from asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/HDFSRecordReader.java
rename to asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/HDFSRecordReader.java
index d88f967..564d55a 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/HDFSRecordReader.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/HDFSRecordReader.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.asterix.external.input.record.reader;
+package org.apache.asterix.external.input.record.reader.hdfs;
import java.io.IOException;
import java.util.List;
@@ -28,6 +28,7 @@
import org.apache.asterix.external.api.IRecordReader;
import org.apache.asterix.external.indexing.ExternalFile;
import org.apache.asterix.external.input.record.GenericRecord;
+import org.apache.asterix.external.input.record.reader.EmptyRecordReader;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -99,16 +100,6 @@
return record;
}
- @Override
- public Class<? extends Writable> getRecordClass() throws IOException {
- if (value == null) {
- if (!nextInputSplit()) {
- return null;
- }
- }
- return value.getClass();
- }
-
private boolean nextInputSplit() throws IOException {
for (; currentSplitIndex < inputSplits.length; currentSplitIndex++) {
/**
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/HDFSTextLineReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/HDFSTextLineReader.java
similarity index 99%
rename from asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/HDFSTextLineReader.java
rename to asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/HDFSTextLineReader.java
index ea851a5..90f1558 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/HDFSTextLineReader.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/HDFSTextLineReader.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.asterix.external.input.record.reader;
+package org.apache.asterix.external.input.record.reader.hdfs;
import java.io.IOException;
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/RCLookupReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/RCLookupReader.java
similarity index 97%
rename from asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/RCLookupReader.java
rename to asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/RCLookupReader.java
index 5c33502..95d76ba 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/RCLookupReader.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/RCLookupReader.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.asterix.external.input.record.reader;
+package org.apache.asterix.external.input.record.reader.hdfs;
import java.io.IOException;
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/SequenceLookupReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/SequenceLookupReader.java
similarity index 97%
rename from asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/SequenceLookupReader.java
rename to asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/SequenceLookupReader.java
index c294ccb..23e647f 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/SequenceLookupReader.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/SequenceLookupReader.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.asterix.external.input.record.reader;
+package org.apache.asterix.external.input.record.reader.hdfs;
import java.io.IOException;
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/TextLookupReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/TextLookupReader.java
similarity index 96%
rename from asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/TextLookupReader.java
rename to asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/TextLookupReader.java
index b276bfa..2e1a11a 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/TextLookupReader.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/TextLookupReader.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.asterix.external.input.record.reader;
+package org.apache.asterix.external.input.record.reader.hdfs;
import java.io.IOException;
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/RSSRecordReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/rss/RSSRecordReader.java
similarity index 96%
rename from asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/RSSRecordReader.java
rename to asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/rss/RSSRecordReader.java
index 1c2dc30..13cd26a 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/RSSRecordReader.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/rss/RSSRecordReader.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.asterix.external.input.record.reader;
+package org.apache.asterix.external.input.record.reader.rss;
import java.io.IOException;
import java.net.MalformedURLException;
@@ -99,11 +99,6 @@
}
@Override
- public Class<SyndEntryImpl> getRecordClass() throws IOException {
- return SyndEntryImpl.class;
- }
-
- @Override
public boolean stop() {
done = true;
return true;
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/factory/RSSRecordReaderFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/rss/RSSRecordReaderFactory.java
similarity index 95%
rename from asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/factory/RSSRecordReaderFactory.java
rename to asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/rss/RSSRecordReaderFactory.java
index a672f2f..bbe485c 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/factory/RSSRecordReaderFactory.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/rss/RSSRecordReaderFactory.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.asterix.external.input.record.reader.factory;
+package org.apache.asterix.external.input.record.reader.rss;
import java.util.ArrayList;
import java.util.List;
@@ -24,7 +24,6 @@
import org.apache.asterix.external.api.IRecordReader;
import org.apache.asterix.external.api.IRecordReaderFactory;
-import org.apache.asterix.external.input.record.reader.RSSRecordReader;
import org.apache.asterix.external.util.ExternalDataConstants;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksCountPartitionConstraint;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/AbstractStreamRecordReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/AbstractStreamRecordReader.java
similarity index 88%
rename from asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/AbstractStreamRecordReader.java
rename to asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/AbstractStreamRecordReader.java
index 93ba0a0..6225b82 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/AbstractStreamRecordReader.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/AbstractStreamRecordReader.java
@@ -16,15 +16,17 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.asterix.external.input.record.reader;
+package org.apache.asterix.external.input.record.reader.stream;
import java.io.IOException;
import java.util.Map;
+import org.apache.asterix.external.api.IDataFlowController;
import org.apache.asterix.external.api.IExternalIndexer;
import org.apache.asterix.external.api.IIndexingDatasource;
import org.apache.asterix.external.api.IRawRecord;
import org.apache.asterix.external.api.IRecordReader;
+import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
import org.apache.asterix.external.input.record.CharArrayRecord;
import org.apache.asterix.external.input.stream.AInputStream;
import org.apache.asterix.external.input.stream.AInputStreamReader;
@@ -57,11 +59,6 @@
}
@Override
- public Class<char[]> getRecordClass() {
- return char[].class;
- }
-
- @Override
public void configure(Map<String, String> configuration) throws Exception {
record = new CharArrayRecord();
inputBuffer = new char[ExternalDataConstants.DEFAULT_BUFFER_SIZE];
@@ -87,4 +84,9 @@
return false;
}
}
+
+    /**
+     * Hands the data flow controller to the wrapped stream reader.
+     * NOTE(review): the cast assumes the controller is always an AbstractFeedDataFlowController; any other
+     * controller type would throw ClassCastException — confirm against callers.
+     */
+    @Override
+    public void setController(IDataFlowController controller) {
+        final AbstractFeedDataFlowController feedController = (AbstractFeedDataFlowController) controller;
+        reader.setController(feedController);
+    }
}
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/AbstractStreamRecordReaderFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/AbstractStreamRecordReaderFactory.java
similarity index 98%
rename from asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/AbstractStreamRecordReaderFactory.java
rename to asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/AbstractStreamRecordReaderFactory.java
index c7acb1a..f02bd93 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/AbstractStreamRecordReaderFactory.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/AbstractStreamRecordReaderFactory.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.asterix.external.input.record.reader;
+package org.apache.asterix.external.input.record.reader.stream;
import java.util.List;
import java.util.Map;
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/LineRecordReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/LineRecordReader.java
similarity index 98%
rename from asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/LineRecordReader.java
rename to asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/LineRecordReader.java
index 2b33d7a..f8f7cda 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/LineRecordReader.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/LineRecordReader.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.asterix.external.input.record.reader;
+package org.apache.asterix.external.input.record.reader.stream;
import java.io.IOException;
import java.util.Map;
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/factory/LineRecordReaderFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/LineRecordReaderFactory.java
similarity index 85%
rename from asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/factory/LineRecordReaderFactory.java
rename to asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/LineRecordReaderFactory.java
index 05d419d..f0867d3 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/factory/LineRecordReaderFactory.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/LineRecordReaderFactory.java
@@ -16,14 +16,11 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.asterix.external.input.record.reader.factory;
+package org.apache.asterix.external.input.record.reader.stream;
import java.util.Map;
import org.apache.asterix.external.api.IRecordReader;
-import org.apache.asterix.external.input.record.reader.AbstractStreamRecordReaderFactory;
-import org.apache.asterix.external.input.record.reader.LineRecordReader;
-import org.apache.asterix.external.input.record.reader.QuotedLineRecordReader;
import org.apache.asterix.external.util.ExternalDataConstants;
import org.apache.hyracks.api.context.IHyracksTaskContext;
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/QuotedLineRecordReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/QuotedLineRecordReader.java
similarity index 98%
rename from asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/QuotedLineRecordReader.java
rename to asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/QuotedLineRecordReader.java
index 49e67e9..a8eb07b 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/QuotedLineRecordReader.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/QuotedLineRecordReader.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.asterix.external.input.record.reader;
+package org.apache.asterix.external.input.record.reader.stream;
import java.io.IOException;
import java.util.Map;
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/SemiStructuredRecordReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReader.java
similarity index 98%
rename from asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/SemiStructuredRecordReader.java
rename to asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReader.java
index 84c96d0..d469cb3 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/SemiStructuredRecordReader.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReader.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.asterix.external.input.record.reader;
+package org.apache.asterix.external.input.record.reader.stream;
import java.io.IOException;
import java.util.Map;
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/factory/SemiStructuredRecordReaderFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReaderFactory.java
similarity index 87%
rename from asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/factory/SemiStructuredRecordReaderFactory.java
rename to asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReaderFactory.java
index 91b439c..ec8eac9 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/factory/SemiStructuredRecordReaderFactory.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReaderFactory.java
@@ -16,13 +16,11 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.asterix.external.input.record.reader.factory;
+package org.apache.asterix.external.input.record.reader.stream;
import java.util.Map;
import org.apache.asterix.external.api.IRecordReader;
-import org.apache.asterix.external.input.record.reader.AbstractStreamRecordReaderFactory;
-import org.apache.asterix.external.input.record.reader.SemiStructuredRecordReader;
import org.apache.hyracks.api.context.IHyracksTaskContext;
public class SemiStructuredRecordReaderFactory extends AbstractStreamRecordReaderFactory<char[]> {
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/TwitterPullRecordReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPullRecordReader.java
similarity index 94%
rename from asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/TwitterPullRecordReader.java
rename to asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPullRecordReader.java
index 34d8122..084d6d0 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/TwitterPullRecordReader.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPullRecordReader.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.asterix.external.input.record.reader;
+package org.apache.asterix.external.input.record.reader.twitter;
import java.io.IOException;
import java.util.List;
@@ -94,13 +94,7 @@
}
@Override
- public Class<Status> getRecordClass() throws IOException {
- return Status.class;
- }
-
- @Override
public boolean stop() {
return false;
}
-
}
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/TwitterPushRecordReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPushRecordReader.java
similarity index 95%
rename from asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/TwitterPushRecordReader.java
rename to asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPushRecordReader.java
index 3ce6a81..764ac1d 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/TwitterPushRecordReader.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPushRecordReader.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.asterix.external.input.record.reader;
+package org.apache.asterix.external.input.record.reader.twitter;
import java.io.IOException;
import java.util.Map;
@@ -80,11 +80,6 @@
}
@Override
- public Class<? extends Status> getRecordClass() throws IOException {
- return Status.class;
- }
-
- @Override
public boolean stop() {
try {
close();
@@ -128,5 +123,4 @@
public void onTrackLimitationNotice(int arg0) {
}
}
-
}
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/factory/TwitterRecordReaderFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java
similarity index 96%
rename from asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/factory/TwitterRecordReaderFactory.java
rename to asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java
index 6840c11..f38c2cb 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/factory/TwitterRecordReaderFactory.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.asterix.external.input.record.reader.factory;
+package org.apache.asterix.external.input.record.reader.twitter;
import java.util.Map;
import java.util.logging.Level;
@@ -25,8 +25,6 @@
import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.external.api.IRecordReader;
import org.apache.asterix.external.api.IRecordReaderFactory;
-import org.apache.asterix.external.input.record.reader.TwitterPullRecordReader;
-import org.apache.asterix.external.input.record.reader.TwitterPushRecordReader;
import org.apache.asterix.external.util.ExternalDataConstants;
import org.apache.asterix.external.util.ExternalDataUtils;
import org.apache.asterix.external.util.TwitterUtil;
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/AInputStream.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/AInputStream.java
index 73f6195..ce65249 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/AInputStream.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/AInputStream.java
@@ -20,9 +20,14 @@
import java.io.InputStream;
+import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
+
public abstract class AInputStream extends InputStream {
public abstract boolean skipError() throws Exception;
public abstract boolean stop() throws Exception;
+ public void setController(AbstractFeedDataFlowController controller) { // default rejects; streams that support a feed controller (e.g. LocalFileSystemInputStream) override this
+ throw new UnsupportedOperationException(getClass().getName() + " does not support setting a feed controller");
+ }
}
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/AInputStreamReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/AInputStreamReader.java
index 7ba6032..25418b0 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/AInputStreamReader.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/AInputStreamReader.java
@@ -21,6 +21,8 @@
import java.io.IOException;
import java.io.InputStreamReader;
+import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
+
public class AInputStreamReader extends InputStreamReader {
private AInputStream in;
@@ -40,4 +42,8 @@
throw new IOException(e);
}
}
+
+ public void setController(AbstractFeedDataFlowController controller) { // forwards to the wrapped AInputStream
+ in.setController(controller); // throws UnsupportedOperationException if the wrapped stream does not override setController
+ }
}
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/LocalFileSystemInputStream.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/LocalFileSystemInputStream.java
index 7eebe4c..7b7cd8b 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/LocalFileSystemInputStream.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/LocalFileSystemInputStream.java
@@ -22,6 +22,7 @@
import java.io.IOException;
import java.nio.file.Path;
+import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
import org.apache.asterix.external.util.ExternalDataConstants;
import org.apache.asterix.external.util.FeedLogManager;
import org.apache.asterix.external.util.FileSystemWatcher;
@@ -39,6 +40,11 @@
}
@Override
+ public void setController(AbstractFeedDataFlowController controller) { // overrides the rejecting default in AInputStream
+ watcher.setController(controller); // hand the controller to the file-system watcher
+ }
+
+ @Override
public void close() throws IOException {
IOException ioe = null;
if (in != null) {
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/LocalFSInputStreamProviderFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/LocalFSInputStreamProviderFactory.java
index ab1f8a0..06833af 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/LocalFSInputStreamProviderFactory.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/LocalFSInputStreamProviderFactory.java
@@ -28,7 +28,7 @@
import org.apache.asterix.external.api.IInputStreamProviderFactory;
import org.apache.asterix.external.api.INodeResolver;
import org.apache.asterix.external.api.INodeResolverFactory;
-import org.apache.asterix.external.input.stream.LocalFSInputStreamProvider;
+import org.apache.asterix.external.input.stream.provider.LocalFSInputStreamProvider;
import org.apache.asterix.external.util.ExternalDataConstants;
import org.apache.asterix.external.util.ExternalDataUtils;
import org.apache.asterix.external.util.FeedUtils;
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketInputStreamProviderFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketInputStreamProviderFactory.java
index 37afa53..ea60f43 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketInputStreamProviderFactory.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketInputStreamProviderFactory.java
@@ -30,7 +30,7 @@
import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.external.api.IInputStreamProvider;
import org.apache.asterix.external.api.IInputStreamProviderFactory;
-import org.apache.asterix.external.input.stream.SocketInputStreamProvider;
+import org.apache.asterix.external.input.stream.provider.SocketInputStreamProvider;
import org.apache.asterix.external.util.ExternalDataConstants;
import org.apache.asterix.om.util.AsterixRuntimeUtil;
import org.apache.commons.lang3.StringUtils;
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/HDFSInputStreamProvider.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/provider/HDFSInputStreamProvider.java
similarity index 95%
rename from asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/HDFSInputStreamProvider.java
rename to asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/provider/HDFSInputStreamProvider.java
index 8f4c094..93a1685 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/HDFSInputStreamProvider.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/provider/HDFSInputStreamProvider.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.asterix.external.input.stream;
+package org.apache.asterix.external.input.stream.provider;
import java.io.IOException;
import java.util.List;
@@ -24,7 +24,8 @@
import org.apache.asterix.external.api.IInputStreamProvider;
import org.apache.asterix.external.indexing.ExternalFile;
-import org.apache.asterix.external.input.record.reader.HDFSRecordReader;
+import org.apache.asterix.external.input.record.reader.hdfs.HDFSRecordReader;
+import org.apache.asterix.external.input.stream.AInputStream;
import org.apache.asterix.external.provider.ExternalIndexerProvider;
import org.apache.asterix.external.util.ExternalDataConstants;
import org.apache.hadoop.io.Text;
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/LocalFSInputStreamProvider.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/provider/LocalFSInputStreamProvider.java
similarity index 92%
rename from asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/LocalFSInputStreamProvider.java
rename to asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/provider/LocalFSInputStreamProvider.java
index 22d0a87..4c4edd3 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/LocalFSInputStreamProvider.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/provider/LocalFSInputStreamProvider.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.asterix.external.input.stream;
+package org.apache.asterix.external.input.stream.provider;
import java.io.File;
import java.io.IOException;
@@ -24,6 +24,8 @@
import java.util.Map;
import org.apache.asterix.external.api.IInputStreamProvider;
+import org.apache.asterix.external.input.stream.AInputStream;
+import org.apache.asterix.external.input.stream.LocalFileSystemInputStream;
import org.apache.asterix.external.util.FeedLogManager;
import org.apache.asterix.external.util.FeedUtils;
import org.apache.hyracks.api.context.IHyracksTaskContext;
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/SocketInputStreamProvider.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/provider/SocketInputStreamProvider.java
similarity index 86%
rename from asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/SocketInputStreamProvider.java
rename to asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/provider/SocketInputStreamProvider.java
index 1f920e9..2b12675 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/SocketInputStreamProvider.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/provider/SocketInputStreamProvider.java
@@ -16,11 +16,13 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.asterix.external.input.stream;
+package org.apache.asterix.external.input.stream.provider;
import java.net.ServerSocket;
import org.apache.asterix.external.api.IInputStreamProvider;
+import org.apache.asterix.external.input.stream.AInputStream;
+import org.apache.asterix.external.input.stream.SocketInputStream;
public class SocketInputStreamProvider implements IInputStreamProvider {
private ServerSocket server;
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/TwitterFirehoseInputStreamProvider.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/provider/TwitterFirehoseInputStreamProvider.java
similarity index 97%
rename from asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/TwitterFirehoseInputStreamProvider.java
rename to asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/provider/TwitterFirehoseInputStreamProvider.java
index 7c64aa3..06f7e72 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/TwitterFirehoseInputStreamProvider.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/provider/TwitterFirehoseInputStreamProvider.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.asterix.external.input.stream;
+package org.apache.asterix.external.input.stream.provider;
import java.io.IOException;
import java.io.InputStream;
@@ -30,6 +30,7 @@
import java.util.logging.Logger;
import org.apache.asterix.external.api.IInputStreamProvider;
+import org.apache.asterix.external.input.stream.AInputStream;
import org.apache.asterix.external.util.TweetGenerator;
import org.apache.hyracks.api.context.IHyracksTaskContext;
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalLookupOperatorDescriptor.java b/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalLookupOperatorDescriptor.java
index 9e35617..2cab3a7 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalLookupOperatorDescriptor.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalLookupOperatorDescriptor.java
@@ -80,7 +80,7 @@
try {
adapter = adapterFactory.createAdapter(ctx, partition,
recordDescProvider.getInputRecordDescriptor(getActivityId(), 0), snapshotAccessor, writer);
- //Open the file index accessor here
+ // Open the file index accessor here
snapshotAccessor.open();
indexOpen = true;
adapter.open();
@@ -127,6 +127,11 @@
throw new HyracksDataException(th);
}
}
+
+ @Override
+ public void flush() throws HyracksDataException { // delegates the flush request to the lookup adapter
+ adapter.flush(); // NOTE(review): adapter is assigned inside the try in the open path; assumes flush() is only called after it was created
+ }
};
}
}
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaComputeNodePushable.java b/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaComputeNodePushable.java
index 80a54be..fa2e513 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaComputeNodePushable.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaComputeNodePushable.java
@@ -25,9 +25,9 @@
import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
import org.apache.asterix.external.feed.api.IFeedManager;
-import org.apache.asterix.external.feed.api.ISubscribableRuntime;
import org.apache.asterix.external.feed.api.IFeedRuntime.FeedRuntimeType;
import org.apache.asterix.external.feed.api.IFeedRuntime.Mode;
+import org.apache.asterix.external.feed.api.ISubscribableRuntime;
import org.apache.asterix.external.feed.dataflow.DistributeFeedFrameWriter;
import org.apache.asterix.external.feed.dataflow.FeedRuntimeInputHandler;
import org.apache.asterix.external.feed.management.FeedConnectionId;
@@ -183,6 +183,9 @@
inputSideHandler.nextFrame(null); // signal end of data
while (!inputSideHandler.isFinished()) {
synchronized (coreOperator) {
+ if (inputSideHandler.isFinished()) {
+ break;
+ }
coreOperator.wait();
}
}
@@ -192,8 +195,8 @@
}
coreOperator.close();
System.out.println("CLOSED " + coreOperator + " STALLED ?" + stalled + " ENDED " + end);
- } catch (Exception e) {
- e.printStackTrace();
+ } catch (InterruptedException e) {
+ throw new HyracksDataException(e);
} finally {
if (!stalled) {
deregister();
@@ -221,4 +224,9 @@
}
}
+ @Override
+ public void flush() throws HyracksDataException { // delegates the flush request to the input-side handler
+ inputSideHandler.flush();
+ }
+
}
\ No newline at end of file
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaNodePushable.java b/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaNodePushable.java
index 4dae72d..b09504a 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaNodePushable.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaNodePushable.java
@@ -181,4 +181,9 @@
}
}
+ @Override
+ public void flush() throws HyracksDataException { // delegates the flush request to the input-side handler
+ inputSideHandler.flush();
+ }
+
}
\ No newline at end of file
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaStoreNodePushable.java b/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaStoreNodePushable.java
index f75b3eb..3c4c9ad 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaStoreNodePushable.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaStoreNodePushable.java
@@ -188,6 +188,9 @@
inputSideHandler.nextFrame(null); // signal end of data
while (!inputSideHandler.isFinished()) {
synchronized (coreOperator) {
+ if (inputSideHandler.isFinished()) {
+ break;
+ }
coreOperator.wait();
}
}
@@ -195,8 +198,7 @@
}
coreOperator.close();
} catch (Exception e) {
- e.printStackTrace();
- // ignore
+ throw new HyracksDataException(e);
} finally {
if (!stalled) {
deregister();
@@ -217,4 +219,9 @@
}
}
+ @Override
+ public void flush() throws HyracksDataException { // delegates the flush request to the input-side handler
+ inputSideHandler.flush();
+ }
+
}
\ No newline at end of file
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/parser/ADMDataParser.java b/asterix-external-data/src/main/java/org/apache/asterix/external/parser/ADMDataParser.java
index 129b62f..14a3e2a 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/parser/ADMDataParser.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/parser/ADMDataParser.java
@@ -39,7 +39,6 @@
import org.apache.asterix.external.api.IRawRecord;
import org.apache.asterix.external.api.IRecordDataParser;
import org.apache.asterix.external.api.IStreamDataParser;
-import org.apache.asterix.external.util.ExternalDataConstants;
import org.apache.asterix.external.util.ExternalDataUtils;
import org.apache.asterix.om.base.ABoolean;
import org.apache.asterix.om.base.ANull;
@@ -67,7 +66,6 @@
private AdmLexer admLexer;
private ARecordType recordType;
- private boolean datasetRec;
private boolean isStreamParser = true;
private int nullableFieldId = 0;
@@ -142,7 +140,7 @@
public boolean parse(DataOutput out) throws AsterixException {
try {
resetPools();
- return parseAdmInstance(recordType, datasetRec, out);
+ return parseAdmInstance(recordType, out);
} catch (IOException e) {
throw new ParseException(e, filename, admLexer.getLine(), admLexer.getColumn());
} catch (AdmLexerException e) {
@@ -163,12 +161,6 @@
public void configure(Map<String, String> configuration, ARecordType recordType) throws IOException {
this.recordType = recordType;
this.configuration = configuration;
- String isDatasetRecordString = configuration.get(ExternalDataConstants.KEY_DATASET_RECORD);
- if (isDatasetRecordString == null) {
- this.datasetRec = true;
- } else {
- this.datasetRec = Boolean.parseBoolean(isDatasetRecordString);
- }
this.isStreamParser = ExternalDataUtils.isDataSourceStreamProvider(configuration);
if (!isStreamParser) {
this.admLexer = new AdmLexer();
@@ -180,7 +172,7 @@
try {
resetPools();
admLexer.setBuffer(record.get());
- parseAdmInstance(recordType, datasetRec, out);
+ parseAdmInstance(recordType, out);
} catch (IOException e) {
throw new ParseException(e, filename, admLexer.getLine(), admLexer.getColumn());
} catch (AdmLexerException e) {
@@ -201,18 +193,18 @@
admLexer = new AdmLexer(new java.io.InputStreamReader(in));
}
- protected boolean parseAdmInstance(IAType objectType, boolean datasetRec, DataOutput out)
+ protected boolean parseAdmInstance(IAType objectType, DataOutput out)
throws AsterixException, IOException, AdmLexerException {
int token = admLexer.next();
if (token == AdmLexer.TOKEN_EOF) {
return false;
} else {
- admFromLexerStream(token, objectType, out, datasetRec);
+ admFromLexerStream(token, objectType, out);
return true;
}
}
- private void admFromLexerStream(int token, IAType objectType, DataOutput out, Boolean datasetRec)
+ private void admFromLexerStream(int token, IAType objectType, DataOutput out)
throws AsterixException, IOException, AdmLexerException {
switch (token) {
@@ -441,7 +433,7 @@
case AdmLexer.TOKEN_START_RECORD: {
if (checkType(ATypeTag.RECORD, objectType)) {
objectType = getComplexType(objectType, ATypeTag.RECORD);
- parseRecord((ARecordType) objectType, out, datasetRec);
+ parseRecord((ARecordType) objectType, out);
} else {
throw new ParseException(mismatchErrorMessage + objectType.getTypeTag());
}
@@ -567,7 +559,7 @@
return getTargetTypeTag(expectedTypeTag, aObjectType) != null;
}
- private void parseRecord(ARecordType recType, DataOutput out, Boolean datasetRec)
+ private void parseRecord(ARecordType recType, DataOutput out)
throws IOException, AsterixException, AdmLexerException {
ArrayBackedValueStorage fieldValueBuffer = getTempBuffer();
@@ -575,16 +567,8 @@
IARecordBuilder recBuilder = getRecordBuilder();
BitSet nulls = null;
- if (datasetRec) {
-
- if (recType != null) {
- nulls = new BitSet(recType.getFieldNames().length);
- recBuilder.reset(recType);
- } else {
- recBuilder.reset(null);
- }
-
- } else if (recType != null) {
+ if (recType != null) {
+ //TODO: use BitSet Pool
nulls = new BitSet(recType.getFieldNames().length);
recBuilder.reset(recType);
} else {
@@ -650,7 +634,7 @@
}
token = admLexer.next();
- this.admFromLexerStream(token, fieldType, fieldValueBuffer.getDataOutput(), false);
+ this.admFromLexerStream(token, fieldType, fieldValueBuffer.getDataOutput());
if (openRecordField) {
if (fieldValueBuffer.getByteArray()[0] != ATypeTag.NULL.serialize()) {
recBuilder.addField(fieldNameBuffer, fieldValueBuffer);
@@ -752,7 +736,7 @@
expectingListItem = false;
itemBuffer.reset();
- admFromLexerStream(token, itemType, itemBuffer.getDataOutput(), false);
+ admFromLexerStream(token, itemType, itemBuffer.getDataOutput());
orderedListBuilder.addItem(itemBuffer);
}
first = false;
@@ -799,7 +783,7 @@
} else {
expectingListItem = false;
itemBuffer.reset();
- admFromLexerStream(token, itemType, itemBuffer.getDataOutput(), false);
+ admFromLexerStream(token, itemType, itemBuffer.getDataOutput());
unorderedListBuilder.addItem(itemBuffer);
}
first = false;
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/parser/DelimitedDataParser.java b/asterix-external-data/src/main/java/org/apache/asterix/external/parser/DelimitedDataParser.java
index 6c399c3..2f0fc86 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/parser/DelimitedDataParser.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/parser/DelimitedDataParser.java
@@ -28,6 +28,7 @@
import org.apache.asterix.builders.RecordBuilder;
import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.dataflow.data.nontagged.serde.ANullSerializerDeserializer;
+import org.apache.asterix.external.api.IDataParser;
import org.apache.asterix.external.api.IExternalDataSourceFactory.DataSourceType;
import org.apache.asterix.external.api.IRawRecord;
import org.apache.asterix.external.api.IRecordDataParser;
@@ -124,18 +125,6 @@
}
}
- protected void fieldNameToBytes(String fieldName, AMutableString str, ArrayBackedValueStorage buffer)
- throws HyracksDataException {
- buffer.reset();
- DataOutput out = buffer.getDataOutput();
- str.setValue(fieldName);
- try {
- stringSerde.serialize(str, out);
- } catch (IOException e) {
- throw new HyracksDataException(e);
- }
- }
-
@Override
public DataSourceType getDataSourceType() {
return isStreamParser ? DataSourceType.STREAM : DataSourceType.RECORDS;
@@ -173,7 +162,8 @@
throw new HyracksDataException("Illegal field " + name + " in closed type " + recordType);
} else {
nameBuffers[i] = new ArrayBackedValueStorage();
- fieldNameToBytes(name, str, nameBuffers[i]);
+ str.setValue(name);
+ IDataParser.toBytes(str, nameBuffers[i], stringSerde);
}
}
}
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/parser/RecordWithMetadataParser.java b/asterix-external-data/src/main/java/org/apache/asterix/external/parser/RecordWithMetadataParser.java
new file mode 100644
index 0000000..ecdb03d
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/parser/RecordWithMetadataParser.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.parser;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.asterix.builders.RecordBuilder;
+import org.apache.asterix.external.api.IExternalDataSourceFactory.DataSourceType;
+import org.apache.asterix.external.api.IRawRecord;
+import org.apache.asterix.external.api.IRecordDataParser;
+import org.apache.asterix.external.input.record.RecordWithMetadata;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
+
+/**
+ * Parses a {@link RecordWithMetadata} into one output record: the field at {@code valueIndex} is produced by
+ * the delegate value parser and every other field is copied from the raw record's metadata.
+ *
+ * @param <T>
+ *            the raw record type consumed by the delegate value parser
+ */
+public class RecordWithMetadataParser<T> implements IRecordDataParser<RecordWithMetadata<T>> {
+
+    private final Class<? extends RecordWithMetadata<T>> clazz;
+    // metadata slot for each output field; read as metaIndexes[i] for every field i != valueIndex
+    private final int[] metaIndexes;
+    // output field that receives the record produced by valueParser
+    private final int valueIndex;
+    private final IRecordDataParser<T> valueParser;
+    // scratch buffer reused by every parse() call for the serialized value record
+    private final ArrayBackedValueStorage valueBuffer = new ArrayBackedValueStorage();
+    // initialized by configure(); parse() fails fast if configure() has not run
+    private ARecordType recordType;
+    private RecordBuilder recBuilder;
+    private int numberOfFields;
+
+    public RecordWithMetadataParser(Class<? extends RecordWithMetadata<T>> clazz, int[] metaIndexes,
+            IRecordDataParser<T> valueParser, int valueIndex) {
+        this.clazz = clazz;
+        this.metaIndexes = metaIndexes;
+        this.valueParser = valueParser;
+        this.valueIndex = valueIndex;
+    }
+
+    @Override
+    public DataSourceType getDataSourceType() {
+        return DataSourceType.RECORDS;
+    }
+
+    /**
+     * Prepares the record builder for {@code recordType}. Field names are not serialized up front because
+     * {@link #parse} always adds fields by position.
+     */
+    @Override
+    public void configure(Map<String, String> configuration, ARecordType recordType)
+            throws HyracksDataException, IOException {
+        this.recordType = recordType;
+        this.numberOfFields = recordType.getFieldNames().length;
+        recBuilder = new RecordBuilder();
+        recBuilder.reset(recordType);
+        recBuilder.init();
+    }
+
+    @Override
+    public Class<? extends RecordWithMetadata<T>> getRecordClass() {
+        return clazz;
+    }
+
+    /**
+     * Serializes one output record to {@code out}: field {@code valueIndex} is parsed by the value parser and
+     * every other field {@code i} is taken from metadata slot {@code metaIndexes[i]}.
+     *
+     * @throws HyracksDataException
+     *             if {@link #configure} has not been called
+     */
+    @Override
+    public void parse(IRawRecord<? extends RecordWithMetadata<T>> record, DataOutput out) throws Exception {
+        if (recBuilder == null) {
+            throw new HyracksDataException("RecordWithMetadataParser.parse() called before configure()");
+        }
+        recBuilder.reset(recordType);
+        valueBuffer.reset();
+        recBuilder.init();
+        RecordWithMetadata<T> rwm = record.get();
+        for (int i = 0; i < numberOfFields; i++) {
+            if (i == valueIndex) {
+                valueParser.parse(rwm.getRecord(), valueBuffer.getDataOutput());
+                recBuilder.addField(i, valueBuffer);
+            } else {
+                // NOTE(review): indexed by output field position, so metaIndexes presumably needs an entry
+                // for every field (with a placeholder at valueIndex) - confirm against KEY_META_INDEXES
+                recBuilder.addField(i, rwm.getMetadata(metaIndexes[i]));
+            }
+        }
+        recBuilder.write(out, true);
+    }
+}
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/RecordWithMetadataParserFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/RecordWithMetadataParserFactory.java
new file mode 100644
index 0000000..88a0683
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/RecordWithMetadataParserFactory.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.parser.factory;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.external.api.IExternalDataSourceFactory.DataSourceType;
+import org.apache.asterix.external.api.IRecordDataParser;
+import org.apache.asterix.external.api.IRecordDataParserFactory;
+import org.apache.asterix.external.input.record.RecordWithMetadata;
+import org.apache.asterix.external.parser.RecordWithMetadataParser;
+import org.apache.asterix.external.provider.ParserFactoryProvider;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.types.IAType;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * Creates {@link RecordWithMetadataParser}s. The value field is parsed by a delegate parser factory selected by
+ * the {@code value-format} parameter; metadata positions come from the {@code meta-indexes} parameter.
+ *
+ * @param <T>
+ *            the raw record type consumed by the delegate value parser
+ */
+public class RecordWithMetadataParserFactory<T> implements IRecordDataParserFactory<RecordWithMetadata<T>> {
+
+    private static final long serialVersionUID = 1L;
+    private Class<? extends RecordWithMetadata<T>> recordClass;
+    private ARecordType recordType;
+    private int[] metaIndexes;
+    private IRecordDataParserFactory<T> valueParserFactory;
+    private int valueIndex;
+
+    @Override
+    public DataSourceType getDataSourceType() throws AsterixException {
+        return DataSourceType.RECORDS;
+    }
+
+    /**
+     * Validates the meta/value parameters and configures the delegate value parser factory.
+     * {@link #setRecordType} must be called first. The caller's {@code configuration} is not modified.
+     */
+    @SuppressWarnings("unchecked")
+    @Override
+    public void configure(Map<String, String> configuration) throws Exception {
+        // validation first
+        requireParameter(configuration, ExternalDataConstants.KEY_META_INDEXES);
+        requireParameter(configuration, ExternalDataConstants.KEY_VALUE_INDEX);
+        requireParameter(configuration, ExternalDataConstants.KEY_VALUE_FORMAT);
+        if (recordType == null) {
+            throw new HyracksDataException("the record type must be set before configuring the parser factory");
+        }
+        // get meta field indexes
+        String[] stringMetaIndexes = configuration.get(ExternalDataConstants.KEY_META_INDEXES).split(",");
+        metaIndexes = new int[stringMetaIndexes.length];
+        for (int i = 0; i < stringMetaIndexes.length; i++) {
+            metaIndexes[i] = parseIndex(ExternalDataConstants.KEY_META_INDEXES, stringMetaIndexes[i]);
+        }
+        // get value index; it must reference a record-typed field of the output type
+        valueIndex = parseIndex(ExternalDataConstants.KEY_VALUE_INDEX,
+                configuration.get(ExternalDataConstants.KEY_VALUE_INDEX));
+        IAType[] fieldTypes = recordType.getFieldTypes();
+        if (valueIndex < 0 || valueIndex >= fieldTypes.length) {
+            throw new HyracksDataException("the parser parameter (" + ExternalDataConstants.KEY_VALUE_INDEX
+                    + ") is out of range: " + valueIndex);
+        }
+        if (!(fieldTypes[valueIndex] instanceof ARecordType)) {
+            throw new HyracksDataException("the field at " + ExternalDataConstants.KEY_VALUE_INDEX + " "
+                    + valueIndex + " is not of a record type: " + fieldTypes[valueIndex]);
+        }
+        // get value format: resolve the value parser from a copy so the caller's parser setting is preserved
+        Map<String, String> valueConfiguration = new HashMap<>(configuration);
+        valueConfiguration.put(ExternalDataConstants.KEY_DATA_PARSER,
+                configuration.get(ExternalDataConstants.KEY_VALUE_FORMAT));
+        valueParserFactory = (IRecordDataParserFactory<T>) ParserFactoryProvider
+                .getDataParserFactory(valueConfiguration);
+        valueParserFactory.setRecordType((ARecordType) fieldTypes[valueIndex]);
+        valueParserFactory.configure(valueConfiguration);
+        recordClass = (Class<? extends RecordWithMetadata<T>>) (new RecordWithMetadata<T>(
+                valueParserFactory.getRecordClass())).getClass();
+    }
+
+    @Override
+    public void setRecordType(ARecordType recordType) {
+        this.recordType = recordType;
+    }
+
+    @Override
+    public IRecordDataParser<RecordWithMetadata<T>> createRecordParser(IHyracksTaskContext ctx)
+            throws HyracksDataException, AsterixException, IOException {
+        IRecordDataParser<T> valueParser = valueParserFactory.createRecordParser(ctx);
+        return new RecordWithMetadataParser<T>(recordClass, metaIndexes, valueParser, valueIndex);
+    }
+
+    @Override
+    public Class<? extends RecordWithMetadata<T>> getRecordClass() {
+        return recordClass;
+    }
+
+    // fails fast when a mandatory parser parameter is absent from the configuration
+    private static void requireParameter(Map<String, String> configuration, String key)
+            throws HyracksDataException {
+        if (!configuration.containsKey(key)) {
+            throw new HyracksDataException("the parser parameter (" + key + ") is missing");
+        }
+    }
+
+    // parses one (whitespace-trimmed) index, naming the offending parameter when the value is malformed
+    private static int parseIndex(String parameter, String value) throws HyracksDataException {
+        try {
+            return Integer.parseInt(value.trim());
+        } catch (NumberFormatException e) {
+            throw new HyracksDataException(
+                    "the parser parameter (" + parameter + ") contains an invalid index: " + value, e);
+        }
+    }
+}
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java b/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java
index a7ab062..745c653 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java
@@ -25,9 +25,10 @@
import org.apache.asterix.external.api.IInputStreamProviderFactory;
import org.apache.asterix.external.api.IRecordReaderFactory;
import org.apache.asterix.external.input.HDFSDataSourceFactory;
-import org.apache.asterix.external.input.record.reader.factory.LineRecordReaderFactory;
-import org.apache.asterix.external.input.record.reader.factory.SemiStructuredRecordReaderFactory;
-import org.apache.asterix.external.input.record.reader.factory.TwitterRecordReaderFactory;
+import org.apache.asterix.external.input.record.reader.couchbase.CouchbaseReaderFactory;
+import org.apache.asterix.external.input.record.reader.stream.LineRecordReaderFactory;
+import org.apache.asterix.external.input.record.reader.stream.SemiStructuredRecordReaderFactory;
+import org.apache.asterix.external.input.record.reader.twitter.TwitterRecordReaderFactory;
import org.apache.asterix.external.input.stream.factory.LocalFSInputStreamProviderFactory;
import org.apache.asterix.external.input.stream.factory.SocketInputStreamProviderFactory;
import org.apache.asterix.external.util.ExternalDataConstants;
@@ -97,8 +98,11 @@
case ExternalDataConstants.READER_TWITTER_PUSH:
readerFactory = new TwitterRecordReaderFactory();
break;
+ case ExternalDataConstants.READER_COUCHBASE:
+ readerFactory = new CouchbaseReaderFactory();
+ break;
default:
- throw new AsterixException("unknown record reader factory");
+ throw new AsterixException("unknown record reader factory: " + reader);
}
}
return readerFactory;
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/LookupReaderFactoryProvider.java b/asterix-external-data/src/main/java/org/apache/asterix/external/provider/LookupReaderFactoryProvider.java
similarity index 92%
rename from asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/LookupReaderFactoryProvider.java
rename to asterix-external-data/src/main/java/org/apache/asterix/external/provider/LookupReaderFactoryProvider.java
index 3a82a68..18b9cb5 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/LookupReaderFactoryProvider.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/provider/LookupReaderFactoryProvider.java
@@ -16,13 +16,13 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.asterix.external.input.record.reader;
+package org.apache.asterix.external.provider;
import java.util.Map;
import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.external.api.ILookupReaderFactory;
-import org.apache.asterix.external.input.record.reader.factory.HDFSLookupReaderFactory;
+import org.apache.asterix.external.input.record.reader.hdfs.HDFSLookupReaderFactory;
import org.apache.asterix.external.util.ExternalDataConstants;
import org.apache.asterix.external.util.HDFSUtils;
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/provider/ParserFactoryProvider.java b/asterix-external-data/src/main/java/org/apache/asterix/external/provider/ParserFactoryProvider.java
index f5a0512..30595db 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/provider/ParserFactoryProvider.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/provider/ParserFactoryProvider.java
@@ -22,10 +22,12 @@
import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.external.api.IDataParserFactory;
+import org.apache.asterix.external.input.record.RecordWithMetadata;
import org.apache.asterix.external.parser.factory.ADMDataParserFactory;
import org.apache.asterix.external.parser.factory.DelimitedDataParserFactory;
import org.apache.asterix.external.parser.factory.HiveDataParserFactory;
import org.apache.asterix.external.parser.factory.RSSParserFactory;
+import org.apache.asterix.external.parser.factory.RecordWithMetadataParserFactory;
import org.apache.asterix.external.parser.factory.TweetParserFactory;
import org.apache.asterix.external.util.ExternalDataConstants;
import org.apache.asterix.external.util.ExternalDataUtils;
@@ -39,13 +41,12 @@
return ExternalDataUtils.createExternalParserFactory(ExternalDataUtils.getDataverse(configuration),
parserFactoryName);
} else {
- parserFactory = ParserFactoryProvider.getParserFactory(configuration);
+ parserFactory = ParserFactoryProvider.getParserFactory(ExternalDataUtils.getRecordFormat(configuration));
}
return parserFactory;
}
- private static IDataParserFactory getParserFactory(Map<String, String> configuration) throws AsterixException {
- String recordFormat = ExternalDataUtils.getRecordFormat(configuration);
+ private static IDataParserFactory getParserFactory(String recordFormat) throws AsterixException {
switch (recordFormat) {
case ExternalDataConstants.FORMAT_ADM:
case ExternalDataConstants.FORMAT_JSON:
@@ -58,6 +59,8 @@
return new TweetParserFactory();
case ExternalDataConstants.FORMAT_RSS:
return new RSSParserFactory();
+ case ExternalDataConstants.FORMAT_RECORD_WITH_META:
+ return new RecordWithMetadataParserFactory<RecordWithMetadata<?>>();
default:
throw new AsterixException("Unknown data format");
}
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/util/DataflowUtils.java b/asterix-external-data/src/main/java/org/apache/asterix/external/util/DataflowUtils.java
index 90c74e1..27a1d0e 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/util/DataflowUtils.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/util/DataflowUtils.java
@@ -36,7 +36,7 @@
public static void addTupleToFrame(FrameTupleAppender appender, ArrayTupleBuilder tb, IFrameWriter writer)
throws HyracksDataException {
if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
- appender.flush(writer, true);
+ appender.write(writer, true);
if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
throw new HyracksDataException("Tuple is too large for a frame");
}
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java b/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
index fb2688f..3d7f60b 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
@@ -40,9 +40,13 @@
public static final String KEY_HDFS_URL = "hdfs";
// specify the path when reading from a file system
public static final String KEY_PATH = "path";
+ // specify the hdfs input format when reading data from HDFS
public static final String KEY_INPUT_FORMAT = "input-format";
+ // specifies the filesystem (localfs or hdfs) when using a filesystem data source
public static final String KEY_FILESYSTEM = "fs";
+ // specifies the address of the hdfs name node
public static final String KEY_HADOOP_FILESYSTEM_URI = "fs.defaultFS";
+ // specifies the class implementation of the accessed instance of HDFS
public static final String KEY_HADOOP_FILESYSTEM_CLASS = "fs.hdfs.impl";
public static final String KEY_HADOOP_INPUT_DIR = "mapred.input.dir";
public static final String KEY_HADOOP_INPUT_FORMAT = "mapred.input.format.class";
@@ -73,6 +77,20 @@
public static final String KEY_IS_FEED = "is-feed";
public static final String KEY_WAIT_FOR_DATA = "wait-for-data";
public static final String KEY_FEED_NAME = "feed";
+ // a string representing external bucket name
+ public static final String KEY_BUCKET = "bucket";
+ // a comma delimited list of nodes
+ public static final String KEY_NODES = "nodes";
+ // a string representing the password used to authenticate with the external data source
+ public static final String KEY_PASSWORD = "password";
+ // an integer representing the number of raw records that can be buffered in the parsing queue
+ public static final String KEY_QUEUE_SIZE = "queue-size";
+ // a comma-delimited list of integers representing the indexes of the meta fields in the raw record (i.e. "3,1,0,2" denotes that the first meta field is at index 3 in the actual record)
+ public static final String KEY_META_INDEXES = "meta-indexes";
+ // an integer representing the index of the value field in the data type
+ public static final String KEY_VALUE_INDEX = "value-index";
+ // a string representing the format of the raw record in the value field in the data type
+ public static final String KEY_VALUE_FORMAT = "value-format";
/**
* HDFS class names
*/
@@ -95,6 +113,7 @@
*/
public static final String READER_HDFS = "hdfs";
public static final String READER_ADM = "adm";
+ public static final String READER_COUCHBASE = "couchbase";
public static final String READER_SEMISTRUCTURED = "semi-structured";
public static final String READER_DELIMITED = "delimited-text";
public static final String READER_TWITTER_PUSH = "twitter-push";
@@ -120,6 +139,7 @@
public static final String FORMAT_DELIMITED_TEXT = "delimited-text";
public static final String FORMAT_TWEET = "tweet";
public static final String FORMAT_RSS = "rss";
+ public static final String FORMAT_RECORD_WITH_META = "record-with-meta";
/**
* input streams
@@ -178,6 +198,7 @@
*/
public static final int DEFAULT_BUFFER_SIZE = 4096;
public static final int DEFAULT_BUFFER_INCREMENT = 4096;
+ public static final int DEFAULT_QUEUE_SIZE = 64;
/**
* Expected parameter values
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java b/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
index c9be872..7c03e4d 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
@@ -171,15 +171,19 @@
if (tag == null) {
throw new NotImplementedException("Failed to get the type information for field " + i + ".");
}
- IValueParserFactory vpf = valueParserFactoryMap.get(tag);
- if (vpf == null) {
- throw new NotImplementedException("No value parser factory for delimited fields of type " + tag);
- }
- fieldParserFactories[i] = vpf;
+ fieldParserFactories[i] = getParserFactory(tag);
}
return fieldParserFactories;
}
+ public static IValueParserFactory getParserFactory(ATypeTag tag) {
+ IValueParserFactory factory = valueParserFactoryMap.get(tag);
+ if (factory != null) {
+ return factory;
+ }
+ throw new NotImplementedException("No value parser factory for fields of type " + tag);
+ }
+
public static String getRecordReaderStreamName(Map<String, String> configuration) {
return configuration.get(ExternalDataConstants.KEY_READER_STREAM);
}
@@ -254,4 +258,10 @@
public static String getFeedName(Map<String, String> configuration) {
return configuration.get(ExternalDataConstants.KEY_FEED_NAME);
}
+
+ public static int getQueueSize(Map<String, String> configuration) {
+ String queueSize = configuration.get(ExternalDataConstants.KEY_QUEUE_SIZE);
+ // fall back to the default when unset; tolerate surrounding whitespace in the user-supplied value
+ return queueSize == null ? ExternalDataConstants.DEFAULT_QUEUE_SIZE : Integer.parseInt(queueSize.trim());
+ }
}
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/util/FileSystemWatcher.java b/asterix-external-data/src/main/java/org/apache/asterix/external/util/FileSystemWatcher.java
index 4bb9d92..631eef4 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/util/FileSystemWatcher.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/util/FileSystemWatcher.java
@@ -34,6 +34,8 @@
import java.util.Iterator;
import java.util.LinkedList;
+import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
@@ -50,6 +52,7 @@
private final boolean isFeed;
private boolean done;
private File current;
+ private AbstractFeedDataFlowController controller;
public FileSystemWatcher(FeedLogManager logManager, Path inputResource, String expression, boolean isFeed)
throws IOException {
@@ -199,7 +202,7 @@
return false;
}
- public boolean hasNext() {
+ public boolean hasNext() throws HyracksDataException {
if (it.hasNext()) {
return true;
}
@@ -218,6 +221,9 @@
key = watcher.poll();
}
// No file was found, wait for the filesystem to push events
+ if (controller != null) {
+ controller.flush();
+ }
while (files.isEmpty()) {
try {
key = watcher.take();
@@ -241,4 +247,8 @@
it = files.iterator();
return it.hasNext();
}
+
+ public void setController(AbstractFeedDataFlowController controller) {
+ this.controller = controller; // hasNext() calls controller.flush() (when non-null) before blocking on watcher.take()
+ }
}
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java b/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
index de6737a..7ac0428 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
@@ -28,7 +28,7 @@
import org.apache.asterix.external.indexing.ExternalFile;
import org.apache.asterix.external.indexing.IndexingScheduler;
import org.apache.asterix.external.indexing.RecordId.RecordIdType;
-import org.apache.asterix.external.input.stream.HDFSInputStreamProvider;
+import org.apache.asterix.external.input.stream.provider.HDFSInputStreamProvider;
import org.apache.asterix.om.util.AsterixAppContextInfo;
import org.apache.asterix.om.util.AsterixClusterProperties;
import org.apache.hadoop.fs.BlockLocation;