added job event listener
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java b/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
index 66a3980..f00deba 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
@@ -74,6 +74,7 @@
 import edu.uci.ics.asterix.metadata.MetadataTransactionContext;
 import edu.uci.ics.asterix.metadata.api.IMetadataEntity;
 import edu.uci.ics.asterix.metadata.declared.AqlMetadataProvider;
+import edu.uci.ics.asterix.metadata.declared.FeedJobEventListenerFactory;
 import edu.uci.ics.asterix.metadata.entities.Dataset;
 import edu.uci.ics.asterix.metadata.entities.Datatype;
 import edu.uci.ics.asterix.metadata.entities.Dataverse;
@@ -445,9 +446,11 @@
                     Map<String, String> configuration = ((FeedDetailsDecl) dd.getDatasetDetailsDecl())
                             .getConfiguration();
                     FunctionSignature signature = ((FeedDetailsDecl) dd.getDatasetDetailsDecl()).getFunctionSignature();
+
                     datasetDetails = new FeedDatasetDetails(InternalDatasetDetails.FileStructure.BTREE,
                             InternalDatasetDetails.PartitioningStrategy.HASH, partitioningExprs, partitioningExprs,
-                            ngName, adapter, configuration, signature, FeedDatasetDetails.FeedState.INACTIVE.toString());
+                            ngName, adapter, configuration, signature,
+                            FeedDatasetDetails.FeedState.INACTIVE.toString(), null, null);
                     break;
                 }
             }
@@ -1335,8 +1338,7 @@
         MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
         boolean bActiveTxn = true;
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
-        acquireReadLatch();
-
+        acquireWriteLatch();
         try {
             BeginFeedStatement bfs = (BeginFeedStatement) stmt;
             String dataverseName = bfs.getDataverseName() == null ? activeDefaultDataverse == null ? null
@@ -1359,14 +1361,14 @@
             bfs.initialize(metadataProvider.getMetadataTxnContext(), dataset);
             cbfs.setQuery(bfs.getQuery());
             JobSpecification compiled = rewriteCompileQuery(metadataProvider, bfs.getQuery(), cbfs);
+            compiled.setJobletEventListenerFactory(new FeedJobEventListenerFactory(dataset, compiled, hcc));
 
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
             bActiveTxn = false;
-
+            JobId jobId = null;
             if (compiled != null) {
-                runJob(hcc, compiled, true);
+                jobId = runJob(hcc, compiled, false);
             }
-
         } catch (Exception e) {
             if (bActiveTxn) {
                 abort(e, e, mdTxnCtx);
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataCache.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataCache.java
index 0382315..a4be0f8 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataCache.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataCache.java
@@ -21,11 +21,13 @@
 import java.util.Map;
 
 import edu.uci.ics.asterix.common.functions.FunctionSignature;
+import edu.uci.ics.asterix.external.feed.lifecycle.FeedId;
 import edu.uci.ics.asterix.metadata.api.IMetadataEntity;
-import edu.uci.ics.asterix.metadata.entities.DatasourceAdapter;
 import edu.uci.ics.asterix.metadata.entities.Dataset;
+import edu.uci.ics.asterix.metadata.entities.DatasourceAdapter;
 import edu.uci.ics.asterix.metadata.entities.Datatype;
 import edu.uci.ics.asterix.metadata.entities.Dataverse;
+import edu.uci.ics.asterix.metadata.entities.FeedActivity;
 import edu.uci.ics.asterix.metadata.entities.Function;
 import edu.uci.ics.asterix.metadata.entities.Index;
 import edu.uci.ics.asterix.metadata.entities.NodeGroup;
@@ -51,6 +53,8 @@
     protected final Map<FunctionSignature, Function> functions = new HashMap<FunctionSignature, Function>();
     // Key is adapter dataverse. Key of value map is the adapter name  
     protected final Map<String, Map<String, DatasourceAdapter>> adapters = new HashMap<String, Map<String, DatasourceAdapter>>();
+    // Key is FeedId   
+    protected final Map<FeedId, FeedActivity> feedActivity = new HashMap<FeedId, FeedActivity>();
 
     // Atomically executes all metadata operations in ctx's log.
     public void commit(MetadataTransactionContext ctx) {
@@ -86,13 +90,16 @@
                         synchronized (datatypes) {
                             synchronized (functions) {
                                 synchronized (adapters) {
-                                    dataverses.clear();
-                                    nodeGroups.clear();
-                                    datasets.clear();
-                                    indexes.clear();
-                                    datatypes.clear();
-                                    functions.clear();
-                                    adapters.clear();
+                                    synchronized (feedActivity) {
+                                        dataverses.clear();
+                                        nodeGroups.clear();
+                                        datasets.clear();
+                                        indexes.clear();
+                                        datatypes.clear();
+                                        functions.clear();
+                                        adapters.clear();
+                                        feedActivity.clear();
+                                    }
                                 }
                             }
                         }
@@ -180,20 +187,34 @@
                 synchronized (indexes) {
                     synchronized (datatypes) {
                         synchronized (functions) {
-                            datasets.remove(dataverse.getDataverseName());
-                            indexes.remove(dataverse.getDataverseName());
-                            datatypes.remove(dataverse.getDataverseName());
-                            adapters.remove(dataverse.getDataverseName());
-                            List<FunctionSignature> markedFunctionsForRemoval = new ArrayList<FunctionSignature>();
-                            for (FunctionSignature signature : functions.keySet()) {
-                                if (signature.getNamespace().equals(dataverse.getDataverseName())) {
-                                    markedFunctionsForRemoval.add(signature);
+                            synchronized (adapters) {
+                                synchronized (feedActivity) {
+                                    datasets.remove(dataverse.getDataverseName());
+                                    indexes.remove(dataverse.getDataverseName());
+                                    datatypes.remove(dataverse.getDataverseName());
+                                    adapters.remove(dataverse.getDataverseName());
+                                    List<FunctionSignature> markedFunctionsForRemoval = new ArrayList<FunctionSignature>();
+                                    for (FunctionSignature signature : functions.keySet()) {
+                                        if (signature.getNamespace().equals(dataverse.getDataverseName())) {
+                                            markedFunctionsForRemoval.add(signature);
+                                        }
+                                    }
+                                    for (FunctionSignature signature : markedFunctionsForRemoval) {
+                                        functions.remove(signature);
+                                    }
+                                    List<FeedId> feedActivitiesMarkedForRemoval = new ArrayList<FeedId>();
+                                    for (FeedId fid : feedActivity.keySet()) {
+                                        if (fid.getDataverse().equals(dataverse.getDataverseName())) {
+                                            feedActivitiesMarkedForRemoval.add(fid);
+                                        }
+                                    }
+                                    for (FeedId fid : feedActivitiesMarkedForRemoval) {
+                                        feedActivity.remove(fid);
+                                    }
+
+                                    return dataverses.remove(dataverse.getDataverseName());
                                 }
                             }
-                            for (FunctionSignature signature : markedFunctionsForRemoval) {
-                                functions.remove(signature);
-                            }
-                            return dataverses.remove(dataverse.getDataverseName());
                         }
                     }
                 }
@@ -220,19 +241,19 @@
             }
         }
     }
-    
+
     public Object dropIndex(Index index) {
         synchronized (indexes) {
             Map<String, Map<String, Index>> datasetMap = indexes.get(index.getDataverseName());
             if (datasetMap == null) {
                 return null;
             }
-            
+
             Map<String, Index> indexMap = datasetMap.get(index.getDatasetName());
             if (indexMap == null) {
                 return null;
             }
-            
+
             return indexMap.remove(index.getIndexName());
         }
     }
@@ -268,7 +289,7 @@
             return m.get(datasetName);
         }
     }
