#!/usr/bin/env python
# -*- coding: utf-8 -*-
# This file is part of the
# ResoKit Project (https://github.com/Gianuzzi/resokit).
# Copyright (c) 2025, Emmanuel Gianuzzi
# License: MIT
# Full Text: https://github.com/Gianuzzi/resokit/blob/master/LICENSE
# =============================================================================
# DOCS
# =============================================================================
"""Module to manage provided exoplanet datasets from exoplanet.eu and NASA."""
# =============================================================================
# IMPORTS
# =============================================================================
import datetime
import warnings
from io import BytesIO, StringIO, TextIOWrapper
from pathlib import Path
from typing import BinaryIO, Dict, List, Tuple, Union
from zipfile import ZIP_DEFLATED, ZipFile
import attrs
import numpy as np # for nan
import pandas as pd
from resokit.core import MetaData, ResokitDataFrame, df_to_resokit
from resokit.datasets.utils import (
BINARIES_COLUMNS,
BINARIES_FILENAMES,
BINARIES_URLS,
DATASETS_DIR,
DATASET_DTYPES,
DATASET_FILENAMES,
DATASET_URLS,
DATASET_ZIPNAMES,
INDEX_COLUMNS,
check_file_age,
check_outdated_binary,
check_outdated_dataset,
load_from_zip,
merge_old_and_new,
remove_from_zip,
request_dataset,
resolve_paths,
)
from resokit.query import build_query, execute_query
from resokit.utils.parser import (
DEFAULT_METADATA,
QUERY_MAPPINGS,
parse_name,
parse_to_iter,
)
# =============================================================================
# CLASSES
# =============================================================================
@attrs.define(frozen=True, slots=True, repr=False)
class ResoKitDataset:
"""Class to store a ResoKit dataset.
Parameters
----------
dataset : pd.DataFrame
The dataset as a pandas DataFrame.
source : str
The source of the dataset ('eu' or 'nasa').
age : int
The age of the dataset in days.
origin : str
The origin of the dataset (file in zip, file, or mixed).
is_full : bool
Whether the dataset is complete.
metadata : dict
Metadata for the dataset.
"""
dataset: pd.DataFrame = attrs.field(
validator=attrs.validators.instance_of(pd.DataFrame),
)
source: str = attrs.field(
validator=attrs.validators.in_({"eu", "nasa"}),
converter=str.lower,
)
age: int = attrs.field(validator=attrs.validators.instance_of(int))
origin: str = attrs.field(
validator=attrs.validators.in_(
{"file", "zip", "mixed", "internet", "null"}
),
converter=str.lower,
)
is_full: bool = attrs.field(validator=attrs.validators.instance_of(bool))
metadata: dict = attrs.field(converter=MetaData, factory=MetaData)
def __attrs_post_init__(self):
"""Post-init method to set the metadata."""
# Check wrong configurations
if self.origin == "null":
if self.age != -1:
raise ValueError("Age must be -1 if origin is 'null'.")
if self.is_full:
raise ValueError("is_full must be False if origin is 'null'.")
if not self.dataset.empty:
raise ValueError("Dataset must be empty if origin is 'null'.")
if self.age < 0:
if self.age != -1:
raise ValueError("Age must be -1 or positive.")
if self.is_full:
raise ValueError("is_full must be False if age is -1.")
if not self.dataset.empty:
raise ValueError("Dataset must be empty if age is -1.")
if self.is_full:
if self.dataset.empty:
raise ValueError("Dataset cannot be empty if is_full is True.")
def __len__(self):
"""len(x) <=> x.__len__()."""
return len(self.dataset)
def __getitem__(self, key):
"""x[y] <==> x.__getitem__(y)."""
if isinstance(key, ResoKitDataset):
# Attempt to get a slice from sliced
sliced = self.dataset.__getitem__(key.dataset)
else:
sliced = self.dataset.__getitem__(key)
is_full = self.is_full and len(sliced) == len(self.dataset)
# Transform to df if possible
if isinstance(sliced, pd.Series):
sliced = sliced.to_frame()
return attrs.evolve(self, dataset=sliced, is_full=is_full)
def __dir__(self):
"""dir(pdf) <==> pdf.__dir__()."""
return super().__dir__() + dir(self.dataset)
def __getattr__(self, a):
"""getattr(x, y) <==> x.__getattr__(y) <==> getattr(x, y)."""
return getattr(self.dataset, a)
def __repr__(self):
"""repr(x) <=> x.__repr__()."""
with pd.option_context("display.show_dimensions", False):
df_body = repr(self.dataset).splitlines()
# Construct the repr
aux = "Full" if self.is_full else "Partial"
parts = [
f"{aux} ResokitDataset - {self.dataset.shape[0]} rows x "
+ f"{self.dataset.shape[1]} columns",
f"Source: {self.source}",
f"Age: {self.age} days",
f"Origin: {self.origin}",
*df_body,
]
return "\n".join(parts)
def _repr_html_(self):
"""Return a HTML representation of the DataFrame."""
ad_id = id(self) # Unique ID for the div container
# Header and footer
aux = "Full" if self.is_full else "Partial"
rows = f"{self.dataset.shape[0]} rows"
columns = f"{self.dataset.shape[1]} columns"
footer = f" {aux} ResokitDataSet - {rows} x {columns}"
# HTML representation of the DataFrame
with pd.option_context("display.show_dimensions", False):
df_html = self.dataset._repr_html_()
# Construct the HTML
parts = [
f'<div class="resokit-data-container" id={ad_id}>',
df_html,
footer,
"</div>",
]
# Join the parts
html = "".join(parts)
return html
def __eq__(self, value):
"""X == Y <==> X.__eq__(Y)."""
if isinstance(value, ResoKitDataset):
return (
self.dataset.equals(value.dataset)
and self.source == value.source
and self.age == value.age
and self.origin == value.origin
and self.is_full == value.is_full
and self.metadata == value.metadata
)
elif isinstance(value, pd.DataFrame):
return self.dataset.equals(value)
elif isinstance(value, (str, int, float)):
return self.dataset == value
return False
def __and__(self, other):
"""X & Y <==> X.__and__(Y)."""
if isinstance(other, ResoKitDataset):
return attrs.evolve(
self,
dataset=self.dataset.__and__(other.dataset),
is_full=self.is_full and other.is_full,
)
return attrs.evolve(self, dataset=self.dataset.__and__(other))
def __or__(self, other):
"""X | Y <==> X.__or__(Y)."""
if isinstance(other, ResoKitDataset):
return attrs.evolve(
self,
dataset=self.dataset.__or__(other.dataset),
is_full=self.is_full and other.is_full,
)
return attrs.evolve(self, dataset=self.dataset.__or__(other))
def to_dataframe(
self,
columns: Union[list, None] = None,
copy: bool = True,
sort: bool = False,
) -> pd.DataFrame:
"""Convert data to pandas data frame.
This method constructs a data frame with the data inside the
dataset attribute.
Parameters
----------
columns : list, optional. Default: None.
Specific columns to return.
If `None`, return all columns.
copy : bool, optional. Default: True.
Whether to return a copy of the `DataFrame`, or the original.
sort : bool, optional. Default: False.
Whether to sort the dataset by the index columns.
Returns
-------
df: DataFrame
Data frame with the requested columns.
"""
if columns is not None:
used_cols = [
col for col in list(columns) if col in self.dataset.columns
]
df = self.dataset[used_cols]
else:
df = self.dataset
if copy and sort:
return df.sort_index(inplace=False).copy()
elif copy:
return df.copy()
elif sort:
return df.sort_index(inplace=False)
return df
def to_dict(self) -> dict:
"""Convert metadata to a dictionary.
This method constructs a dictionary with the data inside the
metadata attribute. It also adds the age, source, and origin.
Returns
-------
full_metadata : dict
Dictionary with the metadata.
"""
extra = {"age": self.age, "source": self.source, "origin": self.origin}
return {
**extra,
**self.metadata,
}
def copy(self) -> "ResoKitDataset":
"""Create and return copy of the :py:class:`ResoKitDataset`.
Returns
-------
ResoKitDataset
Copy of the ResoKitDataset.
"""
return attrs.evolve(self, dataset=self.dataset.copy())
def to_resokit(self, sort: bool = False) -> "ResoKitDataset":
"""Convert the dataset to a pure ResoKitDataset.
This method converts the dataset to a ResoKitDataset containing
only the columns required by ResoKit.
Parameters
----------
sort : bool, optional. Default: False.
Whether to sort the dataset by the index columns.
Returns
-------
dataset : ResoKitDataset
ResoKitDataset.
"""
dataset = self.to_dataframe(copy=False, sort=sort)
df = df_to_resokit(
dataset,
source=self.source,
drop=True,
copy=True,
sort_by=False,
return_df=True,
rename_index=False,
metadata=None,
)
return attrs.evolve(self, dataset=df)
def to_file(
self,
path_or_buf: Union[str, Path, BinaryIO, TextIOWrapper],
overwrite: bool = False,
verbose: bool = True,
) -> None:
"""Save the dataset to a file.
This method saves the dataset to a file in CSV format.
Parameters
----------
path_or_buf : str or Path or BinaryIO or TextIOWrapper
File path or buffer to save the dataset.
overwrite : bool, optional. Default: False.
Whether to overwrite the file if it already exists.
verbose : bool, optional. Default: True.
Whether to print informational messages.
"""
if not isinstance(path_or_buf, (BinaryIO, TextIOWrapper)):
file_path = Path(path_or_buf)
if file_path.exists() and not overwrite:
raise FileExistsError(
f"File {file_path} already exists.\n"
+ " Set overwrite=True to force the save."
)
else:
# If a buffer is provided, we assume it is a writable f-like object
file_path = "provided buffer"
# Save the dataset to a file
if not overwrite:
self.dataset.to_csv(path_or_buf, mode="x")
else:
self.dataset.to_csv(path_or_buf)
if verbose:
print(f"Dataset saved to {file_path}.")
return
def list_systems(self):
"""Yield the systems in the dataset."""
if self.dataset.empty:
return
for star_name in self.dataset.star_name.unique():
yield self.dataset[self.dataset.star_name == star_name]
# =============================================================================
# HELPER FUNCTIONS
# =============================================================================
def _mk_empty_dataset(source: str) -> ResoKitDataset:
"""Create an empty dataset.
Parameters
----------
source : str
Source of the dataset ('eu' or 'nasa').
Returns
-------
dataset : ResoKitDataset
Empty ResoKitDataset.
"""
return ResoKitDataset(
dataset=pd.DataFrame(),
source=source,
age=-1,
origin="null",
is_full=False,
metadata=dict(DEFAULT_METADATA),
)
def _df_to_dataset(
df: pd.DataFrame,
source: str,
age: int = -1,
origin: str = "null",
is_full: bool = False,
metadata: dict = None,
copy: bool = True,
as_resokit: bool = True,
) -> ResoKitDataset:
"""Convert a pandas DataFrame to a ResoKitDataset.
Parameters
----------
df : pd.DataFrame
DataFrame to convert.
source : str
Source of the dataset ('eu' or 'nasa').
age : int, optional. Default: -1.
Age of the dataset in days.
origin : str, optional. Default: 'unknown'.
Origin of the dataset. Can be one of:
('file', 'zip', 'mixed', 'internet', or 'unknown').
is_full : bool, optional. Default: False.
Whether the dataset is complete.
metadata : dict, optional. Default: None.
Metadata for the dataset.
copy : bool, optional. Default: True.
Whether to return a copy of the DataFrame.
Despite this, the output will be a `ResoKitDataset`.
as_resokit : bool, optional. Default: True.
Whether to perform the column conversion to ResoKit columns.
Returns
-------
dataset : ResoKitDataset
ResoKitDataset.
"""
# Check if df is a DataFrame
if not isinstance(df, pd.DataFrame):
raise TypeError(f"df must be a DataFrame. Got: {type(df)} instead.")
if as_resokit:
my_df = df_to_resokit(
df,
source=source,
drop=True,
copy=copy,
sort_by=False,
return_df=True,
rename_index=False,
metadata=None,
)
else:
my_df = df.copy(deep=copy)
assert isinstance(my_df, pd.DataFrame), (
"Expected df to be a DataFrame, " + f"got {type(my_df)} instead."
)
if metadata is None:
metadata = dict(DEFAULT_METADATA)
return ResoKitDataset(
dataset=my_df,
source=source,
age=age,
origin=origin,
is_full=is_full,
metadata=metadata,
)
# =============================================================================
# MANAGERS
# =============================================================================
class DatasetManager:
"""Manager for the ResoKit datasets.
This class manages the datasets in memory and disk, allowing to load,
update, and check if they are outdated. It also provides methods to
download and store the datasets.
"""
def __init__(self):
# ---------------------- EU and NASA DATASETS ----------------------
self._datasets = {
"eu": _mk_empty_dataset("eu"),
"nasa": _mk_empty_dataset("nasa"),
}
self._indexes = {
"eu": _mk_empty_dataset("eu"),
"nasa": _mk_empty_dataset("nasa"),
}
self._parsed_indexes = {"eu": None, "nasa": None}
self._is_fully_stored = {"eu": False, "nasa": False}
# ------------------------------------------------------------------------
# Functions to manage datasets in memory
# ------------------------------------------------------------------------
def update(
self,
new_df: pd.DataFrame,
source: str,
age: int,
origin: str,
is_full: bool,
verbose: bool = True,
index_only: bool = False,
sort: bool = True,
metadata: Union[dict, None] = None,
overwrite: bool = False,
) -> None:
"""Update the stored dataset in memory."""
if self._is_fully_stored[source] and not overwrite:
return
if new_df.empty:
if verbose:
print(" No rows to store in memory.")
return
if is_full and (
self._indexes[source].dataset.empty
or (not self._indexes[source].dataset.empty and overwrite)
):
new_index = new_df[INDEX_COLUMNS[source]].copy()
self._indexes[source] = _df_to_dataset(
new_index,
source=source,
age=age,
origin=origin,
is_full=is_full,
metadata=metadata,
copy=True,
as_resokit=False,
)
parsed = new_index.astype(str)
parsed[INDEX_COLUMNS[source][0]] = parsed[
INDEX_COLUMNS[source][0]
].apply(parse_name, force=True)
parsed[INDEX_COLUMNS[source][1]] = parsed[
INDEX_COLUMNS[source][1]
].apply(parse_name, force=True)
self._parsed_indexes[source] = parsed
if verbose:
print("Updated stored index in memory.")
if index_only:
return
if is_full or self._datasets[source].dataset.empty:
self._datasets[source] = _df_to_dataset(
new_df,
source=source,
age=age,
origin=origin,
is_full=is_full,
metadata=metadata,
copy=True,
as_resokit=False,
)
if is_full:
self._is_fully_stored[source] = True
if verbose:
print("Stored dataset in memory.")
else:
new_to_store = new_df.index.to_list()
if verbose:
print(f" Stored rows {new_to_store} in memory...")
return
new_to_store = [
x
for x in new_df.index
if x not in self._datasets[source].dataset.index
]
# Pre-define values
updated_df = new_df
meta_old = dict(self._datasets[source].metadata)
age_old = self._datasets[source].age
origin_old = self._datasets[source].origin
if not new_to_store and not overwrite:
return
elif new_to_store and not overwrite:
new_df = new_df.loc[new_to_store]
updated_df = pd.concat([self._datasets[source].dataset, new_df])
# age_old = self._datasets[source].age
# origin_old = self._datasets[source].origin
# meta_old = dict(self._datasets[source].metadata)
elif overwrite:
repeated = [
x
for x in new_df.index
if x in self._datasets[source].dataset.index
]
if not repeated:
return
elif len(repeated) == len(self._datasets[source].dataset):
# updated_df = new_df
age_old = age
origin_old = origin
meta_old = metadata
else:
to_keep = [
x
for x in self._datasets[source].dataset.index
if x not in new_df.index
]
keep_df = self._datasets[source].dataset.loc[to_keep]
updated_df = pd.concat([keep_df, new_df])
# age_old = self._datasets[source].age
# origin_old = self._datasets[source].origin
# meta_old = dict(self._datasets[source].metadata)
new_to_store = new_df.index.to_list()
assert isinstance(meta_old, (dict, MetaData)), (
"Expected metadata to be a dictionary, "
+ f"got {type(meta_old)} instead."
)
if sort:
updated_df.sort_index(inplace=True)
if metadata is not None:
meta_old.update(metadata)
self._datasets[source] = _df_to_dataset(
updated_df,
source=source,
age=max(age_old, age),
origin="mixed" if origin_old != origin else origin_old,
is_full=False,
metadata=meta_old,
copy=True,
as_resokit=False,
)
if verbose:
print(f" Stored rows {new_to_store} in memory.")
def download(
self,
source: str,
to_memory: bool = True,
to_file: Union[str, Path, bool] = True,
to_zip: Union[str, Path, bool] = True,
dir_path: Union[str, Path, bool, None] = True,
overwrite: bool = False,
soft: bool = True,
check_outd: bool = True,
is_query: bool = False,
to_resokit: Union[bool, None] = None,
verbose: bool = True,
chunk_size: int = 1024,
print_size: float = 0.15,
) -> Union[Path, pd.DataFrame, ResoKitDataset, None]:
"""Download a dataset from the internet."""
source = source.lower()
if source not in DATASET_FILENAMES:
raise ValueError(
f"Invalid source: {source}. Must be 'eu' or 'nasa'."
)
if not to_file and not to_zip and not to_memory and to_resokit is None:
raise ValueError(
"Nothing to do. Set at least one of to_file, to_zip, "
+ "to_memory, or to_resokit."
)
bpaths, fpaths, zfpaths = resolve_paths(
to_file=to_file,
to_zip=to_zip,
dir_path=dir_path,
default_file=DATASET_FILENAMES[source],
default_zip=DATASET_ZIPNAMES[source],
default_dir=DATASETS_DIR,
)
for path in bpaths:
if not path.exists():
msg = f"Directory {path} not found."
if soft:
print(msg)
return None
raise FileNotFoundError(msg)
if not overwrite:
for file_path in fpaths:
if file_path.exists():
msg = (
f"File {file_path} already exists. "
+ "Set overwrite=True to force the download."
)
if soft:
print(msg)
return None
raise FileExistsError(msg)
for zipf_path in zfpaths:
zip_path = zipf_path.parent
if zip_path.exists():
msg = (
f"Zip file {zip_path} already exists. "
+ "Set overwrite=True to force the download."
)
if soft:
print(msg)
return None
raise FileExistsError(msg)
save_file = len(fpaths) > 0
save_zip = len(zfpaths) > 0
if (
self._is_fully_stored[source]
and not overwrite
and not save_file
and not save_zip
):
if verbose:
print(
"Dataset is already fully stored. "
+ "Set overwrite=True to force the download."
)
if to_resokit is not None:
return (
self._datasets[source]
if to_resokit
else self._datasets[source].to_dataframe()
)
return None
if check_outd:
outdated = check_outdated(source, verbose=verbose)
if not outdated:
if verbose:
print(
"No need to download the dataset. "
+ "Set check_outd=False to really force it."
)
if to_resokit is not None:
df = self.load(source, verbose=False, to_df=not to_resokit)
assert isinstance(df, (pd.DataFrame, ResoKitDataset)), (
"Expected df to be a pandas DataFrame or "
+ f"ResoKitDataset, got {type(df)} instead."
)
return df
return None
# Set default df
df = _mk_empty_dataset(source).dataset
# Get url
url = DATASET_URLS[source]
# Check if full download or query new
if not is_query: # Download
data = request_dataset(
url,
verbose=verbose,
chunk_size=chunk_size,
print_size=print_size,
)
if not data or len(data) == 0:
raise ValueError(f"Empty dataset downloaded from {url}.")
elif verbose:
print(
f" Data downloaded successfully. ({len(data)/1e6:.2f} MB)"
)
else: # Query
old_df, new_df = self.query_new(
source=source,
to_resokit=False,
verbose=verbose,
rename=True,
old_df_and_new=True,
)
# Check if empty
if len(new_df) == 0:
raise ValueError(f"No new rows downloaded from {url}.")
# Add missing columns
for col in old_df.columns:
is_num = pd.api.types.is_numeric_dtype(old_df[col].dtype)
if col not in new_df.columns and is_num:
new_df[col] = np.nan
elif col not in new_df.columns and not is_num:
new_df[col] = ""
else:
new_df[col].astype(old_df[col].dtype)
# Merge old and new into one
df = merge_old_and_new(
old_df=old_df, new_df=new_df, source=source, verbose=verbose
)
# Set columns dtypes
df = df.astype(DATASET_DTYPES[source])
# Convert to bytes for possible file writing
buffer = BytesIO()
df.to_csv(buffer) # This is the magic N° 1
buffer.seek(0)
data = buffer.getvalue() # There are the bytes. Magic N°2
for zipf_path in zfpaths:
file_name = zipf_path.name
zip_path = zipf_path.parent
if not zip_path.exists() and verbose:
print(f"Creating the ZIP archive {zip_path}...")
else:
remove_from_zip(str(zip_path), file_name, verbose=verbose)
with ZipFile(zip_path, "a", compression=ZIP_DEFLATED) as zipf:
zipf.writestr(file_name, data)
if verbose:
print(f" Written {file_name} to {zip_path}.")
for file_path in fpaths:
if not file_path.exists() and verbose:
print(f"Creating the file {file_path}...")
with open(file_path, "wb") as f:
f.write(data)
if verbose:
print(f" Written {file_path}.")
if (to_memory or to_resokit is not None) and len(df) == 0:
df = pd.read_csv(BytesIO(data), dtype=DATASET_DTYPES[source])
metadata = dict(DEFAULT_METADATA)
metadata.update(
{
"downloaded": datetime.datetime.now().isoformat(),
"url": url,
}
)
if to_memory:
self.update(
df,
source=source,
age=0,
origin="internet",
is_full=True,
verbose=verbose,
index_only=False,
sort=True,
metadata=metadata,
overwrite=True,
)
if to_resokit is not None:
return _df_to_dataset(
df,
source=source,
age=0,
origin="internet",
is_full=True,
copy=True,
as_resokit=to_resokit,
)
if len(fpaths) == 1:
fpaths = list(fpaths)[0]
if len(zfpaths) == 1:
zfpaths = list(zfpaths)[0]
if save_file and save_zip:
return fpaths, zfpaths
if save_file:
return fpaths
if save_zip:
return zfpaths
return None
def query_new(
self,
source: str,
to_resokit: Union[None, bool] = False,
verbose: bool = True,
load_kwargs: Union[Dict, None] = None,
rename: bool = True,
old_df_and_new: bool = False,
) -> Union[pd.DataFrame, ResoKitDataset, Tuple]:
"""Query new rows from online dataset."""
source = source.lower() # Ensure lowercase
# Define last update row name
if source == "eu":
update_col = "updated"
online_col = "modification_date"
# raise NotImplementedError(
# "This feature is not implemented yet, as the TAP services of"
# + "\nhttps://exoplanet.eu/ do not include the values for the"
# + "\n'updated' column."
# + "\nThis has already been informed to the Exoplanet EU Team"
# + "\n(https://exoplanet.eu/team/), and will be implemented"
# + "\nwhen the available."
# )
elif source == "nasa":
update_col = "rowupdate"
online_col = update_col
else:
raise ValueError("Invalid source. Must be 'eu' or 'nasa'.")
# Get old
if load_kwargs is None:
load_kwargs = {}
load_kwargs.update(
{
"to_df": True,
"to_resokit": False,
"only_index": False,
"only_rows": False,
"verbose": False,
}
)
old_df = self.load(source=source, **load_kwargs)
assert isinstance(
old_df, pd.DataFrame
), f"Error: Expected a pandas DataFrame, got {type(old_df)} instead."
if len(old_df) == 0:
raise IndexError("Could not load local dataset. No rows found.")
# Get last update
max_date_str = old_df[update_col][~old_df[update_col].isna()].max()
# Message
if verbose:
print(f"Latest row update in local dataset: {max_date_str}")
print("Querying online rows update after that date.")
# Build the query
query = build_query(
source=source,
select="*",
conditions=f"{online_col} >= '{max_date_str}'",
)
# Get new
new_df = execute_query(
query=query, source=source, cache=True, verbose=verbose
)
# Message
if verbose:
if len(new_df) == 0:
print("No new rows downloaded")
else:
print(f"Amount of rows downloaded: {len(new_df)}")
# Rename?
if rename:
# Now, updated from eu can be a problem...
if source == "eu" and (
"updated" in new_df.columns
and "modification_date" in new_df.columns
):
new_df.drop(columns="updated", inplace=True)
# Updated is rewritten with rename
new_df.rename(columns=QUERY_MAPPINGS[source], inplace=True)
# Define new
if to_resokit is False:
new = new_df
else:
if to_resokit is None:
to_resokit = False
new = _df_to_dataset(
new_df,
source=source,
age=0,
origin="internet",
is_full=False,
copy=False,
as_resokit=to_resokit,
)
# Return
if old_df_and_new:
return old_df, new
return new
def load_full(
self,
source: str,
to_resokit: bool = True,
sort: bool = True,
) -> ResoKitDataset:
"""Load the full dataset from memory or disk."""
if not self._is_fully_stored[source]:
raise ValueError(f"Source {source} is not fully stored.")
ds = self._datasets[source]
if sort:
sorted_df = ds.dataset.sort_index()
return _df_to_dataset(
sorted_df,
source=source,
age=ds.age,
origin=ds.origin,
is_full=ds.is_full,
metadata=dict(ds.metadata),
copy=False,
as_resokit=to_resokit,
)
return ds.to_resokit() if to_resokit else ds.copy()
def load_rows(
self,
source: str,
rows: Union[list, None] = None,
full: bool = False,
) -> Union[Tuple[pd.DataFrame, list, int, str], ResoKitDataset]:
"""Load specific rows from the dataset."""
if full:
return self.load_full(source, to_resokit=True, sort=True)
if rows is not None:
stored = [
x for x in rows if x in self._datasets[source].dataset.index
]
not_stored = [x for x in rows if x not in stored]
df = self._datasets[source].dataset.loc[stored].copy()
age = self._datasets[source].age
origin = self._datasets[source].origin
return df, not_stored, age, origin
raise ValueError("No rows provided.")
def load_index(
self,
source: str,
to_df: bool = False,
to_resokit: bool = True,
parsed: bool = False,
) -> Union[pd.DataFrame, ResoKitDataset, None]:
"""Load the index of a given source dataset."""
if parsed:
return self._parsed_indexes[source]
index_ds = self._indexes[source]
if index_ds.dataset.empty:
return index_ds
if not to_df:
return index_ds.to_resokit() if to_resokit else index_ds
return index_ds.to_dataframe()
@staticmethod
def _aux_load_full(
df: pd.DataFrame,
source: str,
age: int,
origin: str,
is_full: bool,
to_resokit: bool,
to_df: bool,
metadata: Union[dict, None] = None,
) -> Union[pd.DataFrame, ResokitDataFrame, ResoKitDataset]:
"""Auxiliary function to load a full dataset."""
if not to_df:
return _df_to_dataset(
df,
source=source,
age=age,
origin=origin,
is_full=is_full,
metadata=metadata,
copy=False,
as_resokit=to_resokit,
)
if to_resokit:
return df_to_resokit(
df,
source=source,
drop=True,
copy=False,
sort_by=False,
metadata=metadata,
return_df=True,
)
return df
def load(
self,
source: str,
from_memory: bool = True,
from_file: Union[str, Path, bool] = False,
from_zip: Union[str, Path, bool] = True,
dir_path: Union[str, Path, bool, None] = True,
to_resokit: bool = True,
to_df: bool = False,
check_age: bool = False,
only_index: bool = False,
only_rows: Union[list, int] = False,
verbose: bool = True,
store: Union[bool, str] = True,
store_index: Union[bool, str] = True,
) -> Union[pd.DataFrame, ResokitDataFrame, ResoKitDataset, None]:
"""Load a dataset from memory, ZIP, or file."""
source = source.lower()
if source not in DATASET_FILENAMES:
raise ValueError(
f"Invalid source: {source}. Must be 'eu' or 'nasa'."
)
# Check if something to do
if not from_memory and not from_zip and not from_file:
raise ValueError(
"Nothing to do. Set at least one of "
+ "from_memory, from_zip, or from_file."
)
bpaths, fpaths, zfpaths = resolve_paths(
to_file=from_file,
to_zip=from_zip,
dir_path=dir_path,
default_file=DATASET_FILENAMES[source],
default_zip=DATASET_ZIPNAMES[source],
default_dir=DATASETS_DIR,
)
if len(fpaths) + len(zfpaths) > 1:
raise ValueError(
"Could not resolve paths where to load the data. Got:\n"
+ f"{fpaths},\n"
+ f"{zfpaths}"
)
if len(bpaths) > 1:
raise ValueError(
"Could not resolve dir paths where to load the data. Got:\n"
+ f"{bpaths}"
)
dir_path = list(bpaths)[0] if len(bpaths) > 0 else None
file_path = list(fpaths)[0] if len(fpaths) > 0 else None
zfip_path = list(zfpaths)[0] if len(zfpaths) > 0 else None
# Check store_index
if store and only_index:
store_index = True
# Define initials: origin, age, ...
origin = []
age = -1
not_stored_rows = []
stored_rows = []
requested_rows = []
data_stored = pd.DataFrame()
# Define overwrite
overwrite = False
if (
isinstance(store, str) and store.lower()[0] in ["o", "f", "y", "s"]
) or (
isinstance(store_index, str)
and store_index.lower()[0] in ["o", "f", "y", "s"]
):
overwrite = True
# Check if only rows and only index
if (
only_rows and only_index
): # Check if only one of the options is provided
raise ValueError("Cannot specify both only_rows and only_index.")
elif (
not isinstance(only_rows, bool) and isinstance(only_rows, int)
) or only_rows: # If only_rows is provided, set up the skip_rows func
if isinstance(only_rows, bool):
raise ValueError("only_rows must be a list or an integer.")
iter_only_rows = parse_to_iter(only_rows) # Convert to iterable
# Remove duplicates
seen = set()
seen_add = seen.add
requested_rows = [
x for x in iter_only_rows if not (x in seen or seen_add(x))
]
# Check no negative values
if any(x < 0 for x in requested_rows):
raise ValueError("only_rows must be positive integers.")
# Load stored rows if available
if from_memory:
data_stored, not_stored_rows, xage, xorigin = self.load_rows(
source,
rows=requested_rows,
full=False,
)
assert isinstance(data_stored, pd.DataFrame), (
"Expected data to be a pandas DataFrame, "
+ f"got {type(data_stored)} instead."
)
assert isinstance(xage, int), (
"Expected age to be an integer, "
+ f"got {type(xage)} instead."
)
# Update origin
if not data_stored.empty:
age = max(age, xage)
origin.append(xorigin)
else:
# If not from memory, set data_stored to empty
data_stored = pd.DataFrame()
not_stored_rows = requested_rows
# Define stored_rows and not_stored
stored_rows = list(data_stored.index)
# Message
if verbose and not data_stored.empty:
print(
f" Loaded rows {stored_rows} "
+ f"from {source} memory stored dataset..."
)
# Check if all rows are stored
if len(data_stored) == len(requested_rows):
# Check if the dataset is fully stored (and loaded)
is_full = (
len(data_stored) == len(self._datasets[source])
) and self._is_fully_stored[source]
# No need to load the dataset or store the rows
# (because they are already stored)
return self._aux_load_full(
df=data_stored,
source=source,
age=age,
origin=origin[0],
is_full=is_full,
to_resokit=to_resokit,
to_df=to_df,
metadata=dict(self._datasets[source].metadata),
)
elif (zfip_path is not None) and (
file_path is not None
): # If no file or ZIP provided
raise ValueError(
"Some rows are not stored and no file or ZIP provided."
)
# Add header and update only_rows
only_rows = [0] + [
x + 1 for x in requested_rows if x in not_stored_rows
]
def skip_rows(x: int) -> bool: # Skip rows not in the list
return x not in only_rows
elif only_rows: # If only_rows is True...
raise ValueError("only_rows must be a list or an integer.")
else: # If not only_rows...
skip_rows = None
only_rows = False
# Check if the index columns are already stored in memory
if only_index and from_memory:
# Check if parsed requested
parsed = (
isinstance(only_index, str) and only_index.lower()[0] == "p"
)
data = self.load_index(
source, to_df=False, to_resokit=to_resokit, parsed=parsed
)
if data is None:
extra = "parsed " if parsed else ""
if verbose:
print(
f" No {extra}index columns stored "
+ f"in memory for {source}."
)
return None
elif parsed:
if verbose:
print(
" Loaded parsed index columns from "
+ "memory stored datasets."
)
return data
assert isinstance(data, ResoKitDataset), (
"Expected data to be a ResoKitDataset, "
+ f"got {type(data)} instead."
)
if check_age and int(data.age) >= 0:
print(f" Last modified: {data.age} days ago.")
if to_df:
data = data.to_dataframe()
if not data.empty:
if verbose:
print(
" Loaded index columns from "
+ "memory stored datasets."
)
return data
# Check if the dataset is already stored in memory
if (
not (
only_index or only_rows
) # Check if loading the entire dataset
and self._is_fully_stored[source] # Check if fully stored
and from_memory # Check if loading from memory
):
data = self.load_full(source, to_resokit=to_resokit, sort=True)
if verbose:
print(" Loaded full dataset from memory stored datasets.")
if check_age and data.age >= 0:
print(f" Last modified: {data.age} days ago.")
# Check if to df
if to_df:
return data.to_dataframe()
return data
# Define columns to load
usecols = INDEX_COLUMNS[source] if only_index else None
# Aux message
if verbose: # Print message if verbose
if only_index:
print(" Loading only index columns...")
elif only_rows:
print(f" Loading rows {not_stored_rows}...")
else:
print(" Loading the entire dataset...")
# Load the dataset from the ZIP archive
if zfip_path is not None:
file_name = zfip_path.name
zip_path = zfip_path.parent
try:
data = load_from_zip(
zip_path=zip_path,
file_name=file_name,
source=source,
skip_rows=skip_rows,
usecols=usecols,
verbose=verbose,
)
except FileNotFoundError:
msg = ""
# Check if it is the default path
if dir_path == DATASETS_DIR:
msg = (
"\n Try running "
+ f"`resokit.datasets.download({source=},"
+ " to_zip=True)` first to download the dataset."
)
zip_name = zip_path.name
raise FileNotFoundError(
f"Zip file {zip_name} not found at {dir_path}." + msg
)
age = check_file_age(
file_path=file_name,
zip_path=zip_path,
verbose=check_age,
)
origin.append("zip")
# Load the dataset from the file
elif file_path is not None:
try:
data = pd.read_csv(
file_path,
header=0,
skiprows=skip_rows,
usecols=usecols,
dtype=DATASET_DTYPES[source],
)
except FileNotFoundError:
msg = ""
if dir_path == DATASETS_DIR:
msg = (
"\n Try running "
+ f"`resokit.datasets.download({source=},"
+ " to_file=True)` first to download the dataset."
)
file_name = file_path.name
raise FileNotFoundError(
f"File {file_name} not found at {dir_path}." + msg
)
age = check_file_age(
file_path=file_path,
zip_path=None,
verbose=check_age,
)
origin.append("file")
else:
raise ValueError(
"Data not found in memory, and no file or ZIP provided."
)
# Check empty dataset
if data.empty and not only_rows:
warnings.warn("Empty dataset loaded.", stacklevel=2)
# Reindex according to only_rows if provided
elif only_rows:
assert isinstance(not_stored_rows, list), (
"Expected not_stored_rows to be a list, "
+ f"got {type(not_stored_rows)} instead."
)
# Get ordered list of rows to keep
sorted_rows = sorted(not_stored_rows)
n_used_rows = len(data) # Number of rows effectively used
# Warn if the number of rows is less than the requested
# This means that the user requested more rows than the dataset has
if n_used_rows < len(sorted_rows):
out_of_bounds_rows = sorted_rows[n_used_rows:]
warnings.warn(
f"Rows {out_of_bounds_rows} are out of bounds.",
stacklevel=2,
)
used_rows = sorted_rows[:n_used_rows] # Keep only the used rows
# Reindex the dataset
data.set_index(pd.Index(used_rows), inplace=True)
# Concatenate the stored rows with the loaded rows
if not data_stored.empty:
data = pd.concat([data_stored, data])
# Finally, get the original order
new_index = [
x for x in requested_rows if x in used_rows + stored_rows
]
data = data.reindex(new_index, copy=False)
# Define origin
origin = "mixed" if len(set(origin)) > 1 else origin[0]
# Define is_full
is_full = not only_rows
# Define index_only
index_only = bool(only_index or (store_index and not store))
# Check storing
if store_index or store:
self.update(
data,
source,
age=age,
origin=origin,
is_full=is_full,
verbose=verbose,
index_only=index_only,
sort=True,
overwrite=overwrite,
)
return self._aux_load_full(
df=data,
source=source,
age=age,
origin=origin,
is_full=is_full,
to_resokit=to_resokit,
to_df=to_df,
)
def clear_memory(
self, source: str, verbose: bool = True, files: bool = False
):
"""Clear the memory for the specified dataset."""
source = source.lower()
if files:
if source in ["eu", "nasa"]:
file_path = DATASETS_DIR / DATASET_ZIPNAMES[source]
if file_path.exists():
file_path.unlink()
if verbose:
print(f" Removed {file_path} from disk.")
elif source == "both":
for key in ["eu", "nasa"]:
self.clear_memory(key, verbose, files=True)
else:
raise ValueError("Invalid EU/NASA source.")
else:
if source in self._datasets:
self._indexes[source] = _mk_empty_dataset(source)
self._datasets[source] = _mk_empty_dataset(source)
self._is_fully_stored[source] = False
self._parsed_indexes[source] = None
if verbose:
print(f" Cleared memory for source: {source}")
elif source == "both":
for key in self._datasets:
self.clear_memory(key, verbose=verbose)
else:
raise ValueError("Invalid EU/NASA source.")
class BinaryDatasetManager:
"""Manager for the ResoKit binaries datasets.
This class manages the binaries datasets in memory and disk, allowing to
load, update, and check if they are outdated. It also provides methods to
download and store the datasets.
"""
def __init__(self):
# -------------------- BINARY SYSTEMS DATASETS ----------------------
self._datasets = {"s": pd.DataFrame(), "p": pd.DataFrame()}
self._headers = {"s": "", "p": ""}
@staticmethod
def _extract_header_and_data(
lines: List[str], circumbinary: bool, inferr: bool
) -> Tuple[str, pd.DataFrame]:
"""Extract header and data from lines of the dataset.
Parameters
----------
lines : List[str]
Lines of the dataset.
circumbinary : bool
Whether the dataset is circumbinary.
inferr : bool
Whether the width of the columns is inferred.
If False, the width of the columns is fixed.
Returns
-------
Tuple[str, pd.DataFrame]
header : str
The header of the dataset.
data : pd.DataFrame
The dataset as a pandas DataFrame.
"""
# Find the index of the last line that starts with "Note:"
# (or any number of hyphens)
separator_index = len(lines)
for i, line in enumerate(reversed(lines)):
stripped = line.strip()
if stripped.startswith("Note:") or stripped.startswith("-"):
separator_index = len(lines) - i
break
# Check if the separator was found
if separator_index == len(lines):
raise ValueError("Separator line not found.")
# The header is everything before the separator line
header = "".join(lines[:separator_index]).strip()
# The data starts after the last "Note:" line, so we extract the data
data_lines = [
line.replace("\t", " ") for line in lines[separator_index:]
]
# Define widths for fixed-width formatted data
kwargs = {}
if inferr:
kwargs["colspecs"] = "infer"
elif circumbinary:
kwargs["widths"] = [15, 10, 6, 6, 8, 2, 7, 7, 2, 10, 6, 9, 8, 8]
else:
kwargs["widths"] = [15, 10, 6, 6, 8, 2, 8, 7, 2, 8, 6, 9, 7, 8]
# Use pandas to read the fixed-width formatted data
# starting after the header
data = pd.read_fwf(
StringIO("".join(data_lines)), header=None, **kwargs
)
return header, data
def load(
self,
source: str,
from_memory: bool = True,
from_file: Union[str, bool] = True,
dir_path: Union[str, Path, bool, None] = True,
rename_columns: bool = True,
ret_header: bool = False,
inferr: bool = False,
clean: bool = True,
verbose: bool = True,
) -> Union[pd.DataFrame, str]:
"""Read the provided multi-star system dataset."""
# Check the source
source = source.lower()
if source in ["circumbinary", "c", "p"]:
circumbinary = True
elif source in ["simple", "s"]:
circumbinary = False
else:
raise ValueError(
"Invalid source. "
+ "Must be 'circumbinary', 'c', 'p', 'simple', or 's'."
)
# Define the filename based on the circumbinary parameter
letter = "p" if circumbinary else "s"
# Check if something to do
if not from_memory and not from_file:
raise ValueError(
"Nothing to do. Set at least one of "
+ "from_memory, or from_file."
)
bpaths, fpaths, _ = resolve_paths(
to_file=from_file,
to_zip=False,
dir_path=dir_path,
default_file=BINARIES_FILENAMES[letter],
default_zip="False",
default_dir=DATASETS_DIR,
)
if len(fpaths) > 1:
raise ValueError(
"Could not resolve paths where to load the data. Got:\n"
+ f"{fpaths}"
)
dir_path = list(bpaths)[0] if len(bpaths) > 0 else None
file_path = list(fpaths)[0] if len(fpaths) > 0 else None
# Default lines
lines = []
# Load the dataset from memory
if from_memory:
if ret_header and self._headers[letter] != "":
if verbose:
print(f"Loading the type-{letter} header from memory.")
return str(self._headers[letter]) # Return a copy
elif not self._datasets[letter].empty:
if verbose:
print(f"Loading the type-{letter} dataset from memory.")
df = self._datasets[letter].copy()
# Clean if requested
if clean:
df.loc[df[7] > 98, 7] = pd.NA # eccentricity
df.loc[df[13] > 998, 13] = pd.NA # imutual
# Rename columns if requested
if rename_columns:
df.columns = BINARIES_COLUMNS
return df
# Load the dataset from the file
if file_path is not None:
file_name = file_path.name
if verbose:
print(
f"Loading the type-{letter} dataset from file {file_name}"
)
with open(file_path, "r") as f:
lines = f.readlines()
# Extract header and data from lines
header, data = self._extract_header_and_data(
lines=lines, circumbinary=circumbinary, inferr=inferr
)
# Store the data and header in memory
self._headers[letter] = str(header)
self._datasets[letter] = data.copy(deep=True)
if verbose:
print(f"Stored the type-{letter} dataset and header into memory.")
# Clean data
if clean:
data.loc[data[7] > 98, 7] = pd.NA # eccentricity
data.loc[data[13] > 998, 13] = pd.NA # imutual
# Rename columns
if rename_columns:
data.columns = BINARIES_COLUMNS
# Return the header if requested
if ret_header:
return header
return data
def download(
self,
source: str,
to_file: Union[str, Path, bool] = True,
dir_path: Union[str, Path, bool, None] = True,
to_memory: bool = True,
return_data: bool = True,
overwrite: bool = False,
soft: bool = True,
verbose: bool = True,
chunk_size: int = 1024,
print_size: float = 0.00001,
) -> Union[Path, pd.DataFrame, None]:
"""Download a dataset from a specified source and save it locally."""
# Check the source
source = source.lower()
if source in ["circumbinary", "c", "p"]:
circumbinary = True
elif source in ["simple", "s"]:
circumbinary = False
else:
raise ValueError(
"Invalid source. "
+ "Must be 'circumbinary', 'c', 'p', 'simple', or 's'."
)
# Define the filename based on the circumbinary parameter
letter = "p" if circumbinary else "s"
# Check if something to do
if not to_file and not to_memory and not return_data:
raise ValueError(
"Nothing to do. Set at least one of "
+ "to_file, to_zip, to_memory, or return_data."
)
if (
not to_file
and to_memory
and not return_data
and not self._datasets[letter].empty
and not overwrite
):
raise ValueError(
"Nothing to do. Dataset is already stored in memory and "
+ "overwrite is False."
)
# Define URS
url = BINARIES_URLS[letter]
bpaths, fpaths, _ = resolve_paths(
to_file=to_file,
to_zip=False,
dir_path=dir_path,
default_file=BINARIES_FILENAMES[letter],
default_zip="False",
default_dir=DATASETS_DIR,
)
for path in bpaths:
if not path.exists():
raise FileNotFoundError(f"Directory {path} not found.")
if not overwrite:
for file_path in fpaths:
if file_path.exists():
msg = (
f"File {file_path} already exists. "
+ "Set overwrite=True to force the download."
)
if soft:
print(msg)
return
raise FileExistsError(msg)
# Download the dataset
data = request_dataset(
url, verbose=verbose, chunk_size=chunk_size, print_size=print_size
)
# Check if the data is valid. If not, raise an error. Check length > 0
if not data or len(data) == 0:
raise ValueError(f"Empty dataset downloaded from {url}.")
elif verbose:
if len(data) < 1e6:
print(
f" Data downloaded successfully. ({len(data)/1e3:.3f} KB)"
)
else:
print(
f" Data downloaded successfully. ({len(data)/1e6:.3f} MB)"
)
# Default df
df = pd.DataFrame()
# Store the data in file
for file_path in fpaths:
if not file_path.exists() and verbose:
print(f" Creating the file {file_path}...")
# Write the file
with open(file_path, "wb") as f:
f.write(data)
# Print message
if verbose:
print(f" Written {file_path}.")
# Store the data in memory? Only if to_memory or return_data
if to_memory or return_data:
header, df = self._extract_header_and_data(
lines=StringIO(data.decode(encoding="utf-8")).readlines(),
circumbinary=circumbinary,
inferr=False,
)
if to_memory:
# Store the data in memory
self._headers[letter] = header
self._datasets[letter] = df
if verbose:
print(f" Stored the type-{letter} dataset in memory.")
# Return the data
if return_data:
# Try to rename the columns
try:
df.columns = BINARIES_COLUMNS
except ValueError:
if verbose:
print("Columns could not be renamed.")
return df
# Return the path
if len(fpaths) > 0:
if len(fpaths) == 1:
return list(fpaths)[0]
return fpaths
return
def clear_memory(
self, source: str, verbose: bool = True, files: bool = False
):
"""Clear stored binary data from memory and/or disk."""
source = source.lower()
if files:
if source in BINARIES_FILENAMES:
file_path = DATASETS_DIR / BINARIES_FILENAMES[source]
if file_path.exists():
file_path.unlink()
if verbose:
print(f" Removed {file_path} from disk.")
elif source in ["both", "all"]:
for key in BINARIES_FILENAMES:
self.clear_memory(key, verbose=verbose, files=True)
else:
raise ValueError("Invalid binary source.")
else:
if source in self._datasets:
self._datasets[source] = pd.DataFrame()
self._headers[source] = ""
if verbose:
print(f" Cleared memory for binaries type-{source}")
elif source in ["both", "all"]:
for key in self._datasets:
self.clear_memory(key, verbose=verbose)
else:
raise ValueError("Invalid binary source.")
# ------------------------- INITIALIZATION --------------------------
_full_manager = DatasetManager()
_binary_manager = BinaryDatasetManager()
# =============================================================================
# FUNCTIONS
# =============================================================================
# --------------------------- EU AND NASA DATASETS ----------------------------
[docs]
def load(
source: str,
from_memory: bool = True,
from_zip: Union[str, Path, bool] = True,
from_file: Union[str, Path, bool] = True,
dir_path: Union[str, Path, bool, None] = True,
to_resokit: bool = True,
to_df: bool = False,
check_age: bool = False,
only_index: bool = False,
only_rows: Union[list, int] = False,
verbose: bool = True,
store: Union[bool, str] = True,
store_index: Union[bool, str] = True,
) -> Union[pd.DataFrame, ResokitDataFrame, ResoKitDataset, None]:
"""Load the dataset from a specified source.
The dataset is loaded from a ZIP archive or a CSV file, or from memory
if already stored. The priority is given to the memory saved dataset,
then to the zip archive, and finally to the file.
Note
----
Storing the dataset in memory is useful for faster access and to avoid
reading the file multiple times.
Note
----
If both `from_file` and `from_zip` are provided, it is assumed that the
file inside the ZIP archive is the same as the one provided in `from_file`.
Finally, the path constructed is: `dir_path / zip_name / file_name`.
Parameters
----------
source : str
Identifier for the data source ('eu' or 'nasa').
from_memory : bool, optional. Default: True.
If `True`, loads the dataset from memory if available.
from_zip : str or Path or bool, optional. Default: True.
Path to the ZIP archive to load the dataset.
If `True`, default ZIP filename is used.
If `False`, the file is not loaded from the ZIP archive.
from_file : str or Path or bool, optional. Default: True.
Path to the file to load the dataset.
If `True`, default filename is used.
If `False`, the file is not loaded.
dir_path : str, Path or bool, optional. Default: True.
Directory path to load the dataset from.
If `True` or `None` the default directory is used.
to_resokit : bool, optional. Default: True.
If `True`, returns the dataset including only the columns
required by ResoKit.
to_df : bool, optional. Default: False.
If `True`, returns the raw dataset as a pandas DataFrame.
If `False`, returns the dataset as a ResoKitDataset.
check_age : bool, optional. Default: False.
If `True`, displays the file's last modified date.
used by ResoKit.
only_index : bool, optional. Default: False.
If `True`, loads only the index columns.
If `p` or a string starting with "p", loads the parsed index
columns. Only compatible with `from_memory=True`. If not previously
stored, `None` is returned.
only_rows : list|int, optional. Default: [].
If provided, loads only the specified rows.
Remember that python is 0-indexed, so
the first row (system) is 0.
verbose : bool, optional. Default: True.
If `True`, prints messages about the process.
store : bool, str, optional. Default: True.
If `str`, then "f" or "y" or "s" or "o" overwrites the stored dataset.
If `True`, stores the dataset in memory.
store_index : bool, str, optional. Default: True.
If `True`, stores the dataset index in memory.
If `only_rows` is provided, the index is not stored.
If `str`, then "f" or "y" or "s" or "o" overwrites the stored index.
Returns
-------
dataset : DataFrame or ResoKitDataset
The loaded dataset as a pandas DataFrame or a ResoKitDataset.
"""
return _full_manager.load(
source=source,
from_memory=from_memory,
from_zip=from_zip,
from_file=from_file,
dir_path=dir_path,
to_resokit=to_resokit,
to_df=to_df,
check_age=check_age,
only_index=only_index,
only_rows=only_rows,
verbose=verbose,
store=store,
store_index=store_index,
)
[docs]
def download(
source: str,
to_memory: bool = True,
to_file: Union[str, Path, bool] = True,
to_zip: Union[str, Path, bool] = True,
dir_path: Union[str, Path, bool, None] = True,
overwrite: bool = False,
soft: bool = False,
check_outd: bool = True,
only_new_rows: bool = False,
to_resokit: Union[bool, None] = None,
verbose: bool = True,
chunk_size: int = 1024,
print_size: float = 0.15,
) -> Union[Path, pd.DataFrame, ResoKitDataset, None, dict]:
"""Download a dataset from a specified source and save it locally.
The dataset is downloaded from the internet, from the online NASA
or exoplanet.eu databases, and can be stored in a file, a ZIP archive,
in memory, and/or simply returned.
Note
----
Requires the requests library.
Parameters
----------
source : str
Identifier for the data source ('eu' or 'nasa').
If "all" or "both", downloads both datasets.
to_memory : bool, optional. Default: True.
If `True`, stores the dataset in memory.
to_file : str or Path or bool, optional. Default: True.
Path or str to the file to store the dataset.
If `True`, default filename is used.
If `False`, the file is not saved nor created.
to_zip : str or Path or bool, optional. Default: True.
Path or str to the ZIP archive to store the dataset.
If `True`, default ZIP filename is used.
If `False`, the file is not saved nor created in the ZIP archive.
dir_path : str or Path or bool or None. Default: True
Directory path to save the dataset, or path to the ZIP archive.
If `None` or `True` the default directory is used.
overwrite : bool, optional. Default: False.
If `True`, overwrites the file if it already exists.
The memory stored Dataset and Index are always overwritten,
independently of this parameter.
soft : bool, optiona. Default: False
If `True`, prints a message instead of raising an error, in
case of file existing and overwrite = `False`.
check_outd : bool, optional. Default: True.
Whether to check if the dataset is already up-to-date.
only_new_rows : bool, optional. Default: False.
Whether to perform a query of only rows updated after the
latest local row-update. If no previous local dataset exists
an error is raised.
If False, the whole dataset is downloaded.
to_resokit : bool, dict, optional. Default: None.
If `True`, returns the dataset as a ResoKitDataset.
If `False`, returns the dataset as a pandas DataFrame.
If `None`, returns the path to the downloaded file.
verbose : bool, optional. Default: True.
If `True`, displays messages about the download process.
chunk_size : int, optional. Default: 1024.
Size of the chunks to download the dataset, in bytes.
Default is 1024 bytes (1 KB).
print_size: float, optional. Default: 0.15.
Update frequency for the download progress bar.
Returns
-------
downloaded : Path or pd.DataFrame or None
`Path` to the downloaded dataset (and or zip archive),
or the dataset if `to_resokit` is not `None`.
"""
if source.lower() in ["all", "both"]:
# Download both datasets
eu = download(
source="eu",
to_memory=to_memory,
to_file=to_file,
to_zip=to_zip,
dir_path=dir_path,
overwrite=overwrite,
soft=soft,
check_outd=check_outd,
to_resokit=to_resokit,
verbose=verbose,
chunk_size=chunk_size,
print_size=print_size,
)
nasa = download(
source="nasa",
to_memory=to_memory,
to_file=to_file,
to_zip=to_zip,
dir_path=dir_path,
overwrite=overwrite,
soft=soft,
check_outd=check_outd,
to_resokit=to_resokit,
verbose=verbose,
chunk_size=chunk_size,
print_size=print_size,
)
if to_resokit is None and eu is None and nasa is None:
return
return {"eu": eu, "nasa": nasa}
return _full_manager.download(
source=source,
to_memory=to_memory,
to_file=to_file,
to_zip=to_zip,
dir_path=dir_path,
overwrite=overwrite,
soft=soft,
check_outd=check_outd,
to_resokit=to_resokit,
is_query=only_new_rows,
verbose=verbose,
chunk_size=chunk_size,
print_size=print_size,
)
[docs]
def query_new_rows(
source: str,
check_outd: bool = True,
to_resokit: Union[None, bool] = False,
verbose: bool = True,
rename: bool = True,
load_kwargs: Union[Dict, None] = None,
) -> Union[pd.DataFrame, ResoKitDataset, Tuple]:
"""Query online the rows updated after latest local dataset row-update.
The rows are queried according the the corresponding row-update value.
The resulting pandas dataframe is cached for the duration of the session.
If querying from NASA, the rows will have all (including non default
and controversial) new planets.
Note
----
This function does not update the local dataset, but caches the queries
in case of reusing when calling `resokit.databases.update`.
Note
----
Requires the requests library.
Parameters
----------
source : str
Identifier for the data source ('eu' or 'nasa').
If "all" or "both", queries rows from both datasets.
check_outd : bool, optional. Default: True.
Whether to check if the dataset is already up-to-date.
If so, no query is performed.
to_resokit : bool, dict, optional. Default: None.
Formats the final dataset:
If `True`, as a ResoKitDataset.
If `False`, as a pandas DataFrame.
If `None`, as a ResoKitDataset, using all original columns.
verbose : bool, optional. Default: True.
If `True`, displays messages about the query process.
rename : bool, optional. Default: True.
If `True`, renames the columns to match the original
databe column names. Mainly for EU database queries.
load_kwargs : dict, None, optional. Default: None
Dictionary with keyboard arguments for the `resokit.load`
function.
If `None`, the default arguments are used.
Returns
-------
downloaded : pd.DataFrame or ResoKitDataset or Tuple
The requested rows with specified format; or tuple if
both sources requested.
"""
# Ensure lowercase
source = source.lower()
if source in ["all", "both"]:
eu_new = query_new_rows(
source="eu",
check_outd=check_outd,
to_resokit=to_resokit,
verbose=verbose,
rename=rename,
load_kwargs=load_kwargs,
)
if verbose:
print("")
nasa_new = query_new_rows(
source="nasa",
check_outd=check_outd,
to_resokit=to_resokit,
verbose=verbose,
rename=rename,
load_kwargs=load_kwargs,
)
return eu_new, nasa_new
if check_outd:
check_outdated(which=source, verbose=verbose)
if load_kwargs is None:
load_kwargs = {
"from_memory": True,
"from_file": True,
"from_zip": True,
}
result = _full_manager.query_new(
source=source,
to_resokit=to_resokit,
verbose=verbose,
load_kwargs=load_kwargs,
rename=rename,
old_df_and_new=False,
)
assert isinstance(result, (pd.DataFrame, ResoKitDataset)), (
"Expected result to be a pd.DataFrame or ResoKitDataset, "
+ f"got {type(result)} instead."
)
return result
[docs]
def update(
source: str,
load_kwargs: Union[Dict, None] = None,
to_memory: bool = True,
to_file: Union[str, Path, bool] = True,
to_zip: Union[str, Path, bool] = True,
dir_path: Union[str, Path, bool, None] = True,
overwrite: bool = False,
check_outd: bool = True,
to_resokit: Union[bool, None] = None,
verbose: bool = True,
) -> Union[Path, pd.DataFrame, ResoKitDataset, None, dict]:
"""Update the local dataset with new rows from a specified source.
This function is a wrapper for the function
`resokit.datasets.download(..., only_new_rows=True)`; but is
mandatory that the dataset exists previously to be loaded first.
No download printing progress available for this function.
Note
----
Requires the requests library.
Parameters
----------
source : str
Identifier for the data source ('eu' or 'nasa').
If "all" or "both", downloads both datasets.
load_kwargs : dict or None, optional. Defalt: None
Dictionary with keyboard arguments for the `resokit.load`
function.
If `None`, the default arguments are used.
to_memory : bool, optional. Default: True.
If `True`, stores the dataset in memory.
to_file : str or Path or bool, optional. Default: True.
Path or str to the file to store the dataset.
If `True`, default filename is used.
If `False`, the file is not saved nor created.
to_zip : str or Path or bool, optional. Default: True.
Path or str to the ZIP archive to store the dataset.
If `True`, default ZIP filename is used.
If `False`, the file is not saved nor created in the ZIP archive.
dir_path : str or Path or bool or None. Default: True
Directory path to save the dataset, or path to the ZIP archive.
If `None` or `True` the default directory is used.
overwrite : bool, optional. Default: False.
If `True`, overwrites the file if it already exists.
The memory stored Dataset and Index are always overwritten,
independently of this parameter.
check_outd : bool, optional. Default: True.
Whether to check if the dataset is already up-to-date.
to_resokit : bool, dict, optional. Default: None.
If `True`, returns the dataset as a ResoKitDataset.
If `False`, returns the dataset as a pandas DataFrame.
If `None`, returns the path to the downloaded file.
verbose : bool, optional. Default: True.
If `True`, displays messages about the download process.
Returns
-------
updated : Path or pd.DataFrame or None
`Path` to the updated dataset (and or zip archive),
or the dataset if `to_resokit` is not `None`.
"""
if load_kwargs is None:
load_kwargs = {}
# Load the dataset
if source in ["all", "both"]:
load(source="eu", **load_kwargs)
load(source="nasa", **load_kwargs)
else:
load(source=source, **load_kwargs)
# Now update it
return download(
source=source,
to_memory=to_memory,
to_file=to_file,
to_zip=to_zip,
dir_path=dir_path,
overwrite=overwrite,
soft=False,
check_outd=check_outd,
only_new_rows=True,
to_resokit=to_resokit,
verbose=verbose,
)
# --------------------------- BINARY SYSTEMS DATASETS -------------------------
[docs]
def load_binary(
which: Union[str, bool],
from_memory: bool = True,
from_file: Union[str, bool] = True,
dir_path: Union[str, Path, bool] = True,
rename_columns: bool = True,
ret_header: bool = False,
inferr: bool = False,
clean: bool = True,
verbose: bool = True,
) -> Union[pd.DataFrame, str]:
"""Load a binary dataset.
Parameters
----------
which : str, bool
Which dataset to load:
'circumbinary' or 'c' or 'p' for the p-type circumbinaries dataset,
'simple' or 's' for the s-type binaries dataset.
If `True`, loads the default dataset (circumbinary).
If `False`, loads the simple binary dataset.
from_memory : bool, optional. Default: True.
If `True`, loads the dataset from memory if available.
from_file : str or bool, optional. Default: True.
If `True`, default filename is used.
If `False`, the file is not loaded.
dir_path : str, Path or bool, optional. Default: True.
Directory path to load the dataset from.
If `True` or `None` the default directory is used.
rename_columns : bool, optional. Default: True.
If True, rename the columns for human readability.
ret_header : bool, optional. Default: False.
If True, return the header.
If False, return the data.
inferr : bool, optional. Default: False.
If False, the width of the columns is fixed. (Recommended)
If True, the parsed width of the columns is inferred. Use in case
the dataset cannot be parsed with fixed-width columns.
clean : bool, optional. Default: True.
If True, replace the unknown values with NaN.
verbose : bool, optional. Default: True.
If True, print the header and messages.
Returns
-------
Union[pd.DataFrame, str]
header : str if ret_header is True.
The header of the dataset.
data : pd.DataFrame if ret_header is False.
The dataset as a pandas DataFrame.
"""
# Check the which parameter
if isinstance(which, bool):
if which:
which = "circumbinary"
else:
which = "simple"
return _binary_manager.load(
source=which,
from_memory=from_memory,
from_file=from_file,
dir_path=dir_path,
rename_columns=rename_columns,
ret_header=ret_header,
inferr=inferr,
clean=clean,
verbose=verbose,
)
[docs]
def download_binary(
which: str,
to_file: Union[str, Path, bool] = True,
dir_path: Union[str, Path, bool, None] = True,
to_memory: bool = True,
return_data: bool = True,
overwrite: bool = False,
soft: bool = True,
verbose: bool = True,
chunk_size: int = 1024,
print_size: float = 0.00001,
) -> Union[Path, pd.DataFrame, None, dict]:
"""Download a binary dataset from a specified source and save it locally.
The dataset is downloaded from the internet and can be stored in a file,
in memory, and/or simply returned.
Note
----
Requires the requests library.
Parameters
----------
which : str
Which dataset to download:
'circumbinary' or 'c' or 'p' for the p-type circumbinaries dataset,
'simple' or 's' for the s-type binaries dataset.
If "all" or "both", downloads both datasets.
to_file : str or Path or bool, optional. Default: True.
Path or str to the file to store the dataset.
If `True`, default filename is used.
If `False`, the file is not saved nor created.
dir_path : str or Path or bool or None. Default:True
Directory path to save the dataset.
If `None` or `True`, the default directory is used.
to_memory : bool, optional. Default: True.
If `True`, stores the dataset in memory.
return_data : bool, optional. Default: True.
If `True`, returns the dataset.
overwrite : bool, optional. Default: False.
If `True`, overwrites the file if it already exists.
It also overwrites the stored dataset in memory.
soft : bool, optiona. Default: True
If `True`, prints a message instead of raising an error, in
case of file existing and overwrite = `False`.
verbose : bool, optional. Default: True.
If `True`, displays messages about the download process.
chunk_size : int, optional. Default: 1024.
Size of the chunks to download the dataset, in bytes.
Default is 1024 bytes (1 KB).
print_size: float, optional. Default: 0.15.
Update frequency for the download progress bar.
Returns
-------
downloaded : Path or pd.DataFrame or str or None
`Path` to the downloaded dataset (and or zip archive),
or the dataset if return_data is `True`, or `None`.
"""
if which.lower() in ["all", "both"]:
# Download both datasets
s = download_binary(
which="simple",
to_file=to_file,
dir_path=dir_path,
to_memory=to_memory,
return_data=return_data,
overwrite=overwrite,
soft=soft,
verbose=verbose,
chunk_size=chunk_size,
print_size=print_size,
)
p = download_binary(
which="circumbinary",
to_file=to_file,
dir_path=dir_path,
to_memory=to_memory,
return_data=return_data,
overwrite=overwrite,
soft=soft,
verbose=verbose,
chunk_size=chunk_size,
print_size=print_size,
)
if return_data:
return {"s": s, "p": p}
return
return _binary_manager.download(
source=which,
to_file=to_file,
dir_path=dir_path,
to_memory=to_memory,
return_data=return_data,
overwrite=overwrite,
soft=soft,
verbose=verbose,
chunk_size=chunk_size,
print_size=print_size,
)
# --------------------------- AUXILIAR FUNCTIONS ----------------------------
[docs]
def clear_memory(
which: str, verbose: bool = True, files: bool = False
) -> None:
"""Clear the memory for the specified dataset.
Parameters
----------
which : str
Which dataset ('eu', 'nasa', 'datasets',
'p', 's', 'binary', 'all').
verbose : bool, optional. Default: True.
Whether to print informational messages.
files : bool, optional. Default: False.
If `True`, also removes the files from disk.
"""
which = which.lower() # Ensure lowercase
if which in DATASET_FILENAMES:
_full_manager.clear_memory(source=which, verbose=verbose, files=files)
elif which in BINARIES_FILENAMES:
_binary_manager.clear_memory(
source=which, verbose=verbose, files=files
)
elif which == "datasets":
_full_manager.clear_memory(source="both", verbose=verbose, files=files)
elif which == "binary":
_binary_manager.clear_memory(
source="both", verbose=verbose, files=files
)
elif which == "all":
_full_manager.clear_memory(source="both", verbose=verbose, files=files)
_binary_manager.clear_memory(
source="both", verbose=verbose, files=files
)
else:
raise ValueError(
f"Invalid {which=}. Must be 'eu', 'nasa', 'p', 's', "
+ "'binary', 'datasets', or 'all'."
)
if files is True:
clear_memory(which=which, verbose=verbose, files=False)
[docs]
def check_outdated(
which: str = "both", verbose: bool = True, soft=True
) -> Union[bool, Tuple[bool, bool]]:
"""Check if the specified stored dataset is outdated.
Parameters
----------
which : str, optional. Default: 'both'
Which dataset ('eu' or 'nasa').
If 'both', then both 'eu' and 'nasa'.
If 'all', then 'both' and both binaries too.
verbose : bool, optional. Default: True.
Whether to print informational messages.
Returns
-------
outdated : bool
Whether the dataset is outdated.
"""
# Check if which is valid
which = which.lower() # Ensure lowercase
if which == "both":
eu = check_outdated(which="eu", verbose=verbose, soft=soft)
if verbose:
print("") # A space between prints
nasa = check_outdated(which="nasa", verbose=verbose, soft=soft)
return eu, nasa
if which == "all":
both = check_outdated(which="both", verbose=verbose, soft=soft)
binas = check_binary_outdated(which="both", verbose=verbose, soft=soft)
return both[0], both[1], binas[0], binas[1]
if which not in DATASET_FILENAMES:
if which in BINARIES_FILENAMES:
if verbose:
print(
f"Use `check_binary_outdated({which=}) to check if"
+ "binary dataset is outdated."
)
raise ValueError(f"Invalid {which=}. Must be 'eu' or 'nasa'.")
if verbose:
print(f"Checking local dataset from {which=} source...")
# Check if the dataset is stored
try:
if which == "eu":
df_stored = _full_manager.load(
"eu",
verbose=False,
from_file=True,
to_df=True,
only_index=True,
check_age=True,
only_rows=False,
store=False,
store_index=True,
)
else:
df_stored = _full_manager.load(
"nasa",
verbose=False,
from_file=True,
to_df=True,
to_resokit=False,
check_age=True,
only_index=False,
only_rows=False,
store=False,
store_index=True,
)
# Keep only non controversial and default_flag
if "default_flag" in df_stored.columns:
df_stored = df_stored[df_stored["default_flag"] == 1]
elif verbose:
print(
" Unable to select default solutions for outdated check."
)
except FileNotFoundError as error:
if verbose:
print(
f" File from {which=} source to check if outdated not found."
)
if soft:
return True
raise error
except ValueError as error:
if (
str(error)
== "Data not found in memory, and no file or ZIP provided."
):
if verbose:
print(" Unable to load data to check if outdated.")
print(" Try downloading/loading it first")
if soft:
return True
raise error
assert isinstance(df_stored, pd.DataFrame), (
"Expected df_stored to be a pd.DataFrame, "
+ f"got {type(df_stored)} instead."
)
n_local = len(df_stored)
if n_local > 0 and verbose:
print(f" Number of planets in stored dataset: {n_local}")
if which == "nasa":
print(" (Including only default parameters sets.)")
elif verbose:
print(" Could not load the stored dataset. ")
# Check if the dataset is outdated
n_online, _ = check_outdated_dataset(source=which, verbose=verbose)
if n_online == n_local:
if verbose:
print("Dataset is already up-to-date.")
return False
elif n_online <= 0:
if verbose:
print("Cannot check if the dataset is up-to-date. ")
print("The dataset could be outdated.")
return True
elif n_online < n_local:
if verbose:
print(
"The online dataset has less rows than the stored dataset. "
+ "\n This could be the result of some online row(s) deleted."
+ "\n Although this is usually not a problem, running "
+ f"\n`resokit.datasets.download({which=})` could solve it "
+ "if needed."
)
return False
# n_online > n_local
if verbose:
print("The online dataset has more rows than the stored dataset. ")
print("The dataset is outdated.")
return True
[docs]
def check_binary_outdated(
which: Union[str, bool] = "both", verbose: bool = True, soft=True
) -> Union[bool, Tuple[bool, bool]]:
"""Check if the specified stored binary dataset is outdated.
Parameters
----------
which : str, bool
Which dataset: 'p' (circumbinary) or 's' (single binary).
If 'both' or 'all, both datasets are checked.
If True, circumbinary; if False, single binary.
verbose : bool, optional. Default: True.
Whether to print informational messages.
Returns
-------
outdated : bool
Whether the dataset is outdated.
"""
# Check if which is valid
if isinstance(which, bool):
which = "p" if which is True else "s"
which = which.lower() # Ensure lowercase
if which in ["both", "all"]:
p = check_binary_outdated(which="p", verbose=verbose, soft=soft)
if verbose:
print("") # A space between prints
s = check_binary_outdated(which="s", verbose=verbose, soft=soft)
return p, s
if which not in BINARIES_FILENAMES:
if which in DATASET_FILENAMES:
if verbose:
print(
f"Use `check_outdated({which=}) to check if "
+ f"'{which}' dataset is outdated."
)
raise ValueError(f"Invalid {which=}. Must be 'p' or 's'.")
# Check if the dataset is stored
try:
header = _binary_manager.load(source=which, ret_header=True)
df = _binary_manager.load(source=which, ret_header=False)
except FileNotFoundError as error:
if verbose:
print(
f"File from '{which}'-type binary source "
+ "to check if outdated not found."
)
if soft:
return True
raise error
assert isinstance(header, str), (
"Expected header to be a str, " + f"got {type(header)} instead."
)
n_local = len(df) + len(header.splitlines())
if n_local > 0 and verbose:
print(f" Number of lines in stored dataset: {n_local}")
elif verbose:
print("Could not load the stored dataset. ")
# Check if the dataset is outdated
n_online = check_outdated_binary(source=which, verbose=verbose)
if n_online == n_local:
if verbose:
print("Dataset is already up-to-date.")
return False
elif n_online <= 0:
if verbose:
print("Cannot check if the dataset is up-to-date. ")
print("The dataset could be outdated.")
return True
elif n_online < n_local:
if verbose:
print("The online dataset has less rows than the stored dataset. ")
print("This is unexpected.")
return False
# n_online > n_local
if verbose:
print("The online dataset has more rows than the stored dataset. ")
print("The dataset is outdated.")
return True