在 R、H2O 和 MinIO 中使用金融数据 (Forex) 进行机器学习

在 R、H2O 和 MinIO 中使用金融数据 (Forex) 进行机器学习

外汇(外汇)提供了丰富的数据和应用机器学习的机会。

在本教程中,我将使用 R、H2O 和 MinIO 使用外汇 (Forex) 数据构建一个非常简单的统计套利模型。我使用TraderMade作为外汇数据的来源您可以在如何在 R 中导入外汇数据中阅读有关 TraderMade 外汇数据源的更多信息我首先注册了一个免费帐户并获得了一个 api_key。免费帐户有限制,但适用于本教程。

如果您不熟悉H2O,它是一个开源分布式内存机器学习环境。我在之前的博文“使用 H20、R 和 MinIO 进行机器学习”中更全面地描述了 H2O 。

用于统计计算的R编程语言被统计学家和数据挖掘者用于数据分析。我在较早的帖子MinIO 和 Apache Arrow Using R中更全面地描述了 R。R 在处理数据、数据分析和机器学习方面相当直观。

R 语言有一个名为RStudio的配套 IDE ,我将使用它进行此开发。

MinIO是高性能软件定义的 S3 兼容对象存储,使其成为 Amazon S3 的强大而灵活的替代品。MinIO 与云原生分析和 ML/AI 框架无缝协作。

请安装 R 和 RStudio,访问 H2O 集群,如果您尚未运行 MinIO,请下载并安装它此外,请下载并安装aws.s3 R 库

统计套利

统计套利的概念是两种工具或资产的价格变动之间存在某种程度的可靠关系。在这种情况下,我们将使用货币对作为工具。如果两人之间的关系相当稳定,那么任何偏离该关系的行为都可能会恢复到稳定的关系。这意味着当出现偏差时,假设关系恢复,我们就有机会建仓并可能获利。

假定具有稳定关系的两种资产的一个典型例子是给定行业中规模相似的公司的股票价格。让我们以技术为例——观察技术行业中规模相似的两家公司的走势,假设它们都将同样受到整个行业变化的影响,例如受过类似培训的劳动力成本、利率、供应短缺等。我们应该能够辨别这些公司的股价之间的关系,如果我们看到偏差,如果我们相信相对股价将恢复到之前观察到的关系,这可能是获利的机会. 当然,与潜在关系的偏差可能是由一家或两家公司的商业模式有意改变造成的,

另一个常见的例子是汽车公司,正如这篇解释统计套利的 Investopedia 文章或这篇关于统计套利的维基百科文章中所解释的那样。

评估工具价格时间序列之间的潜在关系是统计学或计量经济学深入研究的领域。这是一个简化的教程,提供了 2 种简单的方法。第一个简单地获取工具之间的差异,并使用滚动的 10 天平均值和标准偏差来确定与关系的偏差点。第二个使用一些历史来使用 H2O AutoML 构建 ML 模型,然后应用该模型来预测未来仪器之间应该存在的关系。将给定日期仪器之间的实际差异与预测值进行比较,实际值与预测值之间的差异用于确定偏离关系的点。

教程

对于本教程,我从两个货币对开始 - eurjpy 和 gbpusd

货币对分别使用两种货币指定 - eurjpy 是 1 欧元兑换日元的汇率。欧元是“基础货币”,日元是“计价货币”。该工具价格的汇率是购买一欧元所需的日元数量。目前,谷歌表示购买一欧元需要 146.57 日元。

同样,gbpusd 货币对的基础货币为英镑,计价货币为美元。汇率或该工具的价格是购买 1 欧元所需的美元数量。同样,根据谷歌,汇率为 1.13 - 目前,购买一欧元需要 1.13 美元。

汇率,就像市场上交易的所有工具一样,有一定数量的卖方和一定数量的买方,卖方“要价”以希望有人支付,而投标人“出价”一定数量。当买卖双方的这些数字相同时,就会发生“交易”。在构建真实的统计套利之类的东西时,重要的是要了解市场运作的结构以及出价和要价之间的典型“价差”,因为交易算法会受到该价差的影响。不用说,本教程仅供参考,不应按原样用于交易。

我们将探索两种方法来确定这些货币之间的潜在关系。实验 1使用它们之间最近的历史关系来预测关系应该是什么。任何偏离近期历史预测的情况都将是一个潜在的交易机会。我选择(相当随机地)使用 10 天的历史来计算货币对之间差异的均值和标准差。实验二使用 H2O AutoML 基于上半年的训练数据构建模型来预测关系。这些当然不是了解这种关系的唯一方法。大多数专业方法要复杂得多,并且需要相当深厚的统计学或计量经济学能力。这些方法超出了我们在本教程中可以涵盖的范围。

数据

