stream.rs
4.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
//
//
use std::time::Duration;
use std::error::Error;
use std::sync::atomic::{AtomicU32, Ordering};
use std::collections::HashMap;
use futures_util::{StreamExt};
use reqwest::Client;
use reqwest::header::{HeaderName, HeaderMap};
static REQUEST_COUNTER: AtomicU32 = AtomicU32::new(0);
#[derive(Debug, Clone, serde::Serialize)]
pub struct StreamResponse {
request_id: u32,
status: u16,
status_text: String,
headers: HashMap<String, String>
}
#[derive(Clone, serde::Serialize)]
pub struct EndPayload {
request_id: u32,
status: u16,
}
#[derive(Clone, serde::Serialize)]
pub struct ChunkPayload {
request_id: u32,
chunk: bytes::Bytes,
}
#[tauri::command]
pub async fn stream_fetch(
window: tauri::Window,
method: String,
url: String,
headers: HashMap<String, String>,
body: Vec<u8>,
) -> Result<StreamResponse, String> {
let event_name = "stream-response";
let request_id = REQUEST_COUNTER.fetch_add(1, Ordering::SeqCst);
let mut _headers = HeaderMap::new();
for (key, value) in &headers {
_headers.insert(key.parse::<HeaderName>().unwrap(), value.parse().unwrap());
}
// println!("method: {:?}", method);
// println!("url: {:?}", url);
// println!("headers: {:?}", headers);
// println!("headers: {:?}", _headers);
let method = method.parse::<reqwest::Method>().map_err(|err| format!("failed to parse method: {}", err))?;
let client = Client::builder()
.default_headers(_headers)
.redirect(reqwest::redirect::Policy::limited(3))
.connect_timeout(Duration::new(3, 0))
.build()
.map_err(|err| format!("failed to generate client: {}", err))?;
let mut request = client.request(
method.clone(),
url.parse::<reqwest::Url>().map_err(|err| format!("failed to parse url: {}", err))?
);
if method == reqwest::Method::POST || method == reqwest::Method::PUT || method == reqwest::Method::PATCH {
let body = bytes::Bytes::from(body);
// println!("body: {:?}", body);
request = request.body(body);
}
// println!("client: {:?}", client);
// println!("request: {:?}", request);
let response_future = request.send();
let res = response_future.await;
let response = match res {
Ok(res) => {
// get response and emit to client
let mut headers = HashMap::new();
for (name, value) in res.headers() {
headers.insert(
name.as_str().to_string(),
std::str::from_utf8(value.as_bytes()).unwrap().to_string()
);
}
let status = res.status().as_u16();
tauri::async_runtime::spawn(async move {
let mut stream = res.bytes_stream();
while let Some(chunk) = stream.next().await {
match chunk {
Ok(bytes) => {
// println!("chunk: {:?}", bytes);
if let Err(e) = window.emit(event_name, ChunkPayload{ request_id, chunk: bytes }) {
println!("Failed to emit chunk payload: {:?}", e);
}
}
Err(err) => {
println!("Error chunk: {:?}", err);
}
}
}
if let Err(e) = window.emit(event_name, EndPayload{ request_id, status: 0 }) {
println!("Failed to emit end payload: {:?}", e);
}
});
StreamResponse {
request_id,
status,
status_text: "OK".to_string(),
headers,
}
}
Err(err) => {
let error: String = err.source()
.map(|e| e.to_string())
.unwrap_or_else(|| "Unknown error occurred".to_string());
println!("Error response: {:?}", error);
tauri::async_runtime::spawn( async move {
if let Err(e) = window.emit(event_name, ChunkPayload{ request_id, chunk: error.into() }) {
println!("Failed to emit chunk payload: {:?}", e);
}
if let Err(e) = window.emit(event_name, EndPayload{ request_id, status: 0 }) {
println!("Failed to emit end payload: {:?}", e);
}
});
StreamResponse {
request_id,
status: 599,
status_text: "Error".to_string(),
headers: HashMap::new(),
}
}
};
// println!("Response: {:?}", response);
Ok(response)
}