Merge pull request #9539 from jcollie/logstash-v1

Support for Logstash v1 event schema (and improved timestamp handling)
This commit is contained in:
Pedro Algarvio 2014-01-03 10:30:29 -08:00
commit cd8ccb99ed

View File

@ -11,17 +11,18 @@
UDP Logging Handler
-------------------
In order to setup the datagram handler for `Logstash`_, please define on
the salt configuration file:
For versions of `Logstash`_ before 1.2.0:
In the salt configuration file:
.. code-block:: yaml
logstash_udp_handler:
host: 127.0.0.1
port: 9999
version: 0
On the `Logstash`_ configuration file you need something like:
In the `Logstash`_ configuration file:
.. code-block:: text
@ -32,6 +33,27 @@
}
}
For version 1.2.0 of `Logstash`_ and newer:
In the salt configuration file:
.. code-block:: yaml
logstash_udp_handler:
host: 127.0.0.1
port: 9999
version: 1
In the `Logstash`_ configuration file:
.. code-block:: text
input {
udp {
port => 9999
codec => json
}
}
Please read the `UDP input`_ configuration page for additional information.
@ -39,16 +61,17 @@
ZeroMQ Logging Handler
----------------------
In order to setup the ZMQ handler for `Logstash`_, please define on the
salt configuration file:
For versions of `Logstash`_ before 1.2.0:
In the salt configuration file:
.. code-block:: yaml
logstash_zmq_handler:
address: tcp://127.0.0.1:2021
version: 0
On the `Logstash`_ configuration file you need something like:
In the `Logstash`_ configuration file:
.. code-block:: text
@ -63,6 +86,27 @@
}
}
For version 1.2.0 of `Logstash`_ and newer:
In the salt configuration file:
.. code-block:: yaml
logstash_zmq_handler:
address: tcp://127.0.0.1:2021
version: 1
In the `Logstash`_ configuration file:
.. code-block:: text
input {
zeromq {
topology => "pubsub"
address => "tcp://0.0.0.0:2021"
codec => json
}
}
Please read the `ZeroMQ input`_ configuration page for additional
information.
@ -123,13 +167,6 @@ from salt._compat import string_types
from salt.log.setup import LOG_LEVELS
from salt.log.mixins import NewStyleClassMixIn
# Import 3rd-party libs
try:
from pytz import utc as _UTC
HAS_PYTZ = True
except ImportError:
HAS_PYTZ = False
log = logging.getLogger(__name__)
# Define the module's virtual name
@ -151,11 +188,12 @@ def __virtual__():
def setup_handlers():
host = port = address = None
logstash_formatter = LogstashFormatter()
if 'logstash_udp_handler' in __opts__:
host = __opts__['logstash_udp_handler'].get('host', None)
port = __opts__['logstash_udp_handler'].get('port', None)
version = __opts__['logstash_udp_handler'].get('version', 0)
if host is None and port is None:
log.debug(
'The required \'logstash_udp_handler\' configuration keys, '
@ -163,6 +201,7 @@ def setup_handlers():
'configuring the logstash UDP logging handler.'
)
else:
logstash_formatter = LogstashFormatter(version = version)
udp_handler = DatagramLogstashHandler(host, port)
udp_handler.setFormatter(logstash_formatter)
udp_handler.setLevel(
@ -184,6 +223,7 @@ def setup_handlers():
if 'logstash_zmq_handler' in __opts__:
address = __opts__['logstash_zmq_handler'].get('address', None)
zmq_hwm = __opts__['logstash_zmq_handler'].get('hwm', 1000)
version = __opts__['logstash_zmq_handler'].get('version', 0)
if address is None:
log.debug(
@ -192,6 +232,7 @@ def setup_handlers():
'configuring the logstash ZMQ logging handler.'
)
else:
logstash_formatter = LogstashFormatter(version = version)
zmq_handler = ZMQLogstashHander(address, zmq_hwm=zmq_hwm)
zmq_handler.setFormatter(logstash_formatter)
zmq_handler.setLevel(
@ -215,20 +256,20 @@ def setup_handlers():
class LogstashFormatter(logging.Formatter, NewStyleClassMixIn):
def __init__(self, msg_type='logstash', msg_path='logstash'):
def __init__(self, msg_type='logstash', msg_path='logstash', version=0):
self.msg_path = msg_path
self.msg_type = msg_type
self.version = version
self.format = getattr(self, 'format_v{0}'.format(version))
super(LogstashFormatter, self).__init__(fmt=None, datefmt=None)
def formatTime(self, record, datefmt=None):
timestamp = datetime.datetime.utcfromtimestamp(record.created)
if HAS_PYTZ:
return _UTC.localize(timestamp).isoformat()
return '{0}+00:00'.format(timestamp.isoformat())
return datetime.datetime.utcfromtimestamp(record.created).isoformat()[:-3] + 'Z'
def format(self, record):
def format_v0(self, record):
host = socket.getfqdn()
message_dict = {
'@timestamp': self.formatTime(record),
'@fields': {
'levelname': record.levelname,
'logger': record.name,
@ -247,8 +288,7 @@ class LogstashFormatter(logging.Formatter, NewStyleClassMixIn):
),
'@source_host': host,
'@source_path': self.msg_path,
'@tags': [],
'@timestamp': self.formatTime(record),
'@tags': ['salt'],
'@type': self.msg_type,
}
@ -278,6 +318,50 @@ class LogstashFormatter(logging.Formatter, NewStyleClassMixIn):
message_dict['@fields'][key] = repr(value)
return json.dumps(message_dict)
def format_v1(self, record):
message_dict = {
'@version': 1,
'@timestamp': self.formatTime(record),
'host': socket.getfqdn(),
'levelname': record.levelname,
'logger': record.name,
'lineno': record.lineno,
'pathname': record.pathname,
'process': record.process,
'threadName': record.threadName,
'funcName': record.funcName,
'processName': record.processName,
'message': record.getMessage(),
'tags': ['salt'],
'type': self.msg_type
}
if record.exc_info:
message_dict['exc_info'] = self.formatException(
record.exc_info
)
# Add any extra attributes to the message field
for key, value in record.__dict__.items():
if key in ('args', 'asctime', 'created', 'exc_info', 'exc_text',
'filename', 'funcName', 'id', 'levelname', 'levelno',
'lineno', 'module', 'msecs', 'msecs', 'message', 'msg',
'name', 'pathname', 'process', 'processName',
'relativeCreated', 'thread', 'threadName'):
# These are already handled above or not handled at all
continue
if value is None:
message_dict[key] = value
continue
if isinstance(value, (string_types, bool, dict, float, int, list)):
message_dict[key] = value
continue
message_dict[key] = repr(value)
return json.dumps(message_dict)
class DatagramLogstashHandler(logging.handlers.DatagramHandler):
'''