如前所述,数据来自 TraderMade,我使用的是每日时间序列。该数据提供每天的市场开盘价、最高价、最低价和收盘价。“市场”的概念在外汇中有点用词不当,因为没有一个单一的基于清算所的货币兑换市场,而是多个市场,并且这些市场的价格可能不同。此外,外汇是一个连续的市场,因此市场开盘和收盘的概念是人为制造的。由于我想要一个代表每一天的数据点,我选择平均每天的开盘价、最高价和最低价,并将该平均值用作给定日期的单一汇率。这样做的要点是,我用来表示“一天”的“每日价格”数据点实际上是本教程的平均值。

与其将这些数据存储为 CSV 文件或将其存储在 MySQL、MariaDB、PostgreSQL 或其他数据库中,我发现利用更新的“开放表标准”文件格式并将数据存储在对象存储中要容易得多. MinIO 是此用例的绝佳选择。

对于本教程,我将文件存储为 Parquet 格式,但有几种新格式提供了处理文件的便利性以及表结构的强大功能。在这种情况下,我从 TraderMade 提取原始数据并按日期对其进行分区。按日期分区类似于在数据库术语中创建日期索引。

生成的保存数据如下所示。例如,在“eurjpy”的前缀内,数据被写入为按指定字段分区的多个文件。


Screen Shot 2022-10-20 at 11.19.37 AM.png


这意味着选定的检索(例如检索日期范围)仅读取相关文件。与通常加载整个文件然后在内存中选择的 CSV 中存储数据相比,这种方法非常有效。它也比建立一个包含所有必需配置、驱动程序和维护的单独数据库要方便得多。

这是存储此数据的 R 代码:

 # get the data from TraderMade and store as Parquet partitioned by date
  ccy1_dataframe_tick<-GetCCYTimeseries(ccy1,start_date,end_date)
  ccy1_dataframe_tick %>%
    group_by(quotes.date) %>%
    write_dataset(GetMinIOURI(ccy1), format = "parquet" )

第一行使用隐藏 URL 等创建的函数从 TraderMade 检索数据。第二行是通过 Dplyr“group_by”谓词传输数据帧的管道,然后将其传输到将数据写入我的函数的管道MinIO 集群作为 Parquet。“group_by”指令导致数据按“quotes.date”分区,如前所示。这再简单不过了。

数据检索同样简单。这是将此 ccy 的所有数据检索到数据帧中的 R 代码:

df1 <- open_dataset(GetMinIOURI(ccy1), format = "parquet") %>% collect()

上面的代码生成了一个包含 132 行的数据框——我存储的所有数据。选择行的子集同样简单。这里我只加载“2022-04-01”之后的数据:

df1 <- open_dataset(GetMinIOURI(ccy1), format = "parquet") %>% filter(quotes.date > "2022-04-01") %>% collect()

请注意使用过滤器来指定用于加载数据的“where 子句”。这非常简单,不需要维护和提供外部数据库。MinIO 对象存储的性能也非常出色,因为假设分区是正确的,只加载请求的数据。

回到手头的问题

为了创建代表这两种工具之间关系的单一时间序列,我首先计算它们之间的利率差异——这意味着我从一种工具中减去另一种工具的每日价格。为了查看该值相对于两种工具之间“现有关系”的位置,我使用前 10 天差异的平均值依次比较标准差。

什么?是的,计算每天的费率之间的差异。对于每一天,计算前 10 天的差异平均值。然后根据标准差确定今天的利率与该平均值的差距。

这是一个例子。以下是 2022 年 3 月 1 日至 2022 年 8 月 31 日的图表。此图表采用 eurjpy 并减去 gbpusd 以获得差异。然后,它将每个点绘制为与该差异的前 10 天平均值的标准差。因此,在峰值时,这大约比前 10 天差异的平均值高出 2.3 个标准差。最低时,这大约比前 10 天差异的平均值低 2.7 个标准差。红色散列线位于上下 1.5 个标准偏差处,以供视觉参考。


Screen Shot 2022-10-19 at 10.30.00 AM.png


这种数据表示的优点在于它在零上下的变化相当均匀。我们建议零线(前 10 天差异的平均值)代表潜在关系,当数据远离该线时,我们相信它会随后恢复。

我们看到两种货币之间的标准化差异多次突破上限和下限。请记住,我选择的阈值是任意的,并不要求它们围绕均值对称。此外,Y 轴上的单位是标准偏差。由于数据不太可能服从正态分布或高斯分布,因此标准差的概念并不成立。然而,它提供了一种简单的方法来讨论与任何单个测量值的平均值的距离,因此我们将使用它。

解释

漂亮的线条;这是什么意思?当蓝线较高时,表示相对于前 10 天的差值(欧元日元减去英镑兑美元)较大 - 这意味着欧元日元相对于英镑兑美元而言是昂贵的。发生这种情况时,我们认为欧元兑日元会下跌,而英镑兑日元会上涨(因此该线将向“零”移动——前 10 天差异的平均值)。当蓝线处于低位时,表示差异较小,我们认为近期欧元兑日元将相对于英镑兑美元上涨,以走向“零”。

