Merge pull request #1772 from eocete-devo:master

[Devo backend] Added support for multicondition rules using Devo subqueries
This commit is contained in:
Thomas Patzke 2021-08-13 23:30:04 +02:00 committed by GitHub
commit f9c9f73b09
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 62 additions and 4 deletions

View File

@ -53,6 +53,7 @@ class DevoBackend(SingleTextQueryBackend):
reEscape = re.compile('("|(?<!\\\\)\\\\(?![*?\\\\]))')
derivedField = re.compile('^select .* as (.+)$')
derivedFieldSet = set()
hasMulticondition = False
def __init__(self, sigmaconfig, options):
super().__init__(sigmaconfig)
@ -189,7 +190,10 @@ class DevoBackend(SingleTextQueryBackend):
agg.aggfunc == SigmaAggregationParser.AGGFUNC_AVG):
if agg.groupfield:
group_by = " group by {0}".format(self.fieldNameMapping(agg.groupfield, None))
if self.hasMulticondition:
group_by = " group every - by subquery_link,{0}".format(self.fieldNameMapping(agg.groupfield, None))
else:
group_by = " group by {0}".format(self.fieldNameMapping(agg.groupfield, None))
else:
group_by = ""
@ -206,7 +210,12 @@ class DevoBackend(SingleTextQueryBackend):
else:
derivedFieldsStr = ""
temp_table = "from {}{} where {}{} select {}".format(self.table, derivedFieldsStr, where_clause, group_by, select)
if self.hasMulticondition:
link_select = ' select "link" as subquery_link'
else:
link_select = ""
temp_table = "from {}{} where {}{}{} select {}".format(self.table, derivedFieldsStr, where_clause, link_select, group_by, select)
agg_condition = "agg {} {}".format(agg.cond_op, agg.condition)
return temp_table, agg_condition
@ -227,7 +236,12 @@ class DevoBackend(SingleTextQueryBackend):
else:
derivedFieldsStr = ""
return "from {}{} where {} select *".format(self.table, derivedFieldsStr, result)
if self.hasMulticondition:
select = 'select "link" as subquery_link'
else:
select = "select *"
return "from {}{} where {} {}".format(self.table, derivedFieldsStr, result, select)
def generate(self, sigmaparser):
"""Method is called for each sigma rule and receives the parsed rule (SigmaParser)"""
@ -237,6 +251,12 @@ class DevoBackend(SingleTextQueryBackend):
else:
self.table = "sourcetable"
if len(sigmaparser.condparsed) > 1:
self.hasMulticondition = True
else:
self.hasMulticondition = False
results = []
for parsed in sigmaparser.condparsed:
# Multi condition rules are not supported yet, only the first one will be processed
query = self.generateQuery(parsed)
@ -251,4 +271,17 @@ class DevoBackend(SingleTextQueryBackend):
if after is not None:
result += after
return result
results.append(result)
if self.hasMulticondition:
prefix = 'from siem.logtrust.alert.info select "link" as subquery_link group every 24h by subquery_link where '
suffix = " select *"
for i in range(len(results)):
results[i] = "subquery_link in ( " + results[i]
results[i] += ")"
body = " or ".join(results)
return prefix + body + suffix
return results[0]

View File

@ -214,6 +214,31 @@ class TestDevoBackend(unittest.TestCase):
# Act & Assert
self.validate(detection, expected_result)
def testMulticondition(self):
# Arrange
detection = {"selection1": {"fieldname1": "value1"},
"selection2": {"fieldname2": "value2"},
"condition": ["selection1", "selection2"]}
expected_result = 'from siem.logtrust.alert.info select "link" as subquery_link group every 24h by subquery_link' \
' where subquery_link in ( from ' + self.table + \
' where fieldname1 = "value1" select "link" as subquery_link) or subquery_link in ( from ' + self.table + \
' where fieldname2 = "value2" select "link" as subquery_link) select *'
# Act & Assert
self.validate(detection, expected_result)
def testMulticonditionAgg(self):
# Arrange
detection = {"selection1": {"fieldname1": "value1"},
"selection2": {"fieldname2": "value2"},
"condition": ["selection1 | count(fieldname1) by fieldname2 > 3", "selection2 | count(fieldname3) by fieldname4 > 3"]}
expected_result = 'from siem.logtrust.alert.info select "link" as subquery_link group every 24h by subquery_link' \
' where subquery_link in ( from ' + self.table + ' where fieldname1 = "value1" select "link" as' \
' subquery_link group every - by subquery_link,fieldname2 select count(fieldname1) as agg where agg > 3 select *)' \
' or subquery_link in ( from ' + self.table + ' where fieldname2 = "value2" select "link" as ' \
'subquery_link group every - by subquery_link,fieldname4 select count(fieldname3) as agg where agg > 3 select *) select *'
# Act & Assert
self.validate(detection, expected_result)
def validate(self, detection, expectation):
config = SigmaConfiguration()