mirror of
https://github.com/empayre/OTX-Python-SDK.git
synced 2024-11-06 01:45:25 +00:00
Merge branch 'master' of https://github.com/AlienVault-Labs/OTX-Python-SDK
This commit is contained in:
commit
53865b753a
8
IndicatorTypes.py
Normal file → Executable file
8
IndicatorTypes.py
Normal file → Executable file
@ -118,6 +118,14 @@ CVE = IndicatorTypes(
|
||||
sections=["general"],
|
||||
slug="cve"
|
||||
)
|
||||
YARA = IndicatorTypes(
|
||||
name="YARA",
|
||||
description="YARA rule",
|
||||
api_support=True,
|
||||
sections=['general'],
|
||||
slug='YARA',
|
||||
)
|
||||
|
||||
|
||||
# all_types list of supported IOC types for pulse indicators
|
||||
all_types = [
|
||||
|
71
OTXv2.py
Normal file → Executable file
71
OTXv2.py
Normal file → Executable file
@ -29,6 +29,8 @@ SEARCH_USERS = "{}/search/users".format(API_V1_ROOT) #
|
||||
PULSE_DETAILS = "{}/pulses/".format(API_V1_ROOT) # pulse meta data
|
||||
PULSE_INDICATORS = PULSE_DETAILS + "indicators" # pulse indicators
|
||||
PULSE_CREATE = "{}/pulses/create".format(API_V1_ROOT) # create pulse
|
||||
USER_PULSES = "{}/pulses/user/{{}}".format(API_V1_ROOT) # pulse feed for a user
|
||||
MY_PULSES = "{}/pulses/my".format(API_V1_ROOT) # pulse feed for a user
|
||||
SUBSCRIBE_PULSE = "{}/pulses/{{}}/subscribe".format(API_V1_ROOT) # subscribe to pulse
|
||||
UNSUBSCRIBE_PULSE = "{}/pulses/{{}}/unsubscribe".format(API_V1_ROOT) # unsubscribe from pulse
|
||||
INDICATOR_DETAILS = "{}/indicators/".format(API_V1_ROOT) # indicator details
|
||||
@ -288,7 +290,7 @@ class OTXv2(object):
|
||||
)
|
||||
return indicator_url
|
||||
|
||||
def walkapi_iter(self, url, max_page=None, max_items=None):
|
||||
def walkapi_iter(self, url, max_page=None, max_items=None, method='GET', body=None):
|
||||
next_page_url = url
|
||||
count = 0
|
||||
item_count = 0
|
||||
@ -297,7 +299,13 @@ class OTXv2(object):
|
||||
if max_page and count > max_page:
|
||||
break
|
||||
|
||||
data = self.get(next_page_url)
|
||||
if method == 'GET':
|
||||
data = self.get(next_page_url)
|
||||
elif method == 'POST':
|
||||
data = self.post(next_page_url, body=body)
|
||||
else:
|
||||
raise Exception("Unsupported method type: {}".format(method))
|
||||
|
||||
for el in data['results']:
|
||||
item_count += 1
|
||||
if max_items and item_count > max_items:
|
||||
@ -307,11 +315,11 @@ class OTXv2(object):
|
||||
|
||||
next_page_url = data["next"]
|
||||
|
||||
def walkapi(self, url, iter=False, max_page=None, max_items=None):
|
||||
def walkapi(self, url, iter=False, max_page=None, max_items=None, method='GET', body=None):
|
||||
if iter:
|
||||
return self.walkapi_iter(url, max_page=max_page, max_items=max_items)
|
||||
return self.walkapi_iter(url, max_page=max_page, max_items=max_items, method=method, body=body)
|
||||
else:
|
||||
return list(self.walkapi_iter(url, max_page=max_page, max_items=max_items))
|
||||
return list(self.walkapi_iter(url, max_page=max_page, max_items=max_items, method=method, body=body))
|
||||
|
||||
def getall(self, modified_since=None, author_name=None, limit=20, max_page=None, max_items=None, iter=False):
|
||||
"""
|
||||
@ -412,7 +420,7 @@ class OTXv2(object):
|
||||
resource.update(additional_fields)
|
||||
return resource
|
||||
|
||||
def get_all_indicators(self, author_name=None, modified_since=None, indicator_types=IndicatorTypes.all_types, limit=20, max_page=None):
|
||||
def get_all_indicators(self, author_name=None, modified_since=None, indicator_types=IndicatorTypes.all_types, limit=20, max_page=None, max_items=None):
|
||||
"""
|
||||
Get all the indicators contained within your pulses of the IndicatorTypes passed.
|
||||
By default returns all IndicatorTypes.
|
||||
@ -420,7 +428,7 @@ class OTXv2(object):
|
||||
:return: yields the indicator object for use
|
||||
"""
|
||||
name_list = IndicatorTypes.to_name_list(indicator_types)
|
||||
for pulse in self.getall_iter(author_name=author_name, modified_since=modified_since, limit=limit, max_page=max_page):
|
||||
for pulse in self.getall_iter(author_name=author_name, modified_since=modified_since, limit=limit, max_page=max_page, max_items=max_items):
|
||||
for indicator in pulse["indicators"]:
|
||||
if indicator["type"] in name_list:
|
||||
yield indicator
|
||||
@ -454,7 +462,7 @@ class OTXv2(object):
|
||||
meta_data = self.get(pulse_url)
|
||||
return meta_data
|
||||
|
||||
def get_pulse_indicators(self, pulse_id, limit=20):
|
||||
def get_pulse_indicators(self, pulse_id, limit=100):
|
||||
"""
|
||||
For a given pulse_id, get list of indicators (IOCs)
|
||||
:param pulse_id: Object ID specify which pulse to get indicators from
|
||||
@ -464,8 +472,7 @@ class OTXv2(object):
|
||||
if not isinstance(pulse_id, string_types) or not re.match(r"^[0-9a-zA-Z]{24}$", pulse_id):
|
||||
raise BadRequest("pulse_id should be a 24 character hex string")
|
||||
|
||||
url = PULSE_DETAILS + str(pulse_id) + "/indicators"
|
||||
url = self.create_url(PULSE_DETAILS + str(pulse_id) + "/indicators", limit=limit)
|
||||
url = self.create_url(PULSE_DETAILS + str(pulse_id) + "/indicators", limit=limit)
|
||||
return self.walkapi(url)
|
||||
|
||||
def edit_pulse(self, pulse_id, body):
|
||||
@ -488,32 +495,13 @@ class OTXv2(object):
|
||||
:param pulse_id: The pulse you are replacing the indicators with
|
||||
:param new_indicators: The set of new indicators
|
||||
:return: Return the new pulse
|
||||
|
||||
"""
|
||||
current_indicators = self.get_pulse_indicators(pulse_id)
|
||||
current_indicator_values = []
|
||||
current_indicator_indicators = []
|
||||
|
||||
for indicator in current_indicators:
|
||||
current_indicator_values.append(indicator["indicator"])
|
||||
current_indicator_indicators.append(indicator)
|
||||
|
||||
new_indicator_values = []
|
||||
indicators_to_add = []
|
||||
|
||||
for indicator in new_indicators:
|
||||
new_indicator_value = indicator["indicator"]
|
||||
new_indicator_values.append(new_indicator_value)
|
||||
if new_indicator_value not in current_indicator_values:
|
||||
indicators_to_add.append(indicator)
|
||||
|
||||
body = {
|
||||
response = self.edit_pulse(pulse_id, body={
|
||||
'indicators': {
|
||||
'add': indicators_to_add
|
||||
'add': new_indicators
|
||||
}
|
||||
}
|
||||
|
||||
response = self.patch(self.create_url(PULSE_DETAILS + str(pulse_id)), body=body)
|
||||
})
|
||||
return response
|
||||
|
||||
def replace_pulse_indicators(self, pulse_id, new_indicators):
|
||||
@ -604,6 +592,12 @@ class OTXv2(object):
|
||||
|
||||
return self.get(url)
|
||||
|
||||
def get_user_pulses(self, username, query=None, max_items=200):
|
||||
return self.walkapi(self.create_url(USER_PULSES.format(username), limit=50, q=query), max_items=max_items)
|
||||
|
||||
def get_my_pulses(self, query=None, max_items=200):
|
||||
return self.walkapi(self.create_url(MY_PULSES, limit=50, q=query), max_items=max_items)
|
||||
|
||||
def follow_user(self, username):
|
||||
url = FOLLOW_USER.format(username)
|
||||
return self.get(url)
|
||||
@ -653,14 +647,21 @@ class OTXv2(object):
|
||||
if do_close:
|
||||
file_handle.close()
|
||||
|
||||
def submitted_files(self, limit=50, first_page=1, max_page=None, max_items=None):
|
||||
def submitted_files(self, limit=50, hashes=None, first_page=1, max_page=None, max_items=None):
|
||||
"""
|
||||
Get status of submitted files
|
||||
:param hashes: list of sha256 hashes to check the results of (optional)
|
||||
:return: list of dicts, each dict describing the status of one file
|
||||
"""
|
||||
return self.walkapi(
|
||||
self.create_url(SUBMITTED_FILES, page=first_page, limit=limit),
|
||||
max_page=max_page, max_items=max_items
|
||||
self.create_url(SUBMITTED_FILES),
|
||||
max_page=max_page,
|
||||
method='POST',
|
||||
body={
|
||||
'hashes': hashes,
|
||||
'page': first_page,
|
||||
'limit': limit,
|
||||
},
|
||||
)
|
||||
|
||||
def submit_url(self, url):
|
||||
|
108
examples/PulseManager.py
Normal file → Executable file
108
examples/PulseManager.py
Normal file → Executable file
@ -1,3 +1,5 @@
|
||||
#!/usr/bin/env python
|
||||
|
||||
'''
|
||||
PulseManager OTX Example Script
|
||||
|
||||
@ -8,19 +10,33 @@ An example script that:
|
||||
I've redcated some bits so I can share publicly, so this won't run as is
|
||||
|
||||
'''
|
||||
import socket
|
||||
import logging
|
||||
import os
|
||||
import socket
|
||||
import traceback
|
||||
import IndicatorTypes
|
||||
import sys
|
||||
|
||||
from OTXv2 import OTXv2
|
||||
log = logging.getLogger(__name__)
|
||||
logging.basicConfig(level=logging.INFO)
|
||||
|
||||
class PulseManager():
|
||||
# store OTX API key in environment variable OTX_API_KEY
|
||||
API_KEY = os.getenv("OTX_API_KEY")
|
||||
|
||||
# Only need to set this environment variable if you need to override the default value (typically you do not)
|
||||
OTX_SERVER = os.getenv("OTX_SERVER")
|
||||
|
||||
class PulseManager(object):
|
||||
def __init__(self):
|
||||
self.otx = OTXv2("otx_key", "otx_server")
|
||||
self.otx = OTXv2(API_KEY, server=OTX_SERVER)
|
||||
|
||||
# Returns the pulse id if it can find it, else 'NoPulse'
|
||||
def pulse_name(self, malware_name):
|
||||
return "{} - Malware Domain Feed".format(malware_name)
|
||||
|
||||
def indicators(self, domains):
|
||||
return [{'indicator': domain, 'type': 'hostname', 'title': 'Command and Control'} for domain in domains]
|
||||
|
||||
# Returns the pulse id if it can find it, else None
|
||||
def find_pulse(self, malware_name):
|
||||
try:
|
||||
# Timeout for network connections
|
||||
@ -30,74 +46,62 @@ class PulseManager():
|
||||
try:
|
||||
retries += 1
|
||||
log.info('Looking for pulse: ' + malware_name)
|
||||
name = malware_name + " - Malware Domain Feed"
|
||||
name = name.replace(' ', '%20')
|
||||
result = self.otx.search_pulses(name)
|
||||
if 'results' in str(result):
|
||||
for pulse in result['results']:
|
||||
if 'id' in pulse:
|
||||
return pulse['id']
|
||||
return None
|
||||
query='name:"{}"'.format(self.pulse_name(malware_name))
|
||||
pulses = self.otx.get_my_pulses(query=query)
|
||||
if pulses:
|
||||
return pulses[0]['id']
|
||||
else:
|
||||
return None
|
||||
except socket.timeout:
|
||||
log.error("Timeout looking for pulse. Retrying")
|
||||
except AttributeError:
|
||||
log.error("OTX API internal error")
|
||||
|
||||
log.error("Max retries (5) exceeded")
|
||||
return None
|
||||
except Exception as exc:
|
||||
except Exception:
|
||||
log.error(traceback.format_exc())
|
||||
finally:
|
||||
socket.setdefaulttimeout(5)
|
||||
|
||||
def make_pulse(self, name,indicators,tags, description):
|
||||
self.otx.create_pulse(name=name, public=True, indicators=indicators, tags=[],
|
||||
references=[], description=description, tlp="White")
|
||||
return None
|
||||
|
||||
def create_pulse_request(self, malware_name, domains):
|
||||
try:
|
||||
name = malware_name + " - Malware Domain Feed"
|
||||
tags = [malware_name]
|
||||
description = "Command and Control domains for malware known as " + malware_name
|
||||
|
||||
indicators = []
|
||||
for domain in domains:
|
||||
indicators.append({'indicator': domain, 'type': 'hostname', 'title': 'Command and Control'})
|
||||
|
||||
self.make_pulse(name, indicators, tags, description)
|
||||
self.otx.create_pulse(
|
||||
name=self.pulse_name(malware_name), public=True, tlp="white", description=description,
|
||||
indicators=self.indicators(domains),
|
||||
tags=[malware_name],
|
||||
malware_families=[malware_name]
|
||||
)
|
||||
except Exception as ex:
|
||||
log.info(ex)
|
||||
|
||||
def modify_pulse_request(self, pulse_id, domains):
|
||||
try:
|
||||
new_domains = False
|
||||
|
||||
# Get domains from pulse
|
||||
result = self.otx.get_pulse_details(pulse_id)
|
||||
indicators = result['indicators']
|
||||
for indicator in indicators:
|
||||
domain = indicator['indicator']
|
||||
if domain not in domains:
|
||||
domains.append(domain)
|
||||
new_domains = True
|
||||
|
||||
new_indicators = []
|
||||
for domain in domains:
|
||||
new_indicators.append({'indicator': domain, 'type': 'hostname', 'title': 'Command and Control'})
|
||||
|
||||
# Update the pulse - if there are new domains
|
||||
if new_domains:
|
||||
self.otx.replace_pulse_indicators(pulse_id, new_indicators)
|
||||
|
||||
self.otx.add_pulse_indicators(pulse_id=pulse_id, new_indicators=self.indicators(domains))
|
||||
except Exception as ex:
|
||||
log.info(ex)
|
||||
|
||||
# Updates a pulse if it exists, else creates it
|
||||
def maintain_pulse(self, malware_name, domains):
|
||||
if domains:
|
||||
pulse_id = self.find_pulse(malware_name)
|
||||
if not pulse_id:
|
||||
log.info('Making pulse, doesnt exist')
|
||||
self.create_pulse_request(malware_name, domains)
|
||||
else:
|
||||
log.info('Updating pulse, already exists')
|
||||
self.modify_pulse_request(pulse_id, domains)
|
||||
if not domains:
|
||||
return
|
||||
|
||||
pulse_id = self.find_pulse(malware_name)
|
||||
if not pulse_id:
|
||||
log.info('Making pulse, doesnt exist')
|
||||
self.create_pulse_request(malware_name, domains)
|
||||
else:
|
||||
log.info('Updating pulse, already exists')
|
||||
self.modify_pulse_request(pulse_id, domains)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
"""
|
||||
As a simple testing mechanism, run like
|
||||
OTX_API_KEY=YOURKEYHERE ./PulseManager.py "Malware Name" domain1.com domain2.com domain3.com .... moredomains.com
|
||||
"""
|
||||
p = PulseManager()
|
||||
p.maintain_pulse(malware_name=sys.argv[1], domains=sys.argv[2:])
|
||||
|
13
examples/cli_example.py
Normal file → Executable file
13
examples/cli_example.py
Normal file → Executable file
@ -1,13 +1,16 @@
|
||||
#!/usr/bin/env python
|
||||
|
||||
# Very Simple CLI example to get indicator details from Alienvault OTX
|
||||
|
||||
from OTXv2 import OTXv2
|
||||
import IndicatorTypes
|
||||
import argparse
|
||||
import os
|
||||
|
||||
# Your API key
|
||||
API_KEY = ''
|
||||
OTX_SERVER = 'https://otx.alienvault.com/'
|
||||
otx = OTXv2(API_KEY, server=OTX_SERVER)
|
||||
# store OTX API key in environment variable OTX_API_KEY
|
||||
API_KEY = os.getenv("OTX_API_KEY")
|
||||
|
||||
otx = OTXv2(API_KEY)
|
||||
|
||||
parser = argparse.ArgumentParser(description='OTX CLI Example')
|
||||
parser.add_argument('-i', '--ip', help='IP eg; 4.4.4.4', required=False)
|
||||
@ -46,4 +49,4 @@ if args["pulse"]:
|
||||
print (str(result.get('results')))
|
||||
|
||||
if args["subscribed"]:
|
||||
print (str(otx.getall(max_page=3, limit=5)))
|
||||
print (str(otx.getall(max_items=3, limit=5)))
|
||||
|
31
examples/get_yara_rules.py
Normal file → Executable file
31
examples/get_yara_rules.py
Normal file → Executable file
@ -1,15 +1,24 @@
|
||||
import urllib2
|
||||
import socket
|
||||
from OTXv2 import OTXv2
|
||||
from OTXv2 import OTXv2, IndicatorTypes
|
||||
#!/usr/bin/env python
|
||||
|
||||
otx = OTXv2('API_KEY')
|
||||
from OTXv2 import OTXv2Cached, IndicatorTypes
|
||||
import logging
|
||||
import os
|
||||
|
||||
logging.basicConfig(level=logging.INFO)
|
||||
|
||||
|
||||
# store OTX API key in environment variable OTX_API_KEY
|
||||
API_KEY = os.getenv("OTX_API_KEY")
|
||||
|
||||
# OTXv2Cached is a class like OTXv2 except that it maintains a local cache and therefore does not
|
||||
# need to constantly fetch all the data from the server
|
||||
# Initial fetch may take some time, subsequent fetches should be much faster (and easier on our servers)
|
||||
otx = OTXv2Cached(API_KEY)
|
||||
|
||||
# update local cache by fetching new pulses, or fetching pulses that you need due to changes in your subscription
|
||||
otx.update()
|
||||
|
||||
pulses = otx.getall()
|
||||
|
||||
for i in range(0,len(pulses)-1):
|
||||
print ("// https://otx.alienvault.com/pulse/" + pulses[i]["id"])
|
||||
indicators = pulses[i]["indicators"]
|
||||
for ind in indicators:
|
||||
if ind['type'] == "YARA":
|
||||
print(ind['content'])
|
||||
for i in otx.get_all_indicators(indicator_types=[IndicatorTypes.YARA]):
|
||||
print(i['content'])
|
||||
|
4
examples/is_malicious/get_malicious.py
Normal file → Executable file
4
examples/is_malicious/get_malicious.py
Normal file → Executable file
@ -1,3 +1,5 @@
|
||||
#!/usr/bin/env python
|
||||
|
||||
import IndicatorTypes
|
||||
|
||||
# Get a nested key from a dict, without having to do loads of ifs
|
||||
@ -132,4 +134,4 @@ def file(otx, hash):
|
||||
if suricata and 'trojan' in str(suricata).lower():
|
||||
alerts.append({'suricata': suricata})
|
||||
|
||||
return alerts
|
||||
return alerts
|
||||
|
33
examples/is_malicious/is_malicious.py
Normal file → Executable file
33
examples/is_malicious/is_malicious.py
Normal file → Executable file
@ -1,4 +1,5 @@
|
||||
# This script tells if a File, IP, Domain or URL may be malicious according to the data in OTX
|
||||
#!/usr/bin/env python
|
||||
# This script tells if a File, IP, Domain or URL may be malicious according to the data in OTX
|
||||
|
||||
from OTXv2 import OTXv2
|
||||
import argparse
|
||||
@ -28,42 +29,42 @@ args = vars(parser.parse_args())
|
||||
if args['ip']:
|
||||
alerts = get_malicious.ip(otx, args['ip'])
|
||||
if len(alerts) > 0:
|
||||
print 'Identified as potentially malicious'
|
||||
print str(alerts)
|
||||
print('Identified as potentially malicious')
|
||||
print(str(alerts))
|
||||
else:
|
||||
print 'Unknown or not identified as malicious'
|
||||
print('Unknown or not identified as malicious')
|
||||
|
||||
if args['host']:
|
||||
alerts = get_malicious.hostname(otx, args['host'])
|
||||
if len(alerts) > 0:
|
||||
print 'Identified as potentially malicious'
|
||||
print str(alerts)
|
||||
print('Identified as potentially malicious')
|
||||
print(str(alerts))
|
||||
else:
|
||||
print 'Unknown or not identified as malicious'
|
||||
print('Unknown or not identified as malicious')
|
||||
|
||||
if args['url']:
|
||||
alerts = get_malicious.url(otx, args['url'])
|
||||
if len(alerts) > 0:
|
||||
print 'Identified as potentially malicious'
|
||||
print str(alerts)
|
||||
print('Identified as potentially malicious')
|
||||
print(str(alerts))
|
||||
else:
|
||||
print 'Unknown or not identified as malicious'
|
||||
print('Unknown or not identified as malicious')
|
||||
|
||||
if args['hash']:
|
||||
alerts = get_malicious.file(otx, args['hash'])
|
||||
if len(alerts) > 0:
|
||||
print 'Identified as potentially malicious'
|
||||
print str(alerts)
|
||||
print('Identified as potentially malicious')
|
||||
print(str(alerts))
|
||||
else:
|
||||
print 'Unknown or not identified as malicious'
|
||||
print('Unknown or not identified as malicious')
|
||||
|
||||
|
||||
if args['file']:
|
||||
hash = hashlib.md5(open(args['file'], 'rb').read()).hexdigest()
|
||||
alerts = get_malicious.file(otx, hash)
|
||||
if len(alerts) > 0:
|
||||
print 'Identified as potentially malicious'
|
||||
print str(alerts)
|
||||
print('Identified as potentially malicious')
|
||||
print(str(alerts))
|
||||
else:
|
||||
print 'Unknown or not identified as malicious'
|
||||
print('Unknown or not identified as malicious')
|
||||
|
||||
|
4
examples/misp_json_to_otx.py
Normal file → Executable file
4
examples/misp_json_to_otx.py
Normal file → Executable file
@ -1,4 +1,6 @@
|
||||
# This example:
|
||||
#!/usr/bin/env python
|
||||
|
||||
# This example:
|
||||
# 1) Takes a MISP Json representing malicious activity, eg; those at https://www.circl.lu/doc/misp/feed-osint/ then
|
||||
# 2) Creates an OTX pulse representing that data
|
||||
|
||||
|
44
examples/update_feed.py
Normal file → Executable file
44
examples/update_feed.py
Normal file → Executable file
@ -1,3 +1,5 @@
|
||||
#!/usr/bin/env python
|
||||
|
||||
# Simple example that keeps a pulse up to date with a feed
|
||||
# - If new indicators appear - they will be added
|
||||
# - If indicators are removed from a feed - they will be set to expire (currently this can take up to an hour, but will decrease shortly)
|
||||
@ -7,18 +9,20 @@
|
||||
# 2) Set the pulse pulse_id of your pulse below and run the script
|
||||
# Example pulse created with this script: https://otx.alienvault.com/pulse/58bd6a67f6a7974d31a1138d/
|
||||
|
||||
import os
|
||||
from OTXv2 import OTXv2
|
||||
import urllib2
|
||||
import socket
|
||||
import urllib2
|
||||
|
||||
# Your API key - set in environment variable named OTX_API_KEY
|
||||
API_KEY = os.getenv('OTX_API_KEY')
|
||||
|
||||
# Your API key
|
||||
API_KEY = ''
|
||||
# Create a pulse and set below, the pulse_id is in the url when using OTX eg; https://otx.alienvault.com/pulse/PULSE_ID/
|
||||
pulse_id= ''
|
||||
pulse_id = ''
|
||||
|
||||
# Example feed source
|
||||
feed_url = 'https://feodotracker.abuse.ch/blocklist/?download=ipblocklist'
|
||||
OTX_SERVER = 'https://otx.alienvault.com/'
|
||||
otx = OTXv2(API_KEY, server=OTX_SERVER)
|
||||
feed_url = 'https://feodotracker.abuse.ch/downloads/ipblocklist.txt'
|
||||
otx = OTXv2(API_KEY)
|
||||
|
||||
def valid_ip(ip, otx):
|
||||
try:
|
||||
@ -26,24 +30,24 @@ def valid_ip(ip, otx):
|
||||
socket.inet_aton(ip)
|
||||
return True
|
||||
except Exception as e:
|
||||
print str(e)
|
||||
print(str(e))
|
||||
return False
|
||||
|
||||
# Here we download and parse the feed
|
||||
print 'Downloading feed from ' + feed_url
|
||||
print('Downloading feed from ' + feed_url)
|
||||
new_indicators = []
|
||||
data = urllib2.urlopen(feed_url)
|
||||
for line in data.readlines():
|
||||
if not line.startswith('#'):
|
||||
ip = line.strip()
|
||||
# Is it a valid IP?
|
||||
if valid_ip(ip, otx):
|
||||
print 'Will add ' + ip
|
||||
# Change this to type : "Domain" for a domain indicator etc.
|
||||
new_indicators.append({ 'indicator': ip, 'type': 'IPv4' })
|
||||
else:
|
||||
print 'Wont add ' + ip
|
||||
if not line.startswith('#'):
|
||||
ip = line.strip()
|
||||
# Is it a valid IP?
|
||||
if valid_ip(ip, otx):
|
||||
print('Will add ' + ip)
|
||||
# Change this to type : "Domain" for a domain indicator etc.
|
||||
new_indicators.append({ 'indicator': ip, 'type': 'IPv4' })
|
||||
else:
|
||||
print('Wont add ' + ip)
|
||||
|
||||
print 'Updating indicators'
|
||||
print('Updating indicators')
|
||||
response = otx.replace_pulse_indicators(pulse_id, new_indicators)
|
||||
print 'Completed updating pulse'
|
||||
print('Completed updating pulse')
|
||||
|
25
examples/update_pulse.py
Normal file → Executable file
25
examples/update_pulse.py
Normal file → Executable file
@ -1,20 +1,25 @@
|
||||
# Simple example that prints the indicators from a pulse, then replaces them
|
||||
from OTXv2 import OTXv2
|
||||
#!/usr/bin/env python
|
||||
|
||||
# Simple example that prints the indicators from a pulse, then replaces them
|
||||
import json
|
||||
import os
|
||||
from OTXv2 import OTXv2
|
||||
|
||||
API_KEY = ''
|
||||
pulse_id= ''
|
||||
OTX_SERVER = 'https://otx.alienvault.com'
|
||||
|
||||
otx = OTXv2(API_KEY, server=OTX_SERVER)
|
||||
# store OTX API key in environment variable OTX_API_KEY
|
||||
API_KEY = os.getenv("OTX_API_KEY")
|
||||
|
||||
print 'Getting indicators for pulse:'
|
||||
pulse_id = ''
|
||||
|
||||
otx = OTXv2(API_KEY)
|
||||
|
||||
print('Getting indicators for pulse:')
|
||||
indicators = otx.get_pulse_indicators(pulse_id=pulse_id)
|
||||
for indicator in indicators:
|
||||
print indicator['indicator'] + ',' + indicator['type'] + ',' + str(indicator['id'])
|
||||
print(indicator['indicator'] + ',' + indicator['type'] + ',' + str(indicator['id']))
|
||||
|
||||
print 'Updating indicators for pulse:'
|
||||
print('Updating indicators for pulse:')
|
||||
with open('updatePulse.json') as data_file:
|
||||
data = json.load(data_file)
|
||||
response = otx.replace_pulse_indicators(pulse_id, data)
|
||||
print 'Response: ' + str(response)
|
||||
print('Response: ' + str(response))
|
||||
|
4
setup.py
4
setup.py
@ -5,12 +5,12 @@ from setuptools import setup
|
||||
|
||||
setup(
|
||||
name='OTXv2',
|
||||
version='1.5.1',
|
||||
version='1.5.2',
|
||||
description='AlienVault OTX API',
|
||||
author='AlienVault Team',
|
||||
author_email='otx@alienvault.com',
|
||||
url='https://github.com/AlienVault-Labs/OTX-Python-SDK',
|
||||
download_url='https://github.com/AlienVault-Labs/OTX-Python-SDK/tarball/1.5.1',
|
||||
download_url='https://github.com/AlienVault-Labs/OTX-Python-SDK/tarball/1.5.2',
|
||||
py_modules=['OTXv2', 'IndicatorTypes','patch_pulse'],
|
||||
install_requires=['requests', 'python-dateutil', 'pytz']
|
||||
)
|
||||
|
13
tests/test_client.py
Normal file → Executable file
13
tests/test_client.py
Normal file → Executable file
@ -52,7 +52,6 @@ class TestOTXv2(unittest.TestCase):
|
||||
self.api_key = api_key or ALIEN_API_APIKEY
|
||||
self.otx = OTXv2(self.api_key, server=ALIEN_DEV_SERVER)
|
||||
|
||||
'''
|
||||
class TestSubscriptionsInvalidKey(TestOTXv2):
|
||||
"""
|
||||
Confirm InvalidAPIKey class is raised for API Key failures
|
||||
@ -428,7 +427,6 @@ class TestPulseCreateInvalidKey(TestOTXv2):
|
||||
indicators=[],
|
||||
tags=[],
|
||||
references=[])
|
||||
'''
|
||||
|
||||
|
||||
class TestSubscription(unittest.TestCase):
|
||||
@ -517,7 +515,6 @@ class TestSubscription(unittest.TestCase):
|
||||
self.assertFalse(after2['is_subscribing'])
|
||||
|
||||
|
||||
'''
|
||||
class TestValidateIndicator(TestOTXv2):
|
||||
def test_validate_valid_domain(self):
|
||||
indicator = generate_rand_string(8, charset=string.ascii_letters).lower() + ".com"
|
||||
@ -594,15 +591,13 @@ class TestSubmissions(TestOTXv2):
|
||||
u1 = "http://flannelcat.rustybrooks.com/yyy/{}".format(self.rand1)
|
||||
u2 = "http://flannelcat.rustybrooks.com/yyy/{}".format(self.rand2)
|
||||
r = self.otx.submit_urls(urls=[u1, u2])
|
||||
r['added'].sort(key=lambda x: x['url'])
|
||||
r['added'].sort()
|
||||
self.assertDictEqual(r, {
|
||||
u'added': sorted([
|
||||
{u'canononical_url': u2, u'url': u2},
|
||||
{u'canononical_url': u1, u'url': u1},
|
||||
], key=lambda x: x['url']),
|
||||
u'added': sorted([u2, u1]),
|
||||
u'exists': [],
|
||||
u'skipped': [],
|
||||
u'updated': [],
|
||||
u'invalid': [],
|
||||
u'status': u'ok',
|
||||
})
|
||||
|
||||
@ -745,7 +740,7 @@ class TestOTXv2Cached(unittest.TestCase):
|
||||
self.assertIsNotNone(pulse.get('tags', None))
|
||||
self.assertIsNotNone(pulse.get('references', None))
|
||||
self.assertIsNotNone(res.get('exact_match'))
|
||||
'''
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
username = "qatester-git-{}".format(rand)
|
||||
|
Loading…
Reference in New Issue
Block a user