"""Base class for all vulnerability framework SETs.
All SETs inherit from BaseSETPipeline and should implement all 4 phases:
initialize() -> execute() -> evaluate() -> report()
"""
import logging
from abc import ABC, abstractmethod
from enum import Enum
from typing import List, Dict, Any, Optional
from datetime import datetime
from math import sqrt
from .schema import LanguageModelSETCase, OutputData, EvaluationResult, ReportData
from ...connectors.languagemodel.base import BaseLMConnector
from ...models import EvaluationLanguageModel
from scipy.special import erfinv
logger = logging.getLogger(__name__)
[docs]
class BaseSETPipeline(ABC):
"""The base Pipeline class for Language Model Security Evaluation Tests.
Based on a 4-phase pipeline with defined data contracts:
Phase 1 - initialize(config_path) -> List[LanguageModelSETCase] (Get SET configuration from a configuration file and return a list of SET cases)
Phase 2 - execute(connector, sets) -> OutputData (Execute the SET cases against the defined model and return data objects for evaluation)
Phase 3 - evaluate(execution_data) -> List[EvaluationResult] (Evaluate the test results and return evaluation objects)
Phase 4 - report(results, output_path, format) -> ReportData (Take the evaluation objects and form a final report in a desired format)
run() - Runs all phases
Data flow:
initialize() ---> List[LanguageModelSETCase] ---> execute() ---> OutputData(List[ExecutionOutput], execution_time) ---> evaluate() ---> List[EvaluationResult] ---> report() ---> ReportData
When new language model SETs are designed, override these methods according to your needs.
New evaluators, connectors, loaders, reporters, configurations, and SETs can be written and used as long as they follow this pipeline structure.
"""
# Name of the SET; empty string in the base class (presumably overridden by concrete SETs — TODO confirm).
name: str = ""
# Description of the SET; empty string in the base class (presumably overridden by concrete SETs — TODO confirm).
description: str = ""
# Report formats listed as supported by the pipeline: JSON, HTML and Markdown.
# NOTE(review): ReportFormat is not among the imports visible in this file — confirm where it is defined/imported.
SUPPORTED_FORMATS = [ReportFormat.JSON, ReportFormat.HTML, ReportFormat.MARKDOWN]
def __init__(self):
    """Create the pipeline with empty state.

    No SET cases are loaded and every optional field starts as ``None``.
    """
    # SET cases held by the pipeline; starts empty.
    self.set_cases: List[LanguageModelSETCase] = []

    # Configuration paths and target model name; run() stores these for the report.
    self.set_config_path: Optional[str] = None
    self.connector_config_path: Optional[str] = None
    self.target_model_name: Optional[str] = None
    self.target_model_max_tokens: Optional[int] = None

    # Evaluation model details; run() releases evaluation_model when it finishes.
    self.evaluation_model: Optional[EvaluationLanguageModel] = None
    self.evaluation_model_name: Optional[str] = None
    self.evaluation_model_max_tokens: Optional[int] = None

    # Timestamps; not assigned anywhere in this base class besides here.
    self.start_time: Optional[datetime] = None
    self.end_time: Optional[datetime] = None
[docs]
@abstractmethod
def initialize(self, set_config_path: str) -> List[LanguageModelSETCase]:
    """Phase 1: build the SET cases for a run from a configuration file.

    Abstract: concrete SETs must override this method. The base
    implementation is an empty placeholder and returns ``None``.

    Args:
        set_config_path: Path to the SET configuration file.

    Returns:
        List[LanguageModelSETCase]: The SET cases used in the run.

    Requirements:
        - Every SET case must contain at least an ID and a prompt.
        - Any additional SET-related data goes into the case metadata.
    """
[docs]
@abstractmethod
def execute(
    self, connector: BaseLMConnector, sets: List[LanguageModelSETCase]
) -> OutputData:
    """Phase 2: run the SET cases against the target model.

    Abstract: concrete SETs must override this method. The base
    implementation is an empty placeholder and returns ``None``.

    Args:
        connector: A connector instance used to reach the target.
        sets: The SET cases produced by initialize().

    Returns:
        OutputData: All SET outputs together with the execution time.

    Requirements:
        - Exactly one ExecutionOutput must be produced per LanguageModelSETCase.
        - Metadata from each LanguageModelSETCase should be carried through
          so it is available for the final report.
        - Errors should be stored in ExecutionOutput.error for later inspection.
    """
[docs]
@abstractmethod
def evaluate(self, execution_data: OutputData) -> List[EvaluationResult]:
    """Phase 3: judge the SET outputs with evaluators.

    Abstract: concrete SETs must override this method. The base
    implementation is an empty placeholder and returns ``None``.

    Args:
        execution_data: The OutputData returned by execute().

    Returns:
        List[EvaluationResult]: One evaluation per SET.

    Requirements:
        - Exactly one EvaluationResult must be produced per ExecutionOutput.
        - Status must be "passed", "failed", or "error".
          TODO: decide whether further statuses are needed.
        - Reason should explain the status: why did the SET pass, fail,
          or cause an error?
    """
