Added logic to insert fake data to mysql using Faker library.

This commit is contained in:
Kanthi Subramanian 2022-04-22 16:55:02 -04:00
parent e3e15fb238
commit 5c9c1cd55c
3 changed files with 98 additions and 3 deletions

21
tests/fake_data.py Normal file
View File

@ -0,0 +1,21 @@
from faker import Faker
class FakeData:
def get_fake_row(primary_num):
fake = Faker()
#emp_no,birth_date,first_name,last_name,gender,hire_date,salary,num_years,
# bonus,small_value,int_value,discount,num_years_signed,bonus_signed,
# small_value_signed,int_value_signed,last_modified_date_time,
# last_access_time,married_status,perDiemRate,hourlyRate,jobDescription,updated_time
row = (primary_num, fake.date(), fake.name()[:10], fake.name()[:10], 'M', fake.date(),
fake.unique.random_int(), fake.pyint(0, 10),
fake.unique.random_int(), fake.unique.random_int(),
fake.unique.random_int(), fake.unique.random_int(),
fake.unique.pyint(0, -10, -1), fake.unique.random_int(),
fake.unique.random_int(), fake.unique.random_int(),
fake.date_time(), fake.time(), 'M', fake.pyfloat(), fake.pyfloat(), fake.job(),
fake.date_time())
return row

View File

@ -27,14 +27,36 @@ class MySqlConnection:
return self.conn
def execute_sql(self, sql):
def get_column_names(self, sql):
column_names = ''
if self.conn.is_connected:
self.cursor = self.conn.cursor()
self.cursor.execute(sql)
for result in self.cursor:
print(result)
column_names = self.cursor.column_names
if (self.conn and self.conn.is_connected()):
self.conn.commit()
return column_names
def execute_sql(self, sql, data=None):
if self.conn.is_connected():
self.cursor = self.conn.cursor()
try:
if self.cursor:
self.cursor.execute(sql)
if data:
self.cursor.execute(sql, data)
else:
self.cursor.execute(sql)
for result in self.cursor:
print(result)

View File

@ -1,7 +1,7 @@
import unittest
from tests.mysql_connection import MySqlConnection
from fake_data import FakeData
class MyTestCase(unittest.TestCase):
@ -16,11 +16,63 @@ class MyTestCase(unittest.TestCase):
def tearDownClass(cls):
print("Teardown class")
def get_insert_sql_query(self, col_names, column_length):
values_template = ''
for i in range(1, column_length + 1):
values_template += f" %s, "
return f"insert into employees ({col_names}) values({values_template.rstrip(', ')})"
def test_something(self):
conn = MySqlConnection()
conn.create_connection()
conn.execute_sql("select * from employees limit 1")
col_names = conn.get_column_names('select * from employees limit 1')
# conn.close()
sql_query = self.get_insert_sql_query(','.join(col_names), len(col_names))
print(sql_query)
x = range(1, 1000000)
for n in x:
fake_row = FakeData.get_fake_row(n)
print(fake_row)
conn.execute_sql(sql_query, fake_row)
conn.close()
#
# while True:
# n += 1
#
# fake_row = FakeData.get_fake_row(n)
# print(fake_row)
def insert_fake_records(self):
conn = MySqlConnection()
conn.create_connection()
row = {}
n = 0
while True:
n += 1
fake_row = FakeData.get_fake_row(n)
print(fake_row)
conn.execute_sql()
#
# cursor.execute(' \
# INSERT INTO `people` (first_name, last_name, email, zipcode, city, country, birthdate) \
# VALUES ("%s", "%s", "%s", %s, "%s", "%s", "%s"); \
# ' % (row[0], row[1], row[2], row[3], row[4], row[5], row[6]))
#
# if n % 100 == 0:
# print("iteration %s" % n)
# time.sleep(0.5)
# conn.commit()
if __name__ == '__main__':
unittest.main()