2017-02-13 22:29:56 +00:00
# Sigma parser
2017-12-07 20:55:43 +00:00
# Copyright 2016-2017 Thomas Patzke, Florian Roth
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
2017-02-13 22:29:56 +00:00
import yaml
import re
2017-10-31 21:13:20 +00:00
import logging
2017-12-08 21:32:39 +00:00
from sigma . parser import ConditionAND , ConditionOR
2017-10-31 21:13:20 +00:00
logger = logging . getLogger ( __name__ )
2017-02-13 22:29:56 +00:00
2017-03-23 23:48:32 +00:00
# Field Mapping Definitions
def FieldMapping ( source , target = None ) :
""" Determines target type and instantiate appropriate mapping type """
if target == None :
return SimpleFieldMapping ( source , source )
elif type ( target ) == str :
return SimpleFieldMapping ( source , target )
2017-03-23 23:58:11 +00:00
elif type ( target ) == list :
return MultiFieldMapping ( source , target )
2017-03-24 23:21:44 +00:00
elif type ( target ) == dict :
return ConditionalFieldMapping ( source , target )
2017-03-23 23:48:32 +00:00
class SimpleFieldMapping :
""" 1:1 field mapping """
target_type = str
def __init__ ( self , source , target ) :
""" Initialization with generic target type check """
if type ( target ) != self . target_type :
raise TypeError ( " Target type mismatch: wrong mapping type for this target " )
self . source = source
self . target = target
def resolve ( self , key , value , sigmaparser ) :
""" Return mapped field name """
return ( self . target , value )
2017-03-29 20:22:01 +00:00
def resolve_fieldname ( self , fieldname ) :
return self . target
2017-03-23 23:58:11 +00:00
class MultiFieldMapping ( SimpleFieldMapping ) :
""" 1:n field mapping that expands target field names into OR conditions """
target_type = list
def resolve ( self , key , value , sigmaparser ) :
""" Returns multiple target field names as OR condition """
cond = ConditionOR ( )
for fieldname in self . target :
cond . add ( ( fieldname , value ) )
return cond
2017-03-29 20:22:01 +00:00
def resolve_fieldname ( self , fieldname ) :
return self . target
2017-03-24 23:21:44 +00:00
class ConditionalFieldMapping ( SimpleFieldMapping ) :
"""
Conditional field mapping :
* key contains field = value condition , value target mapping
* key " default " maps when no condition matches
* if no condition matches and there is no default , don ' t perform mapping
"""
target_type = dict
def __init__ ( self , source , target ) :
""" Init table between condition field names and values """
super ( ) . __init__ ( source , target )
self . conditions = dict ( ) # condition field -> condition value -> target fields
self . default = None
for condition , target in self . target . items ( ) :
try : # key contains condition (field=value)
field , value = condition . split ( " = " )
self . add_condition ( field , value , target )
except ValueError as e : # no, condition - "default" expected
if condition == " default " :
if self . default == None :
if type ( target ) == str :
self . default = [ target ]
elif type ( target ) == list :
self . default = target
else :
raise SigmaConfigParseError ( " Default mapping must be single value or list " )
else :
raise SigmaConfigParseError ( " Conditional field mapping can have only one default value, use list for multiple target mappings " )
else :
raise SigmaConfigParseError ( " Expected condition or default " ) from e
def add_condition ( self , field , value , target ) :
if field not in self . conditions :
self . conditions [ field ] = dict ( )
if value not in self . conditions [ field ] :
self . conditions [ field ] [ value ] = list ( )
if type ( target ) == str :
self . conditions [ field ] [ value ] . append ( target )
elif type ( target ) == list :
self . conditions [ field ] [ value ] . extend ( target )
def resolve ( self , key , value , sigmaparser ) :
# build list of matching target mappings
targets = set ( )
for condfield in self . conditions :
if condfield in sigmaparser . values :
rulefieldvalues = sigmaparser . values [ condfield ]
for condvalue in self . conditions [ condfield ] :
if condvalue in rulefieldvalues :
targets . update ( self . conditions [ condfield ] [ condvalue ] )
if len ( targets ) == 0 : # no matching condition, try with default mapping
if self . default != None :
targets = self . default
if len ( targets ) == 1 : # result set contains only one target, return mapped item (like SimpleFieldMapping)
return ( targets . pop ( ) , value )
elif len ( targets ) > 1 : # result set contains multiple targets, return all linked as OR condition (like MultiFieldMapping)
cond = ConditionOR ( )
for target in targets :
cond . add ( ( target , value ) )
return cond
else : # no mapping found
return ( key , value )
2017-03-29 20:22:01 +00:00
def resolve_fieldname ( self , fieldname ) :
if self . default != None :
return self . default
else :
return fieldname
2017-03-04 23:37:28 +00:00
# Configuration
class SigmaConfiguration :
2017-03-05 22:44:52 +00:00
""" Sigma converter configuration. Contains field mappings and logsource descriptions """
2017-03-06 21:07:04 +00:00
def __init__ ( self , configyaml = None ) :
if configyaml == None :
2017-03-17 22:28:06 +00:00
self . config = None
2017-03-05 22:44:52 +00:00
self . fieldmappings = dict ( )
2017-03-06 21:07:04 +00:00
self . logsources = dict ( )
2017-03-17 22:28:06 +00:00
self . logsourcemerging = SigmaLogsourceConfiguration . MM_AND
2017-10-22 22:05:12 +00:00
self . defaultindex = None
2017-03-17 22:28:06 +00:00
self . backend = None
2017-03-06 21:07:04 +00:00
else :
config = yaml . safe_load ( configyaml )
2017-03-17 22:28:06 +00:00
self . config = config
2017-03-04 23:37:28 +00:00
2017-03-23 23:48:32 +00:00
self . fieldmappings = dict ( )
2017-03-06 21:07:04 +00:00
try :
2017-03-23 23:48:32 +00:00
for source , target in config [ ' fieldmappings ' ] . items ( ) :
self . fieldmappings [ source ] = FieldMapping ( source , target )
2017-03-06 21:07:04 +00:00
except KeyError :
2017-03-23 23:48:32 +00:00
pass
2017-03-06 21:07:04 +00:00
if type ( self . fieldmappings ) != dict :
raise SigmaConfigParseError ( " Fieldmappings must be a map " )
2017-03-14 22:22:32 +00:00
try :
self . logsourcemerging = config [ ' logsourcemerging ' ]
except KeyError :
self . logsourcemerging = SigmaLogsourceConfiguration . MM_AND
2017-03-17 22:28:06 +00:00
2017-10-22 22:05:12 +00:00
try :
self . defaultindex = config [ ' defaultindex ' ]
except KeyError :
self . defaultindex = None
2017-03-12 22:12:21 +00:00
self . logsources = list ( )
2017-03-17 22:28:06 +00:00
self . backend = None
2017-03-06 21:07:04 +00:00
def get_fieldmapping ( self , fieldname ) :
""" Return mapped fieldname if mapping defined or field name given in parameter value """
2017-03-05 22:44:52 +00:00
try :
2017-03-06 21:07:04 +00:00
return self . fieldmappings [ fieldname ]
2017-03-05 22:44:52 +00:00
except KeyError :
2017-03-23 23:48:32 +00:00
return FieldMapping ( fieldname )
2017-03-04 23:37:28 +00:00
2017-03-12 22:12:21 +00:00
def get_logsource ( self , category , product , service ) :
""" Return merged log source definition of all logosurces that match criteria """
2017-03-14 22:22:32 +00:00
matching = [ logsource for logsource in self . logsources if logsource . matches ( category , product , service ) ]
2017-10-22 22:05:12 +00:00
return SigmaLogsourceConfiguration ( matching , self . defaultindex )
2017-03-12 22:12:21 +00:00
2017-03-17 22:28:06 +00:00
def set_backend ( self , backend ) :
""" Set backend. This is used by other code to determine target properties for index addressing """
self . backend = backend
if self . config != None :
if ' logsources ' in self . config :
logsources = self . config [ ' logsources ' ]
if type ( logsources ) != dict :
raise SigmaConfigParseError ( " Logsources must be a map " )
for name , logsource in logsources . items ( ) :
2017-10-22 22:05:12 +00:00
self . logsources . append ( SigmaLogsourceConfiguration ( logsource , self . defaultindex , name , self . logsourcemerging , self . get_indexfield ( ) ) )
2017-03-17 22:28:06 +00:00
def get_indexfield ( self ) :
""" Get index condition if index field name is configured """
if self . backend != None :
return self . backend . index_field
2017-03-12 22:12:21 +00:00
class SigmaLogsourceConfiguration :
""" Contains the definition of a log source """
2017-03-14 22:22:32 +00:00
MM_AND = " and " # Merge all conditions with AND
MM_OR = " or " # Merge all conditions with OR
2017-03-12 22:12:21 +00:00
2017-10-22 22:05:12 +00:00
def __init__ ( self , logsource = None , defaultindex = None , name = None , mergemethod = MM_AND , indexfield = None ) :
2017-03-12 22:12:21 +00:00
self . name = name
2017-03-17 22:28:06 +00:00
self . indexfield = indexfield
2017-03-12 22:12:21 +00:00
if logsource == None : # create empty object
self . category = None
self . product = None
self . service = None
self . index = list ( )
self . conditions = None
elif type ( logsource ) == list and all ( [ isinstance ( o , SigmaLogsourceConfiguration ) for o in logsource ] ) : # list of SigmaLogsourceConfigurations: merge according to mergemethod
# Merge category, product and service
2017-03-17 22:28:06 +00:00
categories = set ( [ ls . category for ls in logsource if ls . category != None ] )
products = set ( [ ls . product for ls in logsource if ls . product != None ] )
services = set ( [ ls . service for ls in logsource if ls . service != None ] )
2017-03-12 22:12:21 +00:00
if len ( categories ) > 1 or len ( products ) > 1 or len ( services ) > 1 :
2017-03-29 21:33:26 +00:00
raise ValueError ( " Merged SigmaLogsourceConfigurations must have disjunct categories ( %s ), products ( %s ) and services ( %s ) " % ( str ( categories ) , str ( products ) , str ( services ) ) )
2017-03-12 22:12:21 +00:00
try :
self . category = categories . pop ( )
except KeyError :
self . category = None
try :
self . product = products . pop ( )
except KeyError :
self . product = None
try :
self . service = services . pop ( )
except KeyError :
self . service = None
# Merge all index patterns
self . index = list ( set ( [ index for ls in logsource for index in ls . index ] ) ) # unique(flat(logsources.index))
2017-10-22 22:05:12 +00:00
if len ( self . index ) == 0 and defaultindex is not None : # if no index pattern matched and default index is present: use default index
if type ( defaultindex ) == str :
self . index = [ defaultindex ]
elif type ( defaultindex ) == list and all ( [ type ( i ) == str for i in defaultindex ] ) :
self . index = defaultindex
else :
raise TypeError ( " Default index must be string or list of strings " )
2017-03-12 22:12:21 +00:00
2017-03-17 22:28:06 +00:00
# "merge" index field (should never differ between instances because it is provided by backend class
indexfields = [ ls . indexfield for ls in logsource if ls . indexfield != None ]
try :
self . indexfield = indexfields [ 0 ]
except IndexError :
self . indexfield = None
2017-03-12 22:12:21 +00:00
# Merge conditions according to mergemethod
2017-03-14 22:22:32 +00:00
if mergemethod == self . MM_AND :
2017-03-12 22:12:21 +00:00
cond = ConditionAND ( )
2017-03-14 22:22:32 +00:00
elif mergemethod == self . MM_OR :
2017-03-12 22:12:21 +00:00
cond = ConditionOR ( )
else :
2017-03-14 22:22:32 +00:00
raise ValueError ( " Mergemethod must be ' %s ' or ' %s ' " % ( self . MM_AND , self . MM_OR ) )
2017-03-12 22:12:21 +00:00
for ls in logsource :
2017-03-17 22:28:06 +00:00
if ls . conditions != None :
cond . add ( ls . conditions )
if len ( cond ) > 0 :
self . conditions = cond
else :
self . conditions = None
2017-03-12 22:12:21 +00:00
elif type ( logsource ) == dict : # create logsource configuration from parsed yaml
if ' category ' in logsource and type ( logsource [ ' category ' ] ) != str \
or ' product ' in logsource and type ( logsource [ ' product ' ] ) != str \
or ' service ' in logsource and type ( logsource [ ' service ' ] ) != str :
raise SigmaConfigParseError ( " Logsource category, product or service must be a string " )
try :
self . category = logsource [ ' category ' ]
except KeyError :
self . category = None
try :
self . product = logsource [ ' product ' ]
except KeyError :
self . product = None
try :
self . service = logsource [ ' service ' ]
except KeyError :
self . service = None
if self . category == None and self . product == None and self . service == None :
raise SigmaConfigParseError ( " Log source definition will not match " )
if ' index ' in logsource :
2017-03-17 22:28:06 +00:00
index = logsource [ ' index ' ]
if type ( index ) not in ( str , list ) :
2017-03-12 22:12:21 +00:00
raise SigmaConfigParseError ( " Logsource index must be string or list of strings " )
2017-10-22 22:05:12 +00:00
if type ( index ) == list and not all ( [ type ( index ) == str for index in logsource [ ' index ' ] ] ) :
2017-03-12 22:12:21 +00:00
raise SigmaConfigParseError ( " Logsource index patterns must be strings " )
2017-03-17 22:28:06 +00:00
if type ( index ) == list :
self . index = index
else :
self . index = [ index ]
2017-03-12 22:12:21 +00:00
else :
2017-10-22 22:05:12 +00:00
# no default index handling here - this branch is executed if log source definitions are parsed from
# config and these must not necessarily contain an index definition. A valid index may later be result
# from a merge, where default index handling applies.
2017-03-14 22:22:32 +00:00
self . index = [ ]
2017-03-12 22:12:21 +00:00
if ' conditions ' in logsource :
if type ( logsource [ ' conditions ' ] ) != dict :
raise SigmaConfigParseError ( " Logsource conditions must be a map " )
cond = ConditionAND ( )
for key , value in logsource [ ' conditions ' ] . items ( ) :
cond . add ( ( key , value ) )
self . conditions = cond
else :
self . conditions = None
else :
raise SigmaConfigParseError ( " Logsource definitions must be maps " )
def matches ( self , category , product , service ) :
""" Match log source definition against given criteria, None = ignore """
searched = 0
for searchval , selfval in zip ( ( category , product , service ) , ( self . category , self . product , self . service ) ) :
2017-09-10 22:27:14 +00:00
if searchval == None and selfval != None :
2017-03-29 21:33:26 +00:00
return False
2017-09-15 22:32:31 +00:00
if selfval != None :
2017-03-12 22:12:21 +00:00
searched + = 1
if searchval != selfval :
return False
if searched :
return True
2017-03-17 22:28:06 +00:00
def get_indexcond ( self ) :
""" Get index condition if index field name is configured """
cond = ConditionOR ( )
if self . indexfield :
for index in self . index :
cond . add ( ( self . indexfield , index ) )
return cond
else :
return None
def __str__ ( self ) :
return " [ LogSourceConfiguration: %s %s %s indices: %s ] " % ( self . category , self . product , self . service , str ( self . index ) )
2017-03-04 23:37:28 +00:00
class SigmaConfigParseError ( Exception ) :
pass
2017-11-01 23:02:15 +00:00
# Rule Filtering
class SigmaRuleFilter :
""" Filter for Sigma rules with conditions """
LEVELS = {
" low " : 0 ,
" medium " : 1 ,
" high " : 2 ,
" critical " : 3
}
STATES = [ " experimental " , " testing " , " stable " ]
def __init__ ( self , expr ) :
self . minlevel = None
self . maxlevel = None
self . status = None
self . logsources = list ( )
for cond in [ c . replace ( " " , " " ) for c in expr . split ( " , " ) ] :
if cond . startswith ( " level<= " ) :
try :
level = cond [ cond . index ( " = " ) + 1 : ]
self . maxlevel = self . LEVELS [ level ]
except KeyError as e :
raise SigmaRuleFilterParseException ( " Unknown level ' %s ' in condition ' %s ' " % ( level , cond ) ) from e
elif cond . startswith ( " level>= " ) :
try :
level = cond [ cond . index ( " = " ) + 1 : ]
self . minlevel = self . LEVELS [ level ]
except KeyError as e :
raise SigmaRuleFilterParseException ( " Unknown level ' %s ' in condition ' %s ' " % ( level , cond ) ) from e
elif cond . startswith ( " level= " ) :
try :
level = cond [ cond . index ( " = " ) + 1 : ]
self . minlevel = self . LEVELS [ level ]
self . maxlevel = self . minlevel
except KeyError as e :
raise SigmaRuleFilterParseException ( " Unknown level ' %s ' in condition ' %s ' " % ( level , cond ) ) from e
elif cond . startswith ( " status= " ) :
self . status = cond [ cond . index ( " = " ) + 1 : ]
if self . status not in self . STATES :
raise SigmaRuleFilterParseException ( " Unknown status ' %s ' in condition ' %s ' " % ( self . status , cond ) )
elif cond . startswith ( " logsource= " ) :
self . logsources . append ( cond [ cond . index ( " = " ) + 1 : ] )
else :
raise SigmaRuleFilterParseException ( " Unknown condition ' %s ' " % cond )
def match ( self , yamldoc ) :
""" Match filter conditions against rule """
# Levels
if self . minlevel is not None or self . maxlevel is not None :
try :
level = self . LEVELS [ yamldoc [ ' level ' ] ]
except KeyError : # missing or invalid level
return False # User wants level restriction, but it's not possible here
# Minimum level
if self . minlevel is not None :
if level < self . minlevel :
return False
# Maximum level
if self . maxlevel is not None :
if level > self . maxlevel :
return False
# Status
if self . status is not None :
try :
status = yamldoc [ ' status ' ]
except KeyError : # missing status
return False # User wants status restriction, but it's not possible here
if status != self . status :
return False
# Log Sources
if len ( self . logsources ) > 0 :
try :
logsources = { value for key , value in yamldoc [ ' logsource ' ] . items ( ) }
except ( KeyError , AttributeError ) : # no log source set
return False # User wants status restriction, but it's not possible here
for logsrc in self . logsources :
if logsrc not in logsources :
return False
# all tests passed
return True
class SigmaRuleFilterParseException ( Exception ) :
pass