mirror of
https://github.com/valitydev/salt.git
synced 2024-11-08 09:23:56 +00:00
Merge pull request #19170 from cedwards/kafka_returner
Initial commit - kafka returner
This commit is contained in:
commit
d779ee1fba
85
salt/returners/kafka_return.py
Normal file
85
salt/returners/kafka_return.py
Normal file
@ -0,0 +1,85 @@
|
|||||||
|
# -*- coding: utf-8 -*-
|
||||||
|
|
||||||
|
'''
|
||||||
|
Return data to a Kafka topic
|
||||||
|
|
||||||
|
:maintainer: Christer Edwards (christer.edwards@gmail.com)
|
||||||
|
:maturity: 0.1
|
||||||
|
:depends: kafka-python
|
||||||
|
:platform: all
|
||||||
|
|
||||||
|
To enable this returner install kafka-python and enable the following settings
|
||||||
|
in the minion config:
|
||||||
|
|
||||||
|
returner.kafka.hostnames:
|
||||||
|
- "server1"
|
||||||
|
- "server2"
|
||||||
|
- "server3"
|
||||||
|
|
||||||
|
returner.kafka.topic: 'topic'
|
||||||
|
|
||||||
|
To use the kafka returner, append '--return kafka' to the Salt command, eg;
|
||||||
|
|
||||||
|
salt '*' test.ping --return kafka
|
||||||
|
|
||||||
|
'''
|
||||||
|
|
||||||
|
from __future__ import absolute_import
|
||||||
|
|
||||||
|
import json
|
||||||
|
import logging
|
||||||
|
import salt.utils
|
||||||
|
|
||||||
|
|
||||||
|
try:
|
||||||
|
from kafka import KafkaClient, SimpleProducer, SimpleConsumer
|
||||||
|
HAS_KAFKA = True
|
||||||
|
except ImportError:
|
||||||
|
HAS_KAFKA = False
|
||||||
|
|
||||||
|
|
||||||
|
log = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
__virtualname__ = 'kafka'
|
||||||
|
|
||||||
|
|
||||||
|
def __virtual__():
|
||||||
|
if not HAS_KAFKA:
|
||||||
|
return False
|
||||||
|
return __virtualname__
|
||||||
|
|
||||||
|
|
||||||
|
def _get_conn(ret=None):
|
||||||
|
'''
|
||||||
|
Return a kafka connection
|
||||||
|
'''
|
||||||
|
if __salt__['config.option']('returner.kafka.hostnames'):
|
||||||
|
hostnames = __salt__['config.option']('returner.kafka.hostnames')
|
||||||
|
return KafkaClient(hostnames)
|
||||||
|
else:
|
||||||
|
log.error('Unable to find kafka returner config option: hostnames')
|
||||||
|
|
||||||
|
|
||||||
|
def _close_conn(conn):
|
||||||
|
'''
|
||||||
|
Close the kafka connection
|
||||||
|
'''
|
||||||
|
conn.close()
|
||||||
|
|
||||||
|
|
||||||
|
def returner(ret):
|
||||||
|
'''
|
||||||
|
Return information to a Kafka server
|
||||||
|
'''
|
||||||
|
if __salt__['config.option']('returner.kafka.topic'):
|
||||||
|
topic = __salt__['config.option']('returner.kafka.topic')
|
||||||
|
|
||||||
|
conn = _get_conn(ret)
|
||||||
|
producer = SimpleProducer(conn)
|
||||||
|
producer.send_messages(topic, json.dumps(ret))
|
||||||
|
|
||||||
|
_close_conn(conn)
|
||||||
|
else:
|
||||||
|
log.error('Unable to find kafka returner config option: topic')
|
||||||
|
|
Loading…
Reference in New Issue
Block a user