**Example for file-system-based (and hence 'external') recursive horizontal partitioning**

Copyright Jens Dittrich

Show statistics (line, word, and byte counts as reported by `wc`) for a given input file, in this case a list of geographical names
taken from http://download.geonames.org/export/dump

I artificially shuffled the data for educational purposes. In general, real data is not necessarily sorted. This notebook is about that more general case.

In [1]:
!wc geonames/DE.shuffled.txt


  197229 3336759 24267205 geonames/DE.shuffled.txt


show the first three rows of that file:

In [2]:
!head -3 geonames/DE.shuffled.txt

2938883	Darßer Ort	Darsser Ort		54.47313	12.50287	P	PPL	DE		12	00	13073	13073012	0		2	Europe/Berlin	2015-09-04
2863190	Niederlamitzhammer	Niederlamitzhammer		50.16478	11.98968	P	PPL	DE		02	094	09479	09479129	0		574	Europe/Berlin	2013-02-19
2869620	Moorriem	Moorriem		53.25	8.33333	L	AREA	DE		06				0		-2	Europe/Berlin	1994-01-08


In [3]:
# preview the first six rows of the input file in key/value format:
# key   = column 0 (the id; use row[2] instead to key on the ASCII name)
# value = columns 4 and 5 (the position)
# the with-statement guarantees that the file is closed even if a row is malformed
with open("geonames/DE.shuffled.txt", "r") as f:
    for count, row in enumerate(f):
        # this cell is only a preview: stop after six rows
        if count > 5:
            break
        row = row.split('\t')
        key = row[0]
        value = (row[4], row[5])
        print(key, "->", value)

2938883 -> ('54.47313', '12.50287')
2863190 -> ('50.16478', '11.98968')
2869620 -> ('53.25', '8.33333')
2844257 -> ('50.25418', '6.62014')
2895454 -> ('51.63333', '10.61667')
6486776 -> ('50.738', '7.10588')


In [4]:
# build a key/value pair from an original text row of the input file,
# keeping only the required attributes
def ASCIIName_Position(row, delimiter):
    """Return (name, position) for one raw input row.

    row:       one text line of the input file
    delimiter: separator used to split the line into columns

    The key is column 2 with surrounding double quotes removed; the value is
    the tuple (column 4, column 5). All parts are returned as strings.
    Raises IndexError if the row has fewer than six columns.
    """
    fields = row.split(delimiter)
    name = fields[2].strip('"')
    return name, (fields[4], fields[5])

# build a key/value pair keyed on the id (column 0) of a row:
def ID_ASCIIName(row, delimiter):
    """Return (id, position) for one raw input row.

    row:       one text line of the input file
    delimiter: separator used to split the line into columns

    Despite the function name, the value is NOT the ASCII name: the key is
    column 0 with surrounding double quotes removed, and the value is the
    tuple (column 4, column 5). All parts are returned as strings.
    Raises IndexError if the row has fewer than six columns.
    """
    fields = row.split(delimiter)
    identifier = fields[0].strip('"')
    return identifier, (fields[4], fields[5])

# parse one row of an intermediate (already partitioned) key/value file
def keyValueExtractor(row, delimiter):
    """Return (key, (value0, value1)) for one row of an intermediate file.

    row:       one text line with at least three delimiter-separated columns
    delimiter: separator used to split the line into columns

    Leading and trailing whitespace (including the trailing newline) is
    removed from each of the three parts. Raises IndexError if the row has
    fewer than three columns.
    """
    columns = row.split(delimiter)
    key, first, second = (columns[i].strip() for i in range(3))
    return key, (first, second)

# a partitioning function returning the <position>-th character from <inputString>
def radixPartitioningFunction(inputString, position):
    """Return the character of inputString that selects its partition.

    inputString: the (non-empty) key to partition
    position:    index of the character to use

    If inputString is too short to have a character at <position>, its last
    character is returned instead. Raises IndexError for an empty string.
    """
    index = position if position < len(inputString) else -1
    return inputString[index]

In [5]:
import os
from tqdm import tqdm

# directory (with trailing slash) in which the files of our index are created:
outputPath = 'myindex/'
# input file that gets partitioned:
sourceFile = "geonames/DE.shuffled.txt"

# main-memory that we are allowed to use for buffers, in bytes:
inputBufferSize = 10 * 1024 * 1024   # 10 MiB for reading the input file
outputBufferSize = 1024 * 1024       # 1 MiB for each output file that is written

