什么是SSE
Spring 使用SSE(Server-Sent Events)学习
SSE 即服务器发送事件(Server-Sent Events),是一种服务器推送技术,允许服务器在客户端建立连接后,主动向客户端推送数据。
SSE 基于 HTTP 协议,使用简单,具有轻量级、实时性和断线重连等特点。它在一些需要实时数据更新的场景中非常有用,如股票行情、实时通知等。与传统的轮询方式相比,SSE 可以减少不必要的网络请求,提高数据传输效率。
SSE 的主要优点包括:
实时性:服务器可以实时推送数据到客户端,无需客户端不断轮询。
轻量级:SSE 使用简单的文本协议,数据量小,对网络带宽要求较低。
兼容性好:SSE 基于 HTTP 协议,大多数现代浏览器都支持。
易于实现:服务器端和客户端的实现都相对简单。
然而,SSE 也有一些局限性:
单向通信:SSE 只允许服务器向客户端推送数据,客户端无法直接向服务器发送数据。
支持的浏览器有限:虽然大多数现代浏览器支持 SSE,但一些较旧的浏览器可能不支持。
数据格式受限:SSE 通常只能传输文本数据,对于二进制数据的支持有限。
与 HTTP 相比,SSE 提供了更高效的实时数据推送机制,减少了不必要的请求和响应,降低了服务器负载。但 HTTP 更适合一般性的请求-响应模式的数据传输。
SSE WebSocket 对比
SSE 的优点:
简单易用:SSE 使用标准的 HTTP 协议,实现相对简单,不需要复杂的握手和协议转换。
单向通信:适合只需从服务器向客户端推送数据的场景,减少了不必要的双向通信开销。
低延迟:由于基于 HTTP 协议,数据可以在服务器有新数据时立即推送,延迟较低。
兼容性好:大多数现代浏览器都支持 SSE,不需要特殊的插件或扩展。
轻量级:相比 WebSocket,SSE 的实现相对较轻量,对服务器资源的消耗较少。
自动重连:如果连接中断,SSE 会自动尝试重新连接,确保数据的持续推送。
SSE 的缺点:
单向通信限制:SSE 只支持服务器向客户端发送数据,客户端无法向服务器发送数据。
数据格式受限:SSE 通常只能发送文本数据,对于二进制数据的支持有限。
连接管理:每个 SSE 连接在每次数据推送后都会关闭,然后需要重新建立连接,这可能会导致一些额外的开销。
** WebSocket 的优点:**
全双工通信:支持双向通信,客户端和服务器可以随时互相发送数据,适用于实时交互性较高的应用。
低延迟:建立连接后,数据可以实时传输,延迟较低。
二进制支持:WebSocket 可以发送文本和二进制数据,更适合处理多媒体等二进制数据。
较少的 HTTP 开销:由于建立了持久连接,减少了 HTTP 请求头和响应头的开销。
WebSocket 的缺点:
协议复杂性:WebSocket 协议相对较复杂,需要更多的代码和服务器资源来处理连接和数据传输。
兼容性问题:虽然大多数现代浏览器支持 WebSocket,但在一些旧版本的浏览器或特定环境中可能存在兼容性问题。
安全风险:由于 WebSocket 可以实现双向通信,需要注意安全问题,如防止跨站脚本攻击(XSS)和跨站请求伪造(CSRF)。
服务器资源消耗:相比 SSE,WebSocket 可能会消耗更多的服务器资源,特别是在处理大量并发连接时。
SSE 适用于简单的单向数据推送场景,如新闻更新、实时通知等,而 WebSocket 更适合需要双向实时通信的场景,如在线聊天、实时游戏等。在选择使用哪种技术时,需要根据具体的应用需求、浏览器兼容性和服务器资源等因素进行综合考虑
前端逻辑分析
连接并获取数据
在layout的index.vue中引入了sse的初始化方法:
import { initSSE } from '@/utils/sse';
当页面加载的时候调用:
onMounted(() => {
initSSE(import.meta.env.VITE_APP_BASE_API + '/resource/sse');
});
initSSE方法定义在utils/sse.ts中,用于处理sse相关功能:
1. 首先调用useEventSource建立sse连接,返回数据保存在data中,错误保存在error中;
2. 如果出现错误,控制台打印一条日志;
3. 如果有数据返回,使用useNoticeStore().addNotice添加到用户通知中;
export const initSSE = (url: any) => {
if (import.meta.env.VITE_APP_SSE === 'false') {
return;
}
url = url + '?Authorization=Bearer ' + getToken() + '&clientid=' + import.meta.env.VITE_APP_CLIENT_ID;
const { data, error } = useEventSource(url, [], {
autoReconnect: {
retries: 10,
delay: 3000,
onFailed() {
console.log('Failed to connect after 10 retries');
}
}
});
watch(error, () => {
console.log('SSE connection error:', error.value);
error.value = null;
});
watch(data, () => {
if (!data.value) return;
useNoticeStore().addNotice({
message: data.value,
read: false,
time: new Date().toLocaleString()
});
ElNotification({
title: '消息',
message: data.value,
type: 'success',
duration: 3000
});
data.value = null;
});
};
显示通知及已读处理
在layout/notice/index.vue中显示通知,进入页面获取通知列表:
onMounted(() => {
nextTick(() => {
getTableData();
});
});
const getTableData = async () => {
state.loading = true;
newsList.value = noticeStore.state.value.notices;
state.loading = false;
};
用户点击通知的时候调用onNewsClick方法,把已读状态更新为true
//点击消息,写入已读
const onNewsClick = (item: any) => {
newsList.value[item].read = true;
//并且写入pinia
noticeStore.state.value.notices = newsList.value;
};
后端逻辑分析
后端代码在ruoyi-common\ruoyi-common-sse中实现。引入ruoyi-common\ruoyi-common-sse的时候自动导入org.dromara.common.sse.config.SseAutoConfiguration
该类自动注入了3个bean:
1. SseEmitterManager:管理 Server-Sent Events (SSE) 连接,建立与指定用户的连接,断开连接等;
2. SseTopicListener:SSE 主题订阅监听器
3. SseController:SSE 控制器
/**
* SSE 自动装配
*
* @author Lion Li
*/
@AutoConfiguration
@ConditionalOnProperty(value = "sse.enabled", havingValue = "true")
@EnableConfigurationProperties(SseProperties.class)
public class SseAutoConfiguration {
@Bean
public SseEmitterManager sseEmitterManager() {
return new SseEmitterManager();
}
@Bean
public SseTopicListener sseTopicListener() {
return new SseTopicListener();
}
@Bean
public SseController sseController(SseEmitterManager sseEmitterManager) {
return new SseController(sseEmitterManager);
}
}
建立连接
前端请求/resource/sse的时候到达SseController的connect方法:
/**
* 建立 SSE 连接
*/
@GetMapping(value = "${sse.path}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public SseEmitter connect() {
String tokenValue = StpUtil.getTokenValue();
Long userId = LoginHelper.getUserId();
return sseEmitterManager.connect(userId, tokenValue);
}
sse.path在nacos中ruoyi-resource.yml配置:
# 默认/推荐使用sse推送
sse:
enabled: true
path: /sse
connect方法中首先获取用户token和当前登录用户id,调用sseEmitterManager.connect(userId, tokenValue)方法,建立与用户的连接:
/**
* 建立与指定用户的 SSE 连接
*
* @param userId 用户的唯一标识符,用于区分不同用户的连接
* @param token 用户的唯一令牌,用于识别具体的连接
* @return 返回一个 SseEmitter 实例,客户端可以通过该实例接收 SSE 事件
*/
public SseEmitter connect(Long userId, String token) {
// 从 USER_TOKEN_EMITTERS 中获取或创建当前用户的 SseEmitter 映射表(ConcurrentHashMap)
// 每个用户可以有多个 SSE 连接,通过 token 进行区分
Map<String, SseEmitter> emitters = USER_TOKEN_EMITTERS.computeIfAbsent(userId, k -> new ConcurrentHashMap<>());
// 创建一个新的 SseEmitter 实例,超时时间设置为 0 表示无限制
SseEmitter emitter = new SseEmitter(0L);
emitters.put(token, emitter);
// 当 emitter 完成、超时或发生错误时,从映射表中移除对应的 token
emitter.onCompletion(() -> emitters.remove(token));
emitter.onTimeout(() -> emitters.remove(token));
emitter.onError((e) -> emitters.remove(token));
try {
// 向客户端发送一条连接成功的事件
emitter.send(SseEmitter.event().comment("connected"));
} catch (IOException e) {
// 如果发送消息失败,则从映射表中移除 emitter
emitters.remove(token);
}
return emitter;
}
SseEmitterManager类中定义了订阅消息的方法,首先是单个用户订阅:
/**
* 订阅SSE消息主题,并提供一个消费者函数来处理接收到的消息
*
* @param consumer 处理SSE消息的消费者函数
*/
public void subscribeMessage(Consumer<SseMessageDto> consumer) {
RedisUtils.subscribe(SSE_TOPIC, SseMessageDto.class, consumer);
}
通过订阅redis中的SSE_TOPIC(global:sse)向consumer发送SseMessageDto消息。
subscribeMessage方法由SseTopicListener调用,作用是在Spring Boot应用程序启动时初始化SSE主题订阅监听器,redis中值发生变化的时候,则向用户发送sse消息。SseTopicListener源码如下:
/**
* SSE 主题订阅监听器
*
* @author Lion Li
*/
@Slf4j
public class SseTopicListener implements ApplicationRunner, Ordered {
@Autowired
private SseEmitterManager sseEmitterManager;
/**
* 在Spring Boot应用程序启动时初始化SSE主题订阅监听器
*
* @param args 应用程序参数
* @throws Exception 初始化过程中可能抛出的异常
*/
@Override
public void run(ApplicationArguments args) throws Exception {
sseEmitterManager.subscribeMessage((message) -> {
log.info("SSE主题订阅收到消息session keys={} message={}", message.getUserIds(), message.getMessage());
// 如果key不为空就按照key发消息 如果为空就群发
if (CollUtil.isNotEmpty(message.getUserIds())) {
message.getUserIds().forEach(key -> {
sseEmitterManager.sendMessage(key, message.getMessage());
});
} else {
sseEmitterManager.sendMessage(message.getMessage());
}
});
log.info("初始化SSE主题订阅监听器成功");
}
@Override
public int getOrder() {
return -1;
}
}
发送sse消息举例
用户登录完成之后会发送登录成功消息。
scheduledExecutorService.schedule(() -> {
remoteMessageService.publishMessage(userId, "欢迎登录微惊客");
}, 3, TimeUnit.SECONDS);
public class RemoteMessageServiceImpl implements RemoteMessageService {
/**
* 发送消息
*
* @param sessionKey session主键 一般为用户id
* @param message 消息文本
*/
@Override
public void publishMessage(Long sessionKey, String message) {
SseMessageDto dto = new SseMessageDto();
dto.setMessage(message);
dto.setUserIds(List.of(sessionKey));
SseMessageUtils.publishMessage(dto);
}
}
最终调用SseEmitterManager的publishMessage方法:
/**
* 发布SSE订阅消息
*
* @param sseMessageDto 要发布的SSE消息对象
*/
public static void publishMessage(SseMessageDto sseMessageDto) {
MANAGER.publishMessage(sseMessageDto);
}
/**
* 发布SSE订阅消息
*
* @param sseMessageDto 要发布的SSE消息对象
*/
public void publishMessage(SseMessageDto sseMessageDto) {
SseMessageDto broadcastMessage = new SseMessageDto();
broadcastMessage.setMessage(sseMessageDto.getMessage());
broadcastMessage.setUserIds(sseMessageDto.getUserIds());
RedisUtils.publish(SSE_TOPIC, broadcastMessage, consumer -> {
log.info("SSE发送主题订阅消息topic:{} session keys:{} message:{}",
SSE_TOPIC, sseMessageDto.getUserIds(), sseMessageDto.getMessage());
});
}
文章评论