Real World Rails Background Jobs

Rails apps can be built to perform much of their work in background jobs. Reasons for offloading work to background jobs include speeding up page response times, performing slow tasks that could time out if performed during an HTTP server request, and improving fault tolerance (jobs are typically set up to be retried when they fail).

Below you’ll find a wide range of example jobs from real world applications for you to explore and learn from.

Bulk Email Reports

# Sends the analytics email to the admins of every group returned by
# Group.with_analytics.
class SendAnalyticsEmailJob < ActiveJob::Base
  # Iterates the groups with find_each and queues the analytics mails for
  # each of them.
  def perform
    Group.with_analytics.find_each { |group| queue_analytics_mails(group) }
  end

  private

  # Hands the group's admins to BaseMailer.send_bulk_mail and, for every user
  # it yields, queues UserMailer#analytics through the delayed proxy
  # (priority 10).
  def queue_analytics_mails(group)
    BaseMailer.send_bulk_mail(to: group.admins) do |admin|
      UserMailer.delay(priority: 10).analytics(user: admin, group: group)
    end
  end
end

Source

Generate PDFs

# Generates the PDF for a claim on the :pdf_generation queue.
class PdfGenerationJob < ActiveJob::Base
  queue_as :pdf_generation

  # Calls claim.generate_pdf! and writes an info log line before and after it.
  # The "Finished" line is only written when generate_pdf! does not raise.
  #
  # @param claim the object whose PDF is generated; must respond to generate_pdf!
  def perform(claim)
    log = Rails.logger
    log.info("Starting PdfGenerationJob")
    claim.generate_pdf!
    log.info("Finished PdfGenerationJob")
  end
end

Source

Update Database Counts

# Repairs petitions whose signature count is invalid and reports the
# incident to Appsignal.
class PetitionCountJob < ActiveJob::Base
  # Exception type sent to Appsignal when invalid counts were found.
  class InvalidSignatureCounts < RuntimeError; end

  queue_as :high_priority

  # Does nothing when Petition.with_invalid_signature_counts is empty.
  # Otherwise updates every returned petition's signature count and then
  # sends an InvalidSignatureCounts exception (not raised) to Appsignal.
  def perform
    petitions = Petition.with_invalid_signature_counts
    return if petitions.empty?

    petitions.each(&:update_signature_count!)
    Appsignal.send_exception(exception(petitions))
  end

  private

  # Builds (without raising) the exception describing the affected petitions.
  def exception(petitions)
    InvalidSignatureCounts.new(error_message(petitions))
  end

  # Translates petitions.errors.invalid_signature_counts, interpolating the
  # number of petitions, the inspected list of ids and the first id.
  def error_message(petitions)
    details = {
      scope: :"petitions.errors",
      count: petitions.size,
      ids: petitions.map(&:id).inspect,
      id: petitions.first.id.to_s
    }

    I18n.t(:invalid_signature_counts, **details)
  end
end

Source

Geocode

# Looks up coordinates for a location's address and stores them on the record.
class GeocodeJob < ActiveJob::Base
  # Builds the lookup string from the location's address and zipcode
  # (separated by one space), asks Geocoder for its coordinates and saves
  # them with update!.
  #
  # NOTE(review): if Geocoder.coordinates returns nil, both latitude and
  # longitude end up nil and are written as such — confirm that is intended.
  #
  # @param location record responding to address, zipcode and update!
  def perform(location)
    coordinates = Geocoder.coordinates("#{location.address} #{location.zipcode}")
    latitude, longitude = coordinates

    location.update!(latitude: latitude, longitude: longitude)
  end
end

Source

Import RSS Feeds

# Sidekiq worker that turns one ImportItem into a feed subscription for the
# user who owns the import. Runs on the :critical queue without retries.
class FeedImporter
  include Sidekiq::Worker
  sidekiq_options queue: :critical, retry: false

  # Finds the feed behind the import item's :xml_url, subscribes the user to
  # it and applies the imported title and tag when they are present.
  # Stops quietly when the finder has no options, when no feed could be
  # created, or when a record lookup raises ActiveRecord::RecordNotFound.
  #
  # @param import_item_id id of the ImportItem to process
  def perform(import_item_id)
    import_item = ImportItem.find(import_item_id)
    user = import_item.import.user

    finder = FeedFinder.new(import_item.details[:xml_url])
    return unless finder.options.any?

    feed = finder.create_feed(finder.options.first)
    return unless feed

    subscription = user.subscriptions.find_or_create_by(feed: feed)
    if import_item.details[:title] && subscription
      subscription.update(title: import_item.details[:title])
    end

    feed.tag(import_item.details[:tag], user, false) if import_item.details[:tag]
  rescue ActiveRecord::RecordNotFound
    # Deliberately ignored: a missing record ends the job without an error.
  end

end

Source

Cleanup Data

# Removes events that need no further processing.
class CleanupEvents < ApplicationJob
  # Inside a transaction, locks and deletes every event that is project
  # logged, has its mails sent, is queued and has zero undone jobs.
  def perform
    Event::Base.transaction do
      finished = Event::Base.where(project_logged: true, mails_sent: true, queued: true, undone_jobs: 0)
      finished.lock(true).delete_all
    end
  end
end

Source

Subscribe to Mailing List

# Background job, run on the :default queue, that forwards a subscribe
# request to MailChimpService.
class UserSubscribeJob < ApplicationJob
  queue_as :default

  # Passes name and email through unchanged to MailChimpService.user_subscribe.
  def perform(name, email)
    MailChimpService.user_subscribe(name, email)
  end
end

Source

Unsubscribe from Mailing List

# Background job, run on the :default queue, that forwards an unsubscribe
# request to MailChimpService.
class UserUnsubscribeJob < ApplicationJob
  queue_as :default

  # Passes email through unchanged to MailChimpService.user_unsubscribe.
  def perform(email)
    MailChimpService.user_unsubscribe(email)
  end
end

Source

Group/Batch/Spawn Jobs

# Fan-out job: enqueues each of the individual task jobs listed below.
# It does no work itself beyond calling perform_later on them.
class DailyTasksJob < ActiveJob::Base
  queue_as :default

  # Enqueues the jobs one after another, in the order written. If one
  # perform_later call raises, the jobs after it are not enqueued.
  def perform
    # Flagging / denying
    FlagOverdueJob.perform_later
    FlagMissedJob.perform_later
    DenyMissedRequestsJob.perform_later
    # Email reminders
    EmailCheckinReminderJob.perform_later
    EmailCheckoutReminderJob.perform_later
    EmailMissedReservationsJob.perform_later
    EmailOverdueReminderJob.perform_later
    # Deletions
    DeleteOldBlackoutsJob.perform_later
    DeleteMissedReservationsJob.perform_later
  end
end

Source

Fetch API Data

# Background job, run on the :http_request queue, that triggers fetching of
# a user's GitHub repositories.
class GithubRepoFetcherJob < ApplicationJob
  queue_as :http_request

  # Delegates to User.fetch_github_repositories with the given user id.
  def perform(user_id)
    User.fetch_github_repositories(user_id)
  end
end

Source

Update Search Indexes

# Applies one Elasticsearch operation (update, delete or index) to a single
# Topic, Page or User record.
class SearchIndexer < ApplicationJob
  queue_as :search_indexer

  # @param operation [String, Symbol] "update", "delete" or "index";
  #   any other value is a no-op
  # @param type [String, Symbol] "topic", "page" or "user", matched
  #   case-insensitively
  # @param id id handed to the model's find_by_id
  # @return [false] when the type is unknown or no record was found;
  #   otherwise the result of the Elasticsearch call (nil for an unknown
  #   operation)
  def perform(operation, type, id)
    record = find_record(type, id)
    return false unless record

    case operation.to_s
    when "update" then record.__elasticsearch__.update_document
    when "delete" then record.__elasticsearch__.delete_document
    when "index"  then record.__elasticsearch__.index_document
    end
  end

  private

  # Looks up the record for the given type name, or returns nil when the
  # type is not one of the supported models.
  def find_record(type, id)
    # Non-mutating downcase: the caller's string is left untouched and a
    # frozen string no longer raises FrozenError. to_s also lets a Symbol
    # (or nil) through without a NoMethodError.
    case type.to_s.downcase
    when "topic" then Topic.find_by_id(id)
    when "page"  then Page.find_by_id(id)
    when "user"  then User.find_by_id(id)
    end
  end
