Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions avise/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,11 @@ def main(arguments=None) -> None:
parser.add_argument(
"--verbose", "-v", action="store_true", help="Enable verbose logging"
)
parser.add_argument(
"--ai-summary",
action="store_true",
help="Generate AI-powered summary of results using Ollama",
)
parser.add_argument("--version", "-V", action="version", version=__version__)
args = parser.parse_args(arguments)

Expand Down Expand Up @@ -182,6 +187,7 @@ def main(arguments=None) -> None:
output_path=args.output,
report_format=report_format,
reports_dir=args.reports_dir,
generate_ai_summary=args.ai_summary,
)

# Print a small summary to the console
Expand Down
2 changes: 1 addition & 1 deletion avise/configs/connector/languagemodel/ollama.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"target_model": {
"connector": "ollama-lm",
"type": "language_model",
"name": "phi3:latest",
"name": "phi4-mini:latest",
"api_url": "http://localhost:11434",
"api_key": null,
"max_tokens": 768
Expand Down
11 changes: 10 additions & 1 deletion avise/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ def run_test(
output_path: Optional[str] = None,
report_format: ReportFormat = ReportFormat.JSON,
reports_dir: str = DEFAULT_REPORTS_DIR,
generate_ai_summary: bool = False,
) -> dict:
"""Run the 4-phase pipeline

Expand All @@ -105,6 +106,7 @@ def run_test(
output_path: Optional custom output path (overrides date-based)
report_format: Report format (JSON, HTML, or MARKDOWN)
reports_dir: Base directory for reports
generate_ai_summary: Whether to generate AI-powered summary

Returns:
Report dictionary
Expand Down Expand Up @@ -147,7 +149,14 @@ def run_test(
report_format=report_format,
)

return set_instance.run(connector, set_config_path, output_path, report_format)
return set_instance.run(
connector,
set_config_path,
output_path,
report_format,
connector_config_path=connector_config_path,
generate_ai_summary=generate_ai_summary,
)

def _build_connector(self, connector_config: dict, evaluation: bool = False) -> Any:
"""Helper fundtion to handle building a connector.
Expand Down
14 changes: 10 additions & 4 deletions avise/models/evaluation_lm.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ def __init__(
self.model_name = model_name
self.model_path = Path("avise/models/" + model_name)
try:
self.tokenizer = MistralCommonBackend.from_pretrained(self.model_path)
self.tokenizer = MistralCommonBackend.from_pretrained(str(self.model_path))
self.model = Mistral3ForConditionalGeneration.from_pretrained(
self.model_path, device_map=use_device
)
Expand All @@ -67,7 +67,9 @@ def __init__(
)
self._model_download(self.model_path, model_name)
try:
self.tokenizer = MistralCommonBackend.from_pretrained(self.model_path)
self.tokenizer = MistralCommonBackend.from_pretrained(
str(self.model_path)
)
self.model = Mistral3ForConditionalGeneration.from_pretrained(
self.model_path, device_map=use_device
)
Expand Down Expand Up @@ -143,7 +145,7 @@ def generate(self, prompt) -> list:
else:
messages = [self.system_prompt, {"role": "user", "content": prompt}]

response = self._mistral_text_generation(messages)
response = self._mistral_text_generation(messages)

# Update history
if self.conversation_history:
Expand All @@ -168,7 +170,11 @@ def _mistral_text_generation(self, messages: list) -> str:
messages, return_tensors="pt", return_dict=True
)

tokenized["input_ids"] = tokenized["input_ids"].to(device=self.device)
# Move all tensors to the correct device
tokenized = {
k: v.to(device=self.device) if hasattr(v, "to") else v
for k, v in tokenized.items()
}
# tokenized["pixel_values"] = tokenized["pixel_values"].to(dtype=bfloat16, device=self.device)
# image_sizes = [tokenized["pixel_values"].shape[-2:]]

Expand Down
90 changes: 89 additions & 1 deletion avise/pipelines/languagemodel/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

"""

import logging
from abc import ABC, abstractmethod
from enum import Enum
from typing import List, Dict, Any, Optional
Expand All @@ -17,6 +18,8 @@

from scipy.special import erfinv

logger = logging.getLogger(__name__)


class ReportFormat(Enum):
"""Available file formats."""
Expand Down Expand Up @@ -120,13 +123,15 @@ def report(
results: List[EvaluationResult],
output_path: str,
report_format: ReportFormat = ReportFormat.JSON,
generate_ai_summary: bool = False,
) -> ReportData:
"""Generate the final report in the desired format and save it to target location.

Args:
results: List[EvaluationResult] from evaluate()
output_path: Path for output file (../user/reports/..)
report_format: Report format (Json, Toml, Yaml...) Set to JSON as default.
generate_ai_summary: Whether to generate AI summary (optional)

Returns:
ReportData: The final report with all the SET data
Expand All @@ -143,6 +148,7 @@ def run(
output_path: str,
report_format: ReportFormat = ReportFormat.JSON,
connector_config_path: Optional[str] = None,
generate_ai_summary: bool = False,
) -> ReportData:
"""Orchestration method that executes the 4-phase pipeline.
This method gets called by the execution engine.
Expand All @@ -153,6 +159,7 @@ def run(
output_path: Path where the output report is written
report_format: Desired output format
connector_config_path: Path to model configuration (for report metadata)
generate_ai_summary: Whether to generate AI summary

Requirements:
Return the final report
Expand All @@ -173,10 +180,71 @@ def run(
results = self.evaluate(execution_data)

# Report
report_data = self.report(results, output_path, report_format)
report_data = self.report(
results, output_path, report_format, generate_ai_summary
)

return report_data

def generate_ai_summary(
    self,
    results: List[EvaluationResult],
    summary_stats: Dict[str, Any],
    connector_config_path: Optional[str] = None,
    subcategory_runs: Optional[Dict[str, int]] = None,
) -> Optional[Dict[str, Any]]:
    """Produce an AI-written summary of the security evaluation results.

    Optional report-phase helper: loads the connector configuration,
    hands the serialized results to the Ollama-backed summarizer, and
    packages its output. Any failure is logged and swallowed so report
    generation never breaks because of the summary.

    Args:
        results: Evaluation results produced by evaluate().
        summary_stats: Pass-rate statistics from calculate_passrates().
        connector_config_path: Connector config used to build the summarizer.
        subcategory_runs: Optional mapping of subcategory -> run count.

    Returns:
        Dict with issue_summary / recommended_remediations / notes, or
        None when no config path was given or generation failed.
    """
    import json

    if not connector_config_path:
        logger.warning(
            "No connector config path provided for AI summary generation"
        )
        return None

    try:
        with open(connector_config_path) as config_file:
            connector_cfg = json.load(config_file)

        # Fall back to the target model when no dedicated eval model exists.
        if "eval_model" not in connector_cfg:
            logger.info(
                "No eval_model in config, using target_model for AI summarization"
            )
            connector_cfg["eval_model"] = connector_cfg.get("target_model", {})

        from avise.reportgen.summarizers.ai_summarizer_ollama import (
            AISummarizerOllama,
        )

        serialized_results = [entry.to_dict() for entry in results]
        summary = AISummarizerOllama(connector_cfg).generate_summary(
            serialized_results, summary_stats, subcategory_runs
        )

        return {
            "issue_summary": summary.issue_summary,
            "recommended_remediations": summary.recommended_remediations,
            "notes": summary.notes,
        }
    except Exception as e:
        logger.error(f"Failed to generate AI summary: {e}")
        return None

@staticmethod
def calculate_passrates(results: List[EvaluationResult]) -> Dict[str, Any]:
"""Calculate summary statistics (pass%, fail%, error%) based on results.
Expand Down Expand Up @@ -220,6 +288,26 @@ def calculate_passrates(results: List[EvaluationResult]) -> Dict[str, Any]:
"ci_upper_bound": confidence_interval[2],
}

