Dapr源码学习之dapr仓库-服务调用的源码分析

服务调用的初始化

Dapr服务调用的初始化流程和源码分析
// RegisterDaprServer registers srv on the gRPC server s using the Dapr
// service descriptor _Dapr_serviceDesc.
func RegisterDaprServer(s *grpc.Server, srv DaprServer) {
	s.RegisterService(&_Dapr_serviceDesc, srv)
}

_Dapr_serviceDesc 中有 dapr 各个方法的定义,包括 InvokeService

// _Dapr_serviceDesc is the gRPC service descriptor for the
// "dapr.proto.runtime.v1.Dapr" service. Each entry in Methods maps an RPC
// method name to its handler; only InvokeService is shown in this excerpt and
// the remaining entries are elided ("......").
var _Dapr_serviceDesc = grpc.ServiceDesc{
	ServiceName: "dapr.proto.runtime.v1.Dapr",
	HandlerType: (*DaprServer)(nil),
	Methods: []grpc.MethodDesc{
		{
			MethodName: "InvokeService",
			Handler:    _Dapr_InvokeService_Handler,
		},
		......
	},
	Streams:  []grpc.StreamDesc{},
	Metadata: "dapr/proto/runtime/v1/dapr.proto",
}

服务调用的客户端sdk封装

Dapr服务调用的客户端sdk封装

go sdk的封装

go sdk将 dapr grpc service 中定义的 InvokeService 方法封装为 InvokeService 方法和 InvokeServiceWithContent:

// InvokeService calls the given method on the target service without sending
// a payload. Both serviceID and method must be non-empty; the request is
// issued with the POST HTTP verb and the raw response bytes are returned.
func (c *GRPCClient) InvokeService(ctx context.Context, serviceID, method string) (out []byte, err error) {
	switch {
	case serviceID == "":
		return nil, errors.New("nil serviceID")
	case method == "":
		return nil, errors.New("nil method")
	}

	// Payload-free invocation: only the method name and the HTTP verb are set.
	message := &v1.InvokeRequest{
		Method:        method,
		HttpExtension: &v1.HTTPExtension{Verb: v1.HTTPExtension_POST},
	}
	return c.invokeServiceWithRequest(ctx, &pb.InvokeServiceRequest{
		Id:      serviceID,
		Message: message,
	})
}

InvokeServiceWithContent方法用来发送带数据的请求:

// InvokeServiceWithContent calls the given method on the target service and
// sends content (data plus content type) as the request payload. serviceID,
// method and content are all required; the request is issued with the POST
// HTTP verb and the raw response bytes are returned.
func (c *GRPCClient) InvokeServiceWithContent(ctx context.Context, serviceID, method string, content *DataContent) (out []byte, err error) {
	switch {
	case serviceID == "":
		return nil, errors.New("serviceID is required")
	case method == "":
		return nil, errors.New("method name is required")
	case content == nil:
		return nil, errors.New("content required")
	}

	// Wrap the caller's bytes in an Any and attach the declared content type.
	message := &v1.InvokeRequest{
		Method:        method,
		Data:          &anypb.Any{Value: content.Data},
		ContentType:   content.ContentType,
		HttpExtension: &v1.HTTPExtension{Verb: v1.HTTPExtension_POST},
	}
	return c.invokeServiceWithRequest(ctx, &pb.InvokeServiceRequest{
		Id:      serviceID,
		Message: message,
	})
}

从实现上分析都只是实现了对 InvokeRequest 对象的组装,最后代码实现在 invokeServiceWithRequest 方法中:

// invokeServiceWithRequest sends req through the proto-defined InvokeService
// client method and returns the bytes carried in the response data. It
// returns an error for a nil request or a failed call, and (nil, nil) when the
// service returns no data.
func (c *GRPCClient) invokeServiceWithRequest(ctx context.Context, req *pb.InvokeServiceRequest) (out []byte, err error) {
	if req == nil {
		return nil, errors.New("nil request")
	}

	// Call the InvokeService method defined in the proto.
	resp, err := c.protoClient.InvokeService(authContext(ctx), req)
	if err != nil {
		return nil, errors.Wrap(err, "error invoking service")
	}

	// A service is allowed to return no value at all.
	if resp == nil || resp.GetData() == nil {
		return nil, nil
	}
	return resp.GetData().Value, nil
}

