mirror of
https://github.com/valitydev/SigmaHQ.git
synced 2024-11-07 17:58:52 +00:00
163 lines
5.7 KiB
Python
Executable File
163 lines
5.7 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
# Convert Sigma rules with EventIDs to rules with generic log sources
|
|
|
|
from argparse import ArgumentParser
|
|
import yaml
|
|
import sys
|
|
from pathlib import Path
|
|
|
|
class Output(object):
    """Common base class of all output targets.

    ``write`` relies on a file-like object stored in ``self.f``; this class
    does not set that attribute itself.
    """

    def write(self, *args, **kwargs):
        """Pass all arguments through unchanged to ``self.f.write``."""
        target = self.f
        target.write(*args, **kwargs)
|
|
|
|
class SingleFileOutput(Output):
    """Output into single file with multiple YAML documents. Each input file is announced with comment."""

    def __init__(self, name):
        """Create the output file *name*.

        The file is opened in exclusive-creation mode ("x"), so ``open``
        raises ``FileExistsError`` instead of overwriting an existing file.
        """
        self.f = open(name, "x")
        self.first = True

    def new_output(self, name):
        """Announce new Sigma rule as input and start new YAML document."""
        self.current = name
        if self.first:
            self.first = False
        else:
            # The document separator must sit on a line of its own. Without
            # the trailing newline the comment written below would be glued
            # to it ("---# Sigma rule: ...") and the marker would be lost.
            self.write("---\n")
        self.f.write("# Sigma rule: {}\n".format(name))

    def finish(self):
        """Close the output file."""
        self.f.close()
|
|
|
|
class StdoutOutput(SingleFileOutput):
    """SingleFileOutput variant that writes to standard output."""

    def __init__(self):
        # The parent constructor is not called: it would open a file by name.
        self.first = True
        self.f = sys.stdout

    def finish(self):
        """Do nothing; standard output is left open."""
        return None
|
|
|
|
class DirectoryOutput(Output):
    """Output each input file into a corresponding output file in target directory."""

    def __init__(self, dirpath):
        """Remember the target directory *dirpath* (a ``pathlib.Path``); no file is open yet."""
        self.d = dirpath
        self.f = None
        self.current = None     # input path the currently open file belongs to

    def new_output(self, path):
        """Start output for the rule read from input *path*.

        The target file has the same name as *path* inside the target
        directory and is created exclusively (mode "x"), so an existing file
        raises ``FileExistsError`` instead of being overwritten.

        Consecutive calls with the same *path* (several YAML documents from
        one input file) keep writing to the already open file and only emit a
        YAML document separator. Re-opening the target in "x" mode would fail
        because the first document already created it.
        """
        if self.f is not None:
            if path == self.current:
                self.f.write("---\n")
                return
            self.f.close()
            # Reset state first so a failing open() below does not leave a
            # closed handle behind.
            self.f = None
            self.current = None
        self.f = (self.d / path.name).open("x")
        self.current = path

    def finish(self):
        """Close the last output file, if any. Safe to call when nothing was written."""
        if self.f is not None:
            self.f.close()
            self.f = None
            self.current = None
|
|
|
|
def get_output(output):
    """Choose the output object for the given *output* argument.

    * ``None``: standard output
    * existing directory: one output file per input file in that directory
    * anything else: a single output file with that name
    """
    if output is None:
        return StdoutOutput()

    target = Path(output)
    return DirectoryOutput(target) if target.is_dir() else SingleFileOutput(output)
|
|
|
|
class SigmaYAMLDumper(yaml.Dumper):
    """YAML dumper that uses more indentation, e.g. for list items."""

    def increase_indent(self, flow=False, indentless=False):
        """Delegate to the base dumper, always passing ``indentless=False``."""
        return super().increase_indent(flow=flow, indentless=False)
|
|
|
|
class AmbiguousRuleException(TypeError):
    """Error carrying a collection of event IDs; its string form lists them."""

    def __init__(self, ids):
        super().__init__()
        self.ids = ids

    def __str__(self):
        """Return the stored IDs as a comma-separated string."""
        return ", ".join(map(str, self.ids))
