Skip to content

Commit

Permalink
Fix ingress thread deadlock for resubmit packets
Browse files Browse the repository at this point in the history
Resubmit packets were written to the input_buffer with a blocking call
by the ingress thread. Since the ingress thread is also in charge of
draining the input_buffer, this could have lead to a deadlock. Resubmit
packets can now be dropped if the buffer is full. To limit the number of
resubmit packets being lost, we place them is a higher priority queue
than "normal" packets.

Fixes #729
  • Loading branch information
antoninbas committed Mar 7, 2019
1 parent ca27b9f commit e373de4
Show file tree
Hide file tree
Showing 3 changed files with 111 additions and 12 deletions.
6 changes: 3 additions & 3 deletions include/bm/bm_sim/queueing.h
Original file line number Diff line number Diff line change
Expand Up @@ -397,9 +397,9 @@ class QueueingLogicRL {
//! high priority can starve lower-priority queues. For example, if the queue
//! with priority `0` always contains at least one element, the other queues
//! will never be served.
//! As for QueueingLogicRL, the write behavior (push_front()) is blocking: once
//! a logical queue is full, subsequent incoming elements will be dropped until
//! the queue starts draining again.
//! As for QueueingLogicRL, the write behavior (push_front()) is not blocking:
//! once a logical queue is full, subsequent incoming elements will be dropped
//! until the queue starts draining again.
//! Look at the documentation for QueueingLogic for more information about the
//! template parameters (they are the same).
template <typename T, typename FMap>
Expand Down
111 changes: 103 additions & 8 deletions targets/simple_switch/simple_switch.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,15 @@
*
*/

#include <bm/bm_sim/_assert.h>
#include <bm/bm_sim/parser.h>
#include <bm/bm_sim/tables.h>
#include <bm/bm_sim/logger.h>

#include <unistd.h>

#include <condition_variable>
#include <deque>
#include <iostream>
#include <fstream>
#include <mutex>
Expand Down Expand Up @@ -98,10 +101,94 @@ class SimpleSwitch::MirroringSessions {
std::unordered_map<mirror_id_t, MirroringSessionConfig> sessions_map;
};

// Arbitrates which packets are processed by the ingres thread. Resubmit and
// recirculate packets go to a high priority queue, while normal pakcets go to a
// low priority queue. We assume that starvation is not going to be a problem.
// Resubmit packets are dropped if the queue is full in order to make sure the
// ingress thread cannot deadlock. We do the same for recirculate packets even
// though the same argument does not apply for them. Enqueueing normal packets
// is blocking (back pressure is applied to the interface).
class SimpleSwitch::InputBuffer {
public:
enum class PacketType {
NORMAL,
RESUBMIT,
RECIRCULATE,
SENTINEL // signal for the ingress thread to terminate
};

InputBuffer(size_t capacity_hi, size_t capacity_lo)
: capacity_hi(capacity_hi), capacity_lo(capacity_lo) { }

int push_front(PacketType packet_type, std::unique_ptr<Packet> &&item) {
switch (packet_type) {
case PacketType::NORMAL:
return push_front(&queue_lo, capacity_lo, &cvar_can_push_lo,
std::move(item), true);
case PacketType::RESUBMIT:
case PacketType::RECIRCULATE:
return push_front(&queue_hi, capacity_hi, &cvar_can_push_hi,
std::move(item), false);
case PacketType::SENTINEL:
return push_front(&queue_hi, capacity_hi, &cvar_can_push_hi,
std::move(item), true);
}
_BM_UNREACHABLE("Unreachable statement");
return 0;
}

void pop_back(std::unique_ptr<Packet> *pItem) {
Lock lock(mutex);
cvar_can_pop.wait(
lock, [this] { return (queue_hi.size() + queue_lo.size()) > 0; });
// give higher priority to resubmit queue
if (queue_hi.size() > 0) {
*pItem = std::move(queue_hi.back());
queue_hi.pop_back();
lock.unlock();
cvar_can_push_hi.notify_one();
} else {
*pItem = std::move(queue_lo.back());
queue_lo.pop_back();
lock.unlock();
cvar_can_push_lo.notify_one();
}
}

private:
using Mutex = std::mutex;
using Lock = std::unique_lock<Mutex>;
using QueueImpl = std::deque<std::unique_ptr<Packet> >;

int push_front(QueueImpl *queue, size_t capacity,
std::condition_variable *cvar,
std::unique_ptr<Packet> &&item, bool blocking) {
Lock lock(mutex);
while (queue->size() == capacity) {
if (!blocking) return 0;
cvar->wait(lock);
}
queue->push_front(std::move(item));
lock.unlock();
cvar_can_pop.notify_one();
return 1;
}

mutable std::mutex mutex;
mutable std::condition_variable cvar_can_push_hi;
mutable std::condition_variable cvar_can_push_lo;
mutable std::condition_variable cvar_can_pop;
size_t capacity_hi;
size_t capacity_lo;
QueueImpl queue_hi;
QueueImpl queue_lo;
};

SimpleSwitch::SimpleSwitch(port_t max_port, bool enable_swap)
: Switch(enable_swap),
max_port(max_port),
input_buffer(1024),
input_buffer(new InputBuffer(
1024 /* normal capacity */, 1024 /* resubmit/recirc capacity */)),
#ifdef SSWITCH_PRIORITY_QUEUEING_ON
egress_buffers(max_port, nb_egress_threads,
64, EgressThreadMapper(nb_egress_threads),
Expand Down Expand Up @@ -175,7 +262,8 @@ SimpleSwitch::receive_(port_t port_num, const char *buffer, int len) {
.set(get_ts().count());
}

input_buffer.push_front(std::move(packet));
input_buffer->push_front(
InputBuffer::PacketType::NORMAL, std::move(packet));
return 0;
}

Expand All @@ -191,12 +279,17 @@ SimpleSwitch::start_and_return_() {
}

SimpleSwitch::~SimpleSwitch() {
input_buffer.push_front(nullptr);
input_buffer->push_front(
InputBuffer::PacketType::SENTINEL, nullptr);
for (size_t i = 0; i < nb_egress_threads; i++) {
// The push_front call is called inside a while loop because there is no
// guarantee that the sentinel was enqueued otherwise. It should not be an
// issue because at this stage the ingress thread has been sent a signal to
// stop, and only egress clones can be sent to the buffer.
#ifdef SSWITCH_PRIORITY_QUEUEING_ON
egress_buffers.push_front(i, 0, nullptr);
while (egress_buffers.push_front(i, 0, nullptr) == 0) continue;
#else
egress_buffers.push_front(i, nullptr);
while (egress_buffers.push_front(i, nullptr) == 0) continue;
#endif
}
output_buffer.push_front(nullptr);
Expand Down Expand Up @@ -375,7 +468,7 @@ SimpleSwitch::ingress_thread() {

while (1) {
std::unique_ptr<Packet> packet;
input_buffer.pop_back(&packet);
input_buffer->pop_back(&packet);
if (packet == nullptr) break;

// TODO(antonin): only update these if swapping actually happened?
Expand Down Expand Up @@ -491,7 +584,8 @@ SimpleSwitch::ingress_thread() {
copy_field_list_and_set_type(packet, packet_copy,
PKT_INSTANCE_TYPE_RESUBMIT,
field_list_id);
input_buffer.push_front(std::move(packet_copy));
input_buffer->push_front(
InputBuffer::PacketType::RESUBMIT, std::move(packet_copy));
continue;
}
}
Expand Down Expand Up @@ -634,7 +728,8 @@ SimpleSwitch::egress_thread(size_t worker_id) {
// TODO(antonin): really it may be better to create a new packet here or
// to fold this functionality into the Packet class?
packet_copy->set_ingress_length(packet_size);
input_buffer.push_front(std::move(packet_copy));
input_buffer->push_front(
InputBuffer::PacketType::RECIRCULATE, std::move(packet_copy));
continue;
}
}
Expand Down
6 changes: 5 additions & 1 deletion targets/simple_switch/simple_switch.h
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,8 @@ class SimpleSwitch : public Switch {

class MirroringSessions;

class InputBuffer;

enum PktInstanceType {
PKT_INSTANCE_TYPE_NORMAL,
PKT_INSTANCE_TYPE_INGRESS_CLONE,
Expand Down Expand Up @@ -169,7 +171,9 @@ class SimpleSwitch : public Switch {
private:
port_t max_port;
std::vector<std::thread> threads_;
Queue<std::unique_ptr<Packet> > input_buffer;
std::unique_ptr<InputBuffer> input_buffer;
// for these queues, the write operation is non-blocking and we drop the
// packet if the queue is full
#ifdef SSWITCH_PRIORITY_QUEUEING_ON
bm::QueueingLogicPriRL<std::unique_ptr<Packet>, EgressThreadMapper>
#else
Expand Down

0 comments on commit e373de4

Please sign in to comment.