Merge pull request #807 from forensicanalysis/master

Add sqlite backend
This commit is contained in:
Florian Roth 2020-05-30 09:31:45 +02:00 committed by GitHub
commit 0cbc099def
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 724 additions and 14 deletions

View File

@ -35,3 +35,6 @@ jobs:
- name: Test Generated Elasticsearch Query Strings
run: |
make test-backend-es-qs
- name: Test SQL(ite) Backend
run: |
make test-backend-sql

View File

@ -58,6 +58,7 @@ test-sigmac:
$(COVERAGE) run -a --include=$(COVSCOPE) tools/sigmac -rvdI -t humio -O rulecomment -c tools/config/humio.yml rules/ > /dev/null
$(COVERAGE) run -a --include=$(COVSCOPE) tools/sigmac -rvdI -t crowdstrike -O rulecomment -c tools/config/crowdstrike.yml rules/ > /dev/null
$(COVERAGE) run -a --include=$(COVSCOPE) tools/sigmac -rvdI -t sql -c sysmon rules/ > /dev/null
$(COVERAGE) run -a --include=$(COVSCOPE) tools/sigmac -rvdI -t sqlite -c sysmon rules/ > /dev/null
$(COVERAGE) run -a --include=$(COVSCOPE) tools/sigmac -rvdI -t logiq -c sysmon rules/ > /dev/null
$(COVERAGE) run -a --include=$(COVSCOPE) tools/sigmac -rvdI -t splunk -c tools/config/splunk-windows-index.yml -f 'level>=high,level<=critical,status=stable,logsource=windows,tag=attack.execution' rules/ > /dev/null
! $(COVERAGE) run -a --include=$(COVSCOPE) tools/sigmac -rvdI -t splunk -c tools/config/splunk-windows-index.yml -f 'level>=high,level<=critical,status=xstable,logsource=windows' rules/ > /dev/null
@ -107,6 +108,10 @@ test-merge:
test-backend-es-qs:
tests/test-backend-es-qs.py
test-backend-sql:
cd tools && python3 setup.py install
cd tools && $(COVERAGE) run -m pytest tests/test_backend_sql.py tests/test_backend_sqlite.py
test-sigma2attack:
$(COVERAGE) run -a --include=$(COVSCOPE) tools/sigma2attack

View File

