# google_map_review/crawler.py
import os
import time
import requests
import json
import logging
from datetime import datetime, timedelta, timezone
from dotenv import load_dotenv
from google.oauth2.credentials import Credentials
from google.auth.transport.requests import Request
from database import get_db_connection, create_table
# Configure root logging once at import time: INFO level, mirrored to
# both crawler.log and the console.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler("crawler.log"),
        logging.StreamHandler(),
    ],
)
def fetch_outlets(conn):
    """Return every outlet that has a non-empty google_business_id.

    Args:
        conn: an open DB-API connection whose cursor supports the
            context-manager protocol.

    Returns:
        list[dict]: one dict per outlet with keys ``google_business_id``
        (whitespace-trimmed), ``outlet_code`` and ``outlet_name``, all
        coerced to ``str``.
    """
    query = """
        SELECT google_business_id, popcorn_code, outlet_name
        FROM master_outlet
        WHERE google_business_id IS NOT NULL
        AND google_business_id != '';
    """
    with conn.cursor() as cur:
        cur.execute(query)
        records = cur.fetchall()

    outlets = []
    for business_id, code, name in records:
        outlets.append({
            "google_business_id": str(business_id).strip(),
            "outlet_code": str(code),
            "outlet_name": str(name),
        })
    return outlets
def get_oauth_headers():
    """Build OAuth2 request headers from the cached token.json credentials.

    If the cached token has expired and a refresh token is available, the
    credentials are refreshed and persisted back to token.json so the next
    run starts valid.

    Returns:
        dict: Authorization, Content-Type and Accept-Language headers.

    Raises:
        FileNotFoundError: if token.json does not exist.
        RuntimeError: if the token is expired and cannot be refreshed.
    """
    if not os.path.exists('token.json'):
        raise FileNotFoundError("token.json not found. Run authorize.py first.")
    creds = Credentials.from_authorized_user_file('token.json')
    if creds.expired:
        # BUGFIX: previously an expired, non-refreshable token was returned
        # silently, producing confusing 401 responses downstream. Fail fast
        # with an actionable message instead.
        if not creds.refresh_token:
            raise RuntimeError(
                "token.json is expired and has no refresh token. "
                "Run authorize.py again."
            )
        creds.refresh(Request())
        # Persist the refreshed credentials for subsequent runs.
        with open('token.json', 'w') as token:
            token.write(creds.to_json())
    return {
        'Authorization': f'Bearer {creds.token}',
        'Content-Type': 'application/json',
        'Accept-Language': 'id'
    }
def get_account_name(headers):
    """Return the resource name of the first Google Business account.

    Args:
        headers: OAuth headers produced by get_oauth_headers().

    Raises:
        requests.HTTPError: if the account-management API call fails.
        ValueError: if the authorized user owns no Business accounts.
    """
    url = "https://mybusinessaccountmanagement.googleapis.com/v1/accounts"
    response = requests.get(url, headers=headers)
    response.raise_for_status()

    account_list = response.json().get('accounts', [])
    if not account_list:
        raise ValueError("No Google Business accounts found.")
    # The crawler always operates on the first (primary) account.
    return account_list[0]['name']
# Maps the API's enum-style star rating (e.g. "FIVE") to an integer score.
_RATING_MAP = {"ONE": 1, "TWO": 2, "THREE": 3, "FOUR": 4, "FIVE": 5}


def _parse_publish_time(create_time_str):
    """Parse an RFC3339 timestamp such as '2023-10-25T14:48:00Z' (optionally
    with fractional seconds) into a timezone-aware UTC datetime.

    Returns None when the input is empty/None.
    """
    if not create_time_str:
        return None
    # Drop fractional seconds (and the trailing 'Z') so strptime can parse.
    if "." in create_time_str:
        clean_time = create_time_str.split('.')[0]
    else:
        clean_time = create_time_str.replace("Z", "")
    return datetime.strptime(clean_time, "%Y-%m-%dT%H:%M:%S").replace(tzinfo=timezone.utc)


def _strip_translation(comment):
    """Remove Google Translate markers from a review comment, keeping the
    original-language text where it is available."""
    if "(Original)" in comment:
        parts = comment.split("(Original)")
        if len(parts) > 1:
            comment = parts[-1].strip()
    elif "(Diterjemahkan oleh Google)" in comment:
        parts = comment.split("\n\n(Diterjemahkan oleh Google)")
        if len(parts) > 1:
            comment = parts[0].strip()
        else:
            parts = comment.split("(Diterjemahkan oleh Google)")
            comment = parts[0].strip()
    return comment


