a geicko-2 based round robin ranking system designed to test c++ battleship submissions
battleship.dunkirk.sh
1package sftp
2
3import (
4 "encoding"
5 "sort"
6 "sync"
7)
8
9// The goal of the packetManager is to keep the outgoing packets in the same
10// order as the incoming as is requires by section 7 of the RFC.
11
12type packetManager struct {
13 requests chan orderedPacket
14 responses chan orderedPacket
15 fini chan struct{}
16 incoming orderedPackets
17 outgoing orderedPackets
18 sender packetSender // connection object
19 working *sync.WaitGroup
20 packetCount uint32
21 // it is not nil if the allocator is enabled
22 alloc *allocator
23}
24
25type packetSender interface {
26 sendPacket(encoding.BinaryMarshaler) error
27}
28
29func newPktMgr(sender packetSender) *packetManager {
30 s := &packetManager{
31 requests: make(chan orderedPacket, SftpServerWorkerCount),
32 responses: make(chan orderedPacket, SftpServerWorkerCount),
33 fini: make(chan struct{}),
34 incoming: make([]orderedPacket, 0, SftpServerWorkerCount),
35 outgoing: make([]orderedPacket, 0, SftpServerWorkerCount),
36 sender: sender,
37 working: &sync.WaitGroup{},
38 }
39 go s.controller()
40 return s
41}
42
43// // packet ordering
44func (s *packetManager) newOrderID() uint32 {
45 s.packetCount++
46 return s.packetCount
47}
48
49// returns the next orderID without incrementing it.
50// This is used before receiving a new packet, with the allocator enabled, to associate
51// the slice allocated for the received packet with the orderID that will be used to mark
52// the allocated slices for reuse once the request is served
53func (s *packetManager) getNextOrderID() uint32 {
54 return s.packetCount + 1
55}
56
57type orderedRequest struct {
58 requestPacket
59 orderid uint32
60}
61
62func (s *packetManager) newOrderedRequest(p requestPacket) orderedRequest {
63 return orderedRequest{requestPacket: p, orderid: s.newOrderID()}
64}
65func (p orderedRequest) orderID() uint32 { return p.orderid }
66func (p orderedRequest) setOrderID(oid uint32) { p.orderid = oid }
67
68type orderedResponse struct {
69 responsePacket
70 orderid uint32
71}
72
73func (s *packetManager) newOrderedResponse(p responsePacket, id uint32,
74) orderedResponse {
75 return orderedResponse{responsePacket: p, orderid: id}
76}
77func (p orderedResponse) orderID() uint32 { return p.orderid }
78func (p orderedResponse) setOrderID(oid uint32) { p.orderid = oid }
79
80type orderedPacket interface {
81 id() uint32
82 orderID() uint32
83}
84type orderedPackets []orderedPacket
85
86func (o orderedPackets) Sort() {
87 sort.Slice(o, func(i, j int) bool {
88 return o[i].orderID() < o[j].orderID()
89 })
90}
91
92// // packet registry
93// register incoming packets to be handled
94func (s *packetManager) incomingPacket(pkt orderedRequest) {
95 s.working.Add(1)
96 s.requests <- pkt
97}
98
99// register outgoing packets as being ready
100func (s *packetManager) readyPacket(pkt orderedResponse) {
101 s.responses <- pkt
102 s.working.Done()
103}
104
105// shut down packetManager controller
106func (s *packetManager) close() {
107 // pause until current packets are processed
108 s.working.Wait()
109 close(s.fini)
110}
111
112// Passed a worker function, returns a channel for incoming packets.
113// Keep process packet responses in the order they are received while
114// maximizing throughput of file transfers.
115func (s *packetManager) workerChan(runWorker func(chan orderedRequest),
116) chan orderedRequest {
117 // multiple workers for faster read/writes
118 rwChan := make(chan orderedRequest, SftpServerWorkerCount)
119 for i := 0; i < SftpServerWorkerCount; i++ {
120 runWorker(rwChan)
121 }
122
123 // single worker to enforce sequential processing of everything else
124 cmdChan := make(chan orderedRequest)
125 runWorker(cmdChan)
126
127 pktChan := make(chan orderedRequest, SftpServerWorkerCount)
128 go func() {
129 for pkt := range pktChan {
130 switch pkt.requestPacket.(type) {
131 case *sshFxpReadPacket, *sshFxpWritePacket:
132 s.incomingPacket(pkt)
133 rwChan <- pkt
134 continue
135 case *sshFxpClosePacket:
136 // wait for reads/writes to finish when file is closed
137 // incomingPacket() call must occur after this
138 s.working.Wait()
139 }
140 s.incomingPacket(pkt)
141 // all non-RW use sequential cmdChan
142 cmdChan <- pkt
143 }
144 close(rwChan)
145 close(cmdChan)
146 s.close()
147 }()
148
149 return pktChan
150}
151
152// process packets
153func (s *packetManager) controller() {
154 for {
155 select {
156 case pkt := <-s.requests:
157 debug("incoming id (oid): %v (%v)", pkt.id(), pkt.orderID())
158 s.incoming = append(s.incoming, pkt)
159 s.incoming.Sort()
160 case pkt := <-s.responses:
161 debug("outgoing id (oid): %v (%v)", pkt.id(), pkt.orderID())
162 s.outgoing = append(s.outgoing, pkt)
163 s.outgoing.Sort()
164 case <-s.fini:
165 return
166 }
167 s.maybeSendPackets()
168 }
169}
170
171// send as many packets as are ready
172func (s *packetManager) maybeSendPackets() {
173 for {
174 if len(s.outgoing) == 0 || len(s.incoming) == 0 {
175 debug("break! -- outgoing: %v; incoming: %v",
176 len(s.outgoing), len(s.incoming))
177 break
178 }
179 out := s.outgoing[0]
180 in := s.incoming[0]
181 // debug("incoming: %v", ids(s.incoming))
182 // debug("outgoing: %v", ids(s.outgoing))
183 if in.orderID() == out.orderID() {
184 debug("Sending packet: %v", out.id())
185 s.sender.sendPacket(out.(encoding.BinaryMarshaler))
186 if s.alloc != nil {
187 // mark for reuse the slices allocated for this request
188 s.alloc.ReleasePages(in.orderID())
189 }
190 // pop off heads
191 copy(s.incoming, s.incoming[1:]) // shift left
192 s.incoming[len(s.incoming)-1] = nil // clear last
193 s.incoming = s.incoming[:len(s.incoming)-1] // remove last
194 copy(s.outgoing, s.outgoing[1:]) // shift left
195 s.outgoing[len(s.outgoing)-1] = nil // clear last
196 s.outgoing = s.outgoing[:len(s.outgoing)-1] // remove last
197 } else {
198 break
199 }
200 }
201}
202
203// func oids(o []orderedPacket) []uint32 {
204// res := make([]uint32, 0, len(o))
205// for _, v := range o {
206// res = append(res, v.orderId())
207// }
208// return res
209// }
210// func ids(o []orderedPacket) []uint32 {
211// res := make([]uint32, 0, len(o))
212// for _, v := range o {
213// res = append(res, v.id())
214// }
215// return res
216// }