This change adds a run statement to AQL for running Pregelix jobs.

To support this, it also provides a new FlushOperator that flushes all in-memory components of a dataset to disk.

Change-Id: I1f97cfdc79943abf035a7342bb777d59af6518e9
Reviewed-on: http://fulliautomatix.ics.uci.edu:8443/193
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Young-Seok Kim <kisskys@gmail.com>
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/QueryAPIServlet.java b/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/QueryAPIServlet.java
index 23553d1..e4dd711 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/QueryAPIServlet.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/QueryAPIServlet.java
@@ -30,7 +30,7 @@
     }
 
     protected List<Statement.Kind> getAllowedStatements() {
-        Kind[] statementsArray = { Kind.DATAVERSE_DECL, Kind.FUNCTION_DECL, Kind.QUERY, Kind.SET, Kind.WRITE };
+        Kind[] statementsArray = { Kind.DATAVERSE_DECL, Kind.FUNCTION_DECL, Kind.QUERY, Kind.SET, Kind.WRITE, Kind.RUN };
         return Arrays.asList(statementsArray);
     }
 
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java b/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
index 25dc341..5fac0f6 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
@@ -14,7 +14,9 @@
  */
 package edu.uci.ics.asterix.aql.translator;
 
+import java.io.BufferedReader;
 import java.io.File;
+import java.io.InputStreamReader;
 import java.io.PrintWriter;
 import java.rmi.RemoteException;
 import java.util.ArrayList;
@@ -54,6 +56,7 @@
 import edu.uci.ics.asterix.aql.expression.FeedDropStatement;
 import edu.uci.ics.asterix.aql.expression.FunctionDecl;
 import edu.uci.ics.asterix.aql.expression.FunctionDropStatement;
+import edu.uci.ics.asterix.aql.expression.IDatasetDetailsDecl;
 import edu.uci.ics.asterix.aql.expression.Identifier;
 import edu.uci.ics.asterix.aql.expression.IndexDropStatement;
 import edu.uci.ics.asterix.aql.expression.InsertStatement;
@@ -61,13 +64,17 @@
 import edu.uci.ics.asterix.aql.expression.LoadStatement;
 import edu.uci.ics.asterix.aql.expression.NodeGroupDropStatement;
 import edu.uci.ics.asterix.aql.expression.NodegroupDecl;
+import edu.uci.ics.asterix.aql.expression.RunStatement;
 import edu.uci.ics.asterix.aql.expression.Query;
 import edu.uci.ics.asterix.aql.expression.RefreshExternalDatasetStatement;
 import edu.uci.ics.asterix.aql.expression.SetStatement;
 import edu.uci.ics.asterix.aql.expression.TypeDecl;
 import edu.uci.ics.asterix.aql.expression.TypeDropStatement;
+import edu.uci.ics.asterix.aql.expression.VarIdentifier;
+import edu.uci.ics.asterix.aql.expression.VariableExpr;
 import edu.uci.ics.asterix.aql.expression.WriteStatement;
 import edu.uci.ics.asterix.aql.util.FunctionUtils;
+import edu.uci.ics.asterix.common.config.AsterixCompilerProperties;
 import edu.uci.ics.asterix.common.config.DatasetConfig.DatasetType;
 import edu.uci.ics.asterix.common.config.DatasetConfig.ExternalDatasetTransactionState;
 import edu.uci.ics.asterix.common.config.DatasetConfig.ExternalFilePendingOp;
@@ -115,7 +122,10 @@
 import edu.uci.ics.asterix.om.util.AsterixAppContextInfo;
 import edu.uci.ics.asterix.result.ResultReader;
 import edu.uci.ics.asterix.result.ResultUtils;
+import edu.uci.ics.asterix.runtime.job.listener.JobEventListenerFactory;
+import edu.uci.ics.asterix.runtime.operators.std.FlushDatasetOperatorDescriptor;
 import edu.uci.ics.asterix.transaction.management.service.transaction.DatasetIdFactory;
+import edu.uci.ics.asterix.transaction.management.service.transaction.JobIdFactory;
 import edu.uci.ics.asterix.translator.AbstractAqlTranslator;
 import edu.uci.ics.asterix.translator.CompiledStatements.CompiledConnectFeedStatement;
 import edu.uci.ics.asterix.translator.CompiledStatements.CompiledCreateIndexStatement;
@@ -127,20 +137,30 @@
 import edu.uci.ics.asterix.translator.CompiledStatements.CompiledLoadFromFileStatement;
 import edu.uci.ics.asterix.translator.CompiledStatements.ICompiledDmlStatement;
 import edu.uci.ics.asterix.translator.TypeTranslator;
+import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraintHelper;
 import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
 import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
 import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression.FunctionKind;
 import edu.uci.ics.hyracks.algebricks.data.IAWriterFactory;
 import edu.uci.ics.hyracks.algebricks.data.IResultSerializerFactoryProvider;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.meta.AlgebricksMetaOperatorDescriptor;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.std.EmptyTupleSourceRuntimeFactory;
 import edu.uci.ics.hyracks.algebricks.runtime.serializer.ResultSerializerFactoryProvider;
 import edu.uci.ics.hyracks.algebricks.runtime.writers.PrinterBasedWriterFactory;
 import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.dataset.IHyracksDataset;
 import edu.uci.ics.hyracks.api.dataset.ResultSetId;
 import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.api.job.IJobletEventListenerFactory;
 import edu.uci.ics.hyracks.api.job.JobId;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
+import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
 
 /*
@@ -168,7 +188,7 @@
     private final SessionConfig sessionConfig;
     private final OutputFormat pdf;
     private Dataverse activeDefaultDataverse;
-    private List<FunctionDecl> declaredFunctions;
+    private final List<FunctionDecl> declaredFunctions;
 
     public AqlTranslator(List<Statement> aqlStatements, PrintWriter out, SessionConfig pc, APIFramework.OutputFormat pdf)
             throws MetadataException, AsterixException {
@@ -191,7 +211,7 @@
 
     /**
      * Compiles and submits for execution a list of AQL statements.
-     *
+     * 
      * @param hcc
      *            A Hyracks client connection that is used to submit a jobspec to Hyracks.
      * @param hdc
@@ -201,8 +221,8 @@
      * @return A List<QueryResult> containing a QueryResult instance corresponding to each submitted query.
      * @throws Exception
      */
