mirror of
https://github.com/valitydev/holmes.git
synced 2024-11-06 01:45:25 +00:00
Added purge scripts (#59)
Co-authored-by: Ian Bodrievskii <i.bodrievski@thewatch.io>
This commit is contained in:
parent
c063edc5e3
commit
5b3fc30ef6
128
scripts/dominant/common_tools.py
Executable file
128
scripts/dominant/common_tools.py
Executable file
@ -0,0 +1,128 @@
|
||||
#! /usr/bin/env python3
|
||||
|
||||
import os
|
||||
import sys
|
||||
import json
|
||||
import subprocess
|
||||
import argparse
|
||||
import datetime
|
||||
|
||||
env_default_service_ipaddr = 'FISTFUL_IPADDR'
|
||||
env_default_service_port = 'FISTFUL_PORT'
|
||||
|
||||
default_service_host = 'fistful_server'
|
||||
default_service_port = 8022
|
||||
|
||||
def args_init():
|
||||
return argparse.ArgumentParser()
|
||||
|
||||
|
||||
def args_add(parser, *args, **kwargs):
|
||||
parser.add_argument(*args, **kwargs)
|
||||
|
||||
|
||||
def args_parse(parser):
|
||||
args = parser.parse_args()
|
||||
#print(f'Input params: {args}')
|
||||
return args
|
||||
|
||||
|
||||
def service_call(service_name, ip_addr_port, func, *args):
|
||||
(service, service_path, service_proto_path) = service_get(service_name)
|
||||
service_url = service_resolve_url(service_path, *ip_addr_port)
|
||||
time = datetime.datetime.now().isoformat(timespec='milliseconds')
|
||||
#print(f'{time} Calling {service_name} {func}..')
|
||||
return woorl_call(service_url, service_proto_path, service, func, *args)
|
||||
|
||||
def service_call_safe(service_name, ip_addr_port, func, *args):
|
||||
(service, service_path, service_proto_path) = service_get(service_name)
|
||||
service_url = service_resolve_url(service_path, *ip_addr_port)
|
||||
time = datetime.datetime.now().isoformat(timespec='milliseconds')
|
||||
#print(f'{time} Calling {service_name} {func}..')
|
||||
return woorl_call_safe(service_url, service_proto_path, service, func, *args)
|
||||
|
||||
|
||||
def service_resolve_url(service_path, ip_addr, port):
|
||||
service_addr = (
|
||||
ip_addr
|
||||
or os.environ.get(env_default_service_ipaddr)
|
||||
or default_service_host)
|
||||
service_port = (
|
||||
port
|
||||
or os.environ.get(env_default_service_port)
|
||||
or default_service_port)
|
||||
service_url = f'http://{ip_addr}:{port}{service_path}'
|
||||
#print(f'Service URL resolved: {service_url}')
|
||||
return service_url
|
||||
|
||||
|
||||
def service_get(service):
|
||||
return {
|
||||
'domain_config_repository': (
|
||||
('Repository',
|
||||
'/v1/domain/repository',
|
||||
'../../damsel/proto/domain_config.thrift')
|
||||
),
|
||||
'withdrawal_management': (
|
||||
('Management',
|
||||
'/v1/withdrawal',
|
||||
'fistful-proto/proto/withdrawal.thrift')
|
||||
),
|
||||
'withdrawal_repairer': (
|
||||
('Repairer',
|
||||
'/v1/repair/withdrawal',
|
||||
'fistful-proto/proto/withdrawal.thrift')
|
||||
),
|
||||
'withdrawal_session_management': (
|
||||
('Management',
|
||||
'/v1/withdrawal_session',
|
||||
'fistful-proto/proto/withdrawal_session.thrift')
|
||||
),
|
||||
'shumpune_accounter': (
|
||||
('Accounter',
|
||||
'/shumpune',
|
||||
'shumaich-proto/proto/shumpune.thrift')
|
||||
)
|
||||
}[service]
|
||||
|
||||
|
||||
def woorl_call(service_url, proto_path, service, func, *args):
|
||||
woorl_cmd = [
|
||||
"woorl", "--deadline=30s", "-s", proto_path, service_url,
|
||||
service, func
|
||||
] + [maybe_json_dumps(arg) for arg in args]
|
||||
try:
|
||||
#print(f'Calling woorl with cmd={woorl_cmd}')
|
||||
return subprocess.check_output(woorl_cmd, text=True, stderr=subprocess.STDOUT, timeout=None)
|
||||
except subprocess.CalledProcessError as e:
|
||||
#print(e.output, file=sys.stderr)
|
||||
exit(e.returncode)
|
||||
|
||||
def woorl_call_safe(service_url, proto_path, service, func, *args):
|
||||
woorl_cmd = [
|
||||
"woorl", "--deadline=30s", "-s", proto_path, service_url,
|
||||
service, func
|
||||
] + [maybe_json_dumps(arg) for arg in args]
|
||||
try:
|
||||
#print(f'Calling woorl with cmd={woorl_cmd}')
|
||||
return subprocess.check_output(woorl_cmd, text=True, stderr=subprocess.STDOUT, timeout=None)
|
||||
except subprocess.CalledProcessError as e:
|
||||
return 'failed'
|
||||
|
||||
def maybe_json_dumps(woorl_arg):
|
||||
if isinstance(woorl_arg, dict):
|
||||
return json.dumps(woorl_arg)
|
||||
return woorl_arg
|
||||
|
||||
|
||||
def file_open(filename, mode):
|
||||
date = datetime.datetime.now().isoformat(timespec='seconds')
|
||||
filename += f'_{date}.json'
|
||||
filename = filename_clean(filename)
|
||||
return open(filename, mode)
|
||||
|
||||
|
||||
def filename_clean(filename):
|
||||
for char in '<>"/\|?* ':
|
||||
filename = filename.replace(char, '')
|
||||
return filename
|
109
scripts/dominant/domain_config_purge.py
Executable file
109
scripts/dominant/domain_config_purge.py
Executable file
@ -0,0 +1,109 @@
|
||||
#! /usr/bin/env python3
|
||||
|
||||
import subprocess
|
||||
import sys
|
||||
import json
|
||||
import copy
|
||||
import os
|
||||
|
||||
import domain_config_tools as dctools
|
||||
import common_tools as ctools
|
||||
|
||||
def make_remove_op(obj):
|
||||
return {
|
||||
"ops": [
|
||||
{
|
||||
"remove": {
|
||||
"object": obj
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
|
||||
def domain_config_purge(ip_addr_port, attempts):
|
||||
|
||||
all_time_removed_cnt = 0
|
||||
|
||||
for cnt in range(attempts):
|
||||
print(f'Purge domain config ==================> attempt #{cnt}..')
|
||||
|
||||
# Get domain config
|
||||
print("Checkout domain config, it may take some time..")
|
||||
raw_data = ctools.service_call(
|
||||
'domain_config_repository',
|
||||
ip_addr_port,
|
||||
'Checkout',
|
||||
dctools.domain_make_version('head')
|
||||
)
|
||||
data = json.loads(raw_data)
|
||||
domain_version = data['version']
|
||||
print(f'Domain config of version #{domain_version} received..')
|
||||
domain = data['domain']
|
||||
|
||||
all_objects = [obj['value'] for obj in domain]
|
||||
|
||||
objects_amount = len(all_objects)
|
||||
if objects_amount == 0:
|
||||
print('No objects in config, nothing to do here..')
|
||||
print('Done.')
|
||||
return
|
||||
|
||||
print(f'{objects_amount} objects found in domain config..')
|
||||
print(f'Start purging..')
|
||||
|
||||
success_cnt = 0
|
||||
fail_flag = False
|
||||
success_flag = False
|
||||
for obj in domain:
|
||||
|
||||
val = obj['value']
|
||||
commit = json.dumps(make_remove_op(val))
|
||||
call_result = ctools.service_call_safe(
|
||||
'domain_config_repository',
|
||||
ip_addr_port,
|
||||
'Commit',
|
||||
str(domain_version),
|
||||
commit
|
||||
)
|
||||
if call_result == 'failed':
|
||||
repeate = True
|
||||
obj_tag = list(obj['value'].keys())[0]
|
||||
obj_ref = obj['value'][obj_tag]['ref']
|
||||
print(f'Remove FAILED for DomainObject={obj_tag} Ref={obj_ref}')
|
||||
fail_flag = True
|
||||
else:
|
||||
success_flag = True
|
||||
domain_version += 1
|
||||
success_cnt += 1
|
||||
all_time_removed_cnt += 1
|
||||
print(f'Remove SUCCESS for DomainObject={obj_tag} Ref={obj_ref}')
|
||||
|
||||
if fail_flag == False:
|
||||
print(f'Removed {all_time_removed_cnt} objects')
|
||||
print('All objects removed, purge ok')
|
||||
print('Done.')
|
||||
return
|
||||
|
||||
if success_flag == False:
|
||||
raise Exception(f'No objects removed for the attempt!')
|
||||
|
||||
print(f'Removed {all_time_removed_cnt} objects')
|
||||
print(f'Domain config purge hasn\'t been finished completely in {attempts} attempts!')
|
||||
print('Done.')
|
||||
|
||||
def main():
|
||||
parser = ctools.args_init()
|
||||
ctools.args_add(parser, '-a', '--ip_addr', help='Dominant IP address')
|
||||
ctools.args_add(parser, '-p', '--port', help='Dominant port')
|
||||
ctools.args_add(parser, 'attempts', help='Purge attempts amount')
|
||||
args = ctools.args_parse(parser)
|
||||
|
||||
domain_config_purge((args.ip_addr, args.port), int(args.attempts))
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
|
||||
|
||||
|
||||
|
||||
|
53
scripts/dominant/domain_config_tools.py
Executable file
53
scripts/dominant/domain_config_tools.py
Executable file
@ -0,0 +1,53 @@
|
||||
#! /usr/bin/env python3
|
||||
|
||||
import json
|
||||
|
||||
|
||||
def domain_make_version(version):
|
||||
result = {}
|
||||
if version == 'head':
|
||||
result = {"head": {}}
|
||||
else:
|
||||
result = {"version": int(version)}
|
||||
return result
|
||||
|
||||
def domain_get_object(obj_name, obj_ref, domain):
|
||||
for obj in domain:
|
||||
if obj_name in obj['key']:
|
||||
ref = obj.get("key").get(obj_name).get("id")
|
||||
if int(ref) == int(obj_ref):
|
||||
return obj.get("value").get(obj_name)
|
||||
|
||||
return None
|
||||
|
||||
def domain_search_objects(obj_name, domain):
|
||||
obj_refs = []
|
||||
for obj in domain:
|
||||
if obj_name in obj['key']:
|
||||
ref = obj.get("key").get(obj_name).get("id")
|
||||
obj_refs.append(ref)
|
||||
|
||||
return obj_refs
|
||||
|
||||
def selector_search_values(acc, selector):
|
||||
if "value" in selector:
|
||||
for val in selector.get("value"):
|
||||
val_id = val.get("id")
|
||||
acc.append(val_id)
|
||||
|
||||
if "decisions" in selector:
|
||||
for d in selector.get("decisions"):
|
||||
if "then_" in d:
|
||||
decision_result = d.get("then_")
|
||||
if "value" in decision_result:
|
||||
for val in decision_result.get("value"):
|
||||
val_id = val.get("id")
|
||||
if val_id not in acc:
|
||||
acc.append(val_id)
|
||||
|
||||
if "decisions" in decision_result:
|
||||
deeper_decisions = decision_result.get("decisions")
|
||||
deeper_values = selector_search_values([], deeper_decisions)
|
||||
acc.extend(deeper_values)
|
||||
|
||||
return acc
|
Loading…
Reference in New Issue
Block a user