diff --git a/pages/backtest_manager/analyze.py b/pages/backtest_manager/analyze.py
index c0dc2095..c7b80e05 100644
--- a/pages/backtest_manager/analyze.py
+++ b/pages/backtest_manager/analyze.py
@@ -1,4 +1,156 @@
-from utils.st_utils import initialize_st_page
+import constants
+import os
+import json
+import streamlit as st
+from quants_lab.strategy.strategy_analysis import StrategyAnalysis
+from utils.graphs import BacktestingGraphs
+from utils.optuna_database_manager import OptunaDBManager
+from utils.os_utils import load_directional_strategies
+from utils.st_utils import initialize_st_page
initialize_st_page(title="Analyze", icon="š¬", initial_sidebar_state="collapsed")
+
+
+@st.cache_resource
+def get_databases():
+ sqlite_files = [db_name for db_name in os.listdir("data/backtesting") if db_name.endswith(".db")]
+ databases_list = [OptunaDBManager(db) for db in sqlite_files]
+ databases_dict = {database.db_name: database for database in databases_list}
+ return [x.db_name for x in databases_dict.values() if x.status == 'OK']
+
+
+def initialize_session_state_vars():
+ if "strategy_params" not in st.session_state:
+ st.session_state.strategy_params = {}
+ if "backtesting_params" not in st.session_state:
+ st.session_state.backtesting_params = {}
+
+
+initialize_session_state_vars()
+dbs = get_databases()
+if not dbs:
+ st.warning("We couldn't find any Optuna database.")
+ selected_db_name = None
+ selected_db = None
+else:
+ # Select database from selectbox
+ selected_db = st.selectbox("Select your database:", dbs)
+ # Instantiate database manager
+ opt_db = OptunaDBManager(selected_db)
+ # Load studies
+ studies = opt_db.load_studies()
+ # Choose study
+ study_selected = st.selectbox("Select a study:", studies.keys())
+ # Filter trials from selected study
+ merged_df = opt_db.merged_df[opt_db.merged_df["study_name"] == study_selected]
+ bt_graphs = BacktestingGraphs(merged_df)
+ # Show and compare all of the study trials
+ st.plotly_chart(bt_graphs.pnl_vs_maxdrawdown(), use_container_width=True)
+ # Get study trials
+ trials = studies[study_selected]
+ # Choose trial
+ trial_selected = st.selectbox("Select a trial to backtest", list(trials.keys()))
+ trial = trials[trial_selected]
+ # Transform trial config in a dictionary
+ trial_config = json.loads(trial["config"])
+
+ # Strategy parameters section
+ st.write("## Strategy parameters")
+ # Load strategies (class, config, module)
+ strategies = load_directional_strategies(constants.DIRECTIONAL_STRATEGIES_PATH)
+ # Select strategy
+ strategy = strategies[trial_config["name"]]
+ # Get field schema
+ field_schema = strategy["config"].schema()["properties"]
+ c1, c2 = st.columns([5, 1])
+ # Render every field according to schema
+ with c1:
+ columns = st.columns(4)
+ column_index = 0
+ for field_name, properties in field_schema.items():
+ field_type = properties["type"]
+ field_value = trial_config[field_name]
+ with columns[column_index]:
+ if field_type in ["number", "integer"]:
+ field_value = st.number_input(field_name,
+ value=field_value,
+ min_value=properties.get("minimum"),
+ max_value=properties.get("maximum"),
+ key=field_name)
+ elif field_type == "string":
+ field_value = st.text_input(field_name, value=field_value)
+ elif field_type == "boolean":
+ # TODO: Add support for boolean fields in optimize tab
+ field_value = st.checkbox(field_name, value=field_value)
+ else:
+ raise ValueError(f"Field type {field_type} not supported")
+ try:
+ st.session_state["strategy_params"][field_name] = field_value
+ except KeyError as e:
+ pass
+ column_index = (column_index + 1) % 4
+ with c2:
+ add_positions = st.checkbox("Add positions", value=True)
+ add_volume = st.checkbox("Add volume", value=True)
+ add_pnl = st.checkbox("Add PnL", value=True)
+
+ # Backtesting parameters section
+ st.write("## Backtesting parameters")
+ # Get every trial params
+ # TODO: Filter only from selected study
+ backtesting_configs = opt_db.load_params()
+ # Get trial backtesting params
+ backtesting_params = backtesting_configs[trial_selected]
+ col1, col2, col3 = st.columns(3)
+ with col1:
+ selected_order_amount = st.number_input("Order amount",
+ value=50.0,
+ min_value=0.1,
+ max_value=999999999.99)
+ selected_leverage = st.number_input("Leverage",
+ value=10,
+ min_value=1,
+ max_value=200)
+ with col2:
+ selected_initial_portfolio = st.number_input("Initial portfolio",
+ value=10000.00,
+ min_value=1.00,
+ max_value=999999999.99)
+ selected_time_limit = st.number_input("Time Limit",
+ value=60 * 60 * backtesting_params["time_limit"]["param_value"],
+ min_value=60 * 60 * float(backtesting_params["time_limit"]["low"]),
+ max_value=60 * 60 * float(backtesting_params["time_limit"]["high"]))
+ with col3:
+ selected_tp_multiplier = st.number_input("Take Profit Multiplier",
+ value=backtesting_params["take_profit_multiplier"]["param_value"],
+ min_value=backtesting_params["take_profit_multiplier"]["low"],
+ max_value=backtesting_params["take_profit_multiplier"]["high"])
+ selected_sl_multiplier = st.number_input("Stop Loss Multiplier",
+ value=backtesting_params["stop_loss_multiplier"]["param_value"],
+ min_value=backtesting_params["stop_loss_multiplier"]["low"],
+ max_value=backtesting_params["stop_loss_multiplier"]["high"])
+
+ if st.button("Run Backtesting!"):
+ config = strategy["config"](**st.session_state["strategy_params"])
+ strategy = strategy["class"](config=config)
+ try:
+ market_data, positions = strategy.run_backtesting(
+ order_amount=selected_order_amount,
+                    leverage=selected_leverage,
+ initial_portfolio=selected_initial_portfolio,
+ take_profit_multiplier=selected_tp_multiplier,
+ stop_loss_multiplier=selected_sl_multiplier,
+ time_limit=selected_time_limit,
+ std_span=None,
+ )
+ strategy_analysis = StrategyAnalysis(
+ positions=positions,
+ candles_df=market_data,
+ )
+ metrics_container = bt_graphs.get_trial_metrics(strategy_analysis,
+ add_positions=add_positions,
+ add_volume=add_volume,
+ add_pnl=add_pnl)
+ except FileNotFoundError:
+                st.warning("The requested candles could not be found.")
diff --git a/quants_lab/strategy/strategy_analysis.py b/quants_lab/strategy/strategy_analysis.py
index 55c3a3c8..f00267c9 100644
--- a/quants_lab/strategy/strategy_analysis.py
+++ b/quants_lab/strategy/strategy_analysis.py
@@ -193,6 +193,15 @@ def avg_trading_time_in_minutes(self):
time_diff_minutes = (pd.to_datetime(self.positions['close_time']) - self.positions['timestamp']).dt.total_seconds() / 60
return time_diff_minutes.mean()
+ def start_date(self):
+ return self.candles_df.timestamp.min()
+
+ def end_date(self):
+ return self.candles_df.timestamp.max()
+
+ def avg_profit(self):
+ return self.positions.ret_usd.mean()
+
def text_report(self):
return f"""
Strategy Performance Report:
@@ -207,3 +216,16 @@ def text_report(self):
- Duration: {self.duration_in_minutes() / 60:,.2f} Hours
- Average Trade Duration: {self.avg_trading_time_in_minutes():,.2f} minutes
"""
+
+ def pnl_over_time(self):
+ fig = go.Figure()
+ fig.add_trace(go.Scatter(name="PnL Over Time",
+ x=self.positions.index,
+ y=self.positions.ret_usd.cumsum()))
+ # Update layout with the required attributes
+ fig.update_layout(
+ title="PnL Over Time",
+            xaxis_title="N° Position",
+ yaxis=dict(title="Net PnL USD", side="left", showgrid=False),
+ )
+ return fig
\ No newline at end of file
diff --git a/utils/file_templates.py b/utils/file_templates.py
index d4d55fcf..2ca4e81d 100644
--- a/utils/file_templates.py
+++ b/utils/file_templates.py
@@ -11,6 +11,7 @@ def directional_strategy_template(strategy_cls_name: str) -> str:
class {strategy_config_cls_name}(BaseModel):
+ name: str = "{strategy_cls_name.lower()}"
exchange: str = Field(default="binance_perpetual")
trading_pair: str = Field(default="ETH-USDT")
interval: str = Field(default="1h")
diff --git a/utils/graphs.py b/utils/graphs.py
index abd610c2..25a6f485 100644
--- a/utils/graphs.py
+++ b/utils/graphs.py
@@ -4,6 +4,7 @@
import streamlit as st
from utils.data_manipulation import StrategyData, SingleMarketStrategyData
+from quants_lab.strategy.strategy_analysis import StrategyAnalysis
import plotly.graph_objs as go
@@ -248,3 +249,97 @@ def get_merged_df(self, strategy_data: StrategyData):
merged_df["trade_pnl_continuos"] = merged_df["unrealized_trade_pnl"] + merged_df["cum_net_amount"] * merged_df["close"]
merged_df["net_pnl_continuos"] = merged_df["trade_pnl_continuos"] - merged_df["cum_fees_in_quote"]
return merged_df
+
+
+class BacktestingGraphs:
+ def __init__(self, study_df: pd.DataFrame):
+ self.study_df = study_df
+
+ def pnl_vs_maxdrawdown(self):
+ fig = go.Figure()
+ fig.add_trace(go.Scatter(name="Pnl vs Max Drawdown",
+ x=-100 * self.study_df["max_drawdown_pct"],
+ y=100 * self.study_df["net_profit_pct"],
+ mode="markers",
+ text=None,
+ hovertext=self.study_df["hover_text"]))
+ fig.update_layout(
+ title="PnL vs Max Drawdown",
+ xaxis_title="Max Drawdown [%]",
+ yaxis_title="Net Profit [%]",
+ height=800
+ )
+ fig.data[0].text = []
+ return fig
+
+ @staticmethod
+ def get_trial_metrics(strategy_analysis: StrategyAnalysis,
+ add_volume: bool = True,
+ add_positions: bool = True,
+ add_pnl: bool = True):
+ """Isolated method because it needs to be called from analyze and simulate pages"""
+ metrics_container = st.container()
+ with metrics_container:
+ col1, col2 = st.columns(2)
+ with col1:
+ st.subheader("š¦ Market")
+ with col2:
+ st.subheader("š General stats")
+ col1, col2, col3, col4 = st.columns(4)
+ with col1:
+ st.metric("Exchange", st.session_state["strategy_params"]["exchange"])
+ with col2:
+ st.metric("Trading Pair", st.session_state["strategy_params"]["trading_pair"])
+ with col3:
+ st.metric("Start date", strategy_analysis.start_date().strftime("%Y-%m-%d %H:%M"))
+ st.metric("End date", strategy_analysis.end_date().strftime("%Y-%m-%d %H:%M"))
+ with col4:
+ st.metric("Duration (hours)", f"{strategy_analysis.duration_in_minutes() / 60:.2f}")
+ st.metric("Price change", st.session_state["strategy_params"]["trading_pair"])
+ st.subheader("š Performance")
+ col1, col2, col3, col4, col5, col6, col7, col8 = st.columns(8)
+ with col1:
+ st.metric("Net PnL USD",
+ f"{strategy_analysis.net_profit_usd():.2f}",
+ delta=f"{100 * strategy_analysis.net_profit_pct():.2f}%",
+ help="The overall profit or loss achieved.")
+ with col2:
+ st.metric("Total positions",
+ f"{strategy_analysis.total_positions()}",
+ help="The total number of closed trades, winning and losing.")
+ with col3:
+ st.metric("Accuracy",
+ f"{100 * (len(strategy_analysis.win_signals()) / strategy_analysis.total_positions()):.2f} %",
+ help="The percentage of winning trades, the number of winning trades divided by the"
+ " total number of closed trades")
+ with col4:
+ st.metric("Profit factor",
+ f"{strategy_analysis.profit_factor():.2f}",
+ help="The amount of money the strategy made for every unit of money it lost, "
+ "gross profits divided by gross losses.")
+ with col5:
+ st.metric("Max Drawdown",
+ f"{strategy_analysis.max_drawdown_usd():.2f}",
+ delta=f"{100 * strategy_analysis.max_drawdown_pct():.2f}%",
+ help="The greatest loss drawdown, i.e., the greatest possible loss the strategy had compared "
+ "to its highest profits")
+ with col6:
+ st.metric("Avg Profit",
+ f"{strategy_analysis.avg_profit():.2f}",
+ help="The sum of money gained or lost by the average trade, Net Profit divided by "
+ "the overall number of closed trades.")
+ with col7:
+ st.metric("Avg Minutes",
+ f"{strategy_analysis.avg_trading_time_in_minutes():.2f}",
+ help="The average number of minutes that elapsed during trades for all closed trades.")
+ with col8:
+ st.metric("Sharpe Ratio",
+ f"{strategy_analysis.sharpe_ratio():.2f}",
+ help="The Sharpe ratio is a measure that quantifies the risk-adjusted return of an investment"
+ " or portfolio. It compares the excess return earned above a risk-free rate per unit of"
+ " risk taken.")
+
+ st.plotly_chart(strategy_analysis.pnl_over_time(), use_container_width=True)
+ strategy_analysis.create_base_figure(volume=add_volume, positions=add_positions, trade_pnl=add_pnl)
+ st.plotly_chart(strategy_analysis.figure(), use_container_width=True)
+ return metrics_container
diff --git a/utils/optuna_database_manager.py b/utils/optuna_database_manager.py
new file mode 100644
index 00000000..5c79c186
--- /dev/null
+++ b/utils/optuna_database_manager.py
@@ -0,0 +1,297 @@
+import os
+import json
+
+import pandas as pd
+from sqlalchemy import create_engine
+from sqlalchemy.orm import sessionmaker
+
+from utils.data_manipulation import StrategyData
+
+
+class OptunaDBManager:
+ def __init__(self, db_name):
+ self.db_name = db_name
+ self.db_path = f'sqlite:///{os.path.join("data/backtesting", db_name)}'
+ self.engine = create_engine(self.db_path, connect_args={'check_same_thread': False})
+ self.session_maker = sessionmaker(bind=self.engine)
+
+ @property
+ def status(self):
+ try:
+ with self.session_maker() as session:
+ query = 'SELECT * FROM trials WHERE state = "COMPLETE"'
+ completed_trials = pd.read_sql_query(query, session.connection())
+ if len(completed_trials) > 0:
+ # TODO: improve error handling, think what to do with other cases
+ return "OK"
+ else:
+ return "No records found in the trials table with completed state"
+ except Exception as e:
+ return f"Error: {str(e)}"
+
+ @property
+ def tables(self):
+ return self._get_tables()
+
+ def _get_tables(self):
+ try:
+ with self.session_maker() as session:
+ query = "SELECT name FROM sqlite_master WHERE type='table';"
+ tables = pd.read_sql_query(query, session.connection())
+ return tables["name"].tolist()
+ except Exception as e:
+ return f"Error: {str(e)}"
+
+ @property
+ def trials(self):
+ return self._get_trials_table()
+
+ def _get_trials_table(self):
+ try:
+ with self.session_maker() as session:
+ df = pd.read_sql_query("SELECT * FROM trials", session.connection())
+ return df
+ except Exception as e:
+ return f"Error: {str(e)}"
+
+ @property
+ def studies(self):
+ return self._get_studies_table()
+
+ def _get_studies_table(self):
+ try:
+ with self.session_maker() as session:
+ df = pd.read_sql_query("SELECT * FROM studies", session.connection())
+ return df
+ except Exception as e:
+ return f"Error: {str(e)}"
+
+ @property
+ def trial_params(self):
+ return self._get_trial_params_table()
+
+ def _get_trial_params_table(self):
+ try:
+ with self.session_maker() as session:
+ df = pd.read_sql_query("SELECT * FROM trial_params", session.connection())
+ return df
+ except Exception as e:
+ return f"Error: {str(e)}"
+
+ @property
+ def trial_values(self):
+ return self._get_trial_values_table()
+
+ def _get_trial_values_table(self):
+ try:
+ with self.session_maker() as session:
+ df = pd.read_sql_query("SELECT * FROM trial_values", session.connection())
+ return df
+ except Exception as e:
+ return f"Error: {str(e)}"
+
+ @property
+ def trial_system_attributes(self):
+ return self._get_trial_system_attributes_table()
+
+ def _get_trial_system_attributes_table(self):
+ try:
+ with self.session_maker() as session:
+ df = pd.read_sql_query("SELECT * FROM trial_system_attributes", session.connection())
+ return df
+ except Exception as e:
+ return f"Error: {str(e)}"
+
+
+ @property
+ def version_info(self):
+ return self._get_version_info_table()
+
+ def _get_version_info_table(self):
+ try:
+ with self.session_maker() as session:
+ df = pd.read_sql_query("SELECT * FROM version_info", session.connection())
+ return df
+ except Exception as e:
+ return f"Error: {str(e)}"
+
+ @property
+ def study_directions(self):
+ return self._get_study_directions_table()
+
+ def _get_study_directions_table(self):
+ try:
+ with self.session_maker() as session:
+ df = pd.read_sql_query("SELECT * FROM study_directions", session.connection())
+ return df
+ except Exception as e:
+ return f"Error: {str(e)}"
+
+ @property
+ def study_user_attributes(self):
+ return self._get_study_user_attributes_table()
+
+ def _get_study_user_attributes_table(self):
+ try:
+ with self.session_maker() as session:
+ df = pd.read_sql_query("SELECT * FROM study_user_attributes", session.connection())
+ return df
+ except Exception as e:
+ return f"Error: {str(e)}"
+
+ @property
+ def study_system_attributes(self):
+ return self._get_study_system_attributes_table()
+
+ def _get_study_system_attributes_table(self):
+ try:
+ with self.session_maker() as session:
+ df = pd.read_sql_query("SELECT * FROM study_system_attributes", session.connection())
+ return df
+ except Exception as e:
+ return f"Error: {str(e)}"
+
+ @property
+ def trial_user_attributes(self):
+ return self._get_trial_user_attributes_table()
+
+ def _get_trial_user_attributes_table(self):
+ try:
+ with self.session_maker() as session:
+ df = pd.read_sql_query("SELECT * FROM trial_user_attributes", session.connection())
+ return df
+ except Exception as e:
+ return f"Error: {str(e)}"
+
+ @property
+ def trial_intermediate_values(self):
+ return self._get_trial_intermediate_values_table()
+
+ def _get_trial_intermediate_values_table(self):
+ try:
+ with self.session_maker() as session:
+ df = pd.read_sql_query("SELECT * FROM trial_intermediate_values", session.connection())
+ return df
+ except Exception as e:
+ return f"Error: {str(e)}"
+
+ @property
+ def trial_heartbeats(self):
+ return self._get_trial_heartbeats_table()
+
+ def _get_trial_heartbeats_table(self):
+ try:
+ with self.session_maker() as session:
+ df = pd.read_sql_query("SELECT * FROM trial_heartbeats", session.connection())
+ return df
+ except Exception as e:
+ return f"Error: {str(e)}"
+
+ @property
+ def alembic_version(self):
+ return self._get_alembic_version_table()
+
+ def _get_alembic_version_table(self):
+ try:
+ with self.session_maker() as session:
+ df = pd.read_sql_query("SELECT * FROM alembic_version", session.connection())
+ return df
+ except Exception as e:
+ return f"Error: {str(e)}"
+
+ @property
+ def merged_df(self):
+ return self._get_merged_df()
+
+ @staticmethod
+ def _add_hovertext(x):
+        summary_label = (f"Trial ID: {x['trial_id']}<br>"
+                         f"Study: {x['study_name']}<br>"
+                         f"--------------------<br>"
+                         f"Accuracy: {100 * x['accuracy']:.2f} %<br>"
+                         f"Avg Trading Time in Hours: {x['avg_trading_time_in_hours']:.2f}<br>"
+                         f"Duration in Hours: {x['duration_in_hours']:.2f}<br>"
+                         f"Loss Signals: {x['loss_signals']}<br>"
+                         f"Max Drawdown [%]: {100 * x['max_drawdown_pct']:.2f} %<br>"
+                         f"Max Drawdown [USD]: $ {x['max_drawdown_usd']:.2f}<br>"
+                         f"Net Profit [%]: {100 * x['net_profit_pct']:.2f} %<br>"
+                         f"Net Profit [$]: $ {x['net_profit_usd']:.2f}<br>"
+                         f"Profit Factor: {x['profit_factor']:.2f}<br>"
+                         f"Sharpe Ratio: {x['sharpe_ratio']:.4f}<br>"
+                         f"Total Positions: {x['total_positions']}<br>"
+                         f"Win Signals: {x['win_signals']}<br>"
+                         f"Trial value: {x['value']}<br>"
+                         f"Direction: {x['direction']}<br>"
+                         )
+ return summary_label
+
+ def _get_merged_df(self):
+ float_cols = ["accuracy", "avg_trading_time_in_hours", "duration_in_hours", "max_drawdown_pct", "max_drawdown_usd",
+ "net_profit_pct", "net_profit_usd", "profit_factor", "sharpe_ratio", "value"]
+ int_cols = ["loss_signals", "total_positions", "win_signals"]
+ merged_df = self.trials\
+ .merge(self.studies, on="study_id")\
+ .merge(pd.pivot(self.trial_user_attributes, index="trial_id", columns="key", values="value_json"),
+ on="trial_id")\
+ .merge(self.trial_values, on="trial_id")\
+ .merge(self.study_directions, on="study_id")
+ merged_df[float_cols] = merged_df[float_cols].astype("float")
+ merged_df[int_cols] = merged_df[int_cols].astype("int64")
+ merged_df["hover_text"] = merged_df.apply(self._add_hovertext, axis=1)
+ return merged_df
+
+ def load_studies(self):
+ df = self.merged_df
+ study_name_col = 'study_name'
+ trial_id_col = 'trial_id'
+ nested_dict = {}
+ for _, row in df.iterrows():
+ study_name = row[study_name_col]
+ trial_id = row[trial_id_col]
+ data_dict = row.drop([study_name_col, trial_id_col]).to_dict()
+ if study_name not in nested_dict:
+ nested_dict[study_name] = {}
+ nested_dict[study_name][trial_id] = data_dict
+ return nested_dict
+
+ def load_params(self):
+ trial_id_col = 'trial_id'
+ param_name_col = 'param_name'
+ param_value_col = 'param_value'
+ distribution_json_col = 'distribution_json'
+ nested_dict = {}
+ for _, row in self.trial_params.iterrows():
+ trial_id = row[trial_id_col]
+ param_name = row[param_name_col]
+ param_value = row[param_value_col]
+ distribution_json = row[distribution_json_col]
+
+ if trial_id not in nested_dict:
+ nested_dict[trial_id] = {}
+
+ dist_json = json.loads(distribution_json)
+ default_step = None
+ default_low = None
+ default_high = None
+ default_log = None
+
+ nested_dict[trial_id][param_name] = {
+ 'param_name': param_name,
+ 'param_value': param_value,
+ 'step': dist_json["attributes"].get("step", default_step),
+ 'low': dist_json["attributes"].get("low", default_low),
+ 'high': dist_json["attributes"].get("high", default_high),
+ 'log': dist_json["attributes"].get("log", default_log),
+ }
+ return nested_dict