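Introduction
Two days ago I wrote a post titled "Java Multithreaded Concurrent Programming". With the National Day holiday on and nothing else to do at home, I'm carrying on with the rest of the material. Let's get to work!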
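Thread pools
Why use a thread pool
Consider a web server, database server, file server, or mail server. Each request is a short task, but the number of requests is enormous. If every request creates a new thread and is served on that thread, the constant creation and destruction of threads imposes heavy overhead on the system and wastes resources.
A thread pool addresses both the thread life-cycle overhead and the resource exhaustion problem. Because threads are reused across many tasks, the cost of creating a thread is amortized over all of them.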
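To make the reuse point concrete, here is a minimal sketch (the class name ThreadReuseDemo and the numbers 4 and 1000 are illustrative assumptions, not part of the project in this post). It pushes many short tasks through a small fixed pool and records which worker threads actually ran them.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ThreadReuseDemo {

    /** Runs taskCount short tasks on a pool of poolSize threads and returns the distinct worker names. */
    static Set<String> runTasks(int poolSize, int taskCount) throws InterruptedException {
        Set<String> workerNames = ConcurrentHashMap.newKeySet();
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        for (int i = 0; i < taskCount; i++) {
            // Each task only records the name of the thread that executed it
            pool.execute(() -> workerNames.add(Thread.currentThread().getName()));
        }
        pool.shutdown();                              // stop accepting new tasks
        pool.awaitTermination(10, TimeUnit.SECONDS);  // wait for queued tasks to finish
        return workerNames;
    }

    public static void main(String[] args) throws InterruptedException {
        Set<String> names = runTasks(4, 1000);
        System.out.println("1000 tasks ran on " + names.size() + " thread(s): " + names);
    }
}
```

However many tasks are submitted, the set never holds more than poolSize names, which is the amortization described above.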
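Creation and usage
Future
A Future represents the result of a specific Runnable or Callable task: it lets you cancel the task, check whether it has completed, and retrieve its result. The get method blocks until the task returns a result.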
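A small sketch of these Future operations, assuming a single-thread executor and a made-up sumTo task (both are illustrative choices, not code from this project):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class FutureDemo {

    /** Hypothetical workload: the sum 1 + 2 + ... + n. */
    static int sumTo(int n) {
        int total = 0;
        for (int i = 1; i <= n; i++) {
            total += i;
        }
        return total;
    }

    /** Submits the work and blocks in get() until the result is available. */
    static int computeWithFuture(int n) throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Future<Integer> future = pool.submit(() -> sumTo(n));
            return future.get(5, TimeUnit.SECONDS); // blocks, with a timeout as a safety net
        } finally {
            pool.shutdown();
        }
    }

    /** Cancels a long-running task and reports whether the Future reflects that. */
    static boolean cancelSlowTask() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Future<?> slow = pool.submit(() -> {
                Thread.sleep(60_000); // stands in for slow work
                return null;
            });
            boolean cancelled = slow.cancel(true); // true: interrupt the worker if already running
            return cancelled && slow.isCancelled() && slow.isDone();
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("sum = " + computeWithFuture(100));
        System.out.println("cancelled = " + cancelSlowTask());
    }
}
```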
Callable & FutureTask
Callable is similar to Runnable, except that Callable returns a value and Runnable does not. Callable is usually used together with FutureTask or with a thread pool.
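As a rough illustration of the Callable plus FutureTask pairing without a pool, the sketch below wraps a Callable in a FutureTask and runs it on a plain Thread (the class name, helper name, and thread name are assumptions made for the example):

```java
import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask;

public class FutureTaskDemo {

    /** Wraps the Callable in a FutureTask, runs it on a new thread, and waits for the result. */
    static String runInThread(Callable<String> job) throws Exception {
        // FutureTask implements both Runnable and Future, so a Thread can run it
        FutureTask<String> task = new FutureTask<>(job);
        Thread worker = new Thread(task, "callable-worker");
        worker.start();
        return task.get(); // blocks until call() has returned
    }

    public static void main(String[] args) throws Exception {
        String result = runInThread(() -> "computed on " + Thread.currentThread().getName());
        System.out.println(result);
    }
}
```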
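Core thread pool parameters
corePoolSize: the core pool size
maximumPoolSize: the maximum number of threads in the pool
keepAliveTime: when there are more threads than corePoolSize, the maximum time an excess idle thread waits for a new task before terminating
unit: the time unit of keepAliveTime
workQueue: the work queue holding tasks that are waiting to run
threadFactory: the factory used to create new threads
handler: the rejection policy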
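The seven parameters map directly onto the full ThreadPoolExecutor constructor. The sketch below uses deliberately small, assumed values (2 core threads, 4 maximum, a queue of 2, an AbortPolicy handler, and a hypothetical "sync-worker-" thread name) so that the interaction between them is easy to observe: tasks fill the core threads first, then the queue, then the extra threads, and only then are rejected.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class PoolParametersDemo {

    static ThreadPoolExecutor newPool() {
        AtomicInteger seq = new AtomicInteger();
        ThreadFactory factory = r -> new Thread(r, "sync-worker-" + seq.incrementAndGet());
        return new ThreadPoolExecutor(
                2,                                     // corePoolSize
                4,                                     // maximumPoolSize
                5L,                                    // keepAliveTime
                TimeUnit.SECONDS,                      // unit
                new ArrayBlockingQueue<>(2),           // workQueue (bounded)
                factory,                               // threadFactory
                new ThreadPoolExecutor.AbortPolicy()); // handler: throw on rejection
    }

    /** Submits blocking tasks until the pool rejects one; returns how many were accepted. */
    static int acceptedBeforeRejection() {
        ThreadPoolExecutor pool = newPool();
        CountDownLatch release = new CountDownLatch(1);
        int accepted = 0;
        try {
            while (true) {
                pool.execute(() -> {
                    try {
                        release.await(); // keep the worker busy until released
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
                accepted++;
            }
        } catch (RejectedExecutionException e) {
            // All 4 threads are busy and the queue holds 2 tasks: nothing more fits
        } finally {
            release.countDown();
            pool.shutdown();
        }
        return accepted;
    }

    public static void main(String[] args) {
        // 4 running + 2 queued = 6 accepted tasks
        System.out.println("accepted before rejection: " + acceptedBeforeRejection());
    }
}
```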
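Executor framework
Hands-on
Requirements analysis
Business scenario
Most systems have to exchange data with third-party systems, and the third party's production database is not something we are allowed to operate on directly. In enterprises this is usually handled through an intermediate table: the third-party system copies its production data into a separate table, in a separate database that is isolated from its production environment, and adds the fields required by the interface agreement. Our side reads the data from that intermediate table and synchronizes it, which means writing a program to do the synchronization.
Synchronization modes
Full sync: on a daily schedule, sync all of that day's production data (pro: simple to implement; con: data is not synced promptly)
Incremental sync: sync each record as soon as it is added (pro: data is synced in near real time; con: relatively hard to implement)
What our side needs to do
Read the data from the intermediate table and sync it into the business system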
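Model extraction (producer-consumer model)
Producer: reads data from the intermediate table
Consumer: consumes the data produced by the producer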
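Before the real project code, the producer-consumer model itself can be sketched in isolation. The example below is a generic illustration, not the Producer and Consumer classes of this project: one producer puts integers on a bounded BlockingQueue, several consumers take them, and a hypothetical POISON value tells each consumer to stop.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class ProducerConsumerDemo {

    private static final int POISON = -1; // assumed sentinel meaning "no more data"

    /** One producer feeds `items` values to `consumers` workers; returns how many were consumed. */
    static int run(int items, int consumers) throws InterruptedException {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(10); // bounded buffer
        AtomicInteger consumed = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(consumers);

        for (int c = 0; c < consumers; c++) {
            pool.execute(() -> {
                try {
                    while (true) {
                        int item = queue.take(); // blocks while the queue is empty
                        if (item == POISON) {
                            return;              // each consumer stops at its own sentinel
                        }
                        consumed.incrementAndGet();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }

        // The producer runs on the calling thread; put() blocks while the queue is full
        for (int i = 0; i < items; i++) {
            queue.put(i);
        }
        for (int c = 0; c < consumers; c++) {
            queue.put(POISON);
        }

        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
        return consumed.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("consumed " + run(100, 10) + " items with 10 consumers");
    }
}
```

The bounded queue is what gives back-pressure: a producer that outruns its consumers simply blocks in put() instead of piling up work in memory.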
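Defining the interface agreement
Take only the fields our business needs
A field must record when the data entered the intermediate table
Add a status flag that marks the synchronization state of each record
Record the time at which the data was synchronized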
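One way to think about the status flag is as a small state machine. The sketch below uses the four codes this post defines later in StudentConst (I, D, F, E); the enum itself and the allowed transitions (I to D, then D to F or E) are my own assumption about how the flag is meant to move, not something the project enforces in code.

```java
import java.util.EnumSet;
import java.util.Set;

public class SyncStatusDemo {

    /** Hypothetical model of the data_status flag on the intermediate table. */
    enum SyncStatus {
        INIT("I"),     // written by the third-party system
        DEALING("D"),  // picked up by the producer
        FINISH("F"),   // synced successfully
        ERROR("E");    // an error or exception occurred

        final String code;

        SyncStatus(String code) {
            this.code = code;
        }

        /** Assumed legal next states for a record in this state. */
        Set<SyncStatus> next() {
            switch (this) {
                case INIT:
                    return EnumSet.of(DEALING);
                case DEALING:
                    return EnumSet.of(FINISH, ERROR);
                default:
                    return EnumSet.noneOf(SyncStatus.class);
            }
        }

        boolean canMoveTo(SyncStatus target) {
            return next().contains(target);
        }
    }

    public static void main(String[] args) {
        System.out.println("I -> D allowed: " + SyncStatus.INIT.canMoveTo(SyncStatus.DEALING));
        System.out.println("F -> D allowed: " + SyncStatus.FINISH.canMoveTo(SyncStatus.DEALING));
    }
}
```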
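Technology choices
MyBatis
Single producer, multiple consumers
Multithreaded concurrent operation
Intermediate table design
Project setup
Project structure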
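pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.cyb</groupId>
    <artifactId>ybchen_syn</artifactId>
    <version>1.0-SNAPSHOT</version>
    <dependencies>
        <dependency>
            <groupId>org.mybatis</groupId>
            <artifactId>mybatis</artifactId>
            <version>3.5.6</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.21</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid</artifactId>
            <version>1.2.1</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.30</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.13</version>
            <scope>test</scope>
        </dependency>
    </dependencies>
</project>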
log4j.properties
### Settings ###
log4j.rootLogger = debug,stdout,D,E
### Write log output to the console ###
log4j.appender.stdout = org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target = System.out
log4j.appender.stdout.layout = org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern = [%-5p] %d{yyyy-MM-dd HH:mm:ss,SSS} method:%l%n%m%n
### Write logs at DEBUG level and above to ./logs/debug.log ###
log4j.appender.D = org.apache.log4j.DailyRollingFileAppender
log4j.appender.D.File = ./logs/debug.log
log4j.appender.D.Append = true
log4j.appender.D.Threshold = DEBUG
log4j.appender.D.layout = org.apache.log4j.PatternLayout
log4j.appender.D.layout.ConversionPattern = %-d{yyyy-MM-dd HH:mm:ss} [ %t:%r ] - [ %p ] %m%n
### Write logs at ERROR level and above to ./logs/error.log ###
log4j.appender.E = org.apache.log4j.DailyRollingFileAppender
log4j.appender.E.File = ./logs/error.log
log4j.appender.E.Append = true
log4j.appender.E.Threshold = ERROR
log4j.appender.E.layout = org.apache.log4j.PatternLayout
log4j.appender.E.layout.ConversionPattern = %-d{yyyy-MM-dd HH:mm:ss} [ %t:%r ] - [ %p ] %m%n
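middle-student.xml
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper
        PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
        "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="middle-student">
    <select id="selectList" resultType="com.cyb.entity.middle.Student">
        SELECT
        *
        FROM student
        WHERE data_status = 'I' limit #{count}
    </select>
    <update id="updateStatusById">
        update student
        set data_status = #{dataStatus}, deal_time = #{dealTime}
        where id = #{id}
    </update>
</mapper>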
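test-student.xml
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper
        PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
        "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="test-student">
    <insert id="addStudent">
        insert into student (name,sex,department) values (#{name},#{sex},#{department})
    </insert>
</mapper>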
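mybatis-config-middle.xml
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE configuration
        PUBLIC "-//mybatis.org//DTD Config 3.0//EN"
        "http://mybatis.org/dtd/mybatis-3-config.dtd">
mybatis-config-test.xml
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE configuration
        PUBLIC "-//mybatis.org//DTD Config 3.0//EN"
        "http://mybatis.org/dtd/mybatis-3-config.dtd">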
StudentConst.java
package com.cyb.cost;

public class StudentConst {
    // I: inserted by the third-party system; D: processing; F: processing finished; E: error or exception
    public static final String INIT = "I";
    public static final String DEALING = "D";
    public static final String FINISH = "F";
    public static final String ERROR = "E";
}
DruidDataSourceFactory.java
package com.cyb.datasource;

import com.alibaba.druid.pool.DruidDataSource;
import org.apache.ibatis.datasource.unpooled.UnpooledDataSourceFactory;

/**
 * @ClassName: DruidDataSourceFactory
 * @Description: Druid connection pool factory
 * @Author: chenyb
 * @Date: /10/7 7:30 PM
 * @Version: 1.0
 */
public class DruidDataSourceFactory extends UnpooledDataSourceFactory {
    public DruidDataSourceFactory() {
        // Let MyBatis use a Druid pooled data source
        this.dataSource = new DruidDataSource();
    }
}
Student.java (in the middle package)
package com.cyb.entity.middle;

import java.io.Serializable;
import java.util.Date;

public class Student implements Serializable {
    private Integer id;
    private String name;
    private String sex;
    private String address;
    private String department;
    private Date addTime;
    private String dataStatus;
    private Date dealTime;

    public Integer getId() {
        return id;
    }

    public void setId(Integer id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getSex() {
        return sex;
    }

    public void setSex(String sex) {
        this.sex = sex;
    }

    public String getAddress() {
        return address;
    }

    public void setAddress(String address) {
        this.address = address;
    }

    public String getDepartment() {
        return department;
    }

    public void setDepartment(String department) {
        this.department = department;
    }

    public Date getAddTime() {
        return addTime;
    }

    public void setAddTime(Date addTime) {
        this.addTime = addTime;
    }

    public String getDataStatus() {
        return dataStatus;
    }

    public void setDataStatus(String dataStatus) {
        this.dataStatus = dataStatus;
    }

    public Date getDealTime() {
        return dealTime;
    }

    public void setDealTime(Date dealTime) {
        this.dealTime = dealTime;
    }

    @Override
    public String toString() {
        return "Student{" +
                "id=" + id +
                ", name='" + name + '\'' +
                ", sex='" + sex + '\'' +
                ", address='" + address + '\'' +
                ", department='" + department + '\'' +
                ", addTime=" + addTime +
                ", dataStatus='" + dataStatus + '\'' +
                ", dealTime=" + dealTime +
                '}';
    }
}
Student.java (in the test package)
package com.cyb.entity.test;

import java.io.Serializable;

public class Student implements Serializable {
    private Integer id;
    private String name;
    private String sex;
    private String department;

    public Integer getId() {
        return id;
    }

    public void setId(Integer id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getSex() {
        return sex;
    }

    public void setSex(String sex) {
        this.sex = sex;
    }

    public String getDepartment() {
        return department;
    }

    public void setDepartment(String department) {
        this.department = department;
    }

    @Override
    public String toString() {
        return "student{" +
                "id=" + id +
                ", name='" + name + '\'' +
                ", sex='" + sex + '\'' +
                ", department='" + department + '\'' +
                '}';
    }
}
MiddleProcess.java
package com.cyb.process;

import com.cyb.entity.middle.Student;

import java.util.List;

public interface MiddleProcess {
    /**
     * Query data
     * @param count number of records to fetch per query
     * @return the records fetched
     */
    List<Student> queryList(int count);

    /**
     * Update the status of the given records
     * @param data records to update
     * @param status the status to set
     * @return
     */
    int modifyListStatus(List<Student> data, String status);
}
TestProcess.java
package com.cyb.process;

import com.cyb.entity.middle.Student;

import java.util.List;

public interface TestProcess {
    /**
     * Process data
     * @param data records to process
     */
    void hand(List<Student> data);
}
MiddleProcessImpl.java
package com.cyb.process.impl;

import com.cyb.entity.middle.Student;
import com.cyb.process.MiddleProcess;
import com.cyb.util.SqlSessionUtil;
import org.apache.ibatis.session.SqlSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Date;
import java.util.List;

public class MiddleProcessImpl implements MiddleProcess {
    private static final Logger LOGGER = LoggerFactory.getLogger(MiddleProcess.class);

    @Override
    public List<Student> queryList(int count) {
        SqlSession middleSqlSession = SqlSessionUtil.getSqlSession("middle");
        List<Student> objects = null;
        try {
            objects = middleSqlSession.selectList("middle-student.selectList", count);
        } catch (Exception e) {
            LOGGER.error("Query failed =======>", e);
        } finally {
            // Close the session
            middleSqlSession.close();
        }
        return objects;
    }

    @Override
    public int modifyListStatus(List<Student> data, String status) {
        data.forEach(stu -> {
            stu.setDataStatus(status);
            SqlSession middleSqlSession = SqlSessionUtil.getSqlSession("middle");
            try {
                middleSqlSession.update("middle-student.updateStatusById", stu);
                middleSqlSession.commit();
            } catch (Exception e) {
                // Roll back the current transaction
                middleSqlSession.rollback();
                LOGGER.error("Failed to update status =======>", e);
            } finally {
                middleSqlSession.close();
            }
        });
        return 0;
    }
}
TestProcessImpl.java
package com.cyb.process.impl;

import com.cyb.cost.StudentConst;
import com.cyb.entity.middle.Student;
import com.cyb.process.TestProcess;
import com.cyb.util.SqlSessionUtil;
import org.apache.ibatis.session.SqlSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Date;
import java.util.List;

public class TestProcessImpl implements TestProcess {
    private Logger LOGGER = LoggerFactory.getLogger(TestProcess.class);

    @Override
    public void hand(List<Student> data) {
        // Convert the data into business-database entities
        List<com.cyb.entity.test.Student> students = adapter(data);
        // Process the data and store it
        students.forEach(stu -> {
            stu.setName(stu.getName() + "_test");
            SqlSession testSqlSession = SqlSessionUtil.getSqlSession("test");
            try {
                testSqlSession.insert("test-student.addStudent", stu);
                testSqlSession.commit();
                // Update the status in the intermediate table
                modifyMiddle(stu.getId(), StudentConst.FINISH);
            } catch (Exception e) {
                // Roll back
                testSqlSession.rollback();
                LOGGER.error("Error while processing data ============>", e);
            } finally {
                testSqlSession.close();
            }
        });
    }

    /**
     * Data adapter
     *
     * @param data records from the intermediate table
     * @return the corresponding business-database entities
     */
    public List<com.cyb.entity.test.Student> adapter(List<Student> data) {
        List<com.cyb.entity.test.Student> result = new ArrayList<>();
        data.forEach(stu -> {
            com.cyb.entity.test.Student student = new com.cyb.entity.test.Student();
            student.setId(stu.getId());
            student.setName(stu.getName());
            student.setDepartment(stu.getDepartment());
            student.setSex(stu.getSex());
            result.add(student);
        });
        return result;
    }

    /**
     * Update the status in the intermediate table
     * @param id record id
     * @param status the status to set
     */
    private void modifyMiddle(int id, String status) {
        Student student = new Student();
        student.setId(id);
        student.setDataStatus(status);
        student.setDealTime(new Date());
        SqlSession middleSqlSession = SqlSessionUtil.getSqlSession("middle");
        try {
            middleSqlSession.update("middle-student.updateStatusById", student);
            middleSqlSession.commit();
        } catch (Exception e) {
            middleSqlSession.rollback();
            LOGGER.error("Failed to update intermediate table status ===========>", e);
        } finally {
            middleSqlSession.close();
        }
    }
}
Consumer.java
package com.cyb.start;

import com.cyb.entity.middle.Student;
import com.cyb.process.TestProcess;

import java.util.List;
import java.util.concurrent.LinkedBlockingDeque;

/**
 * @ClassName: Consumer
 * @Description: Consumer
 * @Author: chenyb
 * @Date: /10/7 9:23 PM
 * @Version: 1.0
 */
public class Consumer implements Runnable {
    private List<Student> data;
    private TestProcess testProcess;
    private LinkedBlockingDeque<Runnable> consumer;

    public Consumer(TestProcess testProcess, LinkedBlockingDeque<Runnable> consumer) {
        this.testProcess = testProcess;
        this.consumer = consumer;
    }

    @Override
    public void run() {
        try {
            testProcess.hand(data);
        } finally {
            try {
                // Put this consumer back into the deque; blocks if the deque is full
                consumer.put(this);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public void setData(List<Student> data) {
        this.data = data;
    }
}
Producer.java
package com.cyb.start;

import com.cyb.cost.StudentConst;
import com.cyb.entity.middle.Student;
import com.cyb.process.MiddleProcess;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;

/**
 * @ClassName: Producer
 * @Description: Producer
 * @Author: chenyb
 * @Date: /10/7 9:22 PM
 * @Version: 1.0
 */
public class Producer implements Runnable {
    private static final Logger LOGGER = LoggerFactory.getLogger(Producer.class);
    private MiddleProcess middleProcess;
    private LinkedBlockingDeque<Runnable> consumer;
    private ThreadPoolExecutor executor;

    public Producer(MiddleProcess middleProcess, LinkedBlockingDeque<Runnable> consumer, ThreadPoolExecutor executor) {
        this.middleProcess = middleProcess;
        this.consumer = consumer;
        this.executor = executor;
    }

    @Override
    public void run() {
        while (true) {
            // Produce 10 records per round
            List<Student> students = middleProcess.queryList(10);
            try {
                if (students != null && students.size() > 0) {
                    // Mark the records as being processed
                    middleProcess.modifyListStatus(students, StudentConst.DEALING);
                    // Take an idle consumer; blocks until one is available
                    Consumer con = (Consumer) consumer.take();
                    con.setData(students);
                    executor.execute(con);
                } else {
                    // No data: sleep for 5 seconds
                    try {
                        Thread.sleep(5000L);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            } catch (Exception e) {
                LOGGER.error("Producer error ========>", e);
            }
        }
    }
}
Main.java
package com.cyb.start;

import com.cyb.process.MiddleProcess;
import com.cyb.process.TestProcess;
import com.cyb.process.impl.MiddleProcessImpl;
import com.cyb.process.impl.TestProcessImpl;

import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * Producer and consumers: 1 vs 10
 */
public class Main {
    public static void main(String[] args) {
        TestProcess testProcess = new TestProcessImpl();
        MiddleProcess middleProcess = new MiddleProcessImpl();
        LinkedBlockingDeque<Runnable> runnables = new LinkedBlockingDeque<>(10);
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(10, 20, 5L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(20));
        // 10 consumers
        for (int i = 0; i < 10; i++) {
            try {
                runnables.put(new Consumer(testProcess, runnables));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        // Start one thread for the producer
        Producer producer = new Producer(middleProcess, runnables, threadPoolExecutor);
        new Thread(producer).start();
    }
}
SqlSessionUtil.java
package com.cyb.util;

import org.apache.ibatis.io.Resources;
import org.apache.ibatis.session.SqlSessionFactory;
import org.apache.ibatis.session.SqlSessionFactoryBuilder;
import org.apache.ibatis.session.SqlSession;

import java.io.IOException;
import java.io.Reader;

/**
 * @ClassName: SqlSessionUtil
 * @Description: SqlSession utility class
 * @Author: chenyb
 * @Date: /10/7 7:37 PM
 * @Version: 1.0
 */
public class SqlSessionUtil {
    private static final String MYBATIS_CONFIG_MIDDLE = "mybatis-config-middle.xml";
    private static final String MYBATIS_CONFIG_TEST = "mybatis-config-test.xml";
    private static SqlSessionFactory middleSqlSessionFactory;
    private static SqlSessionFactory testSqlSessionFactory;
    private static Reader middleResourceAsReader = null;
    private static Reader testResourceAsReader = null;

    static {
        try {
            middleResourceAsReader = Resources.getResourceAsReader(MYBATIS_CONFIG_MIDDLE);
            testResourceAsReader = Resources.getResourceAsReader(MYBATIS_CONFIG_TEST);
            middleSqlSessionFactory = new SqlSessionFactoryBuilder().build(middleResourceAsReader);
            testSqlSessionFactory = new SqlSessionFactoryBuilder().build(testResourceAsReader);
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            try {
                middleResourceAsReader.close();
                testResourceAsReader.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    public static SqlSession getSqlSession(String type) {
        if ("test".equals(type)) {
            return testSqlSessionFactory.openSession();
        }
        return middleSqlSessionFactory.openSession();
    }
}
SQL scripts
middle.student
/*
 Navicat Premium Data Transfer

 Source Server         : localhost
 Source Server Type    : MySQL
 Source Server Version : 50728
 Source Host           : localhost:3306
 Source Schema         : middle

 Target Server Type    : MySQL
 Target Server Version : 50728
 File Encoding         : 65001

 Date: 07/10/ 22:42:55
*/

SET NAMES utf8mb4;
SET FOREIGN_KEY_CHECKS = 0;

-- ----------------------------
-- Table structure for student
-- ----------------------------
DROP TABLE IF EXISTS `student`;
CREATE TABLE `student` (
  `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'primary key',
  `name` varchar(255) NOT NULL COMMENT 'name',
  `sex` varchar(255) DEFAULT NULL COMMENT 'sex',
  `address` varchar(255) DEFAULT NULL COMMENT 'address',
  `department` varchar(255) DEFAULT NULL COMMENT 'department',
  `add_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'time the data entered the intermediate table',
  `data_status` varchar(10) NOT NULL DEFAULT 'I' COMMENT 'I: inserted by the third-party system; D: processing; F: processing finished; E: error or exception',
  `deal_time` datetime DEFAULT NULL COMMENT 'processing time',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=4 DEFAULT CHARSET=utf8;

-- ----------------------------
-- Records of student
-- ----------------------------
BEGIN;
INSERT INTO `student` VALUES (1, '张三', '男', '上海', '英语系', '-10-07 22:19:25', 'F', '-10-07 22:19:26');
INSERT INTO `student` VALUES (2, '李四', '女', '北京', '中文系', '-10-07 22:19:25', 'F', '-10-07 22:19:26');
INSERT INTO `student` VALUES (3, '王五', '男', '天津', '计算机系', '-10-07 22:19:25', 'F', '-10-07 22:19:26');
COMMIT;

SET FOREIGN_KEY_CHECKS = 1;
test.student
/*
 Navicat Premium Data Transfer

 Source Server         : localhost
 Source Server Type    : MySQL
 Source Server Version : 50728
 Source Host           : localhost:3306
 Source Schema         : test

 Target Server Type    : MySQL
 Target Server Version : 50728
 File Encoding         : 65001

 Date: 07/10/ 22:45:03
*/

SET NAMES utf8mb4;
SET FOREIGN_KEY_CHECKS = 0;

-- ----------------------------
-- Table structure for student
-- ----------------------------
DROP TABLE IF EXISTS `student`;
CREATE TABLE `student` (
  `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'primary key',
  `name` varchar(255) DEFAULT NULL COMMENT 'name',
  `sex` varchar(255) DEFAULT NULL COMMENT 'sex',
  `department` varchar(255) DEFAULT NULL COMMENT 'department',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

SET FOREIGN_KEY_CHECKS = 1;
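Demo
Project source download
Link: /s/1C7q7_QRUhRoCZIVZ_Bp3KQ Password: 7hbf
Deployment: how to launch a specific main class
Modify pom.xml
mainClass specifies which main class in the startup package to launch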
org.springframework.boot
spring-boot-maven-plugin
com.cyb.start.Main
JAR
true
repackage
Change the log output directory in log4j.properties
Deploy
Java thread pools and multithreaded concurrency in practice (producer-consumer model, 1 vs 10), with example source code - 陈彦斌 - 博客园