一 系統架構二 開發環境三 API接口服務四 AI業務處理服務五 任務處理請求信息發送和接收六 業務處理服務集成Celery任務調度
為什麼要跨平臺呢?
1,Java + Spring Boot開發Web服務是常用搭配,豐富的組件和易用的功能;
2,Python在AI領域是主流開發語言,實現業務處理更方便,不需要代碼移植;
3,招聘工程師組建技術團隊有針對性,發揮各自優勢。
一,系統架構
ActiveMQ是一個非常流行的消息隊列服務中間件,基於JMS(Java Message Service)規範。
Celery是一個靈活可靠的分布式系統,用於異步任務調度,系統通常將一些耗時的操作任務提交給Celery去異步執行,典型系統架構示意圖如下:
本文基於Java + Spring Boot,Python + Django,集成ActiveMQ和Celery,搭建起一個跨平臺異步任務調度系統。
API接口服務
├── controller│ └── CheckController.java├── mq│ └── MqService.java│ └── MqConsumer.java
AI業務處理服務
├── celery.py├── tasks.py├── mq│ └── mq_service.py│ └── mq_listener.py
二,開發環境
系統依賴ActiveMQ和Redis運行,手動安裝配置稍顯繁瑣,可以使用Docker一鍵部署,下載資源編排腳本後運行docker-compose up -d
開發環境部署
├── docker-compose.yml├── up.sh
三,API接口服務
Spring Boot集成ActiveMQ只需簡單配置,開發步驟如下:
1,在application.yml中配置ActiveMQ伺服器信息
spring:activemq:broker-url: tcp://127.0.0.1:61616user: adminpassword: adminin-memory: falsepackages: trust-all: truepool: enabled: false
2,MqService封裝了消息發送功能,注意Java環境下使用文本消息TextMessage,發送時將Map轉換為JSON字符串,和Python環境下STOMP簡單文本協議對應。
3,MqConsumer.java接收任務處理狀態消息,使用的是發布訂閱消息Topic
4,配置完成後,啟動API服務,運行單元測試驗證消息發送接收功能
5,API服務接收到的狀態信息
四,AI業務處理服務
Python集成ActiveMQ使用stomp.py,基於STOMP協議(埠為61613),簡單(流)文本消息,開發步驟如下:
1,將ActiveMQ伺服器地址等信息配置在settings.py中,方便維護管理
MQ_URL = '127.0.0.1'MQ_PORT = 61613MQ_USER = 'admin'MQ_PASSWORD = 'admin'MQ_QUEUE = '/queue/starter.process'MQ_TOPIC = '/topic/starter.status'
2,為了增加代碼的兼容和容錯能力,封裝輔助函數send_msg(), consume_msg(), get_conn(), close_conn(),詳見代碼文件mq_service.py
3,增加mq_listener.py,聲明消息處理類,繼承stomp.ConnectionListener,在on_message()函數中將消息字符串解析為JSON,注意STOMP協議只支持簡單文本協議,所以此步轉換是必須的。
4,根據接收到的消息內容創建一個異步任務
5,封裝一個Django Command,調用comsume_msg啟動消息監聽服務,代碼在目錄management/commands下的mq.py
6,運行命令python manage.py mq,看到消息提示,啟動監聽服務成功
7,增加測試test_mq_service.py,發送消息
8,運行測試python manage.py test,看到消息發送和接收
五,任務處理請求信息發送和接收
現在API服務和AI業務處理服務已經能夠發送和接收ActiveMQ消息,將它們連接起來:
1,API服務:REST接口處理客戶端請求時,發送業務處理消息,使用點對點消息Queue, CheckController.java
2,業務處理服務:消息監聽服務接收到請求消息,調用Celery創建一個異步任務,代碼mq_listener.py,調用tasks.py中do_task()函數
3,將異步任務創建功能封裝為dispatch_task()函數
4,Celery異步任務處理時,發送狀態信息到API服務,代碼tasks.py,Celery的集成方法見下一章節。
5,API服務接收到信息,更新狀態並通知客戶端,代碼MqConsumer.java
消息格式轉換過程:
基本時序圖如下:
六,AI業務處理服務集成Celery
Django集成Celery配置方法步驟如下:
1. 增加celery.py,配置信息
2. 打開settings.py,配置BROKER和BACKEND地址
CELERY_BROKER = 'redis://127.0.0.1:6379/2'CELERY_BACKEND = 'redis://127.0.0.1:6379/3'
3. 打開__init__.py,增加代碼
from __future__ import absolute_import, unicode_literalsfrom .celery import app as celery_app__all__ = ['celery_app']
4. 增加tasks.py,聲明異步任務。任務處理過程中將調用mq_service發送狀態信息到API服務,Topic類型的文本信息
5. 正確配置後,運行命令啟動celery worker異步任務處理服務:celery -A hello_celery worker -l info -P eventlet,注意Win10環境中需要增加eventlet,Celery成功啟動信息:
6. 增加單元測試test_task_util.py,創建一個任務
7. 運行python manage.py test,Celery服務將執行測試函數創建的任務