Compare commits
1 Commits
master
...
feat-notic
Author | SHA1 | Date |
---|---|---|
William Casarin | c12da6a229 |
26
src/db.rs
26
src/db.rs
|
@ -6,6 +6,7 @@ use crate::event::{single_char_tagname, Event};
|
|||
use crate::hexrange::hex_range;
|
||||
use crate::hexrange::HexSearch;
|
||||
use crate::nip05;
|
||||
use crate::notice::Notice;
|
||||
use crate::schema::{upgrade_db, STARTUP_SQL};
|
||||
use crate::subscription::ReqFilter;
|
||||
use crate::subscription::Subscription;
|
||||
|
@ -32,7 +33,7 @@ pub type PooledConnection = r2d2::PooledConnection<r2d2_sqlite::SqliteConnection
|
|||
/// Events submitted from a client, with a return channel for notices
|
||||
pub struct SubmittedEvent {
|
||||
pub event: Event,
|
||||
pub notice_tx: tokio::sync::mpsc::Sender<String>,
|
||||
pub notice_tx: tokio::sync::mpsc::Sender<Notice>,
|
||||
}
|
||||
|
||||
/// Database file
|
||||
|
@ -158,7 +159,9 @@ pub async fn db_writer(
|
|||
event.get_event_id_prefix()
|
||||
);
|
||||
notice_tx
|
||||
.try_send("pubkey is not allowed to publish to this relay".to_owned())
|
||||
.try_send(Notice::message(
|
||||
"pubkey is not allowed to publish to this relay".to_owned(),
|
||||
))
|
||||
.ok();
|
||||
continue;
|
||||
}
|
||||
|
@ -189,10 +192,10 @@ pub async fn db_writer(
|
|||
event.get_author_prefix()
|
||||
);
|
||||
notice_tx
|
||||
.try_send(
|
||||
.try_send(Notice::message(
|
||||
"NIP-05 verification is no longer valid (expired/wrong domain)"
|
||||
.to_owned(),
|
||||
)
|
||||
))
|
||||
.ok();
|
||||
continue;
|
||||
}
|
||||
|
@ -203,7 +206,9 @@ pub async fn db_writer(
|
|||
event.get_author_prefix()
|
||||
);
|
||||
notice_tx
|
||||
.try_send("NIP-05 verification needed to publish events".to_owned())
|
||||
.try_send(Notice::message(
|
||||
"NIP-05 verification needed to publish events".to_owned(),
|
||||
))
|
||||
.ok();
|
||||
continue;
|
||||
}
|
||||
|
@ -229,6 +234,7 @@ pub async fn db_writer(
|
|||
Ok(updated) => {
|
||||
if updated == 0 {
|
||||
trace!("ignoring duplicate or deleted event");
|
||||
notice_tx.try_send(Notice::duplicate(event.id)).ok();
|
||||
} else {
|
||||
info!(
|
||||
"persisted event: {:?} from: {:?} in: {:?}",
|
||||
|
@ -239,16 +245,14 @@ pub async fn db_writer(
|
|||
event_write = true;
|
||||
// send this out to all clients
|
||||
bcast_tx.send(event.clone()).ok();
|
||||
notice_tx.try_send(Notice::saved(event.id)).ok();
|
||||
}
|
||||
}
|
||||
Err(err) => {
|
||||
warn!("event insert failed: {:?}", err);
|
||||
notice_tx
|
||||
.try_send(
|
||||
"relay experienced an error trying to publish the latest event"
|
||||
.to_owned(),
|
||||
)
|
||||
.ok();
|
||||
let msg =
|
||||
"relay experienced an error trying to publish the latest event".into();
|
||||
notice_tx.try_send(Notice::err_msg(msg, event.id)).ok();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -8,6 +8,7 @@ pub mod event;
|
|||
pub mod hexrange;
|
||||
pub mod info;
|
||||
pub mod nip05;
|
||||
pub mod notice;
|
||||
pub mod schema;
|
||||
pub mod subscription;
|
||||
pub mod utils;
|
||||
|
|
|
@ -0,0 +1,48 @@
|
|||
use crate::error;
|
||||
|
||||
pub enum EventResultStatus {
|
||||
Saved,
|
||||
Duplicate,
|
||||
Error(String),
|
||||
}
|
||||
|
||||
pub struct EventResult {
|
||||
pub id: String,
|
||||
pub status: EventResultStatus,
|
||||
}
|
||||
|
||||
pub enum Notice {
|
||||
Message(String),
|
||||
EventResult(EventResult),
|
||||
}
|
||||
|
||||
impl Notice {
|
||||
pub fn err(err: error::Error, id: String) -> Notice {
|
||||
Notice::err_msg(format!("{}", err), id)
|
||||
}
|
||||
|
||||
pub fn message(msg: String) -> Notice {
|
||||
Notice::Message(msg)
|
||||
}
|
||||
|
||||
pub fn saved(id: String) -> Notice {
|
||||
Notice::EventResult(EventResult {
|
||||
id,
|
||||
status: EventResultStatus::Saved,
|
||||
})
|
||||
}
|
||||
|
||||
pub fn duplicate(id: String) -> Notice {
|
||||
Notice::EventResult(EventResult {
|
||||
id,
|
||||
status: EventResultStatus::Duplicate,
|
||||
})
|
||||
}
|
||||
|
||||
pub fn err_msg(msg: String, id: String) -> Notice {
|
||||
Notice::EventResult(EventResult {
|
||||
id,
|
||||
status: EventResultStatus::Error(msg),
|
||||
})
|
||||
}
|
||||
}
|
|
@ -10,6 +10,7 @@ use crate::event::Event;
|
|||
use crate::event::EventCmd;
|
||||
use crate::info::RelayInfo;
|
||||
use crate::nip05;
|
||||
use crate::notice::{EventResultStatus, Notice};
|
||||
use crate::subscription::Subscription;
|
||||
use futures::SinkExt;
|
||||
use futures::StreamExt;
|
||||
|
@ -405,8 +406,17 @@ fn convert_to_msg(msg: String, max_bytes: Option<usize>) -> Result<NostrMessage>
|
|||
}
|
||||
|
||||
/// Turn a string into a NOTICE message ready to send over a WebSocket
|
||||
fn make_notice_message(msg: &str) -> Message {
|
||||
Message::text(json!(["NOTICE", msg]).to_string())
|
||||
fn make_notice_message(notice: Notice) -> Message {
|
||||
let json = match notice {
|
||||
Notice::Message(ref msg) => json!(["NOTICE", msg]),
|
||||
Notice::EventResult(ref res) => match &res.status {
|
||||
EventResultStatus::Saved => json!(["OK", res.id, "true"]),
|
||||
EventResultStatus::Duplicate => json!(["OK", res.id, "true", "duplicate"]),
|
||||
EventResultStatus::Error(msg) => json!(["OK", res.id, "false", msg]),
|
||||
},
|
||||
};
|
||||
|
||||
Message::text(json.to_string())
|
||||
}
|
||||
|
||||
struct ClientInfo {
|
||||
|
@ -435,7 +445,7 @@ async fn nostr_server(
|
|||
// we will send out the tx handle to any query we generate.
|
||||
let (query_tx, mut query_rx) = mpsc::channel::<db::QueryResult>(256);
|
||||
// Create channel for receiving NOTICEs
|
||||
let (notice_tx, mut notice_rx) = mpsc::channel::<String>(32);
|
||||
let (notice_tx, mut notice_rx) = mpsc::channel::<Notice>(32);
|
||||
|
||||
// last time this client sent data (message, ping, etc.)
|
||||
let mut last_message_time = Instant::now();
|
||||
|
@ -480,7 +490,7 @@ async fn nostr_server(
|
|||
ws_stream.send(Message::Ping(Vec::new())).await.ok();
|
||||
},
|
||||
Some(notice_msg) = notice_rx.recv() => {
|
||||
ws_stream.send(make_notice_message(¬ice_msg)).await.ok();
|
||||
ws_stream.send(make_notice_message(notice_msg)).await.ok();
|
||||
},
|
||||
Some(query_result) = query_rx.recv() => {
|
||||
// database informed us of a query result we asked for
|
||||
|
@ -528,7 +538,7 @@ async fn nostr_server(
|
|||
},
|
||||
Some(Ok(Message::Binary(_))) => {
|
||||
ws_stream.send(
|
||||
make_notice_message("binary messages are not accepted")).await.ok();
|
||||
make_notice_message(Notice::message("binary messages are not accepted".into()))).await.ok();
|
||||
continue;
|
||||
},
|
||||
Some(Ok(Message::Ping(_) | Message::Pong(_))) => {
|
||||
|
@ -538,8 +548,7 @@ async fn nostr_server(
|
|||
},
|
||||
Some(Err(WsError::Capacity(MessageTooLong{size, max_size}))) => {
|
||||
ws_stream.send(
|
||||
make_notice_message(
|
||||
&format!("message too large ({} > {})",size, max_size))).await.ok();
|
||||
make_notice_message(Notice::message(format!("message too large ({} > {})",size, max_size)))).await.ok();
|
||||
continue;
|
||||
},
|
||||
None |
|
||||
|
@ -581,13 +590,15 @@ async fn nostr_server(
|
|||
} else {
|
||||
info!("client: {} sent a far future-dated event", cid);
|
||||
if let Some(fut_sec) = settings.options.reject_future_seconds {
|
||||
ws_stream.send(make_notice_message(&format!("The event created_at field is out of the acceptable range (+{}sec) for this relay and was not stored.",fut_sec))).await.ok();
|
||||
let msg = format!("The event created_at field is out of the acceptable range (+{}sec) for this relay and was not stored.",fut_sec);
|
||||
let notice = Notice::err_msg(msg, e.id);
|
||||
ws_stream.send(make_notice_message(notice)).await.ok();
|
||||
}
|
||||
}
|
||||
},
|
||||
Err(_) => {
|
||||
info!("client: {} sent an invalid event", cid);
|
||||
ws_stream.send(make_notice_message("event was invalid")).await.ok();
|
||||
ws_stream.send(make_notice_message(Notice::message("event was invalid".into()))).await.ok();
|
||||
}
|
||||
}
|
||||
},
|
||||
|
@ -609,7 +620,7 @@ async fn nostr_server(
|
|||
},
|
||||
Err(e) => {
|
||||
info!("Subscription error: {}", e);
|
||||
ws_stream.send(make_notice_message(&e.to_string())).await.ok();
|
||||
ws_stream.send(make_notice_message(Notice::err(e, s.id))).await.ok();
|
||||
}
|
||||
}
|
||||
},
|
||||
|
@ -628,7 +639,7 @@ async fn nostr_server(
|
|||
conn.unsubscribe(&c);
|
||||
} else {
|
||||
info!("invalid command ignored");
|
||||
ws_stream.send(make_notice_message("could not parse command")).await.ok();
|
||||
ws_stream.send(make_notice_message(Notice::message("could not parse command".into()))).await.ok();
|
||||
}
|
||||
},
|
||||
Err(Error::ConnError) => {
|
||||
|
@ -637,11 +648,11 @@ async fn nostr_server(
|
|||
}
|
||||
Err(Error::EventMaxLengthError(s)) => {
|
||||
info!("client: {} sent event larger ({} bytes) than max size", cid, s);
|
||||
ws_stream.send(make_notice_message("event exceeded max size")).await.ok();
|
||||
ws_stream.send(make_notice_message(Notice::message("event exceeded max size".into()))).await.ok();
|
||||
},
|
||||
Err(Error::ProtoParseError) => {
|
||||
info!("client {} sent event that could not be parsed", cid);
|
||||
ws_stream.send(make_notice_message("could not parse command")).await.ok();
|
||||
ws_stream.send(make_notice_message(Notice::message("could not parse command".into()))).await.ok();
|
||||
},
|
||||
Err(e) => {
|
||||
info!("got non-fatal error from client: {}, error: {:?}", cid, e);
|
||||
|
|
Loading…
Reference in New Issue