分析

实现了从客户端SDK API到发出 grpc service 远程调用请求给 dapr runtime的功能。

代码实现很简单,非常薄的一点点封装逻辑。

服务调用的runtime转发请求

Dapr服务调用的runtime转发请求

pkg/grpc/api.go 中的 InvokeService 方法:

// InvokeService handles the gRPC InvokeService call. It converts the incoming
// message into an internal invoke request, attaches any incoming gRPC
// metadata, forwards the request via directMessaging.Invoke, and then maps the
// response's headers, trailers and status back onto the gRPC call.
//
// It returns the response message together with an error derived from the
// response status, or (nil, err) when the invocation itself fails.
func (a *api) InvokeService(ctx context.Context, in *runtimev1pb.InvokeServiceRequest) (*commonv1pb.InvokeResponse, error) {
	req := invokev1.FromInvokeRequestMessage(in.GetMessage())

	// Carry the caller's incoming gRPC metadata along with the request.
	if incomingMD, ok := metadata.FromIncomingContext(ctx); ok {
		req.WithMetadata(incomingMD)
	}

	resp, err := a.directMessaging.Invoke(ctx, in.Id, req)
	if err != nil {
		return nil, err
	}

	var headerMD = invokev1.InternalMetadataToGrpcMetadata(ctx, resp.Headers(), true)

	var respError error
	if resp.IsHTTPResponse() {
		var errorMessage = []byte("")
		// NOTE(review): resp is already used above (resp.Headers, resp.IsHTTPResponse),
		// so this nil check looks redundant — confirm before removing.
		if resp != nil {
			_, errorMessage = resp.RawData()
		}
		respError = invokev1.ErrorFromHTTPResponseCode(int(resp.Status().Code), string(errorMessage))
		// Populate http status code to header
		headerMD.Set(daprHTTPStatusHeader, strconv.Itoa(int(resp.Status().Code)))
	} else {
		respError = invokev1.ErrorFromInternalStatus(resp.Status())
		// ignore trailer if appchannel uses HTTP
		grpc.SetTrailer(ctx, invokev1.InternalMetadataToGrpcMetadata(ctx, resp.Trailers(), false))
	}

	// Headers are set after the branch so the HTTP status header (if any) is included.
	grpc.SetHeader(ctx, headerMD)

	return resp.Message(), respError
}

pkg/messaging/direct_messaging.go 的 Invoke()方法:

// Invoke routes req to the app identified by targetAppID. A request for any
// other app ID goes through invokeRemote wrapped in invokeWithRetry (default
// linear retry count and backoff interval); a request addressed to this
// instance's own app ID is handled by invokeLocal.
func (d *directMessaging) Invoke(ctx context.Context, targetAppID string, req *invokev1.InvokeMethodRequest) (*invokev1.InvokeMethodResponse, error) {
	if targetAppID != d.appID {
		return d.invokeWithRetry(ctx, retry.DefaultLinearRetryCount, retry.DefaultLinearBackoffInterval, targetAppID, d.invokeRemote, req)
	}
	return d.invokeLocal(ctx, req)
}

// invokeRemote forwards req to the app identified by targetID. It resolves the
// target address, obtains a connection through connectionCreatorFn, and issues
// a CallLocal RPC on the ServiceInvocation client with a timeout of
// channel.DefaultChannelRequestTimeout.
//
// It returns the converted internal response, or the first error encountered
// while resolving the address, creating the connection, or making the call.
func (d *directMessaging) invokeRemote(ctx context.Context, targetID string, req *invokev1.InvokeMethodRequest) (*invokev1.InvokeMethodResponse, error) {
	address, err := d.getAddressFromMessageRequest(targetID)
	if err != nil {
		return nil, err
	}

	conn, err := d.connectionCreatorFn(address, targetID, false, false)
	if err != nil {
		return nil, err
	}

	// Read the span from the original context before ctx is replaced below.
	span := diag_utils.SpanFromContext(ctx)

	// TODO: Use built-in grpc client timeout instead of using context timeout
	ctx, cancel := context.WithTimeout(ctx, channel.DefaultChannelRequestTimeout)
	defer cancel()

	// no ops if span context is empty
	ctx = diag.SpanContextToGRPCMetadata(ctx, span.SpanContext())

	// Add the forwarded and destination-app-ID headers to the request metadata.
	d.addForwardedHeadersToMetadata(req)
	d.addDestinationAppIDHeaderToMetadata(targetID, req)

	clientV1 := internalv1pb.NewServiceInvocationClient(conn)
	resp, err := clientV1.CallLocal(ctx, req.Proto())
	if err != nil {
		return nil, err
	}

	return invokev1.InternalInvokeResponse(resp)
}

