diff --git a/src/sed/core/config_model.py b/src/sed/core/config_model.py index bca9f959..f8bf496b 100644 --- a/src/sed/core/config_model.py +++ b/src/sed/core/config_model.py @@ -26,6 +26,7 @@ class PathsModel(BaseModel): raw: DirectoryPath processed: Optional[Union[DirectoryPath, NewPath]] = None + meta: Optional[Union[DirectoryPath, NewPath]] = None class CopyToolModel(BaseModel): diff --git a/src/sed/loader/flash/loader.py b/src/sed/loader/flash/loader.py index a3e6c0a2..67f28e6f 100644 --- a/src/sed/loader/flash/loader.py +++ b/src/sed/loader/flash/loader.py @@ -58,8 +58,10 @@ def __init__(self, config: dict, verbose: bool = True) -> None: set_verbosity(logger, self._verbose) self.instrument: str = self._config["core"].get("instrument", "hextof") # default is hextof + self.beamtime_dir: str = None self.raw_dir: str = None self.processed_dir: str = None + self.meta_dir: str = None @property def verbose(self) -> bool: @@ -111,9 +113,14 @@ def _initialize_dirs(self) -> None: # Only raw_dir is necessary, processed_dir can be based on raw_dir, if not provided if "paths" in self._config["core"]: raw_dir = Path(self._config["core"]["paths"].get("raw", "")) + print(raw_dir) processed_dir = Path( self._config["core"]["paths"].get("processed", raw_dir.joinpath("processed")), ) + meta_dir = Path( + self._config["core"]["paths"].get("meta", raw_dir.joinpath("meta")), + ) + beamtime_dir = Path(raw_dir).parent else: try: @@ -147,11 +154,14 @@ def _initialize_dirs(self) -> None: raw_dir = raw_paths[0].resolve() processed_dir = beamtime_dir.joinpath("processed") + meta_dir = beamtime_dir.joinpath("meta/fabtrack/") processed_dir.mkdir(parents=True, exist_ok=True) + self.beamtime_dir = str(beamtime_dir) self.raw_dir = str(raw_dir) self.processed_dir = str(processed_dir) + self.meta_dir = str(meta_dir) @property def available_runs(self) -> list[int]: @@ -223,7 +233,7 @@ def get_files_from_run_id( # type: ignore[override] # Return the list of found files return [str(file.resolve()) for file in files] - def parse_metadata(self, token: str = None) -> dict: + def parse_scicat_metadata(self, token: str = None) -> dict: """Uses the MetadataRetriever class to fetch metadata from scicat for each run. Returns: @@ -239,6 +249,23 @@ def parse_metadata(self, token: str = None) -> dict: return metadata + def parse_local_metadata(self) -> dict: + """Uses the MetadataRetriever class to fetch metadata from local folder for each run. + + Returns: + dict: Metadata dictionary + """ + metadata_retriever = MetadataRetriever(self._config["metadata"]) + metadata = metadata_retriever.get_local_metadata( + beamtime_id=self._config["core"]["beamtime_id"], + beamtime_dir=self.beamtime_dir, + meta_dir=self.meta_dir, + runs=self.runs, + metadata=self.metadata, + ) + + return metadata + def get_count_rate(self, fids=None, **kwds) -> tuple[np.ndarray, np.ndarray]: """ Calculates the count rate using the number of rows and elapsed time for each file. @@ -362,7 +389,7 @@ def read_dataframe( folders: str | Sequence[str] = None, runs: str | int | Sequence[str | int] = None, ftype: str = "h5", - metadata: dict = {}, + metadata: dict | None = None, collect_metadata: bool = False, **kwds, ) -> tuple[dd.DataFrame, dd.DataFrame, dict]: @@ -406,6 +433,9 @@ def read_dataframe( FileNotFoundError: If the conversion fails for some files or no data is available. ValueError: If collect_metadata is True and no token is available. """ + if metadata is None: + metadata = {} + detector = kwds.pop("detector", "") force_recreate = kwds.pop("force_recreate", False) processed_dir = kwds.pop("processed_dir", None) @@ -465,7 +495,25 @@ def read_dataframe( if self.instrument == "wespe": df, df_timed = wespe_convert(df, df_timed) - self.metadata.update(self.parse_metadata(token) if collect_metadata else {}) + # self.metadata.update(self.parse_metadata(token) if collect_metadata else {}) + # if len(self.parse_scicat_metadata(token)) == 0: + + # print("No SciCat metadata available, checking local folder") + # self.metadata.update(self.parse_local_metadata()) + # else: + # print("Metadata taken from SciCat") + # self.metadata.update(self.parse_scicat_metadata(token) if collect_metadata else {}) + scicat_metadata = self.parse_scicat_metadata(token) + scicat_runs = scicat_metadata.get("scientificMetadata", {}) + + if not any(scicat_runs.values()): + logger.warning("No SciCat metadata available, checking local folder") + self.metadata.update(self.parse_local_metadata()) + else: + logger.warning("Metadata taken from SciCat") + if collect_metadata: + self.metadata.update(scicat_metadata) + self.metadata.update(bh.metadata) logger.info(f"Loading complete in {time.time() - t0: .2f} s") diff --git a/src/sed/loader/flash/metadata.py b/src/sed/loader/flash/metadata.py index 578fa9fd..396b9043 100644 --- a/src/sed/loader/flash/metadata.py +++ b/src/sed/loader/flash/metadata.py @@ -5,6 +5,8 @@ from __future__ import annotations import requests +import json, yaml +from pathlib import Path from sed.core.config import read_env_var from sed.core.config import save_env_var @@ -15,8 +17,7 @@ class MetadataRetriever: """ - A class for retrieving metadata from a Scicat instance based - on beamtime and run IDs. + Retrieves metadata from SciCat or local YAML files for a given beamtime and runs. """ def __init__(self, metadata_config: dict, token: str = None) -> None: @@ -55,10 +56,11 @@ def get_metadata( self, beamtime_id: str, runs: list, - metadata: dict = None, + metadata: dict | None = None, ) -> dict: """ - Retrieves metadata for a given beamtime ID and list of runs. + Retrieves metadata for a beamtime and runs from SciCat. + Returns a dict with 'scientificMetadata' keyed by run ID. Args: beamtime_id (str): The ID of the beamtime. @@ -77,12 +79,17 @@ def get_metadata( if metadata is None: metadata = {} + all_runs_metadata: dict[str, dict] = {} + for run in runs: pid = f"{beamtime_id}/{run}" - logger.debug(f"Retrieving metadata for PID: {pid}") + # logger.debug(f"Retrieving metadata for PID: {pid}") metadata_run = self._get_metadata_per_run(pid) - metadata.update(metadata_run) # TODO: Not correct for multiple runs + # metadata.update(metadata_run) # TODO: Not correct for multiple runs + # Use 'scientificMetadata' if available, otherwise entire dict + all_runs_metadata[run] = metadata_run.get("scientificMetadata", metadata_run) + metadata["scientificMetadata"] = all_runs_metadata logger.debug(f"Retrieved metadata with {len(metadata)} entries") return metadata @@ -103,44 +110,172 @@ def _get_metadata_per_run(self, pid: str) -> dict: headers2["Authorization"] = f"Bearer {self.token}" try: - logger.debug(f"Attempting to fetch metadata with new URL format for PID: {pid}") - dataset_response = requests.get( - self._create_new_dataset_url(pid), - headers=headers2, - timeout=10, - ) - dataset_response.raise_for_status() + # logger.debug(f"Attempting to fetch metadata with new URL format for PID: {pid}") + # dataset_response = requests.get( + # self._create_new_dataset_url(pid), + # headers=headers2, + # timeout=10, + # ) + # dataset_response.raise_for_status() + logger.debug(f"Fetching metadata (new URL) for PID: {pid}") + response = requests.get(self._create_new_dataset_url(pid), headers=headers2, timeout=10) + response.raise_for_status() # Check if response is an empty object because wrong url for older implementation - if not dataset_response.content: + # if not dataset_response.content: + # logger.debug("Empty response, trying old URL format") + # dataset_response = requests.get( + # self._create_old_dataset_url(pid), + # headers=headers2, + # timeout=10, + # ) + if not response.content: logger.debug("Empty response, trying old URL format") - dataset_response = requests.get( - self._create_old_dataset_url(pid), - headers=headers2, - timeout=10, - ) + response = requests.get(self._create_old_dataset_url(pid), headers=headers2, timeout=10) # If the dataset request is successful, return the retrieved metadata # as a JSON object - return dataset_response.json() + # return dataset_response.json() + return response.json() + except requests.exceptions.RequestException as e: + logger.warning(f"Failed to retrieve metadata for PID {pid}: {e}") + return {} - except requests.exceptions.RequestException as exception: - logger.warning(f"Failed to retrieve metadata for PID {pid}: {str(exception)}") - return {} # Return an empty dictionary for this run + # except requests.exceptions.RequestException as exception: + # logger.warning(f"Failed to retrieve metadata for PID {pid}: {str(exception)}") + # return {} # Return an empty dictionary for this run def _create_old_dataset_url(self, pid: str) -> str: - return "{burl}/{url}/%2F{npid}".format( - burl=self.url, - url="Datasets", - npid=self._reformat_pid(pid), - ) + return f"{self.url}datasets/%2F{self._reformat_pid(pid)}" def _create_new_dataset_url(self, pid: str) -> str: - return "{burl}/{url}/{npid}".format( - burl=self.url, - url="Datasets", - npid=self._reformat_pid(pid), - ) + return f"{self.url}datasets/{self._reformat_pid(pid)}" def _reformat_pid(self, pid: str) -> str: """SciCat adds a pid-prefix + "/" but at DESY prefix = "" """ - return (pid).replace("/", "%2F") + """Replace '/' with '%2F' for SciCat PID.""" + return pid.replace("/", "%2F") + + # ---------------------------- + # Local metadata + # ---------------------------- + def get_local_metadata( + self, + beamtime_id: str, + beamtime_dir: str | Path, + meta_dir: str | Path, + runs: list, + metadata: dict | None = None, + ) -> dict: + """ + Retrieves metadata for a beamtime and runs from local YAML files. + Returns a dict with 'scientificMetadata' keyed by run ID. + + Args: + beamtime_dir (str)|Path: Beamtime directory. + meta_dir (str)|Path: Local metadata directory.. + runs (list): A list of run IDs. + metadata (dict, optional): The existing metadata dictionary. + Defaults to None. + + Returns: + Dict: The updated metadata dictionary. + + Raises: + Exception: If the request to retrieve metadata fails. + """ + if metadata is None: + metadata = {} + + beamtime_metadata = self._get_beamtime_metadata(beamtime_dir, beamtime_id) + metadata.update(beamtime_metadata) + + all_runs_metadata: dict[str, dict] = {} + + for run in runs: + logger.debug(f"Retrieving local metadata for run: {run}") + run_metadata = self._get_local_metadata_per_run(meta_dir, run) + all_runs_metadata[run] = run_metadata.get("_data", {}) + + metadata["scientificMetadata"] = all_runs_metadata + + logger.debug(f"Retrieved metadata with {len(metadata)} entries") + return metadata + + def _get_beamtime_metadata( + self, + beamtime_dir: str | Path, + beamtime_id: str, + ) -> dict: + """ + Retrieves general metadata for a given beamtime ID from beamtime-metadata-{beamtime_id}.json file + + Args: + beamtime_dir (str)|Path: Beamtime directory. + beamtime_id (str): The ID of the beamtime. + + Returns: + Dict: The retrieved metadata dictionary. + + Raises: + Exception: If the request to retrieve metadata fails. + """ + try: + beamtime_dir = Path(beamtime_dir) + filepath = beamtime_dir / f"beamtime-metadata-{beamtime_id}.json" + with filepath.open("r") as f: + return json.load(f) + except Exception as exc: + logger.warning(f"Failed to retrieve metadata for beamtime {beamtime_id}: {exc}") + return {} + + + def _get_local_metadata_per_run(self, meta_dir: str | Path, run: str) -> dict: + """ + Retrieves metadata for a specific run from the latest YAML file: + {run}_N.yaml (highest N) or fallback to {run}.yaml + + Args: + meta_dir(str): The existing local metadata folder. + + Returns: + dict: The retrieved metadata. + + Raises: + Exception: If the request to retrieve metadata fails. + """ + try: + # run = str(run) + meta_dir = Path(meta_dir) + + # candidates = [] + run = str(run) + candidates: list[tuple[int, Path]] = [] + + # Look for versioned YAML files + for path in meta_dir.glob(f"{run}_*.yaml"): + try: + version = int(path.stem.split("_")[-1]) + candidates.append((version, path)) + except ValueError: + continue + + # Fallback: unversioned single file + if not candidates: + single_file = meta_dir / f"{run}.yaml" + if single_file.exists(): + candidates.append((0, single_file)) + + if not candidates: + raise FileNotFoundError(f"No metadata files found for run {run} in {meta_dir}") + + # Pick the latest version + _, latest_path = max(candidates, key=lambda x: x[0]) + + logger.info(f"Loading local metadata from {latest_path.name}") + + run_metadata = yaml.safe_load(latest_path.read_text()) + return run_metadata or {"_data": {}} + + except Exception as exc: + logger.warning(f"Failed to retrieve local metadata for run {run}: {exc}") + return {"_data": {}}