当蓝线处于高位时,我们希望卖出欧元兑日元(卖出欧元并以日元计算收益)以利用预期的回归。同时,我们要购买 gbpusd(购买 gbp 并以 usd 支付)。我们这样做是因为我们认为,根据我们正在考虑的历史,欧元汇率相对于英镑兑美元汇率较高。当蓝线处于低位时,出于类似的原因,我们希望买入 eurjpy 并卖出 gpbusd - 我们相信汇率的走势很快就会发生变化,我们相信我们可以获利。

卖出欧元是什么意思?好吧,我在美国,我通常持有美元 (usd),所以为了卖出欧元,我必须先用我的美元购买一些欧元。要运行本教程,我需要这些货币中的每一种来开始交易,因为我可能需要放弃其中任何一种作为交易的一部分,而且我不能放弃我不拥有的东西。

那么我们该怎么做呢?首先,我们从给定货币的“期初余额”概念开始——对我来说,它将是美元。然后我们购买我们可能交易的每种货币中的一些。所以我购买欧元美元、日元美元、英镑美元、美元美元——我购买我可能需要的 ccys,并使用我的美元支付这些 ccys。请注意,最后一个并没有真正购买任何东西。在每个实验结束时,我会将这些 ccys 清算回 usd 以评估性能。

一旦我们有了一些我们需要的 ccys,我们就可以根据信号进行交易。当我们完成并将我们的 ccys 清算为美元时,我们可以看到我们是赚了还是亏了。简单但有效。

使用 MinIO 和 Parquet 对财务数据进行 AI/ML 和数据分析

我正在使用 MinIO,它是一个 100% S3 兼容的对象存储。数据被下载并存储为 Parquet 文件。Parquet 是一种较新的表格文件格式,是 CSV 文件的高效替代品。Parquet 和其他“开放标准”文件正在迅速取代 CSV 数据存储并取代专有数据库存储格式——大多数数据库产品现在都能够使用外部 Parquet 文件作为“外部表”。这意味着数据库产品提供的功能(索引和查询等)与底层存储格式是分离的。这对数据用户来说是一个了不起的进步。驻留在 MinIO S3 存储上的 Parquet 文件(或 ORC、Hudi 或 Iceberg——现在有很多开放标准)为 AI/ML 和分析工作负载提供高效且经济高效的数据存储解决方案。

我也在使用 R 和 H2O。如果您阅读过我以前的博客,那么这些是我最喜欢的用于此类工作的两个产品。

实验一

首先,我创建了一个 R 脚本文件来加载所需的包,名为packages.R

#load necessary libraries

if (!require("httr")) {
  install.packages("httr")
  library (httr)
}

if (!require("jsonlite")) {
  install.packages("jsonlite")
library (jsonlite)
}

if (!require("ggplot2")) {
  install.packages("ggplot2")
  library(ggplot2)
}

if (!require("arrow")) {
  install.packages("arrow")
  library(arrow)
}

if (!require("dplyr")) {
  install.packages("dplyr")
  library(dplyr)
}

if (!require("zoo")) {
  install.packages("zoo")
  library(zoo)
}

if (!require("aws.s3")) {
  install.packages("aws.s3")
  library(aws.s3)
}

if (!require("lubridate")) {
  install.packages("lubridate")
  library(lubridate)
}

if (!require("quantmod")) {
  install.packages("quantmod")
  library(quantmod)
}
  
if (!require("zoo")) {
  install.packages("zoo")
  library(zoo)
}

if (!require("h2o")) {
  install.packages("h2o")
  library(h2o)
}


GetMinIOURI <- function(prefix) {
  # get minio config, with expected defaults
  minio_key <- Sys.getenv("MINIO_ACCESS_KEY", "minioadmin")
  minio_secret <- Sys.getenv("MINIO_SECRET_KEY", "minioadmin")
  minio_host <- Sys.getenv("MINIO_HOST", "Your MinIO Host IP address")
  minio_port <- Sys.getenv("MINIO_PORT", "Your MinIO Port")
  minio_arrow_bucket <- Sys.getenv("MINIO_ARROW_BUCKET", "arrow-bucket")
  # helper function for minio URIs
  minio_path <- function(...) paste(minio_arrow_bucket, ..., sep = "/")
  minio_uri <- function(...) {
    template <- "s3://%s:%s@%s?scheme=http&endpoint_override=%s%s%s"
    sprintf(template, minio_key, minio_secret, minio_path(...), minio_host, ":", minio_port)
  }
  return(minio_uri(prefix))
}

接下来,我创建了一些数据检索函数以在 R 脚本中从 Tradermade 获取数据DataFunctions.R

source("packages.R")

# using TraderMade
# https://marketdata.tradermade.com
# uses an api_key
api_key <- "Your Free TraderMade API Key"

GetCCYList <- function(...) {
# get list of ccys
req <- paste0("https://marketdata.tradermade.com/api/v1/historical_currencies_list?api_key=",api_key)
data_raw <- GET(url = req)
data_text <- content(data_raw, "text", encoding = "UTF-8")

data_json <- fromJSON(data_text, flatten=TRUE)
dataframe <- as.data.frame(data_json)
# this is like 9700 rows
return(dataframe)
}

