pip install together-sandboxRequires Python 3.10+.
Set your Together AI API key as an environment variable:
export TOGETHER_API_KEY=your_api_keyOr pass it directly when constructing the client (see below).
import asyncio
from together_sandbox import TogetherSandbox
async def main():
sdk = TogetherSandbox() # reads TOGETHER_API_KEY from env
async with await sdk.sandboxes.start("your-sandbox-id") as sandbox:
content = await sandbox.files.read("/package.json")
print(content)
await sandbox.shutdown()
asyncio.run(main())Note: The
async withblock closes the HTTP connection on exit. It does not automatically shut down the VM — callawait sandbox.shutdown()explicitly when you're done.
The main entry point for the SDK.
sdk = TogetherSandbox(api_key=None, base_url="https://api.codesandbox.io")| Parameter | Type | Description |
|---|---|---|
api_key |
str | None |
Together AI API key. Falls back to the TOGETHER_API_KEY env var. |
base_url |
str |
Management API base URL. Defaults to https://api.codesandbox.io. |
retry |
RetryConfig | None |
Retry configuration for transient failures. See Retry below. |
TogetherSandbox supports use as an async context manager:
async with TogetherSandbox() as sdk:
sandbox = await sdk.sandboxes.start("your-sandbox-id")
...
# Closes the management API HTTP connection on exit.Sandbox lifecycle namespace.
sdk.sandboxes.create(*, millicpu=1000, memory_bytes=2*1024**3, disk_bytes=10*1024**3, id=None, snapshot_id=None, snapshot_alias=None, ephemeral=None) -> SandboxModel
Creates a new sandbox record from a snapshot. Does not start the VM — call sdk.sandboxes.start() with the returned ID afterwards.
sandbox_model = await sdk.sandboxes.create(snapshot_alias="my-app@v1")
sandbox = await sdk.sandboxes.start(sandbox_model.id)Resource params (millicpu, memory_bytes, disk_bytes) default to 1 vCPU / 2 GiB memory / 10 GiB disk if omitted.
| Parameter | Type | Required | Description |
|---|---|---|---|
snapshot_id |
str | None |
* | ID of the snapshot to use. One of snapshot_id or snapshot_alias is required. |
snapshot_alias |
str | None |
* | Alias of the snapshot to use. One of snapshot_id or snapshot_alias is required. |
millicpu |
int |
No | CPU allocation in millicpu (must be ≥ 250 and a multiple of 250). Default: 1000 (1 vCPU). |
memory_bytes |
int |
No | Memory allocation in bytes. Default: 2 * 1024 ** 3 (2 GiB). |
disk_bytes |
int |
No | Disk allocation in bytes. Default: 10 * 1024 ** 3 (10 GiB). |
id |
str | None |
No | Sandbox ID (6–8 characters). Generated if not provided. |
ephemeral |
bool | None |
No | Mark the sandbox as ephemeral. |
Starts the VM for the given sandbox ID and returns a connected Sandbox instance.
sandbox = await sdk.sandboxes.start("your-sandbox-id")
# Pin a specific VM version:
sandbox = await sdk.sandboxes.start("your-sandbox-id", version_number=3)Suspends (hibernates) a VM by sandbox ID.
await sdk.sandboxes.hibernate("your-sandbox-id")Shuts down a VM by sandbox ID.
await sdk.sandboxes.shutdown("your-sandbox-id")Snapshot creation namespace. Snapshots are images you can pass to sdk.sandboxes.create().
Create a snapshot from either a Docker build context (built remotely by default) or an existing public Docker image.
From a build context (remote build):
Submit a Docker build context to Together's remote image-builder service. The service builds the image, pushes it to the internal registry, and the SDK then registers it as a snapshot. No local Docker installation is required.
from together_sandbox import CreateContextSnapshotParams
result = await sdk.snapshots.create(CreateContextSnapshotParams(
context="./my-app",
dockerfile="./my-app/Dockerfile.prod", # optional
alias="my-app@v1", # optional
on_progress=lambda e: print(e.output),
))
# Use the snapshot ID to create a sandbox:
sandbox_model = await sdk.sandboxes.create(snapshot_id=result.snapshot_id)Local build opt-in. Set
TOGETHER_LOCAL_BUILD=1in the environment to build the image with your own Docker daemon and push it to the registry from your machine instead of using the remote image-builder. This requires Docker to be installed and running. Useful for debugging build issues locally or when working in restricted network environments.export TOGETHER_LOCAL_BUILD=1
| Parameter | Type | Description |
|---|---|---|
context |
str |
Path to the Docker build context directory. |
dockerfile |
str | None |
Path to a Dockerfile. Defaults to Dockerfile inside context. |
alias |
str | None |
Alias for the snapshot. Format: tag or namespace@tag. Namespace defaults to the context directory name. |
on_progress |
Callable[[SnapshotProgress], None] | None |
Optional progress callback. Receives a SnapshotProgress at each stage. |
From a public Docker image:
Register an existing public Docker image as a snapshot — no local build required.
from together_sandbox import CreateImageSnapshotParams
result = await sdk.snapshots.create(CreateImageSnapshotParams(
image="node:22",
alias="my-node@latest", # optional
))
print(result.snapshot_id)| Parameter | Type | Description |
|---|---|---|
image |
str |
Docker image name or reference (e.g. node:22, registry.example.com/org/app:tag). |
alias |
str | None |
Alias for the snapshot. Format: tag or namespace@tag. Namespace defaults to the image name. |
on_progress |
Callable[[SnapshotProgress], None] | None |
Optional progress callback. |
| Property | Type | Description |
|---|---|---|
snapshot_id |
str |
ID of the created snapshot. |
alias |
str | None |
The full alias (namespace@tag) if one was assigned. |
| Property | Type | Description |
|---|---|---|
step |
str |
Current stage: "prepare", "build", "auth", "push", "register", "memory-snapshot", or "alias". |
output |
str |
Human-readable progress message. |
A connected, running VM. Returned by sdk.sandboxes.start(). All sub-namespaces are available as properties.
async with await sdk.sandboxes.start("sandbox-id") as sandbox:
...| Property | Type | Description |
|---|---|---|
id |
str |
The sandbox/VM ID. |
vm_info |
SandboxModel |
Raw VM start response (id, agent_url, etc.) |
File system operations.
Read the content of a file.
content = await sandbox.files.read("/src/main.py")Create or overwrite a file. Content can be str (encoded as UTF-8) or bytes.
await sandbox.files.create("/hello.txt", "Hello, world!")
await sandbox.files.create("/data.bin", b"\x00\x01\x02")Delete a file.
await sandbox.files.delete("/old-file.txt")Move a file.
await sandbox.files.move("/src/old.py", "/src/new.py")Copy a file.
await sandbox.files.copy("/src/template.py", "/src/copy.py")Get file metadata (size, type, modified time, etc.).
info = await sandbox.files.stat("/package.json")Watch a directory for file system changes via SSE. Returns an async iterator of event dicts.
async for event in sandbox.files.watch("/src", recursive=True, ignore_patterns=["node_modules"]):
print(event)| Parameter | Type | Description |
|---|---|---|
recursive |
bool | None |
Watch subdirectories recursively. |
ignore_patterns |
list[str] | None |
Glob patterns for paths to ignore. |
Directory operations.
List the contents of a directory.
files = await sandbox.directories.list("/src")Create a directory.
await sandbox.directories.create("/src/utils")Delete a directory.
await sandbox.directories.delete("/tmp/scratch")Shell execution operations.
List all active execs.
execs = await sandbox.execs.list()execs.create(command, args, *, autorun=None, interactive=None, pty=None, cwd=None, env=None, uid=None, gid=None) -> Exec
Create a new exec (run a command).
exec_ = await sandbox.execs.create(
command="npm",
args=["install"],
cwd="/workspace",
env={"NODE_ENV": "production"},
)| Parameter | Type | Required | Description |
|---|---|---|---|
command |
str |
Yes | Command to execute (e.g. "npm"). |
args |
list[str] |
Yes | Command line arguments (e.g. ["install"]). |
autorun |
bool | None |
No | Whether to automatically start the exec (defaults to true). |
interactive |
bool | None |
No | Whether to start an interactive shell session. |
pty |
bool | None |
No | Whether to start a PTY shell session. |
cwd |
str | None |
No | Working directory for the command. |
env |
dict[str, str] | None |
No | Environment variables to set, as a plain dict. |
uid |
int | None |
No | User ID to run the command as (defaults to 1000). |
gid |
int | None |
No | Group ID to run the command as (defaults to 1000). |
Get an exec by ID.
exec_ = await sandbox.execs.get("exec-id")Resume a stopped exec (sets its status back to running).
await sandbox.execs.resume("exec-id")Delete an exec.
await sandbox.execs.delete("exec-id")Stream exec output via SSE. Optionally provide last_sequence to resume from a specific point.
async for chunk in sandbox.execs.stream_output("exec-id"):
print(chunk)One-shot poll for exec output (non-streaming).
output = await sandbox.execs.get_output("exec-id")Send raw stdin data to a running exec.
await sandbox.execs.send_stdin("exec-id", "yes\n")Resize the PTY for an interactive exec.
await sandbox.execs.resize("exec-id", cols=80, rows=24)Stream the live list of all active execs via SSE.
async for update in sandbox.execs.stream_list():
print(update)Port discovery.
List all open ports.
ports = await sandbox.ports.list()Stream port changes via SSE.
async for event in sandbox.ports.stream_list():
print(event)Suspend (hibernate) this VM.
await sandbox.hibernate()Shut down this VM.
await sandbox.shutdown()Close the underlying sandbox HTTP client connection without affecting the VM state.
Sandbox supports use as an async context manager. Exiting the block closes the HTTP connection but does not shut down the VM.
async with await sdk.sandboxes.start("sandbox-id") as sandbox:
content = await sandbox.files.read("/README.md")
await sandbox.shutdown()Convenience classmethods that create a temporary SDK client internally.
sandbox = await Sandbox.start("sandbox-id", api_key="your-key")await Sandbox.hibernate("sandbox-id", api_key="your-key")await Sandbox.shutdown("sandbox-id", api_key="your-key")Raised by SDK operations for every failure. Inherits from RuntimeError,
so existing except RuntimeError: clauses keep working. HTTP-level errors
(non-success status) and transport-level failures (DNS, timeout, connection
refused) both surface as HttpError, distinguished by the status field.
| Attribute | Type | Description |
|---|---|---|
args |
tuple |
Standard exception args — first element is the formatted message. |
status |
int |
HTTP status code, or 0 as a sentinel for transport failures (no response received). |
Because 0 is not a valid HTTP status, e.status == 0 cleanly identifies
"the request never reached the server" without losing the rest of the
error-handling shape. The original transport exception (httpx.TimeoutException,
httpx.ConnectError, httpx.RemoteProtocolError) is preserved on
__cause__ for debugging tracebacks.
from together_sandbox import HttpError
try:
await sdk.sandboxes.start("...")
except HttpError as e:
if e.status == 0:
# transport-level failure (DNS / timeout / connection refused)
raise
elif e.status == 404:
return None # not found
raiseAll operations automatically retry on transient failures. By default:
- HTTP status codes
408,429,500,502,503,504trigger a retry. - Transport-level failures (
httpx.TimeoutException,httpx.ConnectError,httpx.RemoteProtocolError) — these surface asHttpErrorwithstatus == 0— trigger a retry. - 3 total attempts (1 initial + 2 retries).
- Exponential backoff: starts at 0.5 s, doubles each attempt, plus up to 0.25 s of random jitter.
Pass a RetryConfig to TogetherSandbox(retry=...) to customise this behaviour.
RetryConfig is a dataclass — instantiate it with keyword arguments.
| Field | Type | Default | Description |
|---|---|---|---|
max_attempts |
int |
3 |
Total number of attempts (including the first). Set to 1 to disable retries. |
should_retry |
Callable[[RetryContext], bool | float] | Awaitable[...] |
None |
Override the retry decision. Return False to abort immediately, True to retry with the default backoff delay, or a float (seconds) to retry after a custom delay. May be a coroutine function. |
on_retry |
Callable[[RetryContext], None] | Awaitable[None] |
None |
Called before each retry. Use for logging, metrics, or UI progress updates. May be a coroutine function. |
| Field | Type | Description |
|---|---|---|
operation |
str |
The operation that failed, e.g. 'startSandbox', 'files.read'. |
attempt |
int |
1-based number of the attempt that just failed. |
error |
Exception |
The HttpError that was raised. |
status |
int | None |
HTTP status code, or 0 for transport-level failures. |
delay |
float |
Seconds to wait before the next attempt (default computed, override via should_retry). |
import asyncio
from together_sandbox import TogetherSandbox, RetryConfig, RetryContext
async def main():
sdk = TogetherSandbox(
api_key="...",
retry=RetryConfig(
max_attempts=4,
should_retry=lambda ctx: (
# Never retry snapshot creation — it is not idempotent
False if ctx.operation == "snapshots.create"
# Give up on auth errors
else False if ctx.status in (401, 403)
# Retry everything else with default backoff
else True
),
on_retry=lambda ctx: print(
f"[retry] {ctx.operation} attempt {ctx.attempt} failed "
f"({'transport' if ctx.status == 0 else f'HTTP {ctx.status}'}) "
f"— retrying in {ctx.delay:.2f}s"
),
),
)
asyncio.run(main())Note —
snapshots.createis not idempotent. Retrying after a transient 500 once the snapshot has already been created will register a duplicate. Exclude it viashould_retryas shown above, or use the shorthand:RetryConfig(should_retry=lambda ctx: ctx.operation != "snapshots.create")
| Variable | Description |
|---|---|
TOGETHER_API_KEY |
Required. Your Together AI API key. |