示例代码参考
public Flux<ChatResult> chatSSE(String qa,String sessionId) {
Flux<ChatResult> chatResultFlux = sseWebClient.post().bodyValue("""
{
"session_id":"%s",
"bot_app_key":"%s",
"visitor_biz_id":"%s",
"content":"%s",
"incremental": true,
"streaming_throttle": 10,
"visitor_labels": [],
"custom_variables":{},
"search_network":"disable",
"stream":"enable",
"workflow_status":"disable",
"tcadp_user_id":""
}
"""
.formatted(
sessionId,//会话id
tencentProperties.getBotAppKey(),//bot app key
sessionId,//访客id
qa // 问题
)
).retrieve()
.bodyToFlux(ChatResult.class)
// .doOnNext(chatResult -> log.info("chatResult:\n{}", chatResult))
// concatWith 在流正常结束后发送结束标记
.concatWith(
Flux.defer(()->{
log.info("【兜底】发送结束标记");
return Flux.just(
new ChatResult("done", null)
);
})
)
// 出错时也发送结束(可选)
.onErrorResume(e ->{
log.error("【兜底】发送结束标记error");
return Flux.just(new ChatResult("error", new JSONObject(
Map.of("content","三方调用错误e:"+e.toString())
)));
});
return chatResultFlux.filter(chatResult ->
(chatResult.getType().equals("reply")
&& Objects.nonNull(chatResult.getPayload())
&&!chatResult.getPayload().getBoolean("is_from_self")
)||chatResult.getType().equals("done")||chatResult.getType().equals("error"));
}
提示:腾讯云智能体SSE接口虽然有个
is_final
字段但是自己都不建议使用。所以没有推荐的终止事件标记,官方人员建议通过连接断开来判断是否结束标记位置,所以加了concatWith
和onErrorResume
两个兜底处理。
https://blog.xqlee.com/article/2509031704182115.html