DolphinDB 计算节点使用指南

导读

为了提升 DolphinDB 在高并发读写场景下的性能与稳定性,DolphinDB 在架构上引入了计算节点 (compute node) 。计算节点接管了数据节点的部分职能,负责响应客户端的请求并返回结果。在架构层面,将集群的计算与存储进行分离,保证数据节点的软硬件资源有效服务于 IO 过程,从而提升集群写入性能与稳定性。同时 DolphinDB 集群管理者可以根据业务需求和集群负载情况,对计算资源和存储资源独立扩展。

本教程将重点介绍计算节点的特性、架构设计、应用实践及运维管理等方面内容,通过原理解析、案例分析来讲解如何部署和使用计算节点

面向读者

第1章 概览

1.1 为什么引入计算节点

数据节点可以同时承担计算任务和数据读写任务的执行,对于一些计算任务不重的场景来说,数据节点兼任计算任务没有问题,但是对于计算逻辑较复杂,并发度较高的计算密集场景下,可能会有以下影响:

DolphinDB 自版本1.30.14,2.00.1开始支持计算节点,能够有效地解决上述问题。

1.2 计算节点简介

1.2.1 核心特性

计算节点,顾名思义,即执行计算任务的节点,相比于数据节点,有以下2个核心特性:

不存储分布式表数据、元数据文件,是一个轻状态的服务组件。计算节点的启停流程十分精简,任何故障都可以通过重启节点来进行快速修复。同时,计算节点可以进行快速地扩缩容。 有关计算节点的启停步骤,参考:启动数据节点和计算节点

如下图所示3个数据节点、3个计算节点的集群:

图1-1

图中 C1-CNODE1 至 C1-CNODE3 为计算节点,P1-DNODE1 至 P3-DNODE1 为数据节点。

从文件系统来简单验证下计算节点的无状态特性。查看数据节点 volume 配置:

# 查看存储卷
select * from pnodeRun(getConfig{`volumes})
P1-DNODE1    /ssd/ssd0/wfHuang/volumes/P1-DNODE1
P2-DNODE1    /ssd/ssd0/wfHuang/volumes/P2-DNODE1
P3-DNODE1    /ssd/ssd0/wfHuang/volumes/P3-DNODE1

到操作系统去查看数据节点 P1-DNODE1 的 volume 目录:

图1-2

可看到 P1-DNODE1 作为数据节点,管理着事务 undo/redo 日志文件,和大量的数据块 CHUNKS 文件。但计算节点不需要配置 volume, 查看 C1-CNODE1 的默认 data 目录:

图1-3

可以看到计算节点 C1-CNODE1 不存储任何的分布式数据文件。

在分布式表读写场景中,由数据节点负责 map 阶段,计算节点负责 merge-reduce 阶段(详见1.2.2章节)。在纯内存对象(常规内存表、分区内存表、流数据表等)计算场景中,由计算节点独立执行。

执行一个分布式表查询 SQL,从文件层面来观察计算节点在数据查询发挥的作用。

查看计算节点 C1-CNODE1 打开的文件(open files):

[appadmin@compute volumes]$ lsof -p pidOfComputenode
skip library files
compute_w 38563 appadmin   19u     IPv4 319181009       0t0       TCP compute:8961->192.192.168.2:56780 (ESTABLISHED)
compute_w 38563 appadmin   20u     IPv4 319103550       0t0       TCP compute:8961->192.192.168.2:56786 (ESTABLISHED)
compute_w 38563 appadmin   22u     IPv4 319095550       0t0       TCP compute:8961->192.192.168.2:34842 (ESTABLISHED)
...skip 20 lines......
compute_w 38563 appadmin   44u     IPv4 319337513       0t0       TCP compute:42566->192.192.168.2:8961 (ESTABLISHED)

查看数据节点 P1-DNODE1 打开的文件:

[appadmin@support4 log]$ lsof -p pidOfDatanode
compute_w 99763 appadmin   33u     IPv4 49451766       0t0       TCP support4:8961->192.192.168.1:42474 (ESTABLISHED)
compute_w 99763 appadmin   36u     IPv4 49455618       0t0       TCP support4:8961->192.192.168.1:42562 (ESTABLISHED)
compute_w 99763 appadmin   39r      REG     8,17 272358102 177313580 /ssd/ssd0/wfHuang/volumes/P1-DNODE1/CHUNKS/test_compute/20220102/hQE/machines_2/1_00000058
compute_w 99763 appadmin   40r      REG     8,17 272358117 177313646 /ssd/ssd0/wfHuang/volumes/P1-DNODE1/CHUNKS/test_compute/20220106/hQE/machines_2/1_00000062
compute_w 99763 appadmin   41r      REG     8,17 272358131 177313802 /ssd/ssd0/wfHuang/volumes/P1-DNODE1/CHUNKS/test_compute/20220227/hQE/machines_2/1_00000072
compute_w 99763 appadmin   42r      REG     8,17 272358111 177313644 /ssd/ssd0/wfHuang/volumes/P1-DNODE1/CHUNKS/test_compute/20220109/hQE/machines_2/1_00000060
......skip hundreds of lines......

可以观察到计算节点 C1-CNODE1 打开了大量的 TCP 连接符,而数据节点 P1-DNODE1 除了 TCP 连接符外,还打开了大量的磁盘文件。

图1-4

如上图所示,计算节点 computenode1 负责解析 client 的请求,并协调数据节点 datanode[1-3] 读/写数据(即 map,分布式执行过程)。接收到 datanode[1-3] 的数据后,经过 reduce-merge 过程,将最终结果返回给客户端。存算分离的架构,使得数据节点的物理资源得到隔离,写入和读取数据更为稳定、高效。

1.2.2 通过一个 SQL 查询理解计算节点

当客户端向计算节点发起一个 sql 查询:

pt1 = loadTable(database("dfs://szstock"),"sztrade")  
select [HINT_EXPLAIN]
    securityID, min(price) as LowPx, max(price) as HighPx, sum(tradeQty) as vol
    , first(price) as OpenPx, last(price) as LastPx, sum(tradeQty * price) as val
from pt
where execType="F"
group by tradedate,securityID,minute(temporalParse(tradetime$STRING, "yyyyMMddHHmmssSSS")) as minute
order by tradedate,securityID, minute

通过[HINT_EXPLAIN]获取执行计划,整个 sql 的执行过程依次为 parse,map,merge,reduce 四个部分。

{
    "measurement": "microsecond",
    "explain": {
        "from": {
            "cost": 2
        },
        "map": {
            "partitions": {
                "local": 0,
                "remote": 320
            },
            "cost": 5189927,
            "detail": {
...skip many lines...
        },
        "merge": {
            "cost": 446588,
            "rows": 8522241,
            "detail": {
 ...skip many lines...
                }
            }
        },
        "reduce": {
            "sql": "select [98307] * from 702a9832b17f0000 order by tradedate asc,securityID asc,minute asc",
            "explain": {
                "sort": {
                    "cost": 296783
                },
                "rows": 8522241,
                "cost": 746374
            }
        },
        "rows": 8522241,
        "cost": 6691112
    }
}

其中,query parse、merge、reduce 过程发生在计算节点,map 阶段则并行发生于各数据节点。

图1-5

整个执行过程消耗资源具体如下表。

执行步骤

发生节点

消耗资源

时间(us)

query parse

computenode

cpu

2

map

datanode * 3

IO, network,cpu, memory

5189927

merge

computenode

cpu,network, memory

446588

reduce

computenode

cpu,network, memory

746374

该 SQL 总共涉及300个分区,各分区记录数在60-280万不等。merge-reduce 阶段分布处理了850万的数据。由 C1-CNODE1 承担的 merge-reduce 阶段总耗时占比为

(446588 + 746374)/ 6691112 = 17.8%

上述例子 map 阶段 where,group by 算子大大减少了数据量,因此计算节点的算量不大,如果是读取数据类的场景,比如数据导入导出,此比例会大大增加。

执行过程中,计算节点与数据节点的内存消耗情况如下图所示(node33:计算节点,node34-36:数据节点,下同)。

图1-6

可以看出在 map 阶段在数据节点,是分布式、并行执行的。在 merge-reduce 过程中计算节点的内存消耗逐渐增加。

1.3 引入计算节点的好处

1.3.1 保证写入数据的稳定性

在数据节点上执行复杂业务计算(如因子计算、机器学习等),会占用大量的内存。使得 cacheEngine 无法获取足够多的内存,而发生 cacheEngine 内存溢出等问题,导致写入延迟,甚至失败。

ChunkCacheEngine can't reclaim any memory after trying 13 times.
Transation failed, cacheEngine out of memory.

另外,使用 TSDB 引擎,数据刷盘时需进行排序,需要消耗大量的 CPU 资源以及排序 buffer。若 CPU 、内存资源不足时,TSDB 的写入性能会下降。

通过在架构上引入计算节点,采用计算与存储分离的架构设计,可以有效地保证集群数据写入的稳定性。

1.3.2 降低故障平均修复时间

使用计算节点的集群能显著地降低故障平均修复时间,尽可能地减少对业务的影响。核心原因是由于计算节点不管理分布式数据,重启可以在数秒内完成。当某个计算节点出现拥塞、无法响应等情况,且无法热修复的,重启即可使其恢复正常服务。

图1-7

如图,当数据节点启动时,流程中的蓝色标记部分:“回放 redo 日志”,“恢复损坏的 chunk” 等包含了大量的 IO 操作,实际生产环境中耗时为分钟级(具体视数据量大小而定)。而计算节点流程十分精简,可以在秒级完成启动。

1.4 计算节点安装

现用如下4台服务器部署集群:

P1: 192.193.168.2
P2: 192.193.168.3
P3: 192.193.168.4
C1: 192.193.168.1

其中服务器 P1-P3 用于部署数据节点,C1 用于部署计算节点。

1.4.1 编写配置文件

在节点列表文件 cluster.nodes 中编写计算节点及代理节点。推荐计算节点以 CN (取 compute node 首字母) 标识,数据节点以 DN (取 data node 首字母) 标识,如:

localSite,mode
192.193.168.2:8990:controller1,controller
192.193.168.3:8990:controller2,controller
192.193.168.4:8990:controller3,controller
192.193.168.2:8960:P1-agent,agent
192.193.168.3:8960:P2-agent,agent
192.193.168.4:8960:P3-agent,agent
192.193.168.1:8960:C1-agent,agent
192.193.168.2:8961:DN1,datanode
192.193.168.3:8961:DN2,datanode
192.193.168.4:8961:DN3,datanode
192.193.168.1:8961:CN1,computenode
192.193.168.1:8966:CN2,computenode
192.193.168.1:8967:CN3,computenode

在配置文件cluster.cfg中,需要使用前缀来区分数据节点与计算节点的配置项。

CN%.workerNum=64
CN%.localExecutors=63
CN%.maxConnections=3000
CN%.maxMemSize=256
CN%.dataSync=0
DN%.maxConnections=512
DN%.maxMemSize=128
DN%.workerNum=32
DN%.localExecutors=31
DN%.chunkCacheEngineMemSize=6
DN%.dataSync=1
DN%.redoLogDir=/ssd/ssd0/wfHuang/volumes/redoLog
DN%.volumes=/ssd/ssd0/wfHuang/volumes,/ssd/ssd1/wfHuang/volumes,/ssd/ssd2/wfHuang/volumes

CN% 匹配 CN 开头的计算节点,DN% 匹配 DN 开头的数据节点。以此保证计算节点与数据节点的配置项不会互相干扰。

注:计算节点必须配置 dataSync = 0

1.4.2 启动计算节点

参考集群安装模式部署并启动 agent、controller 节点。启动完成后后,使用 web 启动计算节点。

图1-8

第2章 架构设计

2.1 计算节点的设计和硬件规划

依据上文解析,计算节点主要负责接收、响应请求以及内存计算,数据节点主要负责数据存储管理。在集群设计与规划时,生产环境最小硬件配置推荐如下:

组件

CPU

内存

网络

硬盘

datanode

8 核+

64GB+

10Gb 网卡

ssd/hdd,500GB+建议元数据存 ssd,分区数据存 hdd

computenode

16 核+

128GB+

10Gb 网卡

hdd,200GB

计算节点更注重 cpu、内存、网卡性能。而数据节点更注重 iops ,存储通道与容量。集群拓扑结构可根据应用场景与需求参考下文设计。

2.2 单计算节点集群架构

图2-1

单一的计算节点,可以做为请求统一入口。此时拓扑简单,部署方便。

2.3 高可用计算节点集群架构

图2-2

DolphinDB 数据库层面,部署多个计算节点,扩展集群计算容量,并实现高可用。

从计算节点发起 DFS 表读写请求时,可以容忍部分数据节点实例级别的故障(比如某个数据节点的服务器断电),其崩溃恢复策略如下:

图2-3

图2-4

在中间件层面,可以引入负载均衡组件(如 haproxy,F5 等)来实现集群统一访问接口。

2.4 流计算节点

生产环境中,流数据对时延要求非常高。在多人、多业务场景使用集群时,数据节点由于本身还是承担着数据读写操作,不可避免地在一定程度上影响流计算。在设计上,可以将流计算相关业务单独地部署至某计算节点实例。

图2-5

图示,使用 computenode3 作为流计算节点,专门负责流计算业务。在另外两个计算节点上进行批量读写,通过计算节点之间的 Share-nothing 架构特性,有效保证流计算的低时延。

2.5 基于计算节点实例的资源隔离

在生产环境中,若有多人同时使用 DolphinDB 集群,需要一定的资源隔离机制来保障核心业务稳定运行。可以在架构层面,配合负载均衡策略,实现基于计算节点的资源隔离。

图2-6

以 haproxy 为例,在 haproxy.cfg 中设置2个监听组

核心业务使用计算资源组,总共6000连接,2个计算节点,高可用。

用于即席查询的计算节点,配置为单节点,限制1000连接,用于分析师,研究员等对数据做一些探索和即席查询。

listen ddb-cluster
   bind 0.0.0.0:5000
   mode tcp
   balance leastconn
   server compute1 192.168.168.1:8961 maxconn 3000 check
   server compute2 192.168.168.1:8966 maxconn 3000 check

listen ad-hoc-single
   bind 0.0.0.0:10090
   mode tcp
   balance leastconn
   server compute3 192.168.168.1:8967 maxconn 1000 check

第3章 应用场景

所有数据节点的计算任务、代码都可以无缝迁移到计算节点。集群管理与使用者应当避免直接访问数据节点,转而使用计算节点作为查询与写入的入口。具体演示数据读取、写入、流计算、机器学习。

3.1 数据读取

使用 python api 从计算节点读取某一只股票的历史交易数据。

import DolphinDB as ddb

def getData(date, securityID, rowNum=1000000):
    s = ddb.session()
    s.connect("computenode1", 8961, "admin", "123456")
    trade = s.loadTable(tableName="sztrade", dbPath="dfs://szstock)
    rst = trade.select("*").where("tradedate = %d" % date)
        .where("securityID = '%s'" % securityID) 
        .where("execType="F"")
        .limit(rowNum).toDF()
    s.close()
    return rst

if __name__ == '__main__' :
    print(getData(20200102, "000014"))

3.2 数据写入

将历史行情数据导入 DolphinDB ,具体操作请参考国内股票行情数据导入实例。导入2020年1月数据,共计82G数据时,集群计算节点,数据节点消耗情况如下:

图3-1

图3-2

在执行导入过程中,计算节点 cpu 均值在15%,峰值达到20%,内存消耗约2.8G。

3.3 流计算

流计算的核心诉求是时延,如果采用数据节点作为流计算节点,必不可免地会受到分布式表读取与写入的影响。配置计算节点成为流计算节点,可以有效地保障 CPU,memory,io,worker 线程等软硬件资源服务于流计算,从而保证低时延。并且由于计算节点之间是互不影响的,推荐在架构上设计部分计算节点作为流计算节点,其他计算节点用于分析与查询。

3.3.1 配置计算节点

设计 CN1 作为流计算的订阅和发布节点,配置文件如下:

# publish
CN1.persistenceDir=/ssd/ssd0/persistDir/C1-CNODE1
CN1.persistenceWorkerNum=4
CN1.maxPubConnections=64

# subscribe 
CN1.maxSubConnections=64
CN1.subPort=8970
CN1.subExecutors=16

3.3.2 创建与订阅流表

在计算节点 CN1 上创建流表:

t = streamTable(1:0, `sym`price, [STRING,DOUBLE])
enableTableShareAndPersistence(t, `tickStream)

订阅并编写实时因子计算:

def sum_diff(x, y){
    return (x-y)/(x+y)
}
factor1 = 
result = table(1000:0, `sym`factor1, [STRING,DOUBLE])
rse = createReactiveStateEngine(name="reactiveDemo", metrics =factor1, dummyTable=tickStream, outputTable=result, keyColumn="sym")
subscribeTable(tableName=`tickStream, actionName="factors", offset=-1, handler=tableInsert{rse})

往 CN1 模拟写入消息(同时另一个计算节点 CN2 执行导入数据作业)

n = 2000000
data = table(take("000001.SH", n) as sym, rand(10.0, n) as price)
tickStream.append!(data)
getStreamingStat().subWorkers

查看集群的资源消耗情况:

图3-3

可以看到 CN2, DN1-3 在消耗相对较高的内存和 CPU 时,CN1 并没有受到数据导入的影响,使得流计算的时延得到有效保证。

3.4 机器学习

机器学习是计算密集型的场景,模型的训练过程会消耗大量的 CPU、内存资源。将机器学习的作业部署至某个计算节点,可以避免对数据写入、读取类任务的负面影响。例如在 CN2 上,使用 adaBoost 训练股票年波动率:

def tranAdaBoost(TrainData){
	db = database(,HASH, [SYMBOL, 10])
	p10TranData = db.createPartitionedTable(table=TrainData, partitionColumns=`SecurityID)
	p10TranData.append!(TrainData)
	model = adaBoostRegressor(sqlDS(