木鼠子 ⅱ Python Ver. ~ 木卫二 ~

         _,met$$$$$gg.           pi@raspberrypi
      ,g$$$$$$$$$$$$$$$P.        OS: Debian 12 bookworm
    ,g$$P""       """Y$$.".      Kernel: aarch64 Linux 6.1.0-rpi4-rpi-v8
   ,$$P'              `$$$.      Uptime: 20h 55m
  ',$$P       ,ggs.     `$$b:    Packages: 615
  `d$$'     ,$P"'   .    $$$     Shell: bash 5.2.15
   $$P      d$'     ,    $$P     Disk: 3.0G / 30G (11%)
   $$:      $$.   -    ,d$$'     CPU: BCM2835 @ 4x 1.8GHz
   $$\;      Y$b._   _,d$P'      RAM: 272MiB / 7811MiB
   Y$$.    `.`"Y$$$$P"'
   `$$b      "-.__
    `Y$$
     `Y$$.
       `$$b.
         `Y$$b.
            `"Y$b._
                `""""

这里是今天也在树莓派4B上被修出新bug的骰娘木鼠子きねずみこ

木鼠子是基于pykinezumiko制作的骰娘,未公开。不过,要是找到了木鼠子的话,就跟木鼠子加个好友吧。

技术信息

pykinezumiko是OneBot SDK。它和NoneBot等框架有着类似的作用,只是设计毫无保留地偏向开发的快乐。

要部署的话,首先要获取go-cqhttp(唯一经过测试的OneBot实现)的二进制发行版,放在任意位置运行,在生成配置文件时选择使用HTTP通信。在配置中请确认下列信息:

额外需要的材料列表以诗歌形式提供,因此不必手动安装,可交由pip处理。因为使用了发行版往往不提供的库,不推荐全局安装,请使用喜爱的方式创建虚拟环境后执行pip install pykinezumiko

使用开发分支
pip install --upgrade --editable git+https://github.com/satgo1546/pykinezumiko.git#egg=pykinezumiko

在requirements.txt中的写法:

-e git+https://github.com/satgo1546/pykinezumiko.git#egg=pykinezumiko

pykinezumiko没有所谓的配置文件,所有配置都基于入口程序充满副作用的导入。某种意义上,pykinezumiko是个库;副作用却无情地将事实扭曲成别样。

#!/usr/bin/env python3
"""消息处理端的入口。

请将本脚本保存为main.py。
"""

# conf模块中的配置项可在导入后修改。
import pykinezumiko.conf
# 少量紧急情况也需要使用的信息必须正确配置。
pykinezumiko.conf.BACKSTAGE = -979976910

# 导入即注册。通过导入顺序决定插件加载顺序。
# 下面加载了部分示例插件。
# 除了阅读Plugin类的文档,不要忘了可以跟随典例从实践中学习插件的制作方法。
import pykinezumiko.plugins.demo
import pykinezumiko.plugins.jrrp

# 在此加载自定义插件、当场创建插件,或者像下述这样,从指定文件夹下加载所有模块。
# 因为会按文件名顺序加载,所以可在文件名开头标注数字以指定插件加载顺序。
import os, importlib
for name in sorted(
    entry.name.removesuffix(".py")
    for entry in os.scandir("plugins")
    if entry.name.endswith(".py")
    and entry.name.count(".") == 1
    or entry.is_dir()
    and "." not in entry.name
):
    importlib.import_module("plugins." + name, ".")

# 导入app时就会启动服务循环。
import pykinezumiko.app

运行方法是在两个窗口分别启动go-cqhttp(./go-cqhttp)和消息处理端(python main.py)。

因为重启go-cqhttp需要重新发送登录信息,甚至重新扫码(session.token失效的场合),所以尽量少重启go-cqhttp。

消息处理端服务器启动时不断尝试监听目标端口,因此可以同时打开两个消息处理程序,甲成功监听的话,乙就会因端口被占用而被挡在门外。甲挂掉后,乙自然接管。这便是自我更新的原理:执行git pull之后,启动新的消息处理程序,再退出自己。

木鼠子——在生产环境使用调试模式实现重要特性的先驱者

以下功能在梦里有

因为在x86-64电脑上没法运行ARM64的程序,即使替换成x86-64版本的程序来尝试运行也势必会把树莓派上正在工作的踢下线,所以有个办法不需要真的登录QQ也能运行起一部分代码。使用类似Matcha的方法创建一个假的OneBot实现,就可以调试任何OneBot应用了。可惜Matcha不支持HTTP连接,这里没法直接使用。

clock todos

鼠:一直很想要但没做的功能

原理

pykinezumiko/__init__.py
import inspect
import os
import re
import time
from bisect import bisect_left
from collections import OrderedDict
from collections.abc import Generator
from typing import Any, Callable, ClassVar, Never, TypeVar, overload

import requests

# 即使没有在本文件中用到,也要保留这些子模块的重新导出,以便在import pykinezumiko时就导入子模块。
from . import humanity, conf, docstore

CallableT = TypeVar("CallableT", bound=Callable)


class Plugin:
    """所有插件的基类。

    继承此类且没有子类的类将自动被视为插件而实例化。
    通过覆盖以“on_”开头的方法,可以监听事件。
    因为这些方法都是空的,不必在覆盖的方法中调用super。
    可以在事件处理中调用基类中的方法来作出行动。

    事件包含整数型的context和sender参数。
    正数表示好友,负数表示群。
    context表示消息来自哪个会话。要回复的话请发送到这里。
    sender表示消息的发送者。除了少数无来源的群通知事件,都会是正值。
    例如,私聊的场合,有context == sender > 0;群聊消息则满足context < 0 < sender。

    通常,当插件处理了事件(例如回复了消息),就要返回真值。
    为方便计,可以直接返回要回复的文字,与执行send函数无异。
    返回None的场合,表示插件无法处理这个事件。该事件会轮替给下一个插件来处理。
    """

    _name_cache: ClassVar[dict[int | tuple[int, int], str]] = {}
    """在name方法内部使用的名称缓存。若想在对话中包含某人的名称,请使用name方法。

    从context到好友名或群聊名的映射,以及从(context, sender)到群名片的映射。
    """

    def __init__(self) -> None:
        self.flows: OrderedDict[
            tuple[int, int], tuple[float, Generator[object, str | None, object]]
        ] = OrderedDict()
        """尚在进行的对话流程。

        从(context, sender)到(最后活动时间戳, 程序执行状态)的映射,按最后活动时间从早到晚排序。
        """

    KNOWN_ENTITIES = {
        "face": ["id"],
        "image": ["url", "type", "subType"],
        "record": ["url", "magic"],
        "at": ["qq"],
        "share": ["url", "title", "content", "image"],
        "reply": ["id", "seq"],
        "poke": ["qq"],
        "forward": ["id"],
        "xml": ["resid", "data"],
        "json": ["resid", "data"],
    }
    """unescape方法会将部分已知的控制序列命名参数按此处的顺序排列,以便依靠正则表达式匹配接收到的文本。"""

    @classmethod
    def unescape(cls, raw_message: str) -> str:
        r"""转换传入的CQ码到更不容易遇到转义问题的木鼠子码。

        CQ码表示为"[CQ:face,id=178]"的消息会被转换为"\x9dface\0id=178\x9c"。
        基本上,"["对应"\x9d","]"对应"\x9c",","对应"\0"。
        通过使用莫名其妙的控制字符,使控制序列与常规文本冲突的可能性降到极低。
        当输入确实包含"\x9d"和"\x9c"时就完蛋了,到那时再自求多福吧。

        【已否决的设计】
        [CQ:控制序列,参数名=参数值,参数名=参数值]
            由酷Q设计,在QQ机器人界,这种格式十分流行。
            但是,因为占用了方括号和逗号字符,必须小心处理转义问题。
            转义序列以"&"开头。
            保守的转义处理方式导致任何包含逗号和"&"的消息都被大量改动。
            要处理英语文本、网址、JSON字面量时,就不得不面对解析。

        \e<控制序列 参数值 参数值>
            这是TenshitMirai的格式。
            因为使用了U+001B这一控制字符而几乎不会遇到普通文本消息被转义问题。
            因为"<"和">"被HTML占用,被列为URL中禁止使用的字符,因此参数是网址也没有问题。
            mirai提供面向对象的消息接口,因此不得不花费很多代码来序列化到纯文本和从纯文本反序列化。
            mirai也提供类似CQ码的mirai码,也有和CQ码一样的问题。
            Ruby和GCC支持"\e"作为"\033"的别名,但包括Python在内的许多其他地方并不支持。
            直接输出到终端的时候可能会把终端状态搞乱。

        \e]114514;控制序列;参数值;参数值\e\\
            xterm提出了"\e]"开始、"\e\\"或"\a"终止的终端控制序列。
            参数通常用分号隔开,如果参数值中有分号就完了。
            使用一个随便的数字,就能保证在输出到终端的时候隐藏控制序列。

        ❰控制序列❚参数值❚参数值❱
            为什么不试试神奇的Unicode呢?
        """

        def replacer(match: re.Match[str]) -> str:
            name, _, args = match.group(1).partition(",")
            args = args.split(",") if args else []
            args = dict(x.partition("=")[::2] for x in args)
            # 试图用itertools.chain改写下列对extend的调用会导致……
            # args.items在之前args.pop被调用,引发“迭代时字典大小变化”的运行时异常。
            ret = [name]
            ret.extend(
                f"{k}={args.pop(k, '')}" for k in cls.KNOWN_ENTITIES.get(name, ())
            )
            ret.extend(f"{k}={v}" for k, v in args.items())
            return "\x9d" + "\0".join(ret) + "\x9c"

        return (
            re.sub(r"\[CQ:(.*?)\]", replacer, raw_message, re.DOTALL)
            .replace("&#91;", "[")
            .replace("&#93;", "]")
            .replace("&#44;", ",")
            .replace("&amp;", "&")
        )

    @staticmethod
    def escape(text: str) -> str:
        """转换unescape函数所用的木鼠子码到CQ码。"""

        def replacer(match: re.Match[str]) -> str:
            return match.group().replace(",", "&#44;")

        return re.sub(r"\x9d[^\x9d\x9c]*\x9c", replacer, text).translate(
            {
                ord("&"): "&amp;",
                91: "&#91;",
                93: "&#93;",
                0x9D: "[CQ:",
                0x9C: "]",
                0: ",",
            }
        )

    @staticmethod
    def gocqhttp(endpoint: str, data: dict = {}, **kwargs) -> dict:
        """向go-cqhttp发送请求,并返回响应数据。

        关于具体参数,必须参考go-cqhttp的API文档。
        https://docs.go-cqhttp.org/api/

        使用例:

        - 发送私聊消息

            gocqhttp("send_private_msg", user_id=114514, message="你好")

        - 获取当前登录账号的昵称

            gocqhttp("get_login_info")["nickname"]
        """
        kwargs.update(data)
        data = requests.post(
            f"http://127.0.0.1:5700/{endpoint}",
            headers={"Content-Type": "application/json"},
            json=kwargs,
        ).json()
        if data["status"] == "failed":
            raise Exception(data["msg"], data["wording"])
        return data["data"] if "data" in data else {}

    @classmethod
    def send(cls, context: int, message: str) -> None:
        """发送消息。

        :param context: 发送目标,正数表示好友,负数表示群。
        :param message: 要发送的消息内容,富文本用木鼠子码表示。
        """
        cls.gocqhttp(
            "send_msg",
            {"user_id" if context >= 0 else "group_id": abs(context)},
            message=cls.escape(message),
        )

    @classmethod
    def send_file(cls, context: int, filename: str, name: str | None = None) -> None:
        """发送文件。

        :param context: 发送目标。
        :param filename: 本机文件路径。
        :param name: 发送时显示的文件名。默认为路径中指定的文件名。
        """
        name = name or os.path.basename(filename)
        filename = os.path.realpath(filename)
        if context >= 0:
            cls.gocqhttp(
                "upload_private_file", user_id=context, file=filename, name=name
            )
        else:
            cls.gocqhttp(
                "upload_group_file", group_id=-context, file=filename, name=name
            )

    def on_event(self, context: int, sender: int, data: dict[str, Any]) -> bool:
        """接收事件并调用对应的事件处理方法。

        :param data: 来自go-cqhttp的上报数据。
        :returns: True表示事件已受理,不应再交给其他插件;False表示应继续由其他插件处理本事件。
        """
        result: object = None
        # https://docs.go-cqhttp.org/event/
        if data["post_type"] == "message":
            # 这个类型的上报只有好友消息和群聊消息两种。
            message = self.unescape(data["raw_message"])
            # 强制终止超过一天仍未结束的对话流程。
            while (
                self.flows and next(iter(self.flows.values()))[0] < time.time() - 86400
            ):
                self.flows.popitem(last=False)
            # 如果当前上下文中的发送者没有仍在进行的对话流程,有可能因本条消息启动新的对话流程。
            if (context, sender) not in self.flows:
                result = self.dispatch_command(
                    context, sender, message, data["message_id"]
                )
                # 是否启动了新的对话流程?
                if isinstance(result, Generator):
                    self.flows[context, sender] = time.time(), result
                    message = None  # 向Generator首次send的值必须为None
            # 当前上下文中的发送者有无仍在进行(或上面刚启动)的对话流程?
            if (context, sender) in self.flows:
                try:
                    generator = self.flows[context, sender][1]
                    result = generator.send(message)
                    if result:
                        self.flows[context, sender] = time.time(), generator
                        self.flows.move_to_end((context, sender))
                except StopIteration as e:
                    result = e.value
                    del self.flows[context, sender]
        elif data["post_type"] == "request":
            # 这个类型的上报只有申请添加好友和申请加入群聊两种。
            result = self.on_admission(context, sender, data["comment"])
            if result is not None:
                if data["request_type"] == "friend":
                    self.gocqhttp(
                        "set_friend_add_request", flag=data["flag"], approve=result
                    )
                elif data["request_type"] == "group":
                    self.gocqhttp(
                        "set_group_add_request",
                        flag=data["flag"],
                        type=data["sub_type"],
                        approve=result,
                    )
                result = True
        elif data["post_type"] == "meta_event":
            # 这个类型的上报包含心跳等杂项事件。仅OneBot文档中有说明,go-cqhttp的文档中没有说明。
            result = self.on_interval()
        # 其余所有事件都是通知上报。
        elif data["notice_type"] in ("friend_recall", "group_recall"):
            message = Plugin.gocqhttp("get_msg", message_id=data["message_id"])
            message = str(message["raw_message"]) if "raw_message" in message else ""
            result = self.on_message_deleted(
                context, sender, message, data["message_id"]
            )
        elif data["notice_type"] == "offline_file":
            result = self.on_file(
                context,
                sender,
                data["file"]["name"],
                data["file"]["size"],
                data["file"]["url"],
            )
        elif data["notice_type"] == "group_upload":
            url = Plugin.gocqhttp(
                "get_group_file_url",
                group_id=-context,
                file_id=data["file"]["id"],
                busid=data["file"]["busid"],
            )["url"]
            result = self.on_file(
                context, sender, data["file"]["name"], data["file"]["size"], url
            )
        # 结果是非空值的时候,无论是什么类型都要回复出来,除非结果只是True而已。
        # 编写插件时,因为意外返回了数值或空字符串等,结果完全不知道为什么什么也没有回复的情况太常发生,于是如此判断。
        if context and result is not None and result is not True:
            self.send(context, format(result))
        return result is not None

    @overload
    def name(self, context: int, sender: int) -> str:
        """获取各种用户的名称的方法。如果context是群聊,则尝试获取群名片。

        有Python侧一级缓存和go-cqhttp侧二级缓存,因此可以安心频繁调用本方法。
        """

    @overload
    def name(self, context: tuple[int, int]) -> str:
        ...

    @overload
    def name(self, context: int) -> str:
        """获取好友名(正参数)或群聊名(负参数)。

        有Python侧一级缓存和go-cqhttp侧二级缓存,因此可以安心频繁调用本方法。
        """

    def name(self, context, sender=None) -> str:
        if sender is not None:
            return self.name((context, sender))
        if context in self._name_cache:
            return self._name_cache[context]
        if isinstance(context, int):
            if context >= 0:
                for response in self.gocqhttp("get_friend_list"):
                    self._name_cache[response["user_id"]] = response["nickname"]
                name = self._name_cache.get(context, "")
            else:
                response = self.gocqhttp("get_group_info", group_id=-context)
                name = response["group_name"]
        else:
            if context[0] >= 0:
                name = self.name(context[1])
            else:
                response = self.gocqhttp(
                    "get_group_member_info", group_id=-context[0], user_id=context[1]
                )
                name = response.get("card") or response["nickname"]
        self._name_cache[context] = name
        return name

    def dispatch_command(
        self, context: int, sender: int, text: str, message_id: int
    ) -> object:
        """找到并调用某个on_command_×××,抑或是on_message。

        方法名的匹配是模糊的,但要求方法名必须为规范形式。
        具体参照humanity.normalize函数,最重要的是必须是小写。
        由于__getattr__等魔法方法的存在,不可能列出对象支持的方法列表,故当方法名不规范时,无法给出任何警告。

        如果同时定义了on_message、on_command_foo、on_command_foo_bar,最具体的函数会被调用。

        - ".foo bar" → on_command_foo_bar
        - ".foo baz" → on_command_foo
        - ".bar" → on_message

        on_command_×××方法的参数必须支持按参数名传入(关键字参数),且正确标注类型。
        有名为context、sender、text、message_id的参数时,对应的值会被传入。
        """
        # 到底为什么会收到有\r\n的消息啊?
        text.replace("\r\n", "\n")
        parts = humanity.tokenize_command_name(text)
        while parts:
            name = "".join(parts)
            f = getattr(self, "on_command_" + name, None)
            if callable(f):
                try:
                    kwargs = humanity.parse_command(
                        {
                            parameter.name: (
                                parameter.annotation,
                                parameter.default is not inspect.Parameter.empty,
                            )
                            for parameter in inspect.signature(f).parameters.values()
                            if parameter.annotation is not inspect.Parameter.empty
                        },
                        {
                            "context": context,
                            "sender": sender,
                            "text": text,
                            "message_id": message_id,
                        },
                        # 在原始字符串中找到命令名之后的部分。
                        # 证明一下这个二分法数据的单调性?
                        # 平时做算法题怎么都想不到二分答案——而且这除了用来做算法题以外有什么用啊!
                        # 结果真的在实际开发中用到了这种思路,这合理吗?
                        text[
                            bisect_left(
                                range(min(111, len(text))),
                                name,
                                lo=1,
                                key=lambda i: humanity.normalize(text[1:i]),
                            ) :
                        ].strip(),
                    )
                except humanity.CommandSyntaxError as e:
                    return e.args[0] if e.args else inspect.getdoc(f)
                return f(**kwargs)
            # 从长到短,一段一段截下,再尝试取用属性。
            parts.pop()
        return self.on_message(context, sender, text, message_id)

    def on_message(self, context: int, sender: int, text: str, message_id: int):
        """当收到消息时执行此函数。

        如果不知道参数和返回值的含义的话,请看Plugin类的说明。

        因为on_message事件太常用了,扩展了以下方便用法。

        【关于命令自动解析】
        只要定义函数on_command_foo(self, …),就能处理.foo这样的指令。
        这样一来,只需要处理有格式命令的话,甚至不必编写on_message事件处理器就能做到。
        方法的命名、参数、优先关系等细节请参照dispatch_command方法的文档。
        参数解析的细节请参照humanity.parse_command函数的文档。
        可以在on_command_×××中使用yield(参照下述对话流程功能)。

        【关于对话流程】
        可以像阻塞式控制台程序一样编写事件处理程序,在需要向用户提问的交互式场合非常方便。

            print("你输入的是" + input("请输入文字"))
            → return "你输入的是" + (yield "请输入文字")

        这种写法使用了无法持久化保存的Python生成器,也就是说,进程重启之后程序的执行状态就会消失。
        此外,超过一天没有下文的对话流程会被直接删除。
        """

    def on_message_deleted(self, context: int, sender: int, text: str, message_id: int):
        """消息被撤回。"""

    def on_file(self, context: int, sender: int, filename: str, size: int, url: str):
        """接收到离线文件或群有新文件。"""

    def on_admission(self, context: int, sender: int, text: str) -> bool | None:
        """收到了添加好友的请求或加入群聊的请求。

        返回True接受,False拒绝,None无视并留给下一个插件处理。
        """

    def on_interval(self) -> None:
        """每隔不到一分钟,此函数就会被调用。用于实现定时功能。

        因为空泛地不针对任何人,即使想通过返回值快速回复也不知道会回复到何处。必须通过send方法来发出消息。
        因为插件不应该剥夺其他插件定时处理的能力,所以也不允许返回真值。
        这样一来,这个函数只能返回None了。
        """


