Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix ingress thread deadlock for resubmit packets #730

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions include/bm/bm_sim/queueing.h
Original file line number Diff line number Diff line change
Expand Up @@ -397,9 +397,9 @@ class QueueingLogicRL {
//! high priority can starve lower-priority queues. For example, if the queue
//! with priority `0` always contains at least one element, the other queues
//! will never be served.
//! As for QueueingLogicRL, the write behavior (push_front()) is blocking: once
//! a logical queue is full, subsequent incoming elements will be dropped until
//! the queue starts draining again.
//! As for QueueingLogicRL, the write behavior (push_front()) is not blocking:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, I do not know the BMv2 internals very well at all, so just throwing up a random red flag on comment changes -- After your changes is the phrase "As for QueueingLogicRL" still correct?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't change anything in this file. I realized this comment was not correct so I fixed it, but it has nothing to do with the rest of the changes.

//! once a logical queue is full, subsequent incoming elements will be dropped
//! until the queue starts draining again.
//! Look at the documentation for QueueingLogic for more information about the
//! template parameters (they are the same).
template <typename T, typename FMap>
Expand Down
111 changes: 103 additions & 8 deletions targets/simple_switch/simple_switch.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,15 @@
*
*/

#include <bm/bm_sim/_assert.h>
#include <bm/bm_sim/parser.h>
#include <bm/bm_sim/tables.h>
#include <bm/bm_sim/logger.h>

#include <unistd.h>

#include <condition_variable>
#include <deque>
#include <iostream>
#include <fstream>
#include <mutex>
Expand Down Expand Up @@ -98,10 +101,94 @@ class SimpleSwitch::MirroringSessions {
std::unordered_map<mirror_id_t, MirroringSessionConfig> sessions_map;
};

// Arbitrates which packets are processed by the ingress thread. Resubmit and
// recirculate packets go to a high priority queue, while normal packets go to a
// low priority queue. We assume that starvation is not going to be a problem.
// Resubmit packets are dropped if the queue is full in order to make sure the
// ingress thread cannot deadlock. We do the same for recirculate packets even
// though the same argument does not apply for them. Enqueueing normal packets
// is blocking (back pressure is applied to the interface).
class SimpleSwitch::InputBuffer {
 public:
  // Kind of packet being enqueued. It selects the destination queue and
  // whether the enqueue operation waits or gives up when that queue is full.
  enum class PacketType {
    NORMAL,
    RESUBMIT,
    RECIRCULATE,
    SENTINEL  // signal for the ingress thread to terminate
  };

  // Mind the argument order: the capacity of the high priority queue
  // (resubmit / recirculate / sentinel) comes first, followed by the capacity
  // of the low priority queue (normal packets).
  InputBuffer(size_t capacity_hi, size_t capacity_lo)
      : capacity_hi(capacity_hi), capacity_lo(capacity_lo) { }

  // Enqueues item in the queue matching packet_type.
  //  - NORMAL: low priority queue, waits while the queue is full.
  //  - RESUBMIT / RECIRCULATE: high priority queue, never waits.
  //  - SENTINEL: high priority queue, waits while the queue is full.
  // Returns 1 if the item was enqueued, 0 if it was not because the queue was
  // full (only possible for the non-waiting packet types).
  int push_front(PacketType packet_type, std::unique_ptr<Packet> &&item) {
    switch (packet_type) {
      case PacketType::NORMAL:
        return push_front(&queue_lo, capacity_lo, &cvar_can_push_lo,
                          std::move(item), true);
      case PacketType::RESUBMIT:
      case PacketType::RECIRCULATE:
      case PacketType::SENTINEL: {
        // all three share the high priority queue; only the sentinel is
        // allowed to wait for room
        const bool wait_for_room = (packet_type == PacketType::SENTINEL);
        return push_front(&queue_hi, capacity_hi, &cvar_can_push_hi,
                          std::move(item), wait_for_room);
      }
    }
    _BM_UNREACHABLE("Unreachable statement");
    return 0;
  }

  // Waits until at least one of the two queues is non-empty, then moves the
  // element at the back of the chosen queue into *pItem. The high priority
  // queue is always served first when it has elements.
  void pop_back(std::unique_ptr<Packet> *pItem) {
    Lock guard(mutex);
    while (queue_hi.empty() && queue_lo.empty()) {
      cvar_can_pop.wait(guard);
    }
    // give higher priority to resubmit/recirculate queue
    const bool serve_hi = !queue_hi.empty();
    QueueImpl &source = serve_hi ? queue_hi : queue_lo;
    *pItem = std::move(source.back());
    source.pop_back();
    // release the mutex before notifying the producer side of the queue that
    // was just drained by one element
    guard.unlock();
    if (serve_hi) {
      cvar_can_push_hi.notify_one();
    } else {
      cvar_can_push_lo.notify_one();
    }
  }

 private:
  using Mutex = std::mutex;
  using Lock = std::unique_lock<Mutex>;
  using QueueImpl = std::deque<std::unique_ptr<Packet> >;

  // Shared enqueue logic. queue / capacity / cvar must describe the same
  // logical queue. When the queue is full: returns 0 immediately if blocking
  // is false, otherwise waits on cvar until the queue is no longer full.
  // Returns 1 once the item has been placed at the front of the queue.
  int push_front(QueueImpl *queue, size_t capacity,
                 std::condition_variable *cvar,
                 std::unique_ptr<Packet> &&item, bool blocking) {
    Lock guard(mutex);
    if (blocking) {
      cvar->wait(guard,
                 [queue, capacity] { return queue->size() != capacity; });
    } else if (queue->size() == capacity) {
      return 0;
    }
    queue->push_front(std::move(item));
    // release the mutex before waking up the consumer
    guard.unlock();
    cvar_can_pop.notify_one();
    return 1;
  }

  // a single mutex protects both queues
  mutable std::mutex mutex;
  // notified by pop_back() after an element was removed from queue_hi
  mutable std::condition_variable cvar_can_push_hi;
  // notified by pop_back() after an element was removed from queue_lo
  mutable std::condition_variable cvar_can_push_lo;
  // notified by push_front() after an element was added to either queue
  mutable std::condition_variable cvar_can_pop;
  size_t capacity_hi;
  size_t capacity_lo;
  QueueImpl queue_hi;
  QueueImpl queue_lo;
};

SimpleSwitch::SimpleSwitch(port_t max_port, bool enable_swap)
: Switch(enable_swap),
max_port(max_port),
input_buffer(1024),
input_buffer(new InputBuffer(
1024 /* normal capacity */, 1024 /* resubmit/recirc capacity */)),
#ifdef SSWITCH_PRIORITY_QUEUEING_ON
egress_buffers(max_port, nb_egress_threads,
64, EgressThreadMapper(nb_egress_threads),
Expand Down Expand Up @@ -175,7 +262,8 @@ SimpleSwitch::receive_(port_t port_num, const char *buffer, int len) {
.set(get_ts().count());
}

input_buffer.push_front(std::move(packet));
input_buffer->push_front(
InputBuffer::PacketType::NORMAL, std::move(packet));
return 0;
}

Expand All @@ -191,12 +279,17 @@ SimpleSwitch::start_and_return_() {
}

SimpleSwitch::~SimpleSwitch() {
input_buffer.push_front(nullptr);
input_buffer->push_front(
InputBuffer::PacketType::SENTINEL, nullptr);
for (size_t i = 0; i < nb_egress_threads; i++) {
// push_front is called in a while loop because a single call is not
// guaranteed to enqueue the sentinel (it returns 0 when it fails). Spinning
// should not be an issue because at this stage the ingress thread has been
// sent a signal to stop, and only egress clones can be sent to the buffer.
#ifdef SSWITCH_PRIORITY_QUEUEING_ON
egress_buffers.push_front(i, 0, nullptr);
while (egress_buffers.push_front(i, 0, nullptr) == 0) continue;
#else
egress_buffers.push_front(i, nullptr);
while (egress_buffers.push_front(i, nullptr) == 0) continue;
#endif
}
output_buffer.push_front(nullptr);
Expand Down Expand Up @@ -375,7 +468,7 @@ SimpleSwitch::ingress_thread() {

while (1) {
std::unique_ptr<Packet> packet;
input_buffer.pop_back(&packet);
input_buffer->pop_back(&packet);
if (packet == nullptr) break;

// TODO(antonin): only update these if swapping actually happened?
Expand Down Expand Up @@ -491,7 +584,8 @@ SimpleSwitch::ingress_thread() {
copy_field_list_and_set_type(packet, packet_copy,
PKT_INSTANCE_TYPE_RESUBMIT,
field_list_id);
input_buffer.push_front(std::move(packet_copy));
input_buffer->push_front(
InputBuffer::PacketType::RESUBMIT, std::move(packet_copy));
continue;
}
}
Expand Down Expand Up @@ -634,7 +728,8 @@ SimpleSwitch::egress_thread(size_t worker_id) {
// TODO(antonin): really it may be better to create a new packet here or
// to fold this functionality into the Packet class?
packet_copy->set_ingress_length(packet_size);
input_buffer.push_front(std::move(packet_copy));
input_buffer->push_front(
InputBuffer::PacketType::RECIRCULATE, std::move(packet_copy));
continue;
}
}
Expand Down
6 changes: 5 additions & 1 deletion targets/simple_switch/simple_switch.h
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,8 @@ class SimpleSwitch : public Switch {

class MirroringSessions;

class InputBuffer;

enum PktInstanceType {
PKT_INSTANCE_TYPE_NORMAL,
PKT_INSTANCE_TYPE_INGRESS_CLONE,
Expand Down Expand Up @@ -169,7 +171,9 @@ class SimpleSwitch : public Switch {
private:
port_t max_port;
std::vector<std::thread> threads_;
Queue<std::unique_ptr<Packet> > input_buffer;
std::unique_ptr<InputBuffer> input_buffer;
// for these queues, the write operation is non-blocking and we drop the
// packet if the queue is full
#ifdef SSWITCH_PRIORITY_QUEUEING_ON
bm::QueueingLogicPriRL<std::unique_ptr<Packet>, EgressThreadMapper>
#else
Expand Down