Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 44 additions & 0 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
name: LocalStack TypeDB Extension Tests

on:
pull_request:
workflow_dispatch:

env:
LOCALSTACK_DISABLE_EVENTS: "1"
LOCALSTACK_AUTH_TOKEN: ${{ secrets.TEST_LOCALSTACK_AUTH_TOKEN }}

jobs:
integration-tests:
name: Run Integration Tests
runs-on: ubuntu-latest
timeout-minutes: 10
steps:
- name: Checkout
uses: actions/checkout@v4

- name: Setup LocalStack and extension
run: |
cd localstack-typedb

docker pull localstack/localstack-pro &
docker pull typedb/typedb &
pip install localstack

make install
make dist
localstack extensions -v install file://$(ls ./dist/localstack_typedb-*.tar.gz)

DEBUG=1 localstack start -d
localstack wait

- name: Run integration tests
run: |
cd localstack-typedb
make test

- name: Print logs
if: always()
run: |
localstack logs
localstack stop
10 changes: 9 additions & 1 deletion localstack-typedb/localstack_typedb/extension.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from localstack_typedb.utils.docker import ProxiedDockerContainerExtension
from rolo import Request


class TypeDbExtension(ProxiedDockerContainerExtension):
Expand All @@ -9,5 +10,12 @@ class TypeDbExtension(ProxiedDockerContainerExtension):

def __init__(self):
super().__init__(
image_name=self.DOCKER_IMAGE, container_ports=[8000, 1729], host=self.HOST
image_name=self.DOCKER_IMAGE,
container_ports=[8000, 1729],
host=self.HOST,
request_to_port_router=self.request_to_port_router,
)

def request_to_port_router(self, request: Request):
# TODO add REST API / gRPC routing based on request
return 1729
44 changes: 28 additions & 16 deletions localstack-typedb/localstack_typedb/utils/docker.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,29 @@
import re
import logging
from functools import cache

from typing import Callable
import requests

from localstack import config
from rolo.proxy import Proxy
from localstack_typedb.utils.h2_proxy import apply_http2_patches_for_grpc_support
from localstack.utils.docker_utils import DOCKER_CLIENT
from localstack.extensions.api import Extension, http
from rolo.router import RuleAdapter, WithHost
from localstack.http import Request, route
from localstack.http import Request
from localstack.utils.container_utils.container_client import PortMappings
from localstack.utils.net import get_addressable_container_host
from localstack.utils.sync import retry
from rolo import route
from rolo.proxy import Proxy
from rolo.routing import RuleAdapter, WithHost

LOG = logging.getLogger(__name__)
LOG.setLevel(logging.DEBUG if config.DEBUG else logging.INFO)
logging.getLogger("localstack_typedb").setLevel(
logging.DEBUG if config.DEBUG else logging.INFO
)
logging.basicConfig()

TYPEDB_PORT = 1729


class ProxiedDockerContainerExtension(Extension):
name: str
Expand All @@ -35,36 +42,40 @@ class ProxiedDockerContainerExtension(Extension):
path: str | None
"""Optional path on which to expose the container endpoints."""

# TODO: currently not yet used ...
tcp_proxy_ports: list | None
tcp_proxies: list[int]
request_to_port_router: Callable[[Request], int] | None
"""Callable that returns the target port for a given request, for routing purposes"""

def __init__(
self,
image_name: str,
container_ports: list[int],
tcp_proxy_ports: list[int] | None = None,
host: str | None = None,
path: str | None = None,
container_name: str | None = None,
request_to_port_router: Callable[[Request], int] | None = None,
):
self.image_name = image_name
self.container_ports = container_ports
self.host = host
self.path = path
self.container_name = container_name
self.tcp_proxy_ports = tcp_proxy_ports
self.tcp_proxies = []
self.request_to_port_router = request_to_port_router

def update_gateway_routes(self, router: http.Router[http.RouteHandler]):
resource = RuleAdapter(ProxyResource(self))
if self.host:
resource = WithHost(self.host, [resource])
if self.path:
raise NotImplementedError(
"Path-based routing not yet implemented for this extension"
)
self.start_container()
# add resource for HTTP/1.1 requests
resource = RuleAdapter(ProxyResource(self))
if self.host:
resource = WithHost(self.host, [resource])
router.add(resource)
# apply patches to serve HTTP/2 requests
apply_http2_patches_for_grpc_support(
get_addressable_container_host(), TYPEDB_PORT
)

