Fix translation of partition constraints for a feed jobspec during alteration.

FeedUtil previously derived an operator's partition count by tallying the
number of PARTITION_COUNT constraint expressions seen for that operator,
which is wrong: the count is carried as the constant value on the right-hand
side of the constraint. FeedUtil now reads the count directly from the
associated ConstantExpression.

TwitterFirehoseFeedAdapterFactory no longer resolves an explicit list of
ingestion nodes, which required a metadata transaction to look up the
dataset's nodegroup plus ad-hoc placement logic with several corner cases.
It now returns an AlgebricksCountPartitionConstraint built from the requested
ingestion cardinality and leaves node placement to the scheduler.
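
For context, a sketch of the corrected counting logic. The surrounding setup
(the extractPartitionCounts helper name and the spec.getUserConstraints()
accessor) is an illustrative assumption, not the literal FeedUtil code; the
switch body matches the first hunk below.

    // Sketch under assumed setup: collect per-operator partition counts
    // from a job specification's user constraints.
    private static Map<OperatorDescriptorId, Integer> extractPartitionCounts(
            JobSpecification spec) {
        Map<OperatorDescriptorId, Integer> operatorCounts =
                new HashMap<OperatorDescriptorId, Integer>();
        for (Constraint constraint : spec.getUserConstraints()) {
            LValueConstraintExpression lexpr = constraint.getLValue();
            ConstraintExpression cexpr = constraint.getRValue();
            switch (lexpr.getTag()) {
                case PARTITION_COUNT:
                    OperatorDescriptorId opId =
                            ((PartitionCountExpression) lexpr).getOperatorDescriptorId();
                    // The cardinality is the constant carried on the RHS of
                    // the constraint, not the number of PARTITION_COUNT
                    // expressions encountered for this operator.
                    operatorCounts.put(opId, (int) ((ConstantExpression) cexpr).getValue());
                    break;
                default:
                    break;
            }
        }
        return operatorCounts;
    }
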
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedUtil.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedUtil.java
index 247fb3e..c1cddb7 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedUtil.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedUtil.java
@@ -123,11 +123,8 @@
switch (lexpr.getTag()) {
case PARTITION_COUNT:
opId = ((PartitionCountExpression) lexpr).getOperatorDescriptorId();
- if (operatorCounts.get(opId) == null) {
- operatorCounts.put(opId, 1);
- } else {
- operatorCounts.put(opId, operatorCounts.get(opId) + 1);
- }
+ operatorCounts.put(opId, (int) ((ConstantExpression) cexpr).getValue());
break;
case PARTITION_LOCATION:
opId = ((PartitionLocationExpression) lexpr).getOperatorDescriptorId();
diff --git a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TwitterFirehoseFeedAdapterFactory.java b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TwitterFirehoseFeedAdapterFactory.java
index 8bb0146..241ea6c 100644
--- a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TwitterFirehoseFeedAdapterFactory.java
+++ b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TwitterFirehoseFeedAdapterFactory.java
@@ -14,29 +14,17 @@
*/
package edu.uci.ics.asterix.tools.external.data;
-import java.util.ArrayList;
-import java.util.List;
import java.util.Map;
-import java.util.Random;
-import java.util.Set;
-import java.util.logging.Level;
import edu.uci.ics.asterix.common.exceptions.AsterixException;
import edu.uci.ics.asterix.external.adapter.factory.StreamBasedAdapterFactory;
-import edu.uci.ics.asterix.metadata.MetadataManager;
-import edu.uci.ics.asterix.metadata.MetadataTransactionContext;
-import edu.uci.ics.asterix.metadata.entities.Dataset;
-import edu.uci.ics.asterix.metadata.entities.InternalDatasetDetails;
-import edu.uci.ics.asterix.metadata.entities.NodeGroup;
-import edu.uci.ics.asterix.metadata.feeds.AdapterRuntimeManager;
import edu.uci.ics.asterix.metadata.feeds.IDatasourceAdapter;
import edu.uci.ics.asterix.metadata.feeds.ITypedAdapterFactory;
import edu.uci.ics.asterix.om.types.ARecordType;
import edu.uci.ics.asterix.om.types.AUnorderedListType;
import edu.uci.ics.asterix.om.types.BuiltinType;
import edu.uci.ics.asterix.om.types.IAType;
-import edu.uci.ics.asterix.om.util.AsterixAppContextInfo;
-import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksCountPartitionConstraint;
import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
@@ -87,70 +75,10 @@
@Override
public AlgebricksPartitionConstraint getPartitionConstraint() throws Exception {
- List<String> candidateIngestionNodes = new ArrayList<String>();
- List<String> storageNodes = new ArrayList<String>();
- Set<String> allNodes = AsterixAppContextInfo.getInstance().getMetadataProperties().getNodeNames();
- candidateIngestionNodes.addAll(allNodes);
- String dvds = configuration.get(KEY_DATAVERSE_DATASET);
- if (dvds != null) {
- String[] components = dvds.split(":");
- String dataverse = components[0];
- String dataset = components[1];
- MetadataTransactionContext ctx = null;
- NodeGroup ng = null;
- try {
- MetadataManager.INSTANCE.acquireReadLatch();
- ctx = MetadataManager.INSTANCE.beginTransaction();
- Dataset ds = MetadataManager.INSTANCE.getDataset(ctx, dataverse, dataset);
- String nodegroupName = ((InternalDatasetDetails) ds.getDatasetDetails()).getNodeGroupName();
- ng = MetadataManager.INSTANCE.getNodegroup(ctx, nodegroupName);
- MetadataManager.INSTANCE.commitTransaction(ctx);
- } catch (Exception e) {
- if (ctx != null) {
- MetadataManager.INSTANCE.abortTransaction(ctx);
- }
- throw e;
- } finally {
- MetadataManager.INSTANCE.releaseReadLatch();
- }
- storageNodes = ng.getNodeNames();
- candidateIngestionNodes.removeAll(storageNodes);
- }
-
- String iCardinalityParam = (String) configuration.get(KEY_INGESTION_CARDINALITY);
- int requiredCardinality = iCardinalityParam != null ? Integer.parseInt(iCardinalityParam) : 1;
- String[] ingestionLocations = new String[requiredCardinality];
- String[] candidateNodesArray = candidateIngestionNodes.toArray(new String[] {});
- if (requiredCardinality > candidateIngestionNodes.size()) {
- if (LOGGER.isLoggable(Level.WARNING)) {
- LOGGER.warning(" Ingestion nodes overlap with storage nodes");
- }
- int numChosen = 0;
- for (int i = 0; i < candidateNodesArray.length; i++, numChosen++) {
- ingestionLocations[i] = candidateNodesArray[i];
- }
-
- for (int j = numChosen, k = 0; j < requiredCardinality && k < storageNodes.size(); j++, k++, numChosen++) {
- ingestionLocations[j] = storageNodes.get(k);
- }
-
- if (numChosen < requiredCardinality) {
- if (LOGGER.isLoggable(Level.WARNING)) {
- LOGGER.warning("Multiple ingestion tasks per node.");
- }
- for (int j = numChosen, k = 0; j < requiredCardinality; j++, k++) {
- ingestionLocations[j] = candidateNodesArray[k];
- }
- }
- } else {
- Random r = new Random();
- int ingestLocIndex = r.nextInt(candidateIngestionNodes.size());
- ingestionLocations[0] = candidateNodesArray[ingestLocIndex];
- for (int i = 1; i < requiredCardinality; i++) {
- ingestionLocations[i] = candidateNodesArray[(ingestLocIndex + i) % candidateNodesArray.length];
- }
- }
- return new AlgebricksAbsolutePartitionConstraint(ingestionLocations);
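+ // Only constrain the ingestion cardinality; leave node placement to the Hyracks scheduler.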
+ String ingestionCardinalityParam = (String) configuration.get(KEY_INGESTION_CARDINALITY);
+ int requiredCardinality = ingestionCardinalityParam != null ? Integer.parseInt(ingestionCardinalityParam) : 1;
+ return new AlgebricksCountPartitionConstraint(requiredCardinality);
}
@Override