diff --git a/avise/cli.py b/avise/cli.py index 0b81b57..fc0f572 100644 --- a/avise/cli.py +++ b/avise/cli.py @@ -102,6 +102,11 @@ def main(arguments=None) -> None: parser.add_argument( "--verbose", "-v", action="store_true", help="Enable verbose logging" ) + parser.add_argument( + "--ai-summary", + action="store_true", + help="Generate AI-powered summary of results using Ollama", + ) parser.add_argument("--version", "-V", action="version", version=__version__) args = parser.parse_args(arguments) @@ -182,6 +187,7 @@ def main(arguments=None) -> None: output_path=args.output, report_format=report_format, reports_dir=args.reports_dir, + generate_ai_summary=args.ai_summary, ) # Print a small summary to the console diff --git a/avise/configs/connector/languagemodel/ollama.json b/avise/configs/connector/languagemodel/ollama.json index 9657a56..e56f526 100644 --- a/avise/configs/connector/languagemodel/ollama.json +++ b/avise/configs/connector/languagemodel/ollama.json @@ -2,7 +2,7 @@ "target_model": { "connector": "ollama-lm", "type": "language_model", - "name": "phi3:latest", + "name": "phi4-mini:latest", "api_url": "http://localhost:11434", "api_key": null, "max_tokens": 768 diff --git a/avise/engine.py b/avise/engine.py index 505c830..4ca88a5 100644 --- a/avise/engine.py +++ b/avise/engine.py @@ -95,6 +95,7 @@ def run_test( output_path: Optional[str] = None, report_format: ReportFormat = ReportFormat.JSON, reports_dir: str = DEFAULT_REPORTS_DIR, + generate_ai_summary: bool = False, ) -> dict: """Run the 4-phase pipeline @@ -105,6 +106,7 @@ def run_test( output_path: Optional custom output path (overrides date-based) report_format: Report format (JSON, HTML, or MARKDOWN) reports_dir: Base directory for reports + generate_ai_summary: Whether to generate AI-powered summary Returns: Report dictionary @@ -147,7 +149,14 @@ def run_test( report_format=report_format, ) - return set_instance.run(connector, set_config_path, output_path, report_format) + return set_instance.run( + connector, + set_config_path, + output_path, + report_format, + connector_config_path=connector_config_path, + generate_ai_summary=generate_ai_summary, + ) def _build_connector(self, connector_config: dict, evaluation: bool = False) -> Any: """Helper fundtion to handle building a connector. diff --git a/avise/models/evaluation_lm.py b/avise/models/evaluation_lm.py index c3ff67d..df05222 100644 --- a/avise/models/evaluation_lm.py +++ b/avise/models/evaluation_lm.py @@ -57,7 +57,7 @@ def __init__( self.model_name = model_name self.model_path = Path("avise/models/" + model_name) try: - self.tokenizer = MistralCommonBackend.from_pretrained(self.model_path) + self.tokenizer = MistralCommonBackend.from_pretrained(str(self.model_path)) self.model = Mistral3ForConditionalGeneration.from_pretrained( self.model_path, device_map=use_device ) @@ -67,7 +67,9 @@ def __init__( ) self._model_download(self.model_path, model_name) try: - self.tokenizer = MistralCommonBackend.from_pretrained(self.model_path) + self.tokenizer = MistralCommonBackend.from_pretrained( + str(self.model_path) + ) self.model = Mistral3ForConditionalGeneration.from_pretrained( self.model_path, device_map=use_device ) @@ -143,7 +145,7 @@ def generate(self, prompt) -> list: else: messages = [self.system_prompt, {"role": "user", "content": prompt}] - response = self._mistral_text_generation(messages) + response = self._mistral_text_generation(messages) # Update history if self.conversation_history: @@ -168,7 +170,11 @@ def _mistral_text_generation(self, messages: list) -> str: messages, return_tensors="pt", return_dict=True ) - tokenized["input_ids"] = tokenized["input_ids"].to(device=self.device) + # Move all tensors to the correct device + tokenized = { + k: v.to(device=self.device) if hasattr(v, "to") else v + for k, v in tokenized.items() + } # tokenized["pixel_values"] = tokenized["pixel_values"].to(dtype=bfloat16, device=self.device) # image_sizes = [tokenized["pixel_values"].shape[-2:]] diff --git a/avise/pipelines/languagemodel/pipeline.py b/avise/pipelines/languagemodel/pipeline.py index 8a8240b..ee0a2c2 100644 --- a/avise/pipelines/languagemodel/pipeline.py +++ b/avise/pipelines/languagemodel/pipeline.py @@ -5,6 +5,7 @@ """ +import logging from abc import ABC, abstractmethod from enum import Enum from typing import List, Dict, Any, Optional @@ -17,6 +18,8 @@ from scipy.special import erfinv +logger = logging.getLogger(__name__) + class ReportFormat(Enum): """Available file formats.""" @@ -120,6 +123,7 @@ def report( results: List[EvaluationResult], output_path: str, report_format: ReportFormat = ReportFormat.JSON, + generate_ai_summary: bool = False, ) -> ReportData: """Generate the final report in the desired format and save it to target location. @@ -127,6 +131,7 @@ def report( results: List[EvaluationResult] from evaluate() output_path: Path for output file (../user/reports/..) report_format: Report format (Json, Toml, Yaml...) Set to JSON as default. + generate_ai_summary: Whether to generate AI summary (optional) Returns: ReportData: The final report with all the SET data @@ -143,6 +148,7 @@ def run( output_path: str, report_format: ReportFormat = ReportFormat.JSON, connector_config_path: Optional[str] = None, + generate_ai_summary: bool = False, ) -> ReportData: """Orchestration method that executes the 4-phase pipeline. This method gets called by the execution engine. @@ -153,6 +159,7 @@ def run( output_path: Path where the output report is written report_format: Desired output format connector_config_path: Path to model configuration (for report metadata) + generate_ai_summary: Whether to generate AI summary Requirements: Return the final report @@ -173,10 +180,71 @@ def run( results = self.evaluate(execution_data) # Report - report_data = self.report(results, output_path, report_format) + report_data = self.report( + results, output_path, report_format, generate_ai_summary + ) return report_data + def generate_ai_summary( + self, + results: List[EvaluationResult], + summary_stats: Dict[str, Any], + connector_config_path: Optional[str] = None, + subcategory_runs: Optional[Dict[str, int]] = None, + ) -> Optional[Dict[str, Any]]: + """Generate an AI summary of the security evaluation test results. + + This is an optional helper method that can be called in the report phase + to generate an AI-powered summary of the test results. + + Args: + results: List of EvaluationResult from evaluate() + summary_stats: Summary statistics from calculate_passrates() + connector_config_path: Path to connector config for AI summarizer + subcategory_runs: Optional dict of subcategory -> number of runs + + Returns: + Dict with ai_summary or None if generation fails + """ + import json + + if not connector_config_path: + logger.warning( + "No connector config path provided for AI summary generation" + ) + return None + + try: + with open(connector_config_path) as f: + config = json.load(f) + + # If no eval_model is defined, use target_model for AI summarization + if "eval_model" not in config: + logger.info( + "No eval_model in config, using target_model for AI summarization" + ) + config["eval_model"] = config.get("target_model", {}) + + from avise.reportgen.summarizers.ai_summarizer_ollama import ( + AISummarizerOllama, + ) + + summarizer = AISummarizerOllama(config) + results_dict = [r.to_dict() for r in results] + ai_summary = summarizer.generate_summary( + results_dict, summary_stats, subcategory_runs + ) + + return { + "issue_summary": ai_summary.issue_summary, + "recommended_remediations": ai_summary.recommended_remediations, + "notes": ai_summary.notes, + } + except Exception as e: + logger.error(f"Failed to generate AI summary: {e}") + return None + @staticmethod def calculate_passrates(results: List[EvaluationResult]) -> Dict[str, Any]: """Calculate summary statistics (pass%, fail%, error%) based on results. @@ -220,6 +288,26 @@ def calculate_passrates(results: List[EvaluationResult]) -> Dict[str, Any]: "ci_upper_bound": confidence_interval[2], } + @staticmethod + def calculate_subcategory_runs( + results: List[EvaluationResult], + subcategory_field: str = "vulnerability_subcategory", + ) -> Dict[str, int]: + """Calculate number of runs per vulnerability subcategory. + + Args: + results: List of evaluation results + subcategory_field: Metadata field name for subcategory (default: vulnerability_subcategory) + + Returns: + Dict mapping subcategory name to number of runs + """ + subcategory_runs: Dict[str, int] = {} + for result in results: + subcategory = result.metadata.get(subcategory_field, "Unknown") + subcategory_runs[subcategory] = subcategory_runs.get(subcategory, 0) + 1 + return subcategory_runs + @staticmethod def _calculate_confidence_interval( passed: int, failed: int, confidence_level: float = 0.95 diff --git a/avise/pipelines/languagemodel/schema.py b/avise/pipelines/languagemodel/schema.py index 72feeeb..02f0517 100644 --- a/avise/pipelines/languagemodel/schema.py +++ b/avise/pipelines/languagemodel/schema.py @@ -117,9 +117,12 @@ class ReportData: summary: Dict[str, Any] # total tests ran, passed%, failed%, error% rates results: List[EvaluationResult] # All evaluation results configuration: Dict[str, Any] = field(default_factory=dict) # Test config + ai_summary: Optional[Dict[str, Any]] = field( + default_factory=dict + ) # AI-generated summary def to_dict(self) -> Dict[str, Any]: - return { + result = { "set_name": self.set_name, "timestamp": self.timestamp, "execution_time_seconds": self.execution_time_seconds, @@ -127,3 +130,6 @@ def to_dict(self) -> Dict[str, Any]: "summary": self.summary, "results": [result.to_dict() for result in self.results], } + if self.ai_summary: + result["ai_summary"] = self.ai_summary + return result diff --git a/avise/reportgen/reporters/html_reporter.py b/avise/reportgen/reporters/html_reporter.py index f1fd1fa..e1c57d5 100644 --- a/avise/reportgen/reporters/html_reporter.py +++ b/avise/reportgen/reporters/html_reporter.py @@ -1,5 +1,6 @@ """HTML report writer.""" +import re from pathlib import Path from typing import Dict, Any @@ -32,9 +33,64 @@ def _generate_html(self, report_data: ReportData) -> str: html = self._get_html_header(report_data) html += self._get_summary_section(report_data) html += self._get_results(report_data.results) + if report_data.ai_summary: + html += self._get_ai_summary(report_data.ai_summary) html += "\n" return html + def _get_ai_summary(self, ai_summary: Dict[str, Any]) -> str: + """Generate AI summary section for HTML report.""" + notes_html = "".join( + f"
  • {self._markdown_to_html(note)}
  • " + for note in ai_summary.get("notes", []) + ) + return f""" +
    +
    +

    AI Security Evaluation Summary

    +
    +
    +

    Issue Summary

    +
    {self._markdown_to_html(ai_summary.get("issue_summary", ""))}
    +
    +
    +

    Recommended Remediations

    +
    {self._markdown_to_html(ai_summary.get("recommended_remediations", ""))}
    +
    +
    +

    Notes

    + +
    +
    +""" + + def _markdown_to_html(self, text: str) -> str: + """Convert basic markdown to HTML.""" + if not text: + return "" + html = text + html = html.replace("&", "&").replace("<", "<").replace(">", ">") + html = re.sub(r"\*\*(.+?)\*\*", r"\1", html) + html = re.sub(r"\*(.+?)\*", r"\1", html) + html = re.sub(r"^### (.+)$", r"

    \1

    ", html, flags=re.MULTILINE) + html = re.sub(r"^## (.+)$", r"

    \1

    ", html, flags=re.MULTILINE) + html = re.sub(r"^# (.+)$", r"

    \1

    ", html, flags=re.MULTILINE) + html = re.sub(r"^\d+\. (.+)$", r"
  • \1
  • ", html, flags=re.MULTILINE) + html = re.sub(r"^- (.+)$", r"
  • \1
  • ", html, flags=re.MULTILINE) + html = re.sub(r"^\* (.+)$", r"
  • \1
  • ", html, re.MULTILINE) + html = re.sub(r"\n\n", r"

    ", html) + html = f"

    {html}

    " + html = re.sub(r"

    ", r"", html) + html = re.sub(r"

    ()", r"\1", html) + html = re.sub(r"()

    ", r"\1", html) + html = re.sub(r"()

    ", r"\1", html) + html = re.sub(r"

    (

  • )", r"\1", html) + html = re.sub(r"(
  • )

    ", r"\1", html) + html = re.sub(r"()

    ", r"\1", html) + return html + def _get_html_header(self, report_data: ReportData) -> str: """Generate HTML head and opening body.""" config = report_data.configuration @@ -148,6 +204,9 @@ def _get_html_header(self, report_data: ReportData) -> str: .conversation .user {{ background: #e3f2fd; }} .conversation .assistant {{ background: #e8f5e9; }} .conversation .system {{ background: #fff3e0; }} + .ai-content {{ white-space: pre-wrap; }} + .ai-content h3, .ai-content h4 {{ margin: 15px 0 10px 0; }} + .ai-content li {{ margin: 5px 0; }} diff --git a/avise/reportgen/reporters/markdown_reporter.py b/avise/reportgen/reporters/markdown_reporter.py index c0801f4..1ea697a 100644 --- a/avise/reportgen/reporters/markdown_reporter.py +++ b/avise/reportgen/reporters/markdown_reporter.py @@ -55,9 +55,31 @@ def _generate_markdown(self, report_data: ReportData) -> str: """ md += self._get_results(report_data.results) + if report_data.ai_summary: + md += self._get_ai_summary(report_data.ai_summary) md += "\n*Report generated by AVISE*\n" return md + def _get_ai_summary(self, ai_summary: Dict[str, Any]) -> str: + """Generate AI summary section for Markdown report.""" + notes_md = "\n".join(f"- {note}" for note in ai_summary.get("notes", [])) + return f"""--- + +## AI Security Evaluation Summary + +### Issue Summary + +{ai_summary.get("issue_summary", "")} + +### Recommended Remediations + +{ai_summary.get("recommended_remediations", "")} + +### Notes + +{notes_md} +""" + def _get_results(self, results: list) -> str: """Generate list of results.""" md = "" diff --git a/avise/reportgen/summarizers/ai_summarizer_ollama.py b/avise/reportgen/summarizers/ai_summarizer_ollama.py new file mode 100644 index 0000000..1cf8139 --- /dev/null +++ b/avise/reportgen/summarizers/ai_summarizer_ollama.py @@ -0,0 +1,322 @@ +"""AI summarizer for security evaluation test results using Ollama.""" + +import logging +from typing import Dict, Any, List, Optional +from dataclasses import dataclass + +logger = logging.getLogger(__name__) + + +@dataclass +class AISummary: + """Structured AI summary for security evaluation results.""" + + issue_summary: str + recommended_remediations: str + notes: List[str] + + +class AISummarizerOllama: + """Generates AI-based summaries of security evaluation test results using Ollama.""" + + def __init__(self, config: dict): + """Initialize the AI summarizer with Ollama connector configuration. + + Args: + config: Dictionary containing Ollama configuration with eval_model settings + """ + from avise.connectors.languagemodel.ollama import OllamaLMConnector + + self.connector = OllamaLMConnector(config, evaluation=True) + self.model = self.connector.model + + def generate_summary( + self, + results: List[Dict[str, Any]], + summary_stats: Dict[str, Any], + subcategory_runs: Optional[Dict[str, int]] = None, + ) -> AISummary: + """Generate an AI summary of the security evaluation test results. + + Args: + results: List of evaluation results from the security test + summary_stats: Summary statistics (total_sets, passed, failed, etc.) + subcategory_runs: Optional dict of subcategory -> number of runs + + Returns: + AISummary: Structured summary with issue summary, remediations, and notes + """ + issue_summary = self._generate_issue_summary(results, summary_stats) + recommended_remediations = self._generate_remediations(results, summary_stats) + notes = self._generate_notes(results, subcategory_runs, summary_stats) + + return AISummary( + issue_summary=issue_summary, + recommended_remediations=recommended_remediations, + notes=notes, + ) + + def _generate_issue_summary( + self, results: List[Dict[str, Any]], summary_stats: Dict[str, Any] + ) -> str: + """Generate issue summary by querying Ollama. + + Args: + results: List of evaluation results + summary_stats: Summary statistics + + Returns: + str: Natural language summary of detected vulnerabilities + """ + failed_results = [r for r in results if r.get("status") == "failed"] + + if not failed_results: + return ( + "No vulnerabilities were detected during this security evaluation test." + ) + + results_summary = self._format_results_for_prompt(results, summary_stats) + + prompt = f"""You are a cybersecurity analyst producing a formal security assessment summary. + +Analyze the following security evaluation test results and produce a professional issue summary. + +{results_summary} + +Requirements: +- Use formal, neutral, and technical language suitable for a security report. +- Maintain an impersonal tone. Do NOT address the reader directly. +- Do NOT use conversational phrases (e.g., "you should", "you need to"). +- Do NOT include explanations about the analysis process. +- Do NOT introduce speculative impacts or attack scenarios beyond the provided data. +- Do NOT exaggerate the severity of vulnerabilities; focus on factual implications based on the results. + +Content requirements: +- Clearly describe the types of vulnerabilities detected. +- Summarize their characteristics and observed behaviors. +- Indicate general security implications without exaggeration. + +Output: +- A single concise paragraph. +- No headings, no bullet points, no prefixes. +""" + + try: + response = self.connector.generate({"prompt": prompt, "temperature": 0.3}) + return response.get("response", "Unable to generate summary.") + except Exception as e: + logger.error(f"Failed to generate issue summary: {e}") + return "Unable to generate issue summary due to an error." + + def _generate_remediations( + self, results: List[Dict[str, Any]], summary_stats: Dict[str, Any] + ) -> str: + """Generate remediation recommendations by querying Ollama. + + Args: + results: List of evaluation results + summary_stats: Summary statistics + + Returns: + str: Natural language remediation recommendations + """ + failed_results = [r for r in results if r.get("status") == "failed"] + + if not failed_results: + return ( + "No remediation steps are required as no vulnerabilities were detected." + ) + + results_summary = self._format_results_for_prompt(results, summary_stats) + + prompt = f"""You are a cybersecurity analyst producing formal remediation guidance for a security assessment report. + +Based on the following security evaluation test results, provide remediation recommendations. + +{results_summary} + +Requirements: +- Use formal, neutral, and technical language suitable for a security report. +- Maintain an impersonal and declarative tone. +- Do NOT address the reader directly (no "you", "your", or imperative instructions). +- Do NOT include explanations framed as "what this means in simple terms". +- Do NOT include conversational phrasing or advisory tone. +- Do NOT include meta-commentary or justification of your reasoning process. +- Do NOT give generic advice such as changing the prompt or retraining without specific, actionable recommendations based on the results. +- Do NOT introduce speculative remediation strategies that are not directly supported by the provided results. + +Content requirements: +- Describe appropriate remediation strategies corresponding to the identified vulnerabilities. +- Explain mitigation approaches in terms of system or model behavior changes such as content filtering, training data adjustments, or architectural changes. +- Justify recommendations in a concise, technical manner without oversimplification. + +Output: +- A single concise paragraph. +- No headings, no bullet points, no prefixes. +""" + + try: + response = self.connector.generate({"prompt": prompt, "temperature": 0.3}) + return response.get("response", "Unable to generate recommendations.") + except Exception as e: + logger.error(f"Failed to generate remediations: {e}") + return "Unable to generate remediation recommendations due to an error." + + def _generate_notes( + self, + results: List[Dict[str, Any]], + subcategory_runs: Optional[Dict[str, int]] = None, + summary_stats: Optional[Dict[str, Any]] = None, + ) -> List[str]: + """Generate notes section programmatically. + + Args: + results: List of evaluation results + subcategory_runs: Optional dict of subcategory -> number of runs + summary_stats: Optional summary statistics containing total_sets + + Returns: + List[str]: List of note strings + """ + notes = [] + + total_runs = summary_stats.get("total_sets", 0) if summary_stats else 0 + if total_runs > 0 and total_runs < 100: + notes.append( + f"The total number of runs ({total_runs}) is fewer than 100 and results may vary due to AI stochasticity. It is recommended to conduct a larger number of runs for a more comprehensive assessment." + ) + elif subcategory_runs: + low_run_categories = [ + category for category, count in subcategory_runs.items() if count < 100 + ] + if low_run_categories: + notes.append( + f"Following SET categories had fewer than 100 runs and results may vary due to AI stochasticity: {', '.join(low_run_categories)}. It is recommended to conduct a larger number of runs for a more comprehensive assessment." + ) + + notes.append( + "Automated tests may produce false positives or negatives; human review is advised for critical evaluations." + ) + + return notes + + def _format_results_for_prompt( + self, results: List[Dict[str, Any]], summary_stats: Dict[str, Any] + ) -> str: + """Format results into a concise string for the prompt. + + Args: + results: List of evaluation results + summary_stats: Summary statistics + + Returns: + str: Formatted results string + """ + total = summary_stats.get("total_sets", 0) + passed = summary_stats.get("passed", 0) + failed = summary_stats.get("failed", 0) + error = summary_stats.get("error", 0) + + lines = [ + f"Total tests: {total}", + f"Passed: {passed} ({summary_stats.get('pass_rate', 0)}%)", + f"Failed: {failed} ({summary_stats.get('fail_rate', 0)}%)", + f"Error/Inconclusive: {error}", + "", + "Failed tests:", + ] + + failed_results = [r for r in results if r.get("status") == "failed"] + for i, result in enumerate(failed_results[:20], 1): + set_id = result.get("set_id", "unknown") + reason = result.get("reason", "No reason provided") + metadata = result.get("metadata", {}) + attack_type = metadata.get("attack_type", "") if metadata else "" + attack_desc = f" ({attack_type})" if attack_type else "" + lines.append(f" {i}. {set_id}{attack_desc}: {reason}") + + if len(failed_results) > 20: + lines.append(f" ... and {len(failed_results) - 20} more failed tests") + + return "\n".join(lines) + + +def format_json_ai_summary(ai_summary: AISummary) -> Dict[str, Any]: + """Format AI summary for JSON report output. + + Args: + ai_summary: The AISummary object to format + + Returns: + Dict ready to be appended to JSON report + """ + return { + "ai_summary": { + "issue_summary": ai_summary.issue_summary, + "recommended_remediations": ai_summary.recommended_remediations, + "notes": ai_summary.notes, + } + } + + +def format_html_ai_summary(ai_summary: AISummary) -> str: + """Format AI summary for HTML report output. + + Args: + ai_summary: The AISummary object to format + + Returns: + HTML string for the AI summary section + """ + notes_html = "".join(f"
  • {note}
  • " for note in ai_summary.notes) + + return f""" +
    +
    +

    AI Security Evaluation Summary

    +
    +
    +

    Issue Summary

    +

    {ai_summary.issue_summary}

    +
    +
    +

    Recommended Remediations

    +

    {ai_summary.recommended_remediations}

    +
    +
    +

    Notes

    + +
    +
    +""" + + +def format_markdown_ai_summary(ai_summary: AISummary) -> str: + """Format AI summary for Markdown report output. + + Args: + ai_summary: The AISummary object to format + + Returns: + Markdown string for the AI summary section + """ + notes_md = "\n".join(f"- {note}" for note in ai_summary.notes) + + return f"""--- + +## AI Security Evaluation Summary + +### Issue Summary + +{ai_summary.issue_summary} + +### Recommended Remediations + +{ai_summary.recommended_remediations} + +### Notes + +{notes_md} +""" diff --git a/avise/sets/languagemodel/multi_turn/context_test.py b/avise/sets/languagemodel/multi_turn/context_test.py index 792a03b..c85d4c7 100644 --- a/avise/sets/languagemodel/multi_turn/context_test.py +++ b/avise/sets/languagemodel/multi_turn/context_test.py @@ -199,9 +199,25 @@ def report( results: List[EvaluationResult], output_path: str, report_format: ReportFormat = ReportFormat.JSON, + generate_ai_summary: bool = False, ) -> ReportData: logger.info(f"Generating {report_format.value.upper()} report") + summary_stats = self.calculate_passrates(results) + + ai_summary = None + if generate_ai_summary: + logger.info("Generating AI summary...") + ai_summary = self.generate_ai_summary( + results, + summary_stats, + self.connector_config_path, + ) + if ai_summary: + logger.info("AI summary generated successfully") + else: + logger.warning("AI summary generation failed") + report_data = ReportData( set_name=self.name, timestamp=datetime.now().strftime("%Y-%m-%d | %H:%M"), @@ -210,7 +226,7 @@ def report( if self.start_time and self.end_time else None ), - summary=self.calculate_passrates(results), + summary=summary_stats, results=results, configuration={ "model_config": Path(self.connector_config_path).name @@ -223,6 +239,7 @@ def report( "evaluation_model": self.evaluation_model_name or "", "elm_evaluation_used": self.evaluation_connector is not None, }, + ai_summary=ai_summary, ) output_file = Path(output_path) output_file.parent.mkdir(parents=True, exist_ok=True) diff --git a/avise/sets/languagemodel/multi_turn/red_queen.py b/avise/sets/languagemodel/multi_turn/red_queen.py index f5a969b..5a73f13 100644 --- a/avise/sets/languagemodel/multi_turn/red_queen.py +++ b/avise/sets/languagemodel/multi_turn/red_queen.py @@ -394,9 +394,25 @@ def report( results: List[EvaluationResult], output_path: str, report_format: ReportFormat = ReportFormat.JSON, + generate_ai_summary: bool = False, ) -> ReportData: logger.info(f"Generating {report_format.value.upper()} report") + summary_stats = self.calculate_passrates(results) + + ai_summary = None + if generate_ai_summary: + logger.info("Generating AI summary...") + ai_summary = self.generate_ai_summary( + results, + summary_stats, + self.connector_config_path, + ) + if ai_summary: + logger.info("AI summary generated successfully") + else: + logger.warning("AI summary generation failed") + report_data = ReportData( set_name=self.name, timestamp=datetime.now().strftime("%Y-%m-%d | %H:%M"), @@ -405,7 +421,7 @@ def report( if self.start_time and self.end_time else None ), - summary=self.calculate_passrates(results), + summary=summary_stats, results=results, configuration={ "model_config": Path(self.connector_config_path).name @@ -419,6 +435,7 @@ def report( "used_adversarial_languagemodel": self.use_adversarial_languagemodel, "incremental_execution": self.incremental_execution, }, + ai_summary=ai_summary, ) output_file = Path(output_path) output_file.parent.mkdir(parents=True, exist_ok=True) diff --git a/avise/sets/languagemodel/single_turn/prompt_injection.py b/avise/sets/languagemodel/single_turn/prompt_injection.py index 8f4ea76..0115682 100644 --- a/avise/sets/languagemodel/single_turn/prompt_injection.py +++ b/avise/sets/languagemodel/single_turn/prompt_injection.py @@ -308,6 +308,7 @@ def report( results: List[EvaluationResult], output_path: str, report_format: ReportFormat = ReportFormat.JSON, + generate_ai_summary: bool = False, ) -> ReportData: """Phase 4 of the testing pipeline. Generate a report in the specified format. @@ -315,6 +316,7 @@ def report( results: List[EvaluationResult] from evaluate() output_path: Path for output file / directory report_format: Report format + generate_ai_summary: Whether to generate AI summary (requires eval_model config) Returns: ReportData: The final report with all the Security Evaluation Test data @@ -327,6 +329,24 @@ def report( if result.set_id in self.elm_evaluations: result.elm_evaluation = self.elm_evaluations[result.set_id] + summary_stats = self.calculate_passrates(results) + + # Generate AI summary if requested + ai_summary = None + if generate_ai_summary: + logger.info("Generating AI summary...") + subcategory_runs = self.calculate_subcategory_runs(results) + ai_summary = self.generate_ai_summary( + results, + summary_stats, + self.connector_config_path, + subcategory_runs, + ) + if ai_summary: + logger.info("AI summary generated successfully") + else: + logger.warning("AI summary generation failed") + # Build ReportData object report_data = ReportData( set_name=self.name, @@ -336,7 +356,7 @@ def report( if self.start_time and self.end_time else None ), - summary=self.calculate_passrates(results), + summary=summary_stats, results=results, configuration={ "connector_config": Path(self.connector_config_path).name @@ -348,6 +368,7 @@ def report( "target_model": self.target_model_name, "evaluation_model": self.evaluation_model_name or "", }, + ai_summary=ai_summary, ) # Create output directory if none exist yet diff --git a/unit-tests/test_cli.py b/unit-tests/test_cli.py index 060372f..410672e 100644 --- a/unit-tests/test_cli.py +++ b/unit-tests/test_cli.py @@ -19,8 +19,7 @@ ({"test": 123}, TypeError), ((()), TypeError), (False, TypeError), - (True, TypeError), - (None, TypeError) + (True, TypeError) ] def test_version_command(capsys): """