MENU

再造企业微信机器人

June 27, 2021 • 程序阅读设置

Prelude

为什么叫“再造”呢?有再造,那么就一定有第一个:GitHub: Bokjan/DiaoBot

DiaoBot是主体完工于2019年9月的企业微信机器人框架,完整地实现了当时企业微信机器人所有能力的封装。但其存在一些不论是当时还是现在都会令人感觉比较奇怪的一些缺陷:

  • 回调server基于mongoose的HTTP能力,整合较奇怪且其基于select()的复用,性能低(?真的需要性能吗)
  • 固定工作线程数,但工作线程中的封装IO操作却是同步阻塞的
  • 所有定时任务均新开处理线程,且定时任务分派线程原理粗暴(每一分钟唤醒,遍历所有注册项并启动对应项)
  • 基于信号的优雅退出过程慢(定时任务线程睡眠中,须唤醒才会退出)
  • 有着一个比较奇怪的二进制模型:主可执行程序(diaobotd)、公用SDK动态链接库(libdiaobot.so)以及不定数量的用户业务逻辑动态链接库(由diaobotd读取配置文件动态挂载)
  • 回调处理的抽象设计做得不太好
  • ……

我现在惊讶地发现有一个问题其实是有解的:你可以找到一台既能够接收内网回调、又能够请求公网API的机器。如果说当你想干点什么的时候,却发现这个必要的工具其实不好用,是不是就会想去先把工具改造一下了?所以,到了差不多两年之后的今天,GitHub: Bokjan/wcbot横空出世。

有的朋友可能会说了:啊,写工具就应该用Go或者脚本语言来,你看你C++搞的这玩意缺陷又多开发效率又低。这话当然很对,不过我并不是要写工具,只是好玩而已。玩具就是用来消磨时间的,不然还能叫玩具吗?

本文正文主要分为两部分,前面介绍设计和实现,后面介绍如何基于此库来开发,也算是一个备忘。

设计和实现

概览

wcbot是LibUV驱动的、多线程纯异步的、POSIX兼容的、用于企业微信机器人开发的静态链接库。其线程模型为1+N,即一个主线程处理进来的请求以及分配cron定时任务,N个工作线程处理实际业务逻辑。为了兼容tlinux 2.2那老掉牙的gcc 4.8.5,代码仅使用C++11特性。

企业微信机器人需要支持的特性其实很简单,就是消息的收发。收自然指的是回调消息;发呢,除了针对回调消息的回复以外,还可以主动发送,我们认为还需要设计一种定时任务。

  • 定时任务。用户将某个任务注册成定时任务,由框架自动根据时间触发。1分钟粒度。
  • 回调消息。用户注册一个回调消息任务,使用框架内的消息数据结构。框架将收到的HTTP加密消息转成消息数据结构,用户的回调消息任务最后将回复消息的内容正确设置即可(也可以不回复)。

那么简而言之,在业务逻辑之下的是框架逻辑,框架逻辑之下的是网络能力。网络能力有扩展性,但与框架有一定的耦合。

从源码结构上来看,组件分为以下几种:

  • Core。框架主体的实现,包括主线程、工作线程的主循环,以及它们所需要的数据结构等。还有对用户暴露的引擎接口。
  • Codec。编解码器,框架从uv收到TCP数据,给编解码器校验接入了系统的各种协议。目前只实现了入请求的HTTP/1.1协议。
  • Job。工作,一种基于状态机的异步逻辑抽象,是本框架和用户都要使用到的最重要的结构。后文详细介绍。特意没有使用协程。
  • Utility。各种实用工具。
  • WeCom。企业微信机器人的各类消息结构及其编解码方法。
  • ThirdParty。第三方库,目前只有微信官方的WXBizMsgCrypt消息加解密库,及其依赖的tinyxml2

下面对重要部分进行详细介绍。

Core

Core是框架的核心部分,几乎全是与具体实现无关的内容。其中包含了暴露给用户的Engine单例类。

Engine

class Engine final {
 public:
  static Engine& Get();
  EngineImpl& GetImpl() { return *PImpl; }
  int Run();
  bool ParseArguments(int argc, char* argv[]);
  const std::string& GetCustomConfigPath() const;
  void Stop();
  bool Initialize();

  void RegisterServerCodec(Codec* CodecPtr);
  void RegisterClientCodec(Codec* CodecPtr);

