Skip to content
85 changes: 83 additions & 2 deletions lib/images/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"time"

"github.com/google/go-containerregistry/pkg/v1/layout"
"github.com/kernel/hypeman/lib/logger"
"github.com/kernel/hypeman/lib/paths"
"go.opentelemetry.io/otel/metric"
)
Expand Down Expand Up @@ -230,8 +231,35 @@ func (m *manager) buildImage(ctx context.Context, ref *ResolvedRef) {

m.updateStatusByDigest(ref, StatusPulling, nil)

// Pull the image (digest is always known, uses cache if already pulled)
result, err := m.ociClient.pullAndExport(ctx, ref.String(), ref.Digest(), tempDir)
// Choose pull strategy based on registry type and cache state
var result *pullResult
var err error

layoutTag := digestToLayoutTag(ref.Digest())
alreadyCached := m.ociClient.existsInLayout(layoutTag)

log := logger.FromContext(ctx)
if isLocalRegistry(ref.Repository()) {
// For local registries, use streaming to bypass slow umoci.
// Local images are one-time conversions (no layer dedup benefit).
if alreadyCached {
// Stream directly from OCI cache (no network auth needed)
log.InfoContext(ctx, "using streaming unpack from layout for local registry image", "ref", ref.String())
result, err = m.ociClient.streamingUnpackFromLayout(ctx, layoutTag, tempDir)
} else {
// Rare case: local registry image not in cache yet
// This would need network auth - fall back to error for now
// (In practice, registry always pre-caches on push)
log.InfoContext(ctx, "using streaming unpack for uncached local image", "ref", ref.String())
result, err = m.ociClient.streamingUnpack(ctx, ref.String(), tempDir)
}
} else {
// For remote registries, use the cached path (pullAndExport).
// This enables layer deduplication across multiple pulls of related images.
log.InfoContext(ctx, "using cached unpack for remote image", "ref", ref.String(), "cached", alreadyCached)
result, err = m.ociClient.pullAndExport(ctx, ref.String(), ref.Digest(), tempDir)
}

if err != nil {
m.updateStatusByDigest(ref, StatusFailed, fmt.Errorf("pull and export: %w", err))
m.recordPullMetrics(ctx, "failed")
Expand Down Expand Up @@ -477,3 +505,56 @@ func (m *manager) TotalOCICacheBytes(ctx context.Context) (int64, error) {
}
return total, nil
}

// isLocalRegistry checks if a repository reference points to a local registry.
// Local registries include localhost, loopback, and private IP addresses (RFC 1918).
// For local registries, we skip the OCI cache since images are only pulled once.
func isLocalRegistry(repository string) bool {
// Extract the registry host from the repository
// Repository format: registry/path/to/image or path/to/image (implies docker.io)
parts := strings.SplitN(repository, "/", 2)
if len(parts) < 2 {
return false // Simple name like "alpine", implies docker.io
}

host := parts[0]

// Strip port if present (e.g., "172.30.0.1:8080" -> "172.30.0.1")
if colonIdx := strings.LastIndex(host, ":"); colonIdx != -1 {
host = host[:colonIdx]
}

// Check for localhost patterns
if strings.HasPrefix(host, "localhost") {
return true
}

// Check for loopback
if strings.HasPrefix(host, "127.") {
return true
}

// Check for RFC 1918 private IP addresses (used by gateway IPs)
// 10.0.0.0/8, 172.16.0.0/12, 192.168.0.0/16
if strings.HasPrefix(host, "10.") {
return true
}
if strings.HasPrefix(host, "192.168.") {
return true
}
// 172.16.0.0 - 172.31.255.255
if strings.HasPrefix(host, "172.") {
// Extract second octet
octets := strings.Split(host, ".")
if len(octets) >= 2 {
var second int
if _, err := fmt.Sscanf(octets[1], "%d", &second); err == nil {
if second >= 16 && second <= 31 {
return true
}
}
}
}

return false
}
124 changes: 121 additions & 3 deletions lib/images/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -388,11 +388,13 @@ func TestImportLocalImageFromOCICache(t *testing.T) {
require.NoError(t, err)
t.Logf("Wrote image to OCI cache: digest=%s, layoutTag=%s", digestStr, layoutTag)

// Step 3: Call ImportLocalImage (what buildBuilderFromDockerfile does)
imported, err := mgr.ImportLocalImage(ctx, "localhost:8080/internal/builder", "latest", digestStr)
// Step 3: Call ImportLocalImage with a non-local registry reference
// We use a remote registry reference so the build uses the cached path (not streaming).
// The streaming path is only for local registries where the image is already available.
imported, err := mgr.ImportLocalImage(ctx, "registry.example.com/internal/builder", "latest", digestStr)
require.NoError(t, err)
require.NotNil(t, imported)
require.Equal(t, "localhost:8080/internal/builder:latest", imported.Name)
require.Equal(t, "registry.example.com/internal/builder:latest", imported.Name)
t.Logf("ImportLocalImage returned: name=%s, status=%s, digest=%s", imported.Name, imported.Status, imported.Digest)

// Step 4: Wait for the async build pipeline to complete
Expand Down Expand Up @@ -420,6 +422,122 @@ func TestImportLocalImageFromOCICache(t *testing.T) {
t.Logf("Disk path verified: %s (%d bytes)", diskPath, diskStat.Size())
}

// TestImportLocalImageFromOCICacheWithLocalRegistry tests that local registry images
// use streamingUnpackFromLayout (the fast path) instead of pullAndExport.
// This is the actual production path for builder VM images.
func TestImportLocalImageFromOCICacheWithLocalRegistry(t *testing.T) {
dataDir := t.TempDir()
p := paths.New(dataDir)
mgr, err := NewManager(p, 1, nil)
require.NoError(t, err)

ctx := context.Background()

// Step 1: Create synthetic Docker image
img := createTestDockerImage(t)

imgDigest, err := img.Digest()
require.NoError(t, err)
digestStr := imgDigest.String()
layoutTag := digestToLayoutTag(digestStr)

// Step 2: Write to OCI layout cache (simulates registry pre-caching on push)
cacheDir := p.SystemOCICache()
require.NoError(t, os.MkdirAll(cacheDir, 0755))

path, err := layout.Write(cacheDir, empty.Index)
require.NoError(t, err)

err = path.AppendImage(img, layout.WithAnnotations(map[string]string{
"org.opencontainers.image.ref.name": layoutTag,
}))
require.NoError(t, err)
t.Logf("Wrote image to OCI cache: digest=%s, layoutTag=%s", digestStr, layoutTag)

// Step 3: Call ImportLocalImage with a LOCAL registry reference (172.30.x.x)
// This should trigger streamingUnpackFromLayout (fast path)
imported, err := mgr.ImportLocalImage(ctx, "172.30.0.1:8080/builds/testbuild", "latest", digestStr)
require.NoError(t, err)
require.NotNil(t, imported)
require.Equal(t, "172.30.0.1:8080/builds/testbuild:latest", imported.Name)
t.Logf("ImportLocalImage returned: name=%s, status=%s, digest=%s", imported.Name, imported.Status, imported.Digest)

// Step 4: Wait for the async build pipeline to complete
waitForReady(t, mgr, ctx, imported.Name)

// Step 5: Verify GetImage returns correct metadata
ready, err := mgr.GetImage(ctx, imported.Name)
require.NoError(t, err)
require.Equal(t, StatusReady, ready.Status)
require.Equal(t, digestStr, ready.Digest)
require.Equal(t, []string{"/usr/local/bin/guest-agent"}, ready.Entrypoint)
require.Equal(t, "/app", ready.WorkingDir)
require.Contains(t, ready.Env, "PATH")
require.NotNil(t, ready.SizeBytes)
require.Greater(t, *ready.SizeBytes, int64(0))
t.Logf("Image ready via streaming from layout: entrypoint=%v, workdir=%s, size=%d", ready.Entrypoint, ready.WorkingDir, *ready.SizeBytes)

// Step 6: Verify GetDiskPath returns path to a valid disk file
diskPath, err := GetDiskPath(p, imported.Name, digestStr)
require.NoError(t, err)
diskStat, err := os.Stat(diskPath)
require.NoError(t, err, "disk file should exist at %s", diskPath)
require.False(t, diskStat.IsDir())
require.Greater(t, diskStat.Size(), int64(0), "disk file should not be empty")
t.Logf("Disk path verified: %s (%d bytes)", diskPath, diskStat.Size())
}

// TestIsLocalRegistry tests the isLocalRegistry helper function
func TestIsLocalRegistry(t *testing.T) {
tests := []struct {
repository string
isLocal bool
}{
// Local registries - should use streaming
{"localhost:8080/internal/builder", true},
{"localhost/some/image", true},
{"127.0.0.1:5000/test/image", true},
{"127.0.0.1/test", true},
{"127.0.0.100:8080/test", true}, // Any 127.x.x.x

// RFC 1918 private IPs - 10.0.0.0/8
{"10.102.0.1:8080/tenant/app", true},
{"10.0.0.1:5000/builds/abc", true},
{"10.255.255.255/image", true},

// RFC 1918 private IPs - 172.16.0.0/12 (172.16-31.x.x)
{"172.30.0.1:8080/builds/xyz", true}, // Production gateway
{"172.16.0.1:8080/image", true},
{"172.31.255.255/image", true},
{"172.15.0.1/image", false}, // Outside 172.16-31 range
{"172.32.0.1/image", false}, // Outside 172.16-31 range

// RFC 1918 private IPs - 192.168.0.0/16
{"192.168.1.1:8080/image", true},
{"192.168.0.1/builds/test", true},

// Remote registries - should use cached path
{"docker.io/library/alpine", false},
{"ghcr.io/owner/repo", false},
{"gcr.io/project/image", false},
{"quay.io/organization/image", false},
{"registry.example.com/image", false},
{"8.8.8.8:5000/image", false}, // Public IP

// Edge cases
{"alpine", false}, // Simple name implies docker.io
{"myimage:latest", false}, // Simple name with tag implies docker.io
{"localregistry/image", false}, // Not localhost, just starts with "local"
}

for _, tt := range tests {
t.Run(tt.repository, func(t *testing.T) {
result := isLocalRegistry(tt.repository)
require.Equal(t, tt.isLocal, result, "isLocalRegistry(%q) should be %v", tt.repository, tt.isLocal)
})
}
}

// waitForReady waits for an image build to complete
func waitForReady(t *testing.T, mgr Manager, ctx context.Context, imageName string) {
for i := 0; i < 600; i++ {
Expand Down
Loading