Merge branch 'master' of https://code.google.com/p/asterixdb
Conflicts:
asterix-common/src/main/java/edu/uci/ics/asterix/common/config/AsterixStorageProperties.java
asterix-installer/src/main/resources/conf/asterix-configuration.xml
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/base/RuleCollections.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/base/RuleCollections.java
index 1f3801c..e13da67 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/base/RuleCollections.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/base/RuleCollections.java
@@ -35,6 +35,7 @@
import edu.uci.ics.asterix.optimizer.rules.IntroduceRapidFrameFlushProjectRule;
import edu.uci.ics.asterix.optimizer.rules.IntroduceSecondaryIndexInsertDeleteRule;
import edu.uci.ics.asterix.optimizer.rules.IntroduceStaticTypeCastForInsertRule;
+import edu.uci.ics.asterix.optimizer.rules.IntroduceUnnestForCollectionToSequenceRule;
import edu.uci.ics.asterix.optimizer.rules.LoadRecordFieldsRule;
import edu.uci.ics.asterix.optimizer.rules.NestGroupByRule;
import edu.uci.ics.asterix.optimizer.rules.NestedSubplanToJoinRule;
@@ -110,6 +111,7 @@
public final static List<IAlgebraicRewriteRule> buildNormalizationRuleCollection() {
List<IAlgebraicRewriteRule> normalization = new LinkedList<IAlgebraicRewriteRule>();
+ normalization.add(new IntroduceUnnestForCollectionToSequenceRule());
normalization.add(new EliminateSubplanRule());
normalization.add(new EnforceOrderByAfterSubplan());
normalization.add(new PushAggFuncIntoStandaloneAggregateRule());
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/IntroduceUnnestForCollectionToSequenceRule.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/IntroduceUnnestForCollectionToSequenceRule.java
new file mode 100644
index 0000000..984f4dd
--- /dev/null
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/IntroduceUnnestForCollectionToSequenceRule.java
@@ -0,0 +1,94 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.optimizer.rules;
+
+import java.util.List;
+
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.commons.lang3.mutable.MutableObject;
+
+import edu.uci.ics.asterix.aql.util.FunctionUtils;
+import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
+import edu.uci.ics.asterix.om.types.IAType;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.UnnestingFunctionCallExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
+import edu.uci.ics.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
+
+/**
+ * This rule introduces a unnest operator for the collection-to-sequence function (if the input to the function is a collection).
+ *
+ * @author yingyib
+ */
+public class IntroduceUnnestForCollectionToSequenceRule implements IAlgebraicRewriteRule {
+
+ @Override
+ public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context) throws AlgebricksException {
+ return false;
+ }
+
+ @Override
+ public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
+ throws AlgebricksException {
+ AbstractLogicalOperator op = (AbstractLogicalOperator) opRef.getValue();
+ if (op.getOperatorTag() != LogicalOperatorTag.ASSIGN) {
+ return false;
+ }
+ AssignOperator assign = (AssignOperator) op;
+ List<Mutable<ILogicalExpression>> exprs = assign.getExpressions();
+ if (exprs.size() != 1) {
+ return false;
+ }
+ ILogicalExpression expr = exprs.get(0).getValue();
+ if (expr.getExpressionTag() != LogicalExpressionTag.FUNCTION_CALL) {
+ return false;
+ }
+ AbstractFunctionCallExpression func = (AbstractFunctionCallExpression) expr;
+ if (func.getFunctionIdentifier() != AsterixBuiltinFunctions.COLLECTION_TO_SEQUENCE) {
+ return false;
+ }
+
+ IVariableTypeEnvironment env = assign.computeInputTypeEnvironment(context);
+ ILogicalExpression argExpr = func.getArguments().get(0).getValue();
+ IAType outerExprType = (IAType) env.getType(expr);
+ IAType innerExprType = (IAType) env.getType(argExpr);
+ if (outerExprType.equals(innerExprType)) {
+ /** nothing is changed with the collection-to-sequence function, remove the collection-sequence function call */
+ assign.getExpressions().set(0, new MutableObject<ILogicalExpression>(argExpr));
+ return true;
+ }
+ /** change the assign operator to an unnest operator */
+ LogicalVariable var = assign.getVariables().get(0);
+ @SuppressWarnings("unchecked")
+ UnnestOperator unnest = new UnnestOperator(var, new MutableObject<ILogicalExpression>(
+ new UnnestingFunctionCallExpression(
+ FunctionUtils.getFunctionInfo(AsterixBuiltinFunctions.SCAN_COLLECTION),
+ new MutableObject<ILogicalExpression>(argExpr))));
+ unnest.getInputs().addAll(assign.getInputs());
+ opRef.setValue(unnest);
+ context.computeAndSetTypeEnvironmentForOperator(unnest);
+ return true;
+ }
+}
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/translator/AqlExpressionToPlanTranslator.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/translator/AqlExpressionToPlanTranslator.java
index a2545ff..fc4bb03d7 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/translator/AqlExpressionToPlanTranslator.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/translator/AqlExpressionToPlanTranslator.java
@@ -187,13 +187,13 @@
ILogicalOperator topOp = p.first;
ProjectOperator project = (ProjectOperator) topOp;
LogicalVariable resVar = project.getVariables().get(0);
+
if (outputDatasetName == null) {
FileSplit outputFileSplit = metadataProvider.getOutputFile();
if (outputFileSplit == null) {
outputFileSplit = getDefaultOutputFileLocation();
}
metadataProvider.setOutputFile(outputFileSplit);
- String resultNodeName = outputFileSplit.getNodeName();
List<Mutable<ILogicalExpression>> writeExprList = new ArrayList<Mutable<ILogicalExpression>>(1);
writeExprList.add(new MutableObject<ILogicalExpression>(new VariableReferenceExpression(resVar)));
@@ -202,6 +202,20 @@
topOp = new DistributeResultOperator(writeExprList, sink);
topOp.getInputs().add(new MutableObject<ILogicalOperator>(project));
} else {
+ /** add the collection-to-sequence right before the final project, because dataset only accept non-collection records */
+ LogicalVariable seqVar = context.newVar();
+ @SuppressWarnings("unchecked")
+ /** This assign adds a marker function collection-to-sequence: if the input is a singleton collection, unnest it; otherwise do nothing. */
+ AssignOperator assignCollectionToSequence = new AssignOperator(seqVar,
+ new MutableObject<ILogicalExpression>(
+ new ScalarFunctionCallExpression(AsterixBuiltinFunctions
+ .getAsterixFunctionInfo(AsterixBuiltinFunctions.COLLECTION_TO_SEQUENCE),
+ new MutableObject<ILogicalExpression>(new VariableReferenceExpression(resVar)))));
+ assignCollectionToSequence.getInputs().add(
+ new MutableObject<ILogicalOperator>(project.getInputs().get(0).getValue()));
+ project.getInputs().get(0).setValue(assignCollectionToSequence);
+ project.getVariables().set(0, seqVar);
+ resVar = seqVar;
AqlDataSource targetDatasource = validateDatasetInfo(metadataProvider, stmt.getDataverseName(),
stmt.getDatasetName());
@@ -225,7 +239,6 @@
}
AssignOperator assign = new AssignOperator(vars, exprs);
assign.getInputs().add(new MutableObject<ILogicalOperator>(project));
-
Mutable<ILogicalExpression> varRef = new MutableObject<ILogicalExpression>(new VariableReferenceExpression(
resVar));
ILogicalOperator leafOperator = null;
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/APIFramework.java b/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/APIFramework.java
index d72ba6a..d5d7409 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/APIFramework.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/APIFramework.java
@@ -136,12 +136,12 @@
}
@Override
- public IOptimizationContext createOptimizationContext(int varCounter, int frameSize,
+ public IOptimizationContext createOptimizationContext(int varCounter,
IExpressionEvalSizeComputer expressionEvalSizeComputer,
IMergeAggregationExpressionFactory mergeAggregationExpressionFactory,
IExpressionTypeComputer expressionTypeComputer, INullableTypeComputer nullableTypeComputer,
PhysicalOptimizationConfig physicalOptimizationConfig) {
- return new AlgebricksOptimizationContext(varCounter, frameSize, expressionEvalSizeComputer,
+ return new AlgebricksOptimizationContext(varCounter, expressionEvalSizeComputer,
mergeAggregationExpressionFactory, expressionTypeComputer, nullableTypeComputer,
physicalOptimizationConfig);
}
@@ -260,22 +260,26 @@
AsterixCompilerProperties compilerProperties = AsterixAppContextInfo.getInstance().getCompilerProperties();
int frameSize = compilerProperties.getFrameSize();
+ int sortFrameLimit = (int) (compilerProperties.getSortMemorySize() / frameSize);
+ int joinFrameLimit = (int) (compilerProperties.getJoinMemorySize() / frameSize);
+ OptimizationConfUtil.getPhysicalOptimizationConfig().setFrameSize(frameSize);
+ OptimizationConfUtil.getPhysicalOptimizationConfig().setMaxFramesExternalSort(sortFrameLimit);
+ OptimizationConfUtil.getPhysicalOptimizationConfig().setMaxFramesHybridHash(joinFrameLimit);
+
HeuristicCompilerFactoryBuilder builder = new HeuristicCompilerFactoryBuilder(
AqlOptimizationContextFactory.INSTANCE);
+ builder.setPhysicalOptimizationConfig(OptimizationConfUtil.getPhysicalOptimizationConfig());
builder.setLogicalRewrites(buildDefaultLogicalRewrites());
builder.setPhysicalRewrites(buildDefaultPhysicalRewrites());
IDataFormat format = queryMetadataProvider.getFormat();
ICompilerFactory compilerFactory = builder.create();
- builder.setFrameSize(frameSize);
builder.setExpressionEvalSizeComputer(format.getExpressionEvalSizeComputer());
builder.setIMergeAggregationExpressionFactory(new AqlMergeAggregationExpressionFactory());
builder.setPartialAggregationTypeComputer(new AqlPartialAggregationTypeComputer());
builder.setExpressionTypeComputer(AqlExpressionTypeComputer.INSTANCE);
builder.setNullableTypeComputer(AqlNullableTypeComputer.INSTANCE);
- OptimizationConfUtil.getPhysicalOptimizationConfig().setFrameSize(frameSize);
- builder.setPhysicalOptimizationConfig(OptimizationConfUtil.getPhysicalOptimizationConfig());
ICompiler compiler = compilerFactory.createCompiler(plan, queryMetadataProvider, t.getVarCounter());
if (pc.isOptimize()) {
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/AsterixAppRuntimeContext.java b/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/AsterixAppRuntimeContext.java
index 7a9ead4..9c04b80 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/AsterixAppRuntimeContext.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/AsterixAppRuntimeContext.java
@@ -105,7 +105,7 @@
indexLifecycleManager = new DatasetLifecycleManager(storageProperties, localResourceRepository);
IAsterixAppRuntimeContextProvider asterixAppRuntimeContextProvider = new AsterixAppRuntimeContextProviderForRecovery(
this);
- txnSubsystem = new TransactionSubsystem(ncApplicationContext.getNodeId(), asterixAppRuntimeContextProvider);
+ txnSubsystem = new TransactionSubsystem(ncApplicationContext.getNodeId(), asterixAppRuntimeContextProvider, txnProperties);
isShuttingdown = false;
// The order of registration is important. The buffer cache must registered before recovery and transaction managers.
diff --git a/asterix-app/src/main/resources/asterix-build-configuration.xml b/asterix-app/src/main/resources/asterix-build-configuration.xml
index 3e1b4b2..dc77a96 100644
--- a/asterix-app/src/main/resources/asterix-build-configuration.xml
+++ b/asterix-app/src/main/resources/asterix-build-configuration.xml
@@ -13,4 +13,9 @@
<value>WARNING</value>
<description>Log level for running tests/build</description>
</property>
+ <property>
+ <name>txn.log.groupcommitinterval</name>
+ <value>1</value>
+ <description>The group commit wait time in milliseconds.</description>
+ </property>
</asterixConfiguration>
diff --git a/asterix-app/src/main/resources/log.properties b/asterix-app/src/main/resources/log.properties
deleted file mode 100644
index ee8040a..0000000
--- a/asterix-app/src/main/resources/log.properties
+++ /dev/null
@@ -1 +0,0 @@
-group_commit_wait_period=1
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/optimizerts/results/scan-delete-all.plan b/asterix-app/src/test/resources/optimizerts/results/scan-delete-all.plan
index e969e5d..cba987c 100644
--- a/asterix-app/src/test/resources/optimizerts/results/scan-delete-all.plan
+++ b/asterix-app/src/test/resources/optimizerts/results/scan-delete-all.plan
@@ -3,8 +3,8 @@
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
-- INSERT_DELETE |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- -- STABLE_SORT [$$19(ASC)] |PARTITIONED|
- -- HASH_PARTITION_EXCHANGE [$$19] |PARTITIONED|
+ -- STABLE_SORT [$$20(ASC)] |PARTITIONED|
+ -- HASH_PARTITION_EXCHANGE [$$20] |PARTITIONED|
-- ASSIGN |PARTITIONED|
-- STREAM_PROJECT |PARTITIONED|
-- ASSIGN |PARTITIONED|
diff --git a/asterix-app/src/test/resources/optimizerts/results/scan-delete-rtree-secondary-index.plan b/asterix-app/src/test/resources/optimizerts/results/scan-delete-rtree-secondary-index.plan
index 4f0d931..e1852af 100644
--- a/asterix-app/src/test/resources/optimizerts/results/scan-delete-rtree-secondary-index.plan
+++ b/asterix-app/src/test/resources/optimizerts/results/scan-delete-rtree-secondary-index.plan
@@ -10,8 +10,8 @@
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
-- INSERT_DELETE |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- -- STABLE_SORT [$$14(ASC)] |PARTITIONED|
- -- HASH_PARTITION_EXCHANGE [$$14] |PARTITIONED|
+ -- STABLE_SORT [$$15(ASC)] |PARTITIONED|
+ -- HASH_PARTITION_EXCHANGE [$$15] |PARTITIONED|
-- ASSIGN |PARTITIONED|
-- STREAM_PROJECT |PARTITIONED|
-- ASSIGN |PARTITIONED|
@@ -20,4 +20,4 @@
-- BTREE_SEARCH |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
-- ASSIGN |PARTITIONED|
- -- EMPTY_TUPLE_SOURCE |PARTITIONED|
\ No newline at end of file
+ -- EMPTY_TUPLE_SOURCE |PARTITIONED|
diff --git a/asterix-app/src/test/resources/optimizerts/results/scan-delete.plan b/asterix-app/src/test/resources/optimizerts/results/scan-delete.plan
index 0608f69..a5265c8 100644
--- a/asterix-app/src/test/resources/optimizerts/results/scan-delete.plan
+++ b/asterix-app/src/test/resources/optimizerts/results/scan-delete.plan
@@ -3,8 +3,8 @@
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
-- INSERT_DELETE |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- -- STABLE_SORT [$$21(ASC)] |PARTITIONED|
- -- HASH_PARTITION_EXCHANGE [$$21] |PARTITIONED|
+ -- STABLE_SORT [$$22(ASC)] |PARTITIONED|
+ -- HASH_PARTITION_EXCHANGE [$$22] |PARTITIONED|
-- ASSIGN |PARTITIONED|
-- STREAM_PROJECT |PARTITIONED|
-- ASSIGN |PARTITIONED|
diff --git a/asterix-app/src/test/resources/optimizerts/results/scan-insert-secondary-index.plan b/asterix-app/src/test/resources/optimizerts/results/scan-insert-secondary-index.plan
index 1458c66..8057666 100644
--- a/asterix-app/src/test/resources/optimizerts/results/scan-insert-secondary-index.plan
+++ b/asterix-app/src/test/resources/optimizerts/results/scan-insert-secondary-index.plan
@@ -13,8 +13,8 @@
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
-- INSERT_DELETE |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- -- STABLE_SORT [$$6(ASC)] |PARTITIONED|
- -- HASH_PARTITION_EXCHANGE [$$6] |PARTITIONED|
+ -- STABLE_SORT [$$7(ASC)] |PARTITIONED|
+ -- HASH_PARTITION_EXCHANGE [$$7] |PARTITIONED|
-- ASSIGN |PARTITIONED|
-- STREAM_PROJECT |PARTITIONED|
-- ASSIGN |PARTITIONED|
diff --git a/asterix-app/src/test/resources/optimizerts/results/scan-insert.plan b/asterix-app/src/test/resources/optimizerts/results/scan-insert.plan
index 0b756d9..51d3060 100644
--- a/asterix-app/src/test/resources/optimizerts/results/scan-insert.plan
+++ b/asterix-app/src/test/resources/optimizerts/results/scan-insert.plan
@@ -3,8 +3,8 @@
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
-- INSERT_DELETE |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- -- STABLE_SORT [$$6(ASC)] |PARTITIONED|
- -- HASH_PARTITION_EXCHANGE [$$6] |PARTITIONED|
+ -- STABLE_SORT [$$7(ASC)] |PARTITIONED|
+ -- HASH_PARTITION_EXCHANGE [$$7] |PARTITIONED|
-- ASSIGN |PARTITIONED|
-- STREAM_PROJECT |PARTITIONED|
-- ASSIGN |PARTITIONED|
diff --git a/asterix-app/src/test/resources/runtimets/queries/user-defined-functions/query-issue244/query-issue244.1.ddl.aql b/asterix-app/src/test/resources/runtimets/queries/user-defined-functions/query-issue244/query-issue244.1.ddl.aql
new file mode 100644
index 0000000..6f7b5e0
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/user-defined-functions/query-issue244/query-issue244.1.ddl.aql
@@ -0,0 +1,25 @@
+/*
+ * Description : This test case is to verify the fix for issue244
+ : https://code.google.com/p/asterixdb/issues/detail?id=244
+ * Expected Res : Success
+ * Date : 4th June 2013
+ */
+
+
+drop dataverse test if exists;
+create dataverse test;
+
+use dataverse test;
+
+create type TypeA as open {
+ id : int32,
+ name : string
+}
+
+create dataset t1(TypeA) primary key id;
+
+create dataset t2(TypeA) primary key id;
+
+create function f1(){
+for $m in dataset('t1') return $m
+};
diff --git a/asterix-app/src/test/resources/runtimets/queries/user-defined-functions/query-issue244/query-issue244.2.update.aql b/asterix-app/src/test/resources/runtimets/queries/user-defined-functions/query-issue244/query-issue244.2.update.aql
new file mode 100644
index 0000000..a7b9500
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/user-defined-functions/query-issue244/query-issue244.2.update.aql
@@ -0,0 +1,16 @@
+/*
+ * Description : This test case is to verify the fix for issue244
+ : https://code.google.com/p/asterixdb/issues/detail?id=244
+ * Expected Res : Success
+ * Date : 4th June 2013
+ */
+
+use dataverse test;
+
+insert into dataset t1({"id":21,"name":"John"});
+insert into dataset t1({"id":34,"name":"Bill"});
+insert into dataset t1({"id":41,"name":"Joy"});
+insert into dataset t1({"id":16,"name":"Sam"});
+insert into dataset t1({"id":67,"name":"Ravi"});
+
+insert into dataset t2(f1());
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/queries/user-defined-functions/query-issue244/query-issue244.3.query.aql b/asterix-app/src/test/resources/runtimets/queries/user-defined-functions/query-issue244/query-issue244.3.query.aql
new file mode 100644
index 0000000..68e4679
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/user-defined-functions/query-issue244/query-issue244.3.query.aql
@@ -0,0 +1,12 @@
+/*
+ * Description : This test case is to verify the fix for issue244
+ : https://code.google.com/p/asterixdb/issues/detail?id=244
+ * Expected Res : Success
+ * Date : 4th June 2013
+ */
+
+use dataverse test;
+
+for $l in dataset('t2')
+order by $l.id
+return $l;
diff --git a/asterix-app/src/test/resources/runtimets/results/user-defined-functions/query-issue244/query-issue244.1.adm b/asterix-app/src/test/resources/runtimets/results/user-defined-functions/query-issue244/query-issue244.1.adm
new file mode 100644
index 0000000..92f6371
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/results/user-defined-functions/query-issue244/query-issue244.1.adm
@@ -0,0 +1,5 @@
+{ "id": 16, "name": "Sam" }
+{ "id": 21, "name": "John" }
+{ "id": 34, "name": "Bill" }
+{ "id": 41, "name": "Joy" }
+{ "id": 67, "name": "Ravi" }
diff --git a/asterix-app/src/test/resources/runtimets/testsuite.xml b/asterix-app/src/test/resources/runtimets/testsuite.xml
index 4713b2e..9711930 100644
--- a/asterix-app/src/test/resources/runtimets/testsuite.xml
+++ b/asterix-app/src/test/resources/runtimets/testsuite.xml
@@ -4182,6 +4182,11 @@
<output-dir compare="Text">issue289_query</output-dir>
</compilation-unit>
</test-case>
+ <test-case FilePath="user-defined-functions">
+ <compilation-unit name="query-issue244">
+ <output-dir compare="Text">query-issue244</output-dir>
+ </compilation-unit>
+ </test-case>
</test-group>
<test-group name="hints">
<test-case FilePath="hints">
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/config/AsterixStorageProperties.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/config/AsterixStorageProperties.java
index 253b30c..753778a 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/config/AsterixStorageProperties.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/config/AsterixStorageProperties.java
@@ -5,8 +5,8 @@
private static final String STORAGE_BUFFERCACHE_PAGESIZE_KEY = "storage.buffercache.pagesize";
private static int STORAGE_BUFFERCACHE_PAGESIZE_DEFAULT = (32 << 10); // 32KB
- private static final String STORAGE_BUFFERCACHE_NUMPAGES_KEY = "storage.buffercache.numpages";
- private static final int STORAGE_BUFFERCACHE_NUMPAGES_DEFAULT = 1024;
+ private static final String STORAGE_BUFFERCACHE_SIZE_KEY = "storage.buffercache.size";
+ private static final long STORAGE_BUFFERCACHE_SIZE_DEFAULT = (32 << 20); // 32 MB
private static final String STORAGE_BUFFERCACHE_MAXOPENFILES_KEY = "storage.buffercache.maxopenfiles";
private static int STORAGE_BUFFERCACHE_MAXOPENFILES_DEFAULT = Integer.MAX_VALUE;
@@ -15,7 +15,7 @@
private static final int STORAGE_MEMORYCOMPONENT_PAGESIZE_DEFAULT = (32 << 10); // 32KB
private static final String STORAGE_MEMORYCOMPONENT_NUMPAGES_KEY = "storage.memorycomponent.numpages";
- private static final int STORAGE_MEMORYCOMPONENT_NUMPAGES_DEFAULT = 8; // ... so 32MB components
+ private static final int STORAGE_MEMORYCOMPONENT_NUMPAGES_DEFAULT = 1024; // ... so 32MB components
private static final String STORAGE_MEMORYCOMPONENT_GLOBALBUDGET_KEY = "storage.memorycomponent.globalbudget";
private static final long STORAGE_MEMORYCOMPONENT_GLOBALBUDGET_DEFAULT = 536870912; // 512MB
@@ -35,9 +35,13 @@
PropertyInterpreters.getIntegerPropertyInterpreter());
}
+ public long getBufferCacheSize() {
+ return accessor.getProperty(STORAGE_BUFFERCACHE_SIZE_KEY, STORAGE_BUFFERCACHE_SIZE_DEFAULT,
+ PropertyInterpreters.getLongPropertyInterpreter());
+ }
+
public int getBufferCacheNumPages() {
- return accessor.getProperty(STORAGE_BUFFERCACHE_NUMPAGES_KEY, STORAGE_BUFFERCACHE_NUMPAGES_DEFAULT,
- PropertyInterpreters.getIntegerPropertyInterpreter());
+ return (int) (getBufferCacheSize() / getBufferCachePageSize());
}
public int getBufferCacheMaxOpenFiles() {
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/config/AsterixTransactionProperties.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/config/AsterixTransactionProperties.java
index d97e53b..0b6ea85 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/config/AsterixTransactionProperties.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/config/AsterixTransactionProperties.java
@@ -1,6 +1,9 @@
package edu.uci.ics.asterix.common.config;
public class AsterixTransactionProperties extends AbstractAsterixProperties {
+ private static final String TXN_LOG_DIRECTORY_KEY = "txn.log.directory";
+ private static final String TXN_LOG_DIRECTORY_DEFAULT = "asterix_logs/";
+
private static final String TXN_LOG_BUFFER_NUMPAGES_KEY = "txn.log.buffer.numpages";
private static int TXN_LOG_BUFFER_NUMPAGES_DEFAULT = 8;
@@ -9,25 +12,46 @@
private static final String TXN_LOG_PARTITIONSIZE_KEY = "txn.log.partitionsize";
private static final long TXN_LOG_PARTITIONSIZE_DEFAULT = (2 << 30); // 2GB
+
+ private static final String TXN_LOG_DISKSECTORSIZE_KEY = "txn.log.disksectorsize";
+ private static final int TXN_LOG_DISKSECTORSIZE_DEFAULT = 4096;
private static final String TXN_LOG_GROUPCOMMITINTERVAL_KEY = "txn.log.groupcommitinterval";
- private static int TXN_LOG_GROUPCOMMITINTERVAL_DEFAULT = 200; // 200ms
+ private static int TXN_LOG_GROUPCOMMITINTERVAL_DEFAULT = 10; // 0.1ms
private static final String TXN_LOG_CHECKPOINT_LSNTHRESHOLD_KEY = "txn.log.checkpoint.lsnthreshold";
private static final int TXN_LOG_CHECKPOINT_LSNTHRESHOLD_DEFAULT = (64 << 20); // 64M
private static final String TXN_LOG_CHECKPOINT_POLLFREQUENCY_KEY = "txn.log.checkpoint.pollfrequency";
private static int TXN_LOG_CHECKPOINT_POLLFREQUENCY_DEFAULT = 120; // 120s
+
+ private static final String TXN_LOG_CHECKPOINT_HISTORY_KEY = "txn.log.checkpoint.history";
+ private static int TXN_LOG_CHECKPOINT_HISTORY_DEFAULT = 0;
private static final String TXN_LOCK_ESCALATIONTHRESHOLD_KEY = "txn.lock.escalationthreshold";
private static int TXN_LOCK_ESCALATIONTHRESHOLD_DEFAULT = 1000;
private static final String TXN_LOCK_SHRINKTIMER_KEY = "txn.lock.shrinktimer";
- private static int TXN_LOCK_SHRINKTIMER_DEFAULT = 120000; // 2m
+ private static int TXN_LOCK_SHRINKTIMER_DEFAULT = 5000; // 5s
+
+ private static final String TXN_LOCK_TIMEOUT_WAITTHRESHOLD_KEY = "txn.lock.timeout.waitthreshold";
+ private static final int TXN_LOCK_TIMEOUT_WAITTHRESHOLD_DEFAULT = 60000; // 60s
+
+ private static final String TXN_LOCK_TIMEOUT_SWEEPTHRESHOLD_KEY = "txn.lock.timeout.sweepthreshold";
+ private static final int TXN_LOCK_TIMEOUT_SWEEPTHRESHOLD_DEFAULT = 10000; // 10s
public AsterixTransactionProperties(AsterixPropertiesAccessor accessor) {
super(accessor);
}
+
+ public String getLogDirectory() {
+ String logDirectory = accessor.getProperty(TXN_LOG_DIRECTORY_KEY, TXN_LOG_DIRECTORY_DEFAULT,
+ PropertyInterpreters.getStringPropertyInterpreter());
+ if (!logDirectory.endsWith("/")) {
+ logDirectory += "/";
+ }
+ return logDirectory;
+ }
public int getLogBufferNumPages() {
return accessor.getProperty(TXN_LOG_BUFFER_NUMPAGES_KEY, TXN_LOG_BUFFER_NUMPAGES_DEFAULT,
@@ -43,6 +67,11 @@
return accessor.getProperty(TXN_LOG_PARTITIONSIZE_KEY, TXN_LOG_PARTITIONSIZE_DEFAULT,
PropertyInterpreters.getLongPropertyInterpreter());
}
+
+ public int getLogDiskSectorSize() {
+ return accessor.getProperty(TXN_LOG_DISKSECTORSIZE_KEY, TXN_LOG_DISKSECTORSIZE_DEFAULT,
+ PropertyInterpreters.getIntegerPropertyInterpreter());
+ }
public int getGroupCommitInterval() {
return accessor.getProperty(TXN_LOG_GROUPCOMMITINTERVAL_KEY, TXN_LOG_GROUPCOMMITINTERVAL_DEFAULT,
@@ -59,6 +88,11 @@
PropertyInterpreters.getIntegerPropertyInterpreter());
}
+ public int getCheckpointHistory() {
+ return accessor.getProperty(TXN_LOG_CHECKPOINT_HISTORY_KEY, TXN_LOG_CHECKPOINT_HISTORY_DEFAULT,
+ PropertyInterpreters.getIntegerPropertyInterpreter());
+ }
+
public int getEntityToDatasetLockEscalationThreshold() {
return accessor.getProperty(TXN_LOCK_ESCALATIONTHRESHOLD_KEY, TXN_LOCK_ESCALATIONTHRESHOLD_DEFAULT,
PropertyInterpreters.getIntegerPropertyInterpreter());
@@ -68,5 +102,15 @@
return accessor.getProperty(TXN_LOCK_SHRINKTIMER_KEY, TXN_LOCK_SHRINKTIMER_DEFAULT,
PropertyInterpreters.getIntegerPropertyInterpreter());
}
+
+ public int getTimeoutWaitThreshold() {
+ return accessor.getProperty(TXN_LOCK_TIMEOUT_WAITTHRESHOLD_KEY, TXN_LOCK_TIMEOUT_WAITTHRESHOLD_DEFAULT,
+ PropertyInterpreters.getIntegerPropertyInterpreter());
+ }
+
+ public int getTimeoutSweepThreshold() {
+ return accessor.getProperty(TXN_LOCK_TIMEOUT_SWEEPTHRESHOLD_KEY, TXN_LOCK_TIMEOUT_SWEEPTHRESHOLD_DEFAULT,
+ PropertyInterpreters.getIntegerPropertyInterpreter());
+ }
}
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/LogManagerProperties.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/LogManagerProperties.java
index 89816aa..9387687 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/LogManagerProperties.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/LogManagerProperties.java
@@ -15,28 +15,16 @@
package edu.uci.ics.asterix.common.transactions;
import java.io.Serializable;
-import java.util.Properties;
+
+import edu.uci.ics.asterix.common.config.AsterixTransactionProperties;
public class LogManagerProperties implements Serializable {
private static final long serialVersionUID = 2084227360840799662L;
public static final int LOG_MAGIC_NUMBER = 123456789;
- public static final String LOG_DIR_SUFFIX_KEY = ".txnLogDir";
- public static final String LOG_PAGE_SIZE_KEY = "log_page_size";
- public static final String LOG_PARTITION_SIZE_KEY = "log_partition_size";
- public static final String NUM_LOG_PAGES_KEY = "num_log_pages";
- public static final String LOG_FILE_PREFIX_KEY = "log_file_prefix";
- public static final String GROUP_COMMIT_WAIT_PERIOD_KEY = "group_commit_wait_period";
- public static final String DISK_SECTOR_SIZE_KEY = "disk_sector_size";
-
- private static final int DEFAULT_LOG_PAGE_SIZE = 128 * 1024; //128KB
- private static final int DEFAULT_NUM_LOG_PAGES = 8;
- private static final long DEFAULT_LOG_PARTITION_SIZE = (long) 1024 * 1024 * 1024 * 2; //2GB
- private static final long DEFAULT_GROUP_COMMIT_WAIT_PERIOD = 200; // time in millisec.
+ public static final String LOG_DIR_SUFFIX = ".txnLogDir";
private static final String DEFAULT_LOG_FILE_PREFIX = "asterix_transaction_log";
- private static final String DEFAULT_LOG_DIRECTORY = "asterix_logs/";
- private static final int DEFAULT_DISK_SECTOR_SIZE = 4096;
// follow the naming convention <logFilePrefix>_<number> where number starts from 0
private final String logFilePrefix;
@@ -56,22 +44,19 @@
// default disk sector size
private final int diskSectorSize;
- public LogManagerProperties(Properties properties, String nodeId) {
- this.logDirKey = new String(nodeId + LOG_DIR_SUFFIX_KEY);
- this.logPageSize = Integer.parseInt(properties.getProperty(LOG_PAGE_SIZE_KEY, "" + DEFAULT_LOG_PAGE_SIZE));
- this.numLogPages = Integer.parseInt(properties.getProperty(NUM_LOG_PAGES_KEY, "" + DEFAULT_NUM_LOG_PAGES));
- long logPartitionSize = Long.parseLong(properties.getProperty(LOG_PARTITION_SIZE_KEY, ""
- + DEFAULT_LOG_PARTITION_SIZE));
- this.logDir = properties.getProperty(logDirKey, DEFAULT_LOG_DIRECTORY + nodeId);
- this.logFilePrefix = properties.getProperty(LOG_FILE_PREFIX_KEY, DEFAULT_LOG_FILE_PREFIX);
- this.groupCommitWaitPeriod = Long.parseLong(properties.getProperty(GROUP_COMMIT_WAIT_PERIOD_KEY, ""
- + DEFAULT_GROUP_COMMIT_WAIT_PERIOD));
+ public LogManagerProperties(AsterixTransactionProperties txnProperties, String nodeId) {
+ this.logDirKey = new String(nodeId + LOG_DIR_SUFFIX);
+ this.logPageSize = txnProperties.getLogBufferPageSize();
+ this.numLogPages = txnProperties.getLogBufferNumPages();
+ long logPartitionSize = txnProperties.getLogPartitionSize();
+ this.logDir = txnProperties.getLogDirectory() + nodeId;
+ this.logFilePrefix = DEFAULT_LOG_FILE_PREFIX;
+ this.groupCommitWaitPeriod = txnProperties.getGroupCommitInterval();
this.logBufferSize = logPageSize * numLogPages;
//make sure that the log partition size is the multiple of log buffer size.
this.logPartitionSize = (logPartitionSize / logBufferSize) * logBufferSize;
- this.diskSectorSize = Integer.parseInt(properties.getProperty(DISK_SECTOR_SIZE_KEY, ""
- + DEFAULT_DISK_SECTOR_SIZE));
+ this.diskSectorSize = txnProperties.getLogDiskSectorSize();
}
public long getLogPartitionSize() {
diff --git a/asterix-common/src/test/java/edu/uci/ics/asterix/test/aql/TestsUtils.java b/asterix-common/src/test/java/edu/uci/ics/asterix/test/aql/TestsUtils.java
index ac8a3bd..d13dd2e 100644
--- a/asterix-common/src/test/java/edu/uci/ics/asterix/test/aql/TestsUtils.java
+++ b/asterix-common/src/test/java/edu/uci/ics/asterix/test/aql/TestsUtils.java
@@ -236,7 +236,7 @@
GlobalConfig.ASTERIX_LOGGER.log(Level.SEVERE, "Method failed: " + method.getStatusLine());
String[] errors = handleError(method);
GlobalConfig.ASTERIX_LOGGER.log(Level.SEVERE, errors[2]);
- throw new Exception("DDL operation failed: " + errors[0]);
+ throw new Exception("DML operation failed: " + errors[0]);
}
}
diff --git a/asterix-doc/src/site/markdown/aql/manual.md b/asterix-doc/src/site/markdown/aql/manual.md
index da51dee..f1c3fbd 100644
--- a/asterix-doc/src/site/markdown/aql/manual.md
+++ b/asterix-doc/src/site/markdown/aql/manual.md
@@ -601,14 +601,23 @@
The following example creates an rtree index called fbSenderLocIdx on the sender-location field of the FacebookMessages dataset.
This index can be useful for accelerating queries that use the
[`spatial-intersect` function](functions.html#spatial-intersect) in a predicate involving the
-sender-loction field.
+sender-location field.
##### Example
create index fbSenderLocIndex on FacebookMessages(sender-location) type rtree;
+The following example creates a 3-gram index called fbUserIdx on the name field of the FacebookUsers dataset.
+This index can be used to accelerate some similarity or substring maching queries on the name field.
+For details refer to the [document on similarity queries](similarity.html#NGram_Index).
+
+##### Example
+
+ create index fbUserIdx on FacebookUsers(name) type ngram(3);
+
The following example creates a keyword index called fbMessageIdx on the message field of the FacebookMessages dataset.
-This index can be useful for accelerating text searches involving the message field.
+This keyword index can be used to optimize queries with token-based similarity predicates on the message field.
+For details refer to the [document on similarity queries](similarity.html#Keyword_Index).
##### Example
diff --git a/asterix-doc/src/site/resources/images/hyrax_ts.png b/asterix-doc/src/site/resources/images/hyrax_ts.png
new file mode 100644
index 0000000..001c788
--- /dev/null
+++ b/asterix-doc/src/site/resources/images/hyrax_ts.png
Binary files differ
diff --git a/asterix-doc/src/site/site.xml b/asterix-doc/src/site/site.xml
index b68941c..017bf10 100644
--- a/asterix-doc/src/site/site.xml
+++ b/asterix-doc/src/site/site.xml
@@ -11,7 +11,11 @@
<version position="right"/>
- <poweredBy><logo name="" img=""/></poweredBy>
+ <poweredBy>
+ <logo name="Hyracks"
+ href="https://code.google.com/p/hyracks/"
+ img="images/hyrax_ts.png"/>
+ </poweredBy>
<skin>
<groupId>org.apache.maven.skins</groupId>
diff --git a/asterix-installer/src/main/resources/conf/asterix-configuration.xml b/asterix-installer/src/main/resources/conf/asterix-configuration.xml
index f0d76bf..8394a96 100644
--- a/asterix-installer/src/main/resources/conf/asterix-configuration.xml
+++ b/asterix-installer/src/main/resources/conf/asterix-configuration.xml
@@ -22,10 +22,11 @@
</property>
<property>
- <name>storage.buffercache.numpages</name>
- <value>1024</value>
- <description>The number of pages allocated to the disk buffer cache.
- (Default = "1024")
+ <name>storage.buffercache.size</name>
+ <value>33554432</value>
+ <description>The size of memory allocated to the disk buffer cache.
+ The value should be a multiple of the buffer cache page size(Default
+ = "33554432" // 32MB)
</description>
</property>
@@ -79,6 +80,14 @@
</property>
<property>
+ <name>txn.log.directory</name>
+ <value>asterix_logs/</value>
+ <description>The directory location for transaction logs. (Default =
+ "asterix_logs/")
+ </description>
+ </property>
+
+ <property>
<name>txn.log.buffer.numpages</name>
<value>8</value>
<description>The number of in-memory log buffer pages. (Default = "8")
@@ -103,10 +112,17 @@
</property>
<property>
+ <name>txn.log.disksectorsize</name>
+ <value>4096</value>
+ <description>The size of a disk sector. (Default = "4096")
+ </description>
+ </property>
+
+ <property>
<name>txn.log.groupcommitinterval</name>
- <value>200</value>
+ <value>1</value>
<description>The group commit wait time in milliseconds. (Default =
- "200" // 2ms)
+ "10" // 0.1ms)
</description>
</property>
@@ -127,6 +143,14 @@
</property>
<property>
+ <name>txn.log.checkpoint.history</name>
+ <value>0</value>
+ <description>The number of old log partition files to keep before
+ discarding. (Default = "0")
+ </description>
+ </property>
+
+ <property>
<name>txn.lock.escalationthreshold</name>
<value>1000</value>
<description>The number of entity level locks that need to be acquired
@@ -137,9 +161,26 @@
<property>
<name>txn.lock.shrinktimer</name>
- <value>120000</value>
+ <value>5000</value>
<description>The time in milliseconds to wait before deallocating
- unused lock manager memory. (Default = "120000" // 120s)
+ unused lock manager memory. (Default = "5000" // 5s)
+ </description>
+ </property>
+
+ <property>
+ <name>txn.lock.timeout.waitthreshold</name>
+ <value>60000</value>
+ <description>The time in milliseconds to wait before labeling a
+ transaction which has been waiting for a lock timed-out. (Default =
+ "60000" // 60s)
+ </description>
+ </property>
+
+ <property>
+ <name>txn.lock.timeout.sweepthreshold</name>
+ <value>10000</value>
+ <description>The time in milliseconds the timeout thread waits between
+ sweeps to detect timed-out transactions. (Default = "10000" // 10s)
</description>
</property>
@@ -176,8 +217,8 @@
<property>
<name>api.port</name>
- <value>19101</value>
- <description>The port for the ASTERIX API server. (Default = 19101)
+ <value>19002</value>
+ <description>The port for the ASTERIX API server. (Default = 19002)
</description>
</property>
diff --git a/asterix-om/src/main/java/edu/uci/ics/asterix/om/functions/AsterixBuiltinFunctions.java b/asterix-om/src/main/java/edu/uci/ics/asterix/om/functions/AsterixBuiltinFunctions.java
index 2a732ac..f717653 100644
--- a/asterix-om/src/main/java/edu/uci/ics/asterix/om/functions/AsterixBuiltinFunctions.java
+++ b/asterix-om/src/main/java/edu/uci/ics/asterix/om/functions/AsterixBuiltinFunctions.java
@@ -31,6 +31,7 @@
import edu.uci.ics.asterix.om.typecomputer.impl.CastListResultTypeComputer;
import edu.uci.ics.asterix.om.typecomputer.impl.CastRecordResultTypeComputer;
import edu.uci.ics.asterix.om.typecomputer.impl.ClosedRecordConstructorResultType;
+import edu.uci.ics.asterix.om.typecomputer.impl.CollectionToSequenceTypeComputer;
import edu.uci.ics.asterix.om.typecomputer.impl.ConcatNonNullTypeComputer;
import edu.uci.ics.asterix.om.typecomputer.impl.FieldAccessByIndexResultType;
import edu.uci.ics.asterix.om.typecomputer.impl.InjectFailureTypeComputer;
@@ -561,6 +562,8 @@
public static final FunctionIdentifier NOT_NULL = new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "not-null",
1);
+ public static final FunctionIdentifier COLLECTION_TO_SEQUENCE = new FunctionIdentifier(
+ FunctionConstants.ASTERIX_NS, "" + "collection-to-sequence", 1);
public static IFunctionInfo getAsterixFunctionInfo(FunctionIdentifier fid) {
IFunctionInfo finfo = registeredFunctions.get(fid);
@@ -862,6 +865,8 @@
addFunction(INTERVAL_CONSTRUCTOR_START_FROM_DATETIME, OptionalAIntervalTypeComputer.INSTANCE);
addFunction(INTERVAL_CONSTRUCTOR_START_FROM_TIME, OptionalAIntervalTypeComputer.INSTANCE);
+ addPrivateFunction(COLLECTION_TO_SEQUENCE, CollectionToSequenceTypeComputer.INSTANCE);
+
String metadataFunctionLoaderClassName = "edu.uci.ics.asterix.metadata.functions.MetadataBuiltinFunctions";
try {
Class.forName(metadataFunctionLoaderClassName);
diff --git a/asterix-om/src/main/java/edu/uci/ics/asterix/om/typecomputer/impl/CollectionToSequenceTypeComputer.java b/asterix-om/src/main/java/edu/uci/ics/asterix/om/typecomputer/impl/CollectionToSequenceTypeComputer.java
new file mode 100644
index 0000000..5066d4b
--- /dev/null
+++ b/asterix-om/src/main/java/edu/uci/ics/asterix/om/typecomputer/impl/CollectionToSequenceTypeComputer.java
@@ -0,0 +1,65 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.om.typecomputer.impl;
+
+import edu.uci.ics.asterix.om.typecomputer.base.IResultTypeComputer;
+import edu.uci.ics.asterix.om.types.ATypeTag;
+import edu.uci.ics.asterix.om.types.AbstractCollectionType;
+import edu.uci.ics.asterix.om.types.IAType;
+import edu.uci.ics.asterix.om.types.TypeHelper;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import edu.uci.ics.hyracks.algebricks.core.algebra.metadata.IMetadataProvider;
+
+/**
+ * This function is to make a sequence of records and a singleton collection of records
+ * present in a uniformed manner.
+ *
+ * @author yingyib
+ */
+public class CollectionToSequenceTypeComputer implements IResultTypeComputer {
+
+ public static final CollectionToSequenceTypeComputer INSTANCE = new CollectionToSequenceTypeComputer();
+
+ @Override
+ public IAType computeType(ILogicalExpression expression, IVariableTypeEnvironment env,
+ IMetadataProvider<?, ?> metadataProvider) throws AlgebricksException {
+ AbstractFunctionCallExpression func = (AbstractFunctionCallExpression) expression;
+ ILogicalExpression arg = func.getArguments().get(0).getValue();
+
+ IAType argType = (IAType) env.getType(arg);
+ if (TypeHelper.canBeNull(argType)) {
+ IAType nonOptionalType = TypeHelper.getNonOptionalType(argType);
+ if (nonOptionalType.getTypeTag() == ATypeTag.ORDEREDLIST
+ || nonOptionalType.getTypeTag() == ATypeTag.UNORDEREDLIST) {
+ /** if the collection is null, that corresponds to an empty sequence */
+ argType = nonOptionalType;
+ }
+ }
+
+ ATypeTag argTypeTag = argType.getTypeTag();
+ if (argTypeTag == ATypeTag.ORDEREDLIST || argTypeTag == ATypeTag.UNORDEREDLIST) {
+ /** if the input is a singleton list, return it's item type if any */
+ AbstractCollectionType collectionType = (AbstractCollectionType) argType;
+ return collectionType.getItemType();
+ } else {
+ /** if the input is not a singleton list, return the original input type */
+ return argType;
+ }
+ }
+
+}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/EntityInfoManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/EntityInfoManager.java
index 206a3bf..1e944bc 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/EntityInfoManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/EntityInfoManager.java
@@ -30,13 +30,12 @@
*/
public class EntityInfoManager {
- public static final int SHRINK_TIMER_THRESHOLD = 120000; //2min
-
private ArrayList<ChildEntityInfoArrayManager> pArray;
private int allocChild; //used to allocate the next free EntityInfo slot.
private long shrinkTimer;
private boolean isShrinkTimerOn;
private int occupiedSlots;
+ private int shrinkTimerThreshold;
// ////////////////////////////////////////////////
// // begin of unit test
@@ -129,12 +128,13 @@
// // end of unit test
// ////////////////////////////////////////////////
- public EntityInfoManager() {
+ public EntityInfoManager(int shrinkTimerThreshold) {
pArray = new ArrayList<ChildEntityInfoArrayManager>();
pArray.add(new ChildEntityInfoArrayManager());
allocChild = 0;
occupiedSlots = 0;
isShrinkTimerOn = false;
+ this.shrinkTimerThreshold = shrinkTimerThreshold;
}
public int allocate(int jobId, int datasetId, int entityHashVal, byte lockMode) {
@@ -208,7 +208,7 @@
if (size > 1 && size * ChildEntityInfoArrayManager.NUM_OF_SLOTS / usedSlots >= 3) {
if (isShrinkTimerOn) {
- if (System.currentTimeMillis() - shrinkTimer >= SHRINK_TIMER_THRESHOLD) {
+ if (System.currentTimeMillis() - shrinkTimer >= shrinkTimerThreshold) {
isShrinkTimerOn = false;
return true;
}
@@ -301,7 +301,7 @@
}
public int getShrinkTimerThreshold() {
- return SHRINK_TIMER_THRESHOLD;
+ return shrinkTimerThreshold;
}
public void initEntityInfo(int slotNum, int jobId, int datasetId, int PKHashVal, byte lockMode) {
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/JobInfo.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/JobInfo.java
index 8bf5304..86eb02b 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/JobInfo.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/JobInfo.java
@@ -192,9 +192,9 @@
}
}
- public void decreaseDatasetISLockCount(int datasetId) {
+ public void decreaseDatasetISLockCount(int datasetId, int entityToDatasetLockEscalationThreshold) {
int count = datasetISLockHT.get(datasetId);
- if (count >= LockManager.ESCALATE_TRHESHOLD_ENTITY_TO_DATASET) {
+ if (count >= entityToDatasetLockEscalationThreshold) {
//do not decrease the count since it is already escalated.
} else if (count > 1) {
datasetISLockHT.upsert(datasetId, count - 1);
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java
index 96e95d9..9768198 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java
@@ -25,6 +25,7 @@
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+import edu.uci.ics.asterix.common.config.AsterixTransactionProperties;
import edu.uci.ics.asterix.common.exceptions.ACIDException;
import edu.uci.ics.asterix.common.transactions.DatasetId;
import edu.uci.ics.asterix.common.transactions.ILockManager;
@@ -56,8 +57,6 @@
public static final boolean ALLOW_DATASET_GRANULE_X_LOCK_WITH_OTHER_CONCURRENT_LOCK_REQUESTS = false;
public static final boolean ALLOW_ESCALATE_FROM_ENTITY_TO_DATASET = true;
- //Threshold must be greater than 1 and should be reasonably large enough not to escalate too soon.
- public static final int ESCALATE_TRHESHOLD_ENTITY_TO_DATASET = 1000;
private static final int DO_ESCALATE = 0;
private static final int ESCALATED = 1;
private static final int DONOT_ESCALATE = 2;
@@ -95,7 +94,7 @@
this.waiterLatch = new ReentrantReadWriteLock(true);
this.jobHT = new HashMap<JobId, JobInfo>();
this.datasetResourceHT = new HashMap<DatasetId, DatasetLockInfo>();
- this.entityInfoManager = new EntityInfoManager();
+ this.entityInfoManager = new EntityInfoManager(txnSubsystem.getTransactionProperties().getLockManagerShrinkTimer());
this.lockWaiterManager = new LockWaiterManager();
this.entityLockInfoManager = new EntityLockInfoManager(entityInfoManager, lockWaiterManager);
this.deadlockDetector = new DeadlockDetector(jobHT, datasetResourceHT, entityLockInfoManager,
@@ -110,6 +109,10 @@
this.lockRequestTracker = new LockRequestTracker();
}
}
+
+ public AsterixTransactionProperties getTransactionProperties() {
+ return this.txnSubsystem.getTransactionProperties();
+ }
@Override
public void lock(DatasetId datasetId, int entityHashValue, byte lockMode, ITransactionContext txnContext)
@@ -197,7 +200,7 @@
if (doEscalate) {
throw new IllegalStateException(
"ESCALATE_TRHESHOLD_ENTITY_TO_DATASET should not be set to "
- + ESCALATE_TRHESHOLD_ENTITY_TO_DATASET);
+ + txnSubsystem.getTransactionProperties().getEntityToDatasetLockEscalationThreshold());
}
}
}
@@ -291,9 +294,9 @@
}
int count = jobInfo.getDatasetISLockCount(datasetId);
- if (count == ESCALATE_TRHESHOLD_ENTITY_TO_DATASET) {
+ if (count == txnSubsystem.getTransactionProperties().getEntityToDatasetLockEscalationThreshold()) {
return DO_ESCALATE;
- } else if (count > ESCALATE_TRHESHOLD_ENTITY_TO_DATASET) {
+ } else if (count > txnSubsystem.getTransactionProperties().getEntityToDatasetLockEscalationThreshold()) {
return ESCALATED;
} else {
return DONOT_ESCALATE;
@@ -779,7 +782,7 @@
if (ALLOW_ESCALATE_FROM_ENTITY_TO_DATASET) {
if (!isInstant && datasetLockMode == LockMode.IS) {
- jobInfo.decreaseDatasetISLockCount(datasetId.getId());
+ jobInfo.decreaseDatasetISLockCount(datasetId.getId(), txnSubsystem.getTransactionProperties().getEntityToDatasetLockEscalationThreshold());
}
}
@@ -1294,7 +1297,7 @@
//We don't want to allow the lock escalation when there is a first lock request on a dataset.
throw new IllegalStateException(
"ESCALATE_TRHESHOLD_ENTITY_TO_DATASET should not be set to "
- + ESCALATE_TRHESHOLD_ENTITY_TO_DATASET);
+ + txnSubsystem.getTransactionProperties().getEntityToDatasetLockEscalationThreshold());
}
}
}
@@ -2094,7 +2097,7 @@
try {
StringBuilder sb = new StringBuilder();
sb.append("\n>>dump_begin\t>>----- [ConfVars] -----");
- sb.append("\nESCALATE_TRHESHOLD_ENTITY_TO_DATASET: " + ESCALATE_TRHESHOLD_ENTITY_TO_DATASET);
+ sb.append("\nESCALATE_TRHESHOLD_ENTITY_TO_DATASET: " + txnSubsystem.getTransactionProperties().getEntityToDatasetLockEscalationThreshold());
sb.append("\nSHRINK_TIMER_THRESHOLD (entityLockInfoManager): "
+ entityLockInfoManager.getShrinkTimerThreshold());
sb.append("\nSHRINK_TIMER_THRESHOLD (entityInfoManager): " + entityInfoManager.getShrinkTimerThreshold());
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManagerDeterministicUnitTest.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManagerDeterministicUnitTest.java
index 0781276..02fda06 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManagerDeterministicUnitTest.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManagerDeterministicUnitTest.java
@@ -7,7 +7,14 @@
import java.util.NoSuchElementException;
import java.util.Scanner;
+import edu.uci.ics.asterix.common.config.AsterixCompilerProperties;
+import edu.uci.ics.asterix.common.config.AsterixExternalProperties;
+import edu.uci.ics.asterix.common.config.AsterixMetadataProperties;
+import edu.uci.ics.asterix.common.config.AsterixPropertiesAccessor;
+import edu.uci.ics.asterix.common.config.AsterixStorageProperties;
+import edu.uci.ics.asterix.common.config.AsterixTransactionProperties;
import edu.uci.ics.asterix.common.exceptions.ACIDException;
+import edu.uci.ics.asterix.common.exceptions.AsterixException;
import edu.uci.ics.asterix.common.transactions.DatasetId;
import edu.uci.ics.asterix.common.transactions.ILockManager;
import edu.uci.ics.asterix.common.transactions.ITransactionManager.TransactionState;
@@ -18,7 +25,7 @@
public class LockManagerDeterministicUnitTest {
- public static void main(String args[]) throws ACIDException, IOException {
+ public static void main(String args[]) throws ACIDException, IOException, AsterixException {
//initialize controller thread
String requestFileName = new String(
"src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockRequestFile");
@@ -39,8 +46,9 @@
String requestFileName;
long defaultWaitTime;
- public LockRequestController(String requestFileName) throws ACIDException {
- this.txnProvider = new TransactionSubsystem("LockManagerPredefinedUnitTest", null);;
+ public LockRequestController(String requestFileName) throws ACIDException, AsterixException {
+ this.txnProvider = new TransactionSubsystem("LockManagerPredefinedUnitTest", null,
+ new AsterixTransactionProperties(new AsterixPropertiesAccessor()));
this.workerReadyQueue = new WorkerReadyQueue();
this.requestList = new ArrayList<LockRequest>();
this.expectedResultList = new ArrayList<ArrayList<Integer>>();
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManagerRandomUnitTest.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManagerRandomUnitTest.java
index b2cc6ab..a942325 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManagerRandomUnitTest.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManagerRandomUnitTest.java
@@ -3,7 +3,10 @@
import java.util.ArrayList;
import java.util.Random;
+import edu.uci.ics.asterix.common.config.AsterixPropertiesAccessor;
+import edu.uci.ics.asterix.common.config.AsterixTransactionProperties;
import edu.uci.ics.asterix.common.exceptions.ACIDException;
+import edu.uci.ics.asterix.common.exceptions.AsterixException;
import edu.uci.ics.asterix.common.transactions.DatasetId;
import edu.uci.ics.asterix.common.transactions.ILockManager;
import edu.uci.ics.asterix.common.transactions.ITransactionContext;
@@ -28,9 +31,10 @@
private static int jobId = 0;
private static Random rand;
- public static void main(String args[]) throws ACIDException {
+ public static void main(String args[]) throws ACIDException, AsterixException {
int i;
- TransactionSubsystem txnProvider = new TransactionSubsystem("LockManagerRandomUnitTest", null);
+ TransactionSubsystem txnProvider = new TransactionSubsystem("LockManagerRandomUnitTest", null,
+ new AsterixTransactionProperties(new AsterixPropertiesAccessor()));
rand = new Random(System.currentTimeMillis());
for (i = 0; i < MAX_NUM_OF_ENTITY_LOCK_JOB; i++) {
System.out.println("Creating " + i + "th EntityLockJob..");
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/TimeOutDetector.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/TimeOutDetector.java
index 05052f0..7cbd193 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/TimeOutDetector.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/TimeOutDetector.java
@@ -15,17 +15,19 @@
*/
public class TimeOutDetector {
- static final long TIME_OUT_THRESHOLD = 60000;
- static final long SWEEP_PERIOD = 10000;//120000;
LockManager lockMgr;
Thread trigger;
LinkedList<LockWaiter> victimList;
+ int timeoutThreshold;
+ int sweepThreshold;
public TimeOutDetector(LockManager lockMgr) {
this.victimList = new LinkedList<LockWaiter>();
this.lockMgr = lockMgr;
this.trigger = new Thread(new TimeoutTrigger(this));
+ this.timeoutThreshold = lockMgr.getTransactionProperties().getTimeoutWaitThreshold();
+ this.sweepThreshold = lockMgr.getTransactionProperties().getTimeoutSweepThreshold();
trigger.setDaemon(true);
AsterixThreadExecutor.INSTANCE.execute(trigger);
}
@@ -39,7 +41,7 @@
}
public void checkAndSetVictim(LockWaiter waiterObj) {
- if (System.currentTimeMillis() - waiterObj.getBeginWaitTime() >= TIME_OUT_THRESHOLD) {
+ if (System.currentTimeMillis() - waiterObj.getBeginWaitTime() >= timeoutThreshold) {
waiterObj.setVictim(true);
waiterObj.setWait(false);
victimList.add(waiterObj);
@@ -68,7 +70,7 @@
public void run() {
while (true) {
try {
- Thread.sleep(TimeOutDetector.SWEEP_PERIOD);
+ Thread.sleep(owner.sweepThreshold);
owner.sweep(); // Trigger the timeout detector (the owner) to
// initiate sweep
} catch (InterruptedException e) {
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java
index d4f874b..ba7e093 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java
@@ -113,7 +113,7 @@
public LogManager(TransactionSubsystem provider) throws ACIDException {
this.provider = provider;
- initLogManagerProperties(this.provider.getId());
+ logManagerProperties = new LogManagerProperties(this.provider.getTransactionProperties(), this.provider.getId());
logPageSize = logManagerProperties.getLogPageSize();
initLogManager();
statLogSize = 0;
@@ -122,43 +122,13 @@
public LogManager(TransactionSubsystem provider, String nodeId) throws ACIDException {
this.provider = provider;
- initLogManagerProperties(nodeId);
+ logManagerProperties = new LogManagerProperties(provider.getTransactionProperties(), nodeId);
logPageSize = logManagerProperties.getLogPageSize();
initLogManager();
statLogSize = 0;
statLogCount = 0;
}
- /*
- * initialize the log manager properties either from the configuration file
- * on disk or with default values
- */
- private void initLogManagerProperties(String nodeId) throws ACIDException {
- LogManagerProperties logProperties = null;
- InputStream is = null;
- try {
- is = this.getClass().getClassLoader()
- .getResourceAsStream(TransactionManagementConstants.LogManagerConstants.LOG_CONF_FILE);
-
- Properties p = new Properties();
-
- if (is != null) {
- p.load(is);
- }
- logProperties = new LogManagerProperties(p, nodeId);
-
- } catch (IOException ioe) {
- if (is != null) {
- try {
- is.close();
- } catch (IOException e) {
- throw new ACIDException("unable to close input stream ", e);
- }
- }
- }
- logManagerProperties = logProperties;
- }
-
private void initLogManager() throws ACIDException {
logRecordHelper = new LogRecordHelper(this);
numLogPages = logManagerProperties.getNumLogPages();
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/CheckpointThread.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/CheckpointThread.java
index b59c48f..68df619 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/CheckpointThread.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/CheckpointThread.java
@@ -11,8 +11,8 @@
public class CheckpointThread extends Thread {
- private static final long LSN_THRESHOLD = 64 * 1024 * 1024;
- private long checkpointTermInSecs = 120; //seconds.
+ private long lsnThreshold;
+ private long checkpointTermInSecs;
private long lastMinMCTFirstLSN = 0;
@@ -20,12 +20,11 @@
private final IIndexLifecycleManager indexLifecycleManager;
public CheckpointThread(IRecoveryManager recoveryMgr, IIndexLifecycleManager indexLifecycleManager,
- long checkpointTermInSecs) {
+ long lsnThreshold, long checkpointTermInSecs) {
this.recoveryMgr = recoveryMgr;
this.indexLifecycleManager = indexLifecycleManager;
- if (this.checkpointTermInSecs < checkpointTermInSecs) {
- this.checkpointTermInSecs = checkpointTermInSecs;
- }
+ this.lsnThreshold = lsnThreshold;
+ this.checkpointTermInSecs = checkpointTermInSecs;
}
@Override
@@ -39,7 +38,7 @@
}
currentMinMCTFirstLSN = getMinMCTFirstLSN();
- if (currentMinMCTFirstLSN - lastMinMCTFirstLSN > LSN_THRESHOLD) {
+ if (currentMinMCTFirstLSN - lastMinMCTFirstLSN > lsnThreshold) {
try {
recoveryMgr.checkpoint(false);
lastMinMCTFirstLSN = currentMinMCTFirstLSN;
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java
index ee27061..7b95358 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java
@@ -24,6 +24,7 @@
import java.io.ObjectOutputStream;
import java.io.OutputStream;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
@@ -90,6 +91,7 @@
public static final boolean IS_DEBUG_MODE = false;//true
private static final Logger LOGGER = Logger.getLogger(RecoveryManager.class.getName());
private final TransactionSubsystem txnSubsystem;
+ private final int checkpointHistory;
/**
* A file at a known location that contains the LSN of the last log record
@@ -100,6 +102,7 @@
public RecoveryManager(TransactionSubsystem TransactionProvider) throws ACIDException {
this.txnSubsystem = TransactionProvider;
+ this.checkpointHistory = this.txnSubsystem.getTransactionProperties().getCheckpointHistory();
}
/**
@@ -516,8 +519,10 @@
//#. delete the previous checkpoint files
if (prevCheckpointFiles != null) {
- for (File file : prevCheckpointFiles) {
- file.delete();
+ // sort the filenames lexicographically to keep the latest checkpointHistory files.
+ Arrays.sort(prevCheckpointFiles);
+ for (int i = 0; i < prevCheckpointFiles.length - this.checkpointHistory; ++i) {
+ prevCheckpointFiles[i].delete();
}
}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManagementConstants.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManagementConstants.java
index 7d04f61..25b9195 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManagementConstants.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManagementConstants.java
@@ -26,10 +26,6 @@
}
public static class LogManagerConstants {
- public static final String LOG_CONF_DIR = "log_conf";
- public static final String LOG_CONF_FILE = "log.properties";
- public static final String ASTERIX_CONF_DIR = "src/main/resources";
- public static final String DEFAULT_LOG_DIR = "asterix_logs";
public static final int TERMINAL_LSN = -1;
}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionSubsystem.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionSubsystem.java
index eb6e9a2..bde44de 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionSubsystem.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionSubsystem.java
@@ -14,6 +14,7 @@
*/
package edu.uci.ics.asterix.transaction.management.service.transaction;
+import edu.uci.ics.asterix.common.config.AsterixTransactionProperties;
import edu.uci.ics.asterix.common.exceptions.ACIDException;
import edu.uci.ics.asterix.common.transactions.IAsterixAppRuntimeContextProvider;
import edu.uci.ics.asterix.common.transactions.ILockManager;
@@ -42,10 +43,12 @@
private final IndexLoggerRepository loggerRepository;
private final IAsterixAppRuntimeContextProvider asterixAppRuntimeContextProvider;
private final CheckpointThread checkpointThread;
+ private final AsterixTransactionProperties txnProperties;
- public TransactionSubsystem(String id, IAsterixAppRuntimeContextProvider asterixAppRuntimeContextProvider)
- throws ACIDException {
+ public TransactionSubsystem(String id, IAsterixAppRuntimeContextProvider asterixAppRuntimeContextProvider,
+ AsterixTransactionProperties txnProperties) throws ACIDException {
this.id = id;
+ this.txnProperties = txnProperties;
this.transactionManager = new TransactionManager(this);
this.logManager = new LogManager(this);
this.lockManager = new LockManager(this);
@@ -55,7 +58,8 @@
this.asterixAppRuntimeContextProvider = asterixAppRuntimeContextProvider;
if (asterixAppRuntimeContextProvider != null) {
this.checkpointThread = new CheckpointThread(recoveryManager,
- asterixAppRuntimeContextProvider.getIndexLifecycleManager(), 0);
+ asterixAppRuntimeContextProvider.getIndexLifecycleManager(),
+ this.txnProperties.getCheckpointLSNThreshold(), this.txnProperties.getCheckpointPollFrequency());
} else {
this.checkpointThread = null;
}
@@ -89,6 +93,10 @@
return asterixAppRuntimeContextProvider;
}
+ public AsterixTransactionProperties getTransactionProperties() {
+ return txnProperties;
+ }
+
public String getId() {
return id;
}
diff --git a/asterix-transactions/src/test/java/edu/uci/ics/asterix/transaction/management/logging/test/RecoverySimulator.java b/asterix-transactions/src/test/java/edu/uci/ics/asterix/transaction/management/logging/test/RecoverySimulator.java
index 07f7474..862c996 100644
--- a/asterix-transactions/src/test/java/edu/uci/ics/asterix/transaction/management/logging/test/RecoverySimulator.java
+++ b/asterix-transactions/src/test/java/edu/uci/ics/asterix/transaction/management/logging/test/RecoverySimulator.java
@@ -16,7 +16,10 @@
import java.io.IOException;
+import edu.uci.ics.asterix.common.config.AsterixPropertiesAccessor;
+import edu.uci.ics.asterix.common.config.AsterixTransactionProperties;
import edu.uci.ics.asterix.common.exceptions.ACIDException;
+import edu.uci.ics.asterix.common.exceptions.AsterixException;
import edu.uci.ics.asterix.common.transactions.IRecoveryManager;
import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionSubsystem;
@@ -28,10 +31,11 @@
recoveryManager.startRecovery(true);
}
- public static void main(String args[]) throws IOException, ACIDException {
+ public static void main(String args[]) throws IOException, ACIDException, AsterixException {
String id = "nc1";
try {
- TransactionSubsystem factory = new TransactionSubsystem(id, null);
+ TransactionSubsystem factory = new TransactionSubsystem(id, null, new AsterixTransactionProperties(
+ new AsterixPropertiesAccessor()));
IRecoveryManager recoveryManager = factory.getRecoveryManager();
recoveryManager.startRecovery(true);
} catch (ACIDException acide) {
diff --git a/asterix-transactions/src/test/java/edu/uci/ics/asterix/transaction/management/logging/test/TransactionWorkloadSimulator.java b/asterix-transactions/src/test/java/edu/uci/ics/asterix/transaction/management/logging/test/TransactionWorkloadSimulator.java
index 8f56e3b..b9605ef 100644
--- a/asterix-transactions/src/test/java/edu/uci/ics/asterix/transaction/management/logging/test/TransactionWorkloadSimulator.java
+++ b/asterix-transactions/src/test/java/edu/uci/ics/asterix/transaction/management/logging/test/TransactionWorkloadSimulator.java
@@ -18,7 +18,10 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
+import edu.uci.ics.asterix.common.config.AsterixPropertiesAccessor;
+import edu.uci.ics.asterix.common.config.AsterixTransactionProperties;
import edu.uci.ics.asterix.common.exceptions.ACIDException;
+import edu.uci.ics.asterix.common.exceptions.AsterixException;
import edu.uci.ics.asterix.common.transactions.DatasetId;
import edu.uci.ics.asterix.common.transactions.ILockManager;
import edu.uci.ics.asterix.common.transactions.ILogManager;
@@ -48,8 +51,9 @@
transactions = new Transaction[workload.numActiveThreads];
}
- public void beginWorkload() throws ACIDException {
- provider = new TransactionSubsystem("nc1", null);
+ public void beginWorkload() throws ACIDException, AsterixException {
+ provider = new TransactionSubsystem("nc1", null, new AsterixTransactionProperties(
+ new AsterixPropertiesAccessor()));
logManager = provider.getLogManager();
lockManager = provider.getLockManager();
provider.getTransactionalResourceRepository().registerTransactionalResourceManager(DummyResourceMgr.id,
@@ -83,7 +87,7 @@
System.out.println(" Avg Content Creation time :" + BasicLogger.getAverageContentCreationTime());
}
- public static void main(String args[]) {
+ public static void main(String args[]) throws AsterixException {
WorkloadProperties workload = new WorkloadProperties();
TransactionWorkloadSimulator simulator = new TransactionWorkloadSimulator(workload);
try {
@@ -177,12 +181,12 @@
byte logActionType = LogActionType.REDO_UNDO;
long pageId = 0;
if (!retry) {
- lockMode = (byte)(random.nextInt(2));
+ lockMode = (byte) (random.nextInt(2));
}
tempDatasetId.setId(resourceID);
TransactionWorkloadSimulator.lockManager.lock(tempDatasetId, -1, lockMode, context);
- TransactionWorkloadSimulator.logManager.log(logType, context, resourceID,
- -1, resourceID, ResourceType.LSM_BTREE, logSize, null, logger, memLSN);
+ TransactionWorkloadSimulator.logManager.log(logType, context, resourceID, -1, resourceID,
+ ResourceType.LSM_BTREE, logSize, null, logger, memLSN);
retry = false;
Thread.currentThread().sleep(TransactionWorkloadSimulator.workload.thinkTime);
logCount.incrementAndGet();
diff --git a/asterix-transactions/src/test/java/edu/uci/ics/asterix/transaction/management/test/TransactionSimulator.java b/asterix-transactions/src/test/java/edu/uci/ics/asterix/transaction/management/test/TransactionSimulator.java
index 66e9bdf..c322b31 100644
--- a/asterix-transactions/src/test/java/edu/uci/ics/asterix/transaction/management/test/TransactionSimulator.java
+++ b/asterix-transactions/src/test/java/edu/uci/ics/asterix/transaction/management/test/TransactionSimulator.java
@@ -17,7 +17,10 @@
import java.io.IOException;
import java.util.Random;
+import edu.uci.ics.asterix.common.config.AsterixPropertiesAccessor;
+import edu.uci.ics.asterix.common.config.AsterixTransactionProperties;
import edu.uci.ics.asterix.common.exceptions.ACIDException;
+import edu.uci.ics.asterix.common.exceptions.AsterixException;
import edu.uci.ics.asterix.common.transactions.DatasetId;
import edu.uci.ics.asterix.common.transactions.ILockManager;
import edu.uci.ics.asterix.common.transactions.ILogManager;
@@ -47,9 +50,10 @@
private LogicalLogLocator memLSN;
private TransactionSubsystem transactionProvider;
- public TransactionSimulator(IResource resource, IResourceManager resourceMgr) throws ACIDException {
+ public TransactionSimulator(IResource resource, IResourceManager resourceMgr) throws ACIDException, AsterixException {
String id = "nc1";
- transactionProvider = new TransactionSubsystem(id, null);
+ transactionProvider = new TransactionSubsystem(id, null, new AsterixTransactionProperties(
+ new AsterixPropertiesAccessor()));
transactionManager = transactionProvider.getTransactionManager();
logManager = transactionProvider.getLogManager();
lockManager = transactionProvider.getLockManager();
@@ -102,7 +106,7 @@
/**
* @param args
*/
- public static void main(String[] args) throws IOException, ACIDException {
+ public static void main(String[] args) throws IOException, ACIDException, AsterixException {
String fileDir = "testdata";
String fileName = "counterFile";
IResource resource = new FileResource(fileDir, fileName);