-    public void compileAndExecute(IHyracksClientConnection hcc, IHyracksDataset hdc,
-            ResultDelivery resultDelivery) throws Exception {
+    public void compileAndExecute(IHyracksClientConnection hcc, IHyracksDataset hdc, ResultDelivery resultDelivery)
+            throws Exception {
         int resultSetIdCounter = 0;
         FileSplit outputFile = null;
         IAWriterFactory writerFactory = PrinterBasedWriterFactory.INSTANCE;
@@ -334,6 +354,11 @@
                     outputFile = result.second;
                     break;
                 }
+
+                case RUN: {
+                    handleRunStatement(metadataProvider, stmt, hcc);
+                    break;
+                }
             }
         }
     }
@@ -506,8 +531,7 @@
                             compactionPolicyProperties = GlobalConfig.DEFAULT_COMPACTION_POLICY_PROPERTIES;
                         }
                     } else {
-                        validateCompactionPolicy(compactionPolicy,
-                                                 compactionPolicyProperties, mdTxnCtx, false);
+                        validateCompactionPolicy(compactionPolicy, compactionPolicyProperties, mdTxnCtx, false);
                     }
                     if (filterField != null) {
                         aRecordType.validateFilterField(filterField);
@@ -2346,6 +2370,232 @@
             MetadataLockManager.INSTANCE.refreshDatasetEnd(dataverseName, dataverseName + "." + datasetName);
         }
     }
