Reputation: 1092
Follow-up to my previous thread: the assumption behind that question turned out to be wrong (subprocess was not actually causing the problem), so I'm making a more focused post.
My error message:
No handlers could be found for logger "google.cloud.pubsub_v1.subscriber._protocol.streaming_pull_manager"
My intent:
Pass on Google PubSub message attributes as Python variables for re-use in later code.
My code:
import time
import logging
from google.cloud import pubsub_v1

project_id = "redacted"
subscription_name = "redacted"

def receive_messages_with_custom_attributes(project_id, subscription_name):
    """Receives messages from a pull subscription."""
    # [START pubsub_subscriber_sync_pull_custom_attributes]
    subscriber = pubsub_v1.SubscriberClient()
    subscription_path = subscriber.subscription_path(
        project_id, subscription_name)

    def callback(message):
        print('Received message: {}'.format(message.data))
        if message.attributes:
            #print('Attributes:')
            for key in message.attributes:
                value = message.attributes.get(key)
                #commented out to not print to terminal
                #which should not be necessary
                #print('{}: {}'.format(key, value))
        message.ack()
        print("this is before variables")
        dirpath = "~/subfolder1/"
        print(dirpath)
        namepath = message.data["name"]
        print(namepath)
        fullpath = dirpath + namepath
        print(fullpath)
        print("this is after variables")

    subscriber.subscribe(subscription_path, callback=callback)
    # The subscriber is non-blocking, so we must keep the main thread from
    # exiting to allow it to process messages in the background.
    print('Listening for messages on {}'.format(subscription_path))
    while True:
        time.sleep(60)
    # [END pubsub_subscriber_sync_pull_custom_attributes]

receive_messages_with_custom_attributes(project_id, subscription_name)
My full console output from running the above code:
Listening for messages on projects/[redacted]
Received message: {
"kind": "storage#object",
"id": "[redacted]/0.testing/1548033442364022",
"selfLink": "https://www.googleapis.com/storage/v1/b/[redacted]/o/BSD%2F0.testing",
"name": "BSD/0.testing",
"bucket": "[redacted]",
"generation": "1548033442364022",
"metageneration": "1",
"contentType": "application/octet-stream",
"timeCreated": "2019-01-21T01:17:22.363Z",
"updated": "2019-01-21T01:17:22.363Z",
"storageClass": "MULTI_REGIONAL",
"timeStorageClassUpdated": "2019-01-21T01:17:22.363Z",
"size": "0",
"md5Hash": "1B2M2Y8AsgTpgAmY7PhCfg==",
"mediaLink": "https://www.googleapis.com/download/storage/v1/b/[redacted]/o/BSD%2F0.testing?generation=1548033442364022&alt=media",
"crc32c": "AAAAAA==",
"etag": "CPb0uvvZ/d8CEAE="
}
this is before variables
/home/[redacted]
No handlers could be found for logger "google.cloud.pubsub_v1.subscriber._protocol.streaming_pull_manager"
As you can see, the first string and the string-defined-as-variable were printed, but the code breaks when it tries to define variables from the just-generated dictionary, and no further print() calls were executed.
A potentially related thread: that user was publishing with cron jobs and found a fix in the crontab environment paths. My situation is receiving, with no cron jobs involved, but it might hint at another layer behind or within Python?
Can anyone please help me with adding a handler to make this code run as intended?
Upvotes: 2
Views: 1095
Reputation: 1092
As Nahuel and tripleee explained, the problem is that the message data is BYTES rather than a string. However, their code didn't quite work for me and still threw errors, and I have no idea why. By cross-referencing Google's sample code for the Pub/Sub App Engine website, plus a few more hours of trial and error, I found that the following code works. It may be inelegant or contain bad practices; if so, please edit it to make it more robust.
#Continues from after message.ack(), above code remains unchanged
#except needing to <import json>
#this turns message.data into a true python dict with strings.
payload = json.loads(message.data.decode('utf-8'))
#this finds the value of the dict with key "name"
namepath = payload["name"]
#this is just a static string to pre-pend to the file path
dirpath = "/home/[redacted]/"
#combine them into a single functioning path
fullpath = dirpath + namepath
#currently type 'unicode', so convert them to type 'str'
fullpath = fullpath.encode("utf-8")
And at the end we will have a fullpath that is purely type 'str' to be used by later functions/commands.
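The "No handlers could be found for logger ..." line from the question is Python 2's logging module saying that the subscriber library tried to log something (most likely the exception raised inside the callback) while no handler was configured. Below is a minimal sketch of configuring a handler and catching parse errors so the real traceback becomes visible. `FakeMessage` and the logger name are hypothetical stand-ins for local testing, not part of google-cloud-pubsub.

```python
import json
import logging

# Configure a root handler so records from any logger (including the
# library's own loggers) are printed instead of being dropped.
logging.basicConfig(level=logging.INFO)
log = logging.getLogger("subscriber_example")  # hypothetical logger name


class FakeMessage(object):
    """Hypothetical stand-in for a Pub/Sub message, for local testing only."""

    def __init__(self, data):
        self.data = data
        self.acked = False

    def ack(self):
        self.acked = True


def extract_name(message):
    """Return the "name" field of the JSON payload, or None on failure."""
    try:
        payload = json.loads(message.data.decode("utf-8"))
        return payload["name"]
    except (ValueError, KeyError, TypeError):
        # log.exception records the full traceback at ERROR level.
        log.exception("Could not read 'name' from message payload")
        return None


good = FakeMessage(b'{"name": "BSD/0.testing"}')
bad = FakeMessage(b"not json")
print(extract_name(good))
print(extract_name(bad))
```

With a handler in place, a failure in the callback shows up as a normal traceback rather than the one-line "No handlers" notice.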
Upvotes: 0
Reputation: 1040
First, if I understand correctly from the output you are showing, you are using a Pub/Sub notification to send a message whenever you make changes to a Cloud Storage object. This information could be helpful.
Now, message.data["name"] is not going to work because message.data is a BYTES object, so it can't be indexed like a dict.
To treat it as a dict, you first have to decode it from base64 (import base64). What you are left with is a string in JSON format. You then use json.loads() (don't forget to import json) to transform this string into a dict. Now you can index the message.
The code for this will be:
print("This is before variables")
dirpath = "/subfolder1/"
print(dirpath)
#Transform the bytes object into a string by decoding it
namepath = base64.b64decode(message.data).decode('utf-8')
#Transform the JSON-formatted string into a dict
namepath = json.loads(namepath)
print(namepath["name"])
fullpath = dirpath + namepath["name"]
print(fullpath)
print("this is after variables")
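To see the bytes-versus-dict issue in isolation, here is a small sketch that uses a hard-coded bytes value in place of a real message.data (the sample JSON is trimmed from the question's output, and the bucket name is made up). It skips the base64 step on the assumption that the client library already hands over raw bytes, as the asker's working code does; if your payload really is base64-encoded, decode it first.

```python
import json

# Hypothetical sample standing in for message.data (bytes, not a dict).
data = b'{"name": "BSD/0.testing", "bucket": "example-bucket"}'

# Indexing bytes with a string key raises TypeError, which is what
# stopped the callback in the question.
try:
    data["name"]
    index_error = None
except TypeError as exc:
    index_error = exc

# Decode the bytes to text, then parse the JSON text into a dict.
payload = json.loads(data.decode("utf-8"))
dirpath = "/subfolder1/"
fullpath = dirpath + payload["name"]
print(fullpath)  # prints /subfolder1/BSD/0.testing
```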
Now, if your intent is to read the attributes only, they are properly defined at the top like:
if message.attributes:
    print('Attributes:')
    for key in message.attributes:
        value = message.attributes.get(key)
        print('{}: {}'.format(key, value))
So, you could use:
print("this is before variables")
dirpath = "~/subfolder1/"
print(dirpath)
namepath = message.attributes["objectId"]
print(namepath)
fullpath = dirpath + namepath
print(fullpath)
print("this is after variables")
Keep in mind that in this particular case "objectId" is the name of the file, because it's the attribute that Pub/Sub notifications for Cloud Storage use. If you intend to send custom messages, change "objectId" to your desired attribute name.
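As a rough sketch of the attribute approach, the lookup can be wrapped in a small helper that takes the attribute name as a parameter and returns None when it is missing. The `path_from_attributes` helper and the sample dict are hypothetical; the dict only imitates the shape of message.attributes on a Cloud Storage notification.

```python
# Hypothetical attribute mapping, shaped like message.attributes
# on a Cloud Storage notification.
attributes = {
    "bucketId": "example-bucket",
    "objectId": "BSD/0.testing",
    "eventType": "OBJECT_FINALIZE",
}


def path_from_attributes(attributes, dirpath, key="objectId"):
    """Prepend dirpath to the attribute named by key; None if it is absent."""
    name = attributes.get(key)
    if name is None:
        return None
    return dirpath + name


print(path_from_attributes(attributes, "/subfolder1/"))
# For custom messages, pass your own attribute name via key=...
print(path_from_attributes({"filename": "a.txt"}, "/subfolder1/", key="filename"))
```

Returning None for a missing attribute lets the callback decide whether to skip the message or log it, instead of failing with a KeyError.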
Upvotes: 1