import logging

import psycopg2

###### Logger ######
LOG_FORMAT = "%(asctime)s: %(message)s"
logging.basicConfig(format=LOG_FORMAT, level=logging.INFO, datefmt="%Y-%m-%d %H:%M:%S")


class fb_db_writer:
    """Upserts crawled rows (products, resellers, reseller stores) into the
    configured warehouse schema, mirroring every change into ``aud_*`` audit
    tables.

    NOTE(review): the SQL uses ``GETDATE()``, which implies the target is
    Redshift (or SQL Server) rather than vanilla PostgreSQL, even though
    psycopg2 is the driver — confirm against the deployment.

    Data values are bound as psycopg2 ``%s`` parameters to prevent SQL
    injection from crawled content; schema/table identifiers come from
    trusted config and are interpolated directly.
    """

    def __init__(self, config):
        """Open an autocommit connection using the db_* keys of *config*."""
        self.config = config
        self.conn = psycopg2.connect(database=self.config.get('database'),
                                     user=self.config.get('db_user'),
                                     password=self.config.get('db_pass'),
                                     host=self.config.get('db_host'),
                                     port=self.config.get('db_port'))
        self.conn.autocommit = True
        self.cur = self.conn.cursor()

    def __del__(self):
        logging.info("Closing connection.....")
        try:
            self.conn.close()
        except Exception:
            # __init__ may have failed before self.conn existed, or the
            # interpreter may be tearing down; never raise from a destructor.
            pass

    def _next_id(self, schema, table, column):
        """Return max(*column*) + 1 for schema.table, or 1 if the table is empty.

        NOTE(review): max()+1 is racy under concurrent writers; an IDENTITY
        column or sequence would be safer. Kept as-is to preserve behavior.
        """
        self.cur.execute(f"select max({column}) from {schema}.{table}")
        current = self.cur.fetchone()[0]
        return 1 if current is None else current + 1

    def get_id(self, schema, table):
        """Next free value of the ``id`` column of schema.table."""
        return self._next_id(schema, table, "id")

    def get_aud_id(self, schema, table):
        """Next free value of the ``auditid`` column of schema.table."""
        return self._next_id(schema, table, "auditid")

    def rce_product(self, data):
        """Insert or update one product row (keyed on product_page_url_hash).

        Unchanged rows only get their ``updatedat`` refreshed (main + audit);
        new or changed rows are written and a full audit snapshot is inserted.
        """
        schema = self.config.get('crawler_schema')
        table = self.config.get('product_tab')
        url_hash = str(data['product_page_url_hash'])

        sql = ("select rce_source_id,product_page_url,product_page_url_hash,"
               "rce_category_id,rce_store_id,rce_source_product_name,product_images,"
               "product_description,product_price_min,product_price_max,ships_from "
               f"from {schema}.{table} where product_page_url_hash = %s")
        self.cur.execute(sql, (url_hash,))
        res = self.cur.fetchone()

        id_main = self.get_id(schema, table)
        id_aud = self.get_aud_id(schema, "aud_" + table)

        # Shared snapshot statement: copies the current main row into the audit table.
        audit_sql = f"""
            insert into {schema}.aud_{table}(auditid,id,rce_source_id,product_page_url,
                product_page_url_hash,rce_category_id,rce_store_id,rce_source_product_name,
                product_images,product_description,product_price_min,product_price_max,
                ships_from,createdat,updatedat)
            select %s,id,rce_source_id,product_page_url,product_page_url_hash,
                rce_category_id,rce_store_id,rce_source_product_name,product_images,
                product_description,product_price_min,product_price_max,ships_from,
                createdat,updatedat
            from {schema}.{table} where product_page_url_hash = %s
        """

        if not res:
            sql = f"""
                insert into {schema}.{table}(id,rce_source_id,product_page_url,
                    product_page_url_hash,rce_category_id,rce_store_id,
                    rce_source_product_name,product_images,product_description,
                    product_price_min,product_price_max,ships_from)
                values (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)
            """
            logging.info(sql)
            self.cur.execute(sql, (id_main, data['rce_source_id'],
                                   data['product_page_url'], data['product_page_url_hash'],
                                   data['rce_category_id'], data['rce_store_id'],
                                   data['rce_source_product_name'], data['product_images'],
                                   data['product_description'], data['product_price_min'],
                                   data['product_price_max'], data['ships_from']))
            logging.info(audit_sql)
            self.cur.execute(audit_sql, (id_aud, url_hash))
        else:
            unchanged = (data['rce_source_id'] == res[0]
                         and data['product_page_url'] == res[1]
                         and data['product_page_url_hash'] == res[2]
                         and data['rce_category_id'] == res[3]
                         and data['rce_store_id'] == res[4]
                         and data['rce_source_product_name'] == res[5]
                         and data['product_images'] == res[6]
                         and data['product_description'] == res[7]
                         and str(data['product_price_min']) == res[8]
                         # BUG FIX: original had str(data[...]==res[9]) — a
                         # stringified boolean, never equal to the column value.
                         and str(data['product_price_max']) == res[9]
                         and data['ships_from'] == res[10])
            if unchanged:
                sql = (f"update {schema}.{table} set updatedat=GETDATE() "
                       "where product_page_url_hash = %s")
                logging.info(sql)
                self.cur.execute(sql, (url_hash,))
                # BUG FIX: original joined on b.id = res[0], but res[0] is
                # rce_source_id (id is never selected); match on the key instead.
                sql = (f"update {schema}.aud_{table} a set updatedat=b.updatedat "
                       f"from {schema}.{table} b "
                       "where a.id=b.id and b.product_page_url_hash = %s")
                logging.info(sql)
                self.cur.execute(sql, (url_hash,))
            else:
                sql = f"""
                    update {schema}.{table} set rce_source_id=%s, product_page_url=%s,
                        product_page_url_hash=%s, rce_category_id=%s, rce_store_id=%s,
                        rce_source_product_name=%s, product_images=%s,
                        product_description=%s, product_price_min=%s,
                        product_price_max=%s, ships_from=%s, updatedat=GETDATE()
                    where product_page_url_hash=%s
                """
                logging.info(sql)
                self.cur.execute(sql, (data['rce_source_id'], data['product_page_url'],
                                       data['product_page_url_hash'], data['rce_category_id'],
                                       data['rce_store_id'], data['rce_source_product_name'],
                                       data['product_images'], data['product_description'],
                                       data['product_price_min'], data['product_price_max'],
                                       data['ships_from'], url_hash))
                logging.info(audit_sql)
                self.cur.execute(audit_sql, (id_aud, url_hash))

    def rce_reseller(self, data):
        """Insert or update one reseller row (keyed on reseller_name) plus audit."""
        schema = self.config.get('crawler_schema')
        table = self.config.get('reseller_tab')
        name = str(data['reseller_name'])

        sql = (f"select rce_source_id,reseller_name from {schema}.{table} "
               "where reseller_name = %s")
        self.cur.execute(sql, (name,))
        res = self.cur.fetchone()

        id_main = self.get_id(schema, table)
        id_aud = self.get_aud_id(schema, "aud_" + table)

        audit_sql = f"""
            insert into {schema}.aud_{table}(auditid,id,rce_source_id,reseller_name,
                createdat,updatedat)
            select %s,id,rce_source_id,reseller_name,createdat,updatedat
            from {schema}.{table} where reseller_name = %s
        """

        if not res:
            sql = (f"insert into {schema}.{table}(id,rce_source_id,reseller_name) "
                   "values (%s,%s,%s)")
            logging.info(sql)
            self.cur.execute(sql, (id_main, data['rce_source_id'], data['reseller_name']))
            # BUG FIX: original audit insert had no WHERE clause and copied
            # every reseller row under a single auditid.
            logging.info(audit_sql)
            self.cur.execute(audit_sql, (id_aud, name))
        else:
            if data['rce_source_id'] == res[0] and data['reseller_name'] == res[1]:
                sql = (f"update {schema}.{table} set updatedat=GETDATE() "
                       "where reseller_name = %s")
                logging.info(sql)
                self.cur.execute(sql, (res[1],))
                # BUG FIX: original joined on b.id = res[0] (rce_source_id, not id).
                sql = (f"update {schema}.aud_{table} a set updatedat=b.updatedat "
                       f"from {schema}.{table} b "
                       "where a.id=b.id and b.reseller_name = %s")
                logging.info(sql)
                self.cur.execute(sql, (res[1],))
            else:
                # BUG FIX: original statement was missing the comma before
                # updatedat=GETDATE(), producing invalid SQL.
                sql = (f"update {schema}.{table} set rce_source_id=%s, "
                       "reseller_name=%s, updatedat=GETDATE() where reseller_name=%s")
                logging.info(sql)
                self.cur.execute(sql, (data['rce_source_id'], data['reseller_name'], res[1]))
                logging.info(audit_sql)
                self.cur.execute(audit_sql, (id_aud, name))

    def rce_reseller_store(self, data):
        """Insert or update one reseller-store row (keyed on store_page_url_hash)
        plus its audit snapshot."""
        schema = self.config.get('crawler_schema')
        table = self.config.get('reseller_store_tab')
        url_hash = str(data['store_page_url_hash'])

        sql = ("select rce_source_id,store_page_url,store_page_url_hash,store_location,"
               f"rce_reseller_id from {schema}.{table} where store_page_url_hash = %s")
        self.cur.execute(sql, (url_hash,))
        res = self.cur.fetchone()

        id_main = self.get_id(schema, table)
        id_aud = self.get_aud_id(schema, "aud_" + table)

        audit_sql = f"""
            insert into {schema}.aud_{table}(auditid,id,rce_source_id,store_page_url,
                store_page_url_hash,store_location,rce_reseller_id,createdat,updatedat)
            select %s,id,rce_source_id,store_page_url,store_page_url_hash,store_location,
                rce_reseller_id,createdat,updatedat
            from {schema}.{table} where store_page_url_hash = %s
        """

        if not res:
            sql = f"""
                insert into {schema}.{table}(id,rce_source_id,store_page_url,
                    store_page_url_hash,store_location,rce_reseller_id)
                values (%s,%s,%s,%s,%s,%s)
            """
            logging.info(sql)
            self.cur.execute(sql, (id_main, data['rce_source_id'], data['store_page_url'],
                                   data['store_page_url_hash'], data['store_location'],
                                   data['rce_reseller_id']))
            logging.info(audit_sql)
            self.cur.execute(audit_sql, (id_aud, url_hash))
        else:
            unchanged = (data['rce_source_id'] == res[0]
                         and data['store_page_url'] == res[1]
                         and data['store_page_url_hash'] == res[2]
                         and data['store_location'] == res[3]
                         and data['rce_reseller_id'] == res[4])
            if unchanged:
                sql = (f"update {schema}.{table} set updatedat=GETDATE() "
                       "where store_page_url_hash = %s")
                logging.info(sql)
                self.cur.execute(sql, (res[2],))
                # BUG FIX: original joined on b.id = res[0] (rce_source_id, not id).
                sql = (f"update {schema}.aud_{table} a set updatedat=b.updatedat "
                       f"from {schema}.{table} b "
                       "where a.id=b.id and b.store_page_url_hash = %s")
                logging.info(sql)
                self.cur.execute(sql, (res[2],))
            else:
                sql = f"""
                    update {schema}.{table} set rce_source_id = %s, store_page_url = %s,
                        store_page_url_hash = %s, store_location = %s,
                        rce_reseller_id = %s, updatedat=GETDATE()
                    where store_page_url_hash = %s
                """
                logging.info(sql)
                self.cur.execute(sql, (data['rce_source_id'], data['store_page_url'],
                                       data['store_page_url_hash'], data['store_location'],
                                       data['rce_reseller_id'], url_hash))
                logging.info(audit_sql)
                self.cur.execute(audit_sql, (id_aud, url_hash))

# Example configuration (secrets redacted — never commit real credentials):
# config = {
#     "crawler_name": "raena_crawler_engine_shopee",
#     "crawler_schema": "raena_spider_management",
#     "category_tab": "test_rce_category",
#     "tracker_tab": "crawler_tracker",
#     "product_tab": "rce_product",
#     "variant_tab": "rce_product_variant",
#     "brand_tab": "rce_brand",
#     "reseller_tab": "rce_reseller",
#     "reseller_store_tab": "rce_reseller_store",
#     "review_tab": "rce_ratings_reviews",
#     "review_productmodels_tab": "rce_ratings_reviews_productmodels",
#     "review_producttags_tab": "rce_ratings_reviews_producttags",
#     "review_tags": "rce_tags",
#     "source_tab": "rce_source",
#     "product_per_category": "136",
#     "source_category": "11043145",
#     "db_user": "<redacted>",
#     "db_pass": "<redacted>",
#     "database": "analytics",
#     "db_host": "<redacted>",
#     "db_port": "5439",
#     "crawler_main": "1",
#     "crawler_slave_no": ""
# }
#
# db_writer = fb_db_writer(config)
# print(db_writer.get_id(config.get("crawler_schema"), config.get("category_tab")))
# print(db_writer.get_id(config.get("crawler_schema"), "aud_" + config.get("category_tab")))