-
Notifications
You must be signed in to change notification settings - Fork 79
Open
Description
这份代码大部分借助于AI,十分粗略,不过可以用,只写了群聊的SQL查询语句,理论上私聊也可以。
需要解密并去除密码后的数据库
实测在普通笔记本上导出9000多条聊天记录只需要5~10s。
我计划配合Flet写个渲染框架,复现当时的聊天记录。
这是代码:
import base64
import json
import time
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Union
import blackboxprotobuf
import threading
import sqlite3
def query_db(db_path, query):
"""
执行SQLite查询并将结果转换为字典列表,自动尝试用Protobuf解析字节类型字段
:param db_path: SQLite数据库文件路径
:param query: SQL查询语句
:return: 包含结果字典的列表,键为列名
"""
conn = sqlite3.connect(db_path)
try:
cursor = conn.cursor()
cursor.execute(query)
# 获取列名信息
columns = [col[0] for col in cursor.description] if cursor.description else []
results = []
# 遍历所有结果行
for row in cursor.fetchall():
processed_row = {}
for idx, value in enumerate(row):
col_name = columns[idx]
# 处理字节类型字段
if isinstance(value, bytes):
try:
# 尝试Protobuf解码
decoded, _ = blackboxprotobuf.decode_message(value)
processed_row[col_name] = decoded
except Exception as e:
# 解析失败保留原始值
print(e)
processed_row[col_name] = value
else:
processed_row[col_name] = value
results.append(processed_row)
return results
finally:
conn.close()
def write_list_to_json(list, json_file_name):
"""
将list写入到json文件
:param list:
:param json_file_name: 写入的json文件名字
:param json_file_save_path: json文件存储路径
:return:
"""
# os.chdir(json_file_save_path)
with open(json_file_name, "w") as f:
json.dump(list, f)
def recursive_process(
data: Union[dict, list], lock: threading.Lock
) -> Union[dict, list]:
"""
递归地处理字典或列表中的字节字符串。
"""
# 复制结构,避免修改原数据
new_data = data.copy() if isinstance(data, dict) else list(data)
# 处理字典
if isinstance(data, dict):
for key, value in data.items():
if isinstance(value, bytes):
# 替换字节字符串
new_data[key] = process_byte_string(value, lock)
elif isinstance(value, (dict, list)):
# 递归处理子结构
new_data[key] = recursive_process(value, lock)
# 处理列表
elif isinstance(data, list):
for i, value in enumerate(data):
if isinstance(value, bytes):
# 替换字节字符串
new_data[i] = process_byte_string(value, lock)
elif isinstance(value, (dict, list)):
# 递归处理子结构
new_data[i] = recursive_process(value, lock)
return new_data
def process_byte_string(byte_str: bytes, lock: threading.Lock) -> str:
"""
处理字节字符串,按照要求转换。
"""
try:
# 1. 尝试 UTF-8 解码
try:
utf8_result = byte_str.decode("utf-8")
return utf8_result
except UnicodeDecodeError:
# 如果 UTF-8 解码失败,尝试 ProtoBuf 解码
try:
decoded_message, _ = blackboxprotobuf.decode_message(byte_str)
return f"ProtoBuf decoded: {json.dumps(decoded_message)}"
except:
# ProtoBuf 解码失败,转为 Base64 编码
base64_result = base64.b64encode(byte_str).decode("utf-8")
return base64_result
finally:
pass
def process_data_multithread(data: list, num_threads: int = 5) -> list:
"""
使用多线程递归处理数据。
"""
lock = threading.Lock()
results = []
with ThreadPoolExecutor(max_workers=num_threads) as executor:
# 将数据拆分为多个子任务
futures = []
for chunk in data:
future = executor.submit(recursive_process, chunk, lock)
futures.append(future)
# 收集结果
for future in futures:
results.append(future.result())
return results
# 测试函数
# processed_data = process_data_multithread(input_data)
# print(json.dumps(processed_data, indent=2, ensure_ascii=False))
if __name__ == "__main__":
db_path = input("输入解密后的nt_msg相对于运行目录的路径:")
group_list = input("请输入需要导出的群号,多个群号用空格分隔:").split()
start_time_in = input("请输入开始时间(格式:2021-09-01 00:00:00):")
time_arr = time.strptime(start_time_in, "%Y-%m-%d %H:%M:%S")
start_time = time.mktime(time_arr) # 时间戳
end_time_in = input("请输入结束时间(格式:2021-09-01 00:00:00):")
time_arr = time.strptime(end_time_in, "%Y-%m-%d %H:%M:%S")
end_time = time.mktime(time_arr) # 时间戳
# # 测试用例
# db_path = "nt_db/nt_msg.body.db"
# group_list = []
results = query_db(
db_path,
f"""SELECT *
FROM group_msg_table
WHERE "40050" <> 0
AND "40050" >= {int(start_time)}
AND "40050" < {int(end_time)}
AND "40021" IN ('{"','".join(map(str, group_list))}')
ORDER BY "40050" ;""",
)
results1 = process_data_multithread(results)
with open(f"{'+'.join(map(str, group_list))}--S{start_time_in.replace(':', ';')}--E{end_time_in.replace(':', ';')}.json", "w") as f:
json.dump(results1, f, ensure_ascii=False, indent=4)顺便附上提示词:
我有一个sqlite3数据库,没有密码加密,请写一个Python函数,输入数据库路径和一条查询语句(字符串),返回字典编码的查询结果,键为列名。其中如果结果中值类型为byte,则尝试调用blackprotobuf库解析值并替换原来的值(相当于解密值)。
在一个Python程序中有一个列表,其中嵌套了许多字典和列表,它的一部分如下所示:
<字典数据>
请写一个Python函数,以多线程递归遍历检查字典的每一个值(检查到底,不能漏下),若检测为Byte字符串,则按照如下步骤转换并替换原值:
- UTF-8解码,如错误转2
- ProtoBuf解码,使用blackboxprotobuf库,如错误转3
- 将原Byte字符串Base64编码
(至少遍历检查3次)
shenapex and yumubi
Metadata
Metadata
Assignees
Labels
No labels