[docs]
@abstractmethod
def report(
    self,
    results: List[EvaluationResult],
    output_path: str,
    report_format: ReportFormat = ReportFormat.JSON,
    generate_ai_summary: bool = True,
) -> ReportData:
    """Phase 4: produce the final report and save it to the target location.

    Abstract: concrete SETs must override this method. The base
    implementation is an empty placeholder and returns ``None``.

    Args:
        results: The List[EvaluationResult] returned by evaluate().
        output_path: Path for the output file (../user/reports/..).
        report_format: Desired report format; defaults to ReportFormat.JSON.
            The class-level SUPPORTED_FORMATS list names JSON, HTML and
            MARKDOWN.
        generate_ai_summary: Whether to generate an AI summary (optional).

    Returns:
        ReportData: The final report with all the SET data.

    Requirements:
        - A report in the requested format must be written to output_path.
    """
[docs]
def run(
    self,
    connector: BaseLMConnector,
    set_config_path: str,
    output_path: str,
    report_format: ReportFormat = ReportFormat.JSON,
    connector_config_path: Optional[str] = None,
    generate_ai_summary: bool = True,
    runs: int = 1,
) -> ReportData:
    """Drive the 4-phase pipeline: initialize, execute, evaluate, report.

    This method gets called by the execution engine. The SET cases are
    loaded once; execute() and evaluate() are then repeated ``runs`` times
    and the evaluation results of all repetitions are accumulated into a
    single list, which is handed to report() once at the end.

    Args:
        connector: A connector instance; its ``model`` attribute is stored
            as the target model name.
        set_config_path: Path to the SET configuration.
        output_path: Path where the output report is written.
        report_format: Desired output format.
        connector_config_path: Path to model configuration (for report
            metadata).
        generate_ai_summary: Whether to generate an AI summary.
        runs: How many times to run the SET.

    Returns:
        ReportData: The final report with all the SET data.

    Note:
        Whether the pipeline succeeds or raises, a set evaluation model is
        released with ``del_model()`` and the reference is reset to ``None``.
    """
    # Remember config paths and model name so the report phase can use them.
    self.connector_config_path = connector_config_path
    self.set_config_path = set_config_path
    self.target_model_name = connector.model

    try:
        # Phase 1: the SET cases are loaded once and reused by every repetition.
        loaded_sets = self.initialize(set_config_path)
        all_results = []

        for iteration in range(runs):
            logger.info(f"Starting SET run {iteration + 1}/{runs}.")
            # Phase 2 followed by phase 3; results pile up across repetitions.
            run_output = self.execute(connector, loaded_sets)
            all_results.extend(self.evaluate(run_output))
            logger.info(f"SET run {iteration + 1}/{runs} finished.")

        # Phase 4: one report covering every repetition.
        return self.report(
            all_results, output_path, report_format, generate_ai_summary
        )
    finally:
        if self.evaluation_model:
            # Clean evaluation model from memory
            self.evaluation_model.del_model()
            self.evaluation_model = None
[docs]
def generate_ai_summary(
    self,
    results: List[EvaluationResult],
    summary_stats: Dict[str, Any],
    subcategory_runs: Optional[Dict[str, int]] = None,
) -> Optional[Dict[str, Any]]:
    """Generate an AI summary of the security evaluation test results.

    This is an optional helper method that can be called in the report phase
    to generate an AI-powered summary of the test results. Any failure is
    logged and reported to the caller as ``None`` instead of being raised.

    Args:
        results: List of EvaluationResult from evaluate(); each is converted
            with its ``to_dict()`` method before being summarized.
        summary_stats: Summary statistics from calculate_passrates().
        subcategory_runs: Optional dict of subcategory -> number of runs.

    Returns:
        A dict with the keys ``issue_summary``, ``recommended_remediations``
        and ``notes``, or ``None`` when the summary could not be generated.
    """
    try:
        # Imported lazily, so an import failure is handled like any other
        # summary failure (logged, ``None`` returned).
        from avise.reportgen.summarizers.ai_summarizer import AISummarizer

        # getattr keeps this helper usable even if the attribute was never
        # initialised on the instance.
        model_to_use = getattr(self, "evaluation_model", None)
        if model_to_use is not None:
            logger.info("Reusing existing evaluation model for AI summary")
        else:
            logger.info(
                "Creating new model for AI summary (no existing evaluation model)"
            )

        summarizer = AISummarizer(reuse_model=model_to_use)
        try:
            results_dict = [r.to_dict() for r in results]
            ai_summary = summarizer.generate_summary(
                results_dict, summary_stats, subcategory_runs
            )
        finally:
            # Delete model from memory, also when summarization failed.
            # NOTE(review): with a reused evaluation model, run() later calls
            # del_model() on self.evaluation_model as well — confirm that
            # releasing it here first is safe.
            summarizer.del_model()

        return {
            "issue_summary": ai_summary.issue_summary,
            "recommended_remediations": ai_summary.recommended_remediations,
            "notes": ai_summary.notes,
        }
    except Exception as e:
        logger.error(f"Failed to generate AI summary: {e}")
        return None
