fleet/server/logging/kafkarest.go
Benjamin Edwards f8cf6ea91c
make context type value header configurable (#4441)
* make context type value header configurable
* populate config
2022-03-09 17:22:29 -05:00

113 lines
2.6 KiB
Go

package logging
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"net/http"
"time"
"github.com/fleetdm/fleet/v4/pkg/fleethttp"
"github.com/fleetdm/fleet/v4/server/contexts/ctxerr"
)
const (
krContentTypeHeader = "Content-Type"
krTimestampHeader = "TimeStamp"
krPublishTopicURL = "%s/topics/%s"
krCheckTopicURL = "%s/topics/?topic=%s"
)
type KafkaRESTParams struct {
KafkaProxyHost string
KafkaTopic string
KafkaContentTypeValue string
KafkaTimeout int
}
type kafkaRESTProducer struct {
client *http.Client
URL string
CheckURL string
ContentTypeValue string
}
type kafkaRecords struct {
Records []kafkaValue `json:"records"`
}
type kafkaValue struct {
Value json.RawMessage `json:"value"`
}
func NewKafkaRESTWriter(p *KafkaRESTParams) (*kafkaRESTProducer, error) {
producer := &kafkaRESTProducer{
URL: fmt.Sprintf(krPublishTopicURL, p.KafkaProxyHost, p.KafkaTopic),
CheckURL: fmt.Sprintf(krCheckTopicURL, p.KafkaProxyHost, p.KafkaTopic),
client: fleethttp.NewClient(fleethttp.WithTimeout(time.Duration(p.KafkaTimeout) * time.Second)),
ContentTypeValue: p.KafkaContentTypeValue,
}
return producer, producer.checkTopic()
}
func (l *kafkaRESTProducer) Write(ctx context.Context, logs []json.RawMessage) error {
data := kafkaRecords{
Records: make([]kafkaValue, len(logs)),
}
for i, log := range logs {
data.Records[i] = kafkaValue{
Value: log,
}
}
output, err := json.Marshal(data)
if err != nil {
return ctxerr.Wrap(ctx, err, "kafka rest marshal")
}
resp, err := l.post(l.URL, bytes.NewBuffer(output))
if err != nil {
return ctxerr.Wrap(ctx, err, "kafka rest post")
}
defer resp.Body.Close()
return checkResponse(resp)
}
func checkResponse(resp *http.Response) (err error) {
if resp.StatusCode != http.StatusOK {
body, _ := ioutil.ReadAll(resp.Body)
return fmt.Errorf("Error: %d. %s", resp.StatusCode, string(body))
}
return nil
}
func (l *kafkaRESTProducer) checkTopic() (err error) {
resp, err := l.client.Get(l.CheckURL)
if err != nil {
return fmt.Errorf("kafka rest topic check: %w", err)
}
defer resp.Body.Close()
return checkResponse(resp)
}
func (l *kafkaRESTProducer) post(url string, buf io.Reader) (*http.Response, error) {
req, err := http.NewRequest(http.MethodPost, url, buf)
if err != nil {
return nil, fmt.Errorf("kafka rest new request: %w", err)
}
now := float64(time.Now().UnixNano()) / float64(time.Second)
req.Header.Set(krContentTypeHeader, l.ContentTypeValue)
req.Header.Set(krTimestampHeader, fmt.Sprintf("%f", now))
return l.client.Do(req)
}