mirror of
https://github.com/valitydev/atomic-threat-coverage.git
synced 2024-11-07 01:55:21 +00:00
Fix bug, add comments and formatting
This commit is contained in:
parent
e93ee791ed
commit
3fe6fed0dc
@ -43,47 +43,61 @@ def main(**kwargs):
|
|||||||
lp_list = load_yamls(kwargs['lp_path'])[0]
|
lp_list = load_yamls(kwargs['lp_path'])[0]
|
||||||
alerts, path_to_alerts = load_yamls(kwargs['alerts_path'])
|
alerts, path_to_alerts = load_yamls(kwargs['alerts_path'])
|
||||||
result = []
|
result = []
|
||||||
|
|
||||||
|
# Iterate through alerts and pathes to them
|
||||||
for alert, path in zip(alerts, path_to_alerts):
|
for alert, path in zip(alerts, path_to_alerts):
|
||||||
threats = [tag for tag in alert['tags'] if tag.startswith('attack')]
|
threats = [tag for tag in alert['tags'] if tag.startswith('attack')]
|
||||||
# For every dataNeeded file we do that - for every DN_ID in alert check if its in DataNeeded Title
|
# For every dataNeeded file we do that - for every DN_ID in alert check if its in DataNeeded Title
|
||||||
if alert.get('additions') is None:
|
if alert.get('additions') is None:
|
||||||
alert['additions'] = [alert]
|
alert['additions'] = [alert]
|
||||||
for addition in alert['additions']:
|
for addition in alert['additions']:
|
||||||
eventID = str(addition['detection']['selection']['EventID'])
|
pass
|
||||||
dn_titles = main_dn_calculatoin_func(path)
|
|
||||||
alert_dns = [data for data in dn_list if data['title'] in dn_titles]
|
|
||||||
for dn in alert_dns:
|
dn_titles = main_dn_calculatoin_func(path)
|
||||||
logging_policy = [l for l in lp_list if l['title'] in dn['loggingpolicy'] ]
|
alert_dns = [data for data in dn_list if data['title'] in dn_titles]
|
||||||
if isinstance(logging_policy, list):
|
logging_policies = []
|
||||||
if len(logging_policy) > 0:
|
|
||||||
logging_policy = logging_policy[0]
|
|
||||||
else:
|
for dn in alert_dns:
|
||||||
logging_policy = {'title': "-", 'eventID': [-1,]}
|
# If there are logging policies in DN that we havent added yet - add them
|
||||||
dn['loggingpolicy'] = logging_policy
|
logging_policies.extend([l for l in lp_list if l['title'] in dn['loggingpolicy'] and l not in logging_policies ])
|
||||||
tactics = [f'{ta_mapping[threat][1]}: {ta_mapping[threat][0]}' for threat in threats if threat in ta_mapping.keys() ]
|
# If there are no logging policices at all - make an empty one just to make one row in csv
|
||||||
techniques = [threat for threat in threats if threat.startswith('attack.t')]
|
if not isinstance(logging_policies, list) or len(logging_policies) == 0:
|
||||||
for tactic in tactics:
|
logging_policies = [{'title': "-", 'eventID': [-1, ]}]
|
||||||
for technique in techniques:
|
|
||||||
for dn in alert_dns:
|
# Name tactics and threats correctly
|
||||||
lp = dn['loggingpolicy']
|
tactics = [f'{ta_mapping[threat][1]}: {ta_mapping[threat][0]}' for threat in threats if threat in ta_mapping.keys() ]
|
||||||
|
techniques = [threat for threat in threats if threat.startswith('attack.t')]
|
||||||
|
|
||||||
|
# Append alert data to a result array
|
||||||
|
for tactic in tactics:
|
||||||
|
for technique in techniques:
|
||||||
|
for dn in alert_dns:
|
||||||
|
for lp in logging_policies:
|
||||||
for field in dn['fields']:
|
for field in dn['fields']:
|
||||||
for eventID in lp['eventID']:
|
for eventID in lp['eventID']:
|
||||||
eventID = str(eventID)
|
eventID = str(eventID)
|
||||||
result.append(
|
result.append(
|
||||||
[tactic,technique, alert['title'],field,
|
[tactic,technique, alert['title'],field,
|
||||||
dn['platform'],dn['type'],dn['channel'],eventID, lp['title'].replace('\n','')])
|
dn['platform'],dn['type'],dn['channel'],eventID, lp['title'].replace('\n','')])
|
||||||
|
|
||||||
|
# Write a result array as csv
|
||||||
with open('../analytics.csv', 'w', newline='') as csvfile:
|
with open('../analytics.csv', 'w', newline='') as csvfile:
|
||||||
alertswriter = csv.writer(csvfile, delimiter=',') # maybe need some quoting
|
alertswriter = csv.writer(csvfile, delimiter=',') # maybe need some quoting
|
||||||
alertswriter.writerow(['tactic','technique', 'title', 'field', 'dn_PLATFORM', 'dn_TYPE',
|
alertswriter.writerow(['tactic','technique', 'title', 'field', 'dn_PLATFORM', 'dn_TYPE',
|
||||||
'dn_channel', 'dn_event_id', 'logging_policy_title '])
|
'dn_channel', 'dn_event_id', 'logging_policy_title '])
|
||||||
for row in result:
|
for row in result:
|
||||||
alertswriter.writerow(row)
|
alertswriter.writerow(row)
|
||||||
|
|
||||||
|
|
||||||
print('Export succesfull')
|
print('Export succesfull')
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
opts, args = getopt.getopt(sys.argv[1:], "", ["alerts_path=", "dataneeded_path=", "loggingpolicies_path=", "help"])
|
opts, args = getopt.getopt(sys.argv[1:], "", ["alerts_path=", "dataneeded_path=", "loggingpolicies_path=", "help"])
|
||||||
|
|
||||||
# complex check in case '--help' would be in some path
|
# complex check in case '--help' would be in some path
|
||||||
if len(sys.argv) > 1 and '--help' in sys.argv[1] and len(sys.argv[1]) < 7:
|
if len(sys.argv) > 1 and '--help' in sys.argv[1] and len(sys.argv[1]) < 7:
|
||||||
print(HELP_MESSAGE)
|
print(HELP_MESSAGE)
|
||||||
|
Loading…
Reference in New Issue
Block a user