chengf
2025-10-28 2807cca4b6f2e8af204d798679dcee78e695ee28
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
package com.ruoyi.business.uav.config;
 
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.ruoyi.business.aidetection.config.Analyzer;
import com.ruoyi.business.aidetection.config.ZLMediaKit;
import com.ruoyi.business.aidetection.domain.AvControl;
import com.ruoyi.business.aidetection.domain.vo.AvAlgorithmVo;
import com.ruoyi.business.aidetection.domain.vo.AvControlAddVo;
import com.ruoyi.business.aidetection.service.AvAlgorithmService;
import com.ruoyi.business.aidetection.service.AvControlService;
import com.ruoyi.business.uav.domain.UavConfig;
import com.ruoyi.business.uav.domain.UavMessage;
import com.ruoyi.business.uav.domain.vo.UavStateMessageVo;
import com.ruoyi.business.uav.mapper.UavConfigMapper;
import com.ruoyi.business.uav.service.UavConfigMessageService;
import com.ruoyi.common.core.domain.AjaxResult;
import com.ruoyi.common.core.redis.RedisCache;
import com.ruoyi.web.pb.UavStateMessage;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
 
import java.util.List;
import java.util.Map;
 
 
/**
 * @Author:yuankun
 * @Package:com.ruoyi.business.uav.config
 * @Project:ruoyi-vue-service
 * @name:aaaaa
 * @Date:2024/5/12 20:21
 * @Filename:MqttConsumerCallBack
 */
@Configuration
public class MqttConsumerCallBack implements MqttCallbackExtended {
 
    private RedisCache redisCache;
    @Autowired
    private UavConfigMapper uavConfigMapper;
    @Autowired
    private AvControlService avControlService;
    @Autowired
    private AvAlgorithmService avAlgorithmService;
    @Autowired
    private UavConfigMessageService uavConfigMessageService;
    @Autowired
    private Analyzer analyzer;
    @Autowired
    private ZLMediaKit zlMediaKit;
    /**
     * 客户端断开连接的回调
     */
    private int count = 1;
 
    @Override
    public void connectionLost(Throwable throwable) {
        //uavConfigMapper.
    }
 
