websocket+定时任务实现实时推送

时间:2022-12-30 11:23:10

websocket+定时任务实现实时推送

有时候业务要求websocket连接后,服务端实时每隔一段时间就将数据推送给客户端进行响应,这时就需要websocket+定时任务一起来实现实时推送数据给客户端了。
使用的定时任务方式为spring的TaskScheduler对象实现任务调度。

TaskScheduler定时任务实现

  1. TaskScheduler接口提供了多种调度方法来实现运行任务的执行。
public interface TaskScheduler {
 
 	//通过触发器来决定task是否执行
    ScheduledFuture schedule(Runnable task, Trigger trigger); 
 
 	//在starttime的时候执行一次
    ScheduledFuture schedule(Runnable task, Date startTime);  
    ScheduledFuture schedule(Runnable task, Instant startTime); 
 
 	//从starttime开始每个period时间段执行一次task
    ScheduledFuture scheduleAtFixedRate(Runnable task, Instant startTime, Duration period); 
    ScheduledFuture scheduleAtFixedRate(Runnable task, Date startTime, long period); 
 
 	//每隔period执行一次
    ScheduledFuture scheduleAtFixedRate(Runnable task, long period);  
    ScheduledFuture scheduleAtFixedRate(Runnable task, Duration period);  
 
 	//从startTime开始每隔delay长时间执行一次
    ScheduledFuture scheduleWithFixedDelay(Runnable task, Date startTime, long delay); 
 
 	//每隔delay时间执行一次
    ScheduledFuture scheduleWithFixedDelay(Runnable task, long delay); 
}
  1. 简单测试一下
import cn.hutool.core.date.DateUtil;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.Trigger;
import org.springframework.scheduling.support.CronTrigger;
import org.springframework.stereotype.Component;

/**
 * The type Task scheduler test.
 *
 * @author yjj
 * @version 1.0
 * @since 2022 -12-28 15:45:17
 */
@Slf4j
@Component
@RequiredArgsConstructor
public class TaskSchedulerTest {

    private final TaskScheduler taskScheduler;

    @Bean
    public void test() {
        //每隔3秒执行一次
        Trigger trigger = new CronTrigger("0/3 * * * * *");
        //每隔1秒执行一次
        //Trigger trigger1 = new PeriodicTrigger(1, TimeUnit.SECONDS);
        taskScheduler.schedule(new MyThread(), trigger);
    }

    private class MyThread implements Runnable {
        @Override
        public void run() {
            log.info("定时执行线程名称=【{}】,执行时间=【{}】", Thread.currentThread().getName(), DateUtil.date());
        }
    }

}

效果就是每个3秒执行一次
websocket+定时任务实现实时推送

websocket+定时任务实时推送

实现的业务需求如下:客户端连上来以后就每隔3秒向客户端实时推送消息。有关websocket的实现见文章websocket简单实现

  1. TestWebsocket.java
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.date.DateUtil;
import cn.hutool.core.map.MapUtil;
import cn.hutool.core.text.CharSequenceUtil;
import cn.hutool.json.JSONUtil;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.Trigger;
import org.springframework.scheduling.support.CronTrigger;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.*;

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ScheduledFuture;

/**
 * 测试websocket
 *
 * @author yjj
 * @version 1.0
 * @since 2022 -12-28 14:55:29
 */
@Slf4j
@Component
@RequiredArgsConstructor
public class TestWebsocket implements WebSocketHandler {

    protected static final CopyOnWriteArrayList<WebSocketSession> WEB_SOCKET_SESSIONS = new CopyOnWriteArrayList<>();

    /**
     * 定时任务集合
     */
    Map<String, ScheduledFuture<?>> stringScheduledFutureMap = new ConcurrentHashMap<>();

    /**
     * taskScheduler
     */
    private final TaskScheduler taskScheduler;

    /**
     * 建立连接后操作
     *
     * @param session 连接session信息
     * @throws Exception exception
     */
    @Override
    public void afterConnectionEstablished(WebSocketSession session) throws Exception {
        sendMessage("连接成功~~~~~~,sessionId=" + session.getId());
        WEB_SOCKET_SESSIONS.add(session);
        //设置定时任务,每隔3s执行一次
        Trigger trigger = new CronTrigger("0/3 * * * * *");
        //开启一个定时任务
        ScheduledFuture<?> schedule = taskScheduler.schedule(new CustomizeTask(session.getId()), trigger);
        //根据session连接id定时任务线程存到map中
        stringScheduledFutureMap.put(session.getId(), schedule);
    }

