Fixes #2: Send bot start/stop messages to slack channel

- Fix timestamps in delete notifications
This commit is contained in:
Prasad Ghangal 2019-01-02 14:12:57 +05:30
parent 334bdda7c6
commit 5a4430ee79
6 changed files with 97 additions and 32 deletions

View File

@ -1,9 +1,9 @@
package controller
import (
//"fmt"
"os"
"os/signal"
"strconv"
"strings"
"syscall"
"time"
@ -11,7 +11,7 @@ import (
"github.com/infracloudio/kubeops/pkg/config"
"github.com/infracloudio/kubeops/pkg/events"
"github.com/infracloudio/kubeops/pkg/filterengine"
"github.com/infracloudio/kubeops/pkg/logging"
log "github.com/infracloudio/kubeops/pkg/logging"
"github.com/infracloudio/kubeops/pkg/notify"
"github.com/infracloudio/kubeops/pkg/utils"
@ -34,10 +34,23 @@ func findNamespace(ns string) string {
// RegisterInformers creates new informer controllers to watch k8s resources
func RegisterInformers(c *config.Config) {
sendMessage("...and now my watch begins! :crossed_swords:")
startTime = time.Now().Local()
// Get resync period
rsyncTimeStr, ok := os.LookupEnv("INFORMERS_RESYNC_PERIOD")
if !ok {
rsyncTimeStr = "30"
}
rsyncTime, err := strconv.Atoi(rsyncTimeStr)
if err != nil {
log.Logger.Fatal("Error in reading INFORMERS_RESYNC_PERIOD env var.", err)
}
// Register informers for resource lifecycle events
if len(c.Resources) > 0 {
logging.Logger.Info("Registering resource lifecycle informer")
log.Logger.Info("Registering resource lifecycle informer")
for _, r := range c.Resources {
if _, ok := utils.ResourceGetterMap[r.Name]; !ok {
continue
@ -47,7 +60,7 @@ func RegisterInformers(c *config.Config) {
continue
}
for _, ns := range r.Namespaces {
logging.Logger.Infof("Adding informer for resource:%s namespace:%s", r.Name, ns)
log.Logger.Infof("Adding informer for resource:%s namespace:%s", r.Name, ns)
watchlist := cache.NewListWatchFromClient(
utils.ResourceGetterMap[r.Name], r.Name, findNamespace(ns), fields.Everything())
@ -55,7 +68,7 @@ func RegisterInformers(c *config.Config) {
_, controller := cache.NewInformer(
watchlist,
object,
30*time.Minute,
time.Duration(rsyncTime)*time.Minute,
registerEventHandlers(r.Name, r.Events),
)
stopCh := make(chan struct{})
@ -69,14 +82,14 @@ func RegisterInformers(c *config.Config) {
// Register informers for k8s events
if len(c.Events.Types) > 0 {
logging.Logger.Info("Registering kubernetes events informer")
log.Logger.Info("Registering kubernetes events informer")
watchlist := cache.NewListWatchFromClient(
utils.KubeClient.CoreV1().RESTClient(), "events", apiV1.NamespaceAll, fields.Everything())
_, controller := cache.NewInformer(
watchlist,
&apiV1.Event{},
30*time.Minute,
time.Duration(rsyncTime)*time.Minute,
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
key, err := cache.MetaNamespaceKeyFunc(obj)
@ -89,12 +102,12 @@ func RegisterInformers(c *config.Config) {
ns := eventObj.InvolvedObject.Namespace
eType := strings.ToLower(eventObj.Type)
logging.Logger.Debugf("Received event: kind:%s ns:%s type:%s", kind, ns, eType)
log.Logger.Debugf("Received event: kind:%s ns:%s type:%s", kind, ns, eType)
// Filter and forward
if (utils.AllowedEventKindsMap[utils.EventKind{kind, "all"}] ||
utils.AllowedEventKindsMap[utils.EventKind{kind, ns}]) && (utils.AllowedEventTypesMap[eType]) {
logging.Logger.Infof("Processing add to events: %s. Invoked Object: %s:%s", key, eventObj.InvolvedObject.Kind, eventObj.InvolvedObject.Namespace)
logEvent(obj, "events", "create", err)
log.Logger.Infof("Processing add to events: %s. Invoked Object: %s:%s", key, eventObj.InvolvedObject.Kind, eventObj.InvolvedObject.Namespace)
sendEvent(obj, "events", "create", err)
}
},
},
@ -109,7 +122,7 @@ func RegisterInformers(c *config.Config) {
signal.Notify(sigterm, syscall.SIGTERM)
signal.Notify(sigterm, syscall.SIGINT)
<-sigterm
sendMessage("my watch has ended!")
}
func registerEventHandlers(resourceType string, events []string) (handlerFns cache.ResourceEventHandlerFuncs) {
@ -117,33 +130,33 @@ func registerEventHandlers(resourceType string, events []string) (handlerFns cac
if event == "all" || event == "create" {
handlerFns.AddFunc = func(obj interface{}) {
key, err := cache.MetaNamespaceKeyFunc(obj)
logging.Logger.Debugf("Processing add to %v: %s", resourceType, key)
logEvent(obj, resourceType, "create", err)
log.Logger.Debugf("Processing add to %v: %s", resourceType, key)
sendEvent(obj, resourceType, "create", err)
}
}
if event == "all" || event == "update" {
handlerFns.UpdateFunc = func(old, new interface{}) {
key, err := cache.MetaNamespaceKeyFunc(new)
logging.Logger.Debugf("Processing update to %v: %s", resourceType, key)
logEvent(new, resourceType, "update", err)
log.Logger.Debugf("Processing update to %v: %s", resourceType, key)
sendEvent(new, resourceType, "update", err)
}
}
if event == "all" || event == "delete" {
handlerFns.DeleteFunc = func(obj interface{}) {
key, err := cache.MetaNamespaceKeyFunc(obj)
logging.Logger.Debugf("Processing delete to %v: %s", resourceType, key)
logEvent(obj, resourceType, "delete", err)
log.Logger.Debugf("Processing delete to %v: %s", resourceType, key)
sendEvent(obj, resourceType, "delete", err)
}
}
}
return handlerFns
}
func logEvent(obj interface{}, kind, eventType string, err error) {
func sendEvent(obj interface{}, kind, eventType string, err error) {
if err != nil {
logging.Logger.Error("Error while receiving event: ", err.Error())
log.Logger.Error("Error while receiving event: ", err.Error())
return
}
@ -151,14 +164,23 @@ func logEvent(obj interface{}, kind, eventType string, err error) {
if eventType == "create" {
objectMeta := utils.GetObjectMetaData(obj)
if objectMeta.CreationTimestamp.Sub(startTime).Seconds() <= 0 {
logging.Logger.Debug("Skipping older events")
log.Logger.Debug("Skipping older events")
return
}
}
// Skip older events
if eventType == "delete" {
objectMeta := utils.GetObjectMetaData(obj)
if objectMeta.DeletionTimestamp.Sub(startTime).Seconds() <= 0 {
log.Logger.Debug("Skipping older events")
return
}
}
// Check if Notify disabled
if !config.Notify {
logging.Logger.Info("Skipping notification")
log.Logger.Info("Skipping notification")
return
}
@ -168,5 +190,14 @@ func logEvent(obj interface{}, kind, eventType string, err error) {
// Send notification to communication chennel
notifier := notify.NewSlack()
notifier.Send(event)
notifier.SendEvent(event)
}
func sendMessage(msg string) {
if len(msg) <= 0 {
return
}
notifier := notify.NewSlack()
notifier.SendMessage(msg)
}

View File

@ -51,7 +51,7 @@ var LevelMap map[string]Level
func init() {
LevelMap = make(map[string]Level)
LevelMap["create"] = Info
LevelMap["Update"] = Debug
LevelMap["update"] = Warn
LevelMap["delete"] = Critical
LevelMap["error"] = Error
LevelMap["Warning"] = Critical
@ -69,7 +69,15 @@ func New(object interface{}, eventType string, kind string) Event {
Kind: objectTypeMeta.Kind,
Level: LevelMap[eventType],
Type: eventType,
TimeStamp: objectMeta.CreationTimestamp.Time,
}
// Add TimeStamps
if eventType == "create" {
event.TimeStamp = objectMeta.CreationTimestamp.Time
}
if eventType == "delete" {
event.TimeStamp = objectMeta.DeletionTimestamp.Time
}
if kind != "events" {

View File

@ -132,11 +132,13 @@ func runNotifierCommand(args []string) string {
}
if args[1] == "start" {
config.Notify = true
return "Notifier started!"
log.Logger.Info("Notifier enabled")
return "Brace yourselves, notifications are coming."
}
if args[1] == "stop" {
config.Notify = false
return "Notifier stopped!"
log.Logger.Info("Notifier disabled")
return "Sure! I won't send you notifications anymore."
}
if args[1] == "status" {
if config.Notify == false {

View File

@ -38,7 +38,7 @@ func (iv *IngressValidator) Run(object interface{}, event *events.Event) {
}
_, err := ValidServicePort(serviceName, ns, int32(servicePort))
if err != nil {
event.Messages = append(event.Messages, "Warning: Service "+serviceName+" used in ingress config does not exist or port not exposed\n")
event.Messages = append(event.Messages, "Service "+serviceName+" used in ingress config does not exist or port not exposed\n")
event.Level = events.Warn
}
}

View File

@ -6,5 +6,6 @@ import (
// Notifier to send event notification on the communication channels
type Notifier interface {
Send(events.Event) error
SendEvent(events.Event) error
SendMessage(string) error
}

View File

@ -40,8 +40,8 @@ func NewSlack() Notifier {
}
}
// Send event notification to slack
func (s *Slack) Send(event events.Event) error {
// SendEvent sends event notification to slack
func (s *Slack) SendEvent(event events.Event) error {
log.Logger.Info(fmt.Sprintf(">> Sending to slack: %+v", event))
api := slack.New(s.Token)
@ -63,7 +63,12 @@ func (s *Slack) Send(event events.Event) error {
},
},
Footer: "kubeops",
Ts: json.Number(strconv.FormatInt(event.TimeStamp.Unix(), 10)),
}
// Add timestamp
ts := json.Number(strconv.FormatInt(event.TimeStamp.Unix(), 10))
if ts > "0" {
attachment.Ts = ts
}
if event.Namespace != "" {
@ -114,7 +119,6 @@ func (s *Slack) Send(event events.Event) error {
attachment.Color = attachmentColor[event.Level]
params.Attachments = []slack.Attachment{attachment}
log.Logger.Infof("Sending message on %v with token %s", s.Channel, s.Token)
channelID, timestamp, err := api.PostMessage(s.Channel, "", params)
if err != nil {
log.Logger.Errorf("Error in sending slack message %s", err.Error())
@ -124,3 +128,22 @@ func (s *Slack) Send(event events.Event) error {
log.Logger.Infof("Message successfully sent to channel %s at %s", channelID, timestamp)
return nil
}
// SendMessage sends message to slack channel
func (s *Slack) SendMessage(msg string) error {
log.Logger.Info(fmt.Sprintf(">> Sending to slack: %+v", msg))
api := slack.New(s.Token)
params := slack.PostMessageParameters{
AsUser: true,
}
channelID, timestamp, err := api.PostMessage(s.Channel, msg, params)
if err != nil {
log.Logger.Errorf("Error in sending slack message %s", err.Error())
return err
}
log.Logger.Infof("Message successfully sent to channel %s at %s", channelID, timestamp)
return nil
}