Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docker/install.sh
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ pip install torch${TORCH_VERSION} --extra-index-url https://download.pytorch.org
pip install /wheels/*.whl

if [[ "${CUDA_VERSION_SHORT}" != "cu118" ]] && [[ "${PYTHON_VERSION}" != "3.9" ]]; then
pip install cuda-python dlblas==0.0.6 dlslime==0.0.1.post10
pip install cuda-python dlblas==0.0.6 dlslime==0.0.2.post1
fi

# install pre-built flash attention 3 wheel
Expand Down
4 changes: 2 additions & 2 deletions lmdeploy/pytorch/disagg/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,12 @@
pip install lmdeploy[all] >= 0.7.0

# Transfer Engine
pip install dlslime>=0.0.1.post7
pip install dlslime>=0.0.2
```

## Quick Start

A PD disaggregated deployment of DeepSeekV3 is shown below:
A PD disaggregated deployment of internlm2_5-7b-chat is shown below:

### 1. Launch Router Service

Expand Down
2 changes: 1 addition & 1 deletion lmdeploy/pytorch/disagg/backend/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ def register_memory_region(self, register_mr_request: DistServeRegisterMRMessage
raise NotImplementedError

@abstractmethod
def endpoint_info(self, remote_engine_id: int, protocol: MigrationProtocol):
def endpoint_info(self, remote_engine_id: str, protocol: MigrationProtocol):
return NotImplementedError

@abstractmethod
Expand Down
90 changes: 33 additions & 57 deletions lmdeploy/pytorch/disagg/backend/dlslime.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,8 @@
import asyncio
import json
import os
from typing import Dict, List
from typing import Dict

from dlslime import Assignment as DLSlimeAssignment
from dlslime import NVLinkEndpoint, RDMAEndpoint, available_nic

from lmdeploy.logger import get_logger
Expand All @@ -20,97 +19,74 @@
LMDEPLOY_USE_ASYNC_MIGRATION = os.environ.get('LMDEPLOY_USE_ASYNC_MIGRATION', None)


async def read_batch_coroutine(endpoint: RDMAEndpoint, batch: List[DLSlimeAssignment]):
loop = asyncio.get_running_loop()
future = loop.create_future()

def _completion_handler(status: int):
loop.call_soon_threadsafe(future.set_result, status)

endpoint.read_batch_with_callback(
batch,
_completion_handler,
)
await future


class DLSlimeMigrationManagement:

def __init__(self, init_request: DistServeInitRequest):
self.rank = init_request.rank
self.local_engine_config: DistServeEngineConfig = init_request.local_engine_config
self.remote_engine_config: DistServeEngineConfig = init_request.remote_engine_config
self.endpoint: Dict[MigrationProtocol, RDMAEndpoint] = {
MigrationProtocol.TCP: None,
MigrationProtocol.RDMA: None,
MigrationProtocol.NVLINK: None,
}
self.local_engine_config: DistServeEngineConfig = (init_request.local_engine_config)
self.remote_engine_config: DistServeEngineConfig = (init_request.remote_engine_config)
Comment on lines +26 to +27
Copy link

Copilot AI Jan 5, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The parentheses around the assignment values are unnecessary and don't serve any purpose. They can be removed for cleaner code.

Suggested change
self.local_engine_config: DistServeEngineConfig = (init_request.local_engine_config)
self.remote_engine_config: DistServeEngineConfig = (init_request.remote_engine_config)
self.local_engine_config: DistServeEngineConfig = init_request.local_engine_config
self.remote_engine_config: DistServeEngineConfig = init_request.remote_engine_config

Copilot uses AI. Check for mistakes.
self.endpoint: Dict[MigrationProtocol, RDMAEndpoint | NVLinkEndpoint] = {}
if init_request.protocol == MigrationProtocol.RDMA:
nics = available_nic()
device_name = nics[self.rank % len(nics)]
logger.info(f'use device {device_name} for kv migration')
self.endpoint[MigrationProtocol.RDMA] = RDMAEndpoint(device_name=device_name,
ib_port=1,
link_type=init_request.rdma_config.link_type.name)
self.endpoint[MigrationProtocol.RDMA] = RDMAEndpoint(
device_name=device_name,
ib_port=1,
link_type=init_request.rdma_config.link_type.name,
)
elif init_request.protocol == MigrationProtocol.NVLINK:
self.endpoint[MigrationProtocol.NVLINK] = NVLinkEndpoint()

def register_memory_region(self, register_mr_request: DistServeRegisterMRMessage):
self.endpoint[register_mr_request.protocol].register_memory_region(register_mr_request.mr_key,
register_mr_request.addr,
register_mr_request.offset,
register_mr_request.length)
self.endpoint[register_mr_request.protocol].register_memory_region(
register_mr_request.mr_key,
register_mr_request.addr,
register_mr_request.offset,
register_mr_request.length,
)

def connect(self, kvtransfer_endpoint_info: DistServeKVTransferEndpointInfo):
self.endpoint[kvtransfer_endpoint_info.protocol].connect(json.loads(kvtransfer_endpoint_info.endpoint_info))

async def p2p_migrate(self, assignment: MigrationAssignment, async_op=False):
batch = [
DLSlimeAssignment(
mr_key=assign.mr_key,
target_offset=assign.target_offset,
source_offset=assign.source_offset,
length=assign.length,
) for assign in assignment.batch
]

if not LMDEPLOY_USE_ASYNC_MIGRATION:
MAX_NUM_READ_BATCH = 4096

def split(batch: List[DLSlimeAssignment]):
batch_split = []
for i in range(0, len(batch), MAX_NUM_READ_BATCH):
batch_split.append(batch[i:i + MAX_NUM_READ_BATCH])
return batch_split

batch_splited = split(batch)
for b_split in batch_splited:
self.endpoint[assignment.protocol].read_batch(b_split)
async def p2p_migrate(self, assignment: MigrationAssignment):
batch = [(
assign.mr_key,
assign.mr_key,
Comment on lines +54 to +55
Copy link

Copilot AI Jan 5, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The tuple contains duplicate mr_key values at positions 0 and 1. Based on the DLSlime v0.0.2 interface documentation and typical RDMA operation patterns, the tuple should likely contain (local_mr_key, remote_mr_key, target_offset, source_offset, length). If both local and remote memory regions use the same key, this duplication may be intentional, but please verify this matches the expected DLSlime v0.0.2 API signature.

Copilot uses AI. Check for mistakes.
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The local and remote mr_key values are intentionally the same (0 for the K cache and 1 for the V cache).

assign.target_offset,
assign.source_offset,
assign.length,
) for assign in assignment.batch]

future = self.endpoint[assignment.protocol].read(batch)
if LMDEPLOY_USE_ASYNC_MIGRATION:
loop = asyncio.get_running_loop()
return await loop.run_in_executor(None, future.wait)
else:
await read_batch_coroutine(self.endpoint[assignment.protocol], batch)
return future.wait()


@MIGRATION_BACKENDS.register_module(MigrationBackend.DLSlime.name)
class DLSlimeBackend(MigrationBackendImpl):
"""DLSlime Transfer Engine."""

def __init__(self):
self.links: Dict[int, DLSlimeMigrationManagement] = {}
self.links: Dict[str, DLSlimeMigrationManagement] = {}

def p2p_initialize(self, init_request: DistServeInitRequest):
self.links[init_request.remote_engine_id] = DLSlimeMigrationManagement(init_request)

def register_memory_region(self, register_mr_request: DistServeRegisterMRMessage):
self.links[register_mr_request.remote_engine_id].register_memory_region(register_mr_request)

def endpoint_info(self, remote_engine_id: int, protocol: MigrationProtocol):
return self.links[remote_engine_id].endpoint[protocol].endpoint_info
def endpoint_info(self, remote_engine_id: str, protocol: MigrationProtocol):
Copy link

Copilot AI Jan 5, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The parameter type for remote_engine_id has been changed from int to str in the implementation, but the abstract base class in base.py (line 20) still defines it as int. This creates a type mismatch. The abstract method signature should be updated to match the implementation, or all implementations should use consistent types. Note that other parts of the codebase (e.g., DistServeInitRequest) use str for engine IDs.

Copilot uses AI. Check for mistakes.
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

`str` is the correct type here; engine IDs are strings throughout (e.g. in DistServeInitRequest).

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The type hint of `remote_engine_id` in base.py has been updated to `str` to match this implementation.

return self.links[remote_engine_id].endpoint[protocol].endpoint_info()

def p2p_connect(self, remote_engine_id: str, conn_req: DistServeKVTransferEndpointInfo):
self.links[remote_engine_id].connect(conn_req)

async def p2p_migrate(self, assignment: MigrationAssignment, async_op: bool = False):
await self.links[assignment.remote_engine_id].p2p_migrate(assignment, async_op=async_op)
await self.links[assignment.remote_engine_id].p2p_migrate(assignment)

def store(self, assignment: MigrationAssignment, async_op: bool = False):
raise NotImplementedError
Expand Down
2 changes: 1 addition & 1 deletion lmdeploy/pytorch/disagg/conn/engine_conn.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ def __init__(self, engine: 'Engine'):

self.use_unique_kvtransfer_engine = os.environ.get('LMDEPLOY_USE_UNIQUE_KVTRANSFER_ENGINE', False)

async def p2p_initialize(self, init_request: DistServeInitRequest):
def p2p_initialize(self, init_request: DistServeInitRequest):
ctx = zmq.asyncio.Context(2)
sender = ctx.socket(zmq.PUSH)
sender_port = find_available_port()
Expand Down
4 changes: 2 additions & 2 deletions lmdeploy/pytorch/disagg/messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ class MigrationExecutionBatch(BaseModel):

class AssignmentInstruct(BaseModel):
"""Assignment Batch."""
mr_key: str
mr_key: int
target_offset: int
source_offset: int
length: int
Expand All @@ -42,7 +42,7 @@ class DistServeRegisterMRMessage(BaseModel):
protocol: MigrationProtocol

remote_engine_id: str
mr_key: str
mr_key: int
addr: int
offset: int
length: int
5 changes: 2 additions & 3 deletions lmdeploy/pytorch/engine/cache_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -406,7 +406,7 @@ def p2p_initialize(self, migration_init_request: DistServeInitRequest) -> DistSe
continue
register_mr_request = DistServeRegisterMRMessage(protocol=migration_init_request.protocol,
remote_engine_id=migration_init_request.remote_engine_id,
mr_key=str(i),
mr_key=i,
addr=t.data_ptr(),
offset=t.storage_offset(),
length=t.numel() * t.itemsize)
Expand Down Expand Up @@ -445,8 +445,7 @@ def get_assignment_batch(mr_key, block_ids, assignment_len, layer_stride, remote
if t.numel() == 0:
continue
assignment_batch.extend(
get_assignment_batch(str(i), blocks_to_migration, assignment_len, layer_stride,
remote_layer_stride))
get_assignment_batch(i, blocks_to_migration, assignment_len, layer_stride, remote_layer_stride))
await self.migration_backend_impl.p2p_migrate(
MigrationAssignment(
protocol=migration_execution_inputs.protocol,
Expand Down
6 changes: 3 additions & 3 deletions lmdeploy/pytorch/engine/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -416,13 +416,13 @@ def model_config(self) -> ModelConfig:
"""Model config."""
return self.executor.model_config

async def p2p_initialize(self, init_request: DistServeInitRequest):
return await self.engine_conn.p2p_initialize(init_request)
def p2p_initialize(self, init_request: DistServeInitRequest):
return self.engine_conn.p2p_initialize(init_request)

def p2p_connect(self, conn_request: DistServeConnectionRequest):
return self.engine_conn.p2p_connect(conn_request)

async def p2p_drop_connect(self, drop_conn_request: DistServeDropConnectionRequest):
def p2p_drop_connect(self, drop_conn_request: DistServeDropConnectionRequest):
return self.engine_conn.p2p_drop_connect(drop_conn_request)

def _loop_finally(self):
Expand Down
4 changes: 2 additions & 2 deletions lmdeploy/pytorch/engine/mp_engine/base_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,9 @@ def get_schedule_metrics(self):
"""Get schedule metrics."""
return self.engine.get_schedule_metrics()

async def p2p_initialize(self, conn_request: DistServeInitRequest):
def p2p_initialize(self, conn_request: DistServeInitRequest):
"""Init rdma link."""
return await self.engine.p2p_initialize(conn_request)
return self.engine.p2p_initialize(conn_request)

def p2p_connect(self, conn_request: DistServeConnectionRequest):
"""rdma_connect."""
Expand Down
Loading