RuoYi-Cloud-Plus 2.X SSE逻辑分析

2024-12-12 271点热度 0人点赞 0条评论

什么是SSE

Spring 使用SSE(Server-Sent Events)学习

SSE 即服务器发送事件(Server-Sent Events),是一种服务器推送技术,允许服务器在客户端建立连接后,主动向客户端推送数据。

SSE 基于 HTTP 协议,使用简单,具有轻量级、实时性和断线重连等特点。它在一些需要实时数据更新的场景中非常有用,如股票行情、实时通知等。与传统的轮询方式相比,SSE 可以减少不必要的网络请求,提高数据传输效率。

SSE 的主要优点包括:

实时性:服务器可以实时推送数据到客户端,无需客户端不断轮询。
轻量级:SSE 使用简单的文本协议,数据量小,对网络带宽要求较低。
兼容性好:SSE 基于 HTTP 协议,大多数现代浏览器都支持。
易于实现:服务器端和客户端的实现都相对简单。

然而,SSE 也有一些局限性:

单向通信:SSE 只允许服务器向客户端推送数据,客户端无法直接向服务器发送数据。
支持的浏览器有限:虽然大多数现代浏览器支持 SSE,但一些较旧的浏览器可能不支持。
数据格式受限:SSE 通常只能传输文本数据,对于二进制数据的支持有限。

与 HTTP 相比,SSE 提供了更高效的实时数据推送机制,减少了不必要的请求和响应,降低了服务器负载。但 HTTP 更适合一般性的请求-响应模式的数据传输。

SSE WebSocket 对比

SSE 的优点:

简单易用:SSE 使用标准的 HTTP 协议,实现相对简单,不需要复杂的握手和协议转换。
单向通信:适合只需从服务器向客户端推送数据的场景,减少了不必要的双向通信开销。
低延迟:由于基于 HTTP 协议,数据可以在服务器有新数据时立即推送,延迟较低。
兼容性好:大多数现代浏览器都支持 SSE,不需要特殊的插件或扩展。
轻量级:相比 WebSocket,SSE 的实现相对较轻量,对服务器资源的消耗较少。
自动重连:如果连接中断,SSE 会自动尝试重新连接,确保数据的持续推送。

SSE 的缺点:

单向通信限制:SSE 只支持服务器向客户端发送数据,客户端无法向服务器发送数据。
数据格式受限:SSE 通常只能发送文本数据,对于二进制数据的支持有限。
连接管理:每个 SSE 连接在每次数据推送后都会关闭,然后需要重新建立连接,这可能会导致一些额外的开销。

** WebSocket 的优点:**

全双工通信:支持双向通信,客户端和服务器可以随时互相发送数据,适用于实时交互性较高的应用。
低延迟:建立连接后,数据可以实时传输,延迟较低。
二进制支持:WebSocket 可以发送文本和二进制数据,更适合处理多媒体等二进制数据。
较少的 HTTP 开销:由于建立了持久连接,减少了 HTTP 请求头和响应头的开销。

WebSocket 的缺点:

协议复杂性:WebSocket 协议相对较复杂,需要更多的代码和服务器资源来处理连接和数据传输。
兼容性问题:虽然大多数现代浏览器支持 WebSocket,但在一些旧版本的浏览器或特定环境中可能存在兼容性问题。
安全风险:由于 WebSocket 可以实现双向通信,需要注意安全问题,如防止跨站脚本攻击(XSS)和跨站请求伪造(CSRF)。
服务器资源消耗:相比 SSE,WebSocket 可能会消耗更多的服务器资源,特别是在处理大量并发连接时。
SSE 适用于简单的单向数据推送场景,如新闻更新、实时通知等,而 WebSocket 更适合需要双向实时通信的场景,如在线聊天、实时游戏等。在选择使用哪种技术时,需要根据具体的应用需求、浏览器兼容性和服务器资源等因素进行综合考虑

前端逻辑分析

连接并获取数据

在layout的index.vue中引入了sse的初始化方法:

import { initSSE } from '@/utils/sse';

当页面加载的时候调用:

onMounted(() => {
  initSSE(import.meta.env.VITE_APP_BASE_API + '/resource/sse');
});

