Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
188 changes: 131 additions & 57 deletions app/workers/prune_event_logs_worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,14 @@ class PruneEventLogsWorker < BaseWorker
# Event types that accounts can generate in very large volumes (thousands per
# resource per day, e.g. validations and heartbeat pings). Within an enterprise
# account's retention window these are deduplicated down to one event per
# resource per event type per day rather than kept verbatim.
HIGH_VOLUME_EVENTS = %w[
  license.validation.succeeded
  license.validation.failed
  license.usage.incremented
  license.usage.decremented
  machine.heartbeat.ping
  machine.heartbeat.pong
  process.heartbeat.ping
  process.heartbeat.pong
  artifact.downloaded
  release.downloaded
].freeze

sidekiq_options queue: :cron,
Expand All @@ -40,87 +44,157 @@ def perform(ts = Time.current.iso8601)
return if
BACKLOG_DAYS <= 0 # never prune -- keep event backlog forever

cutoff_end_date = BACKLOG_DAYS.days.ago.to_date
cutoff_start_date = EventLog.where(created_date: ..cutoff_end_date).minimum(:created_date) || cutoff_end_date
start_time = Time.parse(ts)

# we only want to prune certain high-volume event logs for ent accounts
hi_vol_event_type_ids = EventType.where(event: HIGH_VOLUME_EVENTS)
.ids
@hi_vol_event_type_ids = EventType.where(event: HIGH_VOLUME_EVENTS).ids
@cutoff_end_date = BACKLOG_DAYS.days.ago.to_date
@cutoff_start_date = EventLog.where(created_date: ..cutoff_end_date).minimum(:created_date) || cutoff_end_date
@start_time = Time.parse(ts)

Keygen.logger.info "[workers.prune-event-logs] Starting: start=#{start_time} cutoff_start=#{cutoff_start_date} cutoff_end=#{cutoff_end_date}"

(cutoff_start_date...cutoff_end_date).each do |date|
accounts = Account.preload(:plan).where_assoc_exists(:event_logs,
created_date: date,
)
accounts = Account.preload(:plan).where_assoc_exists(:event_logs, created_date: date)

Keygen.logger.info "[workers.prune-event-logs] Pruning day: accounts=#{accounts.count} date=#{date}"
Keygen.logger.info "[workers.prune-event-logs] Pruning period: accounts=#{accounts.count} date=#{date}"

accounts.unordered.find_each do |account|
account_id = account.id
event_logs = account.event_logs.where(created_date: date)
plan = account.plan
catch :pause do
accounts.unordered.find_each do |account|
prune_event_logs_if_needed(account, date:)
end
end

total = event_logs.count
sum = 0
Keygen.logger.info "[workers.prune-event-logs] Done: date=#{date}"
end
end

batches = (total / BATCH_SIZE) + 1
batch = 0
private

Keygen.logger.info "[workers.prune-event-logs] Pruning #{total} rows: account_id=#{account_id} date=#{date} batches=#{batches}"
attr_reader :hi_vol_event_type_ids,
:cutoff_start_date,
:cutoff_end_date,
:start_time

loop do
unless (t = Time.current).before?(start_time + EXEC_TIMEOUT.seconds)
Keygen.logger.info "[workers.prune-event-logs] Pausing: date=#{date} start=#{start_time} end=#{t}"
# Routes a single account/date pair to the right cleanup strategy: accounts
# still inside their retention window keep their backlog but have duplicate
# high-volume events collapsed; all other accounts have the day pruned outright.
def prune_event_logs_if_needed(account, date:)
  return dedup_hi_vol_event_logs_for_date(account, date:) if within_retention_period?(account, date:)

  prune_event_logs_for_date(account, date:)
end

return # we'll pick up on the next cron
end
def dedup_hi_vol_event_logs_for_date(account, date:)
hi_vol_event_logs = account.event_logs.where(
event_type_id: hi_vol_event_type_ids,
created_date: date,
)

# partition and rank to dedup high volume events within retention period
ranked_event_logs = hi_vol_event_logs.unordered.select(<<~SQL.squish)
event_logs.id,
event_logs.created_at,
ROW_NUMBER() OVER (
PARTITION BY
event_logs.account_id,
event_logs.event_type_id,
event_logs.resource_id,
event_logs.resource_type,
event_logs.created_date
ORDER BY
event_logs.created_at DESC
) AS rank
SQL

