首页>教程>Dapr教程>Dapr状态管理API

Dapr状态管理API

状态管理API的Proto定义

State API的定义

State Management API 定义在 proto文件 dapr/proto/runtime/v1/dapr.proto 中:

// Dapr is the gRPC service of the Dapr runtime API.
// Only the state-management RPCs are listed here; the rest is elided ("...").
service Dapr {
  // Gets the state for a specific key.
  rpc GetState(GetStateRequest) returns (GetStateResponse) {}

  // Gets a bulk of state items for a list of keys.
  rpc GetBulkState(GetBulkStateRequest) returns (GetBulkStateResponse) {}

  // Saves state. SaveStateRequest carries a repeated list of state items,
  // so a single call can save more than one key.
  rpc SaveState(SaveStateRequest) returns (google.protobuf.Empty) {}

  // Deletes the state for a specific key.
  rpc DeleteState(DeleteStateRequest) returns (google.protobuf.Empty) {}
  
  // Executes transactions for a specified store.
  rpc ExecuteStateTransaction(ExecuteStateTransactionRequest) returns (google.protobuf.Empty) {}
  ...
}

另外,common.proto 中定义了和 state 相关的消息和枚举:

// StateOptions configures concurrency and consistency for state operations.
message StateOptions {
  // Enum describing the supported concurrency for state.
  enum StateConcurrency {
    CONCURRENCY_UNSPECIFIED = 0;
    CONCURRENCY_FIRST_WRITE = 1;
    CONCURRENCY_LAST_WRITE = 2;
  }

  // Enum describing the supported consistency for state.
  enum StateConsistency {
    CONSISTENCY_UNSPECIFIED = 0;
    CONSISTENCY_EVENTUAL = 1;
    CONSISTENCY_STRONG = 2;
  }

  // Concurrency mode requested for the operation (one of StateConcurrency).
  StateConcurrency concurrency = 1;
  // Consistency level requested for the operation (one of StateConsistency).
  StateConsistency consistency = 2;
}

get state

GetStateRequest 包含 store_name/key,还有一致性要求(consistency)和请求级别的 metadata:

// GetStateRequest is the message to get key-value states from specific state store.
message GetStateRequest {
  // The name of state store.
  string store_name = 1;

  // The key of the desired state.
  string key = 2;

  // The read consistency of the state store.
  // Only the StateConsistency enum of StateOptions is carried here;
  // this request has no concurrency field.
  common.v1.StateOptions.StateConsistency consistency = 3;

  // The metadata which will be sent to state store components.
  map<string,string> metadata = 4;
}

GetStateResponse 包含 byte[] 形式的 state 数据 data,以及表示数据特定版本的 etag:

// GetStateResponse is the response conveying the state value and etag.
// It has no error field, unlike BulkStateItem.
message GetStateResponse {
  // The state value as a byte array.
  bytes data = 1;

  // The entity tag which represents the specific version of data.
  // ETag format is defined by the corresponding data store.
  string etag = 2;
}

Get Bulk State

GetBulkStateRequest 是批量接口,一次性获取多个key的数据:

// GetBulkStateRequest is the message to get a list of key-value states from specific state store.
message GetBulkStateRequest {
  // The name of state store.
  string store_name = 1;

  // The keys to get.
  repeated string keys = 2;

  // The number of parallel operations executed on the state store for a get operation.
  // In other words, the degree of concurrency: how many requests run at the same time.
  int32 parallelism = 3;

  // The metadata which will be sent to state store components.
  // This is request-level: a single map is sent, so every key in the request shares the same metadata.
  map<string,string> metadata = 4;
}

GetBulkStateResponse:

// GetBulkStateResponse is the response conveying the list of state values.
message GetBulkStateResponse {
  // The list of items containing the keys to get values for.
  // Open question from the article author: why a repeated list rather than a map?
  repeated BulkStateItem items = 1;
}

// BulkStateItem is the response item for a bulk get operation.
// Return values include the item key, data and etag.
message BulkStateItem {
  // state item key
  string key = 1;

  // The byte array data
  bytes data = 2;

  // The entity tag which represents the specific version of data.
  // ETag format is defined by the corresponding data store.
  string etag = 3;

  // The error that was returned from the state store in case of a failed get operation.
  // Failure is accounted for per item here, so an error message can be returned for each key.
  // Note: the single-key GetStateResponse defines no such error field.
  // Presumably single-get errors can only be reported at the HTTP/gRPC protocol layer.
  // TODO: check the implementation.
  string error = 4;
}

