Tor blutmagie (#139)

* Add TorBlutmagie analyzer

* Add reports for TheHive usage

* Properly escape artifacts in report

* Add documentation to class

* Remove dependency with pyfscache to ensure Python3 compatibility

* Fix taxonomies building when no result returned from analyzer

* Set up cache key to be a private class variable instead of some magic number

* Fix getting data when cache is deactivated

* Respect contribution policy

* Force usage of python3 to execute analyzer

* Fix long TheHive report
This commit is contained in:
srilumpa 2017-12-18 11:22:22 +01:00 committed by Nabil Adouani
parent 1c4d3ec591
commit cf66ed2ce9
6 changed files with 221 additions and 0 deletions

View File

@ -0,0 +1,15 @@
{
"name": "TorBlutmagie",
"author": "Marc-André DOLL, STARC by EXAPROBE",
"license": "AGPL-V3",
"url": "https://github.com/CERT-BDF/Cortex-Analyzers",
"version": "1.0",
"baseConfig": "TorBlutmagie",
"config": {
"check_tlp": false,
"max_tlp": 3
},
"description": "Query http://torstatus.blutmagie.de/query_export.php/Tor_query_EXPORT.csv for TOR exit nodes IP addresses or names.",
"dataTypeList": ["ip", "domain", "fqdn"],
"command": "TorBlutmagie/tor_blutmagie_analyzer.py"
}

View File

@ -0,0 +1,3 @@
cortexutils
requests
diskcache

View File

@ -0,0 +1,110 @@
import requests
import csv
from diskcache import Cache
class TorBlutmagieClient:
"""Simple client to query torstatus.blutmagie.de for exit nodes.
The client will download http://torstatus.blutmagie.de/query_export.php/Tor_query_EXPORT.csv
and check if a specified IP address, FQDN or domain is present in it.
It will cache the response for `cache_duration` seconds to avoid
too much latency.
:param cache_duration: Duration before refreshing the cache (in seconds).
Ignored if `cache_duration` is 0.
:param cache_root: Path where to store the cached file
downloaded from torstatus.blutmagie.de
:type cache_duration: int
:type cache_root: str
"""
def __init__(self, cache_duration=3600, cache_root='/tmp/cortex/tor_project'):
self.session = requests.Session()
self.cache_duration = cache_duration
if self.cache_duration > 0:
self.cache = Cache(cache_root)
self.url = 'http://torstatus.blutmagie.de/query_export.php/Tor_query_EXPORT.csv'
__cache_key = __name__ + ':raw_data'
def _get_raw_data(self):
try:
return self.cache[self.__cache_key]
except (AttributeError, TypeError):
return self.session.get(self.url).text.encode('utf-8')
except KeyError:
self.cache.set(
self.__cache_key,
self.session.get(self.url).text.encode('utf-8'),
expire=self.cache_duration, read=True)
return self.cache[self.__cache_key]
def _get_data(self):
return csv.DictReader(
self._get_raw_data().decode('utf-8').splitlines(),
delimiter=',')
def _extract_fields(self, line):
return {
'hostname': line['Hostname'],
'name': line['Router Name'],
'country_code': line['Country Code'],
'ip': line['IP Address'],
'as_name': line['ASName'],
'as_number': line['ASNumber']
}
def _get_node_from_domain(self, domain):
results = []
for line in self._get_data():
if domain.lower() in line['Hostname'].lower():
results.append(self._extract_fields(line))
return results
def _get_node_from_fqdn(self, fqdn):
results = []
for line in self._get_data():
if fqdn.lower() == line['Hostname'].lower():
results.append(self._extract_fields(line))
break
return results
def _get_node_from_ip(self, ip):
results = []
for line in self._get_data():
if ip == line['IP Address']:
results.append(self._extract_fields(line))
break
return results
def search_tor_node(self, data_type, data):
"""Lookup an artifact to check if it is a known tor exit node.
:param data_type: The artifact type. Must be one of 'ip', 'fqdn'
or 'domain'
:param data: The artifact to lookup
:type data_type: str
:type data: str
:return: Data relative to the tor node. If the looked-up artifact is
related to a tor exit node it will contain a `nodes` array.
That array will contains a list of nodes containing the
following keys:
- name: name given to the router
- ip: their IP address
- hostname: Hostname of the router
- country_code: ISO2 code of the country hosting the router
- as_name: ASName registering the router
- as_number: ASNumber registering the router
Otherwise, `nodes` will be empty.
:rtype: list
"""
results = []
if data_type == 'ip':
results = self._get_node_from_ip(data)
elif data_type == 'fqdn':
results = self._get_node_from_fqdn(data)
elif data_type == 'domain':
results = self._get_node_from_domain(data)
else:
pass
return {"nodes": results}

View File

@ -0,0 +1,45 @@
#!/usr/bin/env python3
from cortexutils.analyzer import Analyzer
import tor_blutmagie
class TorBlutmagieAnalyzer(Analyzer):
"""Cortex analyzer to query TorBlutmagie for exit nodes IP addresses and/or names"""
def __init__(self):
Analyzer.__init__(self)
self.cache_duration = self.getParam('config.cache.duration', 3600)
self.cache_root = self.getParam(
'config.cache.root', '/tmp/cortex/tor_project'
)
self.client = tor_blutmagie.TorBlutmagieClient(
cache_duration=self.cache_duration,
cache_root=self.cache_root
)
def summary(self, raw):
taxonomies = []
if ('nodes' in raw):
r = len(raw['nodes'])
if r == 0 or r == 1:
value = "{} node".format(r)
else:
value = "{} nodes".format(r)
if r > 0:
level = 'suspicious'
else:
level = 'info'
taxonomies.append(
self.build_taxonomy(level, 'TorBlutmagie', 'Node', value))
return {"taxonomies": taxonomies}
def run(self):
if self.data_type not in ['ip', 'domain', 'fqdn']:
return self.error('Not an IP address, FQDN or domain name')
report = self.client.search_tor_node(self.data_type, self.get_data())
self.report(report)
if __name__ == '__main__':
TorBlutmagieAnalyzer().run()

View File

@ -0,0 +1,45 @@
<div class="panel panel-warning" ng-if="success && content.nodes.length > 0">
<div class="panel-heading">
Tor nodes - {{artifact.data | fang}}
</div>
<div class="panel-body">
<div ng-repeat="n in content.nodes" class="panel panel-default">
<div class="panel-heading">
{{n.name}}
</div>
<div class="panel-body">
<dl class="dl-horizontal">
<dt>Address</dt>
<dd>{{n.hostname | fang}} ({{n.ip | fang}})</dd>
<dt>AS</dt>
<dd>{{n.as_name}} ({{n.as_number}})</dd>
<dt>Country</dt>
<dd>{{n.country_code}}</dd>
</dl>
<span>
<i class="fa fa-search"></i>
<a ng-href="http://torstatus.blutmagie.de/cgi-bin/whois.pl?ip={{n.ip}}" target="_blank">WHOIS</a>
</span>
</div>
</div>
</div>
</div>
<div class="panel panel-info" ng-if="success && content.nodes.length == 0">
<div class="panel-heading">
Tor nodes - {{artifact.data | fang}}
</div>
<div class="panel-body">
<b>No matches.</b>
</div>
</div>
<!-- General error -->
<div class="panel panel-danger" ng-if="!success">
<div class="panel-heading">
<strong>{{(artifact.data || artifact.attachment.name) | fang}}</strong>
</div>
<div class="panel-body">
{{content.errorMessage}}
</div>
</div>

View File

@ -0,0 +1,3 @@
<span class="label" ng-repeat="t in content.taxonomies" ng-class="{'info': 'label-info', 'safe': 'label-success', 'suspicious': 'label-warning', 'malicious':'label-danger'}[t.level]">
{{t.namespace}}:{{t.predicate}}={{t.value}}
</span>