-    
+
     public Index getIndex(String dataverseName, String datasetName, String indexName) {
         synchronized (indexes) {
             Map<String, Map<String, Index>> datasetMap = indexes.get(dataverseName);
@@ -401,4 +422,21 @@
             return null;
         }
     }
+
+    public Object addFeedActivityIfNotExists(FeedActivity fa) {
+        synchronized (feedActivity) {
+            FeedId fid = new FeedId(fa.getDataverseName(), fa.getDatasetName());
+            if (!feedActivity.containsKey(fid)) {
+                feedActivity.put(fid, fa);
+            }
+        }
+        return null;
+    }
+
+    public Object dropFeedActivity(FeedActivity fa) {
+        synchronized (feedActivity) {
+            FeedId fid = new FeedId(fa.getDataverseName(), fa.getDatasetName());
+            return feedActivity.remove(fid);
+        }
+    }
 }
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java
index 8d5d13b..ee81b63 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java
@@ -124,7 +124,7 @@
                 MetadataPrimaryIndexes.DATASET_DATASET, MetadataPrimaryIndexes.DATATYPE_DATASET,
                 MetadataPrimaryIndexes.INDEX_DATASET, MetadataPrimaryIndexes.NODE_DATASET,
                 MetadataPrimaryIndexes.NODEGROUP_DATASET, MetadataPrimaryIndexes.FUNCTION_DATASET,
-                MetadataPrimaryIndexes.DATASOURCE_ADAPTER_DATASET };
+                MetadataPrimaryIndexes.DATASOURCE_ADAPTER_DATASET, MetadataPrimaryIndexes.FEED_ACTIVITY_DATASET };
         secondaryIndexes = new IMetadataIndex[] { MetadataSecondaryIndexes.GROUPNAME_ON_DATASET_INDEX,
                 MetadataSecondaryIndexes.DATATYPENAME_ON_DATASET_INDEX,
                 MetadataSecondaryIndexes.DATATYPENAME_ON_DATATYPE_INDEX };
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataPrimaryIndexes.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataPrimaryIndexes.java
index 2fc3e59..4d2faf3 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataPrimaryIndexes.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataPrimaryIndexes.java
@@ -32,6 +32,8 @@
     public static IMetadataIndex NODE_DATASET;
     public static IMetadataIndex NODEGROUP_DATASET;
     public static IMetadataIndex FUNCTION_DATASET;
+    public static IMetadataIndex DATASOURCE_ADAPTER_DATASET;
+    public static IMetadataIndex FEED_ACTIVITY_DATASET;
 
     public static final int METADATA_DATASET_ID = 0;
     public static final int DATAVERSE_DATASET_ID = 1;
