Browse Source

v0.0.1开发:调整rpcbff为flowsrv

#Suyghur 2 years ago
parent
commit
36702ff2c5

+ 2 - 2
bff/authbff/api/etc/authbff.yaml

@@ -5,12 +5,12 @@ Port: 10000
 #链路追踪
 Telemetry:
   Name: authbff-api
-  Endpoint: http://127.0.0.1:14268/api/traces
+  Endpoint: http://ylink-jaeger-trace.ylink.svc.cluster.local:14268/api/traces
   Sampler: 1.0
   Batcher: jaeger
 
 AuthRpc:
   Etcd:
     Hosts:
-      - 127.0.0.1:2379
+      - ylink-etcd.ylink.svc.cluster.local:2379
     Key: auth.rpc

+ 0 - 30
bff/rpcbff/rpc/Dockerfile

@@ -1,30 +0,0 @@
-FROM golang:alpine AS builder
-
-LABEL stage=gobuilder
-
-ENV CGO_ENABLED 0
-ENV GOPROXY https://goproxy.cn,direct
-
-RUN apk update --no-cache && apk add --no-cache tzdata
-
-WORKDIR /build
-
-ADD go.mod .
-ADD go.sum .
-RUN go mod download
-COPY . .
-COPY bff/rpcbff/rpc/etc /app/etc
-RUN go build -ldflags="-s -w" -o /app/rpcbff bff/rpcbff/rpc/rpcbff.go
-
-
-FROM alpine
-
-RUN apk update --no-cache && apk add --no-cache ca-certificates
-COPY --from=builder /usr/share/zoneinfo/Asia/Shanghai /usr/share/zoneinfo/Asia/Shanghai
-ENV TZ Asia/Shanghai
-
-WORKDIR /app
-COPY --from=builder /app/rpcbff /app/rpcbff
-COPY --from=builder /app/etc /app/etc
-
-CMD ["./rpcbff", "-f", "etc/rpcbff.yaml"]

+ 0 - 54
bff/rpcbff/rpc/internal/ext/consumerhandler.go

@@ -1,54 +0,0 @@
-//@File     consumerhandler.go
-//@Time     2022/05/07
-//@Author   #Suyghur,
-
-package ext
-
-import (
-	"context"
-	"github.com/Shopify/sarama"
-	"github.com/zeromicro/go-zero/core/logx"
-	"ylink/ext/kafka"
-)
-
-type callback func(msg []byte)
-
-type ConsumerHandler struct {
-	Callbacks     map[string]callback
-	ConsumerGroup *kafka.ConsumerGroup
-}
-
-func (handler *ConsumerHandler) Init(config kafka.KqConfig) {
-	handler.Callbacks = make(map[string]callback)
-	handler.Callbacks[config.Topic] = handler.handleMessage
-
-	consumerGroupConfig := kafka.ConsumerGroupConfig{
-		KafkaVersion:   sarama.V2_8_0_0,
-		OffsetsInitial: sarama.OffsetNewest,
-		IsReturnErr:    false,
-	}
-	logx.WithContext(context.Background()).Infof("brokers: %v", config.Brokers)
-	logx.WithContext(context.Background()).Infof("group id: %s", config.GroupId)
-	handler.ConsumerGroup = kafka.NewConsumerGroup(&consumerGroupConfig, config.Brokers, []string{config.Topic}, config.GroupId)
-}
-
-func (handler *ConsumerHandler) handleMessage(msg []byte) {
-	logx.WithContext(context.Background()).Infof("handle message from kafka: %s", string(msg))
-	//msgFromMq:=
-}
-
-func (ConsumerHandler) Setup(_ sarama.ConsumerGroupSession) error {
-	return nil
-}
-
-func (ConsumerHandler) Cleanup(_ sarama.ConsumerGroupSession) error {
-	return nil
-}
-
-func (handler *ConsumerHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
-	for msg := range claim.Messages() {
-		logx.WithContext(context.Background()).Infof("kafka get info to mysql, topic: %s, partition: %d, msg: %s", msg.Topic, msg.Partition, string(msg.Value))
-		handler.Callbacks[msg.Topic](msg.Value)
-	}
-	return nil
-}

+ 0 - 33
bff/rpcbff/rpc/internal/server/rpcbffserver.go

@@ -1,33 +0,0 @@
-// Code generated by goctl. DO NOT EDIT!
-// Source: rpcbff.proto
-
-package server
-
-import (
-	"context"
-
-	"ylink/bff/rpcbff/rpc/internal/logic"
-	"ylink/bff/rpcbff/rpc/internal/svc"
-	"ylink/bff/rpcbff/rpc/pb"
-)
-
-type RpcbffServer struct {
-	svcCtx *svc.ServiceContext
-	pb.UnimplementedRpcbffServer
-}
-
-func NewRpcbffServer(svcCtx *svc.ServiceContext) *RpcbffServer {
-	return &RpcbffServer{
-		svcCtx: svcCtx,
-	}
-}
-
-func (s *RpcbffServer) Connect(in *pb.CommandReq, stream pb.Rpcbff_ConnectServer) error {
-	l := logic.NewConnectLogic(stream.Context(), s.svcCtx)
-	return l.Connect(in, stream)
-}
-
-func (s *RpcbffServer) Disconnect(ctx context.Context, in *pb.CommandReq) (*pb.CommandResp, error) {
-	l := logic.NewDisconnectLogic(ctx, s.svcCtx)
-	return l.Disconnect(in)
-}

+ 0 - 169
bff/rpcbff/rpc/pb/rpcbff_grpc.pb.go