initSSE方法定义在utils/sse.ts中,用于处理sse相关功能:
1. 首先调用useEventSource建立sse连接,返回数据保存在data中,错误保存在error中;
2. 如果出现错误,控制台打印一条日志;
3. 如果有数据返回,使用useNoticeStore().addNotice添加到用户通知中;

export const initSSE = (url: any) => {
  if (import.meta.env.VITE_APP_SSE === 'false') {
    return;
  }

  url = url + '?Authorization=Bearer ' + getToken() + '&clientid=' + import.meta.env.VITE_APP_CLIENT_ID;
  const { data, error } = useEventSource(url, [], {
    autoReconnect: {
      retries: 10,
      delay: 3000,
      onFailed() {
        console.log('Failed to connect after 10 retries');
      }
    }
  });

  watch(error, () => {
    console.log('SSE connection error:', error.value);
    error.value = null;
  });

  watch(data, () => {
    if (!data.value) return;
    useNoticeStore().addNotice({
      message: data.value,
      read: false,
      time: new Date().toLocaleString()
    });
    ElNotification({
      title: '消息',
      message: data.value,
      type: 'success',
      duration: 3000
    });
    data.value = null;
  });
};

显示通知及已读处理

在layout/notice/index.vue中显示通知,进入页面获取通知列表:

onMounted(() => {
  nextTick(() => {
    getTableData();
  });
});
const getTableData = async () => {
  state.loading = true;
  newsList.value = noticeStore.state.value.notices;
  state.loading = false;
};

用户点击通知的时候调用onNewsClick方法,把已读状态更新为true

//点击消息,写入已读
const onNewsClick = (item: any) => {
  newsList.value[item].read = true;
  //并且写入pinia
  noticeStore.state.value.notices = newsList.value;
};

后端逻辑分析

后端代码在ruoyi-common\ruoyi-common-sse中实现。引入ruoyi-common\ruoyi-common-sse的时候自动导入org.dromara.common.sse.config.SseAutoConfiguration
该类自动注入了3个bean:
1. SseEmitterManager:管理 Server-Sent Events (SSE) 连接,建立与指定用户的连接,断开连接等;
2. SseTopicListener:SSE 主题订阅监听器
3. SseController:SSE 控制器

/**
 * SSE 自动装配
 *
 * @author Lion Li
 */
@AutoConfiguration
@ConditionalOnProperty(value = "sse.enabled", havingValue = "true")
@EnableConfigurationProperties(SseProperties.class)
public class SseAutoConfiguration {

    @Bean
    public SseEmitterManager sseEmitterManager() {
        return new SseEmitterManager();
    }

    @Bean
    public SseTopicListener sseTopicListener() {
        return new SseTopicListener();
    }

    @Bean
    public SseController sseController(SseEmitterManager sseEmitterManager) {
        return new SseController(sseEmitterManager);
    }

}

建立连接

前端请求/resource/sse的时候到达SseController的connect方法:

    /**
     * 建立 SSE 连接
     */
    @GetMapping(value = "${sse.path}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public SseEmitter connect() {
        String tokenValue = StpUtil.getTokenValue();
        Long userId = LoginHelper.getUserId();
        return sseEmitterManager.connect(userId, tokenValue);
    }

sse.path在nacos中ruoyi-resource.yml配置:

# 默认/推荐使用sse推送
sse:
  enabled: true
  path: /sse

connect方法中首先获取用户token和当前登录用户id,调用sseEmitterManager.connect(userId, tokenValue)方法,建立与用户的连接:

    /**
     * 建立与指定用户的 SSE 连接
     *
     * @param userId 用户的唯一标识符,用于区分不同用户的连接
     * @param token  用户的唯一令牌,用于识别具体的连接
     * @return 返回一个 SseEmitter 实例,客户端可以通过该实例接收 SSE 事件
     */
    public SseEmitter connect(Long userId, String token) {
        // 从 USER_TOKEN_EMITTERS 中获取或创建当前用户的 SseEmitter 映射表(ConcurrentHashMap)
        // 每个用户可以有多个 SSE 连接,通过 token 进行区分
        Map<String, SseEmitter> emitters = USER_TOKEN_EMITTERS.computeIfAbsent(userId, k -> new ConcurrentHashMap<>());

        // 创建一个新的 SseEmitter 实例,超时时间设置为 0 表示无限制
        SseEmitter emitter = new SseEmitter(0L);

        emitters.put(token, emitter);

        // 当 emitter 完成、超时或发生错误时,从映射表中移除对应的 token
        emitter.onCompletion(() -> emitters.remove(token));
        emitter.onTimeout(() -> emitters.remove(token));
        emitter.onError((e) -> emitters.remove(token));

        try {
            // 向客户端发送一条连接成功的事件
            emitter.send(SseEmitter.event().comment("connected"));
        } catch (IOException e) {
            // 如果发送消息失败,则从映射表中移除 emitter
            emitters.remove(token);
        }
        return emitter;
    }

