package com.java110.job.smo; import com.java110.common.constant.RuleDomain; import com.java110.common.util.DateUtil; import com.java110.common.util.StringUtil; import com.java110.job.dao.IPrvncFtpFileDAO; import com.java110.job.model.FtpTaskLog; import com.java110.job.model.FtpTaskLogDetail; import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import java.text.DateFormat; import java.text.SimpleDateFormat; import java.util.*; /** * Ftp定时任务执行类,所有的任务都在这里执行,主要为了ftp文件保存到文件系统,业务逻辑单独处理 * * 特别注意,事前过程调用是没有传递参数的,事后过程调用是需要传递一个param的参数,需要在子类process方法中写入,如果不需要传也要传个""过来, * 相应过程需要两个参数,一个是入参一个是出参 * * @author wuxw7 add by 2016-01-03 * */ public abstract class PrvncFtpToFileSystemQuartz { protected static final Logger logger = LoggerFactory.getLogger(PrvncFtpToFileSystemQuartz.class); @Autowired private IPrvncFtpFileDAO iprvncFtpFileDAO; @Autowired private IPrvncFtpFileSMO iprvncFtpFileSMO; /*private IPrvncDumpSMO prvncDumpSMO;*/ // 运行状态,R:正在执行 T:等待运行 TD1:文件下载失败 TD2:文件内容保存失败 TU1:数据文件生成失败 TU2:数据文件上传失败 private static final String TASK_STATE_R = "R";// 正在运行 private static final String TASK_STATE_T = "T";// 等待运行 private static final String TASK_STATE_E1 = "E1";// 执行事前过程失败 private static final String TASK_STATE_E2 = "E2";// 处理数据失败 private static final String TASK_STATE_E3 = "E3";// 执行事后过程失败 public void initTask() { // 将所有的任务状态改为等待运行状态 Map paramIn = new HashMap(); paramIn.put("oldRunState", "R"); paramIn.put("runState", "T"); int updateFtpItemRunStateFlag = iprvncFtpFileDAO.updateFtpItemRunState(paramIn); if (updateFtpItemRunStateFlag < 1) { logger.error("--【PrvncFtpToFileSystemQuartz.initTask】,没有需要更新的内容(没有下载一半后停止应用的情况)", paramIn); } } /** * 启动任务 * * @param ftpItemConfigInfo */ public void startFtpTask(Map ftpItemConfigInfo) throws Exception { // 这么做是为了,单线程调用,防止多线程导致数据重复处理 if (!ftpItemConfigInfo.containsKey("RUN_STATE") || "R".equals(ftpItemConfigInfo.get("RUN_STATE"))) { return; } long taskId = Long.parseLong(ftpItemConfigInfo.get("TASKID").toString()); if (logger.isDebugEnabled()) { logger.debug("---【PrvncFtpToFileSystemQuartz.startFtpTask】:任务【" + taskId + "】开始运行!", taskId); } // 保存任务执行主要日志信息 long taskLogID = insertTaskInfo(ftpItemConfigInfo); ftpItemConfigInfo.put("logid", taskLogID); ftpItemConfigInfo.put("taskid", taskId); ftpItemConfigInfo.put("threadrunstate", TASK_STATE_R); ftpItemConfigInfo.put("tnum", 1); // 修改任务状态为正在执行状态 updateTaskState(taskId, TASK_STATE_R); // 方法调用是否成功,S成功(默认),E表示失败(在方法中失败时,需要修改) ftpItemConfigInfo.put("PRE_METHOD_FLAG", "S"); try { // 1.0空方法,让子类去实现 prepare(ftpItemConfigInfo); // 2.0调用事前过程 if (ftpItemConfigInfo.containsKey("PREFLAG") && "0".equals(ftpItemConfigInfo.get("PREFLAG"))) { callPreFunction(ftpItemConfigInfo); } if (ftpItemConfigInfo.containsKey("PRE_METHOD_FLAG") && "E".equals(ftpItemConfigInfo.get("PRE_METHOD_FLAG"))) { // 此时调用事前过程失败,直接返回 查询标识为E,更新日志 udpateTaskLog(ftpItemConfigInfo); updateTaskState(taskId, TASK_STATE_E1); return; } // 3.0核心业务处理逻辑,需要子类去实现 process(ftpItemConfigInfo); if (ftpItemConfigInfo.containsKey("PRE_METHOD_FLAG") && "E".equals(ftpItemConfigInfo.get("PRE_METHOD_FLAG"))) { // 程序处理失败,直接返回 ftpItemConfigInfo.put("threadrunstate", TASK_STATE_E2); updateTaskState(taskId, TASK_STATE_E2); udpateTaskLog(ftpItemConfigInfo); saveTaskLogDetail(ftpItemConfigInfo);// 保存detail return; } // 记录详细日志 ftpItemConfigInfo.put("threadrunstate", "T"); saveTaskLogDetail(ftpItemConfigInfo); // 4.0调用事后过程 if (ftpItemConfigInfo.containsKey("AFTERFLAG") && "0".equals(ftpItemConfigInfo.get("AFTERFLAG"))) { callAfterFunction(ftpItemConfigInfo); } if (ftpItemConfigInfo.containsKey("PRE_METHOD_FLAG") && "E".equals(ftpItemConfigInfo.get("PRE_METHOD_FLAG"))) { // 此时调用事前过程失败,直接返回 查询标识为E,更新日志 udpateTaskLog(ftpItemConfigInfo); updateTaskState(taskId, TASK_STATE_E3); return; } // 5.0空方法,让子类去实现 post(ftpItemConfigInfo); } catch (Exception ex) { ftpItemConfigInfo.put("threadrunstate", TASK_STATE_E2); udpateTaskLog(ftpItemConfigInfo); ftpItemConfigInfo.put("threadrunstate", TASK_STATE_E2); ftpItemConfigInfo.put("remark", ex); saveTaskLogDetail(ftpItemConfigInfo); updateTaskState(taskId, TASK_STATE_E2); // 接续向外抛出去 logger.error("处理出现问题:", ex); return; } // 修改任务状态为执行完毕状态 updateTaskState(taskId, TASK_STATE_T); ftpItemConfigInfo.put("threadrunstate", TASK_STATE_T); udpateTaskLog(ftpItemConfigInfo); // 发送任务运行结果通知短信给相关人员 **暂时不调用短信 if (!TASK_STATE_T.equals(ftpItemConfigInfo.get("RUN_STATE").toString())) { /*sendErrLogPhoneMsg(ftpItemConfigInfo, taskLogID);*/ } } // 如果有事前存过需要调用,则先调用存过 public void callPreFunction(Map taskInfo) { if (taskInfo.containsKey("PREFUNCTION") && taskInfo.get("PREFUNCTION") != null && !"".equals(taskInfo.get("PREFUNCTION"))) { try { iprvncFtpFileSMO.saveDbFunction(taskInfo.get("PREFUNCTION").toString()); taskInfo.put("threadrunstate", "T"); taskInfo.put("remark", "调用事前存过结束"); saveTaskLogDetail(taskInfo); } catch (Exception ex) { logger.error("调用事前存过失败:", ex); taskInfo.put("threadrunstate", "E1"); taskInfo.put("remark", "调用事前存过失败" + ex); taskInfo.put("PRE_METHOD_FLAG", "E"); saveTaskLogDetail(taskInfo); } } } /** * 主要业务处理(上传下载),让子类去实现 * * @param ftpItemConfigInfo */ protected abstract void process(Map ftpItemConfigInfo) throws Exception; // 如果有事后存过需要调用,则调用存过 public void callAfterFunction(Map taskInfo) { if (taskInfo.containsKey("AFTERFUNCTION") && taskInfo.get("AFTERFUNCTION") != null && !"".equals(taskInfo.get("AFTERFUNCTION"))) { try { taskInfo.put("functionname", taskInfo.get("AFTERFUNCTION")); // taskInfo 参数param需要在process方法中需要自己写入 iprvncFtpFileSMO.saveDbFunctionWithParam(taskInfo); taskInfo.put("threadrunstate", "T"); taskInfo.put("remark", "调用事后存过结束"); saveTaskLogDetail(taskInfo); } catch (Exception ex) { ex.printStackTrace(); taskInfo.put("threadrunstate", "E3"); taskInfo.put("remark", "调用事后存过失败" + ex); taskInfo.put("PRE_METHOD_FLAG", "E"); saveTaskLogDetail(taskInfo); } } } /** * 修改任务状态 * * @return */ private void updateTaskState(long taskId, String state) { Map info = new HashMap(); info.put("taskId", taskId); info.put("runState", state); int updateFtpItemFlag = iprvncFtpFileDAO.updateFtpItemByTaskId(info); // 这里只是后台提示,不进行日志保存 if (updateFtpItemFlag < 1) { logger.error("---【PrvncFtpToFileSystemQuartz.updateTaskState】修改任务【" + taskId + "】的状态失败", info); } } /** * 修改任务执行日志的状态 */ private void udpateTaskLog(Map taskInfo) { FtpTaskLog loginfo = new FtpTaskLog(); loginfo.setLogid(Long.valueOf(taskInfo.get("logid").toString())); loginfo.setState(taskInfo.get("threadrunstate").toString()); iprvncFtpFileSMO.updateTaskRunLog(loginfo); } /** * 保存任务执行的详细日志 */ protected void saveTaskLogDetail(Map taskInfo) { FtpTaskLogDetail logdetail = new FtpTaskLogDetail(); logdetail.setLogid(Long.valueOf(taskInfo.get("logid").toString())); logdetail.setTaskid(Long.valueOf(taskInfo.get("taskid").toString())); logdetail.setState((String) taskInfo.get("threadrunstate")); logdetail.setTnum(Integer.valueOf(taskInfo.get("tnum").toString())); if (taskInfo.get("begin") != null) { logdetail.setBegin(Long.valueOf(taskInfo.get("begin").toString())); } if (taskInfo.get("end") != null) { logdetail.setEnd(Long.valueOf(taskInfo.get("end").toString())); } if (taskInfo.get("havedown") != null) { logdetail.setHavedown(Long.valueOf(taskInfo.get("havedown").toString())); } logdetail.setRemark(taskInfo.get("remark") == null ? "" : (taskInfo.get("remark").toString().trim().length() > 2000 ? taskInfo.get("remark").toString().trim().substring(0, 1600) : taskInfo.get("remark").toString().trim())); logdetail.setData(taskInfo.get("data") == null ? "" : taskInfo.get("data").toString()); logdetail.setServerfilename(taskInfo.get("serverfilename") == null ? "" : taskInfo.get("serverfilename").toString()); logdetail.setLocalfilename(taskInfo.get("localfilename") == null ? "" : taskInfo.get("localfilename").toString()); int logdetailid = iprvncFtpFileSMO.saveTaskRunDetailLog(logdetail); taskInfo.put("logdetailid", logdetailid); } // /** // * 修改任务执行的详细日志的状态 // */ // private void updateTaskLogDetail(Map taskInfo){ // FtpTaskLogDetail logdetail=new FtpTaskLogDetail();// // logdetail.setId(Long.valueOf(taskInfo.get("logdetailid").toString())); // logdetail.setState(taskInfo.get("threadrunstate").toString()); // logdetail.setRemark((String)taskInfo.get("remark")); // logdetail.setData((String)taskInfo.get("data")); // if(taskInfo.get("downedlength")!=null) // logdetail.setHavedown(Long.valueOf(taskInfo.get("downedlength").toString())); // prvncFtpFileSMO.updateTaskRunDetailLog(logdetail); // } /** * 生成任务执行日志 */ private long insertTaskInfo(Map taskInfo) { FtpTaskLog loginfo = new FtpTaskLog(); loginfo.setTaskid(Long.valueOf(taskInfo.get("TASKID").toString())); loginfo.setState("R"); loginfo.setServerfilename("");// taskInfo.get("serverfilename").toString() loginfo.setLocalfilename("");// taskInfo.get("localfilename").toString() loginfo.setUord(taskInfo.get("U_OR_D").toString()); return iprvncFtpFileSMO.saveTaskRunLog(loginfo); } /** * 如果任务运行有异常,则发送警告短信给配置的手机号码 */ private void sendErrLogPhoneMsg(Map taskInfo, long taskLogID) { Map msginfo = new HashMap(); String phone = (String) taskInfo.get("errphone"); if (phone != null && !"".equals(phone)) { String[] phonelist = phone.split(","); for (int i = 0; i < phonelist.length; i++) { msginfo.put("taskid", taskInfo.get("taskid")); msginfo.put("phone", phonelist[i]); msginfo.put("msg", "通用FTP数据文件传接任务:" + (String) taskInfo.get("taskname") + "运行提示"); DateFormat df = new SimpleDateFormat("yyyy-mm-dd HH:mm:ss"); String detail = "任务已于" + df.format(new Date()) + "运行完毕。运行过程中出现异常,详情请登录系统查看!"; msginfo.put("detail", detail); /*prvncDumpSMO.saveTaskErrInfoPhoneMsg(msginfo);*/ } } } /** * 处理文件名,校验文件名是中是否存在****(4个),表示通配符,如果不存在就是确定唯一文件名 * 文件名支持日期型的如CRM_########001.txt 程序处理后是 CRM_20170105001.txt 文件名支持sql 语句生成的 * 文件名支持通配符的如863_****.txt 程序下载所有以863_开头的文件 863_****001.txt * 以863_开头,以001结尾,****001.txt 以001结尾的 * * @param fileName * @return */ protected List dealFileName(String fileName) { // TODO Auto-generated method stub List results = new ArrayList(); String result = ""; // 文件中使用的日期 if (StringUtils.contains(fileName, RuleDomain.REPLAY_TYPE_F)) { result = StringUtil.replace(fileName, RuleDomain.REPLAY_TYPE_F, DateUtil.getFormatTimeString(new Date(), "yyyyMMddHHmm")); } else if (StringUtils.contains(fileName, RuleDomain.REPLAY_TYPE_E)) { result = StringUtil.replace(fileName, RuleDomain.REPLAY_TYPE_E, DateUtil.getFormatTimeString(new Date(), "yyyyMMddHH")); } else if (StringUtils.contains(fileName, RuleDomain.REPLAY_TYPE_A)) { result = StringUtil.replace(fileName, RuleDomain.REPLAY_TYPE_A, DateUtil.getFormatTimeString(new Date(), "yyyyMMdd")); } else if (StringUtils.contains(fileName == null ? "" : fileName.toLowerCase(), RuleDomain.REPLAY_TYPE_SQL)) { // 后期改造,文件名如果配置的是sql的话,以sql查询文件名 List fileNames = this.getPrvncFtpFileDAO().execConfigSql(fileName); // if (fileNames != null && fileNames.size() > 0) { // result = fileNames.get(0); // } return fileNames; } else { result = fileName; } results.add(result); return results; } /** * 空方法,如果在事前过程处理前,还需要做一定的处理,需要子类重写这个方法,实现业务逻辑 * * @param ftpItemConfigInfo */ protected void prepare(Map ftpItemConfigInfo) { } /** * 空方法,如果在事后过程处理完后,还需要做一定的处理,需要子类重写这个方法,实现业务逻辑 * * @param ftpItemConfigInfo */ protected void post(Map ftpItemConfigInfo) { } public IPrvncFtpFileDAO getPrvncFtpFileDAO() { return iprvncFtpFileDAO; } public void setPrvncFtpFileDAO(IPrvncFtpFileDAO prvncFtpFileDAO) { this.iprvncFtpFileDAO = prvncFtpFileDAO; } public IPrvncFtpFileSMO getPrvncFtpFileSMO() { return iprvncFtpFileSMO; } public void setPrvncFtpFileSMO(IPrvncFtpFileSMO prvncFtpFileSMO) { this.iprvncFtpFileSMO = prvncFtpFileSMO; } /*public IPrvncDumpSMO getPrvncDumpSMO() { return prvncDumpSMO; } public void setPrvncDumpSMO(IPrvncDumpSMO prvncDumpSMO) { this.prvncDumpSMO = prvncDumpSMO; } */ }