| 副标题[/!--empirenews.page--]  前言  近期接到一个任务,需要改造现有从mysql往Elasticsearch导入数据MTE(mysqlToEs)小工具,由于之前采用单线程导入,千亿数据需要两周左右的时间才能导入完成,导入效率非常低。所以楼主花了3天的时间,利用java线程池框架Executors中的FixedThreadPool线程池重写了MTE导入工具,单台服务器导入效率提高十几倍(合理调整线程数据,效率更高)。 关键技术栈
    ElasticsearchjdbcExecutorServiceThreadsql 工具说明maven依赖 <dependency>  <groupId>mysql</groupId>  <artifactId>mysql-connector-java</artifactId>  <version>${mysql.version}</version> </dependency> <dependency>  <groupId>org.elasticsearch</groupId>  <artifactId>elasticsearch</artifactId>  <version>${elasticsearch.version}</version> </dependency> <dependency>  <groupId>org.elasticsearch.client</groupId>  <artifactId>transport</artifactId>  <version>${elasticsearch.version}</version> </dependency> <dependency>  <groupId>org.projectlombok</groupId>  <artifactId>lombok</artifactId>  <version>${lombok.version}</version> </dependency> <dependency>  <groupId>com.alibaba</groupId>  <artifactId>fastjson</artifactId>  <version>${fastjson.version}</version> </dependency> 
 java线程池设置 默认线程池大小为21个,可调整。其中POR为处理流程已办数据线程池,ROR为处理流程已阅数据线程池。 private static int THREADS = 21; public static ExecutorService POR = Executors.newFixedThreadPool(THREADS); public static ExecutorService ROR = Executors.newFixedThreadPool(THREADS); 
 定义已办生产者线程/已阅生产者线程:ZlPendProducer/ZlReadProducer public class ZlPendProducer implements Runnable {  ...  @Override  public void run() {  System.out.println(threadName + "::启动...");  for (int j = 0; j < Const.TBL.TBL_PEND_COUNT; j++)  try {  ....  int size = 1000;  for (int i = 0; i < count; i += size) {  if (i + size > count) {  //作用为size最后没有100条数据则剩余几条newList中就装几条  size = count - i;  }  String sql = "select * from " + tableName + " limit " + i + ", " + size;  System.out.println(tableName + "::sql::" + sql);  rs = statement.executeQuery(sql);  List<HistPendingEntity> lst = new ArrayList<>();  while (rs.next()) {  HistPendingEntity p = PendUtils.getHistPendingEntity(rs);  lst.add(p);  }  MteExecutor.POR.submit(new ZlPendConsumer(lst));  Thread.sleep(2000);  }  ....  } catch (Exception e) {  e.printStackTrace();  }  } } public class ZlReadProducer implements Runnable {  ...已阅生产者处理逻辑同已办生产者 } 
 定义已办消费者线程/已阅生产者线程:ZlPendConsumer/ZlReadConsumer public class ZlPendConsumer implements Runnable {  private String threadName;  private List<HistPendingEntity> lst;  public ZlPendConsumer(List<HistPendingEntity> lst) {  this.lst = lst;  }  @Override  public void run() {  ...  lst.forEach(v -> {  try {  String json = new Gson().toJson(v);  EsClient.addDataInJSON(json, Const.ES.HistPendDB_Index, Const.ES.HistPendDB_type, v.getPendingId(), null);  Const.COUNTER.LD_P.incrementAndGet();  } catch (Exception e) {  e.printStackTrace();  System.out.println("err::PendingId::" + v.getPendingId());  }  });  ...  } } public class ZlReadConsumer implements Runnable {  //已阅消费者处理逻辑同已办消费者 } 
 定义导入Elasticsearch数据监控线程:Monitor (编辑:广西网) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |