Added some phython and bash examples (#16657)

Scripts in Bash and Python that some others might find helpful, or
assist when thinking about vulnerability API/pages
This commit is contained in:
Grant Bilstad 2024-02-15 17:23:41 -06:00 committed by GitHub
parent 96bd31dc1f
commit 3c20cce575
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -68,3 +68,170 @@ export FLEET_ENV_PATH=./path/to/env/file/fleet_env
# run a live queries on hosts (queries with id=1 and id=2 on hosts with id=3 and id=4)
./tools/api/fleet/queries/run "[1,2]" "[3,4]"
```
Bash Script - Pulls all hosts based on software _name_ for your Fleet instance, uses jq. Helps if wanting to track down a particular software and see what hosts might be affected.
`./name.sh api_token software_title_id base_url`
```
#!/bin/bash
# Check if we have the correct number of arguments
if [ "$#" -ne 3 ]; then
echo "Usage: $0 <api_token> <software_title_id> <base_url>"
exit 1
fi
# Read arguments
API_TOKEN="$1"
SOFTWARE_TITLE_ID="$2"
BASE_URL="$3"
# Get the version IDs for the software title
VERSION_IDS=$(curl -s "${BASE_URL}/software/titles/${SOFTWARE_TITLE_ID}" \
-H 'accept: application/json, text/plain, */*' \
-H "authorization: Bearer ${API_TOKEN}" \
--compressed | jq '.software_title.versions[].id')
# Define a jq filter for deduplicating hosts by id
jq_filter='[.[] | {id: .id, hostname: .hostname}] | unique_by(.id)'
# Make a temporary file to hold all host entries
tmpfile=$(mktemp)
# Loop through each version ID and get the hosts
for version_id in $VERSION_IDS; do
# Fetch hosts for the current version ID
curl -s "${BASE_URL}/hosts?software_version_id=${version_id}" \
-H 'accept: application/json, text/plain, */*' \
-H "authorization: Bearer ${API_TOKEN}" \
--compressed | jq '.hosts[]' >> "$tmpfile"
done
# Deduplicate hosts by id and convert to a JSON array
jq -s "$jq_filter" "$tmpfile"
# Remove the temporary file
rm "$tmpfile"
```
Some quick Python to pull all Vuln software per host
Might be better to do this _backwards_ by host instead of by the software. Attempting to use parallel threading to make it run faster, only helps a little.
can adjust `structure` to display what info you want.
```
import requests
import time
import json
from concurrent.futures import ThreadPoolExecutor
# Define the base URL for the API
BASE_URL = "https://dogfood.fleetdm.com/api/latest" #change to your base url
# The headers for the HTTP requests, including the Authorization Bearer token
HEADERS = {
'Authorization': 'Bearer TOKEN', # Add your API token
'Content-Type': 'application/json'
}
# Initialize counters for API calls and hits
api_calls_counter = 0
hits_counter = 0
# Initialize a cache to store hosts for software versions
version_hosts_cache = {}
# Function to get all the software titles with vulnerabilities
def get_all_vulnerable_software_titles():
global api_calls_counter
endpoint = (f"{BASE_URL}/fleet/software/titles?scope=software-titles&page=0&per_page=1000&order_direction=desc&order_key=hosts_count&vulnerable=true") #note that this is set to 1k to try and capture "all" but might need to adjust
response = requests.get(endpoint, headers=HEADERS)
api_calls_counter += 1
if response.status_code == 200:
data = response.json()
return data.get('software_titles', [])
else:
raise Exception(f"Error fetching software titles: {response.text}")
# Function to get the hosts for a software version with caching
def get_hosts_for_software_version(version_id):
global api_calls_counter
global hits_counter
# Check the cache first
if version_id in version_hosts_cache:
return version_hosts_cache[version_id]
# If not cached, make the request
endpoint = f"{BASE_URL}/fleet/hosts?software_version_id={version_id}"
response = requests.get(endpoint, headers=HEADERS)
api_calls_counter += 1
if response.status_code == 200:
hosts = response.json().get('hosts', [])
hits_counter += len(hosts)
# Cache the result
version_hosts_cache[version_id] = hosts
return hosts
else:
raise Exception(f"Error fetching hosts for software version {version_id}: {response.text}")
# Function to fetch hosts for all vulnerable software versions in parallel using threading
def fetch_hosts_in_parallel(software_versions):
with ThreadPoolExecutor(max_workers=10) as executor:
future_to_version_id = {executor.submit(get_hosts_for_software_version, v['id']): v['id'] for v in
software_versions}
for future in future_to_version_id:
future.result() # We wait for each call to complete here. The results are stored in the cache.
# Main function to build the desired structure
def build_structure():
global api_calls_counter
global hits_counter
api_calls_counter = 0
hits_counter = 0
software_titles = get_all_vulnerable_software_titles()
vulnerable_versions = [version for software in software_titles for version in software.get('versions', []) if
version.get('vulnerabilities')]
fetch_hosts_in_parallel(vulnerable_versions) # Fetch all hosts in parallel
structure = {}
for software in software_titles:
for version in software.get('versions', []):
if version.get('vulnerabilities'):
version_id = version['id']
hosts = version_hosts_cache.get(version_id, [])
for host in hosts:
host_id = host['id']
if host_id not in structure:
structure[host_id] = {
"hostname": host['hostname'],
"software": []
}
structure[host_id]['software'].append({
"version_id": str(version_id),
"software_id": str(software['id']),
"name": software['name']
})
return structure
# Run the main function and print results
if __name__ == "__main__":
start_time = time.time()
final_structure = build_structure()
print(json.dumps(final_structure, indent=2))
#print(f"Total time taken: {time.time() - start_time:.2f} seconds")
#print(f"API Calls: {api_calls_counter}")
#print(f"total number of software vulns: {hits_counter}")
```