服务调用的初始化
func RegisterDaprServer(s *grpc.Server, srv DaprServer) {
s.RegisterService(&_Dapr_serviceDesc, srv)
}
_Dapr_serviceDesc 中有dpar各个方法的定义,包括 InvokeService
var _Dapr_serviceDesc = grpc.ServiceDesc{
ServiceName: "dapr.proto.runtime.v1.Dapr",
HandlerType: (*DaprServer)(nil),
Methods: []grpc.MethodDesc{
{
MethodName: "InvokeService",
Handler: _Dapr_InvokeService_Handler,
},
......
},
Streams: []grpc.StreamDesc{},
Metadata: "dapr/proto/runtime/v1/dapr.proto",
}
服务调用的客户端sdk封装
go sdk的封装
go sdk将 dapr grpc service 中定义的 InvokeService 方法封装为 InvokeService 方法和 InvokeServiceWithContent:
// InvokeService invokes service without raw data ([]byte).
func (c *GRPCClient) InvokeService(ctx context.Context, serviceID, method string) (out []byte, err error) {
if serviceID == "" {
return nil, errors.New("nil serviceID")
}
if method == "" {
return nil, errors.New("nil method")
}
req := &pb.InvokeServiceRequest{
Id: serviceID,
Message: &v1.InvokeRequest{
Method: method,
HttpExtension: &v1.HTTPExtension{
Verb: v1.HTTPExtension_POST,
},
},
}
return c.invokeServiceWithRequest(ctx, req)
}
InvokeServiceWithContent方法用来发现带数据的请求:
// InvokeServiceWithContent invokes service without content (data + content type).
func (c *GRPCClient) InvokeServiceWithContent(ctx context.Context, serviceID, method string, content *DataContent) (out []byte, err error) {
if serviceID == "" {
return nil, errors.New("serviceID is required")
}
if method == "" {
return nil, errors.New("method name is required")
}
if content == nil {
return nil, errors.New("content required")
}
req := &pb.InvokeServiceRequest{
Id: serviceID,
Message: &v1.InvokeRequest{
Method: method,
Data: &anypb.Any{Value: content.Data},
ContentType: content.ContentType,
HttpExtension: &v1.HTTPExtension{
Verb: v1.HTTPExtension_POST,
},
},
}
return c.invokeServiceWithRequest(ctx, req)
}
从实现上分析都只是实现了对 InvokeRequest 对象的组装,最后代码实现在 invokeServiceWithRequest 方法中:
func (c *GRPCClient) invokeServiceWithRequest(ctx context.Context, req *pb.InvokeServiceRequest) (out []byte, err error) {
if req == nil {
return nil, errors.New("nil request")
}
// 调用proto定义的 InvokeService 方法
resp, err := c.protoClient.InvokeService(authContext(ctx), req)
if err != nil {
return nil, errors.Wrap(err, "error invoking service")
}
// allow for service to not return any value
if resp != nil && resp.GetData() != nil {
out = resp.GetData().Value
return
}
out = nil
return
}
分析
实现了从客户端SDK API到发出 grpc service 远程调用请求给 dapr runtime的功能。
代码实现很简单,非常薄的一点点封装逻辑。
服务调用的runtime转发请求
pkc/grpc/api.go
中的 InvokeService 方法:
func (a *api) InvokeService(ctx context.Context, in *runtimev1pb.InvokeServiceRequest) (*commonv1pb.InvokeResponse, error) {
req := invokev1.FromInvokeRequestMessage(in.GetMessage())
if incomingMD, ok := metadata.FromIncomingContext(ctx); ok {
req.WithMetadata(incomingMD)
}
resp, err := a.directMessaging.Invoke(ctx, in.Id, req)
if err != nil {
return nil, err
}
var headerMD = invokev1.InternalMetadataToGrpcMetadata(ctx, resp.Headers(), true)
var respError error
if resp.IsHTTPResponse() {
var errorMessage = []byte("")
if resp != nil {
_, errorMessage = resp.RawData()
}
respError = invokev1.ErrorFromHTTPResponseCode(int(resp.Status().Code), string(errorMessage))
// Populate http status code to header
headerMD.Set(daprHTTPStatusHeader, strconv.Itoa(int(resp.Status().Code)))
} else {
respError = invokev1.ErrorFromInternalStatus(resp.Status())
// ignore trailer if appchannel uses HTTP
grpc.SetTrailer(ctx, invokev1.InternalMetadataToGrpcMetadata(ctx, resp.Trailers(), false))
}
grpc.SetHeader(ctx, headerMD)
return resp.Message(), respError
}
pkg/messaging/direct_messaging.go
的 Invoke()方法:
func (d *directMessaging) Invoke(ctx context.Context, targetAppID string, req *invokev1.InvokeMethodRequest) (*invokev1.InvokeMethodResponse, error) {
if targetAppID == d.appID {
return d.invokeLocal(ctx, req)
}
return d.invokeWithRetry(ctx, retry.DefaultLinearRetryCount, retry.DefaultLinearBackoffInterval, targetAppID, d.invokeRemote, req)
}
func (d *directMessaging) invokeRemote(ctx context.Context, targetID string, req *invokev1.InvokeMethodRequest) (*invokev1.InvokeMethodResponse, error) {
address, err := d.getAddressFromMessageRequest(targetID)
if err != nil {
return nil, err
}
conn, err := d.connectionCreatorFn(address, targetID, false, false)
if err != nil {
return nil, err
}
span := diag_utils.SpanFromContext(ctx)
// TODO: Use built-in grpc client timeout instead of using context timeout
ctx, cancel := context.WithTimeout(ctx, channel.DefaultChannelRequestTimeout)
defer cancel()
// no ops if span context is empty
ctx = diag.SpanContextToGRPCMetadata(ctx, span.SpanContext())
d.addForwardedHeadersToMetadata(req)
d.addDestinationAppIDHeaderToMetadata(targetID, req)
clientV1 := internalv1pb.NewServiceInvocationClient(conn)
resp, err := clientV1.CallLocal(ctx, req.Proto())
if err != nil {
return nil, err
}
return invokev1.InternalInvokeResponse(resp)
}
服务调用的runtime转发请求给服务器端
pkc/grpc/api.go
中的 CallLocal 方法:
func (a *api) CallLocal(ctx context.Context, in *internalv1pb.InternalInvokeRequest) (*internalv1pb.InternalInvokeResponse, error) {
if a.appChannel == nil {
return nil, status.Error(codes.Internal, "app channel is not initialized")
}
req, err := invokev1.InternalInvokeRequest(in)
if err != nil {
return nil, status.Errorf(codes.InvalidArgument, "parsing InternalInvokeRequest error: %s", err.Error())
}
resp, err := a.appChannel.InvokeMethod(ctx, req)
if err != nil {
return nil, err
}
return resp.Proto(), err
}
App Channel的定义
app channel 定义在 pkg/channel/channel.go
中,AppChannel 是和用户代码通讯的抽象层:
type AppChannel interface {
GetBaseAddress() string
InvokeMethod(ctx context.Context, req *invokev1.InvokeMethodRequest) (*invokev1.InvokeMethodResponse, error)
}
Channel实现
channel有两个实现: grpc channel 和 http channel。
grpc channel
pkg/channel/grpc/grpc_channel.go
的 InvokeMethod()方法:
func (g *Channel) InvokeMethod(ctx context.Context, req *invokev1.InvokeMethodRequest) (*invokev1.InvokeMethodResponse, error) {
var rsp *invokev1.InvokeMethodResponse
var err error
switch req.APIVersion() {
case internalv1pb.APIVersion_V1:
rsp, err = g.invokeMethodV1(ctx, req)
default:
// Reject unsupported version
rsp = nil
err = status.Error(codes.Unimplemented, fmt.Sprintf("Unsupported spec version: %d", req.APIVersion()))
}
return rsp, err
}
暂时只有 invokeMethodV1 版本:
func (g *Channel) invokeMethodV1(ctx context.Context, req *invokev1.InvokeMethodRequest) (*invokev1.InvokeMethodResponse, error) {
if g.ch != nil {
g.ch <- 1
}
clientV1 := runtimev1pb.NewAppCallbackClient(g.client)
grpcMetadata := invokev1.InternalMetadataToGrpcMetadata(ctx, req.Metadata(), true)
// Prepare gRPC Metadata
ctx = metadata.NewOutgoingContext(context.Background(), grpcMetadata)
ctx, cancel := context.WithTimeout(ctx, channel.DefaultChannelRequestTimeout)
defer cancel()
var header, trailer metadata.MD
resp, err := clientV1.OnInvoke(ctx, req.Message(), grpc.Header(&header), grpc.Trailer(&trailer))
if g.ch != nil {
<-g.ch
}
var rsp *invokev1.InvokeMethodResponse
if err != nil {
// Convert status code
respStatus := status.Convert(err)
// Prepare response
rsp = invokev1.NewInvokeMethodResponse(int32(respStatus.Code()), respStatus.Message(), respStatus.Proto().Details)
} else {
rsp = invokev1.NewInvokeMethodResponse(int32(codes.OK), "", nil)
}
rsp.WithHeaders(header).WithTrailers(trailer)
return rsp.WithMessage(resp), nil
}
HTTP Channel
HTTP Channel 的实现在文件 pkg/channel/http/http_channel.go
中,其 InvokeMethod()方法:
func (h *Channel) InvokeMethod(ctx context.Context, req *invokev1.InvokeMethodRequest) (*invokev1.InvokeMethodResponse, error) {
// Check if HTTP Extension is given. Otherwise, it will return error.
httpExt := req.Message().GetHttpExtension()
if httpExt == nil {
return nil, status.Error(codes.InvalidArgument, "missing HTTP extension field")
}
if httpExt.GetVerb() == commonv1pb.HTTPExtension_NONE {
return nil, status.Error(codes.InvalidArgument, "invalid HTTP verb")
}
var rsp *invokev1.InvokeMethodResponse
var err error
switch req.APIVersion() {
case internalv1pb.APIVersion_V1:
rsp, err = h.invokeMethodV1(ctx, req)
default:
// Reject unsupported version
err = status.Error(codes.Unimplemented, fmt.Sprintf("Unsupported spec version: %d", req.APIVersion()))
}
return rsp, err
}
暂时只有 invokeMethodV1 版本:
func (h *Channel) invokeMethodV1(ctx context.Context, req *invokev1.InvokeMethodRequest) (*invokev1.InvokeMethodResponse, error) {
channelReq := h.constructRequest(ctx, req)
if h.ch != nil {
h.ch <- 1
}
// Emit metric when request is sent
verb := string(channelReq.Header.Method())
diag.DefaultHTTPMonitoring.ClientRequestStarted(ctx, verb, req.Message().Method, int64(len(req.Message().Data.GetValue())))
startRequest := time.Now()
// Send request to user application
var resp = fasthttp.AcquireResponse()
err := h.client.DoTimeout(channelReq, resp, channel.DefaultChannelRequestTimeout)
defer func() {
fasthttp.ReleaseRequest(channelReq)
fasthttp.ReleaseResponse(resp)
}()
elapsedMs := float64(time.Since(startRequest) / time.Millisecond)
if h.ch != nil {
<-h.ch
}
rsp := h.parseChannelResponse(req, resp, err)
diag.DefaultHTTPMonitoring.ClientRequestCompleted(ctx, verb, req.Message().GetMethod(), strconv.Itoa(int(rsp.Status().Code)), int64(resp.Header.ContentLength()), elapsedMs)
return rsp, nil
}
这是将收到的请求内容,转成HTTP协议的标准格式,然后通过 fasthttp 发给用户代码。其中转为标准http请求的代码在方法 constructRequest() 中:
func (h *Channel) constructRequest(ctx context.Context, req *invokev1.InvokeMethodRequest) *fasthttp.Request {
var channelReq = fasthttp.AcquireRequest()
// Construct app channel URI: VERB http://localhost:3000/method?query1=value1
uri := fmt.Sprintf("%s/%s", h.baseAddress, req.Message().GetMethod())
channelReq.SetRequestURI(uri)
channelReq.URI().SetQueryString(req.EncodeHTTPQueryString())
channelReq.Header.SetMethod(req.Message().HttpExtension.Verb.String())
// Recover headers
invokev1.InternalMetadataToHTTPHeader(ctx, req.Metadata(), channelReq.Header.Set)
// Set Content body and types
contentType, body := req.RawData()
channelReq.Header.SetContentType(contentType)
channelReq.SetBody(body)
return channelReq
}
这样就直接绕开用户代码中的 dapr sdk,直接调用到用户代码中的标准 http endpoint。
服务调用AppCallbackServer的实现
pkg/proto/runtime/v1/appcallback.pb.go
中的 OnInvoke 方法:
// AppCallbackServer is the server API for AppCallback service.
type AppCallbackServer interface {
// Invokes service method with InvokeRequest.
OnInvoke(context.Context, *v1.InvokeRequest) (*v1.InvokeResponse, error)
}
实现
AppCallbackServer 的实现在各个不同语言的 sdk 里面。
go-sdk实现
实现在 go-sdk 的 service/grpc/invoke.go
文件的 OnInvoke方法:
func (s *Server) OnInvoke(ctx context.Context, in *cpb.InvokeRequest) (*cpb.InvokeResponse, error) {
if in == nil {
return nil, errors.New("nil invoke request")
}
if fn, ok := s.invokeHandlers[in.Method]; ok {
e := &cc.InvocationEvent{}
if in != nil {
e.ContentType = in.ContentType
if in.Data != nil {
e.Data = in.Data.Value
e.DataTypeURL = in.Data.TypeUrl
}
if in.HttpExtension != nil {
e.Verb = in.HttpExtension.Verb.String()
e.QueryString = in.HttpExtension.Querystring
}
}
ct, er := fn(ctx, e)
if er != nil {
return nil, errors.Wrap(er, "error executing handler")
}
if ct == nil {
return &cpb.InvokeResponse{}, nil
}
return &cpb.InvokeResponse{
ContentType: ct.ContentType,
Data: &any.Any{
Value: ct.Data,
TypeUrl: ct.DataTypeURL,
},
}, nil
}
return nil, fmt.Errorf("method not implemented: %s", in.Method)
}
其中 s.invokeHandlers[in.Method]
是要求找到处理请求的对应方法(由参数method制定)。这些方法通过方法名进行映射,增加方法名到hanlder的方法是:
func (s *Server) AddServiceInvocationHandler(method string, fn func(ctx context.Context, in *cc.InvocationEvent) (our *cc.Content, err error)) error {
if method == "" {
return fmt.Errorf("servie name required")
}
s.invokeHandlers[method] = fn
return nil
}
用户实现
用户在开发支持 dapr 的服务器端应用时,需要在应用中启动 dapr service server,然后添加各种 handler,包括 ServiceInvocationHandler,如下面这个例子(go-sdk下的 example/serving/grpc/main.go
):
func main() {
// create a Dapr service server
s, err := daprd.NewService(":50001")
if err != nil {
log.Fatalf("failed to start the server: %v", err)
}
// add a service to service invocation handler
if err := s.AddServiceInvocationHandler("echo", echoHandler); err != nil {
log.Fatalf("error adding invocation handler: %v", err)
}
// start the server
if err := s.Start(); err != nil {
log.Fatalf("server error: %v", err)
}
}
服务调用的HTTP API
HTTP API Server 初始化
Dapr在 runtime 启动时会启动 Dapr 的 HTTP API server (以及 grpc API server 和用于 dapr runtime相互通讯的 grpc internal server):
func (a *DaprRuntime) initRuntime(opts *runtimeOpts) error {
......
err = a.startGRPCAPIServer(grpcAPI, a.runtimeConfig.APIGRPCPort)
err = a.startGRPCInternalServer(grpcAPI, a.runtimeConfig.InternalGRPCPort)
a.startHTTPServer(a.runtimeConfig.HTTPPort, a.runtimeConfig.ProfilePort, a.runtimeConfig.AllowedOrigins, pipeline)
......
}
startHTTPServer的代码:
func (a *DaprRuntime) startHTTPServer(port, profilePort int, allowedOrigins string, pipeline http_middleware.Pipeline) {
a.daprHTTPAPI = http.NewAPI(a.runtimeConfig.ID, a.appChannel, a.directMessaging, a.stateStores, a.secretStores, a.getPublishAdapter(), a.actor, a.sendToOutputBinding, a.globalConfig.Spec.TracingSpec)
serverConf := http.NewServerConfig(a.runtimeConfig.ID, a.hostAddress, port, profilePort, allowedOrigins, a.runtimeConfig.EnableProfiling)
server := http.NewServer(a.daprHTTPAPI, serverConf, a.globalConfig.Spec.TracingSpec, pipeline)
server.StartNonBlocking()
}
NewAPI方法:
func NewAPI(appID string, appChannel channel.AppChannel, directMessaging messaging.DirectMessaging, ......) (*bindings.InvokeResponse, error), tracingSpec config.TracingSpec) API {
api := &api{
appChannel: appChannel,
directMessaging: directMessaging,
......
}
api.endpoints = append(api.endpoints, api.constructDirectMessagingEndpoints()...)
return api
}
constructDirectMessagingEndpoints方法构建了接收 service invoke 请求的 HTTP Endpoint:
func (a *api) constructDirectMessagingEndpoints() []Endpoint {
return []Endpoint{
{
Methods: []string{fasthttp.MethodGet, fasthttp.MethodPost, fasthttp.MethodDelete, fasthttp.MethodPut},
Route: "invoke/{id}/method/{method:*}",
Version: apiVersionV1,
Handler: a.onDirectMessage, // 注意这里注册了 handler
},
}
}
这就和直接用 curl 等工具给 dapr 发请求的方式对应上了:
POST/GET/PUT/DELETE http://localhost:<daprPort>/v1.0/invoke/<appId>/method/<method-name>
这样在 dapr runtime 启动之后,dapr 的 http server 也就启动起来了,然后就有 "invoke/{id}/method/{method:*}"
这样的 HTTP Endpoint 可以用来处理请求。
处理dapr 请求
pkg/http/api.go
的 onDirectMessage 方法:
func (a *api) onDirectMessage(reqCtx *fasthttp.RequestCtx) {
targetID := reqCtx.UserValue(idParam).(string)
verb := strings.ToUpper(string(reqCtx.Method()))
invokeMethodName := reqCtx.UserValue(methodParam).(string)
if invokeMethodName == "" {
msg := NewErrorResponse("ERR_DIRECT_INVOKE", "invalid method name")
respondWithError(reqCtx, fasthttp.StatusBadRequest, msg)
log.Debug(msg)
return
}
// 构建 internal invoke request
req := invokev1.NewInvokeMethodRequest(invokeMethodName).WithHTTPExtension(verb, reqCtx.QueryArgs().String())
req.WithRawData(reqCtx.Request.Body(), string(reqCtx.Request.Header.ContentType()))
// Save headers to internal metadata
req.WithFastHTTPHeaders(&reqCtx.Request.Header)
// 发出请求
resp, err := a.directMessaging.Invoke(reqCtx, targetID, req)
// err does not represent user application response
if err != nil {
msg := NewErrorResponse("ERR_DIRECT_INVOKE", err.Error())
respondWithError(reqCtx, fasthttp.StatusInternalServerError, msg)
return
}
invokev1.InternalMetadataToHTTPHeader(reqCtx, resp.Headers(), reqCtx.Response.Header.Set)
contentType, body := resp.RawData()
reqCtx.Response.Header.SetContentType(contentType)
// 构建 response
statusCode := int(resp.Status().Code)
if !resp.IsHTTPResponse() {
statusCode = invokev1.HTTPStatusFromCode(codes.Code(statusCode))
}
respond(reqCtx, statusCode, body)
}
发送请求的具体实现代码在 pkg/messaging/direct_messaging.go
中:
func (d *directMessaging) Invoke(ctx context.Context, targetAppID string, req *invokev1.InvokeMethodRequest) (*invokev1.InvokeMethodResponse, error) {
if targetAppID == d.appID {
return d.invokeLocal(ctx, req)
}
return d.invokeWithRetry(ctx, retry.DefaultLinearRetryCount, retry.DefaultLinearBackoffInterval, targetAppID, d.invokeRemote, req)
}
后面的具体实现就和 gRPC API server / grpc dapr service 的实现一致了。