mirror of
https://github.com/valitydev/SigmaHQ.git
synced 2024-11-06 17:35:19 +00:00
e0f813ebbb
First version of sigmac that converts Sigma YAMLs without aggregations into ES Query Strings suitable for Kibana or other tools.
352 lines
14 KiB
Python
352 lines
14 KiB
Python
# Sigma parser
|
|
|
|
import yaml
|
|
import re
|
|
|
|
# Operator codes carried in the ``op`` attribute of the condition node classes:
# 0 = no operator, 1 = AND, 2 = OR, 3 = NOT.
COND_NONE, COND_AND, COND_OR, COND_NOT = range(4)
|
|
|
|
class SigmaParser:
    """Load a Sigma rule from YAML and parse its detection conditions.

    Attributes:
        definitions: dict with one entry per key of the rule's ``detection``
            section (filled by parse_sigma()).
        parsedyaml: the rule as returned by ``yaml.safe_load``.
        condtoken: list of SigmaConditionTokenizer objects, one per condition
            (set by parse_sigma()).
        condparsed: list of SigmaConditionParser objects, one per condition
            (set by parse_sigma()).
    """

    def __init__(self, sigma):
        """Load the rule; *sigma* is handed to ``yaml.safe_load`` unchanged."""
        self.definitions = dict()
        self.parsedyaml = yaml.safe_load(sigma)

    def parse_sigma(self):
        """Collect the detection definitions, then tokenize and parse the condition(s).

        Raises:
            SigmaParseError: if the rule has no usable ``detection`` mapping, no
                ``condition`` entry, or a condition that is neither a string nor
                a list.
        """
        try:
            detection = self.parsedyaml["detection"]
        except (KeyError, TypeError) as e:
            # TypeError covers an empty or non-mapping YAML document.
            raise SigmaParseError("No detection definitions found") from e
        if not isinstance(detection, dict):
            raise SigmaParseError("No detection definitions found")

        # Rebuilt on every call so that calling parse_sigma() again does not trip
        # over the entries stored by the previous call. Keys of a mapping are
        # already unique, so no separate duplicate check is needed.
        self.definitions = dict(detection)

        try:
            conditions = detection["condition"]
        except KeyError as e:
            raise SigmaParseError("No condition found") from e

        self.condtoken = list()     # list of tokenized conditions
        if isinstance(conditions, str):
            self.condtoken.append(SigmaConditionTokenizer(conditions))
        elif isinstance(conditions, list):
            for condition in conditions:
                self.condtoken.append(SigmaConditionTokenizer(condition))
        else:
            # Previously such a value was skipped silently, leaving no condition at all.
            raise SigmaParseError("Condition must be a string or a list of strings, got type %s" % (type(conditions)))

        self.condparsed = list()    # list of parsed conditions
        for tokens in self.condtoken:
            self.condparsed.append(SigmaConditionParser(self, tokens))

    def parse_definition_byname(self, definitionName, condOverride=None):
        """Look up *definitionName* in ``self.definitions`` and parse it.

        Raises:
            SigmaParseError: if no definition with that name is stored.
        """
        try:
            definition = self.definitions[definitionName]
        except KeyError as e:
            raise SigmaParseError("Unknown definition '%s'" % (definitionName)) from e
        return self.parse_definition(definition, condOverride)

    def parse_definition(self, definition, condOverride=None):
        """Convert a definition (list or map) into a condition node.

        A list becomes a ``condOverride()`` node if *condOverride* is given,
        otherwise a ConditionOR; nested lists and maps are parsed recursively
        (without the override). A map becomes a ConditionAND holding one
        ``(key, value)`` tuple per entry.

        Raises:
            SigmaParseError: if *definition* is neither a map nor a list, or a
                list element is neither a plain value nor a map/list.
        """
        if not isinstance(definition, (dict, list)):
            raise SigmaParseError("Expected map or list, got type %s: '%s'" % (type(definition), str(definition)))

        if isinstance(definition, list):    # list of values or maps
            if condOverride:    # condition given through rule detection condition, e.g. 1 of x
                cond = condOverride()
            else:               # no condition given, use default from spec
                cond = ConditionOR()

            for value in definition:
                # Exact type match, so bool (an int subclass) falls through to the error branch.
                if type(value) in (str, int):
                    cond.add(value)
                elif isinstance(value, (dict, list)):
                    cond.add(self.parse_definition(value))
                else:
                    raise SigmaParseError("Definition list may only contain plain values or maps")
        else:                               # map
            cond = ConditionAND()
            for key, value in definition.items():
                cond.add((key, value))

        return cond