GetCCYRates <- function(ccy1,ccy2) {
#get current rates
req <- paste0("https://marketdata.tradermade.com/api/v1/live?currency=",toupper(ccy1),",",ccy2,"&api_key=",api_key)
data_raw <- GET(url = req)
data_text <- content(data_raw, "text", encoding = "UTF-8")

data_json <- fromJSON(data_text, flatten=TRUE)
dataframe <- as.data.frame(data_json)
return(dataframe)
}

GetCCYTimeseries <- function(ccy,start_date,end_date) {
# get timeseries historical data for ccy
tick_req <- paste0("https://marketdata.tradermade.com/api/v1/timeseries?currency=",
                   toupper(ccy),"&api_key=",api_key,"&start_date=",
                   start_date,"&end_date=",end_date,"&format=records",collapse="")
data_tick_raw <- GET(url = tick_req)
data_tick_text <- content(data_tick_raw, "text", encoding = "UTF-8")
data_tick_json <- fromJSON(data_tick_text, flatten=TRUE)
dataframe_tick <- as.data.frame(data_tick_json)
dataframe_tick$quotes.avg <- 
  (dataframe_tick$quotes.open + dataframe_tick$quotes.high + dataframe_tick$quotes.low)/3 
dataframe_tick$quotes.date <- ymd(dataframe_tick$quotes.date)

#sort by date
dataframe_tick <- dataframe_tick[order(dataframe_tick$quotes.date),]

return(dataframe_tick)
}

这些数据函数由以下 DataFrame 函数使用,DFFunctions.R因为数据帧是 R 处理数据的方式:

source("packages.R")
source("DataFunctions.R")


DownloadCCYDF <- function(ccy1, start_date, end_date) {
  # get the data
  ccy1_dataframe_tick<-GetCCYTimeseries(ccy1,start_date,end_date)
  ccy1_dataframe_tick %>%
    group_by(quotes.date) %>%
    write_dataset(GetMinIOURI(ccy1), format = "parquet" )

  return (ccy1_dataframe_tick)
}


DownloadCCYDFs <- function(ccy1, ccy2, start_date, end_date) {
  # get the data
  ccy1_dataframe_tick<-GetCCYTimeseries(ccy1,start_date,end_date)
  ccy1_dataframe_tick %>%
    group_by(quotes.date) %>%
    write_dataset(GetMinIOURI(ccy1), format = "parquet" )
  
  ccy2_dataframe_tick<-GetCCYTimeseries(ccy2,start_date,end_date)
  ccy2_dataframe_tick %>%
    group_by(quotes.date) %>%
    write_dataset(GetMinIOURI(ccy2), format = "parquet" )
  
  return (list(ccy1_dataframe_tick, ccy2_dataframe_tick))
}

LoadCCYDF <-function(ccy1) {
  df1 <- open_dataset(GetMinIOURI(ccy1), format = "parquet") %>% collect()
  return(df1)
}


LoadCCYDFs <-function(ccy1, ccy2) {
  df1 <- open_dataset(GetMinIOURI(ccy1), format = "parquet") %>% collect()
  df2 <- open_dataset(GetMinIOURI(ccy2), format = "parquet") %>% collect()
  return(list(df1,df2))
}


CreateJointDF<-function(ccy1_df,ccy2_df,rolling_period) {
  
  # we believe this relationship should be stable
  joint_df <- inner_join(ccy1_df,ccy2_df, by = 'quotes.date')
  
  # order by date
  joint_df<- joint_df[order(joint_df$quotes.date),]
  
  # ok, simple case, the relationship between ccy1 and ccy2 is absolute, so just subtract
  joint_df$quotes.diff <- joint_df$quotes.avg.x - joint_df$quotes.avg.y
  
  # add a rolling mean and sd at 10 days
  joint_df <- joint_df %>%
    mutate(rolling.mean = rollmean(quotes.diff, k=rolling_period, fill=NA, align='right')) %>%
    mutate(rolling.sd = rollapplyr(quotes.diff, rolling_period, sd, fill = NA))
  
  # remove the NAs in the beginning of the rolling mean
  joint_df <- joint_df[complete.cases(joint_df), ]
  
  joint_df$quotes.diff.normalized <- 
    (joint_df$quotes.diff - joint_df$rolling.mean) / joint_df$rolling.sd
  
  #   # write this as parquet files
  # joint_df %>%
  #   write_dataset(GetMinIOURI(paste0(ccy1,"-",ccy2)), format = "parquet" )
  
  return (joint_df)
  
}


