import hashlib import logging import string import re import playwright import psycopg2 from playwright.sync_api import sync_playwright from hasaki_db_writer import hasaki_db_writer import pandas as pd from bs4 import BeautifulSoup from Util import translate_text_to_english from fake_useragent import UserAgent import time import random from pyvirtualdisplay import Display from seleniumwire import webdriver from selenium.webdriver.chrome.service import Service from webdriver_manager.chrome import ChromeDriverManager import brotli import json ###### Looger ###### logging.basicConfig(filename="/home/ubuntu/logs/hasaki_crawler.log", filemode='a', format='%(asctime)s,%(msecs)d %(name)s %(levelname)s: %(message)s', datefmt="%Y-%m-%d %H:%M:%S", level=logging.INFO) class HasakiProductInfo: def __init__(self, config): logging.info("Initializing HasakiProductInfo") self.pattern = r'[' + string.punctuation + ']' self.config = config self.crawler_name = self.config.get("crawler_name") self.product_limit = int(self.config.get("product_per_category")) self.conn = psycopg2.connect(database=self.config.get('database'), user=self.config.get('db_user'), password=self.config.get('db_pass'), host=self.config.get('db_host'), port=self.config.get('db_port')) self.conn.autocommit = True self.cur = self.conn.cursor() self.cur.execute( f"""select id from {self.config.get('crawler_schema')}.{self.config.get('source_tab')} where source_name='Hasaki'""") try: self.rce_source_id = self.cur.fetchone()[0] except: logging.info("Source tab is empty. Please check. Exiting.....") exit(1) self.db_writer = hasaki_db_writer(config) #self.display = Display(visible=0, size=(800, 600)) #self.display.start() def __del__(self): logging.info("Closing connection.....") self.conn.close() def start_processing(self): logging.info("Starting to collect product info from Hasaki........") logging.info("Fetching product list from DB......") sql = f""" select * from {self.config.get('crawler_schema')}.{self.config.get('tracker_tab')} where flag = 0 order by categoryid, product_section, product_rank """ self.cur.execute(sql) rows = self.cur.fetchall() logging.info("Found {} products.......".format(str(len(rows)))) cnt = 1 for row in rows: logging.info("========= Fetching product info {}/{}: {} =========".format(str(cnt),str(len(rows)),row[3])) try: self.get_product_info(row) except: pass sql = f""" update {self.config.get('crawler_schema')}.{self.config.get('tracker_tab')} set flag = 1 where categoryid={row[9]} and product_section='{row[1]}' and product_rank={row[8]} and product_url='{row[3]}' """ logging.info(sql) self.cur.execute(sql) cnt += 1 #time.sleep(random.randint(7, 23)) #self.display.stop() def get_product_info(self, data): raw_data = self.get_raw_product_data(data[3]) logging.info(raw_data) if raw_data: self.product_info(data, raw_data) self.rating_info(raw_data) self.seo_info(raw_data) def get_raw_product_data(self, url): retries = 1 for _ in range(retries): try: with sync_playwright() as p: browser = p.chromium.launch( headless=False, args=[ "--disable-dev-shm-usage", "--disable-blink-features=AutomationControlled", "--disable-component-extensions-with-background-pages" ] ) ua = UserAgent(platforms='mobile') random_mobile_ua = ua.random logging.info("using user agent: {}".format(random_mobile_ua)) context = browser.new_context(user_agent=random_mobile_ua) context.add_init_script("Object.defineProperty(navigator, 'webdriver', {get: () => undefined})") page = context.new_page() api_requests = {} try: page.goto(url, timeout=5000) time.sleep(1) page.reload() with page.expect_response("**/wap/v2/product/detail**") as response: api_requests = response.value.json() except playwright._impl._errors.TimeoutError: logging.info("Timeout occurred. Retrying.....") page.reload() with page.expect_response("**/wap/v2/product/detail**") as response: api_requests = response.value.json() finally: browser.close() return api_requests except Exception as e: logging.error(f"An error occurred: {str(e)}") logging.info("Retrying...") api_requests = self.get_raw_product_data_selenium(url) return api_requests def get_raw_product_data_selenium(self, url): ua = UserAgent(platforms='mobile') random_mobile_ua = ua.random logging.info("using user agent: {}".format(random_mobile_ua)) op = webdriver.ChromeOptions() op.add_argument(f"user-agent={random_mobile_ua}") op.add_experimental_option("useAutomationExtension", False) op.add_argument('--no-sandbox') op.add_argument('--disable-notifications') op.add_argument("--lang=en-GB") op.headless = False driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=op) driver.get(url) iteminfo = "" for request in driver.requests: if request.response: if '/wap/v2/product/detail' in request.url: encoding = request.response.headers.get('content-encoding') # logging.info(encoding) if encoding: iteminfo = brotli.decompress(request.response.body) else: iteminfo = request.response.body driver.quit() iteminfo_json = json.loads(iteminfo) return iteminfo_json def product_info(self, data, raw_data): #region rce_brand data_brand = {} data_brand['rce_source_id'] = self.rce_source_id data_brand['rce_source_brand_status'] = 1 data_brand['rce_source_brand_id'] = 0 data_brand['brand_page_url'] = "" data_brand['brand_page_url_hash'] = "" data_brand['brand_name'] = "" data_brand['brand_following'] = "" data_brand['brand_rating'] = "" try: data_brand['rce_source_brand_id'] = raw_data['brand']['id'] try: data_brand['brand_page_url'] = "https://hasaki.vn/" + raw_data['brand']['url'] + ".html" data_brand['brand_page_url'] = str(data_brand['brand_page_url']).replace("'","") data_brand['brand_page_url_hash'] = hashlib.md5(data_brand['brand_page_url'].encode('utf-8')).hexdigest() except: pass try: data_brand['brand_name'] = translate_text_to_english(str(raw_data['brand']['name']).replace("'","")) except: pass try: data_brand['brand_following'] = raw_data['brand']['following'] except: pass try: data_brand['brand_rating'] = raw_data['brand']['rating'] except: pass try: self.db_writer.rce_brand(data_brand) except Exception as e: logging.info(e) except: pass #endregion #region rce_product data_product = {} try: data_product['rce_source_product_id'] = raw_data['id'] data_product['rce_source_id'] = self.rce_source_id data_product['rce_source_product_status'] = 1 data_product['product_page_url'] = str(raw_data['url']).replace("'","") data_product['product_page_url_hash'] = hashlib.md5(data_product['product_page_url'].encode('utf-8')).hexdigest() data_product['rce_category_id'] = data[9] data_product['rce_store_id'] = 0 data_product['rce_source_product_name'] = str(raw_data['name']) + str(raw_data['alt_name']) data_product['rce_source_product_name'] = translate_text_to_english(str(re.sub(self.pattern, '', data_product['rce_source_product_name']))) data_product['rce_source_product_name'] = str(data_product['rce_source_product_name']).replace("'", "") data_product['product_images'] = data[4] data_product['product_description'] = "" try: description_raw = raw_data['description'] soup = BeautifulSoup(description_raw, 'html.parser') data_product['product_description'] = translate_text_to_english(re.sub(self.pattern, '',soup.get_text()).replace("'","")) data_product['product_description'] = str(data_product['product_description']).replace("'","") except: pass data_product['rce_brand_id'] = "" try: sql = f""" select id from {self.config.get('crawler_schema')}.{self.config.get('brand_tab')} where rce_source_id = {self.rce_source_id} and rce_source_brand_id = {raw_data['brand']['id']} """ self.cur.execute(sql) res = self.cur.fetchone() data_product['rce_brand_id'] = res[0] except: pass data_product['product_sold_total'] = 0 data_product['product_sold'] = 0 try: data_product['product_sold'] = raw_data['bought'] except: pass data_product['product_price_min'] = 0 data_product['product_price_max'] = 0 try: data_product['product_price_min'] = raw_data['int_final_price'] data_product['product_price_max'] = raw_data['int_final_price'] except: pass data_product['product_price_min_before_discount'] = 0 data_product['product_price_max_before_discount'] = 0 try: market_price = raw_data['market_price'] market_price = re.sub(r'\D', '', market_price) data_product['product_price_min_before_discount'] = market_price data_product['product_price_max_before_discount'] = market_price except: pass data_product['ratings'] = 0.0 try: data_product['ratings'] = raw_data['rating']['avg_rate'] except: pass data_product['ships_from'] = "" data_product['product_section'] = data[1] data_product['countryoforigin'] = "" data_product['rank'] = data[8] try: self.db_writer.rce_product(data_product) except Exception as e: logging.info(e) #region rce_product_variant variant_items = raw_data['attribute']['items'] df_variant = pd.DataFrame({}, columns=['product_variant_name', 'rce_source_variant_id', 'product_variant_price', 'product_variant_stock', 'product_variant_sku']) data_variant = {} for variant in variant_items: for item in variant['options']: data_variant['product_variant_name'] = item['long_label'] for product in item['products']: data_variant['rce_source_variant_id'] = product['id'] data_variant['product_variant_price'] = product['price'] data_variant['product_variant_stock'] = product['quantity'] data_variant['product_variant_sku'] = product['sku'] # variants_arr.append(data_variant) tmp = pd.DataFrame([[data_variant['product_variant_name'], data_variant['rce_source_variant_id'], data_variant['product_variant_price'], data_variant['product_variant_stock'], data_variant['product_variant_sku']]], columns=['product_variant_name', 'rce_source_variant_id', 'product_variant_price', 'product_variant_stock', 'product_variant_sku']) df_variant = pd.concat([df_variant, tmp]) df_variant_merged = df_variant.groupby('product_variant_sku').agg({ 'product_variant_name': ' '.join, 'rce_source_variant_id': 'first', 'product_variant_price': 'first', 'product_variant_stock': 'first' }).reset_index() #logging.info(df_variant_merged.to_string()) for index, row in df_variant_merged.iterrows(): try: data_variant = {} data_variant['rce_source_variant_id'] = row['rce_source_variant_id'] data_variant['product_variant_name'] = translate_text_to_english(row['product_variant_name']) data_variant['product_variant_name'] = re.sub(self.pattern, '', data_variant['product_variant_name']).replace("'","") data_variant['product_variant_price'] = row['product_variant_price'] data_variant['product_variant_price_before_discount'] = 0 data_variant['product_variant_stock'] = row['product_variant_stock'] data_variant['product_variant_sku'] = row['product_variant_sku'] data_variant['rce_product_id'] = "" sql = f""" select id from {self.config.get('crawler_schema')}.{self.config.get('product_tab')} where rce_source_product_id = {data_product['rce_source_product_id']} and rce_source_id = {data_product['rce_source_id']} """ self.cur.execute(sql) data_variant['rce_product_id'] = self.cur.fetchone()[0] try: self.db_writer.rce_product_variant(data_variant) except Exception as e: logging.info(e) except: pass #endregion except: pass #endregion def rating_info(self, raw_data): try: reviews1 = [] reviews2 = [] try: reviews1 = raw_data['short_rating_data']['image_reviews'] except: pass try: reviews2 = raw_data['short_rating_data']['reviews'] except: pass reviews = self.join_lists(reviews1, reviews2) for review in reviews: data_review = {} data_review["rce_product_id"] = "" data_review["username"] = "" data_review["review"] = "" data_review["img_url"] = "" data_review["review_like_count"] = 0 data_review["user_tier"] = "" data_review["shop_id"] = 0 data_review["video_url"] = "" data_review["rating"] = "" sql = f""" select id from {self.config.get('crawler_schema')}.{self.config.get('product_tab')} where rce_source_product_id = {raw_data['id']} and rce_source_id = {self.rce_source_id} """ self.cur.execute(sql) data_review["rce_product_id"] = self.cur.fetchone()[0] try: data_review["username"] = str(review['user_fullname']).replace("'", "") except: pass try: data_review["review"] = translate_text_to_english(review['content']).replace("'", "") except: pass try: data_review["rating"] = review['rating']['star'] except: pass try: self.db_writer.rce_ratings_reviews(data_review) except Exception as e: logging.info(e) except Exception as e: logging.info(e) def join_lists(self, list1, list2): # Check if both lists are None if list1 is None and list2 is None: return None # Check if one of the lists is None elif list1 is None: return list2 elif list2 is None: return list1 else: return list1 + list2 def seo_info(self, raw_data): try: data_seo = {} data_seo['rce_product_id'] = 0 data_seo['rce_source_id'] = self.rce_source_id data_seo['seo_title'] = "" data_seo['seo_description'] = "" data_seo['seo_url'] = "" data_seo['seo_url_hash'] = "" data_seo['seo_image'] = "" data_seo['seo_price_amount'] = 0 data_seo['seo_price_currency'] = "" data_seo['seo_product_band'] = "" data_seo['seo_product_availability'] = "" data_seo['seo_product_category'] = "" data_seo['seo_product_condition'] = "" data_seo['seo_product_retailer_item_id'] = 0 data_seo['seo_product_robots'] = "" sql = f""" select id from {self.config.get('crawler_schema')}.{self.config.get('product_tab')} where rce_source_product_id = {raw_data['id']} and rce_source_id = {self.rce_source_id} """ self.cur.execute(sql) data_seo['rce_product_id'] = self.cur.fetchone()[0] try: data_seo['seo_title'] = translate_text_to_english(raw_data['seo']['og:title']).replace("'","") except: pass try: data_seo['seo_description'] = translate_text_to_english(raw_data['seo']['og:description']).replace("'","") except: pass try: data_seo['seo_url'] = str(raw_data['seo']['og:url']).replace("'","") except: pass try: data_seo['seo_image'] = str(raw_data['seo']['og:image']).replace("'","") except: pass try: data_seo['seo_price_amount'] = raw_data['seo']['price:amount'] except: pass try: data_seo['seo_price_currency'] = str(raw_data['seo']['price:currency']).replace("'","") except: pass try: data_seo['seo_product_band'] = translate_text_to_english(raw_data['seo']['product:band']).replace("'","") except: pass try: data_seo['seo_product_availability'] = str(raw_data['seo']['product:availability']).replace("'","") except: pass try: data_seo['seo_product_category'] = translate_text_to_english(raw_data['seo']['product:category']).replace("'","") except: pass try: data_seo['seo_product_condition'] = translate_text_to_english(raw_data['seo']['product:condition']).replace("'","") except: pass try: data_seo['seo_product_retailer_item_id'] = raw_data['seo']['product:retailer_item_id'] except: pass try: data_seo['seo_product_robots'] = raw_data['seo']['product:robots'] except: pass try: self.db_writer.rce_seo(data_seo) except Exception as e: logging.info(e) except: pass