Merge branch 'release/1.5.0'

This commit is contained in:
Nabil Adouani 2017-07-05 13:30:27 +02:00
commit 0ced7bec60
199 changed files with 1304 additions and 530 deletions

View File

@ -1,6 +1,6 @@
{
"name": "Abuse_Finder",
"version": "1.0",
"version": "2.0",
"author": "CERT-BDF",
"url": "https://github.com/CERT-BDF/Cortex-Analyzers",
"license": "AGPL-V3",
@ -10,7 +10,7 @@
"max_tlp":3,
"service":""
},
"description": "Use CERT-SG's Abuse Finder to find the abuse contact associated with domain names, URLs, IPs and email addresses",
"dataTypeList": ["ip", "domain", "url","email"],
"description": "Find abuse contacts associated with domain names, URLs, IPs and email addresses",
"dataTypeList": ["ip", "domain", "url", "mail"],
"command": "Abuse_Finder/abusefinder.py"
}

View File

@ -14,6 +14,14 @@ logging.getLogger("tldextract").setLevel(logging.CRITICAL)
class AbuseFinderAnalyzer(Analyzer):
def summary(self, raw):
taxonomies = []
if raw['abuse_finder'] and raw['abuse_finder'].get('abuse'):
for abuse in raw['abuse_finder']['abuse']:
taxonomies.append(self.build_taxonomy("info", "Abuse_Finder", "Address", abuse))
return {"taxonomies": taxonomies}
def abuse(self):
if self.data_type == "ip":
return ip_abuse(self.getData())

View File

@ -0,0 +1,12 @@
{
"name": "CERTatPassiveDNS",
"author": "Nils Kuhnert, CERT-Bund",
"license": "AGPL-V3",
"url": "https://github.com/BSI-CERT-Bund/cortex-analyzers",
"version": "2.0",
"baseConfig": "CERTatPassiveDNS",
"config": {},
"description": "Checks CERT.at Passive DNS for a given domain, API Key via cert.at.",
"dataTypeList": ["domain", "fqdn"],
"command": "CERTatPassiveDNS/certat_passivedns.py"
}

View File

@ -0,0 +1,37 @@
#!/usr/bin/env python3
from cortexutils.analyzer import Analyzer
from whois_wrapper import query
class CERTatPassiveDNSAnalyzer(Analyzer):
"""Very simple passive dns wrapper for pdns.cert.at. Needs no credentials because access is controlled through
firewall rules. If you want to get access, you have to contact CERT.AT, but:
CERT.AT pDNS is not a public service. It is only available for national / governmental CERTs in good standing with
CERT.AT. For access, you have to get in contact with CERT.AT.
"""
def __init__(self):
Analyzer.__init__(self)
self.limit = self.get_param('config.limit', '100')
def run(self):
self.report({'results': query(self.getData(), int(self.limit))})
def summary(self, raw):
taxonomies = []
level = "info"
namespace = "CERT.at"
predicate = "PassiveDNS"
results = raw.get('results')
r = len(results)
if r == 0 or r == 1:
value = "\"{} hit\"".format(r)
else:
value = "\"{} hits\"".format(r)
taxonomies.append(self.build_taxonomy(level, namespace, predicate, value))
return {"taxonomies": taxonomies}
if __name__ == '__main__':
CERTatPassiveDNSAnalyzer().run()

View File

@ -0,0 +1 @@
cortexutils

View File

@ -0,0 +1,2 @@
#!/usr/bin/env bash
whois -h pdns.cert.at " $1"

View File

@ -0,0 +1,55 @@
#!/usr/bin/env python3
from re import findall
from subprocess import check_output
def __query(domain, limit=100):
"""Using the shell script to query pdns.cert.at is a hack, but python raises an error every time using subprocess
functions to call whois. So this hack is avoiding calling whois directly. Ugly, but works.
:param domain: The domain pdns is queried with.
:type domain: str
:param limit: Maximum number of results
:type limit: int
:returns: str -- Console output from whois call.
:rtype: str
"""
s = check_output(['./whois.sh', '--limit {} {}'.format(limit, domain)], universal_newlines=True)
return s
def __process_results(results):
"""Processes the result from __query to get valid json from every entry.
:param results: Results from __query
:type results: str
:returns: python list of dictionaries containing the relevant results.
:rtype: list
"""
result_list = []
# Splts the result and cuts first and last dataset which are comments
split = results.split(sep='\n\n')[1:-1]
for entry in split:
entry_dict = {}
for value in entry.split('\n'):
if len(value) < 1:
continue
(desc, val) = value.split(': ')
entry_dict[desc.replace('-', '')] = val.strip(' ')
result_list.append(entry_dict)
return result_list
def query(domain: str, limit: int=100):
"""Queries and returns a python dict with results.
:param domain: domain that should be queried
:type domain: str
:param limit: number of entries to return
:type limit: int
:returns: query results
:rtype: list
"""
return __process_results(__query(domain, limit))

View File

@ -3,7 +3,7 @@
"author": "Nils Kuhnert, CERT-Bund",
"license": "AGPL-V3",
"url": "https://github.com/BSI-CERT-Bund/cortex-analyzers",
"version": "1.0",
"version": "2.0",
"baseConfig": "CIRCLPassiveDNS",
"config": {},
"description": "Check CIRCL's Passive DNS for a given domain or URL",

View File

@ -37,7 +37,25 @@ class CIRCLPassiveDNSAnalyzer(Analyzer):
return clean_result
def summary(self, raw):
return {'hits': len(raw.get('results'))}
taxonomies = []
level = "info"
namespace = "CIRCL"
predicate = "PassiveDNS"
if ("results" in raw):
r = len(raw.get('results'))
if r == 0 or r == 1:
value = "\"{} record\"".format(r)
else:
value = "\"{} records\"".format(r)
taxonomies.append(self.build_taxonomy(level, namespace, predicate, value))
return {"taxonomies": taxonomies}
def run(self):
query = ''

View File

@ -3,7 +3,7 @@
"author": "Nils Kuhnert, CERT-Bund",
"license": "AGPL-V3",
"url": "https://github.com/BSI-CERT-Bund/cortex-analyzers",
"version": "1.0",
"version": "2.0",
"baseConfig": "CIRCLPassiveSSL",
"config": {},
"description": "Check CIRCL's Passive SSL for a given IP address or a X509 certificate hash",

View File

@ -66,17 +66,27 @@ class CIRCLPassiveSSLAnalyzer(Analyzer):
return {'query': cquery,
'cert': cfetch}
def summary(self, raw):
if raw.get('cert', None):
result = {'num_ips_used_cert': raw.get('query').get('hits')}
# Not available for all certificates
if raw.get('cert').get('icsi', None):
result['validated'] = raw.get('cert').get('icsi').get('validated')
result['lastseen'] = raw.get('cert').get('icsi').get('last_seen')
return result
def summary(self, raw):
taxonomies = []
level = "info"
namespace = "CIRCL"
predicate = "PassiveSSL"
if (self.data_type == 'hash') and ("query" in raw):
r = raw.get('query', 0).get('hits', 0)
if (self.data_type == 'ip') and ("certificates" in raw):
r = len(raw['certificates'])
if r == 0 or r == 1:
value = "\"{} record\"".format(r)
else:
return {'num_certs_by_ip': len(raw.get(self.getData()).get('certificates'))}
value = "\"{} records\"".format(r)
taxonomies.append(self.build_taxonomy(level, namespace, predicate, value))
return {"taxonomies": taxonomies}
def run(self):
if self.data_type == 'certificate_hash' or self.data_type == 'hash':

View File

