Story#108602614 - parallel history import

This commit is contained in:
Vladimir Krylov 2016-01-15 14:42:16 +02:00
parent 34aa3c0741
commit 1e88e27355
3 changed files with 36 additions and 4 deletions

View file

@ -12,6 +12,7 @@ gem 'rails', '4.2.4' # when update, all initializers eis_custom files nee
gem 'iso8601', '0.8.6' # for dates and times
gem 'hashie-forbidden_attributes', '0.1.1'
gem 'SyslogLogger', '2.0', require: 'syslog/logger'
gem 'parallel'
# load env
gem 'figaro', '1.1.1'

View file

@ -330,6 +330,7 @@ GEM
nprogress-rails (0.1.6.7)
open4 (1.3.4)
orm_adapter (0.5.0)
parallel (1.6.1)
parser (2.2.2.6)
ast (>= 1.1, < 3.0)
pdfkit (0.6.2)
@ -603,6 +604,7 @@ DEPENDENCIES
nokogiri (= 1.6.6.2)
nprogress-rails (= 0.1.6.7)
paper_trail!
parallel
pdfkit (= 0.6.2)
pg (= 0.18.2)
phantomjs (= 1.9.8.0)

View file

@ -5,10 +5,39 @@ namespace :import do
Rake::Task['import:history_domains'].invoke
end
def parallel_import all_ids
thread_pool = (Parallel.processor_count rescue 4) - 1
threads = []
all_ids.each_with_index do |one_id, i|
process = Process.fork do
begin
yield(one_id, i)
rescue => e
Rails.logger.error("[EXCEPTION] #{Process.pid}")
Rails.logger.error("#{Process.pid} #{e.message}" )
Rails.logger.error("#{Process.pid} #{e.backtrace.join("\n")}")
ensure
ActiveRecord::Base.remove_connection
Process.exit!
end
end
threads << process
if threads.count >= thread_pool
threads.delete(Process.wait(0))
end
end
Process.waitall
end
desc 'Import contact history'
task history_contacts: :environment do
Legacy::ContactHistory.uniq.pluck(:id).each do |legacy_contact_id|
old_ids = Legacy::ContactHistory.uniq.pluck(:id)
old_size = old_ids.size
parallel_import(old_ids) do |legacy_contact_id, process_idx|
start = Time.now.to_f
Contact.transaction do
data = []
@ -76,7 +105,7 @@ namespace :import do
end
ContactVersion.import_without_validations_or_callbacks data.first.keys, data.map(&:values) if data.any?
end
puts "Legacy Contact id #{legacy_contact_id} finished in #{Time.now.to_f - start}"
puts "[PID: #{Process.pid}] Legacy Contact #{legacy_contact_id} (#{process_idx}/#{old_size}) finished in #{Time.now.to_f - start}"
end
end
@ -84,8 +113,8 @@ namespace :import do
desc 'Import domain history'
task history_domains: :environment do
Domain.transaction do
Legacy::DomainHistory.uniq.pluck(:id).each do |legacy_domain_id|
parallel_import(Legacy::DomainHistory.uniq.pluck(:id)) do |legacy_domain_id|
Domain.transaction do
domain = Domain.find_by(legacy_id: legacy_domain_id)
version_domain = DomainVersion.where("object->>'legacy_id' = '#{legacy_domain_id}'").select(:item_id).first
domain ||= Domain.new(id: version_domain.item_id, legacy_id: legacy_domain_id) if version_domain