Skip to content

Supervisors

Supervision systems wrapped behind a unified Supervisor interface.

bells_o.supervisors

Impose structure on Class structure.

AutoCustomSupervisor

Class that implements automatic loading of previously implemented custom supervisor models (supervisors outside of REST or HuggingFace).

Source code in src/bells_o/supervisors/custom/auto_model.py
class AutoCustomSupervisor:
    """Factory that instantiates pre-configured custom supervisors from a model identifier."""

    @classmethod
    def load(cls, model_id: str, **kwargs):
        """Instantiate the pre-configured CustomSupervisor registered under `model_id`.

        Args:
            model_id (str): Identifier of an implemented supervisor model. It is lower-cased before
                it is looked up in `MODEL_MAPPING`.
            **kwargs: Keyword arguments forwarded to the supervisor class. Entries that the mapping
                fixes for this `model_id` overwrite same-named entries passed here.

        Returns:
            The object produced by calling the mapped class with the merged keyword arguments.

        Raises:
            NotImplementedError: If the mapped module does not expose the mapped class attribute.

        """
        registry_entry = MODEL_MAPPING[model_id.lower()]
        module_name, class_attribute, special_kwargs = registry_entry
        model_module = import_module(f".{module_name}", "bells_o.supervisors.custom")

        # A private sentinel tells "attribute missing" apart from any real attribute value.
        not_found = object()
        model_class = getattr(model_module, class_attribute, not_found)
        if model_class is not_found:
            raise NotImplementedError(
                f"Did not find model `{class_attribute}` in module `bells_o.supervisors.custom.{module_name}`."
            )

        # The mapping's kwargs are applied last, so they win over caller-supplied ones:
        # they are determined by the model_id itself.
        kwargs.update(special_kwargs)
        return model_class(**kwargs)

load classmethod

load(model_id: str, **kwargs)

Load a CustomSupervisor automatically from pre-configured supervisors.

Parameters:

Name Type Description Default
model_id str

Model identifier for implemented supervisor model.

required
**kwargs

Optional keyword arguments to override default parameters.

{}
Source code in src/bells_o/supervisors/custom/auto_model.py
@classmethod
def load(cls, model_id: str, **kwargs):
    """Instantiate the pre-configured CustomSupervisor registered under `model_id`.

    Args:
        model_id (str): Identifier of an implemented supervisor model. It is lower-cased before
            it is looked up in `MODEL_MAPPING`.
        **kwargs: Keyword arguments forwarded to the supervisor class. Entries that the mapping
            fixes for this `model_id` overwrite same-named entries passed here.

    Returns:
        The object produced by calling the mapped class with the merged keyword arguments.

    Raises:
        NotImplementedError: If the mapped module does not expose the mapped class attribute.

    """
    registry_entry = MODEL_MAPPING[model_id.lower()]
    module_name, class_attribute, special_kwargs = registry_entry
    model_module = import_module(f".{module_name}", "bells_o.supervisors.custom")

    # A private sentinel tells "attribute missing" apart from any real attribute value.
    not_found = object()
    model_class = getattr(model_module, class_attribute, not_found)
    if model_class is not_found:
        raise NotImplementedError(
            f"Did not find model `{class_attribute}` in module `bells_o.supervisors.custom.{module_name}`."
        )

    # The mapping's kwargs are applied last, so they win over caller-supplied ones:
    # they are determined by the model_id itself.
    kwargs.update(special_kwargs)
    return model_class(**kwargs)

CustomSupervisor

Bases: Supervisor

An abstract class that builds the base for custom supervisors outside of REST or HuggingFace.

Source code in src/bells_o/supervisors/custom/custom_supervisor.py
class CustomSupervisor(Supervisor):
    """An abstract class that builds the base for custom supervisors outside of REST or HuggingFace.

    Subclasses implement `_load_supervisor`, which is invoked at the end of `__init__` and can read
    the extra keyword arguments through `supervisor_kwargs`.
    """

    def __init__(
        self,
        name: str,
        usage: Usage,
        res_map_fn: ResultMapper,
        pre_processing: list[PreProcessing] | None = None,
        provider_name: str | None = None,
        **supervisor_kwargs,
    ):
        """Initialize the custom supervisor.

        Args:
            name (str): Name of the supervisor
            usage (Usage): The usage type of the supervisor.
            res_map_fn (ResultMapper): The `ResultMapper` used to convert results.
            pre_processing (list[PreProcessing] | None, optional): List of Preprocessing techniques for inputs.
                Defaults to None, which is treated as an empty list.
            provider_name (str | None, optional): The name of the provider of this model. Defaults to None.
            supervisor_kwargs (dict[str, Any]): The kwargs to configure the loading of the supervisor.

        """
        # Stored before anything else so the kwargs are available to `_load_supervisor`.
        self._supervisor_kwargs = supervisor_kwargs

        # A fresh list per instance; a `[]` default in the signature would be shared by every
        # supervisor created without an explicit `pre_processing`.
        if pre_processing is None:
            pre_processing = []

        super().__init__(name, usage, res_map_fn, pre_processing, provider_name)

        self._load_supervisor()

    @property
    def supervisor_kwargs(self) -> dict[str, Any]:  # noqa: D102
        return self._supervisor_kwargs

    def metadata(self) -> dict[str, Any]:
        """Return metadata dictionary for this Supervisor.

        Returns:
            dict: The parent metadata, extended with a `supervisor_kwargs` entry when those kwargs are not None.

        """
        metadata = super().metadata()
        if self.supervisor_kwargs is not None:
            metadata["supervisor_kwargs"] = self.supervisor_kwargs
        return metadata

    @abstractmethod
    def _load_supervisor(self):
        """Load the underlying supervisor; must be implemented by concrete subclasses."""
        raise NotImplementedError("This is an abstract class.")