服务调用的runtime转发请求给服务器端

Dapr服务调用的runtime转发请求给服务器端

pkg/grpc/api.go 中的 CallLocal 方法:

// CallLocal handles an internal invoke request by passing it to the local app
// channel. It fails with codes.Internal when no app channel is configured and
// with codes.InvalidArgument when the incoming request cannot be parsed; an
// error from the app channel is returned unchanged.
func (a *api) CallLocal(ctx context.Context, in *internalv1pb.InternalInvokeRequest) (*internalv1pb.InternalInvokeResponse, error) {
	if a.appChannel == nil {
		return nil, status.Error(codes.Internal, "app channel is not initialized")
	}

	req, parseErr := invokev1.InternalInvokeRequest(in)
	if parseErr != nil {
		return nil, status.Errorf(codes.InvalidArgument, "parsing InternalInvokeRequest error: %s", parseErr.Error())
	}

	resp, invokeErr := a.appChannel.InvokeMethod(ctx, req)
	if invokeErr != nil {
		return nil, invokeErr
	}
	return resp.Proto(), nil
}

App Channel的定义

app channel 定义在 pkg/channel/channel.go 中,AppChannel 是和用户代码通讯的抽象层:

// AppChannel is the abstraction used to communicate with user application code.
type AppChannel interface {
	// GetBaseAddress returns the channel's base address as a string.
	GetBaseAddress() string
	// InvokeMethod sends req to the application and returns its response.
	InvokeMethod(ctx context.Context, req *invokev1.InvokeMethodRequest) (*invokev1.InvokeMethodResponse, error)
}

Channel实现

channel有两个实现: grpc channel 和 http channel。

grpc channel

pkg/channel/grpc/grpc_channel.go 的 InvokeMethod()方法:

// InvokeMethod dispatches req according to its API version. Version V1 is
// handled by invokeMethodV1; any other version yields a nil response and a
// codes.Unimplemented error.
func (g *Channel) InvokeMethod(ctx context.Context, req *invokev1.InvokeMethodRequest) (*invokev1.InvokeMethodResponse, error) {
	switch req.APIVersion() {
	case internalv1pb.APIVersion_V1:
		return g.invokeMethodV1(ctx, req)
	default:
		// Unsupported versions are rejected outright.
		return nil, status.Error(codes.Unimplemented, fmt.Sprintf("Unsupported spec version: %d", req.APIVersion()))
	}
}

暂时只有 invokeMethodV1 版本:

// invokeMethodV1 calls the application's OnInvoke callback over gRPC with the
// request's metadata attached as outgoing gRPC metadata, and wraps the outcome
// in an InvokeMethodResponse.
//
// A failed OnInvoke call is not returned as a Go error: its gRPC status code,
// message and details are recorded in the response, and the returned error is
// always nil.
func (g *Channel) invokeMethodV1(ctx context.Context, req *invokev1.InvokeMethodRequest) (*invokev1.InvokeMethodResponse, error) {
	// presumably g.ch is a buffered channel used to bound in-flight calls — TODO confirm
	if g.ch != nil {
		g.ch <- 1
	}

	clientV1 := runtimev1pb.NewAppCallbackClient(g.client)
	grpcMetadata := invokev1.InternalMetadataToGrpcMetadata(ctx, req.Metadata(), true)
	// Prepare gRPC Metadata
	// NOTE(review): the outgoing context is built from context.Background(), not
	// from the incoming ctx — confirm that dropping the caller's context is intended.
	ctx = metadata.NewOutgoingContext(context.Background(), grpcMetadata)

	ctx, cancel := context.WithTimeout(ctx, channel.DefaultChannelRequestTimeout)
	defer cancel()
	// header and trailer are filled in by the call options below.
	var header, trailer metadata.MD
	resp, err := clientV1.OnInvoke(ctx, req.Message(), grpc.Header(&header), grpc.Trailer(&trailer))

	if g.ch != nil {
		<-g.ch
	}

	var rsp *invokev1.InvokeMethodResponse
	if err != nil {
		// Convert status code
		respStatus := status.Convert(err)
		// Prepare response
		rsp = invokev1.NewInvokeMethodResponse(int32(respStatus.Code()), respStatus.Message(), respStatus.Proto().Details)
	} else {
		rsp = invokev1.NewInvokeMethodResponse(int32(codes.OK), "", nil)
	}

	rsp.WithHeaders(header).WithTrailers(trailer)

	return rsp.WithMessage(resp), nil
}

