Ref: #431 Added new URLhaus API

Nils Kuhnert 2019-02-25 10:32:31 +01:00
parent a60a5a2cda
commit d948e293e2
No known key found for this signature in database
GPG Key ID: 3591DD5B3103ED50
7 changed files with 77 additions and 100 deletions

URLhaus/URLhaus.json

@@ -1,27 +1,12 @@
 {
     "name": "URLhaus",
-    "author": "ninoseki",
+    "author": "ninoseki, Nils Kuhnert",
     "license": "MIT",
     "url": "https://github.com/ninoseki/cortex_URLhaus_analyzer",
-    "version": "1.1",
-    "description": "Search domains, URLs or hashes on URLhaus.",
-    "dataTypeList": ["domain", "url", "hash"],
+    "version": "2.0",
+    "description": "Search domains, IPs, URLs or hashes on URLhaus.",
+    "dataTypeList": ["domain", "url", "hash", "ip"],
     "command": "URLhaus/URLhaus_analyzer.py",
-    "configurationItems": [
-        {
-            "name": "cache.duration",
-            "description": "Define the cache duration",
-            "type": "number",
-            "multi": false,
-            "required": true,
-            "defaultValue": 300
-        },
-        {
-            "name": "cache.root",
-            "description": "Define the path to the stored data",
-            "type": "string",
-            "multi": false,
-            "required": false
-        }
-    ]
 }

URLhaus/URLhaus.py

@@ -1,64 +0,0 @@
-from bs4 import BeautifulSoup
-from diskcache import Cache
-import requests
-
-
-class URLhaus:
-    """Simple client to query URLhaus by abuse.ch.
-
-    :param query: domain, url or hash.
-    :param cache_duration: Duration before refreshing the cache (in seconds).
-        Ignored if `cache_duration` is 0.
-    :param cache_root: Path where to store the cached file.
-    :type query: string
-    :type cache_duration: int
-    :type cache_root: str
-    """
-
-    def __init__(self,
-                 query,
-                 cache_duration=300,
-                 cache_root="/tmp/cortex/URLhaus"):
-        self.URL = "https://urlhaus.abuse.ch/browse.php"
-        self.query = query
-        self.cache = None
-        if cache_duration > 0:
-            self.cache = Cache(cache_root)
-            self.cache_duration = cache_duration
-
-    def _get_raw_data(self):
-        try:
-            return self.cache[self.query.encode('utf-8')]
-        except (AttributeError, TypeError):
-            return self.fetch()
-        except KeyError:
-            self.cache.set(
-                self.query.encode('utf-8'),
-                self.fetch(),
-                expire=self.cache_duration)
-            return self.cache[self.query.encode('utf-8')]
-
-    def search(self):
-        res = self._get_raw_data()
-        return self.parse(res)
-
-    def fetch(self):
-        payload = {"search": self.query}
-        return requests.get(self.URL, params=payload).text
-
-    def parse(self, doc):
-        results = []
-        soup = BeautifulSoup(doc, "html.parser")
-        table = soup.find("table", class_="table")
-        rows = table.find_all("tr")[1:]
-        for row in rows:
-            cols = row.find_all("td")
-            results.append({
-                "dateadded": cols[0].text,
-                "malware_url": cols[1].text,
-                "link": cols[1].find("a").attrs.get("href"),
-                "status": cols[2].text,
-                "tags": cols[3].text.split(),
-                "gsb": cols[4].text if len(cols) > 5 else None,
-                "reporter": cols[5].text if len(cols) > 5 else cols[4].text
-            })
-        return results
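
For contrast with the replacement client further down, this is how the removed scraper was driven; a minimal sketch, assuming the old module and its bs4/diskcache dependencies are still importable (the observable is a placeholder):

    # Old client: screen-scraped the browse.php HTML table instead of calling the JSON API.
    from URLhaus import URLhaus

    client = URLhaus("example.com", cache_duration=0)  # 0 bypasses the diskcache layer
    for entry in client.search():
        print(entry["dateadded"], entry["malware_url"], entry["status"])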

URLhaus/URLhaus_analyzer.py

@@ -1,27 +1,34 @@
 #!/usr/bin/env python3
 from cortexutils.analyzer import Analyzer
-from URLhaus import URLhaus
+from URLhaus_client import URLhausClient


 class URLhausAnalyzer(Analyzer):
     def __init__(self):
         Analyzer.__init__(self)

-    def search(self, indicator):
-        """
-        Searches for a website using the indicator
-        :param indicator: domain, url, hash
-        :type indicator: str
-        :return: dict
-        """
-        return URLhaus(indicator).search()
-
     def run(self):
-        targets = ["domain", "url", "hash"]
-        if self.get_data() is not None and self.data_type in targets:
-            self.report({
-                'results': self.search(self.get_data())
-            })
+        data = self.get_data()
+        if not data:
+            self.error('No observable or file given.')
+
+        results = {}
+        if self.data_type == 'url':
+            results = URLhausClient.search_url(data)
+        elif self.data_type in ['domain', 'ip']:
+            results = URLhausClient.search_host(data)
+        elif self.data_type == 'hash':
+            if len(data) in [32, 64]:
+                results = URLhausClient.search_payload(data)
+            else:
+                self.error('Only sha256 and md5 hashes are supported by URLhaus.')
+        else:
+            self.error('Datatype not supported.')
+
+        results.update({
+            'data_type': self.data_type
+        })
+        self.report(results)

     def summary(self, raw):
         taxonomies = []

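The rewritten run() simply dispatches each Cortex data type to the matching client call and reports the raw API response. A hypothetical local smoke test, assuming cortexutils' convention of reading the job as JSON on stdin; the observable and the empty config are placeholders:

    import json
    import subprocess

    # Feed a fake job to the analyzer roughly the way Cortex would.
    job = {"dataType": "domain", "data": "example.com", "config": {}}
    proc = subprocess.run(
        ["python3", "URLhaus/URLhaus_analyzer.py"],
        input=json.dumps(job),
        capture_output=True,
        text=True,
    )
    print(proc.stdout)  # report JSON from self.report(), including the added data_type
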
URLhaus/URLhaus_client.py

@@ -0,0 +1,51 @@
+import requests
+
+BASEURL = 'https://urlhaus-api.abuse.ch/v1/'
+
+
+class URLhausClient(object):
+    @staticmethod
+    def __request(endpoint, key, value) -> dict:
+        results = requests.post(
+            BASEURL + endpoint + '/',
+            {key: value}
+        ).json()
+
+        if results['query_status'] in ['ok', 'no_results']:
+            return results
+        else:
+            raise ValueError('Given value seems not to be valid: <{}: {}>.'.format(key, value))
+
+    @staticmethod
+    def search_url(url: str) -> dict:
+        return URLhausClient.__request(
+            'url',
+            'url',
+            url
+        )
+
+    @staticmethod
+    def search_host(host: str) -> dict:
+        return URLhausClient.__request(
+            'host',
+            'host',
+            host
+        )
+
+    @staticmethod
+    def search_payload(payload_hash: str) -> dict:
+        if len(payload_hash) == 32:
+            return URLhausClient.__request(
+                'payload',
+                'md5_hash',
+                payload_hash
+            )
+        elif len(payload_hash) == 64:
+            return URLhausClient.__request(
+                'payload',
+                'sha256_hash',
+                payload_hash
+            )
+        else:
+            raise ValueError('Only sha256 and md5 hashes are allowed.')

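Because the client is plain requests calls against the public URLhaus v1 bulk API, it can be exercised on its own; a minimal sketch (both observables are illustrative placeholders):

    from URLhaus_client import URLhausClient

    # query_status comes back as 'ok' or 'no_results'; anything else raises ValueError.
    host_report = URLhausClient.search_host('example.com')
    print(host_report['query_status'])

    # 32 hex chars routes to md5_hash, 64 to sha256_hash.
    payload_report = URLhausClient.search_payload('d41d8cd98f00b204e9800998ecf8427e')
    print(payload_report['query_status'])
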
URLhaus/requirements.txt

@@ -1,4 +1,2 @@
-beautifulsoup4
 cortexutils
-diskcache
 requests