Save State

SaveStateRequest 支持多个状态的保存:

// SaveStateRequest is the message to save multiple states into state store.
message SaveStateRequest {
  // The name of state store.
  string store_name = 1;

  // The array of the state key values.
  // Each entry is a common.v1.StateItem, which carries its own etag, metadata and options.
  repeated common.v1.StateItem states = 2;
}

// StateItem represents state key, value, and additional options to save state.
// Other messages in this document reference it as common.v1.StateItem.
message StateItem {
  // Required. The state key.
  string key = 1;

  // Required. The state data for key.
  bytes value = 2;

  // The entity tag which represents the specific version of data.
  // The exact ETag format is defined by the corresponding data store.
  string etag = 3;

  // The metadata which will be passed to state store component.
  map<string,string> metadata = 4;

  // Options for concurrency and consistency to save the state.
  StateOptions options = 5;
}

response为 google.protobuf.Empty。

Delete State

DeleteStateRequest:

// DeleteStateRequest is the message to delete key-value states in the specific state store.
message DeleteStateRequest {
  // The name of state store.
  string store_name = 1;

  // The key of the desired state.
  string key = 2;

  // The entity tag which represents the specific version of data.
  // The exact ETag format is defined by the corresponding data store.
  string etag = 3;

  // State operation options, which include concurrency and consistency.
  // NOTE(review): the original comment also listed retry_policy, but the
  // StateOptions message shown in this document has no such field. Confirm
  // against the current common.proto.
  common.v1.StateOptions options = 4;

  // The metadata which will be sent to state store components.
  map<string,string> metadata = 5;
}

response为 google.protobuf.Empty。

Execute State Transaction

ExecuteStateTransactionRequest:

// ExecuteStateTransactionRequest is the message to execute multiple operations on a specified store.
message ExecuteStateTransactionRequest {
  // Required. name of state store.
  // Note: this field is spelled storeName, whereas the other state requests
  // in this document use store_name.
  string storeName = 1;

  // Required. transactional operation list.
  repeated TransactionalStateOperation operations = 2;

  // The metadata used for transactional operations.
  map<string,string> metadata = 3;
}

// TransactionalStateOperation is the message to execute a specified operation with a key-value pair.
message TransactionalStateOperation {
  // The type of operation to be executed.
  // TODO: which operation types are supported? The proto only declares a free-form string.
  string operationType = 1;

  // State values to be operated on.
  common.v1.StateItem request = 2;
}

response为 google.protobuf.Empty。

状态管理API的golang生成代码

从proto api定义文件生成的golang代码,被存放在dapr项目的 pkg/proto/ 目录下。

grpc服务定义

DaprServer 是 dapr 服务的服务器端API定义,包含多个 state 相关的方法:

// DaprServer is the server API for Dapr service.
// Only the state-related methods are listed; the remaining methods are elided below.
type DaprServer interface {
	// Gets the state for a specific key.
	GetState(context.Context, *GetStateRequest) (*GetStateResponse, error)
	// Gets a bulk of state items for a list of keys.
	GetBulkState(context.Context, *GetBulkStateRequest) (*GetBulkStateResponse, error)
	// Saves state. The request carries a repeated list of state items,
	// so a single call can save more than one key.
	SaveState(context.Context, *SaveStateRequest) (*empty.Empty, error)
	// Deletes the state for a specific key.
	DeleteState(context.Context, *DeleteStateRequest) (*empty.Empty, error)
	// Executes transactions for a specified store.
	ExecuteStateTransaction(context.Context, *ExecuteStateTransactionRequest) (*empty.Empty, error)
   ......
}

状态管理API的go client定义

DaprClient

https://github.com/dapr/dapr/blob/11741c6cd697e08b2e776943e61bb2e3388c85a8/pkg/proto/runtime/v1/dapr.pb.go

这是根据proto生成的go代码

