import os
import time
import requests
import json
import logging
from datetime import datetime, timedelta, timezone
from dotenv import load_dotenv  # NOTE(review): imported but never called here; presumably database.py loads the .env — confirm
from google.oauth2.credentials import Credentials
from google.auth.transport.requests import Request
from database import get_db_connection, create_table

# Setup basic logging: mirror every message to crawler.log and the console.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler("crawler.log"),
        logging.StreamHandler()
    ]
)

# The API reports star ratings as words ("FIVE"); map them to integers.
_RATING_MAP = {"ONE": 1, "TWO": 2, "THREE": 3, "FOUR": 4, "FIVE": 5}


def _parse_publish_time(create_time_str):
    """
    Parse an API createTime string (e.g. "2023-10-25T14:48:00Z", sometimes
    with fractional seconds) into a timezone-aware UTC datetime.

    Returns None when create_time_str is empty/None.
    """
    if not create_time_str:
        return None
    if "." in create_time_str:
        # Fractional seconds present: dropping everything after the dot
        # also drops the trailing "Z".
        clean_time = create_time_str.split('.')[0]
    else:
        clean_time = create_time_str.replace("Z", "")
    return datetime.strptime(clean_time, "%Y-%m-%dT%H:%M:%S").replace(tzinfo=timezone.utc)


def _strip_translation_markers(comment):
    """
    Remove Google Translate prefix/suffix from a review comment, keeping
    the original-language text when it is explicitly marked.
    """
    if "(Original)" in comment:
        parts = comment.split("(Original)")
        if len(parts) > 1:
            comment = parts[-1].strip()
    elif "(Diterjemahkan oleh Google)" in comment:
        parts = comment.split("\n\n(Diterjemahkan oleh Google)")
        if len(parts) > 1:
            comment = parts[0].strip()
        else:
            parts = comment.split("(Diterjemahkan oleh Google)")
            comment = parts[0].strip()
    return comment


def fetch_outlets(conn):
    """
    Fetch all outlets that have a google_business_id.

    Args:
        conn: Open DB-API connection (supports cursor() as a context manager).

    Returns:
        list[dict]: One dict per outlet with keys "google_business_id",
        "outlet_code" and "outlet_name", all coerced to stripped strings.
    """
    with conn.cursor() as cur:
        cur.execute("""
            SELECT google_business_id, popcorn_code, outlet_name
            FROM master_outlet
            WHERE google_business_id IS NOT NULL AND google_business_id != '';
        """)
        rows = cur.fetchall()
        return [
            {
                "google_business_id": str(row[0]).strip(),
                "outlet_code": str(row[1]),
                "outlet_name": str(row[2]),
            }
            for row in rows
        ]


def get_oauth_headers():
    """
    Build OAuth Bearer headers from token.json, refreshing the token if it
    has expired (and persisting the refreshed credentials back to disk).

    Raises:
        FileNotFoundError: if token.json does not exist.
    """
    if not os.path.exists('token.json'):
        raise FileNotFoundError("token.json not found. Run authorize.py first.")
    creds = Credentials.from_authorized_user_file('token.json')
    if creds.expired and creds.refresh_token:
        creds.refresh(Request())
        # Save refreshed credentials
        with open('token.json', 'w') as token:
            token.write(creds.to_json())
    return {
        'Authorization': f'Bearer {creds.token}',
        'Content-Type': 'application/json',
        'Accept-Language': 'id'
    }


def get_account_name(headers):
    """
    Return the resource name (e.g. "accounts/123") of the first Google
    Business account visible to the authorized user.

    Raises:
        ValueError: if the user has no Business accounts.
        requests.HTTPError: on a non-2xx API response.
    """
    account_url = "https://mybusinessaccountmanagement.googleapis.com/v1/accounts"
    res = requests.get(account_url, headers=headers)
    res.raise_for_status()
    accounts = res.json().get('accounts', [])
    if not accounts:
        raise ValueError("No Google Business accounts found.")
    return accounts[0]['name']