metadata

metadata() -> dict[str, Any]

Return metadata dictionary for this Supervisor.

Returns:

Name Type Description
dict dict[str, Any]

Dictionary with metadata.

Source code in src/bells_o/supervisors/custom/custom_supervisor.py
def metadata(self) -> dict[str, Any]:
    """Build the metadata dictionary of this supervisor.

    Returns:
        dict: The parent metadata, extended with a `supervisor_kwargs` entry when those kwargs are not None.

    """
    info = super().metadata()
    # Guard clause: without supervisor kwargs the parent's metadata is returned untouched.
    if self.supervisor_kwargs is None:
        return info
    info["supervisor_kwargs"] = self.supervisor_kwargs
    return info

AutoHuggingFaceSupervisor

Class that implements automatic loading of previously implemented supervisor models from HuggingFace.

Source code in src/bells_o/supervisors/huggingface/auto_model.py
class AutoHuggingFaceSupervisor:
    """Factory that instantiates pre-configured HuggingFace supervisors from a model ID."""

    @classmethod
    def load(cls, model_id: str, **kwargs):
        """Instantiate the pre-configured HuggingFaceSupervisor registered under `model_id`.

        Args:
            model_id (str): Model ID of an implemented supervisor model on Huggingface. It is
                lower-cased before it is looked up in `MODEL_MAPPING`.
            **kwargs: Keyword arguments forwarded to the supervisor class. Entries that the mapping
                fixes for this `model_id` overwrite same-named entries passed here.

        Returns:
            The object produced by calling the mapped class with the merged keyword arguments.

        Raises:
            NotImplementedError: If the mapped module does not expose the mapped class attribute.

        """
        registry_entry = MODEL_MAPPING[model_id.lower()]
        module_name, class_attribute, special_kwargs = registry_entry
        model_module = import_module(f".{module_name}", "bells_o.supervisors.huggingface")

        # A private sentinel tells "attribute missing" apart from any real attribute value.
        not_found = object()
        model_class = getattr(model_module, class_attribute, not_found)
        if model_class is not_found:
            raise NotImplementedError(
                f"Did not find model `{class_attribute}` in module `bells_o.supervisors.huggingface.{module_name}`."
            )

        # The mapping's kwargs are applied last, so they win over caller-supplied ones:
        # they are determined by the model_id itself.
        kwargs.update(special_kwargs)
        return model_class(**kwargs)

load classmethod

load(model_id: str, **kwargs)

Load a HuggingFaceSupervisor automatically from pre-configured supervisors.

Parameters:

Name Type Description Default
model_id str

Model ID of implemented supervisor model on Huggingface.

required
**kwargs

Optional keyword arguments to override default parameters.

{}
Source code in src/bells_o/supervisors/huggingface/auto_model.py
@classmethod
def load(cls, model_id: str, **kwargs):
    """Instantiate the pre-configured HuggingFaceSupervisor registered under `model_id`.

    Args:
        model_id (str): Model ID of an implemented supervisor model on Huggingface. It is
            lower-cased before it is looked up in `MODEL_MAPPING`.
        **kwargs: Keyword arguments forwarded to the supervisor class. Entries that the mapping
            fixes for this `model_id` overwrite same-named entries passed here.

    Returns:
        The object produced by calling the mapped class with the merged keyword arguments.

    Raises:
        NotImplementedError: If the mapped module does not expose the mapped class attribute.

    """
    registry_entry = MODEL_MAPPING[model_id.lower()]
    module_name, class_attribute, special_kwargs = registry_entry
    model_module = import_module(f".{module_name}", "bells_o.supervisors.huggingface")

    # A private sentinel tells "attribute missing" apart from any real attribute value.
    not_found = object()
    model_class = getattr(model_module, class_attribute, not_found)
    if model_class is not_found:
        raise NotImplementedError(
            f"Did not find model `{class_attribute}` in module `bells_o.supervisors.huggingface.{module_name}`."
        )

    # The mapping's kwargs are applied last, so they win over caller-supplied ones:
    # they are determined by the model_id itself.
    kwargs.update(special_kwargs)
    return model_class(**kwargs)

HuggingFaceSupervisor

Bases: Supervisor

A concrete class that enables loading any HuggingFace model as a supervisor.