// DaprClient is the client API for Dapr service.
// Only the state-related methods are listed; the remaining methods are elided below.
type DaprClient interface {
	// Gets the state for a specific key.
	GetState(ctx context.Context, in *GetStateRequest, opts ...grpc.CallOption) (*GetStateResponse, error)
	// Gets a bulk of state items for a list of keys.
	GetBulkState(ctx context.Context, in *GetBulkStateRequest, opts ...grpc.CallOption) (*GetBulkStateResponse, error)
	// Saves state. The request carries a repeated list of state items,
	// so a single call can save more than one key.
	SaveState(ctx context.Context, in *SaveStateRequest, opts ...grpc.CallOption) (*empty.Empty, error)
	// Deletes the state for a specific key.
	DeleteState(ctx context.Context, in *DeleteStateRequest, opts ...grpc.CallOption) (*empty.Empty, error)
	// Executes transactions for a specified store.
	ExecuteStateTransaction(ctx context.Context, in *ExecuteStateTransactionRequest, opts ...grpc.CallOption) (*empty.Empty, error)
	......
}

Get State

以 Get State 为例看 DaprClient 的实现:

// GetState issues the unary "/dapr.proto.runtime.v1.Dapr/GetState" call over
// the client's connection and returns the decoded response. Any error from
// the invocation is returned unchanged, with a nil response.
func (c *daprClient) GetState(ctx context.Context, in *GetStateRequest, opts ...grpc.CallOption) (*GetStateResponse, error) {
	resp := &GetStateResponse{}
	if err := c.cc.Invoke(ctx, "/dapr.proto.runtime.v1.Dapr/GetState", in, resp, opts...); err != nil {
		return nil, err
	}
	return resp, nil
}

只是简单调用远程方法。

状态管理API的go sdk封装

go sdk使用案例

https://github.com/dapr/go-sdk

对于简单场景,只要给出 store name / key / data 就好了:

// Minimal save / get / delete round trip against one state store.
// `client` is a Dapr go-sdk client created elsewhere (not shown in this snippet).
ctx := context.Background()
data := []byte("hello")
store := "my-store" // name of the state store, as defined in the component YAML

// Save the state under the key "key1".
if err := client.SaveState(ctx, store, "key1", data); err != nil {
    panic(err)
}

// Read the state back for "key1"; the returned item exposes Key, Etag and Value.
item, err := client.GetState(ctx, store, "key1")
if err != nil {
    panic(err)
}
fmt.Printf("data [key:%s etag:%s]: %s", item.Key, item.Etag, string(item.Value))

// Delete the state stored under "key1".
if err := client.DeleteState(ctx, store, "key1"); err != nil {
    panic(err)
}

get state

简单get方法,使用默认的一致性选项:

// GetState retrieves state from the given store using the default consistency option.
// It delegates to GetStateWithConsistency, passing StateConsistencyStrong.
func (c *GRPCClient) GetState(ctx context.Context, store, key string) (item *StateItem, err error) {
	return c.GetStateWithConsistency(ctx, store, key, StateConsistencyStrong)
}

注意,这里默认的一致性选项是 StateConsistencyStrong,即强一致性。

完整的get 方法:

// GetStateWithConsistency retrieves state from the given store using the
// provided state consistency.
//
// It returns an error when store or key is empty, and wraps any error returned
// by the underlying GetState call. On success the returned StateItem carries
// the requested key together with the Etag and Data from the response.
func (c *GRPCClient) GetStateWithConsistency(ctx context.Context, store, key string, sc StateConsistency) (item *StateItem, err error) {
	// Reject empty arguments before touching the network.
	switch {
	case store == "":
		return nil, errors.New("nil store")
	case key == "":
		return nil, errors.New("nil key")
	}

	resp, err := c.protoClient.GetState(authContext(ctx), &pb.GetStateRequest{
		StoreName:   store,
		Key:         key,
		Consistency: v1.StateOptions_StateConsistency(sc),
	})
	if err != nil {
		return nil, errors.Wrap(err, "error getting state")
	}

	// The response does not echo the key, so it is filled in from the argument.
	item = &StateItem{
		Key:   key,
		Etag:  resp.Etag,
		Value: resp.Data,
	}
	return item, nil
}

基本上也没做什么。

0 条回复 A文章作者 M管理员
    暂无讨论,说说你的看法吧
个人中心
今日签到
有新私信 私信列表
搜索