A while ago I posted about a way to upload CSVs into a PostgreSQL database (or Google BigQuery) to aggregate them. However, purely out of programming interest, I wanted to walk through an example of how this can be done in Python – assuming we can’t even open the CSV (it is too big to fit into RAM) – using generators.

It may help to remember that (in Python):

  • Iteration – the process of taking each item from a collection, one after another (e.g. using a for loop)
  • Iterable – an object with an __iter__ method that returns an iterator (e.g. x = ['a','b','c'] is an iterable and we can loop through it)
  • Iterator – a helper object with a __next__() method that produces the next value (e.g. y = iter(x))

This means that a basic loop such as “for element in my_list” takes an iterable and calls its __iter__ method to produce an iterator, which the loop then steps through (behind the scenes, next() is called on the iterator until it is exhausted).

  • Generators – a special kind of iterator. A generator function produces a value with the ‘yield’ keyword and remembers its state, so that once the yielded value has been dealt with it can yield another, and another. A generator expression is the generator equivalent of a list comprehension. (See the short sketch below.)
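As a minimal sketch of these pieces (the variable names are just illustrative):

x = ['a', 'b', 'c']      # an iterable
y = iter(x)              # an iterator over it
print(next(y))           # 'a'
print(next(y))           # 'b'

# A generator expression: values are produced lazily, one at a time
squares = (n * n for n in range(5))
print(next(squares))     # 0
print(list(squares))     # [1, 4, 9, 16] - the remaining values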

It may also help to remember the old range() and xrange() functions in Python 2.7 – range() would build and return a list of all the values for us to iterate through, while xrange() would only kick out one value at a time, then forget it and produce the next – this way the full list was never stored in memory (only one piece at a time).

def our_range(start, stop, step=1):
    """ Produces an iterable list of ALL the values,
    and returns them at once. Then the function is finished"""
    numbers = []
    while start < stop:
        numbers.append(start)
        start += step
    return numbers

def our_xrange(start, stop, step=1):
    """ Yield the first value in the list, which can be processed,
    then yields the next and the next. This doesn't load the whole
    list into memory but kind of 'streams' it, piece by piece """
    while start < stop:
        yield start
        start += step

# Test on 100 million:
def test_range():
    for i in our_range(1,100000000):
        pass
%timeit test_range()
#1 loops, best of 3: 14.7 s per loop

def test_xrange():
    for i in our_xrange(1,100000000):
        pass
%timeit test_xrange()
#1 loops, best of 3: 9.08 s per loop

This means that if we have a huge, huge, huge CSV (100 GB) we do not need to load it into memory and then iterate through it. We can just tackle it row by row. Consider the below two functions:

import os

def GetCsvs(location):
    """ Generator function that yields the path to each file in a directory
    (os.listdir() returns the list of file names for us) """
    for f in os.listdir(location):
        yield os.path.join(location, f)

def ReadData(location, cols):
    """ Generator function that yields one line at a time from each CSV """
    for filename in GetCsvs(location):
        print("Parsing: {0}".format(filename))
        with open(filename) as fd:
            for line in fd:
                yield line

The flow of execution is like so: ReadData() calls GetCsvs(), which walks the file names returned by os.listdir() and yields one file path; GetCsvs() then freezes and lets ReadData() open that file. ReadData() reads a line from the file and yields it to be processed. Once that line has been processed, it yields another, and so on.
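As a quick illustration of this laziness, nothing is read until we iterate and only one line sits in memory at a time. In the sketch below, itertools.islice simply caps how many lines we pull, and 'H:/Example' is the sample directory used later:

from itertools import islice

# Peek at the first five lines only; the rest of the files are never touched
for line in islice(ReadData('H:/Example', cols=None), 5):
    print(line.rstrip())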

This means we have a way to tackle files which are too big to open!

Let’s imagine we have two such files:

[image: csv_a]

And another:

[image: csv_b]

And we would like to aggregate all of this into one assuming:

# Path to Output
p = 'H:/py_dic_aggregation'
# Path to CSVs
l = 'H:/Example'
prefix = "ex_"
# All columns of CSV
c = ['month','time','vehicle','status','val_a','val_b','val_c']
# Columns to collapse on
s = ['month','time','vehicle']
# Collapse values
a = ['val_a','val_b','val_c']

We can see, for example, that the key (Aug, morning, car) occurs in both CSVs – we would like to see the aggregated values of (108, 10330694, 0.8) for it.
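In the dictionary version below, that aggregation ends up keyed on a tuple, roughly like this (a sketch of the intended structure, using the figures quoted above; the count is illustrative):

my_dict[('Aug', 'morning', 'car')] = {'val_a': 108,        # summed across both CSVs
                                      'val_b': 10330694,
                                      'val_c': 0.8,
                                      'count': 2}          # number of rows carrying this key (illustrative)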

The code below produces the following output:

[image: result]

The script uses generators to process one line at a time, before sending it to a dictionary.

Python Script – Dictionary Version:

import os
import csv
import time

def GetCsvs(location, prefx):
    """ Generator function that yields the path to each matching CSV
    (os.listdir() returns the list of file names for us) """
    for f in os.listdir(location):
        if f.endswith(".csv") and f.startswith(prefx):
            yield os.path.join(location, f)

def ReadData(location, cols, prefx):
    """ Generator function to return rows from a CSV, given a directory
    location to the CSV files """
    for filename in GetCsvs(location, prefx):
        print("Parsing: {0}".format(filename))
        # None of the CSVs have headers
        with open(filename) as fd:
            for line in fd:
                data = line.split('~') # Delim:  ~
                # Remove quotes and blanks
                row = {y:x.strip('"').strip() for x,y in zip(data, cols)}
                yield row
        print("Finished CSV File.")

def ParseData(location, cols, agg_cols, agg_vals, prefx):
    """ Iterate through generators and fill a dictionary to aggregate data """
    my_dict = {}
    counter = 0
    for row in ReadData(location, cols, prefx):
        # Date "YYYYMMDD" converted to "YYYYMM" (only applies if a
        # PURCHASE_DATE column is among the grouping columns)
        key = tuple([row[x] if x != "PURCHASE_DATE" else row[x][:6]
                     for x in agg_cols])
        try:
            vals = [float(row[x]) for x in agg_vals]
            comb = zip(agg_vals + ["count"], vals + [1])
        except ValueError:
            print("Missing: {0}".format(row))
            continue  # skip rows with missing/non-numeric values
        if key not in my_dict:
            my_dict[key] = {x:y for x, y in comb}
        else:
            for x, y in comb:
                my_dict[key][x] += y
        counter += 1
        if counter % (1000*1000) == 0:
            print("{0} million done".format(counter/1000000))

    print("All Done.")
    return my_dict

def DictCsv(sdict, by_headers, collapse_headers, fname):
    """ Save the aggregation dictionary to a CSV """
    with open(fname, 'w', newline='') as f:
        writer = csv.writer(f)
        writer.writerow(list(by_headers) + collapse_headers)
        for ke in sdict:
            writer.writerow(list(ke) + [sdict[ke][y] for y in collapse_headers])

if __name__ == '__main__':
    stime = time.time()
    # Aggregate CSVs
    dict_aggregated = ParseData(location=l,
                                cols=c,
                                agg_cols=s,
                                agg_vals=a,
                                prefx=prefix)
    # Save dictionary to CSV
    DictCsv(sdict=dict_aggregated,
            by_headers=s,
            collapse_headers=a+["count"],
            fname=p+'.csv')

    print("Total duration: %.2f minutes" % ((time.time() - stime)/60))

The run-statistics:

Experiment on 3 x 13GB CSVs: (39GB Total, 109 million rows)
Aggregated output: 2.76 MB, 19980 rows
Max RAM Use: 54MB
Total duration: 36.96 minutes

The Pandas library has a very useful feature that is similar to this: pandas.read_csv(chunksize=x). This returns an iterator (a TextFileReader object), which we can loop through to process the CSV x rows at a time.
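For example, a minimal sketch (the file name and chunk size here are just placeholders):

import pandas as pd

# Read the file 100,000 rows at a time; each chunk arrives as a DataFrame
for chunk in pd.read_csv('some_file.csv', chunksize=100000):
    print(chunk.shape)   # (100000, n_cols) for every chunk except possibly the last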

Let us consider re-doing the above with Pandas (I opt for a chunksize of 5 million rows).

Python Script – Pandas Version:

import os
import csv
import time
import pandas as pd

def GetCsvs(location, prefx):
    """ Generator function that yields the path to each matching CSV
    (os.listdir() returns the list of file names for us) """
    for f in os.listdir(location):
        if f.endswith(".csv") and f.startswith(prefx):
            yield os.path.join(location, f)

def ReadData(location, cols, chunkrows, prefx):
    """ Generator function to return rows from a CSV, given a directory
    location to the CSV files """
    for filename in GetCsvs(location, prefx):
        print("Parsing: {0}".format(filename))
        yield pd.read_csv(filename,
                          sep='~',
                          names=cols,
                          header=None,
                          iterator=True,
                          chunksize=chunkrows,
                          encoding='utf-8')
        print("Finished CSV File.")

def ParseData(location, cols, agg_cols, agg_vals, chunkrows, prefx, fname):
    """ Iterate through chunks of the CSV, aggregate them and then save them
    to a CSV on disk. The larger the chunksize the faster the operation provided
    it fits into RAM, however saving the aggregated sum does free up a bit of RAM.
    The aggregated CSV has to be aggregated a second time - but all in one go since
    groups could have been split across CSVs or even chunks in the same CSV."""
    counter = 0
    with open(fname, 'a') as f:
        writer = csv.writer(f, delimiter=',', lineterminator='\n')
        writer.writerow(agg_cols + agg_vals)  # Write header (agg_vals already includes COUNT)
        for chunks in ReadData(location, cols, chunkrows, prefx):
            for df in chunks:
                # Generating count
                df['COUNT'] = 1
                # Date "YYYYMMDD" converted to "YYYYMM" (only if the data
                # actually carries a PURCHASE_DATE column)
                if 'PURCHASE_DATE' in df.columns:
                    df['PURCHASE_DATE'] = df['PURCHASE_DATE'].astype(str).str[:6]
                agg = df.groupby(agg_cols)[agg_vals].sum()
                agg.to_csv(f, index=True, header=None, sep=',')
                counter += df.shape[0]
                if counter % (1000*1000) == 0:
                    print("{0} million done".format(counter/1000000))

if __name__ == '__main__':

    a += ['COUNT']
    stime = time.time()
    # Step 1: Aggregate CSVs by Chunk
    # i.e. create a buffer CSV
    ParseData(location=l,
              cols=c,
              agg_cols=s,
              agg_vals=a,
              chunkrows=1000*5000,
              prefx=prefix,
              fname=p+'_interim.csv')

    # Step 2: Aggregate the chunks
    # Load interim file
    interim = pd.read_csv(p+'_interim.csv',
                          sep=',',
                          header=0)
    # Save output file
    interim.groupby(s)[a].sum().to_csv(p+'.csv',
               index=True,
               header=True,
               sep=',')

    print("Total duration: %.2f minutes" % ((time.time() - stime)/60))

The run-statistics:

Experiment on 3 x 13GB CSVs: (39GB Total, 109 million rows)
Aggregated output: 2.76 MB, 19980 rows
Max RAM Use: 5GB
Total duration: 13.69 minutes

So this method is quite a bit faster – since we are processing not 1 row at a time but 5 million (still small enough to fit into RAM). The only extra complication is that a final aggregation has to be run over the produced ‘interim’ file, to account for groups being split across CSVs, or even across chunks within the same CSV.
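To see why the second pass is needed, here is a tiny sketch with made-up partial sums: if the same key lands in two different chunks, the interim file holds two partial rows for it, and only the final groupby collapses them.

import pandas as pd

# Two partial aggregates for the same key, as they might appear in the interim file
interim = pd.DataFrame({'month':   ['Aug', 'Aug'],
                        'time':    ['morning', 'morning'],
                        'vehicle': ['car', 'car'],
                        'val_a':   [60, 48]})
print(interim.groupby(['month', 'time', 'vehicle'])['val_a'].sum())
# val_a collapses to 108 for the key (Aug, morning, car)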

The two methods above are just two examples of using generators to accomplish tasks that would otherwise be impossible to handle in memory.