finished building and translating AST, asupporting nested queries

This commit is contained in:
Young 2021-08-22 21:58:04 -07:00
parent 6ccff2cff5
commit f51c462439
3 changed files with 190 additions and 102 deletions

View File

@ -1 +1 @@
{"description": "Detects rare scheduled tasks creations that only appear a few times per time frame and could reveal password dumpers, backdoor installs or other types of malicious code", "enabled": true, "false_positives": ["Software installation", "Software updates"], "filters": [], "from": "now-360s", "immutable": false, "index": ["winlogbeat-*"], "interval": "5m", "rule_id": "b0d77106-7bb0-41fe-bd94-d1752164d066", "language": "lucene", "output_index": ".siem-signals-default", "max_signals": 100, "risk_score": 5, "name": "Rare Schtasks Creations", "query": "(\"Security\" AND winlog.event_id:\"4698\")", "meta": {"from": "1m"}, "severity": "low", "tags": ["Execution", "Privilege Escalation", "Persistence", "T1053", "T1053.005"], "to": "now", "type": "threshold", "threat": [{"tactic": {"id": "TA0002", "reference": "", "name": "Execution"}, "framework": "MITRE ATT&CK\u00ae", "technique": [{"id": "T1053", "name": "Scheduled Task/Job", "reference": ""}]}, {"tactic": {"id": "TA0004", "reference": "", "name": "Privilege Escalation"}, "framework": "MITRE ATT&CK\u00ae", "technique": [{"id": "T1053", "name": "Scheduled Task/Job", "reference": ""}]}, {"tactic": {"id": "TA0003", "reference": "", "name": "Persistence"}, "framework": "MITRE ATT&CK\u00ae", "technique": [{"id": "T1053", "name": "Scheduled Task/Job", "reference": ""}]}], "version": 1, "threshold": {"field": "winlog.event_data.TaskName", "value": 6}}
{"type": "monitor", "name": "RDP over Reverse SSH Tunnel WFP", "description": "Detects svchost hosting RDP termsvcs communicating with the loopback address", "enabled": true, "schedule": {"period": {"interval": 5, "unit": "MINUTES"}}, "inputs": [{"search": {"indices": ["opensearch-security-logs"], "query": {"size": 1, "aggregations": {}, "query": {"bool": {"should": [{"bool": {"must": [{"match": {"": "\"System\""}}, {"match": {"winlog.event_id": "\"16\""}}]}}, {"bool": {"should": [{"match": {"winlog.event_data.HiveName.keyword": "*\\\\AppData\\\\Local\\\\Temp\\\\SAM*"}}, {"match": {"winlog.event_data.HiveName.keyword": "*.dmp"}}]}}]}}}}}], "tags": ["Defense Evasion", "Lateral Movement", "T1090", "T1090.001", "T1090.002", "T1021.001"], "triggers": [{"name": "generated-trigger", "severity": "2", "condition": {"script": {"source": "ctx.results[0] > 0", "lang": "painless"}}, "actions": []}], "sigma_meta_data": {"rule_id": "5bed80b6-b3e8-428e-a3ae-d3c757589e41", "threat": [{"tactic": {"id": "TA0005", "reference": "", "name": "Defense Evasion"}, "framework": "MITRE ATT&CK\u00ae", "technique": []}, {"tactic": {"id": "TA0008", "reference": "", "name": "Lateral Movement"}, "framework": "MITRE ATT&CK\u00ae", "technique": []}]}, "references": ["", ""]}

View File