In [6]:
# display the configured input buffer size in bytes (10*1024**2, i.e. 10 MiB):
inputBufferSize

10485760

In [7]:
# a single partitioning step

# notice that we explicitly keep the data in text format 
# this allows you to inspect the results of the algorithm easily
# for performance reasons it would be better to keep the data in binary format once it was parsed
def partitioningStep(outputPath, inputFileName, keyValueExtractorFunction, partitioningFunction, recursionLevel=0, delimiter='\t'):
    """Perform a single partitioning step: split one input file into one csv-file per partition.

    outputPath:                directory prefix (including the trailing '/') for the
                               partition files; it is created lazily, i.e. only once
                               the first row has to be written
    inputFileName:             text file to partition, read row by row
    keyValueExtractorFunction: callable (row, delimiter) -> (key, (value0, value1))
    partitioningFunction:      callable (lower-cased key, recursionLevel) -> partition
    recursionLevel:            passed through to partitioningFunction
    delimiter:                 column separator of the input file

    Rows with an empty key are reported on stdout and skipped. Every other row is
    appended as 'key,value0,value1' to '<outputPath><partition>.csv'. The key is
    lower-cased only for computing the partition; it is written unchanged.

    Returns (outputFiles, recursionLevel+1) where outputFiles maps each partition to
    (file object, file name, str(partition)). The file objects are already closed.

    The buffer sizes are taken from the module-level names inputBufferSize and
    outputBufferSize. All files opened here are closed even if the extractor or the
    partitioning function raises.
    """
    # dictionary of the output partitions and files created in this partitioning step
    outputFiles = {}
    try:
        # buffered read of the input file:
        with open(inputFileName, "r", buffering=inputBufferSize) as f:
            # for each text row in the input:
            for row in f:
                # extract the key,value-pair:
                key, value = keyValueExtractorFunction(row, delimiter)
                # ignore entries with an empty key:
                if key == "":
                    print("ignored row: ", row)
                    continue
                # compute the partition of the key:
                partition = partitioningFunction(key.lower(), recursionLevel)

                # first row of this partition: create its output file
                if partition not in outputFiles:
                    os.makedirs(outputPath, exist_ok=True)
                    outputPartition = outputPath + str(partition) + '.csv'
                    fout = open(outputPartition, "w", buffering=outputBufferSize)
                    outputFiles[partition] = (fout, outputPartition, str(partition))

                # write key/value-entry to corresponding output partition:
                outString = str(key) + ',' + str(value[0]) + ',' + str(value[1]) + '\n'
                outputFiles[partition][0].write(outString)
    finally:
        # close (and thereby flush) the output files, also on errors:
        for fout, _fileName, _partitionName in outputFiles.values():
            fout.close()
    # return the dictionary of output files and the recursion level,
    # handy for using this function in a recursion:
    return outputFiles, recursionLevel+1

In [8]:
# a simple prefix-index built on geonameid
import shutil

# remove the entire index (for demonstration purposes)
# so that this cell always starts from an empty output directory
if os.path.exists(outputPath):
    shutil.rmtree(outputPath)

# input file to index; the commented-out line below is an alternative input
sourceFile = "geonames/DE.shuffled.txt"
#sourceFile = "geonames/allCountries.txt"

# performs the initial partitioning step:
# note that, despite its name, <outputFiles> is the tuple returned by partitioningStep,
# i.e. (dictionary of output files, next recursion level)
outputFiles = partitioningStep(
    outputPath,
    sourceFile,
    ID_ASCIIName,
    radixPartitioningFunction)

# number of partitions created by the initial step:
print(len(outputFiles[0]))

7


In [9]:
# recursive partitioning:

# notice that we "simulate" recursion by collecting the return values of a single partitioning step in a queue
# this leads to a level-wise partitioning tree

# FIFO queue of (dictionary of output files, recursion level) results whose files
# still have to be inspected; it starts with the result of the initial partitioning step
queue = [outputFiles]
# the following cut off determines whether a file is further partitioned:
partitioningCutOff = 100*1024 # Bytes, i.e. 100 KiB
# partitioning results at this recursion level (or deeper) are not partitioned any further:
maxRecursionLevel = 10

while len(queue) > 0:
    # remove next entry from the queue:
    # (separate names are used so that <outputFiles> is not rebound while iterating)
    currentFiles, recursionLevel = queue.pop(0)

    # condition to stop the recursion based on the recursion level:
    if recursionLevel >= maxRecursionLevel:
        continue

    # loop over all output partitions found in <currentFiles>:
    for partition, fileInfo in currentFiles.items():
        # the partition file that may become the input of the next step:
        _inputPartitionFileName = fileInfo[1]
        # directory for its sub-partitions: the file name without the trailing '.csv'
        # that partitioningStep appended; cutting off only that suffix keeps this
        # correct even if the path contains further dots
        _outputPath = _inputPartitionFileName[:-len('.csv')] + '/'
        # get the file size of the input file:
        fileSize = os.stat(_inputPartitionFileName).st_size
        print(_inputPartitionFileName, fileSize)
        # skip further partitioning steps if the input is too small:
        if fileSize < partitioningCutOff:
            continue

        # call the partitioning function:
        subPartitions = partitioningStep(_outputPath,
                     _inputPartitionFileName,
                     keyValueExtractor,
                     radixPartitioningFunction,
                     recursionLevel,
                     delimiter=',')
        # remove the input file, its rows now live in the sub-partitions
        os.remove(_inputPartitionFileName)
        # append the result of this partitioning step to the queue
        # we need this if we want to recurse further
        queue.append(subPartitions)

myindex/2.csv 3861749
myindex/6.csv 367363
myindex/1.csv 401044
myindex/7.csv 81014
myindex/9.csv 39788
myindex/3.csv 75672
myindex/8.csv 60885
myindex/2/9.csv 1478180
myindex/2/8.csv 2382916
myindex/2/7.csv 578
myindex/2/6.csv 49
myindex/2/0.csv 26
myindex/6/4.csv 33574
myindex/6/5.csv 291641
myindex/6/3.csv 912
myindex/6/6.csv 7524
myindex/6/9.csv 30225
myindex/6/2.csv 3462
myindex/6/1.csv 25
myindex/1/1.csv 302159
myindex/1/0.csv 86568
myindex/1/2.csv 12292
myindex/1/5.csv 25
myindex/2/9/3.csv 246449
myindex/2/9/4.csv 245923
myindex/2/9/5.csv 245667
myindex/2/9/1.csv 247608
myindex/2/9/0.csv 245827
myindex/2/9/2.csv 246706
myindex/2/8/6.csv 246470
myindex/2/8/4.csv 246592
myindex/2/8/9.csv 246153
myindex/2/8/5.csv 247914
myindex/2/8/1.csv 247060
myindex/2/8/8.csv 246252
myindex/2/8/3.csv 246660
myindex/2/8/7.csv 246946
myindex/2/8/2.csv 246743
myindex/2/8/0.csv 162126
myindex/6/5/4.csv 42737
myindex/6/5/5.csv 213407
myindex/6/5/2.csv 10501
myindex/6/5/3.csv 3255
myindex/6/5/1.csv 83

In [10]:
# from http://code.activestate.com/recipes/217212-treepy-graphically-displays-the-directory-structur/
from os import listdir, sep
from os.path import abspath, basename, isdir
from sys import argv

def tree(dir, padding, print_files=False):
    """Print the directory structure below <dir> as an indented text tree.

    dir:         directory to print
    padding:     prefix printed in front of the entries; it grows by two
                 characters per nesting level
    print_files: if True, all directory entries are printed; otherwise only
                 sub-directories

    Entries appear in the order returned by listdir.
    """
    print(padding[:-1] + '+' + basename(abspath(dir)) + '/')
    child_padding = padding + ' '
    entries = listdir(dir)
    if not print_files:
        entries = [name for name in entries if isdir(dir + sep + name)]
    last_index = len(entries) - 1
    for index, name in enumerate(entries):
        path = dir + sep + name
        if not isdir(path):
            print(child_padding + '+' + name)
            continue
        # the last entry of a directory gets no vertical connector
        connector = ' ' if index == last_index else '∣'
        tree(path, child_padding + connector, print_files)

In [11]:
# print the partitioning tree created by the recursive partitioning:
# note that print_files=True is passed, so the leaf nodes, i.e. the csv-files, are printed as well
tree('myindex',' ', True)

+myindex/
  +7.csv
  +3.csv
  +6/
  ∣ +6.csv
  ∣ +4.csv
  ∣ +1.csv
  ∣ +3.csv
  ∣ +2.csv
  ∣ +9.csv
  ∣ +5/
  ∣   +4.csv
  ∣   +0.csv
  ∣   +1.csv
  ∣   +3.csv
  ∣   +2.csv
  ∣   +5/
  ∣     +6.csv
  ∣     +7.csv
  ∣     +5.csv
  ∣     +4.csv
  ∣     +0.csv
  ∣     +1.csv
  ∣     +3.csv
  ∣     +2.csv
  ∣     +9.csv
  ∣     +8.csv
  +1/
  ∣ +5.csv
  ∣ +0.csv
  ∣ +2.csv
  ∣ +1/
  ∣   +6.csv
  ∣   +7.csv
  ∣   +5.csv
  ∣   +4.csv
  ∣   +0.csv
  ∣   +1.csv
  ∣   +3.csv
  ∣   +2.csv
  ∣   +9/
  ∣   ∣ +6.csv
  ∣   ∣ +5.csv
  ∣   ∣ +0.csv
  ∣   ∣ +1.csv
  ∣   ∣ +2.csv
  ∣   ∣ +7/
  ∣   ∣ ∣ +6.csv
  ∣   ∣ ∣ +7.csv
  ∣   ∣ ∣ +5.csv
  ∣   ∣ ∣ +4.csv
  ∣   ∣ ∣ +3.csv
  ∣   ∣ ∣ +2.csv
  ∣   ∣ ∣ +9.csv
  ∣   ∣ ∣ +8.csv
  ∣   ∣ +9.csv
  ∣   ∣ +8.csv
  ∣   +8.csv
  +9.csv
  +8.csv
  +2/
    +6.csv
    +7.csv
    +0.csv
    +9/
    ∣ +0/
    ∣ ∣ +6.csv
    ∣ ∣ +7.csv
    ∣ ∣ +5.csv
    ∣ ∣ +4.csv
    ∣ ∣ +0.csv
    ∣ ∣ +1.csv
    ∣ ∣ +3.csv
    ∣ ∣ +2.csv
    ∣ ∣ +9.csv
    ∣ ∣ +8.csv
    ∣ +1/
    ∣

In [12]:
# simple recursive prefix search:
def search(searchString, path=outputPath, recursionDepth=0):
    """Return all index entries whose key starts with <searchString>.

    searchString:   key prefix to look for
    path:           index directory (including the trailing '/') to search in
    recursionDepth: position of the character of searchString that selects the
                    sub-directory or partition file at this level

    Descends one directory level per character of searchString until no matching
    sub-directory exists, then scans the matching partition file.

    Returns a tuple (result, message): result is a set of (key, (value0, value1))
    entries, message describes where the result came from. If searchString is
    used up before a partition file is reached, an empty set and a warning are
    returned.

    Every row of the partition file is examined. Directories and files are
    selected by the lower-cased search string, because the partitions were
    created from lower-cased keys; the final prefix comparison is case-sensitive.
    """
    print ("search", path, recursionDepth)
    # partitions were built on key.lower(), so navigate with the lower-cased string
    partitionKey = searchString.lower()
    if len(partitionKey) <= recursionDepth:
        return set({}), "WARNING: entire directory "+path+" qualifies! Result omitted!"
    directory = path + partitionKey[recursionDepth]
    if os.path.exists(directory):
        # this prefix was partitioned further: descend one level
        return search(searchString, directory+"/", recursionDepth+1)

    file = directory +".csv"
    if not os.path.exists(file):
        return set(), "no result found" # empty set, i.e. no results found

    result = set()
    # the with-statement closes the file even if a row cannot be parsed
    with open(file, "r") as f:
        for row in f:
            # parse the row delivered by the loop itself; an additional
            # f.readline() here would skip every other row
            key, value = keyValueExtractor(row, ",")
            if key.startswith(searchString):
                result.add((key, value))
    return result, "results retrieved from last matching file leave " + file
            
# example: look up a single key; the cell displays the returned (result set, message) tuple
search ("11974229")

search myindex/ 0
search myindex/1/ 1
search myindex/1/1/ 2
search myindex/1/1/9/ 3
search myindex/1/1/9/7/ 4


({('11974229', ('48.4165', '12.81853'))},
 'results retrieved from last matching file leave myindex/1/1/9/7/4.csv')