fleet/server/pubsub/inmem_query_results.go

64 lines
1.4 KiB
Go
Raw Normal View History

package pubsub
import (
"context"
"strconv"
"sync"
2021-06-26 04:46:51 +00:00
"github.com/fleetdm/fleet/v4/server/fleet"
)
type inmemQueryResults struct {
resultChannels map[uint]chan interface{}
channelMutex sync.Mutex
}
var _ fleet.QueryResultStore = &inmemQueryResults{}
// NewInmemQueryResults initializes a new in-memory implementation of the
// QueryResultStore interface.
func NewInmemQueryResults() *inmemQueryResults {
return &inmemQueryResults{resultChannels: map[uint]chan interface{}{}}
}
func (im *inmemQueryResults) getChannel(id uint) chan interface{} {
im.channelMutex.Lock()
defer im.channelMutex.Unlock()
channel, ok := im.resultChannels[id]
if !ok {
channel = make(chan interface{})
im.resultChannels[id] = channel
}
return channel
}
func (im *inmemQueryResults) WriteResult(result fleet.DistributedQueryResult) error {
Fix data race in inmem query results test (#1369) Missed locking caused a race condition detected with the --race flag: ``` ================== WARNING: DATA RACE Read at 0x00c0004b2cf0 by goroutine 67: runtime.mapaccess2_fast64() /usr/local/Cellar/go/1.16.5/libexec/src/runtime/map_fast64.go:52 +0x0 github.com/fleetdm/fleet/v4/server/pubsub.(*inmemQueryResults).WriteResult() /Users/zwass/dev/fleet/server/pubsub/inmem_query_results.go:37 +0x84 github.com/fleetdm/fleet/v4/server/service.(*Service).ingestDistributedQuery() /Users/zwass/dev/fleet/server/service/service_osquery.go:1020 +0x258 github.com/fleetdm/fleet/v4/server/service.TestIngestDistributedQueryRecordCompletionError() /Users/zwass/dev/fleet/server/service/service_osquery_test.go:1499 +0x61c testing.tRunner() /usr/local/Cellar/go/1.16.5/libexec/src/testing/testing.go:1193 +0x202 Previous write at 0x00c0004b2cf0 by goroutine 104: runtime.mapassign_fast64() /usr/local/Cellar/go/1.16.5/libexec/src/runtime/map_fast64.go:92 +0x0 github.com/fleetdm/fleet/v4/server/pubsub.(*inmemQueryResults).getChannel() /Users/zwass/dev/fleet/server/pubsub/inmem_query_results.go:31 +0x1a4 github.com/fleetdm/fleet/v4/server/pubsub.(*inmemQueryResults).ReadChannel() /Users/zwass/dev/fleet/server/pubsub/inmem_query_results.go:53 +0x64 github.com/fleetdm/fleet/v4/server/service.TestIngestDistributedQueryRecordCompletionError.func1() /Users/zwass/dev/fleet/server/service/service_osquery_test.go:1493 +0x117 Goroutine 67 (running) created at: testing.(*T).Run() /usr/local/Cellar/go/1.16.5/libexec/src/testing/testing.go:1238 +0x5d7 testing.runTests.func1() /usr/local/Cellar/go/1.16.5/libexec/src/testing/testing.go:1511 +0xa6 testing.tRunner() /usr/local/Cellar/go/1.16.5/libexec/src/testing/testing.go:1193 +0x202 testing.runTests() /usr/local/Cellar/go/1.16.5/libexec/src/testing/testing.go:1509 +0x612 testing.(*M).Run() /usr/local/Cellar/go/1.16.5/libexec/src/testing/testing.go:1417 +0x3b3 main.main() _testmain.go:303 +0x236 Goroutine 104 (running) created at: github.com/fleetdm/fleet/v4/server/service.TestIngestDistributedQueryRecordCompletionError() /Users/zwass/dev/fleet/server/service/service_osquery_test.go:1492 +0x558 testing.tRunner() /usr/local/Cellar/go/1.16.5/libexec/src/testing/testing.go:1193 +0x202 ================== ================== WARNING: DATA RACE Read at 0x00c0000ff2d8 by goroutine 67: github.com/fleetdm/fleet/v4/server/pubsub.(*inmemQueryResults).WriteResult() /Users/zwass/dev/fleet/server/pubsub/inmem_query_results.go:37 +0xa4 github.com/fleetdm/fleet/v4/server/service.(*Service).ingestDistributedQuery() /Users/zwass/dev/fleet/server/service/service_osquery.go:1020 +0x258 github.com/fleetdm/fleet/v4/server/service.TestIngestDistributedQueryRecordCompletionError() /Users/zwass/dev/fleet/server/service/service_osquery_test.go:1499 +0x61c testing.tRunner() /usr/local/Cellar/go/1.16.5/libexec/src/testing/testing.go:1193 +0x202 Previous write at 0x00c0000ff2d8 by goroutine 104: github.com/fleetdm/fleet/v4/server/pubsub.(*inmemQueryResults).getChannel() /Users/zwass/dev/fleet/server/pubsub/inmem_query_results.go:31 +0x1bc github.com/fleetdm/fleet/v4/server/pubsub.(*inmemQueryResults).ReadChannel() /Users/zwass/dev/fleet/server/pubsub/inmem_query_results.go:53 +0x64 github.com/fleetdm/fleet/v4/server/service.TestIngestDistributedQueryRecordCompletionError.func1() /Users/zwass/dev/fleet/server/service/service_osquery_test.go:1493 +0x117 Goroutine 67 (running) created at: testing.(*T).Run() /usr/local/Cellar/go/1.16.5/libexec/src/testing/testing.go:1238 +0x5d7 testing.runTests.func1() /usr/local/Cellar/go/1.16.5/libexec/src/testing/testing.go:1511 +0xa6 testing.tRunner() /usr/local/Cellar/go/1.16.5/libexec/src/testing/testing.go:1193 +0x202 testing.runTests() /usr/local/Cellar/go/1.16.5/libexec/src/testing/testing.go:1509 +0x612 testing.(*M).Run() /usr/local/Cellar/go/1.16.5/libexec/src/testing/testing.go:1417 +0x3b3 main.main() _testmain.go:303 +0x236 Goroutine 104 (running) created at: github.com/fleetdm/fleet/v4/server/service.TestIngestDistributedQueryRecordCompletionError() /Users/zwass/dev/fleet/server/service/service_osquery_test.go:1492 +0x558 testing.tRunner() /usr/local/Cellar/go/1.16.5/libexec/src/testing/testing.go:1193 +0x202 ================== --- FAIL: TestIngestDistributedQueryRecordCompletionError (0.01s) service_osquery_test.go:1502: PASS: QueryCompletedByHost(string,uint) testing.go:1092: race detected during execution of test ================== WARNING: DATA RACE Read at 0x00c0000f8570 by goroutine 70: runtime.mapaccess2_fast64() /usr/local/Cellar/go/1.16.5/libexec/src/runtime/map_fast64.go:52 +0x0 github.com/fleetdm/fleet/v4/server/pubsub.(*inmemQueryResults).WriteResult() /Users/zwass/dev/fleet/server/pubsub/inmem_query_results.go:37 +0x84 github.com/fleetdm/fleet/v4/server/service.(*Service).ingestDistributedQuery() /Users/zwass/dev/fleet/server/service/service_osquery.go:1020 +0x258 github.com/fleetdm/fleet/v4/server/service.TestIngestDistributedQuery() /Users/zwass/dev/fleet/server/service/service_osquery_test.go:1530 +0x5bc testing.tRunner() /usr/local/Cellar/go/1.16.5/libexec/src/testing/testing.go:1193 +0x202 Previous write at 0x00c0000f8570 by goroutine 71: runtime.mapassign_fast64() /usr/local/Cellar/go/1.16.5/libexec/src/runtime/map_fast64.go:92 +0x0 github.com/fleetdm/fleet/v4/server/pubsub.(*inmemQueryResults).getChannel() /Users/zwass/dev/fleet/server/pubsub/inmem_query_results.go:31 +0x1a4 github.com/fleetdm/fleet/v4/server/pubsub.(*inmemQueryResults).ReadChannel() /Users/zwass/dev/fleet/server/pubsub/inmem_query_results.go:53 +0x64 github.com/fleetdm/fleet/v4/server/service.TestIngestDistributedQuery.func1() /Users/zwass/dev/fleet/server/service/service_osquery_test.go:1524 +0x117 Goroutine 70 (running) created at: testing.(*T).Run() /usr/local/Cellar/go/1.16.5/libexec/src/testing/testing.go:1238 +0x5d7 testing.runTests.func1() /usr/local/Cellar/go/1.16.5/libexec/src/testing/testing.go:1511 +0xa6 testing.tRunner() /usr/local/Cellar/go/1.16.5/libexec/src/testing/testing.go:1193 +0x202 testing.runTests() /usr/local/Cellar/go/1.16.5/libexec/src/testing/testing.go:1509 +0x612 testing.(*M).Run() /usr/local/Cellar/go/1.16.5/libexec/src/testing/testing.go:1417 +0x3b3 main.main() _testmain.go:303 +0x236 Goroutine 71 (running) created at: github.com/fleetdm/fleet/v4/server/service.TestIngestDistributedQuery() /Users/zwass/dev/fleet/server/service/service_osquery_test.go:1523 +0x4f4 testing.tRunner() /usr/local/Cellar/go/1.16.5/libexec/src/testing/testing.go:1193 +0x202 ================== ================== WARNING: DATA RACE Read at 0x00c000c480d8 by goroutine 70: github.com/fleetdm/fleet/v4/server/pubsub.(*inmemQueryResults).WriteResult() /Users/zwass/dev/fleet/server/pubsub/inmem_query_results.go:37 +0xa4 github.com/fleetdm/fleet/v4/server/service.(*Service).ingestDistributedQuery() /Users/zwass/dev/fleet/server/service/service_osquery.go:1020 +0x258 github.com/fleetdm/fleet/v4/server/service.TestIngestDistributedQuery() /Users/zwass/dev/fleet/server/service/service_osquery_test.go:1530 +0x5bc testing.tRunner() /usr/local/Cellar/go/1.16.5/libexec/src/testing/testing.go:1193 +0x202 Previous write at 0x00c000c480d8 by goroutine 71: github.com/fleetdm/fleet/v4/server/pubsub.(*inmemQueryResults).getChannel() /Users/zwass/dev/fleet/server/pubsub/inmem_query_results.go:31 +0x1bc github.com/fleetdm/fleet/v4/server/pubsub.(*inmemQueryResults).ReadChannel() /Users/zwass/dev/fleet/server/pubsub/inmem_query_results.go:53 +0x64 github.com/fleetdm/fleet/v4/server/service.TestIngestDistributedQuery.func1() /Users/zwass/dev/fleet/server/service/service_osquery_test.go:1524 +0x117 Goroutine 70 (running) created at: testing.(*T).Run() /usr/local/Cellar/go/1.16.5/libexec/src/testing/testing.go:1238 +0x5d7 testing.runTests.func1() /usr/local/Cellar/go/1.16.5/libexec/src/testing/testing.go:1511 +0xa6 testing.tRunner() /usr/local/Cellar/go/1.16.5/libexec/src/testing/testing.go:1193 +0x202 testing.runTests() /usr/local/Cellar/go/1.16.5/libexec/src/testing/testing.go:1509 +0x612 testing.(*M).Run() /usr/local/Cellar/go/1.16.5/libexec/src/testing/testing.go:1417 +0x3b3 main.main() _testmain.go:303 +0x236 Goroutine 71 (running) created at: github.com/fleetdm/fleet/v4/server/service.TestIngestDistributedQuery() /Users/zwass/dev/fleet/server/service/service_osquery_test.go:1523 +0x4f4 testing.tRunner() /usr/local/Cellar/go/1.16.5/libexec/src/testing/testing.go:1193 +0x202 ================== --- FAIL: TestIngestDistributedQuery (0.01s) service_osquery_test.go:1532: PASS: QueryCompletedByHost(string,uint) testing.go:1092: race detected during execution of test FAIL FAIL github.com/fleetdm/fleet/v4/server/service 42.743s ```
2021-07-13 19:27:41 +00:00
channel := im.getChannel(result.DistributedQueryCampaignID)
select {
case channel <- result:
// intentionally do nothing
default:
return noSubscriberError{strconv.Itoa(int(result.DistributedQueryCampaignID))}
}
return nil
}
func (im *inmemQueryResults) ReadChannel(ctx context.Context, campaign fleet.DistributedQueryCampaign) (<-chan interface{}, error) {
channel := im.getChannel(campaign.ID)
go func() {
<-ctx.Done()
close(channel)
im.channelMutex.Lock()
delete(im.resultChannels, campaign.ID)
im.channelMutex.Unlock()
}()
return channel, nil
}
func (im *inmemQueryResults) HealthCheck() error {
return nil
}