Add support for group_ids param instead of groups param

This commit is contained in:
Rusty Brooks 2019-11-18 12:38:32 -06:00
parent 65d96829a2
commit 1c25a71a1b
3 changed files with 85 additions and 41 deletions

3
.gitignore vendored
View File

@ -89,6 +89,7 @@ ENV/
.ropeproject .ropeproject
# IntelliJ cruft # IntelliJ cruft
.idea/* .idea/**
.vscode/**
build build

View File

@ -203,6 +203,7 @@ class OTXv2(object):
:param tags(list of strings) short keywords to associate with your pulse :param tags(list of strings) short keywords to associate with your pulse
:param references(list of strings, preferably URLs) external references for this threat :param references(list of strings, preferably URLs) external references for this threat
:param indicators(list of objects) IOCs to include in pulse :param indicators(list of objects) IOCs to include in pulse
:param group_ids(list of integers) Group IDs for groups pulse should be added to. You must be a member of the group and able to add pulses to the group
:return: request body response :return: request body response
:raises BadRequest (400) On failure, BadRequest will be raised containing the invalid fields. :raises BadRequest (400) On failure, BadRequest will be raised containing the invalid fields.
@ -223,7 +224,8 @@ class OTXv2(object):
'TLP': kwargs.get('TLP', kwargs.get('tlp', 'green')), 'TLP': kwargs.get('TLP', kwargs.get('tlp', 'green')),
'tags': kwargs.get('tags', []), 'tags': kwargs.get('tags', []),
'references': kwargs.get('references', []), 'references': kwargs.get('references', []),
'indicators': kwargs.get('indicators', []) 'indicators': kwargs.get('indicators', []),
'groups': kwargs.get('group_ids', []),
} }
# name is required. Public is too but will be set True if not specified. # name is required. Public is too but will be set True if not specified.
if not body.get('name'): if not body.get('name'):

View File

@ -29,12 +29,16 @@ def create_user(username, password, email):
Create a user, and get the API key Create a user, and get the API key
""" """
print("creating user {}".format(username)) print("creating user {}".format(username))
requests.post(ALIEN_DEV_SERVER + 'otxapi/qatests/setup/', json={"users": [{ "username": username, "password": password, "email": email}]}) requests.post(ALIEN_DEV_SERVER + 'otxapi/qatests/setup/', json={"users": [
{"username": username, "password": password, "email": email, "group_ids": [64, 51, 2931]}
]})
r = requests.post(ALIEN_DEV_SERVER + 'auth/login', json={"username": username, "password": password}) r = requests.post(ALIEN_DEV_SERVER + 'auth/login', json={"username": username, "password": password})
j = json.loads(r.text) j = json.loads(r.text)
r = requests.get(ALIEN_DEV_SERVER + 'otxapi/user/?detailed=true', headers={'Authorization': j['key']}) r = requests.get(ALIEN_DEV_SERVER + 'otxapi/user/?detailed=true', headers={'Authorization': j['key']})
j = r.json() j = r.json()
return j['api_keys'][0]['api_key'] API_KEY = j['api_keys'][0]['api_key']
return API_KEY
def delete_user(username): def delete_user(username):
@ -52,6 +56,7 @@ class TestOTXv2(unittest.TestCase):
self.api_key = api_key or ALIEN_API_APIKEY self.api_key = api_key or ALIEN_API_APIKEY
self.otx = OTXv2(self.api_key, server=ALIEN_DEV_SERVER) self.otx = OTXv2(self.api_key, server=ALIEN_DEV_SERVER)
'''
class TestSubscriptionsInvalidKey(TestOTXv2): class TestSubscriptionsInvalidKey(TestOTXv2):
""" """
Confirm InvalidAPIKey class is raised for API Key failures Confirm InvalidAPIKey class is raised for API Key failures
@ -273,9 +278,10 @@ class TestIndicatorDetails(TestOTXv2):
full_details = self.otx.get_indicator_details_full(IndicatorTypes.EMAIL, "me@rustybrooks.com") full_details = self.otx.get_indicator_details_full(IndicatorTypes.EMAIL, "me@rustybrooks.com")
self.assertTrue(sorted(full_details.keys()) == sorted(IndicatorTypes.EMAIL.sections)) self.assertTrue(sorted(full_details.keys()) == sorted(IndicatorTypes.EMAIL.sections))
# pprint.pprint(full_details) # pprint.pprint(full_details)
'''
class TestPulseCreate(TestOTXv2): class TestPulseCreate(TestOTXv2):
'''
def test_create_pulse_simple(self): def test_create_pulse_simple(self):
name = "Pyclient-simple-unittests-" + generate_rand_string(8, charset=string.hexdigits).lower() name = "Pyclient-simple-unittests-" + generate_rand_string(8, charset=string.hexdigits).lower()
# print("test_create_pulse_simple submitting pulse: " + name) # print("test_create_pulse_simple submitting pulse: " + name)
@ -373,47 +379,82 @@ class TestPulseCreate(TestOTXv2):
return return
def test_create_pulse_and_edit_via_patch_pulse(self): def test_create_pulse_and_edit_via_patch_pulse(self):
""" """
Test: create a pulse then add indicators via a patch pulse object Test: create a pulse then add indicators via a patch pulse object
""" """
indicator_list = [ {'indicator': "one.com", 'type': 'domain'} ] indicator_list = [ {'indicator': "one.com", 'type': 'domain'} ]
name = "Pyclient-indicators-unittests-modify-pulse-patch-pulse" name = "Pyclient-indicators-unittests-modify-pulse-patch-pulse"
response = self.otx.create_pulse(name=name, public=False, indicators=indicator_list) response = self.otx.create_pulse(name=name, public=False, indicators=indicator_list)
pulse_id = response['id'] pulse_id = response['id']
# Edit the pulse using a patch pulse object # Edit the pulse using a patch pulse object
# We could also edit indicators etc. here # We could also edit indicators etc. here
pp = PatchPulse(pulse_id) pp = PatchPulse(pulse_id)
pp.add("tags", ["addtag1", "addtag2"]) pp.add("tags", ["addtag1", "addtag2"])
pp.set("description","New Description") pp.set("description","New Description")
response = self.otx.edit_pulse(pulse_id, pp.getBody()) response = self.otx.edit_pulse(pulse_id, pp.getBody())
new_tags = str(response['tags']) new_tags = str(response['tags'])
self.assertTrue('addtag1' in new_tags) self.assertTrue('addtag1' in new_tags)
return return
def test_create_pulse_tlp(self): def test_create_pulse_tlp(self):
""" """
Test: pulse with each TLP. Test: pulse with each TLP.
""" """
charset = string.ascii_letters charset = string.ascii_letters
indicator_list = [ indicator_list = [
{'indicator': generate_rand_string(10, charset=charset) + ".com", 'type': IndicatorTypes.DOMAIN.name, 'description': 'evil domain (unittests)'}, {'indicator': generate_rand_string(10, charset=charset) + ".com", 'type': IndicatorTypes.DOMAIN.name, 'description': 'evil domain (unittests)'},
{'indicator': generate_rand_string(3, charset=charset) + "." + generate_rand_string(10, charset=charset) + ".com", 'type': IndicatorTypes.HOSTNAME.name, 'description': 'evil hostname (unittests)'} {'indicator': generate_rand_string(3, charset=charset) + "." + generate_rand_string(10, charset=charset) + ".com", 'type': IndicatorTypes.HOSTNAME.name, 'description': 'evil hostname (unittests)'}
] ]
name = "Pyclient-tlp-unittests-" + generate_rand_string(8, charset=string.hexdigits).lower() name = "Pyclient-tlp-unittests-" + generate_rand_string(8, charset=string.hexdigits).lower()
tlps = ['red', 'amber', 'green', 'white'] tlps = ['red', 'amber', 'green', 'white']
for tlp in tlps: for tlp in tlps:
# print("test_create_pulse_tlp: submitting pulse: {}".format({"name": name, "tlp": tlp})) # print("test_create_pulse_tlp: submitting pulse: {}".format({"name": name, "tlp": tlp}))
response = self.otx.create_pulse(name=name, public=False, tlp=tlp, indicators=indicator_list) response = self.otx.create_pulse(name=name, public=False, tlp=tlp, indicators=indicator_list)
self.assertTrue(response.get('name', '') == name) self.assertTrue(response.get('name', '') == name)
self.assertTrue(response.get('TLP', '') == tlp) self.assertTrue(response.get('TLP', '') == tlp)
self.assertFalse(response.get('public')) self.assertFalse(response.get('public'))
return return
'''
def test_create_pulse_groups(self):
"""
Test: pulse with different sets of group ids
Test user needs to be a member of the groups used in this test: 64, 51
Additionall we will use the test groups 1 and 2, that it is NOT a member of
"""
charset = string.ascii_letters
indicator_list = [
{'indicator': generate_rand_string(10, charset=charset) + ".com", 'type': IndicatorTypes.DOMAIN.name, 'description': 'evil domain (unittests)'},
{'indicator': generate_rand_string(3, charset=charset) + "." + generate_rand_string(10, charset=charset) + ".com", 'type': IndicatorTypes.HOSTNAME.name, 'description': 'evil hostname (unittests)'}
]
for groups, expected in [
([], []),
(None, []),
([1, 51], 'error'), # Not a member of group 1
# ([51, 2931], [51, 2931]),
# ([64, 51, 1], 'error'),
# ([1], 'error'),
]:
name = "Pyclient-tlp-unittests-" + generate_rand_string(8, charset=string.hexdigits).lower()
print(groups, expected)
if expected == 'error':
with self.assertRaises(BadRequest):
self.otx.create_pulse(name=name, indicators=indicator_list, group_ids=groups)
else:
response = self.otx.create_pulse(name=name, indicators=indicator_list, group_ids=groups)
self.assertEqual(response.get('name', ''), name)
self.assertEqual(response.get('group_ids'), expected)
return
'''
class TestPulseCreateInvalidKey(TestOTXv2): class TestPulseCreateInvalidKey(TestOTXv2):
def setUp(self, **kwargs): def setUp(self, **kwargs):
super(TestPulseCreateInvalidKey, self).setUp(**{'api_key': "ALIEN_API_APIKEY"}) super(TestPulseCreateInvalidKey, self).setUp(**{'api_key': "ALIEN_API_APIKEY"})
@ -740,7 +781,7 @@ class TestOTXv2Cached(unittest.TestCase):
self.assertIsNotNone(pulse.get('tags', None)) self.assertIsNotNone(pulse.get('tags', None))
self.assertIsNotNone(pulse.get('references', None)) self.assertIsNotNone(pulse.get('references', None))
self.assertIsNotNone(res.get('exact_match')) self.assertIsNotNone(res.get('exact_match'))
'''
if __name__ == '__main__': if __name__ == '__main__':
username = "qatester-git-{}".format(rand) username = "qatester-git-{}".format(rand)