HTTP Channel

HTTP Channel 的实现在文件 pkg/channel/http/http_channel.go 中,其 InvokeMethod()方法:

// InvokeMethod validates that req carries an HTTP extension with a usable
// verb and then dispatches on the request's API version. Version V1 is handled
// by invokeMethodV1; any other version yields a nil response and a
// codes.Unimplemented error.
func (h *Channel) InvokeMethod(ctx context.Context, req *invokev1.InvokeMethodRequest) (*invokev1.InvokeMethodResponse, error) {
	// The HTTP extension is mandatory for this channel; reject requests without one.
	ext := req.Message().GetHttpExtension()
	switch {
	case ext == nil:
		return nil, status.Error(codes.InvalidArgument, "missing HTTP extension field")
	case ext.GetVerb() == commonv1pb.HTTPExtension_NONE:
		return nil, status.Error(codes.InvalidArgument, "invalid HTTP verb")
	}

	switch req.APIVersion() {
	case internalv1pb.APIVersion_V1:
		return h.invokeMethodV1(ctx, req)
	default:
		// Unsupported versions are rejected outright.
		return nil, status.Error(codes.Unimplemented, fmt.Sprintf("Unsupported spec version: %d", req.APIVersion()))
	}
}

暂时只有 invokeMethodV1 版本:

// invokeMethodV1 builds a fasthttp request from req, sends it to the user
// application with a timeout of channel.DefaultChannelRequestTimeout, and
// converts the result through parseChannelResponse. Request-started and
// request-completed metrics are emitted around the call.
//
// The returned error is always nil; the send error is handed to
// parseChannelResponse instead.
func (h *Channel) invokeMethodV1(ctx context.Context, req *invokev1.InvokeMethodRequest) (*invokev1.InvokeMethodResponse, error) {
	channelReq := h.constructRequest(ctx, req)

	// presumably h.ch is a buffered channel used to bound in-flight calls — TODO confirm
	if h.ch != nil {
		h.ch <- 1
	}

	// Emit metric when request is sent
	verb := string(channelReq.Header.Method())
	diag.DefaultHTTPMonitoring.ClientRequestStarted(ctx, verb, req.Message().Method, int64(len(req.Message().Data.GetValue())))
	startRequest := time.Now()

	// Send request to user application
	var resp = fasthttp.AcquireResponse()
	err := h.client.DoTimeout(channelReq, resp, channel.DefaultChannelRequestTimeout)
	// Release the acquired request and response once this function returns.
	defer func() {
		fasthttp.ReleaseRequest(channelReq)
		fasthttp.ReleaseResponse(resp)
	}()

	elapsedMs := float64(time.Since(startRequest) / time.Millisecond)

	if h.ch != nil {
		<-h.ch
	}

	rsp := h.parseChannelResponse(req, resp, err)
	diag.DefaultHTTPMonitoring.ClientRequestCompleted(ctx, verb, req.Message().GetMethod(), strconv.Itoa(int(rsp.Status().Code)), int64(resp.Header.ContentLength()), elapsedMs)

	return rsp, nil
}

这是将收到的请求内容,转成HTTP协议的标准格式,然后通过 fasthttp 发给用户代码。其中转为标准http请求的代码在方法 constructRequest() 中:

