Added asterix project
git-svn-id: https://asterixdb.googlecode.com/svn/trunk/asterix@12 eaa15691-b419-025a-1212-ee371bd00084
diff --git a/asterix-external-data/pom.xml b/asterix-external-data/pom.xml
new file mode 100644
index 0000000..67e89af
--- /dev/null
+++ b/asterix-external-data/pom.xml
@@ -0,0 +1,150 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <artifactId>asterix</artifactId>
+ <groupId>edu.uci.ics.asterix</groupId>
+ <version>0.0.4-SNAPSHOT</version>
+ </parent>
+ <groupId>edu.uci.ics.asterix</groupId>
+ <artifactId>asterix-external-data</artifactId>
+ <version>0.0.4-SNAPSHOT</version>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>2.0.2</version>
+ <configuration>
+ <source>1.6</source>
+ <target>1.6</target>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>appassembler-maven-plugin</artifactId>
+ <version>1.0</version>
+ <executions>
+ <execution>
+ <configuration>
+ <programs>
+ <program>
+ <mainClass>edu.uci.ics.asterix.drivers.AsterixWebServer</mainClass>
+ <name>asterix-web</name>
+ </program>
+ <program>
+ <mainClass>edu.uci.ics.asterix.drivers.AsterixClientDriver</mainClass>
+ <name>asterix-cmd</name>
+ </program>
+ </programs>
+ <repositoryLayout>flat</repositoryLayout>
+ <repositoryName>lib</repositoryName>
+ </configuration>
+ <phase>package</phase>
+ <goals>
+ <goal>assemble</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <version>2.7.2</version>
+ <configuration>
+ <!--
+ doesn't work from m2eclipse, currently
+ <additionalClasspathElements>
+ <additionalClasspathElement>${basedir}/src/main/resources</additionalClasspathElement>
+ </additionalClasspathElements>
+ -->
+ <forkMode>pertest</forkMode>
+ <argLine>-enableassertions -Xmx${test.heap.size}m -Dfile.encoding=UTF-8
+ -Djava.util.logging.config.file=src/test/resources/logging.properties</argLine>
+ <includes>
+ <include>**/*TestSuite.java</include>
+ <include>**/*Test.java</include>
+ </includes>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+
+ <dependencies>
+ <dependency>
+ <groupId>javax.servlet</groupId>
+ <artifactId>servlet-api</artifactId>
+ <version>2.5</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>4.8.1</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-control-cc</artifactId>
+ <version>0.2.0-SNAPSHOT</version>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-control-nc</artifactId>
+ <version>0.2.0-SNAPSHOT</version>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>edu.uci.ics.asterix</groupId>
+ <artifactId>asterix-om</artifactId>
+ <version>0.0.4-SNAPSHOT</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>edu.uci.ics.asterix</groupId>
+ <artifactId>asterix-runtime</artifactId>
+ <version>0.0.4-SNAPSHOT</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.kenai.nbpwr</groupId>
+ <artifactId>org-apache-commons-io</artifactId>
+ <version>1.3.1-201002241208</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.twitter4j</groupId>
+ <artifactId>twitter4j-core</artifactId>
+ <version>2.2.3</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-core</artifactId>
+ <version>0.20.2</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>net.java.dev.rome</groupId>
+ <artifactId>rome-fetcher</artifactId>
+ <version>1.0.0</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>rome</groupId>
+ <artifactId>rome</artifactId>
+ <version>1.0.1-modified-01</version>
+ </dependency>
+<dependency>
+ <groupId>jdom</groupId>
+ <artifactId>jdom</artifactId>
+ <version>1.0</version>
+ </dependency>
+ </dependencies>
+
+</project>
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/datasource/data/listener/AbstractDataListeningProperty.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/datasource/data/listener/AbstractDataListeningProperty.java
new file mode 100644
index 0000000..88ce33c
--- /dev/null
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/datasource/data/listener/AbstractDataListeningProperty.java
@@ -0,0 +1,36 @@
+/*
+ * Copyright 2009-2011 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.datasource.data.listener;
+
+/**
+ * A push-based datasource adapter allows registering an IDataListener instance.
+ * Data listening property defines when data is pushed to an IDataListener.
+ */
+
+public abstract class AbstractDataListeningProperty {
+
+ /**
+ * COUNT_BASED: Data is pushed to a data listener only if the count of
+ * records exceeds the configured threshold value. TIME_BASED: Data is
+ * pushed to a data listener in a periodic manner at the end of each time
+ * interval.
+ */
+ public enum listeningPropertyType {
+ COUNT_BASED,
+ TIME_BASED
+ }
+
+ public abstract listeningPropertyType getListeningPropretyType();
+}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/datasource/data/listener/CountBasedDataListeningProperty.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/datasource/data/listener/CountBasedDataListeningProperty.java
new file mode 100644
index 0000000..0eb42dc
--- /dev/null
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/datasource/data/listener/CountBasedDataListeningProperty.java
@@ -0,0 +1,42 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.datasource.data.listener;
+
+/**
+ * A data listening property chosen by a data listener when it wants data to be
+ * pushed when the count of records collected by the adapter exceeds a configured
+ * count value.
+ */
+public class CountBasedDataListeningProperty extends AbstractDataListeningProperty {
+
+ int numTuples;
+
+ public int getNumTuples() {
+ return numTuples;
+ }
+
+ public void setNumTuples(int numTuples) {
+ this.numTuples = numTuples;
+ }
+
+ public CountBasedDataListeningProperty(int numTuples) {
+ this.numTuples = numTuples;
+ }
+
+ @Override
+ public listeningPropertyType getListeningPropretyType() {
+ return listeningPropertyType.COUNT_BASED;
+ }
+}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/datasource/data/listener/IDataListener.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/datasource/data/listener/IDataListener.java
new file mode 100644
index 0000000..269f060
--- /dev/null
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/datasource/data/listener/IDataListener.java
@@ -0,0 +1,36 @@
+/*
+ * Copyright 2009-2011 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.datasource.data.listener;
+
+import java.nio.ByteBuffer;
+
+/**
+ * An interface providing a call back API for a subscriber interested in data
+ * received from an external data source via the datasource adapter.
+ */
+public interface IDataListener {
+
+ /**
+ * This method is a call back API and is invoked by an instance of
+ * IPushBasedDatasourceReadAdapter. The caller passes a frame containing new
+ * data. The protocol as to when the caller shall invoke this method is
+ * decided by the configured @see DataListenerProperty .
+ *
+ * @param aObjects
+ */
+
+ public void dataReceived(ByteBuffer frame);
+
+}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/datasource/data/listener/TimeBasedDataListeningProperty.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/datasource/data/listener/TimeBasedDataListeningProperty.java
new file mode 100644
index 0000000..a11cf94
--- /dev/null
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/datasource/data/listener/TimeBasedDataListeningProperty.java
@@ -0,0 +1,42 @@
+/*
+ * Copyright 2009-2011 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.datasource.data.listener;
+
+/**
+ * A data listening property chosen by a data listener when it needs data to be
+ * pushed in a periodic manner with a configured time-interval.
+ */
+public class TimeBasedDataListeningProperty extends AbstractDataListeningProperty {
+
+ // time interval in secs
+ int interval;
+
+ public int getInteval() {
+ return interval;
+ }
+
+ public void setInterval(int interval) {
+ this.interval = interval;
+ }
+
+ public TimeBasedDataListeningProperty(int interval) {
+ this.interval = interval;
+ }
+
+ @Override
+ public listeningPropertyType getListeningPropretyType() {
+ return listeningPropertyType.TIME_BASED;
+ }
+}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/adapter/api/IDatasourceAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/adapter/api/IDatasourceAdapter.java
new file mode 100644
index 0000000..1baf149
--- /dev/null
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/adapter/api/IDatasourceAdapter.java
@@ -0,0 +1,177 @@
+/*
+ * Copyright 2009-2011 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.external.data.adapter.api;
+
+import java.io.Serializable;
+import java.util.Map;
+
+import edu.uci.ics.asterix.om.types.IAType;
+import edu.uci.ics.hyracks.algebricks.core.api.constraints.AlgebricksPartitionConstraint;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+
+/**
+ * A super interface implemented by a data source adapter. An adapter can be a
+ * pull based or push based. This interface provides all common APIs that need
+ * to be implemented by each adapter irrespective of the kind of
+ * adapter(pull or push).
+ */
+public interface IDatasourceAdapter extends Serializable {
+
+ /**
+ * Represents the kind of data exchange that happens between the adapter and
+ * the external data source. The data exchange can be either pull based or
+ * push based. In the former case (pull), the request for data transfer is
+ * initiated by the adapter. In the latter case (push) the adapter is
+ * required to submit an initial request to convey intent for data.
+ * Subsequently all data transfer requests are initiated by the external
+ * data source.
+ */
+ public enum AdapterDataFlowType {
+ PULL,
+ PUSH
+ }
+
+ /**
+ * An adapter can be used to read from an external data source and may also
+ * allow writing to the external data source. This enum type indicates the
+ * kind of operations supported by the adapter.
+ *
+ * @caller Compiler uses this method to assert the validity of an operation
+ * on an external dataset. The type of adapter associated with an
+ * external dataset determines the set of valid operations allowed
+ * on the dataset.
+ */
+ public enum AdapterType {
+ READ,
+ WRITE,
+ READ_WRITE
+ }
+
+ /**
+ * An adapter can be a pull or a push based adapter. This method returns the
+ * kind of adapter, that is whether it is a pull based adapter or a push
+ * based adapter.
+ *
+ * @caller Compiler or wrapper operator: Compiler uses this API to choose
+ * the right wrapper (push-based) operator that wraps around the
+ * adapter and provides an iterator interface. If we decide to form
+ * a single operator that handles both pull and push based adapter
+ * kinds, then this method will be used by the wrapper operator for
+ * switching between the logic for interacting with a pull based
+ * adapter versus a push based adapter.
+ * @return AdapterDataFlowType
+ */
+ public AdapterDataFlowType getAdapterDataFlowType();
+
+ /**
+ * Returns the type of adapter indicating if the adapter can be used for
+ * reading from an external data source or writing to an external data
+ * source or can be used for both purposes.
+ *
+ * @Caller: Compiler: The compiler uses this API to verify if an operation
+ *         is supported by the adapter. For example, a write query against
+ * an external dataset will not compile successfully if the
+ * external dataset was declared with a read_only adapter.
+ * @see AdapterType
+ * @return
+ */
+ public AdapterType getAdapterType();
+
+ /**
+ * Each adapter instance is configured with a set of parameters that are
+ * key-value pairs. When creating an external or a feed dataset, an adapter
+ * instance is used in conjunction with a set of configuration parameters
+ * for the adapter instance. The configuration parameters are stored
+ * internally with the adapter and can be retrieved using this API.
+ *
+ * @param propertyKey
+ * @return String the value corresponding to the configuration parameter
+ * represented by the key- attributeKey.
+ */
+ public String getAdapterProperty(String propertyKey);
+
+ /**
+ * Allows setting a configuration property of the adapter with a specified
+ * value.
+ *
+ * @caller Used by the wrapper operator to modify the behavior of the
+ * adapter, if required.
+ * @param property
+ * the property to be set
+ * @param value
+ * the value for the property
+ */
+ public void setAdapterProperty(String property, String value);
+
+ /**
+ * Configures the IDatasourceAdapter instance.
+ *
+ * @caller Scenario 1) Called during compilation of DDL statement that
+ * creates a Feed dataset and associates the adapter with the
+ * dataset. The (key,value) configuration parameters provided as
+ * part of the DDL statement are collected by the compiler and
+ * passed on to this method. The adapter may as part of
+ * configuration connect with the external data source and determine
+ * the IAType associated with data residing with the external
+ * datasource.
+ * Scenario 2) An adapter instance is created by an ASTERIX operator
+ * that wraps around the adapter instance. The operator, as part of
+ * its initialization invokes the configure method. The (key,value)
+ * configuration parameters are passed on to the operator by the
+ * compiler. Subsequent to the invocation, the wrapping operator
+ * obtains the partition constraints (if any). In addition, in the
+ * case of a read adapter, the wrapping operator obtains the output
+ * ASTERIX type associated with the data that will be output from
+ * the adapter.
+ * @param arguments
+ * A map with key-value pairs that contains the configuration
+ * parameters for the adapter. The arguments are obtained from
+ * the metadata. Recall that the DDL to create an external
+ * dataset or a feed dataset requires using an adapter and
+ * providing all arguments as a set of (key,value) pairs. These
+ * arguments are put into the metadata.
+ */
+ public void configure(Map<String, String> arguments, IAType atype) throws Exception;
+
+ /**
+ * Returns a list of partition constraints. A partition constraint can be a
+ * requirement to execute at a particular location or could be cardinality
+ * constraints indicating the number of instances that need to run in
+ * parallel. For example, an IDatasourceAdapter implementation written for data
+ * residing on the local file system of a node cannot run on any other node
+ * and thus has a location partition constraint. The location partition
+ * constraint can be expressed as a node IP address or a node controller id.
+ * In the former case, the IP address is translated to a node controller id
+ * running on the node with the given IP address.
+ *
+ * @Caller The wrapper operator configures its partition constraints from
+ * the constraints obtained from the adapter.
+ */
+ public AlgebricksPartitionConstraint getPartitionConstraint();
+
+ /**
+ * Allows the adapter to establish connection with the external data source
+ * expressing intent for data and providing any configuration parameters
+ * required by the external data source for the transfer of data. This
+ * method does not result in any data transfer, but is a prerequisite for
+ * any subsequent data transfer to happen between the external data source
+ * and the adapter.
+ *
+ * @caller This method is called by the wrapping ASTERIX operator.
+ * @param ctx
+ * @throws Exception
+ */
+ public void initialize(IHyracksTaskContext ctx) throws Exception;
+}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/adapter/api/IDatasourceReadAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/adapter/api/IDatasourceReadAdapter.java
new file mode 100644
index 0000000..737dde0
--- /dev/null
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/adapter/api/IDatasourceReadAdapter.java
@@ -0,0 +1,40 @@
+/*
+ * Copyright 2009-2011 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.external.data.adapter.api;
+
+import edu.uci.ics.asterix.external.data.parser.IDataParser;
+
+public interface IDatasourceReadAdapter extends IDatasourceAdapter {
+
+ /**
+ * Retrieves data from an external datasource, packs it in frames and uses a
+ * frame writer to flush the frames to a recipient operator.
+ *
+ * @param partition
+ * Multiple instances of the adapter can be configured to
+ * retrieve data in parallel. Partition is an integer between 0
+ * to N-1 where N is the number of parallel adapter instances.
+ * The partition value helps configure a particular instance of
+ * the adapter to fetch data.
+ * @param writer
+ * An instance of IFrameWriter that is used to flush frames to
+ * the recipient operator
+ * @throws Exception
+ */
+
+ public IDataParser getDataParser(int partition) throws Exception;
+
+
+}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/adapter/api/IDatasourceWriteAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/adapter/api/IDatasourceWriteAdapter.java
new file mode 100644
index 0000000..3cd1464
--- /dev/null
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/adapter/api/IDatasourceWriteAdapter.java
@@ -0,0 +1,46 @@
+/*
+ * Copyright 2009-2011 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.external.data.adapter.api;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+
+import edu.uci.ics.asterix.om.types.IAType;
+
+public interface IDatasourceWriteAdapter extends IDatasourceAdapter {
+
+ /**
+ * Flushes tuples contained in the frame to the dataset stored in an
+ * external data source. If required, the content of the frame is converted
+ * into an appropriate format as required by the external data source.
+ *
+ * @caller This method is invoked by the wrapping ASTERIX operator when data
+ * needs to be written to the external data source.
+ * @param sourceAType
+ * The type associated with the data that is required to be
+ * written
+ * @param frame
+ * the frame that needs to be flushed
+ * @param datasourceSpecificParams
+ * A map containing other parameters that are specific to the
+ * target data source where data is to be written. For example
+ * when writing to a data source such as HDFS, an optional
+ * parameter is the replication factor.
+ * @throws Exception
+ */
+ public void flush(IAType sourceAType, ByteBuffer frame, Map<String, String> datasourceSpecificParams)
+ throws Exception;
+
+}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/operator/ExternalDataScanOperatorDescriptor.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/operator/ExternalDataScanOperatorDescriptor.java
new file mode 100644
index 0000000..cf0d4f3
--- /dev/null
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/operator/ExternalDataScanOperatorDescriptor.java
@@ -0,0 +1,81 @@
+/*
+ * Copyright 2009-2011 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.external.data.operator;
+
+import java.util.Map;
+
+import edu.uci.ics.asterix.external.data.adapter.api.IDatasourceReadAdapter;
+import edu.uci.ics.asterix.om.types.IAType;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
+
+public class ExternalDataScanOperatorDescriptor extends
+ AbstractSingleActivityOperatorDescriptor {
+ private static final long serialVersionUID = 1L;
+
+ private final String adapter;
+ private final Map<String, String> adapterConfiguration;
+ private final IAType atype;
+
+ private transient IDatasourceReadAdapter datasourceReadAdapter;
+
+ public ExternalDataScanOperatorDescriptor(JobSpecification spec,
+ String adapter, Map<String, String> arguments, IAType atype,
+ RecordDescriptor rDesc) {
+ super(spec, 0, 1);
+ recordDescriptors[0] = rDesc;
+ this.adapter = adapter;
+ this.adapterConfiguration = arguments;
+ this.atype = atype;
+ }
+
+ public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
+ IRecordDescriptorProvider recordDescProvider, final int partition,
+ int nPartitions) throws HyracksDataException {
+
+ try {
+ datasourceReadAdapter = (IDatasourceReadAdapter) Class.forName(
+ adapter).newInstance();
+ datasourceReadAdapter.configure(adapterConfiguration, atype);
+ datasourceReadAdapter.initialize(ctx);
+ } catch (Exception e) {
+ throw new HyracksDataException("initialization of adapter failed",
+ e);
+ }
+ return new AbstractUnaryOutputSourceOperatorNodePushable() {
+ @Override
+ public void initialize() throws HyracksDataException {
+ writer.open();
+ try {
+ datasourceReadAdapter.getDataParser(partition)
+ .parse(writer);
+ } catch (Exception e) {
+ throw new HyracksDataException(
+ "exception during reading from external data source",
+ e);
+ } finally {
+ writer.close();
+ }
+ }
+ };
+ }
+}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/parser/ADMStreamParser.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/parser/ADMStreamParser.java
new file mode 100644
index 0000000..658645c
--- /dev/null
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/parser/ADMStreamParser.java
@@ -0,0 +1,44 @@
+/*
+ * Copyright 2009-2011 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.external.data.parser;
+
+import java.util.Map;
+
+import edu.uci.ics.asterix.om.types.ARecordType;
+import edu.uci.ics.asterix.runtime.operators.file.AdmSchemafullRecordParserFactory;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+public class ADMStreamParser extends AbstractStreamDataParser {
+
+ public ADMStreamParser() {
+ }
+
+ @Override
+ public void initialize(ARecordType atype, IHyracksTaskContext ctx) {
+ tupleParser = new AdmSchemafullRecordParserFactory(atype).createTupleParser(ctx);
+ }
+
+ @Override
+ public void parse(IFrameWriter writer) throws HyracksDataException {
+ tupleParser.parse(inputStream, writer);
+ }
+
+ @Override
+ public void configure(Map<String, String> configuration) {
+ }
+
+}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/parser/AbstractStreamDataParser.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/parser/AbstractStreamDataParser.java
new file mode 100644
index 0000000..403f197
--- /dev/null
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/parser/AbstractStreamDataParser.java
@@ -0,0 +1,70 @@
+/*
+ * Copyright 2009-2011 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.external.data.parser;
+
+import java.io.InputStream;
+import java.util.HashMap;
+
+import edu.uci.ics.asterix.om.types.ARecordType;
+import edu.uci.ics.asterix.om.types.ATypeTag;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.data.parsers.DoubleParserFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.parsers.FloatParserFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.parsers.IValueParserFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.parsers.IntegerParserFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.parsers.LongParserFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory;
+import edu.uci.ics.hyracks.dataflow.std.file.ITupleParser;
+
+public abstract class AbstractStreamDataParser implements IDataStreamParser {
+
+ public static final String KEY_DELIMITER = "delimiter";
+
+ protected static final HashMap<ATypeTag, IValueParserFactory> typeToValueParserFactMap = new HashMap<ATypeTag, IValueParserFactory>();
+
+ static {
+ typeToValueParserFactMap.put(ATypeTag.INT32, IntegerParserFactory.INSTANCE);
+ typeToValueParserFactMap.put(ATypeTag.FLOAT, FloatParserFactory.INSTANCE);
+ typeToValueParserFactMap.put(ATypeTag.DOUBLE, DoubleParserFactory.INSTANCE);
+ typeToValueParserFactMap.put(ATypeTag.INT64, LongParserFactory.INSTANCE);
+ typeToValueParserFactMap.put(ATypeTag.STRING, UTF8StringParserFactory.INSTANCE);
+ }
+
+ protected ITupleParser tupleParser;
+ protected IFrameWriter frameWriter;
+ protected InputStream inputStream;
+
+ @Override
+ public abstract void initialize(ARecordType recordType, IHyracksTaskContext ctx);
+
+ @Override
+ public abstract void parse(IFrameWriter frameWriter) throws HyracksDataException;
+
+ @Override
+ public void setInputStream(InputStream in) {
+ inputStream = in;
+
+ }
+
+ @Override
+ public InputStream getInputStream() {
+ return inputStream;
+ }
+
+
+
+}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/parser/DelimitedDataStreamParser.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/parser/DelimitedDataStreamParser.java
new file mode 100644
index 0000000..11c70ac8
--- /dev/null
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/parser/DelimitedDataStreamParser.java
@@ -0,0 +1,76 @@
+/*
+ * Copyright 2009-2011 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.external.data.parser;
+
+import java.util.Map;
+
+import edu.uci.ics.asterix.om.types.ARecordType;
+import edu.uci.ics.asterix.om.types.ATypeTag;
+import edu.uci.ics.asterix.runtime.operators.file.NtDelimitedDataTupleParserFactory;
+import edu.uci.ics.hyracks.algebricks.core.api.exceptions.NotImplementedException;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.data.parsers.IValueParserFactory;
+
+/**
+ * A stream parser for delimited text data. Each field of the target record
+ * type is parsed with a value parser chosen by its type tag; records are
+ * separated by the configured delimiter character.
+ */
+public class DelimitedDataStreamParser extends AbstractStreamDataParser {
+
+    // Current record delimiter; newline unless set via constructor or configure().
+    protected Character delimiter = defaultDelimiter;
+
+    protected static final Character defaultDelimiter = new Character('\n');
+
+    public Character getDelimiter() {
+        return delimiter;
+    }
+
+    public DelimitedDataStreamParser(Character delimiter) {
+        this.delimiter = delimiter;
+    }
+
+    public DelimitedDataStreamParser() {
+    }
+
+    /**
+     * Builds one value parser per field of the record type and creates the
+     * underlying tuple parser.
+     *
+     * @throws NotImplementedException
+     *             if a field type has no registered value parser factory
+     */
+    @Override
+    public void initialize(ARecordType recordType, IHyracksTaskContext ctx) {
+        int n = recordType.getFieldTypes().length;
+        IValueParserFactory[] fieldParserFactories = new IValueParserFactory[n];
+        for (int i = 0; i < n; i++) {
+            ATypeTag tag = recordType.getFieldTypes()[i].getTypeTag();
+            IValueParserFactory vpf = typeToValueParserFactMap.get(tag);
+            if (vpf == null) {
+                throw new NotImplementedException("No value parser factory for delimited fields of type " + tag);
+            }
+            fieldParserFactories[i] = vpf;
+        }
+        tupleParser = new NtDelimitedDataTupleParserFactory(recordType, fieldParserFactories, delimiter)
+                .createTupleParser(ctx);
+    }
+
+    @Override
+    public void parse(IFrameWriter writer) throws HyracksDataException {
+        tupleParser.parse(inputStream, writer);
+    }
+
+    @Override
+    public void configure(Map<String, String> configuration) {
+        String delimiterArg = configuration.get(KEY_DELIMITER);
+        // Only override the current delimiter when one is actually supplied.
+        // Previously a missing key reset a delimiter passed via the
+        // constructor back to '\n', and an empty value threw
+        // StringIndexOutOfBoundsException from charAt(0).
+        if (delimiterArg != null && !delimiterArg.isEmpty()) {
+            delimiter = delimiterArg.charAt(0);
+        }
+    }
+
+}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/parser/IDataParser.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/parser/IDataParser.java
new file mode 100644
index 0000000..eb7daf7
--- /dev/null
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/parser/IDataParser.java
@@ -0,0 +1,65 @@
+/*
+ * Copyright 2009-2011 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.external.data.parser;
+
+import java.util.Map;
+
+import edu.uci.ics.asterix.om.types.ARecordType;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * Represents an parser that processes an input stream to form records of a
+ * given type. The parser creates frames that are flushed using a frame writer.
+ */
+/**
+ * Represents a parser that processes an input stream to form records of a
+ * given type. The parser creates frames that are flushed using a frame writer.
+ */
+public interface IDataParser {
+
+    /**
+     * Configures the parser before use.
+     *
+     * @param configuration
+     *            Any configuration parameters for the parser
+     */
+    public void configure(Map<String, String> configuration);
+
+    /**
+     * Initializes the instance. An implementation may use the record type and
+     * the task context to set itself up so that it can subsequently parse an
+     * input stream into records of the given type.
+     *
+     * @param recordType
+     *            The record type associated with each record output by the
+     *            parser
+     * @param ctx
+     *            The runtime Hyracks task context.
+     */
+    public void initialize(ARecordType recordType, IHyracksTaskContext ctx);
+
+    /**
+     * Parses the input to produce records of the configured type and uses the
+     * frame writer instance to flush frames containing the produced records.
+     *
+     * @param frameWriter
+     *            A frame writer instance that is used for flushing frames to
+     *            the recipient operator
+     * @throws HyracksDataException
+     *             if parsing or frame flushing fails
+     */
+    public void parse(IFrameWriter frameWriter) throws HyracksDataException;
+
+}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/parser/IDataStreamParser.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/parser/IDataStreamParser.java
new file mode 100644
index 0000000..1425707
--- /dev/null
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/parser/IDataStreamParser.java
@@ -0,0 +1,25 @@
+/*
+ * Copyright 2009-2011 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.external.data.parser;
+
+import java.io.InputStream;
+
+/**
+ * A data parser that consumes its input from a stream supplied by the caller.
+ */
+public interface IDataStreamParser extends IDataParser {
+
+    /** Sets the stream the parser will read raw data from. */
+    public void setInputStream(InputStream in);
+
+    /** Returns the stream previously set via {@link #setInputStream(InputStream)}. */
+    public InputStream getInputStream();
+
+}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/parser/IManagedDataParser.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/parser/IManagedDataParser.java
new file mode 100644
index 0000000..7c9bb7d
--- /dev/null
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/parser/IManagedDataParser.java
@@ -0,0 +1,24 @@
+/*
+ * Copyright 2009-2011 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.external.data.parser;
+
+import edu.uci.ics.asterix.feed.managed.adapter.IManagedFeedAdapter;
+
+/**
+ * A data parser whose parsing can be controlled by a managed feed adapter,
+ * e.g. suspended, resumed or stopped while a feed is running.
+ */
+public interface IManagedDataParser extends IDataParser {
+
+    /**
+     * Returns the underlying tuple parser, which exposes the feed-control
+     * operations (suspend/resume/stop/alter).
+     */
+    public IManagedTupleParser getManagedTupleParser();
+
+    /** Sets the managed feed adapter this parser cooperates with. */
+    public void setAdapter(IManagedFeedAdapter adapter);
+}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/parser/IManagedTupleParser.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/parser/IManagedTupleParser.java
new file mode 100644
index 0000000..14f6372
--- /dev/null
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/parser/IManagedTupleParser.java
@@ -0,0 +1,31 @@
+/*
+ * Copyright 2009-2011 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.external.data.parser;
+
+import java.util.Map;
+
+import edu.uci.ics.hyracks.dataflow.std.file.ITupleParser;
+
+/**
+ * A tuple parser that, in addition to parsing, supports runtime feed-control
+ * operations: it can be suspended, resumed, stopped, and reconfigured.
+ */
+public interface IManagedTupleParser extends ITupleParser {
+
+    /** Requests that parsing pause after the tuple currently being produced. */
+    public void suspend() throws Exception;
+
+    /** Resumes a previously suspended parser. */
+    public void resume() throws Exception;
+
+    /** Requests that parsing terminate after the tuple currently being produced. */
+    public void stop() throws Exception;
+
+    /** Applies runtime parameter changes (e.g. the inter-tuple interval). */
+    public void alter(Map<String,String> alterParams) throws Exception;
+
+}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/parser/ManagedAdmRecordParserFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/parser/ManagedAdmRecordParserFactory.java
new file mode 100644
index 0000000..3d31489
--- /dev/null
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/parser/ManagedAdmRecordParserFactory.java
@@ -0,0 +1,36 @@
+/*
+ * Copyright 2009-2011 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.external.data.parser;
+
+import edu.uci.ics.asterix.feed.managed.adapter.IManagedFeedAdapter;
+import edu.uci.ics.asterix.om.types.ARecordType;
+import edu.uci.ics.asterix.runtime.operators.file.AdmSchemafullRecordParserFactory;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.dataflow.std.file.ITupleParser;
+
+/**
+ * Factory producing {@link ManagedAdmTupleParser} instances: an ADM record
+ * parser factory whose tuple parsers cooperate with a managed feed adapter.
+ */
+public class ManagedAdmRecordParserFactory extends AdmSchemafullRecordParserFactory {
+
+    // Handed to every created tuple parser so feed-control requests can be honored.
+    private final IManagedFeedAdapter adapter;
+
+    public ManagedAdmRecordParserFactory(ARecordType recType, IManagedFeedAdapter adapter) {
+        super(recType);
+        this.adapter = adapter;
+    }
+
+    @Override
+    public ITupleParser createTupleParser(final IHyracksTaskContext ctx) {
+        return new ManagedAdmTupleParser(ctx, recType, adapter);
+    }
+}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/parser/ManagedAdmStreamParser.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/parser/ManagedAdmStreamParser.java
new file mode 100644
index 0000000..bfe9fe0
--- /dev/null
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/parser/ManagedAdmStreamParser.java
@@ -0,0 +1,46 @@
+/*
+ * Copyright 2009-2011 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.external.data.parser;
+
+import java.util.Map;
+
+import edu.uci.ics.asterix.feed.managed.adapter.IManagedFeedAdapter;
+import edu.uci.ics.asterix.om.types.ARecordType;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+
+public class ManagedAdmStreamParser extends ADMStreamParser implements IManagedDataParser{
+
+ private IManagedFeedAdapter adapter;
+
+ @Override
+ public void initialize(ARecordType atype, IHyracksTaskContext ctx) {
+ tupleParser = new ManagedAdmRecordParserFactory(atype, adapter).createTupleParser(ctx);
+ }
+
+ @Override
+ public void configure(Map<String, String> configuration) {
+
+ }
+
+ @Override
+ public IManagedTupleParser getManagedTupleParser() {
+ return (IManagedTupleParser)tupleParser;
+ }
+
+ @Override
+ public void setAdapter(IManagedFeedAdapter adapter) {
+ this.adapter = adapter;
+ }
+}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/parser/ManagedAdmTupleParser.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/parser/ManagedAdmTupleParser.java
new file mode 100644
index 0000000..b7215a3
--- /dev/null
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/parser/ManagedAdmTupleParser.java
@@ -0,0 +1,164 @@
+/*
+ * Copyright 2009-2011 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.external.data.parser;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import edu.uci.ics.asterix.adm.parser.nontagged.AdmLexer;
+import edu.uci.ics.asterix.common.exceptions.AsterixException;
+import edu.uci.ics.asterix.feed.managed.adapter.IManagedFeedAdapter;
+import edu.uci.ics.asterix.feed.managed.adapter.IManagedFeedAdapter.OperationState;
+import edu.uci.ics.asterix.om.types.ARecordType;
+import edu.uci.ics.asterix.runtime.operators.file.AdmTupleParser;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+
+/**
+ * An ADM tuple parser for managed feeds. In addition to parsing ADM records,
+ * it honors feed-control requests (suspend/resume/stop) queued by the feed
+ * manager and can throttle parsing by sleeping between tuples.
+ */
+public class ManagedAdmTupleParser extends AdmTupleParser implements IManagedTupleParser {
+
+    // Pending feed-control transitions, consumed in FIFO order while parsing.
+    private List<OperationState> nextState;
+    // Adapter notified before suspend/resume/stop transitions take effect.
+    private final IManagedFeedAdapter adapter;
+    // Delay in milliseconds inserted after each parsed tuple; 0 disables throttling.
+    private long tupleInterval;
+
+    public static final String TUPLE_INTERVAL_KEY = "tuple-interval";
+
+    public ManagedAdmTupleParser(IHyracksTaskContext ctx, ARecordType recType, IManagedFeedAdapter adapter) {
+        super(ctx, recType);
+        nextState = new ArrayList<OperationState>();
+        this.adapter = adapter;
+        String intervalProperty = adapter.getAdapterProperty(TUPLE_INTERVAL_KEY);
+        this.tupleInterval = intervalProperty == null ? 0 : Long.parseLong(intervalProperty);
+    }
+
+    public ManagedAdmTupleParser(IHyracksTaskContext ctx, ARecordType recType, long tupleInterval,
+            IManagedFeedAdapter adapter) {
+        super(ctx, recType);
+        nextState = new ArrayList<OperationState>();
+        this.adapter = adapter;
+        this.tupleInterval = tupleInterval;
+    }
+
+    @Override
+    public void parse(InputStream in, IFrameWriter writer) throws HyracksDataException {
+        admLexer = new AdmLexer(in);
+        appender.reset(frame, true);
+        try {
+            while (true) {
+                tb.reset();
+                if (!parseAdmInstance(recType, true, dos)) {
+                    break;
+                }
+                tb.addFieldEndOffset();
+                processNextTuple(nextState.isEmpty() ? null : nextState.get(0), writer);
+                if (tupleInterval > 0) {
+                    // Throttle the feed. Thread.sleep is static; call it
+                    // directly instead of through Thread.currentThread().
+                    Thread.sleep(tupleInterval);
+                }
+            }
+            if (appender.getTupleCount() > 0) {
+                FrameUtils.flushFrame(frame, writer);
+            }
+        } catch (AsterixException ae) {
+            throw new HyracksDataException(ae);
+        } catch (IOException ioe) {
+            throw new HyracksDataException(ioe);
+        } catch (InterruptedException ie) {
+            // Preserve the interrupt status for callers further up the stack.
+            Thread.currentThread().interrupt();
+            throw new HyracksDataException(ie);
+        }
+    }
+
+    // Appends the tuple in tb to the current frame, flushing first if the
+    // frame is full, and optionally force-flushing afterwards.
+    private void addTupleToFrame(IFrameWriter writer, boolean forceFlush) throws HyracksDataException {
+        boolean success = appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize());
+        if (!success) {
+            FrameUtils.flushFrame(frame, writer);
+            appender.reset(frame, true);
+            if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
+                // A single tuple larger than a frame cannot be emitted.
+                throw new IllegalStateException();
+            }
+        }
+
+        if (forceFlush) {
+            FrameUtils.flushFrame(frame, writer);
+        }
+
+    }
+
+    // Emits the current tuple, first honoring any pending feed-control request.
+    private void processNextTuple(OperationState feedState, IFrameWriter writer) throws HyracksDataException {
+        try {
+            if (feedState != null) {
+                // BUGFIX: switch on the pending state passed in. The previous
+                // code switched on a field that was never assigned and would
+                // throw a NullPointerException on the first control request.
+                switch (feedState) {
+                    case SUSPENDED:
+                        suspendOperation(writer);
+                        break;
+                    case STOPPED:
+                        stopOperation(writer);
+                        break;
+                }
+            } else {
+                addTupleToFrame(writer, false);
+            }
+        } catch (HyracksDataException hde) {
+            throw hde;
+        } catch (Exception e) {
+            throw new HyracksDataException(e);
+        }
+    }
+
+    // Emits the pending tuple, then blocks until resume() notifies this parser.
+    private void suspendOperation(IFrameWriter writer) throws HyracksDataException, Exception {
+        nextState.remove(0);
+        addTupleToFrame(writer, false);
+        adapter.beforeSuspend();
+        synchronized (this) {
+            // NOTE(review): a bare wait() can return on a spurious wakeup; a
+            // guarded loop around a condition flag would be more robust.
+            this.wait();
+            adapter.beforeResume();
+        }
+    }
+
+    // Emits the pending tuple, flushes, and closes the downstream writer.
+    private void stopOperation(IFrameWriter writer) throws HyracksDataException, Exception {
+        nextState.remove(0);
+        addTupleToFrame(writer, true);
+        adapter.beforeStop();
+        writer.close();
+    }
+
+    @Override
+    public void suspend() throws Exception {
+        nextState.add(OperationState.SUSPENDED);
+    }
+
+    @Override
+    public void resume() throws Exception {
+        synchronized (this) {
+            this.notifyAll();
+        }
+    }
+
+    @Override
+    public void stop() throws Exception {
+        nextState.add(OperationState.STOPPED);
+    }
+
+    @Override
+    public void alter(Map<String, String> alterParams) throws Exception {
+        String intervalArg = alterParams.get(TUPLE_INTERVAL_KEY);
+        if (intervalArg != null) {
+            tupleInterval = Long.parseLong(intervalArg);
+        }
+    }
+}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/parser/ManagedDelimitedDataRecordParserFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/parser/ManagedDelimitedDataRecordParserFactory.java
new file mode 100644
index 0000000..37a7162
--- /dev/null
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/parser/ManagedDelimitedDataRecordParserFactory.java
@@ -0,0 +1,38 @@
+/*
+ * Copyright 2009-2011 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.external.data.parser;
+
+import edu.uci.ics.asterix.feed.managed.adapter.IManagedFeedAdapter;
+import edu.uci.ics.asterix.om.types.ARecordType;
+import edu.uci.ics.asterix.runtime.operators.file.NtDelimitedDataTupleParserFactory;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.dataflow.common.data.parsers.IValueParserFactory;
+import edu.uci.ics.hyracks.dataflow.std.file.ITupleParser;
+
+/**
+ * Factory producing {@link ManagedDelimitedDataTupleParser} instances: a
+ * delimited-data tuple parser factory whose parsers cooperate with a managed
+ * feed adapter.
+ */
+public class ManagedDelimitedDataRecordParserFactory extends NtDelimitedDataTupleParserFactory {
+
+    // Handed to every created tuple parser so feed-control requests can be honored.
+    private final IManagedFeedAdapter adapter;
+
+    // NOTE(review): parameter order intentionally differs from the superclass
+    // constructor, which takes (recType, fieldParserFactories, fieldDelimiter).
+    public ManagedDelimitedDataRecordParserFactory(IValueParserFactory[] fieldParserFactories, char fieldDelimiter,
+        ARecordType recType, IManagedFeedAdapter adapter) {
+        super(recType, fieldParserFactories, fieldDelimiter);
+        this.adapter = adapter;
+    }
+
+    @Override
+    public ITupleParser createTupleParser(final IHyracksTaskContext ctx) {
+        return new ManagedDelimitedDataTupleParser(ctx, recordType, adapter, valueParserFactories, fieldDelimiter);
+    }
+}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/parser/ManagedDelimitedDataStreamParser.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/parser/ManagedDelimitedDataStreamParser.java
new file mode 100644
index 0000000..f38a95b
--- /dev/null
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/parser/ManagedDelimitedDataStreamParser.java
@@ -0,0 +1,53 @@
+/*
+ * Copyright 2009-2011 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.external.data.parser;
+
+import edu.uci.ics.asterix.feed.managed.adapter.IManagedFeedAdapter;
+import edu.uci.ics.asterix.om.types.ARecordType;
+import edu.uci.ics.asterix.om.types.ATypeTag;
+import edu.uci.ics.hyracks.algebricks.core.api.exceptions.NotImplementedException;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.dataflow.common.data.parsers.IValueParserFactory;
+
+/**
+ * Delimited-data stream parser variant for managed feeds: it wires a feed
+ * adapter into the tuple parser so suspend/resume/stop requests can be honored.
+ */
+public class ManagedDelimitedDataStreamParser extends DelimitedDataStreamParser implements IManagedDataParser {
+
+    private IManagedFeedAdapter adapter;
+
+    @Override
+    public void initialize(ARecordType recordType, IHyracksTaskContext ctx) {
+        // Build one value parser per field of the target record type.
+        int fieldCount = recordType.getFieldTypes().length;
+        IValueParserFactory[] valueParserFactories = new IValueParserFactory[fieldCount];
+        for (int i = 0; i < fieldCount; i++) {
+            ATypeTag typeTag = recordType.getFieldTypes()[i].getTypeTag();
+            IValueParserFactory factory = typeToValueParserFactMap.get(typeTag);
+            if (factory == null) {
+                throw new NotImplementedException("No value parser factory for delimited fields of type " + typeTag);
+            }
+            valueParserFactories[i] = factory;
+        }
+        ManagedDelimitedDataRecordParserFactory parserFactory = new ManagedDelimitedDataRecordParserFactory(
+                valueParserFactories, delimiter.charValue(), recordType, adapter);
+        tupleParser = parserFactory.createTupleParser(ctx);
+    }
+
+    @Override
+    public IManagedTupleParser getManagedTupleParser() {
+        // The factory used in initialize() always produces a managed tuple parser.
+        return (IManagedTupleParser) tupleParser;
+    }
+
+    @Override
+    public void setAdapter(IManagedFeedAdapter adapter) {
+        this.adapter = adapter;
+    }
+}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/parser/ManagedDelimitedDataTupleParser.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/parser/ManagedDelimitedDataTupleParser.java
new file mode 100644
index 0000000..325eff4
--- /dev/null
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/parser/ManagedDelimitedDataTupleParser.java
@@ -0,0 +1,212 @@
+/*
+ * Copyright 2009-2011 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.external.data.parser;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import edu.uci.ics.asterix.builders.IARecordBuilder;
+import edu.uci.ics.asterix.builders.RecordBuilder;
+import edu.uci.ics.asterix.feed.managed.adapter.IManagedFeedAdapter;
+import edu.uci.ics.asterix.feed.managed.adapter.IManagedFeedAdapter.OperationState;
+import edu.uci.ics.asterix.om.base.AMutableString;
+import edu.uci.ics.asterix.om.types.ARecordType;
+import edu.uci.ics.asterix.om.types.ATypeTag;
+import edu.uci.ics.asterix.runtime.operators.file.DelimitedDataTupleParser;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ArrayBackedValueStorage;
+import edu.uci.ics.hyracks.dataflow.common.data.parsers.IValueParser;
+import edu.uci.ics.hyracks.dataflow.common.data.parsers.IValueParserFactory;
+
+/**
+ * A delimited-text tuple parser for managed feeds. Each input record is built
+ * into an ADM record of the configured type; between tuples the parser honors
+ * feed-control requests (suspend/resume/stop) and optional throttling.
+ */
+public class ManagedDelimitedDataTupleParser extends DelimitedDataTupleParser implements IManagedTupleParser {
+
+    // Pending feed-control transitions, consumed in FIFO order while parsing.
+    private List<OperationState> nextState;
+    // Adapter notified before suspend/resume/stop transitions take effect.
+    private IManagedFeedAdapter adapter;
+    // Delay in milliseconds inserted after each parsed tuple; 0 disables throttling.
+    private long tupleInterval;
+
+    public static final String TUPLE_INTERVAL_KEY = "tuple-interval";
+
+    public ManagedDelimitedDataTupleParser(IHyracksTaskContext ctx, ARecordType recType, IManagedFeedAdapter adapter,
+            IValueParserFactory[] valueParserFactories, char fieldDelimter) {
+        super(ctx, recType, valueParserFactories, fieldDelimter);
+        this.adapter = adapter;
+        nextState = new ArrayList<OperationState>();
+        String intervalProperty = adapter.getAdapterProperty(TUPLE_INTERVAL_KEY);
+        tupleInterval = intervalProperty == null ? 0 : Long.parseLong(intervalProperty);
+    }
+
+    @Override
+    public void parse(InputStream in, IFrameWriter writer) throws HyracksDataException {
+        try {
+            IValueParser[] valueParsers = new IValueParser[valueParserFactories.length];
+            for (int i = 0; i < valueParserFactories.length; ++i) {
+                valueParsers[i] = valueParserFactories[i].createValueParser();
+            }
+
+            appender.reset(frame, true);
+            tb = new ArrayTupleBuilder(1);
+            recDos = tb.getDataOutput();
+
+            // Scratch buffer reused for each field value (type tag + parsed bytes).
+            ArrayBackedValueStorage fieldValueBuffer = new ArrayBackedValueStorage();
+            DataOutput fieldValueBufferOutput = fieldValueBuffer.getDataOutput();
+            IARecordBuilder recBuilder = new RecordBuilder();
+            recBuilder.reset(recType);
+            recBuilder.init();
+
+            // Pre-serialize the type tag of every field of the record type.
+            int n = recType.getFieldNames().length;
+            byte[] fieldTypeTags = new byte[n];
+            for (int i = 0; i < n; i++) {
+                ATypeTag tag = recType.getFieldTypes()[i].getTypeTag();
+                fieldTypeTags[i] = tag.serialize();
+            }
+
+            // Resolve field ids; fields unknown to the record type are only
+            // legal for open types and get their names serialized up front.
+            int[] fldIds = new int[n];
+            ArrayBackedValueStorage[] nameBuffers = new ArrayBackedValueStorage[n];
+            AMutableString str = new AMutableString(null);
+            for (int i = 0; i < n; i++) {
+                String name = recType.getFieldNames()[i];
+                fldIds[i] = recBuilder.getFieldId(name);
+                if (fldIds[i] < 0) {
+                    if (!recType.isOpen()) {
+                        throw new HyracksDataException("Illegal field " + name + " in closed type " + recType);
+                    } else {
+                        nameBuffers[i] = new ArrayBackedValueStorage();
+                        fieldNameToBytes(name, str, nameBuffers[i]);
+                    }
+                }
+            }
+
+            FieldCursor cursor = new FieldCursor(new InputStreamReader(in));
+            while (cursor.nextRecord()) {
+                tb.reset();
+                recBuilder.reset(recType);
+                recBuilder.init();
+
+                for (int i = 0; i < valueParsers.length; ++i) {
+                    if (!cursor.nextField()) {
+                        break;
+                    }
+                    fieldValueBuffer.reset();
+                    // Each field is written tagged: type tag byte, then the value.
+                    fieldValueBufferOutput.writeByte(fieldTypeTags[i]);
+                    valueParsers[i].parse(cursor.getBuffer(), cursor.getfStart(),
+                            cursor.getfEnd() - cursor.getfStart(), fieldValueBufferOutput);
+                    if (fldIds[i] < 0) {
+                        recBuilder.addField(nameBuffers[i], fieldValueBuffer);
+                    } else {
+                        recBuilder.addField(fldIds[i], fieldValueBuffer);
+                    }
+                }
+                recBuilder.write(recDos, true);
+                processNextTuple(nextState.isEmpty() ? null : nextState.get(0), writer);
+                if (tupleInterval > 0) {
+                    // Throttle the feed. Thread.sleep is static; call it
+                    // directly instead of through Thread.currentThread().
+                    Thread.sleep(tupleInterval);
+                }
+            }
+            if (appender.getTupleCount() > 0) {
+                FrameUtils.flushFrame(frame, writer);
+            }
+        } catch (IOException e) {
+            throw new HyracksDataException(e);
+        } catch (InterruptedException ie) {
+            // Preserve the interrupt status for callers further up the stack.
+            Thread.currentThread().interrupt();
+            throw new HyracksDataException(ie);
+        }
+    }
+
+    // Appends the tuple in tb to the current frame, flushing first if the
+    // frame is full, and optionally force-flushing afterwards.
+    private void addTupleToFrame(IFrameWriter writer, boolean forceFlush) throws HyracksDataException {
+        tb.addFieldEndOffset();
+        boolean success = appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize());
+        if (!success) {
+            FrameUtils.flushFrame(frame, writer);
+            appender.reset(frame, true);
+            if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
+                // A single tuple larger than a frame cannot be emitted.
+                throw new IllegalStateException();
+            }
+        }
+
+        if (forceFlush) {
+            FrameUtils.flushFrame(frame, writer);
+        }
+
+    }
+
+    // Emits the current tuple, first honoring any pending feed-control request.
+    private void processNextTuple(OperationState feedState, IFrameWriter writer) throws HyracksDataException {
+        try {
+            if (feedState != null) {
+                switch (feedState) {
+                    case SUSPENDED:
+                        suspendOperation(writer);
+                        break;
+                    case STOPPED:
+                        stopOperation(writer);
+                        break;
+                }
+            } else {
+                addTupleToFrame(writer, false);
+            }
+        } catch (HyracksDataException hde) {
+            throw hde;
+        } catch (Exception e) {
+            throw new HyracksDataException(e);
+        }
+    }
+
+    // Emits the pending tuple, then blocks until resume() notifies this parser.
+    private void suspendOperation(IFrameWriter writer) throws HyracksDataException, Exception {
+        nextState.remove(0);
+        addTupleToFrame(writer, false);
+        adapter.beforeSuspend();
+        synchronized (this) {
+            // NOTE(review): a bare wait() can return on a spurious wakeup; a
+            // guarded loop around a condition flag would be more robust.
+            this.wait();
+            adapter.beforeResume();
+        }
+    }
+
+    // Emits the pending tuple and asks the adapter to shut the feed down.
+    private void stopOperation(IFrameWriter writer) throws HyracksDataException, Exception {
+        nextState.remove(0);
+        addTupleToFrame(writer, false);
+        adapter.beforeStop();
+        adapter.stop();
+    }
+
+    @Override
+    public void suspend() throws Exception {
+        nextState.add(OperationState.SUSPENDED);
+    }
+
+    @Override
+    public void resume() throws Exception {
+        synchronized (this) {
+            this.notifyAll();
+        }
+    }
+
+    @Override
+    public void stop() throws Exception {
+        nextState.add(OperationState.STOPPED);
+    }
+
+    @Override
+    public void alter(Map<String, String> alterParams) throws Exception {
+        String intervalArg = alterParams.get(TUPLE_INTERVAL_KEY);
+        if (intervalArg != null) {
+            tupleInterval = Long.parseLong(intervalArg);
+        }
+    }
+}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/AbstractDatasourceAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/AbstractDatasourceAdapter.java
new file mode 100644
index 0000000..320a29f
--- /dev/null
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/AbstractDatasourceAdapter.java
@@ -0,0 +1,115 @@
+/*
+ * Copyright 2009-2011 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.external.dataset.adapter;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import edu.uci.ics.asterix.external.data.adapter.api.IDatasourceAdapter;
+import edu.uci.ics.asterix.external.data.parser.IDataParser;
+import edu.uci.ics.asterix.om.types.ATypeTag;
+import edu.uci.ics.asterix.om.types.IAType;
+import edu.uci.ics.hyracks.algebricks.core.api.constraints.AlgebricksPartitionConstraint;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.dataflow.common.data.parsers.DoubleParserFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.parsers.FloatParserFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.parsers.IValueParserFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.parsers.IntegerParserFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.parsers.LongParserFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory;
+
+/**
+ * Represents the base class that is required to be extended by every
+ * implementation of the IDatasourceAdapter interface.
+ */
+public abstract class AbstractDatasourceAdapter implements IDatasourceAdapter {
+
+ private static final long serialVersionUID = -3510610289692452466L;
+
+ protected Map<String, String> configuration;
+
+ protected AlgebricksPartitionConstraint partitionConstraint;
+
+ protected IAType atype;
+
+ protected IHyracksTaskContext ctx;
+
+ protected IDataParser dataParser;
+
+ protected static final HashMap<ATypeTag, IValueParserFactory> typeToValueParserFactMap = new HashMap<ATypeTag, IValueParserFactory>();
+
+ protected static final HashMap<String, String> formatToParserMap = new HashMap<String, String>();
+
+ protected static final HashMap<String, String> formatToManagedParserMap = new HashMap<String, String>();
+
+ protected AdapterDataFlowType dataFlowType;
+
+ protected AdapterType adapterType;
+
+ static {
+ typeToValueParserFactMap.put(ATypeTag.INT32, IntegerParserFactory.INSTANCE);
+ typeToValueParserFactMap.put(ATypeTag.FLOAT, FloatParserFactory.INSTANCE);
+ typeToValueParserFactMap.put(ATypeTag.DOUBLE, DoubleParserFactory.INSTANCE);
+ typeToValueParserFactMap.put(ATypeTag.INT64, LongParserFactory.INSTANCE);
+ typeToValueParserFactMap.put(ATypeTag.STRING, UTF8StringParserFactory.INSTANCE);
+
+ formatToParserMap.put("delimited-text", "edu.uci.ics.asterix.external.data.parser.DelimitedDataStreamParser");
+ formatToParserMap.put("adm", "edu.uci.ics.asterix.external.data.parser.ADMStreamParser");
+
+ formatToManagedParserMap.put("delimited-text",
+ "edu.uci.ics.asterix.external.data.parser.ManagedDelimitedDataStreamParser");
+ formatToManagedParserMap.put("adm", "edu.uci.ics.asterix.external.data.parser.ManagedAdmStreamParser");
+
+ }
+
+ public static final String KEY_FORMAT = "format";
+ public static final String KEY_PARSER = "parser";
+
+ public static final String FORMAT_DELIMITED_TEXT = "delimited-text";
+ public static final String FORMAT_ADM = "adm";
+
+ abstract public void initialize(IHyracksTaskContext ctx) throws Exception;
+
+ abstract public void configure(Map<String, String> arguments, IAType atype) throws Exception;
+
+ abstract public AdapterDataFlowType getAdapterDataFlowType();
+
+ abstract public AdapterType getAdapterType();
+
+ public AlgebricksPartitionConstraint getPartitionConstraint() {
+ return partitionConstraint;
+ }
+
+ public void setAdapterProperty(String property, String value) {
+ configuration.put(property, value);
+ }
+
+ public IDataParser getParser() {
+ return dataParser;
+ }
+
+ public void setParser(IDataParser dataParser) {
+ this.dataParser = dataParser;
+ }
+
+ public String getAdapterProperty(String attribute) {
+ return configuration.get(attribute);
+ }
+
+ public Map<String, String> getConfiguration() {
+ return configuration;
+ }
+
+}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/CNNFeedAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/CNNFeedAdapter.java
new file mode 100644
index 0000000..0673a24
--- /dev/null
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/CNNFeedAdapter.java
@@ -0,0 +1,120 @@
+package edu.uci.ics.asterix.external.dataset.adapter;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import edu.uci.ics.asterix.external.data.adapter.api.IDatasourceAdapter;
+import edu.uci.ics.asterix.external.data.parser.IDataParser;
+import edu.uci.ics.asterix.external.data.parser.IDataStreamParser;
+import edu.uci.ics.asterix.external.data.parser.ManagedDelimitedDataStreamParser;
+import edu.uci.ics.asterix.feed.intake.FeedStream;
+import edu.uci.ics.asterix.feed.intake.RSSFeedClient;
+import edu.uci.ics.asterix.feed.managed.adapter.IMutableFeedAdapter;
+import edu.uci.ics.asterix.om.types.ARecordType;
+import edu.uci.ics.asterix.om.types.IAType;
+
+public class CNNFeedAdapter extends RSSFeedAdapter implements
+ IDatasourceAdapter, IMutableFeedAdapter {
+
+ private List<String> feedURLs = new ArrayList<String>();
+ private String id_prefix = "";
+
+ public static final String KEY_RSS_URL = "topic";
+ public static final String KEY_INTERVAL = "interval";
+
+ private static Map<String, String> topicFeeds = new HashMap<String, String>();
+
+ public static final String TOP_STORIES = "topstories";
+ public static final String WORLD = "world";
+ public static final String US = "us";
+ public static final String SPORTS = "sports";
+ public static final String BUSINESS = "business";
+ public static final String POLITICS = "politics";
+ public static final String CRIME = "crime";
+ public static final String TECHNOLOGY = "technology";
+ public static final String HEALTH = "health";
+ public static final String ENTERNTAINMENT = "entertainemnt";
+ public static final String TRAVEL = "travel";
+ public static final String LIVING = "living";
+ public static final String VIDEO = "video";
+ public static final String STUDENT = "student";
+ public static final String POPULAR = "popular";
+ public static final String RECENT = "recent";
+
+ private void initTopics() {
+ topicFeeds
+ .put(TOP_STORIES, "http://rss.cnn.com/rss/cnn_topstories.rss");
+ topicFeeds.put(WORLD, "http://rss.cnn.com/rss/cnn_world.rss");
+ topicFeeds.put(US, "http://rss.cnn.com/rss/cnn_us.rss");
+ topicFeeds.put(SPORTS, "http://rss.cnn.com/rss/si_topstories.rss");
+ topicFeeds.put(BUSINESS, "http://rss.cnn.com/rss/money_latest.rss");
+ topicFeeds.put(POLITICS, "http://rss.cnn.com/rss/cnn_allpolitics.rss");
+ topicFeeds.put(CRIME, "http://rss.cnn.com/rss/cnn_crime.rss");
+ topicFeeds.put(TECHNOLOGY, "http://rss.cnn.com/rss/cnn_tech.rss");
+ topicFeeds.put(HEALTH, "http://rss.cnn.com/rss/cnn_health.rss");
+ topicFeeds
+ .put(ENTERNTAINMENT, "http://rss.cnn.com/rss/cnn_showbiz.rss");
+ topicFeeds.put(LIVING, "http://rss.cnn.com/rss/cnn_living.rss");
+ topicFeeds.put(VIDEO, "http://rss.cnn.com/rss/cnn_freevideo.rss");
+ topicFeeds.put(TRAVEL, "http://rss.cnn.com/rss/cnn_travel.rss");
+ topicFeeds.put(STUDENT, "http://rss.cnn.com/rss/cnn_studentnews.rss");
+ topicFeeds.put(POPULAR, "http://rss.cnn.com/rss/cnn_mostpopular.rss");
+ topicFeeds.put(RECENT, "http://rss.cnn.com/rss/cnn_latest.rss");
+ }
+
+ @Override
+ public IDataParser getDataParser(int partition) throws Exception {
+ if (dataParser == null) {
+ dataParser = new ManagedDelimitedDataStreamParser();
+ dataParser.configure(configuration);
+ dataParser.initialize((ARecordType) atype, ctx);
+ RSSFeedClient feedClient = new RSSFeedClient(this,
+ feedURLs.get(partition), id_prefix);
+ FeedStream feedStream = new FeedStream(feedClient, ctx);
+ ((IDataStreamParser) dataParser).setInputStream(feedStream);
+ }
+ return dataParser;
+ }
+
+ @Override
+ public void configure(Map<String, String> arguments, IAType atype)
+ throws Exception {
+ configuration = arguments;
+ this.atype = atype;
+ String rssURLProperty = configuration.get(KEY_RSS_URL);
+ if (rssURLProperty == null) {
+ throw new IllegalArgumentException("no rss url provided");
+ }
+ initializeFeedURLs(rssURLProperty);
+ configurePartitionConstraints();
+
+ }
+
+ private void initializeFeedURLs(String rssURLProperty) {
+ feedURLs.clear();
+ String[] rssTopics = rssURLProperty.split(",");
+ initTopics();
+ for (String topic : rssTopics) {
+ String feedURL = topicFeeds.get(topic);
+ if (feedURL == null) {
+ throw new IllegalArgumentException(" unknown topic :" + topic
+ + " please choose from the following "
+ + getValidTopics());
+ }
+ feedURLs.add(feedURL);
+ }
+ }
+
+ private static String getValidTopics() {
+ StringBuilder builder = new StringBuilder();
+ for (String key : topicFeeds.keySet()) {
+ builder.append(key);
+ builder.append(" ");
+ }
+ return new String(builder);
+ }
+
+}
+
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HDFSAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HDFSAdapter.java
new file mode 100644
index 0000000..205c855
--- /dev/null
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HDFSAdapter.java
@@ -0,0 +1,291 @@
+/*
+ * Copyright 2009-2011 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.external.dataset.adapter;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.Counters.Counter;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+
+import edu.uci.ics.asterix.external.data.adapter.api.IDatasourceReadAdapter;
+import edu.uci.ics.asterix.external.data.parser.DelimitedDataStreamParser;
+import edu.uci.ics.asterix.external.data.parser.IDataParser;
+import edu.uci.ics.asterix.external.data.parser.IDataStreamParser;
+import edu.uci.ics.asterix.om.types.ARecordType;
+import edu.uci.ics.asterix.om.types.ATypeTag;
+import edu.uci.ics.asterix.om.types.IAType;
+import edu.uci.ics.asterix.runtime.operators.file.AdmSchemafullRecordParserFactory;
+import edu.uci.ics.asterix.runtime.operators.file.NtDelimitedDataTupleParserFactory;
+import edu.uci.ics.hyracks.algebricks.core.api.constraints.AlgebricksCountPartitionConstraint;
+import edu.uci.ics.hyracks.algebricks.core.api.exceptions.NotImplementedException;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.data.parsers.IValueParserFactory;
+import edu.uci.ics.hyracks.dataflow.std.file.ITupleParserFactory;
+
+public class HDFSAdapter extends AbstractDatasourceAdapter implements IDatasourceReadAdapter {
+
+ private String hdfsUrl;
+ private List<String> hdfsPaths;
+ private String inputFormatClassName;
+ private InputSplit[] inputSplits;
+ private JobConf conf;
+ private IHyracksTaskContext ctx;
+ private Reporter reporter;
+ private boolean isDelimited;
+ private Character delimiter;
+ private static final Map<String, String> formatClassNames = new HashMap<String, String>();
+
+ public static final String KEY_HDFS_URL = "hdfs";
+ public static final String KEY_HDFS_PATH = "path";
+ public static final String KEY_INPUT_FORMAT = "input-format";
+
+ public static final String INPUT_FORMAT_TEXT = "text-input-format";
+ public static final String INPUT_FORMAT_SEQUENCE = "sequence-input-format";
+
+ static {
+ formatClassNames.put(INPUT_FORMAT_TEXT, "org.apache.hadoop.mapred.TextInputFormat");
+ formatClassNames.put(INPUT_FORMAT_SEQUENCE, "org.apache.hadoop.mapred.SequenceFileInputFormat");
+ }
+
+ public String getHdfsUrl() {
+ return hdfsUrl;
+ }
+
+ public void setHdfsUrl(String hdfsUrl) {
+ this.hdfsUrl = hdfsUrl;
+ }
+
+ public List<String> getHdfsPaths() {
+ return hdfsPaths;
+ }
+
+ public void setHdfsPaths(List<String> hdfsPaths) {
+ this.hdfsPaths = hdfsPaths;
+ }
+
+ @Override
+ public void configure(Map<String, String> arguments, IAType atype) throws Exception {
+ configuration = arguments;
+ configureFormat();
+ configureJobConf();
+ configurePartitionConstraint();
+ this.atype = atype;
+ }
+
+ private void configureFormat() throws Exception {
+ String format = configuration.get(KEY_INPUT_FORMAT);
+ inputFormatClassName = formatClassNames.get(format);
+ if (inputFormatClassName == null) {
+ throw new Exception("format " + format + " not supported");
+ }
+
+ String parserClass = configuration.get(KEY_PARSER);
+ if (parserClass == null) {
+ if (FORMAT_DELIMITED_TEXT.equalsIgnoreCase(configuration.get(KEY_FORMAT))) {
+ parserClass = formatToParserMap.get(FORMAT_DELIMITED_TEXT);
+ } else if (FORMAT_ADM.equalsIgnoreCase(configuration.get(KEY_FORMAT))) {
+ parserClass = formatToParserMap.get(FORMAT_ADM);
+ }
+ }
+
+ dataParser = (IDataParser) Class.forName(parserClass).newInstance();
+ dataParser.configure(configuration);
+ }
+
+ private void configurePartitionConstraint() throws Exception {
+ InputSplit[] inputSplits = conf.getInputFormat().getSplits(conf, 0);
+ partitionConstraint = new AlgebricksCountPartitionConstraint(inputSplits.length);
+ hdfsPaths = new ArrayList<String>();
+ for (String hdfsPath : configuration.get(KEY_HDFS_PATH).split(",")) {
+ hdfsPaths.add(hdfsPath);
+ }
+ }
+
+ private ITupleParserFactory createTupleParserFactory(ARecordType recType) {
+ if (isDelimited) {
+ int n = recType.getFieldTypes().length;
+ IValueParserFactory[] fieldParserFactories = new IValueParserFactory[n];
+ for (int i = 0; i < n; i++) {
+ ATypeTag tag = recType.getFieldTypes()[i].getTypeTag();
+ IValueParserFactory vpf = typeToValueParserFactMap.get(tag);
+ if (vpf == null) {
+ throw new NotImplementedException("No value parser factory for delimited fields of type " + tag);
+ }
+ fieldParserFactories[i] = vpf;
+ }
+ return new NtDelimitedDataTupleParserFactory(recType, fieldParserFactories, delimiter);
+ } else {
+ return new AdmSchemafullRecordParserFactory(recType);
+ }
+ }
+
+ private JobConf configureJobConf() throws Exception {
+ hdfsUrl = configuration.get(KEY_HDFS_URL);
+ conf = new JobConf();
+ conf.set("fs.default.name", hdfsUrl);
+ conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
+ conf.setClassLoader(HDFSAdapter.class.getClassLoader());
+ conf.set("mapred.input.dir", configuration.get(KEY_HDFS_PATH));
+ conf.set("mapred.input.format.class", formatClassNames.get(configuration.get(KEY_INPUT_FORMAT)));
+ return conf;
+ }
+
+ public AdapterDataFlowType getAdapterDataFlowType() {
+ return AdapterDataFlowType.PULL;
+ }
+
+ public AdapterType getAdapterType() {
+ return AdapterType.READ_WRITE;
+ }
+
+ @Override
+ public void initialize(IHyracksTaskContext ctx) throws Exception {
+ this.ctx = ctx;
+ inputSplits = conf.getInputFormat().getSplits(conf, 0);
+ dataParser.initialize((ARecordType) atype, ctx);
+ reporter = new Reporter() {
+
+ @Override
+ public Counter getCounter(Enum<?> arg0) {
+ return null;
+ }
+
+ @Override
+ public Counter getCounter(String arg0, String arg1) {
+ return null;
+ }
+
+ @Override
+ public InputSplit getInputSplit() throws UnsupportedOperationException {
+ return null;
+ }
+
+ @Override
+ public void incrCounter(Enum<?> arg0, long arg1) {
+ }
+
+ @Override
+ public void incrCounter(String arg0, String arg1, long arg2) {
+ }
+
+ @Override
+ public void setStatus(String arg0) {
+ }
+
+ @Override
+ public void progress() {
+ }
+ };
+ }
+
+ @Override
+ public IDataParser getDataParser(int partition) throws Exception {
+ Path path = new Path(inputSplits[partition].toString());
+ FileSystem fs = FileSystem.get(conf);
+ InputStream inputStream;
+ if (conf.getInputFormat() instanceof SequenceFileInputFormat) {
+ SequenceFileInputFormat format = (SequenceFileInputFormat) conf.getInputFormat();
+ RecordReader reader = format.getRecordReader(inputSplits[partition], conf, reporter);
+ inputStream = new SequenceToTextStream(reader, ctx);
+ } else {
+ try {
+ inputStream = fs.open(((org.apache.hadoop.mapred.FileSplit) inputSplits[partition]).getPath());
+ } catch (FileNotFoundException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+
+ if (dataParser instanceof IDataStreamParser) {
+ ((IDataStreamParser) dataParser).setInputStream(inputStream);
+ } else {
+ throw new IllegalArgumentException(" parser not compatible");
+ }
+
+ return dataParser;
+ }
+
+}
+
+class SequenceToTextStream extends InputStream {
+
+ private ByteBuffer buffer;
+ private int capacity;
+ private RecordReader reader;
+ private boolean readNext = true;
+ private final Object key;
+ private final Text value;
+
+ public SequenceToTextStream(RecordReader reader, IHyracksTaskContext ctx) throws Exception {
+ capacity = ctx.getFrameSize();
+ buffer = ByteBuffer.allocate(capacity);
+ this.reader = reader;
+ key = reader.createKey();
+ try {
+ value = (Text) reader.createValue();
+ } catch (ClassCastException cce) {
+ throw new Exception("context is not of type org.apache.hadoop.io.Text"
+ + " type not supported in sequence file format", cce);
+ }
+ initialize();
+ }
+
+ private void initialize() throws Exception {
+ boolean hasMore = reader.next(key, value);
+ if (!hasMore) {
+ buffer.limit(0);
+ } else {
+ buffer.position(0);
+ buffer.limit(capacity);
+ buffer.put(value.getBytes());
+ buffer.put("\n".getBytes());
+ buffer.flip();
+ }
+ }
+
+ @Override
+ public int read() throws IOException {
+ if (!buffer.hasRemaining()) {
+ boolean hasMore = reader.next(key, value);
+ if (!hasMore) {
+ return -1;
+ }
+ buffer.position(0);
+ buffer.limit(capacity);
+ buffer.put(value.getBytes());
+ buffer.put("\n".getBytes());
+ buffer.flip();
+ return buffer.get();
+ } else {
+ return buffer.get();
+ }
+
+ }
+
+}
\ No newline at end of file
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HiveAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HiveAdapter.java
new file mode 100644
index 0000000..44dab4c
--- /dev/null
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HiveAdapter.java
@@ -0,0 +1,94 @@
+/*
+ * Copyright 2009-2011 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.external.dataset.adapter;
+
+import java.util.Map;
+
+import edu.uci.ics.asterix.external.data.adapter.api.IDatasourceReadAdapter;
+import edu.uci.ics.asterix.external.data.parser.IDataParser;
+import edu.uci.ics.asterix.om.types.IAType;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+
+public class HiveAdapter extends AbstractDatasourceAdapter implements
+ IDatasourceReadAdapter {
+
+ public static final String HIVE_DATABASE = "database";
+ public static final String HIVE_TABLE = "table";
+ public static final String HIVE_HOME = "hive-home";
+ public static final String HIVE_METASTORE_URI = "metastore-uri";
+ public static final String HIVE_WAREHOUSE_DIR = "warehouse-dir";
+ public static final String HIVE_METASTORE_RAWSTORE_IMPL = "rawstore-impl";
+
+ private HDFSAdapter hdfsAdapter;
+
+ @Override
+ public AdapterType getAdapterType() {
+ return AdapterType.READ;
+ }
+
+ @Override
+ public AdapterDataFlowType getAdapterDataFlowType() {
+ return AdapterDataFlowType.PULL;
+ }
+
+ @Override
+ public void configure(Map<String, String> arguments, IAType atype)
+ throws Exception {
+ configuration = arguments;
+ this.atype = atype;
+ configureHadoopAdapter();
+ }
+
+ private void configureHadoopAdapter() throws Exception {
+ String database = configuration.get(HIVE_DATABASE);
+ String tablePath = null;
+ if (database == null) {
+ tablePath = configuration.get(HIVE_WAREHOUSE_DIR) + "/"
+ + configuration.get(HIVE_TABLE);
+ } else {
+ tablePath = configuration.get(HIVE_WAREHOUSE_DIR) + "/" + tablePath
+ + ".db" + "/" + configuration.get(HIVE_TABLE);
+ }
+ configuration.put(HDFSAdapter.KEY_HDFS_PATH, tablePath);
+ if (!configuration.get(KEY_FORMAT).equals(FORMAT_DELIMITED_TEXT)) {
+ throw new IllegalArgumentException("format"
+ + configuration.get(KEY_FORMAT) + " is not supported");
+ }
+
+ if (!(configuration.get(HDFSAdapter.KEY_INPUT_FORMAT).equals(
+ HDFSAdapter.INPUT_FORMAT_TEXT) || configuration.get(
+ HDFSAdapter.KEY_INPUT_FORMAT).equals(
+ HDFSAdapter.INPUT_FORMAT_SEQUENCE))) {
+ throw new IllegalArgumentException("file input format"
+ + configuration.get(HDFSAdapter.KEY_INPUT_FORMAT)
+ + " is not supported");
+ }
+
+ hdfsAdapter = new HDFSAdapter();
+ hdfsAdapter.configure(configuration, atype);
+ }
+
+ @Override
+ public void initialize(IHyracksTaskContext ctx) throws Exception {
+ hdfsAdapter.initialize(ctx);
+ }
+
+ @Override
+ public IDataParser getDataParser(int partition) throws Exception {
+ return hdfsAdapter.getDataParser(partition);
+ }
+
+}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/NCFileSystemAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/NCFileSystemAdapter.java
new file mode 100644
index 0000000..2449db0
--- /dev/null
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/NCFileSystemAdapter.java
@@ -0,0 +1,143 @@
+/*
+ * Copyright 2009-2011 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.external.dataset.adapter;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.InputStream;
+import java.util.Map;
+
+import edu.uci.ics.asterix.external.data.adapter.api.IDatasourceReadAdapter;
+import edu.uci.ics.asterix.external.data.parser.IDataParser;
+import edu.uci.ics.asterix.external.data.parser.IDataStreamParser;
+import edu.uci.ics.asterix.om.types.ARecordType;
+import edu.uci.ics.asterix.om.types.IAType;
+import edu.uci.ics.hyracks.algebricks.core.api.constraints.AlgebricksAbsolutePartitionConstraint;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
+
+public class NCFileSystemAdapter extends AbstractDatasourceAdapter implements IDatasourceReadAdapter {
+
+ private static final long serialVersionUID = -4154256369973615710L;
+ protected FileSplit[] fileSplits;
+
+ public class Constants {
+ public static final String KEY_SPLITS = "path";
+ public static final String KEY_FORMAT = "format";
+ public static final String KEY_PARSER = "parser";
+ public static final String FORMAT_DELIMITED_TEXT = "delimited-text";
+ public static final String FORMAT_ADM = "adm";
+ }
+
+ @Override
+ public void configure(Map<String, String> arguments, IAType atype) throws Exception {
+ this.configuration = arguments;
+ String[] splits = arguments.get(Constants.KEY_SPLITS).split(",");
+ configureFileSplits(splits);
+ configurePartitionConstraint();
+ configureFormat();
+ if (atype == null) {
+ configureInputType();
+ } else {
+ setInputAType(atype);
+ }
+ }
+
+ public IAType getAType() {
+ return atype;
+ }
+
+ public void setInputAType(IAType atype) {
+ this.atype = atype;
+ }
+
+ @Override
+ public void initialize(IHyracksTaskContext ctx) throws Exception {
+ this.ctx = ctx;
+ dataParser.initialize((ARecordType) atype, ctx);
+ }
+
+ @Override
+ public AdapterDataFlowType getAdapterDataFlowType() {
+ return AdapterDataFlowType.PULL;
+ }
+
+ @Override
+ public AdapterType getAdapterType() {
+ return AdapterType.READ;
+ }
+
+ @Override
+ public IDataParser getDataParser(int partition) throws Exception {
+ FileSplit split = fileSplits[partition];
+ File inputFile = split.getLocalFile().getFile();
+ InputStream in;
+ try {
+ in = new FileInputStream(inputFile);
+ } catch (FileNotFoundException e) {
+ throw new HyracksDataException(e);
+ }
+ if (dataParser instanceof IDataStreamParser) {
+ ((IDataStreamParser) dataParser).setInputStream(in);
+ } else {
+ throw new IllegalArgumentException(" parser not compatible");
+ }
+ return dataParser;
+ }
+
+ private void configureFileSplits(String[] splits) {
+ fileSplits = new FileSplit[splits.length];
+ String nodeName;
+ String nodeLocalPath;
+ int count = 0;
+ for (String splitPath : splits) {
+ nodeName = splitPath.split(":")[0];
+ nodeLocalPath = splitPath.split("://")[1];
+ FileSplit fileSplit = new FileSplit(nodeName, new FileReference(new File(nodeLocalPath)));
+ fileSplits[count++] = fileSplit;
+ }
+ }
+
+ protected void configureFormat() throws Exception {
+ String parserClass = configuration.get(Constants.KEY_PARSER);
+ if (parserClass == null) {
+ if (Constants.FORMAT_DELIMITED_TEXT.equalsIgnoreCase(configuration.get(KEY_FORMAT))) {
+ parserClass = formatToParserMap.get(FORMAT_DELIMITED_TEXT);
+ } else if (Constants.FORMAT_ADM.equalsIgnoreCase(configuration.get(Constants.KEY_FORMAT))) {
+ parserClass = formatToParserMap.get(Constants.FORMAT_ADM);
+ } else {
+ throw new IllegalArgumentException(" format " + configuration.get(KEY_FORMAT) + " not supported");
+ }
+ }
+ dataParser = (IDataParser) Class.forName(parserClass).newInstance();
+ dataParser.configure(configuration);
+ }
+
+ private void configureInputType() {
+ throw new UnsupportedOperationException(" Cannot resolve input type, operation not supported");
+ }
+
+ private void configurePartitionConstraint() {
+ String[] locs = new String[fileSplits.length];
+ for (int i = 0; i < fileSplits.length; i++) {
+ locs[i] = fileSplits[i].getNodeName();
+ }
+ partitionConstraint = new AlgebricksAbsolutePartitionConstraint(locs);
+ }
+
+}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedTwitterAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedTwitterAdapter.java
new file mode 100644
index 0000000..76d53ab
--- /dev/null
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedTwitterAdapter.java
@@ -0,0 +1,266 @@
+/*
+ * Copyright 2009-2011 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.external.dataset.adapter;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import twitter4j.Query;
+import twitter4j.QueryResult;
+import twitter4j.Tweet;
+import twitter4j.Twitter;
+import twitter4j.TwitterFactory;
+import edu.uci.ics.asterix.external.data.adapter.api.IDatasourceReadAdapter;
+import edu.uci.ics.asterix.external.data.parser.AbstractStreamDataParser;
+import edu.uci.ics.asterix.external.data.parser.IDataParser;
+import edu.uci.ics.asterix.external.data.parser.IDataStreamParser;
+import edu.uci.ics.asterix.external.data.parser.IManagedDataParser;
+import edu.uci.ics.asterix.external.data.parser.ManagedDelimitedDataStreamParser;
+import edu.uci.ics.asterix.feed.intake.IFeedClient;
+import edu.uci.ics.asterix.feed.managed.adapter.IManagedFeedAdapter;
+import edu.uci.ics.asterix.feed.managed.adapter.IMutableFeedAdapter;
+import edu.uci.ics.asterix.om.types.ARecordType;
+import edu.uci.ics.asterix.om.types.IAType;
+import edu.uci.ics.hyracks.algebricks.core.api.constraints.AlgebricksCountPartitionConstraint;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+
+/**
+ * A pull-based adapter that periodically queries the Twitter search API and
+ * exposes the results as a delimited ('|'-separated) data stream, parsed by a
+ * {@link ManagedDelimitedDataStreamParser}. Supports being stopped and having
+ * its parameters altered at runtime (IManagedFeedAdapter / IMutableFeedAdapter).
+ */
+public class PullBasedTwitterAdapter extends AbstractDatasourceAdapter implements IDatasourceReadAdapter,
+ IManagedFeedAdapter, IMutableFeedAdapter {
+
+ // Lazily created in getDataParser(); shared across calls.
+ private IDataStreamParser parser;
+ // NOTE(review): 'parallelism' is never read in this class — dead field or
+ // consumed reflectively elsewhere? Verify before removing.
+ private int parallelism = 1;
+ private boolean stopRequested = false;
+ private boolean alterRequested = false;
+ private Map<String, String> alteredParams = new HashMap<String, String>();
+
+ // Configuration keys understood by this adapter.
+ public static final String QUERY = "query";
+ public static final String INTERVAL = "interval";
+
+ @Override
+ public void configure(Map<String, String> arguments, IAType atype) throws Exception {
+ configuration = arguments;
+ this.atype = atype;
+ // A single twitter poller instance is sufficient; constrain to count 1.
+ partitionConstraint = new AlgebricksCountPartitionConstraint(1);
+ }
+
+ @Override
+ public AdapterDataFlowType getAdapterDataFlowType() {
+ // NOTE(review): 'dataFlowType' is presumably an inherited member of
+ // AbstractDatasourceAdapter exposing the enum constants — confirm.
+ return dataFlowType.PULL;
+ }
+
+ @Override
+ public AdapterType getAdapterType() {
+ // NOTE(review): 'adapterType' presumably inherited — see above.
+ return adapterType.READ;
+ }
+
+ @Override
+ public void initialize(IHyracksTaskContext ctx) throws Exception {
+ this.ctx = ctx;
+ }
+
+ @Override
+ public void beforeSuspend() throws Exception {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public void beforeResume() throws Exception {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public void beforeStop() throws Exception {
+ // TODO Auto-generated method stub
+
+ }
+
+ /**
+ * Lazily builds the parser pipeline: a delimited-text parser reading from a
+ * TweetStream, which in turn pulls batches from a TweetClient.
+ * The 'partition' argument is ignored — only one partition exists (see
+ * configure()).
+ */
+ @Override
+ public IDataParser getDataParser(int partition) throws Exception {
+ if (parser == null) {
+ parser = new ManagedDelimitedDataStreamParser();
+ ((IManagedDataParser) parser).setAdapter(this);
+ // Tuples produced by TweetClient.formFeedTuple are '|'-separated.
+ configuration.put(AbstractStreamDataParser.KEY_DELIMITER, "|");
+ parser.configure(configuration);
+ parser.initialize((ARecordType) atype, ctx);
+ TweetClient tweetClient = new TweetClient(ctx.getJobletContext().getApplicationContext().getNodeId(), this);
+ TweetStream tweetStream = new TweetStream(tweetClient, ctx);
+ parser.setInputStream(tweetStream);
+ }
+ return parser;
+ }
+
+ // Cooperative stop: TweetClient polls isStopRequested() between fetches.
+ @Override
+ public void stop() throws Exception {
+ stopRequested = true;
+ }
+
+ public boolean isStopRequested() {
+ return stopRequested;
+ }
+
+ // Stages new parameters; TweetClient applies them on its next iteration.
+ @Override
+ public void alter(Map<String, String> properties) throws Exception {
+ alterRequested = true;
+ this.alteredParams = properties;
+ }
+
+ public boolean isAlterRequested() {
+ return alterRequested;
+ }
+
+ // NOTE(review): returns null after postAlteration() until the next alter();
+ // callers must check isAlterRequested() first.
+ public Map<String, String> getAlteredParams() {
+ return alteredParams;
+ }
+
+ // Clears the staged parameters once the client has consumed them.
+ public void postAlteration() {
+ alteredParams = null;
+ alterRequested = false;
+ }
+}
+
+/**
+ * Adapts batches of tweets fetched by a {@link TweetClient} into a byte
+ * stream: each tweet becomes one newline-terminated record, buffered in a
+ * frame-sized ByteBuffer that is refilled on demand.
+ */
+class TweetStream extends InputStream {
+
+    private ByteBuffer buffer;
+    private int capacity;
+    private TweetClient tweetClient;
+    private List<String> tweets = new ArrayList<String>();
+
+    public TweetStream(TweetClient tweetClient, IHyracksTaskContext ctx) throws Exception {
+        capacity = ctx.getFrameSize();
+        buffer = ByteBuffer.allocate(capacity);
+        this.tweetClient = tweetClient;
+        initialize();
+    }
+
+    // Fetches the first batch; an empty buffer (limit 0) signals immediate EOF.
+    private void initialize() throws Exception {
+        if (!tweetClient.next(tweets)) {
+            buffer.limit(0);
+        } else {
+            fillBuffer();
+        }
+    }
+
+    // Copies the current batch into the buffer, one newline-terminated record
+    // per tweet, and flips it for reading.
+    // NOTE(review): assumes a batch always fits within one frame; a larger
+    // batch would throw BufferOverflowException — confirm frame size vs the
+    // maximum result-page size.
+    private void fillBuffer() {
+        buffer.clear();
+        for (String tweet : tweets) {
+            buffer.put(tweet.getBytes());
+            buffer.put("\n".getBytes());
+        }
+        buffer.flip();
+    }
+
+    /**
+     * Returns the next byte (0-255) or -1 at end of stream, refilling the
+     * buffer from the tweet client when it runs dry.
+     */
+    @Override
+    public int read() throws IOException {
+        if (!buffer.hasRemaining()) {
+            if (!tweetClient.next(tweets)) {
+                return -1;
+            }
+            fillBuffer();
+            if (!buffer.hasRemaining()) {
+                // An empty batch produced no bytes; report end-of-stream
+                // instead of underflowing the buffer.
+                return -1;
+            }
+        }
+        // Mask to honor the InputStream contract: read() must return 0-255,
+        // but ByteBuffer.get() yields a signed byte.
+        return buffer.get() & 0xFF;
+    }
+}
+
+/**
+ * Polls the Twitter search API at a fixed interval and converts each result
+ * into a '|'-delimited tuple of the form:
+ * id_prefix:id | fromUserId | location | text | createdAt
+ * Honors stop/alter requests staged on the owning adapter between polls.
+ */
+class TweetClient implements IFeedClient {
+
+    private String query;
+    // Polling interval in seconds; overridable via the "interval" parameter.
+    private int timeInterval = 5;
+    private Character delimiter = '|';
+    // Monotonic per-client tuple counter, combined with id_prefix (node id)
+    // to form a globally unique tuple id.
+    private long id = 0;
+    private String id_prefix;
+
+    private final PullBasedTwitterAdapter adapter;
+
+    public TweetClient(String id_prefix, PullBasedTwitterAdapter adapter) {
+        this.id_prefix = id_prefix;
+        this.adapter = adapter;
+        initialize(adapter.getConfiguration());
+    }
+
+    private void initialize(Map<String, String> params) {
+        this.query = params.get(PullBasedTwitterAdapter.QUERY);
+        if (params.get(PullBasedTwitterAdapter.INTERVAL) != null) {
+            this.timeInterval = Integer.parseInt(params.get(PullBasedTwitterAdapter.INTERVAL));
+        }
+    }
+
+    /**
+     * Fetches the next batch of tweets into {@code tweets}.
+     *
+     * @return false when the adapter requested a stop or the fetch failed;
+     *         true otherwise
+     */
+    @Override
+    public boolean next(List<String> tweets) {
+        try {
+            if (adapter.isStopRequested()) {
+                return false;
+            }
+            if (adapter.isAlterRequested()) {
+                // Apply staged parameter changes, then acknowledge them.
+                initialize(adapter.getAlteredParams());
+                adapter.postAlteration();
+            }
+            // Throttle between polls. (Fixed: Thread.sleep is static and was
+            // previously called through an instance reference.)
+            Thread.sleep(1000 * timeInterval);
+            tweets.clear();
+            Twitter twitter = new TwitterFactory().getInstance();
+            QueryResult result = twitter.search(new Query(query));
+            List<Tweet> sourceTweets = result.getTweets();
+            for (Tweet tweet : sourceTweets) {
+                String tweetContent = formFeedTuple(tweet);
+                tweets.add(tweetContent);
+                // Debug trace of each emitted tuple; consider a logger.
+                System.out.println(tweetContent);
+            }
+
+        } catch (Exception e) {
+            e.printStackTrace();
+            return false;
+        }
+        return true;
+    }
+
+    /**
+     * Serializes a tweet into the delimited tuple format described on the
+     * class. The location field is currently hard-coded to "Orange County".
+     */
+    public String formFeedTuple(Object tweetObject) {
+        Tweet tweet = (Tweet) tweetObject;
+        StringBuilder builder = new StringBuilder();
+        builder.append(id_prefix + ":" + id);
+        builder.append(delimiter);
+        builder.append(tweet.getFromUserId());
+        builder.append(delimiter);
+        builder.append("Orange County");
+        builder.append(delimiter);
+        builder.append(escapeChars(tweet));
+        builder.append(delimiter);
+        // NOTE(review): assumes getCreatedAt() is non-null — confirm against
+        // the twitter4j API for partial results.
+        builder.append(tweet.getCreatedAt().toString());
+        id++;
+        return builder.toString();
+    }
+
+    // Flattens embedded newlines so each tuple occupies exactly one record
+    // line in the stream. String.replace is a no-op when no '\n' is present.
+    private String escapeChars(Tweet tweet) {
+        return tweet.getText().replace("\n", " ");
+    }
+
+}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/RSSFeedAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/RSSFeedAdapter.java
new file mode 100644
index 0000000..0cf8e95
--- /dev/null
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/RSSFeedAdapter.java
@@ -0,0 +1,161 @@
+/*
+ * Copyright 2009-2011 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.external.dataset.adapter;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import edu.uci.ics.asterix.external.data.adapter.api.IDatasourceAdapter;
+import edu.uci.ics.asterix.external.data.parser.IDataParser;
+import edu.uci.ics.asterix.external.data.parser.IDataStreamParser;
+import edu.uci.ics.asterix.external.data.parser.IManagedDataParser;
+import edu.uci.ics.asterix.external.data.parser.ManagedDelimitedDataStreamParser;
+import edu.uci.ics.asterix.feed.intake.FeedStream;
+import edu.uci.ics.asterix.feed.intake.IFeedClient;
+import edu.uci.ics.asterix.feed.intake.RSSFeedClient;
+import edu.uci.ics.asterix.feed.managed.adapter.IManagedFeedAdapter;
+import edu.uci.ics.asterix.feed.managed.adapter.IMutableFeedAdapter;
+import edu.uci.ics.asterix.om.types.ARecordType;
+import edu.uci.ics.asterix.om.types.IAType;
+import edu.uci.ics.hyracks.algebricks.core.api.constraints.AlgebricksCountPartitionConstraint;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+
+/**
+ * A pull-based adapter that ingests one or more RSS feeds (one partition per
+ * feed URL) and exposes each entry as a '|'-delimited record parsed by a
+ * {@link ManagedDelimitedDataStreamParser}. Supports runtime stop and
+ * parameter alteration.
+ */
+public class RSSFeedAdapter extends AbstractDatasourceAdapter implements IDatasourceAdapter, IManagedFeedAdapter,
+        IMutableFeedAdapter {
+
+    private List<String> feedURLs = new ArrayList<String>();
+    private boolean isStopRequested = false;
+    private boolean isAlterRequested = false;
+    private Map<String, String> alteredParams = new HashMap<String, String>();
+    // Node id, used by RSSFeedClient to build unique tuple ids.
+    private String id_prefix = "";
+
+    // Configuration keys understood by this adapter.
+    public static final String KEY_RSS_URL = "url";
+    public static final String KEY_INTERVAL = "interval";
+
+    public boolean isStopRequested() {
+        return isStopRequested;
+    }
+
+    public void setStopRequested(boolean isStopRequested) {
+        this.isStopRequested = isStopRequested;
+    }
+
+    /**
+     * Lazily builds the parser pipeline for the given partition: a delimited
+     * parser reading a FeedStream fed by an RSSFeedClient bound to the
+     * partition's URL.
+     */
+    @Override
+    public IDataParser getDataParser(int partition) throws Exception {
+        if (dataParser == null) {
+            dataParser = new ManagedDelimitedDataStreamParser();
+            ((IManagedDataParser) dataParser).setAdapter(this);
+            // NOTE(review): unlike PullBasedTwitterAdapter, the parser is
+            // never given the configuration (no dataParser.configure(...)
+            // call) — confirm the defaults match the '|' delimiter used by
+            // RSSFeedClient.
+            dataParser.initialize((ARecordType) atype, ctx);
+            IFeedClient feedClient = new RSSFeedClient(this, feedURLs.get(partition), id_prefix);
+            FeedStream feedStream = new FeedStream(feedClient, ctx);
+            ((IDataStreamParser) dataParser).setInputStream(feedStream);
+        }
+        return dataParser;
+    }
+
+    // Stages new parameters and applies the URL change immediately; the
+    // interval change is picked up by RSSFeedClient on its next iteration.
+    @Override
+    public void alter(Map<String, String> properties) throws Exception {
+        isAlterRequested = true;
+        this.alteredParams = properties;
+        reconfigure(properties);
+    }
+
+    // Clears the staged parameters once the client has consumed them.
+    public void postAlteration() {
+        alteredParams = null;
+        isAlterRequested = false;
+    }
+
+    @Override
+    public void beforeSuspend() throws Exception {
+        // TODO Auto-generated method stub
+
+    }
+
+    @Override
+    public void beforeResume() throws Exception {
+        // TODO Auto-generated method stub
+
+    }
+
+    @Override
+    public void beforeStop() throws Exception {
+        // TODO Auto-generated method stub
+
+    }
+
+    // Cooperative stop: RSSFeedClient polls isStopRequested() between fetches.
+    @Override
+    public void stop() throws Exception {
+        isStopRequested = true;
+    }
+
+    @Override
+    public AdapterDataFlowType getAdapterDataFlowType() {
+        return AdapterDataFlowType.PULL;
+    }
+
+    @Override
+    public AdapterType getAdapterType() {
+        return AdapterType.READ;
+    }
+
+    @Override
+    public void configure(Map<String, String> arguments, IAType atype) throws Exception {
+        configuration = arguments;
+        this.atype = atype;
+        String rssURLProperty = configuration.get(KEY_RSS_URL);
+        if (rssURLProperty == null) {
+            throw new IllegalArgumentException("no rss url provided");
+        }
+        initializeFeedURLs(rssURLProperty);
+        configurePartitionConstraints();
+
+    }
+
+    // Splits the comma-separated URL property into the feed URL list.
+    private void initializeFeedURLs(String rssURLProperty) {
+        feedURLs.clear();
+        String[] feedURLProperty = rssURLProperty.split(",");
+        for (String feedURL : feedURLProperty) {
+            feedURLs.add(feedURL);
+        }
+    }
+
+    /**
+     * Re-reads the feed URL list from the supplied arguments.
+     * Fixed: the previous implementation consulted the stale
+     * {@code configuration} map instead of {@code arguments}, so alter()
+     * could never actually change the feed URLs.
+     */
+    protected void reconfigure(Map<String, String> arguments) {
+        String rssURLProperty = arguments.get(KEY_RSS_URL);
+        if (rssURLProperty != null) {
+            initializeFeedURLs(rssURLProperty);
+        }
+    }
+
+    // One partition per configured feed URL.
+    protected void configurePartitionConstraints() {
+        partitionConstraint = new AlgebricksCountPartitionConstraint(feedURLs.size());
+    }
+
+    @Override
+    public void initialize(IHyracksTaskContext ctx) throws Exception {
+        this.ctx = ctx;
+        id_prefix = ctx.getJobletContext().getApplicationContext().getNodeId();
+    }
+
+    public boolean isAlterRequested() {
+        return isAlterRequested;
+    }
+
+    // NOTE(review): returns null after postAlteration() until the next
+    // alter(); callers must check isAlterRequested() first.
+    public Map<String, String> getAlteredParams() {
+        return alteredParams;
+    }
+
+}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/feed/comm/AlterFeedMessage.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/feed/comm/AlterFeedMessage.java
new file mode 100644
index 0000000..d34af73
--- /dev/null
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/feed/comm/AlterFeedMessage.java
@@ -0,0 +1,48 @@
+/*
+ * Copyright 2009-2011 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.feed.comm;
+
+import java.util.Map;
+
+/**
+ * A feed control message carrying altered configuration parameters to be
+ * applied to a running feed adapter. Message type is always ALTER.
+ */
+public class AlterFeedMessage extends FeedMessage {
+
+    // Explicit version id: this message crosses node boundaries via Java
+    // serialization, so the id must not drift with recompilation.
+    private static final long serialVersionUID = 1L;
+
+    private final Map<String, String> alteredConfParams;
+
+    /** Creates a synchronous ALTER message. */
+    public AlterFeedMessage(Map<String, String> alteredConfParams) {
+        // Chain to the general constructor instead of duplicating its body.
+        this(MessageResponseMode.SYNCHRONOUS, alteredConfParams);
+    }
+
+    /**
+     * Creates an ALTER message with the given response mode.
+     *
+     * @param mode              synchronous or asynchronous delivery
+     * @param alteredConfParams the new configuration key/value pairs
+     */
+    public AlterFeedMessage(MessageResponseMode mode, Map<String, String> alteredConfParams) {
+        super(MessageType.ALTER);
+        messageResponseMode = mode;
+        this.alteredConfParams = alteredConfParams;
+    }
+
+    @Override
+    public MessageResponseMode getMessageResponseMode() {
+        return messageResponseMode;
+    }
+
+    @Override
+    public MessageType getMessageType() {
+        return MessageType.ALTER;
+    }
+
+    public Map<String, String> getAlteredConfParams() {
+        return alteredConfParams;
+    }
+}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/feed/comm/FeedMessage.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/feed/comm/FeedMessage.java
new file mode 100644
index 0000000..1f1a020
--- /dev/null
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/feed/comm/FeedMessage.java
@@ -0,0 +1,48 @@
+/*
+ * Copyright 2009-2011 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.feed.comm;
+
+/**
+ * Base implementation of a feed control message: a message type (STOP,
+ * SUSPEND, RESUME, ALTER) plus a response mode, defaulting to SYNCHRONOUS.
+ */
+public class FeedMessage implements IFeedMessage {
+
+    // Explicit version id: IFeedMessage extends Serializable and these
+    // messages are shipped between nodes, so the id must be stable.
+    private static final long serialVersionUID = 1L;
+
+    protected MessageResponseMode messageResponseMode = MessageResponseMode.SYNCHRONOUS;
+    protected MessageType messageType;
+
+    public FeedMessage(MessageType messageType) {
+        this.messageType = messageType;
+    }
+
+    @Override
+    public MessageResponseMode getMessageResponseMode() {
+        return messageResponseMode;
+    }
+
+    public void setMessageResponseMode(MessageResponseMode messageResponseMode) {
+        this.messageResponseMode = messageResponseMode;
+    }
+
+    @Override
+    public MessageType getMessageType() {
+        return messageType;
+    }
+
+    public void setMessageType(MessageType messageType) {
+        this.messageType = messageType;
+    }
+
+}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/feed/comm/IFeedMessage.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/feed/comm/IFeedMessage.java
new file mode 100644
index 0000000..dfb4f91
--- /dev/null
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/feed/comm/IFeedMessage.java
@@ -0,0 +1,37 @@
+/*
+ * Copyright 2009-2011 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.feed.comm;
+
+import java.io.Serializable;
+
+/**
+ * A control message exchanged with a running feed. Messages are Serializable
+ * so they can be shipped between nodes.
+ */
+public interface IFeedMessage extends Serializable {
+
+ // Whether the sender blocks for an acknowledgement.
+ public enum MessageResponseMode {
+ SYNCHRONOUS,
+ ASYNCHRONOUS,
+ }
+
+ // The control action requested of the feed.
+ public enum MessageType {
+ STOP,
+ SUSPEND,
+ RESUME,
+ ALTER,
+ }
+
+ public MessageResponseMode getMessageResponseMode();
+
+ public MessageType getMessageType();
+
+}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/feed/intake/AbstractFeedStream.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/feed/intake/AbstractFeedStream.java
new file mode 100644
index 0000000..3d3856d
--- /dev/null
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/feed/intake/AbstractFeedStream.java
@@ -0,0 +1,62 @@
+package edu.uci.ics.asterix.feed.intake;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+
+/**
+ * Adapts batches of feed records produced by an {@link IFeedClient} into a
+ * byte stream: each record becomes one newline-terminated line, buffered in a
+ * frame-sized ByteBuffer that is refilled on demand.
+ */
+public abstract class AbstractFeedStream extends InputStream {
+
+	private ByteBuffer buffer;
+	private int capacity;
+	private IFeedClient feedClient;
+	// Reusable batch container handed to the feed client on every fetch.
+	// Fixed: this was previously declared but never initialized, so the very
+	// first feedClient.next(feedObjects) call dereferenced null.
+	private List<String> feedObjects = new ArrayList<String>();
+
+	public AbstractFeedStream(IFeedClient feedClient, IHyracksTaskContext ctx)
+			throws Exception {
+		capacity = ctx.getFrameSize();
+		buffer = ByteBuffer.allocate(capacity);
+		this.feedClient = feedClient;
+		initialize();
+	}
+
+	/**
+	 * Returns the next byte (0-255) or -1 at end of stream, refilling the
+	 * buffer from the feed client when it runs dry.
+	 */
+	@Override
+	public int read() throws IOException {
+		if (!buffer.hasRemaining()) {
+			if (!feedClient.next(feedObjects)) {
+				return -1;
+			}
+			fillBuffer();
+			if (!buffer.hasRemaining()) {
+				// An empty batch produced no bytes; report end-of-stream
+				// instead of underflowing the buffer.
+				return -1;
+			}
+		}
+		// Mask to honor the InputStream contract: read() must return 0-255,
+		// but ByteBuffer.get() yields a signed byte.
+		return buffer.get() & 0xFF;
+	}
+
+	// Fetches the first batch; an empty buffer (limit 0) signals immediate EOF.
+	private void initialize() throws Exception {
+		if (!feedClient.next(feedObjects)) {
+			buffer.limit(0);
+		} else {
+			fillBuffer();
+		}
+	}
+
+	// Copies the current batch into the buffer, one newline-terminated record
+	// per entry, and flips it for reading.
+	// NOTE(review): assumes a batch always fits within one frame; a larger
+	// batch would throw BufferOverflowException — confirm.
+	private void fillBuffer() {
+		buffer.clear();
+		for (String feed : feedObjects) {
+			buffer.put(feed.getBytes());
+			buffer.put("\n".getBytes());
+		}
+		buffer.flip();
+	}
+
+}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/feed/intake/FeedStream.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/feed/intake/FeedStream.java
new file mode 100644
index 0000000..9d75182
--- /dev/null
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/feed/intake/FeedStream.java
@@ -0,0 +1,13 @@
+package edu.uci.ics.asterix.feed.intake;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+
+
+/**
+ * Concrete feed stream; all buffering behavior is inherited from
+ * {@link AbstractFeedStream}. This subclass exists only to give adapters a
+ * non-abstract type to instantiate.
+ */
+public class FeedStream extends AbstractFeedStream {
+
+ public FeedStream(IFeedClient feedClient, IHyracksTaskContext ctx)
+ throws Exception {
+ super(feedClient, ctx);
+ }
+
+}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/feed/intake/IFeedClient.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/feed/intake/IFeedClient.java
new file mode 100644
index 0000000..b67561d
--- /dev/null
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/feed/intake/IFeedClient.java
@@ -0,0 +1,8 @@
+package edu.uci.ics.asterix.feed.intake;
+
+import java.util.List;
+
+/**
+ * A source of feed records. Implementations fill the supplied list with the
+ * next batch of serialized records, returning false when the feed is
+ * exhausted or has been stopped.
+ */
+public interface IFeedClient {
+
+ // Replaces the contents of 'list' with the next batch; false means no
+ // further batches will be produced.
+ public boolean next(List<String> list);
+}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/feed/intake/RSSFeedClient.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/feed/intake/RSSFeedClient.java
new file mode 100644
index 0000000..2e77499
--- /dev/null
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/feed/intake/RSSFeedClient.java
@@ -0,0 +1,158 @@
+package edu.uci.ics.asterix.feed.intake;
+
+import java.net.URL;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import com.sun.syndication.feed.synd.SyndEntryImpl;
+import com.sun.syndication.feed.synd.SyndFeed;
+import com.sun.syndication.fetcher.FeedFetcher;
+import com.sun.syndication.fetcher.FetcherEvent;
+import com.sun.syndication.fetcher.FetcherListener;
+import com.sun.syndication.fetcher.impl.FeedFetcherCache;
+import com.sun.syndication.fetcher.impl.HashMapFeedInfoCache;
+import com.sun.syndication.fetcher.impl.HttpURLFeedFetcher;
+
+import edu.uci.ics.asterix.external.dataset.adapter.RSSFeedAdapter;
+
+@SuppressWarnings("rawtypes")
+public class RSSFeedClient implements IFeedClient {
+
+ private final String feedURL;
+ private int timeInterval = 1;
+ private Character delimiter = '|';
+ private long id = 0;
+ private String id_prefix;
+ private boolean feedModified = false;
+ private RSSFeedAdapter adapter;
+
+ public boolean isFeedModified() {
+ return feedModified;
+ }
+
+ public void setFeedModified(boolean feedModified) {
+ this.feedModified = feedModified;
+ }
+
+ public RSSFeedClient(RSSFeedAdapter adapter, String feedURL,
+ String id_prefix) {
+ this.adapter = adapter;
+ this.feedURL = feedURL;
+ this.id_prefix = id_prefix;
+ }
+
+ private void initialize(Map<String, String> params) {
+ if (params.get(adapter.KEY_INTERVAL) != null) {
+ this.timeInterval = Integer.parseInt(params
+ .get(adapter.KEY_INTERVAL));
+ }
+ }
+
+ @Override
+ public boolean next(List<String> feeds) {
+ try {
+ if (adapter.isStopRequested()) {
+ return false;
+ }
+ if (adapter.isAlterRequested()) {
+ initialize(adapter.getAlteredParams());
+ adapter.postAlteration();
+ }
+ Thread.sleep(timeInterval * 1000);
+ feeds.clear();
+ try {
+ getFeed(feeds);
+ } catch (Exception te) {
+ te.printStackTrace();
+ System.out.println("Failed to get feed: " + te.getMessage());
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ return true;
+ }
+
+ public String formFeedTuple(Object entry) {
+ StringBuilder builder = new StringBuilder();
+ builder.append(id_prefix + ":" + id);
+ builder.append(delimiter);
+ builder.append(((SyndEntryImpl) entry).getTitle());
+ builder.append(delimiter);
+ builder.append(((SyndEntryImpl) entry).getLink());
+ id++;
+ return new String(builder);
+ }
+
+ private void getFeed(List<String> feeds) {
+ try {
+ URL feedUrl = new URL(feedURL);
+ FeedFetcherCache feedInfoCache = HashMapFeedInfoCache.getInstance();
+ FeedFetcher fetcher = new HttpURLFeedFetcher(feedInfoCache);
+
+ FetcherEventListenerImpl listener = new FetcherEventListenerImpl(
+ this);
+ fetcher.addFetcherEventListener(listener);
+ System.err.println("Retrieving feed " + feedUrl);
+ // Retrieve the feed.
+ // We will get a Feed Polled Event and then a
+ // Feed Retrieved event (assuming the feed is valid)
+ SyndFeed feed = fetcher.retrieveFeed(feedUrl);
+ if (feedModified) {
+ System.err.println(feedUrl + " retrieved");
+ System.err.println(feedUrl + " has a title: " + feed.getTitle()
+ + " and contains " + feed.getEntries().size()
+ + " entries.");
+
+ List fetchedFeeds = feed.getEntries();
+ Iterator feedIterator = fetchedFeeds.iterator();
+ while (feedIterator.hasNext()) {
+ SyndEntryImpl feedEntry = (SyndEntryImpl) feedIterator
+ .next();
+ String feedContent = formFeedTuple(feedEntry);
+ feeds.add(escapeChars(feedContent));
+ System.out.println(feedContent);
+ }
+ }
+ } catch (Exception ex) {
+ System.out.println("ERROR: " + ex.getMessage());
+ ex.printStackTrace();
+ }
+ }
+
+ private String escapeChars(String content) {
+ if (content.contains("\n")) {
+ return content.replace("\n", " ");
+ }
+ return content;
+ }
+
+}
+
+/**
+ * Bridges ROME fetcher events back to the owning {@link RSSFeedClient},
+ * marking the client's feed-modified flag based on fetch outcomes.
+ */
+class FetcherEventListenerImpl implements FetcherListener {
+
+ private final IFeedClient feedClient;
+
+ public FetcherEventListenerImpl(IFeedClient feedClient) {
+ this.feedClient = feedClient;
+ }
+
+ /**
+ * @see com.sun.syndication.fetcher.FetcherListener#fetcherEvent(com.sun.syndication.fetcher.FetcherEvent)
+ */
+ public void fetcherEvent(FetcherEvent event) {
+ String eventType = event.getEventType();
+ if (FetcherEvent.EVENT_TYPE_FEED_POLLED.equals(eventType)) {
+ System.err.println("\tEVENT: Feed Polled. URL = "
+ + event.getUrlString());
+ } else if (FetcherEvent.EVENT_TYPE_FEED_RETRIEVED.equals(eventType)) {
+ System.err.println("\tEVENT: Feed Retrieved. URL = "
+ + event.getUrlString());
+ ((RSSFeedClient) feedClient).setFeedModified(true);
+ } else if (FetcherEvent.EVENT_TYPE_FEED_UNCHANGED.equals(eventType)) {
+ System.err.println("\tEVENT: Feed Unchanged. URL = "
+ + event.getUrlString());
+ // NOTE(review): an UNCHANGED feed also sets feedModified to true,
+ // which makes RSSFeedClient.getFeed() reprocess identical entries
+ // every poll. This looks like a copy-paste of the RETRIEVED branch
+ // and should probably be setFeedModified(false) — confirm intent
+ // before changing, as downstream deduplication may rely on it.
+ ((RSSFeedClient) feedClient).setFeedModified(true);
+ }
+ }
+}
\ No newline at end of file
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/feed/managed/adapter/IManagedFeedAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/feed/managed/adapter/IManagedFeedAdapter.java
new file mode 100644
index 0000000..1eb1079
--- /dev/null
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/feed/managed/adapter/IManagedFeedAdapter.java
@@ -0,0 +1,40 @@
+/*
+ * Copyright 2009-2011 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.feed.managed.adapter;
+
+import edu.uci.ics.asterix.external.data.adapter.api.IDatasourceReadAdapter;
+
+/**
+ * A feed adapter whose lifecycle (suspend/resume/stop) can be managed at
+ * runtime. The before* hooks run ahead of the corresponding state change.
+ */
+public interface IManagedFeedAdapter extends IDatasourceReadAdapter {
+
+ public enum OperationState {
+ SUSPENDED,
+ // ACTIVE state signifies that the feed dataset is connected to the
+ // external world using an adapter that may put data into the dataset.
+ ACTIVE,
+ // INACTIVE state signifies that the feed dataset is not
+ // connected with the external world through the feed
+ // adapter.
+ STOPPED, INACTIVE
+ }
+
+ // Hook invoked before the feed is suspended.
+ public void beforeSuspend() throws Exception;
+
+ // Hook invoked before a suspended feed is resumed.
+ public void beforeResume() throws Exception;
+
+ // Hook invoked before the feed is stopped.
+ public void beforeStop() throws Exception;
+
+ // Requests a (cooperative) stop of the feed.
+ public void stop() throws Exception;
+
+}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/feed/managed/adapter/IMutableFeedAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/feed/managed/adapter/IMutableFeedAdapter.java
new file mode 100644
index 0000000..290c7d6
--- /dev/null
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/feed/managed/adapter/IMutableFeedAdapter.java
@@ -0,0 +1,23 @@
+/*
+ * Copyright 2009-2011 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.feed.managed.adapter;
+
+import java.util.Map;
+
+/**
+ * A managed feed adapter whose configuration parameters can additionally be
+ * altered while the feed is running.
+ */
+public interface IMutableFeedAdapter extends IManagedFeedAdapter {
+
+ // Applies the given configuration changes to the running feed.
+ public void alter(Map<String,String> properties) throws Exception;
+
+}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/feed/mgmt/FeedId.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/feed/mgmt/FeedId.java
new file mode 100644
index 0000000..19076c4
--- /dev/null
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/feed/mgmt/FeedId.java
@@ -0,0 +1,53 @@
+/*
+ * Copyright 2009-2011 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package edu.uci.ics.asterix.feed.mgmt;
+
+import java.io.Serializable;
+
+public class FeedId implements Serializable {
+
+ private final String dataverse;
+ private final String dataset;
+
+ public FeedId(String dataverse, String dataset) {
+ this.dataset = dataset;
+ this.dataverse = dataverse;
+ }
+
+ public String getDataverse() {
+ return dataverse;
+ }
+
+ public String getDataset() {
+ return dataset;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (o == null || !(o instanceof FeedId)) {
+ return false;
+ }
+ if (((FeedId) o).getDataset().equals(dataset) && ((FeedId) o).getDataverse().equals(dataverse)) {
+ return true;
+ }
+ return false;
+ }
+
+ @Override
+ public int hashCode() {
+ return dataverse.hashCode() + dataset.hashCode();
+ }
+
+}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/feed/mgmt/FeedManager.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/feed/mgmt/FeedManager.java
new file mode 100644
index 0000000..39c0442
--- /dev/null
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/feed/mgmt/FeedManager.java
@@ -0,0 +1,72 @@
+/*
+ * Copyright 2009-2011 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.feed.mgmt;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import edu.uci.ics.asterix.feed.comm.IFeedMessage;
+
+/**
+ * Node-local registry that routes feed control messages to the message
+ * queues of the feed operators running on this node. Methods are
+ * synchronized because registration, de-registration and delivery happen
+ * on different operator threads.
+ */
+public class FeedManager implements IFeedManager {
+
+    // feed -> queues of all operator instances of that feed on this node
+    private final Map<FeedId, Set<LinkedBlockingQueue<IFeedMessage>>> outGoingMsgQueueMap = new HashMap<FeedId, Set<LinkedBlockingQueue<IFeedMessage>>>();
+    // NOTE(review): currently unused -- presumably reserved for inbound traffic
+    private final LinkedBlockingQueue<IFeedMessage> incomingMsgQueue = new LinkedBlockingQueue<IFeedMessage>();
+
+    @Override
+    public synchronized boolean deliverMessage(FeedId feedId, IFeedMessage feedMessage) throws Exception {
+        Set<LinkedBlockingQueue<IFeedMessage>> operatorQueues = outGoingMsgQueueMap.get(feedId);
+        if (operatorQueues == null) {
+            throw new IllegalArgumentException(" unknown feed id " + feedId.getDataverse() + ":" + feedId.getDataset());
+        }
+        for (LinkedBlockingQueue<IFeedMessage> queue : operatorQueues) {
+            queue.put(feedMessage);
+        }
+        return true;
+    }
+
+    @Override
+    public synchronized void registerFeedOperatorMsgQueue(FeedId feedId, LinkedBlockingQueue<IFeedMessage> queue) {
+        Set<LinkedBlockingQueue<IFeedMessage>> feedQueues = outGoingMsgQueueMap.get(feedId);
+        if (feedQueues == null) {
+            feedQueues = new HashSet<LinkedBlockingQueue<IFeedMessage>>();
+            outGoingMsgQueueMap.put(feedId, feedQueues);
+        }
+        feedQueues.add(queue);
+    }
+
+    @Override
+    public synchronized void unregisterFeedOperatorMsgQueue(FeedId feedId, LinkedBlockingQueue<IFeedMessage> queue) {
+        Set<LinkedBlockingQueue<IFeedMessage>> feedQueues = outGoingMsgQueueMap.get(feedId);
+        if (feedQueues == null || !feedQueues.remove(queue)) {
+            throw new IllegalArgumentException(" unable to de-register feed message queue");
+        }
+        if (feedQueues.isEmpty()) {
+            // drop the empty set so the map does not grow without bound
+            outGoingMsgQueueMap.remove(feedId);
+        }
+    }
+
+}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/feed/mgmt/FeedSystemProvider.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/feed/mgmt/FeedSystemProvider.java
new file mode 100644
index 0000000..82fb67d
--- /dev/null
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/feed/mgmt/FeedSystemProvider.java
@@ -0,0 +1,32 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.feed.mgmt;
+
+/**
+ * Provides the node-wide singleton {@link IFeedManager} instance through
+ * which feed operators on this node exchange control messages.
+ */
+public class FeedSystemProvider {
+
+    private static final IFeedManager feedManager = new FeedManager();
+
+    private FeedSystemProvider() {
+        // utility class; not meant to be instantiated
+    }
+
+    public static IFeedManager getFeedManager() {
+        return feedManager;
+    }
+}
\ No newline at end of file
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/feed/mgmt/IFeedManager.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/feed/mgmt/IFeedManager.java
new file mode 100644
index 0000000..a8c1303
--- /dev/null
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/feed/mgmt/IFeedManager.java
@@ -0,0 +1,28 @@
+/*
+ * Copyright 2009-2011 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.feed.mgmt;
+
+import java.util.concurrent.LinkedBlockingQueue;
+
+import edu.uci.ics.asterix.feed.comm.IFeedMessage;
+
+public interface IFeedManager {
+
+ public void registerFeedOperatorMsgQueue(FeedId feedId, LinkedBlockingQueue<IFeedMessage> queue);
+
+ public void unregisterFeedOperatorMsgQueue(FeedId feedId, LinkedBlockingQueue<IFeedMessage> queue);
+
+ public boolean deliverMessage(FeedId feedId, IFeedMessage feedMessage) throws Exception;
+}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/feed/operator/FeedIntakeOperatorDescriptor.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/feed/operator/FeedIntakeOperatorDescriptor.java
new file mode 100644
index 0000000..93b8fc5
--- /dev/null
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/feed/operator/FeedIntakeOperatorDescriptor.java
@@ -0,0 +1,70 @@
+/*
+ * Copyright 2009-2011 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.feed.operator;
+
+import java.util.Map;
+
+import edu.uci.ics.asterix.external.data.adapter.api.IDatasourceReadAdapter;
+import edu.uci.ics.asterix.feed.mgmt.FeedId;
+import edu.uci.ics.asterix.om.types.ARecordType;
+import edu.uci.ics.asterix.om.types.IAType;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+
+/**
+ * Operator descriptor for feed intake. On each partition it instantiates
+ * the configured datasource adapter (by class name) and wraps it in a
+ * {@link FeedIntakeOperatorNodePushable}.
+ */
+public class FeedIntakeOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
+    private static final long serialVersionUID = 1L;
+
+    private final String adapter;
+    private final Map<String, String> adapterConfiguration;
+    private final IAType atype;
+    private final FeedId feedId;
+
+    public FeedIntakeOperatorDescriptor(JobSpecification spec, FeedId feedId, String adapter,
+            Map<String, String> arguments, ARecordType atype, RecordDescriptor rDesc) {
+        super(spec, 1, 1);
+        recordDescriptors[0] = rDesc;
+        this.adapter = adapter;
+        this.adapterConfiguration = arguments;
+        this.atype = atype;
+        this.feedId = feedId;
+    }
+
+    @Override
+    public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
+            IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions)
+            throws HyracksDataException {
+        // keep the adapter in a local: the descriptor is shared across
+        // partitions, so a field would be clobbered by concurrent partitions
+        IDatasourceReadAdapter datasourceReadAdapter;
+        try {
+            datasourceReadAdapter = (IDatasourceReadAdapter) Class.forName(adapter).newInstance();
+            datasourceReadAdapter.configure(adapterConfiguration, atype);
+            datasourceReadAdapter.initialize(ctx);
+        } catch (Exception e) {
+            throw new HyracksDataException("initialization of adapter failed", e);
+        }
+        return new FeedIntakeOperatorNodePushable(feedId, datasourceReadAdapter, partition);
+    }
+}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/feed/operator/FeedIntakeOperatorNodePushable.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/feed/operator/FeedIntakeOperatorNodePushable.java
new file mode 100644
index 0000000..4603208
--- /dev/null
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/feed/operator/FeedIntakeOperatorNodePushable.java
@@ -0,0 +1,117 @@
+package edu.uci.ics.asterix.feed.operator;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import edu.uci.ics.asterix.external.data.adapter.api.IDatasourceReadAdapter;
+import edu.uci.ics.asterix.external.data.parser.IManagedDataParser;
+import edu.uci.ics.asterix.feed.managed.adapter.IManagedFeedAdapter;
+import edu.uci.ics.asterix.feed.managed.adapter.IMutableFeedAdapter;
+import edu.uci.ics.asterix.feed.mgmt.FeedId;
+import edu.uci.ics.asterix.feed.mgmt.FeedSystemProvider;
+import edu.uci.ics.asterix.feed.mgmt.IFeedManager;
+import edu.uci.ics.asterix.feed.comm.AlterFeedMessage;
+import edu.uci.ics.asterix.feed.comm.IFeedMessage;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
+
+/**
+ * Runtime for the feed intake operator. On open() it registers a control
+ * message queue with the local feed manager, starts a monitor thread that
+ * applies control messages (suspend/resume/stop/alter) to the adapter, and
+ * then drives the adapter's parser, which pushes tuples to the writer.
+ */
+public class FeedIntakeOperatorNodePushable extends AbstractUnaryInputUnaryOutputOperatorNodePushable {
+
+    private final IDatasourceReadAdapter adapter;
+    private final int partition;
+    private final IFeedManager feedManager;
+    private final FeedId feedId;
+    private final LinkedBlockingQueue<IFeedMessage> inbox;
+    private FeedInboxMonitor feedInboxMonitor;
+
+    public FeedIntakeOperatorNodePushable(FeedId feedId, IDatasourceReadAdapter adapter, int partition) {
+        this.adapter = adapter;
+        this.partition = partition;
+        this.feedManager = FeedSystemProvider.getFeedManager();
+        this.feedId = feedId;
+        inbox = new LinkedBlockingQueue<IFeedMessage>();
+    }
+
+    @Override
+    public void open() throws HyracksDataException {
+        feedInboxMonitor = new FeedInboxMonitor((IManagedFeedAdapter) adapter, inbox, partition);
+        feedInboxMonitor.start();
+        feedManager.registerFeedOperatorMsgQueue(feedId, inbox);
+        writer.open();
+        try {
+            adapter.getDataParser(partition).parse(writer);
+        } catch (Exception e) {
+            throw new HyracksDataException("exception during reading from external data source", e);
+        } finally {
+            // stop the monitor and drop our queue registration so neither
+            // the thread nor the feed-manager entry leaks
+            feedInboxMonitor.interrupt();
+            feedManager.unregisterFeedOperatorMsgQueue(feedId, inbox);
+            writer.close();
+        }
+    }
+
+    @Override
+    public void fail() throws HyracksDataException {
+        writer.fail();
+    }
+
+    @Override
+    public void close() throws HyracksDataException {
+        writer.close();
+    }
+
+    @Override
+    public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+        // no-op: all data is produced by the adapter in open().
+        // NOTE(review): incoming frames are silently dropped -- confirm intent
+    }
+}
+
+class FeedInboxMonitor extends Thread {
+
+    private final LinkedBlockingQueue<IFeedMessage> inbox;
+    private final IManagedFeedAdapter adapter;
+    private final int partition;
+
+    public FeedInboxMonitor(IManagedFeedAdapter adapter, LinkedBlockingQueue<IFeedMessage> inbox, int partition) {
+        this.inbox = inbox;
+        this.adapter = adapter;
+        this.partition = partition;
+    }
+
+    @Override
+    public void run() {
+        while (true) {
+            try {
+                IFeedMessage feedMessage = inbox.take();
+                switch (feedMessage.getMessageType()) {
+                    case SUSPEND:
+                        ((IManagedDataParser) adapter.getDataParser(partition)).getManagedTupleParser().suspend();
+                        break;
+                    case RESUME:
+                        ((IManagedDataParser) adapter.getDataParser(partition)).getManagedTupleParser().resume();
+                        break;
+                    case STOP:
+                        ((IManagedDataParser) adapter.getDataParser(partition)).getManagedTupleParser().stop();
+                        break;
+                    case ALTER:
+                        ((IMutableFeedAdapter) adapter).alter(((AlterFeedMessage) feedMessage).getAlteredConfParams());
+                        break;
+                }
+            } catch (InterruptedException ie) {
+                // interrupted by the intake operator on shutdown
+                break;
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
+}
\ No newline at end of file
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/feed/operator/FeedMessageOperatorDescriptor.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/feed/operator/FeedMessageOperatorDescriptor.java
new file mode 100644
index 0000000..c66c49e
--- /dev/null
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/feed/operator/FeedMessageOperatorDescriptor.java
@@ -0,0 +1,55 @@
+/*
+ * Copyright 2009-2011 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.feed.operator;
+
+import java.util.List;
+
+import edu.uci.ics.asterix.feed.comm.IFeedMessage;
+import edu.uci.ics.asterix.feed.mgmt.FeedId;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+
+/**
+ * Operator descriptor used to send control messages to a running feed
+ * identified by its (dataverse, dataset) pair.
+ */
+public class FeedMessageOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
+
+    private static final long serialVersionUID = 1L;
+
+    private final FeedId feedId;
+    private final List<IFeedMessage> feedMessages;
+    // every message is currently broadcast to all partitions of the feed
+    private final boolean sendToAll = true;
+
+    public FeedMessageOperatorDescriptor(JobSpecification spec, String dataverse, String dataset,
+            List<IFeedMessage> feedMessages) {
+        super(spec, 0, 1);
+        this.feedId = new FeedId(dataverse, dataset);
+        this.feedMessages = feedMessages;
+    }
+
+    @Override
+    public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
+            IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions)
+            throws HyracksDataException {
+        return new FeedMessageOperatorNodePushable(ctx, feedId, feedMessages, sendToAll, partition, nPartitions);
+    }
+
+}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/feed/operator/FeedMessageOperatorNodePushable.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/feed/operator/FeedMessageOperatorNodePushable.java
new file mode 100644
index 0000000..37e3ba3
--- /dev/null
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/feed/operator/FeedMessageOperatorNodePushable.java
@@ -0,0 +1,66 @@
+/*
+ * Copyright 2009-2011 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.feed.operator;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import edu.uci.ics.asterix.feed.comm.IFeedMessage;
+import edu.uci.ics.asterix.feed.mgmt.FeedId;
+import edu.uci.ics.asterix.feed.mgmt.FeedSystemProvider;
+import edu.uci.ics.asterix.feed.mgmt.IFeedManager;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
+
+/**
+ * Runtime for {@link FeedMessageOperatorDescriptor}: hands the feed's
+ * control messages to the local feed manager for delivery.
+ */
+public class FeedMessageOperatorNodePushable extends AbstractUnaryOutputSourceOperatorNodePushable {
+
+    private final FeedId feedId;
+    private final List<IFeedMessage> feedMessages;
+    private final IFeedManager feedManager;
+
+    public FeedMessageOperatorNodePushable(IHyracksTaskContext ctx, FeedId feedId,
+            List<IFeedMessage> feedMessages, boolean applyToAll, int partition, int nPartitions) {
+        this.feedId = feedId;
+        if (applyToAll) {
+            this.feedMessages = feedMessages;
+        } else {
+            // each partition forwards only the message addressed to it; add to
+            // this.feedMessages, NOT the shadowing parameter (previous bug)
+            this.feedMessages = new ArrayList<IFeedMessage>();
+            this.feedMessages.add(feedMessages.get(partition));
+        }
+        feedManager = FeedSystemProvider.getFeedManager();
+    }
+
+    @Override
+    public void initialize() throws HyracksDataException {
+        try {
+            writer.open();
+            for (IFeedMessage feedMessage : feedMessages) {
+                feedManager.deliverMessage(feedId, feedMessage);
+            }
+        } catch (Exception e) {
+            throw new HyracksDataException(e);
+        } finally {
+            writer.close();
+        }
+    }
+
+}