161 lines
6.3 KiB
Python
161 lines
6.3 KiB
Python
import pandas as pd
|
|
import re
|
|
from sqlalchemy import create_engine
|
|
import concurrent.futures
|
|
|
|
# ==========================================
|
|
# KONFIGURASI DATABASE & FILE
|
|
# ==========================================
|
|
DB_USER = 'postgres'
|
|
DB_PASSWORD = 'Mulut!23Berkomunikasi'
|
|
DB_HOST = '192.169.0.10' # atau IP server DB Anda
|
|
DB_PORT = '5432'
|
|
DB_NAME = 'postgres'
|
|
TABLE_NAME = 'tada_member'
|
|
|
|
FILE_EXCEL = 'C:/Users/abdul.aziz/Downloads/customer-data-rawformat-2604230703365650-1.xlsx'
|
|
|
|
# ==========================================
|
|
# KONFIGURASI MULTI-THREADING
|
|
# ==========================================
|
|
MAX_THREADS = 16 # Atur jumlah thread secara dinamis (misal: 4, 8, atau 16)
|
|
CHUNK_SIZE = 1000 # Jumlah baris data yang diinsert per thread/proses
|
|
|
|
# ==========================================
|
|
# MAPPING KOLOM (SANGAT PENTING)
|
|
# ==========================================
|
|
# Sesuaikan key (kiri) dengan nama kolom di EXCEL.
|
|
# Sesuaikan value (kanan) dengan nama kolom di POSTGRESQL (berdasarkan screenshot Anda).
|
|
COLUMN_MAPPING = {
|
|
'Card Number': 'card_number',
|
|
'Name': 'name',
|
|
'Phone Number': 'phone_number',
|
|
'Email': 'email',
|
|
'Gender': 'gender',
|
|
'Birthday': 'birthday',
|
|
'City': 'city',
|
|
'Card Status': 'card_status',
|
|
'Level': 'level',
|
|
'Wallet Info': 'point_amount', # Ini nanti akan berisi angka yang sudah di-ekstrak
|
|
'Total Spending (IDR)': 'total_spending',
|
|
}
|
|
|
|
def extract_wallet_amount(text):
|
|
"""
|
|
Fungsi untuk mengekstrak angka amount dari teks Wallet Info.
|
|
Target: "[wallet name: Poin, amount: 127600, ..." -> 127600
|
|
"""
|
|
if pd.isna(text):
|
|
return None
|
|
|
|
# Regex untuk mengambil angka setelah "[wallet name: Poin, amount: " dan sebelum ","
|
|
# \s* mengatasi spasi yang mungkin tidak konsisten
|
|
match = re.search(r"\[wallet name:\s*Poin,\s*amount:\s*([^,]+)", str(text))
|
|
|
|
if match:
|
|
try:
|
|
return float(match.group(1).strip())
|
|
except ValueError:
|
|
return None
|
|
return None
|
|
|
|
def clean_phone_number(text):
|
|
"""
|
|
Membersihkan akhiran .0 pada nomor HP jika terbaca sebagai float oleh pandas.
|
|
"""
|
|
if pd.isna(text):
|
|
return None
|
|
|
|
# Ubah menjadi string dan hapus spasi berlebih
|
|
phone_str = str(text).strip()
|
|
|
|
# Jika berakhiran .0, potong 2 karakter terakhir
|
|
if phone_str.endswith('.0'):
|
|
phone_str = phone_str[:-2]
|
|
|
|
return phone_str
|
|
|
|
def insert_data_chunk(chunk, engine, table_name, chunk_id):
|
|
"""
|
|
Fungsi worker untuk thread: memasukkan potongan data (chunk) ke database.
|
|
"""
|
|
try:
|
|
# if_exists='append' untuk menambah data ke tabel yang sudah ada
|
|
chunk.to_sql(table_name, engine, if_exists='append', index=False)
|
|
print(f"✅ Chunk {chunk_id} berhasil di-insert ({len(chunk)} baris).")
|
|
return True
|
|
except Exception as e:
|
|
print(f"❌ Error pada Chunk {chunk_id}: {e}")
|
|
return False
|
|
|
|
def main():
|
|
# 1. Dapatkan daftar kolom langsung dari mapping (tanpa perlu baca warna background)
|
|
target_cols = list(COLUMN_MAPPING.keys())
|
|
|
|
# 2. Baca semua sheet dari Excel
|
|
print("Membaca semua sheet dari file Excel...")
|
|
# sheet_name=None akan membaca semua sheet menjadi dictionary {nama_sheet: dataframe}
|
|
all_sheets_dict = pd.read_excel(FILE_EXCEL, sheet_name=None)
|
|
|
|
# 3. Gabungkan semua sheet menjadi satu DataFrame
|
|
df_combined = pd.concat(all_sheets_dict.values(), ignore_index=True)
|
|
print(f"Total data tergabung dari semua sheet: {len(df_combined)} baris.")
|
|
|
|
# 4. Filter kolom: Hanya ambil kolom yang kita butuhkan (sesuai keys di mapping)
|
|
available_target_cols = [col for col in target_cols if col in df_combined.columns]
|
|
df_filtered = df_combined[available_target_cols].copy()
|
|
|
|
# 5. Proses Kolom 'Wallet Info' dan 'Phone Number'
|
|
if 'Wallet Info' in df_filtered.columns:
|
|
print("Mengekstrak data pada kolom Wallet Info...")
|
|
df_filtered['Wallet Info'] = df_filtered['Wallet Info'].apply(extract_wallet_amount)
|
|
|
|
if 'Phone Number' in df_filtered.columns:
|
|
print("Membersihkan format nomor HP...")
|
|
df_filtered['Phone Number'] = df_filtered['Phone Number'].apply(clean_phone_number)
|
|
|
|
# 6. Rename nama kolom Excel agar sesuai dengan nama kolom PostgreSQL
|
|
# Hanya rename kolom yang ada di COLUMN_MAPPING
|
|
print("Menyesuaikan nama kolom dengan tabel PostgreSQL...")
|
|
df_final = df_filtered.rename(columns=COLUMN_MAPPING)
|
|
|
|
# Pastikan kita hanya mengimport kolom yang sudah di-mapping ke DB
|
|
# Jika ada kolom kuning yang tidak ada di mapping, kita drop agar DB tidak error
|
|
db_columns = list(COLUMN_MAPPING.values())
|
|
cols_to_import = [col for col in df_final.columns if col in db_columns]
|
|
df_final = df_final[cols_to_import]
|
|
|
|
# 7. Insert ke PostgreSQL menggunakan Multi-Threading
|
|
print(f"\nMenyiapkan proses insert dengan {MAX_THREADS} thread...")
|
|
try:
|
|
# Format koneksi SQLAlchemy
|
|
# (tambahkan parameter pool_size agar aman untuk banyak thread yang buka koneksi bersamaan)
|
|
engine_url = f'postgresql://{DB_USER}:{DB_PASSWORD}@{DB_HOST}:{DB_PORT}/{DB_NAME}'
|
|
engine = create_engine(engine_url, pool_size=MAX_THREADS, max_overflow=5)
|
|
|
|
# Memecah dataframe menjadi beberapa chunk (potongan kecil)
|
|
chunks = [df_final[i:i + CHUNK_SIZE] for i in range(0, df_final.shape[0], CHUNK_SIZE)]
|
|
total_chunks = len(chunks)
|
|
print(f"Data sebanyak {len(df_final)} baris dipecah menjadi {total_chunks} chunk.")
|
|
|
|
# Menjalankan multi-threading menggunakan ThreadPoolExecutor
|
|
with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_THREADS) as executor:
|
|
# Submit semua task (chunk) ke executor
|
|
futures = {
|
|
executor.submit(insert_data_chunk, chunk, engine, TABLE_NAME, i+1): i+1
|
|
for i, chunk in enumerate(chunks)
|
|
}
|
|
|
|
# Menunggu dan memantau hasil dari setiap thread yang selesai
|
|
berhasil = 0
|
|
for future in concurrent.futures.as_completed(futures):
|
|
if future.result():
|
|
berhasil += 1
|
|
|
|
print(f"\n🎉 Selesai! {berhasil} dari {total_chunks} chunk berhasil di-import ke PostgreSQL.")
|
|
|
|
except Exception as e:
|
|
print(f"❌ Terjadi kesalahan fatal saat setup database: {e}")
|
|
|
|
if __name__ == "__main__":
|
|
main() |