@@ -42,10 +44,9 @@
     public static final int NODEGROUP_DATASET_ID = 6;
     public static final int FUNCTION_DATASET_ID = 7;
     public static final int DATASOURCE_ADAPTER_DATASET_ID = 8;
+    public static final int FEED_ACTIVITY_DATASET_ID = 9;
     public static final int FIRST_AVAILABLE_USER_DATASET_ID = 100;
 
-    public static IMetadataIndex DATASOURCE_ADAPTER_DATASET;
-
     /**
      * Create all metadata primary index descriptors. MetadataRecordTypes must
      * have been initialized before calling this init.
@@ -93,5 +94,9 @@
                 BuiltinType.ASTRING, BuiltinType.ASTRING }, new String[] { "DataverseName", "Name" }, 0,
                 MetadataRecordTypes.DATASOURCE_ADAPTER_RECORDTYPE, DATASOURCE_ADAPTER_DATASET_ID, true, new int[] { 0,
                         1 });
+
+        FEED_ACTIVITY_DATASET = new MetadataIndex("FeedActivity", null, 3, new IAType[] { BuiltinType.ASTRING,
+                BuiltinType.ASTRING }, new String[] { "DataverseName", "DatasetName" }, 0,
+                MetadataRecordTypes.FEED_ACTIVITY_RECORDTYPE, FEED_ACTIVITY_DATASET_ID, true, new int[] { 0, 1 });
     }
 }
\ No newline at end of file
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataRecordTypes.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataRecordTypes.java
index 50681ee..5b67102 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataRecordTypes.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataRecordTypes.java
@@ -47,6 +47,7 @@
     public static ARecordType NODEGROUP_RECORDTYPE;
     public static ARecordType FUNCTION_RECORDTYPE;
     public static ARecordType DATASOURCE_ADAPTER_RECORDTYPE;
+    public static ARecordType FEED_ACTIVITY_RECORDTYPE;
 
     /**
      * Create all metadata record types.
@@ -76,6 +77,7 @@
             NODEGROUP_RECORDTYPE = createNodeGroupRecordType();
             FUNCTION_RECORDTYPE = createFunctionRecordType();
             DATASOURCE_ADAPTER_RECORDTYPE = createDatasourceAdapterRecordType();
+            FEED_ACTIVITY_RECORDTYPE = createFeedActivityRecordType();
         } catch (AsterixException e) {
             throw new MetadataException(e);
         }
@@ -143,13 +145,17 @@
     public static final int FEED_DETAILS_ARECORD_PROPERTIES_FIELD_INDEX = 6;
     public static final int FEED_DETAILS_ARECORD_FUNCTION_FIELD_INDEX = 7;
     public static final int FEED_DETAILS_ARECORD_STATE_FIELD_INDEX = 8;
+    public static final int FEED_DETAILS_ARECORD_INGEST_NODES_FIELD_INDEX = 9;
+    public static final int FEED_DETAILS_ARECORD_COMPUTE_NODES_FIELD_INDEX = 10;
 
     private static final ARecordType createFeedDetailsRecordType() throws AsterixException {
         AOrderedListType orderedListType = new AOrderedListType(BuiltinType.ASTRING, null);
         AOrderedListType orderedListOfPropertiesType = new AOrderedListType(DATASOURCE_ADAPTER_PROPERTIES_RECORDTYPE,
                 null);
+        AUnorderedListType unorderedListType = new AUnorderedListType(BuiltinType.ASTRING, null);
+
         String[] fieldNames = { "FileStructure", "PartitioningStrategy", "PartitioningKey", "PrimaryKey", "GroupName",
-                "DatasourceAdapter", "Properties", "Function", "Status" };
+                "DatasourceAdapter", "Properties", "Function", "Status", "IngestNodes", "ComputeNodes" };
 
         List<IAType> feedFunctionUnionList = new ArrayList<IAType>();
         feedFunctionUnionList.add(BuiltinType.ANULL);
@@ -158,7 +164,7 @@
 
         IAType[] fieldTypes = { BuiltinType.ASTRING, BuiltinType.ASTRING, orderedListType, orderedListType,
                 BuiltinType.ASTRING, BuiltinType.ASTRING, orderedListOfPropertiesType, feedFunctionUnion,
-                BuiltinType.ASTRING };
+                BuiltinType.ASTRING, unorderedListType, unorderedListType };
 
         return new ARecordType(null, fieldNames, fieldTypes, true);
     }
@@ -357,4 +363,22 @@
         return new ARecordType("DatasourceAdapterRecordType", fieldNames, fieldTypes, true);
     }
 
+    // Helper constants for accessing fields in an ARecord of type
+    // FeedActivityRecordType.
+    public static final int FEED_ACTIVITY_ARECORD_DATAVERSE_NAME_FIELD_INDEX = 0;
+    public static final int FEED_ACTIVITY_ARECORD_DATASET_NAME_FIELD_INDEX = 1;
+    public static final int FEED_ACTIVITY_ARECORD_STATUS_FIELD_INDEX = 2;
+    public static final int FEED_ACTIVITY_ARECORD_INGEST_NODES_FIELD_INDEX = 3;
+    public static final int FEED_ACTIVITY_ARECORD_COMPUTE_NODES_FIELD_INDEX = 4;
+    public static final int FEED_ACTIVITY_ARECORD_LAST_UPDATE_TIMESTAMP_FIELD_INDEX = 5;
+
+    private static ARecordType createFeedActivityRecordType() throws AsterixException {
+        AUnorderedListType unorderedListType = new AUnorderedListType(BuiltinType.ASTRING, null);
+        String[] fieldNames = { "DataverseName", "DatasetName", "Status", "IngestNodes", "ComputeNodes",
+                "UpdateTimestamp" };
+        IAType[] fieldTypes = { BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING, unorderedListType,
+                unorderedListType, BuiltinType.ASTRING };
+        return new ARecordType("FeedActivityRecordType", fieldNames, fieldTypes, true);
+    }
+
 }
\ No newline at end of file
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/FeedJobEventListenerFactory.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/FeedJobEventListenerFactory.java
new file mode 100644
index 0000000..3e7510f
--- /dev/null
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/FeedJobEventListenerFactory.java
@@ -0,0 +1,110 @@
+package edu.uci.ics.asterix.metadata.declared;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import edu.uci.ics.asterix.external.data.operator.FeedIntakeOperatorDescriptor;
+import edu.uci.ics.asterix.metadata.MetadataManager;
+import edu.uci.ics.asterix.metadata.MetadataTransactionContext;
+import edu.uci.ics.asterix.metadata.entities.Dataset;
+import edu.uci.ics.asterix.metadata.entities.FeedDatasetDetails;
+import edu.uci.ics.asterix.metadata.entities.FeedDatasetDetails.FeedState;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.meta.AlgebricksMetaOperatorDescriptor;
+import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
+import edu.uci.ics.hyracks.api.context.IHyracksJobletContext;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
+import edu.uci.ics.hyracks.api.job.IJobletEventListener;
+import edu.uci.ics.hyracks.api.job.IJobletEventListenerFactory;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.api.job.JobInfo;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.api.job.JobStatus;
+
+public class FeedJobEventListenerFactory implements IJobletEventListenerFactory {
+
+    /**
+	 * 
+	 */
+    private static final long serialVersionUID = 1L;
+
+    private final Dataset dataset;
+    private final JobSpecification jobSpec;
+    private final IHyracksClientConnection hcc;
+    private IJobletEventListener jobEventListener;
+
+    public FeedJobEventListenerFactory(Dataset dataset, JobSpecification jobSpec, IHyracksClientConnection hcc) {
+        this.dataset = dataset;
+        this.jobSpec = jobSpec;
+        this.hcc = hcc;
+    }
+
+    @Override
+    public IJobletEventListener createListener(IHyracksJobletContext ctx) {
+
+        jobEventListener = new IJobletEventListener() {
+
+            private final List<String> ingestLocations = new ArrayList<String>();
+            private final List<String> computeLocations = new ArrayList<String>();
+
+            @Override
+            public void jobletStart(JobId jobId) {
+                List<OperatorDescriptorId> ingestOperatorIds = new ArrayList<OperatorDescriptorId>();
+                List<OperatorDescriptorId> computeOperatorIds = new ArrayList<OperatorDescriptorId>();
+
+                Map<OperatorDescriptorId, IOperatorDescriptor> operators = jobSpec.getOperatorMap();
+                for (Entry<OperatorDescriptorId, IOperatorDescriptor> entry : operators.entrySet()) {
+                    if (entry.getValue() instanceof FeedIntakeOperatorDescriptor) {
+                        ingestOperatorIds.add(entry.getKey());
+                    } else if (entry.getValue() instanceof AlgebricksMetaOperatorDescriptor) {
+                        computeOperatorIds.add(entry.getKey());
+                    }
+                }
+
+                try {
+                    JobInfo info = hcc.getJobInfo(jobId);
+
+                    for (OperatorDescriptorId ingestOpId : ingestOperatorIds) {
+                        ingestLocations.addAll(info.getOperatorLocations().get(ingestOpId));
+                    }
+                    for (OperatorDescriptorId computeOpId : computeOperatorIds) {
+                        computeLocations.addAll(info.getOperatorLocations().get(computeOpId));
+                    }
+                    System.out.println("job info " + info);
+
+                } catch (Exception e) {
+                    // TODO Add Exception handling here
+                }
+            }
+
+            @Override
+            public void jobletFinish(JobStatus status) {
+                MetadataManager.INSTANCE.acquireWriteLatch();
+                MetadataTransactionContext mdTxnCtx = null;
+                try {
+                    mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+                    FeedDatasetDetails feedDetails = (FeedDatasetDetails) dataset.getDatasetDetails();
+                    feedDetails.setFeedState(FeedState.INACTIVE);
+                    feedDetails.setComputeNodes(null);
+                    feedDetails.setIngestNodes(null);
+                    MetadataManager.INSTANCE
+                            .dropDataset(mdTxnCtx, dataset.getDataverseName(), dataset.getDatasetName());
+                    MetadataManager.INSTANCE.addDataset(mdTxnCtx, dataset);
+                } catch (Exception e) {
+
+                }
+            }
+
+            public List<String> getIngestLocations() {
+                return ingestLocations;
+            }
+
+            public List<String> getComputeLocations() {
+                return computeLocations;
+            }
+        };
+        return jobEventListener;
+    }
+}
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/FeedActivity.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/FeedActivity.java
new file mode 100644
index 0000000..97f7a1e
--- /dev/null
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/FeedActivity.java
@@ -0,0 +1,117 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.asterix.metadata.entities;
+
+import java.util.List;
+
+import edu.uci.ics.asterix.metadata.IDatasetDetails;
+import edu.uci.ics.asterix.metadata.MetadataCache;
+import edu.uci.ics.asterix.metadata.api.IMetadataEntity;
+import edu.uci.ics.asterix.metadata.entities.FeedDatasetDetails.FeedState;
+
+/**
+ * Metadata describing a feed activity record.
+ */
+public class FeedActivity implements IMetadataEntity {
+
+    private static final long serialVersionUID = 1L;
+
+    private final String dataverseName;
+    // Enforced to be unique within a dataverse.
+    private final String datasetName;
+
+    private FeedState feedState;
+    private List<String> ingestNodes;
+    private List<String> computeNodes;
+    private String lastUpdatedTimestamp;
+
+    public FeedActivity(String dataverseName, String datasetName, String feedState, List<String> ingestNodes,
+            List<String> computeNodes) {
+        this.dataverseName = dataverseName;
+        this.datasetName = datasetName;
+        this.feedState = FeedState.valueOf(feedState);
+        this.ingestNodes = ingestNodes;
+        this.computeNodes = computeNodes;
+    }
+
+    public String getDataverseName() {
+        return dataverseName;
+    }
+
+    public String getDatasetName() {
+        return datasetName;
+    }
+
+    @Override
+    public Object addToCache(MetadataCache cache) {
+        return cache.addFeedActivityIfNotExists(this);
+    }
+
+    @Override
+    public Object dropFromCache(MetadataCache cache) {
+        return cache.dropFeedActivity(this);
+    }
+
+    @Override
+    public boolean equals(Object other) {
+        if (this == other) {
+            return true;
+        }
+        if (!(other instanceof FeedActivity)) {
+            return false;
+        }
+        FeedActivity otherDataset = (FeedActivity) other;
+        if (!otherDataset.dataverseName.equals(dataverseName)) {
+            return false;
+        }
+        if (!otherDataset.datasetName.equals(datasetName)) {
+            return false;
+        }
+        return true;
+    }
+
+    public FeedState getFeedState() {
+        return feedState;
+    }
+
+    public void setFeedState(FeedState feedState) {
+        this.feedState = feedState;
+    }
+
+    public List<String> getIngestNodes() {
+        return ingestNodes;
+    }
+
+    public void setIngestNodes(List<String> ingestNodes) {
+        this.ingestNodes = ingestNodes;
+    }
+
+    public List<String> getComputeNodes() {
+        return computeNodes;
+    }
+
+    public void setComputeNodes(List<String> computeNodes) {
+        this.computeNodes = computeNodes;
+    }
+
+    public String getLastUpdatedTimestamp() {
+        return lastUpdatedTimestamp;
+    }
+
+    public void setLastUpdatedTimestamp(String lastUpdatedTimestamp) {
+        this.lastUpdatedTimestamp = lastUpdatedTimestamp;
+    }
+}
\ No newline at end of file
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/FeedDatasetDetails.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/FeedDatasetDetails.java
index 22de3d3..81be3b5 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/FeedDatasetDetails.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/FeedDatasetDetails.java
@@ -23,6 +23,7 @@
 import edu.uci.ics.asterix.builders.IARecordBuilder;
 import edu.uci.ics.asterix.builders.OrderedListBuilder;
 import edu.uci.ics.asterix.builders.RecordBuilder;
