diff --git a/tools/sigma/backends/elasticsearch.py b/tools/sigma/backends/elasticsearch.py index dbb51d10..0c9a42ec 100644 --- a/tools/sigma/backends/elasticsearch.py +++ b/tools/sigma/backends/elasticsearch.py @@ -17,6 +17,7 @@ import json import re import sigma +import yaml from .base import BaseBackend, SingleTextQueryBackend from .mixins import RulenameCommentMixin, MultiRuleOutputMixin from .exceptions import NotSupportedError @@ -497,3 +498,161 @@ class XPackWatcherBackend(ElasticsearchQuerystringBackend, MultiRuleOutputMixin) else: raise NotImplementedError("Output type '%s' not supported" % self.output_type) return result + +class ElastalertBackend(MultiRuleOutputMixin, ElasticsearchQuerystringBackend): + """Elastalert backend""" + identifier = 'elastalert' + active = True + options = ( + ("emails", None, "Email addresses for Elastalert notification, if you want to alert several email addresses put them coma separated", None), + ("smtp_host", None, "SMTP server address", None), + ("from_addr", None, "Email sender address", None), + ("smtp_auth_file", None, "Local path with login info", None), + ("realert_time", "0m", "Ignore repeating alerts for a period of time", None), + ("expo_realert_time", "60m", "This option causes the value of realert to exponentially increase while alerts continue to fire", None) + ) + interval = None + title = None + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.elastalert_alerts = dict() + self.fields = [] + + def generate(self, sigmaparser): + rulename = self.getRuleName(sigmaparser) + title = sigmaparser.parsedyaml.setdefault("title", "") + description = sigmaparser.parsedyaml.setdefault("description", "") + false_positives = sigmaparser.parsedyaml.setdefault("falsepositives", "") + level = sigmaparser.parsedyaml.setdefault("level", "") + rule_tag = sigmaparser.parsedyaml.setdefault("tags", ["NOT-DEF"]) + # Get time frame if exists + interval = self.generateTimeframe(sigmaparser.parsedyaml["detection"].setdefault("timeframe", "30m")) + # creating condition + index = sigmaparser.get_logsource().index + if len(index) == 0: # fallback if no index is given + index = "logstash-*" + elif len(index) > 0: + index = index[0] + #Init a rule number cpt in case there are several elastalert rules generated fron one Sigma rule + rule_number = 0 + for parsed in sigmaparser.condparsed: + #Static data + rule_object = { + "name": rulename + "_" + str(rule_number), + "description": description, + "index": index, + "priority": self.convertLevel(level), + "realert": self.generateTimeframe(self.realert_time), + #"exponential_realert": self.generateTimeframe(self.expo_realert_time) + } + rule_object['filter'] = self.generateQuery(parsed) + + #Handle aggregation + if parsed.parsedAgg: + if parsed.parsedAgg.aggfunc == sigma.parser.condition.SigmaAggregationParser.AGGFUNC_COUNT or parsed.parsedAgg.aggfunc == sigma.parser.condition.SigmaAggregationParser.AGGFUNC_MIN or parsed.parsedAgg.aggfunc == sigma.parser.condition.SigmaAggregationParser.AGGFUNC_MAX or parsed.parsedAgg.aggfunc == sigma.parser.condition.SigmaAggregationParser.AGGFUNC_AVG or parsed.parsedAgg.aggfunc == sigma.parser.condition.SigmaAggregationParser.AGGFUNC_SUM: + rule_object['query_key'] = parsed.parsedAgg.groupfield + rule_object['type'] = "metric_aggregation" + rule_object['buffer_time'] = interval + rule_object['doc_type'] = "doc" + + if parsed.parsedAgg.aggfunc == sigma.parser.condition.SigmaAggregationParser.AGGFUNC_COUNT: + rule_object['metric_agg_type'] = "cardinality" + else: + rule_object['metric_agg_type'] = parsed.parsedAgg.aggfunc_notrans + + if parsed.parsedAgg.aggfield: + rule_object['metric_agg_key'] = parsed.parsedAgg.aggfield + else: + rule_object['metric_agg_key'] = "_id" + + condition_value = int(parsed.parsedAgg.condition) + if parsed.parsedAgg.cond_op == ">": + rule_object['max_threshold'] = condition_value + elif parsed.parsedAgg.cond_op == ">=": + rule_object['max_threshold'] = condition_value - 1 + elif parsed.parsedAgg.cond_op == "<": + rule_object['min_threshold'] = condition_value + elif parsed.parsedAgg.cond_op == "<=": + rule_object['min_threshold'] = condition_value - 1 + else: + rule_object['max_threshold'] = condition_value - 1 + rule_object['min_threshold'] = condition_value + 1 + else: + rule_object['type'] = "any" + + #Handle alert action + rule_object['alert'] = [] + if self.emails: + rule_object['alert'].append('email') + rule_object['email'] = [] + for address in self.emails.split(','): + rule_object['email'].append(address) + if self.smtp_host: + rule_object['smtp_host'] = self.smtp_host + if self.from_addr: + rule_object['from_addr'] = self.from_addr + if self.smtp_auth_file: + rule_object['smtp_auth_file'] = self.smtp_auth_file + #If alert is not define put debug as default + if len(rule_object['alert']) == 0: + rule_object['alert'].append('debug') + + #Increment rule number + rule_number += 1 + self.elastalert_alerts[rule_object['name']] = rule_object + #Clear fields + self.fields = [] + + def generateQuery(self, parsed): + #Generate ES QS Query + return [{ 'query' : { 'query_string' : { 'query' : super().generateQuery(parsed) } } }] + + def generateNode(self, node): + #Save fields for adding them in query_key + #if type(node) == sigma.parser.NodeSubexpression: + # for k,v in node.items.items: + # self.fields.append(k) + return super().generateNode(node) + + def generateTimeframe(self, timeframe): + time_unit = timeframe[-1:] + duration = timeframe[:-1] + timeframe_object = {} + if time_unit == "s": + timeframe_object['seconds'] = int(duration) + elif time_unit == "m": + timeframe_object['minutes'] = int(duration) + elif time_unit == "h": + timeframe_object['hours'] = int(duration) + elif time_unit == "d": + timeframe_object['days'] = int(duration) + else: + timeframe_object['months'] = int(duration) + return timeframe_object + + def generateAggregation(self, agg): + if agg: + if agg.aggfunc == sigma.parser.condition.SigmaAggregationParser.AGGFUNC_COUNT or agg.aggfunc == sigma.parser.condition.SigmaAggregationParser.AGGFUNC_MIN or agg.aggfunc == sigma.parser.condition.SigmaAggregationParser.AGGFUNC_MAX or agg.aggfunc == sigma.parser.condition.SigmaAggregationParser.AGGFUNC_AVG or agg.aggfunc == sigma.parser.condition.SigmaAggregationParser.AGGFUNC_SUM: + return "" + else: + for name, idx in agg.aggfuncmap.items(): + if idx == agg.aggfunc: + funcname = name + break + raise NotImplementedError("%s : The '%s' aggregation operator is not yet implemented for this backend"%(self.title, funcname)) + + def convertLevel(self, level): + return { + 'critical': 1, + 'high': 2, + 'medium': 3, + 'low': 4 + }.get(level, 2) + + def finalize(self): + result = "" + for rulename, rule in self.elastalert_alerts.items(): + result += yaml.dump(rule, default_flow_style=False) + result += '\n' + return result