Merge branch 'master' into raman/master_ports
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/APIFramework.java b/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/APIFramework.java
index d72ba6a..d5d7409 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/APIFramework.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/APIFramework.java
@@ -136,12 +136,12 @@
}
@Override
- public IOptimizationContext createOptimizationContext(int varCounter, int frameSize,
+ public IOptimizationContext createOptimizationContext(int varCounter,
IExpressionEvalSizeComputer expressionEvalSizeComputer,
IMergeAggregationExpressionFactory mergeAggregationExpressionFactory,
IExpressionTypeComputer expressionTypeComputer, INullableTypeComputer nullableTypeComputer,
PhysicalOptimizationConfig physicalOptimizationConfig) {
- return new AlgebricksOptimizationContext(varCounter, frameSize, expressionEvalSizeComputer,
+ return new AlgebricksOptimizationContext(varCounter, expressionEvalSizeComputer,
mergeAggregationExpressionFactory, expressionTypeComputer, nullableTypeComputer,
physicalOptimizationConfig);
}
@@ -260,22 +260,26 @@
AsterixCompilerProperties compilerProperties = AsterixAppContextInfo.getInstance().getCompilerProperties();
int frameSize = compilerProperties.getFrameSize();
+ int sortFrameLimit = (int) (compilerProperties.getSortMemorySize() / frameSize);
+ int joinFrameLimit = (int) (compilerProperties.getJoinMemorySize() / frameSize);
+ OptimizationConfUtil.getPhysicalOptimizationConfig().setFrameSize(frameSize);
+ OptimizationConfUtil.getPhysicalOptimizationConfig().setMaxFramesExternalSort(sortFrameLimit);
+ OptimizationConfUtil.getPhysicalOptimizationConfig().setMaxFramesHybridHash(joinFrameLimit);
+
HeuristicCompilerFactoryBuilder builder = new HeuristicCompilerFactoryBuilder(
AqlOptimizationContextFactory.INSTANCE);
+ builder.setPhysicalOptimizationConfig(OptimizationConfUtil.getPhysicalOptimizationConfig());
builder.setLogicalRewrites(buildDefaultLogicalRewrites());
builder.setPhysicalRewrites(buildDefaultPhysicalRewrites());
IDataFormat format = queryMetadataProvider.getFormat();
ICompilerFactory compilerFactory = builder.create();
- builder.setFrameSize(frameSize);
builder.setExpressionEvalSizeComputer(format.getExpressionEvalSizeComputer());
builder.setIMergeAggregationExpressionFactory(new AqlMergeAggregationExpressionFactory());
builder.setPartialAggregationTypeComputer(new AqlPartialAggregationTypeComputer());
builder.setExpressionTypeComputer(AqlExpressionTypeComputer.INSTANCE);
builder.setNullableTypeComputer(AqlNullableTypeComputer.INSTANCE);
- OptimizationConfUtil.getPhysicalOptimizationConfig().setFrameSize(frameSize);
- builder.setPhysicalOptimizationConfig(OptimizationConfUtil.getPhysicalOptimizationConfig());
ICompiler compiler = compilerFactory.createCompiler(plan, queryMetadataProvider, t.getVarCounter());
if (pc.isOptimize()) {
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/AsterixAppRuntimeContext.java b/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/AsterixAppRuntimeContext.java
index e64f68f..4b3087d 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/AsterixAppRuntimeContext.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/AsterixAppRuntimeContext.java
@@ -27,6 +27,8 @@
import edu.uci.ics.hyracks.api.application.INCApplicationContext;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.io.IIOManager;
+import edu.uci.ics.hyracks.api.lifecycle.ILifeCycleComponent;
+import edu.uci.ics.hyracks.api.lifecycle.LifeCycleComponentManager;
import edu.uci.ics.hyracks.storage.am.common.api.IIndex;
import edu.uci.ics.hyracks.storage.am.common.api.IIndexLifecycleManager;
import edu.uci.ics.hyracks.storage.am.common.dataflow.IndexLifecycleManager;
@@ -99,10 +101,9 @@
ioManager = ncApplicationContext.getRootContext().getIOManager();
bufferCache = new BufferCache(ioManager, allocator, prs, pcp, fileMapManager,
storageProperties.getBufferCachePageSize(), storageProperties.getBufferCacheNumPages(),
- storageProperties.getBufferCacheMaxOpenFiles());
+ storageProperties.getBufferCacheMaxOpenFiles(), ncApplicationContext.getThreadFactory());
indexLifecycleManager = new IndexLifecycleManager(storageProperties.getMemoryComponentGlobalBudget());
-
lsmIOScheduler = SynchronousScheduler.INSTANCE;
mergePolicy = new ConstantMergePolicy(storageProperties.getLSMIndexMergeThreshold(), this);
lsmBTreeOpTrackerFactory = new IndexOperationTrackerFactory(LSMBTreeIOOperationCallbackFactory.INSTANCE);
@@ -118,8 +119,16 @@
IAsterixAppRuntimeContextProvider asterixAppRuntimeContextProvider = new AsterixAppRuntimeContextProviderForRecovery(
this);
- txnSubsystem = new TransactionSubsystem(ncApplicationContext.getNodeId(), asterixAppRuntimeContextProvider);
+ txnSubsystem = new TransactionSubsystem(ncApplicationContext.getNodeId(), asterixAppRuntimeContextProvider, txnProperties);
isShuttingdown = false;
+
+ // The order of registration is important. The buffer cache must be registered before the recovery and transaction managers.
+ LifeCycleComponentManager.INSTANCE.register((ILifeCycleComponent) bufferCache);
+ LifeCycleComponentManager.INSTANCE.register((ILifeCycleComponent) indexLifecycleManager);
+ LifeCycleComponentManager.INSTANCE.register((ILifeCycleComponent) txnSubsystem.getTransactionManager());
+ LifeCycleComponentManager.INSTANCE.register((ILifeCycleComponent) txnSubsystem.getLogManager());
+ LifeCycleComponentManager.INSTANCE.register((ILifeCycleComponent) txnSubsystem.getLockManager());
+ LifeCycleComponentManager.INSTANCE.register((ILifeCycleComponent) txnSubsystem.getRecoveryManager());
}
public boolean isShuttingdown() {
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
index 5d4f6ec..9b83f00 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
@@ -13,6 +13,7 @@
import edu.uci.ics.asterix.api.http.servlet.QueryResultAPIServlet;
import edu.uci.ics.asterix.api.http.servlet.QueryStatusAPIServlet;
import edu.uci.ics.asterix.api.http.servlet.UpdateAPIServlet;
+import edu.uci.ics.asterix.common.api.AsterixThreadFactory;
import edu.uci.ics.asterix.common.config.AsterixExternalProperties;
import edu.uci.ics.asterix.common.config.AsterixMetadataProperties;
import edu.uci.ics.asterix.metadata.MetadataManager;
@@ -37,12 +38,12 @@
@Override
public void start(ICCApplicationContext ccAppCtx, String[] args) throws Exception {
this.appCtx = ccAppCtx;
+
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("Starting Asterix cluster controller");
}
-
+ appCtx.setThreadFactory(AsterixThreadFactory.INSTANCE);
AsterixAppContextInfo.initialize(appCtx);
-
proxy = AsterixStateProxy.registerRemoteObject();
appCtx.setDistributedState(proxy);
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
index 4569088..32bfeb5 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
@@ -1,11 +1,15 @@
package edu.uci.ics.asterix.hyracks.bootstrap;
+import java.io.File;
import java.rmi.RemoteException;
import java.rmi.server.UnicastRemoteObject;
+import java.util.HashMap;
+import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;
import edu.uci.ics.asterix.api.common.AsterixAppRuntimeContext;
+import edu.uci.ics.asterix.common.api.AsterixThreadFactory;
import edu.uci.ics.asterix.common.api.IAsterixAppRuntimeContext;
import edu.uci.ics.asterix.common.config.AsterixMetadataProperties;
import edu.uci.ics.asterix.common.config.IAsterixPropertiesProvider;
@@ -19,6 +23,7 @@
import edu.uci.ics.asterix.transaction.management.resource.PersistentLocalResourceRepository;
import edu.uci.ics.hyracks.api.application.INCApplicationContext;
import edu.uci.ics.hyracks.api.application.INCApplicationEntryPoint;
+import edu.uci.ics.hyracks.api.lifecycle.LifeCycleComponentManager;
public class NCApplicationEntryPoint implements INCApplicationEntryPoint {
private static final Logger LOGGER = Logger.getLogger(NCApplicationEntryPoint.class.getName());
@@ -32,17 +37,19 @@
@Override
public void start(INCApplicationContext ncAppCtx, String[] args) throws Exception {
+ ncAppCtx.setThreadFactory(AsterixThreadFactory.INSTANCE);
ncApplicationContext = ncAppCtx;
nodeId = ncApplicationContext.getNodeId();
if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Starting Asterix node controller: " + nodeId);
+ LOGGER.info("Starting Asterix node controller TAKE NOTE: " + nodeId);
}
+ JVMShutdownHook sHook = new JVMShutdownHook(this);
+ Runtime.getRuntime().addShutdownHook(sHook);
+
runtimeContext = new AsterixAppRuntimeContext(ncApplicationContext);
runtimeContext.initialize();
ncApplicationContext.setApplicationObject(runtimeContext);
- JVMShutdownHook sHook = new JVMShutdownHook(this);
- Runtime.getRuntime().addShutdownHook(sHook);
// #. recover if the system is corrupted by checking system state.
IRecoveryManager recoveryMgr = runtimeContext.getTransactionSubsystem().getRecoveryManager();
@@ -78,6 +85,8 @@
if (isMetadataNode) {
MetadataBootstrap.stopUniverse();
}
+
+ LifeCycleComponentManager.INSTANCE.stopAll(false);
runtimeContext.deinitialize();
} else {
if (LOGGER.isLoggable(Level.INFO)) {
@@ -89,7 +98,8 @@
@Override
public void notifyStartupComplete() throws Exception {
IAsterixStateProxy proxy = (IAsterixStateProxy) ncApplicationContext.getDistributedState();
- AsterixMetadataProperties metadataProperties = ((IAsterixPropertiesProvider)runtimeContext).getMetadataProperties();
+ AsterixMetadataProperties metadataProperties = ((IAsterixPropertiesProvider) runtimeContext)
+ .getMetadataProperties();
if (systemState == SystemState.NEW_UNIVERSE) {
if (LOGGER.isLoggable(Level.INFO)) {
@@ -113,11 +123,29 @@
}
MetadataManager.INSTANCE = new MetadataManager(proxy, metadataProperties);
MetadataManager.INSTANCE.init();
- MetadataBootstrap.startUniverse( ((IAsterixPropertiesProvider)runtimeContext), ncApplicationContext,
+ MetadataBootstrap.startUniverse(((IAsterixPropertiesProvider) runtimeContext), ncApplicationContext,
systemState == SystemState.NEW_UNIVERSE);
MetadataBootstrap.startDDLRecovery();
}
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Starting lifecycle components");
+ }
+
+ Map<String, String> lifecycleMgmtConfiguration = new HashMap<String, String>();
+ String key = LifeCycleComponentManager.Config.DUMP_PATH_KEY;
+ String value = metadataProperties.getCoredumpPath(nodeId);
+ lifecycleMgmtConfiguration.put(key, value);
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Coredump directory for NC is: " + value);
+ }
+ LifeCycleComponentManager.INSTANCE.configure(lifecycleMgmtConfiguration);
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Configured:" + LifeCycleComponentManager.INSTANCE);
+ }
+
+ LifeCycleComponentManager.INSTANCE.startAll();
+
IRecoveryManager recoveryMgr = runtimeContext.getTransactionSubsystem().getRecoveryManager();
recoveryMgr.checkpoint(true);
diff --git a/asterix-app/src/main/resources/asterix-build-configuration.xml b/asterix-app/src/main/resources/asterix-build-configuration.xml
index 3e1b4b2..dc77a96 100644
--- a/asterix-app/src/main/resources/asterix-build-configuration.xml
+++ b/asterix-app/src/main/resources/asterix-build-configuration.xml
@@ -13,4 +13,9 @@
<value>WARNING</value>
<description>Log level for running tests/build</description>
</property>
+ <property>
+ <name>txn.log.groupcommitinterval</name>
+ <value>1</value>
+ <description>The group commit wait time in milliseconds.</description>
+ </property>
</asterixConfiguration>
diff --git a/asterix-app/src/main/resources/log.properties b/asterix-app/src/main/resources/log.properties
deleted file mode 100644
index ee8040a..0000000
--- a/asterix-app/src/main/resources/log.properties
+++ /dev/null
@@ -1 +0,0 @@
-group_commit_wait_period=1
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/hadoop/conf/mapred-site.xml b/asterix-app/src/test/resources/hadoop/conf/mapred-site.xml
index 1b9a4d6..b39fced 100644
--- a/asterix-app/src/test/resources/hadoop/conf/mapred-site.xml
+++ b/asterix-app/src/test/resources/hadoop/conf/mapred-site.xml
@@ -5,21 +5,21 @@
<configuration>
- <property>
- <name>mapred.job.tracker</name>
- <value>localhost:29007</value>
- </property>
- <property>
- <name>mapred.tasktracker.map.tasks.maximum</name>
- <value>20</value>
- </property>
- <property>
- <name>mapred.tasktracker.reduce.tasks.maximum</name>
- <value>20</value>
- </property>
- <property>
- <name>mapred.min.split.size</name>
- <value>65536</value>
- </property>
+ <property>
+ <name>mapred.job.tracker</name>
+ <value>localhost:29007</value>
+ </property>
+ <property>
+ <name>mapred.tasktracker.map.tasks.maximum</name>
+ <value>20</value>
+ </property>
+ <property>
+ <name>mapred.tasktracker.reduce.tasks.maximum</name>
+ <value>20</value>
+ </property>
+ <property>
+ <name>mapred.max.split.size</name>
+ <value>128</value>
+ </property>
</configuration>
diff --git a/asterix-app/src/test/resources/runtimets/queries/hdfs/issue_245_hdfs/issue_245_hdfs.3.query.aql b/asterix-app/src/test/resources/runtimets/queries/hdfs/issue_245_hdfs/issue_245_hdfs.3.query.aql
index 653ee6c..527a0e5 100644
--- a/asterix-app/src/test/resources/runtimets/queries/hdfs/issue_245_hdfs/issue_245_hdfs.3.query.aql
+++ b/asterix-app/src/test/resources/runtimets/queries/hdfs/issue_245_hdfs/issue_245_hdfs.3.query.aql
@@ -8,4 +8,5 @@
use dataverse test;
for $x in dataset('TextDataset')
+order by $x.line
return $x
diff --git a/asterix-app/src/test/resources/runtimets/results/hdfs/issue_245_hdfs/issue_245_hdfs.1.adm b/asterix-app/src/test/resources/runtimets/results/hdfs/issue_245_hdfs/issue_245_hdfs.1.adm
index 8af2f5f..59425b1 100644
--- a/asterix-app/src/test/resources/runtimets/results/hdfs/issue_245_hdfs/issue_245_hdfs.1.adm
+++ b/asterix-app/src/test/resources/runtimets/results/hdfs/issue_245_hdfs/issue_245_hdfs.1.adm
@@ -1,4 +1,4 @@
+{ "line": "ASTERIX is taking an open stance on data formats and addressing research issues including highly scalable data storage and indexing, semi-structured query processing on very large clusters, and merging parallel database techniques with todays data-intensive computing techniques to support performant yet declarative solutions to the problem of analyzing semi-structured information" }
+{ "line": "ASTERIX targets a wide range of semi-structured information, ranging from data use cases where information is well-tagged and highly regular to content use cases where data is irregular and much of each datum is textual" }
{ "line": "The ASTERIX project is developing new technologies for ingesting, storing, managing, indexing, querying, analyzing, and subscribing to vast quantities of semi-structured information" }
{ "line": "The project is combining ideas from three distinct areas semi-structured data, parallel databases, and data-intensive computing to create a next-generation, open source software platform that scales by running on large, shared-nothing commodity computing clusters" }
-{ "line": "ASTERIX targets a wide range of semi-structured information, ranging from data use cases where information is well-tagged and highly regular to content use cases where data is irregular and much of each datum is textual" }
-{ "line": "ASTERIX is taking an open stance on data formats and addressing research issues including highly scalable data storage and indexing, semi-structured query processing on very large clusters, and merging parallel database techniques with todays data-intensive computing techniques to support performant yet declarative solutions to the problem of analyzing semi-structured information" }
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/api/AsterixThreadExecutor.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/api/AsterixThreadExecutor.java
new file mode 100644
index 0000000..14975ff
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/api/AsterixThreadExecutor.java
@@ -0,0 +1,32 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.common.api;
+
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+
+public class AsterixThreadExecutor implements Executor {
+ public final static AsterixThreadExecutor INSTANCE = new AsterixThreadExecutor();
+ private final Executor executor = Executors.newCachedThreadPool(AsterixThreadFactory.INSTANCE);
+
+ private AsterixThreadExecutor() {
+
+ }
+
+ @Override
+ public void execute(Runnable command) {
+ executor.execute(command);
+ }
+}
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/api/AsterixThreadFactory.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/api/AsterixThreadFactory.java
new file mode 100644
index 0000000..7e4735f
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/api/AsterixThreadFactory.java
@@ -0,0 +1,41 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.common.api;
+
+import java.util.concurrent.ThreadFactory;
+
+import edu.uci.ics.hyracks.api.lifecycle.LifeCycleComponentManager;
+
+public class AsterixThreadFactory implements ThreadFactory {
+
+ public final static AsterixThreadFactory INSTANCE = new AsterixThreadFactory();
+
+ private AsterixThreadFactory() {
+
+ }
+
+ @Override
+ public Thread newThread(Runnable r) {
+ Thread t;
+ if ((r instanceof Thread)) {
+ t = (Thread) r;
+ } else {
+ t = new Thread(r);
+ }
+ t.setUncaughtExceptionHandler(LifeCycleComponentManager.INSTANCE);
+ return t;
+ }
+
+}
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/config/AsterixMetadataProperties.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/config/AsterixMetadataProperties.java
index 6d47e78..6b6cded 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/config/AsterixMetadataProperties.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/config/AsterixMetadataProperties.java
@@ -24,5 +24,9 @@
public Set<String> getNodeNames() {
return accessor.getNodeNames();
}
+
+ public String getCoredumpPath(String nodeId){
+ return accessor.getCoredumpPath(nodeId);
+ }
}
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/config/AsterixPropertiesAccessor.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/config/AsterixPropertiesAccessor.java
index 7b2f2a6..d623ae5 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/config/AsterixPropertiesAccessor.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/config/AsterixPropertiesAccessor.java
@@ -16,6 +16,7 @@
import javax.xml.bind.Unmarshaller;
import edu.uci.ics.asterix.common.configuration.AsterixConfiguration;
+import edu.uci.ics.asterix.common.configuration.Coredump;
import edu.uci.ics.asterix.common.configuration.Property;
import edu.uci.ics.asterix.common.configuration.Store;
import edu.uci.ics.asterix.common.exceptions.AsterixException;
@@ -26,6 +27,7 @@
private final String metadataNodeName;
private final Set<String> nodeNames;
private final Map<String, String[]> stores;
+ private final Map<String, String> coredumpConfig;
private final Map<String, Property> asterixConfigurationParams;
public AsterixPropertiesAccessor() throws AsterixException {
@@ -64,6 +66,11 @@
for (Property p : asterixConfiguration.getProperty()) {
asterixConfigurationParams.put(p.getName(), p);
}
+ coredumpConfig = new HashMap<String, String>();
+ for (Coredump cd : asterixConfiguration.getCoredump()) {
+ coredumpConfig.put(cd.getNcId(), cd.getCoredumpPath());
+ }
+
}
public String getMetadataNodeName() {
@@ -82,6 +89,10 @@
return nodeNames;
}
+ public String getCoredumpPath(String nodeId) {
+ return coredumpConfig.get(nodeId);
+ }
+
public <T> T getProperty(String property, T defaultValue, IPropertyInterpreter<T> interpreter) {
Property p = asterixConfigurationParams.get(property);
if (p == null) {
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/config/AsterixStorageProperties.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/config/AsterixStorageProperties.java
index 91bdca6..870aae4 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/config/AsterixStorageProperties.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/config/AsterixStorageProperties.java
@@ -5,8 +5,8 @@
private static final String STORAGE_BUFFERCACHE_PAGESIZE_KEY = "storage.buffercache.pagesize";
private static int STORAGE_BUFFERCACHE_PAGESIZE_DEFAULT = (32 << 10); // 32KB
- private static final String STORAGE_BUFFERCACHE_NUMPAGES_KEY = "storage.buffercache.numpages";
- private static final int STORAGE_BUFFERCACHE_NUMPAGES_DEFAULT = 1024;
+ private static final String STORAGE_BUFFERCACHE_SIZE_KEY = "storage.buffercache.size";
+ private static final long STORAGE_BUFFERCACHE_SIZE_DEFAULT = (32 << 20); // 32 MB
private static final String STORAGE_BUFFERCACHE_MAXOPENFILES_KEY = "storage.buffercache.maxopenfiles";
private static int STORAGE_BUFFERCACHE_MAXOPENFILES_DEFAULT = Integer.MAX_VALUE;
@@ -15,7 +15,7 @@
private static final int STORAGE_MEMORYCOMPONENT_PAGESIZE_DEFAULT = (32 << 10); // 32KB
private static final String STORAGE_MEMORYCOMPONENT_NUMPAGES_KEY = "storage.memorycomponent.numpages";
- private static final int STORAGE_MEMORYCOMPONENT_NUMPAGES_DEFAULT = 2048; // ... so 64MB components
+ private static final int STORAGE_MEMORYCOMPONENT_NUMPAGES_DEFAULT = 1024; // ... so 32MB components
private static final String STORAGE_MEMORYCOMPONENT_GLOBALBUDGET_KEY = "storage.memorycomponent.globalbudget";
private static final long STORAGE_MEMORYCOMPONENT_GLOBALBUDGET_DEFAULT = (1 << 30); // 1GB
@@ -35,9 +35,13 @@
PropertyInterpreters.getIntegerPropertyInterpreter());
}
+ public long getBufferCacheSize() {
+ return accessor.getProperty(STORAGE_BUFFERCACHE_SIZE_KEY, STORAGE_BUFFERCACHE_SIZE_DEFAULT,
+ PropertyInterpreters.getLongPropertyInterpreter());
+ }
+
public int getBufferCacheNumPages() {
- return accessor.getProperty(STORAGE_BUFFERCACHE_NUMPAGES_KEY, STORAGE_BUFFERCACHE_NUMPAGES_DEFAULT,
- PropertyInterpreters.getIntegerPropertyInterpreter());
+ return (int) (getBufferCacheSize() / getBufferCachePageSize());
}
public int getBufferCacheMaxOpenFiles() {
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/config/AsterixTransactionProperties.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/config/AsterixTransactionProperties.java
index d97e53b..0b6ea85 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/config/AsterixTransactionProperties.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/config/AsterixTransactionProperties.java
@@ -1,6 +1,9 @@
package edu.uci.ics.asterix.common.config;
public class AsterixTransactionProperties extends AbstractAsterixProperties {
+ private static final String TXN_LOG_DIRECTORY_KEY = "txn.log.directory";
+ private static final String TXN_LOG_DIRECTORY_DEFAULT = "asterix_logs/";
+
private static final String TXN_LOG_BUFFER_NUMPAGES_KEY = "txn.log.buffer.numpages";
private static int TXN_LOG_BUFFER_NUMPAGES_DEFAULT = 8;
@@ -9,25 +12,46 @@
private static final String TXN_LOG_PARTITIONSIZE_KEY = "txn.log.partitionsize";
private static final long TXN_LOG_PARTITIONSIZE_DEFAULT = (2 << 30); // 2GB
+
+ private static final String TXN_LOG_DISKSECTORSIZE_KEY = "txn.log.disksectorsize";
+ private static final int TXN_LOG_DISKSECTORSIZE_DEFAULT = 4096;
private static final String TXN_LOG_GROUPCOMMITINTERVAL_KEY = "txn.log.groupcommitinterval";
- private static int TXN_LOG_GROUPCOMMITINTERVAL_DEFAULT = 200; // 200ms
+ private static int TXN_LOG_GROUPCOMMITINTERVAL_DEFAULT = 10; // 10ms
private static final String TXN_LOG_CHECKPOINT_LSNTHRESHOLD_KEY = "txn.log.checkpoint.lsnthreshold";
private static final int TXN_LOG_CHECKPOINT_LSNTHRESHOLD_DEFAULT = (64 << 20); // 64M
private static final String TXN_LOG_CHECKPOINT_POLLFREQUENCY_KEY = "txn.log.checkpoint.pollfrequency";
private static int TXN_LOG_CHECKPOINT_POLLFREQUENCY_DEFAULT = 120; // 120s
+
+ private static final String TXN_LOG_CHECKPOINT_HISTORY_KEY = "txn.log.checkpoint.history";
+ private static int TXN_LOG_CHECKPOINT_HISTORY_DEFAULT = 0;
private static final String TXN_LOCK_ESCALATIONTHRESHOLD_KEY = "txn.lock.escalationthreshold";
private static int TXN_LOCK_ESCALATIONTHRESHOLD_DEFAULT = 1000;
private static final String TXN_LOCK_SHRINKTIMER_KEY = "txn.lock.shrinktimer";
- private static int TXN_LOCK_SHRINKTIMER_DEFAULT = 120000; // 2m
+ private static int TXN_LOCK_SHRINKTIMER_DEFAULT = 5000; // 5s
+
+ private static final String TXN_LOCK_TIMEOUT_WAITTHRESHOLD_KEY = "txn.lock.timeout.waitthreshold";
+ private static final int TXN_LOCK_TIMEOUT_WAITTHRESHOLD_DEFAULT = 60000; // 60s
+
+ private static final String TXN_LOCK_TIMEOUT_SWEEPTHRESHOLD_KEY = "txn.lock.timeout.sweepthreshold";
+ private static final int TXN_LOCK_TIMEOUT_SWEEPTHRESHOLD_DEFAULT = 10000; // 10s
public AsterixTransactionProperties(AsterixPropertiesAccessor accessor) {
super(accessor);
}
+
+ public String getLogDirectory() {
+ String logDirectory = accessor.getProperty(TXN_LOG_DIRECTORY_KEY, TXN_LOG_DIRECTORY_DEFAULT,
+ PropertyInterpreters.getStringPropertyInterpreter());
+ if (!logDirectory.endsWith("/")) {
+ logDirectory += "/";
+ }
+ return logDirectory;
+ }
public int getLogBufferNumPages() {
return accessor.getProperty(TXN_LOG_BUFFER_NUMPAGES_KEY, TXN_LOG_BUFFER_NUMPAGES_DEFAULT,
@@ -43,6 +67,11 @@
return accessor.getProperty(TXN_LOG_PARTITIONSIZE_KEY, TXN_LOG_PARTITIONSIZE_DEFAULT,
PropertyInterpreters.getLongPropertyInterpreter());
}
+
+ public int getLogDiskSectorSize() {
+ return accessor.getProperty(TXN_LOG_DISKSECTORSIZE_KEY, TXN_LOG_DISKSECTORSIZE_DEFAULT,
+ PropertyInterpreters.getIntegerPropertyInterpreter());
+ }
public int getGroupCommitInterval() {
return accessor.getProperty(TXN_LOG_GROUPCOMMITINTERVAL_KEY, TXN_LOG_GROUPCOMMITINTERVAL_DEFAULT,
@@ -59,6 +88,11 @@
PropertyInterpreters.getIntegerPropertyInterpreter());
}
+ public int getCheckpointHistory() {
+ return accessor.getProperty(TXN_LOG_CHECKPOINT_HISTORY_KEY, TXN_LOG_CHECKPOINT_HISTORY_DEFAULT,
+ PropertyInterpreters.getIntegerPropertyInterpreter());
+ }
+
public int getEntityToDatasetLockEscalationThreshold() {
return accessor.getProperty(TXN_LOCK_ESCALATIONTHRESHOLD_KEY, TXN_LOCK_ESCALATIONTHRESHOLD_DEFAULT,
PropertyInterpreters.getIntegerPropertyInterpreter());
@@ -68,5 +102,15 @@
return accessor.getProperty(TXN_LOCK_SHRINKTIMER_KEY, TXN_LOCK_SHRINKTIMER_DEFAULT,
PropertyInterpreters.getIntegerPropertyInterpreter());
}
+
+ public int getTimeoutWaitThreshold() {
+ return accessor.getProperty(TXN_LOCK_TIMEOUT_WAITTHRESHOLD_KEY, TXN_LOCK_TIMEOUT_WAITTHRESHOLD_DEFAULT,
+ PropertyInterpreters.getIntegerPropertyInterpreter());
+ }
+
+ public int getTimeoutSweepThreshold() {
+ return accessor.getProperty(TXN_LOCK_TIMEOUT_SWEEPTHRESHOLD_KEY, TXN_LOCK_TIMEOUT_SWEEPTHRESHOLD_DEFAULT,
+ PropertyInterpreters.getIntegerPropertyInterpreter());
+ }
}
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ILogManager.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ILogManager.java
index 9470d17..c90422c 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ILogManager.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ILogManager.java
@@ -50,12 +50,10 @@
ACIDException;
/**
- * @param logicalLogLocator
+ * @param lsnValue
* TODO
* @param logicalLogLocator
* TODO
- * @param PhysicalLogLocator
- * specifies the location of the log record to be read
* @throws ACIDException
*/
public void readLog(long lsnValue, LogicalLogLocator logicalLogLocator) throws ACIDException;
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ITransactionContext.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ITransactionContext.java
index a06cc75..a7f2984 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ITransactionContext.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ITransactionContext.java
@@ -1,7 +1,6 @@
package edu.uci.ics.asterix.common.transactions;
import edu.uci.ics.asterix.common.exceptions.ACIDException;
-import edu.uci.ics.asterix.common.transactions.ITransactionContext.TransactionType;
import edu.uci.ics.asterix.common.transactions.ITransactionManager.TransactionState;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndex;
@@ -42,6 +41,8 @@
public void setTransactionType(TransactionType transactionType);
+ public String prettyPrint();
+
public static final long INVALID_TIME = -1l; // used for showing a
// transaction is not waiting.
public static final int ACTIVE_STATUS = 0;
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ITransactionManager.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ITransactionManager.java
index 2bfc00d..e57cc64 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ITransactionManager.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ITransactionManager.java
@@ -120,5 +120,4 @@
*/
public ITransactionSubsystem getTransactionProvider();
-
}
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/LogManagerProperties.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/LogManagerProperties.java
index 4dc943c..9387687 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/LogManagerProperties.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/LogManagerProperties.java
@@ -15,28 +15,16 @@
package edu.uci.ics.asterix.common.transactions;
import java.io.Serializable;
-import java.util.Properties;
+
+import edu.uci.ics.asterix.common.config.AsterixTransactionProperties;
public class LogManagerProperties implements Serializable {
private static final long serialVersionUID = 2084227360840799662L;
public static final int LOG_MAGIC_NUMBER = 123456789;
- public static final String LOG_DIR_SUFFIX_KEY = ".txnLogDir";
- public static final String LOG_PAGE_SIZE_KEY = "log_page_size";
- public static final String LOG_PARTITION_SIZE_KEY = "log_partition_size";
- public static final String NUM_LOG_PAGES_KEY = "num_log_pages";
- public static final String LOG_FILE_PREFIX_KEY = "log_file_prefix";
- public static final String GROUP_COMMIT_WAIT_PERIOD_KEY = "group_commit_wait_period";
- public static final String DISK_SECTOR_SIZE_KEY = "disk_sector_size";
-
- private static final int DEFAULT_LOG_PAGE_SIZE = 128 * 1024; //128KB
- private static final int DEFAULT_NUM_LOG_PAGES = 8;
- private static final long DEFAULT_LOG_PARTITION_SIZE = (long) 1024 * 1024 * 1024 * 2; //2GB
- private static final long DEFAULT_GROUP_COMMIT_WAIT_PERIOD = 200; // time in millisec.
+ public static final String LOG_DIR_SUFFIX = ".txnLogDir";
private static final String DEFAULT_LOG_FILE_PREFIX = "asterix_transaction_log";
- private static final String DEFAULT_LOG_DIRECTORY = "asterix_logs/";
- private static final int DEFAULT_DISK_SECTOR_SIZE = 4096;
// follow the naming convention <logFilePrefix>_<number> where number starts from 0
private final String logFilePrefix;
@@ -56,22 +44,19 @@
// default disk sector size
private final int diskSectorSize;
- public LogManagerProperties(Properties properties, String nodeId) {
- this.logDirKey = new String(nodeId + LOG_DIR_SUFFIX_KEY);
- this.logPageSize = Integer.parseInt(properties.getProperty(LOG_PAGE_SIZE_KEY, "" + DEFAULT_LOG_PAGE_SIZE));
- this.numLogPages = Integer.parseInt(properties.getProperty(NUM_LOG_PAGES_KEY, "" + DEFAULT_NUM_LOG_PAGES));
- long logPartitionSize = Long.parseLong(properties.getProperty(LOG_PARTITION_SIZE_KEY, ""
- + DEFAULT_LOG_PARTITION_SIZE));
- this.logDir = properties.getProperty(logDirKey, DEFAULT_LOG_DIRECTORY + nodeId);
- this.logFilePrefix = properties.getProperty(LOG_FILE_PREFIX_KEY, DEFAULT_LOG_FILE_PREFIX);
- this.groupCommitWaitPeriod = Long.parseLong(properties.getProperty(GROUP_COMMIT_WAIT_PERIOD_KEY, ""
- + DEFAULT_GROUP_COMMIT_WAIT_PERIOD));
+ public LogManagerProperties(AsterixTransactionProperties txnProperties, String nodeId) {
+ this.logDirKey = new String(nodeId + LOG_DIR_SUFFIX);
+ this.logPageSize = txnProperties.getLogBufferPageSize();
+ this.numLogPages = txnProperties.getLogBufferNumPages();
+ long logPartitionSize = txnProperties.getLogPartitionSize();
+ this.logDir = txnProperties.getLogDirectory() + nodeId;
+ this.logFilePrefix = DEFAULT_LOG_FILE_PREFIX;
+ this.groupCommitWaitPeriod = txnProperties.getGroupCommitInterval();
this.logBufferSize = logPageSize * numLogPages;
//make sure that the log partition size is the multiple of log buffer size.
this.logPartitionSize = (logPartitionSize / logBufferSize) * logBufferSize;
- this.diskSectorSize = Integer.parseInt(properties.getProperty(DISK_SECTOR_SIZE_KEY, ""
- + DEFAULT_DISK_SECTOR_SIZE));
+ this.diskSectorSize = txnProperties.getLogDiskSectorSize();
}
public long getLogPartitionSize() {
@@ -105,7 +90,7 @@
public String getLogDirKey() {
return logDirKey;
}
-
+
public int getDiskSectorSize() {
return diskSectorSize;
}
diff --git a/asterix-common/src/main/resources/schema/asterix-conf.xsd b/asterix-common/src/main/resources/schema/asterix-conf.xsd
index f53fb4b..5aefdbd 100644
--- a/asterix-common/src/main/resources/schema/asterix-conf.xsd
+++ b/asterix-common/src/main/resources/schema/asterix-conf.xsd
@@ -7,6 +7,7 @@
<xs:element name="metadataNode" type="xs:string" />
+ <xs:element name="coredumpPath" type="xs:string" />
<xs:element name="storeDirs" type="xs:string" />
<xs:element name="ncId" type="xs:string" />
<xs:element name="name" type="xs:string" />
@@ -23,6 +24,15 @@
</xs:complexType>
</xs:element>
+ <xs:element name="coredump">
+ <xs:complexType>
+ <xs:sequence>
+ <xs:element ref="mg:ncId" />
+ <xs:element ref="mg:coredumpPath" />
+ </xs:sequence>
+ </xs:complexType>
+ </xs:element>
+
<xs:element name="property">
<xs:complexType>
<xs:sequence>
@@ -39,6 +49,7 @@
<xs:sequence>
<xs:element ref="mg:metadataNode" minOccurs="0"/>
<xs:element ref="mg:store" maxOccurs="unbounded" />
+ <xs:element ref="mg:coredump" maxOccurs="unbounded" />
<xs:element ref="mg:property" minOccurs="0" maxOccurs="unbounded" />
</xs:sequence>
</xs:complexType>
diff --git a/asterix-doc/pom.xml b/asterix-doc/pom.xml
index d987e5f..4b8cb2f 100644
--- a/asterix-doc/pom.xml
+++ b/asterix-doc/pom.xml
@@ -1,21 +1,21 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
- <parent>
- <artifactId>asterix</artifactId>
- <groupId>edu.uci.ics.asterix</groupId>
- <version>0.0.6-SNAPSHOT</version>
- </parent>
- <artifactId>asterix-doc</artifactId>
- <build>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-site-plugin</artifactId>
- <version>3.3</version>
- <configuration>
- <generateReports>false</generateReports>
- </configuration>
- </plugin>
- </plugins>
- </build>
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <artifactId>asterix</artifactId>
+ <groupId>edu.uci.ics.asterix</groupId>
+ <version>0.0.6-SNAPSHOT</version>
+ </parent>
+ <artifactId>asterix-doc</artifactId>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-site-plugin</artifactId>
+ <version>3.3</version>
+ <configuration>
+ <generateReports>false</generateReports>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
</project>
diff --git a/asterix-doc/src/site/markdown/AsterixDBRestAPI.md b/asterix-doc/src/site/markdown/api.md
similarity index 100%
rename from asterix-doc/src/site/markdown/AsterixDBRestAPI.md
rename to asterix-doc/src/site/markdown/api.md
diff --git a/asterix-doc/src/site/markdown/AsterixDBDataModel.md b/asterix-doc/src/site/markdown/aql/datamodel.md
similarity index 100%
rename from asterix-doc/src/site/markdown/AsterixDBDataModel.md
rename to asterix-doc/src/site/markdown/aql/datamodel.md
diff --git a/asterix-doc/src/site/markdown/AccessingExternalDataInAsterixDB.md b/asterix-doc/src/site/markdown/aql/externaldata.md
similarity index 98%
rename from asterix-doc/src/site/markdown/AccessingExternalDataInAsterixDB.md
rename to asterix-doc/src/site/markdown/aql/externaldata.md
index 7319094..e603954 100644
--- a/asterix-doc/src/site/markdown/AccessingExternalDataInAsterixDB.md
+++ b/asterix-doc/src/site/markdown/aql/externaldata.md
@@ -10,7 +10,7 @@
As an example we consider the Lineitem dataset from [TPCH schema](http://www.openlinksw.com/dataspace/doc/dav/wiki/Main/VOSTPCHLinkedData/tpch.sql).
-We assume that you have successfully created an ASTERIX instance following the instructions at [Installing Asterix Using Managix](InstallingAsterixUsingManagix.html).
+We assume that you have successfully created an ASTERIX instance following the instructions at [Installing Asterix Using Managix](../install.html).
_For constructing an example, we assume a single machine setup._
Similar to a regular dataset, an external dataset has an associated datatype. We shall first create the datatype associated with each record in Lineitem data.
diff --git a/asterix-doc/src/site/markdown/AsterixDBFunctions.md b/asterix-doc/src/site/markdown/aql/functions.md
similarity index 100%
rename from asterix-doc/src/site/markdown/AsterixDBFunctions.md
rename to asterix-doc/src/site/markdown/aql/functions.md
diff --git a/asterix-doc/src/site/markdown/AsterixQueryLanguageReference.md b/asterix-doc/src/site/markdown/aql/manual.md
similarity index 86%
rename from asterix-doc/src/site/markdown/AsterixQueryLanguageReference.md
rename to asterix-doc/src/site/markdown/aql/manual.md
index eddc157..da51dee 100644
--- a/asterix-doc/src/site/markdown/AsterixQueryLanguageReference.md
+++ b/asterix-doc/src/site/markdown/aql/manual.md
@@ -16,6 +16,10 @@
## 2. Expressions
+ Query ::= Expression
+
+An AQL query can be any legal AQL expression.
+
Expression ::= ( OperatorExpr | IfThenElse | FLWOR | QuantifiedExpression )
AQL is a fully composable expression language.
@@ -28,12 +32,6 @@
or a QuantifiedExpression (which yields a boolean value).
Each will be detailed as we explore the full AQL grammar.
-### Queries
-
- Query ::= Expression
-
-An AQL query can be any legal AQL expression.
-
### Primary Expressions
PrimaryExpr ::= Literal
@@ -226,7 +224,7 @@
AQL has the usual list of suspects, plus one, for comparing pairs of atomic values.
The "plus one" is the last operator listed above, which is the "roughly equal" operator provided for similarity queries.
-(See the separate document on AsterixDB Similarity Queries for more details on similarity matching.)
+(See the separate document on [AsterixDB Similarity Queries](similarity.html) for more details on similarity matching.)
An example comparison expression (which yields the boolean value true) is shown below.
@@ -266,24 +264,24 @@
The heart of AQL is the FLWOR (for-let-where-orderby-return) expression.
The roots of this expression were borrowed from the expression of the same name in XQuery.
A FLWOR expression starts with one or more clauses that establish variable bindings.
-A for clause binds a variable incrementally to each element of its associated expression;
+A `for` clause binds a variable incrementally to each element of its associated expression;
it includes an optional positional variable for counting/numbering the bindings.
-By default no ordering is implied or assumed by a for clause.
-A let clause binds a variable to the collection of elements computed by its associated expression.
+By default no ordering is implied or assumed by a `for` clause.
+A `let` clause binds a variable to the collection of elements computed by its associated expression.
-Following the initial for or let clause(s), a FLWOR expression may contain an arbitrary sequence of other clauses.
-The where clause in a FLWOR expression filters the preceding bindings via a boolean expression, much like a where clause does in a SQL query.
-The order by clause in a FLWOR expression induces an ordering on the data.
-The group by clause, discussed further below, forms groups based on its group by expressions,
+Following the initial `for` or `let` clause(s), a FLWOR expression may contain an arbitrary sequence of other clauses.
+The `where` clause in a FLWOR expression filters the preceding bindings via a boolean expression, much like a `where` clause does in a SQL query.
+The `order by` clause in a FLWOR expression induces an ordering on the data.
+The `group by` clause, discussed further below, forms groups based on its group by expressions,
optionally naming the expressions' values (which together form the grouping key for the expression).
-The with subclause of a group by clause specifies the variable(s) whose values should be grouped based
+The `with` subclause of a `group by` clause specifies the variable(s) whose values should be grouped based
on the grouping key(s); following the grouping clause, only the grouping key(s) and the variables named
in the with subclause remain in scope, and the named grouping variables now contain lists formed from their input values.
-The limit clause caps the number of values returned, optionally starting its result count from a specified offset.
+The `limit` clause caps the number of values returned, optionally starting its result count from a specified offset.
(Web applications can use this feature for doing pagination.)
-The distinct clause is similar to the group-by clause, but it forms no groups; it serves only to eliminate duplicate values.
+The `distinct` clause is similar to the `group by` clause, but it forms no groups; it serves only to eliminate duplicate values.
As indicated by the grammar, the clauses in an AQL query can appear in any order.
-To interpret a query, one can think of data as flowing down through the query from the first clause to the return clause.
+To interpret a query, one can think of data as flowing down through the query from the first clause to the `return` clause.
The following example shows a FLWOR expression that selects and returns one user from the dataset FacebookUsers.
@@ -308,7 +306,7 @@
"message": $message.message
};
-In the next example, a let clause is used to bind a variable to all of a user's FacebookMessages.
+In the next example, a `let` clause is used to bind a variable to all of a user's FacebookMessages.
The query returns one record per user, with result records containing the user's name and the set of all messages by that user.
##### Example
@@ -325,7 +323,7 @@
};
The following example returns all TwitterUsers ordered by their followers count (most followers first) and language.
-Null is treated as being smaller than any other value if nulls are encountered in the ordering key(s).
+When ordering, `null` is treated as being smaller than any other value if `null`s are encountered in the ordering key(s).
##### Example
@@ -333,11 +331,11 @@
order by $user.followers_count desc, $user.lang asc
return $user
-The next example illustrates the use of the group by clause in AQL.
-After the group by clause in the query, only variables that are either in the group by list or in the with list are in scope.
-The variables in the clause's with list will each contain a collection of items following the group by clause;
+The next example illustrates the use of the `group by` clause in AQL.
+After the `group by` clause in the query, only variables that are either in the `group by` list or in the `with` list are in scope.
+The variables in the clause's `with` list will each contain a collection of items following the `group by` clause;
the collected items are the values that the source variable was bound to in the tuples that formed the group.
-Null is handled as a single value for grouping.
+For grouping, `null` is handled as a single value.
##### Example
@@ -350,7 +348,7 @@
"message" : $messages
}
-The use of the limit clause is illustrated in the next example.
+The use of the `limit` clause is illustrated in the next example.
##### Example
@@ -359,11 +357,11 @@
limit 2
return $user
-The final example shows how AQL's distinct by clause works.
-Each variable in scope before the distinct clause is also in scope after the distinct clause.
-This clause works similarly to group by, but for each variable that contains more than
-one value after the distinct by clause, one value is picked nondeterministically.
-(If the variable is in the distinct by list, then its value will be deterministic.)
+The final example shows how AQL's `distinct by` clause works.
+Each variable in scope before the `distinct by` clause is also in scope after the `distinct by` clause.
+This clause works similarly to `group by`, but for each variable that contains more than
+one value after the `distinct by` clause, one value is picked nondeterministically.
+(If the variable is in the `distinct by` list, then its value will be deterministic.)
Nulls are treated as a single value when they occur in a grouping field.
##### Example
@@ -381,8 +379,8 @@
IfThenElse ::= "if" "(" Expression ")" "then" Expression "else" Expression
A conditional expression is useful for choosing between two alternative values based on a
-boolean condition. If its first (if) expression is true, its second (then) expression's
-value is returned, and otherwise its third (else) expression is returned.
+boolean condition. If its first (`if`) expression is true, its second (`then`) expression's
+value is returned, and otherwise its third (`else`) expression is returned.
The following example illustrates the form of a conditional expression.
##### Example
@@ -396,17 +394,17 @@
Quantified expressions are used for expressing existential or universal predicates involving the elements of a collection.
-The following pair of examples, each of which returns true, illustrate the use of a quantified
-expression to test that every (or some) element in the set [1, 2, 3] of integers is less than three.
+The following pair of examples illustrate the use of a quantified expression to test that every (or some) element in the set [1, 2, 3] of integers is less than three.
+The first example yields `false` and the second example yields `true`.
-It is useful to note that if the set were instead the empty set, the first expression would yield true
-("every" value in an empty set satisfies the condition) while the second expression would yield false
+It is useful to note that if the set were instead the empty set, the first expression would yield `true`
+("every" value in an empty set satisfies the condition) while the second expression would yield `false`
(since there isn't "some" value, as there are no values in the set, that satisfies the condition).
##### Examples
- every $x in [ 1, 2, 3] satisfies $x < 3
- some $x in [ 1, 2, 3] satisfies $x < 3
+ every $x in [ 1, 2, 3 ] satisfies $x < 3
+ some $x in [ 1, 2, 3 ] satisfies $x < 3
## 3. Statements
@@ -423,7 +421,8 @@
In addition to expresssions for queries, AQL supports a variety of statements for data
definition and manipulation purposes as well as controlling the context to be used in
-evaluating AQL expressions. This section details the statement side of the AQL language.
+evaluating AQL expressions.
+This section details the statement side of the AQL language.
### Declarations
@@ -437,10 +436,10 @@
##### Example
use dataverse TinySocial;
-
- SetStatement ::= "set" Identifier StringLiteral
-
+
The set statement in AQL is used to control aspects of the expression evalation context for queries.
+
+ SetStatement ::= "set" Identifier StringLiteral
As an example, the following set statements request that Jaccard similarity with a similarity threshold 0.6
be used for set similarity matching when the ~= operator is used in a query expression.
@@ -450,13 +449,13 @@
set simfunction "jaccard";
set simthreshold "0.6f";
- FunctionDeclaration ::= "declare" "function" Identifier ParameterList "{" Expression "}"
- ParameterList ::= "(" ( <VARIABLE> ( "," <VARIABLE> )* )? ")"
-
When writing a complex AQL query, it can sometimes be helpful to define one or more
auxilliary functions that each address a sub-piece of the overall query.
The declare function statement supports the creation of such helper functions.
+ FunctionDeclaration ::= "declare" "function" Identifier ParameterList "{" Expression "}"
+ ParameterList ::= "(" ( <VARIABLE> ( "," <VARIABLE> )* )? ")"
+
The following is a very simple example of a temporary AQL function definition.
##### Example
@@ -487,7 +486,7 @@
To ease the authoring of reusable AQL scripts, its optional IfNotExists clause allows creation
to be requested either unconditionally or only if the the dataverse does not already exist.
If this clause is absent, an error will be returned if the specified dataverse already exists.
-The with format clause is a placeholder for future functionality that can safely be ignored.
+The `with format` clause is a placeholder for future functionality that can safely be ignored.
The following example creates a dataverse named TinySocial.
@@ -509,7 +508,7 @@
The create type statement is used to create a new named ADM datatype.
This type can then be used to create datasets or utilized when defining one or more other ADM datatypes.
-Much more information about the Asterix Data Model (ADM) is available in the data model reference guide to ADM.
+Much more information about the Asterix Data Model (ADM) is available in the [data model reference guide](datamodel.html) to ADM.
A new type can be a record type, a renaming of another type, an ordered list type, or an unordered list type.
A record type can be defined as being either open or closed.
Instances of a closed record type are not permitted to contain fields other than those specified in the create type statement.
@@ -524,12 +523,12 @@
##### Example
create type FacebookUserType as closed {
- id: int32,
- alias: string,
- name: string,
+ id: int32,
+ alias: string,
+ name: string,
user-since: datetime,
friend-ids: {{ int32 }},
- employment: [EmploymentType]
+ employment: [ EmploymentType ]
}
#### Datasets
@@ -557,7 +556,7 @@
External dataset support allows AQL queries to treat external data as though it were stored in AsterixDB,
making it possible to query "legacy" file data (e.g., Hive data) without having to physically import it into AsterixDB.
For an external dataset, an appropriate adaptor must be selected to handle the nature of the desired external data.
-(See the guide to external data for more information on the available adaptors.)
+(See the [guide to external data](externaldata.html) for more information on the available adaptors.)
The following example creates an internal dataset for storing FacefookUserType records.
It specifies that their id field is their primary key.
@@ -566,7 +565,7 @@
create internal dataset FacebookUsers(FacebookUserType) primary key id;
The next example creates an external dataset for storing LineitemType records.
-The choice of the localfs adaptor means that its data will reside in the local filesystem of the cluster nodes.
+The choice of the `localfs` adaptor means that its data will reside in the local filesystem of the cluster nodes.
The create statement provides several parameters used by the localfs adaptor;
e.g., the file format is delimited text with vertical bar being the field delimiter.
@@ -587,9 +586,8 @@
| "ngram" "(" <INTEGER_LITERAL> ")"
The create index statement creates a secondary index on one or more fields of a specified dataset.
-Supported index types include btree for totally ordered datatypes,
-rtree for spatial data,
-and keyword and ngram for textual (string) data.
+Supported index types include `btree` for totally ordered datatypes,
+`rtree` for spatial data, and `keyword` and `ngram` for textual (string) data.
AsterixDB currently requires indexed fields to be part of the named type associated with a dataset.
(Future plans include support for indexing of open fields as well.)
@@ -601,7 +599,9 @@
create index fbAuthorIdx on FacebookMessages(author-id) type btree;
The following example creates an rtree index called fbSenderLocIdx on the sender-location field of the FacebookMessages dataset.
-This index can be useful for accelerating spatial searches involving the sender-loction field.
+This index can be useful for accelerating queries that use the
+[`spatial-intersect` function](functions.html#spatial-intersect) in a predicate involving the
+sender-location field.
##### Example
@@ -649,20 +649,12 @@
drop dataset FacebookUsers if exists;
-##### Example
-
drop index fbSenderLocIndex;
-##### Example
-
drop type FacebookUserType;
-##### Example
-
drop dataverse TinySocial;
-##### Example
-
drop function add;
### Import/Export Statements
@@ -671,7 +663,7 @@
The load statement is used to initially populate a dataset via bulk loading of data from an external file.
An appropriate adaptor must be selected to handle the nature of the desired external data.
-(See the guide to external data for more information on the available adaptors.)
+(See the [guide to external data](externaldata.html) for more information on the available adaptors.)
The following example shows how to bulk load the FacebookUsers dataset from an external file containing
data that has been prepared in ADM format.
@@ -683,6 +675,8 @@
### Modification Statements
+#### Insert
+
InsertStatement ::= "insert" "into" "dataset" QualifiedName Query
The AQL insert statement is used to insert data into a dataset.
@@ -699,7 +693,9 @@
##### Example
- insert into dataset UsersCopy (for $user in dataset FacebookUsers return $user)
+ insert into dataset UsersCopy (for $user in dataset FacebookUsers return $user)
+
+#### Delete
DeleteStatement ::= "delete" Variable "from" "dataset" QualifiedName ( "where" Expression )?
@@ -711,7 +707,7 @@
If the boolean expression for a delete identifies a single object, then the delete statement itself
will be a single, atomic transaction.
If the expression identifies multiple objects, then each object deleted will be handled independently
-as a tranaction.
+as a transaction.
The following example illustrates a single-object deletion.
diff --git a/asterix-doc/src/site/markdown/AdmAql101.md b/asterix-doc/src/site/markdown/aql/primer.md
similarity index 99%
rename from asterix-doc/src/site/markdown/AdmAql101.md
rename to asterix-doc/src/site/markdown/aql/primer.md
index 994c464..fc8ea9a 100644
--- a/asterix-doc/src/site/markdown/AdmAql101.md
+++ b/asterix-doc/src/site/markdown/aql/primer.md
@@ -12,7 +12,7 @@
Most importantly, it assumes you already have a running instance of AsterixDB and that you know how to query
it using AsterixDB's basic web interface.
For more information on these topics, you should go through the steps in
-[Installing Asterix Using Managix](InstallingAsterixUsingManagix.html)
+[Installing Asterix Using Managix](../install.html)
before reading this document and make sure that you have a running AsterixDB instance ready to go.
To get your feet wet, you should probably start with a simple local installation of AsterixDB on your favorite
machine, accepting all of the default settings that Managix offers.
@@ -350,7 +350,7 @@
in the not-too-distant future we will also provide a complete reference manual for the language.
In the meantime, this will get you started down the path of using AsterixDB.
A more complete list of the supported AsterixDB primitive types and built-in functions can be
-found at [AsterixDataTypesAndFunctions](AsterixDataTypesAndFunctions.html).
+found at [Asterix Data Model (ADM)](datamodel.html) and [Asterix Functions](functions.html).
AQL is an expression language.
Even the expression 1+1 is a valid AQL query that evaluates to 2.
diff --git a/asterix-doc/src/site/markdown/SimilarityQuery.md b/asterix-doc/src/site/markdown/aql/similarity.md
similarity index 86%
rename from asterix-doc/src/site/markdown/SimilarityQuery.md
rename to asterix-doc/src/site/markdown/aql/similarity.md
index 7f6f8ae..244103c 100644
--- a/asterix-doc/src/site/markdown/SimilarityQuery.md
+++ b/asterix-doc/src/site/markdown/aql/similarity.md
@@ -19,7 +19,7 @@
AsterixDB supports [edit distance](http://en.wikipedia.org/wiki/Levenshtein_distance) (on strings) and
[Jaccard](http://en.wikipedia.org/wiki/Jaccard_index) (on sets). For
instance, in our
-[TinySocial](AdmAql101.html#ADM:_Modeling_Semistructed_Data_in_AsterixDB)
+[TinySocial](primer.html#ADM:_Modeling_Semistructed_Data_in_AsterixDB)
example, the `friend-ids` of a Facebook user forms a set
of friends, and we can define a similarity between the sets of
friends of two users. We can also convert a string to a set of grams of a length "n"
@@ -29,13 +29,13 @@
`schwarzenegger` are `sch`, `chw`, `hwa`, ..., `ger`.
AsterixDB provides
-[tokenization functions](AsterixDBFunctions.html#Tokenizing_Functions)
+[tokenization functions](functions.html#Tokenizing_Functions)
to convert strings to sets, and the
-[similarity functions](AsterixDBFunctions.html#Similarity_Functions).
+[similarity functions](functions.html#Similarity_Functions).
## Similarity Selection Queries ##
-The following [query](AsterixDBFunctions.html#edit-distance)
+The following [query](functions.html#edit-distance)
asks for all the Facebook users whose name is similar to
`Suzanna Tilson`, i.e., their edit distance is at most 2.
@@ -47,7 +47,7 @@
return $user
-The following [query](AsterixDBFunctions.html#similarity-jaccard)
+The following [query](functions.html#similarity-jaccard)
asks for all the Facebook users whose set of friend ids is
similar to `[1,5,9]`, i.e., their Jaccard similarity is at least 0.6.
@@ -81,7 +81,7 @@
## Similarity Join Queries ##
AsterixDB supports fuzzy joins between two sets. The following
-[query](AdmAql101.html#Query_5_-_Fuzzy_Join)
+[query](primer.html#Query_5_-_Fuzzy_Join)
finds, for each Facebook user, all Twitter users with names
similar to their name based on the edit distance.
@@ -129,13 +129,13 @@
The number "3" in "ngram(3)" is the length "n" in the grams. This
index can be used to optimize similarity queries on this attribute
using
-[edit-distance](AsterixDBFunctions.html#edit-distance),
-[edit-distance-check](AsterixDBFunctions.html#edit-distance-check),
-[jaccard](AsterixDBFunctions.html#similarity-jaccard),
-or [jaccard-check](AsterixDBFunctions.html#similarity-jaccard-check)
+[edit-distance](functions.html#edit-distance),
+[edit-distance-check](functions.html#edit-distance-check),
+[jaccard](functions.html#similarity-jaccard),
+or [jaccard-check](functions.html#similarity-jaccard-check)
queries on this attribute where the
similarity is defined on sets of 3-grams. This index can also be used
-to optimize queries with the "[contains()]((AsterixDBFunctions.html#contains))" predicate (i.e., substring
+to optimize queries with the "[contains()](functions.html#contains)" predicate (i.e., substring
matching) since it can be also be solved by counting on the inverted
lists of the grams in the query string.
@@ -169,6 +169,6 @@
return $c
As shown above, keyword index can be used to optimize queries with token-based similarity predicates, including
-[similarity-jaccard](AsterixDBFunctions.html#similarity-jaccard) and
-[similarity-jaccard-check](AsterixDBFunctions.html#similarity-jaccard-check).
+[similarity-jaccard](functions.html#similarity-jaccard) and
+[similarity-jaccard-check](functions.html#similarity-jaccard-check).
diff --git a/asterix-doc/src/site/markdown/index.md b/asterix-doc/src/site/markdown/index.md
index aeb57b4..4ee2a5f 100644
--- a/asterix-doc/src/site/markdown/index.md
+++ b/asterix-doc/src/site/markdown/index.md
@@ -32,23 +32,20 @@
For the Beta release, we've got a start; for the Beta release a month or so from now, we will hopefully have much more.
The following is a list of the wiki pages and supporting documents that we have available today:
-1. [InstallingAsterixUsingManagix](InstallingAsterixUsingManagix.html) :
+1. [Installing AsterixDB using Managix](install.html) :
This is our installation guide, and it is where you should start.
This document will tell you how to obtain, install, and manage instances of [AsterixDB](https://asterixdb.googlecode.com/files/asterix-installer-0.0.4-binary-assembly.zip), including both single-machine setup (for developers) as well as cluster installations (for deployment in its intended form).
-2. [AdmAql101](AdmAql101.html) :
+2. [AsterixDB 101: An ADM and AQL Primer](aql/primer.html) :
This is a first-timers introduction to the user model of the AsterixDB BDMS, by which we mean the view of AsterixDB as seen from the perspective of an "average user" or Big Data application developer.
The AsterixDB user model consists of its data modeling features (ADM) and its query capabilities (AQL).
This document presents a tiny "social data warehousing" example and uses it as a backdrop for describing, by example, the key features of AsterixDB.
By working through this document, you will learn how to define the artifacts needed to manage data in AsterixDB, how to load data into the system, how to use most of the basic features of its query language, and how to insert and delete data dynamically.
-3. [AsterixDataTypesAndFunctions](AsterixDataTypesAndFunctions.html) :
-This is a reference document that catalogs the primitive data types and built-in functions available for use in AsterixDB schemas (in ADM) and queries (in AQL).
+3. [Asterix Data Model (ADM)](aql/datamodel.html), [Asterix Functions](aql/functions.html), and [Asterix Query Language (AQL)](aql/manual.html) :
+These are reference documents that catalog the primitive data types and built-in functions available in AQL and the reference manual for AQL itself.
-4. [AQL Reference](AsterixQueryLanguageReference.html) :
-This is the AQL language reference manual.
-
-5. [AsterixDBRestAPI](AsterixDBRestAPI.html) :
+5. [REST API to AsterixDB](api.html) :
Access to data in an AsterixDB instance is provided via a REST-based API.
This is a short document that describes the REST API entry points and their URL syntax.
diff --git a/asterix-doc/src/site/markdown/InstallingAsterixUsingManagix.md b/asterix-doc/src/site/markdown/install.md
similarity index 100%
rename from asterix-doc/src/site/markdown/InstallingAsterixUsingManagix.md
rename to asterix-doc/src/site/markdown/install.md
diff --git a/asterix-doc/src/site/resources/images/asterixlogo.png b/asterix-doc/src/site/resources/images/asterixlogo.png
new file mode 100644
index 0000000..45cd64f
--- /dev/null
+++ b/asterix-doc/src/site/resources/images/asterixlogo.png
Binary files differ
diff --git a/asterix-doc/src/site/site.xml b/asterix-doc/src/site/site.xml
index 4953eb4..b68941c 100644
--- a/asterix-doc/src/site/site.xml
+++ b/asterix-doc/src/site/site.xml
@@ -3,22 +3,33 @@
<project name="AsterixDB" xmlns="http://maven.apache.org/DECORATION/1.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/DECORATION/1.0.0 http://maven.apache.org/xsd/decoration-1.0.0.xsd">
- <!--
<bannerLeft>
- <name>Asterix</name>
- <src>http://asterix.ics.uci.edu/pic/img9.jpg</src>
- <href>http://asterix.ics.uci.edu/</href>
+ <name>AsterixDB</name>
+ <src>images/asterixlogo.png</src>
+ <href>/index.html</href>
</bannerLeft>
- -->
+
+ <version position="right"/>
+
+ <poweredBy><logo name="" img=""/></poweredBy>
<skin>
<groupId>org.apache.maven.skins</groupId>
<artifactId>maven-fluido-skin</artifactId>
- <version>1.2.1</version>
+ <version>1.3.0</version>
</skin>
<custom>
<fluidoSkin>
- <sideBarEnabled>true</sideBarEnabled>
+ <!-- <topBarEnabled>true</topBarEnabled>
+ <topBarIcon>
+ <name>AsterixDB</name>
+ <alt>AsterixDB</alt>
+ <src>images/asterixlogo.png</src>
+ <href>/index.html</href>
+ </topBarIcon>
+ <sideBarEnabled>false</sideBarEnabled> -->
+ <!-- <topBarContainerStyle>width: 90%;</topBarContainerStyle> -->
+ <sourceLineNumbersEnabled>true</sourceLineNumbersEnabled>
<!-- <googlePlusOne /> -->
</fluidoSkin>
</custom>
@@ -29,16 +40,19 @@
</links>
<menu name="Documentation">
- <item name="Installing Asterix using Managix" href="InstallingAsterixUsingManagix.html"/>
- <item name="AsterixDB 101: An ADM and AQL Primer" href="AdmAql101.html"/>
- <item name="Asterix Data Model (ADM)" href="AsterixDBDataModel.html"/>
- <item name="AsterixDB Functions" href="AsterixDBFunctions.html"/>
- <item name="The Asterix Query Language" href="AsterixQueryLanguageReference.html"/>
- <item name="AsterixDB Support of Similarity Queries" href="AsterixSimilarityQueries.html"/>
- <item name="Accessing External Data in AsterixDB" href="AccessingExternalDataInAsterixDB.html"/>
- <item name="REST API to AsterixDB" href="AsterixDBRestAPI.html"/>
+ <item name="Installing AsterixDB using Managix" href="install.html"/>
+ <item name="AsterixDB 101: An ADM and AQL Primer" href="aql/primer.html"/>
+ <item name="Asterix Data Model (ADM)" href="aql/datamodel.html"/>
+ <item name="Asterix Functions" href="aql/functions.html"/>
+ <item name="Asterix Query Language (AQL)" href="aql/manual.html"/>
+ <item name="AQL Support of Similarity Queries" href="aql/similarity.html"/>
+ <item name="Accessing External Data" href="aql/externaldata.html"/>
+ <item name="REST API to AsterixDB" href="api.html"/>
</menu>
<menu ref="reports"/>
+
+ <footer>© Copyright 2013 University of California, Irvine</footer>
+
</body>
</project>
diff --git a/asterix-events/src/main/java/edu/uci/ics/asterix/event/management/EventExecutor.java b/asterix-events/src/main/java/edu/uci/ics/asterix/event/management/EventExecutor.java
index e702ef3..63a791b 100644
--- a/asterix-events/src/main/java/edu/uci/ics/asterix/event/management/EventExecutor.java
+++ b/asterix-events/src/main/java/edu/uci/ics/asterix/event/management/EventExecutor.java
@@ -18,7 +18,6 @@
import java.io.File;
import java.io.IOException;
import java.io.StringWriter;
-import java.math.BigInteger;
import java.util.ArrayList;
import java.util.List;
diff --git a/asterix-external-data/pom.xml b/asterix-external-data/pom.xml
index 842cd67..aff6967 100644
--- a/asterix-external-data/pom.xml
+++ b/asterix-external-data/pom.xml
@@ -131,7 +131,8 @@
</dependency>
<dependency>
<groupId>edu.uci.ics.hyracks</groupId>
- <artifactId>hyracks-dataflow-hadoop</artifactId>
+ <artifactId>hyracks-hdfs-core</artifactId>
+ <version>${hyracks.version}</version>
</dependency>
<dependency>
<groupId>jdom</groupId>
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/CNNFeedAdapterFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/CNNFeedAdapterFactory.java
index f1f5884..8c8880a 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/CNNFeedAdapterFactory.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/CNNFeedAdapterFactory.java
@@ -23,9 +23,10 @@
* A factory class for creating the @see {CNNFeedAdapter}.
*/
public class CNNFeedAdapterFactory implements ITypedDatasetAdapterFactory {
+ private static final long serialVersionUID = 1L;
@Override
- public IDatasourceAdapter createAdapter(Map<String, String> configuration) throws Exception {
+ public IDatasourceAdapter createAdapter(Map<String, Object> configuration) throws Exception {
CNNFeedAdapter cnnFeedAdapter = new CNNFeedAdapter();
cnnFeedAdapter.configure(configuration);
return cnnFeedAdapter;
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HDFSAdapterFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HDFSAdapterFactory.java
index 6fcb710..c267658 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HDFSAdapterFactory.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HDFSAdapterFactory.java
@@ -14,22 +14,79 @@
*/
package edu.uci.ics.asterix.external.adapter.factory;
+import java.util.Arrays;
+import java.util.HashMap;
import java.util.Map;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+
import edu.uci.ics.asterix.external.dataset.adapter.HDFSAdapter;
import edu.uci.ics.asterix.external.dataset.adapter.IDatasourceAdapter;
import edu.uci.ics.asterix.om.types.IAType;
+import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import edu.uci.ics.hyracks.hdfs.dataflow.ConfFactory;
+import edu.uci.ics.hyracks.hdfs.dataflow.InputSplitsFactory;
+import edu.uci.ics.hyracks.hdfs.scheduler.Scheduler;
/**
* A factory class for creating an instance of HDFSAdapter
*/
+@SuppressWarnings("deprecation")
public class HDFSAdapterFactory implements IGenericDatasetAdapterFactory {
+    private static final long serialVersionUID = 1L;
public static final String HDFS_ADAPTER_NAME = "hdfs";
+    /** Configuration key under which createAdapter looks up the cluster partition constraint. */
+    public static final String CLUSTER_LOCATIONS = "cluster-locations";
+    /** Configuration key under which createAdapter looks up the HDFS Scheduler instance. */
+    public static final String SCHEDULER = "hdfs-scheduler";
+
+    // Keys and accepted values of the user-supplied adapter configuration.
+    public static final String KEY_HDFS_URL = "hdfs";
+    public static final String KEY_PATH = "path";
+    public static final String KEY_INPUT_FORMAT = "input-format";
+    public static final String INPUT_FORMAT_TEXT = "text-input-format";
+    public static final String INPUT_FORMAT_SEQUENCE = "sequence-input-format";
+
+    // State computed by the first createAdapter call and reused by later calls.
+    // NOTE(review): clusterLocations is transient while setup is serialized, so a deserialized
+    // factory would skip the setup block and pass a null constraint to the adapter - confirm intended.
+    private transient AlgebricksPartitionConstraint clusterLocations;
+    private String[] readSchedule;
+    private boolean[] executed;
+    private InputSplitsFactory inputSplitsFactory;
+    private ConfFactory confFactory;
+    private boolean setup = false;
+
+    /** Input-format keyword (as given in the adapter configuration) to InputFormat class name. */
+    private static final Map<String, String> formatClassNames = initInputFormatMap();
+
+    /**
+     * Creates the keyword-to-class-name table for the supported input formats.
+     *
+     * @return a new map with one entry for the text keyword and one for the sequence-file keyword
+     */
+    private static Map<String, String> initInputFormatMap() {
+        Map<String, String> classNameByKeyword = new HashMap<String, String>();
+        classNameByKeyword.put(INPUT_FORMAT_SEQUENCE, "org.apache.hadoop.mapred.SequenceFileInputFormat");
+        classNameByKeyword.put(INPUT_FORMAT_TEXT, "org.apache.hadoop.mapred.TextInputFormat");
+        return classNameByKeyword;
+    }
@Override
- public IDatasourceAdapter createAdapter(Map<String, String> configuration, IAType atype) throws Exception {
- HDFSAdapter hdfsAdapter = new HDFSAdapter(atype);
+    public IDatasourceAdapter createAdapter(Map<String, Object> configuration, IAType atype) throws Exception {
+        // Creates and configures an HDFSAdapter for the given record type. The first call on a factory
+        // instance computes the job configuration, input splits and read schedule and stores them in
+        // the factory's fields; subsequent calls reuse that state.
+        if (!setup) {
+            // One-time setup. The job configuration is built exactly once; building it a second time
+            // would only produce a JobConf that is thrown away.
+            JobConf conf = configureJobConf(configuration);
+            confFactory = new ConfFactory(conf);
+
+            // The number of locations in the absolute partition constraint is passed to getSplits
+            // as the requested number of splits.
+            clusterLocations = (AlgebricksPartitionConstraint) configuration.get(CLUSTER_LOCATIONS);
+            int numPartitions = ((AlgebricksAbsolutePartitionConstraint) clusterLocations).getLocations().length;
+
+            InputSplit[] inputSplits = conf.getInputFormat().getSplits(conf, numPartitions);
+            inputSplitsFactory = new InputSplitsFactory(inputSplits);
+
+            // readSchedule holds the location constraints the scheduler returns for the splits;
+            // executed has one flag per schedule entry, all initially false.
+            Scheduler scheduler = (Scheduler) configuration.get(SCHEDULER);
+            readSchedule = scheduler.getLocationConstraints(inputSplits);
+            executed = new boolean[readSchedule.length];
+            Arrays.fill(executed, false);
+
+            setup = true;
+        }
+        JobConf conf = confFactory.getConf();
+        InputSplit[] inputSplits = inputSplitsFactory.getSplits();
+        HDFSAdapter hdfsAdapter = new HDFSAdapter(atype, readSchedule, executed, inputSplits, conf, clusterLocations);
hdfsAdapter.configure(configuration);
return hdfsAdapter;
}
@@ -39,4 +96,15 @@
return HDFS_ADAPTER_NAME;
}
+    /**
+     * Builds the Hadoop job configuration used to read the external dataset.
+     *
+     * @param configuration
+     *            adapter configuration; the values under KEY_HDFS_URL, KEY_PATH and KEY_INPUT_FORMAT
+     *            are read as strings
+     * @return a new JobConf with the file system, class loader, input directory and input format class set
+     * @throws IllegalArgumentException
+     *             if the configured input format is not one of the known keywords
+     */
+    private JobConf configureJobConf(Map<String, Object> configuration) throws Exception {
+        JobConf conf = new JobConf();
+        conf.set("fs.default.name", ((String) configuration.get(KEY_HDFS_URL)).trim());
+        conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
+        conf.setClassLoader(HDFSAdapter.class.getClassLoader());
+        conf.set("mapred.input.dir", ((String) configuration.get(KEY_PATH)).trim());
+
+        String inputFormat = ((String) configuration.get(KEY_INPUT_FORMAT)).trim();
+        String inputFormatClassName = formatClassNames.get(inputFormat);
+        if (inputFormatClassName == null) {
+            // An unknown keyword has no entry in formatClassNames; reject it here with a clear
+            // message instead of handing a null class name to the JobConf.
+            throw new IllegalArgumentException("file input format " + inputFormat + " is not supported");
+        }
+        conf.set("mapred.input.format.class", inputFormatClassName);
+        return conf;
+    }
+
}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HiveAdapterFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HiveAdapterFactory.java
index 5e28eed..ba6ade5 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HiveAdapterFactory.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HiveAdapterFactory.java
@@ -14,20 +14,92 @@
*/
package edu.uci.ics.asterix.external.adapter.factory;
+import java.util.Arrays;
+import java.util.HashMap;
import java.util.Map;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+
+import edu.uci.ics.asterix.external.dataset.adapter.HDFSAdapter;
import edu.uci.ics.asterix.external.dataset.adapter.HiveAdapter;
import edu.uci.ics.asterix.external.dataset.adapter.IDatasourceAdapter;
import edu.uci.ics.asterix.om.types.IAType;
+import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import edu.uci.ics.hyracks.hdfs.dataflow.ConfFactory;
+import edu.uci.ics.hyracks.hdfs.dataflow.InputSplitsFactory;
+import edu.uci.ics.hyracks.hdfs.scheduler.Scheduler;
/**
* A factory class for creating an instance of HiveAdapter
*/
+@SuppressWarnings("deprecation")
public class HiveAdapterFactory implements IGenericDatasetAdapterFactory {
+    private static final long serialVersionUID = 1L;
+
+    public static final String HDFS_ADAPTER_NAME = "hdfs";
+    /** Configuration key under which createAdapter looks up the cluster partition constraint. */
+    public static final String CLUSTER_LOCATIONS = "cluster-locations";
+    /** Configuration key under which createAdapter looks up the HDFS Scheduler instance. */
+    public static final String SCHEDULER = "hdfs-scheduler";
+
+    // Keys and accepted values for the underlying HDFS files.
+    public static final String KEY_HDFS_URL = "hdfs";
+    public static final String KEY_PATH = "path";
+    public static final String KEY_INPUT_FORMAT = "input-format";
+    public static final String INPUT_FORMAT_TEXT = "text-input-format";
+    public static final String INPUT_FORMAT_SEQUENCE = "sequence-input-format";
+
+    // Keys and accepted values for the record format.
+    public static final String KEY_FORMAT = "format";
+    public static final String KEY_PARSER_FACTORY = "parser";
+    public static final String FORMAT_DELIMITED_TEXT = "delimited-text";
+    public static final String FORMAT_ADM = "adm";
+
+    // Hive-specific configuration keys.
+    public static final String HIVE_DATABASE = "database";
+    public static final String HIVE_TABLE = "table";
+    public static final String HIVE_HOME = "hive-home";
+    public static final String HIVE_METASTORE_URI = "metastore-uri";
+    public static final String HIVE_WAREHOUSE_DIR = "warehouse-dir";
+    public static final String HIVE_METASTORE_RAWSTORE_IMPL = "rawstore-impl";
+
+    // State computed by the first createAdapter call and reused by later calls.
+    // NOTE(review): clusterLocations is transient while setup is serialized, so a deserialized
+    // factory would skip the setup block and pass a null constraint to the adapter - confirm intended.
+    private String[] readSchedule;
+    private boolean[] executed;
+    private InputSplitsFactory inputSplitsFactory;
+    private ConfFactory confFactory;
+    private transient AlgebricksPartitionConstraint clusterLocations;
+    private boolean setup = false;
+
+    /** Input-format keyword (as given in the adapter configuration) to InputFormat class name. */
+    private static final Map<String, String> formatClassNames = initInputFormatMap();
+
+    /**
+     * Creates the keyword-to-class-name table for the supported input formats.
+     *
+     * @return a new map with one entry for the text keyword and one for the sequence-file keyword
+     */
+    private static Map<String, String> initInputFormatMap() {
+        Map<String, String> classNameByKeyword = new HashMap<String, String>();
+        classNameByKeyword.put(INPUT_FORMAT_SEQUENCE, "org.apache.hadoop.mapred.SequenceFileInputFormat");
+        classNameByKeyword.put(INPUT_FORMAT_TEXT, "org.apache.hadoop.mapred.TextInputFormat");
+        return classNameByKeyword;
+    }
@Override
- public IDatasourceAdapter createAdapter(Map<String, String> configuration, IAType type) throws Exception {
- HiveAdapter hiveAdapter = new HiveAdapter(type);
+    public IDatasourceAdapter createAdapter(Map<String, Object> configuration, IAType atype) throws Exception {
+        // Creates and configures a HiveAdapter for the given record type. The first call on a factory
+        // instance computes the job configuration, input splits and read schedule and stores them in
+        // the factory's fields; subsequent calls reuse that state.
+        if (!setup) {
+            // One-time setup. The job configuration is built exactly once; building it a second time
+            // would only produce a JobConf that is thrown away.
+            JobConf conf = configureJobConf(configuration);
+            confFactory = new ConfFactory(conf);
+
+            // The number of locations in the absolute partition constraint is passed to getSplits
+            // as the requested number of splits.
+            clusterLocations = (AlgebricksPartitionConstraint) configuration.get(CLUSTER_LOCATIONS);
+            int numPartitions = ((AlgebricksAbsolutePartitionConstraint) clusterLocations).getLocations().length;
+
+            InputSplit[] inputSplits = conf.getInputFormat().getSplits(conf, numPartitions);
+            inputSplitsFactory = new InputSplitsFactory(inputSplits);
+
+            // readSchedule holds the location constraints the scheduler returns for the splits;
+            // executed has one flag per schedule entry, all initially false.
+            Scheduler scheduler = (Scheduler) configuration.get(SCHEDULER);
+            readSchedule = scheduler.getLocationConstraints(inputSplits);
+            executed = new boolean[readSchedule.length];
+            Arrays.fill(executed, false);
+
+            setup = true;
+        }
+        JobConf conf = confFactory.getConf();
+        InputSplit[] inputSplits = inputSplitsFactory.getSplits();
+        HiveAdapter hiveAdapter = new HiveAdapter(atype, readSchedule, executed, inputSplits, conf, clusterLocations);
hiveAdapter.configure(configuration);
return hiveAdapter;
}
@@ -36,4 +108,37 @@
public String getName() {
return "hive";
}
+
+    /**
+     * Builds the Hadoop job configuration for reading the files of a Hive table.
+     * <p>
+     * The table directory is derived from the warehouse directory, the optional database name and
+     * the table name, and is written back into {@code configuration} as the input path.
+     *
+     * @param configuration
+     *            adapter configuration; it is modified: the computed table path is stored under
+     *            HDFSAdapter.KEY_PATH
+     * @return a new JobConf with the file system, class loader, input directory and input format class set
+     * @throws IllegalArgumentException
+     *             if the record format is not delimited-text or the file input format is neither
+     *             the text nor the sequence-file keyword
+     */
+    private JobConf configureJobConf(Map<String, Object> configuration) throws Exception {
+        JobConf conf = new JobConf();
+
+        /** configure hive */
+        String database = (String) configuration.get(HIVE_DATABASE);
+        String tablePath = null;
+        if (database == null) {
+            tablePath = configuration.get(HIVE_WAREHOUSE_DIR) + "/" + configuration.get(HIVE_TABLE);
+        } else {
+            // A table in a named database is located at <warehouse-dir>/<database>.db/<table>.
+            tablePath = configuration.get(HIVE_WAREHOUSE_DIR) + "/" + database + ".db" + "/"
+                    + configuration.get(HIVE_TABLE);
+        }
+        // NOTE(review): the path is stored under HDFSAdapter.KEY_PATH and read back below under this
+        // class's KEY_PATH; this assumes both constants have the same value - confirm.
+        configuration.put(HDFSAdapter.KEY_PATH, tablePath);
+
+        // Constant-first equals: a missing entry is reported as unsupported instead of raising a
+        // NullPointerException.
+        if (!FORMAT_DELIMITED_TEXT.equals(configuration.get(KEY_FORMAT))) {
+            throw new IllegalArgumentException("format " + configuration.get(KEY_FORMAT) + " is not supported");
+        }
+
+        Object inputFormat = configuration.get(HDFSAdapterFactory.KEY_INPUT_FORMAT);
+        if (!(HDFSAdapterFactory.INPUT_FORMAT_TEXT.equals(inputFormat)
+                || HDFSAdapterFactory.INPUT_FORMAT_SEQUENCE.equals(inputFormat))) {
+            throw new IllegalArgumentException("file input format " + inputFormat + " is not supported");
+        }
+
+        /** configure hdfs */
+        conf.set("fs.default.name", ((String) configuration.get(KEY_HDFS_URL)).trim());
+        conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
+        conf.setClassLoader(HDFSAdapter.class.getClassLoader());
+        conf.set("mapred.input.dir", ((String) configuration.get(KEY_PATH)).trim());
+        conf.set("mapred.input.format.class",
+                formatClassNames.get(((String) configuration.get(KEY_INPUT_FORMAT)).trim()));
+        return conf;
+    }
}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/IAdapterFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/IAdapterFactory.java
index 45fd6cf..697c8ea 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/IAdapterFactory.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/IAdapterFactory.java
@@ -14,12 +14,14 @@
*/
package edu.uci.ics.asterix.external.adapter.factory;
+import java.io.Serializable;
+
/**
* Base interface for IGenericDatasetAdapterFactory and ITypedDatasetAdapterFactory.
* Acts as a marker interface indicating that the implementation provides functionality
* for creating an adapter.
*/
-public interface IAdapterFactory {
+public interface IAdapterFactory extends Serializable {
/**
* Returns the display name corresponding to the Adapter type that is created by the factory.
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/IGenericDatasetAdapterFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/IGenericDatasetAdapterFactory.java
index 093a3dd..e8d120a 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/IGenericDatasetAdapterFactory.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/IGenericDatasetAdapterFactory.java
@@ -38,6 +38,6 @@
* @return An instance of IDatasourceAdapter.
* @throws Exception
*/
- public IDatasourceAdapter createAdapter(Map<String, String> configuration, IAType atype) throws Exception;
+ public IDatasourceAdapter createAdapter(Map<String, Object> configuration, IAType atype) throws Exception;
}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/ITypedDatasetAdapterFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/ITypedDatasetAdapterFactory.java
index 0f9978e..84a5ca8 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/ITypedDatasetAdapterFactory.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/ITypedDatasetAdapterFactory.java
@@ -33,6 +33,6 @@
* @return An instance of IDatasourceAdapter.
* @throws Exception
*/
- public IDatasourceAdapter createAdapter(Map<String, String> configuration) throws Exception;
+ public IDatasourceAdapter createAdapter(Map<String, Object> configuration) throws Exception;
}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/NCFileSystemAdapterFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/NCFileSystemAdapterFactory.java
index 2040949..659fd23 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/NCFileSystemAdapterFactory.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/NCFileSystemAdapterFactory.java
@@ -26,11 +26,11 @@
* an NC.
*/
public class NCFileSystemAdapterFactory implements IGenericDatasetAdapterFactory {
-
+ private static final long serialVersionUID = 1L;
public static final String NC_FILE_SYSTEM_ADAPTER_NAME = "localfs";
@Override
- public IDatasourceAdapter createAdapter(Map<String, String> configuration, IAType atype) throws Exception {
+ public IDatasourceAdapter createAdapter(Map<String, Object> configuration, IAType atype) throws Exception {
NCFileSystemAdapter fsAdapter = new NCFileSystemAdapter(atype);
fsAdapter.configure(configuration);
return fsAdapter;
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/PullBasedTwitterAdapterFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/PullBasedTwitterAdapterFactory.java
index bc00469..e63be17 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/PullBasedTwitterAdapterFactory.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/PullBasedTwitterAdapterFactory.java
@@ -25,11 +25,11 @@
* via pull-based Twitter API.
*/
public class PullBasedTwitterAdapterFactory implements ITypedDatasetAdapterFactory {
-
+ private static final long serialVersionUID = 1L;
public static final String PULL_BASED_TWITTER_ADAPTER_NAME = "pull_twitter";
@Override
- public IDatasourceAdapter createAdapter(Map<String, String> configuration) throws Exception {
+ public IDatasourceAdapter createAdapter(Map<String, Object> configuration) throws Exception {
PullBasedTwitterAdapter twitterAdapter = new PullBasedTwitterAdapter();
twitterAdapter.configure(configuration);
return twitterAdapter;
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/RSSFeedAdapterFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/RSSFeedAdapterFactory.java
index bbbea38..3cd22e8 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/RSSFeedAdapterFactory.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/RSSFeedAdapterFactory.java
@@ -24,11 +24,11 @@
* RSSFeedAdapter provides the functionality of fetching an RSS based feed.
*/
public class RSSFeedAdapterFactory implements ITypedDatasetAdapterFactory {
-
+ private static final long serialVersionUID = 1L;
public static final String RSS_FEED_ADAPTER_NAME = "rss_feed";
@Override
- public IDatasourceAdapter createAdapter(Map<String, String> configuration) throws Exception {
+ public IDatasourceAdapter createAdapter(Map<String, Object> configuration) throws Exception {
RSSFeedAdapter rssFeedAdapter = new RSSFeedAdapter();
rssFeedAdapter.configure(configuration);
return rssFeedAdapter;
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/operator/ExternalDataScanOperatorDescriptor.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/operator/ExternalDataScanOperatorDescriptor.java
index fb4cc99..f9f72cf 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/operator/ExternalDataScanOperatorDescriptor.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/operator/ExternalDataScanOperatorDescriptor.java
@@ -19,8 +19,6 @@
import edu.uci.ics.asterix.external.adapter.factory.IGenericDatasetAdapterFactory;
import edu.uci.ics.asterix.external.dataset.adapter.IDatasourceAdapter;
import edu.uci.ics.asterix.om.types.IAType;
-import edu.uci.ics.hyracks.api.application.ICCApplicationContext;
-import edu.uci.ics.hyracks.api.constraints.IConstraintAcceptor;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
@@ -38,61 +36,24 @@
private static final long serialVersionUID = 1L;
- private final String adapterFactory;
- private final Map<String, String> adapterConfiguration;
+ private final Map<String, Object> adapterConfiguration;
private final IAType atype;
private IGenericDatasetAdapterFactory datasourceAdapterFactory;
- public ExternalDataScanOperatorDescriptor(JobSpecification spec, String adapter, Map<String, String> arguments,
- IAType atype, RecordDescriptor rDesc) {
+ public ExternalDataScanOperatorDescriptor(JobSpecification spec, Map<String, Object> arguments, IAType atype,
+ RecordDescriptor rDesc, IGenericDatasetAdapterFactory dataSourceAdapterFactory) {
super(spec, 0, 1);
recordDescriptors[0] = rDesc;
- this.adapterFactory = adapter;
this.adapterConfiguration = arguments;
this.atype = atype;
- }
-
- @Override
- public void contributeSchedulingConstraints(IConstraintAcceptor constraintAcceptor, ICCApplicationContext appCtx) {
-
- /*
- Comment: The following code is commented out. This is because constraints are being set at compile time so that they can
- be propagated to upstream Asterix operators. Hyracks has to provide a way to propagate constraints to upstream operators.
- Once it is there, we will uncomment the following code.
-
- AlgebricksPartitionConstraint constraint = datasourceReadAdapter.getPartitionConstraint();
- switch (constraint.getPartitionConstraintType()) {
- case ABSOLUTE:
- String[] locations = ((AlgebricksAbsolutePartitionConstraint) constraint).getLocations();
- for (int i = 0; i < locations.length; ++i) {
- constraintAcceptor.addConstraint(new Constraint(new PartitionLocationExpression(this.odId, i),
- new ConstantExpression(locations[i])));
- }
- constraintAcceptor.addConstraint(new Constraint(new PartitionCountExpression(this.odId),
- new ConstantExpression(locations.length)));
-
- break;
- case COUNT:
- constraintAcceptor.addConstraint(new Constraint(new PartitionCountExpression(this.odId),
- new ConstantExpression(((AlgebricksCountPartitionConstraint) constraint).getCount())));
- break;
- default:
- throw new IllegalStateException(" Constraint type :" + constraint.getPartitionConstraintType()
- + " not supported");
-
- }*/
-
+ this.datasourceAdapterFactory = dataSourceAdapterFactory;
}
@Override
public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions)
throws HyracksDataException {
- try {
- datasourceAdapterFactory = (IGenericDatasetAdapterFactory) Class.forName(adapterFactory).newInstance();
- } catch (Exception e) {
- throw new HyracksDataException("initialization of adapter failed", e);
- }
+
return new AbstractUnaryOutputSourceOperatorNodePushable() {
@Override
public void initialize() throws HyracksDataException {
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/operator/FeedIntakeOperatorDescriptor.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/operator/FeedIntakeOperatorDescriptor.java
index 2da4e76..f07168a 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/operator/FeedIntakeOperatorDescriptor.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/operator/FeedIntakeOperatorDescriptor.java
@@ -35,52 +35,46 @@
* Operator responsible for ingesting data from an external source. This
* operator uses a (configurable) adapter associated with the feed dataset.
*/
-public class FeedIntakeOperatorDescriptor extends
- AbstractSingleActivityOperatorDescriptor {
+public class FeedIntakeOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
- private static final long serialVersionUID = 1L;
+ private static final long serialVersionUID = 1L;
- private final String adapterFactoryClassName;
- private final Map<String, String> adapterConfiguration;
- private final IAType atype;
- private final FeedId feedId;
+ private final String adapterFactoryClassName;
+ private final Map<String, Object> adapterConfiguration;
+ private final IAType atype;
+ private final FeedId feedId;
+ private final IAdapterFactory datasourceAdapterFactory;
- private transient IAdapterFactory datasourceAdapterFactory;
+    /**
+     * Creates the feed intake operator descriptor.
+     *
+     * @param spec
+     *            job specification handed to the superclass constructor
+     * @param feedId
+     *            identifier of the feed, later passed to the node pushable
+     * @param adapter
+     *            class name of the adapter factory; used only in the error message raised for an
+     *            unknown factory type
+     * @param arguments
+     *            adapter configuration passed to the factory when the adapter is created
+     * @param atype
+     *            record type passed to generic adapter factories
+     * @param rDesc
+     *            record descriptor stored as the descriptor's first (index 0) record descriptor
+     * @param datasourceAdapterFactory
+     *            factory instance used at runtime to create the adapter
+     */
+    public FeedIntakeOperatorDescriptor(JobSpecification spec, FeedId feedId, String adapter,
+            Map<String, Object> arguments, ARecordType atype, RecordDescriptor rDesc,
+            IAdapterFactory datasourceAdapterFactory) {
+        super(spec, 1, 1);
+        this.feedId = feedId;
+        this.atype = atype;
+        this.adapterFactoryClassName = adapter;
+        this.adapterConfiguration = arguments;
+        this.datasourceAdapterFactory = datasourceAdapterFactory;
+        recordDescriptors[0] = rDesc;
+    }
- public FeedIntakeOperatorDescriptor(JobSpecification spec, FeedId feedId,
- String adapter, Map<String, String> arguments, ARecordType atype,
- RecordDescriptor rDesc) {
- super(spec, 1, 1);
- recordDescriptors[0] = rDesc;
- this.adapterFactoryClassName = adapter;
- this.adapterConfiguration = arguments;
- this.atype = atype;
- this.feedId = feedId;
- }
-
- public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
- IRecordDescriptorProvider recordDescProvider, final int partition,
- int nPartitions) throws HyracksDataException {
- ITypedDatasourceAdapter adapter;
- try {
- datasourceAdapterFactory = (IAdapterFactory) Class.forName(
- adapterFactoryClassName).newInstance();
- if (datasourceAdapterFactory instanceof IGenericDatasetAdapterFactory) {
- adapter = (ITypedDatasourceAdapter) ((IGenericDatasetAdapterFactory) datasourceAdapterFactory)
- .createAdapter(adapterConfiguration, atype);
- } else if (datasourceAdapterFactory instanceof ITypedDatasetAdapterFactory) {
- adapter = (ITypedDatasourceAdapter) ((ITypedDatasetAdapterFactory) datasourceAdapterFactory)
- .createAdapter(adapterConfiguration);
- } else {
- throw new IllegalStateException(
- " Unknown adapter factory type for "
- + adapterFactoryClassName);
- }
- adapter.initialize(ctx);
- } catch (Exception e) {
- throw new HyracksDataException("initialization of adapter failed",
- e);
- }
- return new FeedIntakeOperatorNodePushable(feedId, adapter, partition);
- }
+    /**
+     * Creates the adapter through the configured factory, initializes it with the task context
+     * and wraps it in a FeedIntakeOperatorNodePushable for the given partition.
+     *
+     * @throws HyracksDataException
+     *             wrapping any exception raised while creating or initializing the adapter,
+     *             including the IllegalStateException raised for an unknown factory type
+     */
+    public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
+            IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions)
+            throws HyracksDataException {
+        ITypedDatasourceAdapter feedAdapter;
+        try {
+            if (datasourceAdapterFactory instanceof IGenericDatasetAdapterFactory) {
+                // Generic factories additionally need the record type.
+                IGenericDatasetAdapterFactory genericFactory = (IGenericDatasetAdapterFactory) datasourceAdapterFactory;
+                feedAdapter = (ITypedDatasourceAdapter) genericFactory.createAdapter(adapterConfiguration, atype);
+            } else if (datasourceAdapterFactory instanceof ITypedDatasetAdapterFactory) {
+                ITypedDatasetAdapterFactory typedFactory = (ITypedDatasetAdapterFactory) datasourceAdapterFactory;
+                feedAdapter = (ITypedDatasourceAdapter) typedFactory.createAdapter(adapterConfiguration);
+            } else {
+                throw new IllegalStateException(" Unknown adapter factory type for " + adapterFactoryClassName);
+            }
+            feedAdapter.initialize(ctx);
+        } catch (Exception cause) {
+            throw new HyracksDataException("initialization of adapter failed", cause);
+        }
+        return new FeedIntakeOperatorNodePushable(feedId, feedAdapter, partition);
+    }
}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/operator/FeedIntakeOperatorNodePushable.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/operator/FeedIntakeOperatorNodePushable.java
index d0dbb98..fd40b03 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/operator/FeedIntakeOperatorNodePushable.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/operator/FeedIntakeOperatorNodePushable.java
@@ -17,6 +17,7 @@
import java.nio.ByteBuffer;
import java.util.concurrent.LinkedBlockingQueue;
+import edu.uci.ics.asterix.common.api.AsterixThreadExecutor;
import edu.uci.ics.asterix.external.dataset.adapter.IDatasourceAdapter;
import edu.uci.ics.asterix.external.feed.lifecycle.AlterFeedMessage;
import edu.uci.ics.asterix.external.feed.lifecycle.FeedId;
@@ -51,7 +52,7 @@
public void open() throws HyracksDataException {
if (adapter instanceof IManagedFeedAdapter) {
feedInboxMonitor = new FeedInboxMonitor((IManagedFeedAdapter) adapter, inbox, partition);
- feedInboxMonitor.start();
+ AsterixThreadExecutor.INSTANCE.execute(feedInboxMonitor);
feedManager.registerFeedMsgQueue(feedId, inbox);
}
writer.open();
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/AbstractDatasourceAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/AbstractDatasourceAdapter.java
index 440ee8c..23e545d 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/AbstractDatasourceAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/AbstractDatasourceAdapter.java
@@ -36,7 +36,7 @@
private static final long serialVersionUID = 1L;
- protected Map<String, String> configuration;
+ protected Map<String, Object> configuration;
protected transient AlgebricksPartitionConstraint partitionConstraint;
protected IAType atype;
protected IHyracksTaskContext ctx;
@@ -51,15 +51,15 @@
typeToValueParserFactMap.put(ATypeTag.STRING, UTF8StringParserFactory.INSTANCE);
}
- protected static final Map<String, String> formatToParserFactoryMap = initializeFormatParserFactoryMap();
+ protected static final Map<String, Object> formatToParserFactoryMap = initializeFormatParserFactoryMap();
public static final String KEY_FORMAT = "format";
public static final String KEY_PARSER_FACTORY = "parser";
public static final String FORMAT_DELIMITED_TEXT = "delimited-text";
public static final String FORMAT_ADM = "adm";
- private static Map<String, String> initializeFormatParserFactoryMap() {
- Map<String, String> map = new HashMap<String, String>();
+ private static Map<String, Object> initializeFormatParserFactoryMap() {
+ Map<String, Object> map = new HashMap<String, Object>();
map.put(FORMAT_DELIMITED_TEXT, "edu.uci.ics.asterix.runtime.operators.file.NtDelimitedDataTupleParserFactory");
map.put(FORMAT_ADM, "edu.uci.ics.asterix.runtime.operators.file.AdmSchemafullRecordParserFactory");
return map;
@@ -77,7 +77,7 @@
* @param attribute
* The attribute whose value needs to be obtained.
*/
- public String getAdapterProperty(String attribute) {
+ public Object getAdapterProperty(String attribute) {
return configuration.get(attribute);
}
@@ -86,7 +86,7 @@
*
* @return A Map<String,String> instance representing the adapter configuration.
*/
- public Map<String, String> getConfiguration() {
+ public Map<String, Object> getConfiguration() {
return configuration;
}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/CNNFeedAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/CNNFeedAdapter.java
index 3898f7e..8976f7a 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/CNNFeedAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/CNNFeedAdapter.java
@@ -70,9 +70,9 @@
}
@Override
- public void configure(Map<String, String> arguments) throws Exception {
+ public void configure(Map<String, Object> arguments) throws Exception {
configuration = arguments;
- String rssURLProperty = configuration.get(KEY_RSS_URL);
+ String rssURLProperty = (String) configuration.get(KEY_RSS_URL);
if (rssURLProperty == null) {
throw new IllegalArgumentException("no rss url provided");
}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/FileSystemBasedAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/FileSystemBasedAdapter.java
index 9f8cedc..8ab252d 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/FileSystemBasedAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/FileSystemBasedAdapter.java
@@ -69,7 +69,7 @@
public abstract void initialize(IHyracksTaskContext ctx) throws Exception;
@Override
- public abstract void configure(Map<String, String> arguments) throws Exception;
+ public abstract void configure(Map<String, Object> arguments) throws Exception;
@Override
public abstract AdapterType getAdapterType();
@@ -82,14 +82,14 @@
}
protected void configureFormat() throws Exception {
- String parserFactoryClassname = configuration.get(KEY_PARSER_FACTORY);
+ String parserFactoryClassname = (String) configuration.get(KEY_PARSER_FACTORY);
if (parserFactoryClassname == null) {
- String specifiedFormat = configuration.get(KEY_FORMAT);
+ String specifiedFormat = (String) configuration.get(KEY_FORMAT);
if (specifiedFormat == null) {
throw new IllegalArgumentException(" Unspecified data format");
} else if (FORMAT_DELIMITED_TEXT.equalsIgnoreCase(specifiedFormat)) {
parserFactory = getDelimitedDataTupleParserFactory((ARecordType) atype);
- } else if (FORMAT_ADM.equalsIgnoreCase(configuration.get(KEY_FORMAT))) {
+ } else if (FORMAT_ADM.equalsIgnoreCase((String)configuration.get(KEY_FORMAT))) {
parserFactory = getADMDataTupleParserFactory((ARecordType) atype);
} else {
throw new IllegalArgumentException(" format " + configuration.get(KEY_FORMAT) + " not supported");
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HDFSAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HDFSAdapter.java
index 1e05b2f..02eacf5 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HDFSAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HDFSAdapter.java
@@ -14,18 +14,9 @@
*/
package edu.uci.ics.asterix.external.dataset.adapter;
-import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
-import java.net.UnknownHostException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
import java.util.Map;
-import java.util.Random;
-import java.util.Set;
-import java.util.logging.Level;
-import java.util.logging.Logger;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.Counters.Counter;
@@ -37,14 +28,9 @@
import org.apache.hadoop.mapred.TextInputFormat;
import edu.uci.ics.asterix.om.types.IAType;
-import edu.uci.ics.asterix.om.util.AsterixRuntimeUtil;
-import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
-import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksCountPartitionConstraint;
import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
import edu.uci.ics.hyracks.algebricks.common.exceptions.NotImplementedException;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.dataflow.hadoop.util.InputSplitsProxy;
/**
* Provides functionality for fetching external data stored in an HDFS instance.
@@ -53,118 +39,29 @@
public class HDFSAdapter extends FileSystemBasedAdapter {
private static final long serialVersionUID = 1L;
- private static final Logger LOGGER = Logger.getLogger(HDFSAdapter.class.getName());
- public static final String KEY_HDFS_URL = "hdfs";
- public static final String KEY_INPUT_FORMAT = "input-format";
- public static final String INPUT_FORMAT_TEXT = "text-input-format";
- public static final String INPUT_FORMAT_SEQUENCE = "sequence-input-format";
-
- private Object[] inputSplits;
+ private transient String[] readSchedule;
+ private transient boolean executed[];
+ private transient InputSplit[] inputSplits;
private transient JobConf conf;
- private InputSplitsProxy inputSplitsProxy;
- private static final Map<String, String> formatClassNames = initInputFormatMap();
+ private transient AlgebricksPartitionConstraint clusterLocations;
- private static Map<String, String> initInputFormatMap() {
- Map<String, String> formatClassNames = new HashMap<String, String>();
- formatClassNames.put(INPUT_FORMAT_TEXT, "org.apache.hadoop.mapred.TextInputFormat");
- formatClassNames.put(INPUT_FORMAT_SEQUENCE, "org.apache.hadoop.mapred.SequenceFileInputFormat");
- return formatClassNames;
- }
+ private transient String nodeName;
- public HDFSAdapter(IAType atype) {
+ public HDFSAdapter(IAType atype, String[] readSchedule, boolean[] executed, InputSplit[] inputSplits, JobConf conf,
+ AlgebricksPartitionConstraint clusterLocations) {
super(atype);
+ this.readSchedule = readSchedule;
+ this.executed = executed;
+ this.inputSplits = inputSplits;
+ this.conf = conf;
+ this.clusterLocations = clusterLocations;
}
@Override
- public void configure(Map<String, String> arguments) throws Exception {
- configuration = arguments;
+ public void configure(Map<String, Object> arguments) throws Exception {
+ this.configuration = arguments;
configureFormat();
- configureJobConf();
- configureSplits();
- }
-
- private void configureSplits() throws IOException {
- if (inputSplitsProxy == null) {
- inputSplits = conf.getInputFormat().getSplits(conf, 0);
- }
- inputSplitsProxy = new InputSplitsProxy(conf, inputSplits);
- }
-
- private void configurePartitionConstraint() throws Exception {
- List<String> locations = new ArrayList<String>();
- Random random = new Random();
- boolean couldConfigureLocationConstraints = false;
- try {
- Map<String, Set<String>> nodeControllers = AsterixRuntimeUtil.getNodeControllerMap();
- for (Object inputSplit : inputSplits) {
- String[] dataNodeLocations = ((InputSplit) inputSplit).getLocations();
- if (dataNodeLocations == null || dataNodeLocations.length == 0) {
- throw new IllegalArgumentException("No datanode locations found: check hdfs path");
- }
-
- // loop over all replicas until a split location coincides
- // with an asterix datanode location
- for (String datanodeLocation : dataNodeLocations) {
- Set<String> nodeControllersAtLocation = null;
- try {
- nodeControllersAtLocation = nodeControllers.get(AsterixRuntimeUtil
- .getIPAddress(datanodeLocation));
- } catch (UnknownHostException uhe) {
- if (LOGGER.isLoggable(Level.WARNING)) {
- LOGGER.log(Level.WARNING, "Unknown host :" + datanodeLocation);
- }
- continue;
- }
- if (nodeControllersAtLocation == null || nodeControllersAtLocation.size() == 0) {
- if (LOGGER.isLoggable(Level.WARNING)) {
- LOGGER.log(Level.WARNING, "No node controller found at " + datanodeLocation
- + " will look at replica location");
- }
- couldConfigureLocationConstraints = false;
- } else {
- int locationIndex = random.nextInt(nodeControllersAtLocation.size());
- String chosenLocation = (String) nodeControllersAtLocation.toArray()[locationIndex];
- locations.add(chosenLocation);
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.log(Level.INFO, "split : " + inputSplit + " to be processed by :" + chosenLocation);
- }
- couldConfigureLocationConstraints = true;
- break;
- }
- }
-
- /* none of the replica locations coincides with an Asterix
- node controller location.
- */
- if (!couldConfigureLocationConstraints) {
- List<String> allNodeControllers = AsterixRuntimeUtil.getAllNodeControllers();
- int locationIndex = random.nextInt(allNodeControllers.size());
- String chosenLocation = allNodeControllers.get(locationIndex);
- locations.add(chosenLocation);
- if (LOGGER.isLoggable(Level.SEVERE)) {
- LOGGER.log(Level.SEVERE, "No local node controller found to process split : " + inputSplit
- + " will be processed by a remote node controller:" + chosenLocation);
- }
- }
- }
- partitionConstraint = new AlgebricksAbsolutePartitionConstraint(locations.toArray(new String[] {}));
- } catch (Exception e) {
- if (LOGGER.isLoggable(Level.SEVERE)) {
- LOGGER.log(Level.SEVERE, "Encountered exception :" + e + " using count constraints");
- }
- partitionConstraint = new AlgebricksCountPartitionConstraint(inputSplits.length);
- }
- }
-
- private JobConf configureJobConf() throws Exception {
- conf = new JobConf();
- conf.set("fs.default.name", configuration.get(KEY_HDFS_URL).trim());
- conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
- conf.setClassLoader(HDFSAdapter.class.getClassLoader());
- conf.set("mapred.input.dir", configuration.get(KEY_PATH).trim());
- conf.set("mapred.input.format.class", formatClassNames.get(configuration.get(KEY_INPUT_FORMAT).trim()));
- return conf;
}
public AdapterType getAdapterType() {
@@ -174,7 +71,7 @@
@Override
public void initialize(IHyracksTaskContext ctx) throws Exception {
this.ctx = ctx;
- inputSplits = inputSplitsProxy.toInputSplits(conf);
+ this.nodeName = ctx.getJobletContext().getApplicationContext().getNodeId();
}
private Reporter getReporter() {
@@ -215,98 +112,124 @@
return reporter;
}
- @SuppressWarnings("unchecked")
@Override
public InputStream getInputStream(int partition) throws IOException {
- try {
- InputStream inputStream;
- if (conf.getInputFormat() instanceof SequenceFileInputFormat) {
- SequenceFileInputFormat format = (SequenceFileInputFormat) conf.getInputFormat();
- RecordReader reader = format.getRecordReader(
- (org.apache.hadoop.mapred.FileSplit) inputSplits[partition], conf, getReporter());
- inputStream = new HDFSStream(reader, ctx);
- } else {
- try {
+
+ return new InputStream() {
+
+ private RecordReader<Object, Text> reader;
+ private Object key;
+ private Text value;
+ private boolean hasMore = false;
+ private int EOL = "\n".getBytes()[0];
+ private Text pendingValue = null;
+ private int currentSplitIndex = 0;
+
+ @SuppressWarnings("unchecked")
+ private boolean moveToNext() throws IOException {
+ for (; currentSplitIndex < inputSplits.length; currentSplitIndex++) {
+ /**
+ * read all the partitions scheduled to the current node
+ */
+ if (readSchedule[currentSplitIndex].equals(nodeName)) {
+ /**
+ * pick an unread split to read
+ * synchronize among simultaneous partitions in the same machine
+ */
+ synchronized (executed) {
+ if (executed[currentSplitIndex] == false) {
+ executed[currentSplitIndex] = true;
+ } else {
+ continue;
+ }
+ }
+
+ /**
+ * read the split
+ */
+ reader = getRecordReader(currentSplitIndex);
+ key = reader.createKey();
+ value = (Text) reader.createValue();
+ return true;
+ }
+ }
+ return false;
+ }
+
+ @Override
+ public int read(byte[] buffer, int offset, int len) throws IOException {
+ if (reader == null) {
+ if (!moveToNext()) {
+ //nothing to read
+ return -1;
+ }
+ }
+
+ int numBytes = 0;
+ if (pendingValue != null) {
+ System.arraycopy(pendingValue.getBytes(), 0, buffer, offset + numBytes, pendingValue.getLength());
+ buffer[offset + numBytes + pendingValue.getLength()] = (byte) EOL;
+ numBytes += pendingValue.getLength() + 1;
+ pendingValue = null;
+ }
+
+ while (numBytes < len) {
+ hasMore = reader.next(key, value);
+ if (!hasMore) {
+ while (moveToNext()) {
+ hasMore = reader.next(key, value);
+ if (hasMore) {
+ //move to the next non-empty split
+ break;
+ }
+ }
+ }
+ if (!hasMore) {
+ return (numBytes == 0) ? -1 : numBytes;
+ }
+ int sizeOfNextTuple = value.getLength() + 1;
+ if (numBytes + sizeOfNextTuple > len) {
+ // cannot add tuple to current buffer
+ // but the reader has moved past the fetched tuple
+ // we need to store this for a subsequent read call.
+ // and return this then.
+ pendingValue = value;
+ break;
+ } else {
+ System.arraycopy(value.getBytes(), 0, buffer, offset + numBytes, value.getLength());
+ buffer[offset + numBytes + value.getLength()] = (byte) EOL;
+ numBytes += sizeOfNextTuple;
+ }
+ }
+ return numBytes;
+ }
+
+ @Override
+ public int read() throws IOException {
+ throw new NotImplementedException("Use read(byte[], int, int");
+ }
+
+ private RecordReader getRecordReader(int slitIndex) throws IOException {
+ if (conf.getInputFormat() instanceof SequenceFileInputFormat) {
+ SequenceFileInputFormat format = (SequenceFileInputFormat) conf.getInputFormat();
+ RecordReader reader = format.getRecordReader(
+ (org.apache.hadoop.mapred.FileSplit) inputSplits[slitIndex], conf, getReporter());
+ return reader;
+ } else {
TextInputFormat format = (TextInputFormat) conf.getInputFormat();
RecordReader reader = format.getRecordReader(
- (org.apache.hadoop.mapred.FileSplit) inputSplits[partition], conf, getReporter());
- inputStream = new HDFSStream(reader, ctx);
- } catch (FileNotFoundException e) {
- throw new HyracksDataException(e);
+ (org.apache.hadoop.mapred.FileSplit) inputSplits[slitIndex], conf, getReporter());
+ return reader;
}
}
- return inputStream;
- } catch (Exception e) {
- throw new IOException(e);
- }
+
+ };
}
@Override
public AlgebricksPartitionConstraint getPartitionConstraint() throws Exception {
- if (partitionConstraint == null) {
- configurePartitionConstraint();
- }
- return partitionConstraint;
- }
-
-}
-
-class HDFSStream extends InputStream {
-
- private RecordReader<Object, Text> reader;
- private final Object key;
- private final Text value;
- private boolean hasMore = false;
- private static final int EOL = "\n".getBytes()[0];
- private Text pendingValue = null;
-
- public HDFSStream(RecordReader<Object, Text> reader, IHyracksTaskContext ctx) throws Exception {
- this.reader = reader;
- key = reader.createKey();
- try {
- value = (Text) reader.createValue();
- } catch (ClassCastException cce) {
- throw new Exception("value is not of type org.apache.hadoop.io.Text"
- + " type not supported in sequence file format", cce);
- }
- }
-
- @Override
- public int read(byte[] buffer, int offset, int len) throws IOException {
- int numBytes = 0;
- if (pendingValue != null) {
- System.arraycopy(pendingValue.getBytes(), 0, buffer, offset + numBytes, pendingValue.getLength());
- buffer[offset + numBytes + pendingValue.getLength()] = (byte) EOL;
- numBytes += pendingValue.getLength() + 1;
- pendingValue = null;
- }
-
- while (numBytes < len) {
- hasMore = reader.next(key, value);
- if (!hasMore) {
- return (numBytes == 0) ? -1 : numBytes;
- }
- int sizeOfNextTuple = value.getLength() + 1;
- if (numBytes + sizeOfNextTuple > len) {
- // cannot add tuple to current buffer
- // but the reader has moved pass the fetched tuple
- // we need to store this for a subsequent read call.
- // and return this then.
- pendingValue = value;
- break;
- } else {
- System.arraycopy(value.getBytes(), 0, buffer, offset + numBytes, value.getLength());
- buffer[offset + numBytes + value.getLength()] = (byte) EOL;
- numBytes += sizeOfNextTuple;
- }
- }
- return numBytes;
- }
-
- @Override
- public int read() throws IOException {
- throw new NotImplementedException("Use read(byte[], int, int");
+ return clusterLocations;
}
}
\ No newline at end of file
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HiveAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HiveAdapter.java
index 3731eba..5e48834 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HiveAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HiveAdapter.java
@@ -16,6 +16,9 @@
import java.util.Map;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+
import edu.uci.ics.asterix.om.types.IAType;
import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
@@ -24,6 +27,7 @@
/**
* Provides the functionality of fetching data in form of ADM records from a Hive dataset.
*/
+@SuppressWarnings("deprecation")
public class HiveAdapter extends AbstractDatasourceAdapter {
private static final long serialVersionUID = 1L;
@@ -37,8 +41,9 @@
private HDFSAdapter hdfsAdapter;
- public HiveAdapter(IAType atype) {
- this.hdfsAdapter = new HDFSAdapter(atype);
+ public HiveAdapter(IAType atype, String[] readSchedule, boolean[] executed, InputSplit[] inputSplits, JobConf conf,
+ AlgebricksPartitionConstraint clusterLocations) {
+ this.hdfsAdapter = new HDFSAdapter(atype, readSchedule, executed, inputSplits, conf, clusterLocations);
this.atype = atype;
}
@@ -48,33 +53,9 @@
}
@Override
- public void configure(Map<String, String> arguments) throws Exception {
- configuration = arguments;
- configureHadoopAdapter();
- }
-
- private void configureHadoopAdapter() throws Exception {
- String database = configuration.get(HIVE_DATABASE);
- String tablePath = null;
- if (database == null) {
- tablePath = configuration.get(HIVE_WAREHOUSE_DIR) + "/" + configuration.get(HIVE_TABLE);
- } else {
- tablePath = configuration.get(HIVE_WAREHOUSE_DIR) + "/" + tablePath + ".db" + "/"
- + configuration.get(HIVE_TABLE);
- }
- configuration.put(HDFSAdapter.KEY_PATH, tablePath);
- if (!configuration.get(KEY_FORMAT).equals(FORMAT_DELIMITED_TEXT)) {
- throw new IllegalArgumentException("format" + configuration.get(KEY_FORMAT) + " is not supported");
- }
-
- if (!(configuration.get(HDFSAdapter.KEY_INPUT_FORMAT).equals(HDFSAdapter.INPUT_FORMAT_TEXT) || configuration
- .get(HDFSAdapter.KEY_INPUT_FORMAT).equals(HDFSAdapter.INPUT_FORMAT_SEQUENCE))) {
- throw new IllegalArgumentException("file input format" + configuration.get(HDFSAdapter.KEY_INPUT_FORMAT)
- + " is not supported");
- }
-
- hdfsAdapter = new HDFSAdapter(atype);
- hdfsAdapter.configure(configuration);
+ public void configure(Map<String, Object> arguments) throws Exception {
+ this.configuration = arguments;
+ this.hdfsAdapter.configure(arguments);
}
@Override
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/IDatasourceAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/IDatasourceAdapter.java
index b0dc32f..031f34f 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/IDatasourceAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/IDatasourceAdapter.java
@@ -70,7 +70,7 @@
* @return String the value corresponding to the configuration parameter
* represented by the key- attributeKey.
*/
- public String getAdapterProperty(String propertyKey);
+ public Object getAdapterProperty(String propertyKey);
/**
* Configures the IDatasourceAdapter instance.
@@ -100,7 +100,7 @@
* providing all arguments as a set of (key,value) pairs. These
* arguments are put into the metadata.
*/
- public void configure(Map<String, String> arguments) throws Exception;
+ public void configure(Map<String, Object> arguments) throws Exception;
/**
* Returns a list of partition constraints. A partition constraint can be a
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/NCFileSystemAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/NCFileSystemAdapter.java
index ef39d45..9abc92a 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/NCFileSystemAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/NCFileSystemAdapter.java
@@ -44,9 +44,9 @@
}
@Override
- public void configure(Map<String, String> arguments) throws Exception {
+ public void configure(Map<String, Object> arguments) throws Exception {
this.configuration = arguments;
- String[] splits = arguments.get(KEY_PATH).split(",");
+ String[] splits = ((String) arguments.get(KEY_PATH)).split(",");
configureFileSplits(splits);
configureFormat();
}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedTwitterAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedTwitterAdapter.java
index ebfbcad..66d9f98 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedTwitterAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedTwitterAdapter.java
@@ -31,9 +31,8 @@
*/
public class PullBasedTwitterAdapter extends PullBasedAdapter implements IManagedFeedAdapter {
-
private static final long serialVersionUID = 1L;
-
+
public static final String QUERY = "query";
public static final String INTERVAL = "interval";
@@ -49,7 +48,7 @@
}
@Override
- public void configure(Map<String, String> arguments) throws Exception {
+ public void configure(Map<String, Object> arguments) throws Exception {
configuration = arguments;
String[] fieldNames = { "id", "username", "location", "text", "timestamp" };
IAType[] fieldTypes = { BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING,
@@ -69,12 +68,12 @@
}
@Override
- public void stop() {
+ public void stop() {
tweetClient.stop();
}
@Override
- public void alter(Map<String, String> properties) {
+ public void alter(Map<String, String> properties) {
alterRequested = true;
this.alteredParams = properties;
}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedTwitterFeedClient.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedTwitterFeedClient.java
index 2a07472..06cddfd 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedTwitterFeedClient.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedTwitterFeedClient.java
@@ -63,8 +63,8 @@
tupleFieldValues = new String[recordType.getFieldNames().length];
}
- public void initialize(Map<String, String> params) {
- this.keywords = params.get(PullBasedTwitterAdapter.QUERY);
+ public void initialize(Map<String, Object> params) {
+ this.keywords = (String) params.get(PullBasedTwitterAdapter.QUERY);
this.query = new Query(keywords);
query.setRpp(100);
}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/RSSFeedAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/RSSFeedAdapter.java
index 611183c..ccd6516 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/RSSFeedAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/RSSFeedAdapter.java
@@ -72,9 +72,9 @@
}
@Override
- public void configure(Map<String, String> arguments) throws Exception {
+ public void configure(Map<String, Object> arguments) throws Exception {
configuration = arguments;
- String rssURLProperty = configuration.get(KEY_RSS_URL);
+ String rssURLProperty = (String) configuration.get(KEY_RSS_URL);
if (rssURLProperty == null) {
throw new IllegalArgumentException("no rss url provided");
}
@@ -94,7 +94,7 @@
}
protected void reconfigure(Map<String, String> arguments) {
- String rssURLProperty = configuration.get(KEY_RSS_URL);
+ String rssURLProperty = (String) configuration.get(KEY_RSS_URL);
if (rssURLProperty != null) {
initializeFeedURLs(rssURLProperty);
}
diff --git a/asterix-installer/src/main/java/edu/uci/ics/asterix/installer/driver/InstallerUtil.java b/asterix-installer/src/main/java/edu/uci/ics/asterix/installer/driver/InstallerUtil.java
index 90c797d..38be234 100644
--- a/asterix-installer/src/main/java/edu/uci/ics/asterix/installer/driver/InstallerUtil.java
+++ b/asterix-installer/src/main/java/edu/uci/ics/asterix/installer/driver/InstallerUtil.java
@@ -49,6 +49,7 @@
import edu.uci.ics.asterix.common.configuration.AsterixConfiguration;
import edu.uci.ics.asterix.common.configuration.Store;
+import edu.uci.ics.asterix.common.configuration.Coredump;
import edu.uci.ics.asterix.event.driver.EventDriver;
import edu.uci.ics.asterix.event.management.EventrixClient;
import edu.uci.ics.asterix.event.management.EventUtil;
@@ -238,6 +239,14 @@
}
configuration.setStore(stores);
+ List<Coredump> coredump = new ArrayList<Coredump>();
+ String coredumpDir = null;
+ for (Node node : cluster.getNode()) {
+ coredumpDir = node.getLogDir() == null ? cluster.getLogDir() : node.getLogDir();
+ coredump.add(new Coredump(asterixInstanceName + "_" + node.getId(), coredumpDir));
+ }
+ configuration.setCoredump(coredump);
+
File asterixConfDir = new File(InstallerDriver.getAsterixDir() + File.separator + asterixInstanceName);
asterixConfDir.mkdirs();
diff --git a/asterix-installer/src/main/resources/conf/asterix-configuration.xml b/asterix-installer/src/main/resources/conf/asterix-configuration.xml
index bfec2db..7ee7e88 100644
--- a/asterix-installer/src/main/resources/conf/asterix-configuration.xml
+++ b/asterix-installer/src/main/resources/conf/asterix-configuration.xml
@@ -22,10 +22,11 @@
</property>
<property>
- <name>storage.buffercache.numpages</name>
- <value>1024</value>
- <description>The number of pages allocated to the disk buffer cache.
- (Default = "1024")
+ <name>storage.buffercache.size</name>
+ <value>33554432</value>
+ <description>The size of memory allocated to the disk buffer cache.
+ The value should be a multiple of the buffer cache page size. (Default
+ = "33554432" // 32MB)
</description>
</property>
@@ -47,15 +48,15 @@
<property>
<name>storage.memorycomponent.numpages</name>
- <value>4096</value>
+ <value>1024</value>
<description>The number of pages to allocate for a memory component.
- (Default = 4096)
+ (Default = 1024)
</description>
</property>
<property>
<name>storage.memorycomponent.globalbudget</name>
- <value>263435456</value>
+ <value>1073741824</value>
<description>The total size of memory in bytes that the sum of all
open memory
components cannot exceed. (Default = "1073741824" // 1GB)
@@ -79,6 +80,14 @@
</property>
<property>
+ <name>txn.log.directory</name>
+ <value>asterix_logs/</value>
+ <description>The directory location for transaction logs. (Default =
+ "asterix_logs/")
+ </description>
+ </property>
+
+ <property>
<name>txn.log.buffer.numpages</name>
<value>8</value>
<description>The number of in-memory log buffer pages. (Default = "8")
@@ -103,10 +112,17 @@
</property>
<property>
+ <name>txn.log.disksectorsize</name>
+ <value>4096</value>
+ <description>The size of a disk sector. (Default = "4096")
+ </description>
+ </property>
+
+ <property>
<name>txn.log.groupcommitinterval</name>
- <value>200</value>
+ <value>1</value>
<description>The group commit wait time in milliseconds. (Default =
- "200" // 2ms)
+ "10" // 0.1ms)
</description>
</property>
@@ -127,6 +143,14 @@
</property>
<property>
+ <name>txn.log.checkpoint.history</name>
+ <value>0</value>
+ <description>The number of old log partition files to keep before
+ discarding. (Default = "0")
+ </description>
+ </property>
+
+ <property>
<name>txn.lock.escalationthreshold</name>
<value>1000</value>
<description>The number of entity level locks that need to be acquired
@@ -137,9 +161,26 @@
<property>
<name>txn.lock.shrinktimer</name>
- <value>120000</value>
+ <value>5000</value>
<description>The time in milliseconds to wait before deallocating
- unused lock manager memory. (Default = "120000" // 120s)
+ unused lock manager memory. (Default = "5000" // 5s)
+ </description>
+ </property>
+
+ <property>
+ <name>txn.lock.timeout.waitthreshold</name>
+ <value>60000</value>
+ <description>The time in milliseconds to wait before labeling a
+ transaction which has been waiting for a lock timed-out. (Default =
+ "60000" // 60s)
+ </description>
+ </property>
+
+ <property>
+ <name>txn.lock.timeout.sweepthreshold</name>
+ <value>10000</value>
+ <description>The time in milliseconds the timeout thread waits between
+ sweeps to detect timed-out transactions. (Default = "10000" // 10s)
</description>
</property>
@@ -176,8 +217,8 @@
<property>
<name>api.port</name>
- <value>19101</value>
- <description>The port for the ASTERIX API server. (Default = 19101)
+ <value>19002</value>
+ <description>The port for the ASTERIX API server. (Default = 19002)
</description>
</property>
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataManager.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataManager.java
index f9f5260..c33d130 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataManager.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataManager.java
@@ -43,38 +43,29 @@
* received from the metadata node, to avoid contacting the metadata node
* repeatedly. We assume that this metadata manager is the only metadata manager
* in an Asterix cluster. Therefore, no separate cache-invalidation mechanism is
- * needed at this point.
- * Assumptions/Limitations:
- * The metadata subsystem is started during NC Bootstrap start, i.e., when
- * Asterix is deployed.
- * The metadata subsystem is destroyed in NC Bootstrap end, i.e., when Asterix
- * is undeployed.
- * The metadata subsystem consists of the MetadataManager and the MatadataNode.
- * The MetadataManager provides users access to the metadata.
- * The MetadataNode implements direct access to the storage layer on behalf of
- * the MetadataManager, and translates the binary representation of ADM into
- * Java objects for consumption by the MetadataManager's users.
- * There is exactly one instance of the MetadataManager and of the MetadataNode
- * in the cluster, which may or may not be co-located on the same machine (or in
- * the same JVM).
- * The MetadataManager exists in the same JVM as its user's (e.g., the query
- * compiler).
- * The MetadataNode exists in the same JVM as it's transactional components
- * (LockManager, LogManager, etc.)
- * Users shall access the metadata only through the MetadataManager, and never
- * via the MetadataNode directly.
+ * needed at this point. Assumptions/Limitations: The metadata subsystem is
+ * started during NC Bootstrap start, i.e., when Asterix is deployed. The
+ * metadata subsystem is destroyed in NC Bootstrap end, i.e., when Asterix is
+ * undeployed. The metadata subsystem consists of the MetadataManager and the
+ * MetadataNode. The MetadataManager provides users access to the metadata. The
+ * MetadataNode implements direct access to the storage layer on behalf of the
+ * MetadataManager, and translates the binary representation of ADM into Java
+ * objects for consumption by the MetadataManager's users. There is exactly one
+ * instance of the MetadataManager and of the MetadataNode in the cluster, which
+ * may or may not be co-located on the same machine (or in the same JVM). The
+ * MetadataManager exists in the same JVM as its users (e.g., the query
+ * compiler). The MetadataNode exists in the same JVM as its transactional
+ * components (LockManager, LogManager, etc.). Users shall access the metadata
+ * only through the MetadataManager, and never via the MetadataNode directly.
* Multiple threads may issue requests to the MetadataManager concurrently. For
* the sake of accessing metadata, we assume a transaction consists of one
- * thread.
- * Users are responsible for locking the metadata (using the MetadataManager
- * API) before issuing requests.
- * The MetadataNode is responsible for acquiring finer-grained locks on behalf
- * of requests from the MetadataManager. Currently, locks are acquired per
- * BTree, since the BTree does not acquire even finer-grained locks yet
- * internally.
- * The metadata can be queried with AQL DML like any other dataset, but can only
- * be changed with AQL DDL.
- * The transaction ids for metadata transactions must be unique across the
+ * thread. Users are responsible for locking the metadata (using the
+ * MetadataManager API) before issuing requests. The MetadataNode is responsible
+ * for acquiring finer-grained locks on behalf of requests from the
+ * MetadataManager. Currently, locks are acquired per BTree, since the BTree
+ * does not acquire even finer-grained locks yet internally. The metadata can be
+ * queried with AQL DML like any other dataset, but can only be changed with AQL
+ * DDL. The transaction ids for metadata transactions must be unique across the
* cluster, i.e., metadata transaction ids shall never "accidentally" overlap
* with transaction ids of regular jobs or other metadata transactions.
*/
@@ -220,7 +211,7 @@
@Override
public void addDataset(MetadataTransactionContext ctx, Dataset dataset) throws MetadataException {
- // add dataset into metadataNode
+ // add dataset into metadataNode
try {
metadataNode.addDataset(ctx.getJobId(), dataset);
} catch (RemoteException e) {
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
index 117f492..29bafa9 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
@@ -34,6 +34,7 @@
import edu.uci.ics.asterix.common.transactions.IResourceManager.ResourceType;
import edu.uci.ics.asterix.common.transactions.JobId;
import edu.uci.ics.asterix.dataflow.data.nontagged.valueproviders.AqlPrimitiveValueProviderFactory;
+import edu.uci.ics.asterix.external.adapter.factory.HDFSAdapterFactory;
import edu.uci.ics.asterix.external.adapter.factory.IAdapterFactory;
import edu.uci.ics.asterix.external.adapter.factory.IGenericDatasetAdapterFactory;
import edu.uci.ics.asterix.external.adapter.factory.ITypedDatasetAdapterFactory;
@@ -101,6 +102,7 @@
import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
import edu.uci.ics.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
import edu.uci.ics.hyracks.algebricks.runtime.operators.std.SinkWriterRuntimeFactory;
+import edu.uci.ics.hyracks.api.context.ICCContext;
import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import edu.uci.ics.hyracks.api.dataflow.value.ILinearizeComparatorFactory;
@@ -119,6 +121,7 @@
import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
import edu.uci.ics.hyracks.dataflow.std.file.ITupleParserFactory;
import edu.uci.ics.hyracks.dataflow.std.result.ResultWriterOperatorDescriptor;
+import edu.uci.ics.hyracks.hdfs.scheduler.Scheduler;
import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor;
import edu.uci.ics.hyracks.storage.am.btree.frames.BTreeNSMInteriorFrameFactory;
import edu.uci.ics.hyracks.storage.am.common.api.IPrimitiveValueProviderFactory;
@@ -157,6 +160,7 @@
private final AsterixStorageProperties storageProperties;
private static final Map<String, String> adapterFactoryMapping = initializeAdapterFactoryMapping();
+ private static Scheduler hdfsScheduler;
public String getPropertyValue(String propertyName) {
return config.get(propertyName);
@@ -178,6 +182,16 @@
this.defaultDataverse = defaultDataverse;
this.stores = AsterixAppContextInfo.getInstance().getMetadataProperties().getStores();
this.storageProperties = AsterixAppContextInfo.getInstance().getStorageProperties();
+ ICCContext ccContext = AsterixAppContextInfo.getInstance().getCCApplicationContext().getCCContext();
+ try {
+ if (hdfsScheduler == null) {
+ //set the singleton hdfs scheduler
+ hdfsScheduler = new Scheduler(ccContext.getClusterControllerInfo().getClientNetAddress(), ccContext
+ .getClusterControllerInfo().getClientNetPort());
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
}
public void setJobId(JobId jobId) {
@@ -343,8 +357,8 @@
adapterFactory = (IGenericDatasetAdapterFactory) Class.forName(adapterFactoryClassname).newInstance();
}
- adapter = ((IGenericDatasetAdapterFactory) adapterFactory).createAdapter(datasetDetails.getProperties(),
- itemType);
+ adapter = ((IGenericDatasetAdapterFactory) adapterFactory).createAdapter(
+ wrapProperties(datasetDetails.getProperties()), itemType);
} catch (AlgebricksException ae) {
throw ae;
} catch (Exception e) {
@@ -362,7 +376,7 @@
RecordDescriptor scannerDesc = new RecordDescriptor(new ISerializerDeserializer[] { payloadSerde });
ExternalDataScanOperatorDescriptor dataScanner = new ExternalDataScanOperatorDescriptor(jobSpec,
- adapterFactoryClassname, datasetDetails.getProperties(), rt, scannerDesc);
+ wrapPropertiesEmpty(datasetDetails.getProperties()), rt, scannerDesc, adapterFactory);
AlgebricksPartitionConstraint constraint;
try {
@@ -427,14 +441,15 @@
}
if (adapterFactory instanceof ITypedDatasetAdapterFactory) {
- adapter = ((ITypedDatasetAdapterFactory) adapterFactory).createAdapter(datasetDetails.getProperties());
+ adapter = ((ITypedDatasetAdapterFactory) adapterFactory).createAdapter(wrapProperties(datasetDetails
+ .getProperties()));
adapterOutputType = ((ITypedDatasourceAdapter) adapter).getAdapterOutputType();
} else if (adapterFactory instanceof IGenericDatasetAdapterFactory) {
String outputTypeName = datasetDetails.getProperties().get(IGenericDatasetAdapterFactory.KEY_TYPE_NAME);
adapterOutputType = MetadataManager.INSTANCE.getDatatype(mdTxnCtx, dataset.getDataverseName(),
outputTypeName).getDatatype();
adapter = ((IGenericDatasetAdapterFactory) adapterFactory).createAdapter(
- datasetDetails.getProperties(), adapterOutputType);
+ wrapProperties(datasetDetails.getProperties()), adapterOutputType);
} else {
throw new IllegalStateException(" Unknown factory type for " + adapterFactoryClassname);
}
@@ -451,7 +466,8 @@
FeedIntakeOperatorDescriptor feedIngestor = new FeedIntakeOperatorDescriptor(jobSpec, new FeedId(
dataset.getDataverseName(), dataset.getDatasetName()), adapterFactoryClassname,
- datasetDetails.getProperties(), (ARecordType) adapterOutputType, feedDesc);
+ this.wrapPropertiesEmpty(datasetDetails.getProperties()), (ARecordType) adapterOutputType, feedDesc,
+ adapterFactory);
AlgebricksPartitionConstraint constraint = null;
try {
@@ -1484,4 +1500,32 @@
return FormatUtils.getDefaultFormat();
}
+    /**
+     * Builds the adapter configuration: a copy of the dataset properties extended with the
+     * HDFS scheduler and the cluster locations.
+     *
+     * @param properties
+     *            the original dataset properties
+     * @return a new map holding the original dataset properties plus the entries stored under
+     *         HDFSAdapterFactory.SCHEDULER and HDFSAdapterFactory.CLUSTER_LOCATIONS
+     */
+    private Map<String, Object> wrapProperties(Map<String, String> properties) {
+        // Copy first so that the caller's map is never modified.
+        Map<String, Object> enriched = new HashMap<String, Object>(properties);
+        enriched.put(HDFSAdapterFactory.SCHEDULER, hdfsScheduler);
+        enriched.put(HDFSAdapterFactory.CLUSTER_LOCATIONS, getClusterLocations());
+        return enriched;
+    }
+
+    /**
+     * Copies the string-valued properties into a string-to-object map without adding
+     * any extra entries.
+     *
+     * @param properties
+     *            the original properties
+     * @return a new string-object map with the same entries as the original properties
+     */
+    private Map<String, Object> wrapPropertiesEmpty(Map<String, String> properties) {
+        return new HashMap<String, Object>(properties);
+    }
+
}
diff --git a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/RateControlledFileSystemBasedAdapter.java b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/RateControlledFileSystemBasedAdapter.java
index 84b989d..afdf343 100644
--- a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/RateControlledFileSystemBasedAdapter.java
+++ b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/RateControlledFileSystemBasedAdapter.java
@@ -19,7 +19,6 @@
import java.util.Map;
import edu.uci.ics.asterix.common.exceptions.AsterixException;
-import edu.uci.ics.asterix.external.adapter.factory.IGenericDatasetAdapterFactory;
import edu.uci.ics.asterix.external.dataset.adapter.FileSystemBasedAdapter;
import edu.uci.ics.asterix.external.dataset.adapter.ITypedDatasourceAdapter;
import edu.uci.ics.asterix.feed.managed.adapter.IManagedFeedAdapter;
@@ -47,48 +46,15 @@
IManagedFeedAdapter {
private static final long serialVersionUID = 1L;
+ private FileSystemBasedAdapter coreAdapter;
+ private String format;
- public static final String KEY_FILE_SYSTEM = "fs";
- public static final String LOCAL_FS = "localfs";
- public static final String HDFS = "hdfs";
-
- private final FileSystemBasedAdapter coreAdapter;
- private final Map<String, String> configuration;
- private final String fileSystem;
- private final String format;
-
- public RateControlledFileSystemBasedAdapter(ARecordType atype, Map<String, String> configuration) throws Exception {
+ public RateControlledFileSystemBasedAdapter(ARecordType atype, Map<String, Object> configuration,
+ FileSystemBasedAdapter coreAdapter, String format) throws Exception {
super(atype);
- checkRequiredArgs(configuration);
- fileSystem = configuration.get(KEY_FILE_SYSTEM);
- String adapterFactoryClass = null;
- if (fileSystem.equalsIgnoreCase(LOCAL_FS)) {
- adapterFactoryClass = "edu.uci.ics.asterix.external.adapter.factory.NCFileSystemAdapterFactory";
- } else if (fileSystem.equals(HDFS)) {
- adapterFactoryClass = "edu.uci.ics.asterix.external.adapter.factory.HDFSAdapterFactory";
- } else {
- throw new AsterixException("Unsupported file system type " + fileSystem);
- }
- format = configuration.get(KEY_FORMAT);
- IGenericDatasetAdapterFactory adapterFactory = (IGenericDatasetAdapterFactory) Class.forName(
- adapterFactoryClass).newInstance();
- coreAdapter = (FileSystemBasedAdapter) adapterFactory.createAdapter(configuration, atype);
this.configuration = configuration;
- }
-
- private void checkRequiredArgs(Map<String, String> configuration) throws Exception {
- if (configuration.get(KEY_FILE_SYSTEM) == null) {
- throw new Exception("File system type not specified. (fs=?) File system could be 'localfs' or 'hdfs'");
- }
- if (configuration.get(IGenericDatasetAdapterFactory.KEY_TYPE_NAME) == null) {
- throw new Exception("Record type not specified (output-type-name=?)");
- }
- if (configuration.get(KEY_PATH) == null) {
- throw new Exception("File path not specified (path=?)");
- }
- if (configuration.get(KEY_FORMAT) == null) {
- throw new Exception("File format not specified (format=?)");
- }
+ this.coreAdapter = coreAdapter;
+ this.format = format;
}
@Override
@@ -103,7 +69,7 @@
}
@Override
- public void configure(Map<String, String> arguments) throws Exception {
+ public void configure(Map<String, Object> arguments) throws Exception {
coreAdapter.configure(arguments);
}
@@ -189,16 +155,16 @@
private final ARecordType recordType;
private final IDataParser dataParser;
- private final Map<String, String> configuration;
+ private final Map<String, Object> configuration;
public RateControlledTupleParserFactory(ARecordType recordType, IValueParserFactory[] valueParserFactories,
- char fieldDelimiter, Map<String, String> configuration) {
+ char fieldDelimiter, Map<String, Object> configuration) {
this.recordType = recordType;
dataParser = new DelimitedDataParser(recordType, valueParserFactories, fieldDelimiter);
this.configuration = configuration;
}
- public RateControlledTupleParserFactory(ARecordType recordType, Map<String, String> configuration) {
+ public RateControlledTupleParserFactory(ARecordType recordType, Map<String, Object> configuration) {
this.recordType = recordType;
dataParser = new ADMDataParser();
this.configuration = configuration;
@@ -221,10 +187,10 @@
public static final String INTER_TUPLE_INTERVAL = "tuple-interval";
public RateControlledTupleParser(IHyracksTaskContext ctx, ARecordType recType, IDataParser dataParser,
- Map<String, String> configuration) {
+ Map<String, Object> configuration) {
super(ctx, recType);
this.dataParser = dataParser;
- String propValue = configuration.get(INTER_TUPLE_INTERVAL);
+ String propValue = (String) configuration.get(INTER_TUPLE_INTERVAL);
if (propValue != null) {
interTupleInterval = Long.parseLong(propValue);
} else {
diff --git a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/RateControlledFileSystemBasedAdapterFactory.java b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/RateControlledFileSystemBasedAdapterFactory.java
index 6c32acb..bf1c268 100644
--- a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/RateControlledFileSystemBasedAdapterFactory.java
+++ b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/RateControlledFileSystemBasedAdapterFactory.java
@@ -16,7 +16,9 @@
import java.util.Map;
+import edu.uci.ics.asterix.common.exceptions.AsterixException;
import edu.uci.ics.asterix.external.adapter.factory.IGenericDatasetAdapterFactory;
+import edu.uci.ics.asterix.external.dataset.adapter.FileSystemBasedAdapter;
import edu.uci.ics.asterix.external.dataset.adapter.IDatasourceAdapter;
import edu.uci.ics.asterix.om.types.ARecordType;
import edu.uci.ics.asterix.om.types.IAType;
@@ -28,10 +30,37 @@
* source file has been ingested.
*/
public class RateControlledFileSystemBasedAdapterFactory implements IGenericDatasetAdapterFactory {
+ private static final long serialVersionUID = 1L;
+
+ public static final String KEY_FILE_SYSTEM = "fs";
+ public static final String LOCAL_FS = "localfs";
+ public static final String HDFS = "hdfs";
+ public static final String KEY_PATH = "path";
+ public static final String KEY_FORMAT = "format";
+
+ private IGenericDatasetAdapterFactory adapterFactory;
+ private String format;
+ private boolean setup = false;
@Override
- public IDatasourceAdapter createAdapter(Map<String, String> configuration, IAType type) throws Exception {
- return new RateControlledFileSystemBasedAdapter((ARecordType) type, configuration);
+    public IDatasourceAdapter createAdapter(Map<String, Object> configuration, IAType type) throws Exception {
+        // Wraps a file-system adapter in a rate-controlled adapter. The delegate factory and the
+        // format are resolved from the configuration on the first call only and reused afterwards.
+        if (!setup) {
+            checkRequiredArgs(configuration);
+            String fileSystem = (String) configuration.get(KEY_FILE_SYSTEM);
+            String adapterFactoryClass = null;
+            // Both file system names are matched case-insensitively so that "HDFS" is accepted
+            // just like "LocalFS" is.
+            if (fileSystem.equalsIgnoreCase(LOCAL_FS)) {
+                adapterFactoryClass = "edu.uci.ics.asterix.external.adapter.factory.NCFileSystemAdapterFactory";
+            } else if (fileSystem.equalsIgnoreCase(HDFS)) {
+                adapterFactoryClass = "edu.uci.ics.asterix.external.adapter.factory.HDFSAdapterFactory";
+            } else {
+                throw new AsterixException("Unsupported file system type " + fileSystem);
+            }
+            format = (String) configuration.get(KEY_FORMAT);
+            adapterFactory = (IGenericDatasetAdapterFactory) Class.forName(adapterFactoryClass).newInstance();
+            setup = true;
+        }
+        // The delegate factory creates the core adapter that the rate-controlled adapter wraps.
+        return new RateControlledFileSystemBasedAdapter((ARecordType) type, configuration,
+                (FileSystemBasedAdapter) adapterFactory.createAdapter(configuration, type), format);
+    }
@Override
@@ -39,4 +68,19 @@
return "file_feed";
}
+    /**
+     * Verifies that every mandatory configuration entry is present.
+     *
+     * @param configuration
+     *            the adapter configuration to validate
+     * @throws Exception
+     *             if the file system, record type, path or format entry is missing; the
+     *             entries are checked in that order and the first missing one is reported
+     */
+    private void checkRequiredArgs(Map<String, Object> configuration) throws Exception {
+        // Each row pairs a required key with the message reported when it is absent.
+        final String[][] requiredEntries = {
+                { KEY_FILE_SYSTEM,
+                        "File system type not specified. (fs=?) File system could be 'localfs' or 'hdfs'" },
+                { IGenericDatasetAdapterFactory.KEY_TYPE_NAME, "Record type not specified (output-type-name=?)" },
+                { KEY_PATH, "File path not specified (path=?)" },
+                { KEY_FORMAT, "File format not specified (format=?)" } };
+        for (String[] entry : requiredEntries) {
+            if (configuration.get(entry[0]) == null) {
+                throw new Exception(entry[1]);
+            }
+        }
+    }
+
}
\ No newline at end of file
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/DatasetLockInfo.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/DatasetLockInfo.java
index d5e525a..d583352 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/DatasetLockInfo.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/DatasetLockInfo.java
@@ -437,6 +437,20 @@
return s.toString();
}
+
+    /**
+     * Renders the lock bookkeeping of this dataset as text for a core dump.
+     *
+     * @return the first upgrader, first waiter and last holder values, the IS/IX/S/X counters
+     *         and the pretty-printed entityResourceHT
+     */
+    public String coreDump() {
+        String summary = "\n\t firstUpgrader: " + firstUpgrader
+                + "\n\t firstWaiter: " + firstWaiter
+                + "\n\t lastHolder: " + lastHolder
+                + "\n\t ISCount: " + ISCount
+                + "\n\t IXCount: " + IXCount
+                + "\n\t SCount: " + SCount
+                + "\n\t XCount: " + XCount
+                + "\n\t entityResourceHT";
+        StringBuilder dump = new StringBuilder(summary);
+        dump.append(entityResourceHT.prettyPrint());
+        return dump.toString();
+    }
/////////////////////////////////////////////////////////
// set/get method for private variable
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/EntityInfoManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/EntityInfoManager.java
index 5d81e8a..1e944bc 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/EntityInfoManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/EntityInfoManager.java
@@ -15,6 +15,8 @@
package edu.uci.ics.asterix.transaction.management.service.locking;
+import java.io.IOException;
+import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
@@ -28,13 +30,12 @@
*/
public class EntityInfoManager {
- public static final int SHRINK_TIMER_THRESHOLD = 120000; //2min
-
private ArrayList<ChildEntityInfoArrayManager> pArray;
private int allocChild; //used to allocate the next free EntityInfo slot.
private long shrinkTimer;
private boolean isShrinkTimerOn;
private int occupiedSlots;
+ private int shrinkTimerThreshold;
// ////////////////////////////////////////////////
// // begin of unit test
@@ -127,12 +128,13 @@
// // end of unit test
// ////////////////////////////////////////////////
- public EntityInfoManager() {
+ public EntityInfoManager(int shrinkTimerThreshold) {
pArray = new ArrayList<ChildEntityInfoArrayManager>();
pArray.add(new ChildEntityInfoArrayManager());
allocChild = 0;
occupiedSlots = 0;
isShrinkTimerOn = false;
+ this.shrinkTimerThreshold = shrinkTimerThreshold;
}
public int allocate(int jobId, int datasetId, int entityHashVal, byte lockMode) {
@@ -206,7 +208,7 @@
if (size > 1 && size * ChildEntityInfoArrayManager.NUM_OF_SLOTS / usedSlots >= 3) {
if (isShrinkTimerOn) {
- if (System.currentTimeMillis() - shrinkTimer >= SHRINK_TIMER_THRESHOLD) {
+ if (System.currentTimeMillis() - shrinkTimer >= shrinkTimerThreshold) {
isShrinkTimerOn = false;
return true;
}
@@ -272,29 +274,35 @@
if (child.isDeinitialized()) {
continue;
}
- s.append("child[" + i + "]: occupiedSlots:" + child.getNumOfOccupiedSlots());
- s.append(" freeSlotNum:" + child.getFreeSlotNum() + "\n");
- s.append("\tjid\t").append("did\t").append("PK\t").append("DLM\t").append("DLC\t").append("ELM\t")
- .append("ELC\t").append("NEA\t").append("PJR\t").append("NJR\n");
- for (int j = 0; j < ChildEntityInfoArrayManager.NUM_OF_SLOTS; j++) {
- s.append(j).append(": ");
- s.append("\t" + child.getJobId(j));
- s.append("\t" + child.getDatasetId(j));
- s.append("\t" + child.getPKHashVal(j));
- s.append("\t" + child.getDatasetLockMode(j));
- s.append("\t" + child.getDatasetLockCount(j));
- s.append("\t" + child.getEntityLockMode(j));
- s.append("\t" + child.getEntityLockCount(j));
- s.append("\t" + child.getNextEntityActor(j));
- s.append("\t" + child.getPrevJobResource(j));
- s.append("\t" + child.getNextJobResource(j));
- //s.append("\t" + child.getNextDatasetActor(j));
- s.append("\n");
- }
- s.append("\n");
+ s.append("child[" + i + "]");
+ s.append(child.prettyPrint());
}
return s.toString();
}
+
+    /**
+     * Writes a textual dump of every child array manager to the given stream, one chunk per
+     * child. The banner and the child count are emitted together with the first child.
+     *
+     * @param os
+     *            the stream that receives the dump; an IOException raised while writing a chunk
+     *            is ignored and the dump continues with the next child
+     */
+    public void coreDump(OutputStream os) {
+        // The banner names this class so the section can be identified in the dump.
+        StringBuilder sb = new StringBuilder("\n\t########### EntityInfoManager Status #############\n");
+        int size = pArray.size();
+        ChildEntityInfoArrayManager child;
+
+        sb.append("Number of Child: " + size + "\n");
+        for (int i = 0; i < size; i++) {
+            try {
+                child = pArray.get(i);
+                sb.append("child[" + i + "]");
+                sb.append(child.prettyPrint());
+
+                os.write(sb.toString().getBytes());
+            } catch (IOException e) {
+                //ignore IOException
+            }
+            // start the next chunk from an empty buffer
+            sb = new StringBuilder();
+        }
+    }
+
+    /**
+     * Returns the shrink timer threshold that was passed to the constructor.
+     *
+     * @return the threshold, in milliseconds, that the elapsed shrink-timer time is compared
+     *         against
+     */
+    public int getShrinkTimerThreshold() {
+        return this.shrinkTimerThreshold;
+    }
public void initEntityInfo(int slotNum, int jobId, int datasetId, int PKHashVal, byte lockMode) {
pArray.get(slotNum / ChildEntityInfoArrayManager.NUM_OF_SLOTS).initEntityInfo(
@@ -567,6 +575,29 @@
public int getFreeSlotNum() {
return freeSlotNum;
}
+
+    /**
+     * Formats this child array as a table: the occupied-slot count and free slot number,
+     * a column header, and one tab-separated row per slot.
+     *
+     * @return the formatted table
+     */
+    public String prettyPrint() {
+        StringBuilder table = new StringBuilder();
+        table.append("\n\toccupiedSlots:" + getNumOfOccupiedSlots() + "\n\tfreeSlotNum:" + getFreeSlotNum() + "\n");
+        table.append("\tjid\t" + "did\t" + "PK\t" + "DLM\t" + "DLC\t" + "ELM\t" + "ELC\t" + "NEA\t" + "PJR\t" + "NJR\n");
+        for (int slot = 0; slot < ChildEntityInfoArrayManager.NUM_OF_SLOTS; slot++) {
+            // One row per slot, fields in the same order as the column header.
+            String row = slot + ": "
+                    + "\t" + getJobId(slot)
+                    + "\t" + getDatasetId(slot)
+                    + "\t" + getPKHashVal(slot)
+                    + "\t" + getDatasetLockMode(slot)
+                    + "\t" + getDatasetLockCount(slot)
+                    + "\t" + getEntityLockMode(slot)
+                    + "\t" + getEntityLockCount(slot)
+                    + "\t" + getNextEntityActor(slot)
+                    + "\t" + getPrevJobResource(slot)
+                    + "\t" + getNextJobResource(slot)
+                    + "\n";
+            table.append(row);
+        }
+        return table.toString();
+    }
//////////////////////////////////////////////////////////////////
// set/get method for each field of EntityInfo plus freeSlot
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/EntityLockInfoManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/EntityLockInfoManager.java
index ca00aa2..2fae460 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/EntityLockInfoManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/EntityLockInfoManager.java
@@ -15,6 +15,8 @@
package edu.uci.ics.asterix.transaction.management.service.locking;
+import java.io.IOException;
+import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
@@ -271,22 +273,35 @@
if (child.isDeinitialized()) {
continue;
}
- s.append("child[" + i + "]: occupiedSlots:" + child.getNumOfOccupiedSlots());
- s.append(" freeSlotNum:" + child.getFreeSlotNum() + "\n");
- s.append("\tX\t").append("S\t").append("LH\t").append("FW\t").append("UP\n");
- for (int j = 0; j < ChildEntityLockInfoArrayManager.NUM_OF_SLOTS; j++) {
- s.append(j).append(": ");
- s.append("\t" + child.getXCount(j));
- s.append("\t" + child.getSCount(j));
- s.append("\t" + child.getLastHolder(j));
- s.append("\t" + child.getFirstWaiter(j));
- s.append("\t" + child.getUpgrader(j));
- s.append("\n");
- }
- s.append("\n");
+ s.append("child[" + i + "]");
+ s.append(child.prettyPrint());
}
return s.toString();
}
+
+    /**
+     * Writes a textual dump of every child array manager to the given stream, one chunk per
+     * child. The banner and the child count are emitted together with the first child.
+     *
+     * @param os
+     *            the stream that receives the dump; an IOException raised while writing a chunk
+     *            is ignored and the dump continues with the next child
+     */
+    public void coreDump(OutputStream os) {
+        final int childCount = pArray.size();
+        // Text that still has to be written ahead of the next child; only the first chunk has any.
+        String pending = "\n\t########### EntityLockInfoManager Status #############\n" + "Number of Child: "
+                + childCount + "\n";
+        for (int idx = 0; idx < childCount; idx++) {
+            ChildEntityLockInfoArrayManager current = pArray.get(idx);
+            String chunk = pending + "child[" + idx + "]" + current.prettyPrint();
+            pending = "";
+            try {
+                os.write(chunk.getBytes());
+            } catch (IOException ignored) {
+                //ignore IOException
+            }
+        }
+    }
+
+ /**
+ * Returns the value of SHRINK_TIMER_THRESHOLD.
+ * NOTE(review): presumably a shrink-timer duration in milliseconds - confirm against the
+ * declaration of SHRINK_TIMER_THRESHOLD.
+ *
+ * @return the value of SHRINK_TIMER_THRESHOLD
+ */
+ public int getShrinkTimerThreshold() {
+ return SHRINK_TIMER_THRESHOLD;
+ }
//debugging method
public String printWaiters(int slotNum) {
@@ -736,6 +751,23 @@
public int getFreeSlotNum() {
return freeSlotNum;
}
+
+    /**
+     * Formats this child array as a table: the occupied-slot count and free slot number,
+     * a column header, and one tab-separated row per slot.
+     *
+     * @return the formatted table
+     */
+    public String prettyPrint() {
+        StringBuilder table = new StringBuilder();
+        table.append("\n\toccupiedSlots:" + getNumOfOccupiedSlots() + "\n\tfreeSlotNum:" + getFreeSlotNum());
+        table.append("\n\tX\t" + "S\t" + "LH\t" + "FW\t" + "UP\n");
+        for (int slot = 0; slot < ChildEntityLockInfoArrayManager.NUM_OF_SLOTS; slot++) {
+            // One row per slot, fields in the same order as the column header.
+            String row = slot + ": "
+                    + "\t" + getXCount(slot)
+                    + "\t" + getSCount(slot)
+                    + "\t" + getLastHolder(slot)
+                    + "\t" + getFirstWaiter(slot)
+                    + "\t" + getUpgrader(slot)
+                    + "\n";
+            table.append(row);
+        }
+        return table.toString();
+    }
//////////////////////////////////////////////////////////////////
// set/get method for each field of EntityLockInfo plus freeSlot
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/JobInfo.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/JobInfo.java
index d846603..86eb02b 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/JobInfo.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/JobInfo.java
@@ -192,9 +192,9 @@
}
}
- public void decreaseDatasetISLockCount(int datasetId) {
+ public void decreaseDatasetISLockCount(int datasetId, int entityToDatasetLockEscalationThreshold) {
int count = datasetISLockHT.get(datasetId);
- if (count >= LockManager.ESCALATE_TRHESHOLD_ENTITY_TO_DATASET) {
+ if (count >= entityToDatasetLockEscalationThreshold) {
//do not decrease the count since it is already escalated.
} else if (count > 1) {
datasetISLockHT.upsert(datasetId, count - 1);
@@ -275,6 +275,17 @@
}
return s.toString();
}
+
+    /**
+     * Renders the lock bookkeeping of this job as text for a core dump.
+     *
+     * @return the pretty-printed datasetISLockHT followed by the first waiting, last holding
+     *         and upgrading resource values and the job id taken from jobCtx
+     */
+    public String coreDump() {
+        StringBuilder dump = new StringBuilder("\n\t datasetISLockHT");
+        dump.append(datasetISLockHT.prettyPrint());
+        String resources = "\n\t firstWaitingResource: " + firstWaitingResource
+                + "\n\t lastHoldingResource: " + lastHoldingResource
+                + "\n\t upgradingResource: " + upgradingResource
+                + "\n\t jobCtx.jobId: " + jobCtx.getJobId();
+        return dump.append(resources).toString();
+    }
/////////////////////////////////////////////////////////
// set/get method for private variable
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java
index 8fb7494..f0875e0 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java
@@ -15,13 +15,17 @@
package edu.uci.ics.asterix.transaction.management.service.locking;
+import java.io.IOException;
+import java.io.OutputStream;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Set;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+import edu.uci.ics.asterix.common.config.AsterixTransactionProperties;
import edu.uci.ics.asterix.common.exceptions.ACIDException;
import edu.uci.ics.asterix.common.transactions.DatasetId;
import edu.uci.ics.asterix.common.transactions.ILockManager;
@@ -33,6 +37,7 @@
import edu.uci.ics.asterix.transaction.management.service.logging.LogUtil;
import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionManagementConstants.LockManagerConstants.LockMode;
import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionSubsystem;
+import edu.uci.ics.hyracks.api.lifecycle.ILifeCycleComponent;
/**
* An implementation of the ILockManager interface for the
@@ -42,7 +47,7 @@
* @author pouria, kisskys
*/
-public class LockManager implements ILockManager {
+public class LockManager implements ILockManager, ILifeCycleComponent {
public static final boolean IS_DEBUG_MODE = false;//true
//This variable indicates that the dataset granule X lock request is allowed when
@@ -52,8 +57,6 @@
public static final boolean ALLOW_DATASET_GRANULE_X_LOCK_WITH_OTHER_CONCURRENT_LOCK_REQUESTS = false;
public static final boolean ALLOW_ESCALATE_FROM_ENTITY_TO_DATASET = true;
- //Threshold must be greater than 1 and should be reasonably large enough not to escalate too soon.
- public static final int ESCALATE_TRHESHOLD_ENTITY_TO_DATASET = 1000;
private static final int DO_ESCALATE = 0;
private static final int ESCALATED = 1;
private static final int DONOT_ESCALATE = 2;
@@ -91,7 +94,7 @@
this.waiterLatch = new ReentrantReadWriteLock(true);
this.jobHT = new HashMap<JobId, JobInfo>();
this.datasetResourceHT = new HashMap<DatasetId, DatasetLockInfo>();
- this.entityInfoManager = new EntityInfoManager();
+ this.entityInfoManager = new EntityInfoManager(txnSubsystem.getTransactionProperties().getLockManagerShrinkTimer());
this.lockWaiterManager = new LockWaiterManager();
this.entityLockInfoManager = new EntityLockInfoManager(entityInfoManager, lockWaiterManager);
this.deadlockDetector = new DeadlockDetector(jobHT, datasetResourceHT, entityLockInfoManager,
@@ -106,6 +109,10 @@
this.lockRequestTracker = new LockRequestTracker();
}
}
+
+    /**
+     * Exposes the transaction properties of the transaction subsystem this lock manager belongs to.
+     *
+     * @return the value of {@code txnSubsystem.getTransactionProperties()}
+     */
+    public AsterixTransactionProperties getTransactionProperties() {
+        return txnSubsystem.getTransactionProperties();
+    }
@Override
public void lock(DatasetId datasetId, int entityHashValue, byte lockMode, ITransactionContext txnContext)
@@ -193,7 +200,7 @@
if (doEscalate) {
throw new IllegalStateException(
"ESCALATE_TRHESHOLD_ENTITY_TO_DATASET should not be set to "
- + ESCALATE_TRHESHOLD_ENTITY_TO_DATASET);
+ + txnSubsystem.getTransactionProperties().getEntityToDatasetLockEscalationThreshold());
}
}
}
@@ -287,9 +294,9 @@
}
int count = jobInfo.getDatasetISLockCount(datasetId);
- if (count == ESCALATE_TRHESHOLD_ENTITY_TO_DATASET) {
+ if (count == txnSubsystem.getTransactionProperties().getEntityToDatasetLockEscalationThreshold()) {
return DO_ESCALATE;
- } else if (count > ESCALATE_TRHESHOLD_ENTITY_TO_DATASET) {
+ } else if (count > txnSubsystem.getTransactionProperties().getEntityToDatasetLockEscalationThreshold()) {
return ESCALATED;
} else {
return DONOT_ESCALATE;
@@ -772,7 +779,7 @@
if (ALLOW_ESCALATE_FROM_ENTITY_TO_DATASET) {
if (!isInstant && datasetLockMode == LockMode.IS) {
- jobInfo.decreaseDatasetISLockCount(datasetId.getId());
+ jobInfo.decreaseDatasetISLockCount(datasetId.getId(), txnSubsystem.getTransactionProperties().getEntityToDatasetLockEscalationThreshold());
}
}
@@ -1286,7 +1293,7 @@
//We don't want to allow the lock escalation when there is a first lock request on a dataset.
throw new IllegalStateException(
"ESCALATE_TRHESHOLD_ENTITY_TO_DATASET should not be set to "
- + ESCALATE_TRHESHOLD_ENTITY_TO_DATASET);
+ + txnSubsystem.getTransactionProperties().getEntityToDatasetLockEscalationThreshold());
}
}
}
@@ -2046,6 +2053,182 @@
unlatchLockTable();
}
}
+
+    /** Lifecycle hook; the lock manager does nothing on start. */
+    @Override
+    public void start() {
+        // intentionally empty
+    }
+
+    /**
+     * Lifecycle hook invoked on shutdown. When {@code dumpState} is set, writes the configurable
+     * values and every internal table of the lock manager to {@code os}, then flushes the stream.
+     *
+     * @param dumpState whether a state dump should be written
+     * @param os stream receiving the dump; only touched when {@code dumpState} is true
+     */
+    @Override
+    public void stop(boolean dumpState, OutputStream os) {
+        if (!dumpState) {
+            return;
+        }
+
+        // Dump order: configurable variables, jobHT, datasetResourceHT, entityLockInfoManager,
+        // entityInfoManager, lockWaiterManager.
+        dumpConfVars(os);
+        dumpJobInfo(os);
+        dumpDatasetLockInfo(os);
+        dumpEntityLockInfo(os);
+        dumpEntityInfo(os);
+        dumpLockWaiterInfo(os);
+
+        try {
+            os.flush();
+        } catch (IOException e) {
+            // a failed flush is ignored
+        }
+    }
+
+    /**
+     * Writes the [ConfVars] section: the entity-to-dataset escalation threshold and the shrink
+     * timer thresholds reported by the three slot managers.
+     *
+     * @param os stream receiving the section
+     */
+    private void dumpConfVars(OutputStream os) {
+        try {
+            String section = "\n>>dump_begin\t>>----- [ConfVars] -----"
+                    + "\nESCALATE_TRHESHOLD_ENTITY_TO_DATASET: "
+                    + txnSubsystem.getTransactionProperties().getEntityToDatasetLockEscalationThreshold()
+                    + "\nSHRINK_TIMER_THRESHOLD (entityLockInfoManager): "
+                    + entityLockInfoManager.getShrinkTimerThreshold()
+                    + "\nSHRINK_TIMER_THRESHOLD (entityInfoManager): "
+                    + entityInfoManager.getShrinkTimerThreshold()
+                    + "\nSHRINK_TIMER_THRESHOLD (lockWaiterManager): "
+                    + lockWaiterManager.getShrinkTimerThreshold()
+                    + "\n>>dump_end\t>>----- [ConfVars] -----\n";
+            os.write(section.getBytes());
+        } catch (Exception e) {
+            // Best effort: swallow the failure so the remaining sections can still be dumped.
+            if (IS_DEBUG_MODE) {
+                e.printStackTrace();
+            }
+        }
+    }
+
+    /**
+     * Writes the [JobInfo] section: one record per entry of {@code jobHT}, consisting of the job
+     * id followed by the text returned by {@code JobInfo.coreDump()}. The whole section is
+     * buffered and written with a single call.
+     *
+     * @param os stream receiving the section
+     */
+    private void dumpJobInfo(OutputStream os) {
+        StringBuilder out = new StringBuilder();
+
+        try {
+            out.append("\n>>dump_begin\t>>----- [JobInfo] -----");
+            Set<Map.Entry<JobId, JobInfo>> jobEntries = jobHT.entrySet();
+            if (jobEntries != null) {
+                for (Map.Entry<JobId, JobInfo> jobEntry : jobEntries) {
+                    if (jobEntry == null) {
+                        continue;
+                    }
+
+                    JobId id = jobEntry.getKey();
+                    out.append(id != null ? "\n" + id : "\nJID:null");
+
+                    JobInfo info = jobEntry.getValue();
+                    if (info == null) {
+                        out.append("\nJobInfo:null");
+                    } else {
+                        out.append(info.coreDump());
+                    }
+                }
+            }
+            out.append("\n>>dump_end\t>>----- [JobInfo] -----\n");
+            os.write(out.toString().getBytes());
+        } catch (Exception e) {
+            // Best effort: swallow the failure so the remaining sections can still be dumped.
+            if (IS_DEBUG_MODE) {
+                e.printStackTrace();
+            }
+        }
+    }
+
+    /**
+     * Writes the [DatasetLockInfo] section: one record per entry of {@code datasetResourceHT},
+     * bracketed by exactly one begin marker and one end marker.
+     * <p>
+     * The buffered text is written after every entry and the builder is replaced, so the pending
+     * text never grows with the size of the table. The end marker is written once, after the
+     * loop, and the markers are emitted even when the table is empty.
+     *
+     * @param os stream receiving the section
+     */
+    private void dumpDatasetLockInfo(OutputStream os) {
+        try {
+            StringBuilder sb = new StringBuilder("\n>>dump_begin\t>>----- [DatasetLockInfo] -----");
+            for (Map.Entry<DatasetId, DatasetLockInfo> entry : datasetResourceHT.entrySet()) {
+                DatasetId datasetId = entry.getKey();
+                if (datasetId != null) {
+                    sb.append("\nDatasetId:" + datasetId.getId());
+                } else {
+                    sb.append("\nDatasetId:null");
+                }
+
+                DatasetLockInfo datasetLockInfo = entry.getValue();
+                if (datasetLockInfo != null) {
+                    sb.append(datasetLockInfo.coreDump());
+                } else {
+                    sb.append("\nDatasetLockInfo:null");
+                }
+
+                // Write per entry and start a fresh builder to avoid a possible OOM.
+                os.write(sb.toString().getBytes());
+                sb = new StringBuilder();
+            }
+            sb.append("\n>>dump_end\t>>----- [DatasetLockInfo] -----\n");
+            os.write(sb.toString().getBytes());
+        } catch (Exception e) {
+            // Best effort: swallow the failure so the remaining sections can still be dumped.
+            if (IS_DEBUG_MODE) {
+                e.printStackTrace();
+            }
+        }
+    }
+
+    /**
+     * Writes the [EntityLockInfo] section. The begin marker is written to the stream before
+     * {@code entityLockInfoManager.coreDump(os)} runs, because that call writes directly to
+     * {@code os}; buffering the marker would place it after the content it is meant to open.
+     *
+     * @param os stream receiving the section
+     */
+    private void dumpEntityLockInfo(OutputStream os) {
+        try {
+            os.write("\n>>dump_begin\t>>----- [EntityLockInfo] -----".getBytes());
+            entityLockInfoManager.coreDump(os);
+            os.write("\n>>dump_end\t>>----- [EntityLockInfo] -----\n".getBytes());
+        } catch (Exception e) {
+            // Best effort: swallow the failure so the remaining sections can still be dumped.
+            if (IS_DEBUG_MODE) {
+                e.printStackTrace();
+            }
+        }
+    }
+
+    /**
+     * Writes the [EntityInfo] section. The begin marker is written to the stream before
+     * {@code entityInfoManager.coreDump(os)} runs, because that call writes directly to
+     * {@code os}; buffering the marker would place it after the content it is meant to open.
+     *
+     * @param os stream receiving the section
+     */
+    private void dumpEntityInfo(OutputStream os) {
+        try {
+            os.write("\n>>dump_begin\t>>----- [EntityInfo] -----".getBytes());
+            entityInfoManager.coreDump(os);
+            os.write("\n>>dump_end\t>>----- [EntityInfo] -----\n".getBytes());
+        } catch (Exception e) {
+            // Best effort: swallow the failure so the remaining sections can still be dumped.
+            if (IS_DEBUG_MODE) {
+                e.printStackTrace();
+            }
+        }
+    }
+
+    /**
+     * Writes the [LockWaiterInfo] section. The begin marker is written to the stream before
+     * {@code lockWaiterManager.coreDump(os)} runs, because that call writes directly to
+     * {@code os}; buffering the marker would place it after the content it is meant to open.
+     *
+     * @param os stream receiving the section
+     */
+    private void dumpLockWaiterInfo(OutputStream os) {
+        try {
+            os.write("\n>>dump_begin\t>>----- [LockWaiterInfo] -----".getBytes());
+            lockWaiterManager.coreDump(os);
+            os.write("\n>>dump_end\t>>----- [LockWaiterInfo] -----\n".getBytes());
+        } catch (Exception e) {
+            // Best effort: swallow the failure so the remaining sections can still be dumped.
+            if (IS_DEBUG_MODE) {
+                e.printStackTrace();
+            }
+        }
+    }
}
class ConsecutiveWakeupContext {
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManagerDeterministicUnitTest.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManagerDeterministicUnitTest.java
index 0781276..02fda06 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManagerDeterministicUnitTest.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManagerDeterministicUnitTest.java
@@ -7,7 +7,14 @@
import java.util.NoSuchElementException;
import java.util.Scanner;
+import edu.uci.ics.asterix.common.config.AsterixCompilerProperties;
+import edu.uci.ics.asterix.common.config.AsterixExternalProperties;
+import edu.uci.ics.asterix.common.config.AsterixMetadataProperties;
+import edu.uci.ics.asterix.common.config.AsterixPropertiesAccessor;
+import edu.uci.ics.asterix.common.config.AsterixStorageProperties;
+import edu.uci.ics.asterix.common.config.AsterixTransactionProperties;
import edu.uci.ics.asterix.common.exceptions.ACIDException;
+import edu.uci.ics.asterix.common.exceptions.AsterixException;
import edu.uci.ics.asterix.common.transactions.DatasetId;
import edu.uci.ics.asterix.common.transactions.ILockManager;
import edu.uci.ics.asterix.common.transactions.ITransactionManager.TransactionState;
@@ -18,7 +25,7 @@
public class LockManagerDeterministicUnitTest {
- public static void main(String args[]) throws ACIDException, IOException {
+ public static void main(String args[]) throws ACIDException, IOException, AsterixException {
//initialize controller thread
String requestFileName = new String(
"src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockRequestFile");
@@ -39,8 +46,9 @@
String requestFileName;
long defaultWaitTime;
- public LockRequestController(String requestFileName) throws ACIDException {
- this.txnProvider = new TransactionSubsystem("LockManagerPredefinedUnitTest", null);;
+ public LockRequestController(String requestFileName) throws ACIDException, AsterixException {
+ this.txnProvider = new TransactionSubsystem("LockManagerPredefinedUnitTest", null,
+ new AsterixTransactionProperties(new AsterixPropertiesAccessor()));
this.workerReadyQueue = new WorkerReadyQueue();
this.requestList = new ArrayList<LockRequest>();
this.expectedResultList = new ArrayList<ArrayList<Integer>>();
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManagerRandomUnitTest.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManagerRandomUnitTest.java
index b2cc6ab..a942325 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManagerRandomUnitTest.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManagerRandomUnitTest.java
@@ -3,7 +3,10 @@
import java.util.ArrayList;
import java.util.Random;
+import edu.uci.ics.asterix.common.config.AsterixPropertiesAccessor;
+import edu.uci.ics.asterix.common.config.AsterixTransactionProperties;
import edu.uci.ics.asterix.common.exceptions.ACIDException;
+import edu.uci.ics.asterix.common.exceptions.AsterixException;
import edu.uci.ics.asterix.common.transactions.DatasetId;
import edu.uci.ics.asterix.common.transactions.ILockManager;
import edu.uci.ics.asterix.common.transactions.ITransactionContext;
@@ -28,9 +31,10 @@
private static int jobId = 0;
private static Random rand;
- public static void main(String args[]) throws ACIDException {
+ public static void main(String args[]) throws ACIDException, AsterixException {
int i;
- TransactionSubsystem txnProvider = new TransactionSubsystem("LockManagerRandomUnitTest", null);
+ TransactionSubsystem txnProvider = new TransactionSubsystem("LockManagerRandomUnitTest", null,
+ new AsterixTransactionProperties(new AsterixPropertiesAccessor()));
rand = new Random(System.currentTimeMillis());
for (i = 0; i < MAX_NUM_OF_ENTITY_LOCK_JOB; i++) {
System.out.println("Creating " + i + "th EntityLockJob..");
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockWaiterManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockWaiterManager.java
index bd414de..c6fcbd5 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockWaiterManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockWaiterManager.java
@@ -15,6 +15,8 @@
package edu.uci.ics.asterix.transaction.management.service.locking;
+import java.io.IOException;
+import java.io.OutputStream;
import java.util.ArrayList;
/**
@@ -258,28 +260,42 @@
StringBuilder s = new StringBuilder("\n########### LockWaiterManager Status #############\n");
int size = pArray.size();
ChildLockWaiterArrayManager child;
- LockWaiter waiter;
for (int i = 0; i < size; i++) {
child = pArray.get(i);
if (child.isDeinitialized()) {
continue;
}
- s.append("child[" + i + "]: occupiedSlots:" + child.getNumOfOccupiedSlots());
- s.append(" freeSlotNum:" + child.getFreeSlotNum() + "\n");
- for (int j = 0; j < ChildLockWaiterArrayManager.NUM_OF_SLOTS; j++) {
- waiter = child.getLockWaiter(j);
- s.append(j).append(": ");
- s.append("\t" + waiter.getEntityInfoSlot());
- s.append("\t" + waiter.needWait());
- s.append("\t" + waiter.isVictim());
- s.append("\n");
- }
- s.append("\n");
+ s.append("child[" + i + "]");
+ s.append(child.prettyPrint());
}
return s.toString();
}
+    /**
+     * Writes the status of every child array manager to {@code os}, one write per child so the
+     * pending text stays small.
+     * <p>
+     * The header and the child count are written up front, so they appear even when there are no
+     * children. Deinitialized children are reported by index only; {@code prettyPrint()} of this
+     * class skips them as well, so their slots are not walked here either.
+     * An {@link IOException} on any single write is ignored and the dump continues.
+     *
+     * @param os stream receiving the dump
+     */
+    public void coreDump(OutputStream os) {
+        int size = pArray.size();
+        StringBuilder sb = new StringBuilder("\n########### LockWaiterManager Status #############\n");
+        sb.append("Number of Child: " + size + "\n");
+        try {
+            os.write(sb.toString().getBytes());
+        } catch (IOException e) {
+            //ignore IOException
+        }
+
+        for (int i = 0; i < size; i++) {
+            ChildLockWaiterArrayManager child = pArray.get(i);
+            sb = new StringBuilder();
+            sb.append("child[" + i + "]");
+            if (child.isDeinitialized()) {
+                sb.append(": deinitialized\n");
+            } else {
+                sb.append(child.prettyPrint());
+            }
+
+            try {
+                os.write(sb.toString().getBytes());
+            } catch (IOException e) {
+                //ignore IOException
+            }
+        }
+    }
+
+    /**
+     * @return the value of {@code SHRINK_TIMER_THRESHOLD} used by this manager
+     */
+    public int getShrinkTimerThreshold() {
+        return SHRINK_TIMER_THRESHOLD;
+    }
+
public LockWaiter getLockWaiter(int slotNum) {
return pArray.get(slotNum / ChildLockWaiterArrayManager.NUM_OF_SLOTS).getLockWaiter(
slotNum % ChildLockWaiterArrayManager.NUM_OF_SLOTS);
@@ -364,4 +380,20 @@
public int getFreeSlotNum() {
return freeSlotNum;
}
+
+    /**
+     * Renders this child array manager as text: the occupied-slot count, the free slot number,
+     * and then one line per slot with the waiter's entity-info slot, wait flag and victim flag.
+     *
+     * @return the rendered text
+     */
+    public String prettyPrint() {
+        StringBuilder out = new StringBuilder();
+        out.append("\n\toccupiedSlots:").append(getNumOfOccupiedSlots());
+        out.append("\n\tfreeSlotNum:").append(getFreeSlotNum()).append("\n");
+
+        for (int slot = 0; slot < ChildLockWaiterArrayManager.NUM_OF_SLOTS; slot++) {
+            LockWaiter current = getLockWaiter(slot);
+            out.append(slot).append(": ");
+            out.append("\t").append(current.getEntityInfoSlot());
+            out.append("\t").append(current.needWait());
+            out.append("\t").append(current.isVictim());
+            out.append("\n");
+        }
+        return out.toString();
+    }
}
\ No newline at end of file
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/TimeOutDetector.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/TimeOutDetector.java
index a53c890..7cbd193 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/TimeOutDetector.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/TimeOutDetector.java
@@ -2,6 +2,7 @@
import java.util.LinkedList;
+import edu.uci.ics.asterix.common.api.AsterixThreadExecutor;
import edu.uci.ics.asterix.common.exceptions.ACIDException;
/**
@@ -14,19 +15,21 @@
*/
public class TimeOutDetector {
- static final long TIME_OUT_THRESHOLD = 60000;
- static final long SWEEP_PERIOD = 10000;//120000;
LockManager lockMgr;
Thread trigger;
LinkedList<LockWaiter> victimList;
+ int timeoutThreshold;
+ int sweepThreshold;
public TimeOutDetector(LockManager lockMgr) {
this.victimList = new LinkedList<LockWaiter>();
this.lockMgr = lockMgr;
this.trigger = new Thread(new TimeoutTrigger(this));
+ this.timeoutThreshold = lockMgr.getTransactionProperties().getTimeoutWaitThreshold();
+ this.sweepThreshold = lockMgr.getTransactionProperties().getTimeoutSweepThreshold();
trigger.setDaemon(true);
- trigger.start();
+ AsterixThreadExecutor.INSTANCE.execute(trigger);
}
public void sweep() throws ACIDException {
@@ -38,7 +41,7 @@
}
public void checkAndSetVictim(LockWaiter waiterObj) {
- if (System.currentTimeMillis() - waiterObj.getBeginWaitTime() >= TIME_OUT_THRESHOLD) {
+ if (System.currentTimeMillis() - waiterObj.getBeginWaitTime() >= timeoutThreshold) {
waiterObj.setVictim(true);
waiterObj.setWait(false);
victimList.add(waiterObj);
@@ -67,7 +70,7 @@
public void run() {
while (true) {
try {
- Thread.sleep(TimeOutDetector.SWEEP_PERIOD);
+ Thread.sleep(owner.sweepThreshold);
owner.sweep(); // Trigger the timeout detector (the owner) to
// initiate sweep
} catch (InterruptedException e) {
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java
index 318996a..31c6e78 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java
@@ -17,6 +17,7 @@
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
+import java.io.OutputStream;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
@@ -31,6 +32,7 @@
import java.util.logging.Level;
import java.util.logging.Logger;
+import edu.uci.ics.asterix.common.api.AsterixThreadExecutor;
import edu.uci.ics.asterix.common.exceptions.ACIDException;
import edu.uci.ics.asterix.common.transactions.FileBasedBuffer;
import edu.uci.ics.asterix.common.transactions.FileUtil;
@@ -49,8 +51,9 @@
import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionManagementConstants;
import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionSubsystem;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.lifecycle.ILifeCycleComponent;
-public class LogManager implements ILogManager {
+public class LogManager implements ILogManager, ILifeCycleComponent {
public static final boolean IS_DEBUG_MODE = false;//true
private static final Logger LOGGER = Logger.getLogger(LogManager.class.getName());
@@ -110,7 +113,7 @@
public LogManager(TransactionSubsystem provider) throws ACIDException {
this.provider = provider;
- initLogManagerProperties(this.provider.getId());
+ logManagerProperties = new LogManagerProperties(this.provider.getTransactionProperties(), this.provider.getId());
logPageSize = logManagerProperties.getLogPageSize();
initLogManager();
statLogSize = 0;
@@ -119,43 +122,13 @@
public LogManager(TransactionSubsystem provider, String nodeId) throws ACIDException {
this.provider = provider;
- initLogManagerProperties(nodeId);
+ logManagerProperties = new LogManagerProperties(provider.getTransactionProperties(), nodeId);
logPageSize = logManagerProperties.getLogPageSize();
initLogManager();
statLogSize = 0;
statLogCount = 0;
}
- /*
- * initialize the log manager properties either from the configuration file
- * on disk or with default values
- */
- private void initLogManagerProperties(String nodeId) throws ACIDException {
- LogManagerProperties logProperties = null;
- InputStream is = null;
- try {
- is = this.getClass().getClassLoader()
- .getResourceAsStream(TransactionManagementConstants.LogManagerConstants.LOG_CONF_FILE);
-
- Properties p = new Properties();
-
- if (is != null) {
- p.load(is);
- }
- logProperties = new LogManagerProperties(p, nodeId);
-
- } catch (IOException ioe) {
- if (is != null) {
- try {
- is.close();
- } catch (IOException e) {
- throw new ACIDException("unable to close input stream ", e);
- }
- }
- }
- logManagerProperties = logProperties;
- }
-
private void initLogManager() throws ACIDException {
logRecordHelper = new LogRecordHelper(this);
numLogPages = logManagerProperties.getNumLogPages();
@@ -185,7 +158,7 @@
*/
logPageFlusher = new LogPageFlushThread(this);
logPageFlusher.setDaemon(true);
- logPageFlusher.start();
+ AsterixThreadExecutor.INSTANCE.execute(logPageFlusher);
}
public int getLogPageIndex(long lsnValue) {
@@ -758,6 +731,60 @@
map.clear();
}
+
+    /** Lifecycle hook; the log manager does nothing on start. */
+    @Override
+    public void start() {
+        // intentionally empty
+    }
+
+    /**
+     * Lifecycle hook invoked on shutdown. When {@code dumpState} is set, writes the log manager's
+     * configuration and LSN information to {@code os}, then flushes the stream.
+     *
+     * @param dumpState whether a state dump should be written
+     * @param os stream receiving the dump; only touched when {@code dumpState} is true
+     */
+    @Override
+    public void stop(boolean dumpState, OutputStream os) {
+        if (!dumpState) {
+            return;
+        }
+
+        // Dump order: configurable variables, then LSN info.
+        dumpConfVars(os);
+        dumpLSNInfo(os);
+
+        try {
+            os.flush();
+        } catch (IOException e) {
+            // a failed flush is ignored
+        }
+    }
+
+    /**
+     * Writes the [ConfVars] section, whose content is {@code logManagerProperties.toString()}.
+     *
+     * @param os stream receiving the section
+     */
+    private void dumpConfVars(OutputStream os) {
+        try {
+            String section = "\n>>dump_begin\t>>----- [ConfVars] -----"
+                    + logManagerProperties.toString()
+                    + "\n>>dump_end\t>>----- [ConfVars] -----\n";
+            os.write(section.getBytes());
+        } catch (Exception e) {
+            // Best effort: swallow the failure so the remaining sections can still be dumped.
+            if (IS_DEBUG_MODE) {
+                e.printStackTrace();
+            }
+        }
+    }
+
+    /**
+     * Writes the [LSNInfo] section: the starting LSN, the current LSN and the last flushed LSN.
+     *
+     * @param os stream receiving the section
+     */
+    private void dumpLSNInfo(OutputStream os) {
+        try {
+            String section = "\n>>dump_begin\t>>----- [LSNInfo] -----"
+                    + "\nstartingLSN: " + startingLSN
+                    + "\ncurrentLSN: " + lsn.get()
+                    + "\nlastFlushedLSN: " + lastFlushedLSN.get()
+                    + "\n>>dump_end\t>>----- [LSNInfo] -----\n";
+            os.write(section.getBytes());
+        } catch (Exception e) {
+            // Best effort: swallow the failure so the remaining sections can still be dumped.
+            if (IS_DEBUG_MODE) {
+                e.printStackTrace();
+            }
+        }
+    }
}
/*
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/CheckpointThread.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/CheckpointThread.java
index a934d6c..635518d 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/CheckpointThread.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/CheckpointThread.java
@@ -11,8 +11,8 @@
public class CheckpointThread extends Thread {
- private static final long LSN_THRESHOLD = 64 * 1024 * 1024;
- private long checkpointTermInSecs = 120; //seconds.
+ private long lsnThreshold;
+ private long checkpointTermInSecs;
private long lastMinMCTFirstLSN = 0;
@@ -20,12 +20,11 @@
private final IIndexLifecycleManager indexLifecycleManager;
public CheckpointThread(IRecoveryManager recoveryMgr, IIndexLifecycleManager indexLifecycleManager,
- long checkpointTermInSecs) {
+ long lsnThreshold, long checkpointTermInSecs) {
this.recoveryMgr = recoveryMgr;
this.indexLifecycleManager = indexLifecycleManager;
- if (this.checkpointTermInSecs < checkpointTermInSecs) {
- this.checkpointTermInSecs = checkpointTermInSecs;
- }
+ this.lsnThreshold = lsnThreshold;
+ this.checkpointTermInSecs = checkpointTermInSecs;
}
@Override
@@ -39,7 +38,7 @@
}
currentMinMCTFirstLSN = getMinMCTFirstLSN();
- if (currentMinMCTFirstLSN - lastMinMCTFirstLSN > LSN_THRESHOLD) {
+ if (currentMinMCTFirstLSN - lastMinMCTFirstLSN > lsnThreshold) {
try {
recoveryMgr.checkpoint(false);
lastMinMCTFirstLSN = currentMinMCTFirstLSN;
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java
index 50d4625..7947558 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java
@@ -22,7 +22,9 @@
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
+import java.io.OutputStream;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
@@ -57,6 +59,7 @@
import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionManager;
import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionSubsystem;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.lifecycle.ILifeCycleComponent;
import edu.uci.ics.hyracks.storage.am.btree.impls.BTree;
import edu.uci.ics.hyracks.storage.am.common.api.IIndex;
import edu.uci.ics.hyracks.storage.am.common.api.IIndexLifecycleManager;
@@ -83,11 +86,12 @@
* not in place completely. Once we have physical logging implemented, we would
* add support for crash recovery.
*/
-public class RecoveryManager implements IRecoveryManager {
+public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent {
public static final boolean IS_DEBUG_MODE = false;//true
private static final Logger LOGGER = Logger.getLogger(RecoveryManager.class.getName());
private final TransactionSubsystem txnSubsystem;
+ private final int checkpointHistory;
/**
* A file at a known location that contains the LSN of the last log record
@@ -98,6 +102,7 @@
public RecoveryManager(TransactionSubsystem TransactionProvider) throws ACIDException {
this.txnSubsystem = TransactionProvider;
+ this.checkpointHistory = this.txnSubsystem.getTransactionProperties().getCheckpointHistory();
}
/**
@@ -430,7 +435,7 @@
if (isSharpCheckpoint && LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("Starting sharp checkpoint ... ");
}
-
+
LogManager logMgr = (LogManager) txnSubsystem.getLogManager();
TransactionManager txnMgr = (TransactionManager) txnSubsystem.getTransactionManager();
String logDir = logMgr.getLogManagerProperties().getLogDir();
@@ -514,15 +519,17 @@
//#. delete the previous checkpoint files
if (prevCheckpointFiles != null) {
- for (File file : prevCheckpointFiles) {
- file.delete();
+ // sort the filenames lexicographically to keep the latest checkpointHistory files.
+ Arrays.sort(prevCheckpointFiles);
+ for (int i = 0; i < prevCheckpointFiles.length - this.checkpointHistory; ++i) {
+ prevCheckpointFiles[i].delete();
}
}
if (isSharpCheckpoint) {
logMgr.renewLogFiles();
}
-
+
if (isSharpCheckpoint && LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("Completed sharp checkpoint.");
}
@@ -782,6 +789,16 @@
+ undoCount);
}
}
+
+    /** Lifecycle hook; the recovery manager does nothing on start. */
+    @Override
+    public void start() {
+        // intentionally empty
+    }
+
+    /**
+     * Lifecycle hook; the recovery manager does nothing on stop and writes no state dump.
+     *
+     * @param dumpState unused
+     * @param os unused
+     */
+    @Override
+    public void stop(boolean dumpState, OutputStream os) {
+        // intentionally empty
+    }
}
class TxnId {
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionContext.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionContext.java
index 2659d8f..d4360ef 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionContext.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionContext.java
@@ -172,5 +172,15 @@
return (o == this);
}
-
+    /**
+     * Renders this transaction context as text, one field per line: job id, transaction type,
+     * first and last log locator LSNs, transaction state, start wait time and status.
+     *
+     * @return the rendered text
+     */
+    public String prettyPrint() {
+        StringBuilder sb = new StringBuilder();
+        sb.append("\n" + jobId + "\n");
+        // Terminate this line like the others so it does not run into "firstLogLocator".
+        sb.append("transactionType: " + transactionType + "\n");
+        sb.append("firstLogLocator: " + firstLogLocator.getLsn() + "\n");
+        sb.append("lastLogLocator: " + lastLogLocator.getLsn() + "\n");
+        sb.append("TransactionState: " + txnState + "\n");
+        sb.append("startWaitTime: " + startWaitTime + "\n");
+        sb.append("status: " + status + "\n");
+        return sb.toString();
+    }
}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManagementConstants.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManagementConstants.java
index 7d04f61..25b9195 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManagementConstants.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManagementConstants.java
@@ -26,10 +26,6 @@
}
public static class LogManagerConstants {
- public static final String LOG_CONF_DIR = "log_conf";
- public static final String LOG_CONF_FILE = "log.properties";
- public static final String ASTERIX_CONF_DIR = "src/main/resources";
- public static final String DEFAULT_LOG_DIR = "asterix_logs";
public static final int TERMINAL_LSN = -1;
}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManager.java
index 4e8808f..04f12ac 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManager.java
@@ -14,8 +14,11 @@
*/
package edu.uci.ics.asterix.transaction.management.service.transaction;
+import java.io.IOException;
+import java.io.OutputStream;
import java.util.HashMap;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -26,15 +29,18 @@
import edu.uci.ics.asterix.common.transactions.ITransactionManager;
import edu.uci.ics.asterix.common.transactions.JobId;
import edu.uci.ics.asterix.transaction.management.service.logging.LogType;
+import edu.uci.ics.hyracks.api.lifecycle.ILifeCycleComponent;
/**
* An implementation of the @see ITransactionManager interface that provides
* implementation of APIs for governing the lifecycle of a transaction.
*/
-public class TransactionManager implements ITransactionManager {
+public class TransactionManager implements ITransactionManager, ILifeCycleComponent {
+
+ public static final boolean IS_DEBUG_MODE = false;//true
private static final Logger LOGGER = Logger.getLogger(TransactionManager.class.getName());
private final TransactionSubsystem transactionProvider;
- private Map<JobId, ITransactionContext> ITransactionContextRepository = new HashMap<JobId, ITransactionContext>();
+ private Map<JobId, ITransactionContext> transactionContextRepository = new HashMap<JobId, ITransactionContext>();
private AtomicInteger maxJobId = new AtomicInteger(0);
public TransactionManager(TransactionSubsystem provider) {
@@ -61,7 +67,7 @@
} finally {
txnContext.releaseResources();
transactionProvider.getLockManager().releaseLocks(txnContext);
- ITransactionContextRepository.remove(txnContext.getJobId());
+ transactionContextRepository.remove(txnContext.getJobId());
txnContext.setTxnState(TransactionState.ABORTED);
}
}
@@ -72,7 +78,7 @@
setMaxJobId(jobId.getId());
ITransactionContext txnContext = new TransactionContext(jobId, transactionProvider);
synchronized (this) {
- ITransactionContextRepository.put(jobId, txnContext);
+ transactionContextRepository.put(jobId, txnContext);
}
return txnContext;
}
@@ -80,13 +86,13 @@
@Override
public ITransactionContext getTransactionContext(JobId jobId) throws ACIDException {
setMaxJobId(jobId.getId());
- synchronized (ITransactionContextRepository) {
+ synchronized (transactionContextRepository) {
- ITransactionContext context = ITransactionContextRepository.get(jobId);
+ ITransactionContext context = transactionContextRepository.get(jobId);
if (context == null) {
- context = ITransactionContextRepository.get(jobId);
+ context = transactionContextRepository.get(jobId);
context = new TransactionContext(jobId, transactionProvider);
- ITransactionContextRepository.put(jobId, context);
+ transactionContextRepository.put(jobId, context);
}
return context;
}
@@ -107,13 +113,13 @@
if (PKHashVal != -1) {
transactionProvider.getLockManager().unlock(datasetId, PKHashVal, txnContext, true);
/*****************************
- try {
- //decrease the transaction reference count on index
- txnContext.decreaseActiveTransactionCountOnIndexes();
- } catch (HyracksDataException e) {
- throw new ACIDException("failed to complete index operation", e);
- }
- *****************************/
+ * try {
+ * //decrease the transaction reference count on index
+ * txnContext.decreaseActiveTransactionCountOnIndexes();
+ * } catch (HyracksDataException e) {
+ * throw new ACIDException("failed to complete index operation", e);
+ * }
+ *****************************/
return;
}
@@ -131,7 +137,7 @@
} finally {
txnContext.releaseResources();
transactionProvider.getLockManager().releaseLocks(txnContext); // release
- ITransactionContextRepository.remove(txnContext.getJobId());
+ transactionContextRepository.remove(txnContext.getJobId());
txnContext.setTxnState(TransactionState.COMMITTED);
}
}
@@ -151,12 +157,69 @@
public TransactionSubsystem getTransactionProvider() {
return transactionProvider;
}
-
+
public void setMaxJobId(int jobId) {
maxJobId.set(Math.max(maxJobId.get(), jobId));
}
-
+
public int getMaxJobId() {
return maxJobId.get();
}
+
+ @Override
+ public void start() {
+ // No-op: this method body is intentionally empty.
+ }
+
+ @Override
+ public void stop(boolean dumpState, OutputStream os) {
+ if (dumpState) {
+ // Only when dumpState is set: write the transaction-context dump to os, then flush it.
+ dumpTxnContext(os);
+
+ try {
+ os.flush();
+ } catch (IOException e) {
+ // Best-effort: a failure to flush the dump is deliberately ignored.
+ }
+ }
+ }
+
+ private void dumpTxnContext(OutputStream os) {
+ JobId jobId;
+ ITransactionContext txnCtx;
+ StringBuilder sb = new StringBuilder();
+
+ try {
+ sb.append("\n>>dump_begin\t>>----- [ConfVars] -----"); // NOTE(review): label says "ConfVars" but this section dumps transaction contexts - confirm intended
+ Set<Map.Entry<JobId, ITransactionContext>> entrySet = transactionContextRepository.entrySet();
+ if (entrySet != null) {
+ for (Map.Entry<JobId, ITransactionContext> entry : entrySet) {
+ if (entry != null) {
+ jobId = entry.getKey();
+ if (jobId != null) {
+ sb.append("\n" + jobId);
+ } else {
+ sb.append("\nJID:null");
+ }
+
+ txnCtx = entry.getValue();
+ if (txnCtx != null) {
+ sb.append(txnCtx.prettyPrint());
+ } else {
+ sb.append("\nTxnCtx:null");
+ }
+ }
+ }
+ }
+
+ sb.append("\n>>dump_end\t>>----- [ConfVars] -----\n");
+ os.write(sb.toString().getBytes()); // getBytes() without an argument encodes with the platform default charset
+ } catch (Exception e) {
+ // Best-effort: any failure is swallowed; if it occurs before os.write above, nothing from this method reaches os.
+ if (IS_DEBUG_MODE) {
+ e.printStackTrace();
+ }
+ }
+ }
}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionSubsystem.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionSubsystem.java
index eb6e9a2..bde44de 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionSubsystem.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionSubsystem.java
@@ -14,6 +14,7 @@
*/
package edu.uci.ics.asterix.transaction.management.service.transaction;
+import edu.uci.ics.asterix.common.config.AsterixTransactionProperties;
import edu.uci.ics.asterix.common.exceptions.ACIDException;
import edu.uci.ics.asterix.common.transactions.IAsterixAppRuntimeContextProvider;
import edu.uci.ics.asterix.common.transactions.ILockManager;
@@ -42,10 +43,12 @@
private final IndexLoggerRepository loggerRepository;
private final IAsterixAppRuntimeContextProvider asterixAppRuntimeContextProvider;
private final CheckpointThread checkpointThread;
+ private final AsterixTransactionProperties txnProperties;
- public TransactionSubsystem(String id, IAsterixAppRuntimeContextProvider asterixAppRuntimeContextProvider)
- throws ACIDException {
+ public TransactionSubsystem(String id, IAsterixAppRuntimeContextProvider asterixAppRuntimeContextProvider,
+ AsterixTransactionProperties txnProperties) throws ACIDException {
this.id = id;
+ this.txnProperties = txnProperties;
this.transactionManager = new TransactionManager(this);
this.logManager = new LogManager(this);
this.lockManager = new LockManager(this);
@@ -55,7 +58,8 @@
this.asterixAppRuntimeContextProvider = asterixAppRuntimeContextProvider;
if (asterixAppRuntimeContextProvider != null) {
this.checkpointThread = new CheckpointThread(recoveryManager,
- asterixAppRuntimeContextProvider.getIndexLifecycleManager(), 0);
+ asterixAppRuntimeContextProvider.getIndexLifecycleManager(),
+ this.txnProperties.getCheckpointLSNThreshold(), this.txnProperties.getCheckpointPollFrequency());
} else {
this.checkpointThread = null;
}
@@ -89,6 +93,10 @@
return asterixAppRuntimeContextProvider;
}
+ public AsterixTransactionProperties getTransactionProperties() {
+ return txnProperties; // final field, assigned once from the constructor argument
+ }
+
public String getId() {
return id;
}
diff --git a/asterix-transactions/src/test/java/edu/uci/ics/asterix/transaction/management/logging/test/RecoverySimulator.java b/asterix-transactions/src/test/java/edu/uci/ics/asterix/transaction/management/logging/test/RecoverySimulator.java
index 07f7474..862c996 100644
--- a/asterix-transactions/src/test/java/edu/uci/ics/asterix/transaction/management/logging/test/RecoverySimulator.java
+++ b/asterix-transactions/src/test/java/edu/uci/ics/asterix/transaction/management/logging/test/RecoverySimulator.java
@@ -16,7 +16,10 @@
import java.io.IOException;
+import edu.uci.ics.asterix.common.config.AsterixPropertiesAccessor;
+import edu.uci.ics.asterix.common.config.AsterixTransactionProperties;
import edu.uci.ics.asterix.common.exceptions.ACIDException;
+import edu.uci.ics.asterix.common.exceptions.AsterixException;
import edu.uci.ics.asterix.common.transactions.IRecoveryManager;
import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionSubsystem;
@@ -28,10 +31,11 @@
recoveryManager.startRecovery(true);
}
- public static void main(String args[]) throws IOException, ACIDException {
+ public static void main(String args[]) throws IOException, ACIDException, AsterixException {
String id = "nc1";
try {
- TransactionSubsystem factory = new TransactionSubsystem(id, null);
+ TransactionSubsystem factory = new TransactionSubsystem(id, null, new AsterixTransactionProperties(
+ new AsterixPropertiesAccessor()));
IRecoveryManager recoveryManager = factory.getRecoveryManager();
recoveryManager.startRecovery(true);
} catch (ACIDException acide) {
diff --git a/asterix-transactions/src/test/java/edu/uci/ics/asterix/transaction/management/logging/test/TransactionWorkloadSimulator.java b/asterix-transactions/src/test/java/edu/uci/ics/asterix/transaction/management/logging/test/TransactionWorkloadSimulator.java
index 8f56e3b..b9605ef 100644
--- a/asterix-transactions/src/test/java/edu/uci/ics/asterix/transaction/management/logging/test/TransactionWorkloadSimulator.java
+++ b/asterix-transactions/src/test/java/edu/uci/ics/asterix/transaction/management/logging/test/TransactionWorkloadSimulator.java
@@ -18,7 +18,10 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
+import edu.uci.ics.asterix.common.config.AsterixPropertiesAccessor;
+import edu.uci.ics.asterix.common.config.AsterixTransactionProperties;
import edu.uci.ics.asterix.common.exceptions.ACIDException;
+import edu.uci.ics.asterix.common.exceptions.AsterixException;
import edu.uci.ics.asterix.common.transactions.DatasetId;
import edu.uci.ics.asterix.common.transactions.ILockManager;
import edu.uci.ics.asterix.common.transactions.ILogManager;
@@ -48,8 +51,9 @@
transactions = new Transaction[workload.numActiveThreads];
}
- public void beginWorkload() throws ACIDException {
- provider = new TransactionSubsystem("nc1", null);
+ public void beginWorkload() throws ACIDException, AsterixException {
+ provider = new TransactionSubsystem("nc1", null, new AsterixTransactionProperties(
+ new AsterixPropertiesAccessor()));
logManager = provider.getLogManager();
lockManager = provider.getLockManager();
provider.getTransactionalResourceRepository().registerTransactionalResourceManager(DummyResourceMgr.id,
@@ -83,7 +87,7 @@
System.out.println(" Avg Content Creation time :" + BasicLogger.getAverageContentCreationTime());
}
- public static void main(String args[]) {
+ public static void main(String args[]) throws AsterixException {
WorkloadProperties workload = new WorkloadProperties();
TransactionWorkloadSimulator simulator = new TransactionWorkloadSimulator(workload);
try {
@@ -177,12 +181,12 @@
byte logActionType = LogActionType.REDO_UNDO;
long pageId = 0;
if (!retry) {
- lockMode = (byte)(random.nextInt(2));
+ lockMode = (byte) (random.nextInt(2));
}
tempDatasetId.setId(resourceID);
TransactionWorkloadSimulator.lockManager.lock(tempDatasetId, -1, lockMode, context);
- TransactionWorkloadSimulator.logManager.log(logType, context, resourceID,
- -1, resourceID, ResourceType.LSM_BTREE, logSize, null, logger, memLSN);
+ TransactionWorkloadSimulator.logManager.log(logType, context, resourceID, -1, resourceID,
+ ResourceType.LSM_BTREE, logSize, null, logger, memLSN);
retry = false;
Thread.currentThread().sleep(TransactionWorkloadSimulator.workload.thinkTime);
logCount.incrementAndGet();
diff --git a/asterix-transactions/src/test/java/edu/uci/ics/asterix/transaction/management/test/TransactionSimulator.java b/asterix-transactions/src/test/java/edu/uci/ics/asterix/transaction/management/test/TransactionSimulator.java
index 66e9bdf..c322b31 100644
--- a/asterix-transactions/src/test/java/edu/uci/ics/asterix/transaction/management/test/TransactionSimulator.java
+++ b/asterix-transactions/src/test/java/edu/uci/ics/asterix/transaction/management/test/TransactionSimulator.java
@@ -17,7 +17,10 @@
import java.io.IOException;
import java.util.Random;
+import edu.uci.ics.asterix.common.config.AsterixPropertiesAccessor;
+import edu.uci.ics.asterix.common.config.AsterixTransactionProperties;
import edu.uci.ics.asterix.common.exceptions.ACIDException;
+import edu.uci.ics.asterix.common.exceptions.AsterixException;
import edu.uci.ics.asterix.common.transactions.DatasetId;
import edu.uci.ics.asterix.common.transactions.ILockManager;
import edu.uci.ics.asterix.common.transactions.ILogManager;
@@ -47,9 +50,10 @@
private LogicalLogLocator memLSN;
private TransactionSubsystem transactionProvider;
- public TransactionSimulator(IResource resource, IResourceManager resourceMgr) throws ACIDException {
+ public TransactionSimulator(IResource resource, IResourceManager resourceMgr) throws ACIDException, AsterixException {
String id = "nc1";
- transactionProvider = new TransactionSubsystem(id, null);
+ transactionProvider = new TransactionSubsystem(id, null, new AsterixTransactionProperties(
+ new AsterixPropertiesAccessor()));
transactionManager = transactionProvider.getTransactionManager();
logManager = transactionProvider.getLogManager();
lockManager = transactionProvider.getLockManager();
@@ -102,7 +106,7 @@
/**
* @param args
*/
- public static void main(String[] args) throws IOException, ACIDException {
+ public static void main(String[] args) throws IOException, ACIDException, AsterixException {
String fileDir = "testdata";
String fileName = "counterFile";
IResource resource = new FileResource(fileDir, fileName);