|
|
|
|
class SigmaConditionToken:
    """Token of a Sigma condition expression"""
    TOKEN_AND = 1
    TOKEN_OR = 2
    TOKEN_NOT = 3
    TOKEN_ID = 4
    TOKEN_LPAR = 5
    TOKEN_RPAR = 6
    TOKEN_PIPE = 7
    TOKEN_ONE = 8
    TOKEN_ALL = 9
    TOKEN_AGG = 10
    TOKEN_EQ = 11
    TOKEN_LT = 12
    TOKEN_LTE = 13
    TOKEN_GT = 14
    TOKEN_GTE = 15
    TOKEN_BY = 16

    # Printable names, indexed by the TOKEN_* constants above (index 0 is unused).
    tokenstr = [
        "INVALID",
        "AND",
        "OR",
        "NOT",
        "ID",
        "LPAR",
        "RPAR",
        "PIPE",
        "ONE",
        "ALL",
        "AGG",
        "EQ",
        "LT",
        "LTE",
        "GT",
        "GTE",
        "BY",
    ]

    def __init__(self, tokendef, match, pos):
        """Create a token.

        Args:
            tokendef: sequence whose first element is the token type constant.
            match: regular expression match object; its ``group()`` is stored
                as the matched text.
            pos: position of the token, stored as given.
        """
        self.type = tokendef[0]
        self.matched = match.group()
        self.pos = pos

    def __eq__(self, other):
        """Compare the token type against a TOKEN_* integer constant.

        For any other operand ``NotImplemented`` is returned, so Python falls
        back to its default comparison (identity) instead of raising. This
        keeps searches such as ``list.index(token)`` working on lists that
        contain several tokens or other objects.
        """
        if isinstance(other, int) and not isinstance(other, bool):     # match against type
            return self.type == other
        return NotImplemented

    def __str__(self):
        return "[ Token: %s: '%s' ]" % (self.tokenstr[self.type], self.matched)
