Original:
http://www.caerwyn.com/ipn/2005/07/lab-35-geryon-another-inferno-grid.html
http://www.caerwyn.com/ipn/2005/07/lab-36-geryons-registry.html
http://caerwyn.com/ipn/2005/07/lab-37-geryons-mapreduce.htm
http://caerwyn.com/ipn/2005/08/lab-38-geryons-data-sets.html

Introduction

Grid computing is everywhere, it seems, and one of the obvious applications of Inferno is grid computing. As usual, I know little about it aside from reading a few papers to get me enthused enough to make some effort in that area. If I read all the current research I'd have little time to do the programming. So instead I'll jump in and see if I can swim. I'm trying to set up a simple grid, with tools for MapReduce functionality. I'm calling this system Geryon (because things need names).

This first stab is to see what can be done with the pieces already available in the system. And from this working prototype find out what pieces need to be written to fill in the gaps or improve it in any way.

Geryon is too large to cover in one blog entry, so I'll be posting it piecemeal. I'll set up a wiki page to cover the whole setup and running of Geryon as I get further along.

Overview

I'll start with an overview of the system. I have a small number of machines running a mixed set of operating systems: Plan 9, Linux, and Windows. Each machine runs one or more emu instances. One instance runs the registry. All emu instances must use that registry service, which is key to the operation of Geryon. Every emu instance must run one rstyxd service for the cpu command. Every service gets registered. So all computation is distributed by using the cpu command to send command blocks.

For storing data, I use a large number of chunks, which are files of equal size (64MB), and each file is used as the storage for kfs, which is registered as a styx service.

Only one emu instance, the master, holds an ndb file that maps registered chunk services to "disk" names. This is edited manually. A disk name is of the form dn.app, where n is the number of the disk and app is an application name, e.g. d0.mbox. An application's data is assumed to be quite regular in its layout on the disk and to span many disks. Running a modified cpu command takes the form

% rcpu {ls /n/d0.mbox} d0.mbox

where rcpu runs cpu on a node of its choosing, mounts the disk, and runs the given command block. The disk can be mounted from any node in the cluster by looking up its address in the registry. rcpu also uses the registry to find the list of available cpu nodes.

As well as there being nodes storing application data, each cpu has a set of registered disks for storing the output from commands running locally, e.g., d0.cpu ... dn.cpu. rcpu automatically mounts a disk local to the cpu node running the command block. On top of this shell function I built a mapreduce shell function of the form,

% (mapreduce {idx /n/d?.mbox/*/*/*}
        {agg} d0.mbox d1.mbox d2.mbox d3.mbox)

The mapreduce command takes a map command block, a reduce command block, and a list of disks as input. Mapreduce uses rcpu to run the map command block on each disk on a different cpu (selected in round-robin fashion). The mapper generates a key-value pair for all data stored in the files, which it navigates on its own. The reducer command block takes as input the key-value pairs, which are already sorted, and performs a reduce function, writing another key-value pair to stdout (see the MapReduce paper for a clearer explanation). The return value from mapreduce is the list of disks where all the output files are stored. Similar to the description in the MapReduce paper, an intermediate hash function is used to partition the data output from the mapper. The data is collected and sorted for each partition before being sent to the reducer. The command lines for these are nothing more than, e.g.,

% {mapper} |intermediate 5 /n/d0.cpu/$job.inter.

where intermediate runs a hash function on input and writes to output files with a prefix given in the second argument, and

% cat /n/d?.cpu/$job.inter.0 | sort | 
     {reducer} > /n/d1.cpu/$job.partion.0
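
The partitioning step can be modelled in a few lines of Python. This is only a sketch of the idea, not the intermediate command itself (which is an Inferno program writing to files under a prefix). The CRC-32 hash, the tab-separated "key<TAB>value" line format, and the names partition_of, partition_lines and sorted_partition are all assumptions made for illustration, and partitions are kept in memory rather than written to disk.

```python
import zlib
from collections import defaultdict

def partition_of(key: str, nparts: int) -> int:
    """Map a key to a partition number in [0, nparts) with a stable hash."""
    # zlib.crc32 is deterministic across runs, unlike Python's built-in hash().
    return zlib.crc32(key.encode("utf-8")) % nparts

def partition_lines(lines, nparts):
    """Group 'key<TAB>value' lines by the partition their key hashes to."""
    parts = defaultdict(list)
    for line in lines:
        key = line.split("\t", 1)[0]
        parts[partition_of(key, nparts)].append(line)
    return parts

def sorted_partition(parts, n):
    """Return partition n sorted, which is how the reducer expects to see it."""
    return sorted(parts.get(n, []))
```

Because the partition depends only on the key, every line for a given key lands in the same partition no matter which mapper produced it, so one reducer sees all values for that key.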

Notice that for the reducer all the cpu disks containing intermediate files are mounted. So a cat across a number of disks is performed as

% rcpu {cat /n/*/filename} d0.cpu d1.cpu d2.cpu d3.cpu 

I'll describe the shell code in more detail in the next post, and link to a web page describing the whole setup.

Registry

A critical piece of Geryon is the registry of disks and cpus. One of the immediate problems to deal with when setting up a grid cluster is the fact that nodes come and go, including the one holding the registry.

To deal with one aspect of this, the registry restarting, I modified the grid/register and grid/reglisten commands from Inferno to monitor the availability of the registry, remount it if necessary, and re-announce the service.

I use grid/register for the disk services. Here is an example of how I create a new chunk and register a new kfs disk.

% zeros 1024 65536 > /chunks/chunk0
% (grid/register -a id chunk0 
{disk/kfs -rPW /chunks/chunk0})

I do this a number of times on each node that has spare disk capacity. A simple script registers all disks when I restart a node. All these disks will appear as services in the registry identified as tcp!hostname!port with some attributes including the chunk identifier, which should be unique for the host.

The next step is to name each disk. For this I use ndb. I add a file, /lib/ndb/cluster, to the ndb database with entries of the form

name=d0.mbox kfs 
  master=host0!chunk0 
  replica=d0.replica

name=d1.mbox kfs 
  master=host0!chunk1 
  replica=d1.replica

name=d0.replica kfs 
  master=host1!chunk0 replica

name=d1.replica kfs 
  master=host1!chunk1 replica

The first field is the disk name, which is unique for the cluster. The master is the chunk running on the host that serves kfs for this disk. The replica field identifies a backup disk. I hope in the future to make it possible to dynamically switch between master and replicas and use replicas during computation. But I'll skip it for now. I replicate disks by using Inferno's applylog and updatelog tools.

Once this is all in ndb I can run a script that will update the registry with the disk names.

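# Publish every kfs disk named in ndb to the registry.
fn refresh {
 # all disk names that carry the kfs attribute
 names=`{ndb/query -a  kfs '' name}

 for (i in $names) {
  # master has the form host!chunk
  (host chunk) = ${split ! `{ndb/query name $i master}}
  addr = `{ndb/regquery -n name $host id $chunk}
  # rewrite the entry if it is already registered, otherwise create it
  if {ftest -e /mnt/registry/ ^$i} {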
   (echo host $host automount 1 persist 1 
     addr $addr replica 
     `{ndb/query name $i replica}> /mnt/registry/^$i)
  } {
   (echo $i host $host automount 1 persist 1
     addr $addr replica
     `{ndb/query name $i replica}> /mnt/registry/new)
  }
 }
}

This needs to be run whenever the ndb file or the list of registered services changes, so ideally it should be automatic. I can quite easily see that happening, either by building an ndb file inside the registry and having it respond to changes, or by implementing an events file in the registry and attaching a process to that. This is a problem to work on later.

Once registered, these disks can be used from any node within the cluster. For example, I use a shell function to take a disk name and mount it as /n/diskname,

% fn rmnt {
 for (file in $*) {
  (disk rest ) := `{cat /mnt/registry/$file}
  while {! ~ $#rest 0} {
   (name val tail) := $rest
   if { ~ $name 'addr'} {mount -c $val /n/ ^ $disk}
   rest = $tail
  }
 }
}
% rmnt d0.mbox d1.mbox
% ls /n/d?.mbox
...
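
For readers less used to Inferno sh list handling, the walk that rmnt does over a registry entry can be sketched in Python. This is an illustration under assumptions, not part of Geryon: it takes the entry to be a service name followed by attribute/value pairs, as the refresh script writes them, and the function names and the sample address are hypothetical.

```python
def parse_registry_entry(text: str):
    """Split 'service attr val attr val ...' into (service, {attr: val})."""
    fields = text.split()
    if not fields:
        raise ValueError("empty registry entry")
    service, rest = fields[0], fields[1:]
    # Pair up the remaining fields; a trailing unpaired attribute gets "".
    attrs = {}
    for i in range(0, len(rest), 2):
        attrs[rest[i]] = rest[i + 1] if i + 1 < len(rest) else ""
    return service, attrs

def mount_point(text: str):
    """Return (address, '/n/<service>') if the entry has an addr attribute."""
    service, attrs = parse_registry_entry(text)
    if "addr" not in attrs:
        return None
    return attrs["addr"], "/n/" + service

# Hypothetical entry; the address is made up for the example.
entry = "d0.mbox host host0 automount 1 persist 1 addr tcp!host0!1234 replica d0.replica"
print(mount_point(entry))
```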

To register a cpu service,

% grid/reglisten -r svc rstyx 'tcp!*!0' {runas $user auxi/rstyxd&}

This will announce on a new address and use that address as the service name in the registry. We can then get a list of the addresses of all cpu services:

% ndb/regquery -n svc rstyx

For both grid/register and grid/reglisten, the service names are automatically removed from the registry once the process exits. All connections are authenticated. For the kfs disks, they should all use the same /adm/users file, something that should be copied onto the disk when it is initialized, so that permissions are enforced consistently across the cluster.

So far we have all the services we need announced dynamically. We have a naming scheme and the infrastructure for running code anywhere in the cluster. What remains is the shell code to tie it together to build a simple mapreduce.

Mapreduce

I have a registry of kfs disks and cpus in my cluster, and the registry is mounted on every node. Upon this infrastructure I've written some shell scripts to simulate a MapReduce function.

I want to quantize and bound the data and the computation. To that end, a process works on only one 64MB disk for input, and may write to another disk for output. The splitting of the data is also crucial to the parallelization. Each input disk can be handled concurrently on any cpu node in the cluster. The namespace for a process is built from finding resources in the registry.

Because there are several cpus available to run a command block, I choose the cpu in round robin using a shell function to populate a list of the available cpu nodes, and take the head of the list with each subsequent call until the list is empty. [1]

cpulist=()
subfn nextcpu {
  if {~ $#cpulist 0} {
    cpulist=`{ndb/regquery -n svc rstyx}
  }
  result = ${hd $cpulist}
  cpulist = ${tl $cpulist}
}
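
The same refill-and-take-the-head logic, written as a small Python class purely for illustration. The class name CpuPicker and the query callable (standing in for `ndb/regquery -n svc rstyx`) are assumptions; the addresses in the usage lines are made up.

```python
class CpuPicker:
    """Hand out cpu addresses one at a time, refilling when the list runs out."""

    def __init__(self, query):
        # query is a callable returning the current list of cpu addresses.
        self.query = query
        self.cpulist = []

    def nextcpu(self):
        if not self.cpulist:
            # List exhausted: ask the registry again, picking up any changes.
            self.cpulist = list(self.query())
        if not self.cpulist:
            raise RuntimeError("no cpu services registered")
        return self.cpulist.pop(0)

picker = CpuPicker(lambda: ["tcp!host0!6001", "tcp!host1!6001"])
print([picker.nextcpu() for _ in range(3)])
```

Refilling only when the list is empty means the registry is queried once per round, so a node that appears or disappears is noticed at the start of the next round rather than immediately.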

I do a similar thing for finding a disk local to the cpu for output files. Finddisk takes the host name and returns the head of the list of available disks on that host.

Together these two functions are pretty much all the intelligence in selecting nodes for work and allocating disks. Obviously, more could be done.

Now for the definition of rcpu. Using the above functions, it picks the next cpu and output disk. Then it constructs a command block to send to that node. The command block does some initialization, such as mounting the disks and running a gridrc file from the user's lib directory. As part of the rstyx protocol the client device exports its namespace, which is mounted at /n/client; this provides an easy way of distributing shell and dis code to every node. The gridrc file includes the shell function definition for rmnt. [lab 36] The constructed block contains the command block passed as argument to the function. It runs in the background and returns the name of the cpu disk it allocated for output.

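rpid=()
# rcpu cmd disk...: run cmd on the next cpu with the named disks mounted
fn rcpu {
 cmd:=$1
 disk:=${tl $*}
 cpu:=${nextcpu}
 # cpu addresses have the form net!host!port
 (net host port) := ${split '!' $cpu}
 # put a disk local to that host first; it receives the output
 disk = ${finddisk $host} $disk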
 s=${parse '{cpudisk=/n/' ^ ${hd $disk} ^'
 run /n/client/usr/caerwyn/lib/gridrc
 rmnt ' ^ $"disk ^ ' 
 ' ^ $cmd ^ '}'}
 cpu $cpu sh -c $s &
 rpid=$apid $rpid
 echo ${hd $disk}
}

From this building block I construct the mapreduce (see the file gridlib for its definition). Just to remind you of how it is invoked,

% (mapreduce {idx /n/d?.mbox/*/*/*} {agg} 
   d0.mbox d1.mbox d2.mbox d3.mbox)

This example command counts the word frequency of all files on all mbox disks. An mbox disk is a kfs filesystem containing one text-only mail message per file. The example map command block uses shell pattern matching to traverse the disk and read every file. It writes to standard output a key-value pair, which in this case is a word and the value "1".
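
What a mapper like idx emits can be sketched in Python as follows. This is not the idx command; the tokenizing regular expression and the lowercasing are assumptions, since the text only says that each word is written with the value "1".

```python
import re

# Assumed notion of a "word": runs of letters, digits and apostrophes.
WORD = re.compile(r"[A-Za-z0-9']+")

def idx(text: str):
    """Yield a (word, 1) pair for every word in text, lowercased."""
    for word in WORD.findall(text):
        yield word.lower(), 1

for key, value in idx("The cat; the hat"):
    print(f"{key}\t{value}")
```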

The reduce block is a filter that reads key-value lines from stdin and writes key-values to stdout. In this case, agg sums the value field for each distinct key then writes the word and the total word count.
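
A reducer in the spirit of agg can be sketched in Python with itertools.groupby. It is an illustration only: the tab-separated "key<TAB>value" line format is an assumption, and, like the real reducer, it relies on its input already being sorted by key.

```python
from itertools import groupby

def agg(lines):
    """Sum the values of consecutive lines that share a key.

    Input lines are 'key<TAB>value' and must already be sorted by key,
    because groupby only merges adjacent equal keys.
    """
    pairs = (line.rstrip("\n").split("\t", 1) for line in lines)
    for key, group in groupby(pairs, key=lambda kv: kv[0]):
        yield f"{key}\t{sum(int(v) for _, v in group)}"

print(list(agg(["a\t1", "a\t1", "b\t1"])))
```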

Mapreduce interposes between the map and reduce an intermediate command that hashes the keys into a number of partitions, and writes the key-value to a partition file on the local cpu disk. Mapreduce then needs to concatenate all the intermediate partition files for one partition into a single sorted partition for input into the reduce block. The reduce command block then has all the values for a key as a contiguous stream of lines.

Mapreduce runs the map command block once for each disk passed as argument concurrently. It waits for the map workers to finish then runs the reduce block for each partition concurrently. The number of partitions is hardcoded at the moment, but should be configurable. The result is a list of disks on which the result partition files are stored.
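
The two-phase control flow described above can be modelled in memory with Python's concurrent.futures. This is a sketch under assumptions, not the gridlib definition: threads stand in for cpu nodes, lists stand in for disks and partition files, CRC-32 is an assumed hash, and NPARTS, run_map, run_reduce, words and total are hypothetical names.

```python
import zlib
from concurrent.futures import ThreadPoolExecutor
from itertools import groupby

NPARTS = 3  # assumed value; the text says the real count is hardcoded

def run_map(mapper, disk):
    """Run the mapper over one disk and bucket its pairs by hashed key."""
    buckets = [[] for _ in range(NPARTS)]
    for key, value in mapper(disk):
        buckets[zlib.crc32(key.encode()) % NPARTS].append((key, value))
    return buckets

def run_reduce(reducer, buckets_for_partition):
    """Merge one partition from every map worker, sort it, reduce by key."""
    pairs = sorted(p for bucket in buckets_for_partition for p in bucket)
    return [reducer(k, [v for _, v in g])
            for k, g in groupby(pairs, key=lambda kv: kv[0])]

def mapreduce(mapper, reducer, disks):
    with ThreadPoolExecutor() as pool:
        # Map phase: one worker per disk; list() waits for all of them.
        mapped = list(pool.map(lambda d: run_map(mapper, d), disks))
        # Reduce phase: one worker per partition, started only after the maps.
        return list(pool.map(
            lambda n: run_reduce(reducer, [m[n] for m in mapped]),
            range(NPARTS)))

def words(disk):
    """Example mapper: each 'disk' is a list of documents."""
    for doc in disk:
        for w in doc.split():
            yield w, 1

def total(key, values):
    """Example reducer: sum the counts for one key."""
    return key, sum(values)

print(mapreduce(words, total, [["a b a"], ["b c"]]))
```

The result is one list per partition, which mirrors mapreduce returning the list of disks that hold the result partition files.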

Well, that's it. Almost all of it is implemented in Inferno shell code. Many things, such as fault tolerance, are not handled. But this is a starting point for a working grid function. The next step, for me, is to collect a large sample data set to experiment with.

FOOTNOTES

[1] A true closure with lexical binding would be ideal here. I really need to finish the symbolic shell language I started. Sending a closure with its environment to a remote host is an appealing idea I need to explore further.

Data sets

I need a large data set to work with so I can try out more ideas using Geryon. I want to use real data, of a kind that cannot be analyzed trivially using, say, a relational database.

Examples I considered,

  • crawl the web - a web page repository
  • an rss feed repository
  • web server query logs
  • click logs for a site
  • aggregate data input by users
  • system health records
  • sousveillance logs

Some of these are more difficult to collect than others. Some may hold a greater possibility of surprises, and the surprises are what I hope to get by working with real data. Also, a data set that others can collect, so they can duplicate my results, would help to keep the conversation concrete.

But I need something right now, and there are two possibilities at hand. I've got the data set from the MIT Reality Mining project. This is about 200+MB uncompressed. This is big enough to test out some tools. But for data of this size, Geryon is not likely to offer anything that can't be done in a relational database. However, in principle this data set could grow very large, and developing grid tools to process it might come in handy. For something larger I could use the Internet Archive's crawler to collect around 1 to 50GB of web pages. I'll go with the first option until I get a better idea of what I'm doing.

Before even collecting the data, I need to consider how to organize it: how it will be stored and processed, what file formats to use, and so on. So, what are the properties of a large data set? Here's what Rob Pike et al. say in their Sawzall paper about Google's repository: "A document repository holding a few billion HTML pages might be stored as several thousand files [each about 1GB in size] each storing a million or so documents of a few kilobytes each, compressed." A 1GB "file" is divided into 64MB chunks, which are distributed among many disks, and each chunk has 3 copies. The Backrub paper includes a diagram of the repository format.

I'll try and put that in context of Inferno. A close analog seems to be a tar file of gzipped files. I'll make it easier on myself just for the moment and turn that around into a gzipped tar file. How would I process a .tgz file in one pass?

I wrote a shell filter gettarentry,

gunzip < file.tgz |gettarentry  { getlines {}  | etc. } 

where the command block parameter is applied to each file.

Gettarentry spawns a shell process for each file and writes the data down a pipe between the two. The environment variable file is set in the context for each command block.
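
The one-pass idea can be shown with Python's standard tarfile module in streaming mode. This is an analogy, not the gettarentry command: where gettarentry spawns a shell per file and feeds it through a pipe, this sketch calls a Python function per entry, and the name for_each_entry is hypothetical.

```python
import tarfile

def for_each_entry(stream, handle):
    """Call handle(name, data) for every regular file in a gzipped tar stream.

    The archive is read strictly front to back (mode 'r|gz'), so the input
    can be a pipe; nothing is extracted to disk.
    """
    results = []
    with tarfile.open(fileobj=stream, mode="r|gz") as tar:
        for member in tar:
            if not member.isfile():
                continue  # skip directories, links and other special entries
            # In stream mode the data must be read before moving on.
            data = tar.extractfile(member).read()
            results.append(handle(member.name, data))
    return results
```

A call such as `for_each_entry(open("file.tgz", "rb"), lambda name, data: (name, data.count(b"\n")))` would count lines per file, roughly what a wc command block does.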

After playing with this and thinking that this should be a shell builtin along the lines of sh's alphabet, I find it already is!

The fs command has bundle and unbundle, an equivalent to tar.

% fs bundle in | gzip > fs.bundle.gz

I hadn't considered the sh-alphabet yet in the context of Geryon. This simple use makes it already worthwhile, but there's much more to it. The fs command is a powerful tree walker, with gates and pattern matching and the ability to apply a command block to each file. The commands below show a simple comparison of fs walking the same file hierarchy, one bundled and the other already extracted onto a kfs disk.

% time sh -c {gunzip < fs.bundle.gz | fs pipe -1 @{wc} {unbundle -}}
  22191  102136  698644
0l 4.281r 4.281t
% time sh -c { fs pipe -1 @{wc} {walk in}}
  22191  102136  698644
0l 7.188r 7.188t

So, I'm on the right track. This is way more powerful than my simple gettarentry. And it fits well within the Geryon framework,

rcpu {gunzip < /n/d?.nb/fs.bundle.gz | fs pipe -1 @{wc} {unbundle -}} d0.nb ...

Reading one small file within the bundle is not very efficient. But reading and processing the whole fs is faster if it is bundled and gzipped. The time improvement comes from considerably fewer disk reads (1/3), and considerably less interaction with the fs for walking the hierarchy.

This format does not allow me to jump directly to a file within the archive. But if I switch back to the original suggestion, an archive of gzipped files, I get direct access and a few other things. It should be straightforward to append to the archive and know the total size of the archive as I add files.

I'll need to write another module for sh-alphabet to directly handle a repository format where each file is individually gzipped. But the fs/alphabet framework seems the way to go. Another problem to handle is splitting the repository file among many kfs disks. If each 64MB chunk can stand alone as a bundle file, I could consider a 1GB file as just the concatenation of all the archive chunks. [1] It shouldn't matter in what order we process the chunks. If I build an index of each file's location within the repository, I need to track the chunk, or keep track of the order of chunks and just keep a single offset within the logical 1GB repository file.
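
The second indexing option, a single offset into the logical repository file plus a known chunk order, comes down to one division. A Python sketch follows; the function name locate and the chunk names are hypothetical, and the only figure taken from the text is the 64MB chunk size.

```python
CHUNK_SIZE = 64 * 1024 * 1024  # 64MB, the chunk size used in the text

def locate(offset: int, chunk_order):
    """Translate a logical repository offset to (chunk, offset within chunk)."""
    index, within = divmod(offset, CHUNK_SIZE)
    if offset < 0 or index >= len(chunk_order):
        raise IndexError("offset outside the repository")
    return chunk_order[index], within

# A 1GB logical file is 16 chunks of 64MB; names here are made up.
chunks = [f"chunk{i}" for i in range(16)]
print(locate(CHUNK_SIZE + 5, chunks))
```

The trade-off is that the index stays small (one integer per file), but the chunk order then becomes part of the repository's state and has to be recorded somewhere.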

The Internet Archive's crawler, Heritrix, stores web pages in the ARC file format. I could add this format to sh-alphabet so it can be processed by fs. The crawler splits the archive into 100MB files. So by using this I've already got a lot going for me.

FILES

The files for this lab include the gettarentry command and a command to extract the data from the Reality Mining MySQL dump into a format better suited to Inferno. http://caerwyn.com/lab/38

FOOTNOTES

[1] Because I am layering file systems my terminology is getting confusing. I have chunks and files and disks at several layers.

http://ipn.caerwyn.com/2005/08/lab-40-distributing-data.html