  void RegisterCallbackHandler(FN_CreateCallbackHandlerJob Function);
  void RegisterCronJob(const CronTrigger& Trigger, FN_CreateJob Function);

 private:
  Engine();
  ~Engine();
  Engine(const Engine&) = delete;
  Engine(const Engine&&) = delete;

  EngineImpl* PImpl;
};

以上便是引擎的全部内容,是用户需要理解的仅有的3个概念的第一个:引擎核心。用户实现自己的回调处理器和定时任务,再调用引擎来注册即可。同样地,在用户的可执行程序中,需要把命令行参数等交给引擎来初始化,并且最终调用它的Run()把整个程序循环跑起来。可以注意到引擎用到了PImpl,这是因为其中的细节暴露给用户并不是好事。

EngineImpl

引擎实现。可以理解为库的核心逻辑,以及主线程逻辑都在这里。它持有这些东西:

  • libuv相关:主循环、信号处理器、定时任务定时器。
  • 网络相关:入/出编解码器列表、入请求TCP连接ID和句柄映射。
  • 工作分配相关:工作线程列表、工作线程分配器、定时任务时间轮。
  • 其他:框架配置项、企业微信消息加密器等。

其中有一些很好理解的,比如框架配置项、消息加解密、定时任务、主循环、信号处理等。还有一些看起来不直观的概念,在后续介绍中会一一说明。

WorkerThread

工作线程。这里实现了工作线程的逻辑(以及所有出的网络请求由工作线程直接处理,主线程只处理入的请求),其中ThreadContext是每个工作线程都持有的上下文信息,uv相关的主循环、信号处理器、idle任务都很好理解,还有一些工作线程特有的成员:

  • ITC相关:ITC事件队列、ITC Async。
  • 时间队列:睡眠队列和超时队列。
  • cURL相关:curl_multi_handle及其定时器。

时间队列也还是比较好理解的,就是实现了业务逻辑的睡眠和外部请求超时的处理。而ITC是下文要介绍的一个重要概念。

ITC

ITC,即Inter-Thread Communication,线程间通信。我们通过前文介绍已经知道,所有的工作(Job)均跑在工作线程上,而负责分配定时任务,以及负责收网络请求的,都是主线程。需要一种机制将主线程和工作线程关联起来。基于libuv的uv_async能力,我们抽象出一种ItcEvent

class ItcEvent {
 public:
  virtual ~ItcEvent() = default;
  virtual void Process() = 0;
};

ItcEvent通过ItcQueue在线程间传递,可以看出,传递到对应线程后,它调用Process()虚函数即可。每个工作线程均持有:

ItcQueue MainToWorkerQueue;
ItcQueue WorkerToMainQueue;
uv_async_t MainToWorkerAsync;
uv_async_t WorkerToMainAsync;

也就是一对ItcQueue,一个读一个写。当写方往队列中写入了新的事件时,写方同时会通过uv_async_send来激活对应的uv_async实现对目标线程的通知。目标线程在下一个tick时,就会处理自己的读队列中的事件了。

我们以TCP收发为例来看一看:

void TcpMainToWorker::Process() {
  // dispatch TCP request
  worker_impl::DispatchTcp(this->Buffer);
  // free memory
  this->DeleteThis();
}

void TcpWorkerToMain::Process() {
  // do uv write
  main_impl::SendTcpToClient(Buffer, ConnId, CloseConnection);
  // free memory
  this->DeleteThis();
}

使用这种ITC方式,虽然有频繁创建销毁ItcEvent(可改对象池)、有锁队列(可改CAS无锁)、虚函数调用开销等问题,但不失为一种简单粗暴的解决方案。

TimeQueue

时间队列实现了业务逻辑的睡眠和外部请求超时的处理,也就是下面的两种。

两种队列的实现都一样,采用一个链表。拿到用户给定的超时时间后,加上现在的epoch,从后往前找到对应的超时位置。经历到某个大于其超时时间的tick时,Job被继续驱动起来。

SleepQueue

任何Job都可以睡眠,睡眠时间到达后继续驱动状态机运转。

DelayQueue

延时队列存放的是IOJob,在设定的超时时间到达后,还没有IO返回,则发生超时。

延时队列还是有一点不一样的:由于我们希望多个请求复用同一个TCP连接,系统会给每一个出请求分配一个ID。所以延时队列还维护着一个请求ID到链表节点的映射,便于未超时完成IO操作时找到对应的节点并将其摘除。

TimeWheel

