From 1c7c5d049ba0ecbf812cbdb98a7ab9216cd7867c Mon Sep 17 00:00:00 2001 From: Ben Heckmann Date: Mon, 17 Jul 2023 13:09:16 +0200 Subject: [PATCH] #854 added support for qlib's data format, implemented factor check, reformatted summary --- scripts/check_data_health.py | 71 +++++++++++++++++++++++++++--------- 1 file changed, 54 insertions(+), 17 deletions(-) diff --git a/scripts/check_data_health.py b/scripts/check_data_health.py index 42a4de49f0..65661cb3ec 100644 --- a/scripts/check_data_health.py +++ b/scripts/check_data_health.py @@ -5,8 +5,11 @@ import fire import pandas as pd +import qlib from tqdm import tqdm +from qlib.data import D + class DataHealthChecker: """Checks a dataset for data completeness and correctness. The data will be converted to a pd.DataFrame and checked for the following problems: @@ -47,8 +50,16 @@ def __init__( self.data[filename] = df elif qlib_dir: - # todo: add support for qlib_dir - pass + qlib.init(provider_uri=qlib_dir) + self.load_qlib_data() + + def load_qlib_data(self): + instruments = D.instruments(market="all") + instrument_list = D.list_instruments(instruments=instruments, as_list=True) + required_fields = ["$open", "$close", "$low", "$high", "$volume"] + for instrument in instrument_list: + df = D.features([instrument], required_fields, freq="day") + self.data[instrument] = df def check_missing_data( self, filename: str, df: pd.DataFrame @@ -102,8 +113,17 @@ def check_required_columns( def check_missing_factor( self, filename: str, df: pd.DataFrame ) -> Optional[Tuple[DataProblem, List[str]]]: - # todo - pass + """Check if the 'factor' column is missing in the DataFrame.""" + if "factor" not in df.columns: + logging.warning( + f"{filename}: Missing 'factor' column, trading unit will be disabled." + ) + return self.DataProblem.MISSING_FACTOR, ["factor"] + elif df["factor"].isnull().any(): + logging.warning( + f"{filename}: Missing factor data, trading unit may be incorrectly adjusted." + ) + return self.DataProblem.MISSING_FACTOR, ["factor"] def check_data(self): checks = [ @@ -130,20 +150,37 @@ def _print_report(self, problems: Dict[str, List[Tuple[DataProblem, str]]]): ) stats["count"] += 1 stats["affected_columns"].update(affected_columns) - print("\n-----------------------------") - print("Summary of data health check:") - print(f"Files checked: {len(self.data)}") - padding = max(len(problem.name) for problem in self.DataProblem) + print(f"\nSummary of data health check ({len(self.data)} files checked):") + print("-----------------------") + padding_between_columns = 2 + padding_problem_name = ( + max(len(problem.name) for problem in self.DataProblem) + + padding_between_columns + ) + padding_count = ( + max( + len(str(stats["count"])) for stats in problem_stats_by_type.values() + ) + + padding_between_columns + ) + print( + "Problem".ljust(padding_problem_name), + "Count".ljust(padding_count), + "Affected columns", + ) for problem in self.DataProblem: - padded_name = problem.name.ljust(padding + 2, " ") - print(f"• {padded_name}", end="") - if problem in problem_stats_by_type: - print(f"{problem_stats_by_type[problem]['count']}") - print( - f" affected columns{' ' * max(padding - 14, 0)}{problem_stats_by_type[problem]['affected_columns']}" - ) - else: - print("0") + padded_name = problem.name.ljust(padding_problem_name) + padded_count = str( + problem_stats_by_type[problem]["count"] + if problem in problem_stats_by_type + else 0 + ).ljust(padding_count) + affected_columns = ( + problem_stats_by_type[problem]["affected_columns"] + if problem in problem_stats_by_type + else "-" + ) + print(padded_name, padded_count, affected_columns) else: logging.info("Data check passed. No problems found.")