From 7331cfea51e314f5029aecf9be224d22bfd93880 Mon Sep 17 00:00:00 2001 From: hy001 Date: Thu, 16 Apr 2026 21:21:32 +0800 Subject: [PATCH] feat: async quit room compensation --- .../app/command/user/LiveQuitRoomExe.java | 142 ++++++++++-------- .../repository/LiveMicrophoneGatewayImpl.java | 70 ++++++--- 2 files changed, 128 insertions(+), 84 deletions(-) diff --git a/rc-service/rc-service-live/live-application/src/main/java/com/red/circle/live/app/command/user/LiveQuitRoomExe.java b/rc-service/rc-service-live/live-application/src/main/java/com/red/circle/live/app/command/user/LiveQuitRoomExe.java index 29a0252..7d0c441 100644 --- a/rc-service/rc-service-live/live-application/src/main/java/com/red/circle/live/app/command/user/LiveQuitRoomExe.java +++ b/rc-service/rc-service-live/live-application/src/main/java/com/red/circle/live/app/command/user/LiveQuitRoomExe.java @@ -1,31 +1,34 @@ -package com.red.circle.live.app.command.user; - -import com.alibaba.fastjson.JSON; -import com.red.circle.common.business.dto.cmd.app.AppRoomIdCmd; -import com.red.circle.component.redis.service.RedisService; +package com.red.circle.live.app.command.user; + +import com.alibaba.fastjson.JSON; +import com.red.circle.common.business.dto.cmd.app.AppRoomIdCmd; +import com.red.circle.component.redis.service.RedisService; import com.red.circle.framework.dto.ResultResponse; import com.red.circle.live.domain.gateway.LiveMicrophoneGateway; import com.red.circle.live.domain.live.LiveMicrophone; import com.red.circle.live.domain.live.LiveMusicStatus; import com.red.circle.live.infra.database.cache.service.LiveMicCacheService; import com.red.circle.live.infra.database.cache.service.LiveMusicHeartbeatService; -import com.red.circle.other.inner.endpoint.live.ActiveVoiceRoomClient; -import com.red.circle.other.inner.model.dto.live.ActiveVoiceRoomCO; -import com.red.circle.tool.core.collection.CollectionUtils; -import java.util.List; -import java.util.Objects; -import java.util.concurrent.TimeUnit; -import lombok.RequiredArgsConstructor; -import org.springframework.stereotype.Component; +import com.red.circle.other.inner.endpoint.live.ActiveVoiceRoomClient; +import com.red.circle.other.inner.model.dto.live.ActiveVoiceRoomCO; +import com.red.circle.tool.core.collection.CollectionUtils; +import com.red.circle.tool.core.thread.ThreadPoolManager; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.TimeUnit; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; /** * 退出房间. * * @author pengliang on 2023/12/12 */ -@Component -@RequiredArgsConstructor -public class LiveQuitRoomExe { +@Slf4j +@Component +@RequiredArgsConstructor +public class LiveQuitRoomExe { private final LiveMicCacheService liveMicCacheService; private final LiveMicrophoneGateway liveMicrophoneGateway; @@ -36,46 +39,67 @@ public class LiveQuitRoomExe { private static final String EMPTY_ROOM_CACHE_KEY = "empty_room_cache:"; private static final int CACHE_EXPIRE_MINUTES = 10; - public void execute(AppRoomIdCmd cmd) { - // 移除在线用户 - liveMicrophoneGateway.removeRoomOnlineUser( - cmd.getRoomId(), - cmd.requiredReqUserId() - ); - - // 下麦克风 - List liveMicrophones = liveMicCacheService.listLiveMicrophone(cmd.getRoomId(), - cmd.requiredReqUserId()); - if (CollectionUtils.isNotEmpty(liveMicrophones)) { - liveMicrophones.forEach(mic -> - liveMicrophoneGateway.tryMicOpsLock(cmd.getRoomId(), mic.getMicIndex(), () -> { - liveMicCacheService.goDown(cmd.getRoomId(), mic.getMicIndex()); - liveMicrophoneGateway.micUserChangeNotice(cmd.getRoomId()); - }) - ); - } - - // 移除播放音乐 - LiveMusicStatus musicStatus = liveMusicHeartbeatService.getMusicUser(cmd.getRoomId()); - if (Objects.nonNull(musicStatus) - && Objects.equals(musicStatus.getUserId(), cmd.requiredReqUserId())) { - liveMusicHeartbeatService.removeMickUser(cmd.getRoomId()); - } - - //将没人的房间存入缓存10分钟 - try { - ResultResponse noBodyRoom = activeVoiceRoomClient.createNoBodyRoom(cmd.getRoomId()); - ActiveVoiceRoomCO body = noBodyRoom.getBody(); - if (Objects.nonNull(body)) { - // 缓存空房间信息,设置10分钟过期 - String cacheKey = EMPTY_ROOM_CACHE_KEY + cmd.getRoomId(); - redisService.setIfAbsent(cacheKey, JSON.toJSONString(body), CACHE_EXPIRE_MINUTES, TimeUnit.MINUTES); - } - } catch (Exception e) { - //ignore - } - - - } - -} + public void execute(AppRoomIdCmd cmd) { + Long roomId = cmd.getRoomId(); + Long userId = cmd.requiredReqUserId(); + + // 同步删除房间本地在线状态,远端补偿交给后台线程处理 + liveMicrophoneGateway.removeRoomOnlineUser( + roomId, + userId + ); + + // 同步下掉当前用户占用的本地麦位,避免客户端继续感知到在房状态 + List liveMicrophones = liveMicCacheService.listLiveMicrophone(roomId, userId); + boolean micChanged = false; + if (CollectionUtils.isNotEmpty(liveMicrophones)) { + for (LiveMicrophone mic : liveMicrophones) { + final boolean[] removed = {false}; + liveMicrophoneGateway.tryMicOpsLock(roomId, mic.getMicIndex(), () -> { + liveMicCacheService.goDown(roomId, mic.getMicIndex()); + removed[0] = true; + }); + micChanged = micChanged || removed[0]; + } + } + + // 同步清理本地音乐状态 + LiveMusicStatus musicStatus = liveMusicHeartbeatService.getMusicUser(roomId); + if (Objects.nonNull(musicStatus) + && Objects.equals(musicStatus.getUserId(), userId)) { + liveMusicHeartbeatService.removeMickUser(roomId); + } + + boolean finalMicChanged = micChanged; + ThreadPoolManager.getInstance().execute(() -> { + if (finalMicChanged) { + try { + liveMicrophoneGateway.micUserChangeNotice(roomId); + } catch (Exception e) { + log.error("quitRoom micUserChangeNotice error roomId={}, userId={}", roomId, userId, e); + } + } + + cacheEmptyRoom(roomId, userId); + }); + } + + private void cacheEmptyRoom(Long roomId, Long userId) { + try { + ResultResponse noBodyRoom = activeVoiceRoomClient.createNoBodyRoom(roomId); + ActiveVoiceRoomCO body = noBodyRoom.getBody(); + if (Objects.nonNull(body)) { + String cacheKey = EMPTY_ROOM_CACHE_KEY + roomId; + redisService.setIfAbsent( + cacheKey, + JSON.toJSONString(body), + CACHE_EXPIRE_MINUTES, + TimeUnit.MINUTES + ); + } + } catch (Exception e) { + log.error("quitRoom cacheEmptyRoom error roomId={}, userId={}", roomId, userId, e); + } + } + +} diff --git a/rc-service/rc-service-live/live-infrastructure/src/main/java/com/red/circle/live/infra/repository/LiveMicrophoneGatewayImpl.java b/rc-service/rc-service-live/live-infrastructure/src/main/java/com/red/circle/live/infra/repository/LiveMicrophoneGatewayImpl.java index 7ae23b0..34dd20e 100644 --- a/rc-service/rc-service-live/live-infrastructure/src/main/java/com/red/circle/live/infra/repository/LiveMicrophoneGatewayImpl.java +++ b/rc-service/rc-service-live/live-infrastructure/src/main/java/com/red/circle/live/infra/repository/LiveMicrophoneGatewayImpl.java @@ -647,23 +647,13 @@ public class LiveMicrophoneGatewayImpl implements LiveMicrophoneGateway { } - @Override - public void removeRoomOnlineUser(Long roomId, Long userId) { - //刷新在线用户信息 - userOnlineClient.updateStatusAndSessionId(userId, UserOnlineStatusEnum.IDLE,0L); - - this.refreshUserProcess(roomId, liveMicUserCacheService.removeUser(roomId, userId)); - - //删除用户的心动值 - List liveHeartbeats = liveHeartbeatClient.listLiveHeartbeat(roomId).getBody(); - if(CollectionUtils.isNotEmpty(liveHeartbeats)){ - liveHeartbeats= liveHeartbeats.stream() - .filter(item -> !Objects.equals(item.getId(),userId)).collect( - Collectors.toList()); - liveHeartbeatClient.create(roomId, liveHeartbeats); - } - - } + @Override + public void removeRoomOnlineUser(Long roomId, Long userId) { + LiveRoomUserRefresh liveRoomUserRefresh = liveMicUserCacheService.removeUser(roomId, userId); + ThreadPoolManager.getInstance().execute( + () -> compensateRemoveRoomOnlineUser(roomId, userId, liveRoomUserRefresh) + ); + } @Override public void addRoomOnlineUser(Long roomId, LiveRoomUser liveRoomUser) { @@ -785,11 +775,41 @@ public class LiveMicrophoneGatewayImpl implements LiveMicrophoneGateway { } - public Map getLiveHeartbeatMap(List list){ - if (CollectionUtils.isEmpty(list)) { - return new HashMap<>(); - } - return list.stream().collect(Collectors.toMap(LiveHeartbeatCmd::getId,LiveHeartbeatCmd::getHeartbeatVal)); - } - -} + public Map getLiveHeartbeatMap(List list){ + if (CollectionUtils.isEmpty(list)) { + return new HashMap<>(); + } + return list.stream().collect(Collectors.toMap(LiveHeartbeatCmd::getId,LiveHeartbeatCmd::getHeartbeatVal)); + } + + private void compensateRemoveRoomOnlineUser(Long roomId, Long userId, + LiveRoomUserRefresh liveRoomUserRefresh) { + try { + userOnlineClient.updateStatusAndSessionId(userId, UserOnlineStatusEnum.IDLE, 0L); + } catch (Exception e) { + log.error("removeRoomOnlineUser updateStatusAndSessionId error roomId={}, userId={}", roomId, + userId, e); + } + + try { + refreshUserProcess(roomId, liveRoomUserRefresh); + } catch (Exception e) { + log.error("removeRoomOnlineUser refreshUserProcess error roomId={}, userId={}", roomId, + userId, e); + } + + try { + List liveHeartbeats = liveHeartbeatClient.listLiveHeartbeat(roomId).getBody(); + if (CollectionUtils.isNotEmpty(liveHeartbeats)) { + liveHeartbeats = liveHeartbeats.stream() + .filter(item -> !Objects.equals(item.getId(), userId)) + .collect(Collectors.toList()); + liveHeartbeatClient.create(roomId, liveHeartbeats); + } + } catch (Exception e) { + log.error("removeRoomOnlineUser refreshHeartbeat error roomId={}, userId={}", roomId, userId, + e); + } + } + +}