|
| 1 | +import os |
| 2 | +import time |
| 3 | +from typing import Any, List |
| 4 | +import logging |
| 5 | + |
| 6 | +from dotenv import load_dotenv |
| 7 | +from falkordb import FalkorDB, exceptions |
| 8 | + |
| 9 | +logger = logging.getLogger(__name__) |
| 10 | + |
| 11 | +load_dotenv() |
| 12 | + |
| 13 | + |
| 14 | +class FalkorDBManager: |
| 15 | + entity_id: str |
| 16 | + repo_id: str |
| 17 | + db: FalkorDB |
| 18 | + |
| 19 | + def __init__( |
| 20 | + self, |
| 21 | + repo_id: str = None, |
| 22 | + entity_id: str = None, |
| 23 | + uri: str = None, |
| 24 | + user: str = None, |
| 25 | + password: str = None, |
| 26 | + ): |
| 27 | + host = uri or os.getenv("FALKORDB_URI", "localhost") |
| 28 | + port = int(os.getenv("FALKORDB_PORT", 6379)) |
| 29 | + user = user or os.getenv("FALKORDB_USERNAME") |
| 30 | + password = password or os.getenv("FALKORDB_PASSWORD") |
| 31 | + |
| 32 | + self.db = FalkorDB(host=host, port=port, username=user, password=password) |
| 33 | + |
| 34 | + self.repo_id = repo_id if repo_id is not None else "default_repo" |
| 35 | + self.entity_id = entity_id if entity_id is not None else "default_user" |
| 36 | + |
| 37 | + def close(self): |
| 38 | + # Close the connection to the database |
| 39 | + self.db.close() |
| 40 | + |
| 41 | + def save_graph(self, nodes: List[Any], edges: List[Any]): |
| 42 | + self.create_nodes(nodes) |
| 43 | + self.create_edges(edges) |
| 44 | + |
| 45 | + def create_nodes(self, nodeList: List[Any]): |
| 46 | + # Function to create nodes in the FalkorDB database |
| 47 | + graph = self.db.select_graph(self.repo_id) |
| 48 | + for node in nodeList: |
| 49 | + labels = ":".join(node.get("extra_labels", []) + [node["type"], "NODE"]) |
| 50 | + attributes = node.get("attributes", {}) |
| 51 | + attributes.update({"repoId": self.repo_id, "entityId": self.entity_id}) |
| 52 | + # Construct parameterized query |
| 53 | + cypher_query = f"CREATE (n:{labels} $props)" |
| 54 | + graph.query(cypher_query, params={"props": attributes}) |
| 55 | + |
| 56 | + def create_edges(self, edgesList: List[Any]): |
| 57 | + # Function to create edges between nodes in the FalkorDB database |
| 58 | + graph = self.db.select_graph(self.repo_id) |
| 59 | + for edge in edgesList: |
| 60 | + # Construct parameterized query |
| 61 | + cypher_query = ( |
| 62 | + "MATCH (a:NODE {node_id: $sourceId}), " |
| 63 | + "(b:NODE {node_id: $targetId}) " |
| 64 | + "CREATE (a)-[r:$type {scopeText: $scopeText}]->(b)" |
| 65 | + ) |
| 66 | + graph.query( |
| 67 | + cypher_query, |
| 68 | + params={ |
| 69 | + "sourceId": edge["sourceId"], |
| 70 | + "targetId": edge["targetId"], |
| 71 | + "type": edge["type"], |
| 72 | + "scopeText": edge["scopeText"], |
| 73 | + }, |
| 74 | + ) |
| 75 | + |
| 76 | + def detach_delete_nodes_with_path(self, path: str): |
| 77 | + graph = self.db.select_graph(self.repo_id) |
| 78 | + # Construct parameterized query |
| 79 | + cypher_query = "MATCH (n {path: $path}) DETACH DELETE n" |
| 80 | + result = graph.query(cypher_query, params={"path": path}) |
| 81 | + return result.result_set |
0 commit comments