MyBatis 源码阅读之数据库连接
MyBatis 的配置文件所有配置会被 org.apache.ibatis.builder.xml.XMLConfigBuilder 类读取,
而 MyBatis 的映射文件配置会被 org.apache.ibatis.builder.xml.XMLMapperBuilder 类读取。
本文探讨 事务管理器 和 数据源 相关代码配置 environment
以下是 mybatis 配置文件中 environments 节点的一般配置。
environments 节点的加载也不算复杂,它只会加载 id 为 development 属性值的 environment 节点。
它的加载代码在 XMLConfigBuilder 类的 environmentsElement() 方法中,代码不多,逻辑也简单,此处不多讲。
接下来我们看看 environment 节点下的子节点。transactionManager 节点的 type 值默认提供有 JDBC 和 MANAGED ,dataSource 节点的 type 值默认提供有 JNDI 、 POOLED 和 UNPOOLED 。
它们对应的类都可以在 Configuration 类的构造器中找到,当然下面我们也一个一个来分析。
现在我们大概了解了配置,然后来分析这些配置与 MyBatis 类的关系。
TransactionFactorytransactionManager 节点对应 TransactionFactory 接口,使用了 抽象工厂模式 。MyBatis 给我们提供了两个实现类:ManagedTransactionFactory 和 JdbcTransactionFactory ,它们分别对应者 type 属性值为 MANAGED 和 JDBC 。
TransactionFactory 有三个方法,我们需要注意的方法只有 newTransaction() ,它用来创建一个事务对象。
void setProperties(Properties props); Transaction newTransaction(Connection conn); Transaction newTransaction(DataSource dataSource, TransactionIsolationLevel level, boolean autoCommit);
其中 JdbcTransactionFactory 创建的事务对象是 JdbcTransaction 的实例,该实例是对 JDBC 事务的简单封装,实例中 Connection 和 DataSource 对象正是事务所在的 连接 和 数据源 。
TransactionIsolationLevel 代表当前事务的隔离等级,它是一个枚举类,简单明了无需多言。而 autoCommit 表示是否开启了自动提交,开启了,则没有事务的提交和回滚等操作的意义了。
ManagedTransactionFactory 创建的事务对象是 ManagedTransaction 的实例,它本身并不控制事务,即 commit 和 rollback 都是不做任何操作,而是交由 JavaEE 容器来控制事务,以方便集成。
DataSourceFactoryDataSourceFactory 是获取数据源的接口,也使用了 抽象工厂模式 ,代码如下,方法极为简单:
public interface DataSourceFactory { /** * 可传入一些属性配置 */ void setProperties(Properties props); DataSource getDataSource(); }
MyBatis 默认支持三种数据源,分别是 UNPOOLED 、 POOLED 和 JNDI 。对应三个工厂类:
UnpooledDataSourceFactory 、 PooledDataSourceFactory 和 JNDIDataSourceFactory 。
其中 JNDIDataSourceFactory 是使用 JNDI 来获取数据源。我们很少使用,并且代码不是非常复杂,此处不讨论。我们先来看看 UnpooledDataSourceFactory :
public class UnpooledDataSourceFactory implements DataSourceFactory { private static final String DRIVER_PROPERTY_PREFIX = "driver."; private static final int DRIVER_PROPERTY_PREFIX_LENGTH = DRIVER_PROPERTY_PREFIX.length(); protected DataSource dataSource; public UnpooledDataSourceFactory() { this.dataSource = new UnpooledDataSource(); } @Override public void setProperties(Properties properties) { Properties driverProperties = new Properties(); // MetaObject 用于解析实例对象的元信息,如字段的信息、方法的信息 MetaObject metaDataSource = SystemMetaObject.forObject(dataSource); // 解析所有配置的键值对key-value,发现非预期的属性立即抛异常,以便及时发现 for (Object key : properties.keySet()) { String propertyName = (String) key; if (propertyName.startsWith(DRIVER_PROPERTY_PREFIX)) { // 添加驱动的配置属性 String value = properties.getProperty(propertyName); driverProperties.setProperty(propertyName.substring(DRIVER_PROPERTY_PREFIX_LENGTH), value); } else if (metaDataSource.hasSetter(propertyName)) { // 为数据源添加配置属性 String value = (String) properties.get(propertyName); Object convertedValue = convertValue(metaDataSource, propertyName, value); metaDataSource.setValue(propertyName, convertedValue); } else { throw new DataSourceException("Unknown DataSource property: " + propertyName); } } if (driverProperties.size() > 0) { metaDataSource.setValue("driverProperties", driverProperties); } } @Override public DataSource getDataSource() { return dataSource; } /** * 将 String 类型的值转为目标对象字段的类型的值 */ private Object convertValue(MetaObject metaDataSource, String propertyName, String value) { Object convertedValue = value; Class> targetType = metaDataSource.getSetterType(propertyName); if (targetType == Integer.class || targetType == int.class) { convertedValue = Integer.valueOf(value); } else if (targetType == Long.class || targetType == long.class) { convertedValue = Long.valueOf(value); } else if (targetType == Boolean.class || targetType == boolean.class) { convertedValue = Boolean.valueOf(value); } return convertedValue; } }
虽然代码看起来复杂,实际上非常简单,在创建工厂实例时创建它对应的 UnpooledDataSource 数据源。
setProperties() 方法用于给数据源添加部分属性配置,convertValue() 方式时一个私有方法,就是处理 当 DataSource 的属性为整型或布尔类型时提供对字符串类型的转换功能而已。
最后我们看看 PooledDataSourceFactory ,这个类非常简单,仅仅是继承了 UnpooledDataSourceFactory ,然后构造方法替换数据源为 PooledDataSource 。
public class PooledDataSourceFactory extends UnpooledDataSourceFactory { public PooledDataSourceFactory() { this.dataSource = new PooledDataSource(); } }
虽然它的代码极少,实际上都在 PooledDataSource 类中。
DataSource看完了工厂类,我们来看看 MyBatis 提供的两种数据源类: UnpooledDataSource 和 PooledDataSource 。
UnpooledDataSourceUnpooledDataSource 看名字就知道是没有池化的特征,相对也简单点,以下代码省略一些不重要的方法
import java.sql.Connection; import java.sql.DriverManager; import java.sql.SQLException; public class UnpooledDataSource implements DataSource { private ClassLoader driverClassLoader; private Properties driverProperties; private static MapregisteredDrivers = new ConcurrentHashMap (); private String driver; private String url; private String username; private String password; private Boolean autoCommit; // 事务隔离级别 private Integer defaultTransactionIsolationLevel; static { // 遍历所有可用驱动 Enumeration drivers = DriverManager.getDrivers(); while (drivers.hasMoreElements()) { Driver driver = drivers.nextElement(); registeredDrivers.put(driver.getClass().getName(), driver); } } // ...... private Connection doGetConnection(Properties properties) throws SQLException { // 每次获取连接都会检测驱动 initializeDriver(); Connection connection = DriverManager.getConnection(url, properties); configureConnection(connection); return connection; } /** * 初始化驱动,这是一个 同步 方法 */ private synchronized void initializeDriver() throws SQLException { // 如果不包含驱动,则准备添加驱动 if (!registeredDrivers.containsKey(driver)) { Class> driverType; try { // 加载驱动 if (driverClassLoader != null) { driverType = Class.forName(driver, true, driverClassLoader); } else { driverType = Resources.classForName(driver); } Driver driverInstance = (Driver)driverType.newInstance(); // 注册驱动代理到 DriverManager DriverManager.registerDriver(new DriverProxy(driverInstance)); // 缓存驱动 registeredDrivers.put(driver, driverInstance); } catch (Exception e) { throw new SQLException("Error setting driver on UnpooledDataSource. Cause: " + e); } } } private void configureConnection(Connection conn) throws SQLException { // 设置是否自动提交事务 if (autoCommit != null && autoCommit != conn.getAutoCommit()) { conn.setAutoCommit(autoCommit); } // 设置 事务隔离级别 if (defaultTransactionIsolationLevel != null) { conn.setTransactionIsolation(defaultTransactionIsolationLevel); } } private static class DriverProxy implements Driver { private Driver driver; DriverProxy(Driver d) { this.driver = d; } /** * Driver 仅在 JDK7 中定义了本方法,用于返回本驱动的所有日志记录器的父记录器 * 个人也不是十分明确它的用法,毕竟很少会关注驱动的日志 */ public Logger getParentLogger() { return Logger.getLogger(Logger.GLOBAL_LOGGER_NAME); } // 其他方法均为调用 driver 对应的方法,此处省略 } }
这里 DriverProxy 仅被注册到 DriverManager 中,这是一个代理操作,但源码上并没有什么特别的处理代码,我也不懂官方为什么在这里加代理,有谁明白的可以留言相互讨论。这里的其他方法也不是非常复杂,我都已经标有注释,应该都可以看懂,不再细说。
以上便是 UnpooledDataSource 的初始化驱动和获取连接关键代码。
PooledDataSource接下来我们来看最后一个类 PooledDataSource ,它也是直接实现 DataSource ,不过因为拥有池化的特性,它的代码复杂不少,当然效率比 UnpooledDataSource 会高出不少。
PooledDataSource 通过两个辅助类 PoolState 和 PooledConnection 来完成池化功能。
PoolState 是记录连接池运行时的状态,定义了两个 PooledConnection 集合用于记录空闲连接和活跃连接。
PooledConnection 内部定义了两个 Connection 分别表示一个真实连接和代理连接,还有一些其他字段用于记录一个连接的运行时状态。
先来详细了解一下 PooledConnection
/** * 此处使用默认的访问权限 * 实现了 InvocationHandler */ class PooledConnection implements InvocationHandler { private static final String CLOSE = "close"; private static final Class>[] IFACES = new Class>[] { Connection.class }; /** hashCode() 方法返回 */ private final int hashCode; private final Connection realConnection; private final Connection proxyConnection; // 省略 checkoutTimestamp、createdTimestamp、lastUsedTimestamp private boolean valid; /* * Constructor for SimplePooledConnection that uses the Connection and PooledDataSource passed in * * @param connection - the connection that is to be presented as a pooled connection * @param dataSource - the dataSource that the connection is from */ public PooledConnection(Connection connection, PooledDataSource dataSource) { this.hashCode = connection.hashCode(); this.realConnection = connection; this.dataSource = dataSource; this.createdTimestamp = System.currentTimeMillis(); this.lastUsedTimestamp = System.currentTimeMillis(); this.valid = true; this.proxyConnection = (Connection) Proxy.newProxyInstance(Connection.class.getClassLoader(), IFACES, this); } /* * 设置连接状态为不正常,不可使用 */ public void invalidate() { valid = false; } /* * 查看连接是否可用 * * @return 如果可用则返回 true */ public boolean isValid() { return valid && realConnection != null && dataSource.pingConnection(this); } /** * 自动上一次使用后经过的时间 */ public long getTimeElapsedSinceLastUse() { return System.currentTimeMillis() - lastUsedTimestamp; } /** * 存活时间 */ public long getAge() { return System.currentTimeMillis() - createdTimestamp; } @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { String methodName = method.getName(); if (CLOSE.hashCode() == methodName.hashCode() && CLOSE.equals(methodName)) { // 对于 close() 方法,将连接放回池中 dataSource.pushConnection(this); return null; } else { try { if (!Object.class.equals(method.getDeclaringClass())) { checkConnection(); } return method.invoke(realConnection, args); } catch (Throwable t) { throw ExceptionUtil.unwrapThrowable(t); } } } private void checkConnection() throws SQLException { if (!valid) { throw new SQLException("Error accessing PooledConnection. Connection is invalid."); } } }
本类实现了 InvocationHandler 接口,这个接口是用于 JDK 动态代理的,在这个类的构造器中 proxyConnection 就是创建了此代理对象。
来看看 invoke() 方法,它拦截了 close() 方法,不再关闭连接,而是将其继续放入池中,然后其他已实现的方法则是每次调用都需要检测连接是否合法。
而 PoolState 类,这个类实际上没什么可说的,都是一些统计字段,没有复杂逻辑,不讨论; 需要注意该类是针对一个 PooledDataSource 对象统计的 。
也就是说 PoolState 的统计字段是关于整个数据源的,而一个 PooledConnection 则是针对单个连接的。
最后我们回过头来看 PooledDataSource 类,数据源的操作就只有两个,获取连接,释放连接,先来看看获取连接
public class PooledDataSource implements DataSource { private final UnpooledDataSource dataSource; @Override public Connection getConnection() throws SQLException { return popConnection(dataSource.getUsername(), dataSource.getPassword()).getProxyConnection(); } @Override public Connection getConnection(String username, String password) throws SQLException { return popConnection(username, password).getProxyConnection(); } /** * 获取一个连接 */ private PooledConnection popConnection(String username, String password) throws SQLException { boolean countedWait = false; PooledConnection conn = null; long t = System.currentTimeMillis(); int localBadConnectionCount = 0; // conn == null 也可能是没有获得连接,被通知后再次走流程 while (conn == null) { synchronized (state) { // 是否存在空闲连接 if (!state.idleConnections.isEmpty()) { // 池里存在空闲连接 conn = state.idleConnections.remove(0); } else { // 池里不存在空闲连接 if (state.activeConnections.size() < poolMaximumActiveConnections) { // 池里的激活连接数小于最大数,创建一个新的 conn = new PooledConnection(dataSource.getConnection(), this); } else { // 最坏的情况,无法获取连接 // 检测最早使用的连接是否超时 PooledConnection oldestActiveConnection = state.activeConnections.get(0); long longestCheckoutTime = oldestActiveConnection.getCheckoutTime(); if (longestCheckoutTime > poolMaximumCheckoutTime) { // 使用超时连接,对超时连接的操作进行回滚 state.claimedOverdueConnectionCount++; state.accumulatedCheckoutTimeOfOverdueConnections += longestCheckoutTime; state.accumulatedCheckoutTime += longestCheckoutTime; state.activeConnections.remove(oldestActiveConnection); if (!oldestActiveConnection.getRealConnection().getAutoCommit()) { try { oldestActiveConnection.getRealConnection().rollback(); } catch (SQLException e) { /* * Just log a message for debug and continue to execute the following statement * like nothing happened. Wrap the bad connection with a new PooledConnection, * this will help to not interrupt current executing thread and give current * thread a chance to join the next competition for another valid/good database * connection. At the end of this loop, bad {@link @conn} will be set as null. */ log.debug("Bad connection. Could not roll back"); } } conn = new PooledConnection(oldestActiveConnection.getRealConnection(), this); conn.setCreatedTimestamp(oldestActiveConnection.getCreatedTimestamp()); conn.setLastUsedTimestamp(oldestActiveConnection.getLastUsedTimestamp()); oldestActiveConnection.invalidate(); } else { // 等待可用连接 try { if (!countedWait) { state.hadToWaitCount++; countedWait = true; } long wt = System.currentTimeMillis(); state.wait(poolTimeToWait); state.accumulatedWaitTime += System.currentTimeMillis() - wt; } catch (InterruptedException e) { break; } } } } // 已获取连接 if (conn != null) { // 检测连接是否可用 if (conn.isValid()) { // 对之前的操作回滚 if (!conn.getRealConnection().getAutoCommit()) { conn.getRealConnection().rollback(); } conn.setConnectionTypeCode(assembleConnectionTypeCode(dataSource.getUrl(), username, password)); conn.setCheckoutTimestamp(System.currentTimeMillis()); conn.setLastUsedTimestamp(System.currentTimeMillis()); // 激活连接池数+1 state.activeConnections.add(conn); state.requestCount++; state.accumulatedRequestTime += System.currentTimeMillis() - t; } else { // 连接坏掉了,超过一定阈值则抛异常提醒 state.badConnectionCount++; localBadConnectionCount++; conn = null; if (localBadConnectionCount > (poolMaximumIdleConnections + poolMaximumLocalBadConnectionTolerance)) { // 省略日志 throw new SQLException( "PooledDataSource: Could not get a good connection to the database."); } } } } } if (conn == null) { // 省略日志 throw new SQLException( "PooledDataSource: Unknown severe error condition. The connection pool returned a null connection."); } return conn; } }
while => 连接为空
能否直接从池里拿连接 => 可以则获取连接并返回
不能,查看池里的连接是否没满 => 没满则创建一个连接并返回
满了,查看池里最早的连接是否超时 => 超时则强制该连接回滚,然后获取该连接并返回
protected void pushConnection(PooledConnection conn) throws SQLException { // 同步操作 synchronized (state) { // 从活动池中移除连接 state.activeConnections.remove(conn); if (conn.isValid()) { // 不超过空闲连接数 并且连接是同一类型的连接 if (state.idleConnections.size() < poolMaximumIdleConnections && conn.getConnectionTypeCode() == expectedConnectionTypeCode) { state.accumulatedCheckoutTime += conn.getCheckoutTime(); if (!conn.getRealConnection().getAutoCommit()) { conn.getRealConnection().rollback(); } // 废弃原先的对象 PooledConnection newConn = new PooledConnection(conn.getRealConnection(), this); state.idleConnections.add(newConn); newConn.setCreatedTimestamp(conn.getCreatedTimestamp()); newConn.setLastUsedTimestamp(conn.getLastUsedTimestamp()); // 该对象已经不能用于连接了 conn.invalidate(); if (log.isDebugEnabled()) { log.debug("Returned connection " + newConn.getRealHashCode() + " to pool."); } state.notifyAll(); } else { state.accumulatedCheckoutTime += conn.getCheckoutTime(); if (!conn.getRealConnection().getAutoCommit()) { conn.getRealConnection().rollback(); } // 关闭连接 conn.getRealConnection().close(); if (log.isDebugEnabled()) { log.debug("Closed connection " + conn.getRealHashCode() + "."); } conn.invalidate(); } } else { if (log.isDebugEnabled()) { log.debug("A bad connection (" + conn.getRealHashCode() + ") attempted to return to the pool, discarding connection."); } state.badConnectionCount++; } } }
pingConnection() 执行一条 SQL 检测连接是否可用。
forceCloseAll() 回滚并关闭激活连接池和空闲连接池中的连接