+    
+    /** Dispatches a run statement to the handler for the system it names; "pregel" and "pregelix" are supported. */
+    private void handleRunStatement(AqlMetadataProvider metadataProvider, Statement stmt,
+            IHyracksClientConnection hcc) throws AsterixException, Exception {
+        final RunStatement runStatement = (RunStatement) stmt;
+        final String system = runStatement.getSystem();
+        // Both spellings select the Pregelix handler.
+        if (system.equals("pregel") || system.equals("pregelix")) {
+            handlePregelixStatement(metadataProvider, runStatement, hcc);
+            return;
+        }
+        throw new AlgebricksException("The system \"" + system
+                + "\" specified in your run statement is not supported.");
+    }
+
+    /**
+     * Runs a Pregelix job from the "from" dataset of a run statement into its "to" dataset: drops and re-creates
+     * the "to" dataset, flushes the "from" dataset, starts {@code bin/pregelix} in {@code PREGELIX_HOME}, and scans
+     * the client's output for a "job finished" marker and for known failure messages.
+     *
+     * @param metadataProvider provider that is given the metadata transaction context
+     * @param stmt the run statement; must be a {@link RunStatement}
+     * @param hcc client connection passed on to the drop, create and flush steps
+     * @throws Exception if validation fails or the job does not report success; the metadata transaction is
+     *             aborted while it is still active, and the Pregelix metadata lock is always released
+     */
+    private void handlePregelixStatement(AqlMetadataProvider metadataProvider, Statement stmt,
+            IHyracksClientConnection hcc) throws AsterixException, Exception {
+        RunStatement pregelixStmt = (RunStatement) stmt;
+        boolean bActiveTxn = true;
+
+        String dataverseNameFrom = getActiveDataverseName(pregelixStmt.getDataverseNameFrom());
+        String dataverseNameTo = getActiveDataverseName(pregelixStmt.getDataverseNameTo());
+        String datasetNameFrom = pregelixStmt.getDatasetNameFrom().getValue();
+        String datasetNameTo = pregelixStmt.getDatasetNameTo().getValue();
+
+        // Compare by value: '!=' on Strings compares references and rejects equal names held in distinct objects.
+        if (dataverseNameFrom == null ? dataverseNameTo != null : !dataverseNameFrom.equals(dataverseNameTo)) {
+            throw new AlgebricksException("Pregelix statements across different dataverses are not supported.");
+        }
+
+        MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+        metadataProvider.setMetadataTxnContext(mdTxnCtx);
+
+        MetadataLockManager.INSTANCE.pregelixBegin(dataverseNameFrom, datasetNameFrom, datasetNameTo);
+
+        try {
+            // Check the environment first, so a missing PREGELIX_HOME fails before the target dataset is dropped.
+            String pregelix_home = System.getenv("PREGELIX_HOME");
+            if (pregelix_home == null) {
+                throw new AlgebricksException("PREGELIX_HOME is not defined!");
+            }
+
+            // construct input paths
+            Index fromIndex = null;
+            List<Index> indexes = MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseNameFrom, pregelixStmt
+                    .getDatasetNameFrom().getValue());
+            for (Index ind : indexes) {
+                if (ind.isPrimaryIndex())
+                    fromIndex = ind;
+            }
+
+            if (fromIndex == null) {
+                throw new AlgebricksException("Tried to access non-existing dataset: " + datasetNameFrom);
+            }
+
+            IFileSplitProvider fromSplits = metadataProvider.splitProviderAndPartitionConstraintsForDataset(
+                    dataverseNameFrom, datasetNameFrom, fromIndex.getIndexName()).first;
+            StringBuilder fromSplitsPaths = new StringBuilder();
+
+            for (FileSplit f : fromSplits.getFileSplits()) {
+                fromSplitsPaths.append("asterix://" + f.getNodeName() + f.getLocalFile().getFile().getAbsolutePath());
+                fromSplitsPaths.append(",");
+            }
+            fromSplitsPaths.setLength(fromSplitsPaths.length() - 1);
+
+            // Construct output paths
+            Index toIndex = null;
+            indexes = MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseNameTo, pregelixStmt
+                    .getDatasetNameTo().getValue());
+            for (Index ind : indexes) {
+                if (ind.isPrimaryIndex())
+                    toIndex = ind;
+            }
+
+            if (toIndex == null) {
+                throw new AlgebricksException("Tried to access non-existing dataset: " + datasetNameTo);
+            }
+
+            IFileSplitProvider toSplits = metadataProvider.splitProviderAndPartitionConstraintsForDataset(
+                    dataverseNameTo, datasetNameTo, toIndex.getIndexName()).first;
+            StringBuilder toSplitsPaths = new StringBuilder();
+
+            for (FileSplit f : toSplits.getFileSplits()) {
+                toSplitsPaths.append("asterix://" + f.getNodeName() + f.getLocalFile().getFile().getAbsolutePath());
+                toSplitsPaths.append(",");
+            }
+            toSplitsPaths.setLength(toSplitsPaths.length() - 1);
+
+            try {
+                Dataset toDataset = MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverseNameTo, datasetNameTo);
+                DropStatement dropStmt = new DropStatement(new Identifier(dataverseNameTo),
+                        pregelixStmt.getDatasetNameTo(), true);
+                this.handleDatasetDropStatement(metadataProvider, dropStmt, hcc);
+
+                IDatasetDetailsDecl idd = new InternalDetailsDecl(new Identifier(toDataset.getDatasetDetails()
+                        .getNodeGroupName()), toIndex.getKeyFieldNames(), false, toDataset.getDatasetDetails()
+                        .getCompactionPolicy(), toDataset.getDatasetDetails().getCompactionPolicyProperties(), null);
+                DatasetDecl createToDataset = new DatasetDecl(new Identifier(dataverseNameTo),
+                        pregelixStmt.getDatasetNameTo(), new Identifier(toDataset.getItemTypeName()),
+                        toDataset.getHints(), toDataset.getDatasetType(), idd, false);
+                this.handleCreateDatasetStatement(metadataProvider, createToDataset, hcc);
+            } catch (Exception e) {
+                throw new AlgebricksException("Error cleaning the result dataset. This should not happen.", e);
+            }
+
+            // Flush source dataset
+            flushDataset(hcc, metadataProvider, mdTxnCtx, dataverseNameFrom, datasetNameFrom, fromIndex.getIndexName());
+
+            // construct command
+            ArrayList<String> cmd = new ArrayList<String>();
+            cmd.add("bin/pregelix");
+            cmd.add(pregelixStmt.getParameters().get(0)); // jar
+            cmd.add(pregelixStmt.getParameters().get(1)); // class
+            for (String s : pregelixStmt.getParameters().get(2).split(" ")) {
+                cmd.add(s);
+            }
+            cmd.add("-inputpaths");
+            cmd.add(fromSplitsPaths.toString());
+            cmd.add("-outputpath");
+            cmd.add(toSplitsPaths.toString());
+
+            StringBuilder command = new StringBuilder();
+            for (String s : cmd) {
+                command.append(s);
+                command.append(" ");
+            }
+            LOGGER.info("Running Pregelix Command: " + command.toString());
+
+            ProcessBuilder pb = new ProcessBuilder(cmd);
+            pb.directory(new File(pregelix_home));
+            pb.redirectErrorStream(true);
+
+            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+            bActiveTxn = false;
+
+            final String genericFailure = "Something went wrong executing your Pregelix Job. Perhaps the Pregelix cluster needs to be restartet. "
+                    + "Check the following things: Are the datatypes of Asterix and Pregelix matching? "
+                    + "Is the server configuration correct (node names, buffer sizes, framesize)? Check the logfiles for more details.";
+            Process pr = pb.start();
+            int resultState = 0;
+            try (BufferedReader in = new BufferedReader(new InputStreamReader(pr.getInputStream()))) {
+                String line;
+                while ((line = in.readLine()) != null) {
+                    LOGGER.info(line);
+                    if (line.contains("job finished")) {
+                        resultState = 1;
+                    }
+                    if (line.contains("Exception") || line.contains("Error")) {
+                        if (line.contains("Connection refused")) {
+                            throw new AlgebricksException(
+                                    "The connection to your Pregelix cluster was refused. Is it running? Is the port in the query correct?");
+                        }
+                        if (line.contains("Could not find or load main class")) {
+                            throw new AlgebricksException(
+                                    "The main class of your Pregelix query was not found. Is the path to your .jar file correct?");
+                        }
+                        if (line.contains("ClassNotFoundException")) {
+                            throw new AlgebricksException(
+                                    "The vertex class of your Pregelix query was not found. Does it exist? Is the spelling correct?");
+                        }
+                        if (line.contains("HyracksException")) {
+                            throw new AlgebricksException(
+                                    "Something went wrong executing your Pregelix Job (HyracksException). Check the configuration of STORAGE_BUFFERCACHE_PAGESIZE and STORAGE_MEMORYCOMPONENT_PAGESIZE."
+                                            + "It must match the one of Asterix. You can use managix describe -admin to find out the right configuration. "
+                                            + "Check also if your datatypes in Pregelix and Asterix are matching.");
+                        }
+                        throw new AlgebricksException(genericFailure);
+                    }
+                }
+                pr.waitFor();
+            } finally {
+                pr.destroy(); // harmless once the client has exited; stops it when we bail out early
+            }
+
+            if (resultState != 1) {
+                throw new AlgebricksException(genericFailure);
+            }
+        } catch (Exception e) {
+            if (bActiveTxn) {
+                abort(e, e, mdTxnCtx);
+            }
+            throw e;
+        } finally {
+            MetadataLockManager.INSTANCE.pregelixEnd(dataverseNameFrom, datasetNameFrom, datasetNameTo);
+        }
+    }
+
+    /**
+     * Builds and runs (waiting for completion) a job that connects an empty-tuple source to a
+     * {@link FlushDatasetOperatorDescriptor} for the given dataset.
+     */
+    private void flushDataset(IHyracksClientConnection hcc, AqlMetadataProvider metadataProvider,
+            MetadataTransactionContext mdTxnCtx, String dataverseName, String datasetName, String indexName)
+            throws Exception {
+        JobSpecification flushJob = new JobSpecification(AsterixAppContextInfo.getInstance().getCompilerProperties()
+                .getFrameSize());
+        RecordDescriptor[] sourceDescs = { new RecordDescriptor(new ISerializerDeserializer[] {}) };
+        IPushRuntimeFactory[] sourceRuntimes = { new EmptyTupleSourceRuntimeFactory() };
+        AlgebricksMetaOperatorDescriptor source = new AlgebricksMetaOperatorDescriptor(flushJob, 0, 1, sourceRuntimes,
+                sourceDescs);
+
+        // The same Asterix job id is handed to the flush operator and to the joblet event listener below.
+        edu.uci.ics.asterix.common.transactions.JobId asterixJobId = JobIdFactory.generateJobId();
+        FlushDatasetOperatorDescriptor flush = new FlushDatasetOperatorDescriptor(flushJob, asterixJobId,
+                MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverseName, datasetName).getDatasetId());
+        flushJob.connect(new OneToOneConnectorDescriptor(flushJob), source, 0, flush, 0);
+
+        // Only the partition constraint (second element of the returned pair) is needed here.
+        AlgebricksPartitionConstraint constraint = metadataProvider.splitProviderAndPartitionConstraintsForDataset(
+                dataverseName, datasetName, indexName).second;
+        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(flushJob, source, constraint);
+
+        flushJob.setJobletEventListenerFactory(new JobEventListenerFactory(asterixJobId, false));
+        runJob(hcc, flushJob, true);
+    }
 
     private JobId runJob(IHyracksClientConnection hcc, JobSpecification spec, boolean waitForCompletion)
             throws Exception {
@@ -2353,8 +2603,8 @@
         return jobIds[0];
     }
 
