应用场景:

1、数据库安全审计:禁止某些命令,比如 drop、mysqldump、into outfile 这些正常程序平时用不到的指令。

2、数据库安全则定义为 虚拟用户,所有应用程序都使用虚拟用户连接数据库,提升数据库安全。

3、对查询出来的结果集进行正则替换,把敏感数据脱敏处理,比如身份证、手机号、邮箱、电话等

4、数据库安全还有就是过滤处理查询请求的SQL,修改后再发送查询,对SQL查询做用户的日志记录。

13.png
如图,所有手机号的中间4位都被隐藏为#号再显示出来。

中间件名称:mysql-proxy,基于 lua 实现连接认证、SQL 请求处理、结果集返回处理等相关逻辑。

配置结构

/etc/mysql-proxy.cnf #基本配置,包括有用于主从日志权限的用户
/usr/local/mysql-proxy/lua/admin-sql.lua #默认运行配置相关,不需要修改
/usr/local/mysql-proxy/lua/rw-splitting.lua #核心业务逻辑,处理 请求 认证 结果集返回等所有业务
/usr/local/mysql-proxy/lib/mysql-proxy/lua/proxy/userlist.lua #虚拟用户列表,即用于连接 mysql-proxy、但不存在于 mysql 真实用户中的用户
/usr/local/mysql-proxy/lib/mysql-proxy/lua/proxy/realuser.lua #虚拟用户共用的真实mysql用户名和密码
#sql请求日志是基于虚拟用户记录
/usr/local/mysql-proxy/lib/mysql-proxy/lua/proxy/userpass.lua #不使用虚拟用户认证的 用户名,当这里的用户名连接时直接抛给mysql去认证
/usr/local/mysql-proxy/lib/mysql-proxy/lua/proxy/nomask.lua #不需要脱敏的用户,当这类用户名进行sql查询时,不进行数据脱敏

rw-splitting.lua 核心代码

--[[ $%BEGINLICENSE%$
 Copyright (c) 2007, 2012, Oracle and/or its affiliates. All rights reserved.

 This program is free software; you can redistribute it and/or
 modify it under the terms of the GNU General Public License as
 published by the Free Software Foundation; version 2 of the
 License.

 This program is distributed in the hope that it will be useful,
 but WITHOUT ANY WARRANTY; without even the implied warranty of
 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 GNU General Public License for more details.

 You should have received a copy of the GNU General Public License
 along with this program; if not, write to the Free Software
 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
 02110-1301  USA

 $%ENDLICENSE%$ --]]

---
-- a flexible statement based load balancer with connection pooling
--
-- * build a connection pool of min_idle_connections for each backend and maintain
--   its size
-- * 
-- 
-- 

local commands    = require("proxy.commands")
local tokenizer   = require("proxy.tokenizer")
local lb          = require("proxy.balance")
local auto_config = require("proxy.auto-config")

--- Default settings and file-level routing flags
--
-- The connection-pool defaults below are installed only when
-- proxy.global.config.rwsplit holds no value yet; an existing value is
-- left untouched.
if not proxy.global.config.rwsplit then
    local defaults = {
        -- idle-connection bounds copied onto every backend pool by connect_server
        min_idle_connections = 4,
        max_idle_connections = 8,

        -- when true the hooks print trace output
        is_debug = false,
    }

    proxy.global.config.rwsplit = defaults
end

-- Flags read by read_query while routing a statement:
--   is_in_transaction            - SELECTs are only sent to a read-only
--                                  backend while this is false
--   is_in_select_calc_found_rows - set when a SELECT SQL_CALC_FOUND_ROWS
--                                  token was seen in the current statement
local is_in_transaction, is_in_select_calc_found_rows = false, false

