1) Introduced the notion of typed/generic adapters. A typed adapter returns ADM data corresponding to a fixed type (open or close) that is known to the adapter. Examples of this kind include TwitterAdapter, RSSFeedAdapter etc. In contrast, a 'generic' adapter may return ADM data of a type that depends upon the configuration arguments (the source they are fetching from).
Examples of this kind include FileFeedAdapter, NCFileSystemAdapter, HDFSAdapter etc.
2) Refactored code so that each adapter implementation does not have to redefine the (common) logic for fault tolerance.
3) Pull-based adapters (such as Twitter, RSSFeed) now construct ARecords directly (care is taken to avoid object construction by re-using mutable record instances). This is different from the earlier approach of constructing an input stream and (re)parsing data.
4) Deleted some classes not required anymore because of (2)
5) Modified adapter construction to happen through corresponding factory.
git-svn-id: https://asterixdb.googlecode.com/svn/branches/asterix_feeds@420 eaa15691-b419-025a-1212-ee371bd00084
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/datasource/data/listener/AbstractDataListeningProperty.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/datasource/data/listener/AbstractDataListeningProperty.java
deleted file mode 100644
index 88ce33c..0000000
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/datasource/data/listener/AbstractDataListeningProperty.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Copyright 2009-2011 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.asterix.datasource.data.listener;
-
-/**
- * A push-based datasource adapter allows registering a IDataListener instance.
- * Data listening property defines when data is pushed to a IDataListener.
- */
-
-public abstract class AbstractDataListeningProperty {
-
- /**
- * COUNT_BASED: Data is pushed to a data listener only if the count of
- * records exceeds the configured threshold value. TIME_BASED: Data is
- * pushed to a data listener in a periodic manner at the end of each time
- * interval.
- */
- public enum listeningPropertyType {
- COUNT_BASED,
- TIME_BASED
- }
-
- public abstract listeningPropertyType getListeningPropretyType();
-}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/datasource/data/listener/CountBasedDataListeningProperty.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/datasource/data/listener/CountBasedDataListeningProperty.java
deleted file mode 100644
index 0eb42dc..0000000
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/datasource/data/listener/CountBasedDataListeningProperty.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.asterix.datasource.data.listener;
-
-/**
- * A data listening property chosen by a data listener when it wants data to be
- * pushed when the count of records collected by the adapter exceeds a confiured
- * count value.
- */
-public class CountBasedDataListeningProperty extends AbstractDataListeningProperty {
-
- int numTuples;
-
- public int getNumTuples() {
- return numTuples;
- }
-
- public void setNumTuples(int numTuples) {
- this.numTuples = numTuples;
- }
-
- public CountBasedDataListeningProperty(int numTuples) {
- this.numTuples = numTuples;
- }
-
- @Override
- public listeningPropertyType getListeningPropretyType() {
- return listeningPropertyType.COUNT_BASED;
- }
-}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/datasource/data/listener/IDataListener.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/datasource/data/listener/IDataListener.java
deleted file mode 100644
index 269f060..0000000
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/datasource/data/listener/IDataListener.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Copyright 2009-2011 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.asterix.datasource.data.listener;
-
-import java.nio.ByteBuffer;
-
-/**
- * An interface providing a call back API for a subscriber interested in data
- * received from an external data source via the datasource adapter.
- */
-public interface IDataListener {
-
- /**
- * This method is a call back API and is invoked by an instance of
- * IPushBasedDatasourceReadAdapter. The caller passes a frame containing new
- * data. The protocol as to when the caller shall invoke this method is
- * decided by the configured @see DataListenerProperty .
- *
- * @param aObjects
- */
-
- public void dataReceived(ByteBuffer frame);
-
-}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/datasource/data/listener/TimeBasedDataListeningProperty.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/datasource/data/listener/TimeBasedDataListeningProperty.java
deleted file mode 100644
index a11cf94..0000000
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/datasource/data/listener/TimeBasedDataListeningProperty.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Copyright 2009-2011 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.asterix.datasource.data.listener;
-
-/**
- * A data listening property chosen by a data listener when it needs data to be
- * pushed in a periodic manner with a configured time-interval.
- */
-public class TimeBasedDataListeningProperty extends AbstractDataListeningProperty {
-
- // time interval in secs
- int interval;
-
- public int getInteval() {
- return interval;
- }
-
- public void setInterval(int interval) {
- this.interval = interval;
- }
-
- public TimeBasedDataListeningProperty(int interval) {
- this.interval = interval;
- }
-
- @Override
- public listeningPropertyType getListeningPropretyType() {
- return listeningPropertyType.TIME_BASED;
- }
-}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/CNNFeedAdapterFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/CNNFeedAdapterFactory.java
new file mode 100644
index 0000000..abc3973
--- /dev/null
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/CNNFeedAdapterFactory.java
@@ -0,0 +1,22 @@
+package edu.uci.ics.asterix.external.adapter.factory;
+
+import java.util.Map;
+
+import edu.uci.ics.asterix.external.dataset.adapter.CNNFeedAdapter;
+import edu.uci.ics.asterix.external.dataset.adapter.IDatasourceAdapter;
+
+public class CNNFeedAdapterFactory implements ITypedDatasourceAdapterFactory {
+
+ @Override
+ public IDatasourceAdapter createAdapter(Map<String, String> configuration) throws Exception {
+ CNNFeedAdapter cnnFeedAdapter = new CNNFeedAdapter();
+ cnnFeedAdapter.configure(configuration);
+ return cnnFeedAdapter;
+ }
+
+ @Override
+ public AdapterType getAdapterType() {
+ return AdapterType.TYPED;
+ }
+
+}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HDFSAdapterFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HDFSAdapterFactory.java
new file mode 100644
index 0000000..8468e31
--- /dev/null
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HDFSAdapterFactory.java
@@ -0,0 +1,24 @@
+package edu.uci.ics.asterix.external.adapter.factory;
+
+import java.util.Map;
+
+import edu.uci.ics.asterix.external.dataset.adapter.HDFSAdapter;
+import edu.uci.ics.asterix.external.dataset.adapter.IDatasourceAdapter;
+import edu.uci.ics.asterix.om.types.IAType;
+
+public class HDFSAdapterFactory implements IGenericDatasourceAdapterFactory {
+
+
+ @Override
+ public IDatasourceAdapter createAdapter(Map<String, String> configuration, IAType atype) throws Exception {
+ HDFSAdapter hdfsAdapter = new HDFSAdapter(atype);
+ hdfsAdapter.configure(configuration);
+ return hdfsAdapter;
+ }
+
+ @Override
+ public AdapterType getAdapterType() {
+ return AdapterType.GENERIC;
+ }
+
+}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HiveAdapterFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HiveAdapterFactory.java
new file mode 100644
index 0000000..0c394bb
--- /dev/null
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HiveAdapterFactory.java
@@ -0,0 +1,21 @@
+package edu.uci.ics.asterix.external.adapter.factory;
+
+import java.util.Map;
+
+import edu.uci.ics.asterix.external.dataset.adapter.HiveAdapter;
+import edu.uci.ics.asterix.external.dataset.adapter.IDatasourceAdapter;
+import edu.uci.ics.asterix.om.types.IAType;
+
+public class HiveAdapterFactory implements IGenericDatasourceAdapterFactory {
+
+ @Override
+ public IDatasourceAdapter createAdapter(Map<String, String> configuration, IAType type) throws Exception {
+ HiveAdapter hiveAdapter = new HiveAdapter(type);
+ return hiveAdapter;
+ }
+
+ @Override
+ public AdapterType getAdapterType() {
+ return AdapterType.GENERIC;
+ }
+}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/IDatasourceAdapterFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/IDatasourceAdapterFactory.java
new file mode 100644
index 0000000..08ff8a6
--- /dev/null
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/IDatasourceAdapterFactory.java
@@ -0,0 +1,12 @@
+package edu.uci.ics.asterix.external.adapter.factory;
+
+public interface IDatasourceAdapterFactory {
+
+ public enum AdapterType {
+ TYPED,
+ GENERIC
+ }
+
+ public AdapterType getAdapterType();
+
+}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/IGenericDatasourceAdapterFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/IGenericDatasourceAdapterFactory.java
new file mode 100644
index 0000000..d37fcc4
--- /dev/null
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/IGenericDatasourceAdapterFactory.java
@@ -0,0 +1,11 @@
+package edu.uci.ics.asterix.external.adapter.factory;
+
+import java.util.Map;
+
+import edu.uci.ics.asterix.external.dataset.adapter.IDatasourceAdapter;
+import edu.uci.ics.asterix.om.types.IAType;
+
+public interface IGenericDatasourceAdapterFactory extends IDatasourceAdapterFactory{
+
+ public IDatasourceAdapter createAdapter(Map<String, String> configuration, IAType type) throws Exception;
+}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/ITypedDatasourceAdapterFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/ITypedDatasourceAdapterFactory.java
new file mode 100644
index 0000000..e577aa9
--- /dev/null
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/ITypedDatasourceAdapterFactory.java
@@ -0,0 +1,11 @@
+package edu.uci.ics.asterix.external.adapter.factory;
+
+import java.util.Map;
+
+import edu.uci.ics.asterix.external.dataset.adapter.IDatasourceAdapter;
+
+public interface ITypedDatasourceAdapterFactory extends IDatasourceAdapterFactory {
+
+ public IDatasourceAdapter createAdapter(Map<String, String> configuration) throws Exception;
+
+}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/NCFileSystemAdapterFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/NCFileSystemAdapterFactory.java
new file mode 100644
index 0000000..43863b0
--- /dev/null
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/NCFileSystemAdapterFactory.java
@@ -0,0 +1,22 @@
+package edu.uci.ics.asterix.external.adapter.factory;
+
+import java.util.Map;
+
+import edu.uci.ics.asterix.external.dataset.adapter.IDatasourceAdapter;
+import edu.uci.ics.asterix.external.dataset.adapter.NCFileSystemAdapter;
+import edu.uci.ics.asterix.om.types.IAType;
+
+public class NCFileSystemAdapterFactory implements IGenericDatasourceAdapterFactory {
+
+ @Override
+ public IDatasourceAdapter createAdapter(Map<String, String> configuration, IAType atype) throws Exception {
+ NCFileSystemAdapter fsAdapter = new NCFileSystemAdapter(atype);
+ fsAdapter.configure(configuration);
+ return fsAdapter;
+ }
+
+ @Override
+ public AdapterType getAdapterType() {
+ return AdapterType.GENERIC;
+ }
+}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/PullBasedTwitterAdapterFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/PullBasedTwitterAdapterFactory.java
new file mode 100644
index 0000000..f03fa16
--- /dev/null
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/PullBasedTwitterAdapterFactory.java
@@ -0,0 +1,21 @@
+package edu.uci.ics.asterix.external.adapter.factory;
+
+import java.util.Map;
+
+import edu.uci.ics.asterix.external.dataset.adapter.IDatasourceAdapter;
+import edu.uci.ics.asterix.external.dataset.adapter.PullBasedTwitterAdapter;
+
+public class PullBasedTwitterAdapterFactory implements ITypedDatasourceAdapterFactory {
+
+ @Override
+ public IDatasourceAdapter createAdapter(Map<String, String> configuration) throws Exception {
+ PullBasedTwitterAdapter twitterAdapter = new PullBasedTwitterAdapter();
+ twitterAdapter.configure(configuration);
+ return twitterAdapter;
+ }
+
+ @Override
+ public AdapterType getAdapterType() {
+ return AdapterType.TYPED;
+ }
+}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/RSSFeedAdapterFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/RSSFeedAdapterFactory.java
new file mode 100644
index 0000000..b7786cd
--- /dev/null
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/RSSFeedAdapterFactory.java
@@ -0,0 +1,21 @@
+package edu.uci.ics.asterix.external.adapter.factory;
+
+import java.util.Map;
+
+import edu.uci.ics.asterix.external.dataset.adapter.IDatasourceAdapter;
+import edu.uci.ics.asterix.external.dataset.adapter.RSSFeedAdapter;
+
+public class RSSFeedAdapterFactory implements ITypedDatasourceAdapterFactory {
+
+ @Override
+ public IDatasourceAdapter createAdapter(Map<String, String> configuration) throws Exception {
+ RSSFeedAdapter rssFeedAdapter = new RSSFeedAdapter();
+ rssFeedAdapter.configure(configuration);
+ return rssFeedAdapter;
+ }
+
+ @Override
+ public AdapterType getAdapterType() {
+ return AdapterType.TYPED;
+ }
+}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/adapter/api/IDatasourceReadAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/adapter/api/IDatasourceReadAdapter.java
deleted file mode 100644
index 737dde0..0000000
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/adapter/api/IDatasourceReadAdapter.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Copyright 2009-2011 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.asterix.external.data.adapter.api;
-
-import edu.uci.ics.asterix.external.data.parser.IDataParser;
-
-public interface IDatasourceReadAdapter extends IDatasourceAdapter {
-
- /**
- * Retrieves data from an external datasource, packs it in frames and uses a
- * frame writer to flush the frames to a recipient operator.
- *
- * @param partition
- * Multiple instances of the adapter can be configured to
- * retrieve data in parallel. Partition is an integer between 0
- * to N-1 where N is the number of parallel adapter instances.
- * The partition value helps configure a particular instance of
- * the adapter to fetch data.
- * @param writer
- * An instance of IFrameWriter that is used to flush frames to
- * the recipient operator
- * @throws Exception
- */
-
- public IDataParser getDataParser(int partition) throws Exception;
-
-
-}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/adapter/api/IDatasourceWriteAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/adapter/api/IDatasourceWriteAdapter.java
deleted file mode 100644
index 3cd1464..0000000
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/adapter/api/IDatasourceWriteAdapter.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Copyright 2009-2011 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.asterix.external.data.adapter.api;
-
-import java.nio.ByteBuffer;
-import java.util.Map;
-
-import edu.uci.ics.asterix.om.types.IAType;
-
-public interface IDatasourceWriteAdapter extends IDatasourceAdapter {
-
- /**
- * Flushes tuples contained in the frame to the dataset stored in an
- * external data source. If required, the content of the frame is converted
- * into an appropriate format as required by the external data source.
- *
- * @caller This method is invoked by the wrapping ASTERIX operator when data
- * needs to be written to the external data source.
- * @param sourceAType
- * The type associated with the data that is required to be
- * written
- * @param frame
- * the frame that needs to be flushed
- * @param datasourceSpecificParams
- * A map containing other parameters that are specific to the
- * target data source where data is to be written. For example
- * when writing to a data source such as HDFS, an optional
- * parameter is the replication factor.
- * @throws Exception
- */
- public void flush(IAType sourceAType, ByteBuffer frame, Map<String, String> datasourceSpecificParams)
- throws Exception;
-
-}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/operator/ExternalDataScanOperatorDescriptor.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/operator/ExternalDataScanOperatorDescriptor.java
index 332b48b..763a431 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/operator/ExternalDataScanOperatorDescriptor.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/operator/ExternalDataScanOperatorDescriptor.java
@@ -16,8 +16,9 @@
import java.util.Map;
-import edu.uci.ics.asterix.external.data.adapter.api.IDatasourceAdapter;
-import edu.uci.ics.asterix.external.data.adapter.api.IDatasourceReadAdapter;
+import edu.uci.ics.asterix.external.adapter.factory.IDatasourceAdapterFactory;
+import edu.uci.ics.asterix.external.adapter.factory.IGenericDatasourceAdapterFactory;
+import edu.uci.ics.asterix.external.dataset.adapter.IDatasourceAdapter;
import edu.uci.ics.asterix.om.types.IAType;
import edu.uci.ics.hyracks.api.application.ICCApplicationContext;
import edu.uci.ics.hyracks.api.constraints.IConstraintAcceptor;
@@ -34,16 +35,16 @@
public class ExternalDataScanOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
private static final long serialVersionUID = 1L;
- private final String adapter;
+ private final String adapterFactory;
private final Map<String, String> adapterConfiguration;
private final IAType atype;
- private IDatasourceReadAdapter datasourceReadAdapter;
+ private IDatasourceAdapterFactory datasourceAdapterFactory;
public ExternalDataScanOperatorDescriptor(JobSpecification spec, String adapter, Map<String, String> arguments,
IAType atype, RecordDescriptor rDesc) {
super(spec, 0, 1);
recordDescriptors[0] = rDesc;
- this.adapter = adapter;
+ this.adapterFactory = adapter;
this.adapterConfiguration = arguments;
this.atype = atype;
}
@@ -51,7 +52,7 @@
@Override
public void contributeSchedulingConstraints(IConstraintAcceptor constraintAcceptor, JobActivityGraph plan,
ICCApplicationContext appCtx) {
-
+
/*
Comment: The following code is commented out. This is because constraints are being set at compile time so that they can
be propagated to upstream Asterix operators. Hyracks has to provide a way to propagate constraints to upstream operators.
@@ -81,14 +82,11 @@
}
- public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
+ public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions)
throws HyracksDataException {
-
try {
- //datasourceReadAdapter = (IDatasourceReadAdapter) Class.forName(adapter).newInstance();
- datasourceReadAdapter.configure(adapterConfiguration, atype);
- datasourceReadAdapter.initialize(ctx);
+ datasourceAdapterFactory = (IDatasourceAdapterFactory) Class.forName(adapterFactory).newInstance();
} catch (Exception e) {
throw new HyracksDataException("initialization of adapter failed", e);
}
@@ -96,8 +94,14 @@
@Override
public void initialize() throws HyracksDataException {
writer.open();
+ IDatasourceAdapter adapter = null;
try {
- datasourceReadAdapter.getDataParser(partition).parse(writer);
+ if (datasourceAdapterFactory.getAdapterType().equals(IDatasourceAdapterFactory.AdapterType.GENERIC)) {
+ adapter = ((IGenericDatasourceAdapterFactory) datasourceAdapterFactory).createAdapter(
+ adapterConfiguration, atype);
+ }
+ adapter.initialize(ctx);
+ adapter.start(partition, writer);
} catch (Exception e) {
throw new HyracksDataException("exception during reading from external data source", e);
} finally {
@@ -106,8 +110,5 @@
}
};
}
-
- public void setDatasourceAdapter(IDatasourceReadAdapter adapterInstance){
- this.datasourceReadAdapter = adapterInstance;
- }
+
}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/parser/ADMStreamParser.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/parser/ADMStreamParser.java
deleted file mode 100644
index 658645c..0000000
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/parser/ADMStreamParser.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Copyright 2009-2011 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.asterix.external.data.parser;
-
-import java.util.Map;
-
-import edu.uci.ics.asterix.om.types.ARecordType;
-import edu.uci.ics.asterix.runtime.operators.file.AdmSchemafullRecordParserFactory;
-import edu.uci.ics.hyracks.api.comm.IFrameWriter;
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-
-public class ADMStreamParser extends AbstractStreamDataParser {
-
- public ADMStreamParser() {
- }
-
- @Override
- public void initialize(ARecordType atype, IHyracksTaskContext ctx) {
- tupleParser = new AdmSchemafullRecordParserFactory(atype).createTupleParser(ctx);
- }
-
- @Override
- public void parse(IFrameWriter writer) throws HyracksDataException {
- tupleParser.parse(inputStream, writer);
- }
-
- @Override
- public void configure(Map<String, String> configuration) {
- }
-
-}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/parser/AbstractStreamDataParser.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/parser/AbstractStreamDataParser.java
deleted file mode 100644
index 403f197..0000000
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/parser/AbstractStreamDataParser.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Copyright 2009-2011 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.asterix.external.data.parser;
-
-import java.io.InputStream;
-import java.util.HashMap;
-
-import edu.uci.ics.asterix.om.types.ARecordType;
-import edu.uci.ics.asterix.om.types.ATypeTag;
-import edu.uci.ics.hyracks.api.comm.IFrameWriter;
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.dataflow.common.data.parsers.DoubleParserFactory;
-import edu.uci.ics.hyracks.dataflow.common.data.parsers.FloatParserFactory;
-import edu.uci.ics.hyracks.dataflow.common.data.parsers.IValueParserFactory;
-import edu.uci.ics.hyracks.dataflow.common.data.parsers.IntegerParserFactory;
-import edu.uci.ics.hyracks.dataflow.common.data.parsers.LongParserFactory;
-import edu.uci.ics.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory;
-import edu.uci.ics.hyracks.dataflow.std.file.ITupleParser;
-
-public abstract class AbstractStreamDataParser implements IDataStreamParser {
-
- public static final String KEY_DELIMITER = "delimiter";
-
- protected static final HashMap<ATypeTag, IValueParserFactory> typeToValueParserFactMap = new HashMap<ATypeTag, IValueParserFactory>();
-
- static {
- typeToValueParserFactMap.put(ATypeTag.INT32, IntegerParserFactory.INSTANCE);
- typeToValueParserFactMap.put(ATypeTag.FLOAT, FloatParserFactory.INSTANCE);
- typeToValueParserFactMap.put(ATypeTag.DOUBLE, DoubleParserFactory.INSTANCE);
- typeToValueParserFactMap.put(ATypeTag.INT64, LongParserFactory.INSTANCE);
- typeToValueParserFactMap.put(ATypeTag.STRING, UTF8StringParserFactory.INSTANCE);
- }
-
- protected ITupleParser tupleParser;
- protected IFrameWriter frameWriter;
- protected InputStream inputStream;
-
- @Override
- public abstract void initialize(ARecordType recordType, IHyracksTaskContext ctx);
-
- @Override
- public abstract void parse(IFrameWriter frameWriter) throws HyracksDataException;
-
- @Override
- public void setInputStream(InputStream in) {
- inputStream = in;
-
- }
-
- @Override
- public InputStream getInputStream() {
- return inputStream;
- }
-
-
-
-}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/parser/DelimitedDataStreamParser.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/parser/DelimitedDataStreamParser.java
deleted file mode 100644
index 9efafa6..0000000
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/parser/DelimitedDataStreamParser.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Copyright 2009-2011 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.asterix.external.data.parser;
-
-import java.util.Map;
-
-import edu.uci.ics.asterix.om.types.ARecordType;
-import edu.uci.ics.asterix.om.types.ATypeTag;
-import edu.uci.ics.asterix.runtime.operators.file.NtDelimitedDataTupleParserFactory;
-import edu.uci.ics.hyracks.algebricks.common.exceptions.NotImplementedException;
-import edu.uci.ics.hyracks.api.comm.IFrameWriter;
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.dataflow.common.data.parsers.IValueParserFactory;
-
-public class DelimitedDataStreamParser extends AbstractStreamDataParser {
-
- protected Character delimiter = defaultDelimiter;
-
- protected static final Character defaultDelimiter = new Character('\n');
-
- public Character getDelimiter() {
- return delimiter;
- }
-
- public DelimitedDataStreamParser(Character delimiter) {
- this.delimiter = delimiter;
- }
-
- public DelimitedDataStreamParser() {
- }
-
- @Override
- public void initialize(ARecordType recordType, IHyracksTaskContext ctx) {
- int n = recordType.getFieldTypes().length;
- IValueParserFactory[] fieldParserFactories = new IValueParserFactory[n];
- for (int i = 0; i < n; i++) {
- ATypeTag tag = recordType.getFieldTypes()[i].getTypeTag();
- IValueParserFactory vpf = typeToValueParserFactMap.get(tag);
- if (vpf == null) {
- throw new NotImplementedException("No value parser factory for delimited fields of type " + tag);
- }
- fieldParserFactories[i] = vpf;
- }
- tupleParser = new NtDelimitedDataTupleParserFactory(recordType, fieldParserFactories, delimiter)
- .createTupleParser(ctx);
- }
-
- @Override
- public void parse(IFrameWriter writer) throws HyracksDataException {
- tupleParser.parse(inputStream, writer);
- }
-
- @Override
- public void configure(Map<String, String> configuration) {
- String delimiterArg = configuration.get(KEY_DELIMITER);
- if (delimiterArg != null) {
- delimiter = delimiterArg.charAt(0);
- } else {
- delimiter = '\n';
- }
- }
-
-}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/parser/IDataParser.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/parser/IDataParser.java
deleted file mode 100644
index eb7daf7..0000000
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/parser/IDataParser.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Copyright 2009-2011 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.asterix.external.data.parser;
-
-import java.util.Map;
-
-import edu.uci.ics.asterix.om.types.ARecordType;
-import edu.uci.ics.hyracks.api.comm.IFrameWriter;
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-
-/**
- * Represents an parser that processes an input stream to form records of a
- * given type. The parser creates frames that are flushed using a frame writer.
- */
-public interface IDataParser {
-
- /**
- * @param atype
- * The record type associated with each record output by the
- * parser
- * @param configuration
- * Any configuration parameters for the parser
- */
- public void configure(Map<String, String> configuration);
-
- /**
- * Initializes the instance. An implementation may use the passed-in
- * configuration parameters, the output record type to initialize itself so
- * that it can parse an input stream to form records of the given type.
- *
- * @param configuration
- * Any configuration parameters for the parser
- * @param ctx
- * The runtime HyracksStageletContext.
- */
- public void initialize(ARecordType recordType, IHyracksTaskContext ctx);
-
- /**
- * Parses the input stream to produce records of the configured type and
- * uses the frame writer instance to flush frames containing the produced
- * records.
- *
- * @param in
- * The source input stream
- * @param frameWriter
- * A frame writer instance that is used for flushing frames to
- * the recipient operator
- * @throws HyracksDataException
- */
- public void parse(IFrameWriter frameWriter) throws HyracksDataException;
-
-}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/parser/IDataStreamParser.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/parser/IDataStreamParser.java
deleted file mode 100644
index 1425707..0000000
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/parser/IDataStreamParser.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Copyright 2009-2011 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.asterix.external.data.parser;
-
-import java.io.InputStream;
-
-public interface IDataStreamParser extends IDataParser {
-
- public void setInputStream(InputStream in);
-
- public InputStream getInputStream();
-
-}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/parser/IManagedDataParser.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/parser/IManagedDataParser.java
deleted file mode 100644
index 7c9bb7d..0000000
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/parser/IManagedDataParser.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Copyright 2009-2011 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.asterix.external.data.parser;
-
-import edu.uci.ics.asterix.feed.managed.adapter.IManagedFeedAdapter;
-
-public interface IManagedDataParser extends IDataParser {
-
- public IManagedTupleParser getManagedTupleParser();
-
- public void setAdapter(IManagedFeedAdapter adapter);
-}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/parser/IManagedTupleParser.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/parser/IManagedTupleParser.java
deleted file mode 100644
index 14f6372..0000000
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/parser/IManagedTupleParser.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Copyright 2009-2011 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.asterix.external.data.parser;
-
-import java.util.Map;
-
-import edu.uci.ics.hyracks.dataflow.std.file.ITupleParser;
-
-public interface IManagedTupleParser extends ITupleParser {
-
- public void suspend() throws Exception;
-
- public void resume() throws Exception;
-
- public void stop() throws Exception;
-
- public void alter(Map<String,String> alterParams) throws Exception;
-
-}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/parser/ManagedAdmRecordParserFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/parser/ManagedAdmRecordParserFactory.java
deleted file mode 100644
index 3d31489..0000000
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/parser/ManagedAdmRecordParserFactory.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Copyright 2009-2011 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.asterix.external.data.parser;
-
-import edu.uci.ics.asterix.feed.managed.adapter.IManagedFeedAdapter;
-import edu.uci.ics.asterix.om.types.ARecordType;
-import edu.uci.ics.asterix.runtime.operators.file.AdmSchemafullRecordParserFactory;
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.dataflow.std.file.ITupleParser;
-
-public class ManagedAdmRecordParserFactory extends AdmSchemafullRecordParserFactory {
-
- private final IManagedFeedAdapter adapter;
-
- public ManagedAdmRecordParserFactory(ARecordType recType, IManagedFeedAdapter adapter) {
- super(recType);
- this.adapter = adapter;
- }
-
- @Override
- public ITupleParser createTupleParser(final IHyracksTaskContext ctx) {
- return new ManagedAdmTupleParser(ctx, recType, adapter);
- }
-}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/parser/ManagedAdmStreamParser.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/parser/ManagedAdmStreamParser.java
deleted file mode 100644
index bfe9fe0..0000000
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/parser/ManagedAdmStreamParser.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Copyright 2009-2011 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.asterix.external.data.parser;
-
-import java.util.Map;
-
-import edu.uci.ics.asterix.feed.managed.adapter.IManagedFeedAdapter;
-import edu.uci.ics.asterix.om.types.ARecordType;
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-
-public class ManagedAdmStreamParser extends ADMStreamParser implements IManagedDataParser{
-
- private IManagedFeedAdapter adapter;
-
- @Override
- public void initialize(ARecordType atype, IHyracksTaskContext ctx) {
- tupleParser = new ManagedAdmRecordParserFactory(atype, adapter).createTupleParser(ctx);
- }
-
- @Override
- public void configure(Map<String, String> configuration) {
-
- }
-
- @Override
- public IManagedTupleParser getManagedTupleParser() {
- return (IManagedTupleParser)tupleParser;
- }
-
- @Override
- public void setAdapter(IManagedFeedAdapter adapter) {
- this.adapter = adapter;
- }
-}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/parser/ManagedAdmTupleParser.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/parser/ManagedAdmTupleParser.java
deleted file mode 100644
index b7215a3..0000000
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/parser/ManagedAdmTupleParser.java
+++ /dev/null
@@ -1,164 +0,0 @@
-/*
- * Copyright 2009-2011 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.asterix.external.data.parser;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-import edu.uci.ics.asterix.adm.parser.nontagged.AdmLexer;
-import edu.uci.ics.asterix.common.exceptions.AsterixException;
-import edu.uci.ics.asterix.feed.managed.adapter.IManagedFeedAdapter;
-import edu.uci.ics.asterix.feed.managed.adapter.IManagedFeedAdapter.OperationState;
-import edu.uci.ics.asterix.om.types.ARecordType;
-import edu.uci.ics.asterix.runtime.operators.file.AdmTupleParser;
-import edu.uci.ics.hyracks.api.comm.IFrameWriter;
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
-
-public class ManagedAdmTupleParser extends AdmTupleParser implements IManagedTupleParser {
-
- private OperationState state;
- private List<OperationState> nextState;
- private final IManagedFeedAdapter adapter;
- private long tupleInterval;
-
- public static final String TUPLE_INTERVAL_KEY = "tuple-interval";
-
- public ManagedAdmTupleParser(IHyracksTaskContext ctx, ARecordType recType, IManagedFeedAdapter adapter) {
- super(ctx, recType);
- nextState = new ArrayList<OperationState>();
- this.adapter = adapter;
- this.tupleInterval = adapter.getAdapterProperty(TUPLE_INTERVAL_KEY) == null ? 0 : Long.parseLong(adapter
- .getAdapterProperty(TUPLE_INTERVAL_KEY));
- }
-
- public ManagedAdmTupleParser(IHyracksTaskContext ctx, ARecordType recType, long tupleInterval,
- IManagedFeedAdapter adapter) {
- super(ctx, recType);
- nextState = new ArrayList<OperationState>();
- this.adapter = adapter;
- this.tupleInterval = tupleInterval;
- }
-
- @Override
- public void parse(InputStream in, IFrameWriter writer) throws HyracksDataException {
- admLexer = new AdmLexer(in);
- appender.reset(frame, true);
- int tupleNum = 0;
- try {
- while (true) {
- tb.reset();
- if (!parseAdmInstance(recType, true, dos)) {
- break;
- }
- tb.addFieldEndOffset();
- processNextTuple(nextState.isEmpty() ? null : nextState.get(0), writer);
- Thread.currentThread().sleep(tupleInterval);
- tupleNum++;
- }
- if (appender.getTupleCount() > 0) {
- FrameUtils.flushFrame(frame, writer);
- }
- } catch (AsterixException ae) {
- throw new HyracksDataException(ae);
- } catch (IOException ioe) {
- throw new HyracksDataException(ioe);
- } catch (InterruptedException ie) {
- throw new HyracksDataException(ie);
- }
- }
-
- private void addTupleToFrame(IFrameWriter writer, boolean forceFlush) throws HyracksDataException {
- boolean success = appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize());
- if (!success) {
- FrameUtils.flushFrame(frame, writer);
- appender.reset(frame, true);
- if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
- throw new IllegalStateException();
- }
- }
-
- if (forceFlush) {
- FrameUtils.flushFrame(frame, writer);
- }
-
- }
-
- private void processNextTuple(OperationState feedState, IFrameWriter writer) throws HyracksDataException {
- try {
- if (feedState != null) {
- switch (state) {
- case SUSPENDED:
- suspendOperation(writer);
- break;
- case STOPPED:
- stopOperation(writer);
- break;
- }
- } else {
- addTupleToFrame(writer, false);
- }
- } catch (HyracksDataException hde) {
- throw hde;
- } catch (Exception e) {
- throw new HyracksDataException(e);
- }
- }
-
- private void suspendOperation(IFrameWriter writer) throws HyracksDataException, Exception {
- nextState.remove(0);
- addTupleToFrame(writer, false);
- adapter.beforeSuspend();
- synchronized (this) {
- this.wait();
- adapter.beforeResume();
- }
- }
-
- private void stopOperation(IFrameWriter writer) throws HyracksDataException, Exception {
- nextState.remove(0);
- addTupleToFrame(writer, true);
- adapter.beforeStop();
- writer.close();
- }
-
- @Override
- public void suspend() throws Exception {
- nextState.add(OperationState.SUSPENDED);
- }
-
- @Override
- public void resume() throws Exception {
- synchronized (this) {
- this.notifyAll();
- }
- }
-
- @Override
- public void stop() throws Exception {
- nextState.add(OperationState.STOPPED);
- }
-
- @Override
- public void alter(Map<String, String> alterParams) throws Exception {
- if (alterParams.get(TUPLE_INTERVAL_KEY) != null) {
- tupleInterval = Long.parseLong(alterParams.get(TUPLE_INTERVAL_KEY));
- }
- }
-}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/parser/ManagedDelimitedDataRecordParserFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/parser/ManagedDelimitedDataRecordParserFactory.java
deleted file mode 100644
index 37a7162..0000000
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/parser/ManagedDelimitedDataRecordParserFactory.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Copyright 2009-2011 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.asterix.external.data.parser;
-
-import edu.uci.ics.asterix.feed.managed.adapter.IManagedFeedAdapter;
-import edu.uci.ics.asterix.om.types.ARecordType;
-import edu.uci.ics.asterix.runtime.operators.file.NtDelimitedDataTupleParserFactory;
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.dataflow.common.data.parsers.IValueParserFactory;
-import edu.uci.ics.hyracks.dataflow.std.file.ITupleParser;
-
-public class ManagedDelimitedDataRecordParserFactory extends NtDelimitedDataTupleParserFactory {
-
- private final IManagedFeedAdapter adapter;
-
- public ManagedDelimitedDataRecordParserFactory(IValueParserFactory[] fieldParserFactories, char fieldDelimiter,
- ARecordType recType, IManagedFeedAdapter adapter) {
- super(recType, fieldParserFactories, fieldDelimiter);
- this.adapter = adapter;
- }
-
- @Override
- public ITupleParser createTupleParser(final IHyracksTaskContext ctx) {
- return new ManagedDelimitedDataTupleParser(ctx, recordType, adapter, valueParserFactories, fieldDelimiter);
- }
-}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/parser/ManagedDelimitedDataStreamParser.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/parser/ManagedDelimitedDataStreamParser.java
deleted file mode 100644
index 69921b0..0000000
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/parser/ManagedDelimitedDataStreamParser.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Copyright 2009-2011 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.asterix.external.data.parser;
-
-import edu.uci.ics.asterix.feed.managed.adapter.IManagedFeedAdapter;
-import edu.uci.ics.asterix.om.types.ARecordType;
-import edu.uci.ics.asterix.om.types.ATypeTag;
-import edu.uci.ics.hyracks.algebricks.common.exceptions.NotImplementedException;
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.dataflow.common.data.parsers.IValueParserFactory;
-
-public class ManagedDelimitedDataStreamParser extends DelimitedDataStreamParser implements IManagedDataParser {
-
- private IManagedFeedAdapter adapter;
-
- @Override
- public void initialize(ARecordType recordType, IHyracksTaskContext ctx) {
- int n = recordType.getFieldTypes().length;
- IValueParserFactory[] fieldParserFactories = new IValueParserFactory[n];
- for (int i = 0; i < n; i++) {
- ATypeTag tag = recordType.getFieldTypes()[i].getTypeTag();
- IValueParserFactory vpf = typeToValueParserFactMap.get(tag);
- if (vpf == null) {
- throw new NotImplementedException("No value parser factory for delimited fields of type " + tag);
- }
- fieldParserFactories[i] = vpf;
- }
- tupleParser = new ManagedDelimitedDataRecordParserFactory(fieldParserFactories, delimiter.charValue(),
- recordType, adapter).createTupleParser(ctx);
- }
-
- @Override
- public IManagedTupleParser getManagedTupleParser() {
- return (IManagedTupleParser) tupleParser;
- }
-
- @Override
- public void setAdapter(IManagedFeedAdapter adapter) {
- this.adapter = adapter;
- }
-}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/parser/ManagedDelimitedDataTupleParser.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/parser/ManagedDelimitedDataTupleParser.java
deleted file mode 100644
index 325eff4..0000000
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/parser/ManagedDelimitedDataTupleParser.java
+++ /dev/null
@@ -1,212 +0,0 @@
-/*
- * Copyright 2009-2011 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.asterix.external.data.parser;
-
-import java.io.DataOutput;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-import edu.uci.ics.asterix.builders.IARecordBuilder;
-import edu.uci.ics.asterix.builders.RecordBuilder;
-import edu.uci.ics.asterix.feed.managed.adapter.IManagedFeedAdapter;
-import edu.uci.ics.asterix.feed.managed.adapter.IManagedFeedAdapter.OperationState;
-import edu.uci.ics.asterix.om.base.AMutableString;
-import edu.uci.ics.asterix.om.types.ARecordType;
-import edu.uci.ics.asterix.om.types.ATypeTag;
-import edu.uci.ics.asterix.runtime.operators.file.DelimitedDataTupleParser;
-import edu.uci.ics.hyracks.api.comm.IFrameWriter;
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
-import edu.uci.ics.hyracks.dataflow.common.data.accessors.ArrayBackedValueStorage;
-import edu.uci.ics.hyracks.dataflow.common.data.parsers.IValueParser;
-import edu.uci.ics.hyracks.dataflow.common.data.parsers.IValueParserFactory;
-
-public class ManagedDelimitedDataTupleParser extends DelimitedDataTupleParser implements IManagedTupleParser {
-
- private List<OperationState> nextState;
- private IManagedFeedAdapter adapter;
- private long tupleInterval;
-
- public static final String TUPLE_INTERVAL_KEY = "tuple-interval";
-
- public ManagedDelimitedDataTupleParser(IHyracksTaskContext ctx, ARecordType recType, IManagedFeedAdapter adapter,
- IValueParserFactory[] valueParserFactories, char fieldDelimter) {
- super(ctx, recType, valueParserFactories, fieldDelimter);
- this.adapter = adapter;
- nextState = new ArrayList<OperationState>();
- tupleInterval = adapter.getAdapterProperty(TUPLE_INTERVAL_KEY) == null ? 0 : Long.parseLong(adapter
- .getAdapterProperty(TUPLE_INTERVAL_KEY));
- }
-
- @Override
- public void parse(InputStream in, IFrameWriter writer) throws HyracksDataException {
- try {
- IValueParser[] valueParsers = new IValueParser[valueParserFactories.length];
- for (int i = 0; i < valueParserFactories.length; ++i) {
- valueParsers[i] = valueParserFactories[i].createValueParser();
- }
-
- appender.reset(frame, true);
- tb = new ArrayTupleBuilder(1);
- recDos = tb.getDataOutput();
-
- ArrayBackedValueStorage fieldValueBuffer = new ArrayBackedValueStorage();
- DataOutput fieldValueBufferOutput = fieldValueBuffer.getDataOutput();
- IARecordBuilder recBuilder = new RecordBuilder();
- recBuilder.reset(recType);
- recBuilder.init();
-
- int n = recType.getFieldNames().length;
- byte[] fieldTypeTags = new byte[n];
- for (int i = 0; i < n; i++) {
- ATypeTag tag = recType.getFieldTypes()[i].getTypeTag();
- fieldTypeTags[i] = tag.serialize();
- }
-
- int[] fldIds = new int[n];
- ArrayBackedValueStorage[] nameBuffers = new ArrayBackedValueStorage[n];
- AMutableString str = new AMutableString(null);
- for (int i = 0; i < n; i++) {
- String name = recType.getFieldNames()[i];
- fldIds[i] = recBuilder.getFieldId(name);
- if (fldIds[i] < 0) {
- if (!recType.isOpen()) {
- throw new HyracksDataException("Illegal field " + name + " in closed type " + recType);
- } else {
- nameBuffers[i] = new ArrayBackedValueStorage();
- fieldNameToBytes(name, str, nameBuffers[i]);
- }
- }
- }
-
- FieldCursor cursor = new FieldCursor(new InputStreamReader(in));
- while (cursor.nextRecord()) {
- tb.reset();
- recBuilder.reset(recType);
- recBuilder.init();
-
- for (int i = 0; i < valueParsers.length; ++i) {
- if (!cursor.nextField()) {
- break;
- }
- fieldValueBuffer.reset();
- fieldValueBufferOutput.writeByte(fieldTypeTags[i]);
- valueParsers[i].parse(cursor.getBuffer(), cursor.getfStart(),
- cursor.getfEnd() - cursor.getfStart(), fieldValueBufferOutput);
- if (fldIds[i] < 0) {
- recBuilder.addField(nameBuffers[i], fieldValueBuffer);
- } else {
- recBuilder.addField(fldIds[i], fieldValueBuffer);
- }
- }
- recBuilder.write(recDos, true);
- processNextTuple(nextState.isEmpty() ? null : nextState.get(0), writer);
- Thread.currentThread().sleep(tupleInterval);
- }
- if (appender.getTupleCount() > 0) {
- FrameUtils.flushFrame(frame, writer);
- }
- } catch (IOException e) {
- throw new HyracksDataException(e);
- } catch (InterruptedException ie) {
- throw new HyracksDataException(ie);
- }
- }
-
- private void addTupleToFrame(IFrameWriter writer, boolean forceFlush) throws HyracksDataException {
- tb.addFieldEndOffset();
- boolean success = appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize());
- if (!success) {
- FrameUtils.flushFrame(frame, writer);
- appender.reset(frame, true);
- if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
- throw new IllegalStateException();
- }
- }
-
- if (forceFlush) {
- FrameUtils.flushFrame(frame, writer);
- }
-
- }
-
- private void processNextTuple(OperationState feedState, IFrameWriter writer) throws HyracksDataException {
- try {
- if (feedState != null) {
- switch (feedState) {
- case SUSPENDED:
- suspendOperation(writer);
- break;
- case STOPPED:
- stopOperation(writer);
- break;
- }
- } else {
- addTupleToFrame(writer, false);
- }
- } catch (HyracksDataException hde) {
- throw hde;
- } catch (Exception e) {
- throw new HyracksDataException(e);
- }
- }
-
- private void suspendOperation(IFrameWriter writer) throws HyracksDataException, Exception {
- nextState.remove(0);
- addTupleToFrame(writer, false);
- adapter.beforeSuspend();
- synchronized (this) {
- this.wait();
- adapter.beforeResume();
- }
- }
-
- private void stopOperation(IFrameWriter writer) throws HyracksDataException, Exception {
- nextState.remove(0);
- addTupleToFrame(writer, false);
- adapter.beforeStop();
- adapter.stop();
- }
-
- @Override
- public void suspend() throws Exception {
- nextState.add(OperationState.SUSPENDED);
- }
-
- @Override
- public void resume() throws Exception {
- synchronized (this) {
- this.notifyAll();
- }
- }
-
- @Override
- public void stop() throws Exception {
- nextState.add(OperationState.STOPPED);
- }
-
- @Override
- public void alter(Map<String, String> alterParams) throws Exception {
- if (alterParams.get(TUPLE_INTERVAL_KEY) != null) {
- tupleInterval = Long.parseLong(alterParams.get(TUPLE_INTERVAL_KEY));
- }
- }
-}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/AbstractDatasourceAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/AbstractDatasourceAdapter.java
index b22132d..aee8729 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/AbstractDatasourceAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/AbstractDatasourceAdapter.java
@@ -17,17 +17,11 @@
import java.util.HashMap;
import java.util.Map;
-import edu.uci.ics.asterix.external.data.adapter.api.IDatasourceAdapter;
import edu.uci.ics.asterix.om.types.ATypeTag;
import edu.uci.ics.asterix.om.types.IAType;
import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.dataflow.common.data.parsers.DoubleParserFactory;
-import edu.uci.ics.hyracks.dataflow.common.data.parsers.FloatParserFactory;
import edu.uci.ics.hyracks.dataflow.common.data.parsers.IValueParserFactory;
-import edu.uci.ics.hyracks.dataflow.common.data.parsers.IntegerParserFactory;
-import edu.uci.ics.hyracks.dataflow.common.data.parsers.LongParserFactory;
-import edu.uci.ics.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory;
/**
* Represents the base class that is required to be extended by every
@@ -38,48 +32,32 @@
private static final long serialVersionUID = -3510610289692452466L;
protected Map<String, String> configuration;
-
protected transient AlgebricksPartitionConstraint partitionConstraint;
-
protected IAType atype;
-
protected IHyracksTaskContext ctx;
+ protected AdapterDataFlowType dataFlowType;
+ protected AdapterType adapterType;
+ protected boolean typeInfoRequired = false;
protected static final HashMap<ATypeTag, IValueParserFactory> typeToValueParserFactMap = new HashMap<ATypeTag, IValueParserFactory>();
-
- protected static final HashMap<String, String> formatToParserMap = new HashMap<String, String>();
-
- protected static final HashMap<String, String> formatToManagedParserMap = new HashMap<String, String>();
-
- protected AdapterDataFlowType dataFlowType;
-
- protected AdapterType adapterType;
-
- static {
- typeToValueParserFactMap.put(ATypeTag.INT32, IntegerParserFactory.INSTANCE);
- typeToValueParserFactMap.put(ATypeTag.FLOAT, FloatParserFactory.INSTANCE);
- typeToValueParserFactMap.put(ATypeTag.DOUBLE, DoubleParserFactory.INSTANCE);
- typeToValueParserFactMap.put(ATypeTag.INT64, LongParserFactory.INSTANCE);
- typeToValueParserFactMap.put(ATypeTag.STRING, UTF8StringParserFactory.INSTANCE);
-
- formatToParserMap.put("delimited-text", "edu.uci.ics.asterix.external.data.parser.DelimitedDataStreamParser");
- formatToParserMap.put("adm", "edu.uci.ics.asterix.external.data.parser.ADMStreamParser");
-
- formatToManagedParserMap.put("delimited-text",
- "edu.uci.ics.asterix.external.data.parser.ManagedDelimitedDataStreamParser");
- formatToManagedParserMap.put("adm", "edu.uci.ics.asterix.external.data.parser.ManagedAdmStreamParser");
-
- }
+ protected static final HashMap<String, String> formatToParserFactoryMap = new HashMap<String, String>();
public static final String KEY_FORMAT = "format";
- public static final String KEY_PARSER = "parser";
+ public static final String KEY_PARSER_FACTORY = "parser";
public static final String FORMAT_DELIMITED_TEXT = "delimited-text";
public static final String FORMAT_ADM = "adm";
+ static {
+ formatToParserFactoryMap.put(FORMAT_DELIMITED_TEXT,
+ "edu.uci.ics.asterix.runtime.operators.file.AdmSchemafullRecordParserFactory");
+ formatToParserFactoryMap.put(FORMAT_ADM,
+ "edu.uci.ics.asterix.runtime.operators.file.NtDelimitedDataTupleParserFactory");
+ }
+
abstract public void initialize(IHyracksTaskContext ctx) throws Exception;
- abstract public void configure(Map<String, String> arguments, IAType atype) throws Exception;
+ abstract public void configure(Map<String, String> arguments) throws Exception;
abstract public AdapterDataFlowType getAdapterDataFlowType();
@@ -89,10 +67,6 @@
return partitionConstraint;
}
- public void setAdapterProperty(String property, String value) {
- configuration.put(property, value);
- }
-
public String getAdapterProperty(String attribute) {
return configuration.get(attribute);
}
@@ -101,4 +75,16 @@
return configuration;
}
+ public IAType getAdapterOutputType() {
+ return atype;
+ }
+
+ public void setAdapterProperty(String property, String value) {
+ configuration.put(property, value);
+ }
+
+ public boolean isTypeInfoRequired() {
+ return typeInfoRequired;
+ }
+
}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/CNNFeedAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/CNNFeedAdapter.java
index 4d969e4..3760d56 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/CNNFeedAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/CNNFeedAdapter.java
@@ -5,18 +5,11 @@
import java.util.List;
import java.util.Map;
-import edu.uci.ics.asterix.external.data.adapter.api.IDatasourceAdapter;
-import edu.uci.ics.asterix.external.data.parser.IDataParser;
-import edu.uci.ics.asterix.external.data.parser.IDataStreamParser;
-import edu.uci.ics.asterix.external.data.parser.ManagedDelimitedDataStreamParser;
-import edu.uci.ics.asterix.feed.intake.FeedStream;
-import edu.uci.ics.asterix.feed.intake.RSSFeedClient;
import edu.uci.ics.asterix.feed.managed.adapter.IMutableFeedAdapter;
-import edu.uci.ics.asterix.om.types.ARecordType;
-import edu.uci.ics.asterix.om.types.IAType;
public class CNNFeedAdapter extends RSSFeedAdapter implements IDatasourceAdapter, IMutableFeedAdapter {
+ private static final long serialVersionUID = 2523303758114582251L;
private List<String> feedURLs = new ArrayList<String>();
private String id_prefix = "";
@@ -62,20 +55,8 @@
}
@Override
- public IDataParser getDataParser(int partition) throws Exception {
- IDataParser dataParser = new ManagedDelimitedDataStreamParser();
- dataParser.configure(configuration);
- dataParser.initialize((ARecordType) atype, ctx);
- RSSFeedClient feedClient = new RSSFeedClient(this, feedURLs.get(partition), id_prefix);
- FeedStream feedStream = new FeedStream(feedClient, ctx);
- ((IDataStreamParser) dataParser).setInputStream(feedStream);
- return dataParser;
- }
-
- @Override
- public void configure(Map<String, String> arguments, IAType atype) throws Exception {
+ public void configure(Map<String, String> arguments) throws Exception {
configuration = arguments;
- this.atype = atype;
String rssURLProperty = configuration.get(KEY_RSS_URL);
if (rssURLProperty == null) {
throw new IllegalArgumentException("no rss url provided");
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HDFSAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HDFSAdapter.java
index f0e9ea4..ec39080 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HDFSAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HDFSAdapter.java
@@ -17,6 +17,7 @@
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
+import java.lang.reflect.Constructor;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
@@ -30,6 +31,7 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.Counters.Counter;
import org.apache.hadoop.mapred.InputSplit;
@@ -39,25 +41,19 @@
import org.apache.hadoop.mapred.SequenceFileInputFormat;
import org.apache.hadoop.mapred.TextInputFormat;
-import edu.uci.ics.asterix.external.data.adapter.api.IDatasourceReadAdapter;
-import edu.uci.ics.asterix.external.data.parser.IDataParser;
-import edu.uci.ics.asterix.external.data.parser.IDataStreamParser;
import edu.uci.ics.asterix.om.types.ARecordType;
-import edu.uci.ics.asterix.om.types.ATypeTag;
import edu.uci.ics.asterix.om.types.IAType;
-import edu.uci.ics.asterix.runtime.operators.file.AdmSchemafullRecordParserFactory;
-import edu.uci.ics.asterix.runtime.operators.file.NtDelimitedDataTupleParserFactory;
import edu.uci.ics.asterix.runtime.util.AsterixRuntimeUtil;
import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksCountPartitionConstraint;
-import edu.uci.ics.hyracks.algebricks.common.exceptions.NotImplementedException;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.dataflow.common.data.parsers.IValueParserFactory;
import edu.uci.ics.hyracks.dataflow.hadoop.util.InputSplitsProxy;
+import edu.uci.ics.hyracks.dataflow.std.file.ITupleParser;
import edu.uci.ics.hyracks.dataflow.std.file.ITupleParserFactory;
-public class HDFSAdapter extends AbstractDatasourceAdapter implements IDatasourceReadAdapter {
+public class HDFSAdapter extends AbstractDatasourceAdapter {
private static final Logger LOGGER = Logger.getLogger(HDFSAdapter.class.getName());
@@ -65,10 +61,8 @@
private Object[] inputSplits;
private transient JobConf conf;
private IHyracksTaskContext ctx;
- private boolean isDelimited;
- private Character delimiter;
private InputSplitsProxy inputSplitsProxy;
- private String parserClass;
+ private String parserFactory;
private static final Map<String, String> formatClassNames = new HashMap<String, String>();
public static final String KEY_HDFS_URL = "hdfs";
@@ -83,13 +77,16 @@
formatClassNames.put(INPUT_FORMAT_SEQUENCE, "org.apache.hadoop.mapred.SequenceFileInputFormat");
}
+ public HDFSAdapter(IAType atype) {
+ this.atype = atype;
+ }
+
@Override
- public void configure(Map<String, String> arguments, IAType atype) throws Exception {
+ public void configure(Map<String, String> arguments) throws Exception {
configuration = arguments;
configureFormat();
configureJobConf();
configurePartitionConstraint();
- this.atype = atype;
}
private void configureFormat() throws Exception {
@@ -99,23 +96,17 @@
throw new Exception("format " + format + " not supported");
}
- parserClass = configuration.get(KEY_PARSER);
- if (parserClass == null) {
+ parserFactory = configuration.get(KEY_PARSER_FACTORY);
+ if (parserFactory == null) {
if (FORMAT_DELIMITED_TEXT.equalsIgnoreCase(configuration.get(KEY_FORMAT))) {
- parserClass = formatToParserMap.get(FORMAT_DELIMITED_TEXT);
+ parserFactory = formatToParserFactoryMap.get(FORMAT_DELIMITED_TEXT);
} else if (FORMAT_ADM.equalsIgnoreCase(configuration.get(KEY_FORMAT))) {
- parserClass = formatToParserMap.get(FORMAT_ADM);
+ parserFactory = formatToParserFactoryMap.get(FORMAT_ADM);
}
}
}
- private IDataParser createDataParser() throws Exception {
- IDataParser dataParser = (IDataParser) Class.forName(parserClass).newInstance();
- dataParser.configure(configuration);
- return dataParser;
- }
-
private void configurePartitionConstraint() throws Exception {
AlgebricksAbsolutePartitionConstraint absPartitionConstraint;
List<String> locations = new ArrayList<String>();
@@ -147,9 +138,10 @@
break;
}
}
- if(!couldConfigureLocationConstraints){
+ if (!couldConfigureLocationConstraints) {
if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.log(Level.INFO, "No local node controller found to process split : " + inputSplit + " will use count constraint!");
+ LOGGER.log(Level.INFO, "No local node controller found to process split : " + inputSplit
+ + " will use count constraint!");
}
break;
}
@@ -166,24 +158,6 @@
}
}
- private ITupleParserFactory createTupleParserFactory(ARecordType recType) {
- if (isDelimited) {
- int n = recType.getFieldTypes().length;
- IValueParserFactory[] fieldParserFactories = new IValueParserFactory[n];
- for (int i = 0; i < n; i++) {
- ATypeTag tag = recType.getFieldTypes()[i].getTypeTag();
- IValueParserFactory vpf = typeToValueParserFactMap.get(tag);
- if (vpf == null) {
- throw new NotImplementedException("No value parser factory for delimited fields of type " + tag);
- }
- fieldParserFactories[i] = vpf;
- }
- return new NtDelimitedDataTupleParserFactory(recType, fieldParserFactories, delimiter);
- } else {
- return new AdmSchemafullRecordParserFactory(recType);
- }
- }
-
private JobConf configureJobConf() throws Exception {
conf = new JobConf();
conf.set("fs.default.name", configuration.get(KEY_HDFS_URL));
@@ -247,7 +221,7 @@
}
@Override
- public IDataParser getDataParser(int partition) throws Exception {
+ public void start(int partition, IFrameWriter writer) throws Exception {
Path path = new Path(inputSplits[partition].toString());
FileSystem fs = FileSystem.get(conf);
InputStream inputStream;
@@ -267,15 +241,11 @@
}
}
- IDataParser dataParser = createDataParser();
- if (dataParser instanceof IDataStreamParser) {
- ((IDataStreamParser) dataParser).setInputStream(inputStream);
- } else {
- throw new IllegalArgumentException(" parser not compatible");
- }
- dataParser.configure(configuration);
- dataParser.initialize((ARecordType) atype, ctx);
- return dataParser;
+ Class tupleParserFactoryClass = Class.forName(parserFactory);
+ Constructor ctor = tupleParserFactoryClass.getConstructor(ARecordType.class);
+ ITupleParserFactory parserFactory = (ITupleParserFactory) ctor.newInstance(atype);
+ ITupleParser parser = parserFactory.createTupleParser(ctx);
+ parser.parse(inputStream, writer);
}
}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HiveAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HiveAdapter.java
index 44dab4c..4b03970 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HiveAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HiveAdapter.java
@@ -16,79 +16,73 @@
import java.util.Map;
-import edu.uci.ics.asterix.external.data.adapter.api.IDatasourceReadAdapter;
-import edu.uci.ics.asterix.external.data.parser.IDataParser;
import edu.uci.ics.asterix.om.types.IAType;
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-public class HiveAdapter extends AbstractDatasourceAdapter implements
- IDatasourceReadAdapter {
+public class HiveAdapter extends AbstractDatasourceAdapter {
- public static final String HIVE_DATABASE = "database";
- public static final String HIVE_TABLE = "table";
- public static final String HIVE_HOME = "hive-home";
- public static final String HIVE_METASTORE_URI = "metastore-uri";
- public static final String HIVE_WAREHOUSE_DIR = "warehouse-dir";
- public static final String HIVE_METASTORE_RAWSTORE_IMPL = "rawstore-impl";
+ public static final String HIVE_DATABASE = "database";
+ public static final String HIVE_TABLE = "table";
+ public static final String HIVE_HOME = "hive-home";
+ public static final String HIVE_METASTORE_URI = "metastore-uri";
+ public static final String HIVE_WAREHOUSE_DIR = "warehouse-dir";
+ public static final String HIVE_METASTORE_RAWSTORE_IMPL = "rawstore-impl";
- private HDFSAdapter hdfsAdapter;
+ private HDFSAdapter hdfsAdapter;
- @Override
- public AdapterType getAdapterType() {
- return AdapterType.READ;
- }
-
- @Override
- public AdapterDataFlowType getAdapterDataFlowType() {
- return AdapterDataFlowType.PULL;
- }
-
- @Override
- public void configure(Map<String, String> arguments, IAType atype)
- throws Exception {
- configuration = arguments;
- this.atype = atype;
- configureHadoopAdapter();
- }
-
- private void configureHadoopAdapter() throws Exception {
- String database = configuration.get(HIVE_DATABASE);
- String tablePath = null;
- if (database == null) {
- tablePath = configuration.get(HIVE_WAREHOUSE_DIR) + "/"
- + configuration.get(HIVE_TABLE);
- } else {
- tablePath = configuration.get(HIVE_WAREHOUSE_DIR) + "/" + tablePath
- + ".db" + "/" + configuration.get(HIVE_TABLE);
- }
- configuration.put(HDFSAdapter.KEY_HDFS_PATH, tablePath);
- if (!configuration.get(KEY_FORMAT).equals(FORMAT_DELIMITED_TEXT)) {
- throw new IllegalArgumentException("format"
- + configuration.get(KEY_FORMAT) + " is not supported");
- }
-
- if (!(configuration.get(HDFSAdapter.KEY_INPUT_FORMAT).equals(
- HDFSAdapter.INPUT_FORMAT_TEXT) || configuration.get(
- HDFSAdapter.KEY_INPUT_FORMAT).equals(
- HDFSAdapter.INPUT_FORMAT_SEQUENCE))) {
- throw new IllegalArgumentException("file input format"
- + configuration.get(HDFSAdapter.KEY_INPUT_FORMAT)
- + " is not supported");
- }
-
- hdfsAdapter = new HDFSAdapter();
- hdfsAdapter.configure(configuration, atype);
- }
-
- @Override
- public void initialize(IHyracksTaskContext ctx) throws Exception {
- hdfsAdapter.initialize(ctx);
- }
+ public HiveAdapter(IAType atype) {
+ this.hdfsAdapter = new HDFSAdapter(atype);
+ }
@Override
- public IDataParser getDataParser(int partition) throws Exception {
- return hdfsAdapter.getDataParser(partition);
+ public AdapterType getAdapterType() {
+ return AdapterType.READ;
+ }
+
+ @Override
+ public AdapterDataFlowType getAdapterDataFlowType() {
+ return AdapterDataFlowType.PULL;
+ }
+
+ @Override
+ public void configure(Map<String, String> arguments) throws Exception {
+ configuration = arguments;
+ configureHadoopAdapter();
+ }
+
+ private void configureHadoopAdapter() throws Exception {
+ String database = configuration.get(HIVE_DATABASE);
+ String tablePath = null;
+ if (database == null) {
+ tablePath = configuration.get(HIVE_WAREHOUSE_DIR) + "/" + configuration.get(HIVE_TABLE);
+ } else {
+ tablePath = configuration.get(HIVE_WAREHOUSE_DIR) + "/" + tablePath + ".db" + "/"
+ + configuration.get(HIVE_TABLE);
+ }
+ configuration.put(HDFSAdapter.KEY_HDFS_PATH, tablePath);
+ if (!configuration.get(KEY_FORMAT).equals(FORMAT_DELIMITED_TEXT)) {
+ throw new IllegalArgumentException("format" + configuration.get(KEY_FORMAT) + " is not supported");
+ }
+
+ if (!(configuration.get(HDFSAdapter.KEY_INPUT_FORMAT).equals(HDFSAdapter.INPUT_FORMAT_TEXT) || configuration
+ .get(HDFSAdapter.KEY_INPUT_FORMAT).equals(HDFSAdapter.INPUT_FORMAT_SEQUENCE))) {
+ throw new IllegalArgumentException("file input format" + configuration.get(HDFSAdapter.KEY_INPUT_FORMAT)
+ + " is not supported");
+ }
+
+ hdfsAdapter = new HDFSAdapter(atype);
+ hdfsAdapter.configure(configuration);
+ }
+
+ @Override
+ public void initialize(IHyracksTaskContext ctx) throws Exception {
+ hdfsAdapter.initialize(ctx);
+ }
+
+ @Override
+ public void start(int partition, IFrameWriter writer) throws Exception {
+ hdfsAdapter.start(partition, writer);
}
}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/adapter/api/IDatasourceAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/IDatasourceAdapter.java
similarity index 95%
rename from asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/adapter/api/IDatasourceAdapter.java
rename to asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/IDatasourceAdapter.java
index f51e6b8..4e67f53 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/adapter/api/IDatasourceAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/IDatasourceAdapter.java
@@ -12,13 +12,14 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package edu.uci.ics.asterix.external.data.adapter.api;
+package edu.uci.ics.asterix.external.dataset.adapter;
import java.io.Serializable;
import java.util.Map;
import edu.uci.ics.asterix.om.types.IAType;
import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
/**
@@ -143,7 +144,7 @@
* providing all arguments as a set of (key,value) pairs. These
* arguments are put into the metadata.
*/
- public void configure(Map<String, String> arguments, IAType atype)
+ public void configure(Map<String, String> arguments)
throws Exception;
/**
@@ -162,6 +163,12 @@
*/
public AlgebricksPartitionConstraint getPartitionConstraint();
+
+ /**
+ * Returns the output ASTERIX type returned by the adapter.
+ */
+ public IAType getAdapterOutputType();
+
/**
* Allows the adapter to establish connection with the external data source
* expressing intent for data and providing any configuration parameters
@@ -175,4 +182,7 @@
* @throws Exception
*/
public void initialize(IHyracksTaskContext ctx) throws Exception;
+
+
+ public void start(int partition, IFrameWriter writer) throws Exception;
}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/NCFileSystemAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/NCFileSystemAdapter.java
index efa0b70..afc71b6 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/NCFileSystemAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/NCFileSystemAdapter.java
@@ -18,53 +18,39 @@
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.InputStream;
+import java.lang.reflect.Constructor;
import java.util.Map;
-import edu.uci.ics.asterix.external.data.adapter.api.IDatasourceReadAdapter;
-import edu.uci.ics.asterix.external.data.parser.IDataParser;
-import edu.uci.ics.asterix.external.data.parser.IDataStreamParser;
import edu.uci.ics.asterix.om.types.ARecordType;
import edu.uci.ics.asterix.om.types.IAType;
import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.io.FileReference;
import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
+import edu.uci.ics.hyracks.dataflow.std.file.ITupleParser;
+import edu.uci.ics.hyracks.dataflow.std.file.ITupleParserFactory;
-public class NCFileSystemAdapter extends AbstractDatasourceAdapter implements IDatasourceReadAdapter {
+public class NCFileSystemAdapter extends AbstractDatasourceAdapter {
private static final long serialVersionUID = -4154256369973615710L;
private FileSplit[] fileSplits;
- private String parserClass;
+ private String parserFactory;
- public class Constants {
- public static final String KEY_SPLITS = "path";
- public static final String KEY_FORMAT = "format";
- public static final String KEY_PARSER = "parser";
- public static final String FORMAT_DELIMITED_TEXT = "delimited-text";
- public static final String FORMAT_ADM = "adm";
+ public static final String KEY_SPLITS = "path";
+
+ public NCFileSystemAdapter(IAType atype) {
+ this.atype = atype;
}
@Override
- public void configure(Map<String, String> arguments, IAType atype) throws Exception {
+ public void configure(Map<String, String> arguments) throws Exception {
this.configuration = arguments;
- String[] splits = arguments.get(Constants.KEY_SPLITS).split(",");
+ String[] splits = arguments.get(KEY_SPLITS).split(",");
configureFileSplits(splits);
configurePartitionConstraint();
configureFormat();
- if (atype == null) {
- configureInputType();
- } else {
- setInputAType(atype);
- }
- }
-
- public IAType getAType() {
- return atype;
- }
-
- public void setInputAType(IAType atype) {
- this.atype = atype;
}
@Override
@@ -82,28 +68,6 @@
return AdapterType.READ;
}
- @Override
- public IDataParser getDataParser(int partition) throws Exception {
- FileSplit split = fileSplits[partition];
- File inputFile = split.getLocalFile().getFile();
- InputStream in;
- try {
- in = new FileInputStream(inputFile);
- } catch (FileNotFoundException e) {
- throw new HyracksDataException(e);
- }
-
- IDataParser dataParser = (IDataParser) Class.forName(parserClass).newInstance();
- if (dataParser instanceof IDataStreamParser) {
- ((IDataStreamParser) dataParser).setInputStream(in);
- } else {
- throw new IllegalArgumentException(" parser not compatible");
- }
- dataParser.configure(configuration);
- dataParser.initialize((ARecordType) atype, ctx);
- return dataParser;
- }
-
private void configureFileSplits(String[] splits) {
if (fileSplits == null) {
fileSplits = new FileSplit[splits.length];
@@ -120,12 +84,12 @@
}
protected void configureFormat() throws Exception {
- parserClass = configuration.get(Constants.KEY_PARSER);
- if (parserClass == null) {
- if (Constants.FORMAT_DELIMITED_TEXT.equalsIgnoreCase(configuration.get(KEY_FORMAT))) {
- parserClass = formatToParserMap.get(FORMAT_DELIMITED_TEXT);
- } else if (Constants.FORMAT_ADM.equalsIgnoreCase(configuration.get(Constants.KEY_FORMAT))) {
- parserClass = formatToParserMap.get(Constants.FORMAT_ADM);
+ parserFactory = configuration.get(KEY_PARSER_FACTORY);
+ if (parserFactory == null) {
+ if (FORMAT_DELIMITED_TEXT.equalsIgnoreCase(configuration.get(KEY_FORMAT))) {
+ parserFactory = formatToParserFactoryMap.get(FORMAT_DELIMITED_TEXT);
+ } else if (FORMAT_ADM.equalsIgnoreCase(configuration.get(KEY_FORMAT))) {
+ parserFactory = formatToParserFactoryMap.get(FORMAT_ADM);
} else {
throw new IllegalArgumentException(" format " + configuration.get(KEY_FORMAT) + " not supported");
}
@@ -133,10 +97,6 @@
}
- private void configureInputType() {
- throw new UnsupportedOperationException(" Cannot resolve input type, operation not supported");
- }
-
private void configurePartitionConstraint() {
String[] locs = new String[fileSplits.length];
for (int i = 0; i < fileSplits.length; i++) {
@@ -145,4 +105,22 @@
partitionConstraint = new AlgebricksAbsolutePartitionConstraint(locs);
}
+ @Override
+ public void start(int partition, IFrameWriter writer) throws Exception {
+ FileSplit split = fileSplits[partition];
+ File inputFile = split.getLocalFile().getFile();
+ InputStream in;
+ try {
+ in = new FileInputStream(inputFile);
+ } catch (FileNotFoundException e) {
+ throw new HyracksDataException(e);
+ }
+
+ Class tupleParserFactoryClass = Class.forName(parserFactory);
+ Constructor ctor = tupleParserFactoryClass.getConstructor(ARecordType.class);
+ ITupleParserFactory parserFactory = (ITupleParserFactory) ctor.newInstance(atype);
+ ITupleParser parser = parserFactory.createTupleParser(ctx);
+ parser.parse(in, writer);
+ }
+
}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedAdapter.java
new file mode 100644
index 0000000..83bbb6a
--- /dev/null
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedAdapter.java
@@ -0,0 +1,68 @@
+package edu.uci.ics.asterix.external.dataset.adapter;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+
+import edu.uci.ics.asterix.feed.intake.IPullBasedFeedClient;
+import edu.uci.ics.asterix.om.types.IAType;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+
+public abstract class PullBasedAdapter extends AbstractDatasourceAdapter implements IDatasourceAdapter {
+
+ protected ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(1);
+ protected IPullBasedFeedClient pullBasedFeedClient;
+ private FrameTupleAppender appender;
+ private ByteBuffer frame;
+
+ public abstract IPullBasedFeedClient getFeedClient(int partition) throws Exception;
+
+ @Override
+ public abstract AdapterDataFlowType getAdapterDataFlowType();
+
+ @Override
+ public abstract AdapterType getAdapterType();
+
+ @Override
+ public abstract void configure(Map<String, String> arguments) throws Exception;
+
+ @Override
+ public abstract IAType getAdapterOutputType();
+
+ @Override
+ public abstract void initialize(IHyracksTaskContext ctx) throws Exception;
+
+ @Override
+ public void start(int partition, IFrameWriter writer) throws Exception {
+ appender = new FrameTupleAppender(ctx.getFrameSize());
+ frame = ctx.allocateFrame();
+ appender.reset(frame, true);
+
+ boolean newData = false;
+ pullBasedFeedClient = getFeedClient(partition);
+ while (true) {
+ tupleBuilder.reset();
+ newData = pullBasedFeedClient.nextTuple(tupleBuilder.getDataOutput()); //nextTuple is a blocking call.
+ if (newData) {
+ tupleBuilder.addFieldEndOffset();
+ appendTupleToFrame(writer);
+ }
+ }
+ }
+
+ private void appendTupleToFrame(IFrameWriter writer) throws HyracksDataException {
+ if (!appender.append(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(), 0, tupleBuilder.getSize())) {
+ FrameUtils.flushFrame(frame, writer);
+ appender.reset(frame, true);
+ if (!appender.append(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(), 0,
+ tupleBuilder.getSize())) {
+ throw new IllegalStateException();
+ }
+ }
+ }
+
+}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedTwitterAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedTwitterAdapter.java
index e95dbd8..351502f 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedTwitterAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedTwitterAdapter.java
@@ -14,50 +14,52 @@
*/
package edu.uci.ics.asterix.external.dataset.adapter;
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
-import twitter4j.Query;
-import twitter4j.QueryResult;
-import twitter4j.Tweet;
-import twitter4j.Twitter;
-import twitter4j.TwitterFactory;
-import edu.uci.ics.asterix.external.data.adapter.api.IDatasourceReadAdapter;
-import edu.uci.ics.asterix.external.data.parser.AbstractStreamDataParser;
-import edu.uci.ics.asterix.external.data.parser.IDataParser;
-import edu.uci.ics.asterix.external.data.parser.IDataStreamParser;
-import edu.uci.ics.asterix.external.data.parser.IManagedDataParser;
-import edu.uci.ics.asterix.external.data.parser.ManagedDelimitedDataStreamParser;
-import edu.uci.ics.asterix.feed.intake.IFeedClient;
+import edu.uci.ics.asterix.feed.intake.IPullBasedFeedClient;
+import edu.uci.ics.asterix.feed.intake.TwitterFeedClient;
import edu.uci.ics.asterix.feed.managed.adapter.IManagedFeedAdapter;
import edu.uci.ics.asterix.feed.managed.adapter.IMutableFeedAdapter;
-import edu.uci.ics.asterix.om.types.ARecordType;
import edu.uci.ics.asterix.om.types.IAType;
import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksCountPartitionConstraint;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-public class PullBasedTwitterAdapter extends AbstractDatasourceAdapter implements IDatasourceReadAdapter,
- IManagedFeedAdapter, IMutableFeedAdapter {
+public class PullBasedTwitterAdapter extends PullBasedAdapter implements IManagedFeedAdapter, IMutableFeedAdapter {
- private IDataStreamParser parser;
- private int parallelism = 1;
+ private int interval = 10;
private boolean stopRequested = false;
private boolean alterRequested = false;
private Map<String, String> alteredParams = new HashMap<String, String>();
+ private TwitterFeedClient tweetClient;
+ // private FrameTupleAppender appender;
+ // private ByteBuffer frame;
+ //
public static final String QUERY = "query";
public static final String INTERVAL = "interval";
@Override
- public void configure(Map<String, String> arguments, IAType atype) throws Exception {
+ public IPullBasedFeedClient getFeedClient(int partition) {
+ return tweetClient;
+ }
+
+ @Override
+ public void configure(Map<String, String> arguments) throws Exception {
configuration = arguments;
- this.atype = atype;
partitionConstraint = new AlgebricksCountPartitionConstraint(1);
+ interval = Integer.parseInt(arguments.get(INTERVAL));
+ }
+
+ @Override
+ public void initialize(IHyracksTaskContext ctx) throws Exception {
+ this.ctx = ctx;
+ tweetClient = new TwitterFeedClient(ctx, this);
+ }
+
+ @Override
+ public IAType getAdapterOutputType() {
+ return tweetClient.getRecordType();
}
@Override
@@ -71,44 +73,18 @@
}
@Override
- public void initialize(IHyracksTaskContext ctx) throws Exception {
- this.ctx = ctx;
- }
-
- @Override
- public void beforeSuspend() throws Exception {
+ public void suspend() throws Exception {
// TODO Auto-generated method stub
}
@Override
- public void beforeResume() throws Exception {
+ public void resume() throws Exception {
// TODO Auto-generated method stub
}
@Override
- public void beforeStop() throws Exception {
- // TODO Auto-generated method stub
-
- }
-
- @Override
- public IDataParser getDataParser(int partition) throws Exception {
- if (parser == null) {
- parser = new ManagedDelimitedDataStreamParser();
- ((IManagedDataParser) parser).setAdapter(this);
- configuration.put(AbstractStreamDataParser.KEY_DELIMITER, "|");
- parser.configure(configuration);
- parser.initialize((ARecordType) atype, ctx);
- TweetClient tweetClient = new TweetClient(ctx.getJobletContext().getApplicationContext().getNodeId(), this);
- TweetStream tweetStream = new TweetStream(tweetClient, ctx);
- parser.setInputStream(tweetStream);
- }
- return parser;
- }
-
- @Override
public void stop() throws Exception {
stopRequested = true;
}
@@ -135,132 +111,5 @@
alteredParams = null;
alterRequested = false;
}
-}
-
-class TweetStream extends InputStream {
-
- private ByteBuffer buffer;
- private int capacity;
- private TweetClient tweetClient;
- private List<String> tweets = new ArrayList<String>();
-
- public TweetStream(TweetClient tweetClient, IHyracksTaskContext ctx) throws Exception {
- capacity = ctx.getFrameSize();
- buffer = ByteBuffer.allocate(capacity);
- this.tweetClient = tweetClient;
- initialize();
- }
-
- private void initialize() throws Exception {
- boolean hasMore = tweetClient.next(tweets);
- if (!hasMore) {
- buffer.limit(0);
- } else {
- buffer.position(0);
- buffer.limit(capacity);
- for (String tweet : tweets) {
- buffer.put(tweet.getBytes());
- buffer.put("\n".getBytes());
- }
- buffer.flip();
- }
- }
-
- @Override
- public int read() throws IOException {
- if (!buffer.hasRemaining()) {
-
- boolean hasMore = tweetClient.next(tweets);
- if (!hasMore) {
- return -1;
- }
- buffer.position(0);
- buffer.limit(capacity);
- for (String tweet : tweets) {
- buffer.put(tweet.getBytes());
- buffer.put("\n".getBytes());
- }
- buffer.flip();
- return buffer.get();
- } else {
- return buffer.get();
- }
-
- }
-}
-
-class TweetClient implements IFeedClient {
-
- private String query;
- private int timeInterval = 5;
- private Character delimiter = '|';
- private long id = 0;
- private String id_prefix;
-
- private final PullBasedTwitterAdapter adapter;
-
- public TweetClient(String id_prefix, PullBasedTwitterAdapter adapter) {
- this.id_prefix = id_prefix;
- this.adapter = adapter;
- initialize(adapter.getConfiguration());
- }
-
- private void initialize(Map<String, String> params) {
- this.query = params.get(PullBasedTwitterAdapter.QUERY);
- if (params.get(PullBasedTwitterAdapter.INTERVAL) != null) {
- this.timeInterval = Integer.parseInt(params.get(PullBasedTwitterAdapter.INTERVAL));
- }
- }
-
- @Override
- public boolean next(List<String> tweets) {
- try {
- if (adapter.isStopRequested()) {
- return false;
- }
- if (adapter.isAlterRequested()) {
- initialize(((PullBasedTwitterAdapter) adapter).getAlteredParams());
- adapter.postAlteration();
- }
- Thread.currentThread().sleep(1000 * timeInterval);
- tweets.clear();
- Twitter twitter = new TwitterFactory().getInstance();
- QueryResult result = twitter.search(new Query(query));
- List<Tweet> sourceTweets = result.getTweets();
- for (Tweet tweet : sourceTweets) {
- String tweetContent = formFeedTuple(tweet);
- tweets.add(tweetContent);
- System.out.println(tweetContent);
- }
-
- } catch (Exception e) {
- e.printStackTrace();
- return false;
- }
- return true;
- }
-
- public String formFeedTuple(Object tweetObject) {
- Tweet tweet = (Tweet) tweetObject;
- StringBuilder builder = new StringBuilder();
- builder.append(id_prefix + ":" + id);
- builder.append(delimiter);
- builder.append(tweet.getFromUserId());
- builder.append(delimiter);
- builder.append("Orange County");
- builder.append(delimiter);
- builder.append(escapeChars(tweet));
- builder.append(delimiter);
- builder.append(tweet.getCreatedAt().toString());
- id++;
- return new String(builder);
- }
-
- private String escapeChars(Tweet tweet) {
- if (tweet.getText().contains("\n")) {
- return tweet.getText().replace("\n", " ");
- }
- return tweet.getText();
- }
}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/RSSFeedAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/RSSFeedAdapter.java
index 7eaf7cd..3dfd3c6 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/RSSFeedAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/RSSFeedAdapter.java
@@ -19,29 +19,24 @@
import java.util.List;
import java.util.Map;
-import edu.uci.ics.asterix.external.data.adapter.api.IDatasourceAdapter;
-import edu.uci.ics.asterix.external.data.parser.IDataParser;
-import edu.uci.ics.asterix.external.data.parser.IDataStreamParser;
-import edu.uci.ics.asterix.external.data.parser.IManagedDataParser;
-import edu.uci.ics.asterix.external.data.parser.ManagedDelimitedDataStreamParser;
-import edu.uci.ics.asterix.feed.intake.FeedStream;
-import edu.uci.ics.asterix.feed.intake.IFeedClient;
+import edu.uci.ics.asterix.feed.intake.IPullBasedFeedClient;
import edu.uci.ics.asterix.feed.intake.RSSFeedClient;
import edu.uci.ics.asterix.feed.managed.adapter.IManagedFeedAdapter;
import edu.uci.ics.asterix.feed.managed.adapter.IMutableFeedAdapter;
-import edu.uci.ics.asterix.om.types.ARecordType;
import edu.uci.ics.asterix.om.types.IAType;
import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksCountPartitionConstraint;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-public class RSSFeedAdapter extends AbstractDatasourceAdapter implements IDatasourceAdapter, IManagedFeedAdapter,
- IMutableFeedAdapter {
+public class RSSFeedAdapter extends PullBasedAdapter implements IManagedFeedAdapter, IMutableFeedAdapter {
private List<String> feedURLs = new ArrayList<String>();
private boolean isStopRequested = false;
private boolean isAlterRequested = false;
private Map<String, String> alteredParams = new HashMap<String, String>();
private String id_prefix = "";
+ private int interval = 10;
+
+ private IPullBasedFeedClient rssFeedClient;
public static final String KEY_RSS_URL = "url";
public static final String KEY_INTERVAL = "interval";
@@ -55,18 +50,6 @@
}
@Override
- public IDataParser getDataParser(int partition) throws Exception {
- IDataParser dataParser = new ManagedDelimitedDataStreamParser();
- ((IManagedDataParser) dataParser).setAdapter(this);
- dataParser.configure(configuration);
- dataParser.initialize((ARecordType) atype, ctx);
- IFeedClient feedClient = new RSSFeedClient(this, feedURLs.get(partition), id_prefix);
- FeedStream feedStream = new FeedStream(feedClient, ctx);
- ((IDataStreamParser) dataParser).setInputStream(feedStream);
- return dataParser;
- }
-
- @Override
public void alter(Map<String, String> properties) throws Exception {
isAlterRequested = true;
this.alteredParams = properties;
@@ -79,19 +62,13 @@
}
@Override
- public void beforeSuspend() throws Exception {
+ public void suspend() throws Exception {
// TODO Auto-generated method stub
}
@Override
- public void beforeResume() throws Exception {
- // TODO Auto-generated method stub
-
- }
-
- @Override
- public void beforeStop() throws Exception {
+ public void resume() throws Exception {
// TODO Auto-generated method stub
}
@@ -112,16 +89,14 @@
}
@Override
- public void configure(Map<String, String> arguments, IAType atype) throws Exception {
+ public void configure(Map<String, String> arguments) throws Exception {
configuration = arguments;
- this.atype = atype;
String rssURLProperty = configuration.get(KEY_RSS_URL);
if (rssURLProperty == null) {
throw new IllegalArgumentException("no rss url provided");
}
initializeFeedURLs(rssURLProperty);
configurePartitionConstraints();
-
}
private void initializeFeedURLs(String rssURLProperty) {
@@ -157,4 +132,18 @@
return alteredParams;
}
+ @Override
+ public IPullBasedFeedClient getFeedClient(int partition) throws Exception {
+ if (rssFeedClient == null) {
+ rssFeedClient = new RSSFeedClient(this, feedURLs.get(partition), id_prefix);
+ }
+ return rssFeedClient;
+ }
+
+ @Override
+ public IAType getAdapterOutputType() {
+
+ return null;
+ }
+
}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/feed/intake/AbstractFeedStream.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/feed/intake/AbstractFeedStream.java
deleted file mode 100644
index 3d3856d..0000000
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/feed/intake/AbstractFeedStream.java
+++ /dev/null
@@ -1,62 +0,0 @@
-package edu.uci.ics.asterix.feed.intake;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.ByteBuffer;
-import java.util.List;
-
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-
-public abstract class AbstractFeedStream extends InputStream {
-
- private ByteBuffer buffer;
- private int capacity;
- private IFeedClient feedClient;
- private List<String> feedObjects;
-
- public AbstractFeedStream(IFeedClient feedClient, IHyracksTaskContext ctx)
- throws Exception {
- capacity = ctx.getFrameSize();
- buffer = ByteBuffer.allocate(capacity);
- this.feedClient = feedClient;
- initialize();
- }
-
- @Override
- public int read() throws IOException {
- if (!buffer.hasRemaining()) {
-
- boolean hasMore = feedClient.next(feedObjects);
- if (!hasMore) {
- return -1;
- }
- buffer.position(0);
- buffer.limit(capacity);
- for (String feed : feedObjects) {
- buffer.put(feed.getBytes());
- buffer.put("\n".getBytes());
- }
- buffer.flip();
- return buffer.get();
- } else {
- return buffer.get();
- }
-
- }
-
- private void initialize() throws Exception {
- boolean hasMore = feedClient.next(feedObjects);
- if (!hasMore) {
- buffer.limit(0);
- } else {
- buffer.position(0);
- buffer.limit(capacity);
- for (String feed : feedObjects) {
- buffer.put(feed.getBytes());
- buffer.put("\n".getBytes());
- }
- buffer.flip();
- }
- }
-
-}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/feed/intake/FeedStream.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/feed/intake/FeedStream.java
deleted file mode 100644
index 9d75182..0000000
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/feed/intake/FeedStream.java
+++ /dev/null
@@ -1,13 +0,0 @@
-package edu.uci.ics.asterix.feed.intake;
-
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-
-
-public class FeedStream extends AbstractFeedStream {
-
- public FeedStream(IFeedClient feedClient, IHyracksTaskContext ctx)
- throws Exception {
- super(feedClient, ctx);
- }
-
-}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/feed/intake/IFeedClient.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/feed/intake/IFeedClient.java
index b67561d..f0e34c3 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/feed/intake/IFeedClient.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/feed/intake/IFeedClient.java
@@ -1,8 +1,5 @@
package edu.uci.ics.asterix.feed.intake;
-import java.util.List;
-
public interface IFeedClient {
- public boolean next(List<String> list);
}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/feed/intake/IPullBasedFeedClient.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/feed/intake/IPullBasedFeedClient.java
new file mode 100644
index 0000000..36550e5
--- /dev/null
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/feed/intake/IPullBasedFeedClient.java
@@ -0,0 +1,16 @@
+package edu.uci.ics.asterix.feed.intake;
+
+import java.io.DataOutput;
+
+import edu.uci.ics.asterix.common.exceptions.AsterixException;
+
+public interface IPullBasedFeedClient {
+
+ public enum status {
+ MORE_DATA,
+ END_OF_DATA
+ }
+
+ public boolean nextTuple(DataOutput dataOutput) throws AsterixException;
+
+}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/feed/intake/PullBasedFeedClient.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/feed/intake/PullBasedFeedClient.java
new file mode 100644
index 0000000..b93d4cd
--- /dev/null
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/feed/intake/PullBasedFeedClient.java
@@ -0,0 +1,53 @@
+package edu.uci.ics.asterix.feed.intake;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+import edu.uci.ics.asterix.common.exceptions.AsterixException;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ARecordSerializerDeserializer;
+import edu.uci.ics.asterix.om.base.AMutableRecord;
+import edu.uci.ics.asterix.om.types.ARecordType;
+import edu.uci.ics.asterix.om.types.ATypeTag;
+import edu.uci.ics.asterix.om.types.IAType;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+public abstract class PullBasedFeedClient implements IPullBasedFeedClient {
+
+ protected ARecordType recordType;
+ protected ARecordSerializerDeserializer recordSerDe;
+ protected AMutableRecord mutableRecord;
+
+ public abstract boolean setNextRecord() throws Exception;
+
+ @Override
+ public boolean nextTuple(DataOutput dataOutput) throws AsterixException {
+ try {
+ boolean newData = setNextRecord();
+ if (newData) {
+ IAType t = mutableRecord.getType();
+ ATypeTag tag = t.getTypeTag();
+ try {
+ dataOutput.writeByte(tag.serialize());
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ }
+ recordSerDe.serialize(mutableRecord, dataOutput);
+ return true;
+ }
+ return false;
+ } catch (Exception e) {
+ throw new AsterixException(e);
+ }
+
+ }
+
+ public void displayFeedRecord() {
+ StringBuilder builder = new StringBuilder();
+ int numFields = recordType.getFieldNames().length;
+ for (int i = 0; i < numFields; i++) {
+ builder.append(mutableRecord.getValueByPos(i).toString());
+ builder.append("|");
+ }
+ }
+
+}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/feed/intake/RSSFeedClient.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/feed/intake/RSSFeedClient.java
index 2e77499..3eaf3c0 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/feed/intake/RSSFeedClient.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/feed/intake/RSSFeedClient.java
@@ -1,9 +1,10 @@
package edu.uci.ics.asterix.feed.intake;
+import java.net.MalformedURLException;
import java.net.URL;
-import java.util.Iterator;
+import java.util.LinkedList;
import java.util.List;
-import java.util.Map;
+import java.util.Queue;
import com.sun.syndication.feed.synd.SyndEntryImpl;
import com.sun.syndication.feed.synd.SyndFeed;
@@ -15,144 +16,131 @@
import com.sun.syndication.fetcher.impl.HttpURLFeedFetcher;
import edu.uci.ics.asterix.external.dataset.adapter.RSSFeedAdapter;
+import edu.uci.ics.asterix.om.base.AMutableRecord;
+import edu.uci.ics.asterix.om.base.AMutableString;
+import edu.uci.ics.asterix.om.base.IAObject;
+import edu.uci.ics.asterix.om.types.ARecordType;
+import edu.uci.ics.asterix.om.types.BuiltinType;
+import edu.uci.ics.asterix.om.types.IAType;
@SuppressWarnings("rawtypes")
-public class RSSFeedClient implements IFeedClient {
+public class RSSFeedClient extends PullBasedFeedClient {
- private final String feedURL;
- private int timeInterval = 1;
- private Character delimiter = '|';
- private long id = 0;
- private String id_prefix;
- private boolean feedModified = false;
- private RSSFeedAdapter adapter;
+ private final String feedURL;
+ private long id = 0;
+ private String id_prefix;
+ private boolean feedModified = false;
- public boolean isFeedModified() {
- return feedModified;
- }
+ private String[] fieldNames = { "id", "title", "description", "link" };
+ private IAType[] fieldTypes = { BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING };
+ private Queue<SyndEntryImpl> rssFeedBuffer = new LinkedList<SyndEntryImpl>();
- public void setFeedModified(boolean feedModified) {
- this.feedModified = feedModified;
- }
+ IAObject[] mutableFields;
- public RSSFeedClient(RSSFeedAdapter adapter, String feedURL,
- String id_prefix) {
- this.adapter = adapter;
- this.feedURL = feedURL;
- this.id_prefix = id_prefix;
- }
+ private final FeedFetcherCache feedInfoCache;
+ private final FeedFetcher fetcher;
+ private final FetcherEventListenerImpl listener;
+ private final URL feedUrl;
+ String[] tupleFieldValues;
- private void initialize(Map<String, String> params) {
- if (params.get(adapter.KEY_INTERVAL) != null) {
- this.timeInterval = Integer.parseInt(params
- .get(adapter.KEY_INTERVAL));
- }
- }
+ public boolean isFeedModified() {
+ return feedModified;
+ }
- @Override
- public boolean next(List<String> feeds) {
- try {
- if (adapter.isStopRequested()) {
- return false;
- }
- if (adapter.isAlterRequested()) {
- initialize(adapter.getAlteredParams());
- adapter.postAlteration();
- }
- Thread.sleep(timeInterval * 1000);
- feeds.clear();
- try {
- getFeed(feeds);
- } catch (Exception te) {
- te.printStackTrace();
- System.out.println("Failed to get feed: " + te.getMessage());
- }
- } catch (Exception e) {
- e.printStackTrace();
- }
- return true;
- }
+ public void setFeedModified(boolean feedModified) {
+ this.feedModified = feedModified;
+ }
- public String formFeedTuple(Object entry) {
- StringBuilder builder = new StringBuilder();
- builder.append(id_prefix + ":" + id);
- builder.append(delimiter);
- builder.append(((SyndEntryImpl) entry).getTitle());
- builder.append(delimiter);
- builder.append(((SyndEntryImpl) entry).getLink());
- id++;
- return new String(builder);
- }
+ public RSSFeedClient(RSSFeedAdapter adapter, String feedURL, String id_prefix) throws MalformedURLException {
+ this.feedURL = feedURL;
+ this.id_prefix = id_prefix;
+ feedUrl = new URL(feedURL);
+ feedInfoCache = HashMapFeedInfoCache.getInstance();
+ fetcher = new HttpURLFeedFetcher(feedInfoCache);
+ listener = new FetcherEventListenerImpl(this);
+ fetcher.addFetcherEventListener(listener);
- private void getFeed(List<String> feeds) {
- try {
- URL feedUrl = new URL(feedURL);
- FeedFetcherCache feedInfoCache = HashMapFeedInfoCache.getInstance();
- FeedFetcher fetcher = new HttpURLFeedFetcher(feedInfoCache);
+ mutableFields = new IAObject[] { new AMutableString(null), new AMutableString(null), new AMutableString(null),
+ new AMutableString(null) };
+ recordType = new ARecordType("FeedRecordType", fieldNames, fieldTypes, false);
+ mutableRecord = new AMutableRecord(recordType, mutableFields);
+ tupleFieldValues = new String[recordType.getFieldNames().length];
+ }
- FetcherEventListenerImpl listener = new FetcherEventListenerImpl(
- this);
- fetcher.addFetcherEventListener(listener);
- System.err.println("Retrieving feed " + feedUrl);
- // Retrieve the feed.
- // We will get a Feed Polled Event and then a
- // Feed Retrieved event (assuming the feed is valid)
- SyndFeed feed = fetcher.retrieveFeed(feedUrl);
- if (feedModified) {
- System.err.println(feedUrl + " retrieved");
- System.err.println(feedUrl + " has a title: " + feed.getTitle()
- + " and contains " + feed.getEntries().size()
- + " entries.");
+ @Override
+ public boolean setNextRecord() throws Exception {
+ SyndEntryImpl feedEntry = getNextRSSFeed();
+ if (feedEntry == null) {
+ return false;
+ }
+ tupleFieldValues[0] = id_prefix + ":" + id;
+ tupleFieldValues[1] = feedEntry.getTitle();
+ tupleFieldValues[2] = feedEntry.getDescription().getValue();
+ tupleFieldValues[3] = feedEntry.getLink();
+ int numFields = recordType.getFieldNames().length;
+ for (int i = 0; i < numFields; i++) {
+ ((AMutableString) mutableFields[i]).setValue(tupleFieldValues[i]);
+ mutableRecord.setValueAtPos(i, mutableFields[i]);
+ }
+ id++;
+ return true;
+ }
- List fetchedFeeds = feed.getEntries();
- Iterator feedIterator = fetchedFeeds.iterator();
- while (feedIterator.hasNext()) {
- SyndEntryImpl feedEntry = (SyndEntryImpl) feedIterator
- .next();
- String feedContent = formFeedTuple(feedEntry);
- feeds.add(escapeChars(feedContent));
- System.out.println(feedContent);
- }
- }
- } catch (Exception ex) {
- System.out.println("ERROR: " + ex.getMessage());
- ex.printStackTrace();
- }
- }
+ private SyndEntryImpl getNextRSSFeed() throws Exception {
+ if (rssFeedBuffer.isEmpty()) {
+ fetchFeed();
+ }
+ if (rssFeedBuffer.isEmpty()) {
+ return null;
+ } else {
+ return rssFeedBuffer.remove();
+ }
+ }
- private String escapeChars(String content) {
- if (content.contains("\n")) {
- return content.replace("\n", " ");
- }
- return content;
- }
+ private void fetchFeed() {
+ try {
+ System.err.println("Retrieving feed " + feedURL);
+ // Retrieve the feed.
+ // We will get a Feed Polled Event and then a
+ // Feed Retrieved event (assuming the feed is valid)
+ SyndFeed feed = fetcher.retrieveFeed(feedUrl);
+ if (feedModified) {
+ System.err.println(feedUrl + " retrieved");
+ System.err.println(feedUrl + " has a title: " + feed.getTitle() + " and contains "
+ + feed.getEntries().size() + " entries.");
+
+ List fetchedFeeds = feed.getEntries();
+ rssFeedBuffer.addAll(fetchedFeeds);
+ }
+ } catch (Exception ex) {
+ System.out.println("ERROR: " + ex.getMessage());
+ ex.printStackTrace();
+ }
+ }
}
class FetcherEventListenerImpl implements FetcherListener {
- private final IFeedClient feedClient;
+ private final IPullBasedFeedClient feedClient;
- public FetcherEventListenerImpl(IFeedClient feedClient) {
- this.feedClient = feedClient;
- }
+ public FetcherEventListenerImpl(IPullBasedFeedClient feedClient) {
+ this.feedClient = feedClient;
+ }
- /**
- * @see com.sun.syndication.fetcher.FetcherListener#fetcherEvent(com.sun.syndication.fetcher.FetcherEvent)
- */
- public void fetcherEvent(FetcherEvent event) {
- String eventType = event.getEventType();
- if (FetcherEvent.EVENT_TYPE_FEED_POLLED.equals(eventType)) {
- System.err.println("\tEVENT: Feed Polled. URL = "
- + event.getUrlString());
- } else if (FetcherEvent.EVENT_TYPE_FEED_RETRIEVED.equals(eventType)) {
- System.err.println("\tEVENT: Feed Retrieved. URL = "
- + event.getUrlString());
- ((RSSFeedClient) feedClient).setFeedModified(true);
- } else if (FetcherEvent.EVENT_TYPE_FEED_UNCHANGED.equals(eventType)) {
- System.err.println("\tEVENT: Feed Unchanged. URL = "
- + event.getUrlString());
- ((RSSFeedClient) feedClient).setFeedModified(true);
- }
- }
+ /**
+ * @see com.sun.syndication.fetcher.FetcherListener#fetcherEvent(com.sun.syndication.fetcher.FetcherEvent)
+ */
+ public void fetcherEvent(FetcherEvent event) {
+ String eventType = event.getEventType();
+ if (FetcherEvent.EVENT_TYPE_FEED_POLLED.equals(eventType)) {
+ System.err.println("\tEVENT: Feed Polled. URL = " + event.getUrlString());
+ } else if (FetcherEvent.EVENT_TYPE_FEED_RETRIEVED.equals(eventType)) {
+ System.err.println("\tEVENT: Feed Retrieved. URL = " + event.getUrlString());
+ ((RSSFeedClient) feedClient).setFeedModified(true);
+ } else if (FetcherEvent.EVENT_TYPE_FEED_UNCHANGED.equals(eventType)) {
+ System.err.println("\tEVENT: Feed Unchanged. URL = " + event.getUrlString());
+ ((RSSFeedClient) feedClient).setFeedModified(true);
+ }
+ }
}
\ No newline at end of file
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/feed/intake/TwitterFeedClient.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/feed/intake/TwitterFeedClient.java
new file mode 100644
index 0000000..7bc1d80
--- /dev/null
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/feed/intake/TwitterFeedClient.java
@@ -0,0 +1,95 @@
+package edu.uci.ics.asterix.feed.intake;
+
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Queue;
+
+import twitter4j.Query;
+import twitter4j.QueryResult;
+import twitter4j.Tweet;
+import twitter4j.Twitter;
+import twitter4j.TwitterException;
+import twitter4j.TwitterFactory;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ARecordSerializerDeserializer;
+import edu.uci.ics.asterix.external.dataset.adapter.PullBasedTwitterAdapter;
+import edu.uci.ics.asterix.om.base.AMutableRecord;
+import edu.uci.ics.asterix.om.base.AMutableString;
+import edu.uci.ics.asterix.om.base.IAObject;
+import edu.uci.ics.asterix.om.types.ARecordType;
+import edu.uci.ics.asterix.om.types.BuiltinType;
+import edu.uci.ics.asterix.om.types.IAType;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+
+public class TwitterFeedClient extends PullBasedFeedClient {
+
+ private String keywords;
+ private Query query;
+ private long id = 0;
+ private String id_prefix;
+ private Twitter twitter;
+ private Queue<Tweet> tweetBuffer = new LinkedList<Tweet>();
+
+ private String[] fieldNames = { "id", "username", "location", "text", "timestamp" };
+ private IAType[] fieldTypes = { BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING,
+ BuiltinType.ASTRING };
+ IAObject[] mutableFields;
+ String[] tupleFieldValues;
+
+ public TwitterFeedClient(IHyracksTaskContext ctx, PullBasedTwitterAdapter adapter) {
+ this.id_prefix = ctx.getJobletContext().getApplicationContext().getNodeId();
+ twitter = new TwitterFactory().getInstance();
+ mutableFields = new IAObject[] { new AMutableString(null), new AMutableString(null), new AMutableString(null),
+ new AMutableString(null), new AMutableString(null) };
+ recordType = new ARecordType("FeedRecordType", fieldNames, fieldTypes, false);
+ recordSerDe = new ARecordSerializerDeserializer(recordType);
+ mutableRecord = new AMutableRecord(recordType, mutableFields);
+ initialize(adapter.getConfiguration());
+ tupleFieldValues = new String[recordType.getFieldNames().length];
+ }
+
+ public void initialize(Map<String, String> params) {
+ this.keywords = params.get(PullBasedTwitterAdapter.QUERY);
+ this.query = new Query(keywords);
+ query.setRpp(100);
+ }
+
+ private Tweet getNextTweet() throws TwitterException, InterruptedException {
+ if (tweetBuffer.isEmpty()) {
+ QueryResult result;
+ Thread.currentThread().sleep(1000 * 10);
+ result = twitter.search(query);
+ tweetBuffer.addAll(result.getTweets());
+ }
+ return tweetBuffer.remove();
+ }
+
+ public ARecordType getRecordType() {
+ return recordType;
+ }
+
+ public AMutableRecord getMutableRecord() {
+ return mutableRecord;
+ }
+
+ @Override
+ public boolean setNextRecord() throws Exception {
+ Tweet tweet;
+ tweet = getNextTweet();
+ if (tweet == null) {
+ return false;
+ }
+ int numFields = recordType.getFieldNames().length;
+
+ tupleFieldValues[0] = id_prefix + ":" + id;
+ tupleFieldValues[1] = tweet.getFromUser();
+ tupleFieldValues[2] = tweet.getLocation() == null ? "" : tweet.getLocation();
+ tupleFieldValues[3] = tweet.getText();
+ tupleFieldValues[4] = tweet.getCreatedAt().toString();
+ for (int i = 0; i < numFields; i++) {
+ ((AMutableString) mutableFields[i]).setValue(tupleFieldValues[i]);
+ mutableRecord.setValueAtPos(i, mutableFields[i]);
+ }
+ id++;
+ return true;
+ }
+}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/feed/managed/adapter/IManagedFeedAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/feed/managed/adapter/IManagedFeedAdapter.java
index 1eb1079..7dd9ea4 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/feed/managed/adapter/IManagedFeedAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/feed/managed/adapter/IManagedFeedAdapter.java
@@ -14,9 +14,7 @@
*/
package edu.uci.ics.asterix.feed.managed.adapter;
-import edu.uci.ics.asterix.external.data.adapter.api.IDatasourceReadAdapter;
-
-public interface IManagedFeedAdapter extends IDatasourceReadAdapter {
+public interface IManagedFeedAdapter {
public enum OperationState {
SUSPENDED,
@@ -29,11 +27,9 @@
STOPPED, INACTIVE
}
- public void beforeSuspend() throws Exception;
+ public void suspend() throws Exception;
- public void beforeResume() throws Exception;
-
- public void beforeStop() throws Exception;
+ public void resume() throws Exception;
public void stop() throws Exception;
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/feed/operator/FeedIntakeOperatorDescriptor.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/feed/operator/FeedIntakeOperatorDescriptor.java
index 93b8fc5..8a42903 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/feed/operator/FeedIntakeOperatorDescriptor.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/feed/operator/FeedIntakeOperatorDescriptor.java
@@ -16,7 +16,11 @@
import java.util.Map;
-import edu.uci.ics.asterix.external.data.adapter.api.IDatasourceReadAdapter;
+import edu.uci.ics.asterix.external.adapter.factory.IDatasourceAdapterFactory;
+import edu.uci.ics.asterix.external.adapter.factory.IDatasourceAdapterFactory.AdapterType;
+import edu.uci.ics.asterix.external.adapter.factory.IGenericDatasourceAdapterFactory;
+import edu.uci.ics.asterix.external.adapter.factory.ITypedDatasourceAdapterFactory;
+import edu.uci.ics.asterix.external.dataset.adapter.IDatasourceAdapter;
import edu.uci.ics.asterix.feed.mgmt.FeedId;
import edu.uci.ics.asterix.om.types.ARecordType;
import edu.uci.ics.asterix.om.types.IAType;
@@ -31,18 +35,18 @@
public class FeedIntakeOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
private static final long serialVersionUID = 1L;
- private final String adapter;
+ private final String adapterFactoryClassName;
private final Map<String, String> adapterConfiguration;
private final IAType atype;
private final FeedId feedId;
- private transient IDatasourceReadAdapter datasourceReadAdapter;
+ private transient IDatasourceAdapterFactory datasourceAdapterFactory;
public FeedIntakeOperatorDescriptor(JobSpecification spec, FeedId feedId, String adapter,
Map<String, String> arguments, ARecordType atype, RecordDescriptor rDesc) {
super(spec, 1, 1);
recordDescriptors[0] = rDesc;
- this.adapter = adapter;
+ this.adapterFactoryClassName = adapter;
this.adapterConfiguration = arguments;
this.atype = atype;
this.feedId = feedId;
@@ -51,15 +55,20 @@
public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions)
throws HyracksDataException {
-
+ IDatasourceAdapter adapter;
try {
- datasourceReadAdapter = (IDatasourceReadAdapter) Class.forName(adapter).newInstance();
- datasourceReadAdapter.configure(adapterConfiguration, atype);
- datasourceReadAdapter.initialize(ctx);
-
+ datasourceAdapterFactory = (IDatasourceAdapterFactory) Class.forName(adapterFactoryClassName).newInstance();
+ if (datasourceAdapterFactory.getAdapterType().equals(AdapterType.GENERIC)) {
+ adapter = ((IGenericDatasourceAdapterFactory) datasourceAdapterFactory).createAdapter(
+ adapterConfiguration, atype);
+ } else {
+ adapter = ((ITypedDatasourceAdapterFactory) datasourceAdapterFactory)
+ .createAdapter(adapterConfiguration);
+ }
+ adapter.initialize(ctx);
} catch (Exception e) {
throw new HyracksDataException("initialization of adapter failed", e);
}
- return new FeedIntakeOperatorNodePushable(feedId, datasourceReadAdapter, partition);
+ return new FeedIntakeOperatorNodePushable(feedId, adapter, partition);
}
}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/feed/operator/FeedIntakeOperatorNodePushable.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/feed/operator/FeedIntakeOperatorNodePushable.java
index 4603208..dbac804 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/feed/operator/FeedIntakeOperatorNodePushable.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/feed/operator/FeedIntakeOperatorNodePushable.java
@@ -3,29 +3,27 @@
import java.nio.ByteBuffer;
import java.util.concurrent.LinkedBlockingQueue;
-import edu.uci.ics.asterix.external.data.adapter.api.IDatasourceReadAdapter;
-import edu.uci.ics.asterix.external.data.parser.IManagedDataParser;
+import edu.uci.ics.asterix.external.dataset.adapter.IDatasourceAdapter;
+import edu.uci.ics.asterix.feed.comm.AlterFeedMessage;
+import edu.uci.ics.asterix.feed.comm.IFeedMessage;
import edu.uci.ics.asterix.feed.managed.adapter.IManagedFeedAdapter;
import edu.uci.ics.asterix.feed.managed.adapter.IMutableFeedAdapter;
import edu.uci.ics.asterix.feed.mgmt.FeedId;
import edu.uci.ics.asterix.feed.mgmt.FeedSystemProvider;
import edu.uci.ics.asterix.feed.mgmt.IFeedManager;
-import edu.uci.ics.asterix.feed.comm.AlterFeedMessage;
-import edu.uci.ics.asterix.feed.comm.IFeedMessage;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
-import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
public class FeedIntakeOperatorNodePushable extends AbstractUnaryInputUnaryOutputOperatorNodePushable {
- private final IDatasourceReadAdapter adapter;
+ private final IDatasourceAdapter adapter;
private final int partition;
private final IFeedManager feedManager;
private final FeedId feedId;
private final LinkedBlockingQueue<IFeedMessage> inbox;
private FeedInboxMonitor feedInboxMonitor;
- public FeedIntakeOperatorNodePushable(FeedId feedId, IDatasourceReadAdapter adapter, int partition) {
+ public FeedIntakeOperatorNodePushable(FeedId feedId, IDatasourceAdapter adapter, int partition) {
this.adapter = adapter;
this.partition = partition;
this.feedManager = (IFeedManager) FeedSystemProvider.getFeedManager();
@@ -40,7 +38,7 @@
feedManager.registerFeedOperatorMsgQueue(feedId, inbox);
writer.open();
try {
- adapter.getDataParser(partition).parse(writer);
+ adapter.start(partition, writer);
} catch (Exception e) {
throw new HyracksDataException("exception during reading from external data source", e);
} finally {
@@ -61,7 +59,6 @@
@Override
public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
// TODO Auto-generated method stub
-
}
}
@@ -69,12 +66,10 @@
private LinkedBlockingQueue<IFeedMessage> inbox;
private final IManagedFeedAdapter adapter;
- private final int partition;
public FeedInboxMonitor(IManagedFeedAdapter adapter, LinkedBlockingQueue<IFeedMessage> inbox, int partition) {
this.inbox = inbox;
this.adapter = adapter;
- this.partition = partition;
}
@Override
@@ -84,13 +79,13 @@
IFeedMessage feedMessage = inbox.take();
switch (feedMessage.getMessageType()) {
case SUSPEND:
- ((IManagedDataParser) adapter.getDataParser(partition)).getManagedTupleParser().suspend();
+ adapter.suspend();
break;
case RESUME:
- ((IManagedDataParser) adapter.getDataParser(partition)).getManagedTupleParser().resume();
+ adapter.resume();
break;
case STOP:
- ((IManagedDataParser) adapter.getDataParser(partition)).getManagedTupleParser().stop();
+ adapter.stop();
break;
case ALTER:
((IMutableFeedAdapter) adapter).alter(((AlterFeedMessage) feedMessage).getAlteredConfParams());
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
index 26521fc..91b5685 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
@@ -23,9 +23,12 @@
import edu.uci.ics.asterix.common.dataflow.IAsterixApplicationContextInfo;
import edu.uci.ics.asterix.common.parse.IParseFileSplitsDecl;
import edu.uci.ics.asterix.dataflow.data.nontagged.valueproviders.AqlPrimitiveValueProviderFactory;
-import edu.uci.ics.asterix.external.data.adapter.api.IDatasourceAdapter;
-import edu.uci.ics.asterix.external.data.adapter.api.IDatasourceReadAdapter;
+import edu.uci.ics.asterix.external.adapter.factory.IDatasourceAdapterFactory;
+import edu.uci.ics.asterix.external.adapter.factory.IDatasourceAdapterFactory.AdapterType;
+import edu.uci.ics.asterix.external.adapter.factory.IGenericDatasourceAdapterFactory;
+import edu.uci.ics.asterix.external.adapter.factory.ITypedDatasourceAdapterFactory;
import edu.uci.ics.asterix.external.data.operator.ExternalDataScanOperatorDescriptor;
+import edu.uci.ics.asterix.external.dataset.adapter.IDatasourceAdapter;
import edu.uci.ics.asterix.feed.comm.IFeedMessage;
import edu.uci.ics.asterix.feed.mgmt.FeedId;
import edu.uci.ics.asterix.feed.operator.FeedIntakeOperatorDescriptor;
@@ -190,12 +193,20 @@
throw new AlgebricksException("Can only scan datasets of records.");
}
- IDatasourceReadAdapter adapter;
+ IDatasourceAdapterFactory adapterFactory;
+ IDatasourceAdapter adapter;
try {
- adapter = (IDatasourceReadAdapter) Class.forName(decl.getAdapter()).newInstance();
+ adapterFactory = (IDatasourceAdapterFactory) Class.forName(decl.getAdapter()).newInstance();
+ if (adapterFactory.getAdapterType().equals(AdapterType.GENERIC)) {
+ adapter = ((IGenericDatasourceAdapterFactory) adapterFactory).createAdapter(decl.getProperties(),
+ itemType);
+ } else {
+ adapter = ((ITypedDatasourceAdapterFactory) adapterFactory).createAdapter(decl.getProperties());
+ }
+
} catch (Exception e) {
e.printStackTrace();
- throw new AlgebricksException("unable to load the adapter class " + e);
+ throw new AlgebricksException("unable to load the adapter factory class " + e);
}
if (!(adapter.getAdapterType().equals(IDatasourceAdapter.AdapterType.READ) || adapter.getAdapterType().equals(
@@ -204,19 +215,11 @@
}
ARecordType rt = (ARecordType) itemType;
- try {
- adapter.configure(decl.getProperties(), itemType);
- } catch (Exception e) {
- e.printStackTrace();
- throw new AlgebricksException("unable to configure the datasource adapter " + e);
- }
-
ISerializerDeserializer payloadSerde = format.getSerdeProvider().getSerializerDeserializer(itemType);
RecordDescriptor scannerDesc = new RecordDescriptor(new ISerializerDeserializer[] { payloadSerde });
ExternalDataScanOperatorDescriptor dataScanner = new ExternalDataScanOperatorDescriptor(jobSpec,
decl.getAdapter(), decl.getProperties(), rt, scannerDesc);
- dataScanner.setDatasourceAdapter(adapter);
AlgebricksPartitionConstraint constraint = adapter.getPartitionConstraint();
return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(dataScanner, constraint);
}
@@ -251,27 +254,26 @@
if (itemType.getTypeTag() != ATypeTag.RECORD) {
throw new AlgebricksException("Can only consume records.");
}
+ IDatasourceAdapterFactory adapterFactory;
IDatasourceAdapter adapter;
try {
- adapter = (IDatasourceAdapter) Class.forName(decl.getAdapter()).newInstance();
- } catch (Exception e) {
- e.printStackTrace();
- throw new AlgebricksException("unable to load the adapter class " + e);
- }
+ adapterFactory = (IDatasourceAdapterFactory) Class.forName(decl.getAdapter()).newInstance();
- ARecordType rt = (ARecordType) itemType;
- try {
- adapter.configure(decl.getProperties(), itemType);
+ if (adapterFactory.getAdapterType().equals(AdapterType.GENERIC)) {
+ adapter = ((IGenericDatasourceAdapterFactory) adapterFactory).createAdapter(decl.getProperties(),
+ itemType);
+ } else {
+ adapter = ((ITypedDatasourceAdapterFactory) adapterFactory).createAdapter(decl.getProperties());
+ }
} catch (Exception e) {
- e.printStackTrace();
- throw new AlgebricksException("unable to configure the datasource adapter " + e);
+ throw new AlgebricksException("unable to load the adapter factory class " + e);
}
ISerializerDeserializer payloadSerde = format.getSerdeProvider().getSerializerDeserializer(itemType);
RecordDescriptor feedDesc = new RecordDescriptor(new ISerializerDeserializer[] { payloadSerde });
FeedIntakeOperatorDescriptor feedIngestor = new FeedIntakeOperatorDescriptor(jobSpec, new FeedId(dataverse,
- dataset), decl.getAdapter(), decl.getProperties(), rt, feedDesc);
+ dataset), decl.getAdapter(), decl.getProperties(), (ARecordType) itemType, feedDesc);
AlgebricksPartitionConstraint constraint = adapter.getPartitionConstraint();
return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(feedIngestor, constraint);