redash/tests/query_runner/test_athena.py

191 lines
8.0 KiB
Python

"""
Some test cases around the Glue catalog.
"""
from unittest import TestCase
import botocore
import mock
from botocore.stub import Stubber
from redash.query_runner.athena import Athena
class TestGlueSchema(TestCase):
    """Check the schema the Athena query runner builds from Glue catalog data.

    Every test feeds canned ``get_databases`` / ``get_tables`` responses to a
    stubbed Glue client and compares ``Athena.get_schema()`` with the expected
    list of ``{'columns': [...], 'name': '<database>.<table>'}`` entries.
    """

    def setUp(self):
        """Create a stubbed Glue client and make ``boto3.client`` return it."""
        client = botocore.session.get_session().create_client(
            'glue', region_name='mars-east-1', aws_access_key_id='foo', aws_secret_access_key='bar'
        )
        self.stubber = Stubber(client)
        self.patcher = mock.patch('boto3.client')
        mocked_client = self.patcher.start()
        # Registered right after start() so the patch is always undone, even
        # when a later setUp step fails (tearDown is skipped in that case).
        self.addCleanup(self.patcher.stop)
        # Any boto3.client(...) call made while the patch is active now
        # receives the stubbed client instead of a real one.
        mocked_client.return_value = client

    def _assert_glue_schema(self, table, expected_schema):
        """Stub a catalog holding a single table and verify the schema.

        :param table: Glue table description served as the only entry of the
            ``get_tables`` response for database ``test1``.
        :param expected_schema: value ``get_schema()`` is expected to return.
        """
        query_runner = Athena({'glue': True, 'region': 'mars-east-1'})
        self.stubber.add_response('get_databases', {'DatabaseList': [{'Name': 'test1'}]}, {})
        self.stubber.add_response('get_tables', {'TableList': [table]}, {'DatabaseName': 'test1'})
        with self.stubber:
            self.assertEqual(query_runner.get_schema(), expected_schema)
        # Both canned responses must have been used, otherwise the schema was
        # not actually read from the stubbed catalog.
        self.stubber.assert_no_pending_responses()

    def test_external_table(self):
        """Unpartitioned table crawled through a JDBC connection"""
        # The fixture carries the same parameters on the storage descriptor
        # and on the table itself.
        crawler_parameters = {
            'CrawlerSchemaDeserializerVersion': '1.0',
            'CrawlerSchemaSerializerVersion': '1.0',
            'UPDATED_BY_CRAWLER': 'jdbc',
            'classification': 'sqlserver',
            'compressionType': 'none',
            'connectionName': 'jdbctest',
            'typeOfData': 'view',
        }
        table = {
            'Name': 'jdbc_table',
            'StorageDescriptor': {
                'Columns': [{'Name': 'row_id', 'Type': 'int'}],
                'Location': 'Database.Schema.Table',
                'Compressed': False,
                'NumberOfBuckets': -1,
                'SerdeInfo': {'Parameters': {}},
                'BucketColumns': [],
                'SortColumns': [],
                'Parameters': dict(crawler_parameters),
                'StoredAsSubDirectories': False,
            },
            'PartitionKeys': [],
            'TableType': 'EXTERNAL_TABLE',
            'Parameters': dict(crawler_parameters),
        }
        self._assert_glue_schema(table, [{'columns': ['row_id'], 'name': 'test1.jdbc_table'}])

    def test_partitioned_table(self):
        """
        Partitioned table as created by a GlueContext
        """
        table = {
            'Name': 'partitioned_table',
            'StorageDescriptor': {
                'Columns': [{'Name': 'sk', 'Type': 'int'}],
                'Location': 's3://bucket/prefix',
                'InputFormat': 'org.apache.hadoop.mapred.TextInputFormat',
                'OutputFormat': 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat',
                'Compressed': False,
                'NumberOfBuckets': -1,
                'SerdeInfo': {
                    'SerializationLibrary': 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe',
                    'Parameters': {'serialization.format': '1'},
                },
                'BucketColumns': [],
                'SortColumns': [],
                'Parameters': {},
                'SkewedInfo': {
                    'SkewedColumnNames': [],
                    'SkewedColumnValues': [],
                    'SkewedColumnValueLocationMaps': {},
                },
                'StoredAsSubDirectories': False,
            },
            'PartitionKeys': [{'Name': 'category', 'Type': 'int'}],
            'TableType': 'EXTERNAL_TABLE',
            'Parameters': {'EXTERNAL': 'TRUE', 'transient_lastDdlTime': '1537505313'},
        }
        # The partition key is expected after the regular columns.
        self._assert_glue_schema(
            table, [{'columns': ['sk', 'category'], 'name': 'test1.partitioned_table'}]
        )

    def test_view(self):
        """Presto view registered in the catalog as a VIRTUAL_VIEW table"""
        table = {
            'Name': 'view',
            'StorageDescriptor': {
                'Columns': [{'Name': 'sk', 'Type': 'int'}],
                'Location': '',
                'Compressed': False,
                'NumberOfBuckets': 0,
                'SerdeInfo': {},
                'SortColumns': [],
                'StoredAsSubDirectories': False,
            },
            'PartitionKeys': [],
            'ViewOriginalText': '/* Presto View: ... */',
            'ViewExpandedText': '/* Presto View */',
            'TableType': 'VIRTUAL_VIEW',
            'Parameters': {'comment': 'Presto View', 'presto_view': 'true'},
        }
        self._assert_glue_schema(table, [{'columns': ['sk'], 'name': 'test1.view'}])

    def test_dodgy_table_does_not_break_schema_listing(self):
        """
        For some reason, not all Glue tables contain a "PartitionKeys" entry.
        This may be an Athena Catalog to Glue catalog migration issue.
        """
        # Note: this fixture deliberately has no 'PartitionKeys' key.
        table = {
            'Name': 'csv',
            'StorageDescriptor': {
                'Columns': [{'Name': 'region', 'Type': 'string'}],
                'Location': 's3://bucket/files/',
                'InputFormat': 'org.apache.hadoop.mapred.TextInputFormat',
                'Compressed': False,
                'NumberOfBuckets': 0,
                'SerdeInfo': {
                    'SerializationLibrary': 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe',
                    'Parameters': {'field.delim': '|', 'skip.header.line.count': '1'},
                },
                'SortColumns': [],
                'StoredAsSubDirectories': False,
            },
            'Parameters': {'classification': 'csv'},
        }
        self._assert_glue_schema(table, [{'columns': ['region'], 'name': 'test1.csv'}])