diff --git a/lib/forkvm/README.md b/lib/forkvm/README.md index 1bff4318..bb6567c7 100644 --- a/lib/forkvm/README.md +++ b/lib/forkvm/README.md @@ -14,6 +14,16 @@ to work across implementations. For networked forks, the fork gets a fresh host/guest identity (IP, MAC, TAP) instead of reusing the source identity. +## Fork data copy behavior + +- Guest directory copy is **sparse-only** for regular files. +- Copy uses `SEEK_DATA`/`SEEK_HOLE` to preserve sparse holes and avoid + de-sparsifying large overlay images. +- If sparse seeking is unsupported on the underlying filesystem, fork fails + with an explicit sparse-capability error (no dense-copy fallback). +- Non-essential runtime artifacts are skipped during copy: + `logs/` subtree and runtime socket files. + ## Cloud Hypervisor - Snapshot-based forks are supported by rewriting snapshot configuration before diff --git a/lib/forkvm/copy.go b/lib/forkvm/copy.go index b942f8c0..2076c0ce 100644 --- a/lib/forkvm/copy.go +++ b/lib/forkvm/copy.go @@ -1,15 +1,18 @@ package forkvm import ( + "errors" "fmt" - "io" "io/fs" "os" "path/filepath" ) +var ErrSparseCopyUnsupported = errors.New("sparse copy unsupported") + // CopyGuestDirectory recursively copies a guest directory to a new destination. -// Runtime sockets are skipped because they are host-runtime artifacts. +// Regular files are copied using sparse extent copy only (SEEK_DATA/SEEK_HOLE). +// Runtime sockets and logs are skipped because they are host-runtime artifacts. func CopyGuestDirectory(srcDir, dstDir string) error { srcInfo, err := os.Stat(srcDir) if err != nil { @@ -35,6 +38,9 @@ func CopyGuestDirectory(srcDir, dstDir string) error { if relPath == "." { return nil } + if d.IsDir() && shouldSkipDirectory(relPath) { + return filepath.SkipDir + } dstPath := filepath.Join(dstDir, relPath) info, err := d.Info() @@ -51,7 +57,7 @@ func CopyGuestDirectory(srcDir, dstDir string) error { return nil case mode.IsRegular(): - if err := copyRegularFile(path, dstPath, mode.Perm()); err != nil { + if err := copyRegularFileSparse(path, dstPath, mode.Perm()); err != nil { return fmt.Errorf("copy file %s: %w", path, err) } return nil @@ -76,24 +82,6 @@ func CopyGuestDirectory(srcDir, dstDir string) error { }) } -func copyRegularFile(srcPath, dstPath string, perms fs.FileMode) error { - src, err := os.Open(srcPath) - if err != nil { - return err - } - defer src.Close() - - dst, err := os.OpenFile(dstPath, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, perms) - if err != nil { - return err - } - if _, err := io.Copy(dst, src); err != nil { - _ = dst.Close() - return err - } - if err := dst.Close(); err != nil { - return err - } - - return nil +func shouldSkipDirectory(relPath string) bool { + return relPath == "logs" } diff --git a/lib/forkvm/copy_sparse_unix.go b/lib/forkvm/copy_sparse_unix.go new file mode 100644 index 00000000..4d0f587f --- /dev/null +++ b/lib/forkvm/copy_sparse_unix.go @@ -0,0 +1,144 @@ +//go:build darwin || linux + +package forkvm + +import ( + "errors" + "fmt" + "io" + "io/fs" + "os" + + "golang.org/x/sys/unix" +) + +var ( + seekDataFn = func(fd int, offset int64) (int64, error) { + return unix.Seek(fd, offset, unix.SEEK_DATA) + } + seekHoleFn = func(fd int, offset int64) (int64, error) { + return unix.Seek(fd, offset, unix.SEEK_HOLE) + } +) + +func copyRegularFileSparse(srcPath, dstPath string, perms fs.FileMode) (retErr error) { + src, err := os.Open(srcPath) + if err != nil { + return err + } + defer src.Close() + + info, err := src.Stat() + if err != nil { + return fmt.Errorf("stat source file: %w", err) + } + + dst, err := os.OpenFile(dstPath, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, perms) + if err != nil { + return err + } + defer func() { + if cerr := dst.Close(); retErr == nil && cerr != nil { + retErr = cerr + } + }() + + size := info.Size() + if err := unix.Ftruncate(int(dst.Fd()), size); err != nil { + return fmt.Errorf("truncate destination file: %w", err) + } + if size == 0 { + return nil + } + + srcFD := int(src.Fd()) + dstFD := int(dst.Fd()) + offset := int64(0) + + for offset < size { + dataStart, err := seekDataFn(srcFD, offset) + if err != nil { + if errors.Is(err, unix.ENXIO) { + break + } + if isSparseUnsupportedError(err) { + return fmt.Errorf("%w: SEEK_DATA unsupported for %s: %v", ErrSparseCopyUnsupported, srcPath, err) + } + return fmt.Errorf("seek data at offset %d: %w", offset, err) + } + if dataStart >= size { + break + } + + dataEnd, err := seekHoleFn(srcFD, dataStart) + if err != nil { + if errors.Is(err, unix.ENXIO) { + dataEnd = size + } else if isSparseUnsupportedError(err) { + return fmt.Errorf("%w: SEEK_HOLE unsupported for %s: %v", ErrSparseCopyUnsupported, srcPath, err) + } else { + return fmt.Errorf("seek hole at offset %d: %w", dataStart, err) + } + } + + if dataEnd > size { + dataEnd = size + } + if dataEnd < dataStart { + return fmt.Errorf("invalid sparse extent (%d..%d) for %s", dataStart, dataEnd, srcPath) + } + + length := dataEnd - dataStart + if length > 0 { + if err := copyFileExtent(srcFD, dstFD, dataStart, length); err != nil { + return fmt.Errorf("copy sparse extent [%d,%d): %w", dataStart, dataEnd, err) + } + } + offset = dataEnd + } + + return nil +} + +func copyFileExtent(srcFD, dstFD int, offset, length int64) error { + const chunkSize = 1 << 20 // 1 MiB + buf := make([]byte, chunkSize) + + pos := offset + remaining := length + for remaining > 0 { + toRead := int64(len(buf)) + if remaining < toRead { + toRead = remaining + } + + n, err := unix.Pread(srcFD, buf[:int(toRead)], pos) + if err != nil { + return err + } + if n == 0 { + return io.ErrUnexpectedEOF + } + + written := 0 + for written < n { + wn, werr := unix.Pwrite(dstFD, buf[written:n], pos+int64(written)) + if werr != nil { + return werr + } + if wn == 0 { + return io.ErrShortWrite + } + written += wn + } + + pos += int64(n) + remaining -= int64(n) + } + + return nil +} + +func isSparseUnsupportedError(err error) bool { + return errors.Is(err, unix.EINVAL) || errors.Is(err, unix.ENOTSUP) || errors.Is(err, unix.EOPNOTSUPP) +} diff --git a/lib/forkvm/copy_sparse_unix_test.go b/lib/forkvm/copy_sparse_unix_test.go new file mode 100644 index 00000000..7cf351df --- /dev/null +++ b/lib/forkvm/copy_sparse_unix_test.go @@ -0,0 +1,118 @@ +//go:build darwin || linux + +package forkvm + +import ( + "errors" + "fmt" + "net" + "os" + "path/filepath" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "golang.org/x/sys/unix" +) + +func TestCopyGuestDirectory_PreservesSparseFiles(t *testing.T) { + src := filepath.Join(t.TempDir(), "src") + dst := filepath.Join(t.TempDir(), "dst") + require.NoError(t, os.MkdirAll(src, 0755)) + + srcOverlay := filepath.Join(src, "overlay.raw") + const logicalSize = 256 * 1024 * 1024 // 256 MiB logical, tiny physical + require.NoError(t, writeSparseFile(srcOverlay, logicalSize)) + + require.NoError(t, CopyGuestDirectory(src, dst)) + + dstOverlay := filepath.Join(dst, "overlay.raw") + srcInfo, err := os.Stat(srcOverlay) + require.NoError(t, err) + dstInfo, err := os.Stat(dstOverlay) + require.NoError(t, err) + assert.Equal(t, srcInfo.Size(), dstInfo.Size(), "logical size must match") + + srcAllocated, err := allocatedBytes(srcOverlay) + require.NoError(t, err) + dstAllocated, err := allocatedBytes(dstOverlay) + require.NoError(t, err) + + // Guard against dense copy inflation. + assert.Less(t, dstAllocated, int64(logicalSize/10), "destination should remain sparse") + // Allow modest filesystem allocation variance while preserving sparsity. + assert.LessOrEqual(t, dstAllocated, srcAllocated+8*1024*1024) +} + +func TestCopyGuestDirectory_FailsWhenSparseSeekingUnsupported(t *testing.T) { + src := filepath.Join(t.TempDir(), "src") + dst := filepath.Join(t.TempDir(), "dst") + require.NoError(t, os.MkdirAll(src, 0755)) + require.NoError(t, os.WriteFile(filepath.Join(src, "metadata.json"), []byte(`{"id":"abc"}`), 0644)) + + origSeekData := seekDataFn + origSeekHole := seekHoleFn + seekDataFn = func(fd int, offset int64) (int64, error) { + return 0, unix.EINVAL + } + seekHoleFn = func(fd int, offset int64) (int64, error) { + return 0, unix.EINVAL + } + defer func() { + seekDataFn = origSeekData + seekHoleFn = origSeekHole + }() + + err := CopyGuestDirectory(src, dst) + require.Error(t, err) + assert.True(t, errors.Is(err, ErrSparseCopyUnsupported), "error should indicate sparse support is required") +} + +func TestCopyGuestDirectory_SkipsSocketRuntimeArtifacts(t *testing.T) { + base, err := os.MkdirTemp("/tmp", "forkvm-*") + require.NoError(t, err) + t.Cleanup(func() { _ = os.RemoveAll(base) }) + + src := filepath.Join(base, "src") + dst := filepath.Join(base, "dst") + require.NoError(t, os.MkdirAll(src, 0755)) + require.NoError(t, os.WriteFile(filepath.Join(src, "metadata.json"), []byte(`{"id":"abc"}`), 0644)) + + socketPath := filepath.Join(src, fmt.Sprintf("vz-%d.sock", time.Now().UnixNano())) + listener, err := net.Listen("unix", socketPath) + require.NoError(t, err) + defer func() { _ = listener.Close() }() + + require.NoError(t, CopyGuestDirectory(src, dst)) + + assert.NoFileExists(t, filepath.Join(dst, filepath.Base(socketPath))) + assert.FileExists(t, filepath.Join(dst, "metadata.json")) +} + +func writeSparseFile(path string, logicalSize int64) error { + f, err := os.OpenFile(path, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0644) + if err != nil { + return err + } + defer f.Close() + + if _, err := f.WriteAt([]byte("begin"), 0); err != nil { + return err + } + if _, err := f.WriteAt([]byte("middle"), logicalSize/2); err != nil { + return err + } + if _, err := f.WriteAt([]byte("end"), logicalSize-4); err != nil { + return err + } + return f.Truncate(logicalSize) +} + +func allocatedBytes(path string) (int64, error) { + var st unix.Stat_t + if err := unix.Stat(path, &st); err != nil { + return 0, err + } + return st.Blocks * 512, nil +} diff --git a/lib/forkvm/copy_sparse_unsupported.go b/lib/forkvm/copy_sparse_unsupported.go new file mode 100644 index 00000000..78b6fba6 --- /dev/null +++ b/lib/forkvm/copy_sparse_unsupported.go @@ -0,0 +1,14 @@ +//go:build !darwin && !linux + +package forkvm + +import ( + "fmt" + "io/fs" +) + +func copyRegularFileSparse(srcPath, dstPath string, perms fs.FileMode) error { + _ = dstPath + _ = perms + return fmt.Errorf("%w: unsupported platform for sparse copy: %s", ErrSparseCopyUnsupported, srcPath) +} diff --git a/lib/forkvm/copy_test.go b/lib/forkvm/copy_test.go index 99e9226f..762499e4 100644 --- a/lib/forkvm/copy_test.go +++ b/lib/forkvm/copy_test.go @@ -14,19 +14,26 @@ func TestCopyGuestDirectory(t *testing.T) { dst := filepath.Join(t.TempDir(), "dst") require.NoError(t, os.MkdirAll(filepath.Join(src, "logs"), 0755)) + require.NoError(t, os.MkdirAll(filepath.Join(src, "snapshots", "snapshot-latest"), 0755)) require.NoError(t, os.WriteFile(filepath.Join(src, "metadata.json"), []byte(`{"id":"abc"}`), 0644)) + require.NoError(t, os.WriteFile(filepath.Join(src, "config.ext4"), []byte("config"), 0644)) + require.NoError(t, os.WriteFile(filepath.Join(src, "overlay.raw"), []byte("overlay"), 0644)) require.NoError(t, os.WriteFile(filepath.Join(src, "logs", "app.log"), []byte("hello"), 0644)) + require.NoError(t, os.WriteFile(filepath.Join(src, "snapshots", "snapshot-latest", "config.json"), []byte(`{}`), 0644)) require.NoError(t, os.Symlink("metadata.json", filepath.Join(src, "meta-link"))) require.NoError(t, CopyGuestDirectory(src, dst)) assert.FileExists(t, filepath.Join(dst, "metadata.json")) - assert.FileExists(t, filepath.Join(dst, "logs", "app.log")) + assert.FileExists(t, filepath.Join(dst, "config.ext4")) + assert.FileExists(t, filepath.Join(dst, "overlay.raw")) + assert.FileExists(t, filepath.Join(dst, "snapshots", "snapshot-latest", "config.json")) + assert.NoFileExists(t, filepath.Join(dst, "logs", "app.log")) assert.FileExists(t, filepath.Join(dst, "meta-link")) - app, err := os.ReadFile(filepath.Join(dst, "logs", "app.log")) - require.NoError(t, err) - assert.Equal(t, "hello", string(app)) + _, err := os.Stat(filepath.Join(dst, "logs")) + assert.Error(t, err) + assert.True(t, os.IsNotExist(err)) linkTarget, err := os.Readlink(filepath.Join(dst, "meta-link")) require.NoError(t, err) diff --git a/lib/instances/exec_test.go b/lib/instances/exec_test.go index 94f54ef2..cf1a5c74 100644 --- a/lib/instances/exec_test.go +++ b/lib/instances/exec_test.go @@ -1,6 +1,7 @@ package instances import ( + "bytes" "context" "fmt" "os" @@ -21,9 +22,21 @@ import ( func waitForExecAgent(ctx context.Context, mgr *manager, instanceID string, timeout time.Duration) error { deadline := time.Now().Add(timeout) for time.Now().Before(deadline) { - logs, err := collectLogs(ctx, mgr, instanceID, 100) - if err == nil && strings.Contains(logs, "[guest-agent] listening on vsock port 2222") { - return nil + meta, err := mgr.loadMetadata(instanceID) + if err == nil { + dialer, derr := hypervisor.NewVsockDialer(meta.HypervisorType, meta.VsockSocket, meta.VsockCID) + if derr == nil { + var stdout, stderr bytes.Buffer + exit, eerr := guest.ExecIntoInstance(ctx, dialer, guest.ExecOptions{ + Command: []string{"true"}, + Stdout: &stdout, + Stderr: &stderr, + WaitForAgent: 1 * time.Second, + }) + if eerr == nil && exit.Code == 0 { + return nil + } + } } time.Sleep(500 * time.Millisecond) } diff --git a/lib/instances/fork.go b/lib/instances/fork.go index 526b7100..028e6b3b 100644 --- a/lib/instances/fork.go +++ b/lib/instances/fork.go @@ -110,13 +110,17 @@ func (m *manager) forkInstance(ctx context.Context, id string, req ForkInstanceR } func ensureGuestAgentReadyForRunningFork(ctx context.Context, source *StoredMetadata) error { - if source == nil || !source.NetworkEnabled || source.SkipGuestAgent { + return ensureGuestAgentReadyForForkPhase(ctx, source, "before running fork") +} + +func ensureGuestAgentReadyForForkPhase(ctx context.Context, inst *StoredMetadata, phase string) error { + if inst == nil || !inst.NetworkEnabled || inst.SkipGuestAgent { return nil } - dialer, err := hypervisor.NewVsockDialer(source.HypervisorType, source.VsockSocket, source.VsockCID) + dialer, err := hypervisor.NewVsockDialer(inst.HypervisorType, inst.VsockSocket, inst.VsockCID) if err != nil { - return fmt.Errorf("create vsock dialer for running fork readiness check: %w", err) + return fmt.Errorf("create vsock dialer for %s readiness check: %w", phase, err) } var stdout, stderr bytes.Buffer @@ -127,12 +131,12 @@ func ensureGuestAgentReadyForRunningFork(ctx context.Context, source *StoredMeta WaitForAgent: 120 * time.Second, }) if err != nil { - return fmt.Errorf("wait for guest agent readiness before running fork: %w", err) + return fmt.Errorf("wait for guest agent readiness %s: %w", phase, err) } if exit.Code != 0 { return fmt.Errorf( - "guest agent readiness probe failed before running fork (exit=%d, stdout=%q, stderr=%q)", - exit.Code, strings.TrimSpace(stdout.String()), strings.TrimSpace(stderr.String()), + "guest agent readiness probe failed %s (exit=%d, stdout=%q, stderr=%q)", + phase, exit.Code, strings.TrimSpace(stdout.String()), strings.TrimSpace(stderr.String()), ) } return nil @@ -241,6 +245,9 @@ func (m *manager) forkInstanceFromStoppedOrStandby(ctx context.Context, id strin defer cu.Clean() if err := forkvm.CopyGuestDirectory(srcDir, dstDir); err != nil { + if errors.Is(err, forkvm.ErrSparseCopyUnsupported) { + return nil, fmt.Errorf("fork requires sparse-capable filesystem (SEEK_DATA/SEEK_HOLE unsupported): %w", err) + } return nil, fmt.Errorf("clone guest directory: %w", err) } diff --git a/lib/instances/fork_test.go b/lib/instances/fork_test.go index 047eac8a..4a6e0860 100644 --- a/lib/instances/fork_test.go +++ b/lib/instances/fork_test.go @@ -371,7 +371,6 @@ func TestForkCloudHypervisorFromRunningNetwork(t *testing.T) { require.NoError(t, err) t.Cleanup(func() { _ = manager.DeleteInstance(context.Background(), source.Id) }) require.NoError(t, waitForVMReady(ctx, source.SocketPath, 5*time.Second)) - require.NoError(t, waitForLogMessage(ctx, manager, source.Id, "start worker processes", 15*time.Second)) assert.NotEmpty(t, source.IP) assert.NotEmpty(t, source.MAC) diff --git a/lib/instances/manager.go b/lib/instances/manager.go index 1d291fb1..d06ea48e 100644 --- a/lib/instances/manager.go +++ b/lib/instances/manager.go @@ -200,6 +200,14 @@ func (m *manager) ForkInstance(ctx context.Context, id string, req ForkInstanceR } return nil, fmt.Errorf("apply fork target state: %w", err) } + if inst.State == StateRunning { + if err := ensureGuestAgentReadyForForkPhase(ctx, &inst.StoredMetadata, "before returning running fork instance"); err != nil { + if cleanupErr := m.cleanupForkInstanceOnError(ctx, forked.Id); cleanupErr != nil { + return nil, fmt.Errorf("wait for fork guest agent readiness: %w; additionally failed to cleanup forked instance %s: %v", err, forked.Id, cleanupErr) + } + return nil, fmt.Errorf("wait for fork guest agent readiness: %w", err) + } + } return inst, nil } diff --git a/lib/instances/qemu_test.go b/lib/instances/qemu_test.go index 2937ba42..78d5a12f 100644 --- a/lib/instances/qemu_test.go +++ b/lib/instances/qemu_test.go @@ -917,7 +917,6 @@ func TestQEMUForkFromRunningNetwork(t *testing.T) { require.NoError(t, err) t.Cleanup(func() { _ = manager.DeleteInstance(context.Background(), source.Id) }) require.NoError(t, waitForQEMUReady(ctx, source.SocketPath, 10*time.Second)) - require.NoError(t, waitForLogMessage(ctx, manager, source.Id, "start worker processes", 15*time.Second)) assert.NotEmpty(t, source.IP) assert.NotEmpty(t, source.MAC)