import pandas as pd import re from sqlalchemy import create_engine import concurrent.futures # ========================================== # KONFIGURASI DATABASE & FILE # ========================================== DB_USER = 'postgres' DB_PASSWORD = 'Mulut!23Berkomunikasi' DB_HOST = '192.169.0.10' # atau IP server DB Anda DB_PORT = '5432' DB_NAME = 'postgres' TABLE_NAME = 'tada_member' FILE_EXCEL = 'customer-data-rawformat-2604230703365650-1.xlsx' # ========================================== # KONFIGURASI MULTI-THREADING # ========================================== MAX_THREADS = 16 # Atur jumlah thread secara dinamis (misal: 4, 8, atau 16) CHUNK_SIZE = 1000 # Jumlah baris data yang diinsert per thread/proses # ========================================== # MAPPING KOLOM (SANGAT PENTING) # ========================================== # Sesuaikan key (kiri) dengan nama kolom di EXCEL. # Sesuaikan value (kanan) dengan nama kolom di POSTGRESQL. COLUMN_MAPPING = { 'Card Number': 'card_number', 'Name': 'name', 'Phone Number': 'phone_number', 'Email': 'email', 'Gender': 'gender', 'Birthday': 'birthday', 'City': 'city', 'Card Status': 'card_status', 'Level': 'level', 'Wallet Info': 'point_amount', # Ini nanti akan berisi angka yang sudah di-ekstrak 'Total Spending (IDR)': 'total_spending', } def extract_wallet_amount(text): """ Fungsi untuk mengekstrak angka amount dari teks Wallet Info. Target: "[wallet name: Poin, amount: 127600, ..." -> 127600 """ if pd.isna(text): return None # Regex untuk mengambil angka setelah "[wallet name: Poin, amount: " dan sebelum "," # \s* mengatasi spasi yang mungkin tidak konsisten match = re.search(r"\[wallet name:\s*Poin,\s*amount:\s*([^,]+)", str(text)) if match: try: return float(match.group(1).strip()) except ValueError: return None return None def clean_phone_number(text): """ Membersihkan akhiran .0 pada nomor HP jika terbaca sebagai float oleh pandas. """ if pd.isna(text): return None # Ubah menjadi string dan hapus spasi berlebih phone_str = str(text).strip() # Jika berakhiran .0, potong 2 karakter terakhir if phone_str.endswith('.0'): phone_str = phone_str[:-2] return phone_str def insert_data_chunk(chunk, engine, table_name, chunk_id): """ Fungsi worker untuk thread: memasukkan potongan data (chunk) ke database. """ try: # if_exists='append' untuk menambah data ke tabel yang sudah ada chunk.to_sql(table_name, engine, if_exists='append', index=False) print(f"āœ… Chunk {chunk_id} berhasil di-insert ({len(chunk)} baris).") return True except Exception as e: print(f"āŒ Error pada Chunk {chunk_id}: {e}") return False def main(): # 1. Dapatkan daftar kolom langsung dari mapping (tanpa perlu baca warna background) target_cols = list(COLUMN_MAPPING.keys()) # 2. Baca semua sheet dari Excel print("Membaca semua sheet dari file Excel...") # sheet_name=None akan membaca semua sheet menjadi dictionary {nama_sheet: dataframe} all_sheets_dict = pd.read_excel(FILE_EXCEL, sheet_name=None) # 3. Gabungkan semua sheet menjadi satu DataFrame df_combined = pd.concat(all_sheets_dict.values(), ignore_index=True) print(f"Total data tergabung dari semua sheet: {len(df_combined)} baris.") # 4. Filter kolom: Hanya ambil kolom yang kita butuhkan (sesuai keys di mapping) available_target_cols = [col for col in target_cols if col in df_combined.columns] df_filtered = df_combined[available_target_cols].copy() # 5. Proses Kolom 'Wallet Info' dan 'Phone Number' if 'Wallet Info' in df_filtered.columns: print("Mengekstrak data pada kolom Wallet Info...") df_filtered['Wallet Info'] = df_filtered['Wallet Info'].apply(extract_wallet_amount) if 'Phone Number' in df_filtered.columns: print("Membersihkan format nomor HP...") df_filtered['Phone Number'] = df_filtered['Phone Number'].apply(clean_phone_number) # 6. Rename nama kolom Excel agar sesuai dengan nama kolom PostgreSQL # Hanya rename kolom yang ada di COLUMN_MAPPING print("Menyesuaikan nama kolom dengan tabel PostgreSQL...") df_final = df_filtered.rename(columns=COLUMN_MAPPING) # Pastikan kita hanya mengimport kolom yang sudah di-mapping ke DB # Jika ada kolom kuning yang tidak ada di mapping, kita drop agar DB tidak error db_columns = list(COLUMN_MAPPING.values()) cols_to_import = [col for col in df_final.columns if col in db_columns] df_final = df_final[cols_to_import] # 7. Insert ke PostgreSQL menggunakan Multi-Threading print(f"\nMenyiapkan proses insert dengan {MAX_THREADS} thread...") try: # Format koneksi SQLAlchemy # (tambahkan parameter pool_size agar aman untuk banyak thread yang buka koneksi bersamaan) engine_url = f'postgresql://{DB_USER}:{DB_PASSWORD}@{DB_HOST}:{DB_PORT}/{DB_NAME}' engine = create_engine(engine_url, pool_size=MAX_THREADS, max_overflow=5) # Memecah dataframe menjadi beberapa chunk (potongan kecil) chunks = [df_final[i:i + CHUNK_SIZE] for i in range(0, df_final.shape[0], CHUNK_SIZE)] total_chunks = len(chunks) print(f"Data sebanyak {len(df_final)} baris dipecah menjadi {total_chunks} chunk.") # Menjalankan multi-threading menggunakan ThreadPoolExecutor with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_THREADS) as executor: # Submit semua task (chunk) ke executor futures = { executor.submit(insert_data_chunk, chunk, engine, TABLE_NAME, i+1): i+1 for i, chunk in enumerate(chunks) } # Menunggu dan memantau hasil dari setiap thread yang selesai berhasil = 0 for future in concurrent.futures.as_completed(futures): if future.result(): berhasil += 1 print(f"\nšŸŽ‰ Selesai! {berhasil} dari {total_chunks} chunk berhasil di-import ke PostgreSQL.") except Exception as e: print(f"āŒ Terjadi kesalahan fatal saat setup database: {e}") if __name__ == "__main__": main()