zjowowen's picture
init space
079c32c
raw
history blame
6.42 kB
from pprint import pprint
from typing import Any
from copy import deepcopy
import numpy as np
from dizoo.gym_anytrading.envs.trading_env import TradingEnv, Actions, Positions, load_dataset
from ding.utils import ENV_REGISTRY
from ding.torch_utils import to_ndarray
@ENV_REGISTRY.register('stocks-v0')
class StocksEnv(TradingEnv):
def __init__(self, cfg):
super().__init__(cfg)
# ====== load Google stocks data =======
raw_data = load_dataset(self._cfg.stocks_data_filename, 'Date')
self.raw_prices = raw_data.loc[:, 'Close'].to_numpy()
EPS = 1e-10
self.df = deepcopy(raw_data)
if self.train_range == None or self.test_range == None:
self.df = self.df.apply(lambda x: (x - x.mean()) / (x.std() + EPS), axis=0)
else:
boundary = int(len(self.df) * self.train_range)
train_data = raw_data[:boundary].copy()
boundary = int(len(raw_data) * (1 + self.test_range))
test_data = raw_data[boundary:].copy()
train_data = train_data.apply(lambda x: (x - x.mean()) / (x.std() + EPS), axis=0)
test_data = test_data.apply(lambda x: (x - x.mean()) / (x.std() + EPS), axis=0)
self.df.loc[train_data.index, train_data.columns] = train_data
self.df.loc[test_data.index, test_data.columns] = test_data
# ======================================
# set cost
self.trade_fee_bid_percent = 0.01 # unit
self.trade_fee_ask_percent = 0.005 # unit
# override
def _process_data(self, start_idx: int = None) -> Any:
'''
Overview:
used by env.reset(), process the raw data.
Arguments:
- start_idx (int): the start tick; if None, then randomly select.
Returns:
- prices: the close.
- signal_features: feature map
- feature_dim_len: the dimension length of selected feature
'''
# ====== build feature map ========
all_feature_name = ['Close', 'Open', 'High', 'Low', 'Adj Close', 'Volume']
all_feature = {k: self.df.loc[:, k].to_numpy() for k in all_feature_name}
# add feature "Diff"
prices = self.df.loc[:, 'Close'].to_numpy()
diff = np.insert(np.diff(prices), 0, 0)
all_feature_name.append('Diff')
all_feature['Diff'] = diff
# =================================
# you can select features you want
selected_feature_name = ['Close', 'Diff', 'Volume']
selected_feature = np.column_stack([all_feature[k] for k in selected_feature_name])
feature_dim_len = len(selected_feature_name)
# validate index
if start_idx is None:
if self.train_range == None or self.test_range == None:
self.start_idx = np.random.randint(self.window_size - 1, len(self.df) - self._cfg.eps_length)
elif self._env_id[-1] == 'e':
boundary = int(len(self.df) * (1 + self.test_range))
assert len(self.df) - self._cfg.eps_length > boundary + self.window_size,\
"parameter test_range is too large!"
self.start_idx = np.random.randint(boundary + self.window_size, len(self.df) - self._cfg.eps_length)
else:
boundary = int(len(self.df) * self.train_range)
assert boundary - self._cfg.eps_length > self.window_size,\
"parameter test_range is too small!"
self.start_idx = np.random.randint(self.window_size, boundary - self._cfg.eps_length)
else:
self.start_idx = start_idx
self._start_tick = self.start_idx
self._end_tick = self._start_tick + self._cfg.eps_length - 1
return prices, selected_feature, feature_dim_len
# override
def _calculate_reward(self, action: int) -> np.float32:
step_reward = 0.
current_price = (self.raw_prices[self._current_tick])
last_trade_price = (self.raw_prices[self._last_trade_tick])
ratio = current_price / last_trade_price
cost = np.log((1 - self.trade_fee_ask_percent) * (1 - self.trade_fee_bid_percent))
if action == Actions.BUY and self._position == Positions.SHORT:
step_reward = np.log(2 - ratio) + cost
if action == Actions.SELL and self._position == Positions.LONG:
step_reward = np.log(ratio) + cost
if action == Actions.DOUBLE_SELL and self._position == Positions.LONG:
step_reward = np.log(ratio) + cost
if action == Actions.DOUBLE_BUY and self._position == Positions.SHORT:
step_reward = np.log(2 - ratio) + cost
step_reward = float(step_reward)
return step_reward
# override
def max_possible_profit(self) -> float:
current_tick = self._start_tick
last_trade_tick = current_tick - 1
profit = 1.
while current_tick <= self._end_tick:
if self.raw_prices[current_tick] < self.raw_prices[current_tick - 1]:
while (current_tick <= self._end_tick
and self.raw_prices[current_tick] < self.raw_prices[current_tick - 1]):
current_tick += 1
current_price = self.raw_prices[current_tick - 1]
last_trade_price = self.raw_prices[last_trade_tick]
tmp_profit = profit * (2 - (current_price / last_trade_price)) * (1 - self.trade_fee_ask_percent
) * (1 - self.trade_fee_bid_percent)
profit = max(profit, tmp_profit)
else:
while (current_tick <= self._end_tick
and self.raw_prices[current_tick] >= self.raw_prices[current_tick - 1]):
current_tick += 1
current_price = self.raw_prices[current_tick - 1]
last_trade_price = self.raw_prices[last_trade_tick]
tmp_profit = profit * (current_price / last_trade_price) * (1 - self.trade_fee_ask_percent
) * (1 - self.trade_fee_bid_percent)
profit = max(profit, tmp_profit)
last_trade_tick = current_tick - 1
return profit
def __repr__(self) -> str:
return "DI-engine Stocks Trading Env"