(*
 * Copyright (C) 2007 Mauricio Fernandez http://eigenclass.org
 * See README.txt and LICENSE for the redistribution and modification terms
 *)

(* Number of worker processes the master forks: the second command-line
   argument, defaulting to 16 when it is absent or not an integer, and never
   fewer than 2. *)
let num_workers =
  max (try int_of_string (Array.get Sys.argv 2) with _ -> 16) 2

(* Program the master re-executes (with "--client") for every worker. *)
let exec_name = "./wf-mmap-multicore"

(* Searcher for the request prefix.  NOTE(review): [BM_search] is presumably a
   Boyer-Moore searcher from the Bigstring library. *)
let prefix = Bigstring.BM_search.make "GET /ongoing/When/"

(* Matches "NNNx/YYYY/MM/DD/slug " at the start of the copied text and
   captures "YYYY/MM/DD/slug" as group 1 (the slug may contain neither a
   space nor a dot). *)
let re = "[0-9][0-9][0-9]x/\\([0-9][0-9][0-9][0-9]/[0-9][0-9]/[0-9][0-9]/[^ .]+\\) "
let re = Str.regexp re

(* [copy_until_space src off dst] copies the bytes of [src] starting at [off]
   into [dst] starting at index 0, up to and including the first space.
   Returns [true] when a space was copied.  Returns [false] when the copy
   stopped first because the end of [src] (its [Bigstring.length]) or of [dst]
   was reached.  In that case [dst] may still hold stale bytes from earlier
   calls, so callers must not match against it.  The bounds prevent reading
   past the mapped region and writing past the end of [dst]. *)
let copy_until_space src off dst =
  let src_len = Bigstring.length src in
  let dst_len = String.length dst in
  let s = Bigstring.unsafe_string src in
  let rec loop i j =
    if i >= src_len || j >= dst_len then false
    else begin
      let c = Bigstring.String.unsafe_get s i in
      dst.[j] <- c;
      if c = ' ' then true else loop (i + 1) (j + 1)
    end
  in
  loop off 0

(* [chunks ?size filename] is a stream of [(start, stop)] byte ranges
   (start inclusive, stop exclusive) that together cover the whole file.
   Each range is roughly [size] bytes (default 50_000_000).  Every range
   except the last is extended to just past the next newline, so no line is
   split between two ranges.  Empty ranges are never produced.  The input
   channel is closed once the stream is exhausted. *)
let chunks ?(size=50000000L) filename =
  let ic = open_in_bin filename in
  let len = (Unix.LargeFile.stat filename).Unix.LargeFile.st_size in
  let finished = ref false in
  (* Mark the stream as done, release the channel and return [range]. *)
  let finish range = finished := true; close_in ic; range in
  Stream.from begin fun _ ->
    if !finished then None
    else begin
      let start = LargeFile.pos_in ic in
      let stop = Int64.add start size in
      if start >= len then finish None
      else if stop >= len then finish (Some (start, len))
      else begin
        LargeFile.seek_in ic stop;
        (* Skip to the end of the line containing [stop]; if EOF comes
           first, the remainder of the file belongs to this last range. *)
        try ignore (input_line ic); Some (start, LargeFile.pos_in ic)
        with End_of_file -> finish (Some (start, len))
      end
    end
  end

(* [process_block filename (start, stop)] memory-maps bytes [start, stop) of
   [filename].  It returns a table mapping every key captured by [re] (after
   an occurrence of [prefix]) to its hit count.  Exceptions from opening or
   mapping the file propagate to the caller. *)
let process_block filename (start, stop) =
  let hash = Hashtbl.create 2000 in
  let fd = Unix.openfile filename [Unix.O_RDONLY] 0o644 in
  let bs =
    try Bigstring.map_file ~pos:start fd (Int64.to_int (Int64.sub stop start))
    with e -> Unix.close fd; raise e
  in
  (* A mapping outlives the descriptor it was created from (POSIX mmap), so
     close it now rather than leaking one descriptor per chunk. *)
  Unix.close fd;
  let buf = String.make 2048 ' ' in
  let len = Bigstring.length bs in
  let rec scan off =
    let off = Bigstring.BM_search.find_end prefix bs off in
    if off >= len then hash
    else if copy_until_space bs off buf && Str.string_match re buf 0 then begin
      let k = Str.matched_group 1 buf in
      (try incr (Hashtbl.find hash k) with Not_found -> Hashtbl.add hash k (ref 1));
      (* the match is at least this long *)
      scan (off + 17)
    end
    else scan off
  in
  (* [Not_found] here signals that no further prefix occurrence exists. *)
  try scan 0 with Not_found -> hash