+import edu.uci.ics.asterix.builders.UnorderedListBuilder;
 import edu.uci.ics.asterix.common.config.DatasetConfig.DatasetType;
 import edu.uci.ics.asterix.common.exceptions.AsterixException;
 import edu.uci.ics.asterix.common.functions.FunctionSignature;
@@ -31,6 +32,7 @@
 import edu.uci.ics.asterix.om.base.AMutableString;
 import edu.uci.ics.asterix.om.base.AString;
 import edu.uci.ics.asterix.om.types.AOrderedListType;
+import edu.uci.ics.asterix.om.types.AUnorderedListType;
 import edu.uci.ics.asterix.om.types.BuiltinType;
 import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
@@ -48,25 +50,31 @@
     private final Map<String, String> properties;
     private final FunctionSignature signature;
     private FeedState feedState;
+    private List<String> ingestNodes;
+    private List<String> computeNodes;
 
     public enum FeedState {
-        INACTIVE,
         // INACTIVE state signifies that the feed dataset is not
         // connected with the external world through the feed
         // adapter.
-        ACTIVE
+        INACTIVE,
+
         // ACTIVE state signifies that the feed dataset is connected to the
         // external world using an adapter that may put data into the dataset.
+        ACTIVE
     }
 
     public FeedDatasetDetails(FileStructure fileStructure, PartitioningStrategy partitioningStrategy,
             List<String> partitioningKey, List<String> primaryKey, String groupName, String adapterFactory,
-            Map<String, String> properties, FunctionSignature signature, String feedState) {
+            Map<String, String> properties, FunctionSignature signature, String feedState, List<String> ingestNodes,
+            List<String> computeNodes) {
         super(fileStructure, partitioningStrategy, partitioningKey, primaryKey, groupName);
         this.properties = properties;
         this.adapterFactory = adapterFactory;
         this.signature = signature;
         this.feedState = feedState.equals(FeedState.ACTIVE.toString()) ? FeedState.ACTIVE : FeedState.INACTIVE;
+        this.ingestNodes = ingestNodes;
+        this.computeNodes = computeNodes;
     }
 
     @Override
