【Question title】: Problem trying to stream geotagged tweets into PostgreSQL using Python Jupyter Notebook
【Posted】: 2023-04-07 21:51:01
【Question description】:

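I am trying to stream tweets into a PostgreSQL database with the PostGIS extension, using Python code written in a Jupyter Notebook, without success. I used many tutorials as reference, and on the first attempts the code seemed to work and raised no errors. It even printed my message saying that I was connected to the Twitter API. However, no tweets were uploaded to the PostgreSQL database. I thought the problem might be the filter (perhaps I was using a filter that had no matching tweets at that moment), but after several runs with the filter removed or with other filters, I found that this was not the problem. I don't think the connection to PostgreSQL is the problem either, because I tried printing the tweets directly to the Jupyter Notebook and there were no errors.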

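After making some changes based on the guides and checking the format of the PostgreSQL table, I can see that the code connects to the Twitter API, but I keep getting this message: 'str' object is not callable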
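For context, that message is the TypeError Python raises whenever a string is followed by call parentheses. A minimal sketch of how it can arise, assuming a SQL template held in a variable (the names `command` and `try_call` are illustrative and not taken from any library):

```python
# Hypothetical reproduction: a SQL template stored as a plain string.
command = "INSERT INTO tweets (tweet_id) VALUES (%s)"


def try_call(template, *args):
    """Call the template as if it were a function and return the error text."""
    try:
        template(*args)  # a str has no __call__, so this raises TypeError
    except TypeError as exc:
        return str(exc)
    return None


message = try_call(command, "123")
print(message)
```

If the error is caught by a broad `except Exception` and printed, only this short message appears, without a traceback pointing at the offending line.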

The PostgreSQL table was created with the following code, with the aim of storing the coordinates of each tweet as a point geometry:

CREATE TABLE tweets (tweet_id VARCHAR PRIMARY KEY, user_id VARCHAR, username TEXT, tweet TEXT, hashtags TEXT, lang TEXT, created_at TIMESTAMP, coordinates GEOMETRY);
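Twitter's `coordinates` field follows GeoJSON, so the pair is ordered longitude first, then latitude. One way to hand such a pair to a GEOMETRY column is as EWKT text, which PostGIS can parse with ST_GeomFromEWKT. A minimal sketch, assuming SRID 4326 (WGS 84) and a hypothetical helper name `to_ewkt_point`:

```python
def to_ewkt_point(coordinates, srid=4326):
    """Format a GeoJSON-style [longitude, latitude] pair as EWKT text."""
    lon, lat = (float(value) for value in coordinates)
    if not (-180.0 <= lon <= 180.0 and -90.0 <= lat <= 90.0):
        raise ValueError(f"coordinates out of range: {coordinates!r}")
    return f"SRID={srid};POINT({lon} {lat})"


# Illustrative value only (roughly central Madrid), not real tweet data.
ewkt = to_ewkt_point([-3.7038, 40.4168])
print(ewkt)
```

The resulting string could then be bound to a placeholder written as ST_GeomFromEWKT(%s) in an INSERT statement.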

The Python code used is the following:

#!/usr/bin/env python
# coding: utf-8

#Import libraries
import tweepy
import pandas as pd
import json
import psycopg2
import time
from html.parser import HTMLParser

#Insert Twitter keys
ckey = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
csecret = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
atoken = "xxxxxxxxxxxxxxxxx-xxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
asecret = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"

#Authorize the Twitter API
auth = tweepy.OAuthHandler(ckey, csecret)
auth.set_access_token(atoken, asecret)

#Call the Twitter API
api = tweepy.API(auth)

#Define Listener block
class MyStreamListener(tweepy.StreamListener):
   
   def __init__(self, time_limit=300):
       self.start_time = time.time()
       self.limit = time_limit
       super(MyStreamListener, self).__init__()
   
   def on_connect(self):
       print("Connected to Twitter API.")
       
   def on_status(self, status):
       print(status.text)
       
   def on_data(self, raw_data):
       try:
           datos = json.loads(raw_data)
           #Filter only tweets with coordinates
           if datos["coordinates"] is not None:
               #Obtain all the variables to store in each column
               tweet_id = datos['id_str']
               user_id = datos['user']['id']
               user_name = datos['user']['name']
               tweet = datos['text']
               hashtags = datos["entities"]["hashtags"]
               lang = datos['user']['lang']
               created_at = datos['created_at']
               coordinates = datos["coordinates"]["coordinates"]
           
               # Connect to database
               dbConnect(tweet_id, user_id, user_name, tweet, hashtags, lang, created_at, coordinates)
           
           if (time.time() - self.start_time) > self.limit:
               print(time.time(), self.start_time, self.limit)
               return False
       
       except Exception as e:
           print(e)
           
   def on_error(self, status_code):
       if status_code == 420:
           # Returning False from on_error disconnects the stream
           return False

def dbConnect(tweet_id, user_id, user_name, tweet, hashtags, lang, created_at, coordinates):
   
   #Connect to Twitter database created in PostgreSQL
   conn = psycopg2.connect(host="localhost",database="datos_twitter",port=5433,user="xxxxxxx",password="xxxxxxx")
   #Create a cursor to perform database operations
   cur = conn.cursor()

   #With the cursor, insert tweets into a PostgreSQL table
   command = "INSERT INTO tweets (tweet_id, user_id, user_name, tweet, hashtags, lang, created_at, coordinates) VALUES (%s,%s,%s,%s,%s,%s,%s,%s)" 
   cur.execute(command(tweet_id, user_id, user_name, tweet, hashtags, lang, created_at, coordinates))
   
   #Commit changes
   conn.commit()
   
   #Close cursor and the connection
   cur.close()
   conn.close()

#Streaming of tweets
myStreamListener = MyStreamListener()
myStream = tweepy.Stream(auth=api.auth, listener=myStreamListener, tweet_mode="extended")
#Filtering of tweets by spatial box and keywords
myStream.filter(locations=[-10.78,34.15, 5.95,44.04], track=['Madrid', 'madrid'])
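One likely source of the 'str' object is not callable message in the code above is `command(...)` inside `dbConnect`, which calls the SQL string instead of passing the values to `cur.execute` as a second argument. A minimal sketch of the parameterized form, assuming a DB-API cursor such as psycopg2's. `FakeCursor`, `build_row` and `insert_tweet` are hypothetical names, the sample payload is invented, and the fake cursor only stands in for a real connection so the snippet runs without a database:

```python
from datetime import datetime

# Column names follow the CREATE TABLE statement (username, not user_name).
# The point is built server-side from two numeric parameters (lon, lat).
INSERT_SQL = (
    "INSERT INTO tweets (tweet_id, user_id, username, tweet, hashtags, lang, "
    "created_at, coordinates) "
    "VALUES (%s, %s, %s, %s, %s, %s, %s, ST_SetSRID(ST_MakePoint(%s, %s), 4326))"
)


def build_row(datos):
    """Flatten a decoded tweet into a tuple matching INSERT_SQL's placeholders."""
    lon, lat = datos["coordinates"]["coordinates"]  # GeoJSON order: lon, lat
    tags = ",".join(tag["text"] for tag in datos["entities"]["hashtags"])
    created = datetime.strptime(datos["created_at"], "%a %b %d %H:%M:%S %z %Y")
    return (
        datos["id_str"],
        str(datos["user"]["id"]),   # the user_id column is VARCHAR
        datos["user"]["name"],
        datos["text"],
        tags,                       # list of hashtag dicts joined into TEXT
        datos["user"].get("lang"),
        created,
        lon,
        lat,
    )


class FakeCursor:
    """Stand-in that records execute() calls; a real cursor would run them."""

    def __init__(self):
        self.calls = []

    def execute(self, sql, params=None):
        self.calls.append((sql, params))


def insert_tweet(cur, datos):
    # The values go in as the second argument; the string itself is never called.
    cur.execute(INSERT_SQL, build_row(datos))


# Invented payload for illustration only.
sample = {
    "id_str": "1",
    "text": "Hola Madrid",
    "created_at": "Fri Apr 07 21:51:01 +0000 2023",
    "user": {"id": 42, "name": "demo", "lang": None},
    "entities": {"hashtags": [{"text": "Madrid"}]},
    "coordinates": {"type": "Point", "coordinates": [-3.7038, 40.4168]},
}

cur = FakeCursor()
insert_tweet(cur, sample)
print(cur.calls[0][1])
```

With a real psycopg2 cursor the same `insert_tweet` call would be followed by `conn.commit()`. Opening one connection for the whole stream, rather than one per tweet, is a separate design choice this sketch leaves open.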

【Discussion】:

  • In on_data(), print the status to see what it contains?
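Following that suggestion, a small sketch of inspecting each raw message before indexing into it. It assumes, as in the Twitter v1.1 streaming API, that the stream can also deliver non-tweet messages (for example limit notices) that have no `coordinates` key. `describe_message` is a hypothetical helper name:

```python
import json


def describe_message(raw_data):
    """Return a short summary of a raw stream message without raising KeyError."""
    datos = json.loads(raw_data)
    coords = datos.get("coordinates")  # a missing key and a JSON null both give None
    return {
        "keys": sorted(datos.keys()),
        "is_tweet": "id_str" in datos,
        "has_point": coords is not None,
    }


# Illustrative payloads only, not captured from a real stream.
print(describe_message('{"limit": {"track": 10}}'))
print(describe_message('{"id_str": "1", "text": "hola", "coordinates": null}'))
```

Printing this summary at the top of on_data would show whether any messages arrive at all and how many of them actually carry a point.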

Tags:
python
postgresql
twitter
jupyter-notebook
twitterapi-python