// constructRequest converts req into a fasthttp request aimed at the user
// application: the URI is the channel's base address plus the method name, the
// query string and HTTP verb come from the request, the internal metadata is
// copied into HTTP headers, and the raw data becomes the body with its content
// type. The request is obtained from fasthttp.AcquireRequest and is not
// released here.
func (h *Channel) constructRequest(ctx context.Context, req *invokev1.InvokeMethodRequest) *fasthttp.Request {
	var channelReq = fasthttp.AcquireRequest()

	// Construct app channel URI: VERB http://localhost:3000/method?query1=value1
	uri := fmt.Sprintf("%s/%s", h.baseAddress, req.Message().GetMethod())
	channelReq.SetRequestURI(uri)
	channelReq.URI().SetQueryString(req.EncodeHTTPQueryString())
	// HttpExtension is read directly here, without a nil check.
	channelReq.Header.SetMethod(req.Message().HttpExtension.Verb.String())

	// Recover headers
	invokev1.InternalMetadataToHTTPHeader(ctx, req.Metadata(), channelReq.Header.Set)

	// Set Content body and types
	contentType, body := req.RawData()
	channelReq.Header.SetContentType(contentType)
	channelReq.SetBody(body)

	return channelReq
}

这样就直接绕开用户代码中的 dapr sdk,直接调用到用户代码中的标准 http endpoint。

服务调用AppCallbackServer的实现

Dapr服务调用AppCallbackServer的实现

pkg/proto/runtime/v1/appcallback.pb.go 中的 OnInvoke 方法:

// AppCallbackServer is the server API for AppCallback service.
// Only the OnInvoke method is shown in this excerpt.
type AppCallbackServer interface {
	// OnInvoke invokes a service method with the given InvokeRequest and
	// returns the resulting InvokeResponse or an error.
	OnInvoke(context.Context, *v1.InvokeRequest) (*v1.InvokeResponse, error)
}

实现

AppCallbackServer 的实现在各个不同语言的 sdk 里面。

go-sdk实现

实现在 go-sdk 的 service/grpc/invoke.go 文件的 OnInvoke方法:

// OnInvoke dispatches an incoming service invocation to the handler registered
// for in.Method.
//
// It returns an error when in is nil, when no handler is registered for the
// requested method, or when the handler fails (wrapped with "error executing
// handler"). A handler that returns nil content produces an empty
// InvokeResponse.
func (s *Server) OnInvoke(ctx context.Context, in *cpb.InvokeRequest) (*cpb.InvokeResponse, error) {
	if in == nil {
		return nil, errors.New("nil invoke request")
	}

	fn, ok := s.invokeHandlers[in.Method]
	if !ok {
		return nil, fmt.Errorf("method not implemented: %s", in.Method)
	}

	// in is known to be non-nil past the guard above, so the event can be
	// populated without a second nil check.
	e := &cc.InvocationEvent{ContentType: in.ContentType}
	if in.Data != nil {
		e.Data = in.Data.Value
		e.DataTypeURL = in.Data.TypeUrl
	}
	if in.HttpExtension != nil {
		e.Verb = in.HttpExtension.Verb.String()
		e.QueryString = in.HttpExtension.Querystring
	}

	ct, er := fn(ctx, e)
	if er != nil {
		return nil, errors.Wrap(er, "error executing handler")
	}
	if ct == nil {
		return &cpb.InvokeResponse{}, nil
	}

	return &cpb.InvokeResponse{
		ContentType: ct.ContentType,
		Data: &any.Any{
			Value:   ct.Data,
			TypeUrl: ct.DataTypeURL,
		},
	}, nil
}

其中 s.invokeHandlers[in.Method] 是要求找到处理请求的对应方法(由参数method指定)。这些方法通过方法名进行映射,增加方法名到handler的方法是:

// AddServiceInvocationHandler registers fn as the handler for service
// invocations of the given method name, replacing any handler previously
// stored under that name. It returns an error when method is empty.
func (s *Server) AddServiceInvocationHandler(method string, fn func(ctx context.Context, in *cc.InvocationEvent) (our *cc.Content, err error)) error {
	if method == "" {
		// Fixed typo in the message ("servie" -> "service").
		return fmt.Errorf("service name required")
	}
	s.invokeHandlers[method] = fn
	return nil
}

用户实现

