# Runs the given block once for every element of +all_ids+, each call in its
# own forked child process, with a cap on how many children are alive at once.
#
# In the originating change this helper sits inside `namespace :import` of
# lib/tasks/import_history.rake and is called by the history import tasks
# with the list of legacy record ids.
#
# An exception raised by the block is logged through Rails.logger inside the
# child and is not propagated to the parent; the child then exits with a
# failing status, while a child whose block returned normally exits with a
# successful one.
#
# @param all_ids [Enumerable] ids to process; each one is yielded to the block
# @yieldparam one_id [Object] the id handled by this child
# @yieldparam index [Integer] zero-based position of the id within +all_ids+
# @return [Array<Array(Integer, Process::Status)>] the result of
#   Process.waitall, i.e. the children that were still unreaped when the
#   loop finished
def parallel_import(all_ids)
  # Leave one core to the parent, but always allow at least one child.
  # Falls back to 4 cores when Parallel is unavailable or raises.
  cpu_count = begin
    Parallel.processor_count
  rescue StandardError
    4
  end
  max_children = [cpu_count - 1, 1].max
  running_pids = []

  all_ids.each_with_index do |one_id, index|
    # Flush before forking so text still buffered in the parent is not
    # inherited by the child and written a second time.
    $stdout.flush
    $stderr.flush

    pid = Process.fork do
      succeeded = false
      begin
        yield(one_id, index)
        succeeded = true
      rescue => e
        Rails.logger.error("[EXCEPTION] #{Process.pid}")
        Rails.logger.error("#{Process.pid} #{e.message}")
        Rails.logger.error("#{Process.pid} #{Array(e.backtrace).join("\n")}")
      ensure
        begin
          # NOTE(review): presumably releases the child's own DB connection
          # before exit - confirm against the ActiveRecord version in use.
          ActiveRecord::Base.remove_connection
          # exit! bypasses the normal interpreter shutdown, so anything the
          # block printed has to be written out by hand.
          $stdout.flush
          $stderr.flush
        ensure
          # Always leave through exit!, even if the cleanup above raised, and
          # report success or failure through the exit status.
          Process.exit!(succeeded)
        end
      end
    end

    running_pids << pid
    # Pool is full: block until any one child finishes before forking more.
    running_pids.delete(Process.wait(0)) if running_pids.size >= max_children
  end

  Process.waitall
end