@@ -160,6 +168,38 @@
         stringSerde.serialize(aString, fieldValue.getDataOutput());
         feedRecordBuilder.addField(MetadataRecordTypes.FEED_DETAILS_ARECORD_STATE_FIELD_INDEX, fieldValue);
 
+        // write field 9
+        UnorderedListBuilder unorderedlistBuilder = new UnorderedListBuilder();
+        unorderedlistBuilder
+                .reset((AUnorderedListType) MetadataRecordTypes.FEED_DETAILS_RECORDTYPE.getFieldTypes()[MetadataRecordTypes.FEED_DETAILS_ARECORD_INGEST_NODES_FIELD_INDEX]);
+        itemValue = new ArrayBackedValueStorage();
+        if (ingestNodes != null) {
+            for (String node : ingestNodes) {
+                itemValue.reset();
+                aString.setValue(node);
+                listBuilder.addItem(itemValue);
+            }
+        }
+        fieldValue.reset();
+        unorderedlistBuilder.write(fieldValue.getDataOutput(), true);
+        feedRecordBuilder.addField(MetadataRecordTypes.FEED_DETAILS_ARECORD_INGEST_NODES_FIELD_INDEX, fieldValue);
+
+        // write field 10
+        unorderedlistBuilder = new UnorderedListBuilder();
+        unorderedlistBuilder
+                .reset((AUnorderedListType) MetadataRecordTypes.FEED_DETAILS_RECORDTYPE.getFieldTypes()[MetadataRecordTypes.FEED_DETAILS_ARECORD_COMPUTE_NODES_FIELD_INDEX]);
+        itemValue = new ArrayBackedValueStorage();
+        if (computeNodes != null) {
+            for (String node : computeNodes) {
+                itemValue.reset();
+                aString.setValue(node);
+                listBuilder.addItem(itemValue);
+            }
+        }
+        fieldValue.reset();
+        unorderedlistBuilder.write(fieldValue.getDataOutput(), true);
+        feedRecordBuilder.addField(MetadataRecordTypes.FEED_DETAILS_ARECORD_COMPUTE_NODES_FIELD_INDEX, fieldValue);
+
         try {
             feedRecordBuilder.write(out, true);
         } catch (IOException | AsterixException e) {
@@ -215,4 +255,20 @@
         return signature;
     }
 