CreateLaggedDF<-function(ccy1_df,ccy2_df,lag) {
  
  # we believe this relationship should be stable
  tmp_df <- inner_join(ccy1_df,ccy2_df, by = 'quotes.date')
  
  # sort the result
  tmp_df<-tmp_df[order(tmp_df$quotes.date),]
  
  # we are going to predict this difference
  tmp_df$quotes.diff <- tmp_df$quotes.avg.x - tmp_df$quotes.avg.y
  
  #save off date vector
  df.date<-tmp_df$quotes.date
  
  #save off the target
  df.target<-tmp_df$quotes.diff
  
  #save off the data that we are going to lag
  df.data<-tmp_df$quotes.diff
  df.data<-zoo(df.data,df.date)
  
  #create lagged df
  df.data.lag<-Lag(df.data,k=1:lag)
  df.data.lag<-as.data.frame(df.data.lag)
  
  # cbind the date and target on the front
  lagged_df<-cbind(df.date,df.target,df.data.lag)
  lagged_df<-lagged_df[complete.cases(lagged_df),]
  lagged_df$df.date <- ymd(lagged_df$df.date)
  
  #   # write this as parquet files
  # lagged_df %>%
  # write_dataset(GetMinIOURI(paste0(ccy1,"-",ccy2,"-lagged-",lag)), format = "parquet" )

  # this is a file that can be used for training a model
  return (lagged_df)
  
  
}

为了评估这些实验的结果,我们将需要一些回溯测试功能。这是那个文件,TestFunctions.R

source("packages.R")
source("DFFunctions.R")




TestActionsDF<-function(ccy1,ccy2,actions_df,startingUSD,start_date,end_date) {
  
  # start with an equal amount (in USD) of each)
  USDBal <- startingUSD
  x = floor(USDBal/4)
  
  #load the data - get each in terms of USD
  c1<-toupper(paste0(substring(ccy1,1,3),"USD"))
  c2<-toupper(paste0(substring(ccy1,4,6),"USD"))
  c3<-toupper(paste0(substring(ccy2,1,3),"USD"))
  c4<-toupper(paste0(substring(ccy2,4,6),"USD"))
  
  # get the individual currency / USD rates
  if (c1 != "USDUSD") {
    #c1_df<- DownloadCCYDF(c1,start_date,end_date)
    c1_df <- LoadCCYDF(c1)
    p_c1 <- (c1_df %>% filter(quotes.date == ymd(start_date)))$quotes.avg
    c1_bal <- floor(x/p_c1)
    USDBal <- USDBal - (c1_bal*p_c1)
    c1_trade_amt <- floor(c1_bal/20)
  } else {
    c1_bal = x
    USDBal = USDBal - x
  }
  
  if (c2 != "USDUSD") {
    #c2_df<- DownloadCCYDF(c2,start_date,end_date)
    c2_df <- LoadCCYDF(c2)
    p_c2 <- (c2_df %>% filter(quotes.date == ymd(start_date)))$quotes.avg
    c2_bal <- floor(x/p_c2)
    USDBal <- USDBal - (c2_bal*p_c2)
    c2_trade_amt <- floor(c2_bal/20)
  } else {
    c2_bal = x
    USDBal = USDBal - x
  }
  
  
  if (c3 != "USDUSD") {
    #c3_df<- DownloadCCYDF(c3,start_date,end_date)
    c3_df <- LoadCCYDF(c3)
    p_c3 <- (c3_df %>% filter(quotes.date == ymd(start_date)))$quotes.avg
    c3_bal <- floor(x/p_c3)
    USDBal <- USDBal - (c3_bal*p_c3)
    c3_trade_amt <- floor(c3_bal/20)
  } else {
    c3_bal = x
    USDBal = USDBal - x
  }
  
  
  if (c4 != "USDUSD") {
    #c4_df<- DownloadCCYDF(c4,start_date,end_date)
    c4_df <- LoadCCYDF(c4)
    p_c4 <- (c4_df %>% filter(quotes.date == ymd(start_date)))$quotes.avg
    c4_bal <- floor(x/p_c4)
    USDBal <- USDBal - (c4_bal*p_c4)
    c4_trade_amt <- floor(c4_bal/20)
  } else {
    c4_bal = x
    USDBal = USDBal - x
  }
  
  # OK, we have a balance in each of the desired currencies, and we have a notion of the trade amount
  # apply the trades and see what happens
  
  actions <- actions_df %>% filter (quotes.date > ymd(start_date) & quotes.date <= ymd(end_date))
  
  # apply the trades
  for (row in 1:nrow(actions)) {
    # figure out what we are buying and selling and get the rates on the specified dates
    d = actions[row,1]
    buy.ccy = actions[row,2]
    sell.ccy = actions[row,3]
    
    # handle the buy
    if (buy.ccy == ccy1) {
      # get he exchange rate on this date
      ex_rate = (ccy1_df %>% filter(quotes.date == ymd(d)))$quotes.avg
      if (c2_bal >= c1_trade_amt / ex_rate) {
        c1_bal = c1_bal + c1_trade_amt
        c2_bal = c2_bal - (c1_trade_amt * ex_rate)
      }
    } else {
      # get he exchange rate on this date
      ex_rate = (ccy2_df %>% filter(quotes.date == ymd(d)))$quotes.avg
      if (c4_bal >= c3_trade_amt / ex_rate) {
        c3_bal = c3_bal + c3_trade_amt
        c4_bal = c4_bal - (c3_trade_amt * ex_rate)
      }
      
    }
    
    #handle the sell
    if (sell.ccy == ccy1) {
      # get he exchange rate on this date
      ex_rate = (ccy1_df %>% filter(quotes.date == ymd(d)))$quotes.avg
      if (c1_bal >= c1_trade_amt) {
        c1_bal = c1_bal - c1_trade_amt
        c2_bal = c2_bal + (c1_trade_amt * ex_rate)
      }
    } else {
      # get he exchange rate on this date
      ex_rate = (ccy2_df %>% filter(quotes.date == ymd(d)))$quotes.avg

      if (c3_bal >= c3_trade_amt) {
        c3_bal = c3_bal - c3_trade_amt
        c4_bal = c4_bal + (c3_trade_amt * ex_rate)
      }
    }
  }
  
  # unwind and accumulate the balance in USD
  # get the end quotes for individual currency / USD at the end date
  if (c1 != "USDUSD") {
    p_c1 <-c1_df[c1_df$quotes.date==end_date,]$quotes.avg
    USDBal <- USDBal + (c1_bal * p_c1)
  } else {
    USDBal <- USDBal + c1_bal
  }
  
  if (c2 != "USDUSD") {
    p_c2 <- c2_df[c2_df$quotes.date==end_date,]$quotes.avg
    USDBal <- USDBal + (c2_bal * p_c2)
  } else {
    USDBal <- USDBal + c1_bal
  }
  
  if (c3 != "USDUSD") {
    p_c3 <- c3_df[c3_df$quotes.date==end_date,]$quotes.avg
    USDBal <- USDBal + (c3_bal * p_c3)
  } else {
    USDBal <- USDBal + c1_bal
  }
  
  if (c4 != "USDUSD") {
    p_c4 <- c4_df[c4_df$quotes.date==end_date,]$quotes.avg
    USDBal <- USDBal + (c4_bal * p_c4)
  } else {
    USDBal <- USDBal + c1_bal
  }
  
  
  return ((USDBal-startingUSD)/startingUSD)
  
}

