Introduces Feeds 2.0
commit c3f577861fc705d848c1641605689cadd6973bae
Merge: ebc4cae fc0c2c0
Author: ramangrover29 <ramangrover29@gmail.com>
Date: Fri Jun 26 13:04:05 2015 -0700
Merge branch 'raman/feeds_2_release' of https://code.google.com/p/asterixdb-sandbox into raman/feeds_2_release
Conflicts:
asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/FeedServlet.java
asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/java/JObjectAccessors.java
commit ebc4cae21a7302869f953df1ebda601e798d12d2
Author: ramangrover29 <ramangrover29@gmail.com>
Date: Sat Jun 20 17:14:45 2015 -0700
Introduces Feeds 2.0
Some of the prominent changes introduced are as follows:
a) Support for building a cascade network of feeds (via secondary feeds feature)
b) Feed Management Console for tracking active feeds and associated metrics
c) Support for elastic runtime for data ingestion
d) Improved fault-tolerance with support for logging of failed records
Documentation has been added at asterix-doc/src/site/markdown/feeds/
commit fc0c2c0549a6ee8b202e57607d2e110478cd57bb
Author: ramangrover29 <ramangrover29@gmail.com>
Date: Sat Jun 20 17:14:45 2015 -0700
Introduces Feeds 2.0
Some of the prominent changes introduced are as follows:
a) Support for building a cascade network of feeds (via secondary feeds feature)
b) Feed Management Console for tracking active feeds and associated metrics
c) Support for elastic runtime for data ingestion
d) Improved fault-tolerance with support for logging of failed records
Documentation has been added at asterix-doc/src/site/markdown/feeds/
Change-Id: I498f01c591a229aaf51cec43ab20f3e5c4f072f4
Reviewed-on: https://asterix-gerrit.ics.uci.edu/297
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Steven Jacobs <sjaco002@ucr.edu>
Reviewed-by: abdullah alamoudi <bamousaa@gmail.com>
diff --git a/asterix-external-data/src/test/java/edu/uci/ics/asterix/external/library/AddHashTagsFactory.java b/asterix-external-data/src/test/java/edu/uci/ics/asterix/external/library/AddHashTagsFactory.java
new file mode 100644
index 0000000..90fddcd
--- /dev/null
+++ b/asterix-external-data/src/test/java/edu/uci/ics/asterix/external/library/AddHashTagsFactory.java
@@ -0,0 +1,27 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.external.library;
+
+import edu.uci.ics.asterix.external.library.IExternalScalarFunction;
+import edu.uci.ics.asterix.external.library.IFunctionFactory;
+
+/** Factory for the {@code addHashTags} scalar function declared in library_descriptor.xml. */
+public class AddHashTagsFactory implements IFunctionFactory {
+    /** Returns a new {@link AddHashTagsFunction} instance on each call. */
+    @Override
+    public IExternalScalarFunction getExternalFunction() {
+        return new AddHashTagsFunction();
+    }
+}
diff --git a/asterix-external-data/src/test/java/edu/uci/ics/asterix/external/library/AddHashTagsFunction.java b/asterix-external-data/src/test/java/edu/uci/ics/asterix/external/library/AddHashTagsFunction.java
new file mode 100644
index 0000000..93a87f5
--- /dev/null
+++ b/asterix-external-data/src/test/java/edu/uci/ics/asterix/external/library/AddHashTagsFunction.java
@@ -0,0 +1,100 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.external.library;
+
+import edu.uci.ics.asterix.external.library.java.JObjects.JDouble;
+import edu.uci.ics.asterix.external.library.java.JObjects.JPoint;
+import edu.uci.ics.asterix.external.library.java.JObjects.JRecord;
+import edu.uci.ics.asterix.external.library.java.JObjects.JString;
+import edu.uci.ics.asterix.external.library.java.JObjects.JUnorderedList;
+import edu.uci.ics.asterix.external.library.java.JTypeTag;
+import edu.uci.ics.asterix.external.util.Datatypes;
+
+/**
+ * Test scalar function that projects a Tweet record into a ProcessedTweet record.
+ * <p>
+ * Every whitespace-separated token of the tweet's message that starts with {@code '#'} is
+ * collected into the {@code TOPICS} list, and the tweet's latitude/longitude (when both are
+ * present) become the {@code LOCATION} point. The list and point are allocated once in
+ * {@link #initialize(IFunctionHelper)} and overwritten on every {@link #evaluate} call.
+ */
+public class AddHashTagsFunction implements IExternalScalarFunction {
+
+    /** Regex separating message tokens: any run of spaces, tabs or line breaks. */
+    private static final String TOKEN_SEPARATOR = "\\s+";
+
+    private JUnorderedList list = null;
+    private JPoint location = null;
+
+    /** Allocates the topic list and location point reused by every evaluation. */
+    @Override
+    public void initialize(IFunctionHelper functionHelper) {
+        list = new JUnorderedList(functionHelper.getObject(JTypeTag.STRING));
+        location = new JPoint(0, 0);
+    }
+
+    @Override
+    public void deinitialize() {
+        // nothing to release
+    }
+
+    /**
+     * Builds the ProcessedTweet result for the Tweet record passed as argument 0.
+     *
+     * @param functionHelper supplies the input record and string objects, and receives the result
+     * @throws Exception propagated from the record or helper accessors
+     */
+    @Override
+    public void evaluate(IFunctionHelper functionHelper) throws Exception {
+        list.clear();
+        JRecord inputRecord = (JRecord) functionHelper.getArgument(0);
+        JString text = (JString) inputRecord.getValueByName(Datatypes.Tweet.MESSAGE);
+        JDouble latitude = (JDouble) inputRecord.getValueByName(Datatypes.Tweet.LATITUDE);
+        JDouble longitude = (JDouble) inputRecord.getValueByName(Datatypes.Tweet.LONGITUDE);
+
+        // location is shared across calls: reset it when this tweet has no coordinates so the
+        // previous tweet's position does not leak into this result.
+        if (latitude != null && longitude != null) {
+            location.setValue(latitude.getValue(), longitude.getValue());
+        } else {
+            location.setValue(0, 0);
+        }
+
+        // Splitting on any whitespace also catches hashtags that follow a newline or tab.
+        for (String token : text.getValue().split(TOKEN_SEPARATOR)) {
+            if (token.startsWith("#")) {
+                JString topic = (JString) functionHelper.getObject(JTypeTag.STRING);
+                topic.setValue(token);
+                list.add(topic);
+            }
+        }
+
+        JRecord outputRecord = (JRecord) functionHelper.getResultObject();
+        outputRecord.setField(Datatypes.Tweet.ID, inputRecord.getValueByName(Datatypes.Tweet.ID));
+
+        JRecord userRecord = (JRecord) inputRecord.getValueByName(Datatypes.Tweet.USER);
+        outputRecord.setField(Datatypes.ProcessedTweet.USER_NAME,
+                userRecord.getValueByName(Datatypes.Tweet.SCREEN_NAME));
+
+        outputRecord.setField(Datatypes.ProcessedTweet.LOCATION, location);
+        outputRecord.setField(Datatypes.Tweet.CREATED_AT, inputRecord.getValueByName(Datatypes.Tweet.CREATED_AT));
+        outputRecord.setField(Datatypes.Tweet.MESSAGE, text);
+        outputRecord.setField(Datatypes.ProcessedTweet.TOPICS, list);
+
+        // Only the output record carries TOPICS; the input argument record is left untouched.
+        functionHelper.setResult(outputRecord);
+    }
+
+}
diff --git a/asterix-external-data/src/test/java/edu/uci/ics/asterix/external/library/AddHashTagsInPlaceFactory.java b/asterix-external-data/src/test/java/edu/uci/ics/asterix/external/library/AddHashTagsInPlaceFactory.java
new file mode 100644
index 0000000..a2bec69
--- /dev/null
+++ b/asterix-external-data/src/test/java/edu/uci/ics/asterix/external/library/AddHashTagsInPlaceFactory.java
@@ -0,0 +1,25 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.external.library;
+
+/** Factory for the {@code addHashTagsInPlace} scalar function declared in library_descriptor.xml. */
+public class AddHashTagsInPlaceFactory implements IFunctionFactory {
+
+    /** Returns a new {@link AddHashTagsInPlaceFunction} instance on each call. */
+    @Override
+    public IExternalScalarFunction getExternalFunction() {
+        return new AddHashTagsInPlaceFunction();
+    }
+}
diff --git a/asterix-external-data/src/test/java/edu/uci/ics/asterix/external/library/AddHashTagsInPlaceFunction.java b/asterix-external-data/src/test/java/edu/uci/ics/asterix/external/library/AddHashTagsInPlaceFunction.java
new file mode 100644
index 0000000..a3f6f702
--- /dev/null
+++ b/asterix-external-data/src/test/java/edu/uci/ics/asterix/external/library/AddHashTagsInPlaceFunction.java
@@ -0,0 +1,72 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.external.library;
+
+import edu.uci.ics.asterix.external.library.java.JObjects.JRecord;
+import edu.uci.ics.asterix.external.library.java.JObjects.JString;
+import edu.uci.ics.asterix.external.library.java.JObjects.JUnorderedList;
+import edu.uci.ics.asterix.external.library.java.JTypeTag;
+import edu.uci.ics.asterix.external.util.Datatypes;
+
+/**
+ * Test scalar function that adds the hashtags of a Tweet's message to the Tweet itself.
+ * <p>
+ * Every whitespace-separated token of the message that starts with {@code '#'} is collected
+ * into a list that is added to the input record as the {@code TOPICS} field; the modified
+ * input record is then returned as the result.
+ */
+public class AddHashTagsInPlaceFunction implements IExternalScalarFunction {
+
+    /** Regex separating message tokens: any run of spaces, tabs or line breaks. */
+    private static final String TOKEN_SEPARATOR = "\\s+";
+
+    private JUnorderedList list = null;
+
+    /** Allocates the topic list reused by every evaluation. */
+    @Override
+    public void initialize(IFunctionHelper functionHelper) {
+        list = new JUnorderedList(functionHelper.getObject(JTypeTag.STRING));
+    }
+
+    @Override
+    public void deinitialize() {
+        // nothing to release
+    }
+
+    /**
+     * Adds the TOPICS field to the Tweet record passed as argument 0 and returns that record.
+     *
+     * @param functionHelper supplies the input record and string objects, and receives the result
+     * @throws Exception propagated from the record or helper accessors
+     */
+    @Override
+    public void evaluate(IFunctionHelper functionHelper) throws Exception {
+        list.clear();
+        JRecord inputRecord = (JRecord) functionHelper.getArgument(0);
+        JString text = (JString) inputRecord.getValueByName(Datatypes.Tweet.MESSAGE);
+
+        // Splitting on any whitespace also catches hashtags that follow a newline or tab.
+        for (String token : text.getValue().split(TOKEN_SEPARATOR)) {
+            if (token.startsWith("#")) {
+                JString topic = (JString) functionHelper.getObject(JTypeTag.STRING);
+                topic.setValue(token);
+                list.add(topic);
+            }
+        }
+        inputRecord.addField(Datatypes.ProcessedTweet.TOPICS, list);
+        functionHelper.setResult(inputRecord);
+    }
+
+}
diff --git a/asterix-external-data/src/test/java/edu/uci/ics/asterix/external/library/UpperCaseFunction.java b/asterix-external-data/src/test/java/edu/uci/ics/asterix/external/library/UpperCaseFunction.java
index 58995c2..ec04541 100644
--- a/asterix-external-data/src/test/java/edu/uci/ics/asterix/external/library/UpperCaseFunction.java
+++ b/asterix-external-data/src/test/java/edu/uci/ics/asterix/external/library/UpperCaseFunction.java
@@ -53,9 +53,6 @@
JRecord result = (JRecord) functionHelper.getResultObject();
result.setField("id", id);
result.setField("text", text);
- JString newField = (JString) functionHelper.getObject(JTypeTag.STRING);
- newField.setValue(text.getValue().substring(random.nextInt(text.getValue().length())));
- result.addField("substring", newField);
functionHelper.setResult(result);
}
}
diff --git a/asterix-external-data/src/test/java/edu/uci/ics/asterix/external/library/adapter/TestTypedAdapter.java b/asterix-external-data/src/test/java/edu/uci/ics/asterix/external/library/adapter/TestTypedAdapter.java
index 07f1a40..7a2597e 100644
--- a/asterix-external-data/src/test/java/edu/uci/ics/asterix/external/library/adapter/TestTypedAdapter.java
+++ b/asterix-external-data/src/test/java/edu/uci/ics/asterix/external/library/adapter/TestTypedAdapter.java
@@ -23,8 +23,9 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import edu.uci.ics.asterix.common.feeds.api.IFeedAdapter;
+import edu.uci.ics.asterix.common.feeds.api.IFeedAdapter.DataExchangeMode;
import edu.uci.ics.asterix.external.dataset.adapter.StreamBasedAdapter;
-import edu.uci.ics.asterix.metadata.feeds.IFeedAdapter;
import edu.uci.ics.asterix.om.types.ARecordType;
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
@@ -43,8 +44,8 @@
private DummyGenerator generator;
public TestTypedAdapter(ITupleParserFactory parserFactory, ARecordType sourceDatatype, IHyracksTaskContext ctx,
- Map<String, String> configuration) throws IOException {
- super(parserFactory, sourceDatatype, ctx);
+ Map<String, String> configuration, int partition) throws IOException {
+ super(parserFactory, sourceDatatype, ctx, partition);
pos = new PipedOutputStream();
pis = new PipedInputStream(pos);
this.configuration = configuration;
@@ -131,4 +132,9 @@
generator.stop();
}
+ @Override
+ public boolean handleException(Exception e) {
+ return false;
+ }
+
}
diff --git a/asterix-external-data/src/test/java/edu/uci/ics/asterix/external/library/adapter/TestTypedAdapterFactory.java b/asterix-external-data/src/test/java/edu/uci/ics/asterix/external/library/adapter/TestTypedAdapterFactory.java
index b042e9c..5416ce2 100644
--- a/asterix-external-data/src/test/java/edu/uci/ics/asterix/external/library/adapter/TestTypedAdapterFactory.java
+++ b/asterix-external-data/src/test/java/edu/uci/ics/asterix/external/library/adapter/TestTypedAdapterFactory.java
@@ -17,19 +17,23 @@
import java.util.Map;
import edu.uci.ics.asterix.common.exceptions.AsterixException;
-import edu.uci.ics.asterix.metadata.feeds.IDatasourceAdapter;
-import edu.uci.ics.asterix.metadata.feeds.ITypedAdapterFactory;
+import edu.uci.ics.asterix.common.feeds.api.IDatasourceAdapter;
+import edu.uci.ics.asterix.common.feeds.api.IIntakeProgressTracker;
+import edu.uci.ics.asterix.metadata.entities.DatasourceAdapter.AdapterType;
+import edu.uci.ics.asterix.metadata.external.IAdapterFactory.SupportedOperation;
+import edu.uci.ics.asterix.metadata.feeds.IFeedAdapterFactory;
import edu.uci.ics.asterix.om.types.ARecordType;
import edu.uci.ics.asterix.om.types.BuiltinType;
import edu.uci.ics.asterix.om.types.IAType;
-import edu.uci.ics.asterix.runtime.operators.file.AdmSchemafullRecordParserFactory;
+import edu.uci.ics.asterix.runtime.operators.file.AsterixTupleParserFactory;
+import edu.uci.ics.asterix.runtime.operators.file.AsterixTupleParserFactory.InputDataFormat;
import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksCountPartitionConstraint;
import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.std.file.ITupleParserFactory;
-public class TestTypedAdapterFactory implements ITypedAdapterFactory {
+public class TestTypedAdapterFactory implements IFeedAdapterFactory {
/**
*
@@ -38,7 +42,7 @@
public static final String NAME = "test_typed_adapter";
- private static ARecordType adapterOutputType = initOutputType();
+ private ARecordType outputType;
public static final String KEY_NUM_OUTPUT_RECORDS = "num_output_records";
@@ -50,7 +54,7 @@
}
private static ARecordType initOutputType() {
- String[] fieldNames = new String[] { "tweetid", "message-text" };
+ String[] fieldNames = new String[] { "id", "message-text" };
IAType[] fieldTypes = new IAType[] { BuiltinType.AINT64, BuiltinType.ASTRING };
ARecordType outputType = null;
try {
@@ -67,29 +71,36 @@
}
@Override
- public AdapterType getAdapterType() {
- return AdapterType.TYPED;
- }
-
- @Override
public AlgebricksPartitionConstraint getPartitionConstraint() throws Exception {
return new AlgebricksCountPartitionConstraint(1);
}
@Override
public IDatasourceAdapter createAdapter(IHyracksTaskContext ctx, int partition) throws Exception {
- ITupleParserFactory tupleParserFactory = new AdmSchemafullRecordParserFactory(adapterOutputType);
- return new TestTypedAdapter(tupleParserFactory, adapterOutputType, ctx, configuration);
+ ITupleParserFactory tupleParserFactory = new AsterixTupleParserFactory(configuration, outputType,
+ InputDataFormat.ADM);
+ return new TestTypedAdapter(tupleParserFactory, outputType, ctx, configuration, partition);
}
@Override
public ARecordType getAdapterOutputType() {
- return adapterOutputType;
+ return outputType;
}
@Override
- public void configure(Map<String, String> configuration) throws Exception {
+ public void configure(Map<String, String> configuration, ARecordType outputType) throws Exception {
this.configuration = configuration;
+ this.outputType = outputType;
+ }
+
+ @Override
+ public boolean isRecordTrackingEnabled() {
+ return false;
+ }
+
+ @Override
+ public IIntakeProgressTracker createIntakeProgressTracker() {
+ return null;
}
}
diff --git a/asterix-external-data/src/test/resources/text_functions.xml b/asterix-external-data/src/test/resources/library_descriptor.xml
similarity index 75%
rename from asterix-external-data/src/test/resources/text_functions.xml
rename to asterix-external-data/src/test/resources/library_descriptor.xml
index 8c7a92c..e35288f 100644
--- a/asterix-external-data/src/test/resources/text_functions.xml
+++ b/asterix-external-data/src/test/resources/library_descriptor.xml
@@ -11,6 +11,22 @@
</libraryFunction>
<libraryFunction>
<function_type>SCALAR</function_type>
+ <name>addHashTags</name>
+ <arguments>Tweet</arguments>
+ <return_type>ProcessedTweet</return_type>
+ <definition>edu.uci.ics.asterix.external.library.AddHashTagsFactory
+ </definition>
+ </libraryFunction>
+ <libraryFunction>
+ <function_type>SCALAR</function_type>
+ <name>addHashTagsInPlace</name>
+ <arguments>Tweet</arguments>
+ <return_type>ProcessedTweet</return_type>
+ <definition>edu.uci.ics.asterix.external.library.AddHashTagsInPlaceFactory
+ </definition>
+ </libraryFunction>
+ <libraryFunction>
+ <function_type>SCALAR</function_type>
<name>mysum</name>
<arguments>AINT32,AINT32</arguments>
<return_type>AINT32</return_type>
@@ -53,7 +69,8 @@
<libraryAdapters>
<libraryAdapter>
<name>test_typed_adapter</name>
- <factory_class>edu.uci.ics.asterix.external.library.adapter.TestTypedAdapterFactory</factory_class>
+ <factory_class>edu.uci.ics.asterix.external.library.adapter.TestTypedAdapterFactory
+ </factory_class>
</libraryAdapter>
</libraryAdapters>
</externalLibrary>