如何使用 Spring + WebSocket + Quartz 實現消息定時推送?

2022-01-09 SpringForAll社區

簡單的說,websocket是真正實現了全雙工通信的伺服器向客戶端推的網際網路技術。

全雙工與單工、半雙工的區別?

全雙工:簡單地說,就是可以同時進行信號的雙向傳輸(A->B且B->A),是瞬時同步的。

推送和拉取的區別?

推:由伺服器主動發消息給客戶端,就像廣播。優勢在於,信息的主動性和及時性。

實現消息通信的幾種方式?

接下來我們主要講第三種,使用websocket協議,來實現服務端定時向客戶端推送消息。

後臺:springmvc、websocket、quartz實現步驟一、環境搭建(1)導入相關約束:

在pom文件中加入需要的約束,spring相關的約束,請各位自己導入,這裡我就不貼出來了。

<!-- 定時器的包 -->
    <dependency>
      <groupId>org.quartz-scheduler</groupId>
      <artifactId>quartz</artifactId>
      <version>2.3.0</version>
    </dependency>
<!-- 
 spring-support.jar 這個jar 文件包含支持UI模版(Velocity,FreeMarker,JasperReports),郵件服務,腳本服務(JRuby),緩存Cache(EHCache),任務計劃Scheduling(uartz)方面的類。 
 外部依賴spring-context, (spring-jdbc, Velocity, FreeMarker, JasperReports, BSH, Groovy, JRuby, Quartz, EHCache) 
 -->
    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-context-support</artifactId>
      <version>5.1.1.RELEASE</version>
    </dependency>
<!-- websocket的包 -->
    <dependency>
      <groupId>javax.websocket</groupId>
      <artifactId>javax.websocket-api</artifactId>
      <version>1.1</version>
      <scope>provided</scope>
    </dependency>
    
<!--
ps:如果使用原始的配置方式,需要導入spring-websocket、spring-messaging的包,我們這裡就通過註解實現
-->

(2)配置xml文件

web.xml中就配置前端控制器,大家自行配置。然後,加載springmvc的配置文件。

springmvc.xml文件中

    <!-- 自動將控制器加載到bean -->
    <context:component-scan base-package="com.socket.web" />
 <!-- 配置視圖解析器 -->
    <bean class="org.springframework.web.servlet.view.InternalResourceViewResolver">
        <property name="prefix" value="/WEB-INF/views/"/>
        <property name="suffix" value=".jsp"/>
        <property name="contentType" value="text/html; charset=utf-8"/>
    </bean>
    <!-- 自動註冊 DefaultAnnotationHandlerMapping 與 AnnotationMethodHandlerAdapter 兩個 bean, 解決了 @Controller 註解的使用前提配置 -->
    <mvc:annotation-driven/>
    
    <!-- 使用fastjson 解析json   因為本人的項目中用到了fastjson,所以這段配置大家可以忽略。 -->
    <mvc:annotation-driven>
        <mvc:message-converters register-defaults="true">
            <bean class="com.alibaba.fastjson.support.spring.FastJsonHttpMessageConverter">
                <property name="supportedMediaTypes">
                    <list>
                        <value>text/html;charset=UTF-8</value>
                        <value>application/json</value>
                    </list>
                </property>
                <property name="features">
                    <list>
                        <value>WriteMapNullValue</value>
                        <value>QuoteFieldNames</value>
                    </list>
                </property>
            </bean>
        </mvc:message-converters>
    </mvc:annotation-driven>

到此,環境就基本搭建完成了。

二、完成後臺的功能

這裡我就直接貼出代碼了,上面有相關的注釋。

首先,完成websocket的實現類。

package com.socket.web.socket;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/**
 * @Author: 清風一陣吹我心
 * @ProjectName: socket
 * @Package: com.socket.web.socket
 * @ClassName: WebSocketServer
 * @Description:
 * @Version: 1.0
 **/
//ServerEndpoint它的功能主要是將目前的類定義成一個websocket伺服器端。註解的值將被用於監聽用戶連接的終端訪問URL地址。
@ServerEndpoint(value = "/socket/{ip}")
@Component
public class WebSocketServer {

    //使用slf4j打日誌
    private static final Logger LOGGER = LoggerFactory.getLogger(WebSocketServer.class);

    //用來記錄當前在線連接數
    private static int onLineCount = 0;

