539 lines
20 KiB
Python
539 lines
20 KiB
Python
import hashlib
|
|
import logging
|
|
import string
|
|
import re
|
|
import playwright
|
|
import psycopg2
|
|
from playwright.sync_api import sync_playwright
|
|
from hasaki_db_writer import hasaki_db_writer
|
|
import pandas as pd
|
|
from bs4 import BeautifulSoup
|
|
from Util import translate_text_to_english
|
|
from fake_useragent import UserAgent
|
|
import time
|
|
import random
|
|
from seleniumwire import webdriver
|
|
from selenium.webdriver.chrome.service import Service
|
|
from webdriver_manager.chrome import ChromeDriverManager
|
|
import brotli
|
|
import json
|
|
|
|
class HasakiProductInfo:
    """Collects product, rating and SEO information from Hasaki product pages
    and persists it through :class:`hasaki_db_writer`."""

    def __init__(self, config):
        """Open the DB connection and resolve the Hasaki source id.

        :param config: mapping with DB credentials (``database``, ``db_user``,
            ``db_pass``, ``db_host``, ``db_port``), schema/table names and
            crawler settings (``crawler_name``, ``product_per_category``).
        :raises SystemExit: if the source table has no 'Hasaki' row.
        """
        logging.info("Initializing HasakiProductInfo")
        # Character class matching any ASCII punctuation; used to strip
        # punctuation from names/descriptions before translation.
        self.pattern = r'[' + string.punctuation + ']'
        self.config = config
        self.crawler_name = self.config.get("crawler_name")
        self.product_limit = int(self.config.get("product_per_category"))
        self.conn = psycopg2.connect(database=self.config.get('database'), user=self.config.get('db_user'),
                                     password=self.config.get('db_pass'), host=self.config.get('db_host'),
                                     port=self.config.get('db_port'))
        self.conn.autocommit = True
        self.cur = self.conn.cursor()
        self.cur.execute(
            f"""select id from {self.config.get('crawler_schema')}.{self.config.get('source_tab')} where source_name='Hasaki'""")
        # fetchone() returns None when the table has no matching row; the old
        # code subscripted it inside a bare except, which also masked any
        # unrelated error. Be explicit instead.
        row = self.cur.fetchone()
        if row is None:
            logging.error("Source tab is empty. Please check. Exiting.....")
            raise SystemExit(1)
        self.rce_source_id = row[0]

        self.db_writer = hasaki_db_writer(config)
|
|
def __del__(self):
|
|
print("Closing connection.....")
|
|
self.conn.close()
|
|
|
|
def start_processing(self):
|
|
logging.info("Starting to collect product info from Hasaki........")
|
|
|
|
logging.info("Fetching product list from DB......")
|
|
|
|
sql = f"""
|
|
select * from {self.config.get('crawler_schema')}.{self.config.get('tracker_tab')} where flag = 0
|
|
order by categoryid, product_section, product_rank
|
|
"""
|
|
|
|
self.cur.execute(sql)
|
|
rows = self.cur.fetchall()
|
|
logging.info("Found {} products.......".format(str(len(rows))))
|
|
cnt = 1
|
|
for row in rows:
|
|
logging.info("========= Fetching product info {}/{}: {} =========".format(str(cnt),str(len(rows)),row[3]))
|
|
|
|
try:
|
|
self.get_product_info(row)
|
|
except:
|
|
pass
|
|
|
|
sql = f"""
|
|
update {self.config.get('crawler_schema')}.{self.config.get('tracker_tab')} set flag = 1
|
|
where categoryid={row[9]} and product_section='{row[1]}' and product_rank={row[8]} and product_url='{row[3]}'
|
|
"""
|
|
logging.info(sql)
|
|
self.cur.execute(sql)
|
|
|
|
cnt += 1
|
|
|
|
#time.sleep(random.randint(7, 23))
|
|
|
|
|
|
def get_product_info(self, data):
|
|
|
|
raw_data = self.get_raw_product_data(data[3])
|
|
|
|
print(raw_data)
|
|
|
|
if raw_data:
|
|
self.product_info(data, raw_data)
|
|
|
|
self.rating_info(raw_data)
|
|
|
|
self.seo_info(raw_data)
|
|
|
|
    def get_raw_product_data(self, url):
        """Load the product page in Playwright (mobile UA) and capture the JSON
        response of the ``/wap/v2/product/detail`` XHR.

        Falls back to the selenium-wire implementation when the Playwright
        attempt raises. Returns the decoded JSON dict (or {} if the response
        was never captured).
        """
        # NOTE(review): retries = 1 means exactly one Playwright attempt before
        # the selenium fallback; the loop exists only to allow raising this.
        retries = 1
        for _ in range(retries):
            try:
                with sync_playwright() as p:
                    # headless=False plus the flags below reduce headless/bot
                    # fingerprinting on the site.
                    browser = p.chromium.launch(
                        headless=False,
                        args=[
                            "--disable-dev-shm-usage",
                            "--disable-blink-features=AutomationControlled",
                            "--disable-component-extensions-with-background-pages"
                        ]
                    )
                    # Random mobile user agent so the site serves the /wap/ API.
                    ua = UserAgent(platforms='mobile')
                    random_mobile_ua = ua.random
                    logging.info("using user agent: {}".format(random_mobile_ua))

                    context = browser.new_context(user_agent=random_mobile_ua)
                    # Hide the webdriver flag from navigator (anti-bot evasion).
                    context.add_init_script("Object.defineProperty(navigator, 'webdriver', {get: () => undefined})")
                    page = context.new_page()

                    api_requests = {}

                    try:
                        page.goto(url, timeout=5000)
                        time.sleep(1)
                        # Reload so the detail XHR fires while we are listening.
                        page.reload()
                        # NOTE(review): expect_response is normally used with the
                        # triggering action *inside* the with-block; here the body
                        # reads response.value directly, relying on the reload
                        # above having already produced the response — confirm
                        # this does not block until timeout.
                        with page.expect_response("**/wap/v2/product/detail**") as response:
                            api_requests = response.value.json()
                    except playwright._impl._errors.TimeoutError:
                        # NOTE(review): playwright._impl is a private module path;
                        # the public equivalent is playwright.sync_api.TimeoutError.
                        logging.info("Timeout occurred. Retrying.....")
                        page.reload()
                        with page.expect_response("**/wap/v2/product/detail**") as response:
                            api_requests = response.value.json()
                    finally:
                        browser.close()

                    return api_requests
            except Exception as e:
                logging.error(f"An error occurred: {str(e)}")
                logging.info("Retrying...")

        # Playwright attempt(s) failed entirely: fall back to selenium-wire.
        api_requests = self.get_raw_product_data_selenium(url)

        return api_requests
|
|
    def get_raw_product_data_selenium(self, url):
        """Fallback capture of ``/wap/v2/product/detail`` using selenium-wire.

        Loads the page in Chrome with a random mobile user agent, scans the
        recorded network requests for the product-detail API call and returns
        its body decoded as JSON.

        :raises json.JSONDecodeError: if no detail response was captured
            (``iteminfo`` stays "" and json.loads fails).
        """
        ua = UserAgent(platforms='mobile')
        random_mobile_ua = ua.random
        logging.info("using user agent: {}".format(random_mobile_ua))

        op = webdriver.ChromeOptions()
        op.add_argument(f"user-agent={random_mobile_ua}")
        op.add_experimental_option("useAutomationExtension", False)
        op.add_argument('--no-sandbox')
        op.add_argument('--disable-notifications')
        op.add_argument("--lang=en-GB")
        op.headless = False

        driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=op)

        driver.get(url)

        iteminfo = ""

        # driver.requests is selenium-wire's log of all network traffic made
        # by the page; the last matching response wins if there are several.
        for request in driver.requests:
            if request.response:
                if '/wap/v2/product/detail' in request.url:
                    encoding = request.response.headers.get('content-encoding')
                    # print(encoding)
                    if encoding:
                        # NOTE(review): assumes any content-encoding is brotli;
                        # a gzip/deflate response would make this raise — confirm
                        # the site always serves br here.
                        iteminfo = brotli.decompress(request.response.body)
                    else:
                        iteminfo = request.response.body

        driver.quit()

        iteminfo_json = json.loads(iteminfo)
        return iteminfo_json
|
    def product_info(self, data, raw_data):
        """Extract brand, product and product-variant records from the raw
        product JSON and hand each to the DB writer.

        :param data: tracker-table row; the indices used here are
            data[1]=product_section, data[4]=product images, data[8]=rank,
            data[9]=category id.
        :param raw_data: decoded JSON payload of /wap/v2/product/detail.

        Style note: throughout, defaults are assigned first and each optional
        field is filled inside its own best-effort try/except, so a partially
        populated record is still written when payload keys are missing.
        Single quotes are stripped from text values because they are later
        embedded in single-quoted SQL literals downstream.
        """

        #region rce_brand

        data_brand = {}

        data_brand['rce_source_id'] = self.rce_source_id
        data_brand['rce_source_brand_status'] = 1
        data_brand['rce_source_brand_id'] = 0
        data_brand['brand_page_url'] = ""
        data_brand['brand_page_url_hash'] = ""
        data_brand['brand_name'] = ""
        data_brand['brand_following'] = ""
        data_brand['brand_rating'] = ""

        try:

            # If the payload has no brand at all, this KeyError skips the whole
            # brand section (outer bare except below).
            data_brand['rce_source_brand_id'] = raw_data['brand']['id']

            try:
                data_brand['brand_page_url'] = "https://hasaki.vn/" + raw_data['brand']['url'] + ".html"
                data_brand['brand_page_url'] = str(data_brand['brand_page_url']).replace("'","")
                # md5 of the URL is used as a stable dedup key.
                data_brand['brand_page_url_hash'] = hashlib.md5(data_brand['brand_page_url'].encode('utf-8')).hexdigest()
            except:
                pass

            try:
                data_brand['brand_name'] = translate_text_to_english(str(raw_data['brand']['name']).replace("'",""))
            except:
                pass

            try:
                data_brand['brand_following'] = raw_data['brand']['following']
            except:
                pass

            try:
                data_brand['brand_rating'] = raw_data['brand']['rating']
            except:
                pass

            try:
                self.db_writer.rce_brand(data_brand)
            except Exception as e:
                logging.info(e)

        except:
            pass

        #endregion

        #region rce_product

        data_product = {}

        try:

            data_product['rce_source_product_id'] = raw_data['id']
            data_product['rce_source_id'] = self.rce_source_id
            data_product['rce_source_product_status'] = 1
            data_product['product_page_url'] = str(raw_data['url']).replace("'","")
            data_product['product_page_url_hash'] = hashlib.md5(data_product['product_page_url'].encode('utf-8')).hexdigest()
            data_product['rce_category_id'] = data[9]
            data_product['rce_store_id'] = 0

            # Name and alt name are concatenated, punctuation-stripped
            # (self.pattern), then translated to English.
            data_product['rce_source_product_name'] = str(raw_data['name']) + str(raw_data['alt_name'])
            data_product['rce_source_product_name'] = translate_text_to_english(str(re.sub(self.pattern, '', data_product['rce_source_product_name'])))
            data_product['rce_source_product_name'] = str(data_product['rce_source_product_name']).replace("'", "")

            data_product['product_images'] = data[4]

            data_product['product_description'] = ""
            try:

                # Description arrives as HTML; BeautifulSoup flattens it to text.
                description_raw = raw_data['description']
                soup = BeautifulSoup(description_raw, 'html.parser')
                data_product['product_description'] = translate_text_to_english(re.sub(self.pattern, '',soup.get_text()).replace("'",""))
                data_product['product_description'] = str(data_product['product_description']).replace("'","")
            except:
                pass

            # Resolve our internal brand id from the brand row written above.
            data_product['rce_brand_id'] = ""
            try:
                # NOTE(review): values are interpolated into the SQL text here
                # (not parameterized); both come from numeric payload/DB fields.
                sql = f"""
                       select id from {self.config.get('crawler_schema')}.{self.config.get('brand_tab')} where
                       rce_source_id = {self.rce_source_id} and rce_source_brand_id = {raw_data['brand']['id']}
                       """
                self.cur.execute(sql)
                res = self.cur.fetchone()
                data_product['rce_brand_id'] = res[0]
            except:
                pass

            data_product['product_sold_total'] = 0

            data_product['product_sold'] = 0
            try:
                data_product['product_sold'] = raw_data['bought']
            except:
                pass

            # Single price point: min and max both take int_final_price.
            data_product['product_price_min'] = 0
            data_product['product_price_max'] = 0
            try:
                data_product['product_price_min'] = raw_data['int_final_price']
                data_product['product_price_max'] = raw_data['int_final_price']
            except:
                pass

            data_product['product_price_min_before_discount'] = 0
            data_product['product_price_max_before_discount'] = 0
            try:
                # market_price is a formatted string; keep digits only.
                market_price = raw_data['market_price']
                market_price = re.sub(r'\D', '', market_price)
                data_product['product_price_min_before_discount'] = market_price
                data_product['product_price_max_before_discount'] = market_price
            except:
                pass

            data_product['ratings'] = 0.0
            try:
                data_product['ratings'] = raw_data['rating']['avg_rate']
            except:
                pass

            data_product['ships_from'] = ""
            data_product['product_section'] = data[1]
            data_product['countryoforigin'] = ""
            data_product['rank'] = data[8]

            try:
                self.db_writer.rce_product(data_product)
            except Exception as e:
                logging.info(e)

            #region rce_product_variant

            # Variants are nested attribute -> options -> products; each leaf
            # product is one row in df_variant.
            variant_items = raw_data['attribute']['items']

            df_variant = pd.DataFrame({}, columns=['product_variant_name', 'rce_source_variant_id', 'product_variant_price',
                                                   'product_variant_stock', 'product_variant_sku'])

            data_variant = {}
            for variant in variant_items:
                for item in variant['options']:
                    data_variant['product_variant_name'] = item['long_label']
                    for product in item['products']:
                        data_variant['rce_source_variant_id'] = product['id']
                        data_variant['product_variant_price'] = product['price']
                        data_variant['product_variant_stock'] = product['quantity']
                        data_variant['product_variant_sku'] = product['sku']

                        # variants_arr.append(data_variant)

                        tmp = pd.DataFrame([[data_variant['product_variant_name'],
                                             data_variant['rce_source_variant_id'],
                                             data_variant['product_variant_price'],
                                             data_variant['product_variant_stock'],
                                             data_variant['product_variant_sku']]],
                                           columns=['product_variant_name', 'rce_source_variant_id',
                                                    'product_variant_price',
                                                    'product_variant_stock', 'product_variant_sku'])
                        df_variant = pd.concat([df_variant, tmp])

            # The same SKU can appear under several attribute options; merge
            # them, concatenating the option labels into one variant name and
            # keeping the first id/price/stock seen.
            df_variant_merged = df_variant.groupby('product_variant_sku').agg({
                'product_variant_name': ' '.join,
                'rce_source_variant_id': 'first',
                'product_variant_price': 'first',
                'product_variant_stock': 'first'
            }).reset_index()

            #print(df_variant_merged.to_string())

            for index, row in df_variant_merged.iterrows():
                try:
                    data_variant = {}

                    data_variant['rce_source_variant_id'] = row['rce_source_variant_id']
                    data_variant['product_variant_name'] = translate_text_to_english(row['product_variant_name'])
                    data_variant['product_variant_name'] = re.sub(self.pattern, '', data_variant['product_variant_name']).replace("'","")
                    data_variant['product_variant_price'] = row['product_variant_price']
                    data_variant['product_variant_price_before_discount'] = 0
                    data_variant['product_variant_stock'] = row['product_variant_stock']
                    data_variant['product_variant_sku'] = row['product_variant_sku']

                    # Look up our internal product id for the row written above.
                    data_variant['rce_product_id'] = ""

                    sql = f"""
                           select id from {self.config.get('crawler_schema')}.{self.config.get('product_tab')} where
                           rce_source_product_id = {data_product['rce_source_product_id']} and rce_source_id = {data_product['rce_source_id']}
                           """
                    self.cur.execute(sql)
                    data_variant['rce_product_id'] = self.cur.fetchone()[0]

                    try:
                        self.db_writer.rce_product_variant(data_variant)
                    except Exception as e:
                        logging.info(e)
                except:
                    pass

            #endregion

        except:
            pass

        #endregion
|
def rating_info(self, raw_data):
|
|
|
|
try:
|
|
|
|
reviews1 = []
|
|
reviews2 = []
|
|
|
|
try:
|
|
reviews1 = raw_data['short_rating_data']['image_reviews']
|
|
except:
|
|
pass
|
|
|
|
try:
|
|
reviews2 = raw_data['short_rating_data']['reviews']
|
|
except:
|
|
pass
|
|
|
|
reviews = self.join_lists(reviews1, reviews2)
|
|
|
|
|
|
|
|
for review in reviews:
|
|
data_review = {}
|
|
|
|
data_review["rce_product_id"] = ""
|
|
data_review["username"] = ""
|
|
data_review["review"] = ""
|
|
data_review["img_url"] = ""
|
|
data_review["review_like_count"] = 0
|
|
data_review["user_tier"] = ""
|
|
data_review["shop_id"] = 0
|
|
data_review["video_url"] = ""
|
|
data_review["rating"] = ""
|
|
|
|
sql = f"""
|
|
select id from {self.config.get('crawler_schema')}.{self.config.get('product_tab')} where
|
|
rce_source_product_id = {raw_data['id']} and rce_source_id = {self.rce_source_id}
|
|
"""
|
|
self.cur.execute(sql)
|
|
data_review["rce_product_id"] = self.cur.fetchone()[0]
|
|
|
|
try:
|
|
data_review["username"] = str(review['user_fullname']).replace("'", "")
|
|
except:
|
|
pass
|
|
|
|
try:
|
|
data_review["review"] = translate_text_to_english(review['content']).replace("'", "")
|
|
except:
|
|
pass
|
|
|
|
try:
|
|
data_review["rating"] = review['rating']['star']
|
|
except:
|
|
pass
|
|
|
|
try:
|
|
self.db_writer.rce_ratings_reviews(data_review)
|
|
except Exception as e:
|
|
logging.info(e)
|
|
except Exception as e:
|
|
logging.info(e)
|
|
|
|
def join_lists(self, list1, list2):
|
|
# Check if both lists are None
|
|
if list1 is None and list2 is None:
|
|
return None
|
|
# Check if one of the lists is None
|
|
elif list1 is None:
|
|
return list2
|
|
elif list2 is None:
|
|
return list1
|
|
else:
|
|
return list1 + list2
|
|
|
|
    def seo_info(self, raw_data):
        """Persist the SEO/OpenGraph metadata block of the product payload.

        Every field under ``raw_data['seo']`` is optional; each extraction is a
        one-line best-effort try/except so missing keys simply leave the
        pre-assigned default in place.

        :param raw_data: decoded JSON payload of /wap/v2/product/detail.
        """
        try:

            data_seo = {}

            # Defaults for every column the writer expects.
            data_seo['rce_product_id'] = 0
            data_seo['rce_source_id'] = self.rce_source_id
            data_seo['seo_title'] = ""
            data_seo['seo_description'] = ""
            data_seo['seo_url'] = ""
            # NOTE(review): seo_url_hash is initialized but never computed below
            # (unlike product/brand URL hashes) — confirm whether that's intended.
            data_seo['seo_url_hash'] = ""
            data_seo['seo_image'] = ""
            data_seo['seo_price_amount'] = 0
            data_seo['seo_price_currency'] = ""
            data_seo['seo_product_band'] = ""
            data_seo['seo_product_availability'] = ""
            data_seo['seo_product_category'] = ""
            data_seo['seo_product_condition'] = ""
            data_seo['seo_product_retailer_item_id'] = 0
            data_seo['seo_product_robots'] = ""

            # Resolve our internal product id for the product row written earlier.
            sql = f"""
                   select id from {self.config.get('crawler_schema')}.{self.config.get('product_tab')} where
                   rce_source_product_id = {raw_data['id']} and rce_source_id = {self.rce_source_id}
                   """
            self.cur.execute(sql)
            data_seo['rce_product_id'] = self.cur.fetchone()[0]

            # Text fields are translated and stripped of single quotes (values
            # are later embedded in single-quoted SQL literals).
            try: data_seo['seo_title'] = translate_text_to_english(raw_data['seo']['og:title']).replace("'","")
            except: pass

            try: data_seo['seo_description'] = translate_text_to_english(raw_data['seo']['og:description']).replace("'","")
            except: pass

            try: data_seo['seo_url'] = str(raw_data['seo']['og:url']).replace("'","")
            except: pass

            try: data_seo['seo_image'] = str(raw_data['seo']['og:image']).replace("'","")
            except: pass

            try: data_seo['seo_price_amount'] = raw_data['seo']['price:amount']
            except: pass

            try: data_seo['seo_price_currency'] = str(raw_data['seo']['price:currency']).replace("'","")
            except: pass

            try: data_seo['seo_product_band'] = translate_text_to_english(raw_data['seo']['product:band']).replace("'","")
            except: pass

            try: data_seo['seo_product_availability'] = str(raw_data['seo']['product:availability']).replace("'","")
            except: pass

            try: data_seo['seo_product_category'] = translate_text_to_english(raw_data['seo']['product:category']).replace("'","")
            except: pass

            try: data_seo['seo_product_condition'] = translate_text_to_english(raw_data['seo']['product:condition']).replace("'","")
            except: pass

            try: data_seo['seo_product_retailer_item_id'] = raw_data['seo']['product:retailer_item_id']
            except: pass

            try: data_seo['seo_product_robots'] = raw_data['seo']['product:robots']
            except: pass

            try:
                self.db_writer.rce_seo(data_seo)
            except Exception as e:
                logging.info(e)

        except:
            pass
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|