Refactor Active Listeners
Change-Id: I260c8608329523f56dc54780d87d796f838505cf
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1118
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: abdullah alamoudi <bamousaa@gmail.com>
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveJobNotificationHandler.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveJobNotificationHandler.java
index 30a2eb6..5ff02c7 100644
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveJobNotificationHandler.java
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveJobNotificationHandler.java
@@ -198,4 +198,17 @@
LOGGER.severe("No listener was found for the entity: " + activeJob.getEntityId());
}
}
+
+ public synchronized void unregisterListener(IActiveEntityEventsListener listener) throws HyracksDataException {
+ if (DEBUG) {
+ LOGGER.log(Level.WARNING,
+ "unregisterListener(IActiveEntityEventsListener listener) was called for the entity "
+ + listener.getEntityId());
+ }
+ IActiveEntityEventsListener registeredListener = entityEventListener.remove(listener.getEntityId());
+ if (registeredListener == null) {
+ throw new HyracksDataException(
+ "Active Entity Listener " + listener.getEntityId() + " hasn't been registered");
+ }
+ }
}
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventsListener.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventsListener.java
index 156576c..2dd9fe7 100644
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventsListener.java
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventsListener.java
@@ -31,4 +31,6 @@
public EntityId getEntityId();
+ public boolean isEntityConnectedToDataset(String dataverseName, String datasetName);
+
}
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceMaterializationForInsertWithSelfScanRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceMaterializationForInsertWithSelfScanRule.java
index 916355d..a574711 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceMaterializationForInsertWithSelfScanRule.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceMaterializationForInsertWithSelfScanRule.java
@@ -18,14 +18,13 @@
*/
package org.apache.asterix.optimizer.rules;
-import org.apache.commons.lang3.mutable.Mutable;
-import org.apache.commons.lang3.mutable.MutableObject;
-
import org.apache.asterix.metadata.declared.AqlDataSource;
import org.apache.asterix.metadata.declared.AqlDataSource.AqlDataSourceType;
import org.apache.asterix.metadata.declared.DatasetDataSource;
import org.apache.asterix.om.functions.AsterixBuiltinFunctions;
import org.apache.asterix.optimizer.rules.am.AccessMethodJobGenParams;
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.commons.lang3.mutable.MutableObject;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
@@ -45,7 +44,8 @@
public class IntroduceMaterializationForInsertWithSelfScanRule implements IAlgebraicRewriteRule {
@Override
- public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context) throws AlgebricksException {
+ public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
+ throws AlgebricksException {
return false;
}
@@ -105,11 +105,10 @@
} else if (descendantOp.getOperatorTag() == LogicalOperatorTag.DATASOURCESCAN) {
DataSourceScanOperator dataSourceScanOp = (DataSourceScanOperator) descendantOp;
AqlDataSource ds = (AqlDataSource) dataSourceScanOp.getDataSource();
- if (ds.getDatasourceType() != AqlDataSourceType.FEED
- && ds.getDatasourceType() != AqlDataSourceType.LOADABLE) {
- if (((DatasetDataSource) ds).getDataset().getDatasetName().compareTo(insertDatasetName) == 0) {
- return true;
- }
+ if ((ds.getDatasourceType() == AqlDataSourceType.INTERNAL_DATASET
+ || ds.getDatasourceType() == AqlDataSourceType.EXTERNAL_DATASET)
+ && ((DatasetDataSource) ds).getDataset().getDatasetName().compareTo(insertDatasetName) == 0) {
+ return true;
}
}
sameDataset = checkIfInsertAndScanDatasetsSame(descendantOp, insertDatasetName);
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceRandomPartitioningFeedComputationRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceRandomPartitioningFeedComputationRule.java
index 4f5a848..815bd93 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceRandomPartitioningFeedComputationRule.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceRandomPartitioningFeedComputationRule.java
@@ -18,12 +18,12 @@
*/
package org.apache.asterix.optimizer.rules;
-import org.apache.commons.lang3.mutable.Mutable;
-import org.apache.commons.lang3.mutable.MutableObject;
import org.apache.asterix.metadata.declared.AqlDataSource;
import org.apache.asterix.metadata.declared.AqlDataSource.AqlDataSourceType;
-import org.apache.asterix.metadata.entities.Feed;
import org.apache.asterix.metadata.declared.FeedDataSource;
+import org.apache.asterix.metadata.entities.Feed;
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.commons.lang3.mutable.MutableObject;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
@@ -41,7 +41,8 @@
public class IntroduceRandomPartitioningFeedComputationRule implements IAlgebraicRewriteRule {
@Override
- public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context) throws AlgebricksException {
+ public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
+ throws AlgebricksException {
ILogicalOperator op = opRef.getValue();
if (!op.getOperatorTag().equals(LogicalOperatorTag.ASSIGN)) {
return false;
@@ -54,7 +55,7 @@
DataSourceScanOperator scanOp = (DataSourceScanOperator) opChild;
AqlDataSource dataSource = (AqlDataSource) scanOp.getDataSource();
- if (!dataSource.getDatasourceType().equals(AqlDataSourceType.FEED)) {
+ if (dataSource.getDatasourceType() != AqlDataSourceType.FEED) {
return false;
}
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/MetaFunctionToMetaVariableRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/MetaFunctionToMetaVariableRule.java
index 1fa7730..06a6f37 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/MetaFunctionToMetaVariableRule.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/MetaFunctionToMetaVariableRule.java
@@ -85,7 +85,10 @@
LogicalVariable dataVar = dataSource.getDataRecordVariable(allVars);
LogicalVariable metaVar = dataSource.getMetaVariable(allVars);
LogicalExpressionReferenceTransform currentTransformer = null;
- if (dataSource.getDatasourceType() == AqlDataSourceType.FEED) {
+ // https://issues.apache.org/jira/browse/ASTERIXDB-1618
+ if (dataSource.getDatasourceType() != AqlDataSourceType.EXTERNAL_DATASET
+ && dataSource.getDatasourceType() != AqlDataSourceType.INTERNAL_DATASET
+ && dataSource.getDatasourceType() != AqlDataSourceType.LOADABLE) {
IMutationDataSource mds = (IMutationDataSource) dataSource;
if (mds.isChange()) {
transformers = new ArrayList<>();
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/PushFieldAccessRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/PushFieldAccessRule.java
index 079f61a..7af3c1a 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/PushFieldAccessRule.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/PushFieldAccessRule.java
@@ -24,7 +24,6 @@
import java.util.List;
import org.apache.asterix.algebra.base.AsterixOperatorAnnotations;
-import org.apache.asterix.om.util.ConstantExpressionUtil;
import org.apache.asterix.common.config.DatasetConfig.DatasetType;
import org.apache.asterix.metadata.declared.AqlDataSource;
import org.apache.asterix.metadata.declared.AqlDataSource.AqlDataSourceType;
@@ -40,6 +39,7 @@
import org.apache.asterix.om.types.ARecordType;
import org.apache.asterix.om.types.ATypeTag;
import org.apache.asterix.om.types.IAType;
+import org.apache.asterix.om.util.ConstantExpressionUtil;
import org.apache.asterix.optimizer.base.AnalysisUtil;
import org.apache.commons.lang3.mutable.Mutable;
import org.apache.commons.lang3.mutable.MutableObject;
@@ -85,7 +85,7 @@
}
AssignOperator access = (AssignOperator) op;
ILogicalExpression expr = getFirstExpr(access);
- String finalAnnot = null;
+ String finalAnnot;
if (AnalysisUtil.isAccessToFieldRecord(expr)) {
finalAnnot = AsterixOperatorAnnotations.PUSHED_FIELD_ACCESS;
} else if (AnalysisUtil.isRunnableAccessToFieldRecord(expr)) {
@@ -195,17 +195,17 @@
propagateFieldAccessRec(opRef2, context, finalAnnot);
return true;
}
- List<LogicalVariable> usedInAccess = new LinkedList<LogicalVariable>();
+ List<LogicalVariable> usedInAccess = new LinkedList<>();
VariableUtilities.getUsedVariables(access, usedInAccess);
- List<LogicalVariable> produced2 = new LinkedList<LogicalVariable>();
+ List<LogicalVariable> produced2 = new LinkedList<>();
if (op2.getOperatorTag() == LogicalOperatorTag.GROUP) {
VariableUtilities.getLiveVariables(op2, produced2);
} else {
VariableUtilities.getProducedVariables(op2, produced2);
}
boolean pushItDown = false;
- List<LogicalVariable> inter = new ArrayList<LogicalVariable>(usedInAccess);
+ List<LogicalVariable> inter = new ArrayList<>(usedInAccess);
if (inter.isEmpty()) { // ground value
return false;
}
@@ -214,7 +214,8 @@
pushItDown = true;
} else if (op2.getOperatorTag() == LogicalOperatorTag.GROUP) {
GroupByOperator g = (GroupByOperator) op2;
- List<Pair<LogicalVariable, LogicalVariable>> varMappings = new ArrayList<Pair<LogicalVariable, LogicalVariable>>();
+ List<Pair<LogicalVariable, LogicalVariable>> varMappings =
+ new ArrayList<>();
for (Pair<LogicalVariable, Mutable<ILogicalExpression>> p : g.getDecorList()) {
ILogicalExpression e = p.second.getValue();
if (e.getExpressionTag() == LogicalExpressionTag.VARIABLE) {
@@ -222,7 +223,7 @@
if (inter.contains(decorVar)) {
inter.remove(decorVar);
LogicalVariable v1 = ((VariableReferenceExpression) e).getVariableReference();
- varMappings.add(new Pair<LogicalVariable, LogicalVariable>(decorVar, v1));
+ varMappings.add(new Pair<>(decorVar, v1));
}
}
}
@@ -257,7 +258,7 @@
return true;
} else {
for (Mutable<ILogicalOperator> inp : op2.getInputs()) {
- HashSet<LogicalVariable> v2 = new HashSet<LogicalVariable>();
+ HashSet<LogicalVariable> v2 = new HashSet<>();
VariableUtilities.getLiveVariables(inp.getValue(), v2);
if (v2.containsAll(usedInAccess)) {
pushAccessDown(opRef, op2, inp, context, finalAnnot);
@@ -269,7 +270,7 @@
AbstractOperatorWithNestedPlans nestedOp = (AbstractOperatorWithNestedPlans) op2;
for (ILogicalPlan plan : nestedOp.getNestedPlans()) {
for (Mutable<ILogicalOperator> root : plan.getRoots()) {
- HashSet<LogicalVariable> v2 = new HashSet<LogicalVariable>();
+ HashSet<LogicalVariable> v2 = new HashSet<>();
VariableUtilities.getLiveVariables(root.getValue(), v2);
if (v2.containsAll(usedInAccess)) {
pushAccessDown(opRef, op2, root, context, finalAnnot);
@@ -297,7 +298,7 @@
ILogicalExpression e1 = accessFun.getArguments().get(1).getValue();
if (e1.getExpressionTag() == LogicalExpressionTag.CONSTANT) {
IDataSource<AqlSourceId> dataSource = (IDataSource<AqlSourceId>) scan.getDataSource();
- AqlDataSourceType dsType = ((AqlDataSource) dataSource).getDatasourceType();
+ byte dsType = ((AqlDataSource) dataSource).getDatasourceType();
if (dsType == AqlDataSourceType.FEED || dsType == AqlDataSourceType.LOADABLE) {
return false;
}
@@ -368,7 +369,7 @@
// indirect recursivity with propagateFieldAccessRec
private void pushAccessDown(Mutable<ILogicalOperator> fldAccessOpRef, ILogicalOperator op2,
Mutable<ILogicalOperator> inputOfOp2, IOptimizationContext context, String finalAnnot)
- throws AlgebricksException {
+ throws AlgebricksException {
ILogicalOperator fieldAccessOp = fldAccessOpRef.getValue();
fldAccessOpRef.setValue(op2);
List<Mutable<ILogicalOperator>> faInpList = fieldAccessOp.getInputs();
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/RemoveSortInFeedIngestionRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/RemoveSortInFeedIngestionRule.java
index b088607..25dcf35 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/RemoveSortInFeedIngestionRule.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/RemoveSortInFeedIngestionRule.java
@@ -18,10 +18,9 @@
*/
package org.apache.asterix.optimizer.rules;
-import org.apache.commons.lang3.mutable.Mutable;
-
import org.apache.asterix.metadata.declared.AqlDataSource;
import org.apache.asterix.metadata.declared.AqlDataSource.AqlDataSourceType;
+import org.apache.commons.lang3.mutable.Mutable;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
@@ -33,7 +32,8 @@
public class RemoveSortInFeedIngestionRule implements IAlgebraicRewriteRule {
@Override
- public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context) throws AlgebricksException {
+ public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
+ throws AlgebricksException {
return false;
}
@@ -51,7 +51,7 @@
while (descendantOp != null) {
if (descendantOp.getOperatorTag() == LogicalOperatorTag.DATASOURCESCAN) {
AqlDataSource dataSource = (AqlDataSource) ((DataSourceScanOperator) descendantOp).getDataSource();
- if (dataSource.getDatasourceType().equals(AqlDataSourceType.FEED)) {
+ if (dataSource.getDatasourceType() == AqlDataSourceType.FEED) {
isSourceAFeed = true;
}
break;
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/OptimizableOperatorSubTree.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/OptimizableOperatorSubTree.java
index f483d70..01476e7 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/OptimizableOperatorSubTree.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/OptimizableOperatorSubTree.java
@@ -228,7 +228,7 @@
DataSourceScanOperator dataSourceScan = (DataSourceScanOperator) sourceOpRefs.get(i).getValue();
IDataSource<?> datasource = dataSourceScan.getDataSource();
if (datasource instanceof AqlDataSource) {
- AqlDataSourceType dsType = ((AqlDataSource) datasource).getDatasourceType();
+ byte dsType = ((AqlDataSource) datasource).getDatasourceType();
if (dsType != AqlDataSourceType.INTERNAL_DATASET
&& dsType != AqlDataSourceType.EXTERNAL_DATASET) {
return false;
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/QueryServiceServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/QueryServiceServlet.java
index 5b91f2f..015044c 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/QueryServiceServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/QueryServiceServlet.java
@@ -37,6 +37,7 @@
import javax.servlet.http.HttpServletResponse;
import org.apache.asterix.app.result.ResultReader;
+import org.apache.asterix.app.result.ResultUtil;
import org.apache.asterix.app.translator.QueryTranslator;
import org.apache.asterix.common.app.SessionConfig;
import org.apache.asterix.common.config.GlobalConfig;
@@ -51,7 +52,6 @@
import org.apache.asterix.translator.IStatementExecutor.Stats;
import org.apache.asterix.translator.IStatementExecutorFactory;
import org.apache.commons.io.IOUtils;
-import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.hyracks.algebricks.core.algebra.prettyprint.AlgebricksAppendable;
import org.apache.hyracks.api.client.IHyracksClientConnection;
import org.apache.hyracks.api.dataset.IHyracksDataset;
@@ -119,7 +119,7 @@
}
}
- private enum ResultFields {
+ public enum ResultFields {
REQUEST_ID("requestID"),
SIGNATURE("signature"),
TYPE("type"),
@@ -139,7 +139,7 @@
}
}
- private enum ResultStatus {
+ public enum ResultStatus {
SUCCESS("success"),
TIMEOUT("timeout"),
ERRORS("errors"),
@@ -331,7 +331,7 @@
}
private static void printError(PrintWriter pw, Throwable e) {
- Throwable rootCause = ExceptionUtils.getRootCause(e);
+ Throwable rootCause = ResultUtil.getRootCause(e);
if (rootCause == null) {
rootCause = e;
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/ResultUtil.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/ResultUtil.java
index b5fd96e..979f7f4 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/ResultUtil.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/ResultUtil.java
@@ -158,12 +158,12 @@
return errorMessage.toString();
}
- private static Throwable getRootCause(Throwable cause) {
+ public static Throwable getRootCause(Throwable cause) {
Throwable currentCause = cause;
Throwable nextCause = cause.getCause();
while (nextCause != null && nextCause != currentCause) {
currentCause = nextCause;
- nextCause = cause.getCause();
+ nextCause = nextCause.getCause();
}
return currentCause;
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index 6390b9d..727a1f9 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -190,6 +190,7 @@
import org.apache.hyracks.api.dataflow.value.ITypeTraits;
import org.apache.hyracks.api.dataset.IHyracksDataset;
import org.apache.hyracks.api.dataset.ResultSetId;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.FileReference;
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.job.JobSpecification;
@@ -696,8 +697,7 @@
StringBuilder builder = null;
IActiveEntityEventsListener[] listeners = ActiveJobNotificationHandler.INSTANCE.getEventListeners();
for (IActiveEntityEventsListener listener : listeners) {
- if (listener instanceof FeedEventsListener
- && ((FeedEventsListener) listener).isConnectedToDataset(datasetName)) {
+ if (listener.isEntityConnectedToDataset(dataverseName, datasetName)) {
if (builder == null) {
builder = new StringBuilder();
}
@@ -706,7 +706,7 @@
}
if (builder != null) {
throw new AsterixException("Dataset " + dataverseName + "." + datasetName + " is currently being "
- + "fed into by the following feed(s).\n" + builder.toString() + "\n" + "Operation not supported");
+ + "fed into by the following active entities.\n" + builder.toString());
}
}
@@ -1411,22 +1411,11 @@
Map<FeedConnectionId, Pair<JobSpecification, Boolean>> disconnectJobList = new HashMap<>();
if (ds.getDatasetType() == DatasetType.INTERNAL) {
// prepare job spec(s) that would disconnect any active feeds involving the dataset.
- IActiveEntityEventsListener[] feedConnections = ActiveJobNotificationHandler.INSTANCE.getEventListeners();
- for (IActiveEntityEventsListener conn : feedConnections) {
- if (conn.getEntityId().getExtensionName().equals(Feed.EXTENSION_NAME)
- && ((FeedEventsListener) conn).isConnectedToDataset(datasetName)) {
- FeedConnectionId connectionId = new FeedConnectionId(conn.getEntityId(), datasetName);
- Pair<JobSpecification, Boolean> p = FeedOperations.buildDisconnectFeedJobSpec(metadataProvider,
- connectionId);
- disconnectJobList.put(connectionId, p);
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Disconnecting feed " + connectionId.getFeedId().getEntityName() + " from dataset "
- + datasetName + " as dataset is being dropped");
- }
- // prepare job to remove feed log storage
- jobsToExecute.add(FeedOperations.buildRemoveFeedStorageJob(MetadataManager.INSTANCE.getFeed(
- mdTxnCtx.getValue(), connectionId.getFeedId().getDataverse(),
- connectionId.getFeedId().getEntityName())));
+ IActiveEntityEventsListener[] activeListeners = ActiveJobNotificationHandler.INSTANCE.getEventListeners();
+ for (IActiveEntityEventsListener listener : activeListeners) {
+ if (listener.isEntityConnectedToDataset(dataverseName, datasetName)) {
+ throw new AsterixException(
+ "Can't drop dataset since it is connected to active entity: " + listener.getEntityId());
}
}
@@ -1547,8 +1536,7 @@
IActiveEntityEventsListener[] listeners = ActiveJobNotificationHandler.INSTANCE.getEventListeners();
StringBuilder builder = null;
for (IActiveEntityEventsListener listener : listeners) {
- if (listener.getEntityId().getExtensionName().equals(Feed.EXTENSION_NAME)
- && ((FeedEventsListener) listener).isConnectedToDataset(datasetName)) {
+ if (listener.isEntityConnectedToDataset(dataverseName, datasetName)) {
if (builder == null) {
builder = new StringBuilder();
}
@@ -1557,8 +1545,8 @@
}
if (builder != null) {
throw new AsterixException(
- "Dataset" + datasetName + " is currently being fed into by the following feeds " + "."
- + builder.toString() + "\nOperation not supported.");
+ "Dataset" + datasetName + " is currently being fed into by the following active entities: "
+ + builder.toString());
}
if (ds.getDatasetType() == DatasetType.INTERNAL) {
@@ -1709,7 +1697,7 @@
e.addSuppressed(e2);
abort(e, e2, mdTxnCtx);
throw new IllegalStateException("System is inconsistent state: pending index("
- + dataverseName + "." + datasetName + "."
+ + dataverseName + "." + datasetName + "."
+ indexName + ") couldn't be removed from the metadata", e);
}
}
@@ -2008,17 +1996,19 @@
}
}
- protected void handleCreateFeedPolicyStatement(AqlMetadataProvider metadataProvider, Statement stmt) throws Exception {
- MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
- metadataProvider.setMetadataTxnContext(mdTxnCtx);
+ protected void handleCreateFeedPolicyStatement(AqlMetadataProvider metadataProvider, Statement stmt)
+ throws AlgebricksException, HyracksDataException {
String dataverse;
String policy;
FeedPolicyEntity newPolicy = null;
+ MetadataTransactionContext mdTxnCtx = null;
CreateFeedPolicyStatement cfps = (CreateFeedPolicyStatement) stmt;
dataverse = getActiveDataverse(null);
policy = cfps.getPolicyName();
MetadataLockManager.INSTANCE.createFeedPolicyBegin(dataverse, dataverse + "." + policy);
try {
+ mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
FeedPolicyEntity feedPolicy = MetadataManager.INSTANCE
.getFeedPolicy(metadataProvider.getMetadataTxnContext(), dataverse, policy);
if (feedPolicy != null) {
@@ -2036,8 +2026,9 @@
.getFeedPolicy(metadataProvider.getMetadataTxnContext(), dataverse,
cfps.getSourcePolicyName());
if (sourceFeedPolicy == null) {
- sourceFeedPolicy = MetadataManager.INSTANCE.getFeedPolicy(metadataProvider.getMetadataTxnContext(),
- MetadataConstants.METADATA_DATAVERSE_NAME, cfps.getSourcePolicyName());
+ sourceFeedPolicy =
+ MetadataManager.INSTANCE.getFeedPolicy(metadataProvider.getMetadataTxnContext(),
+ MetadataConstants.METADATA_DATAVERSE_NAME, cfps.getSourcePolicyName());
if (sourceFeedPolicy == null) {
throw new AlgebricksException("Unknown policy " + cfps.getSourcePolicyName());
}
@@ -2061,9 +2052,9 @@
}
MetadataManager.INSTANCE.addFeedPolicy(mdTxnCtx, newPolicy);
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
- } catch (Exception e) {
+ } catch (RemoteException | ACIDException e) {
abort(e, e, mdTxnCtx);
- throw e;
+ throw new HyracksDataException(e);
} finally {
MetadataLockManager.INSTANCE.createFeedPolicyEnd(dataverse, dataverse + "." + policy);
}
@@ -2350,7 +2341,7 @@
IFeedLifecycleEventSubscriber eventSubscriber = new FeedLifecycleEventSubscriber();
FeedEventsListener listener = (FeedEventsListener) ActiveJobNotificationHandler.INSTANCE
.getActiveEntityListener(entityId);
- if (listener == null || !listener.isConnectedToDataset(datasetName)) {
+ if (listener == null || !listener.isEntityConnectedToDataset(dataverseName, datasetName)) {
throw new AsterixException("Feed " + feed.getFeedId().getEntityName() + " is currently not connected to "
+ cfs.getDatasetName().getValue() + ". Invalid operation!");
}
@@ -2873,7 +2864,7 @@
default:
throw new AlgebricksException(
"The system \"" + runStmt.getSystem() +
- "\" specified in your run statement is not supported.");
+ "\" specified in your run statement is not supported.");
}
}
@@ -3107,12 +3098,21 @@
return getActiveDataverseName(dataverse != null ? dataverse.getValue() : null);
}
+ /**
+ * Abort the ongoing metadata transaction logging the error cause
+ *
+ * @param rootE
+ * @param parentE
+ * @param mdTxnCtx
+ */
public static void abort(Exception rootE, Exception parentE, MetadataTransactionContext mdTxnCtx) {
try {
if (IS_DEBUG_MODE) {
LOGGER.log(Level.SEVERE, rootE.getMessage(), rootE);
}
- MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
+ if (mdTxnCtx != null) {
+ MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
+ }
} catch (Exception e2) {
parentE.addSuppressed(e2);
throw new IllegalStateException(rootE);
diff --git a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/aql/TestExecutor.java b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/aql/TestExecutor.java
index 47290e7..4dc206c 100644
--- a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/aql/TestExecutor.java
+++ b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/aql/TestExecutor.java
@@ -43,10 +43,10 @@
import org.apache.asterix.test.server.ITestServer;
import org.apache.asterix.test.server.TestServerProvider;
import org.apache.asterix.testframework.context.TestCaseContext;
-import org.apache.asterix.testframework.context.TestFileContext;
import org.apache.asterix.testframework.context.TestCaseContext.OutputFormat;
-import org.apache.asterix.testframework.xml.TestGroup;
+import org.apache.asterix.testframework.context.TestFileContext;
import org.apache.asterix.testframework.xml.TestCase.CompilationUnit;
+import org.apache.asterix.testframework.xml.TestGroup;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.http.HttpResponse;
@@ -249,7 +249,7 @@
}
}
- private HttpResponse executeHttpRequest(HttpUriRequest method) throws Exception {
+ protected HttpResponse executeHttpRequest(HttpUriRequest method) throws Exception {
HttpClient client = HttpClients.custom()
.setRetryHandler(StandardHttpRequestRetryHandler.INSTANCE)
.build();
@@ -270,8 +270,8 @@
String errorBody = EntityUtils.toString(httpResponse.getEntity());
try {
JSONObject result = new JSONObject(errorBody);
- String[] errors = {result.getJSONArray("error-code").getString(0), result.getString("summary"),
- result.getString("stacktrace")};
+ String[] errors = { result.getJSONArray("error-code").getString(0), result.getString("summary"),
+ result.getString("stacktrace") };
GlobalConfig.ASTERIX_LOGGER.log(Level.SEVERE, errors[2]);
String exceptionMsg = "HTTP operation failed: " + errors[0]
+ "\nSTATUS LINE: " + httpResponse.getStatusLine()
@@ -307,7 +307,7 @@
return response.getEntity().getContent();
}
- private void setFormatParam(List<CompilationUnit.Parameter> params, OutputFormat fmt) {
+ protected void setFormatParam(List<CompilationUnit.Parameter> params, OutputFormat fmt) {
boolean formatSet = false;
for (CompilationUnit.Parameter param : params) {
if ("format".equals(param.getName())) {
@@ -344,7 +344,7 @@
return builder.build();
}
- private HttpUriRequest constructPostMethod(String statement, String endpoint, String stmtParam,
+ protected HttpUriRequest constructPostMethod(String statement, String endpoint, String stmtParam,
boolean postStmtAsParam, List<CompilationUnit.Parameter> otherParams) {
RequestBuilder builder = RequestBuilder.post(endpoint);
if (postStmtAsParam) {
@@ -514,7 +514,6 @@
executeTest(actualPath, testCaseCtx, pb, isDmlRecoveryTest, null);
}
-
public void executeTest(TestCaseContext testCaseCtx, TestFileContext ctx, String statement,
boolean isDmlRecoveryTest, ProcessBuilder pb, CompilationUnit cUnit, MutableInt queryCount,
List<TestFileContext> expectedResultFileCtxs, File testFile, String actualPath) throws Exception {
@@ -699,7 +698,7 @@
}
break;
case "server": // (start <test server name> <port>
- // [<arg1>][<arg2>][<arg3>]...|stop (<port>|all))
+ // [<arg1>][<arg2>][<arg3>]...|stop (<port>|all))
try {
lines = statement.trim().split("\n");
String[] command = lines[lines.length - 1].trim().split(" ");
@@ -747,7 +746,7 @@
}
break;
case "lib": // expected format <dataverse-name> <library-name>
- // <library-directory>
+ // <library-directory>
// TODO: make this case work well with entity names containing spaces by
// looking for \"
lines = statement.split("\n");
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java
index f4e67f3..c7bed8d9 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java
@@ -180,4 +180,8 @@
public void setMetaType(ARecordType metaType) {
this.metaType = metaType;
}
+
+ public IExternalDataSourceFactory getDataSourceFactory() {
+ return dataSourceFactory;
+ }
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedEventsListener.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedEventsListener.java
index c40fed6..7f2191c 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedEventsListener.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedEventsListener.java
@@ -611,7 +611,7 @@
cInfo.setState(ActivityState.ACTIVE);
}
- public synchronized boolean isConnectedToDataset(String datasetName) {
+ private synchronized boolean isConnectedToDataset(String datasetName) {
for (FeedConnectionId connection : connectJobInfos.keySet()) {
if (connection.getDatasetName().equals(datasetName)) {
return true;
@@ -641,4 +641,9 @@
public IFeedJoint getSourceFeedJoint() {
return sourceFeedJoint;
}
+
+ @Override
+ public boolean isEntityConnectedToDataset(String dataverseName, String datasetName) {
+ return isConnectedToDataset(datasetName);
+ }
}
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlDataSource.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlDataSource.java
index 1272d03..d406108 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlDataSource.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlDataSource.java
@@ -24,29 +24,19 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import org.apache.asterix.om.types.IAType;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.algebricks.common.utils.ListSet;
import org.apache.hyracks.algebricks.common.utils.Pair;
import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
import org.apache.hyracks.algebricks.core.algebra.metadata.IDataSource;
import org.apache.hyracks.algebricks.core.algebra.metadata.IDataSourcePropertiesProvider;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder.OrderKind;
import org.apache.hyracks.algebricks.core.algebra.properties.FunctionalDependency;
import org.apache.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty;
import org.apache.hyracks.algebricks.core.algebra.properties.INodeDomain;
-import org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningProperty;
-import org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
-import org.apache.hyracks.algebricks.core.algebra.properties.LocalOrderProperty;
-import org.apache.hyracks.algebricks.core.algebra.properties.OrderColumn;
-import org.apache.hyracks.algebricks.core.algebra.properties.RandomPartitioningProperty;
-import org.apache.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
-import org.apache.hyracks.algebricks.core.algebra.properties.UnorderedPartitionedProperty;
import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
import org.apache.hyracks.api.job.JobSpecification;
@@ -56,19 +46,24 @@
protected final AqlSourceId id;
protected final IAType itemType;
protected final IAType metaItemType;
- protected final AqlDataSourceType datasourceType;
+ protected final byte datasourceType;
protected IAType[] schemaTypes;
protected INodeDomain domain;
protected Map<String, Serializable> properties = new HashMap<>();
- public enum AqlDataSourceType {
- INTERNAL_DATASET,
- EXTERNAL_DATASET,
- FEED,
- LOADABLE
+ public static class AqlDataSourceType {
+ // positive range is reserved for core datasource types
+ public static final byte INTERNAL_DATASET = 0x00;
+ public static final byte EXTERNAL_DATASET = 0x01;
+ public static final byte FEED = 0x02;
+ public static final byte LOADABLE = 0x03;
+
+ // Hide implicit public constructor
+ private AqlDataSourceType() {
+ }
}
- public AqlDataSource(AqlSourceId id, IAType itemType, IAType metaItemType, AqlDataSourceType datasourceType,
+ public AqlDataSource(AqlSourceId id, IAType itemType, IAType metaItemType, byte datasourceType,
INodeDomain domain) throws AlgebricksException {
this.id = id;
this.itemType = itemType;
@@ -118,72 +113,7 @@
}
}
- private static class AqlDataSourcePartitioningProvider implements IDataSourcePropertiesProvider {
-
- private final AqlDataSource ds;
-
- private final INodeDomain domain;
-
- public AqlDataSourcePartitioningProvider(AqlDataSource dataSource, INodeDomain domain) {
- this.ds = dataSource;
- this.domain = domain;
- }
-
- @Override
- public IPhysicalPropertiesVector computePropertiesVector(List<LogicalVariable> scanVariables) {
- IPhysicalPropertiesVector propsVector = null;
- IPartitioningProperty pp;
- List<ILocalStructuralProperty> propsLocal;
- int n;
- switch (ds.getDatasourceType()) {
- case LOADABLE:
- case EXTERNAL_DATASET:
- pp = new RandomPartitioningProperty(domain);
- propsLocal = new ArrayList<ILocalStructuralProperty>();
- ds.computeLocalStructuralProperties(propsLocal, scanVariables);
- propsVector = new StructuralPropertiesVector(pp, propsLocal);
- break;
-
- case FEED:
- n = scanVariables.size();
- if (n < 2) {
- pp = new RandomPartitioningProperty(domain);
- } else {
- Set<LogicalVariable> pvars = new ListSet<LogicalVariable>();
- pvars.addAll(ds.getPrimaryKeyVariables(scanVariables));
- pp = new UnorderedPartitionedProperty(pvars, domain);
- }
- propsLocal = new ArrayList<ILocalStructuralProperty>();
- propsVector = new StructuralPropertiesVector(pp, propsLocal);
- break;
-
- case INTERNAL_DATASET:
- n = scanVariables.size();
- Set<LogicalVariable> pvars = new ListSet<LogicalVariable>();
- if (n < 2) {
- pp = new RandomPartitioningProperty(domain);
- } else {
- pvars.addAll(ds.getPrimaryKeyVariables(scanVariables));
- pp = new UnorderedPartitionedProperty(pvars, domain);
- }
- propsLocal = new ArrayList<ILocalStructuralProperty>();
- List<OrderColumn> orderColumns = new ArrayList<OrderColumn>();
- for (LogicalVariable pkVar : pvars) {
- orderColumns.add(new OrderColumn(pkVar, OrderKind.ASC));
- }
- propsLocal.add(new LocalOrderProperty(orderColumns));
- propsVector = new StructuralPropertiesVector(pp, propsLocal);
- break;
-
- default:
- throw new IllegalArgumentException();
- }
- return propsVector;
- }
-
- }
-
- public AqlDataSourceType getDatasourceType() {
+ public byte getDatasourceType() {
return datasourceType;
}
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlDataSourcePartitioningProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlDataSourcePartitioningProvider.java
new file mode 100644
index 0000000..14a6e68
--- /dev/null
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlDataSourcePartitioningProvider.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.metadata.declared;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.asterix.metadata.declared.AqlDataSource.AqlDataSourceType;
+import org.apache.hyracks.algebricks.common.utils.ListSet;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import org.apache.hyracks.algebricks.core.algebra.metadata.IDataSourcePropertiesProvider;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder.OrderKind;
+import org.apache.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty;
+import org.apache.hyracks.algebricks.core.algebra.properties.INodeDomain;
+import org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningProperty;
+import org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
+import org.apache.hyracks.algebricks.core.algebra.properties.LocalOrderProperty;
+import org.apache.hyracks.algebricks.core.algebra.properties.OrderColumn;
+import org.apache.hyracks.algebricks.core.algebra.properties.RandomPartitioningProperty;
+import org.apache.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
+import org.apache.hyracks.algebricks.core.algebra.properties.UnorderedPartitionedProperty;
+
+public class AqlDataSourcePartitioningProvider implements IDataSourcePropertiesProvider {
+
+ private final AqlDataSource ds;
+ private final INodeDomain domain;
+
+ public AqlDataSourcePartitioningProvider(AqlDataSource dataSource, INodeDomain domain) {
+ this.ds = dataSource;
+ this.domain = domain;
+ }
+
+ @Override
+ public IPhysicalPropertiesVector computePropertiesVector(List<LogicalVariable> scanVariables) {
+ IPhysicalPropertiesVector propsVector;
+ IPartitioningProperty pp;
+ List<ILocalStructuralProperty> propsLocal = new ArrayList<>();
+ switch (ds.getDatasourceType()) {
+ case AqlDataSourceType.LOADABLE:
+ case AqlDataSourceType.EXTERNAL_DATASET:
+ pp = new RandomPartitioningProperty(domain);
+ ds.computeLocalStructuralProperties(propsLocal, scanVariables);
+ break;
+ case AqlDataSourceType.FEED:
+ pp = getFeedPartitioningProperty(ds, domain, scanVariables);
+ break;
+ case AqlDataSourceType.INTERNAL_DATASET:
+ Set<LogicalVariable> pvars = new ListSet<>();
+ pp = getInternalDatasetPartitioningProperty(ds, domain, scanVariables, pvars);
+ propsLocal.add(new LocalOrderProperty(getOrderColumns(pvars)));
+ break;
+ default:
+ throw new IllegalArgumentException();
+ }
+ propsVector = new StructuralPropertiesVector(pp, propsLocal);
+ return propsVector;
+ }
+
+ private static List<OrderColumn> getOrderColumns(Set<LogicalVariable> pvars) {
+ List<OrderColumn> orderColumns = new ArrayList<>();
+ for (LogicalVariable pkVar : pvars) {
+ orderColumns.add(new OrderColumn(pkVar, OrderKind.ASC));
+ }
+ return orderColumns;
+ }
+
+ private static IPartitioningProperty getInternalDatasetPartitioningProperty(AqlDataSource ds, INodeDomain domain,
+ List<LogicalVariable> scanVariables, Set<LogicalVariable> pvars) {
+ IPartitioningProperty pp;
+ if (scanVariables.size() < 2) {
+ pp = new RandomPartitioningProperty(domain);
+ } else {
+ pvars.addAll(ds.getPrimaryKeyVariables(scanVariables));
+ pp = new UnorderedPartitionedProperty(pvars, domain);
+ }
+ return pp;
+ }
+
+ public static IPartitioningProperty getFeedPartitioningProperty(AqlDataSource ds, INodeDomain domain,
+ List<LogicalVariable> scanVariables) {
+ IPartitioningProperty pp;
+ if (scanVariables.size() < 2) {
+ pp = new RandomPartitioningProperty(domain);
+ } else {
+ Set<LogicalVariable> pvars = new ListSet<>();
+ pvars.addAll(ds.getPrimaryKeyVariables(scanVariables));
+ pp = new UnorderedPartitionedProperty(pvars, domain);
+ }
+ return pp;
+ }
+
+}
\ No newline at end of file
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlMetadataProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlMetadataProvider.java
index da54e32..72360b6 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlMetadataProvider.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlMetadataProvider.java
@@ -481,8 +481,7 @@
}
ITypeTraits[] filterTypeTraits = DatasetUtils.computeFilterTypeTraits(dataset, itemType);
- IBinaryComparatorFactory[] filterCmpFactories = DatasetUtils.
- computeFilterBinaryComparatorFactories(dataset,
+ IBinaryComparatorFactory[] filterCmpFactories = DatasetUtils.computeFilterBinaryComparatorFactories(dataset,
itemType, context.getBinaryComparatorFactoryProvider());
int[] filterFields = null;
int[] btreeFields = null;
@@ -497,10 +496,10 @@
}
Pair<IBinaryComparatorFactory[], ITypeTraits[]> comparatorFactoriesAndTypeTraits =
getComparatorFactoriesAndTypeTraitsOfSecondaryBTreeIndex(
- secondaryIndex.getIndexType(), secondaryIndex.getKeyFieldNames(),
- secondaryIndex.getKeyFieldTypes(), DatasetUtils.getPartitioningKeys(dataset), itemType,
- dataset.getDatasetType(), dataset.hasMetaPart(), primaryKeyIndicators,
- secondaryIndex.getKeyFieldSourceIndicators(), metaType);
+ secondaryIndex.getIndexType(), secondaryIndex.getKeyFieldNames(),
+ secondaryIndex.getKeyFieldTypes(), DatasetUtils.getPartitioningKeys(dataset), itemType,
+ dataset.getDatasetType(), dataset.hasMetaPart(), primaryKeyIndicators,
+ secondaryIndex.getKeyFieldSourceIndicators(), metaType);
comparatorFactories = comparatorFactoriesAndTypeTraits.first;
typeTraits = comparatorFactoriesAndTypeTraits.second;
if (filterTypeTraits != null) {
@@ -581,12 +580,12 @@
int[] buddyBreeFields = new int[] { numSecondaryKeys };
ExternalBTreeWithBuddyDataflowHelperFactory indexDataflowHelperFactory =
new ExternalBTreeWithBuddyDataflowHelperFactory(
- compactionInfo.first, compactionInfo.second,
- new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
- AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
- LSMBTreeWithBuddyIOOperationCallbackFactory.INSTANCE,
- getStorageProperties().getBloomFilterFalsePositiveRate(), buddyBreeFields,
- ExternalDatasetsRegistry.INSTANCE.getAndLockDatasetVersion(dataset, this), !temp);
+ compactionInfo.first, compactionInfo.second,
+ new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
+ AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+ LSMBTreeWithBuddyIOOperationCallbackFactory.INSTANCE,
+ getStorageProperties().getBloomFilterFalsePositiveRate(), buddyBreeFields,
+ ExternalDatasetsRegistry.INSTANCE.getAndLockDatasetVersion(dataset, this), !temp);
btreeSearchOp = new ExternalBTreeSearchOperatorDescriptor(jobSpec, outputRecDesc, rtcProvider,
rtcProvider, spPc.first, typeTraits, comparatorFactories, bloomFilterKeyFields, lowKeyFields,
highKeyFields, lowKeyInclusive, highKeyInclusive, indexDataflowHelperFactory, retainInput,
@@ -703,7 +702,7 @@
IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
Pair<IFileSplitProvider, AlgebricksPartitionConstraint> spPc =
splitProviderAndPartitionConstraintsForDataset(
- dataset.getDataverseName(), dataset.getDatasetName(), indexName, temp);
+ dataset.getDataverseName(), dataset.getDatasetName(), indexName, temp);
ARecordType metaType = null;
if (dataset.hasMetaPart()) {
metaType = (ARecordType) findType(dataset.getMetaItemTypeDataverseName(),
@@ -718,8 +717,7 @@
}
ITypeTraits[] filterTypeTraits = DatasetUtils.computeFilterTypeTraits(dataset, recType);
- IBinaryComparatorFactory[] filterCmpFactories = DatasetUtils.
- computeFilterBinaryComparatorFactories(dataset,
+ IBinaryComparatorFactory[] filterCmpFactories = DatasetUtils.computeFilterBinaryComparatorFactories(dataset,
recType, context.getBinaryComparatorFactoryProvider());
int[] filterFields = null;
int[] rtreeFields = null;
@@ -864,33 +862,13 @@
IAType itemType = findType(dataset.getItemTypeDataverseName(), dataset.getItemTypeName());
IAType metaItemType = findType(dataset.getMetaItemTypeDataverseName(), dataset.getMetaItemTypeName());
INodeDomain domain = findNodeDomain(dataset.getNodeGroupName());
- AqlDataSourceType datasourceType = dataset.getDatasetType().equals(DatasetType.EXTERNAL)
+ byte datasourceType = dataset.getDatasetType().equals(DatasetType.EXTERNAL)
? AqlDataSourceType.EXTERNAL_DATASET : AqlDataSourceType.INTERNAL_DATASET;
return new DatasetDataSource(aqlId, dataset, itemType, metaItemType, datasourceType,
dataset.getDatasetDetails(), domain);
}
@Override
- public boolean scannerOperatorIsLeaf(IDataSource<AqlSourceId> dataSource) {
- boolean result = false;
- switch (((AqlDataSource) dataSource).getDatasourceType()) {
- case INTERNAL_DATASET:
- case EXTERNAL_DATASET:
- result = ((DatasetDataSource) dataSource).getDataset().getDatasetType() == DatasetType.EXTERNAL;
- break;
- case FEED:
- result = true;
- break;
- case LOADABLE:
- result = true;
- break;
- default:
- break;
- }
- return result;
- }
-
- @Override
public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getWriteResultRuntime(
IDataSource<AqlSourceId> dataSource, IOperatorSchema propagatedSchema, List<LogicalVariable> keys,
LogicalVariable payload, List<LogicalVariable> additionalNonKeyFields, JobGenContext context,
@@ -946,14 +924,13 @@
Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
splitProviderAndPartitionConstraintsForDataset(
- dataSource.getId().getDataverseName(), datasetName, indexName, temp);
+ dataSource.getId().getDataverseName(), datasetName, indexName, temp);
IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
long numElementsHint = getCardinalityPerPartitionHint(dataset);
ITypeTraits[] filterTypeTraits = DatasetUtils.computeFilterTypeTraits(dataset, itemType);
- IBinaryComparatorFactory[] filterCmpFactories = DatasetUtils.
- computeFilterBinaryComparatorFactories(dataset,
+ IBinaryComparatorFactory[] filterCmpFactories = DatasetUtils.computeFilterBinaryComparatorFactories(dataset,
itemType, context.getBinaryComparatorFactoryProvider());
int[] filterFields = DatasetUtils.createFilterFields(dataset);
int[] btreeFields = DatasetUtils.createBTreeFieldsWhenThereisAFilter(dataset);
@@ -1037,7 +1014,7 @@
itemType, metaItemType, context.getBinaryComparatorFactoryProvider());
Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
splitProviderAndPartitionConstraintsForDataset(
- dataSource.getId().getDataverseName(), datasetName, indexName, temp);
+ dataSource.getId().getDataverseName(), datasetName, indexName, temp);
// prepare callback
JobId jobId = ((JobEventListenerFactory) spec.getJobletEventListenerFactory()).getJobId();
@@ -1359,7 +1336,7 @@
Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
splitProviderAndPartitionConstraintsForDataset(
- dataverseName, datasetName, indexName, dataset.getDatasetDetails().isTemp());
+ dataverseName, datasetName, indexName, dataset.getDatasetDetails().isTemp());
// Generate Output Record format
ISerializerDeserializer<?>[] tokenKeyPairFields = new ISerializerDeserializer[numTokenKeyPairFields];
@@ -1491,8 +1468,7 @@
dataset.getDatasetName(), indexName);
ITypeTraits[] filterTypeTraits = DatasetUtils.computeFilterTypeTraits(dataset, recType);
- IBinaryComparatorFactory[] filterCmpFactories = DatasetUtils.
- computeFilterBinaryComparatorFactories(dataset,
+ IBinaryComparatorFactory[] filterCmpFactories = DatasetUtils.computeFilterBinaryComparatorFactories(dataset,
recType, context.getBinaryComparatorFactoryProvider());
int[] filterFields = null;
int[] btreeFields = null;
@@ -1529,7 +1505,7 @@
IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
splitProviderAndPartitionConstraintsForDataset(
- dataverseName, datasetName, indexName, temp);
+ dataverseName, datasetName, indexName, temp);
// prepare callback
JobId jobId = ((JobEventListenerFactory) spec.getJobletEventListenerFactory()).getJobId();
@@ -1734,7 +1710,7 @@
IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
splitProviderAndPartitionConstraintsForDataset(
- dataverseName, datasetName, indexName, temp);
+ dataverseName, datasetName, indexName, temp);
// prepare callback
JobId jobId = ((JobEventListenerFactory) spec.getJobletEventListenerFactory()).getJobId();
@@ -1869,15 +1845,14 @@
IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
splitProviderAndPartitionConstraintsForDataset(
- dataverseName, datasetName, indexName, temp);
+ dataverseName, datasetName, indexName, temp);
int[] btreeFields = new int[primaryComparatorFactories.length];
for (int k = 0; k < btreeFields.length; k++) {
btreeFields[k] = k + numSecondaryKeys;
}
ITypeTraits[] filterTypeTraits = DatasetUtils.computeFilterTypeTraits(dataset, recType);
- IBinaryComparatorFactory[] filterCmpFactories = DatasetUtils.
- computeFilterBinaryComparatorFactories(dataset,
+ IBinaryComparatorFactory[] filterCmpFactories = DatasetUtils.computeFilterBinaryComparatorFactories(dataset,
recType, context.getBinaryComparatorFactoryProvider());
int[] filterFields = null;
int[] rtreeFields = null;
@@ -2216,7 +2191,7 @@
itemType, metaItemType, context.getBinaryComparatorFactoryProvider());
Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
splitProviderAndPartitionConstraintsForDataset(
- dataSource.getId().getDataverseName(), datasetName, indexName, temp);
+ dataSource.getId().getDataverseName(), datasetName, indexName, temp);
// prepare callback
JobId jobId = ((JobEventListenerFactory) spec.getJobletEventListenerFactory()).getJobId();
@@ -2227,8 +2202,7 @@
}
ITypeTraits[] filterTypeTraits = DatasetUtils.computeFilterTypeTraits(dataset, itemType);
- IBinaryComparatorFactory[] filterCmpFactories = DatasetUtils.
- computeFilterBinaryComparatorFactories(dataset,
+ IBinaryComparatorFactory[] filterCmpFactories = DatasetUtils.computeFilterBinaryComparatorFactories(dataset,
itemType, context.getBinaryComparatorFactoryProvider());
int[] filterFields = DatasetUtils.createFilterFields(dataset);
int[] btreeFields = DatasetUtils.createBTreeFieldsWhenThereisAFilter(dataset);
@@ -2512,8 +2486,7 @@
secondaryKeyType.getTypeTag(), indexType, secondaryIndex.getGramLength());
ITypeTraits[] filterTypeTraits = DatasetUtils.computeFilterTypeTraits(dataset, recType);
- IBinaryComparatorFactory[] filterCmpFactories = DatasetUtils.
- computeFilterBinaryComparatorFactories(dataset,
+ IBinaryComparatorFactory[] filterCmpFactories = DatasetUtils.computeFilterBinaryComparatorFactories(dataset,
recType, context.getBinaryComparatorFactoryProvider());
int[] filterFields = null;
@@ -2539,7 +2512,7 @@
IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
splitProviderAndPartitionConstraintsForDataset(
- dataverseName, datasetName, indexName, temp);
+ dataverseName, datasetName, indexName, temp);
// prepare callback
JobId jobId = ((JobEventListenerFactory) spec.getJobletEventListenerFactory()).getJobId();
@@ -2692,15 +2665,14 @@
IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
splitProviderAndPartitionConstraintsForDataset(
- dataverseName, datasetName, indexName, temp);
+ dataverseName, datasetName, indexName, temp);
int[] btreeFields = new int[primaryComparatorFactories.length];
for (int k = 0; k < btreeFields.length; k++) {
btreeFields[k] = k + numSecondaryKeys;
}
ITypeTraits[] filterTypeTraits = DatasetUtils.computeFilterTypeTraits(dataset, recType);
- IBinaryComparatorFactory[] filterCmpFactories = DatasetUtils.
- computeFilterBinaryComparatorFactories(dataset,
+ IBinaryComparatorFactory[] filterCmpFactories = DatasetUtils.computeFilterBinaryComparatorFactories(dataset,
recType, context.getBinaryComparatorFactoryProvider());
int[] filterFields = null;
int[] rtreeFields = null;
@@ -2827,8 +2799,7 @@
dataset.getDatasetName(), indexName);
ITypeTraits[] filterTypeTraits = DatasetUtils.computeFilterTypeTraits(dataset, recType);
- IBinaryComparatorFactory[] filterCmpFactories = DatasetUtils.
- computeFilterBinaryComparatorFactories(dataset,
+ IBinaryComparatorFactory[] filterCmpFactories = DatasetUtils.computeFilterBinaryComparatorFactories(dataset,
recType, context.getBinaryComparatorFactoryProvider());
int[] filterFields = null;
int[] btreeFields = null;
@@ -2849,15 +2820,15 @@
Pair<IAType, Boolean> keyPairType = Index.getNonNullableOpenFieldType(secondaryKeyTypes.get(i),
secondaryKeyNames.get(i), recType);
IAType keyType = keyPairType.first;
- comparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE.
- getBinaryComparatorFactory(keyType, true);
+ comparatorFactories[i] =
+ AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(keyType, true);
typeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
}
List<List<String>> partitioningKeys = DatasetUtils.getPartitioningKeys(dataset);
for (List<String> partitioningKey : partitioningKeys) {
IAType keyType = recType.getSubFieldType(partitioningKey);
- comparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE.
- getBinaryComparatorFactory(keyType, true);
+ comparatorFactories[i] =
+ AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(keyType, true);
typeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
++i;
}
@@ -2865,7 +2836,7 @@
IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
splitProviderAndPartitionConstraintsForDataset(
- dataverseName, datasetName, indexName, temp);
+ dataverseName, datasetName, indexName, temp);
// prepare callback
JobId jobId = ((JobEventListenerFactory) spec.getJobletEventListenerFactory()).getJobId();
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DatasetDataSource.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DatasetDataSource.java
index 5a601bc..9729c77 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DatasetDataSource.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DatasetDataSource.java
@@ -20,6 +20,7 @@
import java.util.List;
+import org.apache.asterix.common.config.DatasetConfig.DatasetType;
import org.apache.asterix.external.api.IAdapterFactory;
import org.apache.asterix.metadata.IDatasetDetails;
import org.apache.asterix.metadata.MetadataManager;
@@ -48,7 +49,7 @@
private Dataset dataset;
public DatasetDataSource(AqlSourceId id, Dataset dataset, IAType itemType, IAType metaItemType,
- AqlDataSourceType datasourceType, IDatasetDetails datasetDetails, INodeDomain datasetDomain)
+ byte datasourceType, IDatasetDetails datasetDetails, INodeDomain datasetDomain)
throws AlgebricksException {
super(id, itemType, metaItemType, datasourceType, datasetDomain);
this.dataset = dataset;
@@ -141,4 +142,9 @@
}
}
+ @Override
+ public boolean isScanAccessPathALeaf() {
+ return dataset.getDatasetType() == DatasetType.EXTERNAL;
+ }
+
}
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FeedDataSource.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FeedDataSource.java
index 0a81f03..d74bf9f 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FeedDataSource.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FeedDataSource.java
@@ -204,4 +204,9 @@
throw new AlgebricksException(e);
}
}
+
+ @Override
+ public boolean isScanAccessPathALeaf() {
+ return true;
+ }
}
\ No newline at end of file
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/LoadableDataSource.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/LoadableDataSource.java
index 2ffaded..200a5a9 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/LoadableDataSource.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/LoadableDataSource.java
@@ -144,4 +144,9 @@
RecordDescriptor rDesc = JobGenHelper.mkRecordDescriptor(typeEnv, opSchema, context);
return aqlMetadataProvider.buildLoadableDatasetScan(jobSpec, adapterFactory, rDesc);
}
+
+ @Override
+ public boolean isScanAccessPathALeaf() {
+ return true;
+ }
}
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IDataSource.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IDataSource.java
index 300033f..63ce5fa 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IDataSource.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IDataSource.java
@@ -31,4 +31,7 @@
public IDataSourcePropertiesProvider getPropertiesProvider();
public void computeFDs(List<LogicalVariable> scanVariables, List<FunctionalDependency> fdList);
+
+ // https://issues.apache.org/jira/browse/ASTERIXDB-1619
+ public boolean isScanAccessPathALeaf();
}
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
index 8466ef9..68710a5 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
@@ -48,13 +48,11 @@
List<LogicalVariable> scanVariables, List<LogicalVariable> projectVariables, boolean projectPushed,
List<LogicalVariable> minFilterVars, List<LogicalVariable> maxFilterVars, IOperatorSchema opSchema,
IVariableTypeEnvironment typeEnv, JobGenContext context, JobSpecification jobSpec, Object implConfig)
- throws AlgebricksException;
-
- public boolean scannerOperatorIsLeaf(IDataSource<S> dataSource);
+ throws AlgebricksException;
public Pair<IPushRuntimeFactory, AlgebricksPartitionConstraint> getWriteFileRuntime(IDataSink sink,
int[] printColumns, IPrinterFactory[] printerFactories, RecordDescriptor inputDesc)
- throws AlgebricksException;
+ throws AlgebricksException;
public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getResultHandleRuntime(IDataSink sink,
int[] printColumns, IPrinterFactory[] printerFactories, RecordDescriptor inputDesc, boolean ordered,
@@ -63,7 +61,7 @@
public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getWriteResultRuntime(IDataSource<S> dataSource,
IOperatorSchema propagatedSchema, List<LogicalVariable> keys, LogicalVariable payLoadVar,
List<LogicalVariable> additionalNonKeyFields, JobGenContext context, JobSpecification jobSpec)
- throws AlgebricksException;
+ throws AlgebricksException;
public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getInsertRuntime(IDataSource<S> dataSource,
IOperatorSchema propagatedSchema, IVariableTypeEnvironment typeEnv, List<LogicalVariable> keys,
diff --git a/hyracks-fullstack/algebricks/algebricks-examples/piglet-example/src/main/java/org/apache/hyracks/algebricks/examples/piglet/metadata/PigletFileDataSource.java b/hyracks-fullstack/algebricks/algebricks-examples/piglet-example/src/main/java/org/apache/hyracks/algebricks/examples/piglet/metadata/PigletFileDataSource.java
index d395f3c..5b60022 100644
--- a/hyracks-fullstack/algebricks/algebricks-examples/piglet-example/src/main/java/org/apache/hyracks/algebricks/examples/piglet/metadata/PigletFileDataSource.java
+++ b/hyracks-fullstack/algebricks/algebricks-examples/piglet-example/src/main/java/org/apache/hyracks/algebricks/examples/piglet/metadata/PigletFileDataSource.java
@@ -77,4 +77,9 @@
@Override
public void computeFDs(List<LogicalVariable> scanVariables, List<FunctionalDependency> fdList) {
}
+
+ @Override
+ public boolean isScanAccessPathALeaf() {
+ return true;
+ }
}
diff --git a/hyracks-fullstack/algebricks/algebricks-examples/piglet-example/src/main/java/org/apache/hyracks/algebricks/examples/piglet/metadata/PigletMetadataProvider.java b/hyracks-fullstack/algebricks/algebricks-examples/piglet-example/src/main/java/org/apache/hyracks/algebricks/examples/piglet/metadata/PigletMetadataProvider.java
index f586af7..7bcb1d6 100644
--- a/hyracks-fullstack/algebricks/algebricks-examples/piglet-example/src/main/java/org/apache/hyracks/algebricks/examples/piglet/metadata/PigletMetadataProvider.java
+++ b/hyracks-fullstack/algebricks/algebricks-examples/piglet-example/src/main/java/org/apache/hyracks/algebricks/examples/piglet/metadata/PigletMetadataProvider.java
@@ -83,7 +83,7 @@
List<LogicalVariable> scanVariables, List<LogicalVariable> projectVariables, boolean projectPushed,
List<LogicalVariable> minFilterVars, List<LogicalVariable> maxFilterVars, IOperatorSchema opSchema,
IVariableTypeEnvironment typeEnv, JobGenContext context, JobSpecification jobSpec, Object implConfig)
- throws AlgebricksException {
+ throws AlgebricksException {
PigletFileDataSource ds = (PigletFileDataSource) dataSource;
FileSplit[] fileSplits = ds.getFileSplits();
@@ -133,14 +133,9 @@
}
@Override
- public boolean scannerOperatorIsLeaf(IDataSource<String> dataSource) {
- return true;
- }
-
- @Override
public Pair<IPushRuntimeFactory, AlgebricksPartitionConstraint> getWriteFileRuntime(IDataSink sink,
int[] printColumns, IPrinterFactory[] printerFactories, RecordDescriptor inputDesc)
- throws AlgebricksException {
+ throws AlgebricksException {
PigletFileDataSink ds = (PigletFileDataSink) sink;
FileSplit[] fileSplits = ds.getFileSplits();
String[] locations = new String[fileSplits.length];
@@ -192,7 +187,7 @@
IOperatorSchema[] inputSchemas, IVariableTypeEnvironment typeEnv, List<LogicalVariable> primaryKeys,
List<LogicalVariable> secondaryKeys, List<LogicalVariable> additionalNonKeyFields,
ILogicalExpression filterExpr, RecordDescriptor recordDesc, JobGenContext context, JobSpecification spec)
- throws AlgebricksException {
+ throws AlgebricksException {
// TODO Auto-generated method stub
return null;
}
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
index 8701851..ddfb331 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
@@ -37,7 +37,6 @@
import org.apache.hyracks.algebricks.core.algebra.expressions.IMergeAggregationExpressionFactory;
import org.apache.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
import org.apache.hyracks.algebricks.core.algebra.metadata.IDataSource;
-import org.apache.hyracks.algebricks.core.algebra.metadata.IMetadataProvider;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractOperatorWithNestedPlans;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
@@ -269,8 +268,7 @@
DataSourceScanOperator scan = (DataSourceScanOperator) op;
IDataSource dataSource = scan.getDataSource();
DataSourceScanPOperator dss = new DataSourceScanPOperator(dataSource);
- IMetadataProvider mp = context.getMetadataProvider();
- if (mp.scannerOperatorIsLeaf(dataSource)) {
+ if (dataSource.isScanAccessPathALeaf()) {
dss.disableJobGenBelowMe();
}
op.setPhysicalOperator(dss);
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
index 4cddef1..1272562 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
@@ -34,10 +34,7 @@
import java.util.concurrent.Executors;
import java.util.logging.Level;
import java.util.logging.Logger;
-import org.apache.hyracks.control.cc.work.TriggerNCWork;
-import org.apache.hyracks.control.common.controllers.IniUtils;
-import org.ini4j.Ini;
-import org.xml.sax.InputSource;
+
import org.apache.hyracks.api.application.ICCApplicationEntryPoint;
import org.apache.hyracks.api.client.ClusterControllerInfo;
import org.apache.hyracks.api.client.HyracksClientInterfaceFunctions;
@@ -86,10 +83,12 @@
import org.apache.hyracks.control.cc.work.ReportResultPartitionWriteCompletionWork;
import org.apache.hyracks.control.cc.work.TaskCompleteWork;
import org.apache.hyracks.control.cc.work.TaskFailureWork;
+import org.apache.hyracks.control.cc.work.TriggerNCWork;
import org.apache.hyracks.control.cc.work.UnregisterNodeWork;
import org.apache.hyracks.control.cc.work.WaitForJobCompletionWork;
import org.apache.hyracks.control.common.context.ServerContext;
import org.apache.hyracks.control.common.controllers.CCConfig;
+import org.apache.hyracks.control.common.controllers.IniUtils;
import org.apache.hyracks.control.common.deployment.DeploymentRun;
import org.apache.hyracks.control.common.ipc.CCNCFunctions;
import org.apache.hyracks.control.common.ipc.CCNCFunctions.Function;
@@ -105,6 +104,7 @@
import org.apache.hyracks.ipc.exceptions.IPCException;
import org.apache.hyracks.ipc.impl.IPCSystem;
import org.apache.hyracks.ipc.impl.JavaSerializationBasedPayloadSerializerDeserializer;
+import org.ini4j.Ini;
import org.xml.sax.InputSource;
public class ClusterControllerService implements IControllerService {
@@ -351,6 +351,7 @@
public Map<String, NodeControllerState> getNodeMap() {
return nodeRegistry;
}
+
public CCConfig getConfig() {
return ccConfig;
}
@@ -406,21 +407,24 @@
}
case GET_JOB_STATUS: {
- HyracksClientInterfaceFunctions.GetJobStatusFunction gjsf = (HyracksClientInterfaceFunctions.GetJobStatusFunction) fn;
+ HyracksClientInterfaceFunctions.GetJobStatusFunction gjsf =
+ (HyracksClientInterfaceFunctions.GetJobStatusFunction) fn;
workQueue.schedule(new GetJobStatusWork(ClusterControllerService.this, gjsf.getJobId(),
new IPCResponder<JobStatus>(handle, mid)));
return;
}
case GET_JOB_INFO: {
- HyracksClientInterfaceFunctions.GetJobInfoFunction gjsf = (HyracksClientInterfaceFunctions.GetJobInfoFunction) fn;
+ HyracksClientInterfaceFunctions.GetJobInfoFunction gjsf =
+ (HyracksClientInterfaceFunctions.GetJobInfoFunction) fn;
workQueue.schedule(new GetJobInfoWork(ClusterControllerService.this, gjsf.getJobId(),
new IPCResponder<JobInfo>(handle, mid)));
return;
}
case START_JOB: {
- HyracksClientInterfaceFunctions.StartJobFunction sjf = (HyracksClientInterfaceFunctions.StartJobFunction) fn;
+ HyracksClientInterfaceFunctions.StartJobFunction sjf =
+ (HyracksClientInterfaceFunctions.StartJobFunction) fn;
JobId jobId = createJobId();
workQueue.schedule(new JobStartWork(ClusterControllerService.this, sjf.getDeploymentId(),
sjf.getACGGFBytes(), sjf.getJobFlags(), jobId, new IPCResponder<JobId>(handle, mid)));
@@ -434,14 +438,16 @@
}
case GET_DATASET_RESULT_STATUS: {
- HyracksClientInterfaceFunctions.GetDatasetResultStatusFunction gdrlf = (HyracksClientInterfaceFunctions.GetDatasetResultStatusFunction) fn;
+ HyracksClientInterfaceFunctions.GetDatasetResultStatusFunction gdrlf =
+ (HyracksClientInterfaceFunctions.GetDatasetResultStatusFunction) fn;
workQueue.schedule(new GetResultStatusWork(ClusterControllerService.this, gdrlf.getJobId(),
gdrlf.getResultSetId(), new IPCResponder<Status>(handle, mid)));
return;
}
case GET_DATASET_RESULT_LOCATIONS: {
- HyracksClientInterfaceFunctions.GetDatasetResultLocationsFunction gdrlf = (HyracksClientInterfaceFunctions.GetDatasetResultLocationsFunction) fn;
+ HyracksClientInterfaceFunctions.GetDatasetResultLocationsFunction gdrlf =
+ (HyracksClientInterfaceFunctions.GetDatasetResultLocationsFunction) fn;
workQueue.schedule(new GetResultPartitionLocationsWork(ClusterControllerService.this,
gdrlf.getJobId(), gdrlf.getResultSetId(), gdrlf.getKnownRecords(),
new IPCResponder<DatasetDirectoryRecord[]>(handle, mid)));
@@ -449,7 +455,8 @@
}
case WAIT_FOR_COMPLETION: {
- HyracksClientInterfaceFunctions.WaitForCompletionFunction wfcf = (HyracksClientInterfaceFunctions.WaitForCompletionFunction) fn;
+ HyracksClientInterfaceFunctions.WaitForCompletionFunction wfcf =
+ (HyracksClientInterfaceFunctions.WaitForCompletionFunction) fn;
workQueue.schedule(new WaitForJobCompletionWork(ClusterControllerService.this, wfcf.getJobId(),
new IPCResponder<Object>(handle, mid)));
return;
@@ -471,14 +478,16 @@
}
case CLI_DEPLOY_BINARY: {
- HyracksClientInterfaceFunctions.CliDeployBinaryFunction dbf = (HyracksClientInterfaceFunctions.CliDeployBinaryFunction) fn;
+ HyracksClientInterfaceFunctions.CliDeployBinaryFunction dbf =
+ (HyracksClientInterfaceFunctions.CliDeployBinaryFunction) fn;
workQueue.schedule(new CliDeployBinaryWork(ClusterControllerService.this, dbf.getBinaryURLs(),
dbf.getDeploymentId(), new IPCResponder<DeploymentId>(handle, mid)));
return;
}
case CLI_UNDEPLOY_BINARY: {
- HyracksClientInterfaceFunctions.CliUnDeployBinaryFunction udbf = (HyracksClientInterfaceFunctions.CliUnDeployBinaryFunction) fn;
+ HyracksClientInterfaceFunctions.CliUnDeployBinaryFunction udbf =
+ (HyracksClientInterfaceFunctions.CliUnDeployBinaryFunction) fn;
workQueue.schedule(new CliUnDeployBinaryWork(ClusterControllerService.this, udbf.getDeploymentId(),
new IPCResponder<DeploymentId>(handle, mid)));
return;
@@ -556,21 +565,24 @@
}
case REGISTER_PARTITION_PROVIDER: {
- CCNCFunctions.RegisterPartitionProviderFunction rppf = (CCNCFunctions.RegisterPartitionProviderFunction) fn;
+ CCNCFunctions.RegisterPartitionProviderFunction rppf =
+ (CCNCFunctions.RegisterPartitionProviderFunction) fn;
workQueue.schedule(new RegisterPartitionAvailibilityWork(ClusterControllerService.this,
rppf.getPartitionDescriptor()));
return;
}
case REGISTER_PARTITION_REQUEST: {
- CCNCFunctions.RegisterPartitionRequestFunction rprf = (CCNCFunctions.RegisterPartitionRequestFunction) fn;
+ CCNCFunctions.RegisterPartitionRequestFunction rprf =
+ (CCNCFunctions.RegisterPartitionRequestFunction) fn;
workQueue.schedule(new RegisterPartitionRequestWork(ClusterControllerService.this,
rprf.getPartitionRequest()));
return;
}
case REGISTER_RESULT_PARTITION_LOCATION: {
- CCNCFunctions.RegisterResultPartitionLocationFunction rrplf = (CCNCFunctions.RegisterResultPartitionLocationFunction) fn;
+ CCNCFunctions.RegisterResultPartitionLocationFunction rrplf =
+ (CCNCFunctions.RegisterResultPartitionLocationFunction) fn;
workQueue.schedule(new RegisterResultPartitionLocationWork(ClusterControllerService.this,
rrplf.getJobId(), rrplf.getResultSetId(), rrplf.getOrderedResult(), rrplf.getEmptyResult(),
rrplf.getPartition(), rrplf.getNPartitions(), rrplf.getNetworkAddress()));
@@ -578,21 +590,24 @@
}
case REPORT_RESULT_PARTITION_WRITE_COMPLETION: {
- CCNCFunctions.ReportResultPartitionWriteCompletionFunction rrplf = (CCNCFunctions.ReportResultPartitionWriteCompletionFunction) fn;
+ CCNCFunctions.ReportResultPartitionWriteCompletionFunction rrplf =
+ (CCNCFunctions.ReportResultPartitionWriteCompletionFunction) fn;
workQueue.schedule(new ReportResultPartitionWriteCompletionWork(ClusterControllerService.this,
rrplf.getJobId(), rrplf.getResultSetId(), rrplf.getPartition()));
return;
}
case REPORT_RESULT_PARTITION_FAILURE: {
- CCNCFunctions.ReportResultPartitionFailureFunction rrplf = (CCNCFunctions.ReportResultPartitionFailureFunction) fn;
+ CCNCFunctions.ReportResultPartitionFailureFunction rrplf =
+ (CCNCFunctions.ReportResultPartitionFailureFunction) fn;
workQueue.schedule(new ReportResultPartitionFailureWork(ClusterControllerService.this,
rrplf.getJobId(), rrplf.getResultSetId(), rrplf.getPartition()));
return;
}
case SEND_APPLICATION_MESSAGE: {
- CCNCFunctions.SendApplicationMessageFunction rsf = (CCNCFunctions.SendApplicationMessageFunction) fn;
+ CCNCFunctions.SendApplicationMessageFunction rsf =
+ (CCNCFunctions.SendApplicationMessageFunction) fn;
workQueue.schedule(new ApplicationMessageWork(ClusterControllerService.this, rsf.getMessage(),
rsf.getDeploymentId(), rsf.getNodeId()));
return;
@@ -609,7 +624,6 @@
@Override
public void setException(Exception e) {
-
}
}));
return;