mirror of
https://github.com/valitydev/Cortex-Analyzers.git
synced 2024-11-06 09:05:19 +00:00
#446 #361 use python3 and the library maintained by blacktop instead of a local one https://pypi.org/project/virustotal-api/
This commit is contained in:
parent
12682d284c
commit
cd0cf6da2c
@ -1,3 +1,4 @@
|
||||
cortexutils
|
||||
future
|
||||
requests
|
||||
virustotal-api
|
||||
|
@ -1,10 +1,10 @@
|
||||
#!/usr/bin/env python
|
||||
# encoding: utf-8
|
||||
import sys
|
||||
|
||||
import time
|
||||
import hashlib
|
||||
|
||||
from virustotal_api import PublicApi as VirusTotalPublicApi
|
||||
from virus_total_apis import PublicApi as VirusTotalPublicApi
|
||||
from cortexutils.analyzer import Analyzer
|
||||
|
||||
|
||||
@ -45,8 +45,6 @@ class VirusTotalAnalyzer(Analyzer):
|
||||
if status != 200:
|
||||
self.error('Bad status : ' + str(status))
|
||||
results = response.get('results', {})
|
||||
if 'verbose_msg' in results:
|
||||
print (str(results.get('verbose_msg')), sys.stderr)
|
||||
if 'Missing IP address' in results.get('verbose_msg', ''):
|
||||
results['verbose_msg'] = 'IP address not available in VirusTotal'
|
||||
return results
|
||||
@ -140,8 +138,10 @@ class VirusTotalAnalyzer(Analyzer):
|
||||
if self.data_type == 'file':
|
||||
filename = self.get_param('filename', 'noname.ext')
|
||||
filepath = self.get_param('file', None, 'File is missing')
|
||||
self.read_scan_response(self.vt.scan_file(
|
||||
(filename, open(filepath, 'rb'))), self.wait_file_report)
|
||||
self.read_scan_response(
|
||||
self.vt.scan_file(filepath, from_disk=True, filename=filename),
|
||||
self.wait_file_report
|
||||
)
|
||||
elif self.data_type == 'url':
|
||||
data = self.get_param('data', None, 'Data is missing')
|
||||
self.read_scan_response(
|
||||
@ -160,7 +160,7 @@ class VirusTotalAnalyzer(Analyzer):
|
||||
hashes = self.get_param('attachment.hashes', None)
|
||||
if hashes is None:
|
||||
filepath = self.get_param('file', None, 'File is missing')
|
||||
hash = hashlib.sha256(open(filepath, 'r').read()).hexdigest()
|
||||
hash = hashlib.sha256(open(filepath, 'rb').read()).hexdigest()
|
||||
else:
|
||||
# find SHA256 hash
|
||||
hash = next(h for h in hashes if len(h) == 64)
|
||||
|
@ -1,792 +0,0 @@
|
||||
#!/usr/bin/env python
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
""" Simple class to interact with VirusTotal's Public and Private API as well as VirusTotal Intelligence.
|
||||
|
||||
:copyright: (c) 2014 by Josh "blacktop" Maine.
|
||||
:license: GPLv3, see LICENSE for more details.
|
||||
|
||||
The APIs are documented at:
|
||||
https://www.virustotal.com/en/documentation/public-api/
|
||||
https://www.virustotal.com/en/documentation/private-api/
|
||||
https://www.virustotal.com/intelligence/help/automation/
|
||||
|
||||
EXAMPLE USAGE:::
|
||||
|
||||
from virus_total_apis import PublicApi as vtPubAPI
|
||||
|
||||
vt = vtPubAPI(<INSERT_API_KEY_HERE>)
|
||||
response = vt.get_file_report('44cda81782dc2a346abd7b2285530c5f')
|
||||
|
||||
print json.dumps(response, sort_keys=False, indent=4)
|
||||
"""
|
||||
|
||||
import os
|
||||
try:
|
||||
import requests
|
||||
import StringIO
|
||||
except ImportError:
|
||||
from io import StringIO
|
||||
pass
|
||||
|
||||
|
||||
class PublicApi():
|
||||
""" VirusTotal's Public API lets you upload and scan files, submit and scan URLs, access finished scan reports
|
||||
and make automatic comments on URLs and samples without the need of using the HTML website interface. In other
|
||||
words, it allows you to build simple scripts to access the information generated by VirusTotal.
|
||||
|
||||
The chosen format for the API is HTTP POST requests with JSON object responses and it is limited to at most 4
|
||||
requests of any nature in any given 1 minute time frame. If you run a honeyclient, honeypot or any other
|
||||
automation that is going to provide resources to VirusTotal and not only retrieve reports you are entitled to
|
||||
a higher request rate quota, ask for it at contact@virustotal.com and you will receive special privileges when
|
||||
performing the calls to the API. Note that you will only have a higher request rate quota when asking for files
|
||||
or URLs that you previously sent to VirusTotal.
|
||||
|
||||
In this second version we have improved the response format so as to ease the task of retrieving results, we
|
||||
have also introduced batch requests, you may now ask for several items with a sole API call (as long as you
|
||||
cohere with the request rate limit).
|
||||
|
||||
The public API is a free service, available for any website or application that is free to consumers. The API
|
||||
must not be used in commercial products or services, it can not be used as a substitute for antivirus products
|
||||
and it can not be integrated in any project that may harm the antivirus industry directly or indirectly.
|
||||
Noncompliance of these terms will result in inmediate permanent ban of the infractor individual or organization.
|
||||
"""
|
||||
|
||||
def __init__(self, api_key=None, proxies=None):
|
||||
self.api_key = api_key
|
||||
self.proxies = proxies
|
||||
self.base = 'https://www.virustotal.com/vtapi/v2/'
|
||||
self.version = 2
|
||||
if api_key is None:
|
||||
raise ApiError("You must supply a valid VirusTotal API key.")
|
||||
|
||||
def scan_file(self, this_file):
|
||||
""" Submit a file to be scanned by VirusTotal
|
||||
|
||||
:param this_file: File to be scanned (32MB file size limit)
|
||||
:return: JSON response that contains scan_id and permalink.
|
||||
"""
|
||||
params = {'apikey': self.api_key}
|
||||
try:
|
||||
if type(this_file) == str and os.path.isfile(this_file):
|
||||
files = {'file': (this_file, open(this_file, 'rb'))}
|
||||
elif isinstance(this_file, StringIO.StringIO):
|
||||
files = {'file': this_file.read()}
|
||||
else:
|
||||
files = {'file': this_file}
|
||||
except TypeError as e:
|
||||
return dict(error=e.message)
|
||||
|
||||
try:
|
||||
response = requests.post(self.base + 'file/scan', files=files, params=params, proxies=self.proxies)
|
||||
except requests.RequestException as e:
|
||||
return dict(error=e.message)
|
||||
|
||||
return _return_response_and_status_code(response)
|
||||
|
||||
def rescan_file(self, this_hash):
|
||||
""" Rescan a previously submitted filed or schedule an scan to be performed in the future.
|
||||
|
||||
:param this_hash: a md5/sha1/sha256 hash. You can also specify a CSV list made up of a combination of any of
|
||||
the three allowed hashes (up to 25 items), this allows you to perform a batch request with
|
||||
one single call. Note that the file must already be present in our file store.
|
||||
:return: JSON response that contains scan_id and permalink.
|
||||
"""
|
||||
params = {'apikey': self.api_key, 'resource': this_hash}
|
||||
|
||||
try:
|
||||
response = requests.post(self.base + 'file/rescan', params=params, proxies=self.proxies)
|
||||
except requests.RequestException as e:
|
||||
return dict(error=e.message)
|
||||
|
||||
return _return_response_and_status_code(response)
|
||||
|
||||
def get_file_report(self, this_hash):
|
||||
""" Get the scan results for a file.
|
||||
|
||||
You can also specify a CSV list made up of a combination of hashes and scan_ids
|
||||
(up to 4 items with the standard request rate), this allows you to perform a batch
|
||||
request with one single call.
|
||||
i.e. {'resource': '99017f6eebbac24f351415dd410d522d, 88817f6eebbac24f351415dd410d522d'}.
|
||||
|
||||
:param this_hash: The md5/sha1/sha256/scan_ids hash of the file whose dynamic behavioural report you want to
|
||||
retrieve or scan_ids from a previous call to scan_file.
|
||||
:return:
|
||||
"""
|
||||
params = {'apikey': self.api_key, 'resource': this_hash}
|
||||
|
||||
try:
|
||||
response = requests.get(self.base + 'file/report', params=params, proxies=self.proxies)
|
||||
except requests.RequestException as e:
|
||||
return dict(error=e.message)
|
||||
|
||||
return _return_response_and_status_code(response)
|
||||
|
||||
def scan_url(self, this_url):
|
||||
""" Submit a URL to be scanned by VirusTotal.
|
||||
|
||||
:param this_url: The URL that should be scanned. This parameter accepts a list of URLs (up to 4 with the
|
||||
standard request rate) so as to perform a batch scanning request with one single call. The
|
||||
URLs must be separated by a new line character.
|
||||
:return: JSON response that contains scan_id and permalink.
|
||||
"""
|
||||
params = {'apikey': self.api_key, 'url': this_url}
|
||||
|
||||
try:
|
||||
response = requests.post(self.base + 'url/scan', params=params, proxies=self.proxies)
|
||||
except requests.RequestException as e:
|
||||
return dict(error=e.message)
|
||||
|
||||
return _return_response_and_status_code(response)
|
||||
|
||||
def get_url_report(self, this_url, scan='0'):
|
||||
""" Get the scan results for a URL. (can do batch searches like get_file_report)
|
||||
|
||||
:param this_url: a URL will retrieve the most recent report on the given URL. You may also specify a scan_id
|
||||
(sha256-timestamp as returned by the URL submission API) to access a specific report. At the
|
||||
same time, you can specify a CSV list made up of a combination of hashes and scan_ids so as
|
||||
to perform a batch request with one single call (up to 4 resources per call with the standard
|
||||
request rate). When sending multiples, the scan_ids or URLs must be separated by a new line
|
||||
character.
|
||||
:param scan: (optional): this is an optional parameter that when set to "1" will automatically submit the URL
|
||||
for analysis if no report is found for it in VirusTotal's database. In this case the result will
|
||||
contain a scan_id field that can be used to query the analysis report later on.
|
||||
:return: JSON response
|
||||
"""
|
||||
params = {'apikey': self.api_key, 'resource': this_url, 'scan': scan}
|
||||
|
||||
try:
|
||||
response = requests.get(self.base + 'url/report', params=params, proxies=self.proxies)
|
||||
except requests.RequestException as e:
|
||||
return dict(error=e.message)
|
||||
|
||||
return _return_response_and_status_code(response)
|
||||
|
||||
def put_comments(self, resource, comment):
|
||||
""" Post a comment on a file or URL.
|
||||
|
||||
The initial idea of VirusTotal Community was that users should be able to make comments on files and URLs,
|
||||
the comments may be malware analyses, false positive flags, disinfection instructions, etc.
|
||||
|
||||
Imagine you have some automatic setup that can produce interesting results related to a given sample or URL
|
||||
that you submit to VirusTotal for antivirus characterization, you might want to give visibility to your setup
|
||||
by automatically reviewing samples and URLs with the output of your automation.
|
||||
|
||||
:param resource: either a md5/sha1/sha256 hash of the file you want to review or the URL itself that you want
|
||||
to comment on.
|
||||
:param comment: the actual review, you can tag it using the "#" twitter-like syntax (e.g. #disinfection #zbot)
|
||||
and reference users using the "@" syntax (e.g. @VirusTotalTeam).
|
||||
:return: If the comment was successfully posted the response code will be 1, 0 otherwise.
|
||||
"""
|
||||
params = {'apikey': self.api_key, 'resource': resource, 'comment': comment}
|
||||
|
||||
try:
|
||||
response = requests.post(self.base + 'comments/put', params=params, proxies=self.proxies)
|
||||
except requests.RequestException as e:
|
||||
return dict(error=e.message)
|
||||
|
||||
return _return_response_and_status_code(response)
|
||||
|
||||
def get_ip_report(self, this_ip):
|
||||
""" Get IP address reports.
|
||||
|
||||
:param this_ip: a valid IPv4 address in dotted quad notation, for the time being only IPv4 addresses are
|
||||
supported.
|
||||
:return: JSON response
|
||||
"""
|
||||
params = {'apikey': self.api_key, 'ip': this_ip}
|
||||
|
||||
try:
|
||||
response = requests.get(self.base + 'ip-address/report', params=params, proxies=self.proxies)
|
||||
except requests.RequestException as e:
|
||||
return dict(error=e.message)
|
||||
|
||||
return _return_response_and_status_code(response)
|
||||
|
||||
def get_domain_report(self, this_domain):
|
||||
""" Get information about a given domain.
|
||||
|
||||
:param this_domain: a domain name.
|
||||
:return: JSON response
|
||||
"""
|
||||
params = {'apikey': self.api_key, 'domain': this_domain}
|
||||
|
||||
try:
|
||||
response = requests.get(self.base + 'domain/report', params=params, proxies=self.proxies)
|
||||
except requests.RequestException as e:
|
||||
return dict(error=e.message)
|
||||
|
||||
return _return_response_and_status_code(response)
|
||||
|
||||
|
||||
class PrivateApi(PublicApi):
|
||||
def scan_file(self, this_file, notify_url=None, notify_changes_only=None):
|
||||
""" Submit a file to be scanned by VirusTotal.
|
||||
|
||||
Allows you to send a file for scanning with VirusTotal. Before performing your submissions we encourage you to
|
||||
retrieve the latest report on the files, if it is recent enough you might want to save time and bandwidth by
|
||||
making use of it. File size limit is 32MB, in order to submmit files up to 200MB in size you must request a
|
||||
special upload URL.
|
||||
|
||||
:param this_file: The file to be uploaded.
|
||||
:param notify_url: A URL to which a POST notification should be sent when the scan finishes.
|
||||
:param notify_changes_only: Used in conjunction with notify_url. Indicates if POST notifications should be
|
||||
sent only if the scan results differ from the previous analysis.
|
||||
:return: JSON response that contains scan_id and permalink.
|
||||
"""
|
||||
params = {'apikey': self.api_key}
|
||||
files = {'file': (this_file, open(this_file, 'rb'))}
|
||||
|
||||
try:
|
||||
response = requests.post(self.base + 'file/scan', files=files, params=params, proxies=self.proxies)
|
||||
except requests.RequestException as e:
|
||||
return dict(error=e.message)
|
||||
|
||||
return _return_response_and_status_code(response)
|
||||
|
||||
@property
|
||||
def get_upload_url(self):
|
||||
""" Get a special URL for submitted files bigger than 32MB.
|
||||
|
||||
In order to submit files bigger than 32MB you need to obtain a special upload URL to which you
|
||||
can POST files up to 200MB in size. This API generates such a URL.
|
||||
|
||||
:return: JSON special upload URL to which you can POST files up to 200MB in size.
|
||||
"""
|
||||
params = {'apikey': self.api_key}
|
||||
|
||||
try:
|
||||
response = requests.get(self.base + 'file/scan/upload_url', params=params, proxies=self.proxies)
|
||||
except requests.RequestException as e:
|
||||
return dict(error=e.message)
|
||||
|
||||
if response.status_code == requests.codes.ok:
|
||||
return response.json()['upload_url']
|
||||
else:
|
||||
return dict(response_code=response.status_code)
|
||||
|
||||
def rescan_file(self, resource, date='', period='', repeat='', notify_url='', notify_changes_only=''):
|
||||
""" Rescan a previously submitted filed or schedule an scan to be performed in the future.
|
||||
|
||||
This API allows you to rescan files present in VirusTotal's file store without having to
|
||||
resubmit them, thus saving bandwidth. You only need to know one of the hashes of the file
|
||||
to rescan.
|
||||
|
||||
:param resource: An md5/sha1/sha256 hash. You can also specify a CSV list made up of a
|
||||
combination of any of the three allowed hashes (up to 25 items), this allows you to perform
|
||||
a batch request with just one single call. Note that the file must already be present in our
|
||||
file store.
|
||||
:param date: (optional) Date in %Y%m%d%H%M%S format (example: 20120725170000) in which the rescan should
|
||||
be performed. If not specified the rescan will be performed immediately.
|
||||
:param period: (optional) Periodicity (in days) with which the file should be rescanned. If this argument
|
||||
is provided the file will be rescanned periodically every period days, if not, the rescan is
|
||||
performed once and not repated again.
|
||||
:param repeat: (optional) Used in conjunction with period to specify the number of times the file should be
|
||||
rescanned. If this argument is provided the file will be rescanned the given amount of times in coherence
|
||||
with the chosen periodicity, if not, the file will be rescanned indefinitely.
|
||||
:param notify_url: (optional) A URL to which a POST notification should be sent when the rescan finishes.
|
||||
:param notify_changes_only: (optional) Used in conjunction with notify_url. Indicates if POST notifications
|
||||
should only be sent if the scan results differ from the previous one.
|
||||
:return: JSON response that contains scan_id and permalink.
|
||||
"""
|
||||
params = {'apikey': self.api_key, 'resource': resource}
|
||||
|
||||
try:
|
||||
response = requests.post(self.base + 'file/rescan', params=params, proxies=self.proxies)
|
||||
except requests.RequestException as e:
|
||||
return dict(error=e.message)
|
||||
|
||||
return _return_response_and_status_code(response)
|
||||
|
||||
def cancel_rescan_file(self, resource):
|
||||
""" Delete a previously scheduled scan.
|
||||
|
||||
Deletes a scheduled file rescan task. The file rescan api allows you to schedule periodic scans of a file,
|
||||
this API call tells VirusTotal to stop rescanning a file that you have previously enqueued for recurrent
|
||||
scanning.
|
||||
|
||||
:param resource: The md5/sha1/sha256 hash of the file whose dynamic behavioural report you want to retrieve.
|
||||
:return: JSON acknowledgement. In the event that the scheduled scan deletion fails for whatever reason, the
|
||||
response code will be -1.
|
||||
"""
|
||||
params = {'apikey': self.api_key, 'resource': resource}
|
||||
|
||||
try:
|
||||
response = requests.post(self.base + 'rescan/delete', params=params, proxies=self.proxies)
|
||||
except requests.RequestException as e:
|
||||
return dict(error=e.message)
|
||||
|
||||
return _return_response_and_status_code(response)
|
||||
|
||||
def get_file_report(self, resource, allinfo=1):
|
||||
""" Get the scan results for a file.
|
||||
|
||||
Retrieves a concluded file scan report for a given file. Unlike the public API, this call allows you to also
|
||||
access all the information we have on a particular file (VirusTotal metadata, signature information, structural
|
||||
information, etc.) by using the allinfo parameter described later on.
|
||||
|
||||
:param resource: An md5/sha1/sha256 hash of a file for which you want to retrieve the most recent antivirus
|
||||
report. You may also specify a scan_id (sha256-timestamp as returned by the scan API) to access a specific
|
||||
report. You can also specify a CSV list made up of a combination of hashes and scan_ids (up to 25 items),
|
||||
this allows you to perform a batch request with just one single call.
|
||||
:param allinfo: (optional) If specified and set to one, the call will return additional info, other than the
|
||||
antivirus results, on the file being queried. This additional info includes the output of several tools acting
|
||||
on the file (PDFiD, ExifTool, sigcheck, TrID, etc.), metadata regarding VirusTotal submissions (number of
|
||||
unique sources that have sent the file in the past, first seen date, last seen date, etc.), the output of
|
||||
in-house technologies such as a behavioural sandbox, etc.
|
||||
|
||||
:return: JSON response
|
||||
"""
|
||||
params = {'apikey': self.api_key, 'resource': resource, 'allinfo': allinfo}
|
||||
|
||||
try:
|
||||
response = requests.get(self.base + 'file/report', params=params, proxies=self.proxies)
|
||||
except requests.RequestException as e:
|
||||
return dict(error=e.message)
|
||||
|
||||
return _return_response_and_status_code(response)
|
||||
|
||||
def get_file_behaviour(self, this_hash):
|
||||
""" Get a report about the behaviour of the file in sand boxed environment.
|
||||
|
||||
VirusTotal runs a distributed setup of Cuckoo sandbox machines that execute the files we receive. Execution is
|
||||
attempted only once, upon first submission to VirusTotal, and only Portable Executables under 10MB in size are
|
||||
ran. The execution of files is a best effort process, hence, there are no guarantees about a report being
|
||||
generated for a given file in our dataset.
|
||||
|
||||
If a file did indeed produce a behavioural report, a summary of it can be obtained by using the file scan
|
||||
lookup call providing the additional HTTP POST parameter allinfo=1. The summary will appear under the
|
||||
behaviour-v1 property of the additional_info field in the JSON report.
|
||||
|
||||
:param this_hash: The md5/sha1/sha256 hash of the file whose dynamic behavioural report you want to retrieve.
|
||||
:return: full JSON report of the file's execution as returned by the Cuckoo JSON report encoder.
|
||||
"""
|
||||
params = {'apikey': self.api_key, 'hash': this_hash}
|
||||
|
||||
try:
|
||||
response = requests.get(self.base + 'file/behaviour', params=params, proxies=self.proxies)
|
||||
except requests.RequestException as e:
|
||||
return dict(error=e.message)
|
||||
|
||||
return _return_response_and_status_code(response)
|
||||
|
||||
def get_network_traffic(self, this_hash):
|
||||
""" Get a dump of the network traffic generated by the file.
|
||||
|
||||
VirusTotal runs a distributed setup of Cuckoo sandbox machines that execute the files we receive.
|
||||
Execution is attempted only once, upon first submission to VirusTotal, and only Portable Executables
|
||||
under 10MB in size are ran. The execution of files is a best effort process, hence, there are no
|
||||
guarantees about a report being generated for a given file in our dataset.
|
||||
|
||||
Files that are successfully executed may communicate with certain network resources, all this
|
||||
communication is recorded in a network traffic dump (pcap file). This API allows you to retrieve
|
||||
the network traffic dump generated during the file's execution.
|
||||
|
||||
:param this_hash: The md5/sha1/sha256 hash of the file whose network traffic dump you want to retrieve.
|
||||
:return: Pcap
|
||||
"""
|
||||
params = {'apikey': self.api_key, 'hash': this_hash}
|
||||
|
||||
try:
|
||||
response = requests.get(self.base + 'file/network-traffic', params=params, proxies=self.proxies)
|
||||
except requests.RequestException as e:
|
||||
return dict(error=e.message)
|
||||
|
||||
try:
|
||||
return _return_response_and_status_code(response)
|
||||
except ValueError:
|
||||
return response.content
|
||||
|
||||
def file_search(self, query, offset=None):
|
||||
""" Search for samples.
|
||||
|
||||
In addition to retrieving all information on a particular file, VirusTotal allows you to perform what we
|
||||
call "advanced reverse searches". Reverse searches take you from a file property to a list of files that
|
||||
match that property. For example, this functionality enables you to retrieve all those files marked by at
|
||||
least one antivirus vendor as Zbot, or all those files that have a size under 90KB and are detected by at
|
||||
least 10 antivirus solutions, or all those PDF files that have an invalid XREF section, etc.
|
||||
|
||||
This API is equivalent to VirusTotal Intelligence advanced searches. A very wide variety of search modifiers
|
||||
are available, including: file size, file type, first submission date to VirusTotal, last submission date to
|
||||
VirusTotal, number of positives, dynamic behavioural properties, binary content, submission file name, and a
|
||||
very long etcetera. The full list of search modifiers allowed for file search queries is documented at:
|
||||
https://www.virustotal.com/intelligence/help/file-search/#search-modifiers
|
||||
|
||||
NOTE:
|
||||
Daily limited! No matter what API step you have licensed, this API call is limited to 50K requests per day.
|
||||
If you need any more, chances are you are approaching your engineering problem erroneously and you can
|
||||
probably solve it using the file distribution call. Do not hesitate to contact us with your particular
|
||||
use case.
|
||||
|
||||
EXAMPLE:
|
||||
search_options = 'type:peexe size:90kb+ positives:5+ behaviour:"taskkill"'
|
||||
|
||||
:param query: A search modifier compliant file search query.
|
||||
:param offset: (optional) The offset value returned by a previously issued identical query, allows you to
|
||||
paginate over the results. If not specified the first 300 matching files sorted according to last submission
|
||||
date to VirusTotal in a descending fashion will be returned.
|
||||
:return: JSON response - By default the list returned contains at most 300 hashes, ordered according to
|
||||
last submission date to VirusTotal in a descending fashion.
|
||||
"""
|
||||
params = dict(apikey=self.api_key, query=query, offset=offset)
|
||||
|
||||
try:
|
||||
response = requests.get(self.base + 'file/search', params=params, proxies=self.proxies)
|
||||
except requests.RequestException as e:
|
||||
return dict(error=e.message)
|
||||
|
||||
return _return_response_and_status_code(response)
|
||||
|
||||
def get_file_clusters(self, this_date):
|
||||
""" File similarity clusters for a given time frame.
|
||||
|
||||
VirusTotal has built its own in-house file similarity clustering functionality. At present, this clustering
|
||||
works only on PE, PDF, DOC and RTF files and is based on a very simple structural feature hash. This hash
|
||||
can very often be confused by certain compression and packing strategies, in other words, this clustering
|
||||
logic is no holly grail, yet it has proven itself very useful in the past.
|
||||
|
||||
This API offers a programmatic access to the clustering section of VirusTotal Intelligence:
|
||||
https://www.virustotal.com/intelligence/clustering/
|
||||
|
||||
NOTE:
|
||||
Please note that you must be logged in with a valid VirusTotal Community user account with access to
|
||||
VirusTotal Intelligence in order to be able to view the clustering listing.
|
||||
|
||||
:param this_date: A specific day for which we want to access the clustering details, example: 2013-09-10.
|
||||
:return: JSON object contains several properties
|
||||
num_candidates - Total number of files submitted during the given time frame for which a feature hash could
|
||||
be calculated.
|
||||
num_clusters - Total number of clusters generated for the given time period under consideration, a cluster
|
||||
can be as small as an individual file, meaning that no other feature-wise similar file was
|
||||
found.
|
||||
size_top200 - The sum of the number of files in the 200 largest clusters identified.
|
||||
clusters - List of JSON objects that contain details about the 200 largest clusters identified. These
|
||||
objects contain 4 properties: id, label, size and avg_positives.. The id field can be used
|
||||
to then query the search API call for files contained in the given cluster. The label
|
||||
property is a verbose human-intelligible name for the cluster. The size field is the number
|
||||
of files that make up the cluster. Finally, avg_positives represents the average number of
|
||||
antivirus detections that the files in the cluster exhibit.
|
||||
"""
|
||||
params = {'apikey': self.api_key, 'date': this_date}
|
||||
|
||||
try:
|
||||
response = requests.get(self.base + 'file/clusters', params=params, proxies=self.proxies)
|
||||
except requests.RequestException as e:
|
||||
return dict(error=e.message)
|
||||
|
||||
return _return_response_and_status_code(response)
|
||||
|
||||
def get_file_distribution(self, before='', after='', reports='false', limit='1000'):
|
||||
""" Get a live feed with the latest files submitted to VirusTotal.
|
||||
|
||||
Allows you to retrieve a live feed of absolutely all uploaded files to VirusTotal, and download them for
|
||||
further scrutiny. This API requires you to stay synced with the live submissions as only a backlog of 6
|
||||
hours is provided at any given point in time.
|
||||
|
||||
:param before: (optional) Retrieve files received before the given timestamp, in timestamp descending order.
|
||||
:param after: (optional) Retrieve files received after the given timestamp, in timestamp ascending order.
|
||||
:param reports: (optional) Include the files' antivirus results in the response. Possible values are 'true' or
|
||||
'false' (default value is 'false').
|
||||
:param limit: (optional) Retrieve limit file items at most (default: 1000).
|
||||
:return: JSON response: please see https://www.virustotal.com/en/documentation/private-api/#file-distribution
|
||||
"""
|
||||
params = {'apikey': self.api_key, 'before': before, 'after': after, 'reports': reports, 'limit': limit}
|
||||
|
||||
try:
|
||||
response = requests.get(self.base + 'file/distribution', params=params, proxies=self.proxies)
|
||||
except requests.RequestException as e:
|
||||
return dict(error=e.message)
|
||||
|
||||
return _return_response_and_status_code(response)
|
||||
|
||||
def get_file(self, this_hash):
|
||||
""" Download a file by its hash.
|
||||
|
||||
Downloads a file from VirusTotal's store given one of its hashes. This call can be used in conjuction with
|
||||
the file searching call in order to download samples that match a given set of criteria.
|
||||
|
||||
:param this_hash: The md5/sha1/sha256 hash of the file you want to download.
|
||||
:return: Downloaded file in response.content
|
||||
"""
|
||||
params = {'apikey': self.api_key, 'hash': this_hash}
|
||||
|
||||
try:
|
||||
response = requests.get(self.base + 'file/download', params=params, proxies=self.proxies)
|
||||
except requests.RequestException as e:
|
||||
return dict(error=e.message)
|
||||
|
||||
if response.status_code == requests.codes.ok:
|
||||
return response.content
|
||||
elif response.status_code == 403:
|
||||
return dict(error='You tried to perform calls to functions for which you require a Private API key.',
|
||||
response_code=response.status_code)
|
||||
elif response.status_code == 404:
|
||||
return dict(error='File not found.', response_code=response.status_code)
|
||||
else:
|
||||
return dict(response_code=response.status_code)
|
||||
|
||||
def scan_url(self, this_url):
|
||||
""" Submit a URL to be scanned by VirusTotal.
|
||||
|
||||
Allows you to submit URLs to be scanned by VirusTotal. Before performing your submission we encourage you to
|
||||
retrieve the latest report on the URL, if it is recent enough you might want to save time and bandwidth by
|
||||
making use of it.
|
||||
|
||||
:param this_url: The URL that should be scanned. This parameter accepts a list of URLs so as to perform a batch
|
||||
scanning request with just one single call (up to 25 URLs per call). The URLs must be separated by a new line
|
||||
character.
|
||||
:return: JSON response that contains scan_id and permalink.
|
||||
"""
|
||||
params = {'apikey': self.api_key, 'url': this_url}
|
||||
|
||||
try:
|
||||
response = requests.post(self.base + 'url/scan', params=params, proxies=self.proxies)
|
||||
except requests.RequestException as e:
|
||||
return dict(error=e.message)
|
||||
|
||||
return _return_response_and_status_code(response)
|
||||
|
||||
def get_url_report(self, this_url, scan='0', allinfo=1):
|
||||
""" Get the scan results for a URL.
|
||||
|
||||
:param this_url: A URL for which you want to retrieve the most recent report. You may also specify a scan_id
|
||||
(sha256-timestamp as returned by the URL submission API) to access a specific report. At the same time, you
|
||||
can specify a CSV list made up of a combination of urls and scan_ids (up to 25 items) so as to perform a batch
|
||||
request with one single call. The CSV list must be separated by new line characters.
|
||||
:param scan: (optional) This is an optional parameter that when set to "1" will automatically submit the URL
|
||||
for analysis if no report is found for it in VirusTotal's database. In this case the result will contain a
|
||||
scan_id field that can be used to query the analysis report later on.
|
||||
:param allinfo: (optional) If this parameter is specified and set to "1" additional info regarding the URL
|
||||
(other than the URL scanning engine results) will also be returned. This additional info includes VirusTotal
|
||||
related metadata (first seen date, last seen date, files downloaded from the given URL, etc.) and the output
|
||||
of other tools and datasets when fed with the URL.
|
||||
:return: JSON response
|
||||
"""
|
||||
|
||||
params = {'apikey': self.api_key, 'resource': this_url, 'scan': scan, 'allinfo': allinfo}
|
||||
|
||||
try:
|
||||
response = requests.get(self.base + 'url/report', params=params, proxies=self.proxies)
|
||||
except requests.RequestException as e:
|
||||
return dict(error=e.message)
|
||||
|
||||
return _return_response_and_status_code(response)
|
||||
|
||||
def get_url_distribution(self, after=None, reports='true', limit=1000):
|
||||
""" Get a live feed with the lastest URLs submitted to VirusTotal.
|
||||
|
||||
Allows you to retrieve a live feed of URLs submitted to VirusTotal, along with their scan reports. This
|
||||
call enables you to stay synced with VirusTotal URL submissions and replicate our dataset.
|
||||
|
||||
:param after: (optional) Retrieve URLs received after the given timestamp, in timestamp ascending order.
|
||||
:param reports: (optional) When set to "true" each item retrieved will include the results for each particular
|
||||
URL scan (in exactly the same format as the URL scan retrieving API). If the parameter is not specified, each
|
||||
item returned will only contain the scanned URL and its detection ratio.
|
||||
:param limit: (optional) Retrieve limit file items at most (default: 1000).
|
||||
:return: JSON response
|
||||
"""
|
||||
|
||||
params = {'apikey': self.api_key, 'after': after, 'reports': reports, 'limit': limit}
|
||||
|
||||
try:
|
||||
response = requests.get(self.base + 'url/distribution', params=params, proxies=self.proxies)
|
||||
except requests.RequestException as e:
|
||||
return dict(error=e.message)
|
||||
|
||||
return _return_response_and_status_code(response)
|
||||
|
||||
def get_ip_report(self, this_ip):
|
||||
""" Get information about a given IP address.
|
||||
|
||||
Retrieves a report on a given IP address (including the information recorded by VirusTotal's Passive DNS
|
||||
infrastructure).
|
||||
|
||||
:param this_ip: A valid IPv4 address in dotted quad notation, for the time being only IPv4 addresses are
|
||||
supported.
|
||||
:return: JSON response
|
||||
"""
|
||||
params = {'apikey': self.api_key, 'ip': this_ip}
|
||||
|
||||
try:
|
||||
response = requests.get(self.base + 'ip-address/report', params=params, proxies=self.proxies)
|
||||
except requests.RequestException as e:
|
||||
return dict(error=e.message)
|
||||
|
||||
return _return_response_and_status_code(response)
|
||||
|
||||
def get_domain_report(self, this_domain):
|
||||
""" Get information about a given domain.
|
||||
|
||||
Retrieves a report on a given domain (including the information recorded by VirusTotal's passive DNS
|
||||
infrastructure).
|
||||
|
||||
:param this_domain: A domain name.
|
||||
:return: JSON response
|
||||
"""
|
||||
params = {'apikey': self.api_key, 'domain': this_domain}
|
||||
|
||||
try:
|
||||
response = requests.get(self.base + 'domain/report', params=params, proxies=self.proxies)
|
||||
except requests.RequestException as e:
|
||||
return dict(error=e.message)
|
||||
|
||||
return _return_response_and_status_code(response)
|
||||
|
||||
def put_comments(self, resource, comment):
|
||||
""" Post a comment on a file or URL.
|
||||
|
||||
Allows you to place comments on URLs and files, these comments will be publicly visible in VirusTotal
|
||||
Community, under the corresponding tab in the reports for each particular item.
|
||||
|
||||
Comments can range from URLs and locations where a given file was found in the wild to full reverse
|
||||
engineering reports on a given malware specimen, anything that may help other analysts in extending their
|
||||
knowledge about a particular file or URL.
|
||||
|
||||
:param resource: Either an md5/sha1/sha256 hash of the file you want to review or the URL itself that you want
|
||||
to comment on.
|
||||
:param comment: The actual review, you can tag it using the "#" twitter-like syntax (e.g. #disinfection #zbot)
|
||||
and reference users using the "@" syntax (e.g. @VirusTotalTeam).
|
||||
:return: JSON response
|
||||
"""
|
||||
params = {'apikey': self.api_key, 'resource': resource, 'comment': comment}
|
||||
|
||||
try:
|
||||
response = requests.post(self.base + 'comments/put', params=params, proxies=self.proxies)
|
||||
except requests.RequestException as e:
|
||||
return dict(error=e.message)
|
||||
|
||||
return _return_response_and_status_code(response)
|
||||
|
||||
def get_comments(self, resource, before=None):
|
||||
""" Get comments for a file or URL.
|
||||
|
||||
Retrieve a list of VirusTotal Community comments for a given file or URL. VirusTotal Community comments are
|
||||
user submitted reviews on a given item, these comments may contain anything from the in-the-wild locations of
|
||||
files up to fully-featured reverse engineering reports on a given sample.
|
||||
|
||||
:param resource: Either an md5/sha1/sha256 hash of the file or the URL itself you want to retrieve.
|
||||
:param before: (optional) A datetime token that allows you to iterate over all comments on a specific item
|
||||
whenever it has been commented on more than 25 times.
|
||||
:return: JSON response - The application answers with the comments sorted in descending order according to
|
||||
their date.
|
||||
"""
|
||||
params = dict(apikey=self.api_key, resource=resource, before=before)
|
||||
|
||||
try:
|
||||
response = requests.get(self.base + 'comments/get', params=params, proxies=self.proxies)
|
||||
except requests.RequestException as e:
|
||||
return dict(error=e.message)
|
||||
|
||||
return _return_response_and_status_code(response)
|
||||
|
||||
|
||||
class IntelApi():
|
||||
""" To make the best use of your VirusTotal Intelligence account and so, we have exposed some
|
||||
VirusTotal Intelligence functionality for programmatic interaction even if you do not have a
|
||||
Private Mass API key.
|
||||
"""
|
||||
|
||||
def __init__(self, api_key, proxies=None):
|
||||
self.api_key = api_key
|
||||
self.proxies = proxies
|
||||
self.base = 'https://www.virustotal.com/intelligence/'
|
||||
|
||||
def get_hashes_from_search(self, query, page=None):
|
||||
""" Get the scan results for a file.
|
||||
|
||||
Even if you do not have a Private Mass API key that you can use, you can still automate VirusTotal Intelligence
|
||||
searches pretty much in the same way that the searching for files api call works.
|
||||
|
||||
:param query: a VirusTotal Intelligence search string in accordance with the file search documentation .
|
||||
<https://www.virustotal.com/intelligence/help/file-search/>
|
||||
:param page: the next_page property of the results of a previously issued query to this API. This parameter
|
||||
should not be provided if it is the very first query to the API, i.e. if we are retrieving the
|
||||
first page of results.
|
||||
apikey: the API key associated to a VirusTotal Community account with VirusTotal Intelligence privileges.
|
||||
"""
|
||||
params = {'query': query, 'apikey': self.api_key, 'page': page}
|
||||
|
||||
try:
|
||||
response = requests.get(self.base + 'search/programmatic/', params=params, proxies=self.proxies)
|
||||
except requests.RequestException as e:
|
||||
return dict(error=e.message)
|
||||
|
||||
return response.json()['next_page'], response
|
||||
|
||||
def get_file(self, file_hash, save_file_at):
|
||||
""" Get the scan results for a file.
|
||||
|
||||
Even if you do not have a Private Mass API key that you can use, you can still download files from the
|
||||
VirusTotal storage making use of your VirusTotal Intelligence quota, i.e. programmatic downloads will
|
||||
also deduct quota.
|
||||
|
||||
:param file_hash: You may use either the md5, sha1 or sha256 hash of the file in order to download it.
|
||||
:param save_file_at: Path of where to save the file.
|
||||
"""
|
||||
params = {'hash': file_hash, 'apikey': self.api_key}
|
||||
|
||||
try:
|
||||
response = requests.get(self.base + 'download/', params=params, proxies=self.proxies, stream=True)
|
||||
except requests.RequestException as e:
|
||||
return dict(error=e.message)
|
||||
|
||||
if response.status_code == requests.codes.ok:
|
||||
self.save_downloaded_file(file_hash, save_file_at, response.content)
|
||||
return response.content
|
||||
elif response.status_code == 403:
|
||||
return dict(error='You tried to perform calls to functions for which you require a Private API key.',
|
||||
response_code=response.status_code)
|
||||
elif response.status_code == 404:
|
||||
return dict(error='File not found.', response_code=response.status_code)
|
||||
else:
|
||||
return dict(response_code=response.status_code)
|
||||
|
||||
def get_all_file_report_pages(self, query):
|
||||
""" Get File Report (All Pages).
|
||||
|
||||
:param query: a VirusTotal Intelligence search string in accordance with the file search documentation.
|
||||
:return: All JSON responses appended together.
|
||||
"""
|
||||
responses = []
|
||||
next_page, response = self.get_hashes_from_search(self, query)
|
||||
responses.append(_return_response_and_status_code(response))
|
||||
while next_page:
|
||||
next_page, response = self.get_hashes_from_search(query, next_page)
|
||||
responses.append(_return_response_and_status_code(response))
|
||||
return dict(results=responses)
|
||||
|
||||
@staticmethod
|
||||
def save_downloaded_file(filename, save_file_at, file_stream):
|
||||
""" Save Downloaded File to Disk Helper Function
|
||||
|
||||
:param save_file_at: Path of where to save the file.
|
||||
:param file_stream: File stream
|
||||
:param filename: Name to save the file.
|
||||
"""
|
||||
filename = os.path.join(save_file_at, filename)
|
||||
with open(filename, 'wb') as f:
|
||||
f.write(file_stream)
|
||||
f.flush()
|
||||
|
||||
|
||||
class ApiError(Exception):
|
||||
pass
|
||||
|
||||
|
||||
def _return_response_and_status_code(response):
|
||||
""" Output the requests response JSON and status code
|
||||
|
||||
:rtype : dict
|
||||
:param response: requests response object
|
||||
:return: dict containing the JSON response and/or the status code with error string.
|
||||
"""
|
||||
if response.status_code == requests.codes.ok:
|
||||
return dict(results=response.json(), response_code=response.status_code)
|
||||
elif response.status_code == 204:
|
||||
return dict(error='You exceeded the public API request rate limit (4 requests of any nature per minute)',
|
||||
response_code=response.status_code)
|
||||
elif response.status_code == 403:
|
||||
return dict(error='You tried to perform calls to functions for which you require a Private API key.',
|
||||
response_code=response.status_code)
|
||||
else:
|
||||
return dict(response_code=response.status_code)
|
Loading…
Reference in New Issue
Block a user