mirror of
https://github.com/empayre/fleet.git
synced 2024-11-06 00:45:19 +00:00
Issue 2027 better error visibility (#2069)
This commit is contained in:
parent
43d1a8c9a0
commit
2902da76ca
@ -5,6 +5,7 @@ import (
|
||||
"crypto/tls"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"net"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"os"
|
||||
@ -20,10 +21,12 @@ import (
|
||||
eeservice "github.com/fleetdm/fleet/v4/ee/server/service"
|
||||
"github.com/fleetdm/fleet/v4/server"
|
||||
"github.com/fleetdm/fleet/v4/server/config"
|
||||
"github.com/fleetdm/fleet/v4/server/contexts/ctxerr"
|
||||
"github.com/fleetdm/fleet/v4/server/datastore/cached_mysql"
|
||||
"github.com/fleetdm/fleet/v4/server/datastore/mysql"
|
||||
"github.com/fleetdm/fleet/v4/server/datastore/redis"
|
||||
"github.com/fleetdm/fleet/v4/server/datastore/s3"
|
||||
"github.com/fleetdm/fleet/v4/server/errorstore"
|
||||
"github.com/fleetdm/fleet/v4/server/fleet"
|
||||
"github.com/fleetdm/fleet/v4/server/health"
|
||||
"github.com/fleetdm/fleet/v4/server/launcher"
|
||||
@ -345,6 +348,11 @@ the way that the Fleet server works.
|
||||
// Instantiate a gRPC service to handle launcher requests.
|
||||
launcher := launcher.New(svc, logger, grpc.NewServer(), healthCheckers)
|
||||
|
||||
// TODO: gather all the different contexts and use just one
|
||||
ctx, cancelFunc := context.WithCancel(context.Background())
|
||||
defer cancelFunc()
|
||||
eh := errorstore.NewHandler(ctx, redisPool, logger, config.Logging.ErrorRetentionPeriod)
|
||||
|
||||
rootMux := http.NewServeMux()
|
||||
rootMux.Handle("/healthz", prometheus.InstrumentHandler("healthz", health.Handler(httpLogger, healthCheckers)))
|
||||
rootMux.Handle("/version", prometheus.InstrumentHandler("version", version.Handler()))
|
||||
@ -352,7 +360,7 @@ the way that the Fleet server works.
|
||||
rootMux.Handle("/metrics", prometheus.InstrumentHandler("metrics", promhttp.Handler()))
|
||||
rootMux.Handle("/api/", apiHandler)
|
||||
rootMux.Handle("/", frontendHandler)
|
||||
rootMux.Handle("/debug/", service.MakeDebugHandler(svc, config, logger))
|
||||
rootMux.Handle("/debug/", service.MakeDebugHandler(svc, config, logger, eh))
|
||||
|
||||
if path, ok := os.LookupEnv("FLEET_TEST_PAGE_PATH"); ok {
|
||||
// test that we can load this
|
||||
@ -411,6 +419,7 @@ the way that the Fleet server works.
|
||||
writeTimeout = liveQueryRestPeriod
|
||||
}
|
||||
|
||||
httpSrvCtx := ctxerr.NewContext(ctx, eh)
|
||||
srv := &http.Server{
|
||||
Addr: config.Server.Address,
|
||||
Handler: launcher.Handler(rootMux),
|
||||
@ -419,6 +428,9 @@ the way that the Fleet server works.
|
||||
ReadHeaderTimeout: 5 * time.Second,
|
||||
IdleTimeout: 5 * time.Minute,
|
||||
MaxHeaderBytes: 1 << 18, // 0.25 MB (262144 bytes)
|
||||
BaseContext: func(l net.Listener) context.Context {
|
||||
return httpSrvCtx
|
||||
},
|
||||
}
|
||||
srv.SetKeepAlivesEnabled(config.Server.Keepalive)
|
||||
errs := make(chan error, 2)
|
||||
@ -443,6 +455,7 @@ the way that the Fleet server works.
|
||||
defer cancel()
|
||||
errs <- func() error {
|
||||
cancelBackground()
|
||||
cancelFunc()
|
||||
launcher.GracefulStop()
|
||||
return srv.Shutdown(ctx)
|
||||
}()
|
||||
|
@ -1105,6 +1105,20 @@ Whether or not to log the welcome banner.
|
||||
disable_banner: true
|
||||
```
|
||||
|
||||
##### logging_error_retention_period
|
||||
|
||||
The amount of time to keep an error. Unique instances of errors are stored temporarily to help
|
||||
with troubleshooting; this setting controls how long they are kept.
|
||||
|
||||
- Default value: 24h
|
||||
- Environment variable: `FLEET_LOGGING_ERROR_RETENTION_PERIOD`
|
||||
- Config file format:
|
||||
|
||||
```
|
||||
logging:
|
||||
error_retention_period: 1h
|
||||
```
|
||||
|
||||
#### Filesystem
|
||||
|
||||
##### filesystem_status_log_file
|
||||
|
2
go.mod
2
go.mod
@ -59,6 +59,7 @@ require (
|
||||
github.com/prometheus/common v0.4.1 // indirect
|
||||
github.com/prometheus/procfs v0.2.0 // indirect
|
||||
github.com/rcrowley/go-metrics v0.0.0-20200313005456-10cdbea86bc0 // indirect
|
||||
github.com/rotisserie/eris v0.5.1 // indirect
|
||||
github.com/rs/zerolog v1.20.0
|
||||
github.com/russellhaering/goxmldsig v1.1.0
|
||||
github.com/russross/blackfriday/v2 v2.1.0 // indirect
|
||||
@ -71,6 +72,7 @@ require (
|
||||
github.com/urfave/cli/v2 v2.3.0
|
||||
github.com/valyala/fasthttp v1.31.0 // indirect
|
||||
golang.org/x/crypto v0.0.0-20210513164829-c07d793c2f9a
|
||||
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c // indirect
|
||||
golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c
|
||||
golang.org/x/tools v0.1.5 // indirect
|
||||
google.golang.org/grpc v1.38.0
|
||||
|
2
go.sum
2
go.sum
@ -835,6 +835,8 @@ github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFR
|
||||
github.com/rogpeppe/go-internal v1.5.2/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc=
|
||||
github.com/rogpeppe/go-internal v1.6.2 h1:aIihoIOHCiLZHxyoNQ+ABL4NKhFTgKLBdMLyEAh98m0=
|
||||
github.com/rogpeppe/go-internal v1.6.2/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc=
|
||||
github.com/rotisserie/eris v0.5.1 h1:SbzZloAUjoKX0eiQW187wop45Q5740Pz212NlIz5mLs=
|
||||
github.com/rotisserie/eris v0.5.1/go.mod h1:JmkIDhvuvDk1kDFGe5RZ3LXIrkEGEN0E6HskH5BCehE=
|
||||
github.com/rs/xid v1.2.1 h1:mhH9Nq+C1fY2l1XIpgxIiUOfNpRBYH1kKcr+qfKgjRc=
|
||||
github.com/rs/xid v1.2.1/go.mod h1:+uKXf+4Djp6Md1KODXJxgGQPKngRmWyn10oCKFzNHOQ=
|
||||
github.com/rs/zerolog v1.20.0 h1:38k9hgtUBdxFwE34yS8rTHmHBa4eN16E4DJlv177LNs=
|
||||
|
@ -128,9 +128,10 @@ type OsqueryConfig struct {
|
||||
|
||||
// LoggingConfig defines configs related to logging
|
||||
type LoggingConfig struct {
|
||||
Debug bool
|
||||
JSON bool
|
||||
DisableBanner bool `yaml:"disable_banner"`
|
||||
Debug bool
|
||||
JSON bool
|
||||
DisableBanner bool `yaml:"disable_banner"`
|
||||
ErrorRetentionPeriod time.Duration `yaml:"error_retention_period"`
|
||||
}
|
||||
|
||||
// FirehoseConfig defines configs for the AWS Firehose logging plugin
|
||||
@ -430,6 +431,8 @@ func (man Manager) addConfigs() {
|
||||
"Log in JSON format")
|
||||
man.addConfigBool("logging.disable_banner", false,
|
||||
"Disable startup banner")
|
||||
man.addConfigDuration("logging.error_retention_period", 24*time.Hour,
|
||||
"Amount of time to keep errors")
|
||||
|
||||
// Firehose
|
||||
man.addConfigString("firehose.region", "", "AWS Region to use")
|
||||
@ -614,9 +617,10 @@ func (man Manager) LoadConfig() FleetConfig {
|
||||
AsyncHostRedisScanKeysCount: man.getConfigInt("osquery.async_host_redis_scan_keys_count"),
|
||||
},
|
||||
Logging: LoggingConfig{
|
||||
Debug: man.getConfigBool("logging.debug"),
|
||||
JSON: man.getConfigBool("logging.json"),
|
||||
DisableBanner: man.getConfigBool("logging.disable_banner"),
|
||||
Debug: man.getConfigBool("logging.debug"),
|
||||
JSON: man.getConfigBool("logging.json"),
|
||||
DisableBanner: man.getConfigBool("logging.disable_banner"),
|
||||
ErrorRetentionPeriod: man.getConfigDuration("logging.error_retention_period"),
|
||||
},
|
||||
Firehose: FirehoseConfig{
|
||||
Region: man.getConfigString("firehose.region"),
|
||||
|
79
server/contexts/ctxerr/ctxerr.go
Normal file
79
server/contexts/ctxerr/ctxerr.go
Normal file
@ -0,0 +1,79 @@
|
||||
// Package ctxerr provides functions to wrap errors with annotations and
|
||||
// stack traces, and to handle those errors such that unique instances of
|
||||
// those errors will be stored for an amount of time so that it can be
|
||||
// used to troubleshoot issues.
|
||||
//
|
||||
// Typical uses of this package should be to call New or Wrap[f] as close as
|
||||
// possible to where the error is encountered (or where it needs to be
|
||||
// created for New), and then to call Handle with the error only once, after it
|
||||
// bubbled back to the top of the call stack (e.g. in the HTTP handler, or in
|
||||
// the CLI command, etc.). It is fine to wrap the error with more annotations
|
||||
// along the way, by calling Wrap[f].
|
||||
package ctxerr
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/fleetdm/fleet/v4/server/errorstore"
|
||||
"github.com/rotisserie/eris"
|
||||
)
|
||||
|
||||
type key int
|
||||
|
||||
const errHandlerKey key = 0
|
||||
|
||||
// NewContext returns a context derived from ctx that contains the provided
|
||||
// error handler.
|
||||
func NewContext(ctx context.Context, eh *errorstore.Handler) context.Context {
|
||||
return context.WithValue(ctx, errHandlerKey, eh)
|
||||
}
|
||||
|
||||
func fromContext(ctx context.Context) *errorstore.Handler {
|
||||
v, _ := ctx.Value(errHandlerKey).(*errorstore.Handler)
|
||||
return v
|
||||
}
|
||||
|
||||
// New creates a new error with the provided error message.
|
||||
func New(ctx context.Context, errMsg string) error {
|
||||
return ensureCommonMetadata(ctx, errors.New(errMsg))
|
||||
}
|
||||
|
||||
// Wrap annotates err with the provided message.
|
||||
func Wrap(ctx context.Context, err error, msg string) error {
|
||||
err = ensureCommonMetadata(ctx, err)
|
||||
// do not wrap with eris.Wrap, as we want only the root error closest to the
|
||||
// actual error condition to capture the stack trace, others just wrap to
|
||||
// annotate the error.
|
||||
return fmt.Errorf("%s: %w", msg, err)
|
||||
}
|
||||
|
||||
// Wrapf annotates err with the provided formatted message.
|
||||
func Wrapf(ctx context.Context, err error, fmsg string, args ...interface{}) error {
|
||||
err = ensureCommonMetadata(ctx, err)
|
||||
// do not wrap with eris.Wrap, as we want only the root error closest to the
|
||||
// actual error condition to capture the stack trace, others just wrap to
|
||||
// annotate the error.
|
||||
return fmt.Errorf("%s: %w", fmt.Sprintf(fmsg, args...), err)
|
||||
}
|
||||
|
||||
// Handle handles err by passing it to the registered error handler,
|
||||
// deduplicating it and storing it for a configured duration.
|
||||
func Handle(ctx context.Context, err error) error {
|
||||
if eh := fromContext(ctx); eh != nil {
|
||||
return eh.Store(err)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func ensureCommonMetadata(ctx context.Context, err error) error {
|
||||
var sf interface{ StackFrames() []uintptr }
|
||||
if err != nil && !errors.As(err, &sf) {
|
||||
// no eris error nowhere in the chain, add the common metadata with the stack trace
|
||||
// TODO: more metadata from ctx: user, host, etc.
|
||||
err = eris.Wrapf(err, "timestamp: %s", time.Now().Format(time.RFC3339))
|
||||
}
|
||||
return err
|
||||
}
|
@ -12,6 +12,29 @@ import (
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
type nopRedis struct{}
|
||||
|
||||
func (nopRedis) Get() redigo.Conn { return nopConn{} }
|
||||
|
||||
func (nopRedis) Close() error { return nil }
|
||||
|
||||
func (nopRedis) Stats() map[string]redigo.PoolStats { return nil }
|
||||
|
||||
func (nopRedis) Mode() fleet.RedisMode { return fleet.RedisStandalone }
|
||||
|
||||
type nopConn struct{}
|
||||
|
||||
func (nopConn) Close() error { return nil }
|
||||
func (nopConn) Err() error { return nil }
|
||||
func (nopConn) Do(_ string, _ ...interface{}) (interface{}, error) { return nil, nil }
|
||||
func (nopConn) Send(_ string, _ ...interface{}) error { return nil }
|
||||
func (nopConn) Flush() error { return nil }
|
||||
func (nopConn) Receive() (interface{}, error) { return nil, nil }
|
||||
|
||||
func NopRedis() fleet.RedisPool {
|
||||
return nopRedis{}
|
||||
}
|
||||
|
||||
func SetupRedis(tb testing.TB, cluster, redir, readReplica bool) fleet.RedisPool {
|
||||
if _, ok := os.LookupEnv("REDIS_TEST"); !ok {
|
||||
tb.Skip("set REDIS_TEST environment variable to run redis-based tests")
|
||||
|
300
server/errorstore/errors.go
Normal file
300
server/errorstore/errors.go
Normal file
@ -0,0 +1,300 @@
|
||||
// Package errorstore implements a Handler type that can be used to store
|
||||
// deduplicated instances of errors in an ephemeral storage, and provides a
|
||||
// Flush method to retrieve the list of errors while clearing it at the same
|
||||
// time. It provides a foundation to facilitate troubleshooting and building
|
||||
// tooling for support while trying to keep the impact of storage to a minimum
|
||||
// (ephemeral data, deduplication, flush on read).
|
||||
package errorstore
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/sha256"
|
||||
"encoding/base64"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"strings"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/fleetdm/fleet/v4/server/datastore/redis"
|
||||
"github.com/fleetdm/fleet/v4/server/fleet"
|
||||
kitlog "github.com/go-kit/kit/log"
|
||||
"github.com/go-kit/kit/log/level"
|
||||
redigo "github.com/gomodule/redigo/redis"
|
||||
"github.com/rotisserie/eris"
|
||||
)
|
||||
|
||||
// Handler defines an error handler. Call Handler.Store to handle an error, and
|
||||
// Handler.Flush to retrieve all stored errors and clear them from the store.
|
||||
// It is safe to call those methods concurrently.
|
||||
type Handler struct {
|
||||
pool fleet.RedisPool
|
||||
logger kitlog.Logger
|
||||
ttl time.Duration
|
||||
running int32 // accessed atomically
|
||||
errCh chan error
|
||||
|
||||
// for tests
|
||||
syncStore bool // if true, store error synchronously
|
||||
testOnStore func(error) // if set, called each time an error is stored
|
||||
testOnStart func() // if set, called once the handler is running
|
||||
}
|
||||
|
||||
// NewHandler creates an error handler using the provided pool and logger,
|
||||
// storing unique instances of errors in Redis using the pool. It stops storing
|
||||
// errors when ctx is cancelled. Errors are kept for the duration of ttl.
|
||||
func NewHandler(ctx context.Context, pool fleet.RedisPool, logger kitlog.Logger, ttl time.Duration) *Handler {
|
||||
eh := &Handler{
|
||||
pool: pool,
|
||||
logger: logger,
|
||||
ttl: ttl,
|
||||
}
|
||||
runHandler(ctx, eh)
|
||||
return eh
|
||||
}
|
||||
|
||||
func newTestHandler(ctx context.Context, pool fleet.RedisPool, logger kitlog.Logger, ttl time.Duration, onStart func(), onStore func(error)) *Handler {
|
||||
eh := &Handler{
|
||||
pool: pool,
|
||||
logger: logger,
|
||||
ttl: ttl,
|
||||
|
||||
syncStore: true,
|
||||
testOnStart: onStart,
|
||||
testOnStore: onStore,
|
||||
}
|
||||
runHandler(ctx, eh)
|
||||
return eh
|
||||
}
|
||||
|
||||
func runHandler(ctx context.Context, eh *Handler) {
|
||||
ch := make(chan error, 1)
|
||||
eh.errCh = ch
|
||||
go eh.handleErrors(ctx)
|
||||
}
|
||||
|
||||
// Flush retrieves all stored errors from Redis and returns them as a slice of
|
||||
// JSON-encoded strings. It is a destructive read - the errors are removed from
|
||||
// Redis on return.
|
||||
func (h *Handler) Flush() ([]string, error) {
|
||||
errorKeys, err := redis.ScanKeys(h.pool, "error:*", 100)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
keysBySlot := redis.SplitKeysBySlot(h.pool, errorKeys...)
|
||||
var errors []string
|
||||
for _, qkeys := range keysBySlot {
|
||||
if len(qkeys) > 0 {
|
||||
gotErrors, err := h.collectBatchErrors(qkeys)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
errors = append(errors, gotErrors...)
|
||||
}
|
||||
}
|
||||
return errors, nil
|
||||
}
|
||||
|
||||
func (h *Handler) collectBatchErrors(errorKeys []string) ([]string, error) {
|
||||
conn := redis.ConfigureDoer(h.pool, h.pool.Get())
|
||||
defer conn.Close()
|
||||
|
||||
var args redigo.Args
|
||||
args = args.AddFlat(errorKeys)
|
||||
errorList, err := redigo.Strings(conn.Do("MGET", args...))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if _, err := conn.Do("DEL", args...); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return errorList, nil
|
||||
}
|
||||
|
||||
// sha256b64 returns the SHA-256 digest of s, encoded with the URL-safe
// base64 alphabet (with padding).
func sha256b64(s string) string {
	hasher := sha256.New()
	hasher.Write([]byte(s)) // hash.Hash.Write never returns an error
	return base64.URLEncoding.EncodeToString(hasher.Sum(nil))
}
|
||||
|
||||
func hashError(err error) string {
|
||||
// Ok so the hashing process is as follows:
|
||||
//
|
||||
// a) we want to hash the type and error message of the *root* error (the
|
||||
// last unwrapped error) so that if by mistake the same error is sent to
|
||||
// Handler.Handle in multiple places in the code, after being wrapped
|
||||
// differently any number of times, it still hashes to the same value
|
||||
// (because the root is the same). The type is not sufficient because some
|
||||
// errors have the same type but variable parts (e.g. a struct value that
|
||||
// implements the error interface and the message contains a file name that
|
||||
// caused the error and that is stored in a struct field).
|
||||
//
|
||||
// b) in addition that a), we also want to hash all locations in the stack
|
||||
// trace, so that the same error type and message (say, sql.ErrNoRows or
|
||||
// io.UnexpectedEOF) caused at two different places in the code are not
|
||||
// considered the same error. To get that location, the error must be wrapped
|
||||
// at some point by eris.Wrap (or must be a user-created error via eris.New).
|
||||
// We cannot hash only the leaf frame in the stack trace as that would all be
|
||||
// ctxerr.New or ctxerr.Wrap (i.e. whatever common helper function used to
|
||||
// create the eris error).
|
||||
//
|
||||
// c) if we call eris.Unpack on an error that is not *directly* an "eris"
|
||||
// error (i.e. an error value returned from eris.Wrap or eris.New), then
|
||||
// eris.Unpack will not return any location information. So if for example
|
||||
// the error was wrapped with the pkg/errors.Wrap or the stdlib's fmt.Errorf
|
||||
// calls at some point, eris.Unpack will not give us any location info. To
|
||||
// get around this, we look for an eris-created error in the wrapped chain,
|
||||
// and only give up hashing the location if we can't find any.
|
||||
//
|
||||
// d) there is no easy way to identify an "eris" error (i.e. we cannot simply
|
||||
// use errors.As(err, <some Eris error type>)) as eris does not export its
|
||||
// error type, and it actually uses 2 different internal error types. To get
|
||||
// around this, we look for an error that has the `StackFrames() []uintptr`
|
||||
// method, as both of eris internal errors implement that (see
|
||||
// https://github.com/rotisserie/eris/blob/v0.5.1/eris.go#L182).
|
||||
|
||||
var sf interface{ StackFrames() []uintptr }
|
||||
if errors.As(err, &sf) {
|
||||
err = sf.(error)
|
||||
}
|
||||
|
||||
unpackedErr := eris.Unpack(err)
|
||||
|
||||
if unpackedErr.ErrExternal == nil &&
|
||||
len(unpackedErr.ErrRoot.Stack) == 0 &&
|
||||
len(unpackedErr.ErrChain) == 0 {
|
||||
return sha256b64(unpackedErr.ErrRoot.Msg)
|
||||
}
|
||||
|
||||
var sb strings.Builder
|
||||
if unpackedErr.ErrExternal != nil {
|
||||
root := eris.Cause(unpackedErr.ErrExternal)
|
||||
fmt.Fprintf(&sb, "%T\n%s\n", root, root.Error())
|
||||
}
|
||||
|
||||
if len(unpackedErr.ErrRoot.Stack) > 0 {
|
||||
for _, frame := range unpackedErr.ErrRoot.Stack {
|
||||
fmt.Fprintf(&sb, "%s:%d\n", frame.File, frame.Line)
|
||||
}
|
||||
} else if len(unpackedErr.ErrChain) > 0 {
|
||||
lastFrame := unpackedErr.ErrChain[0].Frame
|
||||
fmt.Fprintf(&sb, "%s:%d", lastFrame.File, lastFrame.Line)
|
||||
}
|
||||
return sha256b64(sb.String())
|
||||
}
|
||||
|
||||
func hashAndMarshalError(externalErr error) (errHash string, errAsJson string, err error) {
|
||||
m := eris.ToJSON(externalErr, true)
|
||||
bytes, err := json.MarshalIndent(m, "", " ")
|
||||
if err != nil {
|
||||
return "", "", err
|
||||
}
|
||||
return hashError(externalErr), string(bytes), nil
|
||||
}
|
||||
|
||||
func (h *Handler) handleErrors(ctx context.Context) {
|
||||
atomic.StoreInt32(&h.running, 1)
|
||||
defer func() {
|
||||
atomic.StoreInt32(&h.running, 0)
|
||||
}()
|
||||
|
||||
if h.testOnStart != nil {
|
||||
h.testOnStart()
|
||||
}
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case err := <-h.errCh:
|
||||
h.storeError(ctx, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (h *Handler) storeError(ctx context.Context, err error) {
|
||||
errorHash, errorJson, err := hashAndMarshalError(err)
|
||||
if err != nil {
|
||||
level.Error(h.logger).Log("err", err, "msg", "hashErr failed")
|
||||
if h.testOnStore != nil {
|
||||
h.testOnStore(err)
|
||||
}
|
||||
return
|
||||
}
|
||||
jsonKey := fmt.Sprintf("error:%s:json", errorHash)
|
||||
|
||||
conn := redis.ConfigureDoer(h.pool, h.pool.Get())
|
||||
defer conn.Close()
|
||||
|
||||
secs := int(h.ttl.Seconds())
|
||||
if secs <= 0 {
|
||||
secs = 1 // SET EX fails if ttl is <= 0
|
||||
}
|
||||
if _, err := conn.Do("SET", jsonKey, errorJson, "EX", secs); err != nil {
|
||||
level.Error(h.logger).Log("err", err, "msg", "redis SET failed")
|
||||
if h.testOnStore != nil {
|
||||
h.testOnStore(err)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
if h.testOnStore != nil {
|
||||
h.testOnStore(nil)
|
||||
}
|
||||
}
|
||||
|
||||
// Store handles the provided error by storing it into Redis if the handler is
|
||||
// still running. In any case, it always returns the error as provided.
|
||||
//
|
||||
// It waits for a predefined period of time to try to store the error but does
|
||||
// so in a goroutine so the call returns immediately.
|
||||
func (h *Handler) Store(err error) error {
|
||||
exec := func() {
|
||||
if atomic.LoadInt32(&h.running) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
timer := time.NewTimer(2 * time.Second)
|
||||
defer timer.Stop()
|
||||
select {
|
||||
case h.errCh <- err:
|
||||
case <-timer.C:
|
||||
}
|
||||
}
|
||||
|
||||
if h.syncStore {
|
||||
exec()
|
||||
} else {
|
||||
go exec()
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// ServeHTTP implements an http.Handler that flushes the errors stored
|
||||
// by the Handler and returns them in the response as JSON.
|
||||
func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
errors, err := h.Flush()
|
||||
if err != nil {
|
||||
w.WriteHeader(http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
// each string returned by eh.Flush is already JSON-encoded, so to prevent
|
||||
// double-marshaling while still marshaling the list of errors as a JSON
|
||||
// array, treat them as raw json messages.
|
||||
raw := make([]json.RawMessage, len(errors))
|
||||
for i, s := range errors {
|
||||
raw[i] = json.RawMessage(s)
|
||||
}
|
||||
|
||||
bytes, err := json.Marshal(raw)
|
||||
if err != nil {
|
||||
w.WriteHeader(http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
w.Write(bytes)
|
||||
}
|
346
server/errorstore/errors_test.go
Normal file
346
server/errorstore/errors_test.go
Normal file
@ -0,0 +1,346 @@
|
||||
package errorstore
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http/httptest"
|
||||
"os"
|
||||
"regexp"
|
||||
"strings"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/fleetdm/fleet/v4/server/datastore/redis/redistest"
|
||||
"github.com/fleetdm/fleet/v4/server/fleet"
|
||||
kitlog "github.com/go-kit/kit/log"
|
||||
pkgErrors "github.com/pkg/errors"
|
||||
"github.com/rotisserie/eris"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func alwaysErrors() error { return pkgErrors.New("always errors") }
|
||||
|
||||
func alwaysCallsAlwaysErrors() error { return alwaysErrors() }
|
||||
|
||||
func alwaysErisErrors() error { return eris.New("always eris errors") }
|
||||
|
||||
func alwaysNewError(eh *Handler) error {
|
||||
return eh.Store(eris.New("always new errors"))
|
||||
}
|
||||
|
||||
func alwaysNewErrorTwo(eh *Handler) error {
|
||||
return eh.Store(eris.New("always new errors two"))
|
||||
}
|
||||
|
||||
func alwaysWrappedErr() error { return eris.Wrap(io.EOF, "always EOF") }
|
||||
|
||||
func TestHashErr(t *testing.T) {
|
||||
t.Run("without stack trace, same error is same hash", func(t *testing.T) {
|
||||
err1 := alwaysErrors()
|
||||
err2 := alwaysCallsAlwaysErrors()
|
||||
assert.Equal(t, hashError(err1), hashError(err2))
|
||||
})
|
||||
|
||||
t.Run("different location, same error is different hash", func(t *testing.T) {
|
||||
err1 := alwaysErisErrors()
|
||||
err2 := alwaysErisErrors()
|
||||
assert.NotEqual(t, hashError(err1), hashError(err2))
|
||||
})
|
||||
|
||||
t.Run("same error, wrapped, same hash", func(t *testing.T) {
|
||||
eris1 := alwaysErisErrors()
|
||||
|
||||
w1, w2 := fmt.Errorf("wrap: %w", eris1), pkgErrors.Wrap(eris1, "wrap")
|
||||
h1, h2 := hashError(w1), hashError(w2)
|
||||
assert.Equal(t, h1, h2)
|
||||
})
|
||||
|
||||
t.Run("generates json", func(t *testing.T) {
|
||||
var m map[string]interface{}
|
||||
|
||||
generatedErr := pkgErrors.New("some err")
|
||||
res, jsonBytes, err := hashAndMarshalError(generatedErr)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, "mWoqz7iS1IPOZXGhpzHLl_DVQOyemWxCmvkpLz8uEZk=", res)
|
||||
assert.True(t, strings.HasPrefix(jsonBytes, `{
|
||||
"external": "some err`))
|
||||
require.NoError(t, json.Unmarshal([]byte(jsonBytes), &m))
|
||||
|
||||
generatedErr2 := pkgErrors.New("some other err")
|
||||
res, jsonBytes, err = hashAndMarshalError(generatedErr2)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, "8AXruOzQmQLF4H3SrzLxXSwFQgZ8DcbkoF1owo0RhTs=", res)
|
||||
assert.True(t, strings.HasPrefix(jsonBytes, `{
|
||||
"external": "some other err`))
|
||||
require.NoError(t, json.Unmarshal([]byte(jsonBytes), &m))
|
||||
})
|
||||
}
|
||||
|
||||
func TestHashErrEris(t *testing.T) {
|
||||
t.Run("Marshal", func(t *testing.T) {
|
||||
wd, err := os.Getwd()
|
||||
require.NoError(t, err)
|
||||
|
||||
generatedErr := eris.New("some err")
|
||||
res, jsonBytes, err := hashAndMarshalError(generatedErr)
|
||||
require.NoError(t, err)
|
||||
assert.NotEmpty(t, res)
|
||||
|
||||
assert.Regexp(t, regexp.MustCompile(fmt.Sprintf(`\{
|
||||
"root": \{
|
||||
"message": "some err",
|
||||
"stack": \[
|
||||
"errorstore.TestHashErrEris\.func\d+:%s/errors_test\.go:\d+"
|
||||
\]
|
||||
\}
|
||||
\}`, regexp.QuoteMeta(wd))), jsonBytes)
|
||||
})
|
||||
|
||||
t.Run("HashWrapped", func(t *testing.T) {
|
||||
// hashing an eris error that wraps a root error hashes to the same
|
||||
// value if it is from the same location, even if wrapped differently
|
||||
// afterwards.
|
||||
err := alwaysWrappedErr()
|
||||
werr1, werr2 := pkgErrors.Wrap(err, "wrap pkg"), fmt.Errorf("wrap fmt: %w", err)
|
||||
wantHash := hashError(err)
|
||||
h1, h2 := hashError(werr1), hashError(werr2)
|
||||
assert.Equal(t, wantHash, h1)
|
||||
assert.Equal(t, wantHash, h2)
|
||||
})
|
||||
|
||||
t.Run("HashNew", func(t *testing.T) {
|
||||
err := alwaysErisErrors()
|
||||
werr := eris.Wrap(err, "wrap eris")
|
||||
werr1, werr2 := pkgErrors.Wrap(err, "wrap pkg"), fmt.Errorf("wrap fmt: %w", err)
|
||||
wantHash := hashError(err)
|
||||
h0, h1, h2 := hashError(werr), hashError(werr1), hashError(werr2)
|
||||
assert.Equal(t, wantHash, h0)
|
||||
assert.Equal(t, wantHash, h1)
|
||||
assert.Equal(t, wantHash, h2)
|
||||
})
|
||||
|
||||
t.Run("HashSameRootDifferentLocation", func(t *testing.T) {
|
||||
err1 := alwaysWrappedErr()
|
||||
err2 := func() error { return eris.Wrap(io.EOF, "always EOF") }()
|
||||
err3 := func() error { return eris.Wrap(io.EOF, "always EOF") }()
|
||||
h1, h2, h3 := hashError(err1), hashError(err2), hashError(err3)
|
||||
assert.NotEqual(t, h1, h2)
|
||||
assert.NotEqual(t, h1, h3)
|
||||
assert.NotEqual(t, h2, h3)
|
||||
})
|
||||
}
|
||||
|
||||
func TestUnwrapAll(t *testing.T) {
|
||||
root := sql.ErrNoRows
|
||||
werr := pkgErrors.Wrap(root, "pkg wrap")
|
||||
gerr := fmt.Errorf("fmt wrap: %w", werr)
|
||||
eerr := eris.Wrap(gerr, "eris wrap")
|
||||
eerr2 := eris.Wrap(eerr, "eris wrap 2")
|
||||
|
||||
uw := eris.Cause(eerr2)
|
||||
assert.Equal(t, uw, root)
|
||||
assert.Nil(t, eris.Cause(nil))
|
||||
}
|
||||
|
||||
func TestErrorHandler(t *testing.T) {
|
||||
t.Run("works if the error handler is down", func(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
cancel() // cancel immediately
|
||||
|
||||
eh := newTestHandler(ctx, nil, kitlog.NewNopLogger(), time.Minute, nil, nil)
|
||||
|
||||
doneCh := make(chan struct{})
|
||||
go func() {
|
||||
eh.Store(pkgErrors.New("test"))
|
||||
close(doneCh)
|
||||
}()
|
||||
|
||||
// should not even block in the call to Store as there is no handler running
|
||||
ticker := time.NewTicker(1 * time.Second)
|
||||
select {
|
||||
case <-doneCh:
|
||||
case <-ticker.C:
|
||||
t.FailNow()
|
||||
}
|
||||
})
|
||||
|
||||
wd, err := os.Getwd()
|
||||
require.NoError(t, err)
|
||||
wd = regexp.QuoteMeta(wd)
|
||||
|
||||
t.Run("standalone", func(t *testing.T) {
|
||||
pool := redistest.SetupRedis(t, false, false, false)
|
||||
t.Run("collects errors", func(t *testing.T) { testErrorHandlerCollectsErrors(t, pool, wd) })
|
||||
t.Run("collects different errors", func(t *testing.T) { testErrorHandlerCollectsDifferentErrors(t, pool, wd) })
|
||||
})
|
||||
|
||||
t.Run("cluster", func(t *testing.T) {
|
||||
pool := redistest.SetupRedis(t, true, true, false)
|
||||
t.Run("collects errors", func(t *testing.T) { testErrorHandlerCollectsErrors(t, pool, wd) })
|
||||
t.Run("collects different errors", func(t *testing.T) { testErrorHandlerCollectsDifferentErrors(t, pool, wd) })
|
||||
})
|
||||
}
|
||||
|
||||
func testErrorHandlerCollectsErrors(t *testing.T, pool fleet.RedisPool, wd string) {
|
||||
ctx, cancelFunc := context.WithCancel(context.Background())
|
||||
defer cancelFunc()
|
||||
|
||||
chGo, chDone := make(chan struct{}), make(chan struct{})
|
||||
|
||||
var storeCalls int32 = 3
|
||||
testOnStart := func() {
|
||||
close(chGo)
|
||||
}
|
||||
testOnStore := func(err error) {
|
||||
require.NoError(t, err)
|
||||
if atomic.AddInt32(&storeCalls, -1) == 0 {
|
||||
close(chDone)
|
||||
}
|
||||
}
|
||||
eh := newTestHandler(ctx, pool, kitlog.NewNopLogger(), time.Minute, testOnStart, testOnStore)
|
||||
|
||||
<-chGo
|
||||
|
||||
for i := 0; i < 3; i++ {
|
||||
alwaysNewError(eh)
|
||||
}
|
||||
|
||||
<-chDone
|
||||
|
||||
errors, err := eh.Flush()
|
||||
require.NoError(t, err)
|
||||
require.Len(t, errors, 1)
|
||||
|
||||
assert.Regexp(t, regexp.MustCompile(fmt.Sprintf(`\{
|
||||
"root": \{
|
||||
"message": "always new errors",
|
||||
"stack": \[
|
||||
"errorstore\.TestErrorHandler\.func\d\.\d+:%s/errors_test\.go:\d+",
|
||||
"errorstore\.testErrorHandlerCollectsErrors:%[1]s/errors_test\.go:\d+",
|
||||
"errorstore\.alwaysNewError:%s/errors_test\.go:\d+"
|
||||
\]
|
||||
\}`, wd, wd)), errors[0])
|
||||
|
||||
// and then errors are gone
|
||||
errors, err = eh.Flush()
|
||||
require.NoError(t, err)
|
||||
assert.Len(t, errors, 0)
|
||||
}
|
||||
|
||||
func testErrorHandlerCollectsDifferentErrors(t *testing.T, pool fleet.RedisPool, wd string) {
|
||||
ctx, cancelFunc := context.WithCancel(context.Background())
|
||||
defer cancelFunc()
|
||||
|
||||
var storeCalls int32 = 5
|
||||
|
||||
chGo, chDone := make(chan struct{}), make(chan struct{})
|
||||
|
||||
testOnStart := func() {
|
||||
close(chGo)
|
||||
}
|
||||
testOnStore := func(err error) {
|
||||
require.NoError(t, err)
|
||||
if atomic.AddInt32(&storeCalls, -1) == 0 {
|
||||
close(chDone)
|
||||
}
|
||||
}
|
||||
|
||||
eh := newTestHandler(ctx, pool, kitlog.NewNopLogger(), time.Minute, testOnStart, testOnStore)
|
||||
|
||||
<-chGo
|
||||
|
||||
// those two errors are different because they come from a different stack trace
|
||||
// (different line)
|
||||
alwaysNewError(eh)
|
||||
alwaysNewError(eh)
|
||||
|
||||
// while those two are the same, only one gets stored
|
||||
for i := 0; i < 2; i++ {
|
||||
alwaysNewError(eh)
|
||||
}
|
||||
|
||||
alwaysNewErrorTwo(eh)
|
||||
|
||||
<-chDone
|
||||
|
||||
errors, err := eh.Flush()
|
||||
require.NoError(t, err)
|
||||
require.Len(t, errors, 4)
|
||||
|
||||
// order is not guaranteed by scan keys
|
||||
for _, jsonErr := range errors {
|
||||
if strings.Contains(jsonErr, "new errors two") {
|
||||
assert.Regexp(t, regexp.MustCompile(fmt.Sprintf(`\{
|
||||
"root": \{
|
||||
"message": "always new errors two",
|
||||
"stack": \[
|
||||
"errorstore\.TestErrorHandler\.func\d\.\d+:%s/errors_test\.go:\d+",
|
||||
"errorstore\.testErrorHandlerCollectsDifferentErrors:%[1]s/errors_test\.go:\d+",
|
||||
"errorstore\.alwaysNewErrorTwo:%[1]s/errors_test\.go:\d+"
|
||||
\]
|
||||
\}`, wd)), jsonErr)
|
||||
|
||||
} else {
|
||||
assert.Regexp(t, regexp.MustCompile(fmt.Sprintf(`\{
|
||||
"root": \{
|
||||
"message": "always new errors",
|
||||
"stack": \[
|
||||
"errorstore\.TestErrorHandler\.func\d\.\d+:%s/errors_test\.go:\d+",
|
||||
"errorstore\.testErrorHandlerCollectsDifferentErrors:%[1]s/errors_test\.go:\d+",
|
||||
"errorstore\.alwaysNewError:%[1]s/errors_test\.go:\d+"
|
||||
\]
|
||||
\}`, wd)), jsonErr)
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func TestHttpHandler(t *testing.T) {
|
||||
pool := redistest.SetupRedis(t, false, false, false)
|
||||
ctx, cancelFunc := context.WithCancel(context.Background())
|
||||
defer cancelFunc()
|
||||
|
||||
var storeCalls int32 = 2
|
||||
|
||||
chGo, chDone := make(chan struct{}), make(chan struct{})
|
||||
testOnStart := func() {
|
||||
close(chGo)
|
||||
}
|
||||
testOnStore := func(err error) {
|
||||
require.NoError(t, err)
|
||||
if atomic.AddInt32(&storeCalls, -1) == 0 {
|
||||
close(chDone)
|
||||
}
|
||||
}
|
||||
|
||||
eh := newTestHandler(ctx, pool, kitlog.NewNopLogger(), time.Minute, testOnStart, testOnStore)
|
||||
|
||||
<-chGo
|
||||
// store two errors
|
||||
alwaysNewError(eh)
|
||||
alwaysNewErrorTwo(eh)
|
||||
<-chDone
|
||||
|
||||
req := httptest.NewRequest("GET", "/", nil)
|
||||
res := httptest.NewRecorder()
|
||||
eh.ServeHTTP(res, req)
|
||||
|
||||
require.Equal(t, res.Code, 200)
|
||||
var errs []struct {
|
||||
Root struct {
|
||||
Message string
|
||||
}
|
||||
Wrap []struct {
|
||||
Message string
|
||||
}
|
||||
}
|
||||
require.NoError(t, json.Unmarshal(res.Body.Bytes(), &errs))
|
||||
require.Len(t, errs, 2)
|
||||
require.NotEmpty(t, errs[0].Root.Message)
|
||||
require.NotEmpty(t, errs[1].Root.Message)
|
||||
}
|
@ -7,6 +7,7 @@ import (
|
||||
|
||||
"github.com/fleetdm/fleet/v4/server/config"
|
||||
"github.com/fleetdm/fleet/v4/server/contexts/token"
|
||||
"github.com/fleetdm/fleet/v4/server/errorstore"
|
||||
"github.com/fleetdm/fleet/v4/server/fleet"
|
||||
|
||||
kitlog "github.com/go-kit/kit/log"
|
||||
@ -42,12 +43,13 @@ func (m *debugAuthenticationMiddleware) Middleware(next http.Handler) http.Handl
|
||||
}
|
||||
|
||||
// MakeDebugHandler creates an HTTP handler for the Fleet debug endpoints.
|
||||
func MakeDebugHandler(svc fleet.Service, config config.FleetConfig, logger kitlog.Logger) http.Handler {
|
||||
func MakeDebugHandler(svc fleet.Service, config config.FleetConfig, logger kitlog.Logger, eh *errorstore.Handler) http.Handler {
|
||||
r := mux.NewRouter()
|
||||
r.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)
|
||||
r.HandleFunc("/debug/pprof/profile", pprof.Profile)
|
||||
r.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
|
||||
r.HandleFunc("/debug/pprof/trace", pprof.Trace)
|
||||
r.Handle("/debug/errors", eh)
|
||||
r.PathPrefix("/debug/pprof/").HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
|
||||
pprof.Index(rw, req)
|
||||
})
|
||||
|
@ -39,7 +39,7 @@ var testConfig = config.FleetConfig{
|
||||
}
|
||||
|
||||
func TestDebugHandlerAuthenticationTokenMissing(t *testing.T) {
|
||||
handler := MakeDebugHandler(&mockService{}, testConfig, nil)
|
||||
handler := MakeDebugHandler(&mockService{}, testConfig, nil, nil)
|
||||
|
||||
req := httptest.NewRequest(http.MethodGet, "https://fleetdm.com/debug/pprof/profile", nil)
|
||||
res := httptest.NewRecorder()
|
||||
@ -56,7 +56,7 @@ func TestDebugHandlerAuthenticationSessionInvalid(t *testing.T) {
|
||||
"fake_session_key",
|
||||
).Return(nil, errors.New("invalid session"))
|
||||
|
||||
handler := MakeDebugHandler(svc, testConfig, nil)
|
||||
handler := MakeDebugHandler(svc, testConfig, nil, nil)
|
||||
|
||||
req := httptest.NewRequest(http.MethodGet, "https://fleetdm.com/debug/pprof/profile", nil)
|
||||
req.Header.Add("Authorization", "BEARER fake_session_key")
|
||||
@ -79,7 +79,7 @@ func TestDebugHandlerAuthenticationSuccess(t *testing.T) {
|
||||
uint(42),
|
||||
).Return(&fleet.User{}, nil)
|
||||
|
||||
handler := MakeDebugHandler(svc, testConfig, nil)
|
||||
handler := MakeDebugHandler(svc, testConfig, nil, nil)
|
||||
|
||||
req := httptest.NewRequest(http.MethodGet, "https://fleetdm.com/debug/pprof/cmdline", nil)
|
||||
req.Header.Add("Authorization", "BEARER fake_session_key")
|
||||
|
Loading…
Reference in New Issue
Block a user