SseEmitterManager类中定义了订阅消息的方法,首先是单个用户订阅:

    /**
     * 订阅SSE消息主题,并提供一个消费者函数来处理接收到的消息
     *
     * @param consumer 处理SSE消息的消费者函数
     */
    public void subscribeMessage(Consumer<SseMessageDto> consumer) {
        RedisUtils.subscribe(SSE_TOPIC, SseMessageDto.class, consumer);
    }

通过订阅redis中的SSE_TOPIC(global:sse)向consumer发送SseMessageDto消息。

subscribeMessage方法由SseTopicListener调用,作用是在Spring Boot应用程序启动时初始化SSE主题订阅监听器,redis中值发生变化的时候,则向用户发送sse消息。SseTopicListener源码如下:

/**
 * SSE 主题订阅监听器
 *
 * @author Lion Li
 */
@Slf4j
public class SseTopicListener implements ApplicationRunner, Ordered {

    @Autowired
    private SseEmitterManager sseEmitterManager;

    /**
     * 在Spring Boot应用程序启动时初始化SSE主题订阅监听器
     *
     * @param args 应用程序参数
     * @throws Exception 初始化过程中可能抛出的异常
     */
    @Override
    public void run(ApplicationArguments args) throws Exception {
        sseEmitterManager.subscribeMessage((message) -> {
            log.info("SSE主题订阅收到消息session keys={} message={}", message.getUserIds(), message.getMessage());
            // 如果key不为空就按照key发消息 如果为空就群发
            if (CollUtil.isNotEmpty(message.getUserIds())) {
                message.getUserIds().forEach(key -> {
                    sseEmitterManager.sendMessage(key, message.getMessage());
                });
            } else {
                sseEmitterManager.sendMessage(message.getMessage());
            }
        });
        log.info("初始化SSE主题订阅监听器成功");
    }

    @Override
    public int getOrder() {
        return -1;
    }
}

发送sse消息举例

用户登录完成之后会发送登录成功消息。

        scheduledExecutorService.schedule(() -> {
            remoteMessageService.publishMessage(userId, "欢迎登录微惊客");
        }, 3, TimeUnit.SECONDS);

public class RemoteMessageServiceImpl implements RemoteMessageService {

    /**
     * 发送消息
     *
     * @param sessionKey session主键 一般为用户id
     * @param message    消息文本
     */
    @Override
    public void publishMessage(Long sessionKey, String message) {
        SseMessageDto dto = new SseMessageDto();
        dto.setMessage(message);
        dto.setUserIds(List.of(sessionKey));
        SseMessageUtils.publishMessage(dto);
    }
}

最终调用SseEmitterManager的publishMessage方法:

/**
 * 发布SSE订阅消息
 *
 * @param sseMessageDto 要发布的SSE消息对象
 */
public static void publishMessage(SseMessageDto sseMessageDto) {
    MANAGER.publishMessage(sseMessageDto);
}

/**
 * 发布SSE订阅消息
 *
 * @param sseMessageDto 要发布的SSE消息对象
 */
public void publishMessage(SseMessageDto sseMessageDto) {
    SseMessageDto broadcastMessage = new SseMessageDto();
    broadcastMessage.setMessage(sseMessageDto.getMessage());
    broadcastMessage.setUserIds(sseMessageDto.getUserIds());
    RedisUtils.publish(SSE_TOPIC, broadcastMessage, consumer -> {
        log.info("SSE发送主题订阅消息topic:{} session keys:{} message:{}",
            SSE_TOPIC, sseMessageDto.getUserIds(), sseMessageDto.getMessage());
    });
}

王显锋

激情工作,快乐生活!

文章评论