摘要:现在的分片策略是上海深圳分别建库,每个库都存各自交易所的两支股票的,且按照月分表。五配置分片策略数据库分片策略在这个实例中,数据库的分库就是根据上海和深圳来分的,在中是单键分片。
由于当当发布了最新的Sharding-Sphere,所以本文已经过时,不日将推出新的版本
项目中遇到了分库分表的问题,找到了shrding-jdbc,于是就搞了一个springboot+sharding-jdbc+mybatis的增量分片的应用。今天写博客总结一下遇到的坑。一、需求背景其实,我自己写了一个increament-jdbc组件的,当我读了sharding-jdbc的源码之后,发现思路和原理差不多,sharding这个各方面要比我的强,毕竟我是一天之内赶出来的东东。
示例代码地址:https://gitee.com/spartajet/s...
demo没有写日志,也没有各种异常判断,只是说明问题
我的项目背景就不说了,现在举一个例子吧:A,B两支股票都在上海,深圳上市,需要实时记录这两支股票的交易tick(不懂tick也没有关系)。现在的分片策略是:上海、深圳分别建库,每个库都存各自交易所的两支股票的ticktick,且按照月分表。如图:
db_sh
tick_a_2017_01
tick_b_2017_01
........
tick_a_2017_12
tick_b_2017_12
db_sz
tick_a_2017_01
tick_b_2017_01
........
tick_a_2017_12
tick_b_2017_12
分库分表就是这样的。根据这个建库。 **千万不要讨论这样分库分表是否合适,这里这样分片只是举个栗子,说明分库分表这个事情。** **Sharding-jdbc是不支持建库的SQL,如果像我这样增量的数据库和数据表,那就要一次性把一段时期的数据库和数据表都要建好。**二、建库
考虑到表确实多,所以我就只建1,2月份的表。语句见demo文件。
三、springboot集成sharding-jdbcmvn配置pom如下:
com.spartajet springboot-sharding-jdbc-demo 0.0.1-SNAPSHOT jar springboot-sharding-jdbc-demo Springboot integrate Sharding-jdbc Demo UTF-8 UTF-8 UTF-8 zh_CN 1.8 ${java.version} 1.4.1.RELEASE 1.0.13 5.1.36 1.4.1 2.8.0 2.9.7 1.4 2.5 1.2.0 org.springframework.boot spring-boot-starter-jdbc ${spring.boot.version} org.mybatis.spring.boot mybatis-spring-boot-starter ${mybatis-spring-boot-starter.version} commons-dbcp commons-dbcp ${commons-dbcp.version} com.dangdang sharding-jdbc-core ${sharding-jdbc.version} com.dangdang sharding-jdbc-config-spring ${sharding-jdbc.version} com.dangdang sharding-jdbc-self-id-generator ${sharding-jdbc.version} com.google.code.gson gson ${com.google.code.gson.version} org.springframework.boot spring-boot-starter-web ${spring.boot.version} org.springframework.boot spring-boot-start-logging org.springframework.boot spring-boot-starter-test ${spring.boot.version} test org.springframework.boot spring-boot-starter-log4j2 ${spring.boot.version} log4j log4j org.springframework.boot spring-boot-starter ${spring.boot.version} org.springframework.boot spring-boot-start-logging logback-classic ch.qos.logback log4j-over-slf4j org.slf4j mysql mysql-connector-java ${mysql-connector-java.version} org.springframework.boot spring-boot-maven-plugin ${spring.boot.version} org.apache.maven.plugins maven-compiler-plugin 3.1 ${project.build.jdk} ${project.build.sourceEncoding} org.apache.maven.plugins maven-jar-plugin 2.4
其实这个和sharding-jdbc的官网差不多。其实我想写一个sharding-jdbc-spring-boot-starter的pom的,等项目业务都做完再说吧。
四、配置数据源我想将数据库做成可配置的,所以我没有在application.properties文件中直接配置数据库,而是写在了database.json文件中。
[ { "name": "db_sh", "url": "jdbc:mysql://localhost:3306/db_sh", "username": "root", "password": "root", "driveClassName":"com.mysql.jdbc.Driver" }, { "name": "db_sz", "url": "jdbc:mysql://localhost:3306/db_sz", "username": "root", "password": "root", "driveClassName":"com.mysql.jdbc.Driver" } ]
然后在springboot读取database文件,加载方式如下:
@Value("classpath:database.json") private Resource databaseFile; @Bean public Listdatabases() throws IOException { String databasesString = IOUtils.toString(databaseFile.getInputStream(), Charset.forName("UTF-8")); List databases = new Gson().fromJson(databasesString, new TypeToken >() { }.getType()); return databases; }
加载完database信息之后,可以通过工厂方法配置逻辑数据库:
@Bean public HashMapdataSourceMap(List databases) { Map dataSourceMap = new HashMap<>(); for (Database database : databases) { DataSourceBuilder dataSourceBuilder = DataSourceBuilder.create(); dataSourceBuilder.url(database.getUrl()); dataSourceBuilder.driverClassName(database.getDriveClassName()); dataSourceBuilder.username(database.getUsername()); dataSourceBuilder.password(database.getPassword()); DataSource dataSource = dataSourceBuilder.build(); dataSourceMap.put(database.getName(), dataSource); } return dataSourceMap; }
这样就把各个逻辑数据库就加载好了。
五、配置分片策略 5.1数据库分片策略在这个实例中,数据库的分库就是根据上海(sh)和深圳(sz)来分的,在sharding-jdbc中是单键分片。根据官方文档实现接口SingleKeyDatabaseShardingAlgorithm就可以
@service public class DatabaseShardingAlgorithm implements SingleKeyDatabaseShardingAlgorithm{ /** * 根据分片值和SQL的=运算符计算分片结果名称集合. * * @param availableTargetNames 所有的可用目标名称集合, 一般是数据源或表名称 * @param shardingValue 分片值 * * @return 分片后指向的目标名称, 一般是数据源或表名称 */ @Override public String doEqualSharding(Collection availableTargetNames, ShardingValue shardingValue) { String databaseName = ""; for (String targetName : availableTargetNames) { if (targetName.endsWith(shardingValue.getValue())) { databaseName = targetName; break; } } return databaseName; } }
此接口还有另外两个方法,doInSharding和doBetweenSharding,因为我暂时不用IN和BETWEEN方法,所以就没有写,直接返回null。
5.2数据表分片策略数据表的分片策略是根据股票和时间共同决定的,在sharding-jdbc中是多键分片。根据官方文档,实现MultipleKeysTableShardingAlgorithm接口就OK了
@service public class TableShardingAlgorithm implements MultipleKeysTableShardingAlgorithm { /** * 根据分片值计算分片结果名称集合. * * @param availableTargetNames 所有的可用目标名称集合, 一般是数据源或表名称 * @param shardingValues 分片值集合 * * @return 分片后指向的目标名称集合, 一般是数据源或表名称 */ @Override public CollectiondoSharding(Collection availableTargetNames, Collection > shardingValues) { String name = null; Date time = null; for (ShardingValue> shardingValue : shardingValues) { if (shardingValue.getColumnName().equals("name")) { name = ((ShardingValue ) shardingValue).getValue(); } if (shardingValue.getColumnName().equals("time")) { time = ((ShardingValue ) shardingValue).getValue(); } if (name != null && time != null) { break; } } String timeString = new SimpleDateFormat("yyyy_MM").format(time); String suffix = name + "_" + timeString; Collection result = new LinkedHashSet<>(); for (String targetName : availableTargetNames) { if (targetName.endsWith(suffix)) { result.add(targetName); } } return result; } }
这些方法的使用可以查官方文档。
5.3注入分片策略以上只是定义了分片算法,还没有形成策略,还没有告诉shrding将哪个字段给分片算法:
@Configuration public class ShardingStrategyConfig { @Bean public DatabaseShardingStrategy databaseShardingStrategy(DatabaseShardingAlgorithm databaseShardingAlgorithm) { DatabaseShardingStrategy databaseShardingStrategy = new DatabaseShardingStrategy("exchange", databaseShardingAlgorithm); return databaseShardingStrategy; } @Bean public TableShardingStrategy tableShardingStrategy(TableShardingAlgorithm tableShardingAlgorithm) { Collectioncolumns = new LinkedList<>(); columns.add("name"); columns.add("time"); TableShardingStrategy tableShardingStrategy = new TableShardingStrategy(columns, tableShardingAlgorithm); return tableShardingStrategy; } }
这样才能形成完成的分片策略。
六、配置Sharding-jdbc的DataSourcesharding-jdbc的原理其实很简单,就是自己做一个DataSource给上层应用使用,这个DataSource包含所有的逻辑库和逻辑表,应用增删改查时,他自己再修改sql,然后选择合适的数据库继续操作。所以这个DataSource创建很重要。
@Bean @Primary public DataSource shardingDataSource(HashMapdataSourceMap, DatabaseShardingStrategy databaseShardingStrategy, TableShardingStrategy tableShardingStrategy) { DataSourceRule dataSourceRule = new DataSourceRule(dataSourceMap); TableRule tableRule = TableRule.builder("tick").actualTables(Arrays.asList("db_sh.tick_a_2017_01", "db_sh.tick_a_2017_02", "db_sh.tick_b_2017_01", "db_sh.tick_b_2017_02", "db_sz.tick_a_2017_01", "db_sz.tick_a_2017_02", "db_sz.tick_b_2017_01", "db_sz.tick_a_2017_02")).dataSourceRule(dataSourceRule).build(); ShardingRule shardingRule = ShardingRule.builder().dataSourceRule(dataSourceRule).tableRules(Arrays.asList(tableRule)).databaseShardingStrategy(databaseShardingStrategy).tableShardingStrategy(tableShardingStrategy).build(); DataSource shardingDataSource = ShardingDataSourceFactory.createDataSource(shardingRule); return shardingDataSource; }
这里要着重说一下为什么要用@Primary这个注解,没有这个注解是会报错的,错误大致意思就是DataSource太多了,mybatis不知道用哪个。加上这个mybatis就知道用sharding的DataSource了。这里参考的是jpa的多数据源配置
七、配置mybatis 7.1 Beanpublic class Tick { private long id; private String name; private String exchange; private int ask; private int bid; private Date time; }7.2 Mapper
很简单,只实现一个插入方法
@Mapper public interface TickMapper { @Insert("insert into tick (id,name,exchange,ask,bid,time) values (#{id},#{name},#{exchange},#{ask},#{bid},#{time})") void insertTick(Tick tick); }7.3 SessionFactory配置
还要设置一下tick的SessionFactory:
@Configuration @MapperScan(basePackages = "com.spartajet.shardingboot.mapper", sqlSessionFactoryRef = "sessionFactory") public class TickSessionFactoryConfig { @Bean public SqlSessionFactory sessionFactory(DataSource shardingDataSource) throws Exception { final SqlSessionFactoryBean sessionFactory = new SqlSessionFactoryBean(); sessionFactory.setDataSource(shardingDataSource); return sessionFactory.getObject(); } @Bean public CommonSelfIdGenerator commonSelfIdGenerator() { CommonSelfIdGenerator.setClock(AbstractClock.systemClock()); CommonSelfIdGenerator commonSelfIdGenerator = new CommonSelfIdGenerator(); return commonSelfIdGenerator; } }
这里添加了一个CommonSelfIdGenerator,sharding自带的id生成器,看了下代码和facebook的snowflake类似。我又不想把数据库的主键设置成自增的,否则数据双向同步会死的很惨的。
八、测试写入@RunWith(SpringJUnit4ClassRunner.class) @SpringBootTest public class SpringbootShardingJdbcDemoApplicationTests { @Autowired private TickMapper tickMapper; @Autowired private CommonSelfIdGenerator commonSelfIdGenerator; @Test public void contextLoads() { Tick tick = new Tick(commonSelfIdGenerator.generateId().longValue(), "a", "sh", 100, 200, new Date()); this.tickMapper.insertTick(tick); } }
成功实现增量分库分表!!!
文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。
转载请注明本文地址:https://www.ucloud.cn/yun/71030.html
摘要:基于最新的,是你学习的最佳指南。驱动程序通过自动注册,手动加载类通常是不必要。由于加上了注解,如果转账中途出了意外和的钱都不会改变。三的方式项目结构相比于注解的方式主要有以下几点改变,非常容易实现。公众号多篇文章被各大技术社区转载。 Github 地址:https://github.com/Snailclimb/springboot-integration-examples(Sprin...
摘要:前提好几周没更新博客了,对不断支持我博客的童鞋们说声抱歉了。熟悉我的人都知道我写博客的时间比较早,而且坚持的时间也比较久,一直到现在也是一直保持着更新状态。 showImg(https://segmentfault.com/img/remote/1460000014076586?w=1920&h=1080); 前提 好几周没更新博客了,对不断支持我博客的童鞋们说声:抱歉了!。自己这段时...
摘要:本文是作者自己对中线程的状态线程间协作相关使用的理解与总结,不对之处,望指出,共勉。当中的的数目而不是已占用的位置数大于集合番一文通版集合番一文通版垃圾回收机制讲得很透彻,深入浅出。 一小时搞明白自定义注解 Annotation(注解)就是 Java 提供了一种元程序中的元素关联任何信息和着任何元数据(metadata)的途径和方法。Annotion(注解) 是一个接口,程序可以通过...
摘要:但本文将讲述如何将缓存应用到应用中。这是的使用注解之一,除此之外常用的还有和,分别简单介绍一下配置在方法上表示其返回值将被加入缓存。 showImg(https://segmentfault.com/img/remote/1460000016643568); 注: 本文首发于 博客 CodeSheep · 程序羊,欢迎光临 小站!本文共 851字,阅读大约需要 3分钟 ! 本文内...
阅读 1586·2021-11-22 15:33
阅读 1727·2021-11-15 18:01
阅读 663·2021-10-09 09:43
阅读 2603·2021-09-22 16:03
阅读 757·2021-09-03 10:28
阅读 3549·2021-08-11 10:22
阅读 2717·2019-08-30 15:54
阅读 1759·2019-08-30 14:21