-    public JobId[] executeJobArray(IHyracksClientConnection hcc, Job[] jobs, PrintWriter out,
-            boolean waitForCompletion) throws Exception {
+    public JobId[] executeJobArray(IHyracksClientConnection hcc, Job[] jobs, PrintWriter out, boolean waitForCompletion)
+            throws Exception {
         JobId[] startedJobIds = new JobId[jobs.length];
         for (int i = 0; i < jobs.length; i++) {
             JobSpecification spec = jobs[i].getJobSpec();
diff --git a/asterix-app/src/test/resources/runtimets/queries/graph/pregel-q01/pregel-q01.1.ddl.aql b/asterix-app/src/test/resources/runtimets/queries/graph/pregel-q01/pregel-q01.1.ddl.aql
new file mode 100644
index 0000000..8d47100f
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/graph/pregel-q01/pregel-q01.1.ddl.aql
@@ -0,0 +1,47 @@
+drop dataverse Pregelix if exists;
+create dataverse Pregelix
+use dataverse Pregelix
+
+create type TwitterUserType as open {
+        screen-name: string,
+        lang: string,
+        friends_count: int32,
+        statuses_count: int32,
+        name: string,
+        followers_count: int32
+}
+
+create type TweetMessageType as open {
+       tweetid: int64,
+       user: TwitterUserType,
+       sender-location: point?,
+       send-time: datetime,
+       referred-topics: {{ string }},
+       message-text: string,
+       retweeted-from: int64, 
+       forwarded-from: int64
+}
+
+create dataset TwitterMsgs(TweetMessageType)
+    primary key tweetid;
+
+create dataset TwitterUsers(TwitterUserType)
+    primary key screen-name;
+
+    create type TMEdge as open {
+        tweetid: int64,
+        value: float?
+    }
+
+    create type TMGraph as open {
+        tweetid: int64,
+        rank-value: double?,
+        populated-by: {{TMEdge}}
+    }
+
+
+create dataset MyInputGraph(TMGraph)
+    primary key tweetid;
+
+create dataset MyOutputGraph(TMGraph)
+    primary key tweetid;
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/queries/graph/pregel-q01/pregel-q01.2.update.aql b/asterix-app/src/test/resources/runtimets/queries/graph/pregel-q01/pregel-q01.2.update.aql
new file mode 100644
index 0000000..5c8272f
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/graph/pregel-q01/pregel-q01.2.update.aql
@@ -0,0 +1,34 @@
+use dataverse Pregelix;
+
+insert into dataset TwitterUsers (
+{{
+{"screen-name":"NathanGiesen@211","lang":"en","friends_count":18,"statuses_count":473,"name":"Nathan Giesen","followers_count":49416},
+{"screen-name":"ColineGeyer@63","lang":"en","friends_count":121,"statuses_count":362,"name":"Coline Geyer","followers_count":17159},
+{"screen-name":"NilaMilliron_tw","lang":"en","friends_count":445,"statuses_count":164,"name":"Nila Milliron","followers_count":22649},
+{"screen-name":"ChangEwing_573","lang":"en","friends_count":182,"statuses_count":394,"name":"Chang Ewing","followers_count":32136}
+}})
+
+insert into dataset TwitterMsgs (
+{{
+    {"tweetid":1,"user":{"screen-name":"NathanGiesen@211","lang":"en","friends_count":39339,"statuses_count":473,"name":"Nathan Giesen","followers_count":49416},"sender-location":point("47.44,80.65"),"send-time":datetime("2008-04-26T10:10:00"),"referred-topics":{{"t-mobile","customization"}},"message-text":" love t-mobile its customization is good:)", "retweeted-from":2, "forwarded-from":3},
+    {"tweetid":2,"user":{"screen-name":"ColineGeyer@63","lang":"en","friends_count":121,"statuses_count":362,"name":"Coline Geyer","followers_count":17159},"sender-location":point("32.84,67.14"),"send-time":datetime("2010-05-13T10:10:00"),"referred-topics":{{"verizon","shortcut-menu"}},"message-text":" like verizon its shortcut-menu is awesome:)", "retweeted-from":7, "forwarded-from":1},
+    {"tweetid":3,"user":{"screen-name":"NathanGiesen@211","lang":"en","friends_count":39339,"statuses_count":473,"name":"Nathan Giesen","followers_count":49416},"sender-location":point("29.72,75.8"),"send-time":datetime("2006-11-04T10:10:00"),"referred-topics":{{"motorola","speed"}},"message-text":" like motorola the speed is good:)", "retweeted-from":8, "forwarded-from":6},
+    {"tweetid":4,"user":{"screen-name":"NathanGiesen@211","lang":"en","friends_count":39339,"statuses_count":473,"name":"Nathan Giesen","followers_count":49416},"sender-location":point("39.28,70.48"),"send-time":datetime("2011-12-26T10:10:00"),"referred-topics":{{"sprint","voice-command"}},"message-text":" like sprint the voice-command is mind-blowing:)", "retweeted-from":1, "forwarded-from":3},
+    {"tweetid":5,"user":{"screen-name":"NathanGiesen@211","lang":"en","friends_count":39339,"statuses_count":473,"name":"Nathan Giesen","followers_count":49416},"sender-location":point("40.09,92.69"),"send-time":datetime("2006-08-04T10:10:00"),"referred-topics":{{"motorola","speed"}},"message-text":" can't stand motorola its speed is terrible:(", "retweeted-from":8, "forwarded-from":6},
+    {"tweetid":6,"user":{"screen-name":"ColineGeyer@63","lang":"en","friends_count":121,"statuses_count":362,"name":"Coline Geyer","followers_count":17159},"sender-location":point("47.51,83.99"),"send-time":datetime("2010-05-07T10:10:00"),"referred-topics":{{"iphone","voice-clarity"}},"message-text":" like iphone the voice-clarity is good:)", "retweeted-from":4, "forwarded-from":5},
+    {"tweetid":7,"user":{"screen-name":"ChangEwing_573","lang":"en","friends_count":182,"statuses_count":394,"name":"Chang Ewing","followers_count":32136},"sender-location":point("36.21,72.6"),"send-time":datetime("2011-08-25T10:10:00"),"referred-topics":{{"samsung","platform"}},"message-text":" like samsung the platform is good", "retweeted-from":2, "forwarded-from":7},
+    {"tweetid":8,"user":{"screen-name":"NathanGiesen@211","lang":"en","friends_count":39339,"statuses_count":473,"name":"Nathan Giesen","followers_count":49416},"sender-location":point("46.05,93.34"),"send-time":datetime("2005-10-14T10:10:00"),"referred-topics":{{"t-mobile","shortcut-menu"}},"message-text":" like t-mobile the shortcut-menu is awesome:)", "retweeted-from":3, "forwarded-from":7},
+    {"tweetid":9,"user":{"screen-name":"NathanGiesen@211","lang":"en","friends_count":39339,"statuses_count":473,"name":"Nathan Giesen","followers_count":49416},"sender-location":point("36.86,74.62"),"send-time":datetime("2012-07-21T10:10:00"),"referred-topics":{{"verizon","voicemail-service"}},"message-text":" love verizon its voicemail-service is awesome", "retweeted-from":6, "forwarded-from":2},
+    {"tweetid":10,"user":{"screen-name":"ColineGeyer@63","lang":"en","friends_count":121,"statuses_count":362,"name":"Coline Geyer","followers_count":17159},"sender-location":point("29.15,76.53"),"send-time":datetime("2008-01-26T10:10:00"),"referred-topics":{{"verizon","voice-clarity"}},"message-text":" hate verizon its voice-clarity is OMG:(", "retweeted-from":4, "forwarded-from":5},
+    {"tweetid":11,"user":{"screen-name":"NilaMilliron_tw","lang":"en","friends_count":445,"statuses_count":164,"name":"Nila Milliron","followers_count":22649},"sender-location":point("37.59,68.42"),"send-time":datetime("2008-03-09T10:10:00"),"referred-topics":{{"iphone","platform"}},"message-text":" can't stand iphone its platform is terrible", "retweeted-from":6, "forwarded-from":3},
+    {"tweetid":12,"user":{"screen-name":"OliJackson_512","lang":"en","friends_count":445,"statuses_count":164,"name":"Oli Jackson","followers_count":22649},"sender-location":point("24.82,94.63"),"send-time":datetime("2010-02-13T10:10:00"),"referred-topics":{{"samsung","voice-command"}},"message-text":" like samsung the voice-command is amazing:)", "retweeted-from":6, "forwarded-from":5}
+}})
+
+insert into dataset MyInputGraph
+for $tm in dataset TwitterMsgs
+let $links:={{ {"tweetid":$tm.retweeted-from}, {"tweetid":$tm.forwarded-from}}}
+return {
+      "tweetid": $tm.tweetid,
+      "rank-value": 0.0,
+      "populated-by": $links
+}
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/queries/graph/pregel-q01/pregel-q01.3.query.aql b/asterix-app/src/test/resources/runtimets/queries/graph/pregel-q01/pregel-q01.3.query.aql
new file mode 100644
index 0000000..78fb02e
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/graph/pregel-q01/pregel-q01.3.query.aql
@@ -0,0 +1,9 @@
+use dataverse Pregelix;
+
+run pregel("examples/pregelix-example-0.2.14-SNAPSHOT-jar-with-dependencies.jar"
+ "edu.uci.ics.pregelix.example.PageRankVertex"
+ "-ip 10.0.2.15 -port 3199 -vnum 17")
+from dataset MyInputGraph 
+to dataset MyOutputGraph;
+
+for $n in dataset MyOutputGraph return $n;
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/queries/graph/pregel-q02/pregel-q02.1.ddl.aql b/asterix-app/src/test/resources/runtimets/queries/graph/pregel-q02/pregel-q02.1.ddl.aql
new file mode 100644
index 0000000..1439cc3
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/graph/pregel-q02/pregel-q02.1.ddl.aql
@@ -0,0 +1,16 @@
+drop dataverse Pregelix if exists;
+create dataverse Pregelix
+use dataverse Pregelix
+
+create type EdgeType as open {
+    destVertexId: int64,
+    value: float?
+}
+create type NodeType as open {
+   id: int64,
+   value: int64?,
+   edges: {{EdgeType}}
+}
+
+create dataset InputGraph(NodeType) primary key id;
+create dataset ResultGraph(NodeType) primary key id;
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/queries/graph/pregel-q02/pregel-q02.2.update.aql b/asterix-app/src/test/resources/runtimets/queries/graph/pregel-q02/pregel-q02.2.update.aql
new file mode 100644
index 0000000..5537960
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/graph/pregel-q02/pregel-q02.2.update.aql
@@ -0,0 +1,27 @@
+use dataverse Pregelix;
+
+insert into dataset InputGraph (
+{{
+    {"id":0,"value":0,"edges":{{ {"destVertexId":1}}}},
+{"id":1,"value":1,"edges":{{ {"destVertexId":1},{"destVertexId":2}}}},
+{"id":2,"value":2,"edges":{{ {"destVertexId":1},{"destVertexId":2},{"destVertexId":3}}}},
+{"id":3,"value":3,"edges":{{ {"destVertexId":1},{"destVertexId":2},{"destVertexId":3},{"destVertexId":4}}}},
+{"id":4,"value":4,"edges":{{ {"destVertexId":1},{"destVertexId":2},{"destVertexId":3},{"destVertexId":4},{"destVertexId":5}}}},
+{"id":5,"value":5,"edges":{{ {"destVertexId":1},{"destVertexId":2},{"destVertexId":3},{"destVertexId":4},{"destVertexId":5},{"destVertexId":6}}}},
+{"id":6,"value":6,"edges":{{ {"destVertexId":1},{"destVertexId":2},{"destVertexId":3},{"destVertexId":4},{"destVertexId":5},{"destVertexId":6},{"destVertexId":7}}}},
+{"id":7,"value":7,"edges":{{ {"destVertexId":1},{"destVertexId":2},{"destVertexId":3},{"destVertexId":4},{"destVertexId":5},{"destVertexId":6},{"destVertexId":7},{"destVertexId":8}}}},
+{"id":8,"value":8,"edges":{{ {"destVertexId":1},{"destVertexId":2},{"destVertexId":3},{"destVertexId":4},{"destVertexId":5},{"destVertexId":6},{"destVertexId":7},{"destVertexId":8},{"destVertexId":9}}}},
+{"id":9,"value":9,"edges":{{ {"destVertexId":1},{"destVertexId":2},{"destVertexId":3},{"destVertexId":4},{"destVertexId":5},{"destVertexId":6},{"destVertexId":7},{"destVertexId":8},{"destVertexId":9},{"destVertexId":0}}}},
+{"id":10,"value":10,"edges":{{ {"destVertexId":11}}}},
+{"id":11,"value":11,"edges":{{ {"destVertexId":11},{"destVertexId":12}}}},
+{"id":12,"value":12,"edges":{{ {"destVertexId":11},{"destVertexId":12},{"destVertexId":13}}}},
+{"id":13,"value":13,"edges":{{ {"destVertexId":11},{"destVertexId":12},{"destVertexId":13},{"destVertexId":14}}}},
+{"id":14,"value":14,"edges":{{ {"destVertexId":11},{"destVertexId":12},{"destVertexId":13},{"destVertexId":14},{"destVertexId":15}}}},
+{"id":15,"value":15,"edges":{{ {"destVertexId":11},{"destVertexId":12},{"destVertexId":13},{"destVertexId":14},{"destVertexId":15},{"destVertexId":16}}}},
+{"id":16,"value":16,"edges":{{ {"destVertexId":11},{"destVertexId":12},{"destVertexId":13},{"destVertexId":14},{"destVertexId":15},{"destVertexId":16},{"destVertexId":17}}}},
+{"id":17,"value":17,"edges":{{ {"destVertexId":11},{"destVertexId":12},{"destVertexId":13},{"destVertexId":14},{"destVertexId":15},{"destVertexId":16},{"destVertexId":17},{"destVertexId":18}}}},
+{"id":18,"value":18,"edges":{{ {"destVertexId":11},{"destVertexId":12},{"destVertexId":13},{"destVertexId":14},{"destVertexId":15},{"destVertexId":16},{"destVertexId":17},{"destVertexId":18},{"destVertexId":19}}}},
+{"id":19,"value":19,"edges":{{ {"destVertexId":11},{"destVertexId":12},{"destVertexId":13},{"destVertexId":14},{"destVertexId":15},{"destVertexId":16},{"destVertexId":17},{"destVertexId":18},{"destVertexId":19},   {"destVertexId":10}}}}
+}}
+
+)
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/queries/graph/pregel-q02/pregel-q02.3.query.aql b/asterix-app/src/test/resources/runtimets/queries/graph/pregel-q02/pregel-q02.3.query.aql
new file mode 100644
index 0000000..77c5d7e
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/graph/pregel-q02/pregel-q02.3.query.aql
@@ -0,0 +1,9 @@
+use dataverse Pregelix;
+
+run pregel("examples/pregelix-example-0.2.14-SNAPSHOT-jar-with-dependencies.jar"
+ "edu.uci.ics.pregelix.example.ConnectedComponentsVertex"
+ "-ip 10.0.2.15 -port 3199")
+from dataset InputGraph 
+to dataset ResultGraph;
+
+for $n in dataset ResultGraph return $n;
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/queries/graph/pregel-q03/pregel-q03.1.ddl.aql b/asterix-app/src/test/resources/runtimets/queries/graph/pregel-q03/pregel-q03.1.ddl.aql
new file mode 100644
index 0000000..9642ce3
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/graph/pregel-q03/pregel-q03.1.ddl.aql
@@ -0,0 +1,16 @@
+drop dataverse Pregelix if exists;
+create dataverse Pregelix;
+use dataverse Pregelix;
+
+create type EdgeType as open {
+    destVertexId: int64,
+    cost: float?
+}
+create type NodeType as open {
+    id: int64,
+    value: double?,
+    edges: {{EdgeType}}
+}
+
+create dataset InputGraph(NodeType) primary key id;
+create dataset ResultGraph(NodeType) primary key id;
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/queries/graph/pregel-q03/pregel-q03.2.update.aql b/asterix-app/src/test/resources/runtimets/queries/graph/pregel-q03/pregel-q03.2.update.aql
new file mode 100644
index 0000000..f328614
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/graph/pregel-q03/pregel-q03.2.update.aql
@@ -0,0 +1,16 @@
+use dataverse Pregelix;
+
+insert into dataset InputGraph (
+{{
+{"id":0,  "edges":{{ {"destVertexId":1, "cost":1.0f}}}},
+{"id":1,  "edges":{{ {"destVertexId":3, "cost":4.0f},{"destVertexId":2, "cost":3.0f}}}},
+{"id":2,  "edges":{{ {"destVertexId":4, "cost":5.0f},{"destVertexId":5, "cost":23.0f}}}},
+{"id":3,  "edges":{{ {"destVertexId":2, "cost":1.0f},{"destVertexId":8, "cost":13.0f}}}},
+{"id":4,  "edges":{{ {"destVertexId":1, "cost":5.0f},{"destVertexId":2, "cost":8.0f},{"destVertexId":3, "cost":23.0f},{"destVertexId":4, "cost":12.0f}}}},
+{"id":5,  "edges":{{ {"destVertexId":6, "cost":12.0f},{"destVertexId":7, "cost":17.0f}}}},
+{"id":6,  "edges":{{ {"destVertexId":1, "cost":12.0f},{"destVertexId":2, "cost":1.0f}}}},
+{"id":7,  "edges":{{ {"destVertexId":9, "cost":100.0f}}}},
+{"id":8,  "edges":{{ {"destVertexId":4, "cost":11.0f}}}},
+{"id":9,  "edges":{{ {"destVertexId":1, "cost":16.0f},{"destVertexId":2, "cost":9.0f}}}}
+}}
+)
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/queries/graph/pregel-q03/pregel-q03.3.query.aql b/asterix-app/src/test/resources/runtimets/queries/graph/pregel-q03/pregel-q03.3.query.aql
new file mode 100644
index 0000000..0bb4229
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/graph/pregel-q03/pregel-q03.3.query.aql
@@ -0,0 +1,9 @@
+use dataverse Pregelix;
+
+run pregel("examples/pregelix-example-0.2.14-SNAPSHOT-jar-with-dependencies.jar"
+ "edu.uci.ics.pregelix.example.ShortestPathsVertex"
+ "-ip 10.0.2.15 -port 3199 -source-vertex 0")
+from dataset InputGraph
+to dataset ResultGraph;
+
+for $n in dataset ResultGraph return $n;
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/results/graph/pregel-q01/pregel-q01.1.adm b/asterix-app/src/test/resources/runtimets/results/graph/pregel-q01/pregel-q01.1.adm
new file mode 100644
index 0000000..4b6f605
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/results/graph/pregel-q01/pregel-q01.1.adm
@@ -0,0 +1,13 @@
+[ { "tweetid": 1i64, "rank-value": 0.07349647505081802d, "populated-by": {{ { "tweetid": 2i64, "value": 0.0f }, { "tweetid": 3i64, "value": 0.0f } }} }
+, { "tweetid": 2i64, "rank-value": 0.10497306443795257d, "populated-by": {{ { "tweetid": 7i64, "value": 0.0f }, { "tweetid": 1i64, "value": 0.0f } }} }
+, { "tweetid": 3i64, "rank-value": 0.09458886502271134d, "populated-by": {{ { "tweetid": 8i64, "value": 0.0f }, { "tweetid": 6i64, "value": 0.0f } }} }
+, { "tweetid": 4i64, "rank-value": 0.04783722202788025d, "populated-by": {{ { "tweetid": 1i64, "value": 0.0f }, { "tweetid": 3i64, "value": 0.0f } }} }
+, { "tweetid": 5i64, "rank-value": 0.051587222027880256d, "populated-by": {{ { "tweetid": 8i64, "value": 0.0f }, { "tweetid": 6i64, "value": 0.0f } }} }
+, { "tweetid": 6i64, "rank-value": 0.0822549135142923d, "populated-by": {{ { "tweetid": 4i64, "value": 0.0f }, { "tweetid": 5i64, "value": 0.0f } }} }
+, { "tweetid": 7i64, "rank-value": 0.14484555969829038d, "populated-by": {{ { "tweetid": 2i64, "value": 0.0f }, { "tweetid": 7i64, "value": 0.0f } }} }
+, { "tweetid": 8i64, "rank-value": 0.0710049135142923d, "populated-by": {{ { "tweetid": 3i64, "value": 0.0f }, { "tweetid": 7i64, "value": 0.0f } }} }
+, { "tweetid": 9i64, "rank-value": 0.008823529411764706d, "populated-by": {{ { "tweetid": 6i64, "value": 0.0f }, { "tweetid": 2i64, "value": 0.0f } }} }
+, { "tweetid": 10i64, "rank-value": 0.008823529411764706d, "populated-by": {{ { "tweetid": 4i64, "value": 0.0f }, { "tweetid": 5i64, "value": 0.0f } }} }
+, { "tweetid": 11i64, "rank-value": 0.008823529411764706d, "populated-by": {{ { "tweetid": 6i64, "value": 0.0f }, { "tweetid": 3i64, "value": 0.0f } }} }
+, { "tweetid": 12i64, "rank-value": 0.008823529411764706d, "populated-by": {{ { "tweetid": 6i64, "value": 0.0f }, { "tweetid": 5i64, "value": 0.0f } }} }
+]
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/results/graph/pregel-q02/pregel-q02.1.adm b/asterix-app/src/test/resources/runtimets/results/graph/pregel-q02/pregel-q02.1.adm
new file mode 100644
index 0000000..82091fb
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/results/graph/pregel-q02/pregel-q02.1.adm
@@ -0,0 +1,21 @@
+[ { "id": 0i64, "value": 0i64, "edges": {{ { "destVertexId": 1i64, "value": 0.0f } }} }
+, { "id": 1i64, "value": 0i64, "edges": {{ { "destVertexId": 1i64, "value": 0.0f }, { "destVertexId": 2i64, "value": 0.0f } }} }
+, { "id": 2i64, "value": 0i64, "edges": {{ { "destVertexId": 1i64, "value": 0.0f }, { "destVertexId": 2i64, "value": 0.0f }, { "destVertexId": 3i64, "value": 0.0f } }} }
+, { "id": 3i64, "value": 0i64, "edges": {{ { "destVertexId": 1i64, "value": 0.0f }, { "destVertexId": 2i64, "value": 0.0f }, { "destVertexId": 3i64, "value": 0.0f }, { "destVertexId": 4i64, "value": 0.0f } }} }
+, { "id": 4i64, "value": 0i64, "edges": {{ { "destVertexId": 1i64, "value": 0.0f }, { "destVertexId": 2i64, "value": 0.0f }, { "destVertexId": 3i64, "value": 0.0f }, { "destVertexId": 4i64, "value": 0.0f }, { "destVertexId": 5i64, "value": 0.0f } }} }
+, { "id": 5i64, "value": 0i64, "edges": {{ { "destVertexId": 1i64, "value": 0.0f }, { "destVertexId": 2i64, "value": 0.0f }, { "destVertexId": 3i64, "value": 0.0f }, { "destVertexId": 4i64, "value": 0.0f }, { "destVertexId": 5i64, "value": 0.0f }, { "destVertexId": 6i64, "value": 0.0f } }} }
+, { "id": 6i64, "value": 0i64, "edges": {{ { "destVertexId": 1i64, "value": 0.0f }, { "destVertexId": 2i64, "value": 0.0f }, { "destVertexId": 3i64, "value": 0.0f }, { "destVertexId": 4i64, "value": 0.0f }, { "destVertexId": 5i64, "value": 0.0f }, { "destVertexId": 6i64, "value": 0.0f }, { "destVertexId": 7i64, "value": 0.0f } }} }
+, { "id": 7i64, "value": 0i64, "edges": {{ { "destVertexId": 1i64, "value": 0.0f }, { "destVertexId": 2i64, "value": 0.0f }, { "destVertexId": 3i64, "value": 0.0f }, { "destVertexId": 4i64, "value": 0.0f }, { "destVertexId": 5i64, "value": 0.0f }, { "destVertexId": 6i64, "value": 0.0f }, { "destVertexId": 7i64, "value": 0.0f }, { "destVertexId": 8i64, "value": 0.0f } }} }
+, { "id": 8i64, "value": 0i64, "edges": {{ { "destVertexId": 1i64, "value": 0.0f }, { "destVertexId": 2i64, "value": 0.0f }, { "destVertexId": 3i64, "value": 0.0f }, { "destVertexId": 4i64, "value": 0.0f }, { "destVertexId": 5i64, "value": 0.0f }, { "destVertexId": 6i64, "value": 0.0f }, { "destVertexId": 7i64, "value": 0.0f }, { "destVertexId": 8i64, "value": 0.0f }, { "destVertexId": 9i64, "value": 0.0f } }} }
+, { "id": 9i64, "value": 0i64, "edges": {{ { "destVertexId": 1i64, "value": 0.0f }, { "destVertexId": 2i64, "value": 0.0f }, { "destVertexId": 3i64, "value": 0.0f }, { "destVertexId": 4i64, "value": 0.0f }, { "destVertexId": 5i64, "value": 0.0f }, { "destVertexId": 6i64, "value": 0.0f }, { "destVertexId": 7i64, "value": 0.0f }, { "destVertexId": 8i64, "value": 0.0f }, { "destVertexId": 9i64, "value": 0.0f }, { "destVertexId": 0i64, "value": 0.0f } }} }
+, { "id": 10i64, "value": 10i64, "edges": {{ { "destVertexId": 11i64, "value": 0.0f } }} }
+, { "id": 11i64, "value": 10i64, "edges": {{ { "destVertexId": 11i64, "value": 0.0f }, { "destVertexId": 12i64, "value": 0.0f } }} }
+, { "id": 12i64, "value": 10i64, "edges": {{ { "destVertexId": 11i64, "value": 0.0f }, { "destVertexId": 12i64, "value": 0.0f }, { "destVertexId": 13i64, "value": 0.0f } }} }
+, { "id": 13i64, "value": 10i64, "edges": {{ { "destVertexId": 11i64, "value": 0.0f }, { "destVertexId": 12i64, "value": 0.0f }, { "destVertexId": 13i64, "value": 0.0f }, { "destVertexId": 14i64, "value": 0.0f } }} }
+, { "id": 14i64, "value": 10i64, "edges": {{ { "destVertexId": 11i64, "value": 0.0f }, { "destVertexId": 12i64, "value": 0.0f }, { "destVertexId": 13i64, "value": 0.0f }, { "destVertexId": 14i64, "value": 0.0f }, { "destVertexId": 15i64, "value": 0.0f } }} }
+, { "id": 15i64, "value": 10i64, "edges": {{ { "destVertexId": 11i64, "value": 0.0f }, { "destVertexId": 12i64, "value": 0.0f }, { "destVertexId": 13i64, "value": 0.0f }, { "destVertexId": 14i64, "value": 0.0f }, { "destVertexId": 15i64, "value": 0.0f }, { "destVertexId": 16i64, "value": 0.0f } }} }
+, { "id": 16i64, "value": 10i64, "edges": {{ { "destVertexId": 11i64, "value": 0.0f }, { "destVertexId": 12i64, "value": 0.0f }, { "destVertexId": 13i64, "value": 0.0f }, { "destVertexId": 14i64, "value": 0.0f }, { "destVertexId": 15i64, "value": 0.0f }, { "destVertexId": 16i64, "value": 0.0f }, { "destVertexId": 17i64, "value": 0.0f } }} }
+, { "id": 17i64, "value": 10i64, "edges": {{ { "destVertexId": 11i64, "value": 0.0f }, { "destVertexId": 12i64, "value": 0.0f }, { "destVertexId": 13i64, "value": 0.0f }, { "destVertexId": 14i64, "value": 0.0f }, { "destVertexId": 15i64, "value": 0.0f }, { "destVertexId": 16i64, "value": 0.0f }, { "destVertexId": 17i64, "value": 0.0f }, { "destVertexId": 18i64, "value": 0.0f } }} }
+, { "id": 18i64, "value": 10i64, "edges": {{ { "destVertexId": 11i64, "value": 0.0f }, { "destVertexId": 12i64, "value": 0.0f }, { "destVertexId": 13i64, "value": 0.0f }, { "destVertexId": 14i64, "value": 0.0f }, { "destVertexId": 15i64, "value": 0.0f }, { "destVertexId": 16i64, "value": 0.0f }, { "destVertexId": 17i64, "value": 0.0f }, { "destVertexId": 18i64, "value": 0.0f }, { "destVertexId": 19i64, "value": 0.0f } }} }
+, { "id": 19i64, "value": 10i64, "edges": {{ { "destVertexId": 11i64, "value": 0.0f }, { "destVertexId": 12i64, "value": 0.0f }, { "destVertexId": 13i64, "value": 0.0f }, { "destVertexId": 14i64, "value": 0.0f }, { "destVertexId": 15i64, "value": 0.0f }, { "destVertexId": 16i64, "value": 0.0f }, { "destVertexId": 17i64, "value": 0.0f }, { "destVertexId": 18i64, "value": 0.0f }, { "destVertexId": 19i64, "value": 0.0f }, { "destVertexId": 10i64, "value": 0.0f } }} }
+]
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/results/graph/pregel-q03/pregel-q03.1.adm b/asterix-app/src/test/resources/runtimets/results/graph/pregel-q03/pregel-q03.1.adm
new file mode 100644
index 0000000..38eeba3
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/results/graph/pregel-q03/pregel-q03.1.adm
@@ -0,0 +1,11 @@
+[ { "id": 0i64, "value": 0.0d, "edges": {{ { "destVertexId": 1i64, "cost": 1.0f } }} }
+, { "id": 1i64, "value": 1.0d, "edges": {{ { "destVertexId": 3i64, "cost": 4.0f }, { "destVertexId": 2i64, "cost": 3.0f } }} }
+, { "id": 2i64, "value": 4.0d, "edges": {{ { "destVertexId": 4i64, "cost": 5.0f }, { "destVertexId": 5i64, "cost": 23.0f } }} }
+, { "id": 3i64, "value": 5.0d, "edges": {{ { "destVertexId": 2i64, "cost": 1.0f }, { "destVertexId": 8i64, "cost": 13.0f } }} }
+, { "id": 4i64, "value": 9.0d, "edges": {{ { "destVertexId": 1i64, "cost": 5.0f }, { "destVertexId": 2i64, "cost": 8.0f }, { "destVertexId": 3i64, "cost": 23.0f }, { "destVertexId": 4i64, "cost": 12.0f } }} }
+, { "id": 5i64, "value": 27.0d, "edges": {{ { "destVertexId": 6i64, "cost": 12.0f }, { "destVertexId": 7i64, "cost": 17.0f } }} }
+, { "id": 6i64, "value": 39.0d, "edges": {{ { "destVertexId": 1i64, "cost": 12.0f }, { "destVertexId": 2i64, "cost": 1.0f } }} }
+, { "id": 7i64, "value": 44.0d, "edges": {{ { "destVertexId": 9i64, "cost": 100.0f } }} }
+, { "id": 8i64, "value": 18.0d, "edges": {{ { "destVertexId": 4i64, "cost": 11.0f } }} }
+, { "id": 9i64, "value": 144.0d, "edges": {{ { "destVertexId": 1i64, "cost": 16.0f }, { "destVertexId": 2i64, "cost": 9.0f } }} }
+]
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/testsuite.xml b/asterix-app/src/test/resources/runtimets/testsuite.xml
index c3b097a..baf330c 100644
--- a/asterix-app/src/test/resources/runtimets/testsuite.xml
+++ b/asterix-app/src/test/resources/runtimets/testsuite.xml
@@ -2346,6 +2346,23 @@
             </compilation-unit>
         </test-case>
     </test-group>
+    <!-- <test-group name="graph">
+        <test-case FilePath="graph">
+            <compilation-unit name="pregel-q01">
+                <output-dir compare="Text">pregel-q01</output-dir>
+            </compilation-unit>
+        </test-case>
+        <test-case FilePath="graph">
+            <compilation-unit name="pregel-q02">
+                <output-dir compare="Text">pregel-q02</output-dir>
+            </compilation-unit>
+        </test-case>
+        <test-case FilePath="graph">
+            <compilation-unit name="pregel-q03">
+                <output-dir compare="Text">pregel-q03</output-dir>
+            </compilation-unit>
+        </test-case>
+    </test-group> -->
     <test-group name="index-join">
         <test-case FilePath="index-join">
             <compilation-unit name="btree-primary-equi-join">