|
|
|
|
class SigmaConditionTokenizer:
    """Tokenize condition string into token sequence"""
    # List of tokens, recognized in the given order: (token identifier, matching
    # regular expression). A match is dropped if the token identifier is None.
    # Keywords end with \b so that they are not cut out of longer identifiers
    # (e.g. "android" stays one ID), and the two-character comparison operators
    # precede their one-character prefixes so that "<=" and ">=" can match.
    tokendefs = [
        (SigmaConditionToken.TOKEN_ONE, re.compile(r"1 of\b", re.IGNORECASE)),
        (SigmaConditionToken.TOKEN_ALL, re.compile(r"all of\b", re.IGNORECASE)),
        (None, re.compile(r"[\s\r\n]+")),
        (SigmaConditionToken.TOKEN_AGG, re.compile(r"(?:count|distcount|min|max|avg|sum)\b", re.IGNORECASE)),
        (SigmaConditionToken.TOKEN_BY, re.compile(r"by\b", re.IGNORECASE)),
        (SigmaConditionToken.TOKEN_EQ, re.compile(r"==")),
        (SigmaConditionToken.TOKEN_LTE, re.compile(r"<=")),
        (SigmaConditionToken.TOKEN_LT, re.compile(r"<")),
        (SigmaConditionToken.TOKEN_GTE, re.compile(r">=")),
        (SigmaConditionToken.TOKEN_GT, re.compile(r">")),
        (SigmaConditionToken.TOKEN_PIPE, re.compile(r"\|")),
        (SigmaConditionToken.TOKEN_AND, re.compile(r"and\b", re.IGNORECASE)),
        (SigmaConditionToken.TOKEN_OR, re.compile(r"or\b", re.IGNORECASE)),
        (SigmaConditionToken.TOKEN_NOT, re.compile(r"not\b", re.IGNORECASE)),
        (SigmaConditionToken.TOKEN_ID, re.compile(r"\w+")),
        (SigmaConditionToken.TOKEN_LPAR, re.compile(r"\(")),
        (SigmaConditionToken.TOKEN_RPAR, re.compile(r"\)")),
    ]

    def __init__(self, condition):
        """Build the token sequence.

        Args:
            condition: either a condition string, which is tokenized, or a list
                of tokens / parse tree nodes, which is stored as is.

        Raises:
            SigmaParseError: if no token definition matches the remaining input.
            TypeError: if *condition* is neither a string nor a list.
        """
        if isinstance(condition, str):      # String that is parsed
            self.tokens = list()
            pos = 1     # 1-based position of the next unread character

            while len(condition) > 0:
                for tokendef in self.tokendefs:     # iterate over defined tokens and try to recognize the next one
                    match = tokendef[1].match(condition)
                    if match:
                        if tokendef[0] is not None:
                            self.tokens.append(SigmaConditionToken(tokendef, match, pos + match.start()))
                        pos += match.end()      # increase position and cut matched prefix from condition
                        condition = condition[match.end():]
                        break
                else:   # no valid token identified
                    raise SigmaParseError("Unexpected token in condition at position %d" % (pos))
        elif isinstance(condition, list):   # List of tokens to be converted into SigmaConditionTokenizer class
            self.tokens = condition
        else:
            raise TypeError("SigmaConditionTokenizer constructor expects string or list, got %s" % (type(condition)))

    def __str__(self):
        return " ".join([str(token) for token in self.tokens])

    def __iter__(self):
        return iter(self.tokens)

    def __len__(self):
        return len(self.tokens)

    def __getitem__(self, i):
        """Return a single token for an int index, a new tokenizer for a slice."""
        if type(i) == int:
            return self.tokens[i]
        elif type(i) == slice:
            return SigmaConditionTokenizer(self.tokens[i])
        else:
            raise IndexError("Expected index or slice")

    def __add__(self, other):
        """Return a new tokenizer with another tokenizer's tokens, or a single token/node, appended."""
        if isinstance(other, SigmaConditionTokenizer):
            return SigmaConditionTokenizer(self.tokens + other.tokens)
        elif isinstance(other, (SigmaConditionToken, ParseTreeNode)):
            return SigmaConditionTokenizer(self.tokens + [ other ])
        else:
            raise TypeError("+ operator expects SigmaConditionTokenizer or token type, got %s: %s" % (type(other), str(other)))

    def index(self, item):
        """Return the position of the first element that compares equal to *item*."""
        return self.tokens.index(item)
|
|
|
|
class SigmaParseError(Exception):
    """Error raised when a Sigma rule or condition expression cannot be parsed."""
|
|
|
|
### Parse Tree Node Classes ###
|
|
class ParseTreeNode:
    """Parse Tree Node Base Class"""
    # NOTE: the class docstring is part of the output of __str__ (via __doc__),
    # here and in every subclass, so changing it changes the string form.

    def __init__(self):
        # Not meant to be instantiated directly; subclasses provide their own constructor.
        raise NotImplementedError("ParseTreeNode is no usable class")

    def __str__(self):
        """Return the class docstring followed by the string forms of ``self.items``."""
        return "[ %s: %s ]" % (self.__doc__, str([str(item) for item in self.items]))
|
|
|
|
class ConditionBase(ParseTreeNode):
    """Base class for conditional operations"""
    # Operator code (one of the COND_* constants); overridden by each subclass.
    op = COND_NONE
    # Operand container; subclasses assign a list in their constructor.
    items = None

    def __init__(self):
        # This base class cannot be instantiated directly.
        raise NotImplementedError("ConditionBase is no usable class")

    def add(self, item):
        """Append *item* to the operands of this condition."""
        operands = self.items
        operands.append(item)

    def __iter__(self):
        """Iterate over the operands of this condition."""
        operands = self.items
        return iter(operands)