def crawl_reviews():
    """
    Main crawling function using the Google Business Profile API.

    For every outlet with a google_business_id, pages through its reviews
    and upserts those published within the last 90 days into the
    google_review table. Pagination stops early once a page crosses the
    cutoff, since the API returns reviews newest-first.
    """
    conn = get_db_connection()
    if conn is None:
        return
    try:
        # Create table if not exists (schema from database.py should be up-to-date)
        create_table()
        headers = get_oauth_headers()
        account_name = get_account_name(headers)
        outlets = fetch_outlets(conn)
        logging.info(f"Found {len(outlets)} outlets with Google Business ID.")

        # Define cutoff time (3 months ~ 90 days ago)
        cutoff_time = datetime.now(timezone.utc) - timedelta(days=90)
        logging.info(f"Filtering reviews published after: {cutoff_time} UTC")

        total_upserted = 0
        for outlet in outlets:
            location_id = outlet["google_business_id"]
            outlet_code = outlet["outlet_code"]
            outlet_name = outlet["outlet_name"]
            logging.info(f"Crawling Outlet: {outlet_code} - {outlet_name}")

            base_url = f"https://mybusiness.googleapis.com/v4/{account_name}/locations/{location_id}/reviews"
            page_token = None
            outlet_upserted = 0
            outlet_skipped = 0
            stop_pagination = False

            while not stop_pagination:
                params = {}
                if page_token:
                    params['pageToken'] = page_token
                # BUGFIX: initialize before the try so the except handler can
                # safely inspect it even when requests.get() itself raises
                # (previously that path raised UnboundLocalError).
                response = None
                try:
                    response = requests.get(base_url, headers=headers, params=params)
                    response.raise_for_status()
                    data = response.json()
                    reviews = data.get('reviews', [])
                    if not reviews:
                        break  # No more reviews

                    with conn.cursor() as cur:
                        for review in reviews:
                            publish_time = _parse_publish_time(review.get("createTime"))

                            # Check 3-month filter
                            if publish_time and publish_time < cutoff_time:
                                outlet_skipped += 1
                                # Reviews usually arrive newest-first, so stop
                                # paginating after this page — but still finish
                                # processing the remaining reviews on it.
                                stop_pagination = True
                                continue

                            review_id = review.get("reviewId")
                            comment = _strip_translation(review.get("comment", ""))
                            author_name = review.get("reviewer", {}).get("displayName", "Unknown")
                            rating = _RATING_MAP.get(review.get("starRating"), 0)
                            language = None  # The API might not return language explicitly
                            phone = review.get("phone")  # Extract phone if provided

                            # place_id is not available from this endpoint, so we
                            # insert NULL; review_id is the primary key and
                            # google_business_id can identify the outlet later.
                            cur.execute("""
                                INSERT INTO google_review (
                                    review_id, place_id, original_text, author_display_name, publish_time, rating, outlet_code, language, phone
                                ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
                                ON CONFLICT (review_id) DO UPDATE SET
                                    original_text = EXCLUDED.original_text,
                                    author_display_name = EXCLUDED.author_display_name,
                                    rating = EXCLUDED.rating,
                                    language = EXCLUDED.language,
                                    publish_time = EXCLUDED.publish_time,
                                    phone = EXCLUDED.phone,
                                    updated_at = CURRENT_TIMESTAMP;
                            """, (review_id, None, comment, author_name, publish_time, rating, outlet_code, language, phone))
                            outlet_upserted += 1
                            total_upserted += 1
                    conn.commit()

                    page_token = data.get('nextPageToken')
                    if not page_token:
                        break  # No more pages
                except Exception as e:
                    logging.error(f" Error fetching reviews for {outlet_code}: {e}")
                    if response is not None and response.status_code != 200:
                        logging.error(f" API Response Error: {response.text}")
                    conn.rollback()
                    break  # Stop pagination on error

            logging.info(f" Updates: {outlet_upserted} (Skipped {outlet_skipped} older than 3 months)")

        logging.info(f"Crawl finished. Total reviews upserted: {total_upserted}")
    finally:
        conn.close()
def main():
    """Entry point: run a single crawl pass immediately, then exit."""
    logging.info("Starting crawler process (Google Business Profile API)...")
    crawl_reviews()


if __name__ == "__main__":
    main()