merge from zheilbron/asterix_msr (checkpoint 2)
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/IntroduceMaterializationForInsertWithSelfScanRule.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/IntroduceMaterializationForInsertWithSelfScanRule.java
index c15b0f0..2700872 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/IntroduceMaterializationForInsertWithSelfScanRule.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/IntroduceMaterializationForInsertWithSelfScanRule.java
@@ -54,8 +54,8 @@
}
InsertDeleteOperator insertOp = (InsertDeleteOperator) op;
- boolean sameDataset = checkIfInsertAndScanDatasetsSame(op, ((AqlDataSource) insertOp.getDataSource())
- .getDataset().getDatasetName());
+ boolean sameDataset = checkIfInsertAndScanDatasetsSame(op,
+ ((AqlDataSource) insertOp.getDataSource()).getDatasourceName());
if (sameDataset) {
MaterializeOperator materializeOperator = new MaterializeOperator();
@@ -104,9 +104,8 @@
} else if (descendantOp.getOperatorTag() == LogicalOperatorTag.DATASOURCESCAN) {
DataSourceScanOperator dataSourceScanOp = (DataSourceScanOperator) descendantOp;
AqlDataSource ds = (AqlDataSource) dataSourceScanOp.getDataSource();
- if (ds.getDatasourceType() != AqlDataSourceType.FEED
- && ds.getDatasourceType() != AqlDataSourceType.EXTERNAL_FEED) {
- if (ds.getDataset().getDatasetName().compareTo(insertDatasetName) == 0) {
+ if (ds.getDatasourceType() != AqlDataSourceType.FEED) {
+ if (ds.getDatasourceName().compareTo(insertDatasetName) == 0) {
return true;
}
}
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
index f844c13..18c2775 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
@@ -62,9 +62,9 @@
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("Starting Asterix cluster controller");
}
-
+
appCtx.setThreadFactory(new AsterixThreadFactory(new LifeCycleComponentManager()));
- AsterixAppContextInfo.initialize(appCtx);
+ AsterixAppContextInfo.initialize(appCtx, getNewHyracksClientConnection());
proxy = AsterixStateProxy.registerRemoteObject();
appCtx.setDistributedState(proxy);
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlDataSource.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlDataSource.java
index eb0af90..6d1a00b 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlDataSource.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlDataSource.java
@@ -15,19 +15,13 @@
package edu.uci.ics.asterix.metadata.declared;
-import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import edu.uci.ics.asterix.metadata.entities.Dataset;
-import edu.uci.ics.asterix.metadata.entities.ExternalDatasetDetails;
-import edu.uci.ics.asterix.metadata.utils.DatasetUtils;
-import edu.uci.ics.asterix.om.types.ARecordType;
import edu.uci.ics.asterix.om.types.IAType;
import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
import edu.uci.ics.hyracks.algebricks.common.utils.ListSet;
@@ -35,7 +29,6 @@
import edu.uci.ics.hyracks.algebricks.core.algebra.metadata.IDataSource;
import edu.uci.ics.hyracks.algebricks.core.algebra.metadata.IDataSourcePropertiesProvider;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder.OrderKind;
-import edu.uci.ics.hyracks.algebricks.core.algebra.properties.DefaultNodeGroupDomain;
import edu.uci.ics.hyracks.algebricks.core.algebra.properties.FunctionalDependency;
import edu.uci.ics.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty;
import edu.uci.ics.hyracks.algebricks.core.algebra.properties.INodeDomain;
@@ -79,35 +72,6 @@
return datasourceName;
}
- // TODO: Seems like initFeedDataset() could simply call this method.
- private void initInternalDataset(IAType itemType) throws IOException {
- List<String> partitioningKeys = DatasetUtils.getPartitioningKeys(dataset);
- ARecordType recordType = (ARecordType) itemType;
- int n = partitioningKeys.size();
- schemaTypes = new IAType[n + 1];
- for (int i = 0; i < n; i++) {
- schemaTypes[i] = recordType.getFieldType(partitioningKeys.get(i));
- }
- schemaTypes[n] = itemType;
- domain = new DefaultNodeGroupDomain(DatasetUtils.getNodegroupName(dataset));
- }
-
- private void initFeedDataset(IAType itemType, Dataset dataset) throws IOException {
- if (dataset.getDatasetDetails() instanceof ExternalDatasetDetails) {
- initExternalDataset(itemType);
- } else {
- List<String> partitioningKeys = DatasetUtils.getPartitioningKeys(dataset);
- int n = partitioningKeys.size();
- schemaTypes = new IAType[n + 1];
- ARecordType recordType = (ARecordType) itemType;
- for (int i = 0; i < n; i++) {
- schemaTypes[i] = recordType.getFieldType(partitioningKeys.get(i));
- }
- schemaTypes[n] = itemType;
- domain = new DefaultNodeGroupDomain(DatasetUtils.getNodegroupName(dataset));
- }
- }
-
public abstract IAType[] getSchemaTypes();
public abstract INodeDomain getDomain();
@@ -126,7 +90,7 @@
public IDataSourcePropertiesProvider getPropertiesProvider() {
return new AqlDataSourcePartitioningProvider(datasourceType, domain);
}
-
+
@Override
public void computeFDs(List<LogicalVariable> scanVariables, List<FunctionalDependency> fdList) {
int n = scanVariables.size();
@@ -138,7 +102,8 @@
fdList.add(fd);
}
}
-
+
+
private static class AqlDataSourcePartitioningProvider implements IDataSourcePropertiesProvider {
private INodeDomain domain;
@@ -167,7 +132,7 @@
if (n < 2) {
pp = new RandomPartitioningProperty(domain);
} else {
- Set<LogicalVariable> pvars = new HashSet<LogicalVariable>();
+ Set<LogicalVariable> pvars = new ListSet<LogicalVariable>();
int i = 0;
for (LogicalVariable v : scanVariables) {
pvars.add(v);
@@ -189,7 +154,7 @@
if (n < 2) {
pp = new RandomPartitioningProperty(domain);
} else {
- Set<LogicalVariable> pvars = new HashSet<LogicalVariable>();
+ Set<LogicalVariable> pvars = new ListSet<LogicalVariable>();
int i = 0;
for (LogicalVariable v : scanVariables) {
pvars.add(v);
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/DatasetDataSource.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/DatasetDataSource.java
index 23a6d84..ed08759 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/DatasetDataSource.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/DatasetDataSource.java
@@ -10,8 +10,10 @@
import edu.uci.ics.asterix.om.types.ARecordType;
import edu.uci.ics.asterix.om.types.IAType;
import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
-import edu.uci.ics.hyracks.algebricks.core.algebra.properties.AsterixNodeGroupDomain;
import edu.uci.ics.hyracks.algebricks.core.algebra.properties.INodeDomain;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.DefaultNodeGroupDomain;
+
+//import edu.uci.ics.hyracks.algebricks.core.algebra.properties.AsterixNodeGroupDomain;
public class DatasetDataSource extends AqlDataSource {
@@ -65,7 +67,7 @@
schemaTypes[i] = recordType.getFieldType(partitioningKeys.get(i));
}
schemaTypes[n] = itemType;
- domain = new AsterixNodeGroupDomain(DatasetUtils.getNodegroupName(dataset));
+ domain = new DefaultNodeGroupDomain(DatasetUtils.getNodegroupName(dataset));
}
private void initExternalDataset(IAType itemType) {