mirror of
https://github.com/valitydev/atomic-threat-coverage.git
synced 2024-11-06 17:45:23 +00:00
Kibana saved search ID search and BaseKibana class
This commit is contained in:
parent
3fb1d761ea
commit
84e3685e2a
@ -6,9 +6,115 @@
|
||||
|
||||
import json
|
||||
import datetime
|
||||
import getpass
|
||||
import requests
|
||||
|
||||
|
||||
class BaseKibanaAgg:
|
||||
class BaseKibana:
|
||||
"""kibana_url - link to Kibana main page"""
|
||||
|
||||
username = str()
|
||||
password = str()
|
||||
_kibana_auth = None
|
||||
kibana_url = str()
|
||||
kibana_usage = None
|
||||
|
||||
@classmethod
|
||||
def init_kibana_api(cls):
|
||||
# TODO: Do some checks, test connection, etc
|
||||
pass
|
||||
|
||||
@classmethod
|
||||
def init_credentials(cls):
|
||||
_ = ""
|
||||
while _ not in ["y", "n"]:
|
||||
_ = input("Can I use Kibana? [y/n]: ")[0].lower()
|
||||
if _ == "n":
|
||||
cls.kibana_usage = False
|
||||
return False
|
||||
_ = ""
|
||||
while _ not in ["y", "n"]:
|
||||
_ = input("Does your Kibana instance requires " +
|
||||
"auth? [y/n]: ")[0].lower()
|
||||
if _ == "y":
|
||||
cls._kibana_auth = True
|
||||
cls.username = input("Username [%s]: " % cls.username)
|
||||
cls.password = getpass.getpass(
|
||||
"Password [%s]: " % "".join(["*" for val in cls.password])
|
||||
)
|
||||
elif _ == "n":
|
||||
cls._kibana_auth = False
|
||||
|
||||
cls.kibana_url = input("Provide Kibana URL (main page, for instance" +
|
||||
" http://localhost:5601/): ")
|
||||
while True:
|
||||
print("KIBANA_URL: %s" % cls.kibana_url)
|
||||
_ = input("Is this correct? [y/n]: ")[0].lower()
|
||||
if _ == "y":
|
||||
break
|
||||
else:
|
||||
cls.kibana_url = input("Provide Kibana URL " +
|
||||
"(main page, for instance" +
|
||||
" http://localhost:5601/): ")
|
||||
cls.kibana_url = cls.kibana_url if cls.kibana_url.endswith("/") else \
|
||||
cls.kibana_url + "/"
|
||||
cls.kibana_usage = True
|
||||
return True
|
||||
|
||||
@classmethod
|
||||
def check_kibana_vars(cls):
|
||||
if not isinstance(cls.kibana_usage, bool):
|
||||
return cls.init_credentials()
|
||||
if isinstance(cls._kibana_auth, bool):
|
||||
if cls._kibana_auth:
|
||||
if not cls.username or not cls.password:
|
||||
return cls.init_credentials
|
||||
if not cls.kibana_url:
|
||||
return cls.init_credentials
|
||||
else:
|
||||
return cls.init_credentials
|
||||
return True
|
||||
|
||||
@classmethod
|
||||
def search_id_of_title_by_type(cls, search_type, search_title):
|
||||
"""Returns an ID (string) of an object searched using object title
|
||||
search_type - string in ["index-pattern", "search"]
|
||||
search_title - string
|
||||
"""
|
||||
search_type = search_type.lower()
|
||||
if search_type not in ["index-pattern", "search"]:
|
||||
raise Exception("Search type (%s) not supported" % search_type)
|
||||
if cls.check_kibana_vars():
|
||||
result_dict = {}
|
||||
total_pages = int()
|
||||
current_page = 1
|
||||
suffix = "api/saved_objects/_find?" + \
|
||||
"type=%s&fields=title&fields=id" % search_type
|
||||
|
||||
r = requests.get(cls.kibana_url + suffix)
|
||||
|
||||
if r.json().get("total"):
|
||||
total_pages = r.json().get("total")
|
||||
|
||||
while current_page <= total_pages:
|
||||
if r.json().get("saved_objects"):
|
||||
for item in r.json().get("saved_objects"):
|
||||
if item.get("attributes"):
|
||||
result_dict[item.get("attributes").get("title")] =\
|
||||
item.get('id')
|
||||
if search_title in result_dict.keys():
|
||||
del(result_dict)
|
||||
return result_dict[search_title]
|
||||
else:
|
||||
current_page += 1
|
||||
r = requests.get(
|
||||
cls.kibana_url + suffix + "&pages=%s" % current_page
|
||||
)
|
||||
del(result_dict)
|
||||
return None
|
||||
|
||||
|
||||
class BaseKibanaAgg(BaseKibana):
|
||||
"""Base Kibana Agg"""
|
||||
|
||||
def __init__(self, id=None, enabled=None, type=None, schema=None,
|
||||
@ -47,7 +153,7 @@ class BaseKibanaAgg:
|
||||
return str(self.__call__())
|
||||
|
||||
|
||||
class BaseKibanaSeriesParams:
|
||||
class BaseKibanaSeriesParams(BaseKibana):
|
||||
"""Base Kibana Series Params"""
|
||||
|
||||
def __init__(self, id, data=None, drawLinesBetweenPoints=None,
|
||||
@ -99,7 +205,7 @@ class BaseKibanaSeriesParams:
|
||||
return str(self.__call__())
|
||||
|
||||
|
||||
class BaseKibanaVisState:
|
||||
class BaseKibanaVisState(BaseKibana):
|
||||
"""Base Kibana visState"""
|
||||
|
||||
def __init__(self, title=None, type=None, params=None, aggs=None):
|
||||
@ -136,7 +242,7 @@ class BaseKibanaVisState:
|
||||
return iter(self.__dict__)
|
||||
|
||||
|
||||
class BaseKibanaParams:
|
||||
class BaseKibanaParams(BaseKibana):
|
||||
"""Base Kibana Params"""
|
||||
|
||||
def __init__(self, type=None, grid=None, categoryAxes=None, valueAxes=None,
|
||||
@ -196,7 +302,7 @@ class BaseKibanaParams:
|
||||
return str(self.__call__())
|
||||
|
||||
|
||||
class BaseKibanaVisualizationObject:
|
||||
class BaseKibanaVisualizationObject(BaseKibana):
|
||||
"""Base Kibana VisualizationObject"""
|
||||
|
||||
def __init__(self, title=None):
|
||||
@ -223,7 +329,7 @@ class BaseKibanaVisualizationObject:
|
||||
return str(self.__call__())
|
||||
|
||||
|
||||
class BaseKibanaDoc:
|
||||
class BaseKibanaDoc(BaseKibana):
|
||||
"""Base Kibana Doc"""
|
||||
|
||||
def __init__(self):
|
||||
|
@ -11,18 +11,17 @@ from ast import literal_eval
|
||||
class KibanaVisualizationDoc(base.BaseKibanaDoc):
|
||||
"""Kibana Visualization Doc"""
|
||||
|
||||
def __init__(self, title, index_name, type):
|
||||
def __init__(self, title, type):
|
||||
|
||||
super().__init__() # Init Base Class
|
||||
self._meta_data_set = False
|
||||
self.metric_id = 1
|
||||
self.type = "visualization"
|
||||
self.visualization = base.BaseKibanaVisualizationObject(title=title)
|
||||
self.visualization.visState = base.BaseKibanaVisState(
|
||||
title=title, type=type)
|
||||
self.visualization.visState.params = base.BaseKibanaParams(type=type)
|
||||
self.visualization.kibanaSavedObjectMeta["searchSourceJSON"] = \
|
||||
"{\"index\":\"%s-*\",\"query\":{\"query_string\":{" % index_name +\
|
||||
"\"analyze_wildcard\":true,\"query\":\"*\"}},\"filter\":[]}"
|
||||
|
||||
self.some_defaults()
|
||||
|
||||
def some_defaults(self):
|
||||
@ -90,7 +89,44 @@ class KibanaVisualizationDoc(base.BaseKibanaDoc):
|
||||
)
|
||||
tmp_dictionary.pop("metric_id", None)
|
||||
tmp_dictionary.pop("updated_at", None)
|
||||
tmp_dictionary.pop("_meta_data_set", None)
|
||||
tmp_dictionary["_source"] = tmp_dictionary.pop("visualization")
|
||||
return json.dumps([tmp_dictionary])
|
||||
else:
|
||||
raise Exception("Data validation failed")
|
||||
|
||||
def set_index_search(self, index_name):
|
||||
if self.search_id_of_title_by_type(
|
||||
search_type="index-pattern", search_title=index_name
|
||||
):
|
||||
self.visualization.kibanaSavedObjectMeta["searchSourceJSON"] = \
|
||||
("{\"index\":\"%s\",\"query\":{\"query_string\":{" +
|
||||
"\"analyze_wildcard\":true,\"query\":\"*\"}},\"filter\":[]" +
|
||||
"}") % index_name
|
||||
self._meta_data_set = True
|
||||
else:
|
||||
raise Exception(
|
||||
"Did not find such index name. Didn't you forget an asterisk?"
|
||||
)
|
||||
|
||||
def set_saved_search(self, saved_search_name=None, saved_search_id=None):
|
||||
"""Provide ID if you know it and don't want to engage kibana"""
|
||||
if not saved_search_name and not saved_search_id:
|
||||
raise Exception(
|
||||
"What's the point of running this method without arguments?"
|
||||
)
|
||||
_id = ""
|
||||
if saved_search_id:
|
||||
_id = saved_search_id
|
||||
else:
|
||||
if not self.check_kibana_vars():
|
||||
raise Exception(
|
||||
"Cannot search for an ID if no access to Kibana!"
|
||||
)
|
||||
_id = self.search_id_of_title_by_type(
|
||||
search_type="search", search_title=saved_search_name
|
||||
)
|
||||
self.visualization.kibanaSavedObjectMeta["searchSourceJSON"] = \
|
||||
("{\"index\":\"%s\",\"query\":" +
|
||||
"{\"query\":\"\",\"language\":\"lucene\"},\"filter\":[]}") % _id
|
||||
self._meta_data_set = True
|
||||
|
Loading…
Reference in New Issue
Block a user