end

Source

Push Notifications to Devices

# Sends one APNS notification to the alive devices of the given users.
class PushJob < ApplicationJob
  queue_as :notifications

  # user_ids: list of user IDs
  # note: { alert: 'Hello APNS World!', sound: 'true', badge: 1 }
  #
  # Returns false when no APNS certificate is configured or when there are
  # no device tokens to send to; otherwise returns notification.success.
  # Note that +note+ is modified in place: :sound defaults to "true".
  def perform(user_ids, note = {})
    return false if Setting.apns_pem.blank?

    note[:sound] ||= "true"
    tokens = Device.where(user_id: user_ids).all.to_a.select(&:alive?).map(&:token)
    return false if tokens.blank?

    notification = RubyPushNotifications::APNS::APNSNotification.new(tokens, aps: note)
    # The second pusher argument is true everywhere except in production.
    non_production = !Rails.env.production?
    RubyPushNotifications::APNS::APNSPusher.new(Setting.apns_pem, non_production).push([notification])

    Rails.logger.tagged("PushJob") do
      Rails.logger.info "send to #{tokens.size} devices #{note} status: #{notification.success}"
    end
    notification.success
  end
end

Source

Export CSV Data

# frozen_string_literal: true
require 'csv'

# Writes the deploys matching a CsvExport's filters to a CSV file, mails the
# requester when an email is present, and removes old exports afterwards.
class CsvExportJob < ActiveJob::Base
  queue_as :csv_jobs

  # Runs the whole export with a connection checked out from the pool.
  #
  # @param csv_export the export record; the code below uses its path_file,
  #   filters, email, download_name, id, updated_at, status! and delete_file
  def perform(csv_export)
    ActiveRecord::Base.connection_pool.with_connection do
      create_export_folder(csv_export)
      generate_csv(csv_export)
      cleanup_downloaded
    end
  end

  private

  # Destroys every export returned by the CsvExport.old scope.
  def cleanup_downloaded
    CsvExport.old.find_each(&:destroy!)
  end

  # Moves the export through :started -> :finished, writing the file and
  # sending the mail in between. File-permission, IO and ActiveRecord errors
  # are not re-raised: the file is deleted, the export is marked :failed and
  # the error is logged and reported to Airbrake.
  def generate_csv(csv_export)
    csv_export.status! :started
    remove_deleted_scope_and_create_report(csv_export)
    CsvMailer.created(csv_export).deliver_now if csv_export.email.present?
    csv_export.status! :finished
    Rails.logger.info("Export #{csv_export.download_name} completed")
  rescue Errno::EACCES, IOError, ActiveRecord::ActiveRecordError => e
    csv_export.delete_file
    csv_export.status! :failed
    Rails.logger.error("Export #{csv_export.id} failed with error #{e}")
    Airbrake.notify(e, error_message: "Export #{csv_export.id} failed.")
  end

  # Helper method that removes the default soft_deletion scope for these
  # models while the report is generated.
  def remove_deleted_scope_and_create_report(csv_export)
    Deploy.with_deleted do
      Stage.with_deleted do
        Project.with_deleted do
          DeployGroup.with_deleted do
            Environment.with_deleted do
              deploy_csv_export(csv_export)
            end
          end
        end
      end
    end
  end

  # Writes the CSV: header, one line per deploy, then a summary row and a
  # row with the applied filters as JSON.
  def deploy_csv_export(csv_export)
    filename = csv_export.path_file
    filter = csv_export.filters

    deploys = filter_deploys(filter)
    summary = ["-", "Generated At", csv_export.updated_at, "Deploys", deploys.count.to_s]
    filters_applied = ["-", "Filters", filter.to_json]

    CSV.open(filename, 'w+') do |csv|
      csv << Deploy.csv_header
      deploys.find_each do |deploy|
        csv << deploy.csv_line
      end
      csv << summary
      csv << filters_applied
    end
  end

  # Builds the Deploy relation for the given filter hash. The pseudo-filter
  # 'environments.production' is not a plain column condition; it is
  # translated into a join against the stage/environment pairs.
  #
  # @param filter [Hash] conditions passed to +where+; not modified
  # @return the filtered Deploy relation
  def filter_deploys(filter)
    if filter.key?('environments.production')
      # Work on a copy: deleting the key from the caller's hash would make
      # the "Filters" row written afterwards omit the production filter.
      filter = filter.dup
      production_value = filter.delete('environments.production')
      # To match logic of stages.production? True when any deploy_group environment is true or
      # deploy_groups environment is empty and stages is true
      production_query = if production_value
        "(StageProd.production = ? OR (StageProd.production IS NULL AND stages.production = ?))"
      else
        "(NOT StageProd.production = ? OR (StageProd.production IS NULL AND NOT stages.production = ?))"
      end

      # This subquery extracts the distinct pairs of stage.id to environment.production for the join below as StageProd
      stage_prod_subquery = "(SELECT DISTINCT deploy_groups_stages.stage_id, environments.production " \
      "FROM deploy_groups_stages " \
      "INNER JOIN deploy_groups ON deploy_groups.id = deploy_groups_stages.deploy_group_id " \
      "INNER JOIN environments ON environments.id = deploy_groups.environment_id) StageProd"

      # The query could result in duplicate entries when a stage has a production and non-production deploy group
      # so it is important this is run only if environments.production was set
      Deploy.includes(:buddy, job: :user, stage: :project).joins(:job, :stage).
        joins("LEFT JOIN #{stage_prod_subquery} ON StageProd.stage_id = stages.id").
        where(filter).where(production_query, true, true)
    else
      Deploy.includes(:buddy, job: :user, stage: :project).joins(:job, :stage).where(filter)
    end
  end

  # Makes sure the directory that will hold the export file exists.
  def create_export_folder(csv_export)
    FileUtils.mkdir_p(File.dirname(csv_export.path_file))
  end
