首页 > mycat > Mycat心跳机制梳理
2019
05-14

Mycat心跳机制梳理

1、Mycat心跳检测

1) NIOProcessor 定时检测后端连接,侧重点,连接是否超时等等,见io.mycat.MycatServer

定时任务执行频率:processorCheckPeriod,默认值,1s执行一次

Mycat心跳机制梳理 - 第1张  | 技术人生-孙强

关键代码:

// 后端连接检查

private void backendCheck() {
        long sqlTimeout = MycatServer.getInstance().getConfig().getSystem().getSqlExecuteTimeout() * 1000L;
        Iterator<Entry<Long, BackendConnection>> it = backends.entrySet().iterator();
        while (it.hasNext()) {
            BackendConnection c = it.next().getValue();

            // 删除空连接
            if (c == null) {
                it.remove();
                continue;
            }
            // SQL执行超时的连接关闭
            if (c.isBorrowed() && c.getLastTime() < TimeUtil.currentTimeMillis() - sqlTimeout) {
                LOGGER.warn("found backend connection SQL timeout ,close it " + c);
                c.close("sql timeout");
            }

            // 清理已关闭连接,否则空闲检查。
            if (c.isClosed()) {
                it.remove();

            } else {
                // very important ,for some data maybe not sent
                if (c instanceof AbstractConnection) {
                    checkConSendQueue((AbstractConnection) c);
                }
                c.idleCheck();
            }
        }
    }

2)DataHost中空闲连接心跳检测

检测频率:dataNodeIdleCheckPeriod,默认值:5分钟,见io.mycat.MycatServer
Mycat心跳机制梳理 - 第2张  | 技术人生-孙强

关键代码:

for (PhysicalDBPool node : nodes.values()) {

node.heartbeatCheck(heartPeriod);

}

PhysicalDatasource.heatBeatCheck
Mycat心跳机制梳理 - 第3张  | 技术人生-孙强

上述代码主要就是遍历ConQueue(Mycat后端连接池存放的地方),选出需要进行心跳的连接,然后执行conHeartBeatHandler.doHeartBeat。

重点关注一下检测checkIfNeedHertBeat方法:

private void checkIfNeedHeartBeat(
            LinkedList<BackendConnection> heartBeatCons, ConQueue queue,
            ConcurrentLinkedQueue<BackendConnection> checkLis,
            long hearBeatTime, long hearBeatTime2) {
        int maxConsInOneCheck = 10;
        Iterator<BackendConnection> checkListItor = checkLis.iterator();
        while (checkListItor.hasNext()) {
            BackendConnection con = checkListItor.next();   // @1
            if (con.isClosedOrQuit()) {
                checkListItor.remove();
                continue;
            }
            if (validSchema(con.getSchema())) {
                if (con.getLastTime() < hearBeatTime
                        && heartBeatCons.size() < maxConsInOneCheck) {
                    checkListItor.remove(); // @2
                    // Heart beat check
                    con.setBorrowed(true);
                    heartBeatCons.add(con);    // @3
                }
            } else if (con.getLastTime() < hearBeatTime2) {
                // not valid schema conntion should close for idle
                // exceed 2*conHeartBeatPeriod
                checkListItor.remove();
                con.close(" heart beate idle ");
            }

        }

    }

上面的逻辑是,获取ConQueue中的所有连接(ConcurrentLinkedQueue<BackendConnection>),然后获取一个遍历器,

进行遍历,然后判断该连接是否需要加入到心跳检测中,如果符合,就先移除,然后加入到待检测列表中。