用户在开发支持 dapr 的服务器端应用时,需要在应用中启动 dapr service server,然后添加各种 handler,包括 ServiceInvocationHandler,如下面这个例子(go-sdk下的 example/serving/grpc/main.go ):

// main creates a Dapr service server on ":50001", registers echoHandler for
// the "echo" method, and starts the server. Any failure is fatal.
func main() {
	// Create the Dapr service server.
	s, err := daprd.NewService(":50001")
	if err != nil {
		log.Fatalf("failed to start the server: %v", err)
	}

	// Register the service invocation handler.
	err = s.AddServiceInvocationHandler("echo", echoHandler)
	if err != nil {
		log.Fatalf("error adding invocation handler: %v", err)
	}

	// Start the server.
	err = s.Start()
	if err != nil {
		log.Fatalf("server error: %v", err)
	}
}

服务调用的HTTP API

Dapr服务调用的HTTP API

HTTP API Server 初始化

Dapr在 runtime 启动时会启动 Dapr 的 HTTP API server (以及 grpc API server 和用于 dapr runtime相互通讯的 grpc internal server):

// initRuntime is shown here as an excerpt ("......" marks elided code). The
// visible part starts the gRPC API server, the gRPC internal server, and the
// HTTP server, using ports taken from a.runtimeConfig.
func (a *DaprRuntime) initRuntime(opts *runtimeOpts) error {
	......
	// gRPC API server.
	err = a.startGRPCAPIServer(grpcAPI, a.runtimeConfig.APIGRPCPort)
	
	// gRPC internal server.
	err = a.startGRPCInternalServer(grpcAPI, a.runtimeConfig.InternalGRPCPort)
	
	// HTTP API server.
	a.startHTTPServer(a.runtimeConfig.HTTPPort, a.runtimeConfig.ProfilePort, a.runtimeConfig.AllowedOrigins, pipeline)
	......
}

startHTTPServer的代码:

// startHTTPServer builds the Dapr HTTP API and its server configuration from
// the runtime's state, stores the API on a.daprHTTPAPI, and starts the HTTP
// server without blocking.
func (a *DaprRuntime) startHTTPServer(port, profilePort int, allowedOrigins string, pipeline http_middleware.Pipeline) {
	a.daprHTTPAPI = http.NewAPI(a.runtimeConfig.ID, a.appChannel, a.directMessaging, a.stateStores, a.secretStores, a.getPublishAdapter(), a.actor, a.sendToOutputBinding, a.globalConfig.Spec.TracingSpec)

	cfg := http.NewServerConfig(a.runtimeConfig.ID, a.hostAddress, port, profilePort, allowedOrigins, a.runtimeConfig.EnableProfiling)
	http.NewServer(a.daprHTTPAPI, cfg, a.globalConfig.Spec.TracingSpec, pipeline).StartNonBlocking()
}

NewAPI方法:

// NewAPI is shown here as an excerpt ("......" marks elided parameters and
// fields, so the signature is not complete as written). The visible part
// stores appChannel and directMessaging on the api value and appends the
// direct-messaging endpoints to api.endpoints before returning it.
func NewAPI(appID string, appChannel channel.AppChannel, directMessaging messaging.DirectMessaging, ......) (*bindings.InvokeResponse, error), tracingSpec config.TracingSpec) API {
	api := &api{
		appChannel:            appChannel,
		directMessaging:       directMessaging,
		......
	}

	// Register the service-invocation (direct messaging) endpoints.
	api.endpoints = append(api.endpoints, api.constructDirectMessagingEndpoints()...)

	return api
}

constructDirectMessagingEndpoints方法构建了接收 service invoke 请求的 HTTP Endpoint:

// constructDirectMessagingEndpoints returns the single HTTP endpoint that
// receives service-invocation requests. It accepts GET, POST, DELETE and PUT
// on "invoke/{id}/method/{method:*}" and is served by onDirectMessage.
func (a *api) constructDirectMessagingEndpoints() []Endpoint {
	invoke := Endpoint{
		Methods: []string{fasthttp.MethodGet, fasthttp.MethodPost, fasthttp.MethodDelete, fasthttp.MethodPut},
		Route:   "invoke/{id}/method/{method:*}",
		Version: apiVersionV1,
		// Note: this is where the handler is registered.
		Handler: a.onDirectMessage,
	}
	return []Endpoint{invoke}
}