def on_platform_shutdown(self):
self._remove_container()
Expand Down Expand Up @@ -95,12 +106,13 @@ def start_container(self) -> None:
main_port = self.container_ports[0]
container_host = get_addressable_container_host()

def _ping_api():
def _ping_endpoint():
# TODO: allow defining a custom healthcheck endpoint ...
response = requests.get(f"http://{container_host}:{main_port}/")
assert response.ok

try:
retry(_ping_api, retries=40, sleep=1)
retry(_ping_endpoint, retries=40, sleep=1)
except Exception as e:
LOG.info("Failed to connect to container %s: %s", container_name, e)
self._remove_container()
Expand Down
64 changes: 64 additions & 0 deletions localstack-typedb/localstack_typedb/utils/h2_proxy.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
import logging
import socket

from twisted.internet import reactor

from localstack.utils.patch import patch
from twisted.web._http2 import H2Connection

LOG = logging.getLogger(__name__)


class TcpForwarder:
"""Simple helper class for bidirectional forwarding of TPC traffic."""

buffer_size = 1024

def __init__(self, port: int, host: str = "localhost"):
self.port = port
self.host = host
self._socket = None
self.connect()

def connect(self):
if not self._socket:
self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self._socket.connect((self.host, self.port))

def receive_loop(self, callback):
while True:
data = self._socket.recv(self.buffer_size)
callback(data)
if not data:
break

def send(self, data):
self._socket.sendall(data)


def apply_http2_patches_for_grpc_support(target_host: str, target_port: int):
"""
Apply some patches to proxy incoming gRPC requests and forward them to a target port.
Note: this is a very brute-force approach and needs to be fixed/enhanced over time!
"""

@patch(H2Connection.connectionMade)
def _connectionMade(fn, self, *args, **kwargs):
def _process(data):
LOG.debug("Received data (%s bytes) from upstream HTTP2 server", len(data))
self.transport.write(data)

# TODO: make port configurable
self._ls_forwarder = TcpForwarder(target_port, host=target_host)
LOG.debug(
"Starting TCP forwarder to port %s for new HTTP2 connection", target_port
)
reactor.getThreadPool().callInThread(self._ls_forwarder.receive_loop, _process)

@patch(H2Connection.dataReceived)
def _dataReceived(fn, self, data, *args, **kwargs):
forwarder = getattr(self, "_ls_forwarder", None)
if not forwarder:
return fn(self, data, *args, **kwargs)
LOG.debug("Forwarding data (%s bytes) from HTTP2 client to server", len(data))
forwarder.send(data)
4 changes: 3 additions & 1 deletion localstack-typedb/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ authors = [
keywords = ["LocalStack", "TypeDB"]
classifiers = []
dependencies = [
"httpx"
]

[project.urls]
Expand All @@ -26,7 +27,8 @@ dev = [
"localstack",
"pytest",
"rolo",
"ruff"
"ruff",
"typedb-driver",
]

[project.entry-points."localstack.extensions"]
Expand Down
34 changes: 33 additions & 1 deletion localstack-typedb/tests/test_extension.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import requests
from localstack.utils.strings import short_uid
from typedb.driver import TypeDB, Credentials, DriverOptions, TransactionType


def test_connect_to_db():
def test_connect_to_db_via_http_api():
host = "typedb.localhost.localstack.cloud:4566"

# get auth token
Expand Down Expand Up @@ -35,3 +36,34 @@ def test_connect_to_db():
headers={"Authorization": f"bearer {token}"},
)
assert response.ok


def test_connect_to_db_via_grpc_endpoint():
db_name = "access-management-db"
server_host = "typedb.localhost.localstack.cloud:4566"

driver_cfg = TypeDB.driver(
server_host,
Credentials("admin", "password"),
DriverOptions(is_tls_enabled=False),
)
with driver_cfg as driver:
if driver.databases.contains(db_name):
driver.databases.get(db_name).delete()
driver.databases.create(db_name)

with driver.transaction(db_name, TransactionType.SCHEMA) as tx:
tx.query("define entity person;").resolve()
tx.query("define attribute name, value string; person owns name;").resolve()
tx.commit()

with driver.transaction(db_name, TransactionType.WRITE) as tx:
tx.query("insert $p isa person, has name 'Alice';").resolve()
tx.query("insert $p isa person, has name 'Bob';").resolve()
tx.commit()
with driver.transaction(db_name, TransactionType.READ) as tx:
results = tx.query(
'match $p isa person; fetch {"name": $p.name};'
).resolve()
for json in results:
print(json)