Class: ExportPoolXpToTractionJob

Inherits:
Struct
  • Object
show all
Includes:
CompoundSampleHelper
Defined in:
app/jobs/export_pool_xp_to_traction_job.rb

Overview

Combine the samples from a Pool XP tube into a compound sample, generate a bioscan-pool-xp-tube-to-traction message and submit it to RabbitMQ so that it can be forwarded to Traction by the message processor.

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from CompoundSampleHelper

#find_or_create_compound_sample

Instance Attribute Details

#barcode ⇒ Object

Returns the value of attribute barcode

Returns:

  • (Object)

    the current value of barcode



5
6
7
# File 'app/jobs/export_pool_xp_to_traction_job.rb', line 5

# Reader for the barcode attribute.
#
# @return [Object] the current value of barcode
def barcode
  @barcode
end

Instance Method Details

#add_tls_params(connection_params) ⇒ Object



122
123
124
125
126
127
128
129
130
131
132
133
# File 'app/jobs/export_pool_xp_to_traction_job.rb', line 122

# Switches the given connection parameters over to TLS.
#
# Sets :tls to true and stores the configured broker CA certificate as the
# single entry of :tls_ca_certificates. When that configuration key is not
# defined, :verify_peer is set to false instead.
#
# @param connection_params [Hash] parameters to update in place
# @return [Hash] the same hash that was passed in
def add_tls_params(connection_params)
  connection_params[:tls] = true
  ca_certificate = configatron.amqp.broker.ca_certificate!
  connection_params[:tls_ca_certificates] = [ca_certificate]
  connection_params
rescue Configatron::UndefinedKeyError
  # Should not be the case in production!
  connection_params[:verify_peer] = false
  connection_params
end

#avro_encode_message(message, schema) ⇒ Object



84
85
86
87
88
89
90
91
92
93
# File 'app/jobs/export_pool_xp_to_traction_job.rb', line 84

# Encodes a message using the Avro single-object encoding.
#
# The output is the two-byte marker 0xC3 0x01, followed by the 8-byte
# CRC-64-AVRO fingerprint of the schema, followed by the Avro binary encoding
# of the message.
#
# @param message [Object] the datum to encode; must conform to the schema
# @param schema [String] the Avro schema definition, as accepted by Avro::Schema.parse
# @return [String] the encoded bytes
def avro_encode_message(message, schema)
  parsed_schema = Avro::Schema.parse(schema)
  stream = StringIO.new
  writer = Avro::IO::DatumWriter.new(parsed_schema)
  encoder = Avro::IO::BinaryEncoder.new(stream)
  encoder.write("\xC3\x01") # Avro single-object encoding marker
  # The fingerprint must be little-endian regardless of the host's byte order,
  # so use 'Q<' rather than the native-endian 'Q'.
  encoder.write([parsed_schema.crc_64_avro_fingerprint].pack('Q<')) # 8 byte schema fingerprint
  writer.write(message, encoder)
  stream.string
end

#connection_params ⇒ Object



109
110
111
112
113
114
115
116
117
118
119
120
# File 'app/jobs/export_pool_xp_to_traction_job.rb', line 109

# Builds the RabbitMQ connection parameters from the broker configuration.
#
# The host, username, password and vhost are always included. When the broker
# configuration has TLS enabled, the parameters are additionally passed
# through add_tls_params.
#
# @return [Hash] the connection parameters
def connection_params
  broker = configatron.amqp.broker

  params = {
    host: broker.host,
    username: broker.username,
    password: broker.password,
    vhost: broker.vhost
  }

  return params unless broker.tls

  add_tls_params(params)
end

#fetch_response(uri_str, limit = 10) ⇒ Object

Raises:

  • (IOError)


24
25
26
27
28
29
30
31
32
33
34
35
36
37
# File 'app/jobs/export_pool_xp_to_traction_job.rb', line 24

# Performs an HTTP GET and follows redirects up to the given limit.
#
# @param uri_str [String] the URL to request
# @param limit [Integer] how many more requests may be made before giving up
# @return [Net::HTTPSuccess] the first successful response
# @raise [IOError] if the redirect limit is exhausted
# @raise [URI::InvalidURIError] if a redirect response carries no Location header
# @raise [Net::HTTPExceptions] via response.error! for any other response
def fetch_response(uri_str, limit = 10)
  raise IOError, 'Too many HTTP redirects' if limit.zero?

  response = Net::HTTP.get_response(URI.parse(uri_str))

  case response
  when Net::HTTPSuccess
    response
  when Net::HTTPRedirection
    location = response['location']
    raise URI::InvalidURIError, "Redirect from #{uri_str} has no Location header" if location.nil?

    # Location may be a relative reference, so resolve it against the URL that
    # was just requested. An absolute Location is returned unchanged by URI.join.
    fetch_response(URI.join(uri_str, location).to_s, limit - 1)
  else
    response.error!
  end
end

#get_message_data(barcode) ⇒ Object



