#206 rewrite DT analyzer: remove local lib, change program name, update json files, upgrade program to use up to date official domaintools_api lib from github. Can be run with python2 and python3

This commit is contained in:
Jérôme Leonard 2018-04-08 16:42:24 +02:00 committed by To-om
parent 9d11e9fa28
commit 9674b2dabc
18 changed files with 63 additions and 600 deletions

View File

@ -6,7 +6,7 @@
"license": "AGPL-V3",
"description": "Use DomainTools to get a list of domain names sharing the same IP address.",
"dataTypeList": ["ip", "domain","fqdn"],
"command": "DomainTools/domaintools.py",
"command": "DomainTools/domaintools_analyzer.py",
"baseConfig": "DomainTools",
"config": {
"service": "reverse-ip"

View File

@ -6,7 +6,7 @@
"license": "AGPL-V3",
"description": "Use DomainTools to get a list of domain names that share the same primary or secondary name server.",
"dataTypeList": ["domain"],
"command": "DomainTools/domaintools.py",
"command": "DomainTools/domaintools_analyzer.py",
"baseConfig": "DomainTools",
"config": {
"service": "name-server-domains"

View File

@ -6,7 +6,7 @@
"license": "AGPL-V3",
"description": "Use DomainTools to get a list of domain names which share the same registrant information.",
"dataTypeList": ["mail", "ip", "domain", "other"],
"command": "DomainTools/domaintools.py",
"command": "DomainTools/domaintools_analyzer.py",
"baseConfig": "DomainTools",
"config": {
"service": "reverse-whois"

View File

@ -6,7 +6,7 @@
"license": "AGPL-V3",
"description": "Use DomainTools to get a list of historical Whois records associated with a domain name.",
"dataTypeList": ["domain"],
"command": "DomainTools/domaintools.py",
"command": "DomainTools/domaintools_analyzer.py",
"baseConfig": "DomainTools",
"config": {
"service": "whois/history"

View File

@ -6,7 +6,7 @@
"license": "AGPL-V3",
"description": "Use DomainTools to get the ownership record for a domain with basic registration details.",
"dataTypeList": ["domain"],
"command": "DomainTools/domaintools.py",
"command": "DomainTools/domaintools_analyzer.py",
"baseConfig": "DomainTools",
"config": {
"service": "whois/parsed"

View File

@ -6,7 +6,7 @@
"license": "AGPL-V3",
"description": "Use DomainTools to get the ownership record for an IP address with basic registration details.",
"dataTypeList": ["ip"],
"command": "DomainTools/domaintools.py",
"command": "DomainTools/domaintools_analyzer.py",
"baseConfig": "DomainTools",
"config": {
"service": "whois"

View File

@ -1,2 +0,0 @@
#__import__('pkg_resources').declare_namespace(__name__)

View File

@ -1,133 +0,0 @@
import os
from domaintools import utils
from domaintools.exceptions import ServiceException
from domaintools.transport.curl import CurlRestService
"""
This file is part of the domaintoolsAPI_python_wrapper package.
For the full copyright and license information, please view the LICENSE
file that was distributed with this source code.
"""
class Configuration(object):
def __init__(self, ini_resource = None):
"""
Construct the class
Initiliaze it with default values (if no config given)
"""
#use DomainTools free API
self.use_free_api = False
#server host
self.host = None
#server port
self.port = None
#sub url (version)
self.sub_url = None
#beginning url
self.base_url = None
#domaintools API username
self.username = None
#domaintools API password
self.password = None
#secure authentication
self.secure_path = True
#default return type
self.return_type = None
#transport type (curl, etc.)
self.transport_type = None
#transport object in charge of calling API
self.transport = None
#proxy url
self.proxy = None
#default configuration file path
self.default_config_path = os.path.realpath(os.path.dirname(__file__)+'/../conf/api.ini')
#default configuration
self.default_config = {
'username' : '',
'key' : '',
'use_free_api' : False,
'host' : 'api.domaintools.com',
'version' : 'v1',
'port' : '80',
'secure_auth' : True,
'return_type' : 'json',
'transport_type' : 'curl',
'content_type' : 'application/json',
'proxy' : None
}
#dictionary to map the good transport class
self.transport_map = {
'curl' : CurlRestService
}
if(ini_resource == None): ini_resource = self.default_config_path
config = {}
if(type(ini_resource) is dict):
config = ini_resource
else:
config = utils.load_config_file(ini_resource)
self.init(config)
def validateParams(self, config):
"""
Validate options from a given array
Merge with the default configuration
"""
config = dict(self.default_config,**config)
if config['username'].strip() == '':
raise ServiceException(ServiceException.EMPTY_API_USERNAME)
if config['key'].strip() == '':
raise ServiceException(ServiceException.EMPTY_API_KEY)
try:
transport = self.transport_map[config['transport_type']]()
except Exception as e:
config['transport_type'] = self.default_config['transport_type'];
return config
def init(self, config):
"""
Initialize the configuration Object
Returns the configuration dictionary
"""
config = self.validateParams(config);
self.use_free_api = True if config['use_free_api'] in ('True','true',1) else False
self.host = ('free' if self.use_free_api==True else '')+config['host'];
self.port = config['port'];
self.sub_url = config['version'];
self.username = config['username'];
self.password = config['key'];
self.secure_auth = True if config['secure_auth'] in ('True','true','1') else False;
self.return_type = config['return_type'];
self.content_type = config['content_type'];
self.transport_type = config['transport_type'];
self.proxy = config['proxy'];
self.base_url = 'http://' + self.host +':' + self.port + '/' + self.sub_url;
self.transport = self.transport_map[config['transport_type']](self.content_type)

View File

@ -1,216 +0,0 @@
from domaintools.api.configuration import Configuration
from domaintools.api.response import Response
import hmac
import hashlib
import urllib
from datetime import datetime
from domaintools.exceptions import ServiceException
from domaintools.exceptions import ServiceUnavailableException
from domaintools.exceptions import NotAuthorizedException
from domaintools.exceptions import InternalServerErrorException
from domaintools.exceptions import NotFoundException
from domaintools.exceptions import ServiceUnavailableException
from domaintools.exceptions import BadRequestException
"""
This file is part of the domaintoolsAPI_python_wrapper package.
For the full copyright and license information, please view the LICENSE
file that was distributed with this source code.
"""
class Request(object):
def __init__(self, configuration=None):
"""
Construction of the class with an optional given configuration object
(If no configuration given, the default one is taken)
"""
"Configuration (credentials, host,...)"
self.configuration = None
#Name of the service to call
self.service_name = ''
#Type of the return
self.return_type = None
#Authorized return types
self.authorized_return_types = ('json', 'xml', 'html')
#Url of the resource to call
self.url = None
#Dictionary of options
self.options = {}
#Name of the domain to use
self.domain_name = ''
#The raw response sent by domaintoolsAPI
self.raw_response = None
#The proxy url
self.proxy = None
self.configuration = Configuration() if(configuration == None) else configuration
def service(self, service_name=''):
"""Specifies the name of the service to call"""
self.service_name = service_name
return self
def withType(self, return_type):
"""This function allows you to specify the return type of the service"""
self.return_type = return_type
return self
def domain(self, domain_name=''):
"""Set the domain name to use for the API request"""
self.domain_name = domain_name
return self
def where(self, options):
"""
This function allows you to specify an options dictionary
The current options dictionary is merged with a new one
"""
if type(options) is not dict: raise ServiceException(ServiceException.INVALID_OPTIONS)
self.options.update(options)
return self
def query(self, query):
"""Alias for self.where({'query'=>query})"""
return self.where({'query':query})
def toJson(self):
"""Alias for self.withType('json')"""
return self.withType('json')
def toXml(self):
"""Alias for self.withType('xml')"""
return self.withType('xml')
def toHtml(self):
"""Alias for self.withType('html')"""
return self.withType('html')
def add_credentials_options(self):
"""Add credentials to the Options dictionary (if necessary)."""
api_username = self.configuration.username
api_key = self.configuration.password
self.options['api_username'] = api_username
if self.configuration.secure_auth == True:
timestamp = datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%SZ')
uri = '/' + self.configuration.sub_url + ('/' if self.domain_name.strip()=='' else '/' + self.domain_name + '/') + self.service_name
self.options['timestamp'] = timestamp
params = ''.join([api_username, timestamp, uri])
self.options['signature'] = hmac.new(api_key, params, digestmod=hashlib.sha1).hexdigest()
else:
self.options['api_key'] = api_key
def get_service_name(self):
"""getter of service_name"""
return self.service_name
def get_options(self):
"""getter of options"""
return self.options
def set_transport(self, transport):
"""setter of transport"""
self.configuration.transport = transport
def get_return_type(self):
"""getter of return_type"""
if self.return_type != None:
return_type = self.return_type
else:
return_type = self.configuration.return_type
return return_type
def build_options(self):
"""build all options in self.options"""
self.options['format'] = self.get_return_type()
self.add_credentials_options()
def build_url(self):
"""Depending on the service name, and the options we built the good url to request"""
query_string = urllib.urlencode(self.options)
self.url = self.configuration.base_url + ('/' if self.domain_name.strip()=='' else '/' + self.domain_name + '/') + self.service_name + '?' + query_string
def execute(self, debug=False):
"""Make the request on the service, and return the response"""
raw_response = ''
self.build_options()
if self.return_type == None: self.options['format'] = 'json'
self.build_url()
if debug==True: return self.url
self.raw_response = self.request()
if self.return_type == None: return Response(self)
return self.raw_response
def debug(self):
return self.execute(True)
def request(self):
"""
Makes a request on the service (curl by default), and returns the response if the http status code is 200,
else returns an exception.
"""
transport = self.configuration.transport
response = ''
try:
response = transport.get(self.url, self.configuration.proxy)
except Exception as e:
raise ServiceUnavailableException()
status = transport.get_status()
if status==200:
return response
elif status==400:
raise BadRequestException()
elif status==403:
raise NotAuthorizedException()
elif status==404:
raise NotFoundException()
elif status==500:
raise InternalServerErrorException()
elif status==503:
raise ServiceUnavailableException()
else:
raise ServiceException('Empty response')

View File

@ -1,57 +0,0 @@
import json
from domaintools.utils import obj
"""
This file is part of the domaintoolsAPI_python_wrapper package.
For the full copyright and license information, please view the LICENSE
file that was distributed with this source code.
"""
class Response(object):
def __init__(self, request=None):
"""
Constructs the Response object
"""
#json string respresenting the response
self.json = None
#json object representation
self.jsonObject = None
#request object for API
self.request = request
self.mergeJson(request.raw_response)
def mergeJson(self, raw=None):
"""
define self.json and self.jsonObject only if conversion worked
otherwise we keep the old values
"""
jsonObject = obj(json.loads(raw))
if(jsonObject == None): raise ServiceException()
self.json = raw
self.jsonObject = jsonObject
def __getattr__(self, name):
"""
Magic get method to create an alias :
self.history <=> self.jsonObject.response.history
if self.history already exists => return value
elseif self.jsonObject.response.history exists => return value
else => return None
"""
try:
return object.__getattribute__(self, name)
except AttributeError:
try:
return object.__getattribute__(self.jsonObject.response, name)
except AttributeError:
return None

View File

@ -1,41 +0,0 @@
"""
This file is part of the domaintoolsAPI_python_wrapper package.
For the full copyright and license information, please view the LICENSE
file that was distributed with this source code.
"""
class ServiceException(Exception):
INVALID_CONFIG_PATH = "Config file do not exist";
UNKNOWN_SERVICE_NAME = "Unknown service name";
EMPTY_API_KEY = "Empty API key";
EMPTY_API_USERNAME = "Empty API username";
UNKNOWN_RETURN_TYPE = "Unknown return type. (json or xml or html required)";
INVALID_DOMAIN = "Domain/Ip invalide";
INVALID_OPTIONS = "Invalid options; options must be an array";
TRANSPORT_NOT_FOUND = "Transport not found; it must refer to a class that extends RESTService";
DOMAIN_CALL_REQUIRED = "Domain is required for this service";
IP_CALL_REQUIRED = "Ip address is required for this service";
EMPTY_CALL_REQUIRED = "No domain or ip is required for this service";
INVALID_REQUEST_OBJECT = "Invalid object; DomaintoolsAPI instance required";
INVALID_JSON_STRING = "Invalid json string; a valid one is required";
class BadRequestException(ServiceException):
pass
class InternalServerErrorException(ServiceException):
pass
class NotAuthorizedException(ServiceException):
pass
class NotFoundException(ServiceException):
pass
class ServiceUnavailableException(ServiceException):
pass

View File

@ -1,11 +0,0 @@
from domaintools.transport.rest import RestService
"""
This file is part of the domaintoolsAPI_python_wrapper package.
For the full copyright and license information, please view the LICENSE
file that was distributed with this source code.
"""
class CurlRestService(RestService):
pass

View File

@ -1,41 +0,0 @@
import urllib2
from urllib2 import Request, urlopen, URLError, HTTPError
"""
This file is part of the domaintoolsAPI_python_wrapper package.
For the full copyright and license information, please view the LICENSE
file that was distributed with this source code.
"""
class RestService(object):
def __init__(self, content_type='json', options={}):
self.options = options
self.content_type = content_type
self.status_code = 200
def get(self, url, proxy=None):
if proxy:
proxy = urllib2.ProxyHandler({'http': proxy})
opener = urllib2.build_opener(proxy)
urllib2.install_opener(opener)
try:
response = urllib2.urlopen(url)
except HTTPError, e:
resp = e.read()
self.status_code = e.code
except URLError, e:
resp = e.read()
self.status_code = e.code
else:
self.status_code = response.code
resp = response.read()
return resp
def get_status(self):
return self.status_code

View File

@ -1,46 +0,0 @@
"""
This file is part of the domaintoolsAPI_python_wrapper package.
For the full copyright and license information, please view the LICENSE
file that was distributed with this source code.
"""
class obj(object):
"""
Turns a dictionary to object
http://stackoverflow.com/questions/1305532/convert-python-dict-to-object
http://stackoverflow.com/questions/4017572/how-can-i-make-an-alias-to-a-non-function-member-attribute-in-a-python-class
"""
def __init__(self, d):
for a, b in d.items():
if isinstance(b, (list, tuple)):
setattr(self, a, [obj(x) if isinstance(x, dict) else x for x in b])
else:
setattr(self, a, obj(b) if isinstance(b, dict) else b)
def load_config_file(filename):
"""
Load a simple ini file
Taken from http://www.decalage.info/fr/python/configparser
"""
COMMENT_CHAR = '#'
OPTION_CHAR = '='
options = {}
f = open(filename)
for line in f:
# First, remove comments:
if COMMENT_CHAR in line:
# split on comment char, keep only the part before
line, comment = line.split(COMMENT_CHAR, 1)
# Second, find lines with an option=value:
if OPTION_CHAR in line:
# split on option char:
option, value = line.split(OPTION_CHAR, 1)
# strip spaces:
option = option.strip()
value = value.strip()
# store in dictionary:
options[option] = value
f.close()
return options

View File

@ -1,15 +1,14 @@
#!/usr/bin/env python
# encoding: utf-8
import sys
import os
import json
import codecs
from domaintools.api.request import Request, Configuration
from domaintools.exceptions import NotFoundException
from domaintools.exceptions import NotAuthorizedException
from domaintools.exceptions import ServiceUnavailableException
from domaintools import API
from cortexutils.analyzer import Analyzer
@ -20,6 +19,41 @@ class DomainToolsAnalyzer(Analyzer):
self.service = self.get_param(
'config.service', None, 'Service parameter is missing')
def domaintools(self, data):
"""
:param service:
:return:
"""
if (self.service == 'reverse-ip' and self.data_type == 'ip'):
self.service = 'host-domains'
api = API(self.get_param('config.username'), self.get_param('config.key'))
if self.service == 'reverse-ip' and self.data_type in ['domain', 'ip', 'fqdn']:
response = api.reverse_ip(data).response()
elif self.service == 'host-domains' and self.data_type == 'ip':
response = api.host_domains(data).response()
elif self.service == 'name-server-domains' and self.data_type == 'domain':
response = api.reverse_name_server(data).response()
elif self.service == 'whois/history' and self.data_type == 'domain':
response = api.whois_history(data).response()
elif self.service == 'whois/parsed' and self.data_type == 'domain':
response = api.parsed_whois(data).response()
elif self.service == 'reverse-whois':
response = api.reverse_whois(data, mode='purchase').response()
elif self.service == 'whois' and self.data_type == 'ip':
response = api.whois(data).response()
return response
def summary(self, raw):
r = {
"service": self.service,
@ -92,37 +126,11 @@ class DomainToolsAnalyzer(Analyzer):
def run(self):
data = self.get_data()
if 'proxy' in self.artifact['config']:
del self.artifact['config']['proxy']
if self.service == 'reverse-ip' and self.data_type == 'ip':
self.service = 'host-domains'
if self.service == 'reverse-whois':
query = {
'terms': data,
'mode': "purchase"
}
data = ''
else:
query = {}
if (self.service == 'reverse-ip' and self.data_type in ['domain', 'ip', 'fqdn']) or \
(self.service == 'host-domains' and self.data_type == 'ip') or \
(self.service == 'name-server-domains' and self.data_type == 'domain') or \
(self.service == 'whois/history' and self.data_type == 'domain') or \
(self.service == 'whois/parsed' and self.data_type == 'domain') or \
(self.service == 'reverse-whois') or \
(self.service == 'whois' and self.data_type == 'ip'):
response = {}
try:
configuration = Configuration(self.get_param('config'))
response = Request(configuration).service(self.service).domain(data).where(query).toJson().execute()
r = self.domaintools(data)
r = json.loads(response)
if 'response' in r:
self.report(r['response'])
self.report(r.get('response'))
elif 'error' in r and 'message' in r['error']:
self.error(r['error']['message'])
else:

View File

@ -1 +1,3 @@
cortexutils
domaintools_api ; python_version < '3.5'
git+https://github.com/DomainTools/python_api.git ; python_version >= '3.5'