Documentation
¶
Index ¶
- Constants
- Variables
- func SetLogger(l logr.Logger)
- type Channel
- type Claim
- type ClientMultiRPCHandler
- type ClientMultiRPCInterceptor
- type ClientOption
- func WithClientChannelSize(size int) ClientOption
- func WithClientID(id string) ClientOption
- func WithClientMultiRPCInterceptors(interceptors ...ClientMultiRPCInterceptor) ClientOption
- func WithClientOptions(opts ...ClientOption) ClientOption
- func WithClientRPCInterceptors(interceptors ...ClientRPCInterceptor) ClientOption
- func WithClientRequestHooks(hooks ...ClientRequestHook) ClientOption
- func WithClientResponseHooks(hooks ...ClientResponseHook) ClientOption
- func WithClientSelectTimeout(timeout time.Duration) ClientOption
- func WithClientStreamInterceptors(interceptors ...StreamInterceptor) ClientOption
- func WithClientTimeout(timeout time.Duration) ClientOption
- type ClientOpts
- type ClientRPCHandler
- type ClientRPCInterceptor
- type ClientRequestHook
- type ClientResponseHook
- type ClientStream
- type Error
- type ErrorCode
- type MessageBus
- type RPCInfo
- type RequestInterceptor
- type RequestOption
- type RequestOpts
- type Response
- type SelectionOpts
- type ServerOption
- func WithServerChannelSize(size int) ServerOption
- func WithServerID(id string) ServerOption
- func WithServerOptions(opts ...ServerOption) ServerOption
- func WithServerRPCInterceptors(interceptors ...ServerRPCInterceptor) ServerOption
- func WithServerStreamInterceptors(interceptors ...StreamInterceptor) ServerOption
- func WithServerTimeout(timeout time.Duration) ServerOption
- type ServerOpts
- type ServerRPCHandler
- type ServerRPCInterceptor
- type ServerStream
- type Stream
- type StreamHandler
- type StreamInterceptor
- type StreamOption
- type StreamOpts
- type Subscription
Constants ¶
View Source
const ( DefaultClientTimeout = time.Second * 3 DefaultAffinityTimeout = time.Second DefaultAffinityShortCircuit = time.Millisecond * 200 )
View Source
const DefaultServerTimeout = time.Second * 3
Variables ¶
View Source
var ( ErrRequestCanceled = NewErrorf(Canceled, "request canceled") ErrRequestTimedOut = NewErrorf(DeadlineExceeded, "request timed out") ErrNoResponse = NewErrorf(Unavailable, "no response from servers") ErrStreamEOF = NewError(Unavailable, io.EOF) ErrClientClosed = NewErrorf(Canceled, "client is closed") ErrServerClosed = NewErrorf(Canceled, "server is closed") ErrStreamClosed = NewErrorf(Canceled, "stream closed") ErrSlowConsumer = NewErrorf(Unavailable, "stream message discarded by slow consumer") )
Functions ¶
Types ¶
type ClientMultiRPCHandler ¶
type ClientMultiRPCInterceptor ¶
type ClientMultiRPCInterceptor func(info RPCInfo, next ClientMultiRPCHandler) ClientMultiRPCHandler
type ClientOption ¶
type ClientOption func(*ClientOpts)
func WithClientChannelSize ¶
func WithClientChannelSize(size int) ClientOption
func WithClientID ¶
func WithClientID(id string) ClientOption
func WithClientMultiRPCInterceptors ¶
func WithClientMultiRPCInterceptors(interceptors ...ClientMultiRPCInterceptor) ClientOption
func WithClientOptions ¶
func WithClientOptions(opts ...ClientOption) ClientOption
func WithClientRPCInterceptors ¶
func WithClientRPCInterceptors(interceptors ...ClientRPCInterceptor) ClientOption
func WithClientRequestHooks ¶
func WithClientRequestHooks(hooks ...ClientRequestHook) ClientOption
func WithClientResponseHooks ¶
func WithClientResponseHooks(hooks ...ClientResponseHook) ClientOption
func WithClientSelectTimeout ¶
func WithClientSelectTimeout(timeout time.Duration) ClientOption
func WithClientStreamInterceptors ¶
func WithClientStreamInterceptors(interceptors ...StreamInterceptor) ClientOption
func WithClientTimeout ¶
func WithClientTimeout(timeout time.Duration) ClientOption
type ClientOpts ¶
type ClientOpts struct {
ClientID string
Timeout time.Duration
SelectionTimeout time.Duration
ChannelSize int
EnableStreams bool
RequestHooks []ClientRequestHook
ResponseHooks []ClientResponseHook
RpcInterceptors []ClientRPCInterceptor
MultiRPCInterceptors []ClientMultiRPCInterceptor
StreamInterceptors []StreamInterceptor
}
type ClientRPCHandler ¶
type ClientRPCInterceptor ¶
type ClientRPCInterceptor func(info RPCInfo, next ClientRPCHandler) ClientRPCHandler
type ClientRequestHook ¶
Request hooks are called as soon as the request is made
type ClientResponseHook ¶
type ClientResponseHook func(ctx context.Context, req proto.Message, info RPCInfo, res proto.Message, err error)
Response hooks are called just before responses are returned For multi-requests, response hooks are called on every response, and block while executing
type ClientStream ¶
type Error ¶
type Error interface {
error
Code() ErrorCode
Details() []any
DetailsProto() []*anypb.Any
// convenience methods
ToHttp() int
GRPCStatus() *status.Status
}
func NewErrorFromResponse ¶
type ErrorCode ¶
type ErrorCode string
const ( OK ErrorCode = "" // Request Canceled by client Canceled ErrorCode = "canceled" // Could not unmarshal request MalformedRequest ErrorCode = "malformed_request" // Could not unmarshal result MalformedResponse ErrorCode = "malformed_result" // Request timed out DeadlineExceeded ErrorCode = "deadline_exceeded" Unavailable ErrorCode = "unavailable" // Unknown (server returned non-psrpc error) Unknown ErrorCode = "unknown" // Invalid argument in request InvalidArgument ErrorCode = "invalid_argument" // Entity not found NotFound ErrorCode = "not_found" // Cannot produce and entity matching requested format NotAcceptable ErrorCode = "not_acceptable" // Duplicate creation attempted AlreadyExists ErrorCode = "already_exists" // Caller does not have required permissions PermissionDenied ErrorCode = "permission_denied" // Some resource has been exhausted, e.g. memory or quota ResourceExhausted ErrorCode = "resource_exhausted" // Inconsistent state to carry out request FailedPrecondition ErrorCode = "failed_precondition" // Request aborted Aborted ErrorCode = "aborted" // Operation was out of range OutOfRange ErrorCode = "out_of_range" // Operation is not implemented by the server Unimplemented ErrorCode = "unimplemented" // Operation failed due to an internal error Internal ErrorCode = "internal" // Irrecoverable loss or corruption of data DataLoss ErrorCode = "data_loss" // Similar to PermissionDenied, used when the caller is unidentified Unauthenticated ErrorCode = "unauthenticated" )
func ErrorCodeFromGRPC ¶
func GetErrorCode ¶
type MessageBus ¶
type MessageBus bus.MessageBus
func NewLocalMessageBus ¶
func NewLocalMessageBus() MessageBus
func NewNatsMessageBus ¶
func NewNatsMessageBus(nc *nats.Conn) MessageBus
func NewRedisMessageBus ¶
func NewRedisMessageBus(rc redis.UniversalClient) MessageBus
type RequestInterceptor ¶
type RequestInterceptor interface {
ClientRPCInterceptor | ClientMultiRPCInterceptor | StreamInterceptor
}
type RequestOption ¶
type RequestOption func(*RequestOpts)
func WithRequestInterceptors ¶
func WithRequestInterceptors[T RequestInterceptor](interceptors ...T) RequestOption
func WithRequestTimeout ¶
func WithRequestTimeout(timeout time.Duration) RequestOption
func WithSelectionOpts ¶
func WithSelectionOpts(opts SelectionOpts) RequestOption
type RequestOpts ¶
type RequestOpts struct {
Timeout time.Duration
SelectionOpts SelectionOpts
Interceptors []any
}
type SelectionOpts ¶
type SelectionOpts struct {
MinimumAffinity float32 // minimum affinity for a server to be considered a valid handler
MaximumAffinity float32 // if > 0, any server returning a max score will be selected immediately
AcceptFirstAvailable bool // go fast
AffinityTimeout time.Duration // server selection deadline
ShortCircuitTimeout time.Duration // deadline imposed after receiving first response
SelectionFunc func([]*Claim) (string, error) // custom server selection function
}
type ServerOption ¶
type ServerOption func(*ServerOpts)
func WithServerChannelSize ¶
func WithServerChannelSize(size int) ServerOption
func WithServerID ¶
func WithServerID(id string) ServerOption
func WithServerOptions ¶
func WithServerOptions(opts ...ServerOption) ServerOption
func WithServerRPCInterceptors ¶
func WithServerRPCInterceptors(interceptors ...ServerRPCInterceptor) ServerOption
func WithServerStreamInterceptors ¶
func WithServerStreamInterceptors(interceptors ...StreamInterceptor) ServerOption
func WithServerTimeout ¶
func WithServerTimeout(timeout time.Duration) ServerOption
type ServerOpts ¶
type ServerOpts struct {
ServerID string
Timeout time.Duration
ChannelSize int
Interceptors []ServerRPCInterceptor
StreamInterceptors []StreamInterceptor
ChainedInterceptor ServerRPCInterceptor
}
type ServerRPCHandler ¶
type ServerRPCInterceptor ¶
type ServerRPCInterceptor func(ctx context.Context, req proto.Message, info RPCInfo, handler ServerRPCHandler) (proto.Message, error)
Server interceptors wrap the service implementation
type ServerStream ¶
type StreamHandler ¶
type StreamInterceptor ¶
type StreamInterceptor func(info RPCInfo, next StreamHandler) StreamHandler
type StreamOption ¶
type StreamOption func(*StreamOpts)
func WithTimeout ¶
func WithTimeout(timeout time.Duration) StreamOption
type StreamOpts ¶
type Subscription ¶
type Subscription[MessageType proto.Message] bus.Subscription[MessageType]
Source Files
¶
Click to show internal directories.
Click to hide internal directories.