|
|
|
|
def convert_to_generic(yamldoc):
    """Rewrite a process creation rule in place to use the generic log source.

    A rule is converted when its log source is Windows Sysmon with EventID 1
    or Windows Security with EventID 4688, and that is the only EventID used
    in its detection section. On conversion all ``EventID`` attributes are
    removed from the detection definitions and ``logsource`` is replaced by
    ``{"category": "process_creation", "product": "windows"}``.

    Documents that are not mappings, or that lack ``logsource``, ``product``,
    ``service`` or ``detection``, are left untouched instead of raising.

    :param yamldoc: parsed YAML document of one Sigma rule (modified in place)
    :return: True if the rule was changed, False otherwise
    :raises AmbiguousRuleException: the rule uses the relevant EventID
        together with further EventIDs
    """
    # Empty YAML documents are parsed as None; other non-mappings are no rules.
    if not isinstance(yamldoc, dict):
        return False

    logsource = yamldoc.get("logsource")
    if not isinstance(logsource, dict):
        return False

    # Currently, only Windows Security or Sysmon are relevant
    product = logsource.get("product")
    service = logsource.get("service")
    if product != "windows" or service not in ("sysmon", "security"):
        return False

    detections = yamldoc.get("detection")
    if not isinstance(detections, dict):
        return False

    # Only mapping-style definitions can carry an EventID attribute.
    selections = [
        definition
        for name, definition in detections.items()
        if name != "condition" and isinstance(definition, dict)
    ]

    # first collect all event ids
    eventids = set()
    for selection in selections:
        if "EventID" not in selection:
            continue
        eventid = selection["EventID"]
        if isinstance(eventid, (list, tuple, set)):
            eventids.update(eventid)
        else:
            # Plain value. Strings must not go through update(), which would
            # add their individual characters.
            eventids.add(eventid)

    is_process_creation = (
        (service == "sysmon" and 1 in eventids)
        or (service == "security" and 4688 in eventids)
    )
    if not is_process_creation:
        return False

    if len(eventids) != 1:
        # raise an exception to print a warning message to make user aware about the issue
        raise AmbiguousRuleException(eventids)

    # only one EventID collected: remove all EventID definitions
    for selection in selections:
        selection.pop("EventID", None)

    # rewrite log source
    yamldoc["logsource"] = {
        "category": "process_creation",
        "product": "windows",
    }
    return True
|
|
|
|
def get_input_paths(args):
    """Return the list of input files selected by the command line arguments.

    Without ``args.recursive`` every entry of ``args.sigma`` is returned as a
    ``Path`` unchanged. With ``args.recursive`` each entry is searched
    recursively and all regular files below it are returned. An entry that is
    itself a regular file is returned as-is; globbing a non-directory yields
    nothing, so such files would otherwise be silently dropped.

    :param args: parsed arguments providing ``recursive`` and ``sigma``
    :return: list of ``pathlib.Path`` objects
    """
    if not args.recursive:
        return [Path(sigma) for sigma in args.sigma]

    paths = []
    for pathname in args.sigma:
        root = Path(pathname)
        if root.is_file():
            paths.append(root)
        else:
            paths.extend(p for p in root.glob("**/*") if p.is_file())
    return paths
|
|
|
|
# Command line interface
argparser = ArgumentParser(description="Convert between classical and generic log source Sigma rules.")
argparser.add_argument("--output", "-o", help="Output file or directory. Default: standard output.")
argparser.add_argument("--recursive", "-r", action="store_true", help="Recursive traversal of directory")
argparser.add_argument("--verbose", "-v", action="store_true", help="Verbose output")
argparser.add_argument("sigma", nargs="+", help="Sigma rule file(s) that should be converted")
args = argparser.parse_args()

input_paths = get_input_paths(args)
output = get_output(args.output)

# Convert each input file; exit code 1 signals an unreadable input, 2 a YAML
# parse error or a failure while writing the result.
for path in input_paths:
    try:
        with path.open("r") as f:
            # safe_load_all() parses lazily, so YAML errors surface only
            # while iterating. Materialise the documents here so that they
            # are reported by the handler below and the file gets closed.
            yamldocs = list(yaml.safe_load_all(f))
    except OSError as e:
        print("Error while reading Sigma rule {}: {}".format(path, str(e)), file=sys.stderr)
        sys.exit(1)
    except yaml.YAMLError as e:
        print("YAML parse error while parsing Sigma rule {}: {}".format(path, str(e)), file=sys.stderr)
        sys.exit(2)

    for yamldoc_num, yamldoc in enumerate(yamldocs, start=1):
        try:
            if convert_to_generic(yamldoc):
                # only write output if changed
                try:
                    output.new_output(path)
                    output.write(yaml.dump(yamldoc, Dumper=SigmaYAMLDumper, width=160, default_flow_style=False))
                except OSError as e:
                    print("Error while writing result: {}".format(str(e)), file=sys.stderr)
                    sys.exit(2)
        except AmbiguousRuleException as e:
            # Not fatal: warn and continue with the next document.
            print("Rule {} in file {} contains multiple EventIDs: {}".format(yamldoc_num, str(path), str(e)), file=sys.stderr)

output.finish()
|