mirror of
https://github.com/valitydev/SigmaHQ.git
synced 2024-11-07 09:48:58 +00:00
Merge pull request #807 from forensicanalysis/master
Add sqlite backend
This commit is contained in:
commit
0cbc099def
3
.github/workflows/sigma-test.yml
vendored
3
.github/workflows/sigma-test.yml
vendored
@ -35,3 +35,6 @@ jobs:
|
||||
- name: Test Generated Elasticsearch Query Strings
|
||||
run: |
|
||||
make test-backend-es-qs
|
||||
- name: Test SQL(ite) Backend
|
||||
run: |
|
||||
make test-backend-sql
|
||||
|
5
Makefile
5
Makefile
@ -58,6 +58,7 @@ test-sigmac:
|
||||
$(COVERAGE) run -a --include=$(COVSCOPE) tools/sigmac -rvdI -t humio -O rulecomment -c tools/config/humio.yml rules/ > /dev/null
|
||||
$(COVERAGE) run -a --include=$(COVSCOPE) tools/sigmac -rvdI -t crowdstrike -O rulecomment -c tools/config/crowdstrike.yml rules/ > /dev/null
|
||||
$(COVERAGE) run -a --include=$(COVSCOPE) tools/sigmac -rvdI -t sql -c sysmon rules/ > /dev/null
|
||||
$(COVERAGE) run -a --include=$(COVSCOPE) tools/sigmac -rvdI -t sqlite -c sysmon rules/ > /dev/null
|
||||
$(COVERAGE) run -a --include=$(COVSCOPE) tools/sigmac -rvdI -t logiq -c sysmon rules/ > /dev/null
|
||||
$(COVERAGE) run -a --include=$(COVSCOPE) tools/sigmac -rvdI -t splunk -c tools/config/splunk-windows-index.yml -f 'level>=high,level<=critical,status=stable,logsource=windows,tag=attack.execution' rules/ > /dev/null
|
||||
! $(COVERAGE) run -a --include=$(COVSCOPE) tools/sigmac -rvdI -t splunk -c tools/config/splunk-windows-index.yml -f 'level>=high,level<=critical,status=xstable,logsource=windows' rules/ > /dev/null
|
||||
@ -107,6 +108,10 @@ test-merge:
|
||||
test-backend-es-qs:
|
||||
tests/test-backend-es-qs.py
|
||||
|
||||
test-backend-sql:
|
||||
cd tools && python3 setup.py install
|
||||
cd tools && $(COVERAGE) run -m pytest tests/test_backend_sql.py tests/test_backend_sqlite.py
|
||||
|
||||
test-sigma2attack:
|
||||
$(COVERAGE) run -a --include=$(COVSCOPE) tools/sigma2attack
|
||||
|
||||
|
@ -1,5 +1,6 @@
|
||||
# Output backends for sigmac
|
||||
# Copyright 2019 Jayden Zheng
|
||||
# Copyright 2020 Jonas Hagg
|
||||
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU Lesser General Public License as published by
|
||||
@ -16,7 +17,9 @@
|
||||
|
||||
import re
|
||||
import sigma
|
||||
from .base import SingleTextQueryBackend
|
||||
from sigma.backends.base import SingleTextQueryBackend
|
||||
from sigma.parser.condition import SigmaAggregationParser, NodeSubexpression, ConditionAND, ConditionOR, ConditionNOT
|
||||
from sigma.parser.exceptions import SigmaParseError
|
||||
|
||||
class SQLBackend(SingleTextQueryBackend):
|
||||
"""Converts Sigma rule into SQL query"""
|
||||
@ -34,12 +37,16 @@ class SQLBackend(SingleTextQueryBackend):
|
||||
notNullExpression = "%s=*" # Expression of queries for not null values. %s is field name
|
||||
mapExpression = "%s = %s" # Syntax for field/value conditions. First %s is fieldname, second is value
|
||||
mapMulti = "%s IN %s" # Syntax for field/value conditions. First %s is fieldname, second is value
|
||||
mapWildcard = "%s LIKE %s" # Syntax for swapping wildcard conditions.
|
||||
mapWildcard = "%s LIKE %s ESCAPE \'\\\'"# Syntax for swapping wildcard conditions: Adding \ as escape character
|
||||
mapSource = "%s=%s" # Syntax for sourcetype
|
||||
mapListsSpecialHandling = False # Same handling for map items with list values as for normal values (strings, integers) if True, generateMapItemListNode method is called with node
|
||||
mapListValueExpression = "%s OR %s" # Syntax for field/value condititons where map value is a list
|
||||
mapLength = "(%s %s)"
|
||||
|
||||
def __init__(self, sigmaconfig, table):
|
||||
super().__init__(sigmaconfig)
|
||||
self.table = table
|
||||
|
||||
def generateANDNode(self, node):
|
||||
generated = [ self.generateNode(val) for val in node ]
|
||||
filtered = [ g for g in generated if g is not None ]
|
||||
@ -78,29 +85,32 @@ class SQLBackend(SingleTextQueryBackend):
|
||||
def generateMapItemNode(self, node):
|
||||
fieldname, value = node
|
||||
transformed_fieldname = self.fieldNameMapping(fieldname, value)
|
||||
if "," in self.generateNode(value) and "%" not in self.generateNode(value):
|
||||
|
||||
has_wildcard = re.search(r"((\\(\*|\?|\\))|\*|\?|_|%)", self.generateNode(value))
|
||||
|
||||
if "," in self.generateNode(value) and not has_wildcard:
|
||||
return self.mapMulti % (transformed_fieldname, self.generateNode(value))
|
||||
elif "LENGTH" in transformed_fieldname:
|
||||
return self.mapLength % (transformed_fieldname, value)
|
||||
elif type(value) == list:
|
||||
return self.generateMapItemListNode(transformed_fieldname, value)
|
||||
elif self.mapListsSpecialHandling == False and type(value) in (str, int, list) or self.mapListsSpecialHandling == True and type(value) in (str, int):
|
||||
if "%" in self.generateNode(value):
|
||||
if has_wildcard:
|
||||
return self.mapWildcard % (transformed_fieldname, self.generateNode(value))
|
||||
else:
|
||||
return self.mapExpression % (transformed_fieldname, self.generateNode(value))
|
||||
elif "sourcetype" in transformed_fieldname:
|
||||
return self.mapSource % (transformed_fieldname, self.generateNode(value))
|
||||
elif "*" in str(value):
|
||||
elif has_wildcard:
|
||||
return self.mapWildcard % (transformed_fieldname, self.generateNode(value))
|
||||
else:
|
||||
raise TypeError("Backend does not support map values of type " + str(type(value)))
|
||||
|
||||
def generateMapItemListNode(self, key, value):
|
||||
return "(" + (" OR ".join(['%s LIKE %s' % (key, self.generateValueNode(item)) for item in value])) + ")"
|
||||
|
||||
return "(" + (" OR ".join([self.mapWildcard % (key, self.generateValueNode(item)) for item in value])) + ")"
|
||||
|
||||
def generateValueNode(self, node):
|
||||
return self.valueExpression % (self.cleanValue(str(node)))
|
||||
return self.valueExpression % (self.cleanValue(str(node)))
|
||||
|
||||
def generateNULLValueNode(self, node):
|
||||
return self.nullExpression % (node.item)
|
||||
@ -117,10 +127,97 @@ class SQLBackend(SingleTextQueryBackend):
|
||||
return fieldname
|
||||
|
||||
def cleanValue(self, val):
|
||||
if "*" == val:
|
||||
pass
|
||||
elif "*.*.*" in val:
|
||||
val = val.replace("*.*.*", "%")
|
||||
elif re.search(r'\*', val):
|
||||
val = re.sub(r'\*', '%', val)
|
||||
if not isinstance(val, str):
|
||||
return str(val)
|
||||
|
||||
#Single backlashes which are not in front of * or ? are doulbed
|
||||
val = re.sub(r"(?<!\\)\\(?!(\\|\*|\?))", r"\\\\", val)
|
||||
|
||||
#Replace _ with \_ because _ is a sql wildcard
|
||||
val = re.sub(r'_', r'\_', val)
|
||||
|
||||
#Replace % with \% because % is a sql wildcard
|
||||
val = re.sub(r'%', r'\%', val)
|
||||
|
||||
#Replace * with %, if even number of backslashes (or zero) in front of *
|
||||
val = re.sub(r"(?<!\\)(\\\\)*(?!\\)\*", r"\1%", val)
|
||||
|
||||
#Replace ? with _, if even number of backsashes (or zero) in front of ?
|
||||
val = re.sub(r"(?<!\\)(\\\\)*(?!\\)\?", r"\1_", val)
|
||||
return val
|
||||
|
||||
def generateAggregation(self, agg, where_clausel):
|
||||
if not agg:
|
||||
return self.table, where_clausel
|
||||
|
||||
if (agg.aggfunc == SigmaAggregationParser.AGGFUNC_COUNT or
|
||||
agg.aggfunc == SigmaAggregationParser.AGGFUNC_MAX or
|
||||
agg.aggfunc == SigmaAggregationParser.AGGFUNC_MIN or
|
||||
agg.aggfunc == SigmaAggregationParser.AGGFUNC_SUM or
|
||||
agg.aggfunc == SigmaAggregationParser.AGGFUNC_AVG):
|
||||
|
||||
if agg.groupfield:
|
||||
group_by = " GROUP BY {0}".format(self.fieldNameMapping(agg.groupfield, None))
|
||||
else:
|
||||
group_by = ""
|
||||
|
||||
if agg.aggfield:
|
||||
select = "{}({}) AS agg".format(agg.aggfunc_notrans, self.fieldNameMapping(agg.aggfield, None))
|
||||
else:
|
||||
if agg.aggfunc == SigmaAggregationParser.AGGFUNC_COUNT:
|
||||
select = "{}(*) AS agg".format(agg.aggfunc_notrans)
|
||||
else:
|
||||
raise SigmaParseError("For {} aggregation a fieldname needs to be specified".format(agg.aggfunc_notrans))
|
||||
|
||||
temp_table = "(SELECT {} FROM {} WHERE {}{})".format(select, self.table, where_clausel, group_by)
|
||||
agg_condition = "agg {} {}".format(agg.cond_op, agg.condition)
|
||||
|
||||
return temp_table, agg_condition
|
||||
|
||||
raise NotImplementedError("{} aggregation not implemented in SQL Backend".format(agg.aggfunc_notrans))
|
||||
|
||||
def generateQuery(self, parsed):
|
||||
if self._recursiveFtsSearch(parsed.parsedSearch):
|
||||
raise NotImplementedError("FullTextSearch not implemented for SQL Backend.")
|
||||
result = self.generateNode(parsed.parsedSearch)
|
||||
|
||||
if parsed.parsedAgg:
|
||||
#Handle aggregation
|
||||
fro, whe = self.generateAggregation(parsed.parsedAgg, result)
|
||||
return "SELECT * FROM {} WHERE {}".format(fro, whe)
|
||||
|
||||
return "SELECT * FROM {} WHERE {}".format(self.table, result)
|
||||
|
||||
def _recursiveFtsSearch(self, subexpression):
|
||||
#True: found subexpression, where no fieldname is requested -> full text search
|
||||
#False: no subexpression found, where a full text search is needed
|
||||
|
||||
def _evaluateCondition(condition):
|
||||
#Helper function to evaulate condtions
|
||||
if type(condition) not in [ConditionAND, ConditionOR, ConditionNOT]:
|
||||
raise NotImplementedError("Error in recursive Search logic")
|
||||
|
||||
results = []
|
||||
for elem in condition.items:
|
||||
if isinstance(elem, NodeSubexpression):
|
||||
results.append(self._recursiveFtsSearch(elem))
|
||||
if isinstance(elem, ConditionNOT):
|
||||
results.append(_evaluateCondition(elem))
|
||||
if isinstance(elem, tuple):
|
||||
results.append(False)
|
||||
if type(elem) in (str, int, list):
|
||||
return True
|
||||
return any(results)
|
||||
|
||||
if type(subexpression) in [str, int, list]:
|
||||
return True
|
||||
elif type(subexpression) in [tuple]:
|
||||
return False
|
||||
|
||||
if not isinstance(subexpression, NodeSubexpression):
|
||||
raise NotImplementedError("Error in recursive Search logic")
|
||||
|
||||
if isinstance(subexpression.items, NodeSubexpression):
|
||||
return self._recursiveFtsSearch(subexpression.items)
|
||||
elif type(subexpression.items) in [ConditionAND, ConditionOR, ConditionNOT]:
|
||||
return _evaluateCondition(subexpression.items)
|
123
tools/sigma/backends/sqlite.py
Normal file
123
tools/sigma/backends/sqlite.py
Normal file
@ -0,0 +1,123 @@
|
||||
# Output backends for sigmac
|
||||
# Copyright 2020 Jonas Hagg
|
||||
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU Lesser General Public License as published by
|
||||
# the Free Software Foundation, either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU Lesser General Public License for more details.
|
||||
|
||||
# You should have received a copy of the GNU Lesser General Public License
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
from sigma.backends.sql import SQLBackend
|
||||
from sigma.parser.condition import NodeSubexpression, ConditionAND, ConditionOR, ConditionNOT
|
||||
import re
|
||||
|
||||
|
||||
class SQLiteBackend(SQLBackend):
|
||||
"""Converts Sigma rule into SQL query for SQLite"""
|
||||
identifier = "sqlite"
|
||||
active = True
|
||||
|
||||
mapFullTextSearch = "%s MATCH ('\"%s\"')"
|
||||
|
||||
def __init__(self, sigmaconfig, table):
|
||||
super().__init__(sigmaconfig, table)
|
||||
self.mappingItem = False
|
||||
|
||||
def requireFTS(self, node):
|
||||
return (not self.mappingItem and
|
||||
(type(node) in (int, str) or all(isinstance(val, str) for val in node) or all(isinstance(val, int) for val in node)))
|
||||
|
||||
def generateFTS(self, value):
|
||||
if re.search(r"((\\(\*|\?|\\))|\*|\?|_|%)", value):
|
||||
raise NotImplementedError(
|
||||
"Wildcards in SQlite Full Text Search not implemented")
|
||||
self.countFTS += 1
|
||||
return self.mapFullTextSearch % (self.table, value)
|
||||
|
||||
def generateANDNode(self, node):
|
||||
|
||||
if self.requireFTS(node):
|
||||
fts = str('"' + self.andToken + '"').join(self.cleanValue(val)
|
||||
for val in node)
|
||||
return self.generateFTS(fts)
|
||||
|
||||
generated = [self.generateNode(val) for val in node]
|
||||
filtered = [g for g in generated if g is not None]
|
||||
if filtered:
|
||||
return self.andToken.join(filtered)
|
||||
else:
|
||||
return None
|
||||
|
||||
def generateORNode(self, node):
|
||||
|
||||
if self.requireFTS(node):
|
||||
fts = str('"' + self.orToken + '"').join(self.cleanValue(val)
|
||||
for val in node)
|
||||
return self.generateFTS(fts)
|
||||
|
||||
generated = [self.generateNode(val) for val in node]
|
||||
filtered = [g for g in generated if g is not None]
|
||||
if filtered:
|
||||
return self.orToken.join(filtered)
|
||||
else:
|
||||
return None
|
||||
|
||||
def generateMapItemNode(self, node):
|
||||
try:
|
||||
self.mappingItem = True
|
||||
fieldname, value = node
|
||||
transformed_fieldname = self.fieldNameMapping(fieldname, value)
|
||||
|
||||
has_wildcard = re.search(
|
||||
r"((\\(\*|\?|\\))|\*|\?|_|%)", self.generateNode(value))
|
||||
|
||||
if "," in self.generateNode(value) and not has_wildcard:
|
||||
return self.mapMulti % (transformed_fieldname, self.generateNode(value))
|
||||
elif "LENGTH" in transformed_fieldname:
|
||||
return self.mapLength % (transformed_fieldname, value)
|
||||
elif type(value) == list:
|
||||
return self.generateMapItemListNode(transformed_fieldname, value)
|
||||
elif self.mapListsSpecialHandling == False and type(value) in (str, int, list) or self.mapListsSpecialHandling == True and type(value) in (str, int):
|
||||
|
||||
if has_wildcard:
|
||||
return self.mapWildcard % (transformed_fieldname, self.generateNode(value))
|
||||
else:
|
||||
return self.mapExpression % (transformed_fieldname, self.generateNode(value))
|
||||
|
||||
elif "sourcetype" in transformed_fieldname:
|
||||
return self.mapSource % (transformed_fieldname, self.generateNode(value))
|
||||
elif has_wildcard:
|
||||
return self.mapWildcard % (transformed_fieldname, self.generateNode(value))
|
||||
else:
|
||||
raise TypeError(
|
||||
"Backend does not support map values of type " + str(type(value)))
|
||||
finally:
|
||||
self.mappingItem = False
|
||||
|
||||
def generateValueNode(self, node):
|
||||
if self.mappingItem:
|
||||
return self.valueExpression % (self.cleanValue(str(node)))
|
||||
else:
|
||||
return self.generateFTS(self.cleanValue(str(node)))
|
||||
|
||||
def generateQuery(self, parsed):
|
||||
self.countFTS = 0
|
||||
result = self.generateNode(parsed.parsedSearch)
|
||||
if self.countFTS > 1:
|
||||
raise NotImplementedError(
|
||||
"Match operator ({}) is allowed only once in SQLite, parse rule in a different way:\n{}".format(self.countFTS, result))
|
||||
self.countFTS = 0
|
||||
|
||||
if parsed.parsedAgg:
|
||||
# Handle aggregation
|
||||
fro, whe = self.generateAggregation(parsed.parsedAgg, result)
|
||||
return "SELECT * FROM {} WHERE {}".format(fro, whe)
|
||||
|
||||
return "SELECT * FROM {} WHERE {}".format(self.table, result)
|
334
tools/tests/test_backend_sql.py
Normal file
334
tools/tests/test_backend_sql.py
Normal file
@ -0,0 +1,334 @@
|
||||
# Test output backends for sigmac
|
||||
# Copyright 2020 Jonas Hagg
|
||||
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU Lesser General Public License as published by
|
||||
# the Free Software Foundation, either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU Lesser General Public License for more details.
|
||||
|
||||
# You should have received a copy of the GNU Lesser General Public License
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
import unittest
|
||||
from unittest.mock import patch
|
||||
|
||||
from sigma.backends.sql import SQLBackend
|
||||
|
||||
from sigma.parser.collection import SigmaCollectionParser
|
||||
from sigma.config.mapping import FieldMapping
|
||||
from sigma.configuration import SigmaConfiguration
|
||||
|
||||
class TestGenerateQuery(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
self.basic_rule = {"title": "Test", "level": "testing"}
|
||||
self.table = "eventlog"
|
||||
|
||||
def test_regular_queries(self):
|
||||
# Test regular queries
|
||||
detection = {"selection": {"fieldname": "test1"},
|
||||
"condition": "selection"}
|
||||
expected_result = 'SELECT * FROM {} WHERE fieldname = "test1"'.format(
|
||||
self.table)
|
||||
self.validate(detection, expected_result)
|
||||
|
||||
detection = {"selection": {"fieldname": 4}, "condition": "selection"}
|
||||
expected_result = 'SELECT * FROM {} WHERE fieldname = "4"'.format(
|
||||
self.table)
|
||||
self.validate(detection, expected_result)
|
||||
|
||||
detection = {"selection": {"fieldname": [
|
||||
"test1", "test2"]}, "condition": "selection"}
|
||||
expected_result = 'SELECT * FROM {} WHERE fieldname IN ("test1", "test2")'.format(
|
||||
self.table)
|
||||
self.validate(detection, expected_result)
|
||||
|
||||
detection = {"selection": {
|
||||
"fieldname": [3, 4]}, "condition": "selection"}
|
||||
expected_result = 'SELECT * FROM {} WHERE fieldname IN ("3", "4")'.format(
|
||||
self.table)
|
||||
self.validate(detection, expected_result)
|
||||
|
||||
detection = {"selection": {"fieldname1": "test1", "fieldname2": [
|
||||
"test2", "test3"]}, "condition": "selection"}
|
||||
expected_result = 'SELECT * FROM {} WHERE (fieldname1 = "test1" AND fieldname2 IN ("test2", "test3"))'.format(
|
||||
self.table)
|
||||
self.validate(detection, expected_result)
|
||||
|
||||
detection = {"selection": {"fieldname": "test1"}, "filter": {
|
||||
"fieldname2": "whatever"}, "condition": "selection and filter"}
|
||||
expected_result = 'SELECT * FROM {} WHERE (fieldname = "test1" AND fieldname2 = "whatever")'.format(
|
||||
self.table)
|
||||
self.validate(detection, expected_result)
|
||||
|
||||
detection = {"selection": {"fieldname": "test1"}, "filter": {
|
||||
"fieldname2": "whatever"}, "condition": "selection or filter"}
|
||||
expected_result = 'SELECT * FROM {} WHERE (fieldname = "test1" OR fieldname2 = "whatever")'.format(
|
||||
self.table)
|
||||
self.validate(detection, expected_result)
|
||||
|
||||
detection = {"selection": {"fieldname": "test1"}, "filter": {
|
||||
"fieldname2": "whatever"}, "condition": "selection and not filter"}
|
||||
expected_result = 'SELECT * FROM {} WHERE (fieldname = "test1" AND NOT (fieldname2 = "whatever"))'.format(
|
||||
self.table)
|
||||
self.validate(detection, expected_result)
|
||||
|
||||
detection = {"selection": {"fieldname1": "test1"}, "filter": {
|
||||
"fieldname2": "test2"}, "condition": "1 of them"}
|
||||
expected_result = 'SELECT * FROM {} WHERE (fieldname1 = "test1" OR fieldname2 = "test2")'.format(
|
||||
self.table)
|
||||
self.validate(detection, expected_result)
|
||||
|
||||
detection = {"selection": {"fieldname1": "test1"}, "filter": {
|
||||
"fieldname2": "test2"}, "condition": "all of them"}
|
||||
expected_result = 'SELECT * FROM {} WHERE (fieldname1 = "test1" AND fieldname2 = "test2")'.format(
|
||||
self.table)
|
||||
self.validate(detection, expected_result)
|
||||
|
||||
def test_modifiers(self):
|
||||
|
||||
# contains
|
||||
detection = {"selection": {"fieldname|contains": "test"},
|
||||
"condition": "selection"}
|
||||
expected_result = 'SELECT * FROM {} WHERE fieldname LIKE "%test%" ESCAPE \'\\\''.format(
|
||||
self.table)
|
||||
self.validate(detection, expected_result)
|
||||
|
||||
# all
|
||||
detection = {"selection": {"fieldname|all": [
|
||||
"test1", "test2"]}, "condition": "selection"}
|
||||
expected_result = 'SELECT * FROM {} WHERE (fieldname = "test1" AND fieldname = "test2")'.format(
|
||||
self.table)
|
||||
self.validate(detection, expected_result)
|
||||
|
||||
# endswith
|
||||
detection = {"selection": {"fieldname|endswith": "test"},
|
||||
"condition": "selection"}
|
||||
expected_result = 'SELECT * FROM {} WHERE fieldname LIKE "%test" ESCAPE \'\\\''.format(
|
||||
self.table)
|
||||
self.validate(detection, expected_result)
|
||||
|
||||
# startswith
|
||||
detection = {"selection": {"fieldname|startswith": "test"},
|
||||
"condition": "selection"}
|
||||
expected_result = 'SELECT * FROM {} WHERE fieldname LIKE "test%" ESCAPE \'\\\''.format(
|
||||
self.table)
|
||||
self.validate(detection, expected_result)
|
||||
|
||||
def test_aggregations(self):
|
||||
|
||||
# count
|
||||
detection = {"selection": {"fieldname": "test"},
|
||||
"condition": "selection | count() > 5"}
|
||||
inner_query = 'SELECT count(*) AS agg FROM {} WHERE fieldname = "test"'.format(
|
||||
self.table)
|
||||
expected_result = 'SELECT * FROM ({}) WHERE agg > 5'.format(inner_query)
|
||||
self.validate(detection, expected_result)
|
||||
|
||||
# min
|
||||
detection = {"selection": {"fieldname1": "test"},
|
||||
"condition": "selection | min(fieldname2) > 5"}
|
||||
inner_query = 'SELECT min(fieldname2) AS agg FROM {} WHERE fieldname1 = "test"'.format(
|
||||
self.table)
|
||||
expected_result = 'SELECT * FROM ({}) WHERE agg > 5'.format(inner_query)
|
||||
self.validate(detection, expected_result)
|
||||
|
||||
# max
|
||||
detection = {"selection": {"fieldname1": "test"},
|
||||
"condition": "selection | max(fieldname2) > 5"}
|
||||
inner_query = 'SELECT max(fieldname2) AS agg FROM {} WHERE fieldname1 = "test"'.format(
|
||||
self.table)
|
||||
expected_result = 'SELECT * FROM ({}) WHERE agg > 5'.format(inner_query)
|
||||
self.validate(detection, expected_result)
|
||||
|
||||
# avg
|
||||
detection = {"selection": {"fieldname1": "test"},
|
||||
"condition": "selection | avg(fieldname2) > 5"}
|
||||
inner_query = 'SELECT avg(fieldname2) AS agg FROM {} WHERE fieldname1 = "test"'.format(
|
||||
self.table)
|
||||
expected_result = 'SELECT * FROM ({}) WHERE agg > 5'.format(inner_query)
|
||||
self.validate(detection, expected_result)
|
||||
|
||||
# sum
|
||||
detection = {"selection": {"fieldname1": "test"},
|
||||
"condition": "selection | sum(fieldname2) > 5"}
|
||||
inner_query = 'SELECT sum(fieldname2) AS agg FROM {} WHERE fieldname1 = "test"'.format(
|
||||
self.table)
|
||||
expected_result = 'SELECT * FROM ({}) WHERE agg > 5'.format(inner_query)
|
||||
self.validate(detection, expected_result)
|
||||
|
||||
# <
|
||||
detection = {"selection": {"fieldname1": "test"},
|
||||
"condition": "selection | sum(fieldname2) < 5"}
|
||||
inner_query = 'SELECT sum(fieldname2) AS agg FROM {} WHERE fieldname1 = "test"'.format(
|
||||
self.table)
|
||||
expected_result = 'SELECT * FROM ({}) WHERE agg < 5'.format(inner_query)
|
||||
self.validate(detection, expected_result)
|
||||
|
||||
# ==
|
||||
detection = {"selection": {"fieldname1": "test"},
|
||||
"condition": "selection | sum(fieldname2) == 5"}
|
||||
inner_query = 'SELECT sum(fieldname2) AS agg FROM {} WHERE fieldname1 = "test"'.format(
|
||||
self.table)
|
||||
expected_result = 'SELECT * FROM ({}) WHERE agg == 5'.format(inner_query)
|
||||
self.validate(detection, expected_result)
|
||||
|
||||
# group by
|
||||
detection = {"selection": {"fieldname1": "test"},
|
||||
"condition": "selection | sum(fieldname2) by fieldname3 == 5"}
|
||||
inner_query = 'SELECT sum(fieldname2) AS agg FROM {} WHERE fieldname1 = "test" GROUP BY fieldname3'.format(
|
||||
self.table)
|
||||
expected_result = 'SELECT * FROM ({}) WHERE agg == 5'.format(inner_query)
|
||||
self.validate(detection, expected_result)
|
||||
|
||||
# multiple conditions
|
||||
detection = {"selection": {"fieldname1": "test"}, "filter": {
|
||||
"fieldname2": "tessst"}, "condition": "selection OR filter | sum(fieldname2) == 5"}
|
||||
inner_query = 'SELECT sum(fieldname2) AS agg FROM {} WHERE (fieldname1 = "test" OR fieldname2 = "tessst")'.format(
|
||||
self.table)
|
||||
expected_result = 'SELECT * FROM ({}) WHERE agg == 5'.format(inner_query)
|
||||
self.validate(detection, expected_result)
|
||||
|
||||
def test_wildcards(self):
|
||||
|
||||
# wildcard: *
|
||||
detection = {"selection": {"fieldname": "test*"},
|
||||
"condition": "selection"}
|
||||
expected_result = 'SELECT * FROM {} WHERE fieldname LIKE '.format(
|
||||
self.table) + r'"test%"' + r" ESCAPE '\'"
|
||||
self.validate(detection, expected_result)
|
||||
|
||||
# wildcard: ?
|
||||
detection = {"selection": {"fieldname": "test?"},
|
||||
"condition": "selection"}
|
||||
expected_result = 'SELECT * FROM {} WHERE fieldname LIKE '.format(
|
||||
self.table) + r'"test_"' + r" ESCAPE '\'"
|
||||
self.validate(detection, expected_result)
|
||||
|
||||
# escaping:
|
||||
detection = {"selection": {"fieldname": r"test\?"},
|
||||
"condition": "selection"}
|
||||
expected_result = 'SELECT * FROM {} WHERE fieldname LIKE '.format(
|
||||
self.table) + r'"test\?"' + r" ESCAPE '\'"
|
||||
self.validate(detection, expected_result)
|
||||
|
||||
detection = {"selection": {"fieldname": r"test\\*"},
|
||||
"condition": "selection"}
|
||||
expected_result = 'SELECT * FROM {} WHERE fieldname LIKE '.format(
|
||||
self.table) + r'"test\\%"' + r" ESCAPE '\'"
|
||||
self.validate(detection, expected_result)
|
||||
|
||||
detection = {"selection": {"fieldname": r"test\*"},
|
||||
"condition": "selection"}
|
||||
expected_result = 'SELECT * FROM {} WHERE fieldname LIKE '.format(
|
||||
self.table) + r'"test\*"' + r" ESCAPE '\'"
|
||||
self.validate(detection, expected_result)
|
||||
|
||||
detection = {"selection": {"fieldname": r"test\\"},
|
||||
"condition": "selection"}
|
||||
expected_result = 'SELECT * FROM {} WHERE fieldname LIKE '.format(
|
||||
self.table) + r'"test\\"' + r" ESCAPE '\'"
|
||||
self.validate(detection, expected_result)
|
||||
|
||||
detection = {"selection": {"fieldname": r"test\abc"},
|
||||
"condition": "selection"}
|
||||
expected_result = 'SELECT * FROM {} WHERE fieldname LIKE '.format(
|
||||
self.table) + r'"test\\abc"' + r" ESCAPE '\'"
|
||||
self.validate(detection, expected_result)
|
||||
|
||||
detection = {"selection": {"fieldname": r"test%"},
|
||||
"condition": "selection"}
|
||||
expected_result = 'SELECT * FROM {} WHERE fieldname LIKE '.format(
|
||||
self.table) + r'"test\%"' + r" ESCAPE '\'"
|
||||
self.validate(detection, expected_result)
|
||||
|
||||
detection = {"selection": {"fieldname": r"test_"},
|
||||
"condition": "selection"}
|
||||
expected_result = 'SELECT * FROM {} WHERE fieldname LIKE '.format(
|
||||
self.table) + r'"test\_"' + r" ESCAPE '\'"
|
||||
self.validate(detection, expected_result)
|
||||
|
||||
# multiple options
|
||||
detection = {"selection": {"fieldname": [
|
||||
"test*", "*test"]}, "condition": "selection"}
|
||||
opt1 = 'fieldname LIKE ' + r'"test%"' + r" ESCAPE '\'"
|
||||
opt2 = 'fieldname LIKE ' + r'"%test"' + r" ESCAPE '\'"
|
||||
expected_result = 'SELECT * FROM {} WHERE ({} OR {})'.format(
|
||||
self.table, opt1, opt2)
|
||||
self.validate(detection, expected_result)
|
||||
|
||||
detection = {"selection": {"fieldname|all": [
|
||||
"test*", "*test"]}, "condition": "selection"}
|
||||
opt1 = 'fieldname LIKE ' + r'"test%"' + r" ESCAPE '\'"
|
||||
opt2 = 'fieldname LIKE ' + r'"%test"' + r" ESCAPE '\'"
|
||||
expected_result = 'SELECT * FROM {} WHERE ({} AND {})'.format(
|
||||
self.table, opt1, opt2)
|
||||
self.validate(detection, expected_result)
|
||||
|
||||
def test_fieldname_mapping(self):
|
||||
detection = {"selection": {"fieldname": "test1"},
|
||||
"condition": "selection"}
|
||||
expected_result = 'SELECT * FROM {} WHERE mapped_fieldname = "test1"'.format(
|
||||
self.table)
|
||||
|
||||
# configure mapping
|
||||
config = SigmaConfiguration()
|
||||
config.fieldmappings["fieldname"] = FieldMapping(
|
||||
"fieldname", "mapped_fieldname")
|
||||
|
||||
self.basic_rule["detection"] = detection
|
||||
|
||||
with patch("yaml.safe_load_all", return_value=[self.basic_rule]):
|
||||
parser = SigmaCollectionParser("any sigma io", config, None)
|
||||
backend = SQLBackend(config, self.table)
|
||||
|
||||
assert len(parser.parsers) == 1
|
||||
|
||||
for p in parser.parsers:
|
||||
self.assertEqual(expected_result, backend.generate(p))
|
||||
|
||||
def test_not_implemented(self):
|
||||
# near aggregation not implemented
|
||||
detection = {"selection": {"fieldname": "test"}, "filter": {
|
||||
"fieldname": "test2"}, "condition": "selection | near selection and filter"}
|
||||
expected_result = NotImplementedError()
|
||||
self.validate(detection, expected_result)
|
||||
|
||||
# re modifier is not implemented
|
||||
detection = {"selection": {"fieldname|re": "test"},
|
||||
"condition": "selection"}
|
||||
expected_result = NotImplementedError()
|
||||
self.validate(detection, expected_result)
|
||||
|
||||
#Full Text Search is not implemented
|
||||
detection = {"selection": ["test1"], "condition": "selection"}
|
||||
expected_result = NotImplementedError()
|
||||
self.validate(detection, expected_result)
|
||||
|
||||
|
||||
def validate(self, detection, expectation):
|
||||
|
||||
config = SigmaConfiguration()
|
||||
|
||||
self.basic_rule["detection"] = detection
|
||||
|
||||
with patch("yaml.safe_load_all", return_value=[self.basic_rule]):
|
||||
parser = SigmaCollectionParser("any sigma io", config, None)
|
||||
backend = SQLBackend(config, self.table)
|
||||
|
||||
assert len(parser.parsers) == 1
|
||||
|
||||
for p in parser.parsers:
|
||||
if isinstance(expectation, str):
|
||||
self.assertEqual(expectation, backend.generate(p))
|
||||
elif isinstance(expectation, Exception):
|
||||
self.assertRaises(type(expectation), backend.generate, p)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
148
tools/tests/test_backend_sqlite.py
Normal file
148
tools/tests/test_backend_sqlite.py
Normal file
@ -0,0 +1,148 @@
|
||||
# Test output backends for sigmac
|
||||
# Copyright 2020 Jonas Hagg
|
||||
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU Lesser General Public License as published by
|
||||
# the Free Software Foundation, either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU Lesser General Public License for more details.
|
||||
|
||||
# You should have received a copy of the GNU Lesser General Public License
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
import unittest
|
||||
from unittest.mock import patch
|
||||
|
||||
from sigma.backends.sqlite import SQLiteBackend
|
||||
|
||||
from sigma.parser.collection import SigmaCollectionParser
|
||||
from sigma.config.mapping import FieldMapping
|
||||
from sigma.configuration import SigmaConfiguration
|
||||
|
||||
class TestFullTextSearch(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
self.basic_rule = {"title": "Test", "level": "testing"}
|
||||
self.table = "eventlog"
|
||||
|
||||
def test_full_text_search(self):
|
||||
detection = {"selection": ["test1"], "condition": "selection"}
|
||||
expected_result = 'SELECT * FROM {0} WHERE {0} MATCH (\'"test1"\')'.format(
|
||||
self.table)
|
||||
self.validate(detection, expected_result)
|
||||
|
||||
detection = {"selection": [5], "condition": "selection"}
|
||||
expected_result = 'SELECT * FROM {0} WHERE {0} MATCH (\'"5"\')'.format(
|
||||
self.table)
|
||||
self.validate(detection, expected_result)
|
||||
|
||||
detection = {"selection": ["test1", "test2"], "condition": "selection"}
|
||||
expected_result = 'SELECT * FROM {0} WHERE ({0} MATCH (\'"test1" OR "test2"\'))'.format(
|
||||
self.table)
|
||||
self.validate(detection, expected_result)
|
||||
|
||||
detection = {"selection": ["test1"], "filter":["test2"], "condition": "selection and filter"}
|
||||
expected_result = 'SELECT * FROM {0} WHERE ({0} MATCH (\'"test1" AND "test2"\'))'.format(
|
||||
self.table)
|
||||
self.validate(detection, expected_result)
|
||||
|
||||
detection = {"selection": [5, 6], "condition": "selection"}
|
||||
expected_result = 'SELECT * FROM {0} WHERE ({0} MATCH (\'"5" OR "6"\'))'.format(
|
||||
self.table)
|
||||
self.validate(detection, expected_result)
|
||||
|
||||
detection = {"selection": ["test1"], "filter": [
|
||||
"test2"], "condition": "selection or filter"}
|
||||
expected_result = 'SELECT * FROM {0} WHERE ({0} MATCH (\'"test1" OR "test2"\'))'.format(
|
||||
self.table)
|
||||
self.validate(detection, expected_result)
|
||||
|
||||
detection = {"selection": ["test1"], "filter": [
|
||||
"test2"], "condition": "selection and filter"}
|
||||
expected_result = 'SELECT * FROM {0} WHERE ({0} MATCH (\'"test1" AND "test2"\'))'.format(
|
||||
self.table)
|
||||
self.validate(detection, expected_result)
|
||||
|
||||
def test_full_text_search_aggregation(self):
|
||||
# aggregation with fts
|
||||
detection = {"selection": ["test"],
|
||||
"condition": "selection | count() > 5"}
|
||||
inner_query = 'SELECT count(*) AS agg FROM {0} WHERE {0} MATCH (\'"test"\')'.format(
|
||||
self.table)
|
||||
expected_result = 'SELECT * FROM ({}) WHERE agg > 5'.format(inner_query)
|
||||
self.validate(detection, expected_result)
|
||||
|
||||
detection = {"selection": ["test1", "test2"],
|
||||
"condition": "selection | count() > 5"}
|
||||
inner_query = 'SELECT count(*) AS agg FROM {0} WHERE ({0} MATCH (\'"test1" OR "test2"\'))'.format(
|
||||
self.table)
|
||||
expected_result = 'SELECT * FROM ({}) WHERE agg > 5'.format(inner_query)
|
||||
self.validate(detection, expected_result)
|
||||
|
||||
# aggregation + group by + fts
|
||||
detection = {"selection": ["test1", "test2"],
|
||||
"condition": "selection | count() by fieldname > 5"}
|
||||
inner_query = 'SELECT count(*) AS agg FROM {0} WHERE ({0} MATCH (\'"test1" OR "test2"\')) GROUP BY fieldname'.format(
|
||||
self.table)
|
||||
expected_result = 'SELECT * FROM ({}) WHERE agg > 5'.format(inner_query)
|
||||
self.validate(detection, expected_result)
|
||||
|
||||
def test_not_implemented(self):
|
||||
# fts not implemented with wildcards
|
||||
detection = {"selection": ["test*"], "condition": "selection"}
|
||||
expected_result = NotImplementedError()
|
||||
self.validate(detection, expected_result)
|
||||
|
||||
detection = {"selection": ["test?"], "condition": "selection"}
|
||||
expected_result = NotImplementedError()
|
||||
self.validate(detection, expected_result)
|
||||
|
||||
detection = {"selection": ["test\\"], "condition": "selection"}
|
||||
expected_result = NotImplementedError()
|
||||
self.validate(detection, expected_result)
|
||||
|
||||
|
||||
# fts is not implemented for nested condtions
|
||||
detection = {"selection": ["test"], "filter": [
|
||||
"test2"], "condition": "selection and filter"} # this is ok
|
||||
detection = {"selection": ["test"], "filter": [
|
||||
"test2"], "condition": "selection or filter"} # this is ok
|
||||
detection = {"selection": ["test"], "filter": [
|
||||
"test2"], "condition": "selection and not filter"} # this is already nested
|
||||
expected_result = NotImplementedError()
|
||||
self.validate(detection, expected_result)
|
||||
|
||||
detection = {"selection": ["test"], "filter": [
|
||||
"test2"], "condition": "selection and filter and filter"} # this is nested
|
||||
expected_result = NotImplementedError()
|
||||
self.validate(detection, expected_result)
|
||||
|
||||
detection = {"selection": ["test"], "filter": [
|
||||
"test2"], "condition": "selection and filter or filter"} # this is nested
|
||||
expected_result = NotImplementedError()
|
||||
self.validate(detection, expected_result)
|
||||
|
||||
def validate(self, detection, expectation):
|
||||
|
||||
config = SigmaConfiguration()
|
||||
|
||||
self.basic_rule["detection"] = detection
|
||||
|
||||
with patch("yaml.safe_load_all", return_value=[self.basic_rule]):
|
||||
parser = SigmaCollectionParser("any sigma io", config, None)
|
||||
backend = SQLiteBackend(config, self.table)
|
||||
|
||||
assert len(parser.parsers) == 1
|
||||
|
||||
for p in parser.parsers:
|
||||
if isinstance(expectation, str):
|
||||
self.assertEqual(expectation, backend.generate(p))
|
||||
elif isinstance(expectation, Exception):
|
||||
self.assertRaises(type(expectation), backend.generate, p)
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
Loading…
Reference in New Issue
Block a user