@ -1,5 +1,6 @@
# Output backends for sigmac
# Copyright 2019 Jayden Zheng
# Copyright 2020 Jonas Hagg
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
@ -16,7 +17,9 @@
import re
import sigma
from .base import SingleTextQueryBackend
from sigma.backends.base import SingleTextQueryBackend
from sigma.parser.condition import SigmaAggregationParser, NodeSubexpression, ConditionAND, ConditionOR, ConditionNOT
from sigma.parser.exceptions import SigmaParseError
class SQLBackend(SingleTextQueryBackend):
"""Converts Sigma rule into SQL query"""
@ -34,12 +37,16 @@ class SQLBackend(SingleTextQueryBackend):
notNullExpression = "%s=*" # Expression of queries for not null values. %s is field name
mapExpression = "%s = %s" # Syntax for field/value conditions. First %s is fieldname, second is value
mapMulti = "%s IN %s" # Syntax for field/value conditions. First %s is fieldname, second is value
mapWildcard = "%s LIKE %s" # Syntax for swapping wildcard conditions.
mapWildcard = "%s LIKE %s ESCAPE \'\\\'"# Syntax for swapping wildcard conditions: Adding \ as escape character
mapSource = "%s=%s" # Syntax for sourcetype
mapListsSpecialHandling = False # Same handling for map items with list values as for normal values (strings, integers) if True, generateMapItemListNode method is called with node
mapListValueExpression = "%s OR %s" # Syntax for field/value condititons where map value is a list
mapLength = "(%s %s)"
def __init__(self, sigmaconfig, table):
super().__init__(sigmaconfig)
self.table = table
def generateANDNode(self, node):
generated = [ self.generateNode(val) for val in node ]
filtered = [ g for g in generated if g is not None ]
@ -78,29 +85,32 @@ class SQLBackend(SingleTextQueryBackend):
def generateMapItemNode(self, node):
fieldname, value = node
transformed_fieldname = self.fieldNameMapping(fieldname, value)
if "," in self.generateNode(value) and "%" not in self.generateNode(value):
has_wildcard = re.search(r"((\\(\*|\?|\\))|\*|\?|_|%)", self.generateNode(value))
if "," in self.generateNode(value) and not has_wildcard:
return self.mapMulti % (transformed_fieldname, self.generateNode(value))
elif "LENGTH" in transformed_fieldname:
return self.mapLength % (transformed_fieldname, value)
elif type(value) == list:
return self.generateMapItemListNode(transformed_fieldname, value)
elif self.mapListsSpecialHandling == False and type(value) in (str, int, list) or self.mapListsSpecialHandling == True and type(value) in (str, int):
if "%" in self.generateNode(value):
if has_wildcard:
return self.mapWildcard % (transformed_fieldname, self.generateNode(value))
else:
return self.mapExpression % (transformed_fieldname, self.generateNode(value))
elif "sourcetype" in transformed_fieldname:
return self.mapSource % (transformed_fieldname, self.generateNode(value))
elif "*" in str(value):
elif has_wildcard:
return self.mapWildcard % (transformed_fieldname, self.generateNode(value))
else:
raise TypeError("Backend does not support map values of type " + str(type(value)))
def generateMapItemListNode(self, key, value):
return "(" + (" OR ".join(['%s LIKE %s' % (key, self.generateValueNode(item)) for item in value])) + ")"
return "(" + (" OR ".join([self.mapWildcard % (key, self.generateValueNode(item)) for item in value])) + ")"
def generateValueNode(self, node):
return self.valueExpression % (self.cleanValue(str(node)))
return self.valueExpression % (self.cleanValue(str(node)))
def generateNULLValueNode(self, node):
return self.nullExpression % (node.item)
@ -117,10 +127,97 @@ class SQLBackend(SingleTextQueryBackend):
return fieldname
def cleanValue(self, val):
if "*" == val:
pass
elif "*.*.*" in val:
val = val.replace("*.*.*", "%")
elif re.search(r'\*', val):
val = re.sub(r'\*', '%', val)
if not isinstance(val, str):
return str(val)
#Single backlashes which are not in front of * or ? are doulbed
val = re.sub(r"(?<!\\)\\(?!(\\|\*|\?))", r"\\\\", val)
#Replace _ with \_ because _ is a sql wildcard
val = re.sub(r'_', r'\_', val)
#Replace % with \% because % is a sql wildcard
val = re.sub(r'%', r'\%', val)
#Replace * with %, if even number of backslashes (or zero) in front of *
val = re.sub(r"(?<!\\)(\\\\)*(?!\\)\*", r"\1%", val)
#Replace ? with _, if even number of backsashes (or zero) in front of ?
val = re.sub(r"(?<!\\)(\\\\)*(?!\\)\?", r"\1_", val)
return val
def generateAggregation(self, agg, where_clausel):
if not agg:
return self.table, where_clausel
if (agg.aggfunc == SigmaAggregationParser.AGGFUNC_COUNT or
agg.aggfunc == SigmaAggregationParser.AGGFUNC_MAX or
agg.aggfunc == SigmaAggregationParser.AGGFUNC_MIN or
agg.aggfunc == SigmaAggregationParser.AGGFUNC_SUM or
agg.aggfunc == SigmaAggregationParser.AGGFUNC_AVG):
if agg.groupfield:
group_by = " GROUP BY {0}".format(self.fieldNameMapping(agg.groupfield, None))
else:
group_by = ""
if agg.aggfield:
select = "{}({}) AS agg".format(agg.aggfunc_notrans, self.fieldNameMapping(agg.aggfield, None))
else:
if agg.aggfunc == SigmaAggregationParser.AGGFUNC_COUNT:
select = "{}(*) AS agg".format(agg.aggfunc_notrans)
else:
raise SigmaParseError("For {} aggregation a fieldname needs to be specified".format(agg.aggfunc_notrans))
temp_table = "(SELECT {} FROM {} WHERE {}{})".format(select, self.table, where_clausel, group_by)
agg_condition = "agg {} {}".format(agg.cond_op, agg.condition)
return temp_table, agg_condition
raise NotImplementedError("{} aggregation not implemented in SQL Backend".format(agg.aggfunc_notrans))
def generateQuery(self, parsed):
if self._recursiveFtsSearch(parsed.parsedSearch):
raise NotImplementedError("FullTextSearch not implemented for SQL Backend.")
result = self.generateNode(parsed.parsedSearch)
if parsed.parsedAgg:
#Handle aggregation
fro, whe = self.generateAggregation(parsed.parsedAgg, result)
return "SELECT * FROM {} WHERE {}".format(fro, whe)
return "SELECT * FROM {} WHERE {}".format(self.table, result)
def _recursiveFtsSearch(self, subexpression):
#True: found subexpression, where no fieldname is requested -> full text search
#False: no subexpression found, where a full text search is needed
def _evaluateCondition(condition):
#Helper function to evaulate condtions
if type(condition) not in [ConditionAND, ConditionOR, ConditionNOT]:
raise NotImplementedError("Error in recursive Search logic")
results = []
for elem in condition.items:
if isinstance(elem, NodeSubexpression):
results.append(self._recursiveFtsSearch(elem))
if isinstance(elem, ConditionNOT):
results.append(_evaluateCondition(elem))
if isinstance(elem, tuple):
results.append(False)
if type(elem) in (str, int, list):
return True
return any(results)
if type(subexpression) in [str, int, list]:
return True
elif type(subexpression) in [tuple]:
return False
if not isinstance(subexpression, NodeSubexpression):
raise NotImplementedError("Error in recursive Search logic")
if isinstance(subexpression.items, NodeSubexpression):
return self._recursiveFtsSearch(subexpression.items)
elif type(subexpression.items) in [ConditionAND, ConditionOR, ConditionNOT]:
return _evaluateCondition(subexpression.items)