Source code in src/bells_o/supervisors/huggingface/hf_supervisor.py
class HuggingFaceSupervisor(Supervisor):
    """A concrete class that enables loading any HuggingFace model as a supervisor.

    The model is served through either the `transformers` or the `vllm` backend. A subclass can
    restrict the permitted backends by setting `_supported_backends` before this initializer runs;
    otherwise both backends are allowed.
    """

    def __init__(
        self,
        name: str,
        usage: Usage,
        res_map_fn: ResultMapper,
        pre_processing: list[PreProcessing] | None = None,
        model_kwargs: dict[str, Any] | None = None,
        tokenizer_kwargs: dict[str, Any] | None = None,
        generation_kwargs: dict[str, Any] | None = None,
        provider_name: str | None = None,
        backend: Literal["transformers", "vllm"] = "transformers",
    ):
        """Initialize the supervisor and load its model and tokenizer.

        Args:
            name (str): Model identifier handed to `from_pretrained` (transformers) or `LLM` (vllm).
            usage (Usage): The usage type of the supervisor.
            res_map_fn (ResultMapper): The `ResultMapper` used to convert results.
            pre_processing (list[PreProcessing] | None, optional): List of Preprocessing techniques for inputs.
                Defaults to None, which is treated as an empty list.
            model_kwargs (dict[str, Any] | None, optional): Keyword arguments for loading the model.
                Defaults to None, which is treated as an empty dict.
            tokenizer_kwargs (dict[str, Any] | None, optional): Keyword arguments for loading the tokenizer
                (only used by the transformers backend). Defaults to None, which is treated as an empty dict.
            generation_kwargs (dict[str, Any] | None, optional): Keyword arguments for generation; passed to
                `model.generate` (transformers) or `SamplingParams` (vllm). Defaults to None, which is
                treated as an empty dict.
            provider_name (str | None, optional): The name of the provider of this model. Defaults to None.
            backend (Literal["transformers", "vllm"], optional): Inference backend. Defaults to "transformers".

        Raises:
            NotImplementedError: If `backend` is not one of the supported backends.
            ModuleNotFoundError: If the vllm backend is requested but `vllm` is not installed.

        """
        # Fresh containers per instance: `[]`/`{}` defaults in the signature would be shared
        # by every supervisor created without explicit arguments.
        super().__init__(name, usage, res_map_fn, [] if pre_processing is None else pre_processing)
        self._model_kwargs = {} if model_kwargs is None else model_kwargs
        self._tokenizer_kwargs = {} if tokenizer_kwargs is None else tokenizer_kwargs
        self.generation_kwargs = {} if generation_kwargs is None else generation_kwargs
        self._provider_name = provider_name

        # Respect a restriction that a subclass put in place before calling this initializer.
        if getattr(self, "_supported_backends", None) is None:
            self._supported_backends = ["transformers", "vllm"]

        self._backend = backend
        self._load_model_tokenizer()

    def _load_model_tokenizer(self):
        """Load `self._model` and `self._tokenizer` for the configured backend.

        Kept as a separate method so that supervisor implementations can override the loading
        logic (e.g. for LORA).

        Raises:
            NotImplementedError: If the backend is not listed in `self._supported_backends`.
            ModuleNotFoundError: If the vllm backend is requested but `vllm` is not installed.

        """
        if self.backend not in self._supported_backends:
            raise NotImplementedError(
                f"The requested backend `{self.backend}` is not supported. Choose one of {self._supported_backends}."
            )
        if self.backend == "transformers":
            from transformers import AutoModelForCausalLM, AutoTokenizer, PreTrainedTokenizerBase  # noqa: F401

            self._tokenizer = AutoTokenizer.from_pretrained(self.name, **self.tokenizer_kwargs)
            self._model = AutoModelForCausalLM.from_pretrained(self.name, **self.model_kwargs)

        elif self.backend == "vllm":
            try:
                from vllm import LLM, SamplingParams  # noqa: F401
            except ModuleNotFoundError as err:
                # Chain the original error so the failing import stays visible in the traceback.
                raise ModuleNotFoundError(
                    "This setup requires additional dependencies. The following required module is missing: ['vllm']. Please install it with `pip install bells_o[vllm]`."
                ) from err
            # Default to all visible GPUs, but let an explicit `tensor_parallel_size` in
            # `model_kwargs` win instead of failing with a duplicate-keyword TypeError.
            llm_kwargs = {"tensor_parallel_size": torch.cuda.device_count(), **self.model_kwargs}
            self._model = LLM(self.name, **llm_kwargs)
            self._tokenizer = self._model.get_tokenizer()

    @property
    def backend(self) -> str:  # noqa: D102
        return self._backend

    @property
    def model_kwargs(self) -> dict[str, Any]:  # noqa: D102
        return self._model_kwargs

    @property
    def tokenizer_kwargs(self) -> dict[str, Any]:  # noqa: D102
        return self._tokenizer_kwargs

    def metadata(self) -> dict[str, Any]:
        """Return metadata dictionary for this Supervisor.

        Returns:
            dict: The parent metadata plus the generation, model and tokenizer kwargs (each only when
                not None) and the backend.

        """
        metadata = super().metadata()
        if self.generation_kwargs is not None:
            metadata["generation_kwargs"] = self.generation_kwargs
        if self.model_kwargs is not None:
            metadata["model_kwargs"] = self.model_kwargs
        if self.tokenizer_kwargs is not None:
            metadata["tokenizer_kwargs"] = self.tokenizer_kwargs
        metadata["backend"] = self.backend
        return metadata

    def pre_process(self, inputs: str | list[str]) -> list[str]:
        """Apply all preprocessing steps except tokenization.

        Runs the parent's pre-processors first and then, if the tokenizer defines a chat template,
        renders the result through that template.
        """
        inputs = super().pre_process(inputs)

        inputs = self._apply_chat_template(inputs)
        return inputs

    def _apply_chat_template(self, inputs: list[str]) -> list[str]:
        """Render `inputs` through the tokenizer's chat template, if it has one.

        Inputs are returned unchanged when the tokenizer has no `chat_template`.
        """
        if getattr(self._tokenizer, "chat_template", None) is not None:
            assert isinstance(inputs, list), (
                "If `tokenizer.chat_template` is not None, then use a `RoleWrapper` as the last pre-processor."
            )
            # NOTE(review): presumably each item is already a chat-message structure produced by a
            # `RoleWrapper` rather than a plain string -- confirm against the pre-processors.
            inputs = self._tokenizer.apply_chat_template(
                inputs,  # type: ignore
                tokenize=False,
                add_generation_prompt=True,
            )  # TODO customize the kwargs of apply_chat_template?
        return inputs

    # TODO: pass generation kwargs
    def judge(self, inputs: str | list[str]) -> list[OutputDict]:
        """Evaluate samples with model.

        Args:
            inputs (str | list[str]): The sample or batch of samples to be evaluated.

        Returns:
            list[OutputDict]: One entry per sample with the raw model output and its metadata.
                An empty batch yields an empty list.

        Raises:
            NotImplementedError: If the configured backend has no `judge` implementation.

        """
        if isinstance(inputs, str):
            inputs = [inputs]
        if not inputs:
            # Nothing to evaluate; also avoids dividing the latency by a batch size of zero.
            return []
        if self.backend == "transformers":
            return self._judge_transformers(inputs)
        if self.backend == "vllm":
            return self._judge_vllm(inputs)

        # Report the per-instance list, the same one `_load_model_tokenizer` validates against.
        raise NotImplementedError(
            f"The requested backend `{self.backend}` is not supported. Choose one of {self._supported_backends}."
        )

    # judge() implementations for different backends
    def _judge_transformers(self, inputs: list[str]) -> list[OutputDict]:
        """Generate outputs for a batch with the transformers backend.

        Latency is reported per sample as the batch generation time divided by the batch size.
        """
        import logging

        from transformers import PreTrainedTokenizerBase

        logger = logging.getLogger(__name__)

        assert self.backend == "transformers", (
            f'Backend should be "transformers" at this point, but got "{self.backend}".'
        )
        assert isinstance(self._tokenizer, PreTrainedTokenizerBase), f"got {type(self._tokenizer)}"

        encoded_batch = self._tokenizer(inputs, return_tensors="pt", padding=True).to(
            device=getattr(self._model, "device")
        )
        start_time = time()
        outputs = cast(torch.Tensor, self._model.generate(**encoded_batch, **self.generation_kwargs))
        generation_time = time() - start_time

        # cut outputs to only include generated tokens, assume that all samples were padded to the same length
        input_ids = encoded_batch["input_ids"]
        assert isinstance(input_ids, torch.Tensor)
        sequence_length = input_ids.size(1)  # padded sequence length
        # Diagnostics go through logging instead of stdout; the extra decode of the full
        # sequences only happens when debug logging is actually enabled.
        if logger.isEnabledFor(logging.DEBUG):
            logger.debug("outputs before: %s", self._tokenizer.batch_decode(outputs))
            logger.debug("sequence length: %s", sequence_length)
        outputs = outputs[:, sequence_length:, ...]

        decoded_outputs: list[str] = self._tokenizer.batch_decode(outputs)
        logger.debug("outputs after: %s", decoded_outputs)
        batch_size = len(inputs)

        input_tokens: list[int] = (
            cast(torch.Tensor, encoded_batch["attention_mask"]).sum(dim=1).tolist()
        )  # only count non-padding tokens

        output_tokens: list[int] = (
            cast(torch.Tensor, outputs != self._tokenizer.pad_token_id).sum(dim=1).tolist()
        )  # only count non-padding tokens

        assert len(output_tokens) == batch_size == len(input_tokens), (
            f"Expected all lengths to be equal, but got {len(output_tokens)}, {batch_size} and {len(input_tokens)}."
        )

        return [
            OutputDict(
                output_raw=output,
                metadata={
                    "latency": generation_time / batch_size,
                    "batch_size": batch_size,
                    "input_tokens": input_t,
                    "output_tokens": output_t,
                },
            )
            for output, input_t, output_t in zip(decoded_outputs, input_tokens, output_tokens)
        ]

    def _judge_vllm(self, inputs: list[str]) -> list[OutputDict]:
        """Generate outputs for a batch with the vllm backend.

        `generation_kwargs` are interpreted as `SamplingParams` arguments. Only the first
        completion of every request is returned.
        """
        from vllm import LLM, SamplingParams

        assert self.backend == "vllm", f'Backend should be "vllm" at this point, but got "{self.backend}".'
        assert isinstance(self._model, LLM)

        sampling_params = SamplingParams(
            **self.generation_kwargs
        )  # TODO: somehow make it obvious that this takes vllm arguments
        start = time()  # note that by doing this, we are generalizing latency per prompt as batch_latency/n_prompts
        outputs = self._model.generate(inputs, sampling_params)
        generation_time = time() - start

        batch_size = len(inputs)

        return [
            OutputDict(
                output_raw=output.outputs[0].text,
                metadata={
                    "latency": generation_time / batch_size,
                    "batch_size": batch_size,
                    "input_tokens": len(cast(list[int], output.prompt_token_ids)),
                    "output_tokens": len(output.outputs[0].token_ids),
                },
            )
            for output in outputs
        ]

