feat: sync policies with redisson#2170
Conversation
7647763 to
78e011a
Compare
ade76f5 to
43f27a1
Compare
feat(casbin): Redis RTopic watcher for event-driven policy syncSummaryReplaces polling-only Casbin policy reload with a Redis pub/sub watcher, so all Datashare nodes pick up permission changes in near-real time when running in Redis mode. Includes a refactor of the user session layer to cleanly separate read-only user lookup from mutable session management. ChangesCore feature: Redis policy watcher
Session layer refactor
Mode wiring
CLI
Tests
|
Casbin sync manual test scenariosContextThe This file is a manual test guide covering three auth/user-provider combinations to confirm the Common setupPrerequisites: Redis, PostgreSQL, and (for scenario 1) the OAuth2 provider are running. Both instances share the same Redis and PostgreSQL. Instance 1 runs on port 8080, instance 2 on Common flags used in all scenarios: To confirm sync happened on instance 2, watch its logs for a line like: Or make an API call to instance 2 after the grant and verify the project appears in the user's Scenario 1 — OAuth2 authUsers authenticate via the IdP. Their session is cached in Redis by the OAuth2 filter. Start instance 1 (port 8080)./run.sh app start \
--auth oauth \
-d ~/Datashare/Data/ \
--redisAddress redis://redis:6379 \
--messageBusAddress redis://redis:6379 \
--dataSourceUrl "jdbc:postgresql://postgres/datashare?user=admin&password=admin" \
--busType REDIS \
--mode SERVER \
--oauthClientId datashareuidforseed \
--oauthClientSecret datasharesecretforseed \
--oauthAuthorizeUrl http://<OAuth>:3001/oauth/authorize \
--oauthTokenUrl http://<OAuth>:3001/oauth/token \
--oauthApiUrl http://<OAuth>:3001/api/v1/me.json \
--oauthCallbackPath /auth/callbackStart instance 2 (port 8081)./run.sh app start \
--auth oauth \
-d ~/Datashare/Data/ \
--redisAddress redis://redis:6379 \
--messageBusAddress redis://redis:6379 \
--dataSourceUrl "jdbc:postgresql://postgres/datashare?user=admin&password=admin" \
--busType REDIS \
--mode SERVER \
--oauthClientId datashareuidforseed \
--oauthClientSecret datasharesecretforseed \
--oauthAuthorizeUrl http://<OAuth>:3001/oauth/authorize \
--oauthTokenUrl http://<OAuth>:3001/oauth/token \
--oauthApiUrl http://<OAuth>:3001/api/v1/me.json \
--oauthCallbackPath /auth/callback \
--port 8081Same flags, add Test steps
Scenario 2 — Form auth, database user providerUsers are stored in PostgreSQL. Authentication uses the form login filter. Create the test user./run.sh user create \
--login jdoe \
--email jdoe@example.org \
--name "Jane Doe" \
--password secret \
--provider local \
--authUsersProvider database \
--dataSourceUrl "jdbc:postgresql://postgres/datashare?user=admin&password=admin"Start instance 1 (port 8080)./run.sh app start \
--auth form \
--authUsersProvider database \
-d ~/Datashare/Data/ \
--redisAddress redis://redis:6379 \
--messageBusAddress redis://redis:6379 \
--dataSourceUrl "jdbc:postgresql://postgres/datashare?user=admin&password=admin" \
--busType REDIS \
--mode SERVERStart instance 2 (port 8081)./run.sh app start \
--auth form \
--authUsersProvider database \
-d ~/Datashare/Data/ \
--redisAddress redis://redis:6379 \
--messageBusAddress redis://redis:6379 \
--dataSourceUrl "jdbc:postgresql://postgres/datashare?user=admin&password=admin" \
--busType REDIS \
--mode SERVER \
--port 8081Same flags, add Test steps
./run.sh project revoke --project local-datashare --user jdoe \
--authUsersProvider database \
--dataSourceUrl "jdbc:postgresql://postgres/datashare?user=admin&password=admin" \
--busType REDIS \
--redisAddress redis://redis:6379Scenario 3 — Form auth, Redis user providerUsers are stored in Redis. Authentication uses the form login filter with password hashed in Redis. Create the test user./run.sh user create \
--login jdoe \
--email jdoe@example.org \
--name "Jane Doe" \
--password secret \
--provider local \
--authUsersProvider redis \
--redisAddress redis://redis:6379 \
--dataSourceUrl "jdbc:postgresql://postgres/datashare?user=admin&password=admin"Confirm: Start instance 1 (port 8080)./run.sh app start \
--auth form \
--authUsersProvider redis \
-d ~/Datashare/Data/ \
--redisAddress redis://redis:6379 \
--messageBusAddress redis://redis:6379 \
--dataSourceUrl "jdbc:postgresql://postgres/datashare?user=admin&password=admin" \
--busType REDIS \
--mode SERVERStart instance 2 (port 8081)./run.sh app start \
--auth form \
--authUsersProvider redis \
-d ~/Datashare/Data/ \
--redisAddress redis://redis:6379 \
--messageBusAddress redis://redis:6379 \
--dataSourceUrl "jdbc:postgresql://postgres/datashare?user=admin&password=admin" \
--busType REDIS \
--mode SERVER \
--port 8081Same flags, add Test steps
Expected log output on instance 2 after grantExpected log output on instance 1 after grant (self-suppression)Nothing — own notifications are suppressed at DEBUG level only. No INFO line appears. |
pirhoo
left a comment
There was a problem hiding this comment.
Great work!
I've made only two non-blocking suggestions :)
|
|
||
| @Override | ||
| public void setUpdateCallback(Runnable callback) { | ||
| listenerId = topic.addListener(String.class, (ch, msg) -> { |
There was a problem hiding this comment.
suggestion: remove the prior listener before registering a new one, e.g. guard with if (listenerId != -1) topic.removeListener(listenerId); at the top of each overload, or track a List<Integer> and remove all on close().
| // Event-driven: RTopic notifies all instances immediately on policy change. | ||
| // startAutoLoadPolicy backstop defaults to 0 (disabled); set policyReloadInterval | ||
| // explicitly to enable periodic reloads as a hedge against missed publications. | ||
| long interval = parseInt(propertiesProvider.get(POLICY_RELOAD_INTERVAL_OPT).orElse("0")); |
There was a problem hiding this comment.
suggestion: this creates a discreptencies with the default values in the legacy joptsimple parser, we could change it to:
long interval = parseInt(propertiesProvider.get(POLICY_RELOAD_INTERVAL_OPT)
.orElse(String.valueOf(DEFAULT_POLICY_RELOAD_INTERVAL)));
Where DEFAULT_POLICY_RELOAD_INTERVAL is 30_000. This is important to change so default value is the same both with joptsimple and picocli.
There was a problem hiding this comment.
This is a particular case, in event driven, we don't want backstop polling policy reload by default (0), unless specified.
Howere joptsimple and picocli should behave the same, I'm fixing the initial value in joptsimple
Adds PolicyWatcher (Watcher + Closeable) backed by Redisson RTopic. Switches Authorizer from Enforcer to SyncedEnforcer to support setWatcher(). When busType=REDIS, CommonMode wires a PolicyWatcher into Authorizer: policy mutations auto-publish to datashare:policy-updated and all subscribed instances call loadPolicy() immediately. Falls back to no watcher when Redis is not configured.
When busType=REDIS the RTopic watcher handles policy sync in real time. Otherwise SyncedEnforcer.startAutoLoadPolicy() is used when --policyReloadInterval (ms, default 30000) is non-zero; set to 0 to disable. Authorizer now implements Closeable; stopAutoLoadPolicy() is called on shutdown in both paths.
…nd CommonMode wiring
Redisson RTopic delivers messages to all subscribers including the publisher. Without filtering, addGroupingPolicy() would cause the same instance to call loadPolicy() on itself, momentarily clearing in-memory state. Each instance now tags its own publications with a UUID and ignores messages it sent.
Two end-to-end tests using real Redis and two independent Authorizer instances (separate Redisson clients, separate PolicyWatcher instances): - policy_change_on_instance1_notifies_instance2: verifies cross-instance notification arrives within 3s (CountDownLatch) - policy_change_does_not_trigger_own_watcher: verifies the UUID self-filter suppresses the callback on the publishing instance Also fixes two pre-existing compilation issues on this branch: - SyncedEnforcer(Model, Adapter, boolean) removed in jcasbin 1.96.0; replaced with SyncedEnforcer(Model, Adapter) + conditional enableLog() - co.elastic.clients:elasticsearch-java needed as provided scope so datashare-app main sources compile (it was test-only transitively)
The option was only registered in the legacy joptsimple parser (DatashareCliOptions). Add it to the picocli GlobalOptions command too, with the same default (30000 ms) and INHERIT scope, so it is available on all subcommands and surfaced in help — matching every other global option. Add a DatashareCommandTest assertion for its default.
… grant() Extend `grant()` functionality to check `UsersWritable` when a user is not found in SQL, ensuring compatibility with custom auth providers like Redis. Include corresponding unit test.
…es in `project grant` command
UsersInRedis.saveOrUpdate always stamped transaction.expire() with this.ttl, which defaults to 1 second in the CLI process. Running `project grant` called appendProjectToInventory → saveOrUpdate and overwrote a manually-provisioned (no-TTL) Redis user entry with a 1-second expiry, logging the user out immediately after the grant. Fix: read the key's current TTL before writing. If it is -1 (persistent, no expiry), skip the expire call. Only apply sessionTtlSeconds for new keys (-2) or keys that already carry a session TTL. Also sync the user's groups_by_applications to Redis on grant/revoke so the running server picks up the updated project membership list.
The previous fix skipped expire() only for persistent keys (ttl=-1), but still applied this.ttl=1s for keys with an existing session TTL (ttl>0), logging out OAuth2 users seconds after a project-grant. Replace the -1 guard with an extend-only rule: - new key (ttl=-2): set this.ttl (initial session creation) - active session (ttl>0): set max(existing, this.ttl) — never shorten - persistent key (ttl=-1): omit expire (SET already cleared it) This also preserves the server-side OAuth2 session-refresh behaviour: on re-login the server's this.ttl (e.g. 600s) will always be ≥ any remaining TTL, so the session is refreshed as expected.
The TTL read (jedis.ttl) was outside the MULTI/EXEC block. A concurrent OAuth2 login could create the key with a long TTL after we read -2, then our EXEC would stamp EXPIRE=1s on the live session, logging the user out. Fix: WATCH the key before reading the TTL. EXEC returns null when the key is modified between WATCH and EXEC; the loop retries with a fresh TTL read so the extend-only rule is applied on consistent data. The try/finally discard() cleans up an open MULTI on unexpected errors.
…xception When project grant fails because a user exists only in Redis (OAuth2 session before first grant) but --authUsersProvider=UsersInRedis was not passed, the error message now reads: user 'alice' not found; OAuth2 sessions exist only in Redis; pass --authUsersProvider=UsersInRedis The hint is attached only when usersWritable is not UsersInRedis, i.e. when Redis was never consulted during the lookup. A plain "not found" is kept for the case where Redis was consulted and still empty.
jcasbin catches exceptions thrown by Watcher.update() inside notifyWatcher() and calls Util.logPrint() — INFO-level, gated on enableLog, no stack trace. A Redis outage during project grant therefore exits 0 with no operator-visible warning; other instances are never notified and policy stays stale. Fix: catch the exception in PolicyWatcher.update() before jcasbin's catch, emit a WARN with context, then swallow it. Not rethrowing is deliberate: the Casbin SQL write already committed, and rethrowing would trigger casbinWithRollback, corrupting the inventory/Casbin consistency invariant (SQL rule written, inventory row restored).
…anch + volatile listenerId Two cleanup fixes in the Redis policy-watcher path: - CommonMode.provideAuthorizer: the Redis branch returned the Authorizer directly without registering it with addCloseable(), unlike the interval branch. stopAutoLoadPolicy() is a no-op in watcher mode, but the asymmetry is a maintenance trap. Now consistent with interval branch. - PolicyWatcher.listenerId: written by setUpdateCallback() and read by close(), potentially on different threads (e.g. shutdown hook). Marked volatile to ensure the write is visible across threads.
A server that loses Redis connectivity during a watcher-based deployment misses policy publications and stays stale indefinitely. Adding a periodic reload as a backstop guarantees convergence after the partition heals, without sacrificing the low-latency watcher path. - Authorizer gains a public startAutoLoadPolicy(long intervalMs) method (mirrors the private call in the interval constructor). - CommonMode.provideAuthorizer (Redis branch) calls it after wiring the watcher, using the same --policyReloadInterval property as the interval-only branch. A value of 0 (the default) leaves the backstop disabled, preserving existing behaviour for operators who haven't configured an interval.
… for Redis-only users Two correctness gaps introduced by the Redis inventory sync: 1. Redis not rolled back on Casbin failure (casbinWithRollback) appendProjectToInventory writes to both SQL and Redis before the Casbin call. If swapCasbinRole throws, casbinWithRollback only restored SQL. Result: SQL = original, Redis = updated, Casbin = no role — the user saw the project in their dashboard but every enforce() returned false. Fix: add usersWritable.saveOrUpdate(original) to the catch block, mirroring the existing repository.save(original) compensating write. 2. revokeIfExists silently skipped Redis-only OAuth2 users revokeIfExists called repository.getUser() directly and short-circuited to noop on null, never consulting the Redis fallback. A user granted via the new Redis-fallback path could therefore never be revoked via revokeIfExists. Fix: extract the SQL→Redis lookup into findUser() and use it in both requireUser() and revokeIfExists().
Three minor fixes: - UsersInRedis.saveOrUpdate: replace unbounded while(true) retry with a 10-attempt for loop; throw IllegalStateException on exhaustion so pathological contention is observable rather than an infinite spin. - CommonMode.provideAuthorizer: replace Long.parseLong with parseInt (already statically imported) to match the Integer.class type declared in DatashareCliOptions.policyReloadInterval(); result widens to long at the call site. - AuthorizerTest.java: add missing trailing newline.
Shutdown was iterating closeables FIFO (LinkedList), so redissonClient closed before PolicyWatcher — watcher.close() then called RTopic.removeListener() on a dead connection. Switching to ArrayDeque with descendingIterator() ensures dependents close before their dependencies.
UsersIdProviderRedisCache opened a JedisPool unconditionally, breaking non-OAuth2 deployments when ProjectAdminServiceImpl was first injected. The session cache is an OAuth2 concern — only OAuth2CookieFilter writes sessions to Redis. All other auth modes get a no-op that silently succeeds on saveOrUpdate.
Single-instance, single-threaded Redis model makes WATCH unnecessary: no concurrent writer can race between the TTL read and the SET. The extend-only TTL logic is also dropped; every write resets to the configured sessionTtlSeconds, which is the correct semantics for an OAuth2 session refresh.
…d in GlobalOptions --authUsersProvider, --batchQueueType and --temporalNamespace were moved to GlobalOptions (ScopeType.INHERIT) so non-server subcommands like project grant can receive them, but the original declarations were never removed from ServerOptions, causing DuplicateOptionAnnotationsException on any CLI invocation.
Unconditional EXPIRE was silently adding a 12h expiry to Redis keys that had no TTL (manually-provisioned users) and shortening active OAuth2 sessions whose remaining TTL exceeded the configured default. Now reads the existing TTL first and only calls EXPIRE when creating a new key (TTL=-2) or extending a session that is about to expire (existingTtl < configuredTtl). Persistent keys (TTL=-1) are never touched.
…eloadInterval is absent
orElse("0") silently disabled Casbin polling for programmatic/embedded
startups that don't go through the PicoCLI layer. The 30s fallback
documented in the plan only took effect when GlobalOptions populated the
property. Now the constant is used as the fallback so all entry points
get consistent behaviour.
…nt/revoke Replaced UsersIdProviderCache (session cache) with net.codestory.http.security.Users as the user lookup source in ProjectAdminServiceImpl. The session cache was the wrong abstraction: it holds OAuth2/cookie session entries, not the authoritative user store. Users.find() delegates to the configured provider (UsersInDb or UsersInRedis), so grant/revoke now work correctly regardless of authUsersProvider — previously, repository.getUser() always queried SQL, causing UserNotFoundException for Redis-only users.
…efaults take effect
GlobalOptions was using DatashareOptions.put() with defaultValue="30000", which
unconditionally wrote policyReloadInterval=30000 into the properties map on every
CLI invocation. This meant the .orElse("0") fallback in CommonMode.provideAuthorizer
never fired, silently giving Redis mode a 30s backstop instead of the intended 0
(event-driven only).
Switching to Integer + putIfNotNull lets the property remain absent when the user
does not pass --policyReloadInterval, so CommonMode can apply its own per-bus-type
defaults: 30s for non-Redis polling, 0 for Redis backstop.
The file tested UsersIdProviderRedisCache, not UsersInRedis.
…gement abstraction
…ations Log at INFO when a remote notification triggers a reload, and at DEBUG when the instance suppresses its own publication. Makes it observable in production logs whether the cross-instance sync fired or was correctly self-suppressed.
When busType=REDIS the UsersIdProviderRedisCache is now always activated (not only for OAuth2). provideUsers returns a composite that checks the session cache first then falls back to the UserStore, so that OAuth2 session users are found by grant/revoke CLI commands without requiring --authUsersProvider on the command line.
…epository appendProjectToInventory, removeProjectFromInventory, and casbinWithRollback were calling repository.save(User) which only writes to SQL. For Redis-backed user stores this meant the groups_by_applications field was never updated, so the granted project never appeared in the app. Switch all three to userStore.save(User) so the write goes to whichever backend (SQL or Redis) is active. Update ProjectAdminServiceImplTest to mock UserStore instead of Repository for user saves, and add a regression test that verifies groups_by_applications is updated in the store and repository.save(User) is never called.
…imple default for policyReloadInterval - Guard each setUpdateCallback overload with removeListener(listenerId) so re-registration does not leak the previous RTopic listener - Remove .defaultsTo(DEFAULT_POLICY_RELOAD_INTERVAL) from the joptsimple option so the key stays absent when the flag is not passed, letting CommonMode apply its own per-mode defaults (0 for Redis, 30s otherwise) - Add DatashareCliTest cases asserting the key is absent when unset and propagated when explicit
d797361 to
bb4d695
Compare
…yUpdateMessage Replaces the implicit convention of prepending instanceId to the raw message string with an explicit PolicyUpdateMessage record carrying callerId and message fields. Listeners now use equals() on callerId instead of startsWith(), eliminating the edge case where a foreign callerId that merely shares a prefix with our own would be wrongly suppressed. Adds a regression test for that exact case.
Adds a Redis pub/sub watcher (
PolicyWatcher) that broadcasts Casbin policy changes across server instances in real time, replacing the previous polling-only approach. When a project grant or revoke mutates the policy, the watcher publishes a reload signal on a Redis topic so all instances apply the change immediately rather than waiting for the next poll cycle.Also fixes several correctness issues in the session and project-admin layers surfaced during review.
What changed
PolicyWatcher- newRTopic-backedWatcherthat publishesinstanceId:reloadon mutations and subscribes to reload events; suppresses self-triggered reloads; logs WARN when publish fails so the operator knows other instances won't see the change until the next poll cycleAuthorizer- newWatcherconstructor;startAutoLoadPolicy(intervalMs)backstop for partition recovery; implementsCloseableCommonMode- wiresPolicyWatcherwhenbusType=REDIS; registers both watcher and authorizer withaddCloseable; callsstartAutoLoadPolicyfor failsafe reloadProjectAdminServiceImpl-findUserhelper checks SQL then Redis (usersWritable.find) so OAuth2-only users (never in SQL) are found during grant/revoke;casbinWithRollbacknow rolls back both SQL and Redis on Casbin failure; actionable hint inUserNotFoundExceptionwhen--authUsersProvideris notUsersInRedisUsersInRedis.saveOrUpdate- WATCH/MULTI/EXEC retry loop eliminates the TOCTOU race betweenttl()andEXEC; extend-only TTL rule (never shorten an active session); bounded to 10 attemptsDatashareCliOptions/GlobalOptions/StageRunCommand---policyReloadIntervaloption (default 30 s fallback poll)Tests added
PolicyWatcherTest- unit tests for publish, self-suppress, listener registrationAuthorizerRedisSyncIntTest- integration test: twoAuthorizerinstances share a real Redis; a grant on instance A is visible on instance B within 500 msUsersInRedisTest- WATCH retry loop, extend-only TTL, persistent key handlingProjectAdminServiceImplTest- CLI option parsing, OAuth2 user fallback, hint messageAuthorizerTest- watcher notification, polling lifecycle (close()stops timer)Important
When
busTypeis notREDIS, it falls into theelsepath on line 320 ofCommonMode.java:PolicyWatcher— no Redis pub/sub, no cross-instance notificationsAuthorizer(adapter, interval)— creates an authorizer with only periodic polling viastartAutoLoadPolicy(interval)--policyReloadIntervalis also unset (defaults to"0"), theninterval=0→startAutoLoadPolicyis a no-op → the authorizer is completely static: policy is loaded once at startup and never refreshedSo the behavior matrix:
busType--policyReloadIntervalREDISREDIS3000030000In a single-server deploy without Redis (
busType=MEMORY), the static case is fine - there's only one instance so there's nothing to sync. But if someone runs multiple servers without Redis, grants won't propagate to other instances unless--policyReloadIntervalis set.Differences between branches
feat/casbin-redis-watcherandfeat/casbin-redis-watcher-exfeat/casbin-redis-watcherfeat/casbin-redis-watcher-exPolicyWatcherusing RedissonRTopicSafeWatcherExwrappingjcasbin-redis-watcher-exlibrary'sRedisWatcherExWatcher(basic — oneupdate()method)WatcherEx(fine-grained — separate callbacks per operation:updateForAddPolicy,updateForRemovePolicy, etc.)instanceId; the publisher ignores its own messagesjcasbin-redis-watcher-ex 1.1.0+ 3 Netty exclusions to avoid classpath conflictsjcasbin-redis-watcher-ex)PolicyWatcher.update()catches and logs WARNSafeWatcherExcatches allupdateFor*()and logs WARN (necessary because jcasbin swallows them silently)PolicyWatcher implements Closeable— removes the RTopic listenerCloseableneeded on the watcherstartAutoLoadPolicy(interval)Key trade-off: the
-watcherbranch adds zero dependencies and keeps full control; the-watcher-exbranch delegates to the library but requires managing Netty exclusions and adds a shim layer purely to recover the error visibility that jcasbin strips away. TheWatcherExgranularity (per-operation callbacks) is the only functional advantage of the library, and Datashare doesn't currently use those callbacks for anything beyond triggering a reload.