    //用來存放每個客戶端對應的WebSocketServer對象
    private static ConcurrentHashMap<String,WebSocketServer> webSocketMap = new ConcurrentHashMap<String, WebSocketServer>();

    //某個客戶端的連接會話,需要通過它來給客戶端發送數據
    private Session session;

    //客戶端的ip地址
    private String ip;

    /**
     * 連接建立成功,調用的方法,與前臺頁面的onOpen相對應
     * @param ip ip地址
     * @param session 會話
     */
    @OnOpen
    public void onOpen(@PathParam("ip")String ip,Session session){
        //根據業務,自定義邏輯實現
        this.session = session;
        this.ip = ip;
        webSocketMap.put(ip,this);  //將當前對象放入map中
        addOnLineCount();  //在線人數加一
        LOGGER.info("有新的連接加入,ip:{}!當前在線人數:{}",ip,getOnLineCount());
    }

    /**
     * 連接關閉調用的方法,與前臺頁面的onClose相對應
     * @param ip
     */
    @OnClose
    public void onClose(@PathParam("ip")String ip){
        webSocketMap.remove(ip);  //根據ip(key)移除WebSocketServer對象
        subOnLineCount();
        LOGGER.info("WebSocket關閉,ip:{},當前在線人數:{}",ip,getOnLineCount());
    }

    /**
     * 當伺服器接收到客戶端發送的消息時所調用的方法,與前臺頁面的onMessage相對應
     * @param message
     * @param session
     */
    @OnMessage
    public void onMessage(String message,Session session){
        //根據業務,自定義邏輯實現
        LOGGER.info("收到客戶端的消息:{}",message);
    }

    /**
     * 發生錯誤時調用,與前臺頁面的onError相對應
     * @param session
     * @param error
     */
    @OnError
    public void onError(Session session,Throwable error){
        LOGGER.error("WebSocket發生錯誤");
        error.printStackTrace();
    }


    /**
     * 給當前用戶發送消息
     * @param message
     */
    public void sendMessage(String message){
        try{
            //getBasicRemote()是同步發送消息,這裡我就用這個了,推薦大家使用getAsyncRemote()異步
            this.session.getBasicRemote().sendText(message);
        }catch (IOException e){
            e.printStackTrace();
            LOGGER.info("發送數據錯誤:,ip:{},message:{}",ip,message);
        }
    }

    /**
     * 給所有用戶發消息
     * @param message
     */
    public static void sendMessageAll(final String message){
        //使用entrySet而不是用keySet的原因是,entrySet體現了map的映射關係,遍歷獲取數據更快。
        Set<Map.Entry<String, WebSocketServer>> entries = webSocketMap.entrySet();
        for (Map.Entry<String, WebSocketServer> entry : entries) {
            final WebSocketServer webSocketServer = entry.getValue();
            //這裡使用線程來控制消息的發送,這樣效率更高。
            new Thread(new Runnable() {
                public void run() {
                    webSocketServer.sendMessage(message);
                }
            }).start();
        }
    }

    /**
     * 獲取當前的連接數
     * @return
     */
    public static synchronized int getOnLineCount(){
        return WebSocketServer.onLineCount;
    }

    /**
     * 有新的用戶連接時,連接數自加1
     */
    public static synchronized void addOnLineCount(){
        WebSocketServer.onLineCount++;
    }

    /**
     * 斷開連接時,連接數自減1
     */
    public static synchronized void subOnLineCount(){
        WebSocketServer.onLineCount--;
    }

    public Session getSession(){
        return session;
    }
    public void setSession(Session session){
        this.session = session;
    }

    public static ConcurrentHashMap<String, WebSocketServer> getWebSocketMap() {
        return webSocketMap;
    }

    public static void setWebSocketMap(ConcurrentHashMap<String, WebSocketServer> webSocketMap) {
        WebSocketServer.webSocketMap = webSocketMap;
    }
}

然後寫我們的定時器(quartz),這裡我就不詳解定時器了。大家可以自行去了解。

這裡我使用的是xml註解的方式,創建一個job類,此類不需要繼承任何類和實現任何接口。

package com.socket.web.quartz;

import com.socket.web.socket.WebSocketServer;

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * @Author: 清風一陣吹我心
 * @ProjectName: socket
 * @Package: com.socket.web.quartz
 * @ClassName: TestJob
 * @Description:
 * @Version: 1.0
 **/
public class TestJob {

