mirror of
https://github.com/valitydev/SigmaHQ.git
synced 2024-11-06 17:35:19 +00:00
2cbefb208b
Legacy __print__ statements are syntax errors in Python 3 but __print()__ function works as expected in both Python 2 and Python 3. [flake8](http://flake8.pycqa.org) testing of https://github.com/Neo23x0/sigma on Python 3.7.1 $ __flake8 . --count --select=E9,F63,F72,F82 --show-source --statistics__ ``` ./contrib/sigma2sumologic.py:123:5: F821 undefined name 'parser_print_help' parser_print_help() ^ ./contrib/sigma2sumologic.py:211:32: F821 undefined name 'r' f.write(json.dumps(r, indent=4, sort_keys=True) + " ERROR: %s\n\nQUERY: %s" % (e, sumo_query)) ^ ./contrib/sigma2elastalert.py:165:32: E999 SyntaxError: invalid syntax print "Converting file " + file ^ ./tools/sigma/parser/collection.py:52:27: F821 undefined name 'SigmaCollectionParseError' raise SigmaCollectionParseError("action 'repeat' is only applicable after first valid Sigma rule") ^ 1 E999 SyntaxError: invalid syntax 3 F821 undefined name 'parser_print_help' 4 ``` __E901,E999,F821,F822,F823__ are the "_showstopper_" [flake8](http://flake8.pycqa.org) issues that can halt the runtime with a SyntaxError, NameError, etc. These 5 are different from most other flake8 issues which are merely "style violations" -- useful for readability but they do not effect runtime safety. * F821: undefined name `name` * F822: undefined name `name` in `__all__` * F823: local variable name referenced before assignment * E901: SyntaxError or IndentationError * E999: SyntaxError -- failed to compile a file into an Abstract Syntax Tree
174 lines
5.9 KiB
Python
Executable File
174 lines
5.9 KiB
Python
Executable File
#!/usr/bin/python
|
|
# Copyright 2018 David Routin
|
|
|
|
# This program is free software: you can redistribute it and/or modify
|
|
# it under the terms of the GNU Lesser General Public License as published by
|
|
# the Free Software Foundation, either version 3 of the License, or
|
|
# (at your option) any later version.
|
|
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU Lesser General Public License for more details.
|
|
|
|
# You should have received a copy of the GNU Lesser General Public License
|
|
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
"""
|
|
Project: sigma2elastalert.py
|
|
Date: 25 Feb 2018
|
|
Author: David ROUTIN (@Rewt_1)
|
|
Version: 1.0
|
|
Description: This script creates elastalert configuration files from Sigma SIEM rules.
|
|
"""
|
|
|
|
import re
|
|
import os
|
|
import glob
|
|
import subprocess
|
|
import argparse
|
|
import yaml
|
|
import traceback
|
|
|
|
parser = argparse.ArgumentParser()
|
|
parser.add_argument("--eshost", help="Elasticsearch host", type=str, required=True)
|
|
parser.add_argument("--esport", help="Elasticsearch port", type=str, required=True)
|
|
parser.add_argument("--ruledir", help="sigma rule directory path to convert", type=str, required=True)
|
|
parser.add_argument("--index", help="Elasticsearch index name egs: \"winlogbeat-*\"", type=str, required=True)
|
|
parser.add_argument("--email", help="email address to send mail alert", type=str, required=True)
|
|
parser.add_argument("--outdir", help="output directory to create elastalert rules", type=str, required=True)
|
|
parser.add_argument("--sigmac", help="Sigmac location", default="../tools/sigmac", type=str)
|
|
parser.add_argument("--realerttime", help="Realert time (optional value, default 5 minutes)", type=str, default=5)
|
|
parser.add_argument("--debug", help="Show debug output", type=bool, default=False)
|
|
args = parser.parse_args()
|
|
|
|
custom_query_keys = ["sensor", "Hostname", "EventID", "src_ip", "dst_ip"]
|
|
|
|
|
|
template="""es_host: ESHOST
|
|
es_port: ESPORT
|
|
name: "TITLE"
|
|
description: "DESCRIPTION"
|
|
index: INDEX
|
|
filter:
|
|
- query:
|
|
query_string:
|
|
query: 'QUERY'
|
|
realert:
|
|
minutes: MINUTES
|
|
query_key: UNIQKEYS
|
|
type: any
|
|
include: UNIQKEYS
|
|
alert:
|
|
- "email"
|
|
|
|
# (required, email specific)
|
|
# a list of email addresses to send alerts to
|
|
email:
|
|
- "EMAIL"
|
|
"""
|
|
|
|
def return_json_obj(x,custom_query_keys):
|
|
"""
|
|
Function used to filter all ES query object as unique value including predefined list from custom_query_keys
|
|
:param x: must contains ES query output
|
|
:param custom_query_keys: takes the list of predefined element to match in document
|
|
:return: a clean list (set) of all the query keys (EventID,TargetUserName...)
|
|
"""
|
|
# type: (str, list) -> list
|
|
y = x.replace(" ", "\n").split()
|
|
out = set()
|
|
for i in y:
|
|
out.update(re.findall("([a-zA-Z]+)\:", i))
|
|
|
|
for qk in custom_query_keys:
|
|
try:
|
|
out.remove(qk)
|
|
except:
|
|
pass
|
|
out = list(out)
|
|
count = 0
|
|
for qk in custom_query_keys:
|
|
count += 1
|
|
out.insert(count-1, qk)
|
|
return out
|
|
|
|
def rule_element(file_content, elements):
|
|
"""
|
|
Function used to get specific element from yaml document and return content
|
|
:type file_content: str
|
|
:type elements: list
|
|
:param file_content:
|
|
:param elements: list of elements of the yaml document to get "title", "description"
|
|
:return: the value of the key in the yaml document
|
|
"""
|
|
try:
|
|
yaml.safe_load(file_content.replace("---",""))
|
|
except:
|
|
raise Exception('Unsupported')
|
|
element_output = ""
|
|
for e in elements:
|
|
try:
|
|
element_output = yaml.safe_load(file_content.replace("---",""))[e]
|
|
except:
|
|
pass
|
|
if element_output is None:
|
|
return ""
|
|
return element_output
|
|
|
|
def get_rule_as_esqs(file):
|
|
"""
|
|
Function used to get Elastic query output from rule fome
|
|
:type file: str
|
|
:param file: rule filename
|
|
:return: string es query
|
|
"""
|
|
if not os.path.exists(args.sigmac):
|
|
print("Cannot find sigmac rule coverter at '%s', please set a correct location via '--sigmac'")
|
|
cmd = [args.sigmac, file, "--target", "es-qs"]
|
|
output = subprocess.Popen(cmd,stdout=subprocess.PIPE, stderr=subprocess.STDOUT).stdout.read()
|
|
if "unsupported" in output:
|
|
raise Exception('Unsupported output at this time')
|
|
output = output.split("\n")
|
|
# Remove empty string from \n
|
|
output = [a for a in output if a]
|
|
# Handle case of multiple queries returned
|
|
if len(output) > 1:
|
|
return " OR ".join(output)
|
|
return "".join(output)
|
|
|
|
# Dictionary that contains args set at launch time
|
|
convert_args = {
|
|
"ESHOST": args.eshost,
|
|
"ESPORT": args.esport,
|
|
"INDEX": args.index,
|
|
"EMAIL": args.email,
|
|
"MINUTES": args.realerttime
|
|
}
|
|
|
|
for file in glob.glob(args.ruledir + "/*"):
|
|
output_elast_config = template
|
|
try:
|
|
print("Processing %s ..." % file)
|
|
with open(file, "rb") as f:
|
|
file_content = f.read()
|
|
|
|
# Dictionary that contains args with values returned by functions
|
|
translate_func = {'QUERY': get_rule_as_esqs(file),
|
|
'TITLE': rule_element(file_content, ["title", "name"]),
|
|
'DESCRIPTION': rule_element(file_content, ["description"]),
|
|
'UNIQKEYS': str(return_json_obj(get_rule_as_esqs(file), custom_query_keys))
|
|
}
|
|
for entry in convert_args:
|
|
output_elast_config = re.sub(entry, str(convert_args[entry]), output_elast_config)
|
|
for entry in translate_func:
|
|
output_elast_config = re.sub(entry, translate_func[entry], output_elast_config)
|
|
print("Converting file " + file)
|
|
with open(os.path.join(args.outdir, "sigma-" + file.split("/")[-1]), "w") as f:
|
|
f.write(output_elast_config)
|
|
except Exception as e:
|
|
if args.debug:
|
|
traceback.print_exc()
|
|
print("error " + str(file) + "----" + str(e))
|
|
pass
|
|
|