java110
2023-06-19 2888221f8f0e6cc31bedd4cfa8dc1283f54a1a0c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
package com.java110.job.importData;
 
import com.java110.core.log.LoggerFactory;
import com.java110.dto.data.ImportDataDto;
import com.java110.dto.log.AssetImportLogDetailDto;
import com.java110.dto.log.AssetImportLogDto;
import com.java110.intf.common.IAssetImportLogDetailInnerServiceSMO;
import com.java110.intf.common.IAssetImportLogInnerServiceSMO;
import com.java110.po.log.AssetImportLogPo;
import com.java110.utils.factory.ApplicationContextFactory;
import com.java110.utils.util.Assert;
import org.slf4j.Logger;
 
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
 
/**
 * 导入资产数据执行器
 */
public class ImportDataExecutor implements Runnable {
    private static final Logger log = LoggerFactory.getLogger(ImportDataQueue.class);
 
 
    private IAssetImportLogInnerServiceSMO assetImportLogInnerServiceSMOImpl;
 
 
    private IAssetImportLogDetailInnerServiceSMO assetImportLogDetailInnerServiceSMOImpl;
 
    private static final int MAX_ROW = 200;
 
    //默认线程大小
    private static final int DEFAULT_EXPORT_POOL = 4;
 
    private boolean isRun = false;
 
    public ImportDataExecutor(boolean isRun) {
 
        this.isRun = isRun;
    }
 
    public ImportDataExecutor() {
    }
 
    @Override
    public void run() {
 
        while (isRun) {
            log.debug("导入数据线程开始处理");
            try {
                doImportData();
            } catch (Throwable e) {
                log.error("处理消息异常", e);
                e.printStackTrace();
            }
            log.debug("导入数据线程处理完成");
 
        }
 
    }
 
    private void doImportData() throws Exception {
 
        ImportDataDto importDataDto = ImportDataQueue.getData();
        if (importDataDto == null) {
            return;
        }
 
        String businessAdapt = importDataDto.getBusinessAdapt();
 
        IImportDataAdapt importDataAdaptImpl = ApplicationContextFactory.getBean(businessAdapt + "QueueData", IImportDataAdapt.class);
 
        if (importDataAdaptImpl == null) {
            return;
        }
 
        try {
            assetImportLogInnerServiceSMOImpl
                    = ApplicationContextFactory.getBean(IAssetImportLogInnerServiceSMO.class.getName(), IAssetImportLogInnerServiceSMO.class);
        } catch (Exception e) {
        }
        if (assetImportLogInnerServiceSMOImpl == null) {
            assetImportLogInnerServiceSMOImpl
                    = ApplicationContextFactory.getBean("assetImportLogInnerServiceSMOImpl", IAssetImportLogInnerServiceSMO.class);
        }
        Assert.hasLength(importDataDto.getLogId(), "未包含导入数据");
        Assert.hasLength(importDataDto.getCommunityId(), "未包含小区信息");
 
        AssetImportLogDto assetImportLogDto = new AssetImportLogDto();
        assetImportLogDto.setLogId(importDataDto.getLogId());
        assetImportLogDto.setCommunityId(importDataDto.getCommunityId());
        assetImportLogDto.setState(AssetImportLogDto.STATE_WAIT_IMPORT);
        int count = assetImportLogInnerServiceSMOImpl.queryAssetImportLogsCount(assetImportLogDto);
        if (count < 1) {
            throw new IllegalArgumentException("没有需要导入的房产数据" + importDataDto.getLogId());
        }
 
        //todo 修改为 导入中
        AssetImportLogPo assetImportLogPo = new AssetImportLogPo();
        assetImportLogPo.setLogId(importDataDto.getLogId());
        assetImportLogPo.setState(AssetImportLogDto.STATE_DOING_IMPORT);
        assetImportLogInnerServiceSMOImpl.updateAssetImportLog(assetImportLogPo);
 
        // todo 查询detail数据
        try {
            assetImportLogDetailInnerServiceSMOImpl
                    = ApplicationContextFactory.getBean(IAssetImportLogDetailInnerServiceSMO.class.getName(), IAssetImportLogDetailInnerServiceSMO.class);
        } catch (Exception e) {
        }
        if (assetImportLogDetailInnerServiceSMOImpl == null) {
            assetImportLogDetailInnerServiceSMOImpl
                    = ApplicationContextFactory.getBean("assetImportLogDetailInnerServiceSMOImpl", IAssetImportLogDetailInnerServiceSMO.class);
        }
 
        // todo 查询 房产 导入数据
        AssetImportLogDetailDto assetImportLogDetailDto = new AssetImportLogDetailDto();
        assetImportLogDetailDto.setLogId(importDataDto.getLogId());
        assetImportLogDetailDto.setCommunityId(importDataDto.getCommunityId());
        int total = assetImportLogDetailInnerServiceSMOImpl.queryAssetImportLogDetailsCount(assetImportLogDetailDto);
 
        if (total < 1) {
            return;
        }
 
        for (int page = 1; page <= count; page++) {
            assetImportLogDetailDto.setPage(page);
            assetImportLogDetailDto.setRow(MAX_ROW);
 
            List<AssetImportLogDetailDto> assetImportLogDetailDtos = assetImportLogDetailInnerServiceSMOImpl.queryAssetImportLogDetails(assetImportLogDetailDto);
            if (assetImportLogDetailDtos == null || assetImportLogDetailDtos.size() < 1) {
                continue;
            }
 
            try {
                importDataAdaptImpl.importData(assetImportLogDetailDtos);
            } catch (Throwable e) {
                e.printStackTrace();
            }
        }
 
        //todo 修改为 处理完成
        assetImportLogPo = new AssetImportLogPo();
        assetImportLogPo.setLogId(importDataDto.getLogId());
        assetImportLogPo.setState(AssetImportLogDto.STATE_COMPLETE_IMPORT);
        assetImportLogInnerServiceSMOImpl.updateAssetImportLog(assetImportLogPo);
    }
 
    /**
     * 线程启动器
     */
    public static void startExportDataExecutor() {
        log.debug("开始初始化导入队列");
        ExecutorService executorService = Executors.newFixedThreadPool(DEFAULT_EXPORT_POOL);
        executorService.execute(new ImportDataExecutor(true));
        log.debug("初始化导入队列完成");
 
    }
}