diff --git a/.env.example b/.env.example new file mode 100644 index 0000000..d6e0e8f --- /dev/null +++ b/.env.example @@ -0,0 +1,4 @@ +# If you're not using docker +KIND_EXPERIMENTAL_PROVIDER=podman +# Path to your local Platform Mesh setup (used by hack/kubeconfig-copy.sh) +PORTAL_DIRECTORY=/path/to/your/local-setup \ No newline at end of file diff --git a/README.md b/README.md new file mode 100644 index 0000000..148ccd0 --- /dev/null +++ b/README.md @@ -0,0 +1,151 @@ +> [!WARNING] +> This repository is under development and not ready for productive use. It is in an alpha stage. APIs and concepts may change on short notice, including breaking changes. + +# Platform Mesh - search-service +![Build Status](https://github.com/platform-mesh/search/actions/workflows/pipeline.yml/badge.svg) + +## Description + +The platform-mesh `search-service` provides a REST API to query resources indexed in OpenSearch and post-filter results through OpenFGA authorization checks. + +The service is organization-aware and derives org context from the request host. It resolves the active SearchIndex in KCP (`root:orgs`) and uses `status.indexName` as source of truth for the OpenSearch index. + +## Features + +- REST endpoints: + - `GET /rest/v1/search` + - `GET /rest/v1/search/resources` + - `GET /rest/v1/search/filter-values` +- Free-text search in OpenSearch with stable cursor pagination (`search_after`) +- OpenFGA post-filtering (`relation=get`) with fail-closed behavior for incomplete auth context +- Org-aware context + KCP token/org access pre-check +- SearchIndex-driven resource/field metadata: + - `defaultFields` drive searchable fields + - `filterableFields` drive exact-match filters + - `semanticFields` are exposed as metadata (no semantic query mode yet) +- Health endpoints: `/healthz`, `/readyz` + +## API + +### Search endpoint + +`GET /rest/v1/search?q=&limit=&cursor=&resource=&filter.=` + +Query params: + +- `q` (required): free-text query +- `resource` (optional): plural resource name; if omitted, searches across all resources +- `filter.` (optional, repeatable): exact-match filters; requires `resource` +- `limit` (optional): default `20`, max `100` +- `cursor` (optional): opaque pagination cursor + +Response shape: + +- `results[]` with compact fields (`id`, `score`, `kind`, `name`, `namespace`, `apiGroup`, `apiVersion`, `workspacePath`, `clusterName`, `organizationId`, `organizationName`, `accountId`, `accountName`) +- `results[].resource` indicates which resource index produced the hit +- `source` containing the raw indexed document source per hit +- `nextCursor` for pagination + +### Resource metadata endpoint + +`GET /rest/v1/search/resources` + +Returns all searchable resources for the org with: + +- `resource` +- `defaultFields` +- `filterableFields` +- `semanticFields` + +### Filter values endpoint + +`GET /rest/v1/search/filter-values?resource=&field=&q=&filter.=` + +Returns distinct authorized values for one filterable field within a single resource. + +## Getting Started + +### Requirements + +- Go `1.25+` (see [go.mod](go.mod)) +- Access to: + - KCP API (for org access check + SearchIndex resolution) + - OpenSearch + - OpenFGA gRPC endpoint + +### Run locally + +Example: + +```bash +export OPENSEARCH_USERNAME= +export OPENSEARCH_PASSWORD= + +go run . serve \ + --kubeconfig ~/.kube/config \ + --is-local=true \ + --opensearch-url http://localhost:9200 \ + --openfga-grpc-addr localhost:8081 +``` + +### Local Development Mode (`--is-local=true`) + +Use `--is-local=true` for local development to match the local behavior of `kubernetes-graphql-gateway`. + +When enabled: + +- org context is still derived from host (`localhost` is mapped to `--local-development-org`) +- JWT claims are still parsed for user/tenant context +- KCP org token validation (`ValidateTokenForOrg`) is bypassed + +This is intended for local/dev usage only. Keep `--is-local=false` for production-like environments. + +### Configuration flags + +Main runtime flags (with defaults): + +- `--port` (default: `8080`) +- `--opensearch-url` (default: `http://opensearch.platform-mesh-system.svc.cluster.local:9200`) +- `--opensearch-username` (default: value of env `OPENSEARCH_USERNAME`) +- `--opensearch-password` (default: value of env `OPENSEARCH_PASSWORD`) +- `--opensearch-insecure` (default: `false`) +- `--opensearch-timeout` (default: `10s`) +- `--openfga-grpc-addr` (default: `openfga:8081`) +- `--searchindex-workspace-path` (default: `root:orgs`) +- `--searchindex-group` (default: `core.platform-mesh.io`) +- `--searchindex-version` (default: `v1alpha1`) +- `--searchindex-resource` (default: `searchindexes`) +- `--search-default-limit` (default: `20`) +- `--search-max-limit` (default: `100`) +- `--search-fetch-batch-size` (default: `100`) +- `--search-max-scanned-hits` (default: `1000`) +- `--is-local` (default: `false`) enables local development behavior in auth middleware +- `--local-development-org` (default: env `SEARCH_LOCAL_ORG`, fallback `local`) org used when host is `localhost` + +Global flags from `golang-commons` are also available (e.g. logging and kubeconfig related flags). + +### Run tests + +```bash +go test ./... +``` + +## Security Notes + +- JWT signature validation is expected to happen upstream (gateway/mesh). +- The service consumes parsed claims from context (`mail`, fallback `sub`). +- Search hits missing required authorization hierarchy fields are dropped (fail-closed). + +## Releasing + +Releases are performed via GitHub Actions workflows. + +## Contributing + +Contributions are welcome via pull requests in the Platform Mesh GitHub organization. + +## Code of Conduct + +Please refer to our [Code of Conduct](https://github.com/platform-mesh/.github/blob/main/CODE_OF_CONDUCT.md) for expected conduct when contributing. + +

Bundesministerium für Wirtschaft und Energie (BMWE)-EU funding logo