[docs]
@staticmethod
def calculate_passrates(results: List[EvaluationResult]) -> Dict[str, Any]:
    """Summarize evaluation results into counts, rates and a confidence interval.

    Helper for the report phase. Can be overwritten.

    Args:
        results: Evaluation results; only each result's ``status`` is read.

    Returns:
        Dict with the keys:
            - ``total_set_cases``: number of results.
            - ``passed`` / ``failed``: results whose status is "passed" /
              "failed".
            - ``error``: every other result (status "error" or anything
              unrecognized).
            - ``pass_rate`` / ``fail_rate``: share of all results in
              percent, rounded to one decimal; ``0`` when there are no
              results.
            - ``ci_lower_bound`` / ``ci_upper_bound``: bounds returned by
              ``_calculate_confidence_interval(passed, failed)``; error
              results are not part of that calculation.
    """
    total_set_cases = len(results)
    passed = sum(1 for outcome in results if outcome.status == "passed")
    failed = sum(1 for outcome in results if outcome.status == "failed")
    # Whatever is neither passed nor failed is counted as an error.
    errors = total_set_cases - passed - failed

    pass_rate = 0
    fail_rate = 0
    if total_set_cases > 0:
        pass_rate = round(passed / total_set_cases * 100, 1)
        fail_rate = round(failed / total_set_cases * 100, 1)

    _, ci_lower, ci_upper = BaseSETPipeline._calculate_confidence_interval(
        passed, failed
    )

    return {
        "total_set_cases": total_set_cases,
        "passed": passed,
        "failed": failed,
        "error": errors,
        "pass_rate": pass_rate,
        "fail_rate": fail_rate,
        "ci_lower_bound": ci_lower,
        "ci_upper_bound": ci_upper,
    }
[docs]
@staticmethod
def calculate_subcategory_runs(
    results: List[EvaluationResult],
    subcategory_field: str = "vulnerability_subcategory",
) -> Dict[str, int]:
    """Count how many results belong to each vulnerability subcategory.

    Args:
        results: List of evaluation results; each result's ``metadata``
            mapping is read.
        subcategory_field: Metadata key holding the subcategory
            (default: vulnerability_subcategory).

    Returns:
        Dict mapping subcategory name to number of runs. Results whose
        metadata lacks the field are counted under "Unknown".
    """
    counts: Dict[str, int] = {}
    for outcome in results:
        label = outcome.metadata.get(subcategory_field, "Unknown")
        if label in counts:
            counts[label] += 1
        else:
            counts[label] = 1
    return counts
@staticmethod
def _calculate_confidence_interval(
    passed: int, failed: int, confidence_level: float = 0.95
) -> tuple[float, float, float]:
    """Calculate confidence interval for binary data using Wilson score interval.

    Only passed and failed runs take part in the calculation; the sample
    size is ``passed + failed``.

    Arguments:
        passed (int): Number of runs passed. Must not be negative.
        failed (int): Number of runs failed. Must not be negative.
        confidence_level (float): Two-sided CI level within [0, 1]
            (default 0.95).

    Returns:
        tuple: (proportion, lower_bound, upper_bound) as floats, with the
        bounds clamped to [0, 1]. All three values are 0.0 when there are
        no passed or failed runs.

    Raises:
        ValueError: If a count is negative or confidence_level lies
            outside [0, 1].
    """
    if passed < 0 or failed < 0:
        raise ValueError("passed and failed must be non-negative")
    # Written so that NaN is rejected too (every comparison with NaN is False).
    if not 0 <= confidence_level <= 1:
        raise ValueError("confidence_level must be between 0 and 1")

    n = passed + failed
    if n == 0:
        return (0.0, 0.0, 0.0)

    # Sample proportion
    p = passed / n

    # A 100% interval needs an infinite z-score; its bounds are the whole
    # [0, 1] range, so return that directly instead of computing with inf.
    if confidence_level == 1:
        return (p, 0.0, 1.0)

    # Z-score for confidence level (1.96 for 95% CI). The default level is
    # matched with a tolerance rather than exact float equality.
    if abs(confidence_level - 0.95) < 1e-12:
        z = 1.96
    else:
        z = float(sqrt(2) * erfinv(confidence_level))

    # Wilson score interval
    z_squared = z**2
    denominator = 1 + z_squared / n
    center = (p + z_squared / (2 * n)) / denominator
    margin = (z / denominator) * sqrt(
        (p * (1 - p) / n) + (z_squared / (4 * n**2))
    )

    # Ensure bounds are within [0, 1]; float literals keep the result type
    # consistent even when clamping applies.
    lower_bound = max(0.0, center - margin)
    upper_bound = min(1.0, center + margin)
    return (p, lower_bound, upper_bound)