mirror of
https://github.com/internetee/registry.git
synced 2025-05-17 17:59:47 +02:00
246 lines
9.8 KiB
Ruby
246 lines
9.8 KiB
Ruby
namespace :import do
|
|
desc 'Import all history'
|
|
task history_all: :environment do
|
|
Rake::Task['import:history_contacts'].invoke
|
|
Rake::Task['import:history_domains'].invoke
|
|
end
|
|
|
|
def parallel_import all_ids
|
|
thread_pool = (Parallel.processor_count rescue 4)
|
|
threads = []
|
|
|
|
all_ids.each_with_index do |one_id, i|
|
|
process = Process.fork do
|
|
begin
|
|
yield(one_id, i)
|
|
rescue => e
|
|
Rails.logger.error("[EXCEPTION] #{Process.pid}")
|
|
Rails.logger.error("#{Process.pid} #{e.message}" )
|
|
Rails.logger.error("#{Process.pid} #{e.backtrace.join("\n")}")
|
|
ensure
|
|
ActiveRecord::Base.remove_connection
|
|
Process.exit!
|
|
end
|
|
end
|
|
|
|
threads << process
|
|
if threads.count >= thread_pool
|
|
threads.delete(Process.wait(0))
|
|
end
|
|
end
|
|
|
|
Process.waitall
|
|
end
|
|
|
|
|
|
desc 'Import contact history'
|
|
task history_contacts: :environment do
|
|
throw 'no config set ENV[legacy_legal_documents_dir]' unless ENV['legacy_legal_documents_dir']
|
|
|
|
old_ids = Legacy::ContactHistory
|
|
old_ids = old_ids.where(id: ENV['ids'].split(",")) if ENV['ids']
|
|
old_ids = old_ids.uniq.pluck(:id)
|
|
old_size = old_ids.size
|
|
|
|
parallel_import(old_ids) do |legacy_contact_id, process_idx|
|
|
start = Time.now.to_f
|
|
Contact.transaction do
|
|
data = []
|
|
contact = Contact.find_by(legacy_id: legacy_contact_id)
|
|
version_contact = ContactVersion.where("object->>'legacy_id' = '#{legacy_contact_id}'").select(:item_id).first
|
|
contact ||= Contact.new(id: version_contact.item_id, legacy_id: legacy_contact_id) if version_contact
|
|
contact ||= Contact.new(id: ::Contact.next_id, legacy_id: legacy_contact_id)
|
|
next if contact.versions.where(event: :create).any?
|
|
# add here to skip domains whith create history
|
|
|
|
# 1. add domain changes
|
|
# 2. add states
|
|
# compose hash of change time -> Object changes
|
|
last_changes = nil
|
|
history = Legacy::ObjectState.changes_dates_for(legacy_contact_id)
|
|
con_his = Legacy::ContactHistory.changes_dates_for(legacy_contact_id)
|
|
last_contact_action = con_his.sort.last[1].last # need to identify if we delete
|
|
|
|
# merging changes together
|
|
con_his.each do |time, klasses|
|
|
if history.has_key?(time)
|
|
history[time] = history[time] | klasses
|
|
else
|
|
history[time] = klasses
|
|
end
|
|
end
|
|
|
|
keys = history.keys.compact.sort
|
|
i = 0
|
|
keys.each_with_index do |time|
|
|
history[time].each do |orig_history_klass|
|
|
changes = {}
|
|
responder = orig_history_klass[:klass].get_record_at(legacy_contact_id, orig_history_klass[:id])
|
|
new_attrs = responder.get_current_contact_object(time, orig_history_klass[:param])
|
|
new_attrs[:id] = contact.id
|
|
|
|
event = :update
|
|
event = :create if i == 0
|
|
if orig_history_klass == last_contact_action && responder.valid_to.present?
|
|
event = :destroy
|
|
new_attrs = {}
|
|
end
|
|
|
|
new_attrs.each do |k, v|
|
|
if (old_val = last_changes.to_h[k]) != v then changes[k] = [old_val, v] end
|
|
end
|
|
next if changes.blank? && event != :destroy
|
|
obj_his = Legacy::ObjectHistory.find_by(historyid: responder.historyid)
|
|
user = Legacy::Domain.new_api_user_cached(obj_his.upid || obj_his.clid)
|
|
|
|
|
|
files = Legacy::File.for_history(responder.historyid).map do |leg_file|
|
|
file_dir = leg_file.path.sub(/\/[0-9]+\z/, '')
|
|
path = "#{ENV['legal_documents_dir']}/#{leg_file.path}_#{leg_file.name}"
|
|
old_path = "#{ENV['legacy_legal_documents_dir']}/#{leg_file.path}"
|
|
unless File.exists?(old_path)
|
|
Rails.logger.error("No such file or directory (#{old_path} -> #{path}) (old contact id #{legacy_contact_id}")
|
|
next
|
|
end
|
|
|
|
FileUtils.mkdir_p("#{ENV['legal_documents_dir']}/#{file_dir}", mode: 0775)
|
|
FileUtils.mv(old_path, path)
|
|
LegalDocument.create!(documentable_type: ::Contact.to_s,
|
|
documentable_id: contact.id,
|
|
document_type: leg_file.name.to_s.split(".").last,
|
|
path: path,
|
|
created_at: leg_file.crdate)
|
|
end
|
|
|
|
hash = {
|
|
item_type: Contact.to_s,
|
|
item_id: contact.id,
|
|
event: event,
|
|
whodunnit: user.try(:id),
|
|
object: last_changes,
|
|
object_changes: changes,
|
|
created_at: time,
|
|
children: {legacy_documents: files.compact.map(&:id)}
|
|
}
|
|
data << hash
|
|
|
|
last_changes = new_attrs
|
|
i += 1
|
|
end
|
|
end
|
|
ContactVersion.import_without_validations_or_callbacks data.first.keys, data.map(&:values) if data.any?
|
|
end
|
|
puts "[PID: #{Process.pid}] Legacy Contact #{legacy_contact_id} (#{process_idx}/#{old_size}) finished in #{Time.now.to_f - start}"
|
|
end
|
|
end
|
|
|
|
|
|
|
|
desc 'Import domain history'
|
|
task history_domains: :environment do
|
|
old_ids = Legacy::DomainHistory
|
|
old_ids = old_ids.where(id: ENV['ids'].split(",")) if ENV['ids']
|
|
old_ids = old_ids.uniq.pluck(:id)
|
|
old_size = old_ids.size
|
|
|
|
|
|
parallel_import(old_ids) do |legacy_domain_id, process_idx|
|
|
start = Time.now.to_f
|
|
Domain.transaction do
|
|
data = []
|
|
domain = Domain.find_by(legacy_id: legacy_domain_id)
|
|
version_domain = DomainVersion.where("object->>'legacy_id' = '#{legacy_domain_id}'").select(:item_id).first
|
|
domain ||= Domain.new(id: version_domain.item_id, legacy_id: legacy_domain_id) if version_domain
|
|
domain ||= Domain.new(id: ::Domain.next_id, legacy_id: legacy_domain_id)
|
|
next if domain.versions.where(event: :create).any?
|
|
# add here to skip domains whith create history
|
|
|
|
# 1. add domain changes
|
|
# 2. add states
|
|
# compose hash of change time -> Object changes
|
|
last_changes = nil
|
|
history = Legacy::ObjectState.changes_dates_for(legacy_domain_id)
|
|
dom_his = Legacy::DomainHistory.changes_dates_for(legacy_domain_id)
|
|
last_domain_action = dom_his.sort.last[1].last # need to identify if we delete
|
|
|
|
# merging changes together
|
|
dom_his.each do |time, klasses|
|
|
if history.has_key?(time)
|
|
history[time] = history[time] | klasses
|
|
else
|
|
history[time] = klasses
|
|
end
|
|
end
|
|
|
|
keys = history.keys.compact.sort
|
|
i = 0
|
|
keys.each_with_index do |time|
|
|
history[time].each do |orig_history_klass|
|
|
changes = {}
|
|
responder = orig_history_klass[:klass].get_record_at(legacy_domain_id, orig_history_klass[:id])
|
|
new_attrs = responder.get_current_domain_object(time, orig_history_klass[:param])
|
|
new_attrs[:id] = domain.id
|
|
new_attrs[:updated_at] = time
|
|
|
|
event = :update
|
|
event = :create if i == 0
|
|
if orig_history_klass == last_domain_action && responder.valid_to.present?
|
|
event = :destroy
|
|
new_attrs = {}
|
|
end
|
|
|
|
new_attrs.each do |k, v|
|
|
if (old_val = last_changes.to_h[k]) != v then changes[k] = [old_val, v] end
|
|
end
|
|
next if changes.blank? && event != :destroy
|
|
|
|
files = Legacy::File.for_history(responder.history_domain.all_history_ids).map do |leg_file|
|
|
file_dir = leg_file.path.sub(/\/[0-9]+\z/, '')
|
|
path = "#{ENV['legal_documents_dir']}/#{leg_file.path}_#{leg_file.name}"
|
|
old_path = "#{ENV['legacy_legal_documents_dir']}/#{leg_file.path}"
|
|
unless File.exists?(old_path)
|
|
Rails.logger.error("No such file or directory (#{old_path} -> #{path}) (old domain id #{legacy_domain_id}")
|
|
next
|
|
end
|
|
|
|
FileUtils.mkdir_p("#{ENV['legal_documents_dir']}/#{file_dir}", mode: 0775)
|
|
FileUtils.mv(old_path, path)
|
|
LegalDocument.create!(documentable_type: domain.class,
|
|
documentable_id: domain.id,
|
|
document_type: leg_file.name.to_s.split(".").last,
|
|
path: path,
|
|
created_at: leg_file.crdate)
|
|
end
|
|
|
|
hash = {
|
|
item_type: domain.class,
|
|
item_id: domain.id,
|
|
event: event,
|
|
whodunnit: responder.history_domain.user.try(:id),
|
|
object: last_changes,
|
|
object_changes: changes,
|
|
created_at: time,
|
|
children: {
|
|
admin_contacts: responder.history_domain.get_admin_contact_new_ids,
|
|
tech_contacts: responder.history_domain.get_tech_contact_new_ids,
|
|
nameservers: responder.history_domain.import_nameservers_history(domain, time),
|
|
dnskeys: responder.history_domain.import_dnskeys_history(domain, time),
|
|
registrant: [responder.history_domain.new_registrant_id],
|
|
legacy_documents: files.compact.map(&:id)
|
|
}
|
|
}
|
|
data << hash
|
|
|
|
last_changes = new_attrs
|
|
i += 1
|
|
end
|
|
end
|
|
DomainVersion.import_without_validations_or_callbacks data.first.keys, data.map(&:values) if data.any?
|
|
end
|
|
puts "[PID: #{Process.pid}] Legacy Domain #{legacy_domain_id} (#{process_idx}/#{old_size}) finished in #{Time.now.to_f - start}"
|
|
end
|
|
end
|
|
|
|
|
|
|
|
end
|