View File

@ -0,0 +1,123 @@
# Output backends for sigmac
# Copyright 2020 Jonas Hagg
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from sigma.backends.sql import SQLBackend
from sigma.parser.condition import NodeSubexpression, ConditionAND, ConditionOR, ConditionNOT
import re
class SQLiteBackend(SQLBackend):
"""Converts Sigma rule into SQL query for SQLite"""
identifier = "sqlite"
active = True
mapFullTextSearch = "%s MATCH ('\"%s\"')"
def __init__(self, sigmaconfig, table):
super().__init__(sigmaconfig, table)
self.mappingItem = False
def requireFTS(self, node):
return (not self.mappingItem and
(type(node) in (int, str) or all(isinstance(val, str) for val in node) or all(isinstance(val, int) for val in node)))
def generateFTS(self, value):
if re.search(r"((\\(\*|\?|\\))|\*|\?|_|%)", value):
raise NotImplementedError(
"Wildcards in SQlite Full Text Search not implemented")
self.countFTS += 1
return self.mapFullTextSearch % (self.table, value)
def generateANDNode(self, node):
if self.requireFTS(node):
fts = str('"' + self.andToken + '"').join(self.cleanValue(val)
for val in node)
return self.generateFTS(fts)
generated = [self.generateNode(val) for val in node]
filtered = [g for g in generated if g is not None]
if filtered:
return self.andToken.join(filtered)
else:
return None
def generateORNode(self, node):
if self.requireFTS(node):
fts = str('"' + self.orToken + '"').join(self.cleanValue(val)
for val in node)
return self.generateFTS(fts)
generated = [self.generateNode(val) for val in node]
filtered = [g for g in generated if g is not None]
if filtered:
return self.orToken.join(filtered)
else:
return None
def generateMapItemNode(self, node):
try:
self.mappingItem = True
fieldname, value = node
transformed_fieldname = self.fieldNameMapping(fieldname, value)
has_wildcard = re.search(
r"((\\(\*|\?|\\))|\*|\?|_|%)", self.generateNode(value))
if "," in self.generateNode(value) and not has_wildcard:
return self.mapMulti % (transformed_fieldname, self.generateNode(value))
elif "LENGTH" in transformed_fieldname:
return self.mapLength % (transformed_fieldname, value)
elif type(value) == list:
return self.generateMapItemListNode(transformed_fieldname, value)
elif self.mapListsSpecialHandling == False and type(value) in (str, int, list) or self.mapListsSpecialHandling == True and type(value) in (str, int):
if has_wildcard:
return self.mapWildcard % (transformed_fieldname, self.generateNode(value))
else:
return self.mapExpression % (transformed_fieldname, self.generateNode(value))
elif "sourcetype" in transformed_fieldname:
return self.mapSource % (transformed_fieldname, self.generateNode(value))
elif has_wildcard:
return self.mapWildcard % (transformed_fieldname, self.generateNode(value))
else:
raise TypeError(
"Backend does not support map values of type " + str(type(value)))
finally:
self.mappingItem = False
def generateValueNode(self, node):
if self.mappingItem:
return self.valueExpression % (self.cleanValue(str(node)))
else:
return self.generateFTS(self.cleanValue(str(node)))
def generateQuery(self, parsed):
self.countFTS = 0
result = self.generateNode(parsed.parsedSearch)
if self.countFTS > 1:
raise NotImplementedError(
"Match operator ({}) is allowed only once in SQLite, parse rule in a different way:\n{}".format(self.countFTS, result))
self.countFTS = 0
if parsed.parsedAgg:
# Handle aggregation
fro, whe = self.generateAggregation(parsed.parsedAgg, result)
return "SELECT * FROM {} WHERE {}".format(fro, whe)
return "SELECT * FROM {} WHERE {}".format(self.table, result)

