Merge branch 'develop' of https://github.com/TheHive-Project/Cortex-Analyzers into feature/update_docs

This commit is contained in:
dadokkio 2021-02-03 10:15:58 +01:00
commit a711fa58cf
18 changed files with 1048 additions and 230 deletions

View File

@ -5,7 +5,7 @@
"url": "https://github.com/BSI-CERT-Bund/censys-analyzer",
"version": "1.0",
"description": "Check IPs, certificate hashes or domains against censys.io.",
"dataTypeList": ["ip", "hash", "domain"],
"dataTypeList": ["ip", "hash", "domain", "other"],
"baseConfig": "Censys",
"command": "Censys/censys_analyzer.py",
"configurationItems": [

View File

@ -24,6 +24,18 @@ class CensysAnalyzer(Analyzer):
None,
"No API-Key for Censys given. Please add it to the cortex configuration.",
)
self.__fields = self.get_param(
'parameters.fields',
["updated_at", "ip"]
)
self.__max_records = self.get_param(
'parameters.max_records',
1000
)
self.__flatten = self.get_param(
'parameters.flatten',
True
)
def search_hosts(self, ip):
"""
@ -57,14 +69,34 @@ class CensysAnalyzer(Analyzer):
c = CensysWebsites(api_id=self.__uid, api_secret=self.__api_key)
return c.view(dom)
def search_ipv4(self, search):
    """
    Run a query against the Censys IPv4 search index.

    :param search: Censys search query string
    :type search: str
    :return: list of matching records (dicts, flattened if self.__flatten)
    """
    c = CensysIPv4(api_id=self.__uid, api_secret=self.__api_key)
    # c.search() yields results lazily; materialize with list() (a listcomp
    # copy is the slower, less idiomatic spelling of the same thing) so the
    # report is JSON-serializable.
    return list(
        c.search(
            search,
            fields=self.__fields,
            max_records=self.__max_records,
            flatten=self.__flatten,
        )
    )
def run(self):
try:
if self.data_type == "ip":
self.report({"ip": self.search_hosts(self.get_data())})
elif self.data_type == "hash":
self.report({"cert": self.search_certificate(self.get_data())})
elif self.data_type == "domain" or self.data_type == "fqdn":
self.report({"website": self.search_website(self.get_data())})
if self.data_type == 'ip':
self.report({
'ip': self.search_hosts(self.get_data())
})
elif self.data_type == 'hash':
self.report({
'cert': self.search_certificate(self.get_data())
})
elif self.data_type == 'domain' or self.data_type == 'fqdn':
self.report({
'website': self.search_website(self.get_data())
})
elif self.data_type == 'other':
self.report({
'matches': self.search_ipv4(self.get_data())
})
else:
self.error(
"Data type not supported. Please use this analyzer with data types hash, ip or domain."
@ -80,36 +112,23 @@ class CensysAnalyzer(Analyzer):
def summary(self, raw):
taxonomies = []
if "ip" in raw:
raw = raw["ip"]
service_count = len(raw.get("protocols", []))
heartbleed = (
raw.get("443", {})
.get("https", {})
.get("heartbleed", {})
.get("heartbleed_vulnerable", False)
)
taxonomies.append(
self.build_taxonomy("info", "Censys", "OpenServices", service_count)
)
if 'ip' in raw:
raw = raw['ip']
service_count = len(raw.get('protocols', []))
heartbleed = raw.get('443', {}).get('https', {}).get('heartbleed', {}).get('heartbleed_vulnerable', False)
taxonomies.append(self.build_taxonomy('info', 'Censys', 'OpenServices', service_count))
if heartbleed:
taxonomies.append(
self.build_taxonomy(
"malicious", "Censys", "Heartbleed", "vulnerable"
)
)
elif "website" in raw:
raw = raw["website"]
service_count = len(raw.get("tags", []))
taxonomies.append(self.build_taxonomy('malicious', 'Censys', 'Heartbleed', 'vulnerable'))
taxonomies.append(
self.build_taxonomy("info", "Censys", "OpenServices", service_count)
)
elif "cert" in raw:
raw = raw["cert"]
trusted_count = len(raw.get("validation", []))
validator_count = len(raw.get("validation", []))
elif 'website' in raw:
raw = raw['website']
service_count = len(raw.get('tags', []))
taxonomies.append(self.build_taxonomy('info', 'Censys', 'OpenServices', service_count))
elif 'cert' in raw:
raw = raw['cert']
trusted_count = len(raw.get('validation', []))
validator_count = len(raw.get('validation', []))
for _, validator in raw.get("validation", []).items():
if (
@ -131,15 +150,17 @@ class CensysAnalyzer(Analyzer):
)
)
else:
taxonomies.append(
self.build_taxonomy(
"info",
"Censys",
"TrustedCount",
"{}/{}".format(trusted_count, validator_count),
)
)
return {"taxonomies": taxonomies}
taxonomies.append(self.build_taxonomy('info', 'Censys', 'TrustedCount', '{}/{}'.format(
trusted_count, validator_count
)))
elif 'matches' in raw:
result_count = len(raw.get('matches', []))
taxonomies.append(self.build_taxonomy('info', 'Censys ipv4 search', 'results', result_count))
return {
'taxonomies': taxonomies
}
if __name__ == "__main__":

View File

@ -1,13 +1,13 @@
{
"name": "GreyNoise",
"version": "2.3",
"version": "3.0",
"author": "Nclose",
"url": "https://github.com/TheHive-Project/Cortex-Analyzers",
"license": "APLv2",
"description": "Determine whether an IP has known scanning activity using GreyNoise.",
"dataTypeList": ["ip"],
"baseConfig": "GreyNoise",
"command": "GreyNoise/greynoise.py",
"command": "GreyNoise/greynoisev3.py",
"configurationItems": [
{
"name": "key",

View File

@ -1,142 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import requests
from collections import defaultdict, OrderedDict
from cortexutils.analyzer import Analyzer
class GreyNoiseAnalyzer(Analyzer):
    """
    GreyNoise API docs: https://github.com/GreyNoise-Intelligence/api.greynoise.io
    """

    @staticmethod
    def _get_level(current_level, new_classification):
        """
        Map GreyNoise classifications to Cortex maliciousness levels.
        Accept a Cortex level and a GreyNoise classification, then return the more malicious of the two.

        :param current_level: A Cortex maliciousness level
            https://github.com/TheHive-Project/CortexDocs/blob/master/api/how-to-create-an-analyzer.md#output
        :param new_classification: A classification field value from a GreyNoise record
            https://github.com/GreyNoise-Intelligence/api.greynoise.io#v1queryip
        :return: The more malicious of the 2 submitted values as a Cortex maliciousness level
        """
        classification_level_map = OrderedDict([
            ('info', 'info'),
            ('benign', 'safe'),
            ('suspicious', 'suspicious'),
            ('malicious', 'malicious')
        ])
        # list() is required on Python 3: dict .values() returns a view,
        # which has no .index() method.
        levels = list(classification_level_map.values())
        new_level = classification_level_map.get(new_classification, 'info')
        new_index = levels.index(new_level)
        try:
            current_index = levels.index(current_level)
        except ValueError:  # There is no existing level (e.g. current_level is None)
            current_index = -1
        return new_level if new_index > current_index else current_level

    def run(self):
        """Query the GreyNoise v2 experimental GNQL endpoint for the observed IP."""
        if self.data_type == "ip":
            api_key = self.get_param('config.key', None)
            url = 'https://api.greynoise.io/v2/experimental/gnql?query=ip:%s' % self.get_data()
            # The API key is optional; authenticated requests get higher quotas.
            if api_key:
                headers = {'Content-Type': 'application/x-www-form-urlencoded', 'Key': '%s' % api_key}
            else:
                headers = {'Content-Type': 'application/x-www-form-urlencoded'}
            response = requests.get(url, headers=headers)
            if not (200 <= response.status_code < 300):
                self.error('Unable to query GreyNoise API\n{}'.format(response.text))
            self.report(response.json())
        else:
            self.notSupported()

    def summary(self, raw):
        """
        Return one taxonomy summarizing the reported tags
        If there is only one tag, use it as the predicate
        If there are multiple tags, use "entries" as the predicate
        Use the total count as the value
        Use the most malicious level found

        Examples:

        Input
            {
                "actor": SCANNER1,
                "classification": ""
            }
        Output
            GreyNoise:SCANNER1 = 1 (info)

        Input
            {
                "actor": SCANNER1,
                "classification": "malicious"
            },
            {
                "classification": "benign"
            }
        Output
            GreyNoise:SCANNER1 = 2 (malicious)

        Input
            {
                "actor": SCANNER1,
                "classification": ""
            },
            {
                "actor": SCANNER1,
                "classification": "safe"
            },
            {
                "actor": SCANNER2,
                "classification": ""
            }
        Output
            GreyNoise:entries = 3 (safe)
        """
        try:
            taxonomies = []
            if raw.get('data'):
                final_level = None
                taxonomy_data = defaultdict(int)
                for record in raw.get('data', []):
                    actor = record.get('actor', 'unknown')
                    classification = record.get('classification', 'unknown')
                    taxonomy_data[actor] += 1
                    # Keep the most malicious classification seen so far.
                    final_level = self._get_level(final_level, classification)
                if len(taxonomy_data) > 1:  # Multiple tags have been found
                    taxonomies.append(self.build_taxonomy(final_level, 'GreyNoise', 'entries', len(taxonomy_data)))
                else:  # There is only one tag found, possibly multiple times
                    # .items(), not the Python-2-only .iteritems(): this file
                    # declares a python3 interpreter.
                    for actor, count in taxonomy_data.items():
                        taxonomies.append(self.build_taxonomy(final_level, 'GreyNoise', actor, count))
            else:
                taxonomies.append(self.build_taxonomy('info', 'GreyNoise', 'Records', 'None'))
            return {"taxonomies": taxonomies}
        except Exception as e:
            # BaseException.message was removed in Python 3; format the exception itself.
            self.error('Summary failed\n{}'.format(e))
if __name__ == '__main__':
GreyNoiseAnalyzer().run()

View File

@ -0,0 +1,142 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from collections import defaultdict, OrderedDict
from cortexutils.analyzer import Analyzer
from greynoise import GreyNoise
class GreyNoiseAnalyzer(Analyzer):
    """
    GreyNoise API docs: https://developer.greynoise.io/reference#noisecontextip-1
    """

    def run(self):
        """Query the GreyNoise context endpoint (via the official SDK) for the observed IP."""
        if self.data_type == "ip":
            api_key = self.get_param("config.key", None)
            api_client = GreyNoise(
                api_key=api_key,
                timeout=30,
                integration_name="greynoise-cortex-analyzer-v3.0",
            )
            try:
                self.report(api_client.ip(self.get_data()))
            except Exception as e:
                self.error("Unable to query GreyNoise API\n{}".format(e))
        else:
            self.notSupported()

    def summary(self, raw):
        """
        Return two taxonomies

        Examples:

        Input
            {
                "seen": True,
                "actor": "SCANNER1",
                "classification": "benign",
                "tags": ['a', 'b', 'c']
            }
        Output
            GreyNoise:tags = 3 (Safe)
            GreyNoise:actor = SCANNER1 (Safe)

        Input
            {
                "seen": True,
                "actor": "SCANNER1",
                "classification": "unknown",
                "tags": ['a', 'b', 'c']
            }
        Output
            GreyNoise:tags = 3 (Suspicious)
            GreyNoise:classification = unknown (Info)

        Input
            {
                "seen": True,
                "actor": "SCANNER1",
                "classification": "unknown",
                "tags": ['a', 'b']
            }
        Output
            GreyNoise:tags = 2 (Info)
            GreyNoise:classification = unknown (Info)

        Input
            {
                "seen": True,
                "actor": "SCANNER1",
                "classification": "malicious",
                "tags": ['a', 'b', 'c']
            }
        Output
            GreyNoise:tags = 3 (Malicious)
            GreyNoise:classification = malicious (Malicious)

        Input
            {
                "seen": "False"
            }
        Output
            GreyNoise:Seen last 60 days = False (Info)
        """
        # Map a GreyNoise classification to a Cortex level; "unknown" is
        # escalated to "suspicious" only when more than two tags were seen.
        classification_level_map = {
            "benign": lambda x: "safe",
            "unknown": lambda tag_count: "info"
            if (not tag_count) or (tag_count <= 2)
            else "suspicious",
            "malicious": lambda x: "malicious",
        }
        try:
            taxonomies = []
            seen = raw.get("seen", False)
            if seen:
                tag_count = len(raw.get("tags", []))
                classification = raw.get("classification", "unknown")
                actor = raw.get("actor")
                # First taxonomy: tag count, leveled by classification + count.
                t1_level = classification_level_map.get(classification)(tag_count)
                t1_namespace = "GreyNoise"
                t1_predicate = "tags"
                t1_value = tag_count
                taxonomies.append(
                    self.build_taxonomy(t1_level, t1_namespace, t1_predicate, t1_value)
                )
                # Second taxonomy: actor for benign IPs, otherwise the raw classification.
                t2_level = classification_level_map.get(classification)(None)
                t2_namespace = "GreyNoise"
                t2_predicate = (
                    "actor" if classification == "benign" else "classification"
                )
                t2_value = actor if classification == "benign" else classification
                taxonomies.append(
                    self.build_taxonomy(t2_level, t2_namespace, t2_predicate, t2_value)
                )
            else:
                taxonomies.append(
                    self.build_taxonomy(
                        classification_level_map.get("unknown")(None),
                        "GreyNoise",
                        "Seen last 60 days",
                        False,
                    )
                )
            return {"taxonomies": taxonomies}
        except Exception as e:
            # Fixed: BaseException.message does not exist in Python 3 — the
            # original handler itself raised AttributeError. Format the exception.
            self.error("Summary failed\n{}".format(e))
if __name__ == "__main__":
GreyNoiseAnalyzer().run()

View File

@ -1,2 +1,2 @@
cortexutils
requests
greynoise

View File

@ -19,13 +19,22 @@ class TorProjectClient:
Ignored if `cache_duration` is 0.
:param cache_root: Path where to store the cached file
downloaded from torproject.org
:param proxies: Proxies to be using during requests session
:type ttl: int
:type cache_duration: int
:type cache_root: str
"""
def __init__(self, ttl=86400, cache_duration=3600,
cache_root='/tmp/cortex/tor_project'):
def __init__(
self,
ttl=86400,
cache_duration=3600,
cache_root="/tmp/cortex/tor_project",
proxies=None,
):
self.session = requests.Session()
if proxies:
self.session.proxies.update(proxies)
self.delta = None
self.cache = None
if ttl > 0:
@ -33,21 +42,22 @@ class TorProjectClient:
if cache_duration > 0:
self.cache = Cache(cache_root)
self.cache_duration = cache_duration
self.url = 'https://check.torproject.org/exit-addresses'
self.url = "https://check.torproject.org/exit-addresses"
__cache_key = __name__ + ':raw_data'
__cache_key = __name__ + ":raw_data"
def _get_raw_data(self):
try:
return self.cache['raw_data']
except(AttributeError, TypeError):
return self.cache["raw_data"]
except (AttributeError, TypeError):
return self.session.get(self.url).text
except KeyError:
self.cache.set(
'raw_data',
"raw_data",
self.session.get(self.url).text,
expire=self.cache_duration)
return self.cache['raw_data']
expire=self.cache_duration,
)
return self.cache["raw_data"]
def search_tor_node(self, ip):
"""Lookup an IP address to check if it is a known tor exit node.
@ -65,14 +75,13 @@ class TorProjectClient:
tmp = {}
present = datetime.utcnow().replace(tzinfo=pytz.utc)
for line in self._get_raw_data().splitlines():
params = line.split(' ')
if params[0] == 'ExitNode':
tmp['node'] = params[1]
elif params[0] == 'ExitAddress':
tmp['last_status'] = params[2] + 'T' + params[3] + '+0000'
last_status = parse(tmp['last_status'])
if (self.delta is None or
(present - last_status) < self.delta):
params = line.split(" ")
if params[0] == "ExitNode":
tmp["node"] = params[1]
elif params[0] == "ExitAddress":
tmp["last_status"] = params[2] + "T" + params[3] + "+0000"
last_status = parse(tmp["last_status"])
if self.delta is None or (present - last_status) < self.delta:
data[params[1]] = tmp
tmp = {}
else:

View File

@ -5,37 +5,39 @@ import tor_project
class TorProjectAnalyzer(Analyzer):
"""Cortex analyzer to query TorProject for exit nodes IP addresses"""
def __init__(self):
Analyzer.__init__(self)
self.ttl = self.get_param('config.ttl', 86400)
self.cache_duration = self.get_param('config.cache.duration', 3600)
self.cache_root = self.get_param(
'config.cache.root', '/tmp/cortex/tor_project'
)
self.ttl = self.get_param("config.ttl", 86400)
self.cache_duration = self.get_param("config.cache.duration", 3600)
self.cache_root = self.get_param("config.cache.root", "/tmp/cortex/tor_project")
self.proxies = {
"https": self.get_param("config.proxy_https"),
"http": self.get_param("config.proxy_http"),
}
self.client = tor_project.TorProjectClient(
ttl=self.ttl,
cache_duration=self.cache_duration,
cache_root=self.cache_root
cache_root=self.cache_root,
proxies=self.proxies,
)
def summary(self, raw):
taxonomies = []
level = 'info'
level = "info"
value = False
if ("node" in raw):
level = 'suspicious'
if "node" in raw:
level = "suspicious"
value = True
taxonomies.append(
self.build_taxonomy(level, 'TorProject', 'Node', value))
taxonomies.append(self.build_taxonomy(level, "TorProject", "Node", value))
return {"taxonomies": taxonomies}
def run(self):
if self.data_type != 'ip':
return self.error('Not an IP address')
if self.data_type != "ip":
return self.error("Not an IP address")
report = self.client.search_tor_node(self.get_data())
self.report(report)
if __name__ == '__main__':
if __name__ == "__main__":
TorProjectAnalyzer().run()

View File

@ -0,0 +1,65 @@
{
"name": "CheckPoint Lock",
"version": "1.0",
"author": "@dadokkio LDO-CERT",
"url": "https://github.com/TheHive-Project/Cortex-Analyzers",
"license": "AGPL-V3",
"description": "Lock ip on CheckPoint Gaia",
"dataTypeList": ["thehive:case_artifact"],
"command": "CheckPoint/checkpoint.py",
"baseConfig": "CheckPoint",
"config": {
"service": "lock"
},
"configurationItems": [
{
"name": "server",
"description": "Checkpoint API server",
"type": "string",
"multi": false,
"required": true
},
{
"name": "username",
"description": "CheckPoint username",
"type": "string",
"multi": false,
"required": true
},
{
"name": "password",
"description": "CheckPoint password",
"type": "string",
"multi": false,
"required": true
},
{
"name": "group_name",
"description": "CheckPoint group name ip will be added/removed from",
"type": "string",
"multi": false,
"required": true
},
{
"name": "exclusions",
"description": "ip/subnet that cannot be locked or unlocked",
"type": "string",
"multi": true,
"required": false
},
{
"name": "added_tag",
"description": "Tag added to observable when adding to FW",
"type": "string",
"multi": false,
"required": false
},
{
"name": "removed_tag",
"description": "Tag added to observable when removing from FW",
"type": "string",
"multi": false,
"required": false
}
]
}

View File

@ -0,0 +1,65 @@
{
"name": "CheckPoint Unlock",
"version": "1.0",
"author": "@dadokkio LDO-CERT",
"url": "https://github.com/TheHive-Project/Cortex-Analyzers",
"license": "AGPL-V3",
"description": "Unlock ip on CheckPoint Gaia",
"dataTypeList": ["thehive:case_artifact"],
"command": "CheckPoint/checkpoint.py",
"baseConfig": "CheckPoint",
"config": {
"service": "unlock"
},
"configurationItems": [
{
"name": "server",
"description": "Checkpoint API server",
"type": "string",
"multi": false,
"required": true
},
{
"name": "username",
"description": "CheckPoint username",
"type": "string",
"multi": false,
"required": true
},
{
"name": "password",
"description": "CheckPoint password",
"type": "string",
"multi": false,
"required": true
},
{
"name": "group_name",
"description": "CheckPoint group name ip will be added/removed from",
"type": "string",
"multi": false,
"required": true
},
{
"name": "exclusions",
"description": "ip/subnet that cannot be locked or unlocked",
"type": "string",
"multi": true,
"required": false
},
{
"name": "added_tag",
"description": "Tag added to observable when adding to FW",
"type": "string",
"multi": false,
"required": false
},
{
"name": "removed_tag",
"description": "Tag added to observable when removing from FW",
"type": "string",
"multi": false,
"required": false
}
]
}

View File

@ -0,0 +1,22 @@
### CheckPoint
This responder permits you to add/remove selected observable from a specific group.
Some notes:
- The API must permit access from the Cortex machine.
- The first API login must be done manually because it requires fingerprint acceptance. This generates a fingerprints.txt file that must be placed next to the responder's Python file.
- It doesn't work in a dockerized analyzer!
- If the group doesn't exist, it will be created [when blocking]. At the moment it is created without any default rule.
#### Requirements
The following options are required in CheckPoint Responder configuration:
- `server` : URL of CheckPoint instance
- `username`: user accessing CheckPoint instance
- `password`: password for the user accessing CheckPoint instance
- `group_name`: name of the group the IP will be added to or removed from

View File

@ -0,0 +1,174 @@
#!/usr/bin/env python3
# encoding: utf-8
import os
import json
import ipaddress
from cortexutils.responder import Responder
from cpapi import APIClient, APIClientArgs
class CheckPoint(Responder):
    """Cortex responder that locks/unlocks an IP observable by adding it to or
    removing it from a CheckPoint group via the management API (cpapi)."""

    def __init__(self):
        Responder.__init__(self)
        # Connection settings (the original comment said "Mail settings" — copy/paste).
        server = self.get_param("config.server", None, "Missing server in config")
        self.username = self.get_param(
            "config.username", None, "Missing username in config"
        )
        self.password = self.get_param(
            "config.password", None, "Missing password in config"
        )
        # The management API requires a previously accepted server fingerprint:
        # a manual first login must have produced fingerprints.txt next to this file.
        # Bind the path outside the try so the error message below can always use it.
        fingerprint_path = "{}/fingerprints.txt".format(os.path.dirname(__file__))
        try:
            fingerprint = json.loads(open(fingerprint_path, "r").read())[server]
            self.client_args = APIClientArgs(server=server, fingerprint=fingerprint)
        except (OSError, ValueError, KeyError):
            # File missing/unreadable, invalid JSON, or no entry for this server.
            # (The original bare `except:` also swallowed SystemExit/KeyboardInterrupt.)
            self.error(
                "Fingerprint check failed. It should be located here {}".format(
                    fingerprint_path
                )
            )
        self.service = self.get_param("config.service", None)
        self.group_name = self.get_param(
            "config.group_name", None, "Missing group_name in config"
        )
        self.exclusions = self.get_param("config.exclusions", [])
        self.added_tag = self.get_param("config.added_tag", None)
        self.removed_tag = self.get_param("config.removed_tag", None)

    def run(self):
        """Validate the observable, then add it to (lock) or remove it from
        (unlock) the configured group, publishing the change on success."""
        Responder.run(self)
        data = self.get_param("data.data")
        try:
            data = ipaddress.ip_address(data)
        except ValueError:
            self.error("{} is not a valid ip".format(data))
        # Refuse to touch excluded addresses or subnets.
        for excl in self.exclusions:
            try:
                excl = ipaddress.ip_address(excl)
                if data == excl:
                    self.error("{} in exclusions".format(data))
            except ValueError:
                try:
                    excl = ipaddress.ip_network(excl)
                    if data in excl:
                        self.error("{} in exclusions".format(data))
                except ValueError:
                    # Malformed exclusion entry — ignore it.
                    continue
        data = str(data)
        return_dict = {}
        with APIClient(self.client_args) as client:
            login = client.login(self.username, self.password)
            if not login.success:
                self.error("Login failed!")
            if self.service == "lock":
                # Check if group exists
                get_group_response = client.api_call(
                    "show-group", {"name": self.group_name}
                )
                if not get_group_response.success:
                    # If not, create it (no default rule is attached).
                    add_group_response = client.api_call(
                        "add-group", {"name": self.group_name}
                    )
                    if not add_group_response.success:
                        self.error(
                            "Error during group creation: {}".format(
                                add_group_response.error_message
                            )
                        )
                    else:
                        client.api_call("publish", {})
                        return_dict["group_created"] = True
                else:
                    return_dict["group_created"] = False
                # Check if host exists
                get_host_response = client.api_call("show-host", {"name": data})
                if not get_host_response.success:
                    # Create host object from the ip
                    add_host_response = client.api_call(
                        "add-host",
                        {
                            "name": data,
                            "ip-address": data,
                            "comments": "From TheHive responder",
                        },
                    )
                    if not add_host_response.success:
                        self.error(
                            "Error during host creation: {}".format(
                                add_host_response.error_message
                            )
                        )
                    else:
                        client.api_call("publish", {})
                        # Fixed: report True after a successful creation.
                        # The original overwrote the flag with False here,
                        # so "host_created" could never be True.
                        return_dict["host_created"] = True
                else:
                    return_dict["host_created"] = False
                # Add observable to group
                response = client.api_call(
                    "set-group",
                    {"name": self.group_name, "members": {"add": data}},
                )
                if not response.success:
                    self.error(
                        "Error adding host to group: {}".format(response.error_message)
                    )
                else:
                    # COMMIT CHANGES
                    client.api_call("publish", {})
                    return_dict["Success"] = True
            elif self.service == "unlock":
                # Check if host exists
                get_host_response = client.api_call("show-host", {"name": data})
                if not get_host_response.success:
                    self.error(
                        "Host doesn't exist: {}".format(get_host_response.error_message)
                    )
                # Remove observable from group
                response = client.api_call(
                    "set-group",
                    {"name": self.group_name, "members": {"remove": data}},
                )
                if not response.success:
                    self.error(
                        "Error removing host from group: {}".format(
                            response.error_message
                        )
                    )
                else:
                    # COMMIT CHANGES
                    client.api_call("publish", {})
                    return_dict["Success"] = True
        self.report({"message": return_dict})

    def operations(self, raw):
        """After a successful lock/unlock, tag the artifact so the firewall
        state is visible from TheHive; otherwise add no operation."""
        if self.service == "lock" and self.added_tag:
            return [self.build_operation("AddTagToArtifact", tag=self.added_tag)]
        elif self.service == "unlock" and self.removed_tag:
            return [self.build_operation("AddTagToArtifact", tag=self.removed_tag)]
if __name__ == "__main__":
CheckPoint().run()

View File

@ -0,0 +1,2 @@
cortexutils
-e git+https://github.com/CheckPointSW/cp_mgmt_api_python_sdk#egg=cpapi cpapi

View File

@ -0,0 +1,96 @@
{
"name": "MailIncidentStatus",
"version": "1.0",
"author": "Manuel Krucker",
"url": "https://github.com/TheHive-Project/Cortex-Analyzers",
"license": "AGPL-V3",
"description": "Mail detailed status information for an incident case. The mail is sent to recipients specified by tags prefixed with 'mail='. The responder respects TLP definitions: for tlp:amber, mail addresses and for tlp:green, mail domains must be pre-defined in the configuration; for tlp:red, sending mails is denied. The responder also uses thehive4py to collect information about the status of the incident's tasks.",
"dataTypeList": ["thehive:case"],
"command": "MailIncidentStatus/mailincidentstatus.py",
"baseConfig": "MailIncidentStatus",
"configurationItems": [
{
"name": "from",
"description": "email address from which the mail is send",
"type": "string",
"multi": false,
"required": true
},
{
"name": "smtp_host",
"description": "SMTP server used to send mail",
"type": "string",
"multi": false,
"required": true,
"defaultValue": "localhost"
},
{
"name": "smtp_port",
"description": "SMTP server port",
"type": "number",
"multi": false,
"required": true,
"defaultValue": 25
},
{
"name": "smtp_user",
"description": "SMTP server user",
"type": "string",
"multi": false,
"required": false,
"defaultValue": "user"
},
{
"name": "smtp_pwd",
"description": "SMTP server password",
"type": "string",
"multi": false,
"required": false,
"defaultValue": "pwd"
},
{
"name": "mail_subject_prefix",
"description": "Prefix of the mail subject",
"type": "string",
"multi": false,
"required": false,
"defaultValue": "Incident Case Notification: "
},
{
"name": "mail_html_style_tag_content",
"description": "The css content of the style tag for the HTML mail body. Define table, th, hd, .first, and .second elements.",
"type": "string",
"multi": false,
"required": false,
"defaultValue": "table { border: 1px solid black; border-collapse: collapse; text-align: left; vertical-align: top; th { border: 1px solid black; border-collapse: collapse; text-align: left;} td { border: 1px solid black; border-collapse: collapse; text-align: left;} .first { width: 150px; min-width: 150px; max-width: 150px; background-color: #ffe8d4; } .second { background-color: #d7d9f2;}"
},
{
"name": "tlp_amber_mail_addresses",
"description": "Mail addresses which are allowed to receive tlp:amber classified incidents",
"type": "string",
"multi": true,
"required": false
},
{
"name": "tlp_green_mail_domains",
"description": "Mail domains which are allowed to receive tlp:green classified incidents",
"type": "string",
"multi": true,
"required": false
},
{
"name": "thehive_url",
"description": "URL pointing to your TheHive installation, e.g. 'http://127.0.0.1:9000'",
"type": "string",
"multi": false,
"required": true
},
{
"name": "thehive_apikey",
"description": "TheHive API key which is used get tasks and other elements of the incident",
"type": "string",
"multi": false,
"required": true
}
]
}

View File

@ -0,0 +1,359 @@
#!/usr/bin/env python3
# encoding: utf-8
import ssl
import smtplib
import datetime
from cortexutils.responder import Responder
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.utils import formatdate
from thehive4py.api import TheHiveApi
class MailIncidentStatus(Responder):
def __init__(self):
    """Read mail/SMTP and TheHive connection settings from the responder config."""
    Responder.__init__(self)
    # Mail settings
    self.smtp_host = self.get_param("config.smtp_host", "localhost")
    # NOTE(review): default is the string "25" although the config item is
    # declared as a number — confirm smtplib receives the intended type.
    self.smtp_port = self.get_param("config.smtp_port", "25")
    self.mail_from = self.get_param(
        "config.from", None, "Missing sender email address"
    )
    # Third positional argument is the error message; None marks these
    # credentials as optional (anonymous SMTP is handled in run()).
    self.smtp_user = self.get_param("config.smtp_user", "user", None)
    self.smtp_pwd = self.get_param("config.smtp_pwd", "pwd", None)
    # TheHive4py settings
    self.thehive_url = self.get_param(
        "config.thehive_url", None, "TheHive URL missing!"
    )
    self.thehive_apikey = self.get_param(
        "config.thehive_apikey", None, "TheHive API key missing!"
    )
    # TLP allow-lists: green -> permitted recipient domains,
    # amber -> permitted individual addresses.
    self.tlp_green_mail_domains = self.get_param(
        "config.tlp_green_mail_domains",
        None,
        "Error reading tlp_green_mail_domains",
    )
    self.tlp_amber_mail_addresses = self.get_param(
        "config.tlp_amber_mail_addresses",
        None,
        "Error reading tlp_amber_mail_addresses",
    )
def run(self):
    """
    Build an HTML status mail for the incident case and send one copy per recipient.

    Recipients come from case tags of the form 'mail=<address>' or
    'mail:<address>'; they are checked against the case TLP before any
    mail is sent.
    """
    Responder.run(self)
    # Validate Config
    self.validate_Config()
    # Check data_type
    if not self.data_type == "thehive:case":
        self.error("data type not type 'thehive:case'")
    caseID = self.get_param("data.id", None, "case.id is missing")
    # CREATE MAIL BODY
    body = self.get_HTMLMailBody()
    # GET RECIPIENTS
    # Search recipient address in case tags
    tags = self.get_param("data.tags", None, "recipient address not found in tags")
    # 'mail=' and 'mail:' are both 5 chars, hence the t[5:] slice; quotes
    # around the address (as stored by TheHive tags) are stripped.
    mail_addresses = [
        t[5:].strip('"')
        for t in tags
        if t.startswith("mail=") or t.startswith("mail:")
    ]
    if len(mail_addresses) == 0:
        self.error("recipient address not found in tags")
    # CHECK RECIPIENTS FOR CONFORMANCE WITH TLP
    self.check_TLPConformance(mail_addresses)
    # PREPARE MAIL
    # SEND MAIL
    message = ""
    for mail_address in mail_addresses:
        msg = MIMEMultipart()
        subject = (
            self.get_param("config.mail_subject_prefix", "", None)
            + caseID
            + " "
            + self.get_param("data.title", None, "title is missing")
        )
        msg["Subject"] = subject
        msg["From"] = self.mail_from
        msg["Date"] = formatdate(localtime=True)
        # msg.attach(MIMEText(body, "plain", "utf-8"))
        msg.attach(MIMEText(body, "html", "utf-8"))
        msg["To"] = mail_address
        if self.smtp_user and self.smtp_pwd:
            # Authenticated path: try STARTTLS first, fall back to a plain
            # authenticated session if the server does not support it.
            try:
                context = ssl.create_default_context()
                with smtplib.SMTP(self.smtp_host, self.smtp_port) as server:
                    server.ehlo()
                    server.starttls(context=context)
                    server.ehlo()
                    server.login(self.smtp_user, self.smtp_pwd)
                    server.send_message(msg, self.mail_from, mail_address)
            except smtplib.SMTPNotSupportedError:
                with smtplib.SMTP(self.smtp_host, self.smtp_port) as server:
                    server.ehlo()
                    server.login(self.smtp_user, self.smtp_pwd)
                    server.send_message(msg, self.mail_from, mail_address)
        else:
            # Anonymous path: no credentials configured.
            with smtplib.SMTP(self.smtp_host, self.smtp_port) as server:
                server.send_message(msg, self.mail_from, mail_address)
        # SET RETURN MESSAGE
        message += "message sent to " + mail_address + ";"
    self.report({"message": message})
def validate_Config(self):
    """
    Plausibility-check the configured TLP mail domains and addresses.

    Reports every entry that fails the check via self.error and returns
    True only when all configured entries look sane.
    """
    all_valid = True
    # A domain must at least contain a dot to be considered plausible.
    for bad_domain in [d for d in self.tlp_green_mail_domains if "." not in d]:
        self.error(
            bad_domain
            + " is no valid domain name. Please change configuration 'tlp_green_mail_domains'"
        )
        all_valid = False
    # An address must at least contain an '@' to be considered plausible.
    for bad_address in [a for a in self.tlp_amber_mail_addresses if "@" not in a]:
        self.error(
            bad_address
            + " is no valid mail address. Please change configuration 'tlp_amber_mail_addresses'"
        )
        all_valid = False
    return all_valid
def check_TLPConformance(self, mail_addresses):
    """
    The TLP should be respected when sending the incident status. The following rules are applied:
    * TLP: Red  : Sending mails is not allowed -> Error is returned
    * TLP: Amber: Check if the mail address is listed in configuration item 'tlp_amber_mail_addresses'
    * TLP: Green: Check if the mail domain is listed in configuration item 'tlp_green_mail_domains'
    * TLP: White: No checks applied, every recipient receives an email
    """
    tlp = self.get_param("data.tlp", None, "Reading data.tlp failed.")
    if tlp == 0:
        # tlp:white
        pass
    elif tlp == 1:
        # tlp:green
        # NOTE(review): assumes every recipient contains an '@'
        # (validate_Config only warns); an address without one would
        # raise IndexError here — confirm.
        domains = list(map(lambda x: x.split("@")[1], mail_addresses))
        for domain in domains:
            if domain not in self.tlp_green_mail_domains:
                self.error(
                    "No mails sent. The domain '"
                    + domain
                    + "'is not listed in the configuration. Add the domain to the configuration or remove the mail addresses with this domains from the incident case tags.\n\nCurrent tlp_green_mail_domains config:\n"
                    + ",".join(self.tlp_green_mail_domains)
                )
    elif tlp == 2:
        # tlp:amber
        for mail_address in mail_addresses:
            if mail_address not in self.tlp_amber_mail_addresses:
                self.error(
                    "No mails sent. The mail address '"
                    + mail_address
                    + "' is not listed in the configuration. Add the address to the configuration or remove the mail address from the incident case tags.\n\nCurrent tlp_amber_mail_addresses config:\n"
                    + ",".join(self.tlp_amber_mail_addresses)
                )
    elif tlp == 3:
        # tlp:red — never mailed out.
        self.error(
            "The incident has the TLP value 'tlp:red'. Sending mails is not allowed for this tlp classifcation."
        )
    else:
        self.error("TLP is an undefined value.")
def get_HTMLMailBody(self):
    """
    Assemble the HTML mail body: a table containing the case's core
    attributes, a task summary, and all custom fields (sorted by name).

    :return: complete HTML document as string
    """

    def fmt_ts(millis, missing="Unknown"):
        # TheHive stores timestamps in epoch milliseconds; a falsy value
        # (None/0) means the timestamp was never set.
        if not millis:
            return missing
        return (datetime.datetime.fromtimestamp(millis / 1e3)).strftime(
            "%m/%d/%Y %H:%M"
        )

    caseID = self.get_param("data.id", None, "case.id is missing")
    table_rows = [
        ("CaseID", caseID),
        ("Title", self.get_param("data.title")),
        (
            "Severity",
            self.get_HTMLSeverityString(self.get_param("data.severity")),
        ),
        ("TLP", str(self.get_param("data.tlp", None, "Reading data.tlp failed."))),
        ("Status", self.get_param("data.status")),
        ("Description", self.get_param("data.description")),
        # Aggregated task statistics fetched via the TheHive API.
        ("Task Summary", self.get_HTMLCaseTaskSummary(caseID)),
        ("StartDate", fmt_ts(self.get_param("data.startDate"))),
        ("createdAt", fmt_ts(self.get_param("data.createdAt"))),
        ("createdBy", self.get_param("data.createdBy")),
        ("updatedAt", fmt_ts(self.get_param("data.updatedAt"))),
        ("updatedBy", self.get_param("data.updatedBy") or "Unknown"),
    ]
    ## Custom fields
    cust_fields = self.get_param(
        "data.customFields", None, "Error loading customFields"
    )
    for item in sorted(cust_fields):
        # Each custom field value is a one-element dict keyed by its type,
        # e.g. "scope-accounts-compromised": {"string": "makr"}
        cust_value_type = next(iter(cust_fields.get(item)))
        cust_value = (cust_fields.get(item)).get(cust_value_type)
        if cust_value_type == "date":
            cust_value_str = fmt_ts(cust_value, "Date not set")
        else:
            cust_value_str = str(cust_value)
        table_rows.append((item, cust_value_str))
    return self.create_HTMLTable(table_rows)
def get_HTMLSeverityString(self, severity):
    """
    Map a numeric TheHive severity (1-4) to a colored HTML label.

    :param severity: severity level as int (1=Low .. 4=Critical)
    :return: HTML paragraph string, or an error string for unknown values
    """
    labels = {
        1: '<p style="color:DeepSkyBlue">Low</p>',
        2: '<p style="color:Orange">Medium</p>',
        3: '<p style="color:Red">High</p>',
        4: '<p style="color:DarkRed">Critical</p>',
    }
    # Fall back to an explicit error string for unmapped severities
    # (original text fixed: "Severtiy" -> "Severity").
    return labels.get(severity, "Severity mapping failed")
def create_HTMLTable(self, two_tuple_list):
    """
    Render a two-column HTML table from a list of (label, value) string
    tuples: the first tuple element fills the first column, the second
    element fills the second column. The CSS placed in the <style> tag is
    taken from the 'mail_html_style_tag_content' configuration item.

    :param two_tuple_list: rows to render, each a (label, value) pair
    :return: complete HTML document as string
    """
    style = self.get_param(
        "config.mail_html_style_tag_content",
        None,
        "Error loading config 'config.mail_html_style_tag_content'",
    )
    # One <tr> per (label, value) pair.
    rows = "".join(
        f'<tr><td align="left">{label}</td><td align="left">{value}</td></tr>\n'
        for label, value in two_tuple_list
    )
    document = (
        "<!DOCTYPE html>\n"
        "<html>\n"
        "<head>\n"
        # meta definitions
        '<meta charset="UTF-8"/><meta http-equiv="Content-Type" content="text/html"; charset="utf-8"/>\n'
        # styles
        f"<style>{style}</style>\n"
        "</head>\n\n"
        "<body>\n"
        '<table width="100%" cellpadding="0" cellspacing="0" border="0" bgcolor="#FFFFFF" align="center">\n'
        '<colgroup><col class="first"/><col class="second"/></colgroup>\n'
        '<tr><td colspan="2">Incident Status Report</td></tr>\n'
        f"{rows}"
        "</table>\n"
        "</body>\n</html>\n"
    )
    return document
def get_HTMLCaseTaskSummary(self, caseID):
    """
    Fetch all tasks of the given case via the TheHive API and return their
    per-status statistics as an HTML string.

    :param caseID: TheHive case identifier
    :return: HTML summary, one "Status: count/total" line per status
    """
    # query case tasks through thehive4py
    api = TheHiveApi(self.thehive_url, self.thehive_apikey)
    tasks = api.get_case_tasks(caseID).json()
    # tally task statuses; unrecognized statuses count toward the total only
    counts = {"Completed": 0, "InProgress": 0, "Waiting": 0, "Cancel": 0}
    for task in tasks:
        status = task["status"]
        if status in counts:
            counts[status] += 1
    total = len(tasks)
    return (
        "Completed: {1}/{0}<br/>"
        "InProgress: {2}/{0}<br/>"
        "Waiting: {3}/{0}<br/>"
        "Canceled: {4}/{0}"
    ).format(
        total,
        counts["Completed"],
        counts["InProgress"],
        counts["Waiting"],
        counts["Cancel"],
    )
# Entry point: Cortex executes this script directly, which runs the responder.
if __name__ == "__main__":
    MailIncidentStatus().run()

View File

@ -0,0 +1,2 @@
cortexutils
thehive4py

View File

@ -3,7 +3,7 @@
GreyNoise results for <strong>{{artifact.data}}</strong>
</div>
<div class="panel-body">
<table class="table" ng-if="content.data">
<table class="table" ng-if="content">
<thead>
<th>Classification</th>
<th>Last update</th>
@ -11,17 +11,18 @@
<th>Cve</th>
<th>Tags</th>
</thead>
<tbody ng-repeat="record in content.data | orderBy:'-last_updated'">
<tbody>
<tr>
<td>{{record.classification}}</td>
<td>{{record.last_seen}}</td>
<td>{{record.actor}}</td>
<td>{{record.cve.join(', ')}}</td>
<td>{{record.tags.join(' ,')}}</td>
<td>{{content.classification}}</td>
<td>{{content.last_seen}}</td>
<td>{{content.actor}}</td>
<td>{{content.cve.join(', ')}}</td>
<td>{{content.tags.join(', ')}}</td>
</tr>
<tr>
<td colspan="5">
Metadata: <pre>{{record.metadata | json}}</pre>
Metadata:
<pre>{{content.metadata | json}}</pre>
</td>
</tr>
</tbody>
@ -38,4 +39,4 @@
<div class="panel-body">
{{content.errorMessage}}
</div>
</div>
</div>