Class: ExportPoolXpToTractionJob
- Inherits:
-
Struct
- Object
- Struct
- ExportPoolXpToTractionJob
- Includes:
- CompoundSampleHelper
- Defined in:
- app/jobs/export_pool_xp_to_traction_job.rb
Overview
Combine the samples from a Pool XP tube into a compound sample, generate a bioscan-pool-xp-tube-to-traction message and submit it to RabbitMQ so that it can be forwarded to Traction by the message processor.
Instance Attribute Summary collapse
-
#barcode ⇒ Object
Returns the value of attribute barcode.
Instance Method Summary collapse
- #add_tls_params(connection_params) ⇒ Object
- #avro_encode_message(message, schema) ⇒ Object
- #connection_params ⇒ Object
- #fetch_response(uri_str, limit = 10) ⇒ Object
- #get_message_data(barcode) ⇒ Object
- #get_message_schema(subject, version) ⇒ Object
- #perform ⇒ Object
- #send_message(message, subject, version) ⇒ Object
Methods included from CompoundSampleHelper
#find_or_create_compound_sample
Instance Attribute Details
#barcode ⇒ Object
Returns the value of attribute barcode
5 6 7 |
# File 'app/jobs/export_pool_xp_to_traction_job.rb', line 5 def @barcode end |
Instance Method Details
#add_tls_params(connection_params) ⇒ Object
122 123 124 125 126 127 128 129 130 131 132 133 |
# File 'app/jobs/export_pool_xp_to_traction_job.rb', line 122 def add_tls_params(connection_params) connection_params[:tls] = true begin connection_params[:tls_ca_certificates] = [configatron.amqp.broker.ca_certificate!] rescue Configatron::UndefinedKeyError # Should not be the case in production! connection_params[:verify_peer] = false end connection_params end |
#avro_encode_message(message, schema) ⇒ Object
84 85 86 87 88 89 90 91 92 93 |
# File 'app/jobs/export_pool_xp_to_traction_job.rb', line 84 def (, schema) schema = Avro::Schema.parse(schema) stream = StringIO.new writer = Avro::IO::DatumWriter.new(schema) encoder = Avro::IO::BinaryEncoder.new(stream) encoder.write("\xC3\x01") # Avro single-object container file header encoder.write([schema.crc_64_avro_fingerprint].pack('Q')) # 8 byte schema fingerprint writer.write(, encoder) stream.string end |
#connection_params ⇒ Object
109 110 111 112 113 114 115 116 117 118 119 120 |
# File 'app/jobs/export_pool_xp_to_traction_job.rb', line 109 def connection_params rabbit_config = configatron.amqp.broker connection_params = { host: rabbit_config.host, username: rabbit_config.username, password: rabbit_config.password, vhost: rabbit_config.vhost } rabbit_config.tls ? add_tls_params(connection_params) : connection_params end |
#fetch_response(uri_str, limit = 10) ⇒ Object
24 25 26 27 28 29 30 31 32 33 34 35 36 37 |
# File 'app/jobs/export_pool_xp_to_traction_job.rb', line 24 def fetch_response(uri_str, limit = 10) raise IOError, 'Too many HTTP redirects' if limit.zero? response = Net::HTTP.get_response(URI.parse(uri_str)) case response when Net::HTTPSuccess response when Net::HTTPRedirection fetch_response(response['location'], limit - 1) else response.error! end end |
#get_message_data(barcode) ⇒ Object
39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 |
# File 'app/jobs/export_pool_xp_to_traction_job.rb', line 39 def () tube = Tube.() project = tube.projects&.first study = tube.studies&.first sample = find_or_create_compound_sample(study, tube.samples) { messageUuid: UUIDTools::UUID..to_s, messageCreateDateUtc: Time.now.utc, tubeBarcode: tube., library: { volume: 100.0, concentration: 0.0, boxBarcode: 'Unspecified' }, request: { costCode: project&.project_cost_code, libraryType: 'Pacbio_Amplicon', studyUuid: study&.uuid }, sample: { sampleName: sample.name, sampleUuid: sample.uuid, speciesName: 'Unidentified' } } end |
#get_message_schema(subject, version) ⇒ Object
67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 |
# File 'app/jobs/export_pool_xp_to_traction_job.rb', line 67 def (subject, version) # Prefer to use the cached schema if it exists. cache_file_path = "data/avro_schema_cache/#{subject}_v#{version}.avsc" if File.exist?(cache_file_path) Rails.logger.debug { "Using cached schema for #{subject} v#{version}" } return File.read(cache_file_path) end # Default to fetching the schema from the registry and caching it. Rails.logger.debug { "Fetching and caching schema for #{subject} v#{version}" } response = fetch_response("#{configatron.amqp.schemas.registry_url}#{subject}/versions/#{version}") resp_json = JSON.parse(response.body) schema_str = resp_json['schema'] File.write(cache_file_path, schema_str) schema_str end |
#perform ⇒ Object
9 10 11 12 13 14 15 16 17 18 19 20 21 22 |
# File 'app/jobs/export_pool_xp_to_traction_job.rb', line 9 def perform = () subject_obj = configatron.amqp.schemas.subjects[:export_pool_xp_to_traction] subject = subject_obj[:subject] version = subject_obj[:version] schema = (subject, version) = (, schema) (, subject, version) rescue StandardError => e Rails.logger.error("Error exporting Pool XP tube to Traction: <#{e.}>") raise end |
#send_message(message, subject, version) ⇒ Object
95 96 97 98 99 100 101 102 103 104 105 106 107 |
# File 'app/jobs/export_pool_xp_to_traction_job.rb', line 95 def (, subject, version) conn = Bunny.new(connection_params) conn.start begin channel = conn.create_channel exchange = channel.headers(configatron.amqp.broker.exchange, passive: true) headers = { subject: subject, version: version, encoder_type: 'binary' } exchange.publish(, headers: headers, persistent: true) ensure conn.close end end |