fixed bug in translating partition constraints for a feed jobspec during alteration
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedUtil.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedUtil.java
index 247fb3e..c1cddb7 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedUtil.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedUtil.java
@@ -123,11 +123,7 @@
             switch (lexpr.getTag()) {
                 case PARTITION_COUNT:
                     opId = ((PartitionCountExpression) lexpr).getOperatorDescriptorId();
-                    if (operatorCounts.get(opId) == null) {
-                        operatorCounts.put(opId, 1);
-                    } else {
-                        operatorCounts.put(opId, operatorCounts.get(opId) + 1);
-                    }
+                    operatorCounts.put(opId, (int) ((ConstantExpression) cexpr).getValue());
                     break;
                 case PARTITION_LOCATION:
                     opId = ((PartitionLocationExpression) lexpr).getOperatorDescriptorId();
diff --git a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TwitterFirehoseFeedAdapterFactory.java b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TwitterFirehoseFeedAdapterFactory.java
index 8bb0146..241ea6c 100644
--- a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TwitterFirehoseFeedAdapterFactory.java
+++ b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TwitterFirehoseFeedAdapterFactory.java
@@ -14,29 +14,17 @@
  */
 package edu.uci.ics.asterix.tools.external.data;
 
-import java.util.ArrayList;
-import java.util.List;
 import java.util.Map;
-import java.util.Random;
-import java.util.Set;
-import java.util.logging.Level;
 
 import edu.uci.ics.asterix.common.exceptions.AsterixException;
 import edu.uci.ics.asterix.external.adapter.factory.StreamBasedAdapterFactory;
-import edu.uci.ics.asterix.metadata.MetadataManager;
-import edu.uci.ics.asterix.metadata.MetadataTransactionContext;
-import edu.uci.ics.asterix.metadata.entities.Dataset;
-import edu.uci.ics.asterix.metadata.entities.InternalDatasetDetails;
-import edu.uci.ics.asterix.metadata.entities.NodeGroup;
-import edu.uci.ics.asterix.metadata.feeds.AdapterRuntimeManager;
 import edu.uci.ics.asterix.metadata.feeds.IDatasourceAdapter;
 import edu.uci.ics.asterix.metadata.feeds.ITypedAdapterFactory;
 import edu.uci.ics.asterix.om.types.ARecordType;
 import edu.uci.ics.asterix.om.types.AUnorderedListType;
 import edu.uci.ics.asterix.om.types.BuiltinType;
 import edu.uci.ics.asterix.om.types.IAType;
-import edu.uci.ics.asterix.om.util.AsterixAppContextInfo;
-import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksCountPartitionConstraint;
 import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 
@@ -87,70 +75,9 @@
 
     @Override
     public AlgebricksPartitionConstraint getPartitionConstraint() throws Exception {
-        List<String> candidateIngestionNodes = new ArrayList<String>();
-        List<String> storageNodes = new ArrayList<String>();
-        Set<String> allNodes = AsterixAppContextInfo.getInstance().getMetadataProperties().getNodeNames();
-        candidateIngestionNodes.addAll(allNodes);
-        String dvds = configuration.get(KEY_DATAVERSE_DATASET);
-        if (dvds != null) {
-            String[] components = dvds.split(":");
-            String dataverse = components[0];
-            String dataset = components[1];
-            MetadataTransactionContext ctx = null;
-            NodeGroup ng = null;
-            try {
-                MetadataManager.INSTANCE.acquireReadLatch();
-                ctx = MetadataManager.INSTANCE.beginTransaction();
-                Dataset ds = MetadataManager.INSTANCE.getDataset(ctx, dataverse, dataset);
-                String nodegroupName = ((InternalDatasetDetails) ds.getDatasetDetails()).getNodeGroupName();
-                ng = MetadataManager.INSTANCE.getNodegroup(ctx, nodegroupName);
-                MetadataManager.INSTANCE.commitTransaction(ctx);
-            } catch (Exception e) {
-                if (ctx != null) {
-                    MetadataManager.INSTANCE.abortTransaction(ctx);
-                }
-                throw e;
-            } finally {
-                MetadataManager.INSTANCE.releaseReadLatch();
-            }
-            storageNodes = ng.getNodeNames();
-            candidateIngestionNodes.removeAll(storageNodes);
-        }
-
-        String iCardinalityParam = (String) configuration.get(KEY_INGESTION_CARDINALITY);
-        int requiredCardinality = iCardinalityParam != null ? Integer.parseInt(iCardinalityParam) : 1;
-        String[] ingestionLocations = new String[requiredCardinality];
-        String[] candidateNodesArray = candidateIngestionNodes.toArray(new String[] {});
-        if (requiredCardinality > candidateIngestionNodes.size()) {
-            if (LOGGER.isLoggable(Level.WARNING)) {
-                LOGGER.warning(" Ingestion nodes overlap with storage nodes");
-            }
-            int numChosen = 0;
-            for (int i = 0; i < candidateNodesArray.length; i++, numChosen++) {
-                ingestionLocations[i] = candidateNodesArray[i];
-            }
-
-            for (int j = numChosen, k = 0; j < requiredCardinality && k < storageNodes.size(); j++, k++, numChosen++) {
-                ingestionLocations[j] = storageNodes.get(k);
-            }
-
-            if (numChosen < requiredCardinality) {
-                if (LOGGER.isLoggable(Level.WARNING)) {
-                    LOGGER.warning("Multiple ingestion tasks per node.");
-                }
-                for (int j = numChosen, k = 0; j < requiredCardinality; j++, k++) {
-                    ingestionLocations[j] = candidateNodesArray[k];
-                }
-            }
-        } else {
-            Random r = new Random();
-            int ingestLocIndex = r.nextInt(candidateIngestionNodes.size());
-            ingestionLocations[0] = candidateNodesArray[ingestLocIndex];
-            for (int i = 1; i < requiredCardinality; i++) {
-                ingestionLocations[i] = candidateNodesArray[(ingestLocIndex + i) % candidateNodesArray.length];
-            }
-        }
-        return new AlgebricksAbsolutePartitionConstraint(ingestionLocations);
+        String ingestionCardinalityParam = (String) configuration.get(KEY_INGESTION_CARDINALITY);
+        int requiredCardinality = ingestionCardinalityParam != null ? Integer.parseInt(ingestionCardinalityParam) : 1;
+        return new AlgebricksCountPartitionConstraint(requiredCardinality);
     }
 
     @Override