概述
MQ配置器,作为GongqiOS平台的核心组件,提供了一套高效的消息队列配置与管理解决方案,旨在优化用户在消息传输、订阅配置以及系统间数据流转与事件驱动方面的管理效率。该工具允许用户便捷地设定RabbitMQ与MongoDB的连接参数,并执行消息主题创建、订阅管理及历史查询等操作。同时,MQ配置器具备执行程序任务和脚本任务的能力,通过消息订阅自动触发操作,显著提升了系统响应能力和业务流程的自动化水平。兼容RabbitMQ与MongoDB,MQ配置器与GongqiOS生态系统紧密联动,支持与数字表格及其他业务应用的无缝集成,适用于事件驱动和任务自动化的多种业务场景,如表单操作触发和多域数据联动。操作界面直观易用,即便非专业开发人员也能快速掌握,同时提供高度可定制的消息处理逻辑,以满足个性化任务自动化的需求。
引言
MQ(消息队列) 是一种用于在不同程序之间传递数据的技术。它的工作原理类似于一个“中转站”:发送方(生产者)将消息发送到队列中,接收方(消费者)从队列中获取并处理这些消息。这种机制允许生产和消费过程异步进行,从而提高系统的效率和灵活性。
核心概念:
- 生产者(Producer):负责生成消息并将其发送到队列。
- 消费者(Consumer):负责从队列中获取消息并处理。
- 队列(Queue):存储消息的地方,生产者和消费者通过队列间接通信。
消息队列的作用
消息队列在软件开发中有许多重要的用途,以下是几个常见的场景:
- 解耦 在复杂系统中,不同的服务往往需要相互通信。如果直接调用对方的功能,会导致服务之间高度依赖,难以维护。通过消息队列,服务之间不需要直接交互,只需关注如何发送或接收消息即可。
- 异步处理 当某些任务需要较长时间完成时,可以将其放入队列中由后台处理,而不需要让前台用户等待。例如,上传文件后,系统可以立即返回成功提示,而文件的处理则在后台完成。
- 缓冲能力 在高并发场景下,消息队列可以作为缓冲区,平滑地处理大量涌入的请求,避免系统过载。
MQ配置器APP 是一个基于图形化界面的应用程序,旨在简化 RabbitMQ 的连接配置、主题创建与管理、订阅配置以及消息推送与接收的过程。通过该工具,用户无需直接编写复杂的 RabbitMQ 客户端代码,即可快速搭建消息队列系统并进行测试。
入口
点击打开MQ配置器
MQ配置
打开MQ配置,点击【记录操作-新增】
1)RabbitMQ设置:主机IP:127.0.0.1;端口号:25022(环境端口号最后两位变成22);用户名:guest;密码:guest;
2)MongoDB:主机IP:127.0.0.1;端口号:25020(环境端口号最后两位变成20);用户名和密码都没有。(主要用于把数据存到了MongoDB)
用于存放MQ的发送记录和消费记录
3)记录保留时间
用于配置发送记录和消费记录的表留时间,0表示不清除记录
消息主题
在消息队列系统中,“消息主题”可以被看作是消息的一个分类或标签。通过为消息指定一个主题,您可以决定哪些消息应该被发送到哪里,以及谁应该接收这些消息。这使得不同的应用程序可以根据它们感兴趣的主题来选择性地接收消息。
消息订阅
建立消息订阅。当MQ收到该主题的消息时,所有该主题的订阅者,都会被触发,执行相应的触发逻辑
消息主题:必填项,下拉选择要订阅的消息主题
消息标签:手动输入。收到的消息可能会携带标签,当订阅的标签不为空且与收到的消息的标签不匹配时,此消息将不触发
消息域:下拉选择,如果设置了消息域,则只有这个域发出的消息会被执行
处理类型:必填项,有脚本片段、脚本文件、执行程序
执行用户:必填项,下拉选择,os有效的用户
执行域:下拉选择,只和执行程序有关(执行哪个域的程序)
脚本参数:手动输入,执行脚本片段或者脚本程序时,可以作为变量传入
脚本片段
订阅可以执行JS脚本
JS脚本写法请参考脚本编辑器
如需测试,请选择后台运行该脚本
在JS脚本中可以通过$params.message.body拿到消息的内容
脚本文件
还可以直接执行某个JS脚本文件
执行程序
还可以执行指定域的java代码
该java类需要实现MessageListener接口
package gongqi.erp.layers.ent.utils;
import com.gongqilink.tools.message.MessageListener;
public class MQDailyReport implements MessageListener{
@Override
public void onMessage(String topic, String tag, String content) {
System.out.println("主题:" + topic);
System.out.println("标签:" + tag);
System.out.println("内容:" + content);
}
}
测试消息
回到消息主题界面,点击测试按钮
输入消息内容和消息标签
点击确认
发送记录
消费记录
执行订阅的消费记录
消息推送
可以使用框架的API,直接推送消息到MQ,写法如下:
package gongqi.erp.layers.ent.jobs;
import com.gongqilink.tools.message.MessageHelper;
import gongqi.erp.gotmodel.job.GongqiJob;
public class Job001 extends GongqiJob{
public void execute() {
MessageHelper.multicastMessage("测试", "这是标签", "这是消息内容");
}
}
示例一(数字表格)
数字表格在创建、提交、审核、删除时,都会推送消息至MQ,
以下示例是在生产日报提交后,通过MQ,推送消息到聊天窗口,并执行相应的java代码
1.准备一个生产日报的数字表格
2.创建主题gongqi.os.form_Maked,并启用
注:gongqi.os.form_Maked是数字表格提交的主题,具体请查看数字表格
3.创建订阅执行java代码的订阅
4.创建执行JS脚本的订阅(这个JS脚本用来发消息)
启用两个订阅
MQ收到数字表格推送的消息
执行两个订阅
示例二(域通讯)
MQ还可以用在 域之间的通讯
现有场景 要求两个物料域的币种要保持一致
实现思路:
在币种表的新增、更新、删除方法中,新增MQ消息推送
另一个域,订阅这条消息,根据消息标签,同步新增,更新,删除币种表
package gongqi.erp.layers.ent.tables;
import java.util.Map;
import com.gongqilink.tools.message.MessageHelper;
import gongqi.erp.framework.fastjson.FastJsonUtils;
import gongqi.erp.layers.ent.tables.base.BaseCurrencyTable;
public class CurrencyTable extends BaseCurrencyTable {
@Override
public void insert() {
super.insert();
Map<String, Object> map = this.castToMap();
//推送记录新增消息
MessageHelper.multicastMessage("CurrencyTable", "insert", FastJsonUtils.toJSONString(map));
}
@Override
public void delete() {
super.delete();
//推送记录删除消息
MessageHelper.multicastMessage("CurrencyTable", "delete", this.getCurrencyCode());
}
@Override
public void update() {
super.update();
Map<String, Object> map = this.castToMap();
//推送记录修改消息
MessageHelper.multicastMessage("CurrencyTable", "update", FastJsonUtils.toJSONString(map));
}
}
创建主题
创建订阅
执行的代码
package gongqi.erp.layers.ent.utils;
import java.util.Map;
import com.alibaba.fastjson.JSONObject;
import com.gongqilink.tools.message.MessageListener;
import gongqi.erp.framework.data.GongqiSession;
import gongqi.erp.gotmodel.util.SysDateHelper;
import gongqi.erp.layers.obj.tables.CurrencyTable;
public class MQMessageListener implements MessageListener{
@Override
public void onMessage(String topic, String tag, String content) {
switch (tag) {
case "insert":
GongqiSession.ttsbegin();
JSONObject data = JSONObject.parseObject(content);
Map<String,Object> map = data.getInnerMap();
CurrencyTable currencyTable = CurrencyTable.newInstance();
currencyTable.init();
currencyTable.setCurrencyCode(map.get("currencyCode").toString());
currencyTable.setCurrencyName(map.get("currencyName").toString());
currencyTable.setCurrencySymbol(map.get("currencySymbol").toString());
currencyTable.setAmountNumOfDecimals(Long.parseLong(map.get("amountNumOfDecimals").toString()));
currencyTable.setPriceNumOfDecimals(Long.parseLong(map.get("priceNumOfDecimals").toString()));
currencyTable.setSysActive(Boolean.parseBoolean(map.get("sysActive").toString()));
currencyTable.setStopDate(SysDateHelper.valueOf(map.get("stopDate").toString()));
currencyTable.doInsert();
GongqiSession.ttscommit();
break;
case "delete":
GongqiSession.ttsbegin();
CurrencyTable find = CurrencyTable.util().find(CurrencyTable.class, content);
if(find != null) {
find.delete();
}
GongqiSession.ttscommit();
break;
case "update":
GongqiSession.ttsbegin();
data = JSONObject.parseObject(content);
map = data.getInnerMap();
CurrencyTable ct = CurrencyTable.util().find(CurrencyTable.class, map.get("currencyCode").toString());
if(ct != null) {
ct.setCurrencyCode(map.get("currencyCode").toString());
ct.setCurrencyName(map.get("currencyName").toString());
ct.setCurrencySymbol(map.get("currencySymbol").toString());
ct.setAmountNumOfDecimals(Long.parseLong(map.get("amountNumOfDecimals").toString()));
ct.setPriceNumOfDecimals(Long.parseLong(map.get("priceNumOfDecimals").toString()));
ct.setSysActive(Boolean.parseBoolean(map.get("sysActive").toString()));
ct.setStopDate(SysDateHelper.valueOf(map.get("stopDate").toString()));
ct.doUpdate();
}else {
CurrencyTable ct1 = CurrencyTable.newInstance();
ct1.init();
ct1.setCurrencyCode(map.get("currencyCode").toString());
ct1.setCurrencyName(map.get("currencyName").toString());
ct1.setCurrencySymbol(map.get("currencySymbol").toString());
ct1.setAmountNumOfDecimals(Long.parseLong(map.get("amountNumOfDecimals").toString()));
ct1.setPriceNumOfDecimals(Long.parseLong(map.get("priceNumOfDecimals").toString()));
ct1.setSysActive(Boolean.parseBoolean(map.get("sysActive").toString()));
ct1.setStopDate(SysDateHelper.valueOf(map.get("stopDate").toString()));
ct1.doInsert();
}
GongqiSession.ttscommit();
break;
default:
break;
}
}
}
测试效果
物料域1新增币种
物料域2 同步到新增的币种