39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
# File 'app/jobs/export_pool_xp_to_traction_job.rb', line 39

# Builds the message payload for the tube with the given barcode.
#
# The tube's samples are combined into a compound sample via
# find_or_create_compound_sample, using the tube's first study. The cost code
# comes from the tube's first project; it and the study UUID are nil when the
# tube has no project or study respectively.
#
# @param barcode [Object] barcode passed to Tube.find_by_barcode
# @return [Hash] message data with :messageUuid, :messageCreateDateUtc,
#   :tubeBarcode, :library, :request and :sample entries
# @raise [ArgumentError] if no tube is found for the barcode
def get_message_data(barcode)
  tube = Tube.find_by_barcode(barcode)
  # Fail with a message that names the barcode instead of a NoMethodError on nil.
  raise ArgumentError, "No tube found with barcode <#{barcode}>" if tube.nil?

  project = tube.projects&.first
  study = tube.studies&.first
  sample = find_or_create_compound_sample(study, tube.samples)

  {
    messageUuid: UUIDTools::UUID.timestamp_create.to_s,
    messageCreateDateUtc: Time.now.utc,
    tubeBarcode: tube.human_barcode,
    library: {
      volume: 100.0,
      concentration: 0.0,
      boxBarcode: 'Unspecified'
    },
    request: {
      costCode: project&.project_cost_code,
      libraryType: 'Pacbio_Amplicon',
      studyUuid: study&.uuid
    },
    sample: {
      sampleName: sample.name,
      sampleUuid: sample.uuid,
      speciesName: 'Unidentified'
    }
  }
end

#get_message_schema(subject, version) ⇒ Object



67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
# File 'app/jobs/export_pool_xp_to_traction_job.rb', line 67

# Returns the schema definition for the given subject and version.
#
# A cached copy under data/avro_schema_cache/ is used when present. Otherwise
# the schema is fetched from the configured registry URL, written to the cache
# and returned.
#
# @param subject [String] schema subject name
# @param version [Object] schema version, interpolated into the path and URL
# @return [String] the schema definition
# @raise [IOError] if the registry response does not contain a schema
def get_message_schema(subject, version)
  # Prefer to use the cached schema if it exists.
  cache_file_path = "data/avro_schema_cache/#{subject}_v#{version}.avsc"
  if File.exist?(cache_file_path)
    Rails.logger.debug { "Using cached schema for #{subject} v#{version}" }
    return File.read(cache_file_path)
  end

  # Default to fetching the schema from the registry and caching it.
  Rails.logger.debug { "Fetching and caching schema for #{subject} v#{version}" }
  response = fetch_response("#{configatron.amqp.schemas.registry_url}#{subject}/versions/#{version}")
  schema_str = JSON.parse(response.body)['schema']

  # Never cache a missing schema: an empty cache file would be picked up by
  # every later call instead of re-fetching from the registry.
  raise IOError, "Schema registry returned no schema for #{subject} v#{version}" if schema_str.to_s.empty?

  File.write(cache_file_path, schema_str)
  schema_str
end

#perform ⇒ Object



9
10
11
12
13
14
15
16
17
18
19
20
21
22
# File 'app/jobs/export_pool_xp_to_traction_job.rb', line 9

# Runs the export for this job's barcode.
#
# Builds the message data, reads the subject and version for
# :export_pool_xp_to_traction from configuration, encodes the data against the
# matching schema and sends the result. Any StandardError is logged and then
# re-raised.
#
# @return [Object] whatever send_message returns
def perform
  payload = get_message_data(barcode)

  subject_config = configatron.amqp.schemas.subjects[:export_pool_xp_to_traction]
  subject = subject_config[:subject]
  version = subject_config[:version]

  encoded_payload = avro_encode_message(payload, get_message_schema(subject, version))
  send_message(encoded_payload, subject, version)
rescue StandardError => error
  Rails.logger.error("Error exporting Pool XP tube to Traction: <#{error.message}>")
  raise
end

#send_message(message, subject, version) ⇒ Object



95
96
97
98
99
100
101
102
103
104
105
106
107
# File 'app/jobs/export_pool_xp_to_traction_job.rb', line 95

# Publishes an encoded message to the configured headers exchange.
#
# Opens a Bunny connection using connection_params, looks up the exchange
# named in the broker configuration (passive: true) and publishes the message
# as persistent with subject, version and encoder_type headers. The connection
# is closed afterwards, whether or not publishing succeeded.
#
# @param message [String] the encoded message body
# @param subject [Object] value for the subject header
# @param version [Object] value for the version header
# @return [Object] whatever the exchange's publish call returns
def send_message(message, subject, version)
  connection = Bunny.new(connection_params)
  connection.start

  begin
    channel = connection.create_channel
    channel
      .headers(configatron.amqp.broker.exchange, passive: true)
      .publish(
        message,
        headers: { subject: subject, version: version, encoder_type: 'binary' },
        persistent: true
      )
  ensure
    connection.close
  end
end