summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authordefanor <defanor@uberspace.net>2020-09-18 15:05:34 +0300
committerdefanor <defanor@uberspace.net>2020-09-18 15:05:34 +0300
commit07717e84876b7e62e3a62dd0878e19692c6c2662 (patch)
treecc075a3e45b31191ae92968052f0aed7b3d32ac9
parentc9b55c44dc4f31822cf9f3176f44ec283a63125b (diff)
Introduce XML input queue
Instead of processing XML elements from parser callbacks, they are now queued to be processed after parsing. This allows to explicitly return error codes from processing functions, as well as to reset the parser without relying on an additional stream state (REXMPP_STREAM_RESTART, which is removed).
-rw-r--r--src/rexmpp.c125
-rw-r--r--src/rexmpp.h18
2 files changed, 80 insertions, 63 deletions
diff --git a/src/rexmpp.c b/src/rexmpp.c
index 41a45bd..bdd878b 100644
--- a/src/rexmpp.c
+++ b/src/rexmpp.c
@@ -248,6 +248,8 @@ rexmpp_err_t rexmpp_init (rexmpp_t *s, const char *jid)
s->server_socket = -1;
s->current_element_root = NULL;
s->current_element = NULL;
+ s->input_queue = NULL;
+ s->input_queue_last = NULL;
s->stream_features = NULL;
s->roster_items = NULL;
s->roster_ver = NULL;
@@ -387,6 +389,11 @@ void rexmpp_cleanup (rexmpp_t *s) {
s->current_element_root = NULL;
s->current_element = NULL;
}
+ if (s->input_queue != NULL) {
+ xmlFreeNodeList(s->input_queue);
+ s->input_queue = NULL;
+ s->input_queue_last = NULL;
+ }
if (s->server_srv != NULL) {
ares_free_data(s->server_srv);
s->server_srv = NULL;
@@ -773,11 +780,11 @@ void rexmpp_iq_reply (rexmpp_t *s,
rexmpp_send(s, iq_stanza);
}
-void rexmpp_iq_new (rexmpp_t *s,
- const char *type,
- const char *to,
- xmlNodePtr payload,
- rexmpp_iq_callback_t cb)
+rexmpp_err_t rexmpp_iq_new (rexmpp_t *s,
+ const char *type,
+ const char *to,
+ xmlNodePtr payload,
+ rexmpp_iq_callback_t cb)
{
unsigned int i;
rexmpp_iq_t *prev = NULL, *last = s->active_iq;
@@ -811,7 +818,7 @@ void rexmpp_iq_new (rexmpp_t *s,
iq->cb = cb;
iq->next = s->active_iq;
s->active_iq = iq;
- rexmpp_send(s, iq_stanza);
+ return rexmpp_send(s, iq_stanza);
}
rexmpp_err_t rexmpp_sm_ack (rexmpp_t *s) {
@@ -829,10 +836,13 @@ rexmpp_err_t rexmpp_sm_send_req (rexmpp_t *s) {
return rexmpp_send(s, ack);
}
-void rexmpp_recv (rexmpp_t *s) {
+rexmpp_err_t rexmpp_process_element (rexmpp_t *s, xmlNodePtr elem);
+
+rexmpp_err_t rexmpp_recv (rexmpp_t *s) {
char chunk_raw[4096], *chunk;
ssize_t chunk_raw_len, chunk_len;
int sasl_err;
+ rexmpp_err_t err = REXMPP_SUCCESS;
/* Loop here in order to consume data from TLS buffers, which
wouldn't show up on select(). */
do {
@@ -850,13 +860,35 @@ void rexmpp_recv (rexmpp_t *s) {
rexmpp_log(s, LOG_ERR, "SASL decoding error: %s",
gsasl_strerror(sasl_err));
s->sasl_state = REXMPP_SASL_ERROR;
- return;
+ return REXMPP_E_SASL;
}
} else {
chunk = chunk_raw;
chunk_len = chunk_raw_len;
}
xmlParseChunk(s->xml_parser, chunk, chunk_len, 0);
+
+ xmlNodePtr elem;
+ for (elem = s->input_queue;
+ /* Skipping everything after an error. Might be better to
+ process it anyway, but it could lead to more errors if
+ the processing isn't done carefully. */
+ elem != NULL && (err == REXMPP_SUCCESS || err == REXMPP_E_AGAIN);
+ elem = elem->next)
+ {
+ if (s->xml_in_cb != NULL && s->xml_in_cb(s, elem) != 0) {
+ rexmpp_log(s, LOG_WARNING,
+ "Message processing was cancelled by xml_in_cb.");
+ } else {
+ err = rexmpp_process_element(s, elem);
+ }
+ }
+ xmlFreeNodeList(s->input_queue);
+ s->input_queue = NULL;
+ s->input_queue_last = NULL;
+ if (err != REXMPP_SUCCESS && err != REXMPP_E_AGAIN) {
+ return err;
+ }
} else if (chunk_raw_len == 0) {
if (s->tls_state == REXMPP_TLS_ACTIVE) {
s->tls_state = REXMPP_TLS_CLOSED;
@@ -887,6 +919,7 @@ void rexmpp_recv (rexmpp_t *s) {
}
}
} while (chunk_raw_len > 0 && s->tcp_state == REXMPP_TCP_CONNECTED);
+ return err;
}
rexmpp_err_t rexmpp_stream_open (rexmpp_t *s) {
@@ -1016,8 +1049,8 @@ rexmpp_err_t rexmpp_tls_handshake (rexmpp_t *s) {
return rexmpp_stream_open(s);
} else {
/* A STARTTLS connection, restart the stream. */
- s->stream_state = REXMPP_STREAM_RESTART;
- return REXMPP_SUCCESS;
+ xmlCtxtResetPush(s->xml_parser, "", 0, "", "utf-8");
+ return rexmpp_stream_open(s);
}
} else {
rexmpp_log(s, LOG_ERR, "Unexpected TLS handshake error: %s",
@@ -1354,17 +1387,15 @@ void rexmpp_bound (rexmpp_t *s, xmlNodePtr req, xmlNodePtr response, int success
}
}
-void rexmpp_stream_bind (rexmpp_t *s) {
+rexmpp_err_t rexmpp_stream_bind (rexmpp_t *s) {
/* Issue a bind request. */
s->stream_state = REXMPP_STREAM_BIND;
xmlNodePtr bind_cmd = xmlNewNode(NULL, "bind");
xmlNewNs(bind_cmd, "urn:ietf:params:xml:ns:xmpp-bind", NULL);
- rexmpp_iq_new(s, "set", NULL, bind_cmd, rexmpp_bound);
+ return rexmpp_iq_new(s, "set", NULL, bind_cmd, rexmpp_bound);
}
-void rexmpp_process_element (rexmpp_t *s) {
- xmlNodePtr elem = s->current_element;
-
+rexmpp_err_t rexmpp_process_element (rexmpp_t *s, xmlNodePtr elem) {
/* IQs. These are the ones that should be processed by the library;
if a user-facing application wants to handle them on its own, it
should cancel further processing by the library (so we can send
@@ -1574,7 +1605,7 @@ void rexmpp_process_element (rexmpp_t *s) {
/* Nothing to negotiate. */
if (xmlFirstElementChild(elem) == NULL) {
rexmpp_stream_is_ready(s);
- return;
+ return REXMPP_SUCCESS;
}
/* TODO: check for required features properly here. Currently
@@ -1588,7 +1619,7 @@ void rexmpp_process_element (rexmpp_t *s) {
xmlNodePtr starttls_cmd = xmlNewNode(NULL, "starttls");
xmlNewNs(starttls_cmd, "urn:ietf:params:xml:ns:xmpp-tls", NULL);
rexmpp_send(s, starttls_cmd);
- return;
+ return REXMPP_SUCCESS;
}
child = rexmpp_xml_find_child(elem, "urn:ietf:params:xml:ns:xmpp-sasl",
@@ -1620,7 +1651,7 @@ void rexmpp_process_element (rexmpp_t *s) {
rexmpp_log(s, LOG_CRIT, "Failed to initialise SASL session: %s",
gsasl_strerror(sasl_err));
s->sasl_state = REXMPP_SASL_ERROR;
- return;
+ return REXMPP_E_SASL;
}
sasl_err = gsasl_step64 (s->sasl_session, "", (char**)&sasl_buf);
if (sasl_err != GSASL_OK) {
@@ -1630,7 +1661,7 @@ void rexmpp_process_element (rexmpp_t *s) {
rexmpp_log(s, LOG_ERR, "SASL error: %s",
gsasl_strerror(sasl_err));
s->sasl_state = REXMPP_SASL_ERROR;
- return;
+ return REXMPP_E_SASL;
}
}
xmlNodePtr auth_cmd = xmlNewNode(NULL, "auth");
@@ -1639,7 +1670,7 @@ void rexmpp_process_element (rexmpp_t *s) {
xmlNodeAddContent(auth_cmd, sasl_buf);
free(sasl_buf);
rexmpp_send(s, auth_cmd);
- return;
+ return REXMPP_SUCCESS;
}
child = rexmpp_xml_find_child(elem, "urn:xmpp:sm:3", "sm");
@@ -1652,14 +1683,13 @@ void rexmpp_process_element (rexmpp_t *s) {
xmlNewProp(sm_resume, "previd", s->stream_id);
xmlNewProp(sm_resume, "h", buf);
rexmpp_send(s, sm_resume);
- return;
+ return REXMPP_SUCCESS;
}
child =
rexmpp_xml_find_child(elem, "urn:ietf:params:xml:ns:xmpp-bind", "bind");
if (child != NULL) {
- rexmpp_stream_bind(s);
- return;
+ return rexmpp_stream_bind(s);
}
}
@@ -1668,7 +1698,7 @@ void rexmpp_process_element (rexmpp_t *s) {
"error")) {
rexmpp_log(s, LOG_ERR, "stream error");
s->stream_state = REXMPP_STREAM_ERROR;
- return;
+ return REXMPP_E_STREAM;
}
/* STARTTLS negotiation,
@@ -1676,12 +1706,11 @@ void rexmpp_process_element (rexmpp_t *s) {
if (s->stream_state == REXMPP_STREAM_STARTTLS) {
if (rexmpp_xml_match(elem, "urn:ietf:params:xml:ns:xmpp-tls",
"proceed")) {
- rexmpp_tls_start(s);
- return;
+ return rexmpp_tls_start(s);
} else if (rexmpp_xml_match(elem, "urn:ietf:params:xml:ns:xmpp-tls",
"failure")) {
rexmpp_log(s, LOG_ERR, "STARTTLS failure");
- return;
+ return REXMPP_E_TLS;
}
}
@@ -1701,15 +1730,14 @@ void rexmpp_process_element (rexmpp_t *s) {
rexmpp_log(s, LOG_ERR, "SASL error: %s",
gsasl_strerror(sasl_err));
s->sasl_state = REXMPP_SASL_ERROR;
- return;
+ return REXMPP_E_SASL;
}
}
xmlNodePtr response = xmlNewNode(NULL, "response");
xmlNewNs(response, "urn:ietf:params:xml:ns:xmpp-sasl", NULL);
xmlNodeAddContent(response, sasl_buf);
free(sasl_buf);
- rexmpp_send(s, response);
- return;
+ return rexmpp_send(s, response);
} else if (rexmpp_xml_match(elem, "urn:ietf:params:xml:ns:xmpp-sasl",
"success")) {
sasl_err = gsasl_step64 (s->sasl_session, xmlNodeGetContent(elem),
@@ -1721,17 +1749,16 @@ void rexmpp_process_element (rexmpp_t *s) {
rexmpp_log(s, LOG_ERR, "SASL error: %s",
gsasl_strerror(sasl_err));
s->sasl_state = REXMPP_SASL_ERROR;
- return;
+ return REXMPP_E_SASL;
}
s->sasl_state = REXMPP_SASL_ACTIVE;
- s->stream_state = REXMPP_STREAM_RESTART;
- return;
+ xmlCtxtResetPush(s->xml_parser, "", 0, "", "utf-8");
+ return rexmpp_stream_open(s);
} else if (rexmpp_xml_match(elem, "urn:ietf:params:xml:ns:xmpp-sasl",
"failure")) {
/* todo: would be nice to retry here, but just giving up for now */
rexmpp_log(s, LOG_ERR, "SASL failure");
- rexmpp_stop(s);
- return;
+ return rexmpp_stop(s);
}
}
@@ -1791,8 +1818,7 @@ void rexmpp_process_element (rexmpp_t *s) {
"urn:ietf:params:xml:ns:xmpp-bind",
"bind");
if (child != NULL) {
- rexmpp_stream_bind(s);
- return;
+ return rexmpp_stream_bind(s);
}
}
}
@@ -1801,10 +1827,11 @@ void rexmpp_process_element (rexmpp_t *s) {
s->stanzas_in_count++;
}
if (rexmpp_xml_match(elem, "urn:xmpp:sm:3", "r")) {
- rexmpp_sm_ack(s);
+ return rexmpp_sm_ack(s);
} else if (rexmpp_xml_match(elem, "urn:xmpp:sm:3", "a")) {
rexmpp_sm_handle_ack(s, elem);
}
+ return REXMPP_SUCCESS;
}
@@ -1887,14 +1914,13 @@ void rexmpp_sax_end_elem_ns (rexmpp_t *s,
if (s->current_element != s->current_element_root) {
s->current_element = s->current_element->parent;
} else {
- if (s->xml_in_cb != NULL && s->xml_in_cb(s, s->current_element) != 0) {
- rexmpp_log(s, LOG_WARNING,
- "Message processing was cancelled by xml_in_cb.");
+ if (s->input_queue == NULL) {
+ s->input_queue = s->current_element;
+ s->input_queue_last = s->current_element;
} else {
- rexmpp_process_element(s);
+ s->input_queue_last->next = s->current_element;
+ s->input_queue_last = s->current_element;
}
-
- xmlFreeNode(s->current_element);
s->current_element = NULL;
s->current_element_root = NULL;
}
@@ -2039,17 +2065,6 @@ rexmpp_err_t rexmpp_run (rexmpp_t *s, fd_set *read_fds, fd_set *write_fds) {
rexmpp_tls_handshake(s);
}
- /* Restarting a stream if needed after the above actions. Since it
- involves resetting the parser, functions called by that parser
- can't do it on their own. */
- if (s->tcp_state == REXMPP_TCP_CONNECTED &&
- (s->tls_state == REXMPP_TLS_ACTIVE ||
- s->tls_state == REXMPP_TLS_INACTIVE) &&
- s->stream_state == REXMPP_STREAM_RESTART) {
- xmlCtxtResetPush(s->xml_parser, "", 0, "", "utf-8");
- rexmpp_stream_open(s);
- }
-
/* Closing the stream once everything is sent. */
if (s->tcp_state == REXMPP_TCP_CONNECTED &&
s->stream_state == REXMPP_STREAM_CLOSE_REQUESTED &&
diff --git a/src/rexmpp.h b/src/rexmpp.h
index 7fc535f..2dfbde8 100644
--- a/src/rexmpp.h
+++ b/src/rexmpp.h
@@ -96,8 +96,6 @@ enum stream_st {
REXMPP_STREAM_SM_ACKS,
/** Resuming a stream. */
REXMPP_STREAM_SM_RESUME,
- /** Restarting a stream. */
- REXMPP_STREAM_RESTART,
/** The streams are ready for use: messaging and other higher-level
things not covered here. */
REXMPP_STREAM_READY,
@@ -194,7 +192,9 @@ enum rexmpp_err {
/** A roster item is not found. */
REXMPP_E_ROSTER_ITEM_NOT_FOUND,
/** An erroneous parameter is supplied. */
- REXMPP_E_PARAM
+ REXMPP_E_PARAM,
+ /** A stream error. */
+ REXMPP_E_STREAM
};
typedef enum rexmpp_err rexmpp_err_t;
@@ -315,6 +315,8 @@ struct rexmpp
xmlParserCtxtPtr xml_parser;
xmlNodePtr current_element_root;
xmlNodePtr current_element;
+ xmlNodePtr input_queue;
+ xmlNodePtr input_queue_last;
/* TLS structures. */
void *tls_session_data;
@@ -379,11 +381,11 @@ rexmpp_err_t rexmpp_send (rexmpp_t *s, xmlNodePtr node);
library. If an application wants to track replies on its own, it
should use ::rexmpp_send.
*/
-void rexmpp_iq_new (rexmpp_t *s,
- const char *type,
- const char *to,
- xmlNodePtr payload,
- rexmpp_iq_callback_t cb);
+rexmpp_err_t rexmpp_iq_new (rexmpp_t *s,
+ const char *type,
+ const char *to,
+ xmlNodePtr payload,
+ rexmpp_iq_callback_t cb);
/**
@brief Determines the maximum time to wait before the next