    public void task(){
        //獲取WebSocketServer對象的映射。
        ConcurrentHashMap<String, WebSocketServer> map = WebSocketServer.getWebSocketMap();
        if (map.size() != 0){
            for (Map.Entry<String, WebSocketServer> entry : map.entrySet()) {
                WebSocketServer webSocketServer = entry.getValue();
                try {
                    //向客戶端推送消息
                    webSocketServer.getSession().getBasicRemote().sendText("每隔兩秒,向客戶端推送一次數據");
                }catch (IOException e){
                    e.printStackTrace();
                }
            }
        }else {
            System.out.println("WebSocket未連接");
        }
    }
}

定時器的實現類就完成了,我們還需要在springmvc.xml中進行配置

springmvc.xml配置:

<!-- 要執行的任務類 -->
    <bean id="testJob" class="com.socket.web.quartz.TestJob"></bean>

    <!-- 將需要執行的定時任務注入job中 -->
    <bean id="jobDetail" class="org.springframework.scheduling.quartz.MethodInvokingJobDetailFactoryBean">
        <property name="targetObject" ref="testJob"/>
        <!-- 任務類中需要執行的方法 -->
        <property name="targetMethod" value="task"></property>
        <!-- 上一次未執行完成的,要等待有再執行。 -->
        <property name="concurrent" value="false" />
    </bean>

    <!-- 基本的定時器,會綁定具體的任務。 -->
    <bean id="trigger" class="org.springframework.scheduling.quartz.SimpleTriggerFactoryBean">
        <property name="jobDetail" ref="jobDetail"/>
        <property name="startDelay" value="3000"/>
        <property name="repeatInterval" value="2000"/>
    </bean>

    <bean id="scheduler" class="org.springframework.scheduling.quartz.SchedulerFactoryBean">
        <property name="triggers">
            <list>
                <ref bean="trigger"/>
            </list>
        </property>
    </bean>

接下來是controller層的代碼,就一個登錄的功能。

package com.socket.web.controller;

import com.socket.domain.User;
import com.sun.org.apache.bcel.internal.generic.RETURN;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;

import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpSession;
import java.util.UUID;

/**
 * @Author: 清風一陣吹我心
 * @ProjectName: socket
 * @Package: com.socket.web
 * @ClassName: ChatController
 * @Description:
 * @CreateDate: 2018/11/9 11:04
 * @Version: 1.0
 **/
@RequestMapping("socket")
@Controller
public class ChatController {

    /**
     * 跳轉到登錄頁面
     * @return
     */
    @RequestMapping(value = "/login",method = RequestMethod.GET)
    public String goLogin(){
        return "login";
    }

    /**
     * 跳轉到聊天頁面
     * @param request
     * @return
     */
    @RequestMapping(value = "/home",method = RequestMethod.GET)
    public String goMain(HttpServletRequest request){
        HttpSession session = request.getSession();
        if (null == session.getAttribute("USER_SESSION")){
            return "login";
        }
        return "home";
    }

    @RequestMapping(value = "/login",method = RequestMethod.POST)
    public String login(User user, HttpServletRequest request){
        HttpSession session = request.getSession();
        //將用戶放入session
        session.setAttribute("USER_SESSION",user);
        return "redirect:home";
    }

}

以上就是登錄的代碼了,基本上就是偽代碼,只要輸入用戶名就可以了,後面的邏輯,大家可以根據自己的業務來實現。

最後就是前臺頁面的設計了,登錄,login.jsp

<%@ page contentType="text/html;charset=UTF-8" language="java" %>
<c:set var="path" value="${pageContext.request.contextPath}"/>
<html>
<head>
    <title>登錄</title>
</head>
<body>
<form action="${path}/socket/login" method="post">
    登錄名:<input type="text" name="username"/>
    <input type="submit" value="登錄"/>
</form>
</body>
</html>

消息接收頁面,home.jsp

