Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions AGENTS.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ After making changes, ensure build and lint pass before committing.
- **Avoid over-abstraction for tests**: Prefer simple policy knobs/configuration to stabilize tests over introducing heavy test-only injection layers. Unit-test OS-dependent helpers separately.
- **Use `testify/require` for assertions**: Prefer `require.Equal`/`require.NoError`/`require.Contains` etc over hand-written assertion blocks.

### Documentation and Rationale

- **Explain non-obvious complexity**: If a change introduces logic that looks “overly complex”, add a short but explicit comment near the code describing the constraint that forced it, why a simpler approach does not work (or what would break), and what would need to change to make simplification safe later.

### DRY and Structural Organization

- **DRY**: Don’t copy and paste large blocks of identical logic; wherever `for` loops/table-driven approaches/unified helpers can be used, duplication must be eliminated.
Expand Down
2 changes: 1 addition & 1 deletion components/playground-ng/boot.go
Original file line number Diff line number Diff line change
Expand Up @@ -539,7 +539,7 @@ func (p *Playground) bootCluster(ctx context.Context, options *BootOptions) (err
}

executor := newBootExecutor(p, src)
if err := executor.Download(plan); err != nil {
if err := executor.Download(ctx, plan); err != nil {
return err
}
if len(plan.Downloads) > 0 {
Expand Down
270 changes: 263 additions & 7 deletions components/playground-ng/boot_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,12 @@ package main

import (
"context"
"crypto/sha256"
"encoding/hex"
"fmt"
"io"
"os"
"path/filepath"
"slices"
"strings"
"time"
Expand All @@ -11,7 +16,11 @@ import (
"github.com/minio/minio-go/v7/pkg/credentials"
"github.com/pingcap/errors"
"github.com/pingcap/tiup/components/playground-ng/proc"
"github.com/pingcap/tiup/pkg/localdata"
"github.com/pingcap/tiup/pkg/repository"
"github.com/pingcap/tiup/pkg/repository/v1manifest"
"github.com/pingcap/tiup/pkg/utils"
"golang.org/x/sync/errgroup"
)

type bootExecutor struct {
Expand All @@ -31,19 +40,127 @@ var servicePreRunHandlers = map[proc.ServiceID]preRunHandler{
},
}

func (e *bootExecutor) Download(plan BootPlan) error {
// maxParallelComponentDownloads caps the number of concurrent component
// downloads during boot.
const maxParallelComponentDownloads = 4

// Download installs components required by the boot plan.
//
// This implementation is intentionally more complex than a simple loop calling
// src.EnsureInstalled().
//
// Background / constraints:
// - The repository layer reports progress via repository.DownloadProgress which
// only models a *single* active download (Start/SetCurrent/Finish). Our UI
// adapter (repoDownloadProgress) also keeps mutable "current download" state
// (e.g. `task`, throttling fields).
// - To show multiple downloads concurrently in the TUI we need *one progress
// instance per download*, otherwise different downloads would race on the
// shared state and the UI would mix/flicker.
// - envComponentSource.EnsureInstalled delegates to env.V1Repository().UpdateComponents,
// which is primarily designed for sequential installation and shares a single
// mirror/progress instance internally. Simply calling it concurrently would
// not yield the desired UI and would also introduce unnecessary contention
// around manifest updates.
//
// Approach:
// - Prefetch version items (URL + SHA256) serially, so manifest fetch/update
// happens deterministically and without concurrent writes to local manifests.
// - Download + verify + untar in parallel with an errgroup limit, using a fresh
// mirror/progress instance per goroutine.
//
// Note: this logic intentionally lives in playground-ng to avoid invasive
// changes to shared pkg/repository APIs.
func (e *bootExecutor) Download(ctx context.Context, plan BootPlan) error {
if e == nil || e.src == nil {
return errors.New("component source not initialized")
}
for _, d := range plan.Downloads {
if d.ComponentID == "" || d.ResolvedVersion == "" {
continue
if ctx == nil {
ctx = context.Background()
}

downloads := normalizeDownloadPlans(plan.Downloads)
if len(downloads) == 0 {
return nil
}

var (
progressFactory func() repository.DownloadProgress
)
if e.pg != nil {
progressFactory = e.pg.downloadProgressFactory()
}
if progressFactory == nil {
progressFactory = func() repository.DownloadProgress { return repository.DisableProgress{} }
}

if src, ok := e.src.(*envComponentSource); ok && src != nil && src.env != nil {
repo := src.env.V1Repository()
if repo == nil {
return errors.New("repository not initialized")
}
if err := e.src.EnsureInstalled(d.ComponentID, d.ResolvedVersion); err != nil {
return err
mirror := repo.Mirror()
if mirror == nil || strings.TrimSpace(mirror.Source()) == "" {
return errors.New("repository mirror not initialized")
}
profile := src.env.Profile()
if profile == nil {
return errors.New("profile not initialized")
}

disableDecompress := false
if r, ok := repo.(*repository.V1Repository); ok {
disableDecompress = r.DisableDecompress
}

keepSource := false
if v := os.Getenv(localdata.EnvNameKeepSourceTarget); v == "enable" || v == "true" {
keepSource = true
}

prepared := make([]preparedDownload, 0, len(downloads))
for _, d := range downloads {
item, err := repo.ComponentVersion(d.ComponentID, d.ResolvedVersion, false)
if err != nil {
return err
}
if item == nil || strings.TrimSpace(item.URL) == "" {
return errors.Errorf("missing download URL for component %s:%s", d.ComponentID, d.ResolvedVersion)
}
if item.Hashes == nil || strings.TrimSpace(item.Hashes[v1manifest.SHA256]) == "" {
return errors.Errorf("missing sha256 hash for component %s:%s", d.ComponentID, d.ResolvedVersion)
}
prepared = append(prepared, preparedDownload{
component: d.ComponentID,
version: d.ResolvedVersion,
item: item,
})
}

g, gctx := errgroup.WithContext(ctx)
g.SetLimit(maxParallelComponentDownloads)
for _, d := range prepared {
d := d
g.Go(func() error {
return downloadAndInstallComponent(gctx, mirror.Source(), profile, d, downloadInstallOptions{
disableDecompress: disableDecompress,
keepSource: keepSource,
progress: progressFactory(),
})
})
}
return g.Wait()
}
return nil

g, _ := errgroup.WithContext(ctx)
g.SetLimit(maxParallelComponentDownloads)
for _, d := range downloads {
d := d
g.Go(func() error {
return e.src.EnsureInstalled(d.ComponentID, d.ResolvedVersion)
})
}
return g.Wait()
}

func (e *bootExecutor) PreRun(ctx context.Context, plan BootPlan) error {
Expand Down Expand Up @@ -199,3 +316,142 @@ func preflightBootPlan(ctx context.Context, plan BootPlan) error {
}
return nil
}

type preparedDownload struct {
component string
version string
item *v1manifest.VersionItem
}

type downloadInstallOptions struct {
disableDecompress bool
keepSource bool
progress repository.DownloadProgress
}

// downloadAndInstallComponent is a minimal, playground-ng-local
// reimplementation of the "download + verify + (optional) untar" part of
// repository.UpdateComponents.
//
// It exists because parallel boot downloads need:
// - per-download progress reporters (DownloadProgress is single-task), and
// - per-download mirror instances (each mirror has its own tempdir/progress),
// without expanding shared pkg/repository APIs just for playground-ng.
//
// It relies on ComponentVersion() having been called earlier so the local
// component manifest exists (for later BinaryPath resolution via Entry).
func downloadAndInstallComponent(ctx context.Context, mirrorSource string, profile *localdata.Profile, d preparedDownload, opt downloadInstallOptions) error {
if ctx == nil {
ctx = context.Background()
}
if strings.TrimSpace(mirrorSource) == "" {
return errors.New("mirror source is empty")
}
if profile == nil {
return errors.New("profile is nil")
}
if strings.TrimSpace(d.component) == "" || strings.TrimSpace(d.version) == "" || d.item == nil {
return errors.New("download plan is invalid")
}

installDir := profile.Path(localdata.ComponentParentDir, d.component, d.version)
target := filepath.Join(installDir, d.item.URL)

mirror := repository.NewMirror(mirrorSource, repository.MirrorOptions{
Context: ctx,
Progress: opt.progress,
})
if err := mirror.Open(); err != nil {
return err
}
defer func() { _ = mirror.Close() }()

if err := mirror.Download(d.item.URL, installDir); err != nil {
_ = os.RemoveAll(installDir)
return err
}

expected := strings.TrimSpace(d.item.Hashes[v1manifest.SHA256])
if expected == "" {
_ = os.RemoveAll(installDir)
return errors.Errorf("missing sha256 hash for %s", target)
}
if err := verifySHA256(target, expected); err != nil {
_ = os.RemoveAll(installDir)
return errors.Errorf("validation failed for %s: %s", target, err)
}

if !opt.disableDecompress {
reader, err := os.Open(target)
if err != nil {
_ = os.RemoveAll(installDir)
return err
}
err = utils.Untar(reader, installDir)
_ = reader.Close()
if err != nil {
_ = os.RemoveAll(installDir)
return err
}
}

if !opt.disableDecompress && !opt.keepSource {
_ = os.Remove(target)
}

return nil
}

func verifySHA256(path string, expected string) error {
expected = strings.ToLower(strings.TrimSpace(expected))
if expected == "" {
return errors.New("expected sha256 is empty")
}

f, err := os.Open(path)
if err != nil {
return err
}
defer func() { _ = f.Close() }()

hasher := sha256.New()
if _, err := io.Copy(hasher, f); err != nil {
return err
}
actual := hex.EncodeToString(hasher.Sum(nil))
if actual != expected {
return fmt.Errorf("sha256 mismatch (expect %s, got %s)", expected, actual)
}
return nil
}

// normalizeDownloadPlans trims/filters and de-duplicates download plans.
//
// Boot planning can produce multiple service instances that depend on the same
// component@version. Downloading the same tarball more than once would waste
// bandwidth and also makes the progress UI misleading.
func normalizeDownloadPlans(plans []DownloadPlan) []DownloadPlan {
if len(plans) == 0 {
return nil
}

seen := make(map[string]struct{}, len(plans))
normalized := make([]DownloadPlan, 0, len(plans))
for _, d := range plans {
component := strings.TrimSpace(d.ComponentID)
resolved := strings.TrimSpace(d.ResolvedVersion)
if component == "" || resolved == "" {
continue
}
key := component + "@" + resolved
if _, ok := seen[key]; ok {
continue
}
seen[key] = struct{}{}
normalized = append(normalized, DownloadPlan{
ComponentID: component,
ResolvedVersion: resolved,
})
}
return normalized
}
Loading
Loading