|
@@ -0,0 +1,255 @@
|
|
|
+package com.zfire.jiasm.syncdata.utils;
|
|
|
+
|
|
|
+import com.alibaba.fastjson.JSON;
|
|
|
+import com.zfire.jiasm.syncdata.data.Token;
|
|
|
+import com.zfire.jiasm.syncdata.service.OLDSystemService;
|
|
|
+import lombok.extern.slf4j.Slf4j;
|
|
|
+import org.springframework.beans.factory.annotation.Autowired;
|
|
|
+import org.springframework.beans.factory.annotation.Value;
|
|
|
+import org.springframework.stereotype.Component;
|
|
|
+
|
|
|
+import java.util.*;
|
|
|
+
|
|
|
+@Component
|
|
|
+@Slf4j
|
|
|
+public class OLDSystemUtil {
|
|
|
+ @Value("${baseurl}")
|
|
|
+ private String baseurl;
|
|
|
+
|
|
|
+ @Value("${installTopicName}")
|
|
|
+ private String installTopicName;
|
|
|
+
|
|
|
+ @Value("${repaireTopicName}")
|
|
|
+ private String repaireTopicName;
|
|
|
+
|
|
|
+ @Autowired
|
|
|
+ private TokenCenter tokenCenter;
|
|
|
+
|
|
|
+ @Autowired
|
|
|
+ private OLDSystemService oldSystemService;
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+ public void process(String name, String requestTable, String requestJsonName, Object[] collectParam,
|
|
|
+ Object[] finishParam, boolean isInstall, int type, String idColName) {
|
|
|
+
|
|
|
+ List<Map<String, Object>> result = oldSystemService.getRequests(requestTable, type);
|
|
|
+
|
|
|
+ try {
|
|
|
+
|
|
|
+ Token token = tokenCenter.getCurrentToken();
|
|
|
+
|
|
|
+ for (Map<String, Object> row : result) {
|
|
|
+
|
|
|
+ int status = (Integer) row.get("status");
|
|
|
+
|
|
|
+ String aTableDataName;
|
|
|
+ String aTableDataJsonName;
|
|
|
+ String[] tableDatasAry;
|
|
|
+ String[] tableDatasJsonName;
|
|
|
+ String otherJsons;
|
|
|
+ String messageTableName;
|
|
|
+ String messageJsonName;
|
|
|
+
|
|
|
+ String proccessName;
|
|
|
+
|
|
|
+ if (status == 1) {
|
|
|
+
|
|
|
+ aTableDataName = (String) collectParam[0];
|
|
|
+ aTableDataJsonName = (String) collectParam[1];
|
|
|
+ tableDatasAry = (String[]) collectParam[2];
|
|
|
+ tableDatasJsonName = (String[]) collectParam[3];
|
|
|
+ otherJsons = (String) collectParam[4];
|
|
|
+ messageTableName = (String) collectParam[5];
|
|
|
+ messageJsonName = (String) collectParam[6];
|
|
|
+
|
|
|
+ proccessName = name + "采集";
|
|
|
+
|
|
|
+ } else {
|
|
|
+
|
|
|
+ aTableDataName = (String) finishParam[0];
|
|
|
+ aTableDataJsonName = (String) finishParam[1];
|
|
|
+ tableDatasAry = (String[]) finishParam[2];
|
|
|
+ tableDatasJsonName = (String[]) finishParam[3];
|
|
|
+ otherJsons = (String) finishParam[4];
|
|
|
+ messageTableName = (String) finishParam[5];
|
|
|
+ messageJsonName = (String) finishParam[6];
|
|
|
+
|
|
|
+ proccessName = name + "完工";
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+ processATask(row, token, requestTable, requestJsonName, proccessName, aTableDataName,
|
|
|
+ aTableDataJsonName, tableDatasAry, tableDatasJsonName, otherJsons, isInstall, type, idColName,
|
|
|
+ messageTableName, messageJsonName);
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+ } catch (Exception ex) {
|
|
|
+ log.error("[" + name + "]上传就系统失败", ex);
|
|
|
+ }
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+ private void processATask(Map<String, Object> row, Token token, String requestTable, String requestJsonName,
|
|
|
+ String name, String aTableDataName, String aTableDataJsonName, String[] tableDatasAry,
|
|
|
+ String[] tableDatasJsonName, String otherJsons, boolean isInstall, int type, String idColName,
|
|
|
+ String messageTableName, String messageJsonName) throws Exception {
|
|
|
+
|
|
|
+ String synTaskNo = (String) row.get("syn_task_no");
|
|
|
+
|
|
|
+ String key = "12345678";
|
|
|
+ Object pgid = row.get("pgid");
|
|
|
+ if (pgid != null) {
|
|
|
+ key = pgid.toString();
|
|
|
+ }
|
|
|
+
|
|
|
+ String requestJson = "\"" + requestJsonName + "\":"
|
|
|
+ + JSON.toJSONStringWithDateFormat(filterDataMap(row, true), "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'");
|
|
|
+
|
|
|
+ StringBuffer messageJson = new StringBuffer();
|
|
|
+
|
|
|
+ if (messageTableName != null && messageTableName.trim().length() > 0) {
|
|
|
+
|
|
|
+ log.info("开始取数据[synTaskNo=" + synTaskNo + ",tableName=" + messageTableName + "]");
|
|
|
+ List<Map<String, Object>> messageTableDatas = oldSystemService.getTableDatas(messageTableName, synTaskNo);
|
|
|
+ if (messageTableDatas.size() > 0) {
|
|
|
+ Map<String, Object> messageTableData = messageTableDatas.get(0);
|
|
|
+ messageJson.append(",\"" + messageJsonName + "\":");
|
|
|
+ messageJson.append(JSON.toJSONStringWithDateFormat(filterDataMap(messageTableData, true),
|
|
|
+ "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"));
|
|
|
+ }
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+ log.info("开始取数据[synTaskNo=" + synTaskNo + ",tableName=" + aTableDataName + "]");
|
|
|
+ List<Map<String, Object>> aTableDataList = oldSystemService.getTableDatas(aTableDataName, synTaskNo);
|
|
|
+ StringBuffer sb = new StringBuffer();
|
|
|
+ for (Map<String, Object> aTableData : aTableDataList) {
|
|
|
+ String tableData = JSON.toJSONStringWithDateFormat(filterDataMap(aTableData, true),
|
|
|
+ "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'");
|
|
|
+ String id = (String) aTableData.get(idColName);
|
|
|
+ if (sb.length() > 0) {
|
|
|
+ sb.append(",");
|
|
|
+ }
|
|
|
+ sb.append("{\"id\":\"" + id + "\"");
|
|
|
+
|
|
|
+ StringBuffer sb3 = new StringBuffer();
|
|
|
+ for (int i = 0; i < tableDatasAry.length; i++) {
|
|
|
+
|
|
|
+ String tableName = tableDatasAry[i];
|
|
|
+ String jsonName = tableDatasJsonName[i];
|
|
|
+ log.info("开始取数据[synTaskNo=" + synTaskNo + ",tableName=" + tableName + "]");
|
|
|
+ String childIdName="id";
|
|
|
+
|
|
|
+ if(aTableDataName.trim().equalsIgnoreCase("itf_tblWxjsJykt")) {
|
|
|
+ childIdName="wxjsguid";
|
|
|
+ }
|
|
|
+
|
|
|
+ if(tableName.trim().equalsIgnoreCase("itf_tblAzWgmxSyktTmmxLs")
|
|
|
+ || tableName.trim().equalsIgnoreCase("itf_tblazwgmxsyktfj")) {
|
|
|
+ childIdName="relationid";
|
|
|
+ }
|
|
|
+
|
|
|
+ if(tableName.trim().equalsIgnoreCase("itf_tblazwgmxjyktFj")
|
|
|
+ || tableName.trim().equalsIgnoreCase("itf_tblAzWgmxQitaFj")
|
|
|
+ ) {
|
|
|
+ childIdName="pgwcmxid";
|
|
|
+ }
|
|
|
+
|
|
|
+ List<Map<String, Object>> tableDatas = oldSystemService.getTableDatasJoinParent(tableName, synTaskNo,
|
|
|
+ aTableDataName,aTableData.get("id").toString(),childIdName);
|
|
|
+ StringBuffer sb2 = new StringBuffer();
|
|
|
+ for (Map<String, Object> theTableData : tableDatas) {
|
|
|
+ if (sb2.length() > 0) {
|
|
|
+ sb2.append(",");
|
|
|
+ }
|
|
|
+ sb2.append(JSON.toJSONStringWithDateFormat(filterDataMap(theTableData, true),
|
|
|
+ "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"));
|
|
|
+ }
|
|
|
+ String rowData = "\"" + jsonName + "\":" + "[" + sb2.toString() + "]";
|
|
|
+
|
|
|
+ if (sb3.length() > 0) {
|
|
|
+ sb3.append(",");
|
|
|
+ }
|
|
|
+ sb3.append(rowData);
|
|
|
+ }
|
|
|
+
|
|
|
+ if (sb3.length() > 0) {
|
|
|
+ sb.append("," + sb3.toString());
|
|
|
+ }
|
|
|
+
|
|
|
+ if (otherJsons.length() > 0) {
|
|
|
+ sb.append("," + otherJsons);
|
|
|
+ }
|
|
|
+
|
|
|
+ sb.append(
|
|
|
+ ",\"" + aTableDataJsonName + "\":" + tableData + (!isInstall ? messageJson.toString() : "") + "}");
|
|
|
+ }
|
|
|
+
|
|
|
+ String json;
|
|
|
+ if (isInstall) {
|
|
|
+ json = "{\"dataAcquiredList\": [" + sb.toString() + "]," + requestJson + messageJson.toString() + "}";
|
|
|
+ } else {
|
|
|
+ json = sb.toString();
|
|
|
+ }
|
|
|
+
|
|
|
+ log.info(name + json);
|
|
|
+
|
|
|
+ sendMessage2Server((isInstall ? this.installTopicName : this.repaireTopicName), json, token, key);
|
|
|
+
|
|
|
+ oldSystemService.successUpdate(requestTable, synTaskNo);
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+ public void sendMessage2Server(String topic, String json, Token token, String key) throws Exception {
|
|
|
+
|
|
|
+ String url = this.baseurl + "kafka/sendMessage";
|
|
|
+ Map<String, String> paramMap = new HashMap<String, String>();
|
|
|
+ paramMap.put("topic", topic);
|
|
|
+ paramMap.put("key", key);
|
|
|
+ paramMap.put("message", json);
|
|
|
+ JSONResult jsonResult = HttpUtil.httpRequestWithToken(url, "POST", JSON.toJSONString(paramMap), token);
|
|
|
+ if (!jsonResult.isOK()) {
|
|
|
+ throw new Exception("发送消息失败[" + jsonResult.getMsg() + "]");
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ @SuppressWarnings("serial")
|
|
|
+ private Set<String> excludeCols = new HashSet<String>() {
|
|
|
+ {
|
|
|
+ add("syn_task_no");
|
|
|
+ add("syn_req_time");
|
|
|
+ add("syn_status");
|
|
|
+ add("syn_time");
|
|
|
+ add("syn_times");
|
|
|
+ add("syn_err_msg");
|
|
|
+ add("worker_order_no");
|
|
|
+ add("status");
|
|
|
+ add("fs_status");
|
|
|
+ }
|
|
|
+ };
|
|
|
+
|
|
|
+ public Map<String, Object> filterDataMap(Map<String, Object> row, boolean isSub8Hour) {
|
|
|
+ Map<String, Object> paramMap = new HashMap<String, Object>();
|
|
|
+ Set<String> keys = row.keySet();
|
|
|
+ for (String key : keys) {
|
|
|
+ if (!excludeCols.contains(key)) {
|
|
|
+ if (isSub8Hour) {
|
|
|
+ Object value = row.get(key.trim());
|
|
|
+ if (value != null && (value instanceof Date)) {
|
|
|
+ Date date = (Date) value;
|
|
|
+ Calendar calendar = Calendar.getInstance();
|
|
|
+ calendar.setTime(date);
|
|
|
+ calendar.add(Calendar.HOUR, -8);
|
|
|
+ row.put(key.trim(), calendar.getTime());
|
|
|
+ }
|
|
|
+ }
|
|
|
+ paramMap.put(DataUtil.fromDbName2ServiceName(key.trim()), row.get(key.trim()));
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ return paramMap;
|
|
|
+ }
|
|
|
+
|
|
|
+}
|