---
-- Choose the backend for a newly connecting client.
--
-- Every backend pool first gets the configured min/max idle limits. The
-- first backend (read-write or read-only) that is not DOWN and whose idle
-- connection count for the "" user is below the minimum is selected.
-- When no backend qualifies, the first read-write backend that is not DOWN
-- is used instead.
--
-- @return proxy.PROXY_IGNORE_RESULT when proxy.connection.server is already
--         set; nothing otherwise
function connect_server()
    local settings = proxy.global.config.rwsplit
    local debug_on = settings.is_debug

    if debug_on then
        print()
        print("[connect_server] " .. proxy.connection.client.src.name)
    end

    -- index of the first usable read-write backend, 0 = none seen yet
    local fallback_master = 0

    for ndx = 1, #proxy.global.backends do
        local backend    = proxy.global.backends[ndx]
        local pool       = backend.pool
        -- no username is known at this point, so the "" entry is inspected
        local idle_count = pool.users[""].cur_idle_connections

        pool.min_idle_connections = settings.min_idle_connections
        pool.max_idle_connections = settings.max_idle_connections

        if debug_on then
            print("  [".. ndx .."].connected_clients = " .. backend.connected_clients)
            print("  [".. ndx .."].pool.cur_idle     = " .. idle_count)
            print("  [".. ndx .."].pool.max_idle     = " .. pool.max_idle_connections)
            print("  [".. ndx .."].pool.min_idle     = " .. pool.min_idle_connections)
            print("  [".. ndx .."].type = " .. backend.type)
            print("  [".. ndx .."].state = " .. backend.state)
        end

        if backend.state ~= proxy.BACKEND_STATE_DOWN then
            local is_master = backend.type == proxy.BACKEND_TYPE_RW
            local is_slave  = backend.type == proxy.BACKEND_TYPE_RO

            if (is_master or is_slave) and idle_count < pool.min_idle_connections then
                -- this pool is short of idle connections: connect here
                proxy.connection.backend_ndx = ndx
                break
            elseif is_master and fallback_master == 0 then
                -- remember the first live master as the default choice
                fallback_master = ndx
            end
        end
    end

    if proxy.connection.backend_ndx == 0 then
        if debug_on then
            print("  [" .. fallback_master .. "] taking master as default")
        end
        proxy.connection.backend_ndx = fallback_master
    end

    if proxy.connection.server then
        if debug_on then
            print("  using pooled connection from: " .. proxy.connection.backend_ndx)
        end

        -- a server connection is already attached: keep it
        return proxy.PROXY_IGNORE_RESULT
    end

    if debug_on then
        print("  [" .. proxy.connection.backend_ndx .. "] idle-conns below min-idle")
    end

    -- falling through without a return value opens a new connection
end



--- Account handling: authenticate the connecting client.
--
-- Three user files are re-read on each call (their package.loaded entry is
-- cleared first) so that edits take effect without restarting the proxy:
--   proxy.userpass - pass-through users; their original auth packet is
--                    forwarded to the server unchanged
--   proxy.userlist - virtual users, a table of username -> plain password
--   proxy.realuser - table with the `username` / `password` of the real
--                    account used on behalf of all virtual users
--
-- A pass-through entry wins over a virtual-user entry of the same name.
-- The virtual-user list and the real-user file are only loaded when they
-- are actually needed for the current login.
--
-- @return nothing for pass-through users,
--         proxy.PROXY_SEND_RESULT with an error packet when the user is
--         unknown or the password does not match,
--         proxy.PROXY_SEND_QUERY after queueing an auth packet for the real
--         account
function read_auth()
    local c = proxy.connection.client
    local s = proxy.connection.server
    local password = assert(require("mysql.password"))
    local proto = assert(require("mysql.proto"))

    -- pass-through users are left to the server to authenticate
    package.loaded["proxy.userpass"] = nil
    local passthrough_users = require("proxy.userpass")
    if passthrough_users[c.username] ~= nil then
        return
    end

    -- virtual users: compare the client's scrambled password with the one
    -- computed from the configured plain password and the server's scramble
    package.loaded["proxy.userlist"] = nil
    local virtual_users = require("proxy.userlist")
    local virtual_password = virtual_users[c.username]

    if virtual_password == nil or
       password.scramble(s.scramble_buffer, password.hash(virtual_password)) ~= c.scrambled_password then
        proxy.response.type = proxy.MYSQLD_PACKET_ERR
        proxy.response.errmsg ="error , code 111 !"
        return proxy.PROXY_SEND_RESULT
    end

    -- the virtual user is accepted: log in to the server as the real account
    package.loaded["proxy.realuser"] = nil
    local realuser = require("proxy.realuser")

    -- 8 + 512 + 32768: CLIENT_CONNECT_WITH_DB, CLIENT_PROTOCOL_41 and
    -- CLIENT_SECURE_CONNECTION in the MySQL client capability flags
    local protocol_41_default_capabilities = 8 + 512 + 32768

    proxy.queries:append(1,
        proto.to_response_packet({
            username = realuser['username'],
            response = password.scramble(s.scramble_buffer, password.hash(realuser['password'])),
            charset  = 8, -- default charset
            database = c.default_db,
            max_packet_size = 1 * 1024 * 1024,
            server_capabilities = protocol_41_default_capabilities
        })
    )

    return proxy.PROXY_SEND_QUERY
end


---

---
-- React to the server's answer to the auth packet.
--
-- On an OK packet proxy.connection.backend_ndx is reset to 0, detaching the
-- client from the backend it authenticated against.
--
-- @param auth the context information for the auth
--
-- auth.packet is the packet
function read_auth_result( auth )
    -- read the flag from the shared config; without this local the name
    -- would resolve to an undefined global and the trace below never ran
    local is_debug = proxy.global.config.rwsplit.is_debug

    if is_debug then
        print("[read_auth_result] " .. proxy.connection.client.src.name)
    end

    -- the first byte of the packet identifies its type
    local packet_type = auth.packet:byte()

    if packet_type == proxy.MYSQLD_PACKET_OK then
        -- auth was fine, disconnect from the server
        proxy.connection.backend_ndx = 0
    elseif packet_type == proxy.MYSQLD_PACKET_EOF then
        -- MYSQLD_PACKET_EOF at this point means an OLD PASSWORD (4.0)
        -- exchange was requested; the handshake is not finished
        print("(read_auth_result) ... not ok yet")
    end
    -- MYSQLD_PACKET_ERR (auth failed) needs no action here
end


--- 
-- read/write splitting
--
-- Called with each client command packet. In order, it:
--   * returns whatever auto_config.handle() returns, if it returns a value
--   * for COM_QUERY, writes the client username and query text to the
--     "query.log" file through str_log
--   * answers COM_QUIT itself with an OK packet instead of forwarding it
--   * for COM_BINLOG_DUMP only makes sure a backend is selected and returns
--     without queueing anything
--   * queues the packet as injection id 1 with its resultset requested
--   * for a SELECT while is_in_transaction is false, switches to the backend
--     returned by lb.idle_ro() unless the statement mentions LAST_INSERT_ID
--     or @@INSERT_ID
--   * prepends a COM_INIT_DB (injection id 2) when the client's default db
--     differs from the server's
--
-- @param packet the raw command packet sent by the client
-- @return the auto_config.handle() result, proxy.PROXY_SEND_RESULT (COM_QUIT),
--         nothing (COM_BINLOG_DUMP) or proxy.PROXY_SEND_QUERY
function read_query( packet )
    local is_debug = proxy.global.config.rwsplit.is_debug
    local cmd      = commands.parse(packet)
    local c        = proxy.connection.client

    local r = auto_config.handle(cmd)
    if r then return r end

    local tokens
    -- NOTE(review): norm_query is never assigned or read in this function
    local norm_query

    -- log the SQL request: the user and the statement being requested
    -- NOTE(review): username and query are concatenated into the log line
    -- without escaping quotes or newlines
    if cmd.type == proxy.COM_QUERY then
        str_log("query.log","{user:\""..c.username.."\",query:\""..cmd.query.."\"}")
    end

    -- looks like we have to forward this statement to a backend
    if is_debug then
        print("[read_query] " .. proxy.connection.client.src.name)
        print("  current backend   = " .. proxy.connection.backend_ndx)
        print("  client default db = " .. c.default_db)
        print("  client username   = " .. c.username)
        if cmd.type == proxy.COM_QUERY then 
            print("  query             = "        .. cmd.query)
        end
    end

    if cmd.type == proxy.COM_QUIT then
        -- don't send COM_QUIT to the backend. We manage the connection
        -- in all aspects.
        proxy.response = {
            type = proxy.MYSQLD_PACKET_OK,
        }
    
        if is_debug then
            print("  (QUIT) current backend   = " .. proxy.connection.backend_ndx)
        end

        return proxy.PROXY_SEND_RESULT
    end
    
    -- COM_BINLOG_DUMP packet can't be balanced
    --
    -- so we must send it always to the master
    if cmd.type == proxy.COM_BINLOG_DUMP then
        -- if we don't have a backend selected, let's pick the master
        --
        if proxy.connection.backend_ndx == 0 then
            proxy.connection.backend_ndx = lb.idle_failsafe_rw()
        end

        return
    end

    -- queue the client's packet as injection id 1; resultset_is_needed makes
    -- the resultset available to read_query_result
    proxy.queries:append(1, packet, { resultset_is_needed = true })

    -- read/write splitting 
    --
    -- send all non-transactional SELECTs to a slave
    if not is_in_transaction and
       cmd.type == proxy.COM_QUERY then
        tokens     = tokens or assert(tokenizer.tokenize(cmd.query))

        local stmt = tokenizer.first_stmt_token(tokens)

        if stmt.token_name == "TK_SQL_SELECT" then
            is_in_select_calc_found_rows = false
            local is_insert_id = false

            -- scan all tokens for the two markers that pin the statement to
            -- the current connection
            for i = 1, #tokens do
                local token = tokens[i]
                -- SQL_CALC_FOUND_ROWS + FOUND_ROWS() have to be executed 
                -- on the same connection
                -- print("token: " .. token.token_name)
                -- print("  val: " .. token.text)
                
                if not is_in_select_calc_found_rows and token.token_name == "TK_SQL_SQL_CALC_FOUND_ROWS" then
                    is_in_select_calc_found_rows = true
                elseif not is_insert_id and token.token_name == "TK_LITERAL" then
                    local utext = token.text:upper()

                    if utext == "LAST_INSERT_ID" or
                       utext == "@@INSERT_ID" then
                        is_insert_id = true
                    end
                end

                -- we found the two special token, we can't find more
                if is_insert_id and is_in_select_calc_found_rows then
                    break
                end
            end

            -- if we ask for the last-insert-id we have to ask it on the original 
            -- connection
            if not is_insert_id then
                local backend_ndx = lb.idle_ro()

                -- only switch when lb.idle_ro() returned a backend index
                if backend_ndx > 0 then
                    proxy.connection.backend_ndx = backend_ndx
                end
            else
                print("   found a SELECT LAST_INSERT_ID(), staying on the same backend")
            end
        end
    end

    -- no backend selected yet, pick a master
    if proxy.connection.backend_ndx == 0 then
        -- we don't have a backend right now
        -- 
        -- let's pick a master as a good default
        --
        proxy.connection.backend_ndx = lb.idle_failsafe_rw()
    end

    -- by now we should have a backend
    --
    -- in case the master is down, we have to close the client connections
    -- otherwise we can go on
    -- NOTE(review): the branch below does not close anything, it returns
    -- PROXY_SEND_QUERY with no backend selected - confirm this is intended
    if proxy.connection.backend_ndx == 0 then
        return proxy.PROXY_SEND_QUERY
    end

    local s = proxy.connection.server

    -- if client and server db don't match, adjust the server-side 
    --
    -- skip it if we send a INIT_DB anyway
    if cmd.type ~= proxy.COM_INIT_DB and 
       c.default_db and c.default_db ~= s.default_db then
        -- these three lines are printed regardless of is_debug
        print("    server default db: " .. s.default_db)
        print("    client default db: " .. c.default_db)
        print("    syncronizing")
        -- injection id 2 is placed in front of the client's query (id 1)
        proxy.queries:prepend(2, string.char(proxy.COM_INIT_DB) .. c.default_db, { resultset_is_needed = true })
    end

    -- send to master
    if is_debug then
        if proxy.connection.backend_ndx > 0 then
            local b = proxy.global.backends[proxy.connection.backend_ndx]
            print("  sending to backend : " .. b.dst.name);
            print("    is_slave         : " .. tostring(b.type == proxy.BACKEND_TYPE_RO));
            print("    server default db: " .. s.default_db)
            print("    server username  : " .. s.username)
        end
        print("    in_trans        : " .. tostring(is_in_transaction))
        print("    in_calc_found   : " .. tostring(is_in_select_calc_found_rows))
        print("    COM_QUERY       : " .. tostring(cmd.type == proxy.COM_QUERY))
    end

    return proxy.PROXY_SEND_QUERY
