[分享]分布式数据库中间件–(3) Cobar对简单select命令的处理过程_MySQL, Oracle及数据库讨论区_Weblogic技术|Tuxedo技术|中间件技术|Oracle论坛|JAVA论坛|Linux/Unix技术|hadoop论坛_联动北方技术论坛  
网站首页 | 关于我们 | 服务中心 | 经验交流 | 公司荣誉 | 成功案例 | 合作伙伴 | 联系我们 |
联动北方-国内领先的云技术服务提供商
»  游客             当前位置:  论坛首页 »  自由讨论区 »  MySQL, Oracle及数据库讨论区 »
总帖数
1
每页帖数
101/1页1
返回列表
0
发起投票  发起投票 发新帖子
查看: 2209 | 回复: 0   主题: [分享]分布式数据库中间件–(3) Cobar对简单select命令的处理过程        下一篇 
mchdba
注册用户
等级:上士
经验:254
发帖:15
精华:0
注册:2014-4-23
状态:离线
发送短消息息给mchdba 加好友    发送短消息息给mchdba 发消息
发表于: IP:您无权察看 2014-11-10 14:46:47 | [全部帖] [楼主帖] 楼主

上一篇中介绍了Cobar和客户端初次建立连接的过程,Cobar监听端口,客户端发起连接请求,Cobar发送握手数据包,客户端发送认证数据包最后根据认证的结果Cobar向客户端发送认证结果。 在认证成功后Cobar会将该连接的回调处理函数由FrontendAuthenticator(前端认证处理器)设置成FrontendCommandHanler(前端命令处理器)。 所以在客户端再次向Cobar发送请求报文的时候,前端命令处理器会处理该连接。下面详细分析一下简单select语句的执行过程。
1、事件的产生
NIOReactor的R线程一直在监听selector上的每个连接的感兴趣事件是否发生,当客户端发送了一条select * from tb1,select函数会返回,然后获取到该连接SelectionKey,并且该SelectKey的兴趣事件是OP_READ。此时会调用read(NIOConnection)函数。

