diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml new file mode 100644 index 0000000..49efe07 --- /dev/null +++ b/.github/workflows/test.yml @@ -0,0 +1,44 @@ +name: LocalStack TypeDB Extension Tests + +on: + pull_request: + workflow_dispatch: + +env: + LOCALSTACK_DISABLE_EVENTS: "1" + LOCALSTACK_AUTH_TOKEN: ${{ secrets.TEST_LOCALSTACK_AUTH_TOKEN }} + +jobs: + integration-tests: + name: Run Integration Tests + runs-on: ubuntu-latest + timeout-minutes: 10 + steps: + - name: Checkout + uses: actions/checkout@v4 + + - name: Setup LocalStack and extension + run: | + cd localstack-typedb + + docker pull localstack/localstack-pro & + docker pull typedb/typedb & + pip install localstack + + make install + make dist + localstack extensions -v install file://$(ls ./dist/localstack_typedb-*.tar.gz) + + DEBUG=1 localstack start -d + localstack wait + + - name: Run integration tests + run: | + cd localstack-typedb + make test + + - name: Print logs + if: always() + run: | + localstack logs + localstack stop diff --git a/localstack-typedb/localstack_typedb/extension.py b/localstack-typedb/localstack_typedb/extension.py index 26694b7..487746a 100644 --- a/localstack-typedb/localstack_typedb/extension.py +++ b/localstack-typedb/localstack_typedb/extension.py @@ -1,4 +1,5 @@ from localstack_typedb.utils.docker import ProxiedDockerContainerExtension +from rolo import Request class TypeDbExtension(ProxiedDockerContainerExtension): @@ -9,5 +10,12 @@ class TypeDbExtension(ProxiedDockerContainerExtension): def __init__(self): super().__init__( - image_name=self.DOCKER_IMAGE, container_ports=[8000, 1729], host=self.HOST + image_name=self.DOCKER_IMAGE, + container_ports=[8000, 1729], + host=self.HOST, + request_to_port_router=self.request_to_port_router, ) + + def request_to_port_router(self, request: Request): + # TODO add REST API / gRPC routing based on request + return 1729 diff --git a/localstack-typedb/localstack_typedb/utils/docker.py b/localstack-typedb/localstack_typedb/utils/docker.py index 7865100..08e1586 100644 --- a/localstack-typedb/localstack_typedb/utils/docker.py +++ b/localstack-typedb/localstack_typedb/utils/docker.py @@ -1,22 +1,29 @@ import re import logging from functools import cache - +from typing import Callable import requests + from localstack import config -from rolo.proxy import Proxy +from localstack_typedb.utils.h2_proxy import apply_http2_patches_for_grpc_support from localstack.utils.docker_utils import DOCKER_CLIENT from localstack.extensions.api import Extension, http -from rolo.router import RuleAdapter, WithHost -from localstack.http import Request, route +from localstack.http import Request from localstack.utils.container_utils.container_client import PortMappings from localstack.utils.net import get_addressable_container_host from localstack.utils.sync import retry +from rolo import route +from rolo.proxy import Proxy +from rolo.routing import RuleAdapter, WithHost LOG = logging.getLogger(__name__) -LOG.setLevel(logging.DEBUG if config.DEBUG else logging.INFO) +logging.getLogger("localstack_typedb").setLevel( + logging.DEBUG if config.DEBUG else logging.INFO +) logging.basicConfig() +TYPEDB_PORT = 1729 + class ProxiedDockerContainerExtension(Extension): name: str @@ -35,36 +42,40 @@ class ProxiedDockerContainerExtension(Extension): path: str | None """Optional path on which to expose the container endpoints.""" - # TODO: currently not yet used ... - tcp_proxy_ports: list | None - tcp_proxies: list[int] + request_to_port_router: Callable[[Request], int] | None + """Callable that returns the target port for a given request, for routing purposes""" def __init__( self, image_name: str, container_ports: list[int], - tcp_proxy_ports: list[int] | None = None, host: str | None = None, path: str | None = None, container_name: str | None = None, + request_to_port_router: Callable[[Request], int] | None = None, ): self.image_name = image_name self.container_ports = container_ports self.host = host self.path = path self.container_name = container_name - self.tcp_proxy_ports = tcp_proxy_ports - self.tcp_proxies = [] + self.request_to_port_router = request_to_port_router def update_gateway_routes(self, router: http.Router[http.RouteHandler]): - resource = RuleAdapter(ProxyResource(self)) - if self.host: - resource = WithHost(self.host, [resource]) if self.path: raise NotImplementedError( "Path-based routing not yet implemented for this extension" ) + self.start_container() + # add resource for HTTP/1.1 requests + resource = RuleAdapter(ProxyResource(self)) + if self.host: + resource = WithHost(self.host, [resource]) router.add(resource) + # apply patches to serve HTTP/2 requests + apply_http2_patches_for_grpc_support( + get_addressable_container_host(), TYPEDB_PORT + ) def on_platform_shutdown(self): self._remove_container() @@ -95,12 +106,13 @@ def start_container(self) -> None: main_port = self.container_ports[0] container_host = get_addressable_container_host() - def _ping_api(): + def _ping_endpoint(): + # TODO: allow defining a custom healthcheck endpoint ... response = requests.get(f"http://{container_host}:{main_port}/") assert response.ok try: - retry(_ping_api, retries=40, sleep=1) + retry(_ping_endpoint, retries=40, sleep=1) except Exception as e: LOG.info("Failed to connect to container %s: %s", container_name, e) self._remove_container() diff --git a/localstack-typedb/localstack_typedb/utils/h2_proxy.py b/localstack-typedb/localstack_typedb/utils/h2_proxy.py new file mode 100644 index 0000000..85720c3 --- /dev/null +++ b/localstack-typedb/localstack_typedb/utils/h2_proxy.py @@ -0,0 +1,64 @@ +import logging +import socket + +from twisted.internet import reactor + +from localstack.utils.patch import patch +from twisted.web._http2 import H2Connection + +LOG = logging.getLogger(__name__) + + +class TcpForwarder: + """Simple helper class for bidirectional forwarding of TPC traffic.""" + + buffer_size = 1024 + + def __init__(self, port: int, host: str = "localhost"): + self.port = port + self.host = host + self._socket = None + self.connect() + + def connect(self): + if not self._socket: + self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self._socket.connect((self.host, self.port)) + + def receive_loop(self, callback): + while True: + data = self._socket.recv(self.buffer_size) + callback(data) + if not data: + break + + def send(self, data): + self._socket.sendall(data) + + +def apply_http2_patches_for_grpc_support(target_host: str, target_port: int): + """ + Apply some patches to proxy incoming gRPC requests and forward them to a target port. + Note: this is a very brute-force approach and needs to be fixed/enhanced over time! + """ + + @patch(H2Connection.connectionMade) + def _connectionMade(fn, self, *args, **kwargs): + def _process(data): + LOG.debug("Received data (%s bytes) from upstream HTTP2 server", len(data)) + self.transport.write(data) + + # TODO: make port configurable + self._ls_forwarder = TcpForwarder(target_port, host=target_host) + LOG.debug( + "Starting TCP forwarder to port %s for new HTTP2 connection", target_port + ) + reactor.getThreadPool().callInThread(self._ls_forwarder.receive_loop, _process) + + @patch(H2Connection.dataReceived) + def _dataReceived(fn, self, data, *args, **kwargs): + forwarder = getattr(self, "_ls_forwarder", None) + if not forwarder: + return fn(self, data, *args, **kwargs) + LOG.debug("Forwarding data (%s bytes) from HTTP2 client to server", len(data)) + forwarder.send(data) diff --git a/localstack-typedb/pyproject.toml b/localstack-typedb/pyproject.toml index 56e642c..85a2357 100644 --- a/localstack-typedb/pyproject.toml +++ b/localstack-typedb/pyproject.toml @@ -14,6 +14,7 @@ authors = [ keywords = ["LocalStack", "TypeDB"] classifiers = [] dependencies = [ + "httpx" ] [project.urls] @@ -26,7 +27,8 @@ dev = [ "localstack", "pytest", "rolo", - "ruff" + "ruff", + "typedb-driver", ] [project.entry-points."localstack.extensions"] diff --git a/localstack-typedb/tests/test_extension.py b/localstack-typedb/tests/test_extension.py index c63cf83..9fab030 100644 --- a/localstack-typedb/tests/test_extension.py +++ b/localstack-typedb/tests/test_extension.py @@ -1,8 +1,9 @@ import requests from localstack.utils.strings import short_uid +from typedb.driver import TypeDB, Credentials, DriverOptions, TransactionType -def test_connect_to_db(): +def test_connect_to_db_via_http_api(): host = "typedb.localhost.localstack.cloud:4566" # get auth token @@ -35,3 +36,34 @@ def test_connect_to_db(): headers={"Authorization": f"bearer {token}"}, ) assert response.ok + + +def test_connect_to_db_via_grpc_endpoint(): + db_name = "access-management-db" + server_host = "typedb.localhost.localstack.cloud:4566" + + driver_cfg = TypeDB.driver( + server_host, + Credentials("admin", "password"), + DriverOptions(is_tls_enabled=False), + ) + with driver_cfg as driver: + if driver.databases.contains(db_name): + driver.databases.get(db_name).delete() + driver.databases.create(db_name) + + with driver.transaction(db_name, TransactionType.SCHEMA) as tx: + tx.query("define entity person;").resolve() + tx.query("define attribute name, value string; person owns name;").resolve() + tx.commit() + + with driver.transaction(db_name, TransactionType.WRITE) as tx: + tx.query("insert $p isa person, has name 'Alice';").resolve() + tx.query("insert $p isa person, has name 'Bob';").resolve() + tx.commit() + with driver.transaction(db_name, TransactionType.READ) as tx: + results = tx.query( + 'match $p isa person; fetch {"name": $p.name};' + ).resolve() + for json in results: + print(json)