Add support for cloning a feed job spec, wrapping operators with policy-aware meta versions
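
The connect-feed path now rebuilds the compiled JobSpecification into a new
("cloned") spec: feed intake and LSM insert/delete operators are re-created
as-is, while every other operator is wrapped in a FeedMetaOperatorDescriptor
that carries the feed connection id and ingestion policy. Connectors,
connector policies and roots are copied over to the cloned spec, and
FeedLifecycleListener unwraps the meta operators when classifying operators.

Rough sketch of the wrapping decision (simplified, not the exact code;
recreate(...) is a hypothetical placeholder for the copy constructors used in
FeedUtil.alterJobSpecificationForFeed below, and types/imports are those of
that file):

    private static void cloneOperators(JobSpecification spec, JobSpecification altered,
            FeedConnectionId feedConnectionId, FeedPolicy feedPolicy,
            Map<OperatorDescriptorId, OperatorDescriptorId> oldNewOID) {
        for (IOperatorDescriptor opDesc : spec.getOperatorMap().values()) {
            IOperatorDescriptor clone;
            if (opDesc instanceof FeedIntakeOperatorDescriptor
                    || opDesc instanceof AsterixLSMTreeInsertDeleteOperatorDescriptor) {
                // ingestion and storage operators are re-created unchanged in the cloned spec
                clone = recreate(altered, opDesc); // hypothetical helper standing in for the copy constructors
            } else {
                // everything else runs behind a policy-aware meta operator
                clone = new FeedMetaOperatorDescriptor(altered, feedConnectionId, opDesc, feedPolicy);
            }
            oldNewOID.put(opDesc.getOperatorId(), clone.getOperatorId());
        }
    }
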
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java b/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
index eb148d5..fd55a07 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
@@ -83,6 +83,7 @@
 import edu.uci.ics.asterix.metadata.bootstrap.MetadataConstants;
 import edu.uci.ics.asterix.metadata.dataset.hints.DatasetHints;
 import edu.uci.ics.asterix.metadata.dataset.hints.DatasetHints.DatasetNodegroupCardinalityHint;
+import edu.uci.ics.asterix.metadata.declared.AqlDataSource;
 import edu.uci.ics.asterix.metadata.declared.AqlMetadataProvider;
 import edu.uci.ics.asterix.metadata.entities.Dataset;
 import edu.uci.ics.asterix.metadata.entities.Datatype;
@@ -90,6 +91,7 @@
 import edu.uci.ics.asterix.metadata.entities.ExternalDatasetDetails;
 import edu.uci.ics.asterix.metadata.entities.Feed;
 import edu.uci.ics.asterix.metadata.entities.FeedActivity;
+import edu.uci.ics.asterix.metadata.entities.FeedPolicy;
 import edu.uci.ics.asterix.metadata.entities.Function;
 import edu.uci.ics.asterix.metadata.entities.Index;
 import edu.uci.ics.asterix.metadata.entities.InternalDatasetDetails;
@@ -1497,8 +1499,10 @@
                 throw new AsterixException("Unknown source feed :" + cfs.getFeedName());
             }
 
+            FeedConnectionId feedConnId = new FeedConnectionId(dataverseName, cfs.getFeedName(), cfs.getDatasetName()
+                    .getValue());
             FeedActivity recentActivity = MetadataManager.INSTANCE.getRecentActivityOnFeedConnection(mdTxnCtx,
-                    new FeedConnectionId(dataverseName, cfs.getFeedName(), cfs.getDatasetName().getValue()), null);
+                    feedConnId, null);
             boolean isFeedActive = FeedUtil.isFeedActive(recentActivity);
             if (isFeedActive && !cfs.forceConnect()) {
                 throw new AsterixException("Feed " + cfs.getDatasetName().getValue()
@@ -1506,16 +1510,25 @@
             }
             IDatasetDetails datasetDetails = dataset.getDatasetDetails();
             if (datasetDetails.getDatasetType() != DatasetType.INTERNAL) {
-                throw new IllegalArgumentException("Dataset " + cfs.getDatasetName().getValue()
-                        + " is not an interal dataset");
+                throw new AsterixException("Dataset " + cfs.getDatasetName().getValue() + " is not an internal dataset");
             }
+
+            FeedPolicy feedPolicy = MetadataManager.INSTANCE.getFeedPolicy(mdTxnCtx, dataverseName,
+                    cbfs.getPolicyName());
+            if (feedPolicy == null) {
+                feedPolicy = MetadataManager.INSTANCE.getFeedPolicy(mdTxnCtx,
+                        MetadataConstants.METADATA_DATAVERSE_NAME, cbfs.getPolicyName());
+                if (feedPolicy == null) {
+                    throw new AsterixException("Unknown feed policy " + cbfs.getPolicyName());
+                }
+            }
+
             cfs.initialize(metadataProvider.getMetadataTxnContext(), dataset, feed);
             cbfs.setQuery(cfs.getQuery());
             metadataProvider.getConfig().put(FunctionUtils.IMPORT_PRIVATE_FUNCTIONS, "" + Boolean.TRUE);
             metadataProvider.getConfig().put(BuiltinFeedPolicies.CONFIG_FEED_POLICY_KEY, cbfs.getPolicyName());
             JobSpecification compiled = rewriteCompileQuery(metadataProvider, cfs.getQuery(), cbfs);
-            //FeedUtil.alterJobSpecificationForFeed(compiled);
-            JobSpecification newJobSpec = FeedUtil.alterJobSpecificationForFeed2(compiled);
+            JobSpecification newJobSpec = FeedUtil.alterJobSpecificationForFeed(compiled, feedConnId, feedPolicy);
 
             if (LOGGER.isLoggable(Level.INFO)) {
                 LOGGER.info("Altered feed ingestion spec to wrap operators");
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/file/FeedUtil.java b/asterix-app/src/main/java/edu/uci/ics/asterix/file/FeedUtil.java
index 8c42aad..0a93a8e 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/file/FeedUtil.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/file/FeedUtil.java
@@ -2,24 +2,22 @@
 
 import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
-import java.util.Set;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
 import org.apache.commons.lang3.tuple.Pair;
 
-import edu.uci.ics.asterix.algebra.operators.physical.CommitRuntimeFactory;
+import edu.uci.ics.asterix.common.dataflow.AsterixLSMTreeInsertDeleteOperatorDescriptor;
 import edu.uci.ics.asterix.metadata.entities.FeedActivity;
 import edu.uci.ics.asterix.metadata.entities.FeedActivity.FeedActivityType;
+import edu.uci.ics.asterix.metadata.entities.FeedPolicy;
+import edu.uci.ics.asterix.metadata.feeds.FeedConnectionId;
 import edu.uci.ics.asterix.metadata.feeds.FeedIntakeOperatorDescriptor;
 import edu.uci.ics.asterix.metadata.feeds.FeedMetaOperatorDescriptor;
-import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
-import edu.uci.ics.hyracks.algebricks.runtime.operators.meta.AlgebricksMetaOperatorDescriptor;
+import edu.uci.ics.asterix.om.types.ARecordType;
 import edu.uci.ics.hyracks.api.constraints.Constraint;
 import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
 import edu.uci.ics.hyracks.api.constraints.expressions.ConstantExpression;
@@ -32,7 +30,6 @@
 import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
 import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
-import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
 
 public class FeedUtil {
 
@@ -43,7 +40,8 @@
                 .getActivityType().equals(FeedActivityType.FEED_FAILURE)));
     }
 
-    public static JobSpecification alterJobSpecificationForFeed2(JobSpecification spec) {
+    public static JobSpecification alterJobSpecificationForFeed(JobSpecification spec,
+            FeedConnectionId feedConnectionId, FeedPolicy feedPolicy) {
         JobSpecification altered = null;
         altered = new JobSpecification();
         Map<OperatorDescriptorId, IOperatorDescriptor> operatorMap = spec.getOperatorMap();
@@ -52,20 +50,40 @@
         Map<OperatorDescriptorId, OperatorDescriptorId> oldNewOID = new HashMap<OperatorDescriptorId, OperatorDescriptorId>();
         for (Entry<OperatorDescriptorId, IOperatorDescriptor> entry : operatorMap.entrySet()) {
             IOperatorDescriptor opDesc = entry.getValue();
-            FeedMetaOperatorDescriptor metaOp = new FeedMetaOperatorDescriptor(altered, opDesc);
-            oldNewOID.put(opDesc.getOperatorId(), metaOp.getOperatorId());
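+            // intake and LSM insert/delete operators are re-created as-is; every other operator is wrapped in a FeedMetaOperatorDescriptor that enforces the feed policy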
+            if (opDesc instanceof FeedIntakeOperatorDescriptor) {
+                FeedIntakeOperatorDescriptor orig = (FeedIntakeOperatorDescriptor) opDesc;
+                FeedIntakeOperatorDescriptor fiop = new FeedIntakeOperatorDescriptor(altered, orig.getFeedId(),
+                        orig.getAdapterFactory(), (ARecordType) orig.getAtype(), orig.getRecordDescriptor(),
+                        orig.getFeedPolicy());
+                oldNewOID.put(opDesc.getOperatorId(), fiop.getOperatorId());
+            } else if (opDesc instanceof AsterixLSMTreeInsertDeleteOperatorDescriptor) {
+                AsterixLSMTreeInsertDeleteOperatorDescriptor orig = (AsterixLSMTreeInsertDeleteOperatorDescriptor) opDesc;
+                AsterixLSMTreeInsertDeleteOperatorDescriptor liop = new AsterixLSMTreeInsertDeleteOperatorDescriptor(
+                        altered, orig.getRecordDescriptor(), orig.getStorageManager(),
+                        orig.getLifecycleManagerProvider(), orig.getFileSplitProvider(), orig.getTreeIndexTypeTraits(),
+                        orig.getComparatorFactories(), orig.getTreeIndexBloomFilterKeyFields(),
+                        orig.getFieldPermutations(), orig.getIndexOperation(), orig.getIndexDataflowHelperFactory(),
+                        orig.getTupleFilterFactory(), orig.getModificationOpCallbackFactory(), orig.isPrimary());
+                oldNewOID.put(opDesc.getOperatorId(), liop.getOperatorId());
+            } else {
+                FeedMetaOperatorDescriptor metaOp = new FeedMetaOperatorDescriptor(altered, feedConnectionId, opDesc,
+                        feedPolicy);
+                oldNewOID.put(opDesc.getOperatorId(), metaOp.getOperatorId());
+            }
         }
 
         // copy connectors
+        Map<ConnectorDescriptorId, ConnectorDescriptorId> connectorMapping = new HashMap<ConnectorDescriptorId, ConnectorDescriptorId>();
         for (Entry<ConnectorDescriptorId, IConnectorDescriptor> entry : spec.getConnectorMap().entrySet()) {
             IConnectorDescriptor connDesc = entry.getValue();
-            altered.getConnectorMap().put(connDesc.getConnectorId(), connDesc);
+            ConnectorDescriptorId newConnId = altered.createConnectorDescriptor(connDesc);
+            connectorMapping.put(entry.getKey(), newConnId);
         }
 
         // make connections between operators
         for (Entry<ConnectorDescriptorId, Pair<Pair<IOperatorDescriptor, Integer>, Pair<IOperatorDescriptor, Integer>>> entry : spec
                 .getConnectorOperatorMap().entrySet()) {
-            IConnectorDescriptor connDesc = altered.getConnectorMap().get(entry.getKey());
+            IConnectorDescriptor connDesc = altered.getConnectorMap().get(connectorMapping.get(entry.getKey()));
             Pair<IOperatorDescriptor, Integer> leftOp = entry.getValue().getLeft();
             Pair<IOperatorDescriptor, Integer> rightOp = entry.getValue().getRight();
 
@@ -123,6 +141,17 @@
             }
         }
 
+        // carry over the connector scheduling policy setting
+        altered.setUseConnectorPolicyForScheduling(spec.isUseConnectorPolicyForScheduling());
+
+        // carry over the connector policy assignment policy
+        altered.setConnectorPolicyAssignmentPolicy(spec.getConnectorPolicyAssignmentPolicy());
+
+        // add the roots of the original spec, mapped to their counterparts in the cloned spec
+        for (OperatorDescriptorId root : spec.getRoots()) {
+            altered.addRoot(altered.getOperatorMap().get(oldNewOID.get(root)));
+        }
+
         if (LOGGER.isLoggable(Level.INFO)) {
             LOGGER.info("New Job Spec:" + altered);
         }
@@ -131,173 +160,4 @@
 
     }
 
-    public static void alterJobSpecificationForFeed(JobSpecification spec) {
-
-        Map<OperatorDescriptorId, IOperatorDescriptor> operatorMap = spec.getOperatorMap();
-        Map<OperatorDescriptorId, IOperatorDescriptor> opIdToOp = new HashMap<OperatorDescriptorId, IOperatorDescriptor>();
-        Map<IOperatorDescriptor, IOperatorDescriptor> opToOp = new HashMap<IOperatorDescriptor, IOperatorDescriptor>();
-        Map<OperatorDescriptorId, OperatorDescriptorId> opIdToOpId = new HashMap<OperatorDescriptorId, OperatorDescriptorId>();
-
-        Iterator<OperatorDescriptorId> opIt = operatorMap.keySet().iterator();
-        List<IOperatorDescriptor> opToReplace = new ArrayList<IOperatorDescriptor>();
-        while (opIt.hasNext()) {
-            OperatorDescriptorId opId = opIt.next();
-            IOperatorDescriptor op = operatorMap.get(opId);
-            if (op instanceof FeedIntakeOperatorDescriptor) {
-                opIdToOp.put(opId, op);
-                opToOp.put(op, op);
-                opIdToOpId.put(op.getOperatorId(), op.getOperatorId());
-            } else if (op instanceof AlgebricksMetaOperatorDescriptor) {
-                AlgebricksMetaOperatorDescriptor mop = (AlgebricksMetaOperatorDescriptor) op;
-                IPushRuntimeFactory[] runtimeFactories = mop.getPipeline().getRuntimeFactories();
-                boolean added = false;
-                for (IPushRuntimeFactory rf : runtimeFactories) {
-                    if (rf instanceof CommitRuntimeFactory) {
-                        opIdToOp.put(opId, op);
-                        opToOp.put(op, op);
-                        opIdToOpId.put(op.getOperatorId(), op.getOperatorId());
-                        added = true;
-                    }
-                }
-                if (!added) {
-                    opToReplace.add(op);
-                }
-            } else {
-                opToReplace.add(op);
-            }
-        }
-
-        // operator map
-        for (OperatorDescriptorId opId : spec.getOperatorMap().keySet()) {
-            if (opIdToOp.get(opId) != null) {
-                operatorMap.put(opId, opIdToOp.get(opId));
-            }
-        }
-
-        for (IOperatorDescriptor op : opToReplace) {
-            spec.getOperatorMap().remove(op.getOperatorId());
-        }
-
-        for (IOperatorDescriptor op : opToReplace) {
-            IOperatorDescriptor newOp = new FeedMetaOperatorDescriptor(spec, op);
-            opIdToOp.put(op.getOperatorId(), newOp);
-            opToOp.put(op, newOp);
-            opIdToOpId.put(op.getOperatorId(), newOp.getOperatorId());
-        }
-
-        // connectors
-
-        /*
-        for(Map.Entry<ConnectorDescriptorId, IConnectorDescriptor> entry : spec.getConnectorMap().entrySet()){
-            ConnectorDescriptorId cid= entry.getKey();
-            IConnectorDescriptor cdesc = entry.getValue();
-            if(cdesc instanceof OneToOneConnectorDescriptor){
-                ((OneToOneConnectorDescriptor)cdesc).
-            }
-         }
-         */
-
-        // connector operator Map
-        for (ConnectorDescriptorId cid : spec.getConnectorOperatorMap().keySet()) {
-            Pair<Pair<IOperatorDescriptor, Integer>, Pair<IOperatorDescriptor, Integer>> p = spec
-                    .getConnectorOperatorMap().get(cid);
-
-            Pair<IOperatorDescriptor, Integer> leftPair = p.getLeft();
-            Pair<IOperatorDescriptor, Integer> newLeftPair = Pair.of(opToOp.get(leftPair.getLeft()),
-                    leftPair.getRight());
-
-            Pair<IOperatorDescriptor, Integer> newRightPair = Pair.of(opToOp.get(p.getRight().getLeft()), p.getRight()
-                    .getRight());
-
-            spec.getConnectorOperatorMap().put(cid, Pair.of(newLeftPair, newRightPair));
-        }
-
-        // operator Output Map
-        Set<OperatorDescriptorId> keysForRemoval = new HashSet<OperatorDescriptorId>();
-        Map<OperatorDescriptorId, List<IConnectorDescriptor>> keysForAddition = new HashMap<OperatorDescriptorId, List<IConnectorDescriptor>>();
-        for (Entry<OperatorDescriptorId, List<IConnectorDescriptor>> entry : spec.getOperatorOutputMap().entrySet()) {
-            OperatorDescriptorId opId = entry.getKey();
-            if (!opIdToOpId.get(opId).equals(opId)) {
-                keysForRemoval.add(opId);
-                keysForAddition.put(opIdToOpId.get(opId), entry.getValue());
-            }
-        }
-
-        for (OperatorDescriptorId opId : keysForRemoval) {
-            spec.getOperatorOutputMap().remove(opId);
-        }
-
-        /*
-        for(OperatorDescriptorId opId : keysForAddition.keySet()){
-            List<IConnectorDescriptor> origConnectors = keysForAddition.get(opId);
-            List<IConnectorDescriptor> newConnectors = new ArrayList<IConnectorDescriptor>();
-            for(IConnectorDescriptor  connDesc : origConnectors){
-                newConnectors.add(e)
-            }
-                     
-        }*/
-
-        spec.getOperatorOutputMap().putAll(keysForAddition);
-
-        // operator input Map
-        keysForRemoval.clear();
-        keysForAddition.clear();
-        for (Entry<OperatorDescriptorId, List<IConnectorDescriptor>> entry : spec.getOperatorInputMap().entrySet()) {
-            OperatorDescriptorId opId = entry.getKey();
-            if (!opIdToOpId.get(opId).equals(opId)) {
-                keysForRemoval.add(opId);
-                keysForAddition.put(opIdToOpId.get(opId), entry.getValue());
-            }
-        }
-
-        for (OperatorDescriptorId opId : keysForRemoval) {
-            spec.getOperatorInputMap().remove(opId);
-        }
-        spec.getOperatorInputMap().putAll(keysForAddition);
-
-        Set<Constraint> userConstraints = spec.getUserConstraints();
-        Set<Constraint> constraintsForRemoval = new HashSet<Constraint>();
-        Map<OperatorDescriptorId, List<String>> constraintsForAddition = new HashMap<OperatorDescriptorId, List<String>>();
-
-        OperatorDescriptorId opId;
-        for (Constraint constraint : userConstraints) {
-            LValueConstraintExpression lexpr = constraint.getLValue();
-            ConstraintExpression cexpr = constraint.getRValue();
-            switch (lexpr.getTag()) {
-                case PARTITION_COUNT:
-                    opId = ((PartitionCountExpression) lexpr).getOperatorDescriptorId();
-                    if (!opIdToOpId.get(opId).equals(opId)) {
-                        constraintsForRemoval.add(constraint);
-                    }
-                    break;
-                case PARTITION_LOCATION:
-                    opId = ((PartitionLocationExpression) lexpr).getOperatorDescriptorId();
-                    if (!opIdToOpId.get(opId).equals(opId)) {
-                        constraintsForRemoval.add(constraint);
-                        String oldLocation = (String) ((ConstantExpression) cexpr).getValue();
-                        List<String> locations = constraintsForAddition.get(opId);
-                        if (locations == null) {
-                            locations = new ArrayList<String>();
-                            constraintsForAddition.put(opId, locations);
-                        }
-                        locations.add(oldLocation);
-                    }
-                    break;
-            }
-        }
-
-        spec.getUserConstraints().removeAll(constraintsForRemoval);
-        for (Entry<OperatorDescriptorId, List<String>> entry : constraintsForAddition.entrySet()) {
-            OperatorDescriptorId oldOpId = entry.getKey();
-            OperatorDescriptorId newOpId = opIdToOpId.get(oldOpId);
-            if (!newOpId.equals(oldOpId)) {
-                PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, opIdToOp.get(oldOpId), entry.getValue()
-                        .toArray(new String[] {}));
-            }
-        }
-
-        if (LOGGER.isLoggable(Level.INFO)) {
-            LOGGER.info("Modified job spec with wrapped operators\n" + spec);
-        }
-    }
 }
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/FeedLifecycleListener.java b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/FeedLifecycleListener.java
index 4402959..325dc2e 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/FeedLifecycleListener.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/FeedLifecycleListener.java
@@ -55,6 +55,7 @@
 import edu.uci.ics.asterix.metadata.feeds.BuiltinFeedPolicies;
 import edu.uci.ics.asterix.metadata.feeds.FeedConnectionId;
 import edu.uci.ics.asterix.metadata.feeds.FeedIntakeOperatorDescriptor;
+import edu.uci.ics.asterix.metadata.feeds.FeedMetaOperatorDescriptor;
 import edu.uci.ics.asterix.metadata.feeds.FeedPolicyAccessor;
 import edu.uci.ics.asterix.om.util.AsterixAppContextInfo;
 import edu.uci.ics.asterix.om.util.AsterixClusterProperties;
@@ -238,17 +239,25 @@
 
             Map<OperatorDescriptorId, IOperatorDescriptor> operators = jobSpec.getOperatorMap();
             for (Entry<OperatorDescriptorId, IOperatorDescriptor> entry : operators.entrySet()) {
-                if (entry.getValue() instanceof AlgebricksMetaOperatorDescriptor) {
-                    AlgebricksMetaOperatorDescriptor op = ((AlgebricksMetaOperatorDescriptor) entry.getValue());
+                IOperatorDescriptor opDesc = entry.getValue();
+                IOperatorDescriptor actualOp = null;
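+                // unwrap the core operator when it has been wrapped in a FeedMetaOperatorDescriptor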
+                if (opDesc instanceof FeedMetaOperatorDescriptor) {
+                    actualOp = ((FeedMetaOperatorDescriptor) opDesc).getCoreOperator();
+                } else {
+                    actualOp = opDesc;
+                }
+
+                if (actualOp instanceof AlgebricksMetaOperatorDescriptor) {
+                    AlgebricksMetaOperatorDescriptor op = ((AlgebricksMetaOperatorDescriptor) actualOp);
                     IPushRuntimeFactory[] runtimeFactories = op.getPipeline().getRuntimeFactories();
                     for (IPushRuntimeFactory rf : runtimeFactories) {
                         if (rf instanceof AssignRuntimeFactory) {
                             computeOperatorIds.add(entry.getKey());
                         }
                     }
-                } else if (entry.getValue() instanceof LSMTreeIndexInsertUpdateDeleteOperatorDescriptor) {
+                } else if (actualOp instanceof LSMTreeIndexInsertUpdateDeleteOperatorDescriptor) {
                     storageOperatorIds.add(entry.getKey());
-                } else if (entry.getValue() instanceof FeedIntakeOperatorDescriptor) {
+                } else if (actualOp instanceof FeedIntakeOperatorDescriptor) {
                     ingestOperatorIds.add(entry.getKey());
                 }
             }
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/dataflow/AsterixLSMTreeInsertDeleteOperatorDescriptor.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/dataflow/AsterixLSMTreeInsertDeleteOperatorDescriptor.java
index e58bbfa..6f6a7b2 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/dataflow/AsterixLSMTreeInsertDeleteOperatorDescriptor.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/dataflow/AsterixLSMTreeInsertDeleteOperatorDescriptor.java
@@ -56,4 +56,19 @@
                 recordDescProvider, op, isPrimary);
     }
 
+    public boolean isPrimary() {
+        return isPrimary;
+    }
+
+    public int[] getFieldPermutations() {
+        return fieldPermutation;
+    }
+
+    public IndexOperation getIndexOperation() {
+        return op;
+    }
+
+    public IBinaryComparatorFactory[] getComparatorFactories() {
+        return comparatorFactories;
+    }
 }
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorDescriptor.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorDescriptor.java
index ee87272..0889711 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorDescriptor.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorDescriptor.java
@@ -68,4 +68,16 @@
     public Map<String, String> getFeedPolicy() {
         return feedPolicy;
     }
+
+    public IAdapterFactory getAdapterFactory() {
+        return adapterFactory;
+    }
+
+    public IAType getAtype() {
+        return atype;
+    }
+
+    public RecordDescriptor getRecordDescriptor() {
+        return recordDescriptors[0];
+    }
 }
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMetaOperatorDescriptor.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMetaOperatorDescriptor.java
index 4ad7617..9acfc63 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMetaOperatorDescriptor.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMetaOperatorDescriptor.java
@@ -1,7 +1,10 @@
 package edu.uci.ics.asterix.metadata.feeds;
 
 import java.nio.ByteBuffer;
+import java.util.logging.Level;
+import java.util.logging.Logger;
 
+import edu.uci.ics.asterix.metadata.entities.FeedPolicy;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.dataflow.IActivity;
 import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
@@ -14,56 +17,102 @@
 
 public class FeedMetaOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
 
-    private IOperatorDescriptor coreOperator;
+    private static final Logger LOGGER = Logger.getLogger(FeedMetaOperatorDescriptor.class.getName());
 
-    public FeedMetaOperatorDescriptor(JobSpecification spec, IOperatorDescriptor coreOperatorDescriptor) {
+    private IOperatorDescriptor coreOperator;
+    private final FeedConnectionId feedConnectionId;
+    private final FeedPolicy feedPolicy;
+
+    public FeedMetaOperatorDescriptor(JobSpecification spec, FeedConnectionId feedConnectionId,
+            IOperatorDescriptor coreOperatorDescriptor, FeedPolicy feedPolicy) {
         super(spec, coreOperatorDescriptor.getInputArity(), coreOperatorDescriptor.getOutputArity());
+        this.feedConnectionId = feedConnectionId;
+        this.feedPolicy = feedPolicy;
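+        // the wrapper exposes the same output schema as the core operator it delegates to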
+        if (coreOperatorDescriptor.getOutputRecordDescriptors().length == 1) {
+            recordDescriptors[0] = coreOperatorDescriptor.getOutputRecordDescriptors()[0];
+        }
         this.coreOperator = coreOperatorDescriptor;
     }
 
     @Override
     public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
             IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
-        return new FeedMetaNodePushable(ctx, recordDescProvider, partition, nPartitions, coreOperator);
+        return new FeedMetaNodePushable(ctx, recordDescProvider, partition, nPartitions, coreOperator,
+                feedConnectionId, feedPolicy);
     }
 
     @Override
     public String toString() {
-        return coreOperator.getDisplayName();
+        return "FeedMeta [" + coreOperator + "]";
     }
 
     private static class FeedMetaNodePushable extends AbstractUnaryInputUnaryOutputOperatorNodePushable {
 
         private AbstractUnaryInputUnaryOutputOperatorNodePushable coreOperatorNodePushable;
+        private FeedPolicyEnforcer policyEnforcer;
 
         public FeedMetaNodePushable(IHyracksTaskContext ctx, IRecordDescriptorProvider recordDescProvider,
-                int partition, int nPartitions, IOperatorDescriptor coreOperator) throws HyracksDataException {
+                int partition, int nPartitions, IOperatorDescriptor coreOperator, FeedConnectionId feedConnectionId,
+                FeedPolicy feedPolicy) throws HyracksDataException {
             this.coreOperatorNodePushable = (AbstractUnaryInputUnaryOutputOperatorNodePushable) ((IActivity) coreOperator)
                     .createPushRuntime(ctx, recordDescProvider, partition, nPartitions);
+            this.policyEnforcer = new FeedPolicyEnforcer(feedConnectionId, feedPolicy.getProperties());
         }
 
         @Override
         public void open() throws HyracksDataException {
-            //  coreOperatorNodePushable.setOutputFrameWriter(partition, coreOperatorNodePushable, recordDescProvider);
             coreOperatorNodePushable.setOutputFrameWriter(0, writer, recordDesc);
             coreOperatorNodePushable.open();
+            if (LOGGER.isLoggable(Level.INFO)) {
+                LOGGER.info("Core Op:" + coreOperatorNodePushable.getDisplayName() + " open ");
+            }
+
         }
 
         @Override
         public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
-            coreOperatorNodePushable.nextFrame(buffer);
+            if (LOGGER.isLoggable(Level.INFO)) {
+                LOGGER.info("Core Op:" + coreOperatorNodePushable.getDisplayName() + " received frame ");
+            }
+            try {
+                coreOperatorNodePushable.nextFrame(buffer);
+            } catch (HyracksDataException e) {
+                // the core operator failed on this frame; consult the ingestion policy
+                if (!policyEnforcer.getFeedPolicyAccessor().continueOnHardwareFailure()) {
+                    throw e;
+                }
+                // TODO: log the tuple(s) from the offending frame before dropping it
+            }
         }
 
         @Override
         public void fail() throws HyracksDataException {
+            if (LOGGER.isLoggable(Level.INFO)) {
+                LOGGER.info("Core Op:" + coreOperatorNodePushable.getDisplayName() + " fail ");
+            }
             coreOperatorNodePushable.fail();
         }
 
         @Override
         public void close() throws HyracksDataException {
+            if (LOGGER.isLoggable(Level.INFO)) {
+                LOGGER.info("Core Op:" + coreOperatorNodePushable.getDisplayName() + " close ");
+            }
             coreOperatorNodePushable.close();
         }
 
     }
 
+    public IOperatorDescriptor getCoreOperator() {
+        return coreOperator;
+    }
+
+    public FeedConnectionId getFeedConnectionId() {
+        return feedConnectionId;
+    }
+
+    public FeedPolicy getFeedPolicy() {
+        return feedPolicy;
+    }
+
 }