Reputation: 3520
I recently disabled the legacy Cloud Messaging API and switched to the new Firebase Cloud Messaging API (V1).
Previously, I used the following code to subscribe a user to a topic and then send messages to the topic, and it worked well:
...
@classmethod
def subscribe_to_topic(cls, device_id, topics: list):
    success = []
    for topic in topics:
        try:
            url = f'https://iid.googleapis.com/iid/v1/{device_id}/rel/topics/{topic}'
            headers = _get_header()
            result = requests.post(url, headers=headers)
            success.append(True) if result.status_code == 200 else success.append(False)
            logger.info(f'subscribe user <{device_id}> in topic <{topic}> with status code: {result.status_code}')
        except Exception as e:
            logger.error(f'{e}')
    return all(success)
After switching to the new API, I tried to update my code as follows, but it didn't work:
...
@classmethod
def subscribe_to_topic(cls, device_id, topics: list):
    """Subscribes a device to a topic using FCM v1 API."""
    payload = {"registrationTokens": [device_id]}
    success = []
    for topic in topics:
        try:
            url = (f"https://fcm.googleapis.com/v1/"
                   f"projects/{environ.get('FCM_PROJECT_NAME')}/topics/{topic}:subscribe")
            headers = _get_header()
            result = requests.post(url, headers=headers, json=payload)
            success.append(True) if result.status_code == 200 else success.append(False)
            logger.info(f'subscribe user <{device_id}> in topic <{topic}> with status code: {result.status_code}')
        except Exception as e:
            logger.error(f'{e}')
    return all(success)
I have read the Firebase documentation, but I'm still unsure how to properly migrate my topic subscription and message sending functionality to the new API.
Could someone provide guidance or an example on how to achieve this with the new Firebase Cloud Messaging API (V1)?
Any help or examples would be greatly appreciated!
Upvotes: 1
Views: 551
Reputation: 120
Please refer to this.
I also recently migrated from the legacy API to v1, and I used the Admin SDK to subscribe to and unsubscribe from topics. Here's the link for installing the Admin SDK.
Upvotes: 1
Reputation: 3520
Here's how I resolved it (step-by-step) using the Firebase Admin SDK.
Install Firebase Admin SDK:
pip install firebase-admin
Authentication using a service account:
Ensure you have a service account JSON file for authentication. Set the GOOGLE_APPLICATION_CREDENTIALS environment variable to point to this file (for example, by adding it to your .env file):
export GOOGLE_APPLICATION_CREDENTIALS="/path/to/serviceAccountKey.json"  # downloaded from your Firebase console project
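If the variable is missing or points to the wrong place, the failure tends to show up later and less clearly, when the app is initialized. As a rough sketch (the helper name `resolve_credentials_path` is my own and not part of the Admin SDK), you could check the path up front:

```python
import os
from pathlib import Path


def resolve_credentials_path(env_var: str = "GOOGLE_APPLICATION_CREDENTIALS") -> Path:
    """Return the service account file path from the environment, or fail early."""
    raw = os.environ.get(env_var)
    if not raw:
        raise RuntimeError(f"{env_var} is not set")
    path = Path(raw).expanduser()
    if not path.is_file():
        raise FileNotFoundError(f"{env_var} points to a missing file: {path}")
    return path
```

The returned path can then be passed to `credentials.Certificate(...)`. Note that a .env file is not read automatically by Python, so this assumes something else has already loaded it into the process environment.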
Update code to use Firebase Admin SDK:
Instead of manually crafting HTTP requests, use the Firebase Admin SDK to handle topic subscriptions and message sending.
Here's the updated code:
import logging
from os import environ

from firebase_admin import credentials, initialize_app, messaging

logger = logging.getLogger('firebase')


class Firebase:
    _firebase_app = None

    @classmethod
    def _get_app(cls):
        """Initializes the Firebase app once and reuses it afterwards."""
        if not cls._firebase_app:
            cred = credentials.Certificate(environ.get("GOOGLE_APPLICATION_CREDENTIALS"))
            cls._firebase_app = initialize_app(cred)
        return cls._firebase_app

    @classmethod
    def subscribe_to_topic(cls, device_id, topics: list):
        """Subscribes a device to each topic; returns True if every call succeeded."""
        cls._get_app()
        success = []
        for topic in topics:
            response = messaging.subscribe_to_topic(device_id, topic)
            for error in response.errors:  # one entry per failed token
                logger.error(f"Subscribing to '{topic}' failed: {error.reason}")
            success.append(response.failure_count == 0)
        return all(success)

    @classmethod
    def unsubscribe_to_topic(cls, device_id, topics: list):
        """Unsubscribes a device from each topic."""
        cls._get_app()  # the app must be initialized here as well
        for topic in topics:
            messaging.unsubscribe_from_topic(device_id, topic)

    @classmethod
    def send_to_topic(cls, topic, message, priority='normal'):
        """Sends a data message to a topic by name (priority is currently unused)."""
        try:
            cls._get_app()
            message = messaging.Message(
                data=message,  # keys and values must all be strings
                topic=topic,
            )
            response = messaging.send(message)  # returns the message ID string
            return response
        except Exception as e:
            logger.error(f"Error sending message to topic '{topic}': {e}")
            raise  # Re-raise the exception for handling
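When a subscription call reports failures, the `response` from `messaging.subscribe_to_topic` carries an `errors` list whose entries have an `index` (position of the token in the request) and a `reason`. Here is a minimal sketch of turning that into something loggable. The helper `failed_tokens` is hypothetical, and the response below is a stand-in built with `SimpleNamespace`, with a made-up reason string, so the snippet runs without Firebase:

```python
from types import SimpleNamespace


def failed_tokens(tokens, response):
    """Pair each failed registration token with the reason reported for it."""
    if response.failure_count == 0:
        return []
    return [(tokens[err.index], err.reason) for err in response.errors]


# Stand-in for a real response, for illustration only.
fake_response = SimpleNamespace(
    success_count=1,
    failure_count=1,
    errors=[SimpleNamespace(index=1, reason="NOT_FOUND")],
)
print(failed_tokens(["token-a", "token-b"], fake_response))
```

With a real response you would pass the same token list you gave to `messaging.subscribe_to_topic`, so the indexes line up.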
Here is an example of how to use the Firebase class:
device_id = "your_device_registration_token"
topics = ["news", "updates", "sports"]
result = Firebase.subscribe_to_topic(device_id, topics) # First of all subscribe device_id to the topics...
topic = "news" # Our desired topic
message = {
    "title": "Breaking News",
    "body": "Stay informed with the latest updates."
}
response = Firebase.send_to_topic(topic, message) # Send message to the news topic
print(f"Message sent. Response: {response}")
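One thing to watch with the `data` payload: `messaging.Message` requires every key and value in `data` to be a string, so numbers or nested structures need converting first. A possible sketch is below. The helper name `to_fcm_data` and the choice to JSON-encode nested containers are my own assumptions, not something the SDK provides:

```python
import json


def to_fcm_data(payload: dict) -> dict:
    """Convert keys and values to strings; JSON-encode nested containers."""
    data = {}
    for key, value in payload.items():
        if isinstance(value, str):
            data[str(key)] = value
        elif isinstance(value, (dict, list, tuple)):
            # The receiving app has to json-decode these fields itself.
            data[str(key)] = json.dumps(value)
        else:
            data[str(key)] = str(value)
    return data


print(to_fcm_data({"title": "Breaking News", "unread": 3, "tags": ["a", "b"]}))
```

You would then call `Firebase.send_to_topic(topic, to_fcm_data(message))` instead of passing the raw dict.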
Upvotes: 0