Reputation: 3731
In my SmtpService
I'd like to send the response header right away and the body when processing is finished. This should follow the SMTP exchange:
C: DATA
S: 354 Start mail input
C: ... data ...
C: ... more ...
C: .
S: 250 Ok
I've got this much in the playground:
#[macro_use]
extern crate log;
extern crate bytes;
extern crate tokio_proto;
extern crate tokio_service;
extern crate futures;
use std::io;
use bytes::Bytes;
use tokio_service::Service;
use tokio_proto::streaming::{Message, Body};
use futures::{future, Future, Stream};
use futures::sync::oneshot;
//use model::request::SmtpCommand;
//use model::response::SmtpReply;
#[derive(Eq, PartialEq, Debug)]
enum SmtpCommand {
Data,
}
#[derive(Eq, PartialEq, Debug)]
enum SmtpReply {
OkInfo,
StartMailInputChallenge,
TransactionFailure,
CommandNotImplementedFailure
}
pub struct SmtpService;
impl Service for SmtpService {
// For non-streaming protocols, service errors are always io::Error
type Error = io::Error;
// These types must match the corresponding protocol types:
type Request = Message<SmtpCommand, Body<Bytes, Self::Error>>;
type Response = Message<SmtpReply, Body<SmtpReply, Self::Error>>;
// The future for computing the response; box it for simplicity.
type Future = Box<Future<Item = Self::Response, Error = Self::Error>>;
// Produce a future for computing a response from a request.
fn call(&self, command: Self::Request) -> Self::Future {
info!("Received {:?}", command);
match command {
Message::WithBody(cmd, cmd_body) => {
match cmd {
SmtpCommand::Data => {
// start => SmtpReply::StartMailInputChallenge
// ok => SmtpReply::OkInfo
// err => SmtpReply::TransactionFailure
let (tx, rx) = oneshot::channel();
let fut = cmd_body
.inspect(|chunk| info!("data: {:?}", chunk))
.map(|_| tx.send(SmtpReply::OkInfo))
.map_err(|_| tx.send(SmtpReply::TransactionFailure))
.map(|_| Body::from(rx));
// ??? How to wire the fut future into the response message?
let msg = Message::WithBody(SmtpReply::StartMailInputChallenge, fut);
Box::new(future::ok(msg)) as Self::Future
}
_ => Box::new(future::ok(Message::WithoutBody(
SmtpReply::CommandNotImplementedFailure,
))),
}
}
Message::WithoutBody(cmd) => {
Box::new(future::ok(Message::WithoutBody(match cmd {
_ => SmtpReply::CommandNotImplementedFailure,
})))
}
}
}
}
fn main() {
println!("Hello, world!");
}
I'm wondering if it's even possible or do I need to produce two messages instead - one for DATA and a second for the actual byte stream?
The error I get shows a mismatch in the message structure; the body/future is obviously out of place:
error[E0271]: type mismatch resolving `<futures::FutureResult<tokio_proto::streaming::Message<SmtpReply, futures::stream::Map<futures::stream::MapErr<futures::stream::Map<futures::stream::Inspect<tokio_proto::streaming::Body<bytes::Bytes, std::io::Error>, [closure@src/main.rs:57:42: 57:76]>, [closure@src/main.rs:58:38: 58:68 tx:futures::Sender<SmtpReply>]>, [closure@src/main.rs:59:42: 59:84 tx:futures::Sender<SmtpReply>]>, [closure@src/main.rs:60:38: 60:56 rx:futures::Receiver<SmtpReply>]>>, std::io::Error> as futures::Future>::Item == tokio_proto::streaming::Message<SmtpReply, tokio_proto::streaming::Body<SmtpReply, std::io::Error>>`
--> src/main.rs:66:25
|
66 | Box::new(future::ok(msg)) as Self::Future
| ^^^^^^^^^^^^^^^^^^^^^^^^^ expected struct `futures::stream::Map`, found struct `tokio_proto::streaming::Body`
|
= note: expected type `tokio_proto::streaming::Message<_, futures::stream::Map<futures::stream::MapErr<futures::stream::Map<futures::stream::Inspect<tokio_proto::streaming::Body<bytes::Bytes, std::io::Error>, [closure@src/main.rs:57:42: 57:76]>, [closure@src/main.rs:58:38: 58:68 tx:futures::Sender<SmtpReply>]>, [closure@src/main.rs:59:42: 59:84 tx:futures::Sender<SmtpReply>]>, [closure@src/main.rs:60:38: 60:56 rx:futures::Receiver<SmtpReply>]>>`
found type `tokio_proto::streaming::Message<_, tokio_proto::streaming::Body<SmtpReply, std::io::Error>>`
= note: required for the cast to the object type `futures::Future<Error=std::io::Error, Item=tokio_proto::streaming::Message<SmtpReply, tokio_proto::streaming::Body<SmtpReply, std::io::Error>>>`
Upvotes: 1
Views: 254
Reputation: 5530
The future returned by call
ends when the Response
is returned; you cannot "drive" further actions in that future.
This means you need to spawn a new task generating the (streamed) body; you'll need a Handle
from tokio_core
for that.
Also the Body
needs to be created from a mpsc
channel, not a oneshot
; you could send many body chunks.
#[macro_use]
extern crate log;
extern crate bytes;
extern crate tokio_core;
extern crate tokio_proto;
extern crate tokio_service;
extern crate futures;
use std::io;
use bytes::Bytes;
use tokio_service::Service;
use tokio_proto::streaming::{Message, Body};
use futures::{future, Future, Stream, Sink};
use futures::sync::mpsc;
//use model::request::SmtpCommand;
//use model::response::SmtpReply;
#[derive(Eq, PartialEq, Debug)]
pub enum SmtpCommand {
Data,
}
#[derive(Eq, PartialEq, Debug)]
pub enum SmtpReply {
OkInfo,
StartMailInputChallenge,
TransactionFailure,
CommandNotImplementedFailure
}
pub struct SmtpService {
handle: tokio_core::reactor::Handle,
}
impl Service for SmtpService {
// For non-streaming protocols, service errors are always io::Error
type Error = io::Error;
// These types must match the corresponding protocol types:
type Request = Message<SmtpCommand, Body<Bytes, Self::Error>>;
type Response = Message<SmtpReply, Body<SmtpReply, Self::Error>>;
// The future for computing the response; box it for simplicity.
type Future = Box<Future<Item = Self::Response, Error = Self::Error>>;
// Produce a future for computing a response from a request.
fn call(&self, command: Self::Request) -> Self::Future {
info!("Received {:?}", command);
match command {
Message::WithBody(cmd, cmd_body) => {
match cmd {
SmtpCommand::Data => {
// start => SmtpReply::StartMailInputChallenge
// ok => SmtpReply::OkInfo
// err => SmtpReply::TransactionFailure
let (tx, rx) = mpsc::channel::<io::Result<SmtpReply>>(1);
let fut = cmd_body
// read cmd stream; for_each results in a Future,
// which completes when the stream is finished
.for_each(|chunk| {
info!("data: {:?}", chunk);
Ok(())
})
// now send the result body
.then(move |r| match r {
Ok(_) => tx.send(Ok(SmtpReply::OkInfo)),
Err(_) => tx.send(Ok(SmtpReply::TransactionFailure)),
})
// could send further body messages:
// .and_then(|tx| tx.send(...))
// ignore any send errors; spawn needs a future with
// Item=() and Error=().
.then(|_| Ok(()))
;
self.handle.spawn(fut);
let body : Body<SmtpReply, Self::Error> = Body::from(rx);
let msg : Self::Response = Message::WithBody(SmtpReply::StartMailInputChallenge, body);
Box::new(future::ok(msg)) as Self::Future
}
_ => Box::new(future::ok(Message::WithoutBody(
SmtpReply::CommandNotImplementedFailure,
))),
}
}
Message::WithoutBody(cmd) => {
Box::new(future::ok(Message::WithoutBody(match cmd {
_ => SmtpReply::CommandNotImplementedFailure,
})))
}
}
}
}
fn main() {
println!("Hello, world!");
}
Upvotes: 1