class NameCacheUpdater(Plugin):
    """与Plugin基类联合工作的必备插件。"""

    def on_event(self, context: int, sender: int, data: dict[str, Any]) -> bool:
        # 如果有详细的发送者信息,更新名称缓存。
        if "sender" in data:
            nickname = data["sender"].get("nickname", "")
            self._name_cache[sender] = self._name_cache[sender, sender] = nickname
            self._name_cache[context, sender] = data["sender"].get("card") or nickname
        return False


class Logger(Plugin):
    """调试用,在控制台中输出消息的内部表示。"""

    def on_message(self, context: int, sender: int, text: str, message_id: int):
        print(context, sender, repr(text), message_id)


class HelpProvider(Plugin):
    """提供.help命令的插件。@Plugin.documented默认将帮助信息置于此处。"""

    def on_command_help(self, _: Never):
        pass


def documented(
    under: Callable | None = HelpProvider.on_command_help,
) -> Callable[[CallableT], CallableT]:
    """使用此装饰器添加单条命令帮助的第一行到帮助索引命令中。

    要添加到总目录.help中:

        @pykinezumiko.documented()
        def on_command_foo(self):
            ...

    要作为子命令添加到某一其他命令的帮助中:

        @pykinezumiko.documented(on_command_foo)
        def on_command_foo_bar(self):
            ...
    """

    def decorator(f: CallableT) -> CallableT:
        under.__doc__ = (
            (inspect.getdoc(under) or "")
            + "\n‣ "
            + (
                f.__doc__
                or humanity.command_prefix[0] + f.__name__.removeprefix("on_command_")
            )
            .partition("\n")[0]
            .strip()
        )
        return f

    return decorator
pykinezumiko/app.py
"""主程序。

仅仅是导入这个模块就会启动服务器,所以务必在导入所有需要的插件后再导入这个模块。
"""

import os
import sys
import time
import errno
import traceback
from collections import defaultdict
import http.server
import werkzeug.serving
from flask import Flask, request

from . import Plugin, conf, docstore


# 虽然只是加载而没有将模块留下,但是其中的类皆已成功定义。
# 靠深度优先搜索找出所有继承了Plugin但没有子类的类,它们是要实例化的插件类。
def leaf_subclasses(cls: type) -> list[type]:
    """找出指定类的所有叶子类。使用类上的__subclasses__函数。"""
    return [s for c in cls.__subclasses__() for s in leaf_subclasses(c)] or [cls]


# 为定义了记录类的模块分配文档数据库。
os.makedirs("excel", exist_ok=True)
databases = defaultdict(list)
for t in leaf_subclasses(docstore.Record):
    databases[t.__module__].append(t)
databases = [
    docstore.Database(f"excel/{name}.xlsx", tuple(tables)) for name, tables in databases.items()
]

# 实例化找到的插件类。
plugins: list[Plugin] = []
for p in leaf_subclasses(Plugin):
    print("加载插件类", p.__name__)
    plugins.append(p())

# 上述过程中易碎的细节:
# • 插件模块相互独立,从而按导入顺序加载。
# • Python 3.4起,__subclasses__按字典键的顺序返回子类列表。
# • Python 3.6起,字典按加入顺序迭代键。
# • Python 3.9起,文档明确指出__subclasses__按子类定义先后顺序返回子类列表。
# • leaf_subclasses函数返回列表从而保持顺序。

# 创建Flask应用程序实例。
app = Flask(__name__)


@app.route("/", methods=["GET", "POST"])
def root():
    # GET请求来自浏览器。
    if request.method == "GET":
        return "消息处理端已启动。"

    # POST请求来自go-cqhttp。
    # 为了原路反馈异常信息,在局部变量中记录消息上下文。
    context = 0

    try:
        data = request.json
        # 因为request.json的类型是Optional[Any],所以不得不先打回None的可能性。
        # 其实是data为None不可能的!在此之前就已经抛出异常挂掉了。
        # 为了类型检查通过不得已而检查一下罢了。
        assert data is not None
        # 从go-cqhttp的事件数据中提取context和sender。
        sender = int(data.get("user_id", 0))
        context = -int(data["group_id"]) if "group_id" in data else sender
        # 易碎的细节:all和any短路求值。
        any(p.on_event(context, sender, data) for p in plugins)
        # 在处理完任意事件后自动保存所有已修改的数据库。
        for database in databases:
            if database.dirty:
                print("写入数据库", database)
                database.save()
    except Exception as e:
        # 打印错误堆栈到控制台。
        # 通常的Flask应用中,只需再行抛出。但是,因为使用了自定义的服务器类,这么做会导致进程终止。
        traceback.print_exc()
        if context:
            Plugin.send(context, f"\u267b\ufe0f {e!r}")
        else:
            Plugin.send(conf.BACKSTAGE, f"\u267b\ufe0f 处理无来源事件时发生了下列异常:{e!r}")
    return ""


if len(sys.argv) > 1:
    print("启动参数", sys.argv[1])
    Plugin.send(conf.BACKSTAGE, f"\U0001f4e6 {sys.argv[1] = }")


class PerseveringWSGIServer(http.server.ThreadingHTTPServer):
    """持续不断地尝试监听端口的多线程服务器。

    werkzeug.serving.make_server创建的服务器只是为了打印自定义错误信息
    “Either identify and stop that program, or start the server with a different …”
    就把OSError据为己有,所以不得不自己定义一个服务器类来使用。
    """

    multithread = True
    multiprocess = False

    def __init__(self, host: str, port: int, app) -> None:
        handler = werkzeug.serving.WSGIRequestHandler
        handler.protocol_version = "HTTP/1.1"

        self.host = host
        self.port = port
        self.app = app
        self.address_family = werkzeug.serving.select_address_family(host, port)
        self.ssl_context = None

        super().__init__(
            werkzeug.serving.get_sockaddr(host, port, self.address_family),  # type: ignore[arg-type]
            handler,
            bind_and_activate=False,
        )
        while True:
            try:
                self.server_bind()
                self.server_activate()
                break
            except OSError as e:
                if e.errno == errno.EADDRINUSE:
                    print("端口被占用,将重试")
                    time.sleep(1)
                else:
                    raise


PerseveringWSGIServer("127.0.0.1", 5701, app).serve_forever()
pykinezumiko/conf.py
"""存储到处都要使用的全局配置。"""
import pygments.style
import pygments.token

BACKSTAGE = -114514
"""管理用群。调试信息将发送到此处;管理用插件也只接受来自其中的管理命令。"""

THEME = ("#000000", "#b53d00", "#ffcc80", "#fff3e0")
"""文字色、深色前景、深色背景、浅色背景。"""
ACCENTS = ("#b71c1c", "#827717", "#33691e", "#009095", "#0d47a1", "#4a148c")
"""用于图表等的红黄绿青蓝紫。"""


class PygmentsStyle(pygments.style.Style):
    """代码高亮的样式。"""
    styles = {
        # 【编程语言】
        # 红:关键字。
        pygments.token.Keyword: ACCENTS[0],
        pygments.token.Name.Builtin.Pseudo: ACCENTS[0],
        pygments.token.Name.Function.Magic: ACCENTS[0],
        pygments.token.Name.Variable.Magic: ACCENTS[0],
        pygments.token.Operator.Word: ACCENTS[0],
        # 黄:字符串。
        pygments.token.String: ACCENTS[1],
        # 绿:注释。
        pygments.token.Comment: ACCENTS[2],
        pygments.token.String.Doc: ACCENTS[2],
        # 青:符号。
        pygments.token.Operator: ACCENTS[3],
        pygments.token.Punctuation: ACCENTS[3],
        pygments.token.String.Interpol: ACCENTS[3],
        # 蓝:数值。
        pygments.token.Literal: ACCENTS[4],
        pygments.token.String.Symbol: ACCENTS[4],
        # 紫:魔法。
        pygments.token.Comment.Preproc: ACCENTS[5],
        pygments.token.Comment.PreprocFile: ACCENTS[5],
        pygments.token.Name.Entity: ACCENTS[5],
        pygments.token.Name.Decorator: ACCENTS[5],
        # 主题色:系统。
        pygments.token.Generic.Prompt: THEME[1],
        pygments.token.Generic.Punctuation.Marker: THEME[1],
        # 【非程序】
        pygments.token.Generic.Inserted: ACCENTS[2],
        pygments.token.Generic.Deleted: ACCENTS[0],
        pygments.token.Generic.Subheading: "bold",
        pygments.token.Generic.Emph: "italic",
        pygments.token.Generic.EmphStrong: "bold",
    }