    private class CustomizeTask implements Runnable {
        private final String sessionId;

        CustomizeTask(String sessionId) {
            this.sessionId = sessionId;
        }

        @Override
        public void run() {
            try {
                String message = CharSequenceUtil.format("定时执行线程名称=【{}】,执行时间=【{}】", Thread.currentThread().getName(), DateUtil.date());
                sendMessage(JSONUtil.toJsonStr(message), sessionId);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * 接收到消息后的处理
     *
     * @param session 连接session信息
     * @param message 信息
     * @throws Exception exception
     */
    @Override
    public void handleMessage(WebSocketSession session, WebSocketMessage<?> message) throws Exception {
        sendMessage("接收到的消息为=【" + message + "】,sessionId=【" + session.getId() + "】,回复消息=【你好呀!】");
    }

    /**
     * ws连接出错时调用
     *
     * @param session   session连接信息
     * @param exception exception
     * @throws Exception exception
     */
    @Override
    public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {
        if (session.isOpen()) {
            sendMessage("ws连接出错,即将关闭此session,sessionId=【" + session.getId() + "】");
            session.close();
        }
        WEB_SOCKET_SESSIONS.remove(session);
    }

    /**
     * 连接关闭后调用
     *
     * @param session     session连接信息
     * @param closeStatus 关闭状态
     * @throws Exception exception
     */
    @Override
    public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception {
        if (session.isOpen()) {
            sendMessage("ws连接即将关闭此session,sessionId=【" + session.getId() + "】");
            session.close();
        }
        WEB_SOCKET_SESSIONS.remove(session);
        String sessionId = session.getId();
        ScheduledFuture<?> scheduledFuture = MapUtil.get(stringScheduledFutureMap, sessionId, ScheduledFuture.class);
        if (scheduledFuture != null) {
            //暂停对应session的开启的定时任务
            scheduledFuture.cancel(true);
            //集合移除
            stringScheduledFutureMap.remove(sessionId);
        }
    }

    /**
     * 是否支持分片消息
     */
    @Override
    public boolean supportsPartialMessages() {
        return false;
    }

    /**
     * 群发发送消息
     *
     * @param message 消息
     * @throws IOException ioException
     */
    public void sendMessage(String message) throws IOException {
        if (CollUtil.isNotEmpty(WEB_SOCKET_SESSIONS)) {
            for (WebSocketSession webSocketSession : WEB_SOCKET_SESSIONS) {
                webSocketSession.sendMessage(new TextMessage(message));
            }
        }
    }

    /**
     * 发给指定连接消息
     *
     * @param message 消息
     * @throws IOException ioException
     */
    public void sendMessage(String message, String sessionId) throws IOException {
        if (CollUtil.isNotEmpty(WEB_SOCKET_SESSIONS)) {
            for (WebSocketSession webSocketSession : WEB_SOCKET_SESSIONS) {
                if (sessionId.equals(webSocketSession.getId())) {
                    webSocketSession.sendMessage(new TextMessage(message));
                }
            }
        }
    }
}
  1. websocket绑定URL
import com.yjj.test.websocket.TestWebsocket;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;

import javax.annotation.Resource;

/**
 * websocket配置
 *
 * @author yjj
 * @version 1.0
 * @since 2022 -12-28 15:10:11
 */
@EnableWebSocket
@Configuration
public class WebSocketConfig implements WebSocketConfigurer {

    @Resource
    private TestWebsocket testWebsocket;

    /**
     * Register {@link WebSocketHandler WebSocketHandlers} including SockJS fallback options if desired.
     *
     * @param registry
     */
    @Override
    public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
        registry.addHandler(testWebsocket, "/test").setAllowedOrigins("*");
    }
}
  1. websocket与定时任务同时存在时,需要加入配置定义线程池进行线程的管理
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;

/**
 * 当定时任务和websocket同时存在时报错解决
 *
 * @author yjj
 * @version 1.0
 * @since 2022 -04-28 17:35:54
 */
@Configuration
public class ScheduledConfig {

    /**
     * Schedule本身是单线程执行的
     *
     * @return the task scheduler
     */
    @Bean
    public TaskScheduler taskScheduler() {
        ThreadPoolTaskScheduler scheduling = new ThreadPoolTaskScheduler();
        scheduling.setPoolSize(20);
        return scheduling;
    }
}
  1. 效果如下
    连接上以后服务每隔3秒会向客户端实时推送消息
    websocket+定时任务实现实时推送