//@File     flowmgr.go
//@Time     2022/5/30
//@Author   #Suyghur,

package mgr

import (
	treemap "github.com/liyue201/gostl/ds/map"
	"sync"
	"time"
	"ylink/comm/result"
	"ylink/core/inner/rpc/inner"
	"ylink/flowsrv/rpc/internal/model"
	"ylink/flowsrv/rpc/pb"
)

type flowManager struct {
	flowMap *treemap.Map
}

var (
	instance *flowManager
	once     sync.Once
)

func GetFlowMgrInstance() *flowManager {
	once.Do(func() {
		instance = &flowManager{
			flowMap: treemap.New(treemap.WithGoroutineSafe()),
		}
	})
	return instance
}

func (manager *flowManager) Register(flow *model.Flow) {
	//go registerWorker(flow)
	go manager.registerFlow(flow)
	manager.flowMap.Insert(flow.User.Uid, flow)
}

func (manager *flowManager) registerFlow(flow *model.Flow) {
	go manager.subscribeRmq(flow)
	for {
		select {
		case <-flow.Stream.Context().Done():
			if manager.Has(flow.User.Uid) {
				flow.Logger.Infof("flowstream was disconnected abnormally")
				manager.UnRegister(flow.User.Uid)
				flow.SvcCtx.InnerRpc.NotifyUserOffline(flow.Ctx, &inner.NotifyUserStatusReq{
					Type:   flow.User.Type,
					Uid:    flow.User.Uid,
					GameId: flow.User.GameId,
				})
			}
			flow.EndFlow <- 1
			return
		case msg, open := <-flow.Message:
			if open {
				flow.Stream.Send(&pb.CommandResp{
					Code: result.Ok,
					Msg:  "success",
					Data: []byte(msg),
				})
			} else {
				flow.Logger.Error("message channel is close")
				return
			}
		}
	}
}

func (manager *flowManager) subscribeRmq(flow *model.Flow) {
	for {
		select {
		case <-flow.Stream.Context().Done():
			flow.Logger.Infof("unsubscribe rmq...")
			return
		default:
			resultCmd := flow.SvcCtx.RedisClient.BRPop(flow.Ctx, 10*time.Second, flow.User.Uid)
			if message, err := resultCmd.Result(); err != nil {
				flow.Logger.Errorf("get message from redis err: %v", err)
			} else {
				flow.Message <- message[1]
			}
		}
	}
}

func (manager *flowManager) Get(uid string) *model.Flow {
	return manager.flowMap.Get(uid).(*model.Flow)
}

func (manager *flowManager) UnRegister(uid string) {
	if manager.flowMap.Contains(uid) {
		flow := manager.Get(uid)
		close(flow.Message)
		//flow.EndRmq <- 0
		manager.flowMap.Erase(uid)
	}
}

func (manager *flowManager) Has(uid string) bool {
	return manager.flowMap.Contains(uid)
}