(* Copyright (C) 2007 Mauricio Fernandez http://eigenclass.org
 * See README.txt and LICENSE for the redistribution and modification terms *)

(* Number of worker processes forked by the master.  Taken from the second
   command-line argument; falls back to 16 when that argument is absent or is
   not an integer, and is never less than 2.  The length is tested explicitly
   instead of relying on an out-of-bounds exception, which would not be raised
   if the program were built with bounds checking disabled. *)
let num_workers =
  let requested =
    if Array.length Sys.argv > 2 then
      (try int_of_string Sys.argv.(2) with Failure _ -> 16)
    else 16 in
  max requested 2

(* Executable that the master re-executes (with "--client") for each worker. *)
let exec_name = "./wf-mmap-multicore-nore"

(* Search pattern for the request prefix, built once with
   Bigstring.BM_search.make and reused by every scan. *)
let prefix = Bigstring.BM_search.make "GET /ongoing/When/"

(* Indexes inside a key that [check_match] requires to hold decimal digits. *)
let digits = [|0; 1; 2; 3; 5; 6; 8; 9|]
let ndigits = Array.length digits

(* [sub_until_space src off] copies the characters of [src] from offset [off]
   up to (not including) the next space into a fresh string.

   Raises Not_found when a '.' is met before the space, or when fewer than 12
   characters precede the space.

   NOTE(review): the scan uses unchecked reads and is not bounded by the
   length of [src]; it relies on a space or '.' occurring before the end of
   the buffer -- confirm against the input format. *)
let sub_until_space src off =
  let rec copy src i dst j n =
    if n > 0 then begin
      let c = Bigstring.String.unsafe_get src i in
      dst.[j] <- c;
      copy src (i+1) dst (j+1) (n-1)
    end in
  let rec count src i j =
    match Bigstring.String.unsafe_get src i with
        ' ' -> j
      | '.' -> raise Not_found
      | _ -> count src (i+1) (j+1) in
  let nchars = count (Bigstring.unsafe_string src) off 0 in
  if nchars < 12 then raise Not_found;
  let dst = String.create nchars in
  copy (Bigstring.unsafe_string src) off dst 0 nchars;
  dst

(* [check_prefix bs off] verifies that the five characters of [bs] starting
   at [off] are three decimal digits followed by "x/".
   Raises Not_found otherwise.  Reads are unchecked: the caller must ensure
   that offsets [off .. off+4] lie inside the buffer. *)
let check_prefix bs off =
  let s = Bigstring.unsafe_string bs in
  for i = 0 to 2 do
    match Bigstring.String.unsafe_get s (off+i) with
        '0' .. '9' -> ()
      | _ -> raise Not_found
  done;
  if Bigstring.String.unsafe_get s (off+3) <> 'x' ||
     Bigstring.String.unsafe_get s (off+4) <> '/' then
    raise Not_found

(* [check_match s] verifies that [s] starts with the shape "dddd/dd/dd/"
   (d = decimal digit): digits at the indexes listed in [digits] and '/' at
   indexes 4, 7 and 10.  Raises Not_found otherwise.  [s] must be at least 11
   characters long. *)
let check_match s =
  for i = 0 to ndigits - 1 do
    match s.[digits.(i)] with
        '0'..'9' -> ()
      | _ -> raise Not_found
  done;
  if s.[4] <> '/' || s.[7] <> '/' || s.[10] <> '/' then raise Not_found

(* [chunks ?size filename] returns a stream of [(start, stop)] byte ranges
   (as int64 offsets) that together cover the file.  Each range is roughly
   [size] bytes long and, except for the last one, is extended to the end of
   the line containing its nominal end, so that no line is split between two
   ranges.

   The stream never yields an empty range: when the previous range already
   ended at the end of the file (or the file is empty) it simply terminates.
   The underlying channel is closed as soon as the last range is produced. *)
let chunks ?(size=50000000L) filename =
  let ic = open_in filename in
  let len = (Unix.LargeFile.stat filename).Unix.LargeFile.st_size in
  let finished = ref false in
  let finish () = finished := true; close_in ic in
  Stream.from begin fun _ ->
    (* Test [finished] first: the channel is closed once we are done, so its
       position must not be queried any more. *)
    if !finished then None
    else begin
      let start = LargeFile.pos_in ic in
      let stop = Int64.add start size in
      if Int64.compare start len >= 0 then (finish (); None)
      else if Int64.compare stop len >= 0 then (finish (); Some (start, len))
      else begin
        LargeFile.seek_in ic stop;
        (* Consume the rest of the current line so the range ends on a line
           boundary. *)
        try ignore (input_line ic); Some (start, LargeFile.pos_in ic)
        with End_of_file -> finish (); None
      end
    end
  end

(* Hash table specialised for the string keys produced by [sub_until_space].
   The hash combines the characters at indexes 5, 6, 8 and 9 (digit positions
   according to [check_match]) with the last character, so keys must be at
   least 10 characters long.  From here on [Hashtbl] refers to this
   specialised module. *)
module Hashtbl = Hashtbl.Make(struct
  type t = string
  let equal x y =
    if String.length x <> String.length y then false else x = y
  let hash x =
    (Char.code x.[5] - 48) lsl 4 + (Char.code x.[6] - 48) lsl 8 +
    (Char.code x.[8] - 48) lsl 16 + (Char.code x.[9] - 48) lsl 24 +
    Char.code x.[String.length x - 1]
end)

(* [process_block filename (start, stop)] maps the byte range
   [start, stop) of [filename] into memory, looks for every occurrence of
   [prefix] and counts, per key, the occurrences whose following text passes
   [check_prefix] and [check_match].  Returns a table mapping each key to a
   counter ([int ref]).

   The file descriptor opened for the mapping is closed before returning
   (the original code leaked one descriptor per processed block).  It is kept
   open for the whole scan, so the fix does not depend on whether the mapping
   outlives the descriptor.

   NOTE(review): the scan ends when Not_found escapes the loop; this assumes
   Bigstring.BM_search.find_end raises Not_found when no further match
   exists -- confirm against the Bigstring library. *)
let process_block filename (start, stop) =
  let hash = Hashtbl.create 2000 in
  let fd = Unix.openfile filename [Unix.O_RDONLY] 0o644 in
  (* Closing is best effort: an error while closing must not hide the result
     or the exception being propagated. *)
  let close_fd () = try Unix.close fd with Unix.Unix_error _ -> () in
  let scan () =
    let off = ref 0 in
    let bs =
      Bigstring.map_file ~pos:start fd
        (Int64.to_int (Int64.sub stop start)) in
    let len = Bigstring.length bs in
    while true do
      off := Bigstring.BM_search.find_end prefix bs !off;
      (* [check_prefix] reads offsets off .. off+4 without bounds checks. *)
      if !off + 5 >= len then raise Not_found;
      (try
         check_prefix bs !off;
         let k = sub_until_space bs (!off+5) in
         check_match k;
         off := !off + 22; (* 5 + 17 the match is at least this long *)
         (try incr (Hashtbl.find hash k)
          with Not_found -> Hashtbl.add hash k (ref 1))
       with Not_found -> off := !off + 5)
    done in
  (try scan ()
   with
     | Not_found -> ()
     | e -> close_fd (); raise e);
  close_fd ();
  hash

(* [sort_results hash] returns the [(url, hits)] bindings of [hash] ordered
   by decreasing number of hits.  [compare] is used instead of a subtraction
   so that the ordering cannot be corrupted by integer overflow. *)
let sort_results hash =
  List.sort
    (fun (_, a) (_, b) -> compare (b : int) a)
    (Hashtbl.fold (fun url hits l -> (url, hits) :: l) hash [])

(* [print_top_n n l] prints the first [n] entries of [l] (or all of them if
   [l] is shorter), one "hits: url" line each, on standard output. *)
let rec print_top_n n = function
    [] -> ()
  | ((url, hits) :: tl) ->
      if n > 0 then
        (Printf.printf "%d: %s\n" hits url; print_top_n (n-1) tl)

(* [merge_hash dst src] adds every counter of [src] (a table of [int ref])
   to the totals held in [dst] (a table of [int]). *)
let merge_hash dst src =
  Hashtbl.iter
    (fun k v ->
       Hashtbl.replace dst k
         (try Hashtbl.find dst k + !v with Not_found -> !v))
    src

(* Address on which the master listens and to which the workers connect. *)
let addr = Unix.ADDR_INET (Join.Site.get_local_addr(), 54321)

(* [wide_finder filename] sets up the coordination logic and returns the pair
   [(agent, wait)]:
   - [agent(worker)] hands the next pending chunk of [filename] to [worker],
     or records that no chunk is left;
   - [job_done(result)] (passed to the workers) merges a partial result;
   - [wait()] replies, after printing the ten most frequent keys, once no
     chunk is left and no result is outstanding.
   [pending_results(n)] carries the number of chunks handed out whose result
   has not been merged yet. *)
let wide_finder filename =
  let h = Hashtbl.create 1023 in
  let chunks = chunks filename in
  def agent(worker) & pending_chunks() & pending_results(n) =
        match Stream.peek chunks with
          | Some range ->
              Stream.junk chunks;
              worker(filename, range, job_done) &
              pending_chunks() & pending_results(n+1)
          | None -> nochunks() & pending_results(n)
   or job_done(result) & pending_results(n) =
        merge_hash h result; pending_results(n-1)
   or pending_results(0) & nochunks() & wait() =
        print_top_n 10 (sort_results h);
        reply to wait
  in
    spawn pending_chunks() & pending_results(0);
    (agent, wait)

(* [worker register] builds the channel a worker offers to the master: each
   message processes one block, sends the result on [job_done] and registers
   the worker again so that it can receive another block. *)
let worker register =
  def process(filename, range, job_done) =
    let result = process_block filename range in
    job_done(result) & register(process)
  in process

(* [master filename] publishes the registration channel under the name
   "register", starts listening on [addr], forks [num_workers] worker
   processes (each re-executing [exec_name] with "--client") and blocks until
   the final result has been printed. *)
let master filename =
  let register, wait = wide_finder filename in
  Join.Ns.register Join.Ns.here "register" register;
  Join.Site.listen addr;
  for i = 1 to num_workers do
    match Unix.fork() with
      | 0 -> Unix.execv exec_name [| exec_name; "--client" |]
      | _ -> ()
  done;
  wait()

(* Entry point.  With "--client" as first argument the process acts as a
   worker: it looks up the master's registration channel, offers itself, and
   then blocks forever; it exits when the master's site fails.  Any other
   first argument is taken as the name of the log file to analyse.  Without
   any argument a usage message is printed and the exit status is 2. *)
let () =
  if Array.length Sys.argv < 2 then begin
    prerr_endline
      ("usage: " ^ Sys.executable_name ^ " <logfile> [num_workers]");
    exit 2
  end;
  match Sys.argv.(1) with
      "--client" ->
        (* Deliberately best effort: a worker that cannot reach the master
           (or fails in any other way) just terminates silently. *)
        (try
           let register = Join.Ns.lookup (Join.Ns.there addr) "register" in
           spawn register(worker register);
           Join.Site.at_fail (Join.Site.there addr)
             (def quit() = exit 0; 0 in quit);
           (* [forever] is never emitted, so this call never returns. *)
           def wait() & forever() = reply to wait in
           wait ()
         with _ -> ())
    | filename -> master filename