-
Notifications
You must be signed in to change notification settings - Fork 0
<fix>[compute]: <description #3262
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: zsv_4.10.28
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -45,6 +45,15 @@ | |
| import org.zstack.header.message.*; | ||
| import org.zstack.header.network.l3.*; | ||
| import org.zstack.header.storage.primary.*; | ||
| import org.zstack.header.storage.snapshot.*; | ||
| import org.zstack.header.storage.snapshot.group.VolumeSnapshotGroupRefVO; | ||
| import org.zstack.header.storage.snapshot.group.VolumeSnapshotGroupRefVO_; | ||
| import org.zstack.header.storage.snapshot.group.VolumeSnapshotGroupVO; | ||
| import org.zstack.header.storage.snapshot.group.VolumeSnapshotGroupVO_; | ||
| import org.zstack.header.tag.SystemTagVO; | ||
| import org.zstack.header.tag.SystemTagVO_; | ||
| import java.nio.charset.StandardCharsets; | ||
| import java.util.Base64; | ||
| import org.zstack.header.vm.*; | ||
| import org.zstack.header.vm.ChangeVmMetaDataMsg.AtomicHostUuid; | ||
| import org.zstack.header.vm.ChangeVmMetaDataMsg.AtomicVmState; | ||
|
|
@@ -66,30 +75,27 @@ | |
| import org.zstack.network.l3.L3NetworkManager; | ||
| import org.zstack.network.service.DnsUtils; | ||
| import org.zstack.network.service.NetworkServiceManager; | ||
| import org.zstack.resourceconfig.ResourceConfig; | ||
| import org.zstack.resourceconfig.ResourceConfigFacade; | ||
| import org.zstack.resourceconfig.*; | ||
| import org.zstack.tag.SystemTagCreator; | ||
| import org.zstack.tag.SystemTagUtils; | ||
| import org.zstack.tag.TagManager; | ||
| import org.zstack.utils.CollectionUtils; | ||
| import org.zstack.utils.ExceptionDSL; | ||
| import org.zstack.utils.ObjectUtils; | ||
| import org.zstack.utils.Utils; | ||
| import org.zstack.utils.*; | ||
| import org.zstack.utils.function.ForEachFunction; | ||
| import org.zstack.utils.function.Function; | ||
| import org.zstack.utils.gson.JSONObjectUtil; | ||
| import org.zstack.utils.logging.CLogger; | ||
| import org.zstack.utils.network.NicIpAddressInfo; | ||
| import org.zstack.utils.network.IPv6Constants; | ||
| import org.zstack.utils.network.IPv6NetworkUtils; | ||
| import org.zstack.utils.network.NetworkUtils; | ||
| import org.zstack.utils.network.NicIpAddressInfo; | ||
|
|
||
| import javax.persistence.PersistenceException; | ||
| import javax.persistence.Tuple; | ||
| import javax.persistence.TypedQuery; | ||
| import java.sql.Timestamp; | ||
| import java.time.LocalDateTime; | ||
| import java.util.*; | ||
| import java.util.concurrent.TimeUnit; | ||
| import java.util.stream.Collectors; | ||
|
|
||
| import static java.util.Arrays.asList; | ||
|
|
@@ -140,6 +146,8 @@ public class VmInstanceBase extends AbstractVmInstance { | |
| private VmInstanceResourceMetadataManager vidm; | ||
| @Autowired | ||
| private NetworkServiceManager nwServiceMgr; | ||
| @Autowired | ||
| private ResourceDestinationMaker destMaker; | ||
|
|
||
| protected VmInstanceVO self; | ||
| protected VmInstanceVO originalCopy; | ||
|
|
@@ -533,6 +541,8 @@ protected void handleLocalMessage(Message msg) { | |
| handle((CancelFlattenVmInstanceMsg) msg); | ||
| } else if (msg instanceof KvmReportVmShutdownEventMsg) { | ||
| handle((KvmReportVmShutdownEventMsg) msg); | ||
| } else if (msg instanceof UpdateVmInstanceMetadataMsg) { | ||
| handle((UpdateVmInstanceMetadataMsg) msg); | ||
| } else { | ||
| VmInstanceBaseExtensionFactory ext = vmMgr.getVmInstanceBaseExtensionFactory(msg); | ||
| if (ext != null) { | ||
|
|
@@ -9369,5 +9379,139 @@ public void run(MessageReply reply) { | |
| } | ||
| }); | ||
| } | ||
|
|
||
| /** | ||
| * 处理元数据更新消息。 | ||
| * | ||
| * <p>Layer 2 串行化保证:通过 ChainTask 确保同一 VM 的元数据更新串行执行。 | ||
| * 该消息由 hash 环路由,同一 VM 必定到达同一 MN,因此此处的 ChainTask | ||
| * 是跨 MN 串行的全局唯一汇聚点。</p> | ||
| * | ||
| * <p>失败路径不创建新 GC,直接返回错误 reply,由 GC 端的 onUpdateFail() 统一处理重试。</p> | ||
| */ | ||
| private void handle(UpdateVmInstanceMetadataMsg msg) { | ||
| thdf.chainSubmit(new ChainTask(msg) { | ||
| @Override | ||
| public String getSyncSignature() { | ||
| return String.format("handle-update-vm-%s-metadata", msg.getUuid()); | ||
| } | ||
|
|
||
| @Override | ||
| public void run(SyncTaskChain chain) { | ||
| doHandleUpdateVmInstanceMetadata(msg); | ||
| chain.next(); | ||
| } | ||
|
Comment on lines
+9400
to
+9403
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 链路串行化被提前 Line 9402 在异步 🛠️ 建议修复- private void handle(UpdateVmInstanceMetadataMsg msg) {
+ private void handle(UpdateVmInstanceMetadataMsg msg) {
thdf.chainSubmit(new ChainTask(msg) {
@@
`@Override`
public void run(SyncTaskChain chain) {
- doHandleUpdateVmInstanceMetadata(msg);
- chain.next();
+ doHandleUpdateVmInstanceMetadata(msg, new NoErrorCompletion(chain) {
+ `@Override`
+ public void done() {
+ chain.next();
+ }
+ });
}
@@
- private void doHandleUpdateVmInstanceMetadata(UpdateVmInstanceMetadataMsg msg) {
+ private void doHandleUpdateVmInstanceMetadata(UpdateVmInstanceMetadataMsg msg, NoErrorCompletion completion) {
@@
bus.send(umsg, new CloudBusCallBack(msg) {
`@Override`
public void run(MessageReply r) {
UpdateVmInstanceMetadataOnPrimaryStorageReply reply = new UpdateVmInstanceMetadataOnPrimaryStorageReply();
@@
bus.reply(msg, reply);
+ completion.done();
}
});
}🤖 Prompt for AI Agents |
||
|
|
||
| @Override | ||
| public String getName() { | ||
| return String.format("handle-update-vm-%s-metadata-task", msg.getUuid()); | ||
| } | ||
| }); | ||
| } | ||
|
|
||
| private void doHandleUpdateVmInstanceMetadata(UpdateVmInstanceMetadataMsg msg) { | ||
| Tuple tuple = Q.New(VolumeVO.class).select(VolumeVO_.primaryStorageUuid, VolumeVO_.uuid) | ||
| .eq(VolumeVO_.vmInstanceUuid, msg.getUuid()).eq(VolumeVO_.type, VolumeType.Root).findTuple(); | ||
| String primaryStorageUuid = tuple.get(0, String.class); | ||
| String rootVolumeUuid = tuple.get(1, String.class); | ||
|
Comment on lines
+9413
to
+9416
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 补充根卷查询为空的兜底处理。 Line 9389-9392 若根卷不存在, 🛠️ 建议修复- Tuple tuple = Q.New(VolumeVO.class).select(VolumeVO_.primaryStorageUuid, VolumeVO_.uuid)
- .eq(VolumeVO_.vmInstanceUuid, msg.getUuid()).eq(VolumeVO_.type, VolumeType.Root).findTuple();
- String primaryStorageUuid = tuple.get(0, String.class);
- String rootVolumeUuid = tuple.get(1, String.class);
+ Tuple tuple = Q.New(VolumeVO.class).select(VolumeVO_.primaryStorageUuid, VolumeVO_.uuid)
+ .eq(VolumeVO_.vmInstanceUuid, msg.getUuid()).eq(VolumeVO_.type, VolumeType.Root).findTuple();
+ if (tuple == null) {
+ UpdateVmInstanceMetadataOnPrimaryStorageReply reply = new UpdateVmInstanceMetadataOnPrimaryStorageReply();
+ reply.setError(operr("cannot find root volume for vm[uuid:%s]", msg.getUuid()));
+ bus.reply(msg, reply);
+ return;
+ }
+ String primaryStorageUuid = tuple.get(0, String.class);
+ String rootVolumeUuid = tuple.get(1, String.class);🤖 Prompt for AI Agents |
||
|
|
||
| UpdateVmInstanceMetadataOnPrimaryStorageMsg umsg = new UpdateVmInstanceMetadataOnPrimaryStorageMsg(); | ||
| umsg.setMetadata(buildVmInstanceMetadata(msg.getUuid())); | ||
| umsg.setPrimaryStorageUuid(primaryStorageUuid); | ||
| umsg.setRootVolumeUuid(rootVolumeUuid); | ||
| bus.makeLocalServiceId(umsg, PrimaryStorageConstant.SERVICE_ID); | ||
| bus.send(umsg, new CloudBusCallBack(msg) { | ||
| @Override | ||
| public void run(MessageReply r) { | ||
| UpdateVmInstanceMetadataOnPrimaryStorageReply reply = new UpdateVmInstanceMetadataOnPrimaryStorageReply(); | ||
|
|
||
| if (!r.isSuccess()) { | ||
| // 失败:直接返回错误 reply,不创建新 GC(避免滚雪球膨胀) | ||
| // GC 端收到失败 reply 后由 onUpdateFail() 统一走指数退避重试 | ||
| reply.setError(Platform.operr("failed to update vm[uuid=%s] metadata on primary storage", | ||
| msg.getUuid()).withCause(r.getError())); | ||
| } | ||
| bus.reply(msg, reply); | ||
coderabbitai[bot] marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| } | ||
| }); | ||
| } | ||
|
|
||
| private String buildVmInstanceMetadata(String vmInstanceUuid) { | ||
| VmInstanceMetadataDTO dto = new VmInstanceMetadataDTO(); | ||
|
|
||
| // ── VM 本体 ── | ||
| VmInstanceVO vm = Q.New(VmInstanceVO.class).eq(VmInstanceVO_.uuid, vmInstanceUuid).find(); | ||
| dto.vm = buildResourceMetadata(vm.getUuid(), vm); | ||
|
|
||
| // ── 云盘(挂载的 + 已卸载但 lastVmInstanceUuid 指向本 VM 的) ── | ||
| List<VolumeVO> volumes = new ArrayList<>(); | ||
| volumes.addAll(Q.New(VolumeVO.class).eq(VolumeVO_.vmInstanceUuid, vmInstanceUuid).list()); | ||
| volumes.addAll(Q.New(VolumeVO.class).isNull(VolumeVO_.vmInstanceUuid) | ||
| .eq(VolumeVO_.lastVmInstanceUuid, vmInstanceUuid).list()); | ||
| volumes.forEach(v -> dto.volumes.add(buildResourceMetadata(v.getUuid(), v))); | ||
|
|
||
| // ── 网卡 ── | ||
| List<VmNicVO> nics = Q.New(VmNicVO.class).eq(VmNicVO_.vmInstanceUuid, vmInstanceUuid).list(); | ||
| nics.forEach(n -> dto.nics.add(buildResourceMetadata(n.getUuid(), n))); | ||
|
|
||
| // ── 快照 ── | ||
| List<String> volumeUuids = volumes.stream().map(VolumeVO::getUuid).collect(Collectors.toList()); | ||
| if (!volumeUuids.isEmpty()) { | ||
| Q.New(VolumeSnapshotVO.class).in(VolumeSnapshotVO_.volumeUuid, volumeUuids).list() | ||
| .forEach(s -> dto.snapshots | ||
| .computeIfAbsent(s.getVolumeUuid(), k -> new ArrayList<>()) | ||
| .add(JSONObjectUtil.toJsonString(s))); | ||
| } | ||
|
|
||
| // ── 快照组 ── | ||
| List<VolumeSnapshotGroupVO> groups = Q.New(VolumeSnapshotGroupVO.class) | ||
| .eq(VolumeSnapshotGroupVO_.vmInstanceUuid, vmInstanceUuid).list(); | ||
| dto.snapshotGroups = groups.stream() | ||
| .map(JSONObjectUtil::toJsonString).collect(Collectors.toList()); | ||
|
|
||
| List<String> groupUuids = groups.stream() | ||
| .map(VolumeSnapshotGroupVO::getUuid).collect(Collectors.toList()); | ||
| if (!groupUuids.isEmpty()) { | ||
| dto.snapshotGroupRefs = Q.New(VolumeSnapshotGroupRefVO.class) | ||
| .in(VolumeSnapshotGroupRefVO_.volumeSnapshotGroupUuid, groupUuids).list() | ||
| .stream().map(JSONObjectUtil::toJsonString).collect(Collectors.toList()); | ||
| } | ||
|
|
||
| return JSONObjectUtil.toJsonString(dto); | ||
| } | ||
|
|
||
| /** | ||
| * 构建单个资源的 {@link VmInstanceMetadataDTO.ResourceMetadata}。 | ||
| * | ||
| * <p>VO 全量 JSON 明文存储;SystemTagVO 和 ResourceConfigVO 整体列表序列化为 JSON 数组后 | ||
| * 一次性 Base64 编码,以保护可能包含的密码、密钥等敏感信息。</p> | ||
| * | ||
| * @param resourceUuid 资源 UUID | ||
| * @param vo 资源 VO 对象(VmInstanceVO / VolumeVO / VmNicVO) | ||
| * @return 填充完毕的 ResourceMetadata | ||
| */ | ||
| private VmInstanceMetadataDTO.ResourceMetadata buildResourceMetadata(String resourceUuid, Object vo) { | ||
| VmInstanceMetadataDTO.ResourceMetadata meta = new VmInstanceMetadataDTO.ResourceMetadata(); | ||
| meta.resourceUuid = resourceUuid; | ||
| meta.vo = JSONObjectUtil.toJsonString(vo); | ||
|
|
||
| // SystemTagVO: 全部 → JSON 数组 → Base64 | ||
| List<SystemTagVO> tagVOs = Q.New(SystemTagVO.class) | ||
| .eq(SystemTagVO_.resourceUuid, resourceUuid).list(); | ||
| List<String> tagJsons = tagVOs.stream() | ||
| .map(JSONObjectUtil::toJsonString).collect(Collectors.toList()); | ||
| meta.systemTags = Base64.getEncoder().encodeToString( | ||
| JSONObjectUtil.toJsonString(tagJsons).getBytes(StandardCharsets.UTF_8)); | ||
|
|
||
| // ResourceConfigVO: 全部 → JSON 数组 → Base64 | ||
| List<ResourceConfigVO> cfgVOs = Q.New(ResourceConfigVO.class) | ||
| .eq(ResourceConfigVO_.resourceUuid, resourceUuid).list(); | ||
| List<String> cfgJsons = cfgVOs.stream() | ||
| .map(JSONObjectUtil::toJsonString).collect(Collectors.toList()); | ||
| meta.resourceConfigs = Base64.getEncoder().encodeToString( | ||
| JSONObjectUtil.toJsonString(cfgJsons).getBytes(StandardCharsets.UTF_8)); | ||
|
|
||
| return meta; | ||
| } | ||
| } | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🧩 Analysis chain
🏁 Script executed:
Repository: MatheMatrix/zstack
Length of output: 2170
🏁 Script executed:
Repository: MatheMatrix/zstack
Length of output: 8697
🏁 Script executed:
Repository: MatheMatrix/zstack
Length of output: 44
🏁 Script executed:
Repository: MatheMatrix/zstack
Length of output: 1264
移除未使用的
APIRegisterVmInstanceMsg导入。该导入在
intercept()方法中不存在对应的处理逻辑。APIRegisterVmInstanceMsg实际由PrimaryStorageBase处理,与本文件无关。🤖 Prompt for AI Agents