adapt to new field mapping syntax and add unit tests

This commit is contained in:
sseifert 2017-01-09 14:37:18 +01:00
parent 9126874c87
commit 187b557eee
2 changed files with 190 additions and 43 deletions

View File

@ -1,5 +1,6 @@
import json
import requests
import re
from collections import OrderedDict
@ -30,55 +31,57 @@ def parse_issue(issue, field_mapping):
result = OrderedDict()
result['key'] = issue['key']
for k, v in issue['fields'].iteritems():
for k, v in issue['fields'].iteritems():#
output_name = field_mapping.get_output_field_name(k)
member_names = field_mapping.get_dict_members(k)
# if field mapping is defined optionally change output key and parsing rules for value
if k in field_mapping:
mapping = field_mapping[k]
output_key = k
if 'name' in mapping:
output_key = mapping['name']
put_value(result, output_key, v, mapping)
if isinstance(v, dict):
if len(member_names) > 0:
# if field mapping with dict member mappings defined get value of each member
for member_name in member_names:
if member_name in v:
result[field_mapping.get_dict_output_field_name(k,member_name)] = v[member_name]
else:
# these special mapping rules are kept for backwards compatibility
if 'key' in v:
result['{}_key'.format(output_name)] = v['key']
if 'name' in v:
result['{}_name'.format(output_name)] = v['name']
if k in v:
result[output_name] = v[k]
if 'watchCount' in v:
result[output_name] = v['watchCount']
elif isinstance(v, list):
if len(member_names) > 0:
# if field mapping with dict member mappings defined get value of each member
for member_name in member_names:
listValues = []
for listItem in v:
if isinstance(listItem, dict):
if member_name in listItem:
listValues.append(listItem[member_name])
if len(listValues) > 0:
result[field_mapping.get_dict_output_field_name(k,member_name)] = ','.join(listValues)
else:
# otherwise support list values only for non-dict items
listValues = []
for listItem in v:
if not isinstance(listItem, dict):
listValues.append(listItem)
if len(listValues) > 0:
result[output_name] = ','.join(listValues)
else:
put_value(result, k, v, {})
result[output_name] = v
return result
def put_value(result, k, v, mapping):
if isinstance(v, dict):
if 'member' in mapping:
result[k] = v[mapping['member']]
else:
# these special mapping rules are kept for backwards compatibility
if 'key' in v:
result['{}_key'.format(k)] = v['key']
if 'name' in v:
result['{}_name'.format(k)] = v['name']
if k in v:
result[k] = v[k]
if 'watchCount' in v:
result[k] = v['watchCount']
elif isinstance(v, list):
listValues = []
for listItem in v:
if isinstance(listItem, dict):
if 'member' in mapping:
listValues.append(listItem[mapping['member']])
else:
listValues.append(listItem)
if len(listValues) > 0:
result[k] = ','.join(listValues)
else:
result[k] = v
def parse_issues(data, field_mapping):
results = ResultSet()
@ -94,6 +97,46 @@ def parse_count(data):
return results
class FieldMapping:
def __init__(cls, query_field_mapping):
cls.mapping = []
for k, v in query_field_mapping.iteritems():
field_name = k
member_name = None
# check for member name contained in field name
member_parser = re.search('(\w+)\.(\w+)', k)
if (member_parser):
field_name = member_parser.group(1)
member_name = member_parser.group(2)
cls.mapping.append({
'field_name': field_name,
'member_name': member_name,
'output_field_name': v
})
def get_output_field_name(cls,field_name):
for item in cls.mapping:
if item['field_name'] == field_name and not item['member_name']:
return item['output_field_name']
return field_name
def get_dict_members(cls,field_name):
member_names = []
for item in cls.mapping:
if item['field_name'] == field_name and item['member_name']:
member_names.append(item['member_name'])
return member_names
def get_dict_output_field_name(cls,field_name, member_name):
for item in cls.mapping:
if item['field_name'] == field_name and item['member_name'] == member_name:
return item['output_field_name']
return None
class JiraJQL(BaseQueryRunner):
noop_query = '{"queryType": "count"}'
@ -135,7 +178,7 @@ class JiraJQL(BaseQueryRunner):
try:
query = json.loads(query)
query_type = query.pop('queryType', 'select')
field_mapping = query.pop('fieldMapping', {})
field_mapping = FieldMapping(query.pop('fieldMapping', {}))
if query_type == 'count':
query['maxResults'] = 1

View File

