checkpoit
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/UnnestToDataScanRule.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/UnnestToDataScanRule.java
index 34c5739..5fce340 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/UnnestToDataScanRule.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/UnnestToDataScanRule.java
@@ -12,6 +12,8 @@
import edu.uci.ics.asterix.metadata.declared.ExternalFeedDataSource;
import edu.uci.ics.asterix.metadata.entities.Dataset;
import edu.uci.ics.asterix.metadata.entities.Dataverse;
+import edu.uci.ics.asterix.metadata.entities.FeedPolicy;
+import edu.uci.ics.asterix.metadata.feeds.BuiltinFeedPolicies;
import edu.uci.ics.asterix.metadata.utils.DatasetUtils;
import edu.uci.ics.asterix.om.base.AString;
import edu.uci.ics.asterix.om.constants.AsterixConstantValue;
@@ -97,7 +99,6 @@
}
}
v.add(unnest.getVariable());
-
DataSourceScanOperator scan = new DataSourceScanOperator(v, metadataProvider.findDataSource(asid));
List<Mutable<ILogicalOperator>> scanInpList = scan.getInputs();
scanInpList.addAll(unnest.getInputs());
@@ -141,11 +142,20 @@
}
AqlSourceId asid = new AqlSourceId(dataverseName, datasetName);
+ String policyName = metadataProvider.getConfig().get(BuiltinFeedPolicies.CONFIG_FEED_POLICY_KEY);
+ FeedPolicy policy = metadataProvider.findFeedPolicy(dataverseName, policyName);
+ if (policy == null) {
+ policy = BuiltinFeedPolicies.getFeedPolicy(policyName);
+ if (policy == null) {
+ throw new AlgebricksException("Unknown feed policy:" + policyName);
+ }
+ }
+
ArrayList<LogicalVariable> v = new ArrayList<LogicalVariable>();
v.add(unnest.getVariable());
DataSourceScanOperator scan = new DataSourceScanOperator(v, createDummyFeedDataSource(asid, dataset,
- metadataProvider));
+ metadataProvider, policy));
List<Mutable<ILogicalOperator>> scanInpList = scan.getInputs();
scanInpList.addAll(unnest.getInputs());
@@ -170,7 +180,7 @@
}
private AqlDataSource createDummyFeedDataSource(AqlSourceId aqlId, Dataset dataset,
- AqlMetadataProvider metadataProvider) throws AlgebricksException {
+ AqlMetadataProvider metadataProvider, FeedPolicy feedPolicy) throws AlgebricksException {
if (!aqlId.getDataverseName().equals(
metadataProvider.getDefaultDataverse() == null ? null : metadataProvider.getDefaultDataverse()
.getDataverseName())) {
@@ -180,6 +190,7 @@
IAType itemType = metadataProvider.findType(dataset.getDataverseName(), tName);
ExternalFeedDataSource extDataSource = new ExternalFeedDataSource(aqlId, dataset, itemType,
AqlDataSource.AqlDataSourceType.EXTERNAL_FEED);
+ extDataSource.getProperties().put(BuiltinFeedPolicies.CONFIG_FEED_POLICY_KEY, feedPolicy);
return extDataSource;
}
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/translator/CompiledStatements.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/translator/CompiledStatements.java
index a91f2eb..b2702ae 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/translator/CompiledStatements.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/translator/CompiledStatements.java
@@ -362,12 +362,15 @@
public static class CompiledBeginFeedStatement implements ICompiledDmlStatement {
private String dataverseName;
private String datasetName;
+ private String policyName;
private Query query;
private int varCounter;
- public CompiledBeginFeedStatement(String dataverseName, String datasetName, Query query, int varCounter) {
+ public CompiledBeginFeedStatement(String dataverseName, String datasetName, String policyName, Query query,
+ int varCounter) {
this.dataverseName = dataverseName;
this.datasetName = datasetName;
+ this.policyName = policyName;
this.query = query;
this.varCounter = varCounter;
}
@@ -398,6 +401,10 @@
public Kind getKind() {
return Kind.BEGIN_FEED;
}
+
+ public String getPolicyName() {
+ return policyName;
+ }
}
public static class CompiledControlFeedStatement implements ICompiledDmlStatement {
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java b/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
index 14eea0d..7ad75c8 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
@@ -64,7 +64,6 @@
import edu.uci.ics.asterix.common.config.GlobalConfig;
import edu.uci.ics.asterix.common.exceptions.AsterixException;
import edu.uci.ics.asterix.common.functions.FunctionSignature;
-import edu.uci.ics.asterix.external.feed.lifecycle.FeedActivityIdFactory;
import edu.uci.ics.asterix.file.DatasetOperations;
import edu.uci.ics.asterix.file.DataverseOperations;
import edu.uci.ics.asterix.file.FeedOperations;
@@ -83,11 +82,12 @@
import edu.uci.ics.asterix.metadata.entities.FeedActivity;
import edu.uci.ics.asterix.metadata.entities.FeedActivity.FeedActivityType;
import edu.uci.ics.asterix.metadata.entities.FeedDatasetDetails;
+import edu.uci.ics.asterix.metadata.entities.FeedDatasetDetails.FeedState;
import edu.uci.ics.asterix.metadata.entities.Function;
import edu.uci.ics.asterix.metadata.entities.Index;
import edu.uci.ics.asterix.metadata.entities.InternalDatasetDetails;
import edu.uci.ics.asterix.metadata.entities.NodeGroup;
-import edu.uci.ics.asterix.metadata.entities.FeedDatasetDetails.FeedState;
+import edu.uci.ics.asterix.metadata.feeds.BuiltinFeedPolicies;
import edu.uci.ics.asterix.om.types.ARecordType;
import edu.uci.ics.asterix.om.types.ATypeTag;
import edu.uci.ics.asterix.om.types.IAType;
@@ -1349,7 +1349,7 @@
: activeDefaultDataverse.getDataverseName() : bfs.getDataverseName().getValue();
CompiledBeginFeedStatement cbfs = new CompiledBeginFeedStatement(dataverseName, bfs.getDatasetName()
- .getValue(), bfs.getQuery(), bfs.getVarCounter());
+ .getValue(), bfs.getPolicy(), bfs.getQuery(), bfs.getVarCounter());
Dataset dataset = MetadataManager.INSTANCE.getDataset(metadataProvider.getMetadataTxnContext(),
dataverseName, bfs.getDatasetName().getValue());
@@ -1364,7 +1364,7 @@
bfs.initialize(metadataProvider.getMetadataTxnContext(), dataset);
cbfs.setQuery(bfs.getQuery());
metadataProvider.getConfig().put(FunctionUtils.IMPORT_PRIVATE_FUNCTIONS, "" + Boolean.TRUE);
-
+ metadataProvider.getConfig().put(BuiltinFeedPolicies.CONFIG_FEED_POLICY_KEY, cbfs.getPolicyName());
JobSpecification compiled = rewriteCompileQuery(metadataProvider, bfs.getQuery(), cbfs);
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
bActiveTxn = false;
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/file/FeedOperations.java b/asterix-app/src/main/java/edu/uci/ics/asterix/file/FeedOperations.java
index ce34076..a95ea36 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/file/FeedOperations.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/file/FeedOperations.java
@@ -20,14 +20,14 @@
import edu.uci.ics.asterix.common.config.DatasetConfig.DatasetType;
import edu.uci.ics.asterix.common.exceptions.AsterixException;
-import edu.uci.ics.asterix.external.feed.lifecycle.AlterFeedMessage;
-import edu.uci.ics.asterix.external.feed.lifecycle.FeedMessage;
-import edu.uci.ics.asterix.external.feed.lifecycle.IFeedMessage;
-import edu.uci.ics.asterix.external.feed.lifecycle.IFeedMessage.MessageType;
import edu.uci.ics.asterix.metadata.declared.AqlMetadataProvider;
import edu.uci.ics.asterix.metadata.entities.Dataset;
import edu.uci.ics.asterix.metadata.entities.FeedActivity;
import edu.uci.ics.asterix.metadata.entities.FeedDatasetDetails;
+import edu.uci.ics.asterix.metadata.feeds.AlterFeedMessage;
+import edu.uci.ics.asterix.metadata.feeds.FeedMessage;
+import edu.uci.ics.asterix.metadata.feeds.IFeedMessage;
+import edu.uci.ics.asterix.metadata.feeds.IFeedMessage.MessageType;
import edu.uci.ics.asterix.translator.CompiledStatements.CompiledControlFeedStatement;
import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraintHelper;
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
index 0faf95e..4c42ebd 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
@@ -16,11 +16,10 @@
import edu.uci.ics.asterix.common.api.AsterixAppContextInfo;
import edu.uci.ics.asterix.common.config.AsterixExternalProperties;
import edu.uci.ics.asterix.common.config.AsterixMetadataProperties;
-import edu.uci.ics.asterix.external.feed.lifecycle.FeedActivityIdFactory;
import edu.uci.ics.asterix.metadata.MetadataManager;
import edu.uci.ics.asterix.metadata.api.IAsterixStateProxy;
import edu.uci.ics.asterix.metadata.bootstrap.AsterixStateProxy;
-import edu.uci.ics.asterix.metadata.declared.FeedJobLifecycleListener;
+import edu.uci.ics.asterix.metadata.feeds.FeedJobLifecycleListener;
import edu.uci.ics.hyracks.api.application.ICCApplicationContext;
import edu.uci.ics.hyracks.api.application.ICCApplicationEntryPoint;
import edu.uci.ics.hyracks.api.client.HyracksConnection;
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
index 7dad984..cc985ce 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
@@ -105,12 +105,14 @@
isMetadataNode = nodeId.equals(metadataProperties.getMetadataNodeName());
if (isMetadataNode) {
registerRemoteMetadataNode(proxy);
+ }
+ MetadataManager.INSTANCE = new MetadataManager(proxy, metadataProperties);
+ MetadataManager.INSTANCE.init();
+ if (isMetadataNode) {
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("Bootstrapping metadata");
}
- MetadataManager.INSTANCE = new MetadataManager(proxy, metadataProperties);
- MetadataManager.INSTANCE.init();
MetadataBootstrap.startUniverse(runtimeContext, ncApplicationContext,
systemState == SystemState.NEW_UNIVERSE);
MetadataBootstrap.startDDLRecovery();
diff --git a/asterix-app/src/test/resources/runtimets/queries/distinct/query-issue443-2/query-issue443-2.1.ddl.aql b/asterix-app/src/test/resources/runtimets/queries/distinct/query-issue443-2/query-issue443-2.1.ddl.aql
deleted file mode 100644
index e69de29..0000000
--- a/asterix-app/src/test/resources/runtimets/queries/distinct/query-issue443-2/query-issue443-2.1.ddl.aql
+++ /dev/null
diff --git a/asterix-app/src/test/resources/runtimets/queries/distinct/query-issue443-2/query-issue443-2.2.update.aql b/asterix-app/src/test/resources/runtimets/queries/distinct/query-issue443-2/query-issue443-2.2.update.aql
deleted file mode 100644
index e69de29..0000000
--- a/asterix-app/src/test/resources/runtimets/queries/distinct/query-issue443-2/query-issue443-2.2.update.aql
+++ /dev/null
diff --git a/asterix-app/src/test/resources/runtimets/queries/distinct/query-issue443/query-issue443.1.ddl.aql b/asterix-app/src/test/resources/runtimets/queries/distinct/query-issue443/query-issue443.1.ddl.aql
deleted file mode 100644
index e69de29..0000000
--- a/asterix-app/src/test/resources/runtimets/queries/distinct/query-issue443/query-issue443.1.ddl.aql
+++ /dev/null
diff --git a/asterix-app/src/test/resources/runtimets/queries/distinct/query-issue443/query-issue443.2.update.aql b/asterix-app/src/test/resources/runtimets/queries/distinct/query-issue443/query-issue443.2.update.aql
deleted file mode 100644
index e69de29..0000000
--- a/asterix-app/src/test/resources/runtimets/queries/distinct/query-issue443/query-issue443.2.update.aql
+++ /dev/null
diff --git a/asterix-app/src/test/resources/runtimets/queries/open-closed/query-issue442/query-issue442.1.ddl.aql b/asterix-app/src/test/resources/runtimets/queries/open-closed/query-issue442/query-issue442.1.ddl.aql
deleted file mode 100644
index e69de29..0000000
--- a/asterix-app/src/test/resources/runtimets/queries/open-closed/query-issue442/query-issue442.1.ddl.aql
+++ /dev/null
diff --git a/asterix-app/src/test/resources/runtimets/queries/open-closed/query-issue442/query-issue442.2.update.aql b/asterix-app/src/test/resources/runtimets/queries/open-closed/query-issue442/query-issue442.2.update.aql
deleted file mode 100644
index e69de29..0000000
--- a/asterix-app/src/test/resources/runtimets/queries/open-closed/query-issue442/query-issue442.2.update.aql
+++ /dev/null
diff --git a/asterix-app/src/test/resources/runtimets/results/open-closed/query-issue423/query-issue423.1.adm b/asterix-app/src/test/resources/runtimets/results/open-closed/query-issue423/query-issue423.1.adm
deleted file mode 100644
index e69de29..0000000
--- a/asterix-app/src/test/resources/runtimets/results/open-closed/query-issue423/query-issue423.1.adm
+++ /dev/null
diff --git a/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/BeginFeedStatement.java b/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/BeginFeedStatement.java
index d7de106..6b9006e 100644
--- a/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/BeginFeedStatement.java
+++ b/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/BeginFeedStatement.java
@@ -10,7 +10,6 @@
import edu.uci.ics.asterix.aql.parser.ParseException;
import edu.uci.ics.asterix.aql.util.FunctionUtils;
import edu.uci.ics.asterix.common.exceptions.AsterixException;
-import edu.uci.ics.asterix.common.functions.FunctionConstants;
import edu.uci.ics.asterix.common.functions.FunctionSignature;
import edu.uci.ics.asterix.metadata.MetadataException;
import edu.uci.ics.asterix.metadata.MetadataManager;
@@ -18,17 +17,20 @@
import edu.uci.ics.asterix.metadata.entities.Dataset;
import edu.uci.ics.asterix.metadata.entities.FeedDatasetDetails;
import edu.uci.ics.asterix.metadata.entities.Function;
+import edu.uci.ics.asterix.metadata.feeds.BuiltinFeedPolicies;
public class BeginFeedStatement implements Statement {
private final Identifier dataverseName;
private final Identifier datasetName;
+ private final String policy;
private Query query;
private int varCounter;
- public BeginFeedStatement(Identifier dataverseName, Identifier datasetName, int varCounter) {
+ public BeginFeedStatement(Identifier dataverseName, Identifier datasetName, String policy, int varCounter) {
this.dataverseName = dataverseName;
this.datasetName = datasetName;
+ this.policy = policy != null ? policy : BuiltinFeedPolicies.DEFAULT_POLICY.getPolicyName();
this.varCounter = varCounter;
}
@@ -96,6 +98,10 @@
return Kind.BEGIN_FEED;
}
+ public String getPolicy() {
+ return policy;
+ }
+
@Override
public <R, T> R accept(IAqlExpressionVisitor<R, T> visitor, T arg) throws AsterixException {
return visitor.visitBeginFeedStatement(this, arg);
diff --git a/asterix-aql/src/main/javacc/AQL.jj b/asterix-aql/src/main/javacc/AQL.jj
index 2a5f534..a00ec65 100644
--- a/asterix-aql/src/main/javacc/AQL.jj
+++ b/asterix-aql/src/main/javacc/AQL.jj
@@ -498,6 +498,18 @@
}
}
+String GetPolicy() throws ParseException:
+{
+ String policy = null;
+}
+{
+ "using" "policy" policy = Identifier()
+ {
+ return policy;
+ }
+
+}
+
FunctionSignature FunctionSignature() throws ParseException:
{
Pair<Identifier,Identifier> pairId = null;
@@ -758,12 +770,13 @@
Pair<Identifier,Identifier> nameComponents = null;
Map<String,String> configuration = null;
Statement stmt = null;
+ String policy = null;
}
{
(
- "begin" "feed" nameComponents = QualifiedName()
+ "begin" "feed" nameComponents = QualifiedName() (policy = GetPolicy())?
{
- stmt = new BeginFeedStatement(nameComponents.first, nameComponents.second, getVarCounter());
+ stmt = new BeginFeedStatement(nameComponents.first, nameComponents.second, policy, getVarCounter());
}
| "suspend" "feed" nameComponents = QualifiedName()
{
diff --git a/asterix-events/src/main/java/edu/uci/ics/asterix/event/management/EventExecutor.java b/asterix-events/src/main/java/edu/uci/ics/asterix/event/management/EventExecutor.java
index e702ef3..63a791b 100644
--- a/asterix-events/src/main/java/edu/uci/ics/asterix/event/management/EventExecutor.java
+++ b/asterix-events/src/main/java/edu/uci/ics/asterix/event/management/EventExecutor.java
@@ -18,7 +18,6 @@
import java.io.File;
import java.io.IOException;
import java.io.StringWriter;
-import java.math.BigInteger;
import java.util.ArrayList;
import java.util.List;
diff --git a/asterix-external-data/pom.xml b/asterix-external-data/pom.xml
index 2e99b7c..b54d032 100644
--- a/asterix-external-data/pom.xml
+++ b/asterix-external-data/pom.xml
@@ -96,6 +96,13 @@
<scope>compile</scope>
</dependency>
<dependency>
+ <groupId>edu.uci.ics.asterix</groupId>
+ <artifactId>asterix-metadata</artifactId>
+ <version>0.0.6-SNAPSHOT</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
<groupId>com.kenai.nbpwr</groupId>
<artifactId>org-apache-commons-io</artifactId>
<version>1.3.1-201002241208</version>
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/CNNFeedAdapterFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/CNNFeedAdapterFactory.java
index f1f5884..d3a1995 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/CNNFeedAdapterFactory.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/CNNFeedAdapterFactory.java
@@ -17,7 +17,8 @@
import java.util.Map;
import edu.uci.ics.asterix.external.dataset.adapter.CNNFeedAdapter;
-import edu.uci.ics.asterix.external.dataset.adapter.IDatasourceAdapter;
+import edu.uci.ics.asterix.metadata.feeds.IDatasourceAdapter;
+import edu.uci.ics.asterix.metadata.feeds.ITypedDatasetAdapterFactory;
/**
* A factory class for creating the @see {CNNFeedAdapter}.
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HDFSAdapterFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HDFSAdapterFactory.java
index 6fcb710..7cd3c9a 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HDFSAdapterFactory.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HDFSAdapterFactory.java
@@ -17,7 +17,8 @@
import java.util.Map;
import edu.uci.ics.asterix.external.dataset.adapter.HDFSAdapter;
-import edu.uci.ics.asterix.external.dataset.adapter.IDatasourceAdapter;
+import edu.uci.ics.asterix.metadata.feeds.IDatasourceAdapter;
+import edu.uci.ics.asterix.metadata.feeds.IGenericDatasetAdapterFactory;
import edu.uci.ics.asterix.om.types.IAType;
/**
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HiveAdapterFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HiveAdapterFactory.java
index 5e28eed..f724403 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HiveAdapterFactory.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HiveAdapterFactory.java
@@ -17,7 +17,8 @@
import java.util.Map;
import edu.uci.ics.asterix.external.dataset.adapter.HiveAdapter;
-import edu.uci.ics.asterix.external.dataset.adapter.IDatasourceAdapter;
+import edu.uci.ics.asterix.metadata.feeds.IDatasourceAdapter;
+import edu.uci.ics.asterix.metadata.feeds.IGenericDatasetAdapterFactory;
import edu.uci.ics.asterix.om.types.IAType;
/**
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/NCFileSystemAdapterFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/NCFileSystemAdapterFactory.java
index 2040949..9706a77 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/NCFileSystemAdapterFactory.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/NCFileSystemAdapterFactory.java
@@ -16,8 +16,9 @@
import java.util.Map;
-import edu.uci.ics.asterix.external.dataset.adapter.IDatasourceAdapter;
import edu.uci.ics.asterix.external.dataset.adapter.NCFileSystemAdapter;
+import edu.uci.ics.asterix.metadata.feeds.IDatasourceAdapter;
+import edu.uci.ics.asterix.metadata.feeds.IGenericDatasetAdapterFactory;
import edu.uci.ics.asterix.om.types.IAType;
/**
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/PullBasedTwitterAdapterFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/PullBasedTwitterAdapterFactory.java
index bc00469..fd8f24f 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/PullBasedTwitterAdapterFactory.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/PullBasedTwitterAdapterFactory.java
@@ -16,8 +16,9 @@
import java.util.Map;
-import edu.uci.ics.asterix.external.dataset.adapter.IDatasourceAdapter;
import edu.uci.ics.asterix.external.dataset.adapter.PullBasedTwitterAdapter;
+import edu.uci.ics.asterix.metadata.feeds.IDatasourceAdapter;
+import edu.uci.ics.asterix.metadata.feeds.ITypedDatasetAdapterFactory;
/**
* Factory class for creating an instance of PullBasedTwitterAdapter.
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/RSSFeedAdapterFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/RSSFeedAdapterFactory.java
index bbbea38..e9c37fa 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/RSSFeedAdapterFactory.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/RSSFeedAdapterFactory.java
@@ -16,8 +16,9 @@
import java.util.Map;
-import edu.uci.ics.asterix.external.dataset.adapter.IDatasourceAdapter;
import edu.uci.ics.asterix.external.dataset.adapter.RSSFeedAdapter;
+import edu.uci.ics.asterix.metadata.feeds.IDatasourceAdapter;
+import edu.uci.ics.asterix.metadata.feeds.ITypedDatasetAdapterFactory;
/**
* Factory class for creating an instance of @see {RSSFeedAdapter}.
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/operator/FeedIntakeOperatorDescriptor.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/operator/FeedIntakeOperatorDescriptor.java
deleted file mode 100644
index 29d0c94..0000000
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/operator/FeedIntakeOperatorDescriptor.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Copyright 2009-2012 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.asterix.external.data.operator;
-
-import java.util.Map;
-
-import edu.uci.ics.asterix.external.adapter.factory.IAdapterFactory;
-import edu.uci.ics.asterix.external.adapter.factory.IGenericDatasetAdapterFactory;
-import edu.uci.ics.asterix.external.adapter.factory.ITypedDatasetAdapterFactory;
-import edu.uci.ics.asterix.external.dataset.adapter.ITypedDatasourceAdapter;
-import edu.uci.ics.asterix.external.feed.lifecycle.FeedId;
-import edu.uci.ics.asterix.om.types.ARecordType;
-import edu.uci.ics.asterix.om.types.IAType;
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
-import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.api.job.JobSpecification;
-import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
-
-/**
- * Operator responsible for ingesting data from an external source. This
- * operator uses a (configurable) adapter associated with the feed dataset.
- */
-public class FeedIntakeOperatorDescriptor extends
- AbstractSingleActivityOperatorDescriptor {
-
- private static final long serialVersionUID = 1L;
-
- private final String adapterFactoryClassName;
- private final Map<String, String> adapterConfiguration;
- private final IAType atype;
- private final FeedId feedId;
-
- private transient IAdapterFactory datasourceAdapterFactory;
-
- public FeedIntakeOperatorDescriptor(JobSpecification spec, FeedId feedId,
- String adapter, Map<String, String> arguments, ARecordType atype,
- RecordDescriptor rDesc) {
- super(spec, 1, 1);
- recordDescriptors[0] = rDesc;
- this.adapterFactoryClassName = adapter;
- this.adapterConfiguration = arguments;
- this.atype = atype;
- this.feedId = feedId;
- }
-
- public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
- IRecordDescriptorProvider recordDescProvider, final int partition,
- int nPartitions) throws HyracksDataException {
- ITypedDatasourceAdapter adapter;
- try {
- datasourceAdapterFactory = (IAdapterFactory) Class.forName(
- adapterFactoryClassName).newInstance();
- if (datasourceAdapterFactory instanceof IGenericDatasetAdapterFactory) {
- adapter = (ITypedDatasourceAdapter) ((IGenericDatasetAdapterFactory) datasourceAdapterFactory)
- .createAdapter(adapterConfiguration, atype);
- } else if (datasourceAdapterFactory instanceof ITypedDatasetAdapterFactory) {
- adapter = (ITypedDatasourceAdapter) ((ITypedDatasetAdapterFactory) datasourceAdapterFactory)
- .createAdapter(adapterConfiguration);
- } else {
- throw new IllegalStateException(
- " Unknown adapter factory type for "
- + adapterFactoryClassName);
- }
- adapter.initialize(ctx);
- } catch (Exception e) {
- throw new HyracksDataException("initialization of adapter failed",
- e);
- }
- return new FeedIntakeOperatorNodePushable(feedId, adapter, partition);
- }
-
- public FeedId getFeedId() {
- return feedId;
- }
-}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/operator/FeedIntakeOperatorNodePushable.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/operator/FeedIntakeOperatorNodePushable.java
deleted file mode 100644
index 4d10a81..0000000
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/operator/FeedIntakeOperatorNodePushable.java
+++ /dev/null
@@ -1,123 +0,0 @@
-/*
- * Copyright 2009-2012 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.asterix.external.data.operator;
-
-import java.nio.ByteBuffer;
-import java.util.concurrent.LinkedBlockingQueue;
-
-import edu.uci.ics.asterix.external.dataset.adapter.IDatasourceAdapter;
-import edu.uci.ics.asterix.external.feed.lifecycle.AlterFeedMessage;
-import edu.uci.ics.asterix.external.feed.lifecycle.FeedId;
-import edu.uci.ics.asterix.external.feed.lifecycle.FeedManager;
-import edu.uci.ics.asterix.external.feed.lifecycle.IFeedManager;
-import edu.uci.ics.asterix.external.feed.lifecycle.IFeedMessage;
-import edu.uci.ics.asterix.feed.managed.adapter.IManagedFeedAdapter;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
-
-/**
- * The runtime for @see{FeedIntakeOperationDescriptor}
- */
-public class FeedIntakeOperatorNodePushable extends
- AbstractUnaryInputUnaryOutputOperatorNodePushable {
-
- private final IDatasourceAdapter adapter;
- private final int partition;
- private final IFeedManager feedManager;
- private final FeedId feedId;
- private final LinkedBlockingQueue<IFeedMessage> inbox;
- private FeedInboxMonitor feedInboxMonitor;
-
- public FeedIntakeOperatorNodePushable(FeedId feedId,
- IDatasourceAdapter adapter, int partition) {
- this.adapter = adapter;
- this.partition = partition;
- this.feedManager = (IFeedManager) FeedManager.INSTANCE;
- this.feedId = feedId;
- inbox = new LinkedBlockingQueue<IFeedMessage>();
- }
-
- @Override
- public void open() throws HyracksDataException {
- if (adapter instanceof IManagedFeedAdapter) {
- feedInboxMonitor = new FeedInboxMonitor(
- (IManagedFeedAdapter) adapter, inbox, partition);
- feedInboxMonitor.start();
- feedManager.registerFeedMsgQueue(feedId, inbox);
- }
- writer.open();
- try {
- adapter.start(partition, writer);
- } catch (Exception e) {
- e.printStackTrace();
- throw new HyracksDataException(e);
- } finally {
- writer.close();
- if (adapter instanceof IManagedFeedAdapter) {
- feedManager.unregisterFeedMsgQueue(feedId, inbox);
- }
- }
- }
-
- @Override
- public void fail() throws HyracksDataException {
- writer.close();
- }
-
- @Override
- public void close() throws HyracksDataException {
-
- }
-
- @Override
- public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
- // do nothing
- }
-}
-
-class FeedInboxMonitor extends Thread {
-
- private LinkedBlockingQueue<IFeedMessage> inbox;
- private final IManagedFeedAdapter adapter;
-
- public FeedInboxMonitor(IManagedFeedAdapter adapter,
- LinkedBlockingQueue<IFeedMessage> inbox, int partition) {
- this.inbox = inbox;
- this.adapter = adapter;
- }
-
- @Override
- public void run() {
- while (true) {
- try {
- IFeedMessage feedMessage = inbox.take();
- switch (feedMessage.getMessageType()) {
- case STOP:
- adapter.stop();
- break;
- case ALTER:
- adapter.alter(((AlterFeedMessage) feedMessage)
- .getAlteredConfParams());
- break;
- }
- } catch (InterruptedException ie) {
- break;
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
- }
-
-}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/CNNFeedAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/CNNFeedAdapter.java
index 3898f7e..7a8491a 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/CNNFeedAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/CNNFeedAdapter.java
@@ -19,7 +19,8 @@
import java.util.List;
import java.util.Map;
-import edu.uci.ics.asterix.feed.managed.adapter.IManagedFeedAdapter;
+import edu.uci.ics.asterix.metadata.feeds.IDatasourceAdapter;
+import edu.uci.ics.asterix.metadata.feeds.IManagedFeedAdapter;
/**
* An Adapter that provides the functionality of fetching news feed from CNN service
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/FileSystemBasedAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/FileSystemBasedAdapter.java
index 9f8cedc..c520b9f 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/FileSystemBasedAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/FileSystemBasedAdapter.java
@@ -24,6 +24,7 @@
import edu.uci.ics.asterix.external.util.DNSResolverFactory;
import edu.uci.ics.asterix.external.util.INodeResolver;
import edu.uci.ics.asterix.external.util.INodeResolverFactory;
+import edu.uci.ics.asterix.metadata.feeds.AbstractDatasourceAdapter;
import edu.uci.ics.asterix.om.types.ARecordType;
import edu.uci.ics.asterix.om.types.ATypeTag;
import edu.uci.ics.asterix.om.types.IAType;
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HiveAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HiveAdapter.java
index 3731eba..1c49373 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HiveAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HiveAdapter.java
@@ -16,6 +16,7 @@
import java.util.Map;
+import edu.uci.ics.asterix.metadata.feeds.AbstractDatasourceAdapter;
import edu.uci.ics.asterix.om.types.IAType;
import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedAdapter.java
index 634656d..3d1c2aa 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedAdapter.java
@@ -15,12 +15,17 @@
package edu.uci.ics.asterix.external.dataset.adapter;
import java.nio.ByteBuffer;
+import java.rmi.RemoteException;
import java.util.Map;
import java.util.logging.Logger;
import edu.uci.ics.asterix.external.dataset.adapter.IPullBasedFeedClient.InflowState;
-import edu.uci.ics.asterix.feed.managed.adapter.IManagedFeedAdapter;
+import edu.uci.ics.asterix.metadata.feeds.AbstractFeedDatasourceAdapter;
+import edu.uci.ics.asterix.metadata.feeds.FeedPolicyEnforcer;
+import edu.uci.ics.asterix.metadata.feeds.IManagedFeedAdapter;
+import edu.uci.ics.asterix.metadata.feeds.ITypedDatasourceAdapter;
import edu.uci.ics.asterix.om.types.ARecordType;
+import edu.uci.ics.asterix.transaction.management.exception.ACIDException;
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
@@ -32,7 +37,7 @@
* Captures the common logic for obtaining bytes from an external source
* and packing them into frames as tuples.
*/
-public abstract class PullBasedAdapter extends AbstractDatasourceAdapter implements ITypedDatasourceAdapter,
+public abstract class PullBasedAdapter extends AbstractFeedDatasourceAdapter implements ITypedDatasourceAdapter,
IManagedFeedAdapter {
private static final long serialVersionUID = 1L;
@@ -46,9 +51,15 @@
protected boolean continueIngestion = true;
protected boolean alterRequested = false;
private Map<String, String> modifiedConfiguration = null;
+ private long tupleCount = 0;
+ private FeedPolicyEnforcer policyEnforcer;
public abstract IPullBasedFeedClient getFeedClient(int partition) throws Exception;
+ public long getIngestedRecordsCount() {
+ return tupleCount;
+ }
+
public void alter(Map<String, String> modifedConfiguration) {
this.modifiedConfiguration = modifedConfiguration;
}
@@ -69,13 +80,14 @@
case DATA_AVAILABLE:
tupleBuilder.addFieldEndOffset();
appendTupleToFrame(writer);
+ tupleCount++;
break;
case NO_MORE_DATA:
LOGGER.info("Reached end of feed");
FrameUtils.flushFrame(frame, writer);
continueIngestion = false;
break;
- case DATA_NOT_AVAILABLE:
+ case DATA_NOT_AVAILABLE:
break;
}
if (alterRequested) {
@@ -87,9 +99,14 @@
}
} catch (Exception failureException) {
try {
- pullBasedFeedClient.resetOnFailure(failureException);
- tupleBuilder.reset();
- continue;
+ boolean continueIngestion = policyEnforcer.handleSoftwareFailure(failureException);
+ if (continueIngestion) {
+ pullBasedFeedClient.resetOnFailure(failureException);
+ tupleBuilder.reset();
+ continue;
+ } else {
+ throw failureException;
+ }
} catch (Exception recoveryException) {
throw new Exception(recoveryException);
}
@@ -118,8 +135,10 @@
*
* @throws Exception
*/
- public void stop() {
+ public void stop() throws Exception {
continueIngestion = false;
+ dumpStatistics();
+ timer.cancel();
}
}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedTwitterAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedTwitterAdapter.java
index ef69018..b658d1a 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedTwitterAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedTwitterAdapter.java
@@ -16,7 +16,7 @@
import java.util.Map;
-import edu.uci.ics.asterix.feed.managed.adapter.IManagedFeedAdapter;
+import edu.uci.ics.asterix.metadata.feeds.IManagedFeedAdapter;
import edu.uci.ics.asterix.om.types.ARecordType;
import edu.uci.ics.asterix.om.types.BuiltinType;
import edu.uci.ics.asterix.om.types.IAType;
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/RSSFeedAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/RSSFeedAdapter.java
index 611183c..b3f4dd1 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/RSSFeedAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/RSSFeedAdapter.java
@@ -19,7 +19,7 @@
import java.util.List;
import java.util.Map;
-import edu.uci.ics.asterix.feed.managed.adapter.IManagedFeedAdapter;
+import edu.uci.ics.asterix.metadata.feeds.IManagedFeedAdapter;
import edu.uci.ics.asterix.om.types.ARecordType;
import edu.uci.ics.asterix.om.types.BuiltinType;
import edu.uci.ics.asterix.om.types.IAType;
diff --git a/asterix-metadata/pom.xml b/asterix-metadata/pom.xml
index 425e8ff..58d7b2a 100644
--- a/asterix-metadata/pom.xml
+++ b/asterix-metadata/pom.xml
@@ -37,7 +37,7 @@
</dependency>
<dependency>
<groupId>edu.uci.ics.asterix</groupId>
- <artifactId>asterix-external-data</artifactId>
+ <artifactId>asterix-runtime</artifactId>
<version>0.0.6-SNAPSHOT</version>
<scope>compile</scope>
</dependency>
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataCache.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataCache.java
index a4be0f8..89b75d7 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataCache.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataCache.java
@@ -21,16 +21,17 @@
import java.util.Map;
import edu.uci.ics.asterix.common.functions.FunctionSignature;
-import edu.uci.ics.asterix.external.feed.lifecycle.FeedId;
import edu.uci.ics.asterix.metadata.api.IMetadataEntity;
import edu.uci.ics.asterix.metadata.entities.Dataset;
import edu.uci.ics.asterix.metadata.entities.DatasourceAdapter;
import edu.uci.ics.asterix.metadata.entities.Datatype;
import edu.uci.ics.asterix.metadata.entities.Dataverse;
import edu.uci.ics.asterix.metadata.entities.FeedActivity;
+import edu.uci.ics.asterix.metadata.entities.FeedPolicy;
import edu.uci.ics.asterix.metadata.entities.Function;
import edu.uci.ics.asterix.metadata.entities.Index;
import edu.uci.ics.asterix.metadata.entities.NodeGroup;
+import edu.uci.ics.asterix.metadata.feeds.FeedId;
/**
* Caches metadata entities such that the MetadataManager does not have to
@@ -56,6 +57,9 @@
// Key is FeedId
protected final Map<FeedId, FeedActivity> feedActivity = new HashMap<FeedId, FeedActivity>();
+ // Key is DataverseName, Key of the value map is the Policy name
+ protected final Map<String, Map<String, FeedPolicy>> feedPolicies = new HashMap<String, Map<String, FeedPolicy>>();
+
// Atomically executes all metadata operations in ctx's log.
public void commit(MetadataTransactionContext ctx) {
// Forward roll the operations written in ctx's log.
@@ -395,6 +399,32 @@
}
}
+ public Object addFeedPolicyIfNotExists(FeedPolicy feedPolicy) {
+ synchronized (feedPolicy) {
+ Map<String, FeedPolicy> p = feedPolicies.get(feedPolicy.getDataverseName());
+ if (p == null) {
+ p = new HashMap<String, FeedPolicy>();
+ p.put(feedPolicy.getPolicyName(), feedPolicy);
+ feedPolicies.put(feedPolicy.getDataverseName(), p);
+ } else {
+ if (p.get(feedPolicy.getPolicyName()) == null) {
+ p.put(feedPolicy.getPolicyName(), feedPolicy);
+ }
+ }
+ return null;
+ }
+ }
+
+ public Object dropFeedPolicy(FeedPolicy feedPolicy) {
+ synchronized (feedPolicies) {
+ Map<String, FeedPolicy> p = feedPolicies.get(feedPolicy.getDataverseName());
+ if (p != null && p.get(feedPolicy.getPolicyName()) != null) {
+ return p.remove(feedPolicy).getPolicyName();
+ }
+ return null;
+ }
+ }
+
public Object addAdapterIfNotExists(DatasourceAdapter adapter) {
synchronized (adapters) {
DatasourceAdapter adapterObject = adapters.get(adapter.getAdapterIdentifier().getNamespace()).get(
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataException.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataException.java
index fe613da..441816d 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataException.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataException.java
@@ -17,6 +17,7 @@
import edu.uci.ics.asterix.common.exceptions.AsterixException;
+
public class MetadataException extends AsterixException {
private static final long serialVersionUID = 1L;
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataManager.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataManager.java
index 73df2a8..6362cfd 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataManager.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataManager.java
@@ -22,7 +22,6 @@
import edu.uci.ics.asterix.common.config.AsterixMetadataProperties;
import edu.uci.ics.asterix.common.functions.FunctionSignature;
-import edu.uci.ics.asterix.external.feed.lifecycle.FeedId;
import edu.uci.ics.asterix.metadata.api.IAsterixStateProxy;
import edu.uci.ics.asterix.metadata.api.IMetadataManager;
import edu.uci.ics.asterix.metadata.api.IMetadataNode;
@@ -31,10 +30,12 @@
import edu.uci.ics.asterix.metadata.entities.Datatype;
import edu.uci.ics.asterix.metadata.entities.Dataverse;
import edu.uci.ics.asterix.metadata.entities.FeedActivity;
+import edu.uci.ics.asterix.metadata.entities.FeedPolicy;
import edu.uci.ics.asterix.metadata.entities.Function;
import edu.uci.ics.asterix.metadata.entities.Index;
import edu.uci.ics.asterix.metadata.entities.Node;
import edu.uci.ics.asterix.metadata.entities.NodeGroup;
+import edu.uci.ics.asterix.metadata.feeds.FeedId;
import edu.uci.ics.asterix.transaction.management.exception.ACIDException;
import edu.uci.ics.asterix.transaction.management.service.transaction.JobId;
import edu.uci.ics.asterix.transaction.management.service.transaction.JobIdFactory;
@@ -543,6 +544,16 @@
}
@Override
+ public void addFeedPolicy(MetadataTransactionContext mdTxnCtx, FeedPolicy feedPolicy) throws MetadataException {
+ try {
+ metadataNode.addFeedPolicy(mdTxnCtx.getJobId(), feedPolicy);
+ } catch (RemoteException e) {
+ throw new MetadataException(e);
+ }
+ mdTxnCtx.addFeedPolicy(feedPolicy);
+ }
+
+ @Override
public void initializeDatasetIdFactory(MetadataTransactionContext ctx) throws MetadataException {
try {
metadataNode.initializeDatasetIdFactory(ctx.getJobId());
@@ -640,4 +651,17 @@
metadataLatch.readLock().unlock();
}
+ @Override
+ public FeedPolicy getFeedPolicy(MetadataTransactionContext ctx, String dataverse, String policyName)
+ throws MetadataException {
+
+ FeedPolicy FeedPolicy = null;
+ try {
+ FeedPolicy = metadataNode.getFeedPolicy(ctx.getJobId(), dataverse, policyName);
+ } catch (RemoteException e) {
+ throw new MetadataException(e);
+ }
+ return FeedPolicy;
+ }
+
}
\ No newline at end of file
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java
index d363d01..cad1e33 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java
@@ -25,8 +25,6 @@
import edu.uci.ics.asterix.common.context.AsterixAppRuntimeContext;
import edu.uci.ics.asterix.common.exceptions.AsterixException;
import edu.uci.ics.asterix.common.functions.FunctionSignature;
-import edu.uci.ics.asterix.external.feed.lifecycle.FeedActivityIdFactory;
-import edu.uci.ics.asterix.external.feed.lifecycle.FeedId;
import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
import edu.uci.ics.asterix.metadata.api.IMetadataIndex;
import edu.uci.ics.asterix.metadata.api.IMetadataNode;
@@ -38,6 +36,7 @@
import edu.uci.ics.asterix.metadata.entities.Datatype;
import edu.uci.ics.asterix.metadata.entities.Dataverse;
import edu.uci.ics.asterix.metadata.entities.FeedActivity;
+import edu.uci.ics.asterix.metadata.entities.FeedPolicy;
import edu.uci.ics.asterix.metadata.entities.Function;
import edu.uci.ics.asterix.metadata.entities.Index;
import edu.uci.ics.asterix.metadata.entities.InternalDatasetDetails;
@@ -48,10 +47,13 @@
import edu.uci.ics.asterix.metadata.entitytupletranslators.DatatypeTupleTranslator;
import edu.uci.ics.asterix.metadata.entitytupletranslators.DataverseTupleTranslator;
import edu.uci.ics.asterix.metadata.entitytupletranslators.FeedActivityTupleTranslator;
+import edu.uci.ics.asterix.metadata.entitytupletranslators.FeedPolicyTupleTranslator;
import edu.uci.ics.asterix.metadata.entitytupletranslators.FunctionTupleTranslator;
import edu.uci.ics.asterix.metadata.entitytupletranslators.IndexTupleTranslator;
import edu.uci.ics.asterix.metadata.entitytupletranslators.NodeGroupTupleTranslator;
import edu.uci.ics.asterix.metadata.entitytupletranslators.NodeTupleTranslator;
+import edu.uci.ics.asterix.metadata.feeds.FeedActivityIdFactory;
+import edu.uci.ics.asterix.metadata.feeds.FeedId;
import edu.uci.ics.asterix.metadata.valueextractors.DatasetNameValueExtractor;
import edu.uci.ics.asterix.metadata.valueextractors.DatatypeNameValueExtractor;
import edu.uci.ics.asterix.metadata.valueextractors.MetadataEntityValueExtractor;
@@ -770,6 +772,7 @@
}
return results.get(0);
} catch (Exception e) {
+ e.printStackTrace();
throw new MetadataException(e);
}
}
@@ -1189,4 +1192,40 @@
}
}
+
+ @Override
+ public void addFeedPolicy(JobId jobId, FeedPolicy feedPolicy) throws MetadataException, RemoteException {
+ try {
+ // Insert into the 'FeedPolicy' dataset.
+ FeedPolicyTupleTranslator tupleReaderWriter = new FeedPolicyTupleTranslator(true);
+ ITupleReference feedPolicyTuple = tupleReaderWriter.getTupleFromMetadataEntity(feedPolicy);
+ insertTupleIntoIndex(jobId, MetadataPrimaryIndexes.FEED_POLICY_DATASET, feedPolicyTuple);
+
+ } catch (BTreeDuplicateKeyException e) {
+ throw new MetadataException("A feed policy with this name " + feedPolicy.getPolicyName()
+ + " already exists in dataverse '" + feedPolicy.getPolicyName() + "'.", e);
+ } catch (Exception e) {
+ throw new MetadataException(e);
+ }
+
+ }
+
+ @Override
+ public FeedPolicy getFeedPolicy(JobId jobId, String dataverse, String policyName) throws MetadataException,
+ RemoteException {
+
+ try {
+ ITupleReference searchKey = createTuple(dataverse, policyName);
+ FeedPolicyTupleTranslator tupleReaderWriter = new FeedPolicyTupleTranslator(false);
+ List<FeedPolicy> results = new ArrayList<FeedPolicy>();
+ IValueExtractor<FeedPolicy> valueExtractor = new MetadataEntityValueExtractor<FeedPolicy>(tupleReaderWriter);
+ searchIndex(jobId, MetadataPrimaryIndexes.FEED_POLICY_DATASET, searchKey, valueExtractor, results);
+ if (!results.isEmpty()) {
+ return results.get(0);
+ }
+ return null;
+ } catch (Exception e) {
+ throw new MetadataException(e);
+ }
+ }
}
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataTransactionContext.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataTransactionContext.java
index cbd37d6..8bd7056 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataTransactionContext.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataTransactionContext.java
@@ -18,15 +18,16 @@
import java.util.ArrayList;
import edu.uci.ics.asterix.common.functions.FunctionSignature;
-import edu.uci.ics.asterix.external.dataset.adapter.AdapterIdentifier;
import edu.uci.ics.asterix.metadata.api.IMetadataEntity;
import edu.uci.ics.asterix.metadata.entities.Dataset;
import edu.uci.ics.asterix.metadata.entities.DatasourceAdapter;
import edu.uci.ics.asterix.metadata.entities.Datatype;
import edu.uci.ics.asterix.metadata.entities.Dataverse;
+import edu.uci.ics.asterix.metadata.entities.FeedPolicy;
import edu.uci.ics.asterix.metadata.entities.Function;
import edu.uci.ics.asterix.metadata.entities.Index;
import edu.uci.ics.asterix.metadata.entities.NodeGroup;
+import edu.uci.ics.asterix.metadata.feeds.AdapterIdentifier;
import edu.uci.ics.asterix.transaction.management.service.transaction.JobId;
/**
@@ -200,4 +201,10 @@
droppedCache.clear();
opLog.clear();
}
+
+ public void addFeedPolicy(FeedPolicy feedPolicy) {
+ droppedCache.dropFeedPolicy(feedPolicy);
+ logAndApply(new MetadataLogicalOperation(feedPolicy, true));
+
+ }
}
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataIndex.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataIndex.java
index ca6ab08..9870df2 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataIndex.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataIndex.java
@@ -18,15 +18,12 @@
import java.util.List;
import edu.uci.ics.asterix.om.types.ARecordType;
-import edu.uci.ics.asterix.transaction.management.exception.ACIDException;
-import edu.uci.ics.asterix.transaction.management.service.logging.IndexLogger;
import edu.uci.ics.asterix.transaction.management.service.transaction.DatasetId;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.io.FileReference;
-import edu.uci.ics.hyracks.storage.am.common.api.IIndex;
/**
* Descriptor interface for a primary or secondary index on metadata datasets.
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataManager.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataManager.java
index 5eff5b3..59248eb 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataManager.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataManager.java
@@ -19,7 +19,6 @@
import java.util.List;
import edu.uci.ics.asterix.common.functions.FunctionSignature;
-import edu.uci.ics.asterix.external.feed.lifecycle.FeedId;
import edu.uci.ics.asterix.metadata.MetadataException;
import edu.uci.ics.asterix.metadata.MetadataTransactionContext;
import edu.uci.ics.asterix.metadata.entities.Dataset;
@@ -27,12 +26,13 @@
import edu.uci.ics.asterix.metadata.entities.Datatype;
import edu.uci.ics.asterix.metadata.entities.Dataverse;
import edu.uci.ics.asterix.metadata.entities.FeedActivity;
+import edu.uci.ics.asterix.metadata.entities.FeedPolicy;
import edu.uci.ics.asterix.metadata.entities.Function;
import edu.uci.ics.asterix.metadata.entities.Index;
import edu.uci.ics.asterix.metadata.entities.Node;
import edu.uci.ics.asterix.metadata.entities.NodeGroup;
+import edu.uci.ics.asterix.metadata.feeds.FeedId;
import edu.uci.ics.asterix.transaction.management.exception.ACIDException;
-import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
/**
* A metadata manager provides user access to Asterix metadata (e.g., types,
@@ -462,6 +462,16 @@
public FeedActivity getRecentFeedActivity(MetadataTransactionContext ctx, String dataverseName, String datasetName)
throws MetadataException;
+ /**
+ * @param ctx
+ * @param policy
+ * @throws MetadataException
+ */
+ public void addFeedPolicy(MetadataTransactionContext ctx, FeedPolicy policy) throws MetadataException;
+
+ public FeedPolicy getFeedPolicy(MetadataTransactionContext ctx, String dataverse, String policyName)
+ throws MetadataException;
+
public void initializeDatasetIdFactory(MetadataTransactionContext ctx) throws MetadataException;
public int getMostRecentDatasetId() throws MetadataException;
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataNode.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataNode.java
index e58a2f0..b524acc 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataNode.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataNode.java
@@ -21,17 +21,18 @@
import java.util.List;
import edu.uci.ics.asterix.common.functions.FunctionSignature;
-import edu.uci.ics.asterix.external.feed.lifecycle.FeedId;
import edu.uci.ics.asterix.metadata.MetadataException;
-import edu.uci.ics.asterix.metadata.entities.DatasourceAdapter;
import edu.uci.ics.asterix.metadata.entities.Dataset;
+import edu.uci.ics.asterix.metadata.entities.DatasourceAdapter;
import edu.uci.ics.asterix.metadata.entities.Datatype;
import edu.uci.ics.asterix.metadata.entities.Dataverse;
import edu.uci.ics.asterix.metadata.entities.FeedActivity;
+import edu.uci.ics.asterix.metadata.entities.FeedPolicy;
import edu.uci.ics.asterix.metadata.entities.Function;
import edu.uci.ics.asterix.metadata.entities.Index;
import edu.uci.ics.asterix.metadata.entities.Node;
import edu.uci.ics.asterix.metadata.entities.NodeGroup;
+import edu.uci.ics.asterix.metadata.feeds.FeedId;
import edu.uci.ics.asterix.transaction.management.exception.ACIDException;
import edu.uci.ics.asterix.transaction.management.service.transaction.JobId;
@@ -504,4 +505,23 @@
public void registerFeedActivity(JobId jobId, FeedId feedId, FeedActivity feedActivity) throws MetadataException,
RemoteException;
+ /**
+ * @param jobId
+ * @param feedPolicy
+ * @throws MetadataException
+ * @throws RemoteException
+ */
+ public void addFeedPolicy(JobId jobId, FeedPolicy feedPolicy) throws MetadataException, RemoteException;
+
+ /**
+ * @param jobId
+ * @param dataverse
+ * @param policy
+ * @return
+ * @throws MetadataException
+ * @throws RemoteException
+ */
+ public FeedPolicy getFeedPolicy(JobId jobId, String dataverse, String policy) throws MetadataException,
+ RemoteException;
+
}
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java
index e6eb4fe..faac080 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java
@@ -33,8 +33,6 @@
import edu.uci.ics.asterix.common.config.IAsterixPropertiesProvider;
import edu.uci.ics.asterix.common.context.AsterixAppRuntimeContext;
import edu.uci.ics.asterix.common.context.AsterixRuntimeComponentsProvider;
-import edu.uci.ics.asterix.external.adapter.factory.IAdapterFactory;
-import edu.uci.ics.asterix.external.dataset.adapter.AdapterIdentifier;
import edu.uci.ics.asterix.metadata.IDatasetDetails;
import edu.uci.ics.asterix.metadata.MetadataException;
import edu.uci.ics.asterix.metadata.MetadataManager;
@@ -46,12 +44,16 @@
import edu.uci.ics.asterix.metadata.entities.DatasourceAdapter;
import edu.uci.ics.asterix.metadata.entities.Datatype;
import edu.uci.ics.asterix.metadata.entities.Dataverse;
+import edu.uci.ics.asterix.metadata.entities.FeedPolicy;
import edu.uci.ics.asterix.metadata.entities.Index;
import edu.uci.ics.asterix.metadata.entities.InternalDatasetDetails;
import edu.uci.ics.asterix.metadata.entities.InternalDatasetDetails.FileStructure;
import edu.uci.ics.asterix.metadata.entities.InternalDatasetDetails.PartitioningStrategy;
import edu.uci.ics.asterix.metadata.entities.Node;
import edu.uci.ics.asterix.metadata.entities.NodeGroup;
+import edu.uci.ics.asterix.metadata.feeds.AdapterIdentifier;
+import edu.uci.ics.asterix.metadata.feeds.BuiltinFeedPolicies;
+import edu.uci.ics.asterix.metadata.feeds.IAdapterFactory;
import edu.uci.ics.asterix.om.types.BuiltinType;
import edu.uci.ics.asterix.om.types.IAType;
import edu.uci.ics.asterix.runtime.formats.NonTaggedDataFormat;
@@ -186,6 +188,7 @@
insertNodes(mdTxnCtx);
insertInitialGroups(mdTxnCtx);
insertInitialAdapters(mdTxnCtx);
+ insertInitialFeedPolicies(mdTxnCtx);
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("Finished creating metadata B-trees.");
@@ -327,6 +330,12 @@
}
}
+ private static void insertInitialFeedPolicies(MetadataTransactionContext mdTxnCtx) throws Exception {
+ for (FeedPolicy feedPolicy : BuiltinFeedPolicies.policies) {
+ MetadataManager.INSTANCE.addFeedPolicy(mdTxnCtx, feedPolicy);
+ }
+ }
+
private static DatasourceAdapter getAdapter(String adapterFactoryClassName) throws Exception {
String adapterName = ((IAdapterFactory) (Class.forName(adapterFactoryClassName).newInstance())).getName();
return new DatasourceAdapter(new AdapterIdentifier(MetadataConstants.METADATA_DATAVERSE_NAME, adapterName),
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataIndex.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataIndex.java
index c0ce030..db295d4 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataIndex.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataIndex.java
@@ -27,17 +27,13 @@
import edu.uci.ics.asterix.metadata.api.IMetadataIndex;
import edu.uci.ics.asterix.om.types.ARecordType;
import edu.uci.ics.asterix.om.types.IAType;
-import edu.uci.ics.asterix.transaction.management.exception.ACIDException;
-import edu.uci.ics.asterix.transaction.management.service.logging.IndexLogger;
import edu.uci.ics.asterix.transaction.management.service.transaction.DatasetId;
-import edu.uci.ics.asterix.transaction.management.service.transaction.IResourceManager.ResourceType;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.io.FileReference;
-import edu.uci.ics.hyracks.storage.am.common.api.IIndex;
/**
* Descriptor for a primary or secondary index on metadata datasets.
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataRecordTypes.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataRecordTypes.java
index 631be89..f922169 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataRecordTypes.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataRecordTypes.java
@@ -49,6 +49,8 @@
public static ARecordType DATASOURCE_ADAPTER_RECORDTYPE;
public static ARecordType FEED_ACTIVITY_RECORDTYPE;
public static ARecordType FEED_POLICY_RECORDTYPE;
+ public static ARecordType POLICY_PARAMS_RECORDTYPE;
+ public static ARecordType FEED_ACTIVITY_DETAILS_RECORDTYPE;
/**
* Create all metadata record types.
@@ -58,6 +60,7 @@
// depend on other types being created first.
// These calls are one "dependency chain".
try {
+ POLICY_PARAMS_RECORDTYPE = createPropertiesRecordType();
DATASOURCE_ADAPTER_PROPERTIES_RECORDTYPE = createPropertiesRecordType();
INTERNAL_DETAILS_RECORDTYPE = createInternalDetailsRecordType();
EXTERNAL_DETAILS_RECORDTYPE = createExternalDetailsRecordType();
@@ -78,6 +81,7 @@
NODEGROUP_RECORDTYPE = createNodeGroupRecordType();
FUNCTION_RECORDTYPE = createFunctionRecordType();
DATASOURCE_ADAPTER_RECORDTYPE = createDatasourceAdapterRecordType();
+ FEED_ACTIVITY_DETAILS_RECORDTYPE = createPropertiesRecordType();
FEED_ACTIVITY_RECORDTYPE = createFeedActivityRecordType();
FEED_POLICY_RECORDTYPE = createFeedPolicyRecordType();
} catch (AsterixException e) {
@@ -91,12 +95,10 @@
public static final int FEED_POLICY_ARECORD_PROPERTIES_FIELD_INDEX = 3;
private static ARecordType createFeedPolicyRecordType() throws AsterixException {
- IAType propertiesRecordType = createPropertiesRecordType();
- AUnorderedListType listPropertiesType = new AUnorderedListType(propertiesRecordType, null);
-
+ AUnorderedListType listPropertiesType = new AUnorderedListType(POLICY_PARAMS_RECORDTYPE, null);
String[] fieldNames = { "DataverseName", "PolicyName", "Description", "Properties" };
IAType[] fieldTypes = { BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING, listPropertiesType };
- return new ARecordType(null, fieldNames, fieldTypes, true);
+ return new ARecordType("FeedPolicyRecordType", fieldNames, fieldTypes, true);
}
// Helper constants for accessing fields in an ARecord of type
@@ -115,8 +117,8 @@
// Helper constants for accessing fields in an ARecord of anonymous type
// dataset properties.
// Used for dataset hints or dataset adapter properties.
- public static final int DATASOURCE_PROPERTIES_NAME_FIELD_INDEX = 0;
- public static final int DATASOURCE_PROPERTIES_VALUE_FIELD_INDEX = 1;
+ public static final int PROPERTIES_NAME_FIELD_INDEX = 0;
+ public static final int PROPERTIES_VALUE_FIELD_INDEX = 1;
private static final ARecordType createPropertiesRecordType() throws AsterixException {
String[] fieldNames = { "Name", "Value" };
@@ -385,16 +387,15 @@
public static final int FEED_ACTIVITY_ARECORD_DATASET_NAME_FIELD_INDEX = 1;
public static final int FEED_ACTIVITY_ARECORD_ACTIVITY_ID_FIELD_INDEX = 2;
public static final int FEED_ACTIVITY_ARECORD_ACTIVITY_TYPE_FIELD_INDEX = 3;
- public static final int FEED_ACTIVITY_ARECORD_INGEST_NODES_FIELD_INDEX = 4;
- public static final int FEED_ACTIVITY_ARECORD_COMPUTE_NODES_FIELD_INDEX = 5;
- public static final int FEED_ACTIVITY_ARECORD_LAST_UPDATE_TIMESTAMP_FIELD_INDEX = 6;
+ public static final int FEED_ACTIVITY_ARECORD_LAST_UPDATE_TIMESTAMP_FIELD_INDEX = 4;
+ public static final int FEED_ACTIVITY_ARECORD_DETAILS_FIELD_INDEX = 5;
private static ARecordType createFeedActivityRecordType() throws AsterixException {
- AUnorderedListType unorderedListType = new AUnorderedListType(BuiltinType.ASTRING, null);
- String[] fieldNames = { "DataverseName", "DatasetName", "ActivityId", "ActivityType", "IngestNodes",
- "ComputeNodes", "UpdateTimestamp" };
+ AUnorderedListType unorderedPropertyListType = new AUnorderedListType(FEED_ACTIVITY_DETAILS_RECORDTYPE, null);
+ String[] fieldNames = { "DataverseName", "DatasetName", "ActivityId", "ActivityType", "UpdateTimestamp",
+ "Details" };
IAType[] fieldTypes = { BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.AINT32, BuiltinType.ASTRING,
- unorderedListType, unorderedListType, BuiltinType.ASTRING };
+ BuiltinType.ASTRING, unorderedPropertyListType };
return new ARecordType("FeedActivityRecordType", fieldNames, fieldTypes, true);
}
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/dataset/hints/IHint.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/dataset/hints/IHint.java
index 8957ca7..107eb53 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/dataset/hints/IHint.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/dataset/hints/IHint.java
@@ -16,6 +16,7 @@
import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
+
/**
* Represents a hint provided as part of an AQL statement.
*/
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlDataSource.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlDataSource.java
index 0ed3c78..e3e4cb7 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlDataSource.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlDataSource.java
@@ -16,8 +16,11 @@
package edu.uci.ics.asterix.metadata.declared;
import java.io.IOException;
+import java.io.Serializable;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import edu.uci.ics.asterix.common.config.DatasetConfig.DatasetType;
@@ -51,6 +54,7 @@
private IAType[] schemaTypes;
private INodeDomain domain;
private AqlDataSourceType datasourceType;
+ private Map<String, Serializable> properties = new HashMap<String, Serializable>();
public enum AqlDataSourceType {
INTERNAL,
@@ -268,4 +272,12 @@
return datasourceType;
}
+ public Map<String, Serializable> getProperties() {
+ return properties;
+ }
+
+ public void setProperties(Map<String, Serializable> properties) {
+ this.properties = properties;
+ }
+
}
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
index b98e2d7..80e1b9f 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
@@ -33,16 +33,6 @@
import edu.uci.ics.asterix.common.dataflow.IAsterixApplicationContextInfo;
import edu.uci.ics.asterix.common.parse.IParseFileSplitsDecl;
import edu.uci.ics.asterix.dataflow.data.nontagged.valueproviders.AqlPrimitiveValueProviderFactory;
-import edu.uci.ics.asterix.external.adapter.factory.IAdapterFactory;
-import edu.uci.ics.asterix.external.adapter.factory.IGenericDatasetAdapterFactory;
-import edu.uci.ics.asterix.external.adapter.factory.ITypedDatasetAdapterFactory;
-import edu.uci.ics.asterix.external.data.operator.ExternalDataScanOperatorDescriptor;
-import edu.uci.ics.asterix.external.data.operator.FeedIntakeOperatorDescriptor;
-import edu.uci.ics.asterix.external.data.operator.FeedMessageOperatorDescriptor;
-import edu.uci.ics.asterix.external.dataset.adapter.IDatasourceAdapter;
-import edu.uci.ics.asterix.external.dataset.adapter.ITypedDatasourceAdapter;
-import edu.uci.ics.asterix.external.feed.lifecycle.FeedId;
-import edu.uci.ics.asterix.external.feed.lifecycle.IFeedMessage;
import edu.uci.ics.asterix.formats.base.IDataFormat;
import edu.uci.ics.asterix.formats.nontagged.AqlBinaryComparatorFactoryProvider;
import edu.uci.ics.asterix.formats.nontagged.AqlTypeTraitProvider;
@@ -57,9 +47,22 @@
import edu.uci.ics.asterix.metadata.entities.Dataverse;
import edu.uci.ics.asterix.metadata.entities.ExternalDatasetDetails;
import edu.uci.ics.asterix.metadata.entities.FeedActivity;
+import edu.uci.ics.asterix.metadata.entities.FeedActivity.FeedActivityDetails;
import edu.uci.ics.asterix.metadata.entities.FeedDatasetDetails;
+import edu.uci.ics.asterix.metadata.entities.FeedPolicy;
import edu.uci.ics.asterix.metadata.entities.Index;
import edu.uci.ics.asterix.metadata.entities.InternalDatasetDetails;
+import edu.uci.ics.asterix.metadata.feeds.BuiltinFeedPolicies;
+import edu.uci.ics.asterix.metadata.feeds.ExternalDataScanOperatorDescriptor;
+import edu.uci.ics.asterix.metadata.feeds.FeedId;
+import edu.uci.ics.asterix.metadata.feeds.FeedIntakeOperatorDescriptor;
+import edu.uci.ics.asterix.metadata.feeds.FeedMessageOperatorDescriptor;
+import edu.uci.ics.asterix.metadata.feeds.IAdapterFactory;
+import edu.uci.ics.asterix.metadata.feeds.IDatasourceAdapter;
+import edu.uci.ics.asterix.metadata.feeds.IFeedMessage;
+import edu.uci.ics.asterix.metadata.feeds.IGenericDatasetAdapterFactory;
+import edu.uci.ics.asterix.metadata.feeds.ITypedDatasetAdapterFactory;
+import edu.uci.ics.asterix.metadata.feeds.ITypedDatasourceAdapter;
import edu.uci.ics.asterix.metadata.utils.DatasetUtils;
import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
import edu.uci.ics.asterix.om.types.ARecordType;
@@ -309,7 +312,7 @@
IAType itemType = MetadataManager.INSTANCE.getDatatype(mdTxnCtx, dataset.getDataverseName(), itemTypeName)
.getDatatype();
if (dataSource instanceof ExternalFeedDataSource) {
- return buildFeedIntakeRuntime(jobSpec, dataset);
+ return buildFeedIntakeRuntime(jobSpec, dataset, dataSource);
} else {
return buildExternalDataScannerRuntime(jobSpec, itemType,
(ExternalDatasetDetails) dataset.getDatasetDetails(), NonTaggedDataFormat.INSTANCE);
@@ -399,7 +402,7 @@
@SuppressWarnings("rawtypes")
public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildFeedIntakeRuntime(JobSpecification jobSpec,
- Dataset dataset) throws AlgebricksException {
+ Dataset dataset, IDataSource<AqlSourceId> dataSource) throws AlgebricksException {
FeedDatasetDetails datasetDetails = (FeedDatasetDetails) dataset.getDatasetDetails();
DatasourceAdapter adapterEntity;
@@ -449,9 +452,11 @@
.getSerializerDeserializer(adapterOutputType);
RecordDescriptor feedDesc = new RecordDescriptor(new ISerializerDeserializer[] { payloadSerde });
+ FeedPolicy feedPolicy = (FeedPolicy) ((AqlDataSource) dataSource).getProperties().get(
+ BuiltinFeedPolicies.CONFIG_FEED_POLICY_KEY);
FeedIntakeOperatorDescriptor feedIngestor = new FeedIntakeOperatorDescriptor(jobSpec, new FeedId(
dataset.getDataverseName(), dataset.getDatasetName()), adapterFactoryClassname,
- datasetDetails.getProperties(), (ARecordType) adapterOutputType, feedDesc);
+ datasetDetails.getProperties(), (ARecordType) adapterOutputType, feedDesc, feedPolicy.getProperties());
AlgebricksPartitionConstraint constraint = null;
try {
@@ -467,7 +472,7 @@
String dataverse, String dataset, List<IFeedMessage> feedMessages, FeedActivity feedActivity)
throws AlgebricksException {
AlgebricksPartitionConstraint partitionConstraint = new AlgebricksAbsolutePartitionConstraint(feedActivity
- .getIngestNodes().toArray(new String[] {}));
+ .getFeedActivityDetails().get(FeedActivityDetails.INGEST_LOCATIONS).split(","));
FeedMessageOperatorDescriptor feedMessenger = new FeedMessageOperatorDescriptor(jobSpec, dataverse, dataset,
feedMessages);
return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(feedMessenger, partitionConstraint);
@@ -1455,6 +1460,14 @@
return type.getDatatype();
}
+ public FeedPolicy findFeedPolicy(String dataverse, String policyName) throws AlgebricksException {
+ try {
+ return MetadataManager.INSTANCE.getFeedPolicy(mdTxnCtx, dataverse, policyName);
+ } catch (MetadataException e) {
+ throw new AlgebricksException(e);
+ }
+ }
+
public List<Index> getDatasetIndexes(String dataverseName, String datasetName) throws AlgebricksException {
try {
return MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseName, datasetName);
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/FileSplitDataSink.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/FileSplitDataSink.java
index 586be9c..777b335 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/FileSplitDataSink.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/FileSplitDataSink.java
@@ -18,6 +18,7 @@
import edu.uci.ics.hyracks.algebricks.core.algebra.metadata.IDataSink;
import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPartitioningProperty;
+
public class FileSplitDataSink implements IDataSink {
private FileSplitSinkId id;
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/FileSplitSinkId.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/FileSplitSinkId.java
index 00625fe..59aa62b 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/FileSplitSinkId.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/FileSplitSinkId.java
@@ -17,6 +17,7 @@
import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
+
public class FileSplitSinkId {
private FileSplit fileSplit;
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/ResultSetDataSink.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/ResultSetDataSink.java
index ad91111..93faef5 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/ResultSetDataSink.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/ResultSetDataSink.java
@@ -20,6 +20,7 @@
import edu.uci.ics.hyracks.algebricks.core.algebra.properties.RandomPartitioningProperty;
import edu.uci.ics.hyracks.algebricks.core.algebra.properties.ResultSetDomain;
+
public class ResultSetDataSink implements IDataSink {
private ResultSetSinkId id;
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/ResultSetSinkId.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/ResultSetSinkId.java
index 1eb4336..6db14da 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/ResultSetSinkId.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/ResultSetSinkId.java
@@ -16,6 +16,7 @@
import edu.uci.ics.hyracks.api.dataset.ResultSetId;
+
public class ResultSetSinkId {
private final ResultSetId resultSetId;
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/DatasourceAdapter.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/DatasourceAdapter.java
index 2955a08..6f63199 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/DatasourceAdapter.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/DatasourceAdapter.java
@@ -14,9 +14,9 @@
*/
package edu.uci.ics.asterix.metadata.entities;
-import edu.uci.ics.asterix.external.dataset.adapter.AdapterIdentifier;
import edu.uci.ics.asterix.metadata.MetadataCache;
import edu.uci.ics.asterix.metadata.api.IMetadataEntity;
+import edu.uci.ics.asterix.metadata.feeds.AdapterIdentifier;
public class DatasourceAdapter implements IMetadataEntity {
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/FeedActivity.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/FeedActivity.java
index eb2673f..72a1f03 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/FeedActivity.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/FeedActivity.java
@@ -16,6 +16,7 @@
package edu.uci.ics.asterix.metadata.entities;
import java.util.List;
+import java.util.Map;
import edu.uci.ics.asterix.metadata.MetadataCache;
import edu.uci.ics.asterix.metadata.api.IMetadataEntity;
@@ -33,10 +34,9 @@
// Enforced to be unique within a dataverse.
private final String datasetName;
- private List<String> ingestNodes;
- private List<String> computeNodes;
private String lastUpdatedTimestamp;
private FeedActivityType activityType;
+ private Map<String, String> feedActivityDetails;
public static enum FeedActivityType {
FEED_BEGIN,
@@ -47,13 +47,22 @@
FEED_SHRINK
}
+ public static class FeedActivityDetails {
+ public static final String COMPUTE_LOCATIONS = "compute-locations";
+ public static final String INGEST_LOCATIONS = "ingest-locations";
+ public static final String TOTAL_INGESTED = "total-ingested";
+ public static final String INGESTION_RATE = "ingestion-rate";
+ public static final String EXCEPTION_LOCATION = "exception-location";
+ public static final String EXCEPTION_MESSAGE = "exception-message";
+
+ }
+
public FeedActivity(String dataverseName, String datasetName, FeedActivityType feedActivityType,
- List<String> ingestNodes, List<String> computeNodes) {
+ Map<String, String> feedActivityDetails) {
this.dataverseName = dataverseName;
this.datasetName = datasetName;
this.activityType = feedActivityType;
- this.ingestNodes = ingestNodes;
- this.computeNodes = computeNodes;
+ this.feedActivityDetails = feedActivityDetails;
}
public String getDataverseName() {
@@ -100,22 +109,6 @@
this.activityType = feedActivityType;
}
- public List<String> getIngestNodes() {
- return ingestNodes;
- }
-
- public void setIngestNodes(List<String> ingestNodes) {
- this.ingestNodes = ingestNodes;
- }
-
- public List<String> getComputeNodes() {
- return computeNodes;
- }
-
- public void setComputeNodes(List<String> computeNodes) {
- this.computeNodes = computeNodes;
- }
-
public String getLastUpdatedTimestamp() {
return lastUpdatedTimestamp;
}
@@ -132,8 +125,25 @@
this.activityId = activityId;
}
+ public Map<String, String> getFeedActivityDetails() {
+ return feedActivityDetails;
+ }
+
+ public void setFeedActivityDetails(Map<String, String> feedActivityDetails) {
+ this.feedActivityDetails = feedActivityDetails;
+ }
+
+ public FeedActivityType getActivityType() {
+ return activityType;
+ }
+
+ public void setActivityType(FeedActivityType activityType) {
+ this.activityType = activityType;
+ }
+
@Override
public int compareTo(FeedActivity o) {
return this.activityId - o.getActivityId();
}
+
}
\ No newline at end of file
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/FeedPolicy.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/FeedPolicy.java
index f796cb7..b011e5c 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/FeedPolicy.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/FeedPolicy.java
@@ -30,7 +30,7 @@
private final String dataverseName;
// Enforced to be unique within a dataverse.
private final String policyName;
- // A descriptiokn of the policy
+ // A description of the policy
private final String description;
// The policy properties associated with the feed dataset
private Map<String, String> properties;
@@ -39,6 +39,7 @@
this.dataverseName = dataverseName;
this.policyName = policyName;
this.description = description;
+ this.properties = properties;
}
public String getDataverseName() {
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/DatasetTupleTranslator.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/DatasetTupleTranslator.java
index 2a68ae9..65b2d78 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/DatasetTupleTranslator.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/DatasetTupleTranslator.java
@@ -138,9 +138,9 @@
String value;
while (cursor.next()) {
ARecord field = (ARecord) cursor.get();
- key = ((AString) field.getValueByPos(MetadataRecordTypes.DATASOURCE_PROPERTIES_NAME_FIELD_INDEX))
+ key = ((AString) field.getValueByPos(MetadataRecordTypes.PROPERTIES_NAME_FIELD_INDEX))
.getStringValue();
- value = ((AString) field.getValueByPos(MetadataRecordTypes.DATASOURCE_PROPERTIES_VALUE_FIELD_INDEX))
+ value = ((AString) field.getValueByPos(MetadataRecordTypes.PROPERTIES_VALUE_FIELD_INDEX))
.getStringValue();
properties.put(key, value);
}
@@ -228,9 +228,9 @@
String value;
while (cursor.next()) {
ARecord field = (ARecord) cursor.get();
- key = ((AString) field.getValueByPos(MetadataRecordTypes.DATASOURCE_PROPERTIES_NAME_FIELD_INDEX))
+ key = ((AString) field.getValueByPos(MetadataRecordTypes.PROPERTIES_NAME_FIELD_INDEX))
.getStringValue();
- value = ((AString) field.getValueByPos(MetadataRecordTypes.DATASOURCE_PROPERTIES_VALUE_FIELD_INDEX))
+ value = ((AString) field.getValueByPos(MetadataRecordTypes.PROPERTIES_VALUE_FIELD_INDEX))
.getStringValue();
properties.put(key, value);
}
@@ -359,9 +359,9 @@
IACursor cursor = list.getCursor();
while (cursor.next()) {
ARecord field = (ARecord) cursor.get();
- key = ((AString) field.getValueByPos(MetadataRecordTypes.DATASOURCE_PROPERTIES_NAME_FIELD_INDEX))
+ key = ((AString) field.getValueByPos(MetadataRecordTypes.PROPERTIES_NAME_FIELD_INDEX))
.getStringValue();
- value = ((AString) field.getValueByPos(MetadataRecordTypes.DATASOURCE_PROPERTIES_VALUE_FIELD_INDEX))
+ value = ((AString) field.getValueByPos(MetadataRecordTypes.PROPERTIES_VALUE_FIELD_INDEX))
.getStringValue();
hints.put(key, value);
}
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/DatasourceAdapterTupleTranslator.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/DatasourceAdapterTupleTranslator.java
index 4a5e4dcf..e93fd97 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/DatasourceAdapterTupleTranslator.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/DatasourceAdapterTupleTranslator.java
@@ -22,13 +22,13 @@
import java.util.Calendar;
import edu.uci.ics.asterix.common.exceptions.AsterixException;
-import edu.uci.ics.asterix.external.dataset.adapter.AdapterIdentifier;
import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
import edu.uci.ics.asterix.metadata.MetadataException;
import edu.uci.ics.asterix.metadata.bootstrap.MetadataPrimaryIndexes;
import edu.uci.ics.asterix.metadata.bootstrap.MetadataRecordTypes;
import edu.uci.ics.asterix.metadata.entities.DatasourceAdapter;
import edu.uci.ics.asterix.metadata.entities.DatasourceAdapter.AdapterType;
+import edu.uci.ics.asterix.metadata.feeds.AdapterIdentifier;
import edu.uci.ics.asterix.om.base.ARecord;
import edu.uci.ics.asterix.om.base.AString;
import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/FeedActivityTupleTranslator.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/FeedActivityTupleTranslator.java
index e81eb3e..51678dc2 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/FeedActivityTupleTranslator.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/FeedActivityTupleTranslator.java
@@ -18,11 +18,16 @@
import java.io.ByteArrayInputStream;
import java.io.DataInput;
import java.io.DataInputStream;
+import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Calendar;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
+import edu.uci.ics.asterix.builders.IARecordBuilder;
+import edu.uci.ics.asterix.builders.RecordBuilder;
import edu.uci.ics.asterix.builders.UnorderedListBuilder;
import edu.uci.ics.asterix.common.exceptions.AsterixException;
import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
@@ -33,6 +38,7 @@
import edu.uci.ics.asterix.metadata.entities.FeedActivity.FeedActivityType;
import edu.uci.ics.asterix.om.base.AInt32;
import edu.uci.ics.asterix.om.base.AMutableInt32;
+import edu.uci.ics.asterix.om.base.AMutableString;
import edu.uci.ics.asterix.om.base.ARecord;
import edu.uci.ics.asterix.om.base.AString;
import edu.uci.ics.asterix.om.base.AUnorderedList;
@@ -40,6 +46,7 @@
import edu.uci.ics.asterix.om.types.AUnorderedListType;
import edu.uci.ics.asterix.om.types.BuiltinType;
import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
@@ -93,22 +100,20 @@
String feedActivityType = ((AString) feedActivityRecord
.getValueByPos(MetadataRecordTypes.FEED_ACTIVITY_ARECORD_ACTIVITY_TYPE_FIELD_INDEX)).getStringValue();
- List<String> ingestNodes = new ArrayList<String>();
IACursor cursor = ((AUnorderedList) feedActivityRecord
- .getValueByPos(MetadataRecordTypes.FEED_ACTIVITY_ARECORD_INGEST_NODES_FIELD_INDEX)).getCursor();
+ .getValueByPos(MetadataRecordTypes.FEED_ACTIVITY_ARECORD_DETAILS_FIELD_INDEX)).getCursor();
+ Map<String, String> activityDetails = new HashMap<String, String>();
+ String key;
+ String value;
while (cursor.next()) {
- ingestNodes.add(((AString) cursor.get()).getStringValue());
- }
-
- List<String> computeNodes = new ArrayList<String>();
- cursor = ((AUnorderedList) feedActivityRecord
- .getValueByPos(MetadataRecordTypes.FEED_ACTIVITY_ARECORD_COMPUTE_NODES_FIELD_INDEX)).getCursor();
- while (cursor.next()) {
- computeNodes.add(((AString) cursor.get()).getStringValue());
+ ARecord field = (ARecord) cursor.get();
+ key = ((AString) field.getValueByPos(MetadataRecordTypes.PROPERTIES_NAME_FIELD_INDEX)).getStringValue();
+ value = ((AString) field.getValueByPos(MetadataRecordTypes.PROPERTIES_VALUE_FIELD_INDEX)).getStringValue();
+ activityDetails.put(key, value);
}
FeedActivity fa = new FeedActivity(dataverseName, datasetName, FeedActivityType.valueOf(feedActivityType),
- ingestNodes, computeNodes);
+ activityDetails);
fa.setActivityId(activityId);
return fa;
}
@@ -159,38 +164,27 @@
recordBuilder.addField(MetadataRecordTypes.FEED_ACTIVITY_ARECORD_ACTIVITY_TYPE_FIELD_INDEX, fieldValue);
// write field 4
- UnorderedListBuilder listBuilder = new UnorderedListBuilder();
- listBuilder
- .reset((AUnorderedListType) MetadataRecordTypes.FEED_ACTIVITY_RECORDTYPE.getFieldTypes()[MetadataRecordTypes.FEED_ACTIVITY_ARECORD_INGEST_NODES_FIELD_INDEX]);
- for (String field : feedActivity.getIngestNodes()) {
- itemValue.reset();
- aString.setValue(field);
- stringSerde.serialize(aString, itemValue.getDataOutput());
- listBuilder.addItem(itemValue);
- }
- fieldValue.reset();
- listBuilder.write(fieldValue.getDataOutput(), true);
- recordBuilder.addField(MetadataRecordTypes.FEED_ACTIVITY_ARECORD_INGEST_NODES_FIELD_INDEX, fieldValue);
-
- // write field 5
- listBuilder
- .reset((AUnorderedListType) MetadataRecordTypes.FEED_ACTIVITY_RECORDTYPE.getFieldTypes()[MetadataRecordTypes.FEED_ACTIVITY_ARECORD_COMPUTE_NODES_FIELD_INDEX]);
- for (String field : feedActivity.getIngestNodes()) {
- itemValue.reset();
- aString.setValue(field);
- stringSerde.serialize(aString, itemValue.getDataOutput());
- listBuilder.addItem(itemValue);
- }
- fieldValue.reset();
- listBuilder.write(fieldValue.getDataOutput(), true);
- recordBuilder.addField(MetadataRecordTypes.FEED_ACTIVITY_ARECORD_COMPUTE_NODES_FIELD_INDEX, fieldValue);
-
- // write field 6
fieldValue.reset();
aString.setValue(Calendar.getInstance().getTime().toString());
stringSerde.serialize(aString, fieldValue.getDataOutput());
recordBuilder.addField(MetadataRecordTypes.FEED_ACTIVITY_ARECORD_LAST_UPDATE_TIMESTAMP_FIELD_INDEX, fieldValue);
+ // write field 5
+ Map<String, String> properties = feedActivity.getFeedActivityDetails();
+ UnorderedListBuilder listBuilder = new UnorderedListBuilder();
+ listBuilder
+ .reset((AUnorderedListType) MetadataRecordTypes.FEED_ACTIVITY_RECORDTYPE.getFieldTypes()[MetadataRecordTypes.FEED_ACTIVITY_ARECORD_DETAILS_FIELD_INDEX]);
+ for (Map.Entry<String, String> property : properties.entrySet()) {
+ String name = property.getKey();
+ String value = property.getValue();
+ itemValue.reset();
+ writePropertyTypeRecord(name, value, itemValue.getDataOutput());
+ listBuilder.addItem(itemValue);
+ }
+ fieldValue.reset();
+ listBuilder.write(fieldValue.getDataOutput(), true);
+ recordBuilder.addField(MetadataRecordTypes.FEED_ACTIVITY_ARECORD_DETAILS_FIELD_INDEX, fieldValue);
+
// write record
try {
recordBuilder.write(tupleBuilder.getDataOutput(), true);
@@ -203,4 +197,31 @@
return tuple;
}
+ public void writePropertyTypeRecord(String name, String value, DataOutput out) throws HyracksDataException {
+ IARecordBuilder propertyRecordBuilder = new RecordBuilder();
+ ArrayBackedValueStorage fieldValue = new ArrayBackedValueStorage();
+ propertyRecordBuilder.reset(MetadataRecordTypes.FEED_ACTIVITY_DETAILS_RECORDTYPE);
+ AMutableString aString = new AMutableString("");
+ ISerializerDeserializer<AString> stringSerde = AqlSerializerDeserializerProvider.INSTANCE
+ .getSerializerDeserializer(BuiltinType.ASTRING);
+
+ // write field 0
+ fieldValue.reset();
+ aString.setValue(name);
+ stringSerde.serialize(aString, fieldValue.getDataOutput());
+ propertyRecordBuilder.addField(0, fieldValue);
+
+ // write field 1
+ fieldValue.reset();
+ aString.setValue(value);
+ stringSerde.serialize(aString, fieldValue.getDataOutput());
+ propertyRecordBuilder.addField(1, fieldValue);
+
+ try {
+ propertyRecordBuilder.write(out, true);
+ } catch (IOException | AsterixException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+
}
\ No newline at end of file
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/FeedPolicyTupleTranslator.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/FeedPolicyTupleTranslator.java
index 05effd8..57d2d1c 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/FeedPolicyTupleTranslator.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/FeedPolicyTupleTranslator.java
@@ -18,10 +18,13 @@
import java.io.ByteArrayInputStream;
import java.io.DataInput;
import java.io.DataInputStream;
+import java.io.DataOutput;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
+import edu.uci.ics.asterix.builders.IARecordBuilder;
+import edu.uci.ics.asterix.builders.RecordBuilder;
import edu.uci.ics.asterix.builders.UnorderedListBuilder;
import edu.uci.ics.asterix.common.exceptions.AsterixException;
import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
@@ -31,13 +34,15 @@
import edu.uci.ics.asterix.metadata.entities.FeedPolicy;
import edu.uci.ics.asterix.om.base.AInt32;
import edu.uci.ics.asterix.om.base.AMutableInt32;
-import edu.uci.ics.asterix.om.base.AOrderedList;
+import edu.uci.ics.asterix.om.base.AMutableString;
import edu.uci.ics.asterix.om.base.ARecord;
import edu.uci.ics.asterix.om.base.AString;
+import edu.uci.ics.asterix.om.base.AUnorderedList;
import edu.uci.ics.asterix.om.base.IACursor;
import edu.uci.ics.asterix.om.types.AUnorderedListType;
import edu.uci.ics.asterix.om.types.BuiltinType;
import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
@@ -88,7 +93,18 @@
String description = ((AString) feedPolicyRecord
.getValueByPos(MetadataRecordTypes.FEED_POLICY_ARECORD_DESCRIPTION_FIELD_INDEX)).getStringValue();
+ IACursor cursor = ((AUnorderedList) feedPolicyRecord
+ .getValueByPos(MetadataRecordTypes.FEED_POLICY_ARECORD_PROPERTIES_FIELD_INDEX)).getCursor();
Map<String, String> policyParamters = new HashMap<String, String>();
+ String key;
+ String value;
+ while (cursor.next()) {
+ ARecord field = (ARecord) cursor.get();
+ key = ((AString) field.getValueByPos(MetadataRecordTypes.PROPERTIES_NAME_FIELD_INDEX)).getStringValue();
+ value = ((AString) field.getValueByPos(MetadataRecordTypes.PROPERTIES_VALUE_FIELD_INDEX)).getStringValue();
+ policyParamters.put(key, value);
+ }
+
feedPolicy = new FeedPolicy(dataverseName, policyName, description, policyParamters);
return feedPolicy;
}
@@ -128,8 +144,10 @@
recordBuilder.addField(MetadataRecordTypes.FEED_POLICY_ARECORD_POLICY_NAME_FIELD_INDEX, fieldValue);
// write field 3 (properties)
+ Map<String, String> properties = feedPolicy.getProperties();
UnorderedListBuilder listBuilder = new UnorderedListBuilder();
- listBuilder.reset((AUnorderedListType) MetadataRecordTypes.FEED_POLICY_RECORDTYPE.getFieldTypes()[6]);
+ listBuilder
+ .reset((AUnorderedListType) MetadataRecordTypes.FEED_POLICY_RECORDTYPE.getFieldTypes()[MetadataRecordTypes.FEED_POLICY_ARECORD_PROPERTIES_FIELD_INDEX]);
for (Map.Entry<String, String> property : properties.entrySet()) {
String name = property.getKey();
String value = property.getValue();
@@ -139,24 +157,6 @@
}
fieldValue.reset();
listBuilder.write(fieldValue.getDataOutput(), true);
- feedRecordBuilder.addField(MetadataRecordTypes.FEED_DETAILS_ARECORD_PROPERTIES_FIELD_INDEX, fieldValue);
-
- fieldValue.reset();
- Map<String, String> properties = new HashMap<String, String>();
- String key;
- String value;
- IACursor cursor = ((AOrderedList) feedPolicy
- .getValueByPos(MetadataRecordTypes.FEED_POLICY_ARECORD_PROPERTIES_FIELD_INDEX)).getCursor();
-
- while (cursor.next()) {
- ARecord field = (ARecord) cursor.get();
- key = ((AString) field.getValueByPos(MetadataRecordTypes.DATASOURCE_PROPERTIES_NAME_FIELD_INDEX))
- .getStringValue();
- value = ((AString) field.getValueByPos(MetadataRecordTypes.DATASOURCE_PROPERTIES_VALUE_FIELD_INDEX))
- .getStringValue();
- properties.put(key, value);
- }
-
recordBuilder.addField(MetadataRecordTypes.FEED_POLICY_ARECORD_PROPERTIES_FIELD_INDEX, fieldValue);
// write record
@@ -171,4 +171,30 @@
return tuple;
}
-}
\ No newline at end of file
+ public void writePropertyTypeRecord(String name, String value, DataOutput out) throws HyracksDataException {
+ IARecordBuilder propertyRecordBuilder = new RecordBuilder();
+ ArrayBackedValueStorage fieldValue = new ArrayBackedValueStorage();
+ propertyRecordBuilder.reset(MetadataRecordTypes.POLICY_PARAMS_RECORDTYPE);
+ AMutableString aString = new AMutableString("");
+ ISerializerDeserializer<AString> stringSerde = AqlSerializerDeserializerProvider.INSTANCE
+ .getSerializerDeserializer(BuiltinType.ASTRING);
+
+ // write field 0
+ fieldValue.reset();
+ aString.setValue(name);
+ stringSerde.serialize(aString, fieldValue.getDataOutput());
+ propertyRecordBuilder.addField(0, fieldValue);
+
+ // write field 1
+ fieldValue.reset();
+ aString.setValue(value);
+ stringSerde.serialize(aString, fieldValue.getDataOutput());
+ propertyRecordBuilder.addField(1, fieldValue);
+
+ try {
+ propertyRecordBuilder.write(out, true);
+ } catch (IOException | AsterixException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/AbstractDatasourceAdapter.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/AbstractDatasourceAdapter.java
similarity index 96%
rename from asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/AbstractDatasourceAdapter.java
rename to asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/AbstractDatasourceAdapter.java
index 440ee8c..476fdbb 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/AbstractDatasourceAdapter.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/AbstractDatasourceAdapter.java
@@ -12,11 +12,12 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package edu.uci.ics.asterix.external.dataset.adapter;
+package edu.uci.ics.asterix.metadata.feeds;
import java.util.HashMap;
import java.util.Map;
+import edu.uci.ics.asterix.metadata.feeds.IDatasourceAdapter;
import edu.uci.ics.asterix.om.types.ATypeTag;
import edu.uci.ics.asterix.om.types.IAType;
import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/AbstractFeedDatasourceAdapter.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/AbstractFeedDatasourceAdapter.java
new file mode 100644
index 0000000..d350a9e
--- /dev/null
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/AbstractFeedDatasourceAdapter.java
@@ -0,0 +1,106 @@
+package edu.uci.ics.asterix.metadata.feeds;
+
+import java.rmi.RemoteException;
+import java.util.HashMap;
+import java.util.Timer;
+import java.util.TimerTask;
+
+import edu.uci.ics.asterix.metadata.MetadataManager;
+import edu.uci.ics.asterix.metadata.MetadataTransactionContext;
+import edu.uci.ics.asterix.metadata.entities.FeedActivity;
+import edu.uci.ics.asterix.metadata.entities.FeedActivity.FeedActivityDetails;
+import edu.uci.ics.asterix.metadata.entities.FeedActivity.FeedActivityType;
+import edu.uci.ics.asterix.transaction.management.exception.ACIDException;
+
+public abstract class AbstractFeedDatasourceAdapter extends AbstractDatasourceAdapter {
+
+ private static final long serialVersionUID = 1L;
+
+ private int batchPersistSize = 10;
+ protected FeedPolicyEnforcer policyEnforcer;
+ protected StatisticsCollector collector;
+ protected Timer timer;
+
+ public FeedPolicyEnforcer getPolicyEnforcer() {
+ return policyEnforcer;
+ }
+
+ public void setFeedPolicyEnforcer(FeedPolicyEnforcer policyEnforcer) {
+ this.policyEnforcer = policyEnforcer;
+ FeedPolicyAccessor policyAccessor = this.policyEnforcer.getFeedPolicyAccessor();
+ if (policyAccessor.collectStatistics()) {
+ long period = policyAccessor.getStatisicsCollectionPeriodInSecs();
+ collector = new StatisticsCollector(this, policyEnforcer.getFeedId(), period, batchPersistSize);
+ timer = new Timer();
+ timer.schedule(collector, period * 1000, period * 1000);
+ }
+ }
+
+ public abstract long getIngestedRecordsCount();
+
+ protected void dumpStatistics() throws RemoteException, ACIDException {
+ collector.persistStatistics(true);
+ }
+
+ private static class StatisticsCollector extends TimerTask {
+
+ private final AbstractFeedDatasourceAdapter adapter;
+ private final FeedId feedId;
+ // private List<Long> previousCountValues = new ArrayList<Long>();
+ // private List<Integer> ingestionRates = new ArrayList<Integer>();
+ private final long period;
+ private FeedActivity feedActivity;
+ private int numMeasurements = 0;
+ private int batchPersistSize;
+ private StringBuilder count = new StringBuilder();
+ private StringBuilder rate = new StringBuilder();
+ private long previousCount = 0;
+
+ public StatisticsCollector(AbstractFeedDatasourceAdapter adapter, FeedId feedId, long period,
+ int batchPersistSize) {
+ this.adapter = adapter;
+ this.feedId = feedId;
+ this.period = period;
+ this.batchPersistSize = batchPersistSize;
+ this.feedActivity = new FeedActivity(feedId.getDataverse(), feedId.getDataset(),
+ FeedActivityType.FEED_STATS, new HashMap<String, String>());
+ }
+
+ @Override
+ public void run() {
+ try {
+ persistStatistics(false);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+
+ }
+
+ public void persistStatistics(boolean force) throws RemoteException, ACIDException {
+ MetadataTransactionContext ctx = null;
+ try {
+ long currentCount = adapter.getIngestedRecordsCount();
+ long diff = (currentCount - previousCount);
+ count.append(currentCount + ",");
+ rate.append(new Double(diff / period).intValue() + ",");
+ feedActivity.getFeedActivityDetails().put(FeedActivityDetails.INGESTION_RATE, "" + rate.toString());
+ feedActivity.getFeedActivityDetails().put(FeedActivityDetails.TOTAL_INGESTED, "" + count.toString());
+ numMeasurements++;
+ if (numMeasurements == batchPersistSize || force) {
+ numMeasurements = 0;
+ ctx = MetadataManager.INSTANCE.beginTransaction();
+ MetadataManager.INSTANCE.registerFeedActivity(ctx, feedId, feedActivity);
+ MetadataManager.INSTANCE.commitTransaction(ctx);
+ count.delete(0, count.length() - 1);
+ rate.delete(0, rate.length() - 1);
+ }
+ previousCount = currentCount;
+ } catch (Exception e) {
+ if (ctx != null) {
+ MetadataManager.INSTANCE.abortTransaction(ctx);
+ }
+ }
+ }
+ }
+
+}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/AdapterIdentifier.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/AdapterIdentifier.java
similarity index 96%
rename from asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/AdapterIdentifier.java
rename to asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/AdapterIdentifier.java
index ac36733..ee63f69 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/AdapterIdentifier.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/AdapterIdentifier.java
@@ -12,7 +12,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package edu.uci.ics.asterix.external.dataset.adapter;
+package edu.uci.ics.asterix.metadata.feeds;
import java.io.Serializable;
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/feed/lifecycle/AlterFeedMessage.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/AlterFeedMessage.java
similarity index 91%
rename from asterix-external-data/src/main/java/edu/uci/ics/asterix/external/feed/lifecycle/AlterFeedMessage.java
rename to asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/AlterFeedMessage.java
index 537bf07..a15c2b0 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/feed/lifecycle/AlterFeedMessage.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/AlterFeedMessage.java
@@ -12,10 +12,12 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package edu.uci.ics.asterix.external.feed.lifecycle;
+package edu.uci.ics.asterix.metadata.feeds;
import java.util.Map;
+import edu.uci.ics.asterix.metadata.feeds.IFeedMessage.MessageType;
+
/**
* A feed control message containing the altered values for
* adapter configuration parameters. This message is dispatched
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/BuiltinFeedPolicies.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/BuiltinFeedPolicies.java
new file mode 100644
index 0000000..9556ae0
--- /dev/null
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/BuiltinFeedPolicies.java
@@ -0,0 +1,83 @@
+package edu.uci.ics.asterix.metadata.feeds;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import edu.uci.ics.asterix.metadata.bootstrap.MetadataConstants;
+import edu.uci.ics.asterix.metadata.entities.FeedPolicy;
+
+public class BuiltinFeedPolicies {
+
+ public static final FeedPolicy MISSTION_CRITICAL = initializeMissionCriticalFeedPolicy();
+
+ public static final FeedPolicy ADVANCED = initializeAdvancedFeedPolicy();
+
+ public static final FeedPolicy BASIC_MONITORED = initializeBasicMonitoredPolicy();
+
+ public static final FeedPolicy BASIC = initializeBasicPolicy();
+
+ public static final FeedPolicy[] policies = new FeedPolicy[] { MISSTION_CRITICAL, ADVANCED, BASIC_MONITORED, BASIC };
+
+ public static final FeedPolicy DEFAULT_POLICY = BASIC_MONITORED;
+
+ public static final String CONFIG_FEED_POLICY_KEY = "policy";
+
+ public static FeedPolicy getFeedPolicy(String policyName) {
+ for (FeedPolicy policy : policies) {
+ if (policy.getPolicyName().equalsIgnoreCase(policyName)) {
+ return policy;
+ }
+ }
+ return null;
+ }
+
+ private static FeedPolicy initializeMissionCriticalFeedPolicy() {
+ Map<String, String> policyParams = new HashMap<String, String>();
+ policyParams.put(FeedPolicyAccessor.APPLICATION_FAILURE_PERSIST_EXCEPTION, "true");
+ policyParams.put(FeedPolicyAccessor.APPLICATION_FAILURE_CONTINUE_ON_EXCEPTION, "true");
+ policyParams.put(FeedPolicyAccessor.HARDWARE_FAILURE_AUTO_RESTART, "true");
+ policyParams.put(FeedPolicyAccessor.STATISTICS_COLLECT, "true");
+ policyParams.put(FeedPolicyAccessor.STATISTICS_COLLECT_PERIOD, "60");
+ policyParams.put(FeedPolicyAccessor.STATISTICS_COLLECT_PERIOD_UNIT, FeedPolicyAccessor.TimeUnit.SEC.name());
+ policyParams.put(FeedPolicyAccessor.ELASTIC, "false");
+ String description = "MissionCritical";
+ return new FeedPolicy(MetadataConstants.METADATA_DATAVERSE_NAME, "MissionCritical", description, policyParams);
+ }
+
+ private static FeedPolicy initializeBasicPolicy() {
+ Map<String, String> policyParams = new HashMap<String, String>();
+ policyParams.put(FeedPolicyAccessor.APPLICATION_FAILURE_PERSIST_EXCEPTION, "false");
+ policyParams.put(FeedPolicyAccessor.APPLICATION_FAILURE_CONTINUE_ON_EXCEPTION, "false");
+ policyParams.put(FeedPolicyAccessor.HARDWARE_FAILURE_AUTO_RESTART, "false");
+ policyParams.put(FeedPolicyAccessor.STATISTICS_COLLECT, "false");
+ policyParams.put(FeedPolicyAccessor.ELASTIC, "false");
+ String description = "Basic";
+ return new FeedPolicy(MetadataConstants.METADATA_DATAVERSE_NAME, "Basic", description, policyParams);
+ }
+
+ private static FeedPolicy initializeBasicMonitoredPolicy() {
+ Map<String, String> policyParams = new HashMap<String, String>();
+ policyParams.put(FeedPolicyAccessor.APPLICATION_FAILURE_PERSIST_EXCEPTION, "true");
+ policyParams.put(FeedPolicyAccessor.APPLICATION_FAILURE_CONTINUE_ON_EXCEPTION, "false");
+ policyParams.put(FeedPolicyAccessor.HARDWARE_FAILURE_AUTO_RESTART, "false");
+ policyParams.put(FeedPolicyAccessor.STATISTICS_COLLECT, "true");
+ policyParams.put(FeedPolicyAccessor.STATISTICS_COLLECT_PERIOD, "5");
+ policyParams.put(FeedPolicyAccessor.STATISTICS_COLLECT_PERIOD_UNIT, FeedPolicyAccessor.TimeUnit.SEC.name());
+ policyParams.put(FeedPolicyAccessor.ELASTIC, "false");
+ String description = "BasicMonitored";
+ return new FeedPolicy(MetadataConstants.METADATA_DATAVERSE_NAME, "BasicMonitored", description, policyParams);
+ }
+
+ private static FeedPolicy initializeAdvancedFeedPolicy() {
+ Map<String, String> policyParams = new HashMap<String, String>();
+ policyParams.put(FeedPolicyAccessor.APPLICATION_FAILURE_PERSIST_EXCEPTION, "true");
+ policyParams.put(FeedPolicyAccessor.APPLICATION_FAILURE_CONTINUE_ON_EXCEPTION, "true");
+ policyParams.put(FeedPolicyAccessor.HARDWARE_FAILURE_AUTO_RESTART, "false");
+ policyParams.put(FeedPolicyAccessor.STATISTICS_COLLECT, "true");
+ policyParams.put(FeedPolicyAccessor.STATISTICS_COLLECT_PERIOD, "60");
+ policyParams.put(FeedPolicyAccessor.STATISTICS_COLLECT_PERIOD_UNIT, FeedPolicyAccessor.TimeUnit.SEC.name());
+ policyParams.put(FeedPolicyAccessor.ELASTIC, "false");
+ String description = "Advanced";
+ return new FeedPolicy(MetadataConstants.METADATA_DATAVERSE_NAME, "Advanced", description, policyParams);
+ }
+}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/operator/ExternalDataScanOperatorDescriptor.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/ExternalDataScanOperatorDescriptor.java
similarity index 95%
rename from asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/operator/ExternalDataScanOperatorDescriptor.java
rename to asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/ExternalDataScanOperatorDescriptor.java
index fb4cc99..d4ab54b 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/operator/ExternalDataScanOperatorDescriptor.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/ExternalDataScanOperatorDescriptor.java
@@ -12,12 +12,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package edu.uci.ics.asterix.external.data.operator;
+package edu.uci.ics.asterix.metadata.feeds;
import java.util.Map;
-import edu.uci.ics.asterix.external.adapter.factory.IGenericDatasetAdapterFactory;
-import edu.uci.ics.asterix.external.dataset.adapter.IDatasourceAdapter;
import edu.uci.ics.asterix.om.types.IAType;
import edu.uci.ics.hyracks.api.application.ICCApplicationContext;
import edu.uci.ics.hyracks.api.constraints.IConstraintAcceptor;
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/feed/lifecycle/FeedActivityIdFactory.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedActivityIdFactory.java
similarity index 90%
rename from asterix-external-data/src/main/java/edu/uci/ics/asterix/external/feed/lifecycle/FeedActivityIdFactory.java
rename to asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedActivityIdFactory.java
index b90f897..db1a62a 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/feed/lifecycle/FeedActivityIdFactory.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedActivityIdFactory.java
@@ -1,4 +1,4 @@
-package edu.uci.ics.asterix.external.feed.lifecycle;
+package edu.uci.ics.asterix.metadata.feeds;
import java.util.concurrent.atomic.AtomicInteger;
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/feed/lifecycle/FeedId.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedId.java
similarity index 96%
rename from asterix-external-data/src/main/java/edu/uci/ics/asterix/external/feed/lifecycle/FeedId.java
rename to asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedId.java
index b1889ee..76ad775 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/feed/lifecycle/FeedId.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedId.java
@@ -12,7 +12,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package edu.uci.ics.asterix.external.feed.lifecycle;
+package edu.uci.ics.asterix.metadata.feeds;
import java.io.Serializable;
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorDescriptor.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorDescriptor.java
new file mode 100644
index 0000000..b8dbefa
--- /dev/null
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorDescriptor.java
@@ -0,0 +1,81 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.metadata.feeds;
+
+import java.util.Map;
+
+import edu.uci.ics.asterix.om.types.ARecordType;
+import edu.uci.ics.asterix.om.types.IAType;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+
+/**
+ * Operator responsible for ingesting data from an external source. This
+ * operator uses a (configurable) adapter associated with the feed dataset.
+ */
+public class FeedIntakeOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
+
+ private static final long serialVersionUID = 1L;
+
+ private final String adapterFactoryClassName;
+ private final Map<String, String> adapterConfiguration;
+ private final IAType atype;
+ private final FeedId feedId;
+ private final Map<String, String> feedPolicy;
+
+ private transient IAdapterFactory datasourceAdapterFactory;
+
+ public FeedIntakeOperatorDescriptor(JobSpecification spec, FeedId feedId, String adapter,
+ Map<String, String> arguments, ARecordType atype, RecordDescriptor rDesc, Map<String, String> feedPolicy) {
+ super(spec, 1, 1);
+ recordDescriptors[0] = rDesc;
+ this.adapterFactoryClassName = adapter;
+ this.adapterConfiguration = arguments;
+ this.atype = atype;
+ this.feedId = feedId;
+ this.feedPolicy = feedPolicy;
+ }
+
+ public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
+ IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions)
+ throws HyracksDataException {
+ ITypedDatasourceAdapter adapter;
+ try {
+ datasourceAdapterFactory = (IAdapterFactory) Class.forName(adapterFactoryClassName).newInstance();
+ if (datasourceAdapterFactory instanceof IGenericDatasetAdapterFactory) {
+ adapter = (ITypedDatasourceAdapter) ((IGenericDatasetAdapterFactory) datasourceAdapterFactory)
+ .createAdapter(adapterConfiguration, atype);
+ } else if (datasourceAdapterFactory instanceof ITypedDatasetAdapterFactory) {
+ adapter = (ITypedDatasourceAdapter) ((ITypedDatasetAdapterFactory) datasourceAdapterFactory)
+ .createAdapter(adapterConfiguration);
+ } else {
+ throw new IllegalStateException(" Unknown adapter factory type for " + adapterFactoryClassName);
+ }
+ adapter.initialize(ctx);
+ } catch (Exception e) {
+ throw new HyracksDataException("initialization of adapter failed", e);
+ }
+ return new FeedIntakeOperatorNodePushable(feedId, adapter, feedPolicy, partition);
+ }
+
+ public FeedId getFeedId() {
+ return feedId;
+ }
+}
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorNodePushable.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorNodePushable.java
new file mode 100644
index 0000000..0e6249c
--- /dev/null
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorNodePushable.java
@@ -0,0 +1,128 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.metadata.feeds;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
+
+/**
+ * The runtime for @see{FeedIntakeOperationDescriptor}
+ */
+public class FeedIntakeOperatorNodePushable extends AbstractUnaryInputUnaryOutputOperatorNodePushable {
+
+ private final IDatasourceAdapter adapter;
+ private final int partition;
+ private final IFeedManager feedManager;
+ private final FeedId feedId;
+ private final LinkedBlockingQueue<IFeedMessage> inbox;
+ private FeedInboxMonitor feedInboxMonitor;
+ private final Map<String, String> feedPolicy;
+ private final FeedPolicyEnforcer policyEnforcer;
+
+ public FeedIntakeOperatorNodePushable(FeedId feedId, IDatasourceAdapter adapter, Map<String, String> feedPolicy,
+ int partition) {
+ this.adapter = adapter;
+ this.partition = partition;
+ this.feedManager = (IFeedManager) FeedManager.INSTANCE;
+ this.feedId = feedId;
+ inbox = new LinkedBlockingQueue<IFeedMessage>();
+ this.feedPolicy = feedPolicy;
+ policyEnforcer = new FeedPolicyEnforcer(feedId, feedPolicy);
+
+ }
+
+ @Override
+ public void open() throws HyracksDataException {
+ if (adapter instanceof IManagedFeedAdapter) {
+ feedInboxMonitor = new FeedInboxMonitor((IManagedFeedAdapter) adapter, inbox, partition);
+ feedInboxMonitor.start();
+ feedManager.registerFeedMsgQueue(feedId, inbox);
+ }
+ writer.open();
+ try {
+ ((AbstractFeedDatasourceAdapter) adapter).setFeedPolicyEnforcer(policyEnforcer);
+ adapter.start(partition, writer);
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw new HyracksDataException(e);
+ } finally {
+ writer.close();
+ if (adapter instanceof IManagedFeedAdapter) {
+ try {
+ ((IManagedFeedAdapter) adapter).stop();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ feedManager.unregisterFeedMsgQueue(feedId, inbox);
+ }
+ }
+ }
+
+ @Override
+ public void fail() throws HyracksDataException {
+ writer.close();
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+
+ }
+
+ @Override
+ public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ // do nothing
+ }
+
+ public Map<String, String> getFeedPolicy() {
+ return feedPolicy;
+ }
+}
+
+class FeedInboxMonitor extends Thread {
+
+ private LinkedBlockingQueue<IFeedMessage> inbox;
+ private final IManagedFeedAdapter adapter;
+
+ public FeedInboxMonitor(IManagedFeedAdapter adapter, LinkedBlockingQueue<IFeedMessage> inbox, int partition) {
+ this.inbox = inbox;
+ this.adapter = adapter;
+ }
+
+ @Override
+ public void run() {
+ while (true) {
+ try {
+ IFeedMessage feedMessage = inbox.take();
+ switch (feedMessage.getMessageType()) {
+ case STOP:
+ adapter.stop();
+ break;
+ case ALTER:
+ adapter.alter(((AlterFeedMessage) feedMessage).getAlteredConfParams());
+ break;
+ }
+ } catch (InterruptedException ie) {
+ break;
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+}
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/FeedJobLifecycleListener.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedJobLifecycleListener.java
similarity index 89%
rename from asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/FeedJobLifecycleListener.java
rename to asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedJobLifecycleListener.java
index 0d044fe..f7e1325 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/FeedJobLifecycleListener.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedJobLifecycleListener.java
@@ -12,7 +12,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package edu.uci.ics.asterix.metadata.declared;
+package edu.uci.ics.asterix.metadata.feeds;
import java.io.Serializable;
import java.rmi.RemoteException;
@@ -25,8 +25,6 @@
import java.util.concurrent.LinkedBlockingQueue;
import edu.uci.ics.asterix.common.api.AsterixAppContextInfo;
-import edu.uci.ics.asterix.external.data.operator.FeedIntakeOperatorDescriptor;
-import edu.uci.ics.asterix.external.feed.lifecycle.FeedId;
import edu.uci.ics.asterix.metadata.MetadataException;
import edu.uci.ics.asterix.metadata.MetadataManager;
import edu.uci.ics.asterix.metadata.MetadataTransactionContext;
@@ -50,6 +48,8 @@
import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.api.job.JobStatus;
+//import edu.uci.ics.hyracks.api.job.JobInfo;
+
public class FeedJobLifecycleListener implements IJobLifecycleListener, Serializable {
private static final long serialVersionUID = 1L;
@@ -82,9 +82,10 @@
@Override
public void notifyJobCreation(JobId jobId, IActivityClusterGraphGeneratorFactory acggf) throws HyracksException {
+
IActivityClusterGraphGenerator acgg = acggf.createActivityClusterGraphGenerator(jobId, AsterixAppContextInfo
.getInstance().getCCApplicationContext(), EnumSet.noneOf(JobFlag.class));
- JobSpecification spec = acgg.getJobSpecification();
+ JobSpecification spec = acggf.getJobSpecification();
boolean feedIngestionJob = false;
FeedId feedId = null;
for (IOperatorDescriptor opDesc : spec.getOperatorMap().values()) {
@@ -98,6 +99,7 @@
if (feedIngestionJob) {
feedJobNotificationHandler.registerFeed(feedId, jobId, spec);
}
+
}
private static class Message {
@@ -160,6 +162,7 @@
}
private void handleJobStartMessage(FeedInfo feedInfo, Message message) {
+
JobSpecification jobSpec = feedInfo.jobSpec;
List<OperatorDescriptorId> ingestOperatorIds = new ArrayList<OperatorDescriptorId>();
@@ -184,9 +187,12 @@
IHyracksClientConnection hcc = AsterixAppContextInfo.getInstance().getHcc();
JobInfo info = hcc.getJobInfo(message.jobId);
+ Map<String, String> feedActivityDetails = new HashMap<String, String>();
+ StringBuilder ingestLocs = new StringBuilder();
for (OperatorDescriptorId ingestOpId : ingestOperatorIds) {
feedInfo.ingestLocations.addAll(info.getOperatorLocations().get(ingestOpId));
}
+ StringBuilder computeLocs = new StringBuilder();
for (OperatorDescriptorId computeOpId : computeOperatorIds) {
List<String> locations = info.getOperatorLocations().get(computeOpId);
if (locations != null) {
@@ -195,9 +201,21 @@
feedInfo.computeLocations.addAll(feedInfo.ingestLocations);
}
}
+
+ for (String ingestLoc : feedInfo.ingestLocations) {
+ ingestLocs.append(ingestLoc);
+ ingestLocs.append(",");
+ }
+ for (String computeLoc : feedInfo.computeLocations) {
+ computeLocs.append(computeLoc);
+ computeLocs.append(",");
+ }
+
+ feedActivityDetails.put(FeedActivity.FeedActivityDetails.INGEST_LOCATIONS, ingestLocs.toString());
+ feedActivityDetails.put(FeedActivity.FeedActivityDetails.COMPUTE_LOCATIONS, computeLocs.toString());
+
FeedActivity feedActivity = new FeedActivity(feedInfo.feedId.getDataverse(),
- feedInfo.feedId.getDataset(), FeedActivityType.FEED_BEGIN, feedInfo.ingestLocations,
- feedInfo.computeLocations);
+ feedInfo.feedId.getDataset(), FeedActivityType.FEED_BEGIN, feedActivityDetails);
MetadataManager.INSTANCE.acquireWriteLatch();
MetadataTransactionContext mdTxnCtx = null;
@@ -214,9 +232,11 @@
} catch (Exception e) {
// TODO Add Exception handling here
}
+
}
private void handleJobFinishMessage(FeedInfo feedInfo, Message message) {
+
MetadataManager.INSTANCE.acquireWriteLatch();
MetadataTransactionContext mdTxnCtx = null;
@@ -224,16 +244,18 @@
IHyracksClientConnection hcc = AsterixAppContextInfo.getInstance().getHcc();
JobInfo info = hcc.getJobInfo(message.jobId);
JobStatus status = info.getPendingStatus();
- Exception e;
+ List<Exception> exceptions;
boolean failure = status != null && status.equals(JobStatus.FAILURE);
FeedActivityType activityType = FeedActivityType.FEED_END;
+ Map<String, String> details = new HashMap<String, String>();
if (failure) {
- e = info.getPendingException();
+ exceptions = info.getPendingExceptions();
activityType = FeedActivityType.FEED_FAILURE;
+ details.put(FeedActivity.FeedActivityDetails.EXCEPTION_MESSAGE, exceptions.get(0).getMessage());
}
mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
FeedActivity feedActivity = new FeedActivity(feedInfo.feedId.getDataverse(),
- feedInfo.feedId.getDataset(), activityType, feedInfo.ingestLocations, feedInfo.computeLocations);
+ feedInfo.feedId.getDataset(), activityType, details);
MetadataManager.INSTANCE.registerFeedActivity(mdTxnCtx, new FeedId(feedInfo.feedId.getDataverse(),
feedInfo.feedId.getDataset()), feedActivity);
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
@@ -248,8 +270,8 @@
} finally {
MetadataManager.INSTANCE.releaseWriteLatch();
}
- }
+ }
}
private static class FeedInfo {
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/feed/lifecycle/FeedManager.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedManager.java
similarity index 91%
rename from asterix-external-data/src/main/java/edu/uci/ics/asterix/external/feed/lifecycle/FeedManager.java
rename to asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedManager.java
index 8314267..eb47287 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/feed/lifecycle/FeedManager.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedManager.java
@@ -12,7 +12,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package edu.uci.ics.asterix.external.feed.lifecycle;
+package edu.uci.ics.asterix.metadata.feeds;
import java.util.HashMap;
import java.util.HashSet;
@@ -21,6 +21,9 @@
import java.util.concurrent.LinkedBlockingQueue;
import edu.uci.ics.asterix.common.exceptions.AsterixException;
+import edu.uci.ics.asterix.metadata.feeds.FeedId;
+import edu.uci.ics.asterix.metadata.feeds.IFeedManager;
+import edu.uci.ics.asterix.metadata.feeds.IFeedMessage;
/**
* Handle (de-)registration of feeds for delivery of control messages.
@@ -43,7 +46,7 @@
for (LinkedBlockingQueue<IFeedMessage> queue : operatorQueues) {
queue.put(feedMessage);
}
- }
+ }
} catch (Exception e) {
throw new AsterixException(e);
}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/feed/lifecycle/FeedMessage.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMessage.java
similarity index 91%
rename from asterix-external-data/src/main/java/edu/uci/ics/asterix/external/feed/lifecycle/FeedMessage.java
rename to asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMessage.java
index af84d4f..68c2017 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/feed/lifecycle/FeedMessage.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMessage.java
@@ -12,7 +12,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package edu.uci.ics.asterix.external.feed.lifecycle;
+package edu.uci.ics.asterix.metadata.feeds;
+
+import edu.uci.ics.asterix.metadata.feeds.IFeedMessage;
/**
* A control message that can be sent to the runtime instance of a
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/operator/FeedMessageOperatorDescriptor.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMessageOperatorDescriptor.java
similarity index 91%
rename from asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/operator/FeedMessageOperatorDescriptor.java
rename to asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMessageOperatorDescriptor.java
index d0dc5ca..bbd8869 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/operator/FeedMessageOperatorDescriptor.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMessageOperatorDescriptor.java
@@ -12,12 +12,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package edu.uci.ics.asterix.external.data.operator;
+package edu.uci.ics.asterix.metadata.feeds;
import java.util.List;
-import edu.uci.ics.asterix.external.feed.lifecycle.FeedId;
-import edu.uci.ics.asterix.external.feed.lifecycle.IFeedMessage;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/operator/FeedMessageOperatorNodePushable.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMessageOperatorNodePushable.java
similarity index 86%
rename from asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/operator/FeedMessageOperatorNodePushable.java
rename to asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMessageOperatorNodePushable.java
index d03eeaa..b51ffed 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/operator/FeedMessageOperatorNodePushable.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMessageOperatorNodePushable.java
@@ -12,15 +12,11 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package edu.uci.ics.asterix.external.data.operator;
+package edu.uci.ics.asterix.metadata.feeds;
import java.util.ArrayList;
import java.util.List;
-import edu.uci.ics.asterix.external.feed.lifecycle.FeedId;
-import edu.uci.ics.asterix.external.feed.lifecycle.FeedManager;
-import edu.uci.ics.asterix.external.feed.lifecycle.IFeedManager;
-import edu.uci.ics.asterix.external.feed.lifecycle.IFeedMessage;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedPolicyAccessor.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedPolicyAccessor.java
new file mode 100644
index 0000000..3e16877
--- /dev/null
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedPolicyAccessor.java
@@ -0,0 +1,81 @@
+package edu.uci.ics.asterix.metadata.feeds;
+
+import java.util.Map;
+
+public class FeedPolicyAccessor {
+ public static final String APPLICATION_FAILURE_PERSIST_EXCEPTION = "software.failure.persist.exception";
+ public static final String APPLICATION_FAILURE_CONTINUE_ON_EXCEPTION = "software.failure.continue.on.exception";
+ public static final String HARDWARE_FAILURE_AUTO_RESTART = "hardware.failure.auto.restart";
+ public static final String STATISTICS_COLLECT = "statistics.collect";
+ public static final String STATISTICS_COLLECT_PERIOD = "statistics.collect.period";
+ public static final String STATISTICS_COLLECT_PERIOD_UNIT = "statistics.collect.period.unit";
+ public static final String ELASTIC = "elastic";
+
+ public enum TimeUnit {
+ SEC,
+ MIN,
+ HRS,
+ DAYS
+ }
+
+ private Map<String, String> feedPolicy;
+
+ public FeedPolicyAccessor(Map<String, String> feedPolicy) {
+ this.feedPolicy = feedPolicy;
+ }
+
+ public boolean persistExceptionDetailsOnApplicationFailure() {
+ return getBooleanPropertyValue(APPLICATION_FAILURE_PERSIST_EXCEPTION);
+ }
+
+ public boolean continueOnApplicationFailure() {
+ return getBooleanPropertyValue(APPLICATION_FAILURE_CONTINUE_ON_EXCEPTION);
+ }
+
+ public boolean autoRestartOnHardwareFailure() {
+ return getBooleanPropertyValue(HARDWARE_FAILURE_AUTO_RESTART);
+ }
+
+ public boolean collectStatistics() {
+ return getBooleanPropertyValue(STATISTICS_COLLECT);
+ }
+
+ public long getStatisicsCollectionPeriodInSecs() {
+ return getIntegerPropertyValue(STATISTICS_COLLECT_PERIOD) * getTimeUnitFactor();
+ }
+
+ public boolean isElastic() {
+ return getBooleanPropertyValue(ELASTIC);
+ }
+
+ private int getTimeUnitFactor() {
+ String v = feedPolicy.get(STATISTICS_COLLECT_PERIOD_UNIT);
+ int factor = 1;
+ switch (TimeUnit.valueOf(v)) {
+ case SEC:
+ factor = 1;
+ break;
+ case MIN:
+ factor = 60;
+ break;
+ case HRS:
+ factor = 3600;
+ break;
+ case DAYS:
+ factor = 216000;
+ break;
+
+ }
+ return factor;
+ }
+
+ private boolean getBooleanPropertyValue(String key) {
+ String v = feedPolicy.get(key);
+ return v == null ? false : Boolean.valueOf(v);
+ }
+
+ private int getIntegerPropertyValue(String key) {
+ String v = feedPolicy.get(key);
+ return Integer.parseInt(v);
+ }
+}
\ No newline at end of file
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedPolicyEnforcer.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedPolicyEnforcer.java
new file mode 100644
index 0000000..7eeee4c
--- /dev/null
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedPolicyEnforcer.java
@@ -0,0 +1,64 @@
+package edu.uci.ics.asterix.metadata.feeds;
+
+import java.rmi.RemoteException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import edu.uci.ics.asterix.metadata.MetadataManager;
+import edu.uci.ics.asterix.metadata.MetadataTransactionContext;
+import edu.uci.ics.asterix.metadata.entities.FeedActivity;
+import edu.uci.ics.asterix.metadata.entities.FeedActivity.FeedActivityType;
+import edu.uci.ics.asterix.transaction.management.exception.ACIDException;
+
+public class FeedPolicyEnforcer {
+
+ private final FeedId feedId;
+ private final FeedPolicyAccessor feedPolicyAccessor;
+ private final FeedActivity feedActivity;
+
+ public FeedPolicyEnforcer(FeedId feedId, Map<String, String> feedPolicy) {
+ this.feedId = feedId;
+ this.feedPolicyAccessor = new FeedPolicyAccessor(feedPolicy);
+ this.feedActivity = new FeedActivity(feedId.getDataverse(), feedId.getDataset(), null,
+ new HashMap<String, String>());
+ }
+
+ public boolean handleSoftwareFailure(Exception e) throws RemoteException, ACIDException {
+ boolean continueIngestion = feedPolicyAccessor.continueOnApplicationFailure();
+ if (feedPolicyAccessor.persistExceptionDetailsOnApplicationFailure()) {
+ persistExceptionDetails(e);
+ }
+ return continueIngestion;
+ }
+
+ private synchronized void persistExceptionDetails(Exception e) throws RemoteException, ACIDException {
+ MetadataManager.INSTANCE.acquireWriteLatch();
+ MetadataTransactionContext ctx = null;
+ try {
+ ctx = MetadataManager.INSTANCE.beginTransaction();
+ feedActivity.setActivityType(FeedActivityType.FEED_FAILURE);
+ feedActivity.getFeedActivityDetails().put(FeedActivity.FeedActivityDetails.EXCEPTION_MESSAGE,
+ e.getMessage());
+ MetadataManager.INSTANCE.registerFeedActivity(ctx, feedId, feedActivity);
+ MetadataManager.INSTANCE.commitTransaction(ctx);
+ } catch (Exception e2) {
+ MetadataManager.INSTANCE.abortTransaction(ctx);
+ } finally {
+ MetadataManager.INSTANCE.releaseWriteLatch();
+ }
+ }
+
+ public void handleHardwareFailure(List<String> nodeId) {
+
+ }
+
+ public FeedPolicyAccessor getFeedPolicyAccessor() {
+ return feedPolicyAccessor;
+ }
+
+ public FeedId getFeedId() {
+ return feedId;
+ }
+
+}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/IAdapterFactory.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IAdapterFactory.java
similarity index 94%
rename from asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/IAdapterFactory.java
rename to asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IAdapterFactory.java
index 45fd6cf..e5f7e97 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/IAdapterFactory.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IAdapterFactory.java
@@ -12,7 +12,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package edu.uci.ics.asterix.external.adapter.factory;
+package edu.uci.ics.asterix.metadata.feeds;
/**
* Base interface for IGenericDatasetAdapterFactory and ITypedDatasetAdapterFactory.
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/IDatasourceAdapter.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IDatasourceAdapter.java
similarity index 98%
rename from asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/IDatasourceAdapter.java
rename to asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IDatasourceAdapter.java
index b0dc32f..5d44dbd 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/IDatasourceAdapter.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IDatasourceAdapter.java
@@ -12,7 +12,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package edu.uci.ics.asterix.external.dataset.adapter;
+package edu.uci.ics.asterix.metadata.feeds;
import java.io.Serializable;
import java.util.Map;
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/feed/lifecycle/IFeedManager.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IFeedManager.java
similarity index 97%
rename from asterix-external-data/src/main/java/edu/uci/ics/asterix/external/feed/lifecycle/IFeedManager.java
rename to asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IFeedManager.java
index 587d5a7..6e6e1ac 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/feed/lifecycle/IFeedManager.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IFeedManager.java
@@ -12,7 +12,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package edu.uci.ics.asterix.external.feed.lifecycle;
+package edu.uci.ics.asterix.metadata.feeds;
import java.util.concurrent.LinkedBlockingQueue;
@@ -56,4 +56,5 @@
* @throws Exception
*/
public void deliverMessage(FeedId feedId, IFeedMessage feedMessage) throws AsterixException;
+
}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/feed/lifecycle/IFeedMessage.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IFeedMessage.java
similarity index 93%
rename from asterix-external-data/src/main/java/edu/uci/ics/asterix/external/feed/lifecycle/IFeedMessage.java
rename to asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IFeedMessage.java
index 9e1e907..dc50071 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/feed/lifecycle/IFeedMessage.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IFeedMessage.java
@@ -12,7 +12,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package edu.uci.ics.asterix.external.feed.lifecycle;
+package edu.uci.ics.asterix.metadata.feeds;
import java.io.Serializable;
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/IGenericDatasetAdapterFactory.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IGenericDatasetAdapterFactory.java
similarity index 92%
rename from asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/IGenericDatasetAdapterFactory.java
rename to asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IGenericDatasetAdapterFactory.java
index 093a3dd..8f8f496 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/IGenericDatasetAdapterFactory.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IGenericDatasetAdapterFactory.java
@@ -12,11 +12,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package edu.uci.ics.asterix.external.adapter.factory;
+package edu.uci.ics.asterix.metadata.feeds;
import java.util.Map;
-import edu.uci.ics.asterix.external.dataset.adapter.IDatasourceAdapter;
import edu.uci.ics.asterix.om.types.IAType;
/**
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/feed/managed/adapter/IManagedFeedAdapter.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IManagedFeedAdapter.java
similarity index 93%
rename from asterix-external-data/src/main/java/edu/uci/ics/asterix/feed/managed/adapter/IManagedFeedAdapter.java
rename to asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IManagedFeedAdapter.java
index 6f993ae..51a3272 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/feed/managed/adapter/IManagedFeedAdapter.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/IManagedFeedAdapter.java
@@ -12,7 +12,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package edu.uci.ics.asterix.feed.managed.adapter;
+package edu.uci.ics.asterix.metadata.feeds;
import java.util.Map;
@@ -27,7 +27,7 @@
*
* @throws Exception
*/
- public void stop();
+ public void stop() throws Exception;
/**
* Modify the adapter configuration parameters. This method is called
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/ITypedDatasetAdapterFactory.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/ITypedDatasetAdapterFactory.java
similarity index 91%
rename from asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/ITypedDatasetAdapterFactory.java
rename to asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/ITypedDatasetAdapterFactory.java
index 0f9978e..38e4fe4 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/ITypedDatasetAdapterFactory.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/ITypedDatasetAdapterFactory.java
@@ -12,12 +12,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package edu.uci.ics.asterix.external.adapter.factory;
+package edu.uci.ics.asterix.metadata.feeds;
import java.util.Map;
-import edu.uci.ics.asterix.external.dataset.adapter.IDatasourceAdapter;
-
/**
* A base interface for an adapter factory that creates instance of an adapter kind that
* is 'typed' in nature. A 'typed' adapter returns records with a pre-defined datatype.
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/ITypedDatasourceAdapter.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/ITypedDatasourceAdapter.java
similarity index 89%
rename from asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/ITypedDatasourceAdapter.java
rename to asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/ITypedDatasourceAdapter.java
index 3a4b97b..226ac0f 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/ITypedDatasourceAdapter.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/ITypedDatasourceAdapter.java
@@ -12,8 +12,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package edu.uci.ics.asterix.external.dataset.adapter;
+package edu.uci.ics.asterix.metadata.feeds;
+import edu.uci.ics.asterix.metadata.feeds.IDatasourceAdapter;
import edu.uci.ics.asterix.om.types.ARecordType;
/**
diff --git a/asterix-tools/pom.xml b/asterix-tools/pom.xml
index 248cdb5..f9c4219 100644
--- a/asterix-tools/pom.xml
+++ b/asterix-tools/pom.xml
@@ -134,6 +134,12 @@
<scope>compile</scope>
</dependency>
<dependency>
+ <groupId>edu.uci.ics.asterix</groupId>
+ <artifactId>asterix-external-data</artifactId>
+ <version>0.0.6-SNAPSHOT</version>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.2.2</version>
diff --git a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/DataGenerator.java b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/DataGenerator.java
index 8996a85..39537a8 100644
--- a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/DataGenerator.java
+++ b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/DataGenerator.java
@@ -10,7 +10,6 @@
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
-import java.util.Calendar;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
diff --git a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/RateControlledFileSystemBasedAdapter.java b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/RateControlledFileSystemBasedAdapter.java
index a522336..4ffe61a 100644
--- a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/RateControlledFileSystemBasedAdapter.java
+++ b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/RateControlledFileSystemBasedAdapter.java
@@ -19,10 +19,10 @@
import java.util.Map;
import edu.uci.ics.asterix.common.exceptions.AsterixException;
-import edu.uci.ics.asterix.external.adapter.factory.IGenericDatasetAdapterFactory;
import edu.uci.ics.asterix.external.dataset.adapter.FileSystemBasedAdapter;
-import edu.uci.ics.asterix.external.dataset.adapter.ITypedDatasourceAdapter;
-import edu.uci.ics.asterix.feed.managed.adapter.IManagedFeedAdapter;
+import edu.uci.ics.asterix.metadata.feeds.IGenericDatasetAdapterFactory;
+import edu.uci.ics.asterix.metadata.feeds.IManagedFeedAdapter;
+import edu.uci.ics.asterix.metadata.feeds.ITypedDatasourceAdapter;
import edu.uci.ics.asterix.om.types.ARecordType;
import edu.uci.ics.asterix.om.types.ATypeTag;
import edu.uci.ics.asterix.runtime.operators.file.ADMDataParser;
diff --git a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/RateControlledFileSystemBasedAdapterFactory.java b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/RateControlledFileSystemBasedAdapterFactory.java
index 6c32acb..5b5bb47 100644
--- a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/RateControlledFileSystemBasedAdapterFactory.java
+++ b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/RateControlledFileSystemBasedAdapterFactory.java
@@ -16,8 +16,8 @@
import java.util.Map;
-import edu.uci.ics.asterix.external.adapter.factory.IGenericDatasetAdapterFactory;
-import edu.uci.ics.asterix.external.dataset.adapter.IDatasourceAdapter;
+import edu.uci.ics.asterix.metadata.feeds.IDatasourceAdapter;
+import edu.uci.ics.asterix.metadata.feeds.IGenericDatasetAdapterFactory;
import edu.uci.ics.asterix.om.types.ARecordType;
import edu.uci.ics.asterix.om.types.IAType;
diff --git a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/SyntheticTwitterFeedAdapter.java b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/SyntheticTwitterFeedAdapter.java
index f66d9e1..c8dcf20 100644
--- a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/SyntheticTwitterFeedAdapter.java
+++ b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/SyntheticTwitterFeedAdapter.java
@@ -75,7 +75,7 @@
@Override
public IPullBasedFeedClient getFeedClient(int partition) throws Exception {
- return new OkSyntheticTwitterFeedClient(configuration, adapterOutputType, partition);
+ return new SyntheticTwitterFeedClient(configuration, adapterOutputType, partition);
}
@Override
@@ -83,24 +83,26 @@
return new AlgebricksCountPartitionConstraint(1);
}
- private static class OkSyntheticTwitterFeedClient extends PullBasedFeedClient implements IPullBasedFeedClient {
+ private static class SyntheticTwitterFeedClient extends PullBasedFeedClient implements IPullBasedFeedClient {
- private static final Logger LOGGER = Logger.getLogger(OkSyntheticTwitterFeedClient.class.getName());
+ private static final Logger LOGGER = Logger.getLogger(SyntheticTwitterFeedClient.class.getName());
public static final String KEY_DURATION = "duration";
public static final String KEY_TPS = "tps";
+ public static final String KEY_EXCEPTION_PERIOD = "exception-period";
private int duration;
private long tweetInterval;
private int numTweetsBeforeDelay;
private TweetMessageIterator tweetIterator = null;
+ private long exeptionInterval;
private IAObject[] mutableFields;
private ARecordType outputRecordType;
private int partition;
private int tweetCount = 0;
- public OkSyntheticTwitterFeedClient(Map<String, String> configuration, ARecordType outputRecordType,
+ public SyntheticTwitterFeedClient(Map<String, String> configuration, ARecordType outputRecordType,
int partition) throws AsterixException {
this.outputRecordType = outputRecordType;
String value = configuration.get(KEY_DURATION);
diff --git a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/SyntheticTwitterFeedAdapterFactory.java b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/SyntheticTwitterFeedAdapterFactory.java
index 5883a62..897091c 100644
--- a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/SyntheticTwitterFeedAdapterFactory.java
+++ b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/SyntheticTwitterFeedAdapterFactory.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2009-2012 by The Regents of the University of California
+x * Copyright 2009-2012 by The Regents of the University of California
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* you may obtain a copy of the License from
@@ -16,8 +16,8 @@
import java.util.Map;
-import edu.uci.ics.asterix.external.adapter.factory.ITypedDatasetAdapterFactory;
-import edu.uci.ics.asterix.external.dataset.adapter.IDatasourceAdapter;
+import edu.uci.ics.asterix.metadata.feeds.IDatasourceAdapter;
+import edu.uci.ics.asterix.metadata.feeds.ITypedDatasetAdapterFactory;
/**
* Factory class for creating @see{RateControllerFileSystemBasedAdapter} The