2017-02-13 22:14:40 +00:00
#!/usr/bin/env python3
# A Sigma to SIEM converter
2017-12-07 20:55:43 +00:00
# Copyright 2016-2017 Thomas Patzke, Florian Roth
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
2017-02-13 22:14:40 +00:00
import sys
import argparse
import yaml
import json
2017-03-06 08:36:10 +00:00
import pathlib
import itertools
2017-10-31 21:13:20 +00:00
import logging
2017-12-08 21:32:39 +00:00
from sigma.parser import SigmaCollectionParser, SigmaCollectionParseError, SigmaParseError
from sigma.config import SigmaConfiguration, SigmaConfigParseError, SigmaRuleFilter, SigmaRuleFilterParseException
import sigma.backends as backends
2017-12-12 23:12:56 +00:00
import codecs
sys.stdout = codecs.getwriter('utf-8')(sys.stdout.detach())
2017-02-13 22:14:40 +00:00
2017-10-31 21:13:20 +00:00
logger = logging.getLogger(__name__)
2017-02-13 22:14:40 +00:00
def print_verbose(*args, **kwargs):
    """Forward all arguments to print() if --verbose or --debug was given.

    Reads the module-level ``cmdargs`` namespace; prints nothing when neither
    flag is set.
    """
    if not (cmdargs.verbose or cmdargs.debug):
        return
    print(*args, **kwargs)
def print_debug(*args, **kwargs):
    """Forward all arguments to print() if --debug was given.

    Reads the module-level ``cmdargs`` namespace; prints nothing otherwise.
    """
    if not cmdargs.debug:
        return
    print(*args, **kwargs)
2017-03-06 08:36:10 +00:00
def alliter(path):
    """Recursively yield all non-hidden files below *path*.

    Entries whose name starts with "." (files and directories alike) are
    skipped while walking a directory.

    If *path* is not a directory (a regular file, or a path that does not
    exist) it is yielded unchanged instead of raising from ``iterdir()``.
    This lets a file be passed together with --recurse; a missing path is
    then reported by whoever tries to open it.

    :param path: ``pathlib.Path`` to start from.
    :return: generator of ``pathlib.Path`` objects.
    """
    if not path.is_dir():
        # Explicitly named path: hand it through, even if it is hidden.
        yield path
        return

    for sub in path.iterdir():
        if sub.name.startswith("."):
            continue
        if sub.is_dir():
            yield from alliter(sub)
        else:
            yield sub
def get_inputs(paths, recursive):
    """Turn the input path strings into a list of ``pathlib.Path`` objects.

    :param paths: iterable of path strings from the command line.
    :param recursive: if true, each path is expanded with ``alliter()`` and
        the results are concatenated in the order of *paths*; otherwise each
        path is converted one-to-one.
    :return: list of ``pathlib.Path``.
    """
    if not recursive:
        return [pathlib.Path(p) for p in paths]

    expanded = (alliter(pathlib.Path(p)) for p in paths)
    return list(itertools.chain.from_iterable(expanded))
2017-03-06 08:36:10 +00:00
2018-03-20 23:53:44 +00:00
class SigmacArgumentParser(argparse.ArgumentParser):
    """Argument parser whose help text additionally lists backend options."""

    def format_help(self):
        """Return the regular help text followed by a "Backend options" section.

        For every backend from ``backends.getBackendList()`` that declares at
        least one option, the backend identifier is printed followed by one
        line per option with its name, help text and default value.
        """
        parts = [super().format_help(), "\nBackend options:\n"]
        for backend in backends.getBackendList():
            if len(backend.options) == 0:
                continue
            parts.append(" " + backend.identifier + "\n")
            # Each option is a 4-tuple; the last element is not needed here.
            for name, default, description, _ in backend.options:
                parts.append(" {:10}: {} (default: {})".format(name, description, default) + "\n")
        return "".join(parts)
# Command line interface. Arguments are parsed right away at module level and
# the result is kept in the global ``cmdargs`` that the rest of the script
# (including print_verbose/print_debug) reads.
# The help texts of --recurse, --config and --output no longer claim
# "(not yet implemented)": all three options are used further down.
argparser = SigmacArgumentParser(description="Convert Sigma rules into SIEM signatures.")
argparser.add_argument("--recurse", "-r", action="store_true", help="Recurse into subdirectories")
argparser.add_argument("--filter", "-f", help="""
Define comma-separated filters that must match (AND-linked) to rule to be processed.
Valid filters: level<=x, level>=x, level=x, status=y, logsource=z.
x is one of: low, medium, high, critical.
y is one of: experimental, testing, stable.
z is a word appearing in an arbitrary log source attribute.
Multiple log source specifications are AND linked.
""")
argparser.add_argument("--target", "-t", default="es-qs", choices=backends.getBackendDict().keys(), help="Output target format")
argparser.add_argument("--target-list", "-l", action="store_true", help="List available output target formats")
argparser.add_argument("--config", "-c", help="Configuration with field name and index mapping for target environment")
argparser.add_argument("--output", "-o", default=None, help="Output file or filename prefix if multiple files are generated")
argparser.add_argument("--backend-option", "-O", action="append", help="Options and switches that are passed to the backend")
argparser.add_argument("--defer-abort", "-d", action="store_true", help="Don't abort on parse or conversion errors, proceed with next rule. The exit code from the last error is returned")
argparser.add_argument("--ignore-not-implemented", "-I", action="store_true", help="Only return error codes for parse errors and ignore errors for rules with not implemented features")
argparser.add_argument("--verbose", "-v", action="store_true", help="Be verbose")
argparser.add_argument("--debug", "-D", action="store_true", help="Debugging output")
argparser.add_argument("inputs", nargs="*", help="Sigma input files")
cmdargs = argparser.parse_args()
2017-10-31 22:06:18 +00:00
# --debug raises the level of this module's logger.
if cmdargs.debug:
    logger.setLevel(logging.DEBUG)

