@Override public void syncData(String category) { //判断当前es索引是否存在 SimpleDateFormat dateFormat = new SimpleDateFormat("yyyyMMdd"); StringBuffer indexName = null; if(category.equals(StatContant.CATEGORY_TYPE_HOUSE)) { //房源 indexName = new StringBuffer("bi_kanyu_dm_housebase_").append(dateFormat.format(new Date())).append("_1"); }else if(category.equals(StatContant.CATEGORY_TYPE_CUSTORMER)) { //客源 indexName = new StringBuffer("bi_kanyu_dm_customer_").append(dateFormat.format(new Date())).append("_1"); }else if(category.equals(StatContant.CATEGORY_TYPE_PERFORMANCE)) { //业绩 indexName = new StringBuffer("bi_kanyu_dm_performance_").append(dateFormat.format(new Date())).append("_1"); } String indexName1 = indexName.toString(); String clusterNames = AppPropUtil.getPropValue(AppPropUtil.Keys.BI_CLUSTER_NAME); boolean indexExists = searchService.isIndexExists(clusterNames, indexName.toString()); if (indexExists) { //如果这个索引存在,先干掉 searchService.deleteIndex(clusterNames, indexName.toString()); } //创建当前对应的索引 boolean createIndex = searchService.createIndex(clusterNames, indexName.toString()); if (createIndex) { //创建安索引成功 logger.error("创建索引" + indexName + "成功"); //创建对应的mapping boolean createMapping = false; if(category.equals(StatContant.CATEGORY_TYPE_HOUSE)) { //房源 createMapping = searchService.createMapping(clusterNames, indexName.toString(), ESContant.DM_HOUSEBASE_TYPE, FileUtils.getContentByLine("mappings/dm_housebase.txt")); }else if(category.equals(StatContant.CATEGORY_TYPE_CUSTORMER)) { //客源 createMapping = searchService.createMapping(clusterNames, indexName.toString(), ESContant.DM_CUSTOMER_TYPE, FileUtils.getContentByLine("mappings/dm_customer.txt")); }else if(category.equals(StatContant.CATEGORY_TYPE_PERFORMANCE)) { //业绩 createMapping = searchService.createMapping(clusterNames, indexName.toString(), ESContant.DM_PERFORMANCE_TYPE, FileUtils.getContentByLine("mappings/dm_performance.txt")); } if (createMapping) { logger.error("创建mapping--"+StatContant.CATEGORY_TYPE_MAP.get(category)+"成功"); long begin = System.currentTimeMillis(); logger.error("------------------开始同步BI两个月"+StatContant.CATEGORY_TYPE_MAP.get(category)+"中间表数据:开始时间" + DateUtil.getDefaultDate() + "-----------------------"); int pageSize = 10000; while (true) { int total = biJdbcTemplate.queryForInt("select count(1) from "+StatContant.CATEGORY_TYPE_MAP.get(category)+"_batch dt where dt.processed=1 and dt.status=0 and to_char(dt.batchtime,'yyyyMMdd') = to_char(sysdate,'yyyyMMdd')"); if (total == 0) { break; } logger.error("查出BI两个月"+StatContant.CATEGORY_TYPE_MAP.get(category)+"中间表数据为:" + total + "条"); double querySize = total; double totalPage = Math.ceil(querySize / pageSize); logger.error("共分" + totalPage + "页"); final String sql = "select * from (select t.*,rownum as rn from "+StatContant.CATEGORY_TYPE_MAP.get(category)+"_batch t where t.processed=1 and t.status=0 and to_char(t.batchtime,'yyyyMMdd') = to_char(sysdate,'yyyyMMdd') and rownum<=? order by t.batchtime asc) where rn>=?"; ExecutorService es = Executors.newFixedThreadPool(10); for (int i = 0; i < totalPage; i++) { int start = i * pageSize + 1; int end = (i + 1) * pageSize; if (end > total) { end = total; } final Object[] params = new Object[]{end, start}; es.submit(new Runnable() { @Override public void run() { List