Sharding-JDBC Source Code Analysis: SQL Execution
Published: 2021-05-14 05:45:48


SQL Execution: Sharding-JDBC's ExecutorEngine

In a sharded database, a single logical SQL statement often has to run as several physical SQL statements. Sharding-JDBC hands that work to a dedicated component. This article walks through that component, ExecutorEngine, and how it executes SQL.


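1. Overview

After SQL parsing, SQL routing, and SQL rewriting, Sharding-JDBC reaches the last stage: SQL execution. A routed statement can target several data sources, so the SQL has to be executed concurrently and the results collected. This article focuses on how ExecutorEngine does that.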


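2. ExecutorEngine: the SQL execution engine

ExecutorEngine is the component in Sharding-JDBC that executes SQL. After sharding, one statement can turn into many SQL statements, and running them one by one would be slow. ExecutorEngine therefore uses a thread pool to execute the SQL concurrently.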

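2.1 ListeningExecutorService: Guava's executor service

ExecutorEngine is built on ListeningExecutorService, Guava's extension of ExecutorService whose submitted tasks return a ListenableFuture. Compared with a plain Future, ListenableFuture has two advantages:

  • A callback can be registered and runs automatically when the task completes, so the caller does not need to block or poll.
  • Several futures can be combined into one, which reduces the complexity of concurrent code.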
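The two advantages above can be illustrated without Guava. The sketch below uses the JDK's CompletableFuture as a stand-in for ListenableFuture, an assumption made so the example runs with no extra dependency (Guava's own equivalents are Futures.addCallback and Futures.allAsList). The class FutureCallbackSketch and its method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class FutureCallbackSketch {

    // Combination: runs one task per input and merges all futures into a single future of a list.
    static List<Integer> squareAll(final List<Integer> inputs) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            List<CompletableFuture<Integer>> futures = new ArrayList<>();
            for (final Integer input : inputs) {
                futures.add(CompletableFuture.supplyAsync(() -> input * input, pool));
            }
            // allOf completes only when every task has completed.
            CompletableFuture<List<Integer>> all = CompletableFuture
                    .allOf(futures.toArray(new CompletableFuture[0]))
                    .thenApply(ignored -> {
                        List<Integer> results = new ArrayList<>();
                        for (CompletableFuture<Integer> future : futures) {
                            results.add(future.join());
                        }
                        return results;
                    });
            return all.join();
        } finally {
            pool.shutdown();
        }
    }

    // Callback: the action runs when the task finishes; the caller never polls isDone().
    static String describeWhenDone(final int value) {
        StringBuilder log = new StringBuilder();
        CompletableFuture.supplyAsync(() -> value + 1)
                .thenAccept(result -> log.append("done:").append(result))
                .join();
        return log.toString();
    }

    public static void main(final String[] args) {
        System.out.println(squareAll(Arrays.asList(1, 2, 3)));
        System.out.println(describeWhenDone(41));
    }
}
```

The results come back in input order because the merged list is built by walking the futures in the order they were submitted, not in the order they finished.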

Using ListeningExecutorService, Sharding-JDBC creates a fixed-size thread pool in the ExecutorEngine constructor:

public ExecutorEngine(final int executorSize) {
    // Fixed-size pool: core size equals maximum size, with an unbounded task queue
    executorService = MoreExecutors.listeningDecorator(
            new ThreadPoolExecutor(
                    executorSize, executorSize, 0, TimeUnit.MILLISECONDS,
                    new LinkedBlockingQueue<Runnable>(),
                    new ThreadFactoryBuilder().setDaemon(true).setNameFormat("ShardingJDBC-%d").build()
            )
    );
    // On JVM exit, wait up to 60 seconds for the pool to terminate
    MoreExecutors.addDelayedShutdownHook(executorService, 60, TimeUnit.SECONDS);
}
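To show what the constructor configures, here is a sketch that uses only the JDK. The hand-written ThreadFactory stands in for Guava's ThreadFactoryBuilder, and the class PoolSketch with its methods is hypothetical. It builds a fixed-size pool whose threads are daemons named with a prefix plus a counter.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class PoolSketch {

    // Creates daemon threads named "<prefix>-0", "<prefix>-1", and so on.
    static ThreadFactory daemonFactory(final String prefix) {
        final AtomicInteger counter = new AtomicInteger();
        return runnable -> {
            Thread thread = new Thread(runnable, prefix + "-" + counter.getAndIncrement());
            thread.setDaemon(true);
            return thread;
        };
    }

    // Core size equals max size, so the pool never grows beyond executorSize threads;
    // extra tasks wait in the unbounded queue.
    static ExecutorService newPool(final int executorSize) {
        return new ThreadPoolExecutor(
                executorSize, executorSize, 0, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(),
                daemonFactory("ShardingJDBC"));
    }

    // Submits one task and reports the worker thread's name and daemon flag.
    static String firstWorkerInfo() {
        ExecutorService pool = newPool(2);
        try {
            return pool.submit(
                    () -> Thread.currentThread().getName() + ":" + Thread.currentThread().isDaemon()).get();
        } catch (Exception ex) {
            throw new IllegalStateException(ex);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(final String[] args) {
        System.out.println(firstWorkerInfo());
    }
}
```

Daemon threads do not keep the JVM alive on their own, which is why the constructor above also registers a delayed shutdown hook to give in-flight tasks time to finish.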

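3. Closing the engine

When the application shuts down, ExecutorEngine has to release its thread pool. It calls shutdownNow() to stop accepting tasks and interrupt the running ones, then awaitTermination() to wait for the pool to terminate. If the pool has still not terminated after the wait, it throws an exception, because SQL tasks may still be running.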

public void close() {
    executorService.shutdownNow();
    try {
        executorService.awaitTermination(5, TimeUnit.SECONDS);
    } catch (InterruptedException ignored) {
        // Interrupted while waiting; fall through to the termination check
    }
    // The check runs whether or not the wait was interrupted
    if (!executorService.isTerminated()) {
        throw new ShardingJdbcException("ExecutorEngine can not been terminated");
    }
}
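The same shutdown sequence can be tried in isolation with a plain JDK pool. This is a sketch under assumptions: the class CloseSketch is hypothetical, and it returns a boolean instead of throwing so the outcome is easy to observe.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class CloseSketch {

    // Interrupts running tasks, waits up to the timeout, and reports whether the pool terminated.
    static boolean close(final ExecutorService executorService, final long timeoutMillis) {
        executorService.shutdownNow();
        try {
            executorService.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException ex) {
            // Preserve the interrupt for the caller, then still check the pool state.
            Thread.currentThread().interrupt();
        }
        return executorService.isTerminated();
    }

    public static void main(final String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        pool.submit(() -> {
            try {
                Thread.sleep(60_000);
            } catch (InterruptedException ex) {
                // shutdownNow() interrupts the sleep, so the task ends early.
            }
        });
        System.out.println(close(pool, 5_000));
    }
}
```

Note that shutdownNow() only interrupts; a task that ignores interruption can keep the pool alive past the timeout, which is the case the termination check is there to detect.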

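4. Executing SQL tasks

ExecutorEngine exposes three execution methods: executeStatement(), executePreparedStatement(), and executeBatch(). They correspond to the three kinds of SQL execution, and all of them delegate to the private execute() method.

4.1 Execution flow

The flow consists of these steps:

  • Execute the first SQL task synchronously on the calling thread.
  • Execute the remaining SQL tasks asynchronously on the thread pool.
  • Wait for the asynchronous tasks and merge all results into one list.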

private <T> List<T> execute(final SQLType sqlType, final Collection<? extends BaseStatementUnit> baseStatementUnits,
                            final List<List<Object>> parameterSets, final ExecuteCallback<T> executeCallback) {
    if (baseStatementUnits.isEmpty()) {
        return Collections.emptyList();
    }
    Iterator<? extends BaseStatementUnit> iterator = baseStatementUnits.iterator();
    BaseStatementUnit firstInput = iterator.next();
    // Execute the first task synchronously
    T firstOutput = syncExecute(sqlType, firstInput, parameterSets, executeCallback);
    // Execute the remaining tasks asynchronously
    ListenableFuture<List<T>> restFutures = asyncExecute(sqlType, iterator, parameterSets, executeCallback);
    // Wait for the asynchronous results (get() throws checked exceptions; handling is omitted in this excerpt)
    List<T> restOutputs = restFutures.get();
    // Merge the results, with the first output at the head
    List<T> result = new LinkedList<>();
    result.add(0, firstOutput);
    result.addAll(restOutputs);
    return result;
}
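The flow can be sketched with plain JDK types. Everything here is an assumption made for illustration: FanOutSketch, its generic execute method, and the use of Function in place of ExecuteCallback are hypothetical, and the sketch submits the remaining units before running the first one so that they overlap with it.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;

final class FanOutSketch {

    // Runs the first unit on the calling thread and the rest on a pool,
    // then returns the outputs in the same order as the inputs.
    static <I, O> List<O> execute(final List<I> units, final Function<I, O> callback) {
        if (units.isEmpty()) {
            return new LinkedList<>();
        }
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            Iterator<I> iterator = units.iterator();
            I firstInput = iterator.next();
            // Submit the remaining units first so they run while the first one executes.
            List<Future<O>> restFutures = new ArrayList<>();
            while (iterator.hasNext()) {
                final I input = iterator.next();
                Callable<O> task = () -> callback.apply(input);
                restFutures.add(pool.submit(task));
            }
            // The first unit runs synchronously on the calling thread.
            O firstOutput = callback.apply(firstInput);
            List<O> result = new LinkedList<>();
            result.add(firstOutput);
            // Block until each asynchronous unit has finished, then merge.
            for (Future<O> future : restFutures) {
                result.add(future.get());
            }
            return result;
        } catch (Exception ex) {
            throw new IllegalStateException(ex);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(final String[] args) {
        System.out.println(execute(Arrays.asList("ds_0", "ds_1", "ds_2"), name -> name + ":ok"));
    }
}
```

Running the first unit on the calling thread means a statement that routes to a single target never touches the pool at all.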

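5. Executor: the statement executors

The Executor classes in Sharding-JDBC perform the actual statement execution. There are three of them:

  • StatementExecutor: executes Statement SQL.
  • PreparedStatementExecutor: executes PreparedStatement SQL.
  • BatchPreparedStatementExecutor: executes batched PreparedStatement SQL.

Each executor wraps its statements in BaseStatementUnit objects and passes them to ExecutorEngine, which runs the SQL.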


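6. ExecutionEvent: SQL execution events

Sharding-JDBC uses Guava's EventBus to publish execution events. For each SQL execution it publishes events with one of these types:

  • BEFORE_EXECUTE: before execution.
  • EXECUTE_SUCCESS: execution succeeded.
  • EXECUTE_FAILURE: execution failed.

A listener subscribes to these events like this: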

@Subscribe
@AllowConcurrentEvents
public void listen(DMLExecutionEvent event) {
    System.out.println("DMLExecutionEvent:" + event.getSql() + "\t" + event.getEventExecutionType());
}
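The order in which the three event types fire can be shown with a small stand-in for the event bus. This sketch does not use Guava: ExecutionEventSketch, its register and executeWithEvents methods, and the callback-based listeners are all hypothetical; only the three type names come from the text above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Supplier;

final class ExecutionEventSketch {

    enum EventExecutionType { BEFORE_EXECUTE, EXECUTE_SUCCESS, EXECUTE_FAILURE }

    // Hypothetical stand-in for an event bus: listeners are plain callbacks.
    private final List<Consumer<EventExecutionType>> listeners = new ArrayList<>();

    void register(final Consumer<EventExecutionType> listener) {
        listeners.add(listener);
    }

    private void post(final EventExecutionType type) {
        for (Consumer<EventExecutionType> listener : listeners) {
            listener.accept(type);
        }
    }

    // Publishes BEFORE_EXECUTE, runs the task, then publishes the outcome.
    <T> T executeWithEvents(final Supplier<T> task) {
        post(EventExecutionType.BEFORE_EXECUTE);
        T result;
        try {
            result = task.get();
        } catch (RuntimeException ex) {
            post(EventExecutionType.EXECUTE_FAILURE);
            throw ex;
        }
        post(EventExecutionType.EXECUTE_SUCCESS);
        return result;
    }

    public static void main(final String[] args) {
        ExecutionEventSketch bus = new ExecutionEventSketch();
        bus.register(type -> System.out.println("event: " + type));
        bus.executeWithEvents(() -> 1);
    }
}
```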

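7. BestEffortsDeliveryListener: the best-efforts delivery listener

BestEffortsDeliveryListener implements best-efforts delivery soft transactions on top of these events. It records a transaction log before execution, removes the log on success, and on failure retries the SQL up to a configured number of times: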

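public void listen(DMLExecutionEvent event) {
    if (!isProcessContinuously()) {
        return;
    }
    // Handle the event according to its execution type
    switch (event.getEventExecutionType()) {
        case BEFORE_EXECUTE:
            // Record a transaction log before the SQL runs
            transactionLogStorage.add(new TransactionLog(...));
            break;
        case EXECUTE_SUCCESS:
            // The SQL succeeded, so the log is no longer needed
            transactionLogStorage.remove(event.getId());
            break;
        case EXECUTE_FAILURE:
            // The SQL failed: retry synchronously, up to syncMaxDeliveryTryTimes attempts
            boolean deliverySuccess = false;
            for (int i = 0; i < syncMaxDeliveryTryTimes; i++) {
                // Stop once a retry has succeeded
                if (deliverySuccess) {
                    break;
                }
                try {
                    // Re-run the failed SQL on a connection from the soft transaction
                    // (closing the statement and connection is omitted in this excerpt)
                    Connection conn = bedSoftTransaction.getConnection(...);
                    PreparedStatement preparedStatement = conn.prepareStatement(event.getSql());
                    preparedStatement.executeUpdate();
                    deliverySuccess = true;
                    transactionLogStorage.remove(event.getId());
                } catch (SQLException ex) {
                    // This attempt failed; the loop moves on to the next one
                }
            }
            break;
        default:
            throw new UnsupportedOperationException(event.getEventExecutionType().toString());
    }
}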
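The log-then-retry idea can be exercised without a database. In this sketch the transaction log is an in-memory map and the SQL "execution" is a predicate supplied by the caller; BestEffortsRetrySketch and all of its members are hypothetical, and only the name syncMaxDeliveryTryTimes is taken from the listener code.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Predicate;

final class BestEffortsRetrySketch {

    // In-memory stand-in for the transaction log storage: event id -> SQL.
    private final Map<String, String> transactionLog = new HashMap<>();
    private final int syncMaxDeliveryTryTimes;

    BestEffortsRetrySketch(final int syncMaxDeliveryTryTimes) {
        this.syncMaxDeliveryTryTimes = syncMaxDeliveryTryTimes;
    }

    // BEFORE_EXECUTE: remember the SQL in case it has to be retried.
    void beforeExecute(final String id, final String sql) {
        transactionLog.put(id, sql);
    }

    // EXECUTE_SUCCESS: nothing left to deliver.
    void onSuccess(final String id) {
        transactionLog.remove(id);
    }

    // EXECUTE_FAILURE: retry the logged SQL; 'executor' returns true when an attempt succeeds.
    // Returns the number of attempts made.
    int onFailure(final String id, final Predicate<String> executor) {
        String sql = transactionLog.get(id);
        int attempts = 0;
        for (int i = 0; i < syncMaxDeliveryTryTimes; i++) {
            attempts++;
            if (executor.test(sql)) {
                transactionLog.remove(id);
                break;
            }
        }
        return attempts;
    }

    // True while the log entry is still waiting to be delivered.
    boolean isPending(final String id) {
        return transactionLog.containsKey(id);
    }

    public static void main(final String[] args) {
        BestEffortsRetrySketch sketch = new BestEffortsRetrySketch(3);
        sketch.beforeExecute("e1", "UPDATE t_order SET status = 'OK' WHERE order_id = 1");
        int[] calls = {0};
        // The simulated executor fails twice and succeeds on the third attempt.
        int attempts = sketch.onFailure("e1", sql -> ++calls[0] == 3);
        System.out.println(attempts + " attempts, pending=" + sketch.isPending("e1"));
    }
}
```

When every synchronous attempt fails, the entry stays in the log, which is what allows a later process to pick it up and try again.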

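8. A note on transaction commit

The events described earlier are published around the execution of each SQL statement, not around connection.commit(). A failure inside connection.commit() therefore does not pass through the listener's retry logic and has to be handled separately.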


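In summary, ExecutorEngine is the component in Sharding-JDBC that executes SQL. It runs the routed SQL tasks on a thread pool, merges their results, and publishes execution events, which listeners such as BestEffortsDeliveryListener build on.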
