Skip to content

Commit eef3558

Browse files
authored
ENG-1322 End of task: Wrong worker (#905)
No error if a task ends that was started before currently completed task
1 parent b2a17ec commit eef3558

File tree

4 files changed

+116
-15
lines changed

4 files changed

+116
-15
lines changed

apps/roam/src/utils/syncDgNodesToSupabase.ts

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -67,11 +67,17 @@ const notifyEndSyncFailure = ({
6767
});
6868
};
6969

70-
export const endSyncTask = async (
71-
worker: string,
72-
status: Enums<"task_status">,
73-
showToast: boolean = false,
74-
): Promise<void> => {
70+
export const endSyncTask = async ({
71+
worker,
72+
status,
73+
showToast = false,
74+
startTime,
75+
}: {
76+
worker: string;
77+
status: Enums<"task_status">;
78+
showToast: boolean;
79+
startTime: Date;
80+
}): Promise<void> => {
7581
try {
7682
const supabaseClient = await getLoggedInClient();
7783
if (!supabaseClient) return;
@@ -85,6 +91,7 @@ export const endSyncTask = async (
8591
s_function: SYNC_FUNCTION,
8692
s_worker: worker,
8793
s_status: status,
94+
s_started_at: startTime.toISOString(),
8895
});
8996
if (error) {
9097
console.error("endSyncTask: Error calling end_sync_task:", error);
@@ -392,7 +399,7 @@ export const setSyncActivity = (active: boolean) => {
392399
export const createOrUpdateDiscourseEmbedding = async (showToast = false) => {
393400
if (!doSync) return;
394401
console.debug("starting createOrUpdateDiscourseEmbedding");
395-
const startTime = new Date().valueOf();
402+
const startTime = new Date();
396403
let success = true;
397404
let claimed = false;
398405
const worker = window.roamAlphaAPI.user.uid();
@@ -455,14 +462,15 @@ export const createOrUpdateDiscourseEmbedding = async (showToast = false) => {
455462
context,
456463
});
457464
await cleanupOrphanedNodes(supabaseClient, context);
458-
await endSyncTask(worker, "complete", showToast);
459-
const duration = (new Date().valueOf() - startTime) / 1000.0;
465+
await endSyncTask({ worker, status: "complete", showToast, startTime });
466+
const duration = (new Date().valueOf() - startTime.valueOf()) / 1000.0;
460467
posthog.capture("Sync complete", { duration });
461468
} catch (error) {
462469
console.error("createOrUpdateDiscourseEmbedding: Process failed:", error);
463470
success = false;
464-
if (worker && claimed) await endSyncTask(worker, "failed", showToast);
465-
const duration = (new Date().valueOf() - startTime) / 1000.0;
471+
if (worker && claimed)
472+
await endSyncTask({ worker, status: "failed", showToast, startTime });
473+
const duration = (new Date().valueOf() - startTime.valueOf()) / 1000.0;
466474
posthog.capture("Sync error", { duration });
467475
if (error instanceof FatalError) {
468476
doSync = false;

packages/database/src/dbTypes.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1469,6 +1469,7 @@ export type Database = {
14691469
end_sync_task: {
14701470
Args: {
14711471
s_function: string
1472+
s_started_at?: string
14721473
s_status: Database["public"]["Enums"]["task_status"]
14731474
s_target: number
14741475
s_worker: string
Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
DROP FUNCTION IF EXISTS public.end_sync_task(
2+
s_target bigint,
3+
s_function character varying,
4+
s_worker character varying,
5+
s_status public.task_status);
6+
7+
CREATE OR REPLACE FUNCTION public.end_sync_task(
8+
s_target bigint,
9+
s_function character varying,
10+
s_worker character varying,
11+
s_status public.task_status,
12+
s_started_at timestamptz = NULL
13+
) RETURNS void
14+
SET search_path = ''
15+
LANGUAGE plpgsql
16+
AS $$
17+
DECLARE t_id INTEGER;
18+
DECLARE t_worker varchar;
19+
DECLARE t_status public.task_status;
20+
DECLARE t_failure_count SMALLINT;
21+
DECLARE t_last_task_start TIMESTAMP WITH TIME ZONE;
22+
DECLARE t_last_success_start TIMESTAMP WITH TIME ZONE;
23+
DECLARE t_last_task_end TIMESTAMP WITH TIME ZONE;
24+
BEGIN
25+
SELECT id, worker, status, failure_count, last_task_start, last_task_end, last_success_start
26+
INTO STRICT t_id, t_worker, t_status, t_failure_count, t_last_task_start, t_last_task_end, t_last_success_start
27+
FROM public.sync_info WHERE sync_target = s_target AND sync_function = s_function;
28+
ASSERT s_status > 'active';
29+
IF t_worker != s_worker AND COALESCE(s_started_at, t_last_task_start) < t_last_task_start THEN
30+
-- we probably took too long. Let the other task have priority.
31+
RETURN;
32+
END IF;
33+
ASSERT t_worker = s_worker, 'Wrong worker';
34+
ASSERT s_status >= t_status, 'do not go back in status';
35+
IF s_status = 'complete' THEN
36+
t_last_task_end := now();
37+
t_last_success_start := t_last_task_start;
38+
t_failure_count := 0;
39+
ELSE
40+
IF t_status != s_status THEN
41+
t_failure_count := t_failure_count + 1;
42+
END IF;
43+
END IF;
44+
45+
UPDATE public.sync_info
46+
SET status = s_status,
47+
task_times_out_at=null,
48+
last_task_end=t_last_task_end,
49+
last_success_start=t_last_success_start,
50+
failure_count=t_failure_count
51+
WHERE id=t_id;
52+
END;
53+
$$;
54+
55+
ALTER FUNCTION public.end_sync_task(
56+
s_target bigint,
57+
s_function character varying,
58+
s_worker character varying,
59+
s_status public.task_status,
60+
s_started_at timestamptz
61+
) OWNER TO "postgres";
62+
63+
GRANT ALL ON FUNCTION public.end_sync_task(
64+
s_target bigint,
65+
s_function character varying,
66+
s_worker character varying,
67+
s_status public.task_status,
68+
s_started_at timestamptz
69+
) TO "anon";
70+
GRANT ALL ON FUNCTION public.end_sync_task(
71+
s_target bigint,
72+
s_function character varying,
73+
s_worker character varying,
74+
s_status public.task_status,
75+
s_started_at timestamptz
76+
) TO "authenticated";
77+
GRANT ALL ON FUNCTION public.end_sync_task(
78+
s_target bigint,
79+
s_function character varying,
80+
s_worker character varying,
81+
s_status public.task_status,
82+
s_started_at timestamptz
83+
) TO "service_role";

packages/database/supabase/schemas/sync.sql

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,8 @@ CREATE OR REPLACE FUNCTION public.end_sync_task(
5050
s_target bigint,
5151
s_function character varying,
5252
s_worker character varying,
53-
s_status public.task_status
53+
s_status public.task_status,
54+
s_started_at timestamptz = NULL
5455
) RETURNS void
5556
SET search_path = ''
5657
LANGUAGE plpgsql
@@ -67,6 +68,10 @@ BEGIN
6768
INTO STRICT t_id, t_worker, t_status, t_failure_count, t_last_task_start, t_last_task_end, t_last_success_start
6869
FROM public.sync_info WHERE sync_target = s_target AND sync_function = s_function;
6970
ASSERT s_status > 'active';
71+
IF t_worker != s_worker AND COALESCE(s_started_at, t_last_task_start) < t_last_task_start THEN
72+
-- we probably took too long. Let the other task have priority.
73+
RETURN;
74+
END IF;
7075
ASSERT t_worker = s_worker, 'Wrong worker';
7176
ASSERT s_status >= t_status, 'do not go back in status';
7277
IF s_status = 'complete' THEN
@@ -93,7 +98,8 @@ ALTER FUNCTION public.end_sync_task(
9398
s_target bigint,
9499
s_function character varying,
95100
s_worker character varying,
96-
s_status public.task_status
101+
s_status public.task_status,
102+
s_started_at timestamptz
97103
) OWNER TO "postgres";
98104

99105

@@ -190,19 +196,22 @@ GRANT ALL ON FUNCTION public.end_sync_task(
190196
s_target bigint,
191197
s_function character varying,
192198
s_worker character varying,
193-
s_status public.task_status
199+
s_status public.task_status,
200+
s_started_at timestamptz
194201
) TO "anon";
195202
GRANT ALL ON FUNCTION public.end_sync_task(
196203
s_target bigint,
197204
s_function character varying,
198205
s_worker character varying,
199-
s_status public.task_status
206+
s_status public.task_status,
207+
s_started_at timestamptz
200208
) TO "authenticated";
201209
GRANT ALL ON FUNCTION public.end_sync_task(
202210
s_target bigint,
203211
s_function character varying,
204212
s_worker character varying,
205-
s_status public.task_status
213+
s_status public.task_status,
214+
s_started_at timestamptz
206215
) TO "service_role";
207216

208217
GRANT ALL ON FUNCTION public.propose_sync_task(

0 commit comments

Comments
 (0)