import_tada_member/import.py

161 lines
6.2 KiB
Python

import pandas as pd
import re
from sqlalchemy import create_engine
import concurrent.futures
# ==========================================
# KONFIGURASI DATABASE & FILE
# ==========================================
DB_USER = 'postgres'
DB_PASSWORD = 'Mulut!23Berkomunikasi'
DB_HOST = '192.169.0.10' # atau IP server DB Anda
DB_PORT = '5432'
DB_NAME = 'postgres'
TABLE_NAME = 'tada_member'
FILE_EXCEL = 'customer-data-rawformat-2604230703365650-1.xlsx'
# ==========================================
# KONFIGURASI MULTI-THREADING
# ==========================================
MAX_THREADS = 16 # Atur jumlah thread secara dinamis (misal: 4, 8, atau 16)
CHUNK_SIZE = 1000 # Jumlah baris data yang diinsert per thread/proses
# ==========================================
# MAPPING KOLOM (SANGAT PENTING)
# ==========================================
# Sesuaikan key (kiri) dengan nama kolom di EXCEL.
# Sesuaikan value (kanan) dengan nama kolom di POSTGRESQL.
COLUMN_MAPPING = {
'Card Number': 'card_number',
'Name': 'name',
'Phone Number': 'phone_number',
'Email': 'email',
'Gender': 'gender',
'Birthday': 'birthday',
'City': 'city',
'Card Status': 'card_status',
'Level': 'level',
'Wallet Info': 'point_amount', # Ini nanti akan berisi angka yang sudah di-ekstrak
'Total Spending (IDR)': 'total_spending',
}
def extract_wallet_amount(text):
"""
Fungsi untuk mengekstrak angka amount dari teks Wallet Info.
Target: "[wallet name: Poin, amount: 127600, ..." -> 127600
"""
if pd.isna(text):
return None
# Regex untuk mengambil angka setelah "[wallet name: Poin, amount: " dan sebelum ","
# \s* mengatasi spasi yang mungkin tidak konsisten
match = re.search(r"\[wallet name:\s*Poin,\s*amount:\s*([^,]+)", str(text))
if match:
try:
return float(match.group(1).strip())
except ValueError:
return None
return None
def clean_phone_number(text):
"""
Membersihkan akhiran .0 pada nomor HP jika terbaca sebagai float oleh pandas.
"""
if pd.isna(text):
return None
# Ubah menjadi string dan hapus spasi berlebih
phone_str = str(text).strip()
# Jika berakhiran .0, potong 2 karakter terakhir
if phone_str.endswith('.0'):
phone_str = phone_str[:-2]
return phone_str
def insert_data_chunk(chunk, engine, table_name, chunk_id):
"""
Fungsi worker untuk thread: memasukkan potongan data (chunk) ke database.
"""
try:
# if_exists='append' untuk menambah data ke tabel yang sudah ada
chunk.to_sql(table_name, engine, if_exists='append', index=False)
print(f"✅ Chunk {chunk_id} berhasil di-insert ({len(chunk)} baris).")
return True
except Exception as e:
print(f"❌ Error pada Chunk {chunk_id}: {e}")
return False
def main():
# 1. Dapatkan daftar kolom langsung dari mapping (tanpa perlu baca warna background)
target_cols = list(COLUMN_MAPPING.keys())
# 2. Baca semua sheet dari Excel
print("Membaca semua sheet dari file Excel...")
# sheet_name=None akan membaca semua sheet menjadi dictionary {nama_sheet: dataframe}
all_sheets_dict = pd.read_excel(FILE_EXCEL, sheet_name=None)
# 3. Gabungkan semua sheet menjadi satu DataFrame
df_combined = pd.concat(all_sheets_dict.values(), ignore_index=True)
print(f"Total data tergabung dari semua sheet: {len(df_combined)} baris.")
# 4. Filter kolom: Hanya ambil kolom yang kita butuhkan (sesuai keys di mapping)
available_target_cols = [col for col in target_cols if col in df_combined.columns]
df_filtered = df_combined[available_target_cols].copy()
# 5. Proses Kolom 'Wallet Info' dan 'Phone Number'
if 'Wallet Info' in df_filtered.columns:
print("Mengekstrak data pada kolom Wallet Info...")
df_filtered['Wallet Info'] = df_filtered['Wallet Info'].apply(extract_wallet_amount)
if 'Phone Number' in df_filtered.columns:
print("Membersihkan format nomor HP...")
df_filtered['Phone Number'] = df_filtered['Phone Number'].apply(clean_phone_number)
# 6. Rename nama kolom Excel agar sesuai dengan nama kolom PostgreSQL
# Hanya rename kolom yang ada di COLUMN_MAPPING
print("Menyesuaikan nama kolom dengan tabel PostgreSQL...")
df_final = df_filtered.rename(columns=COLUMN_MAPPING)
# Pastikan kita hanya mengimport kolom yang sudah di-mapping ke DB
# Jika ada kolom kuning yang tidak ada di mapping, kita drop agar DB tidak error
db_columns = list(COLUMN_MAPPING.values())
cols_to_import = [col for col in df_final.columns if col in db_columns]
df_final = df_final[cols_to_import]
# 7. Insert ke PostgreSQL menggunakan Multi-Threading
print(f"\nMenyiapkan proses insert dengan {MAX_THREADS} thread...")
try:
# Format koneksi SQLAlchemy
# (tambahkan parameter pool_size agar aman untuk banyak thread yang buka koneksi bersamaan)
engine_url = f'postgresql://{DB_USER}:{DB_PASSWORD}@{DB_HOST}:{DB_PORT}/{DB_NAME}'
engine = create_engine(engine_url, pool_size=MAX_THREADS, max_overflow=5)
# Memecah dataframe menjadi beberapa chunk (potongan kecil)
chunks = [df_final[i:i + CHUNK_SIZE] for i in range(0, df_final.shape[0], CHUNK_SIZE)]
total_chunks = len(chunks)
print(f"Data sebanyak {len(df_final)} baris dipecah menjadi {total_chunks} chunk.")
# Menjalankan multi-threading menggunakan ThreadPoolExecutor
with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_THREADS) as executor:
# Submit semua task (chunk) ke executor
futures = {
executor.submit(insert_data_chunk, chunk, engine, TABLE_NAME, i+1): i+1
for i, chunk in enumerate(chunks)
}
# Menunggu dan memantau hasil dari setiap thread yang selesai
berhasil = 0
for future in concurrent.futures.as_completed(futures):
if future.result():
berhasil += 1
print(f"\n🎉 Selesai! {berhasil} dari {total_chunks} chunk berhasil di-import ke PostgreSQL.")
except Exception as e:
print(f"❌ Terjadi kesalahan fatal saat setup database: {e}")
if __name__ == "__main__":
main()