monitoring upload in chunks, default size = 500kB

This commit is contained in:
Arseniy Fomchenko 2017-04-10 20:01:48 +03:00
parent 49a46d04bc
commit dab38f3e1b

View File

@ -46,6 +46,17 @@ class BackendTypes(object):
pass
def chop(data_list, chunk_size):
if sys.getsizeof(str(data_list)) <= chunk_size:
return [data_list]
elif len(data_list) == 1:
logger.warning("Too large piece of Telegraf data. Might experience upload problems.")
return [data_list]
else:
mid = len(data_list) / 2
return chop(data_list[:mid], chunk_size) + chop(data_list[mid:], chunk_size)
class Plugin(AbstractPlugin, AggregateResultListener,
MonitoringDataListener):
SECTION = 'meta'
@ -109,7 +120,8 @@ class Plugin(AbstractPlugin, AggregateResultListener,
'log_monitoring_requests',
'log_status_requests',
'log_other_requests',
'threads_timeout'
'threads_timeout',
'chunk_size'
]
return opts
@ -125,6 +137,7 @@ class Plugin(AbstractPlugin, AggregateResultListener,
"target_lock_duration", "30m"))
self.send_status_period = expand_to_seconds(
self.get_option('send_status_period', '10'))
self.chunk_size = int(self.get_option("chunk_size", '500000'))
def check_task_is_open(self):
if self.backend_type == BackendTypes.OVERLOAD:
@ -348,7 +361,7 @@ class Plugin(AbstractPlugin, AggregateResultListener,
if len(data_list) > 0:
if self.is_telegraf:
# telegraf
self.monitoring_queue.put(data_list)
[self.monitoring_queue.put(chunk) for chunk in chop(data_list, self.chunk_size)]
else:
# monitoring
[self.monitoring_queue.put(data) for data in data_list]