Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions docs/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,16 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [Unreleased] - ??

### Added

- `rsconnect content get-lockfile` command allows fetching a lockfile with the
dependencies installed by connect to run the deployed content
- `rsconnect content venv` command recreates a local python environment
equal to the one used by connect to run the content.


## [1.28.2] - 2025-12-05

### Fixed
Expand Down
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ requires-python = ">=3.8"
dependencies = [
"typing-extensions>=4.8.0",
"pip>=10.0.0",
"uv>=0.9.0",
"semver>=2.0.0,<4.0.0",
"pyjwt>=2.4.0",
"click>=8.0.0",
Expand Down
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ wheel
six>=1.14.0
click>=8.0.0
pip>=10.0.0
uv>=0.9.0
semver>=2.0.0,<3.0.0
pyjwt>=2.4.0
black==24.3.0
Expand Down
5 changes: 5 additions & 0 deletions rsconnect/actions_content.py
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,11 @@ def download_bundle(connect_server: Union[RSConnectServer, SPCSConnectServer], g
return client.download_bundle(guid_with_bundle.guid, guid_with_bundle.bundle_id)


def download_lockfile(connect_server: Union[RSConnectServer, SPCSConnectServer], guid: str):
with RSConnectClient(connect_server) as client:
return client.content_lockfile(guid)


def get_content(connect_server: Union[RSConnectServer, SPCSConnectServer], guid: str | list[str]):
"""
:param guid: a single guid as a string or list of guids.
Expand Down
8 changes: 8 additions & 0 deletions rsconnect/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -457,6 +457,14 @@ def bundle_download(self, content_guid: str, bundle_id: str) -> HTTPResponse:
response = self._server.handle_bad_response(response, is_httpresponse=True)
return response

def content_lockfile(self, content_guid: str) -> HTTPResponse:
response = cast(
HTTPResponse,
self.get("v1/content/%s/lockfile" % content_guid, decode_response=False),
)
response = self._server.handle_bad_response(response, is_httpresponse=True)
return response

def content_list(self, filters: Optional[Mapping[str, JsonData]] = None) -> list[ContentItemV1]:
response = cast(Union[List[ContentItemV1], HTTPResponse], self.get("v1/content", query_params=filters))
response = self._server.handle_bad_response(response)
Expand Down
11 changes: 11 additions & 0 deletions rsconnect/http_support.py
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,17 @@ def __init__(
except json.decoder.JSONDecodeError:
self.response_body

def getheader(self, name: str) -> Optional[str]:
"""
This method retrieves a specific header from the response.

:param name: the name of the header to retrieve.
:return: the value of the header, or None if not present.
"""
if self._response is None:
return None
return self._response.getheader(name)


class HTTPServer(object):
"""
Expand Down
178 changes: 178 additions & 0 deletions rsconnect/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@
import sys
import textwrap
import traceback
import shutil
import subprocess
import tempfile
from functools import wraps
from os.path import abspath, dirname, exists, isdir, join
from typing import (
Expand Down Expand Up @@ -55,6 +58,7 @@
build_remove_content,
build_start,
download_bundle,
download_lockfile,
emit_build_log,
get_content,
search_content,
Expand Down Expand Up @@ -2762,6 +2766,180 @@ def content_bundle_download(
f.write(result.response_body)


@content.command(
name="get-lockfile",
short_help="Download a content item's lockfile.",
)
@server_args
@spcs_args
@click.option(
"--guid",
"-g",
required=True,
type=StrippedStringParamType(),
metavar="TEXT",
help="The GUID of a content item whose lockfile will be downloaded.",
)
@click.option(
"--output",
"-o",
type=click.Path(),
default="requirements.txt.lock",
show_default=True,
help="Defines the output location for the lockfile download.",
)
@click.option(
"--overwrite",
"-w",
is_flag=True,
help="Overwrite the output file if it already exists.",
)
@click.pass_context
def content_get_lockfile(
ctx: click.Context,
name: Optional[str],
server: Optional[str],
api_key: Optional[str],
snowflake_connection_name: Optional[str],
insecure: bool,
cacert: Optional[str],
guid: str,
output: str,
overwrite: bool,
verbose: int,
):
set_verbosity(verbose)
output_params(ctx, locals().items())
with cli_feedback("", stderr=True):
ce = RSConnectExecutor(
ctx=ctx,
name=name,
server=server,
api_key=api_key,
snowflake_connection_name=snowflake_connection_name,
insecure=insecure,
cacert=cacert,
logger=None,
).validate_server()
if not isinstance(ce.remote_server, (RSConnectServer, SPCSConnectServer)):
raise RSConnectException("`rsconnect content get-lockfile` requires a Posit Connect server.")
if exists(output) and not overwrite:
raise RSConnectException("The output file already exists: %s, maybe you want to --overwrite?" % output)

logger.info("Downloading %s for content %s" % (output, guid))
result = download_lockfile(ce.remote_server, guid)
if not isinstance(result.response_body, bytes):
raise RSConnectException("The response body must be bytes (not string or None).")
with open(output, "wb") as f:
f.write(result.response_body)


@content.command(
name="venv",
short_help="Replicate a Python environment from Connect",
help="Create a ENV_PATH Python virtual environment that mimics "
"the environment of a deployed content item on Posit Connect. "
"This will use the 'uv' tool to locally create and manage the virtual environment. "
"If the required Python version isn't already installed, uv will download it automatically."
"\n\n"
"run it from the directory of a deployed content item to auto-detect the GUID, "
"or provide the --guid option to specify a content item explicitly.",
)
@server_args
@spcs_args
@click.option(
"--guid",
"-g",
type=StrippedStringParamType(),
metavar="TEXT",
help=(
"The GUID of a content item whose lockfile will be used to build the environment. "
"If omitted, rsconnect will try to auto-detect the last deployed GUID for the current server "
"from local deployment metadata."
),
)
@click.argument("env_path", metavar="ENV_PATH", type=click.Path())
@click.pass_context
def content_venv(
ctx: click.Context,
name: Optional[str],
server: Optional[str],
api_key: Optional[str],
snowflake_connection_name: Optional[str],
insecure: bool,
cacert: Optional[str],
guid: Optional[str],
env_path: str,
verbose: int,
):
set_verbosity(verbose)
output_params(ctx, locals().items())
uv_path = shutil.which("uv")
if not uv_path:
raise RSConnectException(
"uv is required for `rsconnect content venv`. make sure it's available in your PATH and try again."
)

def _python_version_from_header(header: Optional[str]) -> str:
header = header or ""
*_, version = header.split("python=", 1)
version = version.split(".")[:2] # major.minor
return ".".join(version)

def _guid_for_current_server(server_url: str) -> Optional[str]:
for candidate in _get_names_to_check(os.getcwd()):
deployment = AppStore(candidate).get(server_url)
if deployment:
return deployment.get("app_guid") or deployment.get("app_id")
return None

with cli_feedback("", stderr=True):
ce = RSConnectExecutor(
ctx=ctx,
name=name,
server=server,
api_key=api_key,
snowflake_connection_name=snowflake_connection_name,
insecure=insecure,
cacert=cacert,
logger=None,
).validate_server()
if not isinstance(ce.remote_server, (RSConnectServer, SPCSConnectServer)):
raise RSConnectException("`rsconnect content venv` requires a Posit Connect server.")

guid = guid or _guid_for_current_server(ce.remote_server.url)
if not guid:
raise RSConnectException(
"No GUID provided and none found for this server in local deployment metadata. "
"Provide --guid or deploy from this directory first."
)

result = download_lockfile(ce.remote_server, guid)
if not isinstance(result.response_body, bytes):
raise RSConnectException("The response body must be bytes (not string or None).")

python_version = _python_version_from_header(result.getheader("Generated-By"))
with tempfile.NamedTemporaryFile("wb") as lockfile:
lockfile.write(result.response_body)
lockfile.flush()

if not exists(env_path):
uv_venv_cmd = [uv_path, "venv"]
if python_version:
uv_venv_cmd.extend(["--python", python_version])
uv_venv_cmd.append(env_path)
venv_result = subprocess.run(uv_venv_cmd, env=dict(os.environ, UV_PYTHON_DOWNLOADS="auto"))
if venv_result.returncode != 0:
raise RSConnectException("uv venv failed with exit code %d" % venv_result.returncode)

logger.info("Syncing environment %s" % env_path)
result = subprocess.run([uv_path, "pip", "install", "--python", env_path, "-r", lockfile.name])
if result.returncode != 0:
raise RSConnectException("uv pip install failed with exit code %d" % result.returncode)

logger.info("Environment ready. Activate with: source %s/bin/activate" % env_path)


@content.group(no_args_is_help=True, help="Build content on Posit Connect. Requires Connect >= 2021.11.1")
def build():
pass
Expand Down
90 changes: 90 additions & 0 deletions tests/test_main_content.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import shutil
import tarfile
import unittest
from unittest import mock

import httpretty
from click.testing import CliRunner
Expand Down Expand Up @@ -46,6 +47,15 @@ def register_content_endpoints(i: int, guid: str):
+ "}",
adding_headers={"Content-Type": "application/json"},
)
httpretty.register_uri(
httpretty.GET,
f"{connect_server}/__api__/v1/content/{guid}/lockfile",
body="click==8.1.3\n",
adding_headers={
"Content-Type": "text/plain",
"Generated-By": "connect; python=11.99.23",
},
)

httpretty.register_uri(
httpretty.GET,
Expand Down Expand Up @@ -160,6 +170,86 @@ def test_content_download_bundle(self):
manifest = json.loads(tgz.extractfile("manifest.json").read())
self.assertIn("metadata", manifest)

@httpretty.activate(verbose=True, allow_net_connect=False)
def test_content_get_lockfile(self):
register_uris(self.connect_server)
os.makedirs(TEMP_DIR, exist_ok=True)
runner = CliRunner()
output_path = f"{TEMP_DIR}/requirements.txt.lock"
args = [
"content",
"get-lockfile",
"-g",
"7d59c5c7-c4a7-4950-acc3-3943b7192bc4",
"-o",
output_path,
]
apply_common_args(args, server=self.connect_server, key=self.api_key)
result = runner.invoke(cli, args)
self.assertEqual(result.exit_code, 0, result.output)
with open(output_path) as lockfile:
self.assertEqual(lockfile.read(), "click==8.1.3\n", result.output)

args_exists = [
"content",
"get-lockfile",
"-g",
"7d59c5c7-c4a7-4950-acc3-3943b7192bc4",
"-o",
output_path,
]
apply_common_args(args_exists, server=self.connect_server, key=self.api_key)
result_exists = runner.invoke(cli, args_exists)
self.assertNotEqual(result_exists.exit_code, 0, result_exists.output)
self.assertIn("already exists", result_exists.output)

args_overwrite = [
"content",
"get-lockfile",
"-g",
"7d59c5c7-c4a7-4950-acc3-3943b7192bc4",
"-o",
output_path,
"-w",
]
apply_common_args(args_overwrite, server=self.connect_server, key=self.api_key)
result_overwrite = runner.invoke(cli, args_overwrite)
self.assertEqual(result_overwrite.exit_code, 0, result_overwrite.output)

@httpretty.activate(verbose=True, allow_net_connect=False)
def test_content_venv(self):
register_uris(self.connect_server)
env_path = f"{TEMP_DIR}/venv"

# Mock subprocess.run so we don't actually invoke uv; capture the calls instead
with mock.patch("subprocess.run", return_value=mock.Mock(returncode=0)) as mock_run:
args = [
"content",
"venv",
"-g",
"7d59c5c7-c4a7-4950-acc3-3943b7192bc4",
env_path,
]
apply_common_args(args, server=self.connect_server, key=self.api_key)
result = CliRunner().invoke(cli, args)
self.assertEqual(result.exit_code, 0, result.output)

# Assert we made exactly two subprocess calls: uv venv, then uv pip install
self.assertEqual(mock_run.call_count, 2, mock_run.call_args_list)

venv_args, venv_kwargs = mock_run.call_args_list[0]
venv_cmd = " ".join(venv_args[0])
self.assertRegex(venv_cmd.lower(), r"uv(?:\.exe)?\s+venv\b")
self.assertIn("--python 11.99", venv_cmd)
self.assertIn(env_path, venv_cmd)
self.assertEqual(venv_kwargs.get("env", {}).get("UV_PYTHON_DOWNLOADS"), "auto")

pip_args, _ = mock_run.call_args_list[1]
pip_cmd = " ".join(pip_args[0])
self.assertRegex(pip_cmd.lower(), r"uv(?:\.exe)?\s+pip\s+install\b")
self.assertIn(f"--python {env_path}", pip_cmd)
self.assertIn(" -r ", pip_cmd)

@httpretty.activate(verbose=True, allow_net_connect=False)
def test_build(self):
register_uris(self.connect_server)
Expand Down
Loading