THRIFT-5164: Add middleware framework for Go servers

This commit adds a simple middleware framework for Go servers.

It provides:

 * A `ProcessorMiddleware` function interface used to define the actual middleware
 * `WrapProcessor`, the function that you use to wrap a `TProcessor` in a list of middleware
 * A helper `WrappedTProcessorFunction` struct to help with developing middleware

This is a breaking change for any custom implementations of the `TProcessor`
interface, but does not effect the code generated by compiling Thrift files. It
adds two functions to the interface that are a part of the generated `TProcessor`
code, but were not defined in the interface explicitly.
This commit is contained in:
Andrew Boyle 2020-04-27 11:32:24 -07:00 committed by GitHub
parent 52655cec79
commit 00c039adeb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 435 additions and 1 deletions

View File

@ -10,6 +10,7 @@
- [THRIFT-5006](https://issues.apache.org/jira/browse/THRIFT-5006) - Implement DEFAULT_MAX_LENGTH at TFramedTransport
- [THRIFT-5069](https://issues.apache.org/jira/browse/THRIFT-5069) - In Go library TDeserializer.Transport is now typed \*TMemoryBuffer instead of TTransport
- [THRIFT-5072](https://issues.apache.org/jira/browse/THRIFT-5072) - Haskell generator fails to distinguish between multiple enum types with conflicting enum identifiers
- [THRIFT-5164](https://issues.apache.org/jira/browse/THRIFT-5164) - In Go library TProcessor interface now includes ProcessorMap and AddToProcessorMap functions.
### Java
@ -18,6 +19,7 @@
### Go
- [THRIFT-5069](https://issues.apache.org/jira/browse/THRIFT-5069) - Add TSerializerPool and TDeserializerPool, which are thread-safe versions of TSerializer and TDeserializer.
- [THRIFT-5164](https://issues.apache.org/jira/browse/THRIFT-5164) - Add ProcessorMiddleware function type and WrapProcessor function to support wrapping a TProcessor with middleware functions.
## 0.13.0

View File

@ -19,7 +19,10 @@
package thrift
import "context"
import (
"context"
"fmt"
)
type mockProcessor struct {
ProcessFunc func(in, out TProtocol) (bool, TException)
@ -28,3 +31,76 @@ type mockProcessor struct {
func (m *mockProcessor) Process(ctx context.Context, in, out TProtocol) (bool, TException) {
return m.ProcessFunc(in, out)
}
func (m *mockProcessor) ProcessorMap() map[string]TProcessorFunction {
return map[string]TProcessorFunction{
"mock": WrappedTProcessorFunction{
Wrapped: func(ctx context.Context, seqId int32, in, out TProtocol) (bool, TException) {
return m.ProcessFunc(in, out)
},
},
}
}
func (m *mockProcessor) AddToProcessorMap(name string, processorFunc TProcessorFunction) {}
type mockWrappedProcessorContextKey int
const (
processorName mockWrappedProcessorContextKey = iota
)
// setMockWrappableProcessorName sets the "name" of the TProcessorFunction to
// call on a mockWrappableProcessor when calling Process.
//
// In a normal TProcessor, the request name is read from the request itself
// which happens in TProcessor.Process, so it is not passed into the call to
// Process itself, to get around this in testing, mockWrappableProcessor calls
// getMockWrappableProcessorName to get the name to use from the context
// object.
func setMockWrappableProcessorName(ctx context.Context, name string) context.Context {
return context.WithValue(ctx, processorName, name)
}
// getMockWrappableProcessorName gets the "name" of the TProcessorFunction to
// call on a mockWrappableProcessor when calling Process.
func getMockWrappableProcessorName(ctx context.Context) (string, bool) {
val, ok := ctx.Value(processorName).(string)
return val, ok
}
// mockWrappableProcessor can be used to create a mock object that fufills the
// TProcessor interface in testing.
type mockWrappableProcessor struct {
ProcessorFuncs map[string]TProcessorFunction
}
// Process calls the TProcessorFunction assigned to the "name" set on the
// context object by setMockWrappableProcessorName.
//
// If no name is set on the context or there is no TProcessorFunction mapped to
// that name, the call will panic.
func (p *mockWrappableProcessor) Process(ctx context.Context, in, out TProtocol) (bool, TException) {
name, ok := getMockWrappableProcessorName(ctx)
if !ok {
panic("MockWrappableProcessorName not set on context")
}
processor, ok := p.ProcessorMap()[name]
if !ok {
panic(fmt.Sprintf("No processor set for name %q", name))
}
return processor.Process(ctx, 0, in, out)
}
func (p *mockWrappableProcessor) ProcessorMap() map[string]TProcessorFunction {
return p.ProcessorFuncs
}
func (p *mockWrappableProcessor) AddToProcessorMap(name string, processorFunc TProcessorFunction) {
p.ProcessorFuncs[name] = processorFunc
}
var (
_ TProcessor = (*mockProcessor)(nil)
_ TProcessor = (*mockWrappableProcessor)(nil)
)

View File

@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package thrift
import (
"context"
"log"
)
func simpleLoggingMiddleware(name string, next TProcessorFunction) TProcessorFunction {
return WrappedTProcessorFunction{
Wrapped: func(ctx context.Context, seqId int32, in, out TProtocol) (bool, TException) {
log.Printf("Before: %q", name)
success, err := next.Process(ctx, seqId, in, out)
log.Printf("After: %q", name)
log.Printf("Success: %v", success)
if err != nil {
log.Printf("Error: %v", err)
}
return success, err
},
}
}
func ExampleProcessorMiddleware() {
var (
processor TProcessor
trans TServerTransport
transFactory TTransportFactory
protoFactory TProtocolFactory
)
processor = WrapProcessor(processor, simpleLoggingMiddleware)
server := NewTSimpleServer4(processor, trans, transFactory, protoFactory)
log.Fatal(server.Serve())
}

View File

@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package thrift
import "context"
// ProcessorMiddleware is a function that can be passed to WrapProcessor to wrap the
// TProcessorFunctions for that TProcessor.
//
// Middlewares are passed in the name of the function as set in the processor
// map of the TProcessor.
type ProcessorMiddleware func(name string, next TProcessorFunction) TProcessorFunction
// WrapProcessor takes an existing TProcessor and wraps each of its inner
// TProcessorFunctions with the middlewares passed in and returns it.
//
// Middlewares will be called in the order that they are defined:
//
// 1. Middlewares[0]
// 2. Middlewares[1]
// ...
// N. Middlewares[n]
func WrapProcessor(processor TProcessor, middlewares ...ProcessorMiddleware) TProcessor {
for name, processorFunc := range processor.ProcessorMap() {
wrapped := processorFunc
// Add middlewares in reverse so the first in the list is the outermost.
for i := len(middlewares) - 1; i >= 0; i-- {
wrapped = middlewares[i](name, wrapped)
}
processor.AddToProcessorMap(name, wrapped)
}
return processor
}
// WrappedTProcessorFunction is a convenience struct that implements the
// TProcessorFunction interface that can be used when implementing custom
// Middleware.
type WrappedTProcessorFunction struct {
// Wrapped is called by WrappedTProcessorFunction.Process and should be a
// "wrapped" call to a base TProcessorFunc.Process call.
Wrapped func(ctx context.Context, seqId int32, in, out TProtocol) (bool, TException)
}
// Process implements the TProcessorFunction interface using p.Wrapped.
func (p WrappedTProcessorFunction) Process(ctx context.Context, seqID int32, in, out TProtocol) (bool, TException) {
return p.Wrapped(ctx, seqID, in, out)
}
// verify that WrappedTProcessorFunction implements TProcessorFunction
var (
_ TProcessorFunction = WrappedTProcessorFunction{}
_ TProcessorFunction = (*WrappedTProcessorFunction)(nil)
)

View File

@ -0,0 +1,110 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package thrift
import (
"context"
"testing"
)
type counter struct {
count int
}
func (c *counter) incr() {
c.count++
}
func testMiddleware(c *counter) ProcessorMiddleware {
return func(name string, next TProcessorFunction) TProcessorFunction {
return WrappedTProcessorFunction{
Wrapped: func(ctx context.Context, seqId int32, in, out TProtocol) (bool, TException) {
c.incr()
return next.Process(ctx, seqId, in, out)
},
}
}
}
func newCounter(t *testing.T) *counter {
c := counter{}
if c.count != 0 {
t.Fatal("Unexpected initial count.")
}
return &c
}
func TestWrapProcessor(t *testing.T) {
name := "test"
processor := &mockWrappableProcessor{
ProcessorFuncs: map[string]TProcessorFunction{
name: WrappedTProcessorFunction{
Wrapped: func(ctx context.Context, seqId int32, in, out TProtocol) (bool, TException) {
return true, nil
},
},
},
}
c := newCounter(t)
ctx := setMockWrappableProcessorName(context.Background(), name)
wrapped := WrapProcessor(processor, testMiddleware(c))
wrapped.Process(ctx, nil, nil)
if c.count != 1 {
t.Fatalf("Unexpected count value %v", c.count)
}
}
func TestWrapTMultiplexedProcessor(t *testing.T) {
name := "test"
processorName := "foo"
c := newCounter(t)
processor := &TMultiplexedProcessor{}
processor.RegisterDefault(&mockWrappableProcessor{
ProcessorFuncs: map[string]TProcessorFunction{
name: WrappedTProcessorFunction{
Wrapped: func(ctx context.Context, seqId int32, in, out TProtocol) (bool, TException) {
return true, nil
},
},
},
})
processor.RegisterProcessor(processorName, &mockWrappableProcessor{
ProcessorFuncs: map[string]TProcessorFunction{
name: WrappedTProcessorFunction{
Wrapped: func(ctx context.Context, seqId int32, in, out TProtocol) (bool, TException) {
return true, nil
},
},
},
})
wrapped := WrapProcessor(processor, testMiddleware(c))
ctx := setMockWrappableProcessorName(context.Background(), name)
in := NewStoredMessageProtocol(nil, name, 1, 1)
wrapped.Process(ctx, in, nil)
if c.count != 1 {
t.Fatalf("Unexpected count value %v", c.count)
}
in = NewStoredMessageProtocol(nil, processorName+MULTIPLEXED_SEPARATOR+name, 1, 1)
wrapped.Process(ctx, in, nil)
if c.count != 2 {
t.Fatalf("Unexpected count value %v", c.count)
}
}

View File

@ -117,6 +117,67 @@ func NewTMultiplexedProcessor() *TMultiplexedProcessor {
}
}
// ProcessorMap returns a mapping of "{ProcessorName}{MULTIPLEXED_SEPARATOR}{FunctionName}"
// to TProcessorFunction for any registered processors. If there is also a
// DefaultProcessor, the keys for the methods on that processor will simply be
// "{FunctionName}". If the TMultiplexedProcessor has both a DefaultProcessor and
// other registered processors, then the keys will be a mix of both formats.
//
// The implementation differs with other TProcessors in that the map returned is
// a new map, while most TProcessors just return their internal mapping directly.
// This means that edits to the map returned by this implementation of ProcessorMap
// will not affect the underlying mapping within the TMultiplexedProcessor.
func (t *TMultiplexedProcessor) ProcessorMap() map[string]TProcessorFunction {
processorFuncMap := make(map[string]TProcessorFunction)
for name, processor := range t.serviceProcessorMap {
for method, processorFunc := range processor.ProcessorMap() {
processorFuncName := name + MULTIPLEXED_SEPARATOR + method
processorFuncMap[processorFuncName] = processorFunc
}
}
if t.DefaultProcessor != nil {
for method, processorFunc := range t.DefaultProcessor.ProcessorMap() {
processorFuncMap[method] = processorFunc
}
}
return processorFuncMap
}
// AddToProcessorMap updates the underlying TProcessor ProccessorMaps depending on
// the format of "name".
//
// If "name" is in the format "{ProcessorName}{MULTIPLEXED_SEPARATOR}{FunctionName}",
// then it sets the given TProcessorFunction on the inner TProcessor with the
// ProcessorName component using the FunctionName component.
//
// If "name" is just in the format "{FunctionName}", that is to say there is no
// MULTIPLEXED_SEPARATOR, and the TMultiplexedProcessor has a DefaultProcessor
// configured, then it will set the given TProcessorFunction on the DefaultProcessor
// using the given name.
//
// If there is not a TProcessor available for the given name, then this function
// does nothing. This can happen when there is no TProcessor registered for
// the given ProcessorName or if all that is given is the FunctionName and there
// is no DefaultProcessor set.
func (t *TMultiplexedProcessor) AddToProcessorMap(name string, processorFunc TProcessorFunction) {
components := strings.SplitN(name, MULTIPLEXED_SEPARATOR, 2)
if len(components) != 2 {
if t.DefaultProcessor != nil && len(components) == 1 {
t.DefaultProcessor.AddToProcessorMap(components[0], processorFunc)
}
return
}
processorName := components[0]
funcName := components[1]
if processor, ok := t.serviceProcessorMap[processorName]; ok {
processor.AddToProcessorMap(funcName, processorFunc)
}
}
// verify that TMultiplexedProcessor implements TProcessor
var _ TProcessor = (*TMultiplexedProcessor)(nil)
func (t *TMultiplexedProcessor) RegisterDefault(processor TProcessor) {
t.DefaultProcessor = processor
}

View File

@ -0,0 +1,53 @@
package thrift
import (
"context"
"strings"
"testing"
)
func TestMultiplexedProcessorMap(t *testing.T) {
name := "test"
processorName := "foo"
processor := &TMultiplexedProcessor{}
processor.RegisterDefault(&mockWrappableProcessor{
ProcessorFuncs: map[string]TProcessorFunction{
name: WrappedTProcessorFunction{
Wrapped: func(ctx context.Context, seqId int32, in, out TProtocol) (bool, TException) {
return true, nil
},
},
},
})
processor.RegisterProcessor(processorName, &mockWrappableProcessor{
ProcessorFuncs: map[string]TProcessorFunction{
name: WrappedTProcessorFunction{
Wrapped: func(ctx context.Context, seqId int32, in, out TProtocol) (bool, TException) {
return true, nil
},
},
},
})
processorMap := processor.ProcessorMap()
if len(processorMap) != 2 {
t.Fatalf("Wrong processor map size %#v", processorMap)
}
for k := range processorMap {
components := strings.SplitN(k, MULTIPLEXED_SEPARATOR, 2)
if len(components) == 1 {
if components[0] != name {
t.Fatalf("Wrong name for default processor func, expected %q, got %q", name, components[0])
}
} else if len(components) == 2 {
if components[0] != processorName {
t.Errorf("Wrong processor name, expected %q, got %q", processorName, components[0])
}
if components[1] != name {
t.Errorf("Wrong name for processor func, expected %q, got %q", name, components[1])
}
} else {
t.Fatalf("Wrong number of components %#v", components)
}
}
}

View File

@ -25,6 +25,16 @@ import "context"
// writes to some output stream.
type TProcessor interface {
Process(ctx context.Context, in, out TProtocol) (bool, TException)
// ProcessorMap returns a map of thrift method names to TProcessorFunctions.
ProcessorMap() map[string]TProcessorFunction
// AddToProcessorMap adds the given TProcessorFunction to the internal
// processor map at the given key.
//
// If one is already set at the given key, it will be replaced with the new
// TProcessorFunction.
AddToProcessorMap(string, TProcessorFunction)
}
type TProcessorFunction interface {