This change integrates a run statement for running Pregelix jobs into the AQL.
Therefore it also provides a new FlushOperator to flush all memory components of a dataset to disc.
Change-Id: I1f97cfdc79943abf035a7342bb777d59af6518e9
Reviewed-on: http://fulliautomatix.ics.uci.edu:8443/193
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Young-Seok Kim <kisskys@gmail.com>
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/QueryAPIServlet.java b/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/QueryAPIServlet.java
index 23553d1..e4dd711 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/QueryAPIServlet.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/QueryAPIServlet.java
@@ -30,7 +30,7 @@
}
protected List<Statement.Kind> getAllowedStatements() {
- Kind[] statementsArray = { Kind.DATAVERSE_DECL, Kind.FUNCTION_DECL, Kind.QUERY, Kind.SET, Kind.WRITE };
+ Kind[] statementsArray = { Kind.DATAVERSE_DECL, Kind.FUNCTION_DECL, Kind.QUERY, Kind.SET, Kind.WRITE, Kind.RUN };
return Arrays.asList(statementsArray);
}
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java b/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
index 25dc341..5fac0f6 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
@@ -14,7 +14,9 @@
*/
package edu.uci.ics.asterix.aql.translator;
+import java.io.BufferedReader;
import java.io.File;
+import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.rmi.RemoteException;
import java.util.ArrayList;
@@ -54,6 +56,7 @@
import edu.uci.ics.asterix.aql.expression.FeedDropStatement;
import edu.uci.ics.asterix.aql.expression.FunctionDecl;
import edu.uci.ics.asterix.aql.expression.FunctionDropStatement;
+import edu.uci.ics.asterix.aql.expression.IDatasetDetailsDecl;
import edu.uci.ics.asterix.aql.expression.Identifier;
import edu.uci.ics.asterix.aql.expression.IndexDropStatement;
import edu.uci.ics.asterix.aql.expression.InsertStatement;
@@ -61,13 +64,17 @@
import edu.uci.ics.asterix.aql.expression.LoadStatement;
import edu.uci.ics.asterix.aql.expression.NodeGroupDropStatement;
import edu.uci.ics.asterix.aql.expression.NodegroupDecl;
+import edu.uci.ics.asterix.aql.expression.RunStatement;
import edu.uci.ics.asterix.aql.expression.Query;
import edu.uci.ics.asterix.aql.expression.RefreshExternalDatasetStatement;
import edu.uci.ics.asterix.aql.expression.SetStatement;
import edu.uci.ics.asterix.aql.expression.TypeDecl;
import edu.uci.ics.asterix.aql.expression.TypeDropStatement;
+import edu.uci.ics.asterix.aql.expression.VarIdentifier;
+import edu.uci.ics.asterix.aql.expression.VariableExpr;
import edu.uci.ics.asterix.aql.expression.WriteStatement;
import edu.uci.ics.asterix.aql.util.FunctionUtils;
+import edu.uci.ics.asterix.common.config.AsterixCompilerProperties;
import edu.uci.ics.asterix.common.config.DatasetConfig.DatasetType;
import edu.uci.ics.asterix.common.config.DatasetConfig.ExternalDatasetTransactionState;
import edu.uci.ics.asterix.common.config.DatasetConfig.ExternalFilePendingOp;
@@ -115,7 +122,10 @@
import edu.uci.ics.asterix.om.util.AsterixAppContextInfo;
import edu.uci.ics.asterix.result.ResultReader;
import edu.uci.ics.asterix.result.ResultUtils;
+import edu.uci.ics.asterix.runtime.job.listener.JobEventListenerFactory;
+import edu.uci.ics.asterix.runtime.operators.std.FlushDatasetOperatorDescriptor;
import edu.uci.ics.asterix.transaction.management.service.transaction.DatasetIdFactory;
+import edu.uci.ics.asterix.transaction.management.service.transaction.JobIdFactory;
import edu.uci.ics.asterix.translator.AbstractAqlTranslator;
import edu.uci.ics.asterix.translator.CompiledStatements.CompiledConnectFeedStatement;
import edu.uci.ics.asterix.translator.CompiledStatements.CompiledCreateIndexStatement;
@@ -127,20 +137,30 @@
import edu.uci.ics.asterix.translator.CompiledStatements.CompiledLoadFromFileStatement;
import edu.uci.ics.asterix.translator.CompiledStatements.ICompiledDmlStatement;
import edu.uci.ics.asterix.translator.TypeTranslator;
+import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraintHelper;
import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression.FunctionKind;
import edu.uci.ics.hyracks.algebricks.data.IAWriterFactory;
import edu.uci.ics.hyracks.algebricks.data.IResultSerializerFactoryProvider;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.meta.AlgebricksMetaOperatorDescriptor;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.std.EmptyTupleSourceRuntimeFactory;
import edu.uci.ics.hyracks.algebricks.runtime.serializer.ResultSerializerFactoryProvider;
import edu.uci.ics.hyracks.algebricks.runtime.writers.PrinterBasedWriterFactory;
import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.dataset.IHyracksDataset;
import edu.uci.ics.hyracks.api.dataset.ResultSetId;
import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.api.job.IJobletEventListenerFactory;
import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
+import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
/*
@@ -168,7 +188,7 @@
private final SessionConfig sessionConfig;
private final OutputFormat pdf;
private Dataverse activeDefaultDataverse;
- private List<FunctionDecl> declaredFunctions;
+ private final List<FunctionDecl> declaredFunctions;
public AqlTranslator(List<Statement> aqlStatements, PrintWriter out, SessionConfig pc, APIFramework.OutputFormat pdf)
throws MetadataException, AsterixException {
@@ -191,7 +211,7 @@
/**
* Compiles and submits for execution a list of AQL statements.
- *
+ *
* @param hcc
* A Hyracks client connection that is used to submit a jobspec to Hyracks.
* @param hdc
@@ -201,8 +221,8 @@
* @return A List<QueryResult> containing a QueryResult instance corresponding to each submitted query.
* @throws Exception
*/
- public void compileAndExecute(IHyracksClientConnection hcc, IHyracksDataset hdc,
- ResultDelivery resultDelivery) throws Exception {
+ public void compileAndExecute(IHyracksClientConnection hcc, IHyracksDataset hdc, ResultDelivery resultDelivery)
+ throws Exception {
int resultSetIdCounter = 0;
FileSplit outputFile = null;
IAWriterFactory writerFactory = PrinterBasedWriterFactory.INSTANCE;
@@ -334,6 +354,11 @@
outputFile = result.second;
break;
}
+
+ case RUN: {
+ handleRunStatement(metadataProvider, stmt, hcc);
+ break;
+ }
}
}
}
@@ -506,8 +531,7 @@
compactionPolicyProperties = GlobalConfig.DEFAULT_COMPACTION_POLICY_PROPERTIES;
}
} else {
- validateCompactionPolicy(compactionPolicy,
- compactionPolicyProperties, mdTxnCtx, false);
+ validateCompactionPolicy(compactionPolicy, compactionPolicyProperties, mdTxnCtx, false);
}
if (filterField != null) {
aRecordType.validateFilterField(filterField);
@@ -2346,6 +2370,232 @@
MetadataLockManager.INSTANCE.refreshDatasetEnd(dataverseName, dataverseName + "." + datasetName);
}
}
+
+ private void handleRunStatement(AqlMetadataProvider metadataProvider, Statement stmt,
+ IHyracksClientConnection hcc) throws AsterixException, Exception {
+ RunStatement runStmt = (RunStatement) stmt;
+ switch(runStmt.getSystem()) {
+ case "pregel":
+ case "pregelix":
+ handlePregelixStatement(metadataProvider, runStmt, hcc);
+ break;
+ default:
+ throw new AlgebricksException("The system \""+runStmt.getSystem()+"\" specified in your run statement is not supported.");
+ }
+
+ }
+
+ private void handlePregelixStatement(AqlMetadataProvider metadataProvider, Statement stmt,
+ IHyracksClientConnection hcc) throws AsterixException, Exception {
+
+ RunStatement pregelixStmt = (RunStatement) stmt;
+ boolean bActiveTxn = true;
+
+ String dataverseNameFrom = getActiveDataverseName(pregelixStmt.getDataverseNameFrom());
+ String dataverseNameTo = getActiveDataverseName(pregelixStmt.getDataverseNameTo());
+ String datasetNameFrom = pregelixStmt.getDatasetNameFrom().getValue();
+ String datasetNameTo = pregelixStmt.getDatasetNameTo().getValue();
+
+ if(dataverseNameFrom != dataverseNameTo) {
+ throw new AlgebricksException("Pregelix statements across different dataverses are not supported.");
+ }
+
+ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
+
+ MetadataLockManager.INSTANCE.pregelixBegin(dataverseNameFrom, datasetNameFrom, datasetNameTo);
+
+ try {
+
+ // construct input paths
+ Index fromIndex = null;
+ List<Index> indexes = MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseNameFrom, pregelixStmt
+ .getDatasetNameFrom().getValue());
+ for (Index ind : indexes) {
+ if (ind.isPrimaryIndex())
+ fromIndex = ind;
+ }
+
+ if (fromIndex == null) {
+ throw new AlgebricksException("Tried to access non-existing dataset: " + datasetNameFrom);
+ }
+
+ IFileSplitProvider fromSplits = metadataProvider.splitProviderAndPartitionConstraintsForDataset(
+ dataverseNameFrom, datasetNameFrom, fromIndex.getIndexName()).first;
+ StringBuilder fromSplitsPaths = new StringBuilder();
+
+ for (FileSplit f : fromSplits.getFileSplits()) {
+ fromSplitsPaths.append("asterix://" + f.getNodeName() + f.getLocalFile().getFile().getAbsolutePath());
+ fromSplitsPaths.append(",");
+ }
+ fromSplitsPaths.setLength(fromSplitsPaths.length() - 1);
+
+ // Construct output paths
+ Index toIndex = null;
+ indexes = MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseNameTo, pregelixStmt
+ .getDatasetNameTo().getValue());
+ for (Index ind : indexes) {
+ if (ind.isPrimaryIndex())
+ toIndex = ind;
+ }
+
+ if (toIndex == null) {
+ throw new AlgebricksException("Tried to access non-existing dataset: " + datasetNameTo);
+ }
+
+ IFileSplitProvider toSplits = metadataProvider.splitProviderAndPartitionConstraintsForDataset(
+ dataverseNameTo, datasetNameTo, toIndex.getIndexName()).first;
+ StringBuilder toSplitsPaths = new StringBuilder();
+
+ for (FileSplit f : toSplits.getFileSplits()) {
+ toSplitsPaths.append("asterix://" + f.getNodeName() + f.getLocalFile().getFile().getAbsolutePath());
+ toSplitsPaths.append(",");
+ }
+ toSplitsPaths.setLength(toSplitsPaths.length() - 1);
+
+ try {
+ Dataset toDataset = MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverseNameTo, datasetNameTo);
+ DropStatement dropStmt = new DropStatement(new Identifier(dataverseNameTo),
+ pregelixStmt.getDatasetNameTo(), true);
+ this.handleDatasetDropStatement(metadataProvider, dropStmt, hcc);
+
+ IDatasetDetailsDecl idd = new InternalDetailsDecl(new Identifier(toDataset.getDatasetDetails()
+ .getNodeGroupName()), toIndex.getKeyFieldNames(), false, toDataset.getDatasetDetails()
+ .getCompactionPolicy(), toDataset.getDatasetDetails().getCompactionPolicyProperties(), null);
+ DatasetDecl createToDataset = new DatasetDecl(new Identifier(dataverseNameTo),
+ pregelixStmt.getDatasetNameTo(), new Identifier(toDataset.getItemTypeName()),
+ toDataset.getHints(), toDataset.getDatasetType(), idd, false);
+ this.handleCreateDatasetStatement(metadataProvider, createToDataset, hcc);
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw new AlgebricksException("Error cleaning the result dataset. This should not happen.");
+ }
+
+ // Flush source dataset
+ flushDataset(hcc, metadataProvider, mdTxnCtx, dataverseNameFrom, datasetNameFrom, fromIndex.getIndexName());
+
+ // call Pregelix
+ String pregelix_home = System.getenv("PREGELIX_HOME");
+ if (pregelix_home == null) {
+ throw new AlgebricksException("PREGELIX_HOME is not defined!");
+ }
+
+ // construct command
+ ArrayList<String> cmd = new ArrayList<String>();
+ cmd.add("bin/pregelix");
+ cmd.add(pregelixStmt.getParameters().get(0)); // jar
+ cmd.add(pregelixStmt.getParameters().get(1)); // class
+ for (String s : pregelixStmt.getParameters().get(2).split(" ")) {
+ cmd.add(s);
+ }
+ cmd.add("-inputpaths");
+ cmd.add(fromSplitsPaths.toString());
+ cmd.add("-outputpath");
+ cmd.add(toSplitsPaths.toString());
+
+ StringBuilder command = new StringBuilder();
+ for (String s : cmd) {
+ command.append(s);
+ command.append(" ");
+ }
+ LOGGER.info("Running Pregelix Command: " + command.toString());
+
+ ProcessBuilder pb = new ProcessBuilder(cmd);
+ pb.directory(new File(pregelix_home));
+ pb.redirectErrorStream(true);
+
+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+ bActiveTxn = false;
+
+ Process pr = pb.start();
+
+ int resultState = 0;
+
+ BufferedReader in = new BufferedReader(new InputStreamReader(pr.getInputStream()));
+ String line;
+ while ((line = in.readLine()) != null) {
+ System.out.println(line);
+ if (line.contains("job finished")) {
+ resultState = 1;
+ }
+ if (line.contains("Exception") || line.contains("Error")) {
+
+ if (line.contains("Connection refused")) {
+ throw new AlgebricksException(
+ "The connection to your Pregelix cluster was refused. Is it running? Is the port in the query correct?");
+ }
+
+ if (line.contains("Could not find or load main class")) {
+ throw new AlgebricksException(
+ "The main class of your Pregelix query was not found. Is the path to your .jar file correct?");
+ }
+
+ if (line.contains("ClassNotFoundException")) {
+ throw new AlgebricksException(
+ "The vertex class of your Pregelix query was not found. Does it exist? Is the spelling correct?");
+ }
+
+ if (line.contains("HyracksException")) {
+ throw new AlgebricksException(
+ "Something went wrong executing your Pregelix Job (HyracksException). Check the configuration of STORAGE_BUFFERCACHE_PAGESIZE and STORAGE_MEMORYCOMPONENT_PAGESIZE."
+ + "It must match the one of Asterix. You can use managix describe -admin to find out the right configuration. "
+ + "Check also if your datatypes in Pregelix and Asterix are matching.");
+ }
+
+ throw new AlgebricksException(
+ "Something went wrong executing your Pregelix Job. Perhaps the Pregelix cluster needs to be restartet. "
+ + "Check the following things: Are the datatypes of Asterix and Pregelix matching? "
+ + "Is the server configuration correct (node names, buffer sizes, framesize)? Check the logfiles for more details.");
+ }
+ }
+ pr.waitFor();
+ in.close();
+
+ if (resultState != 1) {
+ throw new AlgebricksException(
+ "Something went wrong executing your Pregelix Job. Perhaps the Pregelix cluster needs to be restartet. "
+ + "Check the following things: Are the datatypes of Asterix and Pregelix matching? "
+ + "Is the server configuration correct (node names, buffer sizes, framesize)? Check the logfiles for more details.");
+ }
+ } catch (Exception e) {
+ if (bActiveTxn) {
+ abort(e, e, mdTxnCtx);
+ }
+ throw e;
+ } finally {
+ MetadataLockManager.INSTANCE
+ .pregelixEnd(dataverseNameFrom, datasetNameFrom, datasetNameTo);
+ }
+ }
+
+ private void flushDataset(IHyracksClientConnection hcc, AqlMetadataProvider metadataProvider,
+ MetadataTransactionContext mdTxnCtx, String dataverseName, String datasetName, String indexName)
+ throws Exception {
+ AsterixCompilerProperties compilerProperties = AsterixAppContextInfo.getInstance().getCompilerProperties();
+ int frameSize = compilerProperties.getFrameSize();
+ JobSpecification spec = new JobSpecification(frameSize);
+
+ RecordDescriptor[] rDescs = new RecordDescriptor[] { new RecordDescriptor(new ISerializerDeserializer[] {}) };
+ AlgebricksMetaOperatorDescriptor emptySource = new AlgebricksMetaOperatorDescriptor(spec, 0, 1,
+ new IPushRuntimeFactory[] { new EmptyTupleSourceRuntimeFactory() }, rDescs);
+
+ edu.uci.ics.asterix.common.transactions.JobId jobId = JobIdFactory.generateJobId();
+ FlushDatasetOperatorDescriptor flushOperator = new FlushDatasetOperatorDescriptor(spec, jobId,
+ MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverseName, datasetName).getDatasetId());
+
+ spec.connect(new OneToOneConnectorDescriptor(spec), emptySource, 0, flushOperator, 0);
+
+ Pair<IFileSplitProvider, AlgebricksPartitionConstraint> primarySplitsAndConstraint = metadataProvider
+ .splitProviderAndPartitionConstraintsForDataset(dataverseName, datasetName, indexName);
+ AlgebricksPartitionConstraint primaryPartitionConstraint = primarySplitsAndConstraint.second;
+
+ AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, emptySource,
+ primaryPartitionConstraint);
+
+ IJobletEventListenerFactory jobEventListenerFactory = new JobEventListenerFactory(jobId, false);
+ spec.setJobletEventListenerFactory(jobEventListenerFactory);
+ runJob(hcc, spec, true);
+ }
private JobId runJob(IHyracksClientConnection hcc, JobSpecification spec, boolean waitForCompletion)
throws Exception {
@@ -2353,8 +2603,8 @@
return jobIds[0];
}
- public JobId[] executeJobArray(IHyracksClientConnection hcc, Job[] jobs, PrintWriter out,
- boolean waitForCompletion) throws Exception {
+ public JobId[] executeJobArray(IHyracksClientConnection hcc, Job[] jobs, PrintWriter out, boolean waitForCompletion)
+ throws Exception {
JobId[] startedJobIds = new JobId[jobs.length];
for (int i = 0; i < jobs.length; i++) {
JobSpecification spec = jobs[i].getJobSpec();
diff --git a/asterix-app/src/test/resources/runtimets/queries/graph/pregel-q01/pregel-q01.1.ddl.aql b/asterix-app/src/test/resources/runtimets/queries/graph/pregel-q01/pregel-q01.1.ddl.aql
new file mode 100644
index 0000000..8d47100f
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/graph/pregel-q01/pregel-q01.1.ddl.aql
@@ -0,0 +1,47 @@
+drop dataverse Pregelix if exists;
+create dataverse Pregelix
+use dataverse Pregelix
+
+create type TwitterUserType as open {
+ screen-name: string,
+ lang: string,
+ friends_count: int32,
+ statuses_count: int32,
+ name: string,
+ followers_count: int32
+}
+
+create type TweetMessageType as open {
+ tweetid: int64,
+ user: TwitterUserType,
+ sender-location: point?,
+ send-time: datetime,
+ referred-topics: {{ string }},
+ message-text: string,
+ retweeted-from: int64,
+ forwarded-from: int64
+}
+
+create dataset TwitterMsgs(TweetMessageType)
+ primary key tweetid;
+
+create dataset TwitterUsers(TwitterUserType)
+ primary key screen-name;
+
+ create type TMEdge as open {
+ tweetid: int64,
+ value: float?
+ }
+
+ create type TMGraph as open {
+ tweetid: int64,
+ rank-value: double?,
+ populated-by: {{TMEdge}}
+ }
+
+
+create dataset MyInputGraph(TMGraph)
+ primary key tweetid;
+
+create dataset MyOutputGraph(TMGraph)
+ primary key tweetid;
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/queries/graph/pregel-q01/pregel-q01.2.update.aql b/asterix-app/src/test/resources/runtimets/queries/graph/pregel-q01/pregel-q01.2.update.aql
new file mode 100644
index 0000000..5c8272f
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/graph/pregel-q01/pregel-q01.2.update.aql
@@ -0,0 +1,34 @@
+use dataverse Pregelix;
+
+insert into dataset TwitterUsers (
+{{
+{"screen-name":"NathanGiesen@211","lang":"en","friends_count":18,"statuses_count":473,"name":"Nathan Giesen","followers_count":49416},
+{"screen-name":"ColineGeyer@63","lang":"en","friends_count":121,"statuses_count":362,"name":"Coline Geyer","followers_count":17159},
+{"screen-name":"NilaMilliron_tw","lang":"en","friends_count":445,"statuses_count":164,"name":"Nila Milliron","followers_count":22649},
+{"screen-name":"ChangEwing_573","lang":"en","friends_count":182,"statuses_count":394,"name":"Chang Ewing","followers_count":32136}
+}})
+
+insert into dataset TwitterMsgs (
+{{
+ {"tweetid":1,"user":{"screen-name":"NathanGiesen@211","lang":"en","friends_count":39339,"statuses_count":473,"name":"Nathan Giesen","followers_count":49416},"sender-location":point("47.44,80.65"),"send-time":datetime("2008-04-26T10:10:00"),"referred-topics":{{"t-mobile","customization"}},"message-text":" love t-mobile its customization is good:)", "retweeted-from":2, "forwarded-from":3},
+ {"tweetid":2,"user":{"screen-name":"ColineGeyer@63","lang":"en","friends_count":121,"statuses_count":362,"name":"Coline Geyer","followers_count":17159},"sender-location":point("32.84,67.14"),"send-time":datetime("2010-05-13T10:10:00"),"referred-topics":{{"verizon","shortcut-menu"}},"message-text":" like verizon its shortcut-menu is awesome:)", "retweeted-from":7, "forwarded-from":1},
+ {"tweetid":3,"user":{"screen-name":"NathanGiesen@211","lang":"en","friends_count":39339,"statuses_count":473,"name":"Nathan Giesen","followers_count":49416},"sender-location":point("29.72,75.8"),"send-time":datetime("2006-11-04T10:10:00"),"referred-topics":{{"motorola","speed"}},"message-text":" like motorola the speed is good:)", "retweeted-from":8, "forwarded-from":6},
+ {"tweetid":4,"user":{"screen-name":"NathanGiesen@211","lang":"en","friends_count":39339,"statuses_count":473,"name":"Nathan Giesen","followers_count":49416},"sender-location":point("39.28,70.48"),"send-time":datetime("2011-12-26T10:10:00"),"referred-topics":{{"sprint","voice-command"}},"message-text":" like sprint the voice-command is mind-blowing:)", "retweeted-from":1, "forwarded-from":3},
+ {"tweetid":5,"user":{"screen-name":"NathanGiesen@211","lang":"en","friends_count":39339,"statuses_count":473,"name":"Nathan Giesen","followers_count":49416},"sender-location":point("40.09,92.69"),"send-time":datetime("2006-08-04T10:10:00"),"referred-topics":{{"motorola","speed"}},"message-text":" can't stand motorola its speed is terrible:(", "retweeted-from":8, "forwarded-from":6},
+ {"tweetid":6,"user":{"screen-name":"ColineGeyer@63","lang":"en","friends_count":121,"statuses_count":362,"name":"Coline Geyer","followers_count":17159},"sender-location":point("47.51,83.99"),"send-time":datetime("2010-05-07T10:10:00"),"referred-topics":{{"iphone","voice-clarity"}},"message-text":" like iphone the voice-clarity is good:)", "retweeted-from":4, "forwarded-from":5},
+ {"tweetid":7,"user":{"screen-name":"ChangEwing_573","lang":"en","friends_count":182,"statuses_count":394,"name":"Chang Ewing","followers_count":32136},"sender-location":point("36.21,72.6"),"send-time":datetime("2011-08-25T10:10:00"),"referred-topics":{{"samsung","platform"}},"message-text":" like samsung the platform is good", "retweeted-from":2, "forwarded-from":7},
+ {"tweetid":8,"user":{"screen-name":"NathanGiesen@211","lang":"en","friends_count":39339,"statuses_count":473,"name":"Nathan Giesen","followers_count":49416},"sender-location":point("46.05,93.34"),"send-time":datetime("2005-10-14T10:10:00"),"referred-topics":{{"t-mobile","shortcut-menu"}},"message-text":" like t-mobile the shortcut-menu is awesome:)", "retweeted-from":3, "forwarded-from":7},
+ {"tweetid":9,"user":{"screen-name":"NathanGiesen@211","lang":"en","friends_count":39339,"statuses_count":473,"name":"Nathan Giesen","followers_count":49416},"sender-location":point("36.86,74.62"),"send-time":datetime("2012-07-21T10:10:00"),"referred-topics":{{"verizon","voicemail-service"}},"message-text":" love verizon its voicemail-service is awesome", "retweeted-from":6, "forwarded-from":2},
+ {"tweetid":10,"user":{"screen-name":"ColineGeyer@63","lang":"en","friends_count":121,"statuses_count":362,"name":"Coline Geyer","followers_count":17159},"sender-location":point("29.15,76.53"),"send-time":datetime("2008-01-26T10:10:00"),"referred-topics":{{"verizon","voice-clarity"}},"message-text":" hate verizon its voice-clarity is OMG:(", "retweeted-from":4, "forwarded-from":5},
+ {"tweetid":11,"user":{"screen-name":"NilaMilliron_tw","lang":"en","friends_count":445,"statuses_count":164,"name":"Nila Milliron","followers_count":22649},"sender-location":point("37.59,68.42"),"send-time":datetime("2008-03-09T10:10:00"),"referred-topics":{{"iphone","platform"}},"message-text":" can't stand iphone its platform is terrible", "retweeted-from":6, "forwarded-from":3},
+ {"tweetid":12,"user":{"screen-name":"OliJackson_512","lang":"en","friends_count":445,"statuses_count":164,"name":"Oli Jackson","followers_count":22649},"sender-location":point("24.82,94.63"),"send-time":datetime("2010-02-13T10:10:00"),"referred-topics":{{"samsung","voice-command"}},"message-text":" like samsung the voice-command is amazing:)", "retweeted-from":6, "forwarded-from":5}
+}})
+
+insert into dataset MyInputGraph
+for $tm in dataset TwitterMsgs
+let $links:={{ {"tweetid":$tm.retweeted-from}, {"tweetid":$tm.forwarded-from}}}
+return {
+ "tweetid": $tm.tweetid,
+ "rank-value": 0.0,
+ "populated-by": $links
+}
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/queries/graph/pregel-q01/pregel-q01.3.query.aql b/asterix-app/src/test/resources/runtimets/queries/graph/pregel-q01/pregel-q01.3.query.aql
new file mode 100644
index 0000000..78fb02e
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/graph/pregel-q01/pregel-q01.3.query.aql
@@ -0,0 +1,9 @@
+use dataverse Pregelix;
+
+run pregel("examples/pregelix-example-0.2.14-SNAPSHOT-jar-with-dependencies.jar"
+ "edu.uci.ics.pregelix.example.PageRankVertex"
+ "-ip 10.0.2.15 -port 3199 -vnum 17")
+from dataset MyInputGraph
+to dataset MyOutputGraph;
+
+for $n in dataset MyOutputGraph return $n;
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/queries/graph/pregel-q02/pregel-q02.1.ddl.aql b/asterix-app/src/test/resources/runtimets/queries/graph/pregel-q02/pregel-q02.1.ddl.aql
new file mode 100644
index 0000000..1439cc3
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/graph/pregel-q02/pregel-q02.1.ddl.aql
@@ -0,0 +1,16 @@
+drop dataverse Pregelix if exists;
+create dataverse Pregelix
+use dataverse Pregelix
+
+create type EdgeType as open {
+ destVertexId: int64,
+ value: float?
+}
+create type NodeType as open {
+ id: int64,
+ value: int64?,
+ edges: {{EdgeType}}
+}
+
+create dataset InputGraph(NodeType) primary key id;
+create dataset ResultGraph(NodeType) primary key id;
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/queries/graph/pregel-q02/pregel-q02.2.update.aql b/asterix-app/src/test/resources/runtimets/queries/graph/pregel-q02/pregel-q02.2.update.aql
new file mode 100644
index 0000000..5537960
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/graph/pregel-q02/pregel-q02.2.update.aql
@@ -0,0 +1,27 @@
+use dataverse Pregelix;
+
+insert into dataset InputGraph (
+{{
+ {"id":0,"value":0,"edges":{{ {"destVertexId":1}}}},
+{"id":1,"value":1,"edges":{{ {"destVertexId":1},{"destVertexId":2}}}},
+{"id":2,"value":2,"edges":{{ {"destVertexId":1},{"destVertexId":2},{"destVertexId":3}}}},
+{"id":3,"value":3,"edges":{{ {"destVertexId":1},{"destVertexId":2},{"destVertexId":3},{"destVertexId":4}}}},
+{"id":4,"value":4,"edges":{{ {"destVertexId":1},{"destVertexId":2},{"destVertexId":3},{"destVertexId":4},{"destVertexId":5}}}},
+{"id":5,"value":5,"edges":{{ {"destVertexId":1},{"destVertexId":2},{"destVertexId":3},{"destVertexId":4},{"destVertexId":5},{"destVertexId":6}}}},
+{"id":6,"value":6,"edges":{{ {"destVertexId":1},{"destVertexId":2},{"destVertexId":3},{"destVertexId":4},{"destVertexId":5},{"destVertexId":6},{"destVertexId":7}}}},
+{"id":7,"value":7,"edges":{{ {"destVertexId":1},{"destVertexId":2},{"destVertexId":3},{"destVertexId":4},{"destVertexId":5},{"destVertexId":6},{"destVertexId":7},{"destVertexId":8}}}},
+{"id":8,"value":8,"edges":{{ {"destVertexId":1},{"destVertexId":2},{"destVertexId":3},{"destVertexId":4},{"destVertexId":5},{"destVertexId":6},{"destVertexId":7},{"destVertexId":8},{"destVertexId":9}}}},
+{"id":9,"value":9,"edges":{{ {"destVertexId":1},{"destVertexId":2},{"destVertexId":3},{"destVertexId":4},{"destVertexId":5},{"destVertexId":6},{"destVertexId":7},{"destVertexId":8},{"destVertexId":9},{"destVertexId":0}}}},
+{"id":10,"value":10,"edges":{{ {"destVertexId":11}}}},
+{"id":11,"value":11,"edges":{{ {"destVertexId":11},{"destVertexId":12}}}},
+{"id":12,"value":12,"edges":{{ {"destVertexId":11},{"destVertexId":12},{"destVertexId":13}}}},
+{"id":13,"value":13,"edges":{{ {"destVertexId":11},{"destVertexId":12},{"destVertexId":13},{"destVertexId":14}}}},
+{"id":14,"value":14,"edges":{{ {"destVertexId":11},{"destVertexId":12},{"destVertexId":13},{"destVertexId":14},{"destVertexId":15}}}},
+{"id":15,"value":15,"edges":{{ {"destVertexId":11},{"destVertexId":12},{"destVertexId":13},{"destVertexId":14},{"destVertexId":15},{"destVertexId":16}}}},
+{"id":16,"value":16,"edges":{{ {"destVertexId":11},{"destVertexId":12},{"destVertexId":13},{"destVertexId":14},{"destVertexId":15},{"destVertexId":16},{"destVertexId":17}}}},
+{"id":17,"value":17,"edges":{{ {"destVertexId":11},{"destVertexId":12},{"destVertexId":13},{"destVertexId":14},{"destVertexId":15},{"destVertexId":16},{"destVertexId":17},{"destVertexId":18}}}},
+{"id":18,"value":18,"edges":{{ {"destVertexId":11},{"destVertexId":12},{"destVertexId":13},{"destVertexId":14},{"destVertexId":15},{"destVertexId":16},{"destVertexId":17},{"destVertexId":18},{"destVertexId":19}}}},
+{"id":19,"value":19,"edges":{{ {"destVertexId":11},{"destVertexId":12},{"destVertexId":13},{"destVertexId":14},{"destVertexId":15},{"destVertexId":16},{"destVertexId":17},{"destVertexId":18},{"destVertexId":19}, {"destVertexId":10}}}}
+}}
+
+)
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/queries/graph/pregel-q02/pregel-q02.3.query.aql b/asterix-app/src/test/resources/runtimets/queries/graph/pregel-q02/pregel-q02.3.query.aql
new file mode 100644
index 0000000..77c5d7e
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/graph/pregel-q02/pregel-q02.3.query.aql
@@ -0,0 +1,9 @@
+use dataverse Pregelix;
+
+run pregel("examples/pregelix-example-0.2.14-SNAPSHOT-jar-with-dependencies.jar"
+ "edu.uci.ics.pregelix.example.ConnectedComponentsVertex"
+ "-ip 10.0.2.15 -port 3199")
+from dataset InputGraph
+to dataset ResultGraph;
+
+for $n in dataset ResultGraph return $n;
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/queries/graph/pregel-q03/pregel-q03.1.ddl.aql b/asterix-app/src/test/resources/runtimets/queries/graph/pregel-q03/pregel-q03.1.ddl.aql
new file mode 100644
index 0000000..9642ce3
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/graph/pregel-q03/pregel-q03.1.ddl.aql
@@ -0,0 +1,16 @@
+drop dataverse Pregelix if exists;
+create dataverse Pregelix
+use dataverse Pregelix
+
+create type EdgeType as open {
+ destVertexId: int64,
+ cost: float?
+}
+create type NodeType as open {
+ id: int64,
+ value: double?,
+ edges: {{EdgeType}}
+}
+
+create dataset InputGraph(NodeType) primary key id;
+create dataset ResultGraph(NodeType) primary key id;
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/queries/graph/pregel-q03/pregel-q03.2.update.aql b/asterix-app/src/test/resources/runtimets/queries/graph/pregel-q03/pregel-q03.2.update.aql
new file mode 100644
index 0000000..f328614
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/graph/pregel-q03/pregel-q03.2.update.aql
@@ -0,0 +1,16 @@
+use dataverse Pregelix;
+
+insert into dataset InputGraph (
+{{
+{"id":0, "edges":{{ {"destVertexId":1, "cost":1.0f}}}},
+{"id":1, "edges":{{ {"destVertexId":3, "cost":4.0f},{"destVertexId":2, "cost":3.0f}}}},
+{"id":2, "edges":{{ {"destVertexId":4, "cost":5.0f},{"destVertexId":5, "cost":23.0f}}}},
+{"id":3, "edges":{{ {"destVertexId":2, "cost":1.0f},{"destVertexId":8, "cost":13.0f}}}},
+{"id":4, "edges":{{ {"destVertexId":1, "cost":5.0f},{"destVertexId":2, "cost":8.0f},{"destVertexId":3, "cost":23.0f},{"destVertexId":4, "cost":12.0f}}}},
+{"id":5, "edges":{{ {"destVertexId":6, "cost":12.0f},{"destVertexId":7, "cost":17.0f}}}},
+{"id":6, "edges":{{ {"destVertexId":1, "cost":12.0f},{"destVertexId":2, "cost":1.0f}}}},
+{"id":7, "edges":{{ {"destVertexId":9, "cost":100.0f}}}},
+{"id":8, "edges":{{ {"destVertexId":4, "cost":11.0f}}}},
+{"id":9, "edges":{{ {"destVertexId":1, "cost":16.0f},{"destVertexId":2, "cost":9.0f}}}}
+}}
+)
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/queries/graph/pregel-q03/pregel-q03.3.query.aql b/asterix-app/src/test/resources/runtimets/queries/graph/pregel-q03/pregel-q03.3.query.aql
new file mode 100644
index 0000000..0bb4229
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/graph/pregel-q03/pregel-q03.3.query.aql
@@ -0,0 +1,9 @@
+use dataverse Pregelix;
+
+run pregel("examples/pregelix-example-0.2.14-SNAPSHOT-jar-with-dependencies.jar"
+ "edu.uci.ics.pregelix.example.ShortestPathsVertex"
+ "-ip 10.0.2.15 -port 3199 -source-vertex 0")
+from dataset InputGraph
+to dataset ResultGraph;
+
+for $n in dataset ResultGraph return $n;
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/results/graph/pregel-q01/pregel-q01.1.adm b/asterix-app/src/test/resources/runtimets/results/graph/pregel-q01/pregel-q01.1.adm
new file mode 100644
index 0000000..4b6f605
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/results/graph/pregel-q01/pregel-q01.1.adm
@@ -0,0 +1,13 @@
+[ { "tweetid": 1i64, "rank-value": 0.07349647505081802d, "populated-by": {{ { "tweetid": 2i64, "value": 0.0f }, { "tweetid": 3i64, "value": 0.0f } }} }
+, { "tweetid": 2i64, "rank-value": 0.10497306443795257d, "populated-by": {{ { "tweetid": 7i64, "value": 0.0f }, { "tweetid": 1i64, "value": 0.0f } }} }
+, { "tweetid": 3i64, "rank-value": 0.09458886502271134d, "populated-by": {{ { "tweetid": 8i64, "value": 0.0f }, { "tweetid": 6i64, "value": 0.0f } }} }
+, { "tweetid": 4i64, "rank-value": 0.04783722202788025d, "populated-by": {{ { "tweetid": 1i64, "value": 0.0f }, { "tweetid": 3i64, "value": 0.0f } }} }
+, { "tweetid": 5i64, "rank-value": 0.051587222027880256d, "populated-by": {{ { "tweetid": 8i64, "value": 0.0f }, { "tweetid": 6i64, "value": 0.0f } }} }
+, { "tweetid": 6i64, "rank-value": 0.0822549135142923d, "populated-by": {{ { "tweetid": 4i64, "value": 0.0f }, { "tweetid": 5i64, "value": 0.0f } }} }
+, { "tweetid": 7i64, "rank-value": 0.14484555969829038d, "populated-by": {{ { "tweetid": 2i64, "value": 0.0f }, { "tweetid": 7i64, "value": 0.0f } }} }
+, { "tweetid": 8i64, "rank-value": 0.0710049135142923d, "populated-by": {{ { "tweetid": 3i64, "value": 0.0f }, { "tweetid": 7i64, "value": 0.0f } }} }
+, { "tweetid": 9i64, "rank-value": 0.008823529411764706d, "populated-by": {{ { "tweetid": 6i64, "value": 0.0f }, { "tweetid": 2i64, "value": 0.0f } }} }
+, { "tweetid": 10i64, "rank-value": 0.008823529411764706d, "populated-by": {{ { "tweetid": 4i64, "value": 0.0f }, { "tweetid": 5i64, "value": 0.0f } }} }
+, { "tweetid": 11i64, "rank-value": 0.008823529411764706d, "populated-by": {{ { "tweetid": 6i64, "value": 0.0f }, { "tweetid": 3i64, "value": 0.0f } }} }
+, { "tweetid": 12i64, "rank-value": 0.008823529411764706d, "populated-by": {{ { "tweetid": 6i64, "value": 0.0f }, { "tweetid": 5i64, "value": 0.0f } }} }
+]
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/results/graph/pregel-q02/pregel-q02.1.adm b/asterix-app/src/test/resources/runtimets/results/graph/pregel-q02/pregel-q02.1.adm
new file mode 100644
index 0000000..82091fb
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/results/graph/pregel-q02/pregel-q02.1.adm
@@ -0,0 +1,21 @@
+[ { "id": 0i64, "value": 0i64, "edges": {{ { "destVertexId": 1i64, "value": 0.0f } }} }
+, { "id": 1i64, "value": 0i64, "edges": {{ { "destVertexId": 1i64, "value": 0.0f }, { "destVertexId": 2i64, "value": 0.0f } }} }
+, { "id": 2i64, "value": 0i64, "edges": {{ { "destVertexId": 1i64, "value": 0.0f }, { "destVertexId": 2i64, "value": 0.0f }, { "destVertexId": 3i64, "value": 0.0f } }} }
+, { "id": 3i64, "value": 0i64, "edges": {{ { "destVertexId": 1i64, "value": 0.0f }, { "destVertexId": 2i64, "value": 0.0f }, { "destVertexId": 3i64, "value": 0.0f }, { "destVertexId": 4i64, "value": 0.0f } }} }
+, { "id": 4i64, "value": 0i64, "edges": {{ { "destVertexId": 1i64, "value": 0.0f }, { "destVertexId": 2i64, "value": 0.0f }, { "destVertexId": 3i64, "value": 0.0f }, { "destVertexId": 4i64, "value": 0.0f }, { "destVertexId": 5i64, "value": 0.0f } }} }
+, { "id": 5i64, "value": 0i64, "edges": {{ { "destVertexId": 1i64, "value": 0.0f }, { "destVertexId": 2i64, "value": 0.0f }, { "destVertexId": 3i64, "value": 0.0f }, { "destVertexId": 4i64, "value": 0.0f }, { "destVertexId": 5i64, "value": 0.0f }, { "destVertexId": 6i64, "value": 0.0f } }} }
+, { "id": 6i64, "value": 0i64, "edges": {{ { "destVertexId": 1i64, "value": 0.0f }, { "destVertexId": 2i64, "value": 0.0f }, { "destVertexId": 3i64, "value": 0.0f }, { "destVertexId": 4i64, "value": 0.0f }, { "destVertexId": 5i64, "value": 0.0f }, { "destVertexId": 6i64, "value": 0.0f }, { "destVertexId": 7i64, "value": 0.0f } }} }
+, { "id": 7i64, "value": 0i64, "edges": {{ { "destVertexId": 1i64, "value": 0.0f }, { "destVertexId": 2i64, "value": 0.0f }, { "destVertexId": 3i64, "value": 0.0f }, { "destVertexId": 4i64, "value": 0.0f }, { "destVertexId": 5i64, "value": 0.0f }, { "destVertexId": 6i64, "value": 0.0f }, { "destVertexId": 7i64, "value": 0.0f }, { "destVertexId": 8i64, "value": 0.0f } }} }
+, { "id": 8i64, "value": 0i64, "edges": {{ { "destVertexId": 1i64, "value": 0.0f }, { "destVertexId": 2i64, "value": 0.0f }, { "destVertexId": 3i64, "value": 0.0f }, { "destVertexId": 4i64, "value": 0.0f }, { "destVertexId": 5i64, "value": 0.0f }, { "destVertexId": 6i64, "value": 0.0f }, { "destVertexId": 7i64, "value": 0.0f }, { "destVertexId": 8i64, "value": 0.0f }, { "destVertexId": 9i64, "value": 0.0f } }} }
+, { "id": 9i64, "value": 0i64, "edges": {{ { "destVertexId": 1i64, "value": 0.0f }, { "destVertexId": 2i64, "value": 0.0f }, { "destVertexId": 3i64, "value": 0.0f }, { "destVertexId": 4i64, "value": 0.0f }, { "destVertexId": 5i64, "value": 0.0f }, { "destVertexId": 6i64, "value": 0.0f }, { "destVertexId": 7i64, "value": 0.0f }, { "destVertexId": 8i64, "value": 0.0f }, { "destVertexId": 9i64, "value": 0.0f }, { "destVertexId": 0i64, "value": 0.0f } }} }
+, { "id": 10i64, "value": 10i64, "edges": {{ { "destVertexId": 11i64, "value": 0.0f } }} }
+, { "id": 11i64, "value": 10i64, "edges": {{ { "destVertexId": 11i64, "value": 0.0f }, { "destVertexId": 12i64, "value": 0.0f } }} }
+, { "id": 12i64, "value": 10i64, "edges": {{ { "destVertexId": 11i64, "value": 0.0f }, { "destVertexId": 12i64, "value": 0.0f }, { "destVertexId": 13i64, "value": 0.0f } }} }
+, { "id": 13i64, "value": 10i64, "edges": {{ { "destVertexId": 11i64, "value": 0.0f }, { "destVertexId": 12i64, "value": 0.0f }, { "destVertexId": 13i64, "value": 0.0f }, { "destVertexId": 14i64, "value": 0.0f } }} }
+, { "id": 14i64, "value": 10i64, "edges": {{ { "destVertexId": 11i64, "value": 0.0f }, { "destVertexId": 12i64, "value": 0.0f }, { "destVertexId": 13i64, "value": 0.0f }, { "destVertexId": 14i64, "value": 0.0f }, { "destVertexId": 15i64, "value": 0.0f } }} }
+, { "id": 15i64, "value": 10i64, "edges": {{ { "destVertexId": 11i64, "value": 0.0f }, { "destVertexId": 12i64, "value": 0.0f }, { "destVertexId": 13i64, "value": 0.0f }, { "destVertexId": 14i64, "value": 0.0f }, { "destVertexId": 15i64, "value": 0.0f }, { "destVertexId": 16i64, "value": 0.0f } }} }
+, { "id": 16i64, "value": 10i64, "edges": {{ { "destVertexId": 11i64, "value": 0.0f }, { "destVertexId": 12i64, "value": 0.0f }, { "destVertexId": 13i64, "value": 0.0f }, { "destVertexId": 14i64, "value": 0.0f }, { "destVertexId": 15i64, "value": 0.0f }, { "destVertexId": 16i64, "value": 0.0f }, { "destVertexId": 17i64, "value": 0.0f } }} }
+, { "id": 17i64, "value": 10i64, "edges": {{ { "destVertexId": 11i64, "value": 0.0f }, { "destVertexId": 12i64, "value": 0.0f }, { "destVertexId": 13i64, "value": 0.0f }, { "destVertexId": 14i64, "value": 0.0f }, { "destVertexId": 15i64, "value": 0.0f }, { "destVertexId": 16i64, "value": 0.0f }, { "destVertexId": 17i64, "value": 0.0f }, { "destVertexId": 18i64, "value": 0.0f } }} }
+, { "id": 18i64, "value": 10i64, "edges": {{ { "destVertexId": 11i64, "value": 0.0f }, { "destVertexId": 12i64, "value": 0.0f }, { "destVertexId": 13i64, "value": 0.0f }, { "destVertexId": 14i64, "value": 0.0f }, { "destVertexId": 15i64, "value": 0.0f }, { "destVertexId": 16i64, "value": 0.0f }, { "destVertexId": 17i64, "value": 0.0f }, { "destVertexId": 18i64, "value": 0.0f }, { "destVertexId": 19i64, "value": 0.0f } }} }
+, { "id": 19i64, "value": 10i64, "edges": {{ { "destVertexId": 11i64, "value": 0.0f }, { "destVertexId": 12i64, "value": 0.0f }, { "destVertexId": 13i64, "value": 0.0f }, { "destVertexId": 14i64, "value": 0.0f }, { "destVertexId": 15i64, "value": 0.0f }, { "destVertexId": 16i64, "value": 0.0f }, { "destVertexId": 17i64, "value": 0.0f }, { "destVertexId": 18i64, "value": 0.0f }, { "destVertexId": 19i64, "value": 0.0f }, { "destVertexId": 10i64, "value": 0.0f } }} }
+]
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/results/graph/pregel-q03/pregel-q03.1.adm b/asterix-app/src/test/resources/runtimets/results/graph/pregel-q03/pregel-q03.1.adm
new file mode 100644
index 0000000..38eeba3
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/results/graph/pregel-q03/pregel-q03.1.adm
@@ -0,0 +1,11 @@
+[ { "id": 0i64, "value": 0.0d, "edges": {{ { "destVertexId": 1i64, "cost": 1.0f } }} }
+, { "id": 1i64, "value": 1.0d, "edges": {{ { "destVertexId": 3i64, "cost": 4.0f }, { "destVertexId": 2i64, "cost": 3.0f } }} }
+, { "id": 2i64, "value": 4.0d, "edges": {{ { "destVertexId": 4i64, "cost": 5.0f }, { "destVertexId": 5i64, "cost": 23.0f } }} }
+, { "id": 3i64, "value": 5.0d, "edges": {{ { "destVertexId": 2i64, "cost": 1.0f }, { "destVertexId": 8i64, "cost": 13.0f } }} }
+, { "id": 4i64, "value": 9.0d, "edges": {{ { "destVertexId": 1i64, "cost": 5.0f }, { "destVertexId": 2i64, "cost": 8.0f }, { "destVertexId": 3i64, "cost": 23.0f }, { "destVertexId": 4i64, "cost": 12.0f } }} }
+, { "id": 5i64, "value": 27.0d, "edges": {{ { "destVertexId": 6i64, "cost": 12.0f }, { "destVertexId": 7i64, "cost": 17.0f } }} }
+, { "id": 6i64, "value": 39.0d, "edges": {{ { "destVertexId": 1i64, "cost": 12.0f }, { "destVertexId": 2i64, "cost": 1.0f } }} }
+, { "id": 7i64, "value": 44.0d, "edges": {{ { "destVertexId": 9i64, "cost": 100.0f } }} }
+, { "id": 8i64, "value": 18.0d, "edges": {{ { "destVertexId": 4i64, "cost": 11.0f } }} }
+, { "id": 9i64, "value": 144.0d, "edges": {{ { "destVertexId": 1i64, "cost": 16.0f }, { "destVertexId": 2i64, "cost": 9.0f } }} }
+]
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/testsuite.xml b/asterix-app/src/test/resources/runtimets/testsuite.xml
index c3b097a..baf330c 100644
--- a/asterix-app/src/test/resources/runtimets/testsuite.xml
+++ b/asterix-app/src/test/resources/runtimets/testsuite.xml
@@ -2346,6 +2346,21 @@
</compilation-unit>
</test-case>
</test-group>
+ <!-- <test-group name="graph">
+ <test-case FilePath="graph">
+ <compilation-unit name="pregel-q01">
+ <output-dir compare="Text">pregel-q01</output-dir>
+ </compilation-unit>
+ <test-case FilePath="graph">
+ <compilation-unit name="pregel-q02">
+ <output-dir compare="Text">pregel-q02</output-dir>
+ </compilation-unit>
+ <test-case FilePath="graph">
+ <compilation-unit name="pregel-q03">
+ <output-dir compare="Text">pregel-q03</output-dir>
+ </compilation-unit>
+ </test-case>
+ </test-group> -->
<test-group name="index-join">
<test-case FilePath="index-join">
<compilation-unit name="btree-primary-equi-join">
diff --git a/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/base/Statement.java b/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/base/Statement.java
index 6bdca80..abac9df 100644
--- a/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/base/Statement.java
+++ b/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/base/Statement.java
@@ -44,7 +44,8 @@
CREATE_FUNCTION,
FUNCTION_DROP,
COMPACT,
- EXTERNAL_DATASET_REFRESH
+ EXTERNAL_DATASET_REFRESH,
+ RUN
}
public abstract Kind getKind();
diff --git a/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/RunStatement.java b/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/RunStatement.java
new file mode 100644
index 0000000..200a620
--- /dev/null
+++ b/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/RunStatement.java
@@ -0,0 +1,83 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.aql.expression;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import edu.uci.ics.asterix.aql.base.Statement;
+import edu.uci.ics.asterix.aql.expression.visitor.IAqlExpressionVisitor;
+import edu.uci.ics.asterix.aql.expression.visitor.IAqlVisitorWithVoidReturn;
+import edu.uci.ics.asterix.common.config.DatasetConfig.IndexType;
+import edu.uci.ics.asterix.common.exceptions.AsterixException;
+
+public class RunStatement implements Statement {
+
+ private String system;
+ private List<String> parameters;
+ private final Identifier dataverseNameFrom;
+ private final Identifier datasetNameFrom;
+ private final Identifier dataverseNameTo;
+ private final Identifier datasetNameTo;
+
+ public RunStatement(String system, List<String> parameters, Identifier dataverseNameFrom, Identifier datasetNameFrom, Identifier dataverseNameTo, Identifier datasetNameTo) {
+ this.system = system;
+ this.parameters = parameters;
+ this.datasetNameFrom = datasetNameFrom;
+ this.dataverseNameFrom = dataverseNameFrom;
+ this.datasetNameTo = datasetNameTo;
+ this.dataverseNameTo = dataverseNameTo;
+ }
+
+ public String getSystem() {
+ return system;
+ }
+
+ public List<String> getParameters() {
+ return parameters;
+ }
+
+ public Identifier getDataverseNameFrom() {
+ return dataverseNameFrom;
+ }
+
+ public Identifier getDatasetNameFrom() {
+ return datasetNameFrom;
+ }
+
+ public Identifier getDataverseNameTo() {
+ return dataverseNameTo;
+ }
+
+ public Identifier getDatasetNameTo() {
+ return datasetNameTo;
+ }
+
+ @Override
+ public <R, T> R accept(IAqlExpressionVisitor<R, T> visitor, T arg) throws AsterixException {
+ return null;
+ }
+
+ @Override
+ public <T> void accept(IAqlVisitorWithVoidReturn<T> visitor, T arg) throws AsterixException {
+ visitor.visit(this, arg);
+ }
+
+ @Override
+ public Kind getKind() {
+ return Kind.RUN;
+ }
+
+}
diff --git a/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/visitor/AQLPrintVisitor.java b/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/visitor/AQLPrintVisitor.java
index ad8c791..21a2388 100644
--- a/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/visitor/AQLPrintVisitor.java
+++ b/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/visitor/AQLPrintVisitor.java
@@ -61,6 +61,7 @@
import edu.uci.ics.asterix.aql.expression.OrderbyClause;
import edu.uci.ics.asterix.aql.expression.OrderbyClause.OrderModifier;
import edu.uci.ics.asterix.aql.expression.OrderedListTypeDefinition;
+import edu.uci.ics.asterix.aql.expression.RunStatement;
import edu.uci.ics.asterix.aql.expression.QuantifiedExpression;
import edu.uci.ics.asterix.aql.expression.QuantifiedPair;
import edu.uci.ics.asterix.aql.expression.Query;
@@ -319,7 +320,7 @@
public void visit(FieldAccessor fa, Integer step) throws AsterixException {
out.println(skip(step) + "FieldAccessor [");
fa.getExpr().accept(this, step + 1);
- out.println(skip(step + 1) + "Field=" + ((FieldAccessor) fa).getIdent().getValue());
+ out.println(skip(step + 1) + "Field=" + fa.getIdent().getValue());
out.println(skip(step) + "]");
}
@@ -557,4 +558,10 @@
}
+ @Override
+ public void visit(RunStatement stmt, Integer arg) throws AsterixException {
+ // TODO Auto-generated method stub
+
+ }
+
}
diff --git a/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/visitor/IAqlVisitorWithVoidReturn.java b/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/visitor/IAqlVisitorWithVoidReturn.java
index ce0e121..f04c3a0 100644
--- a/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/visitor/IAqlVisitorWithVoidReturn.java
+++ b/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/visitor/IAqlVisitorWithVoidReturn.java
@@ -15,8 +15,8 @@
package edu.uci.ics.asterix.aql.expression.visitor;
import edu.uci.ics.asterix.aql.expression.CallExpr;
-import edu.uci.ics.asterix.aql.expression.ConnectFeedStatement;
import edu.uci.ics.asterix.aql.expression.CompactStatement;
+import edu.uci.ics.asterix.aql.expression.ConnectFeedStatement;
import edu.uci.ics.asterix.aql.expression.CreateDataverseStatement;
import edu.uci.ics.asterix.aql.expression.CreateFeedStatement;
import edu.uci.ics.asterix.aql.expression.CreateFunctionStatement;
@@ -27,9 +27,9 @@
import edu.uci.ics.asterix.aql.expression.DeleteStatement;
import edu.uci.ics.asterix.aql.expression.DisconnectFeedStatement;
import edu.uci.ics.asterix.aql.expression.DistinctClause;
-import edu.uci.ics.asterix.aql.expression.FeedDropStatement;
import edu.uci.ics.asterix.aql.expression.DropStatement;
import edu.uci.ics.asterix.aql.expression.FLWOGRExpression;
+import edu.uci.ics.asterix.aql.expression.FeedDropStatement;
import edu.uci.ics.asterix.aql.expression.FieldAccessor;
import edu.uci.ics.asterix.aql.expression.ForClause;
import edu.uci.ics.asterix.aql.expression.FunctionDecl;
@@ -49,6 +49,7 @@
import edu.uci.ics.asterix.aql.expression.OperatorExpr;
import edu.uci.ics.asterix.aql.expression.OrderbyClause;
import edu.uci.ics.asterix.aql.expression.OrderedListTypeDefinition;
+import edu.uci.ics.asterix.aql.expression.RunStatement;
import edu.uci.ics.asterix.aql.expression.QuantifiedExpression;
import edu.uci.ics.asterix.aql.expression.Query;
import edu.uci.ics.asterix.aql.expression.RecordConstructor;
@@ -170,4 +171,6 @@
void visit(FunctionDropStatement fds, T arg) throws AsterixException;
void visit(CompactStatement fds, T arg) throws AsterixException;
+
+ void visit(RunStatement stmt, T arg) throws AsterixException;
}
diff --git a/asterix-aql/src/main/javacc/AQL.jj b/asterix-aql/src/main/javacc/AQL.jj
index 826a27b..2cd38c6 100644
--- a/asterix-aql/src/main/javacc/AQL.jj
+++ b/asterix-aql/src/main/javacc/AQL.jj
@@ -213,6 +213,7 @@
| stmt = CompactStatement()
| stmt = Query()
| stmt = RefreshExternalDatasetStatement()
+ | stmt = RunStatement()
)
{
return stmt;
@@ -398,6 +399,27 @@
}
}
+RunStatement RunStatement() throws ParseException:
+{
+ String system = null;
+ String tmp;
+ ArrayList<String> parameters = new ArrayList<String>();
+ Pair<Identifier,Identifier> nameComponentsFrom = null;
+ Pair<Identifier,Identifier> nameComponentsTo = null;
+}
+{
+ "run" system = Identifier()<LEFTPAREN> ( tmp = Identifier() [<COMMA>]
+ {
+ parameters.add(tmp);
+ }
+ )*<RIGHTPAREN>
+ <FROM> <DATASET> nameComponentsFrom = QualifiedName()
+ "to" <DATASET> nameComponentsTo = QualifiedName()
+ {
+ return new RunStatement(system, parameters, nameComponentsFrom.first, nameComponentsFrom.second, nameComponentsTo.first, nameComponentsTo.second);
+ }
+}
+
CreateIndexStatement IndexSpecification() throws ParseException:
{
CreateIndexStatement cis = new CreateIndexStatement();
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/DatasetLifecycleManager.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/DatasetLifecycleManager.java
index 379a7eb..14db1d2 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/DatasetLifecycleManager.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/DatasetLifecycleManager.java
@@ -56,7 +56,7 @@
private final long capacity;
private long used;
private final ILogManager logManager;
- private LogRecord logRecord;
+ private final LogRecord logRecord;
public DatasetLifecycleManager(AsterixStorageProperties storageProperties,
ILocalResourceRepository resourceRepository, int firstAvilableUserDatasetID, ILogManager logManager) {
@@ -231,7 +231,7 @@
private void flushAndWaitForIO(DatasetInfo dsInfo, IndexInfo iInfo) throws HyracksDataException {
if (iInfo.isOpen) {
- ILSMIndexAccessor accessor = (ILSMIndexAccessor) iInfo.index.createAccessor(NoOpOperationCallback.INSTANCE,
+ ILSMIndexAccessor accessor = iInfo.index.createAccessor(NoOpOperationCallback.INSTANCE,
NoOpOperationCallback.INSTANCE);
accessor.scheduleFlush(iInfo.index.getIOOperationCallback());
}
@@ -336,7 +336,7 @@
}
private static class IndexInfo extends Info {
- private ILSMIndex index;
+ private final ILSMIndex index;
public IndexInfo(ILSMIndex index) {
this.index = index;
@@ -357,11 +357,13 @@
this.isExternal = isExternal;
}
+ @Override
public void touch() {
super.touch();
lastAccess = System.currentTimeMillis();
}
+ @Override
public void untouch() {
super.untouch();
lastAccess = System.currentTimeMillis();
@@ -409,6 +411,7 @@
}
+ @Override
public String toString() {
return "DatasetID: " + datasetID + ", isOpen: " + isOpen + ", refCount: " + referenceCount
+ ", lastAccess: " + lastAccess + "}";
@@ -426,6 +429,13 @@
}
}
+ public synchronized void flushDataset(int datasetId, boolean asyncFlush) throws HyracksDataException {
+ DatasetInfo datasetInfo = datasetInfos.get(datasetId);
+ if (datasetInfo != null) {
+ flushDatasetOpenIndexes(datasetInfo, asyncFlush);
+ }
+ }
+
public synchronized void scheduleAsyncFlushForLaggingDatasets(long targetLSN) throws HyracksDataException {
List<DatasetInfo> laggingDatasets = new ArrayList<DatasetInfo>();
@@ -433,7 +443,7 @@
//find dataset with min lsn < targetLSN
for (DatasetInfo dsInfo : datasetInfos.values()) {
for (IndexInfo iInfo : dsInfo.indexes.values()) {
- AbstractLSMIOOperationCallback ioCallback = (AbstractLSMIOOperationCallback) ((ILSMIndex) iInfo.index)
+ AbstractLSMIOOperationCallback ioCallback = (AbstractLSMIOOperationCallback) iInfo.index
.getIOOperationCallback();
if (!((AbstractLSMIndex) iInfo.index).isCurrentMutableComponentEmpty() || ioCallback.hasPendingFlush()) {
firstLSN = ioCallback.getFirstLSN();
@@ -479,8 +489,8 @@
if (asyncFlush) {
for (IndexInfo iInfo : dsInfo.indexes.values()) {
- ILSMIndexAccessor accessor = (ILSMIndexAccessor) iInfo.index.createAccessor(
- NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
+ ILSMIndexAccessor accessor = iInfo.index.createAccessor(NoOpOperationCallback.INSTANCE,
+ NoOpOperationCallback.INSTANCE);
accessor.scheduleFlush(iInfo.index.getIOOperationCallback());
}
} else {
@@ -541,6 +551,7 @@
datasetInfos.clear();
}
+ @Override
public void dumpState(OutputStream outputStream) throws IOException {
StringBuilder sb = new StringBuilder();
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/PrimaryIndexOperationTracker.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/PrimaryIndexOperationTracker.java
index 5f15bf3..744c208 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/PrimaryIndexOperationTracker.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/PrimaryIndexOperationTracker.java
@@ -91,7 +91,6 @@
}
if (needsFlush) {
-
LogRecord logRecord = new LogRecord();
logRecord.formFlushLogRecord(datasetID, this);
try {
@@ -107,8 +106,8 @@
synchronized (this) {
for (ILSMIndex lsmIndex : datasetLifecycleManager.getDatasetIndexes(datasetID)) {
//get resource
- ILSMIndexAccessor accessor = (ILSMIndexAccessor) lsmIndex.createAccessor(
- NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
+ ILSMIndexAccessor accessor = lsmIndex.createAccessor(NoOpOperationCallback.INSTANCE,
+ NoOpOperationCallback.INSTANCE);
//update resource lsn
AbstractLSMIOOperationCallback ioOpCallback = (AbstractLSMIOOperationCallback) lsmIndex
@@ -121,6 +120,7 @@
}
}
+ @Override
public void exclusiveJobCommitted() throws HyracksDataException {
numActiveOperations.set(0);
flushIfFull();
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/utils/MetadataLockManager.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/utils/MetadataLockManager.java
index 02e40b2..436fe4d 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/utils/MetadataLockManager.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/utils/MetadataLockManager.java
@@ -10,13 +10,13 @@
public class MetadataLockManager {
public static MetadataLockManager INSTANCE = new MetadataLockManager();
- private ConcurrentHashMap<String, ReentrantReadWriteLock> dataversesLocks;
- private ConcurrentHashMap<String, DatasetLock> datasetsLocks;
- private ConcurrentHashMap<String, ReentrantReadWriteLock> functionsLocks;
- private ConcurrentHashMap<String, ReentrantReadWriteLock> nodeGroupsLocks;
- private ConcurrentHashMap<String, ReentrantReadWriteLock> feedsLocks;
- private ConcurrentHashMap<String, ReentrantReadWriteLock> compactionPolicyLocks;
- private ConcurrentHashMap<String, ReentrantReadWriteLock> dataTypeLocks;
+ private final ConcurrentHashMap<String, ReentrantReadWriteLock> dataversesLocks;
+ private final ConcurrentHashMap<String, DatasetLock> datasetsLocks;
+ private final ConcurrentHashMap<String, ReentrantReadWriteLock> functionsLocks;
+ private final ConcurrentHashMap<String, ReentrantReadWriteLock> nodeGroupsLocks;
+ private final ConcurrentHashMap<String, ReentrantReadWriteLock> feedsLocks;
+ private final ConcurrentHashMap<String, ReentrantReadWriteLock> compactionPolicyLocks;
+ private final ConcurrentHashMap<String, ReentrantReadWriteLock> dataTypeLocks;
private MetadataLockManager() {
dataversesLocks = new ConcurrentHashMap<String, ReentrantReadWriteLock>();
@@ -509,4 +509,25 @@
releaseExternalDatasetRefreshLock(datasetFullyQualifiedName);
releaseDataverseReadLock(dataverseName);
}
+
+ public void pregelixBegin(String dataverseName, String datasetFullyQualifiedNameFrom,
+ String datasetFullyQualifiedNameTo) {
+ acquireDataverseReadLock(dataverseName);
+
+ if (datasetFullyQualifiedNameFrom.compareTo(datasetFullyQualifiedNameTo) < 0) {
+ acquireDatasetReadLock(datasetFullyQualifiedNameFrom);
+ acquireDatasetWriteLock(datasetFullyQualifiedNameTo);
+ } else {
+ acquireDatasetWriteLock(datasetFullyQualifiedNameTo);
+ acquireDatasetReadLock(datasetFullyQualifiedNameFrom);
+ }
+ }
+
+ public void pregelixEnd(String dataverseName, String datasetFullyQualifiedNameFrom,
+ String datasetFullyQualifiedNameTo) {
+
+ releaseDatasetReadLock(datasetFullyQualifiedNameFrom);
+ releaseDatasetWriteLock(datasetFullyQualifiedNameTo);
+ releaseDataverseReadLock(dataverseName);
+ }
}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/operators/std/FlushDatasetOperatorDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/operators/std/FlushDatasetOperatorDescriptor.java
new file mode 100644
index 0000000..6009ff1
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/operators/std/FlushDatasetOperatorDescriptor.java
@@ -0,0 +1,89 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.runtime.operators.std;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.asterix.common.api.IAsterixAppRuntimeContext;
+import edu.uci.ics.asterix.common.context.DatasetLifecycleManager;
+import edu.uci.ics.asterix.common.exceptions.ACIDException;
+import edu.uci.ics.asterix.common.transactions.DatasetId;
+import edu.uci.ics.asterix.common.transactions.ILockManager;
+import edu.uci.ics.asterix.common.transactions.ITransactionContext;
+import edu.uci.ics.asterix.common.transactions.ITransactionManager;
+import edu.uci.ics.asterix.common.transactions.JobId;
+import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionManagementConstants.LockManagerConstants.LockMode;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.IOperatorDescriptorRegistry;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
+
+public class FlushDatasetOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
+ private static final long serialVersionUID = 1L;
+ private final JobId jobId;
+ private final DatasetId datasetId;
+
+ public FlushDatasetOperatorDescriptor(IOperatorDescriptorRegistry spec, JobId jobId, int datasetId) {
+ super(spec, 1, 0);
+ this.jobId = jobId;
+ this.datasetId = new DatasetId(datasetId);
+ }
+
+ @Override
+ public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
+ IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
+ return new AbstractUnaryInputSinkOperatorNodePushable() {
+
+ @Override
+ public void open() throws HyracksDataException {
+
+ }
+
+ @Override
+ public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+
+ }
+
+ @Override
+ public void fail() throws HyracksDataException {
+ this.close();
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+ try {
+ IAsterixAppRuntimeContext runtimeCtx = (IAsterixAppRuntimeContext) ctx.getJobletContext()
+ .getApplicationContext().getApplicationObject();
+ DatasetLifecycleManager datasetLifeCycleManager = (DatasetLifecycleManager) runtimeCtx
+ .getIndexLifecycleManager();
+ ILockManager lockManager = runtimeCtx.getTransactionSubsystem().getLockManager();
+ ITransactionManager txnManager = runtimeCtx.getTransactionSubsystem().getTransactionManager();
+ // get the local transaction
+ ITransactionContext txnCtx = txnManager.getTransactionContext(jobId, false);
+ // lock the dataset granule
+ lockManager.lock(datasetId, -1, LockMode.S, txnCtx);
+ // flush the dataset synchronously
+ datasetLifeCycleManager.flushDataset(datasetId.getId(), false);
+ } catch (ACIDException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+
+ };
+ }
+}