|
|
|
|
class ConditionAND(ConditionBase):
    """AND Condition"""
    op = COND_AND

    def __init__(self, sigma=None, op=None, val1=None, val2=None):
        """Create the condition.

        Called without arguments, the condition starts empty and is filled
        through add(). Otherwise (the form used by the condition parser) the
        operands are ``val1`` and ``val2``; ``sigma`` and ``op`` are only
        checked for None here.
        """
        # Identity checks: "== None" would invoke the operands' own __eq__,
        # which is not safe for arbitrary token objects.
        if sigma is None and op is None and val1 is None and val2 is None:    # no parameters given - initialize empty
            self.items = list()
        else:       # called by parser, use given values
            self.items = [ val1, val2 ]
|
|
|
|
class ConditionOR(ConditionAND):
    """OR Condition"""
    # Constructor and operand handling are inherited from ConditionAND;
    # only the operator code differs.
    op = COND_OR
|
|
|
|
class ConditionNOT(ConditionBase):
    """NOT Condition"""
    op = COND_NOT

    def __init__(self, sigma=None, op=None, val=None):
        """Create the condition.

        Called without arguments, the condition starts empty and takes its
        single operand through add(). Otherwise (the form used by the condition
        parser) ``val`` is the operand; ``sigma`` and ``op`` are only checked
        for None here.
        """
        # Identity checks: "== None" would invoke the operands' own __eq__.
        if sigma is None and op is None and val is None:    # no parameters given - initialize empty
            self.items = list()
        else:       # called by parser, use given values
            self.items = [ val ]

    def add(self, item):
        """Set the operand of the negation.

        Raises:
            ValueError: if an operand is already present.
        """
        if len(self.items) == 0:
            # super() must be called; the bare name "super" has no add attribute.
            super().add(item)
        else:
            raise ValueError("Only one element allowed in NOT condition")

    @property
    def item(self):
        """The negated operand, or None if none has been set."""
        try:
            return self.items[0]
        except IndexError:
            return None
|
|
|
|
class NodeSubexpression(ParseTreeNode):
    """Subexpression"""

    def __init__(self, subexpr):
        # The wrapped expression is stored unchanged under the attribute
        # that ParseTreeNode.__str__ reads.
        self.items = subexpr
|
|
|
|
# Parse tree converters: convert something into one of the parse tree node classes defined above
|
|
def convertAllOf(sigma, op, val):
    """Turn 'all of x' into a subexpression node.

    The definition named by the matched text of *val* is parsed with
    ConditionAND as condition override; *op* is not used.
    """
    definition_name = val.matched
    parsed = sigma.parse_definition_byname(definition_name, ConditionAND)
    return NodeSubexpression(parsed)
|
|
|
|
def convertOneOf(sigma, op, val):
    """Turn '1 of x' into a subexpression node.

    The definition named by the matched text of *val* is parsed with
    ConditionOR as condition override; *op* is not used.
    """
    definition_name = val.matched
    parsed = sigma.parse_definition_byname(definition_name, ConditionOR)
    return NodeSubexpression(parsed)
|
|
|
|
def convertId(sigma, op):
    """Turn a search identifier into a subexpression node.

    The definition named by the matched text of *op* is parsed without a
    condition override, so the parser's defaults for lists and maps apply.
    """
    definition_name = op.matched
    parsed = sigma.parse_definition_byname(definition_name)
    return NodeSubexpression(parsed)
