mirror of
https://github.com/valitydev/SigmaHQ.git
synced 2024-11-07 01:45:21 +00:00
sigmac: improved backend options
* parsing in main class * help
This commit is contained in:
parent
bd20ffdad9
commit
4a9849b161
@ -78,6 +78,7 @@ class BaseBackend:
|
||||
index_field = None # field name that is used to address indices
|
||||
output_class = None # one of the above output classes
|
||||
file_list = None
|
||||
options = tuple() # a list of tuples with following elements: option name, default value, help text, target attribute name (option name if None)
|
||||
|
||||
def __init__(self, sigmaconfig, backend_options=None, filename=None):
|
||||
"""
|
||||
@ -87,11 +88,17 @@ class BaseBackend:
|
||||
super().__init__()
|
||||
if not isinstance(sigmaconfig, (sigma.config.SigmaConfiguration, None)):
|
||||
raise TypeError("SigmaConfiguration object expected")
|
||||
self.options = backend_options
|
||||
self.backend_options = backend_options
|
||||
self.sigmaconfig = sigmaconfig
|
||||
self.sigmaconfig.set_backend(self)
|
||||
self.output = self.output_class(filename)
|
||||
|
||||
# Parse options
|
||||
for option, default_value, _, target in self.options:
|
||||
if target is None:
|
||||
target = option
|
||||
setattr(self, target, self.backend_options.setdefault(option, default_value))
|
||||
|
||||
def generate(self, sigmaparser):
|
||||
"""Method is called for each sigma rule and receives the parsed rule (SigmaParser)"""
|
||||
for parsed in sigmaparser.condparsed:
|
||||
@ -311,15 +318,17 @@ class KibanaBackend(ElasticsearchQuerystringBackend, MultiRuleOutputMixin):
|
||||
identifier = "kibana"
|
||||
active = True
|
||||
output_class = SingleOutput
|
||||
options = (
|
||||
("output", "import", "Output format: import = JSON file manually imported in Kibana, curl = Shell script that imports queries in Kibana via curl (jq is additionally required)", "output_type"),
|
||||
("es", "localhost:9200", "Host and port of Elasticsearch instance", None),
|
||||
("index", ".kibana", "Kibana index", None),
|
||||
("prefix", "Sigma: ", "Title prefix of Sigma queries", None),
|
||||
)
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
super().__init__(*args, **kwargs)
|
||||
self.kibanaconf = list()
|
||||
self.indexsearch = set()
|
||||
self.output_type = self.options.setdefault("output", "import")
|
||||
self.es = self.options.setdefault("es", "localhost:9200")
|
||||
self.index = self.options.setdefault("index", ".kibana")
|
||||
self.prefix = self.options.setdefault("prefix", "Sigma: ")
|
||||
|
||||
def generate(self, sigmaparser):
|
||||
rulename = self.getRuleName(sigmaparser)
|
||||
@ -426,19 +435,15 @@ class XPackWatcherBackend(ElasticsearchQuerystringBackend, MultiRuleOutputMixin)
|
||||
identifier = "xpack-watcher"
|
||||
active = True
|
||||
output_class = SingleOutput
|
||||
options = (
|
||||
("output", "curl", "Output format: curl = Shell script that imports queries in Watcher index with curl", "output_type"),
|
||||
("es", "localhost:9200", "Host and port of Elasticsearch instance", None),
|
||||
("mail", None, "Mail address for Watcher notification (only logging if not set)", None),
|
||||
)
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
super().__init__(*args, **kwargs)
|
||||
self.watcher_alert = dict()
|
||||
try:
|
||||
self.output_type = self.options["output"]
|
||||
except KeyError:
|
||||
self.output_type = "curl"
|
||||
|
||||
try:
|
||||
self.es = self.options["es"]
|
||||
except KeyError:
|
||||
self.es = "localhost:9200"
|
||||
|
||||
def generate(self, sigmaparser):
|
||||
# get the details if this alert occurs
|
||||
@ -546,7 +551,7 @@ class XPackWatcherBackend(ElasticsearchQuerystringBackend, MultiRuleOutputMixin)
|
||||
# Building the action
|
||||
action_subject = "Sigma Rule '%s'" % title
|
||||
try: # mail notification if mail address is given
|
||||
email = self.options['mail']
|
||||
email = self.mail
|
||||
action = {
|
||||
"send_email": {
|
||||
"email": {
|
||||
|
14
tools/sigmac
14
tools/sigmac
@ -54,7 +54,19 @@ def get_inputs(paths, recursive):
|
||||
else:
|
||||
return [pathlib.Path(p) for p in paths]
|
||||
|
||||
argparser = argparse.ArgumentParser(description="Convert Sigma rules into SIEM signatures.")
|
||||
class SigmacArgumentParser(argparse.ArgumentParser):
|
||||
def format_help(self):
|
||||
helptext = super().format_help() + "\nBackend options:\n"
|
||||
|
||||
for backend in backends.getBackendList():
|
||||
if len(backend.options) > 0:
|
||||
helptext += " " + backend.identifier + "\n"
|
||||
for option, default, help, _ in backend.options:
|
||||
helptext += " {:10}: {} (default: {})".format(option, help, default) + "\n"
|
||||
|
||||
return helptext
|
||||
|
||||
argparser = SigmacArgumentParser(description="Convert Sigma rules into SIEM signatures.")
|
||||
argparser.add_argument("--recurse", "-r", action="store_true", help="Recurse into subdirectories (not yet implemented)")
|
||||
argparser.add_argument("--filter", "-f", help="""
|
||||
Define comma-separated filters that must match (AND-linked) to rule to be processed.
|
||||
|
Loading…
Reference in New Issue
Block a user