# raena-crawler-engine/tokopedia_crawler_engine/tokopedia_db_writer.py
#
# 61 lines
# 1.5 KiB
# Python

from tokopedia_config import Config
from tokopedia_logger import logger
import psycopg2
class DBConnector:
    """Open psycopg2 connections from the project configuration.

    The connection settings are read once, at construction time, from the
    mapping returned by ``Config().get()``. The object can also be used as
    a context manager: entering opens a fresh connection and returns it,
    leaving closes that connection.
    """

    def __init__(self):
        """Copy the connection settings out of the configuration mapping."""
        settings = Config().get()
        # (attribute on self, key in the configuration mapping)
        for attr_name, config_key in (
            ("host", "db_host"),
            ("database", "database"),
            ("user", "db_user"),
            ("password", "db_pass"),
            ("port", "db_port"),
        ):
            setattr(self, attr_name, settings.get(config_key))
        # Only populated while the object is used in a ``with`` block.
        self.dbconn = None

    def create_connection(self):
        """Return a new psycopg2 connection built from the stored settings."""
        connect_kwargs = {
            "database": self.database,
            "user": self.user,
            "password": self.password,
            "host": self.host,
            "port": self.port,
        }
        return psycopg2.connect(**connect_kwargs)

    def __enter__(self):
        """Open a connection, remember it on ``self.dbconn`` and return it."""
        opened = self.create_connection()
        self.dbconn = opened
        return opened

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Close the connection opened by ``__enter__``.

        Only ``close()`` is called here; nothing is committed explicitly.
        The implicit ``None`` return lets any exception from the ``with``
        body propagate.
        """
        active = self.dbconn
        active.close()
class DB(object):
    """Singleton wrapper around one shared, autocommit database connection.

    Every ``DB()`` call returns the same instance. The connection is
    created lazily through ``DBConnector().create_connection()`` and is
    replaced when it is found to be closed.
    """

    # Shared connection handle; created on first use by get_connection().
    connection = None
    # The single instance handed out by __new__.
    _instance = None

    def __new__(cls, *args, **kw):
        """Return the shared instance, creating it on first use.

        Constructor arguments are accepted but not forwarded:
        ``object.__new__`` rejects extra arguments, which previously made
        the first ``DB(x)`` call raise ``TypeError`` while later calls
        (which skip creation) succeeded.
        """
        if cls._instance is None:
            cls._instance = super(DB, cls).__new__(cls)
        return cls._instance

    def get_connection(self):
        """Return the shared connection, opening a new one when needed.

        A new connection is opened when none exists yet or when the cached
        one reports itself as closed (psycopg2 sets ``closed`` to a
        non-zero value once a connection is closed or broken).
        """
        conn = self.connection
        if conn is None or conn.closed:
            conn = DBConnector().create_connection()
            self.connection = conn
        return conn

    def execute_query(self, query, params=None):
        """Execute ``query`` in autocommit mode and return the open cursor.

        Args:
            query: SQL text to execute.
            params: Optional sequence or mapping of values bound to the
                placeholders in ``query``. When omitted, ``query`` is sent
                as-is, exactly as before this parameter existed. Prefer
                passing values here over formatting them into ``query``.

        Returns:
            The cursor the statement was executed on. The caller is
            responsible for closing it.

        Raises:
            Whatever the driver raises for the statement; the cursor is
            closed before the exception propagates.
        """
        connection = self.get_connection()
        # Autocommit cannot be switched inside an open transaction, so it
        # is only touched when it is not already enabled.
        if not connection.autocommit:
            connection.autocommit = True
        cursor = connection.cursor()
        try:
            cursor.execute(query, params)
        except Exception:
            # Do not leak the cursor when the statement fails.
            cursor.close()
            raise
        return cursor

    def fetchone(self, query, params=None):
        """Execute ``query`` and return its first row (``None`` if no rows).

        ``params`` is forwarded to ``execute_query``. The cursor is closed
        before returning.
        """
        cursor = self.execute_query(query, params)
        try:
            return cursor.fetchone()
        finally:
            cursor.close()

    def fetchall(self, query, params=None):
        """Execute ``query`` and return all of its rows as a list.

        ``params`` is forwarded to ``execute_query``. The cursor is closed
        before returning.
        """
        cursor = self.execute_query(query, params)
        try:
            return cursor.fetchall()
        finally:
            cursor.close()