<%@ page contentType="text/html;charset=UTF-8" language="java" %>
<html>
<head>
    <title>聊天</title>
    
    <script type="text/javascript">
        //判斷當前瀏覽器是否支持WebSocket
        var webSocket = null;
        if ('WebSocket' in window) {
            webSocket = new WebSocket("ws://localhost:9001/socket/127.0.0.1");
        }
        else if ('MozWebSocket' in window) {
            webSocket = new MozWebSocket("ws://localhost:9001/socket/127.0.0.1");
        }
        else {
            alert('Not support webSocket');
        }

        //打開socket,握手
        webSocket.onopen = function (event) {
            alert("websocket已經連接");
        }
        //接收推送的消息
        webSocket.onmessage = function (event) {
            console.info(event);
            alert(event.data);
        }
        //錯誤時
        webSocket.onerror = function (event) {
            console.info("發生錯誤");
            alert("websocket發生錯誤" + event);
        }

        //關閉連接
        webSocket.onclose = function () {
            console.info("關閉連接");
        }

        //監聽窗口關閉
        window.onbeforeunload = function (event) {
            webSocket.close();
        }
    </script>
</head>
<body>

</body>
</html>

基本上,數據推送的功能就完成了,下面附上效果圖。

啟動tomcat。後臺定時器兩秒刷新一次,判斷是否有websocket連接。

登錄頁面:

數據推送頁面:

伺服器定時向客戶端推送數據的功能就完成了,有不明白的可以給博主留言,如果有什麼錯誤,也希望各位朋友指出,謝謝大家。

本文源碼:

https://github.com/Qingfengchuiwoxin/websocket

相關焦點

  • 基於Spring整合Quartz集群的定時任務應用
    使用 Quartz 的集群能力可以更好的支持你的業務需求,並且即使是其中一臺機器在最糟的時間崩潰了也能確保所有的 Job 得到執行。Quartz 中集群如何工作一個 Quartz 集群中的每個節點是一個獨立的 Quartz 應用,它又管理著其他的節點。意思是你必須對每個節點分別啟動或停止。
  • Spring WebSocket 實現
    最近項目中需要使用WebSocket實現實時傳遞消息功能,網上大多數都是 Socket 實現,而 使用Spring
  • 初探SpringBoot整合Quartz定時任務
    定時任務有很多實現的方式,包括timer,timertask,scheduledexecutorservice,以及第三方框架Quartz。本篇文章主要介紹SpringBoot整合Quartz實現動態定時任務。1、Quartz是功能強大的開源作業調度庫,可以創建簡單或複雜的計劃,可以運行十個,百個,甚至幾萬個Jobs這樣複雜的日程序表。
  • Spring Boot 集成 WebSocket
    在一次項目開發中,使用到了Netty網絡應用框架,以及MQTT進行消息數據的收發,這其中需要後臺來將獲取到的消息主動推送給前端,於是就使用到了MQTT,特此記錄一下。 一、什麼是websocket? WebSocket協議是基於TCP的一種新的網絡協議。
  • SpringBoot定時任務:schedule、quartz
    優勢:是spring框架提供的計劃任務,開發簡單,執行效率比較高。且在計劃任務數量太多的時候,可能出現阻塞,崩潰,延遲啟動等問題。Scheduled定時任務是spring3.0版本之後自帶的一個定時任務。其所屬Spring的資源包為:spring-context-support。
  • Springboot整合Websocket案例(後端向前端主動推送消息)
    在手機上相信都有來自伺服器的推送消息,比如一些及時的新聞信息,這篇文章主要就是實現這個功能,只演示一個基本的案例。使用的是websocket技術。一、什麼是websocketWebSocket協議是基於TCP的一種新的網絡協議。
  • Spring 整合 Quartz 分布式調度
    為了保證應用的高可用和高並發性,一般都會部署多個節點;對於定時任務,如果每個節點都執行自己的定時任務,一方面耗費了系統資源,另一方面有些任務多次執行,可能引發應用邏輯問題,所以需要一個分布式的調度系統,來協調每個節點執行定時任務。
  • SpringBoot2.0.3之quartz集成,不是你想的那樣哦!
    回復」666「獲取新整理的面試文章作者:青石路cnblogs.com/youzhibing/p/10024558.htmljava定時任務調度的實現方式
  • SpringBoot+Netty+Websocket整合案例(實現基本的聊天功能)
    之前使用Springboot整合了websocket,實現了一個後端向前端推送信息的基本小案例,這篇文章主要是增加了一個新的框架就是Netty,實現一個高性能的websocket伺服器,並結合前端代碼,實現一個基本的聊天功能。你可以根據自己的業務需求進行更改。
  • SpringBoot整合Quartz調度框架實現任務調度(附學習源碼)
    所以Quartz和Spring Task就成了我們項目開發技術選型最多的,在這裡我們著重探討一下Quartz在Spring Boot 2.X版本中的使用。Quartz是OpenSymphony開源組織在Job scheduling領域的開源項目,它可以與J2EE與J2SE應用程式相結合也可以單獨使用。
  • Spring Boot實現定時任務新解,你是否能get到?
    在日常的開發過程中經常使用到定時任務,在springMVC的開發中,經常和quartz框架進行集成使用,但在springboot中沒有這麼做,而是使用了java的線程池來實現定時任務。一、概述在springboot中使用定時任務非常簡單,只需要簡單的幾步即可完成。
  • 搭建websocket消息推送服務,必須要考慮的幾個問題
    websocket適用場景面對各種新場景對websocket功能和性能越來越高的需求,不同的團隊有不同的選擇,有的直接使用由專業團隊開發的成熟穩定的第三方websocket服務,有些則選擇自建websocket服務。
  • 學會這 10 種定時任務,我有點飄了
    使用Timer類的優缺點:優點:非常方便實現多個周期性的定時任務,並且支持延遲執行,還支持在指定時間之後執行,功能還算強大。缺點:如果其中一個任務耗時非常長,會影響其他任務的執行。三. spring 支持的定時任務1.spring taskspring task是spring3以上版本自帶的定時任務,實現定時任務的功能時,需要引入spring-context包,目前它支持:xml 和 註解 兩種方式。1.
  • 我有 10 種方法搞定定時任務,10種!
    使用Timer類的優缺點:優點:非常方便實現多個周期性的定時任務,並且支持延遲執行,還支持在指定時間之後支持,功能還算強大。缺點:如果其中一個任務耗時非常長,會影響其他任務的執行。三. spring支持的定時任務1.spring taskspring task是spring3以上版本自帶的定時任務,實現定時任務的功能時,需要引入spring-context包,目前它支持:xml 和 註解 兩種方式。1.
  • Java中Websocket使用實例解讀
    ,現在J2EE7的JSR356已經定義了統一的標準,請儘量使用支持最新通用標準的伺服器。實現import com.dooioo.websocket.utils.SessionUtils;import org.apache.commons.logging.Log;import org.apache.commons.logging.LogFactory;import javax.websocket
  • 微信小程序中如何使用WebSocket實現長連接(含完整源碼)
    》《WebSocket詳解(二):技術原理、代碼演示和應用案例》《WebSocket詳解(三):深入WebSocket通信協議細節》《WebSocket詳解(四):刨根問底HTTP與WebSocket的關係(上篇)》《WebSocket詳解(五):刨根問底HTTP與WebSocket的關係(下篇)》《WebSocket詳解(六):刨根問底WebSocket與Socket的關係》《socket.io實現消息推送的一點實踐及思路
  • 無需後端,小程序全自動定時推送模板消息系統
    >我寫了一個小程序,通過爬取網絡的API來展示每天的圖文信息,因為我是直接使用的第三方API,所以沒有編寫也沒必要編寫後端服務。但是我遇到了一個難題:我想每天都定時爬取API的新數據,然後推送給小程序的用戶,怎麼做呢?解決解決方法很簡單,甚至幾分鐘內你就可以做到!
  • 使用iris框架和websocket開發簡單的頁面訪問次數記錄功能
    Iris 框架裡內置 websocket 模塊,可以實現 websocket 伺服器,核心是封裝了 neffos 開源模塊。Websocket 可實現全雙工通信,也就是客戶端和伺服器在建立連接之後,可以互相接收和發送消息,具有長連接的特性。可以利用 websocket 實現移動端消息推送,地圖位置實時顯示等功能,提高用戶體驗。
  • Redis是如何實現點讚、取消點讚的?
    get個新技能:redis實現自動補全利用 Redis 實現「附近的人」功能!我是如何用redis做實時訂閱推送的?正文:本文基於 SpringCloud, 用戶發起點讚、取消點讚後先存入 Redis 中,再每隔兩小時從 Redis 讀取點讚數據寫入資料庫中做持久化存儲。
  • 實戰Spring Boot 2.0系列:單機定時任務的幾種實現
    常見的就是 金融服務系統推送回調,一般支付系統訂單在沒有收到成功的回調返回內容時會 持續性的回調,這種回調一般都是 定時任務 來完成。使用這種方式可以讓你的程序按照某一個 頻度執行,但不能在 指定時間 運行。現在一般用的較少。