# Modes that do not convert anything: list the backends, or bail out when no
# input was given.
if cmdargs.target_list:
    for backend in backends.getBackendList():
        print("%10s: %s" % (backend.identifier, backend.__doc__))
    sys.exit(0)
elif not cmdargs.inputs:
    print("Nothing to do!")
    argparser.print_usage()
    # Stop here: previously the script carried on, loading the configuration
    # and instantiating/finalizing the backend although there was no input.
    sys.exit(0)
2017-02-13 22:14:40 +00:00
2017-11-01 23:02:15 +00:00
# Build the rule filter from --filter; it stays None when no filter was given.
# A filter expression that cannot be parsed terminates with exit code 9.
try:
    rulefilter = SigmaRuleFilter(cmdargs.filter) if cmdargs.filter else None
except SigmaRuleFilterParseException as e:
    print("Parse error in Sigma rule filter expression: %s" % str(e), file=sys.stderr)
    sys.exit(9)
2017-03-18 22:15:03 +00:00
out = sys.stdout

# Start with a default configuration and replace it with the one from
# --config if given. Exit codes: 5 = file cannot be opened, 6 = invalid YAML,
# 7 = configuration content rejected by SigmaConfiguration.
sigmaconfig = SigmaConfiguration()
if cmdargs.config:
    conffile = cmdargs.config
    try:
        # The with-block closes the handle again (it used to leak).
        # NOTE(review): assumes SigmaConfiguration reads the file inside its
        # constructor, as the YAML error handling below suggests - confirm.
        with open(conffile) as f:
            sigmaconfig = SigmaConfiguration(f)
    except OSError as e:
        print("Failed to open Sigma configuration file %s: %s" % (conffile, str(e)), file=sys.stderr)
        sys.exit(5)
    except (yaml.parser.ParserError, yaml.scanner.ScannerError) as e:
        print("Sigma configuration file %s is no valid YAML: %s" % (conffile, str(e)), file=sys.stderr)
        sys.exit(6)
    except SigmaConfigParseError as e:
        print("Sigma configuration parse error in %s: %s" % (conffile, str(e)), file=sys.stderr)
        sys.exit(7)
2017-03-04 22:36:46 +00:00
2017-09-16 21:46:40 +00:00
# Instantiate the selected backend with the configuration, the -O options and
# the --output target. An I/O error here is reported as a failure to open the
# output file and ends the script with exit code 1.
backend_options = backends.BackendOptions(cmdargs.backend_option)
try:
    backend = backends.getBackend(cmdargs.target)(sigmaconfig, backend_options, cmdargs.output)
    # not existing backend is already detected by argument parser
except IOError as e:
    print("Failed to open output file '%s': %s" % (cmdargs.output, str(e)), file=sys.stderr)
    # sys.exit() instead of the site-provided exit(), like the rest of the file.
    sys.exit(1)
2017-02-22 21:47:12 +00:00
2017-08-01 22:56:22 +00:00
# Convert every input file with the backend. ``error`` holds the code of the
# last failure and becomes the exit code of the script:
#   5  = OSError while processing the file (never aborts early)
#   3  = file is no valid YAML
#   4  = Sigma rule/collection parse error
#   8  = backend error
#   42 = rule needs a feature that is not implemented
#        (not counted with --ignore-not-implemented)
# Unless --defer-abort is given, errors other than 5 terminate immediately.
error = 0
for sigmafile in get_inputs(cmdargs.inputs, cmdargs.recurse):
    print_verbose("* Processing Sigma input %s" % (sigmafile))
    try:
        # The with-block closes the file on every path. It replaces a
        # finally/f.close() guarded by a bare except, which silently closed a
        # stale handle or hid a NameError whenever open() itself failed.
        with sigmafile.open(encoding='utf-8') as f:
            parser = SigmaCollectionParser(f, sigmaconfig, rulefilter)
            parser.generate(backend)
    except OSError as e:
        print("Failed to open Sigma file %s: %s" % (sigmafile, str(e)), file=sys.stderr)
        error = 5
    except (yaml.parser.ParserError, yaml.scanner.ScannerError) as e:
        print("Sigma file %s is no valid YAML: %s" % (sigmafile, str(e)), file=sys.stderr)
        error = 3
        if not cmdargs.defer_abort:
            sys.exit(error)
    except (SigmaParseError, SigmaCollectionParseError) as e:
        print("Sigma parse error in %s: %s" % (sigmafile, str(e)), file=sys.stderr)
        error = 4
        if not cmdargs.defer_abort:
            sys.exit(error)
    except backends.BackendError as e:
        print("Backend error in %s: %s" % (sigmafile, str(e)), file=sys.stderr)
        error = 8
        if not cmdargs.defer_abort:
            sys.exit(error)
    except NotImplementedError as e:
        print("An unsupported feature is required for this Sigma rule: " + str(e), file=sys.stderr)
        print("Feel free to contribute for fun and fame, this is open source :) -> https://github.com/Neo23x0/sigma", file=sys.stderr)
        if not cmdargs.ignore_not_implemented:
            error = 42
            if not cmdargs.defer_abort:
                sys.exit(error)

backend.finalize()
sys.exit(error)