mirror of
https://github.com/valitydev/SigmaHQ.git
synced 2024-11-06 09:25:17 +00:00
6d62d426c9
* Moved SigmaYAMLDumper to new sigma.output module
226 lines
7.9 KiB
Python
Executable File
226 lines
7.9 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
# Convert Sigma rules with EventIDs to rules with generic log sources
|
|
|
|
from argparse import ArgumentParser
|
|
import yaml
|
|
import sys
|
|
from pathlib import Path
|
|
from sigma.output import SigmaYAMLDumper
|
|
|
|
class Output(object):
|
|
"""Output base class"""
|
|
def write(self, *args, **kwargs):
|
|
self.f.write(*args, **kwargs)
|
|
|
|
class SingleFileOutput(Output):
|
|
"""Output into single file with multiple YAML documents. Each input file is announced with comment."""
|
|
def __init__(self, name):
|
|
self.f = open(name, "x")
|
|
self.path = None
|
|
self.first = True
|
|
|
|
def new_output(self, path):
|
|
"""Announce new Sigma rule as input and start new YAML document."""
|
|
if self.path is None or self.path != path:
|
|
if self.first:
|
|
self.first = False
|
|
else:
|
|
self.f.write("---\n")
|
|
self.path = path
|
|
self.f.write("# Sigma rule: {}\n".format(path))
|
|
|
|
def finish(self):
|
|
self.f.close()
|
|
|
|
class StdoutOutput(SingleFileOutput):
|
|
"""Like SingleFileOutput, just for standard output"""
|
|
def __init__(self):
|
|
self.f = sys.stdout
|
|
self.path = None
|
|
self.first = True
|
|
|
|
def finish(self):
|
|
pass
|
|
|
|
class DirectoryOutput(Output):
|
|
"""Output each input file into a corresponding output file in target directory."""
|
|
def __init__(self, dirpath):
|
|
self.d = dirpath
|
|
self.f = None
|
|
self.path = None
|
|
self.opened = None
|
|
|
|
def new_output(self, path):
|
|
if self.path is None or self.path != path:
|
|
if self.f is not None:
|
|
self.f.close()
|
|
self.path = path
|
|
self.opened = False # opening file is deferred to first write
|
|
|
|
def write(self, *args, **kwargs):
|
|
if not self.opened:
|
|
self.f = (self.d / self.path.name).open("x")
|
|
super().write(*args, **kwargs)
|
|
|
|
def finish(self):
|
|
if self.f is not None:
|
|
self.f.close()
|
|
|
|
def get_output(output):
|
|
if output is None:
|
|
return StdoutOutput()
|
|
|
|
path = Path(output)
|
|
if path.is_dir():
|
|
return DirectoryOutput(path)
|
|
else:
|
|
return SingleFileOutput(output)
|
|
|
|
class AmbiguousRuleException(TypeError):
|
|
def __init__(self, ids):
|
|
super().__init__()
|
|
self.ids = ids
|
|
|
|
def __str__(self):
|
|
return(", ".join([str(eid) for eid in self.ids]))
|
|
|
|
def convert_to_generic(yamldoc):
|
|
changed = False
|
|
try:
|
|
product = yamldoc["logsource"]["product"]
|
|
service = yamldoc["logsource"]["service"]
|
|
except KeyError:
|
|
return False
|
|
|
|
if product == "windows" and service in ("sysmon", "security"):
|
|
# Currently, only Windows Security or Sysmon are relevant
|
|
eventids = set()
|
|
for name, detection in yamldoc["detection"].items(): # first collect all event ids
|
|
if name == "condition" or type(detection) is not dict:
|
|
continue
|
|
|
|
try:
|
|
eventid = detection["EventID"]
|
|
try: # expect that EventID attribute contains a list
|
|
eventids.update(eventid)
|
|
except TypeError: # if this fails, it's a plain value
|
|
eventids.add(eventid)
|
|
except KeyError: # No EventID attribute
|
|
pass
|
|
|
|
if 1 in eventids and service == "sysmon" or \
|
|
4688 in eventids and service == "security":
|
|
if len(eventids) == 1: # only convert if one EventID collected, else it gets complicated
|
|
# remove all EventID definitions
|
|
empty_name = list()
|
|
for name, detection in yamldoc["detection"].items():
|
|
if name == "condition" or type(detection) is not dict:
|
|
continue
|
|
try:
|
|
del detection["EventID"]
|
|
except KeyError:
|
|
pass
|
|
|
|
if detection == {}: # detection was reduced to nothing - remove it later
|
|
empty_name.append(name)
|
|
|
|
for name in empty_name: # delete empty detections
|
|
del yamldoc["detection"][name]
|
|
|
|
if yamldoc["detection"] == {}: # delete detection section if empty
|
|
del yamldoc["detection"]
|
|
|
|
# rewrite log source
|
|
yamldoc["logsource"] = {
|
|
"category": "process_creation",
|
|
"product": "windows"
|
|
}
|
|
|
|
changed = True
|
|
else: # raise an exception to print a warning message to make user aware about the issue
|
|
raise AmbiguousRuleException(eventids)
|
|
return changed
|
|
|
|
def get_input_paths(args):
|
|
if args.recursive:
|
|
return [ p for pathname in args.sigma for p in Path(pathname).glob("**/*") if p.is_file() ]
|
|
else:
|
|
return [ Path(sigma) for sigma in args.sigma ]
|
|
|
|
argparser = ArgumentParser(description="Convert between classical and generic log source Sigma rules.")
|
|
argparser.add_argument("--output", "-o", help="Output file or directory. Default: standard output.")
|
|
argparser.add_argument("--recursive", "-r", action="store_true", help="Recursive traversal of directory")
|
|
argparser.add_argument("--converted-list", "-c", help="Write list of rule files that were successfully converted (default: stdout)")
|
|
argparser.add_argument("sigma", nargs="+", help="Sigma rule file(s) that should be converted")
|
|
args = argparser.parse_args()
|
|
|
|
# Define order-preserving representer from dicts/maps
|
|
def yaml_preserve_order(self, dict_data):
|
|
return self.represent_mapping("tag:yaml.org,2002:map", dict_data.items())
|
|
|
|
yaml.add_representer(dict, yaml_preserve_order)
|
|
|
|
input_paths = get_input_paths(args)
|
|
output = get_output(args.output)
|
|
if args.converted_list:
|
|
fconv = open(args.converted_list, "w")
|
|
else:
|
|
fconv = sys.stdout
|
|
|
|
for path in input_paths:
|
|
try:
|
|
f = path.open("r")
|
|
except OSError as e:
|
|
print("Error while reading Sigma rule {}: {}".format(path, str(e)), file=sys.stderr)
|
|
sys.exit(1)
|
|
|
|
try:
|
|
yamldocs = list(yaml.safe_load_all(f))
|
|
except yaml.YAMLError as e:
|
|
print("YAML parse error while parsing Sigma rule {}: {}".format(path, str(e)), file=sys.stderr)
|
|
sys.exit(2)
|
|
|
|
yamldoc_num = 0
|
|
changed = False
|
|
for yamldoc in yamldocs:
|
|
yamldoc_num += 1
|
|
output.new_output(path)
|
|
try:
|
|
changed |= convert_to_generic(yamldoc)
|
|
except AmbiguousRuleException as e:
|
|
changed = False
|
|
print("Rule {} in file {} contains multiple EventIDs: {}".format(yamldoc_num, str(path), str(e)), file=sys.stderr)
|
|
|
|
yamldocs_idx = list(zip(range(len(yamldocs)), yamldocs))
|
|
delete = set()
|
|
for i, yamldoc_a in yamldocs_idx: # iterate over all yaml document pairs
|
|
for j, yamldoc_b in yamldocs_idx:
|
|
if j <= i: # symmetric relation, skip same comparisons
|
|
continue
|
|
if yamldoc_a == yamldoc_b:
|
|
delete.add(j)
|
|
|
|
for i in reversed(sorted(delete)): # delete double yaml documents
|
|
del yamldocs[i]
|
|
|
|
# Common special case: two yaml docs, one global and one remainder of multiple following docs - merge them
|
|
try:
|
|
if len(yamldocs) == 2 and \
|
|
yamldocs[0]["action"] == "global" and \
|
|
"action" not in yamldocs[1] and \
|
|
set(yamldocs[0].keys()) & set(yamldocs[1].keys()) == set(): # last condition: no common keys
|
|
yamldocs[0].update(yamldocs[1])
|
|
del yamldocs[1]
|
|
except KeyError:
|
|
pass
|
|
|
|
if changed: # only write output if changed
|
|
try:
|
|
output.write(yaml.dump_all(yamldocs, Dumper=SigmaYAMLDumper, indent=4, width=160, default_flow_style=False))
|
|
print(path, file=fconv)
|
|
except OSError as e:
|
|
print("Error while writing result: {}".format(str(e)), file=sys.stderr)
|
|
sys.exit(2)
|
|
|
|
output.finish()
|