由外部驱动的时间轮,不再像老版本一样需要遍历全部定时任务,减小了开销。根据定时任务的特性,分成了分钟、小时、周天、月天、月五个轮。

Codec

框架网络层会把收到的网络数据存在buffer内,并且交给编解码器识别。如果某解码器识别出来buffer内存在某类协议,且buffer长度已完整,则会给主线程返回实际长度。主线程把对应长度的内容切成独立的buffer,通过ITC方式交给工作线程创建Job、反序列化,并且执行业务逻辑。

Job

基本概念

Job(下称“任务”)是框架中非常重要的一个概念。它是工作线程中运行的一个状态机(很简单时可以不写成状态机的样子),可以并发调用子任务,可以睡眠,可以发起网络操作(IOJob的派生类型)。框架内的所有逻辑都是由任务来嵌套实现的,暴露给用户的业务逻辑接口也都要求用户通过任务(或派生类型的任务)的形式来注入自己的逻辑。

对于用户来说,普通的任务用5种方法和1个属性:

  • int ErrCode 代表任务执行的错误码。为0成功,为负数是框架错误,为整数是业务错误(用户定义)。
  • virtual void Do(Job *Trigger = nullptr) 驱动状态机继续工作。外部或内部想让状态机运动起来,就调用Do();如果是子任务完成了,驱动的Do会传入子任务的指针,供自己的父任务取出执行的结果
  • void Sleep(int Millisecond) 使任务睡眠一段时间,任务将在至少Millisecond毫秒后的第一个被tick唤醒,由工作线程主循环来调用Do()
  • void DeleteThis() 状态机到达最后一步,需要将自己给删除。
  • void InvokeChild(Job *Child, Job *DoArg = nullptr) 调用一个子任务。自己把子任务new出来后,一定要记得通过InvokeChild的方式来启动子任务,不要自己Child->Do(DoArg)InvokeChild会正确维护任务间的关系。
  • void NotifyParent() 子任务完成,想返回结果给父任务时,通过NotifyParent来进行,不要直接Parent->Do(this)NotifyParent会正确维护任务间的关系。

使用InvokeChildNotifyParent,究其原因,是Job本身保存了Parent和所有Child的指针;自行设置它们的值不如调用成员函数来得简便稳妥。

你也许还关注到了SafeParent()这个函数——当任务的父任务因为某些原因(比如其他子任务超时)而先行销毁时,通过Parent指针来访问父任务便会发生错误了。为了防止这种情况,任务对象析构时,会自动将所有子任务的Parent设置为nullptrSafeParent()的作用便是当探测到这类情况时,返回job_impl::GuardJobObject的指针给业务逻辑,让程序不会因为非法内存访问挂掉。job_impl::GuardJobObjectjob_impl::GuardJob的一个静态实例,它实现了Job指定的所有虚函数,但实现为空,可以随意调用。

TcpHandlerJob 和 HttpHandlerJob

TcpHandlerJob由框架产生,创建时会得到一个保存有效分包的TcpMemoryBuffer,用户逻辑可以基于此从中读出数据,并且通过SendData传一个MemoryBuffer进去,发送给客户端对应数据。

HttpHandlerJob继承自TcpHandlerJob,增加了一大堆企业微信机器人回调消息相关的内容。从主线程的DispatchTcp开始,会创建一个HttpHandlerJob,并处理进来的HTTP请求。

HttpClientJob

用户可使用该任务发起HTTP请求,将请求参数设置在Request中,返回时从Response读出结果。底层是使用的标准cURL调用,比较好地支持了各种复杂情况。

SilentPushJob

用户可使用该任务让机器人静默推送一条ServerMessage,也就是说该任务并不会notify父任务。注意到explicit构造函数的参数是const wecom::ServerMessage &Message,这意味着传入的Message可以安全地在栈上构造;也意味着如果以new的方式创建了对应的消息,SilentPushJob并不会负责delete

WeComUploadJob

在发送FileServerMessage前,需要获得对应文件的media_id。WeComUploadJob便提供这样的能力。同样要注意传入的Data指针所指向的内容不会由WeComUploadJob负责释放,WeComUploadJob也不关心Data是栈上内存还是堆上内存。

MessageCallbackJob

我们要求用户注册到引擎的回调消息处理任务继承自MessageCallbackJob。用户可以从ClientMessage类型的Request成员中读取收到的实际消息,并且可选地将回复绑定到Response上。

Utility

一些实用工具。

Common

