随着算力的提升和机器学习与深度学习的普及,在进行数据建模时往往会采用批量生成衍生特征的方法来丰富数据集的特征,如:对原有的 10 维特征都采用 max, min, avg, std, sum 这五种方式进行聚合,数据的特征将变为 50 维。
类似的衍生特征工程极大地提升了模型的准确性。在 Python 中,调用 Pandas 的 agg 函数,传入一个字典(key:列名,value:衍生特征的函数列表)即可实现这样的衍生特征计算。本教程将通过DolphinDB的元编程 ,以极少的代码量实现与 pandas 中类似的衍生特征计算方法。
我们使用 2021 年 16 支股票的 Level 2 快照数据,构建频率为 10 分钟的特征,并利用元编程,低代码量实现了 676 列衍生特征的计算。与多个 Python Pandas 进程并行计算相比,DolphinDB 能带来约 30 倍的性能提升。教程还展示了如何在生产环境中用流式计算进行输入特征的实时计算。
本教程示例代码必须在 2.00.6 及以上版本的 DolphinDB server上运行。
本教程应用的数据源为上交所 level2 快照数据(Snapshot),每幅快照间隔时间为 3 秒或 5 秒,数据文件结构如下:
字段 | 含义 | 字段 | 含义 | 字段 | 含义 |
SecurityID | 证券代码 | LowPx | 最低价 | BidPrice[10] | 申买十价 |
DateTime | 日期时间 | LastPx | 最新价 | BidOrderQty[10] | 申买十量 |
PreClosePx | 昨收价 | TotalVolumeTrade | 成交总量 | OfferPrice[10] | 申卖十价 |
OpenPx | 开始价 | TotalValueTrade | 成交总金额 | OfferOrderQty[10] | 申卖十量 |
HighPx | 最高价 | InstrumentStatus | 交易状态 | …… | …… |
2021 年上交所所有股票的快照数据已经提前导入至 DolphinDB 数据库中,一共约17.07亿条数据,导入方法见国内股票行情数据导入实例。
本教程受 Kaggle 的 Optiver Realized Volatility Prediction 竞赛项目启发,该项目排名第一的代码中使用了两档的买卖量价数据及 pandas 的元编程实现了批量衍生特征的计算,本教程在此基础上,使用真实十档买卖量价的快照数据实现了 676 列衍生特征的计算。
一级指标全部由快照数据中的 10 档买卖单量价数据计算而来
二级指标全部由一级指标计算生成
衍生特征是对一级指标和二级指标做 10 分钟降采样的衍生,降采样的聚合方法通过元编程来批量实现,衍生方法如下:
指标名称 | 衍生方法 |
DateTime | count |
Wap[10] | sum, mean, std |
LogReturn[10] | sum, realizedVolatility, mean, std |
LogReturnOffer[10] | sum, realizedVolatility, mean, std |
LogReturnBid[10] | sum, realizedVolatility, mean, std |
WapBalance | sum, mean, std |
PriceSpread | sum, mean, std |
BidSpread | sum, mean, std |
OfferSpread | sum, mean, std |
TotalVolume | sum, mean, std |
VolumeImbalance | sum, mean, std |
最终预测的计算指标为实际波动率:
同时考虑到,过去 10 分钟的特征中,特征的时效性是随时间增加的,越靠前的特征时效性越弱,越靠后的特征时效性越强。所以,本教程对 10 分钟的特征切分成了 0-600s(全部),150-600s,300-600s,450-600s 四段,分别进行上述衍生指标的计算。
10 分钟的快照数据最终形成 676 维的聚合特征,如下图所示。
本教程中的重点和难点是批量生成大量特征列计算表达式,如果按照传统 SQL 来编程,实现 676 个计算指标需要编写大量代码,因此本教程使用元编程方法实现衍生特征的计算,关于元编程的详情请参考 元编程 — DolphinDB 2.0 文档 。本教程通过元编程函数 sql 生成元代码。为了对快照数据做 10min 的聚合计算,sql 函数的分组参数 groupby=[
自定义聚合函数的入参是某支股票的 BidPrice, BidOrderQty, OfferPrice, OfferOrderQty 这四个十档量价数据的矩阵,在 DolphinDB 中对一二级指标的计算代码如下:
wap = (BidPrice * OfferOrderQty + BidOrderQty * OfferPrice) (BidOrderQty + OfferOrderQty)
wapBalance = abs(wap[0] - wap[1])
priceSpread = (OfferPrice[0] - BidPrice[0]) ((OfferPrice[0] + BidPrice[0]) 2)
BidSpread = BidPrice[0] - BidPrice[1]
OfferSpread = OfferPrice[0] - OfferPrice[1]
totalVolume = OfferOrderQty.rowSum() + BidOrderQty.rowSum()
volumeImbalance = abs(OfferOrderQty.rowSum() - BidOrderQty.rowSum())
LogReturnWap = logReturn(wap)
LogReturnOffer = logReturn(OfferPrice)
LogReturnBid = logReturn(BidPrice)
利用 Python 的 pandas 库,通过向 groupby.agg 传入一个字典(字典的 key 为列名,value为聚合函数列表),即可实现对指定的列进行批量的聚合指标计算。
在 DolphinDB 中,亦可通过自定义函数实现类似的需求,即把字典转换成元编程代码,具体代码如下:
def createAggMetaCode(aggDict){
metaCode = []
metaCodeColName = []
for(colName in aggDict.keys()){
for(funcName in aggDict[colName])
{
metaCode.append!(sqlCol(colName, funcByName(funcName), colName + `_ + funcName$STRING))
metaCodeColName.append!(colName + `_ + funcName$STRING)
}
}
return metaCode, metaCodeColName$STRING
}
features = {
"DateTime":[`count]
}
for( i in 0..9)
{
features["Wap"+i] = [`sum, `mean, `std]
features["LogReturn"+i] = [`sum, `realizedVolatility, `mean, `std]
features["LogReturnOffer"+i] = [`sum, `realizedVolatility, `mean, `std]
features["LogReturnBid"+i] = [`sum, `realizedVolatility, `mean, `std]
}
features["WapBalance"] = [`sum, `mean, `std]
features["PriceSpread"] = [`sum, `mean, `std]
features["BidSpread"] = [`sum, `mean, `std]
features["OfferSpread"] = [`sum, `mean, `std]
features["TotalVolume"] = [`sum, `mean, `std]
features["VolumeImbalance"] = [`sum, `mean, `std]
aggMetaCode, metaCodeColName = createAggMetaCode(features)
返回结果为元代码向量和对应的元代码列名,如下图所示:
在自定义函数中,为了方便后续使用元编程进行衍生特征计算,需要将计算的一二级指标拼接成一个 table,同时修改列名,具体代码如下:
subTable = table(DateTime as `DateTime, BidPrice, BidOrderQty, OfferPrice, OfferOrderQty, wap, wapBalance, priceSpread, BidSpread, OfferSpread, totalVolume, volumeImbalance, LogReturnWap, LogReturnOffer, LogReturnBid)
colNum = 0..9$STRING
colName = `DateTime <- (`BidPrice + colNum) <- (`BidOrderQty + colNum) <- (`OfferPrice + colNum) <- (`OfferOrderQty + colNum) <- (`Wap + colNum) <- `WapBalance`PriceSpread`BidSpread`OfferSpread`TotalVolume`VolumeImbalance <- (`LogReturn + colNum) <- (`LogReturnOffer + colNum) <- (`LogReturnBid + colNum)
subTable.rename!(colName)
其中 “<-” 是 DolphinDB 函数 join 的简写符号,此处用于将各字段拼接成列向量。
最后将元代码作为参数传入自定义聚合函数,配合一二级指标拼接而成的 table 进行 676 列衍生指标的计算,并以 676 列的形式作为聚合结果返回,具体代码如下:
subTable['BarDateTime'] = bar(subTable['DateTime'], 10m)
result = sql(select = aggMetaCode, from = subTable).eval().matrix()
result150 = sql(select = aggMetaCode, from = subTable, where =
部分计算结果展示
所有衍生特征列名展示
由于 DolphinDB 的计算是分布式并行计算,本教程中配置的并行度为 8,因此在使用 Python 实现衍生特征计算时,也采用了 8 的并行度进行并行计算,即同时调用 8 个 Python 进程进行计算。
计算性能对比结果
股票数量 | 交易日 | 数据量 | Python(s) | DolphinDB(s) |
16 | 243 | 19,220,237 | 3,039 | 100 |
选取 16 支股票 (601318,600519,600036,600276,601166,600030,600887,600016,601328,601288,600000,600585,601398,600031,601668,600048) 2022年 09:30:00-11:30:00 和 13:00:00-15:00:00 的 level2 快照进行模型训练。
DolphinDB 支持一系列常用的机器学习算法,例如最小二乘回归、随机森林、K-平均等,使用户能够方便地完成回归、分类、聚类等任务。除了内置的经典的机器学习算法函数,DolphinDB 还支持许多第三方库,因此我们也可以调用 DolphinDB 提供的第三方库插件来进行模型训练。
XGBOOST(Extreme Gradient Boosting)是一种 Tree Boosting 的可扩展机器学习系统,它在 Gradient Boosting 框架下实现机器学习算法,提供了并行树提升(也称为 GBDT,GBM),可以快速准确地解决许多数据科学的问题。
本教程参考机器学习教程-5.使用 DolphinDB 插件进行机器学习,选取 XGBOOST 进行训练。
参考:XGBOOST 插件安装教程
模型评价指标:根均方百分比误差(Root Mean Square Percentage Error, RMSPE)
参考:模型构建和训练代码
删除掉含 NULL 的记录,标注 label 并将构造出来的 676 维特征适当调整为 XGBOOST 输入的数据格式,具体代码如下:
//将计算出来的特征中包含 NULL 的记录删除
result = result[each(isValid, result.values()).rowAnd()]
result_input = copy(result)
//选取 LogReturn0_realizedVolatility 作为label
label = result[`LogReturn0_realizedVolatility]
//将 SYMBOL 型的 SecurityID 列转换为 XGBOOST 模型支持输入的 INT 型
result_input.update!(`SecurityID_int, int(result[`SecurityID]))
//调整输入的字段,去除不需要的列
result_input.dropColumns!(`SecurityID`DateTime`LogReturn0_realizedVolatility)
注意:本次预测值为未来 10 分钟的波动率,WAP_0 最接近股价,所以选取LogReturn0_realizedVolatility 作为 label。
本项目中没有设置验证集,训练集和测试集按 7:3 比例划分,即 train:test = 62514:26804,具体代码如下:
def trainTestSplit(x, testRatio) {
xSize = x.size()
testSize =( xSize * (1-testRatio))$INT
return x[0: testSize], x[testSize:xSize]
}
Train_x, Test_x = trainTestSplit(result_input, 0.3)
Train_y, Test_y = trainTestSplit(label, 0.3)
DolphinDB 的 XGBOOST 插件中包含 4 个用户接口:
具体使用方法的说明参见 DolphinDB XGBoost 插件用户接口教程。
具体代码如下:
//定义评估指标 RMSPE(Root Mean Square Percentage Error)
def RMSPE(a,b)
{
return sqrt( sum( ((a-b)a)*((a-b)a) ) a.size() )
}
//定义模型训练的参数
params = {
objective: 'reg:squarederror',
colsample_bytree: 0.8,
subsample: 0.8,
min_child_weight: 1,
max_leaves:128,
eta: 0.1,
max_depth:10,
eval_metric : 'rmse'
}
//XGBOOST 模型训练
model_1 = xgboost::train(Train_y ,Train_x, params, 500)
//用测试集预测波动率,并计算 RMPSE
y_pred = xgboost::predict(model_1, Test_x)
print('RMSPE='+RMSPE(Test_y, y_pred))
运行结果:
RMSPE:0.559
模型训练时间:1m 3s 327ms
本次预测采取手动粗调参,不代表模型以及应用的最优结果。
模型保存及加载:
//保存模型,modelSavePath 为保存模型的路径,需要根据实际环境配置
xgboost::saveModel(model_1, modelSavePath)
//模型加载,modelSavePath 为模型的路径,需要根据实际环境配置
model = xgboost::loadModel(modelSavePath)
回归模型预测性能
数据量(条) | 模型预测时间(ms) |
1 | 0.936 |
10 | 1.832 |
100 | 9.314 |
1000 | 49.274 |
10000 | 317.656 |
以上部分的计算都是基于批量历史数据的计算,而在实际生产环境中,数据的来源往往是以“流”的方式,而如何套用上述复杂的衍生特征计算逻辑实现流式计算是业务层面面临的重大难题。
对于这类问题,DolphinDB 内置了多种类型的流计算引擎,以提供简易快捷的低延时解决方案。
DolphinDB 流计算代码,请参考知乎原文附件
流计算处理流程
上图对应本章节整体的流程框架:实时的流数据首先通过 DolphinDB API 注入至snapshotStream表中。然后通过订阅/推送,将快照数据注入至时间序列聚合引擎,进行窗口为10分钟,步长为10分钟的滑动窗口计算,核心代码如下:
name = `SecurityID`DateTime`PreClosePx`OpenPx`HighPx`LowPx`LastPx`TotalVolumeTrade`TotalValueTrade`BidPrice0`BidPrice1`BidPrice2`BidPrice3`BidPrice4`BidPrice5`BidPrice6`BidPrice7`BidPrice8`BidPrice9`BidOrderQty0`BidOrderQty1`BidOrderQty2`BidOrderQty3`BidOrderQty4`BidOrderQty5`BidOrderQty6`BidOrderQty7`BidOrderQty8`BidOrderQty9`OfferPrice0`OfferPrice1`OfferPrice2`OfferPrice3`OfferPrice4`OfferPrice5`OfferPrice6`OfferPrice7`OfferPrice8`OfferPrice9`OfferOrderQty0`OfferOrderQty1`OfferOrderQty2`OfferOrderQty3`OfferOrderQty4`OfferOrderQty5`OfferOrderQty6`OfferOrderQty7`OfferOrderQty8`OfferOrderQty9
type =`SYMBOL`TIMESTAMP`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`INT`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`INT`INT`INT`INT`INT`INT`INT`INT`INT`INT`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`INT`INT`INT`INT`INT`INT`INT`INT`INT`INT
share streamTable(100000:0, name, type) as snapshotStream
share streamTable(100000:0 , `DateTime`SecurityID <- metaCodeColName <- (metaCodeColName+"_150") <- (metaCodeColName+"_300") <- (metaCodeColName+"_450"),`TIMESTAMP`SYMBOL <- take(`DOUBLE, 676)) as aggrFeatures10min
share streamTable(100000:0 , `Predicted`SecurityID`DateTime, `FLOAT`SYMBOL`TIMESTAMP) as result10min
metrics=sqlColAlias(, metaCodeColName <- (metaCodeColName+"_150") <- (metaCodeColName+"_300") <- (metaCodeColName+"_450"))
createTimeSeriesEngine(name="aggrFeatures10min", windowSize=600000, step=600000, metrics=metrics, dummyTable=snapshotStream, outputTable=aggrFeatures10min, timeColumn=`DateTime, useWindowStartTime=true, keyColumn=`SecurityID)
subscribeTable(tableName="snapshotStream", actionName="aggrFeatures10min", offset=-1, handler=getStreamEngine("aggrFeatures10min"), msgAsTable=true, batchSize=2000, throttle=1, hash=0, reconnect=true)
def predictRV(mutable result10min, model, mutable msg){
startTime = now()
temp_table = select SecurityID, DateTime from msg
msg.update!(`SecurityID_int, int(msg[`SecurityID])).dropColumns!(`SecurityID`DateTime`LogReturn0_realizedVolatility)
Predicted = xgboost::predict(model , msg)
temp_table_2 = table(Predicted, temp_table)
result10min.append!(temp_table_2)
}
subscribeTable(tableName="aggrFeatures10min", actionName="predictRV", offset=-1, handler=predictRV{result10min, model}, msgAsTable=true, hash=1, reconnect=true)
上述脚本定义的 metrics 中调用的 featureEngineering 函数与批计算的代码脚本中的定义完全相同,体现了 DolphinDB 流批一体的优势特点。
附件的流计算示例代码,通过历史数据回放的方式,回测实盘波动率预测的结果展示:
本章节统计了股票在时序聚合引擎中的计算延时情况,流计算延时主要由两部分构成:计算聚合特征的耗时和模型预测实时波动率的耗时。主要延时为计算聚合特征的耗时。
timer getStreamEngine('aggrFeatures10min').append!(data)
test_x = select * from aggrFeatures10min
timer{
temp_table = select SecurityID, DateTime from test_x
test_x.update!(`SecurityID_int, int(test_x[`SecurityID])).dropColumns!(`SecurityID`DateTime`LogReturn0_realizedVolatility)
Predicted = xgboost::predict(model , test_x)
}
股票数量 | 10分钟数据量 | 计算聚合特征耗时 | 模型预测实时波动率的耗时 | 总耗时 |
1 | 201 | 22ms | 3ms | 25ms |
10 | 2011 | 92ms | 3ms | 95ms |
20 | 4020 | 162ms | 4ms | 168ms |
30 | 6030 | 257ms | 5ms | 262ms |
40 | 8040 | 321ms | 6ms | 327ms |
50 | 10054 | 386ms | 7ms | 393ms |
本教程通过使用 DolphinDB 强大的数据处理能力,并结合元编程实现了低代码批量生成多维股票衍生特征的应用场景。与 Python 等传统数据处理方法相比,DolphinDB 依靠数据存储引擎和计算引擎的高度融合,在数据预处理阶段,方便地实现了分布式并行计算,不仅节约了内存资源,而且在使用相同物理计算资源的情况下,提高了约 30 倍的计算效率。
在批计算的基础上,本教程利用 DolphinDB 内置的流计算处理框架,为实际生产环境的类似需求(实时计算衍生特征、实时调用模型预测计算)提供了一套完整高效的解决方案。本教程以上证 16 支股票 level2 快照数据作为订阅的数据源,通过模式真实数据的实时注入,可以在毫秒级完成对每只股票 10 分钟快照数据衍生676 维衍生特征的低延时计算,从而为后续的数据建模提供强大的数据支撑。
注意事项:本教程示例代码必须在2.00.6及以上版本的DolphinDB server上运行。
DolphinDB批计算代码
DolphinDB批计算代码(数组向量版)
模型构建和训练代码
Python批计算代码
DolphinDB流计算代码
level2快照测试数据
附录文件请见知乎原文
开发环境
页面更新:2024-04-02
本站资料均由网友自行发布提供,仅用于学习交流。如有版权问题,请与我联系,QQ:4156828
© CopyRight 2008-2024 All Rights Reserved. Powered By bs178.com 闽ICP备11008920号-3
闽公网安备35020302034844号