src/blocking/body.rs
373 lines · rust · 1 line annotation
use std::fmt;use std::fs::File;use std::future::Future;#[cfg(feature = "multipart")]use std::io::Cursor;use std::io::{self, Read};use std::mem::{self, MaybeUninit};use std::ptr;use bytes::Bytes;use futures_channel::mpsc;use crate::async_impl;/// The body of a `Request`.////// In most cases, this is not needed directly, as the/// [`RequestBuilder.body`][builder] method uses `Into<Body>`, which allows/// passing many things (like a string or vector of bytes).////// [builder]: ./struct.RequestBuilder.html#method.body#[derive(Debug)]pub struct Body { kind: Kind,}impl Body { /// Instantiate a `Body` from a reader. /// /// # Note /// /// While allowing for many types to be used, these bodies do not have /// a way to reset to the beginning and be reused. This means that when /// encountering a 307 or 308 status code, instead of repeating the /// request at the new location, the `Response` will be returned with /// the redirect status code set. /// /// ```rust /// # use std::fs::File; /// # use reqwest::blocking::Body; /// # fn run() -> Result<(), Box<dyn std::error::Error>> { /// let file = File::open("national_secrets.txt")?; /// let body = Body::new(file); /// # Ok(()) /// # } /// ``` /// /// If you have a set of bytes, like `String` or `Vec<u8>`, using the /// `From` implementations for `Body` will store the data in a manner /// it can be reused. /// /// ```rust /// # use reqwest::blocking::Body; /// # fn run() -> Result<(), Box<dyn std::error::Error>> { /// let s = "A stringy body"; /// let body = Body::from(s); /// # Ok(()) /// # } /// ``` pub fn new<R: Read + Send + 'static>(reader: R) -> Body { Body { kind: Kind::Reader(Box::from(reader), None), } } /// Create a `Body` from a `Read` where the size is known in advance /// but the data should not be fully loaded into memory. This will /// set the `Content-Length` header and stream from the `Read`. /// /// ```rust /// # use std::fs::File; /// # use reqwest::blocking::Body; /// # fn run() -> Result<(), Box<dyn std::error::Error>> { /// let file = File::open("a_large_file.txt")?; /// let file_size = file.metadata()?.len(); /// let body = Body::sized(file, file_size); /// # Ok(()) /// # } /// ``` pub fn sized<R: Read + Send + 'static>(reader: R, len: u64) -> Body { Body { kind: Kind::Reader(Box::from(reader), Some(len)), } } /// Returns the body as a byte slice if the body is already buffered in /// memory. For streamed requests this method returns `None`. pub fn as_bytes(&self) -> Option<&[u8]> { match self.kind { Kind::Reader(_, _) => None, Kind::Bytes(ref bytes) => Some(bytes.as_ref()), } } /// Converts streamed requests to their buffered equivalent and /// returns a reference to the buffer. If the request is already /// buffered, this has no effect. /// /// Be aware that for large requests this method is expensive /// and may cause your program to run out of memory. pub fn buffer(&mut self) -> Result<&[u8], crate::Error> { match self.kind { Kind::Reader(ref mut reader, maybe_len) => { let mut bytes = if let Some(len) = maybe_len { Vec::with_capacity(len as usize) } else { Vec::new() }; io::copy(reader, &mut bytes).map_err(crate::error::builder)?; self.kind = Kind::Bytes(bytes.into()); self.buffer() } Kind::Bytes(ref bytes) => Ok(bytes.as_ref()), } } #[cfg(feature = "multipart")] pub(crate) fn len(&self) -> Option<u64> { match self.kind { Kind::Reader(_, len) => len, Kind::Bytes(ref bytes) => Some(bytes.len() as u64), } } #[cfg(feature = "multipart")] pub(crate) fn into_reader(self) -> Reader { match self.kind { Kind::Reader(r, _) => Reader::Reader(r), Kind::Bytes(b) => Reader::Bytes(Cursor::new(b)), } } pub(crate) fn into_async(self) -> (Option<Sender>, async_impl::Body, Option<u64>) { match self.kind { Kind::Reader(read, len) => { let (tx, rx) = mpsc::channel(0); let tx = Sender { body: (read, len), tx, }; (Some(tx), async_impl::Body::stream(rx), len) } Kind::Bytes(chunk) => { let len = chunk.len() as u64; (None, async_impl::Body::reusable(chunk), Some(len)) } } } pub(crate) fn try_clone(&self) -> Option<Body> { self.kind.try_clone().map(|kind| Body { kind }) }}enum Kind { Reader(Box<dyn Read + Send>, Option<u64>), Bytes(Bytes),}impl Kind { fn try_clone(&self) -> Option<Kind> { match self { Kind::Reader(..) => None, Kind::Bytes(v) => Some(Kind::Bytes(v.clone())), } }}impl From<Vec<u8>> for Body { #[inline] fn from(v: Vec<u8>) -> Body { Body { kind: Kind::Bytes(v.into()), } }}impl From<String> for Body { #[inline] fn from(s: String) -> Body { s.into_bytes().into() }}impl From<&'static [u8]> for Body { #[inline] fn from(s: &'static [u8]) -> Body { Body { kind: Kind::Bytes(Bytes::from_static(s)), } }}impl From<&'static str> for Body { #[inline] fn from(s: &'static str) -> Body { s.as_bytes().into() }}impl From<File> for Body { #[inline] fn from(f: File) -> Body { let len = f.metadata().map(|m| m.len()).ok(); Body { kind: Kind::Reader(Box::new(f), len), } }}impl From<Bytes> for Body { #[inline] fn from(b: Bytes) -> Body { Body { kind: Kind::Bytes(b), } }}impl fmt::Debug for Kind { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { match *self { Kind::Reader(_, ref v) => f .debug_struct("Reader") .field("length", &DebugLength(v)) .finish(), Kind::Bytes(ref v) => fmt::Debug::fmt(v, f), } }}struct DebugLength<'a>(&'a Option<u64>);impl<'a> fmt::Debug for DebugLength<'a> { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { match *self.0 { Some(ref len) => fmt::Debug::fmt(len, f), None => f.write_str("Unknown"), } }}#[cfg(feature = "multipart")]pub(crate) enum Reader { Reader(Box<dyn Read + Send>), Bytes(Cursor<Bytes>),}#[cfg(feature = "multipart")]impl Read for Reader { fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { match *self { Reader::Reader(ref mut rdr) => rdr.read(buf), Reader::Bytes(ref mut rdr) => rdr.read(buf), } }}pub(crate) struct Sender { body: (Box<dyn Read + Send>, Option<u64>), tx: mpsc::Sender<Result<Bytes, Abort>>,}#[derive(Debug)]struct Abort;impl fmt::Display for Abort { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.write_str("abort request body") }}impl std::error::Error for Abort {}async fn send_future(sender: Sender) -> Result<(), crate::Error> { use bytes::{BufMut, BytesMut}; use futures_util::SinkExt; use std::cmp; let con_len = sender.body.1; let cap = cmp::min(sender.body.1.unwrap_or(8192), 8192); let mut written = 0; let mut buf = BytesMut::zeroed(cap as usize); buf.clear(); let mut body = sender.body.0; // Put in an option so that it can be consumed on error to call abort() let mut tx = Some(sender.tx); loop { if Some(written) == con_len { // Written up to content-length, so stop. return Ok(()); } // The input stream is read only if the buffer is empty so // that there is only one read in the buffer at any time. // // We need to know whether there is any data to send before // we check the transmission channel (with poll_ready below) // because sometimes the receiver disappears as soon as it // considers the data is completely transmitted, which may // be true. // // The use case is a web server that closes its // input stream as soon as the data received is valid JSON. // This behaviour is questionable, but it exists and the // fact is that there is actually no remaining data to read. if buf.is_empty() { if buf.capacity() == buf.len() { buf.reserve(8192); // zero out the reserved memory let uninit = buf.spare_capacity_mut(); let uninit_len = uninit.len(); unsafe { ptr::write_bytes(uninit.as_mut_ptr().cast::<u8>(), 0, uninit_len); } } let bytes = unsafe { mem::transmute::<&mut [MaybeUninit<u8>], &mut [u8]>(buf.spare_capacity_mut()) }; match body.read(bytes) { Ok(0) => { // The buffer was empty and nothing's left to // read. Return. return Ok(()); } Ok(n) => unsafe { buf.advance_mut(n); }, Err(e) => { let _ = tx .take() .expect("tx only taken on error") .clone() .try_send(Err(Abort)); return Err(crate::error::body(e)); } } }Three unsafe blocks in send_future in the blocking body bridge. The first (lines 313-315) calls ptr::write_bytes to zero-initialize newly reserved BytesMut capacity before treating it as initialized; this ensures bytes written by the Read impl are not uninitialized. The second (lines 318-319) uses mem::transmute to convert &mut [MaybeUninit<u8>] to &mut [u8] after the zeroing step; the zeroing guarantees the invariant. The third (line 327) calls buf.advance_mut(n) where n is the return value of body.read(), so n bytes have been written by the OS. No SAFETY comments accompany these blocks.
Justifies uses-unsafe. The absence of SAFETY comments is noted but the invariants are upheld by the logic immediately preceding each block.
// The only way to get here is when the buffer is not empty. // We can check the transmission channel let buf_len = buf.len() as u64; tx.as_mut() .expect("tx only taken on error") .send(Ok(buf.split().freeze())) .await .map_err(crate::error::body)?; written += buf_len; }}impl Sender { // A `Future` that may do blocking read calls. // As a `Future`, this integrates easily with `wait::timeout`. pub(crate) fn send(self) -> impl Future<Output = Result<(), crate::Error>> { send_future(self) }}// useful for tests, but not publicly exposed#[cfg(test)]pub(crate) fn read_to_string(mut body: Body) -> io::Result<String> { let mut s = String::new(); match body.kind { Kind::Reader(ref mut reader, _) => reader.read_to_string(&mut s), Kind::Bytes(ref mut bytes) => (&**bytes).read_to_string(&mut s), } .map(|_| s)}