merged master back to jarodwen/hotfix/issue363
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/typecast/ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/typecast/
index 6cb4d5c..6e3ab37 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/typecast/
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/typecast/
@@ -17,7 +17,9 @@
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.HashSet;
import java.util.List;
+import java.util.Set;
import org.apache.commons.lang3.mutable.Mutable;
import org.apache.commons.lang3.mutable.MutableObject;
@@ -160,6 +162,10 @@
changed = changed || rewriteFuncExpr(argFuncExpr, exprType, exprType, env);
+ if (!compatible(reqType, inputType)) {
+ throw new AlgebricksException("type mistmach, requred: " + reqType.toString() + " actual: "
+ + inputType.toString());
+ }
return changed;
@@ -489,4 +495,42 @@
TypeComputerUtilities.setRequiredAndInputTypes(cast, reqType, inputType);
+ /**
+ * Determine if two types are compatible
+ *
+ * @param reqType
+ * the required type
+ * @param inputType
+ * the input type
+ * @return true if the two types are compatiable; false otherwise
+ */
+ private static boolean compatible(IAType reqType, IAType inputType) {
+ if (reqType.getTypeTag() == ATypeTag.ANY || inputType.getTypeTag() == ATypeTag.ANY) {
+ return true;
+ }
+ if (reqType.getTypeTag() != ATypeTag.UNION && inputType.getTypeTag() != ATypeTag.UNION) {
+ if (reqType.equals(inputType)) {
+ return true;
+ } else {
+ return false;
+ }
+ }
+ Set<IAType> reqTypePossible = new HashSet<IAType>();
+ Set<IAType> inputTypePossible = new HashSet<IAType>();
+ if (reqType.getTypeTag() == ATypeTag.UNION) {
+ AUnionType unionType = (AUnionType) reqType;
+ reqTypePossible.addAll(unionType.getUnionList());
+ } else {
+ reqTypePossible.add(reqType);
+ }
+ if (inputType.getTypeTag() == ATypeTag.UNION) {
+ AUnionType unionType = (AUnionType) inputType;
+ inputTypePossible.addAll(unionType.getUnionList());
+ } else {
+ inputTypePossible.add(inputType);
+ }
+ return reqTypePossible.equals(inputTypePossible);
+ }
diff --git a/asterix-app/src/test/resources/AQLTS/queries/fieldAccessor.aql b/asterix-app/src/test/resources/AQLTS/queries/fieldAccessor.aql
new file mode 100644
index 0000000..3995374
--- /dev/null
+++ b/asterix-app/src/test/resources/AQLTS/queries/fieldAccessor.aql
@@ -0,0 +1,3 @@
+let $bla := { "name" : "value" }
+ $bla."name" = $
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/AQLTS/queries/functionDecl3.aql b/asterix-app/src/test/resources/AQLTS/queries/functionDecl3.aql
new file mode 100644
index 0000000..80629be5
--- /dev/null
+++ b/asterix-app/src/test/resources/AQLTS/queries/functionDecl3.aql
@@ -0,0 +1,5 @@
+declare function "function with spaces"($a, $b) {
+ "string with spaces"
+"function with spaces" (1, 2)
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/optimizerts/results/scan-delete-rtree-secondary-index.plan b/asterix-app/src/test/resources/optimizerts/results/scan-delete-rtree-secondary-index.plan
index bf8978b..4f0d931 100644
--- a/asterix-app/src/test/resources/optimizerts/results/scan-delete-rtree-secondary-index.plan
+++ b/asterix-app/src/test/resources/optimizerts/results/scan-delete-rtree-secondary-index.plan
@@ -3,23 +3,21 @@
- -- STABLE_SORT [$$29(ASC), $$30(ASC), $$31(ASC), $$32(ASC)] |PARTITIONED|
\ No newline at end of file
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/optimizerts/results/scan-insert-secondary-index.plan b/asterix-app/src/test/resources/optimizerts/results/scan-insert-secondary-index.plan
index e6cf237..1458c66 100644
--- a/asterix-app/src/test/resources/optimizerts/results/scan-insert-secondary-index.plan
+++ b/asterix-app/src/test/resources/optimizerts/results/scan-insert-secondary-index.plan
@@ -3,28 +3,24 @@
diff --git a/asterix-app/src/test/resources/runtimets/queries/custord/join_q_04/join_q_04.1.ddl.aql b/asterix-app/src/test/resources/runtimets/queries/custord/join_q_04/join_q_04.1.ddl.aql
index 2fbbf2f..882fecb 100644
--- a/asterix-app/src/test/resources/runtimets/queries/custord/join_q_04/join_q_04.1.ddl.aql
+++ b/asterix-app/src/test/resources/runtimets/queries/custord/join_q_04/join_q_04.1.ddl.aql
@@ -1,3 +1,10 @@
+ * Description : This test case is to verify the fix for issue51
+ :
+ * Expected Res : SUCCESS
+ * Date : 14th May 2013
+ */
drop dataverse test if exists;
create dataverse test;
diff --git a/asterix-app/src/test/resources/runtimets/queries/custord/join_q_04/join_q_04.2.update.aql b/asterix-app/src/test/resources/runtimets/queries/custord/join_q_04/join_q_04.2.update.aql
index e69de29..45fba70 100644
--- a/asterix-app/src/test/resources/runtimets/queries/custord/join_q_04/join_q_04.2.update.aql
+++ b/asterix-app/src/test/resources/runtimets/queries/custord/join_q_04/join_q_04.2.update.aql
@@ -0,0 +1,6 @@
+ * Description : This test case is to verify the fix for issue51
+ :
+ * Expected Res : SUCCESS
+ * Date : 14th May 2013
+ */
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/queries/custord/join_q_04/join_q_04.3.query.aql b/asterix-app/src/test/resources/runtimets/queries/custord/join_q_04/join_q_04.3.query.aql
index 9996053..9781eb5 100644
--- a/asterix-app/src/test/resources/runtimets/queries/custord/join_q_04/join_q_04.3.query.aql
+++ b/asterix-app/src/test/resources/runtimets/queries/custord/join_q_04/join_q_04.3.query.aql
@@ -1,7 +1,19 @@
+ * Description : This test case is to verify the fix for issue51
+ :
+ * Expected Res : SUCCESS
+ * Date : 14th May 2013
+ */
use dataverse test;
-for $c in dataset('Customers')
-return {"order_id" :
-for $o in dataset('Orders')
-where $c.cid = $o.cid
-return $o.oid }
+for $c in dataset Customers
+order by $
+return {
+ "cust_name": $,
+ "order_ids":
+ for $o in dataset Orders
+ where $c.cid = $o.cid
+ order by $o.oid
+ return $o.oid
diff --git a/asterix-app/src/test/resources/runtimets/queries/open-closed/query-issue377/query-issue377.1.ddl.aql b/asterix-app/src/test/resources/runtimets/queries/open-closed/query-issue377/query-issue377.1.ddl.aql
new file mode 100644
index 0000000..4722e4f
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/open-closed/query-issue377/query-issue377.1.ddl.aql
@@ -0,0 +1,39 @@
+ * Description : This test case is to verify the fix for issue377
+ :
+ * Expected Res : Success
+ * Date : 11th May 2013
+ */
+drop dataverse TinySocial if exists;
+create dataverse TinySocial;
+use dataverse TinySocial;
+create type TwitterUserType as open {
+ screen-name: string
+create type TweetMessageType as open {
+ tweetid: string
+create type FacebookUserType as open {
+ id: int32
+create type FacebookMessageType as open {
+ message-id: int32
+create dataset FacebookUsers(FacebookUserType)
+primary key id;
+create dataset FacebookMessages(FacebookMessageType)
+primary key message-id;
+create dataset TwitterUsers(TwitterUserType)
+primary key screen-name;
+create dataset TweetMessages(TweetMessageType)
+primary key tweetid
diff --git a/asterix-app/src/test/resources/runtimets/queries/open-closed/query-issue377/query-issue377.2.update.aql b/asterix-app/src/test/resources/runtimets/queries/open-closed/query-issue377/query-issue377.2.update.aql
new file mode 100644
index 0000000..1d6c5ba
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/open-closed/query-issue377/query-issue377.2.update.aql
@@ -0,0 +1,15 @@
+ * Description : This test case is to verify the fix for issue377
+ :
+ * Expected Res : Success
+ * Date : 11th May 2013
+ */
+use dataverse TinySocial;
+load dataset FacebookUsers using localfs
+load dataset TweetMessages using localfs
diff --git a/asterix-app/src/test/resources/runtimets/queries/open-closed/query-issue377/query-issue377.3.query.aql b/asterix-app/src/test/resources/runtimets/queries/open-closed/query-issue377/query-issue377.3.query.aql
new file mode 100644
index 0000000..81d6cf6
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/open-closed/query-issue377/query-issue377.3.query.aql
@@ -0,0 +1,24 @@
+ * Description : This test case is to verify the fix for issue377
+ :
+ * Expected Res : Success
+ * Date : 11th May 2013
+ */
+use dataverse TinySocial;
+set simfunction "edit-distance";
+set simthreshold "3";
+for $fbu in dataset FacebookUsers
+return {
+ "id": $,
+ "name": $,
+ "similar-users": for $t in dataset TweetMessages
+ let $tu := $t.user
+ where $ ~= $
+ return {
+ "twitter-screenname": $tu.screen-name,
+ "twitter-name": $
+ }
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/queries/open-closed/query-issue410/query-issue410.1.ddl.aql b/asterix-app/src/test/resources/runtimets/queries/open-closed/query-issue410/query-issue410.1.ddl.aql
new file mode 100644
index 0000000..0da0aa5
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/open-closed/query-issue410/query-issue410.1.ddl.aql
@@ -0,0 +1,17 @@
+ * Description : This test case is to verify the fix for issue410
+ :
+ * Expected Res : Fail
+ * Date : 13th May 2013
+ */
+drop dataverse test if exists;
+create dataverse test;
+use dataverse test;
+create type Emp as open {
+create dataset Employee(Emp) primary key id;
diff --git a/asterix-app/src/test/resources/runtimets/queries/open-closed/query-issue410/query-issue410.2.update.aql b/asterix-app/src/test/resources/runtimets/queries/open-closed/query-issue410/query-issue410.2.update.aql
new file mode 100644
index 0000000..796c5a4
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/open-closed/query-issue410/query-issue410.2.update.aql
@@ -0,0 +1,10 @@
+ * Description : This test case is to verify the fix for issue410
+ :
+ * Expected Res : Fail
+ * Date : 11th May 2013
+ */
+use dataverse test;
+insert into dataset Employee({"id":float("59138237473282.3293"), "name": double("0.01")});
diff --git a/asterix-app/src/test/resources/runtimets/queries/open-closed/query-issue410/query-issue410.3.query.aql b/asterix-app/src/test/resources/runtimets/queries/open-closed/query-issue410/query-issue410.3.query.aql
new file mode 100644
index 0000000..d76caba
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/open-closed/query-issue410/query-issue410.3.query.aql
@@ -0,0 +1,11 @@
+ * Description : This test case is to verify the fix for issue410
+ :
+ * Expected Res : Fail
+ * Date : 11th May 2013
+ */
+use dataverse test;
+for $x in dataset('Employee')
+return $x
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/results/custord/join_q_04/join_q_04.1.adm b/asterix-app/src/test/resources/runtimets/results/custord/join_q_04/join_q_04.1.adm
index d4721dd..51f998a 100644
--- a/asterix-app/src/test/resources/runtimets/results/custord/join_q_04/join_q_04.1.adm
+++ b/asterix-app/src/test/resources/runtimets/results/custord/join_q_04/join_q_04.1.adm
@@ -1,3 +1,5 @@
-{ "cust_name": "Jodi Rotruck", "orderedlist": [ 1000, 66, 775 ], "unorderedlist": {{ 1000, 66, 775 }}, "ol_item1": 1000, "ol_item2": 66, "ol_item5": null, "ul_item1": 1000 }
-{ "cust_name": "Jodi Alex", "orderedlist": [ 10, 48, 5 ], "unorderedlist": {{ 10, 48, 5 }}, "ol_item1": 10, "ol_item2": 48, "ol_item5": null, "ul_item1": 10 }
-{ "cust_name": "Jodi Rotruck", "orderedlist": [ 10, 66, 775 ], "unorderedlist": {{ 10, 66, 775 }}, "ol_item1": 10, "ol_item2": 66, "ol_item5": null, "ul_item1": 10 }
+{ "cust_name": "Jodi Alex", "order_ids": [ 10 ] }
+{ "cust_name": "Jodi Rotruck", "order_ids": [ 10, 1000 ] }
+{ "cust_name": "Mary Carey", "order_ids": [ ] }
+{ "cust_name": "Mike Carey", "order_ids": [ ] }
+{ "cust_name": "Mike ley", "order_ids": [ ] }
diff --git a/asterix-app/src/test/resources/runtimets/results/open-closed/query-issue377/query-issue377.1.adm b/asterix-app/src/test/resources/runtimets/results/open-closed/query-issue377/query-issue377.1.adm
new file mode 100644
index 0000000..c3bb80f
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/results/open-closed/query-issue377/query-issue377.1.adm
@@ -0,0 +1,29 @@
+{ "id": 9142198, "similar-users": [ ], "name": "SherryFea" }
+{ "id": 9313492, "similar-users": [ ], "name": "TeraWolfe" }
+{ "id": 9478720, "similar-users": [ ], "name": "AngeliaKettlewell" }
+{ "id": 10001080, "similar-users": [ ], "name": "GarrettBode" }
+{ "id": 10179538, "similar-users": [ ], "name": "OrlandoBaxter" }
+{ "id": 10307032, "similar-users": [ ], "name": "QuentinSauter" }
+{ "id": 10394488, "similar-users": [ ], "name": "OswaldRay" }
+{ "id": 10423588, "similar-users": [ ], "name": "ShirleneRuch" }
+{ "id": 10495420, "similar-users": [ ], "name": "WendyMcloskey" }
+{ "id": 11307946, "similar-users": [ ], "name": "HelgaStough" }
+{ "id": 11447332, "similar-users": [ ], "name": "SherisseMaugham" }
+{ "id": 11570326, "similar-users": [ ], "name": "LindenFilby" }
+{ "id": 11951098, "similar-users": [ ], "name": "TeraByers" }
+{ "id": 11954992, "similar-users": [ ], "name": "CaitlinLangston" }
+{ "id": 9510451, "similar-users": [ ], "name": "ChuckFinck" }
+{ "id": 9594523, "similar-users": [ ], "name": "TamWillcox" }
+{ "id": 9629395, "similar-users": [ ], "name": "JuliusWire" }
+{ "id": 9988417, "similar-users": [ ], "name": "ColineLane" }
+{ "id": 10272571, "similar-users": [ ], "name": "JarrettGoldvogel" }
+{ "id": 10361965, "similar-users": [ ], "name": "ArlenFlick" }
+{ "id": 10498285, "similar-users": [ ], "name": "KileyBridger" }
+{ "id": 10733617, "similar-users": [ ], "name": "LeonardoKight" }
+{ "id": 10874791, "similar-users": [ ], "name": "HaydeeGarratt" }
+{ "id": 10957867, "similar-users": [ ], "name": "ZachOppenheimer" }
+{ "id": 11061631, "similar-users": [ ], "name": "MaxeneKellogg" }
+{ "id": 11068231, "similar-users": [ ], "name": "DinahSwink" }
+{ "id": 11140213, "similar-users": [ ], "name": "MontgomeryWhittier" }
+{ "id": 11381089, "similar-users": [ ], "name": "EarleneAmmons" }
+{ "id": 11675221, "similar-users": [ ], "name": "CalantheGearhart" }
diff --git a/asterix-app/src/test/resources/runtimets/results/open-closed/query-issue410/query-issue410.1.adm b/asterix-app/src/test/resources/runtimets/results/open-closed/query-issue410/query-issue410.1.adm
new file mode 100644
index 0000000..ee2dfa4
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/results/open-closed/query-issue410/query-issue410.1.adm
@@ -0,0 +1 @@
+{"id":0, "name": ""}
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/testsuite.xml b/asterix-app/src/test/resources/runtimets/testsuite.xml
index 4a61971..07a8f4a 100644
--- a/asterix-app/src/test/resources/runtimets/testsuite.xml
+++ b/asterix-app/src/test/resources/runtimets/testsuite.xml
@@ -761,13 +761,11 @@
<output-dir compare="Text">join_q_03</output-dir>
- <!--
<test-case FilePath="custord">
<compilation-unit name="join_q_04">
<output-dir compare="Text">join_q_04</output-dir>
- -->
<test-case FilePath="custord">
<compilation-unit name="load-test">
<output-dir compare="Text">load-test</output-dir>
@@ -2773,6 +2771,17 @@
<output-dir compare="Text">query-issue196</output-dir>
+ <test-case FilePath="open-closed">
+ <compilation-unit name="query-issue377">
+ <output-dir compare="Text">query-issue377</output-dir>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="open-closed">
+ <compilation-unit name="query-issue410">
+ <output-dir compare="Text">query-issue40</output-dir>
+ <expected-error>edu.uci.ics.asterix.common.exceptions.AsterixException</expected-error>
+ </compilation-unit>
+ </test-case>
<test-group name="quantifiers">
<test-case FilePath="quantifiers">
diff --git a/asterix-aql/src/main/javacc/AQL.jj b/asterix-aql/src/main/javacc/AQL.jj
index 68e1497..dc01648 100644
--- a/asterix-aql/src/main/javacc/AQL.jj
+++ b/asterix-aql/src/main/javacc/AQL.jj
@@ -74,33 +74,41 @@
// data generator hints
private static final String DGEN_HINT = "dgen";
+ private static class IndexParams {
+ public IndexType type;
+ public int gramLength;
+ public IndexParams(IndexType type, int gramLength) {
+ this.type = type;
+ this.gramLength = gramLength;
+ }
+ };
private static String getHint(Token t) {
- if (t.specialToken == null) {
- return null;
- }
- String s = t.specialToken.image;
- int n = s.length();
- if (n < 2) {
- return null;
- }
- return s.substring(1).trim();
+ if (t.specialToken == null) {
+ return null;
+ }
+ String s = t.specialToken.image;
+ int n = s.length();
+ if (n < 2) {
+ return null;
+ }
+ return s.substring(1).trim();
public AQLParser(String s){
- this(new StringReader(s));
- super.setInput(s);
- }
- public static void main(String args[]) throws ParseException, TokenMgrError, IOException, FileNotFoundException, AsterixException {
- File file = new File(args[0]);
- Reader fis = new BufferedReader(new InputStreamReader(new FileInputStream(file), "UTF-8"));
- AQLParser parser = new AQLParser(fis);
- List<Statement> st = parser.Statement();
- //st.accept(new AQLPrintVisitor(), 0);
- }
+ this(new StringReader(s));
+ super.setInput(s);
+ }
+ public static void main(String args[]) throws ParseException, TokenMgrError, IOException, FileNotFoundException, AsterixException {
+ File file = new File(args[0]);
+ Reader fis = new BufferedReader(new InputStreamReader(new FileInputStream(file), "UTF-8"));
+ AQLParser parser = new AQLParser(fis);
+ List<Statement> st = parser.Statement();
+ //st.accept(new AQLPrintVisitor(), 0);
+ }
@@ -110,235 +118,563 @@
List<Statement> decls = new ArrayList<Statement>();
- Query query=null;
+ Statement stmt = null;
- (
- (
- (
- "use"
- {
- decls.add(DataverseDeclaration());
- }
- | "declare" "function" {
- decls.add(FunctionDeclaration());
- }
- | "create" (
- {
- String hint = getHint(token);
- boolean dgen = false;
- if (hint != null && hint.startsWith(DGEN_HINT)) {
- dgen = true;
- }
- }
- "type"
- {
- decls.add(TypeDeclaration(dgen, hint));
- }
- | "nodegroup"
- {
- decls.add(NodegroupDeclaration());
- }
- | "external" <DATASET>
- {
- decls.add(DatasetDeclaration(DatasetType.EXTERNAL));
- }
- | "feed" <DATASET>
- {
- decls.add(DatasetDeclaration(DatasetType.FEED));
- }
- {
- decls.add(DatasetDeclaration(DatasetType.INTERNAL));
- }
- | "index"
- {
- decls.add(CreateIndexStatement());
- }
- | "dataverse"
- {
- decls.add(CreateDataverseStatement());
- }
- | "function"
- {
- decls.add(FunctionCreation());
- }
- )
- | "load" {
- decls.add(LoadStatement());
- }
- | "drop"
- (
- {
- decls.add(DropStatement());
- }
- | "index"
- {
- decls.add(IndexDropStatement());
- }
- | "nodegroup"
- {
- decls.add(NodeGroupDropStatement());
- }
- | "type"
- {
- decls.add(TypeDropStatement());
- }
- | "dataverse"
- {
- decls.add(DataverseDropStatement());
- }
- | "function"
- {
- decls.add(FunctionDropStatement());
- }
- )
- | "write" {
- decls.add(WriteStatement());
- }
- | "set" {
- decls.add(SetStatement());
- }
- | "insert" {
- decls.add(InsertStatement());
- }
- | "delete" {
- decls.add(DeleteStatement());
- }
- | "update" {
- decls.add(UpdateStatement());
- }
- | "begin" "feed"
- {
- Pair<Identifier,Identifier> nameComponents = getDotSeparatedPair();
- decls.add(new BeginFeedStatement(nameComponents.first, nameComponents.second, getVarCounter()));
- } ";"
- | "suspend" "feed"
- {
- decls.add(ControlFeedDeclaration(ControlFeedStatement.OperationType.SUSPEND));
- } ";"
- | "resume" "feed" {
- decls.add(ControlFeedDeclaration(ControlFeedStatement.OperationType.RESUME));
- } ";"
- | "end" "feed" {
- decls.add(ControlFeedDeclaration(ControlFeedStatement.OperationType.END));
- } ";"
- | "alter" "feed" {
- decls.add(AlterFeedDeclaration());
- } ";"
- | (query = Query()) {
- decls.add(query);
- }
- )*
- // (query = Query())?
- )
- <EOF>
- )
+ ( stmt = SingleStatement() (";") ?
- return decls;
+ decls.add(stmt);
+ }
+ )*
+ <EOF>
+ {
+ return decls;
+ }
+Statement SingleStatement() throws ParseException:
+ Statement stmt = null;
+ (
+ stmt = DataverseDeclaration()
+ | stmt = FunctionDeclaration()
+ | stmt = CreateStatement()
+ | stmt = LoadStatement()
+ | stmt = DropStatement()
+ | stmt = WriteStatement()
+ | stmt = SetStatement()
+ | stmt = InsertStatement()
+ | stmt = DeleteStatement()
+ | stmt = UpdateStatement()
+ | stmt = FeedStatement()
+ | stmt = Query()
+ )
+ {
+ return stmt;
+ }
+DataverseDecl DataverseDeclaration() throws ParseException:
+ String dvName = null;
+ "use" "dataverse" dvName = Identifier()
+ {
+ defaultDataverse = dvName;
+ return new DataverseDecl(new Identifier(dvName));
+ }
+Statement CreateStatement() throws ParseException:
+ String hint = null;
+ boolean dgen = false;
+ Statement stmt = null;
+ "create"
+ (
+ {
+ hint = getHint(token);
+ if (hint != null && hint.startsWith(DGEN_HINT)) {
+ dgen = true;
+ }
+ }
+ stmt = TypeSpecification(hint, dgen)
+ | stmt = NodegroupSpecification()
+ | stmt = DatasetSpecification()
+ | stmt = IndexSpecification()
+ | stmt = DataverseSpecification()
+ | stmt = FunctionSpecification()
+ )
+ {
+ return stmt;
+ }
+TypeDecl TypeSpecification(String hint, boolean dgen) throws ParseException:
+ Pair<Identifier,Identifier> nameComponents = null;
+ boolean ifNotExists = false;
+ TypeExpression typeExpr = null;
+ "type" nameComponents = FunctionOrTypeName() ifNotExists = IfNotExists()
+ "as" typeExpr = TypeExpr()
+ {
+ long numValues = -1;
+ String filename = null;
+ if (dgen) {
+ String splits[] = hint.split(" +");
+ if (splits.length != 3) {
+ throw new ParseException("Expecting /*+ dgen <filename> <numberOfItems> */");
+ }
+ filename = splits[1];
+ numValues = Long.parseLong(splits[2]);
+ }
+ TypeDataGen tddg = new TypeDataGen(dgen, filename, numValues);
+ return new TypeDecl(nameComponents.first, nameComponents.second, typeExpr, tddg, ifNotExists);
+ }
+NodegroupDecl NodegroupSpecification() throws ParseException:
+ String name = null;
+ String tmp = null;
+ boolean ifNotExists = false;
+ List<Identifier>ncNames = null;
+ "nodegroup" name = Identifier()
+ ifNotExists = IfNotExists() "on" tmp = Identifier()
+ {
+ ncNames = new ArrayList<Identifier>();
+ ncNames.add(new Identifier(tmp));
+ }
+ ( "," tmp = Identifier()
+ {
+ ncNames.add(new Identifier(tmp));
+ }
+ )*
+ {
+ return new NodegroupDecl(new Identifier(name), ncNames, ifNotExists);
+ }
+DatasetDecl DatasetSpecification() throws ParseException:
+ Pair<Identifier,Identifier> nameComponents = null;
+ boolean ifNotExists = false;
+ String typeName = null;
+ String adapterName = null;
+ Map<String,String> properties = null;
+ FunctionSignature appliedFunction = null;
+ List<String> primaryKeyFields = null;
+ String nodeGroupName = null;
+ Map<String,String> hints = new HashMap<String,String>();
+ DatasetDecl dsetDecl = null;
+ (
+ "external" <DATASET> nameComponents = QualifiedName()
+ <LEFTPAREN> typeName = Identifier() <RIGHTPAREN>
+ ifNotExists = IfNotExists()
+ "using" adapterName = AdapterName() properties = Configuration()
+ ( "hints" hints = Properties() )?
+ {
+ ExternalDetailsDecl edd = new ExternalDetailsDecl();
+ edd.setAdapter(adapterName);
+ edd.setProperties(properties);
+ dsetDecl = new DatasetDecl(nameComponents.first,
+ nameComponents.second,
+ new Identifier(typeName),
+ hints,
+ DatasetType.EXTERNAL,
+ edd,
+ ifNotExists);
+ }
+ | "feed" <DATASET> nameComponents = QualifiedName()
+ <LEFTPAREN> typeName = Identifier() <RIGHTPAREN>
+ ifNotExists = IfNotExists()
+ "using" adapterName = AdapterName() properties = Configuration()
+ (appliedFunction = ApplyFunction())? primaryKeyFields = PrimaryKey()
+ ( "on" nodeGroupName = Identifier() )?
+ ( "hints" hints = Properties() )?
+ {
+ FeedDetailsDecl fdd = new FeedDetailsDecl(adapterName,
+ properties,
+ appliedFunction,
+ nodeGroupName != null
+ ? new Identifier(nodeGroupName)
+ : null,
+ primaryKeyFields);
+ dsetDecl = new DatasetDecl(nameComponents.first,
+ nameComponents.second,
+ new Identifier(typeName),
+ hints,
+ DatasetType.FEED,
+ fdd,
+ ifNotExists);
+ }
+ | <DATASET> nameComponents = QualifiedName()
+ <LEFTPAREN> typeName = Identifier() <RIGHTPAREN>
+ ifNotExists = IfNotExists()
+ primaryKeyFields = PrimaryKey() ("on" nodeGroupName = Identifier() )?
+ ( "hints" hints = Properties() )?
+ {
+ InternalDetailsDecl idd = new InternalDetailsDecl(nodeGroupName != null
+ ? new Identifier(nodeGroupName)
+ : null,
+ primaryKeyFields);
+ dsetDecl = new DatasetDecl(nameComponents.first,
+ nameComponents.second,
+ new Identifier(typeName),
+ hints,
+ DatasetType.INTERNAL,
+ idd,
+ ifNotExists);
+ }
+ )
+ {
+ return dsetDecl;
+ }
+CreateIndexStatement IndexSpecification() throws ParseException:
+ CreateIndexStatement cis = new CreateIndexStatement();
+ String indexName = null;
+ String fieldExpr = null;
+ boolean ifNotExists = false;
+ Pair<Identifier,Identifier> nameComponents = null;
+ IndexParams indexType = null;
+ "index" indexName = Identifier()
+ ifNotExists = IfNotExists()
+ "on" nameComponents = QualifiedName()
+ <LEFTPAREN> ( fieldExpr = Identifier()
+ {
+ cis.addFieldExpr(fieldExpr);
+ }
+ ) ("," fieldExpr = Identifier()
+ {
+ cis.addFieldExpr(fieldExpr);
+ }
+ )* <RIGHTPAREN> ( "type" indexType = IndexType() )?
+ {
+ cis.setIndexName(new Identifier(indexName));
+ cis.setIfNotExists(ifNotExists);
+ cis.setDataverseName(nameComponents.first);
+ cis.setDatasetName(nameComponents.second);
+ if (indexType != null) {
+ cis.setIndexType(indexType.type);
+ cis.setGramLength(indexType.gramLength);
+ }
+ return cis;
+ }
+IndexParams IndexType() throws ParseException:
+ IndexType type = null;
+ int gramLength = 0;
+ ("btree"
+ {
+ type = IndexType.BTREE;
+ }
+ | "rtree"
+ {
+ type = IndexType.RTREE;
+ }
+ | "keyword"
+ {
+ type = IndexType.WORD_INVIX;
+ }
+ | "fuzzy keyword"
+ {
+ type = IndexType.FUZZY_WORD_INVIX;
+ }
+ {
+ type = IndexType.NGRAM_INVIX;
+ gramLength = Integer.valueOf(token.image);
+ }
+ | "fuzzy ngram" <LEFTPAREN> <INTEGER_LITERAL>
+ {
+ type = IndexType.FUZZY_NGRAM_INVIX;
+ gramLength = Integer.valueOf(token.image);
+ }
+ {
+ return new IndexParams(type, gramLength);
+ }
+CreateDataverseStatement DataverseSpecification() throws ParseException :
+ String dvName = null;
+ boolean ifNotExists = false;
+ String format = null;
+ "dataverse" dvName = Identifier()
+ ifNotExists = IfNotExists()
+ ( "with format" format = StringLiteral() )?
+ {
+ return new CreateDataverseStatement(new Identifier(dvName), format, ifNotExists);
+ }
+CreateFunctionStatement FunctionSpecification() throws ParseException:
+ FunctionSignature signature;
+ boolean ifNotExists = false;
+ List<VarIdentifier> paramList = new ArrayList<VarIdentifier>();
+ String functionBody;
+ VarIdentifier var = null;
+ Expression functionBodyExpr;
+ Token beginPos;
+ Token endPos;
+ Pair<Identifier,Identifier> nameComponents=null;
+ createNewScope();
+ "function" nameComponents = FunctionOrTypeName()
+ ifNotExists = IfNotExists()
+ {
+ var = new VarIdentifier();
+ var.setValue(token.image);
+ paramList.add(var);
+ getCurrentScope().addNewVarSymbolToScope(var);
+ }
+ ("," <VARIABLE>
+ {
+ var = new VarIdentifier();
+ var.setValue(token.image);
+ paramList.add(var);
+ getCurrentScope().addNewVarSymbolToScope(var);
+ }
+ )*)? <RIGHTPAREN> "{"
+ {
+ beginPos = token;
+ }
+ functionBodyExpr = Expression() "}"
+ {
+ endPos = token;
+ functionBody = extractFragment(beginPos.beginLine, beginPos.beginColumn, endPos.beginLine, endPos.beginColumn);
+ String dataverse = nameComponents.first.getValue();
+ String functionName = nameComponents.second.getValue();
+ signature = new FunctionSignature(dataverse, functionName, paramList.size());
+ getCurrentScope().addFunctionDescriptor(signature, false);
+ return new CreateFunctionStatement(signature, paramList, functionBody, ifNotExists);
+ }
+boolean IfNotExists() throws ParseException:
+ ( "if not exists"
+ {
+ return true;
+ }
+ )?
+ {
+ return false;
+ }
+FunctionSignature ApplyFunction() throws ParseException:
+ FunctionSignature funcSig = null;
+ "apply" "function" funcSig = FunctionSignature()
+ {
+ return funcSig;
+ }
+FunctionSignature FunctionSignature() throws ParseException:
+ Pair<Identifier,Identifier> pairId = null;
+ int arity = 0;
+ pairId = FunctionOrTypeName() "@" <INTEGER_LITERAL>
+ {
+ arity = new Integer(token.image);
+ if (arity < 0 && arity != FunctionIdentifier.VARARGS) {
+ throw new ParseException(" invalid arity:" + arity);
+ }
+ String dataverse = pairId.first.getValue();
+ String functionName = pairId.second.getValue();
+ return new FunctionSignature(dataverse, functionName, arity);
+ }
+List<String> PrimaryKey() throws ParseException:
+ String tmp = null;
+ List<String> primaryKeyFields = new ArrayList<String>();
+ "primary" "key" tmp = Identifier()
+ {
+ primaryKeyFields.add(tmp);
+ }
+ ( "," tmp = Identifier()
+ {
+ primaryKeyFields.add(tmp);
+ }
+ )*
+ {
+ return primaryKeyFields;
+ }
+Statement DropStatement() throws ParseException:
+ String id = null;
+ Pair<Identifier,Identifier> pairId = null;
+ Triple<Identifier,Identifier,Identifier> tripleId = null;
+ FunctionSignature funcSig = null;
+ boolean ifExists = false;
+ Statement stmt = null;
+ "drop"
+ (
+ <DATASET> pairId = QualifiedName() ifExists = IfExists()
+ {
+ stmt = new DropStatement(pairId.first, pairId.second, ifExists);
+ }
+ | "index" tripleId = DoubleQualifiedName() ifExists = IfExists()
+ {
+ stmt = new IndexDropStatement(tripleId.first, tripleId.second, tripleId.third, ifExists);
+ }
+ | "nodegroup" id = Identifier() ifExists = IfExists()
+ {
+ stmt = new NodeGroupDropStatement(new Identifier(id), ifExists);
+ }
+ | "type" pairId = FunctionOrTypeName() ifExists = IfExists()
+ {
+ stmt = new TypeDropStatement(pairId.first, pairId.second, ifExists);
+ }
+ | "dataverse" id = Identifier() ifExists = IfExists()
+ {
+ stmt = new DataverseDropStatement(new Identifier(id), ifExists);
+ }
+ | "function" funcSig = FunctionSignature() ifExists = IfExists()
+ {
+ stmt = new FunctionDropStatement(funcSig, ifExists);
+ }
+ )
+ {
+ return stmt;
+ }
+boolean IfExists() throws ParseException :
+ ( "if" "exists"
+ {
+ return true;
+ }
+ )?
+ {
+ return false;
InsertStatement InsertStatement() throws ParseException:
- Identifier dataverseName;
- Identifier datasetName;
- Pair<Identifier,Identifier> nameComponents = null;
- Query query;
+ Pair<Identifier,Identifier> nameComponents = null;
+ Query query;
- "into" <DATASET>
- {
- nameComponents = getDotSeparatedPair();
- dataverseName = nameComponents.first;
- datasetName = nameComponents.second;
- }
- query = Query() (";")?
- {return new InsertStatement(dataverseName, datasetName, query, getVarCounter());}
+ "insert" "into" <DATASET> nameComponents = QualifiedName() query = Query()
+ {
+ return new InsertStatement(nameComponents.first, nameComponents.second, query, getVarCounter());
+ }
DeleteStatement DeleteStatement() throws ParseException:
- VariableExpr var = null;
- Identifier dataverseName;
- Identifier datasetName = null;
- Expression condition = null;
- Pair<Identifier, Identifier> nameComponents;
+ VariableExpr var = null;
+ Expression condition = null;
+ Pair<Identifier, Identifier> nameComponents;
- var = Variable() { getCurrentScope().addNewVarSymbolToScope(var.getVar()); }
- "from"
- {
- nameComponents = getDotSeparatedPair();
- }
- ("where" condition = Expression())? (";")?
- {return new DeleteStatement(var, nameComponents.first, nameComponents.second, condition, getVarCounter()); }
+ "delete" var = Variable()
+ {
+ getCurrentScope().addNewVarSymbolToScope(var.getVar());
+ }
+ "from" <DATASET> nameComponents = QualifiedName()
+ ("where" condition = Expression())?
+ {
+ return new DeleteStatement(var, nameComponents.first, nameComponents.second, condition, getVarCounter());
+ }
UpdateStatement UpdateStatement() throws ParseException:
- VariableExpr vars;
- Expression target;
- Expression condition;
- UpdateClause uc;
- List<UpdateClause> ucs = new ArrayList<UpdateClause>();
+ VariableExpr vars;
+ Expression target;
+ Expression condition;
+ UpdateClause uc;
+ List<UpdateClause> ucs = new ArrayList<UpdateClause>();
- vars = Variable() "in" target = Expression()
- "where" condition = Expression()
- <LEFTPAREN> (uc=UpdateClause() {ucs.add(uc); } ("," uc=UpdateClause() {ucs.add(uc); } )*) <RIGHTPAREN> ";"
- {return new UpdateStatement(vars, target, condition, ucs);}
+ "update" vars = Variable() "in" target = Expression()
+ "where" condition = Expression()
+ <LEFTPAREN> (uc = UpdateClause()
+ {
+ ucs.add(uc);
+ }
+ ("," uc = UpdateClause()
+ {
+ ucs.add(uc);
+ }
+ {
+ return new UpdateStatement(vars, target, condition, ucs);
+ }
UpdateClause UpdateClause() throws ParseException:
- Expression target = null;
- Expression value = null ;
- InsertStatement is = null;
- DeleteStatement ds = null;
- UpdateStatement us = null;
- Expression condition = null;
- UpdateClause ifbranch = null;
- UpdateClause elsebranch = null;
+ Expression target = null;
+ Expression value = null ;
+ InsertStatement is = null;
+ DeleteStatement ds = null;
+ UpdateStatement us = null;
+ Expression condition = null;
+ UpdateClause ifbranch = null;
+ UpdateClause elsebranch = null;
"set" target = Expression() ":=" value = Expression()
- | "insert" is = InsertStatement()
- | "delete" ds = DeleteStatement()
- | "update" us = UpdateStatement()
- | "if" <LEFTPAREN> condition = Expression() <RIGHTPAREN> "then" ifbranch = UpdateClause() [LOOKAHEAD(1) "else" elsebranch = UpdateClause()]
- {return new UpdateClause(target, value, is, ds, us, condition, ifbranch, elsebranch);}
+ | is = InsertStatement()
+ | ds = DeleteStatement()
+ | us = UpdateStatement()
+ | "if" <LEFTPAREN> condition = Expression() <RIGHTPAREN>
+ "then" ifbranch = UpdateClause()
+ [LOOKAHEAD(1) "else" elsebranch = UpdateClause()]
+ {
+ return new UpdateClause(target, value, is, ds, us, condition, ifbranch, elsebranch);
+ }
Statement SetStatement() throws ParseException:
String pn = null;
- Statement stmt = null;
+ String pv = null;
- <IDENTIFIER> { pn = token.image; }
- { String pv = removeQuotesAndEscapes(token.image); }
- ";"
- {
- return new SetStatement(pn, pv);
- }
+ "set" pn = Identifier() pv = StringLiteral()
+ {
+ return new SetStatement(pn, pv);
+ }
Statement WriteStatement() throws ParseException:
- Identifier nodeName = null;
+ String nodeName = null;
String fileName = null;
Statement stmt = null;
Query query;
@@ -346,288 +682,27 @@
Pair<Identifier,Identifier> nameComponents = null;
- (( "output" "to"
- <IDENTIFIER> { nodeName = new Identifier(token.image); }
- ":" <STRING_LITERAL> { fileName = removeQuotesAndEscapes(token.image); }
- ( "using" <STRING_LITERAL> { writerClass = removeQuotesAndEscapes(token.image); } )?
- {
- stmt = new WriteStatement(nodeName, fileName, writerClass);
- } )
- |
- ( "into"
- {
- nameComponents = getDotSeparatedPair();
+ "write" ((
+ "output" "to" nodeName = Identifier() ":" fileName = StringLiteral()
+ ( "using" writerClass = StringLiteral() )?
+ {
+ stmt = new WriteStatement(new Identifier(nodeName), fileName, writerClass);
- <LEFTPAREN> query = Query() <RIGHTPAREN>
- {
+ ) | (
+ "into" <DATASET>
+ {
+ nameComponents = QualifiedName();
+ }
+ <LEFTPAREN> query = Query() <RIGHTPAREN>
+ {
stmt = new WriteFromQueryResultStatement(nameComponents.first, nameComponents.second, query, getVarCounter());
- } ))
- ";"
+ }
+ ))
return stmt;
-CreateIndexStatement CreateIndexStatement() throws ParseException:
- CreateIndexStatement cis = new CreateIndexStatement();
- Pair<Identifier,Identifier> nameComponents = null;
- <IDENTIFIER> { cis.setIndexName(new Identifier(token.image)); }
- (
- "if not exists"
- {
- cis.setIfNotExists(true);
- }
- )?
- "on"
- {
- nameComponents = getDotSeparatedPair();
- cis.setDataverseName(nameComponents.first);
- cis.setDatasetName(nameComponents.second);
- }
- ( <IDENTIFIER> { cis.addFieldExpr(token.image); } )
- ("," <IDENTIFIER> { cis.addFieldExpr(token.image); })*
- ("type"
- ("btree" { cis.setIndexType(IndexType.BTREE); }
- | "rtree" { cis.setIndexType(IndexType.RTREE); }
- | "keyword" { cis.setIndexType(IndexType.WORD_INVIX); }
- | "fuzzy keyword" { cis.setIndexType(IndexType.FUZZY_WORD_INVIX); }
- | "ngram"
- {
- cis.setIndexType(IndexType.NGRAM_INVIX);
- cis.setGramLength(Integer.valueOf(token.image));
- }
- )
- | "fuzzy ngram"
- {
- cis.setIndexType(IndexType.FUZZY_NGRAM_INVIX);
- cis.setGramLength(Integer.valueOf(token.image));
- }
- )
- )
- ";"
- | ";"
- )
- {
- return cis;
- }
-DataverseDecl DataverseDeclaration() throws ParseException:
- Identifier dvName = null;
- "dataverse" <IDENTIFIER> { defaultDataverse = token.image;}
- ";"
- {
- return new DataverseDecl(new Identifier(defaultDataverse));
- }
-DropStatement DropStatement() throws ParseException :
- Identifier dataverseName = null;
- Identifier datasetName = null;
- boolean ifExists = false;
- Pair<Identifier,Identifier> nameComponents=null;
- {
- nameComponents = getDotSeparatedPair();
- dataverseName = nameComponents.first;
- datasetName = nameComponents.second;
- }
- (
- "if exists"
- {
- ifExists = true;
- }
- )? ";"
- {
- return new DropStatement(dataverseName, datasetName, ifExists);
- }
-IndexDropStatement IndexDropStatement() throws ParseException :
- Identifier dataverseName = null;
- Identifier datasetName = null;
- Identifier indexName = null;
- boolean ifExists = false;
- Triple<Identifier,Identifier,Identifier> nameComponents=null;
- {
- nameComponents = getDotSeparatedTriple();
- dataverseName = nameComponents.first;
- datasetName = nameComponents.second;
- indexName = nameComponents.third;
- }
- (
- "if exists"
- {
- ifExists = true;
- }
- )? ";"
- {
- return new IndexDropStatement(dataverseName, datasetName, indexName, ifExists);
- }
-NodeGroupDropStatement NodeGroupDropStatement() throws ParseException :
- Identifier groupName = null;
- boolean ifExists = false;
- {
- groupName = new Identifier(token.image);
- }
- (
- "if exists"
- {
- ifExists = true;
- }
- )? ";"
- {
- return new NodeGroupDropStatement(groupName, ifExists);
- }
-TypeDropStatement TypeDropStatement() throws ParseException :
- Identifier dataverseName = null;
- Identifier typeName = null;
- boolean ifExists = false;
- Pair<Identifier,Identifier> nameComponents;
- {
- nameComponents = getDotSeparatedPair();
- dataverseName = nameComponents.first == null ? new Identifier(defaultDataverse) : nameComponents.first;
- typeName = nameComponents.second;
- }
- (
- "if exists"
- {
- ifExists = true;
- }
- )? ";"
- {
- return new TypeDropStatement(dataverseName, typeName, ifExists);
- }
-DataverseDropStatement DataverseDropStatement() throws ParseException :
- Identifier dataverseName = null;
- boolean ifExists = false;
- {
- dataverseName = new Identifier(token.image);
- }
- (
- "if exists"
- {
- ifExists = true;
- }
- )? ";"
- {
- return new DataverseDropStatement(dataverseName, ifExists);
- }
-CreateDataverseStatement CreateDataverseStatement() throws ParseException :
- Identifier dvName = null;
- boolean ifNotExists = false;
- String format = null;
- {
- dvName = new Identifier(token.image);
- }
- (
- "if not exists"
- {
- ifNotExists = true;
- }
- )?
- (
- "with format" < STRING_LITERAL >
- {
- format = removeQuotesAndEscapes(token.image);
- }
- )?
- ";"
- {
- return new CreateDataverseStatement(dvName, format, ifNotExists);
- }
-FunctionDropStatement FunctionDropStatement() throws ParseException :
- String dataverse;
- String functionName;
- int arity=0;
- boolean ifExists = false;
- Pair<Identifier, Identifier> nameComponents=null;
- {
- nameComponents = getDotSeparatedPair();
- dataverse = nameComponents.first != null ? nameComponents.first.getValue() : defaultDataverse;
- functionName = nameComponents.second.getValue();
- }
- "@"
- {
- Token t= getToken(0);
- arity = new Integer(t.image);
- if( arity < 0 && arity != FunctionIdentifier.VARARGS){
- throw new ParseException(" invalid arity:" + arity);
- }
- }
- (
- "if exists"
- {
- ifExists = true;
- }
- )? ";"
- {
- return new FunctionDropStatement(new FunctionSignature(dataverse, functionName, arity), ifExists);
- }
LoadFromFileStatement LoadStatement() throws ParseException:
Identifier dataverseName = null;
@@ -638,483 +713,138 @@
Pair<Identifier,Identifier> nameComponents = null;
- {
- nameComponents = getDotSeparatedPair();
- dataverseName = nameComponents.first;
- datasetName = nameComponents.second;
- }
- "using"
+ "load" <DATASET> nameComponents = QualifiedName()
- adapterName = getAdapterName();
+ dataverseName = nameComponents.first;
+ datasetName = nameComponents.second;
+ "using" adapterName = AdapterName() properties = Configuration()
+ ("pre-sorted"
- properties = getConfiguration();
- }
- ("pre-sorted"
- { alreadySorted = true; }
- )?
- ";"
- {
- return new LoadFromFileStatement(dataverseName, datasetName, adapterName, properties, alreadySorted);
- }
-String getAdapterName() throws ParseException :
- String adapterName = null;
- (
- adapterName = (new Identifier(token.image)).getValue();;
- }
- |
- {
- adapterName = removeQuotesAndEscapes(token.image);
- }
- )
- {
- return adapterName;
- }
-DatasetDecl DatasetDeclaration(DatasetType datasetType) throws ParseException :
- DatasetDecl dd = null;
- Identifier datasetName = null;
- Identifier dataverseName = null;
- Identifier itemDataverseName = null;
- Identifier itemTypeName = null;
- String nameComponentFirst = null;
- String nameComponentSecond = null;
- boolean ifNotExists = false;
- IDatasetDetailsDecl datasetDetails = null;
- Pair<Identifier,Identifier> nameComponents = null;
- Map<String,String> hints = new HashMap<String,String>();
- {
- nameComponents = getDotSeparatedPair();
- dataverseName = nameComponents.first;
- datasetName = nameComponents.second;
- }
- (
- "if not exists"
- {
- ifNotExists = true;
+ alreadySorted = true;
- (
- {
- itemTypeName = new Identifier(token.image);
- }
- )
- {
- if(datasetType == DatasetType.INTERNAL) {
- datasetDetails = InternalDatasetDeclaration();
- }
- else if(datasetType == DatasetType.EXTERNAL) {
- datasetDetails = ExternalDatasetDeclaration();
- }
- else if(datasetType == DatasetType.FEED) {
- datasetDetails = FeedDatasetDeclaration();
- }
- }
- (
- "hints"
- {
- initProperties(hints);
- }
- )?
- ";"
- {
- dd = new DatasetDecl(dataverseName, datasetName, itemTypeName, hints, datasetType, datasetDetails,ifNotExists);
- return dd;
- }
+ {
+ return new LoadFromFileStatement(dataverseName, datasetName, adapterName, properties, alreadySorted);
+ }
-InternalDetailsDecl InternalDatasetDeclaration() throws ParseException :
- InternalDetailsDecl idd = null;
- List<String> primaryKeyFields = new ArrayList<String>();
- Identifier nodeGroupName=null;
- (
- {
- primaryKeyFields = getPrimaryKeyFields();
- }
- )
- (
- "on" < IDENTIFIER >
- {
- nodeGroupName = new Identifier(token.image);
- }
- )?
- {
- idd = new InternalDetailsDecl(nodeGroupName, primaryKeyFields);
- return idd;
- }
-ExternalDetailsDecl ExternalDatasetDeclaration() throws ParseException :
+String AdapterName() throws ParseException :
- ExternalDetailsDecl edd = null;
String adapterName = null;
- Map < String, String > properties;
- {
- edd = new ExternalDetailsDecl();
- }
- "using"
+ adapterName = Identifier()
- adapterName = getAdapterName();
+ return adapterName;
- {
- properties = getConfiguration();
- }
- {
- edd = new ExternalDetailsDecl();
- edd.setAdapter(adapterName);
- edd.setProperties(properties);
- }
- {
- return edd;
- }
-FeedDetailsDecl FeedDatasetDeclaration() throws ParseException :
- FeedDetailsDecl fdd = null;
- String adapterName = null;
- Map < String, String > properties;
- Pair<Identifier,Identifier> nameComponents;
- List<String> primaryKeyFields = new ArrayList<String>();
- Identifier nodeGroupName=null;
- FunctionSignature appliedFunction=null;
- String dataverse;
- String functionName;
- int arity;
- "using"
- {
- adapterName = getAdapterName();
- }
- {
- properties = getConfiguration();
- }
- ("apply" "function"
- {
- nameComponents = getDotSeparatedPair();
- dataverse = nameComponents.first != null ? nameComponents.first.getValue() : defaultDataverse;
- functionName = nameComponents.second.getValue();
- }
- {
- arity = Integer.parseInt(token.image);
- }
- )
- {
- appliedFunction = new FunctionSignature(dataverse, functionName, arity);
- }
- )?
- (
- {
- primaryKeyFields = getPrimaryKeyFields();
- }
- )
- (
- "on" < IDENTIFIER >
- {
- nodeGroupName = new Identifier(token.image);
- }
- )?
- {
- fdd = new FeedDetailsDecl(adapterName, properties, appliedFunction, nodeGroupName, primaryKeyFields);
- return fdd;
- }
-List<String> getPrimaryKeyFields() throws ParseException :
- List<String> primaryKeyFields = new ArrayList<String>();
- "primary" "key"
- {
- primaryKeyFields.add(token.image);
- }
- (
- "," < IDENTIFIER >
- {
- primaryKeyFields.add(token.image);
- }
- )*
- {
- return primaryKeyFields;
- }
-ControlFeedStatement ControlFeedDeclaration(ControlFeedStatement.OperationType operationType) throws ParseException :
+Statement FeedStatement() throws ParseException:
Pair<Identifier,Identifier> nameComponents = null;
+ Map<String,String> configuration = null;
+ Statement stmt = null;
+ (
+ "begin" "feed" nameComponents = QualifiedName()
+ {
+ stmt = new BeginFeedStatement(nameComponents.first, nameComponents.second, getVarCounter());
+ }
+ | "suspend" "feed" nameComponents = QualifiedName()
+ {
+ stmt = new ControlFeedStatement(ControlFeedStatement.OperationType.SUSPEND, nameComponents.first, nameComponents.second);
+ }
+ | "resume" "feed" nameComponents = QualifiedName()
+ {
+ stmt = new ControlFeedStatement(ControlFeedStatement.OperationType.RESUME, nameComponents.first, nameComponents.second);
+ }
+ | "end" "feed" nameComponents = QualifiedName()
+ {
+ stmt = new ControlFeedStatement(ControlFeedStatement.OperationType.END, nameComponents.first, nameComponents.second);
+ }
+ | "alter" "feed" nameComponents = QualifiedName() "set" configuration = Configuration()
+ {
+ stmt = new ControlFeedStatement(ControlFeedStatement.OperationType.ALTER, nameComponents.first, nameComponents.second, configuration);
+ }
+ )
- nameComponents = getDotSeparatedPair();
- return new ControlFeedStatement(operationType, nameComponents.first, nameComponents.second);
+ return stmt;
-ControlFeedStatement AlterFeedDeclaration() throws ParseException :
- Pair<Identifier,Identifier> nameComponents = null;
- Map < String, String > configuration = new HashMap < String, String > ();
- {
- nameComponents = getDotSeparatedPair();
- }
- "set"
- {
- configuration = getConfiguration();
- }
- {
- return new ControlFeedStatement(ControlFeedStatement.OperationType.ALTER, nameComponents.first, nameComponents.second, configuration);
- }
-Map<String,String> getConfiguration() throws ParseException :
+Map<String,String> Configuration() throws ParseException :
Map<String,String> configuration = new LinkedHashMap<String,String>();
- String key;
- String value;
+ Pair<String, String> keyValuePair = null;
- (
- (
- (
- {
- key = removeQuotesAndEscapes(token.image);
- }
- {
- value = removeQuotesAndEscapes(token.image);
- }
- )
- {
- configuration.put(key, value);
- }
- )
- (
- (
- {
- key = removeQuotesAndEscapes(token.image);
- }
- {
- value = removeQuotesAndEscapes(token.image);
- }
- )
- {
- configuration.put(key, value);
- }
- )*
- )?
- {
- return configuration;
- }
-void initProperties(Map<String,String> properties) throws ParseException :
- String key;
- String value;
- (
- (
- {
- key = (new Identifier(token.image)).getValue();
- }
- "="
- (
- {
- value = removeQuotesAndEscapes(token.image);
- }
- ) |
- {
- try{
- value = "" + Long.valueOf(token.image);
- } catch (NumberFormatException nfe){
- throw new ParseException("inapproriate value: " + token.image);
- }
- }
- )
- )
- {
- properties.put(key.toUpperCase(), value);
- }
- (
- ","
- (
- {
- key = (new Identifier(token.image)).getValue();
- }
- "="
- (
- {
- value = removeQuotesAndEscapes(token.image);
- }
- ) |
- {
- try{
- value = "" + Long.valueOf(token.image);
- } catch (NumberFormatException nfe){
- throw new ParseException("inapproriate value: " + token.image);
- }
- }
- )
- )
- )
- {
- properties.put(key.toUpperCase(), value);
- }
- )*
- )
- )?
-NodegroupDecl NodegroupDeclaration() throws ParseException :
- Identifier name = null;
- List < Identifier > ncNames = new ArrayList < Identifier > ();
- boolean ifNotExists = false;
- {
- name = new Identifier(token.image);
- }
- (
- "if not exists"
- {
- ifNotExists = true;
- }
- )?
- "on" < IDENTIFIER >
- {
- ncNames.add(new Identifier(token.image));
- }
- (
- "," < IDENTIFIER >
+ <LEFTPAREN> ( keyValuePair = KeyValuePair()
- ncNames.add(new Identifier(token.image));
+ configuration.put(keyValuePair.first, keyValuePair.second);
- )*
- ";"
- {
- return new NodegroupDecl(name, ncNames, ifNotExists);
- }
-TypeDecl TypeDeclaration(boolean dgen, String hint) throws ParseException:
- Identifier dataverse;
- Identifier ident;
- TypeExpression typeExpr;
- boolean ifNotExists = false;
- Pair<Identifier,Identifier> nameComponents=null;
- {
- nameComponents = getDotSeparatedPair();
- dataverse = nameComponents.first;
- ident = nameComponents.second;
- }
- (
- "if not exists"
+ ( "," keyValuePair = KeyValuePair()
- ifNotExists = true;
+ configuration.put(keyValuePair.first, keyValuePair.second);
- )?
- "as"
- ( typeExpr = TypeExpr() )
- (";")?
- {
- long numValues = -1;
- String filename = null;
- if (dgen) {
- String splits[] = hint.split(" +");
- if (splits.length != 3) {
- throw new ParseException("Expecting /*+ dgen <filename> <numberOfItems> */");
- }
- filename = splits[1];
- numValues = Long.parseLong(splits[2]);
+ )* )? <RIGHTPAREN>
+ {
+ return configuration;
+ }
+Pair<String, String> KeyValuePair() throws ParseException:
+ String key;
+ String value;
+ <LEFTPAREN> key = StringLiteral() "=" value = StringLiteral() <RIGHTPAREN>
+ {
+ return new Pair<String, String>(key, value);
- TypeDataGen tddg = new TypeDataGen(dgen, filename, numValues);
- return new TypeDecl(dataverse, ident, typeExpr, tddg, ifNotExists);
- }
+Map<String,String> Properties() throws ParseException:
+ Map<String,String> properties = new HashMap<String,String>();
+ Pair<String, String> property;
+ ( <LEFTPAREN> property = Property()
+ {
+ properties.put(property.first, property.second);
+ }
+ ( "," property = Property()
+ {
+ properties.put(property.first, property.second);
+ }
+ )* <RIGHTPAREN> )?
+ {
+ return properties;
+ }
+Pair<String, String> Property() throws ParseException:
+ String key;
+ String value;
+ key = Identifier() "=" ( value = StringLiteral() | <INTEGER_LITERAL>
+ {
+ try {
+ value = "" + Long.valueOf(token.image);
+ } catch (NumberFormatException nfe) {
+ throw new ParseException("inapproriate value: " + token.image);
+ }
+ }
+ )
+ {
+ return new Pair<String, String>(key.toUpperCase(), value);
+ }
TypeExpression TypeExpr() throws ParseException:
@@ -1181,11 +911,10 @@
boolean nullable = false;
+ fieldName = Identifier()
- Token t = getToken(0);
- fieldName = t.toString();
- String hint = getHint(t);
+ fieldName = token.image;
+ String hint = getHint(token);
IRecordFieldDataGen rfdg = null;
if (hint != null) {
String splits[] = hint.split(" +");
@@ -1237,14 +966,14 @@
TypeReferenceExpression TypeReference() throws ParseException:
- {
- Token t = getToken(0);
- Identifier id = new Identifier(t.toString());
- return new TypeReferenceExpression(id);
- }
+ String id = null;
+ id = Identifier()
+ {
+ return new TypeReferenceExpression(new Identifier(id));
+ }
OrderedListTypeDefinition OrderedListTypeDef() throws ParseException:
@@ -1274,68 +1003,91 @@
-Pair<Identifier,Identifier> getDotSeparatedPair() throws ParseException:
+Pair<Identifier,Identifier> FunctionOrTypeName() throws ParseException:
- Identifier first = null;
- Identifier second = null;
+ Pair<Identifier,Identifier> name = null;
+ name = QualifiedName()
+ {
+ if (name.first == null) {
+ name.first = new Identifier(defaultDataverse);
+ }
+ return name;
+ }
+String Identifier() throws ParseException:
+ String lit = null;
+ {
+ return token.image;
+ }
+ | lit = StringLiteral()
+ {
+ return lit;
+ }
+String StringLiteral() throws ParseException:
+ {
+ return removeQuotesAndEscapes(token.image);
+ }
+Pair<Identifier,Identifier> QualifiedName() throws ParseException:
+ String first = null;
+ String second = null;
+ first = Identifier() ("." second = Identifier())?
- first = new Identifier(token.image);
- }
- {
- second = new Identifier(token.image);
- }
- )?
- {
- if(second == null){
- second = first;
- first = null;
- }
- return new Pair<Identifier,Identifier>(first,second);
+ Identifier id1 = null;
+ Identifier id2 = null;
+ if (second == null) {
+ id2 = new Identifier(first);
+ } else
+ {
+ id1 = new Identifier(first);
+ id2 = new Identifier(second);
+ }
+ return new Pair<Identifier,Identifier>(id1, id2);
-Triple<Identifier,Identifier,Identifier> getDotSeparatedTriple() throws ParseException:
+Triple<Identifier,Identifier,Identifier> DoubleQualifiedName() throws ParseException:
- Identifier first = null;
- Identifier second = null;
- Identifier third = null;
+ String first = null;
+ String second = null;
+ String third = null;
+ first = Identifier() "." second = Identifier() ("." third = Identifier())?
- first = new Identifier(token.image);
- }
- {
- second = new Identifier(token.image);
- }
- (
- {
- third = new Identifier(token.image);
- }
- )?
- {
- if(third == null){
- third = second;
- second = first;
- first = null;
- }
- return new Triple<Identifier,Identifier,Identifier>(first,second,third);
+ Identifier id1 = null;
+ Identifier id2 = null;
+ Identifier id3 = null;
+ if (third == null) {
+ id2 = new Identifier(first);
+ id3 = new Identifier(second);
+ } else {
+ id1 = new Identifier(first);
+ id2 = new Identifier(second);
+ id3 = new Identifier(third);
+ }
+ return new Triple<Identifier,Identifier,Identifier>(id1, id2, id3);
FunctionDecl FunctionDeclaration() throws ParseException:
FunctionDecl funcDecl;
@@ -1348,29 +1100,23 @@
- {
- Token t = getToken(0);
- functionName = t.toString();
- }
+ "declare" "function" functionName = Identifier() <LEFTPAREN> (<VARIABLE>
var = new VarIdentifier();
- var.setValue(getToken(0).toString());
+ var.setValue(token.image);
- ("," <VARIABLE>
+ ("," <VARIABLE>
var = new VarIdentifier();
- var.setValue(getToken(0).toString());
+ var.setValue(token.image);
- })*)? <RIGHTPAREN> "{" funcBody = Expression() "}"
- (";")?
+ }
+ )*)? <RIGHTPAREN> "{" funcBody = Expression() "}"
signature = new FunctionSignature(defaultDataverse, functionName, arity);
getCurrentScope().addFunctionDescriptor(signature, false);
@@ -1379,78 +1125,14 @@
-CreateFunctionStatement FunctionCreation() throws ParseException:
- CreateFunctionStatement cfs = null;
- FunctionSignature signature;
- String dataverse;
- String functionName;
- boolean ifNotExists = false;
- List<VarIdentifier> paramList = new ArrayList<VarIdentifier>();
- String functionBody;
- VarIdentifier var = null;
- createNewScope();
- Expression functionBodyExpr;
- Token beginPos;
- Token endPos;
- Pair<Identifier,Identifier> nameComponents=null;
- {
- nameComponents = getDotSeparatedPair();
- dataverse = nameComponents.first != null ? nameComponents.first.getValue() : defaultDataverse;
- functionName= nameComponents.second.getValue();
- }
- (
- "if not exists"
- {
- ifNotExists = true;
- }
- )?
- {
- var = new VarIdentifier();
- var.setValue(getToken(0).toString());
- paramList.add(var);
- getCurrentScope().addNewVarSymbolToScope(var);
- }
- ("," <VARIABLE>
- {
- var = new VarIdentifier();
- var.setValue(getToken(0).toString());
- paramList.add(var);
- getCurrentScope().addNewVarSymbolToScope(var);
- })*)? <RIGHTPAREN> "{"
- {
- beginPos = getToken(0);
- }
- functionBodyExpr = Expression()
- "}"
- {
- endPos = getToken(0);
- functionBody = extractFragment(beginPos.beginLine, beginPos.beginColumn, endPos.beginLine, endPos.beginColumn);
- }
- (";")?
- {
- signature = new FunctionSignature(dataverse, functionName, paramList.size());
- getCurrentScope().addFunctionDescriptor(signature, false);
- cfs = new CreateFunctionStatement(signature, paramList, functionBody, ifNotExists);
- return cfs;
- }
-Query Query()throws ParseException:
+Query Query() throws ParseException:
Query query = new Query();
Expression expr;
- expr = Expression()
- (";")?
+ expr = Expression()
@@ -1500,8 +1182,7 @@
- Token t = getToken(0);
- op.addOperator(t.toString());
+ op.addOperator(token.image);
operand = AndExpr()
@@ -1532,8 +1213,7 @@
- Token t = getToken(0);
- op.addOperator(t.toString());
+ op.addOperator(token.image);
operand = RelExpr()
@@ -1580,9 +1260,8 @@
op.addOperand(operand, broadcast);
broadcast = false;
- }
- Token t = getToken(0);
- op.addOperator(t.toString());
+ }
+ op.addOperator(token.image);
operand = AddExpr()
@@ -1620,9 +1299,8 @@
op = new OperatorExpr();
- }
- Token t = getToken(0);
- ((OperatorExpr)op).addOperator(t.toString());
+ }
+ ((OperatorExpr)op).addOperator(token.image);
operand = MultExpr()
@@ -1650,9 +1328,8 @@
op = new OperatorExpr();
- }
- Token t = getToken(0);
- op.addOperator(t.toString());
+ }
+ op.addOperator(token.image);
operand = UnionExpr()
@@ -1694,11 +1371,10 @@
(( "+"|"-")
- uexpr = new UnaryExpr();
- Token t = getToken(0);
- if("+".equals(t.toString()))
+ uexpr = new UnaryExpr();
+ if("+".equals(token.image))
- else if("-".equals(t.toString()))
+ else if("-".equals(token.image))
throw new ParseException();
@@ -1717,73 +1393,39 @@
-Expression ValueExpr() throws ParseException:
- Expression expr;
- expr = FieldOrIndexAccessor()
- {
- return expr;
- }
-Expression FieldOrIndexAccessor()throws ParseException:
+Expression ValueExpr()throws ParseException:
Expression expr = null;
Identifier ident = null;
AbstractAccessor fa = null;
int index;
- ( expr = PrimaryExpr()
- )
- (
- (
- ident = Field()
+ expr = PrimaryExpr() ( ident = Field()
- if(fa == null)
- fa = new FieldAccessor(expr, ident);
- else
- fa = new FieldAccessor(fa, ident);
- }
- )
- | (
- index = Index()
- {
- if(fa == null)
- fa = new IndexAccessor(expr, index);
- else
- fa = new IndexAccessor(fa, index);
- }
- )
- )*
- {
- return fa==null?expr:fa;
- }
+ fa = (fa == null ? new FieldAccessor(expr, ident)
+ : new FieldAccessor(fa, ident));
+ }
+ | index = Index()
+ {
+ fa = (fa == null ? new IndexAccessor(expr, index)
+ : new IndexAccessor(fa, index));
+ }
+ )*
+ {
+ return fa == null ? expr : fa;
+ }
Identifier Field() throws ParseException:
- Identifier ident = null;
+ String ident = null;
- "." < IDENTIFIER >
- {
- ident = new Identifier();
- ident.setValue(getToken(0).toString());
- return ident;
- }
+ "." ident = Identifier()
+ {
+ return new Identifier(ident);
+ }
int Index() throws ParseException:
@@ -1828,75 +1470,63 @@
Expression expr = null;
- //Literal | VariableRef | ListConstructor | RecordConstructor | FunctionCallExpr | DatasetAccessExpression | ParenthesizedExpression
- (
- expr =Literal()
- | expr = FunctionCallExpr()
- | expr = DatasetAccessExpression()
- | expr =VariableRef()
+ expr = FunctionCallExpr()
+ | expr = Literal()
+ | expr = DatasetAccessExpression()
+ | expr = VariableRef()
if(((VariableExpr)expr).getIsNewVar() == true)
- throw new ParseException("can't find variable " + ((VariableExpr)expr).getVar());
+ throw new ParseException("can't find variable " + ((VariableExpr)expr).getVar());
- | expr = ListConstructor()
- | expr = RecordConstructor()
- | expr = ParenthesizedExpression()
- )
- {
- return expr;
- }
+ | expr = ListConstructor()
+ | expr = RecordConstructor()
+ | expr = ParenthesizedExpression()
+ )
+ {
+ return expr;
+ }
Expression Literal() throws ParseException:
LiteralExpr lit = new LiteralExpr();
- Token t;
+ String str = null;
- {
- t= getToken(0);
- lit.setValue( new StringLiteral(removeQuotesAndEscapes(t.image)));
- }
+ ( str = StringLiteral()
- t= getToken(0);
- try {
- lit.setValue(new IntegerLiteral(new Integer(t.image)));
- } catch(NumberFormatException ex) {
- lit.setValue(new LongIntegerLiteral(new Long(t.image)));
- }
- }
+ lit.setValue(new StringLiteral(str));
+ }
- t= getToken(0);
- lit.setValue(new FloatLiteral(new Float(t.image)));
- }
+ try {
+ lit.setValue(new IntegerLiteral(new Integer(token.image)));
+ } catch(NumberFormatException ex) {
+ lit.setValue(new LongIntegerLiteral(new Long(token.image)));
+ }
+ }
- t= getToken(0);
- lit.setValue(new DoubleLiteral(new Double(t.image)));
- }
- | <NULL>
- {
- t= getToken(0);
- lit.setValue(NullLiteral.INSTANCE);
- }
- | <TRUE>
- {
- t= getToken(0);
- lit.setValue(TrueLiteral.INSTANCE);
- }
- | <FALSE>
- {
- t= getToken(0);
- lit.setValue(FalseLiteral.INSTANCE);
- }
+ lit.setValue(new FloatLiteral(new Float(token.image)));
+ }
+ {
+ lit.setValue(new DoubleLiteral(new Double(token.image)));
+ }
+ | <NULL>
+ {
+ lit.setValue(NullLiteral.INSTANCE);
+ }
+ | <TRUE>
+ {
+ lit.setValue(TrueLiteral.INSTANCE);
+ }
+ | <FALSE>
+ {
+ lit.setValue(FalseLiteral.INSTANCE);
+ }
+ )
return lit;
@@ -1907,13 +1537,11 @@
VariableExpr varExp = new VariableExpr();
VarIdentifier var = new VarIdentifier();
- Token t;
- t = getToken(0);//get current token
- String varName = t.toString();
+ String varName = token.image;
Identifier ident = lookupSymbol(varName);
if (isInForbiddenScopes(varName)) {
throw new ParseException("Inside limit clauses, it is disallowed to reference a variable having the same name as any variable bound in the same scope as the limit clause.");
@@ -1924,7 +1552,7 @@
} else {
- var.setValue(t.toString());
+ var.setValue(varName);
return varExp;
@@ -1934,18 +1562,16 @@
VariableExpr varExp = new VariableExpr();
VarIdentifier var = new VarIdentifier();
- Token t;
- t = getToken(0);//get current token
- Identifier ident = lookupSymbol(t.toString());
+ Identifier ident = lookupSymbol(token.image);
if(ident != null) { // exist such ident
- var.setValue(t.toString());
+ var.setValue(token.image);
return varExp;
@@ -2050,36 +1676,42 @@
List<Expression> argList = new ArrayList<Expression>();
Expression tmp;
int arity = 0;
+ Pair<Identifier,Identifier> funcId = null;
String funcName;
String dataverse;
- String hint=null;
- String id1=null;
- String id2=null;
+ String hint = null;
+ String id1 = null;
+ String id2 = null;
- <IDENTIFIER> { dataverse = defaultDataverse; funcName = token.image;} ("." <IDENTIFIER> { dataverse = funcName; funcName = token.image;})?
+ funcId = FunctionOrTypeName()
- hint = getHint(token);
+ dataverse = funcId.first.getValue();
+ funcName = funcId.second.getValue();
+ hint = getHint(token);
- <LEFTPAREN> (tmp = Expression()
- {
- argList.add(tmp);
- arity ++;
- } ("," tmp = Expression() { argList.add(tmp); arity++; })*)? <RIGHTPAREN>
- {
- FunctionSignature signature = lookupFunctionSignature(dataverse, funcName.toString(), arity);
- if(signature == null)
- {
- signature = new FunctionSignature(dataverse, funcName.toString(), arity);
- }
- callExpr = new CallExpr(signature,argList);
- if (hint != null && hint.startsWith(INDEXED_NESTED_LOOP_JOIN_HINT)) {
- callExpr.addHint(IndexedNLJoinExpressionAnnotation.INSTANCE);
- }
- return callExpr;
- }
+ <LEFTPAREN> (tmp = Expression()
+ {
+ argList.add(tmp);
+ arity ++;
+ }
+ ("," tmp = Expression()
+ {
+ argList.add(tmp);
+ arity++;
+ }
+ {
+ FunctionSignature signature = lookupFunctionSignature(dataverse, funcName, arity);
+ if (signature == null) {
+ signature = new FunctionSignature(dataverse, funcName, arity);
+ }
+ callExpr = new CallExpr(signature,argList);
+ if (hint != null && hint.startsWith(INDEXED_NESTED_LOOP_JOIN_HINT)) {
+ callExpr.addHint(IndexedNLJoinExpressionAnnotation.INSTANCE);
+ }
+ return callExpr;
+ }
Expression DatasetAccessExpression() throws ParseException:
@@ -2088,27 +1720,45 @@
List<Expression> argList = new ArrayList<Expression>();
String funcName;
String dataverse;
+ String arg1 = null;
+ String arg2 = null;
LiteralExpr ds;
- LiteralExpr dvds;
Expression nameArg;
int arity = 0;
- <DATASET> {dataverse = MetadataConstants.METADATA_DATAVERSE_NAME; funcName = getToken(0).toString();}
- (
- (<IDENTIFIER> {ds = new LiteralExpr(); ds.setValue( new StringLiteral(token.image) ); argList.add(ds); arity ++;} ("." <IDENTIFIER> { dvds = new LiteralExpr(); dvds.setValue(new StringLiteral(ds.getValue()+"."+token.image)); argList.remove(0); argList.add(dvds);})? ) |
- (<LEFTPAREN> nameArg = Expression() {argList.add(nameArg); arity ++;} ("," nameArg = Expression() { argList.add(nameArg); arity++; })* <RIGHTPAREN>)
- )
- {
- FunctionSignature signature = lookupFunctionSignature(dataverse, funcName.toString(), arity);
- if(signature == null)
- {
- signature = new FunctionSignature(dataverse, funcName.toString(), arity);
- }
- callExpr = new CallExpr(signature,argList);
- return callExpr;
+ {
+ dataverse = MetadataConstants.METADATA_DATAVERSE_NAME;
+ funcName = token.image;
+ }
+ ( ( arg1 = Identifier() ( "." arg2 = Identifier() )? )
+ {
+ String name = arg2 == null ? arg1 : arg1 + "." + arg2;
+ ds = new LiteralExpr();
+ ds.setValue( new StringLiteral(name) );
+ argList.add(ds);
+ arity ++;
+ }
+ | ( <LEFTPAREN> nameArg = Expression()
+ {
+ argList.add(nameArg);
+ arity ++;
+ }
+ ( "," nameArg = Expression()
+ {
+ argList.add(nameArg);
+ arity++;
+ }
+ )* <RIGHTPAREN> ) )
+ {
+ FunctionSignature signature = lookupFunctionSignature(dataverse, funcName, arity);
+ if (signature == null) {
+ signature = new FunctionSignature(dataverse, funcName, arity);
+ callExpr = new CallExpr(signature,argList);
+ return callExpr;
+ }
Expression ParenthesizedExpression() throws ParseException:
diff --git a/asterix-doc/pom.xml b/asterix-doc/pom.xml
new file mode 100644
index 0000000..f44c06d
--- /dev/null
+++ b/asterix-doc/pom.xml
@@ -0,0 +1,18 @@
+<project xmlns="" xmlns:xsi="" xsi:schemaLocation="">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <artifactId>asterix</artifactId>
+ <groupId>edu.uci.ics.asterix</groupId>
+ <version>0.0.6-SNAPSHOT</version>
+ </parent>
+ <artifactId>asterix-doc</artifactId>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-site-plugin</artifactId>
+ <version>3.3</version>
+ </plugin>
+ </plugins>
+ </build>
diff --git a/asterix-doc/src/site/markdown/ b/asterix-doc/src/site/markdown/
new file mode 100644
index 0000000..c03d7e5
--- /dev/null
+++ b/asterix-doc/src/site/markdown/
@@ -0,0 +1,191 @@
+# The Asterix Query Language, Version 1.0
+## 1. Introduction
+This document provides an overview of the Asterix Query language.
+## 2. Expressions
+ Expression ::= ( OperatorExpr | IfThenElse | FLWOGR | QuantifiedExpression )
+### Primary Expressions
+ PrimaryExpr ::= Literal | VariableRef | ParenthesizedExpression | FunctionCallExpr
+ | DatasetAccessExpression | ListConstructor | RecordConstructor
+#### Literals
+ Literal ::= StringLiteral | <INTEGER_LITERAL> | <FLOAT_LITERAL> | <DOUBLE_LITERAL> | <NULL> | <TRUE> | <FALSE>
+ StringLiteral ::= <STRING_LITERAL>
+#### Variable References
+ VariableRef ::= <VARIABLE>
+#### Parenthesized Expressions
+ ParenthesizedExpression ::= <LEFTPAREN> Expression <RIGHTPAREN>
+#### Function Calls
+ FunctionCallExpr ::= FunctionOrTypeName <LEFTPAREN> ( Expression ( "," Expression )* )? <RIGHTPAREN>
+#### Dataset Access
+ DatasetAccessExpression ::= <DATASET> ( ( Identifier ( "." Identifier )? )
+ | ( <LEFTPAREN> Expression ( "," Expression )* <RIGHTPAREN> ) )
+ Identifier ::= <IDENTIFIER> | StringLiteral
+#### Constructors
+ ListConstructor ::= ( OrderedListConstructor | UnorderedListConstructor )
+ OrderedListConstructor ::= "[" ( Expression ( "," Expression )* )? "]"
+ UnorderedListConstructor ::= "{{" ( Expression ( "," Expression )* )? "}}"
+ RecordConstructor ::= "{" ( FieldBinding ( "," FieldBinding )* )? "}"
+ FieldBinding ::= Expression ":" Expression
+### Path Expressions
+ ValueExpr ::= PrimaryExpr ( Field | Index )*
+ Field ::= "." Identifier
+ Index ::= "[" ( Expression | "?" ) "]"
+### Logical Expressions
+ OperatorExpr ::= AndExpr ( "or" AndExpr )*
+ AndExpr ::= RelExpr ( "and" RelExpr )*
+### Comparison Expressions
+ RelExpr ::= AddExpr ( ( "<" | ">" | "<=" | ">=" | "=" | "!=" | "~=" ) AddExpr )?
+### Arithmetic Expressions
+ AddExpr ::= MultExpr ( ( "+" | "-" ) MultExpr )*
+ MultExpr ::= UnaryExpr ( ( "*" | "/" | "%" | <CARET> | "idiv" ) UnaryExpr )*
+ UnaryExpr ::= ( ( "+" | "-" ) )? ValueExpr
+### FLWOGR Expression
+ FLWOGR ::= ( ForClause | LetClause ) ( Clause )* "return" Expression
+ Clause ::= ForClause | LetClause | WhereClause | OrderbyClause
+ | GroupClause | LimitClause | DistinctClause
+ ForClause ::= "for" Variable ( "at" Variable )? "in" ( Expression )
+ LetClause ::= "let" Variable ":=" Expression
+ WhereClause ::= "where" Expression
+ OrderbyClause ::= "order" "by" Expression ( ( "asc" ) | ( "desc" ) )?
+ ( "," Expression ( ( "asc" ) | ( "desc" ) )? )*
+ GroupClause ::= "group" "by" ( Variable ":=" )? Expression ( "," ( Variable ":=" )? Expression )*
+ "with" VariableRef ( "," VariableRef )*
+ LimitClause ::= "limit" Expression ( "offset" Expression )?
+ DistinctClause ::= "distinct" "by" Expression ( "," Expression )*
+ Variable ::= <VARIABLE>
+### Conditional Expression
+ IfThenElse ::= "if" <LEFTPAREN> Expression <RIGHTPAREN> "then" Expression "else" Expression
+### Quantified Expressions
+ QuantifiedExpression ::= ( ( "some" ) | ( "every" ) ) Variable "in" Expression
+ ( "," Variable "in" Expression )* "satisfies" Expression
+## 3. Statements
+ Statement ::= ( SingleStatement ( ";" )? )* <EOF>
+ SingleStatement ::= DataverseDeclaration
+ | FunctionDeclaration
+ | CreateStatement
+ | DropStatement
+ | LoadStatement
+ | SetStatement
+ | InsertStatement
+ | DeleteStatement
+ | FeedStatement
+ | Query
+### Declarations
+ DataverseDeclaration ::= "use" "dataverse" Identifier
+ SetStatement ::= "set" Identifier StringLiteral
+ FunctionDeclaration ::= "declare" "function" Identifier <LEFTPAREN> ( <VARIABLE> ( "," <VARIABLE> )* )? <RIGHTPAREN> "{" Expression "}"
+### Lifecycle Management Statements
+ CreateStatement ::= "create" ( TypeSpecification | DatasetSpecification | IndexSpecification | DataverseSpecification | FunctionSpecification )
+ DropStatement ::= "drop" ( <DATASET> QualifiedName IfExists
+ | "index" DoubleQualifiedName IfExists
+ | "type" FunctionOrTypeName IfExists
+ | "dataverse" Identifier IfExists
+ | "function" FunctionSignature IfExists )
+ IfExists ::= ( "if" "exists" )?
+ QualifiedName ::= Identifier ( "." Identifier )?
+ DoubleQualifiedName ::= Identifier "." Identifier ( "." Identifier )?
+#### Types
+ TypeSpecification ::= "type" FunctionOrTypeName IfNotExists "as" TypeExpr
+ FunctionOrTypeName ::= QualifiedName
+ IfNotExists ::= ( "if not exists" )?
+ TypeExpr ::= RecordTypeDef | TypeReference | OrderedListTypeDef | UnorderedListTypeDef
+ RecordTypeDef ::= ( "closed" | "open" )? "{" ( RecordField ( "," RecordField )* )? "}"
+ RecordField ::= Identifier ":" ( TypeExpr ) ( "?" )?
+ TypeReference ::= Identifier
+ OrderedListTypeDef ::= "[" ( TypeExpr ) "]"
+ UnorderedListTypeDef ::= "{{" ( TypeExpr ) "}}"
+#### Datasets
+ DatasetSpecification ::= "external" <DATASET> QualifiedName <LEFTPAREN> Identifier <RIGHTPAREN> IfNotExists
+ "using" AdapterName Configuration ( "hints" Properties )?
+ | "feed" <DATASET> QualifiedName <LEFTPAREN> Identifier <RIGHTPAREN> IfNotExists
+ "using" AdapterName Configuration ( ApplyFunction )? PrimaryKey ( "on" Identifier )? ( "hints" Properties )?
+ | <DATASET> QualifiedName <LEFTPAREN> Identifier <RIGHTPAREN> IfNotExists
+ PrimaryKey ( "on" Identifier )? ( "hints" Properties )?
+ AdapterName ::= Identifier
+ Configuration ::= <LEFTPAREN> ( KeyValuePair ( "," KeyValuePair )* )? <RIGHTPAREN>
+ KeyValuePair ::= <LEFTPAREN> StringLiteral "=" StringLiteral <RIGHTPAREN>
+ Properties ::= ( <LEFTPAREN> Property ( "," Property )* <RIGHTPAREN> )?
+ Property ::= Identifier "=" ( StringLiteral | <INTEGER_LITERAL> )
+ ApplyFunction ::= "apply" "function" FunctionSignature
+ FunctionSignature ::= FunctionOrTypeName "@" <INTEGER_LITERAL>
+ PrimaryKey ::= "primary" "key" Identifier ( "," Identifier )*
+#### Indices
+ IndexSpecification ::= "index" Identifier IfNotExists "on" QualifiedName <LEFTPAREN> ( Identifier ) ( "," Identifier )* <RIGHTPAREN> ( "type" IndexType )?
+ IndexType ::= "btree" | "rtree" | "keyword" | "fuzzy keyword" | "ngram" <LEFTPAREN> <INTEGER_LITERAL> <RIGHTPAREN> | "fuzzy ngram" <LEFTPAREN> <INTEGER_LITERAL> <RIGHTPAREN>
+#### Dataverses
+ DataverseSpecification ::= "dataverse" Identifier IfNotExists ( "with format" StringLiteral )?
+#### Functions
+ FunctionSpecification ::= "function" FunctionOrTypeName IfNotExists <LEFTPAREN> ( <VARIABLE> ( "," <VARIABLE> )* )? <RIGHTPAREN> "{" Expression "}"
+### Import/Export Statements
+ LoadStatement ::= "load" <DATASET> QualifiedName "using" AdapterName Configuration ( "pre-sorted" )?
+### Modification Statements
+ InsertStatement ::= "insert" "into" <DATASET> QualifiedName Query
+ DeleteStatement ::= "delete" Variable "from" <DATASET> QualifiedName ( "where" Expression )?
+### Feed Management Statements
+ FeedStatement ::= "begin" "feed" QualifiedName
+ | "suspend" "feed" QualifiedName
+ | "resume" "feed" QualifiedName
+ | "end" "feed" QualifiedName
+ | "alter" "feed" QualifiedName "set" Configuration
+### Queries
+ Query ::= Expression
diff --git a/asterix-om/src/main/java/edu/uci/ics/asterix/dataflow/data/nontagged/comparators/ b/asterix-om/src/main/java/edu/uci/ics/asterix/dataflow/data/nontagged/comparators/
index df21f28..865ab94 100644
--- a/asterix-om/src/main/java/edu/uci/ics/asterix/dataflow/data/nontagged/comparators/
+++ b/asterix-om/src/main/java/edu/uci/ics/asterix/dataflow/data/nontagged/comparators/
@@ -2,7 +2,6 @@
-import edu.uci.ics.hyracks.algebricks.common.exceptions.NotImplementedException;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
@@ -48,6 +47,7 @@
final IBinaryComparator ascPolygonComp = APolygonPartialBinaryComparatorFactory.INSTANCE
+ final IBinaryComparator rawComp = RawBinaryComparatorFactory.INSTANCE.createBinaryComparator();
public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
@@ -112,7 +112,7 @@
return, s1 + 1, l1 - 1, b2, s2 + 1, l2 - 1);
default: {
- throw new NotImplementedException("Comparison for type " + tag + " is not implemented");
+ return, s1 + 1, l1 - 1, b2, s2 + 1, l2 - 1);
diff --git a/asterix-om/src/main/java/edu/uci/ics/asterix/dataflow/data/nontagged/comparators/ b/asterix-om/src/main/java/edu/uci/ics/asterix/dataflow/data/nontagged/comparators/
new file mode 100644
index 0000000..2c4f951
--- /dev/null
+++ b/asterix-om/src/main/java/edu/uci/ics/asterix/dataflow/data/nontagged/comparators/
@@ -0,0 +1,47 @@
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+public class RawBinaryComparatorFactory implements IBinaryComparatorFactory {
+ private static final long serialVersionUID = 1L;
+ public static IBinaryComparatorFactory INSTANCE = new RawBinaryComparatorFactory();
+ private RawBinaryComparatorFactory() {
+ }
+ @Override
+ public IBinaryComparator createBinaryComparator() {
+ return new IBinaryComparator() {
+ @Override
+ public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+ int commonLength = Math.min(l1, l2);
+ for (int i = 0; i < commonLength; i++) {
+ if (b1[s1 + i] != b2[s2 + i]) {
+ return b1[s1 + i] - b2[s2 + i];
+ }
+ }
+ int difference = l1 - l2;
+ return difference == 0 ? 0 : (difference > 0 ? 1 : -1);
+ }
+ };
+ }
diff --git a/asterix-om/src/main/java/edu/uci/ics/asterix/dataflow/data/nontagged/hash/ b/asterix-om/src/main/java/edu/uci/ics/asterix/dataflow/data/nontagged/hash/
new file mode 100644
index 0000000..4aeb00e
--- /dev/null
+++ b/asterix-om/src/main/java/edu/uci/ics/asterix/dataflow/data/nontagged/hash/
@@ -0,0 +1,44 @@
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunction;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
+public class RawBinaryHashFunctionFactory implements IBinaryHashFunctionFactory {
+ private static final long serialVersionUID = 1L;
+ public static IBinaryHashFunctionFactory INSTANCE = new RawBinaryHashFunctionFactory();
+ private RawBinaryHashFunctionFactory() {
+ }
+ @Override
+ public IBinaryHashFunction createBinaryHashFunction() {
+ return new IBinaryHashFunction() {
+ @Override
+ public int hash(byte[] bytes, int offset, int length) {
+ int value = 1;
+ int end = offset + length;
+ for (int i = offset; i < end; i++)
+ value = value * 31 + (int) bytes[i];
+ return value;
+ }
+ };
+ }
\ No newline at end of file
diff --git a/asterix-om/src/main/java/edu/uci/ics/asterix/formats/nontagged/ b/asterix-om/src/main/java/edu/uci/ics/asterix/formats/nontagged/
index 6f0cc1c..fb9d195 100644
--- a/asterix-om/src/main/java/edu/uci/ics/asterix/formats/nontagged/
+++ b/asterix-om/src/main/java/edu/uci/ics/asterix/formats/nontagged/
@@ -13,9 +13,9 @@
-import edu.uci.ics.hyracks.algebricks.common.exceptions.NotImplementedException;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
@@ -152,7 +152,7 @@
return addOffset(AIntervalPartialBinaryComparatorFactory.INSTANCE, ascending);
default: {
- throw new NotImplementedException("No binary comparator factory implemented for type " + type + " .");
+ return addOffset(RawBinaryComparatorFactory.INSTANCE, ascending);
diff --git a/asterix-om/src/main/java/edu/uci/ics/asterix/om/typecomputer/impl/ b/asterix-om/src/main/java/edu/uci/ics/asterix/om/typecomputer/impl/
index db9d932..6d5880e 100644
--- a/asterix-om/src/main/java/edu/uci/ics/asterix/om/typecomputer/impl/
+++ b/asterix-om/src/main/java/edu/uci/ics/asterix/om/typecomputer/impl/
@@ -70,6 +70,9 @@
if (t1.getTypeTag() == ATypeTag.RECORD) {
return (ARecordType) t1;
+ if (t1.getTypeTag() == ATypeTag.ANY) {
+ return DefaultOpenFieldType.NESTED_OPEN_RECORD_TYPE;
+ }
default: {
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/scalar/ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/scalar/
index 3de05f8..232eace 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/scalar/
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/scalar/
@@ -20,21 +20,23 @@
public ICopyEvaluatorFactory createEvaluatorFactory(final ICopyEvaluatorFactory[] args) throws AlgebricksException {
+ // The aggregate function will get a SingleFieldFrameTupleReference that points to the result of the ScanCollection.
+ // The list-item will always reside in the first field (column) of the SingleFieldFrameTupleReference.
+ ICopyEvaluatorFactory[] aggFuncArgs = new ICopyEvaluatorFactory[1];
+ aggFuncArgs[0] = new ColumnAccessEvalFactory(0);
+ // Create aggregate function from this scalar version.
+ FunctionIdentifier fid = AsterixBuiltinFunctions.getAggregateFunction(getIdentifier());
+ IFunctionManager mgr = FunctionManagerHolder.getFunctionManager();
+ IFunctionDescriptor fd = mgr.lookupFunction(fid);
+ AbstractAggregateFunctionDynamicDescriptor aggFuncDesc = (AbstractAggregateFunctionDynamicDescriptor) fd;
+ final ICopyAggregateFunctionFactory aggFuncFactory = aggFuncDesc.createAggregateFunctionFactory(aggFuncArgs);
return new ICopyEvaluatorFactory() {
private static final long serialVersionUID = 1L;
public ICopyEvaluator createEvaluator(IDataOutputProvider output) throws AlgebricksException {
- // The aggregate function will get a SingleFieldFrameTupleReference that points to the result of the ScanCollection.
- // The list-item will always reside in the first field (column) of the SingleFieldFrameTupleReference.
- ICopyEvaluatorFactory[] aggFuncArgs = new ICopyEvaluatorFactory[1];
- aggFuncArgs[0] = new ColumnAccessEvalFactory(0);
- // Create aggregate function from this scalar version.
- FunctionIdentifier fid = AsterixBuiltinFunctions.getAggregateFunction(getIdentifier());
- IFunctionManager mgr = FunctionManagerHolder.getFunctionManager();
- IFunctionDescriptor fd = mgr.lookupFunction(fid);
- AbstractAggregateFunctionDynamicDescriptor aggFuncDesc = (AbstractAggregateFunctionDynamicDescriptor) fd;
- ICopyAggregateFunctionFactory aggFuncFactory = aggFuncDesc.createAggregateFunctionFactory(aggFuncArgs);
// Use ScanCollection to iterate over list items.
ScanCollectionUnnestingFunctionFactory scanCollectionFactory = new ScanCollectionUnnestingFunctionFactory(
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/
index 4319b8b..e88d74e 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/
@@ -19,6 +19,9 @@
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
* Represent a buffer that is backed by a physical file. Provider custom APIs
@@ -27,22 +30,32 @@
public class FileBasedBuffer extends Buffer implements IFileBasedBuffer {
private String filePath;
- private long nextWritePosition;
private FileChannel fileChannel;
private RandomAccessFile raf;
- private int size;
+ private int bufferSize;
- public FileBasedBuffer(String filePath, long offset, int size) throws IOException {
+ private int bufferLastFlushOffset;
+ private int bufferNextWriteOffset;
+ private final int diskSectorSize;
+ private final ReadWriteLock latch;
+ private final AtomicInteger referenceCount;
+ public FileBasedBuffer(String filePath, long offset, int bufferSize, int diskSectorSize) throws IOException {
this.filePath = filePath;
- this.nextWritePosition = offset;
- buffer = ByteBuffer.allocate(size);
+ buffer = ByteBuffer.allocate(bufferSize);
raf = new RandomAccessFile(new File(filePath), "rw");
fileChannel = raf.getChannel();
+ fileChannel.position(offset);;
- this.size = size;
- buffer.limit(size);
+ this.bufferSize = bufferSize;
+ buffer.limit(bufferSize);
+ bufferLastFlushOffset = 0;
+ bufferNextWriteOffset = 0;
+ this.diskSectorSize = diskSectorSize;
+ latch = new ReentrantReadWriteLock(true);
+ referenceCount = new AtomicInteger(0);
public String getFilePath() {
@@ -53,17 +66,9 @@
this.filePath = filePath;
- public long getOffset() {
- return nextWritePosition;
- }
- public void setOffset(long offset) {
- this.nextWritePosition = offset;
- }
public int getSize() {
- return buffer.limit();
+ return bufferSize;
public void clear() {
@@ -72,11 +77,18 @@
public void flush() throws IOException {
- buffer.position(0);
- buffer.limit(size);
+ //flush
+ int pos = bufferLastFlushOffset;
+ int limit = (((bufferNextWriteOffset - 1) / diskSectorSize) + 1) * diskSectorSize;
+ buffer.position(pos);
+ buffer.limit(limit);
- erase();
+ //update variables
+ bufferLastFlushOffset = limit;
+ bufferNextWriteOffset = limit;
+ buffer.limit(bufferSize);
@@ -124,45 +136,110 @@
* starting at offset.
- public void reset(String filePath, long nextWritePosition, int size) throws IOException {
+ public void reset(String filePath, long diskNextWriteOffset, int bufferSize) throws IOException {
if (!filePath.equals(this.filePath)) {
raf = new RandomAccessFile(filePath, "rw");
this.filePath = filePath;
- this.nextWritePosition = nextWritePosition;
fileChannel = raf.getChannel();
+ fileChannel.position(diskNextWriteOffset);
- buffer.limit(size);
- this.size = size;
+ buffer.limit(bufferSize);
+ this.bufferSize = bufferSize;
+ bufferLastFlushOffset = 0;
+ bufferNextWriteOffset = 0;
public void close() throws IOException {
- public void open(String filePath, long offset, int size) throws IOException {
+ public void open(String filePath, long offset, int bufferSize) throws IOException {
raf = new RandomAccessFile(filePath, "rw");
- this.nextWritePosition = offset;
fileChannel = raf.getChannel();
- buffer.limit(size);
- this.size = size;
+ buffer.limit(bufferSize);
+ this.bufferSize = bufferSize;
+ bufferLastFlushOffset = 0;
+ bufferNextWriteOffset = 0;
- public long getNextWritePosition() {
- return nextWritePosition;
+ @Override
+ public long getDiskNextWriteOffset() throws IOException {
+ return fileChannel.position();
- public void setNextWritePosition(long nextWritePosition) {
- this.nextWritePosition = nextWritePosition;
+ @Override
+ public void setDiskNextWriteOffset(long offset) throws IOException {
+ fileChannel.position(offset);
+ @Override
+ public int getBufferLastFlushOffset() {
+ return bufferLastFlushOffset;
+ }
+ @Override
+ public void setBufferLastFlushOffset(int offset) {
+ this.bufferLastFlushOffset = offset;
+ }
+ @Override
+ public int getBufferNextWriteOffset() {
+ synchronized (fileChannel) {
+ return bufferNextWriteOffset;
+ }
+ }
+ @Override
+ public void setBufferNextWriteOffset(int offset) {
+ synchronized (fileChannel) {
+ if (bufferNextWriteOffset < offset) {
+ bufferNextWriteOffset = offset;
+ }
+ }
+ }
+ @Override
+ public void acquireWriteLatch() {
+ latch.writeLock().lock();
+ }
+ @Override
+ public void releaseWriteLatch() {
+ latch.writeLock().unlock();
+ }
+ @Override
+ public void acquireReadLatch() {
+ latch.readLock().lock();
+ }
+ @Override
+ public void releaseReadLatch() {
+ latch.readLock().unlock();
+ }
+ @Override
+ public void incRefCnt() {
+ referenceCount.incrementAndGet();
+ }
+ @Override
+ public void decRefCnt() {
+ referenceCount.decrementAndGet();
+ }
+ @Override
+ public int getRefCnt() {
+ return referenceCount.get();
+ }
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/
index f2ffa0e..46e03f1 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/
@@ -38,8 +38,8 @@
return (new File(path)).mkdir();
- public static IFileBasedBuffer getFileBasedBuffer(String filePath, long offset, int size) throws IOException {
- IFileBasedBuffer fileBasedBuffer = new FileBasedBuffer(filePath, offset, size);
+ public static IFileBasedBuffer getFileBasedBuffer(String filePath, long offset, int bufferSize, int diskSectorSize) throws IOException {
+ IFileBasedBuffer fileBasedBuffer = new FileBasedBuffer(filePath, offset, bufferSize, diskSectorSize);
return fileBasedBuffer;
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/
index e1f9f95..a4ea3cb 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/
@@ -31,12 +31,34 @@
public void reset(String filePath, long offset, int size) throws IOException;
- public long getNextWritePosition();
+ public long getDiskNextWriteOffset() throws IOException;
- public void setNextWritePosition(long writePosition);
+ public void setDiskNextWriteOffset(long writePosition) throws IOException;
public void close() throws IOException;
public void open(String filePath, long offset, int size) throws IOException;
+ public int getBufferLastFlushOffset();
+ public void setBufferLastFlushOffset(int offset);
+ public int getBufferNextWriteOffset();
+ public void setBufferNextWriteOffset(int offset);
+ public void acquireWriteLatch();
+ public void releaseWriteLatch();
+ public void acquireReadLatch();
+ public void releaseReadLatch();
+ public void incRefCnt();
+ public void decRefCnt();
+ public int getRefCnt();
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/
index c629d03..26229a7 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/
@@ -53,17 +53,6 @@
- * Provides a cursor for retrieving logs that satisfy a given ILogFilter
- * instance. Log records are retrieved in increasing order of lsn
- *
- * @param logFilter
- * specifies the filtering criteria for the retrieved logs
- * @return LogCursor an iterator for the retrieved logs
- * @throws ACIDException
- */
- public ILogCursor readLog(ILogFilter logFilter) throws ACIDException;
- /**
* @param logicalLogLocator TODO
* @param PhysicalLogLocator
* specifies the location of the log record to be read
@@ -72,15 +61,6 @@
public void readLog(long lsnValue, LogicalLogLocator logicalLogLocator) throws ACIDException;
- * Flushes the log records up to the lsn represented by the
- * logicalLogLocator
- *
- * @param logicalLogLocator
- * @throws ACIDException
- */
- public void flushLog(LogicalLogLocator logicalLogLocator) throws ACIDException;
- /**
* Retrieves the configuration parameters of the ILogManager
* @return LogManagerProperties: the configuration parameters for the
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/
index 80f74cb..0e24f9d 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/
@@ -25,41 +25,43 @@
public interface ILogRecordHelper {
- byte getLogType(LogicalLogLocator logicalLogLocator);
+ public byte getLogType(LogicalLogLocator logicalLogLocator);
- int getJobId(LogicalLogLocator logicalLogLocator);
+ public int getJobId(LogicalLogLocator logicalLogLocator);
- int getDatasetId(LogicalLogLocator logicalLogLocator);
+ public int getDatasetId(LogicalLogLocator logicalLogLocator);
- int getPKHashValue(LogicalLogLocator logicalLogLocator);
+ public int getPKHashValue(LogicalLogLocator logicalLogLocator);
- PhysicalLogLocator getPrevLSN(LogicalLogLocator logicalLogLocator);
+ public PhysicalLogLocator getPrevLSN(LogicalLogLocator logicalLogLocator);
- boolean getPrevLSN(PhysicalLogLocator physicalLogLocator, LogicalLogLocator logicalLogLocator);
+ public boolean getPrevLSN(PhysicalLogLocator physicalLogLocator, LogicalLogLocator logicalLogLocator);
- long getResourceId(LogicalLogLocator logicalLogLocator);
+ public long getResourceId(LogicalLogLocator logicalLogLocator);
- byte getResourceMgrId(LogicalLogLocator logicalLogLocater);
+ public byte getResourceMgrId(LogicalLogLocator logicalLogLocater);
- int getLogContentSize(LogicalLogLocator logicalLogLocater);
+ public int getLogContentSize(LogicalLogLocator logicalLogLocater);
- long getLogChecksum(LogicalLogLocator logicalLogLocator);
+ public long getLogChecksum(LogicalLogLocator logicalLogLocator);
- int getLogContentBeginPos(LogicalLogLocator logicalLogLocator);
+ public int getLogContentBeginPos(LogicalLogLocator logicalLogLocator);
- int getLogContentEndPos(LogicalLogLocator logicalLogLocator);
+ public int getLogContentEndPos(LogicalLogLocator logicalLogLocator);
- String getLogRecordForDisplay(LogicalLogLocator logicalLogLocator);
+ public String getLogRecordForDisplay(LogicalLogLocator logicalLogLocator);
- void writeLogHeader(LogicalLogLocator logicalLogLocator, byte logType, TransactionContext context, int datasetId,
+ public void writeLogHeader(LogicalLogLocator logicalLogLocator, byte logType, TransactionContext context, int datasetId,
int PKHashValue, long prevLogicalLogLocator, long resourceId, byte resourceMgrId, int logRecordSize);
- boolean validateLogRecord(LogicalLogLocator logicalLogLocator);
+ public boolean validateLogRecord(LogicalLogLocator logicalLogLocator);
- int getLogRecordSize(byte logType, int logBodySize);
+ public int getLogRecordSize(byte logType, int logBodySize);
- int getLogHeaderSize(byte logType);
+ public int getLogHeaderSize(byte logType);
- int getLogChecksumSize();
+ public int getLogChecksumSize();
+ public int getCommitLogSize();
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/
index 8a2b188..7e954d8 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/
@@ -23,27 +23,16 @@
private final LogManager logManager;
private final ILogFilter logFilter;
+ private final int logPageSize;
private IBuffer readOnlyBuffer;
private LogicalLogLocator logicalLogLocator = null;
- private long bufferIndex = 0;
- private boolean firstNext = true;
- private boolean readMemory = false;
- private long readLSN = 0;
private boolean needReloadBuffer = true;
- /**
- * @param logFilter
- */
- public LogCursor(final LogManager logManager, ILogFilter logFilter) throws ACIDException {
+ public LogCursor(final LogManager logManager, PhysicalLogLocator startingPhysicalLogLocator, ILogFilter logFilter,
+ int logPageSize) throws IOException, ACIDException {
this.logFilter = logFilter;
this.logManager = logManager;
- }
- public LogCursor(final LogManager logManager, PhysicalLogLocator startingPhysicalLogLocator, ILogFilter logFilter)
- throws IOException, ACIDException {
- this.logFilter = logFilter;
- this.logManager = logManager;
+ this.logPageSize = logPageSize;
@@ -57,7 +46,8 @@
File file = new File(filePath);
if (file.exists()) {
return FileUtil.getFileBasedBuffer(filePath, lsn
- % logManager.getLogManagerProperties().getLogPartitionSize(), size);
+ % logManager.getLogManagerProperties().getLogPartitionSize(), size, logManager
+ .getLogManagerProperties().getDiskSectorSize());
} else {
return null;
@@ -87,8 +77,7 @@
return false;
- //if the lsn to read is greater than the last flushed lsn, then read from memory
- if (logicalLogLocator.getLsn() > logManager.getLastFlushedLsn().get()) {
+ if (logManager.isMemoryRead(logicalLogLocator.getLsn())) {
return readFromMemory(currentLogLocator);
@@ -96,10 +85,9 @@
//needReloadBuffer is set to true if the log record is read from the memory log page.
if (needReloadBuffer) {
//log page size doesn't exceed integer boundary
- int offset = (int)(logicalLogLocator.getLsn() % logManager.getLogManagerProperties().getLogPageSize());
+ int offset = (int) (logicalLogLocator.getLsn() % logPageSize);
long adjustedLSN = logicalLogLocator.getLsn() - offset;
- readOnlyBuffer = getReadOnlyBuffer(adjustedLSN, logManager.getLogManagerProperties()
- .getLogPageSize());
+ readOnlyBuffer = getReadOnlyBuffer(adjustedLSN, logPageSize);
needReloadBuffer = false;
@@ -110,14 +98,14 @@
while (logicalLogLocator.getMemoryOffset() <= readOnlyBuffer.getSize()
- logManager.getLogRecordHelper().getLogHeaderSize(LogType.COMMIT)) {
integerRead = readOnlyBuffer.readInt(logicalLogLocator.getMemoryOffset());
- if (integerRead == logManager.getLogManagerProperties().LOG_MAGIC_NUMBER) {
+ if (integerRead == LogManagerProperties.LOG_MAGIC_NUMBER) {
logRecordBeginPosFound = true;
- if (bytesSkipped > logManager.getLogManagerProperties().getLogPageSize()) {
+ if (bytesSkipped > logPageSize) {
return false; // the maximum size of a log record is limited to
// a log page size. If we have skipped as many
// bytes without finding a log record, it
@@ -133,10 +121,9 @@
// need to reload the buffer
// reduce IO by reading more pages(equal to logBufferSize) at a time.
- long lsnpos = ((logicalLogLocator.getLsn() / logManager.getLogManagerProperties().getLogPageSize()) + 1)
- * logManager.getLogManagerProperties().getLogPageSize();
+ long lsnpos = ((logicalLogLocator.getLsn() / logPageSize) + 1) * logPageSize;
- readOnlyBuffer = getReadOnlyBuffer(lsnpos, logManager.getLogManagerProperties().getLogPageSize());
+ readOnlyBuffer = getReadOnlyBuffer(lsnpos, logPageSize);
if (readOnlyBuffer != null) {
@@ -190,13 +177,12 @@
IFileBasedBuffer logPage = logManager.getLogPage(pageIndex);
synchronized (logPage) {
// need to check again if the log record in the log buffer or has reached the disk
- if (lsn > logManager.getLastFlushedLsn().get()) {
+ if (logManager.isMemoryRead(lsn)) {
//find the magic number to identify the start of the log record
int readNumber = -1;
- int logPageSize = logManager.getLogManagerProperties().getLogPageSize();
- int logMagicNumber = logManager.getLogManagerProperties().LOG_MAGIC_NUMBER;
+ int logMagicNumber = LogManagerProperties.LOG_MAGIC_NUMBER;
int bytesSkipped = 0;
boolean logRecordBeginPosFound = false;
//check whether the currentOffset has enough space to have new log record by comparing
@@ -223,7 +209,8 @@
// need to read the next log page
readOnlyBuffer = null;
- logicalLogLocator.setLsn(lsn / logPageSize + 1);
+ lsn = ((logicalLogLocator.getLsn() / logPageSize) + 1) * logPageSize;
+ logicalLogLocator.setLsn(lsn);
return next(currentLogLocator);
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/
index 9b8f09c..199fd0f 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/
@@ -27,15 +27,12 @@
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -48,6 +45,9 @@
private final TransactionSubsystem provider;
private LogManagerProperties logManagerProperties;
private LogPageFlushThread logPageFlusher;
+ private final int logPageSize;
+ private long statLogSize;
+ private long statLogCount;
* the array of log pages. The number of log pages is configurable. Pages
@@ -62,47 +62,6 @@
private int numLogPages;
- /*
- * Initially all pages have an owner count of 1 that is the LogManager. When
- * a transaction requests to write in a log page, the owner count is
- * incremented. The log manager reserves space in the log page and puts in
- * the log header but leaves the space for the content and the checksum
- * (covering the whole log record). When the content has been put, the log
- * manager computes the checksum and puts it after the content. At this
- * point, the ownership count is decremented as the transaction is done with
- * using the page. When a page is requested to be flushed, logPageFlusher
- * set the count to 0(LOG_FLUSHER: meaning that the page is being flushed)
- * only if the count is 1(LOG_WRITER: meaning that there is no other
- * transactions who own the page to write logs.) After flushing the page,
- * logPageFlusher set this count to 1.
- */
- private AtomicInteger[] logPageOwnerCount;
- static class PageOwnershipStatus {
- public static final int LOG_WRITER = 1;
- public static final int LOG_FLUSHER = 0;
- }
- /*
- * LogPageStatus: A page is either ACTIVE or INACTIVE. The status for each
- * page is maintained in logPageStatus. A page is ACTIVE when the LogManager
- * can allocate space in the page for writing a log record. Initially all
- * pages are ACTIVE. As transactions fill up space by writing log records, a
- * page may not have sufficient space left for serving a request by a
- * transaction. When this happens, the page is flushed to disk by calling
- * logPageFlusher.requestFlush(). In the requestFlush(), after
- * groupCommitWaitTime, the page status is set to INACTIVE. Then, there is
- * no more writer on the page(meaning the corresponding logPageOwnerCount is
- * 1), the page is flushed by the logPageFlusher and the status is reset to
- * ACTIVE by the logPageFlusher.
- */
- private AtomicInteger[] logPageStatus;
- static class PageState {
- public static final int INACTIVE = 0;
- public static final int ACTIVE = 1;
- }
private AtomicLong lastFlushedLSN = new AtomicLong(-1);
@@ -129,10 +88,6 @@
return lastFlushedLSN;
- public AtomicInteger getLogPageStatus(int pageIndex) {
- return logPageStatus[pageIndex];
- }
public AtomicLong getCurrentLsn() {
return lsn;
@@ -144,13 +99,19 @@
public LogManager(TransactionSubsystem provider) throws ACIDException {
this.provider = provider;
+ logPageSize = logManagerProperties.getLogPageSize();
+ statLogSize = 0;
+ statLogCount = 0;
public LogManager(TransactionSubsystem provider, String nodeId) throws ACIDException {
this.provider = provider;
+ logPageSize = logManagerProperties.getLogPageSize();
+ statLogSize = 0;
+ statLogCount = 0;
@@ -186,9 +147,6 @@
private void initLogManager() throws ACIDException {
logRecordHelper = new LogRecordHelper(this);
numLogPages = logManagerProperties.getNumLogPages();
- logPageOwnerCount = new AtomicInteger[numLogPages];
- logPageStatus = new AtomicInteger[numLogPages];
activeTxnCountMaps = new ArrayList<HashMap<TransactionContext, Integer>>(numLogPages);
for (int i = 0; i < numLogPages; i++) {
activeTxnCountMaps.add(new HashMap<TransactionContext, Integer>());
@@ -199,20 +157,12 @@
* place the log anchor at the end of the last log record written.
- PhysicalLogLocator nextPhysicalLsn = initLSN();
- /*
- * initialize meta data for each log page.
- */
- for (int i = 0; i < numLogPages; i++) {
- logPageOwnerCount[i] = new AtomicInteger(PageOwnershipStatus.LOG_WRITER);
- logPageStatus[i] = new AtomicInteger(PageState.ACTIVE);
- }
+ initLSN();
* initialize the log pages.
- initializeLogPages(nextPhysicalLsn);
+ initializeLogPages(startingLSN);
* Instantiate and begin the LogFlusher thread. The Log Flusher thread
@@ -226,7 +176,7 @@
public int getLogPageIndex(long lsnValue) {
- return (int) (((lsnValue - startingLSN) / logManagerProperties.getLogPageSize()) % numLogPages);
+ return (int) (((lsnValue - startingLSN) / logPageSize) % numLogPages);
@@ -242,28 +192,7 @@
* record is (to be) placed.
public int getLogPageOffset(long lsnValue) {
- return (int) ((lsnValue - startingLSN) % logManagerProperties.getLogPageSize());
- }
- /*
- * a transaction thread under certain scenarios is required to wait until
- * the page where it has to write a log record becomes available for writing
- * a log record.
- */
- private void waitUntillPageIsAvailableForWritingLog(int pageIndex) throws ACIDException {
- if (logPageStatus[pageIndex].get() == PageState.ACTIVE
- && logPageOwnerCount[pageIndex].get() >= PageOwnershipStatus.LOG_WRITER) {
- return;
- }
- try {
- synchronized (logPages[pageIndex]) {
- while (!(logPageStatus[pageIndex].get() == PageState.ACTIVE && logPageOwnerCount[pageIndex].get() >= PageOwnershipStatus.LOG_WRITER)) {
- logPages[pageIndex].wait();
- }
- }
- } catch (InterruptedException e) {
- throw new ACIDException(" thread interrupted while waiting for page " + pageIndex + " to be available ", e);
- }
+ return (int) (lsnValue % logPageSize);
@@ -277,7 +206,6 @@
* @param logType: the type of log record.
private long getLsn(int entrySize, byte logType) throws ACIDException {
- long pageSize = logManagerProperties.getLogPageSize();
while (true) {
boolean forwardPage = false;
@@ -294,9 +222,9 @@
// check if the log record will cross page boundaries, a case that
// is not allowed.
- if ((next - 1) / pageSize != old / pageSize || (next % pageSize == 0)) {
+ if ((next - 1) / logPageSize != old / logPageSize || (next % logPageSize == 0)) {
- if ((old != 0 && old % pageSize == 0)) {
+ if ((old != 0 && old % logPageSize == 0)) {
// On second thought, this shall never be the case as it
// means that the lsn is
// currently at the beginning of a page and we still need to
@@ -309,7 +237,7 @@
} else {
// set the lsn to point to the beginning of the next page.
- retVal = ((old / pageSize) + 1) * pageSize;
+ retVal = ((old / logPageSize) + 1) * logPageSize;
next = retVal;
@@ -323,20 +251,6 @@
pageIndex = getNextPageInSequence(pageIndex);
- /*
- * we do not want to keep allocating LSNs if the corresponding page
- * is unavailable. Consider a scenario when the log flusher thread
- * is incredibly slow in flushing pages. Transaction threads will
- * acquire an lsn each for writing their next log record. When a
- * page has been made available, mulltiple transaction threads that
- * were waiting can continue to write their log record at the
- * assigned LSNs. Two transaction threads may get LSNs that are on
- * the same log page but actually differ by the size of the log
- * buffer. This would be erroneous. Transaction threads are made to
- * wait upfront for avoiding this situation.
- */
- waitUntillPageIsAvailableForWritingLog(pageIndex);
if (!lsn.compareAndSet(old, next)) {
// Atomic call -> returns true only when the value represented
// by lsn is same as
@@ -345,6 +259,10 @@
if (forwardPage) {
+ // forward the nextWriteOffset in the log page
+ logPages[pageIndex].setBufferNextWriteOffset(logPageSize);
addFlushRequest(prevPage, old, false);
// The transaction thread that discovers the need to forward a
@@ -352,21 +270,18 @@
} else {
- // the transaction thread has been given a space in a log page,
- // but is made to wait until the page is available.
- // (Is this needed? when does this wait happen?)
- waitUntillPageIsAvailableForWritingLog(pageIndex);
+ logPages[pageIndex].acquireReadLatch();
// increment the counter as the transaction thread now holds a
// space in the log page and hence is an owner.
- logPageOwnerCount[pageIndex].incrementAndGet();
+ logPages[pageIndex].incRefCnt();
+ logPages[pageIndex].releaseReadLatch();
// Before the count is incremented, if the flusher flushed the
// allocated page,
// then retry to get new LSN. Otherwise, the log with allocated
// lsn will be lost.
if (lastFlushedLSN.get() >= retVal) {
- logPageOwnerCount[pageIndex].decrementAndGet();
+ logPages[pageIndex].decRefCnt();
@@ -396,10 +311,10 @@
int totalLogSize = logRecordHelper.getLogRecordSize(logType, logContentSize);
// check for the total space requirement to be less than a log page.
- if (totalLogSize > logManagerProperties.getLogPageSize()) {
+ if (totalLogSize > logPageSize) {
throw new ACIDException(
" Maximum Log Content Size is "
- + (logManagerProperties.getLogPageSize() - logRecordHelper.getLogHeaderSize(LogType.UPDATE) - logRecordHelper
+ + (logPageSize - logRecordHelper.getLogHeaderSize(LogType.UPDATE) - logRecordHelper
@@ -482,16 +397,21 @@
logPages[pageIndex].writeLong(pageOffset + logRecordHelper.getLogHeaderSize(logType) + logContentSize,
- if (IS_DEBUG_MODE) {
- System.out.println("--------------> LSN(" + currentLSN + ") is written");
+ // forward the nextWriteOffset in the log page
+ int bufferNextWriteOffset = (int) ((currentLSN + totalLogSize) % logPageSize);
+ if (bufferNextWriteOffset == 0) {
+ bufferNextWriteOffset = logPageSize;
+ logPages[pageIndex].setBufferNextWriteOffset(bufferNextWriteOffset);
- // release the ownership as the log record has been placed in
- // created space.
- logPageOwnerCount[pageIndex].decrementAndGet();
+ if (logType != LogType.ENTITY_COMMIT) {
+ // release the ownership as the log record has been placed in
+ // created space.
+ logPages[pageIndex].decRefCnt();
- // indicating that the transaction thread has released ownership
- decremented = true;
+ // indicating that the transaction thread has released ownership
+ decremented = true;
+ }
if (logType == LogType.ENTITY_COMMIT) {
map = activeTxnCountMaps.get(pageIndex);
@@ -502,18 +422,42 @@
} else {
map.put(txnCtx, 1);
+ //------------------------------------------------------------------------------
+ // [Notice]
+ // reference count should be decremented
+ // after activeTxnCount is incremented, but before addFlushRequest() is called.
+ //------------------------------------------------------------------------------
+ // release the ownership as the log record has been placed in
+ // created space.
+ logPages[pageIndex].decRefCnt();
+ // indicating that the transaction thread has released ownership
+ decremented = true;
addFlushRequest(pageIndex, currentLSN, false);
} else if (logType == LogType.COMMIT) {
addFlushRequest(pageIndex, currentLSN, true);
+ if (IS_DEBUG_MODE) {
+ System.out.println("Running sum of log size: " + statLogSize + ", log count: " + statLogCount);
+ }
+ if (IS_DEBUG_MODE) {
+ System.out.println("--------------> LSN(" + currentLSN + ") is written");
+ }
+ //collect statistics
+ statLogSize += totalLogSize;
+ statLogCount++;
} catch (Exception e) {
throw new ACIDException(txnCtx, "Thread: " + Thread.currentThread().getName()
+ " logger encountered exception", e);
} finally {
if (!decremented) {
- logPageOwnerCount[pageIndex].decrementAndGet();
+ logPages[pageIndex].decRefCnt();
@@ -526,20 +470,13 @@
String filePath = LogUtil.getLogFilePath(logManagerProperties, getLogFileId(lsn));
- logPages[pageIndex].reset(filePath, LogUtil.getFileOffset(this, nextWritePosition),
- logManagerProperties.getLogPageSize());
- }
- @Override
- public ILogCursor readLog(ILogFilter logFilter) throws ACIDException {
- LogCursor cursor = new LogCursor(this, logFilter);
- return cursor;
+ logPages[pageIndex].reset(filePath, LogUtil.getFileOffset(this, nextWritePosition), logPageSize);
public ILogCursor readLog(PhysicalLogLocator physicalLogLocator, ILogFilter logFilter) throws IOException,
ACIDException {
- LogCursor cursor = new LogCursor(this, physicalLogLocator, logFilter);
+ LogCursor cursor = new LogCursor(this, physicalLogLocator, logFilter, logPageSize);
return cursor;
@@ -550,7 +487,7 @@
String filePath = LogUtil.getLogFilePath(logManagerProperties, LogUtil.getFileId(this, lsnValue));
long fileOffset = LogUtil.getFileOffset(this, lsnValue);
- ByteBuffer buffer = ByteBuffer.allocate(logManagerProperties.getLogPageSize());
+ ByteBuffer buffer = ByteBuffer.allocate(logPageSize);
RandomAccessFile raf = null;
FileChannel fileChannel = null;
try {
@@ -603,7 +540,7 @@
/* check if the log record in the log buffer or has reached the disk. */
- if (lsnValue > getLastFlushedLsn().get()) {
+ if (isMemoryRead(lsnValue)) {
int pageIndex = getLogPageIndex(lsnValue);
int pageOffset = getLogPageOffset(lsnValue);
@@ -611,7 +548,7 @@
// minimize memory allocation overhead. current code allocates the
// log page size per reading a log record.
- byte[] pageContent = new byte[logManagerProperties.getLogPageSize()];
+ byte[] pageContent = new byte[logPageSize];
// take a lock on the log page so that the page is not flushed to
// disk interim
@@ -619,8 +556,8 @@
// need to check again (this thread may have got de-scheduled
// and must refresh!)
- if (lsnValue > getLastFlushedLsn().get()) {
+ if (isMemoryRead(lsnValue)) {
// get the log record length
logPages[pageIndex].getBytes(pageContent, 0, pageContent.length);
byte logType = pageContent[pageOffset + 4];
@@ -656,6 +593,20 @@
readDiskLog(lsnValue, logicalLogLocator);
+ public boolean isMemoryRead(long currentLSN) {
+ long flushLSN = lastFlushedLSN.get();
+ if ((flushLSN + 1) % logPageSize == 0) {
+ return false;
+ }
+ long logPageBeginOffset = flushLSN - (flushLSN % logPageSize);
+ long logPageEndOffset = logPageBeginOffset + logPageSize;
+ if (currentLSN > flushLSN || (currentLSN >= logPageBeginOffset && currentLSN < logPageEndOffset)) {
+ return true;
+ } else {
+ return false;
+ }
+ }
public void renewLogFiles() throws ACIDException {
List<String> logFileNames = LogUtil.getLogFiles(logManagerProperties);
for (String name : logFileNames) {
@@ -670,7 +621,7 @@
- private PhysicalLogLocator initLSN() throws ACIDException {
+ private void initLSN() throws ACIDException {
PhysicalLogLocator nextPhysicalLsn = LogUtil.initializeLogAnchor(this);
startingLSN = nextPhysicalLsn.getLsn();
lastFlushedLSN.set(startingLSN - 1);
@@ -678,7 +629,6 @@" Starting lsn is : " + startingLSN);
- return nextPhysicalLsn;
private void closeLogPages() throws ACIDException {
@@ -695,9 +645,7 @@
try {
String filePath = LogUtil.getLogFilePath(logManagerProperties, LogUtil.getFileId(this, startingLSN));
for (int i = 0; i < numLogPages; i++) {
- logPages[i].open(filePath,
- LogUtil.getFileOffset(this, startingLSN) + i * logManagerProperties.getLogPageSize(),
- logManagerProperties.getLogPageSize());
+ logPages[i].open(filePath, LogUtil.getFileOffset(this, startingLSN) + i * logPageSize, logPageSize);
} catch (Exception e) {
throw new ACIDException(Thread.currentThread().getName() + " unable to create log buffer", e);
@@ -710,33 +658,25 @@
- * This method shall be called by the Buffer manager when it needs to evict
- * a page from the cache. TODO: Change the implementation from a looping
- * logic to event based when log manager support is integrated with the
- * Buffer Manager.
- */
- @Override
- public synchronized void flushLog(LogicalLogLocator logicalLogLocator) throws ACIDException {
- if (logicalLogLocator.getLsn() > lsn.get()) {
- throw new ACIDException(" invalid lsn " + logicalLogLocator.getLsn());
- }
- while (lastFlushedLSN.get() < logicalLogLocator.getLsn());
- }
- /*
* Map each log page to cover a physical byte range over a log file. When a
* page is flushed, the page contents are put to disk in the corresponding
* byte range.
- private void initializeLogPages(PhysicalLogLocator physicalLogLocator) throws ACIDException {
+ private void initializeLogPages(long beginLsn) throws ACIDException {
try {
- String filePath = LogUtil.getLogFilePath(logManagerProperties,
- LogUtil.getFileId(this, physicalLogLocator.getLsn()));
+ String filePath = LogUtil.getLogFilePath(logManagerProperties, LogUtil.getFileId(this, beginLsn));
+ long nextDiskWriteOffset = LogUtil.getFileOffset(this, beginLsn);
+ long nextBufferWriteOffset = nextDiskWriteOffset % logPageSize;
+ long bufferBeginOffset = nextDiskWriteOffset - nextBufferWriteOffset;
for (int i = 0; i < numLogPages; i++) {
- logPages[i] = FileUtil.getFileBasedBuffer(
- filePath,
- LogUtil.getFileOffset(this, physicalLogLocator.getLsn()) + i
- * logManagerProperties.getLogPageSize(), logManagerProperties.getLogPageSize());
+ logPages[i] = FileUtil.getFileBasedBuffer(filePath, bufferBeginOffset + i * logPageSize, logPageSize,
+ logManagerProperties.getDiskSectorSize());
+ if (i == 0) {
+ logPages[i].setBufferLastFlushOffset((int) nextBufferWriteOffset);
+ logPages[i].setBufferNextWriteOffset((int) nextBufferWriteOffset);
+ logPages[i].setDiskNextWriteOffset(nextDiskWriteOffset);
+ }
} catch (Exception e) {
@@ -764,10 +704,6 @@
return logPages[pageIndex];
- public AtomicInteger getLogPageOwnershipCount(int pageIndex) {
- return logPageOwnerCount[pageIndex];
- }
public IFileBasedBuffer[] getLogPages() {
return logPages;
@@ -829,7 +765,7 @@
private final LinkedBlockingQueue<Object>[] flushRequestQueue;
private final Object[] flushRequests;
- private int pageToFlush;
+ private int flushPageIndex;
private final long groupCommitWaitPeriod;
private boolean isRenewRequest;
@@ -843,14 +779,14 @@
flushRequestQueue[i] = new LinkedBlockingQueue<Object>(1);
flushRequests[i] = new Object();
- this.pageToFlush = -1;
+ this.flushPageIndex = 0;
groupCommitWaitPeriod = logManager.getLogManagerProperties().getGroupCommitWaitPeriod();
isRenewRequest = false;
public void renew() {
isRenewRequest = true;
- pageToFlush = -1;
+ flushPageIndex = 0;
isRenewRequest = false;
@@ -886,15 +822,19 @@
public void run() {
+ int logPageSize = logManager.getLogManagerProperties().getLogPageSize();
+ int logBufferSize = logManager.getLogManagerProperties().getLogBufferSize();
+ int beforeFlushOffset = 0;
+ int afterFlushOffset = 0;
+ boolean resetFlushPageIndex = false;
while (true) {
try {
- pageToFlush = logManager.getNextPageInSequence(pageToFlush);
// A wait call on the linkedBLockingQueue. The flusher thread is
// notified when an object is added to the queue. Please note
// that each page has an associated blocking queue.
try {
- flushRequestQueue[pageToFlush].take();
+ flushRequestQueue[flushPageIndex].take();
} catch (InterruptedException ie) {
while (isRenewRequest) {
@@ -902,58 +842,67 @@
- synchronized (logManager.getLogPage(pageToFlush)) {
- // #. sleep during the groupCommitWaitTime
+ //if the log page is already full, don't wait.
+ if (logManager.getLogPage(flushPageIndex).getBufferNextWriteOffset() < logPageSize
+ - logManager.getLogRecordHelper().getCommitLogSize()) {
+ // #. sleep for the groupCommitWaitTime
+ }
- // #. set the logPageStatus to INACTIVE in order to prevent
- // other txns from writing on this page.
- logManager.getLogPageStatus(pageToFlush).set(PageState.INACTIVE);
+ synchronized (logManager.getLogPage(flushPageIndex)) {
+ logManager.getLogPage(flushPageIndex).acquireWriteLatch();
+ try {
- // #. need to wait until the logPageOwnerCount reaches 1
- // meaning every one has finished writing logs on this page.
- while (logManager.getLogPageOwnershipCount(pageToFlush).get() != PageOwnershipStatus.LOG_WRITER) {
- sleep(0);
+ // #. need to wait until the reference count reaches 0
+ while (logManager.getLogPage(flushPageIndex).getRefCnt() != 0) {
+ sleep(0);
+ }
+ beforeFlushOffset = logManager.getLogPage(flushPageIndex).getBufferLastFlushOffset();
+ // put the content to disk (the thread still has a lock on the log page)
+ logManager.getLogPage(flushPageIndex).flush();
+ afterFlushOffset = logManager.getLogPage(flushPageIndex).getBufferLastFlushOffset();
+ // increment the last flushed lsn
+ logManager.incrementLastFlushedLsn(afterFlushOffset - beforeFlushOffset);
+ // increment currentLSN if currentLSN is less than flushLSN.
+ if (logManager.getLastFlushedLsn().get() + 1 > logManager.getCurrentLsn().get()) {
+ logManager.getCurrentLsn().set(logManager.getLastFlushedLsn().get() + 1);
+ }
+ // Map the log page to a new region in the log file if the flushOffset reached the logPageSize
+ if (afterFlushOffset == logPageSize) {
+ long diskNextWriteOffset = logManager.getLogPages()[flushPageIndex].getDiskNextWriteOffset()
+ + logBufferSize;
+ logManager.resetLogPage(logManager.getLastFlushedLsn().get() + 1 + logBufferSize,
+ diskNextWriteOffset, flushPageIndex);
+ resetFlushPageIndex = true;
+ }
+ // decrement activeTxnCountOnIndexes
+ logManager.decrementActiveTxnCountOnIndexes(flushPageIndex);
+ } finally {
+ logManager.getLogPage(flushPageIndex).releaseWriteLatch();
- // #. set the logPageOwnerCount to 0 (LOG_FLUSHER)
- // meaning it is flushing.
- logManager.getLogPageOwnershipCount(pageToFlush).set(PageOwnershipStatus.LOG_FLUSHER);
- // put the content to disk (the thread still has a lock on
- // the log page)
- logManager.getLogPage(pageToFlush).flush();
- // Map the log page to a new region in the log file.
- long nextWritePosition = logManager.getLogPages()[pageToFlush].getNextWritePosition()
- + logManager.getLogManagerProperties().getLogBufferSize();
- logManager.resetLogPage(logManager.getLastFlushedLsn().get() + 1
- + logManager.getLogManagerProperties().getLogBufferSize(), nextWritePosition, pageToFlush);
- // increment the last flushed lsn and lastFlushedPage
- logManager.incrementLastFlushedLsn(logManager.getLogManagerProperties().getLogPageSize());
- // decrement activeTxnCountOnIndexes
- logManager.decrementActiveTxnCountOnIndexes(pageToFlush);
- // reset the count to 1
- logManager.getLogPageOwnershipCount(pageToFlush).set(PageOwnershipStatus.LOG_WRITER);
- // mark the page as ACTIVE
- logManager.getLogPageStatus(pageToFlush).set(LogManager.PageState.ACTIVE);
// #. checks the queue whether there is another flush
// request on the same log buffer
// If there is another request, then simply remove it.
- if (flushRequestQueue[pageToFlush].peek() != null) {
- flushRequestQueue[pageToFlush].take();
+ if (flushRequestQueue[flushPageIndex].peek() != null) {
+ flushRequestQueue[flushPageIndex].take();
// notify all waiting (transaction) threads.
- logManager.getLogPage(pageToFlush).notifyAll();
+ logManager.getLogPage(flushPageIndex).notifyAll();
+ if (resetFlushPageIndex) {
+ flushPageIndex = logManager.getNextPageInSequence(flushPageIndex);
+ resetFlushPageIndex = false;
+ }
} catch (IOException ioe) {
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/
index 5040fa9..581ce4c 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/
@@ -28,6 +28,7 @@
public static final String NUM_LOG_PAGES_KEY = "num_log_pages";
public static final String LOG_FILE_PREFIX_KEY = "log_file_prefix";
public static final String GROUP_COMMIT_WAIT_PERIOD_KEY = "group_commit_wait_period";
+ public static final String DISK_SECTOR_SIZE_KEY = "disk_sector_size";
private static final int DEFAULT_LOG_PAGE_SIZE = 128 * 1024; //128KB
private static final int DEFAULT_NUM_LOG_PAGES = 8;
@@ -35,6 +36,7 @@
private static final long DEFAULT_GROUP_COMMIT_WAIT_PERIOD = 200; // time in millisec.
private static final String DEFAULT_LOG_FILE_PREFIX = "asterix_transaction_log";
private static final String DEFAULT_LOG_DIRECTORY = "asterix_logs/";
+ private static final int DEFAULT_DISK_SECTOR_SIZE = 4096;
// follow the naming convention <logFilePrefix>_<number> where number starts from 0
private final String logFilePrefix;
@@ -51,6 +53,8 @@
private final int logBufferSize;
// maximum size of each log file
private final long logPartitionSize;
+ // default disk sector size
+ private final int diskSectorSize;
public LogManagerProperties(Properties properties, String nodeId) {
this.logDirKey = new String(nodeId + LOG_DIR_SUFFIX_KEY);
@@ -66,6 +70,8 @@
this.logBufferSize = logPageSize * numLogPages;
//make sure that the log partition size is the multiple of log buffer size.
this.logPartitionSize = (logPartitionSize / logBufferSize) * logBufferSize;
+ this.diskSectorSize = Integer.parseInt(properties.getProperty(DISK_SECTOR_SIZE_KEY, ""
public long getLogPartitionSize() {
@@ -99,6 +105,10 @@
public String getLogDirKey() {
return logDirKey;
+ public int getDiskSectorSize() {
+ return diskSectorSize;
+ }
public String toString() {
StringBuilder builder = new StringBuilder();
@@ -108,6 +118,7 @@
builder.append("num_log_pages : " + numLogPages + FileUtil.lineSeparator);
builder.append("log_partition_size : " + logPartitionSize + FileUtil.lineSeparator);
builder.append("group_commit_wait_period : " + groupCommitWaitPeriod + FileUtil.lineSeparator);
+ builder.append("disk_sector_size : " + diskSectorSize + FileUtil.lineSeparator);
return builder.toString();
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/
index 6b882ef..1b65d8f 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/
@@ -50,6 +50,9 @@
public class LogRecordHelper implements ILogRecordHelper {
private final int LOG_CHECKSUM_SIZE = 8;
+ private final int LOG_HEADER_PART1_SIZE = 17;
+ private final int LOG_HEADER_PART2_SIZE = 21;
private final int MAGIC_NO_POS = 0;
private final int LOG_TYPE_POS = 4;
@@ -60,7 +63,9 @@
private final int RESOURCE_ID_POS = 25;
private final int RESOURCE_MGR_ID_POS = 33;
private final int LOG_RECORD_SIZE_POS = 34;
private ILogManager logManager;
public LogRecordHelper(ILogManager logManager) {
@@ -118,7 +123,11 @@
public int getLogContentSize(LogicalLogLocator logicalLogLocater) {
- return logicalLogLocater.getBuffer().readInt(logicalLogLocater.getMemoryOffset() + LOG_RECORD_SIZE_POS);
+ if (getLogType(logicalLogLocater) == LogType.COMMIT || getLogType(logicalLogLocater) == LogType.ENTITY_COMMIT) {
+ return 0;
+ } else {
+ return logicalLogLocater.getBuffer().readInt(logicalLogLocater.getMemoryOffset() + LOG_RECORD_SIZE_POS);
+ }
@@ -178,7 +187,7 @@
/* magic no */
(logicalLogLocator.getBuffer()).writeInt(logicalLogLocator.getMemoryOffset() + MAGIC_NO_POS,
- logManager.getLogManagerProperties().LOG_MAGIC_NUMBER);
+ LogManagerProperties.LOG_MAGIC_NUMBER);
/* log type */
(logicalLogLocator.getBuffer()).put(logicalLogLocator.getMemoryOffset() + LOG_TYPE_POS, logType);
@@ -230,18 +239,18 @@
public int getLogRecordSize(byte logType, int logBodySize) {
if (logType == LogType.UPDATE) {
- return 46 + logBodySize;
} else {
- return 25;
public int getLogHeaderSize(byte logType) {
if (logType == LogType.UPDATE) {
- return 38;
} else {
- return 17;
@@ -249,4 +258,8 @@
public int getLogChecksumSize() {
+ public int getCommitLogSize() {
+ }
diff --git a/pom.xml b/pom.xml
index 20efdb3..f4c46c6 100644
--- a/pom.xml
+++ b/pom.xml
@@ -84,6 +84,7 @@
+ <module>asterix-doc</module>