metadata

metadata() -> dict[str, Any]

Return metadata dictionary for this Supervisor.

Returns:

Name Type Description
dict dict[str, Any]

Dictionary with metadata.

Source code in src/bells_o/supervisors/huggingface/hf_supervisor.py
def metadata(self) -> dict[str, Any]:
    """Build the metadata dictionary of this supervisor.

    Returns:
        dict: The parent metadata plus the generation, model and tokenizer kwargs (each only when
            not None) and the backend.

    """
    info = super().metadata()
    # Optional entries, listed in the order in which they are added to the dictionary.
    optional_entries = (
        ("generation_kwargs", self.generation_kwargs),
        ("model_kwargs", self.model_kwargs),
        ("tokenizer_kwargs", self.tokenizer_kwargs),
    )
    for key, value in optional_entries:
        if value is not None:
            info[key] = value
    info["backend"] = self.backend
    return info

pre_process

pre_process(inputs: str | list[str]) -> list[str]

Apply all preprocessing steps except tokenization.

Source code in src/bells_o/supervisors/huggingface/hf_supervisor.py
def pre_process(self, inputs: str | list[str]) -> list[str]:
    """Run the parent's pre-processing, then render the result through the chat template.

    Tokenization is not part of this step.
    """  # TODO: improve this docstring
    pre_processed = super().pre_process(inputs)
    return self._apply_chat_template(pre_processed)

