OTXP-3825 - Allow more customization with replace_pulse_indicators, amend tests

This commit is contained in:
Rusty Brooks 2019-12-05 13:41:56 -06:00
parent 3443af6453
commit 73a2755dac
2 changed files with 24 additions and 18 deletions

View File

@ -541,6 +541,7 @@ class OTXv2(object):
:return: Return the new pulse
"""
expire_date = datetime.datetime.utcnow().isoformat()
current_indicators = {x['indicator']: x for x in self.get_pulse_indicators(pulse_id)}
@ -551,8 +552,13 @@ class OTXv2(object):
if indicator['indicator'] not in current_indicators:
indicators_to_add.append(indicator)
else:
this_ind = current_indicators[indicator['indicator']]
indicators_to_amend.append({"id": this_ind["id"], "expiration": "", "title": "", "is_active": 1})
indicator.update({
'id': current_indicators[indicator['indicator']]['id'],
'title': indicator.get('title', ''),
'expiration': indicator.get('expiration', ''),
'is_active': 1,
})
indicators_to_amend.append(indicator)
del current_indicators[indicator['indicator']]
for indicator in current_indicators.values():

View File

@ -382,12 +382,12 @@ class TestPulseCreate(TestOTXv2):
response = self.otx.create_pulse(name=name, public=False, indicators=indicator_list)
pulse_id = response['id']
check_fields = ['indicator', 'type', 'expiration', 'is_active']
check_fields = ['indicator', 'type', 'expiration', 'is_active', 'title']
expected = [
{'indicator': u'bar@alienvault.com', 'type': u'email', 'expiration': None, 'is_active': 1},
{'indicator': u'foo@alienvault.com', 'type': u'email', 'expiration': None, 'is_active': 1},
{'indicator': u'one.com', 'type': u'domain', 'expiration': None, 'is_active': 1},
{'indicator': u'two.com', 'type': u'domain', 'expiration': None, 'is_active': 1},
{'indicator': u'bar@alienvault.com', 'type': u'email', 'expiration': None, 'is_active': 1, 'title': u''},
{'indicator': u'foo@alienvault.com', 'type': u'email', 'expiration': None, 'is_active': 1, 'title': u''},
{'indicator': u'one.com', 'type': u'domain', 'expiration': None, 'is_active': 1, 'title': u''},
{'indicator': u'two.com', 'type': u'domain', 'expiration': None, 'is_active': 1, 'title': u''},
]
actual = sorted([{f: x[f] for f in check_fields} for x in self.otx.get_pulse_indicators(pulse_id)], key=lambda x: x['indicator'])
self.assertEqual(expected, actual)
@ -401,11 +401,11 @@ class TestPulseCreate(TestOTXv2):
]
self.otx.replace_pulse_indicators(pulse_id, new_indicators)
expected = [
{'indicator': u'bar@alienvault.com', 'type': u'email', 'expiration': None, 'is_active': 1},
{'indicator': u'foo@alienvault.com', 'type': u'email', 'expiration': None, 'is_active': 1},
{'indicator': u'one.com', 'type': u'domain', 'expiration': 'today', 'is_active': 1},
{'indicator': u'three.com', 'type': u'domain', 'expiration': None, 'is_active': 1},
{'indicator': u'two.com', 'type': u'domain', 'expiration': None, 'is_active': 1},
{'indicator': u'bar@alienvault.com', 'type': u'email', 'expiration': None, 'is_active': 1, 'title': u''},
{'indicator': u'foo@alienvault.com', 'type': u'email', 'expiration': None, 'is_active': 1, 'title': u''},
{'indicator': u'one.com', 'type': u'domain', 'expiration': 'today', 'is_active': 1, 'title': u'Expired'},
{'indicator': u'three.com', 'type': u'domain', 'expiration': None, 'is_active': 1, 'title': u''},
{'indicator': u'two.com', 'type': u'domain', 'expiration': None, 'is_active': 1, 'title': u''},
]
actual = sorted(
[{f: x[f] for f in check_fields} for x in self.otx.get_pulse_indicators(pulse_id)],
@ -419,18 +419,18 @@ class TestPulseCreate(TestOTXv2):
# add one.com back, which should reactivate it, and leave two.com out which should expire it
new_indicators = [
{'indicator': "one.com", 'type': 'domain'},
{'indicator': "one.com", 'type': 'domain', 'title': 'new title'},
{'indicator': "three.com", 'type': 'domain'},
{'indicator': "foo@alienvault.com", 'type': 'email'},
{'indicator': "bar@alienvault.com", 'type': 'email'},
]
self.otx.replace_pulse_indicators(pulse_id, new_indicators)
expected = [
{'indicator': u'bar@alienvault.com', 'type': u'email', 'expiration': None, 'is_active': 1},
{'indicator': u'foo@alienvault.com', 'type': u'email', 'expiration': None, 'is_active': 1},
{'indicator': u'one.com', 'type': u'domain', 'expiration': None, 'is_active': 1},
{'indicator': u'three.com', 'type': u'domain', 'expiration': None, 'is_active': 1},
{'indicator': u'two.com', 'type': u'domain', 'expiration': 'today', 'is_active': 1},
{'indicator': u'bar@alienvault.com', 'type': u'email', 'expiration': None, 'is_active': 1, 'title': u''},
{'indicator': u'foo@alienvault.com', 'type': u'email', 'expiration': None, 'is_active': 1, 'title': u''},
{'indicator': u'one.com', 'type': u'domain', 'expiration': None, 'is_active': 1, 'title': u'new title'},
{'indicator': u'three.com', 'type': u'domain', 'expiration': None, 'is_active': 1, 'title': u''},
{'indicator': u'two.com', 'type': u'domain', 'expiration': 'today', 'is_active': 1, 'title': u'Expired'},
]
actual = sorted(
[{f: x[f] for f in check_fields} for x in self.otx.get_pulse_indicators(pulse_id)],