server.go 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281
  1. package core
  2. import (
  3. "call_center/call/rpc/internal/role"
  4. "call_center/call/rpc/pb"
  5. public "call_center/public/common"
  6. "container/list"
  7. "fmt"
  8. mapset "github.com/deckarep/golang-set"
  9. "log"
  10. "time"
  11. )
  12. var instance *Server
  13. func ServerInit(conf Config) {
  14. instance = new(Server)
  15. // 加载配置
  16. instance.Config = conf
  17. // 等待队列
  18. instance.waitQueue = public.NewQueue()
  19. log.Println("<Server.ServerInit> end, conf:", conf)
  20. }
  21. func GetServer() *Server {
  22. return instance
  23. }
  24. type Server struct {
  25. Condition
  26. Communication
  27. Config
  28. // 玩家流管理类
  29. playerMgr public.ObjMgr // pId -> Player
  30. // 客服流管理类
  31. serviceMgr public.ObjMgr // sId -> Service
  32. // 玩家 -> 客服 映射管理类
  33. p2sMgr public.ObjMgr // pId -> sId
  34. // 客服 -> 玩家 映射管理类
  35. s2pMgr public.ObjMgr // sId -> set(pId)
  36. // 等待队列
  37. waitQueue *public.SyncQueue
  38. }
  39. func (sel *Server) MakeId(i interface{}) string {
  40. now := time.Now().Unix()
  41. objId := fmt.Sprintf("%d%d", now, i)
  42. return objId
  43. }
  44. func (sel *Server) GetServiceStream(scId string) interface{} {
  45. service := sel.serviceMgr.GetObj(scId)
  46. if service != nil {
  47. return service.(*role.Service).Stream
  48. }
  49. return nil
  50. }
  51. func (sel *Server) GetPlayerStream(pcId string) interface{} {
  52. player := sel.playerMgr.GetObj(pcId)
  53. if player != nil {
  54. return player.(*role.Player).Stream
  55. }
  56. return nil
  57. }
  58. func (sel *Server) GetPlayer(pId string) *role.Player {
  59. p := sel.playerMgr.GetObj(pId)
  60. if p != nil {
  61. return p.(*role.Player)
  62. }
  63. return nil
  64. }
  65. func (sel *Server) GetService(sId string) *role.Service {
  66. s := sel.serviceMgr.GetObj(sId)
  67. if s != nil {
  68. return s.(*role.Service)
  69. }
  70. return nil
  71. }
  72. func (sel *Server) GetServiceByPlayerId(pId string) *role.Service {
  73. scId := sel.p2sMgr.GetObj(pId)
  74. if scId == nil {
  75. return nil
  76. }
  77. return sel.GetService(scId.(string))
  78. }
  79. func (sel *Server) GetPlayersByServiceId(sid string) []interface{} {
  80. var res []interface{}
  81. setPid := sel.s2pMgr.GetObj(sid)
  82. if setPid != nil {
  83. ss := setPid.(mapset.Set)
  84. return ss.ToSlice()
  85. }
  86. return res
  87. }
  88. func (sel *Server) GetAllService() []interface{} {
  89. return sel.serviceMgr.GetObjValues()
  90. }
  91. func (sel *Server) ConfirmService(scId string, pcId string) bool {
  92. curScId := sel.p2sMgr.GetObj(pcId)
  93. if curScId != nil {
  94. if curScId == scId {
  95. // 当前客服对应玩家信息一致
  96. return true
  97. }
  98. log.Printf("<Server.ConfirmService> player<%v> already in service by %v \n", pcId, curScId)
  99. return false
  100. }
  101. sel.p2sMgr.Register(pcId, scId)
  102. obj := sel.s2pMgr.GetObj(scId)
  103. idSet := obj.(mapset.Set)
  104. idSet.Add(pcId)
  105. log.Printf("<Server.ConfirmService> pcId:{%v}, scId:{%v}, idSet:{%v} \n", pcId, scId, idSet.String())
  106. return true
  107. }
  108. func (sel *Server) ConnPlayer(pcId string, stream interface{}) *role.Player {
  109. // 玩家连接
  110. /*
  111. 注册玩家信息
  112. */
  113. pInfo := new(role.Player)
  114. pInfo.Id = pcId
  115. pInfo.Stream = stream
  116. sel.playerMgr.Register(pcId, pInfo)
  117. return pInfo
  118. }
  119. func (sel *Server) KickPlayer(pId string, reason int32) {
  120. log.Printf("<Server.KickPlayer>, id:%s, reason:%d", pId, reason)
  121. player := sel.playerMgr.GetObj(pId)
  122. if player != nil {
  123. p := player.(*role.Player)
  124. p.StopChan(reason)
  125. log.Println("<Server.KickPlayer> <- begin kick player, id:", pId)
  126. <-p.WaitLogOut()
  127. log.Println("<Server.KickPlayer> -> end kick player, id:", pId)
  128. }
  129. }
  130. func (sel *Server) KickService(sId string, reason int32) {
  131. log.Printf("<Server.KickService>, id:%s, reason:%d", sId, reason)
  132. service := sel.serviceMgr.GetObj(sId)
  133. if service != nil {
  134. s := service.(*role.Service)
  135. s.StopChan(reason)
  136. log.Println("<Server.KickService> <- begin kick service, id:", sId)
  137. <-s.WaitLogOut()
  138. log.Println("<Server.KickService> -> end kick service, id:", sId)
  139. }
  140. }
  141. func (sel *Server) DisConnPlayer(pcId string) {
  142. // 玩家连接关闭
  143. player := sel.playerMgr.GetObj(pcId)
  144. /*
  145. 删除wait queue
  146. */
  147. sel.waitQueue.Remove(player)
  148. /*
  149. 删除玩家info
  150. */
  151. sel.playerMgr.DeleteObj(pcId)
  152. /*
  153. 删除对接该玩家的客服映射
  154. */
  155. scId := sel.p2sMgr.GetObj(pcId)
  156. if scId != nil {
  157. obj := sel.s2pMgr.GetObj(scId)
  158. if obj != nil {
  159. idSet := obj.(mapset.Set)
  160. idSet.Remove(pcId)
  161. log.Printf("<Server.DisConnPlayer> disConnPcId: {%v} curScId: {%v} -> pcIds: {%v} \n", pcId, scId, idSet.String())
  162. }
  163. }
  164. sel.p2sMgr.DeleteObj(pcId)
  165. player.(*role.Player).Final()
  166. log.Printf("<Server.DisConnPlayer> rmPcId: %v, scId: %v", pcId, scId)
  167. }
  168. func (sel *Server) ConnService(scId string, stream interface{}) *role.Service {
  169. // 客服连接
  170. /*
  171. 注册客服stream
  172. */
  173. service := new(role.Service)
  174. service.Id = scId
  175. service.Stream = stream
  176. sel.serviceMgr.Register(scId, service)
  177. /*
  178. * 创建客服映射
  179. */
  180. idSet := mapset.NewSet()
  181. sel.s2pMgr.Register(scId, idSet)
  182. return service
  183. }
  184. func (sel *Server) DisConnService(scId string) []interface{} {
  185. // 客服连接关闭
  186. service := sel.serviceMgr.GetObj(scId)
  187. sel.serviceMgr.DeleteObj(scId)
  188. /*
  189. 删除该客服对接的所有玩家
  190. */
  191. // 遍历字典,获取该scId所服务的pcId
  192. var delList []interface{}
  193. obj := sel.s2pMgr.GetObj(scId)
  194. if obj == nil {
  195. return delList
  196. }
  197. idSet := obj.(mapset.Set)
  198. it := idSet.Iterator()
  199. for itId := range it.C {
  200. sel.p2sMgr.DeleteObj(itId)
  201. delList = append(delList, itId)
  202. }
  203. sel.s2pMgr.DeleteObj(scId)
  204. service.(*role.Service).Final()
  205. log.Printf("<Server.DisConnService> scId: %v, selId: %v \n", scId, delList)
  206. return delList
  207. }
  208. func (sel *Server) AddWaitQueue(player *role.Player) {
  209. queueLen := sel.waitQueue.PushBack(player)
  210. time.AfterFunc(time.Second*time.Duration(sel.WaitConnServiceLimit), sel.verifyWaitQueue) // 初始化一个校验等待队列计时器
  211. log.Printf("<Server.AddWaitQueue> playerId:%s, wait sec:%d, queue_len:%d \n",
  212. player.Id, sel.WaitConnServiceLimit, queueLen)
  213. }
  214. func (sel *Server) RemoveWaitQueue(player *role.Player) interface{} {
  215. res := sel.waitQueue.Remove(player)
  216. log.Printf("<Server.RemoveWaitQueue> playerId:%s, queue_len:%d \n", player.Id, sel.waitQueue.Len())
  217. return res
  218. }
  219. func (sel *Server) verifyWaitQueue() {
  220. log.Println("<Server.verifyWaitQueue> begin, queue len:", sel.waitQueue.Len())
  221. back := sel.waitQueue.Back()
  222. if back == nil {
  223. return
  224. }
  225. e := back.(*list.Element)
  226. player := e.Value.(*role.Player)
  227. nowTimeStamp := time.Now().Unix()
  228. offset := nowTimeStamp - player.LoginTimeStamp
  229. if offset >= sel.WaitConnServiceLimit {
  230. log.Println("<Server.verifyWaitQueue> wait overtime, kick out id:", player.Id)
  231. // sel.waitQueue.RemoveE(e)
  232. sel.KickPlayer(player.Id, int32(pb.ErrorReason_PLAYER_WAIT_QUEUE_OVERTIME))
  233. }
  234. log.Println("<Server.verifyWaitQueue> end, queue len:", sel.waitQueue.Len())
  235. }