judge

judge(inputs: str | list[str]) -> list[OutputDict]

Evaluate samples with model.

Parameters:

Name Type Description Default
inputs str | list[str]

The sample or batch of samples to be evaluated.

required

Returns:

Type Description
list[OutputDict]

The outputs of the supervisor as a list, one `OutputDict` (raw output plus metadata) per sample.

Source code in src/bells_o/supervisors/huggingface/hf_supervisor.py
def judge(self, inputs: str | list[str]) -> list[OutputDict]:
    """Evaluate samples with model.

    Args:
        inputs (str | list[str]): The sample or batch of samples to be evaluated.

    Returns:
        list[OutputDict]: One entry per sample with the raw model output and its metadata.
            An empty batch yields an empty list.

    Raises:
        NotImplementedError: If the configured backend has no `judge` implementation.

    """
    if isinstance(inputs, str):
        inputs = [inputs]
    if not inputs:
        # Nothing to evaluate; also avoids dividing the latency by a batch size of zero.
        return []
    if self.backend == "transformers":
        return self._judge_transformers(inputs)
    if self.backend == "vllm":
        return self._judge_vllm(inputs)

    # Report the per-instance list, the same one `_load_model_tokenizer` validates against.
    raise NotImplementedError(
        f"The requested backend `{self.backend}` is not supported. Choose one of {self._supported_backends}."
    )

AutoRestSupervisor

Class that implements automatic loading of previously implemented supervisor REST APIs.

Source code in src/bells_o/supervisors/rest/auto_endpoint.py
class AutoRestSupervisor:
    """Factory that instantiates pre-configured REST API supervisors from an endpoint name."""

    @classmethod
    def load(cls, endpoint_name, *args, **kwargs):
        """Instantiate the pre-configured RestSupervisor registered under `endpoint_name`.

        Args:
            endpoint_name (str): name of the endpoint, consult documentation for implemented APIs.
                It is looked up in `MODEL_MAPPING` exactly as given.
            *args: Positional arguments for the class type.
            **kwargs: Optional keyword arguments for the instantiated class.

        Returns:
            The object produced by calling the mapped class with `*args` and `**kwargs`.

        Raises:
            NotImplementedError: If the mapped module does not expose the mapped class attribute.

        """
        module_name, class_attribute = MODEL_MAPPING[endpoint_name]
        model_module = import_module(f".{module_name}", "bells_o.supervisors.rest")

        # A private sentinel tells "attribute missing" apart from any real attribute value.
        not_found = object()
        model_class = getattr(model_module, class_attribute, not_found)
        if model_class is not_found:
            raise NotImplementedError(
                f"Did not find attribute `{class_attribute}` in module `bells_o.supervisors.rest.{module_name}`."
            )
        return model_class(*args, **kwargs)

load classmethod

load(endpoint_name: Literal['lakeraguard-default'], project_id: str, pre_processing: list[PreProcessing] = ..., api_key: str | None = ..., api_variable: str | None = ...) -> LakeraGuardDefaultSupervisor
load(endpoint_name: Literal['lakeraguard'], project_id: str, usage: Usage, pre_processing: list[PreProcessing] = ..., api_key: str | None = ..., api_variable: str | None = ...) -> LakeraGuardSupervisor
load(endpoint_name: str, *args, **kwargs) -> RestSupervisor
load(endpoint_name: Literal['lakeraguard-default'] | Literal['lakeraguard'] | str, *args, **kwargs) -> LakeraGuardDefaultSupervisor | LakeraGuardSupervisor | RestSupervisor

Load a RestSupervisor automatically from pre-configured APIs.

Parameters:

Name Type Description Default
endpoint_name str

name of the endpoint, consult documentation for implemented APIs.

required
*args

Positional arguments for the class type.

()
**kwargs

Optional keyword arguments for the instantiated class.

{}
Source code in src/bells_o/supervisors/rest/auto_endpoint.py
@classmethod
def load(cls, endpoint_name, *args, **kwargs):
    """Instantiate the pre-configured RestSupervisor registered under `endpoint_name`.

    Args:
        endpoint_name (str): name of the endpoint, consult documentation for implemented APIs.
            It is looked up in `MODEL_MAPPING` exactly as given.
        *args: Positional arguments for the class type.
        **kwargs: Optional keyword arguments for the instantiated class.

    Returns:
        The object produced by calling the mapped class with `*args` and `**kwargs`.

    Raises:
        NotImplementedError: If the mapped module does not expose the mapped class attribute.

    """
    module_name, class_attribute = MODEL_MAPPING[endpoint_name]
    model_module = import_module(f".{module_name}", "bells_o.supervisors.rest")

    # A private sentinel tells "attribute missing" apart from any real attribute value.
    not_found = object()
    model_class = getattr(model_module, class_attribute, not_found)
    if model_class is not_found:
        raise NotImplementedError(
            f"Did not find attribute `{class_attribute}` in module `bells_o.supervisors.rest.{module_name}`."
        )
    return model_class(*args, **kwargs)

RestSupervisor dataclass

Bases: Supervisor

A concrete class that enables access to supervisors via REST API.

Source code in src/bells_o/supervisors/rest/rest_supervisor.py
@dataclass(kw_only=True)
class RestSupervisor(Supervisor):
    """A concrete class that enables access to supervisors via REST API.

    Every prompt is sent as an individual POST request to `base_url`. Requests that hit a rate
    limit or return a body that is not valid JSON are retried.
    """

    def __init__(
        self,
        name: str,
        usage: Usage,
        res_map_fn: ResultMapper,
        base_url: str,
        req_map_fn: RequestMapper,
        auth_map_fn: AuthMapper,
        pre_processing: list[PreProcessing] | None = None,
        provider_name: str | None = None,
        api_key: str | None = None,
        api_variable: str | None = None,
        needs_api: bool = True,
        rate_limit_codes: list[int] | None = None,
        custom_header: dict[str, str] | None = None,
    ):
        """Initialize the REST supervisor.

        Args:
            name (str): Name of the supervisor
            usage (Usage): The usage type of the supervisor.
            res_map_fn (ResultMapper): The `ResultMapper` used to convert results.
            base_url (str): URL that the POST requests are sent to.
            req_map_fn (RequestMapper): Called as `req_map_fn(self, prompt)` to build the JSON payload.
            auth_map_fn (AuthMapper): Called as `auth_map_fn(self)` to build the authentication headers.
            pre_processing (list[PreProcessing] | None, optional): List of Preprocessing techniques for inputs.
                Defaults to None, which is treated as an empty list.
            provider_name (str | None, optional): The name of the provider of this model. Defaults to None.
            api_key (str | None, optional): The API key itself. Defaults to None.
            api_variable (str | None, optional): Name of the environment variable that holds the API key;
                only consulted when `api_key` is empty. Defaults to None.
            needs_api (bool, optional): Whether an API key is required. Defaults to True.
            rate_limit_codes (list[int] | None, optional): HTTP status codes that are treated as rate limiting.
                Defaults to None, which is treated as `[429]`.
            custom_header (dict[str, str] | None, optional): Headers merged over the authentication headers.
                Defaults to None, which is treated as an empty dict.

        """
        # The key-related attributes must exist before the `api_key` property is read below.
        self._api_key = api_key
        self._api_variable = api_variable
        self._needs_api = needs_api

        assert not needs_api or self.api_key, (
            f"You have provide the API key in the default environment variable `{self.api_variable}`, set a custom environment variabe in which the API key can be found (by passing `api_variable`), or the API key itself (by passing `api_key`)."
        )

        # Fresh containers per instance: `[]`/`{}` defaults in the signature would be shared
        # by every supervisor created without explicit arguments.
        super().__init__(name, usage, res_map_fn, [] if pre_processing is None else pre_processing)

        self.base_url = base_url
        self.req_map_fn = req_map_fn
        self.auth_map_fn = auth_map_fn
        self._provider_name = provider_name  # private
        self.rate_limit_codes = [429] if rate_limit_codes is None else rate_limit_codes
        self.custom_header = {} if custom_header is None else custom_header

    @property
    def api_key(self) -> str:
        """Return the API key: empty if none is needed, else the explicit key or the environment value."""
        if not self._needs_api:
            return ""
        return self._api_key or getenv(self.api_variable, "")

    @api_key.setter
    def api_key(self, value: str):
        # Setting a key explicitly also marks the supervisor as requiring one.
        self._api_key = value
        self._needs_api = True

    @property
    def api_variable(self) -> str:
        """Return the name of the environment variable for the API key, or an empty string if unset."""
        return self._api_variable if self._api_variable is not None else ""

    @api_variable.setter
    def api_variable(self, value: str | None):
        self._api_variable = value

    def metadata(self) -> dict[str, Any]:
        """Return metadata dictionary for this Supervisor.

        Returns:
            dict: The parent metadata plus the `url` of the endpoint.

        """
        metadata = super().metadata()
        metadata["url"] = self.base_url
        return metadata

    @classmethod
    @abstractmethod
    def _get_token_counts(cls, output_raw: dict[str, Any]) -> dict[str, Any]:
        """Get the input and output tokens from an output dictionary.

        The body is a template with placeholder keys; concrete supervisors override it with the
        keys of their API response.
        """
        input_tokens = output_raw["some_key"]
        output_tokens = output_raw["some_other_key"]

        return {"input_tokens": input_tokens, "output_tokens": output_tokens}

    def _judge_sample(
        self,
        prompt: str,
    ) -> OutputDict:
        """Run an individual POST request for inference.

        The request is repeated until a response arrives whose status code is not a rate-limit
        code and whose body can be decoded as JSON.

        Args:
            prompt (str): The prompt string to check.

        Returns:
            OutputDict: The decoded JSON response, with the token counts and the latency of the
                final request as metadata.

        """
        printed_info = False
        rate_limit = False  # to distinguish between trying and retrying information
        no_valid_response = True  # to manage retries
        output_raw = None

        while no_valid_response:
            # Once a rate limit was hit, every following attempt waits before sending.
            if rate_limit:
                if not printed_info:
                    print("INFO: Hit rate limit. Starting retry cycling (2 sec).")
                    printed_info = True
                sleep(2)

            start_time = time()
            # Custom headers take precedence over the authentication headers on key clashes.
            headers = self.auth_map_fn(self) | self.custom_header
            response = post(
                self.base_url,
                json=self.req_map_fn(self, prompt),
                headers=headers,
            )
            latency = time() - start_time

            if response.status_code in self.rate_limit_codes:
                rate_limit = True
                continue

            try:
                output_raw = response.json()
                no_valid_response = False
            except JSONDecodeError:  # Usually a faulty output, so rerun. Note that it can lead to infinite while loop
                continue

        assert isinstance(output_raw, dict)
        metadata = self._get_token_counts(output_raw)
        metadata["latency"] = latency

        return OutputDict(output_raw=output_raw, metadata=metadata)

    def judge(self, prompts: list[str] | str) -> list[OutputDict]:
        """Evaluate a prompt or a batch of prompts, sending the requests in parallel.

        Args:
            prompts: A single prompt or a list of prompts. An empty list or empty string yields
                an empty result.

        Returns:
            list[OutputDict]: List of outputs and metadata, in the order of `prompts`.

        """
        if not prompts:
            return []

        if not isinstance(prompts, list):
            prompts = [prompts]

        with ThreadPoolExecutor() as executor:
            # the next statement runs a request for every prompt in parallel
            outputs = list(executor.map(self._judge_sample, prompts))

        return outputs

