파이썬 크롤링 프로그램
Category
개발Customer
아주화장품Date
2025내용
서버 관리, 코드 최적화, 관리자 개발, 프론트 앤드 개발1. 핵심 기능 및 로직
Ecount ERP 시스템에서 거래처 데이터를 자동으로 동기화
멀티스레딩 및 비동기 처리
threading 모듈을 사용하여 인터넷 연결 모니터링을 별도의 스레드로 실행합니다.
schedule 라이브러리로 정기적인 작업 스케줄링을 구현합니다.
웹 스크래핑 자동화
Selenium WebDriver를 사용하여 Ecount ERP 시스템에 자동 로그인 및 데이터 추출을 수행합니다.
chrome_options를 통해 브라우저 설정을 최적화하여 안정성과 성능을 향상시킵니다.
데이터베이스 연동
MySQL 데이터베이스와 연동하여 거래처 정보를 실시간으로 갱신합니다.
트랜잭션 관리를 통해 데이터 무결성을 보장합니다.
예외 처리 및 로깅
세분화된 예외 처리로 다양한 오류 상황에 대응합니다.
텔레그램 API를 통해 실시간 로깅 및 알림 시스템을 구현합니다.
파일 시스템 활용
동기화 상태 및 트리거 관리를 위해 파일 시스템을 활용합니다.
다운로드된 엑셀 파일의 이름을 타임스탬프로 변경하여 버전 관리를 용이하게 합니다.
보안 강화
환경 변수나 별도의 설정 파일을 통해 민감한 정보(API 키, 데이터베이스 자격 증명 등)를 관리합니다.
프로세스 관리
signal 모듈을 사용하여 프로세스의 정상적인 종료를 보장합니다.
전역 변수를 통해 프로세스 상태를 관리하고, 정리(cleanup) 함수로 리소스를 안전하게 해제합니다.
데이터 검증 및 전처리
Pandas를 사용하여 엑셀 데이터를 효율적으로 처리합니다.
거래처 코드 검증 로직을 통해 데이터 품질을 관리합니다.
python
import os import time import sys import shutil from datetime import datetime, timedelta from selenium import webdriver from selenium.webdriver.common.by import By from selenium.webdriver.chrome.options import Options import pandas as pd import mysql.connector import requests import schedule import socket import signal import threading from config import com_code, user_id, password, telegram_token, telegram_chat_id, db_host, db_id, db_password, db_name from telegram import Update from telegram.ext import ApplicationBuilder, CommandHandler, ContextTypes, CallbackContext # 저장된 PID를 전역 변수로 사용 bot_pid = os.getpid() # 현재 프로세스 PID 저장 # 전역 변수 browser = None conn = None cursor = None internet_monitor_thread = None is_running = True def signal_handler(signum, frame): global is_running print("종료 신호를 받았습니다. 정리 중...") is_running = False cleanup() sys.exit(0) def cleanup(): global browser, conn, cursor, internet_monitor_thread alert_message = f"프로세스 ID {bot_pid} 가 종료 또는 강제종료 처리중입니다." send_telegram_message(telegram_token, telegram_chat_id, alert_message) if browser: print("브라우저 종료 중...") browser.quit() if cursor: print("데이터베이스 커서 종료 중...") cursor.close() if conn: print("데이터베이스 연결 종료 중...") conn.close() if internet_monitor_thread and internet_monitor_thread.is_alive(): print("인터넷 모니터링 스레드 종료 중...") internet_monitor_thread.join(timeout=5) os.kill(bot_pid, signal.SIGINT) # 모든 스케줄된 작업 취소 schedule.clear() print("정리 완료.") # 신호 핸들러 등록 signal.signal(signal.SIGINT, signal_handler) signal.signal(signal.SIGTERM, signal_handler) # 인터넷 연결 상태 확인 함수 def check_internet_connection(): try: socket.create_connection(("www.google.com", 80)) return True except OSError: return False # 연결 상태를 기록할 변수 disconnected_start_time = None def monitor_internet(): global is_running, disconnected_start_time while is_running: if check_internet_connection(): if disconnected_start_time is not None: disconnected_end_time = datetime.now() duration = disconnected_end_time - disconnected_start_time message = f"인터넷 연결이 복구되었습니다. \n" \ f"연결 끊김 기간: {disconnected_start_time.strftime('%Y-%m-%d %H:%M:%S')} ~ {disconnected_end_time.strftime('%Y-%m-%d %H:%M:%S')}\n" \ f"총 끊긴 시간: {duration.total_seconds() // 60} 분" send_telegram_message(telegram_token, telegram_chat_id, message) print(message) disconnected_start_time = None else: if disconnected_start_time is None: disconnected_start_time = datetime.now() time.sleep(60) # 1분 대기 def send_telegram_message(token, chat_id, message): url = f"https://api.telegram.org/bot{token}/sendMessage" data = { 'chat_id': chat_id, 'text': message, 'parse_mode': 'Markdown' } response = requests.post(url, data=data) return response # 현재 시간과 가장 가까운 스케줄 시간 계산 def get_next_schedule_time(): now = datetime.now() schedule_times = [ datetime.now().replace(hour=8, minute=0, second=0, microsecond=0), datetime.now().replace(hour=9, minute=0, second=0, microsecond=0), datetime.now().replace(hour=10, minute=0, second=0, microsecond=0), datetime.now().replace(hour=11, minute=0, second=0, microsecond=0), datetime.now().replace(hour=12, minute=0, second=0, microsecond=0), datetime.now().replace(hour=13, minute=0, second=0, microsecond=0), datetime.now().replace(hour=14, minute=0, second=0, microsecond=0), datetime.now().replace(hour=15, minute=0, second=0, microsecond=0), datetime.now().replace(hour=16, minute=0, second=0, microsecond=0), datetime.now().replace(hour=17, minute=0, second=0, microsecond=0), datetime.now().replace(hour=18, minute=0, second=0, microsecond=0) ] next_time = None for schedule_time in schedule_times: if schedule_time > now: next_time = schedule_time break if not next_time: # 오늘의 모든 스케줄이 지났다면 next_time = schedule_times[0] + timedelta(days=1) return next_time # 텔레그램 알람 메시지 전송 next_schedule_time = get_next_schedule_time() time_to_next_schedule = next_schedule_time - datetime.now() alert_message = f"프로세스 ID {bot_pid} \n 품목리스트 데이터 자동 동기화가 시작 되었습니다. \n다음 업데이트는 {next_schedule_time.strftime('%Y-%m-%d %H:%M:%S')}입니다. \n" \ f"{time_to_next_schedule.total_seconds() // 60:.0f}분 후에 시작됩니다." send_telegram_message(telegram_token, telegram_chat_id, alert_message) def job(): global browser, conn, cursor status_file = "C:\\Selenium_crawling\\sync_status.txt" download_folder = "C:\\Selenium_crawling\\download" with open(status_file, "w") as f: f.write("in_progress") # MySQL 연결 설정 db_config = { 'host': db_host, 'user': db_id, 'password': db_password, 'database': db_name } # Chrome 브라우저 옵션 설정 chrome_options = Options() chrome_options.add_experimental_option("detach", True) chrome_options.add_experimental_option("excludeSwitches", ["enable-logging"]) chrome_options.add_argument('user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/109.0.0.0 Safari/537.36') chrome_options.add_argument('--disable-gpu') chrome_options.add_argument('--no-sandbox') chrome_options.add_argument('--disable-dev-shm-usage') chrome_options.add_argument('--lang=en-US') chrome_options.add_argument('--mute-audio') chrome_options.add_argument('--window-size=1270,1080') chrome_options.add_argument("--disable-popup-blocking") chrome_options.add_argument("--disable-extensions") chrome_options.add_experimental_option( "prefs", { "download.default_directory": download_folder, "download.prompt_for_download": False, "download.directory_upgrade": True, "safebrowsing.enabled": True } ) try: # Selenium 브라우저 시작 browser = webdriver.Chrome(options=chrome_options) print('브라우저 실행 중...') browser.get('https://login.ecount.com/Login/KR/') browser.implicitly_wait(5) # 로그인 browser.find_element(By.ID, 'com_code').send_keys(com_code) browser.find_element(By.ID, 'id').send_keys(user_id) browser.find_element(By.ID, 'passwd').send_keys(password) browser.find_element(By.ID, 'save').click() print('로그인 성공') time.sleep(5) # 메뉴 이동 browser.find_element(By.ID, 'ma4').click() print('재고1 클릭') time.sleep(5) browser.find_element(By.ID, 'ma29').click() print('기초등록 클릭') time.sleep(5) browser.find_element(By.ID, 'ma170').click() print('거래처등록 클릭') time.sleep(5) browser.find_element(By.XPATH, '//*[@id="group10outputExcel"]').click() print('엑셀 다운로드 클릭') # 다운로드 완료 대기 start_time = time.time() while True: time.sleep(1) files = os.listdir(download_folder) if len(files) == 0: print("다운로드 폴더가 비어있습니다. 파일을 기다립니다...") elif any(file.endswith(".crdownload") for file in files): print("파일 다운로드 중...") elif any(file.endswith(".xlsx") for file in files): print("파일 다운로드 완료") break if time.time() - start_time > 60: raise TimeoutError("파일 다운로드가 시간 내에 완료되지 않았습니다.") # 다운로드된 파일 처리 downloaded_file = next(os.path.join(download_folder, f) for f in os.listdir(download_folder) if f.endswith(".xlsx")) now = datetime.now().strftime("%Y%m%d_%H%M%S") new_file_name = os.path.join(download_folder, f"기초등록_{now}.xlsx") shutil.move(downloaded_file, new_file_name) print(f"파일이 {new_file_name}으로 저장되었습니다.") # MySQL 연결 conn = mysql.connector.connect(**db_config) cursor = conn.cursor() # 엑셀 파일 읽기 df = pd.read_excel(new_file_name, skiprows=2) print("엑셀 파일을 성공적으로 읽었습니다.") # 데이터 처리 변수 count = {'insert': 0, 'update': 0} new_data = [] no_data_count = 0 unregistered_data = [] now_db = datetime.now().strftime("%Y-%m-%d %H:%M:%S") for index, row in df.iterrows(): try: 거래처코드 = str(row.iloc[0]).strip() if not pd.isna(row.iloc[0]) else None 거래처명 = row.iloc[2] if not pd.isna(row.iloc[2]) else None 영문코드 = row.iloc[1] if not pd.isna(row.iloc[1]) else None 대표자명 = row.iloc[3] if not pd.isna(row.iloc[3]) else None 전화 = row.iloc[4] if not pd.isna(row.iloc[4]) else None Fax = row.iloc[5] if not pd.isna(row.iloc[5]) else None 주소1 = row.iloc[6] if not pd.isna(row.iloc[6]) else None 담장자명 = row.iloc[7] if not pd.isna(row.iloc[7]) else None 담당자연락처 = row.iloc[8] if not pd.isna(row.iloc[8]) else None 담당자이메일 = row.iloc[9] if not pd.isna(row.iloc[9]) else None 계좌정보 = row.iloc[11] if not pd.isna(row.iloc[11]) else None 종목 = row.iloc[12] if not pd.isna(row.iloc[12]) else None # 거래처코드 검증 EXCLUDED_CODES = ['AJ-금융', 'AJ-기관', 'AJ-아주'] if not 거래처코드: no_data_count += 1 unregistered_data.append("None: 거래처코드 누락") continue if any(excluded_code in 거래처코드 for excluded_code in EXCLUDED_CODES): no_data_count += 1 unregistered_data.append(f"{거래처코드}: 포함된 거래처코드") continue if len(거래처코드) < 3: no_data_count += 1 unregistered_data.append(f"{거래처코드}: 거래처코드 길이 부족") continue if not 거래처명: no_data_count += 1 unregistered_data.append(f"{거래처코드}: 거래처명 누락") continue # 기존 데이터 확인 cursor.execute("SELECT COUNT(*) FROM aju_client WHERE client_no = %s", (거래처코드,)) exists = cursor.fetchone()[0] > 0 if exists: sql_update = """ UPDATE aju_client SET client_caid = %s, client_name = %s, client_ceo = %s, client_update = %s, client_jongmok = %s, client_addr = %s, client_phone = %s, client_fax = %s, client_email_tax1 = %s, client_person_charge = %s, client_person_charge_balju_phone = %s WHERE client_no = %s """ values = ( 영문코드, 거래처명, 대표자명, now_db, 종목, 주소1, 전화, Fax, 담당자이메일, 담장자명, 담당자연락처, 거래처코드 ) cursor.execute(sql_update, values) conn.commit() count['update'] += 1 else: sql_insert = """ INSERT INTO aju_client ( client_no, client_caid, client_name, client_type, client_ceo, client_update, client_jongmok, client_zip, client_addr, client_phone, client_fax, client_email_tax1, client_email_tax2, client_email_tax3, client_email_tax4, client_our_person_charge, client_person_charge, client_person_charge_balju, client_person_charge_balju_phone, client_person_charge_balju_mobile, client_print, client_trade_date, client_darea_code, is_darae_chk ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) """ values = ( 거래처코드, 영문코드, 거래처명, '', 대표자명, now_db, 종목, '', 주소1, 전화, Fax, 계좌정보, '', '', '', '', 담장자명, '', 담당자연락처, '', '', '', '0', '' ) cursor.execute(sql_insert, values) conn.commit() count['insert'] += 1 new_data.append(f"{거래처코드} - {거래처명}") except mysql.connector.Error as e: print(f"행 {index} 처리 중 SQL 오류: {e}") conn.rollback() # 결과 메시지 전송 update_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S") next_schedule_time = get_next_schedule_time() unregistered_data_summary = "\n".join(unregistered_data) if unregistered_data else "없음" new_data_summary = "\n".join(new_data) if new_data else "없음" message = ( f"업데이트 날짜 : {update_time}\n" f"다음 업데이트 : {next_schedule_time.strftime('%Y-%m-%d %H:%M:%S')}\n" f"신규등록 : {count['insert']}건\n" f"업데이트 : {count['update']}건\n" "ㅡㅡㅡㅡㅡㅡㅡ\n" "신규등록된 데이터\n" f"{new_data_summary}\n" "ㅡㅡㅡㅡㅡㅡㅡ\n" f"인서트, 업데이트 하지 않은 데이터 {no_data_count}건\n" f"{unregistered_data_summary}\n" ) send_telegram_message(telegram_token, telegram_chat_id, message) print("텔레그램 메시지가 성공적으로 전송되었습니다.") with open(status_file, "w") as f: f.write("completed") print(f"작업 완료: insert: {count['insert']} / update: {count['update']}") except Exception as e: print(f"작업 중 오류 발생: {e}") with open(status_file, "w") as f: f.write("error") raise finally: if browser: browser.quit() browser = None if cursor: cursor.close() cursor = None if conn: conn.close() conn = None print("브라우저와 데이터베이스 연결을 종료했습니다.") if __name__ == "__main__": try: # 스케줄 설정 schedule.every().day.at("08:00").do(job) schedule.every().day.at("09:00").do(job) schedule.every().day.at("10:00").do(job) schedule.every().day.at("11:00").do(job) schedule.every().day.at("12:00").do(job) schedule.every().day.at("13:00").do(job) schedule.every().day.at("14:00").do(job) schedule.every().day.at("15:00").do(job) schedule.every().day.at("16:00").do(job) schedule.every().day.at("17:00").do(job) schedule.every().day.at("18:00").do(job) # 인터넷 모니터링 스레드 internet_monitor_thread = threading.Thread(target=monitor_internet) internet_monitor_thread.daemon = True internet_monitor_thread.start() while is_running: schedule.run_pending() trigger_file = "C:\\Selenium_crawling\\trigger_sync.txt" if os.path.exists(trigger_file): os.remove(trigger_file) print("트리거 파일 감지됨. 즉시 동기화 작업 실행 중...") with open("C:\\Selenium_crawling\\sync_status.txt", "w") as f: f.write("in_progress") try: job() except Exception as e: print(f"동기화 작업 중 오류 발생: {e}") with open("C:\\Selenium_crawling\\sync_status.txt", "w") as f: f.write("error") time.sleep(1) except KeyboardInterrupt: print("사용자에 의해 프로그램이 중단되었습니다. 정리 중...") finally: cleanup() sys.exit(0)
PHP
$statusFile = "C:\\Selenium_crawling\\sync_status.txt";
$triggerFile = "C:\\Selenium_crawling\\trigger_sync.txt";
if (file_exists($statusFile)) {
$status = file_get_contents($statusFile);
if ($status === 'in_progress') {
echo "이미 동기화 작업이 진행 중입니다.";
exit;
}
}
if (file_put_contents($triggerFile, "sync") !== false) {
echo "동기화 작업이 요청되었습니다. 최대 3분 정도 소요될 수 있습니다.";
} else {
echo "동기화 작업 요청에 실패하였습니다.";
}