diff --git a/cmd/server.go b/cmd/server.go index 7ab7edd..bc0f8f5 100644 --- a/cmd/server.go +++ b/cmd/server.go @@ -66,6 +66,12 @@ var serverCmd = &cobra.Command{ if err != nil { log.Fatal().Err(err).Msg("failed to create OpenFGA client") } + log.Info(). + Str("openSearchURL", serviceCfg.OpenSearch.URL). + Str("openFGAGRPCAddr", serviceCfg.OpenFGA.GRPCAddr). + Str("searchIndexWorkspacePath", serviceCfg.SearchIndex.WorkspacePath). + Str("searchIndexGVR", fmt.Sprintf("%s/%s/%s", serviceCfg.SearchIndex.Group, serviceCfg.SearchIndex.Version, serviceCfg.SearchIndex.Resource)). + Msg("search service backend configuration") metrics := observability.NewMetrics() svc := searchservice.NewService( @@ -82,7 +88,7 @@ var serverCmd = &cobra.Command{ ) mws := cmw.CreateMiddleware(log, true) - orgCtxMW := lmw.NewOrgContextMiddleware(orgValidator) + orgCtxMW := lmw.NewOrgContextMiddleware(orgValidator, defaultCfg.IsLocal, serviceCfg.LocalDevelopmentOrg) mws = append(mws, orgCtxMW.SetRequestContext()) r := router.CreateRouter(svc, mws) diff --git a/go.mod b/go.mod index c62f8fd..ac01430 100644 --- a/go.mod +++ b/go.mod @@ -1,23 +1,25 @@ module github.com/platform-mesh/search -go 1.25.0 +go 1.26 require ( github.com/go-chi/chi/v5 v5.2.5 - github.com/openfga/api/proto v0.0.0-20260217232149-f917ddb000ce - github.com/platform-mesh/golang-commons v0.13.12 + github.com/openfga/api/proto v0.0.0-20260319214821-f153694bfc20 + github.com/platform-mesh/golang-commons v0.13.24-0.20260331080836-cc42019dfb2e + github.com/platform-mesh/search-operator v0.1.2-0.20260413065247-a4a8625c1a9c github.com/spf13/cobra v1.10.2 github.com/spf13/pflag v1.0.10 go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.67.0 - google.golang.org/grpc v1.79.3 - k8s.io/client-go v0.35.2 + google.golang.org/grpc v1.80.0 + k8s.io/apimachinery v0.35.3 + k8s.io/client-go v0.35.3 ) require ( github.com/cenkalti/backoff/v5 v5.0.3 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect - github.com/envoyproxy/protoc-gen-validate v1.3.0 // indirect + github.com/envoyproxy/protoc-gen-validate v1.3.3 // indirect github.com/fxamacker/cbor/v2 v2.9.0 // indirect github.com/getsentry/sentry-go v0.43.0 // indirect github.com/go-jose/go-jose/v4 v4.1.3 // indirect @@ -26,7 +28,6 @@ require ( github.com/go-logr/zerologr v1.2.3 // indirect github.com/google/uuid v1.6.0 // indirect github.com/grpc-ecosystem/grpc-gateway/v2 v2.28.0 // indirect - github.com/hashicorp/golang-lru/v2 v2.0.7 // indirect github.com/inconshreveable/mousetrap v1.1.0 // indirect github.com/json-iterator/go v1.1.12 // indirect github.com/mattn/go-colorable v0.1.14 // indirect @@ -35,7 +36,7 @@ require ( github.com/modern-go/reflect2 v1.0.3-0.20250322232337-35a7c28c31ee // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect github.com/pkg/errors v0.9.1 // indirect - github.com/rs/zerolog v1.34.0 // indirect + github.com/rs/zerolog v1.35.0 // indirect github.com/x448/float16 v0.8.4 // indirect go.opentelemetry.io/auto/sdk v1.2.1 // indirect go.opentelemetry.io/otel v1.42.0 // indirect @@ -47,20 +48,20 @@ require ( go.opentelemetry.io/otel/trace v1.42.0 // indirect go.opentelemetry.io/proto/otlp v1.10.0 // indirect go.yaml.in/yaml/v2 v2.4.3 // indirect - golang.org/x/net v0.51.0 // indirect + golang.org/x/net v0.53.0 // indirect golang.org/x/oauth2 v0.36.0 // indirect - golang.org/x/sys v0.41.0 // indirect - golang.org/x/term v0.40.0 // indirect - golang.org/x/text v0.34.0 // indirect - golang.org/x/time v0.9.0 // indirect - google.golang.org/genproto/googleapis/api v0.0.0-20260209200024-4cfbd4190f57 // indirect - google.golang.org/genproto/googleapis/rpc v0.0.0-20260226221140-a57be14db171 // indirect + golang.org/x/sys v0.43.0 // indirect + golang.org/x/term v0.42.0 // indirect + golang.org/x/text v0.36.0 // indirect + golang.org/x/time v0.11.0 // indirect + google.golang.org/genproto/googleapis/api v0.0.0-20260319201613-d00831a3d3e7 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20260414002931-afd174a4e478 // indirect google.golang.org/protobuf v1.36.11 // indirect gopkg.in/inf.v0 v0.9.1 // indirect - k8s.io/apimachinery v0.35.2 // indirect k8s.io/klog/v2 v2.130.1 // indirect k8s.io/kube-openapi v0.0.0-20250910181357-589584f1c912 // indirect - k8s.io/utils v0.0.0-20260210185600-b8788abfbbc2 // indirect + k8s.io/utils v0.0.0-20260319190234-28399d86e0b5 // indirect + sigs.k8s.io/controller-runtime v0.23.3 // indirect sigs.k8s.io/json v0.0.0-20250730193827-2d320260d730 // indirect sigs.k8s.io/randfill v1.0.0 // indirect sigs.k8s.io/structured-merge-diff/v6 v6.3.2-0.20260122202528-d9cc6641c482 // indirect diff --git a/go.sum b/go.sum index 524e0a9..6e125a1 100644 --- a/go.sum +++ b/go.sum @@ -1,8 +1,9 @@ +github.com/Masterminds/semver/v3 v3.4.0 h1:Zog+i5UMtVoCU8oKka5P7i9q9HgrJeGzI9SA1Xbatp0= +github.com/Masterminds/semver/v3 v3.4.0/go.mod h1:4V+yj/TJE1HU9XfppCwVMZq3I84lprf4nC11bSS5beM= github.com/cenkalti/backoff/v5 v5.0.3 h1:ZN+IMa753KfX5hd8vVaMixjnqRZ3y8CuJKRKj1xcsSM= github.com/cenkalti/backoff/v5 v5.0.3/go.mod h1:rkhZdG3JZukswDf7f0cwqPNk4K0sa+F97BxZthm/crw= github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= -github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= github.com/cpuguy83/go-md2man/v2 v2.0.6/go.mod h1:oOW0eioCTA6cOiMLiUPZOpcVxMig6NIQQ7OS05n1F4g= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= @@ -10,8 +11,8 @@ github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1 github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/emicklei/go-restful/v3 v3.12.2 h1:DhwDP0vY3k8ZzE0RunuJy8GhNpPL6zqLkDf9B/a0/xU= github.com/emicklei/go-restful/v3 v3.12.2/go.mod h1:6n3XBCmQQb25CM2LCACGz8ukIrRry+4bhvbpWn3mrbc= -github.com/envoyproxy/protoc-gen-validate v1.3.0 h1:TvGH1wof4H33rezVKWSpqKz5NXWg5VPuZ0uONDT6eb4= -github.com/envoyproxy/protoc-gen-validate v1.3.0/go.mod h1:HvYl7zwPa5mffgyeTUHA9zHIH36nmrm7oCbo4YKoSWA= +github.com/envoyproxy/protoc-gen-validate v1.3.3 h1:MVQghNeW+LZcmXe7SY1V36Z+WFMDjpqGAGacLe2T0ds= +github.com/envoyproxy/protoc-gen-validate v1.3.3/go.mod h1:TsndJ/ngyIdQRhMcVVGDDHINPLWB7C82oDArY51KfB0= github.com/fxamacker/cbor/v2 v2.9.0 h1:NpKPmjDBgUfBms6tr6JZkTHtfFGcMKsw3eGcmD/sapM= github.com/fxamacker/cbor/v2 v2.9.0/go.mod h1:vM4b+DJCtHn+zz7h3FFp/hDAI9WNWCsZj23V5ytsSxQ= github.com/getsentry/sentry-go v0.43.0 h1:XbXLpFicpo8HmBDaInk7dum18G9KSLcjZiyUKS+hLW4= @@ -31,26 +32,27 @@ github.com/go-logr/zerologr v1.2.3 h1:up5N9vcH9Xck3jJkXzgyOxozT14R47IyDODz8LM1KS github.com/go-logr/zerologr v1.2.3/go.mod h1:BxwGo7y5zgSHYR1BjbnHPyF/5ZjVKfKxAZANVu6E8Ho= github.com/go-openapi/jsonpointer v0.21.0 h1:YgdVicSA9vH5RiHs9TZW5oyafXZFc6+2Vc1rr/O9oNQ= github.com/go-openapi/jsonpointer v0.21.0/go.mod h1:IUyH9l/+uyhIYQ/PXVA41Rexl+kOkAPDdXEYns6fzUY= -github.com/go-openapi/jsonreference v0.20.2 h1:3sVjiK66+uXK/6oQ8xgcRKcFgQ5KXa2KvnJRumpMGbE= -github.com/go-openapi/jsonreference v0.20.2/go.mod h1:Bl1zwGIM8/wsvqjsOQLJ/SH+En5Ap4rVB5KVcIDZG2k= +github.com/go-openapi/jsonreference v0.21.0 h1:Rs+Y7hSXT83Jacb7kFyjn4ijOuVGSvOdF2+tg1TRrwQ= +github.com/go-openapi/jsonreference v0.21.0/go.mod h1:LmZmgsrTkVg9LG4EaHeY8cBDslNPMo06cago5JNLkm4= github.com/go-openapi/swag v0.23.0 h1:vsEVJDUo2hPJ2tu0/Xc+4noaxyEffXNIs3cOULZ+GrE= github.com/go-openapi/swag v0.23.0/go.mod h1:esZ8ITTYEsH1V2trKHjAN8Ai7xHb8RV+YSZ577vPjgQ= -github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= +github.com/go-task/slim-sprig/v3 v3.0.0 h1:sUs3vkvUymDpBKi3qH1YSqBQk9+9D/8M2mN1vB6EwHI= +github.com/go-task/slim-sprig/v3 v3.0.0/go.mod h1:W848ghGpv3Qj3dhTPRyJypKRiqCdHZiAzKg9hl15HA8= github.com/golang-jwt/jwt/v5 v5.3.1 h1:kYf81DTWFe7t+1VvL7eS+jKFVWaUnK9cB1qbwn63YCY= github.com/golang-jwt/jwt/v5 v5.3.1/go.mod h1:fxCRLWMO43lRc8nhHWY6LGqRcf+1gQWArsqaEUEa5bE= github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= -github.com/google/gnostic-models v0.7.0 h1:qwTtogB15McXDaNqTZdzPJRHvaVJlAl+HVQnLmJEJxo= -github.com/google/gnostic-models v0.7.0/go.mod h1:whL5G0m6dmc5cPxKc5bdKdEN3UjI7OUGxBlw57miDrQ= +github.com/google/gnostic-models v0.7.1 h1:SisTfuFKJSKM5CPZkffwi6coztzzeYUhc3v4yxLWH8c= +github.com/google/gnostic-models v0.7.1/go.mod h1:whL5G0m6dmc5cPxKc5bdKdEN3UjI7OUGxBlw57miDrQ= github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= +github.com/google/pprof v0.0.0-20250403155104-27863c87afa6 h1:BHT72Gu3keYf3ZEu2J0b1vyeLSOYI8bm5wbJM/8yDe8= +github.com/google/pprof v0.0.0-20250403155104-27863c87afa6/go.mod h1:boTsfXsheKC2y+lKOCMpSfarhxDeIzfZG1jqGcPl3cA= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/grpc-ecosystem/grpc-gateway/v2 v2.28.0 h1:HWRh5R2+9EifMyIHV7ZV+MIZqgz+PMpZ14Jynv3O2Zs= github.com/grpc-ecosystem/grpc-gateway/v2 v2.28.0/go.mod h1:JfhWUomR1baixubs02l85lZYYOm7LV6om4ceouMv45c= -github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k= -github.com/hashicorp/golang-lru/v2 v2.0.7/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM= github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8= github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw= github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY= @@ -61,13 +63,10 @@ github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= -github.com/mailru/easyjson v0.7.7 h1:UGYAvKxe3sBsEDzO8ZeWOSlIQfWFlxbzLZe7hwFURr0= -github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc= -github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg= +github.com/mailru/easyjson v0.9.0 h1:PrnmzHw7262yW8sTBwxi1PdJA3Iw/EKBa8psRf7d9a4= +github.com/mailru/easyjson v0.9.0/go.mod h1:1+xMtQp2MRNVL/V1bOzuP3aP8VNwRW55fQUto+XFtTU= github.com/mattn/go-colorable v0.1.14 h1:9A9LHSqF/7dyVVX6g0U9cwm9pG3kP9gSzcuIPHPsaIE= github.com/mattn/go-colorable v0.1.14/go.mod h1:6LmQG8QLFO4G5z1gPvYEzlUgJ2wF+stgPZH1UqBm1s8= -github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= -github.com/mattn/go-isatty v0.0.19/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= @@ -78,22 +77,27 @@ github.com/modern-go/reflect2 v1.0.3-0.20250322232337-35a7c28c31ee h1:W5t00kpgFd github.com/modern-go/reflect2 v1.0.3-0.20250322232337-35a7c28c31ee/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= -github.com/openfga/api/proto v0.0.0-20260217232149-f917ddb000ce h1:c9wvVBWpq+y4j9TUaCBsbX2Im3XSEm7YotAQZO5UT4k= -github.com/openfga/api/proto v0.0.0-20260217232149-f917ddb000ce/go.mod h1:XDX4qYNBUM2Rsa2AbKPh+oocZc2zgme+EF2fFC6amVU= +github.com/onsi/ginkgo/v2 v2.27.2 h1:LzwLj0b89qtIy6SSASkzlNvX6WktqurSHwkk2ipF/Ns= +github.com/onsi/ginkgo/v2 v2.27.2/go.mod h1:ArE1D/XhNXBXCBkKOLkbsb2c81dQHCRcF5zwn/ykDRo= +github.com/onsi/gomega v1.38.2 h1:eZCjf2xjZAqe+LeWvKb5weQ+NcPwX84kqJ0cZNxok2A= +github.com/onsi/gomega v1.38.2/go.mod h1:W2MJcYxRGV63b418Ai34Ud0hEdTVXq9NW9+Sx6uXf3k= +github.com/openfga/api/proto v0.0.0-20260319214821-f153694bfc20 h1:xdVG0EDz9Z9Uhd7YZ5OMN1F8tkAz/Dpgdjxd0cuTBJo= +github.com/openfga/api/proto v0.0.0-20260319214821-f153694bfc20/go.mod h1:XDX4qYNBUM2Rsa2AbKPh+oocZc2zgme+EF2fFC6amVU= github.com/pingcap/errors v0.11.4 h1:lFuQV/oaUMGcD2tqt+01ROSmJs75VG1ToEOkZIZ4nE4= github.com/pingcap/errors v0.11.4/go.mod h1:Oi8TUi2kEtXXLMJk9l1cGmz20kV3TaQ0usTwv5KuLY8= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= -github.com/platform-mesh/golang-commons v0.13.12 h1:u4ciVVMFbDXxSI1kuNvNszfSiV5GXj/RtFqik4bOXGs= -github.com/platform-mesh/golang-commons v0.13.12/go.mod h1:w1ZieZV5XIneflHr3GtBZqyFMPr+KxlZhasSkUOygzs= +github.com/platform-mesh/golang-commons v0.13.24-0.20260331080836-cc42019dfb2e h1:YTWUAllHCY/3bD40HeY3mEszgLq2ow2a6JKGO7fv8qg= +github.com/platform-mesh/golang-commons v0.13.24-0.20260331080836-cc42019dfb2e/go.mod h1:V4Db8FH1Uxpevz+dKrLBHFGqP2XvxYXXfCfF0xqngx0= +github.com/platform-mesh/search-operator v0.1.2-0.20260413065247-a4a8625c1a9c h1:BkmDIyvXbiYlcD4DCiYtNx4y90Fg9D+0jSgMD5OPjYk= +github.com/platform-mesh/search-operator v0.1.2-0.20260413065247-a4a8625c1a9c/go.mod h1:6QIMtynF6s23vEXziy5Ot69e2+z9waXmKaXEW1JOZCc= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U= github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/rogpeppe/go-internal v1.14.1 h1:UQB4HGPB6osV0SQTLymcB4TgvyWu6ZyliaW0tI/otEQ= github.com/rogpeppe/go-internal v1.14.1/go.mod h1:MaRKkUm5W0goXpeCfT7UZI6fk/L7L7so1lCWt35ZSgc= -github.com/rs/xid v1.6.0/go.mod h1:7XoLgs4eV+QndskICGsho+ADou8ySMSjJKDIan90Nz0= -github.com/rs/zerolog v1.34.0 h1:k43nTLIwcTVQAncfCw4KZ2VY6ukYoZaBPNOE8txlOeY= -github.com/rs/zerolog v1.34.0/go.mod h1:bJsvje4Z08ROH4Nhs5iH600c3IkWhwp44iRc54W6wYQ= +github.com/rs/zerolog v1.35.0 h1:VD0ykx7HMiMJytqINBsKcbLS+BJ4WYjz+05us+LRTdI= +github.com/rs/zerolog v1.35.0/go.mod h1:EjML9kdfa/RMA7h/6z6pYmq1ykOuA8/mjWaEvGI+jcw= github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= github.com/spf13/cobra v1.10.2 h1:DMTTonx5m65Ic0GOoRY2c16WCbHxOOw6xxezuLaBpcU= github.com/spf13/cobra v1.10.2/go.mod h1:7C1pvHqHw5A4vrJfjNwvOdzYu0Gml16OCs2GRiTUUS4= @@ -136,29 +140,33 @@ go.yaml.in/yaml/v2 v2.4.3 h1:6gvOSjQoTB3vt1l+CU+tSyi/HOjfOjRLJ4YwYZGwRO0= go.yaml.in/yaml/v2 v2.4.3/go.mod h1:zSxWcmIDjOzPXpjlTTbAsKokqkDNAVtZO0WOMiT90s8= go.yaml.in/yaml/v3 v3.0.4 h1:tfq32ie2Jv2UxXFdLJdh3jXuOzWiL1fo0bu/FbuKpbc= go.yaml.in/yaml/v3 v3.0.4/go.mod h1:DhzuOOF2ATzADvBadXxruRBLzYTpT36CKvDb3+aBEFg= -golang.org/x/net v0.51.0 h1:94R/GTO7mt3/4wIKpcR5gkGmRLOuE/2hNGeWq/GBIFo= -golang.org/x/net v0.51.0/go.mod h1:aamm+2QF5ogm02fjy5Bb7CQ0WMt1/WVM7FtyaTLlA9Y= +golang.org/x/mod v0.34.0 h1:xIHgNUUnW6sYkcM5Jleh05DvLOtwc6RitGHbDk4akRI= +golang.org/x/mod v0.34.0/go.mod h1:ykgH52iCZe79kzLLMhyCUzhMci+nQj+0XkbXpNYtVjY= +golang.org/x/net v0.53.0 h1:d+qAbo5L0orcWAr0a9JweQpjXF19LMXJE8Ey7hwOdUA= +golang.org/x/net v0.53.0/go.mod h1:JvMuJH7rrdiCfbeHoo3fCQU24Lf5JJwT9W3sJFulfgs= golang.org/x/oauth2 v0.36.0 h1:peZ/1z27fi9hUOFCAZaHyrpWG5lwe0RJEEEeH0ThlIs= golang.org/x/oauth2 v0.36.0/go.mod h1:YDBUJMTkDnJS+A4BP4eZBjCqtokkg1hODuPjwiGPO7Q= -golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sync v0.20.0 h1:e0PTpb7pjO8GAtTs2dQ6jYa5BWYlMuX047Dco/pItO4= +golang.org/x/sync v0.20.0/go.mod h1:9xrNwdLfx4jkKbNva9FpL6vEN7evnE43NNNJQ2LF3+0= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.41.0 h1:Ivj+2Cp/ylzLiEU89QhWblYnOE9zerudt9Ftecq2C6k= -golang.org/x/sys v0.41.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= -golang.org/x/term v0.40.0 h1:36e4zGLqU4yhjlmxEaagx2KuYbJq3EwY8K943ZsHcvg= -golang.org/x/term v0.40.0/go.mod h1:w2P8uVp06p2iyKKuvXIm7N/y0UCRt3UfJTfZ7oOpglM= -golang.org/x/text v0.34.0 h1:oL/Qq0Kdaqxa1KbNeMKwQq0reLCCaFtqu2eNuSeNHbk= -golang.org/x/text v0.34.0/go.mod h1:homfLqTYRFyVYemLBFl5GgL/DWEiH5wcsQ5gSh1yziA= -golang.org/x/time v0.9.0 h1:EsRrnYcQiGH+5FfbgvV4AP7qEZstoyrHB0DzarOQ4ZY= -golang.org/x/time v0.9.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= +golang.org/x/sys v0.43.0 h1:Rlag2XtaFTxp19wS8MXlJwTvoh8ArU6ezoyFsMyCTNI= +golang.org/x/sys v0.43.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw= +golang.org/x/term v0.42.0 h1:UiKe+zDFmJobeJ5ggPwOshJIVt6/Ft0rcfrXZDLWAWY= +golang.org/x/term v0.42.0/go.mod h1:Dq/D+snpsbazcBG5+F9Q1n2rXV8Ma+71xEjTRufARgY= +golang.org/x/text v0.36.0 h1:JfKh3XmcRPqZPKevfXVpI1wXPTqbkE5f7JA92a55Yxg= +golang.org/x/text v0.36.0/go.mod h1:NIdBknypM8iqVmPiuco0Dh6P5Jcdk8lJL0CUebqK164= +golang.org/x/time v0.11.0 h1:/bpjEDfN9tkoN/ryeYHnv5hcMlc8ncjMcM4XBk5NWV0= +golang.org/x/time v0.11.0/go.mod h1:CDIdPxbZBQxdj6cxyCIdrNogrJKMJ7pr37NYpMcMDSg= +golang.org/x/tools v0.43.0 h1:12BdW9CeB3Z+J/I/wj34VMl8X+fEXBxVR90JeMX5E7s= +golang.org/x/tools v0.43.0/go.mod h1:uHkMso649BX2cZK6+RpuIPXS3ho2hZo4FVwfoy1vIk0= gonum.org/v1/gonum v0.17.0 h1:VbpOemQlsSMrYmn7T2OUvQ4dqxQXU+ouZFQsZOx50z4= gonum.org/v1/gonum v0.17.0/go.mod h1:El3tOrEuMpv2UdMrbNlKEh9vd86bmQ6vqIcDwxEOc1E= -google.golang.org/genproto/googleapis/api v0.0.0-20260209200024-4cfbd4190f57 h1:JLQynH/LBHfCTSbDWl+py8C+Rg/k1OVH3xfcaiANuF0= -google.golang.org/genproto/googleapis/api v0.0.0-20260209200024-4cfbd4190f57/go.mod h1:kSJwQxqmFXeo79zOmbrALdflXQeAYcUbgS7PbpMknCY= -google.golang.org/genproto/googleapis/rpc v0.0.0-20260226221140-a57be14db171 h1:ggcbiqK8WWh6l1dnltU4BgWGIGo+EVYxCaAPih/zQXQ= -google.golang.org/genproto/googleapis/rpc v0.0.0-20260226221140-a57be14db171/go.mod h1:4Hqkh8ycfw05ld/3BWL7rJOSfebL2Q+DVDeRgYgxUU8= -google.golang.org/grpc v1.79.3 h1:sybAEdRIEtvcD68Gx7dmnwjZKlyfuc61Dyo9pGXXkKE= -google.golang.org/grpc v1.79.3/go.mod h1:KmT0Kjez+0dde/v2j9vzwoAScgEPx/Bw1CYChhHLrHQ= +google.golang.org/genproto/googleapis/api v0.0.0-20260319201613-d00831a3d3e7 h1:41r6JMbpzBMen0R/4TZeeAmGXSJC7DftGINUodzTkPI= +google.golang.org/genproto/googleapis/api v0.0.0-20260319201613-d00831a3d3e7/go.mod h1:EIQZ5bFCfRQDV4MhRle7+OgjNtZ6P1PiZBgAKuxXu/Y= +google.golang.org/genproto/googleapis/rpc v0.0.0-20260414002931-afd174a4e478 h1:RmoJA1ujG+/lRGNfUnOMfhCy5EipVMyvUE+KNbPbTlw= +google.golang.org/genproto/googleapis/rpc v0.0.0-20260414002931-afd174a4e478/go.mod h1:4Hqkh8ycfw05ld/3BWL7rJOSfebL2Q+DVDeRgYgxUU8= +google.golang.org/grpc v1.80.0 h1:Xr6m2WmWZLETvUNvIUmeD5OAagMw3FiKmMlTdViWsHM= +google.golang.org/grpc v1.80.0/go.mod h1:ho/dLnxwi3EDJA4Zghp7k2Ec1+c2jqup0bFkw07bwF4= google.golang.org/protobuf v1.36.11 h1:fV6ZwhNocDyBLK0dj+fg8ektcVegBBuEolpbTQyBNVE= google.golang.org/protobuf v1.36.11/go.mod h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j23XfzDpco= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= @@ -170,18 +178,20 @@ gopkg.in/inf.v0 v0.9.1 h1:73M5CoZyi3ZLMOyDlQh031Cx6N9NDJ2Vvfl76EDAgDc= gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= -k8s.io/api v0.35.2 h1:tW7mWc2RpxW7HS4CoRXhtYHSzme1PN1UjGHJ1bdrtdw= -k8s.io/api v0.35.2/go.mod h1:7AJfqGoAZcwSFhOjcGM7WV05QxMMgUaChNfLTXDRE60= -k8s.io/apimachinery v0.35.2 h1:NqsM/mmZA7sHW02JZ9RTtk3wInRgbVxL8MPfzSANAK8= -k8s.io/apimachinery v0.35.2/go.mod h1:jQCgFZFR1F4Ik7hvr2g84RTJSZegBc8yHgFWKn//hns= -k8s.io/client-go v0.35.2 h1:YUfPefdGJA4aljDdayAXkc98DnPkIetMl4PrKX97W9o= -k8s.io/client-go v0.35.2/go.mod h1:4QqEwh4oQpeK8AaefZ0jwTFJw/9kIjdQi0jpKeYvz7g= +k8s.io/api v0.35.3 h1:pA2fiBc6+N9PDf7SAiluKGEBuScsTzd2uYBkA5RzNWQ= +k8s.io/api v0.35.3/go.mod h1:9Y9tkBcFwKNq2sxwZTQh1Njh9qHl81D0As56tu42GA4= +k8s.io/apimachinery v0.35.3 h1:MeaUwQCV3tjKP4bcwWGgZ/cp/vpsRnQzqO6J6tJyoF8= +k8s.io/apimachinery v0.35.3/go.mod h1:jQCgFZFR1F4Ik7hvr2g84RTJSZegBc8yHgFWKn//hns= +k8s.io/client-go v0.35.3 h1:s1lZbpN4uI6IxeTM2cpdtrwHcSOBML1ODNTCCfsP1pg= +k8s.io/client-go v0.35.3/go.mod h1:RzoXkc0mzpWIDvBrRnD+VlfXP+lRzqQjCmKtiwZ8Q9c= k8s.io/klog/v2 v2.130.1 h1:n9Xl7H1Xvksem4KFG4PYbdQCQxqc/tTUyrgXaOhHSzk= k8s.io/klog/v2 v2.130.1/go.mod h1:3Jpz1GvMt720eyJH1ckRHK1EDfpxISzJ7I9OYgaDtPE= k8s.io/kube-openapi v0.0.0-20250910181357-589584f1c912 h1:Y3gxNAuB0OBLImH611+UDZcmKS3g6CthxToOb37KgwE= k8s.io/kube-openapi v0.0.0-20250910181357-589584f1c912/go.mod h1:kdmbQkyfwUagLfXIad1y2TdrjPFWp2Q89B3qkRwf/pQ= -k8s.io/utils v0.0.0-20260210185600-b8788abfbbc2 h1:AZYQSJemyQB5eRxqcPky+/7EdBj0xi3g0ZcxxJ7vbWU= -k8s.io/utils v0.0.0-20260210185600-b8788abfbbc2/go.mod h1:xDxuJ0whA3d0I4mf/C4ppKHxXynQ+fxnkmQH0vTHnuk= +k8s.io/utils v0.0.0-20260319190234-28399d86e0b5 h1:kBawHLSnx/mYHmRnNUf9d4CpjREbeZuxoSGOX/J+aYM= +k8s.io/utils v0.0.0-20260319190234-28399d86e0b5/go.mod h1:xDxuJ0whA3d0I4mf/C4ppKHxXynQ+fxnkmQH0vTHnuk= +sigs.k8s.io/controller-runtime v0.23.3 h1:VjB/vhoPoA9l1kEKZHBMnQF33tdCLQKJtydy4iqwZ80= +sigs.k8s.io/controller-runtime v0.23.3/go.mod h1:B6COOxKptp+YaUT5q4l6LqUJTRpizbgf9KSRNdQGns0= sigs.k8s.io/json v0.0.0-20250730193827-2d320260d730 h1:IpInykpT6ceI+QxKBbEflcR5EXP7sU1kvOlxwZh5txg= sigs.k8s.io/json v0.0.0-20250730193827-2d320260d730/go.mod h1:mdzfpAEoE6DHQEN0uh9ZbOCuHbLK5wOm7dK4ctXE9Tg= sigs.k8s.io/randfill v1.0.0 h1:JfjMILfT8A6RbawdsK2JXGBR5AQVfd+9TbzrlneTyrU= diff --git a/hack/kubeconfig-copy.sh b/hack/kubeconfig-copy.sh new file mode 100755 index 0000000..79d62bf --- /dev/null +++ b/hack/kubeconfig-copy.sh @@ -0,0 +1,43 @@ +#!/usr/bin/env bash + +set -euo pipefail + +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" + +# Load .env from repo root if present +ENV_FILE="$SCRIPT_DIR/../.env" +if [ -f "$ENV_FILE" ]; then + set -a + # shellcheck source=/dev/null + source "$ENV_FILE" + set +a +fi + +echo KCP_KUBECONFIG="$PORTAL_DIRECTORY/upstream/.secret/kcp/admin.kubeconfig" +KCP_SERVER="https://localhost:8443/clusters/root:providers:search" + +OUTPUT_KUBECONFIG="$SCRIPT_DIR/../admin.kubeconfig" + +echo "Checking for Kind cluster 'platform-mesh'..." +if [ -n "${KIND_EXPERIMENTAL_PROVIDER:-}" ]; then + export KIND_EXPERIMENTAL_PROVIDER +fi +if ! kind get clusters 2>/dev/null | grep -q "^platform-mesh$"; then + echo "Error: Kind cluster 'platform-mesh' not found" + echo "Available clusters:" + kind get clusters 2>/dev/null || echo "(none)" + exit 1 +fi + +if [ ! -f "$KCP_KUBECONFIG" ]; then + echo "Error: KCP kubeconfig not found at $KCP_KUBECONFIG" + exit 1 +fi + +echo "Copying kubeconfig and setting server to $KCP_SERVER..." + +cp "$KCP_KUBECONFIG" "$OUTPUT_KUBECONFIG" +yq eval -i '(.clusters[] | select(.name == "workspace.kcp.io/current") | .cluster.server) = "'"$KCP_SERVER"'"' "$OUTPUT_KUBECONFIG" + +echo "" +echo "Successfully wrote kubeconfig to $OUTPUT_KUBECONFIG" diff --git a/internal/clients/fga/authorizer.go b/internal/clients/fga/authorizer.go index 459889b..7eaa067 100644 --- a/internal/clients/fga/authorizer.go +++ b/internal/clients/fga/authorizer.go @@ -3,11 +3,11 @@ package fga import ( "context" "fmt" + "strconv" "strings" openfgav1 "github.com/openfga/api/proto/openfga/v1" - "github.com/platform-mesh/golang-commons/fga/helpers" - "github.com/platform-mesh/golang-commons/fga/util" + "github.com/platform-mesh/golang-commons/logger" "github.com/platform-mesh/search/internal/service/search" ) @@ -23,16 +23,29 @@ func NewAuthorizer(client openfgav1.OpenFGAServiceClient) *Authorizer { } func (a *Authorizer) FilterAuthorized(ctx context.Context, req search.AuthorizationRequest) (search.AuthorizationResult, error) { + log := logger.LoadLoggerFromContext(ctx) allowed := make([]bool, len(req.Hits)) if len(req.Hits) == 0 { return search.AuthorizationResult{Allowed: allowed}, nil } - storeID, err := helpers.GetStoreIDForTenant(ctx, a.client, req.Organization) + storeID, err := a.resolveStoreID(ctx, req.Organization) if err != nil { + log.Error(). + Err(err). + Str("organization", req.Organization). + Str("user", req.User). + Msg("failed to resolve OpenFGA store ID") return search.AuthorizationResult{}, fmt.Errorf("resolve store ID: %w", err) } + log.Debug(). + Str("organization", req.Organization). + Str("storeID", storeID). + Str("user", req.User). + Int("hits", len(req.Hits)). + Msg("resolved OpenFGA store for authorization") + result := search.AuthorizationResult{Allowed: allowed} for _, chunk := range chunkRanges(len(req.Hits), batchCheckChunkSize) { @@ -42,7 +55,7 @@ func (a *Authorizer) FilterAuthorized(ctx context.Context, req search.Authorizat indicesByCorrelation := make(map[string]int, end-start) for idx := start; idx < end; idx++ { - item, missingContext := buildBatchCheckItem(req.User, req.Relation, idx, req.Hits[idx]) + item, missingContext := buildBatchCheckItem(log, req.User, req.Relation, idx, req.Hits[idx]) if missingContext { result.DroppedMissingContext++ continue @@ -60,6 +73,13 @@ func (a *Authorizer) FilterAuthorized(ctx context.Context, req search.Authorizat Checks: items, }) if err != nil { + log.Error(). + Err(err). + Str("organization", req.Organization). + Str("storeID", storeID). + Str("user", req.User). + Int("checks", len(items)). + Msg("OpenFGA BatchCheck failed") return search.AuthorizationResult{}, fmt.Errorf("openfga batch check: %w", err) } result.Calls++ @@ -81,6 +101,21 @@ func (a *Authorizer) FilterAuthorized(ctx context.Context, req search.Authorizat return result, nil } +func (a *Authorizer) resolveStoreID(ctx context.Context, org string) (string, error) { + res, err := a.client.ListStores(ctx, &openfgav1.ListStoresRequest{}) + if err != nil { + return "", fmt.Errorf("list stores: %w", err) + } + + for _, store := range res.GetStores() { + if strings.TrimSpace(store.GetName()) == org { + return store.GetId(), nil + } + } + + return "", fmt.Errorf("no OpenFGA store found for organization %q", org) +} + func chunkRanges(total, chunkSize int) [][2]int { if total <= 0 || chunkSize <= 0 { return nil @@ -98,111 +133,70 @@ func chunkRanges(total, chunkSize int) [][2]int { return ranges } -func buildBatchCheckItem(user, relation string, index int, hit search.OpenSearchHit) (*openfgav1.BatchCheckItem, bool) { - ctx, ok := buildAuthorizationContext(hit.Source) +func buildBatchCheckItem(log *logger.Logger, user, relation string, index int, hit search.OpenSearchHit) (*openfgav1.BatchCheckItem, bool) { + authContext, ok := buildAuthorizationContext(log, hit.Source) if !ok { return nil, true } tupleKey := &openfgav1.CheckRequestTupleKey{ - User: fmt.Sprintf("user:%s", user), + User: fmt.Sprintf("user:%s", formatUser(user)), Relation: relation, - Object: ctx.object, + Object: authContext.object, } + log.Debug(). + Int("hitIndex", index). + Str("user", tupleKey.User). + Str("relation", tupleKey.Relation). + Str("object", tupleKey.Object). + Interface("contextualTuples", authContext.contextualTuples). + Msg("building FGA BatchCheck item") + return &openfgav1.BatchCheckItem{ TupleKey: tupleKey, - ContextualTuples: &openfgav1.ContextualTupleKeys{TupleKeys: ctx.contextualTuples}, - CorrelationId: fmt.Sprintf("%d", index), + ContextualTuples: &openfgav1.ContextualTupleKeys{TupleKeys: authContext.contextualTuples}, + CorrelationId: strconv.Itoa(index), }, false } -type authzContext struct { +type authContext struct { object string contextualTuples []*openfgav1.TupleKey } -func buildAuthorizationContext(source map[string]interface{}) (authzContext, bool) { +func buildAuthorizationContext(log *logger.Logger, source map[string]interface{}) (authContext, bool) { if source == nil { - return authzContext{}, false - } - - kind := readString(source, "kind") - name := readString(source, "name") - namespace := readString(source, "namespace") - apiGroup := readString(source, "api_group") - clusterName := readString(source, "cluster_name") - organizationID := readString(source, "organization_id") - accountID := readString(source, "account_id") - accountName := readString(source, "account_name") - - clusterID := firstNonEmpty(accountID, organizationID) - if clusterID == "" && !strings.Contains(clusterName, ":") { - clusterID = clusterName - } - - if kind == "" || name == "" || clusterID == "" { - return authzContext{}, false - } - - if namespace != "" { - if accountName == "" { - return authzContext{}, false - } - if firstNonEmpty(accountID, organizationID, clusterID) == "" { - return authzContext{}, false - } - } - - resourceType := util.ConvertToTypeName(apiGroup, kind) - object := fmt.Sprintf("%s:%s/%s", resourceType, clusterID, name) - if namespace != "" { - object = fmt.Sprintf("%s:%s/%s/%s", resourceType, clusterID, namespace, name) - } - - parentAccountCluster := firstNonEmpty(accountID, organizationID, clusterID) - accountObject := "" - if accountName != "" && parentAccountCluster != "" { - accountType := util.ConvertToTypeName("core.platform-mesh.io", "Account") - accountObject = fmt.Sprintf("%s:%s/%s", accountType, parentAccountCluster, accountName) - } - - tuples := make([]*openfgav1.TupleKey, 0, 2) - resourceManaged := managedTuple(apiGroup, kind) - - if namespace != "" && accountObject != "" { - namespaceType := util.ConvertToTypeName("", "Namespace") - namespaceObject := fmt.Sprintf("%s:%s/%s", namespaceType, clusterID, namespace) - - tuples = append(tuples, &openfgav1.TupleKey{ - Object: namespaceObject, - Relation: "parent", - User: accountObject, - }) - - if !resourceManaged { - tuples = append(tuples, &openfgav1.TupleKey{ - Object: object, - Relation: "parent", - User: namespaceObject, - }) + log.Debug().Msg("auth context build failed: source is nil") + return authContext{}, false + } + + fgaObject := readString(source, "fga_object") + permissionsRaw, hasPermissions := source["permissions"].([]interface{}) + + if fgaObject != "" { + tuples := make([]*openfgav1.TupleKey, 0) + if hasPermissions { + for _, p := range permissionsRaw { + m, ok := p.(map[string]interface{}) + if !ok { + continue + } + tuples = append(tuples, &openfgav1.TupleKey{ + User: readString(m, "user"), + Relation: readString(m, "relation"), + Object: readString(m, "object"), + }) + } } - } else if accountObject != "" && !resourceManaged { - tuples = append(tuples, &openfgav1.TupleKey{ - Object: object, - Relation: "parent", - User: accountObject, - }) + return authContext{object: fgaObject, contextualTuples: tuples}, true } - return authzContext{object: object, contextualTuples: tuples}, true + return authContext{}, false } -func managedTuple(group, kind string) bool { - if strings.EqualFold(group, "core.platform-mesh.io") && strings.EqualFold(kind, "Account") { - return true - } - return false +func formatUser(user string) string { + return strings.ReplaceAll(user, ":", ".") } func readString(source map[string]interface{}, key string) string { @@ -213,12 +207,3 @@ func readString(source map[string]interface{}, key string) string { s, _ := v.(string) return strings.TrimSpace(s) } - -func firstNonEmpty(values ...string) string { - for _, value := range values { - if strings.TrimSpace(value) != "" { - return strings.TrimSpace(value) - } - } - return "" -} diff --git a/internal/clients/fga/authorizer_test.go b/internal/clients/fga/authorizer_test.go index c3bf29b..626c4fb 100644 --- a/internal/clients/fga/authorizer_test.go +++ b/internal/clients/fga/authorizer_test.go @@ -4,28 +4,33 @@ import ( "reflect" "testing" + "github.com/platform-mesh/golang-commons/logger/testlogger" + "github.com/platform-mesh/search/internal/service/search" ) func TestBuildBatchCheckItemResourceObjectFormat(t *testing.T) { hit := search.OpenSearchHit{Source: map[string]interface{}{ - "kind": "Component", - "name": "my-component", - "namespace": "dev", - "api_group": "core.platform-mesh.io", - "organization_id": "orgcluster1", - "account_id": "acccluster1", - "account_name": "account-a", + "fga_object": "core_platform-mesh_io_component:cluster1/ns1/comp1", + "permissions": []interface{}{ + map[string]interface{}{ + "user": "core_platform-mesh_io_account:sap/workspaces", + "relation": "parent", + "object": "core_platform_mesh_io_namespace:cluster1/ns1", + }, + }, }} - item, missing := buildBatchCheckItem("alice@example.com", "get", 0, hit) + testlogger := testlogger.New().HideLogOutput() + + item, missing := buildBatchCheckItem(testlogger.Logger, "alice@example.com", "get", 0, hit) if missing { t.Fatalf("expected context to be valid") } if item.TupleKey.Relation != "get" { t.Fatalf("unexpected relation: %s", item.TupleKey.Relation) } - expected := "core_platform-mesh_io_component:acccluster1/dev/my-component" + expected := "core_platform-mesh_io_component:cluster1/ns1/comp1" if item.TupleKey.Object != expected { t.Fatalf("unexpected object: %s", item.TupleKey.Object) } @@ -36,15 +41,13 @@ func TestBuildBatchCheckItemResourceObjectFormat(t *testing.T) { func TestBuildBatchCheckItemDropsMissingAuthContext(t *testing.T) { hit := search.OpenSearchHit{Source: map[string]interface{}{ - "kind": "Component", - "name": "my-component", - "namespace": "dev", - "api_group": "core.platform-mesh.io", - "organization_id": "orgcluster1", - // account_name intentionally missing for namespaced resources + // missing fga_object + "kind": "Component", }} - _, missing := buildBatchCheckItem("alice@example.com", "get", 0, hit) + testlogger := testlogger.New().HideLogOutput() + + _, missing := buildBatchCheckItem(testlogger.Logger, "alice@example.com", "get", 0, hit) if !missing { t.Fatalf("expected missing auth context") } @@ -92,3 +95,60 @@ func TestChunkRanges(t *testing.T) { }) } } + +func TestFormatUser(t *testing.T) { + tests := []struct { + user string + want string + }{ + {"alice", "alice"}, + {"system:serviceaccount:default:auth", "system.serviceaccount.default.auth"}, + } + for _, tt := range tests { + if got := formatUser(tt.user); got != tt.want { + t.Errorf("formatUser(%q) = %q, want %q", tt.user, got, tt.want) + } + } +} + +func TestBuildAuthorizationContextFromDocumentMetadata(t *testing.T) { + source := map[string]interface{}{ + "fga_object": "core_platform-mesh_io_component:cluster-x/ns-y/comp-z", + "permissions": []interface{}{ + map[string]interface{}{ + "user": "core_platform_mesh_io_account:sap/workspaces", + "relation": "parent", + "object": "core_platform_mesh_io_namespace:cluster-x/ns-y", + }, + }, + } + + ctx, ok := buildAuthorizationContext(nil, source) + if !ok { + t.Fatalf("expected valid context") + } + + if ctx.object != source["fga_object"] { + t.Errorf("expected object %q, got %q", source["fga_object"], ctx.object) + } +} + +func TestBuildAuthorizationContextFromDocumentMetadataNoPermissions(t *testing.T) { + source := map[string]interface{}{ + "fga_object": "core_platform_mesh_io_workspace:cluster-x/work-y", + } + + testlogger := testlogger.New().HideLogOutput() + + ctx, ok := buildAuthorizationContext(testlogger.Logger, source) + if !ok { + t.Fatalf("expected valid context") + } + + if ctx.object != "core_platform_mesh_io_workspace:cluster-x/work-y" { + t.Errorf("unexpected object: %s", ctx.object) + } + if len(ctx.contextualTuples) != 0 { + t.Errorf("expected 0 tuples, got %d", len(ctx.contextualTuples)) + } +} diff --git a/internal/clients/kcp/access_validator.go b/internal/clients/kcp/access_validator.go index 521afd0..930a322 100644 --- a/internal/clients/kcp/access_validator.go +++ b/internal/clients/kcp/access_validator.go @@ -3,6 +3,7 @@ package kcp import ( "context" "fmt" + "io" "net/http" "net/url" "strings" @@ -50,6 +51,11 @@ func (v *OrgAccessValidator) ValidateTokenForOrg(ctx context.Context, authHeader resp, err := v.http.Do(req) if err != nil { + v.log.Error(). + Err(err). + Str("organization", org). + Str("clusterPath", clusterPath). + Msg("KCP org token validation request failed") return false, fmt.Errorf("execute KCP auth request: %w", err) } defer resp.Body.Close() //nolint:errcheck @@ -58,8 +64,24 @@ func (v *OrgAccessValidator) ValidateTokenForOrg(ctx context.Context, authHeader case http.StatusOK, http.StatusCreated, http.StatusForbidden: return true, nil case http.StatusUnauthorized: + v.log.Warn(). + Str("organization", org). + Str("clusterPath", clusterPath). + Int("statusCode", resp.StatusCode). + Msg("KCP org token validation denied request") return false, nil default: + bodyBytes, _ := io.ReadAll(io.LimitReader(resp.Body, 1024)) + bodySnippet := strings.TrimSpace(string(bodyBytes)) + logEvt := v.log.Warn(). + Str("organization", org). + Str("clusterPath", clusterPath). + Int("statusCode", resp.StatusCode) + if bodySnippet != "" { + logEvt = logEvt.Str("responseBody", bodySnippet) + } + logEvt.Msg("KCP org token validation returned unexpected status") + if strings.HasPrefix(fmt.Sprintf("%d", resp.StatusCode), "5") { return false, fmt.Errorf("kcp auth check failed with status %d", resp.StatusCode) } diff --git a/internal/clients/kcp/searchindex_resolver.go b/internal/clients/kcp/searchindex_resolver.go index ff49fa0..9bb0620 100644 --- a/internal/clients/kcp/searchindex_resolver.go +++ b/internal/clients/kcp/searchindex_resolver.go @@ -2,14 +2,18 @@ package kcp import ( "context" - "encoding/json" "fmt" - "io" - "net/http" "net/url" + "sort" "strings" "github.com/platform-mesh/golang-commons/logger" + "github.com/platform-mesh/search-operator/api/v1alpha1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/client-go/dynamic" "k8s.io/client-go/rest" "github.com/platform-mesh/search/internal/config" @@ -17,83 +21,290 @@ import ( ) type SearchIndexResolver struct { - http *http.Client - baseURL *url.URL - cfg config.SearchIndexConfig - log *logger.Logger + client dynamic.Interface + cfg config.SearchIndexConfig + log *logger.Logger } +const searchIndexOrgClusterIDLabel = "search.platform-mesh.io/org-cluster-id" + func NewSearchIndexResolver(restCfg *rest.Config, cfg config.SearchIndexConfig, log *logger.Logger) (*SearchIndexResolver, error) { - httpClient, err := rest.HTTPClientFor(restCfg) + scopedCfg, err := configForKCPCluster(cfg.WorkspacePath, restCfg) if err != nil { - return nil, fmt.Errorf("create KCP HTTP client: %w", err) + return nil, fmt.Errorf("create KCP workspace config: %w", err) } - baseURL, err := url.Parse(restCfg.Host) + dynamicClient, err := dynamic.NewForConfig(scopedCfg) if err != nil { - return nil, fmt.Errorf("parse KCP host URL: %w", err) + return nil, fmt.Errorf("create KCP dynamic client: %w", err) } - baseURL.Path = "" return &SearchIndexResolver{ - http: httpClient, - baseURL: baseURL, - cfg: cfg, - log: log, + client: dynamicClient, + cfg: cfg, + log: log, }, nil } -func (r *SearchIndexResolver) ResolveIndex(ctx context.Context, org string) (search.SearchIndexRef, error) { - resourceURL := fmt.Sprintf("%s://%s/clusters/%s/apis/%s/%s/%s/%s", - r.baseURL.Scheme, - r.baseURL.Host, - r.cfg.WorkspacePath, - r.cfg.Group, - r.cfg.Version, - r.cfg.Resource, - org, - ) - - req, err := http.NewRequestWithContext(ctx, http.MethodGet, resourceURL, http.NoBody) +func (s *SearchIndexResolver) ResolveIndex(ctx context.Context, org, resource string) (search.SearchIndexRef, error) { + resource = strings.TrimSpace(resource) + if resource == "" { + return search.SearchIndexRef{}, fmt.Errorf("resource is required") + } + + indices, err := s.ListIndices(ctx, org) if err != nil { - return search.SearchIndexRef{}, fmt.Errorf("create SearchIndex request: %w", err) + return search.SearchIndexRef{}, err + } + + normalized := normalizeName(resource) + for _, index := range indices { + if normalizeName(index.Resource) == normalized { + return index, nil + } } - resp, err := r.http.Do(req) + return search.SearchIndexRef{}, fmt.Errorf("no SearchIndex matched org %q and resource %q", org, resource) +} + +func (s *SearchIndexResolver) ListIndices(ctx context.Context, org string) ([]search.SearchIndexRef, error) { + org = strings.TrimSpace(org) + if org == "" { + return nil, fmt.Errorf("organization is required") + } + + orgClusterID, err := s.resolveOrganizationClusterID(ctx, org) if err != nil { - return search.SearchIndexRef{}, fmt.Errorf("execute SearchIndex request: %w", err) + return nil, err } - defer resp.Body.Close() //nolint:errcheck - if resp.StatusCode >= http.StatusBadRequest { - raw, _ := io.ReadAll(io.LimitReader(resp.Body, 8192)) - return search.SearchIndexRef{}, fmt.Errorf("read SearchIndex failed with status %d: %s", resp.StatusCode, strings.TrimSpace(string(raw))) + list, err := s.listSearchIndices(ctx, orgClusterID) + if err != nil { + return nil, fmt.Errorf("list SearchIndex resources: %w", err) } - var payload struct { - Spec struct { - OrganizationClusterID string `json:"organizationClusterID"` - } `json:"spec"` - Status struct { - IndexName string `json:"indexName"` - } `json:"status"` + if len(list.Items) == 0 { + list, err = s.listSearchIndices(ctx, "") + if err != nil { + return nil, fmt.Errorf("list SearchIndex resources (fallback): %w", err) + } } - if err := json.NewDecoder(resp.Body).Decode(&payload); err != nil { - return search.SearchIndexRef{}, fmt.Errorf("decode SearchIndex response: %w", err) + if len(list.Items) == 0 { + return nil, fmt.Errorf("no SearchIndex resources found in workspace %q", s.cfg.WorkspacePath) } - indexName := strings.TrimSpace(payload.Status.IndexName) + refs := make([]search.SearchIndexRef, 0, len(list.Items)) + seenResource := make(map[string]string, len(list.Items)) + for _, item := range list.Items { + ref, ok := mapSearchIndexRef(item, orgClusterID, s.cfg) + if !ok { + continue + } + + if existingIndex, exists := seenResource[ref.Resource]; exists && existingIndex != ref.IndexName { + return nil, fmt.Errorf("multiple SearchIndex resources match org %q and resource %q", org, ref.Resource) + } + seenResource[ref.Resource] = ref.IndexName + refs = append(refs, ref) + } + + if len(refs) == 0 { + return nil, fmt.Errorf("no active SearchIndex resources found for org %q", org) + } + + sort.Slice(refs, func(i, j int) bool { + return refs[i].Resource < refs[j].Resource + }) + return refs, nil +} + +func (s *SearchIndexResolver) resolveOrganizationClusterID(ctx context.Context, org string) (string, error) { + obj, err := s.client.Resource(workspaceGVR).Get(ctx, org, metav1.GetOptions{}) + if err != nil { + return "", fmt.Errorf("resolve workspace cluster ID for org %q: %w", org, err) + } + + clusterID, found, err := unstructured.NestedString(obj.Object, "spec", "cluster") + if err != nil || !found { + return "", fmt.Errorf("workspace %q does not expose spec.cluster", org) + } + + clusterID = strings.TrimSpace(clusterID) + if clusterID == "" { + return "", fmt.Errorf("workspace %q does not expose spec.cluster", org) + } + return clusterID, nil +} + +func (s *SearchIndexResolver) listSearchIndices(ctx context.Context, orgClusterID string) (v1alpha1.SearchIndexList, error) { + var list v1alpha1.SearchIndexList + + listOpts := metav1.ListOptions{} + if orgClusterID != "" { + listOpts.LabelSelector = fmt.Sprintf("%s=%s", searchIndexOrgClusterIDLabel, orgClusterID) + } + + objList, err := s.client.Resource(searchIndexGVR(s.cfg)).List(ctx, listOpts) + if err != nil { + return list, err + } + + items := make([]v1alpha1.SearchIndex, 0, len(objList.Items)) + for _, item := range objList.Items { + searchIndex := v1alpha1.SearchIndex{} + if err := runtime.DefaultUnstructuredConverter.FromUnstructured(item.Object, &searchIndex); err != nil { + return list, fmt.Errorf("decode SearchIndex %q: %w", item.GetName(), err) + } + items = append(items, searchIndex) + } + + list.Items = items + return list, nil +} + +func mapSearchIndexRef(item v1alpha1.SearchIndex, orgClusterID string, cfg config.SearchIndexConfig) (search.SearchIndexRef, bool) { + indexName := strings.TrimSpace(item.Status.IndexName) if indexName == "" { - indexName = strings.TrimSpace(payload.Spec.OrganizationClusterID) + indexName = strings.TrimSpace(item.Name) } if indexName == "" { - return search.SearchIndexRef{}, fmt.Errorf("searchindex has neither status.indexName nor spec.organizationClusterID") + return search.SearchIndexRef{}, false + } + + orgID := firstNonEmpty(strings.TrimSpace(item.Spec.OrganizationClusterID), orgClusterID) + resource := deriveResourceName(item.Name, item.Spec.IndexPrefix, orgID) + if resource == "" { + resource = deriveResourceName(indexName, item.Spec.IndexPrefix, orgID) + } + if resource == "" { + return search.SearchIndexRef{}, false } return search.SearchIndexRef{ + Resource: resource, IndexName: indexName, - OrganizationClusterID: payload.Spec.OrganizationClusterID, - Group: r.cfg.Group, - Version: r.cfg.Version, - }, nil + IndexPrefix: strings.TrimSpace(item.Spec.IndexPrefix), + OrganizationClusterID: orgID, + DefaultFields: normalizeStringSlice(item.Spec.DefaultFields), + FilterableFields: normalizeStringSlice(item.Spec.FilterableFields), + SemanticFields: normalizeStringSlice(item.Spec.SemanticFields), + Group: cfg.Group, + Version: cfg.Version, + }, true +} + +func deriveResourceName(name, indexPrefix, orgClusterID string) string { + name = normalizeName(name) + if name == "" { + return "" + } + + prefix := normalizeName(indexPrefix) + orgID := normalizeName(orgClusterID) + + trimmed := name + if prefix != "" { + pattern := prefix + "-" + if !strings.HasPrefix(trimmed, pattern) { + return "" + } + trimmed = strings.TrimPrefix(trimmed, pattern) + } + if orgID != "" { + pattern := orgID + "-" + if !strings.HasPrefix(trimmed, pattern) { + return "" + } + trimmed = strings.TrimPrefix(trimmed, pattern) + } + + return strings.Trim(trimmed, "-") +} + +func normalizeName(value string) string { + value = strings.ToLower(value) + + var b strings.Builder + b.Grow(len(value)) + lastWasDash := false + for _, r := range value { + switch { + case r >= 'a' && r <= 'z': + b.WriteRune(r) + lastWasDash = false + case r >= '0' && r <= '9': + b.WriteRune(r) + lastWasDash = false + default: + if !lastWasDash { + b.WriteByte('-') + lastWasDash = true + } + } + } + return strings.Trim(b.String(), "-") +} + +func normalizeStringSlice(values []string) []string { + if len(values) == 0 { + return nil + } + + seen := make(map[string]struct{}, len(values)) + out := make([]string, 0, len(values)) + for _, value := range values { + trimmed := strings.TrimSpace(value) + if trimmed == "" { + continue + } + if _, ok := seen[trimmed]; ok { + continue + } + seen[trimmed] = struct{}{} + out = append(out, trimmed) + } + + sort.Strings(out) + + return out +} + +func configForKCPCluster(clusterName string, cfg *rest.Config) (*rest.Config, error) { + if cfg == nil { + return nil, fmt.Errorf("config cannot be nil") + } + + clusterCfg := rest.CopyConfig(cfg) + clusterCfgURL, err := url.Parse(clusterCfg.Host) + if err != nil { + return nil, fmt.Errorf("failed to parse host URL: %w", err) + } + + clusterCfgURL.Path = fmt.Sprintf("/clusters/%s", clusterName) + clusterCfg.Host = clusterCfgURL.String() + + return clusterCfg, nil +} + +func searchIndexGVR(cfg config.SearchIndexConfig) schema.GroupVersionResource { + return schema.GroupVersionResource{ + Group: cfg.Group, + Version: cfg.Version, + Resource: cfg.Resource, + } +} + +var workspaceGVR = schema.GroupVersionResource{ + Group: "tenancy.kcp.io", + Version: "v1alpha1", + Resource: "workspaces", +} + +func firstNonEmpty(values ...string) string { + for _, value := range values { + if strings.TrimSpace(value) != "" { + return strings.TrimSpace(value) + } + } + return "" } diff --git a/internal/clients/kcp/searchindex_resolver_test.go b/internal/clients/kcp/searchindex_resolver_test.go new file mode 100644 index 0000000..e5d07bc --- /dev/null +++ b/internal/clients/kcp/searchindex_resolver_test.go @@ -0,0 +1,180 @@ +package kcp + +import ( + "context" + "encoding/json" + "net/http" + "net/http/httptest" + "testing" + + "github.com/platform-mesh/search/internal/config" + "k8s.io/client-go/rest" +) + +func TestListIndicesBuildsResourceDescriptors(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + switch r.URL.Path { + case "/clusters/root:orgs/apis/tenancy.kcp.io/v1alpha1/workspaces/acme": + writeJSON(t, w, workspacePayload("acme", "cluster-123")) + case "/clusters/root:orgs/apis/core.platform-mesh.io/v1alpha1/searchindexes": + writeJSON(t, w, searchIndexListPayload(map[string]interface{}{ + "items": []map[string]interface{}{ + { + "metadata": map[string]interface{}{"name": "pm-cluster-123-accounts"}, + "spec": map[string]interface{}{ + "indexPrefix": "pm", + "organizationClusterID": "cluster-123", + "defaultFields": []string{"displayName", "description"}, + "filterableFields": []string{"status"}, + "semanticFields": []string{"description"}, + }, + "status": map[string]interface{}{"indexName": "pm-cluster-123-accounts"}, + }, + { + "metadata": map[string]interface{}{"name": "pm-cluster-123-products"}, + "spec": map[string]interface{}{ + "indexPrefix": "pm", + "organizationClusterID": "cluster-123", + "defaultFields": []string{"name"}, + "filterableFields": []string{"category"}, + }, + "status": map[string]interface{}{"indexName": "pm-cluster-123-products"}, + }, + }, + })) + default: + http.NotFound(w, r) + } + })) + defer server.Close() + + resolver := newTestResolver(t, server.URL) + refs, err := resolver.ListIndices(context.Background(), "acme") + if err != nil { + t.Fatalf("ListIndices returned error: %v", err) + } + if len(refs) != 2 { + t.Fatalf("expected 2 refs, got %d", len(refs)) + } + if refs[0].Resource != "accounts" || refs[1].Resource != "products" { + t.Fatalf("unexpected resources: %+v", refs) + } +} + +func TestResolveIndexByResource(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + switch r.URL.Path { + case "/clusters/root:orgs/apis/tenancy.kcp.io/v1alpha1/workspaces/acme": + writeJSON(t, w, workspacePayload("acme", "cluster-123")) + case "/clusters/root:orgs/apis/core.platform-mesh.io/v1alpha1/searchindexes": + writeJSON(t, w, searchIndexListPayload(map[string]interface{}{ + "items": []map[string]interface{}{ + { + "metadata": map[string]interface{}{"name": "pm-cluster-123-accounts"}, + "spec": map[string]interface{}{ + "indexPrefix": "pm", + "organizationClusterID": "cluster-123", + }, + "status": map[string]interface{}{"indexName": "pm-cluster-123-accounts"}, + }, + }, + })) + default: + http.NotFound(w, r) + } + })) + defer server.Close() + + resolver := newTestResolver(t, server.URL) + ref, err := resolver.ResolveIndex(context.Background(), "acme", "accounts") + if err != nil { + t.Fatalf("ResolveIndex returned error: %v", err) + } + if ref.IndexName != "pm-cluster-123-accounts" { + t.Fatalf("unexpected index name: %s", ref.IndexName) + } + if ref.Resource != "accounts" { + t.Fatalf("unexpected resource: %s", ref.Resource) + } +} + +func TestListIndicesReturnsErrorWhenResourceIsAmbiguous(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + switch r.URL.Path { + case "/clusters/root:orgs/apis/tenancy.kcp.io/v1alpha1/workspaces/acme": + writeJSON(t, w, workspacePayload("acme", "cluster-123")) + case "/clusters/root:orgs/apis/core.platform-mesh.io/v1alpha1/searchindexes": + writeJSON(t, w, searchIndexListPayload(map[string]interface{}{ + "items": []map[string]interface{}{ + { + "metadata": map[string]interface{}{"name": "pm-cluster-123-accounts"}, + "spec": map[string]interface{}{ + "indexPrefix": "pm", + "organizationClusterID": "cluster-123", + }, + "status": map[string]interface{}{"indexName": "pm-cluster-123-accounts-a"}, + }, + { + "metadata": map[string]interface{}{"name": "pm-cluster-123-accounts"}, + "spec": map[string]interface{}{ + "indexPrefix": "pm", + "organizationClusterID": "cluster-123", + }, + "status": map[string]interface{}{"indexName": "pm-cluster-123-accounts-b"}, + }, + }, + })) + default: + http.NotFound(w, r) + } + })) + defer server.Close() + + resolver := newTestResolver(t, server.URL) + if _, err := resolver.ListIndices(context.Background(), "acme"); err == nil { + t.Fatalf("expected ambiguous resource error") + } +} + +func newTestResolver(t *testing.T, base string) *SearchIndexResolver { + t.Helper() + + resolver, err := NewSearchIndexResolver(&rest.Config{Host: base}, config.SearchIndexConfig{ + WorkspacePath: "root:orgs", + Group: "core.platform-mesh.io", + Version: "v1alpha1", + Resource: "searchindexes", + }, nil) + if err != nil { + t.Fatalf("create resolver: %v", err) + } + + return resolver +} + +func writeJSON(t *testing.T, w http.ResponseWriter, payload interface{}) { + t.Helper() + w.Header().Set("Content-Type", "application/json") + if err := json.NewEncoder(w).Encode(payload); err != nil { + t.Fatalf("encode json: %v", err) + } +} + +func workspacePayload(name, clusterID string) map[string]interface{} { + return map[string]interface{}{ + "apiVersion": "tenancy.kcp.io/v1alpha1", + "kind": "Workspace", + "metadata": map[string]interface{}{ + "name": name, + }, + "spec": map[string]interface{}{ + "cluster": clusterID, + }, + } +} + +func searchIndexListPayload(payload map[string]interface{}) map[string]interface{} { + payload["apiVersion"] = "core.platform-mesh.io/v1alpha1" + payload["kind"] = "SearchIndexList" + return payload +} diff --git a/internal/clients/opensearch/client.go b/internal/clients/opensearch/client.go index a682085..c612e50 100644 --- a/internal/clients/opensearch/client.go +++ b/internal/clients/opensearch/client.go @@ -9,6 +9,7 @@ import ( "io" "net/http" "net/url" + "sort" "strings" "time" @@ -58,46 +59,130 @@ func NewClient(cfg Config) (*Client, error) { }, nil } -func BuildQueryBody(query string, size int, searchAfter []interface{}) ([]byte, error) { - body := map[string]interface{}{ - "size": size, - "query": map[string]interface{}{ - "simple_query_string": map[string]interface{}{ - "query": query, - "fields": []string{"*"}, - "default_operator": "and", +func BuildQueryBody(req search.OpenSearchQuery) ([]byte, error) { + query := strings.TrimSpace(req.Query) + fields := dedupeStrings(req.Fields) + filters := normalizeFilters(req.Filters) + + var queryClause map[string]interface{} + if query == "" { + queryClause = map[string]interface{}{ + "match_all": map[string]interface{}{}, + } + } else { + simple := map[string]interface{}{ + "query": query, + "default_operator": "and", + } + if len(fields) > 0 { + simple["fields"] = fields + } + queryClause = map[string]interface{}{ + "simple_query_string": simple, + } + } + + if len(filters) > 0 { + filterClauses := make([]map[string]interface{}, 0, len(filters)) + keys := make([]string, 0, len(filters)) + for key := range filters { + keys = append(keys, key) + } + sort.Strings(keys) + + for _, field := range keys { + values := filters[field] + if len(values) == 0 { + continue + } + keywordField := field + if !strings.HasSuffix(keywordField, ".keyword") { + keywordField += ".keyword" + } + filterClauses = append(filterClauses, map[string]interface{}{ + "terms": map[string]interface{}{ + keywordField: values, + }, + }) + } + + queryClause = map[string]interface{}{ + "bool": map[string]interface{}{ + "must": []interface{}{queryClause}, + "filter": filterClauses, }, - }, + } + } + + body := map[string]interface{}{ + "size": req.Size, + "query": queryClause, "sort": []map[string]string{ {"_score": "desc"}, {"_id": "asc"}, + {"_index": "asc"}, }, } - if len(searchAfter) > 0 { - body["search_after"] = searchAfter + if len(req.SearchAfter) > 0 { + body["search_after"] = req.SearchAfter + } + + if field := strings.TrimSpace(req.AggregationField); field != "" { + keywordField := field + if !strings.HasSuffix(keywordField, ".keyword") { + keywordField += ".keyword" + } + aggSize := req.Size + if aggSize <= 0 { + aggSize = 10 + } + body["aggs"] = map[string]interface{}{ + "values": map[string]interface{}{ + "terms": map[string]interface{}{ + "field": keywordField, + "size": aggSize, + }, + }, + } + if req.Size <= 0 { + body["size"] = 0 + delete(body, "sort") + } } return json.Marshal(body) } -func (c *Client) Search(ctx context.Context, indexName, query string, size int, searchAfter []interface{}) (search.OpenSearchPage, error) { - body, err := BuildQueryBody(query, size, searchAfter) +func (c *Client) Search(ctx context.Context, query search.OpenSearchQuery) (search.OpenSearchPage, error) { + indices := dedupeStrings(query.Indices) + if len(indices) == 0 { + return search.OpenSearchPage{}, fmt.Errorf("at least one OpenSearch index is required") + } + + body, err := BuildQueryBody(search.OpenSearchQuery{ + Query: query.Query, + Fields: query.Fields, + Filters: query.Filters, + Size: query.Size, + SearchAfter: query.SearchAfter, + AggregationField: query.AggregationField, + }) if err != nil { return search.OpenSearchPage{}, fmt.Errorf("build OpenSearch query body: %w", err) } - requestURL := c.baseURL.ResolveReference(&url.URL{Path: fmt.Sprintf("/%s/_search", indexName)}) - req, err := http.NewRequestWithContext(ctx, http.MethodPost, requestURL.String(), bytes.NewReader(body)) + requestURL := c.baseURL.ResolveReference(&url.URL{Path: fmt.Sprintf("/%s/_search", strings.Join(indices, ","))}) + httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, requestURL.String(), bytes.NewReader(body)) if err != nil { return search.OpenSearchPage{}, fmt.Errorf("create OpenSearch request: %w", err) } - req.Header.Set("Content-Type", "application/json") + httpReq.Header.Set("Content-Type", "application/json") if c.username != "" { - req.SetBasicAuth(c.username, c.password) + httpReq.SetBasicAuth(c.username, c.password) } - resp, err := c.http.Do(req) + resp, err := c.http.Do(httpReq) if err != nil { return search.OpenSearchPage{}, fmt.Errorf("execute OpenSearch request: %w", err) } @@ -111,12 +196,18 @@ func (c *Client) Search(ctx context.Context, indexName, query string, size int, var payload struct { Hits struct { Hits []struct { + Index string `json:"_index"` ID string `json:"_id"` Score float64 `json:"_score"` Sort []interface{} `json:"sort"` Source map[string]interface{} `json:"_source"` } `json:"hits"` } `json:"hits"` + Aggregations map[string]struct { + Buckets []struct { + Key string `json:"key"` + } `json:"buckets"` + } `json:"aggregations"` } if err := json.NewDecoder(resp.Body).Decode(&payload); err != nil { return search.OpenSearchPage{}, fmt.Errorf("decode OpenSearch response: %w", err) @@ -125,6 +216,7 @@ func (c *Client) Search(ctx context.Context, indexName, query string, size int, hits := make([]search.OpenSearchHit, 0, len(payload.Hits.Hits)) for _, hit := range payload.Hits.Hits { hits = append(hits, search.OpenSearchHit{ + Index: hit.Index, ID: hit.ID, Score: hit.Score, Sort: hit.Sort, @@ -132,5 +224,63 @@ func (c *Client) Search(ctx context.Context, indexName, query string, size int, }) } - return search.OpenSearchPage{Hits: hits}, nil + var values []string + if agg, ok := payload.Aggregations["values"]; ok { + values = make([]string, 0, len(agg.Buckets)) + for _, b := range agg.Buckets { + if b.Key != "" { + values = append(values, b.Key) + } + } + sort.Strings(values) + } + + return search.OpenSearchPage{Hits: hits, AggregationValues: values}, nil +} + +func dedupeStrings(values []string) []string { + if len(values) == 0 { + return nil + } + + seen := make(map[string]struct{}, len(values)) + out := make([]string, 0, len(values)) + for _, value := range values { + trimmed := strings.TrimSpace(value) + if trimmed == "" { + continue + } + if _, ok := seen[trimmed]; ok { + continue + } + seen[trimmed] = struct{}{} + out = append(out, trimmed) + } + sort.Strings(out) + return out +} + +func normalizeFilters(filters map[string][]string) map[string][]string { + if len(filters) == 0 { + return nil + } + + out := make(map[string][]string, len(filters)) + for field, rawValues := range filters { + field = strings.TrimSpace(field) + if field == "" { + continue + } + + values := dedupeStrings(rawValues) + if len(values) == 0 { + continue + } + out[field] = values + } + + if len(out) == 0 { + return nil + } + return out } diff --git a/internal/clients/opensearch/client_test.go b/internal/clients/opensearch/client_test.go index 6e3bf7b..834cdf5 100644 --- a/internal/clients/opensearch/client_test.go +++ b/internal/clients/opensearch/client_test.go @@ -3,10 +3,16 @@ package opensearch import ( "encoding/json" "testing" + + "github.com/platform-mesh/search/internal/service/search" ) func TestBuildQueryBodyWithoutSearchAfter(t *testing.T) { - body, err := BuildQueryBody("hello", 20, nil) + body, err := BuildQueryBody(search.OpenSearchQuery{ + Query: "hello", + Fields: []string{"name", "description"}, + Size: 20, + }) if err != nil { t.Fatalf("BuildQueryBody returned error: %v", err) } @@ -24,13 +30,21 @@ func TestBuildQueryBodyWithoutSearchAfter(t *testing.T) { } sort := payload["sort"].([]interface{}) - if len(sort) != 2 { - t.Fatalf("expected 2 sort fields") + if len(sort) != 3 { + t.Fatalf("expected 3 sort fields") } } func TestBuildQueryBodyWithSearchAfter(t *testing.T) { - body, err := BuildQueryBody("hello", 10, []interface{}{1.0, "id-1"}) + body, err := BuildQueryBody(search.OpenSearchQuery{ + Query: "hello", + Fields: []string{"name"}, + Size: 10, + SearchAfter: []interface{}{1.0, "id-1", "idx"}, + Filters: map[string][]string{ + "status": {"Ready"}, + }, + }) if err != nil { t.Fatalf("BuildQueryBody returned error: %v", err) } @@ -41,7 +55,33 @@ func TestBuildQueryBodyWithSearchAfter(t *testing.T) { } searchAfter := payload["search_after"].([]interface{}) - if len(searchAfter) != 2 { - t.Fatalf("expected 2 search_after values") + if len(searchAfter) != 3 { + t.Fatalf("expected 3 search_after values") + } + + query := payload["query"].(map[string]interface{}) + boolQuery := query["bool"].(map[string]interface{}) + if _, ok := boolQuery["filter"]; !ok { + t.Fatalf("expected filter clause") + } +} + +func TestBuildQueryBodyWithoutQueryUsesMatchAll(t *testing.T) { + body, err := BuildQueryBody(search.OpenSearchQuery{ + Query: "", + Size: 5, + }) + if err != nil { + t.Fatalf("BuildQueryBody returned error: %v", err) + } + + var payload map[string]interface{} + if err := json.Unmarshal(body, &payload); err != nil { + t.Fatalf("invalid json: %v", err) + } + + query := payload["query"].(map[string]interface{}) + if _, ok := query["match_all"]; !ok { + t.Fatalf("expected match_all query") } } diff --git a/internal/config/config.go b/internal/config/config.go index 6ef90b7..6d72161 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -2,6 +2,7 @@ package config import ( "os" + "strings" "time" "github.com/spf13/pflag" @@ -34,16 +35,18 @@ type SearchConfig struct { } type ServiceConfig struct { - Port int - OpenSearch OpenSearchConfig - OpenFGA OpenFGAConfig - SearchIndex SearchIndexConfig - Search SearchConfig + Port int + LocalDevelopmentOrg string + OpenSearch OpenSearchConfig + OpenFGA OpenFGAConfig + SearchIndex SearchIndexConfig + Search SearchConfig } func NewServiceConfig() *ServiceConfig { return &ServiceConfig{ - Port: 8080, + Port: 8080, + LocalDevelopmentOrg: localDevelopmentOrgFromEnv(), OpenSearch: OpenSearchConfig{ URL: "http://opensearch.platform-mesh-system.svc.cluster.local:9200", Username: os.Getenv("OPENSEARCH_USERNAME"), @@ -58,7 +61,7 @@ func NewServiceConfig() *ServiceConfig { WorkspacePath: "root:orgs", Group: "core.platform-mesh.io", Version: "v1alpha1", - Resource: "searchindices", + Resource: "searchindexes", }, Search: SearchConfig{ DefaultLimit: 20, @@ -71,6 +74,7 @@ func NewServiceConfig() *ServiceConfig { func (c *ServiceConfig) AddFlags(fs *pflag.FlagSet) { fs.IntVar(&c.Port, "port", c.Port, "Set the service port") + fs.StringVar(&c.LocalDevelopmentOrg, "local-development-org", c.LocalDevelopmentOrg, "Organization to use when request host is localhost") fs.StringVar(&c.OpenSearch.URL, "opensearch-url", c.OpenSearch.URL, "Set OpenSearch URL") fs.StringVar(&c.OpenSearch.Username, "opensearch-username", c.OpenSearch.Username, "Set OpenSearch username") @@ -90,3 +94,11 @@ func (c *ServiceConfig) AddFlags(fs *pflag.FlagSet) { fs.IntVar(&c.Search.FetchBatchSize, "search-fetch-batch-size", c.Search.FetchBatchSize, "Batch size for OpenSearch fetches") fs.IntVar(&c.Search.MaxScannedHits, "search-max-scanned-hits", c.Search.MaxScannedHits, "Maximum raw hits scanned per request") } + +func localDevelopmentOrgFromEnv() string { + v := strings.TrimSpace(os.Getenv("SEARCH_LOCAL_ORG")) + if v == "" { + return "local" + } + return v +} diff --git a/internal/middleware/orgauth.go b/internal/middleware/orgauth.go index b451c98..1dc2cf8 100644 --- a/internal/middleware/orgauth.go +++ b/internal/middleware/orgauth.go @@ -1,7 +1,6 @@ package middleware import ( - "context" "fmt" "net" "net/http" @@ -9,6 +8,7 @@ import ( "strings" pmcontext "github.com/platform-mesh/golang-commons/context" + "github.com/platform-mesh/golang-commons/logger" appcontext "github.com/platform-mesh/search/internal/context" "github.com/platform-mesh/search/internal/service/search" @@ -16,44 +16,73 @@ import ( var issuerRegex = regexp.MustCompile(`^.*\/realms\/(.*?)\/?$`) +const defaultLocalDevelopmentOrg = "local" + type OrgContextMiddleware struct { - validator search.OrgAccessValidator + validator search.OrgAccessValidator + localDevelopment bool + localOrg string } -func NewOrgContextMiddleware(validator search.OrgAccessValidator) *OrgContextMiddleware { - return &OrgContextMiddleware{validator: validator} +func NewOrgContextMiddleware(validator search.OrgAccessValidator, localDevelopment bool, localOrg string) *OrgContextMiddleware { + localOrg = strings.TrimSpace(localOrg) + if localOrg == "" { + localOrg = defaultLocalDevelopmentOrg + } + return &OrgContextMiddleware{ + validator: validator, + localDevelopment: localDevelopment, + localOrg: localOrg, + } } func (m *OrgContextMiddleware) SetRequestContext() func(http.Handler) http.Handler { return func(next http.Handler) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { ctx := r.Context() + log := logger.LoadLoggerFromContext(ctx) org := extractSubdomain(r.Host) if org == "" { http.Error(w, http.StatusText(http.StatusUnauthorized), http.StatusUnauthorized) return } + localHost := isLocalHost(r.Host) + if localHost { + org = m.localOrg + } token, err := pmcontext.GetWebTokenFromContext(ctx) if err != nil { http.Error(w, http.StatusText(http.StatusUnauthorized), http.StatusUnauthorized) return } - authHeader, err := pmcontext.GetAuthHeaderFromContext(ctx) - if err != nil { - http.Error(w, http.StatusText(http.StatusUnauthorized), http.StatusUnauthorized) - return - } - allowed, err := m.validator.ValidateTokenForOrg(ctx, authHeader, org) - if err != nil { - http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError) - return - } - if !allowed { - http.Error(w, http.StatusText(http.StatusForbidden), http.StatusForbidden) - return + if !(m.localDevelopment || localHost) { + authHeader, err := pmcontext.GetAuthHeaderFromContext(ctx) + if err != nil { + http.Error(w, http.StatusText(http.StatusUnauthorized), http.StatusUnauthorized) + return + } + authHeader, err = normalizeBearerAuthHeader(authHeader) + if err != nil { + http.Error(w, http.StatusText(http.StatusUnauthorized), http.StatusUnauthorized) + return + } + + allowed, err := m.validator.ValidateTokenForOrg(ctx, authHeader, org) + if err != nil { + log.Error(). + Err(err). + Str("organization", org). + Msg("failed to validate token for org access") + http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError) + return + } + if !allowed { + http.Error(w, http.StatusText(http.StatusForbidden), http.StatusForbidden) + return + } } user := strings.TrimSpace(token.Mail) @@ -96,6 +125,27 @@ func extractSubdomain(host string) string { return strings.TrimSpace(parts[0]) } +func isLocalHost(host string) bool { + if host == "" { + return false + } + if h, _, err := net.SplitHostPort(host); err == nil { + host = h + } else { + host = strings.Split(host, ":")[0] + } + host = strings.TrimSpace(strings.ToLower(host)) + return host == "localhost" || host == "127.0.0.1" || host == "::1" +} + +func normalizeBearerAuthHeader(header string) (string, error) { + parts := strings.Fields(strings.TrimSpace(header)) + if len(parts) != 2 || !strings.EqualFold(parts[0], "Bearer") || parts[1] == "" { + return "", fmt.Errorf("invalid authorization header") + } + return "Bearer " + parts[1], nil +} + func extractTenant(issuer string) (string, error) { match := issuerRegex.FindStringSubmatch(issuer) if len(match) < 2 || match[1] == "" { @@ -103,7 +153,3 @@ func extractTenant(issuer string) (string, error) { } return match[1], nil } - -func InjectRequestContext(ctx context.Context, rc appcontext.RequestContext) context.Context { - return appcontext.WithRequestContext(ctx, rc) -} diff --git a/internal/middleware/orgauth_test.go b/internal/middleware/orgauth_test.go index b2e9deb..25802b8 100644 --- a/internal/middleware/orgauth_test.go +++ b/internal/middleware/orgauth_test.go @@ -29,7 +29,7 @@ func (f *fakeOrgValidator) ValidateTokenForOrg(_ context.Context, authHeader, or func TestSetRequestContextSuccessUsesMailFallbackToSub(t *testing.T) { validator := &fakeOrgValidator{allowed: true} - mw := NewOrgContextMiddleware(validator) + mw := NewOrgContextMiddleware(validator, false, "local") req := httptest.NewRequest(http.MethodGet, "/rest/v1/search?q=test", nil) req.Host = "acme.platform-mesh.io:8443" @@ -77,7 +77,7 @@ func TestSetRequestContextSuccessUsesMailFallbackToSub(t *testing.T) { func TestSetRequestContextForbiddenWhenOrgCheckFails(t *testing.T) { validator := &fakeOrgValidator{allowed: false} - mw := NewOrgContextMiddleware(validator) + mw := NewOrgContextMiddleware(validator, false, "local") req := httptest.NewRequest(http.MethodGet, "/rest/v1/search?q=test", nil) req.Host = "acme.platform-mesh.io" @@ -102,7 +102,7 @@ func TestSetRequestContextForbiddenWhenOrgCheckFails(t *testing.T) { func TestSetRequestContextReturns500OnValidatorError(t *testing.T) { validator := &fakeOrgValidator{err: errors.New("boom")} - mw := NewOrgContextMiddleware(validator) + mw := NewOrgContextMiddleware(validator, false, "local") req := httptest.NewRequest(http.MethodGet, "/rest/v1/search?q=test", nil) req.Host = "acme.platform-mesh.io" @@ -127,7 +127,7 @@ func TestSetRequestContextReturns500OnValidatorError(t *testing.T) { func TestSetRequestContextReturns401ForInvalidTokenContext(t *testing.T) { validator := &fakeOrgValidator{allowed: true} - mw := NewOrgContextMiddleware(validator) + mw := NewOrgContextMiddleware(validator, false, "local") req := httptest.NewRequest(http.MethodGet, "/rest/v1/search?q=test", nil) req.Host = "acme.platform-mesh.io" @@ -145,7 +145,7 @@ func TestSetRequestContextReturns401ForInvalidTokenContext(t *testing.T) { func TestSetRequestContextReturns401ForInvalidIssuer(t *testing.T) { validator := &fakeOrgValidator{allowed: true} - mw := NewOrgContextMiddleware(validator) + mw := NewOrgContextMiddleware(validator, false, "local") req := httptest.NewRequest(http.MethodGet, "/rest/v1/search?q=test", nil) req.Host = "acme.platform-mesh.io" @@ -167,3 +167,146 @@ func TestSetRequestContextReturns401ForInvalidIssuer(t *testing.T) { t.Fatalf("expected 401, got %d", rr.Code) } } + +func TestSetRequestContextLocalhostOverridesOrgAndBypassesValidator(t *testing.T) { + validator := &fakeOrgValidator{allowed: false} + mw := NewOrgContextMiddleware(validator, false, "local-org-test") + + req := httptest.NewRequest(http.MethodGet, "/rest/v1/search?q=test", nil) + req.Host = "localhost:8443" + + ctx := pmcontext.AddAuthHeaderToContext(req.Context(), "bearer\tabc") + ctx = context.WithValue(ctx, keys.WebTokenCtxKey, jwt.WebToken{ + IssuerAttributes: jwt.IssuerAttributes{ + Issuer: "https://idp.example.org/auth/realms/acme-tenant", + Subject: "subject-user", + }, + ParsedAttributes: jwt.ParsedAttributes{Mail: "user@example.org"}, + }) + req = req.WithContext(ctx) + + rr := httptest.NewRecorder() + next := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + rc, err := appcontext.GetRequestContext(r.Context()) + if err != nil { + t.Fatalf("request context missing: %v", err) + } + if rc.Organization != "local-org-test" { + t.Fatalf("unexpected org: %s", rc.Organization) + } + w.WriteHeader(http.StatusNoContent) + }) + + mw.SetRequestContext()(next).ServeHTTP(rr, req) + + if rr.Code != http.StatusNoContent { + t.Fatalf("expected 204, got %d", rr.Code) + } + if validator.org != "" || validator.auth != "" { + t.Fatalf("validator must not be called for localhost requests") + } +} + +func TestSetRequestContextBypassesValidatorInLocalDevelopmentMode(t *testing.T) { + validator := &fakeOrgValidator{allowed: false} + mw := NewOrgContextMiddleware(validator, true, "local") + + req := httptest.NewRequest(http.MethodGet, "/rest/v1/search?q=test", nil) + req.Host = "acme.platform-mesh.io" + + ctx := pmcontext.AddAuthHeaderToContext(req.Context(), "Bearer abc") + ctx = context.WithValue(ctx, keys.WebTokenCtxKey, jwt.WebToken{ + IssuerAttributes: jwt.IssuerAttributes{ + Issuer: "https://idp.example.org/auth/realms/acme-tenant", + Subject: "subject-user", + }, + ParsedAttributes: jwt.ParsedAttributes{Mail: "user@example.org"}, + }) + req = req.WithContext(ctx) + + rr := httptest.NewRecorder() + next := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + rc, err := appcontext.GetRequestContext(r.Context()) + if err != nil { + t.Fatalf("request context missing: %v", err) + } + if rc.Organization != "acme" { + t.Fatalf("unexpected org: %s", rc.Organization) + } + w.WriteHeader(http.StatusNoContent) + }) + + mw.SetRequestContext()(next).ServeHTTP(rr, req) + + if rr.Code != http.StatusNoContent { + t.Fatalf("expected 204, got %d", rr.Code) + } + if validator.org != "" || validator.auth != "" { + t.Fatalf("validator must not be called in local development mode") + } +} + +func TestSetRequestContextReturns401ForMalformedAuthorizationHeader(t *testing.T) { + validator := &fakeOrgValidator{allowed: true} + mw := NewOrgContextMiddleware(validator, false, "local") + + req := httptest.NewRequest(http.MethodGet, "/rest/v1/search?q=test", nil) + req.Host = "acme.platform-mesh.io" + + ctx := pmcontext.AddAuthHeaderToContext(req.Context(), "abc") + ctx = context.WithValue(ctx, keys.WebTokenCtxKey, jwt.WebToken{ + IssuerAttributes: jwt.IssuerAttributes{ + Issuer: "https://idp.example.org/auth/realms/acme-tenant", + Subject: "subject-user", + }, + }) + req = req.WithContext(ctx) + + rr := httptest.NewRecorder() + mw.SetRequestContext()(http.HandlerFunc(func(http.ResponseWriter, *http.Request) { + t.Fatalf("next handler must not be called") + })).ServeHTTP(rr, req) + + if rr.Code != http.StatusUnauthorized { + t.Fatalf("expected 401, got %d", rr.Code) + } + if validator.org != "" || validator.auth != "" { + t.Fatalf("validator must not be called on malformed auth header") + } +} + +func TestNewOrgContextMiddlewareFallsBackToDefaultLocalOrg(t *testing.T) { + validator := &fakeOrgValidator{allowed: false} + mw := NewOrgContextMiddleware(validator, false, "") + + req := httptest.NewRequest(http.MethodGet, "/rest/v1/search?q=test", nil) + req.Host = "localhost:8443" + + ctx := pmcontext.AddAuthHeaderToContext(req.Context(), "Bearer abc") + ctx = context.WithValue(ctx, keys.WebTokenCtxKey, jwt.WebToken{ + IssuerAttributes: jwt.IssuerAttributes{ + Issuer: "https://idp.example.org/auth/realms/acme-tenant", + Subject: "subject-user", + }, + ParsedAttributes: jwt.ParsedAttributes{Mail: "user@example.org"}, + }) + req = req.WithContext(ctx) + + rr := httptest.NewRecorder() + next := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + rc, err := appcontext.GetRequestContext(r.Context()) + if err != nil { + t.Fatalf("request context missing: %v", err) + } + if rc.Organization != defaultLocalDevelopmentOrg { + t.Fatalf("unexpected org: %s", rc.Organization) + } + w.WriteHeader(http.StatusNoContent) + }) + + mw.SetRequestContext()(next).ServeHTTP(rr, req) + + if rr.Code != http.StatusNoContent { + t.Fatalf("expected 204, got %d", rr.Code) + } +} diff --git a/internal/router/router.go b/internal/router/router.go index 6b08838..1203982 100644 --- a/internal/router/router.go +++ b/internal/router/router.go @@ -4,11 +4,13 @@ import ( "context" "encoding/json" "errors" + "fmt" "net/http" "strconv" "strings" "github.com/go-chi/chi/v5" + "github.com/platform-mesh/golang-commons/logger" appcontext "github.com/platform-mesh/search/internal/context" "github.com/platform-mesh/search/internal/service/search" @@ -16,6 +18,8 @@ import ( type SearchService interface { Search(ctx context.Context, req search.SearchRequest) (search.SearchResponse, error) + ListResources(ctx context.Context, req search.SearchResourcesRequest) (search.SearchResourcesResponse, error) + FilterValues(ctx context.Context, req search.FilterValuesRequest) (search.FilterValuesResponse, error) } func CreateRouter(svc SearchService, mws []func(http.Handler) http.Handler) *chi.Mux { @@ -36,35 +40,107 @@ func CreateRouter(svc SearchService, mws []func(http.Handler) http.Handler) *chi } q := strings.TrimSpace(r.URL.Query().Get("q")) - limit := 0 - limitRaw := strings.TrimSpace(r.URL.Query().Get("limit")) - if limitRaw != "" { - parsed, err := strconv.Atoi(limitRaw) - if err != nil { - http.Error(w, "invalid limit", http.StatusBadRequest) - return - } - limit = parsed + limit, err := parseOptionalLimit(r.URL.Query().Get("limit")) + if err != nil { + http.Error(w, "invalid limit", http.StatusBadRequest) + return + } + + filters, err := parseFilters(r.URL.Query()) + if err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return } resp, err := svc.Search(r.Context(), search.SearchRequest{ Organization: rc.Organization, User: rc.User, Query: q, + Resource: strings.TrimSpace(r.URL.Query().Get("resource")), + Filters: filters, Limit: limit, Cursor: strings.TrimSpace(r.URL.Query().Get("cursor")), }) if err != nil { + log := logger.LoadLoggerFromContext(r.Context()) + status := http.StatusInternalServerError switch { case errors.Is(err, search.ErrInvalidRequest), errors.Is(err, search.ErrInvalidCursor): - http.Error(w, err.Error(), http.StatusBadRequest) + status = http.StatusBadRequest + http.Error(w, err.Error(), status) case errors.Is(err, search.ErrUnauthorized): - http.Error(w, http.StatusText(http.StatusUnauthorized), http.StatusUnauthorized) + status = http.StatusUnauthorized + http.Error(w, http.StatusText(status), status) case errors.Is(err, search.ErrForbidden): - http.Error(w, http.StatusText(http.StatusForbidden), http.StatusForbidden) + status = http.StatusForbidden + http.Error(w, http.StatusText(status), status) default: - http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError) + http.Error(w, http.StatusText(status), status) } + log.Error(). + Err(err). + Str("path", r.URL.Path). + Str("organization", rc.Organization). + Int("statusCode", status). + Msg("search request failed") + return + } + + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + _ = json.NewEncoder(w).Encode(resp) + }) + + router.With(mws...).Get("/rest/v1/search/resources", func(w http.ResponseWriter, r *http.Request) { + rc, err := appcontext.GetRequestContext(r.Context()) + if err != nil { + http.Error(w, http.StatusText(http.StatusUnauthorized), http.StatusUnauthorized) + return + } + + resp, err := svc.ListResources(r.Context(), search.SearchResourcesRequest{ + Organization: rc.Organization, + }) + if err != nil { + handleError(w, r, rc, err) + return + } + + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + _ = json.NewEncoder(w).Encode(resp) + }) + + router.With(mws...).Get("/rest/v1/search/filter-values", func(w http.ResponseWriter, r *http.Request) { + rc, err := appcontext.GetRequestContext(r.Context()) + if err != nil { + http.Error(w, http.StatusText(http.StatusUnauthorized), http.StatusUnauthorized) + return + } + + limit, err := parseOptionalLimit(r.URL.Query().Get("limit")) + if err != nil { + http.Error(w, "invalid limit", http.StatusBadRequest) + return + } + + filters, err := parseFilters(r.URL.Query()) + if err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + resp, err := svc.FilterValues(r.Context(), search.FilterValuesRequest{ + Organization: rc.Organization, + User: rc.User, + Resource: strings.TrimSpace(r.URL.Query().Get("resource")), + Field: strings.TrimSpace(r.URL.Query().Get("field")), + Query: strings.TrimSpace(r.URL.Query().Get("q")), + Filters: filters, + Limit: limit, + }) + if err != nil { + handleError(w, r, rc, err) return } @@ -75,3 +151,62 @@ func CreateRouter(svc SearchService, mws []func(http.Handler) http.Handler) *chi return router } + +func parseOptionalLimit(raw string) (int, error) { + raw = strings.TrimSpace(raw) + if raw == "" { + return 0, nil + } + return strconv.Atoi(raw) +} + +func parseFilters(values map[string][]string) (map[string][]string, error) { + filters := make(map[string][]string) + for key, entries := range values { + if !strings.HasPrefix(key, "filter.") { + continue + } + + field := strings.TrimSpace(strings.TrimPrefix(key, "filter.")) + if field == "" { + return nil, fmt.Errorf("invalid filter field") + } + + for _, entry := range entries { + entry = strings.TrimSpace(entry) + if entry == "" { + continue + } + filters[field] = append(filters[field], entry) + } + } + + if len(filters) == 0 { + return nil, nil + } + return filters, nil +} + +func handleError(w http.ResponseWriter, r *http.Request, rc appcontext.RequestContext, err error) { + log := logger.LoadLoggerFromContext(r.Context()) + status := http.StatusInternalServerError + switch { + case errors.Is(err, search.ErrInvalidRequest), errors.Is(err, search.ErrInvalidCursor): + status = http.StatusBadRequest + http.Error(w, err.Error(), status) + case errors.Is(err, search.ErrUnauthorized): + status = http.StatusUnauthorized + http.Error(w, http.StatusText(status), status) + case errors.Is(err, search.ErrForbidden): + status = http.StatusForbidden + http.Error(w, http.StatusText(status), status) + default: + http.Error(w, http.StatusText(status), status) + } + log.Error(). + Err(err). + Str("path", r.URL.Path). + Str("organization", rc.Organization). + Int("statusCode", status). + Msg("search request failed") +} diff --git a/internal/router/router_test.go b/internal/router/router_test.go index dffb67b..100136e 100644 --- a/internal/router/router_test.go +++ b/internal/router/router_test.go @@ -18,6 +18,14 @@ type fakeSearchService struct { response search.SearchResponse err error lastReq search.SearchRequest + + resourcesResp search.SearchResourcesResponse + resourcesErr error + lastResReq search.SearchResourcesRequest + + filterValuesResp search.FilterValuesResponse + filterValuesErr error + lastFilterReq search.FilterValuesRequest } func (f *fakeSearchService) Search(ctx context.Context, req search.SearchRequest) (search.SearchResponse, error) { @@ -25,6 +33,16 @@ func (f *fakeSearchService) Search(ctx context.Context, req search.SearchRequest return f.response, f.err } +func (f *fakeSearchService) ListResources(ctx context.Context, req search.SearchResourcesRequest) (search.SearchResourcesResponse, error) { + f.lastResReq = req + return f.resourcesResp, f.resourcesErr +} + +func (f *fakeSearchService) FilterValues(ctx context.Context, req search.FilterValuesRequest) (search.FilterValuesResponse, error) { + f.lastFilterReq = req + return f.filterValuesResp, f.filterValuesErr +} + func withRequestContext(rc appcontext.RequestContext) func(http.Handler) http.Handler { return func(next http.Handler) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { @@ -37,7 +55,7 @@ func TestCreateRouterSearchSuccess(t *testing.T) { svc := &fakeSearchService{response: search.SearchResponse{Results: []search.SearchHit{{ID: "1", Score: 1, Source: map[string]interface{}{"id": "1"}}}}} r := CreateRouter(svc, []func(http.Handler) http.Handler{withRequestContext(appcontext.RequestContext{Organization: "acme", User: "alice@example.com"})}) - req := httptest.NewRequest(http.MethodGet, "/rest/v1/search?q=hello&limit=15&cursor=abc", nil) + req := httptest.NewRequest(http.MethodGet, "/rest/v1/search?q=hello&limit=15&cursor=abc&resource=accounts&filter.status=Ready", nil) rr := httptest.NewRecorder() r.ServeHTTP(rr, req) @@ -47,9 +65,12 @@ func TestCreateRouterSearchSuccess(t *testing.T) { if svc.lastReq.Organization != "acme" || svc.lastReq.User != "alice@example.com" { t.Fatalf("unexpected request context: %+v", svc.lastReq) } - if svc.lastReq.Query != "hello" || svc.lastReq.Limit != 15 || svc.lastReq.Cursor != "abc" { + if svc.lastReq.Query != "hello" || svc.lastReq.Limit != 15 || svc.lastReq.Cursor != "abc" || svc.lastReq.Resource != "accounts" { t.Fatalf("unexpected request payload: %+v", svc.lastReq) } + if len(svc.lastReq.Filters["status"]) != 1 || svc.lastReq.Filters["status"][0] != "Ready" { + t.Fatalf("unexpected filters: %+v", svc.lastReq.Filters) + } var payload search.SearchResponse if err := json.Unmarshal(rr.Body.Bytes(), &payload); err != nil { @@ -136,6 +157,50 @@ func TestCreateRouterInvalidLimit(t *testing.T) { } } +func TestCreateRouterResourcesEndpoint(t *testing.T) { + svc := &fakeSearchService{ + resourcesResp: search.SearchResourcesResponse{ + Resources: []search.SearchResource{ + {Resource: "accounts", DefaultFields: []string{"name"}}, + }, + }, + } + r := CreateRouter(svc, []func(http.Handler) http.Handler{withRequestContext(appcontext.RequestContext{Organization: "acme", User: "alice@example.com"})}) + req := httptest.NewRequest(http.MethodGet, "/rest/v1/search/resources", nil) + rr := httptest.NewRecorder() + r.ServeHTTP(rr, req) + + if rr.Code != http.StatusOK { + t.Fatalf("expected 200, got %d", rr.Code) + } + if svc.lastResReq.Organization != "acme" { + t.Fatalf("unexpected request: %+v", svc.lastResReq) + } +} + +func TestCreateRouterFilterValuesEndpoint(t *testing.T) { + svc := &fakeSearchService{ + filterValuesResp: search.FilterValuesResponse{Values: []string{"Ready", "Pending"}}, + } + r := CreateRouter(svc, []func(http.Handler) http.Handler{withRequestContext(appcontext.RequestContext{Organization: "acme", User: "alice@example.com"})}) + req := httptest.NewRequest(http.MethodGet, "/rest/v1/search/filter-values?resource=accounts&field=status&q=foo&filter.type=premium", nil) + rr := httptest.NewRecorder() + r.ServeHTTP(rr, req) + + if rr.Code != http.StatusOK { + t.Fatalf("expected 200, got %d", rr.Code) + } + if svc.lastFilterReq.Organization != "acme" || svc.lastFilterReq.User != "alice@example.com" { + t.Fatalf("unexpected request context: %+v", svc.lastFilterReq) + } + if svc.lastFilterReq.Resource != "accounts" || svc.lastFilterReq.Field != "status" { + t.Fatalf("unexpected request payload: %+v", svc.lastFilterReq) + } + if len(svc.lastFilterReq.Filters["type"]) != 1 || svc.lastFilterReq.Filters["type"][0] != "premium" { + t.Fatalf("unexpected filters: %+v", svc.lastFilterReq.Filters) + } +} + func TestCreateRouterErrorMapping(t *testing.T) { tests := []struct { name string diff --git a/internal/service/search/cursor.go b/internal/service/search/cursor.go index 0ad42a9..c68de6f 100644 --- a/internal/service/search/cursor.go +++ b/internal/service/search/cursor.go @@ -12,6 +12,8 @@ type CursorState struct { Version int `json:"v"` Org string `json:"org"` QueryHash string `json:"qh"` + Resource string `json:"r,omitempty"` + FiltersHash string `json:"fh,omitempty"` Limit int `json:"l"` SearchAfter []interface{} `json:"sa"` } @@ -52,13 +54,19 @@ func DecodeCursor(token string) (CursorState, error) { return state, nil } -func ValidateCursor(state CursorState, org, qHash string, limit int) error { +func ValidateCursor(state CursorState, org, qHash, resource, fHash string, limit int) error { if state.Org != org { return fmt.Errorf("%w: org mismatch", ErrInvalidCursor) } if state.QueryHash != qHash { return fmt.Errorf("%w: query mismatch", ErrInvalidCursor) } + if state.Resource != resource { + return fmt.Errorf("%w: resource mismatch", ErrInvalidCursor) + } + if state.FiltersHash != fHash { + return fmt.Errorf("%w: filters mismatch", ErrInvalidCursor) + } if state.Limit != limit { return fmt.Errorf("%w: limit mismatch", ErrInvalidCursor) } diff --git a/internal/service/search/cursor_test.go b/internal/service/search/cursor_test.go index cab2e21..60e5a5f 100644 --- a/internal/service/search/cursor_test.go +++ b/internal/service/search/cursor_test.go @@ -31,13 +31,13 @@ func TestCursorRoundTrip(t *testing.T) { func TestValidateCursorMismatch(t *testing.T) { state := CursorState{Version: cursorVersion, Org: "acme", QueryHash: queryHash("foo"), Limit: 20, SearchAfter: []interface{}{1.0, "x"}} - if err := ValidateCursor(state, "other", queryHash("foo"), 20); err == nil { + if err := ValidateCursor(state, "other", queryHash("foo"), "", "", 20); err == nil { t.Fatalf("expected org mismatch error") } - if err := ValidateCursor(state, "acme", queryHash("bar"), 20); err == nil { + if err := ValidateCursor(state, "acme", queryHash("bar"), "", "", 20); err == nil { t.Fatalf("expected query mismatch error") } - if err := ValidateCursor(state, "acme", queryHash("foo"), 30); err == nil { + if err := ValidateCursor(state, "acme", queryHash("foo"), "", "", 30); err == nil { t.Fatalf("expected limit mismatch error") } } diff --git a/internal/service/search/hash.go b/internal/service/search/hash.go index 2bfc69b..9353213 100644 --- a/internal/service/search/hash.go +++ b/internal/service/search/hash.go @@ -3,9 +3,49 @@ package search import ( "crypto/sha256" "encoding/hex" + "sort" + "strings" ) func queryHash(q string) string { h := sha256.Sum256([]byte(q)) return hex.EncodeToString(h[:]) } + +func filtersHash(filters map[string][]string) string { + if len(filters) == 0 { + return "" + } + + keys := make([]string, 0, len(filters)) + for key := range filters { + trimmed := strings.TrimSpace(key) + if trimmed != "" { + keys = append(keys, trimmed) + } + } + sort.Strings(keys) + + var b strings.Builder + for _, key := range keys { + rawValues := filters[key] + values := make([]string, 0, len(rawValues)) + for _, value := range rawValues { + trimmed := strings.TrimSpace(value) + if trimmed != "" { + values = append(values, trimmed) + } + } + sort.Strings(values) + if len(values) == 0 { + continue + } + b.WriteString(key) + b.WriteString("=") + b.WriteString(strings.Join(values, ",")) + b.WriteString(";") + } + + h := sha256.Sum256([]byte(b.String())) + return hex.EncodeToString(h[:]) +} diff --git a/internal/service/search/service.go b/internal/service/search/service.go index 8b771af..854983a 100644 --- a/internal/service/search/service.go +++ b/internal/service/search/service.go @@ -3,6 +3,7 @@ package search import ( "context" "fmt" + "sort" "strings" "time" @@ -79,6 +80,11 @@ func (s *Service) Search(ctx context.Context, req SearchRequest) (SearchResponse if user == "" { return SearchResponse{}, fmt.Errorf("%w: user is required", ErrInvalidRequest) } + resource := strings.TrimSpace(req.Resource) + filters := normalizeFilters(req.Filters) + if resource == "" && len(filters) > 0 { + return SearchResponse{}, fmt.Errorf("%w: filters require a resource", ErrInvalidRequest) + } limit := req.Limit if limit <= 0 { @@ -89,28 +95,62 @@ func (s *Service) Search(ctx context.Context, req SearchRequest) (SearchResponse } qHash := queryHash(query) + fHash := filtersHash(filters) var searchAfter []interface{} if req.Cursor != "" { decoded, err := DecodeCursor(req.Cursor) if err != nil { return SearchResponse{}, err } - if err := ValidateCursor(decoded, org, qHash, limit); err != nil { + if err := ValidateCursor(decoded, org, qHash, resource, fHash, limit); err != nil { + log.Error().Err(err).Str("cursor", req.Cursor).Msg("invalid cursor") return SearchResponse{}, err } searchAfter = decoded.SearchAfter } - indexRef, err := s.resolver.ResolveIndex(ctx, org) - if err != nil { - return SearchResponse{}, fmt.Errorf("%w: resolve search index: %v", ErrBackend, err) + var ( + indices []string + resourceByIndex map[string]string + searchFields []string + filterQuery map[string][]string + ) + + if resource != "" { + indexRef, err := s.resolver.ResolveIndex(ctx, org, resource) + if err != nil { + log.Error().Err(err).Str("org", org).Str("resource", resource).Msg("failed to resolve search index") + return SearchResponse{}, fmt.Errorf("%w: resolve search index: %v", ErrBackend, err) + } + + if err := validateFiltersAllowed(filters, indexRef.FilterableFields); err != nil { + return SearchResponse{}, err + } + + indices = []string{indexRef.IndexName} + resourceByIndex = map[string]string{indexRef.IndexName: indexRef.Resource} + searchFields = searchableFields(indexRef.DefaultFields) + filterQuery = filters + } else { + indexRefs, err := s.resolver.ListIndices(ctx, org) + if err != nil { + log.Error().Err(err).Str("org", org).Msg("failed to list search indices") + return SearchResponse{}, fmt.Errorf("%w: list search indices: %v", ErrBackend, err) + } + + indices, resourceByIndex = indexLookup(indexRefs) + if len(indices) == 0 { + return SearchResponse{}, fmt.Errorf("%w: no active search indices for org %q", ErrBackend, org) + } + searchFields = searchableFieldsForRefs(indexRefs) } log.Debug(). Str("organization", org). Str("queryHash", qHash). Int("limit", limit). - Str("index", indexRef.IndexName). + Int("indexCount", len(indices)). + Str("resource", resource). Msg("starting search") results := make([]SearchHit, 0, limit) @@ -120,9 +160,17 @@ func (s *Service) Search(ctx context.Context, req SearchRequest) (SearchResponse outer: for len(results) < limit { - page, err := s.searcher.Search(ctx, indexRef.IndexName, query, s.cfg.FetchBatchSize, searchAfter) + page, err := s.searcher.Search(ctx, OpenSearchQuery{ + Indices: indices, + Query: query, + Fields: searchFields, + Filters: filterQuery, + Size: s.cfg.FetchBatchSize, + SearchAfter: searchAfter, + }) s.metrics.AddOpenSearchCalls(1) if err != nil { + log.Error().Err(err).Msg("failed to query OpenSearch") return SearchResponse{}, fmt.Errorf("%w: query OpenSearch: %v", ErrBackend, err) } if len(page.Hits) == 0 { @@ -137,6 +185,13 @@ outer: Hits: page.Hits, }) if err != nil { + log.Error(). + Err(err). + Str("organization", org). + Str("queryHash", qHash). + Str("resource", resource). + Int("scannedHits", totalScanned). + Msg("failed to authorize search hits with OpenFGA") return SearchResponse{}, fmt.Errorf("%w: filter authorization: %v", ErrBackend, err) } s.metrics.AddOpenFGACalls(authz.Calls) @@ -145,17 +200,18 @@ outer: for i, hit := range page.Hits { totalScanned++ - nextSearchAfter = hit.Sort if totalScanned > s.cfg.MaxScannedHits { break outer } + nextSearchAfter = hit.Sort if i >= len(authz.Allowed) || !authz.Allowed[i] { + log.Warn().Str("organization", org).Str("queryHash", qHash).Str("resource", resource).Int("hitIndex", i).Msg("skipping unauthorized search hit") continue } - results = append(results, mapHit(hit)) + results = append(results, mapHit(hit, resolveHitResource(hit, resource, resourceByIndex))) if len(results) == limit { break outer } @@ -174,10 +230,13 @@ outer: Version: cursorVersion, Org: org, QueryHash: qHash, + Resource: resource, + FiltersHash: fHash, Limit: limit, SearchAfter: nextSearchAfter, }) if err != nil { + log.Error().Err(err).Msg("failed to encode cursor") return SearchResponse{}, err } nextCursor = &cursor @@ -186,7 +245,162 @@ outer: return SearchResponse{Results: results, NextCursor: nextCursor}, nil } -func mapHit(hit OpenSearchHit) SearchHit { +func (s *Service) ListResources(ctx context.Context, req SearchResourcesRequest) (SearchResourcesResponse, error) { + org := strings.TrimSpace(req.Organization) + if org == "" { + return SearchResourcesResponse{}, fmt.Errorf("%w: organization is required", ErrInvalidRequest) + } + + refs, err := s.resolver.ListIndices(ctx, org) + if err != nil { + return SearchResourcesResponse{}, fmt.Errorf("%w: list search indices: %v", ErrBackend, err) + } + + resources := make([]SearchResource, 0, len(refs)) + byResource := make(map[string]SearchResource, len(refs)) + for _, ref := range refs { + resource := strings.TrimSpace(ref.Resource) + if resource == "" { + continue + } + byResource[resource] = SearchResource{ + Resource: resource, + DefaultFields: dedupeNonEmpty(ref.DefaultFields), + FilterableFields: dedupeNonEmpty(ref.FilterableFields), + SemanticFields: dedupeNonEmpty(ref.SemanticFields), + } + } + + keys := make([]string, 0, len(byResource)) + for resource := range byResource { + keys = append(keys, resource) + } + sort.Strings(keys) + for _, resource := range keys { + resources = append(resources, byResource[resource]) + } + + return SearchResourcesResponse{Resources: resources}, nil +} + +func (s *Service) FilterValues(ctx context.Context, req FilterValuesRequest) (FilterValuesResponse, error) { + start := time.Now() + s.metrics.IncSearchRequests() + defer func() { s.metrics.ObserveSearchDuration(time.Since(start)) }() + + org := strings.TrimSpace(req.Organization) + if org == "" { + return FilterValuesResponse{}, fmt.Errorf("%w: organization is required", ErrInvalidRequest) + } + user := strings.TrimSpace(req.User) + if user == "" { + return FilterValuesResponse{}, fmt.Errorf("%w: user is required", ErrInvalidRequest) + } + resource := strings.TrimSpace(req.Resource) + if resource == "" { + return FilterValuesResponse{}, fmt.Errorf("%w: resource is required", ErrInvalidRequest) + } + field := strings.TrimSpace(req.Field) + if field == "" { + return FilterValuesResponse{}, fmt.Errorf("%w: field is required", ErrInvalidRequest) + } + + limit := req.Limit + if limit <= 0 { + limit = s.cfg.DefaultLimit + } + if limit > s.cfg.MaxLimit { + limit = s.cfg.MaxLimit + } + + indexRef, err := s.resolver.ResolveIndex(ctx, org, resource) + if err != nil { + return FilterValuesResponse{}, fmt.Errorf("%w: resolve search index: %v", ErrBackend, err) + } + + allowed := fieldSet(indexRef.FilterableFields) + if _, ok := allowed[field]; !ok { + return FilterValuesResponse{}, fmt.Errorf("%w: field %q is not filterable for resource %q", ErrInvalidRequest, field, resource) + } + + filters := normalizeFilters(req.Filters) + if err := validateFiltersAllowed(filters, indexRef.FilterableFields); err != nil { + return FilterValuesResponse{}, err + } + + query := strings.TrimSpace(req.Query) + searchFields := searchableFields(indexRef.DefaultFields) + + searchAfter := []interface{}(nil) + totalScanned := 0 + seen := make(map[string]struct{}, limit) + values := make([]string, 0, limit) + +outer: + for len(values) < limit { + page, err := s.searcher.Search(ctx, OpenSearchQuery{ + Indices: []string{indexRef.IndexName}, + Query: query, + Fields: searchFields, + Filters: filters, + Size: s.cfg.FetchBatchSize, + SearchAfter: searchAfter, + }) + s.metrics.AddOpenSearchCalls(1) + if err != nil { + return FilterValuesResponse{}, fmt.Errorf("%w: query OpenSearch: %v", ErrBackend, err) + } + if len(page.Hits) == 0 { + break + } + + authz, err := s.authorizer.FilterAuthorized(ctx, AuthorizationRequest{ + Organization: org, + User: user, + Relation: "get", + Hits: page.Hits, + }) + if err != nil { + return FilterValuesResponse{}, fmt.Errorf("%w: filter authorization: %v", ErrBackend, err) + } + s.metrics.AddOpenFGACalls(authz.Calls) + s.metrics.AddDroppedMissingContext(authz.DroppedMissingContext) + s.metrics.AddAuthDenied(authz.Denied) + + for i, hit := range page.Hits { + totalScanned++ + if totalScanned > s.cfg.MaxScannedHits { + break outer + } + if i >= len(authz.Allowed) || !authz.Allowed[i] { + continue + } + for _, value := range extractFieldValues(hit.Source, field) { + if _, exists := seen[value]; exists { + continue + } + seen[value] = struct{}{} + values = append(values, value) + if len(values) >= limit { + break outer + } + } + } + + if len(page.Hits) < s.cfg.FetchBatchSize { + break + } + searchAfter = page.Hits[len(page.Hits)-1].Sort + } + + sort.Strings(values) + if len(values) > limit { + values = values[:limit] + } + return FilterValuesResponse{Values: values}, nil +} + +func mapHit(hit OpenSearchHit, resource string) SearchHit { src := hit.Source if src == nil { src = map[string]interface{}{} @@ -194,6 +408,7 @@ func mapHit(hit OpenSearchHit) SearchHit { return SearchHit{ ID: firstString(hit.ID, stringFromMap(src, "id")), Score: hit.Score, + Resource: resource, Kind: stringFromMap(src, "kind"), Name: stringFromMap(src, "name"), Namespace: stringFromMap(src, "namespace"), @@ -209,6 +424,196 @@ func mapHit(hit OpenSearchHit) SearchHit { } } +func indexLookup(refs []SearchIndexRef) ([]string, map[string]string) { + indices := make([]string, 0, len(refs)) + resourceByIndex := make(map[string]string, len(refs)) + seen := make(map[string]struct{}, len(refs)) + for _, ref := range refs { + indexName := strings.TrimSpace(ref.IndexName) + if indexName == "" { + continue + } + if _, ok := seen[indexName]; !ok { + indices = append(indices, indexName) + seen[indexName] = struct{}{} + } + if resource := strings.TrimSpace(ref.Resource); resource != "" { + resourceByIndex[indexName] = resource + } + } + sort.Strings(indices) + return indices, resourceByIndex +} + +func resolveHitResource(hit OpenSearchHit, requestedResource string, byIndex map[string]string) string { + if resource := strings.TrimSpace(requestedResource); resource != "" { + return resource + } + if resource := strings.TrimSpace(byIndex[hit.Index]); resource != "" { + return resource + } + return strings.TrimSpace(stringFromMap(hit.Source, "resource")) +} + +func searchableFields(defaultFields []string) []string { + fields := append([]string{"name"}, defaultFields...) + return dedupeNonEmpty(fields) +} + +func searchableFieldsForRefs(refs []SearchIndexRef) []string { + fields := make([]string, 0, len(refs)*2+1) + fields = append(fields, "name") + for _, ref := range refs { + fields = append(fields, ref.DefaultFields...) + } + return dedupeNonEmpty(fields) +} + +func normalizeFilters(filters map[string][]string) map[string][]string { + if len(filters) == 0 { + return nil + } + + normalized := make(map[string][]string, len(filters)) + for field, values := range filters { + field = strings.TrimSpace(field) + if field == "" { + continue + } + + clean := make([]string, 0, len(values)) + seen := make(map[string]struct{}, len(values)) + for _, value := range values { + value = strings.TrimSpace(value) + if value == "" { + continue + } + if _, ok := seen[value]; ok { + continue + } + seen[value] = struct{}{} + clean = append(clean, value) + } + if len(clean) == 0 { + continue + } + normalized[field] = clean + } + + if len(normalized) == 0 { + return nil + } + return normalized +} + +func validateFiltersAllowed(filters map[string][]string, allowedFields []string) error { + if len(filters) == 0 { + return nil + } + allowed := fieldSet(allowedFields) + for field := range filters { + if _, ok := allowed[field]; !ok { + return fmt.Errorf("%w: field %q is not filterable", ErrInvalidRequest, field) + } + } + return nil +} + +func fieldSet(values []string) map[string]struct{} { + out := make(map[string]struct{}, len(values)) + for _, value := range values { + trimmed := strings.TrimSpace(value) + if trimmed != "" { + out[trimmed] = struct{}{} + } + } + return out +} + +func dedupeNonEmpty(values []string) []string { + out := make([]string, 0, len(values)) + seen := make(map[string]struct{}, len(values)) + for _, value := range values { + trimmed := strings.TrimSpace(value) + if trimmed == "" { + continue + } + if _, ok := seen[trimmed]; ok { + continue + } + seen[trimmed] = struct{}{} + out = append(out, trimmed) + } + sort.Strings(out) + return out +} + +func extractFieldValues(source map[string]interface{}, fieldPath string) []string { + if len(source) == 0 { + return nil + } + parts := strings.Split(strings.TrimSpace(fieldPath), ".") + if len(parts) == 0 { + return nil + } + + results := make(map[string]struct{}) + collectFieldValues(source, parts, results) + if len(results) == 0 { + return nil + } + + values := make([]string, 0, len(results)) + for value := range results { + values = append(values, value) + } + sort.Strings(values) + return values +} + +func collectFieldValues(current interface{}, parts []string, out map[string]struct{}) { + if len(parts) == 0 { + collectScalarValues(current, out) + return + } + + switch typed := current.(type) { + case map[string]interface{}: + next, ok := typed[parts[0]] + if !ok { + return + } + collectFieldValues(next, parts[1:], out) + case []interface{}: + for _, item := range typed { + collectFieldValues(item, parts, out) + } + } +} + +func collectScalarValues(value interface{}, out map[string]struct{}) { + switch typed := value.(type) { + case string: + trimmed := strings.TrimSpace(typed) + if trimmed != "" { + out[trimmed] = struct{}{} + } + case []interface{}: + for _, item := range typed { + collectScalarValues(item, out) + } + case fmt.Stringer: + trimmed := strings.TrimSpace(typed.String()) + if trimmed != "" { + out[trimmed] = struct{}{} + } + case bool: + out[fmt.Sprintf("%t", typed)] = struct{}{} + case int, int8, int16, int32, int64, uint, uint8, uint16, uint32, uint64, float32, float64: + out[fmt.Sprintf("%v", typed)] = struct{}{} + } +} + func stringFromMap(m map[string]interface{}, key string) string { v, ok := m[key] if !ok || v == nil { diff --git a/internal/service/search/service_test.go b/internal/service/search/service_test.go index a4b884e..56f7a65 100644 --- a/internal/service/search/service_test.go +++ b/internal/service/search/service_test.go @@ -7,20 +7,31 @@ import ( ) type fakeResolver struct { - index SearchIndexRef - err error + index SearchIndexRef + indices []SearchIndexRef + err error } -func (f fakeResolver) ResolveIndex(ctx context.Context, org string) (SearchIndexRef, error) { +func (f fakeResolver) ResolveIndex(ctx context.Context, org, resource string) (SearchIndexRef, error) { return f.index, f.err } +func (f fakeResolver) ListIndices(ctx context.Context, org string) ([]SearchIndexRef, error) { + if len(f.indices) > 0 { + return f.indices, f.err + } + if f.index.IndexName != "" { + return []SearchIndexRef{f.index}, f.err + } + return nil, f.err +} + type fakeSearcher struct { pages []OpenSearchPage calls int } -func (f *fakeSearcher) Search(ctx context.Context, indexName, query string, size int, searchAfter []interface{}) (OpenSearchPage, error) { +func (f *fakeSearcher) Search(ctx context.Context, req OpenSearchQuery) (OpenSearchPage, error) { if f.calls >= len(f.pages) { return OpenSearchPage{}, nil } @@ -151,3 +162,63 @@ func TestSearchClampsLimitToConfiguredMax(t *testing.T) { t.Fatalf("expected clamped limit 100, got %d", decoded.Limit) } } + +func TestFilterValuesPostFiltersAndEnforcesLimit(t *testing.T) { + searcher := &fakeSearcher{pages: []OpenSearchPage{ + {Hits: []OpenSearchHit{ + {ID: "1", Source: map[string]interface{}{"status": "Terminated"}}, + {ID: "2", Source: map[string]interface{}{"status": "Active"}}, + {ID: "3", Source: map[string]interface{}{"status": "Pending"}}, + }}, + }} + + svc := NewService( + fakeResolver{index: SearchIndexRef{ + IndexName: "idx", + FilterableFields: []string{"status"}, + }}, + searcher, + &fakeAuthorizer{results: []AuthorizationResult{ + {Allowed: []bool{false, true, true}, Calls: 1, Denied: 1}, + }}, + nil, + ServiceConfig{FetchBatchSize: 10, MaxScannedHits: 100}, + ) + + resp, err := svc.FilterValues(context.Background(), FilterValuesRequest{ + Organization: "acme", + User: "alice@example.com", + Resource: "pods", + Field: "status", + Limit: 1, + }) + if err != nil { + t.Fatalf("FilterValues returned error: %v", err) + } + + if len(resp.Values) != 1 { + t.Fatalf("expected 1 value, got %d", len(resp.Values)) + } + if resp.Values[0] != "Active" { + t.Fatalf("unexpected value: %s", resp.Values[0]) + } +} + +func TestFilterValuesRejectsMissingUser(t *testing.T) { + svc := NewService( + fakeResolver{index: SearchIndexRef{IndexName: "idx", FilterableFields: []string{"status"}}}, + &fakeSearcher{}, + &fakeAuthorizer{}, + nil, + ServiceConfig{}, + ) + + _, err := svc.FilterValues(context.Background(), FilterValuesRequest{ + Organization: "acme", + Resource: "pods", + Field: "status", + }) + if !errors.Is(err, ErrInvalidRequest) { + t.Fatalf("expected ErrInvalidRequest, got %v", err) + } +} diff --git a/internal/service/search/types.go b/internal/service/search/types.go index 5f62a80..fde6d32 100644 --- a/internal/service/search/types.go +++ b/internal/service/search/types.go @@ -6,6 +6,8 @@ type SearchRequest struct { Organization string User string Query string + Resource string + Filters map[string][]string Limit int Cursor string } @@ -18,6 +20,7 @@ type SearchResponse struct { type SearchHit struct { ID string `json:"id"` Score float64 `json:"score"` + Resource string `json:"resource,omitempty"` Kind string `json:"kind,omitempty"` Name string `json:"name,omitempty"` Namespace string `json:"namespace,omitempty"` @@ -32,14 +35,49 @@ type SearchHit struct { Source map[string]interface{} `json:"source"` } +type SearchResourcesRequest struct { + Organization string +} + +type SearchResource struct { + Resource string `json:"resource"` + DefaultFields []string `json:"defaultFields,omitempty"` + FilterableFields []string `json:"filterableFields,omitempty"` + SemanticFields []string `json:"semanticFields,omitempty"` +} + +type SearchResourcesResponse struct { + Resources []SearchResource `json:"resources"` +} + +type FilterValuesRequest struct { + Organization string + User string + Resource string + Field string + Query string + Filters map[string][]string + Limit int +} + +type FilterValuesResponse struct { + Values []string `json:"values"` +} + type SearchIndexRef struct { + Resource string IndexName string + IndexPrefix string OrganizationClusterID string + DefaultFields []string + FilterableFields []string + SemanticFields []string Group string Version string } type OpenSearchHit struct { + Index string ID string Score float64 Sort []interface{} @@ -47,7 +85,18 @@ type OpenSearchHit struct { } type OpenSearchPage struct { - Hits []OpenSearchHit + Hits []OpenSearchHit + AggregationValues []string +} + +type OpenSearchQuery struct { + Indices []string + Query string + Fields []string + Filters map[string][]string + Size int + SearchAfter []interface{} + AggregationField string } type AuthorizationRequest struct { @@ -65,11 +114,12 @@ type AuthorizationResult struct { } type SearchIndexResolver interface { - ResolveIndex(ctx context.Context, org string) (SearchIndexRef, error) + ResolveIndex(ctx context.Context, org, resource string) (SearchIndexRef, error) + ListIndices(ctx context.Context, org string) ([]SearchIndexRef, error) } type OpenSearchSearcher interface { - Search(ctx context.Context, indexName, query string, size int, searchAfter []interface{}) (OpenSearchPage, error) + Search(ctx context.Context, req OpenSearchQuery) (OpenSearchPage, error) } type FGAAuthorizer interface {