metadata

metadata() -> dict[str, Any]

Return metadata dictionary for this Supervisor.

Returns:

Name Type Description
dict dict[str, Any]

Dictionary with metadata.

Source code in src/bells_o/supervisors/rest/rest_supervisor.py
def metadata(self) -> dict[str, Any]:
    """Build the metadata dictionary of this supervisor.

    Returns:
        dict: The parent metadata plus the `url` of the endpoint.

    """
    info = super().metadata()
    info.update(url=self.base_url)
    return info

judge

judge(prompts: list[str] | str) -> list[OutputDict]

Evaluate a prompt or a batch of prompts simultaneously.

Parameters:

Name Type Description Default
prompts list[str] | str

List of prompts.

required

Returns:

Type Description
list[OutputDict]

list[OutputDict]: List of outputs and metadata.

Source code in src/bells_o/supervisors/rest/rest_supervisor.py
def judge(self, prompts: list[str] | str) -> list[OutputDict]:
    """Evaluate one prompt or a batch of prompts, sending the requests in parallel.

    Args:
        prompts: A single prompt or a list of prompts. An empty list or empty string yields
            an empty result.

    Returns:
        list[OutputDict]: List of outputs and metadata, in the order of `prompts`.

    """
    if not prompts:
        return []

    batch = prompts if isinstance(prompts, list) else [prompts]

    # One request per prompt, executed concurrently; `map` keeps the input order.
    with ThreadPoolExecutor() as pool:
        results = list(pool.map(self._judge_sample, batch))
    return results

Supervisor

Bases: ABC

Abstract base class for Supervisors.

Attributes:

Name Type Description
name str

Name of the supervisor.

usage Usage

The usage type of the supervisor.

res_map_fn ResultMapper

Function to map the output to a Result dict.

pre_processing list[PreProcessing]

List of PreProcessing techniques that should be applied.

provider_name str

The name of the provider of this model.

