// treeadvisor/import_osm/src/main.rs
//
// 284 lines
// 11 KiB
// Rust

use std::collections::HashMap;
use std::env::args;
use std::fs::File;
use std::io::{Seek, SeekFrom, BufReader};
use std::time::Instant;
use std::sync::mpsc::{sync_channel, Receiver};
use std::sync::Arc;
use std::thread;
use osm_pbf_iter::*;
use geo::LineString;
use indicatif::{ProgressBar, ProgressStyle};
/// Receiving end of a worker's blob queue.
///
/// A worker closure is handed one of these and pulls blobs from it until the
/// feeding side hangs up.
pub struct PrimSource {
    /// Channel on which the feeder thread delivers blobs to this worker.
    req_rx: Receiver<Blob>,
}

impl PrimSource {
    /// Waits for the next blob, parses it as a primitive block and runs `f` on
    /// an iterator over that block's primitives.
    ///
    /// Returns `Some` with whatever `f` returned, or `None` once receiving
    /// fails, i.e. the sending half has been dropped and no blob is left in
    /// the queue.
    pub fn recv_primitives<F, R>(&self, f: F) -> Option<R>
    where
        F: FnOnce(osm_pbf_iter::primitive_block::PrimitivesIterator) -> R,
    {
        // A receive error only means "no more work"; it is not reported further.
        let blob = self.req_rx.recv().ok()?;
        // NOTE(review): `into_data` presumably inflates `Blob::Zlib` payloads - confirm
        // against the osm_pbf_iter documentation.
        let raw = blob.into_data();
        let block = PrimitiveBlock::parse(&raw);
        Some(f(block.primitives()))
    }
}
/// Reads every blob of the file `filename` and distributes the blobs over a
/// pool of worker threads, returning what each worker produced.
///
/// One thread is spawned per CPU reported by `num_cpus::get()`. Every thread
/// runs its own clone of `f`, which receives a [`PrimSource`] to pull blobs
/// from. Blobs are handed out round-robin through bounded channels (two
/// blobs of slack per worker), so the reader blocks when a worker falls
/// behind. After the last blob each worker's queue is closed in turn and its
/// return value is collected.
///
/// A progress bar is drawn while feeding; when done its message shows the
/// throughput in MB/s.
///
/// Returns one `R` per worker, in the order the workers were spawned.
///
/// # Panics
///
/// Panics if the file cannot be opened or seeked, or if a worker thread goes
/// away before delivering its result (channel send/receive failure).
fn process_osm<F, R>(filename: &str, f: F) -> Vec<R>
where
    F: FnOnce(PrimSource) -> R + 'static + Send + Clone,
    R: Send + 'static,
{
    let bar_style = ProgressStyle::default_bar()
        .template("{prefix:yellow} [{wide_bar:green/red}] {msg:lightblue}");
    let progress = ProgressBar::new(0)
        .with_style(bar_style)
        .with_prefix(filename.to_string());

    // Open the input and size the progress bar after the file length.
    progress.set_message("Opening");
    let mut file = File::open(filename).unwrap();
    let filesize = file.seek(SeekFrom::End(0)).unwrap();
    progress.set_length(filesize);
    file.seek(SeekFrom::Start(0)).unwrap();
    let mut reader = BlobReader::new(BufReader::new(file));

    // Spawn the workers; keep the sending half of each blob queue and the
    // receiving half of each result channel.
    let cpus = num_cpus::get();
    let workers: Vec<_> = (0..cpus)
        .map(|_| {
            let (blob_tx, blob_rx) = sync_channel::<Blob>(2);
            let (result_tx, result_rx) = sync_channel::<R>(1);
            let job = f.clone();
            thread::spawn(move || {
                let outcome = job(PrimSource { req_rx: blob_rx });
                result_tx.send(outcome).unwrap();
            });
            (blob_tx, result_rx)
        })
        .collect();

    // Feed the blobs round-robin. The bar advances by the payload size of
    // every blob that has been handed over.
    progress.set_message("Parsing");
    let start = Instant::now();
    let mut fed_bytes: u64 = 0;
    for (index, blob) in (&mut reader).into_iter().enumerate() {
        let payload_len = match &blob {
            Blob::Raw(bytes) => bytes.len(),
            Blob::Zlib(bytes) => bytes.len(),
        };
        fed_bytes += payload_len as u64;
        let (blob_tx, _) = &workers[index % cpus];
        blob_tx.send(blob).unwrap();
        progress.set_position(fed_bytes);
    }

    // Close each queue (which ends that worker's receive loop) and wait for
    // the worker's result before moving on to the next one.
    progress.set_message("Finishing");
    let worker_results: Vec<R> = workers
        .into_iter()
        .map(|(blob_tx, result_rx)| {
            drop(blob_tx);
            result_rx.recv().unwrap()
        })
        .collect();
    progress.set_position(filesize);

    // Report throughput based on how far the underlying reader got; if the
    // position cannot be queried the message is simply left as it is.
    let elapsed = start.elapsed().as_secs_f64();
    let mut inner = reader.into_inner();
    if let Ok(offset) = inner.seek(SeekFrom::Current(0)) {
        let rate = offset as f64 / 1024f64 / 1024f64 / elapsed;
        progress.set_message(format!("{:.2} MB/s", rate));
    }
    progress.finish();

    worker_results
}
/// Imports the OSM PBF files named on the command line into PostgreSQL.
///
/// Every file is read three times, once per kind of primitive, because each
/// phase needs the complete output of the one before it:
///
/// 1. **Nodes** - collect `id -> (lon, lat)` of every node in memory.
/// 2. **Ways** - turn each way's node references into a `LineString`
///    (references to unknown nodes are dropped), insert it into `osm_ways`
///    and remember the geometry by way id.
/// 3. **Relations** - for each relation tagged `type=multipolygon` that has
///    at least one closed `outer` way, insert the relation into
///    `osm_multipolygons` and its closed `outer` / `inner` ways into
///    `osm_multipolygon_members`.
///
/// The counts of nodes, ways and imported multipolygons are printed to stdout.
///
/// # Panics
///
/// Panics if the database cannot be reached or if any statement or commit
/// fails; see `process_osm` for the file-related panics.
fn main() {
    // Connection settings used by every database worker in phases 2 and 3.
    const DB_URL: &str = "host=10.233.1.2 dbname=treeadvisor user=treeadvisor password=123";

    // The same list of input files is walked once per phase.
    let filenames: Vec<String> = args().skip(1).collect();

    // phase 1: nodes
    let mut node_coords: HashMap<i64, (f64, f64)> = HashMap::new();
    for filename in &filenames {
        let worker_res = process_osm(filename, move |prim_src| {
            let mut res = HashMap::new();
            // Keep pulling primitive blocks until the feeder hangs up.
            while prim_src
                .recv_primitives(|iter| {
                    for primitive in iter {
                        if let Primitive::Node(node) = primitive {
                            res.insert(node.id as i64, (node.lon, node.lat));
                        }
                    }
                })
                .is_some()
            {}
            res
        });
        for res in worker_res {
            if node_coords.is_empty() {
                // Take over the whole map instead of copying it entry by entry.
                node_coords = res;
            } else {
                node_coords.extend(res);
            }
        }
    }
    println!("{} nodes", node_coords.len());
    let node_coords = Arc::new(node_coords);

    // phase 2: ways
    let mut way_paths: HashMap<i64, LineString<f64>> = HashMap::new();
    for filename in &filenames {
        let node_coords = node_coords.clone();
        let worker_res = process_osm(filename, move |prim_src| {
            // Each worker thread uses its own database connection.
            let mut db = postgres::Client::connect(DB_URL, postgres::NoTls)
                .expect("DB");
            let mut res = HashMap::new();
            while prim_src
                .recv_primitives(|iter| {
                    // One transaction per primitive block.
                    let mut tx = db.transaction().unwrap();
                    for primitive in iter {
                        if let Primitive::Way(way) = primitive {
                            let tags: serde_json::Map<String, serde_json::Value> = way.tags()
                                .map(|(k, v)| (k.to_string(), serde_json::Value::String(v.to_string())))
                                .collect();
                            // Node references that were not seen in phase 1 are skipped.
                            let points = LineString::from(
                                way.refs()
                                    .filter_map(|id| node_coords.get(&id))
                                    .cloned()
                                    .collect::<Vec<_>>()
                            );
                            tx.execute(
                                "INSERT INTO osm_ways (geo, id, attrs) VALUES ($1, $2, $3)",
                                &[&points, &(way.id as i64), &serde_json::Value::Object(tags)]
                            ).unwrap();
                            res.insert(way.id as i64, points);
                        }
                    }
                    tx.commit().unwrap();
                })
                .is_some()
            {}
            res
        });
        for res in worker_res {
            if way_paths.is_empty() {
                way_paths = res;
            } else {
                way_paths.extend(res);
            }
        }
    }
    println!("{} ways", way_paths.len());
    let way_paths = Arc::new(way_paths);

    // phase 3: rels
    let mut multipoly_count = 0;
    for filename in &filenames {
        let way_paths = way_paths.clone();
        multipoly_count += process_osm(filename, move |prim_src| {
            let mut db = postgres::Client::connect(DB_URL, postgres::NoTls)
                .expect("DB");
            let mut count: usize = 0;
            while prim_src
                .recv_primitives(|iter| {
                    let mut tx = db.transaction().unwrap();
                    for primitive in iter {
                        let rel = match primitive {
                            Primitive::Relation(rel) => rel,
                            Primitive::Node(_) | Primitive::Way(_) => continue,
                        };
                        let tags: serde_json::Map<String, serde_json::Value> = rel.tags()
                            .map(|(k, v)| (k.to_string(), serde_json::Value::String(v.to_string())))
                            .collect();
                        // Only multipolygon relations belong in osm_multipolygons.
                        // (This test used to be `==`, which skipped exactly those
                        // relations and imported all the others.)
                        if tags.get("type").and_then(|val| val.as_str()) != Some("multipolygon") {
                            continue;
                        }

                        // Closed member ways of this relation that carry the given role
                        // and whose geometry was recorded in phase 2.
                        let get_members = |target_role| rel.members()
                            .filter_map(|(role, id, member_type)| {
                                if member_type == RelationMemberType::Way && role == target_role {
                                    way_paths.get(&(id as i64))
                                        .filter(|path| path.is_closed())
                                        .map(|path| (id, path))
                                } else {
                                    None
                                }
                            })
                            .collect::<Vec<_>>();

                        let outers = get_members("outer");
                        if outers.is_empty() {
                            // Without a usable outer ring there is nothing to import.
                            continue;
                        }

                        tx.execute(
                            "INSERT INTO osm_multipolygons (id, attrs) VALUES ($1, $2)",
                            &[&(rel.id as i64), &serde_json::Value::Object(tags)]
                        ).unwrap();
                        for (member_id, path) in outers {
                            tx.execute(
                                "INSERT INTO osm_multipolygon_members (id, m_id, m_role, m_geo) VALUES ($1, $2, 'outer', polygon(pclose($3::path)))",
                                &[&(rel.id as i64), &(member_id as i64), &path]
                            ).unwrap();
                        }
                        for (member_id, path) in get_members("inner") {
                            tx.execute(
                                "INSERT INTO osm_multipolygon_members (id, m_id, m_role, m_geo) VALUES ($1, $2, 'inner', polygon(pclose($3::path)))",
                                &[&(rel.id as i64), &(member_id as i64), &path]
                            ).unwrap();
                        }
                        count += 1;
                    }
                    tx.commit().unwrap();
                })
                .is_some()
            {}
            count
        }).iter().sum::<usize>();
    }
    println!("{} multipolygons", multipoly_count);
}