1.简单粗暴的方法--对mysql库进行封装要统计一个执行过程, 就需要知道这个执行过程的开始位置和结束位置, 所以最简单粗暴的方法就是基于要调用的方法进行封装,在框架调用mysql库和mysql库中间实现一个中间层, 在中间层完成耗时统计,如:
# 伪代码def my_execute(conn, sql, param): # 针对mysql库的统计封装组件 with mytracer(conn, sql, param): # 以下为正常使用mysql库的代码with conn.cursor as cursor: cursor.execute(sql, param)...
看样子实现起来非常不错, 而且更改非常方便, 但由于是在最顶层的api上进行修改, 其实是非常不灵活的, 同时在cursor.execute里会进行一些预操作, 如把sql和param进行拼接, 调用nextset清除当前游标的数据等等。我们最后拿到的数据如时间耗时也是不准确的, 同时也没办法得到一些详细的元数据, 如错误码等等.
如果要拿到最直接有用的数据,就只能去改源代码, 然后再调用源代码了, 但是如果每个库都需要改源代码才能统计, 那也太麻烦了, 好在python也提供了一些类似探针的接口, 可以通过探针把库的源码进行替换完成我们的代码.
2.python的探针在python中可以通过sys.meta_path来实现import hook的功能, 当执行 import 相关操作时, 会根据sys.meta_path定义的对象对import相关库进行更改.sys.meta_path中的对象需要实现一个find_module方法, 这个find_module方法返回none或一个实现了load_module方法的对象, 我们可以通过这个对象, 针对一些库在import时, 把相关的方法进行替换, 简单用法如下,通过hooktime.sleep让他在sleep的时候能打印消耗的时间.
import importlibimport sysfrom functools import wrapsdef func_wrapper(func): """这里通过一个装饰器来达到狸猫换太子和获取数据的效果""" @wraps(func) def wrapper(*args, **kwargs): # 记录开始时间 start = time.time() result = func(*args, **kwargs) # 统计消耗时间 end = time.time() print(f"speed time:{end - start}") return result return wrapperclass metapathfinder: def find_module(self, fullname, path=none): # 执行时可以看出来在import哪些模块 print(f'find module:{path}:{fullname}') return metapathloader()class metapathloader: def load_module(self, fullname): # import的模块都会存放在sys.modules里面, 通过判断可以减少重复import if fullname in sys.modules: return sys.modules[fullname] # 防止递归调用 finder = sys.meta_path.pop(0) # 导入 module module = importlib.import_module(fullname) if fullname == 'time': # 替换函数 module.sleep = func_wrapper(module.sleep) sys.meta_path.insert(0, finder) return modulesys.meta_path.insert(0, metapathfinder())if __name__ == '__main__': import time time.sleep(1)# 输出示例:# find module:datetime# find module:time# load module:time# find module:math# find module:_datetime# speed time:1.00073385238647468
3.制作探针模块了解完了主要流程后, 可以开始制作自己的探针模块了, 由于示例只涉及到aiomysql模块, 那么在metapathfinder.find_module中需要只对aiomysql模块进行处理, 其他的先忽略. 然后我们需要确定我们要把aiomysql的哪个功能给替换, 从业务上来说, 一般情况下我们只要cursor.execute, cursor.fetchone, cursor.fetchall, cursor.executemany这几个主要的操作,所以需要深入cursor看看如何去更改代码, 后者重载哪个函数.
先cursor.execute的源码(cursor.executemanay也类似), 发现会先调用self.nextset的方法, 把上个请求的数据先拿完, 再合并sql语句, 最后通过self._query进行查询:
async def execute(self, query, args=none): """executes the given operation executes the given operation substituting any markers with the given parameters. for example, getting all rows where id is 5: cursor.execute("select * from t1 where id = %s", (5,)) :param query: ``str`` sql statement :param args: ``tuple`` or ``list`` of arguments for sql query :returns: ``int``, number of rows that has been produced of affected """ conn = self._get_db() while (await self.nextset()): pass if args is not none: query = query % self._escape_args(args, conn) await self._query(query) self._executed = query if self._echo: logger.info(query) logger.info("%r", args) return self._rowcount
再看cursor.fetchone的源码(cursor.fetchall也类似), 发现其实是从缓存中获取数据,
这些数据在执行cursor.execute中就已经获取了:
def fetchone(self): """fetch the next row """ self._check_executed() fut = self._loop.create_future() if self._rows is none or self._rownumber >= len(self._rows): fut.set_result(none) return fut result = self._rows[self._rownumber] self._rownumber += 1 fut = self._loop.create_future() fut.set_result(result) return fut
综合上面的分析, 我们只要对核心的方法self._query进行重载即可拿到我们要的数据, 从源码中我们可以知道, 我们能获取到传入self._query的self和sql参数, 根据self又能获取到查询的结果, 同时我们通过装饰器能获取到运行的时间, 要的数据基本都到齐了,
按照思路修改后的代码如下:
import importlibimport timeimport sysfrom functools import wrapsfrom typing import cast, any, callable, optional, tuple, type_checkingfrom types import moduletypeif type_checking: import aiomysqldef func_wrapper(func: callable): @wraps(func) async def wrapper(*args, **kwargs) -> any: start: float = time.time() func_result: any = await func(*args, **kwargs) end: float = time.time() # 根据_query可以知道, 第一格参数是self, 第二个参数是sql self: aiomysql.cursor = args[0] sql: str = args[1] # 通过self,我们可以拿到其他的数据 db: str = self._connection.db user: str = self._connection.user host: str = self._connection.host port: str = self._connection.port execute_result: tuple[tuple] = self._rows # 可以根据自己定义的agent把数据发送到指定的平台, 然后我们就可以在平台上看到对应的数据或进行监控了, # 这里只是打印一部分数据出来 print({ "sql": sql, "db": db, "user": user, "host": host, "port": port, "result": execute_result, "speed time": end - start }) return func_result return cast(callable, wrapper)class metapathfinder: @staticmethod def find_module(fullname: str, path: optional[str] = none) -> optional["metapathloader"]: if fullname == 'aiomysql': # 只有aiomysql才进行hook return metapathloader() else: return noneclass metapathloader: @staticmethod def load_module(fullname: str): if fullname in sys.modules: return sys.modules[fullname] # 防止递归调用 finder: "metapathfinder" = sys.meta_path.pop(0) # 导入 module module: moduletype = importlib.import_module(fullname) # 针对_query进行hook module.cursor._query = func_wrapper(module.cursor._query) sys.meta_path.insert(0, finder) return moduleasync def test_mysql() -> none: import aiomysql pool: aiomysql.pool = await aiomysql.create_pool( host='127.0.0.1', port=3306, user='root', password='123123', db='mysql' ) async with pool.acquire() as conn: async with conn.cursor() as cur: await cur.execute("select 42;") (r,) = await cur.fetchone() assert r == 42 pool.close() await pool.wait_closed()if __name__ == '__main__': sys.meta_path.insert(0, metapathfinder()) import asyncio asyncio.run(test_mysql())# 输出示例:# 可以看出sql语句与我们输入的一样, db, user, host, port等参数也是, 还能知道执行的结果和运行时间# {'sql': 'select 42;', 'db': 'mysql', 'user': 'root', 'host': '127.0.0.1', 'port': 3306, 'result': ((42,),), 'speed time': 0.00045609474182128906}
这个例子看来很不错, 但是需要在调用的入口处显式调用该逻辑, 通常一个项目可能有几个入口, 每个入口都显示调用该逻辑会非常麻烦, 而且必须先调用我们的hook逻辑后才能import, 这样就得订好引入规范, 不然就可能出现部分地方hook不成功, 如果能把引入hook这个逻辑安排在解析器启动后马上执行, 就可以完美地解决这个问题了. 查阅了一翻资料后发现,python解释器初始化的时候会自动import pythonpath下存在的sitecustomize和usercustomize模块, 我们只要创建该模块, 并在模块里面写入我们的 替换函数即可。
.├── __init__.py├── hook_aiomysql.py├── sitecustomize.py└── test_auto_hook.py
hook_aiomysql.py是我们制作探针的代码为例子, 而sitecustomize.py存放的代码如下, 非常简单, 就是引入我们的探针代码, 并插入到sys.meta_path:
import sysfrom hook_aiomysql import metapathfindersys.meta_path.insert(0, metapathfinder())
test_auto_hook.py则是测试代码:
import asynciofrom hook_aiomysql import test_mysqlasyncio.run(test_mysql())
接下来只要设置pythonpath并运行我们的代码即可(如果是项目的话一般交由superisor启动,则可以在配置文件中设置好pythonpath):
(.venv) ➜ python_hook git:(master) ✗ export pythonpath=. (.venv) ➜ python_hook git:(master) ✗ python test_auto_hook.py {'sql': 'select 42;', 'db': 'mysql', 'user': 'root', 'host': '127.0.0.1', 'port': 3306, 'result': ((42,),), 'speed time': 0.000213623046875}
4.直接替换方法可以看到上面的方法很好的运行了, 而且可以很方便的嵌入到我们的项目中, 但是依赖与sitecustomize.py文件很难让他抽离成一个第三方的库, 如果要抽离成第三方的库就得考虑看看有没有其他的方法。在上面介绍metapathloader时说到了sys.module, 在里面通过sys.modules来减少重复引入:
class metapathloader: def load_module(self, fullname): # import的模块都会存放在sys.modules里面, 通过判断可以减少重复import if fullname in sys.modules: return sys.modules[fullname] # 防止递归调用 finder = sys.meta_path.pop(0) # 导入 module module = importlib.import_module(fullname) if fullname == 'time': # 替换函数 module.sleep = func_wrapper(module.sleep) sys.meta_path.insert(0, finder) return module
这个减少重复引入的原理是, 每次引入一个模块后, 他就会存放在sys.modules, 如果是重复引入, 就会直接刷新成最新引入的模块。上面之所以会考虑到减少重复import是因为我们不会在程序运行时升级第三方库的依赖。利用到我们可以不考虑重复引入同名不同实现的模块, 以及sys.modules会缓存引入模块的特点, 我们可以把上面的逻辑简化成引入模块->替换当前模块方法为我们修改的hook方法。
import timefrom functools import wrapsfrom typing import any, callable, tuple, castimport aiomysqldef func_wrapper(func: callable): """和上面一样的封装函数, 这里简单略过"""# 判断是否hook过_is_hook: bool = false# 存放原来的_query_query: callable = aiomysql.cursor._query# hook函数def install_hook() -> none: _is_hook = false if _is_hook: return aiomysql.cursor._query = func_wrapper(aiomysql.cursor._query) _is_hook = true# 还原到原来的函数方法def reset_hook() -> none: aiomysql.cursor._query = _query _is_hook = false
代码简单明了,接下来跑一跑刚才的测试:
import asyncioimport aiomysqlfrom demo import install_hook, reset_hookasync def test_mysql() -> none: pool: aiomysql.pool = await aiomysql.create_pool( host='127.0.0.1', port=3306, user='root', password='', db='mysql' ) async with pool.acquire() as conn: async with conn.cursor() as cur: await cur.execute("select 42;") (r,) = await cur.fetchone() assert r == 42 pool.close() await pool.wait_closed()print("install hook")install_hook()asyncio.run(test_mysql())print("reset hook")reset_hook()asyncio.run(test_mysql())print("end")
通过测试输出可以发现我们的逻辑的正确的, install hook后能出现我们提取的元信息, 而reset后则不会打印原信息
install hook{'sql': 'select 42;', 'db': 'mysql', 'user': 'root', 'host': '127.0.0.1', 'port': 3306, 'result': ((42,),), 'speed time': 0.000347137451171875}reset hookend
以上就是python探针怎么完成调用库的数据提取的详细内容。