@ -0,0 +1,104 @@
from unittest import TestCase
from redash.query_runner.jql import FieldMapping, parse_issue
class TestFieldMapping(TestCase):
def test_empty(self):
field_mapping = FieldMapping({})
self.assertEqual(field_mapping.get_output_field_name('field1'), 'field1')
self.assertEqual(field_mapping.get_dict_output_field_name('field1','member1'), None)
self.assertEqual(field_mapping.get_dict_members('field1'), [])
def test_with_mappings(self):
field_mapping = FieldMapping({
'field1': 'output_name_1',
'field2.member1': 'output_name_2',
'field2.member2': 'output_name_3'
})
self.assertEqual(field_mapping.get_output_field_name('field1'), 'output_name_1')
self.assertEqual(field_mapping.get_dict_output_field_name('field1','member1'), None)
self.assertEqual(field_mapping.get_dict_members('field1'), [])
self.assertEqual(field_mapping.get_output_field_name('field2'), 'field2')
self.assertEqual(field_mapping.get_dict_output_field_name('field2','member1'), 'output_name_2')
self.assertEqual(field_mapping.get_dict_output_field_name('field2','member2'), 'output_name_3')
self.assertEqual(field_mapping.get_dict_output_field_name('field2','member3'), None)
self.assertEqual(field_mapping.get_dict_members('field2'), ['member1','member2'])
class TestParseIssue(TestCase):
issue = {
'key': 'KEY-1',
'fields': {
'string_field': 'value1',
'int_field': 123,
'string_list_field': ['value1','value2'],
'dict_field': {'member1':'value1','member2': 'value2'},
'dict_list_field': [
{'member1':'value1a','member2': 'value2a'},
{'member1':'value1b','member2': 'value2b'}
],
'dict_legacy': {'key':'legacyKey','name':'legacyName','dict_legacy':'legacyValue'},
'watchers': {'watchCount':10}
}
}
def test_no_mapping(self):
result = parse_issue(self.issue, FieldMapping({}))
self.assertEqual(result['key'], 'KEY-1')
self.assertEqual(result['string_field'], 'value1')
self.assertEqual(result['int_field'], 123)
self.assertEqual(result['string_list_field'], 'value1,value2')
self.assertEqual('dict_field' in result, False)
self.assertEqual('dict_list_field' in result, False)
self.assertEqual(result['dict_legacy'], 'legacyValue')
self.assertEqual(result['dict_legacy_key'], 'legacyKey')
self.assertEqual(result['dict_legacy_name'], 'legacyName')
self.assertEqual(result['watchers'], 10)
def test_mapping(self):
result = parse_issue(self.issue, FieldMapping({
'string_field': 'string_output_field',
'string_list_field': 'string_output_list_field',
'dict_field.member1': 'dict_field_1',
'dict_field.member2': 'dict_field_2',
'dict_list_field.member1': 'dict_list_field_1',
'dict_legacy.key': 'dict_legacy',
'watchers.watchCount': 'watchCount',
}))
self.assertEqual(result['key'], 'KEY-1')
self.assertEqual(result['string_output_field'], 'value1')
self.assertEqual(result['int_field'], 123)
self.assertEqual(result['string_output_list_field'], 'value1,value2')
self.assertEqual(result['dict_field_1'], 'value1')
self.assertEqual(result['dict_field_2'], 'value2')
self.assertEqual(result['dict_list_field_1'], 'value1a,value1b')
self.assertEqual(result['dict_legacy'], 'legacyKey')
self.assertEqual('dict_legacy_key' in result, False)
self.assertEqual('dict_legacy_name' in result, False)
self.assertEqual('watchers' in result, False)
self.assertEqual(result['watchCount'], 10)
def test_mapping_nonexisting_field(self):
result = parse_issue(self.issue, FieldMapping({
'non_existing_field': 'output_name1',
'dict_field.non_existing_member': 'output_name2',
'dict_list_field.non_existing_member': 'output_name3'
}))
self.assertEqual(result['key'], 'KEY-1')
self.assertEqual(result['string_field'], 'value1')
self.assertEqual(result['int_field'], 123)
self.assertEqual(result['string_list_field'], 'value1,value2')
self.assertEqual('dict_field' in result, False)
self.assertEqual('dict_list_field' in result, False)
self.assertEqual(result['dict_legacy'], 'legacyValue')
self.assertEqual(result['dict_legacy_key'], 'legacyKey')
self.assertEqual(result['dict_legacy_name'], 'legacyName')
self.assertEqual(result['watchers'], 10)