Source code in src/bells_o/supervisors/supervisor.py
class Supervisor(ABC):
    """Abstract base class for Supervisors.

    Calling an instance runs the full pipeline: pre-processing, `judge`, and mapping of every raw
    output through `res_map_fn`.

    Attributes:
        name (str): Name of the supervisor.
        usage (Usage): The usage type of the supervisor.
        res_map_fn (ResultMapper): Function to map the output to a `Result` dict.
        pre_processing (list[PreProcessing]): List of PreProcessing techniques that should be applied.
        provider_name (str): The name of the provider of this model.

    """

    def __init__(
        self,
        name: str,
        usage: Usage,
        res_map_fn: ResultMapper,
        pre_processing: list[PreProcessing] | None = None,
        provider_name: str | None = None,
    ):
        """Initialize the supervisor.

        Args:
            name (str): Name of the supervisor
            usage (Usage): The usage type of the supervisor.
            res_map_fn (ResultMapper): The `ResultMapper` used to convert results.
            pre_processing (list[PreProcessing] | None, optional): List of Preprocessing techniques for inputs.
                Defaults to None, which is treated as an empty list.
            provider_name (str | None, optional): The name of the provider of this model. Defaults to None.

        """
        self._name = name
        self._usage = usage
        self._res_map_fn = res_map_fn
        # A fresh list per instance; a `[]` default in the signature would be shared by every
        # supervisor created without an explicit `pre_processing`.
        self.pre_processing = [] if pre_processing is None else pre_processing
        self._provider_name = provider_name

    @property
    def name(self) -> str:  # noqa: D102
        return self._name

    @property
    def usage(self) -> Usage:  # noqa: D102
        return self._usage

    @property
    def res_map_fn(self) -> ResultMapper:  # noqa: D102
        return self._res_map_fn

    @property
    def provider_name(self) -> str:
        """Return the provider name, or "N/A" when none was given."""
        return self._provider_name if self._provider_name else "N/A"

    def __repr__(self) -> str:
        """Represent class object as string."""
        # `__name__` lives on the class, not on the instance.
        return f'{type(self).__name__}("{self.name}", "{self.usage}")'

    def __call__(self, inputs: str | list[str], *args, **kwargs) -> list[OutputDict]:
        """Complete full judging process.

        Pre-processes the inputs, runs `judge` and stores the mapped result of every output under
        its `output_result` key. Extra positional and keyword arguments are accepted but not used.
        """
        if not isinstance(inputs, list):
            inputs = [inputs]

        inputs = self.pre_process(inputs)

        outputs: list[OutputDict] = self.judge(inputs)
        for output in outputs:
            output["output_result"] = self._res_map_fn(output["output_raw"], self.usage)  # pyright: ignore[reportArgumentType]
        return outputs

    def metadata(self) -> dict[str, Any]:
        """Return metadata dictionary for this Supervisor.

        Returns:
            dict: Dictionary with the keys `provider`, `model`, `usage` and `supervisor_type`.

        """
        metadata = {
            "provider": getattr(self, "provider_name", "Unknown"),
            "model": getattr(self, "name", "Unknown"),
            "usage": repr(self.usage),
            "supervisor_type": self.__class__.__name__,
        }

        return metadata

    @abstractmethod
    def judge(self, *args, **kwargs) -> list[OutputDict]:
        """Run one evaluation on the supervisor.

        Similar to `forward` in PyTorch, it expects prepped inputs s.t.
        they can be used directly with the supervisor.
        """
        pass

    def pre_process(self, inputs: list[str] | str):
        """Apply all preprocessing steps.

        A single input is wrapped into a list; every pre-processor is then applied, in order, to
        each element. Concrete classes will likely need a tokenization equivalent implemented.
        """
        if not isinstance(inputs, list):
            inputs = [inputs]

        if self.pre_processing:
            for pre_processor in self.pre_processing:
                inputs = [pre_processor(sample) for sample in inputs]
        return inputs

metadata

metadata() -> dict[str, Any]

Return metadata dictionary for this Supervisor.

Returns:

Name Type Description
dict dict[str, Any]

Dictionary with metadata.

Source code in src/bells_o/supervisors/supervisor.py
def metadata(self) -> dict[str, Any]:
    """Build the metadata dictionary of this supervisor.

    Returns:
        dict: Dictionary with the keys `provider`, `model`, `usage` and `supervisor_type`.

    """
    # Provider and model fall back to "Unknown" when the attribute does not exist.
    provider = getattr(self, "provider_name", "Unknown")
    model = getattr(self, "name", "Unknown")
    return {
        "provider": provider,
        "model": model,
        "usage": repr(self.usage),
        "supervisor_type": self.__class__.__name__,
    }

judge abstractmethod

judge(*args, **kwargs) -> list[OutputDict]

Run one evaluation on the supervisor.

Similar to forward in PyTorch, it expects prepped inputs s.t. they can be used directly with the supervisor.

Source code in src/bells_o/supervisors/supervisor.py
@abstractmethod
def judge(self, *args, **kwargs) -> list[OutputDict]:
    """Run a single evaluation on the supervisor.

    Comparable to `forward` in PyTorch: the inputs are expected to be prepared already,
    so that they can be handed to the supervisor as they are.
    """
    return None

pre_process

pre_process(inputs: list[str] | str)

Apply all preprocessing steps.

Concrete classes will likely need a tokenization equivalent implemented.

Source code in src/bells_o/supervisors/supervisor.py
def pre_process(self, inputs: list[str] | str):
    """Apply all preprocessing steps.

    Concrete classes will likely need a tokenization equivalent implemented.
    """
    if not isinstance(inputs, list):
        inputs = [inputs]

    if self.pre_processing:
        for pre_processor in self.pre_processing:
            inputs = [pre_processor(input) for input in inputs]
    return inputs