对于第一个实验,这里是主文件,MainAbsolute.R读取数据,争论,确定一些买卖点,最后回测结果。

source("packages.R")
source ("DFFunctions.R")
source ("TestFunctions.R")


ccy1 = "eurjpy"
ccy2 = "gbpusd"

# these are used to get the data
start_date="2022-06-01"
end_date="2022-08-31"

# Adding buy/sell indicator
sd_threshold <- 1.5


# load the data
#df_list <- DownloadCCYDFs(ccy1,ccy2, start_date, end_date)
df_list <- LoadCCYDFs(ccy1,ccy2)
ccy1_df<-df_list[[1]]
ccy2_df<-df_list[[2]]

#sort these
ccy1_df<-ccy1_df[order(ccy1_df$quotes.date),]
ccy2_df<-ccy2_df[order(ccy2_df$quotes.date),]

# Test 1 - straight stat-arb using subtraction
joint_df<-CreateJointDF(ccy1_df,ccy2_df,10)


# graph these
ggplot(aes(x = quotes.date, y = quotes.diff.normalized, group=1), data = joint_df) +
  geom_point(color = "blue")  +
  geom_line(color = "blue") +
  geom_hline(yintercept=sd_threshold, linetype="dashed", 
             color = "red") +
  geom_hline(yintercept=-sd_threshold, linetype="dashed", 
             color = "red") +
  theme(axis.text.x = element_text(angle = 90))

joint_df <- joint_df %>%
  mutate(sell = case_when(
    quotes.diff.normalized > sd_threshold ~ ccy1,
    quotes.diff.normalized < (-1*sd_threshold) ~ ccy2)) %>%
  mutate(buy = case_when(
      quotes.diff.normalized > sd_threshold ~ ccy2,
      quotes.diff.normalized < (-1*sd_threshold) ~ ccy1))

# get rid of some unnecessary cols
joint_df <- joint_df %>% select(quotes.date,quotes.diff.normalized,buy,sell)

actions_df <- joint_df[,c('quotes.date','buy','sell')]
actions_df <- actions_df[complete.cases(actions_df), ]


startingUSD=10000

ret <- TestActionsDF(ccy1,ccy2,actions_df,startingUSD,start_date,end_date)
ret

让我们讨论一下实验一的结果。有几点需要说明:

  1. 我们测试的时间段是6/1/2022 - 8/31/2022

  2. 只需在期初将一些美元转入这些货币,然后在期末转回美元,其表现为 -0.0675 或-6.75%,我们将其用作我们的“零假设”。如果我们不进行任何交易,那么这就是回报 - 这是我们必须用我们的指标系统击败的。

  3. 如果我们使用减法来定义关系并使用 10 天滚动平均值作为基线,则性能为 -0.06616 或大约-6.62%这比根本不交易要好,所以我们正朝着正确的方向前进。

  4. 而且,当然,我们忽略了一大堆现实世界的考虑因素,比如与这些交易相关的任何费用都会影响回报。

