added job event listener
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java b/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
index 66a3980..f00deba 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
@@ -74,6 +74,7 @@
import edu.uci.ics.asterix.metadata.MetadataTransactionContext;
import edu.uci.ics.asterix.metadata.api.IMetadataEntity;
import edu.uci.ics.asterix.metadata.declared.AqlMetadataProvider;
+import edu.uci.ics.asterix.metadata.declared.FeedJobEventListenerFactory;
import edu.uci.ics.asterix.metadata.entities.Dataset;
import edu.uci.ics.asterix.metadata.entities.Datatype;
import edu.uci.ics.asterix.metadata.entities.Dataverse;
@@ -445,9 +446,11 @@
Map<String, String> configuration = ((FeedDetailsDecl) dd.getDatasetDetailsDecl())
.getConfiguration();
FunctionSignature signature = ((FeedDetailsDecl) dd.getDatasetDetailsDecl()).getFunctionSignature();
+
datasetDetails = new FeedDatasetDetails(InternalDatasetDetails.FileStructure.BTREE,
InternalDatasetDetails.PartitioningStrategy.HASH, partitioningExprs, partitioningExprs,
- ngName, adapter, configuration, signature, FeedDatasetDetails.FeedState.INACTIVE.toString());
+ ngName, adapter, configuration, signature,
+ FeedDatasetDetails.FeedState.INACTIVE.toString(), null, null);
break;
}
}
@@ -1335,8 +1338,7 @@
MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
boolean bActiveTxn = true;
metadataProvider.setMetadataTxnContext(mdTxnCtx);
- acquireReadLatch();
-
+ acquireWriteLatch();
try {
BeginFeedStatement bfs = (BeginFeedStatement) stmt;
String dataverseName = bfs.getDataverseName() == null ? activeDefaultDataverse == null ? null
@@ -1359,14 +1361,14 @@
bfs.initialize(metadataProvider.getMetadataTxnContext(), dataset);
cbfs.setQuery(bfs.getQuery());
JobSpecification compiled = rewriteCompileQuery(metadataProvider, bfs.getQuery(), cbfs);
+ compiled.setJobletEventListenerFactory(new FeedJobEventListenerFactory(dataset, compiled, hcc));
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
bActiveTxn = false;
-
+ JobId jobId = null;
if (compiled != null) {
- runJob(hcc, compiled, true);
+ jobId = runJob(hcc, compiled, false);
}
-
} catch (Exception e) {
if (bActiveTxn) {
abort(e, e, mdTxnCtx);
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataCache.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataCache.java
index 0382315..a4be0f8 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataCache.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataCache.java
@@ -21,11 +21,13 @@
import java.util.Map;
import edu.uci.ics.asterix.common.functions.FunctionSignature;
+import edu.uci.ics.asterix.external.feed.lifecycle.FeedId;
import edu.uci.ics.asterix.metadata.api.IMetadataEntity;
-import edu.uci.ics.asterix.metadata.entities.DatasourceAdapter;
import edu.uci.ics.asterix.metadata.entities.Dataset;
+import edu.uci.ics.asterix.metadata.entities.DatasourceAdapter;
import edu.uci.ics.asterix.metadata.entities.Datatype;
import edu.uci.ics.asterix.metadata.entities.Dataverse;
+import edu.uci.ics.asterix.metadata.entities.FeedActivity;
import edu.uci.ics.asterix.metadata.entities.Function;
import edu.uci.ics.asterix.metadata.entities.Index;
import edu.uci.ics.asterix.metadata.entities.NodeGroup;
@@ -51,6 +53,8 @@
protected final Map<FunctionSignature, Function> functions = new HashMap<FunctionSignature, Function>();
// Key is adapter dataverse. Key of value map is the adapter name
protected final Map<String, Map<String, DatasourceAdapter>> adapters = new HashMap<String, Map<String, DatasourceAdapter>>();
+ // Key is FeedId
+ protected final Map<FeedId, FeedActivity> feedActivity = new HashMap<FeedId, FeedActivity>();
// Atomically executes all metadata operations in ctx's log.
public void commit(MetadataTransactionContext ctx) {
@@ -86,13 +90,16 @@
synchronized (datatypes) {
synchronized (functions) {
synchronized (adapters) {
- dataverses.clear();
- nodeGroups.clear();
- datasets.clear();
- indexes.clear();
- datatypes.clear();
- functions.clear();
- adapters.clear();
+ synchronized (feedActivity) {
+ dataverses.clear();
+ nodeGroups.clear();
+ datasets.clear();
+ indexes.clear();
+ datatypes.clear();
+ functions.clear();
+ adapters.clear();
+ feedActivity.clear();
+ }
}
}
}
@@ -180,20 +187,34 @@
synchronized (indexes) {
synchronized (datatypes) {
synchronized (functions) {
- datasets.remove(dataverse.getDataverseName());
- indexes.remove(dataverse.getDataverseName());
- datatypes.remove(dataverse.getDataverseName());
- adapters.remove(dataverse.getDataverseName());
- List<FunctionSignature> markedFunctionsForRemoval = new ArrayList<FunctionSignature>();
- for (FunctionSignature signature : functions.keySet()) {
- if (signature.getNamespace().equals(dataverse.getDataverseName())) {
- markedFunctionsForRemoval.add(signature);
+ synchronized (adapters) {
+ synchronized (feedActivity) {
+ datasets.remove(dataverse.getDataverseName());
+ indexes.remove(dataverse.getDataverseName());
+ datatypes.remove(dataverse.getDataverseName());
+ adapters.remove(dataverse.getDataverseName());
+ List<FunctionSignature> markedFunctionsForRemoval = new ArrayList<FunctionSignature>();
+ for (FunctionSignature signature : functions.keySet()) {
+ if (signature.getNamespace().equals(dataverse.getDataverseName())) {
+ markedFunctionsForRemoval.add(signature);
+ }
+ }
+ for (FunctionSignature signature : markedFunctionsForRemoval) {
+ functions.remove(signature);
+ }
+ List<FeedId> feedActivitiesMarkedForRemoval = new ArrayList<FeedId>();
+ for (FeedId fid : feedActivity.keySet()) {
+ if (fid.getDataverse().equals(dataverse.getDataverseName())) {
+ feedActivitiesMarkedForRemoval.add(fid);
+ }
+ }
+ for (FeedId fid : feedActivitiesMarkedForRemoval) {
+ feedActivity.remove(fid);
+ }
+
+ return dataverses.remove(dataverse.getDataverseName());
}
}
- for (FunctionSignature signature : markedFunctionsForRemoval) {
- functions.remove(signature);
- }
- return dataverses.remove(dataverse.getDataverseName());
}
}
}
@@ -220,19 +241,19 @@
}
}
}
-
+
public Object dropIndex(Index index) {
synchronized (indexes) {
Map<String, Map<String, Index>> datasetMap = indexes.get(index.getDataverseName());
if (datasetMap == null) {
return null;
}
-
+
Map<String, Index> indexMap = datasetMap.get(index.getDatasetName());
if (indexMap == null) {
return null;
}
-
+
return indexMap.remove(index.getIndexName());
}
}
@@ -268,7 +289,7 @@
return m.get(datasetName);
}
}
-
+
public Index getIndex(String dataverseName, String datasetName, String indexName) {
synchronized (indexes) {
Map<String, Map<String, Index>> datasetMap = indexes.get(dataverseName);
@@ -401,4 +422,21 @@
return null;
}
}
+
+ public Object addFeedActivityIfNotExists(FeedActivity fa) {
+ synchronized (feedActivity) {
+ FeedId fid = new FeedId(fa.getDataverseName(), fa.getDatasetName());
+ if (!feedActivity.containsKey(fid)) {
+ feedActivity.put(fid, fa);
+ }
+ }
+ return null;
+ }
+
+ public Object dropFeedActivity(FeedActivity fa) {
+ synchronized (feedActivity) {
+ FeedId fid = new FeedId(fa.getDataverseName(), fa.getDatasetName());
+ return feedActivity.remove(fid);
+ }
+ }
}
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java
index 8d5d13b..ee81b63 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java
@@ -124,7 +124,7 @@
MetadataPrimaryIndexes.DATASET_DATASET, MetadataPrimaryIndexes.DATATYPE_DATASET,
MetadataPrimaryIndexes.INDEX_DATASET, MetadataPrimaryIndexes.NODE_DATASET,
MetadataPrimaryIndexes.NODEGROUP_DATASET, MetadataPrimaryIndexes.FUNCTION_DATASET,
- MetadataPrimaryIndexes.DATASOURCE_ADAPTER_DATASET };
+ MetadataPrimaryIndexes.DATASOURCE_ADAPTER_DATASET, MetadataPrimaryIndexes.FEED_ACTIVITY_DATASET };
secondaryIndexes = new IMetadataIndex[] { MetadataSecondaryIndexes.GROUPNAME_ON_DATASET_INDEX,
MetadataSecondaryIndexes.DATATYPENAME_ON_DATASET_INDEX,
MetadataSecondaryIndexes.DATATYPENAME_ON_DATATYPE_INDEX };
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataPrimaryIndexes.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataPrimaryIndexes.java
index 2fc3e59..4d2faf3 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataPrimaryIndexes.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataPrimaryIndexes.java
@@ -32,6 +32,8 @@
public static IMetadataIndex NODE_DATASET;
public static IMetadataIndex NODEGROUP_DATASET;
public static IMetadataIndex FUNCTION_DATASET;
+ public static IMetadataIndex DATASOURCE_ADAPTER_DATASET;
+ public static IMetadataIndex FEED_ACTIVITY_DATASET;
public static final int METADATA_DATASET_ID = 0;
public static final int DATAVERSE_DATASET_ID = 1;
@@ -42,10 +44,9 @@
public static final int NODEGROUP_DATASET_ID = 6;
public static final int FUNCTION_DATASET_ID = 7;
public static final int DATASOURCE_ADAPTER_DATASET_ID = 8;
+ public static final int FEED_ACTIVITY_DATASET_ID = 9;
public static final int FIRST_AVAILABLE_USER_DATASET_ID = 100;
- public static IMetadataIndex DATASOURCE_ADAPTER_DATASET;
-
/**
* Create all metadata primary index descriptors. MetadataRecordTypes must
* have been initialized before calling this init.
@@ -93,5 +94,9 @@
BuiltinType.ASTRING, BuiltinType.ASTRING }, new String[] { "DataverseName", "Name" }, 0,
MetadataRecordTypes.DATASOURCE_ADAPTER_RECORDTYPE, DATASOURCE_ADAPTER_DATASET_ID, true, new int[] { 0,
1 });
+
+ FEED_ACTIVITY_DATASET = new MetadataIndex("FeedActivity", null, 3, new IAType[] { BuiltinType.ASTRING,
+ BuiltinType.ASTRING }, new String[] { "DataverseName", "DatasetName" }, 0,
+ MetadataRecordTypes.FEED_ACTIVITY_RECORDTYPE, FEED_ACTIVITY_DATASET_ID, true, new int[] { 0, 1 });
}
}
\ No newline at end of file
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataRecordTypes.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataRecordTypes.java
index 50681ee..5b67102 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataRecordTypes.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataRecordTypes.java
@@ -47,6 +47,7 @@
public static ARecordType NODEGROUP_RECORDTYPE;
public static ARecordType FUNCTION_RECORDTYPE;
public static ARecordType DATASOURCE_ADAPTER_RECORDTYPE;
+ public static ARecordType FEED_ACTIVITY_RECORDTYPE;
/**
* Create all metadata record types.
@@ -76,6 +77,7 @@
NODEGROUP_RECORDTYPE = createNodeGroupRecordType();
FUNCTION_RECORDTYPE = createFunctionRecordType();
DATASOURCE_ADAPTER_RECORDTYPE = createDatasourceAdapterRecordType();
+ FEED_ACTIVITY_RECORDTYPE = createFeedActivityRecordType();
} catch (AsterixException e) {
throw new MetadataException(e);
}
@@ -143,13 +145,17 @@
public static final int FEED_DETAILS_ARECORD_PROPERTIES_FIELD_INDEX = 6;
public static final int FEED_DETAILS_ARECORD_FUNCTION_FIELD_INDEX = 7;
public static final int FEED_DETAILS_ARECORD_STATE_FIELD_INDEX = 8;
+ public static final int FEED_DETAILS_ARECORD_INGEST_NODES_FIELD_INDEX = 9;
+ public static final int FEED_DETAILS_ARECORD_COMPUTE_NODES_FIELD_INDEX = 10;
private static final ARecordType createFeedDetailsRecordType() throws AsterixException {
AOrderedListType orderedListType = new AOrderedListType(BuiltinType.ASTRING, null);
AOrderedListType orderedListOfPropertiesType = new AOrderedListType(DATASOURCE_ADAPTER_PROPERTIES_RECORDTYPE,
null);
+ AUnorderedListType unorderedListType = new AUnorderedListType(BuiltinType.ASTRING, null);
+
String[] fieldNames = { "FileStructure", "PartitioningStrategy", "PartitioningKey", "PrimaryKey", "GroupName",
- "DatasourceAdapter", "Properties", "Function", "Status" };
+ "DatasourceAdapter", "Properties", "Function", "Status", "IngestNodes", "ComputeNodes" };
List<IAType> feedFunctionUnionList = new ArrayList<IAType>();
feedFunctionUnionList.add(BuiltinType.ANULL);
@@ -158,7 +164,7 @@
IAType[] fieldTypes = { BuiltinType.ASTRING, BuiltinType.ASTRING, orderedListType, orderedListType,
BuiltinType.ASTRING, BuiltinType.ASTRING, orderedListOfPropertiesType, feedFunctionUnion,
- BuiltinType.ASTRING };
+ BuiltinType.ASTRING, unorderedListType, unorderedListType };
return new ARecordType(null, fieldNames, fieldTypes, true);
}
@@ -357,4 +363,22 @@
return new ARecordType("DatasourceAdapterRecordType", fieldNames, fieldTypes, true);
}
+ // Helper constants for accessing fields in an ARecord of type
+ // FeedActivityRecordType.
+ public static final int FEED_ACTIVITY_ARECORD_DATAVERSE_NAME_FIELD_INDEX = 0;
+ public static final int FEED_ACTIVITY_ARECORD_DATASET_NAME_FIELD_INDEX = 1;
+ public static final int FEED_ACTIVITY_ARECORD_STATUS_FIELD_INDEX = 2;
+ public static final int FEED_ACTIVITY_ARECORD_INGEST_NODES_FIELD_INDEX = 3;
+ public static final int FEED_ACTIVITY_ARECORD_COMPUTE_NODES_FIELD_INDEX = 4;
+ public static final int FEED_ACTIVITY_ARECORD_LAST_UPDATE_TIMESTAMP_FIELD_INDEX = 5;
+
+ private static ARecordType createFeedActivityRecordType() throws AsterixException {
+ AUnorderedListType unorderedListType = new AUnorderedListType(BuiltinType.ASTRING, null);
+ String[] fieldNames = { "DataverseName", "DatasetName", "Status", "IngestNodes", "ComputeNodes",
+ "UpdateTimestamp" };
+ IAType[] fieldTypes = { BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING, unorderedListType,
+ unorderedListType, BuiltinType.ASTRING };
+ return new ARecordType("FeedActivityRecordType", fieldNames, fieldTypes, true);
+ }
+
}
\ No newline at end of file
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/FeedJobEventListenerFactory.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/FeedJobEventListenerFactory.java
new file mode 100644
index 0000000..3e7510f
--- /dev/null
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/FeedJobEventListenerFactory.java
@@ -0,0 +1,110 @@
+package edu.uci.ics.asterix.metadata.declared;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import edu.uci.ics.asterix.external.data.operator.FeedIntakeOperatorDescriptor;
+import edu.uci.ics.asterix.metadata.MetadataManager;
+import edu.uci.ics.asterix.metadata.MetadataTransactionContext;
+import edu.uci.ics.asterix.metadata.entities.Dataset;
+import edu.uci.ics.asterix.metadata.entities.FeedDatasetDetails;
+import edu.uci.ics.asterix.metadata.entities.FeedDatasetDetails.FeedState;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.meta.AlgebricksMetaOperatorDescriptor;
+import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
+import edu.uci.ics.hyracks.api.context.IHyracksJobletContext;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
+import edu.uci.ics.hyracks.api.job.IJobletEventListener;
+import edu.uci.ics.hyracks.api.job.IJobletEventListenerFactory;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.api.job.JobInfo;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.api.job.JobStatus;
+
+public class FeedJobEventListenerFactory implements IJobletEventListenerFactory {
+
+ /**
+ *
+ */
+ private static final long serialVersionUID = 1L;
+
+ private final Dataset dataset;
+ private final JobSpecification jobSpec;
+ private final IHyracksClientConnection hcc;
+ private IJobletEventListener jobEventListener;
+
+ public FeedJobEventListenerFactory(Dataset dataset, JobSpecification jobSpec, IHyracksClientConnection hcc) {
+ this.dataset = dataset;
+ this.jobSpec = jobSpec;
+ this.hcc = hcc;
+ }
+
+ @Override
+ public IJobletEventListener createListener(IHyracksJobletContext ctx) {
+
+ jobEventListener = new IJobletEventListener() {
+
+ private final List<String> ingestLocations = new ArrayList<String>();
+ private final List<String> computeLocations = new ArrayList<String>();
+
+ @Override
+ public void jobletStart(JobId jobId) {
+ List<OperatorDescriptorId> ingestOperatorIds = new ArrayList<OperatorDescriptorId>();
+ List<OperatorDescriptorId> computeOperatorIds = new ArrayList<OperatorDescriptorId>();
+
+ Map<OperatorDescriptorId, IOperatorDescriptor> operators = jobSpec.getOperatorMap();
+ for (Entry<OperatorDescriptorId, IOperatorDescriptor> entry : operators.entrySet()) {
+ if (entry.getValue() instanceof FeedIntakeOperatorDescriptor) {
+ ingestOperatorIds.add(entry.getKey());
+ } else if (entry.getValue() instanceof AlgebricksMetaOperatorDescriptor) {
+ computeOperatorIds.add(entry.getKey());
+ }
+ }
+
+ try {
+ JobInfo info = hcc.getJobInfo(jobId);
+
+ for (OperatorDescriptorId ingestOpId : ingestOperatorIds) {
+ ingestLocations.addAll(info.getOperatorLocations().get(ingestOpId));
+ }
+ for (OperatorDescriptorId computeOpId : computeOperatorIds) {
+ computeLocations.addAll(info.getOperatorLocations().get(computeOpId));
+ }
+ System.out.println("job info " + info);
+
+ } catch (Exception e) {
+ // TODO Add Exception handling here
+ }
+ }
+
+ @Override
+ public void jobletFinish(JobStatus status) {
+ MetadataManager.INSTANCE.acquireWriteLatch();
+ MetadataTransactionContext mdTxnCtx = null;
+ try {
+ mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+ FeedDatasetDetails feedDetails = (FeedDatasetDetails) dataset.getDatasetDetails();
+ feedDetails.setFeedState(FeedState.INACTIVE);
+ feedDetails.setComputeNodes(null);
+ feedDetails.setIngestNodes(null);
+ MetadataManager.INSTANCE
+ .dropDataset(mdTxnCtx, dataset.getDataverseName(), dataset.getDatasetName());
+ MetadataManager.INSTANCE.addDataset(mdTxnCtx, dataset);
+ } catch (Exception e) {
+
+ }
+ }
+
+ public List<String> getIngestLocations() {
+ return ingestLocations;
+ }
+
+ public List<String> getComputeLocations() {
+ return computeLocations;
+ }
+ };
+ return jobEventListener;
+ }
+}
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/FeedActivity.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/FeedActivity.java
new file mode 100644
index 0000000..97f7a1e
--- /dev/null
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/FeedActivity.java
@@ -0,0 +1,117 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.asterix.metadata.entities;
+
+import java.util.List;
+
+import edu.uci.ics.asterix.metadata.IDatasetDetails;
+import edu.uci.ics.asterix.metadata.MetadataCache;
+import edu.uci.ics.asterix.metadata.api.IMetadataEntity;
+import edu.uci.ics.asterix.metadata.entities.FeedDatasetDetails.FeedState;
+
+/**
+ * Metadata describing a feed activity record.
+ */
+public class FeedActivity implements IMetadataEntity {
+
+ private static final long serialVersionUID = 1L;
+
+ private final String dataverseName;
+ // Enforced to be unique within a dataverse.
+ private final String datasetName;
+
+ private FeedState feedState;
+ private List<String> ingestNodes;
+ private List<String> computeNodes;
+ private String lastUpdatedTimestamp;
+
+ public FeedActivity(String dataverseName, String datasetName, String feedState, List<String> ingestNodes,
+ List<String> computeNodes) {
+ this.dataverseName = dataverseName;
+ this.datasetName = datasetName;
+ this.feedState = FeedState.valueOf(feedState);
+ this.ingestNodes = ingestNodes;
+ this.computeNodes = computeNodes;
+ }
+
+ public String getDataverseName() {
+ return dataverseName;
+ }
+
+ public String getDatasetName() {
+ return datasetName;
+ }
+
+ @Override
+ public Object addToCache(MetadataCache cache) {
+ return cache.addFeedActivityIfNotExists(this);
+ }
+
+ @Override
+ public Object dropFromCache(MetadataCache cache) {
+ return cache.dropFeedActivity(this);
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (this == other) {
+ return true;
+ }
+ if (!(other instanceof FeedActivity)) {
+ return false;
+ }
+ FeedActivity otherDataset = (FeedActivity) other;
+ if (!otherDataset.dataverseName.equals(dataverseName)) {
+ return false;
+ }
+ if (!otherDataset.datasetName.equals(datasetName)) {
+ return false;
+ }
+ return true;
+ }
+
+ public FeedState getFeedState() {
+ return feedState;
+ }
+
+ public void setFeedState(FeedState feedState) {
+ this.feedState = feedState;
+ }
+
+ public List<String> getIngestNodes() {
+ return ingestNodes;
+ }
+
+ public void setIngestNodes(List<String> ingestNodes) {
+ this.ingestNodes = ingestNodes;
+ }
+
+ public List<String> getComputeNodes() {
+ return computeNodes;
+ }
+
+ public void setComputeNodes(List<String> computeNodes) {
+ this.computeNodes = computeNodes;
+ }
+
+ public String getLastUpdatedTimestamp() {
+ return lastUpdatedTimestamp;
+ }
+
+ public void setLastUpdatedTimestamp(String lastUpdatedTimestamp) {
+ this.lastUpdatedTimestamp = lastUpdatedTimestamp;
+ }
+}
\ No newline at end of file
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/FeedDatasetDetails.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/FeedDatasetDetails.java
index 22de3d3..81be3b5 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/FeedDatasetDetails.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/FeedDatasetDetails.java
@@ -23,6 +23,7 @@
import edu.uci.ics.asterix.builders.IARecordBuilder;
import edu.uci.ics.asterix.builders.OrderedListBuilder;
import edu.uci.ics.asterix.builders.RecordBuilder;
+import edu.uci.ics.asterix.builders.UnorderedListBuilder;
import edu.uci.ics.asterix.common.config.DatasetConfig.DatasetType;
import edu.uci.ics.asterix.common.exceptions.AsterixException;
import edu.uci.ics.asterix.common.functions.FunctionSignature;
@@ -31,6 +32,7 @@
import edu.uci.ics.asterix.om.base.AMutableString;
import edu.uci.ics.asterix.om.base.AString;
import edu.uci.ics.asterix.om.types.AOrderedListType;
+import edu.uci.ics.asterix.om.types.AUnorderedListType;
import edu.uci.ics.asterix.om.types.BuiltinType;
import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
@@ -48,25 +50,31 @@
private final Map<String, String> properties;
private final FunctionSignature signature;
private FeedState feedState;
+ private List<String> ingestNodes;
+ private List<String> computeNodes;
public enum FeedState {
- INACTIVE,
// INACTIVE state signifies that the feed dataset is not
// connected with the external world through the feed
// adapter.
- ACTIVE
+ INACTIVE,
+
// ACTIVE state signifies that the feed dataset is connected to the
// external world using an adapter that may put data into the dataset.
+ ACTIVE
}
public FeedDatasetDetails(FileStructure fileStructure, PartitioningStrategy partitioningStrategy,
List<String> partitioningKey, List<String> primaryKey, String groupName, String adapterFactory,
- Map<String, String> properties, FunctionSignature signature, String feedState) {
+ Map<String, String> properties, FunctionSignature signature, String feedState, List<String> ingestNodes,
+ List<String> computeNodes) {
super(fileStructure, partitioningStrategy, partitioningKey, primaryKey, groupName);
this.properties = properties;
this.adapterFactory = adapterFactory;
this.signature = signature;
this.feedState = feedState.equals(FeedState.ACTIVE.toString()) ? FeedState.ACTIVE : FeedState.INACTIVE;
+ this.ingestNodes = ingestNodes;
+ this.computeNodes = computeNodes;
}
@Override
@@ -160,6 +168,38 @@
stringSerde.serialize(aString, fieldValue.getDataOutput());
feedRecordBuilder.addField(MetadataRecordTypes.FEED_DETAILS_ARECORD_STATE_FIELD_INDEX, fieldValue);
+ // write field 9
+ UnorderedListBuilder unorderedlistBuilder = new UnorderedListBuilder();
+ unorderedlistBuilder
+ .reset((AUnorderedListType) MetadataRecordTypes.FEED_DETAILS_RECORDTYPE.getFieldTypes()[MetadataRecordTypes.FEED_DETAILS_ARECORD_INGEST_NODES_FIELD_INDEX]);
+ itemValue = new ArrayBackedValueStorage();
+ if (ingestNodes != null) {
+ for (String node : ingestNodes) {
+ itemValue.reset();
+ aString.setValue(node);
+ listBuilder.addItem(itemValue);
+ }
+ }
+ fieldValue.reset();
+ unorderedlistBuilder.write(fieldValue.getDataOutput(), true);
+ feedRecordBuilder.addField(MetadataRecordTypes.FEED_DETAILS_ARECORD_INGEST_NODES_FIELD_INDEX, fieldValue);
+
+ // write field 10
+ unorderedlistBuilder = new UnorderedListBuilder();
+ unorderedlistBuilder
+ .reset((AUnorderedListType) MetadataRecordTypes.FEED_DETAILS_RECORDTYPE.getFieldTypes()[MetadataRecordTypes.FEED_DETAILS_ARECORD_COMPUTE_NODES_FIELD_INDEX]);
+ itemValue = new ArrayBackedValueStorage();
+ if (computeNodes != null) {
+ for (String node : computeNodes) {
+ itemValue.reset();
+ aString.setValue(node);
+ listBuilder.addItem(itemValue);
+ }
+ }
+ fieldValue.reset();
+ unorderedlistBuilder.write(fieldValue.getDataOutput(), true);
+ feedRecordBuilder.addField(MetadataRecordTypes.FEED_DETAILS_ARECORD_COMPUTE_NODES_FIELD_INDEX, fieldValue);
+
try {
feedRecordBuilder.write(out, true);
} catch (IOException | AsterixException e) {
@@ -215,4 +255,20 @@
return signature;
}
+ public List<String> getIngestNodes() {
+ return ingestNodes;
+ }
+
+ public void setIngestNodes(List<String> ingestNodes) {
+ this.ingestNodes = ingestNodes;
+ }
+
+ public List<String> getComputeNodes() {
+ return computeNodes;
+ }
+
+ public void setComputeNodes(List<String> computeNodes) {
+ this.computeNodes = computeNodes;
+ }
+
}
\ No newline at end of file
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/DatasetTupleTranslator.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/DatasetTupleTranslator.java
index 61f856a..2a68ae9 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/DatasetTupleTranslator.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/DatasetTupleTranslator.java
@@ -169,8 +169,23 @@
String feedState = ((AString) datasetDetailsRecord
.getValueByPos(MetadataRecordTypes.FEED_DETAILS_ARECORD_STATE_FIELD_INDEX)).getStringValue();
+ cursor = ((AUnorderedList) datasetDetailsRecord
+ .getValueByPos(MetadataRecordTypes.FEED_DETAILS_ARECORD_INGEST_NODES_FIELD_INDEX)).getCursor();
+ List<String> ingestNodes = new ArrayList<String>();
+ while (cursor.next()) {
+ ingestNodes.add(((AString) cursor.get()).getStringValue());
+ }
+
+ cursor = ((AUnorderedList) datasetDetailsRecord
+ .getValueByPos(MetadataRecordTypes.FEED_DETAILS_ARECORD_INGEST_NODES_FIELD_INDEX)).getCursor();
+ List<String> computeNodes = new ArrayList<String>();
+ while (cursor.next()) {
+ computeNodes.add(((AString) cursor.get()).getStringValue());
+ }
+
datasetDetails = new FeedDatasetDetails(fileStructure, partitioningStrategy, partitioningKey,
- partitioningKey, groupName, adapter, properties, signature, feedState);
+ partitioningKey, groupName, adapter, properties, signature, feedState, ingestNodes,
+ computeNodes);
break;
}
case INTERNAL: {
@@ -221,10 +236,11 @@
}
datasetDetails = new ExternalDatasetDetails(adapter, properties);
}
-
+
Map<String, String> hints = getDatasetHints(datasetRecord);
-
- return new Dataset(dataverseName, datasetName, typeName, datasetDetails, hints, datasetType, datasetId, pendingOp);
+
+ return new Dataset(dataverseName, datasetName, typeName, datasetDetails, hints, datasetType, datasetId,
+ pendingOp);
}
@Override
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/FeedActivityTupleTranslator.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/FeedActivityTupleTranslator.java
new file mode 100644
index 0000000..a1acd38
--- /dev/null
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/FeedActivityTupleTranslator.java
@@ -0,0 +1,186 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.asterix.metadata.entitytupletranslators;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.List;
+
+import edu.uci.ics.asterix.builders.UnorderedListBuilder;
+import edu.uci.ics.asterix.common.exceptions.AsterixException;
+import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
+import edu.uci.ics.asterix.metadata.MetadataException;
+import edu.uci.ics.asterix.metadata.bootstrap.MetadataPrimaryIndexes;
+import edu.uci.ics.asterix.metadata.bootstrap.MetadataRecordTypes;
+import edu.uci.ics.asterix.metadata.entities.FeedActivity;
+import edu.uci.ics.asterix.om.base.AInt32;
+import edu.uci.ics.asterix.om.base.AMutableInt32;
+import edu.uci.ics.asterix.om.base.ARecord;
+import edu.uci.ics.asterix.om.base.AString;
+import edu.uci.ics.asterix.om.base.AUnorderedList;
+import edu.uci.ics.asterix.om.base.IACursor;
+import edu.uci.ics.asterix.om.types.AUnorderedListType;
+import edu.uci.ics.asterix.om.types.BuiltinType;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+
+/**
+ * Translates a Dataset metadata entity to an ITupleReference and vice versa.
+ */
+public class FeedActivityTupleTranslator extends AbstractTupleTranslator<FeedActivity> {
+ // Field indexes of serialized Dataset in a tuple.
+ // First key field.
+ public static final int FEED_ACTIVITY_DATAVERSENAME_TUPLE_FIELD_INDEX = 0;
+ // Second key field.
+ public static final int FEED_ACTIVITY_DATASET_DATASETNAME_TUPLE_FIELD_INDEX = 1;
+ // Payload field containing serialized Dataset.
+ public static final int FEED_ACTIVITY_PAYLOAD_TUPLE_FIELD_INDEX = 2;
+
+ @SuppressWarnings("unchecked")
+ private ISerializerDeserializer<ARecord> recordSerDes = AqlSerializerDeserializerProvider.INSTANCE
+ .getSerializerDeserializer(MetadataRecordTypes.FEED_ACTIVITY_RECORDTYPE);
+ private AMutableInt32 aInt32;
+ protected ISerializerDeserializer<AInt32> aInt32Serde;
+
+ @SuppressWarnings("unchecked")
+ public FeedActivityTupleTranslator(boolean getTuple) {
+ super(getTuple, MetadataPrimaryIndexes.DATASET_DATASET.getFieldCount());
+ aInt32 = new AMutableInt32(-1);
+ aInt32Serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT32);
+ }
+
+ @Override
+ public FeedActivity getMetadataEntytiFromTuple(ITupleReference frameTuple) throws IOException {
+ byte[] serRecord = frameTuple.getFieldData(FEED_ACTIVITY_PAYLOAD_TUPLE_FIELD_INDEX);
+ int recordStartOffset = frameTuple.getFieldStart(FEED_ACTIVITY_PAYLOAD_TUPLE_FIELD_INDEX);
+ int recordLength = frameTuple.getFieldLength(FEED_ACTIVITY_PAYLOAD_TUPLE_FIELD_INDEX);
+ ByteArrayInputStream stream = new ByteArrayInputStream(serRecord, recordStartOffset, recordLength);
+ DataInput in = new DataInputStream(stream);
+ ARecord feedActivityRecord = (ARecord) recordSerDes.deserialize(in);
+ return createFeedActivityFromARecord(feedActivityRecord);
+ }
+
+ private FeedActivity createFeedActivityFromARecord(ARecord feedActivityRecord) {
+
+ String dataverseName = ((AString) feedActivityRecord
+ .getValueByPos(MetadataRecordTypes.FEED_ACTIVITY_ARECORD_DATAVERSE_NAME_FIELD_INDEX)).getStringValue();
+ String datasetName = ((AString) feedActivityRecord
+ .getValueByPos(MetadataRecordTypes.FEED_ACTIVITY_ARECORD_DATASET_NAME_FIELD_INDEX)).getStringValue();
+ String status = ((AString) feedActivityRecord
+ .getValueByPos(MetadataRecordTypes.FEED_ACTIVITY_ARECORD_STATUS_FIELD_INDEX)).getStringValue();
+
+ List<String> ingestNodes = new ArrayList<String>();
+ IACursor cursor = ((AUnorderedList) feedActivityRecord
+ .getValueByPos(MetadataRecordTypes.FEED_ACTIVITY_ARECORD_INGEST_NODES_FIELD_INDEX)).getCursor();
+ while (cursor.next()) {
+ ingestNodes.add(((AString) cursor.get()).getStringValue());
+ }
+
+ List<String> computeNodes = new ArrayList<String>();
+ cursor = ((AUnorderedList) feedActivityRecord
+ .getValueByPos(MetadataRecordTypes.FEED_ACTIVITY_ARECORD_COMPUTE_NODES_FIELD_INDEX)).getCursor();
+ while (cursor.next()) {
+ computeNodes.add(((AString) cursor.get()).getStringValue());
+ }
+
+ return new FeedActivity(dataverseName, datasetName, status, ingestNodes, computeNodes);
+ }
+
+ @Override
+ public ITupleReference getTupleFromMetadataEntity(FeedActivity feedActivity) throws IOException, MetadataException {
+ // write the key in the first 2 fields of the tuple
+ ArrayBackedValueStorage itemValue = new ArrayBackedValueStorage();
+ tupleBuilder.reset();
+ aString.setValue(feedActivity.getDataverseName());
+ stringSerde.serialize(aString, tupleBuilder.getDataOutput());
+ tupleBuilder.addFieldEndOffset();
+ aString.setValue(feedActivity.getDatasetName());
+ stringSerde.serialize(aString, tupleBuilder.getDataOutput());
+ tupleBuilder.addFieldEndOffset();
+
+ // write the pay-load in the third field of the tuple
+
+ recordBuilder.reset(MetadataRecordTypes.FEED_ACTIVITY_RECORDTYPE);
+
+ // write field 0
+ fieldValue.reset();
+ aString.setValue(feedActivity.getDataverseName());
+ stringSerde.serialize(aString, fieldValue.getDataOutput());
+ recordBuilder.addField(MetadataRecordTypes.FEED_ACTIVITY_ARECORD_DATAVERSE_NAME_FIELD_INDEX, fieldValue);
+
+ // write field 1
+ fieldValue.reset();
+ aString.setValue(feedActivity.getDatasetName());
+ stringSerde.serialize(aString, fieldValue.getDataOutput());
+ recordBuilder.addField(MetadataRecordTypes.FEED_ACTIVITY_ARECORD_DATASET_NAME_FIELD_INDEX, fieldValue);
+
+ // write field 2
+ fieldValue.reset();
+ aString.setValue(feedActivity.getFeedState().name());
+ stringSerde.serialize(aString, fieldValue.getDataOutput());
+ recordBuilder.addField(MetadataRecordTypes.FEED_ACTIVITY_ARECORD_STATUS_FIELD_INDEX, fieldValue);
+
+ // write field 3
+ UnorderedListBuilder listBuilder = new UnorderedListBuilder();
+ listBuilder
+ .reset((AUnorderedListType) MetadataRecordTypes.FEED_ACTIVITY_RECORDTYPE.getFieldTypes()[MetadataRecordTypes.FEED_ACTIVITY_ARECORD_INGEST_NODES_FIELD_INDEX]);
+ for (String field : feedActivity.getIngestNodes()) {
+ itemValue.reset();
+ aString.setValue(field);
+ stringSerde.serialize(aString, itemValue.getDataOutput());
+ listBuilder.addItem(itemValue);
+ }
+ fieldValue.reset();
+ listBuilder.write(fieldValue.getDataOutput(), true);
+ recordBuilder.addField(MetadataRecordTypes.FEED_ACTIVITY_ARECORD_INGEST_NODES_FIELD_INDEX, fieldValue);
+
+ // write field 4
+ listBuilder
+ .reset((AUnorderedListType) MetadataRecordTypes.FEED_ACTIVITY_RECORDTYPE.getFieldTypes()[MetadataRecordTypes.FEED_ACTIVITY_ARECORD_COMPUTE_NODES_FIELD_INDEX]);
+ for (String field : feedActivity.getIngestNodes()) {
+ itemValue.reset();
+ aString.setValue(field);
+ stringSerde.serialize(aString, itemValue.getDataOutput());
+ listBuilder.addItem(itemValue);
+ }
+ fieldValue.reset();
+ listBuilder.write(fieldValue.getDataOutput(), true);
+ recordBuilder.addField(MetadataRecordTypes.FEED_ACTIVITY_ARECORD_COMPUTE_NODES_FIELD_INDEX, fieldValue);
+
+ // write field 5
+ fieldValue.reset();
+ aString.setValue(Calendar.getInstance().getTime().toString());
+ stringSerde.serialize(aString, fieldValue.getDataOutput());
+ recordBuilder.addField(MetadataRecordTypes.FEED_ACTIVITY_ARECORD_LAST_UPDATE_TIMESTAMP_FIELD_INDEX, fieldValue);
+
+ // write record
+ try {
+ recordBuilder.write(tupleBuilder.getDataOutput(), true);
+ } catch (AsterixException e) {
+ throw new MetadataException(e);
+ }
+ tupleBuilder.addFieldEndOffset();
+
+ tuple.reset(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray());
+ return tuple;
+ }
+
+}
\ No newline at end of file