# 2024-02-01 06:42:37 +00:00  (stray VCS timestamp -- commented out so the module parses)
import logging
import psycopg2
###### Logger ######
# Module-wide logging setup: timestamped INFO-level messages.
# Renamed `format` -> LOG_FORMAT (the old name shadowed the builtin) and
# repaired the datefmt, which was a broken strftime string (" % Y- % m- ...").
LOG_FORMAT = "%(asctime)s : %(message)s"
logging.basicConfig(format=LOG_FORMAT, level=logging.INFO, datefmt="%Y-%m-%d %H:%M:%S")
class fb_db_writer:
    """Writes crawler results into a Postgres/Redshift database.

    Each ``rce_*`` method upserts one logical record and mirrors every change
    into the matching ``aud_<table>`` audit table.  Data values are now passed
    as bind parameters (``%s``) instead of being concatenated into the SQL
    text, closing the SQL-injection hole of the original string-built
    statements; schema/table names still come from the trusted local config
    and are interpolated directly (identifiers cannot be bound).

    NOTE(review): GETDATE() is Redshift/SQL-Server syntax, not vanilla
    Postgres -- kept as-is because the sample config points at Redshift.
    """

    def __init__(self, config):
        # config: dict-like mapping; expected keys include database, db_user,
        # db_pass, db_host, db_port, crawler_schema and the *_tab table names
        # (see the sample config at the bottom of this file).
        self.config = config
        self.conn = psycopg2.connect(
            database=self.config.get('database'),
            user=self.config.get('db_user'),
            password=self.config.get('db_pass'),
            host=self.config.get('db_host'),
            port=self.config.get('db_port'),
        )
        # Every statement commits immediately; there is no transaction scope
        # spanning a main-table write and its audit mirror.
        self.conn.autocommit = True
        self.cur = self.conn.cursor()

    def __del__(self):
        # Best-effort cleanup: __init__ may have failed before self.conn existed.
        conn = getattr(self, 'conn', None)
        if conn is not None:
            logging.info("Closing connection.....")
            conn.close()

    def get_id(self, schema, table):
        """Return the next value for the id column of schema.table (max+1, 1 when empty).

        NOTE(review): max+1 is racy if several writers share a table; fine for
        a single-process crawler, otherwise use a database sequence.
        """
        self.cur.execute(f"select max(id) from {schema}.{table}")
        res = self.cur.fetchone()
        return 1 if res[0] is None else res[0] + 1

    def get_aud_id(self, schema, table):
        """Return the next value for the auditid column of schema.table (max+1, 1 when empty)."""
        self.cur.execute(f"select max(auditid) from {schema}.{table}")
        res = self.cur.fetchone()
        return 1 if res[0] is None else res[0] + 1

    def _audit_snapshot(self, schema, table, cols, key_col, key_val):
        """Copy the main-table row where key_col = key_val into aud_<table> with a fresh auditid."""
        id_aud = self.get_aud_id(schema, "aud_" + table)
        sql = (f"insert into {schema}.aud_{table} (auditid, id, {cols}) "
               f"select %s, id, {cols} from {schema}.{table} where {key_col} = %s")
        logging.info(sql)
        self.cur.execute(sql, (id_aud, str(key_val)))

    def _touch(self, schema, table, key_col, key_val):
        """Refresh updatedat on an unchanged row and sync it into the audit table.

        Bug fix: the original matched audit rows on ``b.id = res[0]``, but
        res[0] holds rce_source_id, not the row id -- match on the natural
        key column instead.
        """
        sql = f"update {schema}.{table} set updatedat=GETDATE() where {key_col} = %s"
        logging.info(sql)
        self.cur.execute(sql, (str(key_val),))
        sql = (f"update {schema}.aud_{table} a set updatedat=b.updatedat "
               f"from {schema}.{table} b where a.id=b.id and b.{key_col} = %s")
        logging.info(sql)
        self.cur.execute(sql, (str(key_val),))

    def rce_product(self, data):
        """Insert or update one product row (keyed by product_page_url_hash) plus its audit mirror.

        data: dict with rce_source_id, product_page_url, product_page_url_hash,
        rce_category_id, rce_store_id, rce_source_product_name, product_images,
        product_description, product_price_min, product_price_max, ships_from.
        """
        schema = self.config.get('crawler_schema')
        table = self.config.get('product_tab')
        cols = ("rce_source_id, product_page_url, product_page_url_hash, rce_category_id, "
                "rce_store_id, rce_source_product_name, product_images, product_description, "
                "product_price_min, product_price_max, ships_from")
        aud_cols = cols + ", createdat, updatedat"
        sql = f"select {cols} from {schema}.{table} where product_page_url_hash = %s"
        self.cur.execute(sql, (str(data['product_page_url_hash']),))
        res = self.cur.fetchone()
        if not res:
            new_id = self.get_id(schema, table)
            sql = (f"insert into {schema}.{table} (id, {cols}) "
                   "values (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)")
            logging.info(sql)
            self.cur.execute(sql, (
                new_id, data['rce_source_id'], data['product_page_url'],
                data['product_page_url_hash'], data['rce_category_id'],
                data['rce_store_id'], data['rce_source_product_name'],
                data['product_images'], data['product_description'],
                data['product_price_min'], data['product_price_max'],
                data['ships_from'],
            ))
            self._audit_snapshot(schema, table, aud_cols,
                                 "product_page_url_hash", data['product_page_url_hash'])
        else:
            unchanged = (
                data['rce_source_id'] == res[0]
                and data['product_page_url'] == res[1]
                and data['product_page_url_hash'] == res[2]
                and data['rce_category_id'] == res[3]
                and data['rce_store_id'] == res[4]
                and data['rce_source_product_name'] == res[5]
                and data['product_images'] == res[6]
                and data['product_description'] == res[7]
                and str(data['product_price_min']) == res[8]
                # Bug fix: the original wrote str(a == b), which yields
                # "True"/"False" (always truthy), so price_max changes
                # were never detected.
                and str(data['product_price_max']) == res[9]
                and data['ships_from'] == res[10]
            )
            if unchanged:
                self._touch(schema, table, "product_page_url_hash", data['product_page_url_hash'])
            else:
                sql = (f"update {schema}.{table} set rce_source_id = %s, product_page_url = %s, "
                       "product_page_url_hash = %s, rce_category_id = %s, rce_store_id = %s, "
                       "rce_source_product_name = %s, product_images = %s, product_description = %s, "
                       "product_price_min = %s, product_price_max = %s, ships_from = %s, "
                       "updatedat = GETDATE() where product_page_url_hash = %s")
                logging.info(sql)
                self.cur.execute(sql, (
                    data['rce_source_id'], data['product_page_url'],
                    data['product_page_url_hash'], data['rce_category_id'],
                    data['rce_store_id'], data['rce_source_product_name'],
                    data['product_images'], data['product_description'],
                    data['product_price_min'], data['product_price_max'],
                    data['ships_from'], data['product_page_url_hash'],
                ))
                self._audit_snapshot(schema, table, aud_cols,
                                     "product_page_url_hash", data['product_page_url_hash'])

    def rce_reseller(self, data):
        """Insert or update one reseller row (keyed by reseller_name) plus its audit mirror.

        data: dict with rce_source_id and reseller_name.
        """
        schema = self.config.get('crawler_schema')
        table = self.config.get('reseller_tab')
        aud_cols = "rce_source_id, reseller_name, createdat, updatedat"
        sql = f"select rce_source_id, reseller_name from {schema}.{table} where reseller_name = %s"
        self.cur.execute(sql, (str(data['reseller_name']),))
        res = self.cur.fetchone()
        if not res:
            new_id = self.get_id(schema, table)
            sql = f"insert into {schema}.{table} (id, rce_source_id, reseller_name) values (%s, %s, %s)"
            logging.info(sql)
            self.cur.execute(sql, (new_id, data['rce_source_id'], data['reseller_name']))
            # Bug fix: the original audit insert had no WHERE clause and
            # re-copied the entire reseller table on every new row.
            self._audit_snapshot(schema, table, aud_cols, "reseller_name", data['reseller_name'])
        else:
            if data['rce_source_id'] == res[0] and data['reseller_name'] == res[1]:
                self._touch(schema, table, "reseller_name", res[1])
            else:
                # Bug fix: the original update was missing the comma before
                # updatedat, producing invalid SQL.
                sql = (f"update {schema}.{table} set rce_source_id = %s, reseller_name = %s, "
                       "updatedat = GETDATE() where reseller_name = %s")
                logging.info(sql)
                self.cur.execute(sql, (data['rce_source_id'], data['reseller_name'], res[1]))
                # Consistency fix: include createdat/updatedat like every
                # other audit insert in this class.
                self._audit_snapshot(schema, table, aud_cols, "reseller_name", data['reseller_name'])

    def rce_reseller_store(self, data):
        """Insert or update one reseller-store row (keyed by store_page_url_hash) plus its audit mirror.

        data: dict with rce_source_id, store_page_url, store_page_url_hash,
        store_location, rce_reseller_id.
        """
        schema = self.config.get('crawler_schema')
        table = self.config.get('reseller_store_tab')
        cols = "rce_source_id, store_page_url, store_page_url_hash, store_location, rce_reseller_id"
        aud_cols = cols + ", createdat, updatedat"
        sql = f"select {cols} from {schema}.{table} where store_page_url_hash = %s"
        self.cur.execute(sql, (str(data['store_page_url_hash']),))
        res = self.cur.fetchone()
        if not res:
            new_id = self.get_id(schema, table)
            sql = f"insert into {schema}.{table} (id, {cols}) values (%s, %s, %s, %s, %s, %s)"
            logging.info(sql)
            self.cur.execute(sql, (
                new_id, data['rce_source_id'], data['store_page_url'],
                data['store_page_url_hash'], data['store_location'],
                data['rce_reseller_id'],
            ))
            self._audit_snapshot(schema, table, aud_cols,
                                 "store_page_url_hash", data['store_page_url_hash'])
        else:
            unchanged = (
                data['rce_source_id'] == res[0]
                and data['store_page_url'] == res[1]
                and data['store_page_url_hash'] == res[2]
                and data['store_location'] == res[3]
                and data['rce_reseller_id'] == res[4]
            )
            if unchanged:
                self._touch(schema, table, "store_page_url_hash", res[2])
            else:
                sql = (f"update {schema}.{table} set rce_source_id = %s, store_page_url = %s, "
                       "store_page_url_hash = %s, store_location = %s, rce_reseller_id = %s, "
                       "updatedat = GETDATE() where store_page_url_hash = %s")
                logging.info(sql)
                self.cur.execute(sql, (
                    data['rce_source_id'], data['store_page_url'],
                    data['store_page_url_hash'], data['store_location'],
                    data['rce_reseller_id'], data['store_page_url_hash'],
                ))
                self._audit_snapshot(schema, table, aud_cols,
                                     "store_page_url_hash", data['store_page_url_hash'])