end


--- Append one line of text to a per-day log file.
--
-- The file path is "/usr/local/mysql-proxy/logs/" followed by the current
-- date (YYYY-MM-DD) and then `filename`, with nothing in between. The file
-- is opened in append mode, written, flushed and closed on every call.
-- When the file cannot be opened, "file nil" is printed and nothing is
-- written.
--
-- @param filename suffix appended to the dated path
-- @param str      text to write; a newline is added after it
function str_log(filename,str)
    local day  = os.date("%Y-%m-%d", os.time())
    local path = "/usr/local/mysql-proxy/logs/" .. day .. filename

    local handle = io.open(path, "a")
    if not handle then
        print "file nil"
        return
    end

    handle:write(str .. "\n")
    handle:flush()
    handle:close()
end


---
-- Post-process a backend result before it is handed to the client.
--
-- * Results of injections other than id 1 (the COM_INIT_DB that read_query
--   prepends as id 2) are never forwarded. If that COM_INIT_DB failed, the
--   queued queries are dropped and an error is sent to the client instead.
-- * Error results, results without columns or rows, and results for users
--   listed in proxy.nomask are returned untouched (nothing is returned, so
--   the original packets are forwarded).
-- * For every other result a new resultset is built: each column is
--   declared as MYSQL_TYPE_STRING and in each non-NULL value every run of
--   11 digits starting with "1" has digits 4-7 replaced by "####".
--
-- proxy.nomask is re-read on every call (its package.loaded entry is
-- cleared first) so the list can be edited without restarting the proxy.
--
-- NOTE(review): unlike the stock script, this variant neither updates
-- is_in_transaction nor releases the backend (backend_ndx = 0) after a
-- result; confirm that is intended before relying on read/write splitting.
--
-- @param inj the injection whose result arrived; inj.id and inj.resultset
--            are used
-- @return proxy.PROXY_IGNORE_RESULT for injected helper queries,
--         proxy.PROXY_SEND_RESULT when a replacement result was prepared,
--         nothing when the backend result is forwarded unchanged
function read_query_result( inj )
    local res = assert(inj.resultset)

    if inj.id ~= 1 then
        -- ignore the result of the USE <default_db>
        if inj.id == 2 and res.query_status == proxy.MYSQLD_PACKET_ERR then
            -- the injected INIT_DB failed as the backend doesn't have this
            -- DB or doesn't allow reading from it: abort the client's query
            proxy.queries:reset()

            proxy.response = {
                type = proxy.MYSQLD_PACKET_ERR,
                errmsg = "can't change DB ".. proxy.connection.client.default_db ..
                    " to on slave " .. proxy.global.backends[proxy.connection.backend_ndx].dst.name
            }

            return proxy.PROXY_SEND_RESULT
        end

        return proxy.PROXY_IGNORE_RESULT
    end

    local fields = res.fields
    local rows   = res.rows

    -- nothing to mask in an error or in a result without a row set
    if res.query_status == proxy.MYSQLD_PACKET_ERR or fields == nil or rows == nil then
        return
    end

    -- users listed in proxy.nomask get the backend result unchanged
    package.loaded["proxy.nomask"] = nil
    local unmasked_users = require("proxy.nomask")
    if unmasked_users[proxy.connection.client.username] ~= nil then
        return
    end

    -- copy the column names; fields is walked by index until the first
    -- missing entry
    local out_fields  = {}
    local field_count = 0
    while fields[field_count + 1] do
        field_count = field_count + 1
        out_fields[field_count] = {
            type = proxy.MYSQL_TYPE_STRING,
            name = fields[field_count].name
        }
    end

    if field_count == 0 then
        return
    end

    local out_rows = {}
    for row in rows do
        for i = 1, field_count do
            local value = row[i]
            if value ~= nil then
                -- the extra parentheses drop gsub's second result (the count)
                row[i] = (string.gsub(value, '([1]%d%d)(%d%d%d%d)(%d%d%d%d)', '%1####%3'))
            end
        end
        out_rows[#out_rows + 1] = row
    end

    proxy.response.resultset = { fields = out_fields, rows = out_rows }
    proxy.response.type = proxy.MYSQLD_PACKET_OK
    return proxy.PROXY_SEND_RESULT
end

---
-- Client disconnect hook.
--
-- Resets proxy.connection.backend_ndx to 0 so the client is detached from
-- its backend and the server connection can go back to the pool.
--
-- @return nothing
function disconnect_client()
    if proxy.global.config.rwsplit.is_debug then
        print("[disconnect_client] " .. proxy.connection.client.src.name)
    end

    -- detach from the backend; this is what moves the connection
    -- into the pool
    proxy.connection.backend_ndx = 0
end