From dedfb65d635f544011e30cf6ab303db7d782e092 Mon Sep 17 00:00:00 2001 From: Jonas Hagg Date: Mon, 25 May 2020 10:44:14 +0200 Subject: [PATCH 1/6] Implemented Aggregation for SQL, Added SQLite FullTextSearch --- tools/sigma/backends/sql.py | 124 +++++++++-- tools/sigma/backends/sqlite.py | 125 +++++++++++ tools/tests/test_backend_sql.py | 320 +++++++++++++++++++++++++++++ tools/tests/test_backend_sqlite.py | 133 ++++++++++++ 4 files changed, 688 insertions(+), 14 deletions(-) create mode 100644 tools/sigma/backends/sqlite.py create mode 100644 tools/tests/test_backend_sql.py create mode 100644 tools/tests/test_backend_sqlite.py diff --git a/tools/sigma/backends/sql.py b/tools/sigma/backends/sql.py index b3149c01..72f7cc29 100644 --- a/tools/sigma/backends/sql.py +++ b/tools/sigma/backends/sql.py @@ -16,7 +16,9 @@ import re import sigma -from .base import SingleTextQueryBackend +from sigma.backends.base import SingleTextQueryBackend +from sigma.parser.condition import SigmaAggregationParser, NodeSubexpression, ConditionAND, ConditionOR, ConditionNOT +from sigma.parser.exceptions import SigmaParseError class SQLBackend(SingleTextQueryBackend): """Converts Sigma rule into SQL query""" @@ -34,12 +36,16 @@ class SQLBackend(SingleTextQueryBackend): notNullExpression = "%s=*" # Expression of queries for not null values. %s is field name mapExpression = "%s = %s" # Syntax for field/value conditions. First %s is fieldname, second is value mapMulti = "%s IN %s" # Syntax for field/value conditions. First %s is fieldname, second is value - mapWildcard = "%s LIKE %s" # Syntax for swapping wildcard conditions. + mapWildcard = "%s LIKE %s escape \'\\\'"# Syntax for swapping wildcard conditions: Adding \ as escape character mapSource = "%s=%s" # Syntax for sourcetype mapListsSpecialHandling = False # Same handling for map items with list values as for normal values (strings, integers) if True, generateMapItemListNode method is called with node mapListValueExpression = "%s OR %s" # Syntax for field/value condititons where map value is a list mapLength = "(%s %s)" + def __init__(self, sigmaconfig, table): + super().__init__(sigmaconfig) + self.table = table + def generateANDNode(self, node): generated = [ self.generateNode(val) for val in node ] filtered = [ g for g in generated if g is not None ] @@ -78,29 +84,32 @@ class SQLBackend(SingleTextQueryBackend): def generateMapItemNode(self, node): fieldname, value = node transformed_fieldname = self.fieldNameMapping(fieldname, value) - if "," in self.generateNode(value) and "%" not in self.generateNode(value): + + has_wildcard = re.search(r"((\\(\*|\?|\\))|\*|\?|_|%)", self.generateNode(value)) + + if "," in self.generateNode(value) and not has_wildcard: return self.mapMulti % (transformed_fieldname, self.generateNode(value)) elif "LENGTH" in transformed_fieldname: return self.mapLength % (transformed_fieldname, value) elif type(value) == list: return self.generateMapItemListNode(transformed_fieldname, value) - elif self.mapListsSpecialHandling == False and type(value) in (str, int, list) or self.mapListsSpecialHandling == True and type(value) in (str, int): - if "%" in self.generateNode(value): + elif self.mapListsSpecialHandling == False and type(value) in (str, int, list) or self.mapListsSpecialHandling == True and type(value) in (str, int): + if has_wildcard: return self.mapWildcard % (transformed_fieldname, self.generateNode(value)) else: return self.mapExpression % (transformed_fieldname, self.generateNode(value)) elif "sourcetype" in transformed_fieldname: return self.mapSource % (transformed_fieldname, self.generateNode(value)) - elif "*" in str(value): + elif has_wildcard: return self.mapWildcard % (transformed_fieldname, self.generateNode(value)) else: raise TypeError("Backend does not support map values of type " + str(type(value))) def generateMapItemListNode(self, key, value): - return "(" + (" OR ".join(['%s LIKE %s' % (key, self.generateValueNode(item)) for item in value])) + ")" + return "(" + (" OR ".join([self.mapWildcard % (key, self.generateValueNode(item)) for item in value])) + ")" def generateValueNode(self, node): - return self.valueExpression % (self.cleanValue(str(node))) + return self.valueExpression % (self.cleanValue(str(node))) def generateNULLValueNode(self, node): return self.nullExpression % (node.item) @@ -117,10 +126,97 @@ class SQLBackend(SingleTextQueryBackend): return fieldname def cleanValue(self, val): - if "*" == val: - pass - elif "*.*.*" in val: - val = val.replace("*.*.*", "%") - elif re.search(r'\*', val): - val = re.sub(r'\*', '%', val) + if not isinstance(val, str): + return str(val) + + #Single backlashes which are not in front of * or ? are doulbed + val = re.sub(r"(? full text search + #False: no subexpression found, where a full text search is needed + + def _evaluateCondition(condition): + #Helper function to evaulate condtions + if type(condition) not in [ConditionAND, ConditionOR, ConditionNOT]: + raise NotImplementedError("Error in recursive Search logic") + + results = [] + for elem in condition.items: + if isinstance(elem, NodeSubexpression): + results.append(self._recursiveFtsSearch(elem)) + if isinstance(elem, ConditionNOT): + results.append(_evaluateCondition(elem)) + if isinstance(elem, tuple): + results.append(False) + if type(elem) in (str, int, list): + return True + return any(results) + + if type(subexpression) in [str, int, list]: + return True + elif type(subexpression) in [tuple]: + return False + + if not isinstance(subexpression, NodeSubexpression): + raise NotImplementedError("Error in recursive Search logic") + + if isinstance(subexpression.items, NodeSubexpression): + return self._recursiveFtsSearch(subexpression.items) + elif type(subexpression.items) in [ConditionAND, ConditionOR, ConditionNOT]: + return _evaluateCondition(subexpression.items) \ No newline at end of file diff --git a/tools/sigma/backends/sqlite.py b/tools/sigma/backends/sqlite.py new file mode 100644 index 00000000..c4e2651e --- /dev/null +++ b/tools/sigma/backends/sqlite.py @@ -0,0 +1,125 @@ + +from sigma.backends.sql import SQLBackend +from sigma.parser.condition import NodeSubexpression, ConditionAND, ConditionOR, ConditionNOT +import re + + +class SQLiteBackend(SQLBackend): + """SQLiteBackend provides FullTextSearch functionality""" + identifier = "sqlite" + active = True + + mapFullTextSearch = "%s MATCH ('\"%s\"')" + + def __init__(self, sigmaconfig, table): + super().__init__(sigmaconfig, table) + self.mappingItem = False + + def requireFTS(self, node): + return (not self.mappingItem and + (type(node) in (int, str) or all(isinstance(val, str) for val in node) or all(isinstance(val, int) for val in node))) + + def generateFTS(self, value): + if re.search(r"((\\(\*|\?|\\))|\*|\?|_|%)", value): + raise NotImplementedError( + "Wildcards in SQlite Full Text Search not implemented") + self.countFTS += 1 + return self.mapFullTextSearch % (self.table, value) + + def generateANDNode(self, node): + + if self.requireFTS(node): + fts = str('"' + self.andToken + '"').join(self.cleanValue(val) + for val in node) + return self.generateFTS(fts) + + generated = [self.generateNode(val) for val in node] + filtered = [g for g in generated if g is not None] + if filtered: + return self.andToken.join(filtered) + else: + return None + + def generateORNode(self, node): + + if self.requireFTS(node): + fts = str('"' + self.orToken + '"').join(self.cleanValue(val) + for val in node) + return self.generateFTS(fts) + + generated = [self.generateNode(val) for val in node] + filtered = [g for g in generated if g is not None] + if filtered: + return self.orToken.join(filtered) + else: + return None + + def generateMapItemNode(self, node): + try: + self.mappingItem = True + fieldname, value = node + transformed_fieldname = self.fieldNameMapping(fieldname, value) + + has_wildcard = re.search( + r"((\\(\*|\?|\\))|\*|\?|_|%)", self.generateNode(value)) + + if "," in self.generateNode(value) and not has_wildcard: + return self.mapMulti % (transformed_fieldname, self.generateNode(value)) + elif "LENGTH" in transformed_fieldname: + return self.mapLength % (transformed_fieldname, value) + elif type(value) == list: + return self.generateMapItemListNode(transformed_fieldname, value) + elif self.mapListsSpecialHandling == False and type(value) in (str, int, list) or self.mapListsSpecialHandling == True and type(value) in (str, int): + + if has_wildcard: + return self.mapWildcard % (transformed_fieldname, self.generateNode(value)) + else: + return self.mapExpression % (transformed_fieldname, self.generateNode(value)) + + elif "sourcetype" in transformed_fieldname: + return self.mapSource % (transformed_fieldname, self.generateNode(value)) + elif has_wildcard: + return self.mapWildcard % (transformed_fieldname, self.generateNode(value)) + else: + raise TypeError( + "Backend does not support map values of type " + str(type(value))) + finally: + self.mappingItem = False + + def generateValueNode(self, node): + if self.mappingItem: + return self.valueExpression % (self.cleanValue(str(node))) + else: + return self.generateFTS(self.cleanValue(str(node))) + + def generateQuery(self, parsed): + self.countFTS = 0 + result = self.generateNode(parsed.parsedSearch) + if self.countFTS > 1: + raise NotImplementedError( + "Match operator ({}) is allowed only once in SQLite, parse rule in a different way:\n{}".format(self.countFTS, result)) + self.countFTS = 0 + + if parsed.parsedAgg: + # Handle aggregation + fro, whe = self.generateAggregation(parsed.parsedAgg, result) + return "SELECT * FROM {} WHERE {}".format(fro, whe) + + return "SELECT * FROM {} WHERE {}".format(self.table, result) + + def generateFullTextQuery(self, search, parsed): + + search = search.replace('"', '') + search = '" OR "'.join(search.split(" OR ")) + search = '" AND "'.join(search.split(" AND ")) + search = '"{}"'.format(search) + search = search.replace('%', '') + search = search.replace('_', '') + search = '{} MATCH (\'{}\')'.format(self.table, search) + + if parsed.parsedAgg: + # Handle aggregation + fro, whe = self.generateAggregation(parsed.parsedAgg, search) + return "SELECT * FROM {} WHERE {}".format(fro, whe) + + return 'SELECT * FROM {} WHERE {}'.format(self.table, search) \ No newline at end of file diff --git a/tools/tests/test_backend_sql.py b/tools/tests/test_backend_sql.py new file mode 100644 index 00000000..c1a9b38b --- /dev/null +++ b/tools/tests/test_backend_sql.py @@ -0,0 +1,320 @@ +import unittest +from unittest.mock import patch + +from sigma.backends.sql import SQLBackend + +from sigma.parser.collection import SigmaCollectionParser +from sigma.config.mapping import FieldMapping +from sigma.configuration import SigmaConfiguration + +class TestGenerateQuery(unittest.TestCase): + + def setUp(self): + self.basic_rule = {"title": "Test", "level": "testing"} + self.table = "eventlog" + + def test_regular_queries(self): + # Test regular queries + detection = {"selection": {"fieldname": "test1"}, + "condition": "selection"} + expected_result = 'select * from {} where fieldname = "test1"'.format( + self.table) + self.validate(detection, expected_result) + + detection = {"selection": {"fieldname": 4}, "condition": "selection"} + expected_result = 'select * from {} where fieldname = "4"'.format( + self.table) + self.validate(detection, expected_result) + + detection = {"selection": {"fieldname": [ + "test1", "test2"]}, "condition": "selection"} + expected_result = 'select * from {} where fieldname in ("test1", "test2")'.format( + self.table) + self.validate(detection, expected_result) + + detection = {"selection": { + "fieldname": [3, 4]}, "condition": "selection"} + expected_result = 'select * from {} where fieldname in ("3", "4")'.format( + self.table) + self.validate(detection, expected_result) + + detection = {"selection": {"fieldname1": "test1", "fieldname2": [ + "test2", "test3"]}, "condition": "selection"} + expected_result = 'select * from {} where (fieldname1 = "test1" and fieldname2 in ("test2", "test3"))'.format( + self.table) + self.validate(detection, expected_result) + + detection = {"selection": {"fieldname": "test1"}, "filter": { + "fieldname2": "whatever"}, "condition": "selection and filter"} + expected_result = 'select * from {} where (fieldname = "test1" and fieldname2 = "whatever")'.format( + self.table) + self.validate(detection, expected_result) + + detection = {"selection": {"fieldname": "test1"}, "filter": { + "fieldname2": "whatever"}, "condition": "selection or filter"} + expected_result = 'select * from {} where (fieldname = "test1" or fieldname2 = "whatever")'.format( + self.table) + self.validate(detection, expected_result) + + detection = {"selection": {"fieldname": "test1"}, "filter": { + "fieldname2": "whatever"}, "condition": "selection and not filter"} + expected_result = 'select * from {} where (fieldname = "test1" and not (fieldname2 = "whatever"))'.format( + self.table) + self.validate(detection, expected_result) + + detection = {"selection": {"fieldname1": "test1"}, "filter": { + "fieldname2": "test2"}, "condition": "1 of them"} + expected_result = 'select * from {} where (fieldname1 = "test1" or fieldname2 = "test2")'.format( + self.table) + self.validate(detection, expected_result) + + detection = {"selection": {"fieldname1": "test1"}, "filter": { + "fieldname2": "test2"}, "condition": "all of them"} + expected_result = 'select * from {} where (fieldname1 = "test1" and fieldname2 = "test2")'.format( + self.table) + self.validate(detection, expected_result) + + def test_modifiers(self): + + # contains + detection = {"selection": {"fieldname|contains": "test"}, + "condition": "selection"} + expected_result = 'select * from {} where fieldname like "%test%" escape \'\\\''.format( + self.table) + self.validate(detection, expected_result) + + # all + detection = {"selection": {"fieldname|all": [ + "test1", "test2"]}, "condition": "selection"} + expected_result = 'select * from {} where (fieldname = "test1" and fieldname = "test2")'.format( + self.table) + self.validate(detection, expected_result) + + # endswith + detection = {"selection": {"fieldname|endswith": "test"}, + "condition": "selection"} + expected_result = 'select * from {} where fieldname like "%test" escape \'\\\''.format( + self.table) + self.validate(detection, expected_result) + + # startswith + detection = {"selection": {"fieldname|startswith": "test"}, + "condition": "selection"} + expected_result = 'select * from {} where fieldname like "test%" escape \'\\\''.format( + self.table) + self.validate(detection, expected_result) + + def test_aggregations(self): + + # count + detection = {"selection": {"fieldname": "test"}, + "condition": "selection | count() > 5"} + inner_query = 'select count(*) as agg from {} where fieldname = "test"'.format( + self.table) + expected_result = 'select * from ({}) where agg > 5'.format(inner_query) + self.validate(detection, expected_result) + + # min + detection = {"selection": {"fieldname1": "test"}, + "condition": "selection | min(fieldname2) > 5"} + inner_query = 'select min(fieldname2) as agg from {} where fieldname1 = "test"'.format( + self.table) + expected_result = 'select * from ({}) where agg > 5'.format(inner_query) + self.validate(detection, expected_result) + + # max + detection = {"selection": {"fieldname1": "test"}, + "condition": "selection | max(fieldname2) > 5"} + inner_query = 'select max(fieldname2) as agg from {} where fieldname1 = "test"'.format( + self.table) + expected_result = 'select * from ({}) where agg > 5'.format(inner_query) + self.validate(detection, expected_result) + + # avg + detection = {"selection": {"fieldname1": "test"}, + "condition": "selection | avg(fieldname2) > 5"} + inner_query = 'select avg(fieldname2) as agg from {} where fieldname1 = "test"'.format( + self.table) + expected_result = 'select * from ({}) where agg > 5'.format(inner_query) + self.validate(detection, expected_result) + + # sum + detection = {"selection": {"fieldname1": "test"}, + "condition": "selection | sum(fieldname2) > 5"} + inner_query = 'select sum(fieldname2) as agg from {} where fieldname1 = "test"'.format( + self.table) + expected_result = 'select * from ({}) where agg > 5'.format(inner_query) + self.validate(detection, expected_result) + + # < + detection = {"selection": {"fieldname1": "test"}, + "condition": "selection | sum(fieldname2) < 5"} + inner_query = 'select sum(fieldname2) as agg from {} where fieldname1 = "test"'.format( + self.table) + expected_result = 'select * from ({}) where agg < 5'.format(inner_query) + self.validate(detection, expected_result) + + # == + detection = {"selection": {"fieldname1": "test"}, + "condition": "selection | sum(fieldname2) == 5"} + inner_query = 'select sum(fieldname2) as agg from {} where fieldname1 = "test"'.format( + self.table) + expected_result = 'select * from ({}) where agg == 5'.format(inner_query) + self.validate(detection, expected_result) + + # group by + detection = {"selection": {"fieldname1": "test"}, + "condition": "selection | sum(fieldname2) by fieldname3 == 5"} + inner_query = 'select sum(fieldname2) as agg from {} where fieldname1 = "test" group by fieldname3'.format( + self.table) + expected_result = 'select * from ({}) where agg == 5'.format(inner_query) + self.validate(detection, expected_result) + + # multiple conditions + detection = {"selection": {"fieldname1": "test"}, "filter": { + "fieldname2": "tessst"}, "condition": "selection or filter | sum(fieldname2) == 5"} + inner_query = 'select sum(fieldname2) as agg from {} where (fieldname1 = "test" or fieldname2 = "tessst")'.format( + self.table) + expected_result = 'select * from ({}) where agg == 5'.format(inner_query) + self.validate(detection, expected_result) + + def test_wildcards(self): + + # wildcard: * + detection = {"selection": {"fieldname": "test*"}, + "condition": "selection"} + expected_result = 'select * from {} where fieldname like '.format( + self.table) + r'"test%"' + r" escape '\'" + self.validate(detection, expected_result) + + # wildcard: ? + detection = {"selection": {"fieldname": "test?"}, + "condition": "selection"} + expected_result = 'select * from {} where fieldname like '.format( + self.table) + r'"test_"' + r" escape '\'" + self.validate(detection, expected_result) + + # escaping: + detection = {"selection": {"fieldname": r"test\?"}, + "condition": "selection"} + expected_result = 'select * from {} where fieldname like '.format( + self.table) + r'"test\?"' + r" escape '\'" + self.validate(detection, expected_result) + + detection = {"selection": {"fieldname": r"test\\*"}, + "condition": "selection"} + expected_result = 'select * from {} where fieldname like '.format( + self.table) + r'"test\\%"' + r" escape '\'" + self.validate(detection, expected_result) + + detection = {"selection": {"fieldname": r"test\*"}, + "condition": "selection"} + expected_result = 'select * from {} where fieldname like '.format( + self.table) + r'"test\*"' + r" escape '\'" + self.validate(detection, expected_result) + + detection = {"selection": {"fieldname": r"test\\"}, + "condition": "selection"} + expected_result = 'select * from {} where fieldname like '.format( + self.table) + r'"test\\"' + r" escape '\'" + self.validate(detection, expected_result) + + detection = {"selection": {"fieldname": r"test\abc"}, + "condition": "selection"} + expected_result = 'select * from {} where fieldname like '.format( + self.table) + r'"test\\abc"' + r" escape '\'" + self.validate(detection, expected_result) + + detection = {"selection": {"fieldname": r"test%"}, + "condition": "selection"} + expected_result = 'select * from {} where fieldname like '.format( + self.table) + r'"test\%"' + r" escape '\'" + self.validate(detection, expected_result) + + detection = {"selection": {"fieldname": r"test_"}, + "condition": "selection"} + expected_result = 'select * from {} where fieldname like '.format( + self.table) + r'"test\_"' + r" escape '\'" + self.validate(detection, expected_result) + + # multiple options + detection = {"selection": {"fieldname": [ + "test*", "*test"]}, "condition": "selection"} + opt1 = 'fieldname like ' + r'"test%"' + r" escape '\'" + opt2 = 'fieldname like ' + r'"%test"' + r" escape '\'" + expected_result = 'select * from {} where ({} or {})'.format( + self.table, opt1, opt2) + self.validate(detection, expected_result) + + detection = {"selection": {"fieldname|all": [ + "test*", "*test"]}, "condition": "selection"} + opt1 = 'fieldname like ' + r'"test%"' + r" escape '\'" + opt2 = 'fieldname like ' + r'"%test"' + r" escape '\'" + expected_result = 'select * from {} where ({} and {})'.format( + self.table, opt1, opt2) + self.validate(detection, expected_result) + + def test_fieldname_mapping(self): + detection = {"selection": {"fieldname": "test1"}, + "condition": "selection"} + expected_result = 'select * from {} where mapped_fieldname = "test1"'.format( + self.table) + + # configure mapping + config = SigmaConfiguration() + config.fieldmappings["fieldname"] = FieldMapping( + "fieldname", "mapped_fieldname") + + self.basic_rule["detection"] = detection + + with patch("yaml.safe_load_all", return_value=[self.basic_rule]): + parser = SigmaCollectionParser("any sigma io", config, None) + backend = SQLBackend(config, self.table) + + assert len(parser.parsers) == 1 + + for p in parser.parsers: + self.assertEqual(expected_result.lower(), + backend.generate(p).lower()) + + def test_not_implemented(self): + # near aggregation not implemented + detection = {"selection": {"fieldname": "test"}, "filter": { + "fieldname": "test2"}, "condition": "selection | near selection and filter"} + expected_result = NotImplementedError() + self.validate(detection, expected_result) + + # re modifier is not implemented + detection = {"selection": {"fieldname|re": "test"}, + "condition": "selection"} + expected_result = NotImplementedError() + self.validate(detection, expected_result) + + #Full Text Search is not implemented + detection = {"selection": ["test1"], "condition": "selection"} + expected_result = NotImplementedError() + self.validate(detection, expected_result) + + + def validate(self, detection, expectation): + + config = SigmaConfiguration() + + self.basic_rule["detection"] = detection + + with patch("yaml.safe_load_all", return_value=[self.basic_rule]): + parser = SigmaCollectionParser("any sigma io", config, None) + backend = SQLBackend(config, self.table) + + assert len(parser.parsers) == 1 + + for p in parser.parsers: + if isinstance(expectation, str): + self.assertEqual(expectation.lower(), + backend.generate(p).lower()) + elif isinstance(expectation, Exception): + self.assertRaises(type(expectation), backend.generate, p) + + +if __name__ == '__main__': + unittest.main() diff --git a/tools/tests/test_backend_sqlite.py b/tools/tests/test_backend_sqlite.py new file mode 100644 index 00000000..3c6a7a71 --- /dev/null +++ b/tools/tests/test_backend_sqlite.py @@ -0,0 +1,133 @@ +import unittest +from unittest.mock import patch + +from sigma.backends.sqlite import SQLiteBackend + +from sigma.parser.collection import SigmaCollectionParser +from sigma.config.mapping import FieldMapping +from sigma.configuration import SigmaConfiguration + +class TestFullTextSearch(unittest.TestCase): + + def setUp(self): + self.basic_rule = {"title": "Test", "level": "testing"} + self.table = "eventlog" + + def test_full_text_search(self): + detection = {"selection": ["test1"], "condition": "selection"} + expected_result = 'select * from {0} where {0} match (\'"test1"\')'.format( + self.table) + self.validate(detection, expected_result) + + detection = {"selection": [5], "condition": "selection"} + expected_result = 'select * from {0} where {0} match (\'"5"\')'.format( + self.table) + self.validate(detection, expected_result) + + detection = {"selection": ["test1", "test2"], "condition": "selection"} + expected_result = 'select * from {0} where ({0} match (\'"test1" OR "test2"\'))'.format( + self.table) + self.validate(detection, expected_result) + + detection = {"selection": ["test1"], "filter":["test2"], "condition": "selection and filter"} + expected_result = 'select * from {0} where ({0} match (\'"test1" and "test2"\'))'.format( + self.table) + self.validate(detection, expected_result) + + detection = {"selection": [5, 6], "condition": "selection"} + expected_result = 'select * from {0} where ({0} match (\'"5" OR "6"\'))'.format( + self.table) + self.validate(detection, expected_result) + + detection = {"selection": ["test1"], "filter": [ + "test2"], "condition": "selection or filter"} + expected_result = 'select * from {0} where ({0} match (\'"test1" OR "test2"\'))'.format( + self.table) + self.validate(detection, expected_result) + + detection = {"selection": ["test1"], "filter": [ + "test2"], "condition": "selection and filter"} + expected_result = 'select * from {0} where ({0} match (\'"test1" and "test2"\'))'.format( + self.table) + self.validate(detection, expected_result) + + def test_full_text_search_aggregation(self): + # aggregation with fts + detection = {"selection": ["test"], + "condition": "selection | count() > 5"} + inner_query = 'select count(*) as agg from {0} where {0} match (\'"test"\')'.format( + self.table) + expected_result = 'select * from ({}) where agg > 5'.format(inner_query) + self.validate(detection, expected_result) + + detection = {"selection": ["test1", "test2"], + "condition": "selection | count() > 5"} + inner_query = 'select count(*) as agg from {0} where ({0} match (\'"test1" or "test2"\'))'.format( + self.table) + expected_result = 'select * from ({}) where agg > 5'.format(inner_query) + self.validate(detection, expected_result) + + # aggregation + group by + fts + detection = {"selection": ["test1", "test2"], + "condition": "selection | count() by fieldname > 5"} + inner_query = 'select count(*) as agg from {0} where ({0} match (\'"test1" or "test2"\')) group by fieldname'.format( + self.table) + expected_result = 'select * from ({}) where agg > 5'.format(inner_query) + self.validate(detection, expected_result) + + def test_not_implemented(self): + # fts not implemented with wildcards + detection = {"selection": ["test*"], "condition": "selection"} + expected_result = NotImplementedError() + self.validate(detection, expected_result) + + detection = {"selection": ["test?"], "condition": "selection"} + expected_result = NotImplementedError() + self.validate(detection, expected_result) + + detection = {"selection": ["test\\"], "condition": "selection"} + expected_result = NotImplementedError() + self.validate(detection, expected_result) + + + # fts is not implemented for nested condtions + detection = {"selection": ["test"], "filter": [ + "test2"], "condition": "selection and filter"} # this is ok + detection = {"selection": ["test"], "filter": [ + "test2"], "condition": "selection or filter"} # this is ok + detection = {"selection": ["test"], "filter": [ + "test2"], "condition": "selection and not filter"} # this is already nested + expected_result = NotImplementedError() + self.validate(detection, expected_result) + + detection = {"selection": ["test"], "filter": [ + "test2"], "condition": "selection and filter and filter"} # this is nested + expected_result = NotImplementedError() + self.validate(detection, expected_result) + + detection = {"selection": ["test"], "filter": [ + "test2"], "condition": "selection and filter or filter"} # this is nested + expected_result = NotImplementedError() + self.validate(detection, expected_result) + + def validate(self, detection, expectation): + + config = SigmaConfiguration() + + self.basic_rule["detection"] = detection + + with patch("yaml.safe_load_all", return_value=[self.basic_rule]): + parser = SigmaCollectionParser("any sigma io", config, None) + backend = SQLiteBackend(config, self.table) + + assert len(parser.parsers) == 1 + + for p in parser.parsers: + if isinstance(expectation, str): + self.assertEqual(expectation.lower(), + backend.generate(p).lower()) + elif isinstance(expectation, Exception): + self.assertRaises(type(expectation), backend.generate, p) + +if __name__ == '__main__': + unittest.main() \ No newline at end of file From abf1a2c6d7eb6a031fcb60dd003fccf62bd7af33 Mon Sep 17 00:00:00 2001 From: Jonas Hagg Date: Mon, 25 May 2020 10:54:16 +0200 Subject: [PATCH 2/6] Adjusted Makefile --- Makefile | 1 + 1 file changed, 1 insertion(+) diff --git a/Makefile b/Makefile index 1d36cd90..79eba11e 100644 --- a/Makefile +++ b/Makefile @@ -58,6 +58,7 @@ test-sigmac: $(COVERAGE) run -a --include=$(COVSCOPE) tools/sigmac -rvdI -t humio -O rulecomment -c tools/config/humio.yml rules/ > /dev/null $(COVERAGE) run -a --include=$(COVSCOPE) tools/sigmac -rvdI -t crowdstrike -O rulecomment -c tools/config/crowdstrike.yml rules/ > /dev/null $(COVERAGE) run -a --include=$(COVSCOPE) tools/sigmac -rvdI -t sql -c sysmon rules/ > /dev/null + $(COVERAGE) run -a --include=$(COVSCOPE) tools/sigmac -rvdI -t sqlite -c sysmon rules/ > /dev/null $(COVERAGE) run -a --include=$(COVSCOPE) tools/sigmac -rvdI -t logiq -c sysmon rules/ > /dev/null $(COVERAGE) run -a --include=$(COVSCOPE) tools/sigmac -rvdI -t splunk -c tools/config/splunk-windows-index.yml -f 'level>=high,level<=critical,status=stable,logsource=windows,tag=attack.execution' rules/ > /dev/null ! $(COVERAGE) run -a --include=$(COVSCOPE) tools/sigmac -rvdI -t splunk -c tools/config/splunk-windows-index.yml -f 'level>=high,level<=critical,status=xstable,logsource=windows' rules/ > /dev/null From 70935d26ce214b566b34c4555531e0510a619a99 Mon Sep 17 00:00:00 2001 From: Jonas Plum Date: Fri, 29 May 2020 23:56:05 +0200 Subject: [PATCH 3/6] Add license header --- Makefile | 5 +- tools/sigma/backends/sql.py | 23 ++--- tools/sigma/backends/sqlite.py | 15 ++++ tools/tests/test_backend_sql.py | 138 ++++++++++++++++------------- tools/tests/test_backend_sqlite.py | 47 ++++++---- 5 files changed, 138 insertions(+), 90 deletions(-) diff --git a/Makefile b/Makefile index 79eba11e..2766a7e7 100644 --- a/Makefile +++ b/Makefile @@ -2,7 +2,7 @@ TMPOUT = $(shell tempfile||mktemp) COVSCOPE = tools/sigma/*.py,tools/sigma/backends/*.py,tools/sigmac,tools/merge_sigma,tools/sigma2attack export COVERAGE = coverage -test: clearcov test-rules test-sigmac test-merge test-sigma2attack build finish +test: clearcov test-rules test-sigmac test-merge test-backend-sql test-sigma2attack build finish clearcov: rm -f .coverage @@ -108,6 +108,9 @@ test-merge: test-backend-es-qs: tests/test-backend-es-qs.py +test-backend-sql: + pytest tests/test_backend_sql.py tests/test_backend_sqlite.py + test-sigma2attack: $(COVERAGE) run -a --include=$(COVSCOPE) tools/sigma2attack diff --git a/tools/sigma/backends/sql.py b/tools/sigma/backends/sql.py index 72f7cc29..7ea27c76 100644 --- a/tools/sigma/backends/sql.py +++ b/tools/sigma/backends/sql.py @@ -1,5 +1,6 @@ # Output backends for sigmac # Copyright 2019 Jayden Zheng +# Copyright 2020 Jonas Hagg # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License as published by @@ -36,7 +37,7 @@ class SQLBackend(SingleTextQueryBackend): notNullExpression = "%s=*" # Expression of queries for not null values. %s is field name mapExpression = "%s = %s" # Syntax for field/value conditions. First %s is fieldname, second is value mapMulti = "%s IN %s" # Syntax for field/value conditions. First %s is fieldname, second is value - mapWildcard = "%s LIKE %s escape \'\\\'"# Syntax for swapping wildcard conditions: Adding \ as escape character + mapWildcard = "%s LIKE %s ESCAPE \'\\\'"# Syntax for swapping wildcard conditions: Adding \ as escape character mapSource = "%s=%s" # Syntax for sourcetype mapListsSpecialHandling = False # Same handling for map items with list values as for normal values (strings, integers) if True, generateMapItemListNode method is called with node mapListValueExpression = "%s OR %s" # Syntax for field/value condititons where map value is a list @@ -87,13 +88,13 @@ class SQLBackend(SingleTextQueryBackend): has_wildcard = re.search(r"((\\(\*|\?|\\))|\*|\?|_|%)", self.generateNode(value)) - if "," in self.generateNode(value) and not has_wildcard: + if "," in self.generateNode(value) and not has_wildcard: return self.mapMulti % (transformed_fieldname, self.generateNode(value)) elif "LENGTH" in transformed_fieldname: return self.mapLength % (transformed_fieldname, value) elif type(value) == list: return self.generateMapItemListNode(transformed_fieldname, value) - elif self.mapListsSpecialHandling == False and type(value) in (str, int, list) or self.mapListsSpecialHandling == True and type(value) in (str, int): + elif self.mapListsSpecialHandling == False and type(value) in (str, int, list) or self.mapListsSpecialHandling == True and type(value) in (str, int): if has_wildcard: return self.mapWildcard % (transformed_fieldname, self.generateNode(value)) else: @@ -107,7 +108,7 @@ class SQLBackend(SingleTextQueryBackend): def generateMapItemListNode(self, key, value): return "(" + (" OR ".join([self.mapWildcard % (key, self.generateValueNode(item)) for item in value])) + ")" - + def generateValueNode(self, node): return self.valueExpression % (self.cleanValue(str(node))) @@ -144,11 +145,11 @@ class SQLBackend(SingleTextQueryBackend): #Replace ? with _, if even number of backsashes (or zero) in front of ? val = re.sub(r"(?. from sigma.backends.sql import SQLBackend from sigma.parser.condition import NodeSubexpression, ConditionAND, ConditionOR, ConditionNOT diff --git a/tools/tests/test_backend_sql.py b/tools/tests/test_backend_sql.py index c1a9b38b..b4bd8202 100644 --- a/tools/tests/test_backend_sql.py +++ b/tools/tests/test_backend_sql.py @@ -1,3 +1,19 @@ +# Test output backends for sigmac +# Copyright 2020 Jonas Hagg + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Lesser General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Lesser General Public License for more details. + +# You should have received a copy of the GNU Lesser General Public License +# along with this program. If not, see . + import unittest from unittest.mock import patch @@ -17,60 +33,60 @@ class TestGenerateQuery(unittest.TestCase): # Test regular queries detection = {"selection": {"fieldname": "test1"}, "condition": "selection"} - expected_result = 'select * from {} where fieldname = "test1"'.format( + expected_result = 'SELECT * FROM {} WHERE fieldname = "test1"'.format( self.table) self.validate(detection, expected_result) detection = {"selection": {"fieldname": 4}, "condition": "selection"} - expected_result = 'select * from {} where fieldname = "4"'.format( + expected_result = 'SELECT * FROM {} WHERE fieldname = "4"'.format( self.table) self.validate(detection, expected_result) detection = {"selection": {"fieldname": [ "test1", "test2"]}, "condition": "selection"} - expected_result = 'select * from {} where fieldname in ("test1", "test2")'.format( + expected_result = 'SELECT * FROM {} WHERE fieldname IN ("test1", "test2")'.format( self.table) self.validate(detection, expected_result) detection = {"selection": { "fieldname": [3, 4]}, "condition": "selection"} - expected_result = 'select * from {} where fieldname in ("3", "4")'.format( + expected_result = 'SELECT * FROM {} WHERE fieldname IN ("3", "4")'.format( self.table) self.validate(detection, expected_result) detection = {"selection": {"fieldname1": "test1", "fieldname2": [ "test2", "test3"]}, "condition": "selection"} - expected_result = 'select * from {} where (fieldname1 = "test1" and fieldname2 in ("test2", "test3"))'.format( + expected_result = 'SELECT * FROM {} WHERE (fieldname1 = "test1" AND fieldname2 IN ("test2", "test3"))'.format( self.table) self.validate(detection, expected_result) detection = {"selection": {"fieldname": "test1"}, "filter": { "fieldname2": "whatever"}, "condition": "selection and filter"} - expected_result = 'select * from {} where (fieldname = "test1" and fieldname2 = "whatever")'.format( + expected_result = 'SELECT * FROM {} WHERE (fieldname = "test1" AND fieldname2 = "whatever")'.format( self.table) self.validate(detection, expected_result) detection = {"selection": {"fieldname": "test1"}, "filter": { "fieldname2": "whatever"}, "condition": "selection or filter"} - expected_result = 'select * from {} where (fieldname = "test1" or fieldname2 = "whatever")'.format( + expected_result = 'SELECT * FROM {} WHERE (fieldname = "test1" OR fieldname2 = "whatever")'.format( self.table) self.validate(detection, expected_result) detection = {"selection": {"fieldname": "test1"}, "filter": { "fieldname2": "whatever"}, "condition": "selection and not filter"} - expected_result = 'select * from {} where (fieldname = "test1" and not (fieldname2 = "whatever"))'.format( + expected_result = 'SELECT * FROM {} WHERE (fieldname = "test1" AND NOT (fieldname2 = "whatever"))'.format( self.table) self.validate(detection, expected_result) detection = {"selection": {"fieldname1": "test1"}, "filter": { "fieldname2": "test2"}, "condition": "1 of them"} - expected_result = 'select * from {} where (fieldname1 = "test1" or fieldname2 = "test2")'.format( + expected_result = 'SELECT * FROM {} WHERE (fieldname1 = "test1" OR fieldname2 = "test2")'.format( self.table) self.validate(detection, expected_result) detection = {"selection": {"fieldname1": "test1"}, "filter": { "fieldname2": "test2"}, "condition": "all of them"} - expected_result = 'select * from {} where (fieldname1 = "test1" and fieldname2 = "test2")'.format( + expected_result = 'SELECT * FROM {} WHERE (fieldname1 = "test1" AND fieldname2 = "test2")'.format( self.table) self.validate(detection, expected_result) @@ -79,28 +95,28 @@ class TestGenerateQuery(unittest.TestCase): # contains detection = {"selection": {"fieldname|contains": "test"}, "condition": "selection"} - expected_result = 'select * from {} where fieldname like "%test%" escape \'\\\''.format( + expected_result = 'SELECT * FROM {} WHERE fieldname LIKE "%test%" ESCAPE \'\\\''.format( self.table) self.validate(detection, expected_result) # all detection = {"selection": {"fieldname|all": [ "test1", "test2"]}, "condition": "selection"} - expected_result = 'select * from {} where (fieldname = "test1" and fieldname = "test2")'.format( + expected_result = 'SELECT * FROM {} WHERE (fieldname = "test1" AND fieldname = "test2")'.format( self.table) self.validate(detection, expected_result) # endswith detection = {"selection": {"fieldname|endswith": "test"}, "condition": "selection"} - expected_result = 'select * from {} where fieldname like "%test" escape \'\\\''.format( + expected_result = 'SELECT * FROM {} WHERE fieldname LIKE "%test" ESCAPE \'\\\''.format( self.table) self.validate(detection, expected_result) # startswith detection = {"selection": {"fieldname|startswith": "test"}, "condition": "selection"} - expected_result = 'select * from {} where fieldname like "test%" escape \'\\\''.format( + expected_result = 'SELECT * FROM {} WHERE fieldname LIKE "test%" ESCAPE \'\\\''.format( self.table) self.validate(detection, expected_result) @@ -109,73 +125,73 @@ class TestGenerateQuery(unittest.TestCase): # count detection = {"selection": {"fieldname": "test"}, "condition": "selection | count() > 5"} - inner_query = 'select count(*) as agg from {} where fieldname = "test"'.format( + inner_query = 'SELECT count(*) AS agg FROM {} WHERE fieldname = "test"'.format( self.table) - expected_result = 'select * from ({}) where agg > 5'.format(inner_query) + expected_result = 'SELECT * FROM ({}) WHERE agg > 5'.format(inner_query) self.validate(detection, expected_result) # min detection = {"selection": {"fieldname1": "test"}, "condition": "selection | min(fieldname2) > 5"} - inner_query = 'select min(fieldname2) as agg from {} where fieldname1 = "test"'.format( + inner_query = 'SELECT min(fieldname2) AS agg FROM {} WHERE fieldname1 = "test"'.format( self.table) - expected_result = 'select * from ({}) where agg > 5'.format(inner_query) + expected_result = 'SELECT * FROM ({}) WHERE agg > 5'.format(inner_query) self.validate(detection, expected_result) # max detection = {"selection": {"fieldname1": "test"}, "condition": "selection | max(fieldname2) > 5"} - inner_query = 'select max(fieldname2) as agg from {} where fieldname1 = "test"'.format( + inner_query = 'SELECT max(fieldname2) AS agg FROM {} WHERE fieldname1 = "test"'.format( self.table) - expected_result = 'select * from ({}) where agg > 5'.format(inner_query) + expected_result = 'SELECT * FROM ({}) WHERE agg > 5'.format(inner_query) self.validate(detection, expected_result) # avg detection = {"selection": {"fieldname1": "test"}, "condition": "selection | avg(fieldname2) > 5"} - inner_query = 'select avg(fieldname2) as agg from {} where fieldname1 = "test"'.format( + inner_query = 'SELECT avg(fieldname2) AS agg FROM {} WHERE fieldname1 = "test"'.format( self.table) - expected_result = 'select * from ({}) where agg > 5'.format(inner_query) + expected_result = 'SELECT * FROM ({}) WHERE agg > 5'.format(inner_query) self.validate(detection, expected_result) # sum detection = {"selection": {"fieldname1": "test"}, "condition": "selection | sum(fieldname2) > 5"} - inner_query = 'select sum(fieldname2) as agg from {} where fieldname1 = "test"'.format( + inner_query = 'SELECT sum(fieldname2) AS agg FROM {} WHERE fieldname1 = "test"'.format( self.table) - expected_result = 'select * from ({}) where agg > 5'.format(inner_query) + expected_result = 'SELECT * FROM ({}) WHERE agg > 5'.format(inner_query) self.validate(detection, expected_result) # < detection = {"selection": {"fieldname1": "test"}, "condition": "selection | sum(fieldname2) < 5"} - inner_query = 'select sum(fieldname2) as agg from {} where fieldname1 = "test"'.format( + inner_query = 'SELECT sum(fieldname2) AS agg FROM {} WHERE fieldname1 = "test"'.format( self.table) - expected_result = 'select * from ({}) where agg < 5'.format(inner_query) + expected_result = 'SELECT * FROM ({}) WHERE agg < 5'.format(inner_query) self.validate(detection, expected_result) # == detection = {"selection": {"fieldname1": "test"}, "condition": "selection | sum(fieldname2) == 5"} - inner_query = 'select sum(fieldname2) as agg from {} where fieldname1 = "test"'.format( + inner_query = 'SELECT sum(fieldname2) AS agg FROM {} WHERE fieldname1 = "test"'.format( self.table) - expected_result = 'select * from ({}) where agg == 5'.format(inner_query) + expected_result = 'SELECT * FROM ({}) WHERE agg == 5'.format(inner_query) self.validate(detection, expected_result) # group by detection = {"selection": {"fieldname1": "test"}, "condition": "selection | sum(fieldname2) by fieldname3 == 5"} - inner_query = 'select sum(fieldname2) as agg from {} where fieldname1 = "test" group by fieldname3'.format( + inner_query = 'SELECT sum(fieldname2) AS agg FROM {} WHERE fieldname1 = "test" GROUP BY fieldname3'.format( self.table) - expected_result = 'select * from ({}) where agg == 5'.format(inner_query) + expected_result = 'SELECT * FROM ({}) WHERE agg == 5'.format(inner_query) self.validate(detection, expected_result) # multiple conditions detection = {"selection": {"fieldname1": "test"}, "filter": { - "fieldname2": "tessst"}, "condition": "selection or filter | sum(fieldname2) == 5"} - inner_query = 'select sum(fieldname2) as agg from {} where (fieldname1 = "test" or fieldname2 = "tessst")'.format( + "fieldname2": "tessst"}, "condition": "selection OR filter | sum(fieldname2) == 5"} + inner_query = 'SELECT sum(fieldname2) AS agg FROM {} WHERE (fieldname1 = "test" OR fieldname2 = "tessst")'.format( self.table) - expected_result = 'select * from ({}) where agg == 5'.format(inner_query) + expected_result = 'SELECT * FROM ({}) WHERE agg == 5'.format(inner_query) self.validate(detection, expected_result) def test_wildcards(self): @@ -183,81 +199,81 @@ class TestGenerateQuery(unittest.TestCase): # wildcard: * detection = {"selection": {"fieldname": "test*"}, "condition": "selection"} - expected_result = 'select * from {} where fieldname like '.format( - self.table) + r'"test%"' + r" escape '\'" + expected_result = 'SELECT * FROM {} WHERE fieldname LIKE '.format( + self.table) + r'"test%"' + r" ESCAPE '\'" self.validate(detection, expected_result) # wildcard: ? detection = {"selection": {"fieldname": "test?"}, "condition": "selection"} - expected_result = 'select * from {} where fieldname like '.format( - self.table) + r'"test_"' + r" escape '\'" + expected_result = 'SELECT * FROM {} WHERE fieldname LIKE '.format( + self.table) + r'"test_"' + r" ESCAPE '\'" self.validate(detection, expected_result) # escaping: detection = {"selection": {"fieldname": r"test\?"}, "condition": "selection"} - expected_result = 'select * from {} where fieldname like '.format( - self.table) + r'"test\?"' + r" escape '\'" + expected_result = 'SELECT * FROM {} WHERE fieldname LIKE '.format( + self.table) + r'"test\?"' + r" ESCAPE '\'" self.validate(detection, expected_result) detection = {"selection": {"fieldname": r"test\\*"}, "condition": "selection"} - expected_result = 'select * from {} where fieldname like '.format( - self.table) + r'"test\\%"' + r" escape '\'" + expected_result = 'SELECT * FROM {} WHERE fieldname LIKE '.format( + self.table) + r'"test\\%"' + r" ESCAPE '\'" self.validate(detection, expected_result) detection = {"selection": {"fieldname": r"test\*"}, "condition": "selection"} - expected_result = 'select * from {} where fieldname like '.format( - self.table) + r'"test\*"' + r" escape '\'" + expected_result = 'SELECT * FROM {} WHERE fieldname LIKE '.format( + self.table) + r'"test\*"' + r" ESCAPE '\'" self.validate(detection, expected_result) detection = {"selection": {"fieldname": r"test\\"}, "condition": "selection"} - expected_result = 'select * from {} where fieldname like '.format( - self.table) + r'"test\\"' + r" escape '\'" + expected_result = 'SELECT * FROM {} WHERE fieldname LIKE '.format( + self.table) + r'"test\\"' + r" ESCAPE '\'" self.validate(detection, expected_result) detection = {"selection": {"fieldname": r"test\abc"}, "condition": "selection"} - expected_result = 'select * from {} where fieldname like '.format( - self.table) + r'"test\\abc"' + r" escape '\'" + expected_result = 'SELECT * FROM {} WHERE fieldname LIKE '.format( + self.table) + r'"test\\abc"' + r" ESCAPE '\'" self.validate(detection, expected_result) detection = {"selection": {"fieldname": r"test%"}, "condition": "selection"} - expected_result = 'select * from {} where fieldname like '.format( - self.table) + r'"test\%"' + r" escape '\'" + expected_result = 'SELECT * FROM {} WHERE fieldname LIKE '.format( + self.table) + r'"test\%"' + r" ESCAPE '\'" self.validate(detection, expected_result) detection = {"selection": {"fieldname": r"test_"}, "condition": "selection"} - expected_result = 'select * from {} where fieldname like '.format( - self.table) + r'"test\_"' + r" escape '\'" + expected_result = 'SELECT * FROM {} WHERE fieldname LIKE '.format( + self.table) + r'"test\_"' + r" ESCAPE '\'" self.validate(detection, expected_result) # multiple options detection = {"selection": {"fieldname": [ "test*", "*test"]}, "condition": "selection"} - opt1 = 'fieldname like ' + r'"test%"' + r" escape '\'" - opt2 = 'fieldname like ' + r'"%test"' + r" escape '\'" - expected_result = 'select * from {} where ({} or {})'.format( + opt1 = 'fieldname LIKE ' + r'"test%"' + r" ESCAPE '\'" + opt2 = 'fieldname LIKE ' + r'"%test"' + r" ESCAPE '\'" + expected_result = 'SELECT * FROM {} WHERE ({} OR {})'.format( self.table, opt1, opt2) self.validate(detection, expected_result) detection = {"selection": {"fieldname|all": [ "test*", "*test"]}, "condition": "selection"} - opt1 = 'fieldname like ' + r'"test%"' + r" escape '\'" - opt2 = 'fieldname like ' + r'"%test"' + r" escape '\'" - expected_result = 'select * from {} where ({} and {})'.format( + opt1 = 'fieldname LIKE ' + r'"test%"' + r" ESCAPE '\'" + opt2 = 'fieldname LIKE ' + r'"%test"' + r" ESCAPE '\'" + expected_result = 'SELECT * FROM {} WHERE ({} AND {})'.format( self.table, opt1, opt2) self.validate(detection, expected_result) def test_fieldname_mapping(self): detection = {"selection": {"fieldname": "test1"}, "condition": "selection"} - expected_result = 'select * from {} where mapped_fieldname = "test1"'.format( + expected_result = 'SELECT * FROM {} WHERE mapped_fieldname = "test1"'.format( self.table) # configure mapping @@ -274,8 +290,7 @@ class TestGenerateQuery(unittest.TestCase): assert len(parser.parsers) == 1 for p in parser.parsers: - self.assertEqual(expected_result.lower(), - backend.generate(p).lower()) + self.assertEqual(expected_result, backend.generate(p)) def test_not_implemented(self): # near aggregation not implemented @@ -310,8 +325,7 @@ class TestGenerateQuery(unittest.TestCase): for p in parser.parsers: if isinstance(expectation, str): - self.assertEqual(expectation.lower(), - backend.generate(p).lower()) + self.assertEqual(expectation, backend.generate(p)) elif isinstance(expectation, Exception): self.assertRaises(type(expectation), backend.generate, p) diff --git a/tools/tests/test_backend_sqlite.py b/tools/tests/test_backend_sqlite.py index 3c6a7a71..66fc6812 100644 --- a/tools/tests/test_backend_sqlite.py +++ b/tools/tests/test_backend_sqlite.py @@ -1,3 +1,19 @@ +# Test output backends for sigmac +# Copyright 2020 Jonas Hagg + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Lesser General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Lesser General Public License for more details. + +# You should have received a copy of the GNU Lesser General Public License +# along with this program. If not, see . + import unittest from unittest.mock import patch @@ -15,39 +31,39 @@ class TestFullTextSearch(unittest.TestCase): def test_full_text_search(self): detection = {"selection": ["test1"], "condition": "selection"} - expected_result = 'select * from {0} where {0} match (\'"test1"\')'.format( + expected_result = 'SELECT * FROM {0} WHERE {0} MATCH (\'"test1"\')'.format( self.table) self.validate(detection, expected_result) detection = {"selection": [5], "condition": "selection"} - expected_result = 'select * from {0} where {0} match (\'"5"\')'.format( + expected_result = 'SELECT * FROM {0} WHERE {0} MATCH (\'"5"\')'.format( self.table) self.validate(detection, expected_result) detection = {"selection": ["test1", "test2"], "condition": "selection"} - expected_result = 'select * from {0} where ({0} match (\'"test1" OR "test2"\'))'.format( + expected_result = 'SELECT * FROM {0} WHERE ({0} MATCH (\'"test1" OR "test2"\'))'.format( self.table) self.validate(detection, expected_result) detection = {"selection": ["test1"], "filter":["test2"], "condition": "selection and filter"} - expected_result = 'select * from {0} where ({0} match (\'"test1" and "test2"\'))'.format( + expected_result = 'SELECT * FROM {0} WHERE ({0} MATCH (\'"test1" AND "test2"\'))'.format( self.table) self.validate(detection, expected_result) detection = {"selection": [5, 6], "condition": "selection"} - expected_result = 'select * from {0} where ({0} match (\'"5" OR "6"\'))'.format( + expected_result = 'SELECT * FROM {0} WHERE ({0} MATCH (\'"5" OR "6"\'))'.format( self.table) self.validate(detection, expected_result) detection = {"selection": ["test1"], "filter": [ "test2"], "condition": "selection or filter"} - expected_result = 'select * from {0} where ({0} match (\'"test1" OR "test2"\'))'.format( + expected_result = 'SELECT * FROM {0} WHERE ({0} MATCH (\'"test1" OR "test2"\'))'.format( self.table) self.validate(detection, expected_result) detection = {"selection": ["test1"], "filter": [ "test2"], "condition": "selection and filter"} - expected_result = 'select * from {0} where ({0} match (\'"test1" and "test2"\'))'.format( + expected_result = 'SELECT * FROM {0} WHERE ({0} MATCH (\'"test1" AND "test2"\'))'.format( self.table) self.validate(detection, expected_result) @@ -55,26 +71,26 @@ class TestFullTextSearch(unittest.TestCase): # aggregation with fts detection = {"selection": ["test"], "condition": "selection | count() > 5"} - inner_query = 'select count(*) as agg from {0} where {0} match (\'"test"\')'.format( + inner_query = 'SELECT count(*) AS agg FROM {0} WHERE {0} MATCH (\'"test"\')'.format( self.table) - expected_result = 'select * from ({}) where agg > 5'.format(inner_query) + expected_result = 'SELECT * FROM ({}) WHERE agg > 5'.format(inner_query) self.validate(detection, expected_result) detection = {"selection": ["test1", "test2"], "condition": "selection | count() > 5"} - inner_query = 'select count(*) as agg from {0} where ({0} match (\'"test1" or "test2"\'))'.format( + inner_query = 'SELECT count(*) AS agg FROM {0} WHERE ({0} MATCH (\'"test1" OR "test2"\'))'.format( self.table) - expected_result = 'select * from ({}) where agg > 5'.format(inner_query) + expected_result = 'SELECT * FROM ({}) WHERE agg > 5'.format(inner_query) self.validate(detection, expected_result) # aggregation + group by + fts detection = {"selection": ["test1", "test2"], "condition": "selection | count() by fieldname > 5"} - inner_query = 'select count(*) as agg from {0} where ({0} match (\'"test1" or "test2"\')) group by fieldname'.format( + inner_query = 'SELECT count(*) AS agg FROM {0} WHERE ({0} MATCH (\'"test1" OR "test2"\')) GROUP BY fieldname'.format( self.table) - expected_result = 'select * from ({}) where agg > 5'.format(inner_query) + expected_result = 'SELECT * FROM ({}) WHERE agg > 5'.format(inner_query) self.validate(detection, expected_result) - + def test_not_implemented(self): # fts not implemented with wildcards detection = {"selection": ["test*"], "condition": "selection"} @@ -124,8 +140,7 @@ class TestFullTextSearch(unittest.TestCase): for p in parser.parsers: if isinstance(expectation, str): - self.assertEqual(expectation.lower(), - backend.generate(p).lower()) + self.assertEqual(expectation, backend.generate(p)) elif isinstance(expectation, Exception): self.assertRaises(type(expectation), backend.generate, p) From 4a8ab88adecc4035aedb8b8780c29a76a31eca04 Mon Sep 17 00:00:00 2001 From: Jonas Plum Date: Sat, 30 May 2020 00:15:38 +0200 Subject: [PATCH 4/6] Fix test path --- Makefile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Makefile b/Makefile index 2766a7e7..30e2fb99 100644 --- a/Makefile +++ b/Makefile @@ -109,7 +109,7 @@ test-backend-es-qs: tests/test-backend-es-qs.py test-backend-sql: - pytest tests/test_backend_sql.py tests/test_backend_sqlite.py + pytest tools/tests/test_backend_sql.py tools/tests/test_backend_sqlite.py test-sigma2attack: $(COVERAGE) run -a --include=$(COVSCOPE) tools/sigma2attack From 5cc82d0f05b4a0ea6640f6bdc4fc0fe719945108 Mon Sep 17 00:00:00 2001 From: Jonas Plum Date: Sat, 30 May 2020 00:56:06 +0200 Subject: [PATCH 5/6] Move testcase --- .github/workflows/sigma-test.yml | 3 +++ Makefile | 5 +++-- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/.github/workflows/sigma-test.yml b/.github/workflows/sigma-test.yml index b6e10159..ee0c317a 100644 --- a/.github/workflows/sigma-test.yml +++ b/.github/workflows/sigma-test.yml @@ -35,3 +35,6 @@ jobs: - name: Test Generated Elasticsearch Query Strings run: | make test-backend-es-qs + - name: Test SQL(ite) Backend + run: | + make test-backend-sql diff --git a/Makefile b/Makefile index 30e2fb99..5b2c7f17 100644 --- a/Makefile +++ b/Makefile @@ -2,7 +2,7 @@ TMPOUT = $(shell tempfile||mktemp) COVSCOPE = tools/sigma/*.py,tools/sigma/backends/*.py,tools/sigmac,tools/merge_sigma,tools/sigma2attack export COVERAGE = coverage -test: clearcov test-rules test-sigmac test-merge test-backend-sql test-sigma2attack build finish +test: clearcov test-rules test-sigmac test-merge test-sigma2attack build finish clearcov: rm -f .coverage @@ -109,7 +109,8 @@ test-backend-es-qs: tests/test-backend-es-qs.py test-backend-sql: - pytest tools/tests/test_backend_sql.py tools/tests/test_backend_sqlite.py + cd tools && python3 setup.py install + cd tools && python3 -m pytest tests/test_backend_sql.py tests/test_backend_sqlite.py test-sigma2attack: $(COVERAGE) run -a --include=$(COVSCOPE) tools/sigma2attack From 3a6ac5bd5c711848b9e85b94741b210740cdada9 Mon Sep 17 00:00:00 2001 From: Jonas Plum Date: Sat, 30 May 2020 01:57:06 +0200 Subject: [PATCH 6/6] Remove unused function --- Makefile | 2 +- tools/sigma/backends/sql.py | 2 +- tools/sigma/backends/sqlite.py | 19 +------------------ 3 files changed, 3 insertions(+), 20 deletions(-) diff --git a/Makefile b/Makefile index 5b2c7f17..e4968975 100644 --- a/Makefile +++ b/Makefile @@ -110,7 +110,7 @@ test-backend-es-qs: test-backend-sql: cd tools && python3 setup.py install - cd tools && python3 -m pytest tests/test_backend_sql.py tests/test_backend_sqlite.py + cd tools && $(COVERAGE) run -m pytest tests/test_backend_sql.py tests/test_backend_sqlite.py test-sigma2attack: $(COVERAGE) run -a --include=$(COVSCOPE) tools/sigma2attack diff --git a/tools/sigma/backends/sql.py b/tools/sigma/backends/sql.py index 7ea27c76..5b446a6f 100644 --- a/tools/sigma/backends/sql.py +++ b/tools/sigma/backends/sql.py @@ -178,7 +178,7 @@ class SQLBackend(SingleTextQueryBackend): def generateQuery(self, parsed): if self._recursiveFtsSearch(parsed.parsedSearch): - raise NotImplementedError("FullTextSearch not implemented for SQL Backend, use SQLite Backend.") + raise NotImplementedError("FullTextSearch not implemented for SQL Backend.") result = self.generateNode(parsed.parsedSearch) if parsed.parsedAgg: diff --git a/tools/sigma/backends/sqlite.py b/tools/sigma/backends/sqlite.py index f29b0eb2..8eec13ea 100644 --- a/tools/sigma/backends/sqlite.py +++ b/tools/sigma/backends/sqlite.py @@ -20,7 +20,7 @@ import re class SQLiteBackend(SQLBackend): - """SQLiteBackend provides FullTextSearch functionality""" + """Converts Sigma rule into SQL query for SQLite""" identifier = "sqlite" active = True @@ -121,20 +121,3 @@ class SQLiteBackend(SQLBackend): return "SELECT * FROM {} WHERE {}".format(fro, whe) return "SELECT * FROM {} WHERE {}".format(self.table, result) - - def generateFullTextQuery(self, search, parsed): - - search = search.replace('"', '') - search = '" OR "'.join(search.split(" OR ")) - search = '" AND "'.join(search.split(" AND ")) - search = '"{}"'.format(search) - search = search.replace('%', '') - search = search.replace('_', '') - search = '{} MATCH (\'{}\')'.format(self.table, search) - - if parsed.parsedAgg: - # Handle aggregation - fro, whe = self.generateAggregation(parsed.parsedAgg, search) - return "SELECT * FROM {} WHERE {}".format(fro, whe) - - return 'SELECT * FROM {} WHERE {}'.format(self.table, search) \ No newline at end of file