pykinezumiko/docstore.py
"""一种很新的文档数据库对象-关系映射(ORM)。

所谓文档数据库,就是把数据存在Office文档里!
"""

import time
from collections.abc import KeysView, ValuesView, ItemsView
from itertools import count, takewhile
import typing
from typing import Any, Generator, Iterable, Protocol, TypeVar, dataclass_transform
from . import xlsx

T = TypeVar("T")
T_contra = TypeVar("T_contra", contravariant=True)
TableT = TypeVar("TableT", bound="Table")


class Comparable(Protocol[T_contra]):
    def __lt__(self, __other: T_contra) -> bool:
        ...


ComparableT = TypeVar("ComparableT", bound=Comparable)
RecordT = TypeVar("RecordT", bound="Record")


@dataclass_transform(kw_only_default=True)
class Table(type):
    """记录的元类。

    TODO:最好画张类结构示意图!

    数据直接在内存中以索引到记录对象的**有序**映射存放。
    键保持有序是通过每次插入时全部重排实现的屑。

    Table元类因记录类实际保存数据、提供表中记录增删操作而得名。

    【问题】
    Table继承type,作为Record的元类,这样就能实现在记录类自身上使用标准下标语法操作表中记录。
    但是,正确标注类型极其困难。
    类型检查器偏好type而非其他元类,因此像下面这样使Table继承type的泛型也无济于事,T不知何所指。

        class Table(type[T]):
            def __getitem__(cls, key: str) -> T:
                ...

    https://discuss.python.org/t/metaclasses-and-typing/6983
    https://github.com/python/typing/issues/715

    现在只能做到外部使用基本没有问题。
    如你所见,类的内部一派混乱,强制无视类型错误的指令漫天飞舞。

    【已否决的设计】
    sortedcontainers提供的SortedDict性能更好。
    但反正现在每次操作完都会重新写入文件,内存中的操作摆烂也无所谓了。
    pandas的本质是平行数组,不适合单条记录的增删,所以不使用。

    在Record中添加类变量table: ClassVar[Table[KT, VT]],其中Table是dict的子类。
    这样无法确定KT和VT。

    多重继承真的很糟糕。
    同时继承type和dict的话,会报基类间实例内存布局冲突错。
    同时继承type和UserDict或type和MutableMapping的话,isinstance将无法正常工作。
    但是,直接检查__class__仍然可行。
    因为类型标注比当前贫弱的解决方案更不充分,所以保持了手动实现各种dict方法的现状。

    既能使键类型成为泛型,又能正确获得元类产生的类的办法并不是没有。

        class Table(type[VT], UserDict[KT, VT], Generic[KT, VT]):
            def __getitem__(cls, key: KT) -> VT:
                ...
        class transformer(Generic[KT]):
            def __call__(self, cls: type[VT]) -> Table[KT, VT]:
                return Table(cls.__name__, cls.__bases__, cls.__dict__.copy())
        def record_type(key_type: type[KT]) -> transformer[KT]:
            return transformer()

        @record_type(key_type=int)
        class User:
            name: str

    但是这样会使@dataclass_transform()完全失效,无论加在哪里都无用。
    """

    dirty = False
    """插入记录、删除记录时自动置位。向记录对象写入属性时,也会写入此标志。"""

    def sort(cls) -> None:
        cls.table = dict(sorted(cls.table.items()))

    def __getitem__(cls: type[T], key: Comparable) -> T:
        return cls.table[key]  # type: ignore

    def __setitem__(cls: type[T], key: Comparable, value: T) -> None:
        if not cls.table or key in cls.table or next(reversed(cls.table)) < key:  # type: ignore
            cls.table[key] = value  # type: ignore
        else:
            cls.table[key] = value  # type: ignore
            cls.sort()  # type: ignore
        cls.dirty = True  # type: ignore

    def __delitem__(cls, key: Comparable) -> None:
        del cls.table[key]
        cls.dirty = True

    def keys(cls) -> KeysView[Comparable]:
        return cls.table.keys()  # type: ignore

    def values(cls: type[T]) -> ValuesView[T]:
        return cls.table.values()  # type: ignore

    def items(cls: type[T]) -> ItemsView[Comparable, T]:
        return cls.table.items()  # type: ignore

    def clear(cls) -> None:
        cls.table.clear()
        cls.dirty = True

    def update(cls: type[T], data: Iterable[tuple[Comparable, T]]) -> None:
        cls.table.update(data)  # type: ignore
        cls.sort()  # type: ignore
        cls.dirty = True  # type: ignore

    def __len__(self) -> int:
        return len(self.table)

    # TODO: def __iter__, pop, popitem, clear, update, setdefault, __contains__, get


class Record(metaclass=Table):
    def __init__(self, **kwargs) -> None:
        # 实际上这些属性赋值都会经过self.__setattr__。
        # 因为在__slots__中有特别判断,所以没有额外副作用。
        self.created_at = time.time()
        """创建记录对象的时间戳。"""
        self.updated_at = self.created_at
        """上一次设置属性的时间戳。"""
        self.__dict__.update(kwargs)

    def __setattr__(self, name: str, value: Any) -> None:
        super().__setattr__(name, value)
        # 修改特殊属性不应导致其他特殊属性的变化。
        if name not in ("created_at", "updated_at"):
            self.updated_at = time.time()
        self.__class__.dirty = True


class Database:
    """Excel数据库。

    “他妈的,怎么是Excel?!”pickle、JSON、CSV、SQLite……样样总比Excel好——我一开始也是这样想的。
    在快速开发迭代的过程中,数据库表头(schema)变动是常有的事。
    如已存了一些数据,在正式的数据库中修改表头必须编写数据迁移脚本,非常不便。
    号称schemaless的文档数据库只不过是将所有字段设为可空,反而加重了数据读取端校验的负担。
    专用的二进制存储格式只有借助特制的工具才能打开,人工浏览困难重重。
    电子表格在这样的场合便利至极:查阅、筛选、统计都很轻松,迁移数据更是只需填充公式。
    Office文档虽然的的确确是堆烂格式,但是人人都在用,受到良好的支持。
    """

    def __init__(self, filename: str, tables: tuple[TableT, ...]) -> None:
        self.filename = filename
        self.tables = tables
        self.reload()

    def reload(self):
        try:
            workbook_data = xlsx.read(self.filename)
        except FileNotFoundError:
            workbook_data = {}
        for table in self.tables:
            worksheet_data = workbook_data.get(table.__name__)
            field_types = typing.get_type_hints(table)
            # 注意Table没有__init__,table属性是在这里初始化的。
            table.table = {}
            if worksheet_data:
                fields = list(
                    map(
                        str,
                        takewhile(bool, (worksheet_data[0, j + 1] for j in count())),
                    )
                )
                for i in count():
                    if worksheet_data[i + 1, 0] == "":
                        break
                    row = table()
                    for j, field in enumerate(fields):
                        # 迫使标有类型的字段值转换到标注的类型。仅适用于有单参数构造函数的数据类型。
                        cell = worksheet_data[i + 1, j + 1]
                        if not isinstance(cell, field_types.get(field, object)):
                            cell = field_types[field](cell)
                        setattr(row, field, cell)
                    table.table[worksheet_data[i + 1, 0]] = row
            table.dirty = False

    @property
    def dirty(self) -> bool:
        return any(table.dirty for table in self.tables)

    def worksheet_data(
        self, table: Table
    ) -> Generator[tuple[tuple[int, int], xlsx.CellValue], None, None]:
        fields = typing.get_type_hints(table)
        yield (0, 0), ""
        for j, field in enumerate(fields):
            yield (0, j + 1), field
        for i, (key, row) in enumerate(table.table.items()):
            yield (i + 1, 0), key
            for j, field in enumerate(fields):
                yield (i + 1, j + 1), getattr(row, field)

    def save(self) -> None:
        xlsx.write(
            self.filename,
            {table.__name__: self.worksheet_data(table) for table in self.tables},
        )
        for table in self.tables:
            table.dirty = False
pykinezumiko/humanity.py
import re
import typing
import unicodedata
from itertools import filterfalse, groupby
from typing import Any, NoReturn, Never, SupportsInt, Union
from types import UnionType


