mirror of
https://github.com/empayre/fleet.git
synced 2024-11-06 17:05:18 +00:00
8723b83039
Missed locking caused a race condition detected with the --race flag: ``` ================== WARNING: DATA RACE Read at 0x00c0004b2cf0 by goroutine 67: runtime.mapaccess2_fast64() /usr/local/Cellar/go/1.16.5/libexec/src/runtime/map_fast64.go:52 +0x0 github.com/fleetdm/fleet/v4/server/pubsub.(*inmemQueryResults).WriteResult() /Users/zwass/dev/fleet/server/pubsub/inmem_query_results.go:37 +0x84 github.com/fleetdm/fleet/v4/server/service.(*Service).ingestDistributedQuery() /Users/zwass/dev/fleet/server/service/service_osquery.go:1020 +0x258 github.com/fleetdm/fleet/v4/server/service.TestIngestDistributedQueryRecordCompletionError() /Users/zwass/dev/fleet/server/service/service_osquery_test.go:1499 +0x61c testing.tRunner() /usr/local/Cellar/go/1.16.5/libexec/src/testing/testing.go:1193 +0x202 Previous write at 0x00c0004b2cf0 by goroutine 104: runtime.mapassign_fast64() /usr/local/Cellar/go/1.16.5/libexec/src/runtime/map_fast64.go:92 +0x0 github.com/fleetdm/fleet/v4/server/pubsub.(*inmemQueryResults).getChannel() /Users/zwass/dev/fleet/server/pubsub/inmem_query_results.go:31 +0x1a4 github.com/fleetdm/fleet/v4/server/pubsub.(*inmemQueryResults).ReadChannel() /Users/zwass/dev/fleet/server/pubsub/inmem_query_results.go:53 +0x64 github.com/fleetdm/fleet/v4/server/service.TestIngestDistributedQueryRecordCompletionError.func1() /Users/zwass/dev/fleet/server/service/service_osquery_test.go:1493 +0x117 Goroutine 67 (running) created at: testing.(*T).Run() /usr/local/Cellar/go/1.16.5/libexec/src/testing/testing.go:1238 +0x5d7 testing.runTests.func1() /usr/local/Cellar/go/1.16.5/libexec/src/testing/testing.go:1511 +0xa6 testing.tRunner() /usr/local/Cellar/go/1.16.5/libexec/src/testing/testing.go:1193 +0x202 testing.runTests() /usr/local/Cellar/go/1.16.5/libexec/src/testing/testing.go:1509 +0x612 testing.(*M).Run() /usr/local/Cellar/go/1.16.5/libexec/src/testing/testing.go:1417 +0x3b3 main.main() _testmain.go:303 +0x236 Goroutine 104 (running) created at: 
github.com/fleetdm/fleet/v4/server/service.TestIngestDistributedQueryRecordCompletionError() /Users/zwass/dev/fleet/server/service/service_osquery_test.go:1492 +0x558 testing.tRunner() /usr/local/Cellar/go/1.16.5/libexec/src/testing/testing.go:1193 +0x202 ================== ================== WARNING: DATA RACE Read at 0x00c0000ff2d8 by goroutine 67: github.com/fleetdm/fleet/v4/server/pubsub.(*inmemQueryResults).WriteResult() /Users/zwass/dev/fleet/server/pubsub/inmem_query_results.go:37 +0xa4 github.com/fleetdm/fleet/v4/server/service.(*Service).ingestDistributedQuery() /Users/zwass/dev/fleet/server/service/service_osquery.go:1020 +0x258 github.com/fleetdm/fleet/v4/server/service.TestIngestDistributedQueryRecordCompletionError() /Users/zwass/dev/fleet/server/service/service_osquery_test.go:1499 +0x61c testing.tRunner() /usr/local/Cellar/go/1.16.5/libexec/src/testing/testing.go:1193 +0x202 Previous write at 0x00c0000ff2d8 by goroutine 104: github.com/fleetdm/fleet/v4/server/pubsub.(*inmemQueryResults).getChannel() /Users/zwass/dev/fleet/server/pubsub/inmem_query_results.go:31 +0x1bc github.com/fleetdm/fleet/v4/server/pubsub.(*inmemQueryResults).ReadChannel() /Users/zwass/dev/fleet/server/pubsub/inmem_query_results.go:53 +0x64 github.com/fleetdm/fleet/v4/server/service.TestIngestDistributedQueryRecordCompletionError.func1() /Users/zwass/dev/fleet/server/service/service_osquery_test.go:1493 +0x117 Goroutine 67 (running) created at: testing.(*T).Run() /usr/local/Cellar/go/1.16.5/libexec/src/testing/testing.go:1238 +0x5d7 testing.runTests.func1() /usr/local/Cellar/go/1.16.5/libexec/src/testing/testing.go:1511 +0xa6 testing.tRunner() /usr/local/Cellar/go/1.16.5/libexec/src/testing/testing.go:1193 +0x202 testing.runTests() /usr/local/Cellar/go/1.16.5/libexec/src/testing/testing.go:1509 +0x612 testing.(*M).Run() /usr/local/Cellar/go/1.16.5/libexec/src/testing/testing.go:1417 +0x3b3 main.main() _testmain.go:303 +0x236 Goroutine 104 (running) created at: 
github.com/fleetdm/fleet/v4/server/service.TestIngestDistributedQueryRecordCompletionError() /Users/zwass/dev/fleet/server/service/service_osquery_test.go:1492 +0x558 testing.tRunner() /usr/local/Cellar/go/1.16.5/libexec/src/testing/testing.go:1193 +0x202 ================== --- FAIL: TestIngestDistributedQueryRecordCompletionError (0.01s) service_osquery_test.go:1502: PASS: QueryCompletedByHost(string,uint) testing.go:1092: race detected during execution of test ================== WARNING: DATA RACE Read at 0x00c0000f8570 by goroutine 70: runtime.mapaccess2_fast64() /usr/local/Cellar/go/1.16.5/libexec/src/runtime/map_fast64.go:52 +0x0 github.com/fleetdm/fleet/v4/server/pubsub.(*inmemQueryResults).WriteResult() /Users/zwass/dev/fleet/server/pubsub/inmem_query_results.go:37 +0x84 github.com/fleetdm/fleet/v4/server/service.(*Service).ingestDistributedQuery() /Users/zwass/dev/fleet/server/service/service_osquery.go:1020 +0x258 github.com/fleetdm/fleet/v4/server/service.TestIngestDistributedQuery() /Users/zwass/dev/fleet/server/service/service_osquery_test.go:1530 +0x5bc testing.tRunner() /usr/local/Cellar/go/1.16.5/libexec/src/testing/testing.go:1193 +0x202 Previous write at 0x00c0000f8570 by goroutine 71: runtime.mapassign_fast64() /usr/local/Cellar/go/1.16.5/libexec/src/runtime/map_fast64.go:92 +0x0 github.com/fleetdm/fleet/v4/server/pubsub.(*inmemQueryResults).getChannel() /Users/zwass/dev/fleet/server/pubsub/inmem_query_results.go:31 +0x1a4 github.com/fleetdm/fleet/v4/server/pubsub.(*inmemQueryResults).ReadChannel() /Users/zwass/dev/fleet/server/pubsub/inmem_query_results.go:53 +0x64 github.com/fleetdm/fleet/v4/server/service.TestIngestDistributedQuery.func1() /Users/zwass/dev/fleet/server/service/service_osquery_test.go:1524 +0x117 Goroutine 70 (running) created at: testing.(*T).Run() /usr/local/Cellar/go/1.16.5/libexec/src/testing/testing.go:1238 +0x5d7 testing.runTests.func1() /usr/local/Cellar/go/1.16.5/libexec/src/testing/testing.go:1511 +0xa6 
testing.tRunner() /usr/local/Cellar/go/1.16.5/libexec/src/testing/testing.go:1193 +0x202 testing.runTests() /usr/local/Cellar/go/1.16.5/libexec/src/testing/testing.go:1509 +0x612 testing.(*M).Run() /usr/local/Cellar/go/1.16.5/libexec/src/testing/testing.go:1417 +0x3b3 main.main() _testmain.go:303 +0x236 Goroutine 71 (running) created at: github.com/fleetdm/fleet/v4/server/service.TestIngestDistributedQuery() /Users/zwass/dev/fleet/server/service/service_osquery_test.go:1523 +0x4f4 testing.tRunner() /usr/local/Cellar/go/1.16.5/libexec/src/testing/testing.go:1193 +0x202 ================== ================== WARNING: DATA RACE Read at 0x00c000c480d8 by goroutine 70: github.com/fleetdm/fleet/v4/server/pubsub.(*inmemQueryResults).WriteResult() /Users/zwass/dev/fleet/server/pubsub/inmem_query_results.go:37 +0xa4 github.com/fleetdm/fleet/v4/server/service.(*Service).ingestDistributedQuery() /Users/zwass/dev/fleet/server/service/service_osquery.go:1020 +0x258 github.com/fleetdm/fleet/v4/server/service.TestIngestDistributedQuery() /Users/zwass/dev/fleet/server/service/service_osquery_test.go:1530 +0x5bc testing.tRunner() /usr/local/Cellar/go/1.16.5/libexec/src/testing/testing.go:1193 +0x202 Previous write at 0x00c000c480d8 by goroutine 71: github.com/fleetdm/fleet/v4/server/pubsub.(*inmemQueryResults).getChannel() /Users/zwass/dev/fleet/server/pubsub/inmem_query_results.go:31 +0x1bc github.com/fleetdm/fleet/v4/server/pubsub.(*inmemQueryResults).ReadChannel() /Users/zwass/dev/fleet/server/pubsub/inmem_query_results.go:53 +0x64 github.com/fleetdm/fleet/v4/server/service.TestIngestDistributedQuery.func1() /Users/zwass/dev/fleet/server/service/service_osquery_test.go:1524 +0x117 Goroutine 70 (running) created at: testing.(*T).Run() /usr/local/Cellar/go/1.16.5/libexec/src/testing/testing.go:1238 +0x5d7 testing.runTests.func1() /usr/local/Cellar/go/1.16.5/libexec/src/testing/testing.go:1511 +0xa6 testing.tRunner() /usr/local/Cellar/go/1.16.5/libexec/src/testing/testing.go:1193 
+0x202 testing.runTests() /usr/local/Cellar/go/1.16.5/libexec/src/testing/testing.go:1509 +0x612 testing.(*M).Run() /usr/local/Cellar/go/1.16.5/libexec/src/testing/testing.go:1417 +0x3b3 main.main() _testmain.go:303 +0x236 Goroutine 71 (running) created at: github.com/fleetdm/fleet/v4/server/service.TestIngestDistributedQuery() /Users/zwass/dev/fleet/server/service/service_osquery_test.go:1523 +0x4f4 testing.tRunner() /usr/local/Cellar/go/1.16.5/libexec/src/testing/testing.go:1193 +0x202 ================== --- FAIL: TestIngestDistributedQuery (0.01s) service_osquery_test.go:1532: PASS: QueryCompletedByHost(string,uint) testing.go:1092: race detected during execution of test FAIL FAIL github.com/fleetdm/fleet/v4/server/service 42.743s ```
64 lines
1.4 KiB
Go
64 lines
1.4 KiB
Go
package pubsub
|
|
|
|
import (
|
|
"context"
|
|
"strconv"
|
|
"sync"
|
|
|
|
"github.com/fleetdm/fleet/v4/server/fleet"
|
|
)
|
|
|
|
// inmemQueryResults keeps one result channel per distributed query campaign
// entirely in process memory.
type inmemQueryResults struct {
	// channelMutex guards every access to resultChannels.
	channelMutex sync.Mutex
	// resultChannels maps a campaign ID to the channel its results are sent on.
	resultChannels map[uint]chan interface{}
}
|
|
|
|
// Compile-time check that *inmemQueryResults implements fleet.QueryResultStore.
var _ fleet.QueryResultStore = (*inmemQueryResults)(nil)
|
|
|
|
// NewInmemQueryResults initializes a new in-memory implementation of the
|
|
// QueryResultStore interface.
|
|
func NewInmemQueryResults() *inmemQueryResults {
|
|
return &inmemQueryResults{resultChannels: map[uint]chan interface{}{}}
|
|
}
|
|
|
|
func (im *inmemQueryResults) getChannel(id uint) chan interface{} {
|
|
im.channelMutex.Lock()
|
|
defer im.channelMutex.Unlock()
|
|
|
|
channel, ok := im.resultChannels[id]
|
|
if !ok {
|
|
channel = make(chan interface{})
|
|
im.resultChannels[id] = channel
|
|
}
|
|
return channel
|
|
}
|
|
|
|
func (im *inmemQueryResults) WriteResult(result fleet.DistributedQueryResult) error {
|
|
channel := im.getChannel(result.DistributedQueryCampaignID)
|
|
|
|
select {
|
|
case channel <- result:
|
|
// intentionally do nothing
|
|
default:
|
|
return noSubscriberError{strconv.Itoa(int(result.DistributedQueryCampaignID))}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (im *inmemQueryResults) ReadChannel(ctx context.Context, campaign fleet.DistributedQueryCampaign) (<-chan interface{}, error) {
|
|
channel := im.getChannel(campaign.ID)
|
|
go func() {
|
|
<-ctx.Done()
|
|
close(channel)
|
|
im.channelMutex.Lock()
|
|
delete(im.resultChannels, campaign.ID)
|
|
im.channelMutex.Unlock()
|
|
}()
|
|
return channel, nil
|
|
}
|
|
|
|
func (im *inmemQueryResults) HealthCheck() error {
|
|
return nil
|
|
}
|