feat: async quit room compensation

This commit is contained in:
hy001 2026-04-16 21:21:32 +08:00
parent 25c4152325
commit 7331cfea51
2 changed files with 128 additions and 84 deletions

View File

@ -1,31 +1,34 @@
package com.red.circle.live.app.command.user; package com.red.circle.live.app.command.user;
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSON;
import com.red.circle.common.business.dto.cmd.app.AppRoomIdCmd; import com.red.circle.common.business.dto.cmd.app.AppRoomIdCmd;
import com.red.circle.component.redis.service.RedisService; import com.red.circle.component.redis.service.RedisService;
import com.red.circle.framework.dto.ResultResponse; import com.red.circle.framework.dto.ResultResponse;
import com.red.circle.live.domain.gateway.LiveMicrophoneGateway; import com.red.circle.live.domain.gateway.LiveMicrophoneGateway;
import com.red.circle.live.domain.live.LiveMicrophone; import com.red.circle.live.domain.live.LiveMicrophone;
import com.red.circle.live.domain.live.LiveMusicStatus; import com.red.circle.live.domain.live.LiveMusicStatus;
import com.red.circle.live.infra.database.cache.service.LiveMicCacheService; import com.red.circle.live.infra.database.cache.service.LiveMicCacheService;
import com.red.circle.live.infra.database.cache.service.LiveMusicHeartbeatService; import com.red.circle.live.infra.database.cache.service.LiveMusicHeartbeatService;
import com.red.circle.other.inner.endpoint.live.ActiveVoiceRoomClient; import com.red.circle.other.inner.endpoint.live.ActiveVoiceRoomClient;
import com.red.circle.other.inner.model.dto.live.ActiveVoiceRoomCO; import com.red.circle.other.inner.model.dto.live.ActiveVoiceRoomCO;
import com.red.circle.tool.core.collection.CollectionUtils; import com.red.circle.tool.core.collection.CollectionUtils;
import java.util.List; import com.red.circle.tool.core.thread.ThreadPoolManager;
import java.util.Objects; import java.util.List;
import java.util.concurrent.TimeUnit; import java.util.Objects;
import lombok.RequiredArgsConstructor; import java.util.concurrent.TimeUnit;
import org.springframework.stereotype.Component; import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
/** /**
* 退出房间. * 退出房间.
* *
* @author pengliang on 2023/12/12 * @author pengliang on 2023/12/12
*/ */
@Component @Slf4j
@RequiredArgsConstructor @Component
public class LiveQuitRoomExe { @RequiredArgsConstructor
public class LiveQuitRoomExe {
private final LiveMicCacheService liveMicCacheService; private final LiveMicCacheService liveMicCacheService;
private final LiveMicrophoneGateway liveMicrophoneGateway; private final LiveMicrophoneGateway liveMicrophoneGateway;
@ -36,46 +39,67 @@ public class LiveQuitRoomExe {
private static final String EMPTY_ROOM_CACHE_KEY = "empty_room_cache:"; private static final String EMPTY_ROOM_CACHE_KEY = "empty_room_cache:";
private static final int CACHE_EXPIRE_MINUTES = 10; private static final int CACHE_EXPIRE_MINUTES = 10;
public void execute(AppRoomIdCmd cmd) { public void execute(AppRoomIdCmd cmd) {
// 移除在线用户 Long roomId = cmd.getRoomId();
liveMicrophoneGateway.removeRoomOnlineUser( Long userId = cmd.requiredReqUserId();
cmd.getRoomId(),
cmd.requiredReqUserId() // 同步删除房间本地在线状态远端补偿交给后台线程处理
); liveMicrophoneGateway.removeRoomOnlineUser(
roomId,
// 下麦克风 userId
List<LiveMicrophone> liveMicrophones = liveMicCacheService.listLiveMicrophone(cmd.getRoomId(), );
cmd.requiredReqUserId());
if (CollectionUtils.isNotEmpty(liveMicrophones)) { // 同步下掉当前用户占用的本地麦位避免客户端继续感知到在房状态
liveMicrophones.forEach(mic -> List<LiveMicrophone> liveMicrophones = liveMicCacheService.listLiveMicrophone(roomId, userId);
liveMicrophoneGateway.tryMicOpsLock(cmd.getRoomId(), mic.getMicIndex(), () -> { boolean micChanged = false;
liveMicCacheService.goDown(cmd.getRoomId(), mic.getMicIndex()); if (CollectionUtils.isNotEmpty(liveMicrophones)) {
liveMicrophoneGateway.micUserChangeNotice(cmd.getRoomId()); for (LiveMicrophone mic : liveMicrophones) {
}) final boolean[] removed = {false};
); liveMicrophoneGateway.tryMicOpsLock(roomId, mic.getMicIndex(), () -> {
} liveMicCacheService.goDown(roomId, mic.getMicIndex());
removed[0] = true;
// 移除播放音乐 });
LiveMusicStatus musicStatus = liveMusicHeartbeatService.getMusicUser(cmd.getRoomId()); micChanged = micChanged || removed[0];
if (Objects.nonNull(musicStatus) }
&& Objects.equals(musicStatus.getUserId(), cmd.requiredReqUserId())) { }
liveMusicHeartbeatService.removeMickUser(cmd.getRoomId());
} // 同步清理本地音乐状态
LiveMusicStatus musicStatus = liveMusicHeartbeatService.getMusicUser(roomId);
//将没人的房间存入缓存10分钟 if (Objects.nonNull(musicStatus)
try { && Objects.equals(musicStatus.getUserId(), userId)) {
ResultResponse<ActiveVoiceRoomCO> noBodyRoom = activeVoiceRoomClient.createNoBodyRoom(cmd.getRoomId()); liveMusicHeartbeatService.removeMickUser(roomId);
ActiveVoiceRoomCO body = noBodyRoom.getBody(); }
if (Objects.nonNull(body)) {
// 缓存空房间信息设置10分钟过期 boolean finalMicChanged = micChanged;
String cacheKey = EMPTY_ROOM_CACHE_KEY + cmd.getRoomId(); ThreadPoolManager.getInstance().execute(() -> {
redisService.setIfAbsent(cacheKey, JSON.toJSONString(body), CACHE_EXPIRE_MINUTES, TimeUnit.MINUTES); if (finalMicChanged) {
} try {
} catch (Exception e) { liveMicrophoneGateway.micUserChangeNotice(roomId);
//ignore } catch (Exception e) {
} log.error("quitRoom micUserChangeNotice error roomId={}, userId={}", roomId, userId, e);
}
}
}
cacheEmptyRoom(roomId, userId);
} });
}
private void cacheEmptyRoom(Long roomId, Long userId) {
try {
ResultResponse<ActiveVoiceRoomCO> noBodyRoom = activeVoiceRoomClient.createNoBodyRoom(roomId);
ActiveVoiceRoomCO body = noBodyRoom.getBody();
if (Objects.nonNull(body)) {
String cacheKey = EMPTY_ROOM_CACHE_KEY + roomId;
redisService.setIfAbsent(
cacheKey,
JSON.toJSONString(body),
CACHE_EXPIRE_MINUTES,
TimeUnit.MINUTES
);
}
} catch (Exception e) {
log.error("quitRoom cacheEmptyRoom error roomId={}, userId={}", roomId, userId, e);
}
}
}

View File

@ -647,23 +647,13 @@ public class LiveMicrophoneGatewayImpl implements LiveMicrophoneGateway {
} }
@Override @Override
public void removeRoomOnlineUser(Long roomId, Long userId) { public void removeRoomOnlineUser(Long roomId, Long userId) {
//刷新在线用户信息 LiveRoomUserRefresh liveRoomUserRefresh = liveMicUserCacheService.removeUser(roomId, userId);
userOnlineClient.updateStatusAndSessionId(userId, UserOnlineStatusEnum.IDLE,0L); ThreadPoolManager.getInstance().execute(
() -> compensateRemoveRoomOnlineUser(roomId, userId, liveRoomUserRefresh)
this.refreshUserProcess(roomId, liveMicUserCacheService.removeUser(roomId, userId)); );
}
//删除用户的心动值
List<LiveHeartbeatCmd> liveHeartbeats = liveHeartbeatClient.listLiveHeartbeat(roomId).getBody();
if(CollectionUtils.isNotEmpty(liveHeartbeats)){
liveHeartbeats= liveHeartbeats.stream()
.filter(item -> !Objects.equals(item.getId(),userId)).collect(
Collectors.toList());
liveHeartbeatClient.create(roomId, liveHeartbeats);
}
}
@Override @Override
public void addRoomOnlineUser(Long roomId, LiveRoomUser liveRoomUser) { public void addRoomOnlineUser(Long roomId, LiveRoomUser liveRoomUser) {
@ -785,11 +775,41 @@ public class LiveMicrophoneGatewayImpl implements LiveMicrophoneGateway {
} }
public Map<Long,Long> getLiveHeartbeatMap(List<LiveHeartbeatCmd> list){ public Map<Long,Long> getLiveHeartbeatMap(List<LiveHeartbeatCmd> list){
if (CollectionUtils.isEmpty(list)) { if (CollectionUtils.isEmpty(list)) {
return new HashMap<>(); return new HashMap<>();
} }
return list.stream().collect(Collectors.toMap(LiveHeartbeatCmd::getId,LiveHeartbeatCmd::getHeartbeatVal)); return list.stream().collect(Collectors.toMap(LiveHeartbeatCmd::getId,LiveHeartbeatCmd::getHeartbeatVal));
} }
} private void compensateRemoveRoomOnlineUser(Long roomId, Long userId,
LiveRoomUserRefresh liveRoomUserRefresh) {
try {
userOnlineClient.updateStatusAndSessionId(userId, UserOnlineStatusEnum.IDLE, 0L);
} catch (Exception e) {
log.error("removeRoomOnlineUser updateStatusAndSessionId error roomId={}, userId={}", roomId,
userId, e);
}
try {
refreshUserProcess(roomId, liveRoomUserRefresh);
} catch (Exception e) {
log.error("removeRoomOnlineUser refreshUserProcess error roomId={}, userId={}", roomId,
userId, e);
}
try {
List<LiveHeartbeatCmd> liveHeartbeats = liveHeartbeatClient.listLiveHeartbeat(roomId).getBody();
if (CollectionUtils.isNotEmpty(liveHeartbeats)) {
liveHeartbeats = liveHeartbeats.stream()
.filter(item -> !Objects.equals(item.getId(), userId))
.collect(Collectors.toList());
liveHeartbeatClient.create(roomId, liveHeartbeats);
}
} catch (Exception e) {
log.error("removeRoomOnlineUser refreshHeartbeat error roomId={}, userId={}", roomId, userId,
e);
}
}
}