gitlab-ce/app/models/projects/build_artifacts_size_refres...

145 lines
4.3 KiB
Ruby

# frozen_string_literal: true
module Projects
  # Tracks one refresh of a project's `build_artifacts_size` statistics counter.
  # A record moves through the states listed in STATES and is destroyed once the
  # refresh has been finalized (see #finalize!).
  class BuildArtifactsSizeRefresh < ApplicationRecord
    include AfterCommitQueue
    include BulkInsertSafe

    # A running refresh whose `updated_at` is older than this window is matched
    # by the `stale` scope and therefore shows up in the processing queue again.
    STALE_WINDOW = 2.hours

    # Finalization is postponed by 10 minutes to leave room for deletions that
    # were still in flight.
    # A delete may already be committed in the database before the refresh has
    # finished batching. Should the matching decrement reach Redis only after the
    # refresh has ended, the counter would end up with a net negative value.
    # Waiting for this delay makes sure such a negative value is ignored.
    FINALIZE_DELAY = 10.minutes

    self.table_name = 'project_build_artifacts_size_refreshes'

    # Name of the project statistics counter this refresh operates on.
    COUNTER_ATTRIBUTE_NAME = :build_artifacts_size

    belongs_to :project

    validates :project, presence: true

    # A refresh of the project statistics counter goes through 4 stages:
    # 1. created    - The refresh is queued, waiting for
    #                 Projects::RefreshBuildArtifactsSizeStatisticsWorker to process it.
    # 2. running    - The refresh is in progress. The project statistics counter switches
    #                 to the temporary refresh counter key. Counter increments are deduplicated.
    # 3. pending    - The refresh waits to be picked up again by
    #                 Projects::RefreshBuildArtifactsSizeStatisticsWorker.
    # 4. finalizing - All existing job artifact sizes have been summed into the refresh
    #                 counter key. The sum still has to be moved into the counter key.
    STATES = { created: 1, running: 2, pending: 3, finalizing: 4 }.freeze

    state_machine :state, initial: :created do
      # Transitions:
      #   created -> running <-> pending
      #   running -> running     (`process` fired on a refresh that is already running)
      #   running -> finalizing
      #
      # Every state is persisted as the integer it maps to in STATES.
      STATES.each do |state_name, stored_value|
        state state_name, value: stored_value
      end

      event :process do
        transition(%i[created pending running] => :running)
      end

      event :requeue do
        transition running: :pending
      end

      event :schedule_finalize do
        transition running: :finalizing
      end

      # Very first start of a refresh: reset the statistics counter, then record
      # when the refresh began and the id of the project's last job artifact at
      # that moment (nil when the project has none).
      before_transition created: :running do |record|
        record.reset_project_statistics!
        record.refresh_started_at = Time.zone.now
        record.last_job_artifact_id_on_refresh_start = record.project.job_artifacts.last&.id
      end

      # Touch `updated_at` on every transition out of `running`; the `stale`
      # scope compares this timestamp against STALE_WINDOW.
      before_transition running: any do |record, _transition|
        record.updated_at = Time.zone.now
      end

      # Store the first argument given to the event as the last processed
      # job artifact id; #next_batch continues from it.
      before_transition running: :pending do |record, transition|
        record.last_job_artifact_id = transition.args.first
      end

      before_transition running: :finalizing do |record, _transition|
        record.schedule_finalize_worker
      end
    end

    # Running refreshes that have not been updated within STALE_WINDOW.
    scope :stale, -> do
      stale_cutoff = STALE_WINDOW.ago
      with_state(:running).where('updated_at < ?', stale_cutoff)
    end

    # Refreshes that still need work: created, pending, or stale running ones.
    scope :remaining, -> { with_state(:created, :pending).or(stale) }

    # Remaining refreshes ordered by state value, highest first
    # (pending = 3, running = 2, created = 1).
    scope :processing_queue, -> { remaining.order(state: :desc) }

    after_destroy :schedule_namespace_aggregation_worker

    # Bulk-inserts one refresh in the `created` state per given project.
    #
    # @param projects a single project or a collection of projects; coerced with Array()
    # @return the result of `bulk_insert!` (called with `skip_duplicates: true`)
    def self.enqueue_refresh(projects)
      timestamp = Time.zone.now
      shared_attributes = { state: STATES[:created], created_at: timestamp, updated_at: timestamp }

      refreshes = Array(projects).map do |project|
        new(project: project, **shared_attributes)
      end

      bulk_insert!(refreshes, skip_duplicates: true)
    end

    # Takes the first record of the processing queue, selected with
    # `FOR UPDATE SKIP LOCKED`, and fires its `process` event, all within a
    # single transaction.
    #
    # @return [BuildArtifactsSizeRefresh, nil] the record taken, or nil when the
    #   queue yielded none
    def self.process_next_refresh!
      claimed_refresh = nil

      transaction do
        claimed_refresh = processing_queue.lock('FOR UPDATE SKIP LOCKED').take
        claimed_refresh&.process!
      end

      claimed_refresh
    end

    # Initiates the refresh of the counter on the project's statistics record.
    def reset_project_statistics!
      project.statistics.initiate_refresh!(COUNTER_ATTRIBUTE_NAME)
    end

    # Next slice of the project's job artifacts to be summed.
    #
    # Only `id` and `size` are selected. The relation is narrowed with
    # `id_before(last_job_artifact_id_on_refresh_start)` and
    # `id_after(last_job_artifact_id.to_i)` (a nil `last_job_artifact_id`
    # becomes 0), ordered by id and capped at `limit` rows.
    #
    # @param limit maximum number of rows in the batch
    def next_batch(limit:)
      artifacts = project.job_artifacts.select(:id, :size)
      unprocessed = artifacts
        .id_before(last_job_artifact_id_on_refresh_start)
        .id_after(last_job_artifact_id.to_i)

      unprocessed.ordered_by_id.limit(limit)
    end

    # @return [Boolean] true for every state other than `created`
    def started?
      !created?
    end

    # Finalizes the counter refresh on the project's statistics record and then
    # destroys this refresh record.
    def finalize!
      project.statistics.finalize_refresh(COUNTER_ATTRIBUTE_NAME)

      destroy!
    end

    # Registers an after-commit action that schedules
    # Projects::FinalizeProjectStatisticsRefreshWorker to run in FINALIZE_DELAY,
    # passing this record's class name and id.
    def schedule_finalize_worker
      run_after_commit do
        finalize_worker = Projects::FinalizeProjectStatisticsRefreshWorker
        finalize_worker.perform_in(FINALIZE_DELAY, self.class.to_s, id)
      end
    end

    private

    # `after_destroy` callback: registers an after-commit action that enqueues
    # Namespaces::ScheduleAggregationWorker for the project's namespace.
    def schedule_namespace_aggregation_worker
      run_after_commit do
        namespace_id = project.namespace_id
        Namespaces::ScheduleAggregationWorker.perform_async(namespace_id)
      end
    end
  end
end