# config = {
# "crawler_name": "raena_crawler_engine_shopee",
# "crawler_schema": "raena_spider_management",
# "category_tab": "test_rce_category",
# "tracker_tab": "crawler_tracker",
# "product_tab": "rce_product",
# "variant_tab": "rce_product_variant",
# "brand_tab": "rce_brand",
# "reseller_tab": "rce_reseller",
# "reseller_store_tab": "rce_reseller_store",
# "review_tab": "rce_ratings_reviews",
# "review_productmodels_tab": "rce_ratings_reviews_productmodels",
# "review_producttags_tab": "rce_ratings_reviews_producttags",
# "review_tags": "rce_tags",
# "source_tab": "rce_source",
# "product_per_category": "136",
# "source_category": "11043145",
# "db_user": "dbadmin",
# "db_pass": "<redacted>",  # NOTE(review): a real credential was committed here -- rotate it and load secrets from the environment
# "database": "analytics",
# "db_host": "redshift-cluster-1.cdqj58hfx4p7.ap-southeast-1.redshift.amazonaws.com",
# "db_port": "5439",
# "crawler_main": "1",
# "crawler_slave_no": ""
# }
#
# db_writer = fb_db_writer(config)
# id = db_writer.get_id(config.get("crawler_schema"), config.get("category_tab"))
#
# print(id)
#
# id = db_writer.get_id(config.get('crawler_schema'), "aud_"+config.get('category_tab'))
#
# print(id)