+    public List<String> getIngestNodes() {
+        return ingestNodes;
+    }
+
+    public void setIngestNodes(List<String> ingestNodes) {
+        this.ingestNodes = ingestNodes;
+    }
+
+    public List<String> getComputeNodes() {
+        return computeNodes;
+    }
+
+    public void setComputeNodes(List<String> computeNodes) {
+        this.computeNodes = computeNodes;
+    }
+
 }
\ No newline at end of file
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/DatasetTupleTranslator.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/DatasetTupleTranslator.java
index 61f856a..2a68ae9 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/DatasetTupleTranslator.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/DatasetTupleTranslator.java
@@ -169,8 +169,23 @@
                 String feedState = ((AString) datasetDetailsRecord
                         .getValueByPos(MetadataRecordTypes.FEED_DETAILS_ARECORD_STATE_FIELD_INDEX)).getStringValue();
 
+                cursor = ((AUnorderedList) datasetDetailsRecord
+                        .getValueByPos(MetadataRecordTypes.FEED_DETAILS_ARECORD_INGEST_NODES_FIELD_INDEX)).getCursor();
+                List<String> ingestNodes = new ArrayList<String>();
+                while (cursor.next()) {
+                    ingestNodes.add(((AString) cursor.get()).getStringValue());
+                }
+
+                cursor = ((AUnorderedList) datasetDetailsRecord
+                        .getValueByPos(MetadataRecordTypes.FEED_DETAILS_ARECORD_INGEST_NODES_FIELD_INDEX)).getCursor();
+                List<String> computeNodes = new ArrayList<String>();
+                while (cursor.next()) {
+                    computeNodes.add(((AString) cursor.get()).getStringValue());
+                }
+
                 datasetDetails = new FeedDatasetDetails(fileStructure, partitioningStrategy, partitioningKey,
-                        partitioningKey, groupName, adapter, properties, signature, feedState);
+                        partitioningKey, groupName, adapter, properties, signature, feedState, ingestNodes,
+                        computeNodes);
                 break;
             }
             case INTERNAL: {
@@ -221,10 +236,11 @@
                 }
                 datasetDetails = new ExternalDatasetDetails(adapter, properties);
         }