end

Source

Generate and Email Kindle Ebooks

# Sidekiq worker that renders an entry to HTML, converts it with kindlegen
# and mails the resulting .mobi file to a Kindle address.
class SendToKindle
  include Sidekiq::Worker
  sidekiq_options queue: :critical
  # Image extensions that are downloaded and embedded; compared lower-cased.
  SUPPORTED_IMAGES = %w{.jpg .jpeg .gif .png .bmp}.freeze

  # Logs an error and does nothing else when ENV["KINDLEGEN_PATH"] is blank;
  # otherwise runs the conversion and delivery.
  def perform(entry_id, kindle_address)
    if ENV["KINDLEGEN_PATH"].blank?
      Rails.logger.error { 'Missing ENV["KINDLEGEN_PATH"]' }
    else
      send_to_kindle(entry_id, kindle_address)
    end
  end

  # Builds the ebook in a fresh temporary directory and mails it when the
  # .mobi file exists. The directory is always removed afterwards.
  def send_to_kindle(entry_id, kindle_address)
    @entry = Entry.find(entry_id)
    @working_directory = Dir.mktmpdir
    begin
      content_path = write_html
      mobi_path = kindlegen(content_path)
      # NOTE(review): when kindlegen produced no file nothing is sent and
      # the user is not notified.
      UserMailer.kindle(kindle_address, mobi_path).deliver_now if File.file?(mobi_path)
    ensure
      FileUtils.remove_entry(@working_directory)
    end
  end

  # Runs the kindlegen binary on content_path (stdout discarded) and returns
  # the path where the .mobi file is expected inside the working directory.
  def kindlegen(content_path)
    mobi_file = 'kindle.mobi'
    system("#{ENV["KINDLEGEN_PATH"]} #{content_path} -o #{mobi_file} > /dev/null")
    File.join(@working_directory, mobi_file)
  end

  # Downloads url into destination (20 second timeout).
  # Best effort: returns false on any StandardError instead of raising.
  def download_image(url, destination)
    File.open(destination, "wb") do |file|
      file.write(HTTParty.get(url, {timeout: 20}).parsed_response)
    end
  rescue
    false
  end

  # Renders the kindle_content template with the entry and the given content.
  def render_content(content)
    action_view = ActionView::Base.new
    action_view.view_paths = ActionController::Base.view_paths
    action_view.extend(ApplicationHelper)
    action_view.render(template: "supported_sharing_services/kindle_content.html.erb", locals: {entry: @entry, content: content})
  end

  # Formats the entry content, localizes its images, writes kindle.html into
  # the working directory and returns that file's path.
  def write_html
    content = ContentFormatter.api_format(@entry.content, @entry)
    content = Nokogiri::HTML.fragment(content)
    content = prepare_images(content)
    content_path = file_destination('kindle.html')
    File.open(content_path, "w") do |file|
      file.write(render_content(content.to_xml))
    end
    content_path
  end

  # Downloads every <img> with a supported extension into the working
  # directory and points its src at the local file name. Images that are
  # blank, unsupported or fail to download keep their original src.
  def prepare_images(parsed_content)
    parsed_content.search("img").each_with_index do |element, index|
      next if element['src'].blank?

      src = element['src'].strip
      extension = File.extname(src)
      # Lower-cased comparison so ".JPG" or ".PNG" are accepted as well.
      next unless SUPPORTED_IMAGES.include?(extension.downcase)

      filename = index.to_s + extension
      element['src'] = filename if download_image(src, file_destination(filename))
    end
    parsed_content
  end

  # Absolute path of filename inside the working directory.
  def file_destination(filename)
    File.join(@working_directory, filename)
  end

end

Source

Expire Trial SaaS Subscriptions

# Sidekiq worker that suspends trial users whose trial period has run out
# and deactivates their subscriptions.
class TrialExpiration
  include Sidekiq::Worker
  sidekiq_options queue: :critical

  # Deactivates the subscriptions of the expired trial users first and then
  # marks those users as suspended. The order matters: the user relation
  # filters on suspended: false, so flipping that flag first would change
  # which users the subscription update sees (assuming the relation is
  # evaluated lazily — confirm).
  def perform
    expired = expired_trial_users
    Subscription.where(user_id: expired).update_all(active: false)
    expired.update_all(suspended: true)
  end

  private

  # Unsuspended users on the 'trial' plan created before the configured
  # number of trial days ago.
  def expired_trial_users
    trial_plan = Plan.where(stripe_id: 'trial').first
    cutoff = Feedbin::Application.config.trial_days.days.ago
    User.where(plan: trial_plan, suspended: false).where('created_at < ?', cutoff)
  end

end

Source

Scrape/Crawl and Process Website Images

require 'rmagick'
# Sidekiq worker (no retries) that finds, downloads and stores the favicon
# for a host.
class FaviconFetcher
  include Sidekiq::Worker
  sidekiq_options retry: false

  # Loads or initializes the Favicon for host and refreshes it when forced
  # or when it was not updated within the last hour. Any StandardError is
  # swallowed and counted in the 'favicon.failed' metric.
  #
  # @param host [String] host name to fetch the favicon for
  # @param force [Boolean] skip the freshness check and conditional headers
  def perform(host, force = false)
    @favicon = Favicon.unscoped.where(host: host).first_or_initialize
    @force = force
    update if should_update?
  rescue
    Librato.increment('favicon.failed')
  end

  # Downloads the icon linked from the host's page, falling back to
  # /favicon.ico when there is no link or the linked icon has an empty body,
  # then stores the processed icon if it is valid and its hash changed.
  # The record is saved in every case.
  def update
    favicon_url = find_favicon_link
    response = download_favicon(favicon_url) if favicon_url

    # nil.to_s is "", so this also covers "no link was found".
    response = download_favicon(default_favicon_location) if response.to_s.empty?

    if response
      processor = FaviconProcessor.new(response.to_s, @favicon.host)
      if processor.valid? && @favicon.data["favicon_hash"] != processor.favicon_hash
        @favicon.favicon = processor.encoded_favicon if processor.encoded_favicon
        @favicon.url = processor.favicon_url if processor.favicon_url
        @favicon.data = get_data(response, processor.favicon_hash)
        Librato.increment('favicon.updated')
      end
      Librato.increment('favicon.status', source: response.code)
    end

    @favicon.save
  end

  # Returns the data hash to store: the favicon hash plus, when a response
  # is given, its "Last-Modified" and "Etag" headers.
  def get_data(response, favicon_hash)
    data = {favicon_hash: favicon_hash}
    data.merge!(response.headers.to_h.extract!("Last-Modified", "Etag")) if response
    data
  end

  # Fetches the host's page and returns the URI of the last matching icon
  # <link>, forced to the http scheme and resolved against the host when the
  # href has no host of its own. Returns nil when nothing matches or on any
  # StandardError.
  def find_favicon_link
    favicon_url = nil
    url = URI::HTTP.build(host: @favicon.host)
    response = HTTP
      .timeout(:global, write: 5, connect: 5, read: 5)
      .follow()
      .get(url)
      .to_s
    favicon_links = Nokogiri::HTML(response).search(xpath)
    if favicon_links.present?
      href = favicon_links.last.to_s
      favicon_url = URI.parse(href)
      favicon_url.scheme = 'http'
      unless favicon_url.host
        favicon_url = URI::HTTP.build(scheme: 'http', host: @favicon.host).merge(href)
      end
    end
    favicon_url
  rescue
    nil
  end

  # Conventional fallback location: http://<host>/favicon.ico
  def default_favicon_location
    URI::HTTP.build(host: @favicon.host, path: "/favicon.ico")
  end

  # GETs url, following redirects, with 5 second timeouts and the headers
  # from request_headers. Returns the HTTP response.
  def download_favicon(url)
    HTTP
      .timeout(:global, write: 5, connect: 5, read: 5)
      .follow()
      .headers(request_headers)
      .get(url)
  end

  # User agent header plus, unless forced, the conditional headers built
  # from the stored Etag and Last-Modified values.
  def request_headers
    headers = {user_agent: "Mozilla/5.0"}
    unless @force
      conditional_headers = ConditionalHTTP.new(@favicon.data["Etag"], @favicon.data["Last-Modified"])
      headers = headers.merge(conditional_headers.to_h)
    end
    headers
  end

  # True when forced, otherwise true only if not updated recently.
  def should_update?
    @force ? true : !updated_recently?
  end

  # True when the record was updated less than an hour ago; false for a
  # record without an updated_at.
  def updated_recently?
    last_update = @favicon.updated_at
    return false unless last_update

    last_update > 1.hour.ago
  end

  # XPath selecting the href of every non-mask <link> whose rel is exactly
  # "icon" or "shortcut icon" (rel compared case-insensitively).
  def xpath
    ["icon", "shortcut icon"].map do |icon_name|
      "//link[not(@mask) and translate(@rel, 'ABCDEFGHIJKLMNOPQRSTUVWXYZ', 'abcdefghijklmnopqrstuvwxyz') = '#{icon_name}']/@href"
    end.join(" | ")
  end

end

Source

TLDR

Background jobs can be used to do pretty much anything you can dream up for your app.

Published by Eliot Sykes

I help teams grow their Rails apps and themselves as a Rails consultant and coach for Rails developers.