View File

@ -0,0 +1,334 @@
# Test output backends for sigmac
# Copyright 2020 Jonas Hagg
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import unittest
from unittest.mock import patch
from sigma.backends.sql import SQLBackend
from sigma.parser.collection import SigmaCollectionParser
from sigma.config.mapping import FieldMapping
from sigma.configuration import SigmaConfiguration
class TestGenerateQuery(unittest.TestCase):
def setUp(self):
self.basic_rule = {"title": "Test", "level": "testing"}
self.table = "eventlog"
def test_regular_queries(self):
# Test regular queries
detection = {"selection": {"fieldname": "test1"},
"condition": "selection"}
expected_result = 'SELECT * FROM {} WHERE fieldname = "test1"'.format(
self.table)
self.validate(detection, expected_result)
detection = {"selection": {"fieldname": 4}, "condition": "selection"}
expected_result = 'SELECT * FROM {} WHERE fieldname = "4"'.format(
self.table)
self.validate(detection, expected_result)
detection = {"selection": {"fieldname": [
"test1", "test2"]}, "condition": "selection"}
expected_result = 'SELECT * FROM {} WHERE fieldname IN ("test1", "test2")'.format(
self.table)
self.validate(detection, expected_result)
detection = {"selection": {
"fieldname": [3, 4]}, "condition": "selection"}
expected_result = 'SELECT * FROM {} WHERE fieldname IN ("3", "4")'.format(
self.table)
self.validate(detection, expected_result)
detection = {"selection": {"fieldname1": "test1", "fieldname2": [
"test2", "test3"]}, "condition": "selection"}
expected_result = 'SELECT * FROM {} WHERE (fieldname1 = "test1" AND fieldname2 IN ("test2", "test3"))'.format(
self.table)
self.validate(detection, expected_result)
detection = {"selection": {"fieldname": "test1"}, "filter": {
"fieldname2": "whatever"}, "condition": "selection and filter"}
expected_result = 'SELECT * FROM {} WHERE (fieldname = "test1" AND fieldname2 = "whatever")'.format(
self.table)
self.validate(detection, expected_result)
detection = {"selection": {"fieldname": "test1"}, "filter": {
"fieldname2": "whatever"}, "condition": "selection or filter"}
expected_result = 'SELECT * FROM {} WHERE (fieldname = "test1" OR fieldname2 = "whatever")'.format(
self.table)
self.validate(detection, expected_result)
detection = {"selection": {"fieldname": "test1"}, "filter": {
"fieldname2": "whatever"}, "condition": "selection and not filter"}
expected_result = 'SELECT * FROM {} WHERE (fieldname = "test1" AND NOT (fieldname2 = "whatever"))'.format(
self.table)
self.validate(detection, expected_result)
detection = {"selection": {"fieldname1": "test1"}, "filter": {
"fieldname2": "test2"}, "condition": "1 of them"}
expected_result = 'SELECT * FROM {} WHERE (fieldname1 = "test1" OR fieldname2 = "test2")'.format(
self.table)
self.validate(detection, expected_result)
detection = {"selection": {"fieldname1": "test1"}, "filter": {
"fieldname2": "test2"}, "condition": "all of them"}
expected_result = 'SELECT * FROM {} WHERE (fieldname1 = "test1" AND fieldname2 = "test2")'.format(
self.table)
self.validate(detection, expected_result)
def test_modifiers(self):
# contains
detection = {"selection": {"fieldname|contains": "test"},
"condition": "selection"}
expected_result = 'SELECT * FROM {} WHERE fieldname LIKE "%test%" ESCAPE \'\\\''.format(
self.table)
self.validate(detection, expected_result)
# all
detection = {"selection": {"fieldname|all": [
"test1", "test2"]}, "condition": "selection"}
expected_result = 'SELECT * FROM {} WHERE (fieldname = "test1" AND fieldname = "test2")'.format(
self.table)
self.validate(detection, expected_result)
# endswith
detection = {"selection": {"fieldname|endswith": "test"},
"condition": "selection"}
expected_result = 'SELECT * FROM {} WHERE fieldname LIKE "%test" ESCAPE \'\\\''.format(
self.table)
self.validate(detection, expected_result)
# startswith
detection = {"selection": {"fieldname|startswith": "test"},
"condition": "selection"}
expected_result = 'SELECT * FROM {} WHERE fieldname LIKE "test%" ESCAPE \'\\\''.format(
self.table)
self.validate(detection, expected_result)
def test_aggregations(self):
# count
detection = {"selection": {"fieldname": "test"},
"condition": "selection | count() > 5"}
inner_query = 'SELECT count(*) AS agg FROM {} WHERE fieldname = "test"'.format(
self.table)
expected_result = 'SELECT * FROM ({}) WHERE agg > 5'.format(inner_query)
self.validate(detection, expected_result)
# min
detection = {"selection": {"fieldname1": "test"},
"condition": "selection | min(fieldname2) > 5"}
inner_query = 'SELECT min(fieldname2) AS agg FROM {} WHERE fieldname1 = "test"'.format(
self.table)
expected_result = 'SELECT * FROM ({}) WHERE agg > 5'.format(inner_query)
self.validate(detection, expected_result)
# max
detection = {"selection": {"fieldname1": "test"},
"condition": "selection | max(fieldname2) > 5"}
inner_query = 'SELECT max(fieldname2) AS agg FROM {} WHERE fieldname1 = "test"'.format(
self.table)
expected_result = 'SELECT * FROM ({}) WHERE agg > 5'.format(inner_query)
self.validate(detection, expected_result)
# avg
detection = {"selection": {"fieldname1": "test"},
"condition": "selection | avg(fieldname2) > 5"}
inner_query = 'SELECT avg(fieldname2) AS agg FROM {} WHERE fieldname1 = "test"'.format(
self.table)
expected_result = 'SELECT * FROM ({}) WHERE agg > 5'.format(inner_query)
self.validate(detection, expected_result)
# sum
detection = {"selection": {"fieldname1": "test"},
"condition": "selection | sum(fieldname2) > 5"}
inner_query = 'SELECT sum(fieldname2) AS agg FROM {} WHERE fieldname1 = "test"'.format(
self.table)
expected_result = 'SELECT * FROM ({}) WHERE agg > 5'.format(inner_query)
self.validate(detection, expected_result)
# <
detection = {"selection": {"fieldname1": "test"},
"condition": "selection | sum(fieldname2) < 5"}
inner_query = 'SELECT sum(fieldname2) AS agg FROM {} WHERE fieldname1 = "test"'.format(
self.table)
expected_result = 'SELECT * FROM ({}) WHERE agg < 5'.format(inner_query)
self.validate(detection, expected_result)
# ==
detection = {"selection": {"fieldname1": "test"},
"condition": "selection | sum(fieldname2) == 5"}
inner_query = 'SELECT sum(fieldname2) AS agg FROM {} WHERE fieldname1 = "test"'.format(
self.table)
expected_result = 'SELECT * FROM ({}) WHERE agg == 5'.format(inner_query)
self.validate(detection, expected_result)
# group by
detection = {"selection": {"fieldname1": "test"},
"condition": "selection | sum(fieldname2) by fieldname3 == 5"}
inner_query = 'SELECT sum(fieldname2) AS agg FROM {} WHERE fieldname1 = "test" GROUP BY fieldname3'.format(
self.table)
expected_result = 'SELECT * FROM ({}) WHERE agg == 5'.format(inner_query)
self.validate(detection, expected_result)
# multiple conditions
detection = {"selection": {"fieldname1": "test"}, "filter": {
"fieldname2": "tessst"}, "condition": "selection OR filter | sum(fieldname2) == 5"}
inner_query = 'SELECT sum(fieldname2) AS agg FROM {} WHERE (fieldname1 = "test" OR fieldname2 = "tessst")'.format(
self.table)
expected_result = 'SELECT * FROM ({}) WHERE agg == 5'.format(inner_query)
self.validate(detection, expected_result)
def test_wildcards(self):
# wildcard: *
detection = {"selection": {"fieldname": "test*"},
"condition": "selection"}
expected_result = 'SELECT * FROM {} WHERE fieldname LIKE '.format(
self.table) + r'"test%"' + r" ESCAPE '\'"
self.validate(detection, expected_result)
# wildcard: ?
detection = {"selection": {"fieldname": "test?"},
"condition": "selection"}
expected_result = 'SELECT * FROM {} WHERE fieldname LIKE '.format(
self.table) + r'"test_"' + r" ESCAPE '\'"
self.validate(detection, expected_result)
# escaping:
detection = {"selection": {"fieldname": r"test\?"},
"condition": "selection"}
expected_result = 'SELECT * FROM {} WHERE fieldname LIKE '.format(
self.table) + r'"test\?"' + r" ESCAPE '\'"
self.validate(detection, expected_result)
detection = {"selection": {"fieldname": r"test\\*"},
"condition": "selection"}
expected_result = 'SELECT * FROM {} WHERE fieldname LIKE '.format(
self.table) + r'"test\\%"' + r" ESCAPE '\'"
self.validate(detection, expected_result)
detection = {"selection": {"fieldname": r"test\*"},
"condition": "selection"}
expected_result = 'SELECT * FROM {} WHERE fieldname LIKE '.format(
self.table) + r'"test\*"' + r" ESCAPE '\'"
self.validate(detection, expected_result)
detection = {"selection": {"fieldname": r"test\\"},
"condition": "selection"}
expected_result = 'SELECT * FROM {} WHERE fieldname LIKE '.format(
self.table) + r'"test\\"' + r" ESCAPE '\'"
self.validate(detection, expected_result)
detection = {"selection": {"fieldname": r"test\abc"},
"condition": "selection"}
expected_result = 'SELECT * FROM {} WHERE fieldname LIKE '.format(
self.table) + r'"test\\abc"' + r" ESCAPE '\'"
self.validate(detection, expected_result)
detection = {"selection": {"fieldname": r"test%"},
"condition": "selection"}
expected_result = 'SELECT * FROM {} WHERE fieldname LIKE '.format(
self.table) + r'"test\%"' + r" ESCAPE '\'"
self.validate(detection, expected_result)
detection = {"selection": {"fieldname": r"test_"},
"condition": "selection"}
expected_result = 'SELECT * FROM {} WHERE fieldname LIKE '.format(
self.table) + r'"test\_"' + r" ESCAPE '\'"
self.validate(detection, expected_result)
# multiple options
detection = {"selection": {"fieldname": [
"test*", "*test"]}, "condition": "selection"}
opt1 = 'fieldname LIKE ' + r'"test%"' + r" ESCAPE '\'"
opt2 = 'fieldname LIKE ' + r'"%test"' + r" ESCAPE '\'"
expected_result = 'SELECT * FROM {} WHERE ({} OR {})'.format(
self.table, opt1, opt2)
self.validate(detection, expected_result)
detection = {"selection": {"fieldname|all": [
"test*", "*test"]}, "condition": "selection"}
opt1 = 'fieldname LIKE ' + r'"test%"' + r" ESCAPE '\'"
opt2 = 'fieldname LIKE ' + r'"%test"' + r" ESCAPE '\'"
expected_result = 'SELECT * FROM {} WHERE ({} AND {})'.format(
self.table, opt1, opt2)
self.validate(detection, expected_result)
def test_fieldname_mapping(self):
detection = {"selection": {"fieldname": "test1"},
"condition": "selection"}
expected_result = 'SELECT * FROM {} WHERE mapped_fieldname = "test1"'.format(
self.table)
# configure mapping
config = SigmaConfiguration()
config.fieldmappings["fieldname"] = FieldMapping(
"fieldname", "mapped_fieldname")
self.basic_rule["detection"] = detection
with patch("yaml.safe_load_all", return_value=[self.basic_rule]):
parser = SigmaCollectionParser("any sigma io", config, None)
backend = SQLBackend(config, self.table)
assert len(parser.parsers) == 1
for p in parser.parsers:
self.assertEqual(expected_result, backend.generate(p))
def test_not_implemented(self):
# near aggregation not implemented
detection = {"selection": {"fieldname": "test"}, "filter": {
"fieldname": "test2"}, "condition": "selection | near selection and filter"}
expected_result = NotImplementedError()
self.validate(detection, expected_result)
# re modifier is not implemented
detection = {"selection": {"fieldname|re": "test"},
"condition": "selection"}
expected_result = NotImplementedError()
self.validate(detection, expected_result)
#Full Text Search is not implemented
detection = {"selection": ["test1"], "condition": "selection"}
expected_result = NotImplementedError()
self.validate(detection, expected_result)
def validate(self, detection, expectation):
config = SigmaConfiguration()
self.basic_rule["detection"] = detection
with patch("yaml.safe_load_all", return_value=[self.basic_rule]):
parser = SigmaCollectionParser("any sigma io", config, None)
backend = SQLBackend(config, self.table)
assert len(parser.parsers) == 1
for p in parser.parsers:
if isinstance(expectation, str):
self.assertEqual(expectation, backend.generate(p))
elif isinstance(expectation, Exception):
self.assertRaises(type(expectation), backend.generate, p)
if __name__ == '__main__':
unittest.main()

