Dapr Resource Binding API

Proto Definition of the Resource Binding API

Output Binding Definition

The Output Binding API is defined in the proto file dapr/proto/runtime/v1/dapr.proto:

service Dapr {
  // Invokes binding data to specific output bindings
  rpc InvokeBinding(InvokeBindingRequest) returns (InvokeBindingResponse) {}
  ...
}

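InvokeBindingRequest carries the name of the bound resource (name), the payload (data), the metadata (metadata), and the type of operation to perform on the bound resource (operation):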

// InvokeBindingRequest is the message to send data to output bindings
message InvokeBindingRequest {
  // The name of the output binding to invoke.
  string name = 1;

  // The data which will be sent to output binding.
  bytes data = 2;

  // The metadata passing to output binding components
  // 
  // Common metadata property:
  // - ttlInSeconds : the time to live in seconds for the message. 
  // If set in the binding definition will cause all messages to 
  // have a default time to live. The message ttl overrides any value
  // in the binding definition.
  map<string,string> metadata = 3;

  // The name of the operation type for the binding to invoke
  string operation = 4;
}
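
The ttlInSeconds comment above describes a precedence rule: a TTL set in the message metadata overrides the default from the binding definition. The following is a minimal sketch of that rule only. The struct `invokeBindingRequest` is a local stand-in for the proto message, and `effectiveTTL` and the operation name "create" are hypothetical; none of this is Dapr's own code, and real binding components may parse the value differently.

```go
package main

import (
	"fmt"
	"strconv"
)

// invokeBindingRequest is a local stand-in that mirrors the fields of the
// proto message; it is not the generated Dapr type.
type invokeBindingRequest struct {
	Name      string
	Data      []byte
	Metadata  map[string]string
	Operation string
}

// effectiveTTL is a hypothetical helper: it returns the per-message
// ttlInSeconds when present and valid, otherwise the binding's default.
func effectiveTTL(req invokeBindingRequest, bindingDefault int) int {
	if raw, ok := req.Metadata["ttlInSeconds"]; ok {
		if ttl, err := strconv.Atoi(raw); err == nil && ttl > 0 {
			return ttl
		}
	}
	return bindingDefault
}

func main() {
	req := invokeBindingRequest{
		Name:      "binding-name",
		Operation: "create", // assumed operation name, for illustration
		Data:      []byte("hello"),
		Metadata:  map[string]string{"ttlInSeconds": "30"},
	}
	// The message-level TTL takes precedence over the binding default.
	fmt.Println(effectiveTTL(req, 60))
}
```

Falling back to the default on a missing or malformed value is a design choice of this sketch, not documented Dapr behavior.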

Definition of InvokeBindingResponse:

// InvokeBindingResponse is the message returned from an output binding invocation
message InvokeBindingResponse {
  // The data which will be sent to output binding.
  bytes data = 1;

  // The metadata returned from an external system
  map<string,string> metadata = 2;
}

Input Binding Definition

The Input Binding API is defined in the proto file dapr/proto/runtime/v1/appcallback.proto:

service AppCallback {
  // Lists all input bindings subscribed by this app.
  rpc ListInputBindings(google.protobuf.Empty) returns (ListInputBindingsResponse) {}
  
  // Listens events from the input bindings
  //
  // User application can save the states or send the events to the output
  // bindings optionally by returning BindingEventResponse.
  rpc OnBindingEvent(BindingEventRequest) returns (BindingEventResponse) {}
  ...
}

Definition of ListInputBindingsResponse:

// ListInputBindingsResponse is the message including the list of input bindings.
message ListInputBindingsResponse {
  // The list of input bindings.
  repeated string bindings = 1;
}

Definition of BindingEventRequest:

// BindingEventRequest represents input bindings event.
message BindingEventRequest {
  // Requried. The name of the input binding component.
  string name = 1;

  // Required. The payload that the input bindings sent
  bytes data = 2;

  // The metadata set by the input binging components.
  map<string,string> metadata = 3;
}

Definition of BindingEventResponse:

// BindingEventResponse includes operations to save state or
// send data to output bindings optionally.
message BindingEventResponse {
  // The name of state store where states are saved.
  string store_name = 1;

  // The state key values which will be stored in store_name.
  repeated common.v1.StateItem states = 2;

  // BindingEventConcurrency is the kind of concurrency 
  enum BindingEventConcurrency {
    // SEQUENTIAL sends data to output bindings specified in "to" sequentially.
    SEQUENTIAL = 0;
    // PARALLEL sends data to output bindings specified in "to" in parallel.
    PARALLEL = 1;
  }

  // The list of output bindings.
  repeated string to = 3;

  // The content which will be sent to "to" output bindings.
  bytes data = 4;

  // The concurrency of output bindings to send data to
  // "to" output bindings list. The default is SEQUENTIAL.
  BindingEventConcurrency concurrency = 5;
}
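
The concurrency field chooses between sending data to the "to" bindings one after another or all at once. The sketch below illustrates the difference with plain goroutines. It is not the Dapr runtime's implementation: `dispatch`, the `send` callback, and the binding names are hypothetical.

```go
package main

import (
	"fmt"
	"sync"
)

type concurrency int

const (
	sequential concurrency = iota // mirrors SEQUENTIAL = 0
	parallel                      // mirrors PARALLEL = 1
)

// dispatch sends data to every binding named in to through the hypothetical
// send callback and returns how many sends succeeded.
func dispatch(to []string, data []byte, mode concurrency, send func(name string, data []byte) error) int {
	var (
		mu sync.Mutex
		ok int
	)
	record := func(name string) {
		if err := send(name, data); err == nil {
			mu.Lock()
			ok++
			mu.Unlock()
		}
	}
	if mode == parallel {
		// One goroutine per target; wait for all of them before returning.
		var wg sync.WaitGroup
		for _, name := range to {
			wg.Add(1)
			go func(n string) {
				defer wg.Done()
				record(n)
			}(name)
		}
		wg.Wait()
		return ok
	}
	// Sequential mode: each send finishes before the next one starts.
	for _, name := range to {
		record(name)
	}
	return ok
}

func main() {
	send := func(name string, data []byte) error {
		fmt.Printf("sent %q to %s\n", data, name)
		return nil
	}
	fmt.Println(dispatch([]string{"queue-a", "queue-b"}, []byte("hello"), sequential, send))
}
```

In sequential mode the output order follows the order of the "to" list; in parallel mode the order of sends is not guaranteed.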

Generated Golang Code for the Resource Binding API

The Golang code generated from the proto API definition files is stored in the pkg/proto/ directory of the dapr project.

gRPC Service Definitions

DaprServer is the server-side API definition of the Dapr service and includes the InvokeBinding method:

// DaprServer is the server API for Dapr service.
type DaprServer interface {
	// Invokes binding data to specific output bindings
	InvokeBinding(context.Context, *InvokeBindingRequest) (*InvokeBindingResponse, error)
	......
}

AppCallbackServer is the server-side API definition of the AppCallback service and includes the ListInputBindings and OnBindingEvent methods:

// AppCallbackServer is the server API for AppCallback service.
type AppCallbackServer interface {
	// Lists all input bindings subscribed by this app.
	ListInputBindings(context.Context, *empty.Empty) (*ListInputBindingsResponse, error)
	// Listens events from the input bindings
	//
	// User application can save the states or send the events to the output
	// bindings optionally by returning BindingEventResponse.
	OnBindingEvent(context.Context, *BindingEventRequest) (*BindingEventResponse, error)
	......
}
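
The application implements these two callbacks: one reports which input bindings it listens to, the other handles each incoming event. A minimal sketch of such a handler follows. It assumes local stand-in types instead of the generated ones, and the `app` type, the `handleOnce` helper, and the binding names "orders-queue" and "audit-log" are hypothetical.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Local stand-ins for the generated messages (not the Dapr types).
type bindingEventRequest struct {
	Name     string
	Data     []byte
	Metadata map[string]string
}

type bindingEventResponse struct {
	To   []string
	Data []byte
}

// app is a hypothetical application that subscribes to input bindings.
type app struct {
	bindings []string
}

// ListInputBindings reports which input bindings the app wants events from.
func (a *app) ListInputBindings(ctx context.Context) []string {
	return a.bindings
}

// OnBindingEvent handles one event; here it forwards the payload to a
// hypothetical output binding named "audit-log".
func (a *app) OnBindingEvent(ctx context.Context, in *bindingEventRequest) (*bindingEventResponse, error) {
	if in == nil || in.Name == "" {
		return nil, errors.New("binding event name required")
	}
	return &bindingEventResponse{To: []string{"audit-log"}, Data: in.Data}, nil
}

// handleOnce feeds one sample event through the handler.
func handleOnce(name, payload string) (*bindingEventResponse, error) {
	a := &app{bindings: []string{"orders-queue"}}
	return a.OnBindingEvent(context.Background(), &bindingEventRequest{Name: name, Data: []byte(payload)})
}

func main() {
	a := &app{bindings: []string{"orders-queue"}}
	fmt.Println(a.ListInputBindings(context.Background()))

	resp, err := handleOnce("orders-queue", "order-1")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(resp.To, string(resp.Data))
}
```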

Definition of InvokeBindingRequest

pkg/proto/runtime/v1/dapr.pb.go:

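// InvokeBindingRequest is the message to send data to output bindings
type InvokeBindingRequest struct {
	// The name of the output binding to invoke.
	Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"`
	// The data which will be sent to output binding.
	Data []byte `protobuf:"bytes,2,opt,name=data,proto3" json:"data,omitempty"`
	// The metadata passing to output binding components
	//
	// Common metadata property:
	// - ttlInSeconds : the time to live in seconds for the message.
	// If set in the binding definition will cause all messages to
	// have a default time to live. The message ttl overrides any value
	// in the binding definition.
	Metadata map[string]string `protobuf:"bytes,3,rep,name=metadata,proto3" json:"metadata,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"`
	// The name of the operation type for the binding to invoke
	Operation            string   `protobuf:"bytes,4,opt,name=operation,proto3" json:"operation,omitempty"`
	XXX_NoUnkeyedLiteral struct{} `json:"-"`
	XXX_unrecognized     []byte   `json:"-"`
	XXX_sizecache        int32    `json:"-"`
}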

Definition of InvokeBindingResponse

pkg/proto/runtime/v1/dapr.pb.go:

// InvokeBindingResponse is the message returned from an output binding invocation
type InvokeBindingResponse struct {
	// The data which will be sent to output binding.
	Data []byte `protobuf:"bytes,1,opt,name=data,proto3" json:"data,omitempty"`
	// The metadata returned from an external system
	Metadata             map[string]string `protobuf:"bytes,2,rep,name=metadata,proto3" json:"metadata,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"`
	XXX_NoUnkeyedLiteral struct{}          `json:"-"`
	XXX_unrecognized     []byte            `json:"-"`
	XXX_sizecache        int32             `json:"-"`
}

Definition of ListInputBindingsResponse

pkg/proto/runtime/v1/appcallback.pb.go:

// ListInputBindingsResponse is the message including the list of input bindings.
type ListInputBindingsResponse struct {
	// The list of input bindings.
	Bindings             []string `protobuf:"bytes,1,rep,name=bindings,proto3" json:"bindings,omitempty"`
	XXX_NoUnkeyedLiteral struct{} `json:"-"`
	XXX_unrecognized     []byte   `json:"-"`
	XXX_sizecache        int32    `json:"-"`
}

Definition of BindingEventRequest

pkg/proto/runtime/v1/appcallback.pb.go:

// BindingEventRequest represents input bindings event.
type BindingEventRequest struct {
	// Requried. The name of the input binding component.
	Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"`
	// Required. The payload that the input bindings sent
	Data []byte `protobuf:"bytes,2,opt,name=data,proto3" json:"data,omitempty"`
	// The metadata set by the input binging components.
	Metadata             map[string]string `protobuf:"bytes,3,rep,name=metadata,proto3" json:"metadata,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"`
	XXX_NoUnkeyedLiteral struct{}          `json:"-"`
	XXX_unrecognized     []byte            `json:"-"`
	XXX_sizecache        int32             `json:"-"`
}

Definition of BindingEventResponse

pkg/proto/runtime/v1/appcallback.pb.go:

// BindingEventResponse includes operations to save state or
// send data to output bindings optionally.
type BindingEventResponse struct {
	// The name of state store where states are saved.
	StoreName string `protobuf:"bytes,1,opt,name=store_name,json=storeName,proto3" json:"store_name,omitempty"`
	// The state key values which will be stored in store_name.
	States []*v1.StateItem `protobuf:"bytes,2,rep,name=states,proto3" json:"states,omitempty"`
	// The list of output bindings.
	To []string `protobuf:"bytes,3,rep,name=to,proto3" json:"to,omitempty"`
	// The content which will be sent to "to" output bindings.
	Data []byte `protobuf:"bytes,4,opt,name=data,proto3" json:"data,omitempty"`
	// The concurrency of output bindings to send data to
	// "to" output bindings list. The default is SEQUENTIAL.
	Concurrency          BindingEventResponse_BindingEventConcurrency `protobuf:"varint,5,opt,name=concurrency,proto3,enum=dapr.proto.runtime.v1.BindingEventResponse_BindingEventConcurrency" json:"concurrency,omitempty"`
	XXX_NoUnkeyedLiteral struct{}                                     `json:"-"`
	XXX_unrecognized     []byte                                       `json:"-"`
	XXX_sizecache        int32                                        `json:"-"`
}

Note: the generated structs only add a few XXX_ fields to the fields defined in the proto.

Go Client Definition of the Resource Binding API

DaprClient

/pkg/proto/runtime/v1/dapr.pb.go

// DaprClient is the client API for Dapr service.
//
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream.
type DaprClient interface {
	// Invokes binding data to specific output bindings
	InvokeBinding(ctx context.Context, in *InvokeBindingRequest, opts ...grpc.CallOption) (*InvokeBindingResponse, error)
	......
}

Implementation of DaprClient:

type daprClient struct {
	cc *grpc.ClientConn
}

func NewDaprClient(cc *grpc.ClientConn) DaprClient {
	return &daprClient{cc}
}

func (c *daprClient) InvokeBinding(ctx context.Context, in *InvokeBindingRequest, opts ...grpc.CallOption) (*InvokeBindingResponse, error) {
	out := new(InvokeBindingResponse)
	// Calls the fixed gRPC method `/dapr.proto.runtime.v1.Dapr/InvokeBinding`
	err := c.cc.Invoke(ctx, "/dapr.proto.runtime.v1.Dapr/InvokeBinding", in, out, opts...)
	if err != nil {
		return nil, err
	}
	return out, nil
}
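
The generated method above always calls the same full method name, built as /package.Service/Method. The sketch below reproduces that shape against a fake connection so the path can be observed without a running sidecar. The `invoker` interface is a simplified stand-in for the `Invoke` method of `*grpc.ClientConn` (the real method also accepts call options), and `recordingConn`, `bindingClient`, and `recordedMethods` are hypothetical names.

```go
package main

import (
	"context"
	"fmt"
)

// invoker is a local stand-in for the one *grpc.ClientConn method the
// generated client uses; the real method also accepts call options.
type invoker interface {
	Invoke(ctx context.Context, method string, in, out interface{}) error
}

// recordingConn is a hypothetical fake that records each method path.
type recordingConn struct {
	methods []string
}

func (r *recordingConn) Invoke(ctx context.Context, method string, in, out interface{}) error {
	r.methods = append(r.methods, method)
	return nil
}

// bindingClient follows the same shape as the generated daprClient.
type bindingClient struct {
	cc invoker
}

// InvokeBinding allocates the response, calls the fixed full method name,
// and returns the response only when the call succeeded.
func (c *bindingClient) InvokeBinding(ctx context.Context, in map[string]string) (map[string]string, error) {
	out := map[string]string{}
	err := c.cc.Invoke(ctx, "/dapr.proto.runtime.v1.Dapr/InvokeBinding", in, &out)
	if err != nil {
		return nil, err
	}
	return out, nil
}

// recordedMethods performs one call against the fake and returns what it saw.
func recordedMethods() []string {
	conn := &recordingConn{}
	client := &bindingClient{cc: conn}
	if _, err := client.InvokeBinding(context.Background(), map[string]string{"name": "binding-name"}); err != nil {
		return nil
	}
	return conn.methods
}

func main() {
	fmt.Println(recordedMethods())
}
```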

AppCallbackClient

/pkg/proto/runtime/v1/appcallback.pb.go

// AppCallbackClient is the client API for AppCallback service.
//
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream.
type AppCallbackClient interface {
	// Lists all input bindings subscribed by this app.
	ListInputBindings(ctx context.Context, in *empty.Empty, opts ...grpc.CallOption) (*ListInputBindingsResponse, error)
	// Listens events from the input bindings
	//
	// User application can save the states or send the events to the output
	// bindings optionally by returning BindingEventResponse.
	OnBindingEvent(ctx context.Context, in *BindingEventRequest, opts ...grpc.CallOption) (*BindingEventResponse, error)
	......
}

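Implementation of AppCallbackClient. The excerpt below shows the OnInvoke method; the generated ListInputBindings and OnBindingEvent methods follow the same pattern, each calling its own method path under /dapr.proto.runtime.v1.AppCallback/: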

type appCallbackClient struct {
	cc *grpc.ClientConn
}

func NewAppCallbackClient(cc *grpc.ClientConn) AppCallbackClient {
	return &appCallbackClient{cc}
}

func (c *appCallbackClient) OnInvoke(ctx context.Context, in *v1.InvokeRequest, opts ...grpc.CallOption) (*v1.InvokeResponse, error) {
	out := new(v1.InvokeResponse)
	err := c.cc.Invoke(ctx, "/dapr.proto.runtime.v1.AppCallback/OnInvoke", in, out, opts...)
	if err != nil {
		return nil, err
	}
	return out, nil
}

Go SDK for the Resource Binding API

Output Binding

Go SDK Usage Example

https://github.com/dapr/go-sdk

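As with service invocation, the Dapr client provides two methods for invoking an operation on a Dapr-defined binding. Dapr supports input, output, and bidirectional bindings.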

For a simple, output-only binding:

in := &dapr.BindingInvocation{ Name: "binding-name", Operation: "operation-name" }
err = client.InvokeOutputBinding(ctx, in)

To invoke a binding with content and metadata:

in := &dapr.BindingInvocation{
    Name:      "binding-name",
    Operation: "operation-name",
    Data:      []byte("hello"),
    Metadata:  map[string]string{"k1": "v1", "k2": "v2"},
}

out, err := client.InvokeBinding(ctx, in)

API Provided by the Go SDK

/client/binding.go

On the client, the Go SDK wraps an InvokeOutputBinding method for making output binding calls:

// InvokeOutputBinding invokes configured Dapr binding with data (allows nil).InvokeOutputBinding
// This method differs from InvokeBinding in that it doesn't expect any content being returned from the invoked method.
func (c *GRPCClient) InvokeOutputBinding(ctx context.Context, in *BindingInvocation) error {
	if _, err := c.InvokeBinding(ctx, in); err != nil {
		return errors.Wrap(err, "error invoking output binding")
	}
	return nil
}

The InvokeBinding method sends the request, including its data and metadata, and returns the content that the binding sends back:

// InvokeBinding invokes specific operation on the configured Dapr binding.
// This method covers input, output, and bi-directional bindings.
func (c *GRPCClient) InvokeBinding(ctx context.Context, in *BindingInvocation) (out *BindingEvent, err error) {
	if in == nil {
		return nil, errors.New("binding invocation required")
	}
	if in.Name == "" {
		return nil, errors.New("binding invocation name required")
	}
	if in.Operation == "" {
		return nil, errors.New("binding invocation operation required")
	}

	req := &pb.InvokeBindingRequest{
		Name:      in.Name,
		Operation: in.Operation,
		Data:      in.Data,
		Metadata:  in.Metadata,
	}

	resp, err := c.protoClient.InvokeBinding(authContext(ctx), req)
	if err != nil {
		return nil, errors.Wrapf(err, "error invoking binding %s/%s", in.Name, in.Operation)
	}

	out = &BindingEvent{}

	if resp != nil {
		out.Data = resp.Data
		out.Metadata = resp.Metadata
	}

	return
}
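
InvokeBinding rejects an invocation before any gRPC call is made when the invocation is nil or when its name or operation is empty. The sketch below isolates those three checks so they can be exercised without a sidecar. The `bindingInvocation` type is a local copy of the SDK struct's fields, and `validate` is a hypothetical helper, not an SDK function; the error messages are the ones shown in the listing above.

```go
package main

import (
	"errors"
	"fmt"
)

// bindingInvocation is a local copy of the SDK struct's fields.
type bindingInvocation struct {
	Name      string
	Operation string
	Data      []byte
	Metadata  map[string]string
}

// validate reproduces the three precondition checks from InvokeBinding.
func validate(in *bindingInvocation) error {
	if in == nil {
		return errors.New("binding invocation required")
	}
	if in.Name == "" {
		return errors.New("binding invocation name required")
	}
	if in.Operation == "" {
		return errors.New("binding invocation operation required")
	}
	return nil
}

func main() {
	cases := []*bindingInvocation{
		nil,
		{Operation: "operation-name"},
		{Name: "binding-name"},
		{Name: "binding-name", Operation: "operation-name", Data: []byte("hello")},
	}
	for _, c := range cases {
		fmt.Println(validate(c))
	}
}
```

Because the checks run in a fixed order, an invocation that is missing both name and operation reports only the missing name.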

Definition of BindingInvocation:

// BindingInvocation represents binding invocation request
type BindingInvocation struct {
	// Name is name of binding to invoke.
	Name string
	// Operation is the name of the operation type for the binding to invoke
	Operation string
	// Data is the input bindings sent
	Data []byte
	// Metadata is the input binding metadata
	Metadata map[string]string
}

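It has exactly the same fields as the InvokeBindingRequest generated from the proto, minus the generated bookkeeping fields (the XXX_ fields shown earlier, or state/sizeCache/unknownFields in output from newer versions of protoc-gen-go).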