def format_timespan(seconds: SupportsInt) -> str:
    r = []
    seconds = int(seconds)
    if seconds >= 86400:
        r.append(str(seconds // 86400))
        r.append("天")
    seconds %= 86400
    if seconds >= 3600:
        r.append(str(seconds // 3600))
        r.append("小时")
    seconds %= 3600
    if seconds >= 60:
        r.append(str(seconds // 60))
        r.append("分")
    seconds %= 60
    r.append(str(seconds))
    r.append("秒")
    return " ".join(r)


def parse_number(s: str, default=0) -> float:
    """将可能含有汉字的字符串转换成对应的数值。"""
    s = s.strip()
    if not s:
        return default
    try:
        return float(s)
    except ValueError:
        pass
    for c, v in (
        ("亿", 100000000),
        ("億", 100000000),
        ("万", 10000),
        ("千", 1000),
        ("百", 100),
        ("十", 10),
    ):
        head, c, tail = s.partition(c)
        if c:
            return parse_number(head, 1) * v + parse_number(tail)
    s = s.translate(str.maketrans("零点〇一二三四五六七八九", "0.0123456789"))
    try:
        return float(s)
    except ValueError:
        return 0.0


def to_number(s: str) -> int | float:
    """将字符串转换成整数或浮点数。"""
    try:
        return int(s)
    except ValueError:
        try:
            return float(s)
        except ValueError:
            return 0.0 if "." in s else 0


def normalize(text: str) -> str:
    """激进地统一字符串为规范形式。

    将对输入字符串"! Foo  BÄR114514 "进行下述Unicode变换。

    - 丢弃开头的命令符,且只取字符串的开头一段。
        → " Foo  BÄR114514 "
    - 消去开头和结尾的空白符。
        → "Foo  BÄR114514"
    - 标准分解(NFD)。拆分出独立的重音符号。
        → "Foo  BA\u0308R114514"
    - case folding。简单地说就是变成小写。一些语言有额外变换(ß → ss,ς → σ等)。
        → "foo  ba\u0308r114514"
    - 兼容分解形式标准化(NFKD)。简单地说就是把怪字转换为正常字,比如全角变成半角。
        → "foo  ba\u0308r114514"
    - case folding。
    - 兼容分解形式标准化(NFKD)。套两层是因为㎯这样的方块字母。参照Unicode标准之默认大小写算法。
        A string X is a compatibility caseless match for a string Y if and only if:
            NFKD(toCasefold(NFKD(toCasefold(NFD(X))))) =
                NFKD(toCasefold(NFKD(toCasefold(NFD(Y)))))
    - 删去组合字符。一些语言的语义可能受到影响(é → e,が → か等)。
        → "foo  bar114514"
    - 替换连续的空白符和下划线为单个下划线。
        → "foo_bar114514"
    """
    return re.sub(
        r"[\s_]+",
        "_",
        "".join(
            filterfalse(
                unicodedata.combining,
                unicodedata.normalize(
                    "NFKD",
                    unicodedata.normalize(
                        "NFKD",
                        unicodedata.normalize("NFD", text.strip()).casefold(),
                    ).casefold(),
                ),
            )
        ),
    )


command_prefix = ".。!!"
"""可能的命令符组成的字符串。

例如,用text[0] in command_prefix来判断text是否是命令。
"""


def tokenize_command_name(text: str) -> list[str]:
    """当输入字符串以命令符(参照command_prefix变量)开头,给出按字符类切分后的列表,否则返回空列表。"""
    return (
        ["".join(s) for _, s in groupby(normalize(text[1:111]), unicodedata.category)]
        if text[0] in command_prefix
        else []
    )


def match_start_or_end(pattern: str, text: str, flags=0) -> re.Match[str] | None:
    return re.match(pattern, text, flags) or re.search(rf"(?:{pattern})\Z", text, flags)


def parse_command(
    parameters: dict[str, tuple[type, bool]],
    given_arguments: dict[str, Any],
    text: str,
) -> dict[str, Any]:
    """根据需要的参数类型从命令名之后的字符串中宽容地解析参数。

    :param parameters: 需要的参数名到(参数类型, 是否可选)的映射。将按字典顺序依次提取参数。

    仅支持下列基本数据类型。

    - Never(NoReturn)。该参数必定匹配失败。因为匹配失败时会显示帮助信息,所以可用于实现.help等没有实际功能的命令。
    - int和float。
    - str。因为参数用空白分割,只有最后一个str参数才会笼络空白。
    - Union。易碎的细节:按指定顺序匹配,因此int | str可能传入整数或字符串,而str | int等同于str。
    - Optional。

    :param given_arguments: 已知的参数。如果需要其中的参数,就直接赋予,不从字符串中解析。
    :param text: 待解析的字符串。
    :raises: CommandSyntaxError
    """
    kwargs = {}
    first_parameter = True
    last_str_parameter_name = None
    for name, (parameter, optional) in parameters.items():
        if name in given_arguments:
            kwargs[name] = given_arguments[name]
            continue

        # 在匹配每个参数之前,先去除字符串两端的空白。
        text = text.strip()

        # 根据参数类型匹配字符串。
        for parameter in (
            typing.get_args(parameter)
            if typing.get_origin(parameter) in (Union, UnionType)
            else (parameter,)
        ):
            # NoReturn is not Never,什么鬼?
            if parameter is NoReturn or parameter is Never:
                match = None
            elif parameter is None or parameter is type(None):
                # 以Optional[int](= Union[int, None])为例。
                # 遇到None时表明当前处在Optional中,而int无法匹配。
                # 此时在参数项中放入None,将参数视为有默认值,不阻止后续类型匹配成功覆盖此参数值。
                match = None
                kwargs[name] = None
                optional = True
            elif parameter is int:
                match = match_start_or_end(
                    r"[+-]?(\d+|0x[0-9a-f]+|0o[0-7]+|0b[01]+)", text, re.IGNORECASE
                )
            elif parameter is float:
                match = match_start_or_end(
                    r"[+-]?(\d*\.\d*|0x[0-9a-f]*\.[0-9a-f]*p\d+|\d+)",
                    text,
                    re.IGNORECASE,
                )
            elif parameter is str:
                last_str_parameter_name = name
                match = re.match(r"\S+", text)
            else:
                raise CommandSyntaxError(f"插件命令的参数 {name} 拥有不能理解的参数类型 {parameter}。")
            # 将值填入参数表中。
            if match:
                first_parameter = False
                kwargs[name] = parameter(match.group())
                text = text[: match.start()] + text[match.end() :]
                break
        else:
            if optional:
                pass
            elif first_parameter:
                raise CommandSyntaxError()
            else:
                raise CommandSyntaxError(f"解析命令时找不到参数 {name}。")

    # 清理解析完所有参数后剩下的字符串。
    if text.lstrip():
        if last_str_parameter_name:
            kwargs[last_str_parameter_name] += text
        else:
            raise CommandSyntaxError(f"残留未成功解析的参数“{text}”。")
    return kwargs


class CommandSyntaxError(Exception):
    pass
pykinezumiko/makedocs.py
import os
import sys
import subprocess
import urllib.request
import base64
from typing_extensions import Buffer
import zipfile
import tempfile
import urllib.parse
import mistletoe
import pygments
import pygments.lexers
import pygments.formatters
import pygments.style
import pygments.token

from . import conf


def font_face(
    woff2: Buffer, name: str, weight: str = "normal", style: str = "normal"
) -> str:
    """生成内联WOFF2字体数据的CSS字体声明。"""
    return f"""
@font-face {{
    font-family: "{name}";
    src: url(data:application/font-woff2;base64,{base64.b64encode(woff2).decode()}) format("woff2");
    font-weight: {weight};
    font-style: {style};
}}
"""


class HTMLFormatter(pygments.formatters.HtmlFormatter):
    def __init__(self) -> None:
        super().__init__(style=conf.PygmentsStyle)

    def get_linenos_style_defs(self) -> list[str]:
        return []


source_formatter = HTMLFormatter()


def make() -> None:
    # 缓存字体。
    os.makedirs("cache", exist_ok=True)
    for local, remote in {
        "cache/hei.otf": "https://mirrors.ctan.org/fonts/fandol/FandolHei-Regular.otf",
        "cache/jb.zip": "https://download.jetbrains.com/fonts/JetBrainsMono-2.304.zip",
    }.items():
        if not os.path.exists(local):
            urllib.request.urlretrieve(remote, local)
    # 打印HTML头部。
    # 样式表中不包含字体,因为需要在打印完整个文档后统计用到的字符,子集化中文字体。
    print(
        f"""<!DOCTYPE html>
<meta charset="utf-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>pykinezumiko — 木鼠子 ⅱ Python Ver.</title>
<style>
body {{
    font: 14px/28px "JetBrains Mono", FandolHei, Courier, sans-serif;
    color: {conf.THEME[0]};
    background-color: {conf.THEME[2]};
    margin: 8px;
}}

main {{
    max-width: 66em;
    margin: 0 auto;
    border: 3px solid {conf.THEME[1]};
    padding: 4px 12px;
    background:
        0 27px url("data:image/svg+xml,{urllib.parse.quote(f'''
<svg xmlns='http://www.w3.org/2000/svg' width='8' height='28'>
<path d='m0 .5 h 4' stroke='{conf.THEME[2]}'/>
</svg>
''')}") content-box,
        {conf.THEME[3]};
    overflow: hidden;
}}

h1 {{
    margin: 14px -12px;
    padding: 0 12px;
    font-size: 28px;
    font-weight: inherit;
    line-height: 56px;
    color: {conf.THEME[1]};
    background-color: {conf.THEME[2]};
}}

h2, h3 {{
    margin: 0;
    font-size: inherit;
    font-weight: inherit;
    color: {conf.THEME[1]};
}}

h2 {{
    font-size: 16px;
}}

p {{
    margin: 0;
    text-indent: 2em;
}}

summary {{
    color: {conf.THEME[1]};
}}

pre, code {{
    margin: 0;
    font: inherit;
    white-space: pre-wrap;
}}

{source_formatter.get_style_defs()}
</style>
"""
    )
    characters = set()
    print("<main>")
    with open("README.md") as f:
        print(mistletoe.markdown(f))
    print("<h1>原理</h1>")
    print(end="<pre>")
    for filename in sorted(
        os.path.join(root, filename)
        for root, dirs, files in os.walk("pykinezumiko")
        for filename in files
        if filename.endswith(".py")
    ):
        characters.update(filename)
        print(end=f"<details><summary>{filename}</summary>")
        with open(filename, "r") as f:
            source = f.read()
            characters.update(source)
            pygments.highlight(
                source.rstrip(),
                pygments.lexers.get_lexer_for_filename(filename),
                source_formatter,
                sys.stdout,
            )
        print(end="</details>")
    print("</pre>")
    print("</main>")
    # 从中文字体中去除Latin-1字符集。
    characters.difference_update(chr(i) for i in range(256))
    # 打印字体声明。
    # 将字体内联在网页尾部的好处:
    # • 保持网页为单文件,轻松离线保存。
    # • 先加载更重要的网页内容,随后再加载字体。
    # • 尚未加载到@font-face声明时,浏览器认为指定的字体是本地字体。
    #   没有声明font-display: swap却有着一样的效果。
    print("<style>")
    with zipfile.ZipFile("cache/jb.zip") as z:
        with z.open("fonts/webfonts/JetBrainsMono-Regular.woff2") as f:
            print(font_face(f.read(), "JetBrains Mono"))
        with z.open("fonts/webfonts/JetBrainsMono-Bold.woff2") as f:
            print(font_face(f.read(), "JetBrains Mono", "bold"))
    with tempfile.TemporaryDirectory() as tmpdir:
        with open(os.path.join(tmpdir, "text"), "w") as f:
            f.write("".join(characters))
        subprocess.run(
            [
                "pyftsubset",
                "cache/hei.otf",
                f"--output-file={tmpdir}/subset",
                f"--text-file={tmpdir}/text",
                "--flavor=woff2",
                "--desubroutinize",
                "--obfuscate-names",
            ],
            check=True,
        )
        with open(os.path.join(tmpdir, "subset"), "rb") as f:
            print(font_face(f.read(), "FandolHei"))
    print("</style>")


if __name__ == "__main__":
    make()
pykinezumiko/plugins/__init__.py
"""示例插件的文件夹。

要学习插件制作方法的话,下面是推荐的阅读顺序。

- jrrp:梦开始的地方。
- demo:从最基础的收到消息就回复,到基于Python生成器语言功能的对话流程,汇聚了各种功能演示的插件。
- clock:演示定时任务和数据持久化的做法。
- gate:遭遇来自go-cqhttp的事件时,会依次询问插件是否能处理该事件,遇到第一个有处理能力的插件后就结束询问。因此,gate能在机器人被某个群禁用时拦截事件,阻止后续插件对事件的处理。
"""
pykinezumiko/plugins/clock.py
import re
import os
import time
import pickle
from queue import PriorityQueue
import pykinezumiko


class Clock(pykinezumiko.Plugin):
    def __init__(self) -> None:
        super().__init__()
        # 存储路径
        self.path = 'logs/20clock.pickle'
        # 提醒队列
        t = []
        if os.path.isfile(self.path):
            with open(self.path, 'rb') as f:
                t = pickle.load(f)
        self.pq = PriorityQueue()
        for l in t:
            self.pq.put(l)

    def on_message(self, context: int, sender: int, text: str, message_id: int):
        if text.startswith(".clock"):
            # ".clock 增加的时间 消息" or ".clock 消息 增加的时间"
            dtAndTitle = text[7:].strip()
            # 匹配开头和结尾作为时间输入
            title, dt = None, None
            res = re.search(r"^\d+|\d+$", dtAndTitle)
            if not res:
                return "无法识别到有效时间"
            else:
                l, r = res.span()
                dt = int(res.group())
                title = dtAndTitle[:l] + dtAndTitle[r:]
                title = title.strip()
                if not title:
                    return "标题不能为空"

            # 存储格式:[浮点触发时间戳,回复内容,会话id]
            self.pq.put([time.time()+dt, title, context])
            with open(self.path, 'wb') as f:
                pickle.dump(list(self.pq.queue), f)
            return str(time.time()+dt) + " "+title

    def on_interval(self):
        # 如果提醒队列非空且第一个提醒到时间了就提醒用户
        while not self.pq.empty() and self.pq.queue[0][0] < time.time():
            _, title, target = self.pq.get()
            with open(self.path, 'wb') as f:
                pickle.dump(list(self.pq.queue), f)
            self.send(target, title)
            time.sleep(1)
pykinezumiko/plugins/code.py
import re
import requests
import pykinezumiko


def decbv(bv: str) -> int:
    """转换BV号为avid。

    :param bv: 可以是"BV1GJ411x7h7"或单纯的"1GJ411x7h7"。
    """
    avid = 0
    bv_indices = [
        "fZodR9XQDSUm21yCkr6zBqiveYah8bt4xsWpHnJE7jL5VG3guMTKNPAwcF".index(ch)
        for ch in bv[-10:]
    ]
    for i in range(6):
        avid += bv_indices[(1, 2, 4, 6, 8, 9)[i]] * 58 ** (2, 4, 5, 3, 1, 0)[i]
    return (avid - 8728348608) ^ 177451812


def encav(avid: int) -> str:
    """转换avid为BV号。

    :returns: 不带"BV"前缀的10位BV号,例如"1GJ411x7h7"。
    """
    bv = ["1", "?", "?", "4", "?", "1", "?", "7", "?", "?"]
    for i in range(6):
        bv[
            (1, 2, 4, 6, 8, 9)[i]
        ] = "fZodR9XQDSUm21yCkr6zBqiveYah8bt4xsWpHnJE7jL5VG3guMTKNPAwcF"[
            ((avid ^ 177451812) + 8728348608) // 58 ** (2, 4, 5, 3, 1, 0)[i] % 58
        ]
    return "".join(bv)


class Code(pykinezumiko.Plugin):
    def on_message(self, context: int, sender: int, text: str, message_id: int):
        if re.search(r"bilibili\.com\/video\/BV|BV1..4.1.7..|\bb23\.tv\b", text):
            return self.av_bv(text)

    def av_bv(self, text: str):
        bv = {bv: decbv(bv) for bv in re.findall(r"BV1\w\w4\w1\w7\w\w", text, re.ASCII)}
        b23 = {
            url: decbv(match.group())
            for url in (
                requests.head("https://" + url.group().replace("\\", "")).headers[
                    "Location"
                ]
                for url in re.finditer(r"\bb23\.tv\\{0,2}\/[A-Za-z0-9]{3,8}", text)
            )
            if (match := re.search(r"BV1..4.1.7..", url))
        }
        if bv or b23:
            str1 = f" {len(bv)} 个 BV 号" if bv else ""
            str2 = f" {len(b23)} 个 bilibili 精巧地址" if b23 else ""
            str3 = "和" if bv and b23 else ""
            r = f"消息中的{str1}{str3}{str2}被转换为 aid。\n"
            # 目前小程序暂时还不是这样接收的。之后可能会修改。
            if (
                text.startswith("\x1b<Rich message::Xiaochengxu>")
                and not bv
                and len(b23) == 1
            ):
                r = f"bilibili 小程序被转换为地址。\n"
            bv |= b23
            r += (
                "\n".join(f"‣ {k} = av{v}" for k, v in bv.items())
                if len(bv) > 1
                else f"‣ av{next(iter(bv.values()))}"
            )
            return r

    @pykinezumiko.documented()
    # 由于on_command_u+不是合法的标识符名称,只能先定义一个别名,然后在类定义完全后setattr。
    def on_command_unicode(self, s: str):
        """.u+ ⟨210F|ℏ⟩(Unicode 码位)"""
        r = []
        for w in s.split():
            if re.fullmatch(r"[0-9A-Fa-f]{1,6}", w):
                r.append(f"{int(w, 16):c} U+{w.upper():>04s}")
            else:
                for c in w:
                    r.append(f"{c!r} U+{ord(c):04X}")
        return "\n".join(r)

    MORSE_TABLE = {
        "A": ".-",
        "B": "-...",
        "C": "-.-.",
        "D": "-..",
        "E": ".",
        "F": "..-.",
        "G": "--.",
        "H": "....",
        "I": "..",
        "J": ".---",
        "K": "-.-",
        "L": ".-..",
        "M": "--",
        "N": "-.",
        "O": "---",
        "P": ".--.",
        "Q": "--.-",
        "R": ".-.",
        "S": "...",
        "T": "-",
        "U": "..-",
        "V": "...-",
        "W": ".--",
        "X": "-..-",
        "Y": "-.--",
        "Z": "--..",
        "0": "-----",
        "1": ".----",
        "2": "..---",
        "3": "...--",
        "4": "....-",
        "5": ".....",
        "6": "-....",
        "7": "--...",
        "8": "---..",
        "9": "----.",
    }
    for c in list(MORSE_TABLE.keys()):
        MORSE_TABLE[MORSE_TABLE[c]] = c
    MORSE_DOTS = ".·⋅∙•⸳⸱・・ꞏ․‧˙⦁"
    MORSE_DASHES = "-‐‒–—⁃−𐆑―一ー𝄖-"
    MORSE_NORMALIZE = str.maketrans(
        MORSE_DOTS + MORSE_DASHES, "." * len(MORSE_DOTS) + "-" * len(MORSE_DASHES)
    )

    @pykinezumiko.documented()
    def on_command_morse(self, s: str):
        """.morse ⟨·–|A⟩(摩尔斯电码)"""

        def replacer(match: re.Match[str]) -> str:
            s = match.group().translate(self.MORSE_NORMALIZE)
            return self.MORSE_TABLE.get(s.upper(), s)

        s = re.sub(r"(?<=\w) +(?=\w)", " / ", s)
        s = re.sub(r"(?<=\w)(?=\w)", " ", s)
        s = re.sub(
            rf"\w|[{re.escape(self.MORSE_DOTS)}{re.escape(self.MORSE_DASHES)}]+",
            replacer,
            s,
        )
        return s


setattr(Code, "on_command_u+", Code.on_command_unicode)
pykinezumiko/plugins/commander.py
import os
import subprocess
import sys
import tarfile
import tempfile
import time

import requests
import pykinezumiko
from pykinezumiko.humanity import format_timespan


class Commander(pykinezumiko.Plugin):
    """提供管理与调试功能的插件。

    如此命名的原因是早期的命令式文件管理器的名字中常常带有“commander”一词。
    """

    def on_command_reload(self):
        print("重启")
        # 提交变更后,先从远程仓库拉取,再推送到远程。
        subprocess.run(["git", "add", "."], check=True)
        subprocess.run(
            [
                "git",
                "-c",
                "user.email=@",
                "-c",
                "user.name=守护进程",
                "commit",
                "-m",
                "",
                "--allow-empty",
                "--allow-empty-message",
            ],
            check=True,
        )
        subprocess.run(["git", "pull", "--no-rebase", "--no-edit"], check=True)
        subprocess.run(["git", "push"], check=True)
        # 尝试启动新的版本。
        print("启动")
        process = subprocess.Popen(
            [sys.executable, "-m", "pykinezumiko", "通过.reload启动"]
        )
        try:
            print("等待")
            process.wait(5)
        except subprocess.TimeoutExpired:
            # Flask进程启动一段时间内仍在正常运行,表明可以安全地切换到新版。
            print("结束")
            exit()
        else:
            # 新版存在问题。
            print("子进程快速终止")
            return f"Flask进程寄啦({process.returncode})!请尽快修复后重新执行.reload。"

    def on_command_backup(self, context: int):
        filename = tempfile.mktemp(".tar.xz", "pykinezumiko-")
        with tarfile.open(filename, "w:xz") as tar:
            for dir_entry in os.scandir("."):
                if dir_entry.name not in [".git", "data"]:
                    tar.add(dir_entry.name)
        self.send_file(context, filename)
        return True

    def on_command_debug_s(self, context: int, sender: int):
        ret = ["下面是调试信息。"]
        ret.append(f"消息发送者 ID = {sender}")
        ret.append(f"消息发送者 = {self.name(sender)}")
        ret.append(f"消息上下文 ID = {context}")
        ret.append(f"消息上下文 = {self.name(context)}")
        if context == pykinezumiko.conf.BACKSTAGE:
            ret.append("消息来自管理用群。")
        ret.append("现在 = " + time.strftime("%-Y 年 %-m 月 %-d 日 %H:%M %Z"))
        ret.append(f"所在 = {os.getcwd()}")
        if os.name == "posix":
            with open("/proc/uptime", "r") as f:
                str1 = format_timespan(float(f.readline().split()[0]))
                ret.append(f"服务器运行时间 = {str1}")
            str2 = subprocess.check_output(
                "free --bytes | awk '/Mem/ { print $3 / $2 * 100.0; }'",
                shell=True,
                encoding="iso-8859-1",
            ).strip()
            ret.append(f"内存使用量 = {str2}%")
            str3 = subprocess.check_output(
                "df . --output=pcent | tail -n 1", shell=True, encoding="iso-8859-1"
            ).strip()
            ret.append(f"卷已用空间 = {str3}")
        return "\n".join(ret)

    def on_command_debug_to(self, target: int, content: str):
        self.send(target, content)
        return f"重定向 {content} 到 [{self.name(target)}]。"

    def on_command_print(self, expr: str):
        from .. import app

        return repr(
            eval(
                expr,
                globals()
                | {type(p).__name__: type(p) for p in app.plugins}
                | {type(p).__name__.lower(): p for p in app.plugins},
            )
        )

    def on_command_select_from(self, context: int, db: str):
        self.send_file(context, f"excel/{db}.xlsx")
        return True

    def on_file(self, context: int, sender: int, filename: str, size: int, url: str):
        name = filename.removesuffix(".xlsx")
        new_name = f"excel/{name}.xlsx"
        if os.path.exists(new_name):
            old_name = f"{new_name}.{time.strftime('%Y-%m-%d_%H_%M')}.xlsx"
            os.rename(new_name, old_name)
            with open(new_name, "wb") as f:
                f.write(requests.get(url).content)
            from pykinezumiko import app

            app.databases[name].reload()
            return f"替换了 {new_name};原始文件被重命名为 {old_name}。"
pykinezumiko/plugins/demo.py
import pathlib
import random
import re
import time
from collections.abc import Generator
from typing import Union
from PIL import Image
import pykinezumiko


class 一条数据(pykinezumiko.docstore.Record):
    text: str


class Demonstration(pykinezumiko.Plugin):
    """演示各种功能的插件。"""

    # 等到Python 3.12有@override了,建议在这里标一下。
    def on_message(self, context: int, sender: int, text: str, message_id: int):
        if text == ".debug p":
            return "你好,世界!"
        elif text == ".cat":
            return random.choice(("喵呜~", "喵!", "喵?", "喵~"))
        # 可以使用任意字符串判据。(废话。)
        elif not text.startswith("^") and text.endswith("^") or text == "More?":
            return "More?"

    def on_command_debug_m(self, context: int):
        # 不必只回复一条消息。有需要的话,可以向任意会话任意发送消息。
        self.send(context, "这是第一条消息。")
        self.send(context, "这是第二条消息。")
        return True

    def on_command_debug_t(self, context: int):
        self.send(context, "8 秒后,将被回调。")
        # 在这8秒内,其他命令能否响应?
        time.sleep(8)
        return "被回调。"

    @pykinezumiko.documented()
    def on_command_猜数字(self) -> Generator[str, str, None | bool | str]:
        # 注意观察下列代码与控制台程序有多么相像。
        def number_guessing_in_console() -> None:
            x = random.randint(1, 100)
            guess = input("I've chosen a random integer between 1 and 100.")
            while guess.isnumeric():
                guess = int(guess)
                if guess < x:
                    guess = input("Too small.")
                elif guess > x:
                    guess = input("Too big.")
                else:
                    return print("Right!")
            print(f"Game over. The answer should be {x}.")

        # 转换为对话流程只需要进行以下替换:
        # • input(…) → yield …
        # • print(…) → self.send(context, …)
        # • 以及依惯例,self.send(context, …); return True → return …
        # 这和应用程序与用户界面框架的主从关系近几十年来的反转有关。
        # 实际上,在现代操作系统中,input函数内部的系统调用以类似yield的方式实现。
        x = random.randint(1, 100)
        guess = yield "我从 1~100 中随机选了一个整数。猜对了也没有奖励,猜错了也没有惩罚。"
        while guess.isnumeric():
            guess = int(guess)
            if guess < x:
                guess = yield "太小了。"
            elif guess > x:
                guess = yield "太大了。"
            else:
                return "猜对了!"
        return f"游戏结束。正确答案是 {x}。"

    def on_command_debug_next(self, n: int):
        n = max(1, n)
        if n > 9:
            return "注意,即使 .debug cls 也无法清除待回显的状态。请再考虑一下。"
        text = yield f"将回显接下来的 {n} 条消息。"
        for _ in range(n - 1):
            text = yield text
        return text

    def on_command_debug_repr(self):
        return repr((yield "将以 repr 回显接下来的一条消息。"))

    def on_command_debug_face(self, x: str):
        if match := re.fullmatch(r"\x9dface\0id=(\d+)\x9c", x):
            id = int(match.group(1))
        else:
            id = int(x)
        return f"\x9dface\0id={id}\x9c = {id}"

    def on_command_debug_img(self):
        raise NotImplementedError()
        uri = pathlib.Path("pykinezumiko/resources/sample.png").resolve().as_uri()
        return f"查看下列图片:\x9dimage\0file={uri}\x9c"

    def on_command_crud_insert(self, k: str, v: str):
        一条数据[k] = 一条数据(text=v)
        return k

    def on_command_crud_select(self):
        return "\n".join(f"{k}: {v.text}" for k, v in 一条数据.items()) if len(一条数据) else "空"

    on_command_crud_update = on_command_crud_insert

    def on_command_crud_delete(self, k: str):
        del 一条数据[k]
        return k
pykinezumiko/plugins/gate.py
from typing import Any
import pykinezumiko


class BackstageOnly(pykinezumiko.Plugin):
    """拦截事件,只允许来自管理用群和无来源的事件抵达下面的插件。"""

    def on_event(self, context: int, sender: int, data: dict[str, Any]) -> bool:
        return context not in (0, pykinezumiko.conf.BACKSTAGE)
pykinezumiko/plugins/img4img.py
import re
import json

import requests
import pykinezumiko

apiKey = "1145141919810HENGHENGAAAAAAAAAAAAAAPIKEY"

def search(imageURL: str, num: int = 1) -> str:
    print("以图搜图", imageURL)
    url = "https://saucenao.com/search.php"

    params = {
        "url": imageURL,
        "db": 999,
        "api_key": apiKey,
        "output_type": 2,
        "numres": num
    }

    r = requests.get(url=url, params=params)
    print("响应", r.text)
    min_ = json.loads(r.text).get("header").get("minimum_similarity")
    res = json.loads(r.text).get("results")
    cnt = 1
    ret = ""
    for j in res:
        if float(j.get("header").get("similarity")) >= min_:
            # 用来防封号的表情符号
            symbol="\x9dface\0id=60\x9c"
            # 对结果中的 ext_urls 插入表情
            if "ext_urls" in j["data"]:
                for index, _ in enumerate(j["data"]["ext_urls"]):
                    j["data"]["ext_urls"][index] = j["data"]["ext_urls"][index].replace(
                        ".", symbol+".")
                    j["data"]["ext_urls"][index] = j["data"]["ext_urls"][index].replace(
                        "://", ":"+symbol+"//")
            # 对结果中的 source 插入表情
            if "source" in j["data"]:
                j["data"]["source"] = j["data"]["source"].replace(
                        ".", symbol+".")
                j["data"]["source"] = j["data"]["source"].replace(
                        "://", ":"+symbol+"//")

            ret += "第"+str(cnt)+"项匹配"+": 相似度"+j.get("header").get("similarity")+"%\n"
            ret += json.dumps(j.get("data"), indent=1)+"\n"
            cnt += 1
    print(f"返回值 {ret!r}")
    return ret

class SauceNAO(pykinezumiko.Plugin):
    """以图搜图。

    其实只是调用API的产物。
    既然img2img非常火,那么就叫img4img吧,取search for之for之意。
    """

    def on_command_img(self, x: str = ""):
        for i in range(2):
            # 如果用户直接发送了一个图片URL,如.img https://……
            if re.fullmatch(r'https?://\S+', x):
                return search(x)
            # 如果用户在.img后面跟了一个内联图片,即图文混排的消息
            # 或是询问后发送了单张图片
            elif match := re.search(r'\x9dimage\0url=(.*?)\0', x):
                return search(match.group(1))
            # 都没有,且是第一次进入这里(通过.img进入本函数)则询问
            elif not i:
                x = yield "将查找接下来的一张图片。"
            # 第二次(已经询问过了)就寄掉,玩我呢
            else:
                return "没有收到图片。"
pykinezumiko/plugins/jrrp.py
import time
import random

import pykinezumiko

class 今日人品(pykinezumiko.Plugin):
    def on_command_jrrp(self, sender: int):
        r = random.Random((int(time.time()) + 3600 * 8) // 86400 + sender)
        return f"今日のあんたん運勢は{r.randrange(101)}点や。"
pykinezumiko/plugins/touchfish.py
import time
import requests

import pykinezumiko


def stamp2day(stamp: float, time_zone: int) -> int:
    return int(stamp // 3600 + time_zone) // 24


def stamp2hour(stamp: float, time_zone: int) -> int:
    return int(stamp // 3600 + time_zone) % 24


class TouchFish(pykinezumiko.Plugin):
    """定时发摸鱼人日历"""

    def __init__(self) -> None:
        super().__init__()
        # 存储路径
        self.path = "logs/20touchfish.txt"

    def on_command_touch_fish(self, sender: int) -> None:
        r = requests.get("https://api.vvhan.com/api/moyu?type=json", timeout=3)
        # 请求摸鱼人日历API
        self.send(sender, f"\x9dimage\0file={r.json()['url']}\x9c")

    def on_interval(self):
        stp = time.time()
        try:
            with open(self.path, "r") as f:
                last = int(f.readline())
        except FileNotFoundError:
            last = 0

        # 每天上午七点定时在管理群发摸鱼人日历
        if stamp2day(stp, 8) != last and stamp2hour(stp, 8) > 7:
            # 标记当前日期
            with open(self.path, "w") as f:
                print(stamp2day(stp, 8), file=f)
            self.on_command_touch_fish(pykinezumiko.conf.BACKSTAGE)
pykinezumiko/typesetting.py
import base64
import io
import math
import re
import pkgutil
from functools import cache
from itertools import pairwise
from typing import NamedTuple

from PIL import Image, ImageDraw, ImageFont

from . import conf

# 虽然函数名叫truetype,但是下层调用的FreeType其实支持许多字体格式。
# 反倒是用适用于Windows的文泉驿点阵正黑渲染会有错位。
font = ImageFont.truetype(
    pkgutil.get_data(__name__, "resources/wenquanyi_10pt.pcf"), 13
)


class Glue(NamedTuple):
    """弹性长度。

    不受迫就保持原长width,但若有需要,能拉伸到width + stretch,也能挤压到width - shrink。

    类名为“粘连”是因为TeX如此称呼。
    """

    width: float = 0.0
    stretch: float = 0.0
    shrink: float = 0.0

    def __neg__(self) -> "Glue":
        return Glue(-self[0], -self[1], -self[2])

    def __add__(self, other: "Glue") -> "Glue":
        return Glue(self[0] + other[0], self[1] + other[1], self[2] + other[2])

    def __sub__(self, other: "Glue") -> "Glue":
        return self + -other

    @property
    def stretched(self) -> float:
        return self.width + self.stretch

    @property
    def shrunk(self) -> float:
        return self.width - self.shrink

    def ratio(self, to: float) -> float:
        if to > self.width:
            try:
                return (to - self.width) / self.stretch
            except ZeroDivisionError:
                return math.inf
        elif to < self.width:
            try:
                return (to - self.width) / self.shrink
            except ZeroDivisionError:
                return -math.inf
        else:
            return 0.0

    def set(self, ratio: float = 0.0) -> float:
        if ratio >= 0.0:
            return self.width + self.stretch * ratio
        else:
            return self.width + self.shrink * ratio

    def demerit(self, to: float) -> float:
        return min(10000.0, (0.01 + abs(self.ratio(to)) ** 3) ** 2)


@cache
def measure(text: str) -> Glue:
    """计算文字的宽度。如果有字符串包含空格,会带有伸长量和压缩量。"""
    width = font.getlength(text)
    space = sum(font.getlength(match.group()) for match in re.finditer(r"\s+", text))
    stretch = space * 0.6 + text.count("\u200b") * 2.5
    return Glue(width, stretch, space * 0.2)


def is_breakable(ch: str) -> bool:
    """非常笼统的任意可断字符判断。

    在这些字符前后,只要符合行首行末禁则,即使没有空格也允许断行。
    主要包含汉字、假名、注音符号、全角符号。
    大概能一直用到Unicode 19.1版吧。
    """
    i = ord(ch)
    return (
        0x2E80 <= i < 0xA000
        or 0xF900 <= i < 0xFB00
        or 0xFF00 <= i < 0xFFF0
        or 0x20000 <= i < 0x40000
    )


def break_text(text: str, line_width: float) -> list[list[tuple[float, str]]]:
    """使文字两端对齐。

    使用简化的TeX断行算法(Knuth-Plass 1981附录A)。没有实现自动断字。

    :returns: 行构成的列表,每行由单词构成,单词用(横坐标, 字符串)表示。
    """
    if line_width <= 0.0:
        raise ValueError("line_width必须为正")
    # 在汉字前后可断行处添加零宽度空格。
    text = "".join(
        a + "\u200b"
        if (is_breakable(a) or is_breakable(b))
        and a not in " \t\n\r\f\v$([{£¥‘“〈《「『【〔〖〝﹙﹛﹝$([{「£¥"
        and b
        not in " \t\n\r\f\v!%),.:;?]}¢°’”…‰′″›℃∶、。々〃〉》」』】〕〗〞︶︺︾﹀﹄﹚﹜﹞ぁぃぅぇぉっゃゅょゎ゛゜ゝゞァィゥェォッャュョヮヵヶ・ーヽヾ!%),.:;?]}~。」、・ァィゥェォャュョッー゙゚¢"
        else a
        for a, b in pairwise(text + "?")
    )
    # 切割字符串为项目列表。
    # 项目有不可中断的单词、可断行且断行后消失的空格、段落结束。
    # 段落结束"\n"也是可断行。以其为行末,则该行成本为零。
    whitespace = r"[ \t\u2000-\u200b]"
    items: list[str] = list(filter(None, re.split(rf"(\n|{whitespace}+)", text)))
    if items and items[-1] != "\n":
        items.append("\n")
    # 优先尝试在isspace的位置断行。当在这些位置断行时,对应项目将消失。
    isspace = [item == "\n" or bool(re.match(whitespace, item)) for item in items]
    for i in range(len(items)):
        # 保留段首空格。
        if (i == 0 or items[i - 1] == "\n") and items[i] != "\n":
            isspace[i] = False
    # cumsum[i] = 前i个项目的弹性宽度和。
    cumsum = [Glue()]
    for item in items:
        i = measure(item)
        if i.shrunk > line_width:
            i = Glue(line_width - 0.5, i.stretch, i.shrink)
        cumsum.append(cumsum[-1] + i)
    # print(*zip(items, isspace, cumsum), cumsum[-1], sep="\n")
    # dp[i] = 在第i个项目处断行的(成本最小值, 达到最小值时上一个断行处的项目索引)。
    # 通常是在空格处断行。紧急情况下(例如单词超出行宽),也会在单词前断行。
    dp: list[tuple[float, int]] = [(math.inf, -1)] * len(items)
    # 首个断行点就是第一个单词。
    dp[0] = (0.0, -1)
    i = 0
    for k in range(1, len(items)):
        if isspace[k]:
            while (cumsum[k] - cumsum[i + isspace[i]]).shrunk > line_width:
                i += 1
            dp[k] = min(
                (
                    dp[j][0]
                    + (
                        0.0
                        if items[k] == "\n"
                        else (cumsum[k] - cumsum[j + isspace[j]]).demerit(line_width)
                    ),
                    j,
                )
                for j in range(i, k)
                if math.isfinite(dp[j][0])
            )
            # 强制在段落结束处断行。
            if items[k] == "\n":
                i = k
    # 追踪断行项目索引。
    breaks: list[int] = []
    i = len(items) - 1
    while i >= 0:
        breaks.append(i)
        i = dp[i][1]
    breaks.reverse()
    # 整理用于绘制的(坐标, 单词)列表。
    lines: list[list[tuple[float, str]]] = []
    for i, k in pairwise(breaks):
        i += isspace[i]
        ratio = 0.0 if items[k] == "\n" else (cumsum[k] - cumsum[i]).ratio(line_width)
        if math.isinf(ratio):
            ratio = 0.0
        lines.append(
            [
                ((cumsum[j] - cumsum[i]).set(ratio), items[j])
                for j in range(i, k)
                if not isspace[j]
            ]
        )
    return lines


def text_bitmap(
    text="string\nlorem ipsum 114514\n1919810\n共计处理了489975条消息",
    font=font,
    width=274,
    line_height=28,
    margin=8,
    border=3,
    padding_inline=12,
    padding_block=4,
    scale=2,
    dash_on=4,
    dash_off=4,
):
    lines = break_text(text, width)
    height = line_height * len(lines) - 1
    img = Image.new(
        "RGB",
        (
            width + (margin + border + padding_inline) * 2,
            height + (margin + border + padding_block) * 2,
        ),
        conf.THEME[3],
    )
    draw = ImageDraw.Draw(img)
    draw.rectangle(
        ((0, 0), img.size),
        outline=conf.THEME[2],
        width=margin,
    )
    draw.rectangle(
        ((margin, margin), (img.width - margin, img.height - margin)),
        outline=conf.THEME[1],
        width=border,
    )
    for y in range(line_height - 1, height, line_height):
        y += margin + border + padding_block
        for x in range(0, width, dash_on + dash_off):
            x += margin + border + padding_inline
            img.paste(conf.THEME[2], (x, y, x + dash_on, y + 1))
    for y, line in enumerate(lines):
        y *= line_height
        y += margin + border + padding_block + font.size // 2
        for x, item in line:
            x += margin + border + padding_inline
            draw.text((x, y), item, fill=conf.THEME[0], font=font)
    return img.resize((img.width * scale, img.height * scale), resample=Image.BOX)


def pil_image_to_base64(img: Image.Image) -> str:
    with io.BytesIO() as f:
        img.save(f, format="PNG")
        return base64.b64encode(f.getvalue()).decode()
pykinezumiko/xlsx.py
"""Microsoft Excel 2007 XLSX文件读写库。

基于pydpiper开发的pylightxl改写。
pylightxl是轻量级、零依赖的Excel电子表格数据、公式、批注读写库,支持Python 2.7.18+。
该库的代码仅一个文件,有完整的类型标注,可惜不知为何没能进到awesome-python列表中。
https://github.com/PydPiper/pylightxl
https://pylightxl.readthedocs.io/

不支持读写公式、批注、主题。
不压缩写出的工作簿。在今日硬件上,不压缩的性能往往更好。
这还有助于全体打包时使用更高压缩率的算法,而非受限于ZIP通行的DEFLATE。

【动机】
openpyxl即使在只读与只写模式下也慢得很。
pylightxl在创建xl/sharedStrings.xml时采用了线性查找而非散列表。
不知性能问题是否与库默认的ZIP压缩选项有关。
因为需要一种快速写入的方法,自己编写了库。
"""

import datetime
import html
import math
import os
import re
import xml.etree.ElementTree as ET
import zipfile
from collections import defaultdict
from collections.abc import Iterable, Iterator, Mapping
from functools import reduce
from itertools import groupby
from typing import IO, Any, Callable, Literal, Optional, Union

CellPrimitive = Union[None, bool, int, float, str]
"""单元格值的类型。

无法区分整数和浮点数,NaN和无穷也无法准确存储。
"""

CellValue = Union[CellPrimitive, datetime.datetime, bytes]
"""通过单元格数值格式,额外支持的单元格值类型。"""

Color = Union[tuple[int, int, int], tuple[int, int, int, int], str]
"""RGB元组,RGBA元组,或#RRGGBB、#RRGGBBAA格式的字符串。"""


def color_to_hex(color: Color) -> str:
    """转换Color类型数据到十六进制ARGB字符串。"""
    if isinstance(color, tuple):
        if len(color) == 3:
            return "ff%02x%02x%02x" % color
        else:
            return "%02x%02x%02x%02x" % (color[3:] + color[:3])
    else:
        color = color.removeprefix("#")
        if len(color) == 6:
            return "ff" + color
        else:
            return color[6:] + color[:6]


CellBorderStyle = Literal[
    "none",
    "thin",
    "medium",
    "dashed",
    "dotted",
    "thick",
    "double",
    "hair",
    "mediumDashed",
    "dashDot",
    "mediumDashDot",
    "dashDotDot",
    "mediumDashDotDot",
    "slantDashDot",
]


class CellStyle:
    """单元格格式。"""

    def __init__(self) -> None:
        self.reset()

    def reset(self) -> None:
        self.number_format: str = "General"

        self.font_name: str = "Courier New"
        self.font_size: float = 10.0
        self.bold: bool = False
        self.italic: bool = False
        self.underline: bool = False
        self.strikethrough: bool = False
        self.subscript: bool = False
        self.superscript: bool = False
        self.color: Color = (0, 0, 0)

        self.fill: Color = (255, 255, 255)

        self.border_style = "none"
        self.border_color = (0, 0, 0)
        self.border_diagonal_down: bool = False
        self.border_diagonal_up: bool = False

        self.width: float = 8
        self.height: float = 16
        """列宽和行高。
        
        并不是单元格的格式,而是整行和整列的格式,在单个单元格上设置宽度和高度无效。
        但是因为styler函数中第−1列表示整行,第−1行表示整列,将尺寸信息写在这里非常方便。
        """

    # 我去,匿名装饰器!
    # 该装饰器创建一个只可写入的属性。
    @lambda f: property(fset=f)
    def border_style(self, style: CellBorderStyle):
        self.border_top_style: CellBorderStyle = style
        self.border_right_style: CellBorderStyle = style
        self.border_bottom_style: CellBorderStyle = style
        self.border_left_style: CellBorderStyle = style
        self.border_diagonal_style: CellBorderStyle = style

    @lambda f: property(fset=f)
    def border_color(self, color: Color):
        self.border_top_color: Color = color
        self.border_right_color: Color = color
        self.border_bottom_color: Color = color
        self.border_left_color: Color = color
        self.border_diagonal_color: Color = color

    def font_spec(self) -> str:
        return (
            f'<font><name val="{html.escape(self.font_name)}"/><sz val="{self.font_size}"/>'
            + ("<b/>" if self.bold else "")
            + ("<i/>" if self.italic else "")
            + ("<u/>" if self.underline else "")
            + ("<strike/>" if self.strikethrough else "")
            + (
                '<vertAlign val="subscript"/>'
                if self.subscript
                else '<vertAlign val="superscript"/>'
                if self.superscript
                else ""
            )
            + f'<color rgb="{color_to_hex(self.color)}"/></font>'
        )

    def fill_spec(self) -> str:
        return f'<fill><patternFill patternType="solid"><fgColor rgb="{color_to_hex(self.fill)}"/></patternFill></fill>'

    def border_spec(self) -> str:
        return (
            "<border"
            + (' diagonalUp="1"' if self.border_diagonal_up else "")
            + (' diagonalDown="1"' if self.border_diagonal_down else "")
            + f""">
<left style="{self.border_left_style}">
<color rgb="{color_to_hex(self.border_left_color)}"/></left>
<right style="{self.border_right_style}">
<color rgb="{color_to_hex(self.border_right_color)}"/></right>
<top style="{self.border_top_style}">
<color rgb="{color_to_hex(self.border_top_color)}"/></top>
<bottom style="{self.border_bottom_style}">
<color rgb="{color_to_hex(self.border_bottom_color)}"/></bottom>
<diagonal style="{self.border_diagonal_style}">
<color rgb="{color_to_hex(self.border_diagonal_color)}"/></diagonal>
</border>"""
        )


NUMBER_FORMATS = {
    0: "General",
    1: "0",
    2: "0.00",
    3: "#,##0",
    4: "#,##0.00",
    5: '"$"#,##0_);("$"#,##0)',
    6: '"$"#,##0_);[Red]("$"#,##0)',
    7: '"$"#,##0.00_);("$"#,##0.00)',
    8: '"$"#,##0.00_);[Red]("$"#,##0.00)',
    9: "0%",
    10: "0.00%",
    11: "0.00E+00",
    12: "# ?/?",
    13: "# ??/??",
    14: "mm-dd-yy",
    15: "d-mmm-yy",
    16: "d-mmm",
    17: "mmm-yy",
    18: "h:mm AM/PM",
    19: "h:mm:ss AM/PM",
    20: "h:mm",
    21: "h:mm:ss",
    22: "m/d/yy h:mm",
    # 27..36和50..58在不同语言中有不同的定义,甚至类型都不一样。
    # 只使用用户级的格式代码并不能做到自适应。
    27: '[$-404]e/m/d;yyyy"年"m"月";[$-411]ge.m.d;yyyy"年" mm"月" dd"日"',
    28: '[$-404]e"年"m"月"d"日";m"月"d"日";[$-411]ggge"年"m"月"d"日";mm-dd',
    29: '[$-404]e"年"m"月"d"日";m"月"d"日";[$-411]ggge"年"m"月"d"日";mm-dd',
    30: "m/d/yy;m-d-yy;m/d/yy;mm-dd-yy",
    31: 'yyyy"年"m"月"d"日";yyyy"年"m"月"d"日";yyyy"年"m"月"d"日";yyyy"년" mm"월" dd"일"',
    32: 'hh"時"mm"分";h"时"mm"分";h"時"mm"分";h"시" mm"분"',
    33: 'hh"時"mm"分"ss"秒";h"时"mm"分"ss"秒";h"時"mm"分"ss"秒";h"시" mm"분" ss"초"',
    34: '上午/下午hh"時"mm"分";上午/下午h"时"mm"分";yyyy"年"m"月";yyyy-mm-dd',
    35: '上午/下午hh"時"mm"分"ss"秒";上午/下午h"时"mm"分"ss"秒";m"月"d"日";yyyy-mm-dd',
    36: '[$-404]e/m/d;yyyy"年"m"月";[$-411]ge.m.d;yyyy"年" mm"月" dd"日"',
    37: "#,##0_);(#,##0)",
    38: "#,##0_);[Red](#,##0)",
    39: "#,##0.00_);(#,##0.00)",
    40: "#,##0.00_);[Red](#,##0.00)",
    41: r'_(* #,##0_);_(* \(#,##0\);_(* "-"_);_(@_)',
    42: r'_("$"* #,##0_);_("$"* \(#,##0\);_("$"* "-"_);_(@_)',
    43: r'_(* #,##0.00_);_(* \(#,##0.00\);_(* "-"??_);_(@_)',
    44: r'_("$"* #,##0.00_)_("$"* \(#,##0.00\)_("$"* "-"??_)_(@_)',
    45: "mm:ss",
    46: "[h]:mm:ss",
    47: "mmss.0",
    48: "##0.0E+0",
    49: "@",
    50: '[$-404]e/m/d;yyyy"年"m"月";[$-411]ge.m.d;yyyy"年" mm"月" dd"日"',
    51: '[$-404]e"年"m"月"d"日";m"月"d"日";[$-411]ggge"年"m"月"d"日";mm-dd',
    52: '上午/下午hh"時"mm"分";yyyy"年"m"月";yyyy"年"m"月";yyyy-mm-dd',
    53: '上午/下午hh"時"mm"分"ss"秒";m"月"d"日";m"月"d"日";yyyy-mm-dd',
    54: '[$-404]e"年"m"月"d"日";m"月"d"日";[$-411]ggge"年"m"月"d"日";mm-dd',
    55: '上午/下午hh"時"mm"分";上午/下午h"时"mm"分";yyyy"年"m"月";yyyy-mm-dd',
    56: '上午/下午hh"時"mm"分"ss"秒";上午/下午h"时"mm"分"ss"秒";m"月"d"日";yyyy-mm-dd',
    57: '[$-404]e/m/d;yyyy"年"m"月";[$-411]ge.m.d;yyyy"年" mm"月" dd"日"',
    58: '[$-404]e"年"m"月"d"日";m"月"d"日";[$-411]ggge"年"m"月"d"日";mm-dd',
    59: "t0",
    60: "t0.00",
    61: "t#,##0",
    62: "t#,##0.00",
    67: "t0%",
    68: "t0.00%",
    69: "t# ?/?",
    70: "t# ??/??",
    71: "ว/ด/ปปปป",
    72: "ว-ดดด-ปป",
    73: "ว-ดดด",
    74: "ดดด-ปป",
    75: "ช:นน",
    76: "ช:นน:ทท",
    77: "ว/ด/ปปปป ช:นน",
    78: "นน:ทท",
    79: "[ช]:นน:ทท",
    80: "นน:ทท.0",
    81: "d/m/bb",
}

EPOCH = datetime.datetime(1899, 12, 30)
"""Excel元年。

Excel没有专门的日期/时间类型,而是用数字代替,就像Unix时间戳一样。
单元格内存储的数值表示从1900年1月0日起、包含1900年2月29日在内的天数(???)。
而且,作为深度本地化的受害者,是按计算机设置的时区计算的。

- 0.5 = 当地时间1900年1月0日12:00:00,Excel无法显示1899年及以前的日期
- π = 当地时间1900年1月3日03:23:53.605
- 7162+42314/86400 = 当地时间1919年8月10日11:45:14
- 25569 = 当地时间1970年1月1日00:00:00

然而,因为早期Macintosh电脑不支持1904年以前的日期,所以改成了1904年1月0日起的天数。
直到今天,仍然可以在工作簿选项中自选“使用1904日期系统”。大混乱。
为了不被迫感受小小的Excel震撼,建议不要在数据交换中使用Excel的日期与时间。

https://learn.microsoft.com/en-us/office/troubleshoot/excel/wrongly-assumes-1900-is-leap-year
https://learn.microsoft.com/en-us/office/troubleshoot/excel/1900-and-1904-date-system
"""


def column_letter_to_number(s: str) -> int:
    """转换字母列名到从0开始的列编号。

    - "A" → 0
    - "AAA" → 702
    """
    # 注意编号起始值与下题不同!
    # https://leetcode.com/problems/excel-sheet-column-number/
    if len(s) > 7 or not s.isascii() or not s.isupper():
        raise ValueError("错误的列名:应为一个或多个大写字母")
    return reduce(lambda s, c: s * 26 + ord(c) - 64, s, 0) - 1


def column_number_to_letter(n: int) -> str:
    """转换从0开始的列编号到字母列名。

    - 0 → "A"
    - 702 → "AAA"
    """
    # 注意编号起始值与下题不同!
    # https://leetcode.com/problems/excel-sheet-column-title/
    s = ""
    while n >= 0:
        s = chr(n % 26 + 65) + s
        n = n // 26 - 1
    return s


def parse_cell_reference(address: str) -> tuple[int, int]:
    """转换A1和R1C1这样对单个单元格的引用到从0开始的行列索引。

    即使在Excel选项中选择了R1C1格式,存储的文件中也仍然采用A1格式,单元格也好,公式也是。
    天哪,这居然是个正确的决定!那为什么各种名称要存储成本地化的字符串?
    """
    if match := re.fullmatch(r"([A-Z]+)([0-9]+)", address.upper()):
        return int(match.group(2)) - 1, column_letter_to_number(match.group(1))
    elif match := re.fullmatch(r"R(\d+)C(\d+)", address, re.ASCII | re.IGNORECASE):
        return int(match.group(1)) - 1, int(match.group(2)) - 1
    else:
        raise ValueError("错误的单元格引用格式:应类似A1或R1C1")


def pool(index_base: int = 0) -> defaultdict[Any, int]:
    """创建一个值池,即从值到加入顺序(从指定索引开始)的映射。

    值是新的时,产生新的索引,否则返回原有索引。用于共享字符串池、样式表索引的生成。

        x = pool(100)
        assert x["foo"] == 100
        assert x["bar"] == 101
        assert x["baz"] == 102
        assert x["bar"] == 101
        assert x["foobar"] == 103
    """
    x = defaultdict(lambda: len(x) + index_base)
    return x


def read(
    file: Union[str, os.PathLike[str], IO[bytes]]
) -> dict[str, defaultdict[tuple[int, int], CellValue]]:
    """读取指定的工作簿。

    返回对象可以以下列形式使用:

        workbook = read("input.xlsx")
        worksheet = workbook["Sheet1"]
        cell = worksheet[8, 4]  # 第8行第4列,下标从0开始
        print("Sheet1!E9的值是", cell)

    由Excel文件格式保证工作表字典键已排序。
    """
    with zipfile.ZipFile(file, "r") as z:
        # 先定义一个方便函数。
        def xq(filename: str, xpath: str) -> Iterator[ET.Element]:
            """从已打开的这个压缩包中解析XML文件、直达要害节点。"""
            with z.open(filename, "r") as f:
                return ET.parse(f).getroot().iterfind(xpath)

        # 从工作簿关系文件中提取从rId×××到sheet×××.xml的映射。
        workbook_rels = {
            # Microsoft Excel写出的是相对路径,但openpyxl会写出相对于压缩包根的绝对路径……
            # ZipFile需要的是相对于压缩包根的相对路径。
            el.get("Id", ""): "xl/" + el.get("Target", "").removeprefix("/xl/")
            for el in xq("xl/_rels/workbook.xml.rels", "./{*}Relationship")
        }

        # 从工作簿清单中按顺序枚举工作表,根据记录的关系,产生从工作表名到工作表XML文件路径的映射。
        sheets = {
            el.get("name", ""): workbook_rels[
                el.get(
                    # openpyxl有时候不输出"r"命名空间……
                    "{http://schemas.openxmlformats.org/officeDocument/2006/relationships}id",
                    el.get("id", ""),
                )
            ]
            for el in xq("xl/workbook.xml", "./{*}sheets/{*}sheet")
        }

        # 取出共享字符串池为字符串列表。共享字符串池的下标从0开始。
        if "xl/sharedStrings.xml" in z.NameToInfo:
            shared_strings = [
                "".join(t.text or "" for t in el.iterfind(".//{*}t"))
                for el in xq("xl/sharedStrings.xml", "./{*}si")
            ]
        else:
            shared_strings = []

        # 取出工作簿的样式表。
        if "xl/styles.xml" in z.NameToInfo:
            number_formats = NUMBER_FORMATS | {
                int(el.get("numFmtId", "-1")): el.get("formatCode", "")
                for el in xq("xl/styles.xml", "./{*}numFmts/{*}numFmt")
            }
            style_number_formats = [
                number_formats.get(int(el.get("numFmtId", "")), "General")
                for el in xq("xl/styles.xml", "./{*}cellXfs/{*}xf")
                if el.get("numFmtId")
            ] or ["General"]
        else:
            style_number_formats = ["General"]

        # 读取工作表数据。
        workbook: dict[str, defaultdict[tuple[int, int], CellValue]] = {}
        for sheet_name, filename in sheets.items():
            workbook[sheet_name] = defaultdict(
                str,
                (
                    (
                        parse_cell_reference(el.get("r", "")),
                        _primitive_to_value(el, shared_strings, style_number_formats),
                    )
                    for el in xq(filename, "./{*}sheetData/{*}row/{*}c")
                ),
            )
        return workbook


def write(
    file: Union[str, os.PathLike[str], IO[bytes]],
    data: Mapping[str, Iterable[tuple[tuple[int, int], CellValue]]],
    styler: Callable[[CellStyle, str, int, int, CellValue], object] = lambda *_: None,
) -> None:
    """向指定的文件中写出Excel 2007工作簿。

    数据由从工作表名到内容的映射给出,内容类似numpy.ndenumerate产生的迭代器,只要能被下列代码输出即可。

        for sheet_name in data:
            print("【工作表", sheet_name, "】")
            for (i, j), cell in data[sheet_name]:
                print("第", i, "行第", j, "列的数据是", cell)

    因此,根据使用需求不同,数据可以以各种结构存放,交给本函数的用户决定。

    如果数据是二维列表,那么像下面这样调用。

        sheet = [["A1", "B1"], ["A2", "B2"]]
        xlsx.write("output.xlsx", {
            "Sheet1": (
                ((i, j), cell)
                for i, row in enumerate(sheet)
                for j, cell in enumerate(row)
            ),
        })

    如果数据是二维字典,那么像下面这样调用。
    Excel要求单元格必须按顺序写入,否则认为文件损坏。
    如果能确保字典键按顺序排列,则可删去sorted。

        sheet = {0: {0: "A1", 1: "B1"}, 1: {0: "A2", 1: "B2"}}
        xlsx.write("output.xlsx", {
            "Sheet1": (
                ((i, j), cell)
                for i, row in sorted(sheet.items())
                for j, cell in sorted(row.items())
            ),
        })

    如果数据是复合键字典,那么像下面这样调用。
    同样,如果能确保字典键按顺序排列,则可删去sorted。

        sheet = {(0, 0): "A1", (0, 1): "B1", (1, 0): "A2", (1, 1): "B2"}
        xlsx.write("output.xlsx", {"Sheet1": sorted(sheet.items())})

    如果数据是NumPy数组,那么像下面这样调用。

        sheet = np.array([["A1", "B1"], ["A2", "B2"]])
        xlsx.write("output.xlsx", {"Sheet1": np.ndenumerate(sheet)})

    通过styler来程序化地指定单元格的样式。传入的函数如下述。

        def styler(style: CellStyle, sheet_name: str, row: int, column: int, value: CellValue):
            # 示例:设置B列为粗体、深蓝色字、浅蓝色背景。
            if column == 1:
                style.bold = True
                style.color = (0x12, 0x34, 0x56)
                style.fill = (0xab, 0xcd, 0xef)

    因为并不知道styler会在哪些单元格设置格式,实际只能指定指定了内容的单元格的样式。
    当然,可以通过指定单元格内容为空字符串来提示需要在对应单元格上执行styler。
    sheet_name还会传入空字符串,row、column参数还会传入−1。这时需要返回工作簿、行、列等的默认样式。
    """
    # 接下来将会多次出现的r:id="rId×××"并不是只有这一种固定格式。
    # OOXML是通过像Java那样狂写XML配置来表明文件之间关联的。
    # 因此,只要引用标识符一致性正确,理论上文件名随便是什么都没问题。
    # 然而,第三方软件完全不理解这一点,直接使用文件名和关系ID的索引来分析文件的库不在少数——本模块也是。
    # 为了尽可能兼容,还是按照Office的所作所为来做比较好。

    # https://insutanto.net/tag/Excel
    # https://zhuanlan.zhihu.com/p/386085542

    with zipfile.ZipFile(file, "w") as zf:
        zf.writestr(
            "[Content_Types].xml",
            """<?xml version="1.0" encoding="utf-8" standalone="yes"?>
<Types xmlns="http://schemas.openxmlformats.org/package/2006/content-types">
    <Default Extension="rels" ContentType="application/vnd.openxmlformats-package.relationships+xml"/>
    <Default Extension="xml" ContentType="application/xml"/>
    <Override PartName="/xl/workbook.xml" ContentType="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet.main+xml"/>
    %s
    <Override PartName="/xl/sharedStrings.xml" ContentType="application/vnd.openxmlformats-officedocument.spreadsheetml.sharedStrings+xml"/>
    <Override PartName="/xl/styles.xml" ContentType="application/vnd.openxmlformats-officedocument.spreadsheetml.styles+xml" />
</Types>"""
            % "".join(
                f'<Override PartName="/xl/worksheets/sheet{i}.xml" ContentType="application/vnd.openxmlformats-officedocument.spreadsheetml.worksheet+xml"/>'
                for i in range(1, len(data) + 1)
            ),
        )

        zf.writestr(
            "_rels/.rels",
            """<?xml version="1.0" encoding="utf-8" standalone="yes"?>
<Relationships xmlns="http://schemas.openxmlformats.org/package/2006/relationships">
    <Relationship Id="rId1" Type="http://schemas.openxmlformats.org/officeDocument/2006/relationships/officeDocument" Target="xl/workbook.xml"/>
</Relationships>""",
        )

        zf.writestr(
            "xl/workbook.xml",
            """<?xml version="1.0" encoding="utf-8" standalone="yes"?>
<workbook xmlns="http://schemas.openxmlformats.org/spreadsheetml/2006/main" xmlns:r="http://schemas.openxmlformats.org/officeDocument/2006/relationships">
    <workbookPr/>
    <sheets>
        %s
    </sheets>
    <calcPr calcId="114514"/>
</workbook>"""
            % "".join(
                f'<sheet name="{sheet_name}" sheetId="{i}" r:id="rId{i}"/>'
                for i, sheet_name in enumerate(data, 1)
            ),
        )

        # 即使是只用到一次的字符串也会存在共享字符串池中,未见有文件用单元格类型t="inlineStr"。
        shared_strings: defaultdict[str, int] = pool()

        cell_style = CellStyle()
        number_formats: defaultdict[str, int] = pool(176)  # 小索引都被Excel自带的数值格式占掉了
        number_formats |= {v: k for k, v in NUMBER_FORMATS.items()}
        fonts: defaultdict[str, int] = pool()
        fills: defaultdict[str, int] = pool(2)  # 似乎0号和1号填充被占用了,必须填充垃圾样式
        borders: defaultdict[str, int] = pool(1)  # 这个大概也有问题,保险起见填个垃圾再说
        cell_xfs: defaultdict[tuple[int, int, int, int], int] = pool()

        def style(sheet_name: str, i: int, j: int, value: CellValue) -> int:
            cell_style.reset()
            cell_style.number_format = (
                _value_to_cell(value, shared_strings)[0] or cell_style.number_format
            )
            styler(cell_style, sheet_name, i, j, value)
            return cell_xfs[
                number_formats[cell_style.number_format],
                fonts[cell_style.font_spec()],
                fills[cell_style.fill_spec()],
                borders[cell_style.border_spec()],
            ]

        # 将默认样式作为初始项目填入cell_xfs。
        style("", -1, -1, None)

        for sheet_id, (sheet_name, sheet) in enumerate(data.items(), 1):
            default_style = style(sheet_name, -1, -1, None)
            default_width = cell_style.width
            xml_head = f"""<?xml version="1.0" encoding="utf-8" standalone="yes"?>
<worksheet xmlns="http://schemas.openxmlformats.org/spreadsheetml/2006/main" xmlns:r="http://schemas.openxmlformats.org/officeDocument/2006/relationships">
<sheetFormatPr customHeight="1" defaultRowHeight="{cell_style.height}" defaultColWidth="{default_width}"/>
<cols>"""
            xml_body = "</cols><sheetData>"
            columns = {16384: ""}
            old_i = -1
            for i, row in groupby(sheet, lambda x: x[0][0]):
                if i <= old_i:
                    raise ValueError("单元格行号应已排序")
                if i >= 1048576:
                    raise ValueError("超出范围的单元格")
                xml_body += f'<row r="{i + 1}" s="{style(sheet_name, i, -1, None)}" customFormat="1" ht="{cell_style.height}" customHeight="1">'
                old_j = -1
                for (_, j), cell in row:
                    if j <= old_j:
                        raise ValueError("一行中的单元格应按列号排序")
                    if j >= 16384:
                        raise ValueError("超出范围的单元格")
                    old_j = j
                    if j not in columns:
                        columns[
                            j
                        ] = f'<col min="{j + 1}" max="{j + 1}" style="{style(sheet_name, -1, j, None)}" width="{cell_style.width}" customWidth="1"/>'
                    xml_body += f'<c r="{column_number_to_letter(j)}{i + 1}" s="{style(sheet_name, i, j, cell)}" {_value_to_cell(cell, shared_strings)[1]}</c>'
                xml_body += "</row>"
            old_j = -1
            for j in sorted(columns):
                if j != old_j + 1:
                    xml_head += f'<col min="{old_j + 2}" max="{j}" style="{default_style}" width="{default_width}" customWidth="1"/>'
                xml_head += columns[j]
            zf.writestr(
                f"xl/worksheets/sheet{sheet_id}.xml",
                xml_head + xml_body + "</sheetData></worksheet>",
            )

        # rId1..rId(N) = 工作表。
        # rId(N+1) = 共享字符串池。
        # rId(N+2) = 样式表。
        # sheets first for rId# then theme > styles > sharedStrings
        zf.writestr(
            "xl/_rels/workbook.xml.rels",
            """<?xml version="1.0" encoding="utf-8" standalone="yes"?>
<Relationships xmlns="http://schemas.openxmlformats.org/package/2006/relationships">
    %s
    <Relationship Target="sharedStrings.xml" Type="http://schemas.openxmlformats.org/officeDocument/2006/relationships/sharedStrings" Id="rId%d"/>
    <Relationship Target="styles.xml" Type="http://schemas.openxmlformats.org/officeDocument/2006/relationships/styles" Id="rId%d"/>
</Relationships>"""
            % (
                "".join(
                    f'<Relationship Target="worksheets/sheet{i}.xml" Type="http://schemas.openxmlformats.org/officeDocument/2006/relationships/worksheet" Id="rId{i}"/>'
                    for i in range(1, len(data) + 1)
                ),
                len(data) + 1,
                len(data) + 2,
            ),
        )

        # 最后写入共享字符串池和样式表,因为在写入其他组件时会更新这些表。
        zf.writestr(
            "xl/sharedStrings.xml",
            # 如果不设置xml:space="preserve"的话,字符串中的前导和尾随空格会被XML解析器吞掉。
            # 设置了就一定有用吗?
            """<?xml version="1.0" encoding="utf-8" standalone="yes"?>
<sst uniqueCount="%d" count="%d" xmlns="http://schemas.openxmlformats.org/spreadsheetml/2006/main" xml:space="preserve">%s</sst>"""
            % (
                len(shared_strings),
                len(shared_strings),
                "".join(
                    f"<si><t>{html.escape(val)}</t></si>" for val in shared_strings
                ),
            ),
        )

        zf.writestr(
            "xl/styles.xml",
            """<?xml version="1.0" encoding="utf-8" standalone="yes"?>
<styleSheet xmlns="http://schemas.openxmlformats.org/spreadsheetml/2006/main">
    <numFmts>%s</numFmts>
    <fonts>%s</fonts>
    <fills>
        <fill><patternFill patternType="none"/></fill>
        <fill><patternFill patternType="gray125"/></fill>
        %s
    </fills>
    <borders>
        <border/>
        %s
    </borders>
    <cellStyleXfs>%s</cellStyleXfs>
    <cellXfs>%s</cellXfs>
    <cellStyles>
        <cellStyle name="a" xfId="0" builtinId="0" customBuiltin="1"/>
    </cellStyles>
</styleSheet>"""
            % (
                "".join(
                    f'<numFmt numFmtId="{i}" formatCode="{html.escape(number_format)}"/>'
                    for number_format, i in number_formats.items()
                    if i >= 128
                ),
                "".join(fonts),
                "".join(fills),
                "".join(borders),
                # cellStyleXfs中的第0项是所有工作表单元格的默认样式。
                "".join(
                    f'<xf numFmtId="{number_format}" fontId="{font}" fillId="{fill}" borderId="{border}"/>'
                    for number_format, font, fill, border in cell_xfs
                ),
                "".join(
                    f'<xf xfId="{i}" numFmtId="{number_format}" fontId="{font}" fillId="{fill}" borderId="{border}"/>'
                    for i, (number_format, font, fill, border) in enumerate(cell_xfs)
                ),
            ),
        )


def _value_to_cell(
    x: CellValue, shared_strings: Mapping[str, int]
) -> tuple[Optional[str], str]:
    """转换Python数据到单元格数值格式和SpreadsheetML <c>节点的属性和内容。

    对单元格格式没有特殊要求时返回None。返回值的XML片段应该嵌入在"<c "和"</c>"之间。
    """
    if x is None:
        # 空白单元格表示空字符串,所以必须另寻空值的表示。
        # #N/A表示空值是贴切的。只有数据科学家才用NaN表示缺损数据。
        # #NULL!是异常,类似计算min([])时发生的ValueError,不应采用。
        return None, 't="e"><v>#N/A</v>'
    elif isinstance(x, str):
        # 字符串会自动加入到共享字符串池。
        return None, f't="s"><v>{shared_strings[x]}</v>'
    elif isinstance(x, bytes):
        return "\"bytes\"('@')", f't="s"><v>{shared_strings[x.hex(" ").upper()]}</v>'
    elif isinstance(x, bool):
        # 布尔值,用0和1表示。
        return None, f't="b"><v>{x:d}</v>'
    elif isinstance(x, float) and math.isnan(x):
        # 借用#NUM!表示NaN。
        return None, f't="e"><v>#NUM!</v>'
    elif isinstance(x, float) and math.isinf(x):
        # 借用#DIV/0!表示无穷大。
        # 实际上Excel中=0/0会被计算为#DIV/0!(应为NaN),而=114^514会被计算为#NUM!(应为+∞)。
        # 不要在意这些细节。
        return None, f't="e"><v>#DIV/0!</v>'
    elif isinstance(x, datetime.datetime):
        return "yyyy-mm-dd hh:mm:ss", f"><v>{(x - EPOCH).total_seconds() / 86400}</v>"
    elif False:
        # 写入公式的话,要用<f>节点。<v>也能出现,用来缓存上回计算结果。
        # 能坚持不重算的程度还和工作簿的calcId有关。
        # 不过并没有加入公式支持的打算。
        return None, f"><f>{html.escape(...)}</f>"
    else:
        # 常规数值,类型省略。
        return None, f"><v>{x}</v>"
    # 顺便介绍一下剩下的Excel异常。
    # #NAME?对应NameError。
    # #REF!用C语言的话来说就是use after free。Python也有意思很接近的ReferenceError。
    # 带有垃圾回收机制的Python对象为什么也会在释放后又被使用?原因是弱引用,参照weakref标准库模块。
    # #VALUE!对应TypeError,不是ValueError。
    # #GETTING_DATA大概是正在从外部数据源获取数据时的占位值。
    # 介绍Excel的文章一般会用公式=NA()来人为制造一个空值。
    # 其实直接在单元格中输入“#N/A”就能创建空值。而且,其他类型的错误值也都能用直接输入的方式创造出来。
    # 这些值还能作为字面量在公式中导致报错,例如=IF(A1>0,A1-1,#NUM!)。Excel,很神奇吧?


def _cell_to_primitive(el: ET.Element, shared_strings: list[str]) -> CellPrimitive:
    """转换<c>元素到Python数据。"""
    t = el.get("t")
    value = el.find("./{*}v")
    value = value.text or "" if value is not None else ""
    formula = el.find("./{*}f")
    formula = formula.text or "" if formula is not None else ""

    if t == "s":
        return shared_strings[int(value)]
    elif t == "b":
        return value != "0"
    elif t == "str" or not value:
        # str类型表示公式计算结果是字符串类型,值不经过共享字符串池。
        # 空白单元格以空字符串表示。
        # 有时会有只有样式(s属性)的单元格,也按此处理。
        pass
    elif t == "e":
        if value == "#N/A":
            return None
        else:
            return math.nan
    else:
        return float(value)


def _primitive_to_value(
    el: ET.Element, shared_strings: list[str], style_number_formats: list[str]
) -> CellValue:
    """从单元格数值格式解析原始Python数据到复杂数据。"""
    value = _cell_to_primitive(el, shared_strings)
    number_format = style_number_formats[int(el.get("s", "0"))]
    if number_format.startswith('"') and '"(' in number_format:
        f = number_format.removeprefix('"').partition('"(')[0]
        if f == "bytes" and isinstance(value, str):
            return bytes.fromhex(value)
    # 删除段开头的方括号表达式,这可能包括货币和语言选项、特殊数字格式、颜色等。
    # 转义是简单替换:例如,"\\\"表示显示三个反斜杠,"\"\"表示显示一个反斜杠和一个引号。
    format_codes = re.sub(
        r'(^|(?<=;))(\[[^\[\]]+\])+|[\\_].|"[^"]*"|[-+$/():!^&\'~{}<>= ]+',
        "",
        number_format.casefold(),
    )
    while format_codes.endswith(";general"):
        format_codes = format_codes.removesuffix(";general")
    if format_codes == "general":
        format_codes = ""
    if (
        isinstance(value, float)
        and math.isfinite(value)
        and "." not in format_codes
        and ("0" in format_codes or "#" in format_codes)
    ):
        return int(value)
    if (
        isinstance(value, float)
        and value >= 0
        and re.search(r"[ymdhsgebวดปชนท]", format_codes, re.IGNORECASE)
    ):
        return EPOCH + datetime.timedelta(value)
    return value


def f(style: CellStyle, sheet_name: str, i, j, x):
    if sheet_name.endswith("0"):
        style.border_diagonal_up = True
        style.border_diagonal_style = "thick"
        style.border_diagonal_color = "#987654"
    style.fill = "#abcdef" if type(x) is str else "#114514"
    style.bold = i == 12
    if i == 12:
        style.height = 24
    if j == 6:
        style.width = 114
    style.border_bottom_color = "#e9e981"
    style.border_bottom_style = "thick"


if __name__ == "__main__":
    write(
        "output.xlsx",
        {
            "工作表114514": sorted(
                {
                    (12, 6): "妙的",
                    (12, 1): "不妙的",
                    (12, 7): 114.514,
                    (12, 4): math.inf,
                    (11, 2): "妙的",
                    (11, 3): "不妙的",
                    (11, 4): 114.514,
                    (11, 5): math.nan,
                    (13, 7): b"BYTES\0--in excel!",
                    (13, 8): datetime.datetime(1919, 8, 10, 11, 45, 14),
                }.items()
            ),
            "工作表1919810": (),
        },
        f,
    )
    from pprint import pprint
    from timeit import timeit

    pprint(read("output.xlsx"))