|
|
|
|
# Condition parser class
|
|
class SigmaConditionParser:
    """Parser for Sigma condition expression"""
    # Description of operators: (token id, number of operands, parse tree node
    # class or converter function) - order == precedence
    searchOperators = [
        (SigmaConditionToken.TOKEN_ALL, 1, convertAllOf),
        (SigmaConditionToken.TOKEN_ONE, 1, convertOneOf),
        (SigmaConditionToken.TOKEN_ID, 0, convertId),
        (SigmaConditionToken.TOKEN_NOT, 1, ConditionNOT),
        (SigmaConditionToken.TOKEN_AND, 2, ConditionAND),
        (SigmaConditionToken.TOKEN_OR, 2, ConditionOR),
    ]

    def __init__(self, sigmaParser, tokens):
        """Parse the tokenized condition *tokens*.

        Args:
            sigmaParser: object handed to the node constructors/converters as
                their first argument.
            tokens: token sequence of one condition.

        Raises:
            NotImplementedError: if the condition contains a pipe token.
        """
        if SigmaConditionToken.TOKEN_PIPE in tokens:    # aggregations are not yet supported
            raise NotImplementedError("Aggregation expressions are not yet supported")

        self.sigmaParser = sigmaParser
        self.parsedSearch = self.parseSearch(tokens)

    def parseSearch(self, tokens):
        """
        Iterative parsing of search expression.

        Returns the token sequence reduced to exactly one parse tree node.

        Raises:
            SigmaParseError: on unbalanced or empty parentheses, or an operator
                that lacks an operand.
            ValueError: if the expression does not reduce to a single node.
        """
        # 1. Identify subexpressions with parentheses around them and parse them like a separate search expression
        while SigmaConditionToken.TOKEN_LPAR in tokens:
            try:
                rPos = tokens.index(SigmaConditionToken.TOKEN_RPAR)
            except ValueError as e:
                raise SigmaParseError("Missing matching closing parentheses") from e
            rTok = tokens[rPos]

            # Pair the first closing parenthesis with the nearest opening one to
            # its left. That pair is innermost, so nested parentheses are
            # resolved from the inside out.
            lPos = None
            for candidate in range(rPos - 1, -1, -1):
                if tokens[candidate] == SigmaConditionToken.TOKEN_LPAR:
                    lPos = candidate
                    break
            if lPos is None:
                lTok = tokens[tokens.index(SigmaConditionToken.TOKEN_LPAR)]
                raise SigmaParseError("Closing parentheses at position " + str(rTok.pos) + " precedes opening at position " + str(lTok.pos))
            lTok = tokens[lPos]
            if lPos + 1 == rPos:
                raise SigmaParseError("Empty subexpression at " + str(lTok.pos))

            subparsed = self.parseSearch(tokens[lPos + 1:rPos])[0]
            tokens = tokens[:lPos] + NodeSubexpression(subparsed) + tokens[rPos + 1:]   # replace parentheses + expression with group node that contains parsed subexpression

        # 2. Iterate over all known operators in given precedence
        for tokentype, operands, nodeclass in self.searchOperators:
            # 3. reduce all occurrences into corresponding parse tree nodes
            while tokentype in tokens:
                pos_op = tokens.index(tokentype)
                tok_op = tokens[pos_op]
                if operands == 0:       # operator
                    treenode = nodeclass(self.sigmaParser, tok_op)
                    tokens = tokens[:pos_op] + treenode + tokens[pos_op + 1:]
                elif operands == 1:     # operator value
                    pos_val = pos_op + 1
                    if pos_val >= len(tokens):
                        raise SigmaParseError("Missing operand after '%s' at position %d" % (tok_op.matched, tok_op.pos))
                    tok_val = tokens[pos_val]
                    treenode = nodeclass(self.sigmaParser, tok_op, tok_val)
                    tokens = tokens[:pos_op] + treenode + tokens[pos_val + 1:]
                elif operands == 2:     # value1 operator value2
                    pos_val1 = pos_op - 1
                    pos_val2 = pos_op + 1
                    # Without this check a leading operator would silently pick
                    # up the last token through the negative index -1.
                    if pos_val1 < 0 or pos_val2 >= len(tokens):
                        raise SigmaParseError("Missing operand for '%s' at position %d" % (tok_op.matched, tok_op.pos))
                    tok_val1 = tokens[pos_val1]
                    tok_val2 = tokens[pos_val2]
                    treenode = nodeclass(self.sigmaParser, tok_op, tok_val1, tok_val2)
                    tokens = tokens[:pos_val1] + treenode + tokens[pos_val2 + 1:]

        if len(tokens) != 1:     # parse tree must begin with exactly one node
            raise ValueError("Parse tree must have exactly one start node!")

        return tokens

    def __str__(self):
        return str(self.parsedSearch)

    def __len__(self):
        return len(self.parsedSearch)

    def getParseTree(self):
        """Return the root node of the parsed condition."""
        return(self.parsedSearch[0])
|