mirror of
https://github.com/valitydev/SigmaHQ.git
synced 2024-11-06 09:25:17 +00:00
Merge branch 'script_entry_points'
This commit is contained in:
commit
24d83b80cd
4
Makefile
4
Makefile
@ -14,7 +14,7 @@ finish:
|
||||
test-rules:
|
||||
yamllint rules
|
||||
tests/test_rules.py
|
||||
tools/sigma-uuid -Ver rules/
|
||||
tools/sigma_uuid -Ver rules/
|
||||
|
||||
test-sigmac:
|
||||
$(COVERAGE) run -a --include=$(COVSCOPE) tools/sigmac
|
||||
@ -116,7 +116,7 @@ test-backend-sql:
|
||||
test-sigma2attack:
|
||||
$(COVERAGE) run -a --include=$(COVSCOPE) tools/sigma2attack
|
||||
|
||||
build: tools/sigmac tools/merge_sigma tools/sigma/*.py tools/setup.py tools/setup.cfg
|
||||
build: tools/sigma/*.py tools/setup.py tools/setup.cfg
|
||||
cd tools && python3 setup.py bdist_wheel sdist
|
||||
|
||||
upload-test: build
|
||||
|
@ -1,10 +1,10 @@
|
||||
#!/usr/bin/env python3
|
||||
# Remove all hunks from a patch that don't add the id attribute to minimize the impact (removed
|
||||
# comments etc.) of sigma-uuid script.
|
||||
# comments etc.) of sigma_uuid script.
|
||||
#
|
||||
# Usually used as follows:
|
||||
# 1. Add UUIDs to rules:
|
||||
# tools/sigma-uuid -er rules
|
||||
# tools/sigma_uuid -er rules
|
||||
# 2. Generate and filter patch
|
||||
# git diff | contrib/filter-uuid-patch > rule-uuid.diff
|
||||
# 3. Reset to previous state
|
||||
|
@ -1,38 +1,5 @@
|
||||
#!/usr/bin/env python3
|
||||
# Merge a Sigma rule collection into full Sigma rules
|
||||
# Copyright 2017 Thomas Patzke
|
||||
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU Lesser General Public License as published by
|
||||
# the Free Software Foundation, either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
from sigma.merge_sigma import main
|
||||
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU Lesser General Public License for more details.
|
||||
|
||||
# You should have received a copy of the GNU Lesser General Public License
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
import sys
|
||||
import argparse
|
||||
import yaml
|
||||
|
||||
from sigma.parser.collection import SigmaCollectionParser
|
||||
|
||||
argparser = argparse.ArgumentParser(description="Convert Sigma rules into SIEM signatures.")
|
||||
argparser.add_argument("input", help="Sigma input file")
|
||||
cmdargs = argparser.parse_args()
|
||||
|
||||
try:
|
||||
f = open(cmdargs.input, "r")
|
||||
except IOError as e:
|
||||
print("Error while opening input file: %s" % str(e), file=sys.stderr)
|
||||
sys.exit(1)
|
||||
|
||||
content = "".join(f.readlines())
|
||||
f.close()
|
||||
sc = SigmaCollectionParser(content)
|
||||
|
||||
print(yaml.dump_all(sc, default_flow_style=False))
|
||||
main()
|
||||
|
@ -13,7 +13,7 @@ with open(path.join(here, 'README.md'), encoding='utf-8') as f:
|
||||
|
||||
setup(
|
||||
name='sigmatools',
|
||||
version='0.16.0',
|
||||
version='0.17.0',
|
||||
description='Tools for the Generic Signature Format for SIEM Systems',
|
||||
long_description=long_description,
|
||||
long_description_content_type="text/markdown",
|
||||
@ -77,11 +77,14 @@ setup(
|
||||
'config/generic/sysmon.yml',
|
||||
'config/generic/windows-audit.yml',
|
||||
])],
|
||||
scripts=[
|
||||
'sigmac',
|
||||
'merge_sigma',
|
||||
'sigma2misp',
|
||||
'sigma-similarity',
|
||||
'sigma-uuid',
|
||||
]
|
||||
entry_points={
|
||||
'console_scripts': [
|
||||
'sigmac = sigma.sigmac:main',
|
||||
'merge_sigma = sigma.merge_sigma:main',
|
||||
'sigma2misp = sigma.sigma2misp:main',
|
||||
'sigma2attack = sigma.sigma2attack:main',
|
||||
'sigma_similarity = sigma.sigma_similarity:main',
|
||||
'sigma_uuid = sigma.sigma_uuid:main',
|
||||
],
|
||||
},
|
||||
)
|
||||
|
42
tools/sigma/merge_sigma.py
Executable file
42
tools/sigma/merge_sigma.py
Executable file
@ -0,0 +1,42 @@
|
||||
#!/usr/bin/env python3
|
||||
# Merge a Sigma rule collection into full Sigma rules
|
||||
# Copyright 2017 Thomas Patzke
|
||||
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU Lesser General Public License as published by
|
||||
# the Free Software Foundation, either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU Lesser General Public License for more details.
|
||||
|
||||
# You should have received a copy of the GNU Lesser General Public License
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
import sys
|
||||
import argparse
|
||||
import yaml
|
||||
|
||||
from sigma.parser.collection import SigmaCollectionParser
|
||||
|
||||
def main():
|
||||
argparser = argparse.ArgumentParser(description="Convert Sigma rules into SIEM signatures.")
|
||||
argparser.add_argument("input", help="Sigma input file")
|
||||
cmdargs = argparser.parse_args()
|
||||
|
||||
try:
|
||||
f = open(cmdargs.input, "r")
|
||||
except IOError as e:
|
||||
print("Error while opening input file: %s" % str(e), file=sys.stderr)
|
||||
sys.exit(1)
|
||||
|
||||
content = "".join(f.readlines())
|
||||
f.close()
|
||||
sc = SigmaCollectionParser(content)
|
||||
|
||||
print(yaml.dump_all(sc, default_flow_style=False))
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
73
tools/sigma/sigma2attack.py
Executable file
73
tools/sigma/sigma2attack.py
Executable file
@ -0,0 +1,73 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
import argparse
|
||||
import glob
|
||||
import json
|
||||
import os
|
||||
import sys
|
||||
|
||||
import yaml
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
|
||||
parser.add_argument("--rules-directory", "-d", dest="rules_dir", default="rules", help="Directory to read rules from")
|
||||
parser.add_argument("--out-file", "-o", dest="out_file", default="heatmap.json", help="File to write the JSON layer to")
|
||||
parser.add_argument("--no-comment", dest="no_comment", action="store_true", help="Don't store rule names in comments")
|
||||
args = parser.parse_args()
|
||||
|
||||
rule_files = glob.glob(os.path.join(args.rules_dir, "**/*.yml"), recursive=True)
|
||||
techniques_to_rules = {}
|
||||
curr_max_technique_count = 0
|
||||
num_rules_used = 0
|
||||
for rule_file in rule_files:
|
||||
try:
|
||||
rule = yaml.safe_load(open(rule_file).read())
|
||||
except yaml.YAMLError:
|
||||
sys.stderr.write("Ignoring rule " + rule_file + " (parsing failed)\n")
|
||||
continue
|
||||
if "tags" not in rule:
|
||||
sys.stderr.write("Ignoring rule " + rule_file + " (no tags)\n")
|
||||
continue
|
||||
tags = rule["tags"]
|
||||
for tag in tags:
|
||||
if tag.lower().startswith("attack.t"):
|
||||
technique_id = tag[len("attack."):].upper()
|
||||
num_rules_used += 1
|
||||
if technique_id not in techniques_to_rules:
|
||||
techniques_to_rules[technique_id] = []
|
||||
techniques_to_rules[technique_id].append(os.path.basename(rule_file))
|
||||
curr_max_technique_count = max(curr_max_technique_count, len(techniques_to_rules[technique_id]))
|
||||
|
||||
|
||||
scores = []
|
||||
for technique in techniques_to_rules:
|
||||
entry = {
|
||||
"techniqueID": technique,
|
||||
"score": len(techniques_to_rules[technique]),
|
||||
}
|
||||
if not args.no_comment:
|
||||
entry["comment"] = "\n".join(techniques_to_rules[technique])
|
||||
|
||||
scores.append(entry)
|
||||
|
||||
output = {
|
||||
"domain": "mitre-enterprise",
|
||||
"name": "Sigma rules heatmap",
|
||||
"gradient": {
|
||||
"colors": [
|
||||
"#ffffff",
|
||||
"#ff6666"
|
||||
],
|
||||
"maxValue": curr_max_technique_count,
|
||||
"minValue": 0
|
||||
},
|
||||
"version": "2.2",
|
||||
"techniques": scores,
|
||||
}
|
||||
|
||||
with open(args.out_file, "w") as f:
|
||||
f.write(json.dumps(output))
|
||||
print("[*] Layer file written in " + args.out_file + " (" + str(num_rules_used) + " rules)")
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
73
tools/sigma/sigma2misp.py
Executable file
73
tools/sigma/sigma2misp.py
Executable file
@ -0,0 +1,73 @@
|
||||
#!/usr/bin/env python3
|
||||
# Import given Sigma rules to MISP
|
||||
|
||||
import argparse
|
||||
import pathlib
|
||||
import urllib3
|
||||
urllib3.disable_warnings()
|
||||
from pymisp import PyMISP
|
||||
|
||||
def create_new_event():
|
||||
if hasattr(misp, "new_event"):
|
||||
return misp.new_event(info=args.info)["Event"]["id"]
|
||||
|
||||
event = misp.MISPEvent()
|
||||
event.info = args.info
|
||||
return misp.add_event(event)["Event"]["id"]
|
||||
|
||||
|
||||
class MISPImportArgumentParser(argparse.ArgumentParser):
|
||||
def __init__(self, *args, **kwargs):
|
||||
super().__init__(
|
||||
description="Import Sigma rules into MISP events",
|
||||
epilog="Parameters can be read from a file by a @filename parameter. The file should contain one parameter per line. Dashes may be omitted.",
|
||||
fromfile_prefix_chars="@",
|
||||
)
|
||||
|
||||
def convert_arg_line_to_args(self, line : str):
|
||||
return ("--" + line.lstrip("--")).split()
|
||||
|
||||
def main():
|
||||
argparser = MISPImportArgumentParser()
|
||||
argparser.add_argument("--url", "-u", default="https://localhost", help="URL of MISP instance")
|
||||
argparser.add_argument("--key", "-k", required=True, help="API key")
|
||||
argparser.add_argument("--insecure", "-I", action="store_false", help="Disable TLS certifcate validation.")
|
||||
argparser.add_argument("--event", "-e", type=int, help="Add Sigma rule to event with this ID. If not set, create new event.")
|
||||
argparser.add_argument("--same-event", "-s", action="store_true", help="Import all Sigma rules to the same event, if no event is set.")
|
||||
argparser.add_argument("--info", "-i", default="Sigma import", help="Event Information field for newly created MISP event.")
|
||||
argparser.add_argument("--recursive", "-r", action="store_true", help="Recursive traversal of directory")
|
||||
argparser.add_argument("sigma", nargs="+", help="Sigma rule file that should be imported")
|
||||
args = argparser.parse_args()
|
||||
|
||||
if args.recursive:
|
||||
paths = [ p for pathname in args.sigma for p in pathlib.Path(pathname).glob("**/*") if p.is_file() ]
|
||||
else:
|
||||
paths = [ pathlib.Path(sigma) for sigma in args.sigma ]
|
||||
|
||||
misp = PyMISP(args.url, args.key, args.insecure)
|
||||
if args.event:
|
||||
if hasattr(misp, "get"):
|
||||
eventid = misp.get(args.event)["Event"]["id"]
|
||||
else:
|
||||
eventid = misp.get_event(args.event)["Event"]["id"]
|
||||
|
||||
first = True
|
||||
|
||||
for sigma in paths:
|
||||
if not args.event and (first or not args.same_event):
|
||||
eventid = create_new_event()
|
||||
print("Importing Sigma rule {} into MISP event {}...".format(sigma, eventid, end=""))
|
||||
f = sigma.open("rt")
|
||||
|
||||
if hasattr(misp, "add_named_attribute"):
|
||||
misp.add_named_attribute(eventid, "sigma", f.read())
|
||||
else:
|
||||
event = misp.get_event(eventid, pythonify=True)
|
||||
event.add_attribute("sigma", f.read())
|
||||
misp.update_event(event)
|
||||
|
||||
f.close()
|
||||
first = False
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
105
tools/sigma/sigma_similarity.py
Executable file
105
tools/sigma/sigma_similarity.py
Executable file
@ -0,0 +1,105 @@
|
||||
#!/usr/bin/env python3
|
||||
# Calculates similarity of Sigma rules by transformation into a normalized
|
||||
# string form and calculation of a string distance.
|
||||
|
||||
import argparse
|
||||
import pathlib
|
||||
import itertools
|
||||
import difflib
|
||||
|
||||
import progressbar
|
||||
|
||||
from sigma.parser.collection import SigmaCollectionParser
|
||||
from sigma.backends.base import SingleTextQueryBackend
|
||||
from sigma.configuration import SigmaConfiguration
|
||||
|
||||
argparser = argparse.ArgumentParser(description="Calculate a similarity score between Sigma rules.")
|
||||
argparser.add_argument("--recursive", "-r", action="store_true", help="Recurse into directories")
|
||||
argparser.add_argument("--verbose", "-v", action="count", help="Be verbose. Use once more for debug output.")
|
||||
argparser.add_argument("--top", "-t", type=int, help="Only output the n most similar rule pairs.")
|
||||
argparser.add_argument("--min-similarity", "-m", type=int, help="Only output pairs with a similarity above this threshold (percent)")
|
||||
argparser.add_argument("--primary", "-p", help="File with list of paths to primary rules. If given, only rule combinations with at leat one primary rule are compared. Primary rules must also be contained in input rule set.")
|
||||
argparser.add_argument("inputs", nargs="+", help="Sigma input files")
|
||||
args = argparser.parse_args()
|
||||
|
||||
def print_verbose(level, *args, **kwargs):
|
||||
if args.verbose >= level:
|
||||
print(*args, **kwargs)
|
||||
|
||||
class SigmaNormalizationBackend(SingleTextQueryBackend):
|
||||
"""Normalization of a Sigma rule into a non-existing query language that supports all Sigma features"""
|
||||
andToken = " AND "
|
||||
orToken = " OR "
|
||||
notToken = " NOT "
|
||||
subExpression = "(%s)"
|
||||
listExpression = "[%s]"
|
||||
listSeparator = ","
|
||||
valueExpression = "%s"
|
||||
typedValueExpression = dict()
|
||||
nullExpression = "NULL(%s)"
|
||||
notNullExpression = "NOTNULL(%s)"
|
||||
mapExpression = "{'%s':'%s'}"
|
||||
|
||||
sort_condition_lists = True
|
||||
|
||||
def generateListNode(self, node):
|
||||
"""Return sorted list"""
|
||||
return super().generateListNode(list(sorted([ str(item) for item in node ])))
|
||||
|
||||
def generateTypedValueNode(self, node):
|
||||
"""Return normalized form of typed values"""
|
||||
return "type_{}({})".format(node.identifier, str(node))
|
||||
|
||||
def generateAggregation(self, agg):
|
||||
if agg.aggfunc_notrans == "near":
|
||||
return " near in={} ex={}".format(str(agg.include), str(agg.exclude))
|
||||
else:
|
||||
return " | {}({}) by {} {} {}".format(agg.aggfunc_notrans, agg.aggfield, agg.groupfield, agg.cond_op, agg.condition)
|
||||
|
||||
def main():
|
||||
backend = SigmaNormalizationBackend(SigmaConfiguration())
|
||||
|
||||
if args.recursive:
|
||||
paths = [ p for pathname in args.inputs for p in pathlib.Path(pathname).glob("**/*") if p.is_file() ]
|
||||
else:
|
||||
paths = [ pathlib.Path(pathname) for pathname in args.inputs ]
|
||||
|
||||
primary_paths = None
|
||||
if args.primary:
|
||||
with open(args.primary, "r") as f:
|
||||
primary_paths = { pathname.strip() for pathname in f.readlines() }
|
||||
|
||||
parsed = {
|
||||
str(path): SigmaCollectionParser(path.open().read())
|
||||
for path in paths
|
||||
}
|
||||
converted = {
|
||||
str(path): list(sigma_collection.generate(backend))
|
||||
for path, sigma_collection in parsed.items()
|
||||
}
|
||||
converted_flat = (
|
||||
(path, i, normalized)
|
||||
for path, nlist in converted.items()
|
||||
for i, normalized in zip(range(len(nlist)), nlist)
|
||||
)
|
||||
converted_pairs_iter = itertools.combinations(converted_flat, 2)
|
||||
if primary_paths:
|
||||
converted_pairs = [ pair for pair in converted_pairs_iter if pair[0][0] in primary_paths or pair[1][0] in paths ]
|
||||
else:
|
||||
converted_pairs = list(converted_pairs_iter)
|
||||
similarities = [
|
||||
(item1[:2], item2[:2], difflib.SequenceMatcher(None, item1[2], item2[2]).ratio())
|
||||
for item1, item2 in progressbar.progressbar(converted_pairs)
|
||||
]
|
||||
|
||||
i = 0
|
||||
for similarity in sorted(similarities, key=lambda s: s[2], reverse=True):
|
||||
if args.min_similarity and similarity[2] * 100 < args.min_similarity: # finish after similarity drops below minimum
|
||||
break
|
||||
print("{:70} | {:2} | {:70} | {:2} | {:>3.2%}".format(*similarity[0], *similarity[1], similarity[2]))
|
||||
i += 1
|
||||
if args.top and i >= args.top: # end after $top pairs
|
||||
break
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
89
tools/sigma/sigma_uuid.py
Executable file
89
tools/sigma/sigma_uuid.py
Executable file
@ -0,0 +1,89 @@
|
||||
#!/usr/bin/env python3
|
||||
# Assign UUIDs to Sigma rules and verify UUID assignment for a Sigma rule repository
|
||||
|
||||
from argparse import ArgumentParser
|
||||
from pathlib import Path
|
||||
from uuid import uuid4, UUID
|
||||
import yaml
|
||||
from sigma.output import SigmaYAMLDumper
|
||||
|
||||
def print_verbose(*arg, **kwarg):
|
||||
print(*arg, **kwarg)
|
||||
|
||||
# Define order-preserving representer from dicts/maps
|
||||
def yaml_preserve_order(self, dict_data):
|
||||
return self.represent_mapping("tag:yaml.org,2002:map", dict_data.items())
|
||||
|
||||
def main():
|
||||
argparser = ArgumentParser(description="Assign and verfify UUIDs of Sigma rules")
|
||||
argparser.add_argument("--verify", "-V", action="store_true", help="Verify existence and uniqueness of UUID assignments. Exits with error code if verification fails.")
|
||||
argparser.add_argument("--verbose", "-v", action="store_true", help="Be verbose.")
|
||||
argparser.add_argument("--recursive", "-r", action="store_true", help="Recurse into directories.")
|
||||
argparser.add_argument("--error", "-e", action="store_true", help="Exit with error code 10 on verification failures.")
|
||||
argparser.add_argument("inputs", nargs="+", help="Sigma rule files or repository directories")
|
||||
args = argparser.parse_args()
|
||||
|
||||
if args.verbose:
|
||||
print_verbose()
|
||||
|
||||
if args.recursive:
|
||||
paths = [ p for pathname in args.inputs for p in Path(pathname).glob("**/*") if p.is_file() ]
|
||||
else:
|
||||
paths = [ Path(pathname) for pathname in args.inputs ]
|
||||
|
||||
yaml.add_representer(dict, yaml_preserve_order)
|
||||
|
||||
uuids = set()
|
||||
passed = True
|
||||
for path in paths:
|
||||
print_verbose("Rule {}".format(str(path)))
|
||||
with path.open("r") as f:
|
||||
rules = list(yaml.safe_load_all(f))
|
||||
|
||||
if args.verify:
|
||||
i = 1
|
||||
for rule in rules:
|
||||
if "title" in rule: # Rule with a title should also have a UUID
|
||||
try:
|
||||
UUID(rule["id"])
|
||||
except ValueError: # id is not a valid UUID
|
||||
print("Rule {} in file {} has a malformed UUID '{}'.".format(i, str(path), rule["id"]))
|
||||
passed = False
|
||||
except KeyError: # rule has no id
|
||||
print("Rule {} in file {} has no UUID.".format(i, str(path)))
|
||||
passed = False
|
||||
i += 1
|
||||
else:
|
||||
newrules = list()
|
||||
changed = False
|
||||
i = 1
|
||||
for rule in rules:
|
||||
if "title" in rule and "id" not in rule: # only assign id to rules that have a title and no id
|
||||
newrule = dict()
|
||||
changed = True
|
||||
for k, v in rule.items():
|
||||
newrule[k] = v
|
||||
if k == "title": # insert id after title
|
||||
uuid = uuid4()
|
||||
newrule["id"] = str(uuid)
|
||||
print("Assigned UUID '{}' to rule {} in file {}.".format(uuid, i, str(path)))
|
||||
newrules.append(newrule)
|
||||
else:
|
||||
newrules.append(rule)
|
||||
i += 1
|
||||
|
||||
if changed:
|
||||
with path.open("w") as f:
|
||||
yaml.dump_all(newrules, f, Dumper=SigmaYAMLDumper, indent=4, width=160, default_flow_style=False)
|
||||
|
||||
if not passed:
|
||||
print("The Sigma rules listed above don't have an ID. The ID must be:")
|
||||
print("* Contained in the 'id' attribute")
|
||||
print("* a valid UUIDv4 (randomly generated)")
|
||||
print("* Unique in this repository")
|
||||
print("Please generate one with the sigma_uuid tool or here: https://www.uuidgenerator.net/version4")
|
||||
if args.error:
|
||||
exit(10)
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
297
tools/sigma/sigmac.py
Executable file
297
tools/sigma/sigmac.py
Executable file
@ -0,0 +1,297 @@
|
||||
#!/usr/bin/env python3
|
||||
# A Sigma to SIEM converter
|
||||
# Copyright 2016-2017 Thomas Patzke, Florian Roth
|
||||
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU Lesser General Public License as published by
|
||||
# the Free Software Foundation, either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU Lesser General Public License for more details.
|
||||
|
||||
# You should have received a copy of the GNU Lesser General Public License
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
import sys
|
||||
import argparse
|
||||
import yaml
|
||||
import json
|
||||
import pathlib
|
||||
import itertools
|
||||
import logging
|
||||
from sigma.parser.collection import SigmaCollectionParser
|
||||
from sigma.parser.exceptions import SigmaCollectionParseError, SigmaParseError
|
||||
from sigma.configuration import SigmaConfiguration, SigmaConfigurationChain
|
||||
from sigma.config.collection import SigmaConfigurationManager
|
||||
from sigma.config.exceptions import SigmaConfigParseError, SigmaRuleFilterParseException
|
||||
from sigma.filter import SigmaRuleFilter
|
||||
import sigma.backends.discovery as backends
|
||||
from sigma.backends.base import BackendOptions
|
||||
from sigma.backends.exceptions import BackendError, NotSupportedError, PartialMatchError, FullMatchError
|
||||
from sigma.parser.modifiers import modifiers
|
||||
import codecs
|
||||
|
||||
sys.stdout = codecs.getwriter('utf-8')(sys.stdout.detach())
|
||||
|
||||
# Error codes
|
||||
|
||||
ERR_OUTPUT = 1
|
||||
ERR_INVALID_YAML = 3
|
||||
ERR_SIGMA_PARSING = 4
|
||||
ERR_OPEN_SIGMA_RULE = 5
|
||||
ERR_OPEN_CONFIG_FILE = 5
|
||||
ERR_CONFIG_INVALID_YAML = 6
|
||||
ERR_CONFIG_PARSING = 6
|
||||
ERR_BACKEND = 8
|
||||
ERR_NOT_SUPPORTED = 9
|
||||
ERR_NO_TARGET = 10
|
||||
ERR_RULE_FILTER_PARSING = 11
|
||||
ERR_CONFIG_REQUIRED = 20
|
||||
ERR_CONFIG_ORDER = 21
|
||||
ERR_CONFIG_BACKEND = 22
|
||||
ERR_NOT_IMPLEMENTED = 42
|
||||
ERR_PARTIAL_FIELD_MATCH = 80
|
||||
ERR_FULL_FIELD_MATCH = 90
|
||||
|
||||
def alliter(path):
|
||||
for sub in path.iterdir():
|
||||
if sub.name.startswith("."):
|
||||
continue
|
||||
if sub.is_dir():
|
||||
yield from alliter(sub)
|
||||
else:
|
||||
yield sub
|
||||
|
||||
def get_inputs(paths, recursive):
|
||||
if paths == ['-']:
|
||||
return [sys.stdin]
|
||||
|
||||
if recursive:
|
||||
return list(itertools.chain.from_iterable([list(alliter(pathlib.Path(p))) for p in paths]))
|
||||
else:
|
||||
return [pathlib.Path(p) for p in paths]
|
||||
|
||||
class ActionBackendHelp(argparse.Action):
|
||||
def __call__(self, parser, ns, vals, opt):
|
||||
backend = backends.getBackend(vals)
|
||||
if len(backend.options) > 0:
|
||||
helptext = "Backend options for " + backend.identifier + "\n"
|
||||
for option, default, help, _ in backend.options:
|
||||
helptext += " {:10}: {} (default: {})".format(option, help, default) + "\n"
|
||||
|
||||
print(helptext)
|
||||
exit(0)
|
||||
|
||||
def set_argparser():
|
||||
"""Sets up and parses the command line arguments for Sigmac.
|
||||
Returns the argparser"""
|
||||
argparser = argparse.ArgumentParser(description="Convert Sigma rules into SIEM signatures.")
|
||||
argparser.add_argument("--recurse", "-r", action="store_true", help="Use directory as input (recurse into subdirectories is not implemented yet)")
|
||||
argparser.add_argument("--filter", "-f", help="""
|
||||
Define comma-separated filters that must match (AND-linked) to rule to be processed.
|
||||
Valid filters: level<=x, level>=x, level=x, status=y, logsource=z, tag=t.
|
||||
x is one of: low, medium, high, critical.
|
||||
y is one of: experimental, testing, stable.
|
||||
z is a word appearing in an arbitrary log source attribute.
|
||||
t is a tag that must appear in the rules tag list, case-insensitive matching.
|
||||
Multiple log source specifications are AND linked.
|
||||
""")
|
||||
argparser.add_argument("--target", "-t", choices=backends.getBackendDict().keys(), help="Output target format")
|
||||
argparser.add_argument("--lists", "-l", action="store_true", help="List available output target formats and configurations")
|
||||
argparser.add_argument("--config", "-c", action="append", help="Configurations with field name and index mapping for target environment. Multiple configurations are merged into one. Last config is authorative in case of conflicts.")
|
||||
argparser.add_argument("--output", "-o", default=None, help="Output file or filename prefix if multiple files are generated")
|
||||
argparser.add_argument("--backend-option", "-O", action="append", help="Options and switches that are passed to the backend")
|
||||
argparser.add_argument("--backend-config", "-C", help="Configuration file (YAML format) containing options to pass to the backend")
|
||||
argparser.add_argument("--backend-help", action=ActionBackendHelp, help="Print backend options")
|
||||
argparser.add_argument("--defer-abort", "-d", action="store_true", help="Don't abort on parse or conversion errors, proceed with next rule. The exit code from the last error is returned")
|
||||
argparser.add_argument("--ignore-backend-errors", "-I", action="store_true", help="Only return error codes for parse errors and ignore errors for rules that cause backend errors. Useful, when you want to get as much queries as possible.")
|
||||
argparser.add_argument("--shoot-yourself-in-the-foot", action="store_true", help=argparse.SUPPRESS)
|
||||
argparser.add_argument("--verbose", "-v", action="store_true", help="Be verbose")
|
||||
argparser.add_argument("--debug", "-D", action="store_true", help="Debugging output")
|
||||
argparser.add_argument("inputs", nargs="*", help="Sigma input files ('-' for stdin)")
|
||||
|
||||
return argparser
|
||||
|
||||
def list_backends(debug):
|
||||
for backend in sorted(backends.getBackendList(), key=lambda backend: backend.identifier):
|
||||
if debug:
|
||||
print("{:>15} : {} ({})".format(backend.identifier, backend.__doc__, backend.__name__))
|
||||
else:
|
||||
print("{:>15} : {}".format(backend.identifier, backend.__doc__))
|
||||
|
||||
def list_configurations(backend=None, scm=None):
|
||||
for conf_id, title, backends in sorted(scm.list(), key=lambda config: config[0]):
|
||||
if backend is not None and backend in backends or backend is None or len(backends) == 0:
|
||||
print("{:>30} : {}".format(conf_id, title))
|
||||
|
||||
def list_modifiers(modifiers):
|
||||
for modifier_id, modifier in modifiers.items():
|
||||
print("{:>10} : {}".format(modifier_id, modifier.__doc__))
|
||||
|
||||
def main():
|
||||
argparser = set_argparser()
|
||||
cmdargs = argparser.parse_args()
|
||||
|
||||
scm = SigmaConfigurationManager()
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
if cmdargs.debug: # pragma: no cover
|
||||
logger.setLevel(logging.DEBUG)
|
||||
|
||||
if cmdargs.lists:
|
||||
print("Backends:")
|
||||
list_backends(cmdargs.debug)
|
||||
|
||||
print()
|
||||
print("Configurations:")
|
||||
list_configurations(backend=cmdargs.target, scm=scm)
|
||||
|
||||
print()
|
||||
print("Modifiers:")
|
||||
list_modifiers(modifiers=modifiers)
|
||||
sys.exit(0)
|
||||
elif len(cmdargs.inputs) == 0:
|
||||
print("Nothing to do!")
|
||||
argparser.print_usage()
|
||||
sys.exit(0)
|
||||
|
||||
if cmdargs.target is None:
|
||||
print("No target selected, select one with -t/--target")
|
||||
argparser.print_usage()
|
||||
sys.exit(ERR_NO_TARGET)
|
||||
|
||||
rulefilter = None
|
||||
if cmdargs.filter:
|
||||
try:
|
||||
rulefilter = SigmaRuleFilter(cmdargs.filter)
|
||||
except SigmaRuleFilterParseException as e:
|
||||
print("Parse error in Sigma rule filter expression: %s" % str(e), file=sys.stderr)
|
||||
sys.exit(ERR_RULE_FILTER_PARSING)
|
||||
|
||||
sigmaconfigs = SigmaConfigurationChain()
|
||||
backend_class = backends.getBackend(cmdargs.target)
|
||||
if cmdargs.config is None:
|
||||
if backend_class.config_required and not cmdargs.shoot_yourself_in_the_foot:
|
||||
print("The backend you want to use usually requires a configuration to generate valid results. Please provide one with --config/-c.", file=sys.stderr)
|
||||
print("Available choices for this backend (get complete list with --lists/-l):")
|
||||
list_configurations(backend=cmdargs.target, scm=scm)
|
||||
sys.exit(ERR_CONFIG_REQUIRED)
|
||||
if backend_class.default_config is not None:
|
||||
cmdargs.config = backend_class.default_config
|
||||
|
||||
if cmdargs.config:
|
||||
order = 0
|
||||
for conf_name in cmdargs.config:
|
||||
try:
|
||||
sigmaconfig = scm.get(conf_name)
|
||||
if sigmaconfig.order is not None:
|
||||
if sigmaconfig.order <= order and not cmdargs.shoot_yourself_in_the_foot:
|
||||
print("The configurations were provided in the wrong order (order key check in config file)", file=sys.stderr)
|
||||
sys.exit(ERR_CONFIG_ORDER)
|
||||
order = sigmaconfig.order
|
||||
|
||||
try:
|
||||
if cmdargs.target not in sigmaconfig.config["backends"]:
|
||||
print("The configuration '{}' is not valid for backend '{}'. Valid choices are: {}".format(conf_name, cmdargs.target, ", ".join(sigmaconfig.config["backends"])), file=sys.stderr)
|
||||
sys.exit(ERR_CONFIG_ORDER)
|
||||
except KeyError:
|
||||
pass
|
||||
|
||||
sigmaconfigs.append(sigmaconfig)
|
||||
except OSError as e:
|
||||
print("Failed to open Sigma configuration file %s: %s" % (conf_name, str(e)), file=sys.stderr)
|
||||
exit(ERR_OPEN_CONFIG_FILE)
|
||||
except (yaml.parser.ParserError, yaml.scanner.ScannerError) as e:
|
||||
print("Sigma configuration file %s is no valid YAML: %s" % (conf_name, str(e)), file=sys.stderr)
|
||||
exit(ERR_CONFIG_INVALID_YAML)
|
||||
except SigmaConfigParseError as e:
|
||||
print("Sigma configuration parse error in %s: %s" % (conf_name, str(e)), file=sys.stderr)
|
||||
exit(ERR_CONFIG_PARSING)
|
||||
|
||||
backend_options = BackendOptions(cmdargs.backend_option, cmdargs.backend_config)
|
||||
backend = backend_class(sigmaconfigs, backend_options)
|
||||
|
||||
filename = cmdargs.output
|
||||
if filename:
|
||||
try:
|
||||
out = open(filename, "w", encoding='utf-8')
|
||||
except (IOError, OSError) as e:
|
||||
print("Failed to open output file '%s': %s" % (filename, str(e)), file=sys.stderr)
|
||||
exit(ERR_OUTPUT)
|
||||
else:
|
||||
out = sys.stdout
|
||||
|
||||
error = 0
|
||||
for sigmafile in get_inputs(cmdargs.inputs, cmdargs.recurse):
|
||||
logger.debug("* Processing Sigma input %s" % (sigmafile))
|
||||
try:
|
||||
if cmdargs.inputs == ['-']:
|
||||
f = sigmafile
|
||||
else:
|
||||
f = sigmafile.open(encoding='utf-8')
|
||||
parser = SigmaCollectionParser(f, sigmaconfigs, rulefilter)
|
||||
results = parser.generate(backend)
|
||||
for result in results:
|
||||
print(result, file=out)
|
||||
except OSError as e:
|
||||
print("Failed to open Sigma file %s: %s" % (sigmafile, str(e)), file=sys.stderr)
|
||||
error = ERR_OPEN_SIGMA_RULE
|
||||
except (yaml.parser.ParserError, yaml.scanner.ScannerError) as e:
|
||||
print("Sigma file %s is no valid YAML: %s" % (sigmafile, str(e)), file=sys.stderr)
|
||||
error = ERR_INVALID_YAML
|
||||
if not cmdargs.defer_abort:
|
||||
sys.exit(error)
|
||||
except (SigmaParseError, SigmaCollectionParseError) as e:
|
||||
print("Sigma parse error in %s: %s" % (sigmafile, str(e)), file=sys.stderr)
|
||||
error = ERR_SIGMA_PARSING
|
||||
if not cmdargs.defer_abort:
|
||||
sys.exit(error)
|
||||
except NotSupportedError as e:
|
||||
print("The Sigma rule requires a feature that is not supported by the target system: " + str(e), file=sys.stderr)
|
||||
if not cmdargs.ignore_backend_errors:
|
||||
error = ERR_NOT_SUPPORTED
|
||||
if not cmdargs.defer_abort:
|
||||
sys.exit(error)
|
||||
except BackendError as e:
|
||||
print("Backend error in %s: %s" % (sigmafile, str(e)), file=sys.stderr)
|
||||
if not cmdargs.ignore_backend_errors:
|
||||
error = ERR_BACKEND
|
||||
if not cmdargs.defer_abort:
|
||||
sys.exit(error)
|
||||
except (NotImplementedError, TypeError) as e:
|
||||
print("An unsupported feature is required for this Sigma rule (%s): " % (sigmafile) + str(e), file=sys.stderr)
|
||||
print("Feel free to contribute for fun and fame, this is open source :) -> https://github.com/Neo23x0/sigma", file=sys.stderr)
|
||||
if not cmdargs.ignore_backend_errors:
|
||||
error = ERR_NOT_IMPLEMENTED
|
||||
if not cmdargs.defer_abort:
|
||||
sys.exit(error)
|
||||
except PartialMatchError as e:
|
||||
print("Partial field match error: %s" % str(e), file=sys.stderr)
|
||||
if not cmdargs.ignore_backend_errors:
|
||||
error = ERR_PARTIAL_FIELD_MATCH
|
||||
if not cmdargs.defer_abort:
|
||||
sys.exit(error)
|
||||
except FullMatchError as e:
|
||||
print("Full field match error", file=sys.stderr)
|
||||
if not cmdargs.ignore_backend_errors:
|
||||
error = ERR_FULL_FIELD_MATCH
|
||||
if not cmdargs.defer_abort:
|
||||
sys.exit(error)
|
||||
finally:
|
||||
try:
|
||||
f.close()
|
||||
except:
|
||||
pass
|
||||
|
||||
result = backend.finalize()
|
||||
if result:
|
||||
print(result, file=out)
|
||||
out.close()
|
||||
|
||||
sys.exit(error)
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
@ -1,69 +1,5 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
import argparse
|
||||
import glob
|
||||
import json
|
||||
import os
|
||||
import sys
|
||||
from sigma.sigma2attack import main
|
||||
|
||||
import yaml
|
||||
|
||||
parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
|
||||
parser.add_argument("--rules-directory", "-d", dest="rules_dir", default="rules", help="Directory to read rules from")
|
||||
parser.add_argument("--out-file", "-o", dest="out_file", default="heatmap.json", help="File to write the JSON layer to")
|
||||
parser.add_argument("--no-comment", dest="no_comment", action="store_true", help="Don't store rule names in comments")
|
||||
args = parser.parse_args()
|
||||
|
||||
rule_files = glob.glob(os.path.join(args.rules_dir, "**/*.yml"), recursive=True)
|
||||
techniques_to_rules = {}
|
||||
curr_max_technique_count = 0
|
||||
num_rules_used = 0
|
||||
for rule_file in rule_files:
|
||||
try:
|
||||
rule = yaml.safe_load(open(rule_file).read())
|
||||
except yaml.YAMLError:
|
||||
sys.stderr.write("Ignoring rule " + rule_file + " (parsing failed)\n")
|
||||
continue
|
||||
if "tags" not in rule:
|
||||
sys.stderr.write("Ignoring rule " + rule_file + " (no tags)\n")
|
||||
continue
|
||||
tags = rule["tags"]
|
||||
for tag in tags:
|
||||
if tag.lower().startswith("attack.t"):
|
||||
technique_id = tag[len("attack."):].upper()
|
||||
num_rules_used += 1
|
||||
if technique_id not in techniques_to_rules:
|
||||
techniques_to_rules[technique_id] = []
|
||||
techniques_to_rules[technique_id].append(os.path.basename(rule_file))
|
||||
curr_max_technique_count = max(curr_max_technique_count, len(techniques_to_rules[technique_id]))
|
||||
|
||||
|
||||
scores = []
|
||||
for technique in techniques_to_rules:
|
||||
entry = {
|
||||
"techniqueID": technique,
|
||||
"score": len(techniques_to_rules[technique]),
|
||||
}
|
||||
if not args.no_comment:
|
||||
entry["comment"] = "\n".join(techniques_to_rules[technique])
|
||||
|
||||
scores.append(entry)
|
||||
|
||||
output = {
|
||||
"domain": "mitre-enterprise",
|
||||
"name": "Sigma rules heatmap",
|
||||
"gradient": {
|
||||
"colors": [
|
||||
"#ffffff",
|
||||
"#ff6666"
|
||||
],
|
||||
"maxValue": curr_max_technique_count,
|
||||
"minValue": 0
|
||||
},
|
||||
"version": "2.2",
|
||||
"techniques": scores,
|
||||
}
|
||||
|
||||
with open(args.out_file, "w") as f:
|
||||
f.write(json.dumps(output))
|
||||
print("[*] Layer file written in " + args.out_file + " (" + str(num_rules_used) + " rules)")
|
||||
main()
|
||||
|
@ -1,69 +1,5 @@
|
||||
#!/usr/bin/env python3
|
||||
# Import given Sigma rules to MISP
|
||||
|
||||
import argparse
|
||||
import pathlib
|
||||
import urllib3
|
||||
urllib3.disable_warnings()
|
||||
from pymisp import PyMISP
|
||||
from sigma.sigma2misp import main
|
||||
|
||||
def create_new_event():
|
||||
if hasattr(misp, "new_event"):
|
||||
return misp.new_event(info=args.info)["Event"]["id"]
|
||||
|
||||
event = misp.MISPEvent()
|
||||
event.info = args.info
|
||||
return misp.add_event(event)["Event"]["id"]
|
||||
|
||||
|
||||
class MISPImportArgumentParser(argparse.ArgumentParser):
|
||||
def __init__(self, *args, **kwargs):
|
||||
super().__init__(
|
||||
description="Import Sigma rules into MISP events",
|
||||
epilog="Parameters can be read from a file by a @filename parameter. The file should contain one parameter per line. Dashes may be omitted.",
|
||||
fromfile_prefix_chars="@",
|
||||
)
|
||||
|
||||
def convert_arg_line_to_args(self, line : str):
|
||||
return ("--" + line.lstrip("--")).split()
|
||||
|
||||
argparser = MISPImportArgumentParser()
|
||||
argparser.add_argument("--url", "-u", default="https://localhost", help="URL of MISP instance")
|
||||
argparser.add_argument("--key", "-k", required=True, help="API key")
|
||||
argparser.add_argument("--insecure", "-I", action="store_false", help="Disable TLS certifcate validation.")
|
||||
argparser.add_argument("--event", "-e", type=int, help="Add Sigma rule to event with this ID. If not set, create new event.")
|
||||
argparser.add_argument("--same-event", "-s", action="store_true", help="Import all Sigma rules to the same event, if no event is set.")
|
||||
argparser.add_argument("--info", "-i", default="Sigma import", help="Event Information field for newly created MISP event.")
|
||||
argparser.add_argument("--recursive", "-r", action="store_true", help="Recursive traversal of directory")
|
||||
argparser.add_argument("sigma", nargs="+", help="Sigma rule file that should be imported")
|
||||
args = argparser.parse_args()
|
||||
|
||||
if args.recursive:
|
||||
paths = [ p for pathname in args.sigma for p in pathlib.Path(pathname).glob("**/*") if p.is_file() ]
|
||||
else:
|
||||
paths = [ pathlib.Path(sigma) for sigma in args.sigma ]
|
||||
|
||||
misp = PyMISP(args.url, args.key, args.insecure)
|
||||
if args.event:
|
||||
if hasattr(misp, "get"):
|
||||
eventid = misp.get(args.event)["Event"]["id"]
|
||||
else:
|
||||
eventid = misp.get_event(args.event)["Event"]["id"]
|
||||
|
||||
first = True
|
||||
|
||||
for sigma in paths:
|
||||
if not args.event and (first or not args.same_event):
|
||||
eventid = create_new_event()
|
||||
print("Importing Sigma rule {} into MISP event {}...".format(sigma, eventid, end=""))
|
||||
f = sigma.open("rt")
|
||||
|
||||
if hasattr(misp, "add_named_attribute"):
|
||||
misp.add_named_attribute(eventid, "sigma", f.read())
|
||||
else:
|
||||
event = misp.get_event(eventid, pythonify=True)
|
||||
event.add_attribute("sigma", f.read())
|
||||
misp.update_event(event)
|
||||
|
||||
f.close()
|
||||
first = False
|
||||
main()
|
||||
|
5
tools/sigma_similarity
Executable file
5
tools/sigma_similarity
Executable file
@ -0,0 +1,5 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
from sigma.sigmac import main
|
||||
|
||||
main()
|
5
tools/sigma_uuid
Executable file
5
tools/sigma_uuid
Executable file
@ -0,0 +1,5 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
from sigma.sigma_uuid import main
|
||||
|
||||
main()
|
292
tools/sigmac
292
tools/sigmac
@ -1,293 +1,5 @@
|
||||
#!/usr/bin/env python3
|
||||
# A Sigma to SIEM converter
|
||||
# Copyright 2016-2017 Thomas Patzke, Florian Roth
|
||||
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU Lesser General Public License as published by
|
||||
# the Free Software Foundation, either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
from sigma.sigmac import main
|
||||
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU Lesser General Public License for more details.
|
||||
|
||||
# You should have received a copy of the GNU Lesser General Public License
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
import sys
|
||||
import argparse
|
||||
import yaml
|
||||
import json
|
||||
import pathlib
|
||||
import itertools
|
||||
import logging
|
||||
from sigma.parser.collection import SigmaCollectionParser
|
||||
from sigma.parser.exceptions import SigmaCollectionParseError, SigmaParseError
|
||||
from sigma.configuration import SigmaConfiguration, SigmaConfigurationChain
|
||||
from sigma.config.collection import SigmaConfigurationManager
|
||||
from sigma.config.exceptions import SigmaConfigParseError, SigmaRuleFilterParseException
|
||||
from sigma.filter import SigmaRuleFilter
|
||||
import sigma.backends.discovery as backends
|
||||
from sigma.backends.base import BackendOptions
|
||||
from sigma.backends.exceptions import BackendError, NotSupportedError, PartialMatchError, FullMatchError
|
||||
from sigma.parser.modifiers import modifiers
|
||||
import codecs
|
||||
|
||||
sys.stdout = codecs.getwriter('utf-8')(sys.stdout.detach())
|
||||
|
||||
# Error codes
|
||||
|
||||
ERR_OUTPUT = 1
|
||||
ERR_INVALID_YAML = 3
|
||||
ERR_SIGMA_PARSING = 4
|
||||
ERR_OPEN_SIGMA_RULE = 5
|
||||
ERR_OPEN_CONFIG_FILE = 5
|
||||
ERR_CONFIG_INVALID_YAML = 6
|
||||
ERR_CONFIG_PARSING = 6
|
||||
ERR_BACKEND = 8
|
||||
ERR_NOT_SUPPORTED = 9
|
||||
ERR_NO_TARGET = 10
|
||||
ERR_RULE_FILTER_PARSING = 11
|
||||
ERR_CONFIG_REQUIRED = 20
|
||||
ERR_CONFIG_ORDER = 21
|
||||
ERR_CONFIG_BACKEND = 22
|
||||
ERR_NOT_IMPLEMENTED = 42
|
||||
ERR_PARTIAL_FIELD_MATCH = 80
|
||||
ERR_FULL_FIELD_MATCH = 90
|
||||
|
||||
def alliter(path):
|
||||
for sub in path.iterdir():
|
||||
if sub.name.startswith("."):
|
||||
continue
|
||||
if sub.is_dir():
|
||||
yield from alliter(sub)
|
||||
else:
|
||||
yield sub
|
||||
|
||||
def get_inputs(paths, recursive):
|
||||
if paths == ['-']:
|
||||
return [sys.stdin]
|
||||
|
||||
if recursive:
|
||||
return list(itertools.chain.from_iterable([list(alliter(pathlib.Path(p))) for p in paths]))
|
||||
else:
|
||||
return [pathlib.Path(p) for p in paths]
|
||||
|
||||
class ActionBackendHelp(argparse.Action):
|
||||
def __call__(self, parser, ns, vals, opt):
|
||||
backend = backends.getBackend(vals)
|
||||
if len(backend.options) > 0:
|
||||
helptext = "Backend options for " + backend.identifier + "\n"
|
||||
for option, default, help, _ in backend.options:
|
||||
helptext += " {:10}: {} (default: {})".format(option, help, default) + "\n"
|
||||
|
||||
print(helptext)
|
||||
exit(0)
|
||||
|
||||
def set_argparser():
|
||||
"""Sets up and parses the command line arguments for Sigmac.
|
||||
Returns the argparser"""
|
||||
argparser = argparse.ArgumentParser(description="Convert Sigma rules into SIEM signatures.")
|
||||
argparser.add_argument("--recurse", "-r", action="store_true", help="Use directory as input (recurse into subdirectories is not implemented yet)")
|
||||
argparser.add_argument("--filter", "-f", help="""
|
||||
Define comma-separated filters that must match (AND-linked) to rule to be processed.
|
||||
Valid filters: level<=x, level>=x, level=x, status=y, logsource=z, tag=t.
|
||||
x is one of: low, medium, high, critical.
|
||||
y is one of: experimental, testing, stable.
|
||||
z is a word appearing in an arbitrary log source attribute.
|
||||
t is a tag that must appear in the rules tag list, case-insensitive matching.
|
||||
Multiple log source specifications are AND linked.
|
||||
""")
|
||||
argparser.add_argument("--target", "-t", choices=backends.getBackendDict().keys(), help="Output target format")
|
||||
argparser.add_argument("--lists", "-l", action="store_true", help="List available output target formats and configurations")
|
||||
argparser.add_argument("--config", "-c", action="append", help="Configurations with field name and index mapping for target environment. Multiple configurations are merged into one. Last config is authorative in case of conflicts.")
|
||||
argparser.add_argument("--output", "-o", default=None, help="Output file or filename prefix if multiple files are generated")
|
||||
argparser.add_argument("--backend-option", "-O", action="append", help="Options and switches that are passed to the backend")
|
||||
argparser.add_argument("--backend-config", "-C", help="Configuration file (YAML format) containing options to pass to the backend")
|
||||
argparser.add_argument("--backend-help", action=ActionBackendHelp, help="Print backend options")
|
||||
argparser.add_argument("--defer-abort", "-d", action="store_true", help="Don't abort on parse or conversion errors, proceed with next rule. The exit code from the last error is returned")
|
||||
argparser.add_argument("--ignore-backend-errors", "-I", action="store_true", help="Only return error codes for parse errors and ignore errors for rules that cause backend errors. Useful, when you want to get as much queries as possible.")
|
||||
argparser.add_argument("--shoot-yourself-in-the-foot", action="store_true", help=argparse.SUPPRESS)
|
||||
argparser.add_argument("--verbose", "-v", action="store_true", help="Be verbose")
|
||||
argparser.add_argument("--debug", "-D", action="store_true", help="Debugging output")
|
||||
argparser.add_argument("inputs", nargs="*", help="Sigma input files ('-' for stdin)")
|
||||
|
||||
return argparser
|
||||
|
||||
argparser = set_argparser()
|
||||
cmdargs = argparser.parse_args()
|
||||
|
||||
scm = SigmaConfigurationManager()
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
if cmdargs.debug: # pragma: no cover
|
||||
logger.setLevel(logging.DEBUG)
|
||||
|
||||
def list_backends():
|
||||
for backend in sorted(backends.getBackendList(), key=lambda backend: backend.identifier):
|
||||
if cmdargs.debug:
|
||||
print("{:>15} : {} ({})".format(backend.identifier, backend.__doc__, backend.__name__))
|
||||
else:
|
||||
print("{:>15} : {}".format(backend.identifier, backend.__doc__))
|
||||
|
||||
def list_configurations(backend=None):
|
||||
for conf_id, title, backends in sorted(scm.list(), key=lambda config: config[0]):
|
||||
if backend is not None and backend in backends or backend is None or len(backends) == 0:
|
||||
print("{:>30} : {}".format(conf_id, title))
|
||||
|
||||
def list_modifiers():
|
||||
for modifier_id, modifier in modifiers.items():
|
||||
print("{:>10} : {}".format(modifier_id, modifier.__doc__))
|
||||
|
||||
if cmdargs.lists:
|
||||
print("Backends:")
|
||||
list_backends()
|
||||
|
||||
print()
|
||||
print("Configurations:")
|
||||
list_configurations(cmdargs.target)
|
||||
|
||||
print()
|
||||
print("Modifiers:")
|
||||
list_modifiers()
|
||||
sys.exit(0)
|
||||
elif len(cmdargs.inputs) == 0:
|
||||
print("Nothing to do!")
|
||||
argparser.print_usage()
|
||||
sys.exit(0)
|
||||
|
||||
if cmdargs.target is None:
|
||||
print("No target selected, select one with -t/--target")
|
||||
argparser.print_usage()
|
||||
sys.exit(ERR_NO_TARGET)
|
||||
|
||||
rulefilter = None
|
||||
if cmdargs.filter:
|
||||
try:
|
||||
rulefilter = SigmaRuleFilter(cmdargs.filter)
|
||||
except SigmaRuleFilterParseException as e:
|
||||
print("Parse error in Sigma rule filter expression: %s" % str(e), file=sys.stderr)
|
||||
sys.exit(ERR_RULE_FILTER_PARSING)
|
||||
|
||||
sigmaconfigs = SigmaConfigurationChain()
|
||||
backend_class = backends.getBackend(cmdargs.target)
|
||||
if cmdargs.config is None:
|
||||
if backend_class.config_required and not cmdargs.shoot_yourself_in_the_foot:
|
||||
print("The backend you want to use usually requires a configuration to generate valid results. Please provide one with --config/-c.", file=sys.stderr)
|
||||
print("Available choices for this backend (get complete list with --lists/-l):")
|
||||
list_configurations(cmdargs.target)
|
||||
sys.exit(ERR_CONFIG_REQUIRED)
|
||||
if backend_class.default_config is not None:
|
||||
cmdargs.config = backend_class.default_config
|
||||
|
||||
if cmdargs.config:
|
||||
order = 0
|
||||
for conf_name in cmdargs.config:
|
||||
try:
|
||||
sigmaconfig = scm.get(conf_name)
|
||||
if sigmaconfig.order is not None:
|
||||
if sigmaconfig.order <= order and not cmdargs.shoot_yourself_in_the_foot:
|
||||
print("The configurations were provided in the wrong order (order key check in config file)", file=sys.stderr)
|
||||
sys.exit(ERR_CONFIG_ORDER)
|
||||
order = sigmaconfig.order
|
||||
|
||||
try:
|
||||
if cmdargs.target not in sigmaconfig.config["backends"]:
|
||||
print("The configuration '{}' is not valid for backend '{}'. Valid choices are: {}".format(conf_name, cmdargs.target, ", ".join(sigmaconfig.config["backends"])), file=sys.stderr)
|
||||
sys.exit(ERR_CONFIG_ORDER)
|
||||
except KeyError:
|
||||
pass
|
||||
|
||||
sigmaconfigs.append(sigmaconfig)
|
||||
except OSError as e:
|
||||
print("Failed to open Sigma configuration file %s: %s" % (conf_name, str(e)), file=sys.stderr)
|
||||
exit(ERR_OPEN_CONFIG_FILE)
|
||||
except (yaml.parser.ParserError, yaml.scanner.ScannerError) as e:
|
||||
print("Sigma configuration file %s is no valid YAML: %s" % (conf_name, str(e)), file=sys.stderr)
|
||||
exit(ERR_CONFIG_INVALID_YAML)
|
||||
except SigmaConfigParseError as e:
|
||||
print("Sigma configuration parse error in %s: %s" % (conf_name, str(e)), file=sys.stderr)
|
||||
exit(ERR_CONFIG_PARSING)
|
||||
|
||||
backend_options = BackendOptions(cmdargs.backend_option, cmdargs.backend_config)
|
||||
backend = backend_class(sigmaconfigs, backend_options)
|
||||
|
||||
filename = cmdargs.output
|
||||
if filename:
|
||||
try:
|
||||
out = open(filename, "w", encoding='utf-8')
|
||||
except (IOError, OSError) as e:
|
||||
print("Failed to open output file '%s': %s" % (filename, str(e)), file=sys.stderr)
|
||||
exit(ERR_OUTPUT)
|
||||
else:
|
||||
out = sys.stdout
|
||||
|
||||
error = 0
|
||||
for sigmafile in get_inputs(cmdargs.inputs, cmdargs.recurse):
|
||||
logger.debug("* Processing Sigma input %s" % (sigmafile))
|
||||
try:
|
||||
if cmdargs.inputs == ['-']:
|
||||
f = sigmafile
|
||||
else:
|
||||
f = sigmafile.open(encoding='utf-8')
|
||||
parser = SigmaCollectionParser(f, sigmaconfigs, rulefilter)
|
||||
results = parser.generate(backend)
|
||||
for result in results:
|
||||
print(result, file=out)
|
||||
except OSError as e:
|
||||
print("Failed to open Sigma file %s: %s" % (sigmafile, str(e)), file=sys.stderr)
|
||||
error = ERR_OPEN_SIGMA_RULE
|
||||
except (yaml.parser.ParserError, yaml.scanner.ScannerError) as e:
|
||||
print("Sigma file %s is no valid YAML: %s" % (sigmafile, str(e)), file=sys.stderr)
|
||||
error = ERR_INVALID_YAML
|
||||
if not cmdargs.defer_abort:
|
||||
sys.exit(error)
|
||||
except (SigmaParseError, SigmaCollectionParseError) as e:
|
||||
print("Sigma parse error in %s: %s" % (sigmafile, str(e)), file=sys.stderr)
|
||||
error = ERR_SIGMA_PARSING
|
||||
if not cmdargs.defer_abort:
|
||||
sys.exit(error)
|
||||
except NotSupportedError as e:
|
||||
print("The Sigma rule requires a feature that is not supported by the target system: " + str(e), file=sys.stderr)
|
||||
if not cmdargs.ignore_backend_errors:
|
||||
error = ERR_NOT_SUPPORTED
|
||||
if not cmdargs.defer_abort:
|
||||
sys.exit(error)
|
||||
except BackendError as e:
|
||||
print("Backend error in %s: %s" % (sigmafile, str(e)), file=sys.stderr)
|
||||
if not cmdargs.ignore_backend_errors:
|
||||
error = ERR_BACKEND
|
||||
if not cmdargs.defer_abort:
|
||||
sys.exit(error)
|
||||
except (NotImplementedError, TypeError) as e:
|
||||
print("An unsupported feature is required for this Sigma rule (%s): " % (sigmafile) + str(e), file=sys.stderr)
|
||||
print("Feel free to contribute for fun and fame, this is open source :) -> https://github.com/Neo23x0/sigma", file=sys.stderr)
|
||||
if not cmdargs.ignore_backend_errors:
|
||||
error = ERR_NOT_IMPLEMENTED
|
||||
if not cmdargs.defer_abort:
|
||||
sys.exit(error)
|
||||
except PartialMatchError as e:
|
||||
print("Partial field match error: %s" % str(e), file=sys.stderr)
|
||||
if not cmdargs.ignore_backend_errors:
|
||||
error = ERR_PARTIAL_FIELD_MATCH
|
||||
if not cmdargs.defer_abort:
|
||||
sys.exit(error)
|
||||
except FullMatchError as e:
|
||||
print("Full field match error", file=sys.stderr)
|
||||
if not cmdargs.ignore_backend_errors:
|
||||
error = ERR_FULL_FIELD_MATCH
|
||||
if not cmdargs.defer_abort:
|
||||
sys.exit(error)
|
||||
finally:
|
||||
try:
|
||||
f.close()
|
||||
except:
|
||||
pass
|
||||
|
||||
result = backend.finalize()
|
||||
if result:
|
||||
print(result, file=out)
|
||||
out.close()
|
||||
|
||||
sys.exit(error)
|
||||
main()
|
||||
|
Loading…
Reference in New Issue
Block a user