    /**
     * 消息到达的回调
     */
    @Override
    public void messageArrived(String topic, MqttMessage message) throws Exception {
        //System.out.println(topic);
        //ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(message.getPayload());
        //ObjectInputStream objectInputStream = new ObjectInputStream(byteArrayInputStream);
 
        //TelemetryData telemetryData= (TelemetryData) objectInputStream.readObject();
 
 
        if (topic.contains("yukong/uav/state/1/")) {
            /*String pattern = "/([^/]+)$"; // 匹配最后一个斜杠后面的内容
 
            Pattern regex = Pattern.compile(pattern);
            Matcher matcher = regex.matcher(topic);
 
            if (matcher.find()) {
                String uavId = matcher.group(1);
                QueryWrapper<UavConfig> uavConfigQueryWrapper=new QueryWrapper<>();
                uavConfigQueryWrapper.eq("uav_id",uavId);
                uavConfigMapper.selectOne(uavConfigQueryWrapper);
                //System.out.println(extracted);
            }*/
            UavStateMessage uavStateMessage = UavStateMessage.parseFrom(message.getPayload());
            UavStateMessageVo uavStateMessageVo = new UavStateMessageVo();
            BeanUtils.copyProperties(uavStateMessage, uavStateMessageVo);
            //System.out.println(telemetryData);
            //JSONObject jsonObject=JSONUtil.parseObj(telemetryData,false);
            //Map<String>
            redisCache.setCacheObject("UavStatus", uavStateMessageVo);
        }
 
        if (topic.contains("yukong/message/company")) {
 
 
            String messageJson = new String(message.getPayload());
            JSONObject jsonObject = JSONUtil.parseObj(messageJson);
            //开始直播
            if ("C20001".equals(jsonObject.get("messageType").toString())) {
                Object messageMe = jsonObject.get("message");
                JSONObject mesObject = JSONUtil.parseObj(messageMe);
                Object boxSn = jsonObject.get("boxSn");
                String boxSnStr = JSONUtil.parseObj(boxSn).toString();
                //查询uav配置
                QueryWrapper<UavConfig> uavConfigQueryWrapper=new QueryWrapper<>();
                uavConfigQueryWrapper.eq("uav_id",boxSnStr);
                UavConfig uavConfig = uavConfigMapper.selectOne(uavConfigQueryWrapper);
                //查询是否此飞机订阅了该消息订阅
                if(uavConfig==null){
                    return;
                }
                List<UavMessage> uavMessages = uavConfigMessageService.queryUavMessageByConfigId(uavConfig.getId());
                int i = 0;
                for (UavMessage  uavMessage:uavMessages) {
                    if(uavMessage.getMessageTopic().contains("yukong/message/company")){
                        i++;
                        break;
                    }
                }
                if(i==0){//没有订阅该消息
                    return;
                }
                //开启识别
                String[] split = uavConfig.getUavControlId().split(",");//获取布控id
                for (String controlId:split) {
                    AvControl avControlVo = avControlService.getById(controlId);
                    if(avControlVo==null){
                        break;
                    }
                    AvAlgorithmVo avAlgorithmVo = avAlgorithmService.queryByAlgorithmCode(avControlVo.getAlgorithmCode());
                    if(avAlgorithmVo.getObjects()==null||avAlgorithmVo.getObjects().length()==0){
                        break;
                    }
                    AvControlAddVo avControlAddVo = new AvControlAddVo();
                    avControlAddVo.setCode(avControlVo.getCode());
                    avControlAddVo.setAlgorithmCode(avAlgorithmVo.getAlgorithmCode());
                    avControlAddVo.setObjects(avAlgorithmVo.getObjects());
                    avControlAddVo.setObjectCode(avAlgorithmVo.getObjects());
                    avControlAddVo.setMinInterval(avControlVo.getMinInterval());
                    avControlAddVo.setClassThresh(avControlVo.getClassThresh());
                    avControlAddVo.setOverlapThresh(avControlVo.getOverlapThresh());
                    avControlAddVo.setStreamUrl(zlMediaKit.getRtspUrl(avControlVo.getStreamApp(), avControlVo.getStreamName()));
                    avControlAddVo.setPushStream(avControlVo.getPushStream());
                    avControlAddVo.setPushStreamUrl(zlMediaKit.getRtspUrl(avControlVo.getPushStreamApp(), avControlVo.getPushStreamName()));
                    avControlAddVo.setRecognitionRegion(avControlVo.getRecognitionRegion());
                    avControlAddVo.setModelCode(avAlgorithmVo.getModelCode());
 
                    Map<String, Object> map = analyzer.controlAdd(avControlAddVo);
                    if("200".equals(map.get("code").toString()))
                        avControlVo.setState(1L);
                    avControlService.updateById(avControlVo);
                }
 
            }
            //结束直播
            if ("C20002".equals(jsonObject.get("messageType"))) {
                Object boxSn = jsonObject.get("boxSn");
                String boxSnStr = JSONUtil.parseObj(boxSn).toString();
                //查询uav配置
                QueryWrapper<UavConfig> uavConfigQueryWrapper=new QueryWrapper<>();
                uavConfigQueryWrapper.eq("uav_id",boxSnStr);
                UavConfig uavConfig = uavConfigMapper.selectOne(uavConfigQueryWrapper);
                //开启识别
                String[] split = uavConfig.getUavControlId().split(",");//获取布控id
                for (String controlId:split) {
                    AvControl avControlVo = avControlService.getById(controlId);
                    if(avControlVo==null){
                        break;
                    }
                    Map<String, Object> map = analyzer.controlCancel(avControlVo.getCode());
 
                    if("200".equals(map.get("code").toString()))
                        avControlVo.setState(0L);
                    avControlService.updateById(avControlVo);
                }
            }
        }
    }
 
    /**
     * 消息发布成功的回调
     */
    @Override
    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
 
    }
 
    /**
     * 连接完成回调函数
     *
     * @param b 连接是否成功的布尔值
     * @param s 连接信息字符串
     */
    @Override
    public void connectComplete(boolean b, String s) {
 
    }
}