dsx

Reputation: 301

Unclear on error message in upload from pandas to Google BigQuery table

Situation

I'm trying to upload a pandas dataframe of Twitter API data to a table in BigQuery.

Here's my dataframe prep code from my Google Colab notebook:

!pip install --upgrade google-cloud-language
!pip install pandas-gbq -U

from google.colab import files
uploaded = files.upload()

for fn in uploaded.keys():
  print('User uploaded file "{name}" with length {length} bytes'.format(name=fn, length=len(uploaded[fn])))

import os

# Imports Credential File:
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "pp-004a-d61bf3451d85.json"
print("Service Account Key: {}".format(os.environ["GOOGLE_APPLICATION_CREDENTIALS"]))

!pip install --upgrade tweepy

# VARIABLES

interval = "15"
start = '2022-04-07'
end = '2022-04-12'

# Tweepy
searchQ = '(max muncy) -is:retweet lang:en'
intval_tw = "{}T".format(interval)
start_tw = '{}T00:00:00Z'.format(start)
end_tw   = '{}T23:59:59Z'.format(end)

# index = pd.date_range('1/1/2000', periods=9, freq='T')
# D = calendar day frequency, H = hourly frequency, T, min = minutely frequency

# Library installs

import tweepy
# from twitter_authentication import bearer_token
import time
import pandas as pd
import requests
import json
import numpy as np

bearer_token = "BEARER_TOKEN"


client = tweepy.Client(bearer_token, wait_on_rate_limit=True)

# NEED TO ENSURE HAVE ALL PARAMETERS
gathered_tweets = []
for response in tweepy.Paginator(client.search_recent_tweets,
                                 query = searchQ,
                                 user_fields = ['name', 'description', 'username', 'profile_image_url', 'url', 'pinned_tweet_id', 'verified', 'created_at', 'location', 'public_metrics', 'entities'],
                                 tweet_fields = ['public_metrics', 'created_at','lang', 'attachments', 'context_annotations', 'conversation_id', 'entities', 'geo', 'in_reply_to_user_id', 'possibly_sensitive', 'referenced_tweets', 'reply_settings', 'source'],
                                 media_fields = ['duration_ms', 'media_key', 'preview_image_url', 'type', 'url', 'height', 'width', 'public_metrics'],
                                 expansions = ['author_id', 'attachments.media_keys', 'entities.mentions.username', 'geo.place_id', 'in_reply_to_user_id', 'referenced_tweets.id', 'referenced_tweets.id.author_id'],
                                 start_time = start_tw,
                                 end_time = end_tw,
                                 max_results=100):
    time.sleep(1)
    gathered_tweets.append(response)



result = []
user_dict = {}
# Loop through each response object
for response in gathered_tweets:
  # Take all of the users, and put them into a dictionary of dictionaries with the info we want to keep
  for user in response.includes['users']:
      user_dict[user.id] = {'username': user.username,
                            'created_at': user.created_at,
                            'location': user.location,
                            'verified': user.verified,
                            'name': user.name,
                            'description': user.description,
                            'url': user.url,
                            'profile_image_url': user.profile_image_url,
                            'pinned_tweet': user.pinned_tweet_id,
                            'entities': user.entities,
                            'followers': user.public_metrics['followers_count'],
                            'total_tweets': user.public_metrics['tweet_count'],
                            'following': user.public_metrics['following_count'],
                            'listed': user.public_metrics['listed_count'],
                            'tweets': user.public_metrics['tweet_count']
                            }
  for tweet in response.data:
      # For each tweet, find the author's information
      author_info = user_dict[tweet.author_id]
      # Put all of the information we want to keep in a single dictionary for each tweet
      result.append({'author_id': tweet.author_id,
                   'username': author_info['username'],
                   'name': author_info['name'],
                   'author_followers': author_info['followers'],
                   'author_following': author_info['following'],
                   'author_tweets': author_info['tweets'],
                   'author_description': author_info['description'],
                   'author_url': author_info['url'],
                   'profile_image_url': author_info['profile_image_url'],
                   #'pinned_tweet': author_info['pinned_tweet_id'], https://developer.twitter.com/en/docs/twitter-api/tweets/lookup/api-reference/get-tweets
                   #'total_tweets': author_info['tweet_count'],
                   #'listed_count': author_info['listed_count'],
                   'entities': author_info['entities'],
                   'verified': author_info['verified'],
                   'account_created_at': author_info['created_at'],
                   'text': tweet.text,
                   'created_at': tweet.created_at,
                   'lang': tweet.lang,
                   'tweet_id': tweet.id,
                   'retweets': tweet.public_metrics['retweet_count'],
                   'replies': tweet.public_metrics['reply_count'],
                   'likes': tweet.public_metrics['like_count'],
                   'quotes': tweet.public_metrics['quote_count'],
                   'replied': tweet.in_reply_to_user_id,
                   'sensitive': tweet.possibly_sensitive,
                   'referenced_tweets': tweet.referenced_tweets,
                   'reply_settings': tweet.reply_settings,
                   'source': tweet.source
                   #'video_views': tweet.public_metrics['view_count']
                   })

dfTW00 = pd.DataFrame(result)

dfTW01 = dfTW00

# Create 'engagement' metric
dfTW01['engagement'] = dfTW01['retweets'] + dfTW01['replies'] + dfTW01['likes'] + dfTW01['quotes']

# Add 'tweets' column with value of 1
dfTW01['tweets'] = 1

# Engagement Rate calc
dfTW01['eng_rate'] = (dfTW01['tweets'] / dfTW01['engagement'])

# Add twitter link
dfTW01['base_url'] = 'https://twitter.com/twitter/status/'
# base_url = 'https://twitter.com/twitter/status/'
dfTW01['tweet_link'] = dfTW01['base_url'] + dfTW01['tweet_id'].astype(str)

# Imports the Google Cloud client library
from google.cloud import language_v1

# Instantiates a client
client = language_v1.LanguageServiceClient()


def get_sentiment(text):
    # The text to analyze
    document = language_v1.Document(
        content=text,
        type_=language_v1.types.Document.Type.PLAIN_TEXT
    )

    # Detects the sentiment of the text
    sentiment = client.analyze_sentiment(
        request={"document": document}
    ).document_sentiment

    return sentiment


dfTW01["sentiment"] = dfTW01["text"].apply(get_sentiment)

dfTW02 = dfTW01['sentiment'].astype(str).str.split(expand=True)
dfTW02

dfTW03 = pd.merge(dfTW01, dfTW02, left_index=True, right_index=True)

dfTW03.rename(columns = {1:'magnitude', 3:'score'}, inplace=True)

cols = ['magnitude', 'score']
dfTW03[cols] = dfTW03[cols].apply(pd.to_numeric, errors='coerce', axis=1)

def return_status(x):
    if x >= .5:
        return 'Positive'
    elif x <= -.5:
        return 'Negative'
    return 'Neutral'

dfTW03['sentiment2'] = dfTW03['score'].apply(return_status)

What I've tried

This is what I've used for the upload (I've confirmed the project, dataset and table info are correct):

df.to_gbq('004a01.004a-TW-01',
          'pp-004a',
          chunksize=None,
          if_exists='append'
          )

Results

However, that method is returning this error message:

TypeError: '<' not supported between instances of 'int' and 'str'

Assessment

I've found several posts on SO addressing this, but I'm unable to relate them to my situation. (I thought various datatypes could be uploaded to a BigQuery table.)

Primarily, I'm not clear what the error message means by '<' not supported between instances of 'int' and 'str'.
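
For reference, here's a minimal sketch (not my actual upload code) of the kind of comparison that produces this error in Python 3. I'm assuming to_gbq() runs into something similar internally when a column mixes ints and strings:

# Minimal illustration: ordering an int against a str raises a TypeError
# of this kind, because Python 3 refuses to compare the two types.
sorted([3, "1", 2])
# TypeError: '<' not supported between instances of 'str' and 'int'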

Any input on what that means would be greatly appreciated.

Below are the pandas dtypes in my dataframe, in case that's helpful.

Dataframe dtypes

Pandas dataframe dtypes:

author_id                           int64
username                           object
name                               object
author_followers                    int64
author_following                    int64
author_tweets                       int64
author_description                 object
author_url                         object
profile_image_url                  object
entities                           object
verified                             bool
account_created_at    datetime64[ns, UTC]
text                               object
created_at            datetime64[ns, UTC]
lang                               object
tweet_id                            int64
retweets                            int64
replies                             int64
likes                               int64
quotes                              int64
replied                           float64
sensitive                            bool
referenced_tweets                  object
reply_settings                     object
source                             object
engagement                          int64
tweets                              int64
eng_rate                          float64
base_url                           object
tweet_link                         object
sentiment                          object
0                                  object
magnitude                         float64
2                                  object
score                             float64
sentiment_rating                  float64
sentiment2                         object
dtype: object
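
In case it helps narrow things down, here's a quick sketch of a check I could run to see which object-dtype columns hold non-string values (for example the dicts/lists in 'entities' or 'referenced_tweets'); dfTW03 here is the final dataframe from the prep code above:

# Sketch: flag object-dtype columns whose cells aren't plain strings,
# since mixed Python types in a column seem like a plausible culprit.
obj_cols = dfTW03.select_dtypes(include="object").columns
for col in obj_cols:
    has_non_str = dfTW03[col].dropna().map(lambda v: not isinstance(v, str)).any()
    if has_non_str:
        print(col, "contains non-string values")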

Upvotes: 0

Views: 2601

Answers (1)

Scott B

Reputation: 2964

Instead of the to_gbq() function from pandas, you may try using the load_table_from_dataframe() function from the BigQuery client library to load your dataframe into BigQuery.

Please see the sample Python code below, which uses load_table_from_dataframe():

import datetime

from google.cloud import bigquery
import pandas
import pytz

# Construct a BigQuery client object.
client = bigquery.Client()

# TODO(developer): Set table_id to the ID of the table to create.
table_id = "my-project.my-dataset.my-table"

records = [
    {
        "title": "The Meaning of Life",
        "release_year": 1983,
        "length_minutes": 112.5,
        "release_date": pytz.timezone("Europe/Paris")
        .localize(datetime.datetime(1983, 5, 9, 13, 0, 0))
        .astimezone(pytz.utc),
        # Assume UTC timezone when a datetime object contains no timezone.
        "dvd_release": datetime.datetime(2002, 1, 22, 7, 0, 0),
    },
    {
        "title": "Monty Python and the Holy Grail",
        "release_year": 1975,
        "length_minutes": 91.5,
        "release_date": pytz.timezone("Europe/London")
        .localize(datetime.datetime(1975, 4, 9, 23, 59, 2))
        .astimezone(pytz.utc),
        "dvd_release": datetime.datetime(2002, 7, 16, 9, 0, 0),
    },
    {
        "title": "Life of Brian",
        "release_year": 1979,
        "length_minutes": 94.25,
        "release_date": pytz.timezone("America/New_York")
        .localize(datetime.datetime(1979, 8, 17, 23, 59, 5))
        .astimezone(pytz.utc),
        "dvd_release": datetime.datetime(2008, 1, 14, 8, 0, 0),
    },
    {
        "title": "And Now for Something Completely Different",
        "release_year": 1971,
        "length_minutes": 88.0,
        "release_date": pytz.timezone("Europe/London")
        .localize(datetime.datetime(1971, 9, 28, 23, 59, 7))
        .astimezone(pytz.utc),
        "dvd_release": datetime.datetime(2003, 10, 22, 10, 0, 0),
    },
]
dataframe = pandas.DataFrame(
    records,
    # In the loaded table, the column order reflects the order of the
    # columns in the DataFrame.
    columns=[
        "title",
        "release_year",
        "length_minutes",
        "release_date",
        "dvd_release",
    ],
    # Optionally, set a named index, which can also be written to the
    # BigQuery table.
    index=pandas.Index(
        ["Q24980", "Q25043", "Q24953", "Q16403"], name="wikidata_id"
    ),
)
job_config = bigquery.LoadJobConfig(
    # Specify a (partial) schema. All columns are always written to the
    # table. The schema is used to assist in data type definitions.
    schema=[
        # Specify the type of columns whose type cannot be auto-detected. For
        # example the "title" column uses pandas dtype "object", so its
        # data type is ambiguous.
        bigquery.SchemaField("title", bigquery.enums.SqlTypeNames.STRING),
        # Indexes are written if included in the schema by name.
        bigquery.SchemaField("wikidata_id", bigquery.enums.SqlTypeNames.STRING),
    ],
    # Optionally, set the write disposition. BigQuery appends loaded rows
    # to an existing table by default, but with WRITE_TRUNCATE write
    # disposition it replaces the table with the loaded data.
    write_disposition="WRITE_TRUNCATE"
)

job = client.load_table_from_dataframe(
    dataframe, table_id, job_config=job_config
)  # Make an API request.
job.result()  # Wait for the job to complete.

table = client.get_table(table_id)  # Make an API request.
print(
    "Loaded {} rows and {} columns to {}".format(
        table.num_rows, len(table.schema), table_id
    )
)
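
One note in case you run this in the same Colab environment: load_table_from_dataframe() needs the google-cloud-bigquery library, and it may also need pyarrow installed for the dataframe serialization (I'm assuming neither is already present in your notebook; skip this if they are):

!pip install --upgrade google-cloud-bigquery pyarrow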

Upvotes: 1
