Improved CSV import speed by several orders of magnitude.

This commit is contained in:
Eevee 2009-07-31 00:03:02 -07:00
parent 7566351ce1
commit e8ed55c297

View file

@ -27,8 +27,7 @@ def command_csvimport(engine_uri, directory='.'):
from sqlalchemy.orm.attributes import instrumentation_registry
# Use autocommit in case rows fail due to foreign key incest
session = connect(engine_uri, autocommit=True, autoflush=False)
session = connect(engine_uri)
metadata.create_all()
@ -74,11 +73,18 @@ def command_csvimport(engine_uri, directory='.'):
reader = csv.reader(csvfile, lineterminator='\n')
column_names = [unicode(column) for column in reader.next()]
# Self-referential tables may contain rows with foreign keys of
# other rows in the same table that do not yet exist. We'll keep
# a running list of these and try inserting them again after the
# rest are done
failed_rows = []
# Self-referential tables may contain rows with foreign keys of other
# rows in the same table that do not yet exist. Pull these out and add
# them to the session last
# ASSUMPTION: Self-referential tables have a single PK called "id"
deferred_rows = [] # ( row referring to id, [foreign ids we need] )
seen_ids = {} # primary key we've seen => 1
# Fetch foreign key columns that point at this table, if any
self_ref_columns = []
for column in table_obj.c:
if any(_.references(table_obj) for _ in column.foreign_keys):
self_ref_columns.append(column)
for csvs in reader:
row = table_class()
@ -101,33 +107,44 @@ def command_csvimport(engine_uri, directory='.'):
setattr(row, column_name, value)
try:
session.add(row)
session.flush()
except IntegrityError, e:
failed_rows.append(row)
# May need to stash this row and add it later if it refers to a
# later row in this table
if self_ref_columns:
foreign_ids = [getattr(row, _.name) for _ in self_ref_columns]
foreign_ids = [_ for _ in foreign_ids if _] # remove NULL ids
# Loop over the failed rows and keep trying to insert them. If a loop
# doesn't manage to insert any rows, bail.
do_another_loop = True
while failed_rows and do_another_loop:
do_another_loop = False
if not foreign_ids:
# NULL key. Remember this row and add as usual.
seen_ids[row.id] = 1
for i, row in enumerate(failed_rows):
try:
session.add(row)
session.flush()
elif all(_ in seen_ids for _ in foreign_ids):
# Non-NULL key we've already seen. Remember it and commit
# so we know the old row exists when we add the new one
session.commit()
seen_ids[row.id] = 1
# Success!
del failed_rows[i]
do_another_loop = True
except IntegrityError, e:
pass
else:
# Non-NULL future id. Save this and insert it later!
deferred_rows.append((row, foreign_ids))
continue
if failed_rows:
print len(failed_rows), "rows failed"
else:
print 'loaded'
session.add(row)
session.commit()
# Attempt to add any spare rows we've collected
for row, foreign_ids in deferred_rows:
if not all(_ in seen_ids for _ in foreign_ids):
# Could happen if row A refers to B which refers to C.
# This is ridiculous and doesn't happen in my data so far
raise ValueError("Too many levels of self-reference! "
"Row was: " + str(row.__dict__))
session.add(row)
seen_ids[row.id] = 1
session.commit()
print 'loaded'
def command_csvexport(engine_uri, directory='.'):
import csv