实验二

我们能否通过使用机器学习来定义/预测这些工具之间的关系并创造更有利可图的交易来做得更好?我正在使用 H2O 的 AutoML 功能来构建一个模型,该模型可以预测货币之间的差异应该是什么(给定它所训练的数据)。然后我比较实际差异,如果实际值和预测值不一致,那么这似乎是一个交易机会。

我们需要更多文件来测试它,第一个是MLFunctions.R

source("Packages.R")

# initialize the h2o server
h2o.init(ip="Your H2O host IP address", port=54321, startH2O=FALSE, 
         jvm_custom_args = "-Dsys.ai.h2o.persist.s3.endPoint=”http://Your MinIO Host”:9000 -Dsys.ai.h2o.persist.s3.enable.path.style=true")
h2o.set_s3_credentials("Your MinIO Access Key", "Your MinIO Secret Key")


TrainModel<-function(training_df) {
  training_df.hex<-as.h2o(training_df, destination_frame= "training_df.hex")
  
  splits <- h2o.splitFrame(data = training_df.hex, 
                           ratios = c(0.6),  #partition data into 60%, 40%
                           seed = 1)  #setting a seed will guarantee reproducibility
  train_hex <- splits[[1]]
  test_hex <- splits[[2]]
  
  y_value <- 1
  predictors <- c(2:ncol(train_hex))
  
  
  # train
  aml = h2o.automl(y=y_value,x=predictors,
             training_frame=train_hex,
             leaderboard_frame = test_hex,
             max_runtime_secs = 60,
             seed = 1)
  model<-aml@leader
  
  # train model
  # model <- h2o.deeplearning(y=y_value, x=predictors,
  #                              training_frame=train_hex,
  #                              activation="Tanh",
  #                              autoencoder=FALSE,
  #                              hidden=c(50),
  #                              l1=1e-5,
  #                              ignore_const_cols=FALSE,
  #                              epochs=1)
  
  
  
  # save the leader model as bin
  model_path <- h2o.saveModel(model, path = "s3://bin-models/stat_arb_model_bin")

  return(model)

}


RunModel<-function(model,pred_df) {
  pred_df.hex<-as.h2o(pred_df)
  pred.hex <- h2o.predict(model, pred_df.hex)  # predict(aml, test) and h2o.predict(aml@leader, test) also work
  pred<-as.data.frame(pred.hex)
  return(pred)
}

和一个新的MainModel.R练习模型的方法

source("Packages.R")
source ("DFFunctions.R")
source("MLFunctions.R")
source ("TestFunctions.R")

# Example of multiple approaches to stat arb in forex

ccy1 = "eurjpy"
ccy2 = "gbpusd"

# these are used to get the data
start_date="2022-03-01"
end_date="2022-08-31"
start_training_date = "2022-03-01"
end_training_date = "2022-05-31"
start_pred_date = "2022-06-01"
end_pred_date = "2022-08-31"



# load the data
#df_list <- DownloadCCYDFs(ccy1,ccy2, start_date, end_date)
df_list <- LoadCCYDFs(ccy1,ccy2)
ccy1_df<-df_list[[1]][,c("quotes.date","quotes.avg")]
ccy2_df<-df_list[[2]][,c("quotes.date","quotes.avg")]

#sort these
ccy1_df<-ccy1_df[order(ccy1_df$quotes.date),]
ccy2_df<-ccy2_df[order(ccy2_df$quotes.date),]

# Test 2 - arb using a model to predict the difference
lagged_df<-CreateLaggedDF(ccy1_df,ccy2_df,10)

# create the training df
training_df <- lagged_df %>% filter(df.date > ymd(start_training_date), df.date <= ymd(end_training_date))
#get rid of the date training_df
training_df<-training_df[-1]
# create the pred df
pred_df <- lagged_df %>% filter(df.date > ymd(start_pred_date), df.date <= ymd(end_pred_date))


#write the files out
training_df %>% write_dataset(GetMinIOURI("training_df"), format = "parquet" )
pred_df %>% write_dataset(GetMinIOURI("pred_df"), format = "parquet" )

#train the model on this data
model <- TrainModel(training_df)
model

# okay, run the model on the data from 6/1 to 8/31
# it's ok that date and target are in here, the model selects the X values by col name
pred<-RunModel(model,pred_df)

# wrangle the return
predictions<-cbind(quotes.date=pred_df$df.date,actual=pred_df$df.target,pred=pred)
predictions$quotes.date<-as.Date(predictions$quotes.date)
predictions$diff<-predictions$actual-predictions$pred


# Adding buy/sell indicator
upper_threshold <- 5.0
lower_threshold <- 0.5

ggplot(aes(x = quotes.date, y = diff, group=1), data = predictions) +
  geom_point(color = "blue")  +
  geom_line(color = "blue") +
  geom_hline(yintercept=upper_threshold, linetype="dashed", 
             color = "red") +
  geom_hline(yintercept=lower_threshold, linetype="dashed", 
             color = "red") +
  theme(axis.text.x = element_text(angle = 90))


