mirror of
https://github.com/valitydev/SigmaHQ.git
synced 2024-11-08 18:23:52 +00:00
217 lines
9.1 KiB
Python
Executable File
217 lines
9.1 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
# A Sigma to SIEM converter
|
|
# Copyright 2016-2017 Thomas Patzke, Florian Roth
|
|
|
|
# This program is free software: you can redistribute it and/or modify
|
|
# it under the terms of the GNU Lesser General Public License as published by
|
|
# the Free Software Foundation, either version 3 of the License, or
|
|
# (at your option) any later version.
|
|
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU Lesser General Public License for more details.
|
|
|
|
# You should have received a copy of the GNU Lesser General Public License
|
|
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
import sys
|
|
import argparse
|
|
import yaml
|
|
import json
|
|
import pathlib
|
|
import itertools
|
|
import logging
|
|
from sigma.parser.collection import SigmaCollectionParser
|
|
from sigma.parser.exceptions import SigmaCollectionParseError, SigmaParseError
|
|
from sigma.configuration import SigmaConfiguration, SigmaConfigurationChain
|
|
from sigma.config.exceptions import SigmaConfigParseError, SigmaRuleFilterParseException
|
|
from sigma.filter import SigmaRuleFilter
|
|
import sigma.backends.discovery as backends
|
|
from sigma.backends.base import BackendOptions
|
|
from sigma.backends.exceptions import BackendError, NotSupportedError, PartialMatchError, FullMatchError
|
|
import codecs
|
|
|
|
sys.stdout = codecs.getwriter('utf-8')(sys.stdout.detach())
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
def print_verbose(*args, **kwargs):
|
|
if cmdargs.verbose or cmdargs.debug:
|
|
print(*args, **kwargs)
|
|
|
|
def print_debug(*args, **kwargs): # pragme: no cover
|
|
if cmdargs.debug:
|
|
print(*args, **kwargs)
|
|
|
|
def alliter(path):
|
|
for sub in path.iterdir():
|
|
if sub.name.startswith("."):
|
|
continue
|
|
if sub.is_dir():
|
|
yield from alliter(sub)
|
|
else:
|
|
yield sub
|
|
|
|
def get_inputs(paths, recursive):
|
|
if paths == ['-']:
|
|
return [sys.stdin]
|
|
|
|
if recursive:
|
|
return list(itertools.chain.from_iterable([list(alliter(pathlib.Path(p))) for p in paths]))
|
|
else:
|
|
return [pathlib.Path(p) for p in paths]
|
|
|
|
class SigmacArgumentParser(argparse.ArgumentParser):
|
|
def format_help(self):
|
|
helptext = super().format_help() + "\nBackend options:\n"
|
|
|
|
for backend in backends.getBackendList():
|
|
if len(backend.options) > 0:
|
|
helptext += " " + backend.identifier + "\n"
|
|
for option, default, help, _ in backend.options:
|
|
helptext += " {:10}: {} (default: {})".format(option, help, default) + "\n"
|
|
|
|
return helptext
|
|
|
|
argparser = SigmacArgumentParser(description="Convert Sigma rules into SIEM signatures.")
|
|
argparser.add_argument("--recurse", "-r", action="store_true", help="Use directory as input (recurse into subdirectories is not implemented yet)")
|
|
argparser.add_argument("--filter", "-f", help="""
|
|
Define comma-separated filters that must match (AND-linked) to rule to be processed.
|
|
Valid filters: level<=x, level>=x, level=x, status=y, logsource=z, tag=t.
|
|
x is one of: low, medium, high, critical.
|
|
y is one of: experimental, testing, stable.
|
|
z is a word appearing in an arbitrary log source attribute.
|
|
t is a tag that must appear in the rules tag list, case-insensitive matching.
|
|
Multiple log source specifications are AND linked.
|
|
""")
|
|
argparser.add_argument("--target", "-t", default="es-qs", choices=backends.getBackendDict().keys(), help="Output target format")
|
|
argparser.add_argument("--target-list", "-l", action="store_true", help="List available output target formats")
|
|
argparser.add_argument("--config", "-c", action="append", help="Configurations with field name and index mapping for target environment. Multiple configurations are merged into one. Last config is authorative in case of conflicts.")
|
|
argparser.add_argument("--output", "-o", default=None, help="Output file or filename prefix if multiple files are generated")
|
|
argparser.add_argument("--backend-option", "-O", action="append", help="Options and switches that are passed to the backend")
|
|
argparser.add_argument("--defer-abort", "-d", action="store_true", help="Don't abort on parse or conversion errors, proceed with next rule. The exit code from the last error is returned")
|
|
argparser.add_argument("--ignore-backend-errors", "-I", action="store_true", help="Only return error codes for parse errors and ignore errors for rules that cause backend errors. Useful, when you want to get as much queries as possible.")
|
|
argparser.add_argument("--verbose", "-v", action="store_true", help="Be verbose")
|
|
argparser.add_argument("--debug", "-D", action="store_true", help="Debugging output")
|
|
argparser.add_argument("inputs", nargs="*", help="Sigma input files ('-' for stdin)")
|
|
cmdargs = argparser.parse_args()
|
|
|
|
if cmdargs.debug: # pragma: no cover
|
|
logger.setLevel(logging.DEBUG)
|
|
|
|
if cmdargs.target_list:
|
|
for backend in backends.getBackendList():
|
|
print("%10s: %s" % (backend.identifier, backend.__doc__))
|
|
sys.exit(0)
|
|
elif len(cmdargs.inputs) == 0:
|
|
print("Nothing to do!")
|
|
argparser.print_usage()
|
|
sys.exit(0)
|
|
|
|
rulefilter = None
|
|
if cmdargs.filter:
|
|
try:
|
|
rulefilter = SigmaRuleFilter(cmdargs.filter)
|
|
except SigmaRuleFilterParseException as e:
|
|
print("Parse error in Sigma rule filter expression: %s" % str(e), file=sys.stderr)
|
|
sys.exit(9)
|
|
|
|
sigmaconfigs = SigmaConfigurationChain()
|
|
if cmdargs.config:
|
|
for conffile in cmdargs.config:
|
|
try:
|
|
f = open(conffile)
|
|
sigmaconfig = SigmaConfiguration(f)
|
|
sigmaconfigs.append(sigmaconfig)
|
|
except OSError as e:
|
|
print("Failed to open Sigma configuration file %s: %s" % (conffile, str(e)), file=sys.stderr)
|
|
exit(5)
|
|
except (yaml.parser.ParserError, yaml.scanner.ScannerError) as e:
|
|
print("Sigma configuration file %s is no valid YAML: %s" % (conffile, str(e)), file=sys.stderr)
|
|
exit(6)
|
|
except SigmaConfigParseError as e:
|
|
print("Sigma configuration parse error in %s: %s" % (conffile, str(e)), file=sys.stderr)
|
|
exit(7)
|
|
|
|
backend_options = BackendOptions(cmdargs.backend_option)
|
|
backend = backends.getBackend(cmdargs.target)(sigmaconfigs, backend_options)
|
|
filename = cmdargs.output
|
|
if filename:
|
|
try:
|
|
out = open(filename, "w", encoding='utf-8')
|
|
except (IOError, OSError) as e:
|
|
print("Failed to open output file '%s': %s" % (filename, str(e)), file=sys.stderr)
|
|
exit(1)
|
|
else:
|
|
out = sys.stdout
|
|
|
|
error = 0
|
|
for sigmafile in get_inputs(cmdargs.inputs, cmdargs.recurse):
|
|
print_verbose("* Processing Sigma input %s" % (sigmafile))
|
|
try:
|
|
if cmdargs.inputs == ['-']:
|
|
f = sigmafile
|
|
else:
|
|
f = sigmafile.open(encoding='utf-8')
|
|
parser = SigmaCollectionParser(f, sigmaconfigs, rulefilter)
|
|
results = parser.generate(backend)
|
|
for result in results:
|
|
print(result, file=out)
|
|
except OSError as e:
|
|
print("Failed to open Sigma file %s: %s" % (sigmafile, str(e)), file=sys.stderr)
|
|
error = 5
|
|
except (yaml.parser.ParserError, yaml.scanner.ScannerError) as e:
|
|
print("Sigma file %s is no valid YAML: %s" % (sigmafile, str(e)), file=sys.stderr)
|
|
error = 3
|
|
if not cmdargs.defer_abort:
|
|
sys.exit(error)
|
|
except (SigmaParseError, SigmaCollectionParseError) as e:
|
|
print("Sigma parse error in %s: %s" % (sigmafile, str(e)), file=sys.stderr)
|
|
error = 4
|
|
if not cmdargs.defer_abort:
|
|
sys.exit(error)
|
|
except NotSupportedError as e:
|
|
print("The Sigma rule requires a feature that is not supported by the target system: " + str(e), file=sys.stderr)
|
|
if not cmdargs.ignore_backend_errors:
|
|
error = 9
|
|
if not cmdargs.defer_abort:
|
|
sys.exit(error)
|
|
except BackendError as e:
|
|
print("Backend error in %s: %s" % (sigmafile, str(e)), file=sys.stderr)
|
|
if not cmdargs.ignore_backend_errors:
|
|
error = 8
|
|
if not cmdargs.defer_abort:
|
|
sys.exit(error)
|
|
except NotImplementedError as e:
|
|
print("An unsupported feature is required for this Sigma rule (%s): " % (sigmafile) + str(e), file=sys.stderr)
|
|
print("Feel free to contribute for fun and fame, this is open source :) -> https://github.com/Neo23x0/sigma", file=sys.stderr)
|
|
if not cmdargs.ignore_backend_errors:
|
|
error = 42
|
|
if not cmdargs.defer_abort:
|
|
sys.exit(error)
|
|
except PartialMatchError as e:
|
|
print("Partial field match error: %s" % str(e), file=sys.stderr)
|
|
if not cmdargs.ignore_backend_errors:
|
|
error = 80
|
|
if not cmdargs.defer_abort:
|
|
sys.exit(error)
|
|
except FullMatchError as e:
|
|
print("Full field match error", file=sys.stderr)
|
|
if not cmdargs.ignore_backend_errors:
|
|
error = 90
|
|
if not cmdargs.defer_abort:
|
|
sys.exit(error)
|
|
finally:
|
|
try:
|
|
f.close()
|
|
except:
|
|
pass
|
|
|
|
result = backend.finalize()
|
|
if result:
|
|
print(result, file=out)
|
|
out.close()
|
|
|
|
sys.exit(error)
|