@@ -1,169 +0,0 @@
-// Code generated by protoc-gen-go-grpc. DO NOT EDIT.
-// versions:
-// - protoc-gen-go-grpc v1.2.0
-// - protoc             v3.19.4
-// source: pb/rpcbff.proto
-
-package pb
-
-import (
-	context "context"
-	grpc "google.golang.org/grpc"
-	codes "google.golang.org/grpc/codes"
-	status "google.golang.org/grpc/status"
-)
-
-// This is a compile-time assertion to ensure that this generated file
-// is compatible with the grpc package it is being compiled against.
-// Requires gRPC-Go v1.32.0 or later.
-const _ = grpc.SupportPackageIsVersion7
-
-// RpcbffClient is the client API for Rpcbff service.
-//
-// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream.
-type RpcbffClient interface {
-	Connect(ctx context.Context, in *CommandReq, opts ...grpc.CallOption) (Rpcbff_ConnectClient, error)
-	Disconnect(ctx context.Context, in *CommandReq, opts ...grpc.CallOption) (*CommandResp, error)
-}
-
-type rpcbffClient struct {
-	cc grpc.ClientConnInterface
-}
-
-func NewRpcbffClient(cc grpc.ClientConnInterface) RpcbffClient {
-	return &rpcbffClient{cc}
-}
-
-func (c *rpcbffClient) Connect(ctx context.Context, in *CommandReq, opts ...grpc.CallOption) (Rpcbff_ConnectClient, error) {
-	stream, err := c.cc.NewStream(ctx, &Rpcbff_ServiceDesc.Streams[0], "/pb.Rpcbff/connect", opts...)
-	if err != nil {
-		return nil, err
-	}
-	x := &rpcbffConnectClient{stream}
-	if err := x.ClientStream.SendMsg(in); err != nil {
-		return nil, err
-	}
-	if err := x.ClientStream.CloseSend(); err != nil {
-		return nil, err
-	}
-	return x, nil
-}
-
-type Rpcbff_ConnectClient interface {
-	Recv() (*CommandResp, error)
-	grpc.ClientStream
-}
-
-type rpcbffConnectClient struct {
-	grpc.ClientStream
-}
-
-func (x *rpcbffConnectClient) Recv() (*CommandResp, error) {
-	m := new(CommandResp)
-	if err := x.ClientStream.RecvMsg(m); err != nil {
-		return nil, err
-	}
-	return m, nil
-}
-
-func (c *rpcbffClient) Disconnect(ctx context.Context, in *CommandReq, opts ...grpc.CallOption) (*CommandResp, error) {
-	out := new(CommandResp)
-	err := c.cc.Invoke(ctx, "/pb.Rpcbff/disconnect", in, out, opts...)
-	if err != nil {
-		return nil, err
-	}
-	return out, nil
-}
-
-// RpcbffServer is the server API for Rpcbff service.
-// All implementations must embed UnimplementedRpcbffServer
-// for forward compatibility
-type RpcbffServer interface {
-	Connect(*CommandReq, Rpcbff_ConnectServer) error
-	Disconnect(context.Context, *CommandReq) (*CommandResp, error)
-	mustEmbedUnimplementedRpcbffServer()
-}
-
-// UnimplementedRpcbffServer must be embedded to have forward compatible implementations.
-type UnimplementedRpcbffServer struct {
-}
-
-func (UnimplementedRpcbffServer) Connect(*CommandReq, Rpcbff_ConnectServer) error {
-	return status.Errorf(codes.Unimplemented, "method Connect not implemented")
-}
-func (UnimplementedRpcbffServer) Disconnect(context.Context, *CommandReq) (*CommandResp, error) {
-	return nil, status.Errorf(codes.Unimplemented, "method Disconnect not implemented")
-}
-func (UnimplementedRpcbffServer) mustEmbedUnimplementedRpcbffServer() {}
-
-// UnsafeRpcbffServer may be embedded to opt out of forward compatibility for this service.
-// Use of this interface is not recommended, as added methods to RpcbffServer will
-// result in compilation errors.
-type UnsafeRpcbffServer interface {
-	mustEmbedUnimplementedRpcbffServer()
-}
-
-func RegisterRpcbffServer(s grpc.ServiceRegistrar, srv RpcbffServer) {
-	s.RegisterService(&Rpcbff_ServiceDesc, srv)
-}
-
-func _Rpcbff_Connect_Handler(srv interface{}, stream grpc.ServerStream) error {
-	m := new(CommandReq)
-	if err := stream.RecvMsg(m); err != nil {
-		return err
-	}
-	return srv.(RpcbffServer).Connect(m, &rpcbffConnectServer{stream})
-}
-
-type Rpcbff_ConnectServer interface {
-	Send(*CommandResp) error
-	grpc.ServerStream
-}
-
-type rpcbffConnectServer struct {
-	grpc.ServerStream
-}
-
-func (x *rpcbffConnectServer) Send(m *CommandResp) error {
-	return x.ServerStream.SendMsg(m)
-}
-
-func _Rpcbff_Disconnect_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
-	in := new(CommandReq)
-	if err := dec(in); err != nil {
-		return nil, err
-	}
-	if interceptor == nil {
-		return srv.(RpcbffServer).Disconnect(ctx, in)
-	}
-	info := &grpc.UnaryServerInfo{
-		Server:     srv,
-		FullMethod: "/pb.Rpcbff/disconnect",
-	}
-	handler := func(ctx context.Context, req interface{}) (interface{}, error) {
-		return srv.(RpcbffServer).Disconnect(ctx, req.(*CommandReq))
-	}
-	return interceptor(ctx, in, info, handler)
-}
-
-// Rpcbff_ServiceDesc is the grpc.ServiceDesc for Rpcbff service.
-// It's only intended for direct use with grpc.RegisterService,
-// and not to be introspected or modified (even as a copy)
-var Rpcbff_ServiceDesc = grpc.ServiceDesc{
-	ServiceName: "pb.Rpcbff",
-	HandlerType: (*RpcbffServer)(nil),
-	Methods: []grpc.MethodDesc{
-		{
-			MethodName: "disconnect",
-			Handler:    _Rpcbff_Disconnect_Handler,
-		},
-	},
-	Streams: []grpc.StreamDesc{
-		{
-			StreamName:    "connect",
-			Handler:       _Rpcbff_Connect_Handler,
-			ServerStreams: true,
-		},
-	},
-	Metadata: "pb/rpcbff.proto",
-}

+ 0 - 43
bff/rpcbff/rpc/rpcbff/rpcbff.go

@@ -1,43 +0,0 @@
-// Code generated by goctl. DO NOT EDIT!
-// Source: rpcbff.proto
-
-package rpcbff
-
-import (
-	"context"
-
-	"ylink/bff/rpcbff/rpc/pb"
-
-	"github.com/zeromicro/go-zero/zrpc"
-	"google.golang.org/grpc"
-)
-
-type (
-	CommandReq  = pb.CommandReq
-	CommandResp = pb.CommandResp
-
-	Rpcbff interface {
-		Connect(ctx context.Context, in *CommandReq, opts ...grpc.CallOption) (pb.Rpcbff_ConnectClient, error)
-		Disconnect(ctx context.Context, in *CommandReq, opts ...grpc.CallOption) (*CommandResp, error)
-	}
-
-	defaultRpcbff struct {
-		cli zrpc.Client
-	}
-)
-
-func NewRpcbff(cli zrpc.Client) Rpcbff {
-	return &defaultRpcbff{
-		cli: cli,
-	}
-}
-
-func (m *defaultRpcbff) Connect(ctx context.Context, in *CommandReq, opts ...grpc.CallOption) (pb.Rpcbff_ConnectClient, error) {
-	client := pb.NewRpcbffClient(m.cli.Conn())
-	return client.Connect(ctx, in, opts...)
-}
-
-func (m *defaultRpcbff) Disconnect(ctx context.Context, in *CommandReq, opts ...grpc.CallOption) (*CommandResp, error) {
-	client := pb.NewRpcbffClient(m.cli.Conn())
-	return client.Disconnect(ctx, in, opts...)
-}