public void run() {
      final Selector selector = this.selector;
      for (;;) {
            ++reactCount;
            try {
                  int res = selector.select();
                  LOGGER.debug(reactCount + ">>NIOReactor接受连接数:" + res);
                  register(selector);
                  Set<SelectionKey> keys = selector.selectedKeys();
                  try {
                        for (SelectionKey key : keys) {
                              Object att = key.attachment();
                              if (att != null && key.isValid()) {
                                    int readyOps = key.readyOps();
                                    if ((readyOps & SelectionKey.OP_READ) != 0) {
                                          LOGGER.debug("select读事件");
                                          read((NIOConnection) att);
                                          ..............................
                                    }
                                    ...........................
                              }
                        } ..................
                  } ............
            }
      }


 2、调用该连接的read函数进行处理
该函数在上一篇中提到过,该函数的实现在AbstractConnection中,实现从channel中读取数据到缓冲区,然后从缓冲区完整的取出整包数据交给FrontendConnection类的handle()函数处理。 该函数交给processor进行异步处理。从processor中的线程池获取一个线程来执行该任务。这里调用具体的handler来进行处理。 刚开始提到的,当认证成功后,Cobar将连接的回调处理函数设置为FrontendCommandHandler。所以这里会调用前端命令处理器的handler函数进行数据的处理。 在这里需要先了解MySQL数据包的格式: MySQL客户端命令请求报文

MySQL客户端命令请求报文
该处理函数如下:

public void handle(byte[] data) {
      LOGGER.info("data[4]:"+data[4]);
      switch (data[4]) {
            case MySQLPacket.COM_INIT_DB:
            commands.doInitDB();
            source.initDB(data);
            break;
            case MySQLPacket.COM_QUERY:
            commands.doQuery();
            source.query(data);
            break;
            case MySQLPacket.COM_PING:
            commands.doPing();
            source.ping();
            break;
            case MySQLPacket.COM_QUIT:
            commands.doQuit();
            source.close();
            break;
            case MySQLPacket.COM_PROCESS_KILL:
            commands.doKill();
            source.kill(data);
            break;
            case MySQLPacket.COM_STMT_PREPARE:
            commands.doStmtPrepare();
            source.stmtPrepare(data);
            break;
            case MySQLPacket.COM_STMT_EXECUTE:
            commands.doStmtExecute();
            source.stmtExecute(data);
            break;
            case MySQLPacket.COM_STMT_CLOSE:
            commands.doStmtClose();
            source.stmtClose(data);
            break;
            case MySQLPacket.COM_HEARTBEAT:
            commands.doHeartbeat();
            source.heartbeat(data);
            break;
            default:
            commands.doOther();
            source.writeErrMessage(ErrorCode.ER_UNKNOWN_COM_ERROR, "Unknown command");
      }
}


  由于每个报文都有消息头,消息头固定的是4个字节,前3个字节是消息长度,后面的一个字节是报文序号,如下所示 mysql_protocol_struct 所以data[4]是第五个字节。也就是消息体的第一个字节。客户端向Cobar端发送的是命令报文,第一个字节是具体的命令。 如果是select语句,那么data[4]就是COM_QUERY,然后会调用具体连接的query成员函数,其定义在FrontendConnection类中。

public void query(byte[] data) {
      if (queryHandler != null) {
            // 取得语句
            MySQLMessage mm = new MySQLMessage(data);
            mm.position(5);
            String sql = null;
            try {
                  sql = mm.readString(charset);
            } catch (UnsupportedEncodingException e) {
                  writeErrMessage(ErrorCode.ER_UNKNOWN_CHARACTER_SET, "Unknown charset '" + charset + "'");
                  return;
            }
            if (sql == null || sql.length() == 0) {
                  writeErrMessage(ErrorCode.ER_NOT_ALLOWED_COMMAND, "Empty SQL");
                  return;
            }
            LOGGER.debug("解析的SQL语句:"+sql);
            // 执行查询
            queryHandler.query(sql);
      } else {
      writeErrMessage(ErrorCode.ER_UNKNOWN_COM_ERROR, "Query unsupported!");
}
}


首先新建一个MySQLMessage对象,将数据包的索引位置定位到第6个字节位置处。然后将后面的所有的字节读取成指定编码格式的SQL语句,这里就形成了完整的SQL语句。 查询的时候Cobar控制台输出如下内容:

    11:35:33,392 INFO data[4]:3 11:35:33,392 DEBUG 解析的SQL语句:select * from tb2

解析出SQL语句后交给queryHandler处理。该对象是在新建连接的时候设置的ServerQueryHandler类,其实现的query函数如下:

public void query(String sql) {
      //这里就得到了完整的SQL语句,接收自客户端
      ServerConnection c = this.source;
      if (LOGGER.isDebugEnabled()) {
            LOGGER.debug(new StringBuilder().append(c).append(sql).toString());
      }
      //该函数对SQL语句的语法和语义进行分析,并返回SQL语句的对于类型,执行相应的操作
      int rs = ServerParse.parse(sql);
      switch (rs & 0xff) {
            .......................
            case ServerParse.SELECT:
            //select操作执行
            SelectHandler.handle(sql, c, rs >>> 8);
            break;
            .......................
      }
}


首先对SQL语句进程解析,通过parse函数对语句解析后返回语句类型的编号。 如果语句没有语法错误,则直接交给SelectHandler进行处理。如果是一般的select语句,则直接调用ServerConnection的execute执行sql

c.execute(stmt, ServerParse.SELECT);


在ServerConnection中的execute函数中需要进行路由检查,因为select的数据不一定在一个数据库中,需要按拆分的规则进行路由的检查。

// 路由计算
RouteResultset rrs = null;
try {
      rrs = ServerRouter.route(schema, sql, this.charset, this);
      LOGGER.debug("路由计算结果:"+rrs.toString());
}


具体的路由算法也是比较复杂,以后会专门分析。 Cobar的DEBUG控制台输出路由的计算结果如下:

    11:35:33,392 DEBUG 路由计算结果:select * from tb2, route={ 1 -> dnTest2.default{select * from tb2} 2 -> dnTest3.default{select * from tb2} }

该条SQL语句的select内容分布在dnTset2和dnTest3中,所以要分别向这两个数据库进行查询。 经过比较复杂的资源处理最后在每个后端数据库上执行函数execute0。

private void execute0(RouteResultsetNode rrn, Channel c, boolean autocommit, BlockingSession ss, int flag) {
      ServerConnection sc = ss.getSource();
      .........................
      try {
            // 执行并等待返回
            BinaryPacket bin = ((MySQLChannel) c).execute(rrn, sc, autocommit);
            // 接收和处理数据,执行到这里就说明上面的执行已经得到执行结果的返回
            final ReentrantLock lock = MultiNodeExecutor.this.lock;
            lock.lock();
            try {
                  switch (bin.data[0]) {
                        case ErrorPacket.FIELD_COUNT:
                        c.setRunning(false);
                        handleFailure(ss, rrn, new BinaryErrInfo((MySQLChannel) c, bin, sc, rrn));
                        break;
                        case OkPacket.FIELD_COUNT:
                        OkPacket ok = new OkPacket();
                        ok.read(bin);
                        affectedRows += ok.affectedRows;
                        // set lastInsertId
                        if (ok.insertId > 0) {
                              insertId = (insertId == 0) ? ok.insertId : Math.min(insertId, ok.insertId);
                        }
                        c.setRunning(false);
                        handleSuccessOK(ss, rrn, autocommit, ok);
                        break;
                        default: // HEADER|FIELDS|FIELD_EOF|ROWS|LAST_EOF
                        final MySQLChannel mc = (MySQLChannel) c;
                        if (fieldEOF) {
                              for (;;) {
                                    bin = mc.receive();
                                    switch (bin.data[0]) {
                                          case ErrorPacket.FIELD_COUNT:
                                          c.setRunning(false);
                                          handleFailure(ss, rrn, new BinaryErrInfo(mc, bin, sc, rrn));
                                          return;
                                          case EOFPacket.FIELD_COUNT:
                                          handleRowData(rrn, c, ss);
                                          return;
                                          default:
                                          continue;
                                    }
                              }
                        } else {
                        bin.packetId = ++packetId;// HEADER
                        List<MySQLPacket> headerList = new LinkedList<MySQLPacket>();
                        headerList.add(bin);
                        for (;;) {
                              bin = mc.receive();
                              switch (bin.data[0]) {
                                    case ErrorPacket.FIELD_COUNT:
                                    c.setRunning(false);
                                    handleFailure(ss, rrn, new BinaryErrInfo(mc, bin, sc, rrn));
                                    return;
                                    case EOFPacket.FIELD_COUNT:
                                    bin.packetId = ++packetId;// FIELD_EOF
                                    for (MySQLPacket packet : headerList) {
                                          buffer = packet.write(buffer, sc);
                                    }
                                    headerList = null;
                                    buffer = bin.write(buffer, sc);
                                    fieldEOF = true;
                                    handleRowData(rrn, c, ss);
                                    return;
                                    default:
                                    bin.packetId = ++packetId;// FIELDS
                                    switch (flag) {
                                          case RouteResultset.REWRITE_FIELD:
                                          StringBuilder fieldName = new StringBuilder();
                                          fieldName.append("Tables_in_").append(ss.getSource().getSchema());
                                          FieldPacket field = PacketUtil.getField(bin, fieldName.toString());
                                          headerList.add(field);
                                          break;
                                          default:
                                          headerList.add(bin);
                                    }
                              }
                        }
                  }
            }
      } finally {
      lock.unlock();
}
}//异常处理....................
}


这里真正的执行SQL语句,然后等待后端执行语句的返回数据,在成功获取后端Mysql返回的结果后,该函数返回的数据包是结果集数据包。 当客户端发起认证请求或命令请求后,服务器会返回相应的执行结果给客户端。客户端在收到响应报文后,需要首先检查第1个字节的值,来区分响应报文的类型。
响应报文类型     第1个字节取值范围
OK 响应报文     0×00
Error 响应报文     0xFF
Result Set 报文     0×01 – 0xFA
Field 报文     0×01 – 0xFA
Row Data 报文     0×01 – 0xFA
EOF 报文     0xFE
注:响应报文的第1个字节在不同类型中含义不同,比如在OK报文中,该字节并没有实际意义,值恒为0×00;而在Result Set报文中,该字节又是长度编码的二进制数据结构(Length Coded Binary)中的第1字节。 Result Set 消息分为五部分,结构如下:
结构     说明
[Result Set Header]     列数量
[Field]     列信息(多个)
[EOF]     列结束
[Row Data]     行数据(多个)
[EOF]     数据结束
函数执行完成后,返回的结果都放入LinkedList中,当读取结果完成后放入多节点执行器的缓冲区。如果buffer满了,就通过前端连接写出给客户端。

--转自 北京联动北方科技有限公司




赞(0)    操作        顶端 
总帖数
1
每页帖数
101/1页1
返回列表
发新帖子
请输入验证码: 点击刷新验证码
您需要登录后才可以回帖 登录 | 注册
技术讨论