@ -40,58 +40,214 @@ class Atom:
def __init__(self, field: str, prop: str) -> None:
self.field = field
self.prop = prop
def __str__(self) -> str:
return "Atom( {}, {} )".format(self.field.replace("\\\\", "\\"), self.prop.replace("\\\\", "\\"))
# Root of AST is always a Group
class Group:
def __init__(self) -> None:
def __init__(self) -> None:
def __str__(self) -> str:
return "Group( {} )".format(str(self.ary).replace("\\\\", "\\"))
class Boolean:
def __init__(self, expression: Union[Atom, Group]) -> None:
self.expression = expression
def __str__(self) -> str:
return "Boolean( {} )".format(str(self.expression).replace("\\\\", "\\"))
class Ary:
def __init__(self, bool1: Boolean, bool2: List[Tuple[str, Boolean]] = None) -> None:
self.bool1 = bool1
self.bool2 = bool2
def __str__(self) -> str:
return "Ary( {}, {} )".format(str(self.bool1).replace("\\\\", "\\"), [(rel, str(boolean).replace("\\\\", "\\")) for rel, boolean in self.bool2])
def group_init(self, ary:Ary):
def group_init(self, ary: Ary):
self.ary = ary
Group.__init__ = group_init
def parseAtom(s: str) -> Atom:
return Atom(s.split(":")[0], s.split(":")[0])
def parse_atom(s: str) -> Atom:
reg = r"(?<!\\):" # (any character that's not '\') followed by ':'
return Atom(*re.split(reg, s))
def parseGroup(s: str) -> Group:
return Group(parseAry(s[1:-1]))
# Since root of AST is always a Group, call parse_group to initiate parsing of overall expression
def parse_group(s: str) -> Group:
return Group(parse_ary(s[1:-1]))
def expandGroup(s: str) -> str:
field = s.strip("()").split(":")[0]
props = s.strip("()").split(":")[1].strip("()").split()
def expand_group(s: str) -> str:
reg = r"(?<!\\):" # (any character that's not '\') followed by ':'
field, props = re.split(reg, s.strip("()")) # props = (prop1 OR prop2...)
props = props.strip("()").split() # Further split props
newGroup = []
for index in range(len(props)):
element = props[index]
if index%2 == 0:
newGroup.append(f'{field}: {element}')
if element not in ["AND", "OR"]:
return "(" + "".join(newGroup) + ")"
return "(" + " ".join(newGroup) + ")"
def parseBoolean(s: str) -> Boolean:
if not s.contains("("):
expression = parseAtom(s)
def parse_boolean(s: str) -> Boolean:
if "(" not in s:
expression = parse_atom(s)
if s[0] != '(':
s = expandGroup(s)
expression = parseGroup(s)
s = expand_group(s)
expression = parse_group(s)
return Boolean(expression)
def parseAry(s: str) -> Ary:
def parse_ary(s: str) -> Ary:
lst = []
left = right = level = 0
while left < len(s):
# Going down one level
if right < len(s) and s[right] == '(':
level += 1
# Going up one level
elif right < len(s) and s[right] == ')':
level -= 1
# s[left:right] is parse-able
elif right == len(s) or (s[right] == ' ' and level == 0):
section = s[left:right]
# Handle Boolean case
if section not in ["AND", "OR"]:
section = parse_boolean(section)
left = right + 1
right += 1
# [Bool, Rel, Bool, Rel, Bool,...] => Bool, [(Rel, Bool), (Rel, Bool),...]
bool1 = lst[0]
bool2 = []
for i in range(1, len(lst), 2):
tupe = (lst[i], lst[i + 1])
return Ary(bool1, bool2)
def translate_atom(atom: Atom) -> dict:
return {
"match": {
atom.field: atom.prop
def translate_group(group: Group) -> dict:
return translate_ary(group.ary)
def translate_boolean(boolean: Boolean) -> dict:
if type(boolean.expression) is Atom:
return translate_atom(boolean.expression)
return translate_group(boolean.expression)
# Combining ary.bool1 and ary.bool2 into array of Boolean grouped by ANDs and split by ORs
def convert_bool_array(bool1: Boolean, boolArr: List[Tuple[str, Boolean]]) -> List[List[Boolean]]:
result = [[bool1]]
resultIndex = 0
for rel, boolean in boolArr:
if rel == "AND":
if resultIndex == len(result):
resultIndex += 2
return result
def adjust_matches(matches: List[dict]) -> List[dict]:
for index in range(len(matches)):
match = matches[index]
if "match" in match.keys():
matches[index] = {
"bool": {
"must": [match]
return matches
def contains_group(booleanArr: List[Boolean]) -> bool:
for boolean in booleanArr:
if type(boolean.expression) is Group:
return True
return False
def translate_ary(ary: Ary) -> dict:
parsedTranslation = convert_bool_array(ary.bool1, ary.bool2)
# print(f'ParsedTranslation: {parsedTranslation}')
clauses = []
translateIndex = 0
while translateIndex < len(parsedTranslation):
parsedExpression = parsedTranslation[translateIndex]
currMatches = []
clause = "must" # default clause is "must"; clause is "should" if multiple "or" statements
# Statement was joined by "or"
if len(parsedExpression) == 1:
counter = 1
tempIndex = translateIndex
while tempIndex+1 < len(parsedTranslation) and len(parsedTranslation[tempIndex+1]) == 1:
tempIndex += 1
counter += 1
# If there's more than one, use "should" clase instead of "must"
if counter > 1:
clause = "should"
parsedExpression = []
# Rebuild parsed expression to join statements together and fast forward the translate index
for i in range(counter):
parsedExpression += parsedTranslation[translateIndex+i]
translateIndex = tempIndex
# Iterate through each statement and join match statements into array
for boolean in parsedExpression:
# print(f'Boolean: {boolean}\nCurrMatches: {currMatches}\n')
if contains_group(parsedExpression):
print(f"\nContains Group; currMatches: {currMatches}\n")
currMatches = adjust_matches(currMatches)
currQuery = {
"bool": {
clause: currMatches
# print(f'\nCurrQuery: {currQuery}')
translateIndex += 1
# If only one type of clause, don't use nested bool object
if len(clauses) > 1:
return {
"bool": {
"should": clauses
return clauses[0]
class OpenSearchBackend(object):
"""OpenSearch detection rule backend."""
@ -229,80 +385,12 @@ class OpenSearchBackend(object):
def build_query(self, translation):
# print(f'\nparsed translation: {translation.strip("()").split("OR")}\n')
translation = "(\"System\" AND winlog.event_id:\"16\" AND winlog.event_data.HiveName.keyword:*\\\\AppData\\\\Local\\\\Temp\\\\SAM* AND winlog.event_data.HiveName.keyword:*.dmp)"
# translation = "(\"System\""
parsedTranslation = translation.strip("()").split("OR")
translation = "(\"System\" AND winlog.event_id:\"16\" OR winlog.event_data.HiveName.keyword:*\\\\AppData\\\\Local\\\\Temp\\\\SAM* OR winlog.event_data.HiveName.keyword:*.dmp)"
# translation = '(winlog.event_id:"5156" AND (winlog.event_data.SourcePort:"3389" AND winlog.event_data.DestAddress.keyword:(127.* OR \:\:1)))'
print(f'\nExpanded group: {expandGroup("winlog.event_data.DestAddress.keyword:(127.* OR 121)")}\n')
if len(parsedTranslation) == 0:
return {}
clauses = []
translateIndex = 0
while translateIndex < len(parsedTranslation):
expression = parsedTranslation[translateIndex]
currMatches = []
clause = "must" # default clause is "must"; clause is "should" if multiple "or" statements
parsedExpression = expression.split()
# Statement was joined by "or"
if len(parsedExpression) == 1:
counter = 1
tempIndex = translateIndex
while tempIndex+1 < len(parsedTranslation) and len(parsedTranslation[tempIndex+1].split()) == 1:
tempIndex += 1
counter += 1
# If there's more than one, use "should" clase instead of "must"
if counter > 1:
clause = "should"
parsedExpression = []
# Rebuild parsed expression to join statements together and fast forward the translate index
for i in range(counter):
translateIndex = tempIndex
# Iterate through each statement and join match statements into array
for expressionIndex in range(0, len(parsedExpression), 2):
element = parsedExpression[expressionIndex]
"match": {
element.split(":")[0]: element.split(":")[1]
currQuery = {
"bool": {
clause: currMatches
translateIndex += 1
# If only one type of clause, don't use nested bool object
if len(clauses) > 1:
if self.isThreshold:
self.isThreshold = False
return {
"bool": {
"should": clauses,
"filter": self.rule_threshold
return {
"bool": {
"should": clauses
return clauses[0]
ast = parse_group(translation)
print("\nAST: " + str(ast) + "\n")
return translate_group(ast)
Builds inputs field of OS monitor.

View File

@ -323,13 +323,13 @@ def main():
if not cmdargs.defer_abort:
except (NotImplementedError, TypeError) as e:
print("An unsupported feature is required for this Sigma rule (%s): " % (sigmafile) + str(e), file=sys.stderr)
print("Feel free to contribute for fun and fame, this is open source :) ->", file=sys.stderr)
if not cmdargs.ignore_backend_errors:
if not cmdargs.defer_abort:
# except (NotImplementedError, TypeError) as e:
# print("An unsupported feature is required for this Sigma rule (%s): " % (sigmafile) + str(e), file=sys.stderr)
# print("Feel free to contribute for fun and fame, this is open source :) ->", file=sys.stderr)
# if not cmdargs.ignore_backend_errors:
# if not cmdargs.defer_abort:
# sys.exit(error)
except PartialMatchError as e:
print("Error: Partial field match error: %s" % str(e), file=sys.stderr)
if not cmdargs.ignore_backend_errors: