raena-crawler-engine/amazon_crawler_engine/amazon_products.py

517 lines
21 KiB
Python
Executable File

import hashlib
import logging
import random
import sys
import string
import undetected_chromedriver as webdriver
from selenium.webdriver.common.by import By
import psycopg2
import time
import re
from amazon_db_writer import amazon_db_writer
from datetime import datetime
from pyvirtualdisplay import Display
import ssl
ssl._create_default_https_context = ssl._create_unverified_context
class amazon_products:
def __init__(self, config):
self.config = config
self.crawler_name = self.config.get("crawler_name")
self.pattern = r'[' + string.punctuation + ']'
self.conn = psycopg2.connect(database=self.config.get('database'), user=self.config.get('db_user'), password=self.config.get('db_pass'), host=self.config.get('db_host'), port=self.config.get('db_port'))
self.conn.autocommit = True
self.cur = self.conn.cursor()
self.cur.execute("select id from "+self.config.get('crawler_schema')+"."+self.config.get('source_tab')+" where source_name='Amazon'")
self.rce_source_id = self.cur.fetchone()[0]
self.cur.execute("select * from "+self.config.get('crawler_schema')+"."+self.config.get('tracker_tab')+" where crawler_name='raena_crawler_enginer_amazon' and flag=0 order by id")
self.items = self.cur.fetchall()
self.db_writer = amazon_db_writer(config)
#self.display = Display(visible=0, size=(800, 600))
#self.display.start()
def __del__(self):
print("Closing connection.....")
self.conn.close()
#self.display.stop()
def start_processing(self):
count = 0
for item in self.items:
count += 1
try:
logging.info("============== Getting info for {}/{}: {} ================".format(str(count),str(len(self.items)),str(item)))
start = datetime.now()
self.get_product_info(item)
end = datetime.now()
logging.info('Total time taken to fetch the product: {}'.format(str(end-start)))
except Exception as e:
print(e)
def reseller_info(self, driver):
try:
store_urls = []
try:
driver.find_element(By.CSS_SELECTOR, '.a-icon.a-icon-arrow.a-icon-small.arrow-icon').click()
time.sleep(5)
offers = driver.find_elements(By.CSS_SELECTOR, '#aod-offer-soldBy')
for offer in offers:
try:
store_url = offer.find_element(By.CSS_SELECTOR, '.a-fixed-left-grid-col.a-col-right').find_element(By.TAG_NAME, 'a').get_attribute('href')
store_urls.append(store_url)
except:
pass
except:
try:
store_url = driver.find_element(By.CSS_SELECTOR, '#sellerProfileTriggerId').get_attribute('href')
store_urls.append(store_url)
except:
pass
pass
if store_urls:
store_urls = list(set(store_urls))
return_item = ""
flag = 0
for store_url in store_urls:
driver.get(store_url)
driver.implicitly_wait(5)
##### reseller info
data_reseller = {}
data_reseller['rce_source_id'] = self.rce_source_id
data_reseller['rce_source_reseller_status'] = 1
data_reseller['reseller_name'] = ""
data_reseller['reseller_average_rating'] = 0.0
data_reseller['reseller_description'] = ""
try:
data_reseller['reseller_name'] = driver.find_element(By.CSS_SELECTOR,'#seller-name').text
data_reseller['reseller_name'] = data_reseller['reseller_name'].replace("'","")
except:
pass
try:
data_reseller['reseller_average_rating'] = float(driver.find_element(By.CSS_SELECTOR,'#effective-timeperiod-rating-year-description').text)
except:
try:
data_reseller['reseller_average_rating'] = float(driver.find_element(By.CSS_SELECTOR,'#effective-timeperiod-rating-year-description').text)
except:
pass
pass
try:
data_reseller['reseller_description'] = driver.find_element(By.CSS_SELECTOR, '#spp-expander-about-seller .a-row').text
data_reseller['reseller_description'] = data_reseller['reseller_description'].replace("'","")
except:
pass
try:
self.db_writer.rce_reseller(data_reseller)
except Exception as e:
logging.info(e)
##### Store info
data_reseller_store = {}
data_reseller_store['rce_source_store_status'] = 1
data_reseller_store['store_page_url'] = store_url
data_reseller_store['store_page_url_hash'] = hashlib.md5(data_reseller_store['store_page_url'].encode('utf-8')).hexdigest()
data_reseller_store['store_location'] = ""
data_reseller_store['rce_reseller_id'] = ""
data_reseller_store['rce_source_id'] = self.rce_source_id
try:
self.cur.execute("select id from "+self.config.get('crawler_schema')+"."+self.config.get('reseller_tab')+" where reseller_name = '"+str(data_reseller['reseller_name'])+"'")
rce_reseller_id = self.cur.fetchone()
data_reseller_store['rce_reseller_id'] = rce_reseller_id[0]
if flag == 0:
return_item = data_reseller_store['rce_reseller_id']
flag = 1
except:
pass
try:
self.db_writer.rce_reseller_store(data_reseller_store)
except Exception as e:
logging.info(e)
time.sleep(2)
else:
##### reseller info
data_reseller = {}
data_reseller['rce_source_id'] = self.rce_source_id
data_reseller['rce_source_reseller_status'] = 1
data_reseller['reseller_name'] = "Amazon.ae"
data_reseller['reseller_average_rating'] = 0.0
data_reseller['reseller_description'] = ""
try:
self.db_writer.rce_reseller(data_reseller)
except Exception as e:
logging.info(e)
##### Store info
data_reseller_store = {}
data_reseller_store['rce_source_store_status'] = 1
data_reseller_store['store_page_url'] = "amazon.ae"
data_reseller_store['store_page_url_hash'] = hashlib.md5(data_reseller_store['store_page_url'].encode('utf-8')).hexdigest()
data_reseller_store['store_location'] = ""
data_reseller_store['rce_reseller_id'] = ""
data_reseller_store['rce_source_id'] = self.rce_source_id
try:
self.cur.execute("select id from "+self.config.get('crawler_schema')+"."+self.config.get('reseller_tab')+" where reseller_name = '"+str(data_reseller['reseller_name'])+"'")
rce_reseller_id = self.cur.fetchone()
data_reseller_store['rce_reseller_id'] = rce_reseller_id[0]
return_item = data_reseller_store['rce_reseller_id']
except:
pass
try:
self.db_writer.rce_reseller_store(data_reseller_store)
except Exception as e:
logging.info(e)
return return_item
except Exception as e:
print(e)
def brand_info(self, driver):
data_brand = {}
data_brand['rce_source_id'] = self.rce_source_id
data_brand['rce_source_brand_status'] = 1
data_brand['brand_page_url'] = ""
data_brand['brand_page_url_hash'] = ""
data_brand['brand_name'] = ""
try:
data_brand['brand_page_url'] = driver.find_element(By.CSS_SELECTOR, '#bylineInfo').get_attribute('href')
data_brand['brand_page_url_hash'] = hashlib.md5(data_brand['brand_page_url'].encode('utf-8')).hexdigest()
try:
data_brand['brand_name'] = driver.find_element(By.CSS_SELECTOR, '.po-brand .po-break-word').text
except:
pass
try:
self.db_writer.rce_brand(data_brand)
except Exception as e:
logging.info(e)
return data_brand['brand_name']
except:
pass
def product_info(self, driver, category, keyword, url, url_hash, brand_name, rce_reseller_id):
data_product = {}
data_product['rce_source_product_id'] = 0
data_product['rce_source_id'] = self.rce_source_id
data_product['rce_source_product_status'] = 1
data_product['product_page_url'] = url.replace("'","''")
data_product['product_page_url_hash'] = url_hash
data_product['rce_category_id'] = category
data_product['rce_brand_id'] = ""
data_product['rce_store_id'] = ""
data_product['rce_source_product_name'] = ""
data_product['product_images'] = ""
data_product['product_description'] = ""
data_product['product_sold_total'] = 0
data_product['product_sold'] = 0
data_product['product_price_min'] = ""
data_product['product_price_min_before_discount'] =""
data_product['product_price_max'] = ""
data_product['product_price_max_before_discount'] = ""
data_product['ratings'] = 0.0
data_product['product_section'] = keyword
try:
sql = "select id from "+self.config.get('crawler_schema')+"."+self.config.get('brand_tab')+" where brand_name = '"+str(brand_name)+"'"
self.cur.execute(sql)
data_product['rce_brand_id'] = self.cur.fetchone()[0]
except: pass
try:
sql = "select id from "+self.config.get('crawler_schema')+"."+self.config.get('reseller_store_tab')+" where rce_reseller_id = "+str(rce_reseller_id)+""
self.cur.execute(sql)
data_product['rce_store_id'] = self.cur.fetchone()[0]
except: pass
try:
rce_source_product_name = driver.find_element(By.CSS_SELECTOR,'#productTitle').text
data_product['rce_source_product_name'] = str(re.sub(self.pattern, '', rce_source_product_name)).replace("'","''")
except: pass
try:
product_images_element = driver.find_element(By.CSS_SELECTOR, '#magnifierLens')
product_images_raw = product_images_element.find_elements(By.TAG_NAME, 'img')
product_images = []
for product_image in product_images_raw:
url = product_image.get_attribute('src')
product_images.append(url)
data_product['product_images'] = str(product_images)
except: pass
try:
description = ""
des_rank = ""
try:
des_raws = driver.find_element(By.CSS_SELECTOR, '.a-unordered-list.a-vertical.a-spacing-mini').find_elements(By.CSS_SELECTOR, '.a-list-item')
for des_raw in des_raws:
try:
des = des_raw.text
description += des
except:
pass
except:
pass
try:
des_rank = driver.find_element(By.XPATH, '/html/body/div[2]/div/div[6]/div[24]/div/ul[1]').find_element(By.CSS_SELECTOR, '.a-list-item').text
except:
pass
data_product['product_description'] = description+des_rank
except:
pass
try:
price_whole = driver.find_element(By.CSS_SELECTOR, '.reinventPricePriceToPayMargin .a-price-whole').text
price_fraction = driver.find_element(By.CSS_SELECTOR, '.reinventPricePriceToPayMargin .a-price-fraction').text
price = price_whole+"."+price_fraction
data_product['product_price_min'] = price
data_product['product_price_max'] = price
except:
pass
try:
d_price_whole = driver.find_element(By.CSS_SELECTOR, '.reinventPricePriceToPayMargin .a-price-whole').text
d_price_fraction = driver.find_element(By.CSS_SELECTOR, '.reinventPricePriceToPayMargin .a-price-fraction').text
price = d_price_whole+"."+d_price_fraction
data_product['product_price_min'] = price
data_product['product_price_max'] = price
except:
pass
try:
data_product['product_price_min_before_discount'] = (driver.find_element(By.CSS_SELECTOR, '.a-text-price').text).replace('AED', '')
data_product['product_price_max_before_discount'] = data_product['product_price_min_before_discount']
except:
pass
try:
data_product['ratings'] = driver.find_element(By.CSS_SELECTOR, '#averageCustomerReviews .a-color-base').text
except:
pass
try:
self.db_writer.rce_product(data_product)
except Exception as e:
logging.info(e)
### rce_product_variant
try:
is_variant = driver.find_element(By.CSS_SELECTOR, '.a-unordered-list.a-nostyle.a-button-list.a-declarative.a-button-toggle-group.a-horizontal.a-spacing-top-micro.swatches.swatchesSquare.imageSwatches')
if is_variant:
variants = is_variant.find_elements(By.TAG_NAME, 'li')
#random.shuffle(variants)
for variant in variants:
variant.click()
data_variant = {}
data_variant['rce_source_variant_id'] = 0
data_variant['rce_product_id'] = ""
data_variant['product_variant_name'] = ""
data_variant['product_variant_price'] = ""
data_variant['product_variant_price_before_discount'] = ""
data_variant['product_variant_stock'] = 0
try:
sql = "select id from "+self.config.get('crawler_schema')+"."+self.config.get('product_tab')+" where rce_source_product_name = '"+str(data_product['rce_source_product_name'])+"'"
self.cur.execute(sql)
data_variant['rce_product_id'] = self.cur.fetchone()[0]
except:
pass
try:
product_variant_name = driver.find_element(By.CSS_SELECTOR,'#productTitle').text
data_variant['product_variant_name'] = str(re.sub(self.pattern, '', product_variant_name)).replace("'","''")
except: pass
try:
d_price_whole = driver.find_element(By.CSS_SELECTOR, '.reinventPricePriceToPayMargin .a-price-whole').text
d_price_fraction = driver.find_element(By.CSS_SELECTOR, '.reinventPricePriceToPayMargin .a-price-fraction').text
price = d_price_whole+"."+d_price_fraction
data_variant['product_variant_price'] = price
except:
pass
try:
data_variant['product_variant_price_before_discount'] = (driver.find_element(By.CSS_SELECTOR, '.a-text-price').text).replace('AED', '')
except:
pass
try:
self.db_writer.rce_product_variant(data_variant)
except Exception as e:
logging.info(e)
time.sleep(random.randint(2,5))
else:
logging.info('No variant found')
except:
logging.info('No variant found')
pass
def rating_info(self, driver, rce_reseller_id, url_hash):
try:
driver.find_element(By.CSS_SELECTOR, '#reviews-medley-footer .a-link-emphasis').click()
driver.implicitly_wait(5)
data_reviews = driver.find_elements(By.CSS_SELECTOR, '.a-section.review.aok-relative')
for data in data_reviews:
data_review = {}
data_review["id"] = ""
data_review["rce_product_id"] = ""
data_review["username"] = ""
data_review["review"] = ""
data_review["img_url"] = ""
data_review["review_like_count"] = 0
data_review["user_tier"] = ""
data_review["shop_id"] = 0
data_review["video_url"] = ""
data_review["rating"] = ""
try:
sql = "select max(id) from "+self.config.get('crawler_schema')+"."+self.config.get('review_tab')
self.cur.execute(sql)
rating_id = self.cur.fetchone()
if rating_id[0]==None:
rating_id = 1
else:
rating_id = int(rating_id[0]) + 1
data_review["id"] = rating_id
except:
pass
try:
sql = "select id from "+self.config.get('crawler_schema')+"."+self.config.get('product_tab')+" where product_page_url_hash = '"+str(url_hash)+"'"
self.cur.execute(sql)
data_review["rce_product_id"] = self.cur.fetchone()[0]
except: pass
try: data_review["username"] = data.find_element(By.CSS_SELECTOR, '.a-profile-name').text
except: pass
try:
data_review["review"] = data.find_element(By.CSS_SELECTOR, '.a-size-base.review-text.review-text-content').text
data_review["review"] = data_review["review"].replace("'","")
except: pass
try:
rating = data.find_element(By.CSS_SELECTOR, '.a-icon.a-icon-star.review-rating .a-icon-alt').get_attribute("textContent")
data_review["rating"] = rating.replace(' out of 5 stars', '')
except: pass
try:
sql = "select id from "+self.config.get('crawler_schema')+"."+self.config.get('reseller_store_tab')+" where rce_reseller_id = "+str(rce_reseller_id)+""
self.cur.execute(sql)
data_review["shop_id"] = self.cur.fetchone()[0]
except: pass
try:
self.db_writer.rce_ratings_reviews(data_review)
except Exception as e:
logging.info(e)
except:
pass
def get_product_info(self,item):
try:
op = webdriver.ChromeOptions()
op.add_argument('--no-sandbox')
op.add_argument('--disable-notifications')
op.add_argument("--lang=en-GB")
op.add_argument('--user-data-dir=/home/ec2-user/chrome_cache/')
#op.headless = True
driver=webdriver.Chrome(options=op)
try:
driver.get('https://www.amazon.ae')
time.sleep(3)
except Exception as e:
print(e)
##### Reseller info #####
driver.get(item[4])
driver.implicitly_wait(5)
rce_reseller_id = self.reseller_info(driver)
##### Product Info #####
driver.get(item[4])
driver.implicitly_wait(5)
##### Brand Info
brand_name = self.brand_info(driver)
##### Product info
self.product_info(driver, item[2], item[3], item[4], item[5], brand_name, rce_reseller_id)
##### Rating Info #####
driver.get(item[4])
driver.implicitly_wait(5)
self.rating_info(driver, rce_reseller_id, item[5])
sql = f"""
update {self.config.get('crawler_schema')}.{self.config.get('tracker_tab')} set flag = 1 where product_page_url_hash='{item[5]}'
"""
self.cur.execute(sql)
driver.close()
except Exception as e:
print(e)
driver.close()