+ 2 - 2
core/auth/rpc/etc/auth.yaml

@@ -3,13 +3,13 @@ ListenOn: 0.0.0.0:10400
 
 Etcd:
   Hosts:
-    - 127.0.0.1:2379
+    - ylink-etcd.ylink.svc.cluster.local:2379
   Key: auth.rpc
 
 
 Telemetry:
   Name: auth-rpc
-  Endpoint: http://127.0.0.1:14268/api/traces
+  Endpoint: http://ylink-jaeger-trace.ylink.svc.cluster.local:14268/api/traces
   Sampler: 1.0
   Batcher: jaeger
 

+ 6 - 1
core/cmd/rpc/etc/cmd.yaml

@@ -16,4 +16,9 @@ KqChatMsgConf:
   Brokers:
     - 127.0.0.1:9092
   Topic: chat-msg-topic
-  GroupId:
+  GroupId:
+
+Redis:
+  Host: redis:6379
+  Type: node
+  Pass: ylink

+ 4 - 18
core/cmd/rpc/internal/logic/playersendmsglogic.go

@@ -6,6 +6,7 @@ import (
 	"time"
 	"ylink/core/cmd/rpc/internal/svc"
 	"ylink/core/cmd/rpc/pb"
+	"ylink/ext/model"
 
 	"github.com/zeromicro/go-zero/core/logx"
 )
@@ -16,14 +17,6 @@ type PlayerSendMsgLogic struct {
 	logx.Logger
 }
 
