import_osm: refactor
This commit is contained in:
parent
e69a4b6dd0
commit
3f348fa7c6
|
@ -488,8 +488,6 @@ checksum = "624a8340c38c1b80fd549087862da4ba43e08858af025b236e509b6649fc13d5"
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "osm_pbf_iter"
|
name = "osm_pbf_iter"
|
||||||
version = "0.2.0"
|
version = "0.2.0"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "b1e22dc554505976589d669702894e4ba5fc320ef232808f3f143adfc48e2f0a"
|
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"byteorder",
|
"byteorder",
|
||||||
"flate2",
|
"flate2",
|
||||||
|
|
|
@ -11,7 +11,8 @@ lto = true
|
||||||
opt-level = 3
|
opt-level = 3
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
osm_pbf_iter = "0.2"
|
#osm_pbf_iter = "0.2"
|
||||||
|
osm_pbf_iter = { path = "../../../programming/rust-osm-pbf-iter" }
|
||||||
num_cpus = "1"
|
num_cpus = "1"
|
||||||
postgres = { version = "0.19", features = ["with-geo-types-0_7", "with-serde_json-1"] }
|
postgres = { version = "0.19", features = ["with-geo-types-0_7", "with-serde_json-1"] }
|
||||||
serde = { version = "1.0", features = ["derive"] }
|
serde = { version = "1.0", features = ["derive"] }
|
||||||
|
|
|
@ -3,177 +3,177 @@ use std::env::args;
|
||||||
use std::fs::File;
|
use std::fs::File;
|
||||||
use std::io::{Seek, SeekFrom, BufReader};
|
use std::io::{Seek, SeekFrom, BufReader};
|
||||||
use std::time::Instant;
|
use std::time::Instant;
|
||||||
use std::sync::mpsc::{sync_channel, SyncSender, Receiver};
|
use std::sync::mpsc::{sync_channel, Receiver};
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::thread;
|
use std::thread;
|
||||||
|
|
||||||
use osm_pbf_iter::*;
|
use osm_pbf_iter::*;
|
||||||
|
|
||||||
fn phase1_worker(req_rx: Receiver<Blob>, res_tx: SyncSender<HashMap<i64, (f64, f64)>>) {
|
pub struct PrimSource {
|
||||||
let mut res = HashMap::new();
|
req_rx: Receiver<Blob>
|
||||||
|
|
||||||
loop {
|
|
||||||
let blob = match req_rx.recv() {
|
|
||||||
Ok(blob) => blob,
|
|
||||||
Err(_) => break,
|
|
||||||
};
|
|
||||||
|
|
||||||
let data = blob.into_data();
|
|
||||||
let primitive_block = PrimitiveBlock::parse(&data);
|
|
||||||
for primitive in primitive_block.primitives() {
|
|
||||||
match primitive {
|
|
||||||
Primitive::Node(node) => {
|
|
||||||
res.insert(node.id as i64, (node.lon, node.lat));
|
|
||||||
}
|
|
||||||
Primitive::Way(_) => {}
|
|
||||||
Primitive::Relation(_) => {}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
res_tx.send(res).unwrap();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fn phase2_worker(req_rx: Receiver<Blob>, res_tx: SyncSender<()>, node_coords: Arc<HashMap<i64, (f64, f64)>>) {
|
impl PrimSource {
|
||||||
const DB_URL: &str = "host=10.233.1.2 dbname=treeadvisor user=treeadvisor password=123";
|
pub fn recv_primitives<F: FnOnce(osm_pbf_iter::primitive_block::PrimitivesIterator) -> R, R>(&self, f: F) -> Option<R> {
|
||||||
|
self.req_rx.recv()
|
||||||
|
.ok()
|
||||||
|
.map(|blob| {
|
||||||
|
let data = blob.into_data();
|
||||||
|
let primitive_block = PrimitiveBlock::parse(&data);
|
||||||
|
f(primitive_block.primitives())
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
let mut db = postgres::Client::connect(DB_URL, postgres::NoTls)
|
fn process_osm<F: FnOnce(PrimSource) -> R + 'static + Send + Clone, R: Send + 'static>(filename: &str, f: F) -> Vec<R> {
|
||||||
.expect("DB");
|
let cpus = num_cpus::get();
|
||||||
|
let mut worker_results = Vec::with_capacity(cpus);
|
||||||
|
|
||||||
loop {
|
// start workers
|
||||||
let blob = match req_rx.recv() {
|
let mut workers = Vec::with_capacity(cpus);
|
||||||
Ok(blob) => blob,
|
for _ in 0..cpus {
|
||||||
Err(_) => break,
|
let (req_tx, req_rx) = sync_channel::<Blob>(2);
|
||||||
};
|
let (res_tx, res_rx) = sync_channel::<R>(1);
|
||||||
|
workers.push((req_tx, res_rx));
|
||||||
|
|
||||||
let data = blob.into_data();
|
let f = f.clone();
|
||||||
let primitive_block = PrimitiveBlock::parse(&data);
|
thread::spawn(move || {
|
||||||
let mut tx = db.transaction().unwrap();
|
let prim_src = PrimSource { req_rx };
|
||||||
for primitive in primitive_block.primitives() {
|
let result = f(prim_src);
|
||||||
match primitive {
|
res_tx.send(result).unwrap();
|
||||||
Primitive::Node(_) => {}
|
});
|
||||||
Primitive::Way(way) => {
|
|
||||||
let tags: serde_json::Map<String, serde_json::Value> = way.tags()
|
|
||||||
.map(|(k, v)| (k.to_string(), serde_json::Value::String(v.to_string())))
|
|
||||||
.collect();
|
|
||||||
let points = way.refs()
|
|
||||||
.filter_map(|id| node_coords.get(&id))
|
|
||||||
.cloned()
|
|
||||||
.collect::<Vec<_>>();
|
|
||||||
tx.execute(
|
|
||||||
"INSERT INTO osm_ways (geo, id, attrs) VALUES ($1, $2, $3)",
|
|
||||||
&[&geo::LineString::from(points), &(way.id as i64), &serde_json::Value::Object(tags)]
|
|
||||||
).unwrap();
|
|
||||||
}
|
|
||||||
Primitive::Relation(_) => {}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
tx.commit().unwrap();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
res_tx.send(()).unwrap();
|
// open file
|
||||||
|
println!("Open {}", filename);
|
||||||
|
let f = File::open(filename).unwrap();
|
||||||
|
let mut reader = BlobReader::new(BufReader::new(f));
|
||||||
|
let start = Instant::now();
|
||||||
|
|
||||||
|
// feed
|
||||||
|
let mut w = 0;
|
||||||
|
for blob in &mut reader {
|
||||||
|
let req_tx = &workers[w].0;
|
||||||
|
w = (w + 1) % cpus;
|
||||||
|
|
||||||
|
req_tx.send(blob).unwrap();
|
||||||
|
}
|
||||||
|
|
||||||
|
// receive results
|
||||||
|
for (req_tx, res_rx) in workers.into_iter() {
|
||||||
|
drop(req_tx);
|
||||||
|
let worker_res = res_rx.recv().unwrap();
|
||||||
|
worker_results.push(worker_res);
|
||||||
|
}
|
||||||
|
|
||||||
|
// stats
|
||||||
|
let stop = Instant::now();
|
||||||
|
let duration = stop.duration_since(start);
|
||||||
|
let duration = duration.as_secs() as f64 + (duration.subsec_nanos() as f64 / 1e9);
|
||||||
|
let mut f = reader.into_inner();
|
||||||
|
match f.seek(SeekFrom::Current(0)) {
|
||||||
|
Ok(pos) => {
|
||||||
|
let rate = pos as f64 / 1024f64 / 1024f64 / duration;
|
||||||
|
println!("Processed {} MB in {:.2} seconds ({:.2} MB/s)",
|
||||||
|
pos / 1024 / 1024, duration, rate);
|
||||||
|
},
|
||||||
|
Err(_) => (),
|
||||||
|
}
|
||||||
|
|
||||||
|
worker_results
|
||||||
}
|
}
|
||||||
|
|
||||||
fn main() {
|
fn main() {
|
||||||
let cpus = num_cpus::get();
|
|
||||||
|
|
||||||
let mut node_coords: HashMap<i64, (f64, f64)> = HashMap::new();
|
let mut node_coords: HashMap<i64, (f64, f64)> = HashMap::new();
|
||||||
|
// phase 1: nodes
|
||||||
for arg in args().skip(1) {
|
for arg in args().skip(1) {
|
||||||
let mut phase1_workers = Vec::with_capacity(cpus);
|
let worker_res = process_osm(&arg, move |prim_src| {
|
||||||
for _ in 0..cpus {
|
let mut res = HashMap::new();
|
||||||
let (req_tx, req_rx) = sync_channel(2);
|
|
||||||
let (res_tx, res_rx) = sync_channel(0);
|
|
||||||
phase1_workers.push((req_tx, res_rx));
|
|
||||||
thread::spawn(move || {
|
|
||||||
phase1_worker(req_rx, res_tx);
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
println!("Phase1: open {}", arg);
|
while prim_src.recv_primitives(|iter| {
|
||||||
let f = File::open(&arg).unwrap();
|
for primitive in iter {
|
||||||
let mut reader = BlobReader::new(BufReader::new(f));
|
match primitive {
|
||||||
let start = Instant::now();
|
Primitive::Node(node) => {
|
||||||
|
res.insert(node.id as i64, (node.lon, node.lat));
|
||||||
|
}
|
||||||
|
Primitive::Way(_) => {}
|
||||||
|
Primitive::Relation(_) => {}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
let mut w = 0;
|
true
|
||||||
for blob in &mut reader {
|
}).unwrap_or(false) {}
|
||||||
let req_tx = &phase1_workers[w].0;
|
res
|
||||||
w = (w + 1) % cpus;
|
});
|
||||||
|
for mut res in worker_res {
|
||||||
req_tx.send(blob).unwrap();
|
|
||||||
}
|
|
||||||
|
|
||||||
for (req_tx, res_rx) in phase1_workers.into_iter() {
|
|
||||||
drop(req_tx);
|
|
||||||
let mut worker_res = res_rx.recv().unwrap();
|
|
||||||
if node_coords.is_empty() {
|
if node_coords.is_empty() {
|
||||||
node_coords = worker_res;
|
node_coords = res;
|
||||||
} else {
|
} else {
|
||||||
// merge
|
// merge
|
||||||
for (id, coords) in worker_res.drain() {
|
for (id, coords) in res.drain() {
|
||||||
node_coords.insert(id, coords);
|
node_coords.insert(id, coords);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
let stop = Instant::now();
|
|
||||||
let duration = stop.duration_since(start);
|
|
||||||
let duration = duration.as_secs() as f64 + (duration.subsec_nanos() as f64 / 1e9);
|
|
||||||
let mut f = reader.into_inner();
|
|
||||||
match f.seek(SeekFrom::Current(0)) {
|
|
||||||
Ok(pos) => {
|
|
||||||
let rate = pos as f64 / 1024f64 / 1024f64 / duration;
|
|
||||||
println!("Phase1: Processed {} MB in {:.2} seconds ({:.2} MB/s)",
|
|
||||||
pos / 1024 / 1024, duration, rate);
|
|
||||||
},
|
|
||||||
Err(_) => (),
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
}
|
||||||
println!("{} nodes", node_coords.len());
|
println!("{} nodes", node_coords.len());
|
||||||
let node_coords = Arc::new(node_coords);
|
let node_coords = Arc::new(node_coords);
|
||||||
|
|
||||||
|
let mut way_coords: HashMap<i64, Vec<(f64, f64)>> = HashMap::new();
|
||||||
|
// phase 2: ways
|
||||||
for arg in args().skip(1) {
|
for arg in args().skip(1) {
|
||||||
let mut phase2_workers = Vec::with_capacity(cpus);
|
let node_coords = node_coords.clone();
|
||||||
for _ in 0..cpus {
|
let worker_res = process_osm(&arg, move |prim_src| {
|
||||||
let (req_tx, req_rx) = sync_channel(2);
|
const DB_URL: &str = "host=10.233.1.2 dbname=treeadvisor user=treeadvisor password=123";
|
||||||
let (res_tx, res_rx) = sync_channel(0);
|
|
||||||
phase2_workers.push((req_tx, res_rx));
|
let mut db = postgres::Client::connect(DB_URL, postgres::NoTls)
|
||||||
let node_coords = node_coords.clone();
|
.expect("DB");
|
||||||
thread::spawn(move || {
|
|
||||||
phase2_worker(req_rx, res_tx, node_coords);
|
let mut res = HashMap::new();
|
||||||
});
|
|
||||||
|
let mut running = true;
|
||||||
|
while running {
|
||||||
|
running = prim_src.recv_primitives(|iter| {
|
||||||
|
let mut tx = db.transaction().unwrap();
|
||||||
|
for primitive in iter {
|
||||||
|
match primitive {
|
||||||
|
Primitive::Node(_) => {}
|
||||||
|
Primitive::Way(way) => {
|
||||||
|
let tags: serde_json::Map<String, serde_json::Value> = way.tags()
|
||||||
|
.map(|(k, v)| (k.to_string(), serde_json::Value::String(v.to_string())))
|
||||||
|
.collect();
|
||||||
|
let points = way.refs()
|
||||||
|
.filter_map(|id| node_coords.get(&id))
|
||||||
|
.cloned()
|
||||||
|
.collect::<Vec<_>>();
|
||||||
|
tx.execute(
|
||||||
|
"INSERT INTO osm_ways (geo, id, attrs) VALUES ($1, $2, $3)",
|
||||||
|
&[&geo::LineString::from(points.clone()), &(way.id as i64), &serde_json::Value::Object(tags)]
|
||||||
|
).unwrap();
|
||||||
|
res.insert(way.id as i64, points);
|
||||||
|
}
|
||||||
|
Primitive::Relation(_) => {}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
tx.commit().unwrap();
|
||||||
|
|
||||||
|
true
|
||||||
|
}).unwrap_or(false);
|
||||||
|
}
|
||||||
|
res
|
||||||
|
});
|
||||||
|
for mut res in worker_res {
|
||||||
|
if way_coords.is_empty() {
|
||||||
|
way_coords = res;
|
||||||
|
} else {
|
||||||
|
// merge
|
||||||
|
for (id, coords) in res.drain() {
|
||||||
|
way_coords.insert(id, coords);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
println!("Phase2: open {}", arg);
|
|
||||||
let f = File::open(&arg).unwrap();
|
|
||||||
let mut reader = BlobReader::new(BufReader::new(f));
|
|
||||||
let start = Instant::now();
|
|
||||||
|
|
||||||
let mut w = 0;
|
|
||||||
for blob in &mut reader {
|
|
||||||
let req_tx = &phase2_workers[w].0;
|
|
||||||
w = (w + 1) % cpus;
|
|
||||||
|
|
||||||
req_tx.send(blob).unwrap();
|
|
||||||
}
|
|
||||||
|
|
||||||
for (req_tx, res_rx) in phase2_workers.into_iter() {
|
|
||||||
drop(req_tx);
|
|
||||||
let _worker_res = res_rx.recv().unwrap();
|
|
||||||
}
|
|
||||||
|
|
||||||
let stop = Instant::now();
|
|
||||||
let duration = stop.duration_since(start);
|
|
||||||
let duration = duration.as_secs() as f64 + (duration.subsec_nanos() as f64 / 1e9);
|
|
||||||
let mut f = reader.into_inner();
|
|
||||||
match f.seek(SeekFrom::Current(0)) {
|
|
||||||
Ok(pos) => {
|
|
||||||
let rate = pos as f64 / 1024f64 / 1024f64 / duration;
|
|
||||||
println!("Phase2: Processed {} MB in {:.2} seconds ({:.2} MB/s)",
|
|
||||||
pos / 1024 / 1024, duration, rate);
|
|
||||||
},
|
|
||||||
Err(_) => (),
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
let way_coords = Arc::new(way_coords);
|
||||||
|
|
||||||
|
// phase 3: rels (TODO)
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue