shiyj
2019-08-31 7d8c79cd05aa3f729ded1e66563129c18734eb3c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
package com.java110.job.smo;
import com.java110.common.constant.RuleDomain;
import com.java110.common.util.DateUtil;
import com.java110.common.util.StringUtil;
import com.java110.job.dao.IHccFtpFileDAO;
 
import com.java110.job.model.FtpTaskLog;
import com.java110.job.model.FtpTaskLogDetail;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
 
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.*;
 
/**
 * Ftp定时任务执行类,所有的任务都在这里执行,主要为了ftp文件保存到文件系统,业务逻辑单独处理
 * 
 * 特别注意,事前过程调用是没有传递参数的,事后过程调用是需要传递一个param的参数,需要在子类process方法中写入,如果不需要传也要传个""过来,
 * 相应过程需要两个参数,一个是入参一个是出参
 * 
 * @author wuxw7 add by 2016-01-03
 * 
 */
public abstract class HcFtpToFileSystemQuartz {
 
    protected static final Logger logger = LoggerFactory.getLogger(HcFtpToFileSystemQuartz.class);
    @Autowired
    private IHccFtpFileDAO iprvncFtpFileDAO;
    @Autowired
    private IHcFtpFileSMO iprvncFtpFileSMO;
 
    /*private IPrvncDumpSMO prvncDumpSMO;*/
    // 运行状态,R:正在执行 T:等待运行 TD1:文件下载失败 TD2:文件内容保存失败 TU1:数据文件生成失败 TU2:数据文件上传失败
    private static final String TASK_STATE_R = "R";// 正在运行
    private static final String TASK_STATE_T = "T";// 等待运行
    private static final String TASK_STATE_E1 = "E1";// 执行事前过程失败
    private static final String TASK_STATE_E2 = "E2";// 处理数据失败
    private static final String TASK_STATE_E3 = "E3";// 执行事后过程失败
 
    public void initTask() {
 
        // 将所有的任务状态改为等待运行状态
        Map paramIn = new HashMap();
        paramIn.put("oldRunState", "R");
        paramIn.put("runState", "T");
        int updateFtpItemRunStateFlag = iprvncFtpFileDAO.updateFtpItemRunState(paramIn);
        if (updateFtpItemRunStateFlag < 1) {
            logger.error("--【PrvncFtpToFileSystemQuartz.initTask】,没有需要更新的内容(没有下载一半后停止应用的情况)", paramIn);
        }
    }
 
    /**
     * 启动任务
     * 
     * @param ftpItemConfigInfo
     */
    public  void startFtpTask(Map ftpItemConfigInfo) throws Exception {
 
        // 这么做是为了,单线程调用,防止多线程导致数据重复处理
        if (!ftpItemConfigInfo.containsKey("RUN_STATE") || "R".equals(ftpItemConfigInfo.get("RUN_STATE"))) {
            return;
        }
 
        long taskId = Long.parseLong(ftpItemConfigInfo.get("TASKID").toString());
 
        if (logger.isDebugEnabled()) {
            logger.debug("---【PrvncFtpToFileSystemQuartz.startFtpTask】:任务【" + taskId + "】开始运行!", taskId);
        }
 
        // 保存任务执行主要日志信息
        long taskLogID = insertTaskInfo(ftpItemConfigInfo);
 
        ftpItemConfigInfo.put("logid", taskLogID);
        ftpItemConfigInfo.put("taskid", taskId);
        ftpItemConfigInfo.put("threadrunstate", TASK_STATE_R);
        ftpItemConfigInfo.put("tnum", 1);
        // 修改任务状态为正在执行状态
        updateTaskState(taskId, TASK_STATE_R);
        // 方法调用是否成功,S成功(默认),E表示失败(在方法中失败时,需要修改)
        ftpItemConfigInfo.put("PRE_METHOD_FLAG", "S");
        try {
            // 1.0空方法,让子类去实现
            prepare(ftpItemConfigInfo);
 
            // 2.0调用事前过程
            if (ftpItemConfigInfo.containsKey("PREFLAG") && "0".equals(ftpItemConfigInfo.get("PREFLAG"))) {
                callPreFunction(ftpItemConfigInfo);
            }
 
            if (ftpItemConfigInfo.containsKey("PRE_METHOD_FLAG") && "E".equals(ftpItemConfigInfo.get("PRE_METHOD_FLAG"))) {
                // 此时调用事前过程失败,直接返回 查询标识为E,更新日志
                udpateTaskLog(ftpItemConfigInfo);
                updateTaskState(taskId, TASK_STATE_E1);
                return;
            }
 
            // 3.0核心业务处理逻辑,需要子类去实现
            process(ftpItemConfigInfo);
 
            if (ftpItemConfigInfo.containsKey("PRE_METHOD_FLAG") && "E".equals(ftpItemConfigInfo.get("PRE_METHOD_FLAG"))) {
                // 程序处理失败,直接返回
                ftpItemConfigInfo.put("threadrunstate", TASK_STATE_E2);
                updateTaskState(taskId, TASK_STATE_E2);
                udpateTaskLog(ftpItemConfigInfo);
                saveTaskLogDetail(ftpItemConfigInfo);// 保存detail
                return;
            }
            // 记录详细日志
            ftpItemConfigInfo.put("threadrunstate", "T");
            saveTaskLogDetail(ftpItemConfigInfo);
 
            // 4.0调用事后过程
            if (ftpItemConfigInfo.containsKey("AFTERFLAG") && "0".equals(ftpItemConfigInfo.get("AFTERFLAG"))) {
                callAfterFunction(ftpItemConfigInfo);
            }
            if (ftpItemConfigInfo.containsKey("PRE_METHOD_FLAG") && "E".equals(ftpItemConfigInfo.get("PRE_METHOD_FLAG"))) {
                // 此时调用事前过程失败,直接返回 查询标识为E,更新日志
                udpateTaskLog(ftpItemConfigInfo);
                updateTaskState(taskId, TASK_STATE_E3);
                return;
            }
            // 5.0空方法,让子类去实现
            post(ftpItemConfigInfo);
        } catch (Exception ex) {
            ftpItemConfigInfo.put("threadrunstate", TASK_STATE_E2);
            udpateTaskLog(ftpItemConfigInfo);
            ftpItemConfigInfo.put("threadrunstate", TASK_STATE_E2);
            ftpItemConfigInfo.put("remark", ex);
            saveTaskLogDetail(ftpItemConfigInfo);
            updateTaskState(taskId, TASK_STATE_E2);
            // 接续向外抛出去
            logger.error("处理出现问题:", ex);
            return;
        }
 
        // 修改任务状态为执行完毕状态
        updateTaskState(taskId, TASK_STATE_T);
        ftpItemConfigInfo.put("threadrunstate", TASK_STATE_T);
        udpateTaskLog(ftpItemConfigInfo);
 
        // 发送任务运行结果通知短信给相关人员 **暂时不调用短信
        if (!TASK_STATE_T.equals(ftpItemConfigInfo.get("RUN_STATE").toString())) {
            /*sendErrLogPhoneMsg(ftpItemConfigInfo, taskLogID);*/
        }
 
    }
 
    // 如果有事前存过需要调用,则先调用存过
    public void callPreFunction(Map taskInfo) {
 
        if (taskInfo.containsKey("PREFUNCTION") && taskInfo.get("PREFUNCTION") != null && !"".equals(taskInfo.get("PREFUNCTION"))) {
            try {
                iprvncFtpFileSMO.saveDbFunction(taskInfo.get("PREFUNCTION").toString());
                taskInfo.put("threadrunstate", "T");
                taskInfo.put("remark", "调用事前存过结束");
                saveTaskLogDetail(taskInfo);
            } catch (Exception ex) {
                logger.error("调用事前存过失败:", ex);
                taskInfo.put("threadrunstate", "E1");
                taskInfo.put("remark", "调用事前存过失败" + ex);
                taskInfo.put("PRE_METHOD_FLAG", "E");
                saveTaskLogDetail(taskInfo);
            }
        }
    }
 
    /**
     * 主要业务处理(上传下载),让子类去实现
     * 
     * @param ftpItemConfigInfo
     */
    protected abstract void process(Map ftpItemConfigInfo) throws Exception;
 
    // 如果有事后存过需要调用,则调用存过
    public void callAfterFunction(Map taskInfo) {
 
        if (taskInfo.containsKey("AFTERFUNCTION") && taskInfo.get("AFTERFUNCTION") != null && !"".equals(taskInfo.get("AFTERFUNCTION"))) {
            try {
                taskInfo.put("functionname", taskInfo.get("AFTERFUNCTION"));
                // taskInfo 参数param需要在process方法中需要自己写入
                iprvncFtpFileSMO.saveDbFunctionWithParam(taskInfo);
                taskInfo.put("threadrunstate", "T");
                taskInfo.put("remark", "调用事后存过结束");
                saveTaskLogDetail(taskInfo);
            } catch (Exception ex) {
                ex.printStackTrace();
                taskInfo.put("threadrunstate", "E3");
                taskInfo.put("remark", "调用事后存过失败" + ex);
                taskInfo.put("PRE_METHOD_FLAG", "E");
                saveTaskLogDetail(taskInfo);
            }
        }
    }
 
    /**
     * 修改任务状态
     * 
     * @return
     */
    private void updateTaskState(long taskId, String state) {
        Map info = new HashMap();
 
        info.put("taskId", taskId);
        info.put("runState", state);
        int updateFtpItemFlag = iprvncFtpFileDAO.updateFtpItemByTaskId(info);
        // 这里只是后台提示,不进行日志保存
        if (updateFtpItemFlag < 1) {
            logger.error("---【PrvncFtpToFileSystemQuartz.updateTaskState】修改任务【" + taskId + "】的状态失败", info);
        }
    }
 
    /**
     * 修改任务执行日志的状态
     */
    private void udpateTaskLog(Map taskInfo) {
        FtpTaskLog loginfo = new FtpTaskLog();
        loginfo.setLogid(Long.valueOf(taskInfo.get("logid").toString()));
        loginfo.setState(taskInfo.get("threadrunstate").toString());
        iprvncFtpFileSMO.updateTaskRunLog(loginfo);
    }
 
    /**
     * 保存任务执行的详细日志
     */
    protected void saveTaskLogDetail(Map taskInfo) {
        FtpTaskLogDetail logdetail = new FtpTaskLogDetail();
        logdetail.setLogid(Long.valueOf(taskInfo.get("logid").toString()));
        logdetail.setTaskid(Long.valueOf(taskInfo.get("taskid").toString()));
        logdetail.setState((String) taskInfo.get("threadrunstate"));
        logdetail.setTnum(Integer.valueOf(taskInfo.get("tnum").toString()));
        if (taskInfo.get("begin") != null) {
            logdetail.setBegin(Long.valueOf(taskInfo.get("begin").toString()));
        }
        if (taskInfo.get("end") != null) {
            logdetail.setEnd(Long.valueOf(taskInfo.get("end").toString()));
        }
        if (taskInfo.get("havedown") != null) {
            logdetail.setHavedown(Long.valueOf(taskInfo.get("havedown").toString()));
        }
        logdetail.setRemark(taskInfo.get("remark") == null ? "" : (taskInfo.get("remark").toString().trim().length() > 2000 ? taskInfo.get("remark").toString().trim().substring(0,
                1600) : taskInfo.get("remark").toString().trim()));
        logdetail.setData(taskInfo.get("data") == null ? "" : taskInfo.get("data").toString());
        logdetail.setServerfilename(taskInfo.get("serverfilename") == null ? "" : taskInfo.get("serverfilename").toString());
        logdetail.setLocalfilename(taskInfo.get("localfilename") == null ? "" : taskInfo.get("localfilename").toString());
        int logdetailid = iprvncFtpFileSMO.saveTaskRunDetailLog(logdetail);
        taskInfo.put("logdetailid", logdetailid);
    }
 
    // /**
    // * 修改任务执行的详细日志的状态
    // */
    // private void updateTaskLogDetail(Map taskInfo){
    // FtpTaskLogDetail logdetail=new FtpTaskLogDetail();//
    // logdetail.setId(Long.valueOf(taskInfo.get("logdetailid").toString()));
    // logdetail.setState(taskInfo.get("threadrunstate").toString());
    // logdetail.setRemark((String)taskInfo.get("remark"));
    // logdetail.setData((String)taskInfo.get("data"));
    // if(taskInfo.get("downedlength")!=null)
    // logdetail.setHavedown(Long.valueOf(taskInfo.get("downedlength").toString()));
    // prvncFtpFileSMO.updateTaskRunDetailLog(logdetail);
    // }
 
    /**
     * 生成任务执行日志
     */
    private long insertTaskInfo(Map taskInfo) {
        FtpTaskLog loginfo = new FtpTaskLog();
        loginfo.setTaskid(Long.valueOf(taskInfo.get("TASKID").toString()));
        loginfo.setState("R");
        loginfo.setServerfilename("");// taskInfo.get("serverfilename").toString()
        loginfo.setLocalfilename("");// taskInfo.get("localfilename").toString()
        loginfo.setUord(taskInfo.get("U_OR_D").toString());
        return iprvncFtpFileSMO.saveTaskRunLog(loginfo);
    }
 
    /**
     * 如果任务运行有异常,则发送警告短信给配置的手机号码
     */
    private void sendErrLogPhoneMsg(Map taskInfo, long taskLogID) {
        Map msginfo = new HashMap();
        String phone = (String) taskInfo.get("errphone");
        if (phone != null && !"".equals(phone)) {
            String[] phonelist = phone.split(",");
            for (int i = 0; i < phonelist.length; i++) {
                msginfo.put("taskid", taskInfo.get("taskid"));
                msginfo.put("phone", phonelist[i]);
                msginfo.put("msg", "通用FTP数据文件传接任务:" + (String) taskInfo.get("taskname") + "运行提示");
 
                DateFormat df = new SimpleDateFormat("yyyy-mm-dd HH:mm:ss");
                String detail = "任务已于" + df.format(new Date()) + "运行完毕。运行过程中出现异常,详情请登录系统查看!";
                msginfo.put("detail", detail);
                /*prvncDumpSMO.saveTaskErrInfoPhoneMsg(msginfo);*/
            }
        }
    }
 
    /**
     * 处理文件名,校验文件名是中是否存在****(4个),表示通配符,如果不存在就是确定唯一文件名
     * 文件名支持日期型的如CRM_########001.txt 程序处理后是 CRM_20170105001.txt 文件名支持sql 语句生成的
     * 文件名支持通配符的如863_****.txt 程序下载所有以863_开头的文件 863_****001.txt
     * 以863_开头,以001结尾,****001.txt 以001结尾的
     * 
     * @param fileName
     * @return
     */
    protected List<String> dealFileName(String fileName) {
        // TODO Auto-generated method stub
        List<String> results = new ArrayList<String>();
        String result = "";
        // 文件中使用的日期
        if (StringUtils.contains(fileName, RuleDomain.REPLAY_TYPE_F)) {
            result = StringUtil.replace(fileName, RuleDomain.REPLAY_TYPE_F, DateUtil.getFormatTimeString(new Date(), "yyyyMMddHHmm"));
        } else if (StringUtils.contains(fileName, RuleDomain.REPLAY_TYPE_E)) {
            result = StringUtil.replace(fileName, RuleDomain.REPLAY_TYPE_E, DateUtil.getFormatTimeString(new Date(), "yyyyMMddHH"));
        } else if (StringUtils.contains(fileName, RuleDomain.REPLAY_TYPE_A)) {
            result = StringUtil.replace(fileName, RuleDomain.REPLAY_TYPE_A, DateUtil.getFormatTimeString(new Date(), "yyyyMMdd"));
        } else if (StringUtils.contains(fileName == null ? "" : fileName.toLowerCase(), RuleDomain.REPLAY_TYPE_SQL)) {
            // 后期改造,文件名如果配置的是sql的话,以sql查询文件名
            List<String> fileNames = this.getPrvncFtpFileDAO().execConfigSql(fileName);
            // if (fileNames != null && fileNames.size() > 0) {
            // result = fileNames.get(0);
            // }
            return fileNames;
        } else {
            result = fileName;
        }
        results.add(result);
        return results;
    }
 
    /**
     * 空方法,如果在事前过程处理前,还需要做一定的处理,需要子类重写这个方法,实现业务逻辑
     * 
     * @param ftpItemConfigInfo
     */
    protected void prepare(Map ftpItemConfigInfo) {
 
    }
 
    /**
     * 空方法,如果在事后过程处理完后,还需要做一定的处理,需要子类重写这个方法,实现业务逻辑
     * 
     * @param ftpItemConfigInfo
     */
    protected void post(Map ftpItemConfigInfo) {
 
    }
 
    public IHccFtpFileDAO getPrvncFtpFileDAO() {
        return iprvncFtpFileDAO;
    }
 
    public void setPrvncFtpFileDAO(IHccFtpFileDAO prvncFtpFileDAO) {
        this.iprvncFtpFileDAO = prvncFtpFileDAO;
    }
 
    public IHcFtpFileSMO getPrvncFtpFileSMO() {
        return iprvncFtpFileSMO;
    }
 
    public void setPrvncFtpFileSMO(IHcFtpFileSMO prvncFtpFileSMO) {
        this.iprvncFtpFileSMO = prvncFtpFileSMO;
    }
 
    /*public IPrvncDumpSMO getPrvncDumpSMO() {
        return prvncDumpSMO;
    }
 
    public void setPrvncDumpSMO(IPrvncDumpSMO prvncDumpSMO) {
        this.prvncDumpSMO = prvncDumpSMO;
    }
*/
}