minor cosmetic changes
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java b/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
index 5e90603..a18d97c 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
@@ -37,8 +37,8 @@
import edu.uci.ics.asterix.api.common.Job;
import edu.uci.ics.asterix.api.common.SessionConfig;
import edu.uci.ics.asterix.aql.base.Statement;
-import edu.uci.ics.asterix.aql.expression.ConnectFeedStatement;
import edu.uci.ics.asterix.aql.expression.CompactStatement;
+import edu.uci.ics.asterix.aql.expression.ConnectFeedStatement;
import edu.uci.ics.asterix.aql.expression.CreateDataverseStatement;
import edu.uci.ics.asterix.aql.expression.CreateFeedStatement;
import edu.uci.ics.asterix.aql.expression.CreateFunctionStatement;
@@ -113,7 +113,6 @@
import edu.uci.ics.asterix.translator.CompiledStatements.CompiledCreateIndexStatement;
import edu.uci.ics.asterix.translator.CompiledStatements.CompiledDatasetDropStatement;
import edu.uci.ics.asterix.translator.CompiledStatements.CompiledDeleteStatement;
-import edu.uci.ics.asterix.translator.CompiledStatements.CompiledDisconnectFeedStatement;
import edu.uci.ics.asterix.translator.CompiledStatements.CompiledIndexCompactStatement;
import edu.uci.ics.asterix.translator.CompiledStatements.CompiledIndexDropStatement;
import edu.uci.ics.asterix.translator.CompiledStatements.CompiledInsertStatement;
@@ -461,7 +460,7 @@
Identifier ngNameId = ((InternalDetailsDecl) dd.getDatasetDetailsDecl()).getNodegroupName();
String ngName = ngNameId != null ? ngNameId.getValue() : configureNodegroupForDataset(dd,
dataverseName, mdTxnCtx);
-
+
String compactionPolicy = ((InternalDetailsDecl) dd.getDatasetDetailsDecl()).getCompactionPolicy();
Map<String, String> compactionPolicyProperties = ((InternalDetailsDecl) dd.getDatasetDetailsDecl())
.getCompactionPolicyProperties();
@@ -483,7 +482,7 @@
break;
}
- }
+ }
//#. initialize DatasetIdFactory if it is not initialized.
if (!DatasetIdFactory.isInitialized()) {
@@ -844,7 +843,17 @@
for (FeedActivity fa : feedActivities) {
disStmt = new DisconnectFeedStatement(dvId, new Identifier(fa.getFeedName()), new Identifier(
fa.getDatasetName()));
- handleDisconnectFeedStatement(metadataProvider, disStmt, hcc);
+ try {
+ handleDisconnectFeedStatement(metadataProvider, disStmt, hcc);
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Disconnected feed " + fa.getFeedName() + " from dataset " + fa.getDatasetName());
+ }
+ } catch (Exception exception) {
+ if (LOGGER.isLoggable(Level.WARNING)) {
+ LOGGER.warning("Unable to disconnect feed " + fa.getFeedName() + " from dataset "
+ + fa.getDatasetName() + ". Encountered exception " + exception);
+ }
+ }
}
//#. prepare jobs which will drop corresponding datasets with indexes.
@@ -965,23 +974,23 @@
}
}
- List<FeedActivity> feedActivities = MetadataManager.INSTANCE.getActiveFeeds(mdTxnCtx, dataverseName,
- datasetName);
- List<JobSpecification> disconnectFeedJobSpecs = new ArrayList<JobSpecification>();
- if (feedActivities != null && !feedActivities.isEmpty()) {
- for (FeedActivity fa : feedActivities) {
- JobSpecification jobSpec = FeedOperations.buildDisconnectFeedJobSpec(dataverseName,
- fa.getFeedName(), datasetName, metadataProvider, fa);
- disconnectFeedJobSpecs.add(jobSpec);
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Disconnected feed " + fa.getFeedName() + " from dataset " + datasetName
- + " as dataset is being dropped");
+ if (ds.getDatasetType() == DatasetType.INTERNAL) {
+ // prepare job spec(s) that would disconnect any active feeds involving the dataset.
+ List<FeedActivity> feedActivities = MetadataManager.INSTANCE.getActiveFeeds(mdTxnCtx, dataverseName,
+ datasetName);
+ List<JobSpecification> disconnectFeedJobSpecs = new ArrayList<JobSpecification>();
+ if (feedActivities != null && !feedActivities.isEmpty()) {
+ for (FeedActivity fa : feedActivities) {
+ JobSpecification jobSpec = FeedOperations.buildDisconnectFeedJobSpec(dataverseName,
+ fa.getFeedName(), datasetName, metadataProvider, fa);
+ disconnectFeedJobSpecs.add(jobSpec);
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Disconnected feed " + fa.getFeedName() + " from dataset " + datasetName
+ + " as dataset is being dropped");
+ }
}
}
- }
-
- if (ds.getDatasetType() == DatasetType.INTERNAL) {
-
+
//#. prepare jobs to drop the datatset and the indexes in NC
List<Index> indexes = MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseName, datasetName);
for (int j = 0; j < indexes.size(); j++) {
@@ -1538,7 +1547,7 @@
Feed feed = MetadataManager.INSTANCE.getFeed(metadataProvider.getMetadataTxnContext(), dataverseName,
cfs.getFeedName());
if (feed == null) {
- throw new AsterixException("Unknown source feed :" + cfs.getFeedName());
+ throw new AsterixException("Unknown source feed: " + cfs.getFeedName());
}
FeedConnectionId feedConnId = new FeedConnectionId(dataverseName, cfs.getFeedName(), cfs.getDatasetName()
@@ -1550,7 +1559,7 @@
throw new AsterixException("Feed " + cfs.getDatasetName().getValue()
+ " is currently ACTIVE. Operation not supported");
}
-
+
FeedPolicy feedPolicy = MetadataManager.INSTANCE.getFeedPolicy(mdTxnCtx, dataverseName,
cbfs.getPolicyName());
if (feedPolicy == null) {
diff --git a/asterix-app/src/test/resources/runtimets/queries/feeds/feeds_03/feeds_03.1.ddl.aql b/asterix-app/src/test/resources/runtimets/queries/feeds/feeds_03/feeds_03.1.ddl.aql
index 8cdeb9b..17bfcee 100644
--- a/asterix-app/src/test/resources/runtimets/queries/feeds/feeds_03/feeds_03.1.ddl.aql
+++ b/asterix-app/src/test/resources/runtimets/queries/feeds/feeds_03/feeds_03.1.ddl.aql
@@ -26,5 +26,5 @@
create feed TweetFeed
using file_feed
(("output-type-name"="TweetType"),("fs"="localfs"),("path"="nc1://data/twitter/obamatweets.adm"),("format"="adm"),("tuple-interval"="10"))
-apply function feed_processor@1;
+apply function feed_processor;
diff --git a/asterix-aql/src/main/javacc/AQL.jj b/asterix-aql/src/main/javacc/AQL.jj
index 2e1bc6b..93c6365 100644
--- a/asterix-aql/src/main/javacc/AQL.jj
+++ b/asterix-aql/src/main/javacc/AQL.jj
@@ -552,12 +552,14 @@
FunctionSignature ApplyFunction() throws ParseException:
{
+ FunctionName functioName = null;
FunctionSignature funcSig = null;
}
{
- "apply" "function" funcSig = FunctionSignature()
+ "apply" "function" functioName = FunctionName()
{
- return funcSig;
+ String fqFunctionName = functioName.library == null ? functioName.function : functioName.library + "#" + functioName.function;
+ return new FunctionSignature(functioName.dataverse, fqFunctionName, 1);
}
}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/ExternalFunction.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/ExternalFunction.java
index 3fd96ec..c922a5e 100755
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/ExternalFunction.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/ExternalFunction.java
@@ -1,3 +1,17 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package edu.uci.ics.asterix.external.library;
import java.io.IOException;
@@ -9,7 +23,6 @@
import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluator;
import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
@@ -71,5 +84,10 @@
public void deinitialize() {
externalFunction.deinitialize();
}
+
+ @Override
+ public void initialize(IFunctionHelper functionHelper) throws Exception{
+ externalFunction.initialize(functionHelper);
+ }
}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/ExternalFunctionProvider.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/ExternalFunctionProvider.java
index db6c224..d8cd823 100755
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/ExternalFunctionProvider.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/ExternalFunctionProvider.java
@@ -1,3 +1,17 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package edu.uci.ics.asterix.external.library;
import java.util.HashMap;
@@ -57,9 +71,4 @@
((IExternalScalarFunction) externalFunction).evaluate(argumentProvider);
}
- @Override
- public void initialize(IFunctionHelper functionHelper) throws Exception {
- ((IExternalScalarFunction) externalFunction).initialize(functionHelper);
- }
-
}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/IExternalFunction.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/IExternalFunction.java
index e667828..24ba691 100755
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/IExternalFunction.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/IExternalFunction.java
@@ -1,7 +1,23 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package edu.uci.ics.asterix.external.library;
public interface IExternalFunction {
+ public void initialize(IFunctionHelper functionHelper) throws Exception;
+
public void deinitialize();
}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/IExternalScalarFunction.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/IExternalScalarFunction.java
index 09451d8..3348466 100755
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/IExternalScalarFunction.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/IExternalScalarFunction.java
@@ -1,9 +1,21 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package edu.uci.ics.asterix.external.library;
public interface IExternalScalarFunction extends IExternalFunction {
- public void initialize(IFunctionHelper functionHelper) throws Exception;
-
public void evaluate(IFunctionHelper functionHelper) throws Exception;
}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/IFunctionFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/IFunctionFactory.java
index f67957d..f2f2a52 100755
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/IFunctionFactory.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/IFunctionFactory.java
@@ -1,3 +1,17 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package edu.uci.ics.asterix.external.library;
public interface IFunctionFactory {
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/JTypeObjectFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/JTypeObjectFactory.java
index ad56981..a4d0f59 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/JTypeObjectFactory.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/JTypeObjectFactory.java
@@ -1,3 +1,17 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package edu.uci.ics.asterix.external.library;
import java.util.ArrayList;
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/RuntimeExternalFunctionUtil.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/RuntimeExternalFunctionUtil.java
index 0c6713d..a5cac66 100755
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/RuntimeExternalFunctionUtil.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/RuntimeExternalFunctionUtil.java
@@ -1,3 +1,17 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package edu.uci.ics.asterix.external.library;
import java.io.File;
@@ -6,12 +20,10 @@
import java.util.Map;
import edu.uci.ics.asterix.om.base.AMutableInt32;
-import edu.uci.ics.asterix.om.base.AMutableRecord;
import edu.uci.ics.asterix.om.base.AMutableString;
import edu.uci.ics.asterix.om.base.IAObject;
import edu.uci.ics.asterix.om.functions.IExternalFunctionInfo;
import edu.uci.ics.asterix.om.functions.IFunctionDescriptor;
-import edu.uci.ics.asterix.om.types.ARecordType;
import edu.uci.ics.asterix.om.types.IAType;
import edu.uci.ics.asterix.runtime.evaluators.base.AbstractScalarFunctionDynamicDescriptor;
import edu.uci.ics.hyracks.algebricks.common.exceptions.NotImplementedException;
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/java/IJObject.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/java/IJObject.java
index 1561c42..ff8e563 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/java/IJObject.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/java/IJObject.java
@@ -1,3 +1,17 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package edu.uci.ics.asterix.external.library.java;
import edu.uci.ics.asterix.om.base.IAObject;
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/java/JObjectUtil.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/java/JObjectUtil.java
index 52071af..0c5d287 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/java/JObjectUtil.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/java/JObjectUtil.java
@@ -1,3 +1,17 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package edu.uci.ics.asterix.external.library.java;
import java.io.IOException;
@@ -6,7 +20,6 @@
import edu.uci.ics.asterix.common.exceptions.AsterixException;
import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt32SerializerDeserializer;
-import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ARecordSerializerDeserializer;
import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AStringSerializerDeserializer;
import edu.uci.ics.asterix.dataflow.data.nontagged.serde.SerializerDeserializerUtil;
import edu.uci.ics.asterix.external.library.java.JObjects.ByteArrayAccessibleDataInputStream;
@@ -29,7 +42,6 @@
import edu.uci.ics.asterix.external.library.java.JObjects.JString;
import edu.uci.ics.asterix.external.library.java.JObjects.JTime;
import edu.uci.ics.asterix.external.library.java.JObjects.JUnorderedList;
-import edu.uci.ics.asterix.om.pointables.base.IVisitablePointable;
import edu.uci.ics.asterix.om.types.AOrderedListType;
import edu.uci.ics.asterix.om.types.ARecordType;
import edu.uci.ics.asterix.om.types.ATypeTag;
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/java/JObjects.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/java/JObjects.java
index 2fae902..e53f252 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/java/JObjects.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/java/JObjects.java
@@ -1,3 +1,17 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package edu.uci.ics.asterix.external.library.java;
import java.io.ByteArrayInputStream;
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/java/JTypeTag.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/java/JTypeTag.java
index 3b686f6..c851197 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/java/JTypeTag.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/java/JTypeTag.java
@@ -1,3 +1,17 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package edu.uci.ics.asterix.external.library.java;
public enum JTypeTag {
diff --git a/asterix-installer/src/test/resources/integrationts/library/queries/library-feeds/ingest_feed/feed_ingest.1.ddl.aql b/asterix-installer/src/test/resources/integrationts/library/queries/library-feeds/ingest_feed/feed_ingest.1.ddl.aql
index edd08ce..4221594 100644
--- a/asterix-installer/src/test/resources/integrationts/library/queries/library-feeds/ingest_feed/feed_ingest.1.ddl.aql
+++ b/asterix-installer/src/test/resources/integrationts/library/queries/library-feeds/ingest_feed/feed_ingest.1.ddl.aql
@@ -18,8 +18,10 @@
topics : {{string}}?
}
-create feed dataset TweetFeed(TweetType)
+create feed TweetFeed
using "edu.uci.ics.asterix.tools.external.data.RateControlledFileSystemBasedAdapterFactory"
-(("output-type-name"="TweetType"),("fs"="localfs"),("path"="nc1://../asterix-app/data/twitter/obamatweets.adm"),("format"="adm"),("tuple-interval"="10"))
-apply function testlib#parseTweet@1
+(("output-type-name"="TweetType"),("fs"="localfs"),("path"="nc1://data/twitter/obamatweets.adm"),("format"="adm"),("tuple-interval"="10"))
+apply function testlib#parseTweet;
+
+create dataset Tweets(TweetType)
primary key id;
diff --git a/asterix-installer/src/test/resources/integrationts/library/queries/library-feeds/ingest_feed/feed_ingest.2.update.aql b/asterix-installer/src/test/resources/integrationts/library/queries/library-feeds/ingest_feed/feed_ingest.2.update.aql
index b7e03ec..028ac59 100644
--- a/asterix-installer/src/test/resources/integrationts/library/queries/library-feeds/ingest_feed/feed_ingest.2.update.aql
+++ b/asterix-installer/src/test/resources/integrationts/library/queries/library-feeds/ingest_feed/feed_ingest.2.update.aql
@@ -9,4 +9,4 @@
*/
use dataverse externallibtest;
-begin feed TweetFeed;
+connect feed TweetFeed to dataset Tweets;
diff --git a/asterix-installer/src/test/resources/integrationts/library/queries/library-feeds/ingest_feed/feed_ingest.3.query.aql b/asterix-installer/src/test/resources/integrationts/library/queries/library-feeds/ingest_feed/feed_ingest.3.query.aql
index fd36fbc..5f74687 100644
--- a/asterix-installer/src/test/resources/integrationts/library/queries/library-feeds/ingest_feed/feed_ingest.3.query.aql
+++ b/asterix-installer/src/test/resources/integrationts/library/queries/library-feeds/ingest_feed/feed_ingest.3.query.aql
@@ -9,5 +9,5 @@
*/
use dataverse externallibtest;
-for $x in dataset TweetFeed
+for $x in dataset Tweets
return $x
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorDescriptor.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorDescriptor.java
index 1792e1d..281a62f 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorDescriptor.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorDescriptor.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2009-2012 by The Regents of the University of California
+ * Copyright 2009-2013 by The Regents of the University of California
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* you may obtain a copy of the License from
@@ -32,24 +32,24 @@
/**
* FeedIntakeOperatorDescriptor is responsible for ingesting data from an external source. This
- * operator uses a user specified for a built-in adaptor for retrieving data from the external
- * data source.
+ * operator uses a user specified for a built-in adaptor for retrieving data from the external
+ * data source.
*/
public class FeedIntakeOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
private static final long serialVersionUID = 1L;
private static final Logger LOGGER = Logger.getLogger(FeedIntakeOperatorDescriptor.class.getName());
- /** The type associated with the ADM data output from the feed adaptor*/
- private final IAType atype;
-
- /** unique identifier for a feed instance.*/
+ /** The type associated with the ADM data output from the feed adaptor */
+ private final IAType outptuType;
+
+ /** unique identifier for a feed instance. */
private final FeedConnectionId feedId;
-
+
/** Map representation of policy parameters */
private final Map<String, String> feedPolicy;
-
- /** The adaptor factory that is used to create an instance of the feed adaptor**/
+
+ /** The adaptor factory that is used to create an instance of the feed adaptor **/
private IAdapterFactory adapterFactory;
public FeedIntakeOperatorDescriptor(JobSpecification spec, FeedConnectionId feedId, IAdapterFactory adapterFactory,
@@ -57,7 +57,7 @@
super(spec, 0, 1);
recordDescriptors[0] = rDesc;
this.adapterFactory = adapterFactory;
- this.atype = atype;
+ this.outptuType = atype;
this.feedId = feedId;
this.feedPolicy = feedPolicy;
}
@@ -65,26 +65,28 @@
public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions)
throws HyracksDataException {
- IFeedAdapter adapter;
+ IFeedAdapter adapter = null;
FeedRuntimeId feedRuntimeId = new FeedRuntimeId(FeedRuntimeType.INGESTION, feedId, partition);
IngestionRuntime ingestionRuntime = (IngestionRuntime) FeedManager.INSTANCE.getFeedRuntime(feedRuntimeId);
try {
if (ingestionRuntime == null) {
+ // create an instance of a feed adaptor to ingest data.
adapter = (IFeedAdapter) adapterFactory.createAdapter(ctx, partition);
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("Beginning new feed:" + feedId);
}
} else {
+ // retrieve the instance of the feed adaptor used in previous failed execution.
adapter = ((IngestionRuntime) ingestionRuntime).getAdapterRuntimeManager().getFeedAdapter();
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("Resuming old feed:" + feedId);
}
}
- } catch (Exception e) {
- if(LOGGER.isLoggable(Level.SEVERE)){
- LOGGER.severe("Initialization of the feed adaptor failed");
+ } catch (Exception exception) {
+ if (LOGGER.isLoggable(Level.SEVERE)) {
+ LOGGER.severe("Initialization of the feed adaptor failed with exception " + exception);
}
- throw new HyracksDataException("Initialization of the feed adapter failed", e);
+ throw new HyracksDataException("Initialization of the feed adapter failed", exception);
}
return new FeedIntakeOperatorNodePushable(ctx, feedId, adapter, feedPolicy, partition, ingestionRuntime);
}
@@ -101,8 +103,8 @@
return adapterFactory;
}
- public IAType getAtype() {
- return atype;
+ public IAType getOutputType() {
+ return outptuType;
}
public RecordDescriptor getRecordDescriptor() {
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedRuntime.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedRuntime.java
index af25be2..29b494e 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedRuntime.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedRuntime.java
@@ -1,3 +1,17 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package edu.uci.ics.asterix.metadata.feeds;
import java.nio.ByteBuffer;
@@ -13,8 +27,10 @@
COMMIT
}
+ /** A unique identifier */
protected final FeedRuntimeId feedRuntimeId;
+ /** The runtime state: @see FeedRuntimeState */
protected FeedRuntimeState runtimeState;
public FeedRuntime(FeedConnectionId feedId, int partition, FeedRuntimeType feedRuntimeType) {
@@ -29,7 +45,7 @@
@Override
public String toString() {
- return feedRuntimeId + " " + "runtime state ? " + (runtimeState != null);
+ return feedRuntimeId + " " + "runtime state present ? " + (runtimeState != null);
}
public static class FeedRuntimeState {
@@ -38,7 +54,7 @@
private IFrameWriter frameWriter;
private Exception exception;
- public FeedRuntimeState(ByteBuffer frame, IFrameWriter frameWriter, Exception e) {
+ public FeedRuntimeState(ByteBuffer frame, IFrameWriter frameWriter, Exception exception) {
this.frame = frame;
this.frameWriter = frameWriter;
this.exception = exception;
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedUtil.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedUtil.java
index 1f50889..177c082 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedUtil.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedUtil.java
@@ -1,8 +1,20 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package edu.uci.ics.asterix.metadata.feeds;
-import java.util.ArrayList;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.logging.Level;
@@ -22,9 +34,7 @@
import edu.uci.ics.asterix.metadata.entities.FeedPolicy;
import edu.uci.ics.asterix.metadata.feeds.FeedRuntime.FeedRuntimeType;
import edu.uci.ics.asterix.om.types.ARecordType;
-import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
-import edu.uci.ics.hyracks.algebricks.common.utils.Triple;
import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
import edu.uci.ics.hyracks.algebricks.runtime.operators.meta.AlgebricksMetaOperatorDescriptor;
import edu.uci.ics.hyracks.algebricks.runtime.operators.std.AssignRuntimeFactory;
@@ -42,6 +52,9 @@
import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
import edu.uci.ics.hyracks.api.job.JobSpecification;
+/**
+ * A utility class for providing helper functions for feeds
+ */
public class FeedUtil {
private static Logger LOGGER = Logger.getLogger(FeedUtil.class.getName());
@@ -64,7 +77,7 @@
if (opDesc instanceof FeedIntakeOperatorDescriptor) {
FeedIntakeOperatorDescriptor orig = (FeedIntakeOperatorDescriptor) opDesc;
FeedIntakeOperatorDescriptor fiop = new FeedIntakeOperatorDescriptor(altered, orig.getFeedId(),
- orig.getAdapterFactory(), (ARecordType) orig.getAtype(), orig.getRecordDescriptor(),
+ orig.getAdapterFactory(), (ARecordType) orig.getOutputType(), orig.getRecordDescriptor(),
orig.getFeedPolicy());
oldNewOID.put(opDesc.getOperatorId(), fiop.getOperatorId());
} else if (opDesc instanceof AsterixLSMTreeInsertDeleteOperatorDescriptor) {
@@ -168,7 +181,8 @@
for (OperatorDescriptorId root : spec.getRoots()) {
altered.addRoot(altered.getOperatorMap().get(oldNewOID.get(root)));
}
-
+
+ // jobEventListenerFactory
altered.setJobletEventListenerFactory(spec.getJobletEventListenerFactory());
if (LOGGER.isLoggable(Level.INFO)) {