-        
+
         Map<String, String> hints = getDatasetHints(datasetRecord);
-        
-        return new Dataset(dataverseName, datasetName, typeName, datasetDetails, hints, datasetType, datasetId, pendingOp);
+
+        return new Dataset(dataverseName, datasetName, typeName, datasetDetails, hints, datasetType, datasetId,
+                pendingOp);
     }
 
     @Override
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/FeedActivityTupleTranslator.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/FeedActivityTupleTranslator.java
new file mode 100644
index 0000000..a1acd38
--- /dev/null
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/FeedActivityTupleTranslator.java
@@ -0,0 +1,186 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.asterix.metadata.entitytupletranslators;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.List;
+
+import edu.uci.ics.asterix.builders.UnorderedListBuilder;
+import edu.uci.ics.asterix.common.exceptions.AsterixException;
+import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
+import edu.uci.ics.asterix.metadata.MetadataException;
+import edu.uci.ics.asterix.metadata.bootstrap.MetadataPrimaryIndexes;
+import edu.uci.ics.asterix.metadata.bootstrap.MetadataRecordTypes;
+import edu.uci.ics.asterix.metadata.entities.FeedActivity;
+import edu.uci.ics.asterix.om.base.AInt32;
+import edu.uci.ics.asterix.om.base.AMutableInt32;
+import edu.uci.ics.asterix.om.base.ARecord;
+import edu.uci.ics.asterix.om.base.AString;
+import edu.uci.ics.asterix.om.base.AUnorderedList;
+import edu.uci.ics.asterix.om.base.IACursor;
+import edu.uci.ics.asterix.om.types.AUnorderedListType;
+import edu.uci.ics.asterix.om.types.BuiltinType;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+
+/**
+ * Translates a Dataset metadata entity to an ITupleReference and vice versa.
+ */
+public class FeedActivityTupleTranslator extends AbstractTupleTranslator<FeedActivity> {
+    // Field indexes of serialized Dataset in a tuple.
+    // First key field.
+    public static final int FEED_ACTIVITY_DATAVERSENAME_TUPLE_FIELD_INDEX = 0;
+    // Second key field.
+    public static final int FEED_ACTIVITY_DATASET_DATASETNAME_TUPLE_FIELD_INDEX = 1;
+    // Payload field containing serialized Dataset.
+    public static final int FEED_ACTIVITY_PAYLOAD_TUPLE_FIELD_INDEX = 2;
+
+    @SuppressWarnings("unchecked")
+    private ISerializerDeserializer<ARecord> recordSerDes = AqlSerializerDeserializerProvider.INSTANCE
+            .getSerializerDeserializer(MetadataRecordTypes.FEED_ACTIVITY_RECORDTYPE);
+    private AMutableInt32 aInt32;
+    protected ISerializerDeserializer<AInt32> aInt32Serde;
+
+    @SuppressWarnings("unchecked")
+    public FeedActivityTupleTranslator(boolean getTuple) {
+        super(getTuple, MetadataPrimaryIndexes.DATASET_DATASET.getFieldCount());
+        aInt32 = new AMutableInt32(-1);
+        aInt32Serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT32);
+    }
+
+    @Override
+    public FeedActivity getMetadataEntytiFromTuple(ITupleReference frameTuple) throws IOException {
+        byte[] serRecord = frameTuple.getFieldData(FEED_ACTIVITY_PAYLOAD_TUPLE_FIELD_INDEX);
+        int recordStartOffset = frameTuple.getFieldStart(FEED_ACTIVITY_PAYLOAD_TUPLE_FIELD_INDEX);
+        int recordLength = frameTuple.getFieldLength(FEED_ACTIVITY_PAYLOAD_TUPLE_FIELD_INDEX);
+        ByteArrayInputStream stream = new ByteArrayInputStream(serRecord, recordStartOffset, recordLength);
+        DataInput in = new DataInputStream(stream);
+        ARecord feedActivityRecord = (ARecord) recordSerDes.deserialize(in);
+        return createFeedActivityFromARecord(feedActivityRecord);
+    }
+
+    private FeedActivity createFeedActivityFromARecord(ARecord feedActivityRecord) {
+
+        String dataverseName = ((AString) feedActivityRecord
+                .getValueByPos(MetadataRecordTypes.FEED_ACTIVITY_ARECORD_DATAVERSE_NAME_FIELD_INDEX)).getStringValue();
+        String datasetName = ((AString) feedActivityRecord
+                .getValueByPos(MetadataRecordTypes.FEED_ACTIVITY_ARECORD_DATASET_NAME_FIELD_INDEX)).getStringValue();
+        String status = ((AString) feedActivityRecord
+                .getValueByPos(MetadataRecordTypes.FEED_ACTIVITY_ARECORD_STATUS_FIELD_INDEX)).getStringValue();
+
+        List<String> ingestNodes = new ArrayList<String>();
+        IACursor cursor = ((AUnorderedList) feedActivityRecord
+                .getValueByPos(MetadataRecordTypes.FEED_ACTIVITY_ARECORD_INGEST_NODES_FIELD_INDEX)).getCursor();
+        while (cursor.next()) {
+            ingestNodes.add(((AString) cursor.get()).getStringValue());
+        }
+
+        List<String> computeNodes = new ArrayList<String>();
+        cursor = ((AUnorderedList) feedActivityRecord
+                .getValueByPos(MetadataRecordTypes.FEED_ACTIVITY_ARECORD_COMPUTE_NODES_FIELD_INDEX)).getCursor();
+        while (cursor.next()) {
+            computeNodes.add(((AString) cursor.get()).getStringValue());
+        }
+
+        return new FeedActivity(dataverseName, datasetName, status, ingestNodes, computeNodes);
+    }
+
+    @Override
+    public ITupleReference getTupleFromMetadataEntity(FeedActivity feedActivity) throws IOException, MetadataException {
+        // write the key in the first 2 fields of the tuple
+        ArrayBackedValueStorage itemValue = new ArrayBackedValueStorage();
+        tupleBuilder.reset();
+        aString.setValue(feedActivity.getDataverseName());
+        stringSerde.serialize(aString, tupleBuilder.getDataOutput());
+        tupleBuilder.addFieldEndOffset();
+        aString.setValue(feedActivity.getDatasetName());
+        stringSerde.serialize(aString, tupleBuilder.getDataOutput());
+        tupleBuilder.addFieldEndOffset();
+
+        // write the pay-load in the third field of the tuple
+
+        recordBuilder.reset(MetadataRecordTypes.FEED_ACTIVITY_RECORDTYPE);
+
+        // write field 0
+        fieldValue.reset();
+        aString.setValue(feedActivity.getDataverseName());
+        stringSerde.serialize(aString, fieldValue.getDataOutput());
+        recordBuilder.addField(MetadataRecordTypes.FEED_ACTIVITY_ARECORD_DATAVERSE_NAME_FIELD_INDEX, fieldValue);
+
+        // write field 1
+        fieldValue.reset();
+        aString.setValue(feedActivity.getDatasetName());
+        stringSerde.serialize(aString, fieldValue.getDataOutput());
+        recordBuilder.addField(MetadataRecordTypes.FEED_ACTIVITY_ARECORD_DATASET_NAME_FIELD_INDEX, fieldValue);
+
+        // write field 2
+        fieldValue.reset();
+        aString.setValue(feedActivity.getFeedState().name());
+        stringSerde.serialize(aString, fieldValue.getDataOutput());
+        recordBuilder.addField(MetadataRecordTypes.FEED_ACTIVITY_ARECORD_STATUS_FIELD_INDEX, fieldValue);
+
+        // write field 3
+        UnorderedListBuilder listBuilder = new UnorderedListBuilder();
+        listBuilder
+                .reset((AUnorderedListType) MetadataRecordTypes.FEED_ACTIVITY_RECORDTYPE.getFieldTypes()[MetadataRecordTypes.FEED_ACTIVITY_ARECORD_INGEST_NODES_FIELD_INDEX]);
+        for (String field : feedActivity.getIngestNodes()) {
+            itemValue.reset();
+            aString.setValue(field);
+            stringSerde.serialize(aString, itemValue.getDataOutput());
+            listBuilder.addItem(itemValue);
+        }
+        fieldValue.reset();
+        listBuilder.write(fieldValue.getDataOutput(), true);
+        recordBuilder.addField(MetadataRecordTypes.FEED_ACTIVITY_ARECORD_INGEST_NODES_FIELD_INDEX, fieldValue);
+
+        // write field 4
+        listBuilder
+                .reset((AUnorderedListType) MetadataRecordTypes.FEED_ACTIVITY_RECORDTYPE.getFieldTypes()[MetadataRecordTypes.FEED_ACTIVITY_ARECORD_COMPUTE_NODES_FIELD_INDEX]);
+        for (String field : feedActivity.getIngestNodes()) {
+            itemValue.reset();
+            aString.setValue(field);
+            stringSerde.serialize(aString, itemValue.getDataOutput());
+            listBuilder.addItem(itemValue);
+        }
+        fieldValue.reset();
+        listBuilder.write(fieldValue.getDataOutput(), true);
+        recordBuilder.addField(MetadataRecordTypes.FEED_ACTIVITY_ARECORD_COMPUTE_NODES_FIELD_INDEX, fieldValue);
+
+        // write field 5
+        fieldValue.reset();
+        aString.setValue(Calendar.getInstance().getTime().toString());
+        stringSerde.serialize(aString, fieldValue.getDataOutput());
+        recordBuilder.addField(MetadataRecordTypes.FEED_ACTIVITY_ARECORD_LAST_UPDATE_TIMESTAMP_FIELD_INDEX, fieldValue);
+
+        // write record
+        try {
+            recordBuilder.write(tupleBuilder.getDataOutput(), true);
+        } catch (AsterixException e) {
+            throw new MetadataException(e);
+        }
+        tupleBuilder.addFieldEndOffset();
+
+        tuple.reset(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray());
+        return tuple;
+    }
+
+}
\ No newline at end of file