@ -1,6 +1,6 @@
{
"name": "DNSDB_DomainName",
"version": "1.1",
"version": "2.0",
"author": "CERT-BDF",
"url": "https://github.com/CERT-BDF/Cortex-Analyzers",
"license": "AGPL-V3",

View File

@ -1,6 +1,6 @@
{
"name": "DNSDB_IPHistory",
"version": "1.0",
"version": "2.0",
"author": "CERT-BDF",
"url": "https://github.com/CERT-BDF/Cortex-Analyzers",
"license": "AGPL-V3",

View File

@ -1,6 +1,6 @@
{
"name": "DNSDB_NameHistory",
"version": "1.0",
"version": "2.0",
"author": "CERT-BDF",
"url": "https://github.com/CERT-BDF/Cortex-Analyzers",
"license": "AGPL-V3",

View File

@ -40,9 +40,24 @@ class DnsDbAnalyzer(Analyzer):
return row
def summary(self, raw):
return {
"records": len(raw["records"])
}
# taxonomy = {"level": "info", "namespace": "Farsight", "predicate": "DNSDB", "value": 0}
taxonomies = []
level = "info"
namespace = "Farsight"
predicate = "DNSDB"
if ("records" in raw):
r = len(raw["records"])
if r == 0 or r == 1:
value = "\"{} record\"".format(r)
else:
value = "\"{} records\"".format(r)
taxonomies.append(self.build_taxonomy(level, namespace,predicate,value))
return {'taxonomies': taxonomies}
def run(self):
try:

View File

@ -1,6 +1,6 @@
{
"name": "DomainTools_ReverseIP",
"version": "1.0",
"version": "2.0",
"author": "CERT-BDF",
"url": "https://github.com/CERT-BDF/Cortex-Analyzers",
"license": "AGPL-V3",

View File

@ -1,6 +1,6 @@
{
"name": "DomainTools_ReverseNameServer",
"version": "1.0",
"version": "2.0",
"author": "CERT-BDF",
"url": "https://github.com/CERT-BDF/Cortex-Analyzers",
"license": "AGPL-V3",

View File

@ -1,6 +1,6 @@
{
"name": "DomainTools_ReverseWhois",
"version": "1.0",
"version": "2.0",
"author": "CERT-BDF",
"url": "https://github.com/CERT-BDF/Cortex-Analyzers",
"license": "AGPL-V3",

View File

@ -1,6 +1,6 @@
{
"name": "DomainTools_WhoisHistory",
"version": "1.0",
"version": "2.0",
"author": "CERT-BDF",
"url": "https://github.com/CERT-BDF/Cortex-Analyzers",
"license": "AGPL-V3",

View File

@ -1,6 +1,6 @@
{
"name": "DomainTools_WhoisLookup",
"version": "1.0",
"version": "2.0",
"author": "CERT-BDF",
"url": "https://github.com/CERT-BDF/Cortex-Analyzers",
"license": "AGPL-V3",

View File

@ -1,6 +1,6 @@
{
"name": "DomainTools_WhoisLookup_IP",
"version": "1.0",
"version": "2.0",
"author": "CERT-BDF",
"url": "https://github.com/CERT-BDF/Cortex-Analyzers",
"license": "AGPL-V3",

View File

@ -21,35 +21,58 @@ class DomainToolsAnalyzer(Analyzer):
'config.service', None, 'Service parameter is missing')
def summary(self, raw):
result = {
r = {
"service": self.service,
"dataType": self.data_type
}
if("ip_addresses" in raw):
result["ip"] = {
r["ip"] = {
"address": raw["ip_addresses"]["ip_address"],
"domain_count": raw["ip_addresses"]["domain_count"]
}
if("domain_count" in raw):
result["domain_count"] = {
r["domain_count"] = {
"current": raw["domain_count"]["current"],
"historic": raw["domain_count"]["historic"]
}
if("registrant" in raw):
result["registrant"] = raw["registrant"]
r["registrant"] = raw["registrant"]
elif("response" in raw and "registrant" in raw["response"]):
result["registrant"] = raw["response"]["registrant"]
r["registrant"] = raw["response"]["registrant"]
if("parsed_whois" in raw):
result["registrar"] = raw["parsed_whois"]["registrar"]["name"]
r["registrar"] = raw["parsed_whois"]["registrar"]["name"]
#
if("name_server" in raw):
result["name_server"] = raw["name_server"]["hostname"]
result["domain_count"] = raw["name_server"]["total"]
r["name_server"] = raw["name_server"]["hostname"]
r["domain_count"] = raw["name_server"]["total"]
taxonomies = []
# Prepare predicate and value for each service
if r["service"] == "reverse-ip":
taxonomies.append(self.build_taxonomy("info", "DT", "Reverse_IP","\"{}, {} domains\"".format(r["ip"]["address"], r["ip"]["domain_count"])))
if r["service"] == "name-server-domains":
taxonomies.append(self.build_taxonomy("info", "DT", "Reverse_Name_Server","\"{}, {} domains\"".format(r["name_server"], r["domain_count"])))
if r["service"] == "reverse-whois":
taxonomies.append(self.build_taxonomy("info", "DT", "Reverse_Whois","\"curr:{} / hist:{} domains\"".format(r["domain_count"]["current"], r["domain_count"]["historic"])))
if r["service"] == "whois/history":
taxonomies.append(self.build_taxonomy("info", "DT", "Whois_History","\"{}, {} domains \"".format(r["name_server"], r["domain_count"])))
if (r["service"] == "whois/parsed") or (r['service'] == "whois"):
if r["registrar"]:
taxonomies.append(self.build_taxonomy("info", "DT", "Whois", "\"REGISTRAR:{}\"".format(r["registrar"])))
if r["registrant"]:
taxonomies.append(self.build_taxonomy("info", "DT", "Whois", "\"REGISTRANT:{}\"".format(r["registrant"])))
result = {'taxonomies': taxonomies}
return result
def run(self):

View File

@ -1,6 +1,6 @@
{
"name": "File_Info",
"version": "1.0",
"version": "2.0",
"author": "CERT-BDF",
"url": "https://github.com/CERT-BDF/Cortex-Analyzers",
"license": "AGPL-V3",

View File

@ -138,18 +138,40 @@ class FileAnalyzer(Analyzer):
# SUMMARY
def summary(self, fullReport):
taxonomies = []
level = "info"
namespace = "FileInfo"
predicate = "Filetype"
if fullReport['Mimetype'] in ['application/x-dosexec']:
return self.PE_Summary(fullReport)
if fullReport['Mimetype'] in ['application/pdf']:
return self.PDF_Summary(fullReport)
if (fullReport['filetype'] in ['DOC','DOCM','DOCX',
pereport = self.PE_Summary(fullReport)
taxonomies.append(self.build_taxonomy(level, namespace, predicate, pereport['filetype']))
elif fullReport['Mimetype'] in ['application/pdf']:
pdfreport = self.PDF_Summary(fullReport)
value = "\"{}\"".format(pdfreport['filetype'])
if pdfreport['suspicious']:
level = 'suspicious'
taxonomies.append(self.build_taxonomy(level, namespace, predicate, value))
elif (fullReport['filetype'] in ['DOC','DOCM','DOCX',
'XLS', 'XLSM', 'XLSX',
'PPT', "PPTM", 'PPTX']):
return self.MSOffice_Summary(fullReport)
msreport = self.MSOffice_Summary(fullReport)
value = "\"{}\"".format(msreport['filetype'])
if msreport['suspicious']:
level = 'suspicious'
taxonomies.append(self.build_taxonomy(level, namespace, predicate, value))
else:
value = "\"{}\"".format(fullReport['filetype'])
level = 'info'
taxonomies.append(self.build_taxonomy(level, namespace, predicate, value))
result = {'taxonomies': taxonomies}
return result
return {
'filetype': fullReport['filetype']
}
def SpecificInfo(self,report):
# run specific program for PE

View File

@ -3,7 +3,7 @@
"author": "Nils Kuhnert, CERT-Bund",
"license": "AGPL-V3",
"url": "https://github.com/BSI-CERT-Bund/cortex-analyzers",
"version": "1.0",
"version": "2.0",
"baseConfig": "FireHOLBlocklists",
"config": {
"check_tlp": false,

View File

@ -20,14 +20,14 @@ class FireholBlocklistsAnalyzer(Analyzer):
Analyzer.__init__(self)
# Get config parameters
self.path = self.getParam('config.blocklistpath', '/tmp/fireholblocklists')
self.path = self.getParam('config.blocklistpath', None, 'No path to blocklists provided.')
self.ignoreolderthandays = self.getParam('config.ignoreolderthandays', 365)
self.utc = pytz.UTC
self.now = dt.datetime.now(tz=self.utc)
# Check if directory exists
if not os.path.exists(self.path):
os.mkdir(self.path, mode=0o700)
os.mkdir(self.path, 0700)
# Downloading/updating the list is implemented with an external cronjob which git pulls the repo
# Read files in the given path and prepare file lists for ip- and netsets
@ -80,7 +80,8 @@ class FireholBlocklistsAnalyzer(Analyzer):
else:
if ip in l:
# On match append to hits and break; next file!
hits.append({'list': ipsetname, 'description': description.get(ipsetname), 'file_date': file_date.get(ipsetname)})
hits.append({'list': ipsetname, 'description': description.get(ipsetname),
'file_date': file_date.get(ipsetname)})
break
# Second: check the netsets
@ -88,14 +89,14 @@ class FireholBlocklistsAnalyzer(Analyzer):
with open('{}/{}'.format(self.path, netset)) as afile:
netsetname = netset.split('.')[0]
description.update({netsetname: ''})
file_date.update({ipsetname : ''})
file_date.update({netsetname: ''})
for l in afile:
if l[0] == '#':
# Check for date and break if too old
if '# Source File Date: ' in l:
datestr = re.sub('# Source File Date: ', '', l.rstrip('\n'))
date = parse(datestr)
file_date[ipsetname] = str(date)
file_date[netsetname] = str(date)
if (self.now - date).days > self.ignoreolderthandays:
break
description[netsetname] += re.sub(r'^\[.*\] \(.*\) [a-zA-Z0-9.\- ]*$', '', l.lstrip('# '))\
@ -103,7 +104,8 @@ class FireholBlocklistsAnalyzer(Analyzer):
else:
try:
if ipaddress.ip_address(ip) in ipaddress.ip_network(u'{}'.format(l.split('\n')[0])):
hits.append({'list': netsetname, 'description': description.get(netsetname), 'file_date': file_date.get(ipsetname)})
hits.append({'list': netsetname, 'description': description.get(netsetname),
'file_date': file_date.get(netsetname)})
break
except ValueError as e:
self.error('ValueError occured. Used values: ipnetwork {}, ip to check {}, file {}.'
@ -112,13 +114,26 @@ class FireholBlocklistsAnalyzer(Analyzer):
return hits
def summary(self, raw):
result = {
'count': raw.get('count'),
'hits': []
}
for hit in raw.get('hits'):
result['hits'].append(hit.get('list'))
return result
taxonomies = []
level = "info"
namespace = "Firehol"
predicate = "Blocklists"
value = "\"0 hit\""
if 'count' in raw:
r = raw.get('count', 0)
if r == 0 or r == 1:
value = "\"{} hit\"".format(r)
else:
value = "\"{} hits\"".format(r)
if r > 0:
level = "suspicious"
else:
level = "safe"
taxonomies.append(self.build_taxonomy(level, namespace, predicate, value))
return {"taxonomies": taxonomies}
def run(self):
ip = self.getData()

View File

@ -1,6 +1,6 @@
{
"name": "Fortiguard_URLCategory",
"version": "1.1",
"version": "2.0",
"author": "Eric Capuano",
"url": "https://github.com/CERT-BDF/Cortex-Analyzers",
"license": "AGPL-V3",

View File

@ -10,7 +10,25 @@ from cortexutils.analyzer import Analyzer
class URLCategoryAnalyzer(Analyzer):
def summary(self, raw):
return {'category': raw['category']}
taxonomies = []
if 'category' in raw:
r = raw.get('category')
value = "\"{}\"".format(r)
if r == "Malicious Websites":
level = "malicious"
elif r == "Suspicious Websites":
level = "suspicious"
elif r == "Not Rated":
level = "info"
else:
level = "safe"
taxonomies.append(self.build_taxonomy(level, "Fortiguard", "URLCat", value))
result = {"taxonomies": taxonomies}
return result
def run(self):
Analyzer.run(self)

View File

@ -3,9 +3,12 @@
"author": "Nils Kuhnert, CERT-Bund",
"license": "AGPL-V3",
"url": "https://github.com/BSI-CERT-Bund/cortex-analyzers",
"version": "1.0",
"version": "2.0",
"baseConfig": "GoogleSafebrowsing",
"config": {},
"config": {
"check_tlp": true,
"max_tlp": 1
},
"description": "Check URLs and domain names against Google Safebrowsing",
"dataTypeList": ["url", "domain"],
"command": "GoogleSafebrowsing/safebrowsing_analyzer.py"

View File

@ -20,10 +20,31 @@ class SafebrowsingAnalyzer(Analyzer):
)
def summary(self, raw):
result = {"matches":0}
# taxonomy = {"level":"info", "namespace": "Google", "predicate": "Safebrowsing", "value":0}
taxonomies = []
level = "info"
namespace = "Google"
predicate = "Safebrowsing"
value = "\"0 match\""
if ("results" in raw):
result["matches"] = len(raw['results'])
return result
r = len(raw['results'])
if r == 0 or r == 1:
value = "\"{} match\"".format(r)
else:
value = "\"{} matches\"".format(r)
if r > 0:
level = "malicious"
else:
level = "safe"
# level : info, safe, suspicious, malicious
taxonomies.append(self.build_taxonomy(level, namespace, predicate, value))
return {"taxonomies": taxonomies}
def run(self):
report = []

View File

@ -1,6 +1,6 @@
{
"name": "Hipposcore",
"version": "1.0",
"version": "2.0",
"author": "CERT-BDF",
"url": "https://github.com/CERT-BDF/Cortex-Analyzers",
"license": "AGPL-V3",

View File

@ -1,6 +1,6 @@
{
"name": "HippoMore",
"version": "1.0",
"version": "2.0",
"author": "CERT-BDF",
"url": "https://github.com/CERT-BDF/Cortex-Analyzers",
"license": "AGPL-V3",

View File

@ -15,29 +15,41 @@ class HippoAnalyzer(Analyzer):
self.url = self.getParam('config.url', None, 'Missing URL for Hippocampe API')
self.service = self.getParam('config.service', None, 'Service parameter is missing')
def moreSummary(self, raw):
def more_summary(self, raw):
data = self.getData()
result = {}
result[data] = 0
if(data in raw):
result[data] = len(raw[data])
if data in raw:
result[data] = len(raw.get(data))
return result
def scoreSummary(self, raw):
def score_summary(self, raw):
data = self.getData()
result = {}
if(data in raw):
result[data] = raw[data]["hipposcore"]
if data in raw:
result[data] = raw.get(data).get("hipposcore")
return result
def summary(self, raw):
if (self.service == 'hipposcore'):
return self.scoreSummary(raw)
elif (self.service == 'more'):
return self.moreSummary(raw)
taxonomies = []
level = "safe"
namespace = "Hippocampe"
predicate = "Score"
if self.service == 'hipposcore':
value = self.score_summary(raw)[self.getData()]
if value > 0:
level = "malicious"
taxonomies.append(self.build_taxonomy(level, namespace, predicate, value))
elif self.service == 'more':
value = self.more_summary(raw)[self.getData()]
if value > 0:
level = "malicious"
taxonomies.append(self.build_taxonomy(level, namespace, predicate, "\"{} record(s)\"".format(value)))
return {"taxonomies": taxonomies}
def run(self):
data = self.getData()
@ -51,9 +63,8 @@ class HippoAnalyzer(Analyzer):
post_data = json_data.encode('utf-8')
headers = {'Content-Type': 'application/json'}
response = {}
try:
request = urllib2.Request(self.url + self.service, post_data, headers)
request = urllib2.Request('{}/hippocampe/api/v1.0/{}'.format(self.url, self.service), post_data, headers)
response = urllib2.urlopen(request)
report = json.loads(response.read())

View File

@ -1,6 +1,6 @@
{
"name": "JoeSandbox_File_Analysis_Inet",
"version": "1.1",
"version": "2.0",
"author": "CERT-BDF",
"url": "https://github.com/CERT-BDF/Cortex-Analyzers",
"license": "AGPL-V3",

View File

@ -1,6 +1,6 @@
{
"name": "JoeSandbox_File_Analysis_Noinet",
"version": "1.1",
"version": "2.0",
"author": "CERT-BDF",
"url": "https://github.com/CERT-BDF/Cortex-Analyzers",
"license": "AGPL-V3",

View File

@ -1,6 +1,6 @@
{
"name": "JoeSandbox_Url_Analysis",
"version": "1.1",
"version": "2.0",
"author": "CERT-BDF",
"url": "https://github.com/CERT-BDF/Cortex-Analyzers",
"license": "AGPL-V3",

View File

@ -26,7 +26,32 @@ class JoeSandboxAnalyzer(Analyzer):
'dataType': self.data_type
}
result.update(raw['detection'])
taxonomies = []
level = "info"
namespace = "JSB"
predicate = "Report"
value = "\"Clean\""
r = raw['detection']
value = "\"{}/{}\"".format(r["score"], r["maxscore"])
if r["clean"]:
level = "safe"
elif r["suspicious"]:
level = "suspicious"
elif r["malicious"]:
level = "malicious"
else:
level = "info"
value = "Unknown"
taxonomies.append(self.build_taxonomy(level, namespace, predicate, value))
result.update({"taxonomies":taxonomies})
return result

15
analyzers/MISP/MISP.json Normal file
View File

@ -0,0 +1,15 @@
{
"name": "MISP",
"author": "Nils Kuhnert, CERT-Bund",
"license": "AGPL-V3",
"url": "https://github.com/BSI-CERT-Bund/cortex-analyzers",
"version": "2.0",
"baseConfig": "MISP",
"config": {
"check_tlp": false,
"max_tlp": 3
},
"description": "Check if this IOC has been processed in different MISP instances.",
"dataTypeList": ["domain", "ip", "url", "fqdn", "uri_path","user-agent", "hash", "email", "mail", "mail_subject" , "registry", "regexp", "other", "filename"],
"command": "MISP/misp.py"
}

View File

@ -1,15 +0,0 @@
{
"name": "MISP_Search",
"version": "1.1",
"author": "CERT-BDF",
"url": "https://github.com/CERT-BDF/Cortex-Analyzers",
"license": "AGPL-V3",
"baseConfig": "MISP",
"config": {
"check_tlp": false,
"service": "search"
},
"description": "Search MISP events that have the observable provided as input",
"dataTypeList": ["domain", "filename", "fqdn", "hash", "ip", "mail", "mail_subject", "other", "regexp", "registry", "uri_path", "url", "user-agent"],
"command": "MISP/misp_analyzer.py"
}

62
analyzers/MISP/misp.py Executable file
View File

@ -0,0 +1,62 @@
#!/usr/bin/env python
from cortexutils.analyzer import Analyzer
from mispclient import MISPClient
class MISPAnalyzer(Analyzer):
"""Searches for given IOCs in configured misp instances. All standard data types are supported."""
def __init__(self):
Analyzer.__init__(self)
self.misp = MISPClient(url=self.getParam('config.url', None, 'No MISP url given.'),
key=self.getParam('config.key', None, 'No MISP api key given.'),
ssl=self.getParam('config.certpath', True),
name=self.getParam('config.name', None))
def summary(self, raw):
taxonomies = []
level = "info"
namespace = "MISP"
predicate = "Search"
value = "\"0\""
data = []
for r in raw['results']:
for res in r['result']:
if 'uuid' in res:
data.append(res['uuid'])
# return number of unique events
if data == []:
value = "\"0 event\""
taxonomies.append(self.build_taxonomy(level, namespace, predicate, value))
else:
value = "\"{} event(s)\"".format(len(list(set(data))))
taxonomies.append(self.build_taxonomy(level, namespace, predicate, value))
return {"taxonomies": taxonomies}
def run(self):
if self.data_type == 'hash':
response = self.misp.search_hash(self.getData())
elif self.data_type == 'url':
response = self.misp.search_url(self.getData())
elif self.data_type == 'domain' or self.data_type == 'fqdn':
response = self.misp.search_domain(self.getData())
elif self.data_type == 'mail' or self.data_type == 'mail_subject':
response = self.misp.search_mail(self.getData())
elif self.data_type == 'ip':
response = self.misp.search_ip(self.getData())
elif self.data_type == 'registry':
response = self.misp.search_registry(self.getData())
elif self.data_type == 'filename':
response = self.misp.search_filename(self.getData())
else:
response = self.misp.searchall(self.getData())
self.report({'results': response})
if __name__ == '__main__':
MISPAnalyzer().run()

View File

@ -1,95 +0,0 @@
#!/usr/bin/env python
# encoding: utf-8
from cortexutils.analyzer import Analyzer
from pymisp import PyMISP
class MISPAnalyzer(Analyzer):
def __init__(self):
Analyzer.__init__(self)
self.service = self.getParam('config.service', None, 'MISP service is missing')
self.url = self.getParam('config.url', None, 'MISP url is missing')
if self.get_param('config.key'):
self.api_key = self.get_param('config.key')
else:
self.api_key = self.get_param('config.api_key', None, 'MISP key for API is missing')
def summary(self, raw):
result = {
'service': self.service,
'dataType': self.data_type
}
# search service
if self.service == 'search':
result['events'] = len(raw['events'])
else:
result['events'] = 0
return result
def run(self):
Analyzer.run(self)
data = self.getData()
try:
# search service
if self.service == 'search':
misp = PyMISP(self.url, self.api_key)
result = misp.search_all(data)
events = []
if 'response' in result:
# Trim the report to make it readable in a browser
# Remove null events
result['response'] = list(filter(lambda e: e != {'Event': None}, result['response']))
for e in result['response']:
if 'Event' in e and e['Event']:
event = e['Event']
# Remove attributes
if 'Attribute' in event:
del event['Attribute']
# Remove org
if 'Org' in event:
del event['Org']
# Remove related events
if 'RelatedEvent' in event:
del event['RelatedEvent']
# Remove shadow attributes
if 'ShadowAttribute' in event:
del event['ShadowAttribute']
# Remove sharing group
if 'SharingGroup' in event:
del event['SharingGroup']
# Remove sharing group
if 'Galaxy' in event:
del event['Galaxy']
# Replace tags by a string array
if 'Tag' in event:
tags = list((t['name'] for t in event['Tag']))
del event['Tag']
event['tags'] = tags
# Add url to the MISP event
if 'id' in event:
event['url'] = self.url + '/events/view/' + event['id']
if 'publish_timestamp' in event:
event['publish_timestamp'] = long(event['publish_timestamp']) * 1000
events.append(event)
self.report({"events": events})
else:
self.error('Unknown MISP service')
except Exception as e:
self.unexpectedError(e)
if __name__ == '__main__':
MISPAnalyzer().run()

261
analyzers/MISP/mispclient.py Executable file
View File

@ -0,0 +1,261 @@
#!/usr/bin/env python
import pymisp
import os
class EmptySearchtermError(Exception):
"""Exception raised, when no search terms are given."""
pass
class MISPClient:
"""The MISPClient class just hides the "complexity" of the queries. All params can be lists to query more than one
MISP instance.
:param url: URL of MISP instance
:type url: [str, list]
:param key: API key
:type key: [str, list]
:param ssl: Use/dont' use ssl or path to ssl cert if not possible to verify through trusted CAs
:type ssl: [bool, list, str]
:param name: Name of the MISP instance, is sent back in the report for matching the results.
:type name: [str, list]
"""
def __init__(self, url, key, ssl=True, name='Unnamed'):
self.misp_connections = []
if type(url) is list:
for idx, server in enumerate(url):
verify = True
if os.path.isfile(ssl[idx]):
verify = ssl[idx]
self.misp_connections.append(pymisp.PyMISP(url=server,
key=key[idx],
ssl=verify))
else:
verify = True
if os.path.isfile(ssl):
verify = ssl
self.misp_connections.append(pymisp.PyMISP(url=url,
key=key,
ssl=verify))
self.misp_name = name
@staticmethod
def __misphashtypes():
"""Just for better readability, all __misp*type methods return just a list of misp data types
:returns: MISP hash data types
:rtype: list
"""
hashtypes = ['md5', 'sha1', 'sha256', 'ssdeep', 'sha224', 'sha384', 'sha512', 'sha512/224', 'sha512/256',
'tlsh', 'authentihash']
filenames = []
for h in hashtypes:
filenames.append('filename|{0}'.format(h))
return hashtypes + filenames
@staticmethod
def __mispurltypes():
"""Just for better readability, all __misp*type methods return just a list of misp data types
:returns: misp url/domain data types
:rtype: list
"""
return ['domain', 'domain|ip', 'url', 'link', 'named pipe', 'uri']
@staticmethod
def __mispdomaintypes():
"""Just for better readability, all __misp*type methods return just a list of misp data types
:returns: data types containing domains
:rtype: list
"""
return ['domain', 'hostname', 'domain|ip', 'email-src', 'email-dst', 'url', 'link', 'named pipe',
'target-email', 'uri', 'whois-registrant-email', 'dns-soa-email', 'hostname|port', 'jabber-id']
@staticmethod
def __mispmailtypes():
"""Just for better readability, all __misp*type methods return just a list of misp data types
:returns: misp mail data types
:rtype: list
"""
return ['email-src', 'email-dst', 'target-email', 'email-subject', 'email-attachment', 'whois-registrant-email',
'dns-soa-email', 'email-header']
@staticmethod
def __mispiptypes():
"""Just for better readability, all __misp*type methods return just a list of misp data types
:returns: ip data types
:rtype: list
"""
return ['ip-src', 'ip-dst', 'domain|ip', 'ip-src|port', 'ip-dst|port']
@staticmethod
def __mispregistrytypes():
"""Just for better readability, all __misp*type methods return just a list of misp data types
:returns: misp regkey data types
:rtype: list
"""
return ['regkey', 'regkey|value']
@staticmethod
def __mispfilenametypes():
"""Just for better readability, all __misp*type methods return just a list of misp data types
:returns: data types containing filenames
:rtype: list
"""
return ['filename', 'filename|md5', 'filename|sha1', 'filename|sha256', 'filename|ssdeep', 'filename|sha224',
'filename|sha384', 'filename|sha512', 'filename|sha512/224', 'filename|sha512/256', 'filename|tlsh',
'filename|authentihash']
def __clean_relatedevent(self, related_events):
"""
Strip relatedevent sub content of event for lighter output.
:param related_events:
:return:
"""
response = []
for event in related_events:
ev = {}
ev['info'] = event['Event']['info']
ev['id'] = event['Event']['id']
response.append(ev)
return response
def __clean_event(self, misp_event):
"""
Strip event data for lighter output. Analyer report only contains useful data.
:param event: misp event
:return: misp event
"""
filters = ['Attribute',
'ShadowAttribute',
'Org',
'ShadowAttribute',
'SharingGroup',
'sharing_group_id',
'disable_correlation',
'locked',
'publish_timestamp',
'attribute_count',
'attribute_count',
'analysis',
'published',
'distribution',
'proposal_email_lock']
for filter in filters:
if filter in misp_event:
del misp_event[filter]
if 'RelatedEvent' in misp_event:
misp_event['RelatedEvent'] = self.__clean_relatedevent(misp_event['RelatedEvent'])
return misp_event
def __clean(self, misp_response):
"""
:param misp_response:
:return:
"""
response = []
for event in misp_response.get('response', []):
response.append(self.__clean_event(event['Event']))
return response
def __search(self, value, type_attribute):
"""Search method call wrapper.
:param value: value to search for.
:type value: str
:param type_attribute: attribute types to search for.
:type type_attribute: [list, none]
"""
results = []
if not value:
raise EmptySearchtermError
for idx, connection in enumerate(self.misp_connections):
misp_response = connection.search(type_attribute=type_attribute, values=value)
results.append({'url': connection.root_url,
'name': self.misp_name[idx],
'result': self.__clean(misp_response)})
return results
def search_url(self, searchterm):
"""Search for URLs
:type searchterm: str
:rtype: list
"""
return self.__search(type_attribute=self.__mispurltypes(), value=searchterm)
def search_hash(self, searchterm):
"""Search for hashes
:type searchterm: str
:rtype: list
"""
return self.__search(type_attribute=self.__misphashtypes(), value=searchterm)
def search_domain(self, searchterm):
"""Search for domains
:type searchterm: str
:rtype: list
"""
return self.__search(type_attribute=self.__mispdomaintypes(), value=searchterm)
def search_mail(self, searchterm):
"""Search for emails
:type searchterm: str
:rtype: list
"""
return self.__search(type_attribute=self.__mispmailtypes(), value=searchterm)
def search_ip(self, searchterm):
"""Search for ips
:type searchterm: str
:rtype: list
"""
return self.__search(type_attribute=self.__mispiptypes(), value=searchterm)
def search_registry(self, searchterm):
"""Search for registry keys and values
:type searchterm: str
:rtype: list
"""
return self.__search(type_attribute=self.__mispregistrytypes(), value=searchterm)
def search_filename(self, searchterm):
"""Search for filenames
:type searchterm: str
:rtype: list
"""
return self.__search(type_attribute=self.__mispfilenametypes(), value=searchterm)
def searchall(self, searchterm):
"""Search through all attribute types, this could be really slow.
:type searchterm: str
:rtype: list
"""
return self.__search(type_attribute=None, value=searchterm)

View File

@ -1,6 +1,6 @@
{
"name": "MaxMind_GeoIP",
"version": "2.0",
"version": "3.0",
"author": "CERT-BDF",
"url": "https://github.com/CERT-BDF/Cortex-Analyzers",
"license": "AGPL-V3",

View File

@ -58,13 +58,18 @@ class MaxMindAnalyzer(Analyzer):
}
def summary(self, raw):
result = {}
taxonomy = {"level": "info", "namespace": "MaxMind", "predicate": "Location", "value": 0}
taxonomies = []
level = "info"
namespace = "MaxMind"
predicate = "Location"
value = "\"\""
if("continent" in raw):
result["country"] = raw["country"]["name"]
result["continent"] = raw["continent"]["name"]
value = "\"{}/{}\"".format(raw["country"]["name"], raw["continent"]["name"])
taxonomies.append(self.build_taxonomy(level, namespace, predicate, value))
return result
return {"taxonomies":taxonomies}
def run(self):
Analyzer.run(self)

View File

@ -1,6 +1,6 @@
{
"name": "Msg_Parser",
"version": "1.0",
"version": "2.0",
"author": "CERT-BDF",
"url": "https://github.com/CERT-BDF/Cortex-Analyzers",
"license": "AGPL-V3",

View File

@ -17,13 +17,17 @@ class MsgParserAnalyzer(Analyzer):
self.filepath = self.getParam('file', None, 'File is missing')
def summary(self, raw):
result = {
"attachments": 0
}
if("attachments" in raw):
result["attachments"] = len(raw["attachments"])
taxonomies = []
level = "info"
namespace = "MsgParser"
predicate = "Attachments"
value = "\"0\""
return result
if("attachments" in raw):
value = len(raw["attachments"])
taxonomies.append(self.build_taxonomy(level, namespace, predicate, value))
return {"taxonomies": taxonomy}
def run(self):
if self.data_type == 'file':

View File

@ -1,6 +1,6 @@
{
"name": "Nessus",
"version": "1.0",
"version": "2.0",
"author": "Guillaume Rousse",
"url": "https://github.com/CERT-BDF/Cortex-Analyzers",
"license": "AGPL-V3",

View File

@ -37,7 +37,34 @@ class NessusAnalyzer(Analyzer):
summary["medium"] = count[2]
summary["high"] = count[3]
summary["critical"] = count[4]
return summary
taxonomies = []
level = "info"
namespace = "Nessus"
predicate = "Info"
value = "\"0\""
if summary["info"] > 0:
value = summary["info"]
taxonomies.append(self.build_taxonomy(level, namespace, predicate, value))
if summary["low"] >0:
value = summary["low"]
taxonomies.append(self.build_taxonomy(level, namespace, predicate, value))
if summary["medium"] >0:
value = summary["medium"]
level = "suspicious"
taxonomies.append(self.build_taxonomy(level, namespace, predicate, value))
if summary["high"] > 0:
value = summary["high"]
level = "suspicious"
taxonomies.append(self.build_taxonomy(level, namespace, predicate, value))
if summary["critical"] >0:
value = summary["critical"]
level = "malicious"
taxonomies.append(self.build_taxonomy(level, namespace, predicate, value))
return {"taxonomies": taxonomies}
def run(self):
Analyzer.run(self)
@ -51,7 +78,10 @@ class NessusAnalyzer(Analyzer):
if self.data_type == 'fqdn':
address = IPAddress(socket.gethostbyname(data))
else:
try:
address = IPAddress(data)
except Exception as e:
self.error("{}".format(e))
if not any(address in IPNetwork(network)
for network in self.allowed_networks):
self.error('Invalid target: not in any allowed network')

View File

@ -1,6 +1,6 @@
{
"name": "OTXQuery",
"version": "1.0",
"version": "2.0",
"author": "Eric Capuano",
"url": "https://github.com/CERT-BDF/Cortex-Analyzers",
"license": "AGPL-V3",

View File

@ -154,9 +154,14 @@ class OTXQueryAnalyzer(Analyzer):
self.error('API Error! Please verify data type is correct.')
def summary(self, raw):
return {
"pulse_count": raw["pulse_count"]
}
taxonomies = []
level = "info"
namespace = "OTX"
predicate = "Pulses"
value = "\"{}\"".format(raw["pulse_count"])
taxonomies.append(self.build_taxonomy(level, namespace, predicate, value))
return {"taxonomies": taxonomies}
def run(self):
Analyzer.run(self)

View File

@ -1,6 +1,6 @@
{
"name": "PassiveTotal_Enrichment",
"version": "1.0",
"version": "2.0",
"author": "CERT-BDF",
"url": "https://github.com/CERT-BDF/Cortex-Analyzers",
"license": "AGPL-V3",

View File

@ -1,6 +1,6 @@
{
"name": "PassiveTotal_Malware",
"version": "1.0",
"version": "2.0",
"author": "CERT-BDF",
"url": "https://github.com/CERT-BDF/Cortex-Analyzers",
"license": "AGPL-V3",

View File

@ -1,6 +1,6 @@
{
"name": "PassiveTotal_Osint",
"version": "1.0",
"version": "2.0",
"author": "CERT-BDF",
"url": "https://github.com/CERT-BDF/Cortex-Analyzers",
"license": "AGPL-V3",

View File

@ -1,6 +1,6 @@
{
"name": "PassiveTotal_Passive_Dns",
"version": "1.0",
"version": "2.0",
"author": "CERT-BDF",
"url": "https://github.com/CERT-BDF/Cortex-Analyzers",
"license": "AGPL-V3",

View File

@ -1,6 +1,6 @@
{
"name": "PassiveTotal_Ssl_Certificate_Details",
"version": "1.0",
"version": "2.0",
"author": "CERT-BDF",
"url": "https://github.com/CERT-BDF/Cortex-Analyzers",
"license": "AGPL-V3",

View File

@ -1,6 +1,6 @@
{
"name": "PassiveTotal_Ssl_Certificate_History",
"version": "1.0",
"version": "2.0",
"author": "CERT-BDF",
"url": "https://github.com/CERT-BDF/Cortex-Analyzers",
"license": "AGPL-V3",

View File

@ -1,6 +1,6 @@
{
"name": "PassiveTotal_Unique_Resolutions",
"version": "1.0",
"version": "2.0",
"author": "CERT-BDF",
"url": "https://github.com/CERT-BDF/Cortex-Analyzers",
"license": "AGPL-V3",

View File

@ -1,6 +1,6 @@
{
"name": "PassiveTotal_Whois_Details",
"version": "1.0",
"version": "2.0",
"author": "CERT-BDF",
"url": "https://github.com/CERT-BDF/Cortex-Analyzers",
"license": "AGPL-V3",

View File

@ -17,58 +17,90 @@ class PassiveTotalAnalyzer(Analyzer):
self.api_key = self.getParam('config.key', None, 'PassiveTotal API key is missing')
def summary(self, raw):
result = {
'service': self.service,
'dataType': self.data_type
}
taxonomies = []
level = "info"
namespace = "PT"
predicate = "Service"
value = "\"False\""
result = {}
# malware service
if self.service == 'malware':
predicate = "Malware"
if 'results' in raw and raw['results']:
result['malware'] = True
level = "malicious"
else:
result['malware'] = False
level = "safe"
value = "\"{}\"".format(result['malware'])
taxonomies.append(self.build_taxonomy(level, namespace, predicate, value))
# osint service
elif self.service == 'osint':
predicate = "OSINT"
if 'results' in raw and raw['results']:
result['osint'] = True
else:
result['osint'] = False
value = "\"{}\"".format(result['osint'])
taxonomies.append(self.build_taxonomy(level, namespace, predicate, value))
# passive dns service
elif self.service == 'passive_dns':
if 'firstSeen' in raw and raw['firstSeen']:
result['firstSeen'] = raw['firstSeen']
if 'lastSeen' in raw and raw['lastSeen']:
result['lastSeen'] = raw['lastSeen']
predicate = "PassiveDNS"
if 'totalRecords' in raw and raw['totalRecords']:
result['total'] = raw['totalRecords']
else:
result['total'] = 0
if result['total'] < 2:
value = "\"{} record\"".format(result['total'])
else:
value = "\"{} records\"".format(result['total'])
taxonomies.append(self.build_taxonomy(level, namespace, predicate, value))
# ssl certificate details service
elif self.service == 'ssl_certificate_details':
predicate = "SSL"
if 'sha1' in raw:
result['ssl'] = True
else:
result['ssl'] = False
value = "\"{}\"".format(result['ssl'])
taxonomies.append(self.build_taxonomy(level, namespace, predicate, value))
# ssl certificate history service
elif self.service == 'ssl_certificate_history':
predicate = "SSLCertHistory"
if 'results' in raw and raw['results']:
result['ssl'] = True
result['total'] = len(raw['results'])
value = "\"{} record(s)\"".format(result['total'])
taxonomies.append(self.build_taxonomy(level, namespace, predicate, value))
# unique resolutions service
elif self.service == 'unique_resolutions':
predicate = "UniqueResolution"
if 'total' in raw:
result['total'] = raw['total']
value = "\"{} record(s)\"".format(result['total'])
taxonomies.append(self.build_taxonomy(level, namespace, predicate, value))
# whois details service
elif self.service == 'whois_details':
predicate = "Whois"
if 'registrant' in raw and 'organization' in raw['registrant'] and raw['registrant']['organization']:
result['registrant'] = raw['registrant']['organization']
elif 'registrant' in raw and 'name' in raw['registrant'] and raw['registrant']['name']:
result['registrant'] = raw['registrant']['name']
value = "\"REGISTRANT: {}\"".format(result['registrant'])
taxonomies.append(self.build_taxonomy(level, namespace, predicate, value))
if 'registrar' in raw and raw['registrar']:
result['registrar'] = raw['registrar']
value = "\"REGISTRAR: {}\"".format(result['registrar'])
taxonomies.append(self.build_taxonomy(level, namespace, predicate, value))
return result
return {"taxonomies":taxonomies}
def run(self):
Analyzer.run(self)

View File

@ -1,6 +1,6 @@
{
"name": "PhishTank_CheckURL",
"version": "1.0",
"version": "2.0",
"author": "Eric Capuano",
"url": "https://github.com/CERT-BDF/Cortex-Analyzers",
"license": "AGPL-V3",

View File

@ -19,17 +19,30 @@ class phishtankAnalyzer(Analyzer):
'Missing PhishTank API key')
def phishtank_checkurl(self, data):
#debug('>> phishtank_checkurl ' + str(data))
url = 'http://checkurl.phishtank.com/checkurl/'
postdata = {'url': data, 'format':'json','app_key': self.phishtank_key}
r = requests.post(url, data=postdata)
return json.loads(r.content)
def summary(self, raw):
if ('in_database' in raw) :
return {'in_database':raw['in_database'],
'verified':raw['verified'],
'verified_at':raw['verified_at']}
taxonomies = []
value = "\"False\""
level = ""
if 'in_database' in raw and raw['in_database'] == "True":
value = "\"{}\"".format(raw['in_database'])
if raw.get('verified'):
level = "malicious"
else:
level = "suspicious"
else:
level = "safe"
value = "\"False\""
taxonomies.append(self.build_taxonomy(level, "PhishTank", "In_Database", value))
result = {"taxonomies":taxonomies}
return result
def run(self):
if self.service == 'query':

View File

@ -1,6 +1,6 @@
{
"name": "PhishingInitiative_Lookup",
"version": "1.0",
"version": "2.0",
"author": "CERT-BDF",
"url": "https://github.com/CERT-BDF/Cortex-Analyzers",
"license": "AGPL-V3",

View File

@ -12,9 +12,18 @@ class phishinginitiativeAnalyzer(Analyzer):
'Missing PhishingInitiative API key')
def summary(self,raw):
return {
"status": raw["tag_label"]
}
taxonomies = []
level = "safe"
namespace = "PhishingInitiative"
predicate = "Status"
value = "\"Clean\""
if raw["tag_label"] == "phishing":
level = "malicious"
value = "\"{}\"".format(raw["tag_label"])
taxonomies.append(self.build_taxonomy(level, namespace, predicate, value))
return {"taxonomies": taxonomies}
def run(self):
Analyzer.run(self)

View File

@ -3,7 +3,7 @@
"license": "AGPL-V3",
"author": "Nils Kuhnert, CERT-Bund",
"url": "https://github.com/BSI-CERT-Bund/cortex-analyzers",
"version": "1.0",
"version": "2.0",
"baseConfig": "VMRay",
"config": {
"cert": false,

View File

@ -42,19 +42,48 @@ class VMRayAnalyzer(Analyzer):
self.error('Data type currently not supported')
def summary(self, raw):
result = {
taxonomies = []
level = "info"
namespace = "VMRay"
predicate = "Scan"
value = "\"0\""
r = {
'reports': []
}
if raw.get('scanreport', None) and len(raw.get('scanreport').get('data')) > 0:
for scan in raw.get('scanreport').get('data'):
result['reports'].append({
r['reports'].append({
'score': scan.get('sample_score'),
'sample_severity': scan.get('sample_severity'),
'sample_last_reputation_severity': scan.get('sample_last_reputation_severity'),
'url': scan.get('sample_webif_url')
})
return result
if len(r["reports"]) == 0:
value = "\"No Scan\""
level = "info"
taxonomies.append(self.build_taxonomy(level, namespace, predicate, value))
else:
for s in r["reports"]:
i = 1
if s["sample_severity"] == "not_suspicious":
level = "safe"
elif s["sample_severity"] == "malicious":
level = "malicious"
else:
level = "info"
if r["reports"] > 1:
value = "\"{}( from scan {})\"".format(s["score"], i)
else:
value = "{}".format(s["score"])
taxonomies.append(self.build_taxonomy(level, namespace, predicate, value))
i += 1
return {"taxonomies": taxonomies}
if __name__ == '__main__':
VMRayAnalyzer().run()

View File

@ -1,6 +1,6 @@
{
"name": "VirusTotal_GetReport",
"version": "2.0",
"version": "3.0",
"author": "CERT-BDF",
"url": "https://github.com/CERT-BDF/Cortex-Analyzers",
"license": "AGPL-V3",

View File

@ -1,6 +1,6 @@
{
"name": "VirusTotal_Scan",
"version": "2.0",
"version": "3.0",
"author": "CERT-BDF",
"url": "https://github.com/CERT-BDF/Cortex-Analyzers",
"license": "AGPL-V3",

View File

@ -64,6 +64,12 @@ class VirusTotalAnalyzer(Analyzer):
self.error('Scan not found')
def summary(self, raw):
taxonomies = []
level = "info"
namespace = "VT"
predicate = "Score"
value = "\"0\""
result = {
"has_result": True
}
@ -80,18 +86,41 @@ class VirusTotalAnalyzer(Analyzer):
if self.service == "get":
if("scans" in raw):
result["scans"] = len(raw["scans"])
value = "\"{}/{}\"".format(result["positives"], result["total"])
if result["positives"] == 0:
level = "safe"
elif result["positives"] < 5:
level = "suspicious"
else:
level = "malicious"
if("resolutions" in raw):
result["resolutions"] = len(raw["resolutions"])
value = "\"{} resolution(s)\"".format(result["resolutions"])
if result["resolutions"] == 0:
level = "safe"
elif result["resolutions"] < 5:
level = "suspicious"
else:
level = "malicious"
if("detected_urls" in raw):
result["detected_urls"] = len(raw["detected_urls"])
value = "\"{} detected_url(s)\"".format(result["detected_urls"])
if result["detected_urls"] == 0:
level = "safe"
elif result["detected_urls"] < 5:
level = "suspicious"
else:
level = "malicious"
if("detected_downloaded_samples" in raw):
result["detected_downloaded_samples"] = len(
raw["detected_downloaded_samples"])
return result
taxonomies.append(self.build_taxonomy(level, namespace, predicate, value))
return {"taxonomies": taxonomies}
def run(self):
Analyzer.run(self)

View File

@ -3,7 +3,7 @@
"author": "Nils Kuhnert, CERT-Bund",
"license": "AGPL-V3",
"url": "https://github.com/BSI-CERT-Bund/cortex-analyzers",
"version": "1.0",
"version": "2.0",
"baseConfig": "Virusshare",
"config": {},
"description": "Search for MD5 hashes in Virusshare.com hash list",

View File

@ -21,7 +21,24 @@ class VirusshareAnalyzer(Analyzer):
self.filelist = os.listdir(self.path)
def summary(self, raw):
return {'isonvs': raw["isonvs"]}
taxonomies = []
level = "safe"
namespace = "Virusshare"
predicate = "Search"
value = "\"Unknown\""
if raw["isonvs"]:
if raw["isonvs"] == "Unknown":
value = "\"Not MD5\""
level = "suspicious"
else:
value = "\"Found\""
level = "malicious"
else:
value = "\"Not Found\""
taxonomies.append(self.build_taxonomy(level, namespace, predicate, value))
return {'taxonomies': taxonomies}
def run(self):
searchhash = ''

View File

@ -3,7 +3,7 @@
"author": "Nils Kuhnert, CERT-Bund",
"license": "AGPL-V3",
"url": "https://github.com/BSI-CERT-Bund/cortex-analyzers",
"version": "1.0",
"version": "2.0",
"baseConfig": "Yara",
"config": {},
"description": "Check files against YARA rules",

View File

@ -1,5 +1,8 @@
#!/usr/bin/env python
# encoding: utf-8
from cortexutils.analyzer import Analyzer
import os
import yara
@ -45,7 +48,19 @@ class YaraAnalyzer(Analyzer):
return result
def summary(self, raw):
return {"matches":len(raw["results"])}
taxonomies = []
level = "info"
namespace = "Yara"
predicate = "Match"
value = "\"{} rule(s)\"".format(len(raw["results"]))
if len(raw["results"]) == 0:
level = "safe"
else:
level = "malicious"
taxonomies.append(self.build_taxonomy(level, namespace, predicate, value))
return {"taxonomies": taxonomies}
def run(self):
if self.data_type == 'file':

View File

@ -74,6 +74,7 @@ class Analyzer:
os.environ['https_proxy'] = self.https_proxy
def __set_encoding(self):
try:
if sys.stdout.encoding != 'UTF-8':
if sys.version_info[0] == 3:
sys.stdout = codecs.getwriter('utf-8')(sys.stdout.buffer, 'strict')
@ -84,6 +85,8 @@ class Analyzer:
sys.stderr = codecs.getwriter('utf-8')(sys.stderr.buffer, 'strict')
else:
sys.stderr = codecs.getwriter('utf-8')(sys.stderr, 'strict')
except:
pass
def __get_param(self, source, name, default=None, message=None):
"""Extract a specific parameter from given source.
@ -119,7 +122,6 @@ class Analyzer:
:return: Data (observable value) given through Cortex"""
return self.get_param('data', None, 'Missing data field')
def get_param(self, name, default=None, message=None):
"""Just a wrapper for Analyzer.__get_param.
:param name: Name of the parameter to get. JSON-like syntax, e.g. `config.username`
@ -128,6 +130,21 @@ class Analyzer:
return self.__get_param(self.__input, name, default, message)
def build_taxonomy(self, level, namespace, predicate, value):
"""
:param level: info, safe, suspicious or malicious
:param namespace: Name of analyzer
:param predicate: Name of service
:param value: value
:return: dict
"""
return {
'level': level,
'namespace': namespace,
'predicate': predicate,
'value': value
}
def summary(self, raw):
"""Returns a summary, needed for 'short.html' template. Overwrite it for your needs!

View File

@ -2,7 +2,7 @@ from setuptools import setup
setup(
name='cortexutils',
version='1.1.1',
version='1.2.0',
description='A Python library for including utility classes for Cortex analyzers',
long_description=open('README').read(),
author='TheHive-Project',
@ -21,6 +21,7 @@ setup(
'Topic :: Security',
'Topic :: Software Development :: Libraries :: Python Modules'],
py_modules=[
'future',
'cortexutils.analyzer',
'cortexutils.extractor'
],

View File

@ -0,0 +1,3 @@
<span class="label" ng-repeat="t in content.taxonomies" ng-class="{'info': 'label-info', 'safe': 'label-success', 'suspicious': 'label-warning', 'malicious':'label-danger'}[t.level]">
{{t.namespace}}:{{t.predicate}}={{t.value}}
</span>

View File

@ -0,0 +1,52 @@
<div class="panel panel-info" ng-if="success">
<div class="panel-heading">
CERT.at Passive DNS Report
</div>
<div class="panel-body">
<p ng-if="content.results.length == 0">
No result found.
</p>
<table class="table" ng-if="content.results">
<thead>
<th>Count</th>
<th>rrtype</th>
<th>data</th>
<th>First time</th>
<th>Last time</th>
</thead>
<tbody ng-repeat="r in content.results | orderBy:'-seenlast'">
<tr>
<td>{{r.countrequested}}</td>
<td>{{r.rrtype}}</td>
<td ng-if="r.rrtype !== 'SOA' ">
{{r.rraddress || r.rrdname}}
</td>
<td ng-if="r.rrtype == 'SOA'">
<p>primary: {{r.rrprimary}}</p>
<p>hostmaster: {{r.rrhostmaster}}</p>
<p>serial: {{r.rrserial}}</p>
<p>refresh: {{r.rrrefresh}}</p>
<p>retry: {{r.rrretry}}</p>
<p>expire: {{r.rrexpire}}</p>
<p>minimum: {{r.rrminimum}}</p>
</td>
<td>{{r.seenfirst}}</td>
<td>{{r.seenlast}}</td>
</tr>
</tbody>
</table>
</div>
</div>
<div class="panel panel-danger" ng-if="!success">
<div class="panel-heading">
CERT.at Passive DNS Report <b>Error</b>
</div>
<div class="panel-body">
<dl class="dl-horizontal">
<dt>Error: </dt>
<dd>{{content.errorMessage}}</dd>
</dl>
</div>
</div>

View File

@ -0,0 +1,3 @@
<span class="label" ng-repeat="t in content.taxonomies" ng-class="{'info': 'label-info', 'safe': 'label-success', 'suspicious': 'label-warning', 'malicious':'label-danger'}[t.level]">
{{t.namespace}}:{{t.predicate}}={{t.value}}
</span>

View File

@ -1 +0,0 @@
<span ng-if="content" class="label label-info">CIRCL:PassiveDNS= {{content.hits}} hits</span>&nbsp;

View File

@ -0,0 +1,3 @@
<span class="label" ng-repeat="t in content.taxonomies" ng-class="{'info': 'label-info', 'safe': 'label-success', 'suspicious': 'label-warning', 'malicious':'label-danger'}[t.level]">
{{t.namespace}}:{{t.predicate}}={{t.value}}
</span>

View File

@ -1,4 +0,0 @@
<span ng-if="success">
<span class="label label-info" ng-if="content.num_ips_used_cert >= 0">CIRCL:PassiveSSL= {{content.num_ips_used_cert}} IPs used that certificate</span>&nbsp;
<span class="label label-info" ng-if="content.num_certs_by_ip >= 0">CIRCL:PassiveSSL= {{content.num_certs_by_ip}} different certificates on IP<span>&nbsp;
</span>&nbsp;

View File

@ -0,0 +1,3 @@
<span class="label" ng-repeat="t in content.taxonomies" ng-class="{'info': 'label-info', 'safe': 'label-success', 'suspicious': 'label-warning', 'malicious':'label-danger'}[t.level]">
{{t.namespace}}:{{t.predicate}}={{t.value}}
</span>

View File

@ -1 +0,0 @@
<span ng-if="content.records" class="label label-info">DNSDB:Domain Name= {{content.records}} records</span>

View File

@ -0,0 +1,3 @@
<span class="label" ng-repeat="t in content.taxonomies" ng-class="{'info': 'label-info', 'safe': 'label-success', 'suspicious': 'label-warning', 'malicious':'label-danger'}[t.level]">
{{t.namespace}}:{{t.predicate}}={{t.value}}
</span>&nbsp;

View File

@ -1 +0,0 @@
<span ng-if="content.records" class="label label-info">DNSDB:IP History= {{content.records}} records</span>

View File

@ -0,0 +1,3 @@
<span class="label" ng-repeat="t in content.taxonomies" ng-class="{'info': 'label-info', 'safe': 'label-success', 'suspicious': 'label-warning', 'malicious':'label-danger'}[t.level]">
{{t.namespace}}:{{t.predicate}}={{t.value}}
</span>

View File

@ -1 +0,0 @@
<span ng-if="content.records" class="label label-info">DNSDB:Name History= {{content.records}} records</span>

View File

@ -0,0 +1,3 @@
<span class="label" ng-repeat="t in content.taxonomies" ng-class="{'info': 'label-info', 'safe': 'label-success', 'suspicious': 'label-warning', 'malicious':'label-danger'}[t.level]">
{{t.namespace}}:{{t.predicate}}={{t.value}}
</span>

View File

@ -1 +0,0 @@
<span ng-if="content.ip.address || content.ip.domain_count" class="label label-info">DT:ReverseIP={{content.ip.address}}: {{content.ip.domain_count}} domains found</span>

View File

@ -0,0 +1,3 @@
<span class="label" ng-repeat="t in content.taxonomies" ng-class="{'info': 'label-info', 'safe': 'label-success', 'suspicious': 'label-warning', 'malicious':'label-danger'}[t.level]">
{{t.namespace}}:{{t.predicate}}={{t.value}}
</span>

View File

@ -1 +0,0 @@
<span ng-if="content.name_server" class="label label-info">DT:ReverseNameServer= {{content.name_server}}, {{content.domain_count}} domains</span>

View File

@ -0,0 +1,3 @@
<span class="label" ng-repeat="t in content.taxonomies" ng-class="{'info': 'label-info', 'safe': 'label-success', 'suspicious': 'label-warning', 'malicious':'label-danger'}[t.level]">
{{t.namespace}}:{{t.predicate}}={{t.value}}
</span>

View File

@ -1,3 +0,0 @@
<span ng-if="content.domain_count.current || content.domain_count.historic" class="label label-info">
DT:ReverseWhois= curr:{{content.domain_count.current}}/hist:{{content.domain_count.historic}} domains found
</span>

Some files were not shown because too many files have changed in this diff Show More