diff --git a/AGENTS.md b/AGENTS.md index e11aef26be..3c8b249ed4 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -37,6 +37,10 @@ After making changes, ensure build and lint pass before committing. - **Avoid over-abstraction for tests**: Prefer simple policy knobs/configuration to stabilize tests over introducing heavy test-only injection layers. Unit-test OS-dependent helpers separately. - **Use `testify/require` for assertions**: Prefer `require.Equal`/`require.NoError`/`require.Contains` etc over hand-written assertion blocks. +### Documentation and Rationale + +- **Explain non-obvious complexity**: If a change introduces logic that looks “overly complex”, add a short but explicit comment near the code describing the constraint that forced it, why a simpler approach does not work (or what would break), and what would need to change to make simplification safe later. + ### DRY and Structural Organization - **DRY**: Don’t copy and paste large blocks of identical logic; wherever `for` loops/table-driven approaches/unified helpers can be used, duplication must be eliminated. diff --git a/components/playground-ng/boot.go b/components/playground-ng/boot.go index 9d69d8de7f..dbebe0ce4a 100644 --- a/components/playground-ng/boot.go +++ b/components/playground-ng/boot.go @@ -539,7 +539,7 @@ func (p *Playground) bootCluster(ctx context.Context, options *BootOptions) (err } executor := newBootExecutor(p, src) - if err := executor.Download(plan); err != nil { + if err := executor.Download(ctx, plan); err != nil { return err } if len(plan.Downloads) > 0 { diff --git a/components/playground-ng/boot_executor.go b/components/playground-ng/boot_executor.go index fc43138658..901aaebd69 100644 --- a/components/playground-ng/boot_executor.go +++ b/components/playground-ng/boot_executor.go @@ -2,7 +2,12 @@ package main import ( "context" + "crypto/sha256" + "encoding/hex" "fmt" + "io" + "os" + "path/filepath" "slices" "strings" "time" @@ -11,7 +16,11 @@ import ( "github.com/minio/minio-go/v7/pkg/credentials" "github.com/pingcap/errors" "github.com/pingcap/tiup/components/playground-ng/proc" + "github.com/pingcap/tiup/pkg/localdata" + "github.com/pingcap/tiup/pkg/repository" + "github.com/pingcap/tiup/pkg/repository/v1manifest" "github.com/pingcap/tiup/pkg/utils" + "golang.org/x/sync/errgroup" ) type bootExecutor struct { @@ -31,19 +40,127 @@ var servicePreRunHandlers = map[proc.ServiceID]preRunHandler{ }, } -func (e *bootExecutor) Download(plan BootPlan) error { +// maxParallelComponentDownloads caps the number of concurrent component +// downloads during boot. +const maxParallelComponentDownloads = 4 + +// Download installs components required by the boot plan. +// +// This implementation is intentionally more complex than a simple loop calling +// src.EnsureInstalled(). +// +// Background / constraints: +// - The repository layer reports progress via repository.DownloadProgress which +// only models a *single* active download (Start/SetCurrent/Finish). Our UI +// adapter (repoDownloadProgress) also keeps mutable "current download" state +// (e.g. `task`, throttling fields). +// - To show multiple downloads concurrently in the TUI we need *one progress +// instance per download*, otherwise different downloads would race on the +// shared state and the UI would mix/flicker. +// - envComponentSource.EnsureInstalled delegates to env.V1Repository().UpdateComponents, +// which is primarily designed for sequential installation and shares a single +// mirror/progress instance internally. Simply calling it concurrently would +// not yield the desired UI and would also introduce unnecessary contention +// around manifest updates. +// +// Approach: +// - Prefetch version items (URL + SHA256) serially, so manifest fetch/update +// happens deterministically and without concurrent writes to local manifests. +// - Download + verify + untar in parallel with an errgroup limit, using a fresh +// mirror/progress instance per goroutine. +// +// Note: this logic intentionally lives in playground-ng to avoid invasive +// changes to shared pkg/repository APIs. +func (e *bootExecutor) Download(ctx context.Context, plan BootPlan) error { if e == nil || e.src == nil { return errors.New("component source not initialized") } - for _, d := range plan.Downloads { - if d.ComponentID == "" || d.ResolvedVersion == "" { - continue + if ctx == nil { + ctx = context.Background() + } + + downloads := normalizeDownloadPlans(plan.Downloads) + if len(downloads) == 0 { + return nil + } + + var ( + progressFactory func() repository.DownloadProgress + ) + if e.pg != nil { + progressFactory = e.pg.downloadProgressFactory() + } + if progressFactory == nil { + progressFactory = func() repository.DownloadProgress { return repository.DisableProgress{} } + } + + if src, ok := e.src.(*envComponentSource); ok && src != nil && src.env != nil { + repo := src.env.V1Repository() + if repo == nil { + return errors.New("repository not initialized") } - if err := e.src.EnsureInstalled(d.ComponentID, d.ResolvedVersion); err != nil { - return err + mirror := repo.Mirror() + if mirror == nil || strings.TrimSpace(mirror.Source()) == "" { + return errors.New("repository mirror not initialized") + } + profile := src.env.Profile() + if profile == nil { + return errors.New("profile not initialized") + } + + disableDecompress := false + if r, ok := repo.(*repository.V1Repository); ok { + disableDecompress = r.DisableDecompress + } + + keepSource := false + if v := os.Getenv(localdata.EnvNameKeepSourceTarget); v == "enable" || v == "true" { + keepSource = true + } + + prepared := make([]preparedDownload, 0, len(downloads)) + for _, d := range downloads { + item, err := repo.ComponentVersion(d.ComponentID, d.ResolvedVersion, false) + if err != nil { + return err + } + if item == nil || strings.TrimSpace(item.URL) == "" { + return errors.Errorf("missing download URL for component %s:%s", d.ComponentID, d.ResolvedVersion) + } + if item.Hashes == nil || strings.TrimSpace(item.Hashes[v1manifest.SHA256]) == "" { + return errors.Errorf("missing sha256 hash for component %s:%s", d.ComponentID, d.ResolvedVersion) + } + prepared = append(prepared, preparedDownload{ + component: d.ComponentID, + version: d.ResolvedVersion, + item: item, + }) + } + + g, gctx := errgroup.WithContext(ctx) + g.SetLimit(maxParallelComponentDownloads) + for _, d := range prepared { + d := d + g.Go(func() error { + return downloadAndInstallComponent(gctx, mirror.Source(), profile, d, downloadInstallOptions{ + disableDecompress: disableDecompress, + keepSource: keepSource, + progress: progressFactory(), + }) + }) } + return g.Wait() } - return nil + + g, _ := errgroup.WithContext(ctx) + g.SetLimit(maxParallelComponentDownloads) + for _, d := range downloads { + d := d + g.Go(func() error { + return e.src.EnsureInstalled(d.ComponentID, d.ResolvedVersion) + }) + } + return g.Wait() } func (e *bootExecutor) PreRun(ctx context.Context, plan BootPlan) error { @@ -199,3 +316,142 @@ func preflightBootPlan(ctx context.Context, plan BootPlan) error { } return nil } + +type preparedDownload struct { + component string + version string + item *v1manifest.VersionItem +} + +type downloadInstallOptions struct { + disableDecompress bool + keepSource bool + progress repository.DownloadProgress +} + +// downloadAndInstallComponent is a minimal, playground-ng-local +// reimplementation of the "download + verify + (optional) untar" part of +// repository.UpdateComponents. +// +// It exists because parallel boot downloads need: +// - per-download progress reporters (DownloadProgress is single-task), and +// - per-download mirror instances (each mirror has its own tempdir/progress), +// without expanding shared pkg/repository APIs just for playground-ng. +// +// It relies on ComponentVersion() having been called earlier so the local +// component manifest exists (for later BinaryPath resolution via Entry). +func downloadAndInstallComponent(ctx context.Context, mirrorSource string, profile *localdata.Profile, d preparedDownload, opt downloadInstallOptions) error { + if ctx == nil { + ctx = context.Background() + } + if strings.TrimSpace(mirrorSource) == "" { + return errors.New("mirror source is empty") + } + if profile == nil { + return errors.New("profile is nil") + } + if strings.TrimSpace(d.component) == "" || strings.TrimSpace(d.version) == "" || d.item == nil { + return errors.New("download plan is invalid") + } + + installDir := profile.Path(localdata.ComponentParentDir, d.component, d.version) + target := filepath.Join(installDir, d.item.URL) + + mirror := repository.NewMirror(mirrorSource, repository.MirrorOptions{ + Context: ctx, + Progress: opt.progress, + }) + if err := mirror.Open(); err != nil { + return err + } + defer func() { _ = mirror.Close() }() + + if err := mirror.Download(d.item.URL, installDir); err != nil { + _ = os.RemoveAll(installDir) + return err + } + + expected := strings.TrimSpace(d.item.Hashes[v1manifest.SHA256]) + if expected == "" { + _ = os.RemoveAll(installDir) + return errors.Errorf("missing sha256 hash for %s", target) + } + if err := verifySHA256(target, expected); err != nil { + _ = os.RemoveAll(installDir) + return errors.Errorf("validation failed for %s: %s", target, err) + } + + if !opt.disableDecompress { + reader, err := os.Open(target) + if err != nil { + _ = os.RemoveAll(installDir) + return err + } + err = utils.Untar(reader, installDir) + _ = reader.Close() + if err != nil { + _ = os.RemoveAll(installDir) + return err + } + } + + if !opt.disableDecompress && !opt.keepSource { + _ = os.Remove(target) + } + + return nil +} + +func verifySHA256(path string, expected string) error { + expected = strings.ToLower(strings.TrimSpace(expected)) + if expected == "" { + return errors.New("expected sha256 is empty") + } + + f, err := os.Open(path) + if err != nil { + return err + } + defer func() { _ = f.Close() }() + + hasher := sha256.New() + if _, err := io.Copy(hasher, f); err != nil { + return err + } + actual := hex.EncodeToString(hasher.Sum(nil)) + if actual != expected { + return fmt.Errorf("sha256 mismatch (expect %s, got %s)", expected, actual) + } + return nil +} + +// normalizeDownloadPlans trims/filters and de-duplicates download plans. +// +// Boot planning can produce multiple service instances that depend on the same +// component@version. Downloading the same tarball more than once would waste +// bandwidth and also makes the progress UI misleading. +func normalizeDownloadPlans(plans []DownloadPlan) []DownloadPlan { + if len(plans) == 0 { + return nil + } + + seen := make(map[string]struct{}, len(plans)) + normalized := make([]DownloadPlan, 0, len(plans)) + for _, d := range plans { + component := strings.TrimSpace(d.ComponentID) + resolved := strings.TrimSpace(d.ResolvedVersion) + if component == "" || resolved == "" { + continue + } + key := component + "@" + resolved + if _, ok := seen[key]; ok { + continue + } + seen[key] = struct{}{} + normalized = append(normalized, DownloadPlan{ + ComponentID: component, + ResolvedVersion: resolved, + }) + } + return normalized +} diff --git a/components/playground-ng/boot_executor_test.go b/components/playground-ng/boot_executor_test.go index 319e8b97b7..4316876692 100644 --- a/components/playground-ng/boot_executor_test.go +++ b/components/playground-ng/boot_executor_test.go @@ -6,6 +6,7 @@ import ( "os" "path/filepath" "strconv" + "sync" "testing" "time" @@ -15,6 +16,8 @@ import ( ) type recordingExecutorSource struct { + mu sync.Mutex + ensureInstalledCalls []string binaryPathCalls []string @@ -30,13 +33,17 @@ func (s *recordingExecutorSource) PlanInstall(proc.ServiceID, string, string, bo } func (s *recordingExecutorSource) EnsureInstalled(component, resolved string) error { + s.mu.Lock() s.ensureInstalledCalls = append(s.ensureInstalledCalls, component+"@"+resolved) + s.mu.Unlock() return nil } func (s *recordingExecutorSource) BinaryPath(component, resolved string) (string, error) { _ = resolved + s.mu.Lock() s.binaryPathCalls = append(s.binaryPathCalls, component) + s.mu.Unlock() if s.binaryPathByComponent != nil { if path := s.binaryPathByComponent[component]; path != "" { return path, nil @@ -45,6 +52,53 @@ func (s *recordingExecutorSource) BinaryPath(component, resolved string) (string return filepath.Join("/bin", component), nil } +type blockingDownloadSource struct { + ctx context.Context + + entered chan struct{} + release chan struct{} + + mu sync.Mutex + current int + max int +} + +func (s *blockingDownloadSource) ResolveVersion(_ string, constraint string) (string, error) { + return constraint, nil +} + +func (s *blockingDownloadSource) PlanInstall(proc.ServiceID, string, string, bool) (*DownloadPlan, error) { + return nil, nil +} + +func (s *blockingDownloadSource) EnsureInstalled(string, string) error { + s.mu.Lock() + s.current++ + if s.current > s.max { + s.max = s.current + } + s.mu.Unlock() + + s.entered <- struct{}{} + + defer func() { + s.mu.Lock() + s.current-- + s.mu.Unlock() + }() + + select { + case <-s.release: + return nil + case <-s.ctx.Done(): + return s.ctx.Err() + } +} + +func (s *blockingDownloadSource) BinaryPath(component, _ string) (string, error) { + return filepath.Join("/bin", component), nil +} + func TestBootExecutor_PreRun_GeneratesTiProxyCerts(t *testing.T) { dir := t.TempDir() @@ -96,8 +150,8 @@ func TestBootExecutor_Download_EnsuresInstalled(t *testing.T) { }, } - require.NoError(t, executor.Download(plan)) - require.Equal(t, []string{"tidb@v1.0.0", "tikv@v1.0.0"}, src.ensureInstalledCalls) + require.NoError(t, executor.Download(context.Background(), plan)) + require.ElementsMatch(t, []string{"tidb@v1.0.0", "tikv@v1.0.0"}, src.ensureInstalledCalls) } func TestBootExecutor_AddProcs_CachesBinaryPathByComponentVersion(t *testing.T) { @@ -238,6 +292,54 @@ func TestBootExecutor_AddProcs_ResolvesRequiredBinaryPath(t *testing.T) { require.Equal(t, tikvWorker, workerProcs[0].Info().BinPath) } +func TestBootExecutor_Download_RespectsConcurrencyLimit(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + t.Cleanup(cancel) + + total := maxParallelComponentDownloads * 3 + entered := make(chan struct{}, total) + release := make(chan struct{}) + + src := &blockingDownloadSource{ + ctx: ctx, + entered: entered, + release: release, + } + executor := newBootExecutor(nil, src) + + plan := BootPlan{Downloads: make([]DownloadPlan, 0, total)} + for i := 0; i < total; i++ { + plan.Downloads = append(plan.Downloads, DownloadPlan{ + ComponentID: "comp-" + strconv.Itoa(i), + ResolvedVersion: "v1.0.0", + }) + } + + errCh := make(chan error, 1) + go func() { + errCh <- executor.Download(ctx, plan) + }() + + for i := 0; i < maxParallelComponentDownloads; i++ { + select { + case <-entered: + case <-ctx.Done(): + require.FailNow(t, "downloads did not reach concurrency limit") + } + } + + src.mu.Lock() + maxSeen := src.max + current := src.current + src.mu.Unlock() + + require.Equal(t, maxParallelComponentDownloads, current) + require.LessOrEqual(t, maxSeen, maxParallelComponentDownloads) + + close(release) + require.NoError(t, <-errCh) +} + func TestBootExecutor_ExecuteBootPlan_DownloadPreRunAddProcsStart(t *testing.T) { oldStdout := tuiv2output.Stdout.Get() tuiv2output.Stdout.Set(io.Discard) @@ -309,8 +411,8 @@ func TestBootExecutor_ExecuteBootPlan_DownloadPreRunAddProcsStart(t *testing.T) }, } - require.NoError(t, executor.Download(plan)) - require.Equal(t, []string{"prometheus@v1.0.0", "tiproxy@v1.0.0"}, src.ensureInstalledCalls) + require.NoError(t, executor.Download(context.Background(), plan)) + require.ElementsMatch(t, []string{"prometheus@v1.0.0", "tiproxy@v1.0.0"}, src.ensureInstalledCalls) require.NoError(t, executor.PreRun(context.Background(), plan)) _, err := os.Stat(filepath.Join(dir, "tiproxy.crt")) diff --git a/components/playground-ng/main.go b/components/playground-ng/main.go index f8002c7f03..d7f1908832 100644 --- a/components/playground-ng/main.go +++ b/components/playground-ng/main.go @@ -689,6 +689,40 @@ type repoDownloadProgress struct { latestSize int64 } +// Clone returns an independent progress adapter instance. +// +// repository.DownloadProgress only supports one active download, and +// repoDownloadProgress has mutable per-download state (`task`, `byURL`, +// throttling fields). When boot downloads are parallelized, sharing a single +// instance would cause different downloads to race on that state and corrupt +// the UI (mixed progress, flicker, wrong task attribution). +// +// The clone shares the `expected` task map with the original intentionally: +// it is populated once via SetExpectedDownloads before downloads start and is +// read-only afterwards. Reusing the same *progressv2.Task pointers is safe +// because Task is an emit-only handle and the UI loop owns the render state. +func (p *repoDownloadProgress) Clone() *repoDownloadProgress { + if p == nil { + return nil + } + + p.mu.Lock() + expected := p.expected + p.mu.Unlock() + + now := p.now + if now == nil { + now = time.Now + } + + return &repoDownloadProgress{ + ctx: p.ctx, + group: p.group, + expected: expected, + now: now, + } +} + func (p *repoDownloadProgress) SetExpectedDownloads(downloads []DownloadPlan) { if p == nil || p.group == nil { return diff --git a/components/playground-ng/main_test.go b/components/playground-ng/main_test.go index 4ca88653cb..345d45145d 100644 --- a/components/playground-ng/main_test.go +++ b/components/playground-ng/main_test.go @@ -360,6 +360,38 @@ func TestRepoDownloadProgress_Start_UnexpectedDownloadCreatesNewTask(t *testing. require.NotSame(t, expected, got) } +func TestRepoDownloadProgress_Clone_HasIndependentState(t *testing.T) { + g := &progressv2.Group{} + progress := newRepoDownloadProgress(context.Background(), g) + + p, ok := progress.(*repoDownloadProgress) + require.True(t, ok) + + p.SetExpectedDownloads([]DownloadPlan{ + {ComponentID: "tidb", ResolvedVersion: "v7.1.0"}, + }) + + clone := p.Clone() + require.NotNil(t, clone) + + p.mu.Lock() + require.Nil(t, p.task) + expected := p.expected["tidb@v7.1.0"] + p.mu.Unlock() + require.NotNil(t, expected) + + clone.Start("https://example.com/tidb-v7.1.0-linux-amd64.tar.gz", 123) + + clone.mu.Lock() + got := clone.task + clone.mu.Unlock() + require.Same(t, expected, got) + + p.mu.Lock() + require.Nil(t, p.task) + p.mu.Unlock() +} + func TestRepoDownloadProgress_Finish_WhenCanceled_MarksCanceled(t *testing.T) { f, err := os.CreateTemp("", "tiup-playground-download-progress-*.log") require.NoError(t, err) diff --git a/components/playground-ng/playground.go b/components/playground-ng/playground.go index db8dc8ccc5..dcfdfb3be7 100644 --- a/components/playground-ng/playground.go +++ b/components/playground-ng/playground.go @@ -21,6 +21,7 @@ import ( "github.com/pingcap/tiup/components/playground-ng/proc" pgservice "github.com/pingcap/tiup/components/playground-ng/service" + "github.com/pingcap/tiup/pkg/repository" tuiv2output "github.com/pingcap/tiup/pkg/tuiv2/output" progressv2 "github.com/pingcap/tiup/pkg/tuiv2/progress" ) @@ -98,6 +99,29 @@ func (p *Playground) terminalWriter() io.Writer { return tuiv2output.Stdout.Get() } +// downloadProgressFactory returns a factory that creates a fresh progress +// reporter for each concurrent download. +// +// We intentionally return a factory instead of exposing `p.downloadProgress` +// directly because repoDownloadProgress keeps mutable "current download" state. +// Parallel downloads must use independent instances (see repoDownloadProgress.Clone). +func (p *Playground) downloadProgressFactory() func() repository.DownloadProgress { + if p == nil { + return nil + } + + p.progressMu.Lock() + base := p.downloadProgress + p.progressMu.Unlock() + if base == nil { + return nil + } + + return func() repository.DownloadProgress { + return base.Clone() + } +} + var errProcessGroupClosed = fmt.Errorf("process group closed") // ProcessGroup manages a dynamic set of long-running goroutines (process waiters,