# 61 lines, 1.5 KiB, Python
from tokopedia_config import Config
|
|
from tokopedia_logger import logger
|
|
import psycopg2
|
|
|
|
class DBConnector:
    """Open psycopg2 connections from the settings returned by ``Config``.

    The object also works as a context manager: entering opens a connection
    and returns it, leaving closes that connection.
    """

    def __init__(self):
        """Read the connection settings from ``Config().get()``."""
        config = Config().get()
        self.host = config.get('db_host')
        self.database = config.get('database')
        self.user = config.get('db_user')
        self.password = config.get('db_pass')
        self.port = config.get('db_port')
        # Connection owned by the active ``with`` block; None otherwise.
        self.dbconn = None

    def create_connection(self):
        """Return a new psycopg2 connection built from the stored settings."""
        return psycopg2.connect(
            database=self.database,
            user=self.user,
            password=self.password,
            host=self.host,
            port=self.port,
        )

    def __enter__(self):
        """Open a connection, remember it on ``self.dbconn`` and return it."""
        self.dbconn = self.create_connection()
        return self.dbconn

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Close the connection opened by ``__enter__``, if there is one.

        Nothing is returned, so an exception raised inside the ``with``
        block keeps propagating.
        """
        # Detach the handle first so a closed connection is never left
        # reachable through ``self.dbconn``.
        conn, self.dbconn = self.dbconn, None
        if conn is not None:
            conn.close()
|
|
|
|
class DB(object):
    """Singleton helper that lazily opens one connection and runs queries."""

    # Cached connection; created on first use by get_connection().
    connection = None

    def __new__(cls, *args, **kw):
        """Return the shared instance, creating it on the first call.

        Constructor arguments are accepted for backward compatibility but
        are not used.
        """
        if not hasattr(cls, '_instance'):
            # object.__new__ raises TypeError when it receives extra
            # arguments and __new__ is overridden, so do not forward them.
            cls._instance = super(DB, cls).__new__(cls)
        return cls._instance

    def get_connection(self):
        """Return the cached connection, opening a new one when needed.

        A new connection is created when none is cached yet or when the
        cached one reports itself as closed.
        """
        conn = self.connection
        # ``closed`` is non-zero on a psycopg2 connection that is closed or
        # broken; reusing such a handle would fail on every later call.
        if not conn or getattr(conn, 'closed', 0):
            self.connection = DBConnector().create_connection()
        return self.connection

    def execute_query(self, query, params=None):
        """Execute ``query`` and return the open cursor.

        Args:
            query: SQL text to execute.
            params: Optional sequence or mapping of values bound to the
                placeholders in ``query``; passed to ``cursor.execute``.

        Returns:
            The cursor the query was executed on. The caller is responsible
            for closing it.
        """
        connection = self.get_connection()
        connection.autocommit = True
        try:
            cursor = connection.cursor()
        except (psycopg2.ProgrammingError, psycopg2.InterfaceError):
            # Drop the cached handle so get_connection() really reconnects
            # instead of handing back the same unusable connection.
            self.connection = None
            connection = self.get_connection()
            connection.autocommit = True
            cursor = connection.cursor()
        cursor.execute(query, params)
        return cursor

    def fetchone(self, query, params=None):
        """Execute ``query`` and return its first row (None if no rows)."""
        cursor = self.execute_query(query, params)
        try:
            return cursor.fetchone()
        finally:
            # The result is already materialised; release the cursor.
            cursor.close()

    def fetchall(self, query, params=None):
        """Execute ``query`` and return all of its rows as a list."""
        cursor = self.execute_query(query, params)
        try:
            return cursor.fetchall()
        finally:
            # The result is already materialised; release the cursor.
            cursor.close()
|