Merge pull request #1566 from eocete-devo/master

New backend for Devo queries
This commit is contained in:
Thomas Patzke 2021-06-22 12:23:36 +02:00 committed by GitHub
commit befdcda507
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 704 additions and 1 deletions

View File

@ -131,7 +131,7 @@ optional arguments:
tag that must appear in the rules tag list, case-
insensitive matching. Multiple log source
specifications are AND linked.
--target {arcsight,es-qs,es-dsl,kibana,xpack-watcher,elastalert,graylog,limacharlie,logpoint,grep,netwitness,powershell,qradar,qualys,splunk,splunkxml,sumologic,fieldlist,mdatp}, -t {arcsight,es-qs,es-dsl,kibana,xpack-watcher,elastalert,graylog,limacharlie,logpoint,grep,netwitness,powershell,qradar,qualys,splunk,splunkxml,sumologic,fieldlist,mdatp}
--target {arcsight,es-qs,es-dsl,kibana,xpack-watcher,elastalert,graylog,limacharlie,logpoint,grep,netwitness,powershell,qradar,qualys,splunk,splunkxml,sumologic,fieldlist,mdatp,devo}, -t {arcsight,es-qs,es-dsl,kibana,xpack-watcher,elastalert,graylog,limacharlie,logpoint,grep,netwitness,powershell,qradar,qualys,splunk,splunkxml,sumologic,fieldlist,mdatp,devo}
Output target format
--target-list, -l List available output target formats
--config CONFIG, -c CONFIG
@ -211,6 +211,7 @@ tools/sigmac -t splunk -c ~/my-splunk-mapping.yml -c tools/config/generic/window
* [Structured Threat Information Expression (STIX)](https://oasis-open.github.io/cti-documentation/stix/intro.html)
* [LOGIQ](https://www.logiq.ai)
* [uberAgent ESA](https://uberagent.com/)
* [Devo](https://devo.com)
Current work-in-progress
* [Splunk Data Models](https://docs.splunk.com/Documentation/Splunk/7.1.0/Knowledge/Aboutdatamodels)

View File

@ -346,4 +346,20 @@ tools/sigmac -t es-qs -c tools/config/winlogbeat.yml --backend-option keyword_ba
```bash
tools/sigmac -t es-qs -c tools/config/winlogbeat.yml --backend-option keyword_field=".keyword" --backend-option analyzed_sub_field_name=".security" rules/windows/sysmon/sysmon_wmi_susp_scripting.yml
```
### Devo
Devo backend admits several configurations that, based on the data source type, will apply a specific mapping and
will point to the proper Devo table. The current available configurations are:
* `devo-windows`, for windows sources
* `devo-web`, for generic web sources (webserver, apache, proxy...)
* `devo-network`, for generic network sources (firewall, dns...)
These backend configurations will specify the Devo table to build the query upon, and the output query will reference such
table if the rule sources matches the configuration sources.
For example, in order to translate a windows-related Sigma rule, one would use:
```bash
tools/sigmac -t devo -c tools/config/devo-windows.yml rules/windows/sysmon/sysmon_wmi_susp_scripting.yml
```

View File

@ -0,0 +1,22 @@
title: Devo sourcetype mappings for network sources
order: 20
backends:
- devo
logsources:
firewall-product:
product: firewall
index: firewall.all.traffic
firewall-category:
category: firewall
index: firewall.all.traffic
dns:
category: dns
index: network.dns
fieldmappings:
src_ip: srcIp
dst_ip: dstIp
dst_port: dstPort
parent_domain: select rootdomain(name) as parent_domain
record_type: type
answer: answers
query: name

29
tools/config/devo-web.yml Normal file
View File

@ -0,0 +1,29 @@
title: Devo sourcetype mappings for web sources
order: 20
backends:
- devo
logsources:
web:
category: webserver
index: web.all.access
proxy:
category: proxy
index: proxy.all.access
apache:
product: apache
index: web.all.access
fieldmappings:
c-uri: url
c-useragent: userAgent
sc-status: statusCode
useragent: userAgent
cs-method: method
clientip: srcIp
uri_query: select uriquery(url) as url_query
r-dns: select urihost(url) as url_dns
cs-host: srcHost
c-uri-query: select uriquery(url) as url_query
c-uri-stem: url
c-uri-extension: select uripath(url) as uri_path
cs-uri-query: select uriquery(url) as url_query

View File

@ -0,0 +1,144 @@
title: Devo sourcetype mappings for windows sources
order: 20
backends:
- devo
logsources:
windows:
product: windows
index: box.all.win
windows-category-process_creation:
product: windows
category: process_creation
windows-service-powershell:
product: windows
service: powershell
windows-service-powershell-classic:
product: windows
service: powershell-classic
windows-service-security:
product: windows
service: security
windows-service-sysmon:
product: windows
service: security
windows-category-registry_event:
product: windows
category: registry_event
windows-category-process_access:
product: windows
category: process_access
windows-service-windefend:
product: windows
service: windefend
windows-service-windef:
product: windows
service: windef
windows_defender:
product: windows_defender
index: box.all.win
windows-service-taskscheduler:
product: windows
service: taskscheduler
windows-service-wmi:
product: windows
service: wmi
windows-service-system:
product: windows
service: system
windows-category-network_connection:
product: windows
category: network_connection
windows-category-image_load:
product: windows
category: image_load
windows-category-file_event:
product: windows
category: file_event
windows-category-driver_load:
product: windows
category: driver_load
windows-service-applocker:
product: windows
service: applocker
windows-service-dns-server:
product: windows
service: dns-server
windows-service-ntlm:
product: windows
service: ntlm
windows-service-driver-framework:
product: windows
service: driver-framework
windows-category-create_remote_thread:
product: windows
category: create_remote_thread
windows-category-create_stream_hash:
product: windows
category: create_stream_hash
windows-category-dns_query:
product: windows
category: dns_query
windows-category-file_delete:
product: windows
category: file_delete
windows-category-pipe_created:
product: windows
category: pipe_created
windows-category-raw_access_thread:
product: windows
category: raw_access_thread
windows-category-wmi_event:
product: windows
category: wmi_event
fieldmappings:
EventID: eventID
HostName: machine
HostApplication: ProcessName # ???
Message: message
CommandLine: procCmdLine
Commandline: procCmdLine
ProcessCommandline: procCmdLine
ProcessCommandLine: procCmdLine
Image: serviceFileName
User: username
TaskName: category
TargetFilename: serviceFileName # ???
ServiceName: service
ProcessName: callerProcName
OriginalFilename: serviceFileName
OriginalFileName: serviceFileName
MachineName: machine
LogonId: subjectLogonId
GroupName: groupName
EventType: eventType
Description: message
Details: extMessage
ObjectName: objName
CreatorProcessName: parentProcessName
ServiceFileName: serviceFileName
ObjectType: objType
Keywords: keywords
SubjectLogonId: subjectLogonId
UserName: username
Status: status
SourceNetworkAddress: srcIp
AccountName: account
ObjectValueName: objValueName
LogonProcessName: procName
TargetUserName: targetUsername
WorkstationName: workstation
SubjectUserName: subjectUsername
Source: sourceName
Destination: dstIp
TargetImage: serviceFileName
CallingProcessName: callerProcName
TargetName: targetUsername
FileName: serviceFileName
TargetObject: objName
DestinationHostname: machine
DestinationIp: dstIp
DestinationIsIpv6: dstIp
ImageLoaded: serviceFileName
ScriptBlockText: select str(jqeval(jqcompile(".columns.data.EventData.ScriptBlockText"), jsonparse(message))) as ScriptBlockText
DestinationPort: select int(trim(split(split(rawMessage, "Destination Port:", 1), "&", 0))) as destinationPort / where eventID > 5100 or eventID < 5199

View File

@ -0,0 +1,254 @@
# Output backends for sigmac
# Copyright 2021 Devo, Inc.
# Author: Eduardo Ocete <eduardo.ocete@devo.com>
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import re
from .base import SingleTextQueryBackend
from sigma.parser.modifiers.type import SigmaRegularExpressionModifier
from sigma.parser.condition import SigmaAggregationParser
from sigma.parser.exceptions import SigmaParseError
class DevoBackend(SingleTextQueryBackend):
"""Converts Sigma rule into Devo query."""
identifier = "devo"
active = True
andToken = " and " # Token used for linking expressions with logical AND
orToken = " or " # Same for OR
notToken = " not " # Same for NOT
subExpression = "(%s)" # Syntax for subexpressions, usually parenthesis around it. %s is inner expression
listExpression = "%s" # Syntax for lists, %s are list items separated with listSeparator
listSeparator = ", " # Character for separation of list items
valueExpression = "\"%s\"" # Expression of values, %s represents value
intValueExpression = "%s" # Expression of int values, %s represents value
nullExpression = "isnull(%s)" # Expression of queries for null values or non-existing fields. %s is field name
notNullExpression = "isnotnull(%s)" # Expression of queries for not null values. %s is field name
mapExpression = "%s = %s" # Syntax for field/value conditions. First %s is fieldname, second is value
mapMulti = "has(%s, %s)" # Syntax for field/value conditions. First %s is fieldname, second is value
mapWildcard = "matches(%s, nameglob(%s))" # Syntax for globbing conditions
mapRe = "matches(%s, %s)" # Syntax for regex conditions that already were transformed by SigmaRegularExpressionModifier
mapContains = "toktains(%s, %s, true, true)" # Systax for token value searches
mapListValueExpression = "%s or %s" # Syntax for field/value condititons where map value is a list
mapFullTextSearch = "weaktoktains(raw, \"%s\", true, true)" # Expression for full text searches
typedValueExpression = {
SigmaRegularExpressionModifier: "re(\"%s\")", # Syntax for regular expressions
}
# \ -> \\
# \* -> \*
# \\* -> \\*
reEscape = re.compile('("|(?<!\\\\)\\\\(?![*?\\\\]))')
derivedField = re.compile('^select .* as (.+)$')
derivedFieldSet = set()
def __init__(self, sigmaconfig, options):
super().__init__(sigmaconfig)
# Default table name. It is replaced based on the config file
self.table = "sourcetable"
def generateANDNode(self, node):
generated = []
for val in node:
if self.requireFTS(val):
generated.append(self.generateFTS(val))
else:
generated.append(self.generateNode(val))
filtered = [g for g in generated if g is not None]
if filtered:
return self.andToken.join(filtered)
else:
return None
def generateORNode(self, node):
generated = []
for val in node:
if self.requireFTS(val):
generated.append(self.generateFTS(val))
else:
generated.append(self.generateNode(val))
filtered = [g for g in generated if g is not None]
if filtered:
return self.orToken.join(filtered)
else:
return None
def generateNOTNode(self, node):
if self.requireFTS(node.item):
generated = self.generateFTS(node.item)
else:
generated = self.generateNode(node.item)
if generated is not None:
return self.notToken + generated
else:
return None
def generateSubexpressionNode(self, node):
generated = self.generateNode(node.items)
if generated:
return self.subExpression % generated
else:
return None
def generateListNode(self, node):
if not set([type(value) for value in node]).issubset({str, int}):
raise TypeError("List values must be strings or numbers")
return self.listExpression % (self.listSeparator.join([self.generateNode(value) for value in node]))
def generateMapItemNode(self, node):
fieldname, value = node
transformed_fieldname = self.fieldNameMapping(fieldname, value)
if not value:
# Handle value == None
return self.generateNULLValueNode(transformed_fieldname)
has_startswith = self.generateNode(value).startswith("\"*")
has_endswith = self.generateNode(value).endswith("*\"")
has_contains = has_startswith and has_endswith and len(self.generateNode(value)) > 3 # Covers "*" case
if type(value) == SigmaRegularExpressionModifier:
return self.mapRe % (transformed_fieldname, self.generateNode(value))
elif type(value) == list:
if has_contains:
return self.subExpression % self.andToken.join(self.mapContains % (transformed_fieldname, self.generateNode(val[1:-1])) for val in value)
elif has_startswith or has_endswith:
return self.generateMapItemListNode(transformed_fieldname, value)
else:
return self.mapMulti % (transformed_fieldname, self.generateNode(value))
elif type(value) in (str, int):
if has_contains:
return self.mapContains % (transformed_fieldname, self.generateNode(value[1:-1]))
elif has_startswith or has_endswith:
return self.mapWildcard % (transformed_fieldname, self.generateNode(value))
else:
return self.mapExpression % (transformed_fieldname, self.generateNode(value))
else:
raise TypeError("Devo backend does not support map values of type " + str(type(value)))
def generateMapItemListNode(self, key, value):
return "(" + (" or ".join([self.mapWildcard % (key, self.generateValueNode(item)) for item in value])) + ")"
def generateValueNode(self, node):
if type(node) == int:
return self.intValueExpression % int(node)
return self.valueExpression % (self.cleanValue(node))
def generateNULLValueNode(self, fieldname):
return self.nullExpression % fieldname
def generateNotNULLValueNode(self, fieldname):
return self.notNullExpression % fieldname
def generateTypedValueNode(self, node):
try:
return self.typedValueExpression[type(node)] % (self.cleanValue(str(node)))
except KeyError:
raise NotImplementedError("Type modifier '{}' is not supported by backend".format(node.identifier))
def generateFTS(self, value):
return self.mapFullTextSearch % self.cleanValue(value)
def requireFTS(self, value):
return isinstance(value, str) or isinstance(value, int) or isinstance(value, list)
def fieldNameMapping(self, field, value):
# Handle derived fields
matched = self.derivedField.search(field)
if matched:
self.derivedFieldSet.add(field)
return matched.group(1)
return field
def generateAggregation(self, agg, where_clause):
if not agg:
return self.table, where_clause
# Near operator not supported yet
if agg.aggfunc == SigmaAggregationParser.AGGFUNC_NEAR:
raise NotImplementedError("The 'near' aggregation operator is not implemented for the %s backend" % self.identifier)
if (agg.aggfunc == SigmaAggregationParser.AGGFUNC_COUNT or
agg.aggfunc == SigmaAggregationParser.AGGFUNC_MAX or
agg.aggfunc == SigmaAggregationParser.AGGFUNC_MIN or
agg.aggfunc == SigmaAggregationParser.AGGFUNC_SUM or
agg.aggfunc == SigmaAggregationParser.AGGFUNC_AVG):
if agg.groupfield:
group_by = " group by {0}".format(self.fieldNameMapping(agg.groupfield, None))
else:
group_by = ""
if agg.aggfield:
select = "{}({}) as agg".format(agg.aggfunc_notrans, self.fieldNameMapping(agg.aggfield, None))
else:
if agg.aggfunc == SigmaAggregationParser.AGGFUNC_COUNT:
select = "{}(*) as agg".format(agg.aggfunc_notrans)
else:
raise SigmaParseError("For {} aggregation a fieldname needs to be specified".format(agg.aggfunc_notrans))
if self.derivedFieldSet:
derivedFieldsStr = " {}".format(" ".join(self.derivedFieldSet))
else:
derivedFieldsStr = ""
temp_table = "from {}{} where {}{} select {}".format(self.table, derivedFieldsStr, where_clause, group_by, select)
agg_condition = "agg {} {}".format(agg.cond_op, agg.condition)
return temp_table, agg_condition
raise NotImplementedError("{} aggregation not implemented in Devo Backend".format(agg.aggfunc_notrans))
def generateQuery(self, parsed):
if self.requireFTS(parsed.parsedSearch):
result = self.generateFTS(parsed.parsedSearch)
else:
result = self.generateNode(parsed.parsedSearch)
if parsed.parsedAgg:
fro, whe = self.generateAggregation(parsed.parsedAgg, result)
return "{} where {} select *".format(fro, whe)
if self.derivedFieldSet:
derivedFieldsStr = " {}".format(" ".join(self.derivedFieldSet))
else:
derivedFieldsStr = ""
return "from {}{} where {} select *".format(self.table, derivedFieldsStr, result)
def generate(self, sigmaparser):
"""Method is called for each sigma rule and receives the parsed rule (SigmaParser)"""
self.derivedFieldSet = set()
if sigmaparser.get_logsource() and sigmaparser.get_logsource().index:
self.table = sigmaparser.get_logsource().index[0]
else:
self.table = "sourcetable"
for parsed in sigmaparser.condparsed:
# Multi condition rules are not supported yet, only the first one will be processed
query = self.generateQuery(parsed)
before = self.generateBefore(parsed)
after = self.generateAfter(parsed)
result = ""
if before is not None:
result = before
if query is not None:
result += query
if after is not None:
result += after
return result

View File

@ -0,0 +1,237 @@
# Test output backends for sigmac
# Copyright 2021 Devo, Inc.
# Author: Eduardo Ocete <eduardo.ocete@devo.com>
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import unittest
from unittest.mock import patch
from sigma.backends.devo import DevoBackend
from sigma.parser.collection import SigmaCollectionParser
from sigma.configuration import SigmaConfiguration
class TestDevoBackend(unittest.TestCase):
def setUp(self):
self.basic_rule = {"title": "Devo Backend Test", "level": "testing"}
self.table = "sourcetable"
def testPlain(self):
# Int value
detection = {"selection1": {"fieldname1": 1},
"condition": "selection1"}
expected_result = 'from {} where fieldname1 = 1 select *'.format(self.table)
self.validate(detection, expected_result)
# String value
detection = {"selection1": {"fieldname1": "value1"},
"condition": "selection1"}
expected_result = 'from {} where fieldname1 = "value1" select *'.format(self.table)
self.validate(detection, expected_result)
# Int array value
detection = {"selection1": {"fieldname1": [1, 2, 3]},
"condition": "selection1"}
expected_result = 'from {} where has(fieldname1, 1, 2, 3) select *'.format(self.table)
self.validate(detection, expected_result)
# String array value
detection = {"selection1": {"fieldname1": ["value1", "value2", "value3"]},
"condition": "selection1"}
expected_result = 'from {} where has(fieldname1, "value1", "value2", "value3") select *'.format(self.table)
self.validate(detection, expected_result)
# Simple and
detection = {"selection1": {"fieldname1": ["value1", "value2", "value3"],
"fieldname2": "value5"},
"condition": "selection1"}
expected_result = 'from {} where (has(fieldname1, "value1", "value2", "value3") and fieldname2 = "value5") select *'.format(self.table)
self.validate(detection, expected_result)
# Selection and
detection = {"selection1": {"fieldname1": [1, 2, 3]},
"selection2": {"fieldname2": "value5"},
"condition": "selection1 and selection2"}
expected_result = 'from {} where (has(fieldname1, 1, 2, 3) and fieldname2 = "value5") select *'.format(self.table)
self.validate(detection, expected_result)
# Selection or
detection = {"selection1": {"fieldname1": [1, 2, 3]},
"selection2": {"fieldname2": "value5"},
"condition": "selection1 or selection2"}
expected_result = 'from {} where (has(fieldname1, 1, 2, 3) or fieldname2 = "value5") select *'.format(self.table)
self.validate(detection, expected_result)
# Selection one of them
detection = {"selection1": {"fieldname1": [1, 2, 3]},
"selection2": {"fieldname2": "value5"},
"condition": "1 of them"}
expected_result = 'from {} where (has(fieldname1, 1, 2, 3) or fieldname2 = "value5") select *'.format(self.table)
self.validate(detection, expected_result)
# Selection all of them
detection = {"selection1": {"fieldname1": [1, 2, 3]},
"selection2": {"fieldname2": "value5"},
"condition": "all of them"}
expected_result = 'from {} where (has(fieldname1, 1, 2, 3) and fieldname2 = "value5") select *'.format(self.table)
self.validate(detection, expected_result)
# Negation
detection = {"selection1": {"fieldname1": [1, 2, 3]},
"selection2": {"fieldname2": "value5"},
"condition": "selection1 and not selection2"}
expected_result = 'from {} where (has(fieldname1, 1, 2, 3) and not (fieldname2 = "value5")) select *'.format(self.table)
self.validate(detection, expected_result)
def testModifiers(self):
# Contains
detection = {"selection1": {"fieldname1|contains": "value1"},
"condition": "selection1"}
expected_result = 'from {} where toktains(fieldname1, "value1", true, true) select *'.format(self.table)
self.validate(detection, expected_result)
# StartsWith
detection = {"selection1": {"fieldname1|startswith": "value1"},
"condition": "selection1"}
expected_result = 'from {} where matches(fieldname1, nameglob("value1*")) select *'.format(self.table)
self.validate(detection, expected_result)
# EndsWith
detection = {"selection1": {"fieldname1|endswith": "value1"},
"condition": "selection1"}
expected_result = 'from {} where matches(fieldname1, nameglob("*value1")) select *'.format(self.table)
self.validate(detection, expected_result)
# All
detection = {"selection1": {"fieldname1|all": ["value1", "value2"]},
"condition": "selection1"}
expected_result = 'from {} where (fieldname1 = "value1" and fieldname1 = "value2") select *'.format(self.table)
self.validate(detection, expected_result)
def testAggregations(self):
# Count
detection = {"selection1": {"fieldname1": "value1"},
"condition": "selection1 | count() > 1"}
expected_result = 'from {} where fieldname1 = "value1" select count(*) as agg where agg > 1 select *'.format(self.table)
self.validate(detection, expected_result)
# Min
detection = {"selection1": {"fieldname1": "value1"},
"condition": "selection1 | min(fieldname2) by fieldname3 > 5"}
expected_result = 'from {} where fieldname1 = "value1" group by fieldname3 select min(fieldname2) as agg where agg > 5 select *'.format(self.table)
self.validate(detection, expected_result)
# Max
detection = {"selection1": {"fieldname1": "value1"},
"condition": "selection1 | max(fieldname2) by fieldname3 > 5"}
expected_result = 'from {} where fieldname1 = "value1" group by fieldname3 select max(fieldname2) as agg where agg > 5 select *'.format(self.table)
self.validate(detection, expected_result)
# Avg
detection = {"selection1": {"fieldname1": "value1"},
"condition": "selection1 | avg(fieldname2) by fieldname3 > 5"}
expected_result = 'from {} where fieldname1 = "value1" group by fieldname3 select avg(fieldname2) as agg where agg > 5 select *'.format(self.table)
self.validate(detection, expected_result)
# sum
detection = {"selection1": {"fieldname1": "value1"},
"condition": "selection1 | sum(fieldname2) by fieldname3 > 5"}
expected_result = 'from {} where fieldname1 = "value1" group by fieldname3 select sum(fieldname2) as agg where agg > 5 select *'.format(self.table)
self.validate(detection, expected_result)
# <
detection = {"selection1": {"fieldname1": "value1"},
"condition": "selection1 | sum(fieldname2) by fieldname3 < 5"}
expected_result = 'from {} where fieldname1 = "value1" group by fieldname3 select sum(fieldname2) as agg where agg < 5 select *'.format(self.table)
self.validate(detection, expected_result)
# ==
detection = {"selection1": {"fieldname1": "value1"},
"condition": "selection1 | sum(fieldname2) by fieldname3 == 5"}
expected_result = 'from {} where fieldname1 = "value1" group by fieldname3 select sum(fieldname2) as agg where agg == 5 select *'.format(self.table)
self.validate(detection, expected_result)
# Multiple conditions
detection = {"selection1": {"fieldname1": "value1"},
"selection2": {"fieldname2": "*", "fieldname3": "*"},
"condition": "selection1 or selection2 | count(fieldname4) by fieldname5 > 3"}
expected_result = 'from {} where (fieldname1 = "value1" or (matches(fieldname2, nameglob("*")) and matches(fieldname3, nameglob("*")))) group by fieldname5 select count(fieldname4) as agg where agg > 3 select *'.format(self.table)
self.validate(detection, expected_result)
def testFullTextSearch(self):
# Single str FTS
detection = {"selection1": ["value1"],
"condition": "selection1"}
expected_result = 'from {} where weaktoktains(raw, "value1", true, true) select *'.format(self.table)
self.validate(detection, expected_result)
# OR node FTS
detection = {"selection1": {"fieldname1": "value1"},
"selection2|contains": ["value2", "value3"],
"condition": "1 of them"}
expected_result = 'from {} where (fieldname1 = "value1" or weaktoktains(raw, "value2", true, true) or weaktoktains(raw, "value3", true, true)) select *'.format(self.table)
self.validate(detection, expected_result)
def testRegex(self):
# Arrange
detection = {"selection1": {"fieldname1|re": "([0-9]|[1-9][0-9]|[1-4][0-9]{2})"},
"condition": "selection1"}
expected_result = 'from ' + self.table + ' where matches(fieldname1, re(\"([0-9]|[1-9][0-9]|[1-4][0-9]{2})\")) select *'
# Act & Assert
self.validate(detection, expected_result)
def testDerivedFields(self):
# Arrange
detection = {"selection1": {"select func(fieldname1) as fieldname1": "value1"},
"condition": "selection1"}
expected_result = 'from ' + self.table + \
' select func(fieldname1) as fieldname1 where fieldname1 = "value1" select *'
# Act & Assert
self.validate(detection, expected_result)
def testNearNotSupported(self):
# Arrange
detection = {"selection1": {"fieldname1": "value1"},
"selection2": {"fieldname2": "value2"},
"condition": "selection1 | near selection1 and selection2"}
expected_result = NotImplementedError()
# Act & Assert
self.validate(detection, expected_result)
def validate(self, detection, expectation):
config = SigmaConfiguration()
self.basic_rule["detection"] = detection
with patch("yaml.safe_load_all", return_value=[self.basic_rule]):
parser = SigmaCollectionParser("any sigma io", config, None)
backend = DevoBackend(config, self.table)
assert len(parser.parsers) == 1
for p in parser.parsers:
if isinstance(expectation, str):
self.assertEqual(expectation, backend.generate(p))
elif isinstance(expectation, Exception):
self.assertRaises(type(expectation), backend.generate, p)
if __name__ == '__main__':
unittest.main()