|
from pprint import pprint |
|
from typing import Any |
|
from copy import deepcopy |
|
import numpy as np |
|
|
|
from dizoo.gym_anytrading.envs.trading_env import TradingEnv, Actions, Positions, load_dataset |
|
from ding.utils import ENV_REGISTRY |
|
from ding.torch_utils import to_ndarray |
|
|
|
|
|
@ENV_REGISTRY.register('stocks-v0')
class StocksEnv(TradingEnv):
    """
    Overview:
        Stock trading environment registered under the name ``'stocks-v0'``.
        It loads a price table, keeps the raw ``Close`` column for reward and
        profit computation, and exposes a z-score normalized copy of the table
        as the feature source.
    """

    def __init__(self, cfg):
        """
        Overview:
            Load the dataset named by ``cfg.stocks_data_filename``, normalize it and set the trade fees.
        Arguments:
            - cfg: environment config; after ``super().__init__`` it is read through ``self._cfg``.
        """
        super().__init__(cfg)

        # ====== load Google stocks data =======
        raw_data = load_dataset(self._cfg.stocks_data_filename, 'Date')
        # Un-normalized close prices; rewards and max profit are computed on these.
        self.raw_prices = raw_data.loc[:, 'Close'].to_numpy()
        EPS = 1e-10  # keeps the division finite when a column has zero std

        def zscore(frame):
            # Column-wise standardization: (x - mean) / (std + EPS).
            return frame.apply(lambda x: (x - x.mean()) / (x.std() + EPS), axis=0)

        self.df = deepcopy(raw_data)
        # NOTE(review): train_range / test_range are not assigned in this class;
        # presumably they are set by TradingEnv.__init__ -- confirm.
        if self.train_range is None or self.test_range is None:
            # No split configured: normalize the whole table at once.
            self.df = zscore(self.df)
        else:
            # Normalize the two splits independently, each with its own statistics.
            boundary = int(len(self.df) * self.train_range)
            train_data = raw_data[:boundary].copy()
            # The second split starts at len * (1 + test_range);
            # presumably test_range is a negative fraction -- confirm.
            boundary = int(len(raw_data) * (1 + self.test_range))
            test_data = raw_data[boundary:].copy()

            train_data = zscore(train_data)
            test_data = zscore(test_data)
            # Rows that fall in neither slice keep their raw values in self.df.
            self.df.loc[train_data.index, train_data.columns] = train_data
            self.df.loc[test_data.index, test_data.columns] = test_data
        # ======================================

        # set cost
        self.trade_fee_bid_percent = 0.01  # unit
        self.trade_fee_ask_percent = 0.005  # unit

    # override
    def _process_data(self, start_idx: int = None) -> Any:
        '''
        Overview:
            used by env.reset(), process the raw data.
            Also sets ``self.start_idx``, ``self._start_tick`` and ``self._end_tick``.
        Arguments:
            - start_idx (int): the start tick; if None, then randomly select.
        Returns:
            - prices: the ``Close`` column of ``self.df`` (the normalized table built in ``__init__``).
            - signal_features: feature map, one column per selected feature (Close, Diff, Volume).
            - feature_dim_len: the dimension length of selected feature
        '''

        # ====== build feature map ========
        all_feature_name = ['Close', 'Open', 'High', 'Low', 'Adj Close', 'Volume']
        all_feature = {k: self.df.loc[:, k].to_numpy() for k in all_feature_name}
        # add feature "Diff": first difference of Close, with a leading 0 to keep the length
        prices = self.df.loc[:, 'Close'].to_numpy()
        diff = np.insert(np.diff(prices), 0, 0)
        all_feature_name.append('Diff')
        all_feature['Diff'] = diff
        # =================================

        # you can select features you want
        selected_feature_name = ['Close', 'Diff', 'Volume']
        selected_feature = np.column_stack([all_feature[k] for k in selected_feature_name])
        feature_dim_len = len(selected_feature_name)

        # validate index
        if start_idx is None:
            if self.train_range is None or self.test_range is None:
                # No split configured: sample anywhere that leaves room for a full episode.
                self.start_idx = np.random.randint(self.window_size - 1, len(self.df) - self._cfg.eps_length)
            elif self._env_id[-1] == 'e':
                # NOTE(review): an env id ending in 'e' presumably marks an evaluation env -- confirm.
                # Sample from the tail split that starts at len * (1 + test_range).
                boundary = int(len(self.df) * (1 + self.test_range))
                assert len(self.df) - self._cfg.eps_length > boundary + self.window_size,\
                    "parameter test_range is too large!"
                self.start_idx = np.random.randint(boundary + self.window_size, len(self.df) - self._cfg.eps_length)
            else:
                # Otherwise sample from the head split that ends at len * train_range.
                boundary = int(len(self.df) * self.train_range)
                assert boundary - self._cfg.eps_length > self.window_size,\
                    "parameter train_range is too small!"
                self.start_idx = np.random.randint(self.window_size, boundary - self._cfg.eps_length)
        else:
            self.start_idx = start_idx

        self._start_tick = self.start_idx
        self._end_tick = self._start_tick + self._cfg.eps_length - 1

        return prices, selected_feature, feature_dim_len

    # override
    def _calculate_reward(self, action: int) -> float:
        """
        Overview:
            Compute the reward of the current step. The reward is non-zero only when
            the action closes the current position: BUY / DOUBLE_BUY while SHORT, or
            SELL / DOUBLE_SELL while LONG.
        Arguments:
            - action (int): the action taken at this step, compared against ``Actions`` members.
        Returns:
            - step_reward (float): log-return of the closed position plus the (negative) log fee term, \
                or 0. when no position is closed.
        """
        current_price = self.raw_prices[self._current_tick]
        last_trade_price = self.raw_prices[self._last_trade_tick]
        ratio = current_price / last_trade_price
        # Log of the fraction of value kept after paying both fees.
        cost = np.log((1 - self.trade_fee_ask_percent) * (1 - self.trade_fee_bid_percent))

        step_reward = 0.
        if self._position == Positions.SHORT and action in (Actions.BUY, Actions.DOUBLE_BUY):
            # Closing a short: the gain factor is (2 - ratio).
            step_reward = np.log(2 - ratio) + cost
        elif self._position == Positions.LONG and action in (Actions.SELL, Actions.DOUBLE_SELL):
            # Closing a long: the gain factor is ratio.
            step_reward = np.log(ratio) + cost

        return float(step_reward)

    # override
    def max_possible_profit(self) -> float:
        """
        Overview:
            Walk the raw close prices from ``self._start_tick`` to ``self._end_tick``,
            split them into consecutive falling and non-falling runs, and accumulate the
            profit factor of trading each run in its own direction (short on falling runs,
            long otherwise) with both fees applied. A run is only counted when it
            increases the running profit.
        Returns:
            - profit (float): the accumulated profit factor, starting from 1.
        """
        current_tick = self._start_tick
        last_trade_tick = current_tick - 1
        profit = 1.

        while current_tick <= self._end_tick:

            if self.raw_prices[current_tick] < self.raw_prices[current_tick - 1]:
                # Falling run: advance to its end, then value it as a short.
                while (current_tick <= self._end_tick
                       and self.raw_prices[current_tick] < self.raw_prices[current_tick - 1]):
                    current_tick += 1

                current_price = self.raw_prices[current_tick - 1]
                last_trade_price = self.raw_prices[last_trade_tick]
                tmp_profit = profit * (2 - (current_price / last_trade_price)) * (1 - self.trade_fee_ask_percent
                                                                                  ) * (1 - self.trade_fee_bid_percent)
                # Skip the trade if fees outweigh the move.
                profit = max(profit, tmp_profit)
            else:
                # Non-falling run: advance to its end, then value it as a long.
                while (current_tick <= self._end_tick
                       and self.raw_prices[current_tick] >= self.raw_prices[current_tick - 1]):
                    current_tick += 1

                current_price = self.raw_prices[current_tick - 1]
                last_trade_price = self.raw_prices[last_trade_tick]
                tmp_profit = profit * (current_price / last_trade_price) * (1 - self.trade_fee_ask_percent
                                                                            ) * (1 - self.trade_fee_bid_percent)
                # Skip the trade if fees outweigh the move.
                profit = max(profit, tmp_profit)
            # The next run is measured from the last tick of this one.
            last_trade_tick = current_tick - 1

        return profit

    def __repr__(self) -> str:
        """Return the fixed display name of this environment."""
        return "DI-engine Stocks Trading Env"
|
|