raena-crawler-engine/fb_marketplace/fb_db_writer.py


import logging

import psycopg2

###### Logger ######
log_format = "%(asctime)s: %(message)s"
logging.basicConfig(format=log_format, level=logging.INFO, datefmt="%Y-%m-%d %H:%M:%S")

class fb_db_writer:
    def __init__(self, config):
        self.config = config
        self.conn = psycopg2.connect(
            database=self.config.get('database'),
            user=self.config.get('db_user'),
            password=self.config.get('db_pass'),
            host=self.config.get('db_host'),
            port=self.config.get('db_port')
        )
        self.conn.autocommit = True
        self.cur = self.conn.cursor()

    def __del__(self):
        logging.info("Closing connection.....")
        self.conn.close()
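
    # Note: relying on __del__ for cleanup is fragile -- Python does not
    # guarantee when (or whether) it runs. A context-manager sketch (an
    # assumption, not part of the original interface) would make cleanup
    # explicit:
    #
    #     def __enter__(self):
    #         return self
    #
    #     def __exit__(self, exc_type, exc_val, exc_tb):
    #         self.conn.close()
    #
    # Usage: `with fb_db_writer(config) as writer: ...`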

    def get_id(self, schema, table):
        # Next surrogate id: max(id) + 1, or 1 for an empty table.
        sql = f"""
                select max(id) from {schema}.{table}
                """
        self.cur.execute(sql)
        res = self.cur.fetchone()
        if res[0] is not None:
            id = res[0] + 1
        else:
            id = 1
        return id

    def get_aud_id(self, schema, table):
        # Next audit id: max(auditid) + 1, or 1 for an empty audit table.
        sql = f"""
                select max(auditid) from {schema}.{table}
                """
        self.cur.execute(sql)
        res = self.cur.fetchone()
        if res[0] is not None:
            id = res[0] + 1
        else:
            id = 1
        return id
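
    # Note: max(id) + 1 is not safe under concurrent writers -- two crawler
    # instances can read the same max and collide. One alternative (a sketch,
    # assuming the schema could be changed, which this file does not do) is to
    # let the database assign ids via an IDENTITY/serial column:
    #
    #     insert into {schema}.{table}(rce_source_id, ...) values (...);
    #     -- `id` is then generated by the database itself.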

    def rce_product(self, data):
        # Look up the product by its url hash to decide between insert and update.
        sql = ("select rce_source_id,product_page_url,product_page_url_hash,rce_category_id,rce_store_id,rce_source_product_name,product_images,"
               "product_description,product_price_min,product_price_max,ships_from from ") + self.config.get('crawler_schema') + "." + self.config.get('product_tab') + (" "
               "where product_page_url_hash = '") + str(data['product_page_url_hash']) + "'"
        self.cur.execute(sql)
        res = self.cur.fetchone()
        id_main = self.get_id(self.config.get('crawler_schema'), self.config.get('product_tab'))
        id_aud = self.get_aud_id(self.config.get('crawler_schema'), "aud_" + self.config.get('product_tab'))
        if not res:
            # New product: insert it, then snapshot the row into the audit table.
            sql = f"""
                insert into {self.config.get('crawler_schema')}.{self.config.get('product_tab')}(id,rce_source_id,product_page_url,product_page_url_hash,rce_category_id,rce_store_id,rce_source_product_name,product_images,product_description,product_price_min,product_price_max,ships_from)
                values({id_main},{data['rce_source_id']},'{data['product_page_url']}','{data['product_page_url_hash']}',{data['rce_category_id']},{data['rce_store_id']},
                '{data['rce_source_product_name']}','{data['product_images']}','{data['product_description']}',{data['product_price_min']},
                {data['product_price_max']},'{data['ships_from']}')
                """
            logging.info(sql)
            self.cur.execute(sql)
            sql = f"""
                insert into {self.config.get('crawler_schema')}.aud_{self.config.get('product_tab')}(auditid,id,rce_source_id,product_page_url,product_page_url_hash,
                rce_category_id,rce_store_id,rce_source_product_name,product_images,product_description,product_price_min,product_price_max,ships_from,createdat,updatedat)
                select {id_aud},id,rce_source_id,product_page_url,product_page_url_hash,rce_category_id,rce_store_id,rce_source_product_name,product_images,
                product_description,product_price_min,product_price_max,ships_from,createdat,updatedat from {self.config.get('crawler_schema')}.{self.config.get('product_tab')}
                where product_page_url_hash = '{data['product_page_url_hash']}'
                """
            logging.info(sql)
            self.cur.execute(sql)
        else:
            if (data['rce_source_id'] == res[0] and data['product_page_url'] == res[1] and data['product_page_url_hash'] == res[2]
                    and data['rce_category_id'] == res[3] and data['rce_store_id'] == res[4] and data['rce_source_product_name'] == res[5]
                    and data['product_images'] == res[6] and data['product_description'] == res[7] and str(data['product_price_min']) == res[8]
                    and str(data['product_price_max']) == res[9] and data['ships_from'] == res[10]):
                # Unchanged product: just refresh the timestamps.
                sql = "update " + self.config.get('crawler_schema') + "." + self.config.get('product_tab') + " set updatedat=GETDATE() " \
                      "where product_page_url_hash = '" + str(data['product_page_url_hash']) + "'"
                logging.info(sql)
                self.cur.execute(sql)
                # res[0] holds rce_source_id, not the row id, so join the audit
                # rows on the url hash instead.
                sql = "update " + self.config.get('crawler_schema') + ".aud_" + self.config.get('product_tab') + " a set updatedat=b.updatedat " \
                      "from " + self.config.get('crawler_schema') + "." + self.config.get('product_tab') + " b where a.id=b.id and b.product_page_url_hash = '" + str(data['product_page_url_hash']) + "'"
                logging.info(sql)
                self.cur.execute(sql)
            else:
                # Changed product: update the row, then append a fresh audit snapshot.
                sql = f"""
                update {self.config.get('crawler_schema')}.{self.config.get('product_tab')} set rce_source_id={data['rce_source_id']}, product_page_url='{data['product_page_url']}',
                product_page_url_hash='{data['product_page_url_hash']}',rce_category_id={data['rce_category_id']},rce_store_id={data['rce_store_id']},rce_source_product_name='{data['rce_source_product_name']}',
                product_images='{data['product_images']}',product_description='{data['product_description']}',product_price_min={data['product_price_min']},product_price_max={data['product_price_max']},
                ships_from='{data['ships_from']}', updatedat=GETDATE() where product_page_url_hash='{data['product_page_url_hash']}'
                """
                logging.info(sql)
                self.cur.execute(sql)
                sql = f"""
                insert into {self.config.get('crawler_schema')}.aud_{self.config.get('product_tab')}(auditid,id,rce_source_id,product_page_url,product_page_url_hash,
                rce_category_id,rce_store_id,rce_source_product_name,product_images,product_description,product_price_min,product_price_max,ships_from,createdat,updatedat)
                select {id_aud},id,rce_source_id,product_page_url,product_page_url_hash,rce_category_id,rce_store_id,rce_source_product_name,product_images,
                product_description,product_price_min,product_price_max,ships_from,createdat,updatedat from {self.config.get('crawler_schema')}.{self.config.get('product_tab')}
                where product_page_url_hash = '{data['product_page_url_hash']}'
                """
                logging.info(sql)
                self.cur.execute(sql)
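
    # Note: the select / compare / insert-or-update sequence above is a manual
    # upsert. On engines with native support it could collapse into a single
    # statement -- a sketch only (assumes a Redshift version with MERGE and
    # that product_page_url_hash uniquely identifies a row):
    #
    #     merge into <schema>.<product_tab> using (select ...) s
    #     on <product_tab>.product_page_url_hash = s.product_page_url_hash
    #     when matched then update set ...
    #     when not matched then insert ...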

    def rce_reseller(self, data):
        sql = "select rce_source_id,reseller_name from " + self.config.get('crawler_schema') + "." + self.config.get('reseller_tab') + " where reseller_name = '" + str(data['reseller_name']) + "'"
        self.cur.execute(sql)
        res = self.cur.fetchone()
        id_main = self.get_id(self.config.get('crawler_schema'), self.config.get('reseller_tab'))
        id_aud = self.get_aud_id(self.config.get('crawler_schema'), "aud_" + self.config.get('reseller_tab'))
        if not res:
            sql = f"""
                insert into {self.config.get('crawler_schema')}.{self.config.get('reseller_tab')}(id, rce_source_id,reseller_name)
                values({id_main}, {data['rce_source_id']},'{data['reseller_name']}')
                """
            logging.info(sql)
            self.cur.execute(sql)
            # Snapshot only the newly inserted reseller into the audit table.
            sql = f"""
                insert into {self.config.get('crawler_schema')}.aud_{self.config.get('reseller_tab')}(auditid, id, rce_source_id, reseller_name, createdat, updatedat)
                select {id_aud}, id, rce_source_id, reseller_name, createdat, updatedat from {self.config.get('crawler_schema')}.{self.config.get('reseller_tab')}
                where reseller_name = '{data['reseller_name']}'
                """
            logging.info(sql)
            self.cur.execute(sql)
        else:
            if data['rce_source_id'] == res[0] and data['reseller_name'] == res[1]:
                sql = "update " + self.config.get('crawler_schema') + "." + self.config.get('reseller_tab') + " set updatedat=GETDATE() " \
                      "where reseller_name = '" + str(res[1]) + "'"
                logging.info(sql)
                self.cur.execute(sql)
                # res[0] holds rce_source_id, not the row id, so match the audit
                # rows on reseller_name instead.
                sql = "update " + self.config.get('crawler_schema') + ".aud_" + self.config.get('reseller_tab') + " a set updatedat=b.updatedat " \
                      "from " + self.config.get('crawler_schema') + "." + self.config.get('reseller_tab') + " b where a.id=b.id and b.reseller_name = '" + str(res[1]) + "'"
                logging.info(sql)
                self.cur.execute(sql)
            else:
                sql = f"""
                update {self.config.get('crawler_schema')}.{self.config.get('reseller_tab')} set rce_source_id={data['rce_source_id']}, reseller_name='{data['reseller_name']}',
                updatedat=GETDATE() where reseller_name='{res[1]}'
                """
                logging.info(sql)
                self.cur.execute(sql)
                # The update above renamed the reseller, so the audit snapshot
                # must select by the new name, not the old one.
                sql = f"""
                insert into {self.config.get('crawler_schema')}.aud_{self.config.get('reseller_tab')}(auditid,id,rce_source_id,reseller_name,createdat,updatedat)
                select {id_aud},id,rce_source_id,reseller_name,createdat,updatedat from {self.config.get('crawler_schema')}.{self.config.get('reseller_tab')}
                where reseller_name='{data['reseller_name']}'
                """
                logging.info(sql)
                self.cur.execute(sql)

    def rce_reseller_store(self, data):
        sql = "select rce_source_id,store_page_url,store_page_url_hash,store_location,rce_reseller_id from " + self.config.get('crawler_schema') + "." + self.config.get('reseller_store_tab') + " where store_page_url_hash = '" + str(data['store_page_url_hash']) + "'"
        self.cur.execute(sql)
        res = self.cur.fetchone()
        id_main = self.get_id(self.config.get('crawler_schema'), self.config.get('reseller_store_tab'))
        id_aud = self.get_aud_id(self.config.get('crawler_schema'), "aud_" + self.config.get('reseller_store_tab'))
        if not res:
            sql = f"""
                insert into {self.config.get('crawler_schema')}.{self.config.get('reseller_store_tab')}(id,rce_source_id,store_page_url,store_page_url_hash,store_location,rce_reseller_id)
                values({id_main},{data['rce_source_id']},'{data['store_page_url']}','{data['store_page_url_hash']}','{data['store_location']}',{data['rce_reseller_id']})
                """
            logging.info(sql)
            self.cur.execute(sql)
            sql = f"""
                insert into {self.config.get('crawler_schema')}.aud_{self.config.get('reseller_store_tab')}(auditid,id,rce_source_id,store_page_url,store_page_url_hash,store_location,rce_reseller_id,createdat,updatedat)
                select {id_aud},id,rce_source_id,store_page_url,store_page_url_hash,store_location,rce_reseller_id,createdat,updatedat from {self.config.get('crawler_schema')}.{self.config.get('reseller_store_tab')}
                where store_page_url_hash = '{data['store_page_url_hash']}'
                """
            logging.info(sql)
            self.cur.execute(sql)
        else:
            if (data['rce_source_id'] == res[0] and data['store_page_url'] == res[1] and
                    data['store_page_url_hash'] == res[2] and data['store_location'] == res[3] and
                    data['rce_reseller_id'] == res[4]):
                sql = "update " + self.config.get('crawler_schema') + "." + self.config.get('reseller_store_tab') + " set updatedat=GETDATE() " \
                      "where store_page_url_hash = '" + str(res[2]) + "'"
                logging.info(sql)
                self.cur.execute(sql)
                # res[0] holds rce_source_id, not the row id, so join the audit
                # rows on the url hash instead.
                sql = "update " + self.config.get('crawler_schema') + ".aud_" + self.config.get('reseller_store_tab') + " a set updatedat=b.updatedat " \
                      "from " + self.config.get('crawler_schema') + "." + self.config.get('reseller_store_tab') + " b where a.id=b.id and b.store_page_url_hash = '" + str(res[2]) + "'"
                logging.info(sql)
                self.cur.execute(sql)
            else:
                sql = f"""
                update {self.config.get('crawler_schema')}.{self.config.get('reseller_store_tab')} set rce_source_id = {data['rce_source_id']},
                store_page_url = '{data['store_page_url']}', store_page_url_hash = '{data['store_page_url_hash']}', store_location = '{data['store_location']}',
                rce_reseller_id = {data['rce_reseller_id']}, updatedat=GETDATE() where store_page_url_hash = '{data['store_page_url_hash']}'
                """
                logging.info(sql)
                self.cur.execute(sql)
                sql = f"""
                insert into {self.config.get('crawler_schema')}.aud_{self.config.get('reseller_store_tab')}(auditid,id,rce_source_id,store_page_url,store_page_url_hash,store_location,rce_reseller_id,createdat,updatedat)
                select {id_aud},id,rce_source_id,store_page_url,store_page_url_hash,store_location,rce_reseller_id,createdat,updatedat from {self.config.get('crawler_schema')}.{self.config.get('reseller_store_tab')}
                where store_page_url_hash = '{data['store_page_url_hash']}'
                """
                logging.info(sql)
                self.cur.execute(sql)
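
# All of the methods above interpolate values straight into SQL strings, which
# breaks on names containing quotes and is open to SQL injection. A minimal
# sketch of the parameterized psycopg2 style (the schema/table names and the
# `data` keys are taken from this file; the rest is illustrative, not the
# original implementation):
#
#     sql = (f"insert into {self.config.get('crawler_schema')}."
#            f"{self.config.get('reseller_tab')}(id, rce_source_id, reseller_name) "
#            "values (%s, %s, %s)")
#     self.cur.execute(sql, (id_main, data['rce_source_id'], data['reseller_name']))
#
# Identifiers still have to come from trusted config; psycopg2 placeholders
# only cover values.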

# config = {
#     "crawler_name": "raena_crawler_engine_shopee",
#     "crawler_schema": "raena_spider_management",
#     "category_tab": "test_rce_category",
#     "tracker_tab": "crawler_tracker",
#     "product_tab": "rce_product",
#     "variant_tab": "rce_product_variant",
#     "brand_tab": "rce_brand",
#     "reseller_tab": "rce_reseller",
#     "reseller_store_tab": "rce_reseller_store",
#     "review_tab": "rce_ratings_reviews",
#     "review_productmodels_tab": "rce_ratings_reviews_productmodels",
#     "review_producttags_tab": "rce_ratings_reviews_producttags",
#     "review_tags": "rce_tags",
#     "source_tab": "rce_source",
#     "product_per_category": "136",
#     "source_category": "11043145",
#     "db_user": "dbadmin",
#     "db_pass": "<redacted>",  # never commit real credentials
#     "database": "analytics",
#     "db_host": "redshift-cluster-1.cdqj58hfx4p7.ap-southeast-1.redshift.amazonaws.com",
#     "db_port": "5439",
#     "crawler_main": "1",
#     "crawler_slave_no": ""
# }
#
# db_writer = fb_db_writer(config)
# id = db_writer.get_id(config.get("crawler_schema"), config.get("category_tab"))
#
# print(id)
#
# id = db_writer.get_id(config.get('crawler_schema'), "aud_"+config.get('category_tab'))
#
# print(id)
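#
# A hypothetical rce_reseller() call -- the two keys below are the ones the
# method actually reads; the values are illustrative only:
#
# data = {
#     "rce_source_id": 1,
#     "reseller_name": "Example Store",
# }
# db_writer.rce_reseller(data)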