-type message struct {
-	CreateTime string `json:"create_time"`
-	Content    string `json:"content"`
-	Pic        string `json:"pic"`
-	ReceiverId string `json:"receiver_id"`
-	SenderId   string `json:"sender_id"`
-}
-
 func NewPlayerSendMsgLogic(ctx context.Context, svcCtx *svc.ServiceContext) *PlayerSendMsgLogic {
 	return &PlayerSendMsgLogic{
 		ctx:    ctx,
@@ -33,24 +26,17 @@ func NewPlayerSendMsgLogic(ctx context.Context, svcCtx *svc.ServiceContext) *Pla
 }
 
 func (l *PlayerSendMsgLogic) PlayerSendMsg(in *pb.PlayerSendMsgReq) (*pb.PlayerSendMsgResp, error) {
-	// todo 投递到对应客服的收件箱
-	// todo 写入db
-	msg, _ := json.Marshal(message{
+	// 投递到自己的发件箱
+	msg, _ := json.Marshal(model.ChatMessage{
 		CreateTime: time.Now().Format("2006-01-02 15:04:05"),
 		Content:    in.Content,
 		Pic:        in.Pic,
 		ReceiverId: "",
 		SenderId:   in.PlayerId,
 	})
-
-	//if err := l.svcCtx.ChatMsgProducerClient.Push(string(msg)); err != nil {
-	//	return nil, err
-	//}
-	pid, offset, err := l.svcCtx.ChatMsgProducer.SendMessage(string(msg), in.PlayerId)
+	_, _, err := l.svcCtx.ChatMsgProducer.SendMessage(string(msg), in.PlayerId)
 	if err != nil {
 		return nil, err
 	}
-	l.Logger.Infof("pid: %d", pid)
-	l.Logger.Infof("offset: %d", offset)
 	return &pb.PlayerSendMsgResp{}, nil
 }

+ 6 - 0
core/cmd/rpc/internal/svc/servicecontext.go

@@ -1,6 +1,7 @@
 package svc
 
 import (
+	"github.com/zeromicro/go-zero/core/stores/redis"
 	"ylink/core/cmd/rpc/internal/config"
 	"ylink/ext/kafka"
 )
@@ -8,11 +9,16 @@ import (
 type ServiceContext struct {
 	Config          config.Config
 	ChatMsgProducer *kafka.Producer
+	RedisClient     *redis.Redis
 }
 
 func NewServiceContext(c config.Config) *ServiceContext {
 	return &ServiceContext{
 		Config:          c,
 		ChatMsgProducer: kafka.NewKafkaProducer(c.KqChatMsgConf.Brokers, c.KqChatMsgConf.Topic),
+		RedisClient: redis.New(c.Redis.Host, func(r *redis.Redis) {
+			r.Type = c.Redis.Type
+			r.Pass = c.Redis.Pass
+		}),
 	}
 }

+ 17 - 0
docker-compose-env.yml

@@ -144,6 +144,23 @@ services:
     networks:
       - ylink_net
 
+  #redis容器
+  redis:
+    image: redis:6.2.5
+    container_name: redis
+    ports:
+      - "6379:6379"
+    environment:
+      - "TZ=Asia/Shanghai"
+    volumes:
+      # 数据文件
+      - "/data/docker_mount/redis/data:/data:rw"
+    command: "redis-server --requirepass ylink  --appendonly yes"
+    privileged: true
+    restart: always
+    networks:
+      - ylink_net
+
   etcd:
     hostname: etcd
     image: bitnami/etcd:3

+ 13 - 0
ext/model/message.go

@@ -0,0 +1,13 @@
+//@File     message.go
+//@Time     2022/05/10
+//@Author   #Suyghur,
+
+package model
+
+type ChatMessage struct {
+	CreateTime string `json:"create_time"`
+	Content    string `json:"content"`
+	Pic        string `json:"pic"`
+	ReceiverId string `json:"receiver_id"`
+	SenderId   string `json:"sender_id"`
+}

+ 7 - 7
bff/rpcbff/rpc/etc/rpcbff.yaml → flowsrv/rpc/etc/flowsrv.yaml

@@ -1,14 +1,14 @@
-Name: rpcbff.rpc
+Name: flowsrv.rpc
 ListenOn: 0.0.0.0:10200
 
 Etcd:
   Hosts:
     - 127.0.0.1:2379
-  Key: rpcbff.rpc
+  Key: flowsrv.rpc
 
 Telemetry:
-  Name: rpcbff-rpc
-  Endpoint: http://127.0.0.1:14268/api/traces
+  Name: flowsrv-rpc
+  Endpoint: http://ylink-jaeger-trace.ylink.svc.cluster.local:14268/api/traces
   Sampler: 1.0
   Batcher: jaeger
 
@@ -18,8 +18,8 @@ AuthRpcConf:
       - 127.0.0.1:2379
     Key: auth.rpc
 
-KqChatMsgConf:
+KqMsgConf:
   Brokers:
     - 127.0.0.1:9092
-  Topic: chat-msg-topic
-  GroupId: rpcbff
+  Topic: recv-box-topic
+  GroupId: flowsrv

+ 7 - 7
bff/rpcbff/rpc/rpcbff.go → flowsrv/rpc/flowsrv.go

@@ -4,10 +4,10 @@ import (
 	"flag"
 	"fmt"
 
-	"ylink/bff/rpcbff/rpc/internal/config"
-	"ylink/bff/rpcbff/rpc/internal/server"
-	"ylink/bff/rpcbff/rpc/internal/svc"
-	"ylink/bff/rpcbff/rpc/pb"
+	"ylink/flowsrv/rpc/internal/config"
+	"ylink/flowsrv/rpc/internal/server"
+	"ylink/flowsrv/rpc/internal/svc"
+	"ylink/flowsrv/rpc/pb"
 
 	"github.com/zeromicro/go-zero/core/conf"
 	"github.com/zeromicro/go-zero/core/service"
@@ -16,7 +16,7 @@ import (
 	"google.golang.org/grpc/reflection"
 )
 
-var configFile = flag.String("f", "etc/rpcbff.yaml", "the config file")
+var configFile = flag.String("f", "etc/flowsrv.yaml", "the config file")
 
 func main() {
 	flag.Parse()
@@ -24,10 +24,10 @@ func main() {
 	var c config.Config
 	conf.MustLoad(*configFile, &c)
 	ctx := svc.NewServiceContext(c)
-	svr := server.NewRpcbffServer(ctx)
+	svr := server.NewFlowsrvServer(ctx)
 
 	s := zrpc.MustNewServer(c.RpcServerConf, func(grpcServer *grpc.Server) {
-		pb.RegisterRpcbffServer(grpcServer, svr)
+		pb.RegisterFlowsrvServer(grpcServer, svr)
 
 		if c.Mode == service.DevMode || c.Mode == service.TestMode {
 			reflection.Register(grpcServer)

+ 43 - 0
flowsrv/rpc/flowsrv/flowsrv.go

@@ -0,0 +1,43 @@
+// Code generated by goctl. DO NOT EDIT!
+// Source: flowsrv.proto
+
+package flowsrv
+
+import (
+	"context"
+
+	"ylink/flowsrv/rpc/pb"
+
+	"github.com/zeromicro/go-zero/zrpc"
+	"google.golang.org/grpc"
+)
+
+type (
+	CommandReq  = pb.CommandReq
+	CommandResp = pb.CommandResp
+
+	Flowsrv interface {
+		Connect(ctx context.Context, in *CommandReq, opts ...grpc.CallOption) (pb.Flowsrv_ConnectClient, error)
+		Disconnect(ctx context.Context, in *CommandReq, opts ...grpc.CallOption) (*CommandResp, error)
+	}
+
+	defaultFlowsrv struct {
+		cli zrpc.Client
+	}
+)
+
+func NewFlowsrv(cli zrpc.Client) Flowsrv {
+	return &defaultFlowsrv{
+		cli: cli,
+	}
+}
+
+func (m *defaultFlowsrv) Connect(ctx context.Context, in *CommandReq, opts ...grpc.CallOption) (pb.Flowsrv_ConnectClient, error) {
+	client := pb.NewFlowsrvClient(m.cli.Conn())
+	return client.Connect(ctx, in, opts...)
+}
+
+func (m *defaultFlowsrv) Disconnect(ctx context.Context, in *CommandReq, opts ...grpc.CallOption) (*CommandResp, error) {
+	client := pb.NewFlowsrvClient(m.cli.Conn())
+	return client.Disconnect(ctx, in, opts...)
+}

+ 2 - 2
bff/rpcbff/rpc/internal/config/config.go → flowsrv/rpc/internal/config/config.go

@@ -7,6 +7,6 @@ import (
 
 type Config struct {
 	zrpc.RpcServerConf
-	AuthRpcConf   zrpc.RpcClientConf
-	KqChatMsgConf kafka.KqConfig
+	AuthRpcConf zrpc.RpcClientConf
+	KqMsgConf   kafka.KqConfig
 }

+ 3 - 3
bff/rpcbff/rpc/internal/logic/connectlogic.go → flowsrv/rpc/internal/logic/connectlogic.go

@@ -5,8 +5,8 @@ import (
 	"ylink/core/auth/rpc/auth"
 	"ylink/ext/result"
 
-	"ylink/bff/rpcbff/rpc/internal/svc"
-	"ylink/bff/rpcbff/rpc/pb"
+	"ylink/flowsrv/rpc/internal/svc"
+	"ylink/flowsrv/rpc/pb"
 
 	"github.com/zeromicro/go-zero/core/logx"
 )
@@ -25,7 +25,7 @@ func NewConnectLogic(ctx context.Context, svcCtx *svc.ServiceContext) *ConnectLo
 	}
 }
 
-func (l *ConnectLogic) Connect(in *pb.CommandReq, stream pb.Rpcbff_ConnectServer) error {
+func (l *ConnectLogic) Connect(in *pb.CommandReq, stream pb.Flowsrv_ConnectServer) error {
 	_, err := l.svcCtx.AuthRpc.CheckAuth(l.ctx, &auth.CheckAuthReq{
 		AccessToken: in.AccessToken,
 	})

+ 2 - 2
bff/rpcbff/rpc/internal/logic/disconnectlogic.go → flowsrv/rpc/internal/logic/disconnectlogic.go

@@ -5,8 +5,8 @@ import (
 	"ylink/core/auth/rpc/auth"
 	"ylink/ext/result"
 
-	"ylink/bff/rpcbff/rpc/internal/svc"
-	"ylink/bff/rpcbff/rpc/pb"
+	"ylink/flowsrv/rpc/internal/svc"
+	"ylink/flowsrv/rpc/pb"
 
 	"github.com/zeromicro/go-zero/core/logx"
 )

+ 33 - 0
flowsrv/rpc/internal/server/flowsrvserver.go

@@ -0,0 +1,33 @@
+// Code generated by goctl. DO NOT EDIT!
+// Source: flowsrv.proto
+
+package server
+
+import (
+	"context"
+
+	"ylink/flowsrv/rpc/internal/logic"
+	"ylink/flowsrv/rpc/internal/svc"
+	"ylink/flowsrv/rpc/pb"
+)
+
+type FlowsrvServer struct {
+	svcCtx *svc.ServiceContext
+	pb.UnimplementedFlowsrvServer
+}
+
+func NewFlowsrvServer(svcCtx *svc.ServiceContext) *FlowsrvServer {
+	return &FlowsrvServer{
+		svcCtx: svcCtx,
+	}
+}
+
+func (s *FlowsrvServer) Connect(in *pb.CommandReq, stream pb.Flowsrv_ConnectServer) error {
+	l := logic.NewConnectLogic(stream.Context(), s.svcCtx)
+	return l.Connect(in, stream)
+}
+
+func (s *FlowsrvServer) Disconnect(ctx context.Context, in *pb.CommandReq) (*pb.CommandResp, error) {
+	l := logic.NewDisconnectLogic(ctx, s.svcCtx)
+	return l.Disconnect(in)
+}

+ 1 - 5
bff/rpcbff/rpc/internal/svc/servicecontext.go → flowsrv/rpc/internal/svc/servicecontext.go

@@ -2,9 +2,8 @@ package svc
 
 import (
 	"github.com/zeromicro/go-zero/zrpc"
-	"ylink/bff/rpcbff/rpc/internal/config"
-	"ylink/bff/rpcbff/rpc/internal/ext"
 	"ylink/core/auth/rpc/auth"
+	"ylink/flowsrv/rpc/internal/config"
 )
 
 type ServiceContext struct {
@@ -13,9 +12,6 @@ type ServiceContext struct {
 }
 
 func NewServiceContext(c config.Config) *ServiceContext {
-	consumerHandler := ext.ConsumerHandler{}
-	consumerHandler.Init(c.KqChatMsgConf)
-	go consumerHandler.ConsumerGroup.RegisterHandleAndConsumer(&consumerHandler)
 	return &ServiceContext{
 		Config:  c,
 		AuthRpc: auth.NewAuth(zrpc.MustNewClient(c.AuthRpcConf)),

+ 66 - 66
bff/rpcbff/rpc/pb/rpcbff.pb.go → flowsrv/rpc/pb/flowsrv.pb.go

@@ -2,7 +2,7 @@
 // versions:
 // 	protoc-gen-go v1.28.0
 // 	protoc        v3.19.4
-// source: pb/rpcbff.proto
+// source: pb/flowsrv.proto
 
 package pb
 
@@ -51,11 +51,11 @@ func (x ConnectType) String() string {
 }
 
 func (ConnectType) Descriptor() protoreflect.EnumDescriptor {
-	return file_pb_rpcbff_proto_enumTypes[0].Descriptor()
+	return file_pb_flowsrv_proto_enumTypes[0].Descriptor()
 }
 
 func (ConnectType) Type() protoreflect.EnumType {
-	return &file_pb_rpcbff_proto_enumTypes[0]
+	return &file_pb_flowsrv_proto_enumTypes[0]
 }
 
 func (x ConnectType) Number() protoreflect.EnumNumber {
@@ -64,7 +64,7 @@ func (x ConnectType) Number() protoreflect.EnumNumber {
 
 // Deprecated: Use ConnectType.Descriptor instead.
 func (ConnectType) EnumDescriptor() ([]byte, []int) {
-	return file_pb_rpcbff_proto_rawDescGZIP(), []int{0}
+	return file_pb_flowsrv_proto_rawDescGZIP(), []int{0}
 }
 
 type CommandReq struct {
@@ -79,7 +79,7 @@ type CommandReq struct {
 func (x *CommandReq) Reset() {
 	*x = CommandReq{}
 	if protoimpl.UnsafeEnabled {
-		mi := &file_pb_rpcbff_proto_msgTypes[0]
+		mi := &file_pb_flowsrv_proto_msgTypes[0]
 		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
 		ms.StoreMessageInfo(mi)
 	}
@@ -92,7 +92,7 @@ func (x *CommandReq) String() string {
 func (*CommandReq) ProtoMessage() {}
 
 func (x *CommandReq) ProtoReflect() protoreflect.Message {
-	mi := &file_pb_rpcbff_proto_msgTypes[0]
+	mi := &file_pb_flowsrv_proto_msgTypes[0]
 	if protoimpl.UnsafeEnabled && x != nil {
 		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
 		if ms.LoadMessageInfo() == nil {
@@ -105,7 +105,7 @@ func (x *CommandReq) ProtoReflect() protoreflect.Message {
 
 // Deprecated: Use CommandReq.ProtoReflect.Descriptor instead.
 func (*CommandReq) Descriptor() ([]byte, []int) {
-	return file_pb_rpcbff_proto_rawDescGZIP(), []int{0}
+	return file_pb_flowsrv_proto_rawDescGZIP(), []int{0}
 }
 
 func (x *CommandReq) GetType() ConnectType {
@@ -135,7 +135,7 @@ type CommandResp struct {
 func (x *CommandResp) Reset() {
 	*x = CommandResp{}
 	if protoimpl.UnsafeEnabled {
-		mi := &file_pb_rpcbff_proto_msgTypes[1]
+		mi := &file_pb_flowsrv_proto_msgTypes[1]
 		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
 		ms.StoreMessageInfo(mi)
 	}
@@ -148,7 +148,7 @@ func (x *CommandResp) String() string {
 func (*CommandResp) ProtoMessage() {}
 
 func (x *CommandResp) ProtoReflect() protoreflect.Message {
-	mi := &file_pb_rpcbff_proto_msgTypes[1]
+	mi := &file_pb_flowsrv_proto_msgTypes[1]
 	if protoimpl.UnsafeEnabled && x != nil {
 		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
 		if ms.LoadMessageInfo() == nil {
@@ -161,7 +161,7 @@ func (x *CommandResp) ProtoReflect() protoreflect.Message {
 
 // Deprecated: Use CommandResp.ProtoReflect.Descriptor instead.
 func (*CommandResp) Descriptor() ([]byte, []int) {
-	return file_pb_rpcbff_proto_rawDescGZIP(), []int{1}
+	return file_pb_flowsrv_proto_rawDescGZIP(), []int{1}
 }
 
 func (x *CommandResp) GetCode() int64 {
@@ -185,63 +185,63 @@ func (x *CommandResp) GetData() *structpb.Struct {
 	return nil
 }
 
-var File_pb_rpcbff_proto protoreflect.FileDescriptor
-
-var file_pb_rpcbff_proto_rawDesc = []byte{
-	0x0a, 0x0f, 0x70, 0x62, 0x2f, 0x72, 0x70, 0x63, 0x62, 0x66, 0x66, 0x2e, 0x70, 0x72, 0x6f, 0x74,
-	0x6f, 0x12, 0x02, 0x70, 0x62, 0x1a, 0x1c, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72,
-	0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x73, 0x74, 0x72, 0x75, 0x63, 0x74, 0x2e, 0x70, 0x72,
-	0x6f, 0x74, 0x6f, 0x22, 0x54, 0x0a, 0x0a, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x52, 0x65,
-	0x71, 0x12, 0x23, 0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0e, 0x32,
-	0x0f, 0x2e, 0x70, 0x62, 0x2e, 0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x54, 0x79, 0x70, 0x65,
-	0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x12, 0x21, 0x0a, 0x0c, 0x61, 0x63, 0x63, 0x65, 0x73, 0x73,
-	0x5f, 0x74, 0x6f, 0x6b, 0x65, 0x6e, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0b, 0x61, 0x63,
-	0x63, 0x65, 0x73, 0x73, 0x54, 0x6f, 0x6b, 0x65, 0x6e, 0x22, 0x60, 0x0a, 0x0b, 0x43, 0x6f, 0x6d,
-	0x6d, 0x61, 0x6e, 0x64, 0x52, 0x65, 0x73, 0x70, 0x12, 0x12, 0x0a, 0x04, 0x63, 0x6f, 0x64, 0x65,
-	0x18, 0x01, 0x20, 0x01, 0x28, 0x03, 0x52, 0x04, 0x63, 0x6f, 0x64, 0x65, 0x12, 0x10, 0x0a, 0x03,
-	0x6d, 0x73, 0x67, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6d, 0x73, 0x67, 0x12, 0x2b,
-	0x0a, 0x04, 0x64, 0x61, 0x74, 0x61, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x17, 0x2e, 0x67,
-	0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x53,
-	0x74, 0x72, 0x75, 0x63, 0x74, 0x52, 0x04, 0x64, 0x61, 0x74, 0x61, 0x2a, 0x21, 0x0a, 0x0b, 0x43,
-	0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x54, 0x79, 0x70, 0x65, 0x12, 0x0a, 0x0a, 0x06, 0x50, 0x4c,
-	0x41, 0x59, 0x45, 0x52, 0x10, 0x00, 0x12, 0x06, 0x0a, 0x02, 0x43, 0x53, 0x10, 0x01, 0x32, 0x65,
-	0x0a, 0x06, 0x52, 0x70, 0x63, 0x62, 0x66, 0x66, 0x12, 0x2c, 0x0a, 0x07, 0x63, 0x6f, 0x6e, 0x6e,
-	0x65, 0x63, 0x74, 0x12, 0x0e, 0x2e, 0x70, 0x62, 0x2e, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64,
-	0x52, 0x65, 0x71, 0x1a, 0x0f, 0x2e, 0x70, 0x62, 0x2e, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64,
-	0x52, 0x65, 0x73, 0x70, 0x30, 0x01, 0x12, 0x2d, 0x0a, 0x0a, 0x64, 0x69, 0x73, 0x63, 0x6f, 0x6e,
-	0x6e, 0x65, 0x63, 0x74, 0x12, 0x0e, 0x2e, 0x70, 0x62, 0x2e, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e,
-	0x64, 0x52, 0x65, 0x71, 0x1a, 0x0f, 0x2e, 0x70, 0x62, 0x2e, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e,
-	0x64, 0x52, 0x65, 0x73, 0x70, 0x42, 0x06, 0x5a, 0x04, 0x2e, 0x2f, 0x70, 0x62, 0x62, 0x06, 0x70,
-	0x72, 0x6f, 0x74, 0x6f, 0x33,
+var File_pb_flowsrv_proto protoreflect.FileDescriptor
+
+var file_pb_flowsrv_proto_rawDesc = []byte{
+	0x0a, 0x10, 0x70, 0x62, 0x2f, 0x66, 0x6c, 0x6f, 0x77, 0x73, 0x72, 0x76, 0x2e, 0x70, 0x72, 0x6f,
+	0x74, 0x6f, 0x12, 0x02, 0x70, 0x62, 0x1a, 0x1c, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70,
+	0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x73, 0x74, 0x72, 0x75, 0x63, 0x74, 0x2e, 0x70,
+	0x72, 0x6f, 0x74, 0x6f, 0x22, 0x54, 0x0a, 0x0a, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x52,
+	0x65, 0x71, 0x12, 0x23, 0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0e,
+	0x32, 0x0f, 0x2e, 0x70, 0x62, 0x2e, 0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x54, 0x79, 0x70,
+	0x65, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x12, 0x21, 0x0a, 0x0c, 0x61, 0x63, 0x63, 0x65, 0x73,
+	0x73, 0x5f, 0x74, 0x6f, 0x6b, 0x65, 0x6e, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0b, 0x61,
+	0x63, 0x63, 0x65, 0x73, 0x73, 0x54, 0x6f, 0x6b, 0x65, 0x6e, 0x22, 0x60, 0x0a, 0x0b, 0x43, 0x6f,
+	0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x52, 0x65, 0x73, 0x70, 0x12, 0x12, 0x0a, 0x04, 0x63, 0x6f, 0x64,
+	0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x03, 0x52, 0x04, 0x63, 0x6f, 0x64, 0x65, 0x12, 0x10, 0x0a,
+	0x03, 0x6d, 0x73, 0x67, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6d, 0x73, 0x67, 0x12,
+	0x2b, 0x0a, 0x04, 0x64, 0x61, 0x74, 0x61, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x17, 0x2e,
+	0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e,
+	0x53, 0x74, 0x72, 0x75, 0x63, 0x74, 0x52, 0x04, 0x64, 0x61, 0x74, 0x61, 0x2a, 0x21, 0x0a, 0x0b,
+	0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x54, 0x79, 0x70, 0x65, 0x12, 0x0a, 0x0a, 0x06, 0x50,
+	0x4c, 0x41, 0x59, 0x45, 0x52, 0x10, 0x00, 0x12, 0x06, 0x0a, 0x02, 0x43, 0x53, 0x10, 0x01, 0x32,
+	0x66, 0x0a, 0x07, 0x46, 0x6c, 0x6f, 0x77, 0x73, 0x72, 0x76, 0x12, 0x2c, 0x0a, 0x07, 0x63, 0x6f,
+	0x6e, 0x6e, 0x65, 0x63, 0x74, 0x12, 0x0e, 0x2e, 0x70, 0x62, 0x2e, 0x43, 0x6f, 0x6d, 0x6d, 0x61,
+	0x6e, 0x64, 0x52, 0x65, 0x71, 0x1a, 0x0f, 0x2e, 0x70, 0x62, 0x2e, 0x43, 0x6f, 0x6d, 0x6d, 0x61,
+	0x6e, 0x64, 0x52, 0x65, 0x73, 0x70, 0x30, 0x01, 0x12, 0x2d, 0x0a, 0x0a, 0x64, 0x69, 0x73, 0x63,
+	0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x12, 0x0e, 0x2e, 0x70, 0x62, 0x2e, 0x43, 0x6f, 0x6d, 0x6d,
+	0x61, 0x6e, 0x64, 0x52, 0x65, 0x71, 0x1a, 0x0f, 0x2e, 0x70, 0x62, 0x2e, 0x43, 0x6f, 0x6d, 0x6d,
+	0x61, 0x6e, 0x64, 0x52, 0x65, 0x73, 0x70, 0x42, 0x06, 0x5a, 0x04, 0x2e, 0x2f, 0x70, 0x62, 0x62,
+	0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
 }
 
 var (
-	file_pb_rpcbff_proto_rawDescOnce sync.Once
-	file_pb_rpcbff_proto_rawDescData = file_pb_rpcbff_proto_rawDesc
+	file_pb_flowsrv_proto_rawDescOnce sync.Once
+	file_pb_flowsrv_proto_rawDescData = file_pb_flowsrv_proto_rawDesc
 )
 
-func file_pb_rpcbff_proto_rawDescGZIP() []byte {
-	file_pb_rpcbff_proto_rawDescOnce.Do(func() {
-		file_pb_rpcbff_proto_rawDescData = protoimpl.X.CompressGZIP(file_pb_rpcbff_proto_rawDescData)
+func file_pb_flowsrv_proto_rawDescGZIP() []byte {
+	file_pb_flowsrv_proto_rawDescOnce.Do(func() {
+		file_pb_flowsrv_proto_rawDescData = protoimpl.X.CompressGZIP(file_pb_flowsrv_proto_rawDescData)
 	})
-	return file_pb_rpcbff_proto_rawDescData
+	return file_pb_flowsrv_proto_rawDescData
 }
 
-var file_pb_rpcbff_proto_enumTypes = make([]protoimpl.EnumInfo, 1)
-var file_pb_rpcbff_proto_msgTypes = make([]protoimpl.MessageInfo, 2)
-var file_pb_rpcbff_proto_goTypes = []interface{}{
+var file_pb_flowsrv_proto_enumTypes = make([]protoimpl.EnumInfo, 1)
+var file_pb_flowsrv_proto_msgTypes = make([]protoimpl.MessageInfo, 2)
+var file_pb_flowsrv_proto_goTypes = []interface{}{
 	(ConnectType)(0),        // 0: pb.ConnectType
 	(*CommandReq)(nil),      // 1: pb.CommandReq
 	(*CommandResp)(nil),     // 2: pb.CommandResp
 	(*structpb.Struct)(nil), // 3: google.protobuf.Struct
 }
-var file_pb_rpcbff_proto_depIdxs = []int32{
+var file_pb_flowsrv_proto_depIdxs = []int32{
 	0, // 0: pb.CommandReq.type:type_name -> pb.ConnectType
 	3, // 1: pb.CommandResp.data:type_name -> google.protobuf.Struct
-	1, // 2: pb.Rpcbff.connect:input_type -> pb.CommandReq
-	1, // 3: pb.Rpcbff.disconnect:input_type -> pb.CommandReq
-	2, // 4: pb.Rpcbff.connect:output_type -> pb.CommandResp
-	2, // 5: pb.Rpcbff.disconnect:output_type -> pb.CommandResp
+	1, // 2: pb.Flowsrv.connect:input_type -> pb.CommandReq
+	1, // 3: pb.Flowsrv.disconnect:input_type -> pb.CommandReq
+	2, // 4: pb.Flowsrv.connect:output_type -> pb.CommandResp
+	2, // 5: pb.Flowsrv.disconnect:output_type -> pb.CommandResp
 	4, // [4:6] is the sub-list for method output_type
 	2, // [2:4] is the sub-list for method input_type
 	2, // [2:2] is the sub-list for extension type_name
@@ -249,13 +249,13 @@ var file_pb_rpcbff_proto_depIdxs = []int32{
 	0, // [0:2] is the sub-list for field type_name
 }
 
-func init() { file_pb_rpcbff_proto_init() }
-func file_pb_rpcbff_proto_init() {
-	if File_pb_rpcbff_proto != nil {
+func init() { file_pb_flowsrv_proto_init() }
+func file_pb_flowsrv_proto_init() {
+	if File_pb_flowsrv_proto != nil {
 		return
 	}
 	if !protoimpl.UnsafeEnabled {
-		file_pb_rpcbff_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} {
+		file_pb_flowsrv_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} {
 			switch v := v.(*CommandReq); i {
 			case 0:
 				return &v.state
@@ -267,7 +267,7 @@ func file_pb_rpcbff_proto_init() {
 				return nil
 			}
 		}
-		file_pb_rpcbff_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} {
+		file_pb_flowsrv_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} {
 			switch v := v.(*CommandResp); i {
 			case 0:
 				return &v.state
@@ -284,19 +284,19 @@ func file_pb_rpcbff_proto_init() {
 	out := protoimpl.TypeBuilder{
 		File: protoimpl.DescBuilder{
 			GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
-			RawDescriptor: file_pb_rpcbff_proto_rawDesc,
+			RawDescriptor: file_pb_flowsrv_proto_rawDesc,
 			NumEnums:      1,
 			NumMessages:   2,
 			NumExtensions: 0,
 			NumServices:   1,
 		},
-		GoTypes:           file_pb_rpcbff_proto_goTypes,
-		DependencyIndexes: file_pb_rpcbff_proto_depIdxs,
-		EnumInfos:         file_pb_rpcbff_proto_enumTypes,
-		MessageInfos:      file_pb_rpcbff_proto_msgTypes,
+		GoTypes:           file_pb_flowsrv_proto_goTypes,
+		DependencyIndexes: file_pb_flowsrv_proto_depIdxs,
+		EnumInfos:         file_pb_flowsrv_proto_enumTypes,
+		MessageInfos:      file_pb_flowsrv_proto_msgTypes,
 	}.Build()
-	File_pb_rpcbff_proto = out.File
-	file_pb_rpcbff_proto_rawDesc = nil
-	file_pb_rpcbff_proto_goTypes = nil
-	file_pb_rpcbff_proto_depIdxs = nil
+	File_pb_flowsrv_proto = out.File
+	file_pb_flowsrv_proto_rawDesc = nil
+	file_pb_flowsrv_proto_goTypes = nil
+	file_pb_flowsrv_proto_depIdxs = nil
 }

+ 1 - 1
bff/rpcbff/rpc/pb/rpcbff.proto → flowsrv/rpc/pb/flowsrv.proto

@@ -22,7 +22,7 @@ message CommandResp {
   google.protobuf.Struct data = 3;
 }
 
-service Rpcbff {
+service Flowsrv {
   rpc connect(CommandReq) returns (stream CommandResp);
   rpc disconnect(CommandReq) returns (CommandResp);
 }

+ 169 - 0
flowsrv/rpc/pb/flowsrv_grpc.pb.go

@@ -0,0 +1,169 @@
+// Code generated by protoc-gen-go-grpc. DO NOT EDIT.
+// versions:
+// - protoc-gen-go-grpc v1.2.0
+// - protoc             v3.19.4
+// source: pb/flowsrv.proto
+
+package pb
+
+import (
+	context "context"
+	grpc "google.golang.org/grpc"
+	codes "google.golang.org/grpc/codes"
+	status "google.golang.org/grpc/status"
+)
+
+// This is a compile-time assertion to ensure that this generated file
+// is compatible with the grpc package it is being compiled against.
+// Requires gRPC-Go v1.32.0 or later.
+const _ = grpc.SupportPackageIsVersion7
+
+// FlowsrvClient is the client API for Flowsrv service.
+//
+// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream.
+type FlowsrvClient interface {
+	Connect(ctx context.Context, in *CommandReq, opts ...grpc.CallOption) (Flowsrv_ConnectClient, error)
+	Disconnect(ctx context.Context, in *CommandReq, opts ...grpc.CallOption) (*CommandResp, error)
+}
+
+type flowsrvClient struct {
+	cc grpc.ClientConnInterface
+}
+
+func NewFlowsrvClient(cc grpc.ClientConnInterface) FlowsrvClient {
+	return &flowsrvClient{cc}
+}
+
+func (c *flowsrvClient) Connect(ctx context.Context, in *CommandReq, opts ...grpc.CallOption) (Flowsrv_ConnectClient, error) {
+	stream, err := c.cc.NewStream(ctx, &Flowsrv_ServiceDesc.Streams[0], "/pb.Flowsrv/connect", opts...)
+	if err != nil {
+		return nil, err
+	}
+	x := &flowsrvConnectClient{stream}
+	if err := x.ClientStream.SendMsg(in); err != nil {
+		return nil, err
+	}
+	if err := x.ClientStream.CloseSend(); err != nil {
+		return nil, err
+	}
+	return x, nil
+}
+
+type Flowsrv_ConnectClient interface {
+	Recv() (*CommandResp, error)
+	grpc.ClientStream
+}
+
+type flowsrvConnectClient struct {
+	grpc.ClientStream
+}
+
+func (x *flowsrvConnectClient) Recv() (*CommandResp, error) {
+	m := new(CommandResp)
+	if err := x.ClientStream.RecvMsg(m); err != nil {
+		return nil, err
+	}
+	return m, nil
+}
+
+func (c *flowsrvClient) Disconnect(ctx context.Context, in *CommandReq, opts ...grpc.CallOption) (*CommandResp, error) {
+	out := new(CommandResp)
+	err := c.cc.Invoke(ctx, "/pb.Flowsrv/disconnect", in, out, opts...)
+	if err != nil {
+		return nil, err
+	}
+	return out, nil
+}
+
+// FlowsrvServer is the server API for Flowsrv service.
+// All implementations must embed UnimplementedFlowsrvServer
+// for forward compatibility
+type FlowsrvServer interface {
+	Connect(*CommandReq, Flowsrv_ConnectServer) error
+	Disconnect(context.Context, *CommandReq) (*CommandResp, error)
+	mustEmbedUnimplementedFlowsrvServer()
+}
+
+// UnimplementedFlowsrvServer must be embedded to have forward compatible implementations.
+type UnimplementedFlowsrvServer struct {
+}
+
+func (UnimplementedFlowsrvServer) Connect(*CommandReq, Flowsrv_ConnectServer) error {
+	return status.Errorf(codes.Unimplemented, "method Connect not implemented")
+}
+func (UnimplementedFlowsrvServer) Disconnect(context.Context, *CommandReq) (*CommandResp, error) {
+	return nil, status.Errorf(codes.Unimplemented, "method Disconnect not implemented")
+}
+func (UnimplementedFlowsrvServer) mustEmbedUnimplementedFlowsrvServer() {}
+
+// UnsafeFlowsrvServer may be embedded to opt out of forward compatibility for this service.
+// Use of this interface is not recommended, as added methods to FlowsrvServer will
+// result in compilation errors.
+type UnsafeFlowsrvServer interface {
+	mustEmbedUnimplementedFlowsrvServer()
+}
+
+func RegisterFlowsrvServer(s grpc.ServiceRegistrar, srv FlowsrvServer) {
+	s.RegisterService(&Flowsrv_ServiceDesc, srv)
+}
+
+func _Flowsrv_Connect_Handler(srv interface{}, stream grpc.ServerStream) error {
+	m := new(CommandReq)
+	if err := stream.RecvMsg(m); err != nil {
+		return err
+	}
+	return srv.(FlowsrvServer).Connect(m, &flowsrvConnectServer{stream})
+}
+
+type Flowsrv_ConnectServer interface {
+	Send(*CommandResp) error
+	grpc.ServerStream
+}
+
+type flowsrvConnectServer struct {
+	grpc.ServerStream
+}
+
+func (x *flowsrvConnectServer) Send(m *CommandResp) error {
+	return x.ServerStream.SendMsg(m)
+}
+
+func _Flowsrv_Disconnect_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
+	in := new(CommandReq)
+	if err := dec(in); err != nil {
+		return nil, err
+	}
+	if interceptor == nil {
+		return srv.(FlowsrvServer).Disconnect(ctx, in)
+	}
+	info := &grpc.UnaryServerInfo{
+		Server:     srv,
+		FullMethod: "/pb.Flowsrv/disconnect",
+	}
+	handler := func(ctx context.Context, req interface{}) (interface{}, error) {
+		return srv.(FlowsrvServer).Disconnect(ctx, req.(*CommandReq))
+	}
+	return interceptor(ctx, in, info, handler)
+}
+
+// Flowsrv_ServiceDesc is the grpc.ServiceDesc for Flowsrv service.
+// It's only intended for direct use with grpc.RegisterService,
+// and not to be introspected or modified (even as a copy)
+var Flowsrv_ServiceDesc = grpc.ServiceDesc{
+	ServiceName: "pb.Flowsrv",
+	HandlerType: (*FlowsrvServer)(nil),
+	Methods: []grpc.MethodDesc{
+		{
+			MethodName: "disconnect",
+			Handler:    _Flowsrv_Disconnect_Handler,
+		},
+	},
+	Streams: []grpc.StreamDesc{
+		{
+			StreamName:    "connect",
+			Handler:       _Flowsrv_Connect_Handler,
+			ServerStreams: true,
+		},
+	},
+	Metadata: "pb/flowsrv.proto",
+}

+ 2 - 0
go.sum

@@ -204,6 +204,8 @@ github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiu
 github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
 github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM=
 github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
+github.com/gomodule/redigo v1.8.8 h1:f6cXq6RRfiyrOJEV7p3JhLDlmawGBVBBP1MggY8Mo4E=
+github.com/gomodule/redigo v1.8.8/go.mod h1:7ArFNvsTjH8GMMzB4uy1snslv2BwmginuMs06a1uzZE=
 github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
 github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
 github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=

+ 1 - 0
inner/pb/inner.proto → inner/rpc/pb/inner.proto

@@ -4,5 +4,6 @@ option go_package = "./pb";
 
 package pb;
 
+
 service Inner {
 }