# select all rows except the top of the partition to delete i.e. to dedup events per-date/event/resource
selected_event_logs = EventLog.from("(#{ranked_event_logs.to_sql}) AS ranked")
.where('ranked.rank > 1')
.reorder(
'ranked.created_at ASC',
)

total = selected_event_logs.count
sum = 0

batches = (total / BATCH_SIZE) + 1
batch = 0

Keygen.logger.info "[workers.prune-event-logs] Deduping #{total} rows: account_id=#{account.id} date=#{date}"

loop do
unless within_execution_timeout?
Keygen.logger.info "[workers.prune-event-logs] Pausing dedup: date=#{date} start=#{start_time} end=#{current_time}"

throw :pause
end

count = event_logs.statement_timeout(STATEMENT_TIMEOUT) do
prune = account.event_logs.where(id: event_logs.limit(BATCH_SIZE).ids)
count = EventLog.statement_timeout(STATEMENT_TIMEOUT) do
selected_ids = selected_event_logs.limit(BATCH_SIZE).select(:id)

# for ent accounts, we keep the event backlog for the retention period except dup high-volume events.
# for std accounts, we prune everything in the event backlog.
if plan.ent?
hi_vol = prune.where(event_type_id: hi_vol_event_type_ids) # dedup even in retention period
EventLog.where(id: selected_ids).delete_all
end

# apply the account's log retention policy if there is one
if plan.event_log_retention_duration?
retention_cutoff_date = plan.event_log_retention_duration.seconds.ago.to_date
sum += count
batch += 1

prune = prune.where(created_date: ...retention_cutoff_date)
end
Keygen.logger.info "[workers.prune-event-logs] Deduped #{count} rows: account_id=#{account.id} date=#{date} batch=#{batch}/#{batches} progress=#{sum}/#{total}"

# for high-volume events, we keep one event per-day per-event per-resource since some of these can
# be very high-volume, e.g. thousands and thousands of validations and heartbeats per-day.
keep = hi_vol.distinct_on(:resource_id, :resource_type, :event_type_id, :created_date)
.reorder(:resource_id, :resource_type, :event_type_id,
created_date: :desc,
)
.select(
:id,
)
sleep BATCH_WAIT

# FIXME(ezekg) would be better to somehow rollup this data vs deduping
hi_vol.delete_by("id NOT IN (#{keep.to_sql})")
end
break if count < BATCH_SIZE
end

prune.delete_all
end
Keygen.logger.info "[workers.prune-event-logs] Deduping done: account_id=#{account.id} date=#{date} progress=#{sum}/#{total}"
end

sum += count
batch += 1
def prune_event_logs_for_date(account, date:)
event_logs = account.event_logs.where(created_date: date)

Keygen.logger.info "[workers.prune-event-logs] Pruned #{sum}/#{total} rows: account_id=#{account_id} date=#{date} batch=#{batch}/#{batches}"
total = event_logs.count
sum = 0

sleep BATCH_WAIT
batches = (total / BATCH_SIZE) + 1
batch = 0

break if count < BATCH_SIZE
end
Keygen.logger.info "[workers.prune-event-logs] Pruning #{total} rows: account_id=#{account.id} date=#{date}"

loop do
unless within_execution_timeout?
Keygen.logger.info "[workers.prune-event-logs] Pausing: date=#{date} start=#{start_time} end=#{current_time}"

throw :pause
end

Keygen.logger.info "[workers.prune-event-logs] Done: date=#{date}"
count = event_logs.statement_timeout(STATEMENT_TIMEOUT) do
event_logs.limit(BATCH_SIZE).delete_all
end

sum += count
batch += 1

Keygen.logger.info "[workers.prune-event-logs] Pruned #{count} rows: account_id=#{account.id} date=#{date} batch=#{batch}/#{batches} count=#{sum}/#{total}"

sleep BATCH_WAIT

break if count < BATCH_SIZE
end

Keygen.logger.info "[workers.prune-event-logs] Pruning done: account_id=#{account.id} date=#{date} count=#{sum}/#{total}"
end

# Time-zone-aware "now" (ActiveSupport), extracted so tests can stub the clock.
def current_time
  Time.current
end

# True while the worker is still inside its execution budget; once the
# deadline passes, loops throw :pause and the next cron run resumes the work.
def within_execution_timeout?
  deadline = start_time + EXEC_TIMEOUT.seconds

  current_time < deadline
end

# Whether the given date still falls inside the account plan's event-log
# retention window. Accounts without a plan, or whose plan defines no
# retention duration, are never considered within a retention period.
def within_retention_period?(account, date:)
  plan = account.plan

  return false unless plan&.event_log_retention_duration?

  retention_cutoff = plan.event_log_retention_duration.seconds.ago.to_date

  date >= retention_cutoff
end
end
72 changes: 45 additions & 27 deletions app/workers/prune_metrics_worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@ def perform(ts = Time.current.iso8601)
return if
BACKLOG_DAYS <= 0 # never prune -- keep metrics backlog forever

cutoff_end_date = BACKLOG_DAYS.days.ago.to_date
cutoff_start_date = Metric.where(created_date: ..cutoff_end_date).minimum(:created_date) || cutoff_end_date
start_time = Time.parse(ts)
@cutoff_end_date = BACKLOG_DAYS.days.ago.to_date
@cutoff_start_date = Metric.where(created_date: ..cutoff_end_date).minimum(:created_date) || cutoff_end_date
@start_time = Time.parse(ts)

Keygen.logger.info "[workers.prune-metrics] Starting: start=#{start_time} cutoff_start=#{cutoff_start_date} cutoff_end=#{cutoff_end_date}"

Expand All @@ -26,42 +26,60 @@ def perform(ts = Time.current.iso8601)

Keygen.logger.info "[workers.prune-metrics] Pruning day: accounts=#{accounts.count} date=#{date}"

accounts.unordered.find_each do |account|
account_id = account.id
metrics = account.metrics.where(created_date: date)
catch :pause do
accounts.unordered.find_each do |account|
prune_metrics_for_date(account, date:)
end
end

total = metrics.count
sum = 0
Keygen.logger.info "[workers.prune-metrics] Done: date=#{date}"
end
end

batches = (total / BATCH_SIZE) + 1
batch = 0
private

Keygen.logger.info "[workers.prune-metrics] Pruning #{total} rows: account_id=#{account_id} date=#{date} batches=#{batches}"
attr_reader :cutoff_start_date,
:cutoff_end_date,
:start_time

loop do
unless (t = Time.current).before?(start_time + EXEC_TIMEOUT.seconds)
Keygen.logger.info "[workers.prune-metrics] Pausing: date=#{date} start=#{start_time} end=#{t}"
def prune_metrics_for_date(account, date:)
metrics = account.metrics.where(created_date: date)

return # we'll pick up on the next cron
end
total = metrics.count
sum = 0

count = metrics.statement_timeout(STATEMENT_TIMEOUT) do
account.metrics.where(id: metrics.limit(BATCH_SIZE).ids)
.delete_all
end
batches = (total / BATCH_SIZE) + 1
batch = 0

sum += count
batch += 1
Keygen.logger.info "[workers.prune-metrics] Pruning #{total} rows: account_id=#{account.id} date=#{date}"

Keygen.logger.info "[workers.prune-metrics] Pruned #{sum}/#{total} rows: account_id=#{account_id} date=#{date} batch=#{batch}/#{batches}"
loop do
unless within_execution_timeout?
Keygen.logger.info "[workers.prune-metrics] Pausing: date=#{date} start=#{start_time} end=#{current_time}"

sleep BATCH_WAIT
throw :pause
end

break if count < BATCH_SIZE
end
count = metrics.statement_timeout(STATEMENT_TIMEOUT) do
metrics.limit(BATCH_SIZE).delete_all
end

Keygen.logger.info "[workers.prune-metrics] Done: date=#{date}"
sum += count
batch += 1

Keygen.logger.info "[workers.prune-metrics] Pruned #{count} rows: account_id=#{account.id} date=#{date} batch=#{batch}/#{batches} count=#{sum}/#{total}"

sleep BATCH_WAIT

break if count < BATCH_SIZE
end

Keygen.logger.info "[workers.prune-metrics] Pruning done: account_id=#{account.id} date=#{date} count=#{sum}/#{total}"
end

# Time-zone-aware "now" (ActiveSupport), extracted so tests can stub the clock.
def current_time
  Time.current
end

# True while the worker is still inside its execution budget; once the
# deadline passes, loops throw :pause and the next cron run resumes the work.
def within_execution_timeout?
  deadline = start_time + EXEC_TIMEOUT.seconds

  current_time < deadline
end
end
Loading