Reputation: 320
I'm using GCP Pubsub in Rails application. Environment:
Rails: v5.1.4
Ruby: v2.4.2
google-cloud-pubsub: v0.29.0
MacOS: 10.12.6 Sierra
this is code:
require 'json'
require 'google/cloud/pubsub'
module ActiveJob
module QueueAdapters
class PubSubQueueAdapter
def enqueue(job)
Rails.logger.info "[PubSubQueueAdapter enqueue job #{job.inspect}"
topic = PubSubQueueAdapter.pubsub.topic(job.queue_name)
topic.publish(job.class.name, arg: job.arguments)
end
class << self
def pubsub
@pubsub ||= begin
project_id = Rails.application.config.x.settings['project_id']
Google::Cloud::Pubsub.new(
project_id: project_id,
credentials: "#{Rails.root.join('config')}/#{Rails.application.config.x.settings['auth_file']}"
)
end
end
def run_worker!(queue_name = 'default')
p 'Running worker'
topic = pubsub.topic(queue_name)
subscription = topic.subscription("#{queue_name}_task") || topic.subscribe("#{queue_name}_task")
listener = subscription.listen do |message|
message.acknowledge!
end
listener.start
end
end
end
end
end
And I exec a rake command:
$ bundle exec rake run_worker
"Running worker"
E0121 21:39:49.713197000 140736594088896 backup_poller.cc:105] run_poller: {"created":"@1516538389.713182000","description":"Shutting down timer system","file":"src/core/lib/iomgr/timer_generic.cc","file_line":630}
What is this error message??
I wanna use streaming subscriber.
I thought It can solve to use SubscriberClient
Google Cloud Pubsub document.
But def streaming_pull
did not work. What to say of that, I could not use Google::Cloud::Pubsub::V1::Subscriber.new
.
What should I do for using streaming subscriber in rails?
Upvotes: 0
Views: 423
Reputation: 178
I believe because listener
is a local variable, and you do not block, you are destructing the listener
and shutting down the subscriber as soon as you exit the block. StreamingPull is supported in Ruby, and I believe you are using it, you are just exiting the function before you have connected to Pub/Sub. The error message you are seeing I believe is unrelated to Pub/Sub, and is from a lower level of the gRPC code.
Upvotes: 1