(* Turns the [url -> hits] table into a list sorted by decreasing hits. *)
let sort_results hash =
  List.sort (fun (_, a) (_, b) -> b - a)
    (Hashtbl.fold (fun url hits l -> (url, hits) :: l) hash [])

(* Prints at most [n] entries of the list as "hits: url" lines. *)
let rec print_top_n n = function
  | [] -> ()
  | (url, hits) :: tl ->
      if n > 0 then begin
        Printf.printf "%d: %s\n" hits url;
        print_top_n (n - 1) tl
      end

(* Adds the counts of a worker's [string -> int ref] table [src] into the
   master's [string -> int] table [dst]. *)
let merge_hash dst src =
  Hashtbl.iter
    (fun k v ->
       Hashtbl.replace dst k (try Hashtbl.find dst k + !v with Not_found -> !v))
    src

(* Address the master listens on and the clients connect to. *)
let addr = Unix.ADDR_INET (Join.Site.get_local_addr (), 54321)

(* [wide_finder filename] returns [(agent, wait)].
   - [agent] is the channel on which workers offer their [process] channel.
     Each offer is answered with the next chunk of [filename] and a
     [job_done] continuation.
   - [pending_results(n)] counts the chunks dispatched but not yet merged.
   - [wait ()] blocks until the chunk stream is empty and every result has
     been merged, then prints the top 10 entries. *)
let wide_finder filename =
  let h = Hashtbl.create 1023 in
  let chunks = chunks filename in
  def agent(worker) & pending_chunks() & pending_results(n) =
    match Stream.peek chunks with
    | Some range ->
        Stream.junk chunks;
        worker(filename, range, job_done) & pending_chunks() & pending_results(n+1)
    | None -> nochunks() & pending_results(n)
  or job_done(result) & pending_results(n) =
    merge_hash h result;
    pending_results(n-1)
  or pending_results(0) & nochunks() & wait() =
    print_top_n 10 (sort_results h);
    reply to wait
  in
  spawn pending_chunks() & pending_results(0);
  (agent, wait)

(* [worker register] builds a worker's [process] channel.  Each message
   processes one chunk and sends the resulting table on [job_done].  It then
   re-offers [process] on [register] so it can be handed another chunk. *)
let worker register =
  def process(filename, range, job_done) =
    let result = process_block filename range in
    job_done(result) & register(process)
  in
  process

(* Master side: publish the [agent] channel as "register", listen on [addr],
   fork [num_workers] clients (each re-executing [exec_name] with "--client"),
   then block until all results are printed. *)
let master filename =
  let register, wait = wide_finder filename in
  Join.Ns.register Join.Ns.here "register" register;
  Join.Site.listen addr;
  for i = 1 to num_workers do
    match Unix.fork () with
    | 0 -> Unix.execv exec_name [| exec_name; "--client" |]
    | _ -> ()
  done;
  wait ()

(* Entry point: "--client" runs a worker; any other first argument is the log
   file to analyse in master mode. *)
let () =
  if Array.length Sys.argv < 2 then begin
    prerr_endline ("usage: " ^ Sys.executable_name ^ " <logfile> [num_workers]");
    exit 2
  end;
  match Array.get Sys.argv 1 with
  | "--client" ->
      (* NOTE(review): every exception (e.g. the lookup failing) is swallowed
         here and the client simply exits. *)
      (try
         let register = Join.Ns.lookup (Join.Ns.there addr) "register" in
         spawn register(worker register);
         (* Exit once the master's site is reported as failed. *)
         Join.Site.at_fail (Join.Site.there addr)
           (def quit() = exit 0; 0 in quit);
         (* [forever] is never sent, so this blocks until [quit] exits. *)
         def wait() & forever() = reply to wait in
         wait ()
       with _ -> ())
  | filename -> master filename