To be able to serialize the results on the server side, obtain the printer factories for the results and pass them to the result-handling metadata provider at run time.
In order to achieve this, provide the serializer factory provider to
Algebricks, which in turn gets the serializer factory and passes it
on to the actual operator. The operator runtime in turn uses the factory
to create the serializer object, which is used while generating the
results to serialize them.
We implement these interfaces in Asterix; for Hivesterix and
Pregelix we just return null from the result-handling runtime because
we don't handle results in those projects.
git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_hyracks_result_distribution@2975 123451ca-8445-de46-9d55-352943316053
diff --git a/src/main/java/edu/uci/ics/hivesterix/runtime/jobgen/HiveMetaDataProvider.java b/src/main/java/edu/uci/ics/hivesterix/runtime/jobgen/HiveMetaDataProvider.java
index cb4918d..6948769 100644
--- a/src/main/java/edu/uci/ics/hivesterix/runtime/jobgen/HiveMetaDataProvider.java
+++ b/src/main/java/edu/uci/ics/hivesterix/runtime/jobgen/HiveMetaDataProvider.java
@@ -32,124 +32,105 @@
@SuppressWarnings("rawtypes")
public class HiveMetaDataProvider<S, T> implements IMetadataProvider<S, T> {
- private Operator fileSink;
- private Schema outputSchema;
- private HashMap<S, IDataSource<S>> dataSourceMap;
+ private Operator fileSink;
+ private Schema outputSchema;
+ private HashMap<S, IDataSource<S>> dataSourceMap;
- public HiveMetaDataProvider(Operator fsOp, Schema oi,
- HashMap<S, IDataSource<S>> map) {
- fileSink = fsOp;
- outputSchema = oi;
- dataSourceMap = map;
- }
-
- @Override
- public IDataSourceIndex<T, S> findDataSourceIndex(T indexId, S dataSourceId)
- throws AlgebricksException {
- return null;
- }
-
- @Override
- public IDataSource<S> findDataSource(S id) throws AlgebricksException {
- return dataSourceMap.get(id);
- }
-
- @Override
- public boolean scannerOperatorIsLeaf(IDataSource<S> dataSource) {
- return true;
- }
-
- @Override
- public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getScannerRuntime(
- IDataSource<S> dataSource, List<LogicalVariable> scanVariables,
- List<LogicalVariable> projectVariables, boolean projectPushed,
- IOperatorSchema opSchema, IVariableTypeEnvironment typeEnv,
- JobGenContext context, JobSpecification jobSpec)
- throws AlgebricksException {
-
- S desc = dataSource.getId();
- HiveScanRuntimeGenerator generator = new HiveScanRuntimeGenerator(
- (PartitionDesc) desc);
- return generator.getRuntimeOperatorAndConstraint(dataSource,
- scanVariables, projectVariables, projectPushed, context,
- jobSpec);
- }
-
- @Override
- public Pair<IPushRuntimeFactory, AlgebricksPartitionConstraint> getWriteFileRuntime(
- IDataSink sink, int[] printColumns,
- IPrinterFactory[] printerFactories, RecordDescriptor inputDesc) {
-
- HiveWriteRuntimeGenerator generator = new HiveWriteRuntimeGenerator(
- (FileSinkOperator) fileSink, outputSchema);
- return generator.getWriterRuntime(inputDesc);
- }
-
- @Override
- public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getDeleteRuntime(
- IDataSource<S> arg0, IOperatorSchema arg1,
- List<LogicalVariable> arg2, LogicalVariable arg3,
- RecordDescriptor arg4, JobGenContext arg5, JobSpecification arg6)
- throws AlgebricksException {
- // TODO Auto-generated method stub
- return null;
- }
-
- @Override
- public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getInsertRuntime(
- IDataSource<S> arg0, IOperatorSchema arg1,
- List<LogicalVariable> arg2, LogicalVariable arg3,
- RecordDescriptor arg4, JobGenContext arg5, JobSpecification arg6)
- throws AlgebricksException {
- // TODO Auto-generated method stub
- return null;
- }
-
- @Override
- public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getWriteResultRuntime(
- IDataSource<S> arg0, IOperatorSchema arg1,
- List<LogicalVariable> arg2, LogicalVariable arg3,
- JobGenContext arg4, JobSpecification arg5)
- throws AlgebricksException {
- // TODO Auto-generated method stub
- return null;
- }
+ public HiveMetaDataProvider(Operator fsOp, Schema oi, HashMap<S, IDataSource<S>> map) {
+ fileSink = fsOp;
+ outputSchema = oi;
+ dataSourceMap = map;
+ }
@Override
- public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getResultHandleRuntime(IDataSink sink,
- RecordDescriptor recordDescriptor, boolean ordered, JobSpecification spec) throws AlgebricksException {
+ public IDataSourceIndex<T, S> findDataSourceIndex(T indexId, S dataSourceId) throws AlgebricksException {
return null;
}
- @Override
- public IFunctionInfo lookupFunction(FunctionIdentifier arg0) {
- return new HiveFunctionInfo(arg0, null);
- }
+ @Override
+ public IDataSource<S> findDataSource(S id) throws AlgebricksException {
+ return dataSourceMap.get(id);
+ }
- @Override
- public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getIndexInsertRuntime(
- IDataSourceIndex<T, S> dataSource,
- IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas,
- IVariableTypeEnvironment typeEnv,
- List<LogicalVariable> primaryKeys,
- List<LogicalVariable> secondaryKeys, ILogicalExpression filterExpr,
- RecordDescriptor recordDesc, JobGenContext context,
- JobSpecification spec) throws AlgebricksException {
- // TODO Auto-generated method stub
- return null;
- }
+ @Override
+ public boolean scannerOperatorIsLeaf(IDataSource<S> dataSource) {
+ return true;
+ }
- @Override
- public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getIndexDeleteRuntime(
- IDataSourceIndex<T, S> dataSource,
- IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas,
- IVariableTypeEnvironment typeEnv,
- List<LogicalVariable> primaryKeys,
- List<LogicalVariable> secondaryKeys, ILogicalExpression filterExpr,
- RecordDescriptor recordDesc, JobGenContext context,
- JobSpecification spec) throws AlgebricksException {
- // TODO Auto-generated method stub
- return null;
- }
+ @Override
+ public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getScannerRuntime(IDataSource<S> dataSource,
+ List<LogicalVariable> scanVariables, List<LogicalVariable> projectVariables, boolean projectPushed,
+ IOperatorSchema opSchema, IVariableTypeEnvironment typeEnv, JobGenContext context, JobSpecification jobSpec)
+ throws AlgebricksException {
+
+ S desc = dataSource.getId();
+ HiveScanRuntimeGenerator generator = new HiveScanRuntimeGenerator((PartitionDesc) desc);
+ return generator.getRuntimeOperatorAndConstraint(dataSource, scanVariables, projectVariables, projectPushed,
+ context, jobSpec);
+ }
+
+ @Override
+ public Pair<IPushRuntimeFactory, AlgebricksPartitionConstraint> getWriteFileRuntime(IDataSink sink,
+ int[] printColumns, IPrinterFactory[] printerFactories, RecordDescriptor inputDesc) {
+
+ HiveWriteRuntimeGenerator generator = new HiveWriteRuntimeGenerator((FileSinkOperator) fileSink, outputSchema);
+ return generator.getWriterRuntime(inputDesc);
+ }
+
+ @Override
+ public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getDeleteRuntime(IDataSource<S> arg0,
+ IOperatorSchema arg1, List<LogicalVariable> arg2, LogicalVariable arg3, RecordDescriptor arg4,
+ JobGenContext arg5, JobSpecification arg6) throws AlgebricksException {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getInsertRuntime(IDataSource<S> arg0,
+ IOperatorSchema arg1, List<LogicalVariable> arg2, LogicalVariable arg3, RecordDescriptor arg4,
+ JobGenContext arg5, JobSpecification arg6) throws AlgebricksException {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getWriteResultRuntime(IDataSource<S> arg0,
+ IOperatorSchema arg1, List<LogicalVariable> arg2, LogicalVariable arg3, JobGenContext arg4,
+ JobSpecification arg5) throws AlgebricksException {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getResultHandleRuntime(IDataSink sink,
+ int[] printColumns, IPrinterFactory[] printerFactories, RecordDescriptor inputDesc, boolean ordered,
+ JobSpecification spec) throws AlgebricksException {
+ return null;
+ }
+
+ @Override
+ public IFunctionInfo lookupFunction(FunctionIdentifier arg0) {
+ return new HiveFunctionInfo(arg0, null);
+ }
+
+ @Override
+ public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getIndexInsertRuntime(
+ IDataSourceIndex<T, S> dataSource, IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas,
+ IVariableTypeEnvironment typeEnv, List<LogicalVariable> primaryKeys, List<LogicalVariable> secondaryKeys,
+ ILogicalExpression filterExpr, RecordDescriptor recordDesc, JobGenContext context, JobSpecification spec)
+ throws AlgebricksException {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getIndexDeleteRuntime(
+ IDataSourceIndex<T, S> dataSource, IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas,
+ IVariableTypeEnvironment typeEnv, List<LogicalVariable> primaryKeys, List<LogicalVariable> secondaryKeys,
+ ILogicalExpression filterExpr, RecordDescriptor recordDesc, JobGenContext context, JobSpecification spec)
+ throws AlgebricksException {
+ // TODO Auto-generated method stub
+ return null;
+ }
}