上述这个步骤,乍一看没什么问题,但如果你与获取连接一起考虑,就会发现问题,我们知道,获取连接,也是从ConQueue的(ConcurrentLinkedQueue<BackendConnection>中poll一个,这样就会产生这样一个问题,对于一个连接c,有可能放入到检测列表中,并同时分配给其他线程,造成一个连接,在同一时间,会又做心跳检测,又执行其他SQL,造成结果混乱。

分析上述代码出现场景:

t1 线程运行到代码@1,t2线程获取数据库连接,使用poll()方法,获取一个连接,有可能就是获取的这个连接正好与t1代码@1的连接一样,

然后t1运行到代码@2,将移除,但并不会返回成功与否,然后运行代码@3,将连接放入到待检测列表中,此时一个连接,同时被多个线程使用,并且,最后一个使用,会改变连接的响应处理器,造成第二个线程的响应处理器会获得第一个线程的响应结果。

3)检测频率:dataNodeHeartbeatPeriod,默认值10s,见io.mycat.MycatServer 【验证数据节点是否正常】,也就是readhost,writerhost的存活状态检测。
Mycat心跳机制梳理 - 第4张  | 技术人生-孙强

关键代码:MySQLDetector
Mycat心跳机制梳理 - 第5张  | 技术人生-孙强

Mycat心跳机制梳理 - 第6张  | 技术人生-孙强

上述代码引起了我一个疑问,为什么heartbeat方法没调用一次,就要新建一个SQLJob对象,为什么嗯?

我们先重点看一下看一下该方法的调用链:
Mycat心跳机制梳理 - 第7张  | 技术人生-孙强

一个PhysicalDatasource对应一个readerhost或writerhost。

跟踪代码发现,PhysicalDatasource拥有一个(private DBHeartbeat heartbeat),一个MySQLHeartbeat拥有一个MySQLDetector,

那为什么执行MySQLDetector时,确需要将SQLJob声明为volatile,并没次调用都新建一个对象,估计是因为每次心跳检查都需要一个独立的

OneRawSQLQueryResultHandler resultHandler = new OneRawSQLQueryResultHandler( fetchColms, this);故这里每次都新建了SQLJob。

此处,应该有一个可优化 的点:由于心跳连接运行比较频繁,默认10s一次,故此处连接的重用性问题值得思考。

上述BUG,是由我公司生产环境下跑出的,因为项目组反映,一条查询连接,竟然返回了心跳检测的响应结果:
Mycat心跳机制梳理 - 第8张  | 技术人生-孙强

附上本次代码修改后的代码:

PhysicalDatasource.checkIfNeedHeartBeat
private void checkIfNeedHeartBeat(
            LinkedList<BackendConnection> heartBeatCons, ConQueue queue,
            ConcurrentLinkedQueue<BackendConnection> checkLis,
            long hearBeatTime, long hearBeatTime2) {
        int maxConsInOneCheck = 10;
        Iterator<BackendConnection> checkListItor = checkLis.iterator();
        while (checkListItor.hasNext()) {
            BackendConnection con = checkListItor.next();
            if (con.isClosedOrQuit()) {
                checkListItor.remove();
                continue;
            }
            if (validSchema(con.getSchema())) {
                if (con.getLastTime() < hearBeatTime
                        && heartBeatCons.size() < maxConsInOneCheck) {

                    if(checkLis.remove(con)) {
                        //如果移除成功,则放入到心跳连接中,如果移除失败,说明该连接已经被其他线程使用,忽略本次心跳检测
                        con.setBorrowed(true);
                        heartBeatCons.add(con);
                    }
//                  checkListItor.remove();
//                  // Heart beat check
//                  con.setBorrowed(true);
//                  heartBeatCons.add(con);

                }
            } else if (con.getLastTime() < hearBeatTime2) {
                // not valid schema conntion should close for idle
                // exceed 2*conHeartBeatPeriod
//              checkListItor.remove();
//              con.close(" heart beate idle ");

                // 同样,这里也需要先移除,避免被业务连接
                if(checkLis.remove(con)) {
                    con.close(" heart beate idle ");
                }
            }

        }

    }

本文主要梳理一下Mycat主要心跳逻辑,修复了一个并发BUG,对SQLJob的优化目前还在思考中,改动代码会比较多。

最后编辑:
作者:sunny5156
喜欢技术....

留下一个回复