def crawl_reviews():
    """
    Main crawling function using Google Business Profile API.

    Fetches reviews for every outlet that has a google_business_id and
    upserts them into the google_review table. Only reviews published in
    the last ~90 days are kept; since the API returns reviews newest-first,
    pagination stops once a page contains a review older than the cutoff.
    """
    conn = get_db_connection()
    if conn is None:
        return
    try:
        # Create table if not exists (schema from database.py should be up-to-date)
        create_table()

        headers = get_oauth_headers()
        account_name = get_account_name(headers)

        outlets = fetch_outlets(conn)
        logging.info(f"Found {len(outlets)} outlets with Google Business ID.")

        # Define cutoff time (3 months ~ 90 days ago)
        cutoff_time = datetime.now(timezone.utc) - timedelta(days=90)
        logging.info(f"Filtering reviews published after: {cutoff_time} UTC")

        total_upserted = 0

        for outlet in outlets:
            location_id = outlet["google_business_id"]
            outlet_code = outlet["outlet_code"]
            outlet_name = outlet["outlet_name"]
            logging.info(f"Crawling Outlet: {outlet_code} - {outlet_name}")

            # GET https://mybusiness.googleapis.com/v4/{account}/locations/{location}/reviews
            base_url = f"https://mybusiness.googleapis.com/v4/{account_name}/locations/{location_id}/reviews"

            page_token = None
            outlet_upserted = 0
            outlet_skipped = 0
            stop_pagination = False

            while not stop_pagination:
                params = {}
                if page_token:
                    params['pageToken'] = page_token

                # Initialized before the request so the except block can
                # safely inspect it even when requests.get() itself raises
                # (previously that path crashed with NameError).
                response = None
                try:
                    response = requests.get(base_url, headers=headers, params=params)
                    response.raise_for_status()
                    data = response.json()

                    reviews = data.get('reviews', [])
                    if not reviews:
                        break  # No more reviews

                    with conn.cursor() as cur:
                        for review in reviews:
                            publish_time = _parse_publish_time(review.get("createTime"))

                            # Check 3-month filter. Reviews arrive newest
                            # first, so one stale review means every later
                            # page is stale too — stop after this page.
                            if publish_time and publish_time < cutoff_time:
                                outlet_skipped += 1
                                stop_pagination = True
                                continue

                            review_id = review.get("reviewId")
                            comment = _strip_translation_markers(review.get("comment", ""))
                            author_name = review.get("reviewer", {}).get("displayName", "Unknown")
                            rating = _RATING_MAP.get(review.get("starRating"), 0)
                            language = None  # The API does not return the language explicitly

                            # review_id is the PRIMARY KEY; place_id is a
                            # plain nullable column this endpoint cannot
                            # fill, so insert NULL.
                            cur.execute("""
                                INSERT INTO google_review (
                                    review_id, place_id, original_text, author_display_name,
                                    publish_time, rating, outlet_code, language
                                ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
                                ON CONFLICT (review_id) DO UPDATE SET
                                    original_text = EXCLUDED.original_text,
                                    author_display_name = EXCLUDED.author_display_name,
                                    rating = EXCLUDED.rating,
                                    language = EXCLUDED.language,
                                    publish_time = EXCLUDED.publish_time,
                                    updated_at = CURRENT_TIMESTAMP;
                            """, (review_id, None, comment, author_name, publish_time,
                                  rating, outlet_code, language))
                            outlet_upserted += 1
                            total_upserted += 1

                    conn.commit()

                    page_token = data.get('nextPageToken')
                    if not page_token:
                        break  # No more pages

                except Exception as e:
                    logging.error(f" Error fetching reviews for {outlet_code}: {e}")
                    if response is not None and response.status_code != 200:
                        logging.error(f" API Response Error: {response.text}")
                    conn.rollback()
                    break  # Stop pagination on error

            logging.info(f" Updates: {outlet_upserted} (Skipped {outlet_skipped} older than 3 months)")

        logging.info(f"Crawl finished. Total reviews upserted: {total_upserted}")
    finally:
        conn.close()


def main():
    """Entry point: run a single crawl pass immediately."""
    # Only run once, immediately
    logging.info("Starting crawler process (Google Business Profile API)...")
    crawl_reviews()


if __name__ == "__main__":
    main()