这就和直接用 curl 等工具给 dapr 发请求的方式对应上了:

POST/GET/PUT/DELETE http://localhost:<daprPort>/v1.0/invoke/<appId>/method/<method-name>

这样在 dapr runtime 启动之后,dapr 的 http server 也就启动起来了,然后就有 "invoke/{id}/method/{method:*}" 这样的 HTTP Endpoint 可以用来处理请求。

处理dapr 请求

pkg/http/api.go 的 onDirectMessage 方法:

// onDirectMessage handles an HTTP service-invocation request. It reads the
// target app ID and method name from the route parameters, builds an internal
// invoke request from the HTTP verb, query string, body, content type and
// headers, sends it through directMessaging.Invoke, and writes the result back
// as the HTTP response.
//
// An empty method name produces a 400 response; an error from Invoke produces
// a 500 response. Both use the "ERR_DIRECT_INVOKE" error code.
func (a *api) onDirectMessage(reqCtx *fasthttp.RequestCtx) {
	// The route parameters are asserted to be strings without an ok-check.
	targetID := reqCtx.UserValue(idParam).(string)
	verb := strings.ToUpper(string(reqCtx.Method()))
	invokeMethodName := reqCtx.UserValue(methodParam).(string)
	if invokeMethodName == "" {
		msg := NewErrorResponse("ERR_DIRECT_INVOKE", "invalid method name")
		respondWithError(reqCtx, fasthttp.StatusBadRequest, msg)
		log.Debug(msg)
		return
	}

	// Build the internal invoke request
	req := invokev1.NewInvokeMethodRequest(invokeMethodName).WithHTTPExtension(verb, reqCtx.QueryArgs().String())
	req.WithRawData(reqCtx.Request.Body(), string(reqCtx.Request.Header.ContentType()))
	// Save headers to internal metadata
	req.WithFastHTTPHeaders(&reqCtx.Request.Header)

	// Send the request
	resp, err := a.directMessaging.Invoke(reqCtx, targetID, req)
	// err does not represent user application response
	if err != nil {
		msg := NewErrorResponse("ERR_DIRECT_INVOKE", err.Error())
		respondWithError(reqCtx, fasthttp.StatusInternalServerError, msg)
		return
	}

	// Copy the response headers and content type onto the HTTP response.
	invokev1.InternalMetadataToHTTPHeader(reqCtx, resp.Headers(), reqCtx.Response.Header.Set)
	contentType, body := resp.RawData()
	reqCtx.Response.Header.SetContentType(contentType)

	// Build the response
	statusCode := int(resp.Status().Code)
	// A non-HTTP response carries a gRPC-style code, which is mapped to an HTTP status.
	if !resp.IsHTTPResponse() {
		statusCode = invokev1.HTTPStatusFromCode(codes.Code(statusCode))
	}

	respond(reqCtx, statusCode, body)
}

发送请求的具体实现代码在 pkg/messaging/direct_messaging.go 中:

// Invoke routes req to the app identified by targetAppID. A request for any
// other app ID goes through invokeRemote wrapped in invokeWithRetry (default
// linear retry count and backoff interval); a request addressed to this
// instance's own app ID is handled by invokeLocal.
func (d *directMessaging) Invoke(ctx context.Context, targetAppID string, req *invokev1.InvokeMethodRequest) (*invokev1.InvokeMethodResponse, error) {
	if targetAppID != d.appID {
		return d.invokeWithRetry(ctx, retry.DefaultLinearRetryCount, retry.DefaultLinearBackoffInterval, targetAppID, d.invokeRemote, req)
	}
	return d.invokeLocal(ctx, req)
}

后面的具体实现就和 gRPC API server / grpc dapr service 的实现一致了。

给TA打赏
共{{data.count}}人
人已打赏
Dapr

Dapr源码学习之dapr仓库-Metrics的源码学习

2022-3-15 19:02:58

Dapr

Dapr源码学习之dapr仓库-状态管理的源码

2022-3-16 18:57:01

0 条回复 A文章作者 M管理员
    暂无讨论,说说你的看法吧
个人中心
今日签到
有新私信 私信列表
搜索