項目中使用的場景:
帳戶掃碼登錄,微信掃碼授權,消息實時提醒,配置結果響應,客戶端同步數據。。。
之前項目裡做即時通信都是用的輪循,輪詢的效率低,非常浪費資源,後面好幾個項目都開始用的websocket配合koa和redis來實現,現在整理整理深入了解下整個即時通信實現的過程。
搭建方法跟node服務差不多,多了koa-websocket中間件
const Koa = require('koa')const router = require('koa-router')()const websockify = require('koa-websocket')const app = websockify(new Koa())router.all('/kapi/socket/init', async ctx => { const { channel } = ctx.query console.log(channel) ctx.websocket.send('message')})app.ws.use(router.routes()).use(router.allowedMethods())app.listen(3131, () => console.log(`socket listening on port ${Config.socket.port}`))
二、連結訂閱redis頻道在socket的接口基礎上,添加的了redis連結,頻道的訂閱,這裡只展示關鍵的代碼,後面會再做拆分整理
const Koa = require('koa')const router = require('koa-router')()const websockify = require('koa-websocket')const app = websockify(new Koa())const redis = require('redis')const redisConfig = { host: '18.8.1.3', port: '32', password: '1232343', db: 4}router.all('/kapi/socket/init', async ctx => { const { channel } = ctx.query console.log(channel) let client = redis.createClient(redisConfig) client.subscribe(channel) client.on('message', async (channel, message) => { console.log( 'Received subscribe message, channel [%s] message [%s]', channel, message ) await ctx.websocket.send(message) })}) app.ws.use(router.routes()).use(router.allowedMethods())app.listen(3131, () => console.log(`socket listening on port ${Config.socket.port}`))
三、客戶端調用客戶端調用之前一定別忘了先啟動socket,可以用pm2或者npm運行koa文件,展示的都是關鍵的代碼,至於監聽error 、open、close
import React, { useState, useEffect, useCallback } from 'react'import createSocket from 'src/api/socket' export default ()=>{ const [mess,setmess] = useState('暫無消息') const [channel] = useState(2) const sendSocket = useCallback(() => { wsServer = new WebSocket( `localhost:3131/kapi/socket/init?channel=${channel}` ) wsServer.addEventListener('message', socketMessage) const socketMessage = (event) => { const data = JSON.parse(event.data) if(data.code===200){ console.log(data) setmess(data) wsServer.removeEventListener('message', socketMessage) } } }, [channel]) useEffect(() => { sendSocket() }, [ sendSocket]) return ( <div>{mess}</div> )}
四、拆分整理細化一、將原來的socket服務拆分再細化
1.index.js
const Koa = require('koa')const router = require('koa-router')()const websockify = require('koa-websocket')const app = websockify(new Koa())const Config = require('../../config/const')const socketApiRoutes = require('./route')socketApiRoutes(router)app.ws.use(router.routes()).use(router.allowedMethods())app.listen(Config.socket.port, () => console.log(`socket listening on port ${Config.socket.port}`))
2.redis.js
const redis = require('redis')const dotenv = require('dotenv').config({ path: process.env.NODE_ENV == 'production' ? '.env' : '.env.local'})const ENV = dotenv.parsedconst redisConfig = { host: ENV.REDIS_HOST, port: ENV.REDIS_PORT, password: ENV.REDIS_PASSWORD, db: ENV.REDIS_DB}const createRedisClient = (channel, callback) => { let client = redis.createClient(redisConfig) client.subscribe(channel) client.on('ready', () => { console.log( 'Redis [%s:%s/%s] is connected and ready for subscribe channel [%s] use.', redisConfig.host, redisConfig.port, redisConfig.database, channel ) }) client.on('connect', () => { console.log('Redis connect') }) client.on('message', async (channel, message) => { console.log( 'Received subscribe message, channel [%s] message [%s]', channel, message ) await callback(channel, message) }) client.on('reconnecting', err => { console.log('Redis reconnecting:' + err) }) client.on('error', err => { console.log('Redis Error:' + err) }) client.on('subscribe', (channel, count) => { console.log( 'client subscribed to ' + channel + ',' + count + ' total subscriptions' ) }) client.on('unsubscribe', (channel, count) => { console.log( 'client unsubscribed from' + channel + ', ' + count + ' total subscriptions' ) }) return client}module.exports = createRedisClient
3.route.js
const createRedisClient = require('./redis')const socketApiRoutes = router => { router.all('/kapi/socket/init', async ctx => { const { channel } = ctx.query console.log(channel) createRedisClient(channel, (channel, message) => { console.log(`on message channel: ${channel}`) waitForSocketConnection(ctx.websocket, () => { ctx.websocket.send(message) }) }) function waitForSocketConnection(socket, callback) { setTimeout(() => { if (socket.readyState == 1) { if (callback != null) { callback() } } else { waitForSocketConnection(socket, callback) } }, 1000) } ctx.websocket.on('message', function(res) { }) ctx.websocket.on('close', function() { console.log('ctx websocket close') }) ctx.websocket.body = { status: true } })}module.exports = socketApiRoutes
二、將客戶端socket的調用封裝一下import { isSupportSocket } from 'src/utils/util'interface ScoketEvent { open?: () => void message?: (event: MessageEvent) => void error?: () => void close?: () => void}const createSocket = (pathname: string, eventOption?: ScoketEvent) => { let wsServer: WebSocket | null = null if (isSupportSocket()) { wsServer = new WebSocket( `${window.location.protocol === 'https:' ? 'wss' : 'ws'}://${ window.location.host }${pathname}` ) let _interval: NodeJS.Timer | null = null let _setTimeout: NodeJS.Timer | null = null if (wsServer) { const socketOpen = () => { if (eventOption && eventOption.open) { eventOption.open() } wsServer && wsServer.send('socket heart check') _interval = setInterval(() => { wsServer && wsServer.send('socket heart check') }, 30 * 1000) } const socketMessage = (event: MessageEvent) => { if (eventOption && eventOption.message) { eventOption.message(event) console.log(event, 'socketMessage') } } const socketError = () => { if (eventOption && eventOption.error) { eventOption.error() } if (_interval) clearInterval(_interval) wsServer = null console.log('websocket error') } const socketClose = () => { if (eventOption && eventOption.close) { eventOption.close() } if (wsServer) { wsServer.removeEventListener('open', socketOpen) wsServer.removeEventListener('message', socketMessage) wsServer.removeEventListener('error', socketError) } if (_interval) clearInterval(_interval) if (_setTimeout) clearTimeout(_setTimeout) } wsServer.addEventListener('open', socketOpen) wsServer.addEventListener('message', socketMessage) wsServer.addEventListener('error', socketError) wsServer.addEventListener('close', socketClose) } } else { console.error('該瀏覽器不支持socket, 請安裝新版本') } return wsServer}export default createSocket
三、客戶端socket調用在頁面裡調用上面封裝好的socket
import React, { useState, useEffect, useCallback } from 'react'import { RouteConfig } from 'react-router-config'import createSocket from 'src/api/socket'interface IProps extends RouteConfig {}export default (props: IProps) => { const [channel] = useState<number>(2) const [mess,setmess] = useState<string>('暫無消息') const sendSocket = useCallback(() => { const wsServer: WebSocket | null = createSocket( `/kapi/socket/init?channel=${channel}`, { message: socketMessage } ) function socketMessage(event: MessageEvent) { const data = JSON.parse(event.data) console.log(data) setmess(data) wsServer && wsServer.close() } return () => { if (wsServer) { wsServer.removeEventListener('message', socketMessage) wsServer.close() } } }, [channel]) useEffect(() => { sendSocket() }, [sendSocket]) return ( <div>{mess}</div> )}
整理完畢,希望對小夥伴有所幫助。