View File

@ -0,0 +1,148 @@
# Test output backends for sigmac
# Copyright 2020 Jonas Hagg
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import unittest
from unittest.mock import patch
from sigma.backends.sqlite import SQLiteBackend
from sigma.parser.collection import SigmaCollectionParser
from sigma.config.mapping import FieldMapping
from sigma.configuration import SigmaConfiguration
class TestFullTextSearch(unittest.TestCase):
def setUp(self):
self.basic_rule = {"title": "Test", "level": "testing"}
self.table = "eventlog"
def test_full_text_search(self):
detection = {"selection": ["test1"], "condition": "selection"}
expected_result = 'SELECT * FROM {0} WHERE {0} MATCH (\'"test1"\')'.format(
self.table)
self.validate(detection, expected_result)
detection = {"selection": [5], "condition": "selection"}
expected_result = 'SELECT * FROM {0} WHERE {0} MATCH (\'"5"\')'.format(
self.table)
self.validate(detection, expected_result)
detection = {"selection": ["test1", "test2"], "condition": "selection"}
expected_result = 'SELECT * FROM {0} WHERE ({0} MATCH (\'"test1" OR "test2"\'))'.format(
self.table)
self.validate(detection, expected_result)
detection = {"selection": ["test1"], "filter":["test2"], "condition": "selection and filter"}
expected_result = 'SELECT * FROM {0} WHERE ({0} MATCH (\'"test1" AND "test2"\'))'.format(
self.table)
self.validate(detection, expected_result)
detection = {"selection": [5, 6], "condition": "selection"}
expected_result = 'SELECT * FROM {0} WHERE ({0} MATCH (\'"5" OR "6"\'))'.format(
self.table)
self.validate(detection, expected_result)
detection = {"selection": ["test1"], "filter": [
"test2"], "condition": "selection or filter"}
expected_result = 'SELECT * FROM {0} WHERE ({0} MATCH (\'"test1" OR "test2"\'))'.format(
self.table)
self.validate(detection, expected_result)
detection = {"selection": ["test1"], "filter": [
"test2"], "condition": "selection and filter"}
expected_result = 'SELECT * FROM {0} WHERE ({0} MATCH (\'"test1" AND "test2"\'))'.format(
self.table)
self.validate(detection, expected_result)
def test_full_text_search_aggregation(self):
# aggregation with fts
detection = {"selection": ["test"],
"condition": "selection | count() > 5"}
inner_query = 'SELECT count(*) AS agg FROM {0} WHERE {0} MATCH (\'"test"\')'.format(
self.table)
expected_result = 'SELECT * FROM ({}) WHERE agg > 5'.format(inner_query)
self.validate(detection, expected_result)
detection = {"selection": ["test1", "test2"],
"condition": "selection | count() > 5"}
inner_query = 'SELECT count(*) AS agg FROM {0} WHERE ({0} MATCH (\'"test1" OR "test2"\'))'.format(
self.table)
expected_result = 'SELECT * FROM ({}) WHERE agg > 5'.format(inner_query)
self.validate(detection, expected_result)
# aggregation + group by + fts
detection = {"selection": ["test1", "test2"],
"condition": "selection | count() by fieldname > 5"}
inner_query = 'SELECT count(*) AS agg FROM {0} WHERE ({0} MATCH (\'"test1" OR "test2"\')) GROUP BY fieldname'.format(
self.table)
expected_result = 'SELECT * FROM ({}) WHERE agg > 5'.format(inner_query)
self.validate(detection, expected_result)
def test_not_implemented(self):
# fts not implemented with wildcards
detection = {"selection": ["test*"], "condition": "selection"}
expected_result = NotImplementedError()
self.validate(detection, expected_result)
detection = {"selection": ["test?"], "condition": "selection"}
expected_result = NotImplementedError()
self.validate(detection, expected_result)
detection = {"selection": ["test\\"], "condition": "selection"}
expected_result = NotImplementedError()
self.validate(detection, expected_result)
# fts is not implemented for nested condtions
detection = {"selection": ["test"], "filter": [
"test2"], "condition": "selection and filter"} # this is ok
detection = {"selection": ["test"], "filter": [
"test2"], "condition": "selection or filter"} # this is ok
detection = {"selection": ["test"], "filter": [
"test2"], "condition": "selection and not filter"} # this is already nested
expected_result = NotImplementedError()
self.validate(detection, expected_result)
detection = {"selection": ["test"], "filter": [
"test2"], "condition": "selection and filter and filter"} # this is nested
expected_result = NotImplementedError()
self.validate(detection, expected_result)
detection = {"selection": ["test"], "filter": [
"test2"], "condition": "selection and filter or filter"} # this is nested
expected_result = NotImplementedError()
self.validate(detection, expected_result)
def validate(self, detection, expectation):
config = SigmaConfiguration()
self.basic_rule["detection"] = detection
with patch("yaml.safe_load_all", return_value=[self.basic_rule]):
parser = SigmaCollectionParser("any sigma io", config, None)
backend = SQLiteBackend(config, self.table)
assert len(parser.parsers) == 1
for p in parser.parsers:
if isinstance(expectation, str):
self.assertEqual(expectation, backend.generate(p))
elif isinstance(expectation, Exception):
self.assertRaises(type(expectation), backend.generate, p)
if __name__ == '__main__':
unittest.main()