This change integrates a run statement into AQL for executing Pregelix jobs.
It also adds a new flush operator (FlushDatasetOperatorDescriptor) that flushes
all in-memory components of a dataset to disk before the Pregelix job reads it.
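
Example usage (taken from the new pregel-q01 test query; the jar path, vertex
class, IP, port, and dataset names are the test fixture's values, not defaults):

    use dataverse Pregelix;

    run pregel("examples/pregelix-example-0.2.14-SNAPSHOT-jar-with-dependencies.jar"
               "edu.uci.ics.pregelix.example.PageRankVertex"
               "-ip 10.0.2.15 -port 3199 -vnum 17")
    from dataset MyInputGraph
    to dataset MyOutputGraph;
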
Change-Id: I1f97cfdc79943abf035a7342bb777d59af6518e9
Reviewed-on: http://fulliautomatix.ics.uci.edu:8443/193
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Young-Seok Kim <kisskys@gmail.com>
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/QueryAPIServlet.java b/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/QueryAPIServlet.java
index 23553d1..e4dd711 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/QueryAPIServlet.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/QueryAPIServlet.java
@@ -30,7 +30,7 @@
}
protected List<Statement.Kind> getAllowedStatements() {
- Kind[] statementsArray = { Kind.DATAVERSE_DECL, Kind.FUNCTION_DECL, Kind.QUERY, Kind.SET, Kind.WRITE };
+ Kind[] statementsArray = { Kind.DATAVERSE_DECL, Kind.FUNCTION_DECL, Kind.QUERY, Kind.SET, Kind.WRITE, Kind.RUN };
return Arrays.asList(statementsArray);
}
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java b/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
index 25dc341..5fac0f6 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
@@ -14,7 +14,9 @@
*/
package edu.uci.ics.asterix.aql.translator;
+import java.io.BufferedReader;
import java.io.File;
+import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.rmi.RemoteException;
import java.util.ArrayList;
@@ -54,6 +56,7 @@
import edu.uci.ics.asterix.aql.expression.FeedDropStatement;
import edu.uci.ics.asterix.aql.expression.FunctionDecl;
import edu.uci.ics.asterix.aql.expression.FunctionDropStatement;
+import edu.uci.ics.asterix.aql.expression.IDatasetDetailsDecl;
import edu.uci.ics.asterix.aql.expression.Identifier;
import edu.uci.ics.asterix.aql.expression.IndexDropStatement;
import edu.uci.ics.asterix.aql.expression.InsertStatement;
@@ -61,13 +64,17 @@
import edu.uci.ics.asterix.aql.expression.LoadStatement;
import edu.uci.ics.asterix.aql.expression.NodeGroupDropStatement;
import edu.uci.ics.asterix.aql.expression.NodegroupDecl;
+import edu.uci.ics.asterix.aql.expression.RunStatement;
import edu.uci.ics.asterix.aql.expression.Query;
import edu.uci.ics.asterix.aql.expression.RefreshExternalDatasetStatement;
import edu.uci.ics.asterix.aql.expression.SetStatement;
import edu.uci.ics.asterix.aql.expression.TypeDecl;
import edu.uci.ics.asterix.aql.expression.TypeDropStatement;
+import edu.uci.ics.asterix.aql.expression.VarIdentifier;
+import edu.uci.ics.asterix.aql.expression.VariableExpr;
import edu.uci.ics.asterix.aql.expression.WriteStatement;
import edu.uci.ics.asterix.aql.util.FunctionUtils;
+import edu.uci.ics.asterix.common.config.AsterixCompilerProperties;
import edu.uci.ics.asterix.common.config.DatasetConfig.DatasetType;
import edu.uci.ics.asterix.common.config.DatasetConfig.ExternalDatasetTransactionState;
import edu.uci.ics.asterix.common.config.DatasetConfig.ExternalFilePendingOp;
@@ -115,7 +122,10 @@
import edu.uci.ics.asterix.om.util.AsterixAppContextInfo;
import edu.uci.ics.asterix.result.ResultReader;
import edu.uci.ics.asterix.result.ResultUtils;
+import edu.uci.ics.asterix.runtime.job.listener.JobEventListenerFactory;
+import edu.uci.ics.asterix.runtime.operators.std.FlushDatasetOperatorDescriptor;
import edu.uci.ics.asterix.transaction.management.service.transaction.DatasetIdFactory;
+import edu.uci.ics.asterix.transaction.management.service.transaction.JobIdFactory;
import edu.uci.ics.asterix.translator.AbstractAqlTranslator;
import edu.uci.ics.asterix.translator.CompiledStatements.CompiledConnectFeedStatement;
import edu.uci.ics.asterix.translator.CompiledStatements.CompiledCreateIndexStatement;
@@ -127,20 +137,30 @@
import edu.uci.ics.asterix.translator.CompiledStatements.CompiledLoadFromFileStatement;
import edu.uci.ics.asterix.translator.CompiledStatements.ICompiledDmlStatement;
import edu.uci.ics.asterix.translator.TypeTranslator;
+import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraintHelper;
import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression.FunctionKind;
import edu.uci.ics.hyracks.algebricks.data.IAWriterFactory;
import edu.uci.ics.hyracks.algebricks.data.IResultSerializerFactoryProvider;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.meta.AlgebricksMetaOperatorDescriptor;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.std.EmptyTupleSourceRuntimeFactory;
import edu.uci.ics.hyracks.algebricks.runtime.serializer.ResultSerializerFactoryProvider;
import edu.uci.ics.hyracks.algebricks.runtime.writers.PrinterBasedWriterFactory;
import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.dataset.IHyracksDataset;
import edu.uci.ics.hyracks.api.dataset.ResultSetId;
import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.api.job.IJobletEventListenerFactory;
import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
+import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
/*
@@ -168,7 +188,7 @@
private final SessionConfig sessionConfig;
private final OutputFormat pdf;
private Dataverse activeDefaultDataverse;
- private List<FunctionDecl> declaredFunctions;
+ private final List<FunctionDecl> declaredFunctions;
public AqlTranslator(List<Statement> aqlStatements, PrintWriter out, SessionConfig pc, APIFramework.OutputFormat pdf)
throws MetadataException, AsterixException {
@@ -191,7 +211,7 @@
/**
* Compiles and submits for execution a list of AQL statements.
- *
+ *
* @param hcc
* A Hyracks client connection that is used to submit a jobspec to Hyracks.
* @param hdc
@@ -201,8 +221,8 @@
* @return A List<QueryResult> containing a QueryResult instance corresponding to each submitted query.
* @throws Exception
*/
- public void compileAndExecute(IHyracksClientConnection hcc, IHyracksDataset hdc,
- ResultDelivery resultDelivery) throws Exception {
+ public void compileAndExecute(IHyracksClientConnection hcc, IHyracksDataset hdc, ResultDelivery resultDelivery)
+ throws Exception {
int resultSetIdCounter = 0;
FileSplit outputFile = null;
IAWriterFactory writerFactory = PrinterBasedWriterFactory.INSTANCE;
@@ -334,6 +354,11 @@
outputFile = result.second;
break;
}
+
+ case RUN: {
+ handleRunStatement(metadataProvider, stmt, hcc);
+ break;
+ }
}
}
}
@@ -506,8 +531,7 @@
compactionPolicyProperties = GlobalConfig.DEFAULT_COMPACTION_POLICY_PROPERTIES;
}
} else {
- validateCompactionPolicy(compactionPolicy,
- compactionPolicyProperties, mdTxnCtx, false);
+ validateCompactionPolicy(compactionPolicy, compactionPolicyProperties, mdTxnCtx, false);
}
if (filterField != null) {
aRecordType.validateFilterField(filterField);
@@ -2346,6 +2370,232 @@
MetadataLockManager.INSTANCE.refreshDatasetEnd(dataverseName, dataverseName + "." + datasetName);
}
}
+
+ private void handleRunStatement(AqlMetadataProvider metadataProvider, Statement stmt,
+ IHyracksClientConnection hcc) throws AsterixException, Exception {
+ RunStatement runStmt = (RunStatement) stmt;
+ switch(runStmt.getSystem()) {
+ case "pregel":
+ case "pregelix":
+ handlePregelixStatement(metadataProvider, runStmt, hcc);
+ break;
+ default:
+ throw new AlgebricksException("The system \""+runStmt.getSystem()+"\" specified in your run statement is not supported.");
+ }
+
+ }
+
+ private void handlePregelixStatement(AqlMetadataProvider metadataProvider, Statement stmt,
+ IHyracksClientConnection hcc) throws AsterixException, Exception {
+
+ RunStatement pregelixStmt = (RunStatement) stmt;
+ boolean bActiveTxn = true;
+
+ String dataverseNameFrom = getActiveDataverseName(pregelixStmt.getDataverseNameFrom());
+ String dataverseNameTo = getActiveDataverseName(pregelixStmt.getDataverseNameTo());
+ String datasetNameFrom = pregelixStmt.getDatasetNameFrom().getValue();
+ String datasetNameTo = pregelixStmt.getDatasetNameTo().getValue();
+
+ if (!dataverseNameFrom.equals(dataverseNameTo)) {
+ throw new AlgebricksException("Pregelix statements across different dataverses are not supported.");
+ }
+
+ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
+
+ MetadataLockManager.INSTANCE.pregelixBegin(dataverseNameFrom, datasetNameFrom, datasetNameTo);
+
+ try {
+
+ // construct input paths
+ Index fromIndex = null;
+ List<Index> indexes = MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseNameFrom, pregelixStmt
+ .getDatasetNameFrom().getValue());
+ for (Index ind : indexes) {
+ if (ind.isPrimaryIndex())
+ fromIndex = ind;
+ }
+
+ if (fromIndex == null) {
+ throw new AlgebricksException("Tried to access non-existing dataset: " + datasetNameFrom);
+ }
+
+ IFileSplitProvider fromSplits = metadataProvider.splitProviderAndPartitionConstraintsForDataset(
+ dataverseNameFrom, datasetNameFrom, fromIndex.getIndexName()).first;
+ StringBuilder fromSplitsPaths = new StringBuilder();
+
+ for (FileSplit f : fromSplits.getFileSplits()) {
+ fromSplitsPaths.append("asterix://" + f.getNodeName() + f.getLocalFile().getFile().getAbsolutePath());
+ fromSplitsPaths.append(",");
+ }
+ fromSplitsPaths.setLength(fromSplitsPaths.length() - 1);
+
+ // Construct output paths
+ Index toIndex = null;
+ indexes = MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseNameTo, pregelixStmt
+ .getDatasetNameTo().getValue());
+ for (Index ind : indexes) {
+ if (ind.isPrimaryIndex())
+ toIndex = ind;
+ }
+
+ if (toIndex == null) {
+ throw new AlgebricksException("Tried to access non-existing dataset: " + datasetNameTo);
+ }
+
+ IFileSplitProvider toSplits = metadataProvider.splitProviderAndPartitionConstraintsForDataset(
+ dataverseNameTo, datasetNameTo, toIndex.getIndexName()).first;
+ StringBuilder toSplitsPaths = new StringBuilder();
+
+ for (FileSplit f : toSplits.getFileSplits()) {
+ toSplitsPaths.append("asterix://" + f.getNodeName() + f.getLocalFile().getFile().getAbsolutePath());
+ toSplitsPaths.append(",");
+ }
+ toSplitsPaths.setLength(toSplitsPaths.length() - 1);
+
+ try {
+ Dataset toDataset = MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverseNameTo, datasetNameTo);
+ DropStatement dropStmt = new DropStatement(new Identifier(dataverseNameTo),
+ pregelixStmt.getDatasetNameTo(), true);
+ this.handleDatasetDropStatement(metadataProvider, dropStmt, hcc);
+
+ IDatasetDetailsDecl idd = new InternalDetailsDecl(new Identifier(toDataset.getDatasetDetails()
+ .getNodeGroupName()), toIndex.getKeyFieldNames(), false, toDataset.getDatasetDetails()
+ .getCompactionPolicy(), toDataset.getDatasetDetails().getCompactionPolicyProperties(), null);
+ DatasetDecl createToDataset = new DatasetDecl(new Identifier(dataverseNameTo),
+ pregelixStmt.getDatasetNameTo(), new Identifier(toDataset.getItemTypeName()),
+ toDataset.getHints(), toDataset.getDatasetType(), idd, false);
+ this.handleCreateDatasetStatement(metadataProvider, createToDataset, hcc);
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw new AlgebricksException("Error cleaning the result dataset. This should not happen.");
+ }
+
+ // Flush source dataset
+ flushDataset(hcc, metadataProvider, mdTxnCtx, dataverseNameFrom, datasetNameFrom, fromIndex.getIndexName());
+
+ // call Pregelix
+ String pregelix_home = System.getenv("PREGELIX_HOME");
+ if (pregelix_home == null) {
+ throw new AlgebricksException("PREGELIX_HOME is not defined!");
+ }
+
+ // construct command
+ ArrayList<String> cmd = new ArrayList<String>();
+ cmd.add("bin/pregelix");
+ cmd.add(pregelixStmt.getParameters().get(0)); // jar
+ cmd.add(pregelixStmt.getParameters().get(1)); // class
+ for (String s : pregelixStmt.getParameters().get(2).split(" ")) {
+ cmd.add(s);
+ }
+ cmd.add("-inputpaths");
+ cmd.add(fromSplitsPaths.toString());
+ cmd.add("-outputpath");
+ cmd.add(toSplitsPaths.toString());
+
+ StringBuilder command = new StringBuilder();
+ for (String s : cmd) {
+ command.append(s);
+ command.append(" ");
+ }
+ LOGGER.info("Running Pregelix Command: " + command.toString());
+
+ ProcessBuilder pb = new ProcessBuilder(cmd);
+ pb.directory(new File(pregelix_home));
+ pb.redirectErrorStream(true);
+
+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+ bActiveTxn = false;
+
+ Process pr = pb.start();
+
+ int resultState = 0;
+
+ BufferedReader in = new BufferedReader(new InputStreamReader(pr.getInputStream()));
+ String line;
+ while ((line = in.readLine()) != null) {
+ System.out.println(line);
+ if (line.contains("job finished")) {
+ resultState = 1;
+ }
+ if (line.contains("Exception") || line.contains("Error")) {
+
+ if (line.contains("Connection refused")) {
+ throw new AlgebricksException(
+ "The connection to your Pregelix cluster was refused. Is it running? Is the port in the query correct?");
+ }
+
+ if (line.contains("Could not find or load main class")) {
+ throw new AlgebricksException(
+ "The main class of your Pregelix query was not found. Is the path to your .jar file correct?");
+ }
+
+ if (line.contains("ClassNotFoundException")) {
+ throw new AlgebricksException(
+ "The vertex class of your Pregelix query was not found. Does it exist? Is the spelling correct?");
+ }
+
+ if (line.contains("HyracksException")) {
+ throw new AlgebricksException(
+ "Something went wrong executing your Pregelix Job (HyracksException). Check the configuration of STORAGE_BUFFERCACHE_PAGESIZE and STORAGE_MEMORYCOMPONENT_PAGESIZE."
+ + "It must match the one of Asterix. You can use managix describe -admin to find out the right configuration. "
+ + "Check also if your datatypes in Pregelix and Asterix are matching.");
+ }
+
+ throw new AlgebricksException(
+ "Something went wrong executing your Pregelix Job. Perhaps the Pregelix cluster needs to be restartet. "
+ + "Check the following things: Are the datatypes of Asterix and Pregelix matching? "
+ + "Is the server configuration correct (node names, buffer sizes, framesize)? Check the logfiles for more details.");
+ }
+ }
+ pr.waitFor();
+ in.close();
+
+ if (resultState != 1) {
+ throw new AlgebricksException(
+ "Something went wrong executing your Pregelix Job. Perhaps the Pregelix cluster needs to be restartet. "
+ + "Check the following things: Are the datatypes of Asterix and Pregelix matching? "
+ + "Is the server configuration correct (node names, buffer sizes, framesize)? Check the logfiles for more details.");
+ }
+ } catch (Exception e) {
+ if (bActiveTxn) {
+ abort(e, e, mdTxnCtx);
+ }
+ throw e;
+ } finally {
+ MetadataLockManager.INSTANCE
+ .pregelixEnd(dataverseNameFrom, datasetNameFrom, datasetNameTo);
+ }
+ }
+
+ private void flushDataset(IHyracksClientConnection hcc, AqlMetadataProvider metadataProvider,
+ MetadataTransactionContext mdTxnCtx, String dataverseName, String datasetName, String indexName)
+ throws Exception {
+ AsterixCompilerProperties compilerProperties = AsterixAppContextInfo.getInstance().getCompilerProperties();
+ int frameSize = compilerProperties.getFrameSize();
+ JobSpecification spec = new JobSpecification(frameSize);
+
+ RecordDescriptor[] rDescs = new RecordDescriptor[] { new RecordDescriptor(new ISerializerDeserializer[] {}) };
+ AlgebricksMetaOperatorDescriptor emptySource = new AlgebricksMetaOperatorDescriptor(spec, 0, 1,
+ new IPushRuntimeFactory[] { new EmptyTupleSourceRuntimeFactory() }, rDescs);
+
+ edu.uci.ics.asterix.common.transactions.JobId jobId = JobIdFactory.generateJobId();
+ FlushDatasetOperatorDescriptor flushOperator = new FlushDatasetOperatorDescriptor(spec, jobId,
+ MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverseName, datasetName).getDatasetId());
+
+ spec.connect(new OneToOneConnectorDescriptor(spec), emptySource, 0, flushOperator, 0);
+
+ Pair<IFileSplitProvider, AlgebricksPartitionConstraint> primarySplitsAndConstraint = metadataProvider
+ .splitProviderAndPartitionConstraintsForDataset(dataverseName, datasetName, indexName);
+ AlgebricksPartitionConstraint primaryPartitionConstraint = primarySplitsAndConstraint.second;
+
+ AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, emptySource,
+ primaryPartitionConstraint);
+
+ IJobletEventListenerFactory jobEventListenerFactory = new JobEventListenerFactory(jobId, false);
+ spec.setJobletEventListenerFactory(jobEventListenerFactory);
+ runJob(hcc, spec, true);
+ }
private JobId runJob(IHyracksClientConnection hcc, JobSpecification spec, boolean waitForCompletion)
throws Exception {
@@ -2353,8 +2603,8 @@
return jobIds[0];
}
- public JobId[] executeJobArray(IHyracksClientConnection hcc, Job[] jobs, PrintWriter out,
- boolean waitForCompletion) throws Exception {
+ public JobId[] executeJobArray(IHyracksClientConnection hcc, Job[] jobs, PrintWriter out, boolean waitForCompletion)
+ throws Exception {
JobId[] startedJobIds = new JobId[jobs.length];
for (int i = 0; i < jobs.length; i++) {
JobSpecification spec = jobs[i].getJobSpec();
diff --git a/asterix-app/src/test/resources/runtimets/queries/graph/pregel-q01/pregel-q01.1.ddl.aql b/asterix-app/src/test/resources/runtimets/queries/graph/pregel-q01/pregel-q01.1.ddl.aql
new file mode 100644
index 0000000..8d47100f
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/graph/pregel-q01/pregel-q01.1.ddl.aql
@@ -0,0 +1,47 @@
+drop dataverse Pregelix if exists;
+create dataverse Pregelix;
+use dataverse Pregelix;
+
+create type TwitterUserType as open {
+ screen-name: string,
+ lang: string,
+ friends_count: int32,
+ statuses_count: int32,
+ name: string,
+ followers_count: int32
+}
+
+create type TweetMessageType as open {
+ tweetid: int64,
+ user: TwitterUserType,
+ sender-location: point?,
+ send-time: datetime,
+ referred-topics: {{ string }},
+ message-text: string,
+ retweeted-from: int64,
+ forwarded-from: int64
+}
+
+create dataset TwitterMsgs(TweetMessageType)
+ primary key tweetid;
+
+create dataset TwitterUsers(TwitterUserType)
+ primary key screen-name;
+
+ create type TMEdge as open {
+ tweetid: int64,
+ value: float?
+ }
+
+ create type TMGraph as open {
+ tweetid: int64,
+ rank-value: double?,
+ populated-by: {{TMEdge}}
+ }
+
+
+create dataset MyInputGraph(TMGraph)
+ primary key tweetid;
+
+create dataset MyOutputGraph(TMGraph)
+ primary key tweetid;
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/queries/graph/pregel-q01/pregel-q01.2.update.aql b/asterix-app/src/test/resources/runtimets/queries/graph/pregel-q01/pregel-q01.2.update.aql
new file mode 100644
index 0000000..5c8272f
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/graph/pregel-q01/pregel-q01.2.update.aql
@@ -0,0 +1,34 @@
+use dataverse Pregelix;
+
+insert into dataset TwitterUsers (
+{{
+{"screen-name":"NathanGiesen@211","lang":"en","friends_count":18,"statuses_count":473,"name":"Nathan Giesen","followers_count":49416},
+{"screen-name":"ColineGeyer@63","lang":"en","friends_count":121,"statuses_count":362,"name":"Coline Geyer","followers_count":17159},
+{"screen-name":"NilaMilliron_tw","lang":"en","friends_count":445,"statuses_count":164,"name":"Nila Milliron","followers_count":22649},
+{"screen-name":"ChangEwing_573","lang":"en","friends_count":182,"statuses_count":394,"name":"Chang Ewing","followers_count":32136}
+}});
+
+insert into dataset TwitterMsgs (
+{{
+ {"tweetid":1,"user":{"screen-name":"NathanGiesen@211","lang":"en","friends_count":39339,"statuses_count":473,"name":"Nathan Giesen","followers_count":49416},"sender-location":point("47.44,80.65"),"send-time":datetime("2008-04-26T10:10:00"),"referred-topics":{{"t-mobile","customization"}},"message-text":" love t-mobile its customization is good:)", "retweeted-from":2, "forwarded-from":3},
+ {"tweetid":2,"user":{"screen-name":"ColineGeyer@63","lang":"en","friends_count":121,"statuses_count":362,"name":"Coline Geyer","followers_count":17159},"sender-location":point("32.84,67.14"),"send-time":datetime("2010-05-13T10:10:00"),"referred-topics":{{"verizon","shortcut-menu"}},"message-text":" like verizon its shortcut-menu is awesome:)", "retweeted-from":7, "forwarded-from":1},
+ {"tweetid":3,"user":{"screen-name":"NathanGiesen@211","lang":"en","friends_count":39339,"statuses_count":473,"name":"Nathan Giesen","followers_count":49416},"sender-location":point("29.72,75.8"),"send-time":datetime("2006-11-04T10:10:00"),"referred-topics":{{"motorola","speed"}},"message-text":" like motorola the speed is good:)", "retweeted-from":8, "forwarded-from":6},
+ {"tweetid":4,"user":{"screen-name":"NathanGiesen@211","lang":"en","friends_count":39339,"statuses_count":473,"name":"Nathan Giesen","followers_count":49416},"sender-location":point("39.28,70.48"),"send-time":datetime("2011-12-26T10:10:00"),"referred-topics":{{"sprint","voice-command"}},"message-text":" like sprint the voice-command is mind-blowing:)", "retweeted-from":1, "forwarded-from":3},
+ {"tweetid":5,"user":{"screen-name":"NathanGiesen@211","lang":"en","friends_count":39339,"statuses_count":473,"name":"Nathan Giesen","followers_count":49416},"sender-location":point("40.09,92.69"),"send-time":datetime("2006-08-04T10:10:00"),"referred-topics":{{"motorola","speed"}},"message-text":" can't stand motorola its speed is terrible:(", "retweeted-from":8, "forwarded-from":6},
+ {"tweetid":6,"user":{"screen-name":"ColineGeyer@63","lang":"en","friends_count":121,"statuses_count":362,"name":"Coline Geyer","followers_count":17159},"sender-location":point("47.51,83.99"),"send-time":datetime("2010-05-07T10:10:00"),"referred-topics":{{"iphone","voice-clarity"}},"message-text":" like iphone the voice-clarity is good:)", "retweeted-from":4, "forwarded-from":5},
+ {"tweetid":7,"user":{"screen-name":"ChangEwing_573","lang":"en","friends_count":182,"statuses_count":394,"name":"Chang Ewing","followers_count":32136},"sender-location":point("36.21,72.6"),"send-time":datetime("2011-08-25T10:10:00"),"referred-topics":{{"samsung","platform"}},"message-text":" like samsung the platform is good", "retweeted-from":2, "forwarded-from":7},
+ {"tweetid":8,"user":{"screen-name":"NathanGiesen@211","lang":"en","friends_count":39339,"statuses_count":473,"name":"Nathan Giesen","followers_count":49416},"sender-location":point("46.05,93.34"),"send-time":datetime("2005-10-14T10:10:00"),"referred-topics":{{"t-mobile","shortcut-menu"}},"message-text":" like t-mobile the shortcut-menu is awesome:)", "retweeted-from":3, "forwarded-from":7},
+ {"tweetid":9,"user":{"screen-name":"NathanGiesen@211","lang":"en","friends_count":39339,"statuses_count":473,"name":"Nathan Giesen","followers_count":49416},"sender-location":point("36.86,74.62"),"send-time":datetime("2012-07-21T10:10:00"),"referred-topics":{{"verizon","voicemail-service"}},"message-text":" love verizon its voicemail-service is awesome", "retweeted-from":6, "forwarded-from":2},
+ {"tweetid":10,"user":{"screen-name":"ColineGeyer@63","lang":"en","friends_count":121,"statuses_count":362,"name":"Coline Geyer","followers_count":17159},"sender-location":point("29.15,76.53"),"send-time":datetime("2008-01-26T10:10:00"),"referred-topics":{{"verizon","voice-clarity"}},"message-text":" hate verizon its voice-clarity is OMG:(", "retweeted-from":4, "forwarded-from":5},
+ {"tweetid":11,"user":{"screen-name":"NilaMilliron_tw","lang":"en","friends_count":445,"statuses_count":164,"name":"Nila Milliron","followers_count":22649},"sender-location":point("37.59,68.42"),"send-time":datetime("2008-03-09T10:10:00"),"referred-topics":{{"iphone","platform"}},"message-text":" can't stand iphone its platform is terrible", "retweeted-from":6, "forwarded-from":3},
+ {"tweetid":12,"user":{"screen-name":"OliJackson_512","lang":"en","friends_count":445,"statuses_count":164,"name":"Oli Jackson","followers_count":22649},"sender-location":point("24.82,94.63"),"send-time":datetime("2010-02-13T10:10:00"),"referred-topics":{{"samsung","voice-command"}},"message-text":" like samsung the voice-command is amazing:)", "retweeted-from":6, "forwarded-from":5}
+}});
+
+insert into dataset MyInputGraph
+for $tm in dataset TwitterMsgs
+let $links:={{ {"tweetid":$tm.retweeted-from}, {"tweetid":$tm.forwarded-from}}}
+return {
+ "tweetid": $tm.tweetid,
+ "rank-value": 0.0,
+ "populated-by": $links
+}
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/queries/graph/pregel-q01/pregel-q01.3.query.aql b/asterix-app/src/test/resources/runtimets/queries/graph/pregel-q01/pregel-q01.3.query.aql
new file mode 100644
index 0000000..78fb02e
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/graph/pregel-q01/pregel-q01.3.query.aql
@@ -0,0 +1,9 @@
+use dataverse Pregelix;
+
+run pregel("examples/pregelix-example-0.2.14-SNAPSHOT-jar-with-dependencies.jar"
+ "edu.uci.ics.pregelix.example.PageRankVertex"
+ "-ip 10.0.2.15 -port 3199 -vnum 17")
+from dataset MyInputGraph
+to dataset MyOutputGraph;
+
+for $n in dataset MyOutputGraph return $n;
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/queries/graph/pregel-q02/pregel-q02.1.ddl.aql b/asterix-app/src/test/resources/runtimets/queries/graph/pregel-q02/pregel-q02.1.ddl.aql
new file mode 100644
index 0000000..1439cc3
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/graph/pregel-q02/pregel-q02.1.ddl.aql
@@ -0,0 +1,16 @@
+drop dataverse Pregelix if exists;
+create dataverse Pregelix;
+use dataverse Pregelix;
+
+create type EdgeType as open {
+ destVertexId: int64,
+ value: float?
+}
+create type NodeType as open {
+ id: int64,
+ value: int64?,
+ edges: {{EdgeType}}
+}
+
+create dataset InputGraph(NodeType) primary key id;
+create dataset ResultGraph(NodeType) primary key id;
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/queries/graph/pregel-q02/pregel-q02.2.update.aql b/asterix-app/src/test/resources/runtimets/queries/graph/pregel-q02/pregel-q02.2.update.aql
new file mode 100644
index 0000000..5537960
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/graph/pregel-q02/pregel-q02.2.update.aql
@@ -0,0 +1,27 @@
+use dataverse Pregelix;
+
+insert into dataset InputGraph (
+{{
+ {"id":0,"value":0,"edges":{{ {"destVertexId":1}}}},
+{"id":1,"value":1,"edges":{{ {"destVertexId":1},{"destVertexId":2}}}},
+{"id":2,"value":2,"edges":{{ {"destVertexId":1},{"destVertexId":2},{"destVertexId":3}}}},
+{"id":3,"value":3,"edges":{{ {"destVertexId":1},{"destVertexId":2},{"destVertexId":3},{"destVertexId":4}}}},
+{"id":4,"value":4,"edges":{{ {"destVertexId":1},{"destVertexId":2},{"destVertexId":3},{"destVertexId":4},{"destVertexId":5}}}},
+{"id":5,"value":5,"edges":{{ {"destVertexId":1},{"destVertexId":2},{"destVertexId":3},{"destVertexId":4},{"destVertexId":5},{"destVertexId":6}}}},
+{"id":6,"value":6,"edges":{{ {"destVertexId":1},{"destVertexId":2},{"destVertexId":3},{"destVertexId":4},{"destVertexId":5},{"destVertexId":6},{"destVertexId":7}}}},
+{"id":7,"value":7,"edges":{{ {"destVertexId":1},{"destVertexId":2},{"destVertexId":3},{"destVertexId":4},{"destVertexId":5},{"destVertexId":6},{"destVertexId":7},{"destVertexId":8}}}},
+{"id":8,"value":8,"edges":{{ {"destVertexId":1},{"destVertexId":2},{"destVertexId":3},{"destVertexId":4},{"destVertexId":5},{"destVertexId":6},{"destVertexId":7},{"destVertexId":8},{"destVertexId":9}}}},
+{"id":9,"value":9,"edges":{{ {"destVertexId":1},{"destVertexId":2},{"destVertexId":3},{"destVertexId":4},{"destVertexId":5},{"destVertexId":6},{"destVertexId":7},{"destVertexId":8},{"destVertexId":9},{"destVertexId":0}}}},
+{"id":10,"value":10,"edges":{{ {"destVertexId":11}}}},
+{"id":11,"value":11,"edges":{{ {"destVertexId":11},{"destVertexId":12}}}},
+{"id":12,"value":12,"edges":{{ {"destVertexId":11},{"destVertexId":12},{"destVertexId":13}}}},
+{"id":13,"value":13,"edges":{{ {"destVertexId":11},{"destVertexId":12},{"destVertexId":13},{"destVertexId":14}}}},
+{"id":14,"value":14,"edges":{{ {"destVertexId":11},{"destVertexId":12},{"destVertexId":13},{"destVertexId":14},{"destVertexId":15}}}},
+{"id":15,"value":15,"edges":{{ {"destVertexId":11},{"destVertexId":12},{"destVertexId":13},{"destVertexId":14},{"destVertexId":15},{"destVertexId":16}}}},
+{"id":16,"value":16,"edges":{{ {"destVertexId":11},{"destVertexId":12},{"destVertexId":13},{"destVertexId":14},{"destVertexId":15},{"destVertexId":16},{"destVertexId":17}}}},
+{"id":17,"value":17,"edges":{{ {"destVertexId":11},{"destVertexId":12},{"destVertexId":13},{"destVertexId":14},{"destVertexId":15},{"destVertexId":16},{"destVertexId":17},{"destVertexId":18}}}},
+{"id":18,"value":18,"edges":{{ {"destVertexId":11},{"destVertexId":12},{"destVertexId":13},{"destVertexId":14},{"destVertexId":15},{"destVertexId":16},{"destVertexId":17},{"destVertexId":18},{"destVertexId":19}}}},
+{"id":19,"value":19,"edges":{{ {"destVertexId":11},{"destVertexId":12},{"destVertexId":13},{"destVertexId":14},{"destVertexId":15},{"destVertexId":16},{"destVertexId":17},{"destVertexId":18},{"destVertexId":19}, {"destVertexId":10}}}}
+}}
+
+)
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/queries/graph/pregel-q02/pregel-q02.3.query.aql b/asterix-app/src/test/resources/runtimets/queries/graph/pregel-q02/pregel-q02.3.query.aql
new file mode 100644
index 0000000..77c5d7e
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/graph/pregel-q02/pregel-q02.3.query.aql
@@ -0,0 +1,9 @@
+use dataverse Pregelix;
+
+run pregel("examples/pregelix-example-0.2.14-SNAPSHOT-jar-with-dependencies.jar"
+ "edu.uci.ics.pregelix.example.ConnectedComponentsVertex"
+ "-ip 10.0.2.15 -port 3199")
+from dataset InputGraph
+to dataset ResultGraph;
+
+for $n in dataset ResultGraph return $n;
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/queries/graph/pregel-q03/pregel-q03.1.ddl.aql b/asterix-app/src/test/resources/runtimets/queries/graph/pregel-q03/pregel-q03.1.ddl.aql
new file mode 100644
index 0000000..9642ce3
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/graph/pregel-q03/pregel-q03.1.ddl.aql
@@ -0,0 +1,16 @@
+drop dataverse Pregelix if exists;
+create dataverse Pregelix;
+use dataverse Pregelix;
+
+create type EdgeType as open {
+ destVertexId: int64,
+ cost: float?
+}
+create type NodeType as open {
+ id: int64,
+ value: double?,
+ edges: {{EdgeType}}
+}
+
+create dataset InputGraph(NodeType) primary key id;
+create dataset ResultGraph(NodeType) primary key id;
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/queries/graph/pregel-q03/pregel-q03.2.update.aql b/asterix-app/src/test/resources/runtimets/queries/graph/pregel-q03/pregel-q03.2.update.aql
new file mode 100644
index 0000000..f328614
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/graph/pregel-q03/pregel-q03.2.update.aql
@@ -0,0 +1,16 @@
+use dataverse Pregelix;
+
+insert into dataset InputGraph (
+{{
+{"id":0, "edges":{{ {"destVertexId":1, "cost":1.0f}}}},
+{"id":1, "edges":{{ {"destVertexId":3, "cost":4.0f},{"destVertexId":2, "cost":3.0f}}}},
+{"id":2, "edges":{{ {"destVertexId":4, "cost":5.0f},{"destVertexId":5, "cost":23.0f}}}},
+{"id":3, "edges":{{ {"destVertexId":2, "cost":1.0f},{"destVertexId":8, "cost":13.0f}}}},
+{"id":4, "edges":{{ {"destVertexId":1, "cost":5.0f},{"destVertexId":2, "cost":8.0f},{"destVertexId":3, "cost":23.0f},{"destVertexId":4, "cost":12.0f}}}},
+{"id":5, "edges":{{ {"destVertexId":6, "cost":12.0f},{"destVertexId":7, "cost":17.0f}}}},
+{"id":6, "edges":{{ {"destVertexId":1, "cost":12.0f},{"destVertexId":2, "cost":1.0f}}}},
+{"id":7, "edges":{{ {"destVertexId":9, "cost":100.0f}}}},
+{"id":8, "edges":{{ {"destVertexId":4, "cost":11.0f}}}},
+{"id":9, "edges":{{ {"destVertexId":1, "cost":16.0f},{"destVertexId":2, "cost":9.0f}}}}
+}}
+)
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/queries/graph/pregel-q03/pregel-q03.3.query.aql b/asterix-app/src/test/resources/runtimets/queries/graph/pregel-q03/pregel-q03.3.query.aql
new file mode 100644
index 0000000..0bb4229
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/graph/pregel-q03/pregel-q03.3.query.aql
@@ -0,0 +1,9 @@
+use dataverse Pregelix;
+
+run pregel("examples/pregelix-example-0.2.14-SNAPSHOT-jar-with-dependencies.jar"
+ "edu.uci.ics.pregelix.example.ShortestPathsVertex"
+ "-ip 10.0.2.15 -port 3199 -source-vertex 0")
+from dataset InputGraph
+to dataset ResultGraph;
+
+for $n in dataset ResultGraph return $n;
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/results/graph/pregel-q01/pregel-q01.1.adm b/asterix-app/src/test/resources/runtimets/results/graph/pregel-q01/pregel-q01.1.adm
new file mode 100644
index 0000000..4b6f605
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/results/graph/pregel-q01/pregel-q01.1.adm
@@ -0,0 +1,13 @@
+[ { "tweetid": 1i64, "rank-value": 0.07349647505081802d, "populated-by": {{ { "tweetid": 2i64, "value": 0.0f }, { "tweetid": 3i64, "value": 0.0f } }} }
+, { "tweetid": 2i64, "rank-value": 0.10497306443795257d, "populated-by": {{ { "tweetid": 7i64, "value": 0.0f }, { "tweetid": 1i64, "value": 0.0f } }} }
+, { "tweetid": 3i64, "rank-value": 0.09458886502271134d, "populated-by": {{ { "tweetid": 8i64, "value": 0.0f }, { "tweetid": 6i64, "value": 0.0f } }} }
+, { "tweetid": 4i64, "rank-value": 0.04783722202788025d, "populated-by": {{ { "tweetid": 1i64, "value": 0.0f }, { "tweetid": 3i64, "value": 0.0f } }} }
+, { "tweetid": 5i64, "rank-value": 0.051587222027880256d, "populated-by": {{ { "tweetid": 8i64, "value": 0.0f }, { "tweetid": 6i64, "value": 0.0f } }} }
+, { "tweetid": 6i64, "rank-value": 0.0822549135142923d, "populated-by": {{ { "tweetid": 4i64, "value": 0.0f }, { "tweetid": 5i64, "value": 0.0f } }} }
+, { "tweetid": 7i64, "rank-value": 0.14484555969829038d, "populated-by": {{ { "tweetid": 2i64, "value": 0.0f }, { "tweetid": 7i64, "value": 0.0f } }} }
+, { "tweetid": 8i64, "rank-value": 0.0710049135142923d, "populated-by": {{ { "tweetid": 3i64, "value": 0.0f }, { "tweetid": 7i64, "value": 0.0f } }} }
+, { "tweetid": 9i64, "rank-value": 0.008823529411764706d, "populated-by": {{ { "tweetid": 6i64, "value": 0.0f }, { "tweetid": 2i64, "value": 0.0f } }} }
+, { "tweetid": 10i64, "rank-value": 0.008823529411764706d, "populated-by": {{ { "tweetid": 4i64, "value": 0.0f }, { "tweetid": 5i64, "value": 0.0f } }} }
+, { "tweetid": 11i64, "rank-value": 0.008823529411764706d, "populated-by": {{ { "tweetid": 6i64, "value": 0.0f }, { "tweetid": 3i64, "value": 0.0f } }} }
+, { "tweetid": 12i64, "rank-value": 0.008823529411764706d, "populated-by": {{ { "tweetid": 6i64, "value": 0.0f }, { "tweetid": 5i64, "value": 0.0f } }} }
+]
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/results/graph/pregel-q02/pregel-q02.1.adm b/asterix-app/src/test/resources/runtimets/results/graph/pregel-q02/pregel-q02.1.adm
new file mode 100644
index 0000000..82091fb
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/results/graph/pregel-q02/pregel-q02.1.adm
@@ -0,0 +1,21 @@
+[ { "id": 0i64, "value": 0i64, "edges": {{ { "destVertexId": 1i64, "value": 0.0f } }} }
+, { "id": 1i64, "value": 0i64, "edges": {{ { "destVertexId": 1i64, "value": 0.0f }, { "destVertexId": 2i64, "value": 0.0f } }} }
+, { "id": 2i64, "value": 0i64, "edges": {{ { "destVertexId": 1i64, "value": 0.0f }, { "destVertexId": 2i64, "value": 0.0f }, { "destVertexId": 3i64, "value": 0.0f } }} }
+, { "id": 3i64, "value": 0i64, "edges": {{ { "destVertexId": 1i64, "value": 0.0f }, { "destVertexId": 2i64, "value": 0.0f }, { "destVertexId": 3i64, "value": 0.0f }, { "destVertexId": 4i64, "value": 0.0f } }} }
+, { "id": 4i64, "value": 0i64, "edges": {{ { "destVertexId": 1i64, "value": 0.0f }, { "destVertexId": 2i64, "value": 0.0f }, { "destVertexId": 3i64, "value": 0.0f }, { "destVertexId": 4i64, "value": 0.0f }, { "destVertexId": 5i64, "value": 0.0f } }} }
+, { "id": 5i64, "value": 0i64, "edges": {{ { "destVertexId": 1i64, "value": 0.0f }, { "destVertexId": 2i64, "value": 0.0f }, { "destVertexId": 3i64, "value": 0.0f }, { "destVertexId": 4i64, "value": 0.0f }, { "destVertexId": 5i64, "value": 0.0f }, { "destVertexId": 6i64, "value": 0.0f } }} }
+, { "id": 6i64, "value": 0i64, "edges": {{ { "destVertexId": 1i64, "value": 0.0f }, { "destVertexId": 2i64, "value": 0.0f }, { "destVertexId": 3i64, "value": 0.0f }, { "destVertexId": 4i64, "value": 0.0f }, { "destVertexId": 5i64, "value": 0.0f }, { "destVertexId": 6i64, "value": 0.0f }, { "destVertexId": 7i64, "value": 0.0f } }} }
+, { "id": 7i64, "value": 0i64, "edges": {{ { "destVertexId": 1i64, "value": 0.0f }, { "destVertexId": 2i64, "value": 0.0f }, { "destVertexId": 3i64, "value": 0.0f }, { "destVertexId": 4i64, "value": 0.0f }, { "destVertexId": 5i64, "value": 0.0f }, { "destVertexId": 6i64, "value": 0.0f }, { "destVertexId": 7i64, "value": 0.0f }, { "destVertexId": 8i64, "value": 0.0f } }} }
+, { "id": 8i64, "value": 0i64, "edges": {{ { "destVertexId": 1i64, "value": 0.0f }, { "destVertexId": 2i64, "value": 0.0f }, { "destVertexId": 3i64, "value": 0.0f }, { "destVertexId": 4i64, "value": 0.0f }, { "destVertexId": 5i64, "value": 0.0f }, { "destVertexId": 6i64, "value": 0.0f }, { "destVertexId": 7i64, "value": 0.0f }, { "destVertexId": 8i64, "value": 0.0f }, { "destVertexId": 9i64, "value": 0.0f } }} }
+, { "id": 9i64, "value": 0i64, "edges": {{ { "destVertexId": 1i64, "value": 0.0f }, { "destVertexId": 2i64, "value": 0.0f }, { "destVertexId": 3i64, "value": 0.0f }, { "destVertexId": 4i64, "value": 0.0f }, { "destVertexId": 5i64, "value": 0.0f }, { "destVertexId": 6i64, "value": 0.0f }, { "destVertexId": 7i64, "value": 0.0f }, { "destVertexId": 8i64, "value": 0.0f }, { "destVertexId": 9i64, "value": 0.0f }, { "destVertexId": 0i64, "value": 0.0f } }} }
+, { "id": 10i64, "value": 10i64, "edges": {{ { "destVertexId": 11i64, "value": 0.0f } }} }
+, { "id": 11i64, "value": 10i64, "edges": {{ { "destVertexId": 11i64, "value": 0.0f }, { "destVertexId": 12i64, "value": 0.0f } }} }
+, { "id": 12i64, "value": 10i64, "edges": {{ { "destVertexId": 11i64, "value": 0.0f }, { "destVertexId": 12i64, "value": 0.0f }, { "destVertexId": 13i64, "value": 0.0f } }} }
+, { "id": 13i64, "value": 10i64, "edges": {{ { "destVertexId": 11i64, "value": 0.0f }, { "destVertexId": 12i64, "value": 0.0f }, { "destVertexId": 13i64, "value": 0.0f }, { "destVertexId": 14i64, "value": 0.0f } }} }
+, { "id": 14i64, "value": 10i64, "edges": {{ { "destVertexId": 11i64, "value": 0.0f }, { "destVertexId": 12i64, "value": 0.0f }, { "destVertexId": 13i64, "value": 0.0f }, { "destVertexId": 14i64, "value": 0.0f }, { "destVertexId": 15i64, "value": 0.0f } }} }
+, { "id": 15i64, "value": 10i64, "edges": {{ { "destVertexId": 11i64, "value": 0.0f }, { "destVertexId": 12i64, "value": 0.0f }, { "destVertexId": 13i64, "value": 0.0f }, { "destVertexId": 14i64, "value": 0.0f }, { "destVertexId": 15i64, "value": 0.0f }, { "destVertexId": 16i64, "value": 0.0f } }} }
+, { "id": 16i64, "value": 10i64, "edges": {{ { "destVertexId": 11i64, "value": 0.0f }, { "destVertexId": 12i64, "value": 0.0f }, { "destVertexId": 13i64, "value": 0.0f }, { "destVertexId": 14i64, "value": 0.0f }, { "destVertexId": 15i64, "value": 0.0f }, { "destVertexId": 16i64, "value": 0.0f }, { "destVertexId": 17i64, "value": 0.0f } }} }
+, { "id": 17i64, "value": 10i64, "edges": {{ { "destVertexId": 11i64, "value": 0.0f }, { "destVertexId": 12i64, "value": 0.0f }, { "destVertexId": 13i64, "value": 0.0f }, { "destVertexId": 14i64, "value": 0.0f }, { "destVertexId": 15i64, "value": 0.0f }, { "destVertexId": 16i64, "value": 0.0f }, { "destVertexId": 17i64, "value": 0.0f }, { "destVertexId": 18i64, "value": 0.0f } }} }
+, { "id": 18i64, "value": 10i64, "edges": {{ { "destVertexId": 11i64, "value": 0.0f }, { "destVertexId": 12i64, "value": 0.0f }, { "destVertexId": 13i64, "value": 0.0f }, { "destVertexId": 14i64, "value": 0.0f }, { "destVertexId": 15i64, "value": 0.0f }, { "destVertexId": 16i64, "value": 0.0f }, { "destVertexId": 17i64, "value": 0.0f }, { "destVertexId": 18i64, "value": 0.0f }, { "destVertexId": 19i64, "value": 0.0f } }} }
+, { "id": 19i64, "value": 10i64, "edges": {{ { "destVertexId": 11i64, "value": 0.0f }, { "destVertexId": 12i64, "value": 0.0f }, { "destVertexId": 13i64, "value": 0.0f }, { "destVertexId": 14i64, "value": 0.0f }, { "destVertexId": 15i64, "value": 0.0f }, { "destVertexId": 16i64, "value": 0.0f }, { "destVertexId": 17i64, "value": 0.0f }, { "destVertexId": 18i64, "value": 0.0f }, { "destVertexId": 19i64, "value": 0.0f }, { "destVertexId": 10i64, "value": 0.0f } }} }
+]
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/results/graph/pregel-q03/pregel-q03.1.adm b/asterix-app/src/test/resources/runtimets/results/graph/pregel-q03/pregel-q03.1.adm
new file mode 100644
index 0000000..38eeba3
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/results/graph/pregel-q03/pregel-q03.1.adm
@@ -0,0 +1,11 @@
+[ { "id": 0i64, "value": 0.0d, "edges": {{ { "destVertexId": 1i64, "cost": 1.0f } }} }
+, { "id": 1i64, "value": 1.0d, "edges": {{ { "destVertexId": 3i64, "cost": 4.0f }, { "destVertexId": 2i64, "cost": 3.0f } }} }
+, { "id": 2i64, "value": 4.0d, "edges": {{ { "destVertexId": 4i64, "cost": 5.0f }, { "destVertexId": 5i64, "cost": 23.0f } }} }
+, { "id": 3i64, "value": 5.0d, "edges": {{ { "destVertexId": 2i64, "cost": 1.0f }, { "destVertexId": 8i64, "cost": 13.0f } }} }
+, { "id": 4i64, "value": 9.0d, "edges": {{ { "destVertexId": 1i64, "cost": 5.0f }, { "destVertexId": 2i64, "cost": 8.0f }, { "destVertexId": 3i64, "cost": 23.0f }, { "destVertexId": 4i64, "cost": 12.0f } }} }
+, { "id": 5i64, "value": 27.0d, "edges": {{ { "destVertexId": 6i64, "cost": 12.0f }, { "destVertexId": 7i64, "cost": 17.0f } }} }
+, { "id": 6i64, "value": 39.0d, "edges": {{ { "destVertexId": 1i64, "cost": 12.0f }, { "destVertexId": 2i64, "cost": 1.0f } }} }
+, { "id": 7i64, "value": 44.0d, "edges": {{ { "destVertexId": 9i64, "cost": 100.0f } }} }
+, { "id": 8i64, "value": 18.0d, "edges": {{ { "destVertexId": 4i64, "cost": 11.0f } }} }
+, { "id": 9i64, "value": 144.0d, "edges": {{ { "destVertexId": 1i64, "cost": 16.0f }, { "destVertexId": 2i64, "cost": 9.0f } }} }
+]
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/testsuite.xml b/asterix-app/src/test/resources/runtimets/testsuite.xml
index c3b097a..baf330c 100644
--- a/asterix-app/src/test/resources/runtimets/testsuite.xml
+++ b/asterix-app/src/test/resources/runtimets/testsuite.xml
@@ -2346,6 +2346,23 @@
</compilation-unit>
</test-case>
</test-group>
+ <!-- <test-group name="graph">
+ <test-case FilePath="graph">
+ <compilation-unit name="pregel-q01">
+ <output-dir compare="Text">pregel-q01</output-dir>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="graph">
+ <compilation-unit name="pregel-q02">
+ <output-dir compare="Text">pregel-q02</output-dir>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="graph">
+ <compilation-unit name="pregel-q03">
+ <output-dir compare="Text">pregel-q03</output-dir>
+ </compilation-unit>
+ </test-case>
+ </test-group> -->
<test-group name="index-join">
<test-case FilePath="index-join">
<compilation-unit name="btree-primary-equi-join">