@staticmethod
def calculate_subcategory_runs(
results: List[EvaluationResult],
subcategory_field: str = "vulnerability_subcategory",
) -> Dict[str, int]:
"""Calculate number of runs per vulnerability subcategory.

Args:
results: List of evaluation results
subcategory_field: Metadata field name for subcategory (default: vulnerability_subcategory)

Returns:
Dict mapping subcategory name to number of runs
"""
subcategory_runs: Dict[str, int] = {}
for result in results:
subcategory = result.metadata.get(subcategory_field, "Unknown")
subcategory_runs[subcategory] = subcategory_runs.get(subcategory, 0) + 1
return subcategory_runs

@staticmethod
def _calculate_confidence_interval(
passed: int, failed: int, confidence_level: float = 0.95
Expand Down
8 changes: 7 additions & 1 deletion avise/pipelines/languagemodel/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,13 +117,19 @@ class ReportData:
summary: Dict[str, Any] # total tests ran, passed%, failed%, error% rates
results: List[EvaluationResult] # All evaluation results
configuration: Dict[str, Any] = field(default_factory=dict) # Test config
ai_summary: Optional[Dict[str, Any]] = field(
default_factory=dict
) # AI-generated summary

def to_dict(self) -> Dict[str, Any]:
    """Serialize this report into a JSON-ready dictionary.

    The "ai_summary" key is included only when an AI summary was
    actually generated (attribute is non-empty).
    """
    serialized: Dict[str, Any] = {
        "set_name": self.set_name,
        "timestamp": self.timestamp,
        "execution_time_seconds": self.execution_time_seconds,
        "configuration": self.configuration,
        "summary": self.summary,
        "results": [entry.to_dict() for entry in self.results],
    }
    if self.ai_summary:
        serialized["ai_summary"] = self.ai_summary
    return serialized
59 changes: 59 additions & 0 deletions avise/reportgen/reporters/html_reporter.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""HTML report writer."""

import re
from pathlib import Path
from typing import Dict, Any

Expand Down Expand Up @@ -32,9 +33,64 @@ def _generate_html(self, report_data: ReportData) -> str:
html = self._get_html_header(report_data)
html += self._get_summary_section(report_data)
html += self._get_results(report_data.results)
if report_data.ai_summary:
html += self._get_ai_summary(report_data.ai_summary)
html += "</body>\n</html>"
return html

def _get_ai_summary(self, ai_summary: Dict[str, Any]) -> str:
"""Generate AI summary section for HTML report."""
notes_html = "".join(
f"<li>{self._markdown_to_html(note)}</li>"
for note in ai_summary.get("notes", [])
)
return f"""
<div class="category">
<div class="category-header">
<h2>AI Security Evaluation Summary</h2>
</div>
<div class="set-item">
<h3>Issue Summary</h3>
<div class="ai-content">{self._markdown_to_html(ai_summary.get("issue_summary", ""))}</div>
</div>
<div class="set-item">
<h3>Recommended Remediations</h3>
<div class="ai-content">{self._markdown_to_html(ai_summary.get("recommended_remediations", ""))}</div>
</div>
<div class="set-item">
<h3>Notes</h3>
<ul>
{notes_html}
</ul>
</div>
</div>
"""

def _markdown_to_html(self, text: str) -> str:
"""Convert basic markdown to HTML."""
if not text:
return ""
html = text
html = html.replace("&", "&amp;").replace("<", "&lt;").replace(">", "&gt;")
html = re.sub(r"\*\*(.+?)\*\*", r"<strong>\1</strong>", html)
html = re.sub(r"\*(.+?)\*", r"<em>\1</em>", html)
html = re.sub(r"^### (.+)$", r"<h4>\1</h4>", html, flags=re.MULTILINE)
html = re.sub(r"^## (.+)$", r"<h3>\1</h3>", html, flags=re.MULTILINE)
html = re.sub(r"^# (.+)$", r"<h2>\1</h2>", html, flags=re.MULTILINE)
html = re.sub(r"^\d+\. (.+)$", r"<li>\1</li>", html, flags=re.MULTILINE)
html = re.sub(r"^- (.+)$", r"<li>\1</li>", html, flags=re.MULTILINE)
html = re.sub(r"^\* (.+)$", r"<li>\1</li>", html, re.MULTILINE)
html = re.sub(r"\n\n", r"</p><p>", html)
html = f"<p>{html}</p>"
html = re.sub(r"<p></p>", r"", html)
html = re.sub(r"<p>(<h[234]>)", r"\1", html)
html = re.sub(r"(</h[234]>)<p>", r"\1", html)
html = re.sub(r"(</h[234]>)</p>", r"\1", html)
html = re.sub(r"<p>(<li>)", r"\1", html)
html = re.sub(r"(</li>)<p>", r"\1", html)
html = re.sub(r"(</li>)</p>", r"\1", html)
return html

def _get_html_header(self, report_data: ReportData) -> str:
"""Generate HTML head and opening body."""
config = report_data.configuration
Expand Down Expand Up @@ -148,6 +204,9 @@ def _get_html_header(self, report_data: ReportData) -> str:
.conversation .user {{ background: #e3f2fd; }}
.conversation .assistant {{ background: #e8f5e9; }}
.conversation .system {{ background: #fff3e0; }}
.ai-content {{ white-space: pre-wrap; }}
.ai-content h3, .ai-content h4 {{ margin: 15px 0 10px 0; }}
.ai-content li {{ margin: 5px 0; }}
</style>
</head>
<body>
Expand Down
22 changes: 22 additions & 0 deletions avise/reportgen/reporters/markdown_reporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,31 @@ def _generate_markdown(self, report_data: ReportData) -> str:

"""
md += self._get_results(report_data.results)
if report_data.ai_summary:
md += self._get_ai_summary(report_data.ai_summary)
md += "\n*Report generated by AVISE*\n"
return md

def _get_ai_summary(self, ai_summary: Dict[str, Any]) -> str:
"""Generate AI summary section for Markdown report."""
notes_md = "\n".join(f"- {note}" for note in ai_summary.get("notes", []))
return f"""---

## AI Security Evaluation Summary

### Issue Summary

{ai_summary.get("issue_summary", "")}

### Recommended Remediations

{ai_summary.get("recommended_remediations", "")}

### Notes

{notes_md}
"""

def _get_results(self, results: list) -> str:
"""Generate list of results."""
md = ""
Expand Down
Loading
Loading