包含了一堆其他地方会用到的东西:

  • 读取文件
  • 字符串转整数
  • 计算MD5
  • Base 64 编解码
  • Url Decode

CronTrigger

CronTrigger数据结构用来表示定时任务的执行时机,其内部实现是数个表示时间点的bitmap。

HttpPackage

将HTTP请求和回复表示成C++的类,并且提供解析、生成的功能。前文说到的HttpClientJob在使用。

Logger

日志工具。框架自带了StderrLoggerSyncFileLogger两类,分别是打印到标准错误输出和同步地打印到文件。可以继承wcbot::Logger并实现virtual void Log(const char *, va_list)来扩展其他的日志方式。

MemoryBuffer

一个平平无奇的双倍扩容逻辑的堆上内存buffer。广泛用于项目内libuv相关的组件buffer。

WeCom

所有(目前尚未全部实现完)企业微信机器人消息的C++表示,提供解析、生成XML或JSON的能力。用户需要哪些类型的消息,就包含对应的头文件。对于MessageCallbackJob中类型为ClientMessageRequest,用户可以通过dynamic_cast等方式判断其真实消息类型。对于回调消息,只允许回复XmlServerMessage的派生消息(企业微信限制)。

小结

定时任务

用户继承Job并开发自己的逻辑,使用Engine::RegisterCronJob注册到引擎上。引擎有一个一分钟定时器,驱动时间轮,选取当前时刻应当执行的任务出来,从RoundRobinThreadDispatcher分配一个工作线程。主线程使用名为itc::JobCreateAndRun的ITC对象将用户注册的FN_CreateJob函数传递到工作线程。工作线程执行函数得到Job实例并驱动它运行起来。

回调消息

用户把创建处理任务的函数通过RegisterCallbackHandler注册到引擎上。主线程使用libuv监听指定的TCP端口,客户端消息进来后,送给HttpCodec检测;如果解码器返回了包体长度,则把对应长度切成独立的TcpMemoryBuffer(附带连接信息),通过itc::TcpMainToWorker事件分配给工作线程。工作线程创建HttpHandlerJob任务,解析buffer,并解码加密数据,创建用户的处理任务,并绑定Request。待用户任务返回后,将Response编码、加密,写入一个MemoryBuffer,并通过itc::TcpWorkerToMain发给主线程回复过去。

简易教程

定时任务

void QBJob::Do(Job* Trigger) {
  wcbot::wecom::TextServerMessage TSM;
  TSM.Content = "各位薅薅公子,明天发 Q 币!你的 30 Q 币用完了吗?";
  InvokeChild(new wcbot::SilentPushJob(TSM));
  DeleteThis();
}

QBJob继承Job,并实现了Do函数。任务构造了一个TextServerMessage消息,并通过SilentPushJob推送出去。

回调消息

void EchoCallbackJob::Do(Job *Trigger) {
  do {
    auto *TCM = dynamic_cast<wcbot::wecom::TextClientMessage *>(Request);
    if (TCM == nullptr) {
      break;
    }
    auto *TSM = new wcbot::wecom::TextServerMessage;
    TSM->Content = TCM->Content;
    this->SetResponse(TSM);
  } while (false);
  DeleteThis();
}

一个非常简单的echo回调。判断发过来的是否为纯文字消息,若是,则将内容原样回复过去;否则不回复任何内容。

主程序

static void RegisterQBJob();

int main(int argc, char *argv[]) {
  wcbot::Engine &Engine = wcbot::Engine::Get();
  Engine.ParseArguments(argc, argv);
  Engine.Initialize();

  RegisterQBJob();
  Engine.RegisterCallbackHandler(
      []() -> wcbot::MessageCallbackJob * { return new EchoCallbackJob(); });

  int Ret = wcbot::Engine::Get().Run();
  LOG_ALL("%d", Ret);
}

void RegisterQBJob() {
  wcbot::CronTrigger Trigger;
  Trigger.SetMonth(wcbot::CronTrigger::kEvery);
  Trigger.SetDayOfMonth(16);
  Trigger.SetHour(10);
  Trigger.SetMinute(0);
  wcbot::Engine::Get().RegisterCronJob(Trigger, []() -> wcbot::Job * { return new QBJob(); });
}

主程序请遵循这样的步骤:先ParseArguments,再Initialize引擎。之后可以做任意用户需要做的事情,比如这里是注册了定时任务和回调处理。最后将引擎Run()起来即可。

Archives Tip
QR Code for this page
Tipping QR Code