predictions <- predictions %>%
  mutate(sell = case_when(
    diff > upper_threshold ~ ccy1,
    diff < (lower_threshold) ~ ccy2)) %>%
  mutate(buy = case_when(
    diff > upper_threshold ~ ccy2,
    diff < (lower_threshold) ~ ccy1))



actions_df <- predictions %>% select(quotes.date,buy,sell) %>% filter(! is.na(buy))

# Let's test it for 6/1 - 8/31
startingUSD=10000

ret <- TestActionsDF(ccy1,ccy2,actions_df,startingUSD,start_date,end_date)
ret

请注意,在建模函数中,我们运行 H2O AutoML 并允许它从它尝试的模型中选择最佳模型。我们用于 AutoML 执行的参数仅用于此演示。一旦选择了模型,我们就以二进制格式将其保存回 MinIO,这样它就可以在没有 R 的情况下从生产工作流中执行。模型可以存储在模型存储库中,但老实说,我发现通常那些是有点矫枉过正。直接存储模型以及有关模型的一些元数据通常就足够了。

使用建模方法来识别差异要复杂一些。我使用 2022 年 3 月 1 日到 2022 年 5 月 31 日的数据创建了一个 10 天的“滞后”训练集。这是有效的,但这是在时间序列数据上训练模型的最基本方法,并且存在许多问题和缺点。我正在使用带有 H2O AutoML 的滞后训练数据来训练能够预测差异的模型。我们使用这个预测作为 ccys 之间的预期差异而不是滚动平均值,就像我们在第一个实验中所做的那样。如果您对处理用于 AI/ML 的时间序列数据感兴趣,这里有一篇来自 H2O 的时间序列预测博客此外,本文末尾还推荐了作者Marcos Lopez De Prado的一本书这是对这个复杂主题的更深入的处理。

这是模型预测与 2022 年 6 月 1 日至 2022 年 8 月 31 日期间实际情况之间差异的图表


Screen Shot 2022-10-19 at 10.53.27 AM.png


我们来看看实验二的结果。

我们做得如何?这组实验的零假设是-6.75 %一个实验 - 使用滚动平均静态关系和交易,如所示,具有-6.62%的性能。这样好一点,至少在朝着正确的方向前进。

使用在滞后历史关系上训练的AutoML 模型,然后使用该方法预测差异,使我们能够略微提高回报。使用这种方法,性能为 -0.05997 或大约-6.0%,略好于之前的实验。

我们能做得更好吗?这两个实验受到严格限制,仅用于教学目的。每个外汇交易者都有自己的方法,因此我尝试提供一个可以轻松定制的流程。有许多财务途径可以用来改进此处概述的方法。

无论我们的财务策略如何,我们的技术途径都很明确——云原生 MinIO 对象存储具有为各种统计和 AI/ML 工作负载提供支持的性能和弹性。软件定义、S3 兼容且性能极佳的 MinIO 是此类工作负载的标准。这就是为什么 MinIO 成为Kubeflow 的标准,并与每个主要的 AI/ML 框架无缝协作,包括H2OTensorFlow。  

结论

本教程着眼于在一对汇率 - eurjpy 和 gbpusd 之间使用统计套利。还有许多其他的探索。它只查看“每日柱线”(开盘价、最高价、最低价、收盘价数据)。还有其他时间框架或粒度需要探索 - “小时柱”、“分钟柱”、“第二柱”以及实际报价数据 - 它们在时域中不是同质的,需要一些额外的处理。

如果你对金融算法和处理金融数据感兴趣,我推荐An Introduction to High-Frequency Finance。作者讨论了高频(逐笔交易)数据在理解市场微观结构方面的价值和用途,包括讨论处理如此海量数据的最佳数学模型和工具。这些示例和许多主题都与外汇相关。

如果您有兴趣将机器学习应用于财务数据,我推荐Advances in Financial Machine Learning作者提供了有关使用高级 ML 解决方案克服现实世界投资问题的实用见解。

数据库和分析市场正在发生翻天覆地的变化,开放金融(和其他)数据以实现更好的协作和洞察力。近年来,许多数据库和分析包都采用了开放表标准,例如 Parquet、ArrowIcebergHudi毫无疑问,数据库和分析产品仅限于专有文件格式的时代即将结束。这些开放的表格数据标准允许在不损失存储效率的情况下实现更大的处理灵活性。它们还允许在产品之间移动数据——例如,可以从 Snowflake 访问以开放文件格式存储的 Big Query 数据作为外部表。开放文件格式针对对象存储进行了优化。例如,在本教程中创建的 Parquet 文件可以从数据管理和分析程序访问,这些程序能够使用开放表格式合并外部表。由于其云原生和无处不在的特性,它们非常适合 MinIO 对象存储。同样,对于本地部署,在商用硬件上运行的对象存储为客户增加了价值。硬件供应商之间对商品硬件的竞争保证了低成本和高价值。

未来是开放的、云原生的和分布式的。


上一篇 下一篇