267 lines
14 KiB
Python
Executable File
267 lines
14 KiB
Python
Executable File
import logging
|
|
import psycopg2
|
|
|
|
###### Looger ######
|
|
format = "%(asctime)s: %(message)s"
|
|
logging.basicConfig(format=format, level=logging.INFO, datefmt="%Y-%m-%d %H:%M:%S")
|
|
|
|
class fb_db_writer:
|
|
def __init__(self, config):
|
|
self.config = config
|
|
self.conn = psycopg2.connect(database=self.config.get('database'), user=self.config.get('db_user'), password=self.config.get('db_pass'), host=self.config.get('db_host'), port=self.config.get('db_port'))
|
|
self.conn.autocommit = True
|
|
self.cur = self.conn.cursor()
|
|
|
|
def __del__(self):
|
|
logging.info("Closing connection.....")
|
|
self.conn.close()
|
|
|
|
|
|
def get_id(self, schema, table):
|
|
sql = f"""
|
|
select max(id) from {schema}.{table}
|
|
"""
|
|
self.cur.execute(sql)
|
|
res = self.cur.fetchone()
|
|
|
|
if res[0]!=None:
|
|
id = res[0]+1
|
|
else:
|
|
id = 1
|
|
|
|
return id
|
|
|
|
def get_aud_id(self, schema, table):
|
|
sql = f"""
|
|
select max(auditid) from {schema}.{table}
|
|
"""
|
|
self.cur.execute(sql)
|
|
res = self.cur.fetchone()
|
|
|
|
if res[0]!=None:
|
|
id = res[0]+1
|
|
else:
|
|
id = 1
|
|
|
|
return id
|
|
|
|
def rce_product(self, data):
|
|
|
|
sql = ("select rce_source_id,product_page_url,product_page_url_hash,rce_category_id,rce_store_id,rce_source_product_name,product_images,"
|
|
"product_description,product_price_min,product_price_max,ships_from from ")+self.config.get('crawler_schema')+"."+self.config.get('product_tab')+(" "
|
|
"where product_page_url_hash = '")+str(data['product_page_url_hash'])+"'"
|
|
self.cur.execute(sql)
|
|
res = self.cur.fetchone()
|
|
|
|
id_main = self.get_id(self.config.get('crawler_schema'), self.config.get('product_tab'))
|
|
id_aud = self.get_aud_id(self.config.get('crawler_schema'), "aud_"+self.config.get('product_tab'))
|
|
|
|
if not res:
|
|
sql = f"""
|
|
insert into {self.config.get('crawler_schema')}.{self.config.get('product_tab')}(id,rce_source_id,product_page_url,product_page_url_hash,rce_category_id,rce_store_id,rce_source_product_name,product_images,product_description,product_price_min,product_price_max,ships_from)
|
|
values({id_main},{data['rce_source_id']},'{data['product_page_url']}','{data['product_page_url_hash']}',{data['rce_category_id']},{data['rce_store_id']},
|
|
'{data['rce_source_product_name']}','{data['product_images']}','{data['product_description']}',{data['product_price_min']},
|
|
{data['product_price_max']},'{data['ships_from']}')
|
|
"""
|
|
logging.info(sql)
|
|
|
|
self.cur.execute(sql)
|
|
|
|
sql = f"""
|
|
insert into {self.config.get('crawler_schema')}.aud_{self.config.get('product_tab')}(auditid,id,rce_source_id,product_page_url,product_page_url_hash,
|
|
rce_category_id,rce_store_id,rce_source_product_name,product_images,product_description,product_price_min,product_price_max,ships_from,createdat,updatedat)
|
|
select {id_aud},id,rce_source_id,product_page_url,product_page_url_hash,rce_category_id,rce_store_id,rce_source_product_name,product_images,
|
|
product_description,product_price_min,product_price_max,ships_from,createdat,updatedat from {self.config.get('crawler_schema')}.{self.config.get('product_tab')}
|
|
where product_page_url_hash = '{data['product_page_url_hash']}'
|
|
"""
|
|
logging.info(sql)
|
|
self.cur.execute(sql)
|
|
else:
|
|
if (data['rce_source_id']==res[0] and data['product_page_url']==res[1] and data['product_page_url_hash']==res[2]
|
|
and data['rce_category_id']==res[3] and data['rce_store_id']==res[4] and data['rce_source_product_name']==res[5]
|
|
and data['product_images']==res[6] and data['product_description']==res[7] and str(data['product_price_min'])==res[8]
|
|
and str(data['product_price_max']==res[9]) and data['ships_from']==res[10]):
|
|
|
|
sql = "update "+self.config.get('crawler_schema')+"."+self.config.get('product_tab')+" set updatedat=GETDATE() " \
|
|
"where product_page_url_hash = '"+ str(data['product_page_url_hash'])+"'"
|
|
logging.info(sql)
|
|
self.cur.execute(sql)
|
|
|
|
sql = "update "+self.config.get('crawler_schema')+".aud_"+self.config.get('product_tab')+" a set updatedat=b.updatedat " \
|
|
"from "+self.config.get('crawler_schema')+"."+self.config.get('product_tab')+" b where a.id=b.id and b.id = "+str(res[0])
|
|
logging.info(sql)
|
|
self.cur.execute(sql)
|
|
else:
|
|
sql = f"""
|
|
update {self.config.get('crawler_schema')}.{self.config.get('product_tab')} set rce_source_id={data['rce_source_id']}, product_page_url='{data['product_page_url']}',
|
|
product_page_url_hash='{data['product_page_url_hash']}',rce_category_id={data['rce_category_id']},rce_store_id={data['rce_store_id']},rce_source_product_name='{data['rce_source_product_name']}',
|
|
product_images='{data['product_images']}',product_description='{data['product_description']}',product_price_min={data['product_price_min']},product_price_max={data['product_price_max']},
|
|
ships_from='{data['ships_from']}' where product_page_url_hash='{data['product_page_url_hash']}'
|
|
"""
|
|
logging.info(sql)
|
|
self.cur.execute(sql)
|
|
|
|
sql = f"""
|
|
insert into {self.config.get('crawler_schema')}.aud_{self.config.get('product_tab')}(auditid,id,rce_source_id,product_page_url,product_page_url_hash,
|
|
rce_category_id,rce_store_id,rce_source_product_name,product_images,product_description,product_price_min,product_price_max,ships_from,createdat,updatedat)
|
|
select {id_aud},id,rce_source_id,product_page_url,product_page_url_hash,rce_category_id,rce_store_id,rce_source_product_name,product_images,
|
|
product_description,product_price_min,product_price_max,ships_from,createdat,updatedat from {self.config.get('crawler_schema')}.{self.config.get('product_tab')}
|
|
where product_page_url_hash = '{data['product_page_url_hash']}'
|
|
"""
|
|
logging.info(sql)
|
|
self.cur.execute(sql)
|
|
|
|
def rce_reseller(self, data):
|
|
|
|
sql = "select rce_source_id,reseller_name from "+self.config.get('crawler_schema')+"."+self.config.get('reseller_tab')+" where reseller_name = '"+str(data['reseller_name'])+"'"
|
|
self.cur.execute(sql)
|
|
res = self.cur.fetchone()
|
|
|
|
id_main = self.get_id(self.config.get('crawler_schema'), self.config.get('reseller_tab'))
|
|
id_aud = self.get_aud_id(self.config.get('crawler_schema'), "aud_"+self.config.get('reseller_tab'))
|
|
|
|
|
|
if not res:
|
|
sql = f"""
|
|
insert into {self.config.get('crawler_schema')}.{self.config.get('reseller_tab')}(id, rce_source_id,reseller_name)
|
|
values({id_main}, {data['rce_source_id']},'{data['reseller_name']}')
|
|
"""
|
|
logging.info(sql)
|
|
|
|
self.cur.execute(sql)
|
|
|
|
sql = f"""
|
|
insert into {self.config.get('crawler_schema')}.aud_{self.config.get('reseller_tab')}(auditid, id, rce_source_id, reseller_name, createdat, updatedat)
|
|
select {id_aud}, id, rce_source_id, reseller_name, createdat, updatedat from {self.config.get('crawler_schema')}.{self.config.get('reseller_tab')}
|
|
"""
|
|
logging.info(sql)
|
|
|
|
self.cur.execute(sql)
|
|
|
|
else:
|
|
|
|
if data['rce_source_id']==res[0] and data['reseller_name']==res[1]:
|
|
|
|
sql = "update "+self.config.get('crawler_schema')+"."+self.config.get('reseller_tab')+" set updatedat=GETDATE() " \
|
|
"where reseller_name = '"+ str(res[1])+"'"
|
|
logging.info(sql)
|
|
self.cur.execute(sql)
|
|
|
|
sql = "update "+self.config.get('crawler_schema')+".aud_"+self.config.get('reseller_tab')+" a set updatedat=b.updatedat " \
|
|
"from "+self.config.get('crawler_schema')+"."+self.config.get('reseller_tab')+" b where a.id=b.id and b.id = "+str(res[0])
|
|
logging.info(sql)
|
|
self.cur.execute(sql)
|
|
else:
|
|
sql = f"""
|
|
update {self.config.get('crawler_schema')}.{self.config.get('reseller_tab')} set rce_source_id={data['rce_source_id']}, reseller_name='{data['reseller_name']}'
|
|
updatedat=GETDATE() where reseller_name='{res[1]}'
|
|
"""
|
|
logging.info(sql)
|
|
self.cur.execute(sql)
|
|
|
|
sql = f"""
|
|
insert into {self.config.get('crawler_schema')}.aud_{self.config.get('reseller_tab')}(auditid,id,rce_source_id,reseller_name)
|
|
select {id_aud},id, rce_source_id,reseller_name from {self.config.get('crawler_schema')}.{self.config.get('reseller_tab')}
|
|
where reseller_name='{res[1]}'
|
|
"""
|
|
logging.info(sql)
|
|
|
|
self.cur.execute(sql)
|
|
|
|
def rce_reseller_store(self, data):
|
|
|
|
sql = "select rce_source_id,store_page_url,store_page_url_hash,store_location,rce_reseller_id from "+self.config.get('crawler_schema')+"."+self.config.get('reseller_store_tab')+" where store_page_url_hash = '"+str(data['store_page_url_hash'])+"'"
|
|
self.cur.execute(sql)
|
|
res = self.cur.fetchone()
|
|
|
|
|
|
id_main = self.get_id(self.config.get('crawler_schema'), self.config.get('reseller_store_tab'))
|
|
id_aud = self.get_aud_id(self.config.get('crawler_schema'), "aud_"+self.config.get('reseller_store_tab'))
|
|
|
|
if not res:
|
|
sql = f"""
|
|
insert into {self.config.get('crawler_schema')}.{self.config.get('reseller_store_tab')}(id,rce_source_id,store_page_url,store_page_url_hash,store_location,rce_reseller_id)
|
|
values({str(id_main)},{data['rce_source_id']},'{data['store_page_url']}','{data['store_page_url_hash']}','{data['store_location']}',{data['rce_reseller_id']})
|
|
"""
|
|
logging.info(sql)
|
|
|
|
self.cur.execute(sql)
|
|
|
|
sql = f"""
|
|
insert into {self.config.get('crawler_schema')}.aud_{self.config.get('reseller_store_tab')}(auditid,id,rce_source_id,store_page_url,store_page_url_hash,store_location,rce_reseller_id,createdat,updatedat)
|
|
select {id_aud},id,rce_source_id,store_page_url,store_page_url_hash,store_location,rce_reseller_id,createdat,updatedat from {self.config.get('crawler_schema')}.{self.config.get('reseller_store_tab')}
|
|
where store_page_url_hash = '{data['store_page_url_hash']}'
|
|
"""
|
|
logging.info(sql)
|
|
|
|
self.cur.execute(sql)
|
|
|
|
else:
|
|
|
|
if (data['rce_source_id']==res[0] and data['store_page_url']==res[1] and
|
|
data['store_page_url_hash']==res[2] and data['store_location']==res[3] and
|
|
data['rce_reseller_id']==res[4]):
|
|
|
|
sql = "update "+self.config.get('crawler_schema')+"."+self.config.get('reseller_store_tab')+" set updatedat=GETDATE() " \
|
|
"where store_page_url_hash = '"+ str(res[2])+"'"
|
|
logging.info(sql)
|
|
self.cur.execute(sql)
|
|
|
|
sql = "update "+self.config.get('crawler_schema')+".aud_"+self.config.get('reseller_store_tab')+" a set updatedat=b.updatedat " \
|
|
"from "+self.config.get('crawler_schema')+"."+self.config.get('reseller_store_tab')+" b where a.id=b.id and b.id = "+str(res[0])
|
|
logging.info(sql)
|
|
self.cur.execute(sql)
|
|
else:
|
|
|
|
sql = f"""
|
|
update {self.config.get('crawler_schema')}.{self.config.get('reseller_store_tab')} set rce_source_id = {data['rce_source_id']},
|
|
store_page_url = '{data['store_page_url']}', store_page_url_hash = '{data['store_page_url_hash']}', store_location = '{data['store_location']}',
|
|
rce_reseller_id = {data['rce_reseller_id']}, updatedat=GETDATE()
|
|
"""
|
|
logging.info(sql)
|
|
self.cur.execute(sql)
|
|
|
|
sql = f"""
|
|
insert into {self.config.get('crawler_schema')}.aud_{self.config.get('reseller_store_tab')}(auditid,id,rce_source_id,store_page_url,store_page_url_hash,store_location,rce_reseller_id,createdat,updatedat)
|
|
select {id_aud},id,rce_source_id,store_page_url,store_page_url_hash,store_location,rce_reseller_id,createdat,updatedat from {self.config.get('crawler_schema')}.{self.config.get('reseller_store_tab')}
|
|
where store_page_url_hash = '{data['store_page_url_hash']}'
|
|
"""
|
|
logging.info(sql)
|
|
|
|
self.cur.execute(sql)
|
|
|
|
|
|
# config = {
|
|
# "crawler_name": "raena_crawler_engine_shopee",
|
|
# "crawler_schema": "raena_spider_management",
|
|
# "category_tab": "test_rce_category",
|
|
# "tracker_tab": "crawler_tracker",
|
|
# "product_tab": "rce_product",
|
|
# "variant_tab": "rce_product_variant",
|
|
# "brand_tab": "rce_brand",
|
|
# "reseller_tab": "rce_reseller",
|
|
# "reseller_store_tab": "rce_reseller_store",
|
|
# "review_tab": "rce_ratings_reviews",
|
|
# "review_productmodels_tab": "rce_ratings_reviews_productmodels",
|
|
# "review_producttags_tab": "rce_ratings_reviews_producttags",
|
|
# "review_tags": "rce_tags",
|
|
# "source_tab": "rce_source",
|
|
# "product_per_category": "136",
|
|
# "source_category": "11043145",
|
|
# "db_user": "dbadmin",
|
|
# "db_pass": "5qCif6eyY3Kmg4z",
|
|
# "database": "analytics",
|
|
# "db_host": "redshift-cluster-1.cdqj58hfx4p7.ap-southeast-1.redshift.amazonaws.com",
|
|
# "db_port": "5439",
|
|
# "crawler_main": "1",
|
|
# "crawler_slave_no": ""
|
|
# }
|
|
#
|
|
# db_writer = shopee_db_writer(config)
|
|
# id = db_writer.get_id(config.get("crawler_schema"), config.get("category_tab"))
|
|
#
|
|
# print(id)
|
|
#
|
|
# id = db_writer.get_id(config.get('crawler_schema'), "aud_"+config.get('category_tab'))
|
|
#
|
|
# print(id) |