mirror of
https://github.com/empayre/fleet.git
synced 2024-11-06 08:55:24 +00:00
33858d7301
# Checklist for submitter If some of the following don't apply, delete the relevant line. - [ ] Changes file added for user-visible changes in `changes/` or `orbit/changes/`. See [Changes files](https://fleetdm.com/docs/contributing/committing-changes#changes-files) for more information. - [ ] Documented any API changes (docs/Using-Fleet/REST-API.md or docs/Contributing/API-for-contributors.md) - [ ] Documented any permissions changes (docs/Using Fleet/manage-access.md) - [ ] Input data is properly validated, `SELECT *` is avoided, SQL injection is prevented (using placeholders for values in statements) - [ ] Added support on fleet's osquery simulator `cmd/osquery-perf` for new osquery data ingestion features. - [ ] Added/updated tests - [ ] Manual QA for all new/changed functionality - For Orbit and Fleet Desktop changes: - [ ] Manual QA must be performed in the three main OSs, macOS, Windows and Linux. - [ ] Auto-update manual QA, from released version of component to new version (see [tools/tuf/test](../tools/tuf/test/README.md)). Signed-off-by: guoguangwu <guoguangwu@magic-shield.com>
57 lines
1.2 KiB
Go
57 lines
1.2 KiB
Go
package logging
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"github.com/stretchr/testify/require"
|
|
"io"
|
|
"net/http"
|
|
"net/http/httptest"
|
|
"testing"
|
|
)
|
|
|
|
func TestKafkaRestWrite(t *testing.T) {
|
|
ctx := context.Background()
|
|
|
|
var buf []byte
|
|
var err error
|
|
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
|
buf, err = io.ReadAll(r.Body)
|
|
require.NoError(t, err)
|
|
require.Equal(t, r.URL.Path, "/topics/foo")
|
|
require.Equal(t, r.Header.Get("Content-Type"), "foobar")
|
|
w.WriteHeader(200)
|
|
}))
|
|
defer server.Close()
|
|
|
|
producer := &kafkaRESTProducer{
|
|
client: server.Client(),
|
|
URL: fmt.Sprintf(krPublishTopicURL, server.URL, "foo"),
|
|
CheckURL: fmt.Sprintf(krCheckTopicURL, server.URL, "foo"),
|
|
ContentTypeValue: "foobar",
|
|
}
|
|
|
|
err = producer.Write(ctx, logs)
|
|
require.NoError(t, err)
|
|
|
|
expected := makeKafkaRecords(logs)
|
|
var actual kafkaRecords
|
|
err = json.Unmarshal(buf, &actual)
|
|
require.NoError(t, err)
|
|
require.Equal(t, expected, actual)
|
|
}
|
|
|
|
func makeKafkaRecords(messages []json.RawMessage) kafkaRecords {
|
|
data := kafkaRecords{
|
|
Records: make([]kafkaValue, len(messages)),
|
|
}
|
|
|
|
for i, log := range messages {